adds http download support for blobs mirroring
This commit is contained in:
parent
40c5f6e3ab
commit
ad96b006f9
7 changed files with 161 additions and 4 deletions
|
@ -268,6 +268,7 @@ ADJUSTABLE_SETTINGS = {
|
|||
'dht_node_port': (int, 4444),
|
||||
'download_directory': (str, default_download_dir),
|
||||
'download_timeout': (int, 180),
|
||||
'download_mirrors': (list, ['blobs.lbry.io']),
|
||||
'is_generous_host': (bool, True),
|
||||
'announce_head_blobs_only': (bool, True),
|
||||
'concurrent_announcers': (int, DEFAULT_CONCURRENT_ANNOUNCERS),
|
||||
|
|
81
lbrynet/core/HTTPBlobDownloader.py
Normal file
81
lbrynet/core/HTTPBlobDownloader.py
Normal file
|
@ -0,0 +1,81 @@
|
|||
from random import choice
|
||||
import logging
|
||||
|
||||
from twisted.internet import defer
|
||||
import treq
|
||||
from twisted.internet.task import LoopingCall
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class HTTPBlobDownloader(object):
|
||||
def __init__(self, blob_manager, blob_hashes=None, servers=None, client=None):
|
||||
self.blob_manager = blob_manager
|
||||
self.servers = servers or []
|
||||
self.client = client or treq
|
||||
self.blob_hashes = blob_hashes or []
|
||||
self.looping_call = LoopingCall(self._download_next_blob_hash_for_file)
|
||||
self.failures = 0
|
||||
self.max_failures = 3
|
||||
self.interval = 1
|
||||
|
||||
@property
|
||||
def running(self):
|
||||
return self.looping_call.running
|
||||
|
||||
def start(self):
|
||||
if not self.running and self.blob_hashes and self.servers:
|
||||
return self.looping_call.start(self.interval, now=True)
|
||||
|
||||
def stop(self):
|
||||
if self.running:
|
||||
self.blob_hashes = []
|
||||
return self.looping_call.stop()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _download_next_blob_hash_for_file(self):
|
||||
for blob_hash in self.blob_hashes:
|
||||
blob = yield self.blob_manager.get_blob(blob_hash)
|
||||
if not blob.get_is_verified():
|
||||
self.download_blob(blob)
|
||||
defer.returnValue(None)
|
||||
self.stop()
|
||||
|
||||
def download_blob(self, blob):
|
||||
d = self._download_blob(blob)
|
||||
d.addCallback(self._on_completed_blob)
|
||||
d.addErrback(self._on_failed)
|
||||
|
||||
def _on_completed_blob(self, blob_hash):
|
||||
if blob_hash:
|
||||
log.debug('Mirror completed download for %s', blob_hash)
|
||||
self.failures = 0
|
||||
|
||||
def _on_failed(self, err):
|
||||
self.failures += 1
|
||||
log.error('Mirror failed downloading: %s', err)
|
||||
if self.failures >= self.max_failures:
|
||||
self.stop()
|
||||
self.failures = 0
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _download_blob(self, blob):
|
||||
if not blob.get_is_verified() and not blob.is_downloading() and 'mirror' not in blob.writers:
|
||||
response = yield self.client.get(url_for(choice(self.servers), blob.blob_hash))
|
||||
if response.code != 200:
|
||||
log.error('[Mirror] Missing a blob: %s', blob.blob_hash)
|
||||
self.blob_hashes.remove(blob.blob_hash)
|
||||
defer.returnValue(blob.blob_hash)
|
||||
log.debug('[Mirror] Download started: %s', blob.blob_hash)
|
||||
blob.set_length(response.length)
|
||||
writer, finished_deferred = blob.open_for_writing('mirror')
|
||||
try:
|
||||
yield self.client.collect(response, writer.write)
|
||||
except Exception, e:
|
||||
writer.close(e)
|
||||
yield finished_deferred
|
||||
defer.returnValue(blob.blob_hash)
|
||||
|
||||
|
||||
def url_for(server, blob_hash=''):
|
||||
return 'http://{}/{}'.format(server, blob_hash)
|
|
@ -32,7 +32,7 @@ class Session(object):
|
|||
def __init__(self, blob_data_payment_rate, db_dir=None, node_id=None, dht_node_port=None,
|
||||
known_dht_nodes=None, peer_finder=None, hash_announcer=None, blob_dir=None, blob_manager=None,
|
||||
peer_port=None, rate_limiter=None, wallet=None, external_ip=None, storage=None,
|
||||
dht_node=None, peer_manager=None):
|
||||
dht_node=None, peer_manager=None, download_mirrors=None):
|
||||
"""@param blob_data_payment_rate: The default payment rate for blob data
|
||||
|
||||
@param db_dir: The directory in which levelDB files should be stored
|
||||
|
@ -104,6 +104,7 @@ class Session(object):
|
|||
self.base_payment_rate_manager = BasePaymentRateManager(blob_data_payment_rate)
|
||||
self.payment_rate_manager = OnlyFreePaymentsManager()
|
||||
self.storage = storage or SQLiteStorage(self.db_dir)
|
||||
self.download_mirrors = download_mirrors
|
||||
|
||||
def setup(self):
|
||||
"""Create the blob directory and database if necessary, start all desired services"""
|
||||
|
|
|
@ -7,7 +7,7 @@ from twisted.internet import threads, defer
|
|||
from lbrynet.core.cryptoutils import get_lbry_hash_obj
|
||||
from lbrynet.core.client.StandaloneBlobDownloader import StandaloneBlobDownloader
|
||||
from lbrynet.core.Error import UnknownStreamTypeError, InvalidStreamDescriptorError
|
||||
|
||||
from lbrynet.core.HTTPBlobDownloader import HTTPBlobDownloader
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
@ -445,7 +445,10 @@ def download_sd_blob(session, blob_hash, payment_rate_manager, timeout=None):
|
|||
payment_rate_manager,
|
||||
session.wallet,
|
||||
timeout)
|
||||
mirror = HTTPBlobDownloader(session.blob_manager, [blob_hash], session.download_mirrors)
|
||||
mirror.start()
|
||||
sd_blob = yield downloader.download()
|
||||
mirror.stop()
|
||||
sd_reader = BlobStreamDescriptorReader(sd_blob)
|
||||
sd_info = yield sd_reader.get_info()
|
||||
try:
|
||||
|
|
|
@ -8,6 +8,7 @@ from zope.interface import implements
|
|||
from twisted.internet import defer
|
||||
|
||||
from lbrynet.core.client.StreamProgressManager import FullStreamProgressManager
|
||||
from lbrynet.core.HTTPBlobDownloader import HTTPBlobDownloader
|
||||
from lbrynet.core.utils import short_hash
|
||||
from lbrynet.lbry_file.client.EncryptedFileDownloader import EncryptedFileSaver
|
||||
from lbrynet.lbry_file.client.EncryptedFileDownloader import EncryptedFileDownloader
|
||||
|
@ -37,7 +38,7 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver):
|
|||
|
||||
def __init__(self, rowid, stream_hash, peer_finder, rate_limiter, blob_manager, storage, lbry_file_manager,
|
||||
payment_rate_manager, wallet, download_directory, file_name, stream_name, sd_hash, key,
|
||||
suggested_file_name):
|
||||
suggested_file_name, download_mirrors=None):
|
||||
EncryptedFileSaver.__init__(
|
||||
self, stream_hash, peer_finder, rate_limiter, blob_manager, storage, payment_rate_manager, wallet,
|
||||
download_directory, key, stream_name, file_name
|
||||
|
@ -55,6 +56,7 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver):
|
|||
self.channel_claim_id = None
|
||||
self.channel_name = None
|
||||
self.metadata = None
|
||||
self.mirror = HTTPBlobDownloader(self.blob_manager, servers=download_mirrors) if download_mirrors else None
|
||||
|
||||
def set_claim_info(self, claim_info):
|
||||
self.claim_id = claim_info['claim_id']
|
||||
|
@ -94,6 +96,8 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver):
|
|||
@defer.inlineCallbacks
|
||||
def stop(self, err=None, change_status=True):
|
||||
log.debug('Stopping download for stream %s', short_hash(self.stream_hash))
|
||||
if self.mirror:
|
||||
self.mirror.stop()
|
||||
# EncryptedFileSaver deletes metadata when it's stopped. We don't want that here.
|
||||
yield EncryptedFileDownloader.stop(self, err=err)
|
||||
if change_status is True:
|
||||
|
@ -123,6 +127,10 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver):
|
|||
yield EncryptedFileSaver._start(self)
|
||||
status = yield self._save_status()
|
||||
log_status(self.sd_hash, status)
|
||||
if self.mirror:
|
||||
blobs = yield self.storage.get_blobs_for_stream(self.stream_hash)
|
||||
self.mirror.blob_hashes = [b.blob_hash for b in blobs if b.blob_hash is not None]
|
||||
self.mirror.start()
|
||||
defer.returnValue(status)
|
||||
|
||||
def _get_finished_deferred_callback_value(self):
|
||||
|
|
|
@ -92,7 +92,8 @@ class EncryptedFileManager(object):
|
|||
stream_name=stream_name,
|
||||
sd_hash=sd_hash,
|
||||
key=key,
|
||||
suggested_file_name=suggested_file_name
|
||||
suggested_file_name=suggested_file_name,
|
||||
download_mirrors=self.session.download_mirrors
|
||||
)
|
||||
|
||||
def _start_lbry_file(self, file_info, payment_rate_manager, claim_info):
|
||||
|
|
62
lbrynet/tests/unit/core/test_HTTPBlobDownloader.py
Normal file
62
lbrynet/tests/unit/core/test_HTTPBlobDownloader.py
Normal file
|
@ -0,0 +1,62 @@
|
|||
from mock import MagicMock
|
||||
|
||||
from twisted.trial import unittest
|
||||
from twisted.internet import defer
|
||||
|
||||
from lbrynet.blob import BlobFile
|
||||
from lbrynet.core.HTTPBlobDownloader import HTTPBlobDownloader
|
||||
from lbrynet.tests.util import mk_db_and_blob_dir, rm_db_and_blob_dir
|
||||
|
||||
|
||||
class HTTPBlobDownloaderTest(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.db_dir, self.blob_dir = mk_db_and_blob_dir()
|
||||
self.blob_manager = MagicMock()
|
||||
self.client = MagicMock()
|
||||
self.blob_hash = ('d17272b17a1ad61c4316ac13a651c2b0952063214a81333e'
|
||||
'838364b01b2f07edbd165bb7ec60d2fb2f337a2c02923852')
|
||||
self.blob = BlobFile(self.blob_dir, self.blob_hash)
|
||||
self.blob_manager.get_blob.side_effect = lambda _: defer.succeed(self.blob)
|
||||
self.response = MagicMock(code=200, length=400)
|
||||
self.client.get.side_effect = lambda uri: defer.succeed(self.response)
|
||||
self.downloader = HTTPBlobDownloader(self.blob_manager, [self.blob_hash], ['server1'], self.client)
|
||||
self.downloader.interval = 0
|
||||
|
||||
def tearDown(self):
|
||||
rm_db_and_blob_dir(self.db_dir, self.blob_dir)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_download_successful(self):
|
||||
self.client.collect.side_effect = collect
|
||||
yield self.downloader.start()
|
||||
self.blob_manager.get_blob.assert_called_with(self.blob_hash)
|
||||
self.client.get.assert_called_with('http://{}/{}'.format('server1', self.blob_hash))
|
||||
self.client.collect.assert_called()
|
||||
self.assertEqual(self.blob.get_length(), self.response.length)
|
||||
self.assertEqual(self.blob.get_is_verified(), True)
|
||||
self.assertEqual(self.blob.writers, {})
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_download_transfer_failed(self):
|
||||
self.client.collect.side_effect = lambda response, write: defer.fail(Exception())
|
||||
yield self.downloader.start()
|
||||
self.assertEqual(len(self.client.collect.mock_calls), self.downloader.max_failures)
|
||||
self.blob_manager.get_blob.assert_called_with(self.blob_hash)
|
||||
self.assertEqual(self.blob.get_length(), self.response.length)
|
||||
self.assertEqual(self.blob.get_is_verified(), False)
|
||||
self.assertEqual(self.blob.writers, {})
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def test_blob_not_found(self):
|
||||
self.response.code = 404
|
||||
yield self.downloader.start()
|
||||
self.blob_manager.get_blob.assert_called_with(self.blob_hash)
|
||||
self.client.get.assert_called_with('http://{}/{}'.format('server1', self.blob_hash))
|
||||
self.client.collect.assert_not_called()
|
||||
self.assertEqual(self.blob.get_is_verified(), False)
|
||||
self.assertEqual(self.blob.writers, {})
|
||||
|
||||
|
||||
def collect(response, write):
|
||||
write('f' * response.length)
|
||||
defer.succeed(None)
|
Loading…
Reference in a new issue