diff --git a/main.py b/main.py index 205e140..af381c5 100644 --- a/main.py +++ b/main.py @@ -34,6 +34,8 @@ from infra import * NEW_REPLICATION_REQUEST = 99 NEW_COMMIT_REQUEST = 98 NEW_COMMIT_RESPONSE = 1098 +NEW_DATA_REQUEST = 97 +NEW_DATA_RESPONSE = 1097 tasks = Queue() @@ -669,6 +671,74 @@ def query_commits(params, files, url): session.commit() +def get_data(params, files, url): + req = ET.fromstring(params['query_data']) + commit_el = req.find('commit') + commit_id = commit_el.get('id') + schema = commit_el.get('schema') + bnd = params['from'].replace('tcp://', '') + + con = OOConnectionParams(schema, Config.oodb_host, Config.oodb_port, Config.oodb_dbname, + Config.oodb_username, Config.oodb_passwd, schema) + ws = OODBWorkspace.ws(schema) + if not ws.isInit(): + res = ws.init(con) + logger.warning(res) + schema_commits = ws.commits(ws.branch()) + logger.warning(schema_commits) + if commit_id not in schema_commits: + logger.warning(f'Error in commits in schema {schema}: no commit {commit_id}') + return + logger.warning(schema_commits[schema_commits.index(commit_id) + 1:]) + + conn = db.connect_db() + with Session(conn) as session: + for commit in schema_commits[schema_commits.index(commit_id) + 1:]: + for user in session.query(db.User).filter(db.User.bndname == bnd, db.User.active == True).all(): + if user.bndname == Config.self_bnd: + continue + profiles = {x.scheme: x.to_dict() for x in user.profiles} + if len(profiles) == 0 or schema in profiles: + item = db.Queue(user_id=user.id, commit_id=commit, schema=schema) + logging.warning(item) + session.add(item) + session.commit() + + +def receive_data(params, files, url): + req = ET.fromstring(params['query_data']) + commit_el = req.find('commit') + commit_id = commit_el.get('id') + schema = commit_el.get('schema') + bnd = params['from'].replace('tcp://', '') + + con = OOConnectionParams(schema, Config.oodb_host, Config.oodb_port, Config.oodb_dbname, + Config.oodb_username, Config.oodb_passwd, schema) + ws = OODBWorkspace.ws(schema) + if not ws.isInit(): + res = ws.init(con) + logger.warning(res) + schema_commits = ws.commits(ws.branch()) + logger.warning(schema_commits) + if commit_id not in schema_commits: + logger.warning(f'Error in commits in schema {schema}: no commit {commit_id}') + return + logger.warning(schema_commits[schema_commits.index(commit_id) + 1:]) + + conn = db.connect_db() + with Session(conn) as session: + for commit in schema_commits[schema_commits.index(commit_id) + 1:]: + for user in session.query(db.User).filter(db.User.bndname == bnd, db.User.active == True).all(): + if user.bndname == Config.self_bnd: + continue + profiles = {x.scheme: x.to_dict() for x in user.profiles} + if len(profiles) == 0 or schema in profiles: + item = db.Queue(user_id=user.id, commit_id=commit, schema=schema) + logging.warning(item) + session.add(item) + session.commit() + + def run_task(query_type, params, files, url): if query_type == 4: tasks.put(lambda: load_catalog(params, files, url)) @@ -684,6 +754,10 @@ def run_task(query_type, params, files, url): tasks.put(lambda: get_commit(params, files, url)) if query_type == NEW_COMMIT_RESPONSE: tasks.put(lambda: query_commits(params, files, url)) + if query_type == NEW_DATA_REQUEST: + tasks.put(lambda: get_data(params, files, url)) + if query_type == NEW_DATA_RESPONSE: + tasks.put(lambda: receive_data(params, files, url)) def accept(params, files, url):