diff --git a/lbrynet/core/Session.py b/lbrynet/core/Session.py index d3a1febbc..83519ae66 100644 --- a/lbrynet/core/Session.py +++ b/lbrynet/core/Session.py @@ -1,11 +1,8 @@ import logging -import miniupnpc -from twisted.internet import threads, defer +from twisted.internet import defer from lbrynet.core.BlobManager import DiskBlobManager -from lbrynet.dht import node, hashannouncer from lbrynet.database.storage import SQLiteStorage from lbrynet.core.RateLimiter import RateLimiter -from lbrynet.core.utils import generate_id from lbrynet.core.PaymentRateManager import BasePaymentRateManager, OnlyFreePaymentsManager log = logging.getLogger(__name__) @@ -32,11 +29,10 @@ class Session(object): peers can connect to this peer. """ - def __init__(self, blob_data_payment_rate, db_dir=None, node_id=None, peer_manager=None, dht_node_port=None, + def __init__(self, blob_data_payment_rate, db_dir=None, node_id=None, dht_node_port=None, known_dht_nodes=None, peer_finder=None, hash_announcer=None, blob_dir=None, blob_manager=None, - peer_port=None, use_upnp=True, rate_limiter=None, wallet=None, dht_node_class=node.Node, - blob_tracker_class=None, payment_rate_manager_class=None, is_generous=True, external_ip=None, - storage=None): + peer_port=None, rate_limiter=None, wallet=None, external_ip=None, storage=None, + dht_node=None, peer_manager=None): """@param blob_data_payment_rate: The default payment rate for blob data @param db_dir: The directory in which levelDB files should be stored @@ -78,10 +74,6 @@ class Session(object): @param peer_port: The port on which other peers should connect to this peer - @param use_upnp: Whether or not to try to open a hole in the - firewall so that outside peers can connect to this peer's - peer_port and dht_node_port - @param rate_limiter: An object which keeps track of the amount of data transferred to and from this peer, and can limit that rate if desired @@ -103,20 +95,14 @@ class Session(object): self.known_dht_nodes = [] self.blob_dir = blob_dir self.blob_manager = blob_manager - # self.blob_tracker = None - # self.blob_tracker_class = blob_tracker_class or BlobAvailabilityTracker self.peer_port = peer_port - self.use_upnp = use_upnp self.rate_limiter = rate_limiter self.external_ip = external_ip self.upnp_redirects = [] self.wallet = wallet - self.dht_node_class = dht_node_class - self.dht_node = None + self.dht_node = dht_node self.base_payment_rate_manager = BasePaymentRateManager(blob_data_payment_rate) self.payment_rate_manager = OnlyFreePaymentsManager() - # self.payment_rate_manager_class = payment_rate_manager_class or NegotiatedPaymentRateManager - # self.is_generous = is_generous self.storage = storage or SQLiteStorage(self.db_dir) def setup(self): @@ -124,15 +110,14 @@ class Session(object): log.debug("Starting session.") - if self.node_id is None: - self.node_id = generate_id() + if self.dht_node is not None: + if self.peer_manager is None: + self.peer_manager = self.dht_node.peer_manager - if self.use_upnp is True: - d = self._try_upnp() - else: - d = defer.succeed(True) - d.addCallback(lambda _: self.storage.setup()) - d.addCallback(lambda _: self._setup_dht()) + if self.peer_finder is None: + self.peer_finder = self.dht_node.peer_finder + + d = self.storage.setup() d.addCallback(lambda _: self._setup_other_components()) return d @@ -140,97 +125,12 @@ class Session(object): """Stop all services""" log.info('Stopping session.') ds = [] - if self.hash_announcer: - self.hash_announcer.stop() - # if self.blob_tracker is not None: - # ds.append(defer.maybeDeferred(self.blob_tracker.stop)) - if self.dht_node is not None: - ds.append(defer.maybeDeferred(self.dht_node.stop)) if self.rate_limiter is not None: ds.append(defer.maybeDeferred(self.rate_limiter.stop)) - if self.wallet is not None: - ds.append(defer.maybeDeferred(self.wallet.stop)) if self.blob_manager is not None: ds.append(defer.maybeDeferred(self.blob_manager.stop)) - if self.use_upnp is True: - ds.append(defer.maybeDeferred(self._unset_upnp)) return defer.DeferredList(ds) - def _try_upnp(self): - - log.debug("In _try_upnp") - - def get_free_port(upnp, port, protocol): - # returns an existing mapping if it exists - mapping = upnp.getspecificportmapping(port, protocol) - if not mapping: - return port - if upnp.lanaddr == mapping[0]: - return mapping[1] - return get_free_port(upnp, port + 1, protocol) - - def get_port_mapping(upnp, port, protocol, description): - # try to map to the requested port, if there is already a mapping use the next external - # port available - if protocol not in ['UDP', 'TCP']: - raise Exception("invalid protocol") - port = get_free_port(upnp, port, protocol) - if isinstance(port, tuple): - log.info("Found existing UPnP redirect %s:%i (%s) to %s:%i, using it", - self.external_ip, port, protocol, upnp.lanaddr, port) - return port - upnp.addportmapping(port, protocol, upnp.lanaddr, port, - description, '') - log.info("Set UPnP redirect %s:%i (%s) to %s:%i", self.external_ip, port, - protocol, upnp.lanaddr, port) - return port - - def threaded_try_upnp(): - if self.use_upnp is False: - log.debug("Not using upnp") - return False - u = miniupnpc.UPnP() - num_devices_found = u.discover() - if num_devices_found > 0: - u.selectigd() - external_ip = u.externalipaddress() - if external_ip != '0.0.0.0' and not self.external_ip: - # best not to rely on this external ip, the router can be behind layers of NATs - self.external_ip = external_ip - if self.peer_port: - self.peer_port = get_port_mapping(u, self.peer_port, 'TCP', 'LBRY peer port') - self.upnp_redirects.append((self.peer_port, 'TCP')) - if self.dht_node_port: - self.dht_node_port = get_port_mapping(u, self.dht_node_port, 'UDP', 'LBRY DHT port') - self.upnp_redirects.append((self.dht_node_port, 'UDP')) - return True - return False - - def upnp_failed(err): - log.warning("UPnP failed. Reason: %s", err.getErrorMessage()) - return False - - d = threads.deferToThread(threaded_try_upnp) - d.addErrback(upnp_failed) - return d - - def _setup_dht(self): # does not block startup, the dht will re-attempt if necessary - self.dht_node = self.dht_node_class( - node_id=self.node_id, - udpPort=self.dht_node_port, - externalIP=self.external_ip, - peerPort=self.peer_port, - peer_manager=self.peer_manager, - peer_finder=self.peer_finder, - ) - if not self.hash_announcer: - self.hash_announcer = hashannouncer.DHTHashAnnouncer(self.dht_node, self.storage) - self.peer_manager = self.dht_node.peer_manager - self.peer_finder = self.dht_node.peer_finder - d = self.dht_node.start(self.known_dht_nodes) - d.addCallback(lambda _: log.info("Joined the dht")) - d.addCallback(lambda _: self.hash_announcer.start()) - def _setup_other_components(self): log.debug("Setting up the rest of the components") @@ -244,39 +144,6 @@ class Session(object): else: self.blob_manager = DiskBlobManager(self.blob_dir, self.storage, self.dht_node._dataStore) - # if self.blob_tracker is None: - # self.blob_tracker = self.blob_tracker_class( - # self.blob_manager, self.dht_node.peer_finder, self.dht_node - # ) - # if self.payment_rate_manager is None: - # self.payment_rate_manager = self.payment_rate_manager_class( - # self.base_payment_rate_manager, self.blob_tracker, self.is_generous - # ) - self.rate_limiter.start() d = self.blob_manager.setup() - d.addCallback(lambda _: self.wallet.start()) - # d.addCallback(lambda _: self.blob_tracker.start()) - return d - - def _unset_upnp(self): - log.info("Unsetting upnp for session") - - def threaded_unset_upnp(): - u = miniupnpc.UPnP() - num_devices_found = u.discover() - if num_devices_found > 0: - u.selectigd() - for port, protocol in self.upnp_redirects: - if u.getspecificportmapping(port, protocol) is None: - log.warning( - "UPnP redirect for %s %d was removed by something else.", - protocol, port) - else: - u.deleteportmapping(port, protocol) - log.info("Removed UPnP redirect for %s %d.", protocol, port) - self.upnp_redirects = [] - - d = threads.deferToThread(threaded_unset_upnp) - d.addErrback(lambda err: str(err)) return d diff --git a/lbrynet/daemon/Daemon.py b/lbrynet/daemon/Daemon.py index b75037819..9f32b289c 100644 --- a/lbrynet/daemon/Daemon.py +++ b/lbrynet/daemon/Daemon.py @@ -8,11 +8,10 @@ import urllib import json import textwrap import signal -import six from copy import deepcopy from decimal import Decimal, InvalidOperation from twisted.web import server -from twisted.internet import defer, threads, error, reactor +from twisted.internet import defer, reactor from twisted.internet.task import LoopingCall from twisted.python.failure import Failure @@ -25,28 +24,20 @@ from lbryschema.decode import smart_decode # TODO: importing this when internet is disabled raises a socket.gaierror from lbrynet.core.system_info import get_lbrynet_version -from lbrynet.database.storage import SQLiteStorage from lbrynet import conf -from lbrynet.conf import LBRYCRD_WALLET, LBRYUM_WALLET from lbrynet.reflector import reupload -from lbrynet.reflector import ServerFactory as reflector_server_factory from lbrynet.core.log_support import configure_loggly_handler -from lbrynet.lbry_file.client.EncryptedFileDownloader import EncryptedFileSaverFactory -from lbrynet.lbry_file.client.EncryptedFileOptions import add_lbry_file_to_sd_identifier -from lbrynet.file_manager.EncryptedFileManager import EncryptedFileManager +from lbrynet.daemon.Component import ComponentManager +from lbrynet.daemon.Components import WALLET_COMPONENT, DATABASE_COMPONENT, SESSION_COMPONENT, DHT_COMPONENT +from lbrynet.daemon.Components import STREAM_IDENTIFIER_COMPONENT, FILE_MANAGER_COMPONENT +from lbrynet.daemon.Components import EXCHANGE_RATE_MANAGER_COMPONENT from lbrynet.daemon.Downloader import GetStream from lbrynet.daemon.Publisher import Publisher -from lbrynet.daemon.ExchangeRateManager import ExchangeRateManager from lbrynet.daemon.auth.server import AuthJSONRPCServer from lbrynet.core.PaymentRateManager import OnlyFreePaymentsManager from lbrynet.core import utils, system_info -from lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier, download_sd_blob -from lbrynet.core.StreamDescriptor import EncryptedFileStreamType -from lbrynet.core.Session import Session -from lbrynet.core.Wallet import LBRYumWallet +from lbrynet.core.StreamDescriptor import download_sd_blob from lbrynet.core.looping_call_manager import LoopingCallManager -from lbrynet.core.server.BlobRequestHandler import BlobRequestHandlerFactory -from lbrynet.core.server.ServerProtocol import ServerProtocolFactory from lbrynet.core.Error import InsufficientFundsError, UnknownNameError from lbrynet.core.Error import DownloadDataTimeout, DownloadSDTimeout from lbrynet.core.Error import NullFundsError, NegativeFundsError @@ -58,23 +49,6 @@ from lbrynet.core.client.StandaloneBlobDownloader import StandaloneBlobDownloade log = logging.getLogger(__name__) INITIALIZING_CODE = 'initializing' -LOADING_DB_CODE = 'loading_db' -LOADING_WALLET_CODE = 'loading_wallet' -LOADING_FILE_MANAGER_CODE = 'loading_file_manager' -LOADING_SERVER_CODE = 'loading_server' -STARTED_CODE = 'started' -WAITING_FOR_FIRST_RUN_CREDITS = 'waiting_for_credits' -WAITING_FOR_UNLOCK = 'waiting_for_wallet_unlock' -STARTUP_STAGES = [ - (INITIALIZING_CODE, 'Initializing'), - (LOADING_DB_CODE, 'Loading databases'), - (LOADING_WALLET_CODE, 'Catching up with the blockchain'), - (LOADING_FILE_MANAGER_CODE, 'Setting up file manager'), - (LOADING_SERVER_CODE, 'Starting lbrynet'), - (STARTED_CODE, 'Started lbrynet'), - (WAITING_FOR_FIRST_RUN_CREDITS, 'Waiting for first run credits'), - (WAITING_FOR_UNLOCK, 'Waiting for user to unlock the wallet using the wallet_unlock command') -] # TODO: make this consistent with the stages in Downloader.py DOWNLOAD_METADATA_CODE = 'downloading_metadata' @@ -178,39 +152,20 @@ class Daemon(AuthJSONRPCServer): LBRYnet daemon, a jsonrpc interface to lbry functions """ - allowed_during_startup = [ - 'daemon_stop', 'status', 'version', 'wallet_unlock' - ] - - def __init__(self, analytics_manager): + def __init__(self, analytics_manager, component_manager=None): AuthJSONRPCServer.__init__(self, conf.settings['use_auth_http']) - self.db_dir = conf.settings['data_dir'] self.download_directory = conf.settings['download_directory'] - if conf.settings['BLOBFILES_DIR'] == "blobfiles": - self.blobfile_dir = os.path.join(self.db_dir, "blobfiles") - else: - log.info("Using non-default blobfiles directory: %s", conf.settings['BLOBFILES_DIR']) - self.blobfile_dir = conf.settings['BLOBFILES_DIR'] self.data_rate = conf.settings['data_rate'] self.max_key_fee = conf.settings['max_key_fee'] self.disable_max_key_fee = conf.settings['disable_max_key_fee'] self.download_timeout = conf.settings['download_timeout'] - self.run_reflector_server = conf.settings['run_reflector_server'] - self.wallet_type = conf.settings['wallet'] self.delete_blobs_on_remove = conf.settings['delete_blobs_on_remove'] - self.peer_port = conf.settings['peer_port'] - self.reflector_port = conf.settings['reflector_port'] - self.dht_node_port = conf.settings['dht_node_port'] - self.use_upnp = conf.settings['use_upnp'] self.auto_renew_claim_height_delta = conf.settings['auto_renew_claim_height_delta'] - self.startup_status = STARTUP_STAGES[0] self.connected_to_internet = True self.connection_status_code = None self.platform = None - self.current_db_revision = 9 self.db_revision_file = conf.settings.get_db_revision_filename() - self.session = None self._session_id = conf.settings.get_session_id() # TODO: this should probably be passed into the daemon, or # possibly have the entire log upload functionality taken out @@ -219,20 +174,28 @@ class Daemon(AuthJSONRPCServer): self.analytics_manager = analytics_manager self.node_id = conf.settings.node_id + # components + self.storage = None + self.dht_node = None + self.wallet = None + self.sd_identifier = None + self.session = None + self.file_manager = None + self.exchange_rate_manager = None + self.wallet_user = None self.wallet_password = None - self.query_handlers = {} self.waiting_on = {} self.streams = {} - self.exchange_rate_manager = ExchangeRateManager() calls = { Checker.INTERNET_CONNECTION: LoopingCall(CheckInternetConnection(self)), Checker.CONNECTION_STATUS: LoopingCall(self._update_connection_status), } self.looping_call_manager = LoopingCallManager(calls) - self.sd_identifier = StreamDescriptorIdentifier() - self.lbry_file_manager = None - self.storage = None + self.component_manager = component_manager or ComponentManager( + analytics_manager=self.analytics_manager, + skip_components=conf.settings['components_to_skip'] + ) @defer.inlineCallbacks def setup(self): @@ -243,34 +206,21 @@ class Daemon(AuthJSONRPCServer): self.looping_call_manager.start(Checker.INTERNET_CONNECTION, 3600) self.looping_call_manager.start(Checker.CONNECTION_STATUS, 30) - self.exchange_rate_manager.start() yield self._initial_setup() yield self.component_manager.setup() - self.storage = self.component_manager.get_component("database").storage - yield self._get_session() - yield self._check_wallet_locked() + self.exchange_rate_manager = self.component_manager.get_component(EXCHANGE_RATE_MANAGER_COMPONENT) + self.storage = self.component_manager.get_component(DATABASE_COMPONENT) + self.session = self.component_manager.get_component(SESSION_COMPONENT) + self.wallet = self.component_manager.get_component(WALLET_COMPONENT) + self.dht_node = self.component_manager.get_component(DHT_COMPONENT) yield self._start_analytics() - yield add_lbry_file_to_sd_identifier(self.sd_identifier) - yield self._setup_stream_identifier() - yield self._setup_lbry_file_manager() - yield self._setup_query_handlers() - yield self._setup_server() - log.info("Starting balance: " + str(self.session.wallet.get_balance())) + self.sd_identifier = self.component_manager.get_component(STREAM_IDENTIFIER_COMPONENT) + self.file_manager = self.component_manager.get_component(FILE_MANAGER_COMPONENT) + log.info("Starting balance: " + str(self.wallet.get_balance())) self.announced_startup = True - self.startup_status = STARTUP_STAGES[5] log.info("Started lbrynet-daemon") - # ### - # # this should be removed with the next db revision - # if migrated: - # missing_channel_claim_ids = yield self.storage.get_unknown_certificate_ids() - # while missing_channel_claim_ids: # in case there are a crazy amount lets batch to be safe - # batch = missing_channel_claim_ids[:100] - # _ = yield self.session.wallet.get_claims_by_ids(*batch) - # missing_channel_claim_ids = missing_channel_claim_ids[100:] - # ### - self._auto_renew() def _get_platform(self): @@ -301,12 +251,12 @@ class Daemon(AuthJSONRPCServer): # auto renew is turned off if 0 or some negative number if self.auto_renew_claim_height_delta < 1: defer.returnValue(None) - if not self.session.wallet.network.get_remote_height(): + if not self.wallet.network.get_remote_height(): log.warning("Failed to get remote height, aborting auto renew") defer.returnValue(None) log.debug("Renewing claim") - h = self.session.wallet.network.get_remote_height() + self.auto_renew_claim_height_delta - results = yield self.session.wallet.claim_renew_all_before_expiration(h) + h = self.wallet.network.get_remote_height() + self.auto_renew_claim_height_delta + results = yield self.wallet.claim_renew_all_before_expiration(h) for outpoint, result in results.iteritems(): if result['success']: log.info("Renewed claim at outpoint:%s claim ID:%s, paid fee:%s", @@ -315,93 +265,6 @@ class Daemon(AuthJSONRPCServer): log.info("Failed to renew claim at outpoint:%s, reason:%s", outpoint, result['reason']) - def _start_server(self): - if self.peer_port is not None: - server_factory = ServerProtocolFactory(self.session.rate_limiter, - self.query_handlers, - self.session.peer_manager) - - try: - log.info("Peer protocol listening on TCP %d", self.peer_port) - self.lbry_server_port = reactor.listenTCP(self.peer_port, server_factory) - except error.CannotListenError as e: - import traceback - log.error("Couldn't bind to port %d. Visit lbry.io/faq/how-to-change-port for" - " more details.", self.peer_port) - log.error("%s", traceback.format_exc()) - raise ValueError("%s lbrynet may already be running on your computer." % str(e)) - return defer.succeed(True) - - def _start_reflector(self): - if self.run_reflector_server: - log.info("Starting reflector server") - if self.reflector_port is not None: - reflector_factory = reflector_server_factory( - self.session.peer_manager, - self.session.blob_manager, - self.lbry_file_manager - ) - try: - self.reflector_server_port = reactor.listenTCP(self.reflector_port, - reflector_factory) - log.info('Started reflector on port %s', self.reflector_port) - except error.CannotListenError as e: - log.exception("Couldn't bind reflector to port %d", self.reflector_port) - raise ValueError( - "{} lbrynet may already be running on your computer.".format(e)) - return defer.succeed(True) - - def _stop_reflector(self): - if self.run_reflector_server: - log.info("Stopping reflector server") - try: - if self.reflector_server_port is not None: - self.reflector_server_port, p = None, self.reflector_server_port - return defer.maybeDeferred(p.stopListening) - except AttributeError: - return defer.succeed(True) - return defer.succeed(True) - - def _stop_file_manager(self): - if self.lbry_file_manager: - self.lbry_file_manager.stop() - return defer.succeed(True) - - def _stop_server(self): - try: - if self.lbry_server_port is not None: - self.lbry_server_port, old_port = None, self.lbry_server_port - log.info('Stop listening on port %s', old_port.port) - return defer.maybeDeferred(old_port.stopListening) - else: - return defer.succeed(True) - except AttributeError: - return defer.succeed(True) - - def _setup_server(self): - self.startup_status = STARTUP_STAGES[4] - d = self._start_server() - d.addCallback(lambda _: self._start_reflector()) - return d - - def _setup_query_handlers(self): - handlers = [ - BlobRequestHandlerFactory( - self.session.blob_manager, - self.session.wallet, - self.session.payment_rate_manager, - self.analytics_manager - ), - self.session.wallet.get_wallet_info_query_handler_factory(), - ] - return self._add_query_handlers(handlers) - - def _add_query_handlers(self, query_handlers): - for handler in query_handlers: - query_id = handler.get_primary_query_identifier() - self.query_handlers[query_id] = handler - return defer.succeed(None) - @staticmethod def _already_shutting_down(sig_num, frame): log.info("Already shutting down") @@ -417,21 +280,14 @@ class Daemon(AuthJSONRPCServer): signal.signal(signal.SIGTERM, self._already_shutting_down) log.info("Closing lbrynet session") - log.info("Status at time of shutdown: " + self.startup_status[0]) self._stop_streams() self.looping_call_manager.shutdown() if self.analytics_manager: self.analytics_manager.shutdown() - d = self._stop_server() - d.addErrback(log.fail(), 'Failure while shutting down') - d.addCallback(lambda _: self._stop_reflector()) - d.addErrback(log.fail(), 'Failure while shutting down') - d.addCallback(lambda _: self._stop_file_manager()) - d.addErrback(log.fail(), 'Failure while shutting down') - if self.session is not None: - d.addCallback(lambda _: self.session.shut_down()) + if self.component_manager is not None: + d = self.component_manager.stop() d.addErrback(log.fail(), 'Failure while shutting down') return d @@ -476,88 +332,10 @@ class Daemon(AuthJSONRPCServer): return defer.succeed(True) - @defer.inlineCallbacks - def _setup_lbry_file_manager(self): - log.info('Starting the file manager') - self.startup_status = STARTUP_STAGES[3] - self.lbry_file_manager = EncryptedFileManager(self.session, self.sd_identifier) - yield self.lbry_file_manager.setup() - log.info('Done setting up file manager') - def _start_analytics(self): if not self.analytics_manager.is_started: self.analytics_manager.start() - def _get_session(self): - def get_wallet(): - if self.wallet_type == LBRYCRD_WALLET: - raise ValueError('LBRYcrd Wallet is no longer supported') - elif self.wallet_type == LBRYUM_WALLET: - - log.info("Using lbryum wallet") - - lbryum_servers = {address: {'t': str(port)} - for address, port in conf.settings['lbryum_servers']} - - config = { - 'auto_connect': True, - 'chain': conf.settings['blockchain_name'], - 'default_servers': lbryum_servers - } - - if 'use_keyring' in conf.settings: - config['use_keyring'] = conf.settings['use_keyring'] - if conf.settings['lbryum_wallet_dir']: - config['lbryum_path'] = conf.settings['lbryum_wallet_dir'] - wallet = LBRYumWallet(self.storage, config) - return defer.succeed(wallet) - else: - raise ValueError('Wallet Type {} is not valid'.format(self.wallet_type)) - - d = get_wallet() - - def create_session(wallet): - self.session = Session( - conf.settings['data_rate'], - db_dir=self.db_dir, - node_id=self.node_id, - blob_dir=self.blobfile_dir, - dht_node_port=self.dht_node_port, - known_dht_nodes=conf.settings['known_dht_nodes'], - peer_port=self.peer_port, - use_upnp=self.use_upnp, - wallet=wallet, - is_generous=conf.settings['is_generous_host'], - external_ip=self.platform['ip'], - storage=self.storage - ) - self.startup_status = STARTUP_STAGES[2] - - d.addCallback(create_session) - d.addCallback(lambda _: self.session.setup()) - return d - - @defer.inlineCallbacks - def _check_wallet_locked(self): - wallet = self.session.wallet - if wallet.wallet.use_encryption: - self.startup_status = STARTUP_STAGES[7] - - yield wallet.check_locked() - - def _setup_stream_identifier(self): - file_saver_factory = EncryptedFileSaverFactory( - self.session.peer_finder, - self.session.rate_limiter, - self.session.blob_manager, - self.session.storage, - self.session.wallet, - self.download_directory - ) - self.sd_identifier.add_stream_downloader_factory(EncryptedFileStreamType, - file_saver_factory) - return defer.succeed(None) - def _download_blob(self, blob_hash, rate_manager=None, timeout=None): """ Download a blob @@ -575,7 +353,7 @@ class Daemon(AuthJSONRPCServer): timeout = timeout or 30 downloader = StandaloneBlobDownloader( blob_hash, self.session.blob_manager, self.session.peer_finder, self.session.rate_limiter, - rate_manager, self.session.wallet, timeout + rate_manager, self.wallet, timeout ) return downloader.download() @@ -583,7 +361,7 @@ class Daemon(AuthJSONRPCServer): def _get_stream_analytics_report(self, claim_dict): sd_hash = claim_dict.source_hash try: - stream_hash = yield self.session.storage.get_stream_hash_for_sd_hash(sd_hash) + stream_hash = yield self.storage.get_stream_hash_for_sd_hash(sd_hash) except Exception: stream_hash = None report = { @@ -597,7 +375,7 @@ class Daemon(AuthJSONRPCServer): sd_host = None report["sd_blob"] = sd_host if stream_hash: - blob_infos = yield self.session.storage.get_blobs_for_stream(stream_hash) + blob_infos = yield self.storage.get_blobs_for_stream(stream_hash) report["known_blobs"] = len(blob_infos) else: blob_infos = [] @@ -668,11 +446,12 @@ class Daemon(AuthJSONRPCServer): def _publish_stream(self, name, bid, claim_dict, file_path=None, certificate_id=None, claim_address=None, change_address=None): - publisher = Publisher(self.session, self.lbry_file_manager, self.session.wallet, + publisher = Publisher(self.session, self.file_manager, self.wallet, certificate_id) parse_lbry_uri(name) if not file_path: - stream_hash = yield self.storage.get_stream_hash_for_sd_hash(claim_dict['stream']['source']['source']) + stream_hash = yield self.storage.get_stream_hash_for_sd_hash( + claim_dict['stream']['source']['source']) claim_out = yield publisher.publish_stream(name, bid, claim_dict, stream_hash, claim_address, change_address) else: @@ -697,7 +476,7 @@ class Daemon(AuthJSONRPCServer): """ parsed = parse_lbry_uri(name) - resolution = yield self.session.wallet.resolve(parsed.name, check_cache=not force_refresh) + resolution = yield self.wallet.resolve(parsed.name, check_cache=not force_refresh) if parsed.name in resolution: result = resolution[parsed.name] defer.returnValue(result) @@ -752,7 +531,7 @@ class Daemon(AuthJSONRPCServer): cost = self._get_est_cost_from_stream_size(size) - resolved = yield self.session.wallet.resolve(uri) + resolved = yield self.wallet.resolve(uri) if uri in resolved and 'claim' in resolved[uri]: claim = ClaimDict.load_dict(resolved[uri]['claim']['value']) @@ -799,7 +578,7 @@ class Daemon(AuthJSONRPCServer): Resolve a name and return the estimated stream cost """ - resolved = yield self.session.wallet.resolve(uri) + resolved = yield self.wallet.resolve(uri) if resolved: claim_response = resolved[uri] else: @@ -879,7 +658,7 @@ class Daemon(AuthJSONRPCServer): def _get_lbry_file(self, search_by, val, return_json=False, full_status=False): lbry_file = None if search_by in FileID: - for l_f in self.lbry_file_manager.lbry_files: + for l_f in self.file_manager.lbry_files: if l_f.__dict__.get(search_by) == val: lbry_file = l_f break @@ -891,7 +670,7 @@ class Daemon(AuthJSONRPCServer): @defer.inlineCallbacks def _get_lbry_files(self, return_json=False, full_status=True, **kwargs): - lbry_files = list(self.lbry_file_manager.lbry_files) + lbry_files = list(self.file_manager.lbry_files) if kwargs: for search_type, value in iter_lbry_file_search_values(kwargs): lbry_files = [l_f for l_f in lbry_files if l_f.__dict__[search_type] == value] @@ -928,7 +707,7 @@ class Daemon(AuthJSONRPCServer): def _get_single_peer_downloader(self): downloader = SinglePeerDownloader() - downloader.setup(self.session.wallet) + downloader.setup(self.wallet) return downloader @defer.inlineCallbacks @@ -1060,7 +839,7 @@ class Daemon(AuthJSONRPCServer): should_announce_blobs = yield self.session.blob_manager.count_should_announce_blobs() response['session_status'] = { 'managed_blobs': len(blobs), - 'managed_streams': len(self.lbry_file_manager.lbry_files), + 'managed_streams': len(self.file_manager.lbry_files), 'announce_queue_size': announce_queue_size, 'should_announce_blobs': should_announce_blobs, } @@ -1255,10 +1034,10 @@ class Daemon(AuthJSONRPCServer): (float) amount of lbry credits in wallet """ if address is None: - return self._render_response(float(self.session.wallet.get_balance())) + return self._render_response(float(self.wallet.get_balance())) else: return self._render_response(float( - self.session.wallet.get_address_balance(address, include_unconfirmed))) + self.wallet.get_address_balance(address, include_unconfirmed))) @defer.inlineCallbacks def jsonrpc_wallet_unlock(self, password): @@ -1275,9 +1054,10 @@ class Daemon(AuthJSONRPCServer): (bool) true if wallet is unlocked, otherwise false """ - cmd_runner = self.session.wallet.get_cmd_runner() - if cmd_runner.locked: - d = self.session.wallet.wallet_unlocked_d + # the check_locked() in the if statement is needed because that is what sets + # the wallet_unlocked_d deferred ¯\_(ツ)_/¯ + if not self.wallet.check_locked(): + d = self.wallet.wallet_unlocked_d d.callback(password) result = yield d else: @@ -1300,7 +1080,7 @@ class Daemon(AuthJSONRPCServer): (bool) true if wallet is decrypted, otherwise false """ - result = self.session.wallet.decrypt_wallet() + result = self.wallet.decrypt_wallet() response = yield self._render_response(result) defer.returnValue(response) @@ -1320,8 +1100,8 @@ class Daemon(AuthJSONRPCServer): (bool) true if wallet is decrypted, otherwise false """ - self.session.wallet.encrypt_wallet(new_password) - response = yield self._render_response(self.session.wallet.wallet.use_encryption) + self.wallet.encrypt_wallet(new_password) + response = yield self._render_response(self.wallet.wallet.use_encryption) defer.returnValue(response) @defer.inlineCallbacks @@ -1477,9 +1257,9 @@ class Daemon(AuthJSONRPCServer): """ if claim_id is not None and txid is None and nout is None: - claim_results = yield self.session.wallet.get_claim_by_claim_id(claim_id) + claim_results = yield self.wallet.get_claim_by_claim_id(claim_id) elif txid is not None and nout is not None and claim_id is None: - claim_results = yield self.session.wallet.get_claim_by_outpoint(txid, int(nout)) + claim_results = yield self.wallet.get_claim_by_outpoint(txid, int(nout)) else: raise Exception("Must specify either txid/nout, or claim_id") response = yield self._render_response(claim_results) @@ -1568,7 +1348,7 @@ class Daemon(AuthJSONRPCServer): except URIParseError: results[u] = {"error": "%s is not a valid uri" % u} - resolved = yield self.session.wallet.resolve(*valid_uris, check_cache=not force) + resolved = yield self.wallet.resolve(*valid_uris, check_cache=not force) for resolved_uri in resolved: results[resolved_uri] = resolved[resolved_uri] @@ -1626,7 +1406,7 @@ class Daemon(AuthJSONRPCServer): if parsed_uri.is_channel and not parsed_uri.path: raise Exception("cannot download a channel claim, specify a /path") - resolved_result = yield self.session.wallet.resolve(uri) + resolved_result = yield self.wallet.resolve(uri) if resolved_result and uri in resolved_result: resolved = resolved_result[uri] else: @@ -1693,7 +1473,7 @@ class Daemon(AuthJSONRPCServer): raise Exception('Unable to find a file for {}:{}'.format(search_type, value)) if status == 'start' and lbry_file.stopped or status == 'stop' and not lbry_file.stopped: - yield self.lbry_file_manager.toggle_lbry_file_running(lbry_file) + yield self.file_manager.toggle_lbry_file_running(lbry_file) msg = "Started downloading file" if status == 'start' else "Stopped downloading file" else: msg = ( @@ -1755,8 +1535,8 @@ class Daemon(AuthJSONRPCServer): file_name, stream_hash = lbry_file.file_name, lbry_file.stream_hash if lbry_file.sd_hash in self.streams: del self.streams[lbry_file.sd_hash] - yield self.lbry_file_manager.delete_lbry_file(lbry_file, - delete_file=delete_from_download_dir) + yield self.file_manager.delete_lbry_file(lbry_file, + delete_file=delete_from_download_dir) log.info("Deleted file: %s", file_name) result = True @@ -1818,14 +1598,14 @@ class Daemon(AuthJSONRPCServer): if amount <= 0: raise Exception("Invalid amount") - yield self.session.wallet.update_balance() - if amount >= self.session.wallet.get_balance(): - balance = yield self.session.wallet.get_max_usable_balance_for_claim(channel_name) + yield self.wallet.update_balance() + if amount >= self.wallet.get_balance(): + balance = yield self.wallet.get_max_usable_balance_for_claim(channel_name) max_bid_amount = balance - MAX_UPDATE_FEE_ESTIMATE if balance <= MAX_UPDATE_FEE_ESTIMATE: raise InsufficientFundsError( "Insufficient funds, please deposit additional LBC. Minimum additional LBC needed {}" - . format(MAX_UPDATE_FEE_ESTIMATE - balance)) + .format(MAX_UPDATE_FEE_ESTIMATE - balance)) elif amount > max_bid_amount: raise InsufficientFundsError( "Please wait for any pending bids to resolve or lower the bid value. " @@ -1833,7 +1613,7 @@ class Daemon(AuthJSONRPCServer): .format(max_bid_amount) ) - result = yield self.session.wallet.claim_new_channel(channel_name, amount) + result = yield self.wallet.claim_new_channel(channel_name, amount) self.analytics_manager.send_new_channel() log.info("Claimed a new channel! Result: %s", result) response = yield self._render_response(result) @@ -1855,7 +1635,7 @@ class Daemon(AuthJSONRPCServer): is in the wallet. """ - result = yield self.session.wallet.channel_list() + result = yield self.wallet.channel_list() response = yield self._render_response(result) defer.returnValue(response) @@ -1891,7 +1671,7 @@ class Daemon(AuthJSONRPCServer): (str) Serialized certificate information """ - result = yield self.session.wallet.export_certificate_info(claim_id) + result = yield self.wallet.export_certificate_info(claim_id) defer.returnValue(result) @defer.inlineCallbacks @@ -1909,7 +1689,7 @@ class Daemon(AuthJSONRPCServer): (dict) Result dictionary """ - result = yield self.session.wallet.import_certificate_info(serialized_certificate_info) + result = yield self.wallet.import_certificate_info(serialized_certificate_info) defer.returnValue(result) @defer.inlineCallbacks @@ -2003,9 +1783,9 @@ class Daemon(AuthJSONRPCServer): if bid <= 0.0: raise ValueError("Bid value must be greater than 0.0") - yield self.session.wallet.update_balance() - if bid >= self.session.wallet.get_balance(): - balance = yield self.session.wallet.get_max_usable_balance_for_claim(name) + yield self.wallet.update_balance() + if bid >= self.wallet.get_balance(): + balance = yield self.wallet.get_max_usable_balance_for_claim(name) max_bid_amount = balance - MAX_UPDATE_FEE_ESTIMATE if balance <= MAX_UPDATE_FEE_ESTIMATE: raise InsufficientFundsError( @@ -2052,7 +1832,7 @@ class Daemon(AuthJSONRPCServer): log.warning("Stripping empty fee from published metadata") del metadata['fee'] elif 'address' not in metadata['fee']: - address = yield self.session.wallet.get_least_used_address() + address = yield self.wallet.get_least_used_address() metadata['fee']['address'] = address if 'fee' in metadata and 'version' not in metadata['fee']: metadata['fee']['version'] = '_0_0_1' @@ -2108,7 +1888,7 @@ class Daemon(AuthJSONRPCServer): certificate_id = channel_id elif channel_name: certificate_id = None - my_certificates = yield self.session.wallet.channel_list() + my_certificates = yield self.wallet.channel_list() for certificate in my_certificates: if channel_name == certificate['name']: certificate_id = certificate['claim_id'] @@ -2151,7 +1931,7 @@ class Daemon(AuthJSONRPCServer): if nout is None and txid is not None: raise Exception('Must specify nout') - result = yield self.session.wallet.abandon_claim(claim_id, txid, nout) + result = yield self.wallet.abandon_claim(claim_id, txid, nout) self.analytics_manager.send_claim_action('abandon') defer.returnValue(result) @@ -2178,7 +1958,7 @@ class Daemon(AuthJSONRPCServer): } """ - result = yield self.session.wallet.support_claim(name, claim_id, amount) + result = yield self.wallet.support_claim(name, claim_id, amount) self.analytics_manager.send_claim_action('new_support') defer.returnValue(result) @@ -2217,11 +1997,11 @@ class Daemon(AuthJSONRPCServer): nout = int(nout) else: raise Exception("invalid outpoint") - result = yield self.session.wallet.claim_renew(txid, nout) + result = yield self.wallet.claim_renew(txid, nout) result = {outpoint: result} else: height = int(height) - result = yield self.session.wallet.claim_renew_all_before_expiration(height) + result = yield self.wallet.claim_renew_all_before_expiration(height) defer.returnValue(result) @defer.inlineCallbacks @@ -2251,7 +2031,7 @@ class Daemon(AuthJSONRPCServer): } """ - result = yield self.session.wallet.send_claim_to_address(claim_id, address, amount) + result = yield self.wallet.send_claim_to_address(claim_id, address, amount) response = yield self._render_response(result) defer.returnValue(response) @@ -2289,7 +2069,7 @@ class Daemon(AuthJSONRPCServer): ] """ - d = self.session.wallet.get_name_claims() + d = self.wallet.get_name_claims() d.addCallback(lambda claims: self._render_response(claims)) return d @@ -2327,7 +2107,7 @@ class Daemon(AuthJSONRPCServer): } """ - claims = yield self.session.wallet.get_claims_for_name(name) # type: dict + claims = yield self.wallet.get_claims_for_name(name) # type: dict sort_claim_results(claims['claims']) defer.returnValue(claims) @@ -2404,8 +2184,8 @@ class Daemon(AuthJSONRPCServer): except URIParseError: results[chan_uri] = {"error": "%s is not a valid uri" % chan_uri} - resolved = yield self.session.wallet.resolve(*valid_uris, check_cache=False, page=page, - page_size=page_size) + resolved = yield self.wallet.resolve(*valid_uris, check_cache=False, page=page, + page_size=page_size) for u in resolved: if 'error' in resolved[u]: results[u] = resolved[u] @@ -2477,7 +2257,7 @@ class Daemon(AuthJSONRPCServer): """ - d = self.session.wallet.get_history() + d = self.wallet.get_history() d.addCallback(lambda r: self._render_response(r)) return d @@ -2495,7 +2275,7 @@ class Daemon(AuthJSONRPCServer): (dict) JSON formatted transaction """ - d = self.session.wallet.get_transaction(txid) + d = self.wallet.get_transaction(txid) d.addCallback(lambda r: self._render_response(r)) return d @@ -2513,7 +2293,7 @@ class Daemon(AuthJSONRPCServer): (bool) true, if address is associated with current wallet """ - d = self.session.wallet.address_is_mine(address) + d = self.wallet.address_is_mine(address) d.addCallback(lambda is_mine: self._render_response(is_mine)) return d @@ -2532,7 +2312,7 @@ class Daemon(AuthJSONRPCServer): Could contain more than one public key if multisig. """ - d = self.session.wallet.get_pub_keys(address) + d = self.wallet.get_pub_keys(address) d.addCallback(lambda r: self._render_response(r)) return d @@ -2551,7 +2331,7 @@ class Daemon(AuthJSONRPCServer): List of wallet addresses """ - addresses = yield self.session.wallet.list_addresses() + addresses = yield self.wallet.list_addresses() response = yield self._render_response(addresses) defer.returnValue(response) @@ -2573,7 +2353,7 @@ class Daemon(AuthJSONRPCServer): log.info("Got new wallet address: " + address) return defer.succeed(address) - d = self.session.wallet.get_new_address() + d = self.wallet.get_new_address() d.addCallback(_disp) d.addCallback(lambda address: self._render_response(address)) return d @@ -2597,7 +2377,7 @@ class Daemon(AuthJSONRPCServer): log.info("Got unused wallet address: " + address) return defer.succeed(address) - d = self.session.wallet.get_unused_address() + d = self.wallet.get_unused_address() d.addCallback(_disp) d.addCallback(lambda address: self._render_response(address)) return d @@ -2624,10 +2404,10 @@ class Daemon(AuthJSONRPCServer): elif not amount: raise NullFundsError() - reserved_points = self.session.wallet.reserve_points(address, amount) + reserved_points = self.wallet.reserve_points(address, amount) if reserved_points is None: raise InsufficientFundsError() - yield self.session.wallet.send_points_to_address(reserved_points, amount) + yield self.wallet.send_points_to_address(reserved_points, amount) self.analytics_manager.send_credits_sent() defer.returnValue(True) @@ -2675,7 +2455,7 @@ class Daemon(AuthJSONRPCServer): result = yield self.jsonrpc_send_amount_to_address(amount, address) else: validate_claim_id(claim_id) - result = yield self.session.wallet.tip_claim(claim_id, amount) + result = yield self.wallet.tip_claim(claim_id, amount) self.analytics_manager.send_claim_action('new_support') defer.returnValue(result) @@ -2704,7 +2484,7 @@ class Daemon(AuthJSONRPCServer): raise NullFundsError() broadcast = not no_broadcast - tx = yield self.session.wallet.create_addresses_with_balance( + tx = yield self.wallet.create_addresses_with_balance( num_addresses, amount, broadcast=broadcast) tx['broadcast'] = broadcast defer.returnValue(tx) @@ -2738,7 +2518,7 @@ class Daemon(AuthJSONRPCServer): ] """ - unspent = yield self.session.wallet.list_unspent() + unspent = yield self.wallet.list_unspent() for i, utxo in enumerate(unspent): utxo['txid'] = utxo.pop('prevout_hash') utxo['nout'] = utxo.pop('prevout_n') @@ -2764,10 +2544,10 @@ class Daemon(AuthJSONRPCServer): """ if blockhash is not None: - d = self.session.wallet.get_block(blockhash) + d = self.wallet.get_block(blockhash) elif height is not None: - d = self.session.wallet.get_block_info(height) - d.addCallback(lambda b: self.session.wallet.get_block(b)) + d = self.wallet.get_block_info(height) + d.addCallback(lambda b: self.wallet.get_block(b)) else: # TODO: return a useful error message return server.failure @@ -2837,8 +2617,8 @@ class Daemon(AuthJSONRPCServer): response = yield self._render_response("Don't have that blob") defer.returnValue(response) try: - stream_hash = yield self.session.storage.get_stream_hash_for_sd_hash(blob_hash) - yield self.session.storage.delete_stream(stream_hash) + stream_hash = yield self.storage.get_stream_hash_for_sd_hash(blob_hash) + yield self.storage.delete_stream(stream_hash) except Exception as err: pass yield self.session.blob_manager.delete_blobs([blob_hash]) @@ -2864,7 +2644,7 @@ class Daemon(AuthJSONRPCServer): if not utils.is_valid_blobhash(blob_hash): raise Exception("invalid blob hash") - finished_deferred = self.session.dht_node.iterativeFindValue(binascii.unhexlify(blob_hash)) + finished_deferred = self.dht_node.iterativeFindValue(binascii.unhexlify(blob_hash)) def trap_timeout(err): err.trap(defer.TimeoutError) @@ -2983,14 +2763,14 @@ class Daemon(AuthJSONRPCServer): if uri: metadata = yield self._resolve_name(uri) sd_hash = utils.get_sd_hash(metadata) - stream_hash = yield self.session.storage.get_stream_hash_for_sd_hash(sd_hash) + stream_hash = yield self.storage.get_stream_hash_for_sd_hash(sd_hash) elif stream_hash: - sd_hash = yield self.session.storage.get_sd_blob_hash_for_stream(stream_hash) + sd_hash = yield self.storage.get_sd_blob_hash_for_stream(stream_hash) elif sd_hash: - stream_hash = yield self.session.storage.get_stream_hash_for_sd_hash(sd_hash) - sd_hash = yield self.session.storage.get_sd_blob_hash_for_stream(stream_hash) + stream_hash = yield self.storage.get_stream_hash_for_sd_hash(sd_hash) + sd_hash = yield self.storage.get_sd_blob_hash_for_stream(stream_hash) if stream_hash: - crypt_blobs = yield self.session.storage.get_blobs_for_stream(stream_hash) + crypt_blobs = yield self.storage.get_blobs_for_stream(stream_hash) blobs = yield defer.gatherResults([ self.session.blob_manager.get_blob(crypt_blob.blob_hash, crypt_blob.length) for crypt_blob in crypt_blobs if crypt_blob.blob_hash is not None @@ -3071,7 +2851,7 @@ class Daemon(AuthJSONRPCServer): contact = None try: - contact = yield self.session.dht_node.findContact(node_id.decode('hex')) + contact = yield self.dht_node.findContact(node_id.decode('hex')) except TimeoutError: result = {'error': 'timeout finding peer'} defer.returnValue(result) @@ -3113,7 +2893,7 @@ class Daemon(AuthJSONRPCServer): """ result = {} - data_store = self.session.dht_node._dataStore._dict + data_store = self.dht_node._dataStore._dict datastore_len = len(data_store) hosts = {} @@ -3131,8 +2911,8 @@ class Daemon(AuthJSONRPCServer): blob_hashes = [] result['buckets'] = {} - for i in range(len(self.session.dht_node._routingTable._buckets)): - for contact in self.session.dht_node._routingTable._buckets[i]._contacts: + for i in range(len(self.dht_node._routingTable._buckets)): + for contact in self.dht_node._routingTable._buckets[i]._contacts: contacts = result['buckets'].get(i, []) if contact in hosts: blobs = hosts[contact] @@ -3155,7 +2935,7 @@ class Daemon(AuthJSONRPCServer): result['contacts'] = contact_set result['blob_hashes'] = blob_hashes - result['node_id'] = self.session.dht_node.node_id.encode('hex') + result['node_id'] = self.dht_node.node_id.encode('hex') return self._render_response(result) def jsonrpc_blob_availability(self, blob_hash, search_timeout=None, blob_timeout=None): @@ -3254,7 +3034,7 @@ class Daemon(AuthJSONRPCServer): } try: - resolved_result = yield self.session.wallet.resolve(uri) + resolved_result = yield self.wallet.resolve(uri) response['did_resolve'] = True except UnknownNameError: response['error'] = "Failed to resolve name"