From 76de605b7f90d54f67cf707208032ce3d730f1a3 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Fri, 29 Dec 2017 14:10:16 -0500 Subject: [PATCH] refactor start_lbry_files --- .../file_manager/EncryptedFileDownloader.py | 10 +-- lbrynet/file_manager/EncryptedFileManager.py | 81 +++++++++---------- .../lbry_file/EncryptedFileMetadataManager.py | 31 ++++++- 3 files changed, 68 insertions(+), 54 deletions(-) diff --git a/lbrynet/file_manager/EncryptedFileDownloader.py b/lbrynet/file_manager/EncryptedFileDownloader.py index 76fb0e24a..f95757875 100644 --- a/lbrynet/file_manager/EncryptedFileDownloader.py +++ b/lbrynet/file_manager/EncryptedFileDownloader.py @@ -54,22 +54,16 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver): def saving_status(self): return self._saving_status - @defer.inlineCallbacks - def restore(self): - - status = yield self.lbry_file_manager.get_lbry_file_status(self) - log_status(self.sd_hash, status) - + def restore(self, status): if status == ManagedEncryptedFileDownloader.STATUS_RUNNING: # start returns self.finished_deferred # which fires when we've finished downloading the file # and we don't want to wait for the entire download self.start() elif status == ManagedEncryptedFileDownloader.STATUS_STOPPED: - defer.returnValue(False) + pass elif status == ManagedEncryptedFileDownloader.STATUS_FINISHED: self.completed = True - defer.returnValue(True) else: raise Exception("Unknown status for stream %s: %s" % (self.stream_hash, status)) diff --git a/lbrynet/file_manager/EncryptedFileManager.py b/lbrynet/file_manager/EncryptedFileManager.py index 5c6f2a349..02cb5f8b1 100644 --- a/lbrynet/file_manager/EncryptedFileManager.py +++ b/lbrynet/file_manager/EncryptedFileManager.py @@ -50,8 +50,7 @@ class EncryptedFileManager(object): def setup(self): yield self.stream_info_manager.setup() yield self._add_to_sd_identifier() - # don't block on starting the lbry files - self._start_lbry_files() + yield self._start_lbry_files() log.info("Started file manager") def get_lbry_file_status(self, lbry_file): @@ -86,53 +85,49 @@ class EncryptedFileManager(object): self.sd_identifier.add_stream_downloader_factory( EncryptedFileStreamType, downloader_factory) - @defer.inlineCallbacks - def _check_stream_is_managed(self, stream_hash): - # check that all the streams in the stream_info_manager are also - # tracked by lbry_file_manager and fix any streams that aren't. - rowid = yield self._get_rowid_for_stream_hash(stream_hash) - if rowid is not None: - defer.returnValue(True) - rate = self.session.base_payment_rate_manager.min_blob_data_payment_rate - key, stream_name, file_name = yield self.stream_info_manager.get_stream_info(stream_hash) - log.warning("Trying to fix missing lbry file for %s", stream_name.decode('hex')) - yield self._save_lbry_file(stream_hash, rate) - - @defer.inlineCallbacks - def _check_stream_info_manager(self): - def _iter_streams(stream_hashes): - for stream_hash in stream_hashes: - yield self._check_stream_is_managed(stream_hash) - - stream_hashes = yield self.stream_info_manager.get_all_streams() - log.debug("Checking %s streams", len(stream_hashes)) - yield defer.DeferredList(list(_iter_streams(stream_hashes))) - @defer.inlineCallbacks def _start_lbry_files(self): - yield self._check_stream_info_manager() files_and_options = yield self._get_all_lbry_files() - yield defer.DeferredList([ - self._set_options_and_restore(rowid, stream_hash, options) - for rowid, stream_hash, options in files_and_options - ]) + stream_infos = yield self.stream_info_manager._get_all_stream_infos() + b_prm = self.session.base_payment_rate_manager + payment_rate_manager = NegotiatedPaymentRateManager(b_prm, self.session.blob_tracker) + log.info("Trying to start %i files", len(stream_infos)) + for i, (rowid, stream_hash, blob_data_rate, status) in enumerate(files_and_options): + if len(files_and_options) > 500 and i % 500 == 0: + log.info("Started %i/%i files", i, len(stream_infos)) + if stream_hash in stream_infos: + if stream_infos[stream_hash]['suggested_file_name']: + file_name = os.path.basename(stream_infos[stream_hash]['suggested_file_name']) + else: + file_name = os.path.basename(stream_infos[stream_hash]['stream_name']) + lbry_file = ManagedEncryptedFileDownloader( + rowid, + stream_hash, + self.session.peer_finder, + self.session.rate_limiter, + self.session.blob_manager, + self.stream_info_manager, + self, + payment_rate_manager, + self.session.wallet, + self.download_directory, + file_name=file_name, + sd_hash=stream_infos[stream_hash]['sd_hash'], + key=stream_infos[stream_hash]['key'], + stream_name=stream_infos[stream_hash]['stream_name'], + suggested_file_name=stream_infos[stream_hash]['suggested_file_name'] + ) + try: + # restore will raise an Exception if status is unknown + lbry_file.restore(status) + except Exception: + log.warning("Failed to start %i", rowid) + continue + self.lbry_files.append(lbry_file) + log.info("Started %i lbry files", len(self.lbry_files)) if self.auto_re_reflect is True: safe_start_looping_call(self.lbry_file_reflector, self.auto_re_reflect_interval) - log.info("Started %i lbry files", len(self.lbry_files)) - - @defer.inlineCallbacks - def _set_options_and_restore(self, rowid, stream_hash, options): - try: - b_prm = self.session.base_payment_rate_manager - payment_rate_manager = NegotiatedPaymentRateManager( - b_prm, self.session.blob_tracker) - downloader = yield self.start_lbry_file( - rowid, stream_hash, payment_rate_manager, blob_data_rate=options) - yield downloader.restore() - except Exception: - log.error('An error occurred while starting a lbry file (%s, %s, %s)', - rowid, stream_hash, options) @defer.inlineCallbacks def start_lbry_file(self, rowid, stream_hash, diff --git a/lbrynet/lbry_file/EncryptedFileMetadataManager.py b/lbrynet/lbry_file/EncryptedFileMetadataManager.py index ce1340c82..b3f82d4eb 100644 --- a/lbrynet/lbry_file/EncryptedFileMetadataManager.py +++ b/lbrynet/lbry_file/EncryptedFileMetadataManager.py @@ -1,6 +1,7 @@ +import os import logging import sqlite3 -import os +import binascii from twisted.internet import defer from twisted.python.failure import Failure from twisted.enterprise import adbapi @@ -207,6 +208,30 @@ class DBEncryptedFileMetadataManager(object): d.addCallback(get_result) return d + @rerun_if_locked + @defer.inlineCallbacks + def _get_all_stream_infos(self): + file_results = yield self.db_conn.runQuery("select rowid, * from lbry_files") + descriptor_results = yield self.db_conn.runQuery("select stream_hash, sd_blob_hash " + "from lbry_file_descriptors") + response = {} + for (stream_hash, sd_hash) in descriptor_results: + if stream_hash in response: + log.warning("Duplicate stream %s (sd: %s)", stream_hash, sd_hash[:16]) + continue + response[stream_hash] = { + 'sd_hash': sd_hash + } + for (rowid, stream_hash, key, stream_name, suggested_file_name) in file_results: + if stream_hash not in response: + log.warning("Missing sd hash for %s", stream_hash) + continue + response[stream_hash]['rowid'] = rowid + response[stream_hash]['key'] = binascii.unhexlify(key) + response[stream_hash]['stream_name'] = binascii.unhexlify(stream_name) + response[stream_hash]['suggested_file_name'] = binascii.unhexlify(suggested_file_name) + defer.returnValue(response) + @rerun_if_locked def _check_if_stream_exists(self, stream_hash): d = self.db_conn.runQuery( @@ -321,8 +346,8 @@ class DBEncryptedFileMetadataManager(object): @rerun_if_locked def _get_all_lbry_files(self): - d = self.db_conn.runQuery("select rowid, stream_hash, " - "blob_data_rate from lbry_file_options") + d = self.db_conn.runQuery("select rowid, stream_hash, blob_data_rate, status " + "from lbry_file_options") return d @rerun_if_locked