diff --git a/CHANGELOG.md b/CHANGELOG.md index a61198fc6..3a04bbc5b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ at anytime. ### Fixed * fix recursion depth error upon failed blob * call stopProducing in reflector client file_sender when uploading is done + * ensure streams in stream_info_manager are saved in lbry_file_manager ## [0.8.1] - 2017-02-01 ### Changed diff --git a/lbrynet/lbryfilemanager/EncryptedFileManager.py b/lbrynet/lbryfilemanager/EncryptedFileManager.py index bebf115fe..cfb4b679a 100644 --- a/lbrynet/lbryfilemanager/EncryptedFileManager.py +++ b/lbrynet/lbryfilemanager/EncryptedFileManager.py @@ -39,11 +39,11 @@ class EncryptedFileManager(object): self.download_directory = os.getcwd() log.debug("Download directory for EncryptedFileManager: %s", str(self.download_directory)) + @defer.inlineCallbacks def setup(self): - d = self._open_db() - d.addCallback(lambda _: self._add_to_sd_identifier()) - d.addCallback(lambda _: self._start_lbry_files()) - return d + yield self._open_db() + yield self._add_to_sd_identifier() + yield self._start_lbry_files() def get_lbry_file_status(self, lbry_file): return self._get_lbry_file_status(lbry_file.rowid) @@ -74,12 +74,34 @@ class EncryptedFileManager(object): self.sd_identifier.add_stream_downloader_factory( EncryptedFileStreamType, downloader_factory) - def _start_lbry_files(self): + @defer.inlineCallbacks + def _check_stream_is_managed(self, stream_hash): + # check that all the streams in the stream_info_manager are also + # tracked by lbry_file_manager and fix any streams that aren't. + rowid = yield self._get_rowid_for_stream_hash(stream_hash) + if rowid is not None: + defer.returnValue(True) + rate = self.session.base_payment_rate_manager.min_blob_data_payment_rate + key, stream_name, file_name = yield self.stream_info_manager.get_stream_info(stream_hash) + log.warning("Trying to fix missing lbry file for %s", stream_name.decode('hex')) + yield self._save_lbry_file(stream_hash, rate) + @defer.inlineCallbacks + def _check_stream_info_manager(self): + def _iter_streams(stream_hashes): + for stream_hash in stream_hashes: + yield self._check_stream_is_managed(stream_hash) + + stream_hashes = yield self.stream_info_manager.get_all_streams() + log.debug("Checking %s streams", len(stream_hashes)) + yield defer.DeferredList(list(_iter_streams(stream_hashes))) + + @defer.inlineCallbacks + def _start_lbry_files(self): def set_options_and_restore(rowid, stream_hash, options): - payment_rate_manager = NegotiatedPaymentRateManager( - self.session.base_payment_rate_manager, - self.session.blob_tracker) + b_prm = self.session.base_payment_rate_manager + payment_rate_manager = NegotiatedPaymentRateManager(b_prm, self.session.blob_tracker) + d = self.start_lbry_file(rowid, stream_hash, payment_rate_manager, blob_data_rate=options) d.addCallback(lambda downloader: downloader.restore()) @@ -95,11 +117,12 @@ class EncryptedFileManager(object): for rowid, stream_hash, options in lbry_files_and_options: d = set_options_and_restore(rowid, stream_hash, options) d.addErrback(lambda err: log_error(err, rowid, stream_hash, options)) + log.info("Started %i lbry files", len(self.lbry_files)) return True - d = self._get_all_lbry_files() - d.addCallback(start_lbry_files) - return d + yield self._check_stream_info_manager() + files_and_options = yield self._get_all_lbry_files() + yield start_lbry_files(files_and_options) def start_lbry_file(self, rowid, stream_hash, payment_rate_manager, blob_data_rate=None, upload_allowed=True, @@ -253,11 +276,17 @@ class EncryptedFileManager(object): def _get_lbry_file_status(self, rowid): d = self.sql_db.runQuery("select status from lbry_file_options where rowid = ?", (rowid,)) - d.addCallback(lambda r: ( - r[0][0] if len(r) else ManagedEncryptedFileDownloader.STATUS_STOPPED)) + d.addCallback(lambda r: (r[0][0] if len(r) else None)) return d @rerun_if_locked def _get_count_for_stream_hash(self, stream_hash): return self.sql_db.runQuery("select count(*) from lbry_file_options where stream_hash = ?", (stream_hash,)) + + @rerun_if_locked + def _get_rowid_for_stream_hash(self, stream_hash): + d = self.sql_db.runQuery("select rowid from lbry_file_options where stream_hash = ?", + (stream_hash,)) + d.addCallback(lambda r: (r[0][0] if len(r) else None)) + return d