From e292abceee0c716cde6cbe213e40706dd1d8c81d Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 9 Feb 2017 13:58:56 -0500 Subject: [PATCH] convert EncryptedFileManager to use inlineCallbacks --- .../lbryfilemanager/EncryptedFileManager.py | 143 +++++++++--------- 1 file changed, 70 insertions(+), 73 deletions(-) diff --git a/lbrynet/lbryfilemanager/EncryptedFileManager.py b/lbrynet/lbryfilemanager/EncryptedFileManager.py index 7bf21c769..a34d8c760 100644 --- a/lbrynet/lbryfilemanager/EncryptedFileManager.py +++ b/lbrynet/lbryfilemanager/EncryptedFileManager.py @@ -69,6 +69,9 @@ class EncryptedFileManager(object): dl.addCallback(filter_failures) return dl + def save_sd_blob_hash_to_stream(self, stream_hash, sd_hash): + return self.stream_info_manager.save_sd_blob_hash_to_stream(stream_hash, sd_hash) + def _add_to_sd_identifier(self): downloader_factory = ManagedEncryptedFileDownloaderFactory(self) self.sd_identifier.add_stream_downloader_factory( @@ -94,67 +97,79 @@ class EncryptedFileManager(object): stream_hashes = yield self.stream_info_manager.get_all_streams() log.debug("Checking %s streams", len(stream_hashes)) - yield defer.DeferredList(list(_iter_streams(stream_hashes))) + check_streams = yield defer.DeferredList(list(_iter_streams(stream_hashes))) + defer.returnValue(check_streams) + + @defer.inlineCallbacks + def _restore_lbry_file(self, lbry_file): + try: + yield lbry_file.restore() + except Exception as err: + log.error("Failed to start stream: %s, error: %s", lbry_file.stream_hash, err) + self.lbry_files.remove(lbry_file) + # TODO: delete stream without claim instead of just removing from manager? @defer.inlineCallbacks def _start_lbry_files(self): - def set_options_and_restore(rowid, stream_hash, options): - b_prm = self.session.base_payment_rate_manager - payment_rate_manager = NegotiatedPaymentRateManager(b_prm, self.session.blob_tracker) - - d = self.start_lbry_file(rowid, stream_hash, payment_rate_manager, - blob_data_rate=options) - d.addCallback(lambda downloader: downloader.restore()) - return d - - def log_error(err, rowid, stream_hash, options): - log.error("An error occurred while starting a lbry file: %s", err.getErrorMessage()) - log.debug(rowid) - log.debug(stream_hash) - log.debug(options) - - def start_lbry_files(lbry_files_and_options): - for rowid, stream_hash, options in lbry_files_and_options: - d = set_options_and_restore(rowid, stream_hash, options) - d.addErrback(lambda err: log_error(err, rowid, stream_hash, options)) - log.info("Started %i lbry files", len(self.lbry_files)) - return True - + b_prm = self.session.base_payment_rate_manager + payment_rate_manager = NegotiatedPaymentRateManager(b_prm, self.session.blob_tracker) yield self._check_stream_info_manager() - files_and_options = yield self._get_all_lbry_files() - yield start_lbry_files(files_and_options) + lbry_files_and_options = yield self._get_all_lbry_files() + for rowid, stream_hash, options in lbry_files_and_options: + lbry_file = yield self.start_lbry_file(rowid, stream_hash, payment_rate_manager, + blob_data_rate=options) + d = self._restore_lbry_file(lbry_file) + log.debug("Started %s", lbry_file) + log.info("Started %i lbry files", len(self.lbry_files)) + defer.returnValue(True) + @defer.inlineCallbacks def start_lbry_file(self, rowid, stream_hash, payment_rate_manager, blob_data_rate=None, upload_allowed=True, download_directory=None, file_name=None): if not download_directory: download_directory = self.download_directory payment_rate_manager.min_blob_data_payment_rate = blob_data_rate - lbry_file_downloader = ManagedEncryptedFileDownloader(rowid, stream_hash, - self.session.peer_finder, - self.session.rate_limiter, - self.session.blob_manager, - self.stream_info_manager, self, - payment_rate_manager, self.session.wallet, - download_directory, - upload_allowed, - file_name=file_name) - self.lbry_files.append(lbry_file_downloader) - d = lbry_file_downloader.set_stream_info() - d.addCallback(lambda _: lbry_file_downloader) - return d + lbry_file = ManagedEncryptedFileDownloader(rowid, stream_hash, self.session.peer_finder, + self.session.rate_limiter, + self.session.blob_manager, + self.stream_info_manager, + self, payment_rate_manager, self.session.wallet, + download_directory, upload_allowed, + file_name=file_name) + self.lbry_files.append(lbry_file) + yield lbry_file.set_stream_info() + defer.returnValue(lbry_file) - def add_lbry_file(self, stream_hash, payment_rate_manager, - blob_data_rate=None, - upload_allowed=True, - download_directory=None, - file_name=None): - d = self._save_lbry_file(stream_hash, blob_data_rate) - d.addCallback( - lambda rowid: self.start_lbry_file( - rowid, stream_hash, payment_rate_manager, - blob_data_rate, upload_allowed, download_directory, file_name)) - return d + @defer.inlineCallbacks + def _stop_lbry_file(self, lbry_file): + def wait_for_finished(lbry_file, count=2): + if count or lbry_file.saving_status is not False: + return task.deferLater(reactor, 1, self._stop_lbry_file, lbry_file, count=count - 1) + try: + yield lbry_file.stop(change_status=False) + self.lbry_files.remove(lbry_file) + except CurrentlyStoppingError: + yield wait_for_finished(lbry_file) + except AlreadyStoppedError: + pass + finally: + defer.returnValue(None) + + def _stop_lbry_files(self): + log.info("Stopping %i lbry files", len(self.lbry_files)) + lbry_files = self.lbry_files + for lbry_file in lbry_files: + yield self._stop_lbry_file(lbry_file) + + @defer.inlineCallbacks + def add_lbry_file(self, stream_hash, payment_rate_manager, blob_data_rate=None, + upload_allowed=True, download_directory=None, file_name=None): + rowid = yield self._save_lbry_file(stream_hash, blob_data_rate) + lbry_file = yield self.start_lbry_file(rowid, stream_hash, payment_rate_manager, + blob_data_rate, upload_allowed, download_directory, + file_name) + defer.returnValue(lbry_file) def delete_lbry_file(self, lbry_file): for l in self.lbry_files: @@ -192,31 +207,13 @@ class EncryptedFileManager(object): else: return defer.fail(Failure(ValueError("Could not find that LBRY file"))) + @defer.inlineCallbacks def stop(self): - log.info('Stopping %s', self) - ds = [] - - def wait_for_finished(lbry_file, count=2): - if count <= 0 or lbry_file.saving_status is False: - return True - else: - return task.deferLater(reactor, 1, wait_for_finished, lbry_file, count=count - 1) - - def ignore_stopped(err, lbry_file): - err.trap(AlreadyStoppedError, CurrentlyStoppingError) - return wait_for_finished(lbry_file) - - for lbry_file in self.lbry_files: - d = lbry_file.stop(change_status=False) - d.addErrback(ignore_stopped, lbry_file) - ds.append(d) - dl = defer.DeferredList(ds) - - def close_db(): - self.db = None - - dl.addCallback(lambda _: close_db()) - return dl + yield defer.DeferredList(list(self._stop_lbry_files())) + yield self.sql_db.close() + self.sql_db = None + log.info("Stopped %s", self) + defer.returnValue(True) def get_count_for_stream_hash(self, stream_hash): return self._get_count_for_stream_hash(stream_hash)