From f94a9e8729fe1de7ef1bfd8a0df7d9e734dff058 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Mon, 19 Mar 2018 13:34:33 -0400 Subject: [PATCH] start lbry files in parallel --- lbrynet/file_manager/EncryptedFileManager.py | 63 +++++++++++--------- 1 file changed, 35 insertions(+), 28 deletions(-) diff --git a/lbrynet/file_manager/EncryptedFileManager.py b/lbrynet/file_manager/EncryptedFileManager.py index 96b56c0ab..581dfe2a2 100644 --- a/lbrynet/file_manager/EncryptedFileManager.py +++ b/lbrynet/file_manager/EncryptedFileManager.py @@ -96,41 +96,48 @@ class EncryptedFileManager(object): suggested_file_name=suggested_file_name ) + @defer.inlineCallbacks + def _start_lbry_file(self, file_info, payment_rate_manager): + lbry_file = self._get_lbry_file( + file_info['row_id'], file_info['stream_hash'], payment_rate_manager, file_info['sd_hash'], + file_info['key'], file_info['stream_name'], file_info['file_name'], file_info['download_directory'], + file_info['suggested_file_name'] + ) + yield lbry_file.get_claim_info() + try: + # verify the stream is valid (we might have downloaded an invalid stream + # in the past when the validation check didn't work) + stream_info = yield get_sd_info(self.storage, file_info['stream_hash'], include_blobs=True) + validate_descriptor(stream_info) + except InvalidStreamDescriptorError as err: + log.warning("Stream for descriptor %s is invalid (%s), cleaning it up", + lbry_file.sd_hash, err.message) + yield lbry_file.delete_data() + yield self.session.storage.delete_stream(lbry_file.stream_hash) + else: + try: + # restore will raise an Exception if status is unknown + lbry_file.restore(file_info['status']) + self.storage.content_claim_callbacks[lbry_file.stream_hash] = lbry_file.get_claim_info + self.lbry_files.append(lbry_file) + if len(self.lbry_files) % 500 == 0: + log.info("Started %i files", len(self.lbry_files)) + except Exception: + log.warning("Failed to start %i", file_info.get('rowid')) + @defer.inlineCallbacks def _start_lbry_files(self): files = yield self.session.storage.get_all_lbry_files() b_prm = self.session.base_payment_rate_manager payment_rate_manager = NegotiatedPaymentRateManager(b_prm, self.session.blob_tracker) - log.info("Trying to start %i files", len(files)) - for i, file_info in enumerate(files): - if len(files) > 500 and i % 500 == 0: - log.info("Started %i/%i files", i, len(files)) + log.info("Starting %i files", len(files)) + dl = [] + for file_info in files: + dl.append(self._start_lbry_file(file_info, payment_rate_manager)) + + yield defer.DeferredList(dl) - lbry_file = self._get_lbry_file( - file_info['row_id'], file_info['stream_hash'], payment_rate_manager, file_info['sd_hash'], - file_info['key'], file_info['stream_name'], file_info['file_name'], file_info['download_directory'], - file_info['suggested_file_name'] - ) - yield lbry_file.get_claim_info() - try: - # verify the stream is valid (we might have downloaded an invalid stream - # in the past when the validation check didn't work) - stream_info = yield get_sd_info(self.storage, file_info['stream_hash'], include_blobs=True) - validate_descriptor(stream_info) - except InvalidStreamDescriptorError as err: - log.warning("Stream for descriptor %s is invalid (%s), cleaning it up", - lbry_file.sd_hash, err.message) - yield lbry_file.delete_data() - yield self.session.storage.delete_stream(lbry_file.stream_hash) - else: - try: - # restore will raise an Exception if status is unknown - lbry_file.restore(file_info['status']) - self.storage.content_claim_callbacks[lbry_file.stream_hash] = lbry_file.get_claim_info - self.lbry_files.append(lbry_file) - except Exception: - log.warning("Failed to start %i", file_info.get('rowid')) log.info("Started %i lbry files", len(self.lbry_files)) if self.auto_re_reflect is True: safe_start_looping_call(self.lbry_file_reflector, self.auto_re_reflect_interval)