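"""AMQP worker for ITV requests.

Consumes messages from the configured RabbitMQ queue, downloads any attached
files from S3 to temporary paths, and dispatches each request by query_type
(catalog, get-objects, get-metadata, put-object). Handlers that produce a
reply upload their result archive to S3 and publish the response message on
the outgoing exchange.
"""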
from json import JSONDecodeError
from tempfile import TemporaryDirectory, gettempdir
from typing import Optional, Any

import pika
import threading
import time
from queue import Queue
import datetime
import json
from uuid import uuid4, UUID
from xmlrpc.server import SimpleXMLRPCServer
from xmlrpc.client import ServerProxy
import logging
import io
import zlib
import os.path
import requests
from .reqs_graphql import get_catalog, get_object
import xml.etree.ElementTree as ET
from .reqs.request_xml_service import RequestXmlService
import zipfile
from .config import config
from .zip import Zip
import boto3
from sqlalchemy.orm import Session
from fastapi import FastAPI, Response, Form, UploadFile, File, Request
from fastapi.middleware.cors import CORSMiddleware
import uvicorn
from typing_extensions import Annotated
import pathlib
from shutil import make_archive


tasks = Queue()

# RabbitMQ connection and channels; created in main() / pika_task().
connection: Optional[pika.BlockingConnection] = None
channel_send: Optional[Any] = None
channel_receive: Optional[Any] = None

server: Optional[SimpleXMLRPCServer] = None

logger = logging.getLogger('xmlrpcserver')

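# Creates a boto3 S3 client from the endpoint and credentials in the service config.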
def s3_connection():
    return boto3.client('s3', endpoint_url=config.default.REQUEST_ITV.s3.endpoint,
                        aws_access_key_id=config.default.REQUEST_ITV.s3.key_id,
                        aws_secret_access_key=config.default.REQUEST_ITV.s3.access_key)

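# Streams an S3 object into a local file in 4 KiB chunks.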
def download_file(key: str, bucket: str, filename: str):
    client = s3_connection()
    obj = client.get_object(Bucket=bucket, Key=key)
    with open(filename, 'wb') as f:
        for chunk in obj['Body'].iter_chunks(chunk_size=4096):
            f.write(chunk)

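# Uploads a local file to S3 under the given bucket and key.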
def upload_file(filename: str, key: str, bucket: str):
    client = s3_connection()
    with open(filename, 'rb') as f:
        client.put_object(Body=f.read(), Bucket=bucket, Key=key)

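# Normalises an object id string into canonical dashed UUID form.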
def restore_uuid(oid):
    uuid = UUID(oid)
    return str(uuid)

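# RabbitMQ consumer callback: decodes the incoming JSON message, downloads its
# attached files from S3 to temporary paths and dispatches it by query_type.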
def pika_callback(ch, method, properties, body):
    try:
        data = json.loads(body)
        params = data['params']
        url = data['url']
        files = []
        for file in data['files']:
            # Fetch each referenced S3 object to a temp file and replace the
            # S3 reference with the local path before dispatching.
            fn = os.path.join(gettempdir(), uuid4().hex)
            download_file(file['url']['name'], file['url']['bucket'], fn)
            file['url'] = fn
            files.append(file)
        run_task(params['query_type'], params, files, url)
    except JSONDecodeError as e:
        logger.warning(e)
    finally:
        # Acknowledge the message even if handling failed.
        ch.basic_ack(delivery_tag=method.delivery_tag)

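# Uploads each result file to the ITV bucket and publishes the response message
# on the outgoing exchange.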
def send_response(params, files, url):
    global channel_send
    files_s3 = []
    for file in files:
        fn = uuid4().hex
        upload_file(file['url'], fn, config.default.REQUEST_ITV.s3.bucket_itv)
        file['url'] = {'name': fn, 'bucket': config.default.REQUEST_ITV.s3.bucket_itv}
        files_s3.append(file)
    data = {
        'params': params,
        'files': files_s3,
        'url': url,
    }
    channel_send.basic_publish(exchange=config.default.REQUEST_ITV.amqp.out_exchange,
                               body=json.dumps(data), routing_key='')

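# Opens a consuming channel and blocks on the incoming queue, handing messages
# to pika_callback.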
def pika_task():
    global connection
    global channel_receive
    channel_receive = connection.channel()
    channel_receive.basic_consume(queue=config.default.REQUEST_ITV.amqp.incoming_queue,
                                  on_message_callback=pika_callback)
    channel_receive.start_consuming()

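# query_type 4 handler: packs the catalog returned by get_catalog() into a zip
# archive as WF.CLL and replies with a query_type 1004 response.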
def load_catalog(params, files, url):
    logger.warning('load_catalog')
    date = datetime.datetime.now()
    rxmls = RequestXmlService()
    req = ET.fromstring(params['query_data'])
    req_id = rxmls.get_request_uuid(req)
    res_id = uuid4().hex
    res = rxmls.get_request_document(res_id, req_id)
    rxmls.set_result(res, 0, '')
    response_params = {
        'from': params['to'],
        'to': params['from'],
        'ts_added': date.timestamp(),
        'user_id': '1',
        'user_id_to': params['user_id'],
        'query_type': 1004,
        'query_data': ET.tostring(res, encoding='unicode', xml_declaration=True)
    }
    catalog = get_catalog()
    logger.debug('Catalog loaded')
    filename = uuid4().hex
    filepath = os.path.join(gettempdir(), filename)
    with zipfile.ZipFile(filepath, 'w') as zipf:
        zipf.writestr('WF.CLL', catalog)
    response_files = [{'name': filename, 'url': filepath, 'size': os.path.getsize(filepath)}]
    send_response(response_params, response_files, url)

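# query_type 1 handler: for each requested object id, downloads the object's
# attached files (property 'c1000') and returns them in a zip archive as a
# query_type 1001 response.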
def get_objects(params, files, url):
    date = datetime.datetime.now()
    rxmls = RequestXmlService()
    req = ET.fromstring(params['query_data'])
    req_id = rxmls.get_request_uuid(req)
    res_id = uuid4().hex
    res = rxmls.get_request_document(res_id, req_id)
    rxmls.set_result(res, 0, '')
    objs = req.find('objects')
    uids = [restore_uuid(x.get('object_id')) for x in objs.findall('object')]

    rxmls.set_result(res, 0, '')
    response_params = {
        'from': params['to'],
        'to': params['from'],
        'ts_added': date.timestamp(),
        'user_id': '1',
        'user_id_to': params['user_id'],
        'query_type': 1001,
        'query_data': ET.tostring(res, encoding='unicode', xml_declaration=True)
    }
    filename = uuid4().hex
    filepath = os.path.join(gettempdir(), filename)
    main_filename = None
    with zipfile.ZipFile(filepath, 'w') as zipf:
        for uid in uids:
            obj = json.loads(get_object(uid))
            for file in obj['properties'].get('c1000', []):
                # The first file name encountered becomes the folder prefix for
                # every entry in the archive.
                if not main_filename:
                    main_filename = file['fileName']
                resp = requests.get(config.default.REQUEST_ITV.gql.download,
                                    params={'item_id': file['key']})
                zipf.writestr(f'{main_filename}/{file["fileName"]}', resp.content)
    response_files = [{'name': filename, 'url': filepath, 'size': os.path.getsize(filepath)}]
    send_response(response_params, response_files, url)

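# query_type 24 handler: collects metadata for the requested chart ids into a
# getMetadataResponse XML document and returns it zipped as metadata.xml in a
# query_type 1024 response.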
def get_metadata(params, files, url):
    date = datetime.datetime.now()
    rxmls = RequestXmlService()
    req = ET.fromstring(params['query_data'])
    req_id = rxmls.get_request_uuid(req)
    res_id = uuid4().hex
    res = rxmls.get_request_document(res_id, req_id)
    rxmls.set_result(res, 0, '')
    objs = req.find('getMetadataByIds')
    uids = [restore_uuid(x.get('id')) for x in objs.findall('chart')]

    rxmls.set_result(res, 0, '')
    response_params = {
        'from': params['to'],
        'to': params['from'],
        'ts_added': date.timestamp(),
        'user_id': '1',
        'user_id_to': params['user_id'],
        'query_type': 1024,
        'query_data': ET.tostring(res, encoding='unicode', xml_declaration=True)
    }
    filename = uuid4().hex
    filepath = os.path.join(gettempdir(), filename)
    content = ET.Element('getMetadataResponse')

    for uid in uids:
        obj = json.loads(get_object(uid))
        updated = datetime.datetime.fromisoformat(obj['date_updated'])
        chart = ET.SubElement(content, 'chart', {
            'id': UUID(obj['uid']).hex,
            'updated': str(updated.timestamp()),
        })
        # Only the 'c...' property codes are exported as metadata items.
        for key in obj['properties']:
            if not key.startswith('c'):
                continue
            ET.SubElement(chart, 'mdItem', {
                'code': key.replace('_', '.'),
                'name': key,
                'value': str(obj['properties'].get(key, '')),
                'isBase': 'false',
                'groupId': '',
                'groupName': '',
            })
    with zipfile.ZipFile(filepath, 'w') as zipf:
        zipf.writestr('metadata.xml', ET.tostring(content, encoding='unicode', xml_declaration=True))
    response_files = [{'name': filename, 'url': filepath, 'size': os.path.getsize(filepath)}]
    send_response(response_params, response_files, url)

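# query_type 6 handler: creates a feature of the requested class in the OODB
# workspace, copies the chart attributes onto it, extracts the attached zip and
# uploads every contained file to S3 as 'c1000' file attributes.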
def put_object(params, files, url):
    date = datetime.datetime.now()
    req = ET.fromstring(params['query_data'])
    obj = req.find('chart')
    class_id = obj.get('Class')
    # NOTE: OOConnectionParams, OODBWorkspace, Polygon, Envelope, SRFactory,
    # FileValue, variantFromString, variantToString and variantFromFileValue are
    # used below but are not imported in this module; they presumably come from
    # the project's OODB / geometry bindings.
    con = OOConnectionParams(config.default.REQUEST_ITV.oodb.schema, config.default.REQUEST_ITV.oodb.host,
                             config.default.REQUEST_ITV.oodb.port, config.default.REQUEST_ITV.oodb.dbname,
                             config.default.REQUEST_ITV.oodb.username, config.default.REQUEST_ITV.oodb.passwd,
                             config.default.REQUEST_ITV.oodb.schema)
    ws = OODBWorkspace.ws(config.default.REQUEST_ITV.oodb.schema)
    if not ws.isInit():
        res = ws.init(con)
        logger.warning(res)
    logger.info(class_id)
    fc = ws.featureClass(class_id)
    feature = fc.createFeature('')
    geom = Polygon.fromExtent(Envelope(0.0, 0.0, 1.0, 1.0, SRFactory.PZ9011()))
    res = feature.setGeometry(geom)
    for attr in obj.findall('Attribute'):
        name = attr.get('name')
        value = attr.get('value')
        res &= feature.addAttribute(name, variantFromString(value))

    # The request is expected to carry exactly one attached archive, already
    # downloaded to a local temp path by pika_callback.
    assert len(files) == 1
    file = files[0]
    tmp_dir = TemporaryDirectory()
    with zipfile.ZipFile(file['url'], 'r') as zip_ref:
        zip_ref.extractall(tmp_dir.name)

    fp = pathlib.Path(tmp_dir.name)
    for item in fp.glob('**/*'):
        if not item.is_file():
            continue
        # Each extracted file is uploaded to S3 and attached to the feature as
        # a 'c1000' file attribute.
        fileVal = FileValue()
        key = uuid4().hex
        fileVal.fileName = variantToString(item.relative_to(tmp_dir.name))
        fileVal.key = variantToString(key)
        fileVal.bucket = variantToString(config.default.REQUEST_ITV.s3.bucket)
        res &= feature.addAttribute('c1000', variantFromFileValue(fileVal))
        upload_file(str(item), key, config.default.REQUEST_ITV.s3.bucket)

    ws.transaction()
    res = ws.save()
    ws.commit(f'Putobject from {params["to"]}')
    ws.clearData(True)

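# Dispatches an incoming request to its handler based on query_type.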
def run_task(query_type, params, files, url):
    if query_type == 4:
        load_catalog(params, files, url)
    elif query_type == 1:
        get_objects(params, files, url)
    elif query_type == 24:
        get_metadata(params, files, url)
    elif query_type == 6:
        put_object(params, files, url)

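# Entry point: connects to RabbitMQ, opens the publishing channel and starts
# the blocking consume loop.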
def main():
    global connection
    global server
    global channel_send

    connection = pika.BlockingConnection(pika.URLParameters(config.default.REQUEST_ITV.amqp.conn))
    channel_send = connection.channel()

    logger.setLevel(logging.INFO)
    logger.warning('Use Control-C to exit')

    pika_task()

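# Key function returning an item's version(); not referenced elsewhere in this module.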
def vers_key(e):
    return e.version()

if __name__ == '__main__':
    main()