diff --git a/lbrynet/core/HTTPBlobDownloader.py b/lbrynet/core/HTTPBlobDownloader.py index cf616d16b..df56dda49 100644 --- a/lbrynet/core/HTTPBlobDownloader.py +++ b/lbrynet/core/HTTPBlobDownloader.py @@ -17,11 +17,13 @@ class HTTPBlobDownloader(object): to cancel other writers when a writer finishes first. That's why there is no call to cancel/resume/stop between different types of downloaders. ''' - def __init__(self, blob_manager, blob_hashes=None, servers=None, client=None): + def __init__(self, blob_manager, blob_hashes=None, servers=None, client=None, sd_hashes=None): self.blob_manager = blob_manager self.servers = servers or [] self.client = client or treq self.blob_hashes = blob_hashes or [] + self.sd_hashes = sd_hashes or [] + self.head_blob_hashes = [] self.max_failures = 3 self.running = False self.semaphore = defer.DeferredSemaphore(2) @@ -66,6 +68,12 @@ class HTTPBlobDownloader(object): yield finished_deferred # yield for verification errors, so we log them if blob.verified: log.info('Mirror completed download for %s', blob.blob_hash) + b_h = blob.blob_hash + if b_h in self.sd_hashes or b_h in self.head_blob_hashes: + should_announce = True + else: + should_announce = False + yield self.blob_manager.blob_completed(blob, should_announce=should_announce) break except (IOError, Exception) as e: if isinstance(e, DownloadCanceledError) or 'closed file' in str(e): @@ -80,7 +88,6 @@ class HTTPBlobDownloader(object): writer.close() self.writers.remove(writer) - @defer.inlineCallbacks def _write_blob(self, writer, blob): response = yield self.client.get(url_for(choice(self.servers), blob.blob_hash)) @@ -95,6 +102,19 @@ class HTTPBlobDownloader(object): yield self.client.collect(response, writer.write) defer.returnValue(True) + @defer.inlineCallbacks + def download_stream(self, stream_hash, sd_hash): + blobs = yield self.blob_manager.storage.get_blobs_for_stream(stream_hash) + blob_hashes = [ + b.blob_hash for b in blobs if b.blob_hash is not None and b.blob_hash not in self.blob_hashes + ] + self.blob_hashes.extend(blob_hashes) + if sd_hash not in self.sd_hashes: + self.sd_hashes.append(sd_hash) + if blob_hashes[0] not in self.head_blob_hashes: + self.head_blob_hashes.append(blob_hashes[0]) + yield self.start() + def url_for(server, blob_hash=''): return 'http://{}/{}'.format(server, blob_hash) diff --git a/lbrynet/core/StreamDescriptor.py b/lbrynet/core/StreamDescriptor.py index 89831a3ba..4b32cf2ab 100644 --- a/lbrynet/core/StreamDescriptor.py +++ b/lbrynet/core/StreamDescriptor.py @@ -446,7 +446,7 @@ def download_sd_blob(blob_hash, blob_manager, peer_finder, rate_limiter, payment payment_rate_manager, wallet, timeout) - mirror = HTTPBlobDownloader(blob_manager, [blob_hash], download_mirrors or []) + mirror = HTTPBlobDownloader(blob_manager, [blob_hash], download_mirrors or [], sd_hashes=[blob_hash]) mirror.start() sd_blob = yield downloader.download() mirror.stop() diff --git a/lbrynet/file_manager/EncryptedFileDownloader.py b/lbrynet/file_manager/EncryptedFileDownloader.py index 29fc92c1a..71897dcd5 100644 --- a/lbrynet/file_manager/EncryptedFileDownloader.py +++ b/lbrynet/file_manager/EncryptedFileDownloader.py @@ -132,9 +132,7 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver): status = yield self._save_status() log_status(self.sd_hash, status) if self.mirror: - blobs = yield self.storage.get_blobs_for_stream(self.stream_hash) - self.mirror.blob_hashes = [b.blob_hash for b in blobs if b.blob_hash is not None] - self.mirror.start() + self.mirror.download_stream(self.stream_hash, self.sd_hash) defer.returnValue(status) def _get_finished_deferred_callback_value(self):