diff --git a/lbrynet/core/HTTPBlobDownloader.py b/lbrynet/core/HTTPBlobDownloader.py
index df56dda49..f5f706b5a 100644
--- a/lbrynet/core/HTTPBlobDownloader.py
+++ b/lbrynet/core/HTTPBlobDownloader.py
@@ -1,9 +1,8 @@
 from random import choice
 import logging
-
-from twisted.internet import defer
+from twisted.internet import defer, task
 import treq
-
+from lbrynet.core.utils import DeferredDict
 from lbrynet.core.Error import DownloadCanceledError
 
 log = logging.getLogger(__name__)
@@ -17,11 +16,12 @@ class HTTPBlobDownloader(object):
     to cancel other writers when a writer finishes first. That's why there is no call to cancel/resume/stop between
     different types of downloaders.
     '''
-    def __init__(self, blob_manager, blob_hashes=None, servers=None, client=None, sd_hashes=None):
+    def __init__(self, blob_manager, blob_hashes=None, servers=None, client=None, sd_hashes=None, retry=True):
         self.blob_manager = blob_manager
         self.servers = servers or []
         self.client = client or treq
         self.blob_hashes = blob_hashes or []
+        self.missing_blob_hashes = []
         self.sd_hashes = sd_hashes or []
         self.head_blob_hashes = []
         self.max_failures = 3
@@ -29,36 +29,68 @@ class HTTPBlobDownloader(object):
         self.semaphore = defer.DeferredSemaphore(2)
         self.deferreds = []
         self.writers = []
+        self.retry = retry
+        self.looping_call = task.LoopingCall(self._download_and_retry)
+        self.finished_deferred = defer.Deferred()
+        self.last_missing = 100000000
+
+    @defer.inlineCallbacks
+    def _download_and_retry(self):
+        # One pass of the mirror loop. LoopingCall waits for the returned deferred, so passes never overlap.
+        # Not gated on self.running: start() sets that flag as soon as the first pass has begun, so such a
+        # check would turn every later pass (i.e. every retry) into a no-op.
+        if self.blob_hashes and self.servers:
+            yield self._download_blobs()
+            if self.retry and self.missing_blob_hashes:
+                missing = len(self.missing_blob_hashes)
+                stalled = missing >= self.last_missing
+                self.last_missing = missing
+                log.info("queueing retry of %i blobs", missing)
+                # always requeue, otherwise the slower loop would have nothing left to retry
+                while self.missing_blob_hashes:
+                    self.blob_hashes.append(self.missing_blob_hashes.pop())
+                if stalled and self.looping_call.running and self.looping_call.interval != 600:
+                    log.info("mirror not making progress, trying less frequently")
+                    # hand over to a fresh LoopingCall rather than restarting this one from
+                    # inside its own call
+                    self.looping_call.stop()
+                    self.looping_call = task.LoopingCall(self._download_and_retry)
+                    self.looping_call.start(600, now=False)
+                defer.returnValue(None)
+        if self.looping_call.running:
+            self.looping_call.stop()
+        if not self.finished_deferred.called:
+            self.finished_deferred.callback(None)
+            log.info("mirror finished")
 
     def start(self):
-        if not self.running and self.blob_hashes and self.servers:
-            return self._start()
-        defer.succeed(None)
+        if not self.running:
+            self.looping_call.start(30)
+            self.running = True
+        return self.finished_deferred
 
     def stop(self):
         if self.running:
             for d in reversed(self.deferreds):
                 d.cancel()
-            for writer in self.writers:
+            while self.writers:
+                writer = self.writers.pop()
                 writer.close(DownloadCanceledError())
             self.running = False
             self.blob_hashes = []
+        if self.looping_call.running:
+            self.looping_call.stop()
 
     @defer.inlineCallbacks
-    def _start(self):
-        self.running = True
-        dl = []
-        for blob_hash in self.blob_hashes:
-            blob = yield self.blob_manager.get_blob(blob_hash)
-            if not blob.verified:
-                d = self.semaphore.run(self.download_blob, blob)
-                d.addErrback(lambda err: err.check(defer.TimeoutError, defer.CancelledError))
-                dl.append(d)
-        self.deferreds = dl
-        yield defer.DeferredList(dl)
+    def _download_blobs(self):
+        blobs = yield DeferredDict(
+            {blob_hash: self.blob_manager.get_blob(blob_hash) for blob_hash in self.blob_hashes}
+        )
+        self.deferreds = [self.download_blob(blobs[blob_hash]) for blob_hash in self.blob_hashes]
+        yield defer.DeferredList(self.deferreds)
 
     @defer.inlineCallbacks
-    def download_blob(self, blob):
+    def _download_blob(self, blob):
         for _ in range(self.max_failures):
             writer, finished_deferred = blob.open_for_writing('mirror')
             self.writers.append(writer)
@@ -68,15 +100,11 @@ class HTTPBlobDownloader(object):
                     yield finished_deferred  # yield for verification errors, so we log them
                     if blob.verified:
                         log.info('Mirror completed download for %s', blob.blob_hash)
-                        b_h = blob.blob_hash
-                        if b_h in self.sd_hashes or b_h in self.head_blob_hashes:
-                            should_announce = True
-                        else:
-                            should_announce = False
+                        should_announce = blob.blob_hash in self.sd_hashes or blob.blob_hash in self.head_blob_hashes
                         yield self.blob_manager.blob_completed(blob, should_announce=should_announce)
                 break
-            except (IOError, Exception) as e:
-                if isinstance(e, DownloadCanceledError) or 'closed file' in str(e):
+            except (IOError, Exception, defer.CancelledError) as e:
+                if isinstance(e, (DownloadCanceledError, defer.CancelledError)) or 'closed file' in str(e):
                     # some other downloader finished first or it was simply cancelled
                     log.info("Mirror download cancelled: %s", blob.blob_hash)
                     break
@@ -88,6 +116,13 @@ class HTTPBlobDownloader(object):
                     writer.close()
                 self.writers.remove(writer)
 
+    def download_blob(self, blob):
+        if not blob.verified:
+            d = self.semaphore.run(self._download_blob, blob)
+            d.addErrback(lambda err: err.trap(defer.TimeoutError, defer.CancelledError))
+            return d
+        return defer.succeed(None)
+
     @defer.inlineCallbacks
     def _write_blob(self, writer, blob):
         response = yield self.client.get(url_for(choice(self.servers), blob.blob_hash))
@@ -95,6 +130,8 @@ class HTTPBlobDownloader(object):
             log.debug('Missing a blob: %s', blob.blob_hash)
             if blob.blob_hash in self.blob_hashes:
                 self.blob_hashes.remove(blob.blob_hash)
+            if blob.blob_hash not in self.missing_blob_hashes:
+                self.missing_blob_hashes.append(blob.blob_hash)
             defer.returnValue(False)
 
         log.debug('Download started: %s', blob.blob_hash)
@@ -104,15 +141,16 @@ class HTTPBlobDownloader(object):
 
     @defer.inlineCallbacks
     def download_stream(self, stream_hash, sd_hash):
-        blobs = yield self.blob_manager.storage.get_blobs_for_stream(stream_hash)
-        blob_hashes = [
-            b.blob_hash for b in blobs if b.blob_hash is not None and b.blob_hash not in self.blob_hashes
-        ]
-        self.blob_hashes.extend(blob_hashes)
+        stream_crypt_blobs = yield self.blob_manager.storage.get_blobs_for_stream(stream_hash)
+        self.blob_hashes.extend([
+            b.blob_hash for b in stream_crypt_blobs
+            if b.blob_hash and b.blob_hash not in self.blob_hashes
+        ])
         if sd_hash not in self.sd_hashes:
             self.sd_hashes.append(sd_hash)
-        if blob_hashes[0] not in self.head_blob_hashes:
-            self.head_blob_hashes.append(blob_hashes[0])
+        head_blob_hash = stream_crypt_blobs[0].blob_hash if stream_crypt_blobs else None
+        if head_blob_hash and head_blob_hash not in self.head_blob_hashes:
+            self.head_blob_hashes.append(head_blob_hash)
         yield self.start()
 
 
diff --git a/lbrynet/core/StreamDescriptor.py b/lbrynet/core/StreamDescriptor.py
index 4b32cf2ab..32a220f1c 100644
--- a/lbrynet/core/StreamDescriptor.py
+++ b/lbrynet/core/StreamDescriptor.py
@@ -446,7 +446,7 @@ def download_sd_blob(blob_hash, blob_manager, peer_finder, rate_limiter, payment
                                           payment_rate_manager,
                                           wallet,
                                           timeout)
-    mirror = HTTPBlobDownloader(blob_manager, [blob_hash], download_mirrors or [], sd_hashes=[blob_hash])
+    mirror = HTTPBlobDownloader(blob_manager, [blob_hash], download_mirrors or [], sd_hashes=[blob_hash], retry=False)
     mirror.start()
     sd_blob = yield downloader.download()
     mirror.stop()
diff --git a/lbrynet/tests/unit/core/test_HTTPBlobDownloader.py b/lbrynet/tests/unit/core/test_HTTPBlobDownloader.py
index 9187b55d9..c6b0efcee 100644
--- a/lbrynet/tests/unit/core/test_HTTPBlobDownloader.py
+++ b/lbrynet/tests/unit/core/test_HTTPBlobDownloader.py
@@ -19,10 +19,11 @@ class HTTPBlobDownloaderTest(unittest.TestCase):
         self.blob_manager.get_blob.side_effect = lambda _: defer.succeed(self.blob)
         self.response = MagicMock(code=200, length=400)
         self.client.get.side_effect = lambda uri: defer.succeed(self.response)
-        self.downloader = HTTPBlobDownloader(self.blob_manager, [self.blob_hash], ['server1'], self.client)
+        self.downloader = HTTPBlobDownloader(self.blob_manager, [self.blob_hash], ['server1'], self.client, retry=False)
         self.downloader.interval = 0
 
     def tearDown(self):
+        self.downloader.stop()
         rm_db_and_blob_dir(self.db_dir, self.blob_dir)
 
     @defer.inlineCallbacks
@@ -33,7 +34,7 @@ class HTTPBlobDownloaderTest(unittest.TestCase):
         self.client.get.assert_called_with('http://{}/{}'.format('server1', self.blob_hash))
         self.client.collect.assert_called()
         self.assertEqual(self.blob.get_length(), self.response.length)
-        self.assertEqual(self.blob.get_is_verified(), True)
+        self.assertTrue(self.blob.get_is_verified())
         self.assertEqual(self.blob.writers, {})
 
     @defer.inlineCallbacks