detect and remove invalid streams and sd blobs

This commit is contained in:
Jack Robison 2018-02-21 19:09:10 -05:00
parent 466654ffd0
commit b1c66015e1
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
6 changed files with 46 additions and 24 deletions

View file

@ -26,7 +26,7 @@ at anytime.
* `get` failing with a non-useful error message when given a uri for a channel claim * `get` failing with a non-useful error message when given a uri for a channel claim
* exception checking in several wallet unit tests * exception checking in several wallet unit tests
* daemon not erring properly for non-numeric values being passed to the `bid` parameter for the `publish` method * daemon not erring properly for non-numeric values being passed to the `bid` parameter for the `publish` method
* incorrect `blob_num` for the stream terminator blob, which would result in creating invalid streams (https://github.com/lbryio/lbry/issues/1124) * incorrect `blob_num` for the stream terminator blob, which would result in creating invalid streams. Such invalid streams are detected on startup and are automatically removed (https://github.com/lbryio/lbry/issues/1124)
### Deprecated ### Deprecated
* `channel_list_mine`, replaced with `channel_list` * `channel_list_mine`, replaced with `channel_list`

View file

@ -357,12 +357,6 @@ def get_stream_hash(hex_stream_name, key, hex_suggested_file_name, blob_infos):
h.update(key) h.update(key)
h.update(hex_suggested_file_name) h.update(hex_suggested_file_name)
blobs_hashsum = get_lbry_hash_obj() blobs_hashsum = get_lbry_hash_obj()
if any(blob['length'] for blob in blob_infos if blob['length'] <= 0):
raise InvalidStreamDescriptorError("Contains invalid length data blobs")
if blob_infos[-1]['length'] != 0:
raise InvalidStreamDescriptorError("Does not end with a zero-length blob.")
if 'blob_hash' in blob_infos[-1]:
raise InvalidStreamDescriptorError("Stream terminator blob should not have a hash")
for blob in blob_infos: for blob in blob_infos:
blobs_hashsum.update(get_blob_hashsum(blob)) blobs_hashsum.update(get_blob_hashsum(blob))
h.update(blobs_hashsum.digest()) h.update(blobs_hashsum.digest())
@ -384,6 +378,12 @@ def validate_descriptor(stream_info):
blobs = stream_info['blobs'] blobs = stream_info['blobs']
except KeyError as e: except KeyError as e:
raise InvalidStreamDescriptorError("Missing '%s'" % (e.args[0])) raise InvalidStreamDescriptorError("Missing '%s'" % (e.args[0]))
if stream_info['blobs'][-1]['length'] != 0:
raise InvalidStreamDescriptorError("Does not end with a zero-length blob.")
if any([False if blob_info['length'] > 0 else True for blob_info in stream_info['blobs'][:-1]]):
raise InvalidStreamDescriptorError("Contains zero-length data blob")
if 'blob_hash' in stream_info['blobs'][-1]:
raise InvalidStreamDescriptorError("Stream terminator blob should not have a hash")
verify_hex(key, "key") verify_hex(key, "key")
verify_hex(hex_suggested_file_name, "suggested file name") verify_hex(hex_suggested_file_name, "suggested file name")
@ -448,6 +448,11 @@ def download_sd_blob(session, blob_hash, payment_rate_manager, timeout=None):
sd_blob = yield downloader.download() sd_blob = yield downloader.download()
sd_reader = BlobStreamDescriptorReader(sd_blob) sd_reader = BlobStreamDescriptorReader(sd_blob)
sd_info = yield sd_reader.get_info() sd_info = yield sd_reader.get_info()
try:
validate_descriptor(sd_info)
except InvalidStreamDescriptorError as err:
yield session.blob_manager.delete_blobs([blob_hash])
raise err
raw_sd = yield sd_reader._get_raw_data() raw_sd = yield sd_reader._get_raw_data()
yield session.blob_manager.storage.add_known_blob(blob_hash, len(raw_sd)) yield session.blob_manager.storage.add_known_blob(blob_hash, len(raw_sd))
yield save_sd_info(session.blob_manager, sd_blob.blob_hash, sd_info) yield save_sd_info(session.blob_manager, sd_blob.blob_hash, sd_info)

View file

@ -50,6 +50,7 @@ from lbrynet.core.Error import NoSuchStreamHash, DownloadDataTimeout, DownloadSD
from lbrynet.core.Error import NullFundsError, NegativeFundsError from lbrynet.core.Error import NullFundsError, NegativeFundsError
from lbrynet.core.Peer import Peer from lbrynet.core.Peer import Peer
from lbrynet.core.SinglePeerDownloader import SinglePeerDownloader from lbrynet.core.SinglePeerDownloader import SinglePeerDownloader
from lbrynet.core.client.StandaloneBlobDownloader import StandaloneBlobDownloader
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -621,7 +622,11 @@ class Daemon(AuthJSONRPCServer):
rate_manager = rate_manager or self.session.payment_rate_manager rate_manager = rate_manager or self.session.payment_rate_manager
timeout = timeout or 30 timeout = timeout or 30
return download_sd_blob(self.session, blob_hash, rate_manager, timeout) downloader = StandaloneBlobDownloader(
blob_hash, self.session.blob_manager, self.session.peer_finder, self.session.rate_limiter,
rate_manager, self.session.wallet, timeout
)
return downloader.download()
@defer.inlineCallbacks @defer.inlineCallbacks
def _get_stream_analytics_report(self, claim_dict): def _get_stream_analytics_report(self, claim_dict):

View file

@ -5,7 +5,7 @@ from twisted.internet.task import LoopingCall
from lbryschema.fee import Fee from lbryschema.fee import Fee
from lbrynet.core.Error import InsufficientFundsError, KeyFeeAboveMaxAllowed from lbrynet.core.Error import InsufficientFundsError, KeyFeeAboveMaxAllowed, InvalidStreamDescriptorError
from lbrynet.core.Error import DownloadDataTimeout, DownloadCanceledError, DownloadSDTimeout from lbrynet.core.Error import DownloadDataTimeout, DownloadCanceledError, DownloadSDTimeout
from lbrynet.core.utils import safe_start_looping_call, safe_stop_looping_call from lbrynet.core.utils import safe_start_looping_call, safe_stop_looping_call
from lbrynet.core.StreamDescriptor import download_sd_blob from lbrynet.core.StreamDescriptor import download_sd_blob
@ -204,14 +204,12 @@ class GetStream(object):
safe_start_looping_call(self.checker, 1) safe_start_looping_call(self.checker, 1)
self.set_status(DOWNLOAD_METADATA_CODE, name) self.set_status(DOWNLOAD_METADATA_CODE, name)
try:
sd_blob = yield self._download_sd_blob() sd_blob = yield self._download_sd_blob()
yield self._download(sd_blob, name, key_fee, txid, nout, file_name) yield self._download(sd_blob, name, key_fee, txid, nout, file_name)
self.set_status(DOWNLOAD_RUNNING_CODE, name) self.set_status(DOWNLOAD_RUNNING_CODE, name)
try:
yield self.data_downloading_deferred yield self.data_downloading_deferred
except DownloadDataTimeout as err: except (DownloadDataTimeout, InvalidStreamDescriptorError) as err:
safe_stop_looping_call(self.checker) safe_stop_looping_call(self.checker)
raise err raise err

View file

@ -10,7 +10,7 @@ from twisted.internet import defer
from twisted.protocols.basic import FileSender from twisted.protocols.basic import FileSender
from lbrynet.core.StreamDescriptor import BlobStreamDescriptorWriter, EncryptedFileStreamType from lbrynet.core.StreamDescriptor import BlobStreamDescriptorWriter, EncryptedFileStreamType
from lbrynet.core.StreamDescriptor import format_sd_info, get_stream_hash from lbrynet.core.StreamDescriptor import format_sd_info, get_stream_hash, validate_descriptor
from lbrynet.cryptstream.CryptStreamCreator import CryptStreamCreator from lbrynet.cryptstream.CryptStreamCreator import CryptStreamCreator
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -40,11 +40,15 @@ class EncryptedFileStreamCreator(CryptStreamCreator):
hexlify(self.name), hexlify(self.key), hexlify(self.name), hexlify(self.name), hexlify(self.key), hexlify(self.name),
self.blob_infos self.blob_infos
) )
# generate the sd info # generate the sd info
self.sd_info = format_sd_info( self.sd_info = format_sd_info(
EncryptedFileStreamType, hexlify(self.name), hexlify(self.key), EncryptedFileStreamType, hexlify(self.name), hexlify(self.key),
hexlify(self.name), self.stream_hash, self.blob_infos hexlify(self.name), self.stream_hash, self.blob_infos
) )
# sanity check
validate_descriptor(self.sd_info)
return defer.succeed(self.stream_hash) return defer.succeed(self.stream_hash)

View file

@ -6,12 +6,12 @@ import logging
from twisted.internet import defer, task, reactor from twisted.internet import defer, task, reactor
from twisted.python.failure import Failure from twisted.python.failure import Failure
from lbrynet.core.Error import InvalidStreamDescriptorError
from lbrynet.reflector.reupload import reflect_stream from lbrynet.reflector.reupload import reflect_stream
from lbrynet.core.PaymentRateManager import NegotiatedPaymentRateManager from lbrynet.core.PaymentRateManager import NegotiatedPaymentRateManager
from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloader from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloader
from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloaderFactory from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloaderFactory
from lbrynet.core.StreamDescriptor import EncryptedFileStreamType, get_sd_info from lbrynet.core.StreamDescriptor import EncryptedFileStreamType, get_sd_info, validate_descriptor
from lbrynet.cryptstream.client.CryptStreamDownloader import AlreadyStoppedError from lbrynet.cryptstream.client.CryptStreamDownloader import AlreadyStoppedError
from lbrynet.cryptstream.client.CryptStreamDownloader import CurrentlyStoppingError from lbrynet.cryptstream.client.CryptStreamDownloader import CurrentlyStoppingError
from lbrynet.core.utils import safe_start_looping_call, safe_stop_looping_call from lbrynet.core.utils import safe_start_looping_call, safe_stop_looping_call
@ -113,13 +113,23 @@ class EncryptedFileManager(object):
file_info['suggested_file_name'] file_info['suggested_file_name']
) )
yield lbry_file.get_claim_info() yield lbry_file.get_claim_info()
try:
# verify the stream is valid (we might have downloaded an invalid stream
# in the past when the validation check didn't work)
stream_info = yield get_sd_info(self.storage, file_info['stream_hash'], include_blobs=True)
validate_descriptor(stream_info)
except InvalidStreamDescriptorError as err:
log.warning("Stream for descriptor %s is invalid (%s), cleaning it up",
lbry_file.sd_hash, err.message)
yield lbry_file.delete_data()
yield self.session.storage.delete_stream(lbry_file.stream_hash)
else:
try: try:
# restore will raise an Exception if status is unknown # restore will raise an Exception if status is unknown
lbry_file.restore(file_info['status']) lbry_file.restore(file_info['status'])
self.lbry_files.append(lbry_file) self.lbry_files.append(lbry_file)
except Exception: except Exception:
log.warning("Failed to start %i", file_info['rowid']) log.warning("Failed to start %i", file_info.get('rowid'))
continue
log.info("Started %i lbry files", len(self.lbry_files)) log.info("Started %i lbry files", len(self.lbry_files))
if self.auto_re_reflect is True: if self.auto_re_reflect is True:
safe_start_looping_call(self.lbry_file_reflector, self.auto_re_reflect_interval) safe_start_looping_call(self.lbry_file_reflector, self.auto_re_reflect_interval)