fix streams in stream_info_manager not being loaded by lbry_file_manager

This commit is contained in:
Jack Robison 2017-02-07 16:10:58 -05:00
parent c527c32e5f
commit bfc02dd3e5
2 changed files with 59 additions and 6 deletions

View file

@ -98,3 +98,9 @@ class InvalidAuthenticationToken(Exception):
class NegotiationError(Exception): class NegotiationError(Exception):
pass pass
class MissingLBRYFile(Exception):
"""
Raised by lbry file manager if stream hash has no associated lbry file
"""

View file

@ -10,6 +10,7 @@ from twisted.internet import defer, task, reactor
from twisted.python.failure import Failure from twisted.python.failure import Failure
from lbrynet.core.PaymentRateManager import NegotiatedPaymentRateManager from lbrynet.core.PaymentRateManager import NegotiatedPaymentRateManager
from lbrynet.core.Error import MissingLBRYFile
from lbrynet.lbryfilemanager.EncryptedFileDownloader import ManagedEncryptedFileDownloader from lbrynet.lbryfilemanager.EncryptedFileDownloader import ManagedEncryptedFileDownloader
from lbrynet.lbryfilemanager.EncryptedFileDownloader import ManagedEncryptedFileDownloaderFactory from lbrynet.lbryfilemanager.EncryptedFileDownloader import ManagedEncryptedFileDownloaderFactory
from lbrynet.lbryfile.StreamDescriptor import EncryptedFileStreamType from lbrynet.lbryfile.StreamDescriptor import EncryptedFileStreamType
@ -74,12 +75,49 @@ class EncryptedFileManager(object):
self.sd_identifier.add_stream_downloader_factory( self.sd_identifier.add_stream_downloader_factory(
EncryptedFileStreamType, downloader_factory) EncryptedFileStreamType, downloader_factory)
def _start_lbry_files(self): def _check_stream_is_managed(self, stream_hash):
# check that all the streams in the stream_info_manager are also
# tracked by lbry_file_manager and fix any streams that aren't.
def _check_rowid_result(rowid):
if rowid is not None:
return defer.succeed(True)
raise MissingLBRYFile
def _fix_missing_file(err):
if err.check(MissingLBRYFile):
rate = self.session.base_payment_rate_manager.min_blob_data_payment_rate
d = self.stream_info_manager.get_stream_info(stream_hash)
d.addCallback(lambda info: log.warning("Trying to fix missing lbry file for %s",
info[1].decode('hex')))
d.addCallback(lambda _: self._save_lbry_file(stream_hash, rate))
return d
d = self._get_rowid_for_stream_hash(stream_hash)
d.addCallback(_check_rowid_result)
d.addErrback(_fix_missing_file)
return d
def _check_stream_info_manager(self):
def _iter_streams(stream_hashes):
for stream_hash in stream_hashes:
d = self._check_stream_is_managed(stream_hash)
yield d
def check_streams(stream_hashes):
log.debug("Checking %s streams", len(stream_hashes))
dl = defer.DeferredList(list(_iter_streams(stream_hashes)))
dl.addCallback(lambda r: [x[1] for x in r if x[0]])
return dl
d = self.stream_info_manager.get_all_streams()
d.addCallback(check_streams)
return d
def _start_lbry_files(self):
def set_options_and_restore(rowid, stream_hash, options): def set_options_and_restore(rowid, stream_hash, options):
payment_rate_manager = NegotiatedPaymentRateManager( b_prm = self.session.base_payment_rate_manager
self.session.base_payment_rate_manager, payment_rate_manager = NegotiatedPaymentRateManager(b_prm, self.session.blob_tracker)
self.session.blob_tracker)
d = self.start_lbry_file(rowid, stream_hash, payment_rate_manager, d = self.start_lbry_file(rowid, stream_hash, payment_rate_manager,
blob_data_rate=options) blob_data_rate=options)
d.addCallback(lambda downloader: downloader.restore()) d.addCallback(lambda downloader: downloader.restore())
@ -95,9 +133,11 @@ class EncryptedFileManager(object):
for rowid, stream_hash, options in lbry_files_and_options: for rowid, stream_hash, options in lbry_files_and_options:
d = set_options_and_restore(rowid, stream_hash, options) d = set_options_and_restore(rowid, stream_hash, options)
d.addErrback(lambda err: log_error(err, rowid, stream_hash, options)) d.addErrback(lambda err: log_error(err, rowid, stream_hash, options))
log.info("Started %i lbry files", len(self.lbry_files))
return True return True
d = self._get_all_lbry_files() d = self._check_stream_info_manager()
d.addCallback(lambda _: self._get_all_lbry_files())
d.addCallback(start_lbry_files) d.addCallback(start_lbry_files)
return d return d
@ -254,10 +294,17 @@ class EncryptedFileManager(object):
d = self.sql_db.runQuery("select status from lbry_file_options where rowid = ?", d = self.sql_db.runQuery("select status from lbry_file_options where rowid = ?",
(rowid,)) (rowid,))
d.addCallback(lambda r: ( d.addCallback(lambda r: (
r[0][0] if len(r) else ManagedEncryptedFileDownloader.STATUS_STOPPED)) r[0][0] if len(r) else None))
return d return d
@rerun_if_locked @rerun_if_locked
def _get_count_for_stream_hash(self, stream_hash): def _get_count_for_stream_hash(self, stream_hash):
return self.sql_db.runQuery("select count(*) from lbry_file_options where stream_hash = ?", return self.sql_db.runQuery("select count(*) from lbry_file_options where stream_hash = ?",
(stream_hash,)) (stream_hash,))
@rerun_if_locked
def _get_rowid_for_stream_hash(self, stream_hash):
d = self.sql_db.runQuery("select rowid from lbry_file_options where stream_hash = ?",
(stream_hash,))
d.addCallback(lambda r: (r[0][0] if len(r) else None))
return d