Add s3 files processing
This commit is contained in:
91
main.py
91
main.py
@@ -29,6 +29,7 @@ import uvicorn
|
||||
from typing_extensions import Annotated
|
||||
import pathlib
|
||||
from infra import *
|
||||
from shutil import make_archive
|
||||
|
||||
|
||||
NEW_REPLICATION_REQUEST = 99
|
||||
@@ -672,37 +673,39 @@ def query_commits(params, files, url):
|
||||
|
||||
|
||||
def get_data(params, files, url):
|
||||
date = datetime.datetime.now()
|
||||
rxmls = RequestXmlService()
|
||||
req = ET.fromstring(params['query_data'])
|
||||
commit_el = req.find('commit')
|
||||
commit_id = commit_el.get('id')
|
||||
schema = commit_el.get('schema')
|
||||
bnd = params['from'].replace('tcp://', '')
|
||||
req_id = rxmls.get_request_uuid(req)
|
||||
res_id = uuid4().hex
|
||||
res = rxmls.get_request_document(res_id, req_id)
|
||||
rxmls.set_result(res, 0, '')
|
||||
request_string = req.find('data').text
|
||||
|
||||
con = OOConnectionParams(schema, Config.oodb_host, Config.oodb_port, Config.oodb_dbname,
|
||||
Config.oodb_username, Config.oodb_passwd, schema)
|
||||
ws = OODBWorkspace.ws(schema)
|
||||
if not ws.isInit():
|
||||
res = ws.init(con)
|
||||
logger.warning(res)
|
||||
schema_commits = ws.commits(ws.branch())
|
||||
logger.warning(schema_commits)
|
||||
if commit_id not in schema_commits:
|
||||
logger.warning(f'Error in commits in schema {schema}: no commit {commit_id}')
|
||||
return
|
||||
logger.warning(schema_commits[schema_commits.index(commit_id) + 1:])
|
||||
OODBWorkspaceFactory.init('config/workspaces.json')
|
||||
conf = ResponseWorkerConfig('config/response2.json', 'config/workspaces.json')
|
||||
worker = ResponseWorker(conf)
|
||||
fn = uuid4().hex
|
||||
dir = os.path.join('tmp', fn)
|
||||
os.makedirs(dir)
|
||||
worker.makeResponse(request_string, dir)
|
||||
make_archive(dir, 'zip', dir, '.')
|
||||
|
||||
conn = db.connect_db()
|
||||
with Session(conn) as session:
|
||||
for commit in schema_commits[schema_commits.index(commit_id) + 1:]:
|
||||
for user in session.query(db.User).filter(db.User.bndname == bnd, db.User.active == True).all():
|
||||
if user.bndname == Config.self_bnd:
|
||||
continue
|
||||
profiles = {x.scheme: x.to_dict() for x in user.profiles}
|
||||
if len(profiles) == 0 or schema in profiles:
|
||||
item = db.Queue(user_id=user.id, commit_id=commit, schema=schema)
|
||||
logging.warning(item)
|
||||
session.add(item)
|
||||
session.commit()
|
||||
rxmls.set_result(res, 0, '')
|
||||
response_params = {
|
||||
'from': params['to'],
|
||||
'to': params['from'],
|
||||
'ts_added': date.timestamp(),
|
||||
'user_id': '1',
|
||||
'user_id_to': params['user_id'],
|
||||
'query_type': NEW_DATA_RESPONSE,
|
||||
'query_data': ET.tostring(res, encoding='unicode', xml_declaration=True)
|
||||
}
|
||||
filename = fn + '.zip'
|
||||
filepath = os.path.join(os.getcwd(), 'tmp', filename)
|
||||
response_files = [{'name': filename, 'url': filepath, 'size': os.path.getsize(filepath)}]
|
||||
proxy = ServerProxy(url)
|
||||
proxy.send(response_params, response_files, Config.ret_path)
|
||||
|
||||
|
||||
def receive_data(params, files, url):
|
||||
@@ -926,12 +929,12 @@ def test():
|
||||
#get_metadata(params, files, url)
|
||||
#get_catalog()
|
||||
# auth_response('123', 'bnd127', False)
|
||||
con = OOConnectionParams(Config.oodb_schema, Config.oodb_host, Config.oodb_port, Config.oodb_dbname,
|
||||
Config.oodb_username, Config.oodb_passwd, Config.oodb_schema)
|
||||
ws = OODBWorkspace.ws(Config.oodb_schema)
|
||||
ws.init(con)
|
||||
print(ws.currentCommit())
|
||||
print(ws.commits(ws.branch()))
|
||||
# con = OOConnectionParams(Config.oodb_schema, Config.oodb_host, Config.oodb_port, Config.oodb_dbname,
|
||||
# Config.oodb_username, Config.oodb_passwd, Config.oodb_schema)
|
||||
# ws = OODBWorkspace.ws(Config.oodb_schema)
|
||||
# ws.init(con)
|
||||
# print(ws.currentCommit())
|
||||
# print(ws.commits(ws.branch()))
|
||||
# created, updated, deleted = ws.changes('2dad8c8a-d2db-4074-ab7a-c01c36ada2be')
|
||||
# qu = GroupQuery(Envelope())
|
||||
# qu.setUids(updated)
|
||||
@@ -952,9 +955,25 @@ def test():
|
||||
# ws.clearData(True)
|
||||
# ws.close()
|
||||
#replication_old('bnd128', '23c9a275-ec0f-481a-8437-f3e41e4fe4f5', 'documents_src')
|
||||
request_string = """
|
||||
{
|
||||
"type": "group_query",
|
||||
"data_source": "npd",
|
||||
"uids": [ "2e20130c-541a-4b9f-9efb-f2a0e8b10c33" ]
|
||||
}
|
||||
"""
|
||||
OODBWorkspaceFactory.init('config/workspaces.json')
|
||||
conf = ResponseWorkerConfig('config/response2.json', 'config/workspaces.json')
|
||||
worker = ResponseWorker(conf)
|
||||
fn = uuid4().hex
|
||||
dir = os.path.join('tmp', fn)
|
||||
os.makedirs(dir)
|
||||
worker.makeResponse(request_string, dir)
|
||||
make_archive(dir, 'zip', dir, '.')
|
||||
|
||||
pass
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
# test()
|
||||
# main()
|
||||
test()
|
||||
|
||||
Reference in New Issue
Block a user