Add events: connect and disconnect
This commit is contained in:
93
main.py
93
main.py
@@ -13,8 +13,6 @@ from xmlrpc.client import ServerProxy
|
||||
import logging
|
||||
import os
|
||||
import os.path
|
||||
|
||||
import psycopg
|
||||
import requests
|
||||
from reqs.graphql import get_catalog, get_object
|
||||
from pygost import gost34112012256
|
||||
@@ -25,13 +23,15 @@ from infra import *
|
||||
from config import Config
|
||||
from zip import Zip
|
||||
import boto3
|
||||
import db
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
|
||||
NEW_REPLICATION_REQUEST = 99
|
||||
|
||||
|
||||
tasks = Queue()
|
||||
connected = False
|
||||
connected = set()
|
||||
|
||||
logger = logging.getLogger('xmlrpcserver')
|
||||
|
||||
@@ -64,40 +64,26 @@ def run_tasks():
|
||||
|
||||
|
||||
def replication_task():
|
||||
return
|
||||
while not connected:
|
||||
time.sleep(1)
|
||||
while True:
|
||||
print()
|
||||
conn = db.connect_db()
|
||||
with (Session(conn) as session):
|
||||
for bndname in connected:
|
||||
for item in session.query(db.Queue).join(db.Queue.user).filter_by(bndname=bndname).all():
|
||||
replication(bndname, item.commit_id, item.schema)
|
||||
session.delete(item)
|
||||
session.commit()
|
||||
time.sleep(60)
|
||||
|
||||
|
||||
def replication(bnd_name: str, commit_id: str, schema: str):
|
||||
date = datetime.datetime.now()
|
||||
rxmls = RequestXmlService()
|
||||
res_id = uuid4().hex
|
||||
res = rxmls.get_request_document(res_id, None)
|
||||
res.set('replication_package', '1')
|
||||
rxmls.set_result(res, 0, '')
|
||||
response_params = {
|
||||
'from': f'tcp://{Config.self_bnd}',
|
||||
'to': f'tcp://{Config.remote_bnd}',
|
||||
'ts_added': date.timestamp(),
|
||||
'user_id': '0',
|
||||
'query_type': 1114,
|
||||
'query_data': ET.tostring(res, encoding='unicode', xml_declaration=True)
|
||||
}
|
||||
filename = '51a8a2c81f774af7bba61b475b4b51b5'
|
||||
filepath = '/tmp/' + filename
|
||||
response_files = [{'name': filename, 'url': filepath, 'size': os.path.getsize(filepath)}]
|
||||
logging.debug('Send replication package')
|
||||
proxy = ServerProxy(Config.enserver)
|
||||
proxy.send(response_params, response_files, Config.ret_path)
|
||||
|
||||
|
||||
def pika_callback(ch, method, properties, body):
|
||||
date = datetime.datetime.now()
|
||||
rxmls = RequestXmlService()
|
||||
res_id = uuid4().hex
|
||||
commit_info = json.loads(body)
|
||||
schema = commit_info.get('schema') or Config.oodb_schema
|
||||
res = rxmls.get_request_document(res_id, None)
|
||||
rxmls.set_result(res, 0, '')
|
||||
ET.SubElement(res, 'replication', {'id': commit_info['commit'], 'scheme': schema})
|
||||
ET.SubElement(res, 'replication', {'id': commit_id, 'scheme': schema})
|
||||
response_params = {
|
||||
'from': f'tcp://{Config.self_bnd}',
|
||||
'to': f'tcp://{Config.remote_bnd}',
|
||||
@@ -119,11 +105,11 @@ def pika_callback(ch, method, properties, body):
|
||||
# commit = {"schema": "npd_data", "commit": "f42afacd-483a-4d8f-bccb-74d65d45378f"}
|
||||
z = Zip()
|
||||
|
||||
pc = oe.previousCommit(commit_info['commit'])
|
||||
oe.exportChanges2osm(os.path.join(z.dirname, 'export.o5c'), pc, commit_info['commit'])
|
||||
oe.exportChanges2mbtiles(os.path.join(z.dirname, 'export.mbtiles'), pc, commit_info['commit'])
|
||||
pc = oe.previousCommit(commit_id)
|
||||
oe.exportChanges2osm(os.path.join(z.dirname, 'export.o5c'), pc, commit_id)
|
||||
oe.exportChanges2mbtiles(os.path.join(z.dirname, 'export.mbtiles'), pc, commit_id)
|
||||
|
||||
created, updated, deleted = ws.changes(commit_info['commit'])
|
||||
created, updated, deleted = ws.changes(commit_id)
|
||||
qu = GroupQuery(Envelope())
|
||||
qu.setUids(updated)
|
||||
qu.setLoadArch(True)
|
||||
@@ -173,6 +159,18 @@ def pika_callback(ch, method, properties, body):
|
||||
proxy.send(response_params, response_files, Config.ret_path)
|
||||
except:
|
||||
logger.error('Error sending')
|
||||
|
||||
|
||||
def pika_callback(ch, method, properties, body):
|
||||
commit_info = json.loads(body)
|
||||
schema = commit_info.get('schema') or Config.oodb_schema
|
||||
commit = commit_info['commit']
|
||||
conn = db.connect_db()
|
||||
with Session(conn) as session:
|
||||
for user in session.query(db.User).filter(db.User.active == True, db.User.upstream == False).all():
|
||||
item = db.Queue(user_id=user.id, commit_id=commit, schema=schema)
|
||||
session.add(item)
|
||||
session.commit()
|
||||
ch.basic_ack(delivery_tag=method.delivery_tag)
|
||||
|
||||
|
||||
@@ -190,29 +188,27 @@ def list_contents(dir_name):
|
||||
|
||||
|
||||
def aud_add(message):
|
||||
global connected
|
||||
if not isinstance(message, list):
|
||||
logger.warning(message)
|
||||
return 'OK'
|
||||
for item in message:
|
||||
logger.warning(item)
|
||||
if item.get('level', -1) == 0 and item.get('type', -1) == 4002:
|
||||
connected = True
|
||||
return 'OK'
|
||||
|
||||
|
||||
def auth_response(challenge, server_id, is_server):
|
||||
# logging.debug(f'Challenge: {challenge}, Server: {server_id}, IsServer: {is_server}')
|
||||
with psycopg.connect(f'host={Config.pg_host} port={Config.pg_port} dbname={Config.pg_dbname} '
|
||||
f'user={Config.pg_username} password={Config.pg_password}') as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute('SELECT passwd FROM users where username = %s', (server_id,))
|
||||
passwd = cur.fetchone()
|
||||
if not passwd:
|
||||
return {'error': True, 'response': 'Wrong user/bnd'}
|
||||
conn = db.connect_db()
|
||||
with Session(conn) as session:
|
||||
try:
|
||||
user = session.query(db.User).filter((db.User.username == server_id)).one()
|
||||
passwd = user.passwd
|
||||
msg = '%s%s%s' % (challenge, server_id, passwd)
|
||||
response = gost34112012256.new(msg.encode('utf-8')[::-1]).digest()[::-1].hex()
|
||||
session.commit()
|
||||
return {'error': False, 'response': response}
|
||||
except Exception:
|
||||
return {'error': True, 'response': 'Wrong user/bnd'}
|
||||
|
||||
|
||||
def auth_challenge():
|
||||
@@ -380,7 +376,7 @@ def run_task(query_type, params, files, url):
|
||||
if query_type == 24:
|
||||
tasks.put(lambda: get_metadata(params, files, url))
|
||||
if query_type == NEW_REPLICATION_REQUEST:
|
||||
tasks.put(lambda: get_metadata(params, files, url))
|
||||
tasks.put(lambda: apply_commits(params, files, url))
|
||||
|
||||
|
||||
def accept(params, files, url):
|
||||
@@ -398,11 +394,16 @@ def onSent(params, files, callback_url):
|
||||
def onDelivered(params, files, callback_url):
|
||||
logger.warning('onDelivered')
|
||||
|
||||
|
||||
def bnd_connected(bnd_name: str):
|
||||
logger.warning(f'{bnd_name} connected')
|
||||
connected.add(bnd_name)
|
||||
|
||||
|
||||
def bnd_disconnected(bnd_name: str):
|
||||
logger.warning(f'{bnd_name} disconnected')
|
||||
if bnd_name in connected:
|
||||
connected.remove(bnd_name)
|
||||
|
||||
|
||||
def main():
|
||||
|
||||
Reference in New Issue
Block a user