From c6ccebd4fe23dab6f9453c20d17cd5ee270745d6 Mon Sep 17 00:00:00 2001 From: ashatora Date: Tue, 30 Apr 2024 21:41:29 +0300 Subject: [PATCH] Fix connections --- examples/catalog.json | 11 +++++++++++ request_itv/__main__.py | 39 ++++++++++++++++++++++----------------- 2 files changed, 33 insertions(+), 17 deletions(-) create mode 100644 examples/catalog.json diff --git a/examples/catalog.json b/examples/catalog.json new file mode 100644 index 0000000..37d99a8 --- /dev/null +++ b/examples/catalog.json @@ -0,0 +1,11 @@ +{ + "files": [], + "url": "http://10.10.8.79:7001/xmlrpc", + "params": { + "from": "tcp://kptsp_vb", + "query_data": "
", + "query_type": 4, + "to": "tcp://bnd127", + "user_id": 0 + } +} \ No newline at end of file diff --git a/request_itv/__main__.py b/request_itv/__main__.py index f384a31..fad5d54 100644 --- a/request_itv/__main__.py +++ b/request_itv/__main__.py @@ -1,3 +1,4 @@ +from json import JSONDecodeError from tempfile import TemporaryDirectory, gettempdir from typing import Optional, Any @@ -69,31 +70,35 @@ def restore_uuid(oid): def pika_callback(ch, method, properties, body): - data = json.loads(body) - params = data['params'] - url = data['url'] - files = [] - for file in params['files']: - fn = os.path.join(gettempdir(), uuid4().hex) - download_file(file['name'], file['bucket'], fn) - files.append(fn) - run_task(params['query_type'], params, files, url) - ch.basic_ack(delivery_tag=method.delivery_tag) + try: + data = json.loads(body) + params = data['params'] + url = data['url'] + files = [] + for file in data['files']: + fn = os.path.join(gettempdir(), uuid4().hex) + download_file(file['name'], file['bucket'], fn) + files.append(fn) + run_task(params['query_type'], params, files, url) + # except JSONDecodeError as e: + # logging.warning(e) + finally: + ch.basic_ack(delivery_tag=method.delivery_tag) def send_response(params, files, url): global channel_send files_s3 = [] - for file in params['files']: + for file in files: fn = uuid4().hex - upload_file(file, fn, Config.s3_bucket) + upload_file(file['url'], fn, Config.s3_bucket) files_s3.append({'name': fn, 'bucket': Config.s3_bucket}) data = { 'params': params, 'files': files_s3, 'url': url, } - channel_send.basic_publish(exchange=Config.rabbit_out_exchange, body=json.dumps(data)) + channel_send.basic_publish(exchange=Config.rabbit_out_exchange, body=json.dumps(data), routing_key='') def pika_task(): @@ -267,13 +272,13 @@ def put_object(params, files, url): def run_task(query_type, params, files, url): if query_type == 4: - tasks.put(lambda: load_catalog(params, files, url)) + load_catalog(params, files, url) if query_type == 1: - tasks.put(lambda: get_objects(params, files, url)) + get_objects(params, files, url) if query_type == 24: - tasks.put(lambda: get_metadata(params, files, url)) + get_metadata(params, files, url) if query_type == 6: - tasks.put(lambda: put_object(params, files, url)) + put_object(params, files, url) def main():