766 lines
27 KiB
Python
766 lines
27 KiB
Python
import asyncio
|
|
from tempfile import TemporaryDirectory
|
|
import pika
|
|
import sys
|
|
import threading
|
|
import time
|
|
from queue import Queue
|
|
import datetime
|
|
import json
|
|
from uuid import uuid4, UUID
|
|
from xmlrpc.server import SimpleXMLRPCServer
|
|
from xmlrpc.client import ServerProxy
|
|
import logging
|
|
import os
|
|
import io
|
|
import zlib
|
|
import os.path
|
|
import requests
|
|
from reqs.graphql import get_catalog, get_object
|
|
from pygost import gost34112012256
|
|
import xml.etree.ElementTree as ET
|
|
from reqs.request_xml_service import RequestXmlService
|
|
import zipfile
|
|
from config import Config
|
|
from zip import Zip
|
|
import boto3
|
|
import db
|
|
from sqlalchemy.orm import Session
|
|
from fastapi import FastAPI, Response, Form, UploadFile, File, Request
|
|
import uvicorn
|
|
from typing_extensions import Annotated
|
|
import pathlib
|
|
from infra import *
|
|
|
|
|
|
# Value of ``query_type`` used for replication packages sent and accepted by
# this server.
NEW_REPLICATION_REQUEST = 99

# Queue of zero-argument callables executed one by one by run_tasks().
tasks = Queue()
# Names of remote servers currently reported as connected
# (maintained by bnd_connected() / bnd_disconnected()).
connected = set()

logger = logging.getLogger('xmlrpcserver')
|
|
|
|
|
|
def s3_connection():
    """Create a boto3 S3 client from the endpoint and credentials in ``Config``."""
    client_options = {
        'endpoint_url': Config.s3_endpoint,
        'aws_access_key_id': Config.s3_key_id,
        'aws_secret_access_key': Config.s3_access_key,
    }
    return boto3.client('s3', **client_options)
|
|
|
|
|
|
def download_file(key: str, bucket: str, filename: str):
    """Download the S3 object ``key`` from ``bucket`` into the local file ``filename``.

    The object body is written in 4096-byte chunks; the target file is
    opened in ``'wb'`` mode, so existing content is replaced.
    """
    s3 = s3_connection()
    response = s3.get_object(Bucket=bucket, Key=key)
    with open(str(filename), 'wb') as target:
        target.writelines(response['Body'].iter_chunks(chunk_size=4096))
|
|
|
|
|
|
def upload_file(filename: str, key: str, bucket: str):
    """Upload the local file ``filename`` to ``bucket`` under ``key``.

    The whole file is read into memory and sent with a single ``put_object`` call.
    """
    s3 = s3_connection()
    with open(filename, 'rb') as source:
        payload = source.read()
        s3.put_object(Body=payload, Bucket=bucket, Key=key)
|
|
|
|
|
|
def get_branch(bndname: str, scheme: str):
    """Return the ``branch`` of the ``db.IncomeBranch`` row for a user and scheme.

    The row is selected by ``scheme`` and joined with ``db.User`` filtered by
    ``bndname``. Returns ``None`` when no row matches.
    """
    bind = db.connect_db()
    with Session(bind) as session:
        query = session.query(db.IncomeBranch).filter_by(scheme=scheme)
        query = query.join(db.User).filter_by(bndname=bndname)
        record = query.one_or_none()
        return record.branch if record else None
|
|
|
|
|
|
def run_tasks():
    """Worker loop: take callables from the ``tasks`` queue and run them forever.

    Blocks on ``tasks.get()`` until a task is available. An exception raised
    by a task is logged with its traceback and the loop continues; without
    this the worker thread would die on the first failing task and every
    task queued afterwards would never run.
    """
    logger.debug('Task thread started.')
    while True:
        task = tasks.get()
        try:
            task()
        except Exception:
            logger.exception('Task failed')
|
|
|
|
|
|
def replication_task():
    """Periodically send queued commits to every connected server.

    Once a minute, for every name in ``connected`` the ``db.Queue`` rows of
    the user with that ``bndname`` are read. For users that are active now,
    the commit is replicated with ``replication`` (``newbnd`` users) or
    ``replication_old`` (others), after which the queue row is deleted and
    the deletion committed.

    A failure in one cycle is logged and the loop goes on; rows that were not
    deleted are picked up again on the next cycle.
    """
    while True:
        try:
            conn = db.connect_db()
            with Session(conn) as session:
                # Iterate over a snapshot: bnd_connected()/bnd_disconnected()
                # change the set from the XML-RPC thread, and mutating a set
                # while it is being iterated raises RuntimeError.
                for bndname in tuple(connected):
                    pending = session.query(db.Queue).join(db.Queue.user).filter_by(bndname=bndname).all()
                    for item in pending:
                        if not item.user.is_active_now():
                            continue
                        if item.user.newbnd:
                            replication(bndname, item.commit_id, item.schema)
                        else:
                            replication_old(bndname, item.commit_id, item.schema)
                        session.delete(item)
                        session.commit()
        except Exception:
            logger.exception('Replication cycle failed')
        time.sleep(60)
|
|
|
|
|
|
def process_chart(ws, chart, chart_class):
    """Normalise the properties of ``chart`` in place and tag it with its class.

    For every property that ``ws`` knows as an attribute:
    * values of ``AT_Domain`` attributes are replaced by the string form of
      the matching domain value;
    * property ``c103`` is overwritten with a fixed string.

    ``chart['class']`` is set to the part of ``chart_class['name']`` after the
    last underscore. Returns the same ``chart`` dict.
    """
    for prop_name in chart.get('properties') or {}:
        attribute = ws.attribute(prop_name)
        if not attribute:
            continue
        if str(attribute.type()) == 'AT_Domain':
            domain = attribute.domain()
            raw_value = str(chart['properties'][prop_name])
            domain_value = domain.value(variantFromString(raw_value))
            chart['properties'][prop_name] = variantToString(domain_value)
        if prop_name == 'c103':
            chart['properties'][prop_name] = 'Несекретно'
    *_, class_suffix = chart_class['name'].split('_')
    chart['class'] = class_suffix
    return chart
|
|
|
|
|
|
def crc32(filename, chunk_size=65536):
    """Return the CRC-32 of a file's contents as 8 upper-case hex digits.

    The file is read in binary mode, ``chunk_size`` bytes at a time.
    """
    running = 0
    with open(filename, "rb") as stream:
        # iter() with a sentinel stops at the first empty read (end of file).
        for piece in iter(lambda: stream.read(chunk_size), b""):
            running = zlib.crc32(piece, running)
    return f"{running & 0xFFFFFFFF:08X}"
|
|
|
|
|
|
def send_object_replication(ws, ids):
    """Build and send one replication package per feature loaded for ``ids``.

    The features with the given uids are loaded into ``ws`` and the workspace
    data is converted to JSON. For every feature a request document is built
    (``objects``/``version``/``file``, ``chart``, ``tags`` and ``archs``
    elements), the files listed in its ``c1000`` property are downloaded from
    ``Config.s3_bucket`` into a fresh ``Zip`` directory, the directory is
    packed and the package is sent to ``Config.enserver`` over XML-RPC.

    Features without a ``c1000`` property are skipped with a warning. A failed
    send is logged with its traceback and does not stop the remaining
    features.

    :param ws: workspace used to load the features and resolve attributes.
    :param ids: uids of the features to replicate.
    """
    query = GroupQuery(Envelope.world())
    query.setUids(ids)
    uids = []
    ws.load(query, uids)
    loaded = json.loads(ws.dataToJson())
    charts = [process_chart(ws, f, x) for x in loaded for f in x['features']]

    for chart in charts:
        logger.warning('\n')
        date = datetime.datetime.now()
        rxmls = RequestXmlService()
        res_id = uuid4().hex
        res = rxmls.get_request_document(res_id, None)
        res.set('replication_package', '1')
        res.set('replication_version', date.strftime('%Y%m%d%H%M%S'))
        res.set('user_permit', 'AA0AA00020200726D3E75C80B713A7A3D3E75C80B713A7A363F7CB889AA3F520')
        rxmls.set_result(res, 0, '')
        logger.debug('%s', chart)
        properties = chart.get('properties') or {}
        c1000 = properties.get('c1000')
        chart_id = chart['uid'].replace('-', '')
        if not c1000:
            logger.warning(f'No file for {chart_id}')
            continue

        z = Zip()
        xml_objects = ET.SubElement(res, 'objects')
        created_date = datetime.datetime.fromisoformat(chart.get('date_created'))
        system_date = str(created_date.timestamp())
        xml_chart = ET.SubElement(res, 'chart', {
            'Class': chart['class'],
            'ID': chart_id,
            'Name': c1000[0]['fileName'],
            'Type': 'OOD',
            'metadata_version': '1',
            'source': 'bnd127',
            'system_date': system_date,
        })
        xml_version = ET.SubElement(xml_objects, 'version', {
            'object_id': chart_id,
            'source': 'bnd127',
            'system_date': system_date,
            'version': '1.0',
            'version_id': chart_id,
        })

        total_size = 0
        # Every file is stored in a directory named after the first file.
        directory = os.path.join(z.dirname, f'maps/{res_id}/ENC_ROOT/{c1000[0]["fileName"]}')
        for file in c1000:
            fp = os.path.join(directory, file['fileName'])
            os.makedirs(directory, exist_ok=True)
            download_file(file['key'], Config.s3_bucket, fp)
            size = os.stat(fp).st_size
            _crc32 = crc32(fp)
            ET.SubElement(xml_version, 'file', {
                'cell_file': 'false',
                'crc32': _crc32,
                'crc32_enc': _crc32,
                'file_id': file['key'].replace('-', ''),
                'file_ref': os.path.join(f'maps/{res_id}/ENC_ROOT', file['fileName']),
                'file_size': str(size),
                'file_size_enc': str(size),
            })
            xml_version.set('crc32', _crc32)
            total_size += size
        xml_version.set('size', str(total_size))
        # NOTE(review): this overwrites the per-file CRC set above with the
        # total size. It looks like a copy-paste slip, but it is kept because
        # the format expected by the receiving side is not visible here.
        xml_version.set('crc32', str(total_size))

        ET.SubElement(res, 'tags')
        xml_archs = ET.SubElement(res, 'archs')
        xml_arch = ET.SubElement(xml_archs, 'arch', {
            'obj_id': chart_id,
            'ver_cl': '8.31',
            'ver_id': chart_id,
        })
        for attribute, raw_value in properties.items():
            if not attribute.startswith('c'):
                continue
            dotted = attribute.replace('_', '.')
            value = str(raw_value)
            ET.SubElement(xml_chart, 'Attribute', {
                'name': dotted,
                'value': value,
            })
            ET.SubElement(xml_arch, 'attr', {
                'code': dotted,
                'name': dotted,
                'value': value,
            })

        params = {
            'from': f'tcp://{Config.self_bnd}',
            'to': f'tcp://{Config.remote_bnd}',
            'ts_added': date.timestamp(),
            'user_id': '0',
            'query_type': NEW_REPLICATION_REQUEST,
            'query_data': ET.tostring(res, encoding='unicode', xml_declaration=True),
        }
        filepath = z.pack()
        response_files = [{'name': os.path.basename(filepath), 'url': filepath, 'size': os.path.getsize(filepath)}]
        logger.warning(response_files)
        logging.debug('Send replication package')
        proxy = ServerProxy(Config.enserver)
        try:
            proxy.send(params, response_files, Config.ret_path)
        except Exception:
            # Best effort: keep going with the next chart, but keep the traceback.
            logger.exception('Error sending')
|
|
|
|
|
|
def replication_old(bnd_name: str, commit_id: str, schema: str):
    """Replicate the objects created or updated by ``commit_id`` as per-object packages.

    Nothing is sent unless ``schema`` equals ``Config.oodb_schema``.
    ``bnd_name`` is accepted for signature parity with ``replication`` and is
    not used.
    """
    logger.warning('Start replication')
    if schema == Config.oodb_schema:
        connection = OOConnectionParams(schema, Config.oodb_host, Config.oodb_port, Config.oodb_dbname,
                                        Config.oodb_username, Config.oodb_passwd, schema)
        workspace = OODBWorkspace.ws(schema)
        if not workspace.isInit():
            init_result = workspace.init(connection)
            logger.warning(init_result)
        created, updated, _ = workspace.changes(commit_id)
        created_ids = set(created)
        changed_ids = list(created_ids | set(updated))
        send_object_replication(workspace, changed_ids)
        workspace.clearData(True)
        logger.warning('Replication to old bnd is sent')
|
|
|
|
|
|
def replication(bnd_name: str, commit_id: str, schema: str):
    """Export the changes of ``commit_id`` and send them to ``bnd_name``.

    The package directory receives:
    * ``export.o5c`` and ``export.mbtiles`` written by ``IpdExporter`` for the
      range previous commit -> ``commit_id``;
    * the S3 files referenced by attribute ``c1000`` of created features and
      of updated features whose ``c1000`` differs between the two first
      versions after sorting by version;
    * ``export_files.json`` listing key, bucket and file name of those files.

    The packed archive is sent to ``Config.enserver`` over XML-RPC. A failed
    send is logged with its traceback and not re-raised.

    :param bnd_name: name of the receiving server (used in the ``to`` address).
    :param commit_id: id of the commit to replicate.
    :param schema: schema the commit belongs to.
    """
    date = datetime.datetime.now()
    rxmls = RequestXmlService()
    res_id = uuid4().hex

    res = rxmls.get_request_document(res_id, None)
    rxmls.set_result(res, 0, '')
    ET.SubElement(res, 'replication', {'id': commit_id, 'scheme': schema})
    response_params = {
        'from': f'tcp://{Config.self_bnd}',
        'to': f'tcp://{bnd_name}',
        'ts_added': date.timestamp(),
        'user_id': '0',
        'query_type': NEW_REPLICATION_REQUEST,
        'query_data': ET.tostring(res, encoding='unicode', xml_declaration=True),
    }
    con = OOConnectionParams(schema, Config.oodb_host, Config.oodb_port, Config.oodb_dbname,
                             Config.oodb_username, Config.oodb_passwd, schema)
    ws = OODBWorkspace.ws(schema)
    if not ws.isInit():
        init_result = ws.init(con)
        logger.warning(init_result)
    oe = IpdExporter(ws)
    z = Zip()

    pc = oe.previousCommit(commit_id)
    oe.exportChanges2osm(os.path.join(z.dirname, 'export.o5c'), pc, commit_id)
    oe.exportChanges2mbtiles(os.path.join(z.dirname, 'export.mbtiles'), pc, commit_id)

    created, updated, _ = ws.changes(commit_id)

    # Updated features: collect files only when attribute c1000 changed.
    qu = GroupQuery(Envelope())
    qu.setUids(updated)
    qu.setLoadArch(True)
    uids = []
    ws.clearData(True)
    ws.load(qu, uids)
    updated_files = []
    for feature_uid in uids:
        vers = ws.featureVersion(feature_uid)
        if len(vers) <= 1:
            continue
        vers.sort(key=vers_key)
        if vers[0].feature().isEqual(vers[1].feature(), ["c1000"]):
            continue
        for attr in vers[0].feature().attributes('c1000'):
            updated_files.append(variantToFileValue(attr.val()))
    ws.clearData(True)

    # Created features: every c1000 file is exported.
    # NOTE(review): the same ``uids`` list is passed to load() again without
    # being reset; whether load() appends to it or replaces its content is
    # not visible here. Uids that resolve to no feature are skipped below.
    qu = GroupQuery(Envelope())
    qu.setUids(created)
    ws.load(qu, uids)
    exported_files = []
    for feature_uid in uids:
        feature = ws.featureByUid(feature_uid)
        if not feature:
            continue
        for attr in feature.attributes('c1000'):
            updated_files.append(variantToFileValue(attr.val()))

    for x in updated_files:
        exported_files.append({
            'key': x.key,
            'bucket': x.bucket,
            'filename': x.fileName,
        })
        fp = os.path.join(z.dirname, x.key)
        os.makedirs(os.path.dirname(fp), exist_ok=True)
        download_file(x.key, x.bucket, fp)
    with open(os.path.join(z.dirname, 'export_files.json'), 'w') as f:
        json.dump(exported_files, f)
    ws.clearData(True)

    filepath = z.pack()
    response_files = [{'name': os.path.basename(filepath), 'url': filepath, 'size': os.path.getsize(filepath)}]
    logger.warning(response_files)
    logging.debug('Send replication package')
    proxy = ServerProxy(Config.enserver)
    try:
        proxy.send(response_params, response_files, Config.ret_path)
    except Exception:
        # Best effort as before, but keep the traceback and let
        # SystemExit/KeyboardInterrupt propagate.
        logger.exception('Error sending')
|
|
|
|
|
|
def pika_callback(ch, method, properties, body):
    """Queue a commit announced over RabbitMQ for every interested user.

    ``body`` is JSON with a ``commit`` id and an optional ``schema``
    (``Config.oodb_schema`` is used when it is missing or empty). A
    ``db.Queue`` row is added for each active, non-upstream user that either
    has no profiles or has a profile for the schema. The message is
    acknowledged after the rows are committed.
    """
    payload = json.loads(body)
    schema = payload.get('schema') or Config.oodb_schema
    commit = payload['commit']
    bind = db.connect_db()
    with Session(bind) as session:
        # ``== True`` / ``== False`` build SQL expressions here; they are not
        # Python truth tests and must stay as comparisons.
        recipients = session.query(db.User).filter(db.User.active == True, db.User.upstream == False).all()
        for user in recipients:
            profiles = {profile.scheme: profile.to_dict() for profile in user.profiles}
            if profiles and schema not in profiles:
                continue
            session.add(db.Queue(user_id=user.id, commit_id=commit, schema=schema))
        session.commit()
    ch.basic_ack(delivery_tag=method.delivery_tag)
|
|
|
|
|
|
def pika_task():
    """Consume ``Config.rabbit_queue`` and pass each message to ``pika_callback``.

    Blocks in ``start_consuming()``; meant to run in its own thread.
    """
    parameters = pika.URLParameters(Config.rabbit_conn)
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()
    channel.basic_consume(
        queue=Config.rabbit_queue,
        on_message_callback=pika_callback,
    )

    channel.start_consuming()
|
|
|
|
|
|
def list_contents(dir_name):
    """XML-RPC method: log the request and return the entry names of ``dir_name``."""
    logger.warning('list_contents(%s)', dir_name)
    entries = os.listdir(dir_name)
    return entries
|
|
|
|
|
|
def aud_add(message):
    """XML-RPC method: write audit message(s) to the log and return ``'OK'``.

    A list is logged item by item; any other value is logged as one record.
    """
    records = message if isinstance(message, list) else [message]
    for record in records:
        logger.warning(record)
    return 'OK'
|
|
|
|
|
|
def auth_response(challenge, server_id, is_server):
    """XML-RPC method: compute the authentication response for ``server_id``.

    The user whose ``username`` equals ``server_id`` is looked up, and the
    string ``challenge + server_id + passwd`` is UTF-8 encoded, byte-reversed,
    hashed with ``gost34112012256``, and the digest is byte-reversed again and
    returned as hex.

    Returns ``{'error': False, 'response': <hex>}`` on success and
    ``{'error': True, 'response': 'Wrong user/bnd'}`` when anything fails
    (including a missing or ambiguous user). ``is_server`` is not used.
    """
    bind = db.connect_db()
    with Session(bind) as session:
        try:
            account = session.query(db.User).filter((db.User.username == server_id)).one()
            material = ''.join(map(str, (challenge, server_id, account.passwd)))
            reversed_input = material.encode('utf-8')[::-1]
            digest = gost34112012256.new(reversed_input).digest()
            answer = digest[::-1].hex()
            session.commit()
            return {'error': False, 'response': answer}
        except Exception:
            return {'error': True, 'response': 'Wrong user/bnd'}
|
|
|
|
|
|
def auth_challenge():
    """XML-RPC method: return a fresh random challenge (32 hex characters)."""
    logging.debug('get challenge')
    challenge = uuid4()
    return challenge.hex
|
|
|
|
|
|
def restore_uuid(oid):
    """Return ``oid`` (any form accepted by ``uuid.UUID``) in canonical dashed form."""
    return str(UUID(oid))
|
|
|
|
|
|
def load_catalog(params, files, url):
    """Answer a catalog request: send the catalog back as a zip with ``WF.CLL``.

    A response document referring to the request id is built, the catalog
    returned by ``get_catalog()`` is zipped into a uniquely named file under
    ``/tmp/`` and the result is sent to the XML-RPC server at ``url`` with
    ``query_type`` 1004.

    :param params: request parameters; ``query_data``, ``from``, ``to`` and
        ``user_id`` are read.
    :param files: files attached to the request (not used).
    :param url: XML-RPC endpoint the response is sent to.
    """
    logger.warning('load_catalog')
    date = datetime.datetime.now()
    rxmls = RequestXmlService()
    req = ET.fromstring(params['query_data'])
    req_id = rxmls.get_request_uuid(req)
    res_id = uuid4().hex
    res = rxmls.get_request_document(res_id, req_id)
    rxmls.set_result(res, 0, '')
    response_params = {
        'from': params['to'],
        'to': params['from'],
        'ts_added': date.timestamp(),
        'user_id': '1',
        'user_id_to': params['user_id'],
        'query_type': 1004,
        'query_data': ET.tostring(res, encoding='unicode', xml_declaration=True)
    }
    catalog = get_catalog()
    logging.debug('Catalog_loaded')
    filename = uuid4().hex
    filepath = '/tmp/' + filename
    # The context manager closes the archive even if writing fails.
    with zipfile.ZipFile(filepath, "w") as zipf:
        zipf.writestr('WF.CLL', catalog)
    response_files = [{'name': filename, 'url': filepath, 'size': os.path.getsize(filepath)}]
    proxy = ServerProxy(url)
    proxy.send(response_params, response_files, Config.ret_path)
|
|
|
|
|
|
def get_objects(params, files, url):
    """Answer an object request: send the files of the requested objects as a zip.

    The object ids are read from the ``objects/object`` elements of the
    request. For each object the files listed in its ``c1000`` property are
    downloaded over HTTP and stored in the archive under a directory named
    after the first file encountered. The archive is sent to the XML-RPC
    server at ``url`` with ``query_type`` 1001.

    :param params: request parameters; ``query_data``, ``from``, ``to`` and
        ``user_id`` are read.
    :param files: files attached to the request (not used).
    :param url: XML-RPC endpoint the response is sent to.
    """
    date = datetime.datetime.now()
    rxmls = RequestXmlService()
    req = ET.fromstring(params['query_data'])
    req_id = rxmls.get_request_uuid(req)
    res_id = uuid4().hex
    res = rxmls.get_request_document(res_id, req_id)
    rxmls.set_result(res, 0, '')
    objs = req.find('objects')
    uids = [restore_uuid(x.get('object_id')) for x in objs.findall('object')]

    # NOTE(review): set_result() is called a second time, as in the original;
    # whether the repeat changes the document is not visible here, so it is kept.
    rxmls.set_result(res, 0, '')
    response_params = {
        'from': params['to'],
        'to': params['from'],
        'ts_added': date.timestamp(),
        'user_id': '1',
        'user_id_to': params['user_id'],
        'query_type': 1001,
        'query_data': ET.tostring(res, encoding='unicode', xml_declaration=True)
    }
    filename = uuid4().hex
    filepath = '/tmp/' + filename
    main_filename = None
    # The context manager closes the archive even if a download or write fails.
    with zipfile.ZipFile(filepath, "w") as zipf:
        for uid in uids:
            obj = json.loads(get_object(uid))
            for file in obj['properties'].get('c1000', []):
                if not main_filename:
                    main_filename = file['fileName']
                download = requests.get(f'https://gql.ivazh.ru/item/{file["key"]}')
                zipf.writestr(f'{main_filename}/{file["fileName"]}', download.content)
    response_files = [{'name': filename, 'url': filepath, 'size': os.path.getsize(filepath)}]
    proxy = ServerProxy(url)
    proxy.send(response_params, response_files, Config.ret_path)
|
|
|
|
|
|
def get_metadata(params, files, url):
    """Answer a metadata request: send ``metadata.xml`` for the requested charts.

    Chart ids are read from the ``getMetadataByIds/chart`` elements of the
    request. For each chart a ``chart`` element with its id and update
    timestamp is written, with one ``mdItem`` per property whose name starts
    with ``c``. The XML is zipped as ``metadata.xml`` and sent to the XML-RPC
    server at ``url`` with ``query_type`` 1024.

    :param params: request parameters; ``query_data``, ``from``, ``to`` and
        ``user_id`` are read.
    :param files: files attached to the request (not used).
    :param url: XML-RPC endpoint the response is sent to.
    """
    date = datetime.datetime.now()
    rxmls = RequestXmlService()
    req = ET.fromstring(params['query_data'])
    req_id = rxmls.get_request_uuid(req)
    res_id = uuid4().hex
    res = rxmls.get_request_document(res_id, req_id)
    rxmls.set_result(res, 0, '')
    objs = req.find('getMetadataByIds')
    uids = [restore_uuid(x.get('id')) for x in objs.findall('chart')]

    # NOTE(review): set_result() is called a second time, as in the original;
    # whether the repeat changes the document is not visible here, so it is kept.
    rxmls.set_result(res, 0, '')
    response_params = {
        'from': params['to'],
        'to': params['from'],
        'ts_added': date.timestamp(),
        'user_id': '1',
        'user_id_to': params['user_id'],
        'query_type': 1024,
        'query_data': ET.tostring(res, encoding='unicode', xml_declaration=True)
    }
    filename = uuid4().hex
    filepath = '/tmp/' + filename
    content = ET.Element('getMetadataResponse')

    for uid in uids:
        obj = json.loads(get_object(uid))
        updated = datetime.datetime.fromisoformat(obj['date_updated'])
        chart = ET.SubElement(content, 'chart', {
            'id': UUID(obj['uid']).hex,
            'updated': str(updated.timestamp()),
        })
        for key in obj['properties']:
            if not key.startswith('c'):
                continue
            ET.SubElement(chart, 'mdItem', {
                'code': key.replace('_', '.'),
                'name': key,
                'value': str(obj['properties'].get(key, '')),
                'isBase': 'false',
                'groupId': '',
                'groupName': '',
            })
    # The context manager closes the archive even if writing fails.
    with zipfile.ZipFile(filepath, "w") as zipf:
        zipf.writestr('metadata.xml', ET.tostring(content, encoding='unicode', xml_declaration=True))
    response_files = [{'name': filename, 'url': filepath, 'size': os.path.getsize(filepath)}]
    proxy = ServerProxy(url)
    proxy.send(response_params, response_files, Config.ret_path)
|
|
|
|
|
|
def put_object(params, files, url):
    """Create a feature from a ``chart`` request and attach the uploaded files.

    The ``chart`` element of ``params['query_data']`` gives the feature class
    (``Class``) and its ``Attribute`` children give the attribute values. The
    single archive in ``files`` is unpacked into a temporary directory; every
    file in it is uploaded to ``Config.s3_bucket`` under a new key and added
    to the feature as a ``c1000`` attribute. The workspace is then saved and
    committed.

    :param params: request parameters; ``query_data`` and ``to`` are read.
    :param files: list with exactly one archive. The item may be a path, a
        seekable binary file object, or a dict with a ``url`` entry.
    :param url: callback URL (not used).
    :raises ValueError: if ``files`` does not contain exactly one item.
    """
    if len(files) != 1:
        raise ValueError(f'put_object expects exactly one file, got {len(files)}')

    req = ET.fromstring(params['query_data'])
    obj = req.find('chart')
    class_id = obj.get('Class')
    con = OOConnectionParams(Config.oodb_schema, Config.oodb_host, Config.oodb_port, Config.oodb_dbname,
                             Config.oodb_username, Config.oodb_passwd, Config.oodb_schema)
    ws = OODBWorkspace.ws(Config.oodb_schema)
    if not ws.isInit():
        init_result = ws.init(con)
        logger.warning(init_result)
    logging.info(class_id)
    fc = ws.featureClass(class_id)
    feature = fc.createFeature('')
    geom = Polygon.fromExtent(Envelope(0.0, 0.0, 1.0, 1.0, SRFactory.PZ9011()))
    res = feature.setGeometry(geom)
    for attr in obj.findall('Attribute'):
        name = attr.get('name')
        value = attr.get('value')
        res &= feature.addAttribute(name, variantFromString(value))

    # zipfile.ZipFile accepts a path or a seekable file object, not the
    # {'name', 'url', 'size'} dicts used for files elsewhere in this module.
    archive = files[0]
    if isinstance(archive, dict):
        archive = archive['url']
    if isinstance(archive, str) and archive.startswith(('http://', 'https://')):
        # Same approach as apply_commits(): fetch remote archives into memory.
        # NOTE(review): assumes a non-HTTP ``url`` is a local path - confirm.
        archive = io.BytesIO(requests.get(archive).content)

    # The context manager removes the extracted files when done.
    with TemporaryDirectory() as tmp_dir:
        with zipfile.ZipFile(archive, 'r') as zip_ref:
            zip_ref.extractall(tmp_dir)

        for item in pathlib.Path(tmp_dir).glob('**/*'):
            if not item.is_file():
                continue
            fileVal = FileValue()
            key = uuid4().hex
            fileVal.fileName = variantToString(item.relative_to(tmp_dir))
            fileVal.key = variantToString(key)
            fileVal.bucket = variantToString(Config.s3_bucket)
            res &= feature.addAttribute('c1000', variantFromFileValue(fileVal))
            upload_file(str(item), key, Config.s3_bucket)

        ws.transaction()
        ws.save()
        ws.commit(f'Putobject from {params["to"]}')
        ws.clearData(True)
|
|
|
|
|
|
def apply_commits(params, files, url):
    """Apply a replication package received from another server.

    The single archive in ``files`` is downloaded from its ``url`` and
    unpacked into a temporary directory. ``export.o5c`` and
    ``export.mbtiles`` are imported into the workspace of the scheme named in
    the ``replication`` element of the request, and every file listed in
    ``export_files.json`` is uploaded to its S3 bucket.

    :param params: request parameters; ``query_data`` and ``from`` are read.
    :param files: list with exactly one ``{'url': ...}`` item.
    :param url: callback URL (not used).
    :raises ValueError: if ``files`` does not contain exactly one item.
    """
    if len(files) != 1:
        raise ValueError(f'apply_commits expects exactly one file, got {len(files)}')
    file = files[0]
    # The context manager removes the extracted package when done.
    with TemporaryDirectory() as tmp_dir:
        r = requests.get(file['url'])
        with zipfile.ZipFile(io.BytesIO(r.content)) as zip_ref:
            zip_ref.extractall(tmp_dir)
        req = ET.fromstring(params['query_data'])
        repl = req.find('replication')
        scheme = repl.get('scheme')
        commit = repl.get('id')
        # NOTE(review): the branch is looked up as before but not used yet.
        branch = get_branch(params['from'].replace('tcp://', ''), scheme)
        con = OOConnectionParams(scheme, Config.oodb_host, Config.oodb_port, Config.oodb_dbname,
                                 Config.oodb_username, Config.oodb_passwd, scheme)
        ws = OODBWorkspace.ws(scheme)
        if not ws.isInit():
            init_result = ws.init(con)
            logger.warning(init_result)
        oe = IpdExporter(ws)
        if not oe.importChanges(os.path.join(tmp_dir, 'export.o5c'), os.path.join(tmp_dir, 'export.mbtiles')):
            logging.warning(f'Error importing commit {commit}: {oe.lastError().text()}')
        else:
            logging.warning(f'Importing commit {commit} finished successfully')

        root = os.path.normpath(tmp_dir)
        with open(os.path.join(tmp_dir, 'export_files.json'), 'r') as f:
            files_data = json.load(f)
        for file_data in files_data:
            # ``key`` comes from the remote package: never read a local file
            # outside the extracted directory on its behalf.
            local_path = os.path.normpath(os.path.join(root, file_data['key']))
            if not local_path.startswith(root + os.sep):
                logger.warning('Skipping file outside the package: %s', file_data['key'])
                continue
            upload_file(local_path, file_data['key'], file_data['bucket'])
        ws.clearData(True)
|
|
|
|
|
|
def run_task(query_type, params, files, url):
    """Queue the handler registered for ``query_type``; unknown types are ignored.

    Handlers: 4 -> load_catalog, 1 -> get_objects, 24 -> get_metadata,
    6 -> put_object, NEW_REPLICATION_REQUEST -> apply_commits. The handler is
    run later by the task thread with ``(params, files, url)``.
    """
    routes = (
        (4, load_catalog),
        (1, get_objects),
        (24, get_metadata),
        (6, put_object),
        (NEW_REPLICATION_REQUEST, apply_commits),
    )
    for code, handler in routes:
        # Compared with == (not a dict lookup) so unhashable values sent over
        # XML-RPC are ignored rather than raising.
        if query_type == code:
            tasks.put(lambda handler=handler: handler(params, files, url))
|
|
|
|
|
|
def accept(params, files, url):
    """XML-RPC method: log an incoming request, queue its handler, return ``True``."""
    print(params, files, url)
    serialized = json.dumps(params, ensure_ascii=False)
    logger.warning('Accept: ' + serialized)
    query_type = params['query_type']
    run_task(query_type, params, files, url)
    return True
|
|
|
|
|
|
def onSent(params, files, callback_url):
    """XML-RPC callback: log the parameters of a message reported as sent."""
    logger.debug('OnSent')
    logger.warning(params)
    return None
|
|
|
|
|
|
def onDelivered(params, files, callback_url):
    """XML-RPC callback: log that a delivery notification arrived."""
    logger.warning('onDelivered')
    return None
|
|
|
|
|
|
def bnd_connected(bnd_name: str):
    """XML-RPC method: log the connection and add ``bnd_name`` to ``connected``."""
    message = f'{bnd_name} connected'
    logger.warning(message)
    connected.add(bnd_name)
|
|
|
|
|
|
def bnd_disconnected(bnd_name: str):
    """XML-RPC method: log the disconnect and drop ``bnd_name`` from ``connected``.

    Names that are not in the set are ignored.
    """
    message = f'{bnd_name} disconnected'
    logger.warning(message)
    connected.discard(bnd_name)
|
|
|
|
|
|
def xmlrpc_task():
    """Serve the XML-RPC API on ``0.0.0.0:9000`` until the process ends.

    Each exposed function is registered under its own name.
    """
    server = SimpleXMLRPCServer(('0.0.0.0', 9000), logRequests=False, allow_none=True)
    exposed = (
        list_contents,
        aud_add,
        auth_response,
        auth_challenge,
        accept,
        onSent,
        onDelivered,
        bnd_connected,
        bnd_disconnected,
    )
    for function in exposed:
        server.register_function(function)
    server.serve_forever()
|
|
|
|
|
|
# FastAPI application for the HTTP endpoints of this module; main() serves it with uvicorn.
app = FastAPI()
|
|
|
|
|
|
@app.get("/")
async def fa_root():
    """Return a fixed greeting."""
    greeting = {"message": "Hello World"}
    return greeting
|
|
|
|
|
|
@app.get("/login")
async def fa_login_get(response: Response):
    """Stub login: set fixed ``sessionid``/``csrftoken`` cookies and report success."""
    fixed_cookies = (('sessionid', '87654321'), ('csrftoken', '12345678'))
    for cookie_name, cookie_value in fixed_cookies:
        response.set_cookie(key=cookie_name, value=cookie_value)
    return {"login": "Ok", "success": True}
|
|
|
|
|
|
@app.post("/login")
async def fa_login(response: Response):
    """Stub login (POST): log the call, set the fixed cookies and report success.

    No credentials are read or checked.
    """
    logger.warning('login action')
    fixed_cookies = (('sessionid', '87654321'), ('csrftoken', '12345678'))
    for cookie_name, cookie_value in fixed_cookies:
        response.set_cookie(key=cookie_name, value=cookie_value)
    return {"login": "Ok", "success": True}
|
|
|
|
|
|
@app.post('/webapi')
async def fa_webapi(request: Request):
    """Stub XML-RPC endpoint that always answers positively.

    If the request body contains ``has_perm`` the answer is a struct with
    ``success`` = true; otherwise it is a bare boolean true.
    """
    raw_body = await request.body()
    logger.warning('webapi')
    if 'has_perm' in raw_body.decode('utf-8'):
        answer = '<?xml version="1.0"?><methodResponse><params><param><value><struct><member><name>success</name><value><boolean>1</boolean></value></member></struct></value></param></params></methodResponse>'
    else:
        answer = '<?xml version="1.0"?><methodResponse><params><param><value><boolean>1</boolean></value></param></params></methodResponse>'
    return Response(content=answer, media_type='application/xml')
|
|
|
|
|
|
@app.post("/api/easo/PutObject")
async def fa_put_object(response: Response,
                        object_attrs: Annotated[str, Form()],
                        object_file: Annotated[UploadFile, File()]
                        ):
    """HTTP upload endpoint: create an object from form attributes and a zip file.

    ``object_attrs`` is passed to ``put_object`` as ``query_data`` and
    ``object_file`` as its single archive. ``put_object`` runs synchronously
    inside the request.
    """
    date = datetime.datetime.now()
    # put_object() opens files[0] with zipfile.ZipFile, which needs a path or
    # a seekable file object. The previous {'name', 'url', 'size'} dict could
    # not be opened, so the underlying upload file object is passed instead.
    files = [object_file.file]
    request_params = {
        'from': '',
        'to': '',
        'ts_added': date.timestamp(),
        'user_id': '0',
        'user_id_to': '0',
        'query_type': 6,
        'query_data': object_attrs,
    }
    put_object(request_params, files, None)
    return {"Upload": "Ok"}
|
|
|
|
|
|
def main():
    """Start the background workers and serve the HTTP API until interrupted.

    Workers, started in this order: the XML-RPC server, the task runner, the
    replication loop and the RabbitMQ consumer. They run as daemon threads
    because each of them loops forever; as non-daemon threads they would keep
    the process alive after the HTTP server stops, so Control-C would not
    exit.
    """
    logger.setLevel(logging.INFO)
    logger.warning('Use Control-C to exit')

    for target in (xmlrpc_task, run_tasks, replication_task, pika_task):
        worker = threading.Thread(target=target, name=target.__name__, daemon=True)
        worker.start()

    try:
        logger.warning('Start server')
        uvicorn.run(app, host="0.0.0.0", port=80)
    except KeyboardInterrupt:
        logger.warning('Exiting')
|
|
|
|
|
|
def vers_key(e):
    """Sort key for version entries: the value returned by ``e.version()``."""
    version = e.version()
    return version
|
|
|
|
|
|
def test():
    """Manual check: run ``replication_old`` for one fixed commit.

    Replicates commit ``23c9a275-ec0f-481a-8437-f3e41e4fe4f5`` of schema
    ``documents_src`` towards ``bnd128``. Not called by ``main``; enable it in
    the ``__main__`` guard when needed.
    """
    # Earlier scratch experiments (catalog dump, get_metadata request,
    # auth_response call, changed-files detection) were kept here as
    # commented-out code; the changed-files logic now lives in replication().
    replication_old('bnd128', '23c9a275-ec0f-481a-8437-f3e41e4fe4f5', 'documents_src')
|
|
|
|
|
|
if __name__ == '__main__':
    # Run the service; swap in test() for the manual replication check.
    main()
    # test()
|