import asyncio import pika import sys import threading import time from queue import Queue import datetime import json from uuid import uuid4, UUID from xmlrpc.server import SimpleXMLRPCServer from xmlrpc.client import ServerProxy import logging import os import os.path import psycopg import requests from reqs.graphql import get_catalog, get_object from pygost import gost34112012256 import xml.etree.ElementTree as ET from reqs.request_xml_service import RequestXmlService import zipfile from libcommon import * from libdatabase import * from libgeodata import * from libgeodriver import * from libgeodesy import * from libgeom import * from libipdutilities import * from liboodriver import * from config import Config from zip import Zip tasks = Queue() connected = False logger = logging.getLogger('xmlrpcserver') def run_tasks(): logger.debug('Task thread started.') while True: task = tasks.get() task() def replication_task(): return while not connected: time.sleep(1) date = datetime.datetime.now() rxmls = RequestXmlService() res_id = uuid4().hex res = rxmls.get_request_document(res_id, None) res.set('replication_package', '1') rxmls.set_result(res, 0, '') response_params = { 'from': f'tcp://{Config.self_bnd}', 'to': f'tcp://{Config.remote_bnd}', 'ts_added': date.timestamp(), 'user_id': '0', 'query_type': 1114, 'query_data': ET.tostring(res, encoding='unicode', xml_declaration=True) } filename = '51a8a2c81f774af7bba61b475b4b51b5' filepath = '/tmp/' + filename response_files = [{'name': filename, 'url': filepath, 'size': os.path.getsize(filepath)}] logging.debug('Send replication package') proxy = ServerProxy(Config.enserver) proxy.send(response_params, response_files, Config.ret_path) def pika_callback(ch, method, properties, body): date = datetime.datetime.now() rxmls = RequestXmlService() res_id = uuid4().hex res = rxmls.get_request_document(res_id, None) rxmls.set_result(res, 0, '') response_params = { 'from': f'tcp://{Config.self_bnd}', 'to': f'tcp://{Config.remote_bnd}', 'ts_added': date.timestamp(), 'user_id': '0', 'query_type': 99, 'query_data': ET.tostring(res, encoding='unicode', xml_declaration=True), } con = OOConnectionParams(Config.oodb_schema, Config.oodb_host, Config.oodb_port, Config.oodb_dbname, Config.oodb_username, Config.oodb_passwd, Config.oodb_schema) ws = OODBWorkspace.ws(Config.oodb_schema) ws.init(con) oe = OsmExporter(ws) # commit = '1de21737-09dc-4a45-b7d9-7b11044ee487' z = Zip() nc = oe.nextCommit(body) oe.exportChanges2osm(os.path.join(z.dirname, 'export.o5c'), body, nc) ws.close() filepath = z.pack() response_files = [{'name': os.path.basename(filepath), 'url': filepath, 'size': os.path.getsize(filepath)}] logger.warning(response_files) logging.debug('Send replication package') proxy = ServerProxy(Config.enserver) proxy.send(response_params, response_files, Config.ret_path) ch.basic_ack(delivery_tag=method.delivery_tag) def pika_task(): connection = pika.BlockingConnection(pika.URLParameters(Config.rabbit_conn)) channel = connection.channel() channel.basic_consume(queue=Config.rabbit_queue, on_message_callback=pika_callback) channel.start_consuming() def list_contents(dir_name): logger.warning('list_contents(%s)', dir_name) return os.listdir(dir_name) def aud_add(message): global connected if not isinstance(message, list): logger.warning(message) return 'OK' for item in message: logger.warning(item) if item.get('level', -1) == 0 and item.get('type', -1) == 4002: connected = True return 'OK' def auth_response(challenge, server_id, is_server): # logging.debug(f'Challenge: {challenge}, Server: {server_id}, IsServer: {is_server}') with psycopg.connect(f'host={Config.pg_host} port={Config.pg_port} dbname={Config.pg_dbname} ' f'user={Config.pg_username} password={Config.pg_password}') as conn: with conn.cursor() as cur: cur.execute('SELECT passwd FROM users where username = %s', (server_id,)) passwd = cur.fetchone() if not passwd: return {'error': True, 'response': 'Wrong user/bnd'} msg = '%s%s%s' % (challenge, server_id, passwd) response = gost34112012256.new(msg.encode('utf-8')[::-1]).digest()[::-1].hex() return {'error': False, 'response': response} def auth_challenge(): logging.debug('get challenge') return uuid4().hex def restore_uuid(oid): uuid = UUID(oid) return str(uuid) def load_catalog(params, files, url): logger.warning('load_catalog') date = datetime.datetime.now() rxmls = RequestXmlService() req = ET.fromstring(params['query_data']) req_id = rxmls.get_request_uuid(req) res_id = uuid4().hex res = rxmls.get_request_document(res_id, req_id) rxmls.set_result(res, 0, '') response_params = { 'from': params['to'], 'to': params['from'], 'ts_added': date.timestamp(), 'user_id': '1', 'user_id_to': params['user_id'], 'query_type': 1004, 'query_data': ET.tostring(res, encoding='unicode', xml_declaration=True) } catalog = get_catalog() logging.debug('Catalog_loaded') filename = uuid4().hex filepath = '/tmp/' + filename zipf = zipfile.ZipFile(filepath, "w") zipf.writestr('WF.CLL', catalog) zipf.close() response_files = [{'name': filename, 'url': filepath, 'size': os.path.getsize(filepath)}] proxy = ServerProxy(url) proxy.send(response_params, response_files, Config.ret_path) def get_objects(params, files, url): date = datetime.datetime.now() rxmls = RequestXmlService() req = ET.fromstring(params['query_data']) req_id = rxmls.get_request_uuid(req) res_id = uuid4().hex res = rxmls.get_request_document(res_id, req_id) rxmls.set_result(res, 0, '') objs = req.find('objects') uids = [restore_uuid(x.get('object_id')) for x in objs.findall('object')] rxmls.set_result(res, 0, '') response_params = { 'from': params['to'], 'to': params['from'], 'ts_added': date.timestamp(), 'user_id': '1', 'user_id_to': params['user_id'], 'query_type': 1001, 'query_data': ET.tostring(res, encoding='unicode', xml_declaration=True) } filename = uuid4().hex filepath = '/tmp/' + filename zipf = zipfile.ZipFile(filepath, "w") main_filename = None for uid in uids: obj = json.loads(get_object(uid)) for file in obj['properties'].get('c1000', []): if not main_filename: main_filename = file['fileName'] res = requests.get(f'https://gql.ivazh.ru/item/{file["key"]}') zipf.writestr(f'{main_filename}/{file["fileName"]}', res.content) zipf.close() response_files = [{'name': filename, 'url': filepath, 'size': os.path.getsize(filepath)}] proxy = ServerProxy(url) proxy.send(response_params, response_files, Config.ret_path) def get_metadata(params, files, url): date = datetime.datetime.now() rxmls = RequestXmlService() req = ET.fromstring(params['query_data']) req_id = rxmls.get_request_uuid(req) res_id = uuid4().hex res = rxmls.get_request_document(res_id, req_id) rxmls.set_result(res, 0, '') objs = req.find('getMetadataByIds') uids = [restore_uuid(x.get('id')) for x in objs.findall('chart')] rxmls.set_result(res, 0, '') response_params = { 'from': params['to'], 'to': params['from'], 'ts_added': date.timestamp(), 'user_id': '1', 'user_id_to': params['user_id'], 'query_type': 1024, 'query_data': ET.tostring(res, encoding='unicode', xml_declaration=True) } filename = uuid4().hex filepath = '/tmp/' + filename zipf = zipfile.ZipFile(filepath, "w") content = ET.Element('getMetadataResponse') for uid in uids: obj = json.loads(get_object(uid)) date = datetime.datetime.fromisoformat(obj['date_updated']) chart = ET.SubElement(content, 'chart', { 'id': UUID(obj['uid']).hex, 'updated': str(date.timestamp()), }) for key in obj['properties']: if not key.startswith('c'): continue mdel = ET.SubElement(chart, 'mdItem', { 'code': key.replace('_', '.'), 'name': key, 'value': str(obj['properties'].get(key, '')), 'isBase': 'false', 'groupId': '', 'groupName': '', }) zipf.writestr(f'metadata.xml', ET.tostring(content, encoding='unicode', xml_declaration=True)) zipf.close() response_files = [{'name': filename, 'url': filepath, 'size': os.path.getsize(filepath)}] proxy = ServerProxy(url) proxy.send(response_params, response_files, Config.ret_path) def run_task(query_type, params, files, url): if query_type == 4: tasks.put(lambda: load_catalog(params, files, url)) if query_type == 1: tasks.put(lambda: get_objects(params, files, url)) if query_type == 24: tasks.put(lambda: get_metadata(params, files, url)) def accept(params, files, url): print(params, files, url) logger.warning('Accept: ' + json.dumps(params, ensure_ascii=False)) run_task(params['query_type'], params, files, url) return True def onSent(params, files, callback_url): logger.debug('OnSent') logger.warning(params) def onDelivered(params, files, callback_url): logger.warning('onDelivered') def main(): logger.setLevel(logging.INFO) logger.warning('Use Control-C to exit') server = SimpleXMLRPCServer(('0.0.0.0', 9000), logRequests=False, allow_none=True) server.register_function(list_contents) server.register_function(aud_add) server.register_function(auth_response) server.register_function(auth_challenge) server.register_function(accept) server.register_function(onSent) server.register_function(onDelivered) thread = threading.Thread(target=run_tasks) thread.start() replication_thread = threading.Thread(target=replication_task) replication_thread.start() pika_thread = threading.Thread(target=pika_task) pika_thread.start() try: logger.warning('Start server') server.serve_forever() except KeyboardInterrupt: logger.warning('Exiting') def test(): #params = {"from": "tcp://kptsp_vb", "query_data": "
", "query_type": 24, "to": "tcp://bnd127", "user_id": "3302", "ts_added": 1679825320.653038} #files = [] #url = 'http://127.0.0.1:7000/xmlrpc' # accept(params, files, url) #get_metadata(params, files, url) #get_catalog() auth_response('123', 'bnd127', False) pass if __name__ == '__main__': main() # test()