From a90f60153a3c03cd69cf33d79ffe3e920a3157f1 Mon Sep 17 00:00:00 2001 From: Ivan Vazhenin Date: Mon, 31 Jul 2023 20:11:07 +0300 Subject: [PATCH] Add s3 files processing --- config.py | 2 +- main.py | 54 +++++++++++++++++++++++++++++++++++------------------- 2 files changed, 36 insertions(+), 20 deletions(-) diff --git a/config.py b/config.py index b621088..2d30ed1 100644 --- a/config.py +++ b/config.py @@ -12,7 +12,7 @@ class Config: pg_password: str = 'Root12345678' oodb_host: str = '10.10.8.83' - oodb_port: int = 32100 + oodb_port: int = 32110 oodb_dbname: str = 'db' oodb_username: str = 'postgres' oodb_passwd: str = 'Root12345678' diff --git a/main.py b/main.py index 2ab797e..d0becff 100644 --- a/main.py +++ b/main.py @@ -45,18 +45,18 @@ def s3_connection(): aws_secret_access_key=Config.s3_access_key) -def download_file(key: str, filename: str): +def download_file(key: str, bucket: str, filename: str): client = s3_connection() - obj = client.get_object(Bucket=Config.s3_bucket, Key=key) + obj = client.get_object(Bucket=bucket, Key=key) with open(f"{filename}", 'wb') as f: for chunk in obj['Body'].iter_chunks(chunk_size=4096): f.write(chunk) -def upload_file(filename: str): +def upload_file(filename: str, key: str, bucket: str): client = s3_connection() with open(filename, 'rb') as f: - client.put_object(Body=f.read(), Bucket=Config.s3_bucket, Key=os.path.basename(filename)) + client.put_object(Body=f.read(), Bucket=bucket, Key=key) def run_tasks(): @@ -96,8 +96,11 @@ def pika_callback(ch, method, properties, body): date = datetime.datetime.now() rxmls = RequestXmlService() res_id = uuid4().hex + commit_info = json.loads(body) + schema = commit_info.get('schema') or Config.oodb_schema res = rxmls.get_request_document(res_id, None) rxmls.set_result(res, 0, '') + ET.SubElement(res, 'replication', {'id': commit_info['commit'], 'schema': schema}) response_params = { 'from': f'tcp://{Config.self_bnd}', 'to': f'tcp://{Config.remote_bnd}', @@ -106,20 +109,22 @@ def pika_callback(ch, method, properties, body): 'query_type': 99, 'query_data': ET.tostring(res, encoding='unicode', xml_declaration=True), } - commit_info = json.loads(body) - schema = commit_info.get('schema') or Config.oodb_schema con = OOConnectionParams(schema, Config.oodb_host, Config.oodb_port, Config.oodb_dbname, Config.oodb_username, Config.oodb_passwd, schema) ws = OODBWorkspace.ws(schema) - ws.init(con) + if not ws.isInit(): + res = ws.init(con) + logger.warning(res) oe = IpdExporter(ws) # commit = {"schema": "kartap", "commit": "55a01cf5-c27c-40be-b771-dc0b16c1878b"} # commit = {"schema": "kartap", "commit": "76405109-db79-4225-b885-fdb00cfc53a6"} + # commit = {"schema": "gcmr", "commit": "9ec2202e-0991-4695-8b0f-33fe28ea8160"} + # commit = {"schema": "npd_data", "commit": "f42afacd-483a-4d8f-bccb-74d65d45378f"} z = Zip() - nc = oe.nextCommit(commit_info['commit']) - oe.exportChanges2osm(os.path.join(z.dirname, 'export.o5c'), commit_info['commit'], nc) - oe.exportChanges2mbtiles(os.path.join(z.dirname, 'export.mbtiles'), commit_info['commit'], nc) + pc = oe.previousCommit(commit_info['commit']) + oe.exportChanges2osm(os.path.join(z.dirname, 'export.o5c'), pc, commit_info['commit']) + oe.exportChanges2mbtiles(os.path.join(z.dirname, 'export.mbtiles'), pc, commit_info['commit']) created, updated, deleted = ws.changes(commit_info['commit']) qu = GroupQuery(Envelope()) @@ -136,30 +141,41 @@ def pika_callback(ch, method, properties, body): vers.sort(key=vers_key) not_files = vers[0].feature().isEqual(vers[1].feature(), ["c1000"]) if not not_files: - attr = vers[0].feature().attribute('c1000') - if attr: - updated_files.append(feature_uid) + for attr in vers[0].feature().attributes('c1000'): + updated_files.append(variantToFileValue(attr.val())) + # print(f'key: {file_val.key} bucket: {file_val.bucket} fileName:{file_val.fileName}') ws.clearData(True) qu = GroupQuery(Envelope()) qu.setUids(created) ws.load(qu, uids) + exported_files = [] for feature_uid in uids: feature = ws.featureByUid(feature_uid) if not feature: continue - attr = feature.attribute('c1000') - if attr: - updated_files.append(feature_uid) + for attr in feature.attributes('c1000'): + updated_files.append(variantToFileValue(attr.val())) + # updated_files.append(feature_uid) for x in updated_files: - download_file(x, os.path.join(z.dirname, x)) + exported_files.append({ + 'key': x.key, + 'bucket': x.bucket, + 'filename': x.fileName, + }) + download_file(x.key, x.bucket, os.path.join(z.dirname, x.key)) + with open(os.path.join(z.dirname, 'export_files.json'), 'w') as f: + f.write(json.dumps(exported_files)) ws.clearData(True) - ws.close() + # ws.close() filepath = z.pack() response_files = [{'name': os.path.basename(filepath), 'url': filepath, 'size': os.path.getsize(filepath)}] logger.warning(response_files) logging.debug('Send replication package') proxy = ServerProxy(Config.enserver) - proxy.send(response_params, response_files, Config.ret_path) + try: + proxy.send(response_params, response_files, Config.ret_path) + except: + logger.error('Error sending') ch.basic_ack(delivery_tag=method.delivery_tag)