"""Bridge between an XML-RPC endpoint, RabbitMQ and a small FastAPI web API.

When run as a script, :func:`main` starts three things in one process:

* a ``SimpleXMLRPCServer`` on port 9000 (:func:`xmlrpc_task`),
* a RabbitMQ consumer on ``Config.rabbit_incoming_queue`` (:func:`pika_task`),
* a FastAPI application served by uvicorn on port 8000.

File payloads are exchanged through S3-compatible storage configured in
``Config``.
"""
import datetime
import io
import json
import logging
import os.path
import pathlib
import threading
import time
import xml.etree.ElementTree as ET
import zipfile
import zlib
from queue import Queue
from shutil import make_archive
from tempfile import TemporaryDirectory, gettempdir
from typing import Optional, Any
from uuid import uuid4, UUID
from xmlrpc.client import ServerProxy
from xmlrpc.server import SimpleXMLRPCServer

import boto3
import pika
import requests
import uvicorn
from fastapi import FastAPI, Response, Form, UploadFile, File, Request
from fastapi.middleware.cors import CORSMiddleware
from pika.channel import Channel
from pygost import gost34112012256
from sqlalchemy.orm import Session
from typing_extensions import Annotated, Union

import xmlrpcserver.db as db
from .config import Config
from .reqs.request_xml_service import RequestXmlService
from .zip import Zip

# Numeric codes placed in the ``query_type`` field of outgoing requests.
NEW_REPLICATION_REQUEST = 99
NEW_COMMIT_REQUEST = 98
NEW_COMMIT_RESPONSE = 1098
NEW_DATA_REQUEST = 97
NEW_DATA_RESPONSE = 1097

# Names reported via bnd_connected() and not yet via bnd_disconnected().
connected = set()

server: Optional[SimpleXMLRPCServer] = None
logger = logging.getLogger('xmlrpcserver')

# NOTE(review): ``connection`` is created in main() and then used both by the
# consumer thread (pika_task) and, through ``channel_send``, by the XML-RPC
# thread. pika documents BlockingConnection as not thread-safe - confirm this
# sharing is acceptable or move publishing onto the connection's own thread.
connection: Optional[pika.BlockingConnection] = None
channel_send: Optional[Any] = None
channel_receive: Optional[Any] = None


def s3_connection():
    """Return a new boto3 S3 client built from the ``Config.s3_*`` settings."""
    return boto3.client(
        's3',
        endpoint_url=Config.s3_endpoint,
        aws_access_key_id=Config.s3_key_id,
        aws_secret_access_key=Config.s3_access_key,
    )


def download_file(key: str, bucket: str, filename: str):
    """Download object ``key`` from ``bucket`` into local file ``filename``.

    The body is streamed to disk in 4 KiB chunks.
    """
    client = s3_connection()
    obj = client.get_object(Bucket=bucket, Key=key)
    with open(filename, 'wb') as f:
        for chunk in obj['Body'].iter_chunks(chunk_size=4096):
            f.write(chunk)


def upload_file(filename: str, key: str, bucket: str):
    """Upload local file ``filename`` to ``bucket`` under ``key``."""
    client = s3_connection()
    with open(filename, 'rb') as f:
        # Pass the open file so the payload is streamed rather than read
        # fully into memory first.
        client.put_object(Body=f, Bucket=bucket, Key=key)


def pika_callback(ch, method, properties, body):
    """Handle one message from the incoming RabbitMQ queue.

    The JSON body carries ``params``, ``url`` and ``files``. Every file is
    downloaded from S3 into the system temp directory, its ``url`` entry is
    replaced by the local path, and the result is forwarded with
    ``ServerProxy(url).send(params, files, Config.ret_path)``.

    The message is always acknowledged. A processing failure is logged and
    not re-raised, so one bad message does not stop the consumer.
    """
    try:
        data = json.loads(body)
        params = data['params']
        url = data['url']
        files = []
        for file in data['files']:
            fn = os.path.join(gettempdir(), uuid4().hex)
            download_file(file['url']['name'], file['url']['bucket'], fn)
            file['url'] = fn
            files.append(file)
        proxy = ServerProxy(url)
        proxy.send(params, files, Config.ret_path)
    except Exception:
        logger.exception('Failed to process incoming message')
    finally:
        ch.basic_ack(delivery_tag=method.delivery_tag)


def pika_task():
    """Consume ``Config.rabbit_incoming_queue`` until consuming stops.

    Opens a channel on the module-level ``connection`` and blocks in
    ``start_consuming``; intended to run in its own thread.
    """
    global channel_receive
    channel_receive = connection.channel()
    channel_receive.basic_consume(queue=Config.rabbit_incoming_queue,
                                  on_message_callback=pika_callback)
    channel_receive.start_consuming()


def aud_add(message):
    """Log ``message`` (or each item of it, when it is a list) at WARNING.

    Returns:
        The string ``'OK'``.
    """
    if not isinstance(message, list):
        logger.warning(message)
        return 'OK'
    for item in message:
        logger.warning(item)
    return 'OK'


def auth_response(challenge, server_id, is_server):
    """Compute the authentication response for ``server_id``.

    Looks up the user whose ``username`` equals ``server_id`` and hashes
    ``challenge + server_id + passwd`` with ``gost34112012256``. The UTF-8
    input bytes and the resulting digest are both byte-reversed.

    Args:
        challenge: Challenge string to answer.
        server_id: User name to look up.
        is_server: Unused by this function.

    Returns:
        ``{'error': False, 'response': <hex digest>}`` on success, otherwise
        ``{'error': True, 'response': 'Wrong user/bnd'}``.
    """
    conn = db.connect_db()
    with Session(conn) as session:
        try:
            user = session.query(db.User).filter((db.User.username == server_id)).one()
            passwd = user.passwd
            msg = f'{challenge}{server_id}{passwd}'
            response = gost34112012256.new(msg.encode('utf-8')[::-1]).digest()[::-1].hex()
            session.commit()
            return {'error': False, 'response': response}
        except Exception:
            # Deliberately broad: any failure is reported to the caller as an
            # authentication error, but keep the cause in the log.
            logger.warning('auth_response failed for %s', server_id, exc_info=True)
            return {'error': True, 'response': 'Wrong user/bnd'}


def auth_challenge():
    """Return a fresh random challenge (32 hex characters)."""
    logging.debug('get challenge')
    return uuid4().hex


def put_object(params, files, url):
    """Create a feature from ``params['query_data']`` and attach uploaded files.

    ``query_data`` is XML containing a ``chart`` element. Its ``Class``
    attribute selects the feature class and its ``Attribute`` children
    (``name``/``value``) become feature attributes. The single archive in
    ``files`` is unpacked; every contained file is uploaded to
    ``Config.s3_bucket_itv`` under a random key and referenced from the
    feature through attribute ``c1000``. The workspace is then saved and
    committed.

    Args:
        params: Request dictionary; ``query_data`` and ``to`` are read.
        files: List with exactly one dict whose ``'url'`` entry is a ZIP
            archive, given as anything ``zipfile.ZipFile`` accepts (a path
            or a seekable binary file object).
        url: Unused.

    Raises:
        ValueError: If ``files`` does not hold exactly one entry or the XML
            has no ``chart`` element.
    """
    # NOTE(review): OOConnectionParams, OODBWorkspace, Polygon, Envelope,
    # SRFactory, FileValue, variantFromString, variantToString and
    # variantFromFileValue are not imported anywhere in this file; the
    # required import must be added before this function can run.
    if len(files) != 1:
        raise ValueError(f'put_object expects exactly one file, got {len(files)}')
    req = ET.fromstring(params['query_data'])
    obj = req.find('chart')
    if obj is None:
        raise ValueError("query_data has no 'chart' element")
    class_id = obj.get('Class')

    con = OOConnectionParams(Config.oodb_schema, Config.oodb_host, Config.oodb_port,
                             Config.oodb_dbname, Config.oodb_username, Config.oodb_passwd,
                             Config.oodb_schema)
    ws = OODBWorkspace.ws(Config.oodb_schema)
    if not ws.isInit():
        res = ws.init(con)
        logger.warning(res)
    logging.info(class_id)

    fc = ws.featureClass(class_id)
    feature = fc.createFeature('')
    geom = Polygon.fromExtent(Envelope(0.0, 0.0, 1.0, 1.0, SRFactory.PZ9011()))
    res = feature.setGeometry(geom)
    for attr in obj.findall('Attribute'):
        name = attr.get('name')
        value = attr.get('value')
        res &= feature.addAttribute(name, variantFromString(value))

    archive = files[0]['url']
    # The context manager removes the extracted files once they are uploaded.
    with TemporaryDirectory() as tmp_dir:
        with zipfile.ZipFile(archive, 'r') as zip_ref:
            zip_ref.extractall(tmp_dir)
        for item in pathlib.Path(tmp_dir).glob('**/*'):
            if not item.is_file():
                continue
            fileVal = FileValue()
            key = uuid4().hex
            fileVal.fileName = variantToString(item.relative_to(tmp_dir))
            fileVal.key = variantToString(key)
            fileVal.bucket = variantToString(Config.s3_bucket_itv)
            res &= feature.addAttribute('c1000', variantFromFileValue(fileVal))
            upload_file(str(item), key, Config.s3_bucket_itv)

    ws.transaction()
    res = ws.save()
    ws.commit(f'Putobject from {params["to"]}')
    ws.clearData(True)


def accept(params, files, url):
    """Upload the request's files to S3 and queue the request on RabbitMQ.

    Each entry of ``params['files']`` is uploaded to ``Config.s3_bucket``
    under a random key and its ``url`` is replaced by
    ``{'name': key, 'bucket': Config.s3_bucket}``. A JSON message with
    ``params``, ``files`` and ``url`` is then published to
    ``Config.rabbit_send_exchange``.

    Returns:
        ``True``.
    """
    # NOTE(review): the ``files`` argument is ignored and ``params['files']``
    # is read instead - confirm callers really put the list inside params.
    files_s3 = []
    for file in params['files']:
        fn = uuid4().hex
        upload_file(file['url'], fn, Config.s3_bucket)
        file['url'] = {'name': fn, 'bucket': Config.s3_bucket}
        files_s3.append(file)
    data = {
        'params': params,
        'files': files_s3,
        'url': url,
    }
    # routing_key is a required argument of basic_publish.
    channel_send.basic_publish(exchange=Config.rabbit_send_exchange,
                               routing_key='',
                               body=json.dumps(data))
    logger.warning('Accept: ' + json.dumps(params, ensure_ascii=False))
    return True


def onSent(params, files, callback_url):
    """XML-RPC callback: log ``params``; nothing else is done."""
    logger.debug('OnSent')
    logger.warning(params)


def onDelivered(params, files, callback_url):
    """XML-RPC callback: log the event; nothing else is done."""
    logger.warning('onDelivered')


def bnd_connected(bnd_name: str):
    """Publish a ``connected`` status for ``bnd_name`` and remember it."""
    logger.warning(f'{bnd_name} connected')
    # NOTE(review): the setting is named ``rabbit_status_queue`` but is used
    # as an exchange name - confirm against the broker topology.
    channel_send.basic_publish(exchange=Config.rabbit_status_queue,
                               routing_key='',
                               body=json.dumps({
                                   'bnd_name': bnd_name,
                                   'status': 'connected'
                               }))
    connected.add(bnd_name)


def bnd_disconnected(bnd_name: str):
    """Publish a ``disconnected`` status for ``bnd_name`` and forget it."""
    logger.warning(f'{bnd_name} disconnected')
    # NOTE(review): the setting is named ``rabbit_status_queue`` but is used
    # as an exchange name - confirm against the broker topology.
    channel_send.basic_publish(exchange=Config.rabbit_status_queue,
                               routing_key='',
                               body=json.dumps({
                                   'bnd_name': bnd_name,
                                   'status': 'disconnected'
                               }))
    connected.discard(bnd_name)


def xmlrpc_task():
    """Create the XML-RPC server on 0.0.0.0:9000 and serve until stopped.

    Stores the server in the module-level ``server`` so main() can close it.
    """
    global server
    server = SimpleXMLRPCServer(('0.0.0.0', 9000), logRequests=True, allow_none=True)
    for handler in (aud_add, auth_response, auth_challenge, accept,
                    onSent, onDelivered, bnd_connected, bnd_disconnected):
        server.register_function(handler)
    server.serve_forever()


app = FastAPI()
app.add_middleware(CORSMiddleware,
                   allow_origins=['http://10.10.8.79:3000'],
                   allow_credentials=True,
                   allow_methods=['*'],
                   allow_headers=['*']
                   )


@app.get("/")
async def fa_root():
    """Return a fixed greeting."""
    return {"message": "Hello World"}


@app.get("/login")
async def fa_login_get(response: Response):
    """Set fixed ``sessionid``/``csrftoken`` cookies and report success."""
    response.set_cookie(key='sessionid', value='87654321')
    response.set_cookie(key='csrftoken', value='12345678')
    return {"login": "Ok", "success": True}


@app.post("/login")
async def fa_login(response: Response):
    """Set fixed ``sessionid``/``csrftoken`` cookies and report success."""
    logger.warning('login action')
    response.set_cookie(key='sessionid', value='87654321')
    response.set_cookie(key='csrftoken', value='12345678')
    return {"login": "Ok", "success": True}


@app.post('/webapi')
async def fa_webapi(request: Request):
    """Return one of two fixed XML-typed bodies.

    The body is ``'success1'`` when the request body contains ``has_perm``
    and ``'1'`` otherwise.
    """
    body = await request.body()
    logger.warning('webapi')
    if 'has_perm' in body.decode('utf-8'):
        return Response(content='success1', media_type='application/xml')
    return Response(content='1', media_type='application/xml')


@app.post("/api/easo/PutObject")
async def fa_put_object(response: Response,
                        object_attrs: Annotated[str, Form()],
                        object_file: Annotated[UploadFile, File()]
                        ):
    """Accept an attribute XML plus a ZIP upload and store it via put_object."""
    date = datetime.datetime.now()
    # Hand over the uploaded file object itself: an upload is not guaranteed
    # to have a usable filesystem path, while zipfile can read from any
    # seekable binary file object.
    files = [{'name': object_file.filename, 'url': object_file.file, 'size': object_file.size}]
    request_params = {
        'from': '',
        'to': '',
        'ts_added': date.timestamp(),
        'user_id': '0',
        'user_id_to': '0',
        'query_type': 6,
        'query_data': object_attrs,
    }
    put_object(request_params, files, None)
    return {"Upload": "Ok"}


@app.get("/cr")
async def correction_replication(bnd_name: str, schema: str):
    """Send a ``NEW_COMMIT_REQUEST`` for ``schema`` to ``bnd_name``.

    Builds a request document with a ``currentCommit`` element and sends it
    through the XML-RPC proxy at ``Config.enserver``. A send failure is
    logged and not propagated.
    """
    date = datetime.datetime.now()
    rxmls = RequestXmlService()
    res_id = uuid4().hex
    res = rxmls.get_request_document(res_id, None)
    ET.SubElement(res, 'currentCommit', {'scheme': schema})
    params = {
        'from': f'tcp://{Config.self_bnd}',
        'to': f'tcp://{bnd_name}',
        'ts_added': date.timestamp(),
        'user_id': '0',
        'query_type': NEW_COMMIT_REQUEST,
        'query_data': ET.tostring(res, encoding='unicode', xml_declaration=True),
    }
    proxy = ServerProxy(Config.enserver)
    try:
        proxy.send(params, [], Config.ret_path)
    except Exception:
        # Best effort, but keep the traceback and let KeyboardInterrupt /
        # SystemExit through.
        logger.exception('Error sending')


def main():
    """Connect to RabbitMQ, start the worker threads and run the web API.

    Blocks in ``uvicorn.run`` until the HTTP server stops, then closes the
    XML-RPC server socket.
    """
    global connection
    global channel_send
    logger.setLevel(logging.INFO)
    logger.warning('Use Control-C to exit')
    connection = pika.BlockingConnection(pika.URLParameters(Config.rabbit_conn))
    channel_send = connection.channel()
    # Daemon threads: both targets block forever, so non-daemon threads would
    # keep the process alive after the HTTP server has stopped.
    xmlrpc_thread = threading.Thread(target=xmlrpc_task, daemon=True)
    xmlrpc_thread.start()
    pika_thread = threading.Thread(target=pika_task, daemon=True)
    pika_thread.start()
    try:
        logger.warning('Start server')
        uvicorn.run(app, host="0.0.0.0", port=8000)
    except KeyboardInterrupt:
        logger.warning('Exiting')
    finally:
        # ``server`` stays None when the XML-RPC thread failed to create it.
        if server is not None:
            server.server_close()


def vers_key(e):
    """Sort key: return ``e.version()``."""
    return e.version()


if __name__ == '__main__':
    main()