From 90d628be0161660641f43e9eb14c13b9bbffee7d Mon Sep 17 00:00:00 2001 From: Ivan Vazhenin Date: Thu, 25 Apr 2024 20:34:51 +0300 Subject: [PATCH] . --- .gitignore | 8 + .gitlab-ci.yml | 119 ++++ README.md | 0 VERSION | 1 + pyproject.toml | 53 ++ replication_itv/__init__.py | 0 replication_itv/__main__.py | 565 ++++++++++++++++++ replication_itv/config.py | 39 ++ replication_itv/db.py | 166 +++++ replication_itv/reqs/__init__.py | 0 replication_itv/reqs/classifier.py | 7 + .../reqs/correcting_replication_service.py | 71 +++ replication_itv/reqs/enumerations.py | 10 + replication_itv/reqs/messages.py | 233 ++++++++ replication_itv/reqs/models.py | 23 + replication_itv/reqs/request_processor.py | 4 + replication_itv/reqs/request_xml_service.py | 109 ++++ replication_itv/reqs_graphql.py | 46 ++ replication_itv/zip.py | 23 + requirements.txt | 40 ++ 20 files changed, 1517 insertions(+) create mode 100644 .gitignore create mode 100644 .gitlab-ci.yml create mode 100644 README.md create mode 100644 VERSION create mode 100644 pyproject.toml create mode 100644 replication_itv/__init__.py create mode 100644 replication_itv/__main__.py create mode 100644 replication_itv/config.py create mode 100644 replication_itv/db.py create mode 100644 replication_itv/reqs/__init__.py create mode 100644 replication_itv/reqs/classifier.py create mode 100644 replication_itv/reqs/correcting_replication_service.py create mode 100644 replication_itv/reqs/enumerations.py create mode 100644 replication_itv/reqs/messages.py create mode 100644 replication_itv/reqs/models.py create mode 100644 replication_itv/reqs/request_processor.py create mode 100644 replication_itv/reqs/request_xml_service.py create mode 100644 replication_itv/reqs_graphql.py create mode 100644 replication_itv/zip.py create mode 100644 requirements.txt diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..a0e5cb2 --- /dev/null +++ b/.gitignore @@ -0,0 +1,8 @@ +/.idea +/venv +.pytest_cache/ +dist/ +build/ +__pycache__/ +*.pyc +*.egg-info/ \ No newline at end of file diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml new file mode 100644 index 0000000..428e617 --- /dev/null +++ b/.gitlab-ci.yml @@ -0,0 +1,119 @@ +--- +image: $BUILDER_IMAGE + +variables: + project: "xmlrpcserver" + +stages: + - init +# - lint +# - test + - build + - pack + - upload + - deploy + - deploy_qa + +include: + - project: "ipd/infra" + file: ".version.yml" + - project: "ipd/infra" + file: ".python_lint.yml" + - project: "ipd/infra" + file: ".python_jobs.yml" + - project: "ipd/infra" + file: ".ansistrano-cd.yml" + +#lint: +# variables: +# project: "journal" +# extends: .python_lint +# allow_failure: false +# only: +# refs: +# - merge_requests +# changes: +# - "**/*.py" + +#pylint: +# variables: +# project: "journal" +# extends: .pylint +# allow_failure: false +# only: +# refs: +# - merge_requests +# changes: +# - "**/*.py" + +#pytest: +# stage: test +# tags: [docker] +# image: $REGISTRY_ADDRESS/ipd/$CI_PROJECT_NAME:builder +# services: +# - name: $DOCKER_postgis +# alias: postgres +# - name: $DOCKER_queue +# alias: rabbitmq +# variables: +# ENV_FOR_DYNACONF: testing +# POSTGRES_PASSWORD: postgres +# POSTGRES_HOST: postgres +# POSTGRES__PGBOUNCER: "" +# AMQP__LOGIN: user +# AMQP__PASSWORD: user +# before_script: +# - apt install python -y +# - pip3 install -i $PYPI_REGISTRY --force-reinstall "poetry==1.4.2" +# - export PATH=$(pwd)/.venv/bin:$PATH +# - rm poetry.lock || true +# - poetry config virtualenvs.in-project true +# - poetry config repositories.nexus $PYPI_REGISTRY +# - poetry source add nexus https://nexus.sitronics-kt.dev/repository/pypi/simple/ --default +# - poetry install +# script: +# - poetry env info +# - poetry run coverage run --concurrency=thread --source=. -m pytest tests --junitxml=report.xml +# - poetry run coverage report +# - poetry run genbadge tests -i - < report.xml -o - > badge.svg +# allow_failure: false +# coverage: '/(?i)total.*? (100(?:\.0+)?\%|[1-9]?\d(?:\.\d+)?\%)$/' +# only: +# refs: +# - devel +# - merge_requests +# artifacts: +# when: always +# paths: +# - badge.svg +# reports: +# junit: report.xml + +docker_builder_build: + extends: .builder_docker_image_dnd + +make_dist: + extends: .dist_make_new + +docker_build: + extends: .docker_build + +get_version: + stage: upload + tags: [docker] + script: + - buildVersion=$(cat ./VERSION) + - echo "serviceVersion=$buildVersion" >> buildVersion.env + - echo $buildVersion + only: + refs: + - merge_requests + artifacts: + reports: + dotenv: buildVersion.env + +#deploy_qa_autotest: +# extends: .deploy_compose_qa +# variables: +# GEODATA_TAG: test +# serviceVersion: $serviceVersion diff --git a/README.md b/README.md new file mode 100644 index 0000000..e69de29 diff --git a/VERSION b/VERSION new file mode 100644 index 0000000..8acdd82 --- /dev/null +++ b/VERSION @@ -0,0 +1 @@ +0.0.1 diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..14b6cee --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,53 @@ +[tool.poetry] +name = "query-itv" +version = "0.1.2" +description = "" +authors = ["Sitronics "] +readme = "README.md" + +[tool.poetry.dependencies] +python = "^3.8" +aiohttp = "3.8.6" +aiosignal = "1.3.1" +annotated-types = "0.5.0" +anyio = "3.7.1" +async-timeout = "4.0.3" +attrs = "23.1.0" +backoff = "2.2.1" +boto3 = "1.26.76" +botocore = "1.29.76" +certifi = "2023.7.22" +charset-normalizer = "3.3.2" +click = "8.1.7" +fastapi = "0.92.0" +frozenlist = "1.4.0" +gql = "3.4.1" +graphql-core = "3.2.1" +greenlet = "2.0.0" +h11 = "0.14.0" +idna = "3.4" +jmespath = "1.0.1" +multidict = "6.0.4" +pika = "1.3.2" +pika-stubs = "0.1.3" +psycopg = "3.1.10" +pydantic = "1.10.13" +pydantic_core = "2.6.3" +pygost = { path = "./xmlrpcserver/deps/pygost-5.12", develop = true } +python-dateutil = "2.8.2" +python-multipart = "0.0.6" +requests = "2.31.0" +s3transfer = "0.6.2" +six = "1.16.0" +sniffio = "1.3.0" +SQLAlchemy = "2.0.20" +typing_extensions = "4.8.0" +urllib3 = "1.26.18" +uvicorn = "0.20.0" +yarl = "1.9.2" +deb_structurer = "*" + + +[build-system] +requires = ["setuptools", "poetry_core>=1.0"] +build-backend = "poetry.core.masonry.api" diff --git a/replication_itv/__init__.py b/replication_itv/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/replication_itv/__main__.py b/replication_itv/__main__.py new file mode 100644 index 0000000..f853905 --- /dev/null +++ b/replication_itv/__main__.py @@ -0,0 +1,565 @@ +from tempfile import TemporaryDirectory +from typing import Optional, Any + +import pika +import threading +import time +from queue import Queue +import datetime +import json +from uuid import uuid4, UUID +from xmlrpc.server import SimpleXMLRPCServer +from xmlrpc.client import ServerProxy +import logging +import io +import zlib +import os.path +import requests +from .reqs_graphql import get_catalog, get_object +from pygost import gost34112012256 +import xml.etree.ElementTree as ET +from .reqs.request_xml_service import RequestXmlService +import zipfile +from .config import Config +from .zip import Zip +import boto3 +import replication_itv.db as db +from sqlalchemy.orm import Session +from fastapi import FastAPI, Response, Form, UploadFile, File, Request +from fastapi.middleware.cors import CORSMiddleware +import uvicorn +from typing_extensions import Annotated +import pathlib +from shutil import make_archive + + +NEW_REPLICATION_REQUEST = 99 +NEW_COMMIT_REQUEST = 98 +NEW_COMMIT_RESPONSE = 1098 +NEW_DATA_REQUEST = 97 +NEW_DATA_RESPONSE = 1097 + + +tasks = Queue() +connected = set() + +connection: Optional[pika.BlockingConnection] = None +channel: Optional[Any] = None +server: Optional[SimpleXMLRPCServer] = None + +logger = logging.getLogger('xmlrpcserver') + + +def s3_connection(): + return boto3.client('s3', endpoint_url=Config.s3_endpoint, + aws_access_key_id=Config.s3_key_id, + aws_secret_access_key=Config.s3_access_key) + + +def download_file(key: str, bucket: str, filename: str): + client = s3_connection() + obj = client.get_object(Bucket=bucket, Key=key) + with open(f"{filename}", 'wb') as f: + for chunk in obj['Body'].iter_chunks(chunk_size=4096): + f.write(chunk) + + +def upload_file(filename: str, key: str, bucket: str): + client = s3_connection() + with open(filename, 'rb') as f: + client.put_object(Body=f.read(), Bucket=bucket, Key=key) + + +def get_branch(bndname: str, scheme: str): + conn = db.connect_db() + with Session(conn) as session: + item = session.query(db.IncomeBranch).filter_by(scheme=scheme).join( + db.User).filter_by(bndname=bndname).one_or_none() + if item: + return item.branch, item.local_scheme + return None, None + + +def run_tasks(): + logger.debug('Task thread started.') + while True: + task = tasks.get() + task() + + +def replication_task(): + while True: + conn = db.connect_db() + with Session(conn) as session: + for item in session.query(db.Queue).join(db.Queue.user).all(): + bndname = item.user.bndname + if bndname not in connected: + continue + if item.user.is_active_now(): + if item.user.newbnd: + replication(bndname, item.commit_id, item.schema) + else: + replication_old(bndname, item.commit_id, item.schema) + session.delete(item) + session.commit() + time.sleep(60) + + +def process_chart(ws, chart, chart_class): + for x in chart.get('properties') or {}: + attr = ws.attribute(x) + if not attr: + continue + if str(attr.type()) == 'AT_Domain': + dom = attr.domain() + key = str(chart['properties'][x]) + chart['properties'][x] = variantToString(dom.value(variantFromString(key))) + if x == 'c103': + chart['properties'][x] = 'Несекретно' + chart['class'] = chart_class['name'].split('_')[-1] + return chart + + +def crc32(filename, chunk_size=65536): + """Compute the CRC-32 checksum of the contents of the given filename""" + with open(filename, "rb") as f: + checksum = 0 + while chunk := f.read(chunk_size): + checksum = zlib.crc32(chunk, checksum) + return "%08X" % (checksum & 0xFFFFFFFF) + + +def send_object_replication(bndname, ws, ids): + qu = GroupQuery(Envelope()) + query = GroupQuery(Envelope.world()) + query.setUids(ids) + uids = [] + ws.load(query, uids) + res = json.loads(ws.dataToJson()) + charts = [process_chart(ws, f, x) for x in res for f in x['features']] + + for chart in charts: + logger.warning('\n') + date = datetime.datetime.now() + rxmls = RequestXmlService() + res_id = uuid4().hex + res = rxmls.get_request_document(res_id, None) + res.set('replication_package', '1') + res.set('replication_version', date.strftime('%Y%m%d%H%M%S')) + res.set('user_permit', 'AA0AA00020200726D3E75C80B713A7A3D3E75C80B713A7A363F7CB889AA3F520') + rxmls.set_result(res, 0, '') + print(chart) + properties = chart.get('properties') or {} + c1000 = properties.get('c1000') + if not c1000: + logger.warning(f'No file for {chart["uid"].replace("-", "")}') + continue + z = Zip() + xml_objects = ET.SubElement(res, 'objects') + created_date = datetime.datetime.fromisoformat(chart.get('date_created')) + xml_chart = ET.SubElement(res, 'chart', { + 'Class': chart['class'], + 'ID': chart['uid'].replace('-', ''), + 'Name': c1000[0]['fileName'], + 'Type': 'OOD', + 'metadata_version': '1', + 'source': Config.self_bnd, + 'system_date': str(created_date.timestamp()), + }) + xml_version = ET.SubElement(xml_objects, 'version', { + 'object_id': chart['uid'].replace('-', ''), + 'source': Config.self_bnd, + 'system_date': str(created_date.timestamp()), + 'version': '1.0', + 'version_id': chart['uid'].replace('-', ''), + }) + total_size = 0 + for file in c1000: + directory = os.path.join(z.dirname, f'maps/{res_id}/ENC_ROOT/{c1000[0]["fileName"]}') + fp = os.path.join(directory, file['fileName']) + if not os.path.exists(directory): + os.makedirs(directory) + download_file(file['key'], Config.s3_bucket, fp) + size = os.stat(fp).st_size + _crc32 = crc32(fp) + ET.SubElement(xml_version, 'file', { + 'cell_file': 'false', + 'crc32': _crc32, + 'crc32_enc': _crc32, + 'file_id': file['key'].replace('-', ''), + 'file_ref': os.path.join(f'maps/{res_id}/ENC_ROOT', file['fileName']), + 'file_size': str(size), + 'file_size_enc': str(size), + }) + xml_version.set('crc32', _crc32) + xml_version.set('crc32', _crc32) + total_size += size + xml_version.set('size', str(total_size)) + xml_version.set('crc32', str(total_size)) + + xml_tags = ET.SubElement(res, 'tags') + xml_archs = ET.SubElement(res, 'archs') + xml_arch = ET.SubElement(xml_archs, 'arch', { + 'obj_id': chart['uid'].replace('-', ''), + 'ver_cl': '8.31', + 'ver_id': chart['uid'].replace('-', ''), + }) + for attribute in properties: + if attribute.startswith('c'): + ET.SubElement(xml_chart, 'Attribute', { + 'name': attribute.replace('_', '.'), + 'value': str(chart['properties'][attribute]), + }) + ET.SubElement(xml_arch, 'attr', { + 'code': attribute.replace('_', '.'), + 'name': attribute.replace('_', '.'), + 'value': str(chart['properties'][attribute]), + }) + params = { + 'from': f'tcp://{Config.self_bnd}', + 'to': f'tcp://{bndname}', + 'ts_added': date.timestamp(), + 'user_id': '0', + 'query_type': NEW_REPLICATION_REQUEST, + 'query_data': ET.tostring(res, encoding='unicode', xml_declaration=True), + } + filepath = z.pack() + response_files = [{'name': os.path.basename(filepath), 'url': filepath, 'size': os.path.getsize(filepath)}] + logger.warning(response_files) + logging.debug('Send replication package') + proxy = ServerProxy(Config.enserver) + try: + proxy.send(params, response_files, Config.ret_path) + except: + logger.error('Error sending') + + +def replication_old(bnd_name: str, commit_id: str, schema: str): + logger.warning('Start replication') + if schema != Config.oodb_schema: + return + con = OOConnectionParams(schema, Config.oodb_host, Config.oodb_port, Config.oodb_dbname, + Config.oodb_username, Config.oodb_passwd, schema) + ws = OODBWorkspace.ws(schema) + if not ws.isInit(): + res = ws.init(con) + logger.warning(res) + created, updated, _ = ws.changes(commit_id) + ids = list(set(created) | set(updated)) + send_object_replication(bnd_name, ws, ids) + ws.clearData(True) + logger.warning('Replication to old bnd is sent') + + +def replication(bnd_name: str, commit_id: str, schema: str): + logging.warning(f'{bnd_name} {commit_id} {schema}') + date = datetime.datetime.now() + rxmls = RequestXmlService() + res_id = uuid4().hex + + res = rxmls.get_request_document(res_id, None) + rxmls.set_result(res, 0, '') + ET.SubElement(res, 'replication', {'id': commit_id, 'scheme': schema}) + response_params = { + 'from': f'tcp://{Config.self_bnd}', + 'to': f'tcp://{bnd_name}', + 'ts_added': date.timestamp(), + 'user_id': '0', + 'query_type': NEW_REPLICATION_REQUEST, + 'query_data': ET.tostring(res, encoding='unicode', xml_declaration=True), + } + con = OOConnectionParams(schema, Config.oodb_host, Config.oodb_port, Config.oodb_dbname, + Config.oodb_username, Config.oodb_passwd, schema) + ws = OODBWorkspace.ws(schema) + if not ws.isInit(): + res = ws.init(con) + logger.warning(res) + oe = IpdExporter(ws) + # commit = {"schema": "kartap", "commit": "55a01cf5-c27c-40be-b771-dc0b16c1878b"} + # commit = {"schema": "kartap", "commit": "76405109-db79-4225-b885-fdb00cfc53a6"} + # commit = {"schema": "gcmr", "commit": "9ec2202e-0991-4695-8b0f-33fe28ea8160"} + # commit = {"schema": "npd_data", "commit": "f42afacd-483a-4d8f-bccb-74d65d45378f"} + z = Zip() + + pc = oe.previousCommit(commit_id) + oe.exportChanges2osm(os.path.join(z.dirname, 'export.o5c'), pc, commit_id) + oe.exportChanges2mbtiles(os.path.join(z.dirname, 'export.mbtiles'), pc, commit_id) + + created, updated, deleted = ws.changes(commit_id) + qu = GroupQuery(Envelope()) + qu.setUids(updated) + qu.setLoadArch(True) + uids = [] + ws.clearData(True) + ws.load(qu, uids) + updated_files = [] + for feature_uid in uids: + not_files = True + vers = ws.featureVersion(feature_uid) + if len(vers) > 1: + vers.sort(key=vers_key) + not_files = vers[0].feature().isEqual(vers[1].feature(), ["c1000"]) + if not not_files: + for attr in vers[0].feature().attributes('c1000'): + updated_files.append(variantToFileValue(attr.val())) + # print(f'key: {file_val.key} bucket: {file_val.bucket} fileName:{file_val.fileName}') + ws.clearData(True) + qu = GroupQuery(Envelope()) + qu.setUids(created) + ws.load(qu, uids) + exported_files = [] + for feature_uid in uids: + feature = ws.featureByUid(feature_uid) + if not feature: + continue + for attr in feature.attributes('c1000'): + updated_files.append(variantToFileValue(attr.val())) + # updated_files.append(feature_uid) + for x in updated_files: + exported_files.append({ + 'key': x.key, + 'bucket': x.bucket, + 'filename': x.fileName, + }) + fp = os.path.join(z.dirname, x.key) + os.makedirs(os.path.dirname(fp), exist_ok=True) + download_file(x.key, x.bucket, fp) + with open(os.path.join(z.dirname, 'export_files.json'), 'w') as f: + f.write(json.dumps(exported_files)) + ws.clearData(True) + # ws.close() + filepath = z.pack() + response_files = [{'name': os.path.basename(filepath), 'url': filepath, 'size': os.path.getsize(filepath)}] + logger.warning(response_files) + logging.debug('Send replication package') + proxy = ServerProxy(Config.enserver) + try: + proxy.send(response_params, response_files, Config.ret_path) + except: + logger.error('Error sending') + + +def pika_callback(ch, method, properties, body): + commit_info = json.loads(body) + logging.warning(commit_info) + schema = commit_info.get('schema') or Config.oodb_schema + commit = commit_info['commit'] + conn = db.connect_db() + with Session(conn) as session: + for user in session.query(db.User).filter(db.User.active == True).all(): + if user.bndname == Config.self_bnd: + continue + profiles = {x.scheme: x.to_dict() for x in user.profiles} + if len(profiles) == 0 or schema in profiles: + item = db.Queue(user_id=user.id, commit_id=commit, schema=schema) + session.add(item) + session.commit() + ch.basic_ack(delivery_tag=method.delivery_tag) + + +def pika_task(): + global connection + global channel + connection = pika.BlockingConnection(pika.URLParameters(Config.rabbit_conn)) + channel = connection.channel() + channel.basic_consume(queue=Config.rabbit_queue, on_message_callback=pika_callback) + + channel.start_consuming() + + +def put_object(params, files, url): + date = datetime.datetime.now() + req = ET.fromstring(params['query_data']) + obj = req.find('chart') + class_id = obj.get('Class') + con = OOConnectionParams(Config.oodb_schema, Config.oodb_host, Config.oodb_port, Config.oodb_dbname, + Config.oodb_username, Config.oodb_passwd, Config.oodb_schema) + ws = OODBWorkspace.ws(Config.oodb_schema) + if not ws.isInit(): + res = ws.init(con) + logger.warning(res) + logging.info(class_id) + fc = ws.featureClass(class_id) + feature = fc.createFeature('') + geom = Polygon.fromExtent(Envelope(0.0, 0.0, 1.0, 1.0, SRFactory.PZ9011())) + res = feature.setGeometry(geom) + for attr in obj.findall('Attribute'): + name = attr.get('name') + value = attr.get('value') + res &= feature.addAttribute(name, variantFromString(value)) + + assert len(files) == 1 + file = files[0] + dir = TemporaryDirectory() + with zipfile.ZipFile(file, 'r') as zip_ref: + zip_ref.extractall(dir.name) + + fp = pathlib.Path(dir.name) + for item in fp.glob('**/*'): + if not item.is_file(): + continue + fileVal = FileValue() + key = uuid4().hex + fileVal.fileName = variantToString(item.relative_to(dir.name)) + fileVal.key = variantToString(key) + fileVal.bucket = variantToString(Config.s3_bucket) + res &= feature.addAttribute('c1000', variantFromFileValue(fileVal)) + upload_file(str(item), key, Config.s3_bucket) + + ws.transaction() + res = ws.save() + ws.commit(f'Putobject from {params["to"]}') + ws.clearData(True) + + +def apply_commits(params, files, url): + logging.warning("Apply commits") + logging.warning(params) + assert len(files) == 1 + file = files[0] + dir = TemporaryDirectory() + r = requests.get(file['url']) + with zipfile.ZipFile(io.BytesIO(r.content)) as zip_ref: + zip_ref.extractall(dir.name) + req = ET.fromstring(params['query_data']) + repl = req.find('replication') + scheme = repl.get('scheme') + commit = repl.get('id') + bnd = params['from'].replace('tcp://', '') + branch, new_scheme = get_branch(bnd, scheme) + if new_scheme: + scheme = new_scheme + con = OOConnectionParams(scheme, Config.oodb_host, Config.oodb_port, Config.oodb_dbname, + Config.oodb_username, Config.oodb_passwd, scheme) + ws = OODBWorkspace.ws(scheme) + logging.warning("Connected to schema") + if Config.ws_rabbit_params and not ws.hasProducer(): + ws.setProducer(Config.ws_rabbit_params['host'], Config.ws_rabbit_params['port'], + Config.ws_rabbit_params['exchange'], Config.ws_rabbit_params['user'], + Config.ws_rabbit_params['password']) + ws.setCurrentUser(bnd) + logging.warning("Set branch if needed") + if branch: + logging.warning(branch) + ws.switchBranch(branch) + if not ws.isInit(): + res = ws.init(con) + logger.warning(res) + oe = IpdExporter(ws) + logging.warning("Importing...") + if not oe.importChanges(os.path.join(dir.name, 'export.o5c'), os.path.join(dir.name, 'export.mbtiles')): + logging.warning(f'Error importing commit {commit}: {oe.lastError().text()}') + else: + logging.warning(f'Importing commit {commit} finished successfully') + with open(os.path.join(dir.name, 'export_files.json'), 'r') as f: + files_data = json.load(f) + for file_data in files_data: + upload_file(os.path.join(dir.name, file_data['key']), file_data['key'], file_data['bucket']) + logging.warning("Finished import") + ws.clearData(True) + + +def get_commit(params, files, url): + date = datetime.datetime.now() + rxmls = RequestXmlService() + req = ET.fromstring(params['query_data']) + req_id = rxmls.get_request_uuid(req) + res_id = uuid4().hex + res_doc = rxmls.get_request_document(res_id, req_id) + schema = req.find('currentCommit').get('scheme') + con = OOConnectionParams(schema, Config.oodb_host, Config.oodb_port, Config.oodb_dbname, + Config.oodb_username, Config.oodb_passwd, schema) + ws = OODBWorkspace.ws(schema) + if not ws.isInit(): + res = ws.init(con) + logger.warning(res) + ET.SubElement(res_doc, 'commit', {'id': ws.currentCommit(), 'schema': schema}) + + response_params = { + 'from': params['to'], + 'to': params['from'], + 'ts_added': date.timestamp(), + 'user_id': '1', + 'user_id_to': 0, + 'query_type': NEW_COMMIT_RESPONSE, + 'query_data': ET.tostring(res_doc, encoding='unicode', xml_declaration=True) + } + proxy = ServerProxy(url) + proxy.send(response_params, [], Config.ret_path) + + +def query_commits(params, files, url): + req = ET.fromstring(params['query_data']) + commit_el = req.find('commit') + commit_id = commit_el.get('id') + schema = commit_el.get('schema') + bnd = params['from'].replace('tcp://', '') + + con = OOConnectionParams(schema, Config.oodb_host, Config.oodb_port, Config.oodb_dbname, + Config.oodb_username, Config.oodb_passwd, schema) + ws = OODBWorkspace.ws(schema) + if not ws.isInit(): + res = ws.init(con) + logger.warning(res) + schema_commits = ws.commits(ws.branch()) + logger.warning(schema_commits) + if commit_id not in schema_commits: + logger.warning(f'Error in commits in schema {schema}: no commit {commit_id}') + return + logger.warning(schema_commits[schema_commits.index(commit_id) + 1:]) + + conn = db.connect_db() + with Session(conn) as session: + for commit in schema_commits[schema_commits.index(commit_id) + 1:]: + for user in session.query(db.User).filter(db.User.bndname == bnd, db.User.active == True).all(): + if user.bndname == Config.self_bnd: + continue + profiles = {x.scheme: x.to_dict() for x in user.profiles} + if len(profiles) == 0 or schema in profiles: + item = db.Queue(user_id=user.id, commit_id=commit, schema=schema) + logging.warning(item) + session.add(item) + session.commit() + + +def run_task(query_type, params, files, url): + if query_type == NEW_REPLICATION_REQUEST: + tasks.put(lambda: apply_commits(params, files, url)) + if query_type == NEW_COMMIT_REQUEST: + tasks.put(lambda: get_commit(params, files, url)) + if query_type == NEW_COMMIT_RESPONSE: + tasks.put(lambda: query_commits(params, files, url)) + + +def main(): + global connection + global server + global channel + + logger.setLevel(logging.INFO) + logger.warning('Use Control-C to exit') + + thread = threading.Thread(target=run_tasks) + thread.start() + + replication_thread = threading.Thread(target=replication_task) + replication_thread.start() + + pika_thread = threading.Thread(target=pika_task) + pika_thread.start() + + try: + logger.warning('Start server') + uvicorn.run(app, host="0.0.0.0", port=8000) + except KeyboardInterrupt: + logger.warning('Exiting') + finally: + server.server_close() + + +def vers_key(e): + return e.version() + + +if __name__ == '__main__': + main() diff --git a/replication_itv/config.py b/replication_itv/config.py new file mode 100644 index 0000000..42ecb72 --- /dev/null +++ b/replication_itv/config.py @@ -0,0 +1,39 @@ + +class Config: + ret_path: str = 'http://10.10.8.83:32200/' + self_bnd: str = 'bnd127' + enserver: str = 'http://10.10.8.83:32210/xmlrpc' + remote_bnd: str = 'bnd128' + + pg_host: str = '10.10.8.83' + pg_port: int = 32101 + pg_dbname: str = 'db' + pg_username: str = 'postgres' + pg_password: str = 'Root12345678' + + oodb_host: str = '10.10.8.83' + oodb_port: int = 32100 + oodb_dbname: str = 'db' + oodb_username: str = 'postgres' + oodb_passwd: str = 'Root12345678' + oodb_schema: str = 'documents_src' + + rabbit_conn: str = 'amqp://user:password@10.10.8.83:31005/%2f' + rabbit_queue: str = 'ipd' + + ws_rabbit_params: dict = { + 'host': '10.10.8.83', + 'port': 31005, + 'exchange': 'ipd', + 'user': 'user', + 'password': 'password', + } + + s3_endpoint: str = 'http://10.10.8.83:31006' + s3_key_id: str = 's57' + s3_access_key: str = 'd9MMinLF3U8TLSj' + s3_bucket: str = 'files' + + gql_url: str = 'https://gql.ivazh.ru/graphql' + gql_download: str = 'https://gql.ivazh.ru/item/{key}' + gql_schema: str = 'pdim' diff --git a/replication_itv/db.py b/replication_itv/db.py new file mode 100644 index 0000000..bc6833a --- /dev/null +++ b/replication_itv/db.py @@ -0,0 +1,166 @@ +from datetime import datetime +from typing import List, Optional +from sqlalchemy import create_engine, String, select, ForeignKey, Enum +from sqlalchemy.orm import Session, DeclarativeBase, Mapped, mapped_column, relationship +from config import Config + + +def tow(day: int, hour: int, minute: int): + return minute + hour * 60 + day * 60 * 24 + + +class Base(DeclarativeBase): + pass + + +class User(Base): + __tablename__ = 'users' + + id: Mapped[int] = mapped_column(primary_key=True) + username: Mapped[str] + passwd: Mapped[str] + bndname: Mapped[str] + newbnd: Mapped[bool] + active: Mapped[bool] + upstream: Mapped[bool] + + profiles: Mapped[List['Profile']] = relationship( + back_populates='user', cascade='all, delete-orphan' + ) + + income_branches: Mapped[List['IncomeBranch']] = relationship( + back_populates='user', cascade='all, delete-orphan' + ) + + schedule: Mapped[List['Schedule']] = relationship( + back_populates='user', cascade='all, delete-orphan' + ) + + queue: Mapped[List['Queue']] = relationship( + back_populates='user', cascade='all, delete-orphan' + ) + + def __repr__(self) -> str: + return f'User(id={self.id!r}, username={self.username!r}, password={self.passwd!r}, newbnd={self.newbnd})' + + def to_dict(self) -> dict: + return { + 'id': self.id, + 'username': self.username, + 'bndname': self.bndname, + 'newbnd': self.newbnd, + 'active': self.active, + 'upstream': self.upstream, + 'profiles': [x.to_dict() for x in self.profiles], + 'schedule': [x.to_dict() for x in self.schedule], + 'queue': [x.to_dict() for x in self.queue], + 'income_branches': [x.to_dict() for x in self.income_branches], + } + + def is_active_now(self): + if not len(self.schedule): + return True + dt = datetime.now() + curr_tow = tow(dt.weekday(), dt.hour, dt.minute) + for x in self.schedule: + if (tow(x.day_start, x.hour_start, x.minute_start) <= curr_tow + <= tow(x.day_end, x.hour_end, x.minute_end)): + return True + return False + + +class Profile(Base): + __tablename__ = 'profiles' + + id: Mapped[int] = mapped_column(primary_key=True) + user_id: Mapped[int] = mapped_column(ForeignKey('users.id')) + scheme: Mapped[str] + branch: Mapped[str] = mapped_column(String, nullable=True) + json: Mapped[str] = mapped_column(String, nullable=True) + no_files: Mapped[bool] + + user: Mapped['User'] = relationship(back_populates='profiles') + + def to_dict(self) -> dict: + return { + 'id': self.id, + 'scheme': self.scheme, + 'branch': self.branch, + 'json': self.json, + } + + +class IncomeBranch(Base): + __tablename__ = 'income_branches' + + id: Mapped[int] = mapped_column(primary_key=True) + user_id: Mapped[int] = mapped_column(ForeignKey('users.id')) + scheme: Mapped[str] + branch: Mapped[str] + local_scheme: Mapped[str] + + user: Mapped['User'] = relationship(back_populates='income_branches') + + def to_dict(self) -> dict: + return { + 'id': self.id, + 'scheme': self.scheme, + 'branch': self.branch, + 'local_scheme': self.local_scheme, + } + + +class Schedule(Base): + __tablename__ = 'schedule' + + id: Mapped[int] = mapped_column(primary_key=True) + user_id: Mapped[int] = mapped_column(ForeignKey('users.id')) + day_start: Mapped[int] + hour_start: Mapped[int] + minute_start: Mapped[int] + day_end: Mapped[int] + hour_end: Mapped[int] + minute_end: Mapped[int] + + user: Mapped['User'] = relationship(back_populates='schedule') + + def to_dict(self) -> dict: + return { + 'id': self.id, + 'day_start': self.day_start, + 'hour_start': self.hour_start, + 'minute_start': self.minute_start, + 'day_end': self.day_end, + 'hour_end': self.hour_end, + 'minute_end': self.minute_end, + } + + +class Queue(Base): + __tablename__ = 'queue' + + id: Mapped[int] = mapped_column(primary_key=True) + user_id: Mapped[int] = mapped_column(ForeignKey('users.id')) + commit_id: Mapped[str] + schema: Mapped[str] + + user: Mapped['User'] = relationship(back_populates='queue') + + def to_dict(self) -> dict: + return { + 'id': self.id, + 'commit_id': self.commit_id, + 'schema': self.schema, + } + + +class Schemas(Base): + __tablename__ = 'schemas' + + id: Mapped[int] = mapped_column(primary_key=True) + schema: Mapped[str] + schema_type: Mapped[str] + + +def connect_db(): + return create_engine(f"postgresql+psycopg://{Config.pg_username}:{Config.pg_password}@{Config.pg_host}:{Config.pg_port}/{Config.pg_dbname}") diff --git a/replication_itv/reqs/__init__.py b/replication_itv/reqs/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/replication_itv/reqs/classifier.py b/replication_itv/reqs/classifier.py new file mode 100644 index 0000000..371e1a0 --- /dev/null +++ b/replication_itv/reqs/classifier.py @@ -0,0 +1,7 @@ +import graphql + +CLASSIFIER_VERSION = '8.23' + +class Classifier: + def get_classifier(self): + diff --git a/replication_itv/reqs/correcting_replication_service.py b/replication_itv/reqs/correcting_replication_service.py new file mode 100644 index 0000000..40cd5fb --- /dev/null +++ b/replication_itv/reqs/correcting_replication_service.py @@ -0,0 +1,71 @@ +from datetime import datetime, timedelta +from enumerations import ReplicationPackageStatusEnum +from models import CorrectingReplicationOutPackage, ReplicantInfo +from typing import List +import xml.etree.ElementTree as ET + + +class CorrectingReplicationService: + EVENT_TYPE: str = 'correcting_replication' + object_list_size: int = 0 + deleted_objects_count: int = 0 + PACKAGE_TAG_NAME: str = 'package' + PACKAGE_ID_ATTRIBUTE_NAME: str = 'pkg_id' + SERVER_VERSION_ATTRIBUTE_NAME: str = 'server_version' + SERVER_ID_ATTRIBUTE_NAME: str = 'server_id' + OBJECTS_TAG_NAME: str = 'objects' + OBJECT_TAG_NAME: str = 'object' + OBJECT_ID_ATTRIBUTE_NAME: str = 'id' + IS_RSC_OBJECT: str = 'is_rsc' + OBJECT_SOURCE_ATTRIBUTE_NAME: str = 'source' + OBJECT_VERSION_ATTRIBUTE_NAME: str = 'version' + METADATA_VERSION_ATTRIBUTE_NAME: str = 'md_version' + ACTUAL_VERSION_DATE_ATTRIBUTE_NAME: str = 'version_date' + OBJECT_MISSING_ATTRIBUTE_NAME: str = 'missing' + CLASSIFIER_VERSION_ATTRIBUTE_NAME: str = 'metadata_clientVersion' + RESULT_TAG_NAME: str = 'result' + RESULT_MESSAGE_ATTRIBUTE_NAME: str = 'result_message' + VERSION_ID_ATTRIBUTE_NAME: str = 'version_id' + VERSION_SOURCE_ATTRIBUTE_NAME: str = 'version_source' + + def start_correcting_replication(self, replicant_id: str, replication_period: int, replication_timeout: int, + manual_run: bool): + replicant = ReplicantInfo() + self.start_correcting_replication(replicant, replication_period, replication_timeout, manual_run) + + def start_correcting_replication(self, except_replicant_id: List[str], replication_period: int, + replication_timeout: int, manual_run: bool): + pass + + def start_correcting_replication(self, replicant: ReplicantInfo, replication_period: int, replication_timeout: int, + manual_run: bool): + pass + + def create_replication_package(self, replicant: ReplicantInfo, replication_period: int, replication_timeout: int) -> CorrectingReplicationOutPackage: + pkg = CorrectingReplicationOutPackage() + pkg.filter_string = replicant.filter_string + pkg.replicant_id = replicant.replicant_id + pkg.server_address = replicant.server_address + pkg.replication_period = replication_period + pkg.replication_timeout = replication_timeout + pkg.status = ReplicationPackageStatusEnum.SENT_REQUEST_TO_CHILD + date = datetime.now() + pkg.replication_date = date + if replication_period: + delta = timedelta(days=-replication_period) + pkg.from_date = date + delta + if replication_timeout: + delta = timedelta(seconds=replication_timeout) + pkg.replication_timeout_date = date + delta + return pkg + + def get_package_xml(self, pkg: CorrectingReplicationOutPackage, request_uuid: str) -> str: + request_xml: ET.Element = xml_service.get_common_request_xml(request_uuid) + element = ET.SubElement(request_xml, self.PACKAGE_TAG_NAME, { + self.PACKAGE_ID_ATTRIBUTE_NAME: pkg.package_id, + self.SERVER_VERSION_ATTRIBUTE_NAME: '1.0.0', + self.SERVER_ID_ATTRIBUTE_NAME: 'ipd-server', + self.CLASSIFIER_VERSION_ATTRIBUTE_NAME: classifier., + }) + xml_service.set_request_uuid(request_xml, request_uuid) + return ET.tostring(request_xml, encoding='unicode', xml_declaration=True) diff --git a/replication_itv/reqs/enumerations.py b/replication_itv/reqs/enumerations.py new file mode 100644 index 0000000..b55aa0d --- /dev/null +++ b/replication_itv/reqs/enumerations.py @@ -0,0 +1,10 @@ +from enum import Enum + + +class ReplicationPackageStatusEnum(Enum): + CANCELLED = -4 + TIMEDOUT = -3 + ERROR = -2 + SENT_REQUEST_TO_CHILD = 1 + SENT_PACKAGE_TO_CHILD = 2 + COMPLETED = 3 diff --git a/replication_itv/reqs/messages.py b/replication_itv/reqs/messages.py new file mode 100644 index 0000000..75d2ad4 --- /dev/null +++ b/replication_itv/reqs/messages.py @@ -0,0 +1,233 @@ +from enum import Enum + + +class Messages(Enum): + REQUEST_EXECUTED_PARTITIALY = 1 + REQUEST_FAILED = 2 + OBJECT_FORBIDDEN = 3 + INVALID_REQUEST_USER = 4 + ACTION_FORBIDDEN = 5 + OBJECTS_FORBIDDEN = 6 + INVALID_REPLICATION_PARAMETERS = 7 + OBJECT_LIST = 11 + OBJECT_LIST_ITEM = 12 + MISSING_TAG = 13 + NO_OBJECTS_AVAILABLE = 14 + OBJECT_BASE_CELL_NOT_FOUND = 15 + WRONG_VERSION_INTERVAL = 16 + REQUEST_PROCESS_STARTED = 17 + REQUEST_PROCESS_FINISHED = 18 + USER_IS_NOT_VALID = 19 + RESPONSE_DELIVERY_ERROR = 20 + RESPONSE_DELIVERY_SUCCESS = 21 + RESPONSE_SEND = 22 + RESPONSE_SEND_ERROR = 23 + DUPLICATE_CHART_NAME = 24 + EXTENDED_METADATA_FORBIDDEN = 25 + MAX_RESULT_COUNT_EXCEEDED = 26 + MAX_RESULT_SIZE_EXCEEDED = 27 + CLIENT_DISCONNECTED = 28 + NO_REQUEST_DATA_PROVIDED = 29 + NO_USER_ID_PROVIDER = 30 + INVALID_REQUEST_DATA = 31 + UNKNOWN_FILTER_ATTRIBUTE_CODE = 32 + UNKNOWN_FILTER_OPERATION = 33 + ARCHIVE_ACCESS_FORBIDDEN = 34 + BROKEN_S57_FILE = 35 + UNKNOWN_OBJECT_ID = 36 + NO_FILTER_IN_REPORT = 37 + UPLOAD_OBJECT_LIST = 100 + UPLOAD_OBJECT_LIST_ITEM = 101 + UPLOAD_WRONG_STRUCTURE = 102 + FOLDER_NOT_FOUND = 103 + UPLOAD_WRONG_SUBVERSION = 104 + UPLOAD_MISSING_CHARTS_FILE = 105 + UPLOAD_NO_CHARTS_IN_FILE = 106 + UPLOAD_WRONG_CHART_TYPE = 107 + UPLOAD_OBJECT_EXISTS = 108 + UPLOAD_BASE_CELL_NOT_FOUND = 109 + UPLOAD_WRONG_VERSION_SEQUENCE = 110 + WRONG_DELETE_VERSION_NUMBER = 111 + CANT_CHANGE_STAMP = 112 + OBJECT_ALREADY_UPDATED = 113 + OBJECT_LOCKED = 114 + UPLOAD_MAIN_FILE_NOT_FOUND = 115 + UPLOAD_VERSION_NUMBER_NOT_THE_SAME = 116 + FROM_ADDRESS = 117 + TO_ADDRESS = 118 + UPLOAD_OBJECT_EXISTS_BY_SIZE_AND_CHECKSUM = 119 + CORRECTING_REPLICATION_TIMED_OUT = 120 + CORRECTING_REPLICATION_AUTO_STARTED = 121 + CORRECTING_REPLICATION_MANUAL_STARTED = 122 + CORRECTING_REPLICATION_ALREADY_STARTED = 123 + CORRECTING_REPLICATION_CANCELLED = 124 + REPLICATION_SENT = 125 + REPLICATION_RECEIVED = 126 + WRONG_OBJECT_FILE_CHECKSUM = 127 + WRONG_UPLOADING_FILE_CHECKSUM = 128 + WRONG_UPLOADING_FILE_SIZE = 129 + CORRECTING_REPLICATION_INFO = 130 + CORRECTING_REPLICATION_CLASSIFIER = 131 + CORRECTING_REPLICATION_LIST_OBJECT = 132 + GET_CORRECTING_REPLICATION_PACKAGE = 133 + CORRECTING_REPLICATION_SIZE_LIMIT = 134 + PACKAGE_INFO = 135 + CORRECTING_REPLICATION_SIZE_ARCHIVE = 136 + SET_CORRECTING_REPLICATION_PACKAGE = 137 + CHECK_DUPLICATE = 138 + CHECK_TAG = 139 + DELETED_DUPLICATE = 140 + STREAMING_REPLICATION_STARTED = 141 + ERROR_PACKAGE_NUM = 142 + STREAMING_REPLICATION_FINISHED = 143 + DELETE_VERSION_STARTED = 144 + DELETE_VERSION_FINISHED = 145 + UPLOAD_OBJECTS_EXISTS_DIFF_CHECKSUM = 146 + UPLOAD_OBJECTS_WITH_LESS_VERSION = 147 + CORRECTING_REPLICATION_OBJECT_COUNTS = 148 + PACKAGE_INFO_FAILED_OBJECTS = 149 + PACKAGE_INFO_INCOMING_OBJECTS = 150 + OBJECT_INFO_DIDNT_ADD = 151 + TAG_NOT_FOUND = 200 + MISSING_ATTRIBUTE = 201 + DUPLICATE_TAG_ID = 202 + CANT_HIDE_TAG = 203 + CANT_MOVE_TAG = 204 + CANT_HIDE_REGION = 205 + CANT_HIDE_REGIONS = 206 + CANT_HIDE_CHILD_REGION = 207 + CANT_HIDE_CHILD_REGIONS = 208 + WRONG_MID = 300 + MAUNFACTURER_KEY_NOT_FOUND = 301 + OBJECT_NOT_FOUND = 302 + OBJECT_REQUESTED_FROM_PARENT_SERVER = 303 + OBJECT_HAS_NO_FILES = 304 + WRONG_EXPIRATION_PERIOD = 400 + OBJECT_LIST_HAS_DUBLICATES = 401 + ACTIVE_SUBSCRIPTION_ALREADY_EXISTS = 402 + ACTIVE_SUBSCRIPTION_NOT_EXISTS = 403 + WRONG_TYPE_ALL_CHARTS = 500 + NO_CLASSIFIER_FILE_ON_SERVER = 600 + NO_CLASSIFIER_VERSION_ON_SERVER = 601 + CLASSIFIERS_VERSIONS_DIFFER = 602 + WRONG_OBJECT_PARAMETER_CODE = 603 + ILLEGAL_PARAMETER_VALUE = 604 + ILLEGAL_PARAMETER_TYPE = 605 + ILLEGAL_PARAMETER_RANGE = 606 + NOT_ALL_REQUIRED_TYPES = 607 + ILLEGAL_ATTRIBUTE_VALUE = 608 + NOT_ALLOWED_ITEM_FOR_TYPE = 609 + NO_CHART_CLASS_WITH_CODE = 610 + CHART_CLASS_CONTAINS_NONEXISTED_ELEMENT = 611 + CLASS_GROUP_CONTAINS_NONEXISTED_CLASS = 612 + SAME_CLASSIFIER_VERSION = 614 + NO_CLASSIFIER_VERSION = 615 + UNKNOWN_ATTR_VAL_DATA_TYPE = 616 + NOT_ALL_USING_ITEMS_IN_CLASSIFIER = 617 + NO_REQIRED_CLASS_CODE_IN_CLASSIFIER = 618 + REQUIRED_ADDED_TO_CHART_CLASS = 619 + SET_CLASSIFIER_STARTED_AUDIT = 620 + DATA_VALIDATION_STARTED_AUDIT = 621 + DATA_VALIDATION_FAILED_AUDIT = 622 + DATA_VALIDATION_ENDED_AUDIT = 623 + SAVING_CLASSIFIER_STARTED_AUDIT = 624 + SAVING_CLASSIFIER_FINISHED_AUDIT = 625 + SEND_SET_CLASSIFIER_REQUEST_AUDIT = 626 + GET_SET_CLASSIFIER_REQUEST_AUDIT = 627 + GET_GET_CLASSIFIER_REQUEST_AUDIT = 628 + IKP_USER_DECISION_TYPE = 700 + IKP_FILE_STRUCTURE_CHECKER_TYPE = 701 + IKP_FILE_STRUCTURE_DB_LINKS_CHECKER_TYPE = 702 + IKP_DB_LINKS_FILE_STRUCTURE_CHECKER_TYPE = 703 + IKP_CHECK_SUM_CHECKER_TYPE = 704 + IKP_BACKUP_TO_DB_TYPE = 705 + IKP_RECOVER_FROM_DB_TYPE = 706 + IKP_RECOVER_FROM_BACKUP_SERVER_TYPE = 707 + IKP_RECOVER_FROM_CHILD_SERVER_TYPE = 708 + IKP_DB_STRUCTURE_CHECKER_TYPE = 709 + IKP_DB_STRUCTURE_MISSING_COLUMN = 710 + IKP_DB_STRUCTURE_WRONG_COLUMN_TYPE = 711 + IKP_DB_STRUCTURE_WRONT_MAX_LENGTH = 712 + IKP_DB_STRUCTURE_MISSING_TABLE = 713 + IKP_NOTHING_TO_RECOVER = 714 + IKP_ERROR_NOT_FIXED = 715 + IKP_FULL_BACKUP_TO_DB_TYPE = 716 + EXPORT_TO_FOLDER_START = 717 + EXPORT_TO_FOLDER_FINISHED = 718 + EXPORT_TO_FOLDER_ERROR = 719 + EXPORT_TO_FOLDER_DELETE_FILE = 720 + IMPORT_FROM_FOLDER_START = 721 + IMPORT_FROM_FOLDER_FINISHED = 722 + IMPORT_FROM_FOLDER_ERROR = 723 + IMPORT_FOLDER_NOT_FOUND = 724 + FILE_NOT_FOUND = 725 + IMPORT_FROM_FOLDER_MERGE_ERROR = 726 + FILE_NOT_FOUND_COUNTER = 727 + LAST_CORRECTION_DATE_NEWER_THAN_NEW_CORRECTION = 728 + CREATION_DATE_MISSING = 729 + IMPORT_FROM_FOLDER_DIDNT_START = 730 + EXPORT_TO_FOLDER_DIDNT_START = 731 + IKP_STEP_START = 800 + IKP_STEP_SUCCESS = 801 + IKP_STEP_ERROR = 802 + IKP_NO_CHECK_ERROR_FOUND = 803 + IKP_SEND_USER_DECISION = 804 + IKP_GET_USER_DECISION = 805 + IKP_FILE_STRUCTURE_ERROR = 806 + IKP_FILE_STRUCTURE_DB_LINKS_ERROR = 807 + IKP_CHECK_SUM_ERROR = 808 + IKP_DB_LINKS_FILE_STRUCTURE_ERROR = 809 + IKP_DB_STRUCTURE_TABLE_COLUMN_ERROR = 810 + IKP_DB_STRUCTURE_TABLE_ERROR = 811 + IKP_STEP_STOPPED = 812 + IKP_WRONG_FILE_ERROR = 813 + IKP_TASK_RUN_COLLISION = 814 + IKP_FIX_SXF_METADATA_CHECKSUM_INFO = 815 + IKP_FIX_SXF_METADATA_CHECKSUM_PASSPORT_ERROR = 816 + IKP_FIX_SXF_METADATA_CHECKSUM_FILE_ERROR = 817 + IKP_MD_RECOVERING_STARTED = 818 + IKP_MD_RECOVERING_NO_VERSION = 819 + IKP_MD_RECOVERING_CREATED = 820 + IKP_MD_RECOVERING_UPDATED = 821 + IKP_MD_RECOVERING_SUCCEED = 822 + IKP_MD_RECOVERING_FAILED = 823 + IKP_CHECK_AVAILABLE_DISK_SPACE_TYPE = 825 + IKP_SCRIPT_RUN = 827 + IKP_SCRIPT_END_WORK_SUCCEED = 828 + IKP_SCRIPT_END_WORK_FAILED = 829 + IKP_SCRIPT_ALREADY_STARTED = 831 + IKP_MERGE_SAME_OBJECTS_INFO = 832 + IKP_MERGE_SAME_OBJECTS_SUCCESSES = 833 + IKP_MERGE_SAME_OBJECTS_WARNING = 834 + IKP_MERGE_SAME_OBJECTS_ERROR = 835 + IKP_MERGE_SAME_OBJECTS_MERGE_TYPE_VERSIONS = 836 + IKP_MERGE_SAME_OBJECTS_MERGE_TYPE_DATES = 837 + RSC_CLASSIFIER_UPLOAD_STARTED = 838 + RSC_CLASSIFIER_UPLOAD_ERROR = 839 + RSC_CLASSIFIER_UPLOAD_FINISHED = 843 + IKP_CLEAR_OUT_FOLDER_INFO = 840 + IKP_CLEAR_OUT_FOLDER_SUCCESS_DELETE = 841 + IKP_CLEAR_OUT_FOLDER_ERROR_DELETE = 842 + SET_CHART_METADATA = 844 + ERROR_WHILE_COPING_FILE = 845 + ERROR_ADD_OBJECT = 848 + MISSING_REGIONS_CHARTS = 849 + NO_MISSING_REGIONS = 850 + IKP_CLEAR_DOWNLOAD_CATALOG_WARNING = 855 + IKP_CLEAR_DOWNLOAD_CATALOG_ERROR = 856 + IKP_CLEAR_DOWNLOAD_CATALOG_INFO = 857 + REPLICATION_PACKAGE_NOT_FOUND = 858 + REPLICATION_PACKAGE_WRONG_STATUS = 859 + NOT_UNIQUE_METADATA = 860 + IKP_SCALE_RECOVERING_STARTED = 861 + IKP_SCALE_RECOVERING_FAILED = 862 + IKP_SCALE_RECOVERING_FINISHED = 863 + OBJECTS_REGIONS_CHECKING_STARTED = 865 + OBJECTS_REGIONS_CHECKING_FINISHED = 866 + OBJECTS_REGIONS_CHECKING_FAILED = 867 + BLACK_HOLE_TRANSMITTER_IS_OFF = 868 + LACK_VERSIONS_IN_TIME_PERIOD = 869 + ERROR_STOPPING_IKP = 870 + IKP_STOPPED = 871 + REPLICATION_ERROR_TEST = 872 + BLACK_HOLE_NOT_PRESENT = 873 diff --git a/replication_itv/reqs/models.py b/replication_itv/reqs/models.py new file mode 100644 index 0000000..e7f21c9 --- /dev/null +++ b/replication_itv/reqs/models.py @@ -0,0 +1,23 @@ +from datetime import datetime + + +class ReplicantInfo: + server_address: str + filter_string: str + replicant_id: str + replicant_user_id: str + replicate_classifiers: bool + + +class CorrectingReplicationOutPackage: + package_id: int + status: int + from_date: datetime + replication_date: datetime + replication_timeout_date: datetime + end_date: datetime + replicant_id: str + server_address: str + filter_string: str + replication_period: int + replication_timeout: int diff --git a/replication_itv/reqs/request_processor.py b/replication_itv/reqs/request_processor.py new file mode 100644 index 0000000..073bec8 --- /dev/null +++ b/replication_itv/reqs/request_processor.py @@ -0,0 +1,4 @@ + +class RequestProcessor: + def __init__(self): + pass diff --git a/replication_itv/reqs/request_xml_service.py b/replication_itv/reqs/request_xml_service.py new file mode 100644 index 0000000..4d84447 --- /dev/null +++ b/replication_itv/reqs/request_xml_service.py @@ -0,0 +1,109 @@ +import xml.etree.ElementTree as ET +from replication_itv.reqs.messages import Messages + + +class RequestXmlService: + REQUEST_NODE_NAME: str = 'request' + HEADER_NODE_NAME: str = 'header' + RESULT_NODE_NAME: str = 'result' + REQUEST_ID_ATTRIBUTE_NAME: str = 'parcel_id' + RESPONSE_ID_ATTRIBUTE_NAME: str = 'reply_parcel_id' + SOURCE_REQUEST_ID_ATTRIBUTE_NAME: str = 'source_parcel_id' + RESULT_CODE_ATTRIBUTE_NAME: str = 'result_code' + RESULT_MESSAGE_ATTRIBUTE_NAME: str = 'result_message' + HEADER_XPATH: str = HEADER_NODE_NAME + + def get_request_uuid(self, document: ET.Element) -> str: + return self.get_header_attribute(document, self.REQUEST_ID_ATTRIBUTE_NAME) + + def get_response_uuid(self, document: ET.Element) -> str: + return self.get_header_attribute(document, self.RESPONSE_ID_ATTRIBUTE_NAME) + + def get_source_request_uuid(self, document: ET.Element) -> str: + return self.get_header_attribute(document, self.SOURCE_REQUEST_ID_ATTRIBUTE_NAME) + + def get_header_node(self, document: ET.Element) -> ET.Element: + return document.find(self.HEADER_XPATH) + + def get_header_attribute(self, document: ET.Element, attribute_name: str) -> str: + header = self.get_header_node(document) + if header is None: + raise Exception(Messages.MISSING_TAG.value) + result = header.get(attribute_name) + return result.strip() if result else '' + + def get_request_document(self, request_uuid: str, response_uuid) -> ET.Element: + document = ET.Element(self.REQUEST_NODE_NAME) + header = ET.SubElement(document, self.HEADER_NODE_NAME) + header.set(self.REQUEST_ID_ATTRIBUTE_NAME, request_uuid.strip()) + if response_uuid: + header.set(self.RESPONSE_ID_ATTRIBUTE_NAME, response_uuid.strip()) + return document + + def get_common_response_xml(self, request_uuid: str, response_uuid: str, result_code: int, result_message: str) -> ET.Element: + document = self.get_request_document(response_uuid, request_uuid) + + result = ET.SubElement(document, self.RESULT_NODE_NAME) + result.set(self.RESULT_CODE_ATTRIBUTE_NAME, str(result_code)) + result.set(self.RESULT_MESSAGE_ATTRIBUTE_NAME, result_message) + + return document + + def set_result(self, document: ET.Element, result_code: int, result_message: str): + result = document.find(self.RESULT_NODE_NAME) + if result is None: + result = ET.SubElement(document, self.RESULT_NODE_NAME) + result.set(self.RESULT_CODE_ATTRIBUTE_NAME, str(result_code)) + result.set(self.RESULT_MESSAGE_ATTRIBUTE_NAME, result_message) + + def set_request_uuid(self, document: ET.Element, request_uuid: str): + header = self.get_header_node(document) + header.set(self.REQUEST_ID_ATTRIBUTE_NAME, request_uuid.strip()) + + def set_source_request_uuid(self, document: ET.Element, source_request_uuid: str): + header = self.get_header_node(document) + header.set(self.SOURCE_REQUEST_ID_ATTRIBUTE_NAME, source_request_uuid.strip()) + + def get_common_request_xml(self, request_uuid: str) -> ET.Element: + return self.get_request_document(request_uuid, None) + + def get_response_xml(self, document: ET.Element, request_uuid, response_uuid: str, result_code: int, result_message: str) -> ET.Element: + header = document.find(self.HEADER_NODE_NAME) + header.set(self.REQUEST_ID_ATTRIBUTE_NAME, response_uuid.strip()) + header.set(self.RESPONSE_ID_ATTRIBUTE_NAME, request_uuid.strip() if request_uuid else None) + result = document.find(self.RESULT_NODE_NAME) + result.set(self.RESULT_CODE_ATTRIBUTE_NAME, str(result_code)) + result.set(self.RESULT_MESSAGE_ATTRIBUTE_NAME, result_message) + + def get_element(self, document: ET.Element, node: ET.Element, name: str) -> ET.Element: + for child in node: + if child.tag.casefold() == name.casefold(): + return child + return ET.SubElement(document, name) + + def validate_common_request_xml(self, xml: str): + if not xml: + raise Exception(Messages.NO_REQUEST_DATA_PROVIDED.name) + try: + document = ET.parse(xml) + except: + raise Exception(Messages.INVALID_REQUEST_DATA) + self.validate_common_request_xml(document) + + def validate_common_request_xml(self, document): + if not document: + raise Exception(Messages.NO_REQUEST_DATA_PROVIDED.name) + if document.tag != self.REQUEST_NODE_NAME: + raise Exception(Messages.MISSING_TAG.name, self.REQUEST_NODE_NAME) + try: + request_uuid = self.get_header_attribute(document, self.REQUEST_ID_ATTRIBUTE_NAME) + except: + raise Exception(Messages.INVALID_REQUEST_DATA) + if not request_uuid: + raise Exception(Messages.MISSING_ATTRIBUTE, self.REQUEST_ID_ATTRIBUTE_NAME, self.HEADER_NODE_NAME) + + def get_result_code(self, document: ET.Element): + result = self.get_element(document, document, self.RESULT_NODE_NAME) + if not result: + return None + return result.get(self.RESULT_CODE_ATTRIBUTE_NAME) diff --git a/replication_itv/reqs_graphql.py b/replication_itv/reqs_graphql.py new file mode 100644 index 0000000..65d8ee3 --- /dev/null +++ b/replication_itv/reqs_graphql.py @@ -0,0 +1,46 @@ +from gql import gql, Client +from gql.transport.aiohttp import AIOHTTPTransport +from .config import Config + +transport = AIOHTTPTransport(url=Config.gql_url) +service = Config.gql_schema + + +def get_classifier(): + client = Client(transport=transport, fetch_schema_from_transport=True, execute_timeout=None) + query = gql( + """ + query getClassifier($name: String!) { + getClassifier(name: $name) + } + """ + ) + result = client.execute(query, variable_values={"name": service}, ) + return result['getClassifier'] + + +def get_catalog(): + client = Client(transport=transport, fetch_schema_from_transport=True, execute_timeout=None) + query = gql( + """ + query getCatalog($name: String!) { + getCatalog(name: $name) + } + """ + ) + result = client.execute(query, variable_values={"name": service}) + return result['getCatalog'] + + +def get_object(oid: str): + client = Client(transport=transport, fetch_schema_from_transport=True, execute_timeout=None) + query = gql( + """ + query getObjects($oid: String!, $name: String!) { + getObject(name: $name, oid: $oid) + } + """ + ) + params = {'oid': oid, 'name': service} + result = client.execute(query, variable_values=params) + return result['getObject'] diff --git a/replication_itv/zip.py b/replication_itv/zip.py new file mode 100644 index 0000000..694a4ff --- /dev/null +++ b/replication_itv/zip.py @@ -0,0 +1,23 @@ +import os +from tempfile import TemporaryDirectory, mktemp +from zipfile import ZipFile + + +class Zip: + + def __init__(self): + self.dir = TemporaryDirectory() + + @property + def dirname(self): + return self.dir.name + + def pack(self): + tmp_zip = mktemp(suffix='.zip') + with ZipFile(tmp_zip, 'w') as zip_object: + for folder_name, sub_folders, file_names in os.walk(self.dir.name): + for filename in file_names: + file_path = os.path.join(folder_name, filename) + rel_file_path = os.path.relpath(file_path, self.dir.name) + zip_object.write(file_path, rel_file_path) + return tmp_zip diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..8ecf94b --- /dev/null +++ b/requirements.txt @@ -0,0 +1,40 @@ +--find-links=deps +aiohttp==3.8.4 +aiosignal==1.3.1 +annotated-types==0.5.0 +anyio==3.7.1 +async-timeout==4.0.3 +attrs==23.1.0 +backoff==2.2.1 +boto3==1.28.37 +botocore==1.31.40 +certifi==2023.7.22 +charset-normalizer==3.2.0 +click==8.1.7 +fastapi==0.103.1 +frozenlist==1.4.0 +gql==3.5.0b5 +graphql-core==3.3.0a3 +greenlet==2.0.2 +h11==0.14.0 +idna==3.4 +jmespath==1.0.1 +multidict==6.0.4 +pika==1.3.2 +pika-stubs==0.1.3 +psycopg==3.1.10 +pydantic==2.3.0 +pydantic_core==2.6.3 +pygost==5.12 +python-dateutil==2.8.2 +python-multipart==0.0.6 +requests==2.31.0 +s3transfer==0.6.2 +six==1.16.0 +sniffio==1.3.0 +SQLAlchemy==2.0.20 +starlette==0.27.0 +typing_extensions==4.7.1 +urllib3==1.26 +uvicorn==0.23.2 +yarl==1.9.2