Compare commits

...

30 Commits (all authored by Ivan Vazhenin)

SHA1        Message                                           Date
cdaccb911b  Correcting replication and no_files flag          2024-04-08 20:30:34 +03:00
              All checks were successful:
              Build xmlrpcserver image / Build (push): Successful in 56s
219b597042  Add build action                                  2024-02-25 16:21:31 +03:00
              All checks were successful:
              Build xmlrpcserver image / Build (push): Successful in 45s
201b2d2da6  Add build action                                  2024-02-25 16:14:37 +03:00
              All checks were successful:
              Build xmlrpcserver image / Build (push): Successful in 57s
              Gitea Actions Demo / Explore-Gitea-Actions (push): Successful in 7s
a742774ae1  Add build action                                  2024-02-25 16:13:26 +03:00
              Some checks failed:
              Build xmlrpcserver image / Build (push): Failing after 19s
              Gitea Actions Demo / Explore-Gitea-Actions (push): Successful in 6s
3230931e4e  Add build action                                  2024-02-25 16:02:40 +03:00
              Some checks failed:
              Gitea Actions Demo / Explore-Gitea-Actions (push): Successful in 8s
              Build xmlrpcserver image / Explore-Gitea-Actions (push): Failing after 1s
4078dbd84c  fix                                               2024-02-25 15:44:54 +03:00
              All checks were successful:
              Gitea Actions Demo / Explore-Gitea-Actions (push): Successful in 7s
bf01fbc69d  Add actions                                       2024-02-25 15:40:56 +03:00
84dd6e5537  Make service configs                              2024-02-25 13:36:35 +03:00
0910a2b780  Make service configs                              2024-02-18 18:40:23 +03:00
0a9c887b7e  Fix                                               2024-02-13 20:30:29 +03:00
03db4ece92  Fix                                               2024-02-13 16:02:26 +03:00
7cb5a93c90  Fix                                               2024-02-13 15:54:33 +03:00
2898bfd7a2  Add s3 files processing                           2024-02-12 20:08:58 +03:00
90f7965242  Update config                                     2024-02-12 18:15:03 +03:00
966200132f  Update config                                     2024-02-05 20:25:45 +03:00
b223733b72  Correcting replication                            2023-12-24 18:28:29 +03:00
794ad96bf3  Correcting replication                            2023-12-06 18:59:39 +03:00
81aac15ec6  Add ws rabbit config                              2023-12-04 20:01:46 +03:00
436703641b  Add branch using                                  2023-11-30 16:29:32 +03:00
3841f50628  Add new scheme application                        2023-11-27 20:37:53 +03:00
92dd8162c5  Add logging to replication                        2023-11-27 20:04:24 +03:00
004eeb4b2b  Add logging to replication                        2023-11-22 20:32:08 +03:00
5363cbc10f  Use profiles for replication                      2023-11-15 13:18:20 +07:00
4b435e44ba  Use profiles for replication                      2023-11-15 13:11:47 +07:00
a25a54eb92  Create dockerfile, changed apply_commits method   2023-11-15 10:58:45 +07:00
5e9f427072  Update config schema for replication              2023-11-13 21:49:41 +07:00
df96273efb  Add new fields to config and cleardata            2023-11-10 22:08:31 +07:00
45b04a635b  Fix downloading file from s3                      2023-11-10 09:50:51 +07:00
3fd2ec4762  Fix downloading file from enserver                2023-11-09 19:53:10 +07:00
36e23465a8  Fix bndname in replication                        2023-11-09 10:41:43 +07:00
12 changed files with 645 additions and 57 deletions


@@ -0,0 +1,23 @@
name: Build xmlrpcserver image
run-name: Build xmlrpcserver image
on: [push]
jobs:
  Build:
    runs-on: ubuntu-latest
    steps:
      - uses: https://github.com/actions/checkout@v4
      - name: Set up Docker Buildx
        uses: https://github.com/docker/setup-buildx-action@v3
        with:
          config-inline: |
            [registry."10.10.8.83:32000"]
              http = true
              insecure = true
      - name: Build and push Docker image
        uses: https://github.com/docker/build-push-action@v5
        with:
          context: .
          file: ./Dockerfile
          push: true
          tags: "10.10.8.83:32000/xmlrpc:latest"

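The Buildx step above marks 10.10.8.83:32000 as a plain-HTTP, insecure registry, so pushes from the runner work without TLS. A quick way to confirm a push landed is the Docker Registry v2 HTTP API; a minimal sketch in Python, assuming the registry is reachable from your machine and needs no authentication:

import requests

# Ask the Registry v2 API which tags exist for the xmlrpc repository.
# Plain HTTP matches the http/insecure entry in the Buildx config above.
resp = requests.get('http://10.10.8.83:32000/v2/xmlrpc/tags/list', timeout=10)
resp.raise_for_status()
print(resp.json())  # expected shape: {'name': 'xmlrpc', 'tags': ['latest']}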
Dockerfile (new file, +14)

@@ -0,0 +1,14 @@
FROM reg.ivazh.ru/infra-oodb
WORKDIR /app
COPY . ./
RUN cd deps/pygost-5.12/ && \
    python3 setup.py install && \
    cd ../.. && \
    pip3 install -r requirements.txt && \
    mkdir -p /opt/tnt/bin && \
    ln -s /usr/bin/python3 /opt/tnt/bin/python3
ENV LD_LIBRARY_PATH "/app"
ENV PYTHONPATH "${PYTHONPATH}:/app"
EXPOSE 9000
EXPOSE 8000
CMD ["python3", "main.py"]


@@ -1,8 +1,8 @@
 class Config:
-    ret_path: str = 'http://10.10.8.81:9000/'
+    ret_path: str = 'http://10.10.8.83:32200/'
     self_bnd: str = 'bnd127'
-    enserver: str = 'http://127.0.0.1:7000/xmlrpc'
+    enserver: str = 'http://10.10.8.83:32210/xmlrpc'
     remote_bnd: str = 'bnd128'
     pg_host: str = '10.10.8.83'
@@ -21,7 +21,19 @@ class Config:
    rabbit_conn: str = 'amqp://user:password@10.10.8.83:31005/%2f'
    rabbit_queue: str = 'ipd'
    ws_rabbit_params: dict = {
        'host': '10.10.8.83',
        'port': 31005,
        'exchange': 'ipd',
        'user': 'user',
        'password': 'password',
    }
    s3_endpoint: str = 'http://10.10.8.83:31006'
    s3_key_id: str = 's57'
    s3_access_key: str = 'd9MMinLF3U8TLSj'
    s3_bucket: str = 'files'
    gql_url: str = 'https://gql.ivazh.ru/graphql'
    gql_download: str = 'https://gql.ivazh.ru/item/{key}'
    gql_schema: str = 'pdim'

config/response2.json (new file, +12)

@@ -0,0 +1,12 @@
{
    "filestorage": {
        "type": "s3",
        "endpoint": "http://10.10.8.83:31006",
        "key_id": "s57",
        "access_key": "d9MMinLF3U8TLSj",
        "download_path": "/tmp"
    },
    "file_code": "c1000",
    "name_code": "c122",
    "use_version": true
}

config/workspaces.json (new file, +167)

@@ -0,0 +1,167 @@
{
    "databases": {
        "oodb_git": {
            "host": "10.10.8.83",
            "port": 32100,
            "database": "db",
            "user": "postgres",
            "password": "Root12345678"
        }
    },
    "workspaces": {
        "documents_src": {
            "type": "documents",
            "group": "src",
            "database": "oodb_git",
            "schema": "documents_src",
            "alias": "Документы исходная"
        },
        "documents_standard": {
            "type": "documents",
            "group": "order",
            "database": "oodb_git",
            "schema": "documents_standard",
            "alias": "Документы эталон"
        },
        "documents_standard_pub": {
            "type": "documents",
            "group": "order",
            "database": "oodb_git",
            "schema": "documents_standard",
            "alias": "Документы публичная"
        },
        "ood": {
            "type": "npd",
            "group": "src",
            "database": "oodb_git",
            "schema": "ood",
            "alias": "ООБД исходные НПД",
            "map_service": "VUE_APP_GISAIS_URL:/styles/ood/style.json"
        },
        "oodb": {
            "type": "oodb",
            "group": "order",
            "database": "oodb_git",
            "schema": "kartap",
            "alias": "ООДБ эталон",
            "map_service": "VUE_APP_GISAIS_URL:/styles/oodb/style.json"
        },
        "oodb_standard": {
            "type": "oodb",
            "group": "forming_standard",
            "database": "oodb_git",
            "schema": "kartap",
            "alias": "ООДБ эталон",
            "map_service": "VUE_APP_GISAIS_URL:/styles/oodb/style.json"
        },
        "oodb_working": {
            "type": "oodb",
            "group": "forming_work",
            "database": "oodb_git",
            "schema": "kartap",
            "alias": "ООДБ рабочая",
            "map_service": "VUE_APP_GISAIS_URL:/styles/oodb_tech/style.json"
        },
        "oodb_pub": {
            "type": "oodb",
            "group": "order",
            "database": "oodb_git",
            "schema": "kartap",
            "alias": "ООБД публичная",
            "map_service": "VUE_APP_GISAIS_URL:/styles/oodb/style.json"
        },
        "regions": {
            "type": "regions",
            "database": "oodb_git",
            "schema": "regions_hard",
            "alias": "Регионы",
            "map_service": "VUE_APP_GISAIS_URL_GK:/styles/regions/style.json"
        },
        "regions_contour": {
            "type": "regions",
            "database": "oodb_git",
            "schema": "regions_hard",
            "alias": "Регионы",
            "map_service": "VUE_APP_GISAIS_URL_GK:/styles/regions_contour/style.json"
        },
        "npd_9": {
            "type": "npd",
            "database": "oodb_git",
            "schema": "npd_9",
            "alias": "НПД 9.0"
        },
        "npd": {
            "type": "npd",
            "database": "oodb_git",
            "schema": "initial",
            "alias": "НПД 9.0"
        },
        "npd_831": {
            "type": "npd",
            "group": "order",
            "database": "oodb_git",
            "schema": "npd_831",
            "alias": "НПД 8.31"
        },
        "updater_test": {
            "type": "npd",
            "group": "order",
            "database": "oodb_git",
            "schema": "npd_831_test",
            "alias": "НПД 8.31 публичная"
        },
        "lukoil": {
            "type": "oodb",
            "database": "oodb_git",
            "schema": "lukoil",
            "alias": "ЛУКОЙЛ",
            "map_service": "VUE_APP_GISAIS_URL_GK:/styles/lukoil/style.json"
        },
        "geocover": {
            "type": "ecpz",
            "group": "order",
            "database": "oodb_git",
            "schema": "coverage",
            "alias": "ЕЦПЗ"
        },
        "geocover_test": {
            "type": "ecpz",
            "database": "oodb_git",
            "schema": "coverage",
            "alias": "ЕЦПЗ тест"
        },
        "gcmr": {
            "type": "gcmr",
            "group": "order",
            "database": "oodb_git",
            "schema": "gcmr",
            "alias": "ГЦМР"
        },
        "orders": {
            "type": "system",
            "database": "oodb_git",
            "schema": "orders",
            "alias": "Заказы"
        },
        "ilo": {
            "type": "system",
            "database": "oodb_git",
            "schema": "ilo",
            "alias": "ИЛО"
        },
        "raz_sgok": {
            "type": "raz_sgok",
            "database": "razsgok",
            "schema": "razsgok",
            "alias": "СГОК",
            "map_service": "VUE_APP_GISAIS_URL_GK:/styles/raz_sgok/style.json"
        },
        "raz_vtu": {
            "type": "raz_vtu",
            "database": "razvtu",
            "schema": "razvtu",
            "alias": "ВТУ",
            "map_service": "VUE_APP_GISAIS_URL_GK:/styles/raz_vtu/style.json"
        }
    }
}

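Every workspace above references a database by key, so a small cross-check catches dangling references early (for example, raz_sgok and raz_vtu point at razsgok and razvtu, which are not in the databases block shown here). A minimal sketch, assuming the file is read from config/workspaces.json:

import json

with open('config/workspaces.json', encoding='utf-8') as f:
    cfg = json.load(f)

known = set(cfg['databases'])
for name, ws in cfg['workspaces'].items():
    # Flag workspaces whose 'database' key is not defined in 'databases'.
    if ws['database'] not in known:
        print(f"workspace {name!r} references undefined database {ws['database']!r}")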
db.py (34 changed lines)

@@ -2,6 +2,7 @@ from datetime import datetime
 from typing import List, Optional
 from sqlalchemy import create_engine, String, select, ForeignKey, Enum
 from sqlalchemy.orm import Session, DeclarativeBase, Mapped, mapped_column, relationship
+from config import Config

def tow(day: int, hour: int, minute: int):
@@ -27,6 +28,10 @@ class User(Base):
        back_populates='user', cascade='all, delete-orphan'
    )
+    income_branches: Mapped[List['IncomeBranch']] = relationship(
+        back_populates='user', cascade='all, delete-orphan'
+    )
    schedule: Mapped[List['Schedule']] = relationship(
        back_populates='user', cascade='all, delete-orphan'
    )
@@ -49,6 +54,7 @@ class User(Base):
            'profiles': [x.to_dict() for x in self.profiles],
            'schedule': [x.to_dict() for x in self.schedule],
            'queue': [x.to_dict() for x in self.queue],
+            'income_branches': [x.to_dict() for x in self.income_branches],
        }

    def is_active_now(self):
@@ -69,17 +75,41 @@ class Profile(Base):
    id: Mapped[int] = mapped_column(primary_key=True)
    user_id: Mapped[int] = mapped_column(ForeignKey('users.id'))
    scheme: Mapped[str]
-    json: Mapped[str]
+    branch: Mapped[str] = mapped_column(String, nullable=True)
+    json: Mapped[str] = mapped_column(String, nullable=True)
+    no_files: Mapped[bool]

    user: Mapped['User'] = relationship(back_populates='profiles')

    def to_dict(self) -> dict:
        return {
            'id': self.id,
            'scheme': self.scheme,
+            'branch': self.branch,
            'json': self.json,
        }

+class IncomeBranch(Base):
+    __tablename__ = 'income_branches'
+
+    id: Mapped[int] = mapped_column(primary_key=True)
+    user_id: Mapped[int] = mapped_column(ForeignKey('users.id'))
+    scheme: Mapped[str]
+    branch: Mapped[str]
+    local_scheme: Mapped[str]
+
+    user: Mapped['User'] = relationship(back_populates='income_branches')
+
+    def to_dict(self) -> dict:
+        return {
+            'id': self.id,
+            'scheme': self.scheme,
+            'branch': self.branch,
+            'local_scheme': self.local_scheme,
+        }

class Schedule(Base):
    __tablename__ = 'schedule'
@@ -133,4 +163,4 @@ class Schemas(Base):
 def connect_db():
-    return create_engine("postgresql+psycopg://postgres:Root12345678@10.10.8.83:32101/db")
+    return create_engine(f"postgresql+psycopg://{Config.pg_username}:{Config.pg_password}@{Config.pg_host}:{Config.pg_port}/{Config.pg_dbname}")

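The rewritten connect_db() builds the URL with an f-string, which breaks if the password ever contains characters such as @, / or :. A hedged alternative (a sketch, not what the commit does) is SQLAlchemy's URL.create, which escapes those values:

from sqlalchemy import create_engine
from sqlalchemy.engine import URL
from config import Config

def connect_db():
    # URL.create percent-encodes username/password, so special characters are safe.
    url = URL.create(
        'postgresql+psycopg',
        username=Config.pg_username,
        password=Config.pg_password,
        host=Config.pg_host,
        port=Config.pg_port,
        database=Config.pg_dbname,
    )
    return create_engine(url)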
Binary file not shown.

infra.py (new file, +9)

@@ -0,0 +1,9 @@
from libcommon import *
from libdatabase import *
from libgeodata import *
from libgeodriver import *
from libgeodesy import *
from libgeom import *
from libipdutilities import *
from liboodriver import *

main.py (389 changed lines)

@@ -1,7 +1,7 @@
import asyncio
from tempfile import TemporaryDirectory
from typing import Optional, Any
import pika
import sys
import threading
import time
from queue import Queue
@@ -11,11 +11,11 @@ from uuid import uuid4, UUID
from xmlrpc.server import SimpleXMLRPCServer
from xmlrpc.client import ServerProxy
import logging
import os
import io
import zlib
import os.path
import requests
-from reqs.graphql import get_catalog, get_object
+from reqs_graphql import get_catalog, get_object
from pygost import gost34112012256
import xml.etree.ElementTree as ET
from reqs.request_xml_service import RequestXmlService
@@ -26,18 +26,28 @@ import boto3
import db
from sqlalchemy.orm import Session
from fastapi import FastAPI, Response, Form, UploadFile, File, Request
from fastapi.middleware.cors import CORSMiddleware
import uvicorn
from typing_extensions import Annotated
import pathlib
from infra import *
from shutil import make_archive
NEW_REPLICATION_REQUEST = 99
NEW_COMMIT_REQUEST = 98
NEW_COMMIT_RESPONSE = 1098
NEW_DATA_REQUEST = 97
NEW_DATA_RESPONSE = 1097
tasks = Queue()
connected = set()
connection: Optional[pika.BlockingConnection] = None
channel: Optional[Any] = None
server: Optional[SimpleXMLRPCServer] = None
logger = logging.getLogger('xmlrpcserver')
@@ -61,6 +71,24 @@ def upload_file(filename: str, key: str, bucket: str):
        client.put_object(Body=f.read(), Bucket=bucket, Key=key)

+def get_branch(bndname: str, scheme: str):
+    conn = db.connect_db()
+    with Session(conn) as session:
+        item = session.query(db.IncomeBranch).filter_by(scheme=scheme).join(db.User).filter_by(bndname=bndname).one_or_none()
+        if item:
+            return item.branch, item.local_scheme
+    return None, None
+
+def get_profile(bndname: str, scheme: str):
+    conn = db.connect_db()
+    with Session(conn) as session:
+        item = session.query(db.Profile).filter_by(scheme=scheme).join(db.User).filter_by(bndname=bndname).one_or_none()
+        if item:
+            return item
+    return None

def run_tasks():
    logger.debug('Task thread started.')
    while True:
@@ -70,17 +98,17 @@ def run_tasks():
def replication_task():
    while True:
        print()
        conn = db.connect_db()
        # logging.warning(connected)
        with Session(conn) as session:
-            for bndname in connected:
-                for item in session.query(db.Queue).join(db.Queue.user).filter_by(bndname=bndname).all():
-                    if item.user.is_active_now():
-                        if item.user.newbnd:
-                            replication(bndname, item.commit_id, item.schema)
-                        else:
-                            replication_old(bndname, item.commit_id, item.schema)
-                        session.delete(item)
+            for item in session.query(db.Queue).join(db.Queue.user).all():
+                bndname = item.user.bndname
+                if item.user.is_active_now() and bndname != Config.self_bnd:
+                    if item.user.newbnd:
+                        replication(bndname, item.commit_id, item.schema)
+                    else:
+                        replication_old(bndname, item.commit_id, item.schema)
+                    session.delete(item)
            session.commit()
        time.sleep(60)
@@ -109,7 +137,7 @@ def crc32(filename, chunk_size=65536):
        return "%08X" % (checksum & 0xFFFFFFFF)

-def send_object_replication(ws, ids):
+def send_object_replication(bndname, ws, ids):
    qu = GroupQuery(Envelope())
    query = GroupQuery(Envelope.world())
    query.setUids(ids)
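Only the last line of crc32 appears in the hunk above. Its body is presumably a chunked read feeding zlib.crc32 (zlib is imported at the top of main.py); a minimal sketch consistent with the signature and return value shown, offered as an assumption since the full body is not in the diff:

import zlib

def crc32(filename, chunk_size=65536):
    # Stream the file in chunks so large files are never fully in memory.
    checksum = 0
    with open(filename, 'rb') as f:
        while chunk := f.read(chunk_size):
            checksum = zlib.crc32(chunk, checksum)
    return "%08X" % (checksum & 0xFFFFFFFF)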
@@ -143,12 +171,12 @@ def send_object_replication(ws, ids):
            'Name': c1000[0]['fileName'],
            'Type': 'OOD',
            'metadata_version': '1',
-            'source': 'bnd127',
+            'source': Config.self_bnd,
            'system_date': str(created_date.timestamp()),
        })
        xml_version = ET.SubElement(xml_objects, 'version', {
            'object_id': chart['uid'].replace('-', ''),
-            'source': 'bnd127',
+            'source': Config.self_bnd,
            'system_date': str(created_date.timestamp()),
            'version': '1.0',
            'version_id': chart['uid'].replace('-', ''),
@@ -197,7 +225,7 @@ def send_object_replication(ws, ids):
        })
    params = {
        'from': f'tcp://{Config.self_bnd}',
-        'to': f'tcp://{Config.remote_bnd}',
+        'to': f'tcp://{bndname}',
        'ts_added': date.timestamp(),
        'user_id': '0',
        'query_type': NEW_REPLICATION_REQUEST,
@@ -226,22 +254,25 @@ def replication_old(bnd_name: str, commit_id: str, schema: str):
        logger.warning(res)
    created, updated, _ = ws.changes(commit_id)
    ids = list(set(created) | set(updated))
-    send_object_replication(ws, ids)
+    send_object_replication(bnd_name, ws, ids)
    ws.clearData(True)
    logger.warning('Replication to old bnd is sent')

def replication(bnd_name: str, commit_id: str, schema: str):
    logging.warning(f'{bnd_name} {commit_id} {schema}')
    date = datetime.datetime.now()
    rxmls = RequestXmlService()
    res_id = uuid4().hex
+    profile = get_profile(bndname=bnd_name, scheme=schema)
    res = rxmls.get_request_document(res_id, None)
    rxmls.set_result(res, 0, '')
    ET.SubElement(res, 'replication', {'id': commit_id, 'scheme': schema})
    response_params = {
        'from': f'tcp://{Config.self_bnd}',
-        'to': f'tcp://{Config.remote_bnd}',
+        'to': f'tcp://{bnd_name}',
        'ts_added': date.timestamp(),
        'user_id': '0',
        'query_type': NEW_REPLICATION_REQUEST,
@@ -287,20 +318,23 @@ def replication(bnd_name: str, commit_id: str, schema: str):
    qu.setUids(created)
    ws.load(qu, uids)
    exported_files = []
-    for feature_uid in uids:
-        feature = ws.featureByUid(feature_uid)
-        if not feature:
-            continue
-        for attr in feature.attributes('c1000'):
-            updated_files.append(variantToFileValue(attr.val()))
-            # updated_files.append(feature_uid)
-    for x in updated_files:
-        exported_files.append({
-            'key': x.key,
-            'bucket': x.bucket,
-            'filename': x.fileName,
-        })
-        download_file(x.key, x.bucket, os.path.join(z.dirname, x.key))
+    if not profile or not profile.no_files:
+        for feature_uid in uids:
+            feature = ws.featureByUid(feature_uid)
+            if not feature:
+                continue
+            for attr in feature.attributes('c1000'):
+                updated_files.append(variantToFileValue(attr.val()))
+                # updated_files.append(feature_uid)
+        for x in updated_files:
+            exported_files.append({
+                'key': x.key,
+                'bucket': x.bucket,
+                'filename': x.fileName,
+            })
+            fp = os.path.join(z.dirname, x.key)
+            os.makedirs(os.path.dirname(fp), exist_ok=True)
+            download_file(x.key, x.bucket, fp)
    with open(os.path.join(z.dirname, 'export_files.json'), 'w') as f:
        f.write(json.dumps(exported_files))
    ws.clearData(True)
@@ -318,18 +352,26 @@ def replication(bnd_name: str, commit_id: str, schema: str):
def pika_callback(ch, method, properties, body):
    commit_info = json.loads(body)
    logging.warning(commit_info)
    schema = commit_info.get('schema') or Config.oodb_schema
    commit = commit_info['commit']
    conn = db.connect_db()
    with Session(conn) as session:
-        for user in session.query(db.User).filter(db.User.active == True, db.User.upstream == False).all():
-            item = db.Queue(user_id=user.id, commit_id=commit, schema=schema)
-            session.add(item)
+        for user in session.query(db.User).filter(db.User.active == True).all():
+            if user.bndname == Config.self_bnd:
+                continue
+            profiles = {x.scheme: x.to_dict() for x in user.profiles}
+            if len(profiles) == 0 or schema in profiles:
+                item = db.Queue(user_id=user.id, commit_id=commit, schema=schema)
+                logging.warning(item)
+                session.add(item)
        session.commit()
    ch.basic_ack(delivery_tag=method.delivery_tag)

def pika_task():
+    global connection
+    global channel
    connection = pika.BlockingConnection(pika.URLParameters(Config.rabbit_conn))
    channel = connection.channel()
    channel.basic_consume(queue=Config.rabbit_queue, on_message_callback=pika_callback)
@@ -337,6 +379,7 @@ def pika_task():
    channel.start_consuming()

def list_contents(dir_name):
+    logger.warning('list_contents(%s)', dir_name)
    return os.listdir(dir_name)
@@ -436,7 +479,7 @@ def get_objects(params, files, url):
        for file in obj['properties'].get('c1000', []):
            if not main_filename:
                main_filename = file['fileName']
-            res = requests.get(f'https://gql.ivazh.ru/item/{file["key"]}')
+            res = requests.get(Config.gql_download, params={'item_id': file["key"]})
            zipf.writestr(f'{main_filename}/{file["fileName"]}', res.content)
    zipf.close()
    response_files = [{'name': filename, 'url': filepath, 'size': os.path.getsize(filepath)}]
@@ -537,34 +580,187 @@ def put_object(params, files, url):
    ws.transaction()
    res = ws.save()
    ws.commit(f'Putobject from {params["to"]}')
    ws.clearData(True)

def apply_commits(params, files, url):
-    logger.warning(params, files, url)
+    logging.warning("Apply commits")
+    logging.warning(params)
    assert len(files) == 1
    file = files[0]
    dir = TemporaryDirectory()
-    with zipfile.ZipFile(file, 'r') as zip_ref:
+    r = requests.get(file['url'])
+    with zipfile.ZipFile(io.BytesIO(r.content)) as zip_ref:
        zip_ref.extractall(dir.name)
    req = ET.fromstring(params['query_data'])
    repl = req.find('replication')
    scheme = repl.get('scheme')
    commit = repl.get('id')
-    logger.warning(scheme, commit)
-    os.path.join(dir.name, 'export.o5c')
+    bnd = params['from'].replace('tcp://', '')
+    branch, new_scheme = get_branch(bnd, scheme)
+    if new_scheme:
+        scheme = new_scheme
    con = OOConnectionParams(scheme, Config.oodb_host, Config.oodb_port, Config.oodb_dbname,
                             Config.oodb_username, Config.oodb_passwd, scheme)
    ws = OODBWorkspace.ws(scheme)
+    logging.warning("Connected to schema")
+    if Config.ws_rabbit_params and not ws.hasProducer():
+        ws.setProducer(Config.ws_rabbit_params['host'], Config.ws_rabbit_params['port'],
+                       Config.ws_rabbit_params['exchange'], Config.ws_rabbit_params['user'],
+                       Config.ws_rabbit_params['password'])
+    ws.setCurrentUser(bnd)
+    logging.warning("Set branch if needed")
+    if branch:
+        logging.warning(branch)
+        ws.switchBranch(branch)
    if not ws.isInit():
        res = ws.init(con)
        logger.warning(res)
    oe = IpdExporter(ws)
-    oe.improtFromOsm(os.path.join(dir.name, 'export.o5c'))
-    oe.improtFromMbtiles(os.path.join(dir.name, 'export.mbtiles'))
-    with open('export_files.json', 'r') as f:
-        files_data = json.load(f)
-    for file_data in files_data:
-        upload_file(file_data['filename'], file_data['key'], file_data['bucket'])
+    logging.warning("Importing...")
+    if not oe.importChanges(os.path.join(dir.name, 'export.o5c'), os.path.join(dir.name, 'export.mbtiles')):
+        logging.warning(f'Error importing commit {commit}: {oe.lastError().text()}')
+    else:
+        logging.warning(f'Importing commit {commit} finished successfully')
+    with open(os.path.join(dir.name, 'export_files.json'), 'r') as f:
+        files_data = json.load(f)
+    for file_data in files_data:
+        upload_file(os.path.join(dir.name, file_data['key']), file_data['key'], file_data['bucket'])
+    logging.warning("Finished import")
+    ws.clearData(True)

+def get_commit(params, files, url):
+    date = datetime.datetime.now()
+    rxmls = RequestXmlService()
+    req = ET.fromstring(params['query_data'])
+    req_id = rxmls.get_request_uuid(req)
+    res_id = uuid4().hex
+    res_doc = rxmls.get_request_document(res_id, req_id)
+    schema = req.find('currentCommit').get('scheme')
+    con = OOConnectionParams(schema, Config.oodb_host, Config.oodb_port, Config.oodb_dbname,
+                             Config.oodb_username, Config.oodb_passwd, schema)
+    ws = OODBWorkspace.ws(schema)
+    if not ws.isInit():
+        res = ws.init(con)
+        logger.warning(res)
+    ET.SubElement(res_doc, 'commit', {'id': ws.currentCommit(), 'schema': schema})
+    response_params = {
+        'from': params['to'],
+        'to': params['from'],
+        'ts_added': date.timestamp(),
+        'user_id': '1',
+        'user_id_to': 0,
+        'query_type': NEW_COMMIT_RESPONSE,
+        'query_data': ET.tostring(res_doc, encoding='unicode', xml_declaration=True)
+    }
+    proxy = ServerProxy(url)
+    proxy.send(response_params, [], Config.ret_path)

+def query_commits(params, files, url):
+    req = ET.fromstring(params['query_data'])
+    commit_el = req.find('commit')
+    commit_id = commit_el.get('id')
+    schema = commit_el.get('schema')
+    bnd = params['from'].replace('tcp://', '')
+    con = OOConnectionParams(schema, Config.oodb_host, Config.oodb_port, Config.oodb_dbname,
+                             Config.oodb_username, Config.oodb_passwd, schema)
+    ws = OODBWorkspace.ws(schema)
+    if not ws.isInit():
+        res = ws.init(con)
+        logger.warning(res)
+    schema_commits = ws.commits(ws.branch())
+    logger.warning(schema_commits)
+    if commit_id not in schema_commits:
+        logger.warning(f'Error in commits in schema {schema}: no commit {commit_id}')
+        return
+    logger.warning(schema_commits[schema_commits.index(commit_id) + 1:])
+    conn = db.connect_db()
+    with Session(conn) as session:
+        for commit in schema_commits[schema_commits.index(commit_id) + 1:]:
+            for user in session.query(db.User).filter(db.User.bndname == bnd, db.User.active == True).all():
+                if user.bndname == Config.self_bnd:
+                    continue
+                profiles = {x.scheme: x.to_dict() for x in user.profiles}
+                if len(profiles) == 0 or schema in profiles:
+                    item = db.Queue(user_id=user.id, commit_id=commit, schema=schema)
+                    logging.warning(item)
+                    session.add(item)
+        session.commit()

+def get_data(params, files, url):
+    date = datetime.datetime.now()
+    rxmls = RequestXmlService()
+    req = ET.fromstring(params['query_data'])
+    req_id = rxmls.get_request_uuid(req)
+    res_id = uuid4().hex
+    res = rxmls.get_request_document(res_id, req_id)
+    rxmls.set_result(res, 0, '')
+    request_string = req.find('data').text
+    OODBWorkspaceFactory.init('config/workspaces.json')
+    conf = ResponseWorkerConfig('config/response2.json', 'config/workspaces.json')
+    worker = ResponseWorker(conf)
+    fn = uuid4().hex
+    dir = os.path.join(os.getcwd(), 'tmp', fn)
+    os.makedirs(dir)
+    worker.makeResponse(request_string, dir)
+    make_archive(dir, 'zip', dir, '.')
+    rxmls.set_result(res, 0, '')
+    response_params = {
+        'from': params['to'],
+        'to': params['from'],
+        'ts_added': date.timestamp(),
+        'user_id': '1',
+        'user_id_to': params['user_id'],
+        'query_type': NEW_DATA_RESPONSE,
+        'query_data': ET.tostring(res, encoding='unicode', xml_declaration=True)
+    }
+    filename = fn + '.zip'
+    filepath = os.path.join(os.getcwd(), 'tmp', filename)
+    response_files = [{'name': filename, 'url': filepath, 'size': os.path.getsize(filepath)}]
+    proxy = ServerProxy(url)
+    proxy.send(response_params, response_files, Config.ret_path)

+def receive_data(params, files, url):
+    req = ET.fromstring(params['query_data'])
+    commit_el = req.find('commit')
+    commit_id = commit_el.get('id')
+    schema = commit_el.get('schema')
+    bnd = params['from'].replace('tcp://', '')
+    con = OOConnectionParams(schema, Config.oodb_host, Config.oodb_port, Config.oodb_dbname,
+                             Config.oodb_username, Config.oodb_passwd, schema)
+    ws = OODBWorkspace.ws(schema)
+    if not ws.isInit():
+        res = ws.init(con)
+        logger.warning(res)
+    schema_commits = ws.commits(ws.branch())
+    logger.warning(schema_commits)
+    if commit_id not in schema_commits:
+        logger.warning(f'Error in commits in schema {schema}: no commit {commit_id}')
+        return
+    logger.warning(schema_commits[schema_commits.index(commit_id) + 1:])
+    conn = db.connect_db()
+    with Session(conn) as session:
+        for commit in schema_commits[schema_commits.index(commit_id) + 1:]:
+            for user in session.query(db.User).filter(db.User.bndname == bnd, db.User.active == True).all():
+                if user.bndname == Config.self_bnd:
+                    continue
+                profiles = {x.scheme: x.to_dict() for x in user.profiles}
+                if len(profiles) == 0 or schema in profiles:
+                    item = db.Queue(user_id=user.id, commit_id=commit, schema=schema)
+                    logging.warning(item)
+                    session.add(item)
+        session.commit()
def run_task(query_type, params, files, url):
@@ -578,6 +774,14 @@ def run_task(query_type, params, files, url):
        tasks.put(lambda: put_object(params, files, url))
    if query_type == NEW_REPLICATION_REQUEST:
        tasks.put(lambda: apply_commits(params, files, url))
+    if query_type == NEW_COMMIT_REQUEST:
+        tasks.put(lambda: get_commit(params, files, url))
+    if query_type == NEW_COMMIT_RESPONSE:
+        tasks.put(lambda: query_commits(params, files, url))
+    if query_type == NEW_DATA_REQUEST:
+        tasks.put(lambda: get_data(params, files, url))
+    if query_type == NEW_DATA_RESPONSE:
+        tasks.put(lambda: receive_data(params, files, url))

def accept(params, files, url):
@@ -608,7 +812,8 @@ def bnd_disconnected(bnd_name: str):
def xmlrpc_task():
-    server = SimpleXMLRPCServer(('0.0.0.0', 9000), logRequests=False, allow_none=True)
+    global server
+    server = SimpleXMLRPCServer(('0.0.0.0', 9000), logRequests=True, allow_none=True)
    server.register_function(list_contents)
    server.register_function(aud_add)
    server.register_function(auth_response)
@@ -622,6 +827,12 @@ def xmlrpc_task():
app = FastAPI()
+app.add_middleware(CORSMiddleware,
+                   allow_origins=['http://10.10.8.24:3000'],
+                   allow_credentials=True,
+                   allow_methods=['*'],
+                   allow_headers=['*']
+                   )

@app.get("/")
@@ -675,8 +886,67 @@ async def fa_put_object(response: Response,
    put_object(request_params, files, None)
    return {"Upload": "Ok"}

+@app.get("/cr")
+async def correction_replication(bnd_name: str, schema: str):
+    date = datetime.datetime.now()
+    rxmls = RequestXmlService()
+    res_id = uuid4().hex
+    res = rxmls.get_request_document(res_id, None)
+    ET.SubElement(res, 'currentCommit', {'scheme': schema})
+    params = {
+        'from': f'tcp://{Config.self_bnd}',
+        'to': f'tcp://{bnd_name}',
+        'ts_added': date.timestamp(),
+        'user_id': '0',
+        'query_type': NEW_COMMIT_REQUEST,
+        'query_data': ET.tostring(res, encoding='unicode', xml_declaration=True),
+    }
+    proxy = ServerProxy(Config.enserver)
+    try:
+        proxy.send(params, [], Config.ret_path)
+    except:
+        logger.error('Error sending')

+@app.get("/get_cr")
+async def correction_replication(bnd_name: str, schema: str):
+    date = datetime.datetime.now()
+    rxmls = RequestXmlService()
+    res_id = uuid4().hex
+    res = rxmls.get_request_document(res_id, None)
+    con = OOConnectionParams(schema, Config.oodb_host, Config.oodb_port, Config.oodb_dbname,
+                             Config.oodb_username, Config.oodb_passwd, schema)
+    ws = OODBWorkspace.ws(schema)
+    if not ws.isInit():
+        res = ws.init(con)
+        logger.warning(res)
+    ET.SubElement(res, 'commit', {'id': ws.currentCommit(), 'schema': schema})
+    params = {
+        'from': f'tcp://{Config.self_bnd}',
+        'to': f'tcp://{bnd_name}',
+        'ts_added': date.timestamp(),
+        'user_id': '0',
+        'query_type': NEW_COMMIT_RESPONSE,
+        'query_data': ET.tostring(res, encoding='unicode', xml_declaration=True),
+    }
+    proxy = ServerProxy(Config.enserver)
+    try:
+        proxy.send(params, [], Config.ret_path)
+    except:
+        logger.error('Error sending')

def main():
+    global connection
+    global server
+    global channel
    logger.setLevel(logging.INFO)
    logger.warning('Use Control-C to exit')
@@ -694,9 +964,11 @@ def main():
    try:
        logger.warning('Start server')
-        uvicorn.run(app, host="0.0.0.0", port=80)
+        uvicorn.run(app, host="0.0.0.0", port=8000)
    except KeyboardInterrupt:
        logger.warning('Exiting')
+    finally:
+        server.server_close()
def vers_key(e):
@@ -721,6 +993,8 @@ def test():
    #                          Config.oodb_username, Config.oodb_passwd, Config.oodb_schema)
    # ws = OODBWorkspace.ws(Config.oodb_schema)
    # ws.init(con)
+    # print(ws.currentCommit())
+    # print(ws.commits(ws.branch()))
    # created, updated, deleted = ws.changes('2dad8c8a-d2db-4074-ab7a-c01c36ada2be')
    # qu = GroupQuery(Envelope())
    # qu.setUids(updated)
@@ -740,7 +1014,22 @@ def test():
    # print(updated_files)
    # ws.clearData(True)
    # ws.close()
-    replication_old('bnd128', '23c9a275-ec0f-481a-8437-f3e41e4fe4f5', 'documents_src')
+    # replication_old('bnd128', '23c9a275-ec0f-481a-8437-f3e41e4fe4f5', 'documents_src')
+    request_string = """
+    {
+        "type": "group_query",
+        "data_source": "npd",
+        "uids": [ "2e20130c-541a-4b9f-9efb-f2a0e8b10c33" ]
+    }
+    """
+    OODBWorkspaceFactory.init('config/workspaces.json')
+    conf = ResponseWorkerConfig('config/response2.json', 'config/workspaces.json')
+    worker = ResponseWorker(conf)
+    fn = uuid4().hex
+    dir = os.path.join('tmp', fn)
+    os.makedirs(dir)
+    worker.makeResponse(request_string, dir)
+    make_archive(dir, 'zip', dir, '.')
    pass


@@ -1,8 +1,9 @@
from gql import gql, Client
from gql.transport.aiohttp import AIOHTTPTransport
+from config import Config

-transport = AIOHTTPTransport(url="https://gql.ivazh.ru/graphql/")
-service = 'pdim'
+transport = AIOHTTPTransport(url=Config.gql_url)
+service = Config.gql_schema

def get_classifier():


graphql-core==3.3.0a3
greenlet==2.0.2
h11==0.14.0
idna==3.4
-infra
jmespath==1.0.1
multidict==6.0.4
pika==1.3.2

test.py (new file, +32)

@@ -0,0 +1,32 @@
import db
from sqlalchemy.orm import Session

def get_branch(bndname: str, scheme: str):
    conn = db.connect_db()
    with Session(conn) as session:
        item = session.query(db.IncomeBranch).filter_by(scheme=scheme).join(db.User).filter_by(bndname=bndname).one_or_none()
        if item:
            return item.branch
    return None

def is_replication_scheme(bndname: str, scheme: str):
    conn = db.connect_db()
    with Session(conn) as session:
        item = session.query(db.User).filter_by(bndname=bndname).one_or_none()
        if not item:
            return False
        profiles = {x.scheme: x.to_dict() for x in item.profiles}
        if len(profiles) == 0 or scheme in profiles:
            return True
    return False

def main():
    # print(get_branch('bnd128', 'ood'))
    print(is_replication_scheme('bnd128', 'documents_src'))

if __name__ == '__main__':
    main()