diff --git a/CHANGELOG.md b/CHANGELOG.md
index 9a12b9b73..9bc9bb07e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -23,14 +23,19 @@ at anytime.
 ### Changed
   *
   *
+  * `publish` to accept bid as a decimal string
+  * all JSONRPC API commands are now registered asynchronously and are available to be called as soon as they are ready
 
 ### Added
-  *
+  * Component Manager for managing the dependencies, startup, and shutdown of components
+  * `requires` decorator to register the components required by a `jsonrpc_` command, to facilitate commands registering asynchronously
+  * unit tests for the Component Manager
   *
 
 ### Removed
-  *
-  *
+  * `STARTUP_STAGES` from the `status` API and CLI call; `status` now returns a dictionary of components along with their running status (this is a **potentially breaking change** if `STARTUP_STAGES` is relied upon)
+  * all component startup code from `Daemon.py`
+  * wallet, UPnP, and DHT startup code from `Session.py`; this code now resides in `Components.py`
 
 
 ## [0.20.3] - 2018-07-03
 
diff --git a/lbrynet/conf.py b/lbrynet/conf.py
index 14fa45b53..6e3bc88f8 100644
--- a/lbrynet/conf.py
+++ b/lbrynet/conf.py
@@ -168,9 +168,11 @@ def server_port(server_and_port):
 def server_list(servers):
     return [server_port(server) for server in servers]
 
+
 def server_list_reverse(servers):
     return ["%s:%s" % (server, port) for server, port in servers]
 
+
 class Env(envparse.Env):
     """An Env parser that automatically namespaces the variables with LBRY"""
 
@@ -299,7 +301,8 @@ ADJUSTABLE_SETTINGS = {
     'blockchain_name': (str, 'lbrycrd_main'),
     'lbryum_servers': (list, [('lbryumx1.lbry.io', 50001), ('lbryumx2.lbry.io',
                                                             50001)], server_list, server_list_reverse),
-    's3_headers_depth': (int, 96 * 10)  # download headers from s3 when the local height is more than 10 chunks behind
+    's3_headers_depth': (int, 96 * 10),  # download headers from s3 when the local height is more than 10 chunks behind
+    'components_to_skip': (list, [])  # components to skip when starting the daemon
 }
 
 
diff --git a/lbrynet/core/Error.py b/lbrynet/core/Error.py
index 729ceab76..68a6df78e 100644
--- a/lbrynet/core/Error.py
+++ b/lbrynet/core/Error.py
@@ -155,13 +155,23 @@ class InvalidAuthenticationToken(Exception):
 class NegotiationError(Exception):
     pass
 
+
 class InvalidCurrencyError(Exception):
     def __init__(self, currency):
         self.currency = currency
         Exception.__init__(
             self, 'Invalid currency: {} is not a supported currency.'.format(currency))
 
+
 class NoSuchDirectoryError(Exception):
     def __init__(self, directory):
         self.directory = directory
         Exception.__init__(self, 'No such directory {}'.format(directory))
+
+
+class ComponentStartConditionNotMet(Exception):
+    pass
+
+
+class ComponentsNotStarted(Exception):
+    pass
diff --git a/lbrynet/core/Session.py b/lbrynet/core/Session.py
index d3a1febbc..2f5b4b39c 100644
--- a/lbrynet/core/Session.py
+++ b/lbrynet/core/Session.py
@@ -1,11 +1,8 @@
 import logging
-import miniupnpc
-from twisted.internet import threads, defer
+from twisted.internet import defer
 from lbrynet.core.BlobManager import DiskBlobManager
-from lbrynet.dht import node, hashannouncer
 from lbrynet.database.storage import SQLiteStorage
 from lbrynet.core.RateLimiter import RateLimiter
-from lbrynet.core.utils import generate_id
 from lbrynet.core.PaymentRateManager import BasePaymentRateManager, OnlyFreePaymentsManager
 
 log = logging.getLogger(__name__)
@@ -32,11 +29,11 @@ class Session(object):
     peers can connect to this peer.
     
""" - def __init__(self, blob_data_payment_rate, db_dir=None, node_id=None, peer_manager=None, dht_node_port=None, + def __init__(self, blob_data_payment_rate, db_dir=None, node_id=None, dht_node_port=None, known_dht_nodes=None, peer_finder=None, hash_announcer=None, blob_dir=None, blob_manager=None, - peer_port=None, use_upnp=True, rate_limiter=None, wallet=None, dht_node_class=node.Node, - blob_tracker_class=None, payment_rate_manager_class=None, is_generous=True, external_ip=None, - storage=None): + peer_port=None, use_upnp=True, rate_limiter=None, wallet=None, blob_tracker_class=None, + payment_rate_manager_class=None, is_generous=True, external_ip=None, storage=None, + dht_node=None, peer_manager=None): """@param blob_data_payment_rate: The default payment rate for blob data @param db_dir: The directory in which levelDB files should be stored @@ -111,8 +108,7 @@ class Session(object): self.external_ip = external_ip self.upnp_redirects = [] self.wallet = wallet - self.dht_node_class = dht_node_class - self.dht_node = None + self.dht_node = dht_node self.base_payment_rate_manager = BasePaymentRateManager(blob_data_payment_rate) self.payment_rate_manager = OnlyFreePaymentsManager() # self.payment_rate_manager_class = payment_rate_manager_class or NegotiatedPaymentRateManager @@ -124,15 +120,14 @@ class Session(object): log.debug("Starting session.") - if self.node_id is None: - self.node_id = generate_id() + if self.dht_node is not None: + if self.peer_manager is None: + self.peer_manager = self.dht_node.peer_manager - if self.use_upnp is True: - d = self._try_upnp() - else: - d = defer.succeed(True) - d.addCallback(lambda _: self.storage.setup()) - d.addCallback(lambda _: self._setup_dht()) + if self.peer_finder is None: + self.peer_finder = self.dht_node.peer_finder + + d = self.storage.setup() d.addCallback(lambda _: self._setup_other_components()) return d @@ -140,97 +135,16 @@ class Session(object): """Stop all services""" log.info('Stopping session.') ds = [] - if self.hash_announcer: - self.hash_announcer.stop() # if self.blob_tracker is not None: # ds.append(defer.maybeDeferred(self.blob_tracker.stop)) - if self.dht_node is not None: - ds.append(defer.maybeDeferred(self.dht_node.stop)) if self.rate_limiter is not None: ds.append(defer.maybeDeferred(self.rate_limiter.stop)) - if self.wallet is not None: - ds.append(defer.maybeDeferred(self.wallet.stop)) if self.blob_manager is not None: ds.append(defer.maybeDeferred(self.blob_manager.stop)) - if self.use_upnp is True: - ds.append(defer.maybeDeferred(self._unset_upnp)) + # if self.use_upnp is True: + # ds.append(defer.maybeDeferred(self._unset_upnp)) return defer.DeferredList(ds) - def _try_upnp(self): - - log.debug("In _try_upnp") - - def get_free_port(upnp, port, protocol): - # returns an existing mapping if it exists - mapping = upnp.getspecificportmapping(port, protocol) - if not mapping: - return port - if upnp.lanaddr == mapping[0]: - return mapping[1] - return get_free_port(upnp, port + 1, protocol) - - def get_port_mapping(upnp, port, protocol, description): - # try to map to the requested port, if there is already a mapping use the next external - # port available - if protocol not in ['UDP', 'TCP']: - raise Exception("invalid protocol") - port = get_free_port(upnp, port, protocol) - if isinstance(port, tuple): - log.info("Found existing UPnP redirect %s:%i (%s) to %s:%i, using it", - self.external_ip, port, protocol, upnp.lanaddr, port) - return port - upnp.addportmapping(port, protocol, upnp.lanaddr, port, - description, 
'') - log.info("Set UPnP redirect %s:%i (%s) to %s:%i", self.external_ip, port, - protocol, upnp.lanaddr, port) - return port - - def threaded_try_upnp(): - if self.use_upnp is False: - log.debug("Not using upnp") - return False - u = miniupnpc.UPnP() - num_devices_found = u.discover() - if num_devices_found > 0: - u.selectigd() - external_ip = u.externalipaddress() - if external_ip != '0.0.0.0' and not self.external_ip: - # best not to rely on this external ip, the router can be behind layers of NATs - self.external_ip = external_ip - if self.peer_port: - self.peer_port = get_port_mapping(u, self.peer_port, 'TCP', 'LBRY peer port') - self.upnp_redirects.append((self.peer_port, 'TCP')) - if self.dht_node_port: - self.dht_node_port = get_port_mapping(u, self.dht_node_port, 'UDP', 'LBRY DHT port') - self.upnp_redirects.append((self.dht_node_port, 'UDP')) - return True - return False - - def upnp_failed(err): - log.warning("UPnP failed. Reason: %s", err.getErrorMessage()) - return False - - d = threads.deferToThread(threaded_try_upnp) - d.addErrback(upnp_failed) - return d - - def _setup_dht(self): # does not block startup, the dht will re-attempt if necessary - self.dht_node = self.dht_node_class( - node_id=self.node_id, - udpPort=self.dht_node_port, - externalIP=self.external_ip, - peerPort=self.peer_port, - peer_manager=self.peer_manager, - peer_finder=self.peer_finder, - ) - if not self.hash_announcer: - self.hash_announcer = hashannouncer.DHTHashAnnouncer(self.dht_node, self.storage) - self.peer_manager = self.dht_node.peer_manager - self.peer_finder = self.dht_node.peer_finder - d = self.dht_node.start(self.known_dht_nodes) - d.addCallback(lambda _: log.info("Joined the dht")) - d.addCallback(lambda _: self.hash_announcer.start()) - def _setup_other_components(self): log.debug("Setting up the rest of the components") @@ -255,28 +169,4 @@ class Session(object): self.rate_limiter.start() d = self.blob_manager.setup() - d.addCallback(lambda _: self.wallet.start()) - # d.addCallback(lambda _: self.blob_tracker.start()) - return d - - def _unset_upnp(self): - log.info("Unsetting upnp for session") - - def threaded_unset_upnp(): - u = miniupnpc.UPnP() - num_devices_found = u.discover() - if num_devices_found > 0: - u.selectigd() - for port, protocol in self.upnp_redirects: - if u.getspecificportmapping(port, protocol) is None: - log.warning( - "UPnP redirect for %s %d was removed by something else.", - protocol, port) - else: - u.deleteportmapping(port, protocol) - log.info("Removed UPnP redirect for %s %d.", protocol, port) - self.upnp_redirects = [] - - d = threads.deferToThread(threaded_unset_upnp) - d.addErrback(lambda err: str(err)) return d diff --git a/lbrynet/core/Wallet.py b/lbrynet/core/Wallet.py index 0b71ed59d..3052fdce8 100644 --- a/lbrynet/core/Wallet.py +++ b/lbrynet/core/Wallet.py @@ -938,9 +938,7 @@ class LBRYumWallet(Wallet): self._lag_counter = 0 self.blocks_behind = 0 self.catchup_progress = 0 - - # fired when the wallet actually unlocks (wallet_unlocked_d can be called multiple times) - self.wallet_unlock_success = defer.Deferred() + self.is_wallet_unlocked = None def _is_first_run(self): return (not self.printed_retrieving_headers and @@ -953,21 +951,23 @@ class LBRYumWallet(Wallet): return self._cmd_runner def check_locked(self): - if not self.wallet.use_encryption: - log.info("Wallet is not encrypted") - self.wallet_unlock_success.callback(True) - elif not self._cmd_runner: + """ + Checks if the wallet is encrypted(locked) or not + + :return: (boolean) indicating 
whether the wallet is unlocked (True) or still locked (False)
+        """
+        if not self._cmd_runner:
             raise Exception("Command runner hasn't been initialized yet")
         elif self._cmd_runner.locked:
             log.info("Waiting for wallet password")
             self.wallet_unlocked_d.addCallback(self.unlock)
-        return self.wallet_unlock_success
+        return self.is_wallet_unlocked
 
     def unlock(self, password):
         if self._cmd_runner and self._cmd_runner.locked:
             try:
                 self._cmd_runner.unlock_wallet(password)
-                self.wallet_unlock_success.callback(True)
+                self.is_wallet_unlocked = True
                 log.info("Unlocked the wallet!")
             except InvalidPassword:
                 log.warning("Incorrect password, try again")
@@ -1054,6 +1054,7 @@ class LBRYumWallet(Wallet):
             wallet.create_main_account()
             wallet.synchronize()
         self.wallet = wallet
+        self.is_wallet_unlocked = not self.wallet.use_encryption
         self._check_large_wallet()
         return defer.succeed(True)
 
diff --git a/lbrynet/daemon/Component.py b/lbrynet/daemon/Component.py
new file mode 100644
index 000000000..e06e64482
--- /dev/null
+++ b/lbrynet/daemon/Component.py
@@ -0,0 +1,63 @@
+import logging
+from twisted.internet import defer
+from ComponentManager import ComponentManager
+
+log = logging.getLogger(__name__)
+
+
+class ComponentType(type):
+    def __new__(mcs, name, bases, newattrs):
+        klass = type.__new__(mcs, name, bases, newattrs)
+        if name != "Component":
+            ComponentManager.default_component_classes[klass.component_name] = klass
+        return klass
+
+
+class Component(object):
+    """
+    lbrynet-daemon component helper
+
+    Inheriting classes will be automatically registered with the ComponentManager and must implement start and stop
+    methods
+    """
+
+    __metaclass__ = ComponentType
+    depends_on = []
+    component_name = None
+
+    def __init__(self, component_manager):
+        self.component_manager = component_manager
+        self._running = False
+
+    @property
+    def running(self):
+        return self._running
+
+    def start(self):
+        raise NotImplementedError()
+
+    def stop(self):
+        raise NotImplementedError()
+
+    def component(self):
+        raise NotImplementedError()
+
+    @defer.inlineCallbacks
+    def _setup(self):
+        try:
+            result = yield defer.maybeDeferred(self.start)
+            self._running = True
+            defer.returnValue(result)
+        except Exception as err:
+            log.exception("Error setting up %s", self.component_name or self.__class__.__name__)
+            raise err
+
+    @defer.inlineCallbacks
+    def _stop(self):
+        try:
+            result = yield defer.maybeDeferred(self.stop)
+            self._running = False
+            defer.returnValue(result)
+        except Exception as err:
+            log.exception("Error stopping %s", self.__class__.__name__)
+            raise err
diff --git a/lbrynet/daemon/ComponentManager.py b/lbrynet/daemon/ComponentManager.py
new file mode 100644
index 000000000..af5121916
--- /dev/null
+++ b/lbrynet/daemon/ComponentManager.py
@@ -0,0 +1,139 @@
+import logging
+from twisted.internet import defer
+
+from lbrynet import conf
+from lbrynet.core.Error import ComponentStartConditionNotMet
+
+log = logging.getLogger(__name__)
+
+
+class ComponentManager(object):
+    default_component_classes = {}
+
+    def __init__(self, reactor=None, analytics_manager=None, skip_components=None, **override_components):
+        self.skip_components = skip_components or []
+        self.skip_components.extend(conf.settings['components_to_skip'])
+
+        self.reactor = reactor
+        self.component_classes = {}
+        self.components = set()
+        self.analytics_manager = analytics_manager
+
+        for component_name, component_class in self.default_component_classes.iteritems():
+            if component_name in override_components:
+                component_class = override_components.pop(component_name)
+            if 
component_name not in self.skip_components: + self.component_classes[component_name] = component_class + + if override_components: + raise SyntaxError("unexpected components: %s" % override_components) + + for component_class in self.component_classes.itervalues(): + self.components.add(component_class(self)) + + def sort_components(self, reverse=False): + """ + Sort components by requirements + """ + steps = [] + staged = set() + components = set(self.components) + + # components with no requirements + step = [] + for component in set(components): + if not component.depends_on: + step.append(component) + staged.add(component.component_name) + components.remove(component) + + if step: + steps.append(step) + + while components: + step = [] + to_stage = set() + for component in set(components): + reqs_met = 0 + for needed in component.depends_on: + if needed in staged: + reqs_met += 1 + if reqs_met == len(component.depends_on): + step.append(component) + to_stage.add(component.component_name) + components.remove(component) + if step: + staged.update(to_stage) + steps.append(step) + elif components: + raise ComponentStartConditionNotMet("Unresolved dependencies for: %s" % components) + if reverse: + steps.reverse() + return steps + + @defer.inlineCallbacks + def setup(self, **callbacks): + """ + Start Components in sequence sorted by requirements + + :return: (defer.Deferred) + """ + + for component_name, cb in callbacks.iteritems(): + if component_name not in self.component_classes: + raise NameError("unknown component: %s" % component_name) + if not callable(cb): + raise ValueError("%s is not callable" % cb) + + def _setup(component): + if component.component_name in callbacks: + d = component._setup() + d.addCallback(callbacks[component.component_name]) + return d + return component._setup() + + stages = self.sort_components() + for stage in stages: + yield defer.DeferredList([_setup(component) for component in stage]) + + @defer.inlineCallbacks + def stop(self): + """ + Stop Components in reversed startup order + + :return: (defer.Deferred) + """ + stages = self.sort_components(reverse=True) + for stage in stages: + yield defer.DeferredList([component._stop() for component in stage]) + + def all_components_running(self, *component_names): + """ + Check if components are running + + :return: (bool) True if all specified components are running + """ + components = {component.component_name: component for component in self.components} + for component in component_names: + if component not in components: + raise NameError("%s is not a known Component" % component) + if not components[component].running: + return False + return True + + def get_components_status(self): + """ + List status of all the components, whether they are running or not + + :return: (dict) {(str) component_name: (bool) True is running else False} + """ + return { + component.component_name: component.running + for component in self.components + } + + def get_component(self, component_name): + for component in self.components: + if component.component_name == component_name: + return component.component + raise NameError(component_name) diff --git a/lbrynet/daemon/Components.py b/lbrynet/daemon/Components.py new file mode 100644 index 000000000..7fc17b6cb --- /dev/null +++ b/lbrynet/daemon/Components.py @@ -0,0 +1,523 @@ +import os +import logging +import miniupnpc +from twisted.internet import defer, threads, reactor, error + +from lbrynet import conf +from lbrynet.core.Session import Session +from 
lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier, EncryptedFileStreamType +from lbrynet.core.Wallet import LBRYumWallet +from lbrynet.core.server.BlobRequestHandler import BlobRequestHandlerFactory +from lbrynet.core.server.ServerProtocol import ServerProtocolFactory +from lbrynet.daemon.Component import Component +from lbrynet.database.storage import SQLiteStorage +from lbrynet.dht import node, hashannouncer +from lbrynet.file_manager.EncryptedFileManager import EncryptedFileManager +from lbrynet.lbry_file.client.EncryptedFileDownloader import EncryptedFileSaverFactory +from lbrynet.lbry_file.client.EncryptedFileOptions import add_lbry_file_to_sd_identifier +from lbrynet.reflector import ServerFactory as reflector_server_factory + +from lbrynet.core.utils import generate_id + +log = logging.getLogger(__name__) + +# settings must be initialized before this file is imported + +DATABASE_COMPONENT = "database" +WALLET_COMPONENT = "wallet" +SESSION_COMPONENT = "session" +DHT_COMPONENT = "dht" +HASH_ANNOUNCER_COMPONENT = "hashAnnouncer" +STREAM_IDENTIFIER_COMPONENT = "streamIdentifier" +FILE_MANAGER_COMPONENT = "fileManager" +PEER_PROTOCOL_SERVER_COMPONENT = "peerProtocolServer" +REFLECTOR_COMPONENT = "reflector" +UPNP_COMPONENT = "upnp" + + +class ConfigSettings(object): + @staticmethod + def get_conf_setting(setting_name): + return conf.settings[setting_name] + + @staticmethod + def get_blobfiles_dir(): + if conf.settings['BLOBFILES_DIR'] == "blobfiles": + return os.path.join(GCS("data_dir"), "blobfiles") + else: + log.info("Using non-default blobfiles directory: %s", conf.settings['BLOBFILES_DIR']) + return conf.settings['BLOBFILES_DIR'] + + @staticmethod + def get_node_id(): + return conf.settings.node_id + + @staticmethod + def get_external_ip(): + from lbrynet.core.system_info import get_platform + platform = get_platform(get_ip=True) + return platform['ip'] + + +# Shorthand for common ConfigSettings methods +CS = ConfigSettings +GCS = ConfigSettings.get_conf_setting + + +class DatabaseComponent(Component): + component_name = DATABASE_COMPONENT + + def __init__(self, component_manager): + Component.__init__(self, component_manager) + self.storage = None + + @property + def component(self): + return self.storage + + @staticmethod + def get_current_db_revision(): + return 9 + + @staticmethod + def get_revision_filename(): + return conf.settings.get_db_revision_filename() + + @staticmethod + def _write_db_revision_file(version_num): + with open(conf.settings.get_db_revision_filename(), mode='w') as db_revision: + db_revision.write(str(version_num)) + + @defer.inlineCallbacks + def start(self): + # check directories exist, create them if they don't + log.info("Loading databases") + + if not os.path.exists(GCS('download_directory')): + os.mkdir(GCS('download_directory')) + + if not os.path.exists(GCS('data_dir')): + os.mkdir(GCS('data_dir')) + self._write_db_revision_file(self.get_current_db_revision()) + log.debug("Created the db revision file: %s", self.get_revision_filename()) + + if not os.path.exists(CS.get_blobfiles_dir()): + os.mkdir(CS.get_blobfiles_dir()) + log.debug("Created the blobfile directory: %s", str(CS.get_blobfiles_dir())) + + if not os.path.exists(self.get_revision_filename()): + log.warning("db_revision file not found. 
Creating it") + self._write_db_revision_file(self.get_current_db_revision()) + + # check the db migration and run any needed migrations + with open(self.get_revision_filename(), "r") as revision_read_handle: + old_revision = int(revision_read_handle.read().strip()) + + if old_revision > self.get_current_db_revision(): + raise Exception('This version of lbrynet is not compatible with the database\n' + 'Your database is revision %i, expected %i' % + (old_revision, self.get_current_db_revision())) + if old_revision < self.get_current_db_revision(): + from lbrynet.database.migrator import dbmigrator + log.info("Upgrading your databases (revision %i to %i)", old_revision, self.get_current_db_revision()) + yield threads.deferToThread( + dbmigrator.migrate_db, GCS('data_dir'), old_revision, self.get_current_db_revision() + ) + self._write_db_revision_file(self.get_current_db_revision()) + log.info("Finished upgrading the databases.") + + # start SQLiteStorage + self.storage = SQLiteStorage(GCS('data_dir')) + yield self.storage.setup() + + @defer.inlineCallbacks + def stop(self): + yield self.storage.stop() + self.storage = None + + +class WalletComponent(Component): + component_name = WALLET_COMPONENT + depends_on = [DATABASE_COMPONENT] + + def __init__(self, component_manager): + Component.__init__(self, component_manager) + self.wallet = None + + @property + def component(self): + return self.wallet + + @defer.inlineCallbacks + def start(self): + storage = self.component_manager.get_component(DATABASE_COMPONENT) + wallet_type = GCS('wallet') + + if wallet_type == conf.LBRYCRD_WALLET: + raise ValueError('LBRYcrd Wallet is no longer supported') + elif wallet_type == conf.LBRYUM_WALLET: + + log.info("Using lbryum wallet") + + lbryum_servers = {address: {'t': str(port)} + for address, port in GCS('lbryum_servers')} + + config = { + 'auto_connect': True, + 'chain': GCS('blockchain_name'), + 'default_servers': lbryum_servers + } + + if 'use_keyring' in conf.settings: + config['use_keyring'] = GCS('use_keyring') + if conf.settings['lbryum_wallet_dir']: + config['lbryum_path'] = GCS('lbryum_wallet_dir') + self.wallet = LBRYumWallet(storage, config) + yield self.wallet.start() + else: + raise ValueError('Wallet Type {} is not valid'.format(wallet_type)) + + @defer.inlineCallbacks + def stop(self): + yield self.wallet.stop() + self.wallet = None + + +class SessionComponent(Component): + component_name = SESSION_COMPONENT + depends_on = [DATABASE_COMPONENT, WALLET_COMPONENT, DHT_COMPONENT, HASH_ANNOUNCER_COMPONENT] + + def __init__(self, component_manager): + Component.__init__(self, component_manager) + self.session = None + + @property + def component(self): + return self.session + + @defer.inlineCallbacks + def start(self): + self.session = Session( + GCS('data_rate'), + db_dir=GCS('data_dir'), + node_id=CS.get_node_id(), + blob_dir=CS.get_blobfiles_dir(), + dht_node=self.component_manager.get_component(DHT_COMPONENT), + hash_announcer=self.component_manager.get_component(HASH_ANNOUNCER_COMPONENT), + dht_node_port=GCS('dht_node_port'), + known_dht_nodes=GCS('known_dht_nodes'), + peer_port=GCS('peer_port'), + use_upnp=GCS('use_upnp'), + wallet=self.component_manager.get_component(WALLET_COMPONENT), + is_generous=GCS('is_generous_host'), + external_ip=CS.get_external_ip(), + storage=self.component_manager.get_component(DATABASE_COMPONENT) + ) + yield self.session.setup() + + @defer.inlineCallbacks + def stop(self): + yield self.session.shut_down() + + +class DHTComponent(Component): + component_name = 
DHT_COMPONENT
+    depends_on = [UPNP_COMPONENT]
+
+    def __init__(self, component_manager):
+        Component.__init__(self, component_manager)
+        self.dht_node = None
+        self.upnp_component = None
+        self.udp_port, self.peer_port = None, None
+
+    @property
+    def component(self):
+        return self.dht_node
+
+    @defer.inlineCallbacks
+    def start(self):
+        self.upnp_component = self.component_manager.get_component(UPNP_COMPONENT)
+        # get_redirects() returns (peer_port, dht_node_port), so unpack in that order
+        self.peer_port, self.udp_port = self.upnp_component.get_redirects()
+        node_id = CS.get_node_id()
+        if node_id is None:
+            node_id = generate_id()
+
+        self.dht_node = node.Node(
+            node_id=node_id,
+            udpPort=self.udp_port,
+            externalIP=CS.get_external_ip(),
+            peerPort=self.peer_port
+        )
+        yield self.dht_node.start(GCS('known_dht_nodes'))
+        log.info("Joined the dht")
+
+    @defer.inlineCallbacks
+    def stop(self):
+        yield self.dht_node.stop()
+
+
+class HashAnnouncer(Component):
+    component_name = HASH_ANNOUNCER_COMPONENT
+    depends_on = [DHT_COMPONENT, DATABASE_COMPONENT]
+
+    def __init__(self, component_manager):
+        Component.__init__(self, component_manager)
+        self.hash_announcer = None
+
+    @property
+    def component(self):
+        return self.hash_announcer
+
+    @defer.inlineCallbacks
+    def start(self):
+        storage = self.component_manager.get_component(DATABASE_COMPONENT)
+        dht_node = self.component_manager.get_component(DHT_COMPONENT)
+        self.hash_announcer = hashannouncer.DHTHashAnnouncer(dht_node, storage)
+        yield self.hash_announcer.start()
+
+    @defer.inlineCallbacks
+    def stop(self):
+        yield self.hash_announcer.stop()
+
+
+class StreamIdentifier(Component):
+    component_name = STREAM_IDENTIFIER_COMPONENT
+    depends_on = [SESSION_COMPONENT]
+
+    def __init__(self, component_manager):
+        Component.__init__(self, component_manager)
+        self.sd_identifier = StreamDescriptorIdentifier()
+
+    @property
+    def component(self):
+        return self.sd_identifier
+
+    @defer.inlineCallbacks
+    def start(self):
+        session = self.component_manager.get_component(SESSION_COMPONENT)
+        add_lbry_file_to_sd_identifier(self.sd_identifier)
+        file_saver_factory = EncryptedFileSaverFactory(
+            session.peer_finder,
+            session.rate_limiter,
+            session.blob_manager,
+            session.storage,
+            session.wallet,
+            GCS('download_directory')
+        )
+        yield self.sd_identifier.add_stream_downloader_factory(EncryptedFileStreamType, file_saver_factory)
+
+    def stop(self):
+        pass
+
+
+class FileManager(Component):
+    component_name = FILE_MANAGER_COMPONENT
+    depends_on = [SESSION_COMPONENT, STREAM_IDENTIFIER_COMPONENT]
+
+    def __init__(self, component_manager):
+        Component.__init__(self, component_manager)
+        self.file_manager = None
+
+    @property
+    def component(self):
+        return self.file_manager
+
+    @defer.inlineCallbacks
+    def start(self):
+        session = self.component_manager.get_component(SESSION_COMPONENT)
+        sd_identifier = self.component_manager.get_component(STREAM_IDENTIFIER_COMPONENT)
+        log.info('Starting the file manager')
+        self.file_manager = EncryptedFileManager(session, sd_identifier)
+        yield self.file_manager.setup()
+        log.info('Done setting up file manager')
+
+    @defer.inlineCallbacks
+    def stop(self):
+        yield self.file_manager.stop()
+
+
+class PeerProtocolServer(Component):
+    component_name = PEER_PROTOCOL_SERVER_COMPONENT
+    depends_on = [SESSION_COMPONENT]
+
+    def __init__(self, component_manager):
+        Component.__init__(self, component_manager)
+        self.lbry_server_port = None
+
+    @property
+    def component(self):
+        return self.lbry_server_port
+
+    @defer.inlineCallbacks
+    def start(self):
+        query_handlers = {}
+        peer_port = GCS('peer_port')
+        session = self.component_manager.get_component(SESSION_COMPONENT)
+
+        handlers = [
+            BlobRequestHandlerFactory(
+                session.blob_manager,
+                session.wallet,
+                session.payment_rate_manager,
+                self.component_manager.analytics_manager
+            ),
+            session.wallet.get_wallet_info_query_handler_factory(),
+        ]
+
+        for handler in handlers:
+            query_id = handler.get_primary_query_identifier()
+            query_handlers[query_id] = handler
+
+        if peer_port is not None:
+            server_factory = ServerProtocolFactory(session.rate_limiter, query_handlers, session.peer_manager)
+
+            try:
+                log.info("Peer protocol listening on TCP %d", peer_port)
+                self.lbry_server_port = yield reactor.listenTCP(peer_port, server_factory)
+            except error.CannotListenError as e:
+                import traceback
+                log.error("Couldn't bind to port %d. Visit lbry.io/faq/how-to-change-port for"
+                          " more details.", peer_port)
+                log.error("%s", traceback.format_exc())
+                raise ValueError("%s lbrynet may already be running on your computer." % str(e))
+
+    @defer.inlineCallbacks
+    def stop(self):
+        if self.lbry_server_port is not None:
+            self.lbry_server_port, old_port = None, self.lbry_server_port
+            log.info('Stop listening on port %s', old_port.port)
+            yield old_port.stopListening()
+
+
+class ReflectorComponent(Component):
+    component_name = REFLECTOR_COMPONENT
+    depends_on = [SESSION_COMPONENT, FILE_MANAGER_COMPONENT]
+
+    def __init__(self, component_manager):
+        Component.__init__(self, component_manager)
+        self.reflector_server_port = GCS('reflector_port')
+        self.run_reflector_server = GCS('run_reflector_server')
+
+    @property
+    def component(self):
+        return self
+
+    @defer.inlineCallbacks
+    def start(self):
+        session = self.component_manager.get_component(SESSION_COMPONENT)
+        file_manager = self.component_manager.get_component(FILE_MANAGER_COMPONENT)
+
+        if self.run_reflector_server and self.reflector_server_port is not None:
+            log.info("Starting reflector server")
+            reflector_factory = reflector_server_factory(session.peer_manager, session.blob_manager, file_manager)
+            try:
+                self.reflector_server_port = yield reactor.listenTCP(self.reflector_server_port, reflector_factory)
+                log.info('Started reflector on port %s', self.reflector_server_port)
+            except error.CannotListenError as e:
+                log.exception("Couldn't bind reflector to port %d", self.reflector_server_port)
+                raise ValueError("{} lbrynet may already be running on your computer.".format(e))
+
+    @defer.inlineCallbacks
+    def stop(self):
+        if self.run_reflector_server and self.reflector_server_port is not None:
+            log.info("Stopping reflector server")
+            if self.reflector_server_port is not None:
+                self.reflector_server_port, p = None, self.reflector_server_port
+                yield p.stopListening()
+
+
+class UPnPComponent(Component):
+    component_name = UPNP_COMPONENT
+
+    def __init__(self, component_manager):
+        Component.__init__(self, component_manager)
+        self.peer_port = GCS('peer_port')
+        self.dht_node_port = GCS('dht_node_port')
+        self.use_upnp = GCS('use_upnp')
+        self.external_ip = CS.get_external_ip()
+        self.upnp_redirects = []
+
+    @property
+    def component(self):
+        return self
+
+    def get_redirects(self):
+        return self.peer_port, self.dht_node_port
+
+    def start(self):
+        log.debug("In _try_upnp")
+
+        def get_free_port(upnp, port, protocol):
+            # returns an existing mapping if it exists
+            mapping = upnp.getspecificportmapping(port, protocol)
+            if not mapping:
+                return port
+            if upnp.lanaddr == mapping[0]:
+                return mapping[1]
+            return get_free_port(upnp, port + 1, protocol)
+
+        def get_port_mapping(upnp, port, protocol, description):
+            # try to map 
to the requested port, if there is already a mapping use the next external + # port available + if protocol not in ['UDP', 'TCP']: + raise Exception("invalid protocol") + port = get_free_port(upnp, port, protocol) + if isinstance(port, tuple): + log.info("Found existing UPnP redirect %s:%i (%s) to %s:%i, using it", + self.external_ip, port, protocol, upnp.lanaddr, port) + return port + upnp.addportmapping(port, protocol, upnp.lanaddr, port, + description, '') + log.info("Set UPnP redirect %s:%i (%s) to %s:%i", self.external_ip, port, + protocol, upnp.lanaddr, port) + return port + + def threaded_try_upnp(): + if self.use_upnp is False: + log.debug("Not using upnp") + return False + u = miniupnpc.UPnP() + num_devices_found = u.discover() + if num_devices_found > 0: + u.selectigd() + external_ip = u.externalipaddress() + if external_ip != '0.0.0.0' and not self.external_ip: + # best not to rely on this external ip, the router can be behind layers of NATs + self.external_ip = external_ip + if self.peer_port: + self.peer_port = get_port_mapping(u, self.peer_port, 'TCP', 'LBRY peer port') + self.upnp_redirects.append((self.peer_port, 'TCP')) + if self.dht_node_port: + self.dht_node_port = get_port_mapping(u, self.dht_node_port, 'UDP', 'LBRY DHT port') + self.upnp_redirects.append((self.dht_node_port, 'UDP')) + return True + return False + + def upnp_failed(err): + log.warning("UPnP failed. Reason: %s", err.getErrorMessage()) + return False + + d = threads.deferToThread(threaded_try_upnp) + d.addErrback(upnp_failed) + return d + + def stop(self): + log.info("Unsetting upnp for session") + + def threaded_unset_upnp(): + u = miniupnpc.UPnP() + num_devices_found = u.discover() + if num_devices_found > 0: + u.selectigd() + for port, protocol in self.upnp_redirects: + if u.getspecificportmapping(port, protocol) is None: + log.warning( + "UPnP redirect for %s %d was removed by something else.", + protocol, port) + else: + u.deleteportmapping(port, protocol) + log.info("Removed UPnP redirect for %s %d.", protocol, port) + self.upnp_redirects = [] + + d = threads.deferToThread(threaded_unset_upnp) + d.addErrback(lambda err: str(err)) + return d diff --git a/lbrynet/daemon/Daemon.py b/lbrynet/daemon/Daemon.py index 3d1681cc7..78be571fc 100644 --- a/lbrynet/daemon/Daemon.py +++ b/lbrynet/daemon/Daemon.py @@ -1,3 +1,4 @@ +# coding=utf-8 import binascii import logging.handlers import mimetypes @@ -8,11 +9,10 @@ import urllib import json import textwrap import signal -import six from copy import deepcopy from decimal import Decimal, InvalidOperation from twisted.web import server -from twisted.internet import defer, threads, error, reactor +from twisted.internet import defer, reactor from twisted.internet.task import LoopingCall from twisted.python.failure import Failure @@ -25,28 +25,20 @@ from lbryschema.decode import smart_decode # TODO: importing this when internet is disabled raises a socket.gaierror from lbrynet.core.system_info import get_lbrynet_version -from lbrynet.database.storage import SQLiteStorage from lbrynet import conf -from lbrynet.conf import LBRYCRD_WALLET, LBRYUM_WALLET from lbrynet.reflector import reupload -from lbrynet.reflector import ServerFactory as reflector_server_factory from lbrynet.core.log_support import configure_loggly_handler -from lbrynet.lbry_file.client.EncryptedFileDownloader import EncryptedFileSaverFactory -from lbrynet.lbry_file.client.EncryptedFileOptions import add_lbry_file_to_sd_identifier -from lbrynet.file_manager.EncryptedFileManager import 
EncryptedFileManager
+from lbrynet.daemon.ComponentManager import ComponentManager
+from lbrynet.daemon.Components import WALLET_COMPONENT, DATABASE_COMPONENT, SESSION_COMPONENT, DHT_COMPONENT
+from lbrynet.daemon.Components import STREAM_IDENTIFIER_COMPONENT, FILE_MANAGER_COMPONENT
 from lbrynet.daemon.Downloader import GetStream
 from lbrynet.daemon.Publisher import Publisher
 from lbrynet.daemon.ExchangeRateManager import ExchangeRateManager
 from lbrynet.daemon.auth.server import AuthJSONRPCServer
 from lbrynet.core.PaymentRateManager import OnlyFreePaymentsManager
 from lbrynet.core import utils, system_info
-from lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier, download_sd_blob
-from lbrynet.core.StreamDescriptor import EncryptedFileStreamType
-from lbrynet.core.Session import Session
-from lbrynet.core.Wallet import LBRYumWallet
+from lbrynet.core.StreamDescriptor import download_sd_blob
 from lbrynet.core.looping_call_manager import LoopingCallManager
-from lbrynet.core.server.BlobRequestHandler import BlobRequestHandlerFactory
-from lbrynet.core.server.ServerProtocol import ServerProtocolFactory
 from lbrynet.core.Error import InsufficientFundsError, UnknownNameError
 from lbrynet.core.Error import DownloadDataTimeout, DownloadSDTimeout
 from lbrynet.core.Error import NullFundsError, NegativeFundsError
@@ -58,23 +50,6 @@ from lbrynet.core.client.StandaloneBlobDownloader import StandaloneBlobDownloade
 log = logging.getLogger(__name__)
 
 INITIALIZING_CODE = 'initializing'
-LOADING_DB_CODE = 'loading_db'
-LOADING_WALLET_CODE = 'loading_wallet'
-LOADING_FILE_MANAGER_CODE = 'loading_file_manager'
-LOADING_SERVER_CODE = 'loading_server'
-STARTED_CODE = 'started'
-WAITING_FOR_FIRST_RUN_CREDITS = 'waiting_for_credits'
-WAITING_FOR_UNLOCK = 'waiting_for_wallet_unlock'
-STARTUP_STAGES = [
-    (INITIALIZING_CODE, 'Initializing'),
-    (LOADING_DB_CODE, 'Loading databases'),
-    (LOADING_WALLET_CODE, 'Catching up with the blockchain'),
-    (LOADING_FILE_MANAGER_CODE, 'Setting up file manager'),
-    (LOADING_SERVER_CODE, 'Starting lbrynet'),
-    (STARTED_CODE, 'Started lbrynet'),
-    (WAITING_FOR_FIRST_RUN_CREDITS, 'Waiting for first run credits'),
-    (WAITING_FOR_UNLOCK, 'Waiting for user to unlock the wallet using the wallet_unlock command')
-]
 
 # TODO: make this consistent with the stages in Downloader.py
 DOWNLOAD_METADATA_CODE = 'downloading_metadata'
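
Reviewer note: the staged startup that ComponentManager.sort_components implements (see ComponentManager.py above) can be pictured with this standalone sketch. The component names and the helper function are illustrative only and are not part of this patch; the sketch just mirrors the idea of starting, stage by stage, everything whose requirements are already met.

    # Standalone sketch of staged dependency resolution (hypothetical names).
    DEPENDS_ON = {
        "database": [],
        "upnp": [],
        "dht": ["upnp"],
        "wallet": ["database"],
        "session": ["database", "wallet", "dht"],
    }

    def sort_into_stages(depends_on):
        staged, stages = set(), []
        remaining = set(depends_on)
        while remaining:
            # everything whose requirements are already staged can start now
            step = set(c for c in remaining if set(depends_on[c]) <= staged)
            if not step:
                raise Exception("Unresolved dependencies for: %s" % remaining)
            staged |= step
            remaining -= step
            stages.append(sorted(step))
        return stages

    print(sort_into_stages(DEPENDS_ON))
    # -> [['database', 'upnp'], ['dht', 'wallet'], ['session']]
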
@@ -178,40 +153,20 @@ class Daemon(AuthJSONRPCServer):
     LBRYnet daemon, a jsonrpc interface to lbry functions
     """
 
-    allowed_during_startup = [
-        'daemon_stop', 'status', 'version', 'wallet_unlock'
-    ]
-
-    def __init__(self, analytics_manager):
+    def __init__(self, analytics_manager, component_manager=None):
         AuthJSONRPCServer.__init__(self, conf.settings['use_auth_http'])
-        self.db_dir = conf.settings['data_dir']
-        self.storage = SQLiteStorage(self.db_dir)
         self.download_directory = conf.settings['download_directory']
-        if conf.settings['BLOBFILES_DIR'] == "blobfiles":
-            self.blobfile_dir = os.path.join(self.db_dir, "blobfiles")
-        else:
-            log.info("Using non-default blobfiles directory: %s", conf.settings['BLOBFILES_DIR'])
-            self.blobfile_dir = conf.settings['BLOBFILES_DIR']
         self.data_rate = conf.settings['data_rate']
         self.max_key_fee = conf.settings['max_key_fee']
         self.disable_max_key_fee = conf.settings['disable_max_key_fee']
         self.download_timeout = conf.settings['download_timeout']
-        self.run_reflector_server = conf.settings['run_reflector_server']
-        self.wallet_type = conf.settings['wallet']
         self.delete_blobs_on_remove = conf.settings['delete_blobs_on_remove']
-        self.peer_port = conf.settings['peer_port']
-        self.reflector_port = conf.settings['reflector_port']
-        self.dht_node_port = conf.settings['dht_node_port']
-        self.use_upnp = conf.settings['use_upnp']
         self.auto_renew_claim_height_delta = conf.settings['auto_renew_claim_height_delta']
 
-        self.startup_status = STARTUP_STAGES[0]
         self.connected_to_internet = True
         self.connection_status_code = None
         self.platform = None
-        self.current_db_revision = 9
         self.db_revision_file = conf.settings.get_db_revision_filename()
-        self.session = None
         self._session_id = conf.settings.get_session_id()
         # TODO: this should probably be passed into the daemon, or
         # possibly have the entire log upload functionality taken out
@@ -220,9 +175,16 @@ class Daemon(AuthJSONRPCServer):
         self.analytics_manager = analytics_manager
         self.node_id = conf.settings.node_id
 
+        # components
+        self.storage = None
+        self.dht_node = None
+        self.wallet = None
+        self.sd_identifier = None
+        self.session = None
+        self.file_manager = None
+
         self.wallet_user = None
         self.wallet_password = None
-        self.query_handlers = {}
         self.waiting_on = {}
         self.streams = {}
         self.exchange_rate_manager = ExchangeRateManager()
@@ -231,8 +193,7 @@ class Daemon(AuthJSONRPCServer):
             Checker.CONNECTION_STATUS: LoopingCall(self._update_connection_status),
         }
         self.looping_call_manager = LoopingCallManager(calls)
-        self.sd_identifier = StreamDescriptorIdentifier()
-        self.lbry_file_manager = None
+        self.component_manager = component_manager or ComponentManager(analytics_manager=self.analytics_manager)
 
     @defer.inlineCallbacks
     def setup(self):
@@ -246,32 +207,19 @@ class Daemon(AuthJSONRPCServer):
         self.exchange_rate_manager.start()
 
         yield self._initial_setup()
-        yield threads.deferToThread(self._setup_data_directory)
-        migrated = yield self._check_db_migration()
-        yield self.storage.setup()
-        yield self._get_session()
-        yield self._check_wallet_locked()
+        yield self.component_manager.setup()
+        self.storage = self.component_manager.get_component(DATABASE_COMPONENT)
+        self.session = self.component_manager.get_component(SESSION_COMPONENT)
+        self.wallet = self.component_manager.get_component(WALLET_COMPONENT)
+        self.dht_node = self.component_manager.get_component(DHT_COMPONENT)
+        # yield self._check_wallet_locked()
         yield self._start_analytics()
-        yield add_lbry_file_to_sd_identifier(self.sd_identifier)
-        yield self._setup_stream_identifier()
-        yield self._setup_lbry_file_manager()
-        yield self._setup_query_handlers()
-        yield self._setup_server()
-        log.info("Starting balance: " + str(self.session.wallet.get_balance()))
+        self.sd_identifier = self.component_manager.get_component(STREAM_IDENTIFIER_COMPONENT)
+        self.file_manager = self.component_manager.get_component(FILE_MANAGER_COMPONENT)
+        log.info("Starting balance: " + str(self.wallet.get_balance()))
         self.announced_startup = True
-        self.startup_status = STARTUP_STAGES[5]
         log.info("Started lbrynet-daemon")
 
-        ###
-        # this should be removed with the next db revision
-        if migrated:
-            missing_channel_claim_ids = yield self.storage.get_unknown_certificate_ids()
-            while missing_channel_claim_ids:  # in case there are a crazy amount lets batch to be safe
-                batch = missing_channel_claim_ids[:100]
-                _ = yield self.session.wallet.get_claims_by_ids(*batch)
-                missing_channel_claim_ids = missing_channel_claim_ids[100:]
-        ###
-
         self._auto_renew()
 
     def _get_platform(self):
@@ -302,12 +250,12 @@ class Daemon(AuthJSONRPCServer):
         # auto renew is turned off if 0 or some negative number
         if self.auto_renew_claim_height_delta < 1:
             defer.returnValue(None)
-        if not 
self.session.wallet.network.get_remote_height(): + if not self.wallet.network.get_remote_height(): log.warning("Failed to get remote height, aborting auto renew") defer.returnValue(None) log.debug("Renewing claim") - h = self.session.wallet.network.get_remote_height() + self.auto_renew_claim_height_delta - results = yield self.session.wallet.claim_renew_all_before_expiration(h) + h = self.wallet.network.get_remote_height() + self.auto_renew_claim_height_delta + results = yield self.wallet.claim_renew_all_before_expiration(h) for outpoint, result in results.iteritems(): if result['success']: log.info("Renewed claim at outpoint:%s claim ID:%s, paid fee:%s", @@ -316,93 +264,6 @@ class Daemon(AuthJSONRPCServer): log.info("Failed to renew claim at outpoint:%s, reason:%s", outpoint, result['reason']) - def _start_server(self): - if self.peer_port is not None: - server_factory = ServerProtocolFactory(self.session.rate_limiter, - self.query_handlers, - self.session.peer_manager) - - try: - log.info("Peer protocol listening on TCP %d", self.peer_port) - self.lbry_server_port = reactor.listenTCP(self.peer_port, server_factory) - except error.CannotListenError as e: - import traceback - log.error("Couldn't bind to port %d. Visit lbry.io/faq/how-to-change-port for" - " more details.", self.peer_port) - log.error("%s", traceback.format_exc()) - raise ValueError("%s lbrynet may already be running on your computer." % str(e)) - return defer.succeed(True) - - def _start_reflector(self): - if self.run_reflector_server: - log.info("Starting reflector server") - if self.reflector_port is not None: - reflector_factory = reflector_server_factory( - self.session.peer_manager, - self.session.blob_manager, - self.lbry_file_manager - ) - try: - self.reflector_server_port = reactor.listenTCP(self.reflector_port, - reflector_factory) - log.info('Started reflector on port %s', self.reflector_port) - except error.CannotListenError as e: - log.exception("Couldn't bind reflector to port %d", self.reflector_port) - raise ValueError( - "{} lbrynet may already be running on your computer.".format(e)) - return defer.succeed(True) - - def _stop_reflector(self): - if self.run_reflector_server: - log.info("Stopping reflector server") - try: - if self.reflector_server_port is not None: - self.reflector_server_port, p = None, self.reflector_server_port - return defer.maybeDeferred(p.stopListening) - except AttributeError: - return defer.succeed(True) - return defer.succeed(True) - - def _stop_file_manager(self): - if self.lbry_file_manager: - self.lbry_file_manager.stop() - return defer.succeed(True) - - def _stop_server(self): - try: - if self.lbry_server_port is not None: - self.lbry_server_port, old_port = None, self.lbry_server_port - log.info('Stop listening on port %s', old_port.port) - return defer.maybeDeferred(old_port.stopListening) - else: - return defer.succeed(True) - except AttributeError: - return defer.succeed(True) - - def _setup_server(self): - self.startup_status = STARTUP_STAGES[4] - d = self._start_server() - d.addCallback(lambda _: self._start_reflector()) - return d - - def _setup_query_handlers(self): - handlers = [ - BlobRequestHandlerFactory( - self.session.blob_manager, - self.session.wallet, - self.session.payment_rate_manager, - self.analytics_manager - ), - self.session.wallet.get_wallet_info_query_handler_factory(), - ] - return self._add_query_handlers(handlers) - - def _add_query_handlers(self, query_handlers): - for handler in query_handlers: - query_id = handler.get_primary_query_identifier() - 
self.query_handlers[query_id] = handler - return defer.succeed(None) - @staticmethod def _already_shutting_down(sig_num, frame): log.info("Already shutting down") @@ -418,190 +279,26 @@ class Daemon(AuthJSONRPCServer): signal.signal(signal.SIGTERM, self._already_shutting_down) log.info("Closing lbrynet session") - log.info("Status at time of shutdown: " + self.startup_status[0]) self._stop_streams() self.looping_call_manager.shutdown() if self.analytics_manager: self.analytics_manager.shutdown() - d = self._stop_server() - d.addErrback(log.fail(), 'Failure while shutting down') - d.addCallback(lambda _: self._stop_reflector()) - d.addErrback(log.fail(), 'Failure while shutting down') - d.addCallback(lambda _: self._stop_file_manager()) - d.addErrback(log.fail(), 'Failure while shutting down') - if self.session is not None: - d.addCallback(lambda _: self.session.shut_down()) + if self.component_manager is not None: + d = self.component_manager.stop() d.addErrback(log.fail(), 'Failure while shutting down') return d - def _update_settings(self, settings): - setting_types = { - 'download_directory': str, - 'data_rate': float, - 'download_timeout': int, - 'peer_port': int, - 'max_key_fee': dict, - 'use_upnp': bool, - 'run_reflector_server': bool, - 'cache_time': int, - 'reflect_uploads': bool, - 'share_usage_data': bool, - 'disable_max_key_fee': bool, - 'peer_search_timeout': int, - 'sd_download_timeout': int, - 'auto_renew_claim_height_delta': int - } - - for key, setting_type in setting_types.iteritems(): - if key in settings: - if isinstance(settings[key], setting_type): - conf.settings.update({key: settings[key]}, - data_types=(conf.TYPE_RUNTIME, conf.TYPE_PERSISTED)) - elif setting_type is dict and isinstance(settings[key], six.string_types): - decoded = json.loads(str(settings[key])) - conf.settings.update({key: decoded}, - data_types=(conf.TYPE_RUNTIME, conf.TYPE_PERSISTED)) - else: - converted = setting_type(settings[key]) - conf.settings.update({key: converted}, - data_types=(conf.TYPE_RUNTIME, conf.TYPE_PERSISTED)) - conf.settings.save_conf_file_settings() - - self.data_rate = conf.settings['data_rate'] - self.max_key_fee = conf.settings['max_key_fee'] - self.disable_max_key_fee = conf.settings['disable_max_key_fee'] - self.download_directory = conf.settings['download_directory'] - self.download_timeout = conf.settings['download_timeout'] - - return defer.succeed(True) - - def _write_db_revision_file(self, version_num): - with open(self.db_revision_file, mode='w') as db_revision: - db_revision.write(str(version_num)) - - def _setup_data_directory(self): - old_revision = 1 - self.startup_status = STARTUP_STAGES[1] - log.info("Loading databases") - if not os.path.exists(self.download_directory): - os.mkdir(self.download_directory) - if not os.path.exists(self.db_dir): - os.mkdir(self.db_dir) - self._write_db_revision_file(self.current_db_revision) - log.debug("Created the db revision file: %s", self.db_revision_file) - if not os.path.exists(self.blobfile_dir): - os.mkdir(self.blobfile_dir) - log.debug("Created the blobfile directory: %s", str(self.blobfile_dir)) - if not os.path.exists(self.db_revision_file): - log.warning("db_revision file not found. 
Creating it") - self._write_db_revision_file(self.current_db_revision) - - @defer.inlineCallbacks - def _check_db_migration(self): - old_revision = 1 - migrated = False - if os.path.exists(self.db_revision_file): - with open(self.db_revision_file, "r") as revision_read_handle: - old_revision = int(revision_read_handle.read().strip()) - - if old_revision > self.current_db_revision: - raise Exception('This version of lbrynet is not compatible with the database\n' - 'Your database is revision %i, expected %i' % - (old_revision, self.current_db_revision)) - if old_revision < self.current_db_revision: - from lbrynet.database.migrator import dbmigrator - log.info("Upgrading your databases (revision %i to %i)", old_revision, self.current_db_revision) - yield threads.deferToThread( - dbmigrator.migrate_db, self.db_dir, old_revision, self.current_db_revision - ) - self._write_db_revision_file(self.current_db_revision) - log.info("Finished upgrading the databases.") - migrated = True - defer.returnValue(migrated) - - @defer.inlineCallbacks - def _setup_lbry_file_manager(self): - log.info('Starting the file manager') - self.startup_status = STARTUP_STAGES[3] - self.lbry_file_manager = EncryptedFileManager(self.session, self.sd_identifier) - yield self.lbry_file_manager.setup() - log.info('Done setting up file manager') - def _start_analytics(self): if not self.analytics_manager.is_started: self.analytics_manager.start() - def _get_session(self): - def get_wallet(): - if self.wallet_type == LBRYCRD_WALLET: - raise ValueError('LBRYcrd Wallet is no longer supported') - elif self.wallet_type == LBRYUM_WALLET: - - log.info("Using lbryum wallet") - - lbryum_servers = {address: {'t': str(port)} - for address, port in conf.settings['lbryum_servers']} - - config = { - 'auto_connect': True, - 'chain': conf.settings['blockchain_name'], - 'default_servers': lbryum_servers - } - - if 'use_keyring' in conf.settings: - config['use_keyring'] = conf.settings['use_keyring'] - if conf.settings['lbryum_wallet_dir']: - config['lbryum_path'] = conf.settings['lbryum_wallet_dir'] - wallet = LBRYumWallet(self.storage, config) - return defer.succeed(wallet) - else: - raise ValueError('Wallet Type {} is not valid'.format(self.wallet_type)) - - d = get_wallet() - - def create_session(wallet): - self.session = Session( - conf.settings['data_rate'], - db_dir=self.db_dir, - node_id=self.node_id, - blob_dir=self.blobfile_dir, - dht_node_port=self.dht_node_port, - known_dht_nodes=conf.settings['known_dht_nodes'], - peer_port=self.peer_port, - use_upnp=self.use_upnp, - wallet=wallet, - is_generous=conf.settings['is_generous_host'], - external_ip=self.platform['ip'], - storage=self.storage - ) - self.startup_status = STARTUP_STAGES[2] - - d.addCallback(create_session) - d.addCallback(lambda _: self.session.setup()) - return d - @defer.inlineCallbacks def _check_wallet_locked(self): - wallet = self.session.wallet - if wallet.wallet.use_encryption: - self.startup_status = STARTUP_STAGES[7] - - yield wallet.check_locked() - - def _setup_stream_identifier(self): - file_saver_factory = EncryptedFileSaverFactory( - self.session.peer_finder, - self.session.rate_limiter, - self.session.blob_manager, - self.session.storage, - self.session.wallet, - self.download_directory - ) - self.sd_identifier.add_stream_downloader_factory(EncryptedFileStreamType, - file_saver_factory) - return defer.succeed(None) + wallet = self.wallet + # if wallet.wallet.use_encryption: STARTUP Stage was set earlier, figure out what to do now + yield wallet.check_locked 
def _download_blob(self, blob_hash, rate_manager=None, timeout=None): """ @@ -620,7 +317,7 @@ class Daemon(AuthJSONRPCServer): timeout = timeout or 30 downloader = StandaloneBlobDownloader( blob_hash, self.session.blob_manager, self.session.peer_finder, self.session.rate_limiter, - rate_manager, self.session.wallet, timeout + rate_manager, self.wallet, timeout ) return downloader.download() @@ -628,7 +325,7 @@ class Daemon(AuthJSONRPCServer): def _get_stream_analytics_report(self, claim_dict): sd_hash = claim_dict.source_hash try: - stream_hash = yield self.session.storage.get_stream_hash_for_sd_hash(sd_hash) + stream_hash = yield self.storage.get_stream_hash_for_sd_hash(sd_hash) except Exception: stream_hash = None report = { @@ -642,7 +339,7 @@ class Daemon(AuthJSONRPCServer): sd_host = None report["sd_blob"] = sd_host if stream_hash: - blob_infos = yield self.session.storage.get_blobs_for_stream(stream_hash) + blob_infos = yield self.storage.get_blobs_for_stream(stream_hash) report["known_blobs"] = len(blob_infos) else: blob_infos = [] @@ -713,11 +410,12 @@ class Daemon(AuthJSONRPCServer): def _publish_stream(self, name, bid, claim_dict, file_path=None, certificate_id=None, claim_address=None, change_address=None): - publisher = Publisher(self.session, self.lbry_file_manager, self.session.wallet, + publisher = Publisher(self.session, self.file_manager, self.wallet, certificate_id) parse_lbry_uri(name) if not file_path: - stream_hash = yield self.storage.get_stream_hash_for_sd_hash(claim_dict['stream']['source']['source']) + stream_hash = yield self.storage.get_stream_hash_for_sd_hash( + claim_dict['stream']['source']['source']) claim_out = yield publisher.publish_stream(name, bid, claim_dict, stream_hash, claim_address, change_address) else: @@ -742,7 +440,7 @@ class Daemon(AuthJSONRPCServer): """ parsed = parse_lbry_uri(name) - resolution = yield self.session.wallet.resolve(parsed.name, check_cache=not force_refresh) + resolution = yield self.wallet.resolve(parsed.name, check_cache=not force_refresh) if parsed.name in resolution: result = resolution[parsed.name] defer.returnValue(result) @@ -797,7 +495,7 @@ class Daemon(AuthJSONRPCServer): cost = self._get_est_cost_from_stream_size(size) - resolved = yield self.session.wallet.resolve(uri) + resolved = yield self.wallet.resolve(uri) if uri in resolved and 'claim' in resolved[uri]: claim = ClaimDict.load_dict(resolved[uri]['claim']['value']) @@ -844,7 +542,7 @@ class Daemon(AuthJSONRPCServer): Resolve a name and return the estimated stream cost """ - resolved = yield self.session.wallet.resolve(uri) + resolved = yield self.wallet.resolve(uri) if resolved: claim_response = resolved[uri] else: @@ -924,7 +622,7 @@ class Daemon(AuthJSONRPCServer): def _get_lbry_file(self, search_by, val, return_json=False, full_status=False): lbry_file = None if search_by in FileID: - for l_f in self.lbry_file_manager.lbry_files: + for l_f in self.file_manager.lbry_files: if l_f.__dict__.get(search_by) == val: lbry_file = l_f break @@ -936,7 +634,7 @@ class Daemon(AuthJSONRPCServer): @defer.inlineCallbacks def _get_lbry_files(self, return_json=False, full_status=True, **kwargs): - lbry_files = list(self.lbry_file_manager.lbry_files) + lbry_files = list(self.file_manager.lbry_files) if kwargs: for search_type, value in iter_lbry_file_search_values(kwargs): lbry_files = [l_f for l_f in lbry_files if l_f.__dict__[search_type] == value] @@ -973,7 +671,7 @@ class Daemon(AuthJSONRPCServer): def _get_single_peer_downloader(self): downloader = SinglePeerDownloader() 
- downloader.setup(self.session.wallet) + downloader.setup(self.wallet) return downloader @defer.inlineCallbacks @@ -1042,8 +740,7 @@ class Daemon(AuthJSONRPCServer): 'is_running': bool, 'is_first_run': bool, 'startup_status': { - 'code': status code, - 'message': status message + (str) component_name: (bool) True if running else False, }, 'connection_status': { 'code': connection status code, @@ -1067,22 +764,19 @@ class Daemon(AuthJSONRPCServer): """ # on startup, the wallet or network won't be available but we still need this call to work - has_wallet = self.session and self.session.wallet and self.session.wallet.network - local_height = self.session.wallet.network.get_local_height() if has_wallet else 0 - remote_height = self.session.wallet.network.get_server_height() if has_wallet else 0 - best_hash = (yield self.session.wallet.get_best_blockhash()) if has_wallet else None - wallet_is_encrypted = has_wallet and self.session.wallet.wallet and \ - self.session.wallet.wallet.use_encryption + has_wallet = self.session and self.wallet and self.wallet.network + local_height = self.wallet.network.get_local_height() if has_wallet else 0 + remote_height = self.wallet.network.get_server_height() if has_wallet else 0 + best_hash = (yield self.wallet.get_best_blockhash()) if has_wallet else None + wallet_is_encrypted = has_wallet and self.wallet.wallet and \ + self.wallet.wallet.use_encryption response = { 'lbry_id': base58.b58encode(self.node_id), 'installation_id': conf.settings.installation_id, 'is_running': self.announced_startup, - 'is_first_run': self.session.wallet.is_first_run if has_wallet else None, - 'startup_status': { - 'code': self.startup_status[0], - 'message': self.startup_status[1], - }, + 'is_first_run': self.wallet.is_first_run if has_wallet else None, + 'startup_status': self.component_manager.get_components_status(), 'connection_status': { 'code': self.connection_status_code, 'message': ( @@ -1105,7 +799,7 @@ class Daemon(AuthJSONRPCServer): should_announce_blobs = yield self.session.blob_manager.count_should_announce_blobs() response['session_status'] = { 'managed_blobs': len(blobs), - 'managed_streams': len(self.lbry_file_manager.lbry_files), + 'managed_streams': len(self.file_manager.lbry_files), 'announce_queue_size': announce_queue_size, 'should_announce_blobs': should_announce_blobs, } @@ -1181,7 +875,6 @@ class Daemon(AuthJSONRPCServer): """ return self._render_response(conf.settings.get_adjustable_settings_dict()) - @defer.inlineCallbacks def jsonrpc_settings_set(self, **kwargs): """ Set daemon settings @@ -1233,8 +926,48 @@ class Daemon(AuthJSONRPCServer): (dict) Updated dictionary of daemon settings """ - yield self._update_settings(kwargs) - defer.returnValue(conf.settings.get_adjustable_settings_dict()) + # TODO: improve upon the current logic, it could be made better + new_settings = kwargs + + setting_types = { + 'download_directory': str, + 'data_rate': float, + 'download_timeout': int, + 'peer_port': int, + 'max_key_fee': dict, + 'use_upnp': bool, + 'run_reflector_server': bool, + 'cache_time': int, + 'reflect_uploads': bool, + 'share_usage_data': bool, + 'disable_max_key_fee': bool, + 'peer_search_timeout': int, + 'sd_download_timeout': int, + 'auto_renew_claim_height_delta': int + } + + for key, setting_type in setting_types.iteritems(): + if key in new_settings: + if isinstance(new_settings[key], setting_type): + conf.settings.update({key: new_settings[key]}, + data_types=(conf.TYPE_RUNTIME, conf.TYPE_PERSISTED)) + elif setting_type is dict and 
isinstance(new_settings[key], (unicode, str)): + decoded = json.loads(str(new_settings[key])) + conf.settings.update({key: decoded}, + data_types=(conf.TYPE_RUNTIME, conf.TYPE_PERSISTED)) + else: + converted = setting_type(new_settings[key]) + conf.settings.update({key: converted}, + data_types=(conf.TYPE_RUNTIME, conf.TYPE_PERSISTED)) + conf.settings.save_conf_file_settings() + + self.data_rate = conf.settings['data_rate'] + self.max_key_fee = conf.settings['max_key_fee'] + self.disable_max_key_fee = conf.settings['disable_max_key_fee'] + self.download_directory = conf.settings['download_directory'] + self.download_timeout = conf.settings['download_timeout'] + + return self._render_response(conf.settings.get_adjustable_settings_dict()) def jsonrpc_help(self, command=None): """ @@ -1284,6 +1017,7 @@ class Daemon(AuthJSONRPCServer): """ return self._render_response(sorted([command for command in self.callable_methods.keys()])) + @AuthJSONRPCServer.requires("wallet") def jsonrpc_wallet_balance(self, address=None, include_unconfirmed=False): """ Return the balance of the wallet @@ -1300,11 +1034,12 @@ class Daemon(AuthJSONRPCServer): (float) amount of lbry credits in wallet """ if address is None: - return self._render_response(float(self.session.wallet.get_balance())) + return self._render_response(float(self.wallet.get_balance())) else: return self._render_response(float( - self.session.wallet.get_address_balance(address, include_unconfirmed))) + self.wallet.get_address_balance(address, include_unconfirmed))) + @AuthJSONRPCServer.requires("wallet") @defer.inlineCallbacks def jsonrpc_wallet_unlock(self, password): """ @@ -1320,9 +1055,10 @@ class Daemon(AuthJSONRPCServer): (bool) true if wallet is unlocked, otherwise false """ - cmd_runner = self.session.wallet.get_cmd_runner() - if cmd_runner.locked: - d = self.session.wallet.wallet_unlocked_d + # the check_locked() in the if statement is needed because that is what sets + # the wallet_unlocked_d deferred ¯\_(ツ)_/¯ + if not self.wallet.check_locked: + d = self.wallet.wallet_unlocked_d d.callback(password) result = yield d else: @@ -1330,6 +1066,7 @@ class Daemon(AuthJSONRPCServer): response = yield self._render_response(result) defer.returnValue(response) + @AuthJSONRPCServer.requires("wallet", wallet=lambda wallet: wallet.check_locked) @defer.inlineCallbacks def jsonrpc_wallet_decrypt(self): """ @@ -1345,10 +1082,11 @@ class Daemon(AuthJSONRPCServer): (bool) true if wallet is decrypted, otherwise false """ - result = self.session.wallet.decrypt_wallet() + result = self.wallet.decrypt_wallet() response = yield self._render_response(result) defer.returnValue(response) + @AuthJSONRPCServer.requires("wallet", wallet=lambda wallet: wallet.check_locked) @defer.inlineCallbacks def jsonrpc_wallet_encrypt(self, new_password): """ @@ -1365,8 +1103,8 @@ class Daemon(AuthJSONRPCServer): (bool) true if wallet is decrypted, otherwise false """ - self.session.wallet.encrypt_wallet(new_password) - response = yield self._render_response(self.session.wallet.wallet.use_encryption) + self.wallet.encrypt_wallet(new_password) + response = yield self._render_response(self.wallet.wallet.use_encryption) defer.returnValue(response) @defer.inlineCallbacks @@ -1389,6 +1127,7 @@ class Daemon(AuthJSONRPCServer): reactor.callLater(0.1, reactor.fireSystemEvent, "shutdown") defer.returnValue(response) + @AuthJSONRPCServer.requires("fileManager") @defer.inlineCallbacks def jsonrpc_file_list(self, sort=None, **kwargs): """ @@ -1460,6 +1199,7 @@ class 
Daemon(AuthJSONRPCServer): response = yield self._render_response(result) defer.returnValue(response) + @AuthJSONRPCServer.requires("wallet") @defer.inlineCallbacks def jsonrpc_resolve_name(self, name, force=False): """ @@ -1485,6 +1225,7 @@ class Daemon(AuthJSONRPCServer): else: defer.returnValue(metadata) + @AuthJSONRPCServer.requires("wallet") @defer.inlineCallbacks def jsonrpc_claim_show(self, txid=None, nout=None, claim_id=None): """ @@ -1522,14 +1263,15 @@ class Daemon(AuthJSONRPCServer): """ if claim_id is not None and txid is None and nout is None: - claim_results = yield self.session.wallet.get_claim_by_claim_id(claim_id) + claim_results = yield self.wallet.get_claim_by_claim_id(claim_id) elif txid is not None and nout is not None and claim_id is None: - claim_results = yield self.session.wallet.get_claim_by_outpoint(txid, int(nout)) + claim_results = yield self.wallet.get_claim_by_outpoint(txid, int(nout)) else: raise Exception("Must specify either txid/nout, or claim_id") response = yield self._render_response(claim_results) defer.returnValue(response) + @AuthJSONRPCServer.requires("wallet") @defer.inlineCallbacks def jsonrpc_resolve(self, force=False, uri=None, uris=[]): """ @@ -1613,13 +1355,14 @@ class Daemon(AuthJSONRPCServer): except URIParseError: results[u] = {"error": "%s is not a valid uri" % u} - resolved = yield self.session.wallet.resolve(*valid_uris, check_cache=not force) + resolved = yield self.wallet.resolve(*valid_uris, check_cache=not force) for resolved_uri in resolved: results[resolved_uri] = resolved[resolved_uri] response = yield self._render_response(results) defer.returnValue(response) + @AuthJSONRPCServer.requires("wallet", "fileManager", "session", wallet=lambda wallet: wallet.check_locked) @defer.inlineCallbacks def jsonrpc_get(self, uri, file_name=None, timeout=None): """ @@ -1671,7 +1414,7 @@ class Daemon(AuthJSONRPCServer): if parsed_uri.is_channel and not parsed_uri.path: raise Exception("cannot download a channel claim, specify a /path") - resolved_result = yield self.session.wallet.resolve(uri) + resolved_result = yield self.wallet.resolve(uri) if resolved_result and uri in resolved_result: resolved = resolved_result[uri] else: @@ -1708,6 +1451,7 @@ class Daemon(AuthJSONRPCServer): response = yield self._render_response(result) defer.returnValue(response) + @AuthJSONRPCServer.requires("fileManager") @defer.inlineCallbacks def jsonrpc_file_set_status(self, status, **kwargs): """ @@ -1738,7 +1482,7 @@ class Daemon(AuthJSONRPCServer): raise Exception('Unable to find a file for {}:{}'.format(search_type, value)) if status == 'start' and lbry_file.stopped or status == 'stop' and not lbry_file.stopped: - yield self.lbry_file_manager.toggle_lbry_file_running(lbry_file) + yield self.file_manager.toggle_lbry_file_running(lbry_file) msg = "Started downloading file" if status == 'start' else "Stopped downloading file" else: msg = ( @@ -1748,6 +1492,7 @@ class Daemon(AuthJSONRPCServer): response = yield self._render_response(msg) defer.returnValue(response) + @AuthJSONRPCServer.requires("fileManager") @defer.inlineCallbacks def jsonrpc_file_delete(self, delete_from_download_dir=False, delete_all=False, **kwargs): """ @@ -1800,14 +1545,15 @@ class Daemon(AuthJSONRPCServer): file_name, stream_hash = lbry_file.file_name, lbry_file.stream_hash if lbry_file.sd_hash in self.streams: del self.streams[lbry_file.sd_hash] - yield self.lbry_file_manager.delete_lbry_file(lbry_file, - delete_file=delete_from_download_dir) + yield 
self.file_manager.delete_lbry_file(lbry_file, + delete_file=delete_from_download_dir) log.info("Deleted file: %s", file_name) result = True response = yield self._render_response(result) defer.returnValue(response) + @AuthJSONRPCServer.requires("wallet", "session", wallet=lambda wallet: wallet.check_locked) @defer.inlineCallbacks def jsonrpc_stream_cost_estimate(self, uri, size=None): """ @@ -1828,6 +1574,7 @@ class Daemon(AuthJSONRPCServer): cost = yield self.get_est_cost(uri, size) defer.returnValue(cost) + @AuthJSONRPCServer.requires("wallet", wallet=lambda wallet: wallet.check_locked) @defer.inlineCallbacks def jsonrpc_channel_new(self, channel_name, amount): """ @@ -1863,25 +1610,26 @@ class Daemon(AuthJSONRPCServer): if amount <= 0: raise Exception("Invalid amount") - yield self.session.wallet.update_balance() - if amount >= self.session.wallet.get_balance(): - balance = yield self.session.wallet.get_max_usable_balance_for_claim(channel_name) + yield self.wallet.update_balance() + if amount >= self.wallet.get_balance(): + balance = yield self.wallet.get_max_usable_balance_for_claim(channel_name) max_bid_amount = balance - MAX_UPDATE_FEE_ESTIMATE if balance <= MAX_UPDATE_FEE_ESTIMATE: raise InsufficientFundsError( "Insufficient funds, please deposit additional LBC. Minimum additional LBC needed {}" - . format(MAX_UPDATE_FEE_ESTIMATE - balance)) + .format(MAX_UPDATE_FEE_ESTIMATE - balance)) elif amount > max_bid_amount: raise InsufficientFundsError( "Please lower the bid value, the maximum amount you can specify for this channel is {}" - .format(max_bid_amount)) + .format(max_bid_amount)) - result = yield self.session.wallet.claim_new_channel(channel_name, amount) + result = yield self.wallet.claim_new_channel(channel_name, amount) self.analytics_manager.send_new_channel() log.info("Claimed a new channel! Result: %s", result) response = yield self._render_response(result) defer.returnValue(response) + @AuthJSONRPCServer.requires("wallet") @defer.inlineCallbacks def jsonrpc_channel_list(self): """ @@ -1898,10 +1646,11 @@ class Daemon(AuthJSONRPCServer): is in the wallet. 
""" - result = yield self.session.wallet.channel_list() + result = yield self.wallet.channel_list() response = yield self._render_response(result) defer.returnValue(response) + @AuthJSONRPCServer.requires("wallet") @AuthJSONRPCServer.deprecated("channel_list") def jsonrpc_channel_list_mine(self): """ @@ -1919,6 +1668,7 @@ class Daemon(AuthJSONRPCServer): return self.jsonrpc_channel_list() + @AuthJSONRPCServer.requires("wallet") @defer.inlineCallbacks def jsonrpc_channel_export(self, claim_id): """ @@ -1934,9 +1684,10 @@ class Daemon(AuthJSONRPCServer): (str) Serialized certificate information """ - result = yield self.session.wallet.export_certificate_info(claim_id) + result = yield self.wallet.export_certificate_info(claim_id) defer.returnValue(result) + @AuthJSONRPCServer.requires("wallet") @defer.inlineCallbacks def jsonrpc_channel_import(self, serialized_certificate_info): """ @@ -1952,9 +1703,10 @@ class Daemon(AuthJSONRPCServer): (dict) Result dictionary """ - result = yield self.session.wallet.import_certificate_info(serialized_certificate_info) + result = yield self.wallet.import_certificate_info(serialized_certificate_info) defer.returnValue(result) + @AuthJSONRPCServer.requires("wallet", "fileManager", "session", wallet=lambda wallet: wallet.check_locked) @defer.inlineCallbacks def jsonrpc_publish(self, name, bid, metadata=None, file_path=None, fee=None, title=None, description=None, author=None, language=None, license=None, @@ -2046,9 +1798,9 @@ class Daemon(AuthJSONRPCServer): if bid <= 0.0: raise ValueError("Bid value must be greater than 0.0") - yield self.session.wallet.update_balance() - if bid >= self.session.wallet.get_balance(): - balance = yield self.session.wallet.get_max_usable_balance_for_claim(name) + yield self.wallet.update_balance() + if bid >= self.wallet.get_balance(): + balance = yield self.wallet.get_max_usable_balance_for_claim(name) max_bid_amount = balance - MAX_UPDATE_FEE_ESTIMATE if balance <= MAX_UPDATE_FEE_ESTIMATE: raise InsufficientFundsError( @@ -2095,7 +1847,7 @@ class Daemon(AuthJSONRPCServer): log.warning("Stripping empty fee from published metadata") del metadata['fee'] elif 'address' not in metadata['fee']: - address = yield self.session.wallet.get_least_used_address() + address = yield self.wallet.get_least_used_address() metadata['fee']['address'] = address if 'fee' in metadata and 'version' not in metadata['fee']: metadata['fee']['version'] = '_0_0_1' @@ -2151,7 +1903,7 @@ class Daemon(AuthJSONRPCServer): certificate_id = channel_id elif channel_name: certificate_id = None - my_certificates = yield self.session.wallet.channel_list() + my_certificates = yield self.wallet.channel_list() for certificate in my_certificates: if channel_name == certificate['name']: certificate_id = certificate['claim_id'] @@ -2166,6 +1918,7 @@ class Daemon(AuthJSONRPCServer): response = yield self._render_response(result) defer.returnValue(response) + @AuthJSONRPCServer.requires("wallet", wallet=lambda wallet: wallet.check_locked) @defer.inlineCallbacks def jsonrpc_claim_abandon(self, claim_id=None, txid=None, nout=None): """ @@ -2194,10 +1947,11 @@ class Daemon(AuthJSONRPCServer): if nout is None and txid is not None: raise Exception('Must specify nout') - result = yield self.session.wallet.abandon_claim(claim_id, txid, nout) + result = yield self.wallet.abandon_claim(claim_id, txid, nout) self.analytics_manager.send_claim_action('abandon') defer.returnValue(result) + @AuthJSONRPCServer.requires("wallet", wallet=lambda wallet: wallet.check_locked) 
@defer.inlineCallbacks def jsonrpc_claim_new_support(self, name, claim_id, amount): """ @@ -2221,10 +1975,11 @@ class Daemon(AuthJSONRPCServer): } """ - result = yield self.session.wallet.support_claim(name, claim_id, amount) + result = yield self.wallet.support_claim(name, claim_id, amount) self.analytics_manager.send_claim_action('new_support') defer.returnValue(result) + @AuthJSONRPCServer.requires("wallet", wallet=lambda wallet: wallet.check_locked) @defer.inlineCallbacks def jsonrpc_claim_renew(self, outpoint=None, height=None): """ @@ -2260,13 +2015,14 @@ class Daemon(AuthJSONRPCServer): nout = int(nout) else: raise Exception("invalid outpoint") - result = yield self.session.wallet.claim_renew(txid, nout) + result = yield self.wallet.claim_renew(txid, nout) result = {outpoint: result} else: height = int(height) - result = yield self.session.wallet.claim_renew_all_before_expiration(height) + result = yield self.wallet.claim_renew_all_before_expiration(height) defer.returnValue(result) + @AuthJSONRPCServer.requires("wallet", wallet=lambda wallet: wallet.check_locked) @defer.inlineCallbacks def jsonrpc_claim_send_to_address(self, claim_id, address, amount=None): """ @@ -2294,11 +2050,12 @@ class Daemon(AuthJSONRPCServer): } """ - result = yield self.session.wallet.send_claim_to_address(claim_id, address, amount) + result = yield self.wallet.send_claim_to_address(claim_id, address, amount) response = yield self._render_response(result) defer.returnValue(response) # TODO: claim_list_mine should be merged into claim_list, but idk how to authenticate it -Grin + @AuthJSONRPCServer.requires("wallet") def jsonrpc_claim_list_mine(self): """ List my name claims @@ -2332,10 +2089,11 @@ class Daemon(AuthJSONRPCServer): ] """ - d = self.session.wallet.get_name_claims() + d = self.wallet.get_name_claims() d.addCallback(lambda claims: self._render_response(claims)) return d + @AuthJSONRPCServer.requires("wallet") @defer.inlineCallbacks def jsonrpc_claim_list(self, name): """ @@ -2370,10 +2128,11 @@ class Daemon(AuthJSONRPCServer): } """ - claims = yield self.session.wallet.get_claims_for_name(name) # type: dict + claims = yield self.wallet.get_claims_for_name(name) # type: dict sort_claim_results(claims['claims']) defer.returnValue(claims) + @AuthJSONRPCServer.requires("wallet") @defer.inlineCallbacks def jsonrpc_claim_list_by_channel(self, page=0, page_size=10, uri=None, uris=[]): """ @@ -2447,8 +2206,8 @@ class Daemon(AuthJSONRPCServer): except URIParseError: results[chan_uri] = {"error": "%s is not a valid uri" % chan_uri} - resolved = yield self.session.wallet.resolve(*valid_uris, check_cache=False, page=page, - page_size=page_size) + resolved = yield self.wallet.resolve(*valid_uris, check_cache=False, page=page, + page_size=page_size) for u in resolved: if 'error' in resolved[u]: results[u] = resolved[u] @@ -2463,6 +2222,7 @@ class Daemon(AuthJSONRPCServer): response = yield self._render_response(results) defer.returnValue(response) + @AuthJSONRPCServer.requires("wallet") def jsonrpc_transaction_list(self): """ List transactions belonging to wallet @@ -2520,10 +2280,11 @@ class Daemon(AuthJSONRPCServer): """ - d = self.session.wallet.get_history() + d = self.wallet.get_history() d.addCallback(lambda r: self._render_response(r)) return d + @AuthJSONRPCServer.requires("wallet") def jsonrpc_transaction_show(self, txid): """ Get a decoded transaction from a txid @@ -2538,10 +2299,11 @@ class Daemon(AuthJSONRPCServer): (dict) JSON formatted transaction """ - d = 
self.session.wallet.get_transaction(txid) + d = self.wallet.get_transaction(txid) d.addCallback(lambda r: self._render_response(r)) return d + @AuthJSONRPCServer.requires("wallet") def jsonrpc_wallet_is_address_mine(self, address): """ Checks if an address is associated with the current wallet. @@ -2556,10 +2318,11 @@ class Daemon(AuthJSONRPCServer): (bool) true, if address is associated with current wallet """ - d = self.session.wallet.address_is_mine(address) + d = self.wallet.address_is_mine(address) d.addCallback(lambda is_mine: self._render_response(is_mine)) return d + @AuthJSONRPCServer.requires("wallet") def jsonrpc_wallet_public_key(self, address): """ Get public key from wallet address @@ -2575,10 +2338,11 @@ class Daemon(AuthJSONRPCServer): Could contain more than one public key if multisig. """ - d = self.session.wallet.get_pub_keys(address) + d = self.wallet.get_pub_keys(address) d.addCallback(lambda r: self._render_response(r)) return d + @AuthJSONRPCServer.requires("wallet") @defer.inlineCallbacks def jsonrpc_wallet_list(self): """ @@ -2594,10 +2358,11 @@ class Daemon(AuthJSONRPCServer): List of wallet addresses """ - addresses = yield self.session.wallet.list_addresses() + addresses = yield self.wallet.list_addresses() response = yield self._render_response(addresses) defer.returnValue(response) + @AuthJSONRPCServer.requires("wallet") def jsonrpc_wallet_new_address(self): """ Generate a new wallet address @@ -2616,11 +2381,12 @@ class Daemon(AuthJSONRPCServer): log.info("Got new wallet address: " + address) return defer.succeed(address) - d = self.session.wallet.get_new_address() + d = self.wallet.get_new_address() d.addCallback(_disp) d.addCallback(lambda address: self._render_response(address)) return d + @AuthJSONRPCServer.requires("wallet") def jsonrpc_wallet_unused_address(self): """ Return an address containing no balance, will create @@ -2640,11 +2406,12 @@ class Daemon(AuthJSONRPCServer): log.info("Got unused wallet address: " + address) return defer.succeed(address) - d = self.session.wallet.get_unused_address() + d = self.wallet.get_unused_address() d.addCallback(_disp) d.addCallback(lambda address: self._render_response(address)) return d + @AuthJSONRPCServer.requires("wallet", wallet=lambda wallet: wallet.check_locked) @AuthJSONRPCServer.deprecated("wallet_send") @defer.inlineCallbacks def jsonrpc_send_amount_to_address(self, amount, address): @@ -2667,13 +2434,14 @@ class Daemon(AuthJSONRPCServer): elif not amount: raise NullFundsError() - reserved_points = self.session.wallet.reserve_points(address, amount) + reserved_points = self.wallet.reserve_points(address, amount) if reserved_points is None: raise InsufficientFundsError() - yield self.session.wallet.send_points_to_address(reserved_points, amount) + yield self.wallet.send_points_to_address(reserved_points, amount) self.analytics_manager.send_credits_sent() defer.returnValue(True) + @AuthJSONRPCServer.requires("wallet", wallet=lambda wallet: wallet.check_locked) @defer.inlineCallbacks def jsonrpc_wallet_send(self, amount, address=None, claim_id=None): """ @@ -2718,10 +2486,11 @@ class Daemon(AuthJSONRPCServer): result = yield self.jsonrpc_send_amount_to_address(amount, address) else: validate_claim_id(claim_id) - result = yield self.session.wallet.tip_claim(claim_id, amount) + result = yield self.wallet.tip_claim(claim_id, amount) self.analytics_manager.send_claim_action('new_support') defer.returnValue(result) + @AuthJSONRPCServer.requires("wallet", wallet=lambda wallet: wallet.check_locked) 
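# NOTE: from a client's point of view, the effect of these decorators is that
# wallet commands now fail fast while components are still starting, instead
# of hitting the old NotAllowedDuringStartupError gate. A rough sketch using
# the existing client helper (the error handling shown is illustrative; the
# exact JSON-RPC error payload is defined by the server):

from lbrynet.daemon.auth.client import LBRYAPIClient

api = LBRYAPIClient.get_client()
try:
    print(api.wallet_balance())
except Exception as err:
    # until the wallet component is running, the requires() wrapper surfaces
    # ComponentsNotStarted through the JSON-RPC error response
    print("wallet not ready yet: {}".format(err))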
@defer.inlineCallbacks def jsonrpc_wallet_prefill_addresses(self, num_addresses, amount, no_broadcast=False): """ @@ -2747,11 +2516,12 @@ class Daemon(AuthJSONRPCServer): raise NullFundsError() broadcast = not no_broadcast - tx = yield self.session.wallet.create_addresses_with_balance( + tx = yield self.wallet.create_addresses_with_balance( num_addresses, amount, broadcast=broadcast) tx['broadcast'] = broadcast defer.returnValue(tx) + @AuthJSONRPCServer.requires("wallet") @defer.inlineCallbacks def jsonrpc_utxo_list(self): """ @@ -2781,7 +2551,7 @@ class Daemon(AuthJSONRPCServer): ] """ - unspent = yield self.session.wallet.list_unspent() + unspent = yield self.wallet.list_unspent() for i, utxo in enumerate(unspent): utxo['txid'] = utxo.pop('prevout_hash') utxo['nout'] = utxo.pop('prevout_n') @@ -2791,6 +2561,7 @@ class Daemon(AuthJSONRPCServer): defer.returnValue(unspent) + @AuthJSONRPCServer.requires("wallet") def jsonrpc_block_show(self, blockhash=None, height=None): """ Get contents of a block @@ -2807,10 +2578,10 @@ class Daemon(AuthJSONRPCServer): """ if blockhash is not None: - d = self.session.wallet.get_block(blockhash) + d = self.wallet.get_block(blockhash) elif height is not None: - d = self.session.wallet.get_block_info(height) - d.addCallback(lambda b: self.session.wallet.get_block(b)) + d = self.wallet.get_block_info(height) + d.addCallback(lambda b: self.wallet.get_block(b)) else: # TODO: return a useful error message return server.failure @@ -2818,6 +2589,7 @@ class Daemon(AuthJSONRPCServer): d.addCallback(lambda r: self._render_response(r)) return d + @AuthJSONRPCServer.requires("wallet", "session", wallet=lambda wallet: wallet.check_locked) @defer.inlineCallbacks def jsonrpc_blob_get(self, blob_hash, timeout=None, encoding=None, payment_rate_manager=None): """ @@ -2861,6 +2633,7 @@ class Daemon(AuthJSONRPCServer): response = yield self._render_response(result) defer.returnValue(response) + @AuthJSONRPCServer.requires("session") @defer.inlineCallbacks def jsonrpc_blob_delete(self, blob_hash): """ @@ -2880,14 +2653,15 @@ class Daemon(AuthJSONRPCServer): response = yield self._render_response("Don't have that blob") defer.returnValue(response) try: - stream_hash = yield self.session.storage.get_stream_hash_for_sd_hash(blob_hash) - yield self.session.storage.delete_stream(stream_hash) + stream_hash = yield self.storage.get_stream_hash_for_sd_hash(blob_hash) + yield self.storage.delete_stream(stream_hash) except Exception as err: pass yield self.session.blob_manager.delete_blobs([blob_hash]) response = yield self._render_response("Deleted %s" % blob_hash) defer.returnValue(response) + @AuthJSONRPCServer.requires("dht") @defer.inlineCallbacks def jsonrpc_peer_list(self, blob_hash, timeout=None): """ @@ -2907,7 +2681,7 @@ class Daemon(AuthJSONRPCServer): if not utils.is_valid_blobhash(blob_hash): raise Exception("invalid blob hash") - finished_deferred = self.session.dht_node.iterativeFindValue(binascii.unhexlify(blob_hash)) + finished_deferred = self.dht_node.iterativeFindValue(binascii.unhexlify(blob_hash)) def trap_timeout(err): err.trap(defer.TimeoutError) @@ -2926,6 +2700,7 @@ class Daemon(AuthJSONRPCServer): ] defer.returnValue(results) + @AuthJSONRPCServer.requires("database") @defer.inlineCallbacks def jsonrpc_blob_announce(self, blob_hash=None, stream_hash=None, sd_hash=None): """ @@ -2962,6 +2737,7 @@ class Daemon(AuthJSONRPCServer): response = yield self._render_response(True) defer.returnValue(response) + @AuthJSONRPCServer.requires("fileManager") 
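# NOTE: requires("fileManager") above leans on
# component_manager.all_components_running(), which this diff calls but never
# shows. A plausible sketch of its contract, assuming the manager keeps a
# registry of components keyed by component_name, as the unit tests further
# down suggest (the shipped ComponentManager.py may differ):

def all_components_running(self, *component_names):
    for name in component_names:
        component = self._components.get(name)  # hypothetical registry attribute
        if component is None or not component.running:
            return False
    return True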
@defer.inlineCallbacks def jsonrpc_file_reflect(self, **kwargs): """ @@ -2997,6 +2773,7 @@ class Daemon(AuthJSONRPCServer): results = yield reupload.reflect_file(lbry_file, reflector_server=reflector_server) defer.returnValue(results) + @AuthJSONRPCServer.requires("database", "session", "wallet") @defer.inlineCallbacks def jsonrpc_blob_list(self, uri=None, stream_hash=None, sd_hash=None, needed=None, finished=None, page_size=None, page=None): @@ -3026,14 +2803,14 @@ class Daemon(AuthJSONRPCServer): if uri: metadata = yield self._resolve_name(uri) sd_hash = utils.get_sd_hash(metadata) - stream_hash = yield self.session.storage.get_stream_hash_for_sd_hash(sd_hash) + stream_hash = yield self.storage.get_stream_hash_for_sd_hash(sd_hash) elif stream_hash: - sd_hash = yield self.session.storage.get_sd_blob_hash_for_stream(stream_hash) + sd_hash = yield self.storage.get_sd_blob_hash_for_stream(stream_hash) elif sd_hash: - stream_hash = yield self.session.storage.get_stream_hash_for_sd_hash(sd_hash) - sd_hash = yield self.session.storage.get_sd_blob_hash_for_stream(stream_hash) + stream_hash = yield self.storage.get_stream_hash_for_sd_hash(sd_hash) + sd_hash = yield self.storage.get_sd_blob_hash_for_stream(stream_hash) if stream_hash: - crypt_blobs = yield self.session.storage.get_blobs_for_stream(stream_hash) + crypt_blobs = yield self.storage.get_blobs_for_stream(stream_hash) blobs = yield defer.gatherResults([ self.session.blob_manager.get_blob(crypt_blob.blob_hash, crypt_blob.length) for crypt_blob in crypt_blobs if crypt_blob.blob_hash is not None @@ -3060,6 +2837,7 @@ class Daemon(AuthJSONRPCServer): response = yield self._render_response(blob_hashes_for_return) defer.returnValue(response) + @AuthJSONRPCServer.requires("session") def jsonrpc_blob_reflect(self, blob_hashes, reflector_server=None): """ Reflects specified blobs @@ -3078,6 +2856,7 @@ class Daemon(AuthJSONRPCServer): d.addCallback(lambda r: self._render_response(r)) return d + @AuthJSONRPCServer.requires("session") def jsonrpc_blob_reflect_all(self): """ Reflects all saved blobs @@ -3097,6 +2876,7 @@ class Daemon(AuthJSONRPCServer): d.addCallback(lambda r: self._render_response(r)) return d + @AuthJSONRPCServer.requires("dht") @defer.inlineCallbacks def jsonrpc_peer_ping(self, node_id): """ @@ -3114,7 +2894,7 @@ class Daemon(AuthJSONRPCServer): contact = None try: - contact = yield self.session.dht_node.findContact(node_id.decode('hex')) + contact = yield self.dht_node.findContact(node_id.decode('hex')) except TimeoutError: result = {'error': 'timeout finding peer'} defer.returnValue(result) @@ -3126,6 +2906,7 @@ class Daemon(AuthJSONRPCServer): result = {'error': 'ping timeout'} defer.returnValue(result) + @AuthJSONRPCServer.requires("dht") def jsonrpc_routing_table_get(self): """ Get DHT routing information @@ -3156,7 +2937,7 @@ class Daemon(AuthJSONRPCServer): """ result = {} - data_store = self.session.dht_node._dataStore._dict + data_store = self.dht_node._dataStore._dict datastore_len = len(data_store) hosts = {} @@ -3174,8 +2955,8 @@ class Daemon(AuthJSONRPCServer): blob_hashes = [] result['buckets'] = {} - for i in range(len(self.session.dht_node._routingTable._buckets)): - for contact in self.session.dht_node._routingTable._buckets[i]._contacts: + for i in range(len(self.dht_node._routingTable._buckets)): + for contact in self.dht_node._routingTable._buckets[i]._contacts: contacts = result['buckets'].get(i, []) if contact in hosts: blobs = hosts[contact] @@ -3198,9 +2979,11 @@ class Daemon(AuthJSONRPCServer): 
result['contacts'] = contact_set result['blob_hashes'] = blob_hashes - result['node_id'] = self.session.dht_node.node_id.encode('hex') + result['node_id'] = self.dht_node.node_id.encode('hex') return self._render_response(result) + # the single peer downloader needs wallet access + @AuthJSONRPCServer.requires("dht", "wallet", wallet=lambda wallet: wallet.check_locked) def jsonrpc_blob_availability(self, blob_hash, search_timeout=None, blob_timeout=None): """ Get blob availability @@ -3225,6 +3008,7 @@ class Daemon(AuthJSONRPCServer): return self._blob_availability(blob_hash, search_timeout, blob_timeout) + @AuthJSONRPCServer.requires("session", "wallet", "dht", wallet=lambda wallet: wallet.check_locked) @AuthJSONRPCServer.deprecated("stream_availability") def jsonrpc_get_availability(self, uri, sd_timeout=None, peer_timeout=None): """ @@ -3245,6 +3029,7 @@ class Daemon(AuthJSONRPCServer): return self.jsonrpc_stream_availability(uri, peer_timeout, sd_timeout) + @AuthJSONRPCServer.requires("session", "wallet", "dht", wallet=lambda wallet: wallet.check_locked) @defer.inlineCallbacks def jsonrpc_stream_availability(self, uri, search_timeout=None, blob_timeout=None): """ @@ -3297,7 +3082,7 @@ class Daemon(AuthJSONRPCServer): } try: - resolved_result = yield self.session.wallet.resolve(uri) + resolved_result = yield self.wallet.resolve(uri) response['did_resolve'] = True except UnknownNameError: response['error'] = "Failed to resolve name" diff --git a/lbrynet/daemon/DaemonCLI.py b/lbrynet/daemon/DaemonCLI.py index 7ec03aa34..3cecc7c42 100644 --- a/lbrynet/daemon/DaemonCLI.py +++ b/lbrynet/daemon/DaemonCLI.py @@ -7,7 +7,7 @@ from collections import OrderedDict from lbrynet import conf from lbrynet.core import utils from lbrynet.daemon.auth.client import JSONRPCException, LBRYAPIClient, AuthAPIClient -from lbrynet.daemon.Daemon import LOADING_WALLET_CODE, Daemon +from lbrynet.daemon.Daemon import Daemon from lbrynet.core.system_info import get_platform from jsonrpc.common import RPCError from requests.exceptions import ConnectionError @@ -21,17 +21,13 @@ def remove_brackets(key): return key -def set_flag_vals(flag_names, parsed_args): +def set_kwargs(parsed_args): kwargs = OrderedDict() for key, arg in parsed_args.iteritems(): if arg is None: continue - elif key.startswith("--"): - if remove_brackets(key[2:]) not in kwargs: - k = remove_brackets(key[2:]) - elif key in flag_names: - if remove_brackets(flag_names[key]) not in kwargs: - k = remove_brackets(flag_names[key]) + elif key.startswith("--") and remove_brackets(key[2:]) not in kwargs: + k = remove_brackets(key[2:]) elif remove_brackets(key) not in kwargs: k = remove_brackets(key) kwargs[k] = guess_type(arg, k) @@ -79,26 +75,22 @@ def main(): method = new_method fn = Daemon.callable_methods[method] - if hasattr(fn, "_flags"): - flag_names = fn._flags - else: - flag_names = {} parsed = docopt(fn.__doc__, args) - kwargs = set_flag_vals(flag_names, parsed) + kwargs = set_kwargs(parsed) colorama.init() conf.initialize_settings() try: api = LBRYAPIClient.get_client() - status = api.status() + api.status() except (URLError, ConnectionError) as err: if isinstance(err, HTTPError) and err.code == UNAUTHORIZED: api = AuthAPIClient.config() # this can happen if the daemon is using auth with the --http-auth flag # when the config setting is to not use it try: - status = api.status() + api.status() except: print_error("Daemon requires authentication, but none was provided.", suggest_help=False) @@ -108,20 +100,6 @@ def main(): suggest_help=False) return 
1 - status_code = status['startup_status']['code'] - - if status_code != "started" and method not in Daemon.allowed_during_startup: - print "Daemon is in the process of starting. Please try again in a bit." - message = status['startup_status']['message'] - if message: - if ( - status['startup_status']['code'] == LOADING_WALLET_CODE - and status['blockchain_status']['blocks_behind'] > 0 - ): - message += '. Blocks left: ' + str(status['blockchain_status']['blocks_behind']) - print " Status: " + message - return 1 - # TODO: check if port is bound. Error if its not try: diff --git a/lbrynet/daemon/__init__.py b/lbrynet/daemon/__init__.py index 7461e1c00..8e0f5feca 100644 --- a/lbrynet/daemon/__init__.py +++ b/lbrynet/daemon/__init__.py @@ -1,3 +1,3 @@ +import Components # register Component classes from lbrynet.daemon.auth.client import LBRYAPIClient - get_client = LBRYAPIClient.get_client diff --git a/lbrynet/daemon/auth/server.py b/lbrynet/daemon/auth/server.py index a0d365a35..6b117aa70 100644 --- a/lbrynet/daemon/auth/server.py +++ b/lbrynet/daemon/auth/server.py @@ -4,6 +4,7 @@ import json import inspect from decimal import Decimal +from functools import wraps from zope.interface import implements from twisted.web import server, resource from twisted.internet import defer @@ -15,6 +16,7 @@ from traceback import format_exc from lbrynet import conf from lbrynet.core.Error import InvalidAuthenticationToken from lbrynet.core import utils +from lbrynet.core.Error import ComponentsNotStarted, ComponentStartConditionNotMet from lbrynet.daemon.auth.util import APIKey, get_auth_message from lbrynet.daemon.auth.client import LBRY_SECRET from lbrynet.undecorated import undecorated @@ -141,6 +143,27 @@ class AuthorizedBase(object): return f return _deprecated_wrapper + @staticmethod + def requires(*components, **component_conditionals): + def _wrap(fn): + @defer.inlineCallbacks + @wraps(fn) + def _inner(*args, **kwargs): + if component_conditionals: + for component_name, condition in component_conditionals.iteritems(): + if not callable(condition): + raise SyntaxError("The specified condition is invalid/not callable") + if not (yield condition(args[0].component_manager.get_component(component_name))): + raise ComponentStartConditionNotMet( + "Not all conditions required to start component are met") + if args[0].component_manager.all_components_running(*components): + result = yield fn(*args, **kwargs) + defer.returnValue(result) + else: + raise ComponentsNotStarted("Not all required components are set up:", components) + return _inner + return _wrap + class AuthJSONRPCServer(AuthorizedBase): """ @@ -149,7 +172,6 @@ class AuthJSONRPCServer(AuthorizedBase): API methods are named with a leading "jsonrpc_" Attributes: - allowed_during_startup (list): list of api methods that are callable before the server has finished startup sessions (dict): (dict): {: } callable_methods (dict): {: } @@ -416,9 +438,6 @@ class AuthJSONRPCServer(AuthorizedBase): def _verify_method_is_callable(self, function_path): if function_path not in self.callable_methods: raise UnknownAPIMethodError(function_path) - if not self.announced_startup: - if function_path not in self.allowed_during_startup: - raise NotAllowedDuringStartupError(function_path) def _get_jsonrpc_method(self, function_path): if function_path in self.deprecated_methods: diff --git a/lbrynet/tests/functional/test_misc.py b/lbrynet/tests/functional/test_misc.py index b134b6da2..eb91214a1 100644 --- a/lbrynet/tests/functional/test_misc.py +++ 
b/lbrynet/tests/functional/test_misc.py @@ -39,6 +39,7 @@ DummyBlobAvailabilityTracker = mocks.BlobAvailabilityTracker log_format = "%(funcName)s(): %(message)s" logging.basicConfig(level=logging.CRITICAL, format=log_format) +TEST_SKIP_STRING_ANDROID = "Test cannot pass on Android because multiprocessing is not supported at the OS level." def require_system(system): def wrapper(fn): @@ -103,13 +104,15 @@ class LbryUploader(object): rate_limiter = RateLimiter() self.sd_identifier = StreamDescriptorIdentifier() self.db_dir, self.blob_dir = mk_db_and_blob_dir() + dht_node = FakeNode(peer_finder=peer_finder, peer_manager=peer_manager, udpPort=4445, peerPort=5553, + node_id="abcd", externalIP="127.0.0.1") self.session = Session( conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=self.db_dir, blob_dir=self.blob_dir, node_id="abcd", peer_finder=peer_finder, hash_announcer=hash_announcer, peer_port=5553, dht_node_port=4445, use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker, - dht_node_class=FakeNode, is_generous=self.is_generous, external_ip="127.0.0.1") + dht_node=dht_node, is_generous=self.is_generous, external_ip="127.0.0.1") self.lbry_file_manager = EncryptedFileManager(self.session, self.sd_identifier) if self.ul_rate_limit is not None: self.session.rate_limiter.set_ul_limit(self.ul_rate_limit) @@ -197,7 +200,7 @@ def start_lbry_reuploader(sd_hash, kill_event, dead_event, db_dir, blob_dir = mk_db_and_blob_dir() session = Session(conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, - node_id="abcd" + str(n), dht_node_port=4446, dht_node_class=FakeNode, + node_id="abcd" + str(n), dht_node_port=4446, peer_finder=peer_finder, hash_announcer=hash_announcer, blob_dir=blob_dir, peer_port=peer_port, use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, @@ -303,13 +306,16 @@ def start_blob_uploader(blob_hash_queue, kill_event, dead_event, slow, is_genero db_dir, blob_dir = mk_db_and_blob_dir() + dht_node = FakeNode(peer_finder=peer_finder, peer_manager=peer_manager, udpPort=4445, peerPort=5553, + node_id="abcd", externalIP="127.0.0.1") + session = Session(conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, node_id="efgh", - peer_finder=peer_finder, hash_announcer=hash_announcer, dht_node_class=FakeNode, + peer_finder=peer_finder, hash_announcer=hash_announcer, blob_dir=blob_dir, peer_port=peer_port, dht_node_port=4446, use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker, is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1], - external_ip="127.0.0.1") + external_ip="127.0.0.1", dht_node=dht_node) if slow is True: session.rate_limiter.set_ul_limit(2 ** 11) @@ -478,6 +484,8 @@ class TestTransfer(TestCase): hash_announcer = FakeAnnouncer() rate_limiter = DummyRateLimiter() sd_identifier = StreamDescriptorIdentifier() + dht_node = FakeNode(peer_finder=peer_finder, peer_manager=peer_manager, udpPort=4445, peerPort=5553, + node_id="abcd", externalIP="127.0.0.1") db_dir, blob_dir = mk_db_and_blob_dir() self.session = Session( @@ -486,7 +494,7 @@ class TestTransfer(TestCase): blob_dir=blob_dir, peer_port=5553, dht_node_port=4445, use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker, - dht_node_class=FakeNode, is_generous=self.is_generous, external_ip="127.0.0.1") + dht_node=dht_node, is_generous=self.is_generous, external_ip="127.0.0.1") self.lbry_file_manager = EncryptedFileManager( self.session, sd_identifier) @@ -566,14 +574,16 
@@ class TestTransfer(TestCase): peer_finder = FakePeerFinder(5553, peer_manager, 2) hash_announcer = FakeAnnouncer() rate_limiter = DummyRateLimiter() + dht_node = FakeNode(peer_finder=peer_finder, peer_manager=peer_manager, udpPort=4445, peerPort=5553, + node_id="abcd", externalIP="127.0.0.1") db_dir, blob_dir = mk_db_and_blob_dir() self.session = Session( conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, node_id="abcd", peer_finder=peer_finder, hash_announcer=hash_announcer, - blob_dir=blob_dir, peer_port=5553, dht_node_port=4445, dht_node_class=FakeNode, + blob_dir=blob_dir, peer_port=5553, dht_node_port=4445, use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, - blob_tracker_class=DummyBlobAvailabilityTracker, + blob_tracker_class=DummyBlobAvailabilityTracker, dht_node=dht_node, is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1], external_ip="127.0.0.1") d1 = self.wait_for_hash_from_queue(blob_hash_queue_1) @@ -646,17 +656,19 @@ class TestTransfer(TestCase): hash_announcer = FakeAnnouncer() rate_limiter = DummyRateLimiter() sd_identifier = StreamDescriptorIdentifier() + dht_node = FakeNode(peer_finder=peer_finder, peer_manager=peer_manager, udpPort=4445, peerPort=5553, + node_id="abcd", externalIP="127.0.0.1") downloaders = [] db_dir, blob_dir = mk_db_and_blob_dir() self.session = Session(conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, - node_id="abcd", peer_finder=peer_finder, dht_node_port=4445, dht_node_class=FakeNode, + node_id="abcd", peer_finder=peer_finder, dht_node_port=4445, hash_announcer=hash_announcer, blob_dir=blob_dir, peer_port=5553, use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker, is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1], - external_ip="127.0.0.1") + external_ip="127.0.0.1", dht_node=dht_node) self.lbry_file_manager = EncryptedFileManager(self.session, sd_identifier) @@ -758,7 +770,7 @@ class TestTransfer(TestCase): sd_identifier = StreamDescriptorIdentifier() db_dir, blob_dir = mk_db_and_blob_dir() - self.session = Session(conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, dht_node_class=FakeNode, + self.session = Session(conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, node_id="abcd", peer_finder=peer_finder, dht_node_port=4445, hash_announcer=hash_announcer, blob_dir=blob_dir, peer_port=5553, use_upnp=False, rate_limiter=rate_limiter, @@ -842,3 +854,10 @@ class TestTransfer(TestCase): d.addBoth(stop) return d + + if is_android(): + test_lbry_transfer.skip = TEST_SKIP_STRING_ANDROID + test_last_blob_retrieval.skip = TEST_SKIP_STRING_ANDROID + test_double_download.skip = TEST_SKIP_STRING_ANDROID + test_multiple_uploaders.skip = TEST_SKIP_STRING_ANDROID + diff --git a/lbrynet/tests/functional/test_reflector.py b/lbrynet/tests/functional/test_reflector.py index 0a9d82053..3858f7f4b 100644 --- a/lbrynet/tests/functional/test_reflector.py +++ b/lbrynet/tests/functional/test_reflector.py @@ -53,6 +53,7 @@ class TestReflector(unittest.TestCase): db_dir=self.db_dir, node_id="abcd", peer_finder=peer_finder, + peer_manager=peer_manager, blob_dir=self.blob_dir, peer_port=5553, dht_node_port=4444, @@ -60,8 +61,8 @@ class TestReflector(unittest.TestCase): wallet=wallet, blob_tracker_class=mocks.BlobAvailabilityTracker, external_ip="127.0.0.1", - dht_node_class=mocks.Node, - hash_announcer=mocks.Announcer() + dht_node=mocks.Node, + hash_announcer=mocks.Announcer(), ) self.lbry_file_manager = EncryptedFileManager.EncryptedFileManager(self.session, @@ -74,6 +75,7 
@@ class TestReflector(unittest.TestCase): db_dir=self.server_db_dir, node_id="abcd", peer_finder=peer_finder, + peer_manager=peer_manager, blob_dir=self.server_blob_dir, peer_port=5554, dht_node_port=4443, @@ -81,8 +83,8 @@ class TestReflector(unittest.TestCase): wallet=wallet, blob_tracker_class=mocks.BlobAvailabilityTracker, external_ip="127.0.0.1", - dht_node_class=mocks.Node, - hash_announcer=mocks.Announcer() + dht_node=mocks.Node, + hash_announcer=mocks.Announcer(), ) self.server_blob_manager = BlobManager.DiskBlobManager(self.server_blob_dir, diff --git a/lbrynet/tests/functional/test_streamify.py b/lbrynet/tests/functional/test_streamify.py index cda06758b..7f831b2e4 100644 --- a/lbrynet/tests/functional/test_streamify.py +++ b/lbrynet/tests/functional/test_streamify.py @@ -30,6 +30,7 @@ DummyBlobAvailabilityTracker = mocks.BlobAvailabilityTracker class TestStreamify(TestCase): maxDiff = 5000 + def setUp(self): mocks.mock_conf_settings(self) self.session = None @@ -37,6 +38,12 @@ class TestStreamify(TestCase): self.is_generous = True self.db_dir = tempfile.mkdtemp() self.blob_dir = os.path.join(self.db_dir, "blobfiles") + self.dht_node = FakeNode() + self.wallet = FakeWallet() + self.peer_manager = PeerManager() + self.peer_finder = FakePeerFinder(5553, self.peer_manager, 2) + self.rate_limiter = DummyRateLimiter() + self.sd_identifier = StreamDescriptorIdentifier() os.mkdir(self.blob_dir) @defer.inlineCallbacks @@ -54,26 +61,17 @@ class TestStreamify(TestCase): os.remove("test_file") def test_create_stream(self): - wallet = FakeWallet() - peer_manager = PeerManager() - peer_finder = FakePeerFinder(5553, peer_manager, 2) - hash_announcer = FakeAnnouncer() - rate_limiter = DummyRateLimiter() - sd_identifier = StreamDescriptorIdentifier() self.session = Session( - conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=self.db_dir, node_id="abcd", - peer_finder=peer_finder, hash_announcer=hash_announcer, - blob_dir=self.blob_dir, peer_port=5553, - use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, - blob_tracker_class=DummyBlobAvailabilityTracker, - is_generous=self.is_generous, external_ip="127.0.0.1", dht_node_class=mocks.Node + conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=self.db_dir, node_id="abcd", peer_finder=self.peer_finder, + blob_dir=self.blob_dir, peer_port=5553, use_upnp=False, rate_limiter=self.rate_limiter, wallet=self.wallet, + blob_tracker_class=DummyBlobAvailabilityTracker, external_ip="127.0.0.1", dht_node=self.dht_node ) - self.lbry_file_manager = EncryptedFileManager(self.session, sd_identifier) + self.lbry_file_manager = EncryptedFileManager(self.session, self.sd_identifier) d = self.session.setup() - d.addCallback(lambda _: add_lbry_file_to_sd_identifier(sd_identifier)) + d.addCallback(lambda _: add_lbry_file_to_sd_identifier(self.sd_identifier)) d.addCallback(lambda _: self.lbry_file_manager.setup()) def verify_equal(sd_info): @@ -102,22 +100,14 @@ class TestStreamify(TestCase): return d def test_create_and_combine_stream(self): - wallet = FakeWallet() - peer_manager = PeerManager() - peer_finder = FakePeerFinder(5553, peer_manager, 2) - hash_announcer = FakeAnnouncer() - rate_limiter = DummyRateLimiter() - sd_identifier = StreamDescriptorIdentifier() self.session = Session( - conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=self.db_dir, node_id="abcd", - peer_finder=peer_finder, hash_announcer=hash_announcer, - blob_dir=self.blob_dir, peer_port=5553, dht_node_class=mocks.Node, - use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, - 
blob_tracker_class=DummyBlobAvailabilityTracker, external_ip="127.0.0.1" + conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=self.db_dir, node_id="abcd", peer_finder=self.peer_finder, + blob_dir=self.blob_dir, peer_port=5553, use_upnp=False, rate_limiter=self.rate_limiter, wallet=self.wallet, + blob_tracker_class=DummyBlobAvailabilityTracker, external_ip="127.0.0.1", dht_node=self.dht_node ) - self.lbry_file_manager = EncryptedFileManager(self.session, sd_identifier) + self.lbry_file_manager = EncryptedFileManager(self.session, self.sd_identifier) @defer.inlineCallbacks def create_stream(): @@ -132,7 +122,7 @@ class TestStreamify(TestCase): self.assertEqual(hashsum.hexdigest(), "68959747edc73df45e45db6379dd7b3b") d = self.session.setup() - d.addCallback(lambda _: add_lbry_file_to_sd_identifier(sd_identifier)) + d.addCallback(lambda _: add_lbry_file_to_sd_identifier(self.sd_identifier)) d.addCallback(lambda _: self.lbry_file_manager.setup()) d.addCallback(lambda _: create_stream()) return d diff --git a/lbrynet/tests/mocks.py b/lbrynet/tests/mocks.py index c8e131362..c01e1068b 100644 --- a/lbrynet/tests/mocks.py +++ b/lbrynet/tests/mocks.py @@ -1,5 +1,6 @@ import base64 import io +import mock from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives.asymmetric import rsa @@ -10,6 +11,7 @@ from twisted.python.failure import Failure from lbrynet.core.client.ClientRequest import ClientRequest from lbrynet.core.Error import RequestCanceledError from lbrynet.core import BlobAvailability +from lbrynet.file_manager.EncryptedFileManager import EncryptedFileManager from lbrynet.dht.node import Node as RealNode from lbrynet.daemon import ExchangeRateManager as ERM from lbrynet import conf @@ -63,6 +65,7 @@ class BTCLBCFeed(ERM.MarketFeed): 0.0 ) + class USDBTCFeed(ERM.MarketFeed): def __init__(self): ERM.MarketFeed.__init__( @@ -74,6 +77,7 @@ class USDBTCFeed(ERM.MarketFeed): 0.0 ) + class ExchangeRateManager(ERM.ExchangeRateManager): def __init__(self, market_feeds, rates): self.market_feeds = market_feeds @@ -360,6 +364,96 @@ class BlobAvailabilityTracker(BlobAvailability.BlobAvailabilityTracker): pass +# The components below viz. 
FakeWallet, FakeSession, FakeFileManager are just for testing Component Manager's +# startup and stop +class FakeComponent(object): + depends_on = [] + component_name = None + + def __init__(self, component_manager): + self.component_manager = component_manager + self._running = False + + @property + def running(self): + return self._running + + def start(self): + raise NotImplementedError # Override + + def stop(self): + return defer.succeed(None) + + @property + def component(self): + return self + + @defer.inlineCallbacks + def _setup(self): + result = yield defer.maybeDeferred(self.start) + self._running = True + defer.returnValue(result) + + @defer.inlineCallbacks + def _stop(self): + result = yield defer.maybeDeferred(self.stop) + self._running = False + defer.returnValue(result) + + +class FakeDelayedWallet(FakeComponent): + component_name = "wallet" + depends_on = [] + + def start(self): + return defer.succeed(True) + + def stop(self): + d = defer.Deferred() + self.component_manager.reactor.callLater(1, d.callback, True) + return d + + +class FakeDelayedSession(FakeComponent): + component_name = "session" + depends_on = [FakeDelayedWallet.component_name] + + def start(self): + d = defer.Deferred() + self.component_manager.reactor.callLater(1, d.callback, True) + return d + + def stop(self): + d = defer.Deferred() + self.component_manager.reactor.callLater(1, d.callback, True) + return d + + +class FakeDelayedFileManager(FakeComponent): + component_name = "file_manager" + depends_on = [FakeDelayedSession.component_name] + + def start(self): + d = defer.Deferred() + self.component_manager.reactor.callLater(1, d.callback, True) + return d + + def stop(self): + return defer.succeed(True) + +class FakeFileManager(FakeComponent): + component_name = "fileManager" + depends_on = [] + + @property + def component(self): + return mock.Mock(spec=EncryptedFileManager) + + def start(self): + return defer.succeed(True) + + def stop(self): + pass create_stream_sd_file = { 'stream_name': '746573745f66696c65', diff --git a/lbrynet/tests/unit/components/__init__.py b/lbrynet/tests/unit/components/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/lbrynet/tests/unit/components/test_Component_Manager.py b/lbrynet/tests/unit/components/test_Component_Manager.py new file mode 100644 index 000000000..f31fb015c --- /dev/null +++ b/lbrynet/tests/unit/components/test_Component_Manager.py @@ -0,0 +1,131 @@ +from twisted.internet.task import Clock +from twisted.trial import unittest + +from lbrynet.daemon.ComponentManager import ComponentManager +from lbrynet.daemon.Components import DATABASE_COMPONENT, DHT_COMPONENT, STREAM_IDENTIFIER_COMPONENT +from lbrynet.daemon.Components import HASH_ANNOUNCER_COMPONENT, REFLECTOR_COMPONENT, UPNP_COMPONENT +from lbrynet.daemon.Components import PEER_PROTOCOL_SERVER_COMPONENT +from lbrynet.daemon import Components +from lbrynet.tests import mocks + + +class TestComponentManager(unittest.TestCase): + def setUp(self): + mocks.mock_conf_settings(self) + self.default_components_sort = [ + [Components.DatabaseComponent, + Components.UPnPComponent], + [Components.DHTComponent, + Components.WalletComponent], + [Components.HashAnnouncer], + [Components.SessionComponent], + [Components.PeerProtocolServer, + Components.StreamIdentifier], + [Components.FileManager], + [Components.ReflectorComponent] + ] + self.component_manager = ComponentManager() + + def tearDown(self): + pass + + def test_sort_components(self): + stages = 
self.component_manager.sort_components() + + for stage_list, sorted_stage_list in zip(stages, self.default_components_sort): + self.assertSetEqual(set([type(stage) for stage in stage_list]), set(sorted_stage_list)) + + def test_sort_components_reverse(self): + rev_stages = self.component_manager.sort_components(reverse=True) + reverse_default_components_sort = reversed(self.default_components_sort) + + for stage_list, sorted_stage_list in zip(rev_stages, reverse_default_components_sort): + self.assertSetEqual(set([type(stage) for stage in stage_list]), set(sorted_stage_list)) + + def test_get_component_not_exists(self): + + with self.assertRaises(NameError): + self.component_manager.get_component("random_component") + + +class TestComponentManagerOverrides(unittest.TestCase): + def setUp(self): + mocks.mock_conf_settings(self) + + def test_init_with_overrides(self): + class FakeWallet(object): + component_name = "wallet" + depends_on = [] + + def __init__(self, component_manager): + self.component_manager = component_manager + + @property + def component(self): + return self + + new_component_manager = ComponentManager(wallet=FakeWallet) + fake_wallet = new_component_manager.get_component("wallet") + # wallet should be an instance of FakeWallet and not WalletComponent from Components.py + self.assertEquals(type(fake_wallet), FakeWallet) + self.assertNotEquals(type(fake_wallet), Components.WalletComponent) + + def test_init_with_wrong_overrides(self): + class FakeRandomComponent(object): + component_name = "someComponent" + depends_on = [] + + with self.assertRaises(SyntaxError): + ComponentManager(randomComponent=FakeRandomComponent) + + +class TestComponentManagerProperStart(unittest.TestCase): + def setUp(self): + self.reactor = Clock() + mocks.mock_conf_settings(self) + self.component_manager = ComponentManager( + skip_components=[DATABASE_COMPONENT, DHT_COMPONENT, HASH_ANNOUNCER_COMPONENT, STREAM_IDENTIFIER_COMPONENT, + PEER_PROTOCOL_SERVER_COMPONENT, REFLECTOR_COMPONENT, UPNP_COMPONENT], + reactor=self.reactor, + wallet=mocks.FakeDelayedWallet, + session=mocks.FakeDelayedSession, + fileManager=mocks.FakeDelayedFileManager + ) + + def tearDown(self): + pass + + def test_proper_starting_of_components(self): + self.component_manager.setup() + self.assertTrue(self.component_manager.get_component('wallet').running) + self.assertFalse(self.component_manager.get_component('session').running) + self.assertFalse(self.component_manager.get_component('file_manager').running) + + self.reactor.advance(1) + self.assertTrue(self.component_manager.get_component('wallet').running) + self.assertTrue(self.component_manager.get_component('session').running) + self.assertFalse(self.component_manager.get_component('file_manager').running) + + self.reactor.advance(1) + self.assertTrue(self.component_manager.get_component('wallet').running) + self.assertTrue(self.component_manager.get_component('session').running) + self.assertTrue(self.component_manager.get_component('file_manager').running) + + def test_proper_stopping_of_components(self): + self.component_manager.setup() + self.reactor.advance(1) + self.reactor.advance(1) + self.component_manager.stop() + self.assertFalse(self.component_manager.get_component('file_manager').running) + self.assertTrue(self.component_manager.get_component('session').running) + self.assertTrue(self.component_manager.get_component('wallet').running) + + self.reactor.advance(1) + self.assertFalse(self.component_manager.get_component('file_manager').running) + 
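# NOTE: the staged assertions in these tests work because the fake components
# defer their start/stop by one second and the "reactor" is a
# twisted.internet.task.Clock, so simulated time only moves when the test
# advances it. The same mechanism in isolation:

from twisted.internet import defer
from twisted.internet.task import Clock

clock = Clock()
d = defer.Deferred()
clock.callLater(1, d.callback, "stage done")

assert not d.called  # nothing scheduled has fired yet
clock.advance(1)     # run everything due within the next second
assert d.called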
self.assertFalse(self.component_manager.get_component('session').running) + self.assertTrue(self.component_manager.get_component('wallet').running) + + self.reactor.advance(1) + self.assertFalse(self.component_manager.get_component('file_manager').running) + self.assertFalse(self.component_manager.get_component('session').running) + self.assertFalse(self.component_manager.get_component('wallet').running) diff --git a/lbrynet/tests/unit/lbrynet_daemon/test_Daemon.py b/lbrynet/tests/unit/lbrynet_daemon/test_Daemon.py index d47c36ba2..1ef72c1a1 100644 --- a/lbrynet/tests/unit/lbrynet_daemon/test_Daemon.py +++ b/lbrynet/tests/unit/lbrynet_daemon/test_Daemon.py @@ -1,11 +1,10 @@ import mock import json -import unittest import random from os import path from twisted.internet import defer -from twisted import trial +from twisted.trial import unittest from faker import Faker @@ -14,12 +13,15 @@ from lbryum.wallet import NewWallet from lbrynet import conf from lbrynet.core import Session, PaymentRateManager, Wallet from lbrynet.database.storage import SQLiteStorage +from lbrynet.daemon.ComponentManager import ComponentManager +from lbrynet.daemon.Components import DATABASE_COMPONENT, DHT_COMPONENT, WALLET_COMPONENT, STREAM_IDENTIFIER_COMPONENT +from lbrynet.daemon.Components import HASH_ANNOUNCER_COMPONENT, REFLECTOR_COMPONENT, UPNP_COMPONENT, SESSION_COMPONENT +from lbrynet.daemon.Components import PEER_PROTOCOL_SERVER_COMPONENT from lbrynet.daemon.Daemon import Daemon as LBRYDaemon -from lbrynet.file_manager.EncryptedFileManager import EncryptedFileManager from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloader from lbrynet.tests import util -from lbrynet.tests.mocks import mock_conf_settings, FakeNetwork +from lbrynet.tests.mocks import mock_conf_settings, FakeNetwork, FakeFileManager from lbrynet.tests.mocks import BlobAvailabilityTracker as DummyBlobAvailabilityTracker from lbrynet.tests.mocks import ExchangeRateManager as DummyExchangeRateManager from lbrynet.tests.mocks import BTCLBCFeed, USDBTCFeed @@ -40,10 +42,10 @@ def get_test_daemon(data_rate=None, generous=True, with_fee=False): } daemon = LBRYDaemon(None) daemon.session = mock.Mock(spec=Session.Session) - daemon.session.wallet = mock.Mock(spec=Wallet.LBRYumWallet) - daemon.session.wallet.wallet = mock.Mock(spec=NewWallet) - daemon.session.wallet.wallet.use_encryption = False - daemon.session.wallet.network = FakeNetwork() + daemon.wallet = mock.Mock(spec=Wallet.LBRYumWallet) + daemon.wallet.wallet = mock.Mock(spec=NewWallet) + daemon.wallet.wallet.use_encryption = False + daemon.wallet.network = FakeNetwork() daemon.session.storage = mock.Mock(spec=SQLiteStorage) market_feeds = [BTCLBCFeed(), USDBTCFeed()] daemon.exchange_rate_manager = DummyExchangeRateManager(market_feeds, rates) @@ -73,12 +75,12 @@ def get_test_daemon(data_rate=None, generous=True, with_fee=False): {"fee": {"USD": {"address": "bQ6BGboPV2SpTMEP7wLNiAcnsZiH8ye6eA", "amount": 0.75}}}) daemon._resolve_name = lambda _: defer.succeed(metadata) migrated = smart_decode(json.dumps(metadata)) - daemon.session.wallet.resolve = lambda *_: defer.succeed( + daemon.wallet.resolve = lambda *_: defer.succeed( {"test": {'claim': {'value': migrated.claim_dict}}}) return daemon -class TestCostEst(trial.unittest.TestCase): +class TestCostEst(unittest.TestCase): def setUp(self): mock_conf_settings(self) util.resetTime(self) @@ -111,7 +113,8 @@ class TestCostEst(trial.unittest.TestCase): self.assertEquals(daemon.get_est_cost("test", size).result, 
                          correct_result)


-class TestJsonRpc(trial.unittest.TestCase):
+class TestJsonRpc(unittest.TestCase):
+
     def setUp(self):
         def noop():
             return None
@@ -119,30 +122,39 @@ class TestJsonRpc(trial.unittest.TestCase):
         mock_conf_settings(self)
         util.resetTime(self)
         self.test_daemon = get_test_daemon()
-        self.test_daemon.session.wallet.is_first_run = False
-        self.test_daemon.session.wallet.get_best_blockhash = noop
+        self.test_daemon.wallet.is_first_run = False
+        self.test_daemon.wallet.get_best_blockhash = noop

     def test_status(self):
         d = defer.maybeDeferred(self.test_daemon.jsonrpc_status)
         d.addCallback(lambda status: self.assertDictContainsSubset({'is_running': False}, status))

-    @unittest.skipIf(is_android(),
-                     'Test cannot pass on Android because PYTHONOPTIMIZE removes the docstrings.')
     def test_help(self):
         d = defer.maybeDeferred(self.test_daemon.jsonrpc_help, command='status')
         d.addCallback(lambda result: self.assertSubstring('daemon status', result['help']))
         # self.assertSubstring('daemon status', d.result)

+    if is_android():
+        test_help.skip = "Test cannot pass on Android because PYTHONOPTIMIZE removes the docstrings."

-class TestFileListSorting(trial.unittest.TestCase):
+
+class TestFileListSorting(unittest.TestCase):
     def setUp(self):
         mock_conf_settings(self)
         util.resetTime(self)
         self.faker = Faker('en_US')
         self.faker.seed(66410)
         self.test_daemon = get_test_daemon()
-        self.test_daemon.lbry_file_manager = mock.Mock(spec=EncryptedFileManager)
-        self.test_daemon.lbry_file_manager.lbry_files = self._get_fake_lbry_files()
+        component_manager = ComponentManager(
+            skip_components=[DATABASE_COMPONENT, DHT_COMPONENT, WALLET_COMPONENT, SESSION_COMPONENT, UPNP_COMPONENT,
+                             PEER_PROTOCOL_SERVER_COMPONENT, REFLECTOR_COMPONENT, HASH_ANNOUNCER_COMPONENT,
+                             STREAM_IDENTIFIER_COMPONENT],
+            fileManager=FakeFileManager
+        )
+        component_manager.setup()
+        self.test_daemon.component_manager = component_manager
+        self.test_daemon.file_manager = component_manager.get_component("fileManager")
+        self.test_daemon.file_manager.lbry_files = self._get_fake_lbry_files()
         # Pre-sorted lists of prices and file names in ascending order produced by
         # faker with seed 66410. This seed was chosen because it produces 3 results