b2bcenter-parser/main.py

210 lines
8.7 KiB
Python
Raw Normal View History

2024-01-26 16:21:44 +03:00
import pathlib
2024-02-06 10:47:22 +03:00
import os
2024-01-26 16:21:44 +03:00
import time
import re
from typing import Self
2024-02-06 10:47:22 +03:00
from selenium.webdriver import Keys, ActionChains
2024-01-26 16:21:44 +03:00
from selenium.webdriver.common.by import By
2024-02-06 10:47:22 +03:00
from webdriver_manager.chrome import ChromeDriverManager
2024-01-26 16:21:44 +03:00
from selenium import webdriver
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import NoSuchElementException, TimeoutException
2024-02-06 10:47:22 +03:00
from excel_parser import ExcelParser
from storage import Storage
2024-01-26 16:21:44 +03:00
2024-02-15 10:56:32 +03:00
from telegram_logs import logger
2024-01-26 16:21:44 +03:00
class Parser:
keyword = "Велесстрой"
url = "https://www.b2b-center.ru/market"
_driver: webdriver.Chrome
_options: webdriver.ChromeOptions = webdriver.ChromeOptions()
_service: webdriver.ChromeService
def __init__(self):
2024-02-15 10:56:32 +03:00
logger.info("Бот запущен")
2024-01-26 18:52:05 +03:00
prefs = {"download.default_directory": str(pathlib.Path('./downloads').absolute())}
2024-02-15 10:56:32 +03:00
2024-01-26 16:21:44 +03:00
self._options.add_experimental_option("prefs", prefs)
2024-02-06 10:47:22 +03:00
# self._options.add_argument("--disable-extensions")
# self._options.add_argument("--disable-gpu")
# self._options.add_argument("--headless=new")
2024-02-15 11:14:58 +03:00
self._options.add_argument("window-size=1920,1080")
2024-01-26 16:21:44 +03:00
self._service = webdriver.ChromeService(executable_path=ChromeDriverManager().install())
2024-02-06 10:47:22 +03:00
self.storage = Storage()
self.storage.create_tables()
2024-01-26 16:21:44 +03:00
def __enter__(self) -> Self:
self._driver = webdriver.Chrome(service=self._service, options=self._options)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
2024-02-15 10:56:32 +03:00
logger.error("Бот остановлен. Причина: " + str(exc_val))
2024-01-26 16:21:44 +03:00
print("Gracefully shutting down...")
2024-02-15 10:56:32 +03:00
2024-01-26 16:21:44 +03:00
self._driver.close()
def find_elem(self, by: str, value: str):
try:
return WebDriverWait(self._driver, 10).until(EC.presence_of_element_located((by, value)))
except TimeoutException:
raise NoSuchElementException("Element not found")
def login(self):
self._driver.get(self.url)
time.sleep(3)
# Open login modal
element = self.find_elem(By.ID, "auth_ajax_modal_trigger")
element.click()
login_control = self.find_elem(By.ID, "login_control")
password_control = self.find_elem(By.ID, "password_control")
login_control.click()
login_control.clear()
login_control.send_keys("jde2015")
password_control.click()
password_control.clear()
password_control.send_keys("Bel8#Ans3")
time.sleep(5)
password_control.send_keys(Keys.RETURN)
2024-02-15 10:56:32 +03:00
def check_ltl(self, text):
return len(re.findall(r"[Ll][Tt][Ll]", text)) != 0
2024-01-26 16:21:44 +03:00
def search(self):
time.sleep(5)
self._driver.get(self.url + f'/?f_keyword={self.keyword}')
time.sleep(10)
table = self.find_elem(By.CSS_SELECTOR, ".search-results > tbody")
links = table.find_elements(By.CSS_SELECTOR, "tr > td:nth-child(1) > a")
2024-02-15 10:56:32 +03:00
links = [link.get_attribute("href") for link in links if
self.check_ltl(link.find_element(By.CSS_SELECTOR, 'div').text)]
links = [link for link in links if link not in self.storage.get_links()]
2024-01-26 16:21:44 +03:00
for link in links:
2024-02-15 10:56:32 +03:00
logger.info("Обработка заявки: " + link)
try:
self.accept_documentation(link)
except Exception as exc:
logger.error("Не удалось обработать заявку. Подробности: " + str(exc))
def parse(self, url: str = None) -> dict:
fp = self.download_documentation()
e_parser = ExcelParser(fp, url)
price = e_parser.calculate()
if not price:
logger.error("Не удалось расcчитать цену, переходим далее")
2024-01-26 16:21:44 +03:00
2024-02-15 10:56:32 +03:00
return price
2024-01-26 16:21:44 +03:00
def accept_documentation(self, url: str):
time.sleep(3)
self._driver.get(url)
# Скачать документацию
try:
2024-02-06 10:47:22 +03:00
download_documentation_button = self.find_elem(By.CSS_SELECTOR,
'#auction_info_td > table > tbody > tr:nth-child(4) table input[type=submit]')
time.sleep(5)
2024-02-15 10:56:32 +03:00
ActionChains(self._driver).scroll_to_element(download_documentation_button).scroll_by_amount(0,
100).perform()
2024-01-26 16:21:44 +03:00
download_documentation_button.click()
2024-02-06 10:47:22 +03:00
2024-02-15 10:56:32 +03:00
price = self.parse(url)
self.send_offer_link(price['price'], nds=price['vat'],
delivery_time=price['transport_delivery_date'],
delivery_range=price['max_days'])
2024-01-26 16:21:44 +03:00
except NoSuchElementException:
2024-02-15 10:56:32 +03:00
# logger.info("Отсутствует кнопка скачивания документации, переходим к документации")
2024-02-06 10:47:22 +03:00
2024-02-15 10:56:32 +03:00
price = self.parse(url)
self.send_offer_link(price['price'], nds=price['vat'],
delivery_time=price['transport_delivery_date'],
delivery_range=price['max_days'])
2024-02-06 10:47:22 +03:00
def download_documentation(self) -> list[pathlib.Path]:
2024-02-15 10:56:32 +03:00
try:
all_files_1 = set(
pathlib.Path('./downloads') / pathlib.Path(file) for tree in os.walk('./downloads') for file in tree[2])
2024-02-06 10:47:22 +03:00
2024-02-15 10:56:32 +03:00
time.sleep(5)
documentation_block = self.find_elem(By.CSS_SELECTOR, '#download_documentation')
docs = documentation_block.find_elements(By.CSS_SELECTOR, 'a')
for doc in docs:
href = doc.get_attribute('href')
if not href.endswith('.xlsx') and not href.endswith('.xls'):
continue
self._driver.get(href)
time.sleep(3)
all_files_2 = set(
pathlib.Path('./downloads') / pathlib.Path(file) for tree in os.walk('./downloads') for file in tree[2])
fp = all_files_2 - all_files_1
logger.debug(fp)
return [file for file in fp]
except Exception as exc:
logger.info(f"Не удалось скачать документацию. Подробности: {type(exc)} {str(exc)}")
raise KeyboardInterrupt()
def send_offer_link(self, price: int, nds: int, delivery_range: str, delivery_time: str):
try:
logger.info(
f"Предварительные данные по заявке: Цена: {price}, НДС: {nds}%, Доставка: {delivery_range} дн., Подача машины {delivery_time}")
offer_link = self.find_elem(By.ID, "send_offer_link")
ActionChains(self._driver).scroll_to_element(offer_link).scroll_by_amount(0, 100).perform()
offer_link.click()
price_id = self.find_elem(By.ID, 'price_id')
price_id.send_keys(str(price))
nds_elem = self.find_elem(By.CSS_SELECTOR, 'input[name="AUCTION_OFFER[price_vat]"]')
if nds == 0:
self.find_elem(By.ID, 'price_lot_no_tax').click()
else:
nds_elem.send_keys(str(nds))
row_11 = self.find_elem(By.CSS_SELECTOR, "tr#row_id_11 > td:nth-child(2) > textarea")
row_12 = self.find_elem(By.CSS_SELECTOR, "tr#row_id_12 > td:nth-child(2) > textarea")
row_13 = self.find_elem(By.CSS_SELECTOR, "tr#row_id_13 > td:nth-child(2) > textarea")
row_15 = self.find_elem(By.CSS_SELECTOR, "tr#row_id_15 > td:nth-child(2) > input[type=submit]")
row_11.send_keys(str(delivery_range) + " дней")
row_12.send_keys(delivery_time)
row_13.send_keys(
"Дата подачи транспорта предварительная, согласовывается по звонку оператора ЖДЭ клиенту-отправителю")
time.sleep(10)
ActionChains(self._driver).scroll_to_element(row_15).scroll_by_amount(0, 100).perform()
row_15.click()
time.sleep(10)
self.apply_offer()
logger.success("Заявка успешно отправлена")
except Exception as exc:
logger.error(f"Не удалось отправить заявку. Ошибка: {type(exc)} : {str(exc)}")
def apply_offer(self):
btn = self.find_elem(By.CSS_SELECTOR, "form[name=AUCTION_OFFER] table.form-control_table button")
btn.click()
2024-02-06 10:47:22 +03:00
2024-01-26 16:21:44 +03:00
def main() -> None:
    """Log in once, then run the search in an endless loop with a 60 s pause."""
    with Parser() as bot:
        bot.login()
        while True:
            bot.search()
            logger.info("Все LTL заявки обработаны, обновление через 60сек")
            time.sleep(60)


if __name__ == "__main__":
    main()