From b4bd7716af06ccf86417c34f904b4c6edcd2153b Mon Sep 17 00:00:00 2001 From: ashatora Date: Mon, 10 Jun 2024 08:34:39 +0300 Subject: [PATCH] Change config model --- replication_itv/__main__.py | 95 ++++++++++++++++++++----------------- replication_itv/config.py | 35 +------------- replication_itv/config.yaml | 40 ++++++++++++++++ replication_itv/db.py | 4 +- requirements.txt | 1 + 5 files changed, 96 insertions(+), 79 deletions(-) create mode 100644 replication_itv/config.yaml diff --git a/replication_itv/__main__.py b/replication_itv/__main__.py index 14b3cb9..b8ed4e3 100644 --- a/replication_itv/__main__.py +++ b/replication_itv/__main__.py @@ -19,7 +19,7 @@ import requests import xml.etree.ElementTree as ET from .reqs.request_xml_service import RequestXmlService import zipfile -from .config import Config +from .config import config from .zip import Zip import boto3 import replication_itv.db as db @@ -48,9 +48,9 @@ logger = logging.getLogger('xmlrpcserver') def s3_connection(): - return boto3.client('s3', endpoint_url=Config.s3_endpoint, - aws_access_key_id=Config.s3_key_id, - aws_secret_access_key=Config.s3_access_key) + return boto3.client('s3', endpoint_url=config.default.REPLICATION_ITV.s3.endpoint, + aws_access_key_id=config.default.REPLICATION_ITV.s3.key_id, + aws_secret_access_key=config.default.REPLICATION_ITV.s3.access_key) def download_file(key: str, bucket: str, filename: str): @@ -153,12 +153,12 @@ def send_object_replication(bndname, ws, ids): 'Name': c1000[0]['fileName'], 'Type': 'OOD', 'metadata_version': '1', - 'source': Config.self_bnd, + 'source': config.default.REPLICATION_ITV.self_bnd, 'system_date': str(created_date.timestamp()), }) xml_version = ET.SubElement(xml_objects, 'version', { 'object_id': chart['uid'].replace('-', ''), - 'source': Config.self_bnd, + 'source': config.default.REPLICATION_ITV.self_bnd, 'system_date': str(created_date.timestamp()), 'version': '1.0', 'version_id': chart['uid'].replace('-', ''), @@ -169,7 +169,7 @@ def send_object_replication(bndname, ws, ids): fp = os.path.join(directory, file['fileName']) if not os.path.exists(directory): os.makedirs(directory) - download_file(file['key'], Config.s3_bucket, fp) + download_file(file['key'], config.default.REPLICATION_ITV.s3.bucket, fp) size = os.stat(fp).st_size _crc32 = crc32(fp) ET.SubElement(xml_version, 'file', { @@ -206,7 +206,7 @@ def send_object_replication(bndname, ws, ids): 'value': str(chart['properties'][attribute]), }) params = { - 'from': f'tcp://{Config.self_bnd}', + 'from': f'tcp://{config.default.REPLICATION_ITV.self_bnd}', 'to': f'tcp://{bndname}', 'ts_added': date.timestamp(), 'user_id': '0', @@ -217,15 +217,17 @@ def send_object_replication(bndname, ws, ids): response_files = [{'name': os.path.basename(filepath), 'url': filepath, 'size': os.path.getsize(filepath)}] logger.warning(response_files) logging.debug('Send replication package') - send_response(params, response_files, Config.ret_path) + send_response(params, response_files, config.default.REPLICATION_ITV.ret_path) def replication_old(bnd_name: str, commit_id: str, schema: str): logger.warning('Start replication') - if schema != Config.oodb_schema: + if schema != config.default.REPLICATION_ITV.oodb.schema: return - con = OOConnectionParams(schema, Config.oodb_host, Config.oodb_port, Config.oodb_dbname, - Config.oodb_username, Config.oodb_passwd, schema) + con = OOConnectionParams(schema, config.default.REPLICATION_ITV.oodb.host, config.default.REPLICATION_ITV.oodb.port, + config.default.REPLICATION_ITV.oodb.dbname, + config.default.REPLICATION_ITV.oodb.username, config.default.REPLICATION_ITV.oodb.passwd, + schema) ws = OODBWorkspace.ws(schema) if not ws.isInit(): res = ws.init(con) @@ -247,15 +249,17 @@ def replication(bnd_name: str, commit_id: str, schema: str): rxmls.set_result(res, 0, '') ET.SubElement(res, 'replication', {'id': commit_id, 'scheme': schema}) response_params = { - 'from': f'tcp://{Config.self_bnd}', + 'from': f'tcp://{config.default.REPLICATION_ITV.self_bnd}', 'to': f'tcp://{bnd_name}', 'ts_added': date.timestamp(), 'user_id': '0', 'query_type': NEW_REPLICATION_REQUEST, 'query_data': ET.tostring(res, encoding='unicode', xml_declaration=True), } - con = OOConnectionParams(schema, Config.oodb_host, Config.oodb_port, Config.oodb_dbname, - Config.oodb_username, Config.oodb_passwd, schema) + con = OOConnectionParams(schema, config.default.REPLICATION_ITV.oodb.host, config.default.REPLICATION_ITV.oodb.port, + config.default.REPLICATION_ITV.oodb.dbname, + config.default.REPLICATION_ITV.oodb.username, config.default.REPLICATION_ITV.oodb.passwd, + schema) ws = OODBWorkspace.ws(schema) if not ws.isInit(): res = ws.init(con) @@ -318,18 +322,18 @@ def replication(bnd_name: str, commit_id: str, schema: str): response_files = [{'name': os.path.basename(filepath), 'url': filepath, 'size': os.path.getsize(filepath)}] logger.warning(response_files) logging.debug('Send replication package') - send_response(response_params, response_files, Config.ret_path) + send_response(response_params, response_files, config.default.REPLICATION_ITV.ret_path) def pika_callback(ch, method, properties, body): commit_info = json.loads(body) logging.warning(commit_info) - schema = commit_info.get('schema') or Config.oodb_schema + schema = commit_info.get('schema') or config.default.REPLICATION_ITV.oodb.schema commit = commit_info['commit'] conn = db.connect_db() with Session(conn) as session: for user in session.query(db.User).filter(db.User.active == True).all(): - if user.bndname == Config.self_bnd: + if user.bndname == config.default.REPLICATION_ITV.self_bnd: continue profiles = {x.scheme: x.to_dict() for x in user.profiles} if len(profiles) == 0 or schema in profiles: @@ -340,9 +344,9 @@ def pika_callback(ch, method, properties, body): def pika_task(): - connection = pika.BlockingConnection(pika.URLParameters(Config.rabbit_conn)) + connection = pika.BlockingConnection(pika.URLParameters(config.default.REPLICATION_ITV.amqp.conn)) channel_itv = connection.channel() - channel_itv.basic_consume(queue=Config.rabbit_queue, on_message_callback=pika_callback) + channel_itv.basic_consume(queue=config.default.REPLICATION_ITV.amqp.queue, on_message_callback=pika_callback) channel_itv.start_consuming() @@ -356,9 +360,9 @@ def status_callback(ch, method, properties, body): def status_task(): - connection = pika.BlockingConnection(pika.URLParameters(Config.rabbit_conn)) + connection = pika.BlockingConnection(pika.URLParameters(config.default.REPLICATION_ITV.amqp.conn)) channel_status = connection.channel() - channel_status.basic_consume(queue=Config.rabbit_status_queue, on_message_callback=status_callback) + channel_status.basic_consume(queue=config.default.REPLICATION_ITV.amqp.status_queue, on_message_callback=status_callback) channel_status.start_consuming() @@ -381,9 +385,9 @@ def pika_itv_callback(ch, method, properties, body): def pika_itv_task(): - connection = pika.BlockingConnection(pika.URLParameters(Config.rabbit_conn)) + connection = pika.BlockingConnection(pika.URLParameters(config.default.REPLICATION_ITV.amqp.conn)) channel_receive = connection.channel() - channel_receive.basic_consume(queue=Config.rabbit_incoming_queue, on_message_callback=pika_itv_callback) + channel_receive.basic_consume(queue=config.default.REPLICATION_ITV.amqp.incoming_queue, on_message_callback=pika_itv_callback) channel_receive.start_consuming() @@ -391,17 +395,17 @@ def send_response(params, files, url): files_s3 = [] for file in files: fn = uuid4().hex - upload_file(file['url'], fn, Config.s3_bucket_itv) - file['url'] = {'name': fn, 'bucket': Config.s3_bucket_itv} + upload_file(file['url'], fn, config.default.REPLICATION_ITV.s3.bucket_itv) + file['url'] = {'name': fn, 'bucket': config.default.REPLICATION_ITV.s3.bucket_itv} files_s3.append(file) data = { 'params': params, 'files': files_s3, 'url': url, } - connection = pika.BlockingConnection(pika.URLParameters(Config.rabbit_conn)) + connection = pika.BlockingConnection(pika.URLParameters(config.default.REPLICATION_ITV.amqp.conn)) channel_send = connection.channel() - channel_send.basic_publish(exchange=Config.rabbit_out_exchange, body=json.dumps(data), routing_key='') + channel_send.basic_publish(exchange=config.default.REPLICATION_ITV.amqp.out_exchange, body=json.dumps(data), routing_key='') def put_object(params, files, url): @@ -409,9 +413,11 @@ def put_object(params, files, url): req = ET.fromstring(params['query_data']) obj = req.find('chart') class_id = obj.get('Class') - con = OOConnectionParams(Config.oodb_schema, Config.oodb_host, Config.oodb_port, Config.oodb_dbname, - Config.oodb_username, Config.oodb_passwd, Config.oodb_schema) - ws = OODBWorkspace.ws(Config.oodb_schema) + con = OOConnectionParams(config.default.REPLICATION_ITV.oodb.schema, config.default.REPLICATION_ITV.oodb.host, + config.default.REPLICATION_ITV.oodb.port, config.default.REPLICATION_ITV.oodb.dbname, + config.default.REPLICATION_ITV.oodb.username, config.default.REPLICATION_ITV.oodb.passwd, + config.default.REPLICATION_ITV.oodb.schema) + ws = OODBWorkspace.ws(config.default.REPLICATION_ITV.oodb.schema) if not ws.isInit(): res = ws.init(con) logger.warning(res) @@ -439,9 +445,9 @@ def put_object(params, files, url): key = uuid4().hex fileVal.fileName = variantToString(item.relative_to(dir.name)) fileVal.key = variantToString(key) - fileVal.bucket = variantToString(Config.s3_bucket) + fileVal.bucket = variantToString(config.default.REPLICATION_ITV.s3.bucket) res &= feature.addAttribute('c1000', variantFromFileValue(fileVal)) - upload_file(str(item), key, Config.s3_bucket) + upload_file(str(item), key, config.default.REPLICATION_ITV.s3.bucket) ws.transaction() res = ws.save() @@ -466,14 +472,15 @@ def apply_commits(params, files, url): branch, new_scheme = get_branch(bnd, scheme) if new_scheme: scheme = new_scheme - con = OOConnectionParams(scheme, Config.oodb_host, Config.oodb_port, Config.oodb_dbname, - Config.oodb_username, Config.oodb_passwd, scheme) + con = OOConnectionParams(scheme, config.default.REPLICATION_ITV.oodb.host, config.default.REPLICATION_ITV.oodb.port, + config.default.REPLICATION_ITV.oodb.dbname, + config.default.REPLICATION_ITV.oodb.username, config.default.REPLICATION_ITV.oodb.passwd, scheme) ws = OODBWorkspace.ws(scheme) logging.warning("Connected to schema") - if Config.ws_rabbit_params and not ws.hasProducer(): - ws.setProducer(Config.ws_rabbit_params['host'], Config.ws_rabbit_params['port'], - Config.ws_rabbit_params['exchange'], Config.ws_rabbit_params['user'], - Config.ws_rabbit_params['password']) + if config.default.REPLICATION_ITV.get('ws_rabbit_params') and not ws.hasProducer(): + ws.setProducer(config.default.REPLICATION_ITV.ws_rabbit_params.host, config.default.REPLICATION_ITV.ws_rabbit_params.port, + config.default.REPLICATION_ITV.ws_rabbit_params.exchange, config.default.REPLICATION_ITV.ws_rabbit_params.user, + config.default.REPLICATION_ITV.ws_rabbit_params.password) ws.setCurrentUser(bnd) logging.warning("Set branch if needed") if branch: @@ -504,8 +511,8 @@ def get_commit(params, files, url): res_id = uuid4().hex res_doc = rxmls.get_request_document(res_id, req_id) schema = req.find('currentCommit').get('scheme') - con = OOConnectionParams(schema, Config.oodb_host, Config.oodb_port, Config.oodb_dbname, - Config.oodb_username, Config.oodb_passwd, schema) + con = OOConnectionParams(schema, config.default.REPLICATION_ITV.oodb.host, config.default.REPLICATION_ITV.oodb.port, config.default.REPLICATION_ITV.oodb.dbname, + config.default.REPLICATION_ITV.oodb.username, config.default.REPLICATION_ITV.oodb.passwd, schema) ws = OODBWorkspace.ws(schema) if not ws.isInit(): res = ws.init(con) @@ -531,8 +538,8 @@ def query_commits(params, files, url): schema = commit_el.get('schema') bnd = params['from'].replace('tcp://', '') - con = OOConnectionParams(schema, Config.oodb_host, Config.oodb_port, Config.oodb_dbname, - Config.oodb_username, Config.oodb_passwd, schema) + con = OOConnectionParams(schema, config.default.REPLICATION_ITV.oodb.host, config.default.REPLICATION_ITV.oodb.port, config.default.REPLICATION_ITV.oodb.dbname, + config.default.REPLICATION_ITV.oodb.username, config.default.REPLICATION_ITV.oodb.passwd, schema) ws = OODBWorkspace.ws(schema) if not ws.isInit(): res = ws.init(con) @@ -548,7 +555,7 @@ def query_commits(params, files, url): with Session(conn) as session: for commit in schema_commits[schema_commits.index(commit_id) + 1:]: for user in session.query(db.User).filter(db.User.bndname == bnd, db.User.active == True).all(): - if user.bndname == Config.self_bnd: + if user.bndname == config.default.REPLICATION_ITV.self_bnd: continue profiles = {x.scheme: x.to_dict() for x in user.profiles} if len(profiles) == 0 or schema in profiles: diff --git a/replication_itv/config.py b/replication_itv/config.py index b4db245..fcde172 100644 --- a/replication_itv/config.py +++ b/replication_itv/config.py @@ -1,34 +1,3 @@ +from bestconfig import Config -class Config: - ret_path: str = 'http://10.10.8.83:32200/' - self_bnd: str = 'bnd127' - enserver: str = 'http://10.10.8.83:32210/xmlrpc' - - pg_host: str = '10.10.8.83' - pg_port: int = 32101 - pg_dbname: str = 'db' - pg_username: str = 'postgres' - pg_password: str = 'Root12345678' - - oodb_host: str = '10.10.8.83' - oodb_port: int = 32100 - oodb_dbname: str = 'db' - oodb_username: str = 'postgres' - oodb_passwd: str = 'Root12345678' - oodb_schema: str = 'documents_src' - - rabbit_conn: str = 'amqp://user:password@10.10.8.83:31005/%2f' - rabbit_queue: str = 'ipd' - rabbit_incoming_queue: str = 'ipd_queue_replication' - rabbit_out_exchange: str = 'ipd_out_itv' - rabbit_status_queue: str = 'ipd_status_queue' - - s3_endpoint: str = 'http://10.10.8.83:31006' - s3_key_id: str = 's57' - s3_access_key: str = 'd9MMinLF3U8TLSj' - s3_bucket: str = 'files' - s3_bucket_itv: str = 'itv' - - gql_url: str = 'https://gql.ivazh.ru/graphql' - gql_download: str = 'https://gql.ivazh.ru/item/{key}' - gql_schema: str = 'pdim' +config = Config() diff --git a/replication_itv/config.yaml b/replication_itv/config.yaml new file mode 100644 index 0000000..adee4c2 --- /dev/null +++ b/replication_itv/config.yaml @@ -0,0 +1,40 @@ +version: 0.0.1 +default: + REPLICATION_ITV: + ret_path: 'http://10.10.8.83:32200/' + self_bnd: 'bnd127' + enserver: 'http://10.10.8.83:32210/xmlrpc' + + pg: + host: '10.10.8.83' + port: 32101 + dbname: 'db' + username: 'postgres' + password: 'Root12345678' + + oodb: + host: '10.10.8.83' + port: 32100 + dbname: 'db' + username: 'postgres' + passwd: 'Root12345678' + schema: 'documents_src' + + amqp: + conn: 'amqp://user:password@10.10.8.83:31005/%2f' + queue: 'ipd' + incoming_queue: 'ipd_queue_replication' + out_exchange: 'ipd_out_itv' + status_queue: 'ipd_status_queue' + + s3: + endpoint: 'http://10.10.8.83:31006' + key_id: 's57' + access_key: 'd9MMinLF3U8TLSj' + bucket: 'files' + bucket_itv: 'itv' + + gql: + url: 'https://gql.ivazh.ru/graphql' + download: 'https://gql.ivazh.ru/item/{key}' + schema: 'pdim' diff --git a/replication_itv/db.py b/replication_itv/db.py index f50b9c0..ad0ac6b 100644 --- a/replication_itv/db.py +++ b/replication_itv/db.py @@ -2,7 +2,7 @@ from datetime import datetime from typing import List, Optional from sqlalchemy import create_engine, String, select, ForeignKey, Enum from sqlalchemy.orm import Session, DeclarativeBase, Mapped, mapped_column, relationship -from .config import Config +from .config import config def tow(day: int, hour: int, minute: int): @@ -163,4 +163,4 @@ class Schemas(Base): def connect_db(): - return create_engine(f"postgresql+psycopg://{Config.pg_username}:{Config.pg_password}@{Config.pg_host}:{Config.pg_port}/{Config.pg_dbname}") + return create_engine(f"postgresql+psycopg://{config.default.REPLICATION_ITV.pg.username}:{config.default.REPLICATION_ITV.pg.password}@{config.default.REPLICATION_ITV.pg.host}:{config.default.REPLICATION_ITV.pg.port}/{config.default.REPLICATION_ITV.pg.dbname}") diff --git a/requirements.txt b/requirements.txt index 50cf277..35063aa 100644 --- a/requirements.txt +++ b/requirements.txt @@ -36,3 +36,4 @@ typing_extensions==4.7.1 urllib3==1.26 uvicorn==0.23.2 yarl==1.9.2 +bestconfig==1.3.6