From cf7990e96ad0c5fed00ada855163901dc356ab95 Mon Sep 17 00:00:00 2001 From: Ivan Vazhenin Date: Mon, 17 Jul 2023 20:18:05 +0300 Subject: [PATCH] Add new bnd replication request --- .gitignore | 3 +- config.py | 22 +++++++++ main.py | 119 +++++++++++++++++++++++++++++++++++++---------- requirements.txt | 19 ++++++-- zip.py | 23 +++++++++ 5 files changed, 158 insertions(+), 28 deletions(-) create mode 100644 config.py create mode 100644 zip.py diff --git a/.gitignore b/.gitignore index 757fee3..9e6c9c9 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ -/.idea \ No newline at end of file +/.idea +/venv \ No newline at end of file diff --git a/config.py b/config.py new file mode 100644 index 0000000..d0ad7dc --- /dev/null +++ b/config.py @@ -0,0 +1,22 @@ + +class Config: + ret_path: str = 'http://10.10.8.81:9000/' + self_bnd: str = 'bnd127' + enserver: str = 'http://127.0.0.1:7000/xmlrpc' + remote_bnd: str = 'bnd128' + + pg_host: str = '10.10.8.83' + pg_port: int = 32101 + pg_dbname: str = 'db' + pg_username: str = 'postgres' + pg_password: str = 'Root12345678' + + oodb_host: str = '10.10.8.83' + oodb_port: int = 32100 + oodb_dbname: str = 'db' + oodb_username: str = 'postgres' + oodb_passwd: str = 'Root12345678' + oodb_schema: str = 'ood' + + rabbit_conn: str = 'amqp://user:password@10.10.8.83:31005/%2f' + rabbit_queue: str = 'ipd' \ No newline at end of file diff --git a/main.py b/main.py index 08570da..59bc796 100644 --- a/main.py +++ b/main.py @@ -1,4 +1,6 @@ import asyncio +import pika +import sys import threading import time from queue import Queue @@ -10,26 +12,41 @@ from xmlrpc.client import ServerProxy import logging import os import os.path + +import psycopg import requests from reqs.graphql import get_catalog, get_object from pygost import gost34112012256 import xml.etree.ElementTree as ET from reqs.request_xml_service import RequestXmlService import zipfile +from libcommon import * +from libdatabase import * +from libgeodata import * +from libgeodriver import * +from libgeodesy import * +from libgeom import * +from libipdutilities import * +from liboodriver import * +from config import Config +from zip import Zip -PASSWORD = 'gost_2012$a742ec53198ec2a5027086fba8814a89982a57112d1a72d02260161108f39b50' tasks = Queue() connected = False +logger = logging.getLogger('xmlrpcserver') + def run_tasks(): + logger.debug('Task thread started.') while True: task = tasks.get() task() def replication_task(): + return while not connected: time.sleep(1) date = datetime.datetime.now() @@ -39,8 +56,8 @@ def replication_task(): res.set('replication_package', '1') rxmls.set_result(res, 0, '') response_params = { - 'from': 'tcp://bnd127', - 'to': 'tcp://kptsp_vb', + 'from': f'tcp://{Config.self_bnd}', + 'to': f'tcp://{Config.remote_bnd}', 'ts_added': date.timestamp(), 'user_id': '0', 'query_type': 1114, @@ -50,23 +67,63 @@ def replication_task(): filepath = '/tmp/' + filename response_files = [{'name': filename, 'url': filepath, 'size': os.path.getsize(filepath)}] logging.debug('Send replication package') - proxy = ServerProxy('http://127.0.0.1:7000/xmlrpc') - proxy.send(response_params, response_files, 'http://10.10.8.27:9000/') + proxy = ServerProxy(Config.enserver) + proxy.send(response_params, response_files, Config.ret_path) + + +def pika_callback(ch, method, properties, body): + date = datetime.datetime.now() + rxmls = RequestXmlService() + res_id = uuid4().hex + res = rxmls.get_request_document(res_id, None) + rxmls.set_result(res, 0, '') + response_params = { + 'from': f'tcp://{Config.self_bnd}', + 'to': f'tcp://{Config.remote_bnd}', + 'ts_added': date.timestamp(), + 'user_id': '0', + 'query_type': 99, + 'query_data': ET.tostring(res, encoding='unicode', xml_declaration=True), + } + con = OOConnectionParams(Config.oodb_schema, Config.oodb_host, Config.oodb_port, Config.oodb_dbname, + Config.oodb_username, Config.oodb_passwd, Config.oodb_schema) + ws = OODBWorkspace.ws(Config.oodb_schema) + ws.init(con) + oe = OsmExporter(ws) + # commit = '1de21737-09dc-4a45-b7d9-7b11044ee487' + z = Zip() + nc = oe.nextCommit(body) + oe.exportChanges2osm(os.path.join(z.dirname, 'export.o5c'), body, nc) + ws.close() + filepath = z.pack() + response_files = [{'name': os.path.basename(filepath), 'url': filepath, 'size': os.path.getsize(filepath)}] + logger.warning(response_files) + logging.debug('Send replication package') + proxy = ServerProxy(Config.enserver) + proxy.send(response_params, response_files, Config.ret_path) + ch.basic_ack(delivery_tag=method.delivery_tag) + + +def pika_task(): + connection = pika.BlockingConnection(pika.URLParameters(Config.rabbit_conn)) + channel = connection.channel() + channel.basic_consume(queue=Config.rabbit_queue, on_message_callback=pika_callback) + + channel.start_consuming() -# Expose a function def list_contents(dir_name): - logging.debug('list_contents(%s)', dir_name) + logger.warning('list_contents(%s)', dir_name) return os.listdir(dir_name) def aud_add(message): global connected if not isinstance(message, list): - logging.warning(message) + logger.warning(message) return 'OK' for item in message: - logging.warning(item) + logger.warning(item) if item.get('level', -1) == 0 and item.get('type', -1) == 4002: connected = True return 'OK' @@ -74,13 +131,20 @@ def aud_add(message): def auth_response(challenge, server_id, is_server): # logging.debug(f'Challenge: {challenge}, Server: {server_id}, IsServer: {is_server}') - msg = '%s%s%s' % (challenge, server_id, PASSWORD) - response = gost34112012256.new(msg.encode('utf-8')[::-1]).digest()[::-1].hex() - return {'error': False, 'response': response} + with psycopg.connect(f'host={Config.pg_host} port={Config.pg_port} dbname={Config.pg_dbname} ' + f'user={Config.pg_username} password={Config.pg_password}') as conn: + with conn.cursor() as cur: + cur.execute('SELECT passwd FROM users where username = %s', (server_id,)) + passwd = cur.fetchone() + if not passwd: + return {'error': True, 'response': 'Wrong user/bnd'} + msg = '%s%s%s' % (challenge, server_id, passwd) + response = gost34112012256.new(msg.encode('utf-8')[::-1]).digest()[::-1].hex() + return {'error': False, 'response': response} def auth_challenge(): - # logging.debug('get challenge') + logging.debug('get challenge') return uuid4().hex @@ -90,7 +154,7 @@ def restore_uuid(oid): def load_catalog(params, files, url): - logging.debug('load_catalog') + logger.warning('load_catalog') date = datetime.datetime.now() rxmls = RequestXmlService() req = ET.fromstring(params['query_data']) @@ -116,7 +180,7 @@ def load_catalog(params, files, url): zipf.close() response_files = [{'name': filename, 'url': filepath, 'size': os.path.getsize(filepath)}] proxy = ServerProxy(url) - proxy.send(response_params, response_files, 'http://10.10.8.27:9000/') + proxy.send(response_params, response_files, Config.ret_path) def get_objects(params, files, url): @@ -154,7 +218,7 @@ def get_objects(params, files, url): zipf.close() response_files = [{'name': filename, 'url': filepath, 'size': os.path.getsize(filepath)}] proxy = ServerProxy(url) - proxy.send(response_params, response_files, 'http://10.10.8.27:9000/') + proxy.send(response_params, response_files, Config.ret_path) def get_metadata(params, files, url): @@ -205,7 +269,7 @@ def get_metadata(params, files, url): zipf.close() response_files = [{'name': filename, 'url': filepath, 'size': os.path.getsize(filepath)}] proxy = ServerProxy(url) - proxy.send(response_params, response_files, 'http://10.10.8.27:9000/') + proxy.send(response_params, response_files, Config.ret_path) def run_task(query_type, params, files, url): @@ -219,22 +283,23 @@ def run_task(query_type, params, files, url): def accept(params, files, url): print(params, files, url) - print('Accept') + logger.warning('Accept: ' + json.dumps(params, ensure_ascii=False)) run_task(params['query_type'], params, files, url) return True def onSent(params, files, callback_url): - logging.debug('onSent') + logger.debug('OnSent') + logger.warning(params) def onDelivered(params, files, callback_url): - logging.debug('onDelivered') + logger.warning('onDelivered') def main(): - logging.debug('Use Control-C to exit') - logging.basicConfig(level=logging.DEBUG) + logger.setLevel(logging.INFO) + logger.warning('Use Control-C to exit') server = SimpleXMLRPCServer(('0.0.0.0', 9000), logRequests=False, allow_none=True) server.register_function(list_contents) server.register_function(aud_add) @@ -250,10 +315,14 @@ def main(): replication_thread = threading.Thread(target=replication_task) replication_thread.start() + pika_thread = threading.Thread(target=pika_task) + pika_thread.start() + try: + logger.warning('Start server') server.serve_forever() except KeyboardInterrupt: - logging.debug('Exiting') + logger.warning('Exiting') def test(): @@ -262,7 +331,9 @@ def test(): #url = 'http://127.0.0.1:7000/xmlrpc' # accept(params, files, url) #get_metadata(params, files, url) - get_catalog() + #get_catalog() + auth_response('123', 'bnd127', False) + pass if __name__ == '__main__': diff --git a/requirements.txt b/requirements.txt index 70f74e1..eaab079 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,16 @@ -gql~=3.4.0 -pygost~=5.11 -requests~=2.28.2 \ No newline at end of file +aiohttp==3.8.4 +aiosignal==1.3.1 +async-timeout==4.0.2 +attrs==23.1.0 +backoff==2.2.1 +certifi==2023.5.7 +charset-normalizer==3.2.0 +frozenlist==1.3.3 +gql==3.4.1 +graphql-core==3.2.3 +idna==3.4 +multidict==6.0.4 +pygost==5.12 +requests==2.31.0 +urllib3==2.0.3 +yarl==1.9.2 diff --git a/zip.py b/zip.py new file mode 100644 index 0000000..5653eaf --- /dev/null +++ b/zip.py @@ -0,0 +1,23 @@ +import os +from tempfile import TemporaryDirectory, mktemp +from zipfile import ZipFile + + +class Zip: + dir: TemporaryDirectory[str] + + def __init__(self): + self.dir = TemporaryDirectory() + + @property + def dirname(self): + return self.dir.name + + def pack(self): + tmp_zip = mktemp(suffix='.zip') + with ZipFile(tmp_zip, 'w') as zip_object: + for folder_name, sub_folders, file_names in os.walk(self.dir.name): + for filename in file_names: + file_path = os.path.join(folder_name, filename) + zip_object.write(file_path, os.path.basename(file_path)) + return tmp_zip