From ad96b006f9cb7b411696c7fb6f515b3be96f5313 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sun, 10 Jun 2018 04:57:06 -0300 Subject: [PATCH] adds http download support for blobs mirroring --- lbrynet/conf.py | 1 + lbrynet/core/HTTPBlobDownloader.py | 81 +++++++++++++++++++ lbrynet/core/Session.py | 3 +- lbrynet/core/StreamDescriptor.py | 5 +- .../file_manager/EncryptedFileDownloader.py | 10 ++- lbrynet/file_manager/EncryptedFileManager.py | 3 +- .../unit/core/test_HTTPBlobDownloader.py | 62 ++++++++++++++ 7 files changed, 161 insertions(+), 4 deletions(-) create mode 100644 lbrynet/core/HTTPBlobDownloader.py create mode 100644 lbrynet/tests/unit/core/test_HTTPBlobDownloader.py diff --git a/lbrynet/conf.py b/lbrynet/conf.py index 1d0020f89..170914b8f 100644 --- a/lbrynet/conf.py +++ b/lbrynet/conf.py @@ -268,6 +268,7 @@ ADJUSTABLE_SETTINGS = { 'dht_node_port': (int, 4444), 'download_directory': (str, default_download_dir), 'download_timeout': (int, 180), + 'download_mirrors': (list, ['blobs.lbry.io']), 'is_generous_host': (bool, True), 'announce_head_blobs_only': (bool, True), 'concurrent_announcers': (int, DEFAULT_CONCURRENT_ANNOUNCERS), diff --git a/lbrynet/core/HTTPBlobDownloader.py b/lbrynet/core/HTTPBlobDownloader.py new file mode 100644 index 000000000..bbd97a7a9 --- /dev/null +++ b/lbrynet/core/HTTPBlobDownloader.py @@ -0,0 +1,81 @@ +from random import choice +import logging + +from twisted.internet import defer +import treq +from twisted.internet.task import LoopingCall + +log = logging.getLogger(__name__) + + +class HTTPBlobDownloader(object): + def __init__(self, blob_manager, blob_hashes=None, servers=None, client=None): + self.blob_manager = blob_manager + self.servers = servers or [] + self.client = client or treq + self.blob_hashes = blob_hashes or [] + self.looping_call = LoopingCall(self._download_next_blob_hash_for_file) + self.failures = 0 + self.max_failures = 3 + self.interval = 1 + + @property + def running(self): + return self.looping_call.running + + def start(self): + if not self.running and self.blob_hashes and self.servers: + return self.looping_call.start(self.interval, now=True) + + def stop(self): + if self.running: + self.blob_hashes = [] + return self.looping_call.stop() + + @defer.inlineCallbacks + def _download_next_blob_hash_for_file(self): + for blob_hash in self.blob_hashes: + blob = yield self.blob_manager.get_blob(blob_hash) + if not blob.get_is_verified(): + self.download_blob(blob) + defer.returnValue(None) + self.stop() + + def download_blob(self, blob): + d = self._download_blob(blob) + d.addCallback(self._on_completed_blob) + d.addErrback(self._on_failed) + + def _on_completed_blob(self, blob_hash): + if blob_hash: + log.debug('Mirror completed download for %s', blob_hash) + self.failures = 0 + + def _on_failed(self, err): + self.failures += 1 + log.error('Mirror failed downloading: %s', err) + if self.failures >= self.max_failures: + self.stop() + self.failures = 0 + + @defer.inlineCallbacks + def _download_blob(self, blob): + if not blob.get_is_verified() and not blob.is_downloading() and 'mirror' not in blob.writers: + response = yield self.client.get(url_for(choice(self.servers), blob.blob_hash)) + if response.code != 200: + log.error('[Mirror] Missing a blob: %s', blob.blob_hash) + self.blob_hashes.remove(blob.blob_hash) + defer.returnValue(blob.blob_hash) + log.debug('[Mirror] Download started: %s', blob.blob_hash) + blob.set_length(response.length) + writer, finished_deferred = blob.open_for_writing('mirror') + try: + yield self.client.collect(response, writer.write) + except Exception, e: + writer.close(e) + yield finished_deferred + defer.returnValue(blob.blob_hash) + + +def url_for(server, blob_hash=''): + return 'http://{}/{}'.format(server, blob_hash) diff --git a/lbrynet/core/Session.py b/lbrynet/core/Session.py index 83519ae66..d3a7c758d 100644 --- a/lbrynet/core/Session.py +++ b/lbrynet/core/Session.py @@ -32,7 +32,7 @@ class Session(object): def __init__(self, blob_data_payment_rate, db_dir=None, node_id=None, dht_node_port=None, known_dht_nodes=None, peer_finder=None, hash_announcer=None, blob_dir=None, blob_manager=None, peer_port=None, rate_limiter=None, wallet=None, external_ip=None, storage=None, - dht_node=None, peer_manager=None): + dht_node=None, peer_manager=None, download_mirrors=None): """@param blob_data_payment_rate: The default payment rate for blob data @param db_dir: The directory in which levelDB files should be stored @@ -104,6 +104,7 @@ class Session(object): self.base_payment_rate_manager = BasePaymentRateManager(blob_data_payment_rate) self.payment_rate_manager = OnlyFreePaymentsManager() self.storage = storage or SQLiteStorage(self.db_dir) + self.download_mirrors = download_mirrors def setup(self): """Create the blob directory and database if necessary, start all desired services""" diff --git a/lbrynet/core/StreamDescriptor.py b/lbrynet/core/StreamDescriptor.py index 4a76b5678..7a4303308 100644 --- a/lbrynet/core/StreamDescriptor.py +++ b/lbrynet/core/StreamDescriptor.py @@ -7,7 +7,7 @@ from twisted.internet import threads, defer from lbrynet.core.cryptoutils import get_lbry_hash_obj from lbrynet.core.client.StandaloneBlobDownloader import StandaloneBlobDownloader from lbrynet.core.Error import UnknownStreamTypeError, InvalidStreamDescriptorError - +from lbrynet.core.HTTPBlobDownloader import HTTPBlobDownloader log = logging.getLogger(__name__) @@ -445,7 +445,10 @@ def download_sd_blob(session, blob_hash, payment_rate_manager, timeout=None): payment_rate_manager, session.wallet, timeout) + mirror = HTTPBlobDownloader(session.blob_manager, [blob_hash], session.download_mirrors) + mirror.start() sd_blob = yield downloader.download() + mirror.stop() sd_reader = BlobStreamDescriptorReader(sd_blob) sd_info = yield sd_reader.get_info() try: diff --git a/lbrynet/file_manager/EncryptedFileDownloader.py b/lbrynet/file_manager/EncryptedFileDownloader.py index 25abd3e18..5378a541f 100644 --- a/lbrynet/file_manager/EncryptedFileDownloader.py +++ b/lbrynet/file_manager/EncryptedFileDownloader.py @@ -8,6 +8,7 @@ from zope.interface import implements from twisted.internet import defer from lbrynet.core.client.StreamProgressManager import FullStreamProgressManager +from lbrynet.core.HTTPBlobDownloader import HTTPBlobDownloader from lbrynet.core.utils import short_hash from lbrynet.lbry_file.client.EncryptedFileDownloader import EncryptedFileSaver from lbrynet.lbry_file.client.EncryptedFileDownloader import EncryptedFileDownloader @@ -37,7 +38,7 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver): def __init__(self, rowid, stream_hash, peer_finder, rate_limiter, blob_manager, storage, lbry_file_manager, payment_rate_manager, wallet, download_directory, file_name, stream_name, sd_hash, key, - suggested_file_name): + suggested_file_name, download_mirrors=None): EncryptedFileSaver.__init__( self, stream_hash, peer_finder, rate_limiter, blob_manager, storage, payment_rate_manager, wallet, download_directory, key, stream_name, file_name @@ -55,6 +56,7 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver): self.channel_claim_id = None self.channel_name = None self.metadata = None + self.mirror = HTTPBlobDownloader(self.blob_manager, servers=download_mirrors) if download_mirrors else None def set_claim_info(self, claim_info): self.claim_id = claim_info['claim_id'] @@ -94,6 +96,8 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver): @defer.inlineCallbacks def stop(self, err=None, change_status=True): log.debug('Stopping download for stream %s', short_hash(self.stream_hash)) + if self.mirror: + self.mirror.stop() # EncryptedFileSaver deletes metadata when it's stopped. We don't want that here. yield EncryptedFileDownloader.stop(self, err=err) if change_status is True: @@ -123,6 +127,10 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver): yield EncryptedFileSaver._start(self) status = yield self._save_status() log_status(self.sd_hash, status) + if self.mirror: + blobs = yield self.storage.get_blobs_for_stream(self.stream_hash) + self.mirror.blob_hashes = [b.blob_hash for b in blobs if b.blob_hash is not None] + self.mirror.start() defer.returnValue(status) def _get_finished_deferred_callback_value(self): diff --git a/lbrynet/file_manager/EncryptedFileManager.py b/lbrynet/file_manager/EncryptedFileManager.py index afcb34def..abff82fef 100644 --- a/lbrynet/file_manager/EncryptedFileManager.py +++ b/lbrynet/file_manager/EncryptedFileManager.py @@ -92,7 +92,8 @@ class EncryptedFileManager(object): stream_name=stream_name, sd_hash=sd_hash, key=key, - suggested_file_name=suggested_file_name + suggested_file_name=suggested_file_name, + download_mirrors=self.session.download_mirrors ) def _start_lbry_file(self, file_info, payment_rate_manager, claim_info): diff --git a/lbrynet/tests/unit/core/test_HTTPBlobDownloader.py b/lbrynet/tests/unit/core/test_HTTPBlobDownloader.py new file mode 100644 index 000000000..6020dbea0 --- /dev/null +++ b/lbrynet/tests/unit/core/test_HTTPBlobDownloader.py @@ -0,0 +1,62 @@ +from mock import MagicMock + +from twisted.trial import unittest +from twisted.internet import defer + +from lbrynet.blob import BlobFile +from lbrynet.core.HTTPBlobDownloader import HTTPBlobDownloader +from lbrynet.tests.util import mk_db_and_blob_dir, rm_db_and_blob_dir + + +class HTTPBlobDownloaderTest(unittest.TestCase): + def setUp(self): + self.db_dir, self.blob_dir = mk_db_and_blob_dir() + self.blob_manager = MagicMock() + self.client = MagicMock() + self.blob_hash = ('d17272b17a1ad61c4316ac13a651c2b0952063214a81333e' + '838364b01b2f07edbd165bb7ec60d2fb2f337a2c02923852') + self.blob = BlobFile(self.blob_dir, self.blob_hash) + self.blob_manager.get_blob.side_effect = lambda _: defer.succeed(self.blob) + self.response = MagicMock(code=200, length=400) + self.client.get.side_effect = lambda uri: defer.succeed(self.response) + self.downloader = HTTPBlobDownloader(self.blob_manager, [self.blob_hash], ['server1'], self.client) + self.downloader.interval = 0 + + def tearDown(self): + rm_db_and_blob_dir(self.db_dir, self.blob_dir) + + @defer.inlineCallbacks + def test_download_successful(self): + self.client.collect.side_effect = collect + yield self.downloader.start() + self.blob_manager.get_blob.assert_called_with(self.blob_hash) + self.client.get.assert_called_with('http://{}/{}'.format('server1', self.blob_hash)) + self.client.collect.assert_called() + self.assertEqual(self.blob.get_length(), self.response.length) + self.assertEqual(self.blob.get_is_verified(), True) + self.assertEqual(self.blob.writers, {}) + + @defer.inlineCallbacks + def test_download_transfer_failed(self): + self.client.collect.side_effect = lambda response, write: defer.fail(Exception()) + yield self.downloader.start() + self.assertEqual(len(self.client.collect.mock_calls), self.downloader.max_failures) + self.blob_manager.get_blob.assert_called_with(self.blob_hash) + self.assertEqual(self.blob.get_length(), self.response.length) + self.assertEqual(self.blob.get_is_verified(), False) + self.assertEqual(self.blob.writers, {}) + + @defer.inlineCallbacks + def test_blob_not_found(self): + self.response.code = 404 + yield self.downloader.start() + self.blob_manager.get_blob.assert_called_with(self.blob_hash) + self.client.get.assert_called_with('http://{}/{}'.format('server1', self.blob_hash)) + self.client.collect.assert_not_called() + self.assertEqual(self.blob.get_is_verified(), False) + self.assertEqual(self.blob.writers, {}) + + +def collect(response, write): + write('f' * response.length) + defer.succeed(None)