Correcting replication

This commit is contained in:
Ivan Vazhenin
2023-12-24 18:28:29 +03:00
parent 794ad96bf3
commit b223733b72
2 changed files with 16 additions and 4 deletions

View File

@@ -1,6 +1,6 @@
class Config: class Config:
ret_path: str = 'http://10.10.8.81:9000/' ret_path: str = 'http://10.10.8.60:9000/'
self_bnd: str = 'bnd127' self_bnd: str = 'bnd127'
enserver: str = 'http://127.0.0.1:7000/xmlrpc' enserver: str = 'http://127.0.0.1:7000/xmlrpc'
remote_bnd: str = 'bnd128' remote_bnd: str = 'bnd128'

18
main.py
View File

@@ -27,6 +27,7 @@ import boto3
import db import db
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from fastapi import FastAPI, Response, Form, UploadFile, File, Request from fastapi import FastAPI, Response, Form, UploadFile, File, Request
from fastapi.middleware.cors import CORSMiddleware
import uvicorn import uvicorn
from typing_extensions import Annotated from typing_extensions import Annotated
import pathlib import pathlib
@@ -69,7 +70,7 @@ def get_branch(bndname: str, scheme: str):
with Session(conn) as session: with Session(conn) as session:
item = session.query(db.IncomeBranch).filter_by(scheme=scheme).join(db.User).filter_by(bndname=bndname).one_or_none() item = session.query(db.IncomeBranch).filter_by(scheme=scheme).join(db.User).filter_by(bndname=bndname).one_or_none()
if item: if item:
return item.branch,item.local_scheme return item.branch, item.local_scheme
return None, None return None, None
@@ -562,6 +563,8 @@ def put_object(params, files, url):
def apply_commits(params, files, url): def apply_commits(params, files, url):
logging.warning("Apply commits")
logging.warning(params)
assert len(files) == 1 assert len(files) == 1
file = files[0] file = files[0]
dir = TemporaryDirectory() dir = TemporaryDirectory()
@@ -579,11 +582,13 @@ def apply_commits(params, files, url):
con = OOConnectionParams(scheme, Config.oodb_host, Config.oodb_port, Config.oodb_dbname, con = OOConnectionParams(scheme, Config.oodb_host, Config.oodb_port, Config.oodb_dbname,
Config.oodb_username, Config.oodb_passwd, scheme) Config.oodb_username, Config.oodb_passwd, scheme)
ws = OODBWorkspace.ws(scheme) ws = OODBWorkspace.ws(scheme)
logging.warning("Connected to schema")
if Config.ws_rabbit_params and not ws.hasProducer(): if Config.ws_rabbit_params and not ws.hasProducer():
ws.setProducer(Config.ws_rabbit_params['host'], Config.ws_rabbit_params['port'], ws.setProducer(Config.ws_rabbit_params['host'], Config.ws_rabbit_params['port'],
Config.ws_rabbit_params['exchange'], Config.ws_rabbit_params['user'], Config.ws_rabbit_params['exchange'], Config.ws_rabbit_params['user'],
Config.ws_rabbit_params['password']) Config.ws_rabbit_params['password'])
ws.setCurrentUser(bnd) ws.setCurrentUser(bnd)
logging.warning("Set branch if needed")
if branch: if branch:
logging.warning(branch) logging.warning(branch)
ws.switchBranch(branch) ws.switchBranch(branch)
@@ -591,6 +596,7 @@ def apply_commits(params, files, url):
res = ws.init(con) res = ws.init(con)
logger.warning(res) logger.warning(res)
oe = IpdExporter(ws) oe = IpdExporter(ws)
logging.warning("Importing...")
if not oe.importChanges(os.path.join(dir.name, 'export.o5c'), os.path.join(dir.name, 'export.mbtiles')): if not oe.importChanges(os.path.join(dir.name, 'export.o5c'), os.path.join(dir.name, 'export.mbtiles')):
logging.warning(f'Error importing commit {commit}: {oe.lastError().text()}') logging.warning(f'Error importing commit {commit}: {oe.lastError().text()}')
else: else:
@@ -599,6 +605,7 @@ def apply_commits(params, files, url):
files_data = json.load(f) files_data = json.load(f)
for file_data in files_data: for file_data in files_data:
upload_file(os.path.join(dir.name, file_data['key']), file_data['key'], file_data['bucket']) upload_file(os.path.join(dir.name, file_data['key']), file_data['key'], file_data['bucket'])
logging.warning("Finished import")
ws.clearData(True) ws.clearData(True)
@@ -724,6 +731,12 @@ def xmlrpc_task():
app = FastAPI() app = FastAPI()
app.add_middleware(CORSMiddleware,
allow_origins=['http://10.10.8.24:3000'],
allow_credentials=True,
allow_methods=['*'],
allow_headers=['*']
)
@app.get("/") @app.get("/")
@@ -779,8 +792,7 @@ async def fa_put_object(response: Response,
@app.get("/cr") @app.get("/cr")
async def correction_replication(bnd_name: str, schema: str): async def correction_replication(bnd_name: str, schema: str):
if Config.self_bnd != 'bnd127':
return
date = datetime.datetime.now() date = datetime.datetime.now()
rxmls = RequestXmlService() rxmls = RequestXmlService()
res_id = uuid4().hex res_id = uuid4().hex