From 635e03be6d98021c8bab1bc86f7daee5361e4f6a Mon Sep 17 00:00:00 2001 From: ashatora Date: Tue, 30 Apr 2024 19:59:25 +0300 Subject: [PATCH] Fix connections --- Dockerfile | 15 ++++++++++ xmlrpcserver/__main__.py | 63 +++++++++++++++++++++++++++++----------- xmlrpcserver/config.py | 13 ++------- 3 files changed, 64 insertions(+), 27 deletions(-) create mode 100644 Dockerfile diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..cdfe352 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,15 @@ +FROM reg.ivazh.ru/infra-oodb +WORKDIR /app +COPY xmlrpcserver ./xmlrpcserver +COPY requirements.txt ./ +RUN cd xmlrpcserver/deps/pygost-5.12/ && \ + python3 setup.py install && \ + cd ../../.. && \ + pip3 install -r requirements.txt && \ + mkdir -p /opt/tnt/bin && \ + ln -s /usr/bin/python3 /opt/tnt/bin/python3 +ENV LD_LIBRARY_PATH "/app" +ENV PYTHONPATH "${PYTHONPATH}:/app" +EXPOSE 9000 +EXPOSE 8000 +CMD ["python3", "-m", "xmlrpcserver"] \ No newline at end of file diff --git a/xmlrpcserver/__main__.py b/xmlrpcserver/__main__.py index a5ba9d7..8e9d748 100644 --- a/xmlrpcserver/__main__.py +++ b/xmlrpcserver/__main__.py @@ -1,4 +1,4 @@ -from tempfile import TemporaryDirectory +from tempfile import TemporaryDirectory, gettempdir from typing import Optional, Any import pika @@ -15,6 +15,8 @@ import io import zlib import os.path import requests +from pika.channel import Channel + from .reqs_graphql import get_catalog, get_object from pygost import gost34112012256 import xml.etree.ElementTree as ET @@ -28,7 +30,7 @@ from sqlalchemy.orm import Session from fastapi import FastAPI, Response, Form, UploadFile, File, Request from fastapi.middleware.cors import CORSMiddleware import uvicorn -from typing_extensions import Annotated +from typing_extensions import Annotated, Union import pathlib from shutil import make_archive @@ -44,6 +46,10 @@ server: Optional[SimpleXMLRPCServer] = None logger = logging.getLogger('xmlrpcserver') +connection: Optional[pika.BlockingConnection] = None +channel_send: Optional[Any] = None +channel_receive: Optional[Any] = None + def s3_connection(): return boto3.client('s3', endpoint_url=Config.s3_endpoint, @@ -65,19 +71,27 @@ def upload_file(filename: str, key: str, bucket: str): client.put_object(Body=f.read(), Bucket=bucket, Key=key) -def get_branch(bndname: str, scheme: str): - conn = db.connect_db() - with Session(conn) as session: - item = session.query(db.IncomeBranch).filter_by(scheme=scheme).join( - db.User).filter_by(bndname=bndname).one_or_none() - if item: - return item.branch, item.local_scheme - return None, None +def pika_callback(ch, method, properties, body): + data = json.loads(body) + params = data['params'] + url = data['url'] + files = [] + for file in params['files']: + fn = os.path.join(gettempdir(), uuid4().hex) + download_file(file['name'], file['bucket'], fn) + files.append(fn) + proxy = ServerProxy(url) + proxy.send(params, files, Config.ret_path) + + ch.basic_ack(delivery_tag=method.delivery_tag) -def list_contents(dir_name): - logger.warning('list_contents(%s)', dir_name) - return os.listdir(dir_name) +def pika_task(): + global connection + global channel_receive + channel_receive = connection.channel() + channel_receive.basic_consume(queue=Config.rabbit_incoming_queue, on_message_callback=pika_callback) + channel_receive.start_consuming() def aud_add(message): @@ -155,9 +169,19 @@ def put_object(params, files, url): def accept(params, files, url): - print(params, files, url) + global channel_send + files_s3 = [] + for file in params['files']: + fn = uuid4().hex + upload_file(file, fn, Config.s3_bucket) + files_s3.append({'name': fn, 'bucket': Config.s3_bucket}) + data = { + 'params': params, + 'files': files_s3, + 'url': url, + } + channel_send.basic_publish(exchange=Config.rabbit_send_exchange, body=json.dumps(data)) logger.warning('Accept: ' + json.dumps(params, ensure_ascii=False)) - # TODO: run_task(params['query_type'], params, files, url) return True @@ -184,7 +208,6 @@ def bnd_disconnected(bnd_name: str): def xmlrpc_task(): global server server = SimpleXMLRPCServer(('0.0.0.0', 9000), logRequests=True, allow_none=True) - server.register_function(list_contents) server.register_function(aud_add) server.register_function(auth_response) server.register_function(auth_challenge) @@ -284,15 +307,21 @@ async def correction_replication(bnd_name: str, schema: str): def main(): global connection + global channel_send global server - global channel logger.setLevel(logging.INFO) logger.warning('Use Control-C to exit') + connection = pika.BlockingConnection(pika.URLParameters(Config.rabbit_conn)) + channel_send = connection.channel() + xmlrpc_thread = threading.Thread(target=xmlrpc_task) xmlrpc_thread.start() + pika_thread = threading.Thread(target=pika_task) + pika_thread.start() + try: logger.warning('Start server') uvicorn.run(app, host="0.0.0.0", port=8000) diff --git a/xmlrpcserver/config.py b/xmlrpcserver/config.py index 1e6eff7..a129994 100644 --- a/xmlrpcserver/config.py +++ b/xmlrpcserver/config.py @@ -19,20 +19,13 @@ class Config: oodb_schema: str = 'documents_src' rabbit_conn: str = 'amqp://user:password@10.10.8.83:31005/%2f' - rabbit_queue: str = 'ipd' - - ws_rabbit_params: dict = { - 'host': '10.10.8.83', - 'port': 31005, - 'exchange': 'ipd', - 'user': 'user', - 'password': 'password', - } + rabbit_send_exchange: str = 'ipd_incoming_itv' + rabbit_incoming_queue: str = 'ipd_out_queue' s3_endpoint: str = 'http://10.10.8.83:31006' s3_key_id: str = 's57' s3_access_key: str = 'd9MMinLF3U8TLSj' - s3_bucket: str = 'files' + s3_bucket: str = 'itv' gql_url: str = 'https://gql.ivazh.ru/graphql' gql_download: str = 'https://gql.ivazh.ru/item/{key}'