From bfc02dd3e5d1cc372902dda550e111ce511b46c1 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Tue, 7 Feb 2017 16:10:58 -0500 Subject: [PATCH 1/3] fix streams in stream_info_manager not being loaded by lbry_file_manager --- lbrynet/core/Error.py | 6 ++ .../lbryfilemanager/EncryptedFileManager.py | 59 +++++++++++++++++-- 2 files changed, 59 insertions(+), 6 deletions(-) diff --git a/lbrynet/core/Error.py b/lbrynet/core/Error.py index 36e20151a..4be3584a3 100644 --- a/lbrynet/core/Error.py +++ b/lbrynet/core/Error.py @@ -98,3 +98,9 @@ class InvalidAuthenticationToken(Exception): class NegotiationError(Exception): pass + + +class MissingLBRYFile(Exception): + """ + Raised by lbry file manager if stream hash has no associated lbry file + """ \ No newline at end of file diff --git a/lbrynet/lbryfilemanager/EncryptedFileManager.py b/lbrynet/lbryfilemanager/EncryptedFileManager.py index bebf115fe..28ee10374 100644 --- a/lbrynet/lbryfilemanager/EncryptedFileManager.py +++ b/lbrynet/lbryfilemanager/EncryptedFileManager.py @@ -10,6 +10,7 @@ from twisted.internet import defer, task, reactor from twisted.python.failure import Failure from lbrynet.core.PaymentRateManager import NegotiatedPaymentRateManager +from lbrynet.core.Error import MissingLBRYFile from lbrynet.lbryfilemanager.EncryptedFileDownloader import ManagedEncryptedFileDownloader from lbrynet.lbryfilemanager.EncryptedFileDownloader import ManagedEncryptedFileDownloaderFactory from lbrynet.lbryfile.StreamDescriptor import EncryptedFileStreamType @@ -74,12 +75,49 @@ class EncryptedFileManager(object): self.sd_identifier.add_stream_downloader_factory( EncryptedFileStreamType, downloader_factory) - def _start_lbry_files(self): + def _check_stream_is_managed(self, stream_hash): + # check that all the streams in the stream_info_manager are also + # tracked by lbry_file_manager and fix any streams that aren't. + def _check_rowid_result(rowid): + if rowid is not None: + return defer.succeed(True) + raise MissingLBRYFile + def _fix_missing_file(err): + if err.check(MissingLBRYFile): + rate = self.session.base_payment_rate_manager.min_blob_data_payment_rate + d = self.stream_info_manager.get_stream_info(stream_hash) + d.addCallback(lambda info: log.warning("Trying to fix missing lbry file for %s", + info[1].decode('hex'))) + d.addCallback(lambda _: self._save_lbry_file(stream_hash, rate)) + return d + + d = self._get_rowid_for_stream_hash(stream_hash) + d.addCallback(_check_rowid_result) + d.addErrback(_fix_missing_file) + return d + + def _check_stream_info_manager(self): + def _iter_streams(stream_hashes): + for stream_hash in stream_hashes: + d = self._check_stream_is_managed(stream_hash) + yield d + + def check_streams(stream_hashes): + log.debug("Checking %s streams", len(stream_hashes)) + dl = defer.DeferredList(list(_iter_streams(stream_hashes))) + dl.addCallback(lambda r: [x[1] for x in r if x[0]]) + return dl + + d = self.stream_info_manager.get_all_streams() + d.addCallback(check_streams) + return d + + def _start_lbry_files(self): def set_options_and_restore(rowid, stream_hash, options): - payment_rate_manager = NegotiatedPaymentRateManager( - self.session.base_payment_rate_manager, - self.session.blob_tracker) + b_prm = self.session.base_payment_rate_manager + payment_rate_manager = NegotiatedPaymentRateManager(b_prm, self.session.blob_tracker) + d = self.start_lbry_file(rowid, stream_hash, payment_rate_manager, blob_data_rate=options) d.addCallback(lambda downloader: downloader.restore()) @@ -95,9 +133,11 @@ class EncryptedFileManager(object): for rowid, stream_hash, options in lbry_files_and_options: d = set_options_and_restore(rowid, stream_hash, options) d.addErrback(lambda err: log_error(err, rowid, stream_hash, options)) + log.info("Started %i lbry files", len(self.lbry_files)) return True - d = self._get_all_lbry_files() + d = self._check_stream_info_manager() + d.addCallback(lambda _: self._get_all_lbry_files()) d.addCallback(start_lbry_files) return d @@ -254,10 +294,17 @@ class EncryptedFileManager(object): d = self.sql_db.runQuery("select status from lbry_file_options where rowid = ?", (rowid,)) d.addCallback(lambda r: ( - r[0][0] if len(r) else ManagedEncryptedFileDownloader.STATUS_STOPPED)) + r[0][0] if len(r) else None)) return d @rerun_if_locked def _get_count_for_stream_hash(self, stream_hash): return self.sql_db.runQuery("select count(*) from lbry_file_options where stream_hash = ?", (stream_hash,)) + + @rerun_if_locked + def _get_rowid_for_stream_hash(self, stream_hash): + d = self.sql_db.runQuery("select rowid from lbry_file_options where stream_hash = ?", + (stream_hash,)) + d.addCallback(lambda r: (r[0][0] if len(r) else None)) + return d \ No newline at end of file From ecac2ae2ca27f622f70f738688f7cf20b5ffb8c0 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Wed, 8 Feb 2017 17:03:53 -0500 Subject: [PATCH 2/3] update changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index a61198fc6..3a04bbc5b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ at anytime. ### Fixed * fix recursion depth error upon failed blob * call stopProducing in reflector client file_sender when uploading is done + * ensure streams in stream_info_manager are saved in lbry_file_manager ## [0.8.1] - 2017-02-01 ### Changed From 7e2456fa69b846cb2a087d54678acada6164326f Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Wed, 8 Feb 2017 19:03:35 -0500 Subject: [PATCH 3/3] convert _setup, _check_stream_is_managed, _check_stream_info_manager, and _start_lbry_files to inlineCallbacks remove unused MissingLBRYFile exception --- lbrynet/core/Error.py | 6 -- .../lbryfilemanager/EncryptedFileManager.py | 64 +++++++------------ 2 files changed, 23 insertions(+), 47 deletions(-) diff --git a/lbrynet/core/Error.py b/lbrynet/core/Error.py index 4be3584a3..36e20151a 100644 --- a/lbrynet/core/Error.py +++ b/lbrynet/core/Error.py @@ -98,9 +98,3 @@ class InvalidAuthenticationToken(Exception): class NegotiationError(Exception): pass - - -class MissingLBRYFile(Exception): - """ - Raised by lbry file manager if stream hash has no associated lbry file - """ \ No newline at end of file diff --git a/lbrynet/lbryfilemanager/EncryptedFileManager.py b/lbrynet/lbryfilemanager/EncryptedFileManager.py index 28ee10374..cfb4b679a 100644 --- a/lbrynet/lbryfilemanager/EncryptedFileManager.py +++ b/lbrynet/lbryfilemanager/EncryptedFileManager.py @@ -10,7 +10,6 @@ from twisted.internet import defer, task, reactor from twisted.python.failure import Failure from lbrynet.core.PaymentRateManager import NegotiatedPaymentRateManager -from lbrynet.core.Error import MissingLBRYFile from lbrynet.lbryfilemanager.EncryptedFileDownloader import ManagedEncryptedFileDownloader from lbrynet.lbryfilemanager.EncryptedFileDownloader import ManagedEncryptedFileDownloaderFactory from lbrynet.lbryfile.StreamDescriptor import EncryptedFileStreamType @@ -40,11 +39,11 @@ class EncryptedFileManager(object): self.download_directory = os.getcwd() log.debug("Download directory for EncryptedFileManager: %s", str(self.download_directory)) + @defer.inlineCallbacks def setup(self): - d = self._open_db() - d.addCallback(lambda _: self._add_to_sd_identifier()) - d.addCallback(lambda _: self._start_lbry_files()) - return d + yield self._open_db() + yield self._add_to_sd_identifier() + yield self._start_lbry_files() def get_lbry_file_status(self, lbry_file): return self._get_lbry_file_status(lbry_file.rowid) @@ -75,44 +74,29 @@ class EncryptedFileManager(object): self.sd_identifier.add_stream_downloader_factory( EncryptedFileStreamType, downloader_factory) + @defer.inlineCallbacks def _check_stream_is_managed(self, stream_hash): # check that all the streams in the stream_info_manager are also # tracked by lbry_file_manager and fix any streams that aren't. - def _check_rowid_result(rowid): - if rowid is not None: - return defer.succeed(True) - raise MissingLBRYFile - - def _fix_missing_file(err): - if err.check(MissingLBRYFile): - rate = self.session.base_payment_rate_manager.min_blob_data_payment_rate - d = self.stream_info_manager.get_stream_info(stream_hash) - d.addCallback(lambda info: log.warning("Trying to fix missing lbry file for %s", - info[1].decode('hex'))) - d.addCallback(lambda _: self._save_lbry_file(stream_hash, rate)) - return d - - d = self._get_rowid_for_stream_hash(stream_hash) - d.addCallback(_check_rowid_result) - d.addErrback(_fix_missing_file) - return d + rowid = yield self._get_rowid_for_stream_hash(stream_hash) + if rowid is not None: + defer.returnValue(True) + rate = self.session.base_payment_rate_manager.min_blob_data_payment_rate + key, stream_name, file_name = yield self.stream_info_manager.get_stream_info(stream_hash) + log.warning("Trying to fix missing lbry file for %s", stream_name.decode('hex')) + yield self._save_lbry_file(stream_hash, rate) + @defer.inlineCallbacks def _check_stream_info_manager(self): def _iter_streams(stream_hashes): for stream_hash in stream_hashes: - d = self._check_stream_is_managed(stream_hash) - yield d + yield self._check_stream_is_managed(stream_hash) - def check_streams(stream_hashes): - log.debug("Checking %s streams", len(stream_hashes)) - dl = defer.DeferredList(list(_iter_streams(stream_hashes))) - dl.addCallback(lambda r: [x[1] for x in r if x[0]]) - return dl - - d = self.stream_info_manager.get_all_streams() - d.addCallback(check_streams) - return d + stream_hashes = yield self.stream_info_manager.get_all_streams() + log.debug("Checking %s streams", len(stream_hashes)) + yield defer.DeferredList(list(_iter_streams(stream_hashes))) + @defer.inlineCallbacks def _start_lbry_files(self): def set_options_and_restore(rowid, stream_hash, options): b_prm = self.session.base_payment_rate_manager @@ -136,10 +120,9 @@ class EncryptedFileManager(object): log.info("Started %i lbry files", len(self.lbry_files)) return True - d = self._check_stream_info_manager() - d.addCallback(lambda _: self._get_all_lbry_files()) - d.addCallback(start_lbry_files) - return d + yield self._check_stream_info_manager() + files_and_options = yield self._get_all_lbry_files() + yield start_lbry_files(files_and_options) def start_lbry_file(self, rowid, stream_hash, payment_rate_manager, blob_data_rate=None, upload_allowed=True, @@ -293,8 +276,7 @@ class EncryptedFileManager(object): def _get_lbry_file_status(self, rowid): d = self.sql_db.runQuery("select status from lbry_file_options where rowid = ?", (rowid,)) - d.addCallback(lambda r: ( - r[0][0] if len(r) else None)) + d.addCallback(lambda r: (r[0][0] if len(r) else None)) return d @rerun_if_locked @@ -307,4 +289,4 @@ class EncryptedFileManager(object): d = self.sql_db.runQuery("select rowid from lbry_file_options where stream_hash = ?", (stream_hash,)) d.addCallback(lambda r: (r[0][0] if len(r) else None)) - return d \ No newline at end of file + return d