b2bcenter-parser/main.py

325 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import functools
import pathlib
import os
import time
import re
from typing import Self
from urllib.parse import urlparse, parse_qs
from selenium.webdriver import Keys, ActionChains
from selenium.webdriver.common.by import By
from webdriver_manager.chrome import ChromeDriverManager
from selenium import webdriver
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import NoSuchElementException, TimeoutException
from excel_parser import ExcelParser
from storage import Storage
from telegram_logs import logger
import dotenv
import asyncio
import threading
from webserver import dp
from aiogram.filters import CommandStart
from aiogram.types import Message, ReplyKeyboardMarkup, KeyboardButton
from webserver import bot
dotenv.load_dotenv('.env')
IS_PROD = os.environ.get('PROD_ENV') == '1'
class Parser:
keyword = "Велесстрой"
url = "https://www.b2b-center.ru/market"
_driver: webdriver.Chrome
_options: webdriver.ChromeOptions = webdriver.ChromeOptions()
_service: webdriver.ChromeService
@staticmethod
def parser_alive(func):
@functools.wraps(func)
def wrapper(self, *args, **kwargs):
if not PARSER_ALIVE:
raise KeyboardInterrupt("Бот остановлен по запросу")
return func(self, *args, **kwargs)
return wrapper
def __init__(self):
logger.info("Бот запущен")
prefs = {"download.default_directory": str(pathlib.Path('./downloads').absolute())}
self._options.add_experimental_option("prefs", prefs)
if IS_PROD:
self._options.add_argument("--disable-extensions")
self._options.add_argument("--disable-gpu")
self._options.add_argument("window-size=1920,1080")
self._options.add_argument("--headless=new")
self._service = webdriver.ChromeService(executable_path=ChromeDriverManager().install())
self.storage = Storage()
self.storage.create_tables()
def __enter__(self) -> Self:
self._driver = webdriver.Chrome(service=self._service, options=self._options)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if exc_val is not None:
logger.error("Бот остановлен. Причина: " + str(exc_val))
print("Gracefully shutting down...")
self._driver.close()
def find_elem(self, by: str, value: str):
try:
return WebDriverWait(self._driver, 10).until(EC.presence_of_element_located((by, value)))
except TimeoutException:
raise NoSuchElementException("Element not found")
def login(self):
self._driver.get(self.url)
time.sleep(3)
# Open login modal
element = self.find_elem(By.ID, "auth_ajax_modal_trigger")
element.click()
login_control = self.find_elem(By.ID, "login_control")
password_control = self.find_elem(By.ID, "password_control")
login_control.click()
login_control.clear()
login_control.send_keys("solo@jde.ru")
password_control.click()
password_control.clear()
password_control.send_keys("Tutu@2024")
time.sleep(5)
password_control.send_keys(Keys.RETURN)
def check_ltl(self, text):
return len(re.findall(r"[Ll][Tt][Ll]", text)) != 0
@parser_alive
def search(self):
time.sleep(5)
self._driver.get(self.url + f'/?f_keyword={self.keyword}')
time.sleep(10)
table = self.find_elem(By.CSS_SELECTOR, ".search-results > tbody")
links = table.find_elements(By.CSS_SELECTOR, "tr > td:nth-child(1) > a")
links = [link.get_attribute("href") for link in links if
self.check_ltl(link.find_element(By.CSS_SELECTOR, 'div').text)]
links = [link for link in links if
f"https://www.b2b-center.ru/market/view.html?id=" + parse_qs(urlparse(link).query).get('id')[
0] not in self.storage.get_links()]
for link in links:
if PARSER_ALIVE is False:
raise KeyboardInterrupt("Бот остановлен по запросу")
logger.info("Обработка заявки: " + link)
try:
self.accept_documentation(link)
except Exception as exc:
logger.error("Не удалось обработать заявку. Подробности: " + str(exc))
logger.info("Все LTL заявки обработаны, обновление через 60сек")
@parser_alive
def parse(self, url: str = None) -> dict:
fp = self.download_documentation()
e_parser = ExcelParser(fp, url)
price = e_parser.calculate()
if not price:
logger.error("Не удалось расcчитать цену, переходим далее")
return price
@parser_alive
def accept_documentation(self, url: str):
time.sleep(3)
self._driver.get(url)
# Скачать документацию
try:
download_documentation_button = self.find_elem(By.CSS_SELECTOR,
'#auction_info_td > table > tbody > tr:nth-child(4) table input[type=submit]')
time.sleep(5)
ActionChains(self._driver).scroll_to_element(download_documentation_button).scroll_by_amount(0,
100).perform()
download_documentation_button.click()
price = self.parse(url)
self.send_offer_link(price['price'], nds=price['vat'],
delivery_time=price['transport_delivery_date'],
delivery_range=price['max_days'])
except NoSuchElementException:
# logger.info("Отсутствует кнопка скачивания документации, переходим к документации")
price = self.parse(url)
self.send_offer_link(price['price'], nds=price['vat'],
delivery_time=price['transport_delivery_date'],
delivery_range=price['max_days'])
@parser_alive
def download_documentation(self) -> list[pathlib.Path]:
try:
all_files_1 = set(
pathlib.Path('./downloads') / pathlib.Path(file) for tree in os.walk('./downloads') for file in tree[2])
time.sleep(5)
documentation_block = self.find_elem(By.CSS_SELECTOR, '#download_documentation')
docs = documentation_block.find_elements(By.CSS_SELECTOR, 'a')
for doc in docs:
href = doc.get_attribute('href')
if not href.endswith('.xlsx') and not href.endswith('.xls'):
continue
self._driver.get(href)
time.sleep(3)
all_files_2 = set(
pathlib.Path('./downloads') / pathlib.Path(file) for tree in os.walk('./downloads') for file in tree[2])
fp = all_files_2 - all_files_1
logger.debug(fp)
return [file for file in fp]
except Exception as exc:
logger.info(f"Не удалось скачать документацию. Подробности: {type(exc)} {str(exc)}")
raise KeyboardInterrupt()
@parser_alive
def send_offer_link(self, price: int, nds: int, delivery_range: str, delivery_time: str):
try:
logger.info(
f"Предварительные данные по заявке: Цена: {price}, НДС: {nds}%, Доставка: {delivery_range} дн., Подача машины {delivery_time}")
offer_link = self.find_elem(By.ID, "send_offer_link")
ActionChains(self._driver).scroll_to_element(offer_link).scroll_by_amount(0, 100).perform()
offer_link.click()
price_id = self.find_elem(By.ID, 'price_id')
price_id.send_keys(str(price))
nds_elem = self.find_elem(By.CSS_SELECTOR, 'input[name="AUCTION_OFFER[price_vat]"]')
if nds == 0:
self.find_elem(By.ID, 'price_lot_no_tax').click()
else:
nds_elem.send_keys(str(nds))
row_11 = self.find_elem(By.CSS_SELECTOR, "tr#row_id_11 > td:nth-child(2) > textarea")
row_12 = self.find_elem(By.CSS_SELECTOR, "tr#row_id_12 > td:nth-child(2) > textarea")
row_13 = self.find_elem(By.CSS_SELECTOR, "tr#row_id_13 > td:nth-child(2) > textarea")
row_15 = self.find_elem(By.CSS_SELECTOR, "tr#row_id_15 > td:nth-child(2) > input[type=submit]")
row_11.send_keys(str(delivery_range) + " дней")
row_12.send_keys(delivery_time)
row_13.send_keys(
"Дата подачи транспорта предварительная, согласовывается по звонку оператора ЖДЭ клиенту-отправителю")
time.sleep(10)
ActionChains(self._driver).scroll_to_element(row_15).scroll_by_amount(0, 100).perform()
row_15.click()
time.sleep(10)
if IS_PROD:
self.apply_offer()
logger.success("Заявка успешно отправлена")
time.sleep(10)
except Exception as exc:
logger.error(f"Не удалось отправить заявку. Ошибка: {type(exc)} : {str(exc)}")
@parser_alive
def apply_offer(self):
btn = self.find_elem(By.CSS_SELECTOR, "form[name=AUCTION_OFFER] table.form-control_table button")
ActionChains(self._driver).scroll_to_element(btn).scroll_by_amount(0, 100).perform()
btn.click()
pin = self.find_elem(By.CSS_SELECTOR, ".prt-ecp_sign input")
pin.send_keys("5732")
sign_btn = self.find_elem(By.CSS_SELECTOR, ".prt-ecp_sign button.btn-primary")
time.sleep(1)
sign_btn.click()
time.sleep(10)
PARSER_ALIVE = True
def parse_runner():
while True:
with Parser() as parser:
parser.login()
while PARSER_ALIVE:
parser.search()
time.sleep(60)
parser_thread = None
@dp.message(CommandStart())
async def start_handler(message: Message):
s = Storage()
if message.from_user.id not in s.get_users():
await message.answer("Вы не зарегистрированы, обратитесь к администратору")
return
markup = ReplyKeyboardMarkup(keyboard=[[KeyboardButton(text="Запустить Бот")]])
await message.answer(f"Hello, {message.from_user.full_name}!", reply_markup=markup)
@dp.message()
async def message_handler(message: Message):
global PARSER_ALIVE, parser_thread
s = Storage()
if str(message.from_user.id) not in s.get_users():
await message.answer("Вы не зарегистрированы, обратитесь к администратору")
return
markup_start = ReplyKeyboardMarkup(keyboard=[[KeyboardButton(text="Запустить Бот")]])
markup_shutdown = ReplyKeyboardMarkup(keyboard=[[KeyboardButton(text="Остановить Бот")]])
if message.text == "Запустить Бот":
if PARSER_ALIVE:
await message.answer("Бот уже запущен", reply_markup=markup_shutdown)
return
PARSER_ALIVE = True
for chat_id in s.get_users():
await bot.send_message(chat_id,
f"Пользователь {message.from_user.full_name} запускает бот",
reply_markup=markup_shutdown)
if not parser_thread or not parser_thread.is_alive():
parser_thread = threading.Thread(target=parse_runner, daemon=True)
parser_thread.start()
return
if message.text == "Остановить Бот":
if not PARSER_ALIVE:
await message.answer("Бот уже остановлен", reply_markup=markup_start)
for chat_id in s.get_users():
await bot.send_message(chat_id,
f"Пользователь {message.from_user.full_name} остановил бот",
reply_markup=markup_start)
PARSER_ALIVE = False
return
await message.answer("Неизвестная команда")
async def main():
storage = Storage()
markup = ReplyKeyboardMarkup(keyboard=[[KeyboardButton(text="Запустить Бот")]])
for chat_id in storage.get_users():
await bot.send_message(chat_id,
"Контроллер запущен, бот ожидает включения",
reply_markup=markup,
disable_notification=True)
await dp.start_polling(bot)
if __name__ == "__main__":
asyncio.run(main())