from tempfile import TemporaryDirectory, gettempdir from typing import Optional, Any import pika import threading import time from queue import Queue import datetime import json from uuid import uuid4, UUID from xmlrpc.server import SimpleXMLRPCServer from xmlrpc.client import ServerProxy import logging import io import zlib import os.path import requests from .reqs_graphql import get_catalog, get_object import xml.etree.ElementTree as ET from .reqs.request_xml_service import RequestXmlService import zipfile from .config import Config from .zip import Zip import boto3 import request_itv.db as db from sqlalchemy.orm import Session from fastapi import FastAPI, Response, Form, UploadFile, File, Request from fastapi.middleware.cors import CORSMiddleware import uvicorn from typing_extensions import Annotated import pathlib from shutil import make_archive tasks = Queue() connection: Optional[pika.BlockingConnection] = None channel_send: Optional[Any] = None channel_receive: Optional[Any] = None server: Optional[SimpleXMLRPCServer] = None logger = logging.getLogger('xmlrpcserver') def s3_connection(): return boto3.client('s3', endpoint_url=Config.s3_endpoint, aws_access_key_id=Config.s3_key_id, aws_secret_access_key=Config.s3_access_key) def download_file(key: str, bucket: str, filename: str): client = s3_connection() obj = client.get_object(Bucket=bucket, Key=key) with open(f"{filename}", 'wb') as f: for chunk in obj['Body'].iter_chunks(chunk_size=4096): f.write(chunk) def upload_file(filename: str, key: str, bucket: str): client = s3_connection() with open(filename, 'rb') as f: client.put_object(Body=f.read(), Bucket=bucket, Key=key) def restore_uuid(oid): uuid = UUID(oid) return str(uuid) def pika_callback(ch, method, properties, body): data = json.loads(body) params = data['params'] url = data['url'] files = [] for file in params['files']: fn = os.path.join(gettempdir(), uuid4().hex) download_file(file['name'], file['bucket'], fn) files.append(fn) run_task(params['query_type'], params, files, url) ch.basic_ack(delivery_tag=method.delivery_tag) def send_response(params, files, url): global channel_send files_s3 = [] for file in params['files']: fn = uuid4().hex upload_file(file, fn, Config.s3_bucket) files_s3.append({'name': fn, 'bucket': Config.s3_bucket}) data = { 'params': params, 'files': files_s3, 'url': url, } channel_send.basic_publish(exchange=Config.rabbit_out_exchange, body=json.dumps(data)) def pika_task(): global connection global channel_receive channel_receive = connection.channel() channel_receive.basic_consume(queue=Config.rabbit_incoming_queue, on_message_callback=pika_callback) channel_receive.start_consuming() def load_catalog(params, files, url): logger.warning('load_catalog') date = datetime.datetime.now() rxmls = RequestXmlService() req = ET.fromstring(params['query_data']) req_id = rxmls.get_request_uuid(req) res_id = uuid4().hex res = rxmls.get_request_document(res_id, req_id) rxmls.set_result(res, 0, '') response_params = { 'from': params['to'], 'to': params['from'], 'ts_added': date.timestamp(), 'user_id': '1', 'user_id_to': params['user_id'], 'query_type': 1004, 'query_data': ET.tostring(res, encoding='unicode', xml_declaration=True) } catalog = get_catalog() logging.debug('Catalog_loaded') filename = uuid4().hex filepath = '/tmp/' + filename zipf = zipfile.ZipFile(filepath, "w") zipf.writestr('WF.CLL', catalog) zipf.close() response_files = [{'name': filename, 'url': filepath, 'size': os.path.getsize(filepath)}] send_response(response_params, response_files, url) def get_objects(params, files, url): date = datetime.datetime.now() rxmls = RequestXmlService() req = ET.fromstring(params['query_data']) req_id = rxmls.get_request_uuid(req) res_id = uuid4().hex res = rxmls.get_request_document(res_id, req_id) rxmls.set_result(res, 0, '') objs = req.find('objects') uids = [restore_uuid(x.get('object_id')) for x in objs.findall('object')] rxmls.set_result(res, 0, '') response_params = { 'from': params['to'], 'to': params['from'], 'ts_added': date.timestamp(), 'user_id': '1', 'user_id_to': params['user_id'], 'query_type': 1001, 'query_data': ET.tostring(res, encoding='unicode', xml_declaration=True) } filename = uuid4().hex filepath = '/tmp/' + filename zipf = zipfile.ZipFile(filepath, "w") main_filename = None for uid in uids: obj = json.loads(get_object(uid)) for file in obj['properties'].get('c1000', []): if not main_filename: main_filename = file['fileName'] res = requests.get(Config.gql_download, params={'item_id': file["key"]}) zipf.writestr(f'{main_filename}/{file["fileName"]}', res.content) zipf.close() response_files = [{'name': filename, 'url': filepath, 'size': os.path.getsize(filepath)}] send_response(response_params, response_files, url) def get_metadata(params, files, url): date = datetime.datetime.now() rxmls = RequestXmlService() req = ET.fromstring(params['query_data']) req_id = rxmls.get_request_uuid(req) res_id = uuid4().hex res = rxmls.get_request_document(res_id, req_id) rxmls.set_result(res, 0, '') objs = req.find('getMetadataByIds') uids = [restore_uuid(x.get('id')) for x in objs.findall('chart')] rxmls.set_result(res, 0, '') response_params = { 'from': params['to'], 'to': params['from'], 'ts_added': date.timestamp(), 'user_id': '1', 'user_id_to': params['user_id'], 'query_type': 1024, 'query_data': ET.tostring(res, encoding='unicode', xml_declaration=True) } filename = uuid4().hex filepath = '/tmp/' + filename zipf = zipfile.ZipFile(filepath, "w") content = ET.Element('getMetadataResponse') for uid in uids: obj = json.loads(get_object(uid)) date = datetime.datetime.fromisoformat(obj['date_updated']) chart = ET.SubElement(content, 'chart', { 'id': UUID(obj['uid']).hex, 'updated': str(date.timestamp()), }) for key in obj['properties']: if not key.startswith('c'): continue mdel = ET.SubElement(chart, 'mdItem', { 'code': key.replace('_', '.'), 'name': key, 'value': str(obj['properties'].get(key, '')), 'isBase': 'false', 'groupId': '', 'groupName': '', }) zipf.writestr(f'metadata.xml', ET.tostring(content, encoding='unicode', xml_declaration=True)) zipf.close() response_files = [{'name': filename, 'url': filepath, 'size': os.path.getsize(filepath)}] send_response(response_params, response_files, url) def put_object(params, files, url): date = datetime.datetime.now() req = ET.fromstring(params['query_data']) obj = req.find('chart') class_id = obj.get('Class') con = OOConnectionParams(Config.oodb_schema, Config.oodb_host, Config.oodb_port, Config.oodb_dbname, Config.oodb_username, Config.oodb_passwd, Config.oodb_schema) ws = OODBWorkspace.ws(Config.oodb_schema) if not ws.isInit(): res = ws.init(con) logger.warning(res) logging.info(class_id) fc = ws.featureClass(class_id) feature = fc.createFeature('') geom = Polygon.fromExtent(Envelope(0.0, 0.0, 1.0, 1.0, SRFactory.PZ9011())) res = feature.setGeometry(geom) for attr in obj.findall('Attribute'): name = attr.get('name') value = attr.get('value') res &= feature.addAttribute(name, variantFromString(value)) assert len(files) == 1 file = files[0] dir = TemporaryDirectory() with zipfile.ZipFile(file, 'r') as zip_ref: zip_ref.extractall(dir.name) fp = pathlib.Path(dir.name) for item in fp.glob('**/*'): if not item.is_file(): continue fileVal = FileValue() key = uuid4().hex fileVal.fileName = variantToString(item.relative_to(dir.name)) fileVal.key = variantToString(key) fileVal.bucket = variantToString(Config.s3_bucket) res &= feature.addAttribute('c1000', variantFromFileValue(fileVal)) upload_file(str(item), key, Config.s3_bucket) ws.transaction() res = ws.save() ws.commit(f'Putobject from {params["to"]}') ws.clearData(True) def run_task(query_type, params, files, url): if query_type == 4: tasks.put(lambda: load_catalog(params, files, url)) if query_type == 1: tasks.put(lambda: get_objects(params, files, url)) if query_type == 24: tasks.put(lambda: get_metadata(params, files, url)) if query_type == 6: tasks.put(lambda: put_object(params, files, url)) def main(): global connection global server global channel_send connection = pika.BlockingConnection(pika.URLParameters(Config.rabbit_conn)) channel_send = connection.channel() logger.setLevel(logging.INFO) logger.warning('Use Control-C to exit') pika_task() def vers_key(e): return e.version() if __name__ == '__main__': main()