From 004eeb4b2b93db5e159b7d71e028cb00940700fc Mon Sep 17 00:00:00 2001 From: Ivan Vazhenin Date: Wed, 22 Nov 2023 20:32:08 +0300 Subject: [PATCH] Add logging to replication --- main.py | 28 +++++++++++++++------------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/main.py b/main.py index 148158a..080f9f5 100644 --- a/main.py +++ b/main.py @@ -81,15 +81,16 @@ def run_tasks(): def replication_task(): while True: conn = db.connect_db() + logging.warning(connected) with Session(conn) as session: - for bndname in connected: - for item in session.query(db.Queue).join(db.Queue.user).filter_by(bndname=bndname).all(): - if item.user.is_active_now(): - if item.user.newbnd: - replication(bndname, item.commit_id, item.schema) - else: - replication_old(bndname, item.commit_id, item.schema) - session.delete(item) + for item in session.query(db.Queue).join(db.Queue.user).all(): + bndname = item.user.bndname + if item.user.is_active_now() and bndname != Config.self_bnd: + if item.user.newbnd: + replication(bndname, item.commit_id, item.schema) + else: + replication_old(bndname, item.commit_id, item.schema) + session.delete(item) session.commit() time.sleep(60) @@ -118,7 +119,7 @@ def crc32(filename, chunk_size=65536): return "%08X" % (checksum & 0xFFFFFFFF) -def send_object_replication(ws, ids): +def send_object_replication(bndname, ws, ids): qu = GroupQuery(Envelope()) query = GroupQuery(Envelope.world()) query.setUids(ids) @@ -152,12 +153,12 @@ def send_object_replication(ws, ids): 'Name': c1000[0]['fileName'], 'Type': 'OOD', 'metadata_version': '1', - 'source': 'bnd127', + 'source': Config.self_bnd, 'system_date': str(created_date.timestamp()), }) xml_version = ET.SubElement(xml_objects, 'version', { 'object_id': chart['uid'].replace('-', ''), - 'source': 'bnd127', + 'source': Config.self_bnd, 'system_date': str(created_date.timestamp()), 'version': '1.0', 'version_id': chart['uid'].replace('-', ''), @@ -206,7 +207,7 @@ def send_object_replication(ws, ids): }) params = { 'from': f'tcp://{Config.self_bnd}', - 'to': f'tcp://{Config.remote_bnd}', + 'to': f'tcp://{bndname}', 'ts_added': date.timestamp(), 'user_id': '0', 'query_type': NEW_REPLICATION_REQUEST, @@ -235,12 +236,13 @@ def replication_old(bnd_name: str, commit_id: str, schema: str): logger.warning(res) created, updated, _ = ws.changes(commit_id) ids = list(set(created) | set(updated)) - send_object_replication(ws, ids) + send_object_replication(bnd_name, ws, ids) ws.clearData(True) logger.warning('Replication to old bnd is sent') def replication(bnd_name: str, commit_id: str, schema: str): + logging.warning(f'{bnd_name} {commit_id} {schema}') date = datetime.datetime.now() rxmls = RequestXmlService() res_id = uuid4().hex