refactor start_lbry_files

This commit is contained in:
Jack Robison 2017-12-29 14:10:16 -05:00
parent 2af61460a7
commit 76de605b7f
No known key found for this signature in database
GPG key ID: 284699E7404E3CFF
3 changed files with 68 additions and 54 deletions

View file

@ -54,22 +54,16 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver):
def saving_status(self): def saving_status(self):
return self._saving_status return self._saving_status
@defer.inlineCallbacks def restore(self, status):
def restore(self):
status = yield self.lbry_file_manager.get_lbry_file_status(self)
log_status(self.sd_hash, status)
if status == ManagedEncryptedFileDownloader.STATUS_RUNNING: if status == ManagedEncryptedFileDownloader.STATUS_RUNNING:
# start returns self.finished_deferred # start returns self.finished_deferred
# which fires when we've finished downloading the file # which fires when we've finished downloading the file
# and we don't want to wait for the entire download # and we don't want to wait for the entire download
self.start() self.start()
elif status == ManagedEncryptedFileDownloader.STATUS_STOPPED: elif status == ManagedEncryptedFileDownloader.STATUS_STOPPED:
defer.returnValue(False) pass
elif status == ManagedEncryptedFileDownloader.STATUS_FINISHED: elif status == ManagedEncryptedFileDownloader.STATUS_FINISHED:
self.completed = True self.completed = True
defer.returnValue(True)
else: else:
raise Exception("Unknown status for stream %s: %s" % (self.stream_hash, status)) raise Exception("Unknown status for stream %s: %s" % (self.stream_hash, status))

View file

@ -50,8 +50,7 @@ class EncryptedFileManager(object):
def setup(self): def setup(self):
yield self.stream_info_manager.setup() yield self.stream_info_manager.setup()
yield self._add_to_sd_identifier() yield self._add_to_sd_identifier()
# don't block on starting the lbry files yield self._start_lbry_files()
self._start_lbry_files()
log.info("Started file manager") log.info("Started file manager")
def get_lbry_file_status(self, lbry_file): def get_lbry_file_status(self, lbry_file):
@ -86,53 +85,49 @@ class EncryptedFileManager(object):
self.sd_identifier.add_stream_downloader_factory( self.sd_identifier.add_stream_downloader_factory(
EncryptedFileStreamType, downloader_factory) EncryptedFileStreamType, downloader_factory)
@defer.inlineCallbacks
def _check_stream_is_managed(self, stream_hash):
# check that all the streams in the stream_info_manager are also
# tracked by lbry_file_manager and fix any streams that aren't.
rowid = yield self._get_rowid_for_stream_hash(stream_hash)
if rowid is not None:
defer.returnValue(True)
rate = self.session.base_payment_rate_manager.min_blob_data_payment_rate
key, stream_name, file_name = yield self.stream_info_manager.get_stream_info(stream_hash)
log.warning("Trying to fix missing lbry file for %s", stream_name.decode('hex'))
yield self._save_lbry_file(stream_hash, rate)
@defer.inlineCallbacks
def _check_stream_info_manager(self):
def _iter_streams(stream_hashes):
for stream_hash in stream_hashes:
yield self._check_stream_is_managed(stream_hash)
stream_hashes = yield self.stream_info_manager.get_all_streams()
log.debug("Checking %s streams", len(stream_hashes))
yield defer.DeferredList(list(_iter_streams(stream_hashes)))
@defer.inlineCallbacks @defer.inlineCallbacks
def _start_lbry_files(self): def _start_lbry_files(self):
yield self._check_stream_info_manager()
files_and_options = yield self._get_all_lbry_files() files_and_options = yield self._get_all_lbry_files()
yield defer.DeferredList([ stream_infos = yield self.stream_info_manager._get_all_stream_infos()
self._set_options_and_restore(rowid, stream_hash, options) b_prm = self.session.base_payment_rate_manager
for rowid, stream_hash, options in files_and_options payment_rate_manager = NegotiatedPaymentRateManager(b_prm, self.session.blob_tracker)
]) log.info("Trying to start %i files", len(stream_infos))
for i, (rowid, stream_hash, blob_data_rate, status) in enumerate(files_and_options):
if len(files_and_options) > 500 and i % 500 == 0:
log.info("Started %i/%i files", i, len(stream_infos))
if stream_hash in stream_infos:
if stream_infos[stream_hash]['suggested_file_name']:
file_name = os.path.basename(stream_infos[stream_hash]['suggested_file_name'])
else:
file_name = os.path.basename(stream_infos[stream_hash]['stream_name'])
lbry_file = ManagedEncryptedFileDownloader(
rowid,
stream_hash,
self.session.peer_finder,
self.session.rate_limiter,
self.session.blob_manager,
self.stream_info_manager,
self,
payment_rate_manager,
self.session.wallet,
self.download_directory,
file_name=file_name,
sd_hash=stream_infos[stream_hash]['sd_hash'],
key=stream_infos[stream_hash]['key'],
stream_name=stream_infos[stream_hash]['stream_name'],
suggested_file_name=stream_infos[stream_hash]['suggested_file_name']
)
try:
# restore will raise an Exception if status is unknown
lbry_file.restore(status)
except Exception:
log.warning("Failed to start %i", rowid)
continue
self.lbry_files.append(lbry_file)
log.info("Started %i lbry files", len(self.lbry_files))
if self.auto_re_reflect is True: if self.auto_re_reflect is True:
safe_start_looping_call(self.lbry_file_reflector, self.auto_re_reflect_interval) safe_start_looping_call(self.lbry_file_reflector, self.auto_re_reflect_interval)
log.info("Started %i lbry files", len(self.lbry_files))
@defer.inlineCallbacks
def _set_options_and_restore(self, rowid, stream_hash, options):
try:
b_prm = self.session.base_payment_rate_manager
payment_rate_manager = NegotiatedPaymentRateManager(
b_prm, self.session.blob_tracker)
downloader = yield self.start_lbry_file(
rowid, stream_hash, payment_rate_manager, blob_data_rate=options)
yield downloader.restore()
except Exception:
log.error('An error occurred while starting a lbry file (%s, %s, %s)',
rowid, stream_hash, options)
@defer.inlineCallbacks @defer.inlineCallbacks
def start_lbry_file(self, rowid, stream_hash, def start_lbry_file(self, rowid, stream_hash,

View file

@ -1,6 +1,7 @@
import os
import logging import logging
import sqlite3 import sqlite3
import os import binascii
from twisted.internet import defer from twisted.internet import defer
from twisted.python.failure import Failure from twisted.python.failure import Failure
from twisted.enterprise import adbapi from twisted.enterprise import adbapi
@ -207,6 +208,30 @@ class DBEncryptedFileMetadataManager(object):
d.addCallback(get_result) d.addCallback(get_result)
return d return d
@rerun_if_locked
@defer.inlineCallbacks
def _get_all_stream_infos(self):
file_results = yield self.db_conn.runQuery("select rowid, * from lbry_files")
descriptor_results = yield self.db_conn.runQuery("select stream_hash, sd_blob_hash "
"from lbry_file_descriptors")
response = {}
for (stream_hash, sd_hash) in descriptor_results:
if stream_hash in response:
log.warning("Duplicate stream %s (sd: %s)", stream_hash, sd_hash[:16])
continue
response[stream_hash] = {
'sd_hash': sd_hash
}
for (rowid, stream_hash, key, stream_name, suggested_file_name) in file_results:
if stream_hash not in response:
log.warning("Missing sd hash for %s", stream_hash)
continue
response[stream_hash]['rowid'] = rowid
response[stream_hash]['key'] = binascii.unhexlify(key)
response[stream_hash]['stream_name'] = binascii.unhexlify(stream_name)
response[stream_hash]['suggested_file_name'] = binascii.unhexlify(suggested_file_name)
defer.returnValue(response)
@rerun_if_locked @rerun_if_locked
def _check_if_stream_exists(self, stream_hash): def _check_if_stream_exists(self, stream_hash):
d = self.db_conn.runQuery( d = self.db_conn.runQuery(
@ -321,8 +346,8 @@ class DBEncryptedFileMetadataManager(object):
@rerun_if_locked @rerun_if_locked
def _get_all_lbry_files(self): def _get_all_lbry_files(self):
d = self.db_conn.runQuery("select rowid, stream_hash, " d = self.db_conn.runQuery("select rowid, stream_hash, blob_data_rate, status "
"blob_data_rate from lbry_file_options") "from lbry_file_options")
return d return d
@rerun_if_locked @rerun_if_locked