Fix connections

ashatora
2024-05-01 12:01:30 +03:00
parent 90d628be01
commit 18be337d34
6 changed files with 73 additions and 41 deletions

Dockerfile Normal file

@@ -0,0 +1,10 @@
FROM reg.ivazh.ru/infra-oodb
WORKDIR /app
COPY replication_itv ./replication_itv
COPY requirements.txt ./
RUN pip3 install -r requirements.txt && \
mkdir -p /opt/tnt/bin && \
ln -s /usr/bin/python3 /opt/tnt/bin/python3
ENV LD_LIBRARY_PATH="/app"
ENV PYTHONPATH="${PYTHONPATH}:/app"
CMD ["python3", "-m", "replication_itv"]

examples/replication.json Normal file

@@ -0,0 +1,11 @@
{
"files": [],
"url": "http://10.10.8.79:7000/xmlrpc",
"params": {
"from": "tcp://kptsp_vb",
"query_data": "<?xml version=\"1.0\" encoding=\"utf-8\"?><request><header parcel_id=\"990715ba919544a98f22cc7d3b0d9e8d\"/><getCatalog/></request>",
"query_type": 4,
"to": "tcp://bnd127",
"user_id": 0
}
}
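
For a quick local check, a payload shaped like this example can be published to the queue that the new pika_itv_callback consumer reads. A minimal sketch, assuming the rabbit_conn URL and rabbit_incoming_queue name from the Config changes below; publishing through the default exchange is an assumption for testing, not something this commit itself does:

import json
import pika

# Values taken from Config in this commit; adjust for your environment.
RABBIT_URL = 'amqp://user:password@10.10.8.83:31005/%2f'   # Config.rabbit_conn
INCOMING_QUEUE = 'ipd_queue_replication'                   # Config.rabbit_incoming_queue

with open('examples/replication.json', 'rb') as f:
    body = f.read()
json.loads(body)  # sanity check: the consumer expects valid JSON

connection = pika.BlockingConnection(pika.URLParameters(RABBIT_URL))
channel = connection.channel()
# Publish through the default exchange straight to the queue;
# pika_itv_callback will json.loads the body and dispatch it via run_task().
channel.basic_publish(exchange='', routing_key=INCOMING_QUEUE, body=body)
connection.close()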

View File

@@ -1,4 +1,5 @@
from tempfile import TemporaryDirectory
from json import JSONDecodeError
from tempfile import TemporaryDirectory, gettempdir
from typing import Optional, Any
import pika
@@ -16,7 +17,6 @@ import zlib
import os.path
import requests
from .reqs_graphql import get_catalog, get_object
from pygost import gost34112012256
import xml.etree.ElementTree as ET
from .reqs.request_xml_service import RequestXmlService
import zipfile
@@ -43,8 +43,6 @@ NEW_DATA_RESPONSE = 1097
tasks = Queue()
connected = set()
connection: Optional[pika.BlockingConnection] = None
channel: Optional[Any] = None
server: Optional[SimpleXMLRPCServer] = None
logger = logging.getLogger('xmlrpcserver')
@@ -80,13 +78,6 @@ def get_branch(bndname: str, scheme: str):
return None, None
def run_tasks():
logger.debug('Task thread started.')
while True:
task = tasks.get()
task()
def replication_task():
while True:
conn = db.connect_db()
@@ -358,13 +349,52 @@ def pika_callback(ch, method, properties, body):
def pika_task():
global connection
global channel
connection = pika.BlockingConnection(pika.URLParameters(Config.rabbit_conn))
channel = connection.channel()
channel.basic_consume(queue=Config.rabbit_queue, on_message_callback=pika_callback)
channel_itv = connection.channel()
channel_itv.basic_consume(queue=Config.rabbit_queue, on_message_callback=pika_callback)
channel_itv.start_consuming()
channel.start_consuming()
def pika_itv_callback(ch, method, properties, body):
try:
data = json.loads(body)
params = data['params']
url = data['url']
files = []
for file in data['files']:
fn = os.path.join(gettempdir(), uuid4().hex)
download_file(file['url']['name'], file['url']['bucket'], fn)
file['url'] = fn
files.append(file)
run_task(params['query_type'], params, files, url)
except JSONDecodeError as e:
logging.warning(e)
finally:
ch.basic_ack(delivery_tag=method.delivery_tag)
def pika_itv_task():
connection = pika.BlockingConnection(pika.URLParameters(Config.rabbit_conn))
channel_receive = connection.channel()
channel_receive.basic_consume(queue=Config.rabbit_incoming_queue, on_message_callback=pika_itv_callback)
channel_receive.start_consuming()
def send_response(params, files, url):
files_s3 = []
for file in files:
fn = uuid4().hex
upload_file(file['url'], fn, Config.s3_bucket)
file['url'] = {'name': fn, 'bucket': Config.s3_bucket}
files_s3.append(file)
data = {
'params': params,
'files': files_s3,
'url': url,
}
connection = pika.BlockingConnection(pika.URLParameters(Config.rabbit_conn))
channel_send = connection.channel()
channel_send.basic_publish(exchange=Config.rabbit_out_exchange, body=json.dumps(data), routing_key='')
def put_object(params, files, url):
@@ -484,8 +514,7 @@ def get_commit(params, files, url):
'query_type': NEW_COMMIT_RESPONSE,
'query_data': ET.tostring(res_doc, encoding='unicode', xml_declaration=True)
}
proxy = ServerProxy(url)
proxy.send(response_params, [], Config.ret_path)
send_response(response_params, [], url)
def query_commits(params, files, url):
@@ -532,29 +561,18 @@ def run_task(query_type, params, files, url):
def main():
global connection
global server
global channel
logger.setLevel(logging.INFO)
logger.warning('Use Control-C to exit')
thread = threading.Thread(target=run_tasks)
thread.start()
replication_thread = threading.Thread(target=replication_task)
replication_thread.start()
pika_thread = threading.Thread(target=pika_task)
pika_thread.start()
try:
logger.warning('Start server')
uvicorn.run(app, host="0.0.0.0", port=8000)
except KeyboardInterrupt:
logger.warning('Exiting')
finally:
server.server_close()
pika_itv_task()
def vers_key(e):
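
A consumer on the out exchange receives the same {'params', 'files', 'url'} envelope that send_response publishes, with each file replaced by an S3 {'name', 'bucket'} reference. A minimal sketch, assuming rabbit_out_exchange is a fanout exchange and that the consumer declares its own temporary queue; both are assumptions, since the commit only shows the publishing side:

import json
import pika

RABBIT_URL = 'amqp://user:password@10.10.8.83:31005/%2f'   # Config.rabbit_conn
OUT_EXCHANGE = 'ipd_out_itv'                               # Config.rabbit_out_exchange

def on_response(ch, method, properties, body):
    data = json.loads(body)
    # Envelope built by send_response: params, files (S3 references), url.
    print(data['params'], data['files'], data['url'])
    ch.basic_ack(delivery_tag=method.delivery_tag)

connection = pika.BlockingConnection(pika.URLParameters(RABBIT_URL))
channel = connection.channel()
# Assumption: exchange type; send_response publishes with routing_key=''.
channel.exchange_declare(exchange=OUT_EXCHANGE, exchange_type='fanout')
queue = channel.queue_declare(queue='', exclusive=True).method.queue
channel.queue_bind(queue=queue, exchange=OUT_EXCHANGE)
channel.basic_consume(queue=queue, on_message_callback=on_response)
channel.start_consuming()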

View File

@@ -20,19 +20,14 @@ class Config:
rabbit_conn: str = 'amqp://user:password@10.10.8.83:31005/%2f'
rabbit_queue: str = 'ipd'
ws_rabbit_params: dict = {
'host': '10.10.8.83',
'port': 31005,
'exchange': 'ipd',
'user': 'user',
'password': 'password',
}
rabbit_incoming_queue: str = 'ipd_queue_replication'
rabbit_out_exchange: str = 'ipd_out_itv'
s3_endpoint: str = 'http://10.10.8.83:31006'
s3_key_id: str = 's57'
s3_access_key: str = 'd9MMinLF3U8TLSj'
s3_bucket: str = 'files'
s3_bucket_itv: str = 'itv'
gql_url: str = 'https://gql.ivazh.ru/graphql'
gql_download: str = 'https://gql.ivazh.ru/item/{key}'
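
The dropped ws_rabbit_params dict duplicated connection settings that rabbit_conn already encodes as a single AMQP URL, which is what pika_task and the new pika_itv_task/send_response use. A small sketch of how pika recovers the individual fields from that URL (the %2f vhost decodes to '/'):

import pika

params = pika.URLParameters('amqp://user:password@10.10.8.83:31005/%2f')
print(params.host)                  # 10.10.8.83
print(params.port)                  # 31005
print(params.credentials.username)  # user
print(params.virtual_host)          # /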

View File

@@ -2,7 +2,7 @@ from datetime import datetime
from typing import List, Optional
from sqlalchemy import create_engine, String, select, ForeignKey, Enum
from sqlalchemy.orm import Session, DeclarativeBase, Mapped, mapped_column, relationship
from config import Config
from .config import Config
def tow(day: int, hour: int, minute: int):

View File

@@ -1,4 +1,3 @@
--find-links=deps
aiohttp==3.8.4
aiosignal==1.3.1
annotated-types==0.5.0
@@ -25,7 +24,6 @@ pika-stubs==0.1.3
psycopg==3.1.10
pydantic==2.3.0
pydantic_core==2.6.3
pygost==5.12
python-dateutil==2.8.2
python-multipart==0.0.6
requests==2.31.0