Change config model

This commit is contained in:
ashatora
2024-06-10 08:34:39 +03:00
parent d181a9c184
commit b4bd7716af
5 changed files with 96 additions and 79 deletions

View File

@@ -19,7 +19,7 @@ import requests
import xml.etree.ElementTree as ET
from .reqs.request_xml_service import RequestXmlService
import zipfile
from .config import Config
from .config import config
from .zip import Zip
import boto3
import replication_itv.db as db
@@ -48,9 +48,9 @@ logger = logging.getLogger('xmlrpcserver')
def s3_connection():
return boto3.client('s3', endpoint_url=Config.s3_endpoint,
aws_access_key_id=Config.s3_key_id,
aws_secret_access_key=Config.s3_access_key)
return boto3.client('s3', endpoint_url=config.default.REPLICATION_ITV.s3.endpoint,
aws_access_key_id=config.default.REPLICATION_ITV.s3.key_id,
aws_secret_access_key=config.default.REPLICATION_ITV.s3.access_key)
def download_file(key: str, bucket: str, filename: str):
@@ -153,12 +153,12 @@ def send_object_replication(bndname, ws, ids):
'Name': c1000[0]['fileName'],
'Type': 'OOD',
'metadata_version': '1',
'source': Config.self_bnd,
'source': config.default.REPLICATION_ITV.self_bnd,
'system_date': str(created_date.timestamp()),
})
xml_version = ET.SubElement(xml_objects, 'version', {
'object_id': chart['uid'].replace('-', ''),
'source': Config.self_bnd,
'source': config.default.REPLICATION_ITV.self_bnd,
'system_date': str(created_date.timestamp()),
'version': '1.0',
'version_id': chart['uid'].replace('-', ''),
@@ -169,7 +169,7 @@ def send_object_replication(bndname, ws, ids):
fp = os.path.join(directory, file['fileName'])
if not os.path.exists(directory):
os.makedirs(directory)
download_file(file['key'], Config.s3_bucket, fp)
download_file(file['key'], config.default.REPLICATION_ITV.s3.bucket, fp)
size = os.stat(fp).st_size
_crc32 = crc32(fp)
ET.SubElement(xml_version, 'file', {
@@ -206,7 +206,7 @@ def send_object_replication(bndname, ws, ids):
'value': str(chart['properties'][attribute]),
})
params = {
'from': f'tcp://{Config.self_bnd}',
'from': f'tcp://{config.default.REPLICATION_ITV.self_bnd}',
'to': f'tcp://{bndname}',
'ts_added': date.timestamp(),
'user_id': '0',
@@ -217,15 +217,17 @@ def send_object_replication(bndname, ws, ids):
response_files = [{'name': os.path.basename(filepath), 'url': filepath, 'size': os.path.getsize(filepath)}]
logger.warning(response_files)
logging.debug('Send replication package')
send_response(params, response_files, Config.ret_path)
send_response(params, response_files, config.default.REPLICATION_ITV.ret_path)
def replication_old(bnd_name: str, commit_id: str, schema: str):
logger.warning('Start replication')
if schema != Config.oodb_schema:
if schema != config.default.REPLICATION_ITV.oodb.schema:
return
con = OOConnectionParams(schema, Config.oodb_host, Config.oodb_port, Config.oodb_dbname,
Config.oodb_username, Config.oodb_passwd, schema)
con = OOConnectionParams(schema, config.default.REPLICATION_ITV.oodb.host, config.default.REPLICATION_ITV.oodb.port,
config.default.REPLICATION_ITV.oodb.dbname,
config.default.REPLICATION_ITV.oodb.username, config.default.REPLICATION_ITV.oodb.passwd,
schema)
ws = OODBWorkspace.ws(schema)
if not ws.isInit():
res = ws.init(con)
@@ -247,15 +249,17 @@ def replication(bnd_name: str, commit_id: str, schema: str):
rxmls.set_result(res, 0, '')
ET.SubElement(res, 'replication', {'id': commit_id, 'scheme': schema})
response_params = {
'from': f'tcp://{Config.self_bnd}',
'from': f'tcp://{config.default.REPLICATION_ITV.self_bnd}',
'to': f'tcp://{bnd_name}',
'ts_added': date.timestamp(),
'user_id': '0',
'query_type': NEW_REPLICATION_REQUEST,
'query_data': ET.tostring(res, encoding='unicode', xml_declaration=True),
}
con = OOConnectionParams(schema, Config.oodb_host, Config.oodb_port, Config.oodb_dbname,
Config.oodb_username, Config.oodb_passwd, schema)
con = OOConnectionParams(schema, config.default.REPLICATION_ITV.oodb.host, config.default.REPLICATION_ITV.oodb.port,
config.default.REPLICATION_ITV.oodb.dbname,
config.default.REPLICATION_ITV.oodb.username, config.default.REPLICATION_ITV.oodb.passwd,
schema)
ws = OODBWorkspace.ws(schema)
if not ws.isInit():
res = ws.init(con)
@@ -318,18 +322,18 @@ def replication(bnd_name: str, commit_id: str, schema: str):
response_files = [{'name': os.path.basename(filepath), 'url': filepath, 'size': os.path.getsize(filepath)}]
logger.warning(response_files)
logging.debug('Send replication package')
send_response(response_params, response_files, Config.ret_path)
send_response(response_params, response_files, config.default.REPLICATION_ITV.ret_path)
def pika_callback(ch, method, properties, body):
commit_info = json.loads(body)
logging.warning(commit_info)
schema = commit_info.get('schema') or Config.oodb_schema
schema = commit_info.get('schema') or config.default.REPLICATION_ITV.oodb.schema
commit = commit_info['commit']
conn = db.connect_db()
with Session(conn) as session:
for user in session.query(db.User).filter(db.User.active == True).all():
if user.bndname == Config.self_bnd:
if user.bndname == config.default.REPLICATION_ITV.self_bnd:
continue
profiles = {x.scheme: x.to_dict() for x in user.profiles}
if len(profiles) == 0 or schema in profiles:
@@ -340,9 +344,9 @@ def pika_callback(ch, method, properties, body):
def pika_task():
connection = pika.BlockingConnection(pika.URLParameters(Config.rabbit_conn))
connection = pika.BlockingConnection(pika.URLParameters(config.default.REPLICATION_ITV.amqp.conn))
channel_itv = connection.channel()
channel_itv.basic_consume(queue=Config.rabbit_queue, on_message_callback=pika_callback)
channel_itv.basic_consume(queue=config.default.REPLICATION_ITV.amqp.queue, on_message_callback=pika_callback)
channel_itv.start_consuming()
@@ -356,9 +360,9 @@ def status_callback(ch, method, properties, body):
def status_task():
connection = pika.BlockingConnection(pika.URLParameters(Config.rabbit_conn))
connection = pika.BlockingConnection(pika.URLParameters(config.default.REPLICATION_ITV.amqp.conn))
channel_status = connection.channel()
channel_status.basic_consume(queue=Config.rabbit_status_queue, on_message_callback=status_callback)
channel_status.basic_consume(queue=config.default.REPLICATION_ITV.amqp.status_queue, on_message_callback=status_callback)
channel_status.start_consuming()
@@ -381,9 +385,9 @@ def pika_itv_callback(ch, method, properties, body):
def pika_itv_task():
connection = pika.BlockingConnection(pika.URLParameters(Config.rabbit_conn))
connection = pika.BlockingConnection(pika.URLParameters(config.default.REPLICATION_ITV.amqp.conn))
channel_receive = connection.channel()
channel_receive.basic_consume(queue=Config.rabbit_incoming_queue, on_message_callback=pika_itv_callback)
channel_receive.basic_consume(queue=config.default.REPLICATION_ITV.amqp.incoming_queue, on_message_callback=pika_itv_callback)
channel_receive.start_consuming()
@@ -391,17 +395,17 @@ def send_response(params, files, url):
files_s3 = []
for file in files:
fn = uuid4().hex
upload_file(file['url'], fn, Config.s3_bucket_itv)
file['url'] = {'name': fn, 'bucket': Config.s3_bucket_itv}
upload_file(file['url'], fn, config.default.REPLICATION_ITV.s3.bucket_itv)
file['url'] = {'name': fn, 'bucket': config.default.REPLICATION_ITV.s3.bucket_itv}
files_s3.append(file)
data = {
'params': params,
'files': files_s3,
'url': url,
}
connection = pika.BlockingConnection(pika.URLParameters(Config.rabbit_conn))
connection = pika.BlockingConnection(pika.URLParameters(config.default.REPLICATION_ITV.amqp.conn))
channel_send = connection.channel()
channel_send.basic_publish(exchange=Config.rabbit_out_exchange, body=json.dumps(data), routing_key='')
channel_send.basic_publish(exchange=config.default.REPLICATION_ITV.amqp.out_exchange, body=json.dumps(data), routing_key='')
def put_object(params, files, url):
@@ -409,9 +413,11 @@ def put_object(params, files, url):
req = ET.fromstring(params['query_data'])
obj = req.find('chart')
class_id = obj.get('Class')
con = OOConnectionParams(Config.oodb_schema, Config.oodb_host, Config.oodb_port, Config.oodb_dbname,
Config.oodb_username, Config.oodb_passwd, Config.oodb_schema)
ws = OODBWorkspace.ws(Config.oodb_schema)
con = OOConnectionParams(config.default.REPLICATION_ITV.oodb.schema, config.default.REPLICATION_ITV.oodb.host,
config.default.REPLICATION_ITV.oodb.port, config.default.REPLICATION_ITV.oodb.dbname,
config.default.REPLICATION_ITV.oodb.username, config.default.REPLICATION_ITV.oodb.passwd,
config.default.REPLICATION_ITV.oodb.schema)
ws = OODBWorkspace.ws(config.default.REPLICATION_ITV.oodb.schema)
if not ws.isInit():
res = ws.init(con)
logger.warning(res)
@@ -439,9 +445,9 @@ def put_object(params, files, url):
key = uuid4().hex
fileVal.fileName = variantToString(item.relative_to(dir.name))
fileVal.key = variantToString(key)
fileVal.bucket = variantToString(Config.s3_bucket)
fileVal.bucket = variantToString(config.default.REPLICATION_ITV.s3.bucket)
res &= feature.addAttribute('c1000', variantFromFileValue(fileVal))
upload_file(str(item), key, Config.s3_bucket)
upload_file(str(item), key, config.default.REPLICATION_ITV.s3.bucket)
ws.transaction()
res = ws.save()
@@ -466,14 +472,15 @@ def apply_commits(params, files, url):
branch, new_scheme = get_branch(bnd, scheme)
if new_scheme:
scheme = new_scheme
con = OOConnectionParams(scheme, Config.oodb_host, Config.oodb_port, Config.oodb_dbname,
Config.oodb_username, Config.oodb_passwd, scheme)
con = OOConnectionParams(scheme, config.default.REPLICATION_ITV.oodb.host, config.default.REPLICATION_ITV.oodb.port,
config.default.REPLICATION_ITV.oodb.dbname,
config.default.REPLICATION_ITV.oodb.username, config.default.REPLICATION_ITV.oodb.passwd, scheme)
ws = OODBWorkspace.ws(scheme)
logging.warning("Connected to schema")
if Config.ws_rabbit_params and not ws.hasProducer():
ws.setProducer(Config.ws_rabbit_params['host'], Config.ws_rabbit_params['port'],
Config.ws_rabbit_params['exchange'], Config.ws_rabbit_params['user'],
Config.ws_rabbit_params['password'])
if config.default.REPLICATION_ITV.get('ws_rabbit_params') and not ws.hasProducer():
ws.setProducer(config.default.REPLICATION_ITV.ws_rabbit_params.host, config.default.REPLICATION_ITV.ws_rabbit_params.port,
config.default.REPLICATION_ITV.ws_rabbit_params.exchange, config.default.REPLICATION_ITV.ws_rabbit_params.user,
config.default.REPLICATION_ITV.ws_rabbit_params.password)
ws.setCurrentUser(bnd)
logging.warning("Set branch if needed")
if branch:
@@ -504,8 +511,8 @@ def get_commit(params, files, url):
res_id = uuid4().hex
res_doc = rxmls.get_request_document(res_id, req_id)
schema = req.find('currentCommit').get('scheme')
con = OOConnectionParams(schema, Config.oodb_host, Config.oodb_port, Config.oodb_dbname,
Config.oodb_username, Config.oodb_passwd, schema)
con = OOConnectionParams(schema, config.default.REPLICATION_ITV.oodb.host, config.default.REPLICATION_ITV.oodb.port, config.default.REPLICATION_ITV.oodb.dbname,
config.default.REPLICATION_ITV.oodb.username, config.default.REPLICATION_ITV.oodb.passwd, schema)
ws = OODBWorkspace.ws(schema)
if not ws.isInit():
res = ws.init(con)
@@ -531,8 +538,8 @@ def query_commits(params, files, url):
schema = commit_el.get('schema')
bnd = params['from'].replace('tcp://', '')
con = OOConnectionParams(schema, Config.oodb_host, Config.oodb_port, Config.oodb_dbname,
Config.oodb_username, Config.oodb_passwd, schema)
con = OOConnectionParams(schema, config.default.REPLICATION_ITV.oodb.host, config.default.REPLICATION_ITV.oodb.port, config.default.REPLICATION_ITV.oodb.dbname,
config.default.REPLICATION_ITV.oodb.username, config.default.REPLICATION_ITV.oodb.passwd, schema)
ws = OODBWorkspace.ws(schema)
if not ws.isInit():
res = ws.init(con)
@@ -548,7 +555,7 @@ def query_commits(params, files, url):
with Session(conn) as session:
for commit in schema_commits[schema_commits.index(commit_id) + 1:]:
for user in session.query(db.User).filter(db.User.bndname == bnd, db.User.active == True).all():
if user.bndname == Config.self_bnd:
if user.bndname == config.default.REPLICATION_ITV.self_bnd:
continue
profiles = {x.scheme: x.to_dict() for x in user.profiles}
if len(profiles) == 0 or schema in profiles:

View File

@@ -1,34 +1,3 @@
from bestconfig import Config
class Config:
ret_path: str = 'http://10.10.8.83:32200/'
self_bnd: str = 'bnd127'
enserver: str = 'http://10.10.8.83:32210/xmlrpc'
pg_host: str = '10.10.8.83'
pg_port: int = 32101
pg_dbname: str = 'db'
pg_username: str = 'postgres'
pg_password: str = 'Root12345678'
oodb_host: str = '10.10.8.83'
oodb_port: int = 32100
oodb_dbname: str = 'db'
oodb_username: str = 'postgres'
oodb_passwd: str = 'Root12345678'
oodb_schema: str = 'documents_src'
rabbit_conn: str = 'amqp://user:password@10.10.8.83:31005/%2f'
rabbit_queue: str = 'ipd'
rabbit_incoming_queue: str = 'ipd_queue_replication'
rabbit_out_exchange: str = 'ipd_out_itv'
rabbit_status_queue: str = 'ipd_status_queue'
s3_endpoint: str = 'http://10.10.8.83:31006'
s3_key_id: str = 's57'
s3_access_key: str = 'd9MMinLF3U8TLSj'
s3_bucket: str = 'files'
s3_bucket_itv: str = 'itv'
gql_url: str = 'https://gql.ivazh.ru/graphql'
gql_download: str = 'https://gql.ivazh.ru/item/{key}'
gql_schema: str = 'pdim'
config = Config()

View File

@@ -0,0 +1,40 @@
version: 0.0.1
default:
REPLICATION_ITV:
ret_path: 'http://10.10.8.83:32200/'
self_bnd: 'bnd127'
enserver: 'http://10.10.8.83:32210/xmlrpc'
pg:
host: '10.10.8.83'
port: 32101
dbname: 'db'
username: 'postgres'
password: 'Root12345678'
oodb:
host: '10.10.8.83'
port: 32100
dbname: 'db'
username: 'postgres'
passwd: 'Root12345678'
schema: 'documents_src'
amqp:
conn: 'amqp://user:password@10.10.8.83:31005/%2f'
queue: 'ipd'
incoming_queue: 'ipd_queue_replication'
out_exchange: 'ipd_out_itv'
status_queue: 'ipd_status_queue'
s3:
endpoint: 'http://10.10.8.83:31006'
key_id: 's57'
access_key: 'd9MMinLF3U8TLSj'
bucket: 'files'
bucket_itv: 'itv'
gql:
url: 'https://gql.ivazh.ru/graphql'
download: 'https://gql.ivazh.ru/item/{key}'
schema: 'pdim'

View File

@@ -2,7 +2,7 @@ from datetime import datetime
from typing import List, Optional
from sqlalchemy import create_engine, String, select, ForeignKey, Enum
from sqlalchemy.orm import Session, DeclarativeBase, Mapped, mapped_column, relationship
from .config import Config
from .config import config
def tow(day: int, hour: int, minute: int):
@@ -163,4 +163,4 @@ class Schemas(Base):
def connect_db():
return create_engine(f"postgresql+psycopg://{Config.pg_username}:{Config.pg_password}@{Config.pg_host}:{Config.pg_port}/{Config.pg_dbname}")
return create_engine(f"postgresql+psycopg://{config.default.REPLICATION_ITV.pg.username}:{config.default.REPLICATION_ITV.pg.password}@{config.default.REPLICATION_ITV.pg.host}:{config.default.REPLICATION_ITV.pg.port}/{config.default.REPLICATION_ITV.pg.dbname}")

View File

@@ -36,3 +36,4 @@ typing_extensions==4.7.1
urllib3==1.26
uvicorn==0.23.2
yarl==1.9.2
bestconfig==1.3.6