Fix connections

This commit is contained in:
ashatora
2024-04-30 21:41:29 +03:00
parent 856e796da1
commit c6ccebd4fe
2 changed files with 33 additions and 17 deletions

11
examples/catalog.json Normal file
View File

@@ -0,0 +1,11 @@
{
"files": [],
"url": "http://10.10.8.79:7001/xmlrpc",
"params": {
"from": "tcp://kptsp_vb",
"query_data": "<?xml version=\"1.0\" encoding=\"utf-8\"?><request><header parcel_id=\"990715ba919544a98f22cc7d3b0d9e8d\"/><getCatalog/></request>",
"query_type": 4,
"to": "tcp://bnd127",
"user_id": 0
}
}

View File

@@ -1,3 +1,4 @@
from json import JSONDecodeError
from tempfile import TemporaryDirectory, gettempdir from tempfile import TemporaryDirectory, gettempdir
from typing import Optional, Any from typing import Optional, Any
@@ -69,31 +70,35 @@ def restore_uuid(oid):
def pika_callback(ch, method, properties, body): def pika_callback(ch, method, properties, body):
data = json.loads(body) try:
params = data['params'] data = json.loads(body)
url = data['url'] params = data['params']
files = [] url = data['url']
for file in params['files']: files = []
fn = os.path.join(gettempdir(), uuid4().hex) for file in data['files']:
download_file(file['name'], file['bucket'], fn) fn = os.path.join(gettempdir(), uuid4().hex)
files.append(fn) download_file(file['name'], file['bucket'], fn)
run_task(params['query_type'], params, files, url) files.append(fn)
ch.basic_ack(delivery_tag=method.delivery_tag) run_task(params['query_type'], params, files, url)
# except JSONDecodeError as e:
# logging.warning(e)
finally:
ch.basic_ack(delivery_tag=method.delivery_tag)
def send_response(params, files, url): def send_response(params, files, url):
global channel_send global channel_send
files_s3 = [] files_s3 = []
for file in params['files']: for file in files:
fn = uuid4().hex fn = uuid4().hex
upload_file(file, fn, Config.s3_bucket) upload_file(file['url'], fn, Config.s3_bucket)
files_s3.append({'name': fn, 'bucket': Config.s3_bucket}) files_s3.append({'name': fn, 'bucket': Config.s3_bucket})
data = { data = {
'params': params, 'params': params,
'files': files_s3, 'files': files_s3,
'url': url, 'url': url,
} }
channel_send.basic_publish(exchange=Config.rabbit_out_exchange, body=json.dumps(data)) channel_send.basic_publish(exchange=Config.rabbit_out_exchange, body=json.dumps(data), routing_key='')
def pika_task(): def pika_task():
@@ -267,13 +272,13 @@ def put_object(params, files, url):
def run_task(query_type, params, files, url): def run_task(query_type, params, files, url):
if query_type == 4: if query_type == 4:
tasks.put(lambda: load_catalog(params, files, url)) load_catalog(params, files, url)
if query_type == 1: if query_type == 1:
tasks.put(lambda: get_objects(params, files, url)) get_objects(params, files, url)
if query_type == 24: if query_type == 24:
tasks.put(lambda: get_metadata(params, files, url)) get_metadata(params, files, url)
if query_type == 6: if query_type == 6:
tasks.put(lambda: put_object(params, files, url)) put_object(params, files, url)
def main(): def main():