Merge pull request #1315 from lbryio/mirroring_refactor
refactor mirroring
This commit is contained in:
commit
5b3103e41b
4 changed files with 91 additions and 43 deletions
|
@ -15,6 +15,7 @@ at anytime.
|
||||||
### Fixed
|
### Fixed
|
||||||
* loggly error reporting not following `share_usage_data`
|
* loggly error reporting not following `share_usage_data`
|
||||||
* improper error handling when data is not valid JSON
|
* improper error handling when data is not valid JSON
|
||||||
|
* edge cases of http mirrored download of blobs
|
||||||
|
|
||||||
### Deprecated
|
### Deprecated
|
||||||
* automatic claim renew, this is no longer needed
|
* automatic claim renew, this is no longer needed
|
||||||
|
|
|
@ -27,7 +27,7 @@ class HashBlobWriter(object):
|
||||||
|
|
||||||
def write(self, data):
|
def write(self, data):
|
||||||
if self.write_handle is None:
|
if self.write_handle is None:
|
||||||
log.exception("writer has already been closed")
|
log.warning("writer has already been closed")
|
||||||
raise IOError('I/O operation on closed file')
|
raise IOError('I/O operation on closed file')
|
||||||
|
|
||||||
self._hashsum.update(data)
|
self._hashsum.update(data)
|
||||||
|
|
|
@ -3,7 +3,8 @@ import logging
|
||||||
|
|
||||||
from twisted.internet import defer
|
from twisted.internet import defer
|
||||||
import treq
|
import treq
|
||||||
from twisted.internet.task import LoopingCall
|
|
||||||
|
from lbrynet.core.Error import DownloadCanceledError
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
@ -21,65 +22,78 @@ class HTTPBlobDownloader(object):
|
||||||
self.servers = servers or []
|
self.servers = servers or []
|
||||||
self.client = client or treq
|
self.client = client or treq
|
||||||
self.blob_hashes = blob_hashes or []
|
self.blob_hashes = blob_hashes or []
|
||||||
self.looping_call = LoopingCall(self._download_next_blob_hash_for_file)
|
|
||||||
self.failures = 0
|
|
||||||
self.max_failures = 3
|
self.max_failures = 3
|
||||||
self.interval = 1
|
self.running = False
|
||||||
|
self.semaphore = defer.DeferredSemaphore(2)
|
||||||
@property
|
self.deferreds = []
|
||||||
def running(self):
|
self.writers = []
|
||||||
return self.looping_call.running
|
|
||||||
|
|
||||||
def start(self):
|
def start(self):
|
||||||
if not self.running and self.blob_hashes and self.servers:
|
if not self.running and self.blob_hashes and self.servers:
|
||||||
return self.looping_call.start(self.interval, now=True)
|
return self._start()
|
||||||
defer.succeed(None)
|
defer.succeed(None)
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
if self.running:
|
if self.running:
|
||||||
|
for d in reversed(self.deferreds):
|
||||||
|
d.cancel()
|
||||||
|
for writer in self.writers:
|
||||||
|
writer.close(DownloadCanceledError())
|
||||||
|
self.running = False
|
||||||
self.blob_hashes = []
|
self.blob_hashes = []
|
||||||
return self.looping_call.stop()
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _download_next_blob_hash_for_file(self):
|
def _start(self):
|
||||||
|
self.running = True
|
||||||
|
dl = []
|
||||||
for blob_hash in self.blob_hashes:
|
for blob_hash in self.blob_hashes:
|
||||||
blob = yield self.blob_manager.get_blob(blob_hash)
|
blob = yield self.blob_manager.get_blob(blob_hash)
|
||||||
if not blob.verified:
|
if not blob.verified:
|
||||||
self.download_blob(blob)
|
d = self.semaphore.run(self.download_blob, blob)
|
||||||
return
|
d.addErrback(lambda err: err.check(defer.TimeoutError, defer.CancelledError))
|
||||||
self.stop()
|
dl.append(d)
|
||||||
|
self.deferreds = dl
|
||||||
|
yield defer.DeferredList(dl)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def download_blob(self, blob):
|
def download_blob(self, blob):
|
||||||
try:
|
for _ in range(self.max_failures):
|
||||||
yield self._download_blob(blob)
|
writer, finished_deferred = blob.open_for_writing('mirror')
|
||||||
self.failures = 0
|
self.writers.append(writer)
|
||||||
except Exception as exception:
|
try:
|
||||||
self.failures += 1
|
downloaded = yield self._write_blob(writer, blob)
|
||||||
log.exception('Mirror failed downloading')
|
if downloaded:
|
||||||
if self.failures >= self.max_failures:
|
yield finished_deferred # yield for verification errors, so we log them
|
||||||
self.stop()
|
if blob.verified:
|
||||||
self.failures = 0
|
log.info('Mirror completed download for %s', blob.blob_hash)
|
||||||
|
break
|
||||||
|
except (IOError, Exception) as e:
|
||||||
|
if isinstance(e, DownloadCanceledError) or 'closed file' in str(e):
|
||||||
|
# some other downloader finished first or it was simply cancelled
|
||||||
|
log.info("Mirror download cancelled: %s", blob.blob_hash)
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
log.exception('Mirror failed downloading')
|
||||||
|
finally:
|
||||||
|
finished_deferred.addBoth(lambda _: None) # suppress echoed errors
|
||||||
|
if 'mirror' in blob.writers:
|
||||||
|
writer.close()
|
||||||
|
self.writers.remove(writer)
|
||||||
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _download_blob(self, blob):
|
def _write_blob(self, writer, blob):
|
||||||
if not blob.get_is_verified() and not blob.is_downloading() and 'mirror' not in blob.writers:
|
response = yield self.client.get(url_for(choice(self.servers), blob.blob_hash))
|
||||||
response = yield self.client.get(url_for(choice(self.servers), blob.blob_hash))
|
if response.code != 200:
|
||||||
if response.code != 200:
|
log.debug('Missing a blob: %s', blob.blob_hash)
|
||||||
log.debug('[Mirror] Missing a blob: %s', blob.blob_hash)
|
if blob.blob_hash in self.blob_hashes:
|
||||||
if blob.blob_hash in self.blob_hashes:
|
self.blob_hashes.remove(blob.blob_hash)
|
||||||
self.blob_hashes.remove(blob.blob_hash)
|
defer.returnValue(False)
|
||||||
defer.returnValue(blob.blob_hash)
|
|
||||||
log.debug('[Mirror] Download started: %s', blob.blob_hash)
|
log.debug('Download started: %s', blob.blob_hash)
|
||||||
blob.set_length(response.length)
|
blob.set_length(response.length)
|
||||||
writer, finished_deferred = blob.open_for_writing('mirror')
|
yield self.client.collect(response, writer.write)
|
||||||
try:
|
defer.returnValue(True)
|
||||||
yield self.client.collect(response, writer.write)
|
|
||||||
log.info('Mirror completed download for %s', blob.blob_hash)
|
|
||||||
except Exception as e:
|
|
||||||
writer.close(e)
|
|
||||||
yield finished_deferred
|
|
||||||
defer.returnValue(blob.blob_hash)
|
|
||||||
|
|
||||||
|
|
||||||
def url_for(server, blob_hash=''):
|
def url_for(server, blob_hash=''):
|
||||||
|
|
|
@ -36,6 +36,24 @@ class HTTPBlobDownloaderTest(unittest.TestCase):
|
||||||
self.assertEqual(self.blob.get_is_verified(), True)
|
self.assertEqual(self.blob.get_is_verified(), True)
|
||||||
self.assertEqual(self.blob.writers, {})
|
self.assertEqual(self.blob.writers, {})
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def test_download_invalid_content(self):
|
||||||
|
self.client.collect.side_effect = bad_collect
|
||||||
|
yield self.downloader.start()
|
||||||
|
self.assertEqual(self.blob.get_length(), self.response.length)
|
||||||
|
self.assertEqual(self.blob.get_is_verified(), False)
|
||||||
|
self.assertEqual(self.blob.writers, {})
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def test_peer_finished_first_causing_a_write_on_closed_handle(self):
|
||||||
|
self.client.collect.side_effect = lambda response, write: defer.fail(IOError('I/O operation on closed file'))
|
||||||
|
yield self.downloader.start()
|
||||||
|
self.blob_manager.get_blob.assert_called_with(self.blob_hash)
|
||||||
|
self.client.get.assert_called_with('http://{}/{}'.format('server1', self.blob_hash))
|
||||||
|
self.client.collect.assert_called()
|
||||||
|
self.assertEqual(self.blob.get_length(), self.response.length)
|
||||||
|
self.assertEqual(self.blob.writers, {})
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def test_download_transfer_failed(self):
|
def test_download_transfer_failed(self):
|
||||||
self.client.collect.side_effect = lambda response, write: defer.fail(Exception())
|
self.client.collect.side_effect = lambda response, write: defer.fail(Exception())
|
||||||
|
@ -56,7 +74,22 @@ class HTTPBlobDownloaderTest(unittest.TestCase):
|
||||||
self.assertEqual(self.blob.get_is_verified(), False)
|
self.assertEqual(self.blob.get_is_verified(), False)
|
||||||
self.assertEqual(self.blob.writers, {})
|
self.assertEqual(self.blob.writers, {})
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def test_stop(self):
|
||||||
|
self.client.collect.side_effect = lambda response, write: defer.Deferred()
|
||||||
|
self.downloader.start() # hangs if yielded, as intended, to simulate a long ongoing write while we call stop
|
||||||
|
yield self.downloader.stop()
|
||||||
|
self.blob_manager.get_blob.assert_called_with(self.blob_hash)
|
||||||
|
self.client.get.assert_called_with('http://{}/{}'.format('server1', self.blob_hash))
|
||||||
|
self.client.collect.assert_called()
|
||||||
|
self.assertEqual(self.blob.get_length(), self.response.length)
|
||||||
|
self.assertEqual(self.blob.get_is_verified(), False)
|
||||||
|
self.assertEqual(self.blob.writers, {})
|
||||||
|
|
||||||
|
|
||||||
def collect(response, write):
|
def collect(response, write):
|
||||||
write('f' * response.length)
|
write('f' * response.length)
|
||||||
defer.succeed(None)
|
|
||||||
|
|
||||||
|
def bad_collect(response, write):
|
||||||
|
write('0' * response.length)
|
||||||
|
|
Loading…
Reference in a new issue