.
This commit is contained in:
310
xmlrpcserver/__main__.py
Normal file
310
xmlrpcserver/__main__.py
Normal file
@@ -0,0 +1,310 @@
|
||||
from tempfile import TemporaryDirectory
|
||||
from typing import Optional, Any
|
||||
|
||||
import pika
|
||||
import threading
|
||||
import time
|
||||
from queue import Queue
|
||||
import datetime
|
||||
import json
|
||||
from uuid import uuid4, UUID
|
||||
from xmlrpc.server import SimpleXMLRPCServer
|
||||
from xmlrpc.client import ServerProxy
|
||||
import logging
|
||||
import io
|
||||
import zlib
|
||||
import os.path
|
||||
import requests
|
||||
from .reqs_graphql import get_catalog, get_object
|
||||
from pygost import gost34112012256
|
||||
import xml.etree.ElementTree as ET
|
||||
from .reqs.request_xml_service import RequestXmlService
|
||||
import zipfile
|
||||
from .config import Config
|
||||
from .zip import Zip
|
||||
import boto3
|
||||
import xmlrpcserver.db as db
|
||||
from sqlalchemy.orm import Session
|
||||
from fastapi import FastAPI, Response, Form, UploadFile, File, Request
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
import uvicorn
|
||||
from typing_extensions import Annotated
|
||||
import pathlib
|
||||
from shutil import make_archive
|
||||
|
||||
NEW_REPLICATION_REQUEST = 99
|
||||
NEW_COMMIT_REQUEST = 98
|
||||
NEW_COMMIT_RESPONSE = 1098
|
||||
NEW_DATA_REQUEST = 97
|
||||
NEW_DATA_RESPONSE = 1097
|
||||
|
||||
connected = set()
|
||||
|
||||
server: Optional[SimpleXMLRPCServer] = None
|
||||
|
||||
logger = logging.getLogger('xmlrpcserver')
|
||||
|
||||
|
||||
def s3_connection():
|
||||
return boto3.client('s3', endpoint_url=Config.s3_endpoint,
|
||||
aws_access_key_id=Config.s3_key_id,
|
||||
aws_secret_access_key=Config.s3_access_key)
|
||||
|
||||
|
||||
def download_file(key: str, bucket: str, filename: str):
|
||||
client = s3_connection()
|
||||
obj = client.get_object(Bucket=bucket, Key=key)
|
||||
with open(f"{filename}", 'wb') as f:
|
||||
for chunk in obj['Body'].iter_chunks(chunk_size=4096):
|
||||
f.write(chunk)
|
||||
|
||||
|
||||
def upload_file(filename: str, key: str, bucket: str):
|
||||
client = s3_connection()
|
||||
with open(filename, 'rb') as f:
|
||||
client.put_object(Body=f.read(), Bucket=bucket, Key=key)
|
||||
|
||||
|
||||
def get_branch(bndname: str, scheme: str):
|
||||
conn = db.connect_db()
|
||||
with Session(conn) as session:
|
||||
item = session.query(db.IncomeBranch).filter_by(scheme=scheme).join(
|
||||
db.User).filter_by(bndname=bndname).one_or_none()
|
||||
if item:
|
||||
return item.branch, item.local_scheme
|
||||
return None, None
|
||||
|
||||
|
||||
def list_contents(dir_name):
|
||||
logger.warning('list_contents(%s)', dir_name)
|
||||
return os.listdir(dir_name)
|
||||
|
||||
|
||||
def aud_add(message):
|
||||
if not isinstance(message, list):
|
||||
logger.warning(message)
|
||||
return 'OK'
|
||||
for item in message:
|
||||
logger.warning(item)
|
||||
return 'OK'
|
||||
|
||||
|
||||
def auth_response(challenge, server_id, is_server):
|
||||
# logging.debug(f'Challenge: {challenge}, Server: {server_id}, IsServer: {is_server}')
|
||||
conn = db.connect_db()
|
||||
with Session(conn) as session:
|
||||
try:
|
||||
user = session.query(db.User).filter((db.User.username == server_id)).one()
|
||||
passwd = user.passwd
|
||||
msg = '%s%s%s' % (challenge, server_id, passwd)
|
||||
response = gost34112012256.new(msg.encode('utf-8')[::-1]).digest()[::-1].hex()
|
||||
session.commit()
|
||||
return {'error': False, 'response': response}
|
||||
except Exception:
|
||||
return {'error': True, 'response': 'Wrong user/bnd'}
|
||||
|
||||
|
||||
def auth_challenge():
|
||||
logging.debug('get challenge')
|
||||
return uuid4().hex
|
||||
|
||||
|
||||
def put_object(params, files, url):
|
||||
date = datetime.datetime.now()
|
||||
req = ET.fromstring(params['query_data'])
|
||||
obj = req.find('chart')
|
||||
class_id = obj.get('Class')
|
||||
con = OOConnectionParams(Config.oodb_schema, Config.oodb_host, Config.oodb_port, Config.oodb_dbname,
|
||||
Config.oodb_username, Config.oodb_passwd, Config.oodb_schema)
|
||||
ws = OODBWorkspace.ws(Config.oodb_schema)
|
||||
if not ws.isInit():
|
||||
res = ws.init(con)
|
||||
logger.warning(res)
|
||||
logging.info(class_id)
|
||||
fc = ws.featureClass(class_id)
|
||||
feature = fc.createFeature('')
|
||||
geom = Polygon.fromExtent(Envelope(0.0, 0.0, 1.0, 1.0, SRFactory.PZ9011()))
|
||||
res = feature.setGeometry(geom)
|
||||
for attr in obj.findall('Attribute'):
|
||||
name = attr.get('name')
|
||||
value = attr.get('value')
|
||||
res &= feature.addAttribute(name, variantFromString(value))
|
||||
|
||||
assert len(files) == 1
|
||||
file = files[0]
|
||||
dir = TemporaryDirectory()
|
||||
with zipfile.ZipFile(file, 'r') as zip_ref:
|
||||
zip_ref.extractall(dir.name)
|
||||
|
||||
fp = pathlib.Path(dir.name)
|
||||
for item in fp.glob('**/*'):
|
||||
if not item.is_file():
|
||||
continue
|
||||
fileVal = FileValue()
|
||||
key = uuid4().hex
|
||||
fileVal.fileName = variantToString(item.relative_to(dir.name))
|
||||
fileVal.key = variantToString(key)
|
||||
fileVal.bucket = variantToString(Config.s3_bucket)
|
||||
res &= feature.addAttribute('c1000', variantFromFileValue(fileVal))
|
||||
upload_file(str(item), key, Config.s3_bucket)
|
||||
|
||||
ws.transaction()
|
||||
res = ws.save()
|
||||
ws.commit(f'Putobject from {params["to"]}')
|
||||
ws.clearData(True)
|
||||
|
||||
|
||||
def accept(params, files, url):
|
||||
print(params, files, url)
|
||||
logger.warning('Accept: ' + json.dumps(params, ensure_ascii=False))
|
||||
# TODO: run_task(params['query_type'], params, files, url)
|
||||
return True
|
||||
|
||||
|
||||
def onSent(params, files, callback_url):
|
||||
logger.debug('OnSent')
|
||||
logger.warning(params)
|
||||
|
||||
|
||||
def onDelivered(params, files, callback_url):
|
||||
logger.warning('onDelivered')
|
||||
|
||||
|
||||
def bnd_connected(bnd_name: str):
|
||||
logger.warning(f'{bnd_name} connected')
|
||||
connected.add(bnd_name)
|
||||
|
||||
|
||||
def bnd_disconnected(bnd_name: str):
|
||||
logger.warning(f'{bnd_name} disconnected')
|
||||
if bnd_name in connected:
|
||||
connected.remove(bnd_name)
|
||||
|
||||
|
||||
def xmlrpc_task():
|
||||
global server
|
||||
server = SimpleXMLRPCServer(('0.0.0.0', 9000), logRequests=True, allow_none=True)
|
||||
server.register_function(list_contents)
|
||||
server.register_function(aud_add)
|
||||
server.register_function(auth_response)
|
||||
server.register_function(auth_challenge)
|
||||
server.register_function(accept)
|
||||
server.register_function(onSent)
|
||||
server.register_function(onDelivered)
|
||||
server.register_function(bnd_connected)
|
||||
server.register_function(bnd_disconnected)
|
||||
server.serve_forever()
|
||||
|
||||
|
||||
app = FastAPI()
|
||||
app.add_middleware(CORSMiddleware,
|
||||
allow_origins=['http://10.10.8.24:3000'],
|
||||
allow_credentials=True,
|
||||
allow_methods=['*'],
|
||||
allow_headers=['*']
|
||||
)
|
||||
|
||||
|
||||
@app.get("/")
|
||||
async def fa_root():
|
||||
return {"message": "Hello World"}
|
||||
|
||||
|
||||
@app.get("/login")
|
||||
async def fa_login_get(response: Response):
|
||||
response.set_cookie(key='sessionid', value='87654321')
|
||||
response.set_cookie(key='csrftoken', value='12345678')
|
||||
return {"login": "Ok", "success": True}
|
||||
|
||||
|
||||
@app.post("/login")
|
||||
async def fa_login(response: Response):
|
||||
logger.warning('login action')
|
||||
response.set_cookie(key='sessionid', value='87654321')
|
||||
response.set_cookie(key='csrftoken', value='12345678')
|
||||
return {"login": "Ok", "success": True}
|
||||
|
||||
|
||||
@app.post('/webapi')
|
||||
async def fa_webapi(request: Request):
|
||||
body = await request.body()
|
||||
logger.warning('webapi')
|
||||
if 'has_perm' in body.decode('utf-8'):
|
||||
return Response(
|
||||
content='<?xml version="1.0"?><methodResponse><params><param><value><struct><member><name>success</name><value><boolean>1</boolean></value></member></struct></value></param></params></methodResponse>',
|
||||
media_type='application/xml')
|
||||
else:
|
||||
return Response(
|
||||
content='<?xml version="1.0"?><methodResponse><params><param><value><boolean>1</boolean></value></param></params></methodResponse>',
|
||||
media_type='application/xml')
|
||||
|
||||
|
||||
@app.post("/api/easo/PutObject")
|
||||
async def fa_put_object(response: Response,
|
||||
object_attrs: Annotated[str, Form()],
|
||||
object_file: Annotated[UploadFile, File()]
|
||||
):
|
||||
date = datetime.datetime.now()
|
||||
files = [{'name': object_file.filename, 'url': object_file.file.name, 'size': object_file.size}]
|
||||
request_params = {
|
||||
'from': '',
|
||||
'to': '',
|
||||
'ts_added': date.timestamp(),
|
||||
'user_id': '0',
|
||||
'user_id_to': '0',
|
||||
'query_type': 6,
|
||||
'query_data': object_attrs,
|
||||
}
|
||||
put_object(request_params, files, None)
|
||||
return {"Upload": "Ok"}
|
||||
|
||||
|
||||
@app.get("/cr")
|
||||
async def correction_replication(bnd_name: str, schema: str):
|
||||
date = datetime.datetime.now()
|
||||
rxmls = RequestXmlService()
|
||||
res_id = uuid4().hex
|
||||
|
||||
res = rxmls.get_request_document(res_id, None)
|
||||
ET.SubElement(res, 'currentCommit', {'scheme': schema})
|
||||
params = {
|
||||
'from': f'tcp://{Config.self_bnd}',
|
||||
'to': f'tcp://{bnd_name}',
|
||||
'ts_added': date.timestamp(),
|
||||
'user_id': '0',
|
||||
'query_type': NEW_COMMIT_REQUEST,
|
||||
'query_data': ET.tostring(res, encoding='unicode', xml_declaration=True),
|
||||
}
|
||||
proxy = ServerProxy(Config.enserver)
|
||||
try:
|
||||
proxy.send(params, [], Config.ret_path)
|
||||
except:
|
||||
logger.error('Error sending')
|
||||
|
||||
|
||||
def main():
|
||||
global connection
|
||||
global server
|
||||
global channel
|
||||
|
||||
logger.setLevel(logging.INFO)
|
||||
logger.warning('Use Control-C to exit')
|
||||
|
||||
xmlrpc_thread = threading.Thread(target=xmlrpc_task)
|
||||
xmlrpc_thread.start()
|
||||
|
||||
try:
|
||||
logger.warning('Start server')
|
||||
uvicorn.run(app, host="0.0.0.0", port=8000)
|
||||
except KeyboardInterrupt:
|
||||
logger.warning('Exiting')
|
||||
finally:
|
||||
server.server_close()
|
||||
|
||||
|
||||
def vers_key(e):
|
||||
return e.version()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
Reference in New Issue
Block a user