retry mirror downloads

This commit is contained in:
Jack Robison 2018-08-22 13:12:19 -04:00
parent c48d195951
commit 2c546b2cde
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
3 changed files with 67 additions and 37 deletions

View file

@ -1,9 +1,8 @@
from random import choice from random import choice
import logging import logging
from twisted.internet import defer, task
from twisted.internet import defer
import treq import treq
from lbrynet.core.utils import DeferredDict
from lbrynet.core.Error import DownloadCanceledError from lbrynet.core.Error import DownloadCanceledError
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -17,11 +16,12 @@ class HTTPBlobDownloader(object):
to cancel other writers when a writer finishes first. That's why there is no call to cancel/resume/stop between to cancel other writers when a writer finishes first. That's why there is no call to cancel/resume/stop between
different types of downloaders. different types of downloaders.
''' '''
def __init__(self, blob_manager, blob_hashes=None, servers=None, client=None, sd_hashes=None): def __init__(self, blob_manager, blob_hashes=None, servers=None, client=None, sd_hashes=None, retry=True):
self.blob_manager = blob_manager self.blob_manager = blob_manager
self.servers = servers or [] self.servers = servers or []
self.client = client or treq self.client = client or treq
self.blob_hashes = blob_hashes or [] self.blob_hashes = blob_hashes or []
self.missing_blob_hashes = []
self.sd_hashes = sd_hashes or [] self.sd_hashes = sd_hashes or []
self.head_blob_hashes = [] self.head_blob_hashes = []
self.max_failures = 3 self.max_failures = 3
@ -29,36 +29,59 @@ class HTTPBlobDownloader(object):
self.semaphore = defer.DeferredSemaphore(2) self.semaphore = defer.DeferredSemaphore(2)
self.deferreds = [] self.deferreds = []
self.writers = [] self.writers = []
self.retry = retry
self.looping_call = task.LoopingCall(self._download_and_retry)
self.finished_deferred = defer.Deferred()
self.last_missing = 100000000
@defer.inlineCallbacks
def _download_and_retry(self):
if not self.running and self.blob_hashes and self.servers:
yield self._download_blobs()
if self.retry and self.missing_blob_hashes:
if len(self.missing_blob_hashes) < self.last_missing:
self.last_missing = len(self.missing_blob_hashes)
log.info("queueing retry of %i blobs", len(self.missing_blob_hashes))
while self.missing_blob_hashes:
self.blob_hashes.append(self.missing_blob_hashes.pop())
defer.returnValue(None)
if self.looping_call.running:
self.looping_call.stop()
if self.retry and self.last_missing and len(self.missing_blob_hashes) == self.last_missing:
log.info("mirror not making progress, trying less frequently")
self.looping_call.start(600, now=False)
elif not self.finished_deferred.called:
self.finished_deferred.callback(None)
log.info("mirror finished")
def start(self): def start(self):
if not self.running and self.blob_hashes and self.servers: if not self.running:
return self._start() self.looping_call.start(30)
defer.succeed(None) self.running = True
return self.finished_deferred
def stop(self): def stop(self):
if self.running: if self.running:
for d in reversed(self.deferreds): for d in reversed(self.deferreds):
d.cancel() d.cancel()
for writer in self.writers: while self.writers:
writer = self.writers.pop()
writer.close(DownloadCanceledError()) writer.close(DownloadCanceledError())
self.running = False self.running = False
self.blob_hashes = [] self.blob_hashes = []
if self.looping_call.running:
self.looping_call.stop()
@defer.inlineCallbacks @defer.inlineCallbacks
def _start(self): def _download_blobs(self):
self.running = True blobs = yield DeferredDict(
dl = [] {blob_hash: self.blob_manager.get_blob(blob_hash) for blob_hash in self.blob_hashes}
for blob_hash in self.blob_hashes: )
blob = yield self.blob_manager.get_blob(blob_hash) self.deferreds = [self.download_blob(blobs[blob_hash]) for blob_hash in self.blob_hashes]
if not blob.verified: yield defer.DeferredList(self.deferreds)
d = self.semaphore.run(self.download_blob, blob)
d.addErrback(lambda err: err.check(defer.TimeoutError, defer.CancelledError))
dl.append(d)
self.deferreds = dl
yield defer.DeferredList(dl)
@defer.inlineCallbacks @defer.inlineCallbacks
def download_blob(self, blob): def _download_blob(self, blob):
for _ in range(self.max_failures): for _ in range(self.max_failures):
writer, finished_deferred = blob.open_for_writing('mirror') writer, finished_deferred = blob.open_for_writing('mirror')
self.writers.append(writer) self.writers.append(writer)
@ -68,15 +91,11 @@ class HTTPBlobDownloader(object):
yield finished_deferred # yield for verification errors, so we log them yield finished_deferred # yield for verification errors, so we log them
if blob.verified: if blob.verified:
log.info('Mirror completed download for %s', blob.blob_hash) log.info('Mirror completed download for %s', blob.blob_hash)
b_h = blob.blob_hash should_announce = blob.blob_hash in self.sd_hashes or blob.blob_hash in self.head_blob_hashes
if b_h in self.sd_hashes or b_h in self.head_blob_hashes:
should_announce = True
else:
should_announce = False
yield self.blob_manager.blob_completed(blob, should_announce=should_announce) yield self.blob_manager.blob_completed(blob, should_announce=should_announce)
break break
except (IOError, Exception) as e: except (IOError, Exception, defer.CancelledError) as e:
if isinstance(e, DownloadCanceledError) or 'closed file' in str(e): if isinstance(e, (DownloadCanceledError, defer.CancelledError)) or 'closed file' in str(e):
# some other downloader finished first or it was simply cancelled # some other downloader finished first or it was simply cancelled
log.info("Mirror download cancelled: %s", blob.blob_hash) log.info("Mirror download cancelled: %s", blob.blob_hash)
break break
@ -88,6 +107,13 @@ class HTTPBlobDownloader(object):
writer.close() writer.close()
self.writers.remove(writer) self.writers.remove(writer)
def download_blob(self, blob):
if not blob.verified:
d = self.semaphore.run(self._download_blob, blob)
d.addErrback(lambda err: err.trap(defer.TimeoutError, defer.CancelledError))
return d
return defer.succeed(None)
@defer.inlineCallbacks @defer.inlineCallbacks
def _write_blob(self, writer, blob): def _write_blob(self, writer, blob):
response = yield self.client.get(url_for(choice(self.servers), blob.blob_hash)) response = yield self.client.get(url_for(choice(self.servers), blob.blob_hash))
@ -95,6 +121,8 @@ class HTTPBlobDownloader(object):
log.debug('Missing a blob: %s', blob.blob_hash) log.debug('Missing a blob: %s', blob.blob_hash)
if blob.blob_hash in self.blob_hashes: if blob.blob_hash in self.blob_hashes:
self.blob_hashes.remove(blob.blob_hash) self.blob_hashes.remove(blob.blob_hash)
if blob.blob_hash not in self.missing_blob_hashes:
self.missing_blob_hashes.append(blob.blob_hash)
defer.returnValue(False) defer.returnValue(False)
log.debug('Download started: %s', blob.blob_hash) log.debug('Download started: %s', blob.blob_hash)
@ -104,15 +132,16 @@ class HTTPBlobDownloader(object):
@defer.inlineCallbacks @defer.inlineCallbacks
def download_stream(self, stream_hash, sd_hash): def download_stream(self, stream_hash, sd_hash):
blobs = yield self.blob_manager.storage.get_blobs_for_stream(stream_hash) stream_crypt_blobs = yield self.blob_manager.storage.get_blobs_for_stream(stream_hash)
blob_hashes = [ self.blob_hashes.extend([
b.blob_hash for b in blobs if b.blob_hash is not None and b.blob_hash not in self.blob_hashes b.blob_hash for b in stream_crypt_blobs
] if b.blob_hash and b.blob_hash not in self.blob_hashes
self.blob_hashes.extend(blob_hashes) ])
if sd_hash not in self.sd_hashes: if sd_hash not in self.sd_hashes:
self.sd_hashes.append(sd_hash) self.sd_hashes.append(sd_hash)
if blob_hashes[0] not in self.head_blob_hashes: head_blob_hash = stream_crypt_blobs[0].blob_hash
self.head_blob_hashes.append(blob_hashes[0]) if head_blob_hash not in self.head_blob_hashes:
self.head_blob_hashes.append(head_blob_hash)
yield self.start() yield self.start()

View file

@ -446,7 +446,7 @@ def download_sd_blob(blob_hash, blob_manager, peer_finder, rate_limiter, payment
payment_rate_manager, payment_rate_manager,
wallet, wallet,
timeout) timeout)
mirror = HTTPBlobDownloader(blob_manager, [blob_hash], download_mirrors or [], sd_hashes=[blob_hash]) mirror = HTTPBlobDownloader(blob_manager, [blob_hash], download_mirrors or [], sd_hashes=[blob_hash], retry=False)
mirror.start() mirror.start()
sd_blob = yield downloader.download() sd_blob = yield downloader.download()
mirror.stop() mirror.stop()

View file

@ -19,10 +19,11 @@ class HTTPBlobDownloaderTest(unittest.TestCase):
self.blob_manager.get_blob.side_effect = lambda _: defer.succeed(self.blob) self.blob_manager.get_blob.side_effect = lambda _: defer.succeed(self.blob)
self.response = MagicMock(code=200, length=400) self.response = MagicMock(code=200, length=400)
self.client.get.side_effect = lambda uri: defer.succeed(self.response) self.client.get.side_effect = lambda uri: defer.succeed(self.response)
self.downloader = HTTPBlobDownloader(self.blob_manager, [self.blob_hash], ['server1'], self.client) self.downloader = HTTPBlobDownloader(self.blob_manager, [self.blob_hash], ['server1'], self.client, retry=False)
self.downloader.interval = 0 self.downloader.interval = 0
def tearDown(self): def tearDown(self):
self.downloader.stop()
rm_db_and_blob_dir(self.db_dir, self.blob_dir) rm_db_and_blob_dir(self.db_dir, self.blob_dir)
@defer.inlineCallbacks @defer.inlineCallbacks
@ -33,7 +34,7 @@ class HTTPBlobDownloaderTest(unittest.TestCase):
self.client.get.assert_called_with('http://{}/{}'.format('server1', self.blob_hash)) self.client.get.assert_called_with('http://{}/{}'.format('server1', self.blob_hash))
self.client.collect.assert_called() self.client.collect.assert_called()
self.assertEqual(self.blob.get_length(), self.response.length) self.assertEqual(self.blob.get_length(), self.response.length)
self.assertEqual(self.blob.get_is_verified(), True) self.assertTrue(self.blob.get_is_verified())
self.assertEqual(self.blob.writers, {}) self.assertEqual(self.blob.writers, {})
@defer.inlineCallbacks @defer.inlineCallbacks