"""RabbitMQ worker for the query_itv service.

Messages are consumed from ``Config.rabbit_incoming_queue``.  Files referenced
by a message are downloaded from S3 into temporary files, the query is
dispatched on its ``query_type`` and responses are published to
``Config.rabbit_out_exchange``.
"""
import datetime
import io
import json
import logging
import os.path
import pathlib
import threading
import time
import xml.etree.ElementTree as ET
import zipfile
import zlib
from json import JSONDecodeError
from queue import Queue
from shutil import make_archive, rmtree
from tempfile import TemporaryDirectory, gettempdir
from typing import Any, Dict, List, Optional
from uuid import uuid4, UUID
from xmlrpc.client import ServerProxy
from xmlrpc.server import SimpleXMLRPCServer

import boto3
import pika
import requests
import uvicorn
from fastapi import FastAPI, Response, Form, UploadFile, File, Request
from fastapi.middleware.cors import CORSMiddleware
from sqlalchemy.orm import Session
from typing_extensions import Annotated

from .reqs.request_xml_service import RequestXmlService
from .config import Config
from .zip import Zip
import query_itv.db as db

# NOTE(review): OOConnectionParams, OODBWorkspace, OODBWorkspaceFactory,
# Polygon, Envelope, SRFactory, FileValue, ResponseWorker,
# ResponseWorkerConfig, variantFromString, variantToString and
# variantFromFileValue are used below but are not imported by any visible
# line of this module -- presumably an import of the OODB bindings is
# missing; confirm and add it.

# Values of ``params['query_type']`` handled by run_task().
NEW_DATA_REQUEST = 97
NEW_DATA_RESPONSE = 1097

tasks = Queue()
connected = set()

# Populated by main() / pika_task().
connection: Optional[pika.BlockingConnection] = None
channel_send: Optional[Any] = None
channel_receive: Optional[Any] = None
server: Optional[SimpleXMLRPCServer] = None

logger = logging.getLogger('xmlrpcserver')


def s3_connection():
    """Return a boto3 S3 client configured from ``Config``."""
    return boto3.client('s3', endpoint_url=Config.s3_endpoint,
                        aws_access_key_id=Config.s3_key_id,
                        aws_secret_access_key=Config.s3_access_key)


def download_file(key: str, bucket: str, filename: str):
    """Download S3 object ``key`` from ``bucket`` into local file ``filename``."""
    client = s3_connection()
    obj = client.get_object(Bucket=bucket, Key=key)
    with open(filename, 'wb') as f:
        for chunk in obj['Body'].iter_chunks(chunk_size=4096):
            f.write(chunk)


def upload_file(filename: str, key: str, bucket: str):
    """Upload local file ``filename`` to S3 as ``key`` in ``bucket``."""
    client = s3_connection()
    with open(filename, 'rb') as f:
        # Hand over the open file object so the payload is streamed instead
        # of being read into memory in one piece.
        client.put_object(Body=f, Bucket=bucket, Key=key)


def _discard(path: str, is_dir: bool = False) -> None:
    """Best-effort removal of a temporary file (or directory tree)."""
    try:
        if is_dir:
            rmtree(path)
        else:
            os.remove(path)
    except OSError as err:
        # Cleanup must never mask the outcome of the task itself.
        logger.debug('Could not remove temporary path %s: %s', path, err)


def pika_callback(ch, method, properties, body):
    """Handle one incoming RabbitMQ message.

    ``body`` is parsed as JSON with the keys ``params``, ``url`` and
    ``files``.  Every entry of ``files`` is downloaded from S3 to a temporary
    file and its ``url`` is replaced by that local path before the task is
    run.  The message is acknowledged in every case, and the temporary
    downloads are removed once the task has finished.
    """
    downloaded: List[str] = []
    try:
        data = json.loads(body)
        params = data['params']
        url = data['url']
        files = []
        for file in data['files']:
            fn = os.path.join(gettempdir(), uuid4().hex)
            # Register before downloading so a partial file is cleaned up too.
            downloaded.append(fn)
            download_file(file['url']['name'], file['url']['bucket'], fn)
            files.append({**file, 'url': fn})
        run_task(params['query_type'], params, files, url)
    except JSONDecodeError as e:
        logger.warning(e)
    finally:
        for path in downloaded:
            _discard(path)
        ch.basic_ack(delivery_tag=method.delivery_tag)


def send_response(params, files, url):
    """Upload ``files`` to S3 and publish the response message.

    Each entry of ``files`` is a dict whose ``url`` is a local path; in the
    published message it is replaced by ``{'name': key, 'bucket': ...}``
    pointing at the uploaded object.  The dicts passed in are not modified.
    """
    files_s3: List[Dict[str, Any]] = []
    for file in files:
        key = uuid4().hex
        upload_file(file['url'], key, Config.s3_bucket_itv)
        files_s3.append({**file, 'url': {'name': key, 'bucket': Config.s3_bucket_itv}})
    data = {
        'params': params,
        'files': files_s3,
        'url': url,
    }
    channel_send.basic_publish(exchange=Config.rabbit_out_exchange, body=json.dumps(data), routing_key='')


def pika_task():
    """Open the receive channel and consume the incoming queue (blocking)."""
    global channel_receive
    channel_receive = connection.channel()
    channel_receive.basic_consume(queue=Config.rabbit_incoming_queue, on_message_callback=pika_callback)
    channel_receive.start_consuming()


def run_tasks():
    """Run callables taken from the module-level ``tasks`` queue forever."""
    logger.debug('Task thread started.')
    while True:
        task = tasks.get()
        task()


def put_object(params, files, url):
    """Create a feature from the ``chart`` element and attach archived files.

    ``params['query_data']`` is XML containing a ``chart`` element whose
    ``Class`` attribute selects the feature class and whose ``Attribute``
    children supply name/value pairs.  ``files`` must hold exactly one zip
    archive; every file inside it is uploaded to ``Config.s3_bucket`` and
    referenced from the feature through attribute ``c1000``.
    """
    req = ET.fromstring(params['query_data'])
    obj = req.find('chart')
    class_id = obj.get('Class')
    con = OOConnectionParams(Config.oodb_schema, Config.oodb_host, Config.oodb_port, Config.oodb_dbname,
                             Config.oodb_username, Config.oodb_passwd, Config.oodb_schema)
    ws = OODBWorkspace.ws(Config.oodb_schema)
    if not ws.isInit():
        res = ws.init(con)
        logger.warning(res)
    logger.info(class_id)
    fc = ws.featureClass(class_id)
    feature = fc.createFeature('')
    geom = Polygon.fromExtent(Envelope(0.0, 0.0, 1.0, 1.0, SRFactory.PZ9011()))
    # NOTE(review): ``res`` accumulates the results of the feature calls but
    # is never inspected before being overwritten by ws.save().
    res = feature.setGeometry(geom)
    for attr in obj.findall('Attribute'):
        name = attr.get('name')
        value = attr.get('value')
        res &= feature.addAttribute(name, variantFromString(value))
    assert len(files) == 1
    entry = files[0]
    # pika_callback() passes dicts whose 'url' is the local path of the
    # download; a bare path is still accepted.
    archive_path = entry['url'] if isinstance(entry, dict) else entry
    # The context manager guarantees the extracted files are removed.
    with TemporaryDirectory() as tmp_dir:
        with zipfile.ZipFile(archive_path, 'r') as zip_ref:
            zip_ref.extractall(tmp_dir)
        for item in pathlib.Path(tmp_dir).glob('**/*'):
            if not item.is_file():
                continue
            fileVal = FileValue()
            key = uuid4().hex
            fileVal.fileName = variantToString(item.relative_to(tmp_dir))
            fileVal.key = variantToString(key)
            fileVal.bucket = variantToString(Config.s3_bucket)
            res &= feature.addAttribute('c1000', variantFromFileValue(fileVal))
            upload_file(str(item), key, Config.s3_bucket)
        ws.transaction()
        res = ws.save()
        ws.commit(f'Putobject from {params["to"]}')
        ws.clearData(True)


def get_data(params, files, url):
    """Answer a NEW_DATA_REQUEST query with a zipped response.

    The text of the ``data`` element of ``params['query_data']`` is passed to
    ``ResponseWorker.makeResponse``, which writes into a fresh directory
    under ``./tmp``.  The directory is archived and sent back through
    send_response() with ``from``/``to`` swapped.  The working directory and
    the archive are removed afterwards.
    """
    date = datetime.datetime.now()
    rxmls = RequestXmlService()
    req = ET.fromstring(params['query_data'])
    req_id = rxmls.get_request_uuid(req)
    res_id = uuid4().hex
    res = rxmls.get_request_document(res_id, req_id)
    rxmls.set_result(res, 0, '')
    request_string = req.find('data').text

    OODBWorkspaceFactory.init('/app/query_itv/config/workspaces.json')
    conf = ResponseWorkerConfig('/app/query_itv/config/response2.json', '/app/query_itv/config/workspaces.json')
    worker = ResponseWorker(conf)
    fn = uuid4().hex
    work_dir = os.path.join(os.getcwd(), 'tmp', fn)
    filename = fn + '.zip'
    filepath = os.path.join(os.getcwd(), 'tmp', filename)
    os.makedirs(work_dir)
    try:
        worker.makeResponse(request_string, work_dir)
        # NOTE(review): base_dir='..' makes the archive start from the parent
        # of work_dir rather than from work_dir itself -- confirm that this is
        # the layout the receiver expects.
        make_archive(work_dir, 'zip', work_dir, '..')

        rxmls.set_result(res, 0, '')
        response_params = {
            'from': params['to'],
            'to': params['from'],
            'ts_added': date.timestamp(),
            'user_id': '1',
            'user_id_to': params['user_id'],
            'query_type': NEW_DATA_RESPONSE,
            'query_data': ET.tostring(res, encoding='unicode', xml_declaration=True)
        }
        response_files = [{'name': filename, 'url': filepath, 'size': os.path.getsize(filepath)}]
        send_response(response_params, response_files, url)
    finally:
        # send_response() uploads synchronously, so nothing needs these
        # paths once it has returned (or failed).
        _discard(work_dir, is_dir=True)
        _discard(filepath)


def receive_data(params, files, url):
    """Queue the commits listed after the reported one for matching users.

    ``params['query_data']`` is XML with a ``commit`` element carrying ``id``
    and ``schema``.  For every commit listed after ``id`` in the workspace
    branch, a ``db.Queue`` row is added for each active user whose
    ``bndname`` equals the sender (``params['from']`` without ``tcp://``),
    unless that name is ``Config.self_bnd``.  Users with profiles are only
    included when one of their profiles is for ``schema``.
    """
    req = ET.fromstring(params['query_data'])
    commit_el = req.find('commit')
    if commit_el is None:
        logger.warning('No commit element in query_data; nothing to queue')
        return
    commit_id = commit_el.get('id')
    schema = commit_el.get('schema')
    bnd = params['from'].replace('tcp://', '')

    con = OOConnectionParams(schema, Config.oodb_host, Config.oodb_port, Config.oodb_dbname,
                             Config.oodb_username, Config.oodb_passwd, schema)
    ws = OODBWorkspace.ws(schema)
    if not ws.isInit():
        res = ws.init(con)
        logger.warning(res)
    schema_commits = ws.commits(ws.branch())
    logger.warning(schema_commits)
    if commit_id not in schema_commits:
        logger.warning(f'Error in commits in schema {schema}: no commit {commit_id}')
        return
    following_commits = schema_commits[schema_commits.index(commit_id) + 1:]
    logger.warning(following_commits)

    conn = db.connect_db()
    with Session(conn) as session:
        # The set of recipients does not depend on the commit, so it is
        # determined once instead of once per commit.
        users = session.query(db.User).filter(db.User.bndname == bnd, db.User.active == True).all()
        recipients = []
        for user in users:
            if user.bndname == Config.self_bnd:
                continue
            schemes = {profile.scheme for profile in user.profiles}
            if not schemes or schema in schemes:
                recipients.append(user)
        for commit in following_commits:
            for user in recipients:
                item = db.Queue(user_id=user.id, commit_id=commit, schema=schema)
                logger.warning(item)
                session.add(item)
        session.commit()


def run_task(query_type, params, files, url):
    """Dispatch a task to its handler according to ``query_type``."""
    if query_type == NEW_DATA_REQUEST:
        get_data(params, files, url)
    elif query_type == NEW_DATA_RESPONSE:
        receive_data(params, files, url)
    else:
        logger.warning('Ignoring message with unsupported query_type %r', query_type)


def main():
    """Connect to RabbitMQ and consume messages until interrupted."""
    global connection
    global channel_send

    connection = pika.BlockingConnection(pika.URLParameters(Config.rabbit_conn))
    try:
        channel_send = connection.channel()

        logger.setLevel(logging.INFO)
        logger.warning('Use Control-C to exit')

        pika_task()
    finally:
        # Release the broker connection on Control-C or on any failure.
        if connection.is_open:
            connection.close()


def vers_key(e):
    """Sort key returning ``e.version()``."""
    return e.version()


if __name__ == '__main__':
    main()