Compare commits
30 Commits
5b17bba5bf
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
cdaccb911b | ||
|
|
219b597042 | ||
|
|
201b2d2da6 | ||
|
|
a742774ae1 | ||
|
|
3230931e4e | ||
|
|
4078dbd84c | ||
|
|
bf01fbc69d | ||
|
|
84dd6e5537 | ||
|
|
0910a2b780 | ||
|
|
0a9c887b7e | ||
|
|
03db4ece92 | ||
|
|
7cb5a93c90 | ||
|
|
2898bfd7a2 | ||
|
|
90f7965242 | ||
|
|
966200132f | ||
|
|
b223733b72 | ||
|
|
794ad96bf3 | ||
|
|
81aac15ec6 | ||
|
|
436703641b | ||
|
|
3841f50628 | ||
|
|
92dd8162c5 | ||
|
|
004eeb4b2b | ||
|
|
5363cbc10f | ||
|
|
4b435e44ba | ||
|
|
a25a54eb92 | ||
|
|
5e9f427072 | ||
|
|
df96273efb | ||
|
|
45b04a635b | ||
|
|
3fd2ec4762 | ||
|
|
36e23465a8 |
23
.gitea/workflows/build.yaml
Normal file
23
.gitea/workflows/build.yaml
Normal file
@@ -0,0 +1,23 @@
|
||||
name: Build xmlrpcserver image
|
||||
run-name: Build xmlrpcserver image
|
||||
on: [push]
|
||||
|
||||
jobs:
|
||||
Build:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: https://github.com/actions/checkout@v4
|
||||
- name: Set up Docker Buildx
|
||||
uses: https://github.com/docker/setup-buildx-action@v3
|
||||
with:
|
||||
config-inline: |
|
||||
[registry."10.10.8.83:32000"]
|
||||
http = true
|
||||
insecure = true
|
||||
- name: Build and push Docker image
|
||||
uses: https://github.com/docker/build-push-action@v5
|
||||
with:
|
||||
context: .
|
||||
file: ./Dockerfile
|
||||
push: true
|
||||
tags: "10.10.8.83:32000/xmlrpc:latest"
|
||||
14
Dockerfile
Normal file
14
Dockerfile
Normal file
@@ -0,0 +1,14 @@
|
||||
FROM reg.ivazh.ru/infra-oodb
|
||||
WORKDIR /app
|
||||
COPY . ./
|
||||
RUN cd deps/pygost-5.12/ && \
|
||||
python3 setup.py install && \
|
||||
cd ../.. && \
|
||||
pip3 install -r requirements.txt && \
|
||||
mkdir -p /opt/tnt/bin && \
|
||||
ln -s /usr/bin/python3 /opt/tnt/bin/python3
|
||||
ENV LD_LIBRARY_PATH "/app"
|
||||
ENV PYTHONPATH "${PYTHONPATH}:/app"
|
||||
EXPOSE 9000
|
||||
EXPOSE 8000
|
||||
CMD ["python3", "main.py"]
|
||||
16
config.py
16
config.py
@@ -1,8 +1,8 @@
|
||||
|
||||
class Config:
|
||||
ret_path: str = 'http://10.10.8.81:9000/'
|
||||
ret_path: str = 'http://10.10.8.83:32200/'
|
||||
self_bnd: str = 'bnd127'
|
||||
enserver: str = 'http://127.0.0.1:7000/xmlrpc'
|
||||
enserver: str = 'http://10.10.8.83:32210/xmlrpc'
|
||||
remote_bnd: str = 'bnd128'
|
||||
|
||||
pg_host: str = '10.10.8.83'
|
||||
@@ -21,7 +21,19 @@ class Config:
|
||||
rabbit_conn: str = 'amqp://user:password@10.10.8.83:31005/%2f'
|
||||
rabbit_queue: str = 'ipd'
|
||||
|
||||
ws_rabbit_params: dict = {
|
||||
'host': '10.10.8.83',
|
||||
'port': 31005,
|
||||
'exchange': 'ipd',
|
||||
'user': 'user',
|
||||
'password': 'password',
|
||||
}
|
||||
|
||||
s3_endpoint: str = 'http://10.10.8.83:31006'
|
||||
s3_key_id: str = 's57'
|
||||
s3_access_key: str = 'd9MMinLF3U8TLSj'
|
||||
s3_bucket: str = 'files'
|
||||
|
||||
gql_url: str = 'https://gql.ivazh.ru/graphql'
|
||||
gql_download: str = 'https://gql.ivazh.ru/item/{key}'
|
||||
gql_schema: str = 'pdim'
|
||||
|
||||
12
config/response2.json
Normal file
12
config/response2.json
Normal file
@@ -0,0 +1,12 @@
|
||||
{
|
||||
"filestorage": {
|
||||
"type": "s3",
|
||||
"endpoint": "http://10.10.8.83:31006",
|
||||
"key_id": "s57",
|
||||
"access_key": "d9MMinLF3U8TLSj",
|
||||
"download_path": "/tmp"
|
||||
},
|
||||
"file_code": "c1000",
|
||||
"name_code": "c122",
|
||||
"use_version": true
|
||||
}
|
||||
167
config/workspaces.json
Normal file
167
config/workspaces.json
Normal file
@@ -0,0 +1,167 @@
|
||||
{
|
||||
"databases": {
|
||||
"oodb_git": {
|
||||
"host": "10.10.8.83",
|
||||
"port": 32100,
|
||||
"database": "db",
|
||||
"user": "postgres",
|
||||
"password": "Root12345678"
|
||||
}
|
||||
},
|
||||
"workspaces": {
|
||||
"documents_src": {
|
||||
"type": "documents",
|
||||
"group": "src",
|
||||
"database": "oodb_git",
|
||||
"schema": "documents_src",
|
||||
"alias": "Документы исходная"
|
||||
},
|
||||
"documents_standard": {
|
||||
"type": "documents",
|
||||
"group": "order",
|
||||
"database": "oodb_git",
|
||||
"schema": "documents_standard",
|
||||
"alias": "Документы эталон"
|
||||
},
|
||||
"documents_standard_pub": {
|
||||
"type": "documents",
|
||||
"group": "order",
|
||||
"database": "oodb_git",
|
||||
"schema": "documents_standard",
|
||||
"alias": "Документы публичная"
|
||||
},
|
||||
"ood": {
|
||||
"type": "npd",
|
||||
"group": "src",
|
||||
"database": "oodb_git",
|
||||
"schema": "ood",
|
||||
"alias": "ООБД исходные НПД",
|
||||
"map_service": "VUE_APP_GISAIS_URL:/styles/ood/style.json"
|
||||
},
|
||||
"oodb": {
|
||||
"type": "oodb",
|
||||
"group": "order",
|
||||
"database": "oodb_git",
|
||||
"schema": "kartap",
|
||||
"alias": "ООДБ эталон",
|
||||
"map_service": "VUE_APP_GISAIS_URL:/styles/oodb/style.json"
|
||||
},
|
||||
"oodb_standard": {
|
||||
"type": "oodb",
|
||||
"group": "forming_standard",
|
||||
"database": "oodb_git",
|
||||
"schema": "kartap",
|
||||
"alias": "ООДБ эталон",
|
||||
"map_service": "VUE_APP_GISAIS_URL:/styles/oodb/style.json"
|
||||
},
|
||||
"oodb_working": {
|
||||
"type": "oodb",
|
||||
"group": "forming_work",
|
||||
"database": "oodb_git",
|
||||
"schema": "kartap",
|
||||
"alias": "ООДБ рабочая",
|
||||
"map_service": "VUE_APP_GISAIS_URL:/styles/oodb_tech/style.json"
|
||||
},
|
||||
"oodb_pub": {
|
||||
"type": "oodb",
|
||||
"group": "order",
|
||||
"database": "oodb_git",
|
||||
"schema": "kartap",
|
||||
"alias": "ООБД публичная",
|
||||
"map_service": "VUE_APP_GISAIS_URL:/styles/oodb/style.json"
|
||||
},
|
||||
"regions": {
|
||||
"type": "regions",
|
||||
"database": "oodb_git",
|
||||
"schema": "regions_hard",
|
||||
"alias": "Регионы",
|
||||
"map_service": "VUE_APP_GISAIS_URL_GK:/styles/regions/style.json"
|
||||
},
|
||||
"regions_contour": {
|
||||
"type": "regions",
|
||||
"database": "oodb_git",
|
||||
"schema": "regions_hard",
|
||||
"alias": "Регионы",
|
||||
"map_service": "VUE_APP_GISAIS_URL_GK:/styles/regions_contour/style.json"
|
||||
},
|
||||
"npd_9": {
|
||||
"type": "npd",
|
||||
"database": "oodb_git",
|
||||
"schema": "npd_9",
|
||||
"alias": "НПД 9.0"
|
||||
},
|
||||
"npd": {
|
||||
"type": "npd",
|
||||
"database": "oodb_git",
|
||||
"schema": "initial",
|
||||
"alias": "НПД 9.0"
|
||||
},
|
||||
"npd_831": {
|
||||
"type": "npd",
|
||||
"group": "order",
|
||||
"database": "oodb_git",
|
||||
"schema": "npd_831",
|
||||
"alias": "НПД 8.31"
|
||||
},
|
||||
"updater_test": {
|
||||
"type": "npd",
|
||||
"group": "order",
|
||||
"database": "oodb_git",
|
||||
"schema": "npd_831_test",
|
||||
"alias": "НПД 8.31 публичная"
|
||||
},
|
||||
"lukoil": {
|
||||
"type": "oodb",
|
||||
"database": "oodb_git",
|
||||
"schema": "lukoil",
|
||||
"alias": "ЛУКОЙЛ",
|
||||
"map_service": "VUE_APP_GISAIS_URL_GK:/styles/lukoil/style.json"
|
||||
},
|
||||
"geocover": {
|
||||
"type": "ecpz",
|
||||
"group": "order",
|
||||
"database": "oodb_git",
|
||||
"schema": "coverage",
|
||||
"alias": "ЕЦПЗ"
|
||||
},
|
||||
"geocover_test": {
|
||||
"type": "ecpz",
|
||||
"database": "oodb_git",
|
||||
"schema": "coverage",
|
||||
"alias": "ЕЦПЗ тест"
|
||||
},
|
||||
"gcmr": {
|
||||
"type": "gcmr",
|
||||
"group": "order",
|
||||
"database": "oodb_git",
|
||||
"schema": "gcmr",
|
||||
"alias": "ГЦМР"
|
||||
},
|
||||
"orders": {
|
||||
"type": "system",
|
||||
"database": "oodb_git",
|
||||
"schema": "orders",
|
||||
"alias": "Заказы"
|
||||
},
|
||||
"ilo": {
|
||||
"type": "system",
|
||||
"database": "oodb_git",
|
||||
"schema": "ilo",
|
||||
"alias": "ИЛО"
|
||||
},
|
||||
"raz_sgok": {
|
||||
"type": "raz_sgok",
|
||||
"database": "razsgok",
|
||||
"schema": "razsgok",
|
||||
"alias": "СГОК",
|
||||
"map_service": "VUE_APP_GISAIS_URL_GK:/styles/raz_sgok/style.json"
|
||||
},
|
||||
"raz_vtu": {
|
||||
"type": "raz_vtu",
|
||||
"database": "razvtu",
|
||||
"schema": "razvtu",
|
||||
"alias": "ВТУ",
|
||||
"map_service": "VUE_APP_GISAIS_URL_GK:/styles/raz_vtu/style.json"
|
||||
}
|
||||
}
|
||||
}
|
||||
34
db.py
34
db.py
@@ -2,6 +2,7 @@ from datetime import datetime
|
||||
from typing import List, Optional
|
||||
from sqlalchemy import create_engine, String, select, ForeignKey, Enum
|
||||
from sqlalchemy.orm import Session, DeclarativeBase, Mapped, mapped_column, relationship
|
||||
from config import Config
|
||||
|
||||
|
||||
def tow(day: int, hour: int, minute: int):
|
||||
@@ -27,6 +28,10 @@ class User(Base):
|
||||
back_populates='user', cascade='all, delete-orphan'
|
||||
)
|
||||
|
||||
income_branches: Mapped[List['IncomeBranch']] = relationship(
|
||||
back_populates='user', cascade='all, delete-orphan'
|
||||
)
|
||||
|
||||
schedule: Mapped[List['Schedule']] = relationship(
|
||||
back_populates='user', cascade='all, delete-orphan'
|
||||
)
|
||||
@@ -49,6 +54,7 @@ class User(Base):
|
||||
'profiles': [x.to_dict() for x in self.profiles],
|
||||
'schedule': [x.to_dict() for x in self.schedule],
|
||||
'queue': [x.to_dict() for x in self.queue],
|
||||
'income_branches': [x.to_dict() for x in self.income_branches],
|
||||
}
|
||||
|
||||
def is_active_now(self):
|
||||
@@ -69,17 +75,41 @@ class Profile(Base):
|
||||
id: Mapped[int] = mapped_column(primary_key=True)
|
||||
user_id: Mapped[int] = mapped_column(ForeignKey('users.id'))
|
||||
scheme: Mapped[str]
|
||||
json: Mapped[str]
|
||||
branch: Mapped[str] = mapped_column(String, nullable=True)
|
||||
json: Mapped[str] = mapped_column(String, nullable=True)
|
||||
no_files: Mapped[bool]
|
||||
|
||||
user: Mapped['User'] = relationship(back_populates='profiles')
|
||||
|
||||
def to_dict(self) -> dict:
|
||||
return {
|
||||
'id': self.id,
|
||||
'scheme': self.scheme,
|
||||
'branch': self.branch,
|
||||
'json': self.json,
|
||||
}
|
||||
|
||||
|
||||
class IncomeBranch(Base):
|
||||
__tablename__ = 'income_branches'
|
||||
|
||||
id: Mapped[int] = mapped_column(primary_key=True)
|
||||
user_id: Mapped[int] = mapped_column(ForeignKey('users.id'))
|
||||
scheme: Mapped[str]
|
||||
branch: Mapped[str]
|
||||
local_scheme: Mapped[str]
|
||||
|
||||
user: Mapped['User'] = relationship(back_populates='income_branches')
|
||||
|
||||
def to_dict(self) -> dict:
|
||||
return {
|
||||
'id': self.id,
|
||||
'scheme': self.scheme,
|
||||
'branch': self.branch,
|
||||
'local_scheme': self.local_scheme,
|
||||
}
|
||||
|
||||
|
||||
class Schedule(Base):
|
||||
__tablename__ = 'schedule'
|
||||
|
||||
@@ -133,4 +163,4 @@ class Schemas(Base):
|
||||
|
||||
|
||||
def connect_db():
|
||||
return create_engine("postgresql+psycopg://postgres:Root12345678@10.10.8.83:32101/db")
|
||||
return create_engine(f"postgresql+psycopg://{Config.pg_username}:{Config.pg_password}@{Config.pg_host}:{Config.pg_port}/{Config.pg_dbname}")
|
||||
|
||||
BIN
deps/pygost-5.12/dist/pygost-5.12-py3.11.egg
vendored
BIN
deps/pygost-5.12/dist/pygost-5.12-py3.11.egg
vendored
Binary file not shown.
9
infra.py
Normal file
9
infra.py
Normal file
@@ -0,0 +1,9 @@
|
||||
from libcommon import *
|
||||
from libdatabase import *
|
||||
from libgeodata import *
|
||||
from libgeodriver import *
|
||||
from libgeodesy import *
|
||||
from libgeom import *
|
||||
from libipdutilities import *
|
||||
from liboodriver import *
|
||||
|
||||
389
main.py
389
main.py
@@ -1,7 +1,7 @@
|
||||
import asyncio
|
||||
from tempfile import TemporaryDirectory
|
||||
from typing import Optional, Any
|
||||
|
||||
import pika
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
from queue import Queue
|
||||
@@ -11,11 +11,11 @@ from uuid import uuid4, UUID
|
||||
from xmlrpc.server import SimpleXMLRPCServer
|
||||
from xmlrpc.client import ServerProxy
|
||||
import logging
|
||||
import os
|
||||
import io
|
||||
import zlib
|
||||
import os.path
|
||||
import requests
|
||||
from reqs.graphql import get_catalog, get_object
|
||||
from reqs_graphql import get_catalog, get_object
|
||||
from pygost import gost34112012256
|
||||
import xml.etree.ElementTree as ET
|
||||
from reqs.request_xml_service import RequestXmlService
|
||||
@@ -26,18 +26,28 @@ import boto3
|
||||
import db
|
||||
from sqlalchemy.orm import Session
|
||||
from fastapi import FastAPI, Response, Form, UploadFile, File, Request
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
import uvicorn
|
||||
from typing_extensions import Annotated
|
||||
import pathlib
|
||||
from infra import *
|
||||
from shutil import make_archive
|
||||
|
||||
|
||||
NEW_REPLICATION_REQUEST = 99
|
||||
NEW_COMMIT_REQUEST = 98
|
||||
NEW_COMMIT_RESPONSE = 1098
|
||||
NEW_DATA_REQUEST = 97
|
||||
NEW_DATA_RESPONSE = 1097
|
||||
|
||||
|
||||
tasks = Queue()
|
||||
connected = set()
|
||||
|
||||
connection: Optional[pika.BlockingConnection] = None
|
||||
channel: Optional[Any] = None
|
||||
server: Optional[SimpleXMLRPCServer] = None
|
||||
|
||||
logger = logging.getLogger('xmlrpcserver')
|
||||
|
||||
|
||||
@@ -61,6 +71,24 @@ def upload_file(filename: str, key: str, bucket: str):
|
||||
client.put_object(Body=f.read(), Bucket=bucket, Key=key)
|
||||
|
||||
|
||||
def get_branch(bndname: str, scheme: str):
|
||||
conn = db.connect_db()
|
||||
with Session(conn) as session:
|
||||
item = session.query(db.IncomeBranch).filter_by(scheme=scheme).join(db.User).filter_by(bndname=bndname).one_or_none()
|
||||
if item:
|
||||
return item.branch, item.local_scheme
|
||||
return None, None
|
||||
|
||||
|
||||
def get_profile(bndname: str, scheme: str):
|
||||
conn = db.connect_db()
|
||||
with Session(conn) as session:
|
||||
item = session.query(db.Profile).filter_by(scheme=scheme).join(db.User).filter_by(bndname=bndname).one_or_none()
|
||||
if item:
|
||||
return item
|
||||
return None
|
||||
|
||||
|
||||
def run_tasks():
|
||||
logger.debug('Task thread started.')
|
||||
while True:
|
||||
@@ -70,17 +98,17 @@ def run_tasks():
|
||||
|
||||
def replication_task():
|
||||
while True:
|
||||
print()
|
||||
conn = db.connect_db()
|
||||
# logging.warning(connected)
|
||||
with Session(conn) as session:
|
||||
for bndname in connected:
|
||||
for item in session.query(db.Queue).join(db.Queue.user).filter_by(bndname=bndname).all():
|
||||
if item.user.is_active_now():
|
||||
if item.user.newbnd:
|
||||
replication(bndname, item.commit_id, item.schema)
|
||||
else:
|
||||
replication_old(bndname, item.commit_id, item.schema)
|
||||
session.delete(item)
|
||||
for item in session.query(db.Queue).join(db.Queue.user).all():
|
||||
bndname = item.user.bndname
|
||||
if item.user.is_active_now() and bndname != Config.self_bnd:
|
||||
if item.user.newbnd:
|
||||
replication(bndname, item.commit_id, item.schema)
|
||||
else:
|
||||
replication_old(bndname, item.commit_id, item.schema)
|
||||
session.delete(item)
|
||||
session.commit()
|
||||
time.sleep(60)
|
||||
|
||||
@@ -109,7 +137,7 @@ def crc32(filename, chunk_size=65536):
|
||||
return "%08X" % (checksum & 0xFFFFFFFF)
|
||||
|
||||
|
||||
def send_object_replication(ws, ids):
|
||||
def send_object_replication(bndname, ws, ids):
|
||||
qu = GroupQuery(Envelope())
|
||||
query = GroupQuery(Envelope.world())
|
||||
query.setUids(ids)
|
||||
@@ -143,12 +171,12 @@ def send_object_replication(ws, ids):
|
||||
'Name': c1000[0]['fileName'],
|
||||
'Type': 'OOD',
|
||||
'metadata_version': '1',
|
||||
'source': 'bnd127',
|
||||
'source': Config.self_bnd,
|
||||
'system_date': str(created_date.timestamp()),
|
||||
})
|
||||
xml_version = ET.SubElement(xml_objects, 'version', {
|
||||
'object_id': chart['uid'].replace('-', ''),
|
||||
'source': 'bnd127',
|
||||
'source': Config.self_bnd,
|
||||
'system_date': str(created_date.timestamp()),
|
||||
'version': '1.0',
|
||||
'version_id': chart['uid'].replace('-', ''),
|
||||
@@ -197,7 +225,7 @@ def send_object_replication(ws, ids):
|
||||
})
|
||||
params = {
|
||||
'from': f'tcp://{Config.self_bnd}',
|
||||
'to': f'tcp://{Config.remote_bnd}',
|
||||
'to': f'tcp://{bndname}',
|
||||
'ts_added': date.timestamp(),
|
||||
'user_id': '0',
|
||||
'query_type': NEW_REPLICATION_REQUEST,
|
||||
@@ -226,22 +254,25 @@ def replication_old(bnd_name: str, commit_id: str, schema: str):
|
||||
logger.warning(res)
|
||||
created, updated, _ = ws.changes(commit_id)
|
||||
ids = list(set(created) | set(updated))
|
||||
send_object_replication(ws, ids)
|
||||
send_object_replication(bnd_name, ws, ids)
|
||||
ws.clearData(True)
|
||||
logger.warning('Replication to old bnd is sent')
|
||||
|
||||
|
||||
def replication(bnd_name: str, commit_id: str, schema: str):
|
||||
logging.warning(f'{bnd_name} {commit_id} {schema}')
|
||||
date = datetime.datetime.now()
|
||||
rxmls = RequestXmlService()
|
||||
res_id = uuid4().hex
|
||||
|
||||
profile = get_profile(bndname=bnd_name, scheme=schema)
|
||||
|
||||
res = rxmls.get_request_document(res_id, None)
|
||||
rxmls.set_result(res, 0, '')
|
||||
ET.SubElement(res, 'replication', {'id': commit_id, 'scheme': schema})
|
||||
response_params = {
|
||||
'from': f'tcp://{Config.self_bnd}',
|
||||
'to': f'tcp://{Config.remote_bnd}',
|
||||
'to': f'tcp://{bnd_name}',
|
||||
'ts_added': date.timestamp(),
|
||||
'user_id': '0',
|
||||
'query_type': NEW_REPLICATION_REQUEST,
|
||||
@@ -287,20 +318,23 @@ def replication(bnd_name: str, commit_id: str, schema: str):
|
||||
qu.setUids(created)
|
||||
ws.load(qu, uids)
|
||||
exported_files = []
|
||||
for feature_uid in uids:
|
||||
feature = ws.featureByUid(feature_uid)
|
||||
if not feature:
|
||||
continue
|
||||
for attr in feature.attributes('c1000'):
|
||||
updated_files.append(variantToFileValue(attr.val()))
|
||||
# updated_files.append(feature_uid)
|
||||
for x in updated_files:
|
||||
exported_files.append({
|
||||
'key': x.key,
|
||||
'bucket': x.bucket,
|
||||
'filename': x.fileName,
|
||||
})
|
||||
download_file(x.key, x.bucket, os.path.join(z.dirname, x.key))
|
||||
if not profile or not profile.no_files:
|
||||
for feature_uid in uids:
|
||||
feature = ws.featureByUid(feature_uid)
|
||||
if not feature:
|
||||
continue
|
||||
for attr in feature.attributes('c1000'):
|
||||
updated_files.append(variantToFileValue(attr.val()))
|
||||
# updated_files.append(feature_uid)
|
||||
for x in updated_files:
|
||||
exported_files.append({
|
||||
'key': x.key,
|
||||
'bucket': x.bucket,
|
||||
'filename': x.fileName,
|
||||
})
|
||||
fp = os.path.join(z.dirname, x.key)
|
||||
os.makedirs(os.path.dirname(fp), exist_ok=True)
|
||||
download_file(x.key, x.bucket, fp)
|
||||
with open(os.path.join(z.dirname, 'export_files.json'), 'w') as f:
|
||||
f.write(json.dumps(exported_files))
|
||||
ws.clearData(True)
|
||||
@@ -318,18 +352,26 @@ def replication(bnd_name: str, commit_id: str, schema: str):
|
||||
|
||||
def pika_callback(ch, method, properties, body):
|
||||
commit_info = json.loads(body)
|
||||
logging.warning(commit_info)
|
||||
schema = commit_info.get('schema') or Config.oodb_schema
|
||||
commit = commit_info['commit']
|
||||
conn = db.connect_db()
|
||||
with Session(conn) as session:
|
||||
for user in session.query(db.User).filter(db.User.active == True, db.User.upstream == False).all():
|
||||
item = db.Queue(user_id=user.id, commit_id=commit, schema=schema)
|
||||
session.add(item)
|
||||
for user in session.query(db.User).filter(db.User.active == True).all():
|
||||
if user.bndname == Config.self_bnd:
|
||||
continue
|
||||
profiles = {x.scheme: x.to_dict() for x in user.profiles}
|
||||
if len(profiles) == 0 or schema in profiles:
|
||||
item = db.Queue(user_id=user.id, commit_id=commit, schema=schema)
|
||||
logging.warning(item)
|
||||
session.add(item)
|
||||
session.commit()
|
||||
ch.basic_ack(delivery_tag=method.delivery_tag)
|
||||
|
||||
|
||||
def pika_task():
|
||||
global connection
|
||||
global channel
|
||||
connection = pika.BlockingConnection(pika.URLParameters(Config.rabbit_conn))
|
||||
channel = connection.channel()
|
||||
channel.basic_consume(queue=Config.rabbit_queue, on_message_callback=pika_callback)
|
||||
@@ -337,6 +379,7 @@ def pika_task():
|
||||
channel.start_consuming()
|
||||
|
||||
|
||||
|
||||
def list_contents(dir_name):
|
||||
logger.warning('list_contents(%s)', dir_name)
|
||||
return os.listdir(dir_name)
|
||||
@@ -436,7 +479,7 @@ def get_objects(params, files, url):
|
||||
for file in obj['properties'].get('c1000', []):
|
||||
if not main_filename:
|
||||
main_filename = file['fileName']
|
||||
res = requests.get(f'https://gql.ivazh.ru/item/{file["key"]}')
|
||||
res = requests.get(Config.gql_download, params={'item_id': file["key"]})
|
||||
zipf.writestr(f'{main_filename}/{file["fileName"]}', res.content)
|
||||
zipf.close()
|
||||
response_files = [{'name': filename, 'url': filepath, 'size': os.path.getsize(filepath)}]
|
||||
@@ -537,34 +580,187 @@ def put_object(params, files, url):
|
||||
ws.transaction()
|
||||
res = ws.save()
|
||||
ws.commit(f'Putobject from {params["to"]}')
|
||||
ws.clearData(True)
|
||||
|
||||
|
||||
def apply_commits(params, files, url):
|
||||
logger.warning(params, files, url)
|
||||
logging.warning("Apply commits")
|
||||
logging.warning(params)
|
||||
assert len(files) == 1
|
||||
file = files[0]
|
||||
dir = TemporaryDirectory()
|
||||
with zipfile.ZipFile(file, 'r') as zip_ref:
|
||||
r = requests.get(file['url'])
|
||||
with zipfile.ZipFile(io.BytesIO(r.content)) as zip_ref:
|
||||
zip_ref.extractall(dir.name)
|
||||
req = ET.fromstring(params['query_data'])
|
||||
repl = req.find('replication')
|
||||
scheme = repl.get('scheme')
|
||||
commit = repl.get('id')
|
||||
logger.warning(scheme, commit)
|
||||
os.path.join(dir.name, 'export.o5c')
|
||||
bnd = params['from'].replace('tcp://', '')
|
||||
branch, new_scheme = get_branch(bnd, scheme)
|
||||
if new_scheme:
|
||||
scheme = new_scheme
|
||||
con = OOConnectionParams(scheme, Config.oodb_host, Config.oodb_port, Config.oodb_dbname,
|
||||
Config.oodb_username, Config.oodb_passwd, scheme)
|
||||
ws = OODBWorkspace.ws(scheme)
|
||||
logging.warning("Connected to schema")
|
||||
if Config.ws_rabbit_params and not ws.hasProducer():
|
||||
ws.setProducer(Config.ws_rabbit_params['host'], Config.ws_rabbit_params['port'],
|
||||
Config.ws_rabbit_params['exchange'], Config.ws_rabbit_params['user'],
|
||||
Config.ws_rabbit_params['password'])
|
||||
ws.setCurrentUser(bnd)
|
||||
logging.warning("Set branch if needed")
|
||||
if branch:
|
||||
logging.warning(branch)
|
||||
ws.switchBranch(branch)
|
||||
if not ws.isInit():
|
||||
res = ws.init(con)
|
||||
logger.warning(res)
|
||||
oe = IpdExporter(ws)
|
||||
oe.improtFromOsm(os.path.join(dir.name, 'export.o5c'))
|
||||
oe.improtFromMbtiles(os.path.join(dir.name, 'export.mbtiles'))
|
||||
with open('export_files.json', 'r') as f:
|
||||
files_data = json.load(f)
|
||||
for file_data in files_data:
|
||||
upload_file(file_data['filename'], file_data['key'], file_data['bucket'])
|
||||
logging.warning("Importing...")
|
||||
if not oe.importChanges(os.path.join(dir.name, 'export.o5c'), os.path.join(dir.name, 'export.mbtiles')):
|
||||
logging.warning(f'Error importing commit {commit}: {oe.lastError().text()}')
|
||||
else:
|
||||
logging.warning(f'Importing commit {commit} finished successfully')
|
||||
with open(os.path.join(dir.name, 'export_files.json'), 'r') as f:
|
||||
files_data = json.load(f)
|
||||
for file_data in files_data:
|
||||
upload_file(os.path.join(dir.name, file_data['key']), file_data['key'], file_data['bucket'])
|
||||
logging.warning("Finished import")
|
||||
ws.clearData(True)
|
||||
|
||||
|
||||
def get_commit(params, files, url):
|
||||
date = datetime.datetime.now()
|
||||
rxmls = RequestXmlService()
|
||||
req = ET.fromstring(params['query_data'])
|
||||
req_id = rxmls.get_request_uuid(req)
|
||||
res_id = uuid4().hex
|
||||
res_doc = rxmls.get_request_document(res_id, req_id)
|
||||
schema = req.find('currentCommit').get('scheme')
|
||||
con = OOConnectionParams(schema, Config.oodb_host, Config.oodb_port, Config.oodb_dbname,
|
||||
Config.oodb_username, Config.oodb_passwd, schema)
|
||||
ws = OODBWorkspace.ws(schema)
|
||||
if not ws.isInit():
|
||||
res = ws.init(con)
|
||||
logger.warning(res)
|
||||
ET.SubElement(res_doc, 'commit', {'id': ws.currentCommit(), 'schema': schema})
|
||||
|
||||
response_params = {
|
||||
'from': params['to'],
|
||||
'to': params['from'],
|
||||
'ts_added': date.timestamp(),
|
||||
'user_id': '1',
|
||||
'user_id_to': 0,
|
||||
'query_type': NEW_COMMIT_RESPONSE,
|
||||
'query_data': ET.tostring(res_doc, encoding='unicode', xml_declaration=True)
|
||||
}
|
||||
proxy = ServerProxy(url)
|
||||
proxy.send(response_params, [], Config.ret_path)
|
||||
|
||||
|
||||
def query_commits(params, files, url):
|
||||
req = ET.fromstring(params['query_data'])
|
||||
commit_el = req.find('commit')
|
||||
commit_id = commit_el.get('id')
|
||||
schema = commit_el.get('schema')
|
||||
bnd = params['from'].replace('tcp://', '')
|
||||
|
||||
con = OOConnectionParams(schema, Config.oodb_host, Config.oodb_port, Config.oodb_dbname,
|
||||
Config.oodb_username, Config.oodb_passwd, schema)
|
||||
ws = OODBWorkspace.ws(schema)
|
||||
if not ws.isInit():
|
||||
res = ws.init(con)
|
||||
logger.warning(res)
|
||||
schema_commits = ws.commits(ws.branch())
|
||||
logger.warning(schema_commits)
|
||||
if commit_id not in schema_commits:
|
||||
logger.warning(f'Error in commits in schema {schema}: no commit {commit_id}')
|
||||
return
|
||||
logger.warning(schema_commits[schema_commits.index(commit_id) + 1:])
|
||||
|
||||
conn = db.connect_db()
|
||||
with Session(conn) as session:
|
||||
for commit in schema_commits[schema_commits.index(commit_id) + 1:]:
|
||||
for user in session.query(db.User).filter(db.User.bndname == bnd, db.User.active == True).all():
|
||||
if user.bndname == Config.self_bnd:
|
||||
continue
|
||||
profiles = {x.scheme: x.to_dict() for x in user.profiles}
|
||||
if len(profiles) == 0 or schema in profiles:
|
||||
item = db.Queue(user_id=user.id, commit_id=commit, schema=schema)
|
||||
logging.warning(item)
|
||||
session.add(item)
|
||||
session.commit()
|
||||
|
||||
|
||||
def get_data(params, files, url):
|
||||
date = datetime.datetime.now()
|
||||
rxmls = RequestXmlService()
|
||||
req = ET.fromstring(params['query_data'])
|
||||
req_id = rxmls.get_request_uuid(req)
|
||||
res_id = uuid4().hex
|
||||
res = rxmls.get_request_document(res_id, req_id)
|
||||
rxmls.set_result(res, 0, '')
|
||||
request_string = req.find('data').text
|
||||
|
||||
OODBWorkspaceFactory.init('config/workspaces.json')
|
||||
conf = ResponseWorkerConfig('config/response2.json', 'config/workspaces.json')
|
||||
worker = ResponseWorker(conf)
|
||||
fn = uuid4().hex
|
||||
dir = os.path.join(os.getcwd(), 'tmp', fn)
|
||||
os.makedirs(dir)
|
||||
worker.makeResponse(request_string, dir)
|
||||
make_archive(dir, 'zip', dir, '.')
|
||||
|
||||
rxmls.set_result(res, 0, '')
|
||||
response_params = {
|
||||
'from': params['to'],
|
||||
'to': params['from'],
|
||||
'ts_added': date.timestamp(),
|
||||
'user_id': '1',
|
||||
'user_id_to': params['user_id'],
|
||||
'query_type': NEW_DATA_RESPONSE,
|
||||
'query_data': ET.tostring(res, encoding='unicode', xml_declaration=True)
|
||||
}
|
||||
filename = fn + '.zip'
|
||||
filepath = os.path.join(os.getcwd(), 'tmp', filename)
|
||||
response_files = [{'name': filename, 'url': filepath, 'size': os.path.getsize(filepath)}]
|
||||
proxy = ServerProxy(url)
|
||||
proxy.send(response_params, response_files, Config.ret_path)
|
||||
|
||||
|
||||
def receive_data(params, files, url):
|
||||
req = ET.fromstring(params['query_data'])
|
||||
commit_el = req.find('commit')
|
||||
commit_id = commit_el.get('id')
|
||||
schema = commit_el.get('schema')
|
||||
bnd = params['from'].replace('tcp://', '')
|
||||
|
||||
con = OOConnectionParams(schema, Config.oodb_host, Config.oodb_port, Config.oodb_dbname,
|
||||
Config.oodb_username, Config.oodb_passwd, schema)
|
||||
ws = OODBWorkspace.ws(schema)
|
||||
if not ws.isInit():
|
||||
res = ws.init(con)
|
||||
logger.warning(res)
|
||||
schema_commits = ws.commits(ws.branch())
|
||||
logger.warning(schema_commits)
|
||||
if commit_id not in schema_commits:
|
||||
logger.warning(f'Error in commits in schema {schema}: no commit {commit_id}')
|
||||
return
|
||||
logger.warning(schema_commits[schema_commits.index(commit_id) + 1:])
|
||||
|
||||
conn = db.connect_db()
|
||||
with Session(conn) as session:
|
||||
for commit in schema_commits[schema_commits.index(commit_id) + 1:]:
|
||||
for user in session.query(db.User).filter(db.User.bndname == bnd, db.User.active == True).all():
|
||||
if user.bndname == Config.self_bnd:
|
||||
continue
|
||||
profiles = {x.scheme: x.to_dict() for x in user.profiles}
|
||||
if len(profiles) == 0 or schema in profiles:
|
||||
item = db.Queue(user_id=user.id, commit_id=commit, schema=schema)
|
||||
logging.warning(item)
|
||||
session.add(item)
|
||||
session.commit()
|
||||
|
||||
|
||||
def run_task(query_type, params, files, url):
|
||||
@@ -578,6 +774,14 @@ def run_task(query_type, params, files, url):
|
||||
tasks.put(lambda: put_object(params, files, url))
|
||||
if query_type == NEW_REPLICATION_REQUEST:
|
||||
tasks.put(lambda: apply_commits(params, files, url))
|
||||
if query_type == NEW_COMMIT_REQUEST:
|
||||
tasks.put(lambda: get_commit(params, files, url))
|
||||
if query_type == NEW_COMMIT_RESPONSE:
|
||||
tasks.put(lambda: query_commits(params, files, url))
|
||||
if query_type == NEW_DATA_REQUEST:
|
||||
tasks.put(lambda: get_data(params, files, url))
|
||||
if query_type == NEW_DATA_RESPONSE:
|
||||
tasks.put(lambda: receive_data(params, files, url))
|
||||
|
||||
|
||||
def accept(params, files, url):
|
||||
@@ -608,7 +812,8 @@ def bnd_disconnected(bnd_name: str):
|
||||
|
||||
|
||||
def xmlrpc_task():
|
||||
server = SimpleXMLRPCServer(('0.0.0.0', 9000), logRequests=False, allow_none=True)
|
||||
global server
|
||||
server = SimpleXMLRPCServer(('0.0.0.0', 9000), logRequests=True, allow_none=True)
|
||||
server.register_function(list_contents)
|
||||
server.register_function(aud_add)
|
||||
server.register_function(auth_response)
|
||||
@@ -622,6 +827,12 @@ def xmlrpc_task():
|
||||
|
||||
|
||||
app = FastAPI()
|
||||
app.add_middleware(CORSMiddleware,
|
||||
allow_origins=['http://10.10.8.24:3000'],
|
||||
allow_credentials=True,
|
||||
allow_methods=['*'],
|
||||
allow_headers=['*']
|
||||
)
|
||||
|
||||
|
||||
@app.get("/")
|
||||
@@ -675,8 +886,67 @@ async def fa_put_object(response: Response,
|
||||
put_object(request_params, files, None)
|
||||
return {"Upload": "Ok"}
|
||||
|
||||
@app.get("/cr")
|
||||
async def correction_replication(bnd_name: str, schema: str):
|
||||
|
||||
date = datetime.datetime.now()
|
||||
rxmls = RequestXmlService()
|
||||
res_id = uuid4().hex
|
||||
|
||||
res = rxmls.get_request_document(res_id, None)
|
||||
ET.SubElement(res, 'currentCommit', {'scheme': schema})
|
||||
params = {
|
||||
'from': f'tcp://{Config.self_bnd}',
|
||||
'to': f'tcp://{bnd_name}',
|
||||
'ts_added': date.timestamp(),
|
||||
'user_id': '0',
|
||||
'query_type': NEW_COMMIT_REQUEST,
|
||||
'query_data': ET.tostring(res, encoding='unicode', xml_declaration=True),
|
||||
}
|
||||
proxy = ServerProxy(Config.enserver)
|
||||
try:
|
||||
proxy.send(params, [], Config.ret_path)
|
||||
except:
|
||||
logger.error('Error sending')
|
||||
|
||||
|
||||
@app.get("/get_cr")
|
||||
async def correction_replication(bnd_name: str, schema: str):
|
||||
|
||||
date = datetime.datetime.now()
|
||||
rxmls = RequestXmlService()
|
||||
res_id = uuid4().hex
|
||||
|
||||
res = rxmls.get_request_document(res_id, None)
|
||||
|
||||
con = OOConnectionParams(schema, Config.oodb_host, Config.oodb_port, Config.oodb_dbname,
|
||||
Config.oodb_username, Config.oodb_passwd, schema)
|
||||
ws = OODBWorkspace.ws(schema)
|
||||
if not ws.isInit():
|
||||
res = ws.init(con)
|
||||
logger.warning(res)
|
||||
ET.SubElement(res, 'commit', {'id': ws.currentCommit(), 'schema': schema})
|
||||
|
||||
params = {
|
||||
'from': f'tcp://{Config.self_bnd}',
|
||||
'to': f'tcp://{bnd_name}',
|
||||
'ts_added': date.timestamp(),
|
||||
'user_id': '0',
|
||||
'query_type': NEW_COMMIT_RESPONSE,
|
||||
'query_data': ET.tostring(res, encoding='unicode', xml_declaration=True),
|
||||
}
|
||||
proxy = ServerProxy(Config.enserver)
|
||||
try:
|
||||
proxy.send(params, [], Config.ret_path)
|
||||
except:
|
||||
logger.error('Error sending')
|
||||
|
||||
|
||||
def main():
|
||||
global connection
|
||||
global server
|
||||
global channel
|
||||
|
||||
logger.setLevel(logging.INFO)
|
||||
logger.warning('Use Control-C to exit')
|
||||
|
||||
@@ -694,9 +964,11 @@ def main():
|
||||
|
||||
try:
|
||||
logger.warning('Start server')
|
||||
uvicorn.run(app, host="0.0.0.0", port=80)
|
||||
uvicorn.run(app, host="0.0.0.0", port=8000)
|
||||
except KeyboardInterrupt:
|
||||
logger.warning('Exiting')
|
||||
finally:
|
||||
server.server_close()
|
||||
|
||||
|
||||
def vers_key(e):
|
||||
@@ -721,6 +993,8 @@ def test():
|
||||
# Config.oodb_username, Config.oodb_passwd, Config.oodb_schema)
|
||||
# ws = OODBWorkspace.ws(Config.oodb_schema)
|
||||
# ws.init(con)
|
||||
# print(ws.currentCommit())
|
||||
# print(ws.commits(ws.branch()))
|
||||
# created, updated, deleted = ws.changes('2dad8c8a-d2db-4074-ab7a-c01c36ada2be')
|
||||
# qu = GroupQuery(Envelope())
|
||||
# qu.setUids(updated)
|
||||
@@ -740,7 +1014,22 @@ def test():
|
||||
# print(updated_files)
|
||||
# ws.clearData(True)
|
||||
# ws.close()
|
||||
replication_old('bnd128', '23c9a275-ec0f-481a-8437-f3e41e4fe4f5', 'documents_src')
|
||||
#replication_old('bnd128', '23c9a275-ec0f-481a-8437-f3e41e4fe4f5', 'documents_src')
|
||||
request_string = """
|
||||
{
|
||||
"type": "group_query",
|
||||
"data_source": "npd",
|
||||
"uids": [ "2e20130c-541a-4b9f-9efb-f2a0e8b10c33" ]
|
||||
}
|
||||
"""
|
||||
OODBWorkspaceFactory.init('config/workspaces.json')
|
||||
conf = ResponseWorkerConfig('config/response2.json', 'config/workspaces.json')
|
||||
worker = ResponseWorker(conf)
|
||||
fn = uuid4().hex
|
||||
dir = os.path.join('tmp', fn)
|
||||
os.makedirs(dir)
|
||||
worker.makeResponse(request_string, dir)
|
||||
make_archive(dir, 'zip', dir, '.')
|
||||
pass
|
||||
|
||||
|
||||
|
||||
@@ -1,8 +1,9 @@
|
||||
from gql import gql, Client
|
||||
from gql.transport.aiohttp import AIOHTTPTransport
|
||||
from config import Config
|
||||
|
||||
transport = AIOHTTPTransport(url="https://gql.ivazh.ru/graphql/")
|
||||
service = 'pdim'
|
||||
transport = AIOHTTPTransport(url=Config.gql_url)
|
||||
service = Config.gql_schema
|
||||
|
||||
|
||||
def get_classifier():
|
||||
@@ -18,7 +18,6 @@ graphql-core==3.3.0a3
|
||||
greenlet==2.0.2
|
||||
h11==0.14.0
|
||||
idna==3.4
|
||||
infra
|
||||
jmespath==1.0.1
|
||||
multidict==6.0.4
|
||||
pika==1.3.2
|
||||
|
||||
32
test.py
Normal file
32
test.py
Normal file
@@ -0,0 +1,32 @@
|
||||
import db
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
|
||||
def get_branch(bndname: str, scheme: str):
|
||||
conn = db.connect_db()
|
||||
with Session(conn) as session:
|
||||
item = session.query(db.IncomeBranch).filter_by(scheme=scheme).join(db.User).filter_by(bndname=bndname).one_or_none()
|
||||
if item:
|
||||
return item.branch
|
||||
return None
|
||||
|
||||
|
||||
def is_replication_scheme(bndname: str, scheme: str):
|
||||
conn = db.connect_db()
|
||||
with Session(conn) as session:
|
||||
item = session.query(db.User).filter_by(bndname=bndname).one_or_none()
|
||||
if not item:
|
||||
return False
|
||||
profiles = {x.scheme: x.to_dict() for x in item.profiles}
|
||||
if len(profiles) == 0 or scheme in profiles:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def main():
|
||||
# print(get_branch('bnd128', 'ood'))
|
||||
print(is_replication_scheme('bnd128', 'documents_src'))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
Reference in New Issue
Block a user