From 1ebdfd85b302ce863241a74bb74bcfdcb3df2f4b Mon Sep 17 00:00:00 2001 From: ashatora Date: Sat, 18 May 2024 19:53:57 +0300 Subject: [PATCH] Fix connections --- replication_itv/__main__.py | 20 +++++++++++++- replication_itv/config.py | 1 + replication_itv/reqs_graphql.py | 46 --------------------------------- 3 files changed, 20 insertions(+), 47 deletions(-) delete mode 100644 replication_itv/reqs_graphql.py diff --git a/replication_itv/__main__.py b/replication_itv/__main__.py index 7f9317b..14b3cb9 100644 --- a/replication_itv/__main__.py +++ b/replication_itv/__main__.py @@ -16,7 +16,6 @@ import io import zlib import os.path import requests -from .reqs_graphql import get_catalog, get_object import xml.etree.ElementTree as ET from .reqs.request_xml_service import RequestXmlService import zipfile @@ -347,6 +346,22 @@ def pika_task(): channel_itv.start_consuming() +def status_callback(ch, method, properties, body): + status_info = json.loads(body) + if status_info['status'] == 'connected': + connected.add(status_info['bnd_name']) + elif status_info['status'] == 'disconnected': + connected.remove(status_info['bnd_name']) + ch.basic_ack(delivery_tag=method.delivery_tag) + + +def status_task(): + connection = pika.BlockingConnection(pika.URLParameters(Config.rabbit_conn)) + channel_status = connection.channel() + channel_status.basic_consume(queue=Config.rabbit_status_queue, on_message_callback=status_callback) + channel_status.start_consuming() + + def pika_itv_callback(ch, method, properties, body): try: data = json.loads(body) @@ -564,6 +579,9 @@ def main(): pika_thread = threading.Thread(target=pika_task) pika_thread.start() + status_thread = threading.Thread(target=status_task) + status_thread.start() + pika_itv_task() diff --git a/replication_itv/config.py b/replication_itv/config.py index cb8e6fb..4ca3554 100644 --- a/replication_itv/config.py +++ b/replication_itv/config.py @@ -22,6 +22,7 @@ class Config: rabbit_queue: str = 'ipd' rabbit_incoming_queue: str = 'ipd_queue_replication' rabbit_out_exchange: str = 'ipd_out_itv' + rabbit_status_queue: str = 'ipd_status_queue' s3_endpoint: str = 'http://10.10.8.83:31006' s3_key_id: str = 's57' diff --git a/replication_itv/reqs_graphql.py b/replication_itv/reqs_graphql.py deleted file mode 100644 index 65d8ee3..0000000 --- a/replication_itv/reqs_graphql.py +++ /dev/null @@ -1,46 +0,0 @@ -from gql import gql, Client -from gql.transport.aiohttp import AIOHTTPTransport -from .config import Config - -transport = AIOHTTPTransport(url=Config.gql_url) -service = Config.gql_schema - - -def get_classifier(): - client = Client(transport=transport, fetch_schema_from_transport=True, execute_timeout=None) - query = gql( - """ - query getClassifier($name: String!) { - getClassifier(name: $name) - } - """ - ) - result = client.execute(query, variable_values={"name": service}, ) - return result['getClassifier'] - - -def get_catalog(): - client = Client(transport=transport, fetch_schema_from_transport=True, execute_timeout=None) - query = gql( - """ - query getCatalog($name: String!) { - getCatalog(name: $name) - } - """ - ) - result = client.execute(query, variable_values={"name": service}) - return result['getCatalog'] - - -def get_object(oid: str): - client = Client(transport=transport, fetch_schema_from_transport=True, execute_timeout=None) - query = gql( - """ - query getObjects($oid: String!, $name: String!) { - getObject(name: $name, oid: $oid) - } - """ - ) - params = {'oid': oid, 'name': service} - result = client.execute(query, variable_values=params) - return result['getObject']