# xmlrpcserver/main.py
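"""XML-RPC service around the OODB geodata store.

Handles peer authentication (GOST R 34.11-2012 challenge/response against a
PostgreSQL user table), serves catalog, object and metadata requests through a
task queue, consumes commit notifications from RabbitMQ, and packages
replication exports (OSM/MBTiles plus S3-hosted file attachments) into ZIP
archives that are sent onward via XML-RPC.
"""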

import datetime
import json
import logging
import os
import threading
import time
import xml.etree.ElementTree as ET
import zipfile
from queue import Queue
from uuid import uuid4, UUID
from xmlrpc.client import ServerProxy
from xmlrpc.server import SimpleXMLRPCServer

import boto3
import pika
import psycopg
import requests
from pygost import gost34112012256

from config import Config
from libcommon import *
from libdatabase import *
from libgeodata import *
from libgeodriver import *
from libgeodesy import *
from libgeom import *
from libipdutilities import *
from liboodriver import *
from reqs.graphql import get_catalog, get_object
from reqs.request_xml_service import RequestXmlService
from zip import Zip

tasks = Queue()     # work items queued by accept() and drained by run_tasks()
connected = False   # set by aud_add() when the link reports event type 4002
logger = logging.getLogger('xmlrpcserver')

def s3_connection():
    return boto3.client('s3', endpoint_url=Config.s3_endpoint,
                        aws_access_key_id=Config.s3_key_id,
                        aws_secret_access_key=Config.s3_access_key)


def download_file(key: str, bucket: str, filename: str):
    client = s3_connection()
    obj = client.get_object(Bucket=bucket, Key=key)
    with open(filename, 'wb') as f:
        for chunk in obj['Body'].iter_chunks(chunk_size=4096):
            f.write(chunk)


def upload_file(filename: str, key: str, bucket: str):
    client = s3_connection()
    with open(filename, 'rb') as f:
        client.put_object(Body=f.read(), Bucket=bucket, Key=key)

def run_tasks():
    logger.debug('Task thread started.')
    while True:
        task = tasks.get()
        task()

def replication_task():
    # Disabled: the early return keeps this legacy one-shot replication
    # sender around for reference without running it.
    return
    while not connected:
        time.sleep(1)
    date = datetime.datetime.now()
    rxmls = RequestXmlService()
    res_id = uuid4().hex
    res = rxmls.get_request_document(res_id, None)
    res.set('replication_package', '1')
    rxmls.set_result(res, 0, '')
    response_params = {
        'from': f'tcp://{Config.self_bnd}',
        'to': f'tcp://{Config.remote_bnd}',
        'ts_added': date.timestamp(),
        'user_id': '0',
        'query_type': 1114,
        'query_data': ET.tostring(res, encoding='unicode', xml_declaration=True)
    }
    filename = '51a8a2c81f774af7bba61b475b4b51b5'
    filepath = '/tmp/' + filename
    response_files = [{'name': filename, 'url': filepath, 'size': os.path.getsize(filepath)}]
    logger.debug('Send replication package')
    proxy = ServerProxy(Config.enserver)
    proxy.send(response_params, response_files, Config.ret_path)

def pika_callback(ch, method, properties, body):
    date = datetime.datetime.now()
    rxmls = RequestXmlService()
    res_id = uuid4().hex
    commit_info = json.loads(body)
    schema = commit_info.get('schema') or Config.oodb_schema
    res = rxmls.get_request_document(res_id, None)
    rxmls.set_result(res, 0, '')
    ET.SubElement(res, 'replication', {'id': commit_info['commit'], 'schema': schema})
    response_params = {
        'from': f'tcp://{Config.self_bnd}',
        'to': f'tcp://{Config.remote_bnd}',
        'ts_added': date.timestamp(),
        'user_id': '0',
        'query_type': 99,
        'query_data': ET.tostring(res, encoding='unicode', xml_declaration=True),
    }
    con = OOConnectionParams(schema, Config.oodb_host, Config.oodb_port, Config.oodb_dbname,
                             Config.oodb_username, Config.oodb_passwd, schema)
    ws = OODBWorkspace.ws(schema)
    if not ws.isInit():
        init_res = ws.init(con)
        logger.warning(init_res)
    oe = IpdExporter(ws)
    # Sample commit notifications, kept for manual testing:
    # commit = {"schema": "kartap", "commit": "55a01cf5-c27c-40be-b771-dc0b16c1878b"}
    # commit = {"schema": "kartap", "commit": "76405109-db79-4225-b885-fdb00cfc53a6"}
    # commit = {"schema": "gcmr", "commit": "9ec2202e-0991-4695-8b0f-33fe28ea8160"}
    # commit = {"schema": "npd_data", "commit": "f42afacd-483a-4d8f-bccb-74d65d45378f"}
    z = Zip()
    # Export everything changed between the previous commit and this one.
    pc = oe.previousCommit(commit_info['commit'])
    oe.exportChanges2osm(os.path.join(z.dirname, 'export.o5c'), pc, commit_info['commit'])
    oe.exportChanges2mbtiles(os.path.join(z.dirname, 'export.mbtiles'), pc, commit_info['commit'])
    created, updated, deleted = ws.changes(commit_info['commit'])
    # Collect file attachments (attribute 'c1000') whose content actually
    # changed between the two latest versions of each updated feature.
    qu = GroupQuery(Envelope())
    qu.setUids(updated)
    qu.setLoadArch(True)
    uids = []
    ws.clearData(True)
    ws.load(qu, uids)
    updated_files = []
    for feature_uid in uids:
        files_unchanged = True
        vers = ws.featureVersion(feature_uid)
        if len(vers) > 1:
            vers.sort(key=vers_key)
            files_unchanged = vers[0].feature().isEqual(vers[1].feature(), ["c1000"])
        if not files_unchanged:
            for attr in vers[0].feature().attributes('c1000'):
                updated_files.append(variantToFileValue(attr.val()))
    # Newly created features contribute all of their file attachments.
    ws.clearData(True)
    qu = GroupQuery(Envelope())
    qu.setUids(created)
    ws.load(qu, uids)
    exported_files = []
    for feature_uid in uids:
        # 'uids' may still hold uids from the previous query; features no
        # longer in the workspace are skipped here.
        feature = ws.featureByUid(feature_uid)
        if not feature:
            continue
        for attr in feature.attributes('c1000'):
            updated_files.append(variantToFileValue(attr.val()))
    for x in updated_files:
        exported_files.append({
            'key': x.key,
            'bucket': x.bucket,
            'filename': x.fileName,
        })
        download_file(x.key, x.bucket, os.path.join(z.dirname, x.key))
    with open(os.path.join(z.dirname, 'export_files.json'), 'w') as f:
        f.write(json.dumps(exported_files))
    ws.clearData(True)
    # ws.close()
    filepath = z.pack()
    response_files = [{'name': os.path.basename(filepath), 'url': filepath, 'size': os.path.getsize(filepath)}]
    logger.warning(response_files)
    logger.debug('Send replication package')
    proxy = ServerProxy(Config.enserver)
    try:
        proxy.send(response_params, response_files, Config.ret_path)
    except Exception:
        logger.exception('Error sending replication package')
    ch.basic_ack(delivery_tag=method.delivery_tag)

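# pika_callback expects the consumed message body to be JSON of the form
# {"schema": "...", "commit": "<commit uuid>"}; 'schema' is optional and
# falls back to Config.oodb_schema (see the sample commits noted above).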
def pika_task():
    connection = pika.BlockingConnection(pika.URLParameters(Config.rabbit_conn))
    channel = connection.channel()
    channel.basic_consume(queue=Config.rabbit_queue, on_message_callback=pika_callback)
    channel.start_consuming()

def list_contents(dir_name):
    logger.warning('list_contents(%s)', dir_name)
    return os.listdir(dir_name)

def aud_add(message):
    global connected
    if not isinstance(message, list):
        logger.warning(message)
        return 'OK'
    for item in message:
        logger.warning(item)
        # Audit event type 4002 at level 0 marks the link as connected.
        if item.get('level', -1) == 0 and item.get('type', -1) == 4002:
            connected = True
    return 'OK'

def auth_response(challenge, server_id, is_server):
    # logging.debug(f'Challenge: {challenge}, Server: {server_id}, IsServer: {is_server}')
    with psycopg.connect(f'host={Config.pg_host} port={Config.pg_port} dbname={Config.pg_dbname} '
                         f'user={Config.pg_username} password={Config.pg_password}') as conn:
        with conn.cursor() as cur:
            cur.execute('SELECT passwd FROM users WHERE username = %s', (server_id,))
            row = cur.fetchone()
    if not row:
        return {'error': True, 'response': 'Wrong user/bnd'}
    passwd = row[0]  # fetchone() returns a row tuple; take the column value
    # GOST R 34.11-2012 (256 bit) over the byte-reversed challenge||id||password,
    # with the digest reversed back before hex-encoding.
    msg = '%s%s%s' % (challenge, server_id, passwd)
    response = gost34112012256.new(msg.encode('utf-8')[::-1]).digest()[::-1].hex()
    return {'error': False, 'response': response}

def auth_challenge():
    logger.debug('get challenge')
    return uuid4().hex

def restore_uuid(oid):
    # Normalize a bare hex string into canonical dashed UUID form.
    return str(UUID(oid))

def load_catalog(params, files, url):
    logger.warning('load_catalog')
    date = datetime.datetime.now()
    rxmls = RequestXmlService()
    req = ET.fromstring(params['query_data'])
    req_id = rxmls.get_request_uuid(req)
    res_id = uuid4().hex
    res = rxmls.get_request_document(res_id, req_id)
    rxmls.set_result(res, 0, '')
    response_params = {
        'from': params['to'],
        'to': params['from'],
        'ts_added': date.timestamp(),
        'user_id': '1',
        'user_id_to': params['user_id'],
        'query_type': 1004,
        'query_data': ET.tostring(res, encoding='unicode', xml_declaration=True)
    }
    catalog = get_catalog()
    logger.debug('Catalog loaded')
    filename = uuid4().hex
    filepath = '/tmp/' + filename
    with zipfile.ZipFile(filepath, 'w') as zipf:
        zipf.writestr('WF.CLL', catalog)
    response_files = [{'name': filename, 'url': filepath, 'size': os.path.getsize(filepath)}]
    proxy = ServerProxy(url)
    proxy.send(response_params, response_files, Config.ret_path)

def get_objects(params, files, url):
    date = datetime.datetime.now()
    rxmls = RequestXmlService()
    req = ET.fromstring(params['query_data'])
    req_id = rxmls.get_request_uuid(req)
    res_id = uuid4().hex
    res = rxmls.get_request_document(res_id, req_id)
    rxmls.set_result(res, 0, '')
    objs = req.find('objects')
    uids = [restore_uuid(x.get('object_id')) for x in objs.findall('object')]
    response_params = {
        'from': params['to'],
        'to': params['from'],
        'ts_added': date.timestamp(),
        'user_id': '1',
        'user_id_to': params['user_id'],
        'query_type': 1001,
        'query_data': ET.tostring(res, encoding='unicode', xml_declaration=True)
    }
    filename = uuid4().hex
    filepath = '/tmp/' + filename
    main_filename = None
    with zipfile.ZipFile(filepath, 'w') as zipf:
        for uid in uids:
            obj = json.loads(get_object(uid))
            for file in obj['properties'].get('c1000', []):
                # All attachments land in a folder named after the first file.
                if not main_filename:
                    main_filename = file['fileName']
                resp = requests.get(f'https://gql.ivazh.ru/item/{file["key"]}')
                zipf.writestr(f'{main_filename}/{file["fileName"]}', resp.content)
    response_files = [{'name': filename, 'url': filepath, 'size': os.path.getsize(filepath)}]
    proxy = ServerProxy(url)
    proxy.send(response_params, response_files, Config.ret_path)

def get_metadata(params, files, url):
    date = datetime.datetime.now()
    rxmls = RequestXmlService()
    req = ET.fromstring(params['query_data'])
    req_id = rxmls.get_request_uuid(req)
    res_id = uuid4().hex
    res = rxmls.get_request_document(res_id, req_id)
    rxmls.set_result(res, 0, '')
    objs = req.find('getMetadataByIds')
    uids = [restore_uuid(x.get('id')) for x in objs.findall('chart')]
    response_params = {
        'from': params['to'],
        'to': params['from'],
        'ts_added': date.timestamp(),
        'user_id': '1',
        'user_id_to': params['user_id'],
        'query_type': 1024,
        'query_data': ET.tostring(res, encoding='unicode', xml_declaration=True)
    }
    filename = uuid4().hex
    filepath = '/tmp/' + filename
    content = ET.Element('getMetadataResponse')
    for uid in uids:
        obj = json.loads(get_object(uid))
        updated = datetime.datetime.fromisoformat(obj['date_updated'])
        chart = ET.SubElement(content, 'chart', {
            'id': UUID(obj['uid']).hex,
            'updated': str(updated.timestamp()),
        })
        # Only 'c…' property keys are exposed as metadata items.
        for key in obj['properties']:
            if not key.startswith('c'):
                continue
            ET.SubElement(chart, 'mdItem', {
                'code': key.replace('_', '.'),
                'name': key,
                'value': str(obj['properties'].get(key, '')),
                'isBase': 'false',
                'groupId': '',
                'groupName': '',
            })
    with zipfile.ZipFile(filepath, 'w') as zipf:
        zipf.writestr('metadata.xml', ET.tostring(content, encoding='unicode', xml_declaration=True))
    response_files = [{'name': filename, 'url': filepath, 'size': os.path.getsize(filepath)}]
    proxy = ServerProxy(url)
    proxy.send(response_params, response_files, Config.ret_path)

def run_task(query_type, params, files, url):
    if query_type == 4:
        tasks.put(lambda: load_catalog(params, files, url))
    elif query_type == 1:
        tasks.put(lambda: get_objects(params, files, url))
    elif query_type == 24:
        tasks.put(lambda: get_metadata(params, files, url))

def accept(params, files, url):
    logger.warning('Accept: ' + json.dumps(params, ensure_ascii=False))
    run_task(params['query_type'], params, files, url)
    return True

def onSent(params, files, callback_url):
    logger.debug('OnSent')
    logger.warning(params)


def onDelivered(params, files, callback_url):
    logger.warning('onDelivered')

def main():
    logger.setLevel(logging.INFO)
    logger.warning('Use Control-C to exit')
    server = SimpleXMLRPCServer(('0.0.0.0', 9000), logRequests=False, allow_none=True)
    server.register_function(list_contents)
    server.register_function(aud_add)
    server.register_function(auth_response)
    server.register_function(auth_challenge)
    server.register_function(accept)
    server.register_function(onSent)
    server.register_function(onDelivered)
    # Daemon threads, so a KeyboardInterrupt actually terminates the process.
    thread = threading.Thread(target=run_tasks, daemon=True)
    thread.start()
    replication_thread = threading.Thread(target=replication_task, daemon=True)
    replication_thread.start()
    pika_thread = threading.Thread(target=pika_task, daemon=True)
    pika_thread.start()
    try:
        logger.warning('Start server')
        server.serve_forever()
    except KeyboardInterrupt:
        logger.warning('Exiting')

def vers_key(e):
    # Sort key: a feature's version number.
    return e.version()

def test():
    # params = {"from": "tcp://kptsp_vb", "query_data": "<?xml version=\"1.0\" encoding=\"utf-8\"?><request><header parcel_id=\"990715ba919544a98f22cc7d3b0d9e8d\"/><getMetadataByIds><chart id=\"fc44343bd1654ee7b03ac1731567bbfd\"/></getMetadataByIds></request>", "query_type": 24, "to": "tcp://bnd127", "user_id": "3302", "ts_added": 1679825320.653038}
    # files = []
    # url = 'http://127.0.0.1:7000/xmlrpc'
    # accept(params, files, url)
    # get_metadata(params, files, url)
    # get_catalog()
    # auth_response('123', 'bnd127', False)
    con = OOConnectionParams(Config.oodb_schema, Config.oodb_host, Config.oodb_port, Config.oodb_dbname,
                             Config.oodb_username, Config.oodb_passwd, Config.oodb_schema)
    ws = OODBWorkspace.ws(Config.oodb_schema)
    ws.init(con)
    created, updated, deleted = ws.changes('2dad8c8a-d2db-4074-ab7a-c01c36ada2be')
    qu = GroupQuery(Envelope())
    qu.setUids(updated)
    qu.setLoadArch(True)
    uids = []
    ws.clearData(True)
    ws.load(qu, uids)
    updated_files = []
    for feature_uid in uids:
        files_unchanged = True
        vers = ws.featureVersion(feature_uid)
        if len(vers) > 1:
            vers.sort(key=vers_key)
            files_unchanged = vers[0].feature().isEqual(vers[1].feature(), ["c1000"])
        if not files_unchanged:
            updated_files.append(feature_uid)
    print(updated_files)
    ws.clearData(True)
    ws.close()

if __name__ == '__main__':
    main()
    # test()
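# A minimal peer-side sketch, assuming the server is reachable at
# http://localhost:9000 (host/port taken from main(); everything else below
# is illustrative):
#
#   from xmlrpc.client import ServerProxy
#   proxy = ServerProxy('http://localhost:9000', allow_none=True)
#   challenge = proxy.auth_challenge()                       # random hex nonce
#   reply = proxy.auth_response(challenge, 'bnd127', False)  # {'error': ..., 'response': ...}
#   # accept() queues work by params['query_type'] (4 -> load_catalog,
#   # 1 -> get_objects, 24 -> get_metadata); see test() above for a
#   # realistic params payload.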