diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..b4f2c7c --- /dev/null +++ b/Dockerfile @@ -0,0 +1,10 @@ +FROM reg.ivazh.ru/infra-oodb +WORKDIR /app +COPY request_itv ./request_itv +COPY requirements.txt ./ +RUN pip3 install -r requirements.txt && \ + mkdir -p /opt/tnt/bin && \ + ln -s /usr/bin/python3 /opt/tnt/bin/python3 +ENV LD_LIBRARY_PATH "/app" +ENV PYTHONPATH "${PYTHONPATH}:/app" +CMD ["python3", "-m", "request_itv"] \ No newline at end of file diff --git a/request_itv/__main__.py b/request_itv/__main__.py index 2822f7a..f384a31 100644 --- a/request_itv/__main__.py +++ b/request_itv/__main__.py @@ -1,4 +1,4 @@ -from tempfile import TemporaryDirectory +from tempfile import TemporaryDirectory, gettempdir from typing import Optional, Any import pika @@ -35,7 +35,9 @@ from shutil import make_archive tasks = Queue() connection: Optional[pika.BlockingConnection] = None -channel: Optional[Any] = None +channel_send: Optional[Any] = None +channel_receive: Optional[Any] = None + server: Optional[SimpleXMLRPCServer] = None logger = logging.getLogger('xmlrpcserver') @@ -61,21 +63,47 @@ def upload_file(filename: str, key: str, bucket: str): client.put_object(Body=f.read(), Bucket=bucket, Key=key) -def get_branch(bndname: str, scheme: str): - conn = db.connect_db() - with Session(conn) as session: - item = session.query(db.IncomeBranch).filter_by(scheme=scheme).join( - db.User).filter_by(bndname=bndname).one_or_none() - if item: - return item.branch, item.local_scheme - return None, None - - def restore_uuid(oid): uuid = UUID(oid) return str(uuid) +def pika_callback(ch, method, properties, body): + data = json.loads(body) + params = data['params'] + url = data['url'] + files = [] + for file in params['files']: + fn = os.path.join(gettempdir(), uuid4().hex) + download_file(file['name'], file['bucket'], fn) + files.append(fn) + run_task(params['query_type'], params, files, url) + ch.basic_ack(delivery_tag=method.delivery_tag) + + +def 
send_response(params, files, url): +    global channel_send +    files_s3 = [] +    for file in files: +        fn = uuid4().hex +        upload_file(file['url'], fn, Config.s3_bucket) +        files_s3.append({'name': fn, 'bucket': Config.s3_bucket}) +    data = { +        'params': params, +        'files': files_s3, +        'url': url, +    } +    channel_send.basic_publish(exchange=Config.rabbit_out_exchange, routing_key='', body=json.dumps(data)) + + +def pika_task(): +    global connection +    global channel_receive +    channel_receive = connection.channel() +    channel_receive.basic_consume(queue=Config.rabbit_incoming_queue, on_message_callback=pika_callback) +    channel_receive.start_consuming() + + def load_catalog(params, files, url): logger.warning('load_catalog') date = datetime.datetime.now() @@ -102,8 +130,7 @@ def load_catalog(params, files, url): zipf.writestr('WF.CLL', catalog) zipf.close() response_files = [{'name': filename, 'url': filepath, 'size': os.path.getsize(filepath)}] -    proxy = ServerProxy(url) -    proxy.send(response_params, response_files, Config.ret_path) +    send_response(response_params, response_files, url) def get_objects(params, files, url): @@ -140,8 +167,7 @@ def get_objects(params, files, url): zipf.writestr(f'{main_filename}/{file["fileName"]}', res.content) zipf.close() response_files = [{'name': filename, 'url': filepath, 'size': os.path.getsize(filepath)}] -    proxy = ServerProxy(url) -    proxy.send(response_params, response_files, Config.ret_path) +    send_response(response_params, response_files, url) def get_metadata(params, files, url): @@ -191,8 +217,7 @@ def get_metadata(params, files, url): zipf.writestr(f'metadata.xml', ET.tostring(content, encoding='unicode', xml_declaration=True)) zipf.close() response_files = [{'name': filename, 'url': filepath, 'size': os.path.getsize(filepath)}] -    proxy = ServerProxy(url) -    proxy.send(response_params, response_files, Config.ret_path) +    send_response(response_params, response_files, url) def put_object(params, files, url): @@ -254,21 +279,15 @@ def 
run_task(query_type, params, files, url): def main(): global connection global server - global channel + global channel_send + + connection = pika.BlockingConnection(pika.URLParameters(Config.rabbit_conn)) + channel_send = connection.channel() logger.setLevel(logging.INFO) logger.warning('Use Control-C to exit') - thread = threading.Thread(target=run_tasks) - thread.start() - - try: - logger.warning('Start server') - uvicorn.run(app, host="0.0.0.0", port=8000) - except KeyboardInterrupt: - logger.warning('Exiting') - finally: - server.server_close() + pika_task() def vers_key(e): diff --git a/request_itv/config.py b/request_itv/config.py index 42ecb72..6306fec 100644 --- a/request_itv/config.py +++ b/request_itv/config.py @@ -20,19 +20,13 @@ class Config: rabbit_conn: str = 'amqp://user:password@10.10.8.83:31005/%2f' rabbit_queue: str = 'ipd' - - ws_rabbit_params: dict = { - 'host': '10.10.8.83', - 'port': 31005, - 'exchange': 'ipd', - 'user': 'user', - 'password': 'password', - } + rabbit_incoming_queue: str = 'ipd_queue_requests' + rabbit_out_exchange: str = 'ipd_out_itv' s3_endpoint: str = 'http://10.10.8.83:31006' s3_key_id: str = 's57' s3_access_key: str = 'd9MMinLF3U8TLSj' - s3_bucket: str = 'files' + s3_bucket: str = 'itv' gql_url: str = 'https://gql.ivazh.ru/graphql' gql_download: str = 'https://gql.ivazh.ru/item/{key}' diff --git a/request_itv/db.py b/request_itv/db.py index bc6833a..f50b9c0 100644 --- a/request_itv/db.py +++ b/request_itv/db.py @@ -2,7 +2,7 @@ from datetime import datetime from typing import List, Optional from sqlalchemy import create_engine, String, select, ForeignKey, Enum from sqlalchemy.orm import Session, DeclarativeBase, Mapped, mapped_column, relationship -from config import Config +from .config import Config def tow(day: int, hour: int, minute: int): diff --git a/requirements.txt b/requirements.txt index 8ecf94b..ec860b8 100644 --- a/requirements.txt +++ b/requirements.txt @@ -25,7 +25,6 @@ pika-stubs==0.1.3 psycopg==3.1.10 
pydantic==2.3.0 pydantic_core==2.6.3 -pygost==5.12 python-dateutil==2.8.2 python-multipart==0.0.6 requests==2.31.0