126 lines
4.6 KiB
Python
126 lines
4.6 KiB
Python
# coding: utf-8
|
|
|
|
import logging
|
|
import os
|
|
import queue
|
|
import re
|
|
import urllib.request
|
|
import urllib.error
|
|
from xmlrpc.client import Fault
|
|
|
|
from conf import settings
|
|
from lib.threads import Worker
|
|
from parcel import constants, parcelqueue
|
|
from xmlrpc_wrapper import ServerProxy
|
|
|
|
|
|
class ParcelDownloader(Worker):
|
|
def __init__(self, in_q=None, bufsize=65536):
|
|
self.in_q = in_q or parcelqueue.manager.get(constants.DOWNLOAD)
|
|
self.bufsize = bufsize
|
|
self.static_dir = settings.storage
|
|
self.log = logging.getLogger('enserver.downloader')
|
|
self.re_url = re.compile('^(?P<protocol>.+://?)(?:(?P<login>.+?)(?::(?P<password>.+))?@)?(?P<path>.+)$')
|
|
super(ParcelDownloader, self).__init__()
|
|
|
|
def shift(self):
|
|
try:
|
|
parcel = self.in_q.get(True, 0.1)
|
|
self.receive_parcel(parcel)
|
|
except queue.Empty:
|
|
return
|
|
except Exception as ex:
|
|
self.log.exception(ex)
|
|
|
|
def receive_parcel(self, parcel):
|
|
params, files = parcel.params, parcel.files
|
|
self.log.info('Processing parcel: %s', parcel)
|
|
|
|
parcel_dir = os.path.join(self.static_dir, parcel.id)
|
|
if not os.path.exists(parcel_dir) and files:
|
|
os.mkdir(parcel_dir)
|
|
|
|
try:
|
|
for f in files:
|
|
self.download_url(parcel_dir, f['url'], f['name'])
|
|
except (urllib.error.URLError, urllib.error.HTTPError) as exc:
|
|
self.log.error('File download failed, sending onError! Exception: %s', exc)
|
|
self.notify_onerror(params, files, parcel.callback_url)
|
|
parcelqueue.manager.put_parcel(parcel, parcelqueue.ARCHIVE)
|
|
return
|
|
|
|
self.notify_onsent(params, files, parcel.callback_url)
|
|
self.force_delete(files, parcel.callback_url)
|
|
self.log.info('Finished processing parcel: %s', parcel)
|
|
|
|
self.transfer_parcel(parcel)
|
|
|
|
def download_url(self, parcel_dir, url, name):
|
|
self.log.info('Downloading file url %s', url)
|
|
|
|
match = self.re_url.match(url)
|
|
local = match is None
|
|
if not local:
|
|
url_groups = match.groupdict()
|
|
|
|
if 'login' in url_groups:
|
|
url = url_groups['protocol'] + url_groups['path']
|
|
passman = urllib.request.HTTPPasswordMgrWithDefaultRealm()
|
|
passman.add_password(None, url, url_groups.get('login'), url_groups.get('password') or '')
|
|
opener = urllib.request.build_opener(urllib.request.HTTPBasicAuthHandler(passman))
|
|
urllib.request.install_opener(opener)
|
|
|
|
fspath = os.path.join(parcel_dir, name)
|
|
with open(fspath, 'wb') as f:
|
|
if local:
|
|
u = open(url, 'rb')
|
|
else:
|
|
u = urllib.request.urlopen(url)
|
|
while True:
|
|
b = u.read(self.bufsize)
|
|
f.write(b)
|
|
if len(b) < self.bufsize:
|
|
break
|
|
if local:
|
|
u.close()
|
|
return fspath
|
|
|
|
def transfer_parcel(self, parcel):
|
|
deliver_to = parcel.params['to']
|
|
self.log.debug('Sending parcel to %s', deliver_to)
|
|
|
|
if deliver_to.startswith('local'):
|
|
parcel.params.setdefault('from', deliver_to)
|
|
parcelqueue.manager.get(constants.POSTMAN).put(parcel)
|
|
|
|
elif deliver_to.startswith('soek://'):
|
|
parcel.params.setdefault('from', 'soek://%s' % settings.ims_user)
|
|
parcelqueue.manager.get(constants.SOEK).put(parcel)
|
|
|
|
else:
|
|
parcelqueue.manager.get(deliver_to).put(parcel)
|
|
|
|
def notify_onsent(self, params, files, callback_url):
|
|
try:
|
|
sender = ServerProxy(callback_url)
|
|
sender.onSent(params, files, callback_url)
|
|
except Exception as exc:
|
|
self.log.warning('onSent failed for url {0}, exception: {1}'.format(callback_url, exc))
|
|
|
|
def notify_onerror(self, params, files, callback_url):
|
|
try:
|
|
sender = ServerProxy(callback_url)
|
|
sender.onError(params, files, callback_url)
|
|
except Exception as exc:
|
|
self.log.warning('onError failed for url {0}, exception: {1}'.format(callback_url, exc))
|
|
|
|
def force_delete(self, files, callback_url):
|
|
try:
|
|
sender = ServerProxy(callback_url)
|
|
sender.forceDelete(files)
|
|
except Fault as exc:
|
|
if exc.faultString == 'No such handler: forceDelete':
|
|
self.log.warning('forceDelete method if not supported by service: {0}'.format(callback_url))
|
|
except Exception as exc:
|
|
self.log.warning('forceDelete failed for url {0}, exception: {1}'.format(callback_url, exc))
|