From ad96b006f9cb7b411696c7fb6f515b3be96f5313 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sun, 10 Jun 2018 04:57:06 -0300 Subject: [PATCH 1/7] adds http download support for blobs mirroring --- lbrynet/conf.py | 1 + lbrynet/core/HTTPBlobDownloader.py | 81 +++++++++++++++++++ lbrynet/core/Session.py | 3 +- lbrynet/core/StreamDescriptor.py | 5 +- .../file_manager/EncryptedFileDownloader.py | 10 ++- lbrynet/file_manager/EncryptedFileManager.py | 3 +- .../unit/core/test_HTTPBlobDownloader.py | 62 ++++++++++++++ 7 files changed, 161 insertions(+), 4 deletions(-) create mode 100644 lbrynet/core/HTTPBlobDownloader.py create mode 100644 lbrynet/tests/unit/core/test_HTTPBlobDownloader.py diff --git a/lbrynet/conf.py b/lbrynet/conf.py index 1d0020f89..170914b8f 100644 --- a/lbrynet/conf.py +++ b/lbrynet/conf.py @@ -268,6 +268,7 @@ ADJUSTABLE_SETTINGS = { 'dht_node_port': (int, 4444), 'download_directory': (str, default_download_dir), 'download_timeout': (int, 180), + 'download_mirrors': (list, ['blobs.lbry.io']), 'is_generous_host': (bool, True), 'announce_head_blobs_only': (bool, True), 'concurrent_announcers': (int, DEFAULT_CONCURRENT_ANNOUNCERS), diff --git a/lbrynet/core/HTTPBlobDownloader.py b/lbrynet/core/HTTPBlobDownloader.py new file mode 100644 index 000000000..bbd97a7a9 --- /dev/null +++ b/lbrynet/core/HTTPBlobDownloader.py @@ -0,0 +1,81 @@ +from random import choice +import logging + +from twisted.internet import defer +import treq +from twisted.internet.task import LoopingCall + +log = logging.getLogger(__name__) + + +class HTTPBlobDownloader(object): + def __init__(self, blob_manager, blob_hashes=None, servers=None, client=None): + self.blob_manager = blob_manager + self.servers = servers or [] + self.client = client or treq + self.blob_hashes = blob_hashes or [] + self.looping_call = LoopingCall(self._download_next_blob_hash_for_file) + self.failures = 0 + self.max_failures = 3 + self.interval = 1 + + @property + def running(self): + return self.looping_call.running + + def start(self): + if not self.running and self.blob_hashes and self.servers: + return self.looping_call.start(self.interval, now=True) + + def stop(self): + if self.running: + self.blob_hashes = [] + return self.looping_call.stop() + + @defer.inlineCallbacks + def _download_next_blob_hash_for_file(self): + for blob_hash in self.blob_hashes: + blob = yield self.blob_manager.get_blob(blob_hash) + if not blob.get_is_verified(): + self.download_blob(blob) + defer.returnValue(None) + self.stop() + + def download_blob(self, blob): + d = self._download_blob(blob) + d.addCallback(self._on_completed_blob) + d.addErrback(self._on_failed) + + def _on_completed_blob(self, blob_hash): + if blob_hash: + log.debug('Mirror completed download for %s', blob_hash) + self.failures = 0 + + def _on_failed(self, err): + self.failures += 1 + log.error('Mirror failed downloading: %s', err) + if self.failures >= self.max_failures: + self.stop() + self.failures = 0 + + @defer.inlineCallbacks + def _download_blob(self, blob): + if not blob.get_is_verified() and not blob.is_downloading() and 'mirror' not in blob.writers: + response = yield self.client.get(url_for(choice(self.servers), blob.blob_hash)) + if response.code != 200: + log.error('[Mirror] Missing a blob: %s', blob.blob_hash) + self.blob_hashes.remove(blob.blob_hash) + defer.returnValue(blob.blob_hash) + log.debug('[Mirror] Download started: %s', blob.blob_hash) + blob.set_length(response.length) + writer, finished_deferred = blob.open_for_writing('mirror') + try: + yield self.client.collect(response, writer.write) + except Exception, e: + writer.close(e) + yield finished_deferred + defer.returnValue(blob.blob_hash) + + +def url_for(server, blob_hash=''): + return 'http://{}/{}'.format(server, blob_hash) diff --git a/lbrynet/core/Session.py b/lbrynet/core/Session.py index 83519ae66..d3a7c758d 100644 --- a/lbrynet/core/Session.py +++ b/lbrynet/core/Session.py @@ -32,7 +32,7 @@ class Session(object): def __init__(self, blob_data_payment_rate, db_dir=None, node_id=None, dht_node_port=None, known_dht_nodes=None, peer_finder=None, hash_announcer=None, blob_dir=None, blob_manager=None, peer_port=None, rate_limiter=None, wallet=None, external_ip=None, storage=None, - dht_node=None, peer_manager=None): + dht_node=None, peer_manager=None, download_mirrors=None): """@param blob_data_payment_rate: The default payment rate for blob data @param db_dir: The directory in which levelDB files should be stored @@ -104,6 +104,7 @@ class Session(object): self.base_payment_rate_manager = BasePaymentRateManager(blob_data_payment_rate) self.payment_rate_manager = OnlyFreePaymentsManager() self.storage = storage or SQLiteStorage(self.db_dir) + self.download_mirrors = download_mirrors def setup(self): """Create the blob directory and database if necessary, start all desired services""" diff --git a/lbrynet/core/StreamDescriptor.py b/lbrynet/core/StreamDescriptor.py index 4a76b5678..7a4303308 100644 --- a/lbrynet/core/StreamDescriptor.py +++ b/lbrynet/core/StreamDescriptor.py @@ -7,7 +7,7 @@ from twisted.internet import threads, defer from lbrynet.core.cryptoutils import get_lbry_hash_obj from lbrynet.core.client.StandaloneBlobDownloader import StandaloneBlobDownloader from lbrynet.core.Error import UnknownStreamTypeError, InvalidStreamDescriptorError - +from lbrynet.core.HTTPBlobDownloader import HTTPBlobDownloader log = logging.getLogger(__name__) @@ -445,7 +445,10 @@ def download_sd_blob(session, blob_hash, payment_rate_manager, timeout=None): payment_rate_manager, session.wallet, timeout) + mirror = HTTPBlobDownloader(session.blob_manager, [blob_hash], session.download_mirrors) + mirror.start() sd_blob = yield downloader.download() + mirror.stop() sd_reader = BlobStreamDescriptorReader(sd_blob) sd_info = yield sd_reader.get_info() try: diff --git a/lbrynet/file_manager/EncryptedFileDownloader.py b/lbrynet/file_manager/EncryptedFileDownloader.py index 25abd3e18..5378a541f 100644 --- a/lbrynet/file_manager/EncryptedFileDownloader.py +++ b/lbrynet/file_manager/EncryptedFileDownloader.py @@ -8,6 +8,7 @@ from zope.interface import implements from twisted.internet import defer from lbrynet.core.client.StreamProgressManager import FullStreamProgressManager +from lbrynet.core.HTTPBlobDownloader import HTTPBlobDownloader from lbrynet.core.utils import short_hash from lbrynet.lbry_file.client.EncryptedFileDownloader import EncryptedFileSaver from lbrynet.lbry_file.client.EncryptedFileDownloader import EncryptedFileDownloader @@ -37,7 +38,7 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver): def __init__(self, rowid, stream_hash, peer_finder, rate_limiter, blob_manager, storage, lbry_file_manager, payment_rate_manager, wallet, download_directory, file_name, stream_name, sd_hash, key, - suggested_file_name): + suggested_file_name, download_mirrors=None): EncryptedFileSaver.__init__( self, stream_hash, peer_finder, rate_limiter, blob_manager, storage, payment_rate_manager, wallet, download_directory, key, stream_name, file_name @@ -55,6 +56,7 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver): self.channel_claim_id = None self.channel_name = None self.metadata = None + self.mirror = HTTPBlobDownloader(self.blob_manager, servers=download_mirrors) if download_mirrors else None def set_claim_info(self, claim_info): self.claim_id = claim_info['claim_id'] @@ -94,6 +96,8 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver): @defer.inlineCallbacks def stop(self, err=None, change_status=True): log.debug('Stopping download for stream %s', short_hash(self.stream_hash)) + if self.mirror: + self.mirror.stop() # EncryptedFileSaver deletes metadata when it's stopped. We don't want that here. yield EncryptedFileDownloader.stop(self, err=err) if change_status is True: @@ -123,6 +127,10 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver): yield EncryptedFileSaver._start(self) status = yield self._save_status() log_status(self.sd_hash, status) + if self.mirror: + blobs = yield self.storage.get_blobs_for_stream(self.stream_hash) + self.mirror.blob_hashes = [b.blob_hash for b in blobs if b.blob_hash is not None] + self.mirror.start() defer.returnValue(status) def _get_finished_deferred_callback_value(self): diff --git a/lbrynet/file_manager/EncryptedFileManager.py b/lbrynet/file_manager/EncryptedFileManager.py index afcb34def..abff82fef 100644 --- a/lbrynet/file_manager/EncryptedFileManager.py +++ b/lbrynet/file_manager/EncryptedFileManager.py @@ -92,7 +92,8 @@ class EncryptedFileManager(object): stream_name=stream_name, sd_hash=sd_hash, key=key, - suggested_file_name=suggested_file_name + suggested_file_name=suggested_file_name, + download_mirrors=self.session.download_mirrors ) def _start_lbry_file(self, file_info, payment_rate_manager, claim_info): diff --git a/lbrynet/tests/unit/core/test_HTTPBlobDownloader.py b/lbrynet/tests/unit/core/test_HTTPBlobDownloader.py new file mode 100644 index 000000000..6020dbea0 --- /dev/null +++ b/lbrynet/tests/unit/core/test_HTTPBlobDownloader.py @@ -0,0 +1,62 @@ +from mock import MagicMock + +from twisted.trial import unittest +from twisted.internet import defer + +from lbrynet.blob import BlobFile +from lbrynet.core.HTTPBlobDownloader import HTTPBlobDownloader +from lbrynet.tests.util import mk_db_and_blob_dir, rm_db_and_blob_dir + + +class HTTPBlobDownloaderTest(unittest.TestCase): + def setUp(self): + self.db_dir, self.blob_dir = mk_db_and_blob_dir() + self.blob_manager = MagicMock() + self.client = MagicMock() + self.blob_hash = ('d17272b17a1ad61c4316ac13a651c2b0952063214a81333e' + '838364b01b2f07edbd165bb7ec60d2fb2f337a2c02923852') + self.blob = BlobFile(self.blob_dir, self.blob_hash) + self.blob_manager.get_blob.side_effect = lambda _: defer.succeed(self.blob) + self.response = MagicMock(code=200, length=400) + self.client.get.side_effect = lambda uri: defer.succeed(self.response) + self.downloader = HTTPBlobDownloader(self.blob_manager, [self.blob_hash], ['server1'], self.client) + self.downloader.interval = 0 + + def tearDown(self): + rm_db_and_blob_dir(self.db_dir, self.blob_dir) + + @defer.inlineCallbacks + def test_download_successful(self): + self.client.collect.side_effect = collect + yield self.downloader.start() + self.blob_manager.get_blob.assert_called_with(self.blob_hash) + self.client.get.assert_called_with('http://{}/{}'.format('server1', self.blob_hash)) + self.client.collect.assert_called() + self.assertEqual(self.blob.get_length(), self.response.length) + self.assertEqual(self.blob.get_is_verified(), True) + self.assertEqual(self.blob.writers, {}) + + @defer.inlineCallbacks + def test_download_transfer_failed(self): + self.client.collect.side_effect = lambda response, write: defer.fail(Exception()) + yield self.downloader.start() + self.assertEqual(len(self.client.collect.mock_calls), self.downloader.max_failures) + self.blob_manager.get_blob.assert_called_with(self.blob_hash) + self.assertEqual(self.blob.get_length(), self.response.length) + self.assertEqual(self.blob.get_is_verified(), False) + self.assertEqual(self.blob.writers, {}) + + @defer.inlineCallbacks + def test_blob_not_found(self): + self.response.code = 404 + yield self.downloader.start() + self.blob_manager.get_blob.assert_called_with(self.blob_hash) + self.client.get.assert_called_with('http://{}/{}'.format('server1', self.blob_hash)) + self.client.collect.assert_not_called() + self.assertEqual(self.blob.get_is_verified(), False) + self.assertEqual(self.blob.writers, {}) + + +def collect(response, write): + write('f' * response.length) + defer.succeed(None) From ec140d5d8a66967ea52e0a242b39bfa040a9eadb Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 18 Jun 2018 03:53:17 -0300 Subject: [PATCH 2/7] changes from code review --- lbrynet/core/HTTPBlobDownloader.py | 30 ++++++++++++++---------------- 1 file changed, 14 insertions(+), 16 deletions(-) diff --git a/lbrynet/core/HTTPBlobDownloader.py b/lbrynet/core/HTTPBlobDownloader.py index bbd97a7a9..a67c255fe 100644 --- a/lbrynet/core/HTTPBlobDownloader.py +++ b/lbrynet/core/HTTPBlobDownloader.py @@ -26,6 +26,7 @@ class HTTPBlobDownloader(object): def start(self): if not self.running and self.blob_hashes and self.servers: return self.looping_call.start(self.interval, now=True) + defer.succeed(None) def stop(self): if self.running: @@ -36,27 +37,24 @@ class HTTPBlobDownloader(object): def _download_next_blob_hash_for_file(self): for blob_hash in self.blob_hashes: blob = yield self.blob_manager.get_blob(blob_hash) - if not blob.get_is_verified(): + if not blob.verified: self.download_blob(blob) - defer.returnValue(None) + return self.stop() + @defer.inlineCallbacks def download_blob(self, blob): - d = self._download_blob(blob) - d.addCallback(self._on_completed_blob) - d.addErrback(self._on_failed) - - def _on_completed_blob(self, blob_hash): - if blob_hash: - log.debug('Mirror completed download for %s', blob_hash) - self.failures = 0 - - def _on_failed(self, err): - self.failures += 1 - log.error('Mirror failed downloading: %s', err) - if self.failures >= self.max_failures: - self.stop() + try: + blob_hash = yield self._download_blob(blob) + if blob_hash: + log.debug('Mirror completed download for %s', blob_hash) self.failures = 0 + except Exception as exception: + self.failures += 1 + log.error('Mirror failed downloading: %s', exception) + if self.failures >= self.max_failures: + self.stop() + self.failures = 0 @defer.inlineCallbacks def _download_blob(self, blob): From 66982c86a996b3fe8338a5c2ad077048fc5c362d Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 18 Jun 2018 03:54:14 -0300 Subject: [PATCH 3/7] adds changelog --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2431e2067..fa12d6cb3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -36,6 +36,8 @@ at anytime. * script to generate docs/api.json file (https://github.com/lbryio/lbry.tech/issues/42) * additional information to the balance error message when editing a claim (https://github.com/lbryio/lbry/pull/1309) * `address` and `port` arguments to `peer_ping` (https://github.com/lbryio/lbry/issues/1313) + * ability to download from HTTP mirrors by setting `download_mirrors` + * ### Removed * most of the internal attributes from `Daemon` From f510c2a43381cb3fb7542dc27dac04ee1fb219e8 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 16 Jul 2018 16:48:46 -0300 Subject: [PATCH 4/7] improve logging from review --- lbrynet/core/HTTPBlobDownloader.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/lbrynet/core/HTTPBlobDownloader.py b/lbrynet/core/HTTPBlobDownloader.py index a67c255fe..f25ef1955 100644 --- a/lbrynet/core/HTTPBlobDownloader.py +++ b/lbrynet/core/HTTPBlobDownloader.py @@ -45,9 +45,7 @@ class HTTPBlobDownloader(object): @defer.inlineCallbacks def download_blob(self, blob): try: - blob_hash = yield self._download_blob(blob) - if blob_hash: - log.debug('Mirror completed download for %s', blob_hash) + yield self._download_blob(blob) self.failures = 0 except Exception as exception: self.failures += 1 @@ -61,15 +59,17 @@ class HTTPBlobDownloader(object): if not blob.get_is_verified() and not blob.is_downloading() and 'mirror' not in blob.writers: response = yield self.client.get(url_for(choice(self.servers), blob.blob_hash)) if response.code != 200: - log.error('[Mirror] Missing a blob: %s', blob.blob_hash) - self.blob_hashes.remove(blob.blob_hash) + log.debug('[Mirror] Missing a blob: %s', blob.blob_hash) + if blob.blob_hash in self.blob_hashes: + self.blob_hashes.remove(blob.blob_hash) defer.returnValue(blob.blob_hash) log.debug('[Mirror] Download started: %s', blob.blob_hash) blob.set_length(response.length) writer, finished_deferred = blob.open_for_writing('mirror') try: yield self.client.collect(response, writer.write) - except Exception, e: + log.info('Mirror completed download for %s', blob.blob_hash) + except Exception as e: writer.close(e) yield finished_deferred defer.returnValue(blob.blob_hash) From 88c2051605df1ad5ce2d4613c524aaf808075b57 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Wed, 25 Jul 2018 12:23:15 -0300 Subject: [PATCH 5/7] set download_mirror conf from components change --- lbrynet/daemon/Components.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lbrynet/daemon/Components.py b/lbrynet/daemon/Components.py index acc216567..620ad9a20 100644 --- a/lbrynet/daemon/Components.py +++ b/lbrynet/daemon/Components.py @@ -211,7 +211,8 @@ class SessionComponent(Component): peer_port=GCS('peer_port'), wallet=self.component_manager.get_component(WALLET_COMPONENT), external_ip=CS.get_external_ip(), - storage=self.component_manager.get_component(DATABASE_COMPONENT) + storage=self.component_manager.get_component(DATABASE_COMPONENT), + download_mirrors=GCS('download_mirrors') ) yield self.session.setup() From ab27203100482f0959b57d04f413f7f2e84ea200 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Wed, 25 Jul 2018 13:01:13 -0300 Subject: [PATCH 6/7] improve exception logging and add a docstring on the interaction between downloaders --- lbrynet/core/HTTPBlobDownloader.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/lbrynet/core/HTTPBlobDownloader.py b/lbrynet/core/HTTPBlobDownloader.py index f25ef1955..b01621aaa 100644 --- a/lbrynet/core/HTTPBlobDownloader.py +++ b/lbrynet/core/HTTPBlobDownloader.py @@ -9,6 +9,13 @@ log = logging.getLogger(__name__) class HTTPBlobDownloader(object): + ''' + A downloader that is able to get blobs from HTTP mirrors. + Note that when a blob gets downloaded from a mirror or from a peer, BlobManager will mark it as completed + and cause any other type of downloader to progress to the next missing blob. Also, BlobFile is naturally able + to cancel other writers when a writer finishes first. That's why there is no call to cancel/resume/stop between + different types of downloaders. + ''' def __init__(self, blob_manager, blob_hashes=None, servers=None, client=None): self.blob_manager = blob_manager self.servers = servers or [] @@ -49,7 +56,7 @@ class HTTPBlobDownloader(object): self.failures = 0 except Exception as exception: self.failures += 1 - log.error('Mirror failed downloading: %s', exception) + log.exception('Mirror failed downloading') if self.failures >= self.max_failures: self.stop() self.failures = 0 From 5163baf9c15e7438b20ff39ac67ea1853fc11c30 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Wed, 25 Jul 2018 13:09:25 -0300 Subject: [PATCH 7/7] improve changelog on changes section --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index fa12d6cb3..06e36c801 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,7 @@ at anytime. * `status` to no longer return a base58 encoded `lbry_id`, instead return this as the hex encoded `node_id` in a new `dht_node_status` field. * `startup_status` field in the response to `status` to be a dict of component names to status booleans * moved wallet, upnp and dht startup code from `Session` to `Components` + * attempt blob downloads from http mirror sources (by default) concurrently to p2p sources ### Added * `skipped_components` list to the response from `status`