Merge branch 'fix-mirror-lc'
This commit is contained in:
commit
1f5c56cac3
4 changed files with 86 additions and 47 deletions
|
@ -14,7 +14,7 @@ at anytime.
|
||||||
|
|
||||||
### Fixed
|
### Fixed
|
||||||
* issue in dht ping queue where enqueued pings that aren't yet due wouldn't be rescheduled
|
* issue in dht ping queue where enqueued pings that aren't yet due wouldn't be rescheduled
|
||||||
* blob mirror downloader not finishing streams that were partially uploaded
|
* blob mirror downloader not finishing streams that were partially uploaded at the time of the download attempt (https://github.com/lbryio/lbry/issues/1376)
|
||||||
|
|
||||||
### Deprecated
|
### Deprecated
|
||||||
*
|
*
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
from random import choice
|
from random import choice
|
||||||
import logging
|
import logging
|
||||||
from twisted.internet import defer, task
|
from twisted.internet import defer, task
|
||||||
|
from twisted.internet.error import ConnectingCancelledError
|
||||||
import treq
|
import treq
|
||||||
from lbrynet.core.utils import DeferredDict
|
from lbrynet.core.utils import DeferredDict
|
||||||
from lbrynet.core.Error import DownloadCanceledError
|
from lbrynet.core.Error import DownloadCanceledError
|
||||||
|
@ -16,69 +17,105 @@ class HTTPBlobDownloader(object):
|
||||||
to cancel other writers when a writer finishes first. That's why there is no call to cancel/resume/stop between
|
to cancel other writers when a writer finishes first. That's why there is no call to cancel/resume/stop between
|
||||||
different types of downloaders.
|
different types of downloaders.
|
||||||
'''
|
'''
|
||||||
def __init__(self, blob_manager, blob_hashes=None, servers=None, client=None, sd_hashes=None, retry=True):
|
def __init__(self, blob_manager, blob_hashes=None, servers=None, client=None, sd_hashes=None, retry=True,
|
||||||
|
clock=None):
|
||||||
|
if not clock:
|
||||||
|
from twisted.internet import reactor
|
||||||
|
self.clock = reactor
|
||||||
|
else:
|
||||||
|
self.clock = clock
|
||||||
self.blob_manager = blob_manager
|
self.blob_manager = blob_manager
|
||||||
self.servers = servers or []
|
self.servers = servers or []
|
||||||
self.client = client or treq
|
self.client = client or treq
|
||||||
self.blob_hashes = blob_hashes or []
|
self.blob_hashes = blob_hashes or []
|
||||||
self.missing_blob_hashes = []
|
self.missing_blob_hashes = []
|
||||||
|
self.downloaded_blob_hashes = []
|
||||||
self.sd_hashes = sd_hashes or []
|
self.sd_hashes = sd_hashes or []
|
||||||
self.head_blob_hashes = []
|
self.head_blob_hashes = []
|
||||||
self.max_failures = 3
|
self.max_failures = 3
|
||||||
self.running = False
|
|
||||||
self.semaphore = defer.DeferredSemaphore(2)
|
self.semaphore = defer.DeferredSemaphore(2)
|
||||||
self.deferreds = []
|
self.deferreds = []
|
||||||
self.writers = []
|
self.writers = []
|
||||||
self.retry = retry
|
self.retry = retry
|
||||||
self.looping_call = task.LoopingCall(self._download_and_retry)
|
self.looping_call = task.LoopingCall(self._download_lc)
|
||||||
|
self.looping_call.clock = self.clock
|
||||||
self.finished_deferred = defer.Deferred()
|
self.finished_deferred = defer.Deferred()
|
||||||
self.last_missing = 100000000
|
self.finished_deferred.addErrback(lambda err: err.trap(defer.CancelledError))
|
||||||
|
self.short_delay = 30
|
||||||
|
self.long_delay = 600
|
||||||
|
self.delay = self.short_delay
|
||||||
|
self.last_missing = 10000000
|
||||||
|
self.lc_deferred = None
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _download_and_retry(self):
|
|
||||||
if not self.running and self.blob_hashes and self.servers:
|
|
||||||
yield self._download_blobs()
|
|
||||||
if self.retry and self.missing_blob_hashes:
|
|
||||||
if len(self.missing_blob_hashes) < self.last_missing:
|
|
||||||
self.last_missing = len(self.missing_blob_hashes)
|
|
||||||
log.info("queueing retry of %i blobs", len(self.missing_blob_hashes))
|
|
||||||
while self.missing_blob_hashes:
|
|
||||||
self.blob_hashes.append(self.missing_blob_hashes.pop())
|
|
||||||
defer.returnValue(None)
|
|
||||||
if self.looping_call.running:
|
|
||||||
self.looping_call.stop()
|
|
||||||
if self.retry and self.last_missing and len(self.missing_blob_hashes) == self.last_missing:
|
|
||||||
log.info("mirror not making progress, trying less frequently")
|
|
||||||
self.looping_call.start(600, now=False)
|
|
||||||
elif not self.finished_deferred.called:
|
|
||||||
self.finished_deferred.callback(None)
|
|
||||||
log.info("mirror finished")
|
|
||||||
|
|
||||||
def start(self):
|
def start(self):
|
||||||
if not self.running:
|
if not self.looping_call.running:
|
||||||
self.looping_call.start(30)
|
self.lc_deferred = self.looping_call.start(self.short_delay, now=True)
|
||||||
self.running = True
|
self.lc_deferred.addErrback(lambda err: err.trap(defer.CancelledError))
|
||||||
return self.finished_deferred
|
yield self.finished_deferred
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
if self.running:
|
for d in reversed(self.deferreds):
|
||||||
for d in reversed(self.deferreds):
|
d.cancel()
|
||||||
d.cancel()
|
while self.writers:
|
||||||
while self.writers:
|
writer = self.writers.pop()
|
||||||
writer = self.writers.pop()
|
writer.close(DownloadCanceledError())
|
||||||
writer.close(DownloadCanceledError())
|
self.blob_hashes = []
|
||||||
self.running = False
|
|
||||||
self.blob_hashes = []
|
|
||||||
if self.looping_call.running:
|
if self.looping_call.running:
|
||||||
self.looping_call.stop()
|
self.looping_call.stop()
|
||||||
|
if self.lc_deferred and not self.lc_deferred.called:
|
||||||
|
self.lc_deferred.cancel()
|
||||||
|
if not self.finished_deferred.called:
|
||||||
|
self.finished_deferred.cancel()
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _download_blobs(self):
|
def _download_lc(self):
|
||||||
blobs = yield DeferredDict(
|
delay = yield self._download_and_get_retry_delay()
|
||||||
{blob_hash: self.blob_manager.get_blob(blob_hash) for blob_hash in self.blob_hashes}
|
log.debug("delay: %s, missing: %i, downloaded from mirror: %i", delay, len(self.missing_blob_hashes),
|
||||||
)
|
len(self.downloaded_blob_hashes))
|
||||||
self.deferreds = [self.download_blob(blobs[blob_hash]) for blob_hash in self.blob_hashes]
|
while self.missing_blob_hashes:
|
||||||
yield defer.DeferredList(self.deferreds)
|
self.blob_hashes.append(self.missing_blob_hashes.pop())
|
||||||
|
if not delay:
|
||||||
|
if self.looping_call.running:
|
||||||
|
self.looping_call.stop()
|
||||||
|
if not self.finished_deferred.called:
|
||||||
|
log.debug("mirror finished")
|
||||||
|
self.finished_deferred.callback(None)
|
||||||
|
elif delay and delay != self.delay:
|
||||||
|
if delay == self.long_delay:
|
||||||
|
log.debug("mirror not making progress, trying less frequently")
|
||||||
|
elif delay == self.short_delay:
|
||||||
|
log.debug("queueing retry of %i blobs", len(self.missing_blob_hashes))
|
||||||
|
if self.looping_call.running:
|
||||||
|
self.looping_call.stop()
|
||||||
|
self.delay = delay
|
||||||
|
self.looping_call = task.LoopingCall(self._download_lc)
|
||||||
|
self.looping_call.clock = self.clock
|
||||||
|
self.lc_deferred = self.looping_call.start(self.delay, now=False)
|
||||||
|
self.lc_deferred.addErrback(lambda err: err.trap(defer.CancelledError))
|
||||||
|
yield self.finished_deferred
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def _download_and_get_retry_delay(self):
|
||||||
|
if self.blob_hashes and self.servers:
|
||||||
|
if self.sd_hashes:
|
||||||
|
log.debug("trying to download stream from mirror (sd %s)", self.sd_hashes[0][:8])
|
||||||
|
else:
|
||||||
|
log.debug("trying to download %i blobs from mirror", len(self.blob_hashes))
|
||||||
|
blobs = yield DeferredDict(
|
||||||
|
{blob_hash: self.blob_manager.get_blob(blob_hash) for blob_hash in self.blob_hashes}
|
||||||
|
)
|
||||||
|
self.deferreds = [self.download_blob(blobs[blob_hash]) for blob_hash in self.blob_hashes]
|
||||||
|
yield defer.DeferredList(self.deferreds)
|
||||||
|
if self.retry and self.missing_blob_hashes:
|
||||||
|
if not self.downloaded_blob_hashes:
|
||||||
|
defer.returnValue(self.long_delay)
|
||||||
|
if len(self.missing_blob_hashes) < self.last_missing:
|
||||||
|
self.last_missing = len(self.missing_blob_hashes)
|
||||||
|
defer.returnValue(self.short_delay)
|
||||||
|
if self.retry and self.last_missing and len(self.missing_blob_hashes) == self.last_missing:
|
||||||
|
defer.returnValue(self.long_delay)
|
||||||
|
defer.returnValue(None)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _download_blob(self, blob):
|
def _download_blob(self, blob):
|
||||||
|
@ -93,9 +130,12 @@ class HTTPBlobDownloader(object):
|
||||||
log.info('Mirror completed download for %s', blob.blob_hash)
|
log.info('Mirror completed download for %s', blob.blob_hash)
|
||||||
should_announce = blob.blob_hash in self.sd_hashes or blob.blob_hash in self.head_blob_hashes
|
should_announce = blob.blob_hash in self.sd_hashes or blob.blob_hash in self.head_blob_hashes
|
||||||
yield self.blob_manager.blob_completed(blob, should_announce=should_announce)
|
yield self.blob_manager.blob_completed(blob, should_announce=should_announce)
|
||||||
|
self.downloaded_blob_hashes.append(blob.blob_hash)
|
||||||
break
|
break
|
||||||
except (IOError, Exception, defer.CancelledError) as e:
|
except (IOError, Exception, defer.CancelledError, ConnectingCancelledError) as e:
|
||||||
if isinstance(e, (DownloadCanceledError, defer.CancelledError)) or 'closed file' in str(e):
|
if isinstance(
|
||||||
|
e, (DownloadCanceledError, defer.CancelledError, ConnectingCancelledError)
|
||||||
|
) or 'closed file' in str(e):
|
||||||
# some other downloader finished first or it was simply cancelled
|
# some other downloader finished first or it was simply cancelled
|
||||||
log.info("Mirror download cancelled: %s", blob.blob_hash)
|
log.info("Mirror download cancelled: %s", blob.blob_hash)
|
||||||
break
|
break
|
||||||
|
|
|
@ -25,7 +25,7 @@ class EncryptedFileDownloader(CryptStreamDownloader):
|
||||||
payment_rate_manager, wallet, key, stream_name)
|
payment_rate_manager, wallet, key, stream_name)
|
||||||
self.stream_hash = stream_hash
|
self.stream_hash = stream_hash
|
||||||
self.storage = storage
|
self.storage = storage
|
||||||
self.file_name = binascii.unhexlify(os.path.basename(file_name))
|
self.file_name = os.path.basename(binascii.unhexlify(file_name))
|
||||||
self._calculated_total_bytes = None
|
self._calculated_total_bytes = None
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
|
|
|
@ -75,11 +75,10 @@ class HTTPBlobDownloaderTest(unittest.TestCase):
|
||||||
self.assertEqual(self.blob.get_is_verified(), False)
|
self.assertEqual(self.blob.get_is_verified(), False)
|
||||||
self.assertEqual(self.blob.writers, {})
|
self.assertEqual(self.blob.writers, {})
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
|
||||||
def test_stop(self):
|
def test_stop(self):
|
||||||
self.client.collect.side_effect = lambda response, write: defer.Deferred()
|
self.client.collect.side_effect = lambda response, write: defer.Deferred()
|
||||||
self.downloader.start() # hangs if yielded, as intended, to simulate a long ongoing write while we call stop
|
self.downloader.start() # hangs if yielded, as intended, to simulate a long ongoing write while we call stop
|
||||||
yield self.downloader.stop()
|
self.downloader.stop()
|
||||||
self.blob_manager.get_blob.assert_called_with(self.blob_hash)
|
self.blob_manager.get_blob.assert_called_with(self.blob_hash)
|
||||||
self.client.get.assert_called_with('http://{}/{}'.format('server1', self.blob_hash))
|
self.client.get.assert_called_with('http://{}/{}'.format('server1', self.blob_hash))
|
||||||
self.client.collect.assert_called()
|
self.client.collect.assert_called()
|
||||||
|
|
Loading…
Reference in a new issue