From b1c66015e1d86b2830f59410fd41c0a615784ec1 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Wed, 21 Feb 2018 19:09:10 -0500 Subject: [PATCH] detect and remove invalid streams and sd blobs --- CHANGELOG.md | 2 +- lbrynet/core/StreamDescriptor.py | 17 ++++++++----- lbrynet/daemon/Daemon.py | 7 +++++- lbrynet/daemon/Downloader.py | 12 ++++----- lbrynet/file_manager/EncryptedFileCreator.py | 6 ++++- lbrynet/file_manager/EncryptedFileManager.py | 26 ++++++++++++++------ 6 files changed, 46 insertions(+), 24 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index bd6f2f03b..6ea87bda4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,7 +26,7 @@ at anytime. * `get` failing with a non-useful error message when given a uri for a channel claim * exception checking in several wallet unit tests * daemon not erring properly for non-numeric values being passed to the `bid` parameter for the `publish` method - * incorrect `blob_num` for the stream terminator blob, which would result in creating invalid streams (https://github.com/lbryio/lbry/issues/1124) + * incorrect `blob_num` for the stream terminator blob, which would result in creating invalid streams. Such invalid streams are detected on startup and are automatically removed (https://github.com/lbryio/lbry/issues/1124) ### Deprecated * `channel_list_mine`, replaced with `channel_list` diff --git a/lbrynet/core/StreamDescriptor.py b/lbrynet/core/StreamDescriptor.py index 44a760bf8..4a76b5678 100644 --- a/lbrynet/core/StreamDescriptor.py +++ b/lbrynet/core/StreamDescriptor.py @@ -357,12 +357,6 @@ def get_stream_hash(hex_stream_name, key, hex_suggested_file_name, blob_infos): h.update(key) h.update(hex_suggested_file_name) blobs_hashsum = get_lbry_hash_obj() - if any(blob['length'] for blob in blob_infos if blob['length'] <= 0): - raise InvalidStreamDescriptorError("Contains invalid length data blobs") - if blob_infos[-1]['length'] != 0: - raise InvalidStreamDescriptorError("Does not end with a zero-length blob.") - if 'blob_hash' in blob_infos[-1]: - raise InvalidStreamDescriptorError("Stream terminator blob should not have a hash") for blob in blob_infos: blobs_hashsum.update(get_blob_hashsum(blob)) h.update(blobs_hashsum.digest()) @@ -384,6 +378,12 @@ def validate_descriptor(stream_info): blobs = stream_info['blobs'] except KeyError as e: raise InvalidStreamDescriptorError("Missing '%s'" % (e.args[0])) + if stream_info['blobs'][-1]['length'] != 0: + raise InvalidStreamDescriptorError("Does not end with a zero-length blob.") + if any([False if blob_info['length'] > 0 else True for blob_info in stream_info['blobs'][:-1]]): + raise InvalidStreamDescriptorError("Contains zero-length data blob") + if 'blob_hash' in stream_info['blobs'][-1]: + raise InvalidStreamDescriptorError("Stream terminator blob should not have a hash") verify_hex(key, "key") verify_hex(hex_suggested_file_name, "suggested file name") @@ -448,6 +448,11 @@ def download_sd_blob(session, blob_hash, payment_rate_manager, timeout=None): sd_blob = yield downloader.download() sd_reader = BlobStreamDescriptorReader(sd_blob) sd_info = yield sd_reader.get_info() + try: + validate_descriptor(sd_info) + except InvalidStreamDescriptorError as err: + yield session.blob_manager.delete_blobs([blob_hash]) + raise err raw_sd = yield sd_reader._get_raw_data() yield session.blob_manager.storage.add_known_blob(blob_hash, len(raw_sd)) yield save_sd_info(session.blob_manager, sd_blob.blob_hash, sd_info) diff --git a/lbrynet/daemon/Daemon.py b/lbrynet/daemon/Daemon.py index cff92ce23..8b1f424b7 100644 --- a/lbrynet/daemon/Daemon.py +++ b/lbrynet/daemon/Daemon.py @@ -50,6 +50,7 @@ from lbrynet.core.Error import NoSuchStreamHash, DownloadDataTimeout, DownloadSD from lbrynet.core.Error import NullFundsError, NegativeFundsError from lbrynet.core.Peer import Peer from lbrynet.core.SinglePeerDownloader import SinglePeerDownloader +from lbrynet.core.client.StandaloneBlobDownloader import StandaloneBlobDownloader log = logging.getLogger(__name__) @@ -621,7 +622,11 @@ class Daemon(AuthJSONRPCServer): rate_manager = rate_manager or self.session.payment_rate_manager timeout = timeout or 30 - return download_sd_blob(self.session, blob_hash, rate_manager, timeout) + downloader = StandaloneBlobDownloader( + blob_hash, self.session.blob_manager, self.session.peer_finder, self.session.rate_limiter, + rate_manager, self.session.wallet, timeout + ) + return downloader.download() @defer.inlineCallbacks def _get_stream_analytics_report(self, claim_dict): diff --git a/lbrynet/daemon/Downloader.py b/lbrynet/daemon/Downloader.py index c23767e08..60fce734b 100644 --- a/lbrynet/daemon/Downloader.py +++ b/lbrynet/daemon/Downloader.py @@ -5,7 +5,7 @@ from twisted.internet.task import LoopingCall from lbryschema.fee import Fee -from lbrynet.core.Error import InsufficientFundsError, KeyFeeAboveMaxAllowed +from lbrynet.core.Error import InsufficientFundsError, KeyFeeAboveMaxAllowed, InvalidStreamDescriptorError from lbrynet.core.Error import DownloadDataTimeout, DownloadCanceledError, DownloadSDTimeout from lbrynet.core.utils import safe_start_looping_call, safe_stop_looping_call from lbrynet.core.StreamDescriptor import download_sd_blob @@ -204,14 +204,12 @@ class GetStream(object): safe_start_looping_call(self.checker, 1) self.set_status(DOWNLOAD_METADATA_CODE, name) - sd_blob = yield self._download_sd_blob() - - yield self._download(sd_blob, name, key_fee, txid, nout, file_name) - self.set_status(DOWNLOAD_RUNNING_CODE, name) - try: + sd_blob = yield self._download_sd_blob() + yield self._download(sd_blob, name, key_fee, txid, nout, file_name) + self.set_status(DOWNLOAD_RUNNING_CODE, name) yield self.data_downloading_deferred - except DownloadDataTimeout as err: + except (DownloadDataTimeout, InvalidStreamDescriptorError) as err: safe_stop_looping_call(self.checker) raise err diff --git a/lbrynet/file_manager/EncryptedFileCreator.py b/lbrynet/file_manager/EncryptedFileCreator.py index 1598be3ff..49f8ce5f4 100644 --- a/lbrynet/file_manager/EncryptedFileCreator.py +++ b/lbrynet/file_manager/EncryptedFileCreator.py @@ -10,7 +10,7 @@ from twisted.internet import defer from twisted.protocols.basic import FileSender from lbrynet.core.StreamDescriptor import BlobStreamDescriptorWriter, EncryptedFileStreamType -from lbrynet.core.StreamDescriptor import format_sd_info, get_stream_hash +from lbrynet.core.StreamDescriptor import format_sd_info, get_stream_hash, validate_descriptor from lbrynet.cryptstream.CryptStreamCreator import CryptStreamCreator log = logging.getLogger(__name__) @@ -40,11 +40,15 @@ class EncryptedFileStreamCreator(CryptStreamCreator): hexlify(self.name), hexlify(self.key), hexlify(self.name), self.blob_infos ) + # generate the sd info self.sd_info = format_sd_info( EncryptedFileStreamType, hexlify(self.name), hexlify(self.key), hexlify(self.name), self.stream_hash, self.blob_infos ) + + # sanity check + validate_descriptor(self.sd_info) return defer.succeed(self.stream_hash) diff --git a/lbrynet/file_manager/EncryptedFileManager.py b/lbrynet/file_manager/EncryptedFileManager.py index cf08be90c..39b970dcd 100644 --- a/lbrynet/file_manager/EncryptedFileManager.py +++ b/lbrynet/file_manager/EncryptedFileManager.py @@ -6,12 +6,12 @@ import logging from twisted.internet import defer, task, reactor from twisted.python.failure import Failure - +from lbrynet.core.Error import InvalidStreamDescriptorError from lbrynet.reflector.reupload import reflect_stream from lbrynet.core.PaymentRateManager import NegotiatedPaymentRateManager from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloader from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloaderFactory -from lbrynet.core.StreamDescriptor import EncryptedFileStreamType, get_sd_info +from lbrynet.core.StreamDescriptor import EncryptedFileStreamType, get_sd_info, validate_descriptor from lbrynet.cryptstream.client.CryptStreamDownloader import AlreadyStoppedError from lbrynet.cryptstream.client.CryptStreamDownloader import CurrentlyStoppingError from lbrynet.core.utils import safe_start_looping_call, safe_stop_looping_call @@ -114,12 +114,22 @@ class EncryptedFileManager(object): ) yield lbry_file.get_claim_info() try: - # restore will raise an Exception if status is unknown - lbry_file.restore(file_info['status']) - self.lbry_files.append(lbry_file) - except Exception: - log.warning("Failed to start %i", file_info['rowid']) - continue + # verify the stream is valid (we might have downloaded an invalid stream + # in the past when the validation check didn't work) + stream_info = yield get_sd_info(self.storage, file_info['stream_hash'], include_blobs=True) + validate_descriptor(stream_info) + except InvalidStreamDescriptorError as err: + log.warning("Stream for descriptor %s is invalid (%s), cleaning it up", + lbry_file.sd_hash, err.message) + yield lbry_file.delete_data() + yield self.session.storage.delete_stream(lbry_file.stream_hash) + else: + try: + # restore will raise an Exception if status is unknown + lbry_file.restore(file_info['status']) + self.lbry_files.append(lbry_file) + except Exception: + log.warning("Failed to start %i", file_info.get('rowid')) log.info("Started %i lbry files", len(self.lbry_files)) if self.auto_re_reflect is True: safe_start_looping_call(self.lbry_file_reflector, self.auto_re_reflect_interval)