From deadd10588231fb010c5aeffe3b6512fdcd25607 Mon Sep 17 00:00:00 2001 From: Ivan Vazhenin Date: Thu, 27 Jul 2023 18:27:26 +0300 Subject: [PATCH] Add s3 files processing --- config.py | 9 ++++- main.py | 102 +++++++++++++++++++++++++++++++++++++++++++---- requirements.txt | 12 +++++- 3 files changed, 112 insertions(+), 11 deletions(-) diff --git a/config.py b/config.py index d0ad7dc..b621088 100644 --- a/config.py +++ b/config.py @@ -16,7 +16,12 @@ class Config: oodb_dbname: str = 'db' oodb_username: str = 'postgres' oodb_passwd: str = 'Root12345678' - oodb_schema: str = 'ood' + oodb_schema: str = 'documents_src' rabbit_conn: str = 'amqp://user:password@10.10.8.83:31005/%2f' - rabbit_queue: str = 'ipd' \ No newline at end of file + rabbit_queue: str = 'ipd' + + s3_endpoint: str = 'http://10.10.8.83:31006' + s3_key_id: str = 's57' + s3_access_key: str = 'd9MMinLF3U8TLSj' + s3_bucket: str = 'files' diff --git a/main.py b/main.py index 59bc796..2ab797e 100644 --- a/main.py +++ b/main.py @@ -30,6 +30,7 @@ from libipdutilities import * from liboodriver import * from config import Config from zip import Zip +import boto3 tasks = Queue() @@ -38,6 +39,26 @@ connected = False logger = logging.getLogger('xmlrpcserver') +def s3_connection(): + return boto3.client('s3', endpoint_url=Config.s3_endpoint, + aws_access_key_id=Config.s3_key_id, + aws_secret_access_key=Config.s3_access_key) + + +def download_file(key: str, filename: str): + client = s3_connection() + obj = client.get_object(Bucket=Config.s3_bucket, Key=key) + with open(filename, 'wb') as f: + for chunk in obj['Body'].iter_chunks(chunk_size=4096): + f.write(chunk) + + +def upload_file(filename: str): + client = s3_connection() + with open(filename, 'rb') as f: + client.put_object(Body=f.read(), Bucket=Config.s3_bucket, Key=os.path.basename(filename)) + + def run_tasks(): logger.debug('Task thread started.') while True: @@ -85,15 +106,53 @@ def pika_callback(ch, method, properties, body): 'query_type': 99, 
'query_data': ET.tostring(res, encoding='unicode', xml_declaration=True), } - con = OOConnectionParams(Config.oodb_schema, Config.oodb_host, Config.oodb_port, Config.oodb_dbname, - Config.oodb_username, Config.oodb_passwd, Config.oodb_schema) - ws = OODBWorkspace.ws(Config.oodb_schema) + commit_info = json.loads(body) + schema = commit_info.get('schema') or Config.oodb_schema + con = OOConnectionParams(schema, Config.oodb_host, Config.oodb_port, Config.oodb_dbname, + Config.oodb_username, Config.oodb_passwd, schema) + ws = OODBWorkspace.ws(schema) ws.init(con) - oe = OsmExporter(ws) - # commit = '1de21737-09dc-4a45-b7d9-7b11044ee487' + oe = IpdExporter(ws) + # commit = {"schema": "kartap", "commit": "55a01cf5-c27c-40be-b771-dc0b16c1878b"} + # commit = {"schema": "kartap", "commit": "76405109-db79-4225-b885-fdb00cfc53a6"} z = Zip() - nc = oe.nextCommit(body) - oe.exportChanges2osm(os.path.join(z.dirname, 'export.o5c'), body, nc) + + nc = oe.nextCommit(commit_info['commit']) + oe.exportChanges2osm(os.path.join(z.dirname, 'export.o5c'), commit_info['commit'], nc) + oe.exportChanges2mbtiles(os.path.join(z.dirname, 'export.mbtiles'), commit_info['commit'], nc) + + created, updated, deleted = ws.changes(commit_info['commit']) + qu = GroupQuery(Envelope()) + qu.setUids(updated) + qu.setLoadArch(True) + uids = [] + ws.clearData(True) + ws.load(qu, uids) + updated_files = [] + for feature_uid in uids: + not_files = True + vers = ws.featureVersion(feature_uid) + if len(vers) > 1: + vers.sort(key=vers_key) + not_files = vers[0].feature().isEqual(vers[1].feature(), ["c1000"]) + if not not_files: + attr = vers[0].feature().attribute('c1000') + if attr: + updated_files.append(feature_uid) + ws.clearData(True) + qu = GroupQuery(Envelope()) + qu.setUids(created) + ws.load(qu, uids) + for feature_uid in uids: + feature = ws.featureByUid(feature_uid) + if not feature: + continue + attr = feature.attribute('c1000') + if attr: + updated_files.append(feature_uid) + for x in 
updated_files: + download_file(x, os.path.join(z.dirname, x)) + ws.clearData(True) ws.close() filepath = z.pack() response_files = [{'name': os.path.basename(filepath), 'url': filepath, 'size': os.path.getsize(filepath)}] @@ -325,6 +384,10 @@ def main(): logger.warning('Exiting') +def vers_key(e): + return e.version() + + def test(): #params = {"from": "tcp://kptsp_vb", "query_data": "
", "query_type": 24, "to": "tcp://bnd127", "user_id": "3302", "ts_added": 1679825320.653038} #files = [] @@ -332,7 +395,30 @@ def test(): # accept(params, files, url) #get_metadata(params, files, url) #get_catalog() - auth_response('123', 'bnd127', False) + # auth_response('123', 'bnd127', False) + con = OOConnectionParams(Config.oodb_schema, Config.oodb_host, Config.oodb_port, Config.oodb_dbname, + Config.oodb_username, Config.oodb_passwd, Config.oodb_schema) + ws = OODBWorkspace.ws(Config.oodb_schema) + ws.init(con) + created, updated, deleted = ws.changes('2dad8c8a-d2db-4074-ab7a-c01c36ada2be') + qu = GroupQuery(Envelope()) + qu.setUids(updated) + qu.setLoadArch(True) + uids = [] + ws.clearData(True) + ws.load(qu, uids) + updated_files = [] + for feature_uid in uids: + not_files = True + vers = ws.featureVersion(feature_uid) + if len(vers) > 1: + vers.sort(key=vers_key) + not_files = vers[0].feature().isEqual(vers[1].feature(), ["c1000"]) + if not not_files: + updated_files.append(feature_uid) + print(updated_files) + ws.clearData(True) + ws.close() pass diff --git a/requirements.txt b/requirements.txt index eaab079..f06bbe0 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,14 +3,24 @@ aiosignal==1.3.1 async-timeout==4.0.2 attrs==23.1.0 backoff==2.2.1 +boto3==1.28.11 +botocore==1.31.11 certifi==2023.5.7 charset-normalizer==3.2.0 frozenlist==1.3.3 gql==3.4.1 graphql-core==3.2.3 idna==3.4 +jmespath==1.0.1 multidict==6.0.4 +pika==1.3.2 +pika-stubs==0.1.3 +psycopg==3.1.9 pygost==5.12 +python-dateutil==2.8.2 requests==2.31.0 -urllib3==2.0.3 +s3transfer==0.6.1 +six==1.16.0 +typing_extensions==4.7.1 +urllib3==1.26.16 yarl==1.9.2