from json import JSONDecodeError
from tempfile import TemporaryDirectory, gettempdir
from typing import Optional, Any
import pika
import threading
import time
from queue import Queue
import datetime
import json
from uuid import uuid4, UUID
from xmlrpc.server import SimpleXMLRPCServer
from xmlrpc.client import ServerProxy
import logging
import io
import zlib
import os.path
import requests
import xml.etree.ElementTree as ET
from .reqs.request_xml_service import RequestXmlService
import zipfile
from .config import config
from .zip import Zip
import boto3
import replication_itv.db as db
from sqlalchemy.orm import Session
from fastapi import FastAPI, Response, Form, UploadFile, File, Request
from fastapi.middleware.cors import CORSMiddleware
import uvicorn
from typing_extensions import Annotated
import pathlib
from shutil import make_archive
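
# NOTE: GroupQuery, Envelope, OOConnectionParams, OODBWorkspace, IpdExporter,
# FileValue, Polygon, SRFactory and the variant* conversion helpers are not
# imported in this module; they are assumed to come from the project's OODB bindings.

# Query type codes exchanged with the replication transport.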
NEW_REPLICATION_REQUEST = 99
NEW_COMMIT_REQUEST = 98
NEW_COMMIT_RESPONSE = 1098
NEW_DATA_REQUEST = 97
NEW_DATA_RESPONSE = 1097
tasks = Queue()
connected = set()
server: Optional[SimpleXMLRPCServer] = None
logger = logging.getLogger('xmlrpcserver')
def s3_connection():
return boto3.client('s3', endpoint_url=config.default.REPLICATION_ITV.s3.endpoint,
aws_access_key_id=config.default.REPLICATION_ITV.s3.key_id,
aws_secret_access_key=config.default.REPLICATION_ITV.s3.access_key)
def download_file(key: str, bucket: str, filename: str):
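    """Download an S3 object to a local file, streaming it in 4 KiB chunks."""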
client = s3_connection()
obj = client.get_object(Bucket=bucket, Key=key)
with open(f"{filename}", 'wb') as f:
for chunk in obj['Body'].iter_chunks(chunk_size=4096):
f.write(chunk)
def upload_file(filename: str, key: str, bucket: str):
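    """Upload a local file to the given S3 bucket under the given key (the whole file is read into memory)."""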
client = s3_connection()
with open(filename, 'rb') as f:
client.put_object(Body=f.read(), Bucket=bucket, Key=key)
def get_branch(bndname: str, scheme: str):
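    """Return (branch, local_scheme) configured for the given bnd and scheme, or (None, None) if there is no mapping."""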
conn = db.connect_db()
with Session(conn) as session:
item = session.query(db.IncomeBranch).filter_by(scheme=scheme).join(
db.User).filter_by(bndname=bndname).one_or_none()
if item:
return item.branch, item.local_scheme
return None, None
def replication_task():
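    """Poll the replication queue every 60 seconds and send pending commits to bnds that are currently connected and active."""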
while True:
conn = db.connect_db()
with Session(conn) as session:
for item in session.query(db.Queue).join(db.Queue.user).all():
bndname = item.user.bndname
if bndname not in connected:
continue
if item.user.is_active_now():
if item.user.newbnd:
replication(bndname, item.commit_id, item.schema)
else:
replication_old(bndname, item.commit_id, item.schema)
session.delete(item)
session.commit()
time.sleep(60)
def process_chart(ws, chart, chart_class):
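    """Resolve domain-coded attribute values into display strings and attach the short class name to the chart dict."""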
for x in chart.get('properties') or {}:
attr = ws.attribute(x)
if not attr:
continue
if str(attr.type()) == 'AT_Domain':
dom = attr.domain()
key = str(chart['properties'][x])
chart['properties'][x] = variantToString(dom.value(variantFromString(key)))
        if x == 'c103':
            # 'Несекретно' means "Unclassified" (security classification attribute)
            chart['properties'][x] = 'Несекретно'
chart['class'] = chart_class['name'].split('_')[-1]
return chart
def crc32(filename, chunk_size=65536):
"""Compute the CRC-32 checksum of the contents of the given filename"""
with open(filename, "rb") as f:
checksum = 0
while chunk := f.read(chunk_size):
checksum = zlib.crc32(chunk, checksum)
return "%08X" % (checksum & 0xFFFFFFFF)
def send_object_replication(bndname, ws, ids):
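    """Build and send a per-chart replication package: request XML with chart/version/file metadata plus the chart's files pulled from S3, zipped together."""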
query = GroupQuery(Envelope.world())
query.setUids(ids)
uids = []
ws.load(query, uids)
res = json.loads(ws.dataToJson())
charts = [process_chart(ws, f, x) for x in res for f in x['features']]
for chart in charts:
logger.warning('\n')
date = datetime.datetime.now()
rxmls = RequestXmlService()
res_id = uuid4().hex
res = rxmls.get_request_document(res_id, None)
res.set('replication_package', '1')
res.set('replication_version', date.strftime('%Y%m%d%H%M%S'))
res.set('user_permit', 'AA0AA00020200726D3E75C80B713A7A3D3E75C80B713A7A363F7CB889AA3F520')
rxmls.set_result(res, 0, '')
        logger.debug(chart)
properties = chart.get('properties') or {}
c1000 = properties.get('c1000')
if not c1000:
logger.warning(f'No file for {chart["uid"].replace("-", "")}')
continue
z = Zip()
xml_objects = ET.SubElement(res, 'objects')
created_date = datetime.datetime.fromisoformat(chart.get('date_created'))
xml_chart = ET.SubElement(res, 'chart', {
'Class': chart['class'],
'ID': chart['uid'].replace('-', ''),
'Name': c1000[0]['fileName'],
'Type': 'OOD',
'metadata_version': '1',
'source': config.default.REPLICATION_ITV.self_bnd,
'system_date': str(created_date.timestamp()),
})
xml_version = ET.SubElement(xml_objects, 'version', {
'object_id': chart['uid'].replace('-', ''),
'source': config.default.REPLICATION_ITV.self_bnd,
'system_date': str(created_date.timestamp()),
'version': '1.0',
'version_id': chart['uid'].replace('-', ''),
})
total_size = 0
for file in c1000:
directory = os.path.join(z.dirname, f'maps/{res_id}/ENC_ROOT/{c1000[0]["fileName"]}')
fp = os.path.join(directory, file['fileName'])
if not os.path.exists(directory):
os.makedirs(directory)
download_file(file['key'], config.default.REPLICATION_ITV.s3.bucket, fp)
size = os.stat(fp).st_size
_crc32 = crc32(fp)
ET.SubElement(xml_version, 'file', {
'cell_file': 'false',
'crc32': _crc32,
'crc32_enc': _crc32,
'file_id': file['key'].replace('-', ''),
'file_ref': os.path.join(f'maps/{res_id}/ENC_ROOT', file['fileName']),
'file_size': str(size),
'file_size_enc': str(size),
})
            # version-level crc32 ends up holding the checksum of the last file in the set
            xml_version.set('crc32', _crc32)
            total_size += size
        xml_version.set('size', str(total_size))
xml_tags = ET.SubElement(res, 'tags')
xml_archs = ET.SubElement(res, 'archs')
xml_arch = ET.SubElement(xml_archs, 'arch', {
'obj_id': chart['uid'].replace('-', ''),
'ver_cl': '8.31',
'ver_id': chart['uid'].replace('-', ''),
})
for attribute in properties:
if attribute.startswith('c'):
ET.SubElement(xml_chart, 'Attribute', {
'name': attribute.replace('_', '.'),
'value': str(chart['properties'][attribute]),
})
ET.SubElement(xml_arch, 'attr', {
'code': attribute.replace('_', '.'),
'name': attribute.replace('_', '.'),
'value': str(chart['properties'][attribute]),
})
params = {
'from': f'tcp://{config.default.REPLICATION_ITV.self_bnd}',
'to': f'tcp://{bndname}',
'ts_added': date.timestamp(),
'user_id': '0',
'query_type': NEW_REPLICATION_REQUEST,
'query_data': ET.tostring(res, encoding='unicode', xml_declaration=True),
}
filepath = z.pack()
response_files = [{'name': os.path.basename(filepath), 'url': filepath, 'size': os.path.getsize(filepath)}]
logger.warning(response_files)
logging.debug('Send replication package')
send_response(params, response_files, config.default.REPLICATION_ITV.ret_path)
def replication_old(bnd_name: str, commit_id: str, schema: str):
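    """Replicate a single commit to a legacy-format bnd by sending per-object packages; only runs for the configured default schema."""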
logger.warning('Start replication')
if schema != config.default.REPLICATION_ITV.oodb.schema:
return
con = OOConnectionParams(schema, config.default.REPLICATION_ITV.oodb.host, config.default.REPLICATION_ITV.oodb.port,
config.default.REPLICATION_ITV.oodb.dbname,
config.default.REPLICATION_ITV.oodb.username, config.default.REPLICATION_ITV.oodb.passwd,
schema)
ws = OODBWorkspace.ws(schema)
if not ws.isInit():
res = ws.init(con)
logger.warning(res)
created, updated, _ = ws.changes(commit_id)
ids = list(set(created) | set(updated))
send_object_replication(bnd_name, ws, ids)
ws.clearData(True)
logger.warning('Replication to old bnd is sent')
def replication(bnd_name: str, commit_id: str, schema: str):
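    """Replicate a commit to a new-format bnd: export changes to o5c/mbtiles, bundle changed c1000 files from S3 and send everything as one zip."""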
logging.warning(f'{bnd_name} {commit_id} {schema}')
date = datetime.datetime.now()
rxmls = RequestXmlService()
res_id = uuid4().hex
res = rxmls.get_request_document(res_id, None)
rxmls.set_result(res, 0, '')
ET.SubElement(res, 'replication', {'id': commit_id, 'scheme': schema})
response_params = {
'from': f'tcp://{config.default.REPLICATION_ITV.self_bnd}',
'to': f'tcp://{bnd_name}',
'ts_added': date.timestamp(),
'user_id': '0',
'query_type': NEW_REPLICATION_REQUEST,
'query_data': ET.tostring(res, encoding='unicode', xml_declaration=True),
}
con = OOConnectionParams(schema, config.default.REPLICATION_ITV.oodb.host, config.default.REPLICATION_ITV.oodb.port,
config.default.REPLICATION_ITV.oodb.dbname,
config.default.REPLICATION_ITV.oodb.username, config.default.REPLICATION_ITV.oodb.passwd,
schema)
ws = OODBWorkspace.ws(schema)
if not ws.isInit():
res = ws.init(con)
logger.warning(res)
oe = IpdExporter(ws)
# commit = {"schema": "kartap", "commit": "55a01cf5-c27c-40be-b771-dc0b16c1878b"}
# commit = {"schema": "kartap", "commit": "76405109-db79-4225-b885-fdb00cfc53a6"}
# commit = {"schema": "gcmr", "commit": "9ec2202e-0991-4695-8b0f-33fe28ea8160"}
# commit = {"schema": "npd_data", "commit": "f42afacd-483a-4d8f-bccb-74d65d45378f"}
z = Zip()
pc = oe.previousCommit(commit_id)
oe.exportChanges2osm(os.path.join(z.dirname, 'export.o5c'), pc, commit_id)
oe.exportChanges2mbtiles(os.path.join(z.dirname, 'export.mbtiles'), pc, commit_id)
created, updated, deleted = ws.changes(commit_id)
qu = GroupQuery(Envelope())
qu.setUids(updated)
qu.setLoadArch(True)
uids = []
ws.clearData(True)
ws.load(qu, uids)
updated_files = []
for feature_uid in uids:
not_files = True
vers = ws.featureVersion(feature_uid)
if len(vers) > 1:
vers.sort(key=vers_key)
not_files = vers[0].feature().isEqual(vers[1].feature(), ["c1000"])
if not not_files:
for attr in vers[0].feature().attributes('c1000'):
updated_files.append(variantToFileValue(attr.val()))
# print(f'key: {file_val.key} bucket: {file_val.bucket} fileName:{file_val.fileName}')
ws.clearData(True)
qu = GroupQuery(Envelope())
qu.setUids(created)
ws.load(qu, uids)
exported_files = []
for feature_uid in uids:
feature = ws.featureByUid(feature_uid)
if not feature:
continue
for attr in feature.attributes('c1000'):
updated_files.append(variantToFileValue(attr.val()))
# updated_files.append(feature_uid)
for x in updated_files:
exported_files.append({
'key': x.key,
'bucket': x.bucket,
'filename': x.fileName,
})
fp = os.path.join(z.dirname, x.key)
os.makedirs(os.path.dirname(fp), exist_ok=True)
download_file(x.key, x.bucket, fp)
with open(os.path.join(z.dirname, 'export_files.json'), 'w') as f:
f.write(json.dumps(exported_files))
ws.clearData(True)
# ws.close()
filepath = z.pack()
response_files = [{'name': os.path.basename(filepath), 'url': filepath, 'size': os.path.getsize(filepath)}]
logger.warning(response_files)
logging.debug('Send replication package')
send_response(response_params, response_files, config.default.REPLICATION_ITV.ret_path)
def pika_callback(ch, method, properties, body):
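    """Handle a commit notification: queue the commit for every active user (except this bnd) whose profile list is empty or includes the schema."""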
commit_info = json.loads(body)
logging.warning(commit_info)
schema = commit_info.get('schema') or config.default.REPLICATION_ITV.oodb.schema
commit = commit_info['commit']
conn = db.connect_db()
with Session(conn) as session:
for user in session.query(db.User).filter(db.User.active == True).all():
if user.bndname == config.default.REPLICATION_ITV.self_bnd:
continue
profiles = {x.scheme: x.to_dict() for x in user.profiles}
if len(profiles) == 0 or schema in profiles:
item = db.Queue(user_id=user.id, commit_id=commit, schema=schema)
session.add(item)
session.commit()
ch.basic_ack(delivery_tag=method.delivery_tag)
def pika_task():
connection = pika.BlockingConnection(pika.URLParameters(config.default.REPLICATION_ITV.amqp.conn))
channel_itv = connection.channel()
channel_itv.basic_consume(queue=config.default.REPLICATION_ITV.amqp.queue, on_message_callback=pika_callback)
channel_itv.start_consuming()
def status_callback(ch, method, properties, body):
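    """Track which bnds are currently connected based on status messages."""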
status_info = json.loads(body)
if status_info['status'] == 'connected':
connected.add(status_info['bnd_name'])
elif status_info['status'] == 'disconnected':
        # discard() will not raise if the bnd was never marked as connected
        connected.discard(status_info['bnd_name'])
ch.basic_ack(delivery_tag=method.delivery_tag)
def status_task():
connection = pika.BlockingConnection(pika.URLParameters(config.default.REPLICATION_ITV.amqp.conn))
channel_status = connection.channel()
channel_status.basic_consume(queue=config.default.REPLICATION_ITV.amqp.status_queue, on_message_callback=status_callback)
channel_status.start_consuming()
def pika_itv_callback(ch, method, properties, body):
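    """Consume an incoming ITV message: fetch its attached files from S3 into temp files and dispatch the request via run_task."""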
try:
data = json.loads(body)
params = data['params']
url = data['url']
files = []
for file in data['files']:
fn = os.path.join(gettempdir(), uuid4().hex)
download_file(file['url']['name'], file['url']['bucket'], fn)
file['url'] = fn
files.append(file)
run_task(params['query_type'], params, files, url)
except JSONDecodeError as e:
logging.warning(e)
finally:
ch.basic_ack(delivery_tag=method.delivery_tag)
def pika_itv_task():
connection = pika.BlockingConnection(pika.URLParameters(config.default.REPLICATION_ITV.amqp.conn))
channel_receive = connection.channel()
channel_receive.basic_consume(queue=config.default.REPLICATION_ITV.amqp.incoming_queue, on_message_callback=pika_itv_callback)
channel_receive.start_consuming()
def send_response(params, files, url):
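    """Upload the response files to the ITV S3 bucket and publish the message to the outgoing exchange."""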
files_s3 = []
for file in files:
fn = uuid4().hex
upload_file(file['url'], fn, config.default.REPLICATION_ITV.s3.bucket_itv)
file['url'] = {'name': fn, 'bucket': config.default.REPLICATION_ITV.s3.bucket_itv}
files_s3.append(file)
data = {
'params': params,
'files': files_s3,
'url': url,
}
connection = pika.BlockingConnection(pika.URLParameters(config.default.REPLICATION_ITV.amqp.conn))
channel_send = connection.channel()
channel_send.basic_publish(exchange=config.default.REPLICATION_ITV.amqp.out_exchange, body=json.dumps(data), routing_key='')
def put_object(params, files, url):
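    """Create a feature of the requested class from the incoming chart XML, upload the files from the attached zip to S3 as c1000 attributes and commit the change."""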
date = datetime.datetime.now()
req = ET.fromstring(params['query_data'])
obj = req.find('chart')
class_id = obj.get('Class')
con = OOConnectionParams(config.default.REPLICATION_ITV.oodb.schema, config.default.REPLICATION_ITV.oodb.host,
config.default.REPLICATION_ITV.oodb.port, config.default.REPLICATION_ITV.oodb.dbname,
config.default.REPLICATION_ITV.oodb.username, config.default.REPLICATION_ITV.oodb.passwd,
config.default.REPLICATION_ITV.oodb.schema)
ws = OODBWorkspace.ws(config.default.REPLICATION_ITV.oodb.schema)
if not ws.isInit():
res = ws.init(con)
logger.warning(res)
logging.info(class_id)
fc = ws.featureClass(class_id)
feature = fc.createFeature('')
geom = Polygon.fromExtent(Envelope(0.0, 0.0, 1.0, 1.0, SRFactory.PZ9011()))
res = feature.setGeometry(geom)
for attr in obj.findall('Attribute'):
name = attr.get('name')
value = attr.get('value')
res &= feature.addAttribute(name, variantFromString(value))
assert len(files) == 1
file = files[0]
dir = TemporaryDirectory()
with zipfile.ZipFile(file, 'r') as zip_ref:
zip_ref.extractall(dir.name)
fp = pathlib.Path(dir.name)
for item in fp.glob('**/*'):
if not item.is_file():
continue
fileVal = FileValue()
key = uuid4().hex
fileVal.fileName = variantToString(item.relative_to(dir.name))
fileVal.key = variantToString(key)
fileVal.bucket = variantToString(config.default.REPLICATION_ITV.s3.bucket)
res &= feature.addAttribute('c1000', variantFromFileValue(fileVal))
upload_file(str(item), key, config.default.REPLICATION_ITV.s3.bucket)
ws.transaction()
res = ws.save()
ws.commit(f'Putobject from {params["to"]}')
ws.clearData(True)
def apply_commits(params, files, url):
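    """Apply a replication package from another bnd: unzip it, import the o5c/mbtiles changes into the mapped scheme/branch and re-upload the bundled files to S3."""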
logging.warning("Apply commits")
logging.warning(params)
assert len(files) == 1
file = files[0]
dir = TemporaryDirectory()
r = requests.get(file['url'])
with zipfile.ZipFile(io.BytesIO(r.content)) as zip_ref:
zip_ref.extractall(dir.name)
req = ET.fromstring(params['query_data'])
repl = req.find('replication')
scheme = repl.get('scheme')
commit = repl.get('id')
bnd = params['from'].replace('tcp://', '')
branch, new_scheme = get_branch(bnd, scheme)
if new_scheme:
scheme = new_scheme
con = OOConnectionParams(scheme, config.default.REPLICATION_ITV.oodb.host, config.default.REPLICATION_ITV.oodb.port,
config.default.REPLICATION_ITV.oodb.dbname,
config.default.REPLICATION_ITV.oodb.username, config.default.REPLICATION_ITV.oodb.passwd, scheme)
ws = OODBWorkspace.ws(scheme)
logging.warning("Connected to schema")
if config.default.REPLICATION_ITV.get('ws_rabbit_params') and not ws.hasProducer():
ws.setProducer(config.default.REPLICATION_ITV.ws_rabbit_params.host, config.default.REPLICATION_ITV.ws_rabbit_params.port,
config.default.REPLICATION_ITV.ws_rabbit_params.exchange, config.default.REPLICATION_ITV.ws_rabbit_params.user,
config.default.REPLICATION_ITV.ws_rabbit_params.password)
ws.setCurrentUser(bnd)
logging.warning("Set branch if needed")
if branch:
logging.warning(branch)
ws.switchBranch(branch)
if not ws.isInit():
res = ws.init(con)
logger.warning(res)
oe = IpdExporter(ws)
logging.warning("Importing...")
if not oe.importChanges(os.path.join(dir.name, 'export.o5c'), os.path.join(dir.name, 'export.mbtiles')):
logging.warning(f'Error importing commit {commit}: {oe.lastError().text()}')
else:
logging.warning(f'Importing commit {commit} finished successfully')
with open(os.path.join(dir.name, 'export_files.json'), 'r') as f:
files_data = json.load(f)
for file_data in files_data:
upload_file(os.path.join(dir.name, file_data['key']), file_data['key'], file_data['bucket'])
logging.warning("Finished import")
ws.clearData(True)
def get_commit(params, files, url):
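    """Handle a NEW_COMMIT_REQUEST: reply with the current commit id of the requested schema."""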
date = datetime.datetime.now()
rxmls = RequestXmlService()
req = ET.fromstring(params['query_data'])
req_id = rxmls.get_request_uuid(req)
res_id = uuid4().hex
res_doc = rxmls.get_request_document(res_id, req_id)
schema = req.find('currentCommit').get('scheme')
con = OOConnectionParams(schema, config.default.REPLICATION_ITV.oodb.host, config.default.REPLICATION_ITV.oodb.port, config.default.REPLICATION_ITV.oodb.dbname,
config.default.REPLICATION_ITV.oodb.username, config.default.REPLICATION_ITV.oodb.passwd, schema)
ws = OODBWorkspace.ws(schema)
if not ws.isInit():
res = ws.init(con)
logger.warning(res)
ET.SubElement(res_doc, 'commit', {'id': ws.currentCommit(), 'schema': schema})
response_params = {
'from': params['to'],
'to': params['from'],
'ts_added': date.timestamp(),
'user_id': '1',
'user_id_to': 0,
'query_type': NEW_COMMIT_RESPONSE,
'query_data': ET.tostring(res_doc, encoding='unicode', xml_declaration=True)
}
send_response(response_params, [], url)
def query_commits(params, files, url):
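    """Handle a NEW_COMMIT_RESPONSE: queue every commit newer than the one the remote bnd reported for replication to that bnd."""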
req = ET.fromstring(params['query_data'])
commit_el = req.find('commit')
commit_id = commit_el.get('id')
schema = commit_el.get('schema')
bnd = params['from'].replace('tcp://', '')
con = OOConnectionParams(schema, config.default.REPLICATION_ITV.oodb.host, config.default.REPLICATION_ITV.oodb.port, config.default.REPLICATION_ITV.oodb.dbname,
config.default.REPLICATION_ITV.oodb.username, config.default.REPLICATION_ITV.oodb.passwd, schema)
ws = OODBWorkspace.ws(schema)
if not ws.isInit():
res = ws.init(con)
logger.warning(res)
schema_commits = ws.commits(ws.branch())
logger.warning(schema_commits)
if commit_id not in schema_commits:
logger.warning(f'Error in commits in schema {schema}: no commit {commit_id}')
return
logger.warning(schema_commits[schema_commits.index(commit_id) + 1:])
conn = db.connect_db()
with Session(conn) as session:
for commit in schema_commits[schema_commits.index(commit_id) + 1:]:
for user in session.query(db.User).filter(db.User.bndname == bnd, db.User.active == True).all():
if user.bndname == config.default.REPLICATION_ITV.self_bnd:
continue
profiles = {x.scheme: x.to_dict() for x in user.profiles}
if len(profiles) == 0 or schema in profiles:
item = db.Queue(user_id=user.id, commit_id=commit, schema=schema)
logging.warning(item)
session.add(item)
session.commit()
def run_task(query_type, params, files, url):
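    """Enqueue the handler that matches the incoming query type onto the background task queue."""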
if query_type == NEW_REPLICATION_REQUEST:
tasks.put(lambda: apply_commits(params, files, url))
if query_type == NEW_COMMIT_REQUEST:
tasks.put(lambda: get_commit(params, files, url))
if query_type == NEW_COMMIT_RESPONSE:
tasks.put(lambda: query_commits(params, files, url))
def main():
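    """Start the replication, commit-notification and status consumer threads, then consume the incoming ITV queue in the main thread."""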
global server
logger.setLevel(logging.INFO)
logger.warning('Use Control-C to exit')
replication_thread = threading.Thread(target=replication_task)
replication_thread.start()
pika_thread = threading.Thread(target=pika_task)
pika_thread.start()
status_thread = threading.Thread(target=status_task)
status_thread.start()
pika_itv_task()
def vers_key(e):
return e.version()
if __name__ == '__main__':
main()