Files
xmlrpcserver-dev/xmlrpcserver/__main__.py
2024-04-30 19:59:25 +03:00

340 lines
10 KiB
Python

from tempfile import TemporaryDirectory, gettempdir
from typing import Optional, Any
import pika
import threading
import time
from queue import Queue
import datetime
import json
from uuid import uuid4, UUID
from xmlrpc.server import SimpleXMLRPCServer
from xmlrpc.client import ServerProxy
import logging
import io
import zlib
import os.path
import requests
from pika.channel import Channel
from .reqs_graphql import get_catalog, get_object
from pygost import gost34112012256
import xml.etree.ElementTree as ET
from .reqs.request_xml_service import RequestXmlService
import zipfile
from .config import Config
from .zip import Zip
import boto3
import xmlrpcserver.db as db
from sqlalchemy.orm import Session
from fastapi import FastAPI, Response, Form, UploadFile, File, Request
from fastapi.middleware.cors import CORSMiddleware
import uvicorn
from typing_extensions import Annotated, Union
import pathlib
from shutil import make_archive
# Numeric request/response type codes. NEW_COMMIT_REQUEST is sent as the
# 'query_type' of the request built in correction_replication(); the other
# codes are not referenced in the visible part of this file.
# NOTE(review): the protocol meaning of each code is defined elsewhere - confirm.
NEW_REPLICATION_REQUEST = 99
NEW_COMMIT_REQUEST = 98
NEW_COMMIT_RESPONSE = 1098
NEW_DATA_REQUEST = 97
NEW_DATA_RESPONSE = 1097
# Names added by bnd_connected() and removed by bnd_disconnected().
connected = set()
# XML-RPC server instance; assigned in xmlrpc_task().
server: Optional[SimpleXMLRPCServer] = None
# Module-wide logger.
logger = logging.getLogger('xmlrpcserver')
# RabbitMQ connection; assigned in main().
connection: Optional[pika.BlockingConnection] = None
# Channel used by accept() for publishing; assigned in main().
channel_send: Optional[Any] = None
# Channel used for consuming; assigned in pika_task().
channel_receive: Optional[Any] = None
def s3_connection():
    """Return a boto3 S3 client built from the endpoint and keys in ``Config``."""
    client_options = {
        'endpoint_url': Config.s3_endpoint,
        'aws_access_key_id': Config.s3_key_id,
        'aws_secret_access_key': Config.s3_access_key,
    }
    return boto3.client('s3', **client_options)
def download_file(key: str, bucket: str, filename: str):
    """Download object ``key`` from ``bucket`` into the local file ``filename``.

    The object body is streamed to disk in 4096-byte chunks.
    """
    s3_object = s3_connection().get_object(Bucket=bucket, Key=key)
    with open(f"{filename}", 'wb') as target:
        target.writelines(s3_object['Body'].iter_chunks(chunk_size=4096))
def upload_file(filename: str, key: str, bucket: str):
    """Upload the local file ``filename`` to ``bucket`` under ``key``.

    The open binary file handle is passed as the request body, so the file
    does not have to be read into memory in one piece before it is sent.
    """
    client = s3_connection()
    with open(filename, 'rb') as source:
        # put_object accepts bytes or a seekable file-like object as Body.
        client.put_object(Body=source, Bucket=bucket, Key=key)
def pika_callback(ch, method, properties, body):
    """Handle one message taken from the incoming RabbitMQ queue.

    The JSON body must contain ``params`` and ``url``. Every entry of
    ``params['files']`` (a dict with ``name`` and ``bucket``) is downloaded
    from S3 into a uniquely named file in the system temp directory. The
    params and the local paths are then passed to ``send`` on the XML-RPC
    endpoint at ``url``, and the message is acknowledged.
    """
    message = json.loads(body)
    params, url = message['params'], message['url']
    local_paths = []
    for entry in params['files']:
        target = os.path.join(gettempdir(), uuid4().hex)
        download_file(entry['name'], entry['bucket'], target)
        local_paths.append(target)
    # NOTE(review): the downloaded temp files are not removed here; presumably
    # the receiving side consumes them by path - confirm who cleans them up.
    ServerProxy(url).send(params, local_paths, Config.ret_path)
    # Acknowledge only after the forward call returned without raising.
    ch.basic_ack(delivery_tag=method.delivery_tag)
def pika_task():
    """Open the consuming channel and block, dispatching to ``pika_callback``.

    The channel is created on the module-level ``connection`` and stored in
    the module-level ``channel_receive``.
    """
    global channel_receive
    consumer = connection.channel()
    channel_receive = consumer
    consumer.basic_consume(queue=Config.rabbit_incoming_queue, on_message_callback=pika_callback)
    consumer.start_consuming()
def aud_add(message):
    """Log ``message`` at WARNING level and return ``'OK'``.

    A list is logged item by item; any other value is logged as a single record.
    """
    entries = message if isinstance(message, list) else [message]
    for entry in entries:
        logger.warning(entry)
    return 'OK'
def auth_response(challenge, server_id, is_server):
    """Compute the authentication response for ``server_id``.

    Looks up the user whose ``username`` equals ``server_id``, concatenates
    ``challenge``, ``server_id`` and the stored password, and hashes the
    byte-reversed UTF-8 encoding with GOST 34.11-2012 (256 bit). The digest is
    byte-reversed again and hex-encoded.

    Args:
        challenge: challenge value to answer.
        server_id: username to look up.
        is_server: accepted for interface compatibility; not used here.

    Returns:
        ``{'error': False, 'response': <hex digest>}`` on success, otherwise
        ``{'error': True, 'response': 'Wrong user/bnd'}``.
    """
    conn = db.connect_db()
    with Session(conn) as session:
        try:
            user = session.query(db.User).filter(db.User.username == server_id).one()
            msg = '%s%s%s' % (challenge, server_id, user.passwd)
            response = gost34112012256.new(msg.encode('utf-8')[::-1]).digest()[::-1].hex()
            session.commit()
        except Exception:
            # Callers still get the generic error answer, but the real cause
            # (unknown user, database failure, ...) is no longer lost.
            logger.exception('auth_response failed for %s', server_id)
            return {'error': True, 'response': 'Wrong user/bnd'}
        return {'error': False, 'response': response}
def auth_challenge():
    """Return a fresh challenge: the 32-character hex form of a random UUID4."""
    logging.debug('get challenge')
    challenge = uuid4().hex
    return challenge
def put_object(params, files, url):
    """Create a feature from an XML description and attach an archive's files.

    ``params['query_data']`` is parsed as XML. Its ``chart`` element supplies
    the feature class (``Class`` attribute) and the feature attributes
    (``Attribute`` children with ``name``/``value``). The single entry of
    ``files`` is opened as a zip archive; every regular file inside is
    uploaded to ``Config.s3_bucket`` under a random key and recorded on the
    feature as a ``c1000`` file attribute. The workspace is then saved and
    committed.

    Args:
        params: request dict; ``query_data`` and ``to`` are read.
        files: sequence with exactly one zip archive (anything
            ``zipfile.ZipFile`` can open).
        url: not used by this function.

    Raises:
        ValueError: if ``files`` does not hold exactly one entry or the XML
            has no ``chart`` element.
    """
    # Validate up front (and not with assert, which is stripped under -O) so
    # nothing is touched in the workspace when the input is unusable.
    if len(files) != 1:
        raise ValueError(f'put_object expects exactly one file, got {len(files)}')
    req = ET.fromstring(params['query_data'])
    obj = req.find('chart')
    if obj is None:
        raise ValueError('query_data does not contain a <chart> element')
    class_id = obj.get('Class')
    con = OOConnectionParams(Config.oodb_schema, Config.oodb_host, Config.oodb_port, Config.oodb_dbname,
                             Config.oodb_username, Config.oodb_passwd, Config.oodb_schema)
    ws = OODBWorkspace.ws(Config.oodb_schema)
    if not ws.isInit():
        res = ws.init(con)
        logger.warning(res)
    logging.info(class_id)
    fc = ws.featureClass(class_id)
    feature = fc.createFeature('')
    geom = Polygon.fromExtent(Envelope(0.0, 0.0, 1.0, 1.0, SRFactory.PZ9011()))
    # NOTE(review): the accumulated result is never checked - confirm whether
    # a failed setGeometry/addAttribute should abort the request.
    res = feature.setGeometry(geom)
    for attr in obj.findall('Attribute'):
        name = attr.get('name')
        value = attr.get('value')
        res &= feature.addAttribute(name, variantFromString(value))
    # The context manager removes the extracted files even if an upload fails.
    with TemporaryDirectory() as extract_dir:
        with zipfile.ZipFile(files[0], 'r') as zip_ref:
            zip_ref.extractall(extract_dir)
        for item in pathlib.Path(extract_dir).glob('**/*'):
            if not item.is_file():
                continue
            file_val = FileValue()
            key = uuid4().hex
            file_val.fileName = variantToString(item.relative_to(extract_dir))
            file_val.key = variantToString(key)
            file_val.bucket = variantToString(Config.s3_bucket)
            res &= feature.addAttribute('c1000', variantFromFileValue(file_val))
            upload_file(str(item), key, Config.s3_bucket)
    ws.transaction()
    ws.save()
    ws.commit(f'Putobject from {params["to"]}')
    ws.clearData(True)
def accept(params, files, url):
    """Upload the request's files to S3 and publish the request to RabbitMQ.

    Every entry of ``params['files']`` is uploaded to ``Config.s3_bucket``
    under a random key. A JSON message holding ``params``, the list of
    uploaded ``{'name', 'bucket'}`` descriptors and ``url`` is then published
    to ``Config.rabbit_send_exchange``.

    Args:
        params: request dict; ``files`` is read as a list of local paths.
        files: not used by this function.
            NOTE(review): the loop reads ``params['files']`` instead - confirm.
        url: stored in the published message.

    Returns:
        ``True`` once the message has been published.
    """
    files_s3 = []
    for file in params['files']:
        fn = uuid4().hex
        upload_file(file, fn, Config.s3_bucket)
        files_s3.append({'name': fn, 'bucket': Config.s3_bucket})
    data = {
        'params': params,
        'files': files_s3,
        'url': url,
    }
    # routing_key is a required argument of pika's basic_publish; omitting it
    # raised TypeError. No routing key is configured in this file, so the
    # empty key is used.
    # NOTE(review): confirm this matches the exchange type/bindings.
    channel_send.basic_publish(exchange=Config.rabbit_send_exchange,
                               routing_key='',
                               body=json.dumps(data))
    logger.warning('Accept: ' + json.dumps(params, ensure_ascii=False))
    return True
def onSent(params, files, callback_url):
    """XML-RPC hook: log the marker at DEBUG and ``params`` at WARNING.

    ``files`` and ``callback_url`` are accepted but not used.
    """
    logger.debug('OnSent')
    logger.warning(params)
def onDelivered(params, files, callback_url):
    """XML-RPC hook: log a marker at WARNING; none of the arguments is used."""
    logger.warning('onDelivered')
def bnd_connected(bnd_name: str):
    """Log that ``bnd_name`` connected and add it to the ``connected`` set."""
    logger.warning(f'{bnd_name} connected')
    connected.add(bnd_name)
def bnd_disconnected(bnd_name: str):
    """Log that ``bnd_name`` disconnected and drop it from ``connected``.

    A name that is not in the set is ignored.
    """
    logger.warning(f'{bnd_name} disconnected')
    connected.discard(bnd_name)
def xmlrpc_task():
    """Create the XML-RPC server on 0.0.0.0:9000 and serve until stopped.

    The instance is stored in the module-level ``server``; each handler is
    registered under its own function name.
    """
    global server
    server = SimpleXMLRPCServer(('0.0.0.0', 9000), logRequests=True, allow_none=True)
    exposed = (
        aud_add,
        auth_response,
        auth_challenge,
        accept,
        onSent,
        onDelivered,
        bnd_connected,
        bnd_disconnected,
    )
    for handler in exposed:
        server.register_function(handler)
    server.serve_forever()
# HTTP application. CORS is limited to one origin; credentials, all methods
# and all headers are allowed for it.
app = FastAPI()
app.add_middleware(
    CORSMiddleware,
    allow_origins=['http://10.10.8.24:3000'],
    allow_credentials=True,
    allow_methods=['*'],
    allow_headers=['*'],
)
@app.get("/")
async def fa_root():
    """Return a fixed greeting payload."""
    greeting = {"message": "Hello World"}
    return greeting
@app.get("/login")
async def fa_login_get(response: Response):
    """Set fixed ``sessionid`` and ``csrftoken`` cookies and report success.

    NOTE(review): the cookie values are hard-coded constants and no
    credentials are checked - looks like a stub; confirm before exposing.
    """
    fixed_cookies = (('sessionid', '87654321'), ('csrftoken', '12345678'))
    for cookie_name, cookie_value in fixed_cookies:
        response.set_cookie(key=cookie_name, value=cookie_value)
    return {"login": "Ok", "success": True}
@app.post("/login")
async def fa_login(response: Response):
    """Log the login attempt, set fixed session cookies and report success.

    NOTE(review): the cookie values are hard-coded constants and no
    credentials are checked - looks like a stub; confirm before exposing.
    """
    logger.warning('login action')
    fixed_cookies = (('sessionid', '87654321'), ('csrftoken', '12345678'))
    for cookie_name, cookie_value in fixed_cookies:
        response.set_cookie(key=cookie_name, value=cookie_value)
    return {"login": "Ok", "success": True}
@app.post('/webapi')
async def fa_webapi(request: Request):
    """Answer any posted body with a canned XML-RPC success response.

    If the UTF-8 decoded body contains ``has_perm``, the response is a struct
    with ``success = 1``; otherwise it is a bare boolean ``1``.
    """
    raw_body = await request.body()
    logger.warning('webapi')
    if 'has_perm' in raw_body.decode('utf-8'):
        payload = '<?xml version="1.0"?><methodResponse><params><param><value><struct><member><name>success</name><value><boolean>1</boolean></value></member></struct></value></param></params></methodResponse>'
    else:
        payload = '<?xml version="1.0"?><methodResponse><params><param><value><boolean>1</boolean></value></param></params></methodResponse>'
    return Response(content=payload, media_type='application/xml')
@app.post("/api/easo/PutObject")
async def fa_put_object(response: Response,
                        object_attrs: Annotated[str, Form()],
                        object_file: Annotated[UploadFile, File()]
                        ):
    """Accept an object description plus a zip archive and store the object.

    ``object_attrs`` is passed on as ``query_data``. The uploaded archive is
    written to a temporary file whose path is handed to ``put_object``:
    ``put_object`` opens ``files[0]`` with ``zipfile.ZipFile``, so it needs a
    path or file object, not the metadata dict that was passed before.

    Returns:
        ``{"Upload": "Ok"}`` once ``put_object`` has returned.
    """
    request_params = {
        'from': '',
        'to': '',
        'ts_added': datetime.datetime.now().timestamp(),
        'user_id': '0',
        'user_id_to': '0',
        'query_type': 6,
        'query_data': object_attrs,
    }
    # The directory, and the archive copy in it, is removed on exit.
    with TemporaryDirectory() as upload_dir:
        archive_path = os.path.join(upload_dir, 'upload.zip')
        with open(archive_path, 'wb') as archive:
            # Copy in bounded chunks so large uploads are not held in memory.
            while True:
                chunk = await object_file.read(1024 * 1024)
                if not chunk:
                    break
                archive.write(chunk)
        put_object(request_params, [archive_path], None)
    return {"Upload": "Ok"}
@app.get("/cr")
async def correction_replication(bnd_name: str, schema: str):
    """Send a NEW_COMMIT_REQUEST for ``schema`` to ``bnd_name``.

    Builds a request document with a ``currentCommit`` element carrying the
    scheme name and passes it to ``send`` on the XML-RPC endpoint
    ``Config.enserver``. A failure to send is logged and not propagated.

    Args:
        bnd_name: destination name, used as ``tcp://<bnd_name>``.
        schema: value of the ``scheme`` attribute of ``currentCommit``.
    """
    date = datetime.datetime.now()
    rxmls = RequestXmlService()
    res_id = uuid4().hex
    res = rxmls.get_request_document(res_id, None)
    ET.SubElement(res, 'currentCommit', {'scheme': schema})
    params = {
        'from': f'tcp://{Config.self_bnd}',
        'to': f'tcp://{bnd_name}',
        'ts_added': date.timestamp(),
        'user_id': '0',
        'query_type': NEW_COMMIT_REQUEST,
        'query_data': ET.tostring(res, encoding='unicode', xml_declaration=True),
    }
    proxy = ServerProxy(Config.enserver)
    try:
        proxy.send(params, [], Config.ret_path)
    except Exception:
        # Still best-effort, but the cause is logged, and KeyboardInterrupt
        # and SystemExit are no longer swallowed.
        logger.exception('Error sending')
def main():
    """Start the RabbitMQ consumer, the XML-RPC server and the HTTP API.

    Opens the RabbitMQ connection and the publishing channel, starts
    ``xmlrpc_task`` and ``pika_task`` in background threads, then runs the
    FastAPI app with uvicorn on 0.0.0.0:8000 in the calling thread. When
    uvicorn returns, the XML-RPC server is shut down and closed.
    """
    global connection
    global channel_send
    logger.setLevel(logging.INFO)
    logger.warning('Use Control-C to exit')
    connection = pika.BlockingConnection(pika.URLParameters(Config.rabbit_conn))
    channel_send = connection.channel()
    # Daemon threads: the blocking consumer and XML-RPC loops must not keep
    # the process alive after the HTTP server has stopped.
    xmlrpc_thread = threading.Thread(target=xmlrpc_task, daemon=True)
    xmlrpc_thread.start()
    pika_thread = threading.Thread(target=pika_task, daemon=True)
    pika_thread.start()
    try:
        logger.warning('Start server')
        uvicorn.run(app, host="0.0.0.0", port=8000)
    except KeyboardInterrupt:
        logger.warning('Exiting')
    finally:
        # server is assigned by the XML-RPC thread and stays None if that
        # thread failed before (or while) creating it.
        if server is not None:
            # shutdown() stops serve_forever(); server_close() alone only
            # closes the socket underneath the still-running loop.
            server.shutdown()
            server.server_close()
def vers_key(e):
    """Sort-key helper: return the result of calling ``e.version()``."""
    version = e.version()
    return version
# Run the service only when the module is executed as a script.
if __name__ == '__main__':
    main()