diff --git a/main.py b/main.py index d721f19..c3d2a90 100644 --- a/main.py +++ b/main.py @@ -34,6 +34,8 @@ from infra import * NEW_REPLICATION_REQUEST = 99 +NEW_COMMIT_REQUEST = 98 +NEW_COMMIT_RESPONSE = 1098 tasks = Queue() @@ -600,6 +602,69 @@ def apply_commits(params, files, url): ws.clearData(True) +def get_commit(params, files, url): + date = datetime.datetime.now() + rxmls = RequestXmlService() + req = ET.fromstring(params['query_data']) + req_id = rxmls.get_request_uuid(req) + res_id = uuid4().hex + res_doc = rxmls.get_request_document(res_id, req_id) + schema = req.find('currentCommit').get('scheme') + con = OOConnectionParams(schema, Config.oodb_host, Config.oodb_port, Config.oodb_dbname, + Config.oodb_username, Config.oodb_passwd, schema) + ws = OODBWorkspace.ws(schema) + if not ws.isInit(): + res = ws.init(con) + logger.warning(res) + ET.SubElement(res_doc, 'commit', {'id': ws.currentCommit(), 'schema': schema}) + + response_params = { + 'from': params['to'], + 'to': params['from'], + 'ts_added': date.timestamp(), + 'user_id': '1', + 'user_id_to': 0, + 'query_type': NEW_COMMIT_RESPONSE, + 'query_data': ET.tostring(res_doc, encoding='unicode', xml_declaration=True) + } + proxy = ServerProxy(url) + proxy.send(response_params, [], Config.ret_path) + + +def query_commits(params, files, url): + req = ET.fromstring(params['query_data']) + commit_el = req.find('commit') + commit_id = commit_el.get('id') + schema = commit_el.get('schema') + bnd = params['from'].replace('tcp://', '') + + con = OOConnectionParams(schema, Config.oodb_host, Config.oodb_port, Config.oodb_dbname, + Config.oodb_username, Config.oodb_passwd, schema) + ws = OODBWorkspace.ws(schema) + if not ws.isInit(): + res = ws.init(con) + logger.warning(res) + schema_commits = ws.commits(ws.branch()) + logger.warning(schema_commits) + if commit_id not in schema_commits: + logger.warning(f'Error in commits in schema {schema}: no commit {commit_id}') + return + logger.warning(schema_commits[schema_commits.index(commit_id) + 1:]) + + conn = db.connect_db() + with Session(conn) as session: + for commit in schema_commits[schema_commits.index(commit_id) + 1:]: + for user in session.query(db.User).filter(db.User.bndname == bnd, db.User.active == True).all(): + if user.bndname == Config.self_bnd: + continue + profiles = {x.scheme: x.to_dict() for x in user.profiles} + if len(profiles) == 0 or schema in profiles: + item = db.Queue(user_id=user.id, commit_id=commit, schema=schema) + logging.warning(item) + session.add(item) + session.commit() + + def run_task(query_type, params, files, url): if query_type == 4: tasks.put(lambda: load_catalog(params, files, url)) @@ -611,6 +676,10 @@ def run_task(query_type, params, files, url): tasks.put(lambda: put_object(params, files, url)) if query_type == NEW_REPLICATION_REQUEST: tasks.put(lambda: apply_commits(params, files, url)) + if query_type == NEW_COMMIT_REQUEST: + tasks.put(lambda: get_commit(params, files, url)) + if query_type == NEW_COMMIT_RESPONSE: + tasks.put(lambda: query_commits(params, files, url)) def accept(params, files, url): @@ -708,6 +777,30 @@ async def fa_put_object(response: Response, put_object(request_params, files, None) return {"Upload": "Ok"} +@app.get("/cr") +async def correction_replication(bnd_name: str, schema: str): + if Config.self_bnd != 'bnd127': + return + date = datetime.datetime.now() + rxmls = RequestXmlService() + res_id = uuid4().hex + + res = rxmls.get_request_document(res_id, None) + ET.SubElement(res, 'currentCommit', {'scheme': schema}) + params = { + 'from': f'tcp://{Config.self_bnd}', + 'to': f'tcp://{bnd_name}', + 'ts_added': date.timestamp(), + 'user_id': '0', + 'query_type': NEW_COMMIT_REQUEST, + 'query_data': ET.tostring(res, encoding='unicode', xml_declaration=True), + } + proxy = ServerProxy(Config.enserver) + try: + proxy.send(params, [], Config.ret_path) + except: + logger.error('Error sending') + def main(): logger.setLevel(logging.INFO) @@ -727,7 +820,7 @@ def main(): try: logger.warning('Start server') - uvicorn.run(app, host="0.0.0.0", port=80) + uvicorn.run(app, host="0.0.0.0", port=8000) except KeyboardInterrupt: logger.warning('Exiting') @@ -750,10 +843,12 @@ def test(): #get_metadata(params, files, url) #get_catalog() # auth_response('123', 'bnd127', False) - # con = OOConnectionParams(Config.oodb_schema, Config.oodb_host, Config.oodb_port, Config.oodb_dbname, - # Config.oodb_username, Config.oodb_passwd, Config.oodb_schema) - # ws = OODBWorkspace.ws(Config.oodb_schema) - # ws.init(con) + con = OOConnectionParams(Config.oodb_schema, Config.oodb_host, Config.oodb_port, Config.oodb_dbname, + Config.oodb_username, Config.oodb_passwd, Config.oodb_schema) + ws = OODBWorkspace.ws(Config.oodb_schema) + ws.init(con) + print(ws.currentCommit()) + print(ws.commits(ws.branch())) # created, updated, deleted = ws.changes('2dad8c8a-d2db-4074-ab7a-c01c36ada2be') # qu = GroupQuery(Envelope()) # qu.setUids(updated) @@ -773,7 +868,7 @@ def test(): # print(updated_files) # ws.clearData(True) # ws.close() - replication_old('bnd128', '23c9a275-ec0f-481a-8437-f3e41e4fe4f5', 'documents_src') + #replication_old('bnd128', '23c9a275-ec0f-481a-8437-f3e41e4fe4f5', 'documents_src') pass