Update config
This commit is contained in:
74
main.py
74
main.py
@@ -34,6 +34,8 @@ from infra import *
|
|||||||
NEW_REPLICATION_REQUEST = 99
|
NEW_REPLICATION_REQUEST = 99
|
||||||
NEW_COMMIT_REQUEST = 98
|
NEW_COMMIT_REQUEST = 98
|
||||||
NEW_COMMIT_RESPONSE = 1098
|
NEW_COMMIT_RESPONSE = 1098
|
||||||
|
NEW_DATA_REQUEST = 97
|
||||||
|
NEW_DATA_RESPONSE = 1097
|
||||||
|
|
||||||
|
|
||||||
tasks = Queue()
|
tasks = Queue()
|
||||||
@@ -669,6 +671,74 @@ def query_commits(params, files, url):
|
|||||||
session.commit()
|
session.commit()
|
||||||
|
|
||||||
|
|
||||||
|
def get_data(params, files, url):
|
||||||
|
req = ET.fromstring(params['query_data'])
|
||||||
|
commit_el = req.find('commit')
|
||||||
|
commit_id = commit_el.get('id')
|
||||||
|
schema = commit_el.get('schema')
|
||||||
|
bnd = params['from'].replace('tcp://', '')
|
||||||
|
|
||||||
|
con = OOConnectionParams(schema, Config.oodb_host, Config.oodb_port, Config.oodb_dbname,
|
||||||
|
Config.oodb_username, Config.oodb_passwd, schema)
|
||||||
|
ws = OODBWorkspace.ws(schema)
|
||||||
|
if not ws.isInit():
|
||||||
|
res = ws.init(con)
|
||||||
|
logger.warning(res)
|
||||||
|
schema_commits = ws.commits(ws.branch())
|
||||||
|
logger.warning(schema_commits)
|
||||||
|
if commit_id not in schema_commits:
|
||||||
|
logger.warning(f'Error in commits in schema {schema}: no commit {commit_id}')
|
||||||
|
return
|
||||||
|
logger.warning(schema_commits[schema_commits.index(commit_id) + 1:])
|
||||||
|
|
||||||
|
conn = db.connect_db()
|
||||||
|
with Session(conn) as session:
|
||||||
|
for commit in schema_commits[schema_commits.index(commit_id) + 1:]:
|
||||||
|
for user in session.query(db.User).filter(db.User.bndname == bnd, db.User.active == True).all():
|
||||||
|
if user.bndname == Config.self_bnd:
|
||||||
|
continue
|
||||||
|
profiles = {x.scheme: x.to_dict() for x in user.profiles}
|
||||||
|
if len(profiles) == 0 or schema in profiles:
|
||||||
|
item = db.Queue(user_id=user.id, commit_id=commit, schema=schema)
|
||||||
|
logging.warning(item)
|
||||||
|
session.add(item)
|
||||||
|
session.commit()
|
||||||
|
|
||||||
|
|
||||||
|
def receive_data(params, files, url):
|
||||||
|
req = ET.fromstring(params['query_data'])
|
||||||
|
commit_el = req.find('commit')
|
||||||
|
commit_id = commit_el.get('id')
|
||||||
|
schema = commit_el.get('schema')
|
||||||
|
bnd = params['from'].replace('tcp://', '')
|
||||||
|
|
||||||
|
con = OOConnectionParams(schema, Config.oodb_host, Config.oodb_port, Config.oodb_dbname,
|
||||||
|
Config.oodb_username, Config.oodb_passwd, schema)
|
||||||
|
ws = OODBWorkspace.ws(schema)
|
||||||
|
if not ws.isInit():
|
||||||
|
res = ws.init(con)
|
||||||
|
logger.warning(res)
|
||||||
|
schema_commits = ws.commits(ws.branch())
|
||||||
|
logger.warning(schema_commits)
|
||||||
|
if commit_id not in schema_commits:
|
||||||
|
logger.warning(f'Error in commits in schema {schema}: no commit {commit_id}')
|
||||||
|
return
|
||||||
|
logger.warning(schema_commits[schema_commits.index(commit_id) + 1:])
|
||||||
|
|
||||||
|
conn = db.connect_db()
|
||||||
|
with Session(conn) as session:
|
||||||
|
for commit in schema_commits[schema_commits.index(commit_id) + 1:]:
|
||||||
|
for user in session.query(db.User).filter(db.User.bndname == bnd, db.User.active == True).all():
|
||||||
|
if user.bndname == Config.self_bnd:
|
||||||
|
continue
|
||||||
|
profiles = {x.scheme: x.to_dict() for x in user.profiles}
|
||||||
|
if len(profiles) == 0 or schema in profiles:
|
||||||
|
item = db.Queue(user_id=user.id, commit_id=commit, schema=schema)
|
||||||
|
logging.warning(item)
|
||||||
|
session.add(item)
|
||||||
|
session.commit()
|
||||||
|
|
||||||
|
|
||||||
def run_task(query_type, params, files, url):
|
def run_task(query_type, params, files, url):
|
||||||
if query_type == 4:
|
if query_type == 4:
|
||||||
tasks.put(lambda: load_catalog(params, files, url))
|
tasks.put(lambda: load_catalog(params, files, url))
|
||||||
@@ -684,6 +754,10 @@ def run_task(query_type, params, files, url):
|
|||||||
tasks.put(lambda: get_commit(params, files, url))
|
tasks.put(lambda: get_commit(params, files, url))
|
||||||
if query_type == NEW_COMMIT_RESPONSE:
|
if query_type == NEW_COMMIT_RESPONSE:
|
||||||
tasks.put(lambda: query_commits(params, files, url))
|
tasks.put(lambda: query_commits(params, files, url))
|
||||||
|
if query_type == NEW_DATA_REQUEST:
|
||||||
|
tasks.put(lambda: get_data(params, files, url))
|
||||||
|
if query_type == NEW_DATA_RESPONSE:
|
||||||
|
tasks.put(lambda: receive_data(params, files, url))
|
||||||
|
|
||||||
|
|
||||||
def accept(params, files, url):
|
def accept(params, files, url):
|
||||||
|
|||||||
Reference in New Issue
Block a user