Change config model
@@ -19,10 +19,9 @@ import requests
 import xml.etree.ElementTree as ET
 from .reqs.request_xml_service import RequestXmlService
 import zipfile
-from .config import Config
+from .config import config
 from .zip import Zip
 import boto3
 import query_itv.db as db
 from sqlalchemy.orm import Session
 from fastapi import FastAPI, Response, Form, UploadFile, File, Request
 from fastapi.middleware.cors import CORSMiddleware
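The import swap above is the heart of the commit: call sites stop reading flat class attributes such as Config.s3_endpoint and start reading a nested object addressed as config.default.QUERY_ITV.<section>.<key>. The .config module itself is not part of this diff, so the following is only a sketch of the shape the remaining hunks imply; the section and key names are taken from the diff, every value is hypothetical.

# Sketch only: the real query_itv config module is not shown in this commit.
from types import SimpleNamespace

config = SimpleNamespace(
    default=SimpleNamespace(
        QUERY_ITV=SimpleNamespace(
            s3=SimpleNamespace(
                endpoint='http://localhost:9000',    # hypothetical value
                key_id='minio',                      # hypothetical value
                access_key='minio123',               # hypothetical value
                bucket='query-itv',                  # hypothetical value
                bucket_itv='query-itv-files',        # hypothetical value
            ),
            amqp=SimpleNamespace(
                conn='amqp://guest:guest@localhost:5672/%2F',  # hypothetical value
                out_exchange='itv.out',                        # hypothetical value
                incoming_queue='itv.in',                       # hypothetical value
            ),
            oodb=SimpleNamespace(
                schema='public', host='localhost', port=5432,  # hypothetical values
                dbname='oodb', username='oodb', passwd='',     # hypothetical values
            ),
        )
    )
)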
@@ -48,9 +47,9 @@ logger = logging.getLogger('xmlrpcserver')


 def s3_connection():
-    return boto3.client('s3', endpoint_url=Config.s3_endpoint,
-                        aws_access_key_id=Config.s3_key_id,
-                        aws_secret_access_key=Config.s3_access_key)
+    return boto3.client('s3', endpoint_url=config.default.QUERY_ITV.s3.endpoint,
+                        aws_access_key_id=config.default.QUERY_ITV.s3.key_id,
+                        aws_secret_access_key=config.default.QUERY_ITV.s3.access_key)


 def download_file(key: str, bucket: str, filename: str):
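s3_connection() now builds the boto3 client from the s3 section of the nested config. The download_file/upload_file helpers used later in the diff keep their old signatures but their bodies are not shown here; assuming they wrap boto3's managed transfer methods and reuse s3_connection() from the hunk above, they would look roughly like this:

def download_file(key: str, bucket: str, filename: str):
    # boto3 S3 client signature is download_file(Bucket, Key, Filename)
    s3_connection().download_file(bucket, key, filename)


def upload_file(filename: str, key: str, bucket: str):
    # boto3 S3 client signature is upload_file(Filename, Bucket, Key)
    s3_connection().upload_file(filename, bucket, key)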
@@ -90,22 +89,22 @@ def send_response(params, files, url):
     files_s3 = []
     for file in files:
         fn = uuid4().hex
-        upload_file(file['url'], fn, Config.s3_bucket_itv)
-        file['url'] = {'name': fn, 'bucket': Config.s3_bucket_itv}
+        upload_file(file['url'], fn, config.default.QUERY_ITV.s3.bucket_itv)
+        file['url'] = {'name': fn, 'bucket': config.default.QUERY_ITV.s3.bucket_itv}
         files_s3.append(file)
     data = {
         'params': params,
         'files': files_s3,
         'url': url,
     }
-    channel_send.basic_publish(exchange=Config.rabbit_out_exchange, body=json.dumps(data), routing_key='')
+    channel_send.basic_publish(exchange=config.default.QUERY_ITV.amqp.out_exchange, body=json.dumps(data), routing_key='')


 def pika_task():
     global connection
     global channel_receive
     channel_receive = connection.channel()
-    channel_receive.basic_consume(queue=Config.rabbit_incoming_queue, on_message_callback=pika_callback)
+    channel_receive.basic_consume(queue=config.default.QUERY_ITV.amqp.incoming_queue, on_message_callback=pika_callback)
     channel_receive.start_consuming()

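send_response() uploads each referenced file to S3 under a random uuid4 key and publishes the rewritten payload to the outgoing exchange, while pika_task() consumes the incoming queue; both now take their AMQP names from config.default.QUERY_ITV.amqp. The pika_callback registered with basic_consume is not part of this commit; the sketch below only shows the callback contract pika expects, and assumes the incoming body is JSON shaped like the payload send_response publishes plus a query_type field (both assumptions).

import json


def pika_callback(channel, method, properties, body):
    data = json.loads(body)
    # query_type is an assumed field; params/files/url mirror what send_response emits
    run_task(data.get('query_type'), data.get('params'), data.get('files'), data.get('url'))
    channel.basic_ack(delivery_tag=method.delivery_tag)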
@@ -121,9 +120,11 @@ def put_object(params, files, url):
     req = ET.fromstring(params['query_data'])
     obj = req.find('chart')
     class_id = obj.get('Class')
-    con = OOConnectionParams(Config.oodb_schema, Config.oodb_host, Config.oodb_port, Config.oodb_dbname,
-                             Config.oodb_username, Config.oodb_passwd, Config.oodb_schema)
-    ws = OODBWorkspace.ws(Config.oodb_schema)
+    con = OOConnectionParams(config.default.QUERY_ITV.oodb.schema, config.default.QUERY_ITV.oodb.host,
+                             config.default.QUERY_ITV.oodb.port, config.default.QUERY_ITV.oodb.dbname,
+                             config.default.QUERY_ITV.oodb.username, config.default.QUERY_ITV.oodb.passwd,
+                             config.default.QUERY_ITV.oodb.schema)
+    ws = OODBWorkspace.ws(config.default.QUERY_ITV.oodb.schema)
     if not ws.isInit():
         res = ws.init(con)
         logger.warning(res)
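put_object() reads the target class id from the incoming XML before opening the OODB workspace; only the connection parameters change in this hunk. The ElementTree lookup expects a <chart Class="..."> element as a direct child of the request root. A self-contained illustration with a hypothetical payload:

import xml.etree.ElementTree as ET

query_data = '<request><chart Class="1042">...</chart></request>'  # hypothetical payload
req = ET.fromstring(query_data)
obj = req.find('chart')        # first <chart> child of the root
class_id = obj.get('Class')    # -> '1042'
print(class_id)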
@@ -151,9 +152,9 @@ def put_object(params, files, url):
             key = uuid4().hex
             fileVal.fileName = variantToString(item.relative_to(dir.name))
             fileVal.key = variantToString(key)
-            fileVal.bucket = variantToString(Config.s3_bucket)
+            fileVal.bucket = variantToString(config.default.QUERY_ITV.s3.bucket)
             res &= feature.addAttribute('c1000', variantFromFileValue(fileVal))
-            upload_file(str(item), key, Config.s3_bucket)
+            upload_file(str(item), key, config.default.QUERY_ITV.s3.bucket)

     ws.transaction()
     res = ws.save()
@@ -196,45 +197,9 @@ def get_data(params, files, url):
     send_response(response_params, response_files, url)


-def receive_data(params, files, url):
-    req = ET.fromstring(params['query_data'])
-    commit_el = req.find('commit')
-    commit_id = commit_el.get('id')
-    schema = commit_el.get('schema')
-    bnd = params['from'].replace('tcp://', '')
-
-    con = OOConnectionParams(schema, Config.oodb_host, Config.oodb_port, Config.oodb_dbname,
-                             Config.oodb_username, Config.oodb_passwd, schema)
-    ws = OODBWorkspace.ws(schema)
-    if not ws.isInit():
-        res = ws.init(con)
-        logger.warning(res)
-    schema_commits = ws.commits(ws.branch())
-    logger.warning(schema_commits)
-    if commit_id not in schema_commits:
-        logger.warning(f'Error in commits in schema {schema}: no commit {commit_id}')
-        return
-    logger.warning(schema_commits[schema_commits.index(commit_id) + 1:])
-
-    conn = db.connect_db()
-    with Session(conn) as session:
-        for commit in schema_commits[schema_commits.index(commit_id) + 1:]:
-            for user in session.query(db.User).filter(db.User.bndname == bnd, db.User.active == True).all():
-                if user.bndname == Config.self_bnd:
-                    continue
-                profiles = {x.scheme: x.to_dict() for x in user.profiles}
-                if len(profiles) == 0 or schema in profiles:
-                    item = db.Queue(user_id=user.id, commit_id=commit, schema=schema)
-                    logging.warning(item)
-                    session.add(item)
-                    session.commit()
-
-
 def run_task(query_type, params, files, url):
     if query_type == NEW_DATA_REQUEST:
         get_data(params, files, url)
-    if query_type == NEW_DATA_RESPONSE:
-        receive_data(params, files, url)


 def main():
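Rather than being migrated to the new config model, receive_data() is deleted outright (it still read the old Config.oodb_* and Config.self_bnd attributes), and its NEW_DATA_RESPONSE branch in run_task() goes with it, so the dispatcher now only serves NEW_DATA_REQUEST. For reference, the slice the removed fan-out loop iterated over selects the workspace commits that follow the acknowledged one; a trivial pure-Python illustration with hypothetical commit ids:

schema_commits = ['c1', 'c2', 'c3', 'c4']  # hypothetical ids, assumed oldest first
commit_id = 'c2'
newer = schema_commits[schema_commits.index(commit_id) + 1:]
print(newer)  # ['c3', 'c4']; the removed code queued a db.Queue row per matching user for each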
@@ -242,7 +207,7 @@ def main():
     global server
     global channel_send

-    connection = pika.BlockingConnection(pika.URLParameters(Config.rabbit_conn))
+    connection = pika.BlockingConnection(pika.URLParameters(config.default.QUERY_ITV.amqp.conn))
    channel_send = connection.channel()

     logger.setLevel(logging.INFO)
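main() now takes the broker connection string from config.default.QUERY_ITV.amqp.conn. pika.URLParameters accepts a standard AMQP URL; a minimal, self-contained example with a hypothetical local broker:

import pika

# Hypothetical broker URL; the real value lives in config.default.QUERY_ITV.amqp.conn
params = pika.URLParameters('amqp://guest:guest@localhost:5672/%2F')
connection = pika.BlockingConnection(params)
channel_send = connection.channel()
connection.close()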