fix mirror downloader looping call

This commit is contained in:
Jack Robison 2018-08-23 18:36:08 -04:00
parent f4b2a05fff
commit a7a53f9381
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
2 changed files with 84 additions and 45 deletions

View file

@ -1,6 +1,7 @@
from random import choice from random import choice
import logging import logging
from twisted.internet import defer, task from twisted.internet import defer, task
from twisted.internet.error import ConnectingCancelledError
import treq import treq
from lbrynet.core.utils import DeferredDict from lbrynet.core.utils import DeferredDict
from lbrynet.core.Error import DownloadCanceledError from lbrynet.core.Error import DownloadCanceledError
@ -16,69 +17,105 @@ class HTTPBlobDownloader(object):
to cancel other writers when a writer finishes first. That's why there is no call to cancel/resume/stop between to cancel other writers when a writer finishes first. That's why there is no call to cancel/resume/stop between
different types of downloaders. different types of downloaders.
''' '''
def __init__(self, blob_manager, blob_hashes=None, servers=None, client=None, sd_hashes=None, retry=True): def __init__(self, blob_manager, blob_hashes=None, servers=None, client=None, sd_hashes=None, retry=True,
clock=None):
if not clock:
from twisted.internet import reactor
self.clock = reactor
else:
self.clock = clock
self.blob_manager = blob_manager self.blob_manager = blob_manager
self.servers = servers or [] self.servers = servers or []
self.client = client or treq self.client = client or treq
self.blob_hashes = blob_hashes or [] self.blob_hashes = blob_hashes or []
self.missing_blob_hashes = [] self.missing_blob_hashes = []
self.downloaded_blob_hashes = []
self.sd_hashes = sd_hashes or [] self.sd_hashes = sd_hashes or []
self.head_blob_hashes = [] self.head_blob_hashes = []
self.max_failures = 3 self.max_failures = 3
self.running = False
self.semaphore = defer.DeferredSemaphore(2) self.semaphore = defer.DeferredSemaphore(2)
self.deferreds = [] self.deferreds = []
self.writers = [] self.writers = []
self.retry = retry self.retry = retry
self.looping_call = task.LoopingCall(self._download_and_retry) self.looping_call = task.LoopingCall(self._download_lc)
self.looping_call.clock = self.clock
self.finished_deferred = defer.Deferred() self.finished_deferred = defer.Deferred()
self.last_missing = 100000000 self.finished_deferred.addErrback(lambda err: err.trap(defer.CancelledError))
self.short_delay = 30
self.long_delay = 600
self.delay = self.short_delay
self.last_missing = 10000000
self.lc_deferred = None
@defer.inlineCallbacks @defer.inlineCallbacks
def _download_and_retry(self):
if not self.running and self.blob_hashes and self.servers:
yield self._download_blobs()
if self.retry and self.missing_blob_hashes:
if len(self.missing_blob_hashes) < self.last_missing:
self.last_missing = len(self.missing_blob_hashes)
log.info("queueing retry of %i blobs", len(self.missing_blob_hashes))
while self.missing_blob_hashes:
self.blob_hashes.append(self.missing_blob_hashes.pop())
defer.returnValue(None)
if self.looping_call.running:
self.looping_call.stop()
if self.retry and self.last_missing and len(self.missing_blob_hashes) == self.last_missing:
log.info("mirror not making progress, trying less frequently")
self.looping_call.start(600, now=False)
elif not self.finished_deferred.called:
self.finished_deferred.callback(None)
log.info("mirror finished")
def start(self): def start(self):
if not self.running: if not self.looping_call.running:
self.looping_call.start(30) self.lc_deferred = self.looping_call.start(self.short_delay, now=True)
self.running = True self.lc_deferred.addErrback(lambda err: err.trap(defer.CancelledError))
return self.finished_deferred yield self.finished_deferred
def stop(self): def stop(self):
if self.running:
for d in reversed(self.deferreds): for d in reversed(self.deferreds):
d.cancel() d.cancel()
while self.writers: while self.writers:
writer = self.writers.pop() writer = self.writers.pop()
writer.close(DownloadCanceledError()) writer.close(DownloadCanceledError())
self.running = False
self.blob_hashes = [] self.blob_hashes = []
if self.looping_call.running: if self.looping_call.running:
self.looping_call.stop() self.looping_call.stop()
if self.lc_deferred and not self.lc_deferred.called:
self.lc_deferred.cancel()
if not self.finished_deferred.called:
self.finished_deferred.cancel()
@defer.inlineCallbacks @defer.inlineCallbacks
def _download_blobs(self): def _download_lc(self):
delay = yield self._download_and_get_retry_delay()
log.debug("delay: %s, missing: %i, downloaded from mirror: %i", delay, len(self.missing_blob_hashes),
len(self.downloaded_blob_hashes))
while self.missing_blob_hashes:
self.blob_hashes.append(self.missing_blob_hashes.pop())
if not delay:
if self.looping_call.running:
self.looping_call.stop()
if not self.finished_deferred.called:
log.debug("mirror finished")
self.finished_deferred.callback(None)
elif delay and delay != self.delay:
if delay == self.long_delay:
log.debug("mirror not making progress, trying less frequently")
elif delay == self.short_delay:
log.debug("queueing retry of %i blobs", len(self.missing_blob_hashes))
if self.looping_call.running:
self.looping_call.stop()
self.delay = delay
self.looping_call = task.LoopingCall(self._download_lc)
self.looping_call.clock = self.clock
self.lc_deferred = self.looping_call.start(self.delay, now=False)
self.lc_deferred.addErrback(lambda err: err.trap(defer.CancelledError))
yield self.finished_deferred
@defer.inlineCallbacks
def _download_and_get_retry_delay(self):
if self.blob_hashes and self.servers:
if self.sd_hashes:
log.debug("trying to download stream from mirror (sd %s)", self.sd_hashes[0][:8])
else:
log.debug("trying to download %i blobs from mirror", len(self.blob_hashes))
blobs = yield DeferredDict( blobs = yield DeferredDict(
{blob_hash: self.blob_manager.get_blob(blob_hash) for blob_hash in self.blob_hashes} {blob_hash: self.blob_manager.get_blob(blob_hash) for blob_hash in self.blob_hashes}
) )
self.deferreds = [self.download_blob(blobs[blob_hash]) for blob_hash in self.blob_hashes] self.deferreds = [self.download_blob(blobs[blob_hash]) for blob_hash in self.blob_hashes]
yield defer.DeferredList(self.deferreds) yield defer.DeferredList(self.deferreds)
if self.retry and self.missing_blob_hashes:
if not self.downloaded_blob_hashes:
defer.returnValue(self.long_delay)
if len(self.missing_blob_hashes) < self.last_missing:
self.last_missing = len(self.missing_blob_hashes)
defer.returnValue(self.short_delay)
if self.retry and self.last_missing and len(self.missing_blob_hashes) == self.last_missing:
defer.returnValue(self.long_delay)
defer.returnValue(None)
@defer.inlineCallbacks @defer.inlineCallbacks
def _download_blob(self, blob): def _download_blob(self, blob):
@ -93,9 +130,12 @@ class HTTPBlobDownloader(object):
log.info('Mirror completed download for %s', blob.blob_hash) log.info('Mirror completed download for %s', blob.blob_hash)
should_announce = blob.blob_hash in self.sd_hashes or blob.blob_hash in self.head_blob_hashes should_announce = blob.blob_hash in self.sd_hashes or blob.blob_hash in self.head_blob_hashes
yield self.blob_manager.blob_completed(blob, should_announce=should_announce) yield self.blob_manager.blob_completed(blob, should_announce=should_announce)
self.downloaded_blob_hashes.append(blob.blob_hash)
break break
except (IOError, Exception, defer.CancelledError) as e: except (IOError, Exception, defer.CancelledError, ConnectingCancelledError) as e:
if isinstance(e, (DownloadCanceledError, defer.CancelledError)) or 'closed file' in str(e): if isinstance(
e, (DownloadCanceledError, defer.CancelledError, ConnectingCancelledError)
) or 'closed file' in str(e):
# some other downloader finished first or it was simply cancelled # some other downloader finished first or it was simply cancelled
log.info("Mirror download cancelled: %s", blob.blob_hash) log.info("Mirror download cancelled: %s", blob.blob_hash)
break break

View file

@ -75,11 +75,10 @@ class HTTPBlobDownloaderTest(unittest.TestCase):
self.assertEqual(self.blob.get_is_verified(), False) self.assertEqual(self.blob.get_is_verified(), False)
self.assertEqual(self.blob.writers, {}) self.assertEqual(self.blob.writers, {})
@defer.inlineCallbacks
def test_stop(self): def test_stop(self):
self.client.collect.side_effect = lambda response, write: defer.Deferred() self.client.collect.side_effect = lambda response, write: defer.Deferred()
self.downloader.start() # hangs if yielded, as intended, to simulate a long ongoing write while we call stop self.downloader.start() # hangs if yielded, as intended, to simulate a long ongoing write while we call stop
yield self.downloader.stop() self.downloader.stop()
self.blob_manager.get_blob.assert_called_with(self.blob_hash) self.blob_manager.get_blob.assert_called_with(self.blob_hash)
self.client.get.assert_called_with('http://{}/{}'.format('server1', self.blob_hash)) self.client.get.assert_called_with('http://{}/{}'.format('server1', self.blob_hash))
self.client.collect.assert_called() self.client.collect.assert_called()