"""Bridge service moving data requests/responses between a remote peer,
an OODB workspace, S3 storage and the local database.

Incoming messages are dispatched by ``run_task`` onto a single worker
thread (``run_tasks``) that executes handlers sequentially.

NOTE(review): several names used below are not imported in this file and
must come from elsewhere (or are missing imports): ``OOConnectionParams``,
``OODBWorkspace``, ``OODBWorkspaceFactory``, ``Polygon``, ``Envelope``,
``SRFactory``, ``FileValue``, ``variantFromString``, ``variantToString``,
``variantFromFileValue``, ``ResponseWorkerConfig``, ``ResponseWorker`` and
the FastAPI ``app`` passed to uvicorn — confirm before deployment.
"""
from tempfile import TemporaryDirectory
from typing import Optional, Any
import pika
import threading
import time
from queue import Queue
import datetime
import json
from uuid import uuid4, UUID
from xmlrpc.server import SimpleXMLRPCServer
from xmlrpc.client import ServerProxy
import logging
import io
import zlib
import os.path
import requests
from .reqs_graphql import get_catalog, get_object
import xml.etree.ElementTree as ET
from .reqs.request_xml_service import RequestXmlService
import zipfile
from .config import Config
from .zip import Zip
import boto3
import query_itv.db as db
from sqlalchemy.orm import Session
from fastapi import FastAPI, Response, Form, UploadFile, File, Request
from fastapi.middleware.cors import CORSMiddleware
import uvicorn
from typing_extensions import Annotated
import pathlib
from shutil import make_archive

# Message-type codes exchanged with the remote side.
NEW_DATA_REQUEST = 97
NEW_DATA_RESPONSE = 1097

# Zero-argument callables executed sequentially by the worker thread.
tasks: Queue = Queue()
connected = set()
connection: Optional[pika.BlockingConnection] = None
channel: Optional[Any] = None
server: Optional[SimpleXMLRPCServer] = None

logger = logging.getLogger('xmlrpcserver')


def s3_connection():
    """Return a boto3 S3 client configured from ``Config``."""
    return boto3.client('s3',
                        endpoint_url=Config.s3_endpoint,
                        aws_access_key_id=Config.s3_key_id,
                        aws_secret_access_key=Config.s3_access_key)


def download_file(key: str, bucket: str, filename: str):
    """Download S3 object ``key`` from ``bucket`` into local file ``filename``."""
    client = s3_connection()
    obj = client.get_object(Bucket=bucket, Key=key)
    # BUG FIX: previously opened the literal path "(unknown)" instead of the
    # requested target file, so every download went to the same bogus file.
    with open(filename, 'wb') as f:
        # Stream in 4 KiB chunks to avoid loading the whole object in memory.
        for chunk in obj['Body'].iter_chunks(chunk_size=4096):
            f.write(chunk)


def upload_file(filename: str, key: str, bucket: str):
    """Upload local file ``filename`` to S3 ``bucket`` under ``key``."""
    client = s3_connection()
    with open(filename, 'rb') as f:
        client.put_object(Body=f.read(), Bucket=bucket, Key=key)


def get_branch(bndname: str, scheme: str):
    """Look up the income branch for (``bndname``, ``scheme``).

    Returns ``(branch, local_scheme)`` on a match, otherwise ``(None, None)``.
    """
    conn = db.connect_db()
    with Session(conn) as session:
        item = session.query(db.IncomeBranch).filter_by(scheme=scheme).join(
            db.User).filter_by(bndname=bndname).one_or_none()
        if item:
            return item.branch, item.local_scheme
    return None, None


def run_tasks():
    """Worker-thread target: execute queued callables forever."""
    logger.debug('Task thread started.')
    while True:
        task = tasks.get()
        task()


def put_object(params, files, url):
    """Create an OODB feature from an incoming request.

    ``params['query_data']`` carries an XML document whose ``<chart>``
    element names the feature class and attributes; ``files`` must contain
    exactly one zip archive whose members are uploaded to S3 and attached
    to the feature as file attributes.
    """
    date = datetime.datetime.now()
    req = ET.fromstring(params['query_data'])
    obj = req.find('chart')
    class_id = obj.get('Class')
    con = OOConnectionParams(Config.oodb_schema, Config.oodb_host,
                             Config.oodb_port, Config.oodb_dbname,
                             Config.oodb_username, Config.oodb_passwd,
                             Config.oodb_schema)
    ws = OODBWorkspace.ws(Config.oodb_schema)
    if not ws.isInit():
        res = ws.init(con)
        logger.warning(res)
    # CONSISTENCY FIX: use the module logger instead of the root logger.
    logger.info(class_id)
    fc = ws.featureClass(class_id)
    feature = fc.createFeature('')
    # Placeholder unit-extent geometry in the PZ9011 spatial reference.
    geom = Polygon.fromExtent(Envelope(0.0, 0.0, 1.0, 1.0, SRFactory.PZ9011()))
    res = feature.setGeometry(geom)
    for attr in obj.findall('Attribute'):
        name = attr.get('name')
        value = attr.get('value')
        res &= feature.addAttribute(name, variantFromString(value))
    assert len(files) == 1
    file = files[0]
    # Renamed from `dir` to avoid shadowing the builtin.
    tmp_dir = TemporaryDirectory()
    with zipfile.ZipFile(file, 'r') as zip_ref:
        zip_ref.extractall(tmp_dir.name)
    fp = pathlib.Path(tmp_dir.name)
    for item in fp.glob('**/*'):
        if not item.is_file():
            continue
        fileVal = FileValue()
        key = uuid4().hex
        fileVal.fileName = variantToString(item.relative_to(tmp_dir.name))
        fileVal.key = variantToString(key)
        fileVal.bucket = variantToString(Config.s3_bucket)
        # 'c1000' is the file-attachment attribute code — TODO confirm.
        res &= feature.addAttribute('c1000', variantFromFileValue(fileVal))
        upload_file(str(item), key, Config.s3_bucket)
    ws.transaction()
    res = ws.save()
    ws.commit(f'Putobject from {params["to"]}')
    ws.clearData(True)


def get_data(params, files, url):
    """Handle a NEW_DATA_REQUEST: build a response archive and send it back
    to the requester over XML-RPC at ``url``.
    """
    date = datetime.datetime.now()
    rxmls = RequestXmlService()
    req = ET.fromstring(params['query_data'])
    req_id = rxmls.get_request_uuid(req)
    res_id = uuid4().hex
    res = rxmls.get_request_document(res_id, req_id)
    rxmls.set_result(res, 0, '')
    request_string = req.find('data').text
    OODBWorkspaceFactory.init('config/workspaces.json')
    conf = ResponseWorkerConfig('config/response2.json',
                                'config/workspaces.json')
    worker = ResponseWorker(conf)
    fn = uuid4().hex
    # Renamed from `dir` to avoid shadowing the builtin.
    out_dir = os.path.join(os.getcwd(), 'tmp', fn)
    os.makedirs(out_dir)
    worker.makeResponse(request_string, out_dir)
    # NOTE(review): base_dir='..' archives the *parent* of out_dir (the whole
    # tmp/ folder, including other responses). If only the worker output is
    # wanted this should likely be make_archive(out_dir, 'zip', out_dir) —
    # confirm intent before changing.
    make_archive(out_dir, 'zip', out_dir, '..')
    rxmls.set_result(res, 0, '')
    response_params = {
        'from': params['to'],
        'to': params['from'],
        'ts_added': date.timestamp(),
        'user_id': '1',
        'user_id_to': params['user_id'],
        'query_type': NEW_DATA_RESPONSE,
        'query_data': ET.tostring(res, encoding='unicode',
                                  xml_declaration=True)
    }
    filename = fn + '.zip'
    filepath = os.path.join(os.getcwd(), 'tmp', filename)
    response_files = [{'name': filename,
                       'url': filepath,
                       'size': os.path.getsize(filepath)}]
    proxy = ServerProxy(url)
    proxy.send(response_params, response_files, Config.ret_path)


def receive_data(params, files, url):
    """Handle a NEW_DATA_RESPONSE: queue all commits newer than the reported
    one for every active user of the sending node.
    """
    req = ET.fromstring(params['query_data'])
    commit_el = req.find('commit')
    commit_id = commit_el.get('id')
    schema = commit_el.get('schema')
    bnd = params['from'].replace('tcp://', '')
    con = OOConnectionParams(schema, Config.oodb_host, Config.oodb_port,
                             Config.oodb_dbname, Config.oodb_username,
                             Config.oodb_passwd, schema)
    ws = OODBWorkspace.ws(schema)
    if not ws.isInit():
        res = ws.init(con)
        logger.warning(res)
    schema_commits = ws.commits(ws.branch())
    logger.warning(schema_commits)
    if commit_id not in schema_commits:
        logger.warning(f'Error in commits in schema {schema}: no commit {commit_id}')
        return
    # Hoisted: the original computed this slice (and index lookup) twice.
    newer_commits = schema_commits[schema_commits.index(commit_id) + 1:]
    logger.warning(newer_commits)
    conn = db.connect_db()
    with Session(conn) as session:
        for commit in newer_commits:
            for user in session.query(db.User).filter(
                    db.User.bndname == bnd,
                    db.User.active == True).all():
                if user.bndname == Config.self_bnd:
                    continue
                profiles = {x.scheme: x.to_dict() for x in user.profiles}
                # No profiles means "subscribed to everything".
                if len(profiles) == 0 or schema in profiles:
                    item = db.Queue(user_id=user.id, commit_id=commit,
                                    schema=schema)
                    # CONSISTENCY FIX: module logger instead of root logger.
                    logger.warning(item)
                    session.add(item)
        session.commit()


def run_task(query_type, params, files, url):
    """Dispatch an incoming message onto the worker queue by its type code."""
    if query_type == NEW_DATA_REQUEST:
        tasks.put(lambda: get_data(params, files, url))
    if query_type == NEW_DATA_RESPONSE:
        tasks.put(lambda: receive_data(params, files, url))


def main():
    """Start the worker thread and serve HTTP via uvicorn until interrupted."""
    global connection
    global server
    global channel
    logger.setLevel(logging.INFO)
    logger.warning('Use Control-C to exit')
    thread = threading.Thread(target=run_tasks)
    thread.start()
    try:
        logger.warning('Start server')
        # NOTE(review): `app` is not defined in this module; it must be
        # imported/created elsewhere — confirm.
        uvicorn.run(app, host="0.0.0.0", port=8000)
    except KeyboardInterrupt:
        logger.warning('Exiting')
    finally:
        # BUG FIX: `server` is never assigned in this module (stays None), so
        # the unconditional server.server_close() raised AttributeError on
        # every shutdown path. Guard it.
        if server is not None:
            server.server_close()


def vers_key(e):
    """Sort key: an object's version."""
    return e.version()


if __name__ == '__main__':
    main()