diff --git a/CHANGELOG.md b/CHANGELOG.md index 59088d59a..61ed57ac2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,12 +22,12 @@ at anytime. * handling decryption error for blobs encrypted with an invalid key * handling stream with no data blob (https://github.com/lbryio/lbry/issues/905) * fetching the external ip - * `blob_list` failing with --uri parameter (https://github.com/lbryio/lbry/issues/895) + * `blob_list` returning an error with --uri parameter and incorrectly returning `[]` for streams where blobs are known (https://github.com/lbryio/lbry/issues/895) * `get` failing with a non-useful error message when given a uri for a channel claim * exception checking in several wallet unit tests * daemon not erring properly for non-numeric values being passed to the `bid` parameter for the `publish` method * `publish` command to allow updating claims with a `bid` amount higher than the wallet balance, so long as the amount is less than the wallet balance plus the bid amount of the claim being updated (https://github.com/lbryio/lbry/issues/748) - * + * incorrect `blob_num` for the stream terminator blob, which would result in creating invalid streams. 
Such invalid streams are detected on startup and are automatically removed (https://github.com/lbryio/lbry/issues/1124) ### Deprecated * `channel_list_mine`, replaced with `channel_list` diff --git a/lbrynet/__init__.py b/lbrynet/__init__.py index f2d690bc7..1b966468a 100644 --- a/lbrynet/__init__.py +++ b/lbrynet/__init__.py @@ -1,6 +1,6 @@ import logging -__version__ = "0.19.0rc35" +__version__ = "0.19.0rc37" version = tuple(__version__.split('.')) logging.getLogger(__name__).addHandler(logging.NullHandler()) diff --git a/lbrynet/core/StreamDescriptor.py b/lbrynet/core/StreamDescriptor.py index f9495bec4..4a76b5678 100644 --- a/lbrynet/core/StreamDescriptor.py +++ b/lbrynet/core/StreamDescriptor.py @@ -1,4 +1,3 @@ -import os import binascii from collections import defaultdict import json @@ -335,25 +334,6 @@ def get_sd_info(storage, stream_hash, include_blobs): ) -@defer.inlineCallbacks -def create_plain_sd(storage, stream_hash, file_name, overwrite_existing=False): - def _get_file_name(): - actual_file_name = file_name - if os.path.exists(actual_file_name): - ext_num = 1 - while os.path.exists(actual_file_name + "_" + str(ext_num)): - ext_num += 1 - actual_file_name = actual_file_name + "_" + str(ext_num) - return actual_file_name - - if overwrite_existing is False: - file_name = yield threads.deferToThread(_get_file_name()) - descriptor_writer = PlainStreamDescriptorWriter(file_name) - sd_info = yield get_sd_info(storage, stream_hash, True) - sd_hash = yield descriptor_writer.create_descriptor(sd_info) - defer.returnValue(sd_hash) - - def get_blob_hashsum(b): length = b['length'] if length != 0: @@ -377,13 +357,8 @@ def get_stream_hash(hex_stream_name, key, hex_suggested_file_name, blob_infos): h.update(key) h.update(hex_suggested_file_name) blobs_hashsum = get_lbry_hash_obj() - sorted_blob_infos = sorted(blob_infos, key=lambda x: x['blob_num']) - for blob in sorted_blob_infos: + for blob in blob_infos: blobs_hashsum.update(get_blob_hashsum(blob)) - if 
sorted_blob_infos[-1]['length'] != 0: - raise InvalidStreamDescriptorError("Does not end with a zero-length blob.") - if 'blob_hash' in sorted_blob_infos[-1]: - raise InvalidStreamDescriptorError("Stream terminator blob should not have a hash") h.update(blobs_hashsum.digest()) return h.hexdigest() @@ -403,6 +378,12 @@ def validate_descriptor(stream_info): blobs = stream_info['blobs'] except KeyError as e: raise InvalidStreamDescriptorError("Missing '%s'" % (e.args[0])) + if stream_info['blobs'][-1]['length'] != 0: + raise InvalidStreamDescriptorError("Does not end with a zero-length blob.") + if any([False if blob_info['length'] > 0 else True for blob_info in stream_info['blobs'][:-1]]): + raise InvalidStreamDescriptorError("Contains zero-length data blob") + if 'blob_hash' in stream_info['blobs'][-1]: + raise InvalidStreamDescriptorError("Stream terminator blob should not have a hash") verify_hex(key, "key") verify_hex(hex_suggested_file_name, "suggested file name") @@ -467,6 +448,11 @@ def download_sd_blob(session, blob_hash, payment_rate_manager, timeout=None): sd_blob = yield downloader.download() sd_reader = BlobStreamDescriptorReader(sd_blob) sd_info = yield sd_reader.get_info() + try: + validate_descriptor(sd_info) + except InvalidStreamDescriptorError as err: + yield session.blob_manager.delete_blobs([blob_hash]) + raise err raw_sd = yield sd_reader._get_raw_data() yield session.blob_manager.storage.add_known_blob(blob_hash, len(raw_sd)) yield save_sd_info(session.blob_manager, sd_blob.blob_hash, sd_info) diff --git a/lbrynet/cryptstream/CryptStreamCreator.py b/lbrynet/cryptstream/CryptStreamCreator.py index e9a380ed7..e39a50c1d 100644 --- a/lbrynet/cryptstream/CryptStreamCreator.py +++ b/lbrynet/cryptstream/CryptStreamCreator.py @@ -86,10 +86,9 @@ class CryptStreamCreator(object): self.stopped = True if self.current_blob is not None: self._close_current_blob() - self._finalize() - dl = defer.DeferredList(self.finished_deferreds) - dl.addCallback(lambda _: 
self._finished()) - return dl + d = self._finalize() + d.addCallback(lambda _: self._finished()) + return d # TODO: move the stream creation process to its own thread and # remove the reactor from this process. @@ -112,6 +111,7 @@ class CryptStreamCreator(object): return defer.succeed(True) + @defer.inlineCallbacks def _finalize(self): """ Finalize a stream by adding an empty @@ -119,14 +119,14 @@ class CryptStreamCreator(object): the stream has ended. This empty blob is not saved to the blob manager """ - log.debug("_finalize has been called") + + yield defer.DeferredList(self.finished_deferreds) self.blob_count += 1 iv = self.iv_generator.next() - final_blob_creator = self.blob_manager.get_blob_creator() - final_blob = self._get_blob_maker(iv, final_blob_creator) - d = final_blob.close() - d.addCallback(self._blob_finished) - self.finished_deferreds.append(d) + final_blob = self._get_blob_maker(iv, self.blob_manager.get_blob_creator()) + stream_terminator = yield final_blob.close() + terminator_info = yield self._blob_finished(stream_terminator) + defer.returnValue(terminator_info) def _write(self, data): while len(data) > 0: diff --git a/lbrynet/daemon/Daemon.py b/lbrynet/daemon/Daemon.py index 53ba59c92..b461802f9 100644 --- a/lbrynet/daemon/Daemon.py +++ b/lbrynet/daemon/Daemon.py @@ -45,11 +45,12 @@ from lbrynet.core.Wallet import LBRYumWallet, ClaimOutpoint from lbrynet.core.looping_call_manager import LoopingCallManager from lbrynet.core.server.BlobRequestHandler import BlobRequestHandlerFactory from lbrynet.core.server.ServerProtocol import ServerProtocolFactory -from lbrynet.core.Error import InsufficientFundsError, UnknownNameError, NoSuchSDHash -from lbrynet.core.Error import NoSuchStreamHash, DownloadDataTimeout, DownloadSDTimeout +from lbrynet.core.Error import InsufficientFundsError, UnknownNameError +from lbrynet.core.Error import DownloadDataTimeout, DownloadSDTimeout from lbrynet.core.Error import NullFundsError, NegativeFundsError from 
lbrynet.core.Peer import Peer from lbrynet.core.SinglePeerDownloader import SinglePeerDownloader +from lbrynet.core.client.StandaloneBlobDownloader import StandaloneBlobDownloader log = logging.getLogger(__name__) @@ -161,16 +162,6 @@ class AlwaysSend(object): return d -# If an instance has a lot of blobs, this call might get very expensive. -# For reflector, with 50k blobs, it definitely has an impact on the first run -# But doesn't seem to impact performance after that. -@defer.inlineCallbacks -def calculate_available_blob_size(blob_manager): - blob_hashes = yield blob_manager.get_all_verified_blobs() - blobs = yield defer.DeferredList([blob_manager.get_blob(b) for b in blob_hashes]) - defer.returnValue(sum(b.length for success, b in blobs if success and b.length)) - - class Daemon(AuthJSONRPCServer): """ LBRYnet daemon, a jsonrpc interface to lbry functions @@ -622,7 +613,11 @@ class Daemon(AuthJSONRPCServer): rate_manager = rate_manager or self.session.payment_rate_manager timeout = timeout or 30 - return download_sd_blob(self.session, blob_hash, rate_manager, timeout) + downloader = StandaloneBlobDownloader( + blob_hash, self.session.blob_manager, self.session.peer_finder, self.session.rate_limiter, + rate_manager, self.session.wallet, timeout + ) + return downloader.download() @defer.inlineCallbacks def _get_stream_analytics_report(self, claim_dict): @@ -949,27 +944,6 @@ class Daemon(AuthJSONRPCServer): log.debug("Collected %i lbry files", len(lbry_files)) defer.returnValue(lbry_files) - # TODO: do this and get_blobs_for_sd_hash in the stream info manager - def get_blobs_for_stream_hash(self, stream_hash): - def _iter_blobs(blob_hashes): - for blob_hash, blob_num, blob_iv, blob_length in blob_hashes: - if blob_hash: - yield self.session.blob_manager.get_blob(blob_hash, length=blob_length) - - def _get_blobs(blob_hashes): - dl = defer.DeferredList(list(_iter_blobs(blob_hashes)), consumeErrors=True) - dl.addCallback(lambda blobs: [blob[1] for blob in blobs if 
blob[0]]) - return dl - - d = self.session.storage.get_blobs_for_stream(stream_hash) - d.addCallback(_get_blobs) - return d - - def get_blobs_for_sd_hash(self, sd_hash): - d = self.session.storage.get_stream_hash_for_sd_hash(sd_hash) - d.addCallback(self.get_blobs_for_stream_hash) - return d - def _get_single_peer_downloader(self): downloader = SinglePeerDownloader() downloader.setup(self.session.wallet) @@ -2828,17 +2802,19 @@ class Daemon(AuthJSONRPCServer): if announce_all: yield self.session.blob_manager.immediate_announce_all_blobs() else: + blob_hashes = [] if blob_hash: - blob_hashes = [blob_hash] + blob_hashes.append(blob_hash) elif stream_hash: - blobs = yield self.get_blobs_for_stream_hash(stream_hash) - blob_hashes = [blob.blob_hash for blob in blobs if blob.get_is_verified()] + pass elif sd_hash: - blobs = yield self.get_blobs_for_sd_hash(sd_hash) - blob_hashes = [sd_hash] + [blob.blob_hash for blob in blobs if - blob.get_is_verified()] + stream_hash = yield self.session.storage.get_stream_hash_for_sd_hash(sd_hash) else: raise Exception('single argument must be specified') + if not blob_hash: + blobs = yield self.session.storage.get_blobs_for_stream(stream_hash) + blob_hashes.extend([blob.blob_hash for blob in blobs if blob.get_is_verified()]) + yield self.session.blob_manager._immediate_announce(blob_hashes) response = yield self._render_response(True) @@ -2916,24 +2892,23 @@ class Daemon(AuthJSONRPCServer): Returns: (list) List of blob hashes """ - - if uri: - metadata = yield self._resolve_name(uri) - sd_hash = utils.get_sd_hash(metadata) - try: - blobs = yield self.get_blobs_for_sd_hash(sd_hash) - except NoSuchSDHash: - blobs = [] - elif stream_hash: - try: - blobs = yield self.get_blobs_for_stream_hash(stream_hash) - except NoSuchStreamHash: - blobs = [] - elif sd_hash: - try: - blobs = yield self.get_blobs_for_sd_hash(sd_hash) - except NoSuchSDHash: + if uri or stream_hash or sd_hash: + if uri: + metadata = yield self._resolve_name(uri) + sd_hash 
= utils.get_sd_hash(metadata) + stream_hash = yield self.session.storage.get_stream_hash_for_sd_hash(sd_hash) + elif stream_hash: + sd_hash = yield self.session.storage.get_sd_blob_hash_for_stream(stream_hash) + elif sd_hash: + stream_hash = yield self.session.storage.get_stream_hash_for_sd_hash(sd_hash) + sd_hash = yield self.session.storage.get_sd_blob_hash_for_stream(stream_hash) + if stream_hash: + blobs = yield self.session.storage.get_blobs_for_stream(stream_hash) + else: blobs = [] + # get_blobs_for_stream does not include the sd blob, so we'll add it manually + if sd_hash in self.session.blob_manager.blobs: + blobs = [self.session.blob_manager.blobs[sd_hash]] + blobs else: blobs = self.session.blob_manager.blobs.itervalues() @@ -2942,7 +2917,7 @@ class Daemon(AuthJSONRPCServer): if finished: blobs = [blob for blob in blobs if blob.get_is_verified()] - blob_hashes = [blob.blob_hash for blob in blobs] + blob_hashes = [blob.blob_hash for blob in blobs if blob.blob_hash] page_size = page_size or len(blob_hashes) page = page or 0 start_index = page * page_size diff --git a/lbrynet/daemon/Downloader.py b/lbrynet/daemon/Downloader.py index c23767e08..60fce734b 100644 --- a/lbrynet/daemon/Downloader.py +++ b/lbrynet/daemon/Downloader.py @@ -5,7 +5,7 @@ from twisted.internet.task import LoopingCall from lbryschema.fee import Fee -from lbrynet.core.Error import InsufficientFundsError, KeyFeeAboveMaxAllowed +from lbrynet.core.Error import InsufficientFundsError, KeyFeeAboveMaxAllowed, InvalidStreamDescriptorError from lbrynet.core.Error import DownloadDataTimeout, DownloadCanceledError, DownloadSDTimeout from lbrynet.core.utils import safe_start_looping_call, safe_stop_looping_call from lbrynet.core.StreamDescriptor import download_sd_blob @@ -204,14 +204,12 @@ class GetStream(object): safe_start_looping_call(self.checker, 1) self.set_status(DOWNLOAD_METADATA_CODE, name) - sd_blob = yield self._download_sd_blob() - - yield self._download(sd_blob, name, key_fee, txid, 
nout, file_name) - self.set_status(DOWNLOAD_RUNNING_CODE, name) - try: + sd_blob = yield self._download_sd_blob() + yield self._download(sd_blob, name, key_fee, txid, nout, file_name) + self.set_status(DOWNLOAD_RUNNING_CODE, name) yield self.data_downloading_deferred - except DownloadDataTimeout as err: + except (DownloadDataTimeout, InvalidStreamDescriptorError) as err: safe_stop_looping_call(self.checker) raise err diff --git a/lbrynet/file_manager/EncryptedFileCreator.py b/lbrynet/file_manager/EncryptedFileCreator.py index 1598be3ff..49f8ce5f4 100644 --- a/lbrynet/file_manager/EncryptedFileCreator.py +++ b/lbrynet/file_manager/EncryptedFileCreator.py @@ -10,7 +10,7 @@ from twisted.internet import defer from twisted.protocols.basic import FileSender from lbrynet.core.StreamDescriptor import BlobStreamDescriptorWriter, EncryptedFileStreamType -from lbrynet.core.StreamDescriptor import format_sd_info, get_stream_hash +from lbrynet.core.StreamDescriptor import format_sd_info, get_stream_hash, validate_descriptor from lbrynet.cryptstream.CryptStreamCreator import CryptStreamCreator log = logging.getLogger(__name__) @@ -40,11 +40,15 @@ class EncryptedFileStreamCreator(CryptStreamCreator): hexlify(self.name), hexlify(self.key), hexlify(self.name), self.blob_infos ) + # generate the sd info self.sd_info = format_sd_info( EncryptedFileStreamType, hexlify(self.name), hexlify(self.key), hexlify(self.name), self.stream_hash, self.blob_infos ) + + # sanity check + validate_descriptor(self.sd_info) return defer.succeed(self.stream_hash) diff --git a/lbrynet/file_manager/EncryptedFileManager.py b/lbrynet/file_manager/EncryptedFileManager.py index cf08be90c..39b970dcd 100644 --- a/lbrynet/file_manager/EncryptedFileManager.py +++ b/lbrynet/file_manager/EncryptedFileManager.py @@ -6,12 +6,12 @@ import logging from twisted.internet import defer, task, reactor from twisted.python.failure import Failure - +from lbrynet.core.Error import InvalidStreamDescriptorError from 
lbrynet.reflector.reupload import reflect_stream from lbrynet.core.PaymentRateManager import NegotiatedPaymentRateManager from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloader from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloaderFactory -from lbrynet.core.StreamDescriptor import EncryptedFileStreamType, get_sd_info +from lbrynet.core.StreamDescriptor import EncryptedFileStreamType, get_sd_info, validate_descriptor from lbrynet.cryptstream.client.CryptStreamDownloader import AlreadyStoppedError from lbrynet.cryptstream.client.CryptStreamDownloader import CurrentlyStoppingError from lbrynet.core.utils import safe_start_looping_call, safe_stop_looping_call @@ -114,12 +114,22 @@ class EncryptedFileManager(object): ) yield lbry_file.get_claim_info() try: - # restore will raise an Exception if status is unknown - lbry_file.restore(file_info['status']) - self.lbry_files.append(lbry_file) - except Exception: - log.warning("Failed to start %i", file_info['rowid']) - continue + # verify the stream is valid (we might have downloaded an invalid stream + # in the past when the validation check didn't work) + stream_info = yield get_sd_info(self.storage, file_info['stream_hash'], include_blobs=True) + validate_descriptor(stream_info) + except InvalidStreamDescriptorError as err: + log.warning("Stream for descriptor %s is invalid (%s), cleaning it up", + lbry_file.sd_hash, err.message) + yield lbry_file.delete_data() + yield self.session.storage.delete_stream(lbry_file.stream_hash) + else: + try: + # restore will raise an Exception if status is unknown + lbry_file.restore(file_info['status']) + self.lbry_files.append(lbry_file) + except Exception: + log.warning("Failed to start %i", file_info.get('rowid')) log.info("Started %i lbry files", len(self.lbry_files)) if self.auto_re_reflect is True: safe_start_looping_call(self.lbry_file_reflector, self.auto_re_reflect_interval) diff --git 
a/lbrynet/tests/unit/lbryfilemanager/test_EncryptedFileCreator.py b/lbrynet/tests/unit/lbryfilemanager/test_EncryptedFileCreator.py index 4ebf36f4f..05c0a8feb 100644 --- a/lbrynet/tests/unit/lbryfilemanager/test_EncryptedFileCreator.py +++ b/lbrynet/tests/unit/lbryfilemanager/test_EncryptedFileCreator.py @@ -5,6 +5,7 @@ from twisted.trial import unittest from twisted.internet import defer from lbrynet.database.storage import SQLiteStorage +from lbrynet.core.StreamDescriptor import get_sd_info, BlobStreamDescriptorReader from lbrynet.core import BlobManager from lbrynet.core import Session from lbrynet.core.server import DHTHashAnnouncer @@ -15,6 +16,7 @@ from lbrynet.tests.util import mk_db_and_blob_dir, rm_db_and_blob_dir MB = 2**20 + def iv_generator(): while True: yield '3' * AES.block_size @@ -22,6 +24,7 @@ def iv_generator(): class CreateEncryptedFileTest(unittest.TestCase): timeout = 5 + @defer.inlineCallbacks def setUp(self): mocks.mock_conf_settings(self) @@ -57,16 +60,28 @@ class CreateEncryptedFileTest(unittest.TestCase): def test_can_create_file(self): expected_stream_hash = "41e6b247d923d191b154fb6f1b8529d6ddd6a73d65c35" \ "7b1acb742dd83151fb66393a7709e9f346260a4f4db6de10c25" - expected_sd_hash = "bc435ae0c4659635e6514e05bb1fcd0d365b234f6f0e78002" \ - "d2576ff84a0b8710a9847757a9aa8cbeda5a8e1aeafa48b" + expected_sd_hash = "db043b44384c149126685990f6bb6563aa565ae331303d522" \ + "c8728fe0534dd06fbcacae92b0891787ad9b68ffc8d20c1" filename = 'test.file' lbry_file = yield self.create_file(filename) sd_hash = yield self.session.storage.get_sd_blob_hash_for_stream(lbry_file.stream_hash) + # read the sd blob file + sd_blob = self.blob_manager.blobs[sd_hash] + sd_reader = BlobStreamDescriptorReader(sd_blob) + sd_file_info = yield sd_reader.get_info() + + # this comes from the database, the blobs returned are sorted + sd_info = yield get_sd_info(self.session.storage, lbry_file.stream_hash, include_blobs=True) + self.assertDictEqual(sd_info, sd_file_info) + 
self.assertEqual(sd_info['stream_hash'], expected_stream_hash) + self.assertEqual(len(sd_info['blobs']), 3) + self.assertNotEqual(sd_info['blobs'][0]['length'], 0) + self.assertNotEqual(sd_info['blobs'][1]['length'], 0) + self.assertEqual(sd_info['blobs'][2]['length'], 0) self.assertEqual(expected_stream_hash, lbry_file.stream_hash) self.assertEqual(sd_hash, lbry_file.sd_hash) self.assertEqual(sd_hash, expected_sd_hash) - blobs = yield self.blob_manager.get_all_verified_blobs() self.assertEqual(3, len(blobs)) num_should_announce_blobs = yield self.blob_manager.count_should_announce_blobs() diff --git a/requirements.txt b/requirements.txt index 910f898e3..a47e0f495 100644 --- a/requirements.txt +++ b/requirements.txt @@ -14,7 +14,7 @@ jsonrpc==1.2 jsonrpclib==0.1.7 jsonschema==2.6.0 keyring==10.4.0 -git+https://github.com/lbryio/lbryum.git@v3.2.0rc16#egg=lbryum +git+https://github.com/lbryio/lbryum.git@v3.2.0rc17#egg=lbryum git+https://github.com/lbryio/lbryschema.git@v0.0.15rc2#egg=lbryschema miniupnpc==1.9 pbkdf2==1.3 diff --git a/setup.py b/setup.py index da5520875..9cb8e2cbe 100644 --- a/setup.py +++ b/setup.py @@ -21,7 +21,7 @@ requires = [ 'envparse', 'jsonrpc', 'jsonschema', - 'lbryum==3.2.0rc16', + 'lbryum==3.2.0rc17', 'lbryschema==0.0.15rc2', 'miniupnpc', 'pycrypto',