From 6952c2c07e099a13afa6458baa47fa2ea202340a Mon Sep 17 00:00:00 2001 From: Lex Berezhny Date: Thu, 5 Jul 2018 19:05:48 -0400 Subject: [PATCH] Revert "refactor lbrynet-daemon into modular components" (#1286) * Revert "fix reflector test" This reverts commit 6a15b51ac3eb9a028d5a4b14b36d890f3574694f. * Revert "refactor lbrynet-daemon into modular components (#1164)" This reverts commit 75a6ff269ed5805591edc50008b9500192afbd87. --- CHANGELOG.md | 11 +- lbrynet/conf.py | 5 +- lbrynet/core/Error.py | 10 - lbrynet/core/Session.py | 140 +++- lbrynet/core/Wallet.py | 19 +- lbrynet/daemon/Component.py | 63 -- lbrynet/daemon/ComponentManager.py | 139 ---- lbrynet/daemon/Components.py | 523 -------------- lbrynet/daemon/Daemon.py | 647 ++++++++++++------ lbrynet/daemon/DaemonCLI.py | 36 +- lbrynet/daemon/__init__.py | 2 +- lbrynet/daemon/auth/server.py | 27 +- lbrynet/tests/functional/test_misc.py | 39 +- lbrynet/tests/functional/test_reflector.py | 2 - lbrynet/tests/functional/test_streamify.py | 44 +- lbrynet/tests/mocks.py | 94 --- lbrynet/tests/unit/components/__init__.py | 0 .../unit/components/test_Component_Manager.py | 131 ---- .../tests/unit/lbrynet_daemon/test_Daemon.py | 48 +- 19 files changed, 658 insertions(+), 1322 deletions(-) delete mode 100644 lbrynet/daemon/Component.py delete mode 100644 lbrynet/daemon/ComponentManager.py delete mode 100644 lbrynet/daemon/Components.py delete mode 100644 lbrynet/tests/unit/components/__init__.py delete mode 100644 lbrynet/tests/unit/components/test_Component_Manager.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 9bc9bb07e..9a12b9b73 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,19 +23,14 @@ at anytime. ### Changed * * - * `publish` to accept bid as a decimal string - * all JSONRPC API commands are now registered asynchronously and are available to be called as soon as they are ready ### Added - * Component Manager for managing the dependencies, startup and stopping of components - * `requires` decorator to register the components required by a `jsonrpc_` command, to facilitate commands registering asynchronously - * added unittests for Component Manager + * * ### Removed - * `STARTUP_STAGES` from `status` API and CLI call, it instead returns a dictionary of components along with their running status(this is a **potentially breaking change** if `STARTUP_STAGES` is relied upon) - * all component startup code from `Daemon.py` - * wallet, upnp and dht startup code from `session.py`, the code now resides in `Components.py` + * + * ## [0.20.3] - 2018-07-03 diff --git a/lbrynet/conf.py b/lbrynet/conf.py index 6e3bc88f8..14fa45b53 100644 --- a/lbrynet/conf.py +++ b/lbrynet/conf.py @@ -168,11 +168,9 @@ def server_port(server_and_port): def server_list(servers): return [server_port(server) for server in servers] - def server_list_reverse(servers): return ["%s:%s" % (server, port) for server, port in servers] - class Env(envparse.Env): """An Env parser that automatically namespaces the variables with LBRY""" @@ -301,8 +299,7 @@ ADJUSTABLE_SETTINGS = { 'blockchain_name': (str, 'lbrycrd_main'), 'lbryum_servers': (list, [('lbryumx1.lbry.io', 50001), ('lbryumx2.lbry.io', 50001)], server_list, server_list_reverse), - 's3_headers_depth': (int, 96 * 10), # download headers from s3 when the local height is more than 10 chunks behind - 'components_to_skip': (list, []) # components which will be skipped during start-up of daemon + 's3_headers_depth': (int, 96 * 10) # download headers from s3 when the local height is more than 10 chunks behind } diff --git a/lbrynet/core/Error.py b/lbrynet/core/Error.py index 68a6df78e..729ceab76 100644 --- a/lbrynet/core/Error.py +++ b/lbrynet/core/Error.py @@ -155,23 +155,13 @@ class InvalidAuthenticationToken(Exception): class NegotiationError(Exception): pass - class InvalidCurrencyError(Exception): def __init__(self, currency): self.currency = currency Exception.__init__( self, 'Invalid currency: {} is not a supported currency.'.format(currency)) - class NoSuchDirectoryError(Exception): def __init__(self, directory): self.directory = directory Exception.__init__(self, 'No such directory {}'.format(directory)) - - -class ComponentStartConditionNotMet(Exception): - pass - - -class ComponentsNotStarted(Exception): - pass diff --git a/lbrynet/core/Session.py b/lbrynet/core/Session.py index 2f5b4b39c..d3a1febbc 100644 --- a/lbrynet/core/Session.py +++ b/lbrynet/core/Session.py @@ -1,8 +1,11 @@ import logging -from twisted.internet import defer +import miniupnpc +from twisted.internet import threads, defer from lbrynet.core.BlobManager import DiskBlobManager +from lbrynet.dht import node, hashannouncer from lbrynet.database.storage import SQLiteStorage from lbrynet.core.RateLimiter import RateLimiter +from lbrynet.core.utils import generate_id from lbrynet.core.PaymentRateManager import BasePaymentRateManager, OnlyFreePaymentsManager log = logging.getLogger(__name__) @@ -29,11 +32,11 @@ class Session(object): peers can connect to this peer. """ - def __init__(self, blob_data_payment_rate, db_dir=None, node_id=None, dht_node_port=None, + def __init__(self, blob_data_payment_rate, db_dir=None, node_id=None, peer_manager=None, dht_node_port=None, known_dht_nodes=None, peer_finder=None, hash_announcer=None, blob_dir=None, blob_manager=None, - peer_port=None, use_upnp=True, rate_limiter=None, wallet=None, blob_tracker_class=None, - payment_rate_manager_class=None, is_generous=True, external_ip=None, storage=None, - dht_node=None, peer_manager=None): + peer_port=None, use_upnp=True, rate_limiter=None, wallet=None, dht_node_class=node.Node, + blob_tracker_class=None, payment_rate_manager_class=None, is_generous=True, external_ip=None, + storage=None): """@param blob_data_payment_rate: The default payment rate for blob data @param db_dir: The directory in which levelDB files should be stored @@ -108,7 +111,8 @@ class Session(object): self.external_ip = external_ip self.upnp_redirects = [] self.wallet = wallet - self.dht_node = dht_node + self.dht_node_class = dht_node_class + self.dht_node = None self.base_payment_rate_manager = BasePaymentRateManager(blob_data_payment_rate) self.payment_rate_manager = OnlyFreePaymentsManager() # self.payment_rate_manager_class = payment_rate_manager_class or NegotiatedPaymentRateManager @@ -120,14 +124,15 @@ class Session(object): log.debug("Starting session.") - if self.dht_node is not None: - if self.peer_manager is None: - self.peer_manager = self.dht_node.peer_manager + if self.node_id is None: + self.node_id = generate_id() - if self.peer_finder is None: - self.peer_finder = self.dht_node.peer_finder - - d = self.storage.setup() + if self.use_upnp is True: + d = self._try_upnp() + else: + d = defer.succeed(True) + d.addCallback(lambda _: self.storage.setup()) + d.addCallback(lambda _: self._setup_dht()) d.addCallback(lambda _: self._setup_other_components()) return d @@ -135,16 +140,97 @@ class Session(object): """Stop all services""" log.info('Stopping session.') ds = [] + if self.hash_announcer: + self.hash_announcer.stop() # if self.blob_tracker is not None: # ds.append(defer.maybeDeferred(self.blob_tracker.stop)) + if self.dht_node is not None: + ds.append(defer.maybeDeferred(self.dht_node.stop)) if self.rate_limiter is not None: ds.append(defer.maybeDeferred(self.rate_limiter.stop)) + if self.wallet is not None: + ds.append(defer.maybeDeferred(self.wallet.stop)) if self.blob_manager is not None: ds.append(defer.maybeDeferred(self.blob_manager.stop)) - # if self.use_upnp is True: - # ds.append(defer.maybeDeferred(self._unset_upnp)) + if self.use_upnp is True: + ds.append(defer.maybeDeferred(self._unset_upnp)) return defer.DeferredList(ds) + def _try_upnp(self): + + log.debug("In _try_upnp") + + def get_free_port(upnp, port, protocol): + # returns an existing mapping if it exists + mapping = upnp.getspecificportmapping(port, protocol) + if not mapping: + return port + if upnp.lanaddr == mapping[0]: + return mapping[1] + return get_free_port(upnp, port + 1, protocol) + + def get_port_mapping(upnp, port, protocol, description): + # try to map to the requested port, if there is already a mapping use the next external + # port available + if protocol not in ['UDP', 'TCP']: + raise Exception("invalid protocol") + port = get_free_port(upnp, port, protocol) + if isinstance(port, tuple): + log.info("Found existing UPnP redirect %s:%i (%s) to %s:%i, using it", + self.external_ip, port, protocol, upnp.lanaddr, port) + return port + upnp.addportmapping(port, protocol, upnp.lanaddr, port, + description, '') + log.info("Set UPnP redirect %s:%i (%s) to %s:%i", self.external_ip, port, + protocol, upnp.lanaddr, port) + return port + + def threaded_try_upnp(): + if self.use_upnp is False: + log.debug("Not using upnp") + return False + u = miniupnpc.UPnP() + num_devices_found = u.discover() + if num_devices_found > 0: + u.selectigd() + external_ip = u.externalipaddress() + if external_ip != '0.0.0.0' and not self.external_ip: + # best not to rely on this external ip, the router can be behind layers of NATs + self.external_ip = external_ip + if self.peer_port: + self.peer_port = get_port_mapping(u, self.peer_port, 'TCP', 'LBRY peer port') + self.upnp_redirects.append((self.peer_port, 'TCP')) + if self.dht_node_port: + self.dht_node_port = get_port_mapping(u, self.dht_node_port, 'UDP', 'LBRY DHT port') + self.upnp_redirects.append((self.dht_node_port, 'UDP')) + return True + return False + + def upnp_failed(err): + log.warning("UPnP failed. Reason: %s", err.getErrorMessage()) + return False + + d = threads.deferToThread(threaded_try_upnp) + d.addErrback(upnp_failed) + return d + + def _setup_dht(self): # does not block startup, the dht will re-attempt if necessary + self.dht_node = self.dht_node_class( + node_id=self.node_id, + udpPort=self.dht_node_port, + externalIP=self.external_ip, + peerPort=self.peer_port, + peer_manager=self.peer_manager, + peer_finder=self.peer_finder, + ) + if not self.hash_announcer: + self.hash_announcer = hashannouncer.DHTHashAnnouncer(self.dht_node, self.storage) + self.peer_manager = self.dht_node.peer_manager + self.peer_finder = self.dht_node.peer_finder + d = self.dht_node.start(self.known_dht_nodes) + d.addCallback(lambda _: log.info("Joined the dht")) + d.addCallback(lambda _: self.hash_announcer.start()) + def _setup_other_components(self): log.debug("Setting up the rest of the components") @@ -169,4 +255,28 @@ class Session(object): self.rate_limiter.start() d = self.blob_manager.setup() + d.addCallback(lambda _: self.wallet.start()) + # d.addCallback(lambda _: self.blob_tracker.start()) + return d + + def _unset_upnp(self): + log.info("Unsetting upnp for session") + + def threaded_unset_upnp(): + u = miniupnpc.UPnP() + num_devices_found = u.discover() + if num_devices_found > 0: + u.selectigd() + for port, protocol in self.upnp_redirects: + if u.getspecificportmapping(port, protocol) is None: + log.warning( + "UPnP redirect for %s %d was removed by something else.", + protocol, port) + else: + u.deleteportmapping(port, protocol) + log.info("Removed UPnP redirect for %s %d.", protocol, port) + self.upnp_redirects = [] + + d = threads.deferToThread(threaded_unset_upnp) + d.addErrback(lambda err: str(err)) return d diff --git a/lbrynet/core/Wallet.py b/lbrynet/core/Wallet.py index 3052fdce8..0b71ed59d 100644 --- a/lbrynet/core/Wallet.py +++ b/lbrynet/core/Wallet.py @@ -938,7 +938,9 @@ class LBRYumWallet(Wallet): self._lag_counter = 0 self.blocks_behind = 0 self.catchup_progress = 0 - self.is_wallet_unlocked = None + + # fired when the wallet actually unlocks (wallet_unlocked_d can be called multiple times) + self.wallet_unlock_success = defer.Deferred() def _is_first_run(self): return (not self.printed_retrieving_headers and @@ -951,23 +953,21 @@ class LBRYumWallet(Wallet): return self._cmd_runner def check_locked(self): - """ - Checks if the wallet is encrypted(locked) or not - - :return: (boolean) indicating whether the wallet is locked or not - """ - if not self._cmd_runner: + if not self.wallet.use_encryption: + log.info("Wallet is not encrypted") + self.wallet_unlock_success.callback(True) + elif not self._cmd_runner: raise Exception("Command runner hasn't been initialized yet") elif self._cmd_runner.locked: log.info("Waiting for wallet password") self.wallet_unlocked_d.addCallback(self.unlock) - return self.is_wallet_unlocked + return self.wallet_unlock_success def unlock(self, password): if self._cmd_runner and self._cmd_runner.locked: try: self._cmd_runner.unlock_wallet(password) - self.is_wallet_unlocked = True + self.wallet_unlock_success.callback(True) log.info("Unlocked the wallet!") except InvalidPassword: log.warning("Incorrect password, try again") @@ -1054,7 +1054,6 @@ class LBRYumWallet(Wallet): wallet.create_main_account() wallet.synchronize() self.wallet = wallet - self.is_wallet_unlocked = not self.wallet.use_encryption self._check_large_wallet() return defer.succeed(True) diff --git a/lbrynet/daemon/Component.py b/lbrynet/daemon/Component.py deleted file mode 100644 index e06e64482..000000000 --- a/lbrynet/daemon/Component.py +++ /dev/null @@ -1,63 +0,0 @@ -import logging -from twisted.internet import defer -from ComponentManager import ComponentManager - -log = logging.getLogger(__name__) - - -class ComponentType(type): - def __new__(mcs, name, bases, newattrs): - klass = type.__new__(mcs, name, bases, newattrs) - if name != "Component": - ComponentManager.default_component_classes[klass.component_name] = klass - return klass - - -class Component(object): - """ - lbrynet-daemon component helper - - Inheriting classes will be automatically registered with the ComponentManager and must implement setup and stop - methods - """ - - __metaclass__ = ComponentType - depends_on = [] - component_name = None - - def __init__(self, component_manager): - self.component_manager = component_manager - self._running = False - - @property - def running(self): - return self._running - - def start(self): - raise NotImplementedError() - - def stop(self): - raise NotImplementedError() - - def component(self): - raise NotImplementedError() - - @defer.inlineCallbacks - def _setup(self): - try: - result = yield defer.maybeDeferred(self.start) - self._running = True - defer.returnValue(result) - except Exception as err: - log.exception("Error setting up %s", self.component_name or self.__class__.__name__) - raise err - - @defer.inlineCallbacks - def _stop(self): - try: - result = yield defer.maybeDeferred(self.stop) - self._running = False - defer.returnValue(result) - except Exception as err: - log.exception("Error stopping %s", self.__class__.__name__) - raise err diff --git a/lbrynet/daemon/ComponentManager.py b/lbrynet/daemon/ComponentManager.py deleted file mode 100644 index af5121916..000000000 --- a/lbrynet/daemon/ComponentManager.py +++ /dev/null @@ -1,139 +0,0 @@ -import logging -from twisted.internet import defer - -from lbrynet import conf -from lbrynet.core.Error import ComponentStartConditionNotMet - -log = logging.getLogger(__name__) - - -class ComponentManager(object): - default_component_classes = {} - - def __init__(self, reactor=None, analytics_manager=None, skip_components=None, **override_components): - self.skip_components = skip_components or [] - self.skip_components.extend(conf.settings['components_to_skip']) - - self.reactor = reactor - self.component_classes = {} - self.components = set() - self.analytics_manager = analytics_manager - - for component_name, component_class in self.default_component_classes.iteritems(): - if component_name in override_components: - component_class = override_components.pop(component_name) - if component_name not in self.skip_components: - self.component_classes[component_name] = component_class - - if override_components: - raise SyntaxError("unexpected components: %s" % override_components) - - for component_class in self.component_classes.itervalues(): - self.components.add(component_class(self)) - - def sort_components(self, reverse=False): - """ - Sort components by requirements - """ - steps = [] - staged = set() - components = set(self.components) - - # components with no requirements - step = [] - for component in set(components): - if not component.depends_on: - step.append(component) - staged.add(component.component_name) - components.remove(component) - - if step: - steps.append(step) - - while components: - step = [] - to_stage = set() - for component in set(components): - reqs_met = 0 - for needed in component.depends_on: - if needed in staged: - reqs_met += 1 - if reqs_met == len(component.depends_on): - step.append(component) - to_stage.add(component.component_name) - components.remove(component) - if step: - staged.update(to_stage) - steps.append(step) - elif components: - raise ComponentStartConditionNotMet("Unresolved dependencies for: %s" % components) - if reverse: - steps.reverse() - return steps - - @defer.inlineCallbacks - def setup(self, **callbacks): - """ - Start Components in sequence sorted by requirements - - :return: (defer.Deferred) - """ - - for component_name, cb in callbacks.iteritems(): - if component_name not in self.component_classes: - raise NameError("unknown component: %s" % component_name) - if not callable(cb): - raise ValueError("%s is not callable" % cb) - - def _setup(component): - if component.component_name in callbacks: - d = component._setup() - d.addCallback(callbacks[component.component_name]) - return d - return component._setup() - - stages = self.sort_components() - for stage in stages: - yield defer.DeferredList([_setup(component) for component in stage]) - - @defer.inlineCallbacks - def stop(self): - """ - Stop Components in reversed startup order - - :return: (defer.Deferred) - """ - stages = self.sort_components(reverse=True) - for stage in stages: - yield defer.DeferredList([component._stop() for component in stage]) - - def all_components_running(self, *component_names): - """ - Check if components are running - - :return: (bool) True if all specified components are running - """ - components = {component.component_name: component for component in self.components} - for component in component_names: - if component not in components: - raise NameError("%s is not a known Component" % component) - if not components[component].running: - return False - return True - - def get_components_status(self): - """ - List status of all the components, whether they are running or not - - :return: (dict) {(str) component_name: (bool) True is running else False} - """ - return { - component.component_name: component.running - for component in self.components - } - - def get_component(self, component_name): - for component in self.components: - if component.component_name == component_name: - return component.component - raise NameError(component_name) diff --git a/lbrynet/daemon/Components.py b/lbrynet/daemon/Components.py deleted file mode 100644 index 7fc17b6cb..000000000 --- a/lbrynet/daemon/Components.py +++ /dev/null @@ -1,523 +0,0 @@ -import os -import logging -import miniupnpc -from twisted.internet import defer, threads, reactor, error - -from lbrynet import conf -from lbrynet.core.Session import Session -from lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier, EncryptedFileStreamType -from lbrynet.core.Wallet import LBRYumWallet -from lbrynet.core.server.BlobRequestHandler import BlobRequestHandlerFactory -from lbrynet.core.server.ServerProtocol import ServerProtocolFactory -from lbrynet.daemon.Component import Component -from lbrynet.database.storage import SQLiteStorage -from lbrynet.dht import node, hashannouncer -from lbrynet.file_manager.EncryptedFileManager import EncryptedFileManager -from lbrynet.lbry_file.client.EncryptedFileDownloader import EncryptedFileSaverFactory -from lbrynet.lbry_file.client.EncryptedFileOptions import add_lbry_file_to_sd_identifier -from lbrynet.reflector import ServerFactory as reflector_server_factory - -from lbrynet.core.utils import generate_id - -log = logging.getLogger(__name__) - -# settings must be initialized before this file is imported - -DATABASE_COMPONENT = "database" -WALLET_COMPONENT = "wallet" -SESSION_COMPONENT = "session" -DHT_COMPONENT = "dht" -HASH_ANNOUNCER_COMPONENT = "hashAnnouncer" -STREAM_IDENTIFIER_COMPONENT = "streamIdentifier" -FILE_MANAGER_COMPONENT = "fileManager" -PEER_PROTOCOL_SERVER_COMPONENT = "peerProtocolServer" -REFLECTOR_COMPONENT = "reflector" -UPNP_COMPONENT = "upnp" - - -class ConfigSettings(object): - @staticmethod - def get_conf_setting(setting_name): - return conf.settings[setting_name] - - @staticmethod - def get_blobfiles_dir(): - if conf.settings['BLOBFILES_DIR'] == "blobfiles": - return os.path.join(GCS("data_dir"), "blobfiles") - else: - log.info("Using non-default blobfiles directory: %s", conf.settings['BLOBFILES_DIR']) - return conf.settings['BLOBFILES_DIR'] - - @staticmethod - def get_node_id(): - return conf.settings.node_id - - @staticmethod - def get_external_ip(): - from lbrynet.core.system_info import get_platform - platform = get_platform(get_ip=True) - return platform['ip'] - - -# Shorthand for common ConfigSettings methods -CS = ConfigSettings -GCS = ConfigSettings.get_conf_setting - - -class DatabaseComponent(Component): - component_name = DATABASE_COMPONENT - - def __init__(self, component_manager): - Component.__init__(self, component_manager) - self.storage = None - - @property - def component(self): - return self.storage - - @staticmethod - def get_current_db_revision(): - return 9 - - @staticmethod - def get_revision_filename(): - return conf.settings.get_db_revision_filename() - - @staticmethod - def _write_db_revision_file(version_num): - with open(conf.settings.get_db_revision_filename(), mode='w') as db_revision: - db_revision.write(str(version_num)) - - @defer.inlineCallbacks - def start(self): - # check directories exist, create them if they don't - log.info("Loading databases") - - if not os.path.exists(GCS('download_directory')): - os.mkdir(GCS('download_directory')) - - if not os.path.exists(GCS('data_dir')): - os.mkdir(GCS('data_dir')) - self._write_db_revision_file(self.get_current_db_revision()) - log.debug("Created the db revision file: %s", self.get_revision_filename()) - - if not os.path.exists(CS.get_blobfiles_dir()): - os.mkdir(CS.get_blobfiles_dir()) - log.debug("Created the blobfile directory: %s", str(CS.get_blobfiles_dir())) - - if not os.path.exists(self.get_revision_filename()): - log.warning("db_revision file not found. Creating it") - self._write_db_revision_file(self.get_current_db_revision()) - - # check the db migration and run any needed migrations - with open(self.get_revision_filename(), "r") as revision_read_handle: - old_revision = int(revision_read_handle.read().strip()) - - if old_revision > self.get_current_db_revision(): - raise Exception('This version of lbrynet is not compatible with the database\n' - 'Your database is revision %i, expected %i' % - (old_revision, self.get_current_db_revision())) - if old_revision < self.get_current_db_revision(): - from lbrynet.database.migrator import dbmigrator - log.info("Upgrading your databases (revision %i to %i)", old_revision, self.get_current_db_revision()) - yield threads.deferToThread( - dbmigrator.migrate_db, GCS('data_dir'), old_revision, self.get_current_db_revision() - ) - self._write_db_revision_file(self.get_current_db_revision()) - log.info("Finished upgrading the databases.") - - # start SQLiteStorage - self.storage = SQLiteStorage(GCS('data_dir')) - yield self.storage.setup() - - @defer.inlineCallbacks - def stop(self): - yield self.storage.stop() - self.storage = None - - -class WalletComponent(Component): - component_name = WALLET_COMPONENT - depends_on = [DATABASE_COMPONENT] - - def __init__(self, component_manager): - Component.__init__(self, component_manager) - self.wallet = None - - @property - def component(self): - return self.wallet - - @defer.inlineCallbacks - def start(self): - storage = self.component_manager.get_component(DATABASE_COMPONENT) - wallet_type = GCS('wallet') - - if wallet_type == conf.LBRYCRD_WALLET: - raise ValueError('LBRYcrd Wallet is no longer supported') - elif wallet_type == conf.LBRYUM_WALLET: - - log.info("Using lbryum wallet") - - lbryum_servers = {address: {'t': str(port)} - for address, port in GCS('lbryum_servers')} - - config = { - 'auto_connect': True, - 'chain': GCS('blockchain_name'), - 'default_servers': lbryum_servers - } - - if 'use_keyring' in conf.settings: - config['use_keyring'] = GCS('use_keyring') - if conf.settings['lbryum_wallet_dir']: - config['lbryum_path'] = GCS('lbryum_wallet_dir') - self.wallet = LBRYumWallet(storage, config) - yield self.wallet.start() - else: - raise ValueError('Wallet Type {} is not valid'.format(wallet_type)) - - @defer.inlineCallbacks - def stop(self): - yield self.wallet.stop() - self.wallet = None - - -class SessionComponent(Component): - component_name = SESSION_COMPONENT - depends_on = [DATABASE_COMPONENT, WALLET_COMPONENT, DHT_COMPONENT, HASH_ANNOUNCER_COMPONENT] - - def __init__(self, component_manager): - Component.__init__(self, component_manager) - self.session = None - - @property - def component(self): - return self.session - - @defer.inlineCallbacks - def start(self): - self.session = Session( - GCS('data_rate'), - db_dir=GCS('data_dir'), - node_id=CS.get_node_id(), - blob_dir=CS.get_blobfiles_dir(), - dht_node=self.component_manager.get_component(DHT_COMPONENT), - hash_announcer=self.component_manager.get_component(HASH_ANNOUNCER_COMPONENT), - dht_node_port=GCS('dht_node_port'), - known_dht_nodes=GCS('known_dht_nodes'), - peer_port=GCS('peer_port'), - use_upnp=GCS('use_upnp'), - wallet=self.component_manager.get_component(WALLET_COMPONENT), - is_generous=GCS('is_generous_host'), - external_ip=CS.get_external_ip(), - storage=self.component_manager.get_component(DATABASE_COMPONENT) - ) - yield self.session.setup() - - @defer.inlineCallbacks - def stop(self): - yield self.session.shut_down() - - -class DHTComponent(Component): - component_name = DHT_COMPONENT - depends_on = [UPNP_COMPONENT] - - def __init__(self, component_manager): - Component.__init__(self, component_manager) - self.dht_node = None - self.upnp_component = None - self.udp_port, self.peer_port = None, None - - @property - def component(self): - return self.dht_node - - @defer.inlineCallbacks - def start(self): - self.upnp_component = self.component_manager.get_component(UPNP_COMPONENT) - self.udp_port, self.peer_port = self.upnp_component.get_redirects() - node_id = CS.get_node_id() - if node_id is None: - node_id = generate_id() - - self.dht_node = node.Node( - node_id=node_id, - udpPort=self.udp_port, - externalIP=CS.get_external_ip(), - peerPort=self.peer_port - ) - yield self.dht_node.start(GCS('known_dht_nodes')) - log.info("Joined the dht") - - @defer.inlineCallbacks - def stop(self): - yield self.dht_node.stop() - - -class HashAnnouncer(Component): - component_name = HASH_ANNOUNCER_COMPONENT - depends_on = [DHT_COMPONENT, DATABASE_COMPONENT] - - def __init__(self, component_manager): - Component.__init__(self, component_manager) - self.hash_announcer = None - - @property - def component(self): - return self.hash_announcer - - @defer.inlineCallbacks - def start(self): - storage = self.component_manager.get_component(DATABASE_COMPONENT) - dht_node = self.component_manager.get_component(DHT_COMPONENT) - self.hash_announcer = hashannouncer.DHTHashAnnouncer(dht_node, storage) - yield self.hash_announcer.start() - - @defer.inlineCallbacks - def stop(self): - yield self.hash_announcer.stop() - - -class StreamIdentifier(Component): - component_name = STREAM_IDENTIFIER_COMPONENT - depends_on = [SESSION_COMPONENT] - - def __init__(self, component_manager): - Component.__init__(self, component_manager) - self.sd_identifier = StreamDescriptorIdentifier() - - @property - def component(self): - return self.sd_identifier - - @defer.inlineCallbacks - def start(self): - session = self.component_manager.get_component(SESSION_COMPONENT) - add_lbry_file_to_sd_identifier(self.sd_identifier) - file_saver_factory = EncryptedFileSaverFactory( - session.peer_finder, - session.rate_limiter, - session.blob_manager, - session.storage, - session.wallet, - GCS('download_directory') - ) - yield self.sd_identifier.add_stream_downloader_factory(EncryptedFileStreamType, file_saver_factory) - - def stop(self): - pass - - -class FileManager(Component): - component_name = FILE_MANAGER_COMPONENT - depends_on = [SESSION_COMPONENT, STREAM_IDENTIFIER_COMPONENT] - - def __init__(self, component_manager): - Component.__init__(self, component_manager) - self.file_manager = None - - @property - def component(self): - return self.file_manager - - @defer.inlineCallbacks - def start(self): - session = self.component_manager.get_component(SESSION_COMPONENT) - sd_identifier = self.component_manager.get_component(STREAM_IDENTIFIER_COMPONENT) - log.info('Starting the file manager') - self.file_manager = EncryptedFileManager(session, sd_identifier) - yield self.file_manager.setup() - log.info('Done setting up file manager') - - @defer.inlineCallbacks - def stop(self): - yield self.file_manager.stop() - - -class PeerProtocolServer(Component): - component_name = PEER_PROTOCOL_SERVER_COMPONENT - depends_on = [SESSION_COMPONENT] - - def __init__(self, component_manager): - Component.__init__(self, component_manager) - self.lbry_server_port = None - - @property - def component(self): - return self.lbry_server_port - - @defer.inlineCallbacks - def start(self): - query_handlers = {} - peer_port = GCS('peer_port') - session = self.component_manager.get_component(SESSION_COMPONENT) - - handlers = [ - BlobRequestHandlerFactory( - session.blob_manager, - session.wallet, - session.payment_rate_manager, - self.component_manager.analytics_manager - ), - session.wallet.get_wallet_info_query_handler_factory(), - ] - - for handler in handlers: - query_id = handler.get_primary_query_identifier() - query_handlers[query_id] = handler - - if peer_port is not None: - server_factory = ServerProtocolFactory(session.rate_limiter, query_handlers, session.peer_manager) - - try: - log.info("Peer protocol listening on TCP %d", peer_port) - self.lbry_server_port = yield reactor.listenTCP(peer_port, server_factory) - except error.CannotListenError as e: - import traceback - log.error("Couldn't bind to port %d. Visit lbry.io/faq/how-to-change-port for" - " more details.", peer_port) - log.error("%s", traceback.format_exc()) - raise ValueError("%s lbrynet may already be running on your computer." % str(e)) - - @defer.inlineCallbacks - def stop(self): - if self.lbry_server_port is not None: - self.lbry_server_port, old_port = None, self.lbry_server_port - log.info('Stop listening on port %s', old_port.port) - yield old_port.stopListening() - - -class ReflectorComponent(Component): - component_name = REFLECTOR_COMPONENT - depends_on = [SESSION_COMPONENT, FILE_MANAGER_COMPONENT] - - def __init__(self, component_manager): - Component.__init__(self, component_manager) - self.reflector_server_port = GCS('reflector_port') - self.run_reflector_server = GCS('run_reflector_server') - - @property - def component(self): - return self - - @defer.inlineCallbacks - def start(self): - session = self.component_manager.get_component(SESSION_COMPONENT) - file_manager = self.component_manager.get_component(FILE_MANAGER_COMPONENT) - - if self.run_reflector_server and self.reflector_server_port is not None: - log.info("Starting reflector server") - reflector_factory = reflector_server_factory(session.peer_manager, session.blob_manager, file_manager) - try: - self.reflector_server_port = yield reactor.listenTCP(self.reflector_server_port, reflector_factory) - log.info('Started reflector on port %s', self.reflector_server_port) - except error.CannotListenError as e: - log.exception("Couldn't bind reflector to port %d", self.reflector_server_port) - raise ValueError("{} lbrynet may already be running on your computer.".format(e)) - - def stop(self): - if self.run_reflector_server and self.reflector_server_port is not None: - log.info("Stopping reflector server") - if self.reflector_server_port is not None: - self.reflector_server_port, p = None, self.reflector_server_port - yield p.stopListening - - -class UPnPComponent(Component): - component_name = UPNP_COMPONENT - - def __init__(self, component_manager): - Component.__init__(self, component_manager) - self.peer_port = GCS('peer_port') - self.dht_node_port = GCS('dht_node_port') - self.use_upnp = GCS('use_upnp') - self.external_ip = CS.get_external_ip() - self.upnp_redirects = [] - - @property - def component(self): - return self - - def get_redirects(self): - return self.peer_port, self.dht_node_port - - def start(self): - log.debug("In _try_upnp") - - def get_free_port(upnp, port, protocol): - # returns an existing mapping if it exists - mapping = upnp.getspecificportmapping(port, protocol) - if not mapping: - return port - if upnp.lanaddr == mapping[0]: - return mapping[1] - return get_free_port(upnp, port + 1, protocol) - - def get_port_mapping(upnp, port, protocol, description): - # try to map to the requested port, if there is already a mapping use the next external - # port available - if protocol not in ['UDP', 'TCP']: - raise Exception("invalid protocol") - port = get_free_port(upnp, port, protocol) - if isinstance(port, tuple): - log.info("Found existing UPnP redirect %s:%i (%s) to %s:%i, using it", - self.external_ip, port, protocol, upnp.lanaddr, port) - return port - upnp.addportmapping(port, protocol, upnp.lanaddr, port, - description, '') - log.info("Set UPnP redirect %s:%i (%s) to %s:%i", self.external_ip, port, - protocol, upnp.lanaddr, port) - return port - - def threaded_try_upnp(): - if self.use_upnp is False: - log.debug("Not using upnp") - return False - u = miniupnpc.UPnP() - num_devices_found = u.discover() - if num_devices_found > 0: - u.selectigd() - external_ip = u.externalipaddress() - if external_ip != '0.0.0.0' and not self.external_ip: - # best not to rely on this external ip, the router can be behind layers of NATs - self.external_ip = external_ip - if self.peer_port: - self.peer_port = get_port_mapping(u, self.peer_port, 'TCP', 'LBRY peer port') - self.upnp_redirects.append((self.peer_port, 'TCP')) - if self.dht_node_port: - self.dht_node_port = get_port_mapping(u, self.dht_node_port, 'UDP', 'LBRY DHT port') - self.upnp_redirects.append((self.dht_node_port, 'UDP')) - return True - return False - - def upnp_failed(err): - log.warning("UPnP failed. Reason: %s", err.getErrorMessage()) - return False - - d = threads.deferToThread(threaded_try_upnp) - d.addErrback(upnp_failed) - return d - - def stop(self): - log.info("Unsetting upnp for session") - - def threaded_unset_upnp(): - u = miniupnpc.UPnP() - num_devices_found = u.discover() - if num_devices_found > 0: - u.selectigd() - for port, protocol in self.upnp_redirects: - if u.getspecificportmapping(port, protocol) is None: - log.warning( - "UPnP redirect for %s %d was removed by something else.", - protocol, port) - else: - u.deleteportmapping(port, protocol) - log.info("Removed UPnP redirect for %s %d.", protocol, port) - self.upnp_redirects = [] - - d = threads.deferToThread(threaded_unset_upnp) - d.addErrback(lambda err: str(err)) - return d diff --git a/lbrynet/daemon/Daemon.py b/lbrynet/daemon/Daemon.py index 78be571fc..3d1681cc7 100644 --- a/lbrynet/daemon/Daemon.py +++ b/lbrynet/daemon/Daemon.py @@ -1,4 +1,3 @@ -# coding=utf-8 import binascii import logging.handlers import mimetypes @@ -9,10 +8,11 @@ import urllib import json import textwrap import signal +import six from copy import deepcopy from decimal import Decimal, InvalidOperation from twisted.web import server -from twisted.internet import defer, reactor +from twisted.internet import defer, threads, error, reactor from twisted.internet.task import LoopingCall from twisted.python.failure import Failure @@ -25,20 +25,28 @@ from lbryschema.decode import smart_decode # TODO: importing this when internet is disabled raises a socket.gaierror from lbrynet.core.system_info import get_lbrynet_version +from lbrynet.database.storage import SQLiteStorage from lbrynet import conf +from lbrynet.conf import LBRYCRD_WALLET, LBRYUM_WALLET from lbrynet.reflector import reupload +from lbrynet.reflector import ServerFactory as reflector_server_factory from lbrynet.core.log_support import configure_loggly_handler -from lbrynet.daemon.Component import ComponentManager -from lbrynet.daemon.Components import WALLET_COMPONENT, DATABASE_COMPONENT, SESSION_COMPONENT, DHT_COMPONENT -from lbrynet.daemon.Components import STREAM_IDENTIFIER_COMPONENT, FILE_MANAGER_COMPONENT +from lbrynet.lbry_file.client.EncryptedFileDownloader import EncryptedFileSaverFactory +from lbrynet.lbry_file.client.EncryptedFileOptions import add_lbry_file_to_sd_identifier +from lbrynet.file_manager.EncryptedFileManager import EncryptedFileManager from lbrynet.daemon.Downloader import GetStream from lbrynet.daemon.Publisher import Publisher from lbrynet.daemon.ExchangeRateManager import ExchangeRateManager from lbrynet.daemon.auth.server import AuthJSONRPCServer from lbrynet.core.PaymentRateManager import OnlyFreePaymentsManager from lbrynet.core import utils, system_info -from lbrynet.core.StreamDescriptor import download_sd_blob +from lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier, download_sd_blob +from lbrynet.core.StreamDescriptor import EncryptedFileStreamType +from lbrynet.core.Session import Session +from lbrynet.core.Wallet import LBRYumWallet from lbrynet.core.looping_call_manager import LoopingCallManager +from lbrynet.core.server.BlobRequestHandler import BlobRequestHandlerFactory +from lbrynet.core.server.ServerProtocol import ServerProtocolFactory from lbrynet.core.Error import InsufficientFundsError, UnknownNameError from lbrynet.core.Error import DownloadDataTimeout, DownloadSDTimeout from lbrynet.core.Error import NullFundsError, NegativeFundsError @@ -50,6 +58,23 @@ from lbrynet.core.client.StandaloneBlobDownloader import StandaloneBlobDownloade log = logging.getLogger(__name__) INITIALIZING_CODE = 'initializing' +LOADING_DB_CODE = 'loading_db' +LOADING_WALLET_CODE = 'loading_wallet' +LOADING_FILE_MANAGER_CODE = 'loading_file_manager' +LOADING_SERVER_CODE = 'loading_server' +STARTED_CODE = 'started' +WAITING_FOR_FIRST_RUN_CREDITS = 'waiting_for_credits' +WAITING_FOR_UNLOCK = 'waiting_for_wallet_unlock' +STARTUP_STAGES = [ + (INITIALIZING_CODE, 'Initializing'), + (LOADING_DB_CODE, 'Loading databases'), + (LOADING_WALLET_CODE, 'Catching up with the blockchain'), + (LOADING_FILE_MANAGER_CODE, 'Setting up file manager'), + (LOADING_SERVER_CODE, 'Starting lbrynet'), + (STARTED_CODE, 'Started lbrynet'), + (WAITING_FOR_FIRST_RUN_CREDITS, 'Waiting for first run credits'), + (WAITING_FOR_UNLOCK, 'Waiting for user to unlock the wallet using the wallet_unlock command') +] # TODO: make this consistent with the stages in Downloader.py DOWNLOAD_METADATA_CODE = 'downloading_metadata' @@ -153,20 +178,40 @@ class Daemon(AuthJSONRPCServer): LBRYnet daemon, a jsonrpc interface to lbry functions """ - def __init__(self, analytics_manager, component_manager=None): + allowed_during_startup = [ + 'daemon_stop', 'status', 'version', 'wallet_unlock' + ] + + def __init__(self, analytics_manager): AuthJSONRPCServer.__init__(self, conf.settings['use_auth_http']) + self.db_dir = conf.settings['data_dir'] + self.storage = SQLiteStorage(self.db_dir) self.download_directory = conf.settings['download_directory'] + if conf.settings['BLOBFILES_DIR'] == "blobfiles": + self.blobfile_dir = os.path.join(self.db_dir, "blobfiles") + else: + log.info("Using non-default blobfiles directory: %s", conf.settings['BLOBFILES_DIR']) + self.blobfile_dir = conf.settings['BLOBFILES_DIR'] self.data_rate = conf.settings['data_rate'] self.max_key_fee = conf.settings['max_key_fee'] self.disable_max_key_fee = conf.settings['disable_max_key_fee'] self.download_timeout = conf.settings['download_timeout'] + self.run_reflector_server = conf.settings['run_reflector_server'] + self.wallet_type = conf.settings['wallet'] self.delete_blobs_on_remove = conf.settings['delete_blobs_on_remove'] + self.peer_port = conf.settings['peer_port'] + self.reflector_port = conf.settings['reflector_port'] + self.dht_node_port = conf.settings['dht_node_port'] + self.use_upnp = conf.settings['use_upnp'] self.auto_renew_claim_height_delta = conf.settings['auto_renew_claim_height_delta'] + self.startup_status = STARTUP_STAGES[0] self.connected_to_internet = True self.connection_status_code = None self.platform = None + self.current_db_revision = 9 self.db_revision_file = conf.settings.get_db_revision_filename() + self.session = None self._session_id = conf.settings.get_session_id() # TODO: this should probably be passed into the daemon, or # possibly have the entire log upload functionality taken out @@ -175,16 +220,9 @@ class Daemon(AuthJSONRPCServer): self.analytics_manager = analytics_manager self.node_id = conf.settings.node_id - # components - self.storage = None - self.dht_node = None - self.wallet = None - self.sd_identifier = None - self.session = None - self.file_manager = None - self.wallet_user = None self.wallet_password = None + self.query_handlers = {} self.waiting_on = {} self.streams = {} self.exchange_rate_manager = ExchangeRateManager() @@ -193,7 +231,8 @@ class Daemon(AuthJSONRPCServer): Checker.CONNECTION_STATUS: LoopingCall(self._update_connection_status), } self.looping_call_manager = LoopingCallManager(calls) - self.component_manager = component_manager or ComponentManager(self.analytics_manager) + self.sd_identifier = StreamDescriptorIdentifier() + self.lbry_file_manager = None @defer.inlineCallbacks def setup(self): @@ -207,19 +246,32 @@ class Daemon(AuthJSONRPCServer): self.exchange_rate_manager.start() yield self._initial_setup() - yield self.component_manager.setup() - self.storage = self.component_manager.get_component(DATABASE_COMPONENT) - self.session = self.component_manager.get_component(SESSION_COMPONENT) - self.wallet = self.component_manager.get_component(WALLET_COMPONENT) - self.dht_node = self.component_manager.get_component(DHT_COMPONENT) - # yield self._check_wallet_locked() + yield threads.deferToThread(self._setup_data_directory) + migrated = yield self._check_db_migration() + yield self.storage.setup() + yield self._get_session() + yield self._check_wallet_locked() yield self._start_analytics() - self.sd_identifier = self.component_manager.get_component(STREAM_IDENTIFIER_COMPONENT) - self.file_manager = self.component_manager.get_component(FILE_MANAGER_COMPONENT) - log.info("Starting balance: " + str(self.wallet.get_balance())) + yield add_lbry_file_to_sd_identifier(self.sd_identifier) + yield self._setup_stream_identifier() + yield self._setup_lbry_file_manager() + yield self._setup_query_handlers() + yield self._setup_server() + log.info("Starting balance: " + str(self.session.wallet.get_balance())) self.announced_startup = True + self.startup_status = STARTUP_STAGES[5] log.info("Started lbrynet-daemon") + ### + # this should be removed with the next db revision + if migrated: + missing_channel_claim_ids = yield self.storage.get_unknown_certificate_ids() + while missing_channel_claim_ids: # in case there are a crazy amount lets batch to be safe + batch = missing_channel_claim_ids[:100] + _ = yield self.session.wallet.get_claims_by_ids(*batch) + missing_channel_claim_ids = missing_channel_claim_ids[100:] + ### + self._auto_renew() def _get_platform(self): @@ -250,12 +302,12 @@ class Daemon(AuthJSONRPCServer): # auto renew is turned off if 0 or some negative number if self.auto_renew_claim_height_delta < 1: defer.returnValue(None) - if not self.wallet.network.get_remote_height(): + if not self.session.wallet.network.get_remote_height(): log.warning("Failed to get remote height, aborting auto renew") defer.returnValue(None) log.debug("Renewing claim") - h = self.wallet.network.get_remote_height() + self.auto_renew_claim_height_delta - results = yield self.wallet.claim_renew_all_before_expiration(h) + h = self.session.wallet.network.get_remote_height() + self.auto_renew_claim_height_delta + results = yield self.session.wallet.claim_renew_all_before_expiration(h) for outpoint, result in results.iteritems(): if result['success']: log.info("Renewed claim at outpoint:%s claim ID:%s, paid fee:%s", @@ -264,6 +316,93 @@ class Daemon(AuthJSONRPCServer): log.info("Failed to renew claim at outpoint:%s, reason:%s", outpoint, result['reason']) + def _start_server(self): + if self.peer_port is not None: + server_factory = ServerProtocolFactory(self.session.rate_limiter, + self.query_handlers, + self.session.peer_manager) + + try: + log.info("Peer protocol listening on TCP %d", self.peer_port) + self.lbry_server_port = reactor.listenTCP(self.peer_port, server_factory) + except error.CannotListenError as e: + import traceback + log.error("Couldn't bind to port %d. Visit lbry.io/faq/how-to-change-port for" + " more details.", self.peer_port) + log.error("%s", traceback.format_exc()) + raise ValueError("%s lbrynet may already be running on your computer." % str(e)) + return defer.succeed(True) + + def _start_reflector(self): + if self.run_reflector_server: + log.info("Starting reflector server") + if self.reflector_port is not None: + reflector_factory = reflector_server_factory( + self.session.peer_manager, + self.session.blob_manager, + self.lbry_file_manager + ) + try: + self.reflector_server_port = reactor.listenTCP(self.reflector_port, + reflector_factory) + log.info('Started reflector on port %s', self.reflector_port) + except error.CannotListenError as e: + log.exception("Couldn't bind reflector to port %d", self.reflector_port) + raise ValueError( + "{} lbrynet may already be running on your computer.".format(e)) + return defer.succeed(True) + + def _stop_reflector(self): + if self.run_reflector_server: + log.info("Stopping reflector server") + try: + if self.reflector_server_port is not None: + self.reflector_server_port, p = None, self.reflector_server_port + return defer.maybeDeferred(p.stopListening) + except AttributeError: + return defer.succeed(True) + return defer.succeed(True) + + def _stop_file_manager(self): + if self.lbry_file_manager: + self.lbry_file_manager.stop() + return defer.succeed(True) + + def _stop_server(self): + try: + if self.lbry_server_port is not None: + self.lbry_server_port, old_port = None, self.lbry_server_port + log.info('Stop listening on port %s', old_port.port) + return defer.maybeDeferred(old_port.stopListening) + else: + return defer.succeed(True) + except AttributeError: + return defer.succeed(True) + + def _setup_server(self): + self.startup_status = STARTUP_STAGES[4] + d = self._start_server() + d.addCallback(lambda _: self._start_reflector()) + return d + + def _setup_query_handlers(self): + handlers = [ + BlobRequestHandlerFactory( + self.session.blob_manager, + self.session.wallet, + self.session.payment_rate_manager, + self.analytics_manager + ), + self.session.wallet.get_wallet_info_query_handler_factory(), + ] + return self._add_query_handlers(handlers) + + def _add_query_handlers(self, query_handlers): + for handler in query_handlers: + query_id = handler.get_primary_query_identifier() + self.query_handlers[query_id] = handler + return defer.succeed(None) + @staticmethod def _already_shutting_down(sig_num, frame): log.info("Already shutting down") @@ -279,26 +418,190 @@ class Daemon(AuthJSONRPCServer): signal.signal(signal.SIGTERM, self._already_shutting_down) log.info("Closing lbrynet session") + log.info("Status at time of shutdown: " + self.startup_status[0]) self._stop_streams() self.looping_call_manager.shutdown() if self.analytics_manager: self.analytics_manager.shutdown() - if self.component_manager is not None: - d = self.component_manager.stop() + d = self._stop_server() + d.addErrback(log.fail(), 'Failure while shutting down') + d.addCallback(lambda _: self._stop_reflector()) + d.addErrback(log.fail(), 'Failure while shutting down') + d.addCallback(lambda _: self._stop_file_manager()) + d.addErrback(log.fail(), 'Failure while shutting down') + if self.session is not None: + d.addCallback(lambda _: self.session.shut_down()) d.addErrback(log.fail(), 'Failure while shutting down') return d + def _update_settings(self, settings): + setting_types = { + 'download_directory': str, + 'data_rate': float, + 'download_timeout': int, + 'peer_port': int, + 'max_key_fee': dict, + 'use_upnp': bool, + 'run_reflector_server': bool, + 'cache_time': int, + 'reflect_uploads': bool, + 'share_usage_data': bool, + 'disable_max_key_fee': bool, + 'peer_search_timeout': int, + 'sd_download_timeout': int, + 'auto_renew_claim_height_delta': int + } + + for key, setting_type in setting_types.iteritems(): + if key in settings: + if isinstance(settings[key], setting_type): + conf.settings.update({key: settings[key]}, + data_types=(conf.TYPE_RUNTIME, conf.TYPE_PERSISTED)) + elif setting_type is dict and isinstance(settings[key], six.string_types): + decoded = json.loads(str(settings[key])) + conf.settings.update({key: decoded}, + data_types=(conf.TYPE_RUNTIME, conf.TYPE_PERSISTED)) + else: + converted = setting_type(settings[key]) + conf.settings.update({key: converted}, + data_types=(conf.TYPE_RUNTIME, conf.TYPE_PERSISTED)) + conf.settings.save_conf_file_settings() + + self.data_rate = conf.settings['data_rate'] + self.max_key_fee = conf.settings['max_key_fee'] + self.disable_max_key_fee = conf.settings['disable_max_key_fee'] + self.download_directory = conf.settings['download_directory'] + self.download_timeout = conf.settings['download_timeout'] + + return defer.succeed(True) + + def _write_db_revision_file(self, version_num): + with open(self.db_revision_file, mode='w') as db_revision: + db_revision.write(str(version_num)) + + def _setup_data_directory(self): + old_revision = 1 + self.startup_status = STARTUP_STAGES[1] + log.info("Loading databases") + if not os.path.exists(self.download_directory): + os.mkdir(self.download_directory) + if not os.path.exists(self.db_dir): + os.mkdir(self.db_dir) + self._write_db_revision_file(self.current_db_revision) + log.debug("Created the db revision file: %s", self.db_revision_file) + if not os.path.exists(self.blobfile_dir): + os.mkdir(self.blobfile_dir) + log.debug("Created the blobfile directory: %s", str(self.blobfile_dir)) + if not os.path.exists(self.db_revision_file): + log.warning("db_revision file not found. Creating it") + self._write_db_revision_file(self.current_db_revision) + + @defer.inlineCallbacks + def _check_db_migration(self): + old_revision = 1 + migrated = False + if os.path.exists(self.db_revision_file): + with open(self.db_revision_file, "r") as revision_read_handle: + old_revision = int(revision_read_handle.read().strip()) + + if old_revision > self.current_db_revision: + raise Exception('This version of lbrynet is not compatible with the database\n' + 'Your database is revision %i, expected %i' % + (old_revision, self.current_db_revision)) + if old_revision < self.current_db_revision: + from lbrynet.database.migrator import dbmigrator + log.info("Upgrading your databases (revision %i to %i)", old_revision, self.current_db_revision) + yield threads.deferToThread( + dbmigrator.migrate_db, self.db_dir, old_revision, self.current_db_revision + ) + self._write_db_revision_file(self.current_db_revision) + log.info("Finished upgrading the databases.") + migrated = True + defer.returnValue(migrated) + + @defer.inlineCallbacks + def _setup_lbry_file_manager(self): + log.info('Starting the file manager') + self.startup_status = STARTUP_STAGES[3] + self.lbry_file_manager = EncryptedFileManager(self.session, self.sd_identifier) + yield self.lbry_file_manager.setup() + log.info('Done setting up file manager') + def _start_analytics(self): if not self.analytics_manager.is_started: self.analytics_manager.start() + def _get_session(self): + def get_wallet(): + if self.wallet_type == LBRYCRD_WALLET: + raise ValueError('LBRYcrd Wallet is no longer supported') + elif self.wallet_type == LBRYUM_WALLET: + + log.info("Using lbryum wallet") + + lbryum_servers = {address: {'t': str(port)} + for address, port in conf.settings['lbryum_servers']} + + config = { + 'auto_connect': True, + 'chain': conf.settings['blockchain_name'], + 'default_servers': lbryum_servers + } + + if 'use_keyring' in conf.settings: + config['use_keyring'] = conf.settings['use_keyring'] + if conf.settings['lbryum_wallet_dir']: + config['lbryum_path'] = conf.settings['lbryum_wallet_dir'] + wallet = LBRYumWallet(self.storage, config) + return defer.succeed(wallet) + else: + raise ValueError('Wallet Type {} is not valid'.format(self.wallet_type)) + + d = get_wallet() + + def create_session(wallet): + self.session = Session( + conf.settings['data_rate'], + db_dir=self.db_dir, + node_id=self.node_id, + blob_dir=self.blobfile_dir, + dht_node_port=self.dht_node_port, + known_dht_nodes=conf.settings['known_dht_nodes'], + peer_port=self.peer_port, + use_upnp=self.use_upnp, + wallet=wallet, + is_generous=conf.settings['is_generous_host'], + external_ip=self.platform['ip'], + storage=self.storage + ) + self.startup_status = STARTUP_STAGES[2] + + d.addCallback(create_session) + d.addCallback(lambda _: self.session.setup()) + return d + @defer.inlineCallbacks def _check_wallet_locked(self): - wallet = self.wallet - # if wallet.wallet.use_encryption: STARTUP Stage was set earlier, figure out what to do now - yield wallet.check_locked + wallet = self.session.wallet + if wallet.wallet.use_encryption: + self.startup_status = STARTUP_STAGES[7] + + yield wallet.check_locked() + + def _setup_stream_identifier(self): + file_saver_factory = EncryptedFileSaverFactory( + self.session.peer_finder, + self.session.rate_limiter, + self.session.blob_manager, + self.session.storage, + self.session.wallet, + self.download_directory + ) + self.sd_identifier.add_stream_downloader_factory(EncryptedFileStreamType, + file_saver_factory) + return defer.succeed(None) def _download_blob(self, blob_hash, rate_manager=None, timeout=None): """ @@ -317,7 +620,7 @@ class Daemon(AuthJSONRPCServer): timeout = timeout or 30 downloader = StandaloneBlobDownloader( blob_hash, self.session.blob_manager, self.session.peer_finder, self.session.rate_limiter, - rate_manager, self.wallet, timeout + rate_manager, self.session.wallet, timeout ) return downloader.download() @@ -325,7 +628,7 @@ class Daemon(AuthJSONRPCServer): def _get_stream_analytics_report(self, claim_dict): sd_hash = claim_dict.source_hash try: - stream_hash = yield self.storage.get_stream_hash_for_sd_hash(sd_hash) + stream_hash = yield self.session.storage.get_stream_hash_for_sd_hash(sd_hash) except Exception: stream_hash = None report = { @@ -339,7 +642,7 @@ class Daemon(AuthJSONRPCServer): sd_host = None report["sd_blob"] = sd_host if stream_hash: - blob_infos = yield self.storage.get_blobs_for_stream(stream_hash) + blob_infos = yield self.session.storage.get_blobs_for_stream(stream_hash) report["known_blobs"] = len(blob_infos) else: blob_infos = [] @@ -410,12 +713,11 @@ class Daemon(AuthJSONRPCServer): def _publish_stream(self, name, bid, claim_dict, file_path=None, certificate_id=None, claim_address=None, change_address=None): - publisher = Publisher(self.session, self.file_manager, self.wallet, + publisher = Publisher(self.session, self.lbry_file_manager, self.session.wallet, certificate_id) parse_lbry_uri(name) if not file_path: - stream_hash = yield self.storage.get_stream_hash_for_sd_hash( - claim_dict['stream']['source']['source']) + stream_hash = yield self.storage.get_stream_hash_for_sd_hash(claim_dict['stream']['source']['source']) claim_out = yield publisher.publish_stream(name, bid, claim_dict, stream_hash, claim_address, change_address) else: @@ -440,7 +742,7 @@ class Daemon(AuthJSONRPCServer): """ parsed = parse_lbry_uri(name) - resolution = yield self.wallet.resolve(parsed.name, check_cache=not force_refresh) + resolution = yield self.session.wallet.resolve(parsed.name, check_cache=not force_refresh) if parsed.name in resolution: result = resolution[parsed.name] defer.returnValue(result) @@ -495,7 +797,7 @@ class Daemon(AuthJSONRPCServer): cost = self._get_est_cost_from_stream_size(size) - resolved = yield self.wallet.resolve(uri) + resolved = yield self.session.wallet.resolve(uri) if uri in resolved and 'claim' in resolved[uri]: claim = ClaimDict.load_dict(resolved[uri]['claim']['value']) @@ -542,7 +844,7 @@ class Daemon(AuthJSONRPCServer): Resolve a name and return the estimated stream cost """ - resolved = yield self.wallet.resolve(uri) + resolved = yield self.session.wallet.resolve(uri) if resolved: claim_response = resolved[uri] else: @@ -622,7 +924,7 @@ class Daemon(AuthJSONRPCServer): def _get_lbry_file(self, search_by, val, return_json=False, full_status=False): lbry_file = None if search_by in FileID: - for l_f in self.file_manager.lbry_files: + for l_f in self.lbry_file_manager.lbry_files: if l_f.__dict__.get(search_by) == val: lbry_file = l_f break @@ -634,7 +936,7 @@ class Daemon(AuthJSONRPCServer): @defer.inlineCallbacks def _get_lbry_files(self, return_json=False, full_status=True, **kwargs): - lbry_files = list(self.file_manager.lbry_files) + lbry_files = list(self.lbry_file_manager.lbry_files) if kwargs: for search_type, value in iter_lbry_file_search_values(kwargs): lbry_files = [l_f for l_f in lbry_files if l_f.__dict__[search_type] == value] @@ -671,7 +973,7 @@ class Daemon(AuthJSONRPCServer): def _get_single_peer_downloader(self): downloader = SinglePeerDownloader() - downloader.setup(self.wallet) + downloader.setup(self.session.wallet) return downloader @defer.inlineCallbacks @@ -740,7 +1042,8 @@ class Daemon(AuthJSONRPCServer): 'is_running': bool, 'is_first_run': bool, 'startup_status': { - (str) component_name: (bool) True if running else False, + 'code': status code, + 'message': status message }, 'connection_status': { 'code': connection status code, @@ -764,19 +1067,22 @@ class Daemon(AuthJSONRPCServer): """ # on startup, the wallet or network won't be available but we still need this call to work - has_wallet = self.session and self.wallet and self.wallet.network - local_height = self.wallet.network.get_local_height() if has_wallet else 0 - remote_height = self.wallet.network.get_server_height() if has_wallet else 0 - best_hash = (yield self.wallet.get_best_blockhash()) if has_wallet else None - wallet_is_encrypted = has_wallet and self.wallet.wallet and \ - self.wallet.wallet.use_encryption + has_wallet = self.session and self.session.wallet and self.session.wallet.network + local_height = self.session.wallet.network.get_local_height() if has_wallet else 0 + remote_height = self.session.wallet.network.get_server_height() if has_wallet else 0 + best_hash = (yield self.session.wallet.get_best_blockhash()) if has_wallet else None + wallet_is_encrypted = has_wallet and self.session.wallet.wallet and \ + self.session.wallet.wallet.use_encryption response = { 'lbry_id': base58.b58encode(self.node_id), 'installation_id': conf.settings.installation_id, 'is_running': self.announced_startup, - 'is_first_run': self.wallet.is_first_run if has_wallet else None, - 'startup_status': self.component_manager.get_components_status(), + 'is_first_run': self.session.wallet.is_first_run if has_wallet else None, + 'startup_status': { + 'code': self.startup_status[0], + 'message': self.startup_status[1], + }, 'connection_status': { 'code': self.connection_status_code, 'message': ( @@ -799,7 +1105,7 @@ class Daemon(AuthJSONRPCServer): should_announce_blobs = yield self.session.blob_manager.count_should_announce_blobs() response['session_status'] = { 'managed_blobs': len(blobs), - 'managed_streams': len(self.file_manager.lbry_files), + 'managed_streams': len(self.lbry_file_manager.lbry_files), 'announce_queue_size': announce_queue_size, 'should_announce_blobs': should_announce_blobs, } @@ -875,6 +1181,7 @@ class Daemon(AuthJSONRPCServer): """ return self._render_response(conf.settings.get_adjustable_settings_dict()) + @defer.inlineCallbacks def jsonrpc_settings_set(self, **kwargs): """ Set daemon settings @@ -926,48 +1233,8 @@ class Daemon(AuthJSONRPCServer): (dict) Updated dictionary of daemon settings """ - # TODO: improve upon the current logic, it could be made better - new_settings = kwargs - - setting_types = { - 'download_directory': str, - 'data_rate': float, - 'download_timeout': int, - 'peer_port': int, - 'max_key_fee': dict, - 'use_upnp': bool, - 'run_reflector_server': bool, - 'cache_time': int, - 'reflect_uploads': bool, - 'share_usage_data': bool, - 'disable_max_key_fee': bool, - 'peer_search_timeout': int, - 'sd_download_timeout': int, - 'auto_renew_claim_height_delta': int - } - - for key, setting_type in setting_types.iteritems(): - if key in new_settings: - if isinstance(new_settings[key], setting_type): - conf.settings.update({key: new_settings[key]}, - data_types=(conf.TYPE_RUNTIME, conf.TYPE_PERSISTED)) - elif setting_type is dict and isinstance(new_settings[key], (unicode, str)): - decoded = json.loads(str(new_settings[key])) - conf.settings.update({key: decoded}, - data_types=(conf.TYPE_RUNTIME, conf.TYPE_PERSISTED)) - else: - converted = setting_type(new_settings[key]) - conf.settings.update({key: converted}, - data_types=(conf.TYPE_RUNTIME, conf.TYPE_PERSISTED)) - conf.settings.save_conf_file_settings() - - self.data_rate = conf.settings['data_rate'] - self.max_key_fee = conf.settings['max_key_fee'] - self.disable_max_key_fee = conf.settings['disable_max_key_fee'] - self.download_directory = conf.settings['download_directory'] - self.download_timeout = conf.settings['download_timeout'] - - return self._render_response(conf.settings.get_adjustable_settings_dict()) + yield self._update_settings(kwargs) + defer.returnValue(conf.settings.get_adjustable_settings_dict()) def jsonrpc_help(self, command=None): """ @@ -1017,7 +1284,6 @@ class Daemon(AuthJSONRPCServer): """ return self._render_response(sorted([command for command in self.callable_methods.keys()])) - @AuthJSONRPCServer.requires("wallet") def jsonrpc_wallet_balance(self, address=None, include_unconfirmed=False): """ Return the balance of the wallet @@ -1034,12 +1300,11 @@ class Daemon(AuthJSONRPCServer): (float) amount of lbry credits in wallet """ if address is None: - return self._render_response(float(self.wallet.get_balance())) + return self._render_response(float(self.session.wallet.get_balance())) else: return self._render_response(float( - self.wallet.get_address_balance(address, include_unconfirmed))) + self.session.wallet.get_address_balance(address, include_unconfirmed))) - @AuthJSONRPCServer.requires("wallet") @defer.inlineCallbacks def jsonrpc_wallet_unlock(self, password): """ @@ -1055,10 +1320,9 @@ class Daemon(AuthJSONRPCServer): (bool) true if wallet is unlocked, otherwise false """ - # the check_locked() in the if statement is needed because that is what sets - # the wallet_unlocked_d deferred ¯\_(ツ)_/¯ - if not self.wallet.check_locked: - d = self.wallet.wallet_unlocked_d + cmd_runner = self.session.wallet.get_cmd_runner() + if cmd_runner.locked: + d = self.session.wallet.wallet_unlocked_d d.callback(password) result = yield d else: @@ -1066,7 +1330,6 @@ class Daemon(AuthJSONRPCServer): response = yield self._render_response(result) defer.returnValue(response) - @AuthJSONRPCServer.requires("wallet", wallet=lambda wallet: wallet.check_locked) @defer.inlineCallbacks def jsonrpc_wallet_decrypt(self): """ @@ -1082,11 +1345,10 @@ class Daemon(AuthJSONRPCServer): (bool) true if wallet is decrypted, otherwise false """ - result = self.wallet.decrypt_wallet() + result = self.session.wallet.decrypt_wallet() response = yield self._render_response(result) defer.returnValue(response) - @AuthJSONRPCServer.requires("wallet", wallet=lambda wallet: wallet.check_locked) @defer.inlineCallbacks def jsonrpc_wallet_encrypt(self, new_password): """ @@ -1103,8 +1365,8 @@ class Daemon(AuthJSONRPCServer): (bool) true if wallet is decrypted, otherwise false """ - self.wallet.encrypt_wallet(new_password) - response = yield self._render_response(self.wallet.wallet.use_encryption) + self.session.wallet.encrypt_wallet(new_password) + response = yield self._render_response(self.session.wallet.wallet.use_encryption) defer.returnValue(response) @defer.inlineCallbacks @@ -1127,7 +1389,6 @@ class Daemon(AuthJSONRPCServer): reactor.callLater(0.1, reactor.fireSystemEvent, "shutdown") defer.returnValue(response) - @AuthJSONRPCServer.requires("fileManager") @defer.inlineCallbacks def jsonrpc_file_list(self, sort=None, **kwargs): """ @@ -1199,7 +1460,6 @@ class Daemon(AuthJSONRPCServer): response = yield self._render_response(result) defer.returnValue(response) - @AuthJSONRPCServer.requires("wallet") @defer.inlineCallbacks def jsonrpc_resolve_name(self, name, force=False): """ @@ -1225,7 +1485,6 @@ class Daemon(AuthJSONRPCServer): else: defer.returnValue(metadata) - @AuthJSONRPCServer.requires("wallet") @defer.inlineCallbacks def jsonrpc_claim_show(self, txid=None, nout=None, claim_id=None): """ @@ -1263,15 +1522,14 @@ class Daemon(AuthJSONRPCServer): """ if claim_id is not None and txid is None and nout is None: - claim_results = yield self.wallet.get_claim_by_claim_id(claim_id) + claim_results = yield self.session.wallet.get_claim_by_claim_id(claim_id) elif txid is not None and nout is not None and claim_id is None: - claim_results = yield self.wallet.get_claim_by_outpoint(txid, int(nout)) + claim_results = yield self.session.wallet.get_claim_by_outpoint(txid, int(nout)) else: raise Exception("Must specify either txid/nout, or claim_id") response = yield self._render_response(claim_results) defer.returnValue(response) - @AuthJSONRPCServer.requires("wallet") @defer.inlineCallbacks def jsonrpc_resolve(self, force=False, uri=None, uris=[]): """ @@ -1355,14 +1613,13 @@ class Daemon(AuthJSONRPCServer): except URIParseError: results[u] = {"error": "%s is not a valid uri" % u} - resolved = yield self.wallet.resolve(*valid_uris, check_cache=not force) + resolved = yield self.session.wallet.resolve(*valid_uris, check_cache=not force) for resolved_uri in resolved: results[resolved_uri] = resolved[resolved_uri] response = yield self._render_response(results) defer.returnValue(response) - @AuthJSONRPCServer.requires("wallet", "fileManager", "session", wallet=lambda wallet: wallet.check_locked) @defer.inlineCallbacks def jsonrpc_get(self, uri, file_name=None, timeout=None): """ @@ -1414,7 +1671,7 @@ class Daemon(AuthJSONRPCServer): if parsed_uri.is_channel and not parsed_uri.path: raise Exception("cannot download a channel claim, specify a /path") - resolved_result = yield self.wallet.resolve(uri) + resolved_result = yield self.session.wallet.resolve(uri) if resolved_result and uri in resolved_result: resolved = resolved_result[uri] else: @@ -1451,7 +1708,6 @@ class Daemon(AuthJSONRPCServer): response = yield self._render_response(result) defer.returnValue(response) - @AuthJSONRPCServer.requires("fileManager") @defer.inlineCallbacks def jsonrpc_file_set_status(self, status, **kwargs): """ @@ -1482,7 +1738,7 @@ class Daemon(AuthJSONRPCServer): raise Exception('Unable to find a file for {}:{}'.format(search_type, value)) if status == 'start' and lbry_file.stopped or status == 'stop' and not lbry_file.stopped: - yield self.file_manager.toggle_lbry_file_running(lbry_file) + yield self.lbry_file_manager.toggle_lbry_file_running(lbry_file) msg = "Started downloading file" if status == 'start' else "Stopped downloading file" else: msg = ( @@ -1492,7 +1748,6 @@ class Daemon(AuthJSONRPCServer): response = yield self._render_response(msg) defer.returnValue(response) - @AuthJSONRPCServer.requires("fileManager") @defer.inlineCallbacks def jsonrpc_file_delete(self, delete_from_download_dir=False, delete_all=False, **kwargs): """ @@ -1545,15 +1800,14 @@ class Daemon(AuthJSONRPCServer): file_name, stream_hash = lbry_file.file_name, lbry_file.stream_hash if lbry_file.sd_hash in self.streams: del self.streams[lbry_file.sd_hash] - yield self.file_manager.delete_lbry_file(lbry_file, - delete_file=delete_from_download_dir) + yield self.lbry_file_manager.delete_lbry_file(lbry_file, + delete_file=delete_from_download_dir) log.info("Deleted file: %s", file_name) result = True response = yield self._render_response(result) defer.returnValue(response) - @AuthJSONRPCServer.requires("wallet", "session", wallet=lambda wallet: wallet.check_locked) @defer.inlineCallbacks def jsonrpc_stream_cost_estimate(self, uri, size=None): """ @@ -1574,7 +1828,6 @@ class Daemon(AuthJSONRPCServer): cost = yield self.get_est_cost(uri, size) defer.returnValue(cost) - @AuthJSONRPCServer.requires("wallet", wallet=lambda wallet: wallet.check_locked) @defer.inlineCallbacks def jsonrpc_channel_new(self, channel_name, amount): """ @@ -1610,26 +1863,25 @@ class Daemon(AuthJSONRPCServer): if amount <= 0: raise Exception("Invalid amount") - yield self.wallet.update_balance() - if amount >= self.wallet.get_balance(): - balance = yield self.wallet.get_max_usable_balance_for_claim(channel_name) + yield self.session.wallet.update_balance() + if amount >= self.session.wallet.get_balance(): + balance = yield self.session.wallet.get_max_usable_balance_for_claim(channel_name) max_bid_amount = balance - MAX_UPDATE_FEE_ESTIMATE if balance <= MAX_UPDATE_FEE_ESTIMATE: raise InsufficientFundsError( "Insufficient funds, please deposit additional LBC. Minimum additional LBC needed {}" - .format(MAX_UPDATE_FEE_ESTIMATE - balance)) + . format(MAX_UPDATE_FEE_ESTIMATE - balance)) elif amount > max_bid_amount: raise InsufficientFundsError( "Please lower the bid value, the maximum amount you can specify for this channel is {}" - .format(max_bid_amount)) + .format(max_bid_amount)) - result = yield self.wallet.claim_new_channel(channel_name, amount) + result = yield self.session.wallet.claim_new_channel(channel_name, amount) self.analytics_manager.send_new_channel() log.info("Claimed a new channel! Result: %s", result) response = yield self._render_response(result) defer.returnValue(response) - @AuthJSONRPCServer.requires("wallet") @defer.inlineCallbacks def jsonrpc_channel_list(self): """ @@ -1646,11 +1898,10 @@ class Daemon(AuthJSONRPCServer): is in the wallet. """ - result = yield self.wallet.channel_list() + result = yield self.session.wallet.channel_list() response = yield self._render_response(result) defer.returnValue(response) - @AuthJSONRPCServer.requires("wallet") @AuthJSONRPCServer.deprecated("channel_list") def jsonrpc_channel_list_mine(self): """ @@ -1668,7 +1919,6 @@ class Daemon(AuthJSONRPCServer): return self.jsonrpc_channel_list() - @AuthJSONRPCServer.requires("wallet") @defer.inlineCallbacks def jsonrpc_channel_export(self, claim_id): """ @@ -1684,10 +1934,9 @@ class Daemon(AuthJSONRPCServer): (str) Serialized certificate information """ - result = yield self.wallet.export_certificate_info(claim_id) + result = yield self.session.wallet.export_certificate_info(claim_id) defer.returnValue(result) - @AuthJSONRPCServer.requires("wallet") @defer.inlineCallbacks def jsonrpc_channel_import(self, serialized_certificate_info): """ @@ -1703,10 +1952,9 @@ class Daemon(AuthJSONRPCServer): (dict) Result dictionary """ - result = yield self.wallet.import_certificate_info(serialized_certificate_info) + result = yield self.session.wallet.import_certificate_info(serialized_certificate_info) defer.returnValue(result) - @AuthJSONRPCServer.requires("wallet", "fileManager", "session", wallet=lambda wallet: wallet.check_locked) @defer.inlineCallbacks def jsonrpc_publish(self, name, bid, metadata=None, file_path=None, fee=None, title=None, description=None, author=None, language=None, license=None, @@ -1798,9 +2046,9 @@ class Daemon(AuthJSONRPCServer): if bid <= 0.0: raise ValueError("Bid value must be greater than 0.0") - yield self.wallet.update_balance() - if bid >= self.wallet.get_balance(): - balance = yield self.wallet.get_max_usable_balance_for_claim(name) + yield self.session.wallet.update_balance() + if bid >= self.session.wallet.get_balance(): + balance = yield self.session.wallet.get_max_usable_balance_for_claim(name) max_bid_amount = balance - MAX_UPDATE_FEE_ESTIMATE if balance <= MAX_UPDATE_FEE_ESTIMATE: raise InsufficientFundsError( @@ -1847,7 +2095,7 @@ class Daemon(AuthJSONRPCServer): log.warning("Stripping empty fee from published metadata") del metadata['fee'] elif 'address' not in metadata['fee']: - address = yield self.wallet.get_least_used_address() + address = yield self.session.wallet.get_least_used_address() metadata['fee']['address'] = address if 'fee' in metadata and 'version' not in metadata['fee']: metadata['fee']['version'] = '_0_0_1' @@ -1903,7 +2151,7 @@ class Daemon(AuthJSONRPCServer): certificate_id = channel_id elif channel_name: certificate_id = None - my_certificates = yield self.wallet.channel_list() + my_certificates = yield self.session.wallet.channel_list() for certificate in my_certificates: if channel_name == certificate['name']: certificate_id = certificate['claim_id'] @@ -1918,7 +2166,6 @@ class Daemon(AuthJSONRPCServer): response = yield self._render_response(result) defer.returnValue(response) - @AuthJSONRPCServer.requires("wallet", wallet=lambda wallet: wallet.check_locked) @defer.inlineCallbacks def jsonrpc_claim_abandon(self, claim_id=None, txid=None, nout=None): """ @@ -1947,11 +2194,10 @@ class Daemon(AuthJSONRPCServer): if nout is None and txid is not None: raise Exception('Must specify nout') - result = yield self.wallet.abandon_claim(claim_id, txid, nout) + result = yield self.session.wallet.abandon_claim(claim_id, txid, nout) self.analytics_manager.send_claim_action('abandon') defer.returnValue(result) - @AuthJSONRPCServer.requires("wallet", wallet=lambda wallet: wallet.check_locked) @defer.inlineCallbacks def jsonrpc_claim_new_support(self, name, claim_id, amount): """ @@ -1975,11 +2221,10 @@ class Daemon(AuthJSONRPCServer): } """ - result = yield self.wallet.support_claim(name, claim_id, amount) + result = yield self.session.wallet.support_claim(name, claim_id, amount) self.analytics_manager.send_claim_action('new_support') defer.returnValue(result) - @AuthJSONRPCServer.requires("wallet", wallet=lambda wallet: wallet.check_locked) @defer.inlineCallbacks def jsonrpc_claim_renew(self, outpoint=None, height=None): """ @@ -2015,14 +2260,13 @@ class Daemon(AuthJSONRPCServer): nout = int(nout) else: raise Exception("invalid outpoint") - result = yield self.wallet.claim_renew(txid, nout) + result = yield self.session.wallet.claim_renew(txid, nout) result = {outpoint: result} else: height = int(height) - result = yield self.wallet.claim_renew_all_before_expiration(height) + result = yield self.session.wallet.claim_renew_all_before_expiration(height) defer.returnValue(result) - @AuthJSONRPCServer.requires("wallet", wallet=lambda wallet: wallet.check_locked) @defer.inlineCallbacks def jsonrpc_claim_send_to_address(self, claim_id, address, amount=None): """ @@ -2050,12 +2294,11 @@ class Daemon(AuthJSONRPCServer): } """ - result = yield self.wallet.send_claim_to_address(claim_id, address, amount) + result = yield self.session.wallet.send_claim_to_address(claim_id, address, amount) response = yield self._render_response(result) defer.returnValue(response) # TODO: claim_list_mine should be merged into claim_list, but idk how to authenticate it -Grin - @AuthJSONRPCServer.requires("wallet") def jsonrpc_claim_list_mine(self): """ List my name claims @@ -2089,11 +2332,10 @@ class Daemon(AuthJSONRPCServer): ] """ - d = self.wallet.get_name_claims() + d = self.session.wallet.get_name_claims() d.addCallback(lambda claims: self._render_response(claims)) return d - @AuthJSONRPCServer.requires("wallet") @defer.inlineCallbacks def jsonrpc_claim_list(self, name): """ @@ -2128,11 +2370,10 @@ class Daemon(AuthJSONRPCServer): } """ - claims = yield self.wallet.get_claims_for_name(name) # type: dict + claims = yield self.session.wallet.get_claims_for_name(name) # type: dict sort_claim_results(claims['claims']) defer.returnValue(claims) - @AuthJSONRPCServer.requires("wallet") @defer.inlineCallbacks def jsonrpc_claim_list_by_channel(self, page=0, page_size=10, uri=None, uris=[]): """ @@ -2206,8 +2447,8 @@ class Daemon(AuthJSONRPCServer): except URIParseError: results[chan_uri] = {"error": "%s is not a valid uri" % chan_uri} - resolved = yield self.wallet.resolve(*valid_uris, check_cache=False, page=page, - page_size=page_size) + resolved = yield self.session.wallet.resolve(*valid_uris, check_cache=False, page=page, + page_size=page_size) for u in resolved: if 'error' in resolved[u]: results[u] = resolved[u] @@ -2222,7 +2463,6 @@ class Daemon(AuthJSONRPCServer): response = yield self._render_response(results) defer.returnValue(response) - @AuthJSONRPCServer.requires("wallet") def jsonrpc_transaction_list(self): """ List transactions belonging to wallet @@ -2280,11 +2520,10 @@ class Daemon(AuthJSONRPCServer): """ - d = self.wallet.get_history() + d = self.session.wallet.get_history() d.addCallback(lambda r: self._render_response(r)) return d - @AuthJSONRPCServer.requires("wallet") def jsonrpc_transaction_show(self, txid): """ Get a decoded transaction from a txid @@ -2299,11 +2538,10 @@ class Daemon(AuthJSONRPCServer): (dict) JSON formatted transaction """ - d = self.wallet.get_transaction(txid) + d = self.session.wallet.get_transaction(txid) d.addCallback(lambda r: self._render_response(r)) return d - @AuthJSONRPCServer.requires("wallet") def jsonrpc_wallet_is_address_mine(self, address): """ Checks if an address is associated with the current wallet. @@ -2318,11 +2556,10 @@ class Daemon(AuthJSONRPCServer): (bool) true, if address is associated with current wallet """ - d = self.wallet.address_is_mine(address) + d = self.session.wallet.address_is_mine(address) d.addCallback(lambda is_mine: self._render_response(is_mine)) return d - @AuthJSONRPCServer.requires("wallet") def jsonrpc_wallet_public_key(self, address): """ Get public key from wallet address @@ -2338,11 +2575,10 @@ class Daemon(AuthJSONRPCServer): Could contain more than one public key if multisig. """ - d = self.wallet.get_pub_keys(address) + d = self.session.wallet.get_pub_keys(address) d.addCallback(lambda r: self._render_response(r)) return d - @AuthJSONRPCServer.requires("wallet") @defer.inlineCallbacks def jsonrpc_wallet_list(self): """ @@ -2358,11 +2594,10 @@ class Daemon(AuthJSONRPCServer): List of wallet addresses """ - addresses = yield self.wallet.list_addresses() + addresses = yield self.session.wallet.list_addresses() response = yield self._render_response(addresses) defer.returnValue(response) - @AuthJSONRPCServer.requires("wallet") def jsonrpc_wallet_new_address(self): """ Generate a new wallet address @@ -2381,12 +2616,11 @@ class Daemon(AuthJSONRPCServer): log.info("Got new wallet address: " + address) return defer.succeed(address) - d = self.wallet.get_new_address() + d = self.session.wallet.get_new_address() d.addCallback(_disp) d.addCallback(lambda address: self._render_response(address)) return d - @AuthJSONRPCServer.requires("wallet") def jsonrpc_wallet_unused_address(self): """ Return an address containing no balance, will create @@ -2406,12 +2640,11 @@ class Daemon(AuthJSONRPCServer): log.info("Got unused wallet address: " + address) return defer.succeed(address) - d = self.wallet.get_unused_address() + d = self.session.wallet.get_unused_address() d.addCallback(_disp) d.addCallback(lambda address: self._render_response(address)) return d - @AuthJSONRPCServer.requires("wallet", wallet=lambda wallet: wallet.check_locked) @AuthJSONRPCServer.deprecated("wallet_send") @defer.inlineCallbacks def jsonrpc_send_amount_to_address(self, amount, address): @@ -2434,14 +2667,13 @@ class Daemon(AuthJSONRPCServer): elif not amount: raise NullFundsError() - reserved_points = self.wallet.reserve_points(address, amount) + reserved_points = self.session.wallet.reserve_points(address, amount) if reserved_points is None: raise InsufficientFundsError() - yield self.wallet.send_points_to_address(reserved_points, amount) + yield self.session.wallet.send_points_to_address(reserved_points, amount) self.analytics_manager.send_credits_sent() defer.returnValue(True) - @AuthJSONRPCServer.requires("wallet", wallet=lambda wallet: wallet.check_locked) @defer.inlineCallbacks def jsonrpc_wallet_send(self, amount, address=None, claim_id=None): """ @@ -2486,11 +2718,10 @@ class Daemon(AuthJSONRPCServer): result = yield self.jsonrpc_send_amount_to_address(amount, address) else: validate_claim_id(claim_id) - result = yield self.wallet.tip_claim(claim_id, amount) + result = yield self.session.wallet.tip_claim(claim_id, amount) self.analytics_manager.send_claim_action('new_support') defer.returnValue(result) - @AuthJSONRPCServer.requires("wallet", wallet=lambda wallet: wallet.check_locked) @defer.inlineCallbacks def jsonrpc_wallet_prefill_addresses(self, num_addresses, amount, no_broadcast=False): """ @@ -2516,12 +2747,11 @@ class Daemon(AuthJSONRPCServer): raise NullFundsError() broadcast = not no_broadcast - tx = yield self.wallet.create_addresses_with_balance( + tx = yield self.session.wallet.create_addresses_with_balance( num_addresses, amount, broadcast=broadcast) tx['broadcast'] = broadcast defer.returnValue(tx) - @AuthJSONRPCServer.requires("wallet") @defer.inlineCallbacks def jsonrpc_utxo_list(self): """ @@ -2551,7 +2781,7 @@ class Daemon(AuthJSONRPCServer): ] """ - unspent = yield self.wallet.list_unspent() + unspent = yield self.session.wallet.list_unspent() for i, utxo in enumerate(unspent): utxo['txid'] = utxo.pop('prevout_hash') utxo['nout'] = utxo.pop('prevout_n') @@ -2561,7 +2791,6 @@ class Daemon(AuthJSONRPCServer): defer.returnValue(unspent) - @AuthJSONRPCServer.requires("wallet") def jsonrpc_block_show(self, blockhash=None, height=None): """ Get contents of a block @@ -2578,10 +2807,10 @@ class Daemon(AuthJSONRPCServer): """ if blockhash is not None: - d = self.wallet.get_block(blockhash) + d = self.session.wallet.get_block(blockhash) elif height is not None: - d = self.wallet.get_block_info(height) - d.addCallback(lambda b: self.wallet.get_block(b)) + d = self.session.wallet.get_block_info(height) + d.addCallback(lambda b: self.session.wallet.get_block(b)) else: # TODO: return a useful error message return server.failure @@ -2589,7 +2818,6 @@ class Daemon(AuthJSONRPCServer): d.addCallback(lambda r: self._render_response(r)) return d - @AuthJSONRPCServer.requires("wallet", "session", wallet=lambda wallet: wallet.check_locked) @defer.inlineCallbacks def jsonrpc_blob_get(self, blob_hash, timeout=None, encoding=None, payment_rate_manager=None): """ @@ -2633,7 +2861,6 @@ class Daemon(AuthJSONRPCServer): response = yield self._render_response(result) defer.returnValue(response) - @AuthJSONRPCServer.requires("session") @defer.inlineCallbacks def jsonrpc_blob_delete(self, blob_hash): """ @@ -2653,15 +2880,14 @@ class Daemon(AuthJSONRPCServer): response = yield self._render_response("Don't have that blob") defer.returnValue(response) try: - stream_hash = yield self.storage.get_stream_hash_for_sd_hash(blob_hash) - yield self.storage.delete_stream(stream_hash) + stream_hash = yield self.session.storage.get_stream_hash_for_sd_hash(blob_hash) + yield self.session.storage.delete_stream(stream_hash) except Exception as err: pass yield self.session.blob_manager.delete_blobs([blob_hash]) response = yield self._render_response("Deleted %s" % blob_hash) defer.returnValue(response) - @AuthJSONRPCServer.requires("dht") @defer.inlineCallbacks def jsonrpc_peer_list(self, blob_hash, timeout=None): """ @@ -2681,7 +2907,7 @@ class Daemon(AuthJSONRPCServer): if not utils.is_valid_blobhash(blob_hash): raise Exception("invalid blob hash") - finished_deferred = self.dht_node.iterativeFindValue(binascii.unhexlify(blob_hash)) + finished_deferred = self.session.dht_node.iterativeFindValue(binascii.unhexlify(blob_hash)) def trap_timeout(err): err.trap(defer.TimeoutError) @@ -2700,7 +2926,6 @@ class Daemon(AuthJSONRPCServer): ] defer.returnValue(results) - @AuthJSONRPCServer.requires("database") @defer.inlineCallbacks def jsonrpc_blob_announce(self, blob_hash=None, stream_hash=None, sd_hash=None): """ @@ -2737,7 +2962,6 @@ class Daemon(AuthJSONRPCServer): response = yield self._render_response(True) defer.returnValue(response) - @AuthJSONRPCServer.requires("fileManager") @defer.inlineCallbacks def jsonrpc_file_reflect(self, **kwargs): """ @@ -2773,7 +2997,6 @@ class Daemon(AuthJSONRPCServer): results = yield reupload.reflect_file(lbry_file, reflector_server=reflector_server) defer.returnValue(results) - @AuthJSONRPCServer.requires("database", "session", "wallet") @defer.inlineCallbacks def jsonrpc_blob_list(self, uri=None, stream_hash=None, sd_hash=None, needed=None, finished=None, page_size=None, page=None): @@ -2803,14 +3026,14 @@ class Daemon(AuthJSONRPCServer): if uri: metadata = yield self._resolve_name(uri) sd_hash = utils.get_sd_hash(metadata) - stream_hash = yield self.storage.get_stream_hash_for_sd_hash(sd_hash) + stream_hash = yield self.session.storage.get_stream_hash_for_sd_hash(sd_hash) elif stream_hash: - sd_hash = yield self.storage.get_sd_blob_hash_for_stream(stream_hash) + sd_hash = yield self.session.storage.get_sd_blob_hash_for_stream(stream_hash) elif sd_hash: - stream_hash = yield self.storage.get_stream_hash_for_sd_hash(sd_hash) - sd_hash = yield self.storage.get_sd_blob_hash_for_stream(stream_hash) + stream_hash = yield self.session.storage.get_stream_hash_for_sd_hash(sd_hash) + sd_hash = yield self.session.storage.get_sd_blob_hash_for_stream(stream_hash) if stream_hash: - crypt_blobs = yield self.storage.get_blobs_for_stream(stream_hash) + crypt_blobs = yield self.session.storage.get_blobs_for_stream(stream_hash) blobs = yield defer.gatherResults([ self.session.blob_manager.get_blob(crypt_blob.blob_hash, crypt_blob.length) for crypt_blob in crypt_blobs if crypt_blob.blob_hash is not None @@ -2837,7 +3060,6 @@ class Daemon(AuthJSONRPCServer): response = yield self._render_response(blob_hashes_for_return) defer.returnValue(response) - @AuthJSONRPCServer.requires("session") def jsonrpc_blob_reflect(self, blob_hashes, reflector_server=None): """ Reflects specified blobs @@ -2856,7 +3078,6 @@ class Daemon(AuthJSONRPCServer): d.addCallback(lambda r: self._render_response(r)) return d - @AuthJSONRPCServer.requires("session") def jsonrpc_blob_reflect_all(self): """ Reflects all saved blobs @@ -2876,7 +3097,6 @@ class Daemon(AuthJSONRPCServer): d.addCallback(lambda r: self._render_response(r)) return d - @AuthJSONRPCServer.requires("dht") @defer.inlineCallbacks def jsonrpc_peer_ping(self, node_id): """ @@ -2894,7 +3114,7 @@ class Daemon(AuthJSONRPCServer): contact = None try: - contact = yield self.dht_node.findContact(node_id.decode('hex')) + contact = yield self.session.dht_node.findContact(node_id.decode('hex')) except TimeoutError: result = {'error': 'timeout finding peer'} defer.returnValue(result) @@ -2906,7 +3126,6 @@ class Daemon(AuthJSONRPCServer): result = {'error': 'ping timeout'} defer.returnValue(result) - @AuthJSONRPCServer.requires("dht") def jsonrpc_routing_table_get(self): """ Get DHT routing information @@ -2937,7 +3156,7 @@ class Daemon(AuthJSONRPCServer): """ result = {} - data_store = self.dht_node._dataStore._dict + data_store = self.session.dht_node._dataStore._dict datastore_len = len(data_store) hosts = {} @@ -2955,8 +3174,8 @@ class Daemon(AuthJSONRPCServer): blob_hashes = [] result['buckets'] = {} - for i in range(len(self.dht_node._routingTable._buckets)): - for contact in self.dht_node._routingTable._buckets[i]._contacts: + for i in range(len(self.session.dht_node._routingTable._buckets)): + for contact in self.session.dht_node._routingTable._buckets[i]._contacts: contacts = result['buckets'].get(i, []) if contact in hosts: blobs = hosts[contact] @@ -2979,11 +3198,9 @@ class Daemon(AuthJSONRPCServer): result['contacts'] = contact_set result['blob_hashes'] = blob_hashes - result['node_id'] = self.dht_node.node_id.encode('hex') + result['node_id'] = self.session.dht_node.node_id.encode('hex') return self._render_response(result) - # the single peer downloader needs wallet access - @AuthJSONRPCServer.requires("dht", "wallet", wallet=lambda wallet: wallet.check_locked) def jsonrpc_blob_availability(self, blob_hash, search_timeout=None, blob_timeout=None): """ Get blob availability @@ -3008,7 +3225,6 @@ class Daemon(AuthJSONRPCServer): return self._blob_availability(blob_hash, search_timeout, blob_timeout) - @AuthJSONRPCServer.requires("session", "wallet", "dht", wallet=lambda wallet: wallet.check_locked) @AuthJSONRPCServer.deprecated("stream_availability") def jsonrpc_get_availability(self, uri, sd_timeout=None, peer_timeout=None): """ @@ -3029,7 +3245,6 @@ class Daemon(AuthJSONRPCServer): return self.jsonrpc_stream_availability(uri, peer_timeout, sd_timeout) - @AuthJSONRPCServer.requires("session", "wallet", "dht", wallet=lambda wallet: wallet.check_locked) @defer.inlineCallbacks def jsonrpc_stream_availability(self, uri, search_timeout=None, blob_timeout=None): """ @@ -3082,7 +3297,7 @@ class Daemon(AuthJSONRPCServer): } try: - resolved_result = yield self.wallet.resolve(uri) + resolved_result = yield self.session.wallet.resolve(uri) response['did_resolve'] = True except UnknownNameError: response['error'] = "Failed to resolve name" diff --git a/lbrynet/daemon/DaemonCLI.py b/lbrynet/daemon/DaemonCLI.py index 3cecc7c42..7ec03aa34 100644 --- a/lbrynet/daemon/DaemonCLI.py +++ b/lbrynet/daemon/DaemonCLI.py @@ -7,7 +7,7 @@ from collections import OrderedDict from lbrynet import conf from lbrynet.core import utils from lbrynet.daemon.auth.client import JSONRPCException, LBRYAPIClient, AuthAPIClient -from lbrynet.daemon.Daemon import Daemon +from lbrynet.daemon.Daemon import LOADING_WALLET_CODE, Daemon from lbrynet.core.system_info import get_platform from jsonrpc.common import RPCError from requests.exceptions import ConnectionError @@ -21,13 +21,17 @@ def remove_brackets(key): return key -def set_kwargs(parsed_args): +def set_flag_vals(flag_names, parsed_args): kwargs = OrderedDict() for key, arg in parsed_args.iteritems(): if arg is None: continue - elif key.startswith("--") and remove_brackets(key[2:]) not in kwargs: - k = remove_brackets(key[2:]) + elif key.startswith("--"): + if remove_brackets(key[2:]) not in kwargs: + k = remove_brackets(key[2:]) + elif key in flag_names: + if remove_brackets(flag_names[key]) not in kwargs: + k = remove_brackets(flag_names[key]) elif remove_brackets(key) not in kwargs: k = remove_brackets(key) kwargs[k] = guess_type(arg, k) @@ -75,22 +79,26 @@ def main(): method = new_method fn = Daemon.callable_methods[method] + if hasattr(fn, "_flags"): + flag_names = fn._flags + else: + flag_names = {} parsed = docopt(fn.__doc__, args) - kwargs = set_kwargs(parsed) + kwargs = set_flag_vals(flag_names, parsed) colorama.init() conf.initialize_settings() try: api = LBRYAPIClient.get_client() - api.status() + status = api.status() except (URLError, ConnectionError) as err: if isinstance(err, HTTPError) and err.code == UNAUTHORIZED: api = AuthAPIClient.config() # this can happen if the daemon is using auth with the --http-auth flag # when the config setting is to not use it try: - api.status() + status = api.status() except: print_error("Daemon requires authentication, but none was provided.", suggest_help=False) @@ -100,6 +108,20 @@ def main(): suggest_help=False) return 1 + status_code = status['startup_status']['code'] + + if status_code != "started" and method not in Daemon.allowed_during_startup: + print "Daemon is in the process of starting. Please try again in a bit." + message = status['startup_status']['message'] + if message: + if ( + status['startup_status']['code'] == LOADING_WALLET_CODE + and status['blockchain_status']['blocks_behind'] > 0 + ): + message += '. Blocks left: ' + str(status['blockchain_status']['blocks_behind']) + print " Status: " + message + return 1 + # TODO: check if port is bound. Error if its not try: diff --git a/lbrynet/daemon/__init__.py b/lbrynet/daemon/__init__.py index 8e0f5feca..7461e1c00 100644 --- a/lbrynet/daemon/__init__.py +++ b/lbrynet/daemon/__init__.py @@ -1,3 +1,3 @@ -import Components # register Component classes from lbrynet.daemon.auth.client import LBRYAPIClient + get_client = LBRYAPIClient.get_client diff --git a/lbrynet/daemon/auth/server.py b/lbrynet/daemon/auth/server.py index 6b117aa70..a0d365a35 100644 --- a/lbrynet/daemon/auth/server.py +++ b/lbrynet/daemon/auth/server.py @@ -4,7 +4,6 @@ import json import inspect from decimal import Decimal -from functools import wraps from zope.interface import implements from twisted.web import server, resource from twisted.internet import defer @@ -16,7 +15,6 @@ from traceback import format_exc from lbrynet import conf from lbrynet.core.Error import InvalidAuthenticationToken from lbrynet.core import utils -from lbrynet.core.Error import ComponentsNotStarted, ComponentStartConditionNotMet from lbrynet.daemon.auth.util import APIKey, get_auth_message from lbrynet.daemon.auth.client import LBRY_SECRET from lbrynet.undecorated import undecorated @@ -143,27 +141,6 @@ class AuthorizedBase(object): return f return _deprecated_wrapper - @staticmethod - def requires(*components, **component_conditionals): - def _wrap(fn): - @defer.inlineCallbacks - @wraps(fn) - def _inner(*args, **kwargs): - if component_conditionals: - for component_name, condition in component_conditionals.iteritems(): - if not callable(condition): - raise SyntaxError("The specified condition is invalid/not callable") - if not (yield condition(args[0].component_manager.get_component(component_name))): - raise ComponentStartConditionNotMet( - "Not all conditions required to start component are met") - if args[0].component_manager.all_components_running(*components): - result = yield fn(*args, **kwargs) - defer.returnValue(result) - else: - raise ComponentsNotStarted("Not all required components are set up:", components) - return _inner - return _wrap - class AuthJSONRPCServer(AuthorizedBase): """ @@ -172,6 +149,7 @@ class AuthJSONRPCServer(AuthorizedBase): API methods are named with a leading "jsonrpc_" Attributes: + allowed_during_startup (list): list of api methods that are callable before the server has finished startup sessions (dict): (dict): {: } callable_methods (dict): {: } @@ -438,6 +416,9 @@ class AuthJSONRPCServer(AuthorizedBase): def _verify_method_is_callable(self, function_path): if function_path not in self.callable_methods: raise UnknownAPIMethodError(function_path) + if not self.announced_startup: + if function_path not in self.allowed_during_startup: + raise NotAllowedDuringStartupError(function_path) def _get_jsonrpc_method(self, function_path): if function_path in self.deprecated_methods: diff --git a/lbrynet/tests/functional/test_misc.py b/lbrynet/tests/functional/test_misc.py index eb91214a1..b134b6da2 100644 --- a/lbrynet/tests/functional/test_misc.py +++ b/lbrynet/tests/functional/test_misc.py @@ -39,7 +39,6 @@ DummyBlobAvailabilityTracker = mocks.BlobAvailabilityTracker log_format = "%(funcName)s(): %(message)s" logging.basicConfig(level=logging.CRITICAL, format=log_format) -TEST_SKIP_STRING_ANDROID = "Test cannot pass on Android because multiprocessing is not supported at the OS level." def require_system(system): def wrapper(fn): @@ -104,15 +103,13 @@ class LbryUploader(object): rate_limiter = RateLimiter() self.sd_identifier = StreamDescriptorIdentifier() self.db_dir, self.blob_dir = mk_db_and_blob_dir() - dht_node = FakeNode(peer_finder=peer_finder, peer_manager=peer_manager, udpPort=4445, peerPort=5553, - node_id="abcd", externalIP="127.0.0.1") self.session = Session( conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=self.db_dir, blob_dir=self.blob_dir, node_id="abcd", peer_finder=peer_finder, hash_announcer=hash_announcer, peer_port=5553, dht_node_port=4445, use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker, - dht_node=dht_node, is_generous=self.is_generous, external_ip="127.0.0.1") + dht_node_class=FakeNode, is_generous=self.is_generous, external_ip="127.0.0.1") self.lbry_file_manager = EncryptedFileManager(self.session, self.sd_identifier) if self.ul_rate_limit is not None: self.session.rate_limiter.set_ul_limit(self.ul_rate_limit) @@ -200,7 +197,7 @@ def start_lbry_reuploader(sd_hash, kill_event, dead_event, db_dir, blob_dir = mk_db_and_blob_dir() session = Session(conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, - node_id="abcd" + str(n), dht_node_port=4446, + node_id="abcd" + str(n), dht_node_port=4446, dht_node_class=FakeNode, peer_finder=peer_finder, hash_announcer=hash_announcer, blob_dir=blob_dir, peer_port=peer_port, use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, @@ -306,16 +303,13 @@ def start_blob_uploader(blob_hash_queue, kill_event, dead_event, slow, is_genero db_dir, blob_dir = mk_db_and_blob_dir() - dht_node = FakeNode(peer_finder=peer_finder, peer_manager=peer_manager, udpPort=4445, peerPort=5553, - node_id="abcd", externalIP="127.0.0.1") - session = Session(conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, node_id="efgh", - peer_finder=peer_finder, hash_announcer=hash_announcer, + peer_finder=peer_finder, hash_announcer=hash_announcer, dht_node_class=FakeNode, blob_dir=blob_dir, peer_port=peer_port, dht_node_port=4446, use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker, is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1], - external_ip="127.0.0.1", dht_node=dht_node) + external_ip="127.0.0.1") if slow is True: session.rate_limiter.set_ul_limit(2 ** 11) @@ -484,8 +478,6 @@ class TestTransfer(TestCase): hash_announcer = FakeAnnouncer() rate_limiter = DummyRateLimiter() sd_identifier = StreamDescriptorIdentifier() - dht_node = FakeNode(peer_finder=peer_finder, peer_manager=peer_manager, udpPort=4445, peerPort=5553, - node_id="abcd", externalIP="127.0.0.1") db_dir, blob_dir = mk_db_and_blob_dir() self.session = Session( @@ -494,7 +486,7 @@ class TestTransfer(TestCase): blob_dir=blob_dir, peer_port=5553, dht_node_port=4445, use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker, - dht_node=dht_node, is_generous=self.is_generous, external_ip="127.0.0.1") + dht_node_class=FakeNode, is_generous=self.is_generous, external_ip="127.0.0.1") self.lbry_file_manager = EncryptedFileManager( self.session, sd_identifier) @@ -574,16 +566,14 @@ class TestTransfer(TestCase): peer_finder = FakePeerFinder(5553, peer_manager, 2) hash_announcer = FakeAnnouncer() rate_limiter = DummyRateLimiter() - dht_node = FakeNode(peer_finder=peer_finder, peer_manager=peer_manager, udpPort=4445, peerPort=5553, - node_id="abcd", externalIP="127.0.0.1") db_dir, blob_dir = mk_db_and_blob_dir() self.session = Session( conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, node_id="abcd", peer_finder=peer_finder, hash_announcer=hash_announcer, - blob_dir=blob_dir, peer_port=5553, dht_node_port=4445, + blob_dir=blob_dir, peer_port=5553, dht_node_port=4445, dht_node_class=FakeNode, use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, - blob_tracker_class=DummyBlobAvailabilityTracker, dht_node=dht_node, + blob_tracker_class=DummyBlobAvailabilityTracker, is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1], external_ip="127.0.0.1") d1 = self.wait_for_hash_from_queue(blob_hash_queue_1) @@ -656,19 +646,17 @@ class TestTransfer(TestCase): hash_announcer = FakeAnnouncer() rate_limiter = DummyRateLimiter() sd_identifier = StreamDescriptorIdentifier() - dht_node = FakeNode(peer_finder=peer_finder, peer_manager=peer_manager, udpPort=4445, peerPort=5553, - node_id="abcd", externalIP="127.0.0.1") downloaders = [] db_dir, blob_dir = mk_db_and_blob_dir() self.session = Session(conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, - node_id="abcd", peer_finder=peer_finder, dht_node_port=4445, + node_id="abcd", peer_finder=peer_finder, dht_node_port=4445, dht_node_class=FakeNode, hash_announcer=hash_announcer, blob_dir=blob_dir, peer_port=5553, use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker, is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1], - external_ip="127.0.0.1", dht_node=dht_node) + external_ip="127.0.0.1") self.lbry_file_manager = EncryptedFileManager(self.session, sd_identifier) @@ -770,7 +758,7 @@ class TestTransfer(TestCase): sd_identifier = StreamDescriptorIdentifier() db_dir, blob_dir = mk_db_and_blob_dir() - self.session = Session(conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, + self.session = Session(conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, dht_node_class=FakeNode, node_id="abcd", peer_finder=peer_finder, dht_node_port=4445, hash_announcer=hash_announcer, blob_dir=blob_dir, peer_port=5553, use_upnp=False, rate_limiter=rate_limiter, @@ -854,10 +842,3 @@ class TestTransfer(TestCase): d.addBoth(stop) return d - - if is_android(): - test_lbry_transfer.skip = TEST_SKIP_STRING_ANDROID - test_last_blob_retrieval.skip = TEST_SKIP_STRING_ANDROID - test_double_download.skip = TEST_SKIP_STRING_ANDROID - test_multiple_uploaders.skip = TEST_SKIP_STRING_ANDROID - diff --git a/lbrynet/tests/functional/test_reflector.py b/lbrynet/tests/functional/test_reflector.py index 127c5453e..e5a1db1bd 100644 --- a/lbrynet/tests/functional/test_reflector.py +++ b/lbrynet/tests/functional/test_reflector.py @@ -53,7 +53,6 @@ class TestReflector(unittest.TestCase): db_dir=self.db_dir, node_id="abcd", peer_finder=peer_finder, - peer_manager=peer_manager, blob_dir=self.blob_dir, peer_port=5553, dht_node_port=4444, @@ -75,7 +74,6 @@ class TestReflector(unittest.TestCase): db_dir=self.server_db_dir, node_id="abcd", peer_finder=peer_finder, - peer_manager=peer_manager, blob_dir=self.server_blob_dir, peer_port=5554, dht_node_port=4443, diff --git a/lbrynet/tests/functional/test_streamify.py b/lbrynet/tests/functional/test_streamify.py index 7f831b2e4..cda06758b 100644 --- a/lbrynet/tests/functional/test_streamify.py +++ b/lbrynet/tests/functional/test_streamify.py @@ -30,7 +30,6 @@ DummyBlobAvailabilityTracker = mocks.BlobAvailabilityTracker class TestStreamify(TestCase): maxDiff = 5000 - def setUp(self): mocks.mock_conf_settings(self) self.session = None @@ -38,12 +37,6 @@ class TestStreamify(TestCase): self.is_generous = True self.db_dir = tempfile.mkdtemp() self.blob_dir = os.path.join(self.db_dir, "blobfiles") - self.dht_node = FakeNode() - self.wallet = FakeWallet() - self.peer_manager = PeerManager() - self.peer_finder = FakePeerFinder(5553, self.peer_manager, 2) - self.rate_limiter = DummyRateLimiter() - self.sd_identifier = StreamDescriptorIdentifier() os.mkdir(self.blob_dir) @defer.inlineCallbacks @@ -61,17 +54,26 @@ class TestStreamify(TestCase): os.remove("test_file") def test_create_stream(self): + wallet = FakeWallet() + peer_manager = PeerManager() + peer_finder = FakePeerFinder(5553, peer_manager, 2) + hash_announcer = FakeAnnouncer() + rate_limiter = DummyRateLimiter() + sd_identifier = StreamDescriptorIdentifier() self.session = Session( - conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=self.db_dir, node_id="abcd", peer_finder=self.peer_finder, - blob_dir=self.blob_dir, peer_port=5553, use_upnp=False, rate_limiter=self.rate_limiter, wallet=self.wallet, - blob_tracker_class=DummyBlobAvailabilityTracker, external_ip="127.0.0.1", dht_node=self.dht_node + conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=self.db_dir, node_id="abcd", + peer_finder=peer_finder, hash_announcer=hash_announcer, + blob_dir=self.blob_dir, peer_port=5553, + use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, + blob_tracker_class=DummyBlobAvailabilityTracker, + is_generous=self.is_generous, external_ip="127.0.0.1", dht_node_class=mocks.Node ) - self.lbry_file_manager = EncryptedFileManager(self.session, self.sd_identifier) + self.lbry_file_manager = EncryptedFileManager(self.session, sd_identifier) d = self.session.setup() - d.addCallback(lambda _: add_lbry_file_to_sd_identifier(self.sd_identifier)) + d.addCallback(lambda _: add_lbry_file_to_sd_identifier(sd_identifier)) d.addCallback(lambda _: self.lbry_file_manager.setup()) def verify_equal(sd_info): @@ -100,14 +102,22 @@ class TestStreamify(TestCase): return d def test_create_and_combine_stream(self): + wallet = FakeWallet() + peer_manager = PeerManager() + peer_finder = FakePeerFinder(5553, peer_manager, 2) + hash_announcer = FakeAnnouncer() + rate_limiter = DummyRateLimiter() + sd_identifier = StreamDescriptorIdentifier() self.session = Session( - conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=self.db_dir, node_id="abcd", peer_finder=self.peer_finder, - blob_dir=self.blob_dir, peer_port=5553, use_upnp=False, rate_limiter=self.rate_limiter, wallet=self.wallet, - blob_tracker_class=DummyBlobAvailabilityTracker, external_ip="127.0.0.1", dht_node=self.dht_node + conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=self.db_dir, node_id="abcd", + peer_finder=peer_finder, hash_announcer=hash_announcer, + blob_dir=self.blob_dir, peer_port=5553, dht_node_class=mocks.Node, + use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, + blob_tracker_class=DummyBlobAvailabilityTracker, external_ip="127.0.0.1" ) - self.lbry_file_manager = EncryptedFileManager(self.session, self.sd_identifier) + self.lbry_file_manager = EncryptedFileManager(self.session, sd_identifier) @defer.inlineCallbacks def create_stream(): @@ -122,7 +132,7 @@ class TestStreamify(TestCase): self.assertEqual(hashsum.hexdigest(), "68959747edc73df45e45db6379dd7b3b") d = self.session.setup() - d.addCallback(lambda _: add_lbry_file_to_sd_identifier(self.sd_identifier)) + d.addCallback(lambda _: add_lbry_file_to_sd_identifier(sd_identifier)) d.addCallback(lambda _: self.lbry_file_manager.setup()) d.addCallback(lambda _: create_stream()) return d diff --git a/lbrynet/tests/mocks.py b/lbrynet/tests/mocks.py index c01e1068b..c8e131362 100644 --- a/lbrynet/tests/mocks.py +++ b/lbrynet/tests/mocks.py @@ -1,6 +1,5 @@ import base64 import io -import mock from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives.asymmetric import rsa @@ -11,7 +10,6 @@ from twisted.python.failure import Failure from lbrynet.core.client.ClientRequest import ClientRequest from lbrynet.core.Error import RequestCanceledError from lbrynet.core import BlobAvailability -from lbrynet.file_manager.EncryptedFileManager import EncryptedFileManager from lbrynet.dht.node import Node as RealNode from lbrynet.daemon import ExchangeRateManager as ERM from lbrynet import conf @@ -65,7 +63,6 @@ class BTCLBCFeed(ERM.MarketFeed): 0.0 ) - class USDBTCFeed(ERM.MarketFeed): def __init__(self): ERM.MarketFeed.__init__( @@ -77,7 +74,6 @@ class USDBTCFeed(ERM.MarketFeed): 0.0 ) - class ExchangeRateManager(ERM.ExchangeRateManager): def __init__(self, market_feeds, rates): self.market_feeds = market_feeds @@ -364,96 +360,6 @@ class BlobAvailabilityTracker(BlobAvailability.BlobAvailabilityTracker): pass -# The components below viz. FakeWallet, FakeSession, FakeFileManager are just for testing Component Manager's -# startup and stop -class FakeComponent(object): - depends_on = [] - component_name = None - - def __init__(self, component_manager): - self.component_manager = component_manager - self._running = False - - @property - def running(self): - return self._running - - def start(self): - raise NotImplementedError # Override - - def stop(self): - return defer.succeed(None) - - @property - def component(self): - return self - - @defer.inlineCallbacks - def _setup(self): - result = yield defer.maybeDeferred(self.start) - self._running = True - defer.returnValue(result) - - @defer.inlineCallbacks - def _stop(self): - result = yield defer.maybeDeferred(self.stop) - self._running = False - defer.returnValue(result) - - -class FakeDelayedWallet(FakeComponent): - component_name = "wallet" - depends_on = [] - - def start(self): - return defer.succeed(True) - - def stop(self): - d = defer.Deferred() - self.component_manager.reactor.callLater(1, d.callback, True) - return d - - -class FakeDelayedSession(FakeComponent): - component_name = "session" - depends_on = [FakeDelayedWallet.component_name] - - def start(self): - d = defer.Deferred() - self.component_manager.reactor.callLater(1, d.callback, True) - return d - - def stop(self): - d = defer.Deferred() - self.component_manager.reactor.callLater(1, d.callback, True) - return d - - -class FakeDelayedFileManager(FakeComponent): - component_name = "file_manager" - depends_on = [FakeDelayedSession.component_name] - - def start(self): - d = defer.Deferred() - self.component_manager.reactor.callLater(1, d.callback, True) - return d - - def stop(self): - return defer.succeed(True) - -class FakeFileManager(FakeComponent): - component_name = "fileManager" - depends_on = [] - - @property - def component(self): - return mock.Mock(spec=EncryptedFileManager) - - def start(self): - return defer.succeed(True) - - def stop(self): - pass create_stream_sd_file = { 'stream_name': '746573745f66696c65', diff --git a/lbrynet/tests/unit/components/__init__.py b/lbrynet/tests/unit/components/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/lbrynet/tests/unit/components/test_Component_Manager.py b/lbrynet/tests/unit/components/test_Component_Manager.py deleted file mode 100644 index f31fb015c..000000000 --- a/lbrynet/tests/unit/components/test_Component_Manager.py +++ /dev/null @@ -1,131 +0,0 @@ -from twisted.internet.task import Clock -from twisted.trial import unittest - -from lbrynet.daemon.ComponentManager import ComponentManager -from lbrynet.daemon.Components import DATABASE_COMPONENT, DHT_COMPONENT, STREAM_IDENTIFIER_COMPONENT -from lbrynet.daemon.Components import HASH_ANNOUNCER_COMPONENT, REFLECTOR_COMPONENT, UPNP_COMPONENT -from lbrynet.daemon.Components import PEER_PROTOCOL_SERVER_COMPONENT -from lbrynet.daemon import Components -from lbrynet.tests import mocks - - -class TestComponentManager(unittest.TestCase): - def setUp(self): - mocks.mock_conf_settings(self) - self.default_components_sort = [ - [Components.DatabaseComponent, - Components.UPnPComponent], - [Components.DHTComponent, - Components.WalletComponent], - [Components.HashAnnouncer], - [Components.SessionComponent], - [Components.PeerProtocolServer, - Components.StreamIdentifier], - [Components.FileManager], - [Components.ReflectorComponent] - ] - self.component_manager = ComponentManager() - - def tearDown(self): - pass - - def test_sort_components(self): - stages = self.component_manager.sort_components() - - for stage_list, sorted_stage_list in zip(stages, self.default_components_sort): - self.assertSetEqual(set([type(stage) for stage in stage_list]), set(sorted_stage_list)) - - def test_sort_components_reverse(self): - rev_stages = self.component_manager.sort_components(reverse=True) - reverse_default_components_sort = reversed(self.default_components_sort) - - for stage_list, sorted_stage_list in zip(rev_stages, reverse_default_components_sort): - self.assertSetEqual(set([type(stage) for stage in stage_list]), set(sorted_stage_list)) - - def test_get_component_not_exists(self): - - with self.assertRaises(NameError): - self.component_manager.get_component("random_component") - - -class TestComponentManagerOverrides(unittest.TestCase): - def setUp(self): - mocks.mock_conf_settings(self) - - def test_init_with_overrides(self): - class FakeWallet(object): - component_name = "wallet" - depends_on = [] - - def __init__(self, component_manager): - self.component_manager = component_manager - - @property - def component(self): - return self - - new_component_manager = ComponentManager(wallet=FakeWallet) - fake_wallet = new_component_manager.get_component("wallet") - # wallet should be an instance of FakeWallet and not WalletComponent from Components.py - self.assertEquals(type(fake_wallet), FakeWallet) - self.assertNotEquals(type(fake_wallet), Components.WalletComponent) - - def test_init_with_wrong_overrides(self): - class FakeRandomComponent(object): - component_name = "someComponent" - depends_on = [] - - with self.assertRaises(SyntaxError): - ComponentManager(randomComponent=FakeRandomComponent) - - -class TestComponentManagerProperStart(unittest.TestCase): - def setUp(self): - self.reactor = Clock() - mocks.mock_conf_settings(self) - self.component_manager = ComponentManager( - skip_components=[DATABASE_COMPONENT, DHT_COMPONENT, HASH_ANNOUNCER_COMPONENT, STREAM_IDENTIFIER_COMPONENT, - PEER_PROTOCOL_SERVER_COMPONENT, REFLECTOR_COMPONENT, UPNP_COMPONENT], - reactor=self.reactor, - wallet=mocks.FakeDelayedWallet, - session=mocks.FakeDelayedSession, - fileManager=mocks.FakeDelayedFileManager - ) - - def tearDown(self): - pass - - def test_proper_starting_of_components(self): - self.component_manager.setup() - self.assertTrue(self.component_manager.get_component('wallet').running) - self.assertFalse(self.component_manager.get_component('session').running) - self.assertFalse(self.component_manager.get_component('file_manager').running) - - self.reactor.advance(1) - self.assertTrue(self.component_manager.get_component('wallet').running) - self.assertTrue(self.component_manager.get_component('session').running) - self.assertFalse(self.component_manager.get_component('file_manager').running) - - self.reactor.advance(1) - self.assertTrue(self.component_manager.get_component('wallet').running) - self.assertTrue(self.component_manager.get_component('session').running) - self.assertTrue(self.component_manager.get_component('file_manager').running) - - def test_proper_stopping_of_components(self): - self.component_manager.setup() - self.reactor.advance(1) - self.reactor.advance(1) - self.component_manager.stop() - self.assertFalse(self.component_manager.get_component('file_manager').running) - self.assertTrue(self.component_manager.get_component('session').running) - self.assertTrue(self.component_manager.get_component('wallet').running) - - self.reactor.advance(1) - self.assertFalse(self.component_manager.get_component('file_manager').running) - self.assertFalse(self.component_manager.get_component('session').running) - self.assertTrue(self.component_manager.get_component('wallet').running) - - self.reactor.advance(1) - self.assertFalse(self.component_manager.get_component('file_manager').running) - self.assertFalse(self.component_manager.get_component('session').running) - self.assertFalse(self.component_manager.get_component('wallet').running) diff --git a/lbrynet/tests/unit/lbrynet_daemon/test_Daemon.py b/lbrynet/tests/unit/lbrynet_daemon/test_Daemon.py index 1ef72c1a1..d47c36ba2 100644 --- a/lbrynet/tests/unit/lbrynet_daemon/test_Daemon.py +++ b/lbrynet/tests/unit/lbrynet_daemon/test_Daemon.py @@ -1,10 +1,11 @@ import mock import json +import unittest import random from os import path from twisted.internet import defer -from twisted.trial import unittest +from twisted import trial from faker import Faker @@ -13,15 +14,12 @@ from lbryum.wallet import NewWallet from lbrynet import conf from lbrynet.core import Session, PaymentRateManager, Wallet from lbrynet.database.storage import SQLiteStorage -from lbrynet.daemon.ComponentManager import ComponentManager -from lbrynet.daemon.Components import DATABASE_COMPONENT, DHT_COMPONENT, WALLET_COMPONENT, STREAM_IDENTIFIER_COMPONENT -from lbrynet.daemon.Components import HASH_ANNOUNCER_COMPONENT, REFLECTOR_COMPONENT, UPNP_COMPONENT, SESSION_COMPONENT -from lbrynet.daemon.Components import PEER_PROTOCOL_SERVER_COMPONENT from lbrynet.daemon.Daemon import Daemon as LBRYDaemon +from lbrynet.file_manager.EncryptedFileManager import EncryptedFileManager from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloader from lbrynet.tests import util -from lbrynet.tests.mocks import mock_conf_settings, FakeNetwork, FakeFileManager +from lbrynet.tests.mocks import mock_conf_settings, FakeNetwork from lbrynet.tests.mocks import BlobAvailabilityTracker as DummyBlobAvailabilityTracker from lbrynet.tests.mocks import ExchangeRateManager as DummyExchangeRateManager from lbrynet.tests.mocks import BTCLBCFeed, USDBTCFeed @@ -42,10 +40,10 @@ def get_test_daemon(data_rate=None, generous=True, with_fee=False): } daemon = LBRYDaemon(None) daemon.session = mock.Mock(spec=Session.Session) - daemon.wallet = mock.Mock(spec=Wallet.LBRYumWallet) - daemon.wallet.wallet = mock.Mock(spec=NewWallet) - daemon.wallet.wallet.use_encryption = False - daemon.wallet.network = FakeNetwork() + daemon.session.wallet = mock.Mock(spec=Wallet.LBRYumWallet) + daemon.session.wallet.wallet = mock.Mock(spec=NewWallet) + daemon.session.wallet.wallet.use_encryption = False + daemon.session.wallet.network = FakeNetwork() daemon.session.storage = mock.Mock(spec=SQLiteStorage) market_feeds = [BTCLBCFeed(), USDBTCFeed()] daemon.exchange_rate_manager = DummyExchangeRateManager(market_feeds, rates) @@ -75,12 +73,12 @@ def get_test_daemon(data_rate=None, generous=True, with_fee=False): {"fee": {"USD": {"address": "bQ6BGboPV2SpTMEP7wLNiAcnsZiH8ye6eA", "amount": 0.75}}}) daemon._resolve_name = lambda _: defer.succeed(metadata) migrated = smart_decode(json.dumps(metadata)) - daemon.wallet.resolve = lambda *_: defer.succeed( + daemon.session.wallet.resolve = lambda *_: defer.succeed( {"test": {'claim': {'value': migrated.claim_dict}}}) return daemon -class TestCostEst(unittest.TestCase): +class TestCostEst(trial.unittest.TestCase): def setUp(self): mock_conf_settings(self) util.resetTime(self) @@ -113,8 +111,7 @@ class TestCostEst(unittest.TestCase): self.assertEquals(daemon.get_est_cost("test", size).result, correct_result) -class TestJsonRpc(unittest.TestCase): - +class TestJsonRpc(trial.unittest.TestCase): def setUp(self): def noop(): return None @@ -122,39 +119,30 @@ class TestJsonRpc(unittest.TestCase): mock_conf_settings(self) util.resetTime(self) self.test_daemon = get_test_daemon() - self.test_daemon.wallet.is_first_run = False - self.test_daemon.wallet.get_best_blockhash = noop + self.test_daemon.session.wallet.is_first_run = False + self.test_daemon.session.wallet.get_best_blockhash = noop def test_status(self): d = defer.maybeDeferred(self.test_daemon.jsonrpc_status) d.addCallback(lambda status: self.assertDictContainsSubset({'is_running': False}, status)) + @unittest.skipIf(is_android(), + 'Test cannot pass on Android because PYTHONOPTIMIZE removes the docstrings.') def test_help(self): d = defer.maybeDeferred(self.test_daemon.jsonrpc_help, command='status') d.addCallback(lambda result: self.assertSubstring('daemon status', result['help'])) # self.assertSubstring('daemon status', d.result) - if is_android(): - test_help.skip = "Test cannot pass on Android because PYTHONOPTIMIZE removes the docstrings." - -class TestFileListSorting(unittest.TestCase): +class TestFileListSorting(trial.unittest.TestCase): def setUp(self): mock_conf_settings(self) util.resetTime(self) self.faker = Faker('en_US') self.faker.seed(66410) self.test_daemon = get_test_daemon() - component_manager = ComponentManager( - skip_components=[DATABASE_COMPONENT, DHT_COMPONENT, WALLET_COMPONENT, SESSION_COMPONENT, UPNP_COMPONENT, - PEER_PROTOCOL_SERVER_COMPONENT, REFLECTOR_COMPONENT, HASH_ANNOUNCER_COMPONENT, - STREAM_IDENTIFIER_COMPONENT], - fileManager=FakeFileManager - ) - component_manager.setup() - self.test_daemon.component_manager = component_manager - self.test_daemon.file_manager = component_manager.get_component("fileManager") - self.test_daemon.file_manager.lbry_files = self._get_fake_lbry_files() + self.test_daemon.lbry_file_manager = mock.Mock(spec=EncryptedFileManager) + self.test_daemon.lbry_file_manager.lbry_files = self._get_fake_lbry_files() # Pre-sorted lists of prices and file names in ascending order produced by # faker with seed 66410. This seed was chosen becacuse it produces 3 results