improve timeouts

Jack 2016-04-22 18:18:17 -04:00
parent bac7f097a9
commit 28f66e30dd
3 changed files with 37 additions and 23 deletions


@@ -625,6 +625,11 @@ class LBRYDaemon(jsonrpc.JSONRPC):
                     self.session_settings['upload_log'] = settings['upload_log']
                 else:
                     return defer.fail()
+            elif k == 'download_timeout':
+                if type(settings['download_timeout']) is int:
+                    self.session_settings['download_timeout'] = settings['download_timeout']
+                else:
+                    return defer.fail()

         self.run_on_startup = self.session_settings['run_on_startup']
         self.data_rate = self.session_settings['data_rate']
@@ -633,6 +638,7 @@ class LBRYDaemon(jsonrpc.JSONRPC):
         self.max_upload = self.session_settings['max_upload']
         self.max_download = self.session_settings['max_download']
         self.upload_log = self.session_settings['upload_log']
+        self.download_timeout = self.session_settings['download_timeout']

         f = open(self.daemon_conf, "w")
         f.write(json.dumps(self.session_settings))
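
The new 'download_timeout' branch repeats the per-key pattern used for the existing settings: type-check the incoming value, copy it into session_settings on success, fail the deferred otherwise; the second hunk then mirrors the stored value onto the daemon attribute. A rough standalone sketch of that pattern, with made-up helper names rather than the daemon's own code:

# Illustrative only: a generic version of the per-key check the hunks above extend.
from twisted.internet import defer

EXPECTED_TYPES = {
    'run_on_startup': bool,
    'data_rate': float,
    'max_upload': float,
    'max_download': float,
    'upload_log': bool,
    'download_timeout': int,   # key added by this commit
}

def apply_settings(session_settings, incoming):
    for k, v in incoming.items():
        expected = EXPECTED_TYPES.get(k)
        # strict type check, mirroring the diff's `type(...) is int` style
        if expected is None or type(v) is not expected:
            return defer.fail(ValueError("bad value for setting: %s" % k))
        session_settings[k] = v
    return defer.succeed(session_settings)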
@@ -864,7 +870,7 @@ class LBRYDaemon(jsonrpc.JSONRPC):
         def _disp_file(f):
             file_path = os.path.join(self.download_directory, f.file_name)
             log.info("[" + str(datetime.now()) + "] Already downloaded: " + str(f.stream_hash) + " --> " + file_path)
-            return defer.succeed(f)
+            return {'stream_hash': f.stream_hash, 'path': file_path}

         def _get_stream(name):
             def _disp(stream):
@@ -880,14 +886,13 @@ class LBRYDaemon(jsonrpc.JSONRPC):
                                   max_key_fee=self.max_key_fee, data_rate=self.data_rate, timeout=timeout,
                                   download_directory=download_directory)
             d.addCallback(_disp)
-            d.addCallback(lambda stream_info: stream.start(stream_info))
-            d.addCallback(lambda _: self._path_from_name(name))
+            d.addCallback(lambda stream_info: stream.start(stream_info, name))
+            d.addCallback(lambda r: {'stream_hash': r[0], 'path': r[1]} if r else server.failure)
             return d

         d = self._check_history(name)
         d.addCallback(lambda lbry_file: _get_stream(name) if not lbry_file else _disp_file(lbry_file))
-        d.addCallback(lambda _: self._path_from_name(name))

         return d
@@ -933,7 +938,8 @@ class LBRYDaemon(jsonrpc.JSONRPC):
             path = os.path.join(self.blobfile_dir, stream_hash)
             if os.path.isfile(path):
-                log.info("[" + str(datetime.now()) + "] Search for lbry_file, returning: " + stream_hash)
+                file_size = os.stat(path).st_size
+                log.info("[" + str(datetime.now()) + "] Search for lbry_file, found " + str(file_size) + " bytes written from stream hash: " + stream_hash)
                 return defer.succeed(_get_lbry_file(path))
             else:
                 log.info("[" + str(datetime.now()) + "] Search for lbry_file didn't return anything")
@@ -1188,6 +1194,7 @@ class LBRYDaemon(jsonrpc.JSONRPC):
            'max_upload': float, 0.0 for unlimited
            'max_download': float, 0.0 for unlimited
            'upload_log': bool,
+           'download_timeout': int
        Returns:
            settings dict
        """


@@ -50,6 +50,11 @@ class LBRYindex(resource.Resource):
         return static.File(os.path.join(self.ui_dir, "index.html")).render_GET(request)


+class HostedLBRYFile(static.File):
+    def __init__(self, path):
+        static.File.__init__(self, path=path)
+
+
 class LBRYFileRender(resource.Resource):
     isLeaf = False
@@ -58,7 +63,7 @@ class LBRYFileRender(resource.Resource):
         api = jsonrpc.Proxy(API_CONNECTION_STRING)
         if request.args['name'][0] != 'lbry':
             d = api.callRemote("get", {'name': request.args['name'][0]})
-            d.addCallback(lambda results: static.File(results['path']))
+            d.addCallback(lambda results: HostedLBRYFile(results['path']))
             d.addCallback(lambda static_file: static_file.render_GET(request) if static_file.getFileSize() > 0
                                               else server.failure)
         else:
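
Since 'get' now resolves to a dict carrying 'path', the render resource wraps that path in HostedLBRYFile, a thin static.File subclass, and renders it only once the file has bytes on disk. A minimal, self-contained sketch of that twisted.web pattern, using an illustrative resource and path rather than anything from this codebase:

# Illustrative resource, not part of the LBRY codebase.
import os
from twisted.internet import reactor
from twisted.web import resource, server, static

class DownloadedFile(resource.Resource):
    isLeaf = True

    def __init__(self, path):
        resource.Resource.__init__(self)
        self.path = path

    def render_GET(self, request):
        if os.path.isfile(self.path):
            f = static.File(self.path)
            if f.getFileSize() > 0:           # same guard as the diff: an empty file is not ready
                return f.render_GET(request)  # static.File streams the file and sets the headers
        request.setResponseCode(404)
        return b"file not written yet"

if __name__ == '__main__':
    # hypothetical path; point this at any local file you want to serve
    site = server.Site(DownloadedFile(os.path.expanduser("~/example.bin")))
    reactor.listenTCP(8080, site)
    reactor.run()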


@@ -13,6 +13,7 @@ from lbrynet.lbryfilemanager.LBRYFileDownloader import ManagedLBRYFileDownloader
 from lbrynet.conf import DEFAULT_TIMEOUT

 log = logging.getLogger(__name__)
+log.setLevel(logging.INFO)


 class GetStream(object):
@@ -39,22 +40,24 @@ class GetStream(object):
         self.timeout_counter = 0
         self.download_directory = download_directory
         self.download_path = None
+        self.finished = defer.Deferred()
         self.checker = LoopingCall(self.check_status)

     def check_status(self):
         self.timeout_counter += 1
-        if self.download_path and os.path.isfile(self.download_path):
+        if self.download_path:
             self.checker.stop()
-            return defer.succeed(True)
+            self.finished.callback((self.stream_hash, self.download_path))
         elif self.timeout_counter >= self.timeout:
-            log.info("Timeout downloading " + str(self.stream_info))
+            log.info("Timeout downloading lbry://" + self.resolved_name + ", " + str(self.stream_info))
             self.checker.stop()
             self.d.cancel()
+            self.finished.callback(False)

-    def start(self, stream_info):
+    def start(self, stream_info, name):
+        self.resolved_name = name
         self.stream_info = stream_info
         if 'stream_hash' in self.stream_info.keys():
             self.description = self.stream_info['description']
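
check_status is the heart of this commit: GetStream now owns a 'finished' deferred and a once-a-second LoopingCall, firing the deferred with (stream_hash, download_path) as soon as a path is known, or with False once the counter passes the timeout. A self-contained sketch of that polling pattern with generic names (the real class also carries payment and blob plumbing omitted here):

# Generic stand-in for GetStream's checker/finished pair -- illustrative only.
from twisted.internet import defer, reactor
from twisted.internet.task import LoopingCall

class PollingTimeout(object):
    def __init__(self, timeout_seconds):
        self.timeout = timeout_seconds
        self.timeout_counter = 0
        self.result_path = None                     # set externally when the download lands
        self.finished = defer.Deferred()            # fired exactly once, like GetStream.finished
        self.checker = LoopingCall(self._check)

    def start(self):
        self.checker.start(1)                       # poll once per second, as the diff does
        return self.finished

    def _check(self):
        self.timeout_counter += 1
        if self.result_path is not None:            # success: report where the file is
            self.checker.stop()
            self.finished.callback(self.result_path)
        elif self.timeout_counter >= self.timeout:  # give up: the caller sees False
            self.checker.stop()
            self.finished.callback(False)

def _report(result):
    print("finished with: %s" % (result,))
    reactor.stop()

if __name__ == '__main__':
    poller = PollingTimeout(10)
    poller.start().addCallback(_report)
    # pretend the downloader writes the file after 3 seconds
    reactor.callLater(3, setattr, poller, 'result_path', '/tmp/example.blob')
    reactor.run()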
@@ -84,21 +87,24 @@ class GetStream(object):
         else:
             pass

+        def _cause_timeout():
+            self.timeout_counter = self.timeout * 2
+
         self.checker.start(1)

         self.d.addCallback(lambda _: download_sd_blob(self.session, self.stream_hash, self.payment_rate_manager))
         self.d.addCallback(self.sd_identifier.get_metadata_for_sd_blob)
-        self.d.addCallback(lambda metadata: (next(factory for factory in metadata.factories if isinstance(factory, ManagedLBRYFileDownloaderFactory)), metadata))
+        self.d.addCallback(lambda metadata: (
+            next(factory for factory in metadata.factories if isinstance(factory, ManagedLBRYFileDownloaderFactory)),
+            metadata))
         self.d.addCallback(lambda (factory, metadata): factory.make_downloader(metadata,
                                                                                [self.data_rate, True],
                                                                                self.payment_rate_manager,
                                                                                download_directory=self.download_directory))
-        self.d.addErrback(lambda err: err.trap(defer.CancelledError))
-        self.d.addErrback(lambda err: log.error("An exception occurred attempting to load the stream descriptor: %s", err.getTraceback()))
-        self.d.addCallback(self._start_download)
+        self.d.addCallbacks(self._start_download, lambda _: _cause_timeout())
         self.d.callback(None)

-        return self.d
+        return self.finished

     def _start_download(self, downloader):
         def _pay_key_fee():
@@ -114,14 +120,10 @@ class GetStream(object):
             d = _pay_key_fee()
         else:
             d = defer.Deferred()
-        downloader.start()
         self.download_path = os.path.join(downloader.download_directory, downloader.file_name)
-        log.info("Downloading " + str(self.stream_hash) + " --> " + str(self.download_path))
+        d.addCallback(lambda _: log.info("Downloading " + str(self.stream_hash) + " --> " + str(self.download_path)))
+        d.addCallback(lambda _: downloader.start())
-        return d
+        d.callback()


 class FetcherDaemon(object):
     def __init__(self, session, lbry_file_manager, lbry_file_metadata_manager, wallet, sd_identifier, autofetcher_conf,
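
The descriptor-fetch chain now uses addCallbacks: success flows into _start_download, while any failure calls _cause_timeout, which pushes timeout_counter past the limit so the poller reports False through 'finished' instead of surfacing an errback. A short sketch of that split, assuming the PollingTimeout stand-in from the earlier example; 'fetch_deferred' and 'start_download' are hypothetical stand-ins for the sd-blob download chain and _start_download:

# Illustrative wiring only, not the real GetStream.start.
def wire_download(fetch_deferred, start_download, poller):
    def _cause_timeout(_failure):
        # same trick as the diff: fast-forward the counter so the LoopingCall
        # notices on its next tick and fires finished(False)
        poller.timeout_counter = poller.timeout * 2

    fetch_deferred.addCallbacks(start_download, _cause_timeout)
    return poller.finished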