commit 54fa5892619787b45fe39ffe83745464ef8a4811
Author: Ivan Vazhenin
Date:   Sun Mar 12 16:40:33 2023 +0300

    First commit

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..250d350
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,9 @@
+*.pyc
+.idea
+*.log
+.ropeproject
+*.swp
+*.pid
+build/
+dist/
+enserver.egg-info/
diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
new file mode 100644
index 0000000..a64cf34
--- /dev/null
+++ b/.gitlab-ci.yml
@@ -0,0 +1,50 @@
+stages:
+  - init
+  - build
+  - store
+
+include:
+  - project: 'kt_asu_gis/infrastructure/easo-infra'
+    file: '/buildjob.yml'
+
+set_version:
+  extends: .default_set_version
+
+build:astra1.6:
+  stage: build
+  image: rep.gisdemo.ru/infra/astra16se:latest
+  tags: [docker]
+  only:
+    - easo/astra
+  before_script:
+    - apt update && apt install -y python-all wget python-pip gettext fakeroot debhelper python-argparse python-psycopg2
+    - pip install stdeb
+    - pip install -r requirements.txt
+  script:
+    - '[ -f "VERSION" ] && export VERSION=`cat VERSION`-$(date "+%Y%m%d-%H%M%S")'
+    - '[ ! -f "VERSION" ] && export VERSION="2.3.7"-$(date "+%Y%m%d-%H%M%S")'
+    - 'echo "Version: $VERSION"'
+    - python setup.py --command-packages=stdeb.command bdist_deb
+    - mv ./deb_dist/*.deb .
+  artifacts:
+    paths:
+      - '*.deb'
+    expire_in: 10 hours
+
+store:astra1.6:
+  stage: store
+  image: rep.gisdemo.ru/infra/astra16se:latest
+  tags: [docker]
+  variables:
+    DEB_REPO: https://nexus.sitronics-kt.dev/repository/easo-ast16/
+  needs:
+    - build:astra1.6
+  dependencies:
+    - build:astra1.6
+  only:
+    - easo/astra
+  script:
+    - for pack in `ls *.deb`; do
+      echo "Uploading ${pack}...";
+      curl -k -u "${NEXUS_USER}:${NEXUS_PASS_ASTRA}" -H "Content-Type:multipart/form-data" --data-binary @$pack $DEB_REPO;
+      done;
diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 0000000..37444c7
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,18 @@
+FROM python:3.8-buster
+
+COPY requirements.txt /tmp/
+
+RUN pip install -r /tmp/requirements.txt --no-cache-dir
+
+VOLUME /opt/enserver
+
+RUN mkdir -p /share/store/enserver
+
+COPY enserver /opt/enserver
+
+WORKDIR /opt/enserver
+
+EXPOSE 7000
+EXPOSE 7001
+
+CMD python enserver.py --debug --port=7001 --pidfile=/tmp/enserver.pid --logfile=/var/log/enserver.log --id=prilvbox --address=enserv --storage=/share/store/enserver --db-host=10.10.8.83 --db-port=32101 --host=10.10.78.40 --db-user=postgres --db-passwd=Root12345678 --auth-provider=http://10.10.8.81:9000/ --replication-provider=http://10.10.8.81:9000/ http://10.10.8.81:9000/ http://10.10.8.81:9000/
diff --git a/MANIFEST.in b/MANIFEST.in
new file mode 100644
index 0000000..5012eee
--- /dev/null
+++ b/MANIFEST.in
@@ -0,0 +1,7 @@
+recursive-include debian *
+recursive-include data *
+recursive-include enserver/locale *
+recursive-include enserver/worker *
+recursive-include enserver/parcel *
+recursive-include enserver/migrations *
+include VERSION
diff --git a/README.txt b/README.txt
new file mode 100644
index 0000000..593cdd2
--- /dev/null
+++ b/README.txt
@@ -0,0 +1 @@
+EN-Server
diff --git a/VERSION b/VERSION
new file mode 100644
index 0000000..51bd855
--- /dev/null
+++ b/VERSION
@@ -0,0 +1 @@
+2.3.7
\ No newline at end of file
diff --git a/build.sh b/build.sh
new file mode 100755
index 0000000..19230e3
--- /dev/null
+++ b/build.sh
@@ -0,0 +1,12 @@
+#!/bin/bash
+./pre_build.sh
+
+if [ -f /etc/debian_version ]; then
+    echo 'Building deb package'
+    python setup.py --command-packages=stdeb.command bdist_deb
+else
+    echo 'Building rpm package'
+    python setup.py bdist_rpm
+fi
+exit 0
+
diff --git a/changelog.txt b/changelog.txt
new file mode 100644
index 0000000..facbb63
--- /dev/null
+++ b/changelog.txt
@@ -0,0 +1,10 @@
+enserver-2.1.0
+[*] Sped up delivery of parcels within a centre (to the local address).
+[+] The queueing subsystem was extended to persist queue items to the database.
+[-] Removed the no longer used functions for saving parcels to a shelve file.
+
+enserver-2.0.0
+[+] Delivery of parcels within a centre (to the local address).
+[+] Establishing and maintaining a tcp connection to the upstream centre
+[+] Parcel transfer over tcp.
+[+] Parcels are stored in a shelve-format file.
\ No newline at end of file
diff --git a/data/astra/default/enserver b/data/astra/default/enserver
new file mode 100644
index 0000000..3336b0b
--- /dev/null
+++ b/data/astra/default/enserver
@@ -0,0 +1,34 @@
+# path to enserver main file
+ENSERVER="/usr/lib/python2.7/dist-packages/enserver/enserver.py"
+PIDFILE="/var/run/apsh/enserver.pid"
+# logging options
+LOGFILE="/var/log/apsh/enserver/enserver.log"
+# log level {DEBUG,INFO,WARNING,ERROR,CRITICAL}
+LOGLEVEL="INFO"
+# DB options
+DB_HOST="pgsql"
+DB_PORT="5432"
+DB_NAME="enserver"
+DB_USER="db_apsh"
+DB_PASSWD="12345678"
+# path to enserver storage
+STORAGE="/share/store/enserver/storage"
+# local enserver id
+SERVERID="enserver_astra"
+# local ip-address
+ADDRESS="enserv"
+# providers options
+AUTH_PROVIDER="http://apsh@userv:8008/xmlrpc"
+# subscriber options
+SUBSCRIBER_USERVER="http://userv:8008/xmlrpc"
+SUBSCRIBER_CDSERVER="http://cdserv:8080/Transport"
+# connection options
+TCP_PORT=7001
+TCP_TIMEOUT=90
+DISCONNECT_TIMEOUT="7"
+CDSERVER_REPLICATION="http://cdserv:8080/Replication"
+REPLICATION_PROVIDER="$CDSERVER_REPLICATION"
+
+# required for parent bank - uncomment next line and add to unit file param '--host ${HOST}'
+# unit file path: /etc/systemd/system/enserver.service
+#HOST="bnd"
diff --git a/data/astra/logrotate.d/enserver b/data/astra/logrotate.d/enserver
new file mode 100644
index 0000000..34b8181
--- /dev/null
+++ b/data/astra/logrotate.d/enserver
@@ -0,0 +1,8 @@
+/var/log/apsh/enserver.log {
+    size 10M
+    rotate 10
+    notifempty
+    compress
+    copytruncate
+    delaycompress
+}
diff --git a/data/astra/systemd/system/enserver.service b/data/astra/systemd/system/enserver.service
new file mode 100644
index 0000000..0528f58
--- /dev/null
+++ b/data/astra/systemd/system/enserver.service
@@ -0,0 +1,22 @@
+# Contents of /etc/systemd/system/enserver.service
+[Unit]
+Description=En-Server
+After=network.target
+
+[Service]
+CapabilitiesParsec=PARSEC_CAP_PRIV_SOCK
+AmbientCapabilities=CAP_IPC_LOCK
+CapabilityBoundingSet=CAP_IPC_LOCK
+Type=simple
+User=apsh
+Group=apsh
+EnvironmentFile=/etc/default/enserver
+Restart=always
+RestartSec=30s
+PIDFile=/var/run/apsh/enserver.pid
+WorkingDirectory=/usr/lib/python2.7/dist-packages/enserver/
+ExecStart=/usr/bin/python ${ENSERVER} --debug --id=${SERVERID} --loglevel ${LOGLEVEL} --logfile ${LOGFILE} --pidfile
+${PIDFILE} --address ${ADDRESS} --port ${TCP_PORT} --tcp-timeout ${TCP_TIMEOUT} --db-name ${DB_NAME} --db-user ${DB_USER} --db-port ${DB_PORT} --db-host ${DB_HOST} --db-passwd ${DB_PASSWD} --storage ${STORAGE} --auth-provider ${AUTH_PROVIDER} --disconnect_timeout ${DISCONNECT_TIMEOUT} --replication-provider ${REPLICATION_PROVIDER} ${SUBSCRIBER_USERVER} ${SUBSCRIBER_CDSERVER}
+
+[Install]
+WantedBy=multi-user.target
diff --git a/data/zarya/init.d/enserver b/data/zarya/init.d/enserver
new file mode 100755
index 0000000..6d1a213
--- /dev/null
+++ b/data/zarya/init.d/enserver
@@ -0,0 +1,92 @@
+#!/bin/sh
+#
+# enserver      Start/Stop the EN-Server daemon
+#
+# chkconfig: 345 96 10
+# description: Apsheronsk transport system
+
+### BEGIN INIT INFO
+# Provides: enserver
+# Required-Start: $local_fs $network
+# Required-Stop: $local_fs $network
+# Short-Description: enserver daemon
+# Description: Apsheronsk transport system
+### END INIT INFO
+
+# Source function library.
+. /etc/rc.d/init.d/functions
+
+#set -e
+# This is an interactive program, we need the current locale
+[ -f /etc/profile.d/lang.sh ] && . /etc/profile.d/lang.sh
+
+if [ -f /etc/sysconfig/enserver ]; then
+    . /etc/sysconfig/enserver
+fi
+
+# configuration parameters
+PYTHON=${PYTHON:-/usr/bin/python2.6}
+ENSERVER=${ENSERVER:-/usr/lib/python2.6/site-packages/enserver/enserver.py}
+USER=${USER:-apsh}
+PIDFILE=${PIDFILE:-/var/run/apsh/enserver.pid}
+STOP_TIMEOUT=${STOP_TIMEOUT:-10}
+HTTP_PORT=${HTTP_PORT:-7000}
+TCP_PORT=${TCP_PORT:-7001}
+LOGLEVEL=${LOGLEVEL:+--debug}
+LOGFILE=${LOGFILE:-/var/log/apsh/enserver.log}
+STORAGE=${STORAGE:-/share/store/enserver}
+PARENT_HOST=${HOST:+--host=$HOST}
+ADDRESS=${ADDRESS:-enserv}
+RETVAL=0
+
+OPTIONS="$LOGLEVEL --logfile=$LOGFILE --port=$TCP_PORT --pidfile=$PIDFILE --storage=$STORAGE --address=$ADDRESS $PARENT_HOST $OPTIONS"
+DAEMON="--daemon"
+
+start() {
+    echo -n $"Starting enserver: "
+
+    LANG="C" ss -pltn | grep ":${HTTP_PORT}[ \t]\+" > /dev/null && P1=0 || P1=1
+    LANG="C" ss -pltn | grep ":${TCP_PORT}[ \t]\+" > /dev/null && P2=0 || P2=1
+
+    if [ $P1 -eq 0 ] || [ $P2 -eq 0 ]; then
+        echo -n $"port already in use"
+        RETVAL=1
+    else
+        daemon --user=${USER} --pidfile=${PIDFILE} $PYTHON $ENSERVER $DAEMON $OPTIONS
+        RETVAL=$?
+    fi
+    echo
+}
+
+stop() {
+    echo -n $"Stopping enserver: "
+    killproc -p ${PIDFILE} -d ${STOP_TIMEOUT} $PYTHON
+    RETVAL=$?
+    echo
+}
+
+case "$1" in
+start)
+    start
+    ;;
+constart)
+    DAEMON=""
+    start
+    ;;
+stop)
+    stop
+    ;;
+restart)
+    stop
+    start
+    ;;
+status)
+    status -p ${PIDFILE} $PYTHON
+    RETVAL=$?
+    ;;
+*)
+    echo $"Usage: $0 {start|stop|status|restart}"
+    RETVAL=2
+esac
+
+exit $RETVAL
diff --git a/data/zarya/logrotate.d/enserver b/data/zarya/logrotate.d/enserver
new file mode 100644
index 0000000..b691e96
--- /dev/null
+++ b/data/zarya/logrotate.d/enserver
@@ -0,0 +1,8 @@
+/var/log/apsh/enserver.log {
+    size 10M
+    rotate 10
+    notifempty
+    compress
+    copytruncate
+    delaycompress
+}
diff --git a/data/zarya/scripts/install.spec.inc b/data/zarya/scripts/install.spec.inc
new file mode 100644
index 0000000..35626b6
--- /dev/null
+++ b/data/zarya/scripts/install.spec.inc
@@ -0,0 +1,4 @@
+python setup.py install -O1 --root=$RPM_BUILD_ROOT --record=INSTALLED_FILES
+
+sed -ri '\@^/etc/@ {\@/init.d/@! s@.+@%config(noreplace) \0@}' INSTALLED_FILES
+
diff --git a/data/zarya/scripts/postinstall.sh b/data/zarya/scripts/postinstall.sh
new file mode 100644
index 0000000..fe17c06
--- /dev/null
+++ b/data/zarya/scripts/postinstall.sh
@@ -0,0 +1,16 @@
+#!/bin/sh -e
+
+CONF="/etc/sysconfig/enserver"
+if [ -f ${CONF} ]; then
+    . ${CONF}
+fi
+
+USER=${USER-"apsh"}
+GROUP=${GROUP-"apsh"}
+DBNAME=${DBNAME-"enserver"}
+STORAGE=${STORAGE-"/share/store/enserver"}
+LOGDIR=${LOGDIR-"/share/store"}
+
+
+mkdir -p ${STORAGE} ${LOGDIR}
+chown ${USER}:${GROUP} ${STORAGE} ${LOGDIR}
diff --git a/data/zarya/sysconfig/enserver b/data/zarya/sysconfig/enserver
new file mode 100644
index 0000000..15f5680
--- /dev/null
+++ b/data/zarya/sysconfig/enserver
@@ -0,0 +1,23 @@
+PYTHON="/usr/bin/python2.6"
+USER="apsh"
+PIDFILE="/var/run/apsh/enserver.pid"
+LOGFILE="/var/log/apsh/enserver.log"
+LOGLEVEL="DEBUG"
+STORAGE="/share/store/enserver"
+SERVERID="bnd"
+ADDRESS="enserv"
+DBHOST="pgsql"
+DBPORT="5432"
+DBUSER="apsh"
+DBPASSWD="apsh"
+DISCONNECT_TIMEOUT="7"
+#required for parent bank - uncomment next line
+#HOST="parent_bank"
+CDSERVER="http://cdserv:8080/Transport"
+CDSERVER_REPLICATION="http://cdserv:8080/Replication"
+USERVER="http://userv:8008/xmlrpc"
+VSERVER="http://vserv:10110/enclient,local://vis"
+AUTH_PROVIDER="$USERVER"
+REPLICATION_PROVIDER="$CDSERVER_REPLICATION"
+SUBSCRIBERS="$USERVER $CDSERVER $VSERVER"
+OPTIONS="--id=$SERVERID --db-host=$DBHOST --db-port=$DBPORT --db-user=$DBUSER --db-passwd=$DBPASSWD --disconnect_timeout=$DISCONNECT_TIMEOUT --auth-provider=$AUTH_PROVIDER --replication-provider=$REPLICATION_PROVIDER $SUBSCRIBERS"
diff --git a/debian/enserver.prerm b/debian/enserver.prerm
new file mode 100644
index 0000000..62a2a46
--- /dev/null
+++ b/debian/enserver.prerm
@@ -0,0 +1,5 @@
+#!/bin/sh
+
+service enserver stop || true
+
+exit 0
diff --git a/enserver/__init__.py b/enserver/__init__.py
new file mode 100644
index 0000000..164b12c
--- /dev/null
+++ b/enserver/__init__.py
@@ -0,0 +1 @@
+__author__ = 'lex'
\ No newline at end of file
diff --git a/enserver/conf.py b/enserver/conf.py
new file mode 100644
index 0000000..81f4bd3
--- /dev/null
+++ b/enserver/conf.py
@@ -0,0 +1,109 @@
+"""
+EN-Server settings
+"""
+import argparse
+import os
+
+
+class LazySettings(object):
+    __slots__ = (
+        'address', 'auth_provider', 'daemon', 'db_host', 'db_name', 'db_passwd', 'db_port', 'db_user',
+        'loglevel', 'host', 'http_host', 'http_port', 'id', 'ims_dir', 'ims_edit', 'ims_encode', 'ims_host',
+        'ims_limit', 'ims_passwd', 'ims_redirect', 'ims_send', 'ims_user', 'initialized', 'locals',
+        'logfile', 'pidfile', 'port', 'project_path', 'static_path', 'static_url', 'storage', 'subscribers',
+        'tcp_bufsize', 'tcp_host', 'tcp_ping', 'tcp_port', 'tcp_timeout', 'xmlrpc_path', 'xmlrpc_url', 'debug',
+        'replication_provider', 'disconnect_timeout'
+    )
+
+    def __init__(self):
+        self.initialized = False
+
+    def __getattr__(self, item):
+        if not self.initialized:
+            self._setup()
+        return getattr(self, item)
+
+    def _setup(self):
+        self.initialized = True
+
+        self.project_path = os.path.realpath(os.path.dirname(__file__))
+        self.static_path = '/out'
+        self.xmlrpc_path = '/xmlrpc'
+
+        args = self._parse_arguments()
+        for setting in dir(args):
+            if setting.startswith('_'):
+                continue
+            setattr(self, setting, getattr(args, setting))
+        self.locals = {'local': []}
+        self._process_subscribers()
+
+        self.static_url = 'http://%s:%s%s' % (self.address, self.http_port, self.static_path)
+        self.xmlrpc_url = 'http://%s:%s%s' % (self.address, self.http_port, self.xmlrpc_path)
+
+    def _parse_arguments(self):
+        parser = argparse.ArgumentParser(
+            description='EN-Server',
+            formatter_class=argparse.ArgumentDefaultsHelpFormatter
+        )
+        parser.add_argument('--daemon', action='store_true', help='detach from console')
+        parser.add_argument('--loglevel', help='set logging level', default=None,
+                            choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'])
+        parser.add_argument('--debug', help='set logging level to DEBUG', action='store_true')
+        parser.add_argument('--logfile', help='path to log file', default=os.path.join(self.project_path, 'enserver.log'))
+        parser.add_argument('--pidfile', help='path to pid file', default=os.path.join(self.project_path, 'enserver.pid'))
+        parser.add_argument('--storage', help='path to storage dir', default=os.path.join(self.project_path, 'store'))
+        parser.add_argument('--address', help='external address', default='127.0.0.1')
+        parser.add_argument('--auth-provider', help='authentication provider url')
+        parser.add_argument('--replication-provider', help='replication provider url', default='http://cdserv:8080/Replication')
+        parser.add_argument('--disconnect_timeout', help='timeout before clearing the replication record, in days', default=7)
+        parser.add_argument('--id', help='server identifier', required=True)
+        parser.add_argument('--host', help='tcp uplink address')
+        parser.add_argument('--port', help='tcp uplink port', type=int)
+        parser.add_argument('--http-host', help='http listener bind address', default='0.0.0.0', metavar='HOST')
+        parser.add_argument('--http-port', help='http listener bind port', type=int, default=7000, metavar='PORT')
+        parser.add_argument('--tcp-host', help='tcp listener bind address', default='0.0.0.0', metavar='HOST')
+        parser.add_argument('--tcp-port', help='tcp listener bind port', type=int, default=7001, metavar='PORT')
+        parser.add_argument('--tcp-timeout', help='tcp socket timeout', type=float, default=5.0, metavar='SEC')
+        parser.add_argument('--tcp-ping', help='tcp ping interval', type=float, default=1.0, metavar='SEC')
+        parser.add_argument('--tcp-bufsize', help='tcp transfer buffer', type=int, default=65536, metavar='SIZE')
+        parser.add_argument('--db-name', help='database name', default='db')
+        parser.add_argument('--db-host', help='database host')
+        parser.add_argument('--db-port', help='database port', type=int, default=32101)
+        parser.add_argument('--db-user', help='database user', default='postgres')
+        parser.add_argument('--db-passwd', help='database password', default='Root12345678')
+
+        parser.add_argument('--ims-redirect', help='redirect ims parcels to', required=False)
+        parser.add_argument('--ims-send', help='path to imssendmsg utility', required=False)
+        parser.add_argument('--ims-edit', help='path to imsmsged utility', required=False)
+        parser.add_argument('--ims-user', help='ims username', required=False)
+        parser.add_argument('--ims-passwd', help='ims password (generated with imspasswd utility)', required=False)
+        parser.add_argument('--ims-host', help='ims server address', default='127.0.0.1')
+        parser.add_argument('--ims-dir', help='ims exchange directory', default=os.path.join(self.project_path, 'soek'))
+        parser.add_argument('--ims-limit', help='ims message size limit in bytes', type=int, default=1048576)
+        parser.add_argument('--ims-encode', help='ims command-line encoding', default='koi8-r')
+
+        parser.add_argument('subscribers', help='possible parcel acceptors', metavar='ADDR', nargs='+')
+        args = parser.parse_args()
+
+        ims_creds = ('ims_send', 'ims_edit', 'ims_user', 'ims_passwd')
+        ims_creds_all = all(map(lambda a: getattr(args, a), ims_creds))
+        ims_creds_any = any(map(lambda a: getattr(args, a), ims_creds))
+        if ims_creds_any and not ims_creds_all:
+            parser.error('must be given together: --ims-send, --ims-edit, --ims-user, --ims-passwd')
+        if ims_creds_any and args.ims_redirect:
+            parser.error('--ims-redirect nullifies other --ims-* parameters')
+
+        return args
+
+    def _process_subscribers(self):
+        for i, sb in enumerate(self.subscribers):
+            p = sb.rsplit(',', 1)
+            if len(p) > 1:
+                self.locals.setdefault(p[1], []).append(p[0])
+                self.subscribers[i] = p[0]
+            else:
+                self.locals['local'].append(sb)
+
+
+settings = LazySettings()
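LazySettings above defers command-line parsing until the first attribute access: every real setting starts out unset, so the first lookup falls through to __getattr__, which runs _setup() (and therefore argparse) exactly once and then resolves the attribute normally. A minimal standalone sketch of the same pattern, with a made-up `answer` field standing in for the parsed settings:

    class LazyConfig(object):
        __slots__ = ('initialized', 'answer')

        def __init__(self):
            self.initialized = False

        def __getattr__(self, item):
            # Only invoked for attributes that are not set yet.
            if not self.initialized:
                self._setup()
            # After _setup() the attribute exists, so this resolves normally;
            # an attribute _setup() never sets would recurse and raise.
            return getattr(self, item)

        def _setup(self):
            self.initialized = True  # set first, before any other lookups
            self.answer = 42         # stand-in for the argparse results

    cfg = LazyConfig()
    print(cfg.answer)  # first access triggers _setup() and prints 42

In enserver itself the equivalent object is the module-level singleton, imported everywhere as `from conf import settings`.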
diff --git a/enserver/enserver.py b/enserver/enserver.py
new file mode 100644
index 0000000..541803b
--- /dev/null
+++ b/enserver/enserver.py
@@ -0,0 +1,134 @@
+#!/usr/bin/env python
+#
+# EN-Server daemon
+#
+
+import logging
+import os
+import signal
+import sys
+import time
+
+sys.path.append(sys.path[0])
+
+import daemon
+from conf import settings
+from lib import translation
+from migrations.migrations import make_migrations
+from parcel.utils import get_db_connection
+from worker.audit import Audit
+from worker.downloader import ParcelDownloader
+from worker.manager import Manager
+from worker.postman import Postman
+from worker.soek import IMSWorker
+from worker.tcp import TCPClientWorker, TCPServerWorker
+from worker.xmlrpc import HttpWorker
+
+
+def setup_logging():
+    levels = {
+        'DEBUG': logging.DEBUG,
+        'INFO': logging.INFO,
+        'WARNING': logging.WARNING,
+        'ERROR': logging.ERROR,
+        'CRITICAL': logging.CRITICAL
+    }
+
+    log_to_console = not settings.daemon
+    logger = logging.getLogger('enserver')
+
+    if not settings.loglevel and settings.debug:
+        loglevel = logging.DEBUG
+    elif settings.loglevel:
+        loglevel = levels[settings.loglevel]
+    else:
+        loglevel = logging.WARNING
+
+    logger.setLevel(loglevel)
+    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(name)s - %(message)s')
+    if log_to_console:
+        ch = logging.StreamHandler()
+        ch.setLevel(loglevel)
+        ch.setFormatter(formatter)
+        logger.addHandler(ch)
+    fh = logging.FileHandler(settings.logfile)
+    fh.setLevel(loglevel)
+    fh.setFormatter(formatter)
+    logger.addHandler(fh)
+
+
+workers = []
+
+
+def stop_workers(threads):
+    for worker in threads:
+        worker.join()
+
+
+def start_workers():
+    translation.activate(os.path.join(settings.project_path, 'locale', 'enserver.mo'))
+    setup_logging()
+    log = logging.getLogger('enserver.main')
+
+    with open(settings.pidfile, 'w') as f:
+        f.write('%s' % os.getpid())
+
+    tcp_server = TCPServerWorker()
+    global workers
+    workers = [
+        Manager(),
+        Postman(),
+        ParcelDownloader(),
+        tcp_server,
+        HttpWorker(),
+        Audit(),
+    ]
+
+    if settings.host and settings.port:
+        uplink_worker = TCPClientWorker()
+        workers.append(uplink_worker)
+    if settings.ims_user:
+        ims_worker = IMSWorker()
+        workers.append(ims_worker)
+    for worker in workers:
+        worker.start()
+
+    while True:
+        if any(not worker.is_alive() for worker in workers):
+            dead_workers = ','.join(worker.__class__.__name__
+                                    for worker in workers
+                                    if not worker.is_alive())
+            log.critical('Dead workers: %s, shutting down...',
+                         dead_workers)
+            stop_workers(workers)
+            exit(1)
+        tcp_server.process_disconnected()
+        time.sleep(0.1)
+
+
+def main():
+
+    def on_exit(signal, frame):
+        stop_workers(workers)
+        os.unlink(settings.pidfile)
+        sys.exit(0)
+
+    make_migrations(get_db_connection().cursor())
+
+    if settings.daemon:
+        sys.stdout.flush()
+        sys.stderr.flush()
+        context = daemon.DaemonContext(working_directory=settings.project_path, detach_process=True)
+        context.signal_map = {
+            signal.SIGTERM: on_exit
+        }
+        with context:
+            start_workers()
+    else:
+        signal.signal(signal.SIGINT, on_exit)
+        signal.signal(signal.SIGTERM, on_exit)
+        start_workers()
+
+
+if __name__ == '__main__':
+    main()
diff --git a/enserver/lib/http_utils.py b/enserver/lib/http_utils.py
new file mode 100644
index 0000000..6a3042b
--- /dev/null
+++ b/enserver/lib/http_utils.py
@@ -0,0 +1,26 @@
+from xmlrpc.server import SimpleXMLRPCDispatcher, SimpleXMLRPCRequestHandler
+from socketserver import ThreadingTCPServer
+
+try:
+    import fcntl
+except ImportError:
+    fcntl = None
+
+
+class ThreadingXMLRPCServer(ThreadingTCPServer, SimpleXMLRPCDispatcher):
+    allow_reuse_address = True
+
+    def __init__(self, addr, requestHandler=SimpleXMLRPCRequestHandler,
+                 logRequests=True, allow_none=False, encoding=None, bind_and_activate=True):
+        self.logRequests = logRequests
+
+        SimpleXMLRPCDispatcher.__init__(self, allow_none, encoding)
+        ThreadingTCPServer.__init__(self, addr, requestHandler, bind_and_activate)
+
+        # [Bug #1222790] If possible, set close-on-exec flag; if a
+        # method spawns a subprocess, the subprocess shouldn't have
+        # the listening socket open.
+        if fcntl is not None and hasattr(fcntl, 'FD_CLOEXEC'):
+            flags = fcntl.fcntl(self.fileno(), fcntl.F_GETFD)
+            flags |= fcntl.FD_CLOEXEC
+            fcntl.fcntl(self.fileno(), fcntl.F_SETFD, flags)
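ThreadingXMLRPCServer above is the same composition the standard library uses for SimpleXMLRPCServer, except the socketserver base is ThreadingTCPServer, so every XML-RPC request is dispatched on its own thread. A usage sketch; the bind address and the exported ping function are illustrative, not part of the module:

    from lib.http_utils import ThreadingXMLRPCServer


    def ping():
        return 'pong'


    server = ThreadingXMLRPCServer(('127.0.0.1', 8000), allow_none=True)
    server.register_function(ping)  # register_function comes from SimpleXMLRPCDispatcher
    try:
        server.serve_forever()      # each incoming request is handled on its own thread
    finally:
        server.server_close()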
diff --git a/enserver/lib/threads.py b/enserver/lib/threads.py
new file mode 100644
index 0000000..ba3a2dd
--- /dev/null
+++ b/enserver/lib/threads.py
@@ -0,0 +1,39 @@
+import logging
+import threading
+import time
+
+
+class Worker(threading.Thread):
+    def __init__(self, shift_interval=0):
+        self.shift_interval = shift_interval
+        self.stop_event = threading.Event()
+        self.log = getattr(self, 'log', None) or logging.getLogger('enserver.thread')
+        super(Worker, self).__init__()
+
+    def join(self, timeout=None):
+        self.stop_event.set()
+        super(Worker, self).join(timeout=timeout)
+
+    def run(self):
+        self.log.info('Starting...')
+        self.before_shifts()
+        while not self.stop_event.is_set():
+            try:
+                self.shift()
+            except Exception as exc:
+                self.log.exception('Died! Exception: %s', exc)
+                raise exc
+
+            if self.shift_interval:
+                time.sleep(self.shift_interval)
+        self.after_shifts()
+        self.log.info('Bye!')
+
+    def before_shifts(self):
+        pass
+
+    def shift(self):
+        raise NotImplementedError
+
+    def after_shifts(self):
+        pass
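Worker above turns threading.Thread into a cooperative loop: run() calls shift() repeatedly until join() sets stop_event, and an exception escaping shift() is logged and re-raised, killing the thread so the watchdog loop in enserver.py can notice the dead worker and shut down. A minimal subclass, as a sketch:

    import logging

    from lib.threads import Worker

    logging.basicConfig(level=logging.INFO)


    class Heartbeat(Worker):
        # Illustrative worker: logs once per second until stopped.
        def __init__(self):
            super(Heartbeat, self).__init__(shift_interval=1.0)

        def shift(self):
            self.log.info('still alive')


    hb = Heartbeat()
    hb.start()  # runs before_shifts(), then shift() in a loop
    # ... later, from the main thread:
    hb.join()   # sets stop_event and waits for the loop to finish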
"select exists(select * from information_schema.tables where table_name='%s')" % table_name + cursor.execute(cmd) + return cursor.fetchone()[0] + + +def table_create(cursor, table_name): + log.info('Creating %s table' % table_name) + cmd = "CREATE TABLE %s (" \ + "queue_id CHARACTER VARYING NOT NULL," \ + "id CHARACTER VARYING(32) PRIMARY KEY, " \ + "data BYTEA," \ + "added TIMESTAMP DEFAULT current_timestamp," \ + "updated TIMESTAMP DEFAULT current_timestamp," \ + "package_size BIGINT NOT NULL DEFAULT 0," \ + "file_size BIGINT NOT NULL DEFAULT 0," \ + "\"from\" CHARACTER VARYING NOT NULL DEFAULT ''," \ + "\"to\" CHARACTER VARYING NOT NULL DEFAULT ''," \ + "query_type INTEGER NOT NULL DEFAULT 0," \ + "user_id CHARACTER VARYING" \ + ")" % table_name + cursor.execute(cmd) + + +def table_create_last_connect(cursor): + log.info('Creating table last_connect') + cmd = "CREATE TABLE last_connect (" \ + "queue_id CHARACTER VARYING," \ + "last_update TIMESTAMP DEFAULT current_timestamp," \ + "is_client BOOLEAN NOT NULL," \ + "need_replication BOOLEAN NOT NULL DEFAULT FALSE," \ + "PRIMARY KEY(queue_id, is_client)" \ + ")" + cursor.execute(cmd) + + +def table_create_known_connect(cursor): + log.info('Creating table last_connect') + cmd = "CREATE TABLE known_connect (" \ + "queue_id CHARACTER VARYING," \ + "is_client BOOLEAN NOT NULL," \ + "PRIMARY KEY(queue_id, is_client)" \ + ")" + cursor.execute(cmd) + + +def check_size_fields_migration(cursor): + log.info('Creating size fields for audit messages') + sql = """ + DO $$ + BEGIN + ALTER TABLE parcels ADD COLUMN package_size bigint NOT NULL DEFAULT 0; + ALTER TABLE parcels ADD COLUMN file_size bigint NOT NULL DEFAULT 0; + EXCEPTION + WHEN duplicate_column THEN RAISE EXCEPTION 'column already exists in parcels'; + END; + $$; + """ + try: + cursor.execute(sql) + + except Exception: + log.info('Size fields already exist') + return True + + return False + + +def check_filtering_fields_migration(cursor): + log.info('Creating query_type, from, to, user_id columns in parcels table') + sql = """ + DO $$ + BEGIN + ALTER TABLE parcels ADD COLUMN query_type INTEGER NOT NULL DEFAULT 0; + ALTER TABLE parcels ADD COLUMN \"from\" CHARACTER VARYING NOT NULL DEFAULT ''; + ALTER TABLE parcels ADD COLUMN \"to\" CHARACTER VARYING NOT NULL DEFAULT ''; + ALTER TABLE parcels ADD COLUMN user_id CHARACTER VARYING; + EXCEPTION + WHEN duplicate_column THEN RAISE EXCEPTION 'column already exists in parcels'; + END; + $$; + """ + try: + cursor.execute(sql) + + except Exception: + log.info('Fields for filtering already exist') + return True + + return False + + +def actualize_db_data(cursor): + chunk_size = 100 + parcels = get_parcels_for_migration(cursor, (ARCHIVE,)) + sql = """ + UPDATE + parcels + SET + package_size = parsed_parcel.package_size, + file_size = parsed_parcel.file_size, + data = parsed_parcel.parcel_data, + query_type = parsed_parcel.query_type, + \"from\" = parsed_parcel.parcel_from, + \"to\" = parsed_parcel.parcel_to, + user_id = parsed_parcel.user_id + FROM + (VALUES %s) AS parsed_parcel( + id, package_size, file_size, parcel_data, query_type, parcel_from, parcel_to, user_id + ) + WHERE parcels.id = parsed_parcel.id + """ + + try: + tmp_list = [] + for number, parcel in enumerate(parcels): + tmp_list.append(parcel) + if number % chunk_size == 0 and number != 0: + execute_parsed_query(sql, tmp_list, cursor) + tmp_list = [] + if len(tmp_list) != 0: + execute_parsed_query(sql, tmp_list, cursor) + + except Exception as ex: + log.error('Error while migrating fields 
for filtering') + log.exception(ex) + raise + + +def get_parcels_for_migration(cursor, queue_ids_to_exclude): + cmd_list = """ + SELECT id, data, queue_id + FROM parcels + WHERE queue_id NOT IN %s + """ + cursor.execute(cmd_list, (queue_ids_to_exclude, )) + + parcels = [] + for _id, data, queue_id in cursor.fetchall(): + parcel = pickle.loads(data) + parsed_data = parse_parcel(parcel, queue_id) + parsed_data['user_id'] = parsed_data['user_id'] if parsed_data['user_id'] != '' else None + + parcels.append(( + parsed_data['id'], parsed_data['package_size'], parsed_data['file_size'], parsed_data['data'], + parsed_data['query_type'], parsed_data['from'], parsed_data['to'], parsed_data['user_id'] + )) + + return parcels diff --git a/enserver/parcel/base.py b/enserver/parcel/base.py new file mode 100644 index 0000000..42a5e17 --- /dev/null +++ b/enserver/parcel/base.py @@ -0,0 +1,129 @@ +import os +import sys +import time +from datetime import datetime +from xml.dom import minidom +from conf import settings + +# lower category index - higher query_type priority +parcel_priorities = [ + [ # 0 + 1, 4, 5, 7, 16, 23, 24, 25, 46, 47, 6001, 6002, + 1001, 1004, 1005, 1007, 1016, 1023, 1024, 1046, 1047, 6101, 6102 + ], + [ # 1 + 6, 8, 17, 18, 19, 21, 27, 28, 29, + 1006, 1008, 1017, 1018, 1019, 1021, 1027, 1028 + ] +] + + +class ParcelState(object): + # parcel is being delivered to addressed + TO_DOWNLOAD = 0 + + +class IncorrectParcel(Exception): + pass + + +class Parcel(object): + protocols = ('tcp://', 'soek://', 'es://', 'local://') + + def __init__(self, params, files=None, callback_url=None, state=ParcelState.TO_DOWNLOAD): + if 'to' not in params: + raise IncorrectParcel('Missing delivery address') + correct_protocol = any(map(params['to'].startswith, self.protocols)) + if not correct_protocol and params['to'] != 'local': + raise IncorrectParcel('Incorrect or unsupported delivery address') + + self.params = params + self.files = files or [] + self.callback_url = callback_url or '' + self.id = self.id_from_params() + self.state = state + + if 'ts_added' not in self.params: + self.params['ts_added'] = time.time() + + def __lt__(self, other): + """ + Determines parcel priority + parcel1 < parcel2 means parcel1 has higher priority + """ + self_pr, other_pr = (sys.maxsize,) * 2 + for i in range(len(parcel_priorities)): + category = parcel_priorities[i] + if self.params['query_type'] in category: + self_pr = i + if other.params['query_type'] in category: + other_pr = i + if self_pr != other_pr: + return self_pr < other_pr + else: + self_t = float(self.params['ts_added']) + other_t = float(other.params['ts_added']) + return self_t <= other_t + + def id_from_params(self): + """ + Extracts parcel_id from query_data header tag + """ + data = self.params['query_data'] + data = data.encode('utf-8') if isinstance(data, str) else data + document = minidom.parseString(data) + header = document.getElementsByTagName('header')[0] + return header.getAttribute('parcel_id') + + def update_ts(self, ts_added=None): + self.params['ts_added'] = ts_added or time.time() + + def __str__(self): + self_type = self.params.get('query_type', 'none') + return 'type = {0}, id = {1}'.format(self_type, self.id) + + def payload_size(self): + if not self.files: + return 0 + + total_size = 0 + for f in self.files: + filename = f['name'] + filepath = os.path.join(settings.storage, self.id, filename) + if os.path.exists(filepath): + total_size += os.path.getsize(filepath) + + return total_size + + +class LastConnect(object): + def 
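The migrations above are written to be re-runnable: each ALTER TABLE is wrapped in a PostgreSQL DO block whose duplicate_column handler turns "already migrated" into an exception the Python side catches, and actualize_db_data pushes the re-parsed parcels through one multi-row UPDATE ... FROM (VALUES %s) per batch. The batching idea, isolated into a sketch (the original flushes on a running counter rather than on exact batch boundaries):

    def chunked(items, size=100):
        # Yield lists of at most `size` items, preserving order.
        batch = []
        for item in items:
            batch.append(item)
            if len(batch) == size:
                yield batch
                batch = []
        if batch:
            yield batch


    for batch in chunked(range(250), size=100):
        print(len(batch))  # 100, 100, 50 -- each batch would be one UPDATE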
diff --git a/enserver/parcel/base.py b/enserver/parcel/base.py
new file mode 100644
index 0000000..42a5e17
--- /dev/null
+++ b/enserver/parcel/base.py
@@ -0,0 +1,129 @@
+import os
+import sys
+import time
+from datetime import datetime
+from xml.dom import minidom
+from conf import settings
+
+# lower category index - higher query_type priority
+parcel_priorities = [
+    [  # 0
+        1, 4, 5, 7, 16, 23, 24, 25, 46, 47, 6001, 6002,
+        1001, 1004, 1005, 1007, 1016, 1023, 1024, 1046, 1047, 6101, 6102
+    ],
+    [  # 1
+        6, 8, 17, 18, 19, 21, 27, 28, 29,
+        1006, 1008, 1017, 1018, 1019, 1021, 1027, 1028
+    ]
+]
+
+
+class ParcelState(object):
+    # parcel is being delivered to addressee
+    TO_DOWNLOAD = 0
+
+
+class IncorrectParcel(Exception):
+    pass
+
+
+class Parcel(object):
+    protocols = ('tcp://', 'soek://', 'es://', 'local://')
+
+    def __init__(self, params, files=None, callback_url=None, state=ParcelState.TO_DOWNLOAD):
+        if 'to' not in params:
+            raise IncorrectParcel('Missing delivery address')
+        correct_protocol = any(map(params['to'].startswith, self.protocols))
+        if not correct_protocol and params['to'] != 'local':
+            raise IncorrectParcel('Incorrect or unsupported delivery address')
+
+        self.params = params
+        self.files = files or []
+        self.callback_url = callback_url or ''
+        self.id = self.id_from_params()
+        self.state = state
+
+        if 'ts_added' not in self.params:
+            self.params['ts_added'] = time.time()
+
+    def __lt__(self, other):
+        """
+        Determines parcel priority
+        parcel1 < parcel2 means parcel1 has higher priority
+        """
+        self_pr, other_pr = (sys.maxsize,) * 2
+        for i in range(len(parcel_priorities)):
+            category = parcel_priorities[i]
+            if self.params['query_type'] in category:
+                self_pr = i
+            if other.params['query_type'] in category:
+                other_pr = i
+        if self_pr != other_pr:
+            return self_pr < other_pr
+        else:
+            self_t = float(self.params['ts_added'])
+            other_t = float(other.params['ts_added'])
+            return self_t <= other_t
+
+    def id_from_params(self):
+        """
+        Extracts parcel_id from query_data header tag
+        """
+        data = self.params['query_data']
+        data = data.encode('utf-8') if isinstance(data, str) else data
+        document = minidom.parseString(data)
+        header = document.getElementsByTagName('header')[0]
+        return header.getAttribute('parcel_id')
+
+    def update_ts(self, ts_added=None):
+        self.params['ts_added'] = ts_added or time.time()
+
+    def __str__(self):
+        self_type = self.params.get('query_type', 'none')
+        return 'type = {0}, id = {1}'.format(self_type, self.id)
+
+    def payload_size(self):
+        if not self.files:
+            return 0
+
+        total_size = 0
+        for f in self.files:
+            filename = f['name']
+            filepath = os.path.join(settings.storage, self.id, filename)
+            if os.path.exists(filepath):
+                total_size += os.path.getsize(filepath)
+
+        return total_size
+
+
+class LastConnect(object):
+    def __init__(self, queue_id, is_client, last_update=None, need_replication=False):
+        self.last_update = last_update or datetime.now()
+        self.queue_id = queue_id
+        self.is_client = is_client
+        self.need_replication = need_replication
+
+    def update_date(self):
+        self.last_update = datetime.now()
+
+    def as_dict(self):
+        parsed_data = {
+            'queue_id': self.queue_id,
+            'last_update': self.last_update,
+            'is_client': self.is_client,
+            'need_replication': self.need_replication
+        }
+        return parsed_data
+
+
+class KnownConnect(object):
+    def __init__(self, queue_id, is_client):
+        self.queue_id = queue_id
+        self.is_client = is_client
+
+    def as_dict(self):
+        parsed_data = {
+            'queue_id': self.queue_id,
+            'is_client': self.is_client
+        }
+        return parsed_data
diff --git a/enserver/parcel/constants.py b/enserver/parcel/constants.py
new file mode 100644
index 0000000..cefba49
--- /dev/null
+++ b/enserver/parcel/constants.py
@@ -0,0 +1,8 @@
+# List of the special queue names
+# use them instead of magic strings
+ACCEPTED = 'accepted'
+ARCHIVE = 'archive'
+DOWNLOAD = 'download'
+POSTMAN = 'postman'
+SOEK = 'soek'
+TRANSFER = 'transfer'
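Parcel.__lt__ above gives parcels a total order for priority handling: a parcel whose query_type falls in a lower-indexed parcel_priorities category sorts first, and within a category the earlier ts_added wins. A small illustration with a stub that skips the XML-heavy constructor (query types 4 and 6 are taken from the category lists above):

    from parcel.base import Parcel


    class StubParcel(Parcel):
        # Bypass Parcel.__init__ (which parses query_data XML) for the demo.
        def __init__(self, query_type, ts_added):
            self.params = {'query_type': query_type, 'ts_added': ts_added}


    older_low = StubParcel(query_type=6, ts_added=1.0)   # category 1
    newer_high = StubParcel(query_type=4, ts_added=2.0)  # category 0
    assert newer_high < older_low  # category rank beats arrival time
    assert sorted([older_low, newer_high])[0] is newer_high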
diff --git a/enserver/parcel/parcelqueue.py b/enserver/parcel/parcelqueue.py
new file mode 100644
index 0000000..d42ff97
--- /dev/null
+++ b/enserver/parcel/parcelqueue.py
@@ -0,0 +1,396 @@
+# coding: utf-8
+
+import datetime
+import logging
+import pickle
+import threading
+from collections import defaultdict
+from queue import Queue
+from xml.dom.minidom import parseString
+
+import psycopg2
+from psycopg2.extras import RealDictCursor
+
+from parcel.constants import ACCEPTED, ARCHIVE
+from parcel.utils import (execute_parsed_query,
+                          get_all_existed_queue_id_from_db, get_db_connection,
+                          get_last_connection_from_db, put_known_connect_in_db,
+                          put_last_connect_in_db, put_parcel_in_db,
+                          remove_all_reconnected_last_connection_from_db,
+                          restore_parcel_from_db)
+
+log = logging.getLogger('enserver.queue')
+
+
+class ParcelQueue(Queue):
+    def __init__(self, maxsize=0, queue_id='unknown'):
+        Queue.__init__(self, maxsize=maxsize)
+        self.queue_id = queue_id
+        self.conn = None
+        self._db_connect()
+
+    def __del__(self):
+        self.conn.close()
+
+    def _db_connect(self):
+        if not self.conn:
+            try:
+                self.conn = get_db_connection()
+                self._table_load()
+
+            except psycopg2.Error:
+                log.exception('Database connection error:')
+                raise
+
+    def _table_append(self, item):
+        cursor = self.conn.cursor(cursor_factory=RealDictCursor)
+        put_parcel_in_db(cursor, item, self.queue_id)
+
+    def _table_load(self):
+        cursor = self.conn.cursor()
+        cmd = "SELECT data, \"from\", \"to\", query_type, user_id FROM parcels WHERE queue_id = %s"
+        cursor.execute(cmd, (self.queue_id, ))
+        for data, _from, to, query_type, user_id in cursor.fetchall():
+            parcel = restore_parcel_from_db(data, _from, to, query_type, user_id)
+            self._put(parcel)
+
+    def remove(self, items_id_list):
+        self.not_empty.acquire()
+        try:
+            return self._remove(items_id_list)
+        finally:
+            self.not_empty.release()
+
+    def _remove(self, items_id_list):
+        removed_ids = []
+        for queue_item in list(self.queue):
+            if queue_item.id in items_id_list:
+                self.queue.remove(queue_item)
+                removed_ids.append(queue_item.id)
+
+        return removed_ids
+
+    # Queue representation
+    def _put(self, item, table_append=True):
+        if table_append:
+            self._table_append(item)
+
+        self.queue.append(item)
+
+
+class QueueManager(object):
+    def __init__(self):
+        self.mutex = threading.Lock()
+        self.queues = {}
+        self.conn = None
+
+    def _db_connect(self, cursor_factory=None):
+        if not self.conn:
+            self.conn = get_db_connection()
+
+        return self.conn.cursor(cursor_factory=cursor_factory) if cursor_factory else self.conn.cursor()
+
+    def get(self, queue_id, queue_class=ParcelQueue):
+        self._db_connect()
+        with self.mutex:
+            if queue_id not in self.queues:
+                self.queues[queue_id] = queue_class(queue_id=queue_id)
+            return self.queues[queue_id]
+
+    def get_parcel(self, parcel_id):
+        cursor = self._db_connect(cursor_factory=RealDictCursor)
+        cmd = "SELECT data, \"from\", \"to\", query_type, user_id FROM parcels WHERE id = %s"
+        cursor.execute(cmd, (parcel_id, ))
+        if cursor.rowcount > 0:
+            row = cursor.fetchone()
+            parcel = restore_parcel_from_db(row['data'], row['from'], row['to'], row['query_type'], row['user_id'])
+            return parcel
+
+    def get_parcels(self, parcel_ids):
+        cursor = self._db_connect(cursor_factory=RealDictCursor)
+        cmd = "select data, \"from\", \"to\", query_type, user_id from parcels where id in %s"
+        cursor.execute(cmd, (tuple(parcel_ids),))
+
+        parcels = [
+            restore_parcel_from_db(row['data'], row['from'], row['to'], row['query_type'], row['user_id'])
+            for row in cursor.fetchall()
+        ]
+        return parcels
+
+    def put_parcel(self, parcel, queue_id):
+        cursor = self._db_connect(cursor_factory=RealDictCursor)
+        put_parcel_in_db(cursor, parcel, queue_id)
+
+    def put_last_connect(self, last_connect):
+        cursor = self._db_connect()
+        put_last_connect_in_db(cursor, last_connect)
+
+    def put_known_connect(self, known_connect):
+        cursor = self._db_connect()
+        put_known_connect_in_db(cursor, known_connect)
+
+    def del_reconnected_last_connect(self):
+        cursor = self._db_connect()
+        remove_all_reconnected_last_connection_from_db(cursor)
+
+    def get_last_connect(self, queue_id, is_client):
+        cursor = self._db_connect()
+        return get_last_connection_from_db(cursor, queue_id, is_client)
+
+    def get_all_existed_queue_id(self, is_client):
+        cursor = self._db_connect()
+        return get_all_existed_queue_id_from_db(cursor, is_client)
+
+    def list_useful_ids(self, updated_timeout=3600):
+        cursor = self._db_connect()
+        cmd = """
+            SELECT id FROM parcels
+            WHERE queue_id NOT IN (%(accepted)s, %(archive)s) OR queue_id = %(accepted)s AND updated >= %(updated)s
+        """
+        params = {
+            'accepted': ACCEPTED,
+            'archive': ARCHIVE,
+            'updated': datetime.datetime.now() - datetime.timedelta(seconds=updated_timeout)
+        }
+        cursor.execute(cmd, params)
+        return [row[0] for row in cursor]
+
+    def get_parcels_by_protocol(self, protocol):
+        if protocol:
+            cmd = """
+                select data, \"from\", \"to\", query_type, user_id from parcels
+                where queue_id like '%s://%%'
+                order by queue_id
+            """ % protocol
+            cursor = self._db_connect(cursor_factory=RealDictCursor)
+            cursor.execute(cmd)
+
+            parcels = [
+                restore_parcel_from_db(row['data'], row['from'], row['to'], row['query_type'], row['user_id'])
+                for row in cursor.fetchall()
+            ]
+            return parcels
+        else:
+            return []
+
+    def sortable_fields(self):
+        return ['queue_id', 'id', 'added', 'to', 'updated', 'package_size', 'file_size']
+
+    def get_parcels_list(self, order_by, order_dir, offset, limit, where_params):
+        where_params_to = where_params.get('where_params_to')
+        where_params_query_type = where_params.get('where_params_query_type')
+        cursor = self._db_connect()
+        parcels = []
+
+        cmd_count = """
+            SELECT count(*)
+            FROM parcels
+        """
+        cmd_list = """
+            SELECT id, queue_id, added, package_size, file_size, "to", query_type
+            FROM parcels
+        """
+
+        conditional_cmd = """
+            WHERE queue_id NOT IN (%(accepted)s, %(archive)s)
+        """
+        if where_params_to:
+            conditional_cmd += " AND \"to\" = \'%s\'" % where_params_to
+        if where_params_query_type:
+            conditional_cmd += " AND query_type = %s" % where_params_query_type
+
+        order_cmd = """
+            ORDER BY \"%(order_by)s\" %(order_dir)s
+        """ % {'order_by': order_by, 'order_dir': order_dir}
+        limit_offset = """
+            LIMIT %(limit)s
+            OFFSET %(offset)s
+        """
+        cmd_count = cmd_count + conditional_cmd
+        cmd_list = cmd_list + conditional_cmd + order_cmd + limit_offset
+        cursor.execute(cmd_count, {
+            'accepted': ACCEPTED,
+            'archive': ARCHIVE
+        })
+        total, = cursor.fetchone()
+
+        cursor.execute(cmd_list, {
+            'offset': offset,
+            'limit': limit,
+            'accepted': ACCEPTED,
+            'archive': ARCHIVE
+        })
+        for _id, queue_id, added, package_size, file_size, to, query_type in cursor.fetchall():
+            parcels.append({
+                'id': _id,
+                'queue_id': queue_id,
+                'added': added.strftime('%d.%m.%Y %H:%M:%S'),
+                'package_size': package_size,
+                'to': to,
+                'query_type': query_type,
+                'file_size': float(file_size)
+            })
+
+        return total, parcels
+
+    def filterable_fields(self):
+        return ['query_type', 'to']
+
+    def get_parcels_list_data_by_field(self, field_name):
+        parcels_data = []
+        uniq_row_record = []
+        cmd_list = """
+            SELECT id, "to", query_type
+            FROM parcels
+            WHERE queue_id NOT IN (%(accepted)s, %(archive)s)
+        """
+        cursor = self._db_connect(cursor_factory=RealDictCursor)
+
+        cursor.execute(cmd_list, {
+            'accepted': ACCEPTED,
+            'archive': ARCHIVE
+        })
+        for parcel in cursor.fetchall():
+            field = parcel.get(field_name)
+            if field not in uniq_row_record:
+                uniq_row_record.append(field)
+                parcels_data.append({field_name: field})
+
+        return parcels_data
+
+    def queue_list(self):
+        return self.queues.keys()
+
+    def remove_parcels(self, queues_with_parcels, remove_from_table=True):
+        removed_ids = self._remove_parcels_from_queues(queues_with_parcels)
+
+        if remove_from_table:
+            parcels_ids = [_id for id_list in queues_with_parcels.values() for _id in id_list]
+            removed_ids_from_table = self._remove_parcels_from_table(parcels_ids)
+            removed_ids = list(set(removed_ids) | set(removed_ids_from_table))
+
+        return removed_ids
+
+    def _remove_parcels_from_queues(self, queues_with_parcels):
+        removed_ids = []
+        for queue_name, parcels_ids in queues_with_parcels.items():
+            queue = self.queues.get(queue_name)
+            if queue:
+                removed_ids_chunk = queue.remove(parcels_ids)
+                removed_ids += removed_ids_chunk
+
+        return removed_ids
+
+    def _remove_parcels_from_table(self, parcels_id_list):
+        cmd_delete = """
+            DELETE FROM parcels
+            WHERE queue_id NOT IN (%(accepted)s, %(archive)s) AND id IN %(parcels_ids)s
+            RETURNING id
+        """
+
+        cursor = self._db_connect()
+        cursor.execute(cmd_delete, {
+            'accepted': ACCEPTED,
+            'archive': ARCHIVE,
+            'parcels_ids': tuple(parcels_id_list)
+        })
+
+        removed_ids = [row[0] for row in cursor.fetchall()]
+        return removed_ids
+
+    def remove_parcels_by_queue_id(self, queue_id):
+        cmd_delete = """
+            DELETE from parcels
+            WHERE queue_id = %s
+            RETURNING id
+        """
+        cursor = self._db_connect()
+        cursor.execute(cmd_delete, (queue_id, ))
+
+        removed_ids = [row[0] for row in cursor.fetchall()]
+        return removed_ids
+
+    def remove_last_connect(self, queue_id, is_client):
+        cmd_delete = """
+            DELETE FROM last_connect
+            WHERE queue_id = %s and is_client = %s
+        """
+        cursor = self._db_connect()
+        cursor.execute(cmd_delete, (queue_id, is_client))
+
+    def remove_parcels_by_filters(self, where_params):
+        where_params_to = where_params.get('where_params_to')
+        where_params_query_type = where_params.get('where_params_query_type')
+        cursor = self._db_connect()
+        queues_with_parcels = defaultdict(list)
+        ids_to_remove = []
+
+        cmd_select = """
+            SELECT id, queue_id FROM parcels
+        """
+        conditional_cmd = """
+            WHERE queue_id NOT IN (%(accepted)s, %(archive)s)
+        """
+        if where_params_to:
+            conditional_cmd += " AND \"to\" = \'%s\'" % where_params_to
+        if where_params_query_type:
+            conditional_cmd += " AND query_type = %s" % where_params_query_type
+        cmd_select += conditional_cmd
+
+        cursor.execute(cmd_select, {
+            'accepted': ACCEPTED,
+            'archive': ARCHIVE
+        })
+        for _id, queue_id in cursor.fetchall():
+            queues_with_parcels[queue_id].append(_id)
+            ids_to_remove.append(_id)
+
+        removed_ids = self.remove_parcels(queues_with_parcels)
+        not_removed_ids = list(set(ids_to_remove) - set(removed_ids))
+
+        return not not_removed_ids
+
+    def remove_replications_parcels(self, replications_recipients, replications_ids):
+        try:
+            cursor = self._db_connect()
+            parcels_to_remove = defaultdict(list)
+            parcels_ids_list = []
+            cmd_select_by_recipients = """
+                SELECT id, queue_id, data FROM parcels
+                WHERE queue_id IN %s
+            """
+
+            execute_parsed_query(cmd_select_by_recipients, replications_recipients, cursor)
+            for parcel_id, queue_id, data in cursor.fetchall():
+                parcel = pickle.loads(data)
+                xml_document = parseString(parcel.params['query_data'].encode('utf-8'))
+                pkg_tag = xml_document.getElementsByTagName('package')
+                if pkg_tag and pkg_tag[0].getAttribute('pkg_id') in replications_ids:
+                    parcels_to_remove[queue_id].append(parcel_id)
+                    parcels_ids_list.append(parcel_id)
+
+            if parcels_to_remove:
+                self._remove_parcels_from_queues(parcels_to_remove)
+                self._remove_parcels_from_table(parcels_ids_list)
+
+            return True, 'All replications parcels were successfully removed'
+
+        except Exception:
+            return False, 'Error while removing replications parcels'
+
+    def route_parcel(self, parcel_id, queue_id):
+        parcel = self.get_parcel(parcel_id)
+        if not parcel.params['from'].startswith('tcp') or \
+                not parcel.params['to'].startswith('es'):
+            return False, 'Unroutable parcel'
+
+        queue_with_parcel = {queue_id: [parcel_id]}
+        removed_id_list = self._remove_parcels_from_queues(queue_with_parcel)
+        if parcel_id in removed_id_list:
+            self.get(queue_id).put(parcel)
+            return True, 'Parcel has been successfully routed'
+
+        return False, 'Parcel has not been routed'
+
+
+manager = QueueManager()
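QueueManager above is the process-wide registry: get() lazily creates one ParcelQueue per queue_id under a lock, and every put is mirrored into the parcels table, so queues are rebuilt from the database on restart (ParcelQueue._table_load). A usage sketch via the module-level singleton; this assumes a reachable PostgreSQL instance configured through the --db-* options:

    from parcel import constants, parcelqueue

    # Creates (or returns) the postman queue and loads its persisted parcels.
    q = parcelqueue.manager.get(constants.POSTMAN)

    # put() persists to the parcels table and enqueues in memory;
    # `parcel` would be a parcel.base.Parcel instance.
    # q.put(parcel)

    # Consumers drain it like a regular queue.Queue:
    # next_parcel = q.get(block=True, timeout=0.1)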
diff --git a/enserver/parcel/utils.py b/enserver/parcel/utils.py
new file mode 100644
index 0000000..5f44259
--- /dev/null
+++ b/enserver/parcel/utils.py
@@ -0,0 +1,181 @@
+import logging
+import pickle
+from copy import deepcopy
+from re import sub
+
+import psycopg2
+
+from parcel.base import LastConnect
+from conf import settings
+
+try:
+    from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
+except ImportError:
+    ISOLATION_LEVEL_AUTOCOMMIT = 0
+
+
+log = logging.getLogger('enserver.queue')
+
+
+def get_db_connection():
+    try:
+        conn = psycopg2.connect(
+            host=settings.db_host,
+            port=settings.db_port,
+            database=settings.db_name,
+            user=settings.db_user,
+            password=settings.db_passwd
+        )
+        conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
+        return conn
+
+    except psycopg2.Error:
+        log.exception('Database connection error:')
+        raise
+
+
+def restore_parcel_from_db(data, _from, to, query_type, user_id):
+    parcel = pickle.loads(data)
+    parcel.params['from'] = _from
+    parcel.params['to'] = to
+    parcel.params['query_type'] = query_type
+    parcel.params['user_id'] = user_id
+
+    return parcel
+
+
+def restore_last_connect_from_db(queue_id, is_client, last_update, need_replication):
+    return LastConnect(
+        queue_id=queue_id,
+        is_client=is_client,
+        last_update=last_update,
+        need_replication=need_replication
+    )
+
+
+def parse_parcel(parcel, queue_id):
+    keys_to_remove = ('from', 'to', 'query_type', 'user_id')
+
+    parsed_data = {
+        'id': parcel.id,
+        'queue_id': queue_id,
+        'from': parcel.params['from'],
+        'to': parcel.params['to'],
+        'query_type': parcel.params['query_type'],
+        'user_id': parcel.params['user_id'],
+        'file_size': parcel.payload_size(),
+        'package_size': len(pickle.dumps(parcel, -1))
+    }
+
+    parsed_parcel = deepcopy(parcel)
+    for key in keys_to_remove:
+        del parsed_parcel.params[key]
+    parsed_data['data'] = psycopg2.Binary(pickle.dumps(parsed_parcel, -1))
+
+    return parsed_data
+
+
+def put_parcel_in_db(cursor, parcel, queue_id):
+    cmd = """
+        UPDATE parcels
+        SET queue_id=%(queue_id)s,
+            data=%(data)s,
+            updated=current_timestamp,
+            package_size=%(package_size)s,
+            file_size=%(file_size)s,
+            "from"=%(from)s,
+            "to"=%(to)s,
+            query_type=%(query_type)s,
+            user_id=%(user_id)s
+        WHERE id=%(id)s
+    """
+
+    parsed_data = parse_parcel(parcel, queue_id)
+    cursor.execute(cmd, parsed_data)
+    if cursor.rowcount > 0:
+        return
+
+    cmd = """
+        INSERT INTO parcels (queue_id, data, package_size, file_size, "from", "to", query_type, user_id, id)
+        VALUES (
+            %(queue_id)s, %(data)s, %(package_size)s, %(file_size)s,
+            %(from)s, %(to)s, %(query_type)s, %(user_id)s, %(id)s
+        )
+    """
+    cursor.execute(cmd, parsed_data)
+
+
+def put_last_connect_in_db(cursor, last_connect):
+    cmd = """
+        UPDATE last_connect
+        SET last_update=%(last_update)s, need_replication=%(need_replication)s
+        WHERE queue_id=%(queue_id)s and is_client=%(is_client)s
+    """
+
+    parsed_data = last_connect.as_dict()
+    cursor.execute(cmd, parsed_data)
+    if cursor.rowcount > 0:
+        return
+
+    cmd = """
+        INSERT INTO last_connect (queue_id, last_update, is_client)
+        VALUES (%(queue_id)s, %(last_update)s, %(is_client)s)
+    """
+    cursor.execute(cmd, parsed_data)
+
+
+def put_known_connect_in_db(cursor, known_connect):
+    parsed_data = known_connect.as_dict()
+    cmd = """
+        INSERT INTO known_connect (queue_id, is_client)
+        SELECT %(queue_id)s, %(is_client)s
+        WHERE
+            NOT EXISTS (
+                SELECT queue_id FROM known_connect WHERE queue_id = %(queue_id)s AND is_client = %(is_client)s
+            );
+    """
+    cursor.execute(cmd, parsed_data)
+
+
+def get_all_existed_queue_id_from_db(cursor, is_client):
+    cmd = """
+        SELECT queue_id
+        FROM known_connect
+        WHERE is_client = %s
+        AND EXISTS(SELECT queue_id
+                   FROM parcels
+                   WHERE queue_id=known_connect.queue_id)
+    """
+    cursor.execute(cmd, (is_client, ))
+    return tuple(queue_id for queue_id in cursor.fetchall())
+
+
+def remove_all_reconnected_last_connection_from_db(cursor):
+    cmd = """
+        DELETE FROM last_connect
+        WHERE NOT EXISTS(
+            SELECT queue_id
+            FROM parcels
+            WHERE queue_id=last_connect.queue_id)
+        AND need_replication = False
+    """
+    cursor.execute(cmd)
+
+
+def get_last_connection_from_db(cursor, queue_id, is_client):
+    cmd = """
+        SELECT queue_id, last_update, is_client, need_replication
+        FROM last_connect
+        WHERE queue_id = %s AND is_client = %s
+    """
+    cursor.execute(cmd, (queue_id, is_client))
+    for queue_id, last_update, is_client, need_replication in cursor.fetchall():
+        return restore_last_connect_from_db(queue_id, is_client, last_update, need_replication)
+    return None
+
+
+def execute_parsed_query(sql, tmp_list, cursor):
+    sql_modified = cursor.mogrify(sql, (tuple(tmp_list),)).decode()
+    sql_modified = sql_modified.replace('((', '(', 1)
+    sql_modified = sub(r'(.*)\)\)', r'\1)', sql_modified)
+    cursor.execute(sql_modified)
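put_parcel_in_db and put_last_connect_in_db above implement an upsert as UPDATE-then-INSERT: run the UPDATE first and, only when rowcount is 0, fall through to the INSERT. The shape of the pattern, sketched on a hypothetical kv(key, value) table:

    def upsert(cursor, key, value):
        # Try to update an existing row first ...
        cursor.execute("UPDATE kv SET value = %s WHERE key = %s", (value, key))
        if cursor.rowcount > 0:
            return
        # ... and insert only when nothing matched.
        cursor.execute("INSERT INTO kv (key, value) VALUES (%s, %s)", (key, value))

On PostgreSQL 9.5+ the same effect is available atomically as INSERT ... ON CONFLICT DO UPDATE; the two-step form relies on each key being written from one thread at a time, as is the case for the per-queue writers here.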
diff --git a/enserver/worker/audit.py b/enserver/worker/audit.py
new file mode 100644
index 0000000..4642998
--- /dev/null
+++ b/enserver/worker/audit.py
@@ -0,0 +1,132 @@
+# coding: utf-8
+
+import logging
+import queue
+import time
+from datetime import datetime
+
+import psycopg2
+
+from conf import settings
+from lib.threads import Worker
+from xmlrpc_wrapper import ServerProxy
+
+try:
+    from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
+except ImportError:
+    ISOLATION_LEVEL_AUTOCOMMIT = 0
+
+
+class Audit(Worker):
+    class LEVEL:
+        INFO = 0
+        WARNING = 1
+        ERROR = 2
+
+    queue = queue.Queue()
+    MSG_BATCH_SIZE = 100
+
+    def __init__(self, retry_timeout=30, *args, **kwargs):
+        self.log = logging.getLogger('enserver.audit')
+        self.conn = None
+        self.proxy = None
+        self.wait_interval = retry_timeout
+        super(Audit, self).__init__(*args, **kwargs)
+
+    def before_shifts(self):
+        self.proxy = ServerProxy(settings.auth_provider)
+        self._table_load()
+
+    def shift(self):
+        msgs = []
+        try:
+            while True:
+                msg = self.queue.get(block=True, timeout=0.1)
+                if 'id' not in msg:
+                    msg['id'] = self._table_append(msg)
+                msgs.append(msg)
+        except queue.Empty:
+            pass
+        if not len(msgs):
+            return
+        try:
+            self.aud_add(msgs)
+            self.log.info('Called aud_add, messages total: %s', len(msgs))
+            for msg in msgs:
+                self._table_delete(msg['id'])
+        except Exception as ex:
+            self.log.error('aud_add exception: %s', ex)
+            for msg in msgs:
+                self.queue.put(msg)
+            self.stop_event.wait(self.wait_interval)
+
+    def aud_add(self, msgs):
+        for pos in range(0, len(msgs), self.MSG_BATCH_SIZE):
+            self.proxy.aud_add(msgs[pos:pos + self.MSG_BATCH_SIZE])
+
+    def _db_connect(self):
+        if self.conn is None:
+            try:
+                self.conn = psycopg2.connect(
+                    host=settings.db_host,
+                    port=settings.db_port,
+                    database=settings.db_name,
+                    user=settings.db_user,
+                    password=settings.db_passwd
+                )
+                self.conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
+                if not self._table_exist():
+                    self._table_create()
+            except psycopg2.Error as ex:
+                self.log.critical('Database connection error: %s', ex)
+                raise
+        return self.conn.cursor()
+
+    def _table_exist(self):
+        cmd = "select exists(select * from information_schema.tables where table_name='audit')"
+        cursor = self.conn.cursor()
+        cursor.execute(cmd)
+        return cursor.fetchone()[0]
+
+    def _table_create(self):
+        self.log.info('Creating table for audit messages')
+        cursor = self.conn.cursor()
+        cmd = "create table audit (" \
+              "id serial primary key, " \
+              "level integer not null, " \
+              "type integer not null, " \
+              "description text not null," \
+              "time timestamp default current_timestamp" \
+              ")"
+        cursor.execute(cmd)
+
+    def _table_append(self, msg):
+        cursor = self._db_connect()
+        params = (msg['level'], msg['type'], msg['description'], datetime.fromtimestamp(msg['time']))
+        cmd = "insert into audit (level, type, description, time) values (%s, %s, %s, %s) returning id"
+        cursor.execute(cmd, params)
+        return cursor.fetchone()[0]
+
+    def _table_load(self):
+        cursor = self._db_connect()
+        cursor.execute("select id, level, type, description, time from audit")
+        for row in cursor:
+            msg = dict(zip(('id', 'level', 'type', 'description', 'time'), row))
+            if isinstance(msg['time'], datetime):
+                msg['time'] = time.mktime(msg['time'].timetuple())
+            self.queue.put(msg)
+
+    def _table_delete(self, msg_id):
+        cursor = self._db_connect()
+        cursor.execute("delete from audit where id = %s", (msg_id, ))
+
+    @classmethod
+    def add_message(cls, type_id, msg, timestamp=None, warning=False, error=False):
+        if error:
+            level = cls.LEVEL.ERROR
+        elif warning:
+            level = cls.LEVEL.WARNING
+        else:
+            level = cls.LEVEL.INFO
+        timestamp = timestamp or time.time()
+        cls.queue.put({'level': level, 'type': type_id, 'description': msg, 'time': timestamp})
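Audit above decouples event producers from the auth provider: add_message() is a classmethod that only enqueues onto a class-level queue, while the worker thread persists each message into the audit table, forwards batches of up to MSG_BATCH_SIZE over the aud_add XML-RPC call, and deletes rows only after a successful send, so events survive restarts and provider outages. Any code in the process can record an event without blocking; a sketch with a made-up type id:

    from worker.audit import Audit

    # Enqueue only; the Audit worker thread persists and forwards it later.
    Audit.add_message(42, 'parcel rejected: bad address', error=True)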
diff --git a/enserver/worker/downloader.py b/enserver/worker/downloader.py
new file mode 100644
index 0000000..4707841
--- /dev/null
+++ b/enserver/worker/downloader.py
@@ -0,0 +1,120 @@
+# coding: utf-8
+
+import logging
+import os
+import queue
+import re
+import urllib.request
+import urllib.error
+from xmlrpc.client import Fault
+
+from conf import settings
+from lib.threads import Worker
+from parcel import constants, parcelqueue
+from xmlrpc_wrapper import ServerProxy
+
+
+class ParcelDownloader(Worker):
+    def __init__(self, in_q=None, bufsize=65536):
+        self.in_q = in_q or parcelqueue.manager.get(constants.DOWNLOAD)
+        self.bufsize = bufsize
+        self.static_dir = settings.storage
+        self.log = logging.getLogger('enserver.downloader')
+        self.re_url = re.compile('^(?P<protocol>.+://?)(?:(?P<login>.+?)(?::(?P<password>.+))?@)?(?P<path>.+)$')
+        super(ParcelDownloader, self).__init__()
+
+    def shift(self):
+        try:
+            parcel = self.in_q.get(True, 0.1)
+            self.receive_parcel(parcel)
+        except queue.Empty:
+            return
+        except Exception as ex:
+            self.log.exception(ex)
+
+    def receive_parcel(self, parcel):
+        params, files = parcel.params, parcel.files
+        self.log.info('Processing parcel: %s', parcel)
+
+        parcel_dir = os.path.join(self.static_dir, parcel.id)
+        if not os.path.exists(parcel_dir) and files:
+            os.mkdir(parcel_dir, 0o750)
+
+        try:
+            for f in files:
+                self.download_url(parcel_dir, f['url'], f['name'])
+        except (urllib.error.URLError, urllib.error.HTTPError) as exc:
+            self.log.error('File download failed, sending onError! Exception: %s', exc)
+            self.notify_onerror(params, files, parcel.callback_url)
+            parcelqueue.manager.put_parcel(parcel, parcelqueue.ARCHIVE)
+            return
+
+        self.notify_onsent(params, files, parcel.callback_url)
+        self.force_delete(files, parcel.callback_url)
+        self.log.info('Finished processing parcel: %s', parcel)
+
+        self.transfer_parcel(parcel)
+
+    def download_url(self, parcel_dir, url, name):
+        self.log.info('Downloading file url %s', url)
+
+        match = self.re_url.match(url)
+        if match is None:
+            raise urllib.error.URLError('Url does not match the http pattern!')
+        url_groups = match.groupdict()
+
+        if url_groups.get('login'):
+            url = url_groups['protocol'] + url_groups['path']
+            passman = urllib.request.HTTPPasswordMgrWithDefaultRealm()
+            passman.add_password(None, url, url_groups.get('login'), url_groups.get('password') or '')
+            opener = urllib.request.build_opener(urllib.request.HTTPBasicAuthHandler(passman))
+            urllib.request.install_opener(opener)
+
+        fspath = os.path.join(parcel_dir, name)
+        with open(fspath, 'wb') as f:
+            u = urllib.request.urlopen(url)
+            while True:
+                b = u.read(self.bufsize)
+                f.write(b)
+                if len(b) < self.bufsize:
+                    break
+        return fspath
+
+    def transfer_parcel(self, parcel):
+        deliver_to = parcel.params['to']
+        self.log.debug('Sending parcel to %s', deliver_to)
+
+        if deliver_to.startswith('local'):
+            parcel.params.setdefault('from', deliver_to)
+            parcelqueue.manager.get(constants.POSTMAN).put(parcel)
+
+        elif deliver_to.startswith('soek://'):
+            parcel.params.setdefault('from', 'soek://%s' % settings.ims_user)
+            parcelqueue.manager.get(constants.SOEK).put(parcel)
+
+        else:
+            parcelqueue.manager.get(deliver_to).put(parcel)
+
+    def notify_onsent(self, params, files, callback_url):
+        try:
+            sender = ServerProxy(callback_url)
+            sender.onSent(params, files, callback_url)
+        except Exception as exc:
+            self.log.warning('onSent failed for url {0}, exception: {1}'.format(callback_url, exc))
+
+    def notify_onerror(self, params, files, callback_url):
+        try:
+            sender = ServerProxy(callback_url)
+            sender.onError(params, files, callback_url)
+        except Exception as exc:
+            self.log.warning('onError failed for url {0}, exception: {1}'.format(callback_url, exc))
+
+    def force_delete(self, files, callback_url):
+        try:
+            sender = ServerProxy(callback_url)
+            sender.forceDelete(files)
+        except Fault as exc:
+            if exc.faultString == 'No such handler: forceDelete':
+                self.log.warning('forceDelete method is not supported by service: {0}'.format(callback_url))
+        except Exception as exc:
+            self.log.warning('forceDelete failed for url {0}, exception: {1}'.format(callback_url, exc))
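download_url above accepts URLs with inline credentials (scheme://user:password@host/path): the named groups of re_url pull the credentials out, and they are handed to an HTTPBasicAuthHandler rather than left in the fetched URL. A quick check of the group extraction, using the same pattern and a made-up URL:

    import re

    re_url = re.compile('^(?P<protocol>.+://?)(?:(?P<login>.+?)(?::(?P<password>.+))?@)?(?P<path>.+)$')

    groups = re_url.match('http://user:secret@files.example/parcel.bin').groupdict()
    assert groups == {'protocol': 'http://', 'login': 'user',
                      'password': 'secret', 'path': 'files.example/parcel.bin'}

    # Without credentials the optional group is skipped and login stays None.
    assert re_url.match('http://files.example/x').groupdict()['login'] is None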
set(parcelqueue.manager.list_useful_ids()) + for name in filter(lambda s: not s.startswith("."), os.listdir(store_path)): + path = os.path.join(store_path, name) + if name in parcel_ids: + continue + try: + if os.stat(path).st_ctime > self.clean_timing[0] - self.clean_timing[1]: + continue + self.log.info('Deleting from storage: %s', name) + if os.path.isfile(path): + os.unlink(path) + else: + shutil.rmtree(path) + except: + self.log.info('Can\'t delete %s', name) + pass diff --git a/enserver/worker/postman.py b/enserver/worker/postman.py new file mode 100644 index 0000000..6ffae31 --- /dev/null +++ b/enserver/worker/postman.py @@ -0,0 +1,86 @@ +# coding: utf-8 + +import logging +import os +import queue +import time + +from conf import settings +from lib.threads import Worker +from parcel import constants, parcelqueue +from xmlrpc_wrapper import ServerProxy + + +class Postman(Worker): + """ + Delivers parcel to addressee by calling xmlrpc accept + """ + + def __init__(self, in_q=None, re_delivery_interval=30.0): + self.in_q = in_q or parcelqueue.manager.get(constants.POSTMAN) + self.re_q = parcelqueue.manager.get('postman_re') + self.static_url = settings.static_url + self.xmlrpc_url = settings.xmlrpc_url + self.re_delivery_interval = re_delivery_interval + self.last_re_delivery = time.time() + self.log = logging.getLogger('enserver.postman') + super(Postman, self).__init__() + + def shift(self): + try: + parcel = self.in_q.get(True, 0.1) + self.log.debug('Processing parcel: %s', parcel) + if 'delivered' in parcel.params: + self.log.info('Informing sender about delivery, parcel: %s', parcel) + self.deliver_notify(parcel) + else: + self.log.debug('Delivering parcel: %s', parcel) + self.deliver_parcel(parcel) + except queue.Empty: + if not self.re_q.empty(): + curr_time = time.time() + if curr_time - self.last_re_delivery >= self.re_delivery_interval: + while not self.re_q.empty(): + self.in_q.put(self.re_q.get()) + self.last_re_delivery = curr_time + except Exception as ex: + self.log.exception(ex) + + def deliver_parcel(self, parcel): + delivered = False + deliver_to = parcel.params['to'] + subscribers = settings.locals.get(deliver_to) or settings.locals['local'] + self.log.debug('Subscribers list: %s', subscribers) + for address in subscribers: + s = ServerProxy(address) + try: + if s.accept(parcel.params, self.fixed_files(parcel), self.xmlrpc_url): + self.log.info('Agreed to receive parcel %s', address) + parcelqueue.manager.put_parcel(parcel, parcelqueue.ACCEPTED) + delivered = True + break + except Exception as exc: + self.log.warning('Inaccessible address %s, exception: %s', address, exc) + if not delivered: + self.re_q.put(parcel) + return delivered + + def deliver_notify(self, parcel): + try: + s = ServerProxy(parcel.callback_url) + if parcel.params['delivered']: + s.onDelivered(parcel.params, parcel.files, parcel.callback_url) + else: + s.onError(parcel.params, parcel.files, parcel.callback_url) + except Exception as exc: + self.log.warning('Failed to notify {0} about parcel {1}, exception: {2}'.format( + parcel.callback_url, + parcel.id, + exc + )) + parcelqueue.manager.put_parcel(parcel, parcelqueue.ARCHIVE) + + def fixed_files(self, parcel): + def file_dict(f): + return {'name': f['name'], 'url': os.path.join(self.static_url, parcel.id, f['name']).replace("\\","/")} + return [file_dict(f) for f in parcel.files] diff --git a/enserver/worker/soek.py b/enserver/worker/soek.py new file mode 100644 index 0000000..1a9a809 --- /dev/null +++ b/enserver/worker/soek.py @@ -0,0 +1,290 @@ 
+# coding: utf-8
+
+import logging
+import os
+import pickle
+import queue
+import re
+import shutil
+import subprocess
+import time
+import zipfile
+
+from conf import settings
+from lib.threads import Worker
+from parcel import constants, parcelqueue
+from parcel.base import Parcel
+
+
+def split_file(src_path, dst_path=None, limit=1048576, bufsize=65536):
+    dst_path = dst_path or src_path
+    parts = []
+    with open(src_path, 'rb') as src:
+        dst_part, dst_size = 0, 0
+        dst = open(dst_path + '_%s' % dst_part, 'wb')
+        # The source is opened in binary mode, so the iter() sentinel must be
+        # b''; an empty str would never match and the loop would not end.
+        for buf in iter(lambda: src.read(bufsize), b''):
+            while buf:
+                dst = open(dst_path + '_%s' % dst_part, 'wb') if dst.closed else dst
+                write_max = limit - dst_size
+                dst_size += min(write_max, len(buf))
+                dst.write(buf[:write_max])
+                buf = buf[write_max:]
+                if dst_size == limit:
+                    dst.close()
+                    parts.append(dst_path + '_%s' % dst_part)
+                    dst_part, dst_size = dst_part + 1, 0
+    if not dst.closed:
+        parts.append(dst_path + '_%s' % dst_part)
+        dst.close()
+    return parts
+
+
+def abslistdir(path):
+    return ((os.path.join(path, filename), filename) for filename in os.listdir(path))
+
+
+class IMSCallError(Exception):
+    pass
+
+
+class IMSWorker(Worker):
+    def __init__(self, in_q=None, exchange_interval=60):
+        self.in_q = in_q or parcelqueue.manager.get(constants.SOEK)
+        self.dir_in = os.path.join(settings.ims_dir, 'in')
+        self.dir_out = os.path.join(settings.ims_dir, 'out')
+        self.dir_tmp = os.path.join(settings.ims_dir, 'tmp')
+        self.exchange_time = [time.time(), exchange_interval]
+        self.garbage = []
+        self.log = logging.getLogger('enserver.soek')
+        super(IMSWorker, self).__init__()
+
+    def before_shifts(self):
+        self.init_dir_structure()
+
+    def shift(self):
+        try:
+            parcel = self.in_q.get(True, 0.1)
+            parcel.params['from'] = u'soek://%s' % self.decode_str(settings.ims_user)
+            self.log.debug('Processing parcel: %s', parcel)
+
+            if len(parcel.files) > 1:
+                attach = self.pack_files(parcel.id, parcel.files)
+            elif len(parcel.files) == 1:
+                attach = os.path.join(settings.storage, parcel.id, parcel.files[0]['name'])
+            else:
+                attach = ''
+
+            if attach:
+                a_size = os.path.getsize(attach)
+                if a_size > settings.ims_limit:
+                    parts = split_file(attach, limit=settings.ims_limit)
+                    self.log.debug('Parcel split, got %s parts', len(parts))
+                else:
+                    parts = [attach]
+            else:
+                parts = []
+
+            try:
+                self.create_messages(parcel, parts)
+                self.send_and_receive()
+            except IMSCallError:
+                self.log.exception('Error while invoking ims tools:')
+                parcel.update_ts()
+                self.in_q.put(parcel)
+            self.exchange_time[0] = time.time()
+        except queue.Empty:
+            curr_time = time.time()
+            if curr_time >= sum(self.exchange_time):
+                try:
+                    self.send_and_receive()
+                except IMSCallError:
+                    self.log.exception('Error while invoking ims tools:')
+                self.exchange_time[0] = curr_time
+
+    def create_messages(self, parcel, parts):
+        params_path = os.path.join(self.dir_tmp, '%s_params' % parcel.id)
+        addressee = parcel.params['to']
+        assert addressee.startswith('soek://')
+        with open(params_path, 'wb') as f:
+            pickle.dump(parcel, f)
+
+        args = [
+            settings.ims_edit,
+            u'create',
+            u'--from=%s' % self.decode_str(settings.ims_user),
+            u'--to=%s' % self.decode_str(addressee[7:]),
+            u'--sect-info=файл',
+            u'--sect-file=%s' % params_path
+        ]
+
+        if parts:
+            for part_n, part_path in enumerate(parts):
+                ed_args = args + [
+                    u'--subj=%s-%s-%s' % (parcel.id, part_n, len(parts)),
+                    u'--sect-info=файл',
+                    u'--sect-file=%s' % part_path,
+                    os.path.join(self.dir_out, '4-%s-%s.ims' % (parcel.id, part_n)),
+                ]
+                self.call_ims(ed_args)
+        else:
+ ed_args = args + [ + u'--subj=%s-0-1' % parcel.id, + os.path.join(self.dir_out, '4-%s-0.ims' % parcel.id) + ] + self.call_ims(ed_args) + os.remove(params_path) + + def send_and_receive(self): + args = [ + settings.ims_send, + u'--originator=%s' % self.decode_str(settings.ims_user), + u'--password=%s' % self.decode_str(settings.ims_passwd), + u'--server-ip=%s' % self.decode_str(settings.ims_host), + u'--dir-in=%s' % self.decode_str(self.dir_in), + u'--dir-out=%s' % self.decode_str(self.dir_out), + ] + self.call_ims(args, ok_retcodes=(0, 40)) + self.look_into_inbox() + self.clean_garbage() + + def look_into_inbox(self): + parcels = {} + ims_re = re.compile('^(.+)-(\d+)-(\d+)$') + + for ims_path, _ in abslistdir(self.dir_in): + if not ims_path.endswith('.ims'): + os.remove(ims_path) + continue + subj = self.get_ims_subj(ims_path) + + m = ims_re.match(subj) + if not m: + self.log.warning('Incorrect subject %s for message %s', subj, ims_path) + continue + p_id, p_num, p_total = m.groups()[0], int(m.groups()[1]), int(m.groups()[2]) + parcels.setdefault(p_id, []).insert(p_num, ims_path) + + if len(parcels[p_id]) == p_total: + parcel = self.build_parcel(parcels[p_id]) + parcelqueue.manager.get(constants.POSTMAN).put(parcel) + + def build_parcel(self, ims_files): + params_file = os.path.join(self.dir_tmp, 'params') + args = [settings.ims_edit, u'export', u'--sect-file=1', ims_files[0], params_file] + self.call_ims(args) + with open(params_file, 'rb') as f: + parcel = pickle.load(f) + assert isinstance(parcel, Parcel) + os.remove(params_file) + + if not parcel.files: + os.remove(ims_files[0]) + return parcel + + atts = [] + for attach_n, ims_file in enumerate(ims_files): + atts.append(os.path.join(self.dir_tmp, 'attachment' + '_%s' % attach_n)) + args = [settings.ims_edit, u'export', u'--sect-file=2', ims_file, atts[-1]] + self.call_ims(args) + os.remove(ims_file) + + with open(atts[0], 'ab') as dst: + for att in atts[1:]: + with open(att, 'rb') as src: + shutil.copyfileobj(src, dst) + os.remove(att) + + parcel_dir = os.path.join(settings.storage, parcel.id) + if os.path.exists(parcel_dir): + shutil.rmtree(parcel_dir) + os.mkdir(parcel_dir) + + if len(parcel.files) > 1: + zf = zipfile.ZipFile(atts[0], 'r') + zf.extractall(parcel_dir) + zf.close() + os.remove(atts[0]) + else: + shutil.move(atts[0], os.path.join(parcel_dir, parcel.files[0]['name'])) + + return parcel + + def get_ims_subj(self, ims_path): + args = [settings.ims_edit, u'list', u'--subj', ims_path] + ret, msgstd, msgerr = self.call_ims(args) + return msgstd.strip() + + def init_dir_structure(self): + for d in (self.dir_tmp, self.dir_out): + if os.path.exists(d): + shutil.rmtree(d) + os.mkdir(d) + if not os.path.exists(self.dir_in): + os.mkdir(self.dir_in) + + def pack_files(self, parcel_id, files): + merge_path = os.path.join(self.dir_tmp, parcel_id) + self.log.debug('Merging parcel files to %s', merge_path) + if os.path.exists(merge_path): + os.remove(merge_path) + + zf = zipfile.ZipFile(merge_path, mode='w', compression=zipfile.ZIP_STORED) + for f in files: + f_path = os.path.join(settings.storage, parcel_id, f['name']) + zf.write(f_path, f['name']) + zf.close() + self.garbage.append(merge_path) + return merge_path + + def split_file(self, parcel_id, filepath): + parts = split_file(filepath, os.path.join(self.dir_tmp, parcel_id), limit=settings.ims_limit) + self.garbage.extend(parts) + return parts + + def clean_garbage(self): + for fspath in self.garbage: + if os.path.exists(fspath): + os.remove(fspath) + self.garbage = [] + + def 
decode_str(self, s, codings=('utf-8', 'koi8-r')):
+        if isinstance(s, str):
+            return s
+        else:
+            for c in codings:
+                try:
+                    return s.decode(c)
+                except UnicodeDecodeError:
+                    continue
+            # UnicodeDecodeError requires five positional arguments, so raise
+            # a plain ValueError when none of the codings matched.
+            raise ValueError('Could not decode string %r' % (s,))
+
+    def prepare_args(self, *args):
+        result = []
+        for arg in args:
+            if isinstance(arg, str):
+                result.append(arg)
+            elif isinstance(arg, bytes):
+                # Leftover from the Python 2 str/unicode split: decode byte
+                # strings so subprocess receives text arguments.
+                result.append(arg.decode(settings.ims_encode))
+            elif isinstance(arg, list):
+                result.extend(self.prepare_args(*arg))
+        return result
+
+    def call_ims(self, args, ok_retcodes=(0, )):
+        args = self.prepare_args(*args)
+        fout_p = os.path.join(self.dir_tmp, 'stdout')
+        ferr_p = os.path.join(self.dir_tmp, 'stderr')
+        with open(fout_p, 'w') as fout:
+            with open(ferr_p, 'w') as ferr:
+                self.log.debug('Calling ims utility %s', args)
+                ret = subprocess.call(args, stdout=fout, stderr=ferr)
+
+        with open(fout_p, 'r') as fout:
+            msgout = fout.read()
+        os.remove(fout_p)
+        with open(ferr_p, 'r') as ferr:
+            msgerr = ferr.read()
+        os.remove(ferr_p)
+
+        if ret not in ok_retcodes:
+            raise IMSCallError('Subprocess invocation error (return code %s)\n%s\n%s' % (ret, args, msgerr))
+        return ret, msgout, msgerr
diff --git a/enserver/worker/tcp.py b/enserver/worker/tcp.py
new file mode 100644
index 0000000..39b12f5
--- /dev/null
+++ b/enserver/worker/tcp.py
@@ -0,0 +1,610 @@
+# coding: utf-8
+
+import json
+import logging
+import os
+import queue
+import socket
+import struct
+import threading
+import time
+from collections import deque
+from datetime import datetime, timedelta
+
+from conf import settings
+from lib.translation import ugettext as _
+from parcel import constants, parcelqueue
+from parcel.base import KnownConnect, LastConnect, Parcel
+from worker.utils import disconnect_handle, replication_after_reconnect
+from worker.audit import Audit
+from xmlrpc_wrapper import ServerProxy
+
+try:
+    from sendfile import sendfile
+    sendfile_available = True
+except ImportError:
+    sendfile = None
+    sendfile_available = False
+
+
+log = logging.getLogger('enserver.worker.tcp')
+
+
+workers = []
+
+
+class TCPClientWorker(threading.Thread):
+    """
+    Maintains uplink connection
+    """
+
+    def __init__(self, reconnect_timeout=30.0, auth_timeout=60.0):
+        super(TCPClientWorker, self).__init__()
+        self.connect_address = (settings.host, settings.port)
+        self.reconnect_timeout = reconnect_timeout
+        self.userid_check = [0, auth_timeout, False]
+        self.stop_event = threading.Event()
+        self.workers = []
+        self.last_message = None
+        workers.append(self)
+
+    def run(self):
+        while not self.stop_event.is_set():
+            if self.workers:
+                if not self.workers_alive() or not self.userid_correct():
+                    self.stop_workers()
+                else:
+                    self.stop_event.wait(self.reconnect_timeout)
+            else:
+                self.start_workers()
+                self.stop_event.wait(self.reconnect_timeout)
+
+    def join(self, timeout=None):
+        self.stop_workers()
+        self.stop_event.set()
+        super(TCPClientWorker, self).join(timeout)
+
+    def start_workers(self):
+        self.workers = []
+        if not self.userid_correct():
+            return
+        try:
+            for state in (TCPWorker.STATE_READER, TCPWorker.STATE_WRITER):
+                self.workers.append(self.start_worker(state))
+                log.info('Worker type %s connected to %s', state, self.connect_address[0])
+        except socket.error:
+            log.warning('Unable to connect to %s', self.connect_address[0])
+            msg = _('Unable to connect to %s') % self.connect_address[0]
+            if self.last_message != msg:
+                self.last_message = msg
+                Audit.add_message(4001, self.last_message, warning=True)
+            self.stop_workers()
+            
disconnect_handle(is_client=True) + except Exception as exc: + log.exception('Unexpected error: %s, cls: %s', exc, self.__class__) + else: + self.last_message = '' + + def stop_workers(self): + for worker in self.workers: + worker.join() + self.workers = [] + + def workers_alive(self): + for worker in self.workers: + if not worker.is_alive(): + self.last_message = _('Disconnected from %s') % self.connect_address[0] + Audit.add_message(4001, self.last_message, warning=True) + return False + return True + + def start_worker(self, state): + client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + client_socket.connect(self.connect_address) + client_socket.settimeout(settings.tcp_timeout) + worker = TCPWorker(client_socket, state=state) + worker.start() + return worker + + def userid_correct(self): + if sum(self.userid_check[:2]) < time.time(): + self.userid_check[0] = time.time() + try: + s = ServerProxy(settings.auth_provider) + response = s.auth_response('', settings.id, False) + except Exception as exc: + response = {'error': True, 'msg': _('Authentication server is unavailable')} + log.warning('Authentication server is unavailable, exception: %s', exc) + self.userid_check[2] = not response.get('error', True) + if not self.userid_check[2]: + log.warning('Authentication is unavailable for user %s', settings.id) + msg = _('Authentication is unavailable for user %s') % settings.id + '
' + (response.get('msg') or '')
+            if self.last_message != msg:
+                self.last_message = msg
+                Audit.add_message(4001, self.last_message, warning=True)
+        return self.userid_check[2]
+
+    def connections(self):
+        return self.workers
+
+
+class TCPServerWorker(threading.Thread):
+    """
+    Listens server tcp socket, accepts connections from clients
+    Every client connection is being served by two threads: reader and writer
+    """
+
+    def __init__(self, disconnect_timeout=30):
+        super(TCPServerWorker, self).__init__()
+        self.listen_address = (settings.tcp_host, settings.tcp_port)
+        self.static_dir = settings.storage
+        self.stop_event = threading.Event()
+        workers.append(self)
+        self.workers = []
+        self.disconnect_timeout = disconnect_timeout
+        self.last_timestamp = datetime.now()
+
+    def run(self):
+        try:
+            log.info('Serving on %s:%s', *self.listen_address)
+            self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+            self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+            self.socket.bind(self.listen_address)
+            self.socket.listen(10)
+            self.socket.settimeout(0.2)
+
+            while not self.stop_event.is_set():
+                try:
+                    client = self.socket.accept()
+                    self.process_new_client(*client)
+                except socket.error:
+                    if self.last_timestamp + timedelta(seconds=self.disconnect_timeout) > datetime.now():
+                        continue
+                    else:
+                        self.last_timestamp = datetime.now()
+                        disconnect_handle(is_client=False, workers=self.workers)
+                        replication_after_reconnect(self.workers)
+
+            self.socket.close()
+        except Exception as exc:
+            log.exception('Unexpected error: %s, cls: %s', exc, self.__class__)
+        log.info('Bye!')
+
+    def join(self, timeout=None):
+        self.stop_event.set()
+        super(TCPServerWorker, self).join(timeout)
+
+    def process_new_client(self, client_socket, client_address):
+        log.info('New connection from %s', client_address)
+        client_socket.settimeout(settings.tcp_timeout)
+        worker = TCPWorker(client_socket, is_incoming=True)
+        worker.start()
+        self.workers.append(worker)
+
+    def connections(self):
+        return self.workers
+
+    def process_disconnected(self):
+        # filter() returns a lazy iterator on Python 3; materialise the list
+        # so self.workers stays appendable.
+        self.workers = [worker for worker in self.workers if worker.is_alive()]
+
+
+class AuthError(Exception):
+    def __init__(self, *args, **kwargs):
+        self.send_error = kwargs.get('send_error', False)
+        super(AuthError, self).__init__(*args)
+
+
+class TCPWorker(threading.Thread):
+    STATE_READER = 1
+    STATE_WRITER = -1
+    IDLE = 'idle'
+    META = 'meta'
+
+    def __init__(self, socket, keepalive=1.0, state=None, auth_timeout=60.0, is_incoming=False):
+        super(TCPWorker, self).__init__()
+        assert state in (None, self.STATE_READER, self.STATE_WRITER)
+        self.socket = socket
+        self.address = socket.getpeername()[0]
+        self.in_q = None
+        self.out_q = parcelqueue.manager.get(constants.POSTMAN)
+        self.keepalive = [0, keepalive]
+        self.state = state
+        self.is_client = state is not None
+        self.parcel = None
+        self.server_id = settings.id
+        self.connected_id = None
+        self.static_dir = settings.storage
+        self.bufsize = settings.tcp_bufsize
+        self.bufdata = deque()
+        self.stop_event = threading.Event()
+        self.workflow = None
+        self.user_check = [0, auth_timeout]
+        self.established = None
+        self.status = self.IDLE
+        self.data_pos = 0
+        self.data_size = 0
+        self.total = 0
+        self.bitrate = 0
+        self.is_incoming = is_incoming
+
+    def info(self):
+        return {'parcel_id': self.parcel.id if self.parcel else '',
+                'type': 'up' if self.state == self.STATE_WRITER else 'down',
+                'address': self.address,
+                'queue': self.in_q.queue_id if self.in_q else '',
+                'status': self.status,
+                'established': 
time.mktime(self.established.timetuple()), + 'user': self.connected_id, + 'bitrate': self.bitrate, + 'total': float(self.total), + 'data_pos': float(self.data_pos), + 'data_size': float(self.data_size), + 'direction': 'incoming' if self.is_incoming else 'outgoing'} + + def run(self): + # check if pysendfile available + if sendfile_available: + self.send_file = self.send_file_syscall + else: + self.send_file = self.send_file_native + + self.name = self.socket.getpeername()[0] + self.established = datetime.now() + + try: + self.handshake() + except (socket.error, KeyError) as ex: + log.error('Handshake error: %s', ex) + event_type = 4001 if self.is_client else 4002 + Audit.add_message(event_type, _('Protocol error when connecting to %s') % self.name, warning=True) + self.stop_event.set() + except AuthError as ex: + log.error('%s - Authentication failed, exception: %s', self.name, ex) + if ex.send_error: + self.send_data({'auth_error': True}) + self.stop_event.set() + + while not self.stop_event.is_set(): + try: + self.workflow() + except socket.error: + self.stop_event.set() + if self.state == self.STATE_WRITER and self.parcel is not None: + self.in_q.put(self.parcel) + except Exception as exc: + log.exception('Unexpected error: %s', exc) + self.socket.close() + log.info('%s - Bye!', self.name) + + def handshake(self): + if self.state is not None: + # + # client case + # + c1 = self.auth_challenge() + self.send_data({'flow_state': self.state, + 'server_id': self.server_id, + 'challenge': c1}) + + data = self.receive_data() + if data.get('auth_error', False): + Audit.add_message(4001, _('Authentication error when connecting to %s') % self.name, warning=True) + raise AuthError(send_error=False) + + self.connected_id = data['server_id'] + r1, err_msg = self.auth_response(c1, self.server_id, False) + if err_msg is not None or data['response'] != r1: + msg = _('Authentication error when connecting to %s') % self.name + if err_msg is not None: + msg = msg + '
' + err_msg + Audit.add_message(4001, msg, warning=True) + raise AuthError(send_error=True) + else: + log.debug('%s - Server authenticated', self.name) + + r2, err_msg = self.auth_response(data['challenge'], self.server_id, False) + if err_msg is not None: + msg = _('Authentication error when connecting to %s') % self.name + Audit.add_message(4001, msg + '
' + err_msg, warning=True) + raise AuthError(send_error=True) + + self.send_data({'response': r2}) + + data = self.receive_data() + if data.get('auth_error', False): + Audit.add_message(4001, _('Authentication error when connecting to %s') % self.name, warning=True) + raise AuthError(send_error=False) + else: + Audit.add_message(4001, _('Successfully connected to %s') % '%s, %s' % (self.connected_id, self.name)) + log.info('%s - Client authenticated', self.name) + else: + # + # server case + # + data = self.receive_data() + self.connected_id = data['server_id'] + r1, err_msg = self.auth_response(data['challenge'], self.connected_id, True) + if err_msg is not None: + msg = _('Error when authenticating client %s') % '%s %s' % (self.name, self.connected_id) + Audit.add_message(4002, msg + '
' + err_msg, warning=True) + raise AuthError(send_error=True) + + c1 = self.auth_challenge() + self.send_data({'server_id': self.server_id, + 'challenge': c1, 'response': r1}) + self.state = data['flow_state'] * -1 + + data = self.receive_data() + if data.get('auth_error', False): + msg = _('Error when authenticating client %s') % '%s, %s' % (self.connected_id, self.name) + Audit.add_message(4002, msg, warning=True) + raise socket.error('Client %s failed to authenticate server' % self.connected_id) + + r2, err_msg = self.auth_response(c1, self.connected_id, True) + if err_msg is not None or data['response'] != r2: + msg = _('Error when authenticating client %s') % '%s, %s' % (self.connected_id, self.name) + if err_msg is not None: + msg = msg + '
' + err_msg
+                Audit.add_message(4002, msg, warning=True)
+                raise AuthError(send_error=True)
+            else:
+                self.send_data({'auth_error': False})
+                Audit.add_message(4002, _('Successfully connected to %s') % '%s, %s' % (self.connected_id, self.name))
+                log.info('%s - Client authenticated', self.name)
+                self.user_check[0] = time.time()
+
+        if self.state == self.STATE_READER:
+            self.name = '%s reader' % self.connected_id
+            self.workflow = self.reader_workflow
+        elif self.state == self.STATE_WRITER:
+            self.name = '%s writer' % self.connected_id
+            self.workflow = self.writer_workflow
+            self.in_q = parcelqueue.manager.get('tcp://%s' % self.connected_id)
+            known_connect = KnownConnect(self.in_q.queue_id, self.is_client)
+            parcelqueue.manager.put_known_connect(known_connect)
+        else:
+            log.error('Incorrect state value %s', self.state)
+            self.stop_event.set()
+
+    def reader_workflow(self):
+        # receive parcel meta
+        data = self.receive_data()
+
+        # Re-authenticate the connected client once the auth interval has
+        # expired (mirroring userid_correct above); the original compared
+        # with '>' and refreshed user_check[1], which re-authenticated on
+        # every parcel instead.
+        if not self.is_client and sum(self.user_check) < time.time():
+            resp, err_msg = self.auth_response('', self.connected_id, True)
+            if err_msg is not None:
+                msg = _('Error when authenticating client %s') % '%s' % self.connected_id
+                Audit.add_message(4002, msg + '\n' + err_msg, warning=True)
+                raise AuthError(send_error=True)
+            self.user_check[0] = time.time()
+
+        # if it is a ping-request, sending a reply
+        if 'ping' in data:
+            self.send_pong()
+            return
+
+        self.parcel = Parcel(data['params'], data['files'],
+                             data['callback'], state=data['state'])
+
+        if 'delivered' in self.parcel.params:
+            self.out_q.put(self.parcel)
+            self.send_transfer_notice()
+            return
+
+        if not self.check_incoming_parcel(self.parcel):
+            log.warning('Incorrect parcel address')
+            self.send_data({'error': True, 'msg': 'Incorrect parcel address'})
+            return
+
+        parcelqueue.manager.put_parcel(self.parcel, constants.TRANSFER)
+        log.info('%s - Got a new parcel: %s', self.name, self.parcel)
+
+        # receive parcel files (attachments)
+        for f in self.parcel.files:
+            log.info('%s - Receiving file: parcel_id = %s, filename = %s, size = %s',
+                     self.name, self.parcel.id, f['name'], f['size'])
+            self.status = f['name']
+            self.receive_file(f, self.parcel.id)
+
+        log.info('%s - Finished processing parcel: %s', self.name, self.parcel)
+
+        self.out_q.put(self.parcel)
+        self.send_transfer_notice()
+        self.status = self.IDLE
+
+    def writer_workflow(self):
+        try:
+            self.parcel = self.in_q.get(block=True, timeout=0.1)
+            if 'delivered' not in self.parcel.params:
+                self.parcel.params['from'] = 'tcp://%s' % self.server_id
+            self.keepalive[0] = time.time()
+            for f in self.parcel.files:
+                file_path = os.path.join(self.static_dir, self.parcel.id, f['name'])
+                f['size'] = os.path.getsize(file_path)
+            log.info('%s - Sending parcel %s', self.name, self.parcel)
+            self.send_data({'params': self.parcel.params,
+                            'files': self.parcel.files,
+                            'callback': self.parcel.callback_url,
+                            'state': self.parcel.state})
+
+            while True:
+                data = self.receive_data()
+                if 'error' in data:
+                    log.warning('Sending parcel failed: %s', data.get('msg'))
+                    parcelqueue.manager.put_parcel(self.parcel, parcelqueue.ARCHIVE)
+                    break
+                if 'transferred' in data:
+                    parcelqueue.manager.put_parcel(self.parcel, parcelqueue.ARCHIVE)
+                    break
+                if 'name' in data:
+                    for f in self.parcel.files:
+                        if f['name'] == data['name']:
+                            self.status = f['name']
+                            filepath = os.path.join(self.static_dir,
+                                                    self.parcel.id, f['name'])
+                            offset = data.get('offset', 0)
+                            log.info('%s - Sending file %s, offset = %s, '
+                                     'size = %s', self.name, f['name'],
+                                     offset, os.path.getsize(filepath))
+                            self.send_file(filepath, offset)
+            self.status = self.IDLE
+            self.parcel = None
+
+        except queue.Empty:
+            curr_time = time.time()
+            if curr_time >= sum(self.keepalive):
+                self.send_data({'ping': True})
+                if 'pong' not in self.receive_data():
+                    raise socket.error('Ping failed')
+                self.keepalive[0] = curr_time
+
+    def join(self, timeout=None):
+        self.stop_event.set()
+        super(TCPWorker, self).join(timeout)
+
+    def auth_challenge(self):
+        s = ServerProxy(settings.auth_provider)
+        try:
+            return s.auth_challenge()
+        except Exception:
+            raise socket.error('Authentication provider not available, disconnecting')
+
+    def auth_response(self, challenge, server_id, is_server):
+        s = ServerProxy(settings.auth_provider)
+        try:
+            result = s.auth_response(challenge, server_id, is_server)
+        except Exception:
+            result = {'error': True, 'msg': _('Authentication server is unavailable')}
+        if result.get('error'):
+            return None, result.get('msg')
+        else:
+            return result.get('response'), None
+
+    def check_incoming_parcel(self, parcel):
+        if parcel.params['from'] != 'tcp://%s' % self.connected_id:
+            return False
+        if parcel.params['to'] != 'tcp://%s' % self.server_id:
+            return False
+        return True
+
+    def read_socket(self):
+        buf = self.socket.recv(self.bufsize)
+        if not buf:
+            raise socket.error('Disconnected')
+        return buf
+
+    def receive_data(self):
+        if len(self.bufdata):
+            data = self.bufdata.popleft()
+            log.debug('%s - Received data: %s', self.name, data)
+            return json.loads(data)
+        # Frames are length-prefixed: an 8-byte size header followed by a
+        # JSON payload. Accumulate bytes (not str) from the socket.
+        data = b''
+        self.data_pos = 0
+        self.data_size = 0
+        while True:
+            while len(data) < 8:
+                block = self.read_socket()
+                data += block
+                self.total += len(block)
+            self.data_size = data_size = struct.unpack('q', data[:8])[0]
+            data = data[8:]
+            while len(data) < data_size:
+                block = self.read_socket()
+                data += block
+                self.total += len(block)
+                self.data_pos += len(block)
+            self.bufdata.append(data[:data_size])
+            data = data[data_size:]
+            if not data:
+                break
+        data = self.bufdata.popleft()
+        if b'ping' not in data and b'pong' not in data:
+            log.debug('%s - Received data: %s', self.name, data)
+        return json.loads(data)
+
+    def receive_file(self, f, parcel_id):
+        file_dir = os.path.join(self.static_dir, parcel_id)
+        file_path = os.path.join(file_dir, f['name'])
+        file_size = f['size']
+        if os.path.exists(file_path):
+            real_size = os.path.getsize(file_path)
+            if real_size < file_size:
+                f['offset'] = real_size
+            else:
+                return
+        self.send_file_request(f)
+
+        if f.get('offset', 0) > 0:
+            log.warning('Receiving file with offset: parcel_id = %s, '
+                        'filename = %s, size = %s, offset = %s',
+                        parcel_id, f['name'], f['size'], f.get('offset', 0))
+        self.data_size = f['size']
+
+        if not os.path.exists(file_dir):
+            os.mkdir(file_dir, 0o750)
+        with open(file_path, mode='ab') as file_obj:
+            bytes_received = f.get('offset', 0)
+            self.data_pos = bytes_received
+            while bytes_received < file_size:
+                chunk = self.socket.recv(self.bufsize)
+                if not chunk:
+                    raise socket.error('Disconnected')
+                file_obj.write(chunk)
+                bytes_received += len(chunk)
+                self.total += len(chunk)
+                self.data_pos = bytes_received
+        log.info('Received file: parcel_id = %s, filename = %s, '
+                 'size = %s', parcel_id, f['name'], f['size'])
+        del f['size']
+
+    def send_file_native(self, file_path, offset=0):
+        self.data_pos = offset
+        self.data_size = os.path.getsize(file_path)
+        with open(file_path, 'rb') as file_obj:
+            file_obj.seek(offset)
+            while True:
+                data = file_obj.read(self.bufsize)
+                if not data:
+                    break
+                self.socket.sendall(data)
+                self.total += len(data)
+                self.data_pos += len(data)
+
+    def send_file_syscall(self, file_path, offset=0):
+        read_offset = offset
+        self.data_pos = offset
+        self.data_size = os.path.getsize(file_path)
+        with open(file_path, 'rb') as file_obj:
+            while True:
+                sent = sendfile(self.socket.fileno(), file_obj.fileno(),
+                                read_offset, self.bufsize)
+                if sent == 0:
+                    break
+                read_offset += sent
+                self.total += sent
+                self.data_pos += sent
+
+    def send_data(self, data):
+        self.data_size = 0
+        # Encode before framing: struct.pack returns bytes and cannot be
+        # concatenated with a str payload.
+        payload = json.dumps(data).encode('utf-8')
+        if b'ping' not in payload and b'pong' not in payload:
+            log.debug('%s - Sending data: %s', self.name, payload)
+        self.socket.sendall(struct.pack('q', len(payload)) + payload)
+
+    def send_pong(self):
+        self.send_data({'pong': True})
+
+    def send_file_request(self, f):
+        log.debug('Send file request: filename = %s, size = %s, offset = %s',
+                  f['name'], f['size'], f.get('offset', 0))
+        self.send_data({'name': f['name'],
+                        'offset': f.get('offset', 0)})
+
+    def send_transfer_notice(self):
+        self.send_data({'transferred': True,
+                        'parcel_id': self.parcel.id})
+
+    def send_delivery_notice(self):
+        self.send_data({'notice': True,
+                        'parcel_id': self.parcel.id,
+                        'parcel_state': self.parcel.state})
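Editor's note: the length-prefixed JSON framing implemented by TCPWorker.send_data()/receive_data() above is easier to see in isolation. The sketch below re-implements it as two standalone helpers and pings a server; the host/port pair is an assumption for illustration, and a real enserver peer would demand the authentication handshake before answering pings.

    # framing_sketch.py -- minimal model of enserver's TCP framing (illustrative)
    import json
    import socket
    import struct

    def send_frame(sock, message):
        # 8-byte native-endian length header, then the JSON payload
        payload = json.dumps(message).encode('utf-8')
        sock.sendall(struct.pack('q', len(payload)) + payload)

    def recv_frame(sock, bufsize=65536):
        data = b''
        while len(data) < 8:                      # read the size header first
            block = sock.recv(bufsize)
            if not block:
                raise socket.error('Disconnected')
            data += block
        size = struct.unpack('q', data[:8])[0]
        while len(data) < 8 + size:               # then exactly `size` payload bytes
            data += sock.recv(bufsize)
        return json.loads(data[8:8 + size])

    if __name__ == '__main__':
        s = socket.create_connection(('127.0.0.1', 7001))  # assumed endpoint
        send_frame(s, {'ping': True})
        print(recv_frame(s))  # a framing-compatible peer replies {'pong': True}
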
diff --git a/enserver/worker/utils.py b/enserver/worker/utils.py
new file mode 100644
index 0000000..5fb935a
--- /dev/null
+++ b/enserver/worker/utils.py
@@ -0,0 +1,67 @@
+import logging
+from datetime import datetime, timedelta
+
+from conf import settings
+from parcel import parcelqueue
+from parcel.base import LastConnect
+from xmlrpc_wrapper import ServerProxy
+
+log = logging.getLogger('enserver.worker.utils')
+
+ROUNDING_UP_DAYS_COUNT = 1
+
+
+def disconnect_handle(is_client, workers=None):
+    if workers is None:
+        workers = []
+    parcelqueue.manager.del_reconnected_last_connect()
+    all_queues_ids = parcelqueue.manager.get_all_existed_queue_id(is_client=is_client)
+    queues_with_workers = set(worker.in_q.queue_id for worker in workers if worker.in_q)
+    # set(*ids) would unpack the ids as separate arguments; build the set
+    # from the iterable itself.
+    queue_without_workers_ids = set(all_queues_ids) - queues_with_workers
+    for queue_id in queue_without_workers_ids:
+        last_connect = parcelqueue.manager.get_last_connect(queue_id, is_client=is_client)
+        if not last_connect:
+            last_connect = LastConnect(queue_id, is_client)
+            parcelqueue.manager.put_last_connect(last_connect)
+        else:
+            try:
+                float_disconnect_timeout = float(settings.disconnect_timeout)
+            except ValueError:
+                log.error('disconnect_timeout param is not a number')
+                raise
+            disconnect_timeout = timedelta(days=float_disconnect_timeout)
+            disconnect_timeout_is_expired = datetime.now() > last_connect.last_update + disconnect_timeout
+            if disconnect_timeout_is_expired:
+                server_proxy = ServerProxy(settings.replication_provider)
+                replicant_id = queue_id.replace('tcp://', '')
+                server_proxy.remove_active_replications_by_replicant_id(replicant_id)
+                parcelqueue.manager.remove_parcels_by_queue_id(queue_id)
+                if is_client:
+                    parcelqueue.manager.remove_last_connect(queue_id, is_client=is_client)
+                else:
+                    last_connect.need_replication = True
+                    parcelqueue.manager.put_last_connect(last_connect)
+                parcelqueue.manager.queues.pop(queue_id, None)
+
+
+def replication_after_reconnect(workers=None, is_client=False):
+    if workers is None:
+        workers = []
+    queue_ids = set(worker.in_q.queue_id for worker in workers if worker.in_q)
+    for queue_id in queue_ids:
+        last_connect = parcelqueue.manager.get_last_connect(queue_id, is_client=is_client)
+        if last_connect and last_connect.need_replication:
+            try:
+                server_proxy = ServerProxy(settings.replication_provider)
+                replicant_id = last_connect.queue_id.replace('tcp://', '')
+                # rounding up the number of days when counting disconnect_period
+                disconnect_period = (datetime.now() - last_connect.last_update).days + ROUNDING_UP_DAYS_COUNT
+                server_proxy.start_correction_replication(replicant_id, disconnect_period, 3600)
+                parcelqueue.manager.remove_last_connect(queue_id, is_client=is_client)
+            except Exception as e:
+                err_msg = str(e)
+                if 'replication exists' in err_msg:
+                    replication_uuid = err_msg.split('(', 1)[1].split(',', 1)[0]
+                    log.error(
+                        'Replication for {0} is already running: {1}'.format(last_connect.queue_id, replication_uuid)
+                    )
+                else:
+                    log.error('Failed to start correction replication for {0}: {1}'.format(last_connect.queue_id, err_msg))
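Editor's note: the XML-RPC surface defined in the next file is callable with the stock client library. A minimal sketch, assuming an enserver instance serving XML-RPC on http://127.0.0.1:7000/; the addresses, file name and URL are hypothetical, and the parameter keys follow the send()/getParcels() signatures below.

    import xmlrpc.client

    proxy = xmlrpc.client.ServerProxy('http://127.0.0.1:7000/')  # assumed endpoint

    # queue a parcel: routing params, file list, callback URL for onSent/onError
    params = {'from': 'local', 'to': 'tcp://otherbox'}                        # illustrative addresses
    files = [{'name': 'report.zip', 'url': 'http://files.local/report.zip'}]  # hypothetical file
    proxy.send(params, files, 'http://127.0.0.1:8000/')

    # page through the parcel archive
    page = proxy.getParcels({'order_by': 'added', 'order_dir': 'desc',
                             'offset': 0, 'limit': 10})
    print(page['total'], page['rows'])
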
"" + ENSERVER_HASH_LAST_COMMIT = "" + + +log = logging.getLogger('enserver.http') + + +class XMLRPCHandler(SimpleXMLRPCRequestHandler): + rpc_paths = ('/', '/xmlrpc') + static_mime = 'application/zip' + + def do_GET(self): + static_prefix = settings.static_path + '/' + if not self.path.startswith(static_prefix): + self.report_404() + return + target = os.path.join(settings.storage, self.path[len(static_prefix):]) + if not os.path.isfile(target): + self.report_404() + return + try: + with open(target, "rb") as f: + self.send_response(200) + self.send_header('Content-type', self.static_mime) + self.send_header('Content-length', os.path.getsize(target)) + self.end_headers() + shutil.copyfileobj(f, self.wfile) + except IOError: + self.report_404() + + +class XMLRPCMethods(object): + + def send(self, params, files, callbackUrl): + try: + log.debug('XMLRPC send: %s, %s, %s', params, files, callbackUrl) + parcel = Parcel(params, files, callbackUrl) + log.info('Got parcel to send: %s; callback: %s', parcel, callbackUrl) + parcelqueue.manager.get(constants.DOWNLOAD).put(parcel) + except IncorrectParcel as ex: + log.warning(ex) + return False + except: + log.exception('Error in xmlrpc send') + return True + + def onReceived(self, params, files, callbackUrl): + try: + log.debug('XMLRPC onReceived: %s, %s, %s', params, files, callbackUrl) + parcel = Parcel(params, files, callbackUrl) + log.info('Got onReceived for parcel: %s', parcel) + self._notify_sender(parcel.id, True) + except: + log.exception('Error in xmlrpc onReceived') + return True + + def onError(self, params, files, callbackUrl): + try: + log.debug('XMLRPC onError: %s, %s, %s', params, files, callbackUrl) + parcel = Parcel(params, files, callbackUrl) + log.info('Got onError for parcel: %s', parcel) + self._notify_sender(parcel.id, False) + except: + log.exception('Error in xmlrpc onError') + return True + + def _notify_sender(self, parcel_id, delivered): + parcel = parcelqueue.manager.get_parcel(parcel_id) + if not parcel: + log.warning('No such parcel in archive: %s', parcel) + return + parcel.params['delivered'] = delivered + from_address = parcel.params['from'] + + unnotified = ['es://', 'soek://'] + + if filter(from_address.startswith, unnotified): + log.info('Protocol do not support delivery notification: %s', from_address) + elif from_address.startswith('local'): + parcelqueue.manager.get(constants.POSTMAN).put(parcel) + else: + parcelqueue.manager.get(from_address).put(parcel) + + def getParcelsByProtocol(self, protocol): + if protocol: + parcels = parcelqueue.manager.get_parcels_by_protocol(protocol) + for parcel in parcels: + for f in parcel.files: + f['url'] = os.path.join(settings.static_url, parcel.id, f['name']) + result = [{'id': parcel.id, 'params': parcel.params, 'files': parcel.files} for parcel in parcels] + else: + result = [] + return result + + def esParcelsDelivered(self, parcel_ids): + if parcel_ids: + for parcel in parcelqueue.manager.get_parcels(parcel_ids): + parcelqueue.manager.put_parcel(parcel, parcelqueue.ARCHIVE) + return True + + def getServerId(self): + return settings.id + + def getLastCommitHash(self): + return {'hash': ENSERVER_HASH_LAST_COMMIT, 'success': True} + + def getServerVersion(self): + return {'version': ENSERVER_VERSION, 'success': True} + + def getParcels(self, query): + order_by = query.get('order_by', 'added') + order_dir = query.get('order_dir', 'desc') + offset = query.get('offset', 0) + limit = query.get('limit', 50) + + where_params = {} + if query.get('parcels_filters'): + filter_data 
= query.get('parcels_filters') + for filter in filter_data: + if filter['field'] == 'to': + where_params['where_params_to'] = filter['value'] + + if filter['field'] == 'query_type': + where_params['where_params_query_type'] = filter['value'] + + sortable_fields = parcelqueue.manager.sortable_fields() + if order_by not in sortable_fields: + return {'success': False, + 'msg': 'invalid argument: order_by ' \ + 'should be one of {%s}' % ','.join(sortable_fields)} + if order_dir not in ['asc', 'desc']: + return {'success': False, + 'msg': 'invalid argument: order_dir ' \ + 'should be one of {asc, desc}'} + if not isinstance(offset, int): + return {'success': False, + 'msg': 'invalid argument: offset must be integer'} + if not isinstance(limit, int): + return {'success': False, + 'msg': 'invalid argument: limit must be integer'} + + total, parcels = parcelqueue.manager.get_parcels_list(order_by, order_dir, + offset, limit, where_params) + + return {'success': True, + 'total': total, + 'rows': parcels} + + def getParcels_data_by_field(self, field_name): + filterable_fields = parcelqueue.manager.filterable_fields() + if field_name not in filterable_fields: + return {'success': False, + 'msg': 'invalid argument: %s ' \ + 'should be one of {%s}' % (field_name, ','.join(filterable_fields))} + + parcels_data = parcelqueue.manager.get_parcels_list_data_by_field(field_name) + return {'success': True, 'rows': parcels_data} + + def getConnections(self): + total_connections = 0 + info = [] + for worker in workers: + connections = worker.connections() + total_connections += len(connections) + for connection in connections: + info.append(connection.info()) + + return {'success': True, + 'total': total_connections, + 'rows': info} + + def routeParcel(self, parcel_id, queue_id): + success, msg = parcelqueue.manager.route_parcel(parcel_id, queue_id) + result = {'success': success} + if msg: + result['msg'] = msg + return result + + def deleteParcels(self, queues_with_parcels): + removed_ids = parcelqueue.manager.remove_parcels(queues_with_parcels) + return {'success': True, 'removed_ids': removed_ids} + + def deleteParcelsByFilters(self, query): + where_params = {} + filter_data = query.get('parcels_filters', []) + + for filter in filter_data: + if filter['field'] == 'to': + where_params['where_params_to'] = filter['value'] + + if filter['field'] == 'query_type': + where_params['where_params_query_type'] = filter['value'] + + success = parcelqueue.manager.remove_parcels_by_filters(where_params) + return {'success': success} + + def queueList(self): + return {'success': True, + 'rows': parcelqueue.manager.queue_list()} + + def deleteReplicationsParcels(self, replications_recipients, replications_ids): + replications_recipients = ['tcp://%s' % recipient for recipient in replications_recipients] + success, message = parcelqueue.manager.remove_replications_parcels(replications_recipients, replications_ids) + + return {'success': success, 'msg': message} + + +class HttpWorker(threading.Thread): + def __init__(self): + self.listen_address = (settings.http_host, settings.http_port) + self.server = None + super(HttpWorker, self).__init__() + + def run(self): + self.server = ThreadingXMLRPCServer(self.listen_address, + requestHandler=XMLRPCHandler) + self.server.register_instance(XMLRPCMethods()) + log.info('Serving on %s:%s', *self.listen_address) + try: + self.server.serve_forever() + except Exception as exc: + log.critical(exc) + raise exc + + def join(self, timeout=None): + if self.server: + self.server.shutdown() + 
log.info('Bye!')
+        super(HttpWorker, self).join(timeout)
diff --git a/enserver/xmlrpc_wrapper.py b/enserver/xmlrpc_wrapper.py
new file mode 100644
index 0000000..75405a4
--- /dev/null
+++ b/enserver/xmlrpc_wrapper.py
@@ -0,0 +1,41 @@
+import http.client
+import xmlrpc.client
+
+
+class Transport(xmlrpc.client.Transport):
+    def __init__(self, use_datetime=0, timeout=5.0):
+        self._timeout = timeout
+        self._connection = (None, None)
+        xmlrpc.client.Transport.__init__(self, use_datetime)
+
+    def make_connection(self, host):
+        if self._connection and host == self._connection[0]:
+            return self._connection[1]
+
+        chost, _, x509 = self.get_host_info(host)
+        self._connection = host, http.client.HTTPConnection(chost, timeout=self._timeout)
+        return self._connection[1]
+
+
+class SafeTransport(xmlrpc.client.SafeTransport):
+    def __init__(self, use_datetime=0, timeout=5.0):
+        self._timeout = timeout
+        self._connection = (None, None)
+        xmlrpc.client.SafeTransport.__init__(self, use_datetime)
+
+    def make_connection(self, host):
+        if self._connection and host == self._connection[0]:
+            return self._connection[1]
+
+        chost, _, x509 = self.get_host_info(host)
+        self._connection = host, http.client.HTTPSConnection(chost, None, timeout=self._timeout, **(x509 or {}))
+        return self._connection[1]
+
+
+class ServerProxy(xmlrpc.client.ServerProxy):
+    def __init__(self, uri, transport=None, use_datetime=False, timeout=5.0, **kwargs):
+        if transport is None:
+            # Pick the transport matching the URI scheme; the plain HTTP
+            # transport cannot talk to an https:// endpoint.
+            if uri.startswith('https'):
+                transport = SafeTransport(use_datetime=use_datetime, timeout=timeout)
+            else:
+                transport = Transport(use_datetime=use_datetime, timeout=timeout)
+        xmlrpc.client.ServerProxy.__init__(self, uri, transport, use_datetime=use_datetime, **kwargs)
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..43ce59b
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,3 @@
+python-daemon
+psycopg2~=2.9.5
+setuptools~=65.6.3
\ No newline at end of file
diff --git a/setup.cfg b/setup.cfg
new file mode 100644
index 0000000..88aeb4a
--- /dev/null
+++ b/setup.cfg
@@ -0,0 +1,12 @@
+[bdist_rpm]
+requires = argparse, psycopg2, python-daemon, logrotate
+
+[egg_info]
+tag_build =
+tag_date = 0
+tag_svn_revision = 0
+
+[sdist_dsc]
+depends = python-argparse, python-psycopg2, logrotate
+package = enserver
+
diff --git a/setup.py b/setup.py
new file mode 100755
index 0000000..abf7d84
--- /dev/null
+++ b/setup.py
@@ -0,0 +1,113 @@
+#!/usr/bin/env python
+import os
+try:
+    from setuptools import setup
+except ImportError:
+    from distutils.core import setup
+from distutils.command.install import INSTALL_SCHEMES
+
+
+# allow setup.py to be run from any path
+os.chdir(os.path.normpath(os.path.join(os.path.abspath(__file__), os.pardir)))
+
+with open(os.path.join(os.path.dirname(__file__), 'README.txt')) as readme_file:
+    README = readme_file.read()
+
+# Tell distutils not to put the data_files in platform-specific installation
+# locations. See here for an explanation:
+# http://groups.google.com/group/comp.lang.python/browse_thread/thread/35ec7b2fed36eaec/2105ee4d9e8042cb
+for scheme in INSTALL_SCHEMES.values():
+    scheme['data'] = scheme['purelib']
+
+rpm_install = ''
+rpm_postinstall = ''
+
+
+def fullsplit(path, result=None):
+    """
+    Split a pathname into components (the opposite of os.path.join) in a
+    platform-neutral way.
+ """ + if result is None: + result = [] + head, tail = os.path.split(path) + if head == '': + return [tail] + result + if head == path: + return result + return fullsplit(head, [tail] + result) + + +def is_astra_linux(): + return 'Astra' in platform.dist()[0] + + +def find_packages(path): + packages = [] + for dirpath, dirnames, filenames in os.walk(path): + for i, dirname in enumerate(dirnames): + if dirname.startswith('.'): + del dirnames[i] + if '__init__.py' in filenames: + packages.append('.'.join(fullsplit(dirpath))) + return packages + + +def find_data_files(path): + global rpm_install, rpm_postinstall + if is_astra_linux(): + data_files = [ + ['/etc/default', ['data/astra/default/enserver']], + ['/etc/logrotate.d', ['data/astra/logrotate.d/enserver']], + ['/etc/systemd/system', ['data/astra/systemd/system/enserver.service']], + ] + else: + data_files = [ + ['/etc/sysconfig', ['data/zarya/sysconfig/enserver']], + ['/etc/init.d', ['data/zarya/init.d/enserver']], + ['/etc/logrotate.d', ['data/zarya/logrotate.d/enserver']], + ] + rpm_install = 'data/zarya/scripts/install.spec.inc' + rpm_postinstall = 'data/zarya/scripts/postinstall.sh' + for dirpath, dirnames, filenames in os.walk(path): + for i, dirname in enumerate(dirnames): + if dirname.startswith('.'): + del dirnames[i] + if filenames and '__init__.py' not in filenames: + data_files.append([dirpath, [os.path.join(dirpath, f) for f in filenames]]) + return data_files + +def gen_options(): + global rpm_install, rpm_postinstall + bdist_rpm = {} + if rpm_postinstall: + bdist_rpm['post_install'] = rpm_postinstall + if rpm_install: + bdist_rpm['install_script'] = rpm_install + return {'bdist_rpm':bdist_rpm} if bdist_rpm else {} + +def get_version(): + if 'VERSION' in os.environ: + return os.environ.get('VERSION') + else: + return open(os.path.join(os.path.dirname(__file__), 'VERSION')).read().strip() + +setup( + name='enserver', + version=get_version(), + packages=find_packages('enserver'), + data_files=find_data_files('enserver'), + options=gen_options(), + description='EN-Server', + long_description=README, + url='http://www.kronshtadt.ru/', + author='Alexander Lazarev', + author_email='alexander.lazarev2@kronshtadt.ru', + classifiers=[ + 'Environment :: Web Environment', + 'Operating System :: OS Independent', + 'Programming Language :: Python', + 'Programming Language :: Python :: 2.6', + 'Programming Language :: Python :: 2.7', + 'Topic :: Internet :: WWW/HTTP', + ], +) diff --git a/start.sh b/start.sh new file mode 100644 index 0000000..490bb59 --- /dev/null +++ b/start.sh @@ -0,0 +1 @@ +python enserver.py --debug --port=7001 --pidfile=/tmp/enserver.pid --logfile=/var/log/enserver.log --id=prilvbox --address=enserv --storage=/share/store/enserver --db-host=10.10.8.83 --db-port=32101 --host=10.10.78.40 --db-user=postgres --db-passwd=Root12345678 --auth-provider=http://10.10.8.81:9000/ --replication-provider=http://10.10.8.81:9000/ http://10.10.8.81:9000/ http://10.10.8.81:9000/ \ No newline at end of file diff --git a/ver.txt b/ver.txt new file mode 100644 index 0000000..197c4d5 --- /dev/null +++ b/ver.txt @@ -0,0 +1 @@ +2.4.0