This commit is contained in:
Ivan Vazhenin
2024-04-25 20:34:51 +03:00
commit 90d628be01
20 changed files with 1517 additions and 0 deletions

8
.gitignore vendored Normal file
View File

@@ -0,0 +1,8 @@
/.idea
/venv
.pytest_cache/
dist/
build/
__pycache__/
*.pyc
*.egg-info/

119
.gitlab-ci.yml Normal file
View File

@@ -0,0 +1,119 @@
---
image: $BUILDER_IMAGE
variables:
project: "xmlrpcserver"
stages:
- init
# - lint
# - test
- build
- pack
- upload
- deploy
- deploy_qa
include:
- project: "ipd/infra"
file: ".version.yml"
- project: "ipd/infra"
file: ".python_lint.yml"
- project: "ipd/infra"
file: ".python_jobs.yml"
- project: "ipd/infra"
file: ".ansistrano-cd.yml"
#lint:
# variables:
# project: "journal"
# extends: .python_lint
# allow_failure: false
# only:
# refs:
# - merge_requests
# changes:
# - "**/*.py"
#pylint:
# variables:
# project: "journal"
# extends: .pylint
# allow_failure: false
# only:
# refs:
# - merge_requests
# changes:
# - "**/*.py"
#pytest:
# stage: test
# tags: [docker]
# image: $REGISTRY_ADDRESS/ipd/$CI_PROJECT_NAME:builder
# services:
# - name: $DOCKER_postgis
# alias: postgres
# - name: $DOCKER_queue
# alias: rabbitmq
# variables:
# ENV_FOR_DYNACONF: testing
# POSTGRES_PASSWORD: postgres
# POSTGRES_HOST: postgres
# POSTGRES__PGBOUNCER: ""
# AMQP__LOGIN: user
# AMQP__PASSWORD: user
# before_script:
# - apt install python -y
# - pip3 install -i $PYPI_REGISTRY --force-reinstall "poetry==1.4.2"
# - export PATH=$(pwd)/.venv/bin:$PATH
# - rm poetry.lock || true
# - poetry config virtualenvs.in-project true
# - poetry config repositories.nexus $PYPI_REGISTRY
# - poetry source add nexus https://nexus.sitronics-kt.dev/repository/pypi/simple/ --default
# - poetry install
# script:
# - poetry env info
# - poetry run coverage run --concurrency=thread --source=. -m pytest tests --junitxml=report.xml
# - poetry run coverage report
# - poetry run genbadge tests -i - < report.xml -o - > badge.svg
# allow_failure: false
# coverage: '/(?i)total.*? (100(?:\.0+)?\%|[1-9]?\d(?:\.\d+)?\%)$/'
# only:
# refs:
# - devel
# - merge_requests
# artifacts:
# when: always
# paths:
# - badge.svg
# reports:
# junit: report.xml
docker_builder_build:
extends: .builder_docker_image_dnd
make_dist:
extends: .dist_make_new
docker_build:
extends: .docker_build
get_version:
stage: upload
tags: [docker]
script:
- buildVersion=$(cat ./VERSION)
- echo "serviceVersion=$buildVersion" >> buildVersion.env
- echo $buildVersion
only:
refs:
- merge_requests
artifacts:
reports:
dotenv: buildVersion.env
#deploy_qa_autotest:
# extends: .deploy_compose_qa
# variables:
# GEODATA_TAG: test
# serviceVersion: $serviceVersion

0
README.md Normal file
View File

1
VERSION Normal file
View File

@@ -0,0 +1 @@
0.0.1

53
pyproject.toml Normal file
View File

@@ -0,0 +1,53 @@
[tool.poetry]
name = "query-itv"
version = "0.1.2"
description = ""
authors = ["Sitronics <info@sitroincs-kt.ru>"]
readme = "README.md"
[tool.poetry.dependencies]
python = "^3.8"
aiohttp = "3.8.6"
aiosignal = "1.3.1"
annotated-types = "0.5.0"
anyio = "3.7.1"
async-timeout = "4.0.3"
attrs = "23.1.0"
backoff = "2.2.1"
boto3 = "1.26.76"
botocore = "1.29.76"
certifi = "2023.7.22"
charset-normalizer = "3.3.2"
click = "8.1.7"
fastapi = "0.92.0"
frozenlist = "1.4.0"
gql = "3.4.1"
graphql-core = "3.2.1"
greenlet = "2.0.0"
h11 = "0.14.0"
idna = "3.4"
jmespath = "1.0.1"
multidict = "6.0.4"
pika = "1.3.2"
pika-stubs = "0.1.3"
psycopg = "3.1.10"
pydantic = "1.10.13"
pydantic_core = "2.6.3"
pygost = { path = "./xmlrpcserver/deps/pygost-5.12", develop = true }
python-dateutil = "2.8.2"
python-multipart = "0.0.6"
requests = "2.31.0"
s3transfer = "0.6.2"
six = "1.16.0"
sniffio = "1.3.0"
SQLAlchemy = "2.0.20"
typing_extensions = "4.8.0"
urllib3 = "1.26.18"
uvicorn = "0.20.0"
yarl = "1.9.2"
deb_structurer = "*"
[build-system]
requires = ["setuptools", "poetry_core>=1.0"]
build-backend = "poetry.core.masonry.api"

View File

565
replication_itv/__main__.py Normal file
View File

@@ -0,0 +1,565 @@
from tempfile import TemporaryDirectory
from typing import Optional, Any
import pika
import threading
import time
from queue import Queue
import datetime
import json
from uuid import uuid4, UUID
from xmlrpc.server import SimpleXMLRPCServer
from xmlrpc.client import ServerProxy
import logging
import io
import zlib
import os.path
import requests
from .reqs_graphql import get_catalog, get_object
from pygost import gost34112012256
import xml.etree.ElementTree as ET
from .reqs.request_xml_service import RequestXmlService
import zipfile
from .config import Config
from .zip import Zip
import boto3
import replication_itv.db as db
from sqlalchemy.orm import Session
from fastapi import FastAPI, Response, Form, UploadFile, File, Request
from fastapi.middleware.cors import CORSMiddleware
import uvicorn
from typing_extensions import Annotated
import pathlib
from shutil import make_archive
NEW_REPLICATION_REQUEST = 99
NEW_COMMIT_REQUEST = 98
NEW_COMMIT_RESPONSE = 1098
NEW_DATA_REQUEST = 97
NEW_DATA_RESPONSE = 1097
tasks = Queue()
connected = set()
connection: Optional[pika.BlockingConnection] = None
channel: Optional[Any] = None
server: Optional[SimpleXMLRPCServer] = None
logger = logging.getLogger('xmlrpcserver')
def s3_connection():
return boto3.client('s3', endpoint_url=Config.s3_endpoint,
aws_access_key_id=Config.s3_key_id,
aws_secret_access_key=Config.s3_access_key)
def download_file(key: str, bucket: str, filename: str):
client = s3_connection()
obj = client.get_object(Bucket=bucket, Key=key)
with open(f"{filename}", 'wb') as f:
for chunk in obj['Body'].iter_chunks(chunk_size=4096):
f.write(chunk)
def upload_file(filename: str, key: str, bucket: str):
client = s3_connection()
with open(filename, 'rb') as f:
client.put_object(Body=f.read(), Bucket=bucket, Key=key)
def get_branch(bndname: str, scheme: str):
conn = db.connect_db()
with Session(conn) as session:
item = session.query(db.IncomeBranch).filter_by(scheme=scheme).join(
db.User).filter_by(bndname=bndname).one_or_none()
if item:
return item.branch, item.local_scheme
return None, None
def run_tasks():
logger.debug('Task thread started.')
while True:
task = tasks.get()
task()
def replication_task():
while True:
conn = db.connect_db()
with Session(conn) as session:
for item in session.query(db.Queue).join(db.Queue.user).all():
bndname = item.user.bndname
if bndname not in connected:
continue
if item.user.is_active_now():
if item.user.newbnd:
replication(bndname, item.commit_id, item.schema)
else:
replication_old(bndname, item.commit_id, item.schema)
session.delete(item)
session.commit()
time.sleep(60)
def process_chart(ws, chart, chart_class):
for x in chart.get('properties') or {}:
attr = ws.attribute(x)
if not attr:
continue
if str(attr.type()) == 'AT_Domain':
dom = attr.domain()
key = str(chart['properties'][x])
chart['properties'][x] = variantToString(dom.value(variantFromString(key)))
if x == 'c103':
chart['properties'][x] = 'Несекретно'
chart['class'] = chart_class['name'].split('_')[-1]
return chart
def crc32(filename, chunk_size=65536):
"""Compute the CRC-32 checksum of the contents of the given filename"""
with open(filename, "rb") as f:
checksum = 0
while chunk := f.read(chunk_size):
checksum = zlib.crc32(chunk, checksum)
return "%08X" % (checksum & 0xFFFFFFFF)
def send_object_replication(bndname, ws, ids):
qu = GroupQuery(Envelope())
query = GroupQuery(Envelope.world())
query.setUids(ids)
uids = []
ws.load(query, uids)
res = json.loads(ws.dataToJson())
charts = [process_chart(ws, f, x) for x in res for f in x['features']]
for chart in charts:
logger.warning('\n')
date = datetime.datetime.now()
rxmls = RequestXmlService()
res_id = uuid4().hex
res = rxmls.get_request_document(res_id, None)
res.set('replication_package', '1')
res.set('replication_version', date.strftime('%Y%m%d%H%M%S'))
res.set('user_permit', 'AA0AA00020200726D3E75C80B713A7A3D3E75C80B713A7A363F7CB889AA3F520')
rxmls.set_result(res, 0, '')
print(chart)
properties = chart.get('properties') or {}
c1000 = properties.get('c1000')
if not c1000:
logger.warning(f'No file for {chart["uid"].replace("-", "")}')
continue
z = Zip()
xml_objects = ET.SubElement(res, 'objects')
created_date = datetime.datetime.fromisoformat(chart.get('date_created'))
xml_chart = ET.SubElement(res, 'chart', {
'Class': chart['class'],
'ID': chart['uid'].replace('-', ''),
'Name': c1000[0]['fileName'],
'Type': 'OOD',
'metadata_version': '1',
'source': Config.self_bnd,
'system_date': str(created_date.timestamp()),
})
xml_version = ET.SubElement(xml_objects, 'version', {
'object_id': chart['uid'].replace('-', ''),
'source': Config.self_bnd,
'system_date': str(created_date.timestamp()),
'version': '1.0',
'version_id': chart['uid'].replace('-', ''),
})
total_size = 0
for file in c1000:
directory = os.path.join(z.dirname, f'maps/{res_id}/ENC_ROOT/{c1000[0]["fileName"]}')
fp = os.path.join(directory, file['fileName'])
if not os.path.exists(directory):
os.makedirs(directory)
download_file(file['key'], Config.s3_bucket, fp)
size = os.stat(fp).st_size
_crc32 = crc32(fp)
ET.SubElement(xml_version, 'file', {
'cell_file': 'false',
'crc32': _crc32,
'crc32_enc': _crc32,
'file_id': file['key'].replace('-', ''),
'file_ref': os.path.join(f'maps/{res_id}/ENC_ROOT', file['fileName']),
'file_size': str(size),
'file_size_enc': str(size),
})
xml_version.set('crc32', _crc32)
xml_version.set('crc32', _crc32)
total_size += size
xml_version.set('size', str(total_size))
xml_version.set('crc32', str(total_size))
xml_tags = ET.SubElement(res, 'tags')
xml_archs = ET.SubElement(res, 'archs')
xml_arch = ET.SubElement(xml_archs, 'arch', {
'obj_id': chart['uid'].replace('-', ''),
'ver_cl': '8.31',
'ver_id': chart['uid'].replace('-', ''),
})
for attribute in properties:
if attribute.startswith('c'):
ET.SubElement(xml_chart, 'Attribute', {
'name': attribute.replace('_', '.'),
'value': str(chart['properties'][attribute]),
})
ET.SubElement(xml_arch, 'attr', {
'code': attribute.replace('_', '.'),
'name': attribute.replace('_', '.'),
'value': str(chart['properties'][attribute]),
})
params = {
'from': f'tcp://{Config.self_bnd}',
'to': f'tcp://{bndname}',
'ts_added': date.timestamp(),
'user_id': '0',
'query_type': NEW_REPLICATION_REQUEST,
'query_data': ET.tostring(res, encoding='unicode', xml_declaration=True),
}
filepath = z.pack()
response_files = [{'name': os.path.basename(filepath), 'url': filepath, 'size': os.path.getsize(filepath)}]
logger.warning(response_files)
logging.debug('Send replication package')
proxy = ServerProxy(Config.enserver)
try:
proxy.send(params, response_files, Config.ret_path)
except:
logger.error('Error sending')
def replication_old(bnd_name: str, commit_id: str, schema: str):
logger.warning('Start replication')
if schema != Config.oodb_schema:
return
con = OOConnectionParams(schema, Config.oodb_host, Config.oodb_port, Config.oodb_dbname,
Config.oodb_username, Config.oodb_passwd, schema)
ws = OODBWorkspace.ws(schema)
if not ws.isInit():
res = ws.init(con)
logger.warning(res)
created, updated, _ = ws.changes(commit_id)
ids = list(set(created) | set(updated))
send_object_replication(bnd_name, ws, ids)
ws.clearData(True)
logger.warning('Replication to old bnd is sent')
def replication(bnd_name: str, commit_id: str, schema: str):
logging.warning(f'{bnd_name} {commit_id} {schema}')
date = datetime.datetime.now()
rxmls = RequestXmlService()
res_id = uuid4().hex
res = rxmls.get_request_document(res_id, None)
rxmls.set_result(res, 0, '')
ET.SubElement(res, 'replication', {'id': commit_id, 'scheme': schema})
response_params = {
'from': f'tcp://{Config.self_bnd}',
'to': f'tcp://{bnd_name}',
'ts_added': date.timestamp(),
'user_id': '0',
'query_type': NEW_REPLICATION_REQUEST,
'query_data': ET.tostring(res, encoding='unicode', xml_declaration=True),
}
con = OOConnectionParams(schema, Config.oodb_host, Config.oodb_port, Config.oodb_dbname,
Config.oodb_username, Config.oodb_passwd, schema)
ws = OODBWorkspace.ws(schema)
if not ws.isInit():
res = ws.init(con)
logger.warning(res)
oe = IpdExporter(ws)
# commit = {"schema": "kartap", "commit": "55a01cf5-c27c-40be-b771-dc0b16c1878b"}
# commit = {"schema": "kartap", "commit": "76405109-db79-4225-b885-fdb00cfc53a6"}
# commit = {"schema": "gcmr", "commit": "9ec2202e-0991-4695-8b0f-33fe28ea8160"}
# commit = {"schema": "npd_data", "commit": "f42afacd-483a-4d8f-bccb-74d65d45378f"}
z = Zip()
pc = oe.previousCommit(commit_id)
oe.exportChanges2osm(os.path.join(z.dirname, 'export.o5c'), pc, commit_id)
oe.exportChanges2mbtiles(os.path.join(z.dirname, 'export.mbtiles'), pc, commit_id)
created, updated, deleted = ws.changes(commit_id)
qu = GroupQuery(Envelope())
qu.setUids(updated)
qu.setLoadArch(True)
uids = []
ws.clearData(True)
ws.load(qu, uids)
updated_files = []
for feature_uid in uids:
not_files = True
vers = ws.featureVersion(feature_uid)
if len(vers) > 1:
vers.sort(key=vers_key)
not_files = vers[0].feature().isEqual(vers[1].feature(), ["c1000"])
if not not_files:
for attr in vers[0].feature().attributes('c1000'):
updated_files.append(variantToFileValue(attr.val()))
# print(f'key: {file_val.key} bucket: {file_val.bucket} fileName:{file_val.fileName}')
ws.clearData(True)
qu = GroupQuery(Envelope())
qu.setUids(created)
ws.load(qu, uids)
exported_files = []
for feature_uid in uids:
feature = ws.featureByUid(feature_uid)
if not feature:
continue
for attr in feature.attributes('c1000'):
updated_files.append(variantToFileValue(attr.val()))
# updated_files.append(feature_uid)
for x in updated_files:
exported_files.append({
'key': x.key,
'bucket': x.bucket,
'filename': x.fileName,
})
fp = os.path.join(z.dirname, x.key)
os.makedirs(os.path.dirname(fp), exist_ok=True)
download_file(x.key, x.bucket, fp)
with open(os.path.join(z.dirname, 'export_files.json'), 'w') as f:
f.write(json.dumps(exported_files))
ws.clearData(True)
# ws.close()
filepath = z.pack()
response_files = [{'name': os.path.basename(filepath), 'url': filepath, 'size': os.path.getsize(filepath)}]
logger.warning(response_files)
logging.debug('Send replication package')
proxy = ServerProxy(Config.enserver)
try:
proxy.send(response_params, response_files, Config.ret_path)
except:
logger.error('Error sending')
def pika_callback(ch, method, properties, body):
commit_info = json.loads(body)
logging.warning(commit_info)
schema = commit_info.get('schema') or Config.oodb_schema
commit = commit_info['commit']
conn = db.connect_db()
with Session(conn) as session:
for user in session.query(db.User).filter(db.User.active == True).all():
if user.bndname == Config.self_bnd:
continue
profiles = {x.scheme: x.to_dict() for x in user.profiles}
if len(profiles) == 0 or schema in profiles:
item = db.Queue(user_id=user.id, commit_id=commit, schema=schema)
session.add(item)
session.commit()
ch.basic_ack(delivery_tag=method.delivery_tag)
def pika_task():
global connection
global channel
connection = pika.BlockingConnection(pika.URLParameters(Config.rabbit_conn))
channel = connection.channel()
channel.basic_consume(queue=Config.rabbit_queue, on_message_callback=pika_callback)
channel.start_consuming()
def put_object(params, files, url):
date = datetime.datetime.now()
req = ET.fromstring(params['query_data'])
obj = req.find('chart')
class_id = obj.get('Class')
con = OOConnectionParams(Config.oodb_schema, Config.oodb_host, Config.oodb_port, Config.oodb_dbname,
Config.oodb_username, Config.oodb_passwd, Config.oodb_schema)
ws = OODBWorkspace.ws(Config.oodb_schema)
if not ws.isInit():
res = ws.init(con)
logger.warning(res)
logging.info(class_id)
fc = ws.featureClass(class_id)
feature = fc.createFeature('')
geom = Polygon.fromExtent(Envelope(0.0, 0.0, 1.0, 1.0, SRFactory.PZ9011()))
res = feature.setGeometry(geom)
for attr in obj.findall('Attribute'):
name = attr.get('name')
value = attr.get('value')
res &= feature.addAttribute(name, variantFromString(value))
assert len(files) == 1
file = files[0]
dir = TemporaryDirectory()
with zipfile.ZipFile(file, 'r') as zip_ref:
zip_ref.extractall(dir.name)
fp = pathlib.Path(dir.name)
for item in fp.glob('**/*'):
if not item.is_file():
continue
fileVal = FileValue()
key = uuid4().hex
fileVal.fileName = variantToString(item.relative_to(dir.name))
fileVal.key = variantToString(key)
fileVal.bucket = variantToString(Config.s3_bucket)
res &= feature.addAttribute('c1000', variantFromFileValue(fileVal))
upload_file(str(item), key, Config.s3_bucket)
ws.transaction()
res = ws.save()
ws.commit(f'Putobject from {params["to"]}')
ws.clearData(True)
def apply_commits(params, files, url):
logging.warning("Apply commits")
logging.warning(params)
assert len(files) == 1
file = files[0]
dir = TemporaryDirectory()
r = requests.get(file['url'])
with zipfile.ZipFile(io.BytesIO(r.content)) as zip_ref:
zip_ref.extractall(dir.name)
req = ET.fromstring(params['query_data'])
repl = req.find('replication')
scheme = repl.get('scheme')
commit = repl.get('id')
bnd = params['from'].replace('tcp://', '')
branch, new_scheme = get_branch(bnd, scheme)
if new_scheme:
scheme = new_scheme
con = OOConnectionParams(scheme, Config.oodb_host, Config.oodb_port, Config.oodb_dbname,
Config.oodb_username, Config.oodb_passwd, scheme)
ws = OODBWorkspace.ws(scheme)
logging.warning("Connected to schema")
if Config.ws_rabbit_params and not ws.hasProducer():
ws.setProducer(Config.ws_rabbit_params['host'], Config.ws_rabbit_params['port'],
Config.ws_rabbit_params['exchange'], Config.ws_rabbit_params['user'],
Config.ws_rabbit_params['password'])
ws.setCurrentUser(bnd)
logging.warning("Set branch if needed")
if branch:
logging.warning(branch)
ws.switchBranch(branch)
if not ws.isInit():
res = ws.init(con)
logger.warning(res)
oe = IpdExporter(ws)
logging.warning("Importing...")
if not oe.importChanges(os.path.join(dir.name, 'export.o5c'), os.path.join(dir.name, 'export.mbtiles')):
logging.warning(f'Error importing commit {commit}: {oe.lastError().text()}')
else:
logging.warning(f'Importing commit {commit} finished successfully')
with open(os.path.join(dir.name, 'export_files.json'), 'r') as f:
files_data = json.load(f)
for file_data in files_data:
upload_file(os.path.join(dir.name, file_data['key']), file_data['key'], file_data['bucket'])
logging.warning("Finished import")
ws.clearData(True)
def get_commit(params, files, url):
date = datetime.datetime.now()
rxmls = RequestXmlService()
req = ET.fromstring(params['query_data'])
req_id = rxmls.get_request_uuid(req)
res_id = uuid4().hex
res_doc = rxmls.get_request_document(res_id, req_id)
schema = req.find('currentCommit').get('scheme')
con = OOConnectionParams(schema, Config.oodb_host, Config.oodb_port, Config.oodb_dbname,
Config.oodb_username, Config.oodb_passwd, schema)
ws = OODBWorkspace.ws(schema)
if not ws.isInit():
res = ws.init(con)
logger.warning(res)
ET.SubElement(res_doc, 'commit', {'id': ws.currentCommit(), 'schema': schema})
response_params = {
'from': params['to'],
'to': params['from'],
'ts_added': date.timestamp(),
'user_id': '1',
'user_id_to': 0,
'query_type': NEW_COMMIT_RESPONSE,
'query_data': ET.tostring(res_doc, encoding='unicode', xml_declaration=True)
}
proxy = ServerProxy(url)
proxy.send(response_params, [], Config.ret_path)
def query_commits(params, files, url):
req = ET.fromstring(params['query_data'])
commit_el = req.find('commit')
commit_id = commit_el.get('id')
schema = commit_el.get('schema')
bnd = params['from'].replace('tcp://', '')
con = OOConnectionParams(schema, Config.oodb_host, Config.oodb_port, Config.oodb_dbname,
Config.oodb_username, Config.oodb_passwd, schema)
ws = OODBWorkspace.ws(schema)
if not ws.isInit():
res = ws.init(con)
logger.warning(res)
schema_commits = ws.commits(ws.branch())
logger.warning(schema_commits)
if commit_id not in schema_commits:
logger.warning(f'Error in commits in schema {schema}: no commit {commit_id}')
return
logger.warning(schema_commits[schema_commits.index(commit_id) + 1:])
conn = db.connect_db()
with Session(conn) as session:
for commit in schema_commits[schema_commits.index(commit_id) + 1:]:
for user in session.query(db.User).filter(db.User.bndname == bnd, db.User.active == True).all():
if user.bndname == Config.self_bnd:
continue
profiles = {x.scheme: x.to_dict() for x in user.profiles}
if len(profiles) == 0 or schema in profiles:
item = db.Queue(user_id=user.id, commit_id=commit, schema=schema)
logging.warning(item)
session.add(item)
session.commit()
def run_task(query_type, params, files, url):
if query_type == NEW_REPLICATION_REQUEST:
tasks.put(lambda: apply_commits(params, files, url))
if query_type == NEW_COMMIT_REQUEST:
tasks.put(lambda: get_commit(params, files, url))
if query_type == NEW_COMMIT_RESPONSE:
tasks.put(lambda: query_commits(params, files, url))
def main():
global connection
global server
global channel
logger.setLevel(logging.INFO)
logger.warning('Use Control-C to exit')
thread = threading.Thread(target=run_tasks)
thread.start()
replication_thread = threading.Thread(target=replication_task)
replication_thread.start()
pika_thread = threading.Thread(target=pika_task)
pika_thread.start()
try:
logger.warning('Start server')
uvicorn.run(app, host="0.0.0.0", port=8000)
except KeyboardInterrupt:
logger.warning('Exiting')
finally:
server.server_close()
def vers_key(e):
return e.version()
if __name__ == '__main__':
main()

39
replication_itv/config.py Normal file
View File

@@ -0,0 +1,39 @@
class Config:
ret_path: str = 'http://10.10.8.83:32200/'
self_bnd: str = 'bnd127'
enserver: str = 'http://10.10.8.83:32210/xmlrpc'
remote_bnd: str = 'bnd128'
pg_host: str = '10.10.8.83'
pg_port: int = 32101
pg_dbname: str = 'db'
pg_username: str = 'postgres'
pg_password: str = 'Root12345678'
oodb_host: str = '10.10.8.83'
oodb_port: int = 32100
oodb_dbname: str = 'db'
oodb_username: str = 'postgres'
oodb_passwd: str = 'Root12345678'
oodb_schema: str = 'documents_src'
rabbit_conn: str = 'amqp://user:password@10.10.8.83:31005/%2f'
rabbit_queue: str = 'ipd'
ws_rabbit_params: dict = {
'host': '10.10.8.83',
'port': 31005,
'exchange': 'ipd',
'user': 'user',
'password': 'password',
}
s3_endpoint: str = 'http://10.10.8.83:31006'
s3_key_id: str = 's57'
s3_access_key: str = 'd9MMinLF3U8TLSj'
s3_bucket: str = 'files'
gql_url: str = 'https://gql.ivazh.ru/graphql'
gql_download: str = 'https://gql.ivazh.ru/item/{key}'
gql_schema: str = 'pdim'

166
replication_itv/db.py Normal file
View File

@@ -0,0 +1,166 @@
from datetime import datetime
from typing import List, Optional
from sqlalchemy import create_engine, String, select, ForeignKey, Enum
from sqlalchemy.orm import Session, DeclarativeBase, Mapped, mapped_column, relationship
from config import Config
def tow(day: int, hour: int, minute: int):
return minute + hour * 60 + day * 60 * 24
class Base(DeclarativeBase):
pass
class User(Base):
__tablename__ = 'users'
id: Mapped[int] = mapped_column(primary_key=True)
username: Mapped[str]
passwd: Mapped[str]
bndname: Mapped[str]
newbnd: Mapped[bool]
active: Mapped[bool]
upstream: Mapped[bool]
profiles: Mapped[List['Profile']] = relationship(
back_populates='user', cascade='all, delete-orphan'
)
income_branches: Mapped[List['IncomeBranch']] = relationship(
back_populates='user', cascade='all, delete-orphan'
)
schedule: Mapped[List['Schedule']] = relationship(
back_populates='user', cascade='all, delete-orphan'
)
queue: Mapped[List['Queue']] = relationship(
back_populates='user', cascade='all, delete-orphan'
)
def __repr__(self) -> str:
return f'User(id={self.id!r}, username={self.username!r}, password={self.passwd!r}, newbnd={self.newbnd})'
def to_dict(self) -> dict:
return {
'id': self.id,
'username': self.username,
'bndname': self.bndname,
'newbnd': self.newbnd,
'active': self.active,
'upstream': self.upstream,
'profiles': [x.to_dict() for x in self.profiles],
'schedule': [x.to_dict() for x in self.schedule],
'queue': [x.to_dict() for x in self.queue],
'income_branches': [x.to_dict() for x in self.income_branches],
}
def is_active_now(self):
if not len(self.schedule):
return True
dt = datetime.now()
curr_tow = tow(dt.weekday(), dt.hour, dt.minute)
for x in self.schedule:
if (tow(x.day_start, x.hour_start, x.minute_start) <= curr_tow
<= tow(x.day_end, x.hour_end, x.minute_end)):
return True
return False
class Profile(Base):
__tablename__ = 'profiles'
id: Mapped[int] = mapped_column(primary_key=True)
user_id: Mapped[int] = mapped_column(ForeignKey('users.id'))
scheme: Mapped[str]
branch: Mapped[str] = mapped_column(String, nullable=True)
json: Mapped[str] = mapped_column(String, nullable=True)
no_files: Mapped[bool]
user: Mapped['User'] = relationship(back_populates='profiles')
def to_dict(self) -> dict:
return {
'id': self.id,
'scheme': self.scheme,
'branch': self.branch,
'json': self.json,
}
class IncomeBranch(Base):
__tablename__ = 'income_branches'
id: Mapped[int] = mapped_column(primary_key=True)
user_id: Mapped[int] = mapped_column(ForeignKey('users.id'))
scheme: Mapped[str]
branch: Mapped[str]
local_scheme: Mapped[str]
user: Mapped['User'] = relationship(back_populates='income_branches')
def to_dict(self) -> dict:
return {
'id': self.id,
'scheme': self.scheme,
'branch': self.branch,
'local_scheme': self.local_scheme,
}
class Schedule(Base):
__tablename__ = 'schedule'
id: Mapped[int] = mapped_column(primary_key=True)
user_id: Mapped[int] = mapped_column(ForeignKey('users.id'))
day_start: Mapped[int]
hour_start: Mapped[int]
minute_start: Mapped[int]
day_end: Mapped[int]
hour_end: Mapped[int]
minute_end: Mapped[int]
user: Mapped['User'] = relationship(back_populates='schedule')
def to_dict(self) -> dict:
return {
'id': self.id,
'day_start': self.day_start,
'hour_start': self.hour_start,
'minute_start': self.minute_start,
'day_end': self.day_end,
'hour_end': self.hour_end,
'minute_end': self.minute_end,
}
class Queue(Base):
__tablename__ = 'queue'
id: Mapped[int] = mapped_column(primary_key=True)
user_id: Mapped[int] = mapped_column(ForeignKey('users.id'))
commit_id: Mapped[str]
schema: Mapped[str]
user: Mapped['User'] = relationship(back_populates='queue')
def to_dict(self) -> dict:
return {
'id': self.id,
'commit_id': self.commit_id,
'schema': self.schema,
}
class Schemas(Base):
__tablename__ = 'schemas'
id: Mapped[int] = mapped_column(primary_key=True)
schema: Mapped[str]
schema_type: Mapped[str]
def connect_db():
return create_engine(f"postgresql+psycopg://{Config.pg_username}:{Config.pg_password}@{Config.pg_host}:{Config.pg_port}/{Config.pg_dbname}")

View File

View File

@@ -0,0 +1,7 @@
import graphql
CLASSIFIER_VERSION = '8.23'
class Classifier:
def get_classifier(self):

View File

@@ -0,0 +1,71 @@
from datetime import datetime, timedelta
from enumerations import ReplicationPackageStatusEnum
from models import CorrectingReplicationOutPackage, ReplicantInfo
from typing import List
import xml.etree.ElementTree as ET
class CorrectingReplicationService:
EVENT_TYPE: str = 'correcting_replication'
object_list_size: int = 0
deleted_objects_count: int = 0
PACKAGE_TAG_NAME: str = 'package'
PACKAGE_ID_ATTRIBUTE_NAME: str = 'pkg_id'
SERVER_VERSION_ATTRIBUTE_NAME: str = 'server_version'
SERVER_ID_ATTRIBUTE_NAME: str = 'server_id'
OBJECTS_TAG_NAME: str = 'objects'
OBJECT_TAG_NAME: str = 'object'
OBJECT_ID_ATTRIBUTE_NAME: str = 'id'
IS_RSC_OBJECT: str = 'is_rsc'
OBJECT_SOURCE_ATTRIBUTE_NAME: str = 'source'
OBJECT_VERSION_ATTRIBUTE_NAME: str = 'version'
METADATA_VERSION_ATTRIBUTE_NAME: str = 'md_version'
ACTUAL_VERSION_DATE_ATTRIBUTE_NAME: str = 'version_date'
OBJECT_MISSING_ATTRIBUTE_NAME: str = 'missing'
CLASSIFIER_VERSION_ATTRIBUTE_NAME: str = 'metadata_clientVersion'
RESULT_TAG_NAME: str = 'result'
RESULT_MESSAGE_ATTRIBUTE_NAME: str = 'result_message'
VERSION_ID_ATTRIBUTE_NAME: str = 'version_id'
VERSION_SOURCE_ATTRIBUTE_NAME: str = 'version_source'
def start_correcting_replication(self, replicant_id: str, replication_period: int, replication_timeout: int,
manual_run: bool):
replicant = ReplicantInfo()
self.start_correcting_replication(replicant, replication_period, replication_timeout, manual_run)
def start_correcting_replication(self, except_replicant_id: List[str], replication_period: int,
replication_timeout: int, manual_run: bool):
pass
def start_correcting_replication(self, replicant: ReplicantInfo, replication_period: int, replication_timeout: int,
manual_run: bool):
pass
def create_replication_package(self, replicant: ReplicantInfo, replication_period: int, replication_timeout: int) -> CorrectingReplicationOutPackage:
pkg = CorrectingReplicationOutPackage()
pkg.filter_string = replicant.filter_string
pkg.replicant_id = replicant.replicant_id
pkg.server_address = replicant.server_address
pkg.replication_period = replication_period
pkg.replication_timeout = replication_timeout
pkg.status = ReplicationPackageStatusEnum.SENT_REQUEST_TO_CHILD
date = datetime.now()
pkg.replication_date = date
if replication_period:
delta = timedelta(days=-replication_period)
pkg.from_date = date + delta
if replication_timeout:
delta = timedelta(seconds=replication_timeout)
pkg.replication_timeout_date = date + delta
return pkg
def get_package_xml(self, pkg: CorrectingReplicationOutPackage, request_uuid: str) -> str:
request_xml: ET.Element = xml_service.get_common_request_xml(request_uuid)
element = ET.SubElement(request_xml, self.PACKAGE_TAG_NAME, {
self.PACKAGE_ID_ATTRIBUTE_NAME: pkg.package_id,
self.SERVER_VERSION_ATTRIBUTE_NAME: '1.0.0',
self.SERVER_ID_ATTRIBUTE_NAME: 'ipd-server',
self.CLASSIFIER_VERSION_ATTRIBUTE_NAME: classifier.,
})
xml_service.set_request_uuid(request_xml, request_uuid)
return ET.tostring(request_xml, encoding='unicode', xml_declaration=True)

View File

@@ -0,0 +1,10 @@
from enum import Enum
class ReplicationPackageStatusEnum(Enum):
CANCELLED = -4
TIMEDOUT = -3
ERROR = -2
SENT_REQUEST_TO_CHILD = 1
SENT_PACKAGE_TO_CHILD = 2
COMPLETED = 3

View File

@@ -0,0 +1,233 @@
from enum import Enum
class Messages(Enum):
REQUEST_EXECUTED_PARTITIALY = 1
REQUEST_FAILED = 2
OBJECT_FORBIDDEN = 3
INVALID_REQUEST_USER = 4
ACTION_FORBIDDEN = 5
OBJECTS_FORBIDDEN = 6
INVALID_REPLICATION_PARAMETERS = 7
OBJECT_LIST = 11
OBJECT_LIST_ITEM = 12
MISSING_TAG = 13
NO_OBJECTS_AVAILABLE = 14
OBJECT_BASE_CELL_NOT_FOUND = 15
WRONG_VERSION_INTERVAL = 16
REQUEST_PROCESS_STARTED = 17
REQUEST_PROCESS_FINISHED = 18
USER_IS_NOT_VALID = 19
RESPONSE_DELIVERY_ERROR = 20
RESPONSE_DELIVERY_SUCCESS = 21
RESPONSE_SEND = 22
RESPONSE_SEND_ERROR = 23
DUPLICATE_CHART_NAME = 24
EXTENDED_METADATA_FORBIDDEN = 25
MAX_RESULT_COUNT_EXCEEDED = 26
MAX_RESULT_SIZE_EXCEEDED = 27
CLIENT_DISCONNECTED = 28
NO_REQUEST_DATA_PROVIDED = 29
NO_USER_ID_PROVIDER = 30
INVALID_REQUEST_DATA = 31
UNKNOWN_FILTER_ATTRIBUTE_CODE = 32
UNKNOWN_FILTER_OPERATION = 33
ARCHIVE_ACCESS_FORBIDDEN = 34
BROKEN_S57_FILE = 35
UNKNOWN_OBJECT_ID = 36
NO_FILTER_IN_REPORT = 37
UPLOAD_OBJECT_LIST = 100
UPLOAD_OBJECT_LIST_ITEM = 101
UPLOAD_WRONG_STRUCTURE = 102
FOLDER_NOT_FOUND = 103
UPLOAD_WRONG_SUBVERSION = 104
UPLOAD_MISSING_CHARTS_FILE = 105
UPLOAD_NO_CHARTS_IN_FILE = 106
UPLOAD_WRONG_CHART_TYPE = 107
UPLOAD_OBJECT_EXISTS = 108
UPLOAD_BASE_CELL_NOT_FOUND = 109
UPLOAD_WRONG_VERSION_SEQUENCE = 110
WRONG_DELETE_VERSION_NUMBER = 111
CANT_CHANGE_STAMP = 112
OBJECT_ALREADY_UPDATED = 113
OBJECT_LOCKED = 114
UPLOAD_MAIN_FILE_NOT_FOUND = 115
UPLOAD_VERSION_NUMBER_NOT_THE_SAME = 116
FROM_ADDRESS = 117
TO_ADDRESS = 118
UPLOAD_OBJECT_EXISTS_BY_SIZE_AND_CHECKSUM = 119
CORRECTING_REPLICATION_TIMED_OUT = 120
CORRECTING_REPLICATION_AUTO_STARTED = 121
CORRECTING_REPLICATION_MANUAL_STARTED = 122
CORRECTING_REPLICATION_ALREADY_STARTED = 123
CORRECTING_REPLICATION_CANCELLED = 124
REPLICATION_SENT = 125
REPLICATION_RECEIVED = 126
WRONG_OBJECT_FILE_CHECKSUM = 127
WRONG_UPLOADING_FILE_CHECKSUM = 128
WRONG_UPLOADING_FILE_SIZE = 129
CORRECTING_REPLICATION_INFO = 130
CORRECTING_REPLICATION_CLASSIFIER = 131
CORRECTING_REPLICATION_LIST_OBJECT = 132
GET_CORRECTING_REPLICATION_PACKAGE = 133
CORRECTING_REPLICATION_SIZE_LIMIT = 134
PACKAGE_INFO = 135
CORRECTING_REPLICATION_SIZE_ARCHIVE = 136
SET_CORRECTING_REPLICATION_PACKAGE = 137
CHECK_DUPLICATE = 138
CHECK_TAG = 139
DELETED_DUPLICATE = 140
STREAMING_REPLICATION_STARTED = 141
ERROR_PACKAGE_NUM = 142
STREAMING_REPLICATION_FINISHED = 143
DELETE_VERSION_STARTED = 144
DELETE_VERSION_FINISHED = 145
UPLOAD_OBJECTS_EXISTS_DIFF_CHECKSUM = 146
UPLOAD_OBJECTS_WITH_LESS_VERSION = 147
CORRECTING_REPLICATION_OBJECT_COUNTS = 148
PACKAGE_INFO_FAILED_OBJECTS = 149
PACKAGE_INFO_INCOMING_OBJECTS = 150
OBJECT_INFO_DIDNT_ADD = 151
TAG_NOT_FOUND = 200
MISSING_ATTRIBUTE = 201
DUPLICATE_TAG_ID = 202
CANT_HIDE_TAG = 203
CANT_MOVE_TAG = 204
CANT_HIDE_REGION = 205
CANT_HIDE_REGIONS = 206
CANT_HIDE_CHILD_REGION = 207
CANT_HIDE_CHILD_REGIONS = 208
WRONG_MID = 300
MAUNFACTURER_KEY_NOT_FOUND = 301
OBJECT_NOT_FOUND = 302
OBJECT_REQUESTED_FROM_PARENT_SERVER = 303
OBJECT_HAS_NO_FILES = 304
WRONG_EXPIRATION_PERIOD = 400
OBJECT_LIST_HAS_DUBLICATES = 401
ACTIVE_SUBSCRIPTION_ALREADY_EXISTS = 402
ACTIVE_SUBSCRIPTION_NOT_EXISTS = 403
WRONG_TYPE_ALL_CHARTS = 500
NO_CLASSIFIER_FILE_ON_SERVER = 600
NO_CLASSIFIER_VERSION_ON_SERVER = 601
CLASSIFIERS_VERSIONS_DIFFER = 602
WRONG_OBJECT_PARAMETER_CODE = 603
ILLEGAL_PARAMETER_VALUE = 604
ILLEGAL_PARAMETER_TYPE = 605
ILLEGAL_PARAMETER_RANGE = 606
NOT_ALL_REQUIRED_TYPES = 607
ILLEGAL_ATTRIBUTE_VALUE = 608
NOT_ALLOWED_ITEM_FOR_TYPE = 609
NO_CHART_CLASS_WITH_CODE = 610
CHART_CLASS_CONTAINS_NONEXISTED_ELEMENT = 611
CLASS_GROUP_CONTAINS_NONEXISTED_CLASS = 612
SAME_CLASSIFIER_VERSION = 614
NO_CLASSIFIER_VERSION = 615
UNKNOWN_ATTR_VAL_DATA_TYPE = 616
NOT_ALL_USING_ITEMS_IN_CLASSIFIER = 617
NO_REQIRED_CLASS_CODE_IN_CLASSIFIER = 618
REQUIRED_ADDED_TO_CHART_CLASS = 619
SET_CLASSIFIER_STARTED_AUDIT = 620
DATA_VALIDATION_STARTED_AUDIT = 621
DATA_VALIDATION_FAILED_AUDIT = 622
DATA_VALIDATION_ENDED_AUDIT = 623
SAVING_CLASSIFIER_STARTED_AUDIT = 624
SAVING_CLASSIFIER_FINISHED_AUDIT = 625
SEND_SET_CLASSIFIER_REQUEST_AUDIT = 626
GET_SET_CLASSIFIER_REQUEST_AUDIT = 627
GET_GET_CLASSIFIER_REQUEST_AUDIT = 628
IKP_USER_DECISION_TYPE = 700
IKP_FILE_STRUCTURE_CHECKER_TYPE = 701
IKP_FILE_STRUCTURE_DB_LINKS_CHECKER_TYPE = 702
IKP_DB_LINKS_FILE_STRUCTURE_CHECKER_TYPE = 703
IKP_CHECK_SUM_CHECKER_TYPE = 704
IKP_BACKUP_TO_DB_TYPE = 705
IKP_RECOVER_FROM_DB_TYPE = 706
IKP_RECOVER_FROM_BACKUP_SERVER_TYPE = 707
IKP_RECOVER_FROM_CHILD_SERVER_TYPE = 708
IKP_DB_STRUCTURE_CHECKER_TYPE = 709
IKP_DB_STRUCTURE_MISSING_COLUMN = 710
IKP_DB_STRUCTURE_WRONG_COLUMN_TYPE = 711
IKP_DB_STRUCTURE_WRONT_MAX_LENGTH = 712
IKP_DB_STRUCTURE_MISSING_TABLE = 713
IKP_NOTHING_TO_RECOVER = 714
IKP_ERROR_NOT_FIXED = 715
IKP_FULL_BACKUP_TO_DB_TYPE = 716
EXPORT_TO_FOLDER_START = 717
EXPORT_TO_FOLDER_FINISHED = 718
EXPORT_TO_FOLDER_ERROR = 719
EXPORT_TO_FOLDER_DELETE_FILE = 720
IMPORT_FROM_FOLDER_START = 721
IMPORT_FROM_FOLDER_FINISHED = 722
IMPORT_FROM_FOLDER_ERROR = 723
IMPORT_FOLDER_NOT_FOUND = 724
FILE_NOT_FOUND = 725
IMPORT_FROM_FOLDER_MERGE_ERROR = 726
FILE_NOT_FOUND_COUNTER = 727
LAST_CORRECTION_DATE_NEWER_THAN_NEW_CORRECTION = 728
CREATION_DATE_MISSING = 729
IMPORT_FROM_FOLDER_DIDNT_START = 730
EXPORT_TO_FOLDER_DIDNT_START = 731
IKP_STEP_START = 800
IKP_STEP_SUCCESS = 801
IKP_STEP_ERROR = 802
IKP_NO_CHECK_ERROR_FOUND = 803
IKP_SEND_USER_DECISION = 804
IKP_GET_USER_DECISION = 805
IKP_FILE_STRUCTURE_ERROR = 806
IKP_FILE_STRUCTURE_DB_LINKS_ERROR = 807
IKP_CHECK_SUM_ERROR = 808
IKP_DB_LINKS_FILE_STRUCTURE_ERROR = 809
IKP_DB_STRUCTURE_TABLE_COLUMN_ERROR = 810
IKP_DB_STRUCTURE_TABLE_ERROR = 811
IKP_STEP_STOPPED = 812
IKP_WRONG_FILE_ERROR = 813
IKP_TASK_RUN_COLLISION = 814
IKP_FIX_SXF_METADATA_CHECKSUM_INFO = 815
IKP_FIX_SXF_METADATA_CHECKSUM_PASSPORT_ERROR = 816
IKP_FIX_SXF_METADATA_CHECKSUM_FILE_ERROR = 817
IKP_MD_RECOVERING_STARTED = 818
IKP_MD_RECOVERING_NO_VERSION = 819
IKP_MD_RECOVERING_CREATED = 820
IKP_MD_RECOVERING_UPDATED = 821
IKP_MD_RECOVERING_SUCCEED = 822
IKP_MD_RECOVERING_FAILED = 823
IKP_CHECK_AVAILABLE_DISK_SPACE_TYPE = 825
IKP_SCRIPT_RUN = 827
IKP_SCRIPT_END_WORK_SUCCEED = 828
IKP_SCRIPT_END_WORK_FAILED = 829
IKP_SCRIPT_ALREADY_STARTED = 831
IKP_MERGE_SAME_OBJECTS_INFO = 832
IKP_MERGE_SAME_OBJECTS_SUCCESSES = 833
IKP_MERGE_SAME_OBJECTS_WARNING = 834
IKP_MERGE_SAME_OBJECTS_ERROR = 835
IKP_MERGE_SAME_OBJECTS_MERGE_TYPE_VERSIONS = 836
IKP_MERGE_SAME_OBJECTS_MERGE_TYPE_DATES = 837
RSC_CLASSIFIER_UPLOAD_STARTED = 838
RSC_CLASSIFIER_UPLOAD_ERROR = 839
RSC_CLASSIFIER_UPLOAD_FINISHED = 843
IKP_CLEAR_OUT_FOLDER_INFO = 840
IKP_CLEAR_OUT_FOLDER_SUCCESS_DELETE = 841
IKP_CLEAR_OUT_FOLDER_ERROR_DELETE = 842
SET_CHART_METADATA = 844
ERROR_WHILE_COPING_FILE = 845
ERROR_ADD_OBJECT = 848
MISSING_REGIONS_CHARTS = 849
NO_MISSING_REGIONS = 850
IKP_CLEAR_DOWNLOAD_CATALOG_WARNING = 855
IKP_CLEAR_DOWNLOAD_CATALOG_ERROR = 856
IKP_CLEAR_DOWNLOAD_CATALOG_INFO = 857
REPLICATION_PACKAGE_NOT_FOUND = 858
REPLICATION_PACKAGE_WRONG_STATUS = 859
NOT_UNIQUE_METADATA = 860
IKP_SCALE_RECOVERING_STARTED = 861
IKP_SCALE_RECOVERING_FAILED = 862
IKP_SCALE_RECOVERING_FINISHED = 863
OBJECTS_REGIONS_CHECKING_STARTED = 865
OBJECTS_REGIONS_CHECKING_FINISHED = 866
OBJECTS_REGIONS_CHECKING_FAILED = 867
BLACK_HOLE_TRANSMITTER_IS_OFF = 868
LACK_VERSIONS_IN_TIME_PERIOD = 869
ERROR_STOPPING_IKP = 870
IKP_STOPPED = 871
REPLICATION_ERROR_TEST = 872
BLACK_HOLE_NOT_PRESENT = 873

View File

@@ -0,0 +1,23 @@
from datetime import datetime
class ReplicantInfo:
server_address: str
filter_string: str
replicant_id: str
replicant_user_id: str
replicate_classifiers: bool
class CorrectingReplicationOutPackage:
package_id: int
status: int
from_date: datetime
replication_date: datetime
replication_timeout_date: datetime
end_date: datetime
replicant_id: str
server_address: str
filter_string: str
replication_period: int
replication_timeout: int

View File

@@ -0,0 +1,4 @@
class RequestProcessor:
def __init__(self):
pass

View File

@@ -0,0 +1,109 @@
import xml.etree.ElementTree as ET
from replication_itv.reqs.messages import Messages
class RequestXmlService:
REQUEST_NODE_NAME: str = 'request'
HEADER_NODE_NAME: str = 'header'
RESULT_NODE_NAME: str = 'result'
REQUEST_ID_ATTRIBUTE_NAME: str = 'parcel_id'
RESPONSE_ID_ATTRIBUTE_NAME: str = 'reply_parcel_id'
SOURCE_REQUEST_ID_ATTRIBUTE_NAME: str = 'source_parcel_id'
RESULT_CODE_ATTRIBUTE_NAME: str = 'result_code'
RESULT_MESSAGE_ATTRIBUTE_NAME: str = 'result_message'
HEADER_XPATH: str = HEADER_NODE_NAME
def get_request_uuid(self, document: ET.Element) -> str:
return self.get_header_attribute(document, self.REQUEST_ID_ATTRIBUTE_NAME)
def get_response_uuid(self, document: ET.Element) -> str:
return self.get_header_attribute(document, self.RESPONSE_ID_ATTRIBUTE_NAME)
def get_source_request_uuid(self, document: ET.Element) -> str:
return self.get_header_attribute(document, self.SOURCE_REQUEST_ID_ATTRIBUTE_NAME)
def get_header_node(self, document: ET.Element) -> ET.Element:
return document.find(self.HEADER_XPATH)
def get_header_attribute(self, document: ET.Element, attribute_name: str) -> str:
header = self.get_header_node(document)
if header is None:
raise Exception(Messages.MISSING_TAG.value)
result = header.get(attribute_name)
return result.strip() if result else ''
def get_request_document(self, request_uuid: str, response_uuid) -> ET.Element:
document = ET.Element(self.REQUEST_NODE_NAME)
header = ET.SubElement(document, self.HEADER_NODE_NAME)
header.set(self.REQUEST_ID_ATTRIBUTE_NAME, request_uuid.strip())
if response_uuid:
header.set(self.RESPONSE_ID_ATTRIBUTE_NAME, response_uuid.strip())
return document
def get_common_response_xml(self, request_uuid: str, response_uuid: str, result_code: int, result_message: str) -> ET.Element:
document = self.get_request_document(response_uuid, request_uuid)
result = ET.SubElement(document, self.RESULT_NODE_NAME)
result.set(self.RESULT_CODE_ATTRIBUTE_NAME, str(result_code))
result.set(self.RESULT_MESSAGE_ATTRIBUTE_NAME, result_message)
return document
def set_result(self, document: ET.Element, result_code: int, result_message: str):
result = document.find(self.RESULT_NODE_NAME)
if result is None:
result = ET.SubElement(document, self.RESULT_NODE_NAME)
result.set(self.RESULT_CODE_ATTRIBUTE_NAME, str(result_code))
result.set(self.RESULT_MESSAGE_ATTRIBUTE_NAME, result_message)
def set_request_uuid(self, document: ET.Element, request_uuid: str):
header = self.get_header_node(document)
header.set(self.REQUEST_ID_ATTRIBUTE_NAME, request_uuid.strip())
def set_source_request_uuid(self, document: ET.Element, source_request_uuid: str):
header = self.get_header_node(document)
header.set(self.SOURCE_REQUEST_ID_ATTRIBUTE_NAME, source_request_uuid.strip())
def get_common_request_xml(self, request_uuid: str) -> ET.Element:
return self.get_request_document(request_uuid, None)
def get_response_xml(self, document: ET.Element, request_uuid, response_uuid: str, result_code: int, result_message: str) -> ET.Element:
header = document.find(self.HEADER_NODE_NAME)
header.set(self.REQUEST_ID_ATTRIBUTE_NAME, response_uuid.strip())
header.set(self.RESPONSE_ID_ATTRIBUTE_NAME, request_uuid.strip() if request_uuid else None)
result = document.find(self.RESULT_NODE_NAME)
result.set(self.RESULT_CODE_ATTRIBUTE_NAME, str(result_code))
result.set(self.RESULT_MESSAGE_ATTRIBUTE_NAME, result_message)
def get_element(self, document: ET.Element, node: ET.Element, name: str) -> ET.Element:
for child in node:
if child.tag.casefold() == name.casefold():
return child
return ET.SubElement(document, name)
def validate_common_request_xml(self, xml: str):
if not xml:
raise Exception(Messages.NO_REQUEST_DATA_PROVIDED.name)
try:
document = ET.parse(xml)
except:
raise Exception(Messages.INVALID_REQUEST_DATA)
self.validate_common_request_xml(document)
def validate_common_request_xml(self, document):
if not document:
raise Exception(Messages.NO_REQUEST_DATA_PROVIDED.name)
if document.tag != self.REQUEST_NODE_NAME:
raise Exception(Messages.MISSING_TAG.name, self.REQUEST_NODE_NAME)
try:
request_uuid = self.get_header_attribute(document, self.REQUEST_ID_ATTRIBUTE_NAME)
except:
raise Exception(Messages.INVALID_REQUEST_DATA)
if not request_uuid:
raise Exception(Messages.MISSING_ATTRIBUTE, self.REQUEST_ID_ATTRIBUTE_NAME, self.HEADER_NODE_NAME)
def get_result_code(self, document: ET.Element):
result = self.get_element(document, document, self.RESULT_NODE_NAME)
if not result:
return None
return result.get(self.RESULT_CODE_ATTRIBUTE_NAME)

View File

@@ -0,0 +1,46 @@
from gql import gql, Client
from gql.transport.aiohttp import AIOHTTPTransport
from .config import Config
transport = AIOHTTPTransport(url=Config.gql_url)
service = Config.gql_schema
def get_classifier():
client = Client(transport=transport, fetch_schema_from_transport=True, execute_timeout=None)
query = gql(
"""
query getClassifier($name: String!) {
getClassifier(name: $name)
}
"""
)
result = client.execute(query, variable_values={"name": service}, )
return result['getClassifier']
def get_catalog():
client = Client(transport=transport, fetch_schema_from_transport=True, execute_timeout=None)
query = gql(
"""
query getCatalog($name: String!) {
getCatalog(name: $name)
}
"""
)
result = client.execute(query, variable_values={"name": service})
return result['getCatalog']
def get_object(oid: str):
client = Client(transport=transport, fetch_schema_from_transport=True, execute_timeout=None)
query = gql(
"""
query getObjects($oid: String!, $name: String!) {
getObject(name: $name, oid: $oid)
}
"""
)
params = {'oid': oid, 'name': service}
result = client.execute(query, variable_values=params)
return result['getObject']

23
replication_itv/zip.py Normal file
View File

@@ -0,0 +1,23 @@
import os
from tempfile import TemporaryDirectory, mktemp
from zipfile import ZipFile
class Zip:
def __init__(self):
self.dir = TemporaryDirectory()
@property
def dirname(self):
return self.dir.name
def pack(self):
tmp_zip = mktemp(suffix='.zip')
with ZipFile(tmp_zip, 'w') as zip_object:
for folder_name, sub_folders, file_names in os.walk(self.dir.name):
for filename in file_names:
file_path = os.path.join(folder_name, filename)
rel_file_path = os.path.relpath(file_path, self.dir.name)
zip_object.write(file_path, rel_file_path)
return tmp_zip

40
requirements.txt Normal file
View File

@@ -0,0 +1,40 @@
--find-links=deps
aiohttp==3.8.4
aiosignal==1.3.1
annotated-types==0.5.0
anyio==3.7.1
async-timeout==4.0.3
attrs==23.1.0
backoff==2.2.1
boto3==1.28.37
botocore==1.31.40
certifi==2023.7.22
charset-normalizer==3.2.0
click==8.1.7
fastapi==0.103.1
frozenlist==1.4.0
gql==3.5.0b5
graphql-core==3.3.0a3
greenlet==2.0.2
h11==0.14.0
idna==3.4
jmespath==1.0.1
multidict==6.0.4
pika==1.3.2
pika-stubs==0.1.3
psycopg==3.1.10
pydantic==2.3.0
pydantic_core==2.6.3
pygost==5.12
python-dateutil==2.8.2
python-multipart==0.0.6
requests==2.31.0
s3transfer==0.6.2
six==1.16.0
sniffio==1.3.0
SQLAlchemy==2.0.20
starlette==0.27.0
typing_extensions==4.7.1
urllib3==1.26
uvicorn==0.23.2
yarl==1.9.2