From dada530829068d34f9751907a5de19d4ea3c67e9 Mon Sep 17 00:00:00 2001 From: Ivan Vazhenin Date: Mon, 4 Sep 2023 18:53:19 +0300 Subject: [PATCH] Fix import changes --- main.py | 38 ++++++++++++++++++++++++++++++++++++-- 1 file changed, 36 insertions(+), 2 deletions(-) diff --git a/main.py b/main.py index d0becff..b16c9a4 100644 --- a/main.py +++ b/main.py @@ -1,4 +1,5 @@ import asyncio +from tempfile import TemporaryDirectory import pika import sys import threading @@ -33,6 +34,9 @@ from zip import Zip import boto3 +NEW_REPLICATION_REQUEST = 99 + + tasks = Queue() connected = False @@ -100,13 +104,13 @@ def pika_callback(ch, method, properties, body): schema = commit_info.get('schema') or Config.oodb_schema res = rxmls.get_request_document(res_id, None) rxmls.set_result(res, 0, '') - ET.SubElement(res, 'replication', {'id': commit_info['commit'], 'schema': schema}) + ET.SubElement(res, 'replication', {'id': commit_info['commit'], 'scheme': schema}) response_params = { 'from': f'tcp://{Config.self_bnd}', 'to': f'tcp://{Config.remote_bnd}', 'ts_added': date.timestamp(), 'user_id': '0', - 'query_type': 99, + 'query_type': NEW_REPLICATION_REQUEST, 'query_data': ET.tostring(res, encoding='unicode', xml_declaration=True), } con = OOConnectionParams(schema, Config.oodb_host, Config.oodb_port, Config.oodb_dbname, @@ -347,6 +351,34 @@ def get_metadata(params, files, url): proxy.send(response_params, response_files, Config.ret_path) +def apply_commits(params, files, url): + logger.warning(params, files, url) + assert len(files) == 1 + file = files[0] + dir = TemporaryDirectory[str] + with zipfile.ZipFile(file, 'r') as zip_ref: + zip_ref.extractall(dir.name) + req = ET.fromstring(params['query_data']) + repl = req.find('replication') + scheme = repl.get('scheme') + commit = repl.get('id') + logger.warning(scheme, commit) + os.path.join(dir.name, 'export.o5c') + con = OOConnectionParams(schema, Config.oodb_host, Config.oodb_port, Config.oodb_dbname, + Config.oodb_username, Config.oodb_passwd, schema) + ws = OODBWorkspace.ws(schema) + if not ws.isInit(): + res = ws.init(con) + logger.warning(res) + oe = IpdExporter(ws) + oe.improtFromOsm(os.path.join(dir.name, 'export.o5c')) + oe.improtFromMbtiles(os.path.join(dir.name, 'export.mbtiles')) + with open('export_files.json', 'r') as f: + files_data = json.load(f) + for file_data in files_data: + upload_file(file_data['filename'], file_data['key'], file_data['bucket']) + + def run_task(query_type, params, files, url): if query_type == 4: tasks.put(lambda: load_catalog(params, files, url)) @@ -354,6 +386,8 @@ def run_task(query_type, params, files, url): tasks.put(lambda: get_objects(params, files, url)) if query_type == 24: tasks.put(lambda: get_metadata(params, files, url)) + if query_type == NEW_REPLICATION_REQUEST: + tasks.put(lambda: get_metadata(params, files, url)) def accept(params, files, url):