Merge branch 'faster-startup'
This commit is contained in:
commit
92356840ae
18 changed files with 170 additions and 179 deletions
|
@ -18,6 +18,9 @@ at anytime.
|
|||
* Fixed value error due to a race condition when saving to the claim cache (https://github.com/lbryio/lbry/issues/1013)
|
||||
* Fixed being unable to re-download updated content (#951)
|
||||
* Fixed sending error messages for failed api requests
|
||||
* Fixed the file manager startup being slow when handling thousands of files
|
||||
* Fixed handling decryption error for blobs encrypted with an invalid key
|
||||
* Fixed handling stream with no data blob (https://github.com/lbryio/lbry/issues/905)
|
||||
|
||||
### Deprecated
|
||||
* `channel_list_mine`, replaced with `channel_list`
|
||||
|
|
|
@ -566,7 +566,7 @@ class Config(object):
|
|||
if not self._installation_id:
|
||||
if os.path.isfile(install_id_filename):
|
||||
with open(install_id_filename, "r") as install_id_file:
|
||||
self._installation_id = install_id_file.read()
|
||||
self._installation_id = str(install_id_file.read()).strip()
|
||||
if not self._installation_id:
|
||||
self._installation_id = base58.b58encode(utils.generate_id())
|
||||
with open(install_id_filename, "w") as install_id_file:
|
||||
|
@ -578,7 +578,7 @@ class Config(object):
|
|||
if not self._node_id:
|
||||
if os.path.isfile(node_id_filename):
|
||||
with open(node_id_filename, "r") as node_id_file:
|
||||
self._node_id = base58.b58decode(node_id_file.read())
|
||||
self._node_id = base58.b58decode(str(node_id_file.read()).strip())
|
||||
if not self._node_id:
|
||||
self._node_id = utils.generate_id()
|
||||
with open(node_id_filename, "w") as node_id_file:
|
||||
|
|
|
@ -941,6 +941,7 @@ class Wallet(object):
|
|||
bid, certificate_id, claim_address, change_address)
|
||||
|
||||
if not claim['success']:
|
||||
log.error(claim)
|
||||
msg = 'Claim to name {} failed: {}'.format(name, claim['reason'])
|
||||
raise Exception(msg)
|
||||
|
||||
|
@ -1312,9 +1313,14 @@ class LBRYumWallet(Wallet):
|
|||
return defer.succeed(True)
|
||||
|
||||
def _check_large_wallet(self):
|
||||
if len(self.wallet.addresses(include_change=False)) > 1000:
|
||||
log.warning(("Your wallet is excessively large, please follow instructions here: ",
|
||||
"https://github.com/lbryio/lbry/issues/437 to reduce your wallet size"))
|
||||
addr_count = len(self.wallet.addresses(include_change=False))
|
||||
if addr_count > 1000:
|
||||
log.warning("Your wallet is excessively large (%i addresses), "
|
||||
"please follow instructions here: "
|
||||
"https://github.com/lbryio/lbry/issues/437 to reduce your wallet size",
|
||||
addr_count)
|
||||
else:
|
||||
log.info("Wallet has %i addresses", addr_count)
|
||||
|
||||
def _load_blockchain(self):
|
||||
blockchain_caught_d = defer.Deferred()
|
||||
|
|
|
@ -179,8 +179,12 @@ class ConnectionManager(object):
|
|||
|
||||
# find peers for the head blob if configured to do so
|
||||
if self.seek_head_blob_first:
|
||||
peers = yield request_creator.get_new_peers_for_head_blob()
|
||||
peers = self.return_shuffled_peers_not_connected_to(peers, new_conns_needed)
|
||||
try:
|
||||
peers = yield request_creator.get_new_peers_for_head_blob()
|
||||
peers = self.return_shuffled_peers_not_connected_to(peers, new_conns_needed)
|
||||
except KeyError:
|
||||
log.warning("%s does not have a head blob", self._get_log_name())
|
||||
peers = []
|
||||
else:
|
||||
peers = []
|
||||
|
||||
|
@ -196,10 +200,8 @@ class ConnectionManager(object):
|
|||
self._get_log_name(), self._peer_connections.keys())
|
||||
log.debug("%s List of connection states: %s", self._get_log_name(),
|
||||
[p_c_h.connection.state for p_c_h in self._peer_connections.values()])
|
||||
|
||||
defer.returnValue(peers)
|
||||
|
||||
|
||||
def _connect_to_peer(self, peer):
|
||||
if self.stopped:
|
||||
return
|
||||
|
|
|
@ -154,7 +154,8 @@ class FullStreamProgressManager(StreamProgressManager):
|
|||
d.addCallback(lambda _: check_if_finished())
|
||||
|
||||
def log_error(err):
|
||||
log.warning("Error occurred in the output loop. Error: %s", err)
|
||||
log.warning("Error outputting blob %s: %s", blobs[current_blob_num].blob_hash,
|
||||
err.getErrorMessage())
|
||||
if self.outputting_d is not None and not self.outputting_d.called:
|
||||
self.outputting_d.callback(True)
|
||||
self.outputting_d = None
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
import binascii
|
||||
from zope.interface import implements
|
||||
from twisted.internet import defer
|
||||
from lbrynet.cryptstream.CryptBlob import StreamBlobDecryptor
|
||||
from lbrynet.interfaces import IBlobHandler
|
||||
|
||||
|
@ -14,7 +15,10 @@ class CryptBlobHandler(object):
|
|||
######## IBlobHandler #########
|
||||
|
||||
def handle_blob(self, blob, blob_info):
|
||||
blob_decryptor = StreamBlobDecryptor(
|
||||
blob, self.key, binascii.unhexlify(blob_info.iv), blob_info.length)
|
||||
try:
|
||||
blob_decryptor = StreamBlobDecryptor(blob, self.key, binascii.unhexlify(blob_info.iv),
|
||||
blob_info.length)
|
||||
except ValueError as err:
|
||||
return defer.fail(err)
|
||||
d = blob_decryptor.decrypt(self.write_func)
|
||||
return d
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
import binascii
|
||||
import logging
|
||||
from zope.interface import implements
|
||||
from lbrynet.interfaces import IStreamDownloader
|
||||
|
@ -37,8 +38,8 @@ class CryptStreamDownloader(object):
|
|||
|
||||
implements(IStreamDownloader)
|
||||
|
||||
def __init__(self, peer_finder, rate_limiter, blob_manager,
|
||||
payment_rate_manager, wallet):
|
||||
def __init__(self, peer_finder, rate_limiter, blob_manager, payment_rate_manager, wallet,
|
||||
key, stream_name):
|
||||
"""Initialize a CryptStreamDownloader
|
||||
|
||||
@param peer_finder: An object which implements the IPeerFinder
|
||||
|
@ -61,8 +62,8 @@ class CryptStreamDownloader(object):
|
|||
self.blob_manager = blob_manager
|
||||
self.payment_rate_manager = payment_rate_manager
|
||||
self.wallet = wallet
|
||||
self.key = None
|
||||
self.stream_name = None
|
||||
self.key = binascii.unhexlify(key)
|
||||
self.stream_name = binascii.unhexlify(stream_name)
|
||||
self.completed = False
|
||||
self.stopped = True
|
||||
self.stopping = False
|
||||
|
|
|
@ -514,7 +514,7 @@ class Daemon(AuthJSONRPCServer):
|
|||
|
||||
@defer.inlineCallbacks
|
||||
def _setup_lbry_file_manager(self):
|
||||
log.info('Starting to setup up file manager')
|
||||
log.info('Starting the file manager')
|
||||
self.startup_status = STARTUP_STAGES[3]
|
||||
self.stream_info_manager = DBEncryptedFileMetadataManager(self.db_dir)
|
||||
self.lbry_file_manager = EncryptedFileManager(
|
||||
|
@ -670,8 +670,7 @@ class Daemon(AuthJSONRPCServer):
|
|||
self.streams[sd_hash] = GetStream(self.sd_identifier, self.session,
|
||||
self.exchange_rate_manager, self.max_key_fee,
|
||||
self.disable_max_key_fee,
|
||||
conf.settings['data_rate'], timeout,
|
||||
file_name)
|
||||
conf.settings['data_rate'], timeout)
|
||||
try:
|
||||
lbry_file, finished_deferred = yield self.streams[sd_hash].start(claim_dict, name)
|
||||
yield self.stream_info_manager.save_outpoint_to_file(lbry_file.rowid, txid, nout)
|
||||
|
@ -917,7 +916,7 @@ class Daemon(AuthJSONRPCServer):
|
|||
defer.returnValue(lbry_file)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _get_lbry_files(self, return_json=False, full_status=False, **kwargs):
|
||||
def _get_lbry_files(self, return_json=False, full_status=True, **kwargs):
|
||||
lbry_files = list(self.lbry_file_manager.lbry_files)
|
||||
if kwargs:
|
||||
for search_type, value in iter_lbry_file_search_values(kwargs):
|
||||
|
@ -2041,6 +2040,8 @@ class Daemon(AuthJSONRPCServer):
|
|||
'claim_address': claim_address,
|
||||
'change_address': change_address,
|
||||
'claim_dict': claim_dict,
|
||||
'channel_id': channel_id,
|
||||
'channel_name': channel_name
|
||||
})
|
||||
|
||||
if channel_id:
|
||||
|
|
|
@ -31,15 +31,13 @@ log = logging.getLogger(__name__)
|
|||
|
||||
class GetStream(object):
|
||||
def __init__(self, sd_identifier, session, exchange_rate_manager,
|
||||
max_key_fee, disable_max_key_fee, data_rate=None, timeout=None,
|
||||
file_name=None):
|
||||
max_key_fee, disable_max_key_fee, data_rate=None, timeout=None):
|
||||
|
||||
self.timeout = timeout or conf.settings['download_timeout']
|
||||
self.data_rate = data_rate or conf.settings['data_rate']
|
||||
self.max_key_fee = max_key_fee or conf.settings['max_key_fee'][1]
|
||||
self.disable_max_key_fee = disable_max_key_fee or conf.settings['disable_max_key_fee']
|
||||
self.download_directory = conf.settings['download_directory']
|
||||
self.file_name = file_name
|
||||
self.timeout_counter = 0
|
||||
self.code = None
|
||||
self.sd_hash = None
|
||||
|
@ -126,7 +124,6 @@ class GetStream(object):
|
|||
[self.data_rate],
|
||||
self.payment_rate_manager,
|
||||
download_directory=self.download_directory,
|
||||
file_name=self.file_name
|
||||
)
|
||||
defer.returnValue(downloader)
|
||||
|
||||
|
|
|
@ -6,6 +6,7 @@ from twisted.internet import defer
|
|||
|
||||
from lbrynet.core import file_utils
|
||||
from lbrynet.file_manager.EncryptedFileCreator import create_lbry_file
|
||||
from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloader
|
||||
from lbrynet.lbry_file.StreamDescriptor import publish_sd_blob
|
||||
|
||||
|
||||
|
@ -36,13 +37,15 @@ class Publisher(object):
|
|||
read_handle)
|
||||
sd_hash = yield publish_sd_blob(self.lbry_file_manager.stream_info_manager,
|
||||
self.session.blob_manager, stream_hash)
|
||||
self.lbry_file = yield self.lbry_file_manager.add_lbry_file(stream_hash)
|
||||
status = ManagedEncryptedFileDownloader.STATUS_FINISHED
|
||||
self.lbry_file = yield self.lbry_file_manager.add_lbry_file(stream_hash, sd_hash,
|
||||
status=status)
|
||||
if 'source' not in claim_dict['stream']:
|
||||
claim_dict['stream']['source'] = {}
|
||||
claim_dict['stream']['source']['source'] = sd_hash
|
||||
claim_dict['stream']['source']['sourceType'] = 'lbry_sd_hash'
|
||||
claim_dict['stream']['source']['contentType'] = get_content_type(file_path)
|
||||
claim_dict['stream']['source']['version'] = "_0_0_1" # need current version here
|
||||
claim_dict['stream']['source']['version'] = "_0_0_1" # need current version here
|
||||
|
||||
claim_out = yield self.make_claim(name, bid, claim_dict, claim_address, change_address)
|
||||
self.lbry_file.completed = True
|
||||
|
|
|
@ -27,28 +27,24 @@ def log_status(sd_hash, status):
|
|||
status_string = "finished"
|
||||
else:
|
||||
status_string = "unknown"
|
||||
log.info("stream %s is %s", short_hash(sd_hash), status_string)
|
||||
log.debug("stream %s is %s", short_hash(sd_hash), status_string)
|
||||
|
||||
|
||||
class ManagedEncryptedFileDownloader(EncryptedFileSaver):
|
||||
STATUS_RUNNING = "running"
|
||||
STATUS_STOPPED = "stopped"
|
||||
STATUS_FINISHED = "finished"
|
||||
"""
|
||||
These are started by EncryptedFileManager, aka, file_manager
|
||||
"""
|
||||
def __init__(self, rowid, stream_hash, peer_finder, rate_limiter,
|
||||
blob_manager, stream_info_manager, lbry_file_manager,
|
||||
payment_rate_manager, wallet, download_directory,
|
||||
file_name=None):
|
||||
|
||||
def __init__(self, rowid, stream_hash, peer_finder, rate_limiter, blob_manager,
|
||||
stream_info_manager, lbry_file_manager, payment_rate_manager, wallet,
|
||||
download_directory, sd_hash=None, key=None, stream_name=None,
|
||||
suggested_file_name=None):
|
||||
EncryptedFileSaver.__init__(self, stream_hash, peer_finder,
|
||||
rate_limiter, blob_manager,
|
||||
stream_info_manager,
|
||||
payment_rate_manager, wallet,
|
||||
download_directory,
|
||||
file_name)
|
||||
|
||||
download_directory, key, stream_name, suggested_file_name)
|
||||
self.sd_hash = sd_hash
|
||||
self.rowid = rowid
|
||||
self.lbry_file_manager = lbry_file_manager
|
||||
self._saving_status = False
|
||||
|
@ -57,22 +53,16 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver):
|
|||
def saving_status(self):
|
||||
return self._saving_status
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def restore(self):
|
||||
|
||||
status = yield self.lbry_file_manager.get_lbry_file_status(self)
|
||||
log_status(self.sd_hash, status)
|
||||
|
||||
def restore(self, status):
|
||||
if status == ManagedEncryptedFileDownloader.STATUS_RUNNING:
|
||||
# start returns self.finished_deferred
|
||||
# which fires when we've finished downloading the file
|
||||
# and we don't want to wait for the entire download
|
||||
self.start()
|
||||
elif status == ManagedEncryptedFileDownloader.STATUS_STOPPED:
|
||||
defer.returnValue(False)
|
||||
pass
|
||||
elif status == ManagedEncryptedFileDownloader.STATUS_FINISHED:
|
||||
self.completed = True
|
||||
defer.returnValue(True)
|
||||
else:
|
||||
raise Exception("Unknown status for stream %s: %s" % (self.stream_hash, status))
|
||||
|
||||
|
@ -147,8 +137,7 @@ class ManagedEncryptedFileDownloaderFactory(object):
|
|||
return True
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def make_downloader(self, metadata, options, payment_rate_manager, download_directory=None,
|
||||
file_name=None):
|
||||
def make_downloader(self, metadata, options, payment_rate_manager, download_directory=None):
|
||||
assert len(options) == 1
|
||||
data_rate = options[0]
|
||||
stream_hash = yield save_sd_info(self.lbry_file_manager.stream_info_manager,
|
||||
|
@ -156,9 +145,11 @@ class ManagedEncryptedFileDownloaderFactory(object):
|
|||
if metadata.metadata_source == StreamMetadata.FROM_BLOB:
|
||||
yield self.lbry_file_manager.save_sd_blob_hash_to_stream(stream_hash,
|
||||
metadata.source_blob_hash)
|
||||
lbry_file = yield self.lbry_file_manager.add_lbry_file(stream_hash, payment_rate_manager,
|
||||
lbry_file = yield self.lbry_file_manager.add_lbry_file(stream_hash,
|
||||
metadata.source_blob_hash,
|
||||
payment_rate_manager,
|
||||
data_rate,
|
||||
download_directory, file_name)
|
||||
download_directory)
|
||||
defer.returnValue(lbry_file)
|
||||
|
||||
@staticmethod
|
||||
|
|
|
@ -12,7 +12,7 @@ from lbrynet.reflector.reupload import reflect_stream
|
|||
from lbrynet.core.PaymentRateManager import NegotiatedPaymentRateManager
|
||||
from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloader
|
||||
from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloaderFactory
|
||||
from lbrynet.lbry_file.StreamDescriptor import EncryptedFileStreamType
|
||||
from lbrynet.lbry_file.StreamDescriptor import EncryptedFileStreamType, get_sd_info
|
||||
from lbrynet.cryptstream.client.CryptStreamDownloader import AlreadyStoppedError
|
||||
from lbrynet.cryptstream.client.CryptStreamDownloader import CurrentlyStoppingError
|
||||
from lbrynet.core.utils import safe_start_looping_call, safe_stop_looping_call
|
||||
|
@ -50,8 +50,7 @@ class EncryptedFileManager(object):
|
|||
def setup(self):
|
||||
yield self.stream_info_manager.setup()
|
||||
yield self._add_to_sd_identifier()
|
||||
# don't block on starting the lbry files
|
||||
self._start_lbry_files()
|
||||
yield self._start_lbry_files()
|
||||
log.info("Started file manager")
|
||||
|
||||
def get_lbry_file_status(self, lbry_file):
|
||||
|
@ -86,62 +85,11 @@ class EncryptedFileManager(object):
|
|||
self.sd_identifier.add_stream_downloader_factory(
|
||||
EncryptedFileStreamType, downloader_factory)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _check_stream_is_managed(self, stream_hash):
|
||||
# check that all the streams in the stream_info_manager are also
|
||||
# tracked by lbry_file_manager and fix any streams that aren't.
|
||||
rowid = yield self._get_rowid_for_stream_hash(stream_hash)
|
||||
if rowid is not None:
|
||||
defer.returnValue(True)
|
||||
rate = self.session.base_payment_rate_manager.min_blob_data_payment_rate
|
||||
key, stream_name, file_name = yield self.stream_info_manager.get_stream_info(stream_hash)
|
||||
log.warning("Trying to fix missing lbry file for %s", stream_name.decode('hex'))
|
||||
yield self._save_lbry_file(stream_hash, rate)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _check_stream_info_manager(self):
|
||||
def _iter_streams(stream_hashes):
|
||||
for stream_hash in stream_hashes:
|
||||
yield self._check_stream_is_managed(stream_hash)
|
||||
|
||||
stream_hashes = yield self.stream_info_manager.get_all_streams()
|
||||
log.debug("Checking %s streams", len(stream_hashes))
|
||||
yield defer.DeferredList(list(_iter_streams(stream_hashes)))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _start_lbry_files(self):
|
||||
yield self._check_stream_info_manager()
|
||||
files_and_options = yield self._get_all_lbry_files()
|
||||
yield defer.DeferredList([
|
||||
self._set_options_and_restore(rowid, stream_hash, options)
|
||||
for rowid, stream_hash, options in files_and_options
|
||||
])
|
||||
|
||||
if self.auto_re_reflect is True:
|
||||
safe_start_looping_call(self.lbry_file_reflector, self.auto_re_reflect_interval)
|
||||
log.info("Started %i lbry files", len(self.lbry_files))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _set_options_and_restore(self, rowid, stream_hash, options):
|
||||
try:
|
||||
b_prm = self.session.base_payment_rate_manager
|
||||
payment_rate_manager = NegotiatedPaymentRateManager(
|
||||
b_prm, self.session.blob_tracker)
|
||||
downloader = yield self.start_lbry_file(
|
||||
rowid, stream_hash, payment_rate_manager, blob_data_rate=options)
|
||||
yield downloader.restore()
|
||||
except Exception:
|
||||
log.error('An error occurred while starting a lbry file (%s, %s, %s)',
|
||||
rowid, stream_hash, options)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def start_lbry_file(self, rowid, stream_hash,
|
||||
payment_rate_manager, blob_data_rate=None,
|
||||
download_directory=None, file_name=None):
|
||||
if not download_directory:
|
||||
download_directory = self.download_directory
|
||||
payment_rate_manager.min_blob_data_payment_rate = blob_data_rate
|
||||
lbry_file_downloader = ManagedEncryptedFileDownloader(
|
||||
def _get_lbry_file(self, rowid, stream_hash, payment_rate_manager, sd_hash, key,
|
||||
stream_name, suggested_file_name, download_directory=None):
|
||||
download_directory = download_directory or self.download_directory
|
||||
payment_rate_manager = payment_rate_manager or self.session.payment_rate_manager
|
||||
return ManagedEncryptedFileDownloader(
|
||||
rowid,
|
||||
stream_hash,
|
||||
self.session.peer_finder,
|
||||
|
@ -152,17 +100,46 @@ class EncryptedFileManager(object):
|
|||
payment_rate_manager,
|
||||
self.session.wallet,
|
||||
download_directory,
|
||||
file_name=file_name
|
||||
sd_hash=sd_hash,
|
||||
key=key,
|
||||
stream_name=stream_name,
|
||||
suggested_file_name=suggested_file_name
|
||||
)
|
||||
yield lbry_file_downloader.set_stream_info()
|
||||
self.lbry_files.append(lbry_file_downloader)
|
||||
defer.returnValue(lbry_file_downloader)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _start_lbry_files(self):
|
||||
files_and_options = yield self._get_all_lbry_files()
|
||||
stream_infos = yield self.stream_info_manager._get_all_stream_infos()
|
||||
b_prm = self.session.base_payment_rate_manager
|
||||
payment_rate_manager = NegotiatedPaymentRateManager(b_prm, self.session.blob_tracker)
|
||||
log.info("Trying to start %i files", len(stream_infos))
|
||||
for i, (rowid, stream_hash, blob_data_rate, status) in enumerate(files_and_options):
|
||||
if len(files_and_options) > 500 and i % 500 == 0:
|
||||
log.info("Started %i/%i files", i, len(stream_infos))
|
||||
if stream_hash in stream_infos:
|
||||
lbry_file = self._get_lbry_file(rowid, stream_hash, payment_rate_manager,
|
||||
stream_infos[stream_hash]['sd_hash'],
|
||||
stream_infos[stream_hash]['key'],
|
||||
stream_infos[stream_hash]['stream_name'],
|
||||
stream_infos[stream_hash]['suggested_file_name'])
|
||||
log.info("initialized file %s", lbry_file.stream_name)
|
||||
try:
|
||||
# restore will raise an Exception if status is unknown
|
||||
lbry_file.restore(status)
|
||||
self.lbry_files.append(lbry_file)
|
||||
except Exception:
|
||||
log.warning("Failed to start %i", rowid)
|
||||
continue
|
||||
log.info("Started %i lbry files", len(self.lbry_files))
|
||||
if self.auto_re_reflect is True:
|
||||
safe_start_looping_call(self.lbry_file_reflector, self.auto_re_reflect_interval)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _stop_lbry_file(self, lbry_file):
|
||||
def wait_for_finished(lbry_file, count=2):
|
||||
if count or lbry_file.saving_status is not False:
|
||||
return task.deferLater(reactor, 1, self._stop_lbry_file, lbry_file, count=count - 1)
|
||||
return task.deferLater(reactor, 1, self._stop_lbry_file, lbry_file,
|
||||
count=count - 1)
|
||||
try:
|
||||
yield lbry_file.stop(change_status=False)
|
||||
self.lbry_files.remove(lbry_file)
|
||||
|
@ -180,14 +157,18 @@ class EncryptedFileManager(object):
|
|||
yield self._stop_lbry_file(lbry_file)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def add_lbry_file(self, stream_hash, payment_rate_manager=None, blob_data_rate=None,
|
||||
download_directory=None, file_name=None):
|
||||
if not payment_rate_manager:
|
||||
payment_rate_manager = self.session.payment_rate_manager
|
||||
def add_lbry_file(self, stream_hash, sd_hash, payment_rate_manager=None, blob_data_rate=None,
|
||||
download_directory=None, status=None):
|
||||
rowid = yield self._save_lbry_file(stream_hash, blob_data_rate)
|
||||
lbry_file = yield self.start_lbry_file(rowid, stream_hash, payment_rate_manager,
|
||||
blob_data_rate, download_directory,
|
||||
file_name)
|
||||
stream_metadata = yield get_sd_info(self.stream_info_manager,
|
||||
stream_hash, False)
|
||||
key = stream_metadata['key']
|
||||
stream_name = stream_metadata['stream_name']
|
||||
suggested_file_name = stream_metadata['suggested_file_name']
|
||||
lbry_file = self._get_lbry_file(rowid, stream_hash, payment_rate_manager, sd_hash, key,
|
||||
stream_name, suggested_file_name, download_directory)
|
||||
lbry_file.restore(status or ManagedEncryptedFileDownloader.STATUS_STOPPED)
|
||||
self.lbry_files.append(lbry_file)
|
||||
defer.returnValue(lbry_file)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
import os
|
||||
import logging
|
||||
import sqlite3
|
||||
import os
|
||||
from twisted.internet import defer
|
||||
from twisted.python.failure import Failure
|
||||
from twisted.enterprise import adbapi
|
||||
|
@ -207,6 +207,30 @@ class DBEncryptedFileMetadataManager(object):
|
|||
d.addCallback(get_result)
|
||||
return d
|
||||
|
||||
@rerun_if_locked
|
||||
@defer.inlineCallbacks
|
||||
def _get_all_stream_infos(self):
|
||||
file_results = yield self.db_conn.runQuery("select rowid, * from lbry_files")
|
||||
descriptor_results = yield self.db_conn.runQuery("select stream_hash, sd_blob_hash "
|
||||
"from lbry_file_descriptors")
|
||||
response = {}
|
||||
for (stream_hash, sd_hash) in descriptor_results:
|
||||
if stream_hash in response:
|
||||
log.warning("Duplicate stream %s (sd: %s)", stream_hash, sd_hash[:16])
|
||||
continue
|
||||
response[stream_hash] = {
|
||||
'sd_hash': sd_hash
|
||||
}
|
||||
for (rowid, stream_hash, key, stream_name, suggested_file_name) in file_results:
|
||||
if stream_hash not in response:
|
||||
log.warning("Missing sd hash for %s", stream_hash)
|
||||
continue
|
||||
response[stream_hash]['rowid'] = rowid
|
||||
response[stream_hash]['key'] = key
|
||||
response[stream_hash]['stream_name'] = stream_name
|
||||
response[stream_hash]['suggested_file_name'] = suggested_file_name
|
||||
defer.returnValue(response)
|
||||
|
||||
@rerun_if_locked
|
||||
def _check_if_stream_exists(self, stream_hash):
|
||||
d = self.db_conn.runQuery(
|
||||
|
@ -321,8 +345,8 @@ class DBEncryptedFileMetadataManager(object):
|
|||
|
||||
@rerun_if_locked
|
||||
def _get_all_lbry_files(self):
|
||||
d = self.db_conn.runQuery("select rowid, stream_hash, "
|
||||
"blob_data_rate from lbry_file_options")
|
||||
d = self.db_conn.runQuery("select rowid, stream_hash, blob_data_rate, status "
|
||||
"from lbry_file_options")
|
||||
return d
|
||||
|
||||
@rerun_if_locked
|
||||
|
|
|
@ -6,7 +6,6 @@ from lbrynet.lbry_file.StreamDescriptor import save_sd_info
|
|||
from lbrynet.cryptstream.client.CryptStreamDownloader import CryptStreamDownloader
|
||||
from lbrynet.core.client.StreamProgressManager import FullStreamProgressManager
|
||||
from lbrynet.core.StreamDescriptor import StreamMetadata
|
||||
from lbrynet.core.Error import NoSuchStreamHash
|
||||
from lbrynet.interfaces import IStreamDownloaderFactory
|
||||
from lbrynet.lbry_file.client.EncryptedFileMetadataHandler import EncryptedFileMetadataHandler
|
||||
import os
|
||||
|
@ -22,29 +21,14 @@ class EncryptedFileDownloader(CryptStreamDownloader):
|
|||
"""Classes which inherit from this class download LBRY files"""
|
||||
|
||||
def __init__(self, stream_hash, peer_finder, rate_limiter, blob_manager,
|
||||
stream_info_manager, payment_rate_manager, wallet):
|
||||
stream_info_manager, payment_rate_manager, wallet, key, stream_name,
|
||||
suggested_file_name=None):
|
||||
CryptStreamDownloader.__init__(self, peer_finder, rate_limiter, blob_manager,
|
||||
payment_rate_manager, wallet)
|
||||
payment_rate_manager, wallet, key, stream_name)
|
||||
self.stream_hash = stream_hash
|
||||
self.stream_info_manager = stream_info_manager
|
||||
self.suggested_file_name = None
|
||||
self.suggested_file_name = binascii.unhexlify(suggested_file_name)
|
||||
self._calculated_total_bytes = None
|
||||
self.sd_hash = None
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def set_stream_info(self):
|
||||
if self.key is None:
|
||||
out = yield self.stream_info_manager.get_stream_info(self.stream_hash)
|
||||
key, stream_name, suggested_file_name = out
|
||||
self.key = binascii.unhexlify(key)
|
||||
self.stream_name = binascii.unhexlify(stream_name)
|
||||
self.suggested_file_name = binascii.unhexlify(suggested_file_name)
|
||||
|
||||
out = yield self.stream_info_manager.get_sd_blob_hashes_for_stream(self.stream_hash)
|
||||
if out:
|
||||
self.sd_hash = out[0]
|
||||
else:
|
||||
raise NoSuchStreamHash(self.stream_hash)
|
||||
|
||||
def delete_data(self):
|
||||
d1 = self.stream_info_manager.get_blobs_for_stream(self.stream_hash)
|
||||
|
@ -171,13 +155,13 @@ class EncryptedFileDownloaderFactory(object):
|
|||
|
||||
class EncryptedFileSaver(EncryptedFileDownloader):
|
||||
def __init__(self, stream_hash, peer_finder, rate_limiter, blob_manager, stream_info_manager,
|
||||
payment_rate_manager, wallet, download_directory, file_name=None):
|
||||
EncryptedFileDownloader.__init__(self, stream_hash,
|
||||
peer_finder, rate_limiter,
|
||||
blob_manager, stream_info_manager,
|
||||
payment_rate_manager, wallet)
|
||||
payment_rate_manager, wallet, download_directory, key, stream_name,
|
||||
suggested_file_name):
|
||||
EncryptedFileDownloader.__init__(self, stream_hash, peer_finder, rate_limiter,
|
||||
blob_manager, stream_info_manager, payment_rate_manager,
|
||||
wallet, key, stream_name, suggested_file_name)
|
||||
self.download_directory = download_directory
|
||||
self.file_name = file_name
|
||||
self.file_name = os.path.basename(self.suggested_file_name)
|
||||
self.file_written_to = None
|
||||
self.file_handle = None
|
||||
|
||||
|
@ -187,19 +171,6 @@ class EncryptedFileSaver(EncryptedFileDownloader):
|
|||
else:
|
||||
return str(self.file_name)
|
||||
|
||||
def set_stream_info(self):
|
||||
d = EncryptedFileDownloader.set_stream_info(self)
|
||||
|
||||
def set_file_name():
|
||||
if self.file_name is None:
|
||||
if self.suggested_file_name:
|
||||
self.file_name = os.path.basename(self.suggested_file_name)
|
||||
else:
|
||||
self.file_name = os.path.basename(self.stream_name)
|
||||
|
||||
d.addCallback(lambda _: set_file_name())
|
||||
return d
|
||||
|
||||
def stop(self, err=None):
|
||||
d = EncryptedFileDownloader.stop(self, err=err)
|
||||
d.addCallback(lambda _: self._delete_from_info_manager())
|
||||
|
@ -273,11 +244,14 @@ class EncryptedFileSaverFactory(EncryptedFileDownloaderFactory):
|
|||
self.download_directory = download_directory
|
||||
|
||||
def _make_downloader(self, stream_hash, payment_rate_manager, stream_info):
|
||||
return EncryptedFileSaver(stream_hash, self.peer_finder,
|
||||
self.rate_limiter, self.blob_manager,
|
||||
self.stream_info_manager,
|
||||
payment_rate_manager, self.wallet,
|
||||
self.download_directory)
|
||||
stream_name = stream_info.raw_info['stream_name']
|
||||
key = stream_info.raw_info['key']
|
||||
suggested_file_name = stream_info.raw_info['suggested_file_name']
|
||||
return EncryptedFileSaver(stream_hash, self.peer_finder, self.rate_limiter,
|
||||
self.blob_manager, self.stream_info_manager,
|
||||
payment_rate_manager, self.wallet, self.download_directory,
|
||||
key=key, stream_name=stream_name,
|
||||
suggested_file_name=suggested_file_name)
|
||||
|
||||
@staticmethod
|
||||
def get_description():
|
||||
|
|
|
@ -108,7 +108,7 @@ class ReflectorServer(Protocol):
|
|||
yield save_sd_info(self.stream_info_manager, sd_info)
|
||||
yield self.stream_info_manager.save_sd_blob_hash_to_stream(sd_info['stream_hash'],
|
||||
blob.blob_hash)
|
||||
self.lbry_file_manager.add_lbry_file(sd_info['stream_hash'])
|
||||
yield self.lbry_file_manager.add_lbry_file(sd_info['stream_hash'], blob.blob_hash)
|
||||
should_announce = True
|
||||
|
||||
# if we already have the head blob, set it to be announced now that we know it's
|
||||
|
|
|
@ -144,9 +144,10 @@ class TestStreamify(TestCase):
|
|||
d = lbry_file.start()
|
||||
return d
|
||||
|
||||
def combine_stream(stream_hash):
|
||||
def combine_stream(info):
|
||||
stream_hash, sd_hash = info
|
||||
prm = self.session.payment_rate_manager
|
||||
d = self.lbry_file_manager.add_lbry_file(stream_hash, prm)
|
||||
d = self.lbry_file_manager.add_lbry_file(stream_hash, sd_hash, prm)
|
||||
d.addCallback(start_lbry_file)
|
||||
|
||||
def check_md5_sum():
|
||||
|
@ -163,8 +164,9 @@ class TestStreamify(TestCase):
|
|||
test_file = GenFile(53209343, b''.join([chr(i + 5) for i in xrange(0, 64, 6)]))
|
||||
stream_hash = yield create_lbry_file(self.session, self.lbry_file_manager, "test_file",
|
||||
test_file, suggested_file_name="test_file")
|
||||
yield publish_sd_blob(self.stream_info_manager, self.session.blob_manager, stream_hash)
|
||||
defer.returnValue(stream_hash)
|
||||
sd_hash = yield publish_sd_blob(self.stream_info_manager, self.session.blob_manager,
|
||||
stream_hash)
|
||||
defer.returnValue((stream_hash, sd_hash))
|
||||
|
||||
d = self.session.setup()
|
||||
d.addCallback(lambda _: self.stream_info_manager.setup())
|
||||
|
|
|
@ -10,6 +10,7 @@ class TestEncryptedFileSaver(unittest.TestCase):
|
|||
@defer.inlineCallbacks
|
||||
def test_setup_output(self):
|
||||
file_name = 'encrypted_file_saver_test.tmp'
|
||||
file_name_hex = file_name.encode('hex')
|
||||
self.assertFalse(os.path.isfile(file_name))
|
||||
|
||||
# create file in the temporary trial folder
|
||||
|
@ -21,13 +22,13 @@ class TestEncryptedFileSaver(unittest.TestCase):
|
|||
payment_rate_manager = None
|
||||
wallet = None
|
||||
download_directory = '.'
|
||||
upload_allowed = False
|
||||
saver = EncryptedFileSaver(
|
||||
stream_hash, peer_finder, rate_limiter, blob_manager, stream_info_manager,
|
||||
payment_rate_manager, wallet, download_directory, file_name)
|
||||
key = ''
|
||||
|
||||
saver = EncryptedFileSaver(stream_hash, peer_finder, rate_limiter, blob_manager,
|
||||
stream_info_manager, payment_rate_manager, wallet,
|
||||
download_directory, key,
|
||||
file_name_hex, file_name_hex)
|
||||
|
||||
yield saver._setup_output()
|
||||
self.assertTrue(os.path.isfile(file_name))
|
||||
saver._close_output()
|
||||
|
||||
|
||||
|
|
|
@ -103,7 +103,7 @@ class GetStreamTests(unittest.TestCase):
|
|||
DownloadTimeoutError is raised
|
||||
"""
|
||||
def download_sd_blob(self):
|
||||
raise DownloadSDTimeout(self.file_name)
|
||||
raise DownloadSDTimeout(self)
|
||||
|
||||
getstream = self.init_getstream_with_mocs()
|
||||
getstream._initialize = types.MethodType(moc_initialize, getstream)
|
||||
|
|
Loading…
Reference in a new issue