diff --git a/db.py b/db.py new file mode 100644 index 0000000..dba4ca7 --- /dev/null +++ b/db.py @@ -0,0 +1,112 @@ +from typing import List, Optional +from sqlalchemy import create_engine, String, select, ForeignKey +from sqlalchemy.orm import Session, DeclarativeBase, Mapped, mapped_column, relationship + + +class Base(DeclarativeBase): + pass + + +class User(Base): + __tablename__ = 'users' + + id: Mapped[int] = mapped_column(primary_key=True) + username: Mapped[str] + passwd: Mapped[str] + bndname: Mapped[str] + newbnd: Mapped[bool] + active: Mapped[bool] + upstream: Mapped[bool] + + profiles: Mapped[List['Profile']] = relationship( + back_populates='user', cascade='all, delete-orphan' + ) + + schedule: Mapped[List['Schedule']] = relationship( + back_populates='user', cascade='all, delete-orphan' + ) + + queue: Mapped[List['Queue']] = relationship( + back_populates='user', cascade='all, delete-orphan' + ) + + def __repr__(self) -> str: + return f'User(id={self.id!r}, username={self.username!r}, password={self.passwd!r}, newbnd={self.newbnd})' + + def to_dict(self) -> dict: + return { + 'id': self.id, + 'username': self.username, + 'bndname': self.bndname, + 'newbnd': self.newbnd, + 'active': self.active, + 'upstream': self.upstream, + 'profiles': [x.to_dict() for x in self.profiles], + 'schedule': [x.to_dict() for x in self.schedule], + 'queue': [x.to_dict() for x in self.queue], + } + + +class Profile(Base): + __tablename__ = 'profiles' + + id: Mapped[int] = mapped_column(primary_key=True) + user_id: Mapped[int] = mapped_column(ForeignKey('users.id')) + scheme: Mapped[str] + json: Mapped[str] + + user: Mapped['User'] = relationship(back_populates='profiles') + + def to_dict(self) -> dict: + return { + 'id': self.id, + 'json': self.json, + } + + +class Schedule(Base): + __tablename__ = 'schedule' + + id: Mapped[int] = mapped_column(primary_key=True) + user_id: Mapped[int] = mapped_column(ForeignKey('users.id')) + day_start: Mapped[int] + hour_start: Mapped[int] + minute_start: Mapped[int] + day_end: Mapped[int] + hour_end: Mapped[int] + minute_end: Mapped[int] + + user: Mapped['User'] = relationship(back_populates='schedule') + + def to_dict(self) -> dict: + return { + 'id': self.id, + 'day_start': self.day_start, + 'hour_start': self.hour_start, + 'minute_start': self.minute_start, + 'day_end': self.day_end, + 'hour_end': self.hour_end, + 'minute_end': self.minute_end, + } + + +class Queue(Base): + __tablename__ = 'queue' + + id: Mapped[int] = mapped_column(primary_key=True) + user_id: Mapped[int] = mapped_column(ForeignKey('users.id')) + commit_id: Mapped[str] + schema: Mapped[str] + + user: Mapped['User'] = relationship(back_populates='queue') + + def to_dict(self) -> dict: + return { + 'id': self.id, + 'commit_id': self.commit_id, + 'schema': self.schema, + } + + +def connect_db(): + return create_engine("postgresql+psycopg://postgres:Root12345678@10.10.8.83:32101/db") diff --git a/deps/infra-0.1.tar.gz b/deps/infra-0.2-py3-none-any.whl similarity index 50% rename from deps/infra-0.1.tar.gz rename to deps/infra-0.2-py3-none-any.whl index ad9e31b..864ca1e 100644 Binary files a/deps/infra-0.1.tar.gz and b/deps/infra-0.2-py3-none-any.whl differ diff --git a/main.py b/main.py index be0d454..3abda0f 100644 --- a/main.py +++ b/main.py @@ -13,8 +13,6 @@ from xmlrpc.client import ServerProxy import logging import os import os.path - -import psycopg import requests from reqs.graphql import get_catalog, get_object from pygost import gost34112012256 @@ -25,13 +23,15 @@ from infra import * from config import Config from zip import Zip import boto3 +import db +from sqlalchemy.orm import Session NEW_REPLICATION_REQUEST = 99 tasks = Queue() -connected = False +connected = set() logger = logging.getLogger('xmlrpcserver') @@ -64,40 +64,26 @@ def run_tasks(): def replication_task(): - return - while not connected: - time.sleep(1) + while True: + print() + conn = db.connect_db() + with (Session(conn) as session): + for bndname in connected: + for item in session.query(db.Queue).join(db.Queue.user).filter_by(bndname=bndname).all(): + replication(bndname, item.commit_id, item.schema) + session.delete(item) + session.commit() + time.sleep(60) + + +def replication(bnd_name: str, commit_id: str, schema: str): date = datetime.datetime.now() rxmls = RequestXmlService() res_id = uuid4().hex - res = rxmls.get_request_document(res_id, None) - res.set('replication_package', '1') - rxmls.set_result(res, 0, '') - response_params = { - 'from': f'tcp://{Config.self_bnd}', - 'to': f'tcp://{Config.remote_bnd}', - 'ts_added': date.timestamp(), - 'user_id': '0', - 'query_type': 1114, - 'query_data': ET.tostring(res, encoding='unicode', xml_declaration=True) - } - filename = '51a8a2c81f774af7bba61b475b4b51b5' - filepath = '/tmp/' + filename - response_files = [{'name': filename, 'url': filepath, 'size': os.path.getsize(filepath)}] - logging.debug('Send replication package') - proxy = ServerProxy(Config.enserver) - proxy.send(response_params, response_files, Config.ret_path) - -def pika_callback(ch, method, properties, body): - date = datetime.datetime.now() - rxmls = RequestXmlService() - res_id = uuid4().hex - commit_info = json.loads(body) - schema = commit_info.get('schema') or Config.oodb_schema res = rxmls.get_request_document(res_id, None) rxmls.set_result(res, 0, '') - ET.SubElement(res, 'replication', {'id': commit_info['commit'], 'scheme': schema}) + ET.SubElement(res, 'replication', {'id': commit_id, 'scheme': schema}) response_params = { 'from': f'tcp://{Config.self_bnd}', 'to': f'tcp://{Config.remote_bnd}', @@ -119,11 +105,11 @@ def pika_callback(ch, method, properties, body): # commit = {"schema": "npd_data", "commit": "f42afacd-483a-4d8f-bccb-74d65d45378f"} z = Zip() - pc = oe.previousCommit(commit_info['commit']) - oe.exportChanges2osm(os.path.join(z.dirname, 'export.o5c'), pc, commit_info['commit']) - oe.exportChanges2mbtiles(os.path.join(z.dirname, 'export.mbtiles'), pc, commit_info['commit']) + pc = oe.previousCommit(commit_id) + oe.exportChanges2osm(os.path.join(z.dirname, 'export.o5c'), pc, commit_id) + oe.exportChanges2mbtiles(os.path.join(z.dirname, 'export.mbtiles'), pc, commit_id) - created, updated, deleted = ws.changes(commit_info['commit']) + created, updated, deleted = ws.changes(commit_id) qu = GroupQuery(Envelope()) qu.setUids(updated) qu.setLoadArch(True) @@ -173,6 +159,18 @@ def pika_callback(ch, method, properties, body): proxy.send(response_params, response_files, Config.ret_path) except: logger.error('Error sending') + + +def pika_callback(ch, method, properties, body): + commit_info = json.loads(body) + schema = commit_info.get('schema') or Config.oodb_schema + commit = commit_info['commit'] + conn = db.connect_db() + with Session(conn) as session: + for user in session.query(db.User).filter(db.User.active == True, db.User.upstream == False).all(): + item = db.Queue(user_id=user.id, commit_id=commit, schema=schema) + session.add(item) + session.commit() ch.basic_ack(delivery_tag=method.delivery_tag) @@ -190,29 +188,27 @@ def list_contents(dir_name): def aud_add(message): - global connected if not isinstance(message, list): logger.warning(message) return 'OK' for item in message: logger.warning(item) - if item.get('level', -1) == 0 and item.get('type', -1) == 4002: - connected = True return 'OK' def auth_response(challenge, server_id, is_server): # logging.debug(f'Challenge: {challenge}, Server: {server_id}, IsServer: {is_server}') - with psycopg.connect(f'host={Config.pg_host} port={Config.pg_port} dbname={Config.pg_dbname} ' - f'user={Config.pg_username} password={Config.pg_password}') as conn: - with conn.cursor() as cur: - cur.execute('SELECT passwd FROM users where username = %s', (server_id,)) - passwd = cur.fetchone() - if not passwd: - return {'error': True, 'response': 'Wrong user/bnd'} + conn = db.connect_db() + with Session(conn) as session: + try: + user = session.query(db.User).filter((db.User.username == server_id)).one() + passwd = user.passwd msg = '%s%s%s' % (challenge, server_id, passwd) response = gost34112012256.new(msg.encode('utf-8')[::-1]).digest()[::-1].hex() + session.commit() return {'error': False, 'response': response} + except Exception: + return {'error': True, 'response': 'Wrong user/bnd'} def auth_challenge(): @@ -380,7 +376,7 @@ def run_task(query_type, params, files, url): if query_type == 24: tasks.put(lambda: get_metadata(params, files, url)) if query_type == NEW_REPLICATION_REQUEST: - tasks.put(lambda: get_metadata(params, files, url)) + tasks.put(lambda: apply_commits(params, files, url)) def accept(params, files, url): @@ -398,11 +394,16 @@ def onSent(params, files, callback_url): def onDelivered(params, files, callback_url): logger.warning('onDelivered') + def bnd_connected(bnd_name: str): logger.warning(f'{bnd_name} connected') + connected.add(bnd_name) + def bnd_disconnected(bnd_name: str): logger.warning(f'{bnd_name} disconnected') + if bnd_name in connected: + connected.remove(bnd_name) def main(): diff --git a/requirements.txt b/requirements.txt index 1422ec4..9cc9ed8 100644 --- a/requirements.txt +++ b/requirements.txt @@ -15,7 +15,6 @@ jmespath==1.0.1 multidict==6.0.4 pika==1.3.2 pika-stubs==0.1.3 -psycopg==3.1.10 python-dateutil==2.8.2 requests==2.31.0 s3transfer==0.6.2 @@ -23,3 +22,4 @@ six==1.16.0 typing_extensions==4.7.1 urllib3==2.0.4 yarl==1.9.2 +SQLAlchemy~=2.0.20