323 lines
13 KiB
Python
323 lines
13 KiB
Python
import functools
|
||
import pathlib
|
||
import os
|
||
import time
|
||
import re
|
||
from typing import Self
|
||
from urllib.parse import urlparse, parse_qs
|
||
|
||
from selenium.webdriver import Keys, ActionChains
|
||
from selenium.webdriver.common.by import By
|
||
from webdriver_manager.chrome import ChromeDriverManager
|
||
from selenium import webdriver
|
||
from selenium.webdriver.support.wait import WebDriverWait
|
||
from selenium.webdriver.support import expected_conditions as EC
|
||
from selenium.common.exceptions import NoSuchElementException, TimeoutException
|
||
from excel_parser import ExcelParser
|
||
from storage import Storage
|
||
|
||
from telegram_logs import logger
|
||
|
||
import dotenv
|
||
import asyncio
|
||
import threading
|
||
from webserver import dp
|
||
|
||
from aiogram.filters import CommandStart
|
||
from aiogram.types import Message, ReplyKeyboardMarkup, KeyboardButton
|
||
|
||
from webserver import bot
|
||
|
||
dotenv.load_dotenv('.env')
|
||
IS_PROD = os.environ.get('PROD_ENV') == '1'
|
||
|
||
|
||
class Parser:
|
||
keyword = "Велесстрой"
|
||
url = "https://www.b2b-center.ru/market"
|
||
|
||
_driver: webdriver.Chrome
|
||
_options: webdriver.ChromeOptions = webdriver.ChromeOptions()
|
||
_service: webdriver.ChromeService
|
||
|
||
@staticmethod
|
||
def parser_alive(func):
|
||
@functools.wraps(func)
|
||
def wrapper(self, *args, **kwargs):
|
||
if not PARSER_ALIVE:
|
||
raise KeyboardInterrupt("Бот остановлен по запросу")
|
||
return func(self, *args, **kwargs)
|
||
return wrapper
|
||
|
||
def __init__(self):
|
||
logger.info("Бот запущен")
|
||
prefs = {"download.default_directory": str(pathlib.Path('./downloads').absolute())}
|
||
|
||
self._options.add_experimental_option("prefs", prefs)
|
||
if IS_PROD:
|
||
self._options.add_argument("--disable-extensions")
|
||
self._options.add_argument("--disable-gpu")
|
||
self._options.add_argument("window-size=1920,1080")
|
||
self._options.add_argument("--headless=new")
|
||
self._service = webdriver.ChromeService(executable_path=ChromeDriverManager().install())
|
||
|
||
self.storage = Storage()
|
||
self.storage.create_tables()
|
||
|
||
def __enter__(self) -> Self:
|
||
self._driver = webdriver.Chrome(service=self._service, options=self._options)
|
||
return self
|
||
|
||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||
if exc_val is not None:
|
||
logger.error("Бот остановлен. Причина: " + str(exc_val))
|
||
print("Gracefully shutting down...")
|
||
|
||
self._driver.close()
|
||
|
||
def find_elem(self, by: str, value: str):
|
||
try:
|
||
return WebDriverWait(self._driver, 10).until(EC.presence_of_element_located((by, value)))
|
||
except TimeoutException:
|
||
raise NoSuchElementException("Element not found")
|
||
|
||
def login(self):
|
||
self._driver.get(self.url)
|
||
|
||
time.sleep(3)
|
||
# Open login modal
|
||
element = self.find_elem(By.ID, "auth_ajax_modal_trigger")
|
||
element.click()
|
||
|
||
login_control = self.find_elem(By.ID, "login_control")
|
||
password_control = self.find_elem(By.ID, "password_control")
|
||
|
||
login_control.click()
|
||
login_control.clear()
|
||
login_control.send_keys("solo@jde.ru")
|
||
|
||
password_control.click()
|
||
password_control.clear()
|
||
password_control.send_keys("Tutu@2024")
|
||
time.sleep(5)
|
||
password_control.send_keys(Keys.RETURN)
|
||
|
||
def check_ltl(self, text):
|
||
return len(re.findall(r"[Ll][Tt][Ll]", text)) != 0
|
||
|
||
@parser_alive
|
||
def search(self):
|
||
time.sleep(5)
|
||
self._driver.get(self.url + f'/?f_keyword={self.keyword}')
|
||
time.sleep(10)
|
||
table = self.find_elem(By.CSS_SELECTOR, ".search-results > tbody")
|
||
links = table.find_elements(By.CSS_SELECTOR, "tr > td:nth-child(1) > a")
|
||
|
||
links = [link.get_attribute("href") for link in links if
|
||
self.check_ltl(link.find_element(By.CSS_SELECTOR, 'div').text)]
|
||
links = [link for link in links if
|
||
f"https://www.b2b-center.ru/market/view.html?id=" + parse_qs(urlparse(link).query).get('id')[
|
||
0] not in self.storage.get_links()]
|
||
|
||
for link in links:
|
||
if PARSER_ALIVE is False:
|
||
raise KeyboardInterrupt("Бот остановлен по запросу")
|
||
logger.info("Обработка заявки: " + link)
|
||
try:
|
||
self.accept_documentation(link)
|
||
except Exception as exc:
|
||
logger.error("Не удалось обработать заявку. Подробности: " + str(exc))
|
||
logger.info("Все LTL заявки обработаны, обновление через 60сек")
|
||
|
||
@parser_alive
|
||
def parse(self, url: str = None) -> dict:
|
||
fp = self.download_documentation()
|
||
e_parser = ExcelParser(fp, url)
|
||
price = e_parser.calculate()
|
||
if not price:
|
||
logger.error("Не удалось расcчитать цену, переходим далее")
|
||
|
||
return price
|
||
|
||
@parser_alive
|
||
def accept_documentation(self, url: str):
|
||
time.sleep(3)
|
||
self._driver.get(url)
|
||
|
||
# Скачать документацию
|
||
try:
|
||
download_documentation_button = self.find_elem(By.CSS_SELECTOR,
|
||
'#auction_info_td > table > tbody > tr:nth-child(4) table input[type=submit]')
|
||
time.sleep(5)
|
||
ActionChains(self._driver).scroll_to_element(download_documentation_button).scroll_by_amount(0,
|
||
100).perform()
|
||
download_documentation_button.click()
|
||
|
||
price = self.parse(url)
|
||
self.send_offer_link(price['price'], nds=price['vat'],
|
||
delivery_time=price['transport_delivery_date'],
|
||
delivery_range=price['max_days'])
|
||
|
||
except NoSuchElementException:
|
||
# logger.info("Отсутствует кнопка скачивания документации, переходим к документации")
|
||
|
||
price = self.parse(url)
|
||
self.send_offer_link(price['price'], nds=price['vat'],
|
||
delivery_time=price['transport_delivery_date'],
|
||
delivery_range=price['max_days'])
|
||
|
||
@parser_alive
|
||
def download_documentation(self) -> list[pathlib.Path]:
|
||
try:
|
||
all_files_1 = set(
|
||
pathlib.Path('./downloads') / pathlib.Path(file) for tree in os.walk('./downloads') for file in tree[2])
|
||
|
||
time.sleep(5)
|
||
documentation_block = self.find_elem(By.CSS_SELECTOR, '#download_documentation')
|
||
docs = documentation_block.find_elements(By.CSS_SELECTOR, 'a')
|
||
for doc in docs:
|
||
href = doc.get_attribute('href')
|
||
if not href.endswith('.xlsx') and not href.endswith('.xls'):
|
||
continue
|
||
self._driver.get(href)
|
||
time.sleep(3)
|
||
|
||
all_files_2 = set(
|
||
pathlib.Path('./downloads') / pathlib.Path(file) for tree in os.walk('./downloads') for file in tree[2])
|
||
|
||
fp = all_files_2 - all_files_1
|
||
logger.debug(fp)
|
||
|
||
return [file for file in fp]
|
||
except Exception as exc:
|
||
logger.info(f"Не удалось скачать документацию. Подробности: {type(exc)} {str(exc)}")
|
||
raise KeyboardInterrupt()
|
||
|
||
@parser_alive
|
||
def send_offer_link(self, price: int, nds: int, delivery_range: str, delivery_time: str):
|
||
try:
|
||
logger.info(
|
||
f"Предварительные данные по заявке: Цена: {price}, НДС: {nds}%, Доставка: {delivery_range} дн., Подача машины {delivery_time}")
|
||
offer_link = self.find_elem(By.ID, "send_offer_link")
|
||
ActionChains(self._driver).scroll_to_element(offer_link).scroll_by_amount(0, 100).perform()
|
||
offer_link.click()
|
||
|
||
price_id = self.find_elem(By.ID, 'price_id')
|
||
price_id.send_keys(str(price))
|
||
nds_elem = self.find_elem(By.CSS_SELECTOR, 'input[name="AUCTION_OFFER[price_vat]"]')
|
||
|
||
if nds == 0:
|
||
self.find_elem(By.ID, 'price_lot_no_tax').click()
|
||
else:
|
||
nds_elem.send_keys(str(nds))
|
||
|
||
row_11 = self.find_elem(By.CSS_SELECTOR, "tr#row_id_11 > td:nth-child(2) > textarea")
|
||
row_12 = self.find_elem(By.CSS_SELECTOR, "tr#row_id_12 > td:nth-child(2) > textarea")
|
||
row_13 = self.find_elem(By.CSS_SELECTOR, "tr#row_id_13 > td:nth-child(2) > textarea")
|
||
row_15 = self.find_elem(By.CSS_SELECTOR, "tr#row_id_15 > td:nth-child(2) > input[type=submit]")
|
||
|
||
row_11.send_keys(str(delivery_range) + " дней")
|
||
row_12.send_keys(delivery_time)
|
||
row_13.send_keys(
|
||
"Дата подачи транспорта предварительная, согласовывается по звонку оператора ЖДЭ клиенту-отправителю")
|
||
time.sleep(10)
|
||
|
||
ActionChains(self._driver).scroll_to_element(row_15).scroll_by_amount(0, 100).perform()
|
||
row_15.click()
|
||
time.sleep(10)
|
||
|
||
if IS_PROD:
|
||
self.apply_offer()
|
||
logger.success("Заявка успешно отправлена")
|
||
time.sleep(10)
|
||
except Exception as exc:
|
||
logger.error(f"Не удалось отправить заявку. Ошибка: {type(exc)} : {str(exc)}")
|
||
|
||
@parser_alive
|
||
def apply_offer(self):
|
||
btn = self.find_elem(By.CSS_SELECTOR, "form[name=AUCTION_OFFER] table.form-control_table button")
|
||
ActionChains(self._driver).scroll_to_element(btn).scroll_by_amount(0, 100).perform()
|
||
btn.click()
|
||
|
||
pin = self.find_elem(By.CSS_SELECTOR, ".prt-ecp_sign input")
|
||
pin.send_keys("5732")
|
||
|
||
sign_btn = self.find_elem(By.CSS_SELECTOR, ".prt-ecp_sign button.btn-primary")
|
||
time.sleep(1)
|
||
sign_btn.click()
|
||
time.sleep(10)
|
||
|
||
|
||
PARSER_ALIVE = True
|
||
|
||
|
||
def parse_runner():
|
||
while True:
|
||
with Parser() as parser:
|
||
parser.login()
|
||
while PARSER_ALIVE:
|
||
parser.search()
|
||
time.sleep(60)
|
||
|
||
|
||
parser_thread = threading.Thread(target=parse_runner, daemon=True)
|
||
|
||
|
||
@dp.message(CommandStart())
|
||
async def start_handler(message: Message):
|
||
s = Storage()
|
||
if message.from_user.id not in s.get_users():
|
||
await message.answer("Вы не зарегистрированы, обратитесь к администратору")
|
||
return
|
||
markup = ReplyKeyboardMarkup(keyboard=[[KeyboardButton(text="Запустить Бот")]])
|
||
await message.answer(f"Hello, {message.from_user.full_name}!", reply_markup=markup)
|
||
|
||
|
||
@dp.message()
|
||
async def message_handler(message: Message):
|
||
global PARSER_ALIVE, parser_thread
|
||
s = Storage()
|
||
if str(message.from_user.id) not in s.get_users():
|
||
await message.answer("Вы не зарегистрированы, обратитесь к администратору")
|
||
return
|
||
|
||
markup_start = ReplyKeyboardMarkup(keyboard=[[KeyboardButton(text="Запустить Бот")]])
|
||
markup_shutdown = ReplyKeyboardMarkup(keyboard=[[KeyboardButton(text="Остановить Бот")]])
|
||
if message.text == "Запустить Бот":
|
||
if PARSER_ALIVE:
|
||
await message.answer("Бот уже запущен", reply_markup=markup_shutdown)
|
||
return
|
||
PARSER_ALIVE = True
|
||
for chat_id in s.get_users():
|
||
await bot.send_message(chat_id,
|
||
f"Пользователь {message.from_user.full_name} запускает бот",
|
||
reply_markup=markup_shutdown)
|
||
if not parser_thread.is_alive():
|
||
parser_thread.start()
|
||
return
|
||
if message.text == "Остановить Бот":
|
||
if not PARSER_ALIVE:
|
||
await message.answer("Бот уже остановлен", reply_markup=markup_start)
|
||
for chat_id in s.get_users():
|
||
await bot.send_message(chat_id,
|
||
f"Пользователь {message.from_user.full_name} остановил бот",
|
||
reply_markup=markup_start)
|
||
PARSER_ALIVE = False
|
||
return
|
||
await message.answer("Неизвестная команда")
|
||
|
||
|
||
async def main():
|
||
storage = Storage()
|
||
markup = ReplyKeyboardMarkup(keyboard=[[KeyboardButton(text="Запустить Бот")]])
|
||
for chat_id in storage.get_users():
|
||
await bot.send_message(chat_id,
|
||
"Контроллер запущен, бот ожидает включения",
|
||
reply_markup=markup,
|
||
disable_notification=True)
|
||
await dp.start_polling(bot)
|
||
|
||
if __name__ == "__main__":
|
||
asyncio.run(main())
|
||
|