"""Replication bridge service.

Runs an XML-RPC endpoint (port 9000), a FastAPI application (port 8000), a
RabbitMQ consumer and a periodic replication loop. Incoming XML-RPC requests
are dispatched by ``query_type`` to handlers that run on a single worker
thread; new commits announced over RabbitMQ are queued per user in the
database and later shipped to the remote nodes.
"""

# NOTE: import order is kept exactly as in the original file because
# ``from infra import *`` may shadow or be shadowed by neighbouring names.
from tempfile import TemporaryDirectory
from typing import Optional, Any
import pika
import threading
import time
from queue import Queue
import datetime
import json
from uuid import uuid4, UUID
from xmlrpc.server import SimpleXMLRPCServer
from xmlrpc.client import ServerProxy
import logging
import io
import zlib
import os.path
import requests
from reqs_graphql import get_catalog, get_object
from pygost import gost34112012256
import xml.etree.ElementTree as ET
from reqs.request_xml_service import RequestXmlService
import zipfile
from config import Config
from zip import Zip
import boto3
import db
from sqlalchemy.orm import Session
from fastapi import FastAPI, Response, Form, UploadFile, File, Request
from fastapi.middleware.cors import CORSMiddleware
import uvicorn
from typing_extensions import Annotated
import pathlib
from infra import *
from shutil import make_archive


# Query types exchanged with the peer nodes.
NEW_REPLICATION_REQUEST = 99
NEW_COMMIT_REQUEST = 98
NEW_COMMIT_RESPONSE = 1098
NEW_DATA_REQUEST = 97
NEW_DATA_RESPONSE = 1097

# Work items (zero-argument callables) consumed by ``run_tasks``.
tasks = Queue()
# Names of the nodes reported as connected via ``bnd_connected``.
connected = set()

connection: Optional[pika.BlockingConnection] = None
channel: Optional[Any] = None
server: Optional[SimpleXMLRPCServer] = None

logger = logging.getLogger('xmlrpcserver')


# ---------------------------------------------------------------------------
# Private helpers
# ---------------------------------------------------------------------------

def _connection_params(schema):
    """Build the OODB connection parameters for *schema* from ``Config``."""
    return OOConnectionParams(schema, Config.oodb_host, Config.oodb_port, Config.oodb_dbname,
                              Config.oodb_username, Config.oodb_passwd, schema)


def _init_workspace(schema):
    """Return the workspace for *schema*, initialising it if it is not yet."""
    con = _connection_params(schema)
    ws = OODBWorkspace.ws(schema)
    if not ws.isInit():
        init_result = ws.init(con)
        logger.warning(init_result)
    return ws


def _serialize(document):
    """Serialise an XML element to a unicode string with an XML declaration."""
    return ET.tostring(document, encoding='unicode', xml_declaration=True)


def _outgoing_params(bnd_name, ts_added, query_type, document):
    """Build the envelope for a message this node sends to *bnd_name*."""
    return {
        'from': f'tcp://{Config.self_bnd}',
        'to': f'tcp://{bnd_name}',
        'ts_added': ts_added,
        'user_id': '0',
        'query_type': query_type,
        'query_data': _serialize(document),
    }


def _reply_params(params, ts_added, query_type, document, user_id_to):
    """Build the envelope for a reply to the request described by *params*."""
    return {
        'from': params['to'],
        'to': params['from'],
        'ts_added': ts_added,
        'user_id': '1',
        'user_id_to': user_id_to,
        'query_type': query_type,
        'query_data': _serialize(document),
    }


def _start_reply(params):
    """Parse the request XML and create the matching response document.

    Returns ``(date, rxmls, req, res)``: the current time, the XML service
    helper, the parsed request element and the new response document.
    """
    date = datetime.datetime.now()
    rxmls = RequestXmlService()
    req = ET.fromstring(params['query_data'])
    req_id = rxmls.get_request_uuid(req)
    res = rxmls.get_request_document(uuid4().hex, req_id)
    return date, rxmls, req, res


def _file_entry(name, path):
    """Describe a local file in the shape expected by ``proxy.send``."""
    return [{'name': name, 'url': path, 'size': os.path.getsize(path)}]


def _send_best_effort(params, files):
    """Send a message through ``Config.enserver``; log (not raise) on failure."""
    proxy = ServerProxy(Config.enserver)
    try:
        proxy.send(params, files, Config.ret_path)
    except Exception:
        # Best effort by design, but keep the traceback for diagnosis.
        logger.exception('Error sending')


def _enqueue_commit(session, user, commit, schema):
    """Queue *commit* for *user* unless it is this node or filtered by profile.

    A user with no profiles receives every schema; otherwise only schemas
    listed in the user's profiles are queued.
    """
    if user.bndname == Config.self_bnd:
        return
    schemes = {profile.scheme for profile in user.profiles}
    if not schemes or schema in schemes:
        item = db.Queue(user_id=user.id, commit_id=commit, schema=schema)
        logging.warning(item)
        session.add(item)


def _queue_commits_since(params):
    """Queue every commit newer than the one named in the request.

    The request carries ``<commit id=... schema=...>``. Commits following that
    id on the workspace's current branch are queued for the active users whose
    ``bndname`` matches the sender.
    """
    req = ET.fromstring(params['query_data'])
    commit_el = req.find('commit')
    commit_id = commit_el.get('id')
    schema = commit_el.get('schema')
    bnd = params['from'].replace('tcp://', '')
    ws = _init_workspace(schema)
    schema_commits = ws.commits(ws.branch())
    logger.warning(schema_commits)
    if commit_id not in schema_commits:
        logger.warning(f'Error in commits in schema {schema}: no commit {commit_id}')
        return
    newer_commits = schema_commits[schema_commits.index(commit_id) + 1:]
    logger.warning(newer_commits)
    conn = db.connect_db()
    with Session(conn) as session:
        # The user list does not depend on the commit, so query it once.
        users = session.query(db.User).filter(db.User.bndname == bnd,
                                              db.User.active == True).all()  # noqa: E712
        for commit in newer_commits:
            for user in users:
                _enqueue_commit(session, user, commit, schema)
        session.commit()


# ---------------------------------------------------------------------------
# S3 and database helpers
# ---------------------------------------------------------------------------

def s3_connection():
    """Create a boto3 S3 client from the endpoint and credentials in ``Config``."""
    return boto3.client('s3',
                        endpoint_url=Config.s3_endpoint,
                        aws_access_key_id=Config.s3_key_id,
                        aws_secret_access_key=Config.s3_access_key)


def download_file(key: str, bucket: str, filename: str):
    """Stream the S3 object ``bucket/key`` into the local file *filename*."""
    client = s3_connection()
    obj = client.get_object(Bucket=bucket, Key=key)
    with open(filename, 'wb') as f:
        for chunk in obj['Body'].iter_chunks(chunk_size=4096):
            f.write(chunk)


def upload_file(filename: str, key: str, bucket: str):
    """Upload the local file *filename* to S3 as ``bucket/key``."""
    client = s3_connection()
    with open(filename, 'rb') as f:
        client.put_object(Body=f.read(), Bucket=bucket, Key=key)


def get_branch(bndname: str, scheme: str):
    """Return ``(branch, local_scheme)`` of the income branch for a node/scheme.

    Returns ``(None, None)`` when no matching ``IncomeBranch`` row exists.
    """
    conn = db.connect_db()
    with Session(conn) as session:
        item = (session.query(db.IncomeBranch)
                .filter_by(scheme=scheme)
                .join(db.User)
                .filter_by(bndname=bndname)
                .one_or_none())
        if item:
            return item.branch, item.local_scheme
    return None, None


def get_profile(bndname: str, scheme: str):
    """Return the ``Profile`` row for a node/scheme pair, or ``None``."""
    conn = db.connect_db()
    with Session(conn) as session:
        item = (session.query(db.Profile)
                .filter_by(scheme=scheme)
                .join(db.User)
                .filter_by(bndname=bndname)
                .one_or_none())
        if item:
            return item
    return None


# ---------------------------------------------------------------------------
# Background loops
# ---------------------------------------------------------------------------

def run_tasks():
    """Worker loop: take callables from ``tasks`` and run them one by one."""
    logger.debug('Task thread started.')
    while True:
        task = tasks.get()
        try:
            task()
        except Exception:
            # A failing task must not kill the only worker thread.
            logger.exception('Task failed')


def replication_task():
    """Every 60 seconds, replicate queued commits to users that are active now.

    Queue rows are deleted only after their replication call returned.
    """
    while True:
        try:
            conn = db.connect_db()
            with Session(conn) as session:
                for item in session.query(db.Queue).join(db.Queue.user).all():
                    bndname = item.user.bndname
                    if item.user.is_active_now() and bndname != Config.self_bnd:
                        if item.user.newbnd:
                            replication(bndname, item.commit_id, item.schema)
                        else:
                            replication_old(bndname, item.commit_id, item.schema)
                        # NOTE(review): nesting of delete/commit under the
                        # "active" check was inferred from the flattened source.
                        session.delete(item)
                        session.commit()
        except Exception:
            # Keep the loop alive; the remaining rows are retried next pass.
            logger.exception('Replication pass failed')
        time.sleep(60)


# ---------------------------------------------------------------------------
# Replication (outgoing)
# ---------------------------------------------------------------------------

def process_chart(ws, chart, chart_class):
    """Resolve domain-coded properties of *chart* in place and set its class.

    Properties whose workspace attribute type is ``AT_Domain`` are replaced by
    the domain's string value; property ``c103`` is forced to a fixed string.
    ``chart['class']`` becomes the last ``_``-separated part of the class name.
    """
    for x in chart.get('properties') or {}:
        attr = ws.attribute(x)
        if not attr:
            continue
        if str(attr.type()) == 'AT_Domain':
            dom = attr.domain()
            key = str(chart['properties'][x])
            chart['properties'][x] = variantToString(dom.value(variantFromString(key)))
        if x == 'c103':
            chart['properties'][x] = 'Несекретно'
    chart['class'] = chart_class['name'].split('_')[-1]
    return chart


def crc32(filename, chunk_size=65536):
    """Compute the CRC-32 checksum of the contents of the given filename"""
    with open(filename, "rb") as f:
        checksum = 0
        while chunk := f.read(chunk_size):
            checksum = zlib.crc32(chunk, checksum)
    return "%08X" % (checksum & 0xFFFFFFFF)


def _send_chart_package(bndname, chart):
    """Build and send the replication package for a single chart.

    Charts without a ``c1000`` (files) property are skipped with a warning.
    """
    date = datetime.datetime.now()
    rxmls = RequestXmlService()
    res_id = uuid4().hex
    doc = rxmls.get_request_document(res_id, None)
    doc.set('replication_package', '1')
    doc.set('replication_version', date.strftime('%Y%m%d%H%M%S'))
    doc.set('user_permit', 'AA0AA00020200726D3E75C80B713A7A3D3E75C80B713A7A363F7CB889AA3F520')
    rxmls.set_result(doc, 0, '')
    logger.debug('%s', chart)

    properties = chart.get('properties') or {}
    c1000 = properties.get('c1000')
    chart_id = chart['uid'].replace('-', '')
    if not c1000:
        logger.warning(f'No file for {chart_id}')
        return

    z = Zip()
    xml_objects = ET.SubElement(doc, 'objects')
    created_date = datetime.datetime.fromisoformat(chart.get('date_created'))
    system_date = str(created_date.timestamp())
    xml_chart = ET.SubElement(doc, 'chart', {
        'Class': chart['class'],
        'ID': chart_id,
        'Name': c1000[0]['fileName'],
        'Type': 'OOD',
        'metadata_version': '1',
        'source': Config.self_bnd,
        'system_date': system_date,
    })
    xml_version = ET.SubElement(xml_objects, 'version', {
        'object_id': chart_id,
        'source': Config.self_bnd,
        'system_date': system_date,
        'version': '1.0',
        'version_id': chart_id,
    })

    # All files of a chart live in one directory named after the first file.
    directory = os.path.join(z.dirname, f'maps/{res_id}/ENC_ROOT/{c1000[0]["fileName"]}')
    os.makedirs(directory, exist_ok=True)
    total_size = 0
    for file in c1000:
        fp = os.path.join(directory, file['fileName'])
        download_file(file['key'], Config.s3_bucket, fp)
        size = os.stat(fp).st_size
        _crc32 = crc32(fp)
        ET.SubElement(xml_version, 'file', {
            'cell_file': 'false',
            'crc32': _crc32,
            'crc32_enc': _crc32,
            'file_id': file['key'].replace('-', ''),
            'file_ref': os.path.join(f'maps/{res_id}/ENC_ROOT', file['fileName']),
            'file_size': str(size),
            'file_size_enc': str(size),
        })
        xml_version.set('crc32', _crc32)
        total_size += size
    xml_version.set('size', str(total_size))
    # NOTE(review): the original overwrites the version's crc32 with the total
    # size; kept as is because the receiver's expectation is not visible here.
    xml_version.set('crc32', str(total_size))

    ET.SubElement(doc, 'tags')
    xml_archs = ET.SubElement(doc, 'archs')
    xml_arch = ET.SubElement(xml_archs, 'arch', {
        'obj_id': chart_id,
        'ver_cl': '8.31',
        'ver_id': chart_id,
    })
    for attribute in properties:
        if attribute.startswith('c'):
            dotted = attribute.replace('_', '.')
            value = str(properties[attribute])
            ET.SubElement(xml_chart, 'Attribute', {
                'name': dotted,
                'value': value,
            })
            ET.SubElement(xml_arch, 'attr', {
                'code': dotted,
                'name': dotted,
                'value': value,
            })

    params = _outgoing_params(bndname, date.timestamp(), NEW_REPLICATION_REQUEST, doc)
    filepath = z.pack()
    response_files = _file_entry(os.path.basename(filepath), filepath)
    logger.warning(response_files)
    logging.debug('Send replication package')
    _send_best_effort(params, response_files)


def send_object_replication(bndname, ws, ids):
    """Load the objects *ids* from *ws* and send one package per chart."""
    query = GroupQuery(Envelope.world())
    query.setUids(ids)
    uids = []
    ws.load(query, uids)
    loaded = json.loads(ws.dataToJson())
    charts = [process_chart(ws, feature, chart_class)
              for chart_class in loaded
              for feature in chart_class['features']]
    for chart in charts:
        _send_chart_package(bndname, chart)


def replication_old(bnd_name: str, commit_id: str, schema: str):
    """Replicate a commit to an old-style node as per-object packages.

    Only the schema configured as ``Config.oodb_schema`` is replicated; objects
    created or updated by the commit are sent.
    """
    logger.warning('Start replication')
    if schema != Config.oodb_schema:
        return
    ws = _init_workspace(schema)
    created, updated, _ = ws.changes(commit_id)
    ids = list(set(created) | set(updated))
    send_object_replication(bnd_name, ws, ids)
    ws.clearData(True)
    logger.warning('Replication to old bnd is sent')


def replication(bnd_name: str, commit_id: str, schema: str):
    """Export a commit (changes plus referenced files) and send it to a node.

    The package contains ``export.o5c``, ``export.mbtiles``, the files of
    created/changed ``c1000`` attributes and an ``export_files.json`` index.
    """
    logging.warning(f'{bnd_name} {commit_id} {schema}')
    date = datetime.datetime.now()
    rxmls = RequestXmlService()
    res_id = uuid4().hex
    profile = get_profile(bndname=bnd_name, scheme=schema)
    doc = rxmls.get_request_document(res_id, None)
    rxmls.set_result(doc, 0, '')
    ET.SubElement(doc, 'replication', {'id': commit_id, 'scheme': schema})
    response_params = _outgoing_params(bnd_name, date.timestamp(), NEW_REPLICATION_REQUEST, doc)

    ws = _init_workspace(schema)
    oe = IpdExporter(ws)
    z = Zip()
    pc = oe.previousCommit(commit_id)
    oe.exportChanges2osm(os.path.join(z.dirname, 'export.o5c'), pc, commit_id)
    oe.exportChanges2mbtiles(os.path.join(z.dirname, 'export.mbtiles'), pc, commit_id)

    created, updated, deleted = ws.changes(commit_id)

    # Updated features: collect files only when the c1000 attribute changed
    # between the two versions that sort first by version().
    qu = GroupQuery(Envelope())
    qu.setUids(updated)
    qu.setLoadArch(True)
    uids = []
    ws.clearData(True)
    ws.load(qu, uids)
    updated_files = []
    for feature_uid in uids:
        not_files = True
        vers = ws.featureVersion(feature_uid)
        if len(vers) > 1:
            vers.sort(key=vers_key)
            not_files = vers[0].feature().isEqual(vers[1].feature(), ["c1000"])
        if not not_files:
            for attr in vers[0].feature().attributes('c1000'):
                updated_files.append(variantToFileValue(attr.val()))
    ws.clearData(True)

    # Created features: every c1000 file is exported unless the profile opts out.
    qu = GroupQuery(Envelope())
    qu.setUids(created)
    ws.load(qu, uids)
    exported_files = []
    if not profile or not profile.no_files:
        for feature_uid in uids:
            feature = ws.featureByUid(feature_uid)
            if not feature:
                continue
            for attr in feature.attributes('c1000'):
                updated_files.append(variantToFileValue(attr.val()))
        # NOTE(review): nesting of this download loop under the profile check
        # was inferred from the flattened source.
        for x in updated_files:
            exported_files.append({
                'key': x.key,
                'bucket': x.bucket,
                'filename': x.fileName,
            })
            fp = os.path.join(z.dirname, x.key)
            os.makedirs(os.path.dirname(fp), exist_ok=True)
            download_file(x.key, x.bucket, fp)
    with open(os.path.join(z.dirname, 'export_files.json'), 'w') as f:
        f.write(json.dumps(exported_files))
    ws.clearData(True)

    filepath = z.pack()
    response_files = _file_entry(os.path.basename(filepath), filepath)
    logger.warning(response_files)
    logging.debug('Send replication package')
    _send_best_effort(response_params, response_files)


# ---------------------------------------------------------------------------
# RabbitMQ consumer
# ---------------------------------------------------------------------------

def pika_callback(ch, method, properties, body):
    """Queue an announced commit for every active user, then ack the message.

    *body* is JSON with ``commit`` and optional ``schema`` (defaults to
    ``Config.oodb_schema``).
    """
    commit_info = json.loads(body)
    logging.warning(commit_info)
    schema = commit_info.get('schema') or Config.oodb_schema
    commit = commit_info['commit']
    conn = db.connect_db()
    with Session(conn) as session:
        for user in session.query(db.User).filter(db.User.active == True).all():  # noqa: E712
            _enqueue_commit(session, user, commit, schema)
        session.commit()
    ch.basic_ack(delivery_tag=method.delivery_tag)


def pika_task():
    """Connect to RabbitMQ and consume ``Config.rabbit_queue`` forever."""
    global connection
    global channel
    connection = pika.BlockingConnection(pika.URLParameters(Config.rabbit_conn))
    channel = connection.channel()
    channel.basic_consume(queue=Config.rabbit_queue, on_message_callback=pika_callback)
    channel.start_consuming()


# ---------------------------------------------------------------------------
# XML-RPC functions
# ---------------------------------------------------------------------------

def list_contents(dir_name):
    """Return the directory listing of *dir_name*."""
    logger.warning('list_contents(%s)', dir_name)
    return os.listdir(dir_name)


def aud_add(message):
    """Log an audit message (or each item of a list of messages); return 'OK'."""
    items = message if isinstance(message, list) else [message]
    for item in items:
        logger.warning(item)
    return 'OK'


def auth_response(challenge, server_id, is_server):
    """Compute the authentication response for *challenge* and *server_id*.

    The response is the hex of the byte-reversed pygost ``gost34112012256``
    digest of the byte-reversed UTF-8 string ``challenge + server_id + passwd``.
    Returns ``{'error': True, ...}`` when the user lookup or hashing fails.
    """
    conn = db.connect_db()
    with Session(conn) as session:
        try:
            user = session.query(db.User).filter((db.User.username == server_id)).one()
            passwd = user.passwd
            msg = '%s%s%s' % (challenge, server_id, passwd)
            response = gost34112012256.new(msg.encode('utf-8')[::-1]).digest()[::-1].hex()
            session.commit()
            return {'error': False, 'response': response}
        except Exception:
            return {'error': True, 'response': 'Wrong user/bnd'}


def auth_challenge():
    """Return a fresh random challenge string."""
    logging.debug('get challenge')
    return uuid4().hex


def restore_uuid(oid):
    """Return *oid* in canonical dashed UUID form."""
    return str(UUID(oid))


def load_catalog(params, files, url):
    """Reply to a catalog request with a zip containing ``WF.CLL``."""
    logger.warning('load_catalog')
    date, rxmls, req, res = _start_reply(params)
    rxmls.set_result(res, 0, '')
    response_params = _reply_params(params, date.timestamp(), 1004, res, params['user_id'])
    catalog = get_catalog()
    logging.debug('Catalog_loaded')
    filename = uuid4().hex
    filepath = '/tmp/' + filename
    with zipfile.ZipFile(filepath, "w") as zipf:
        zipf.writestr('WF.CLL', catalog)
    proxy = ServerProxy(url)
    proxy.send(response_params, _file_entry(filename, filepath), Config.ret_path)


def get_objects(params, files, url):
    """Reply with a zip of the ``c1000`` files of the requested objects.

    All files are stored under a folder named after the first file found.
    """
    date, rxmls, req, res = _start_reply(params)
    rxmls.set_result(res, 0, '')
    objs = req.find('objects')
    uids = [restore_uuid(x.get('object_id')) for x in objs.findall('object')]
    # NOTE(review): set_result is called twice in the original; kept because
    # its effect on the document is not visible from this file.
    rxmls.set_result(res, 0, '')
    response_params = _reply_params(params, date.timestamp(), 1001, res, params['user_id'])
    filename = uuid4().hex
    filepath = '/tmp/' + filename
    main_filename = None
    with zipfile.ZipFile(filepath, "w") as zipf:
        for uid in uids:
            obj = json.loads(get_object(uid))
            for file in obj['properties'].get('c1000', []):
                if not main_filename:
                    main_filename = file['fileName']
                download = requests.get(Config.gql_download, params={'item_id': file["key"]})
                zipf.writestr(f'{main_filename}/{file["fileName"]}', download.content)
    proxy = ServerProxy(url)
    proxy.send(response_params, _file_entry(filename, filepath), Config.ret_path)


def get_metadata(params, files, url):
    """Reply with a zip containing ``metadata.xml`` for the requested charts.

    Only properties whose key starts with ``c`` are exported as ``mdItem``.
    """
    date, rxmls, req, res = _start_reply(params)
    rxmls.set_result(res, 0, '')
    objs = req.find('getMetadataByIds')
    uids = [restore_uuid(x.get('id')) for x in objs.findall('chart')]
    # NOTE(review): duplicated set_result kept, see get_objects.
    rxmls.set_result(res, 0, '')
    response_params = _reply_params(params, date.timestamp(), 1024, res, params['user_id'])

    content = ET.Element('getMetadataResponse')
    for uid in uids:
        obj = json.loads(get_object(uid))
        updated = datetime.datetime.fromisoformat(obj['date_updated'])
        chart = ET.SubElement(content, 'chart', {
            'id': UUID(obj['uid']).hex,
            'updated': str(updated.timestamp()),
        })
        for key in obj['properties']:
            if not key.startswith('c'):
                continue
            ET.SubElement(chart, 'mdItem', {
                'code': key.replace('_', '.'),
                'name': key,
                'value': str(obj['properties'].get(key, '')),
                'isBase': 'false',
                'groupId': '',
                'groupName': '',
            })

    filename = uuid4().hex
    filepath = '/tmp/' + filename
    with zipfile.ZipFile(filepath, "w") as zipf:
        zipf.writestr('metadata.xml', _serialize(content))
    proxy = ServerProxy(url)
    proxy.send(response_params, _file_entry(filename, filepath), Config.ret_path)


def put_object(params, files, url):
    """Create a feature from the request XML and attach the uploaded files.

    *files* must hold exactly one zip archive, given either directly (path or
    file object) or as a descriptor dict whose ``'url'`` entry is a path or
    file object. Every file in the archive is uploaded to S3 and added to the
    feature as a ``c1000`` attribute.

    Raises:
        ValueError: if *files* does not contain exactly one entry.
    """
    req = ET.fromstring(params['query_data'])
    obj = req.find('chart')
    class_id = obj.get('Class')
    ws = _init_workspace(Config.oodb_schema)
    logging.info(class_id)
    fc = ws.featureClass(class_id)
    feature = fc.createFeature('')
    geom = Polygon.fromExtent(Envelope(0.0, 0.0, 1.0, 1.0, SRFactory.PZ9011()))
    res = feature.setGeometry(geom)
    for attr in obj.findall('Attribute'):
        name = attr.get('name')
        value = attr.get('value')
        res &= feature.addAttribute(name, variantFromString(value))

    if len(files) != 1:
        raise ValueError(f'put_object expects exactly one file, got {len(files)}')
    file = files[0]
    # ZipFile cannot open the descriptor dict itself; use what it points to.
    archive = file['url'] if isinstance(file, dict) else file
    with TemporaryDirectory() as tmp_dir:
        with zipfile.ZipFile(archive, 'r') as zip_ref:
            zip_ref.extractall(tmp_dir)
        for item in pathlib.Path(tmp_dir).glob('**/*'):
            if not item.is_file():
                continue
            fileVal = FileValue()
            key = uuid4().hex
            fileVal.fileName = variantToString(item.relative_to(tmp_dir))
            fileVal.key = variantToString(key)
            fileVal.bucket = variantToString(Config.s3_bucket)
            res &= feature.addAttribute('c1000', variantFromFileValue(fileVal))
            upload_file(str(item), key, Config.s3_bucket)

    ws.transaction()
    res = ws.save()
    ws.commit(f'Putobject from {params["to"]}')
    ws.clearData(True)


def apply_commits(params, files, url):
    """Import a replication package received from another node.

    Downloads the single attached archive, imports ``export.o5c`` /
    ``export.mbtiles`` into the (possibly remapped) scheme and uploads the
    files listed in ``export_files.json`` to S3.

    Raises:
        ValueError: if *files* does not contain exactly one entry.
    """
    logging.warning("Apply commits")
    logging.warning(params)
    if len(files) != 1:
        raise ValueError(f'apply_commits expects exactly one file, got {len(files)}')
    file = files[0]
    with TemporaryDirectory() as tmp_dir:
        r = requests.get(file['url'])
        r.raise_for_status()
        with zipfile.ZipFile(io.BytesIO(r.content)) as zip_ref:
            zip_ref.extractall(tmp_dir)

        req = ET.fromstring(params['query_data'])
        repl = req.find('replication')
        scheme = repl.get('scheme')
        commit = repl.get('id')
        bnd = params['from'].replace('tcp://', '')
        branch, new_scheme = get_branch(bnd, scheme)
        if new_scheme:
            scheme = new_scheme

        # Producer, user and branch are configured before init on purpose:
        # this keeps the original call order.
        con = _connection_params(scheme)
        ws = OODBWorkspace.ws(scheme)
        logging.warning("Connected to schema")
        if Config.ws_rabbit_params and not ws.hasProducer():
            ws.setProducer(Config.ws_rabbit_params['host'], Config.ws_rabbit_params['port'],
                           Config.ws_rabbit_params['exchange'], Config.ws_rabbit_params['user'],
                           Config.ws_rabbit_params['password'])
        ws.setCurrentUser(bnd)
        logging.warning("Set branch if needed")
        if branch:
            logging.warning(branch)
            ws.switchBranch(branch)
        if not ws.isInit():
            init_result = ws.init(con)
            logger.warning(init_result)

        oe = IpdExporter(ws)
        logging.warning("Importing...")
        if not oe.importChanges(os.path.join(tmp_dir, 'export.o5c'),
                                os.path.join(tmp_dir, 'export.mbtiles')):
            logging.warning(f'Error importing commit {commit}: {oe.lastError().text()}')
        else:
            logging.warning(f'Importing commit {commit} finished successfully')

        # NOTE(review): files are uploaded regardless of the import result;
        # this nesting was inferred from the flattened source.
        with open(os.path.join(tmp_dir, 'export_files.json'), 'r') as f:
            files_data = json.load(f)
        for file_data in files_data:
            upload_file(os.path.join(tmp_dir, file_data['key']),
                        file_data['key'], file_data['bucket'])
    logging.warning("Finished import")
    ws.clearData(True)


def get_commit(params, files, url):
    """Reply with the current commit id of the requested scheme."""
    date, rxmls, req, res_doc = _start_reply(params)
    schema = req.find('currentCommit').get('scheme')
    ws = _init_workspace(schema)
    ET.SubElement(res_doc, 'commit', {'id': ws.currentCommit(), 'schema': schema})
    response_params = _reply_params(params, date.timestamp(), NEW_COMMIT_RESPONSE, res_doc, 0)
    proxy = ServerProxy(url)
    proxy.send(response_params, [], Config.ret_path)


def query_commits(params, files, url):
    """Queue all commits newer than the one reported by the sending node."""
    _queue_commits_since(params)


def get_data(params, files, url):
    """Build a data response with ``ResponseWorker`` and reply with it zipped."""
    date, rxmls, req, res = _start_reply(params)
    rxmls.set_result(res, 0, '')
    request_string = req.find('data').text
    OODBWorkspaceFactory.init('config/workspaces.json')
    conf = ResponseWorkerConfig('config/response2.json', 'config/workspaces.json')
    worker = ResponseWorker(conf)
    fn = uuid4().hex
    out_dir = os.path.join(os.getcwd(), 'tmp', fn)
    os.makedirs(out_dir)
    worker.makeResponse(request_string, out_dir)
    make_archive(out_dir, 'zip', out_dir, '.')
    # NOTE(review): duplicated set_result kept, see get_objects.
    rxmls.set_result(res, 0, '')
    response_params = _reply_params(params, date.timestamp(), NEW_DATA_RESPONSE, res,
                                    params['user_id'])
    filename = fn + '.zip'
    filepath = os.path.join(os.getcwd(), 'tmp', filename)
    proxy = ServerProxy(url)
    proxy.send(response_params, _file_entry(filename, filepath), Config.ret_path)


def receive_data(params, files, url):
    """Handle a data response.

    NOTE(review): the original body was a verbatim copy of ``query_commits``;
    that behaviour is preserved here — confirm it is intended.
    """
    _queue_commits_since(params)


def run_task(query_type, params, files, url):
    """Schedule the handler registered for *query_type* on the task queue."""
    handlers = {
        4: load_catalog,
        1: get_objects,
        24: get_metadata,
        6: put_object,
        NEW_REPLICATION_REQUEST: apply_commits,
        NEW_COMMIT_REQUEST: get_commit,
        NEW_COMMIT_RESPONSE: query_commits,
        NEW_DATA_REQUEST: get_data,
        NEW_DATA_RESPONSE: receive_data,
    }
    handler = handlers.get(query_type)
    if handler is None:
        logger.warning('No handler for query_type %s', query_type)
        return
    tasks.put(lambda: handler(params, files, url))


def accept(params, files, url):
    """XML-RPC entry point: log the request and schedule its handler."""
    logger.debug('%s %s %s', params, files, url)
    logger.warning('Accept: ' + json.dumps(params, ensure_ascii=False))
    run_task(params['query_type'], params, files, url)
    return True


def onSent(params, files, callback_url):
    """XML-RPC notification that a message was sent; only logs."""
    logger.debug('OnSent')
    logger.warning(params)


def onDelivered(params, files, callback_url):
    """XML-RPC notification that a message was delivered; only logs."""
    logger.warning('onDelivered')


def bnd_connected(bnd_name: str):
    """Record *bnd_name* as connected."""
    logger.warning(f'{bnd_name} connected')
    connected.add(bnd_name)


def bnd_disconnected(bnd_name: str):
    """Remove *bnd_name* from the connected set if present."""
    logger.warning(f'{bnd_name} disconnected')
    connected.discard(bnd_name)


def xmlrpc_task():
    """Create the XML-RPC server on port 9000 and serve forever."""
    global server
    server = SimpleXMLRPCServer(('0.0.0.0', 9000), logRequests=True, allow_none=True)
    for func in (list_contents, aud_add, auth_response, auth_challenge, accept,
                 onSent, onDelivered, bnd_connected, bnd_disconnected):
        server.register_function(func)
    server.serve_forever()


# ---------------------------------------------------------------------------
# HTTP API
# ---------------------------------------------------------------------------

app = FastAPI()
app.add_middleware(CORSMiddleware,
                   allow_origins=['http://10.10.8.24:3000'],
                   allow_credentials=True,
                   allow_methods=['*'],
                   allow_headers=['*']
                   )


@app.get("/")
async def fa_root():
    """Liveness endpoint."""
    return {"message": "Hello World"}


@app.get("/login")
async def fa_login_get(response: Response):
    """Set fixed session/CSRF cookies and report a successful login."""
    response.set_cookie(key='sessionid', value='87654321')
    response.set_cookie(key='csrftoken', value='12345678')
    return {"login": "Ok", "success": True}


@app.post("/login")
async def fa_login(response: Response):
    """POST variant of the login stub; sets the same fixed cookies."""
    logger.warning('login action')
    response.set_cookie(key='sessionid', value='87654321')
    response.set_cookie(key='csrftoken', value='12345678')
    return {"login": "Ok", "success": True}


@app.post('/webapi')
async def fa_webapi(request: Request):
    """Answer 'success1' when the body mentions ``has_perm``, else '1'."""
    body = await request.body()
    logger.warning('webapi')
    # Compare bytes so a non-UTF-8 body cannot raise a decode error.
    if b'has_perm' in body:
        return Response(content='success1', media_type='application/xml')
    else:
        return Response(content='1', media_type='application/xml')


@app.post("/api/easo/PutObject")
async def fa_put_object(response: Response,
                        object_attrs: Annotated[str, Form()],
                        object_file: Annotated[UploadFile, File()]
                        ):
    """Create an object from form attributes and an uploaded zip archive."""
    date = datetime.datetime.now()
    # The upload's backing file has no usable path, so hand put_object an
    # in-memory copy that zipfile can open directly.
    archive = io.BytesIO(await object_file.read())
    files = [{'name': object_file.filename, 'url': archive, 'size': object_file.size}]
    request_params = {
        'from': '',
        'to': '',
        'ts_added': date.timestamp(),
        'user_id': '0',
        'user_id_to': '0',
        'query_type': 6,
        'query_data': object_attrs,
    }
    put_object(request_params, files, None)
    return {"Upload": "Ok"}


# NOTE(review): the two handlers below share one function name, as in the
# original; both routes are registered, but the module-level name refers to
# the second one only.
@app.get("/cr")
async def correction_replication(bnd_name: str, schema: str):
    """Ask *bnd_name* for its current commit of *schema*."""
    date = datetime.datetime.now()
    rxmls = RequestXmlService()
    res_id = uuid4().hex
    doc = rxmls.get_request_document(res_id, None)
    ET.SubElement(doc, 'currentCommit', {'scheme': schema})
    params = _outgoing_params(bnd_name, date.timestamp(), NEW_COMMIT_REQUEST, doc)
    _send_best_effort(params, [])


@app.get("/get_cr")
async def correction_replication(bnd_name: str, schema: str):
    """Send this node's current commit of *schema* to *bnd_name*."""
    date = datetime.datetime.now()
    rxmls = RequestXmlService()
    res_id = uuid4().hex
    doc = rxmls.get_request_document(res_id, None)
    # The init result gets its own name inside the helper, so the response
    # document is no longer overwritten when the workspace needs initialising.
    ws = _init_workspace(schema)
    ET.SubElement(doc, 'commit', {'id': ws.currentCommit(), 'schema': schema})
    params = _outgoing_params(bnd_name, date.timestamp(), NEW_COMMIT_RESPONSE, doc)
    _send_best_effort(params, [])


# ---------------------------------------------------------------------------
# Entry points
# ---------------------------------------------------------------------------

def main():
    """Start the background threads and run the HTTP server on port 8000."""
    logger.setLevel(logging.INFO)
    logger.warning('Use Control-C to exit')

    for target in (xmlrpc_task, run_tasks, replication_task, pika_task):
        threading.Thread(target=target).start()

    try:
        logger.warning('Start server')
        uvicorn.run(app, host="0.0.0.0", port=8000)
    except KeyboardInterrupt:
        logger.warning('Exiting')
    finally:
        # The XML-RPC thread may not have created the server yet.
        if server is not None:
            server.server_close()


def vers_key(e):
    """Sort key for feature versions: the version value."""
    return e.version()


def test():
    """Manual smoke test: build a data response for a fixed group query."""
    request_string = """
{
  "type": "group_query",
  "data_source": "npd",
  "uids": [ "2e20130c-541a-4b9f-9efb-f2a0e8b10c33" ]
}
"""
    OODBWorkspaceFactory.init('config/workspaces.json')
    conf = ResponseWorkerConfig('config/response2.json', 'config/workspaces.json')
    worker = ResponseWorker(conf)
    fn = uuid4().hex
    out_dir = os.path.join('tmp', fn)
    os.makedirs(out_dir)
    worker.makeResponse(request_string, out_dir)
    make_archive(out_dir, 'zip', out_dir, '.')


if __name__ == '__main__':
    main()
    # test()