Fix connections
This commit is contained in:
@@ -16,7 +16,6 @@ import io
|
||||
import zlib
|
||||
import os.path
|
||||
import requests
|
||||
from .reqs_graphql import get_catalog, get_object
|
||||
import xml.etree.ElementTree as ET
|
||||
from .reqs.request_xml_service import RequestXmlService
|
||||
import zipfile
|
||||
@@ -347,6 +346,22 @@ def pika_task():
|
||||
channel_itv.start_consuming()
|
||||
|
||||
|
||||
def status_callback(ch, method, properties, body):
|
||||
status_info = json.loads(body)
|
||||
if status_info['status'] == 'connected':
|
||||
connected.add(status_info['bnd_name'])
|
||||
elif status_info['status'] == 'disconnected':
|
||||
connected.remove(status_info['bnd_name'])
|
||||
ch.basic_ack(delivery_tag=method.delivery_tag)
|
||||
|
||||
|
||||
def status_task():
|
||||
connection = pika.BlockingConnection(pika.URLParameters(Config.rabbit_conn))
|
||||
channel_status = connection.channel()
|
||||
channel_status.basic_consume(queue=Config.rabbit_status_queue, on_message_callback=status_callback)
|
||||
channel_status.start_consuming()
|
||||
|
||||
|
||||
def pika_itv_callback(ch, method, properties, body):
|
||||
try:
|
||||
data = json.loads(body)
|
||||
@@ -564,6 +579,9 @@ def main():
|
||||
pika_thread = threading.Thread(target=pika_task)
|
||||
pika_thread.start()
|
||||
|
||||
status_thread = threading.Thread(target=status_task)
|
||||
status_thread.start()
|
||||
|
||||
pika_itv_task()
|
||||
|
||||
|
||||
|
||||
@@ -22,6 +22,7 @@ class Config:
|
||||
rabbit_queue: str = 'ipd'
|
||||
rabbit_incoming_queue: str = 'ipd_queue_replication'
|
||||
rabbit_out_exchange: str = 'ipd_out_itv'
|
||||
rabbit_status_queue: str = 'ipd_status_queue'
|
||||
|
||||
s3_endpoint: str = 'http://10.10.8.83:31006'
|
||||
s3_key_id: str = 's57'
|
||||
|
||||
@@ -1,46 +0,0 @@
|
||||
from gql import gql, Client
|
||||
from gql.transport.aiohttp import AIOHTTPTransport
|
||||
from .config import Config
|
||||
|
||||
transport = AIOHTTPTransport(url=Config.gql_url)
|
||||
service = Config.gql_schema
|
||||
|
||||
|
||||
def get_classifier():
|
||||
client = Client(transport=transport, fetch_schema_from_transport=True, execute_timeout=None)
|
||||
query = gql(
|
||||
"""
|
||||
query getClassifier($name: String!) {
|
||||
getClassifier(name: $name)
|
||||
}
|
||||
"""
|
||||
)
|
||||
result = client.execute(query, variable_values={"name": service}, )
|
||||
return result['getClassifier']
|
||||
|
||||
|
||||
def get_catalog():
|
||||
client = Client(transport=transport, fetch_schema_from_transport=True, execute_timeout=None)
|
||||
query = gql(
|
||||
"""
|
||||
query getCatalog($name: String!) {
|
||||
getCatalog(name: $name)
|
||||
}
|
||||
"""
|
||||
)
|
||||
result = client.execute(query, variable_values={"name": service})
|
||||
return result['getCatalog']
|
||||
|
||||
|
||||
def get_object(oid: str):
|
||||
client = Client(transport=transport, fetch_schema_from_transport=True, execute_timeout=None)
|
||||
query = gql(
|
||||
"""
|
||||
query getObjects($oid: String!, $name: String!) {
|
||||
getObject(name: $name, oid: $oid)
|
||||
}
|
||||
"""
|
||||
)
|
||||
params = {'oid': oid, 'name': service}
|
||||
result = client.execute(query, variable_values=params)
|
||||
return result['getObject']
|
||||
Reference in New Issue
Block a user