fix old replication

This commit is contained in:
Ivan Vazhenin
2023-11-02 19:05:48 +03:00
parent e9ca649da4
commit 5b17bba5bf
3 changed files with 177 additions and 25 deletions

199
main.py
View File

@@ -12,6 +12,7 @@ from xmlrpc.server import SimpleXMLRPCServer
from xmlrpc.client import ServerProxy
import logging
import os
import zlib
import os.path
import requests
from reqs.graphql import get_catalog, get_object
@@ -75,12 +76,161 @@ def replication_task():
for bndname in connected:
for item in session.query(db.Queue).join(db.Queue.user).filter_by(bndname=bndname).all():
if item.user.is_active_now():
replication(bndname, item.commit_id, item.schema)
if item.user.newbnd:
replication(bndname, item.commit_id, item.schema)
else:
replication_old(bndname, item.commit_id, item.schema)
session.delete(item)
session.commit()
time.sleep(60)
def process_chart(ws, chart, chart_class):
for x in chart.get('properties') or {}:
attr = ws.attribute(x)
if not attr:
continue
if str(attr.type()) == 'AT_Domain':
dom = attr.domain()
key = str(chart['properties'][x])
chart['properties'][x] = variantToString(dom.value(variantFromString(key)))
if x == 'c103':
chart['properties'][x] = 'Несекретно'
chart['class'] = chart_class['name'].split('_')[-1]
return chart
def crc32(filename, chunk_size=65536):
"""Compute the CRC-32 checksum of the contents of the given filename"""
with open(filename, "rb") as f:
checksum = 0
while chunk := f.read(chunk_size):
checksum = zlib.crc32(chunk, checksum)
return "%08X" % (checksum & 0xFFFFFFFF)
def send_object_replication(ws, ids):
qu = GroupQuery(Envelope())
query = GroupQuery(Envelope.world())
query.setUids(ids)
uids = []
ws.load(query, uids)
res = json.loads(ws.dataToJson())
charts = [process_chart(ws, f, x) for x in res for f in x['features']]
for chart in charts:
logger.warning('\n')
date = datetime.datetime.now()
rxmls = RequestXmlService()
res_id = uuid4().hex
res = rxmls.get_request_document(res_id, None)
res.set('replication_package', '1')
res.set('replication_version', date.strftime('%Y%m%d%H%M%S'))
res.set('user_permit', 'AA0AA00020200726D3E75C80B713A7A3D3E75C80B713A7A363F7CB889AA3F520')
rxmls.set_result(res, 0, '')
print(chart)
properties = chart.get('properties') or {}
c1000 = properties.get('c1000')
if not c1000:
logger.warning(f'No file for {chart["uid"].replace("-", "")}')
continue
z = Zip()
xml_objects = ET.SubElement(res, 'objects')
created_date = datetime.datetime.fromisoformat(chart.get('date_created'))
xml_chart = ET.SubElement(res, 'chart', {
'Class': chart['class'],
'ID': chart['uid'].replace('-', ''),
'Name': c1000[0]['fileName'],
'Type': 'OOD',
'metadata_version': '1',
'source': 'bnd127',
'system_date': str(created_date.timestamp()),
})
xml_version = ET.SubElement(xml_objects, 'version', {
'object_id': chart['uid'].replace('-', ''),
'source': 'bnd127',
'system_date': str(created_date.timestamp()),
'version': '1.0',
'version_id': chart['uid'].replace('-', ''),
})
total_size = 0
for file in c1000:
directory = os.path.join(z.dirname, f'maps/{res_id}/ENC_ROOT/{c1000[0]["fileName"]}')
fp = os.path.join(directory, file['fileName'])
if not os.path.exists(directory):
os.makedirs(directory)
download_file(file['key'], Config.s3_bucket, fp)
size = os.stat(fp).st_size
_crc32 = crc32(fp)
ET.SubElement(xml_version, 'file', {
'cell_file': 'false',
'crc32': _crc32,
'crc32_enc': _crc32,
'file_id': file['key'].replace('-', ''),
'file_ref': os.path.join(f'maps/{res_id}/ENC_ROOT', file['fileName']),
'file_size': str(size),
'file_size_enc': str(size),
})
xml_version.set('crc32', _crc32)
xml_version.set('crc32', _crc32)
total_size += size
xml_version.set('size', str(total_size))
xml_version.set('crc32', str(total_size))
xml_tags = ET.SubElement(res, 'tags')
xml_archs = ET.SubElement(res, 'archs')
xml_arch = ET.SubElement(xml_archs, 'arch', {
'obj_id': chart['uid'].replace('-', ''),
'ver_cl': '8.31',
'ver_id': chart['uid'].replace('-', ''),
})
for attribute in properties:
if attribute.startswith('c'):
ET.SubElement(xml_chart, 'Attribute', {
'name': attribute.replace('_', '.'),
'value': str(chart['properties'][attribute]),
})
ET.SubElement(xml_arch, 'attr', {
'code': attribute.replace('_', '.'),
'name': attribute.replace('_', '.'),
'value': str(chart['properties'][attribute]),
})
params = {
'from': f'tcp://{Config.self_bnd}',
'to': f'tcp://{Config.remote_bnd}',
'ts_added': date.timestamp(),
'user_id': '0',
'query_type': NEW_REPLICATION_REQUEST,
'query_data': ET.tostring(res, encoding='unicode', xml_declaration=True),
}
filepath = z.pack()
response_files = [{'name': os.path.basename(filepath), 'url': filepath, 'size': os.path.getsize(filepath)}]
logger.warning(response_files)
logging.debug('Send replication package')
proxy = ServerProxy(Config.enserver)
try:
proxy.send(params, response_files, Config.ret_path)
except:
logger.error('Error sending')
def replication_old(bnd_name: str, commit_id: str, schema: str):
logger.warning('Start replication')
if schema != Config.oodb_schema:
return
con = OOConnectionParams(schema, Config.oodb_host, Config.oodb_port, Config.oodb_dbname,
Config.oodb_username, Config.oodb_passwd, schema)
ws = OODBWorkspace.ws(schema)
if not ws.isInit():
res = ws.init(con)
logger.warning(res)
created, updated, _ = ws.changes(commit_id)
ids = list(set(created) | set(updated))
send_object_replication(ws, ids)
ws.clearData(True)
logger.warning('Replication to old bnd is sent')
def replication(bnd_name: str, commit_id: str, schema: str):
date = datetime.datetime.now()
rxmls = RequestXmlService()
@@ -567,29 +717,30 @@ def test():
#get_metadata(params, files, url)
#get_catalog()
# auth_response('123', 'bnd127', False)
con = OOConnectionParams(Config.oodb_schema, Config.oodb_host, Config.oodb_port, Config.oodb_dbname,
Config.oodb_username, Config.oodb_passwd, Config.oodb_schema)
ws = OODBWorkspace.ws(Config.oodb_schema)
ws.init(con)
created, updated, deleted = ws.changes('2dad8c8a-d2db-4074-ab7a-c01c36ada2be')
qu = GroupQuery(Envelope())
qu.setUids(updated)
qu.setLoadArch(True)
uids = []
ws.clearData(True)
ws.load(qu, uids)
updated_files = []
for feature_uid in uids:
not_files = True
vers = ws.featureVersion(feature_uid)
if len(vers) > 1:
vers.sort(key=vers_key)
not_files = vers[0].feature().isEqual(vers[1].feature(), ["c1000"])
if not not_files:
updated_files.append(feature_uid)
print(updated_files)
ws.clearData(True)
ws.close()
# con = OOConnectionParams(Config.oodb_schema, Config.oodb_host, Config.oodb_port, Config.oodb_dbname,
# Config.oodb_username, Config.oodb_passwd, Config.oodb_schema)
# ws = OODBWorkspace.ws(Config.oodb_schema)
# ws.init(con)
# created, updated, deleted = ws.changes('2dad8c8a-d2db-4074-ab7a-c01c36ada2be')
# qu = GroupQuery(Envelope())
# qu.setUids(updated)
# qu.setLoadArch(True)
# uids = []
# ws.clearData(True)
# ws.load(qu, uids)
# updated_files = []
# for feature_uid in uids:
# not_files = True
# vers = ws.featureVersion(feature_uid)
# if len(vers) > 1:
# vers.sort(key=vers_key)
# not_files = vers[0].feature().isEqual(vers[1].feature(), ["c1000"])
# if not not_files:
# updated_files.append(feature_uid)
# print(updated_files)
# ws.clearData(True)
# ws.close()
replication_old('bnd128', '23c9a275-ec0f-481a-8437-f3e41e4fe4f5', 'documents_src')
pass

3
zip.py
View File

@@ -18,5 +18,6 @@ class Zip:
for folder_name, sub_folders, file_names in os.walk(self.dir.name):
for filename in file_names:
file_path = os.path.join(folder_name, filename)
zip_object.write(file_path, os.path.basename(file_path))
rel_file_path = os.path.relpath(file_path, self.dir.name)
zip_object.write(file_path, rel_file_path)
return tmp_zip