From 18be337d34089a45f8f2f827e401a3231c007990 Mon Sep 17 00:00:00 2001 From: ashatora Date: Wed, 1 May 2024 12:01:30 +0300 Subject: [PATCH] Fix connections --- Dockerfile | 10 +++++ examples/replication.json | 11 ++++++ replication_itv/__main__.py | 78 +++++++++++++++++++++++-------------- replication_itv/config.py | 11 ++---- replication_itv/db.py | 2 +- requirements.txt | 2 - 6 files changed, 73 insertions(+), 41 deletions(-) create mode 100644 Dockerfile create mode 100644 examples/replication.json diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..8c8243c --- /dev/null +++ b/Dockerfile @@ -0,0 +1,10 @@ +FROM reg.ivazh.ru/infra-oodb +WORKDIR /app +COPY replication_itv ./replication_itv +COPY requirements.txt ./ +RUN pip3 install -r requirements.txt && \ + mkdir -p /opt/tnt/bin && \ + ln -s /usr/bin/python3 /opt/tnt/bin/python3 +ENV LD_LIBRARY_PATH "/app" +ENV PYTHONPATH "${PYTHONPATH}:/app" +CMD ["python3", "-m", "replication_itv"] \ No newline at end of file diff --git a/examples/replication.json b/examples/replication.json new file mode 100644 index 0000000..cb1bc6c --- /dev/null +++ b/examples/replication.json @@ -0,0 +1,11 @@ +{ + "files": [], + "url": "http://10.10.8.79:7000/xmlrpc", + "params": { + "from": "tcp://kptsp_vb", + "query_data": "
", + "query_type": 4, + "to": "tcp://bnd127", + "user_id": 0 + } +} \ No newline at end of file diff --git a/replication_itv/__main__.py b/replication_itv/__main__.py index f853905..9cbd231 100644 --- a/replication_itv/__main__.py +++ b/replication_itv/__main__.py @@ -1,4 +1,5 @@ -from tempfile import TemporaryDirectory +from json import JSONDecodeError +from tempfile import TemporaryDirectory, gettempdir from typing import Optional, Any import pika @@ -16,7 +17,6 @@ import zlib import os.path import requests from .reqs_graphql import get_catalog, get_object -from pygost import gost34112012256 import xml.etree.ElementTree as ET from .reqs.request_xml_service import RequestXmlService import zipfile @@ -43,8 +43,6 @@ NEW_DATA_RESPONSE = 1097 tasks = Queue() connected = set() -connection: Optional[pika.BlockingConnection] = None -channel: Optional[Any] = None server: Optional[SimpleXMLRPCServer] = None logger = logging.getLogger('xmlrpcserver') @@ -80,13 +78,6 @@ def get_branch(bndname: str, scheme: str): return None, None -def run_tasks(): - logger.debug('Task thread started.') - while True: - task = tasks.get() - task() - - def replication_task(): while True: conn = db.connect_db() @@ -358,13 +349,52 @@ def pika_callback(ch, method, properties, body): def pika_task(): - global connection - global channel connection = pika.BlockingConnection(pika.URLParameters(Config.rabbit_conn)) - channel = connection.channel() - channel.basic_consume(queue=Config.rabbit_queue, on_message_callback=pika_callback) + channel_itv = connection.channel() + channel_itv.basic_consume(queue=Config.rabbit_queue, on_message_callback=pika_callback) + channel_itv.start_consuming() - channel.start_consuming() + +def pika_itv_callback(ch, method, properties, body): + try: + data = json.loads(body) + params = data['params'] + url = data['url'] + files = [] + for file in data['files']: + fn = os.path.join(gettempdir(), uuid4().hex) + download_file(file['url']['name'], file['url']['bucket'], fn) + file['url'] = fn + files.append(file) + run_task(params['query_type'], params, files, url) + except JSONDecodeError as e: + logging.warning(e) + finally: + ch.basic_ack(delivery_tag=method.delivery_tag) + + +def pika_itv_task(): + connection = pika.BlockingConnection(pika.URLParameters(Config.rabbit_conn)) + channel_receive = connection.channel() + channel_receive.basic_consume(queue=Config.rabbit_incoming_queue, on_message_callback=pika_itv_callback) + channel_receive.start_consuming() + + +def send_response(params, files, url): + files_s3 = [] + for file in files: + fn = uuid4().hex + upload_file(file['url'], fn, Config.s3_bucket) + file['url'] = {'name': fn, 'bucket': Config.s3_bucket} + files_s3.append(file) + data = { + 'params': params, + 'files': files_s3, + 'url': url, + } + connection = pika.BlockingConnection(pika.URLParameters(Config.rabbit_conn)) + channel_send = connection.channel() + channel_send.basic_publish(exchange=Config.rabbit_out_exchange, body=json.dumps(data), routing_key='') def put_object(params, files, url): @@ -484,8 +514,7 @@ def get_commit(params, files, url): 'query_type': NEW_COMMIT_RESPONSE, 'query_data': ET.tostring(res_doc, encoding='unicode', xml_declaration=True) } - proxy = ServerProxy(url) - proxy.send(response_params, [], Config.ret_path) + send_response(response_params, [], url) def query_commits(params, files, url): @@ -532,29 +561,18 @@ def run_task(query_type, params, files, url): def main(): - global connection global server - global channel logger.setLevel(logging.INFO) logger.warning('Use Control-C to exit') - thread = threading.Thread(target=run_tasks) - thread.start() - replication_thread = threading.Thread(target=replication_task) replication_thread.start() pika_thread = threading.Thread(target=pika_task) pika_thread.start() - try: - logger.warning('Start server') - uvicorn.run(app, host="0.0.0.0", port=8000) - except KeyboardInterrupt: - logger.warning('Exiting') - finally: - server.server_close() + pika_itv_task() def vers_key(e): diff --git a/replication_itv/config.py b/replication_itv/config.py index 42ecb72..cb8e6fb 100644 --- a/replication_itv/config.py +++ b/replication_itv/config.py @@ -20,19 +20,14 @@ class Config: rabbit_conn: str = 'amqp://user:password@10.10.8.83:31005/%2f' rabbit_queue: str = 'ipd' - - ws_rabbit_params: dict = { - 'host': '10.10.8.83', - 'port': 31005, - 'exchange': 'ipd', - 'user': 'user', - 'password': 'password', - } + rabbit_incoming_queue: str = 'ipd_queue_replication' + rabbit_out_exchange: str = 'ipd_out_itv' s3_endpoint: str = 'http://10.10.8.83:31006' s3_key_id: str = 's57' s3_access_key: str = 'd9MMinLF3U8TLSj' s3_bucket: str = 'files' + s3_bucket_itv: str = 'itv' gql_url: str = 'https://gql.ivazh.ru/graphql' gql_download: str = 'https://gql.ivazh.ru/item/{key}' diff --git a/replication_itv/db.py b/replication_itv/db.py index bc6833a..f50b9c0 100644 --- a/replication_itv/db.py +++ b/replication_itv/db.py @@ -2,7 +2,7 @@ from datetime import datetime from typing import List, Optional from sqlalchemy import create_engine, String, select, ForeignKey, Enum from sqlalchemy.orm import Session, DeclarativeBase, Mapped, mapped_column, relationship -from config import Config +from .config import Config def tow(day: int, hour: int, minute: int): diff --git a/requirements.txt b/requirements.txt index 8ecf94b..50cf277 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,3 @@ ---find-links=deps aiohttp==3.8.4 aiosignal==1.3.1 annotated-types==0.5.0 @@ -25,7 +24,6 @@ pika-stubs==0.1.3 psycopg==3.1.10 pydantic==2.3.0 pydantic_core==2.6.3 -pygost==5.12 python-dateutil==2.8.2 python-multipart==0.0.6 requests==2.31.0