diff --git a/lbrynet/core/HTTPBlobDownloader.py b/lbrynet/core/HTTPBlobDownloader.py index f5f706b5a..0c36f232c 100644 --- a/lbrynet/core/HTTPBlobDownloader.py +++ b/lbrynet/core/HTTPBlobDownloader.py @@ -1,6 +1,7 @@ from random import choice import logging from twisted.internet import defer, task +from twisted.internet.error import ConnectingCancelledError import treq from lbrynet.core.utils import DeferredDict from lbrynet.core.Error import DownloadCanceledError @@ -16,69 +17,105 @@ class HTTPBlobDownloader(object): to cancel other writers when a writer finishes first. That's why there is no call to cancel/resume/stop between different types of downloaders. ''' - def __init__(self, blob_manager, blob_hashes=None, servers=None, client=None, sd_hashes=None, retry=True): + def __init__(self, blob_manager, blob_hashes=None, servers=None, client=None, sd_hashes=None, retry=True, + clock=None): + if not clock: + from twisted.internet import reactor + self.clock = reactor + else: + self.clock = clock self.blob_manager = blob_manager self.servers = servers or [] self.client = client or treq self.blob_hashes = blob_hashes or [] self.missing_blob_hashes = [] + self.downloaded_blob_hashes = [] self.sd_hashes = sd_hashes or [] self.head_blob_hashes = [] self.max_failures = 3 - self.running = False self.semaphore = defer.DeferredSemaphore(2) self.deferreds = [] self.writers = [] self.retry = retry - self.looping_call = task.LoopingCall(self._download_and_retry) + self.looping_call = task.LoopingCall(self._download_lc) + self.looping_call.clock = self.clock self.finished_deferred = defer.Deferred() - self.last_missing = 100000000 + self.finished_deferred.addErrback(lambda err: err.trap(defer.CancelledError)) + self.short_delay = 30 + self.long_delay = 600 + self.delay = self.short_delay + self.last_missing = 10000000 + self.lc_deferred = None @defer.inlineCallbacks - def _download_and_retry(self): - if not self.running and self.blob_hashes and self.servers: - yield self._download_blobs() - if self.retry and self.missing_blob_hashes: - if len(self.missing_blob_hashes) < self.last_missing: - self.last_missing = len(self.missing_blob_hashes) - log.info("queueing retry of %i blobs", len(self.missing_blob_hashes)) - while self.missing_blob_hashes: - self.blob_hashes.append(self.missing_blob_hashes.pop()) - defer.returnValue(None) - if self.looping_call.running: - self.looping_call.stop() - if self.retry and self.last_missing and len(self.missing_blob_hashes) == self.last_missing: - log.info("mirror not making progress, trying less frequently") - self.looping_call.start(600, now=False) - elif not self.finished_deferred.called: - self.finished_deferred.callback(None) - log.info("mirror finished") - def start(self): - if not self.running: - self.looping_call.start(30) - self.running = True - return self.finished_deferred + if not self.looping_call.running: + self.lc_deferred = self.looping_call.start(self.short_delay, now=True) + self.lc_deferred.addErrback(lambda err: err.trap(defer.CancelledError)) + yield self.finished_deferred def stop(self): - if self.running: - for d in reversed(self.deferreds): - d.cancel() - while self.writers: - writer = self.writers.pop() - writer.close(DownloadCanceledError()) - self.running = False - self.blob_hashes = [] + for d in reversed(self.deferreds): + d.cancel() + while self.writers: + writer = self.writers.pop() + writer.close(DownloadCanceledError()) + self.blob_hashes = [] if self.looping_call.running: self.looping_call.stop() + if self.lc_deferred and not self.lc_deferred.called: + self.lc_deferred.cancel() + if not self.finished_deferred.called: + self.finished_deferred.cancel() @defer.inlineCallbacks - def _download_blobs(self): - blobs = yield DeferredDict( - {blob_hash: self.blob_manager.get_blob(blob_hash) for blob_hash in self.blob_hashes} - ) - self.deferreds = [self.download_blob(blobs[blob_hash]) for blob_hash in self.blob_hashes] - yield defer.DeferredList(self.deferreds) + def _download_lc(self): + delay = yield self._download_and_get_retry_delay() + log.debug("delay: %s, missing: %i, downloaded from mirror: %i", delay, len(self.missing_blob_hashes), + len(self.downloaded_blob_hashes)) + while self.missing_blob_hashes: + self.blob_hashes.append(self.missing_blob_hashes.pop()) + if not delay: + if self.looping_call.running: + self.looping_call.stop() + if not self.finished_deferred.called: + log.debug("mirror finished") + self.finished_deferred.callback(None) + elif delay and delay != self.delay: + if delay == self.long_delay: + log.debug("mirror not making progress, trying less frequently") + elif delay == self.short_delay: + log.debug("queueing retry of %i blobs", len(self.missing_blob_hashes)) + if self.looping_call.running: + self.looping_call.stop() + self.delay = delay + self.looping_call = task.LoopingCall(self._download_lc) + self.looping_call.clock = self.clock + self.lc_deferred = self.looping_call.start(self.delay, now=False) + self.lc_deferred.addErrback(lambda err: err.trap(defer.CancelledError)) + yield self.finished_deferred + + @defer.inlineCallbacks + def _download_and_get_retry_delay(self): + if self.blob_hashes and self.servers: + if self.sd_hashes: + log.debug("trying to download stream from mirror (sd %s)", self.sd_hashes[0][:8]) + else: + log.debug("trying to download %i blobs from mirror", len(self.blob_hashes)) + blobs = yield DeferredDict( + {blob_hash: self.blob_manager.get_blob(blob_hash) for blob_hash in self.blob_hashes} + ) + self.deferreds = [self.download_blob(blobs[blob_hash]) for blob_hash in self.blob_hashes] + yield defer.DeferredList(self.deferreds) + if self.retry and self.missing_blob_hashes: + if not self.downloaded_blob_hashes: + defer.returnValue(self.long_delay) + if len(self.missing_blob_hashes) < self.last_missing: + self.last_missing = len(self.missing_blob_hashes) + defer.returnValue(self.short_delay) + if self.retry and self.last_missing and len(self.missing_blob_hashes) == self.last_missing: + defer.returnValue(self.long_delay) + defer.returnValue(None) @defer.inlineCallbacks def _download_blob(self, blob): @@ -93,9 +130,12 @@ class HTTPBlobDownloader(object): log.info('Mirror completed download for %s', blob.blob_hash) should_announce = blob.blob_hash in self.sd_hashes or blob.blob_hash in self.head_blob_hashes yield self.blob_manager.blob_completed(blob, should_announce=should_announce) + self.downloaded_blob_hashes.append(blob.blob_hash) break - except (IOError, Exception, defer.CancelledError) as e: - if isinstance(e, (DownloadCanceledError, defer.CancelledError)) or 'closed file' in str(e): + except (IOError, Exception, defer.CancelledError, ConnectingCancelledError) as e: + if isinstance( + e, (DownloadCanceledError, defer.CancelledError, ConnectingCancelledError) + ) or 'closed file' in str(e): # some other downloader finished first or it was simply cancelled log.info("Mirror download cancelled: %s", blob.blob_hash) break diff --git a/lbrynet/tests/unit/core/test_HTTPBlobDownloader.py b/lbrynet/tests/unit/core/test_HTTPBlobDownloader.py index c6b0efcee..3c40e997a 100644 --- a/lbrynet/tests/unit/core/test_HTTPBlobDownloader.py +++ b/lbrynet/tests/unit/core/test_HTTPBlobDownloader.py @@ -75,11 +75,10 @@ class HTTPBlobDownloaderTest(unittest.TestCase): self.assertEqual(self.blob.get_is_verified(), False) self.assertEqual(self.blob.writers, {}) - @defer.inlineCallbacks def test_stop(self): self.client.collect.side_effect = lambda response, write: defer.Deferred() self.downloader.start() # hangs if yielded, as intended, to simulate a long ongoing write while we call stop - yield self.downloader.stop() + self.downloader.stop() self.blob_manager.get_blob.assert_called_with(self.blob_hash) self.client.get.assert_called_with('http://{}/{}'.format('server1', self.blob_hash)) self.client.collect.assert_called()