from tempfile import TemporaryDirectory
import pika
import threading
import time
from queue import Queue
import datetime
import json
from uuid import uuid4, UUID
from xmlrpc.server import SimpleXMLRPCServer
from xmlrpc.client import ServerProxy
import logging
import os
import os.path
import requests
from reqs.graphql import get_catalog, get_object
from pygost import gost34112012256
import xml.etree.ElementTree as ET
from reqs.request_xml_service import RequestXmlService
import zipfile
from infra import *
from config import Config
from zip import Zip
import boto3
import db
from sqlalchemy.orm import Session

NEW_REPLICATION_REQUEST = 99

tasks = Queue()
connected = set()
logger = logging.getLogger('xmlrpcserver')


def s3_connection():
    return boto3.client('s3',
                        endpoint_url=Config.s3_endpoint,
                        aws_access_key_id=Config.s3_key_id,
                        aws_secret_access_key=Config.s3_access_key)


def download_file(key: str, bucket: str, filename: str):
    client = s3_connection()
    obj = client.get_object(Bucket=bucket, Key=key)
    with open(filename, 'wb') as f:
        for chunk in obj['Body'].iter_chunks(chunk_size=4096):
            f.write(chunk)


def upload_file(filename: str, key: str, bucket: str):
    client = s3_connection()
    with open(filename, 'rb') as f:
        client.put_object(Body=f.read(), Bucket=bucket, Key=key)


def run_tasks():
    logger.debug('Task thread started.')
    while True:
        task = tasks.get()
        task()


def replication_task():
    while True:
        conn = db.connect_db()
        with Session(conn) as session:
            # Iterate over a snapshot: the XML-RPC thread may modify `connected` concurrently.
            for bndname in list(connected):
                for item in session.query(db.Queue).join(db.Queue.user).filter_by(bndname=bndname).all():
                    if item.user.is_active_now():
                        replication(bndname, item.commit_id, item.schema)
                        session.delete(item)
                        session.commit()
        time.sleep(60)
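
# replication() builds an outgoing replication package for a single commit:
# the changeset as OSM data (export.o5c), tiles (export.mbtiles), a manifest of
# attached files (export_files.json) and the attached files themselves, stored
# under their S3 keys. A manifest entry looks roughly like
#   {"key": "<s3 key>", "bucket": "<s3 bucket>", "filename": "<original name>"}
# (illustrative values). The packed zip is handed to the transport via
# ServerProxy(Config.enserver).send().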

def replication(bnd_name: str, commit_id: str, schema: str):
    date = datetime.datetime.now()
    rxmls = RequestXmlService()
    res_id = uuid4().hex
    res = rxmls.get_request_document(res_id, None)
    rxmls.set_result(res, 0, '')
    ET.SubElement(res, 'replication', {'id': commit_id, 'scheme': schema})
    response_params = {
        'from': f'tcp://{Config.self_bnd}',
        'to': f'tcp://{Config.remote_bnd}',
        'ts_added': date.timestamp(),
        'user_id': '0',
        'query_type': NEW_REPLICATION_REQUEST,
        'query_data': ET.tostring(res, encoding='unicode', xml_declaration=True),
    }
    con = OOConnectionParams(schema, Config.oodb_host, Config.oodb_port, Config.oodb_dbname,
                             Config.oodb_username, Config.oodb_passwd, schema)
    ws = OODBWorkspace.ws(schema)
    if not ws.isInit():
        res = ws.init(con)
        logger.warning(res)
    oe = IpdExporter(ws)
    # commit = {"schema": "kartap", "commit": "55a01cf5-c27c-40be-b771-dc0b16c1878b"}
    # commit = {"schema": "kartap", "commit": "76405109-db79-4225-b885-fdb00cfc53a6"}
    # commit = {"schema": "gcmr", "commit": "9ec2202e-0991-4695-8b0f-33fe28ea8160"}
    # commit = {"schema": "npd_data", "commit": "f42afacd-483a-4d8f-bccb-74d65d45378f"}
    z = Zip()
    pc = oe.previousCommit(commit_id)
    oe.exportChanges2osm(os.path.join(z.dirname, 'export.o5c'), pc, commit_id)
    oe.exportChanges2mbtiles(os.path.join(z.dirname, 'export.mbtiles'), pc, commit_id)
    created, updated, deleted = ws.changes(commit_id)
    # Updated features: collect file attachments (attribute 'c1000') that actually
    # differ between the two latest versions of the feature.
    qu = GroupQuery(Envelope())
    qu.setUids(updated)
    qu.setLoadArch(True)
    uids = []
    ws.clearData(True)
    ws.load(qu, uids)
    updated_files = []
    for feature_uid in uids:
        not_files = True
        vers = ws.featureVersion(feature_uid)
        if len(vers) > 1:
            vers.sort(key=vers_key)
            not_files = vers[0].feature().isEqual(vers[1].feature(), ["c1000"])
        if not not_files:
            for attr in vers[0].feature().attributes('c1000'):
                updated_files.append(variantToFileValue(attr.val()))
                # print(f'key: {file_val.key} bucket: {file_val.bucket} fileName:{file_val.fileName}')
    # Created features: every attached file goes into the package.
    ws.clearData(True)
    qu = GroupQuery(Envelope())
    qu.setUids(created)
    ws.load(qu, uids)
    exported_files = []
    for feature_uid in uids:
        feature = ws.featureByUid(feature_uid)
        if not feature:
            continue
        for attr in feature.attributes('c1000'):
            updated_files.append(variantToFileValue(attr.val()))
            # updated_files.append(feature_uid)
    for x in updated_files:
        exported_files.append({
            'key': x.key,
            'bucket': x.bucket,
            'filename': x.fileName,
        })
        download_file(x.key, x.bucket, os.path.join(z.dirname, x.key))
    with open(os.path.join(z.dirname, 'export_files.json'), 'w') as f:
        f.write(json.dumps(exported_files))
    ws.clearData(True)
    # ws.close()
    filepath = z.pack()
    response_files = [{'name': os.path.basename(filepath), 'url': filepath, 'size': os.path.getsize(filepath)}]
    logger.warning(response_files)
    logging.debug('Send replication package')
    proxy = ServerProxy(Config.enserver)
    try:
        proxy.send(response_params, response_files, Config.ret_path)
    except Exception:
        logger.exception('Error sending')


def pika_callback(ch, method, properties, body):
    commit_info = json.loads(body)
    schema = commit_info.get('schema') or Config.oodb_schema
    commit = commit_info['commit']
    conn = db.connect_db()
    with Session(conn) as session:
        # Queue the commit for every active downstream user.
        for user in session.query(db.User).filter(db.User.active == True, db.User.upstream == False).all():
            item = db.Queue(user_id=user.id, commit_id=commit, schema=schema)
            session.add(item)
        session.commit()
    ch.basic_ack(delivery_tag=method.delivery_tag)


def pika_task():
    connection = pika.BlockingConnection(pika.URLParameters(Config.rabbit_conn))
    channel = connection.channel()
    channel.basic_consume(queue=Config.rabbit_queue, on_message_callback=pika_callback)
    channel.start_consuming()


def list_contents(dir_name):
    logger.warning('list_contents(%s)', dir_name)
    return os.listdir(dir_name)


def aud_add(message):
    if not isinstance(message, list):
        logger.warning(message)
        return 'OK'
    for item in message:
        logger.warning(item)
    return 'OK'


def auth_response(challenge, server_id, is_server):
    # logging.debug(f'Challenge: {challenge}, Server: {server_id}, IsServer: {is_server}')
    conn = db.connect_db()
    with Session(conn) as session:
        try:
            user = session.query(db.User).filter(db.User.username == server_id).one()
            passwd = user.passwd
            msg = '%s%s%s' % (challenge, server_id, passwd)
            response = gost34112012256.new(msg.encode('utf-8')[::-1]).digest()[::-1].hex()
            session.commit()
            return {'error': False, 'response': response}
        except Exception:
            return {'error': True, 'response': 'Wrong user/bnd'}


def auth_challenge():
    logging.debug('get challenge')
    return uuid4().hex


def restore_uuid(oid):
    uuid = UUID(oid)
    return str(uuid)
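
# The handlers below answer requests arriving through accept()/run_task(): each
# one parses params['query_data'] (an XML request document), builds a response
# document via RequestXmlService, packs the payload into a zip under /tmp and
# hands both to ServerProxy(url).send(). Incoming/outgoing query_type pairs:
# 4 -> 1004 (catalog), 1 -> 1001 (objects), 24 -> 1024 (metadata);
# NEW_REPLICATION_REQUEST (99) applies a replication package and sends no reply.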

def load_catalog(params, files, url):
    logger.warning('load_catalog')
    date = datetime.datetime.now()
    rxmls = RequestXmlService()
    req = ET.fromstring(params['query_data'])
    req_id = rxmls.get_request_uuid(req)
    res_id = uuid4().hex
    res = rxmls.get_request_document(res_id, req_id)
    rxmls.set_result(res, 0, '')
    response_params = {
        'from': params['to'],
        'to': params['from'],
        'ts_added': date.timestamp(),
        'user_id': '1',
        'user_id_to': params['user_id'],
        'query_type': 1004,
        'query_data': ET.tostring(res, encoding='unicode', xml_declaration=True)
    }
    catalog = get_catalog()
    logging.debug('Catalog_loaded')
    filename = uuid4().hex
    filepath = '/tmp/' + filename
    zipf = zipfile.ZipFile(filepath, "w")
    zipf.writestr('WF.CLL', catalog)
    zipf.close()
    response_files = [{'name': filename, 'url': filepath, 'size': os.path.getsize(filepath)}]
    proxy = ServerProxy(url)
    proxy.send(response_params, response_files, Config.ret_path)


def get_objects(params, files, url):
    date = datetime.datetime.now()
    rxmls = RequestXmlService()
    req = ET.fromstring(params['query_data'])
    req_id = rxmls.get_request_uuid(req)
    res_id = uuid4().hex
    res = rxmls.get_request_document(res_id, req_id)
    rxmls.set_result(res, 0, '')
    objs = req.find('objects')
    uids = [restore_uuid(x.get('object_id')) for x in objs.findall('object')]
    rxmls.set_result(res, 0, '')
    response_params = {
        'from': params['to'],
        'to': params['from'],
        'ts_added': date.timestamp(),
        'user_id': '1',
        'user_id_to': params['user_id'],
        'query_type': 1001,
        'query_data': ET.tostring(res, encoding='unicode', xml_declaration=True)
    }
    filename = uuid4().hex
    filepath = '/tmp/' + filename
    zipf = zipfile.ZipFile(filepath, "w")
    main_filename = None
    for uid in uids:
        obj = json.loads(get_object(uid))
        for file in obj['properties'].get('c1000', []):
            if not main_filename:
                main_filename = file['fileName']
            res = requests.get(f'https://gql.ivazh.ru/item/{file["key"]}')
            zipf.writestr(f'{main_filename}/{file["fileName"]}', res.content)
    zipf.close()
    response_files = [{'name': filename, 'url': filepath, 'size': os.path.getsize(filepath)}]
    proxy = ServerProxy(url)
    proxy.send(response_params, response_files, Config.ret_path)


def get_metadata(params, files, url):
    date = datetime.datetime.now()
    rxmls = RequestXmlService()
    req = ET.fromstring(params['query_data'])
    req_id = rxmls.get_request_uuid(req)
    res_id = uuid4().hex
    res = rxmls.get_request_document(res_id, req_id)
    rxmls.set_result(res, 0, '')
    objs = req.find('getMetadataByIds')
    uids = [restore_uuid(x.get('id')) for x in objs.findall('chart')]
    rxmls.set_result(res, 0, '')
    response_params = {
        'from': params['to'],
        'to': params['from'],
        'ts_added': date.timestamp(),
        'user_id': '1',
        'user_id_to': params['user_id'],
        'query_type': 1024,
        'query_data': ET.tostring(res, encoding='unicode', xml_declaration=True)
    }
    filename = uuid4().hex
    filepath = '/tmp/' + filename
    zipf = zipfile.ZipFile(filepath, "w")
    content = ET.Element('getMetadataResponse')
    for uid in uids:
        obj = json.loads(get_object(uid))
        date = datetime.datetime.fromisoformat(obj['date_updated'])
        chart = ET.SubElement(content, 'chart', {
            'id': UUID(obj['uid']).hex,
            'updated': str(date.timestamp()),
        })
        for key in obj['properties']:
            if not key.startswith('c'):
                continue
            ET.SubElement(chart, 'mdItem', {
                'code': key.replace('_', '.'),
                'name': key,
                'value': str(obj['properties'].get(key, '')),
                'isBase': 'false',
                'groupId': '',
                'groupName': '',
            })
    zipf.writestr('metadata.xml', ET.tostring(content, encoding='unicode', xml_declaration=True))
    zipf.close()
    response_files = [{'name': filename, 'url': filepath, 'size': os.path.getsize(filepath)}]
    proxy = ServerProxy(url)
    proxy.send(response_params, response_files, Config.ret_path)
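
# apply_commits() is the receiving side of replication(): it unpacks the package,
# imports the OSM and mbtiles changesets into the local workspace and re-uploads
# the attached files listed in export_files.json to the local S3 storage.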

def apply_commits(params, files, url):
    logger.warning('%s %s %s', params, files, url)
    assert len(files) == 1
    file = files[0]
    dir = TemporaryDirectory()
    with zipfile.ZipFile(file, 'r') as zip_ref:
        zip_ref.extractall(dir.name)
    req = ET.fromstring(params['query_data'])
    repl = req.find('replication')
    scheme = repl.get('scheme')
    commit = repl.get('id')
    logger.warning('scheme=%s commit=%s', scheme, commit)
    con = OOConnectionParams(scheme, Config.oodb_host, Config.oodb_port, Config.oodb_dbname,
                             Config.oodb_username, Config.oodb_passwd, scheme)
    ws = OODBWorkspace.ws(scheme)
    if not ws.isInit():
        res = ws.init(con)
        logger.warning(res)
    oe = IpdExporter(ws)
    oe.improtFromOsm(os.path.join(dir.name, 'export.o5c'))
    oe.improtFromMbtiles(os.path.join(dir.name, 'export.mbtiles'))
    # The manifest and the attached files sit in the extracted package;
    # replication() stores every attachment under its S3 key.
    with open(os.path.join(dir.name, 'export_files.json'), 'r') as f:
        files_data = json.load(f)
    for file_data in files_data:
        upload_file(os.path.join(dir.name, file_data['key']), file_data['key'], file_data['bucket'])


def run_task(query_type, params, files, url):
    if query_type == 4:
        tasks.put(lambda: load_catalog(params, files, url))
    if query_type == 1:
        tasks.put(lambda: get_objects(params, files, url))
    if query_type == 24:
        tasks.put(lambda: get_metadata(params, files, url))
    if query_type == NEW_REPLICATION_REQUEST:
        tasks.put(lambda: apply_commits(params, files, url))


def accept(params, files, url):
    print(params, files, url)
    logger.warning('Accept: ' + json.dumps(params, ensure_ascii=False))
    run_task(params['query_type'], params, files, url)
    return True


def onSent(params, files, callback_url):
    logger.debug('OnSent')
    logger.warning(params)


def onDelivered(params, files, callback_url):
    logger.warning('onDelivered')


def bnd_connected(bnd_name: str):
    logger.warning(f'{bnd_name} connected')
    connected.add(bnd_name)


def bnd_disconnected(bnd_name: str):
    logger.warning(f'{bnd_name} disconnected')
    if bnd_name in connected:
        connected.remove(bnd_name)


def main():
    logger.setLevel(logging.INFO)
    logger.warning('Use Control-C to exit')
    server = SimpleXMLRPCServer(('0.0.0.0', 9000), logRequests=False, allow_none=True)
    server.register_function(list_contents)
    server.register_function(aud_add)
    server.register_function(auth_response)
    server.register_function(auth_challenge)
    server.register_function(accept)
    server.register_function(onSent)
    server.register_function(onDelivered)
    server.register_function(bnd_connected)
    server.register_function(bnd_disconnected)
    thread = threading.Thread(target=run_tasks)
    thread.start()
    replication_thread = threading.Thread(target=replication_task)
    replication_thread.start()
    pika_thread = threading.Thread(target=pika_task)
    pika_thread.start()
    try:
        logger.warning('Start server')
        server.serve_forever()
    except KeyboardInterrupt:
        logger.warning('Exiting')


def vers_key(e):
    return e.version()
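
# Manual smoke test; enable it by swapping main() for test() at the bottom of
# the file. The commented params and the hardcoded commit id are sample values
# from manual runs.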
", "query_type": 24, "to": "tcp://bnd127", "user_id": "3302", "ts_added": 1679825320.653038} #files = [] #url = 'http://127.0.0.1:7000/xmlrpc' # accept(params, files, url) #get_metadata(params, files, url) #get_catalog() # auth_response('123', 'bnd127', False) con = OOConnectionParams(Config.oodb_schema, Config.oodb_host, Config.oodb_port, Config.oodb_dbname, Config.oodb_username, Config.oodb_passwd, Config.oodb_schema) ws = OODBWorkspace.ws(Config.oodb_schema) ws.init(con) created, updated, deleted = ws.changes('2dad8c8a-d2db-4074-ab7a-c01c36ada2be') qu = GroupQuery(Envelope()) qu.setUids(updated) qu.setLoadArch(True) uids = [] ws.clearData(True) ws.load(qu, uids) updated_files = [] for feature_uid in uids: not_files = True vers = ws.featureVersion(feature_uid) if len(vers) > 1: vers.sort(key=vers_key) not_files = vers[0].feature().isEqual(vers[1].feature(), ["c1000"]) if not not_files: updated_files.append(feature_uid) print(updated_files) ws.clearData(True) ws.close() pass if __name__ == '__main__': main() # test()