convert EncryptedFileManager to use inlineCallbacks

This commit is contained in:
Jack Robison 2017-02-09 13:58:56 -05:00
parent 8de6bd7c7a
commit e292abceee

View file

@ -69,6 +69,9 @@ class EncryptedFileManager(object):
dl.addCallback(filter_failures) dl.addCallback(filter_failures)
return dl return dl
def save_sd_blob_hash_to_stream(self, stream_hash, sd_hash):
return self.stream_info_manager.save_sd_blob_hash_to_stream(stream_hash, sd_hash)
def _add_to_sd_identifier(self): def _add_to_sd_identifier(self):
downloader_factory = ManagedEncryptedFileDownloaderFactory(self) downloader_factory = ManagedEncryptedFileDownloaderFactory(self)
self.sd_identifier.add_stream_downloader_factory( self.sd_identifier.add_stream_downloader_factory(
@ -94,67 +97,79 @@ class EncryptedFileManager(object):
stream_hashes = yield self.stream_info_manager.get_all_streams() stream_hashes = yield self.stream_info_manager.get_all_streams()
log.debug("Checking %s streams", len(stream_hashes)) log.debug("Checking %s streams", len(stream_hashes))
yield defer.DeferredList(list(_iter_streams(stream_hashes))) check_streams = yield defer.DeferredList(list(_iter_streams(stream_hashes)))
defer.returnValue(check_streams)
@defer.inlineCallbacks
def _restore_lbry_file(self, lbry_file):
try:
yield lbry_file.restore()
except Exception as err:
log.error("Failed to start stream: %s, error: %s", lbry_file.stream_hash, err)
self.lbry_files.remove(lbry_file)
# TODO: delete stream without claim instead of just removing from manager?
@defer.inlineCallbacks @defer.inlineCallbacks
def _start_lbry_files(self): def _start_lbry_files(self):
def set_options_and_restore(rowid, stream_hash, options): b_prm = self.session.base_payment_rate_manager
b_prm = self.session.base_payment_rate_manager payment_rate_manager = NegotiatedPaymentRateManager(b_prm, self.session.blob_tracker)
payment_rate_manager = NegotiatedPaymentRateManager(b_prm, self.session.blob_tracker)
d = self.start_lbry_file(rowid, stream_hash, payment_rate_manager,
blob_data_rate=options)
d.addCallback(lambda downloader: downloader.restore())
return d
def log_error(err, rowid, stream_hash, options):
log.error("An error occurred while starting a lbry file: %s", err.getErrorMessage())
log.debug(rowid)
log.debug(stream_hash)
log.debug(options)
def start_lbry_files(lbry_files_and_options):
for rowid, stream_hash, options in lbry_files_and_options:
d = set_options_and_restore(rowid, stream_hash, options)
d.addErrback(lambda err: log_error(err, rowid, stream_hash, options))
log.info("Started %i lbry files", len(self.lbry_files))
return True
yield self._check_stream_info_manager() yield self._check_stream_info_manager()
files_and_options = yield self._get_all_lbry_files() lbry_files_and_options = yield self._get_all_lbry_files()
yield start_lbry_files(files_and_options) for rowid, stream_hash, options in lbry_files_and_options:
lbry_file = yield self.start_lbry_file(rowid, stream_hash, payment_rate_manager,
blob_data_rate=options)
d = self._restore_lbry_file(lbry_file)
log.debug("Started %s", lbry_file)
log.info("Started %i lbry files", len(self.lbry_files))
defer.returnValue(True)
@defer.inlineCallbacks
def start_lbry_file(self, rowid, stream_hash, def start_lbry_file(self, rowid, stream_hash,
payment_rate_manager, blob_data_rate=None, upload_allowed=True, payment_rate_manager, blob_data_rate=None, upload_allowed=True,
download_directory=None, file_name=None): download_directory=None, file_name=None):
if not download_directory: if not download_directory:
download_directory = self.download_directory download_directory = self.download_directory
payment_rate_manager.min_blob_data_payment_rate = blob_data_rate payment_rate_manager.min_blob_data_payment_rate = blob_data_rate
lbry_file_downloader = ManagedEncryptedFileDownloader(rowid, stream_hash, lbry_file = ManagedEncryptedFileDownloader(rowid, stream_hash, self.session.peer_finder,
self.session.peer_finder, self.session.rate_limiter,
self.session.rate_limiter, self.session.blob_manager,
self.session.blob_manager, self.stream_info_manager,
self.stream_info_manager, self, self, payment_rate_manager, self.session.wallet,
payment_rate_manager, self.session.wallet, download_directory, upload_allowed,
download_directory, file_name=file_name)
upload_allowed, self.lbry_files.append(lbry_file)
file_name=file_name) yield lbry_file.set_stream_info()
self.lbry_files.append(lbry_file_downloader) defer.returnValue(lbry_file)
d = lbry_file_downloader.set_stream_info()
d.addCallback(lambda _: lbry_file_downloader)
return d
def add_lbry_file(self, stream_hash, payment_rate_manager, @defer.inlineCallbacks
blob_data_rate=None, def _stop_lbry_file(self, lbry_file):
upload_allowed=True, def wait_for_finished(lbry_file, count=2):
download_directory=None, if count or lbry_file.saving_status is not False:
file_name=None): return task.deferLater(reactor, 1, self._stop_lbry_file, lbry_file, count=count - 1)
d = self._save_lbry_file(stream_hash, blob_data_rate) try:
d.addCallback( yield lbry_file.stop(change_status=False)
lambda rowid: self.start_lbry_file( self.lbry_files.remove(lbry_file)
rowid, stream_hash, payment_rate_manager, except CurrentlyStoppingError:
blob_data_rate, upload_allowed, download_directory, file_name)) yield wait_for_finished(lbry_file)
return d except AlreadyStoppedError:
pass
finally:
defer.returnValue(None)
def _stop_lbry_files(self):
log.info("Stopping %i lbry files", len(self.lbry_files))
lbry_files = self.lbry_files
for lbry_file in lbry_files:
yield self._stop_lbry_file(lbry_file)
@defer.inlineCallbacks
def add_lbry_file(self, stream_hash, payment_rate_manager, blob_data_rate=None,
upload_allowed=True, download_directory=None, file_name=None):
rowid = yield self._save_lbry_file(stream_hash, blob_data_rate)
lbry_file = yield self.start_lbry_file(rowid, stream_hash, payment_rate_manager,
blob_data_rate, upload_allowed, download_directory,
file_name)
defer.returnValue(lbry_file)
def delete_lbry_file(self, lbry_file): def delete_lbry_file(self, lbry_file):
for l in self.lbry_files: for l in self.lbry_files:
@ -192,31 +207,13 @@ class EncryptedFileManager(object):
else: else:
return defer.fail(Failure(ValueError("Could not find that LBRY file"))) return defer.fail(Failure(ValueError("Could not find that LBRY file")))
@defer.inlineCallbacks
def stop(self): def stop(self):
log.info('Stopping %s', self) yield defer.DeferredList(list(self._stop_lbry_files()))
ds = [] yield self.sql_db.close()
self.sql_db = None
def wait_for_finished(lbry_file, count=2): log.info("Stopped %s", self)
if count <= 0 or lbry_file.saving_status is False: defer.returnValue(True)
return True
else:
return task.deferLater(reactor, 1, wait_for_finished, lbry_file, count=count - 1)
def ignore_stopped(err, lbry_file):
err.trap(AlreadyStoppedError, CurrentlyStoppingError)
return wait_for_finished(lbry_file)
for lbry_file in self.lbry_files:
d = lbry_file.stop(change_status=False)
d.addErrback(ignore_stopped, lbry_file)
ds.append(d)
dl = defer.DeferredList(ds)
def close_db():
self.db = None
dl.addCallback(lambda _: close_db())
return dl
def get_count_for_stream_hash(self, stream_hash): def get_count_for_stream_hash(self, stream_hash):
return self._get_count_for_stream_hash(stream_hash) return self._get_count_for_stream_hash(stream_hash)