Files
request-itv/request_itv/__main__.py
2024-05-01 09:59:22 +03:00

306 lines
9.6 KiB
Python

from json import JSONDecodeError
from tempfile import TemporaryDirectory, gettempdir
from typing import Optional, Any
import pika
import threading
import time
from queue import Queue
import datetime
import json
from uuid import uuid4, UUID
from xmlrpc.server import SimpleXMLRPCServer
from xmlrpc.client import ServerProxy
import logging
import io
import zlib
import os.path
import requests
from .reqs_graphql import get_catalog, get_object
import xml.etree.ElementTree as ET
from .reqs.request_xml_service import RequestXmlService
import zipfile
from .config import Config
from .zip import Zip
import boto3
import request_itv.db as db
from sqlalchemy.orm import Session
from fastapi import FastAPI, Response, Form, UploadFile, File, Request
from fastapi.middleware.cors import CORSMiddleware
import uvicorn
from typing_extensions import Annotated
import pathlib
from shutil import make_archive
# Task queue; no function in the visible part of this file reads or writes it.
tasks = Queue()
# RabbitMQ connection; assigned in main() and used by pika_task() to open
# the consuming channel.
connection: Optional[pika.BlockingConnection] = None
# Channel used by send_response() to publish replies; assigned in main().
channel_send: Optional[Any] = None
# Channel used for consuming incoming requests; assigned in pika_task().
channel_receive: Optional[Any] = None
# Declared ``global`` in main() but never assigned in the visible code.
server: Optional[SimpleXMLRPCServer] = None
# Named logger; note that some functions below log through the root
# ``logging`` module instead of this logger.
logger = logging.getLogger('xmlrpcserver')
def s3_connection():
    """Create a boto3 S3 client from the settings held in ``Config``.

    The endpoint URL and both credential values are read from ``Config`` on
    every call; a new client is built each time (nothing is cached here).

    Returns:
        The client object returned by ``boto3.client('s3', ...)``.
    """
    credentials = {
        'aws_access_key_id': Config.s3_key_id,
        'aws_secret_access_key': Config.s3_access_key,
    }
    return boto3.client('s3', endpoint_url=Config.s3_endpoint, **credentials)
def download_file(key: str, bucket: str, filename: str):
    """Download one S3 object to a local file.

    Args:
        key: Object key inside ``bucket``.
        bucket: Name of the bucket to read from.
        filename: Local path to write; it is opened in ``'wb'`` mode, so an
            existing file is overwritten.
    """
    response = s3_connection().get_object(Bucket=bucket, Key=key)
    with open(f"{filename}", 'wb') as target:
        # Stream the body in 4096-byte chunks instead of reading it at once.
        target.writelines(response['Body'].iter_chunks(chunk_size=4096))
def upload_file(filename: str, key: str, bucket: str):
    """Upload a local file to S3 under the given key.

    Args:
        filename: Local path of the file to send.
        key: Object key to store the content under.
        bucket: Name of the destination bucket.
    """
    s3 = s3_connection()
    with open(filename, 'rb') as source:
        # NOTE(review): the whole file is read into memory before sending.
        payload = source.read()
        s3.put_object(Bucket=bucket, Key=key, Body=payload)
def restore_uuid(oid):
    """Return ``oid`` normalised to the hyphenated lowercase UUID string form.

    Args:
        oid: Any textual UUID representation accepted by ``uuid.UUID``
            (for example 32 bare hex digits).

    Raises:
        ValueError: If ``oid`` is not a valid UUID string.
    """
    return str(UUID(oid))
def pika_callback(ch, method, properties, body):
    """Handle one incoming RabbitMQ message.

    The body is parsed as JSON with the keys ``params``, ``url`` and
    ``files``. Every entry of ``files`` carries ``url.name`` / ``url.bucket``;
    the referenced object is downloaded to a uniquely named file in the
    system temp directory and the entry's ``url`` is replaced by that local
    path before the request is dispatched through ``run_task``.

    The message is always acknowledged, whether or not processing succeeded.
    Only a JSON decoding failure is swallowed (and logged); any other
    exception propagates to the caller after the ack.

    Args:
        ch: Channel the message arrived on; used to send the ack.
        method: Delivery metadata; only ``delivery_tag`` is read.
        properties: Message properties (unused).
        body: Raw message payload.
    """
    downloaded = []
    try:
        data = json.loads(body)
        params = data['params']
        url = data['url']
        files = []
        for file in data['files']:
            fn = os.path.join(gettempdir(), uuid4().hex)
            # Register the path first so a partially written file is
            # removed as well if the download fails midway.
            downloaded.append(fn)
            download_file(file['url']['name'], file['url']['bucket'], fn)
            file['url'] = fn
            files.append(file)
        run_task(params['query_type'], params, files, url)
    except JSONDecodeError as e:
        logging.warning(e)
    finally:
        try:
            # run_task is synchronous, so the downloaded inputs are no longer
            # needed here; without this the temp dir grows with every message.
            for fn in downloaded:
                try:
                    if os.path.exists(fn):
                        os.remove(fn)
                except OSError as e:
                    logger.warning('Cannot remove temporary file %s: %s', fn, e)
        finally:
            # The ack must be sent no matter what happened above.
            ch.basic_ack(delivery_tag=method.delivery_tag)
def send_response(params, files, url):
    """Upload the response files to S3 and publish the reply message.

    Each entry of ``files`` has its local path (``entry['url']``) uploaded to
    ``Config.s3_bucket`` under a fresh random key; the entry is then mutated
    in place so that ``url`` becomes ``{'name': <key>, 'bucket': <bucket>}``.
    The resulting JSON document (``params``, ``files``, ``url``) is published
    on ``channel_send`` to ``Config.rabbit_out_exchange`` with an empty
    routing key.

    Args:
        params: Response parameters, passed through unchanged.
        files: List of file descriptor dicts whose ``url`` is a local path.
        url: Value passed through unchanged in the ``url`` field.
    """
    uploaded = []
    for entry in files:
        object_key = uuid4().hex
        upload_file(entry['url'], object_key, Config.s3_bucket)
        entry['url'] = {'name': object_key, 'bucket': Config.s3_bucket}
        uploaded.append(entry)
    message = {'params': params, 'files': uploaded, 'url': url}
    channel_send.basic_publish(
        exchange=Config.rabbit_out_exchange,
        body=json.dumps(message),
        routing_key='',
    )
def pika_task():
    """Open the receiving channel and consume incoming requests.

    A channel is created on the module-level ``connection`` and stored in the
    global ``channel_receive``; ``pika_callback`` is registered for
    ``Config.rabbit_incoming_queue`` and the call then stays inside
    ``start_consuming()``.
    """
    global channel_receive
    consumer = connection.channel()
    channel_receive = consumer
    consumer.basic_consume(
        queue=Config.rabbit_incoming_queue,
        on_message_callback=pika_callback,
    )
    consumer.start_consuming()
def load_catalog(params, files, url):
    """Answer a catalog request with a zip archive holding the catalog.

    Builds a response document for the request found in
    ``params['query_data']``, fetches the catalog with ``get_catalog()``,
    stores it as ``WF.CLL`` inside a zip archive and hands both to
    ``send_response`` with ``query_type`` 1004.

    Args:
        params: Request parameters; ``query_data``, ``from``, ``to`` and
            ``user_id`` are read.
        files: Request files (unused).
        url: Passed through to ``send_response``.
    """
    logger.warning('load_catalog')
    date = datetime.datetime.now()
    rxmls = RequestXmlService()
    req = ET.fromstring(params['query_data'])
    req_id = rxmls.get_request_uuid(req)
    res_id = uuid4().hex
    res = rxmls.get_request_document(res_id, req_id)
    rxmls.set_result(res, 0, '')
    response_params = {
        # The reply goes back to the sender, so the two addresses are swapped.
        'from': params['to'],
        'to': params['from'],
        'ts_added': date.timestamp(),
        'user_id': '1',
        'user_id_to': params['user_id'],
        'query_type': 1004,
        'query_data': ET.tostring(res, encoding='unicode', xml_declaration=True)
    }
    catalog = get_catalog()
    logging.debug('Catalog_loaded')
    filename = uuid4().hex
    # Use the platform temp dir (as pika_callback does), not a fixed '/tmp/'.
    filepath = os.path.join(gettempdir(), filename)
    try:
        # The context manager closes the archive even if writing fails.
        with zipfile.ZipFile(filepath, "w") as zipf:
            zipf.writestr('WF.CLL', catalog)
        response_files = [{'name': filename, 'url': filepath, 'size': os.path.getsize(filepath)}]
        send_response(response_params, response_files, url)
    finally:
        # send_response uploads synchronously, so the local copy is garbage now.
        if os.path.exists(filepath):
            os.remove(filepath)
def get_objects(params, files, url):
    """Answer an object request with a zip archive of the objects' files.

    The request XML in ``params['query_data']`` lists ``object`` elements
    (attribute ``object_id``) under an ``objects`` element. For every listed
    object the ``c1000`` file entries are downloaded from
    ``Config.gql_download`` and stored in the archive as
    ``<main_filename>/<fileName>``, where ``main_filename`` is the
    ``fileName`` of the very first file encountered across all objects. The
    archive is sent through ``send_response`` with ``query_type`` 1001.

    Args:
        params: Request parameters; ``query_data``, ``from``, ``to`` and
            ``user_id`` are read.
        files: Request files (unused).
        url: Passed through to ``send_response``.
    """
    date = datetime.datetime.now()
    rxmls = RequestXmlService()
    req = ET.fromstring(params['query_data'])
    req_id = rxmls.get_request_uuid(req)
    res_id = uuid4().hex
    res = rxmls.get_request_document(res_id, req_id)
    rxmls.set_result(res, 0, '')
    objs = req.find('objects')
    uids = [restore_uuid(x.get('object_id')) for x in objs.findall('object')]
    # NOTE(review): set_result is called a second time with the same
    # arguments; kept because its effect on ``res`` is not visible here.
    rxmls.set_result(res, 0, '')
    response_params = {
        # The reply goes back to the sender, so the two addresses are swapped.
        'from': params['to'],
        'to': params['from'],
        'ts_added': date.timestamp(),
        'user_id': '1',
        'user_id_to': params['user_id'],
        'query_type': 1001,
        'query_data': ET.tostring(res, encoding='unicode', xml_declaration=True)
    }
    filename = uuid4().hex
    # Use the platform temp dir (as pika_callback does), not a fixed '/tmp/'.
    filepath = os.path.join(gettempdir(), filename)
    try:
        # The context manager closes the archive even if a download fails.
        with zipfile.ZipFile(filepath, "w") as zipf:
            main_filename = None
            for uid in uids:
                obj = json.loads(get_object(uid))
                for file in obj['properties'].get('c1000', []):
                    if not main_filename:
                        main_filename = file['fileName']
                    # NOTE(review): no timeout and no HTTP status check; an
                    # error body would be archived as file content.
                    download = requests.get(Config.gql_download, params={'item_id': file["key"]})
                    zipf.writestr(f'{main_filename}/{file["fileName"]}', download.content)
        response_files = [{'name': filename, 'url': filepath, 'size': os.path.getsize(filepath)}]
        send_response(response_params, response_files, url)
    finally:
        # send_response uploads synchronously, so the local copy is garbage now.
        if os.path.exists(filepath):
            os.remove(filepath)
def get_metadata(params, files, url):
    """Answer a metadata request with a zip archive holding ``metadata.xml``.

    The request XML in ``params['query_data']`` lists ``chart`` elements
    (attribute ``id``) under ``getMetadataByIds``. For each of them the object
    is fetched with ``get_object`` and rendered as a ``chart`` element whose
    ``mdItem`` children are the object's properties with a key starting with
    ``'c'``. The document is archived as ``metadata.xml`` and sent through
    ``send_response`` with ``query_type`` 1024.

    Args:
        params: Request parameters; ``query_data``, ``from``, ``to`` and
            ``user_id`` are read.
        files: Request files (unused).
        url: Passed through to ``send_response``.
    """
    date = datetime.datetime.now()
    rxmls = RequestXmlService()
    req = ET.fromstring(params['query_data'])
    req_id = rxmls.get_request_uuid(req)
    res_id = uuid4().hex
    res = rxmls.get_request_document(res_id, req_id)
    rxmls.set_result(res, 0, '')
    objs = req.find('getMetadataByIds')
    uids = [restore_uuid(x.get('id')) for x in objs.findall('chart')]
    # NOTE(review): set_result is called a second time with the same
    # arguments; kept because its effect on ``res`` is not visible here.
    rxmls.set_result(res, 0, '')
    response_params = {
        # The reply goes back to the sender, so the two addresses are swapped.
        'from': params['to'],
        'to': params['from'],
        'ts_added': date.timestamp(),
        'user_id': '1',
        'user_id_to': params['user_id'],
        'query_type': 1024,
        'query_data': ET.tostring(res, encoding='unicode', xml_declaration=True)
    }
    filename = uuid4().hex
    # Use the platform temp dir (as pika_callback does), not a fixed '/tmp/'.
    filepath = os.path.join(gettempdir(), filename)
    try:
        # The context manager closes the archive even if a lookup fails.
        with zipfile.ZipFile(filepath, "w") as zipf:
            content = ET.Element('getMetadataResponse')
            for uid in uids:
                obj = json.loads(get_object(uid))
                # Separate name: the request timestamp ``date`` is not reused.
                updated = datetime.datetime.fromisoformat(obj['date_updated'])
                chart = ET.SubElement(content, 'chart', {
                    'id': UUID(obj['uid']).hex,
                    'updated': str(updated.timestamp()),
                })
                for key, value in obj['properties'].items():
                    if not key.startswith('c'):
                        continue
                    ET.SubElement(chart, 'mdItem', {
                        'code': key.replace('_', '.'),
                        'name': key,
                        'value': str(value),
                        'isBase': 'false',
                        'groupId': '',
                        'groupName': '',
                    })
            zipf.writestr('metadata.xml', ET.tostring(content, encoding='unicode', xml_declaration=True))
        response_files = [{'name': filename, 'url': filepath, 'size': os.path.getsize(filepath)}]
        send_response(response_params, response_files, url)
    finally:
        # send_response uploads synchronously, so the local copy is garbage now.
        if os.path.exists(filepath):
            os.remove(filepath)
def put_object(params, files, url):
    """Store an uploaded chart as a new feature together with its files.

    Reads the ``chart`` element of the request XML in
    ``params['query_data']``, creates a feature of the class named by its
    ``Class`` attribute, copies every ``Attribute`` child onto the feature,
    unpacks the single uploaded zip archive, uploads each contained file to
    ``Config.s3_bucket`` and attaches it to the feature as a ``c1000``
    attribute. The workspace is then saved and committed.

    NOTE(review): ``OOConnectionParams``, ``OODBWorkspace``, ``Polygon``,
    ``Envelope``, ``SRFactory``, ``FileValue`` and the ``variant*`` helpers
    are not imported anywhere in the visible part of this file - confirm
    where they are expected to come from.

    Args:
        params: Request parameters; ``query_data`` and ``to`` are read.
        files: List with exactly one file descriptor dict whose ``url`` is the
            local path of a zip archive (as prepared by ``pika_callback``).
        url: Unused.

    Raises:
        ValueError: If ``files`` does not contain exactly one entry.
    """
    req = ET.fromstring(params['query_data'])
    obj = req.find('chart')
    class_id = obj.get('Class')
    con = OOConnectionParams(Config.oodb_schema, Config.oodb_host, Config.oodb_port, Config.oodb_dbname,
                             Config.oodb_username, Config.oodb_passwd, Config.oodb_schema)
    ws = OODBWorkspace.ws(Config.oodb_schema)
    if not ws.isInit():
        res = ws.init(con)
        logger.warning(res)
    logging.info(class_id)
    fc = ws.featureClass(class_id)
    feature = fc.createFeature('')
    geom = Polygon.fromExtent(Envelope(0.0, 0.0, 1.0, 1.0, SRFactory.PZ9011()))
    # NOTE(review): ``res`` accumulates the results of the feature calls but
    # is never checked before saving.
    res = feature.setGeometry(geom)
    for attr in obj.findall('Attribute'):
        name = attr.get('name')
        value = attr.get('value')
        res &= feature.addAttribute(name, variantFromString(value))
    # Explicit check instead of ``assert``, which is stripped under ``-O``.
    if len(files) != 1:
        raise ValueError(f'put_object expects exactly one file, got {len(files)}')
    # Entries of ``files`` are dicts; the archive's local path is in 'url'.
    archive_path = files[0]['url']
    # The context manager removes the extraction directory afterwards.
    with TemporaryDirectory() as extract_dir:
        with zipfile.ZipFile(archive_path, 'r') as zip_ref:
            zip_ref.extractall(extract_dir)
        fp = pathlib.Path(extract_dir)
        for item in fp.glob('**/*'):
            if not item.is_file():
                continue
            fileVal = FileValue()
            key = uuid4().hex
            fileVal.fileName = variantToString(item.relative_to(extract_dir))
            fileVal.key = variantToString(key)
            fileVal.bucket = variantToString(Config.s3_bucket)
            res &= feature.addAttribute('c1000', variantFromFileValue(fileVal))
            upload_file(str(item), key, Config.s3_bucket)
    ws.transaction()
    res = ws.save()
    ws.commit(f'Putobject from {params["to"]}')
    ws.clearData(True)
def run_task(query_type, params, files, url):
    """Dispatch a request to the handler registered for its ``query_type``.

    Handled values: 4 -> ``load_catalog``, 1 -> ``get_objects``,
    24 -> ``get_metadata``, 6 -> ``put_object``. Any other value is logged
    and ignored.

    Args:
        query_type: Numeric request type taken from the request parameters.
        params: Request parameters, forwarded to the handler.
        files: Request files, forwarded to the handler.
        url: Forwarded to the handler.
    """
    if query_type == 4:
        load_catalog(params, files, url)
    elif query_type == 1:
        get_objects(params, files, url)
    elif query_type == 24:
        get_metadata(params, files, url)
    elif query_type == 6:
        put_object(params, files, url)
    else:
        # Previously dropped silently, which made lost requests hard to trace.
        logger.warning('Unsupported query_type: %r', query_type)
def main():
    """Connect to RabbitMQ and consume requests until interrupted.

    Opens the connection described by ``Config.rabbit_conn``, creates the
    publishing channel (``channel_send``) and then blocks inside
    ``pika_task()``. Control-C stops the consumer; the connection is closed
    on every exit path.
    """
    global connection
    global channel_send
    connection = pika.BlockingConnection(pika.URLParameters(Config.rabbit_conn))
    try:
        channel_send = connection.channel()
        logger.setLevel(logging.INFO)
        logger.warning('Use Control-C to exit')
        pika_task()
    except KeyboardInterrupt:
        # Control-C is the advertised way to stop: shut down without a traceback.
        if channel_receive is not None and channel_receive.is_open:
            channel_receive.stop_consuming()
    finally:
        # Release the broker connection whether we stop normally or on error.
        if connection.is_open:
            connection.close()
def vers_key(e):
    """Return ``e.version()``; usable as a ``key=`` function for sorting.

    Args:
        e: Any object exposing a zero-argument ``version()`` method.
    """
    version = e.version()
    return version
# Start the consumer only when this module is executed as the entry point,
# not when it is imported.
if __name__ == '__main__':
    main()