start lbry files in parallel

This commit is contained in:
Jack Robison 2018-03-19 13:34:33 -04:00
parent 43c73b9abf
commit f94a9e8729
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2

View file

@ -96,41 +96,48 @@ class EncryptedFileManager(object):
suggested_file_name=suggested_file_name suggested_file_name=suggested_file_name
) )
@defer.inlineCallbacks
def _start_lbry_file(self, file_info, payment_rate_manager):
lbry_file = self._get_lbry_file(
file_info['row_id'], file_info['stream_hash'], payment_rate_manager, file_info['sd_hash'],
file_info['key'], file_info['stream_name'], file_info['file_name'], file_info['download_directory'],
file_info['suggested_file_name']
)
yield lbry_file.get_claim_info()
try:
# verify the stream is valid (we might have downloaded an invalid stream
# in the past when the validation check didn't work)
stream_info = yield get_sd_info(self.storage, file_info['stream_hash'], include_blobs=True)
validate_descriptor(stream_info)
except InvalidStreamDescriptorError as err:
log.warning("Stream for descriptor %s is invalid (%s), cleaning it up",
lbry_file.sd_hash, err.message)
yield lbry_file.delete_data()
yield self.session.storage.delete_stream(lbry_file.stream_hash)
else:
try:
# restore will raise an Exception if status is unknown
lbry_file.restore(file_info['status'])
self.storage.content_claim_callbacks[lbry_file.stream_hash] = lbry_file.get_claim_info
self.lbry_files.append(lbry_file)
if len(self.lbry_files) % 500 == 0:
log.info("Started %i files", len(self.lbry_files))
except Exception:
log.warning("Failed to start %i", file_info.get('rowid'))
@defer.inlineCallbacks @defer.inlineCallbacks
def _start_lbry_files(self): def _start_lbry_files(self):
files = yield self.session.storage.get_all_lbry_files() files = yield self.session.storage.get_all_lbry_files()
b_prm = self.session.base_payment_rate_manager b_prm = self.session.base_payment_rate_manager
payment_rate_manager = NegotiatedPaymentRateManager(b_prm, self.session.blob_tracker) payment_rate_manager = NegotiatedPaymentRateManager(b_prm, self.session.blob_tracker)
log.info("Trying to start %i files", len(files)) log.info("Starting %i files", len(files))
for i, file_info in enumerate(files): dl = []
if len(files) > 500 and i % 500 == 0: for file_info in files:
log.info("Started %i/%i files", i, len(files)) dl.append(self._start_lbry_file(file_info, payment_rate_manager))
yield defer.DeferredList(dl)
lbry_file = self._get_lbry_file(
file_info['row_id'], file_info['stream_hash'], payment_rate_manager, file_info['sd_hash'],
file_info['key'], file_info['stream_name'], file_info['file_name'], file_info['download_directory'],
file_info['suggested_file_name']
)
yield lbry_file.get_claim_info()
try:
# verify the stream is valid (we might have downloaded an invalid stream
# in the past when the validation check didn't work)
stream_info = yield get_sd_info(self.storage, file_info['stream_hash'], include_blobs=True)
validate_descriptor(stream_info)
except InvalidStreamDescriptorError as err:
log.warning("Stream for descriptor %s is invalid (%s), cleaning it up",
lbry_file.sd_hash, err.message)
yield lbry_file.delete_data()
yield self.session.storage.delete_stream(lbry_file.stream_hash)
else:
try:
# restore will raise an Exception if status is unknown
lbry_file.restore(file_info['status'])
self.storage.content_claim_callbacks[lbry_file.stream_hash] = lbry_file.get_claim_info
self.lbry_files.append(lbry_file)
except Exception:
log.warning("Failed to start %i", file_info.get('rowid'))
log.info("Started %i lbry files", len(self.lbry_files)) log.info("Started %i lbry files", len(self.lbry_files))
if self.auto_re_reflect is True: if self.auto_re_reflect is True:
safe_start_looping_call(self.lbry_file_reflector, self.auto_re_reflect_interval) safe_start_looping_call(self.lbry_file_reflector, self.auto_re_reflect_interval)