forked from LBRYCommunity/lbry-sdk
fix announcement of blobs downloaded from a http mirror
This commit is contained in:
parent
93be8b9e31
commit
fe43764bcb
3 changed files with 24 additions and 6 deletions
|
@ -17,11 +17,13 @@ class HTTPBlobDownloader(object):
|
||||||
to cancel other writers when a writer finishes first. That's why there is no call to cancel/resume/stop between
|
to cancel other writers when a writer finishes first. That's why there is no call to cancel/resume/stop between
|
||||||
different types of downloaders.
|
different types of downloaders.
|
||||||
'''
|
'''
|
||||||
def __init__(self, blob_manager, blob_hashes=None, servers=None, client=None):
|
def __init__(self, blob_manager, blob_hashes=None, servers=None, client=None, sd_hashes=None):
|
||||||
self.blob_manager = blob_manager
|
self.blob_manager = blob_manager
|
||||||
self.servers = servers or []
|
self.servers = servers or []
|
||||||
self.client = client or treq
|
self.client = client or treq
|
||||||
self.blob_hashes = blob_hashes or []
|
self.blob_hashes = blob_hashes or []
|
||||||
|
self.sd_hashes = sd_hashes or []
|
||||||
|
self.head_blob_hashes = []
|
||||||
self.max_failures = 3
|
self.max_failures = 3
|
||||||
self.running = False
|
self.running = False
|
||||||
self.semaphore = defer.DeferredSemaphore(2)
|
self.semaphore = defer.DeferredSemaphore(2)
|
||||||
|
@ -66,6 +68,12 @@ class HTTPBlobDownloader(object):
|
||||||
yield finished_deferred # yield for verification errors, so we log them
|
yield finished_deferred # yield for verification errors, so we log them
|
||||||
if blob.verified:
|
if blob.verified:
|
||||||
log.info('Mirror completed download for %s', blob.blob_hash)
|
log.info('Mirror completed download for %s', blob.blob_hash)
|
||||||
|
b_h = blob.blob_hash
|
||||||
|
if b_h in self.sd_hashes or b_h in self.head_blob_hashes:
|
||||||
|
should_announce = True
|
||||||
|
else:
|
||||||
|
should_announce = False
|
||||||
|
yield self.blob_manager.blob_completed(blob, should_announce=should_announce)
|
||||||
break
|
break
|
||||||
except (IOError, Exception) as e:
|
except (IOError, Exception) as e:
|
||||||
if isinstance(e, DownloadCanceledError) or 'closed file' in str(e):
|
if isinstance(e, DownloadCanceledError) or 'closed file' in str(e):
|
||||||
|
@ -80,7 +88,6 @@ class HTTPBlobDownloader(object):
|
||||||
writer.close()
|
writer.close()
|
||||||
self.writers.remove(writer)
|
self.writers.remove(writer)
|
||||||
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _write_blob(self, writer, blob):
|
def _write_blob(self, writer, blob):
|
||||||
response = yield self.client.get(url_for(choice(self.servers), blob.blob_hash))
|
response = yield self.client.get(url_for(choice(self.servers), blob.blob_hash))
|
||||||
|
@ -95,6 +102,19 @@ class HTTPBlobDownloader(object):
|
||||||
yield self.client.collect(response, writer.write)
|
yield self.client.collect(response, writer.write)
|
||||||
defer.returnValue(True)
|
defer.returnValue(True)
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def download_stream(self, stream_hash, sd_hash):
|
||||||
|
blobs = yield self.blob_manager.storage.get_blobs_for_stream(stream_hash)
|
||||||
|
blob_hashes = [
|
||||||
|
b.blob_hash for b in blobs if b.blob_hash is not None and b.blob_hash not in self.blob_hashes
|
||||||
|
]
|
||||||
|
self.blob_hashes.extend(blob_hashes)
|
||||||
|
if sd_hash not in self.sd_hashes:
|
||||||
|
self.sd_hashes.append(sd_hash)
|
||||||
|
if blob_hashes[0] not in self.head_blob_hashes:
|
||||||
|
self.head_blob_hashes.append(blob_hashes[0])
|
||||||
|
yield self.start()
|
||||||
|
|
||||||
|
|
||||||
def url_for(server, blob_hash=''):
|
def url_for(server, blob_hash=''):
|
||||||
return 'http://{}/{}'.format(server, blob_hash)
|
return 'http://{}/{}'.format(server, blob_hash)
|
||||||
|
|
|
@ -446,7 +446,7 @@ def download_sd_blob(blob_hash, blob_manager, peer_finder, rate_limiter, payment
|
||||||
payment_rate_manager,
|
payment_rate_manager,
|
||||||
wallet,
|
wallet,
|
||||||
timeout)
|
timeout)
|
||||||
mirror = HTTPBlobDownloader(blob_manager, [blob_hash], download_mirrors or [])
|
mirror = HTTPBlobDownloader(blob_manager, [blob_hash], download_mirrors or [], sd_hashes=[blob_hash])
|
||||||
mirror.start()
|
mirror.start()
|
||||||
sd_blob = yield downloader.download()
|
sd_blob = yield downloader.download()
|
||||||
mirror.stop()
|
mirror.stop()
|
||||||
|
|
|
@ -132,9 +132,7 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver):
|
||||||
status = yield self._save_status()
|
status = yield self._save_status()
|
||||||
log_status(self.sd_hash, status)
|
log_status(self.sd_hash, status)
|
||||||
if self.mirror:
|
if self.mirror:
|
||||||
blobs = yield self.storage.get_blobs_for_stream(self.stream_hash)
|
self.mirror.download_stream(self.stream_hash, self.sd_hash)
|
||||||
self.mirror.blob_hashes = [b.blob_hash for b in blobs if b.blob_hash is not None]
|
|
||||||
self.mirror.start()
|
|
||||||
defer.returnValue(status)
|
defer.returnValue(status)
|
||||||
|
|
||||||
def _get_finished_deferred_callback_value(self):
|
def _get_finished_deferred_callback_value(self):
|
||||||
|
|
Loading…
Reference in a new issue