Add s3 files processing

This commit is contained in:
Ivan Vazhenin
2023-07-27 18:27:26 +03:00
parent cf7990e96a
commit deadd10588
3 changed files with 112 additions and 11 deletions

102
main.py
View File

@@ -30,6 +30,7 @@ from libipdutilities import *
from liboodriver import *
from config import Config
from zip import Zip
import boto3
tasks = Queue()
@@ -38,6 +39,26 @@ connected = False
logger = logging.getLogger('xmlrpcserver')
def s3_connection():
return boto3.client('s3', endpoint_url=Config.s3_endpoint,
aws_access_key_id=Config.s3_key_id,
aws_secret_access_key=Config.s3_access_key)
def download_file(key: str, filename: str):
client = s3_connection()
obj = client.get_object(Bucket=Config.s3_bucket, Key=key)
with open(f"{filename}", 'wb') as f:
for chunk in obj['Body'].iter_chunks(chunk_size=4096):
f.write(chunk)
def upload_file(filename: str):
client = s3_connection()
with open(filename, 'rb') as f:
client.put_object(Body=f.read(), Bucket=Config.s3_bucket, Key=os.path.basename(filename))
def run_tasks():
logger.debug('Task thread started.')
while True:
@@ -85,15 +106,53 @@ def pika_callback(ch, method, properties, body):
'query_type': 99,
'query_data': ET.tostring(res, encoding='unicode', xml_declaration=True),
}
con = OOConnectionParams(Config.oodb_schema, Config.oodb_host, Config.oodb_port, Config.oodb_dbname,
Config.oodb_username, Config.oodb_passwd, Config.oodb_schema)
ws = OODBWorkspace.ws(Config.oodb_schema)
commit_info = json.loads(body)
schema = commit_info.get('schema') or Config.oodb_schema
con = OOConnectionParams(schema, Config.oodb_host, Config.oodb_port, Config.oodb_dbname,
Config.oodb_username, Config.oodb_passwd, schema)
ws = OODBWorkspace.ws(schema)
ws.init(con)
oe = OsmExporter(ws)
# commit = '1de21737-09dc-4a45-b7d9-7b11044ee487'
oe = IpdExporter(ws)
# commit = {"schema": "kartap", "commit": "55a01cf5-c27c-40be-b771-dc0b16c1878b"}
# commit = {"schema": "kartap", "commit": "76405109-db79-4225-b885-fdb00cfc53a6"}
z = Zip()
nc = oe.nextCommit(body)
oe.exportChanges2osm(os.path.join(z.dirname, 'export.o5c'), body, nc)
nc = oe.nextCommit(commit_info['commit'])
oe.exportChanges2osm(os.path.join(z.dirname, 'export.o5c'), commit_info['commit'], nc)
oe.exportChanges2mbtiles(os.path.join(z.dirname, 'export.mbtiles'), commit_info['commit'], nc)
created, updated, deleted = ws.changes(commit_info['commit'])
qu = GroupQuery(Envelope())
qu.setUids(updated)
qu.setLoadArch(True)
uids = []
ws.clearData(True)
ws.load(qu, uids)
updated_files = []
for feature_uid in uids:
not_files = True
vers = ws.featureVersion(feature_uid)
if len(vers) > 1:
vers.sort(key=vers_key)
not_files = vers[0].feature().isEqual(vers[1].feature(), ["c1000"])
if not not_files:
attr = vers[0].feature().attribute('c1000')
if attr:
updated_files.append(feature_uid)
ws.clearData(True)
qu = GroupQuery(Envelope())
qu.setUids(created)
ws.load(qu, uids)
for feature_uid in uids:
feature = ws.featureByUid(feature_uid)
if not feature:
continue
attr = feature.attribute('c1000')
if attr:
updated_files.append(feature_uid)
for x in updated_files:
download_file(x, os.path.join(z.dirname, x))
ws.clearData(True)
ws.close()
filepath = z.pack()
response_files = [{'name': os.path.basename(filepath), 'url': filepath, 'size': os.path.getsize(filepath)}]
@@ -325,6 +384,10 @@ def main():
logger.warning('Exiting')
def vers_key(e):
return e.version()
def test():
#params = {"from": "tcp://kptsp_vb", "query_data": "<?xml version=\"1.0\" encoding=\"utf-8\"?><request><header parcel_id=\"990715ba919544a98f22cc7d3b0d9e8d\"/><getMetadataByIds><chart id=\"fc44343bd1654ee7b03ac1731567bbfd\"/></getMetadataByIds></request>", "query_type": 24, "to": "tcp://bnd127", "user_id": "3302", "ts_added": 1679825320.653038}
#files = []
@@ -332,7 +395,30 @@ def test():
# accept(params, files, url)
#get_metadata(params, files, url)
#get_catalog()
auth_response('123', 'bnd127', False)
# auth_response('123', 'bnd127', False)
con = OOConnectionParams(Config.oodb_schema, Config.oodb_host, Config.oodb_port, Config.oodb_dbname,
Config.oodb_username, Config.oodb_passwd, Config.oodb_schema)
ws = OODBWorkspace.ws(Config.oodb_schema)
ws.init(con)
created, updated, deleted = ws.changes('2dad8c8a-d2db-4074-ab7a-c01c36ada2be')
qu = GroupQuery(Envelope())
qu.setUids(updated)
qu.setLoadArch(True)
uids = []
ws.clearData(True)
ws.load(qu, uids)
updated_files = []
for feature_uid in uids:
not_files = True
vers = ws.featureVersion(feature_uid)
if len(vers) > 1:
vers.sort(key=vers_key)
not_files = vers[0].feature().isEqual(vers[1].feature(), ["c1000"])
if not not_files:
updated_files.append(feature_uid)
print(updated_files)
ws.clearData(True)
ws.close()
pass