89 lines
2.7 KiB
Python
89 lines
2.7 KiB
Python
import sqlite3
from collections.abc import Iterator
from contextlib import contextmanager
from typing import ContextManager
|
|
|
|
|
|
class Storage:
|
|
def __init__(self):
|
|
self.con = None
|
|
|
|
def set_connection(self):
|
|
self.con = sqlite3.connect("database.db")
|
|
|
|
@contextmanager
|
|
def get_cursor(self) -> ContextManager[sqlite3.Cursor]:
|
|
if self.con is None:
|
|
self.set_connection()
|
|
cur = self.con.cursor()
|
|
try:
|
|
yield cur
|
|
finally:
|
|
cur.close()
|
|
|
|
def create_tables(self):
|
|
with self.get_cursor() as cur:
|
|
cur.execute("""
|
|
CREATE TABLE IF NOT EXISTS ltl (
|
|
id INTEGER,
|
|
doc_filepath TEXT,
|
|
link TEXT,
|
|
total_cost INTEGER,
|
|
query TEXT,
|
|
answer TEXT,
|
|
PRIMARY KEY (id, doc_filepath)
|
|
);
|
|
|
|
""")
|
|
cur.execute("""
|
|
create trigger if not exists add_link_trig
|
|
after insert
|
|
on ltl
|
|
begin
|
|
update ltl set link = 'https://www.b2b-center.ru/market/view.html?id=' || new.id where id=new.id;
|
|
end;
|
|
""")
|
|
|
|
cur.execute("""
|
|
create trigger if not exists query_answer_trig
|
|
after insert
|
|
on ltl
|
|
begin
|
|
update ltl set total_cost = new.total_cost,
|
|
query = new.query,
|
|
answer = new.answer
|
|
where link = new.link;
|
|
end;
|
|
""")
|
|
self.con.commit()
|
|
|
|
def add_link(self, id: int, doc_filepath: str, total_cost: int | None, query: str | None, answer: str | None):
|
|
with self.get_cursor() as cur:
|
|
cur.execute("INSERT INTO ltl (id, doc_filepath, total_cost, query, answer) VALUES (?, ?, ?, ?, ?)",
|
|
(id, doc_filepath, total_cost, query, answer))
|
|
self.con.commit()
|
|
|
|
def get_links(self):
|
|
with self.get_cursor() as cur:
|
|
res = cur.execute("SELECT DISTINCT link FROM ltl")
|
|
return [row[0] for row in res.fetchall()]
|
|
|
|
def is_link_exists(self, link: str) -> bool:
|
|
with self.get_cursor() as cur:
|
|
res = cur.execute("SELECT * FROM ltl WHERE link = ?", (link,))
|
|
return res.fetchone()
|
|
|
|
def is_doc_exists(self, doc_filepath: str) -> bool:
|
|
with self.get_cursor() as cur:
|
|
res = cur.execute("SELECT * FROM ltl WHERE doc_filepath = ?", (doc_filepath,))
|
|
return res.fetchone()
|
|
|
|
def get_users(self):
|
|
with self.get_cursor() as cur:
|
|
res = cur.execute("SELECT DISTINCT tg_user_id FROM users")
|
|
return [row[0] for row in res.fetchall()]
|
|
|
|
|
|
if __name__ == "__main__":
    # Ad-hoc check when run as a script: open the default database and print
    # the distinct tg_user_id values found in its `users` table.
    # NOTE(review): nothing in this file creates `users`; presumably it is
    # created elsewhere -- confirm before running against a fresh database.
    print(Storage().get_users())
|