Correcting replication
This commit is contained in:
107
main.py
107
main.py
@@ -34,6 +34,8 @@ from infra import *
|
|||||||
|
|
||||||
|
|
||||||
NEW_REPLICATION_REQUEST = 99
|
NEW_REPLICATION_REQUEST = 99
|
||||||
|
NEW_COMMIT_REQUEST = 98
|
||||||
|
NEW_COMMIT_RESPONSE = 1098
|
||||||
|
|
||||||
|
|
||||||
tasks = Queue()
|
tasks = Queue()
|
||||||
@@ -600,6 +602,69 @@ def apply_commits(params, files, url):
|
|||||||
ws.clearData(True)
|
ws.clearData(True)
|
||||||
|
|
||||||
|
|
||||||
|
def get_commit(params, files, url):
|
||||||
|
date = datetime.datetime.now()
|
||||||
|
rxmls = RequestXmlService()
|
||||||
|
req = ET.fromstring(params['query_data'])
|
||||||
|
req_id = rxmls.get_request_uuid(req)
|
||||||
|
res_id = uuid4().hex
|
||||||
|
res_doc = rxmls.get_request_document(res_id, req_id)
|
||||||
|
schema = req.find('currentCommit').get('scheme')
|
||||||
|
con = OOConnectionParams(schema, Config.oodb_host, Config.oodb_port, Config.oodb_dbname,
|
||||||
|
Config.oodb_username, Config.oodb_passwd, schema)
|
||||||
|
ws = OODBWorkspace.ws(schema)
|
||||||
|
if not ws.isInit():
|
||||||
|
res = ws.init(con)
|
||||||
|
logger.warning(res)
|
||||||
|
ET.SubElement(res_doc, 'commit', {'id': ws.currentCommit(), 'schema': schema})
|
||||||
|
|
||||||
|
response_params = {
|
||||||
|
'from': params['to'],
|
||||||
|
'to': params['from'],
|
||||||
|
'ts_added': date.timestamp(),
|
||||||
|
'user_id': '1',
|
||||||
|
'user_id_to': 0,
|
||||||
|
'query_type': NEW_COMMIT_RESPONSE,
|
||||||
|
'query_data': ET.tostring(res_doc, encoding='unicode', xml_declaration=True)
|
||||||
|
}
|
||||||
|
proxy = ServerProxy(url)
|
||||||
|
proxy.send(response_params, [], Config.ret_path)
|
||||||
|
|
||||||
|
|
||||||
|
def query_commits(params, files, url):
|
||||||
|
req = ET.fromstring(params['query_data'])
|
||||||
|
commit_el = req.find('commit')
|
||||||
|
commit_id = commit_el.get('id')
|
||||||
|
schema = commit_el.get('schema')
|
||||||
|
bnd = params['from'].replace('tcp://', '')
|
||||||
|
|
||||||
|
con = OOConnectionParams(schema, Config.oodb_host, Config.oodb_port, Config.oodb_dbname,
|
||||||
|
Config.oodb_username, Config.oodb_passwd, schema)
|
||||||
|
ws = OODBWorkspace.ws(schema)
|
||||||
|
if not ws.isInit():
|
||||||
|
res = ws.init(con)
|
||||||
|
logger.warning(res)
|
||||||
|
schema_commits = ws.commits(ws.branch())
|
||||||
|
logger.warning(schema_commits)
|
||||||
|
if commit_id not in schema_commits:
|
||||||
|
logger.warning(f'Error in commits in schema {schema}: no commit {commit_id}')
|
||||||
|
return
|
||||||
|
logger.warning(schema_commits[schema_commits.index(commit_id) + 1:])
|
||||||
|
|
||||||
|
conn = db.connect_db()
|
||||||
|
with Session(conn) as session:
|
||||||
|
for commit in schema_commits[schema_commits.index(commit_id) + 1:]:
|
||||||
|
for user in session.query(db.User).filter(db.User.bndname == bnd, db.User.active == True).all():
|
||||||
|
if user.bndname == Config.self_bnd:
|
||||||
|
continue
|
||||||
|
profiles = {x.scheme: x.to_dict() for x in user.profiles}
|
||||||
|
if len(profiles) == 0 or schema in profiles:
|
||||||
|
item = db.Queue(user_id=user.id, commit_id=commit, schema=schema)
|
||||||
|
logging.warning(item)
|
||||||
|
session.add(item)
|
||||||
|
session.commit()
|
||||||
|
|
||||||
|
|
||||||
def run_task(query_type, params, files, url):
|
def run_task(query_type, params, files, url):
|
||||||
if query_type == 4:
|
if query_type == 4:
|
||||||
tasks.put(lambda: load_catalog(params, files, url))
|
tasks.put(lambda: load_catalog(params, files, url))
|
||||||
@@ -611,6 +676,10 @@ def run_task(query_type, params, files, url):
|
|||||||
tasks.put(lambda: put_object(params, files, url))
|
tasks.put(lambda: put_object(params, files, url))
|
||||||
if query_type == NEW_REPLICATION_REQUEST:
|
if query_type == NEW_REPLICATION_REQUEST:
|
||||||
tasks.put(lambda: apply_commits(params, files, url))
|
tasks.put(lambda: apply_commits(params, files, url))
|
||||||
|
if query_type == NEW_COMMIT_REQUEST:
|
||||||
|
tasks.put(lambda: get_commit(params, files, url))
|
||||||
|
if query_type == NEW_COMMIT_RESPONSE:
|
||||||
|
tasks.put(lambda: query_commits(params, files, url))
|
||||||
|
|
||||||
|
|
||||||
def accept(params, files, url):
|
def accept(params, files, url):
|
||||||
@@ -708,6 +777,30 @@ async def fa_put_object(response: Response,
|
|||||||
put_object(request_params, files, None)
|
put_object(request_params, files, None)
|
||||||
return {"Upload": "Ok"}
|
return {"Upload": "Ok"}
|
||||||
|
|
||||||
|
@app.get("/cr")
|
||||||
|
async def correction_replication(bnd_name: str, schema: str):
|
||||||
|
if Config.self_bnd != 'bnd127':
|
||||||
|
return
|
||||||
|
date = datetime.datetime.now()
|
||||||
|
rxmls = RequestXmlService()
|
||||||
|
res_id = uuid4().hex
|
||||||
|
|
||||||
|
res = rxmls.get_request_document(res_id, None)
|
||||||
|
ET.SubElement(res, 'currentCommit', {'scheme': schema})
|
||||||
|
params = {
|
||||||
|
'from': f'tcp://{Config.self_bnd}',
|
||||||
|
'to': f'tcp://{bnd_name}',
|
||||||
|
'ts_added': date.timestamp(),
|
||||||
|
'user_id': '0',
|
||||||
|
'query_type': NEW_COMMIT_REQUEST,
|
||||||
|
'query_data': ET.tostring(res, encoding='unicode', xml_declaration=True),
|
||||||
|
}
|
||||||
|
proxy = ServerProxy(Config.enserver)
|
||||||
|
try:
|
||||||
|
proxy.send(params, [], Config.ret_path)
|
||||||
|
except:
|
||||||
|
logger.error('Error sending')
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
logger.setLevel(logging.INFO)
|
logger.setLevel(logging.INFO)
|
||||||
@@ -727,7 +820,7 @@ def main():
|
|||||||
|
|
||||||
try:
|
try:
|
||||||
logger.warning('Start server')
|
logger.warning('Start server')
|
||||||
uvicorn.run(app, host="0.0.0.0", port=80)
|
uvicorn.run(app, host="0.0.0.0", port=8000)
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
logger.warning('Exiting')
|
logger.warning('Exiting')
|
||||||
|
|
||||||
@@ -750,10 +843,12 @@ def test():
|
|||||||
#get_metadata(params, files, url)
|
#get_metadata(params, files, url)
|
||||||
#get_catalog()
|
#get_catalog()
|
||||||
# auth_response('123', 'bnd127', False)
|
# auth_response('123', 'bnd127', False)
|
||||||
# con = OOConnectionParams(Config.oodb_schema, Config.oodb_host, Config.oodb_port, Config.oodb_dbname,
|
con = OOConnectionParams(Config.oodb_schema, Config.oodb_host, Config.oodb_port, Config.oodb_dbname,
|
||||||
# Config.oodb_username, Config.oodb_passwd, Config.oodb_schema)
|
Config.oodb_username, Config.oodb_passwd, Config.oodb_schema)
|
||||||
# ws = OODBWorkspace.ws(Config.oodb_schema)
|
ws = OODBWorkspace.ws(Config.oodb_schema)
|
||||||
# ws.init(con)
|
ws.init(con)
|
||||||
|
print(ws.currentCommit())
|
||||||
|
print(ws.commits(ws.branch()))
|
||||||
# created, updated, deleted = ws.changes('2dad8c8a-d2db-4074-ab7a-c01c36ada2be')
|
# created, updated, deleted = ws.changes('2dad8c8a-d2db-4074-ab7a-c01c36ada2be')
|
||||||
# qu = GroupQuery(Envelope())
|
# qu = GroupQuery(Envelope())
|
||||||
# qu.setUids(updated)
|
# qu.setUids(updated)
|
||||||
@@ -773,7 +868,7 @@ def test():
|
|||||||
# print(updated_files)
|
# print(updated_files)
|
||||||
# ws.clearData(True)
|
# ws.clearData(True)
|
||||||
# ws.close()
|
# ws.close()
|
||||||
replication_old('bnd128', '23c9a275-ec0f-481a-8437-f3e41e4fe4f5', 'documents_src')
|
#replication_old('bnd128', '23c9a275-ec0f-481a-8437-f3e41e4fe4f5', 'documents_src')
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user