Merge pull request #466 from lbryio/fix-untracked-lbry-streams

fix streams in stream_info_manager not being loaded by lbry_file_manager
This commit is contained in:
Jack Robison 2017-02-08 19:47:03 -05:00 committed by GitHub
commit d1aa69bfa3
2 changed files with 43 additions and 13 deletions

View file

@ -13,6 +13,7 @@ at anytime.
### Fixed
* fix recursion depth error upon failed blob
* call stopProducing in reflector client file_sender when uploading is done
* ensure streams in stream_info_manager are saved in lbry_file_manager
## [0.8.1] - 2017-02-01
### Changed

View file

@ -39,11 +39,11 @@ class EncryptedFileManager(object):
self.download_directory = os.getcwd()
log.debug("Download directory for EncryptedFileManager: %s", str(self.download_directory))
@defer.inlineCallbacks
def setup(self):
d = self._open_db()
d.addCallback(lambda _: self._add_to_sd_identifier())
d.addCallback(lambda _: self._start_lbry_files())
return d
yield self._open_db()
yield self._add_to_sd_identifier()
yield self._start_lbry_files()
def get_lbry_file_status(self, lbry_file):
return self._get_lbry_file_status(lbry_file.rowid)
@ -74,12 +74,34 @@ class EncryptedFileManager(object):
self.sd_identifier.add_stream_downloader_factory(
EncryptedFileStreamType, downloader_factory)
def _start_lbry_files(self):
@defer.inlineCallbacks
def _check_stream_is_managed(self, stream_hash):
# check that all the streams in the stream_info_manager are also
# tracked by lbry_file_manager and fix any streams that aren't.
rowid = yield self._get_rowid_for_stream_hash(stream_hash)
if rowid is not None:
defer.returnValue(True)
rate = self.session.base_payment_rate_manager.min_blob_data_payment_rate
key, stream_name, file_name = yield self.stream_info_manager.get_stream_info(stream_hash)
log.warning("Trying to fix missing lbry file for %s", stream_name.decode('hex'))
yield self._save_lbry_file(stream_hash, rate)
@defer.inlineCallbacks
def _check_stream_info_manager(self):
def _iter_streams(stream_hashes):
for stream_hash in stream_hashes:
yield self._check_stream_is_managed(stream_hash)
stream_hashes = yield self.stream_info_manager.get_all_streams()
log.debug("Checking %s streams", len(stream_hashes))
yield defer.DeferredList(list(_iter_streams(stream_hashes)))
@defer.inlineCallbacks
def _start_lbry_files(self):
def set_options_and_restore(rowid, stream_hash, options):
payment_rate_manager = NegotiatedPaymentRateManager(
self.session.base_payment_rate_manager,
self.session.blob_tracker)
b_prm = self.session.base_payment_rate_manager
payment_rate_manager = NegotiatedPaymentRateManager(b_prm, self.session.blob_tracker)
d = self.start_lbry_file(rowid, stream_hash, payment_rate_manager,
blob_data_rate=options)
d.addCallback(lambda downloader: downloader.restore())
@ -95,11 +117,12 @@ class EncryptedFileManager(object):
for rowid, stream_hash, options in lbry_files_and_options:
d = set_options_and_restore(rowid, stream_hash, options)
d.addErrback(lambda err: log_error(err, rowid, stream_hash, options))
log.info("Started %i lbry files", len(self.lbry_files))
return True
d = self._get_all_lbry_files()
d.addCallback(start_lbry_files)
return d
yield self._check_stream_info_manager()
files_and_options = yield self._get_all_lbry_files()
yield start_lbry_files(files_and_options)
def start_lbry_file(self, rowid, stream_hash,
payment_rate_manager, blob_data_rate=None, upload_allowed=True,
@ -253,11 +276,17 @@ class EncryptedFileManager(object):
def _get_lbry_file_status(self, rowid):
d = self.sql_db.runQuery("select status from lbry_file_options where rowid = ?",
(rowid,))
d.addCallback(lambda r: (
r[0][0] if len(r) else ManagedEncryptedFileDownloader.STATUS_STOPPED))
d.addCallback(lambda r: (r[0][0] if len(r) else None))
return d
@rerun_if_locked
def _get_count_for_stream_hash(self, stream_hash):
return self.sql_db.runQuery("select count(*) from lbry_file_options where stream_hash = ?",
(stream_hash,))
@rerun_if_locked
def _get_rowid_for_stream_hash(self, stream_hash):
d = self.sql_db.runQuery("select rowid from lbry_file_options where stream_hash = ?",
(stream_hash,))
d.addCallback(lambda r: (r[0][0] if len(r) else None))
return d