cancel streams on shutdown

This commit is contained in:
Jack Robison 2017-09-28 13:51:20 -04:00
parent 89ebed570e
commit 50b51569a3
No known key found for this signature in database
GPG key ID: 284699E7404E3CFF
2 changed files with 20 additions and 5 deletions

View file

@ -392,6 +392,11 @@ class Daemon(AuthJSONRPCServer):
def _already_shutting_down(sig_num, frame): def _already_shutting_down(sig_num, frame):
log.info("Already shutting down") log.info("Already shutting down")
def _stop_streams(self):
"""stop pending GetStream downloads"""
for claim_id, stream in self.streams.iteritems():
stream.cancel(reason="daemon shutdown")
def _shutdown(self): def _shutdown(self):
# ignore INT/TERM signals once shutdown has started # ignore INT/TERM signals once shutdown has started
signal.signal(signal.SIGINT, self._already_shutting_down) signal.signal(signal.SIGINT, self._already_shutting_down)
@ -399,6 +404,9 @@ class Daemon(AuthJSONRPCServer):
log.info("Closing lbrynet session") log.info("Closing lbrynet session")
log.info("Status at time of shutdown: " + self.startup_status[0]) log.info("Status at time of shutdown: " + self.startup_status[0])
self._stop_streams()
self.looping_call_manager.shutdown() self.looping_call_manager.shutdown()
if self.analytics_manager: if self.analytics_manager:
self.analytics_manager.shutdown() self.analytics_manager.shutdown()

View file

@ -151,9 +151,7 @@ class GetStream(object):
def fail(self, err): def fail(self, err):
safe_stop_looping_call(self.checker) safe_stop_looping_call(self.checker)
if not err.check(DownloadDataTimeout): raise err
raise err
return DownloadCanceledError()
@defer.inlineCallbacks @defer.inlineCallbacks
def _initialize(self, stream_info): def _initialize(self, stream_info):
@ -184,8 +182,7 @@ class GetStream(object):
log.info("Downloading lbry://%s (%s) --> %s", name, self.sd_hash[:6], self.download_path) log.info("Downloading lbry://%s (%s) --> %s", name, self.sd_hash[:6], self.download_path)
self.finished_deferred = self.downloader.start() self.finished_deferred = self.downloader.start()
self.finished_deferred.addCallbacks(lambda result: self.finish(result, name), self.finished_deferred.addCallbacks(lambda result: self.finish(result, name), self.fail)
self.fail)
@defer.inlineCallbacks @defer.inlineCallbacks
def start(self, stream_info, name): def start(self, stream_info, name):
@ -215,3 +212,13 @@ class GetStream(object):
raise err raise err
defer.returnValue((self.downloader, self.finished_deferred)) defer.returnValue((self.downloader, self.finished_deferred))
def cancel(self, reason=None):
if reason:
msg = "download stream cancelled: %s" % reason
else:
msg = "download stream cancelled"
if self.finished_deferred and not self.finished_deferred.called:
self.finished_deferred.errback(DownloadCanceledError(msg))
if self.data_downloading_deferred and not self.data_downloading_deferred.called:
self.data_downloading_deferred.errback(DownloadCanceledError(msg))