import functools import pathlib import os import time import re from typing import Self from urllib.parse import urlparse, parse_qs from selenium.webdriver import Keys, ActionChains from selenium.webdriver.common.by import By from webdriver_manager.chrome import ChromeDriverManager from selenium import webdriver from selenium.webdriver.support.wait import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from selenium.common.exceptions import NoSuchElementException, TimeoutException from excel_parser import ExcelParser from storage import Storage from telegram_logs import logger import dotenv import asyncio import threading from webserver import dp from aiogram.filters import CommandStart from aiogram.types import Message, ReplyKeyboardMarkup, KeyboardButton from webserver import bot dotenv.load_dotenv('.env') IS_PROD = os.environ.get('PROD_ENV') == '1' class Parser: keyword = "Велесстрой" url = "https://www.b2b-center.ru/market" _driver: webdriver.Chrome _options: webdriver.ChromeOptions = webdriver.ChromeOptions() _service: webdriver.ChromeService @staticmethod def parser_alive(func): @functools.wraps(func) def wrapper(self, *args, **kwargs): if not PARSER_ALIVE: raise KeyboardInterrupt("Бот остановлен по запросу") return func(self, *args, **kwargs) return wrapper def __init__(self): logger.info("Бот запущен") prefs = {"download.default_directory": str(pathlib.Path('./downloads').absolute())} self._options.add_experimental_option("prefs", prefs) if IS_PROD: self._options.add_argument("--disable-extensions") self._options.add_argument("--disable-gpu") self._options.add_argument("window-size=1920,1080") self._options.add_argument("--headless=new") self._service = webdriver.ChromeService(executable_path=ChromeDriverManager().install()) self.storage = Storage() self.storage.create_tables() def __enter__(self) -> Self: self._driver = webdriver.Chrome(service=self._service, options=self._options) return self def __exit__(self, exc_type, exc_val, exc_tb): if exc_val is not None: logger.error("Бот остановлен. Причина: " + str(exc_val)) print("Gracefully shutting down...") self._driver.close() def find_elem(self, by: str, value: str): try: return WebDriverWait(self._driver, 10).until(EC.presence_of_element_located((by, value))) except TimeoutException: raise NoSuchElementException("Element not found") def login(self): self._driver.get(self.url) time.sleep(3) # Open login modal element = self.find_elem(By.ID, "auth_ajax_modal_trigger") element.click() login_control = self.find_elem(By.ID, "login_control") password_control = self.find_elem(By.ID, "password_control") login_control.click() login_control.clear() login_control.send_keys("solo@jde.ru") password_control.click() password_control.clear() password_control.send_keys("Tutu@2024") time.sleep(5) password_control.send_keys(Keys.RETURN) def check_ltl(self, text): return len(re.findall(r"[Ll][Tt][Ll]", text)) != 0 @parser_alive def search(self): time.sleep(5) self._driver.get(self.url + f'/?f_keyword={self.keyword}') time.sleep(10) table = self.find_elem(By.CSS_SELECTOR, ".search-results > tbody") links = table.find_elements(By.CSS_SELECTOR, "tr > td:nth-child(1) > a") links = [link.get_attribute("href") for link in links if self.check_ltl(link.find_element(By.CSS_SELECTOR, 'div').text)] links = [link for link in links if f"https://www.b2b-center.ru/market/view.html?id=" + parse_qs(urlparse(link).query).get('id')[ 0] not in self.storage.get_links()] for link in links: if PARSER_ALIVE is False: raise KeyboardInterrupt("Бот остановлен по запросу") logger.info("Обработка заявки: " + link) try: self.accept_documentation(link) except Exception as exc: logger.error("Не удалось обработать заявку. Подробности: " + str(exc)) logger.info("Все LTL заявки обработаны, обновление через 60сек") @parser_alive def parse(self, url: str = None) -> dict: fp = self.download_documentation() e_parser = ExcelParser(fp, url) price = e_parser.calculate() if not price: logger.error("Не удалось расcчитать цену, переходим далее") return price @parser_alive def accept_documentation(self, url: str): time.sleep(3) self._driver.get(url) # Скачать документацию try: download_documentation_button = self.find_elem(By.CSS_SELECTOR, '#auction_info_td > table > tbody > tr:nth-child(4) table input[type=submit]') time.sleep(5) ActionChains(self._driver).scroll_to_element(download_documentation_button).scroll_by_amount(0, 100).perform() download_documentation_button.click() price = self.parse(url) self.send_offer_link(price['price'], nds=price['vat'], delivery_time=price['transport_delivery_date'], delivery_range=price['max_days']) except NoSuchElementException: # logger.info("Отсутствует кнопка скачивания документации, переходим к документации") price = self.parse(url) self.send_offer_link(price['price'], nds=price['vat'], delivery_time=price['transport_delivery_date'], delivery_range=price['max_days']) @parser_alive def download_documentation(self) -> list[pathlib.Path]: try: all_files_1 = set( pathlib.Path('./downloads') / pathlib.Path(file) for tree in os.walk('./downloads') for file in tree[2]) time.sleep(5) documentation_block = self.find_elem(By.CSS_SELECTOR, '#download_documentation') docs = documentation_block.find_elements(By.CSS_SELECTOR, 'a') for doc in docs: href = doc.get_attribute('href') if not href.endswith('.xlsx') and not href.endswith('.xls'): continue self._driver.get(href) time.sleep(3) all_files_2 = set( pathlib.Path('./downloads') / pathlib.Path(file) for tree in os.walk('./downloads') for file in tree[2]) fp = all_files_2 - all_files_1 logger.debug(fp) return [file for file in fp] except Exception as exc: logger.info(f"Не удалось скачать документацию. Подробности: {type(exc)} {str(exc)}") raise KeyboardInterrupt() @parser_alive def send_offer_link(self, price: int, nds: int, delivery_range: str, delivery_time: str): try: logger.info( f"Предварительные данные по заявке: Цена: {price}, НДС: {nds}%, Доставка: {delivery_range} дн., Подача машины {delivery_time}") offer_link = self.find_elem(By.ID, "send_offer_link") ActionChains(self._driver).scroll_to_element(offer_link).scroll_by_amount(0, 100).perform() offer_link.click() price_id = self.find_elem(By.ID, 'price_id') price_id.send_keys(str(price)) nds_elem = self.find_elem(By.CSS_SELECTOR, 'input[name="AUCTION_OFFER[price_vat]"]') if nds == 0: self.find_elem(By.ID, 'price_lot_no_tax').click() else: nds_elem.send_keys(str(nds)) row_11 = self.find_elem(By.CSS_SELECTOR, "tr#row_id_11 > td:nth-child(2) > textarea") row_12 = self.find_elem(By.CSS_SELECTOR, "tr#row_id_12 > td:nth-child(2) > textarea") row_13 = self.find_elem(By.CSS_SELECTOR, "tr#row_id_13 > td:nth-child(2) > textarea") row_15 = self.find_elem(By.CSS_SELECTOR, "tr#row_id_15 > td:nth-child(2) > input[type=submit]") row_11.send_keys(str(delivery_range) + " дней") row_12.send_keys(delivery_time) row_13.send_keys( "Дата подачи транспорта предварительная, согласовывается по звонку оператора ЖДЭ клиенту-отправителю") time.sleep(10) ActionChains(self._driver).scroll_to_element(row_15).scroll_by_amount(0, 100).perform() row_15.click() time.sleep(10) if IS_PROD: self.apply_offer() logger.success("Заявка успешно отправлена") time.sleep(10) except Exception as exc: logger.error(f"Не удалось отправить заявку. Ошибка: {type(exc)} : {str(exc)}") @parser_alive def apply_offer(self): btn = self.find_elem(By.CSS_SELECTOR, "form[name=AUCTION_OFFER] table.form-control_table button") ActionChains(self._driver).scroll_to_element(btn).scroll_by_amount(0, 100).perform() btn.click() pin = self.find_elem(By.CSS_SELECTOR, ".prt-ecp_sign input") pin.send_keys("5732") sign_btn = self.find_elem(By.CSS_SELECTOR, ".prt-ecp_sign button.btn-primary") time.sleep(1) sign_btn.click() time.sleep(10) PARSER_ALIVE = True def parse_runner(): with Parser() as parser: parser.login() while True: if PARSER_ALIVE: parser.search() time.sleep(60) else: time.sleep(1) parser_thread = threading.Thread(target=parse_runner, daemon=True) @dp.message(CommandStart()) async def start_handler(message: Message): s = Storage() if message.from_user.id not in s.get_users(): await message.answer("Вы не зарегистрированы, обратитесь к администратору") return markup = ReplyKeyboardMarkup(keyboard=[[KeyboardButton(text="Запустить Бот")]]) await message.answer(f"Hello, {message.from_user.full_name}!", reply_markup=markup) @dp.message() async def message_handler(message: Message): global PARSER_ALIVE s = Storage() if str(message.from_user.id) not in s.get_users(): await message.answer("Вы не зарегистрированы, обратитесь к администратору") return markup = ReplyKeyboardMarkup(keyboard=[[KeyboardButton(text="Запустить Бот")]]) if message.text == "Запустить Бот": PARSER_ALIVE = True for chat_id in s.get_users(): await bot.send_message(chat_id, f"Пользователь {message.from_user.full_name} запускает бот", reply_markup=markup) parser_thread.start() return if message.text == "Остановить Бот": for chat_id in s.get_users(): await bot.send_message(chat_id, f"Пользователь {message.from_user.full_name} остановил бот", reply_markup=markup) PARSER_ALIVE = False await message.answer("Бот остановлен", reply_markup=markup) return await message.answer("Неизвестная команда") async def main(): storage = Storage() markup = ReplyKeyboardMarkup(keyboard=[[KeyboardButton(text="Запустить Бот")]]) for chat_id in storage.get_users(): await bot.send_message(chat_id, "Контроллер запущен, бот ожидает включения", reply_markup=markup, disable_notification=True) await dp.start_polling(bot) if __name__ == "__main__": asyncio.run(main())