Merge branch 'daemon-components'

This commit is contained in:
Jack Robison 2018-07-24 21:17:51 -04:00
commit 9b873d2f48
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
31 changed files with 1754 additions and 1113 deletions

View file

@ -8,35 +8,36 @@ can and probably will change functionality and break backwards compatability
at anytime.
## [Unreleased]
## [0.20.3] - 2018-07-20
### Changed
* Additional information added to the balance error message when editing a claim.
(https://github.com/lbryio/lbry/pull/1309)
### Security
*
*
### Fixed
*
* loggly error reporting not following `share_usage_data`
*
### Deprecated
*
* automatic claim renew, this is no longer needed
*
### Changed
*
*
* api server class to use components, and for all JSONRPC API commands to be callable so long as the required components are available.
* return error messages when required conditions on components are not met for API calls
* `status` to no longer return a base58 encoded `lbry_id`, instead return this as the hex encoded `node_id` in a new `dht_node_status` field.
* `startup_status` field in the response to `status` to be a dict of component names to status booleans
* moved wallet, upnp and dht startup code from `Session` to `Components`
### Added
* `skipped_components` list to the response from `status`
* `skipped_components` config setting, accemapts a list of names of components to not run
* `ComponentManager` for managing the lifecycles of dependencies
* `requires` decorator to register the components required by a `jsonrpc_` command, to facilitate commands registering asynchronously
* unittests for `ComponentManager`
* script to generate docs/api.json file (https://github.com/lbryio/lbry.tech/issues/42)
*
* additional information to the balance error message when editing a claim (https://github.com/lbryio/lbry/pull/1309)
### Removed
*
*
* most of the internal attributes from `Daemon`
## [0.20.4] - 2018-07-18

View file

@ -168,9 +168,11 @@ def server_port(server_and_port):
def server_list(servers):
return [server_port(server) for server in servers]
def server_list_reverse(servers):
return ["%s:%s" % (server, port) for server, port in servers]
class Env(envparse.Env):
"""An Env parser that automatically namespaces the variables with LBRY"""
@ -288,7 +290,7 @@ ADJUSTABLE_SETTINGS = {
'reflect_uploads': (bool, True),
'auto_re_reflect_interval': (int, 86400), # set to 0 to disable
'reflector_servers': (list, [('reflector2.lbry.io', 5566)], server_list, server_list_reverse),
'run_reflector_server': (bool, False),
'run_reflector_server': (bool, False), # adds `reflector` to components_to_skip unless True
'sd_download_timeout': (int, 3),
'share_usage_data': (bool, True), # whether to share usage stats and diagnostic info with LBRY
'peer_search_timeout': (int, 30),
@ -299,7 +301,8 @@ ADJUSTABLE_SETTINGS = {
'blockchain_name': (str, 'lbrycrd_main'),
'lbryum_servers': (list, [('lbryumx1.lbry.io', 50001), ('lbryumx2.lbry.io',
50001)], server_list, server_list_reverse),
's3_headers_depth': (int, 96 * 10) # download headers from s3 when the local height is more than 10 chunks behind
's3_headers_depth': (int, 96 * 10), # download headers from s3 when the local height is more than 10 chunks behind
'components_to_skip': (list, []) # components which will be skipped during start-up of daemon
}

View file

@ -27,7 +27,8 @@ class DiskBlobManager(object):
self.blob_hashes_to_delete = {} # {blob_hash: being_deleted (True/False)}
self.check_should_announce_lc = None
if conf.settings['run_reflector_server']: # TODO: move this looping call to SQLiteStorage
# TODO: move this looping call to SQLiteStorage
if 'reflector' not in conf.settings['components_to_skip']:
self.check_should_announce_lc = task.LoopingCall(self.storage.verify_will_announce_all_head_and_sd_blobs)
@defer.inlineCallbacks

View file

@ -155,13 +155,23 @@ class InvalidAuthenticationToken(Exception):
class NegotiationError(Exception):
pass
class InvalidCurrencyError(Exception):
def __init__(self, currency):
self.currency = currency
Exception.__init__(
self, 'Invalid currency: {} is not a supported currency.'.format(currency))
class NoSuchDirectoryError(Exception):
def __init__(self, directory):
self.directory = directory
Exception.__init__(self, 'No such directory {}'.format(directory))
class ComponentStartConditionNotMet(Exception):
pass
class ComponentsNotStarted(Exception):
pass

View file

@ -1,11 +1,8 @@
import logging
import miniupnpc
from twisted.internet import threads, defer
from twisted.internet import defer
from lbrynet.core.BlobManager import DiskBlobManager
from lbrynet.dht import node, hashannouncer
from lbrynet.database.storage import SQLiteStorage
from lbrynet.core.RateLimiter import RateLimiter
from lbrynet.core.utils import generate_id
from lbrynet.core.PaymentRateManager import BasePaymentRateManager, OnlyFreePaymentsManager
log = logging.getLogger(__name__)
@ -32,11 +29,10 @@ class Session(object):
peers can connect to this peer.
"""
def __init__(self, blob_data_payment_rate, db_dir=None, node_id=None, peer_manager=None, dht_node_port=None,
def __init__(self, blob_data_payment_rate, db_dir=None, node_id=None, dht_node_port=None,
known_dht_nodes=None, peer_finder=None, hash_announcer=None, blob_dir=None, blob_manager=None,
peer_port=None, use_upnp=True, rate_limiter=None, wallet=None, dht_node_class=node.Node,
blob_tracker_class=None, payment_rate_manager_class=None, is_generous=True, external_ip=None,
storage=None):
peer_port=None, rate_limiter=None, wallet=None, external_ip=None, storage=None,
dht_node=None, peer_manager=None):
"""@param blob_data_payment_rate: The default payment rate for blob data
@param db_dir: The directory in which levelDB files should be stored
@ -78,10 +74,6 @@ class Session(object):
@param peer_port: The port on which other peers should connect
to this peer
@param use_upnp: Whether or not to try to open a hole in the
firewall so that outside peers can connect to this peer's
peer_port and dht_node_port
@param rate_limiter: An object which keeps track of the amount
of data transferred to and from this peer, and can limit
that rate if desired
@ -103,20 +95,14 @@ class Session(object):
self.known_dht_nodes = []
self.blob_dir = blob_dir
self.blob_manager = blob_manager
# self.blob_tracker = None
# self.blob_tracker_class = blob_tracker_class or BlobAvailabilityTracker
self.peer_port = peer_port
self.use_upnp = use_upnp
self.rate_limiter = rate_limiter
self.external_ip = external_ip
self.upnp_redirects = []
self.wallet = wallet
self.dht_node_class = dht_node_class
self.dht_node = None
self.dht_node = dht_node
self.base_payment_rate_manager = BasePaymentRateManager(blob_data_payment_rate)
self.payment_rate_manager = OnlyFreePaymentsManager()
# self.payment_rate_manager_class = payment_rate_manager_class or NegotiatedPaymentRateManager
# self.is_generous = is_generous
self.storage = storage or SQLiteStorage(self.db_dir)
def setup(self):
@ -124,15 +110,14 @@ class Session(object):
log.debug("Starting session.")
if self.node_id is None:
self.node_id = generate_id()
if self.dht_node is not None:
if self.peer_manager is None:
self.peer_manager = self.dht_node.peer_manager
if self.use_upnp is True:
d = self._try_upnp()
else:
d = defer.succeed(True)
d.addCallback(lambda _: self.storage.setup())
d.addCallback(lambda _: self._setup_dht())
if self.peer_finder is None:
self.peer_finder = self.dht_node.peer_finder
d = self.storage.setup()
d.addCallback(lambda _: self._setup_other_components())
return d
@ -140,97 +125,12 @@ class Session(object):
"""Stop all services"""
log.info('Stopping session.')
ds = []
if self.hash_announcer:
self.hash_announcer.stop()
# if self.blob_tracker is not None:
# ds.append(defer.maybeDeferred(self.blob_tracker.stop))
if self.dht_node is not None:
ds.append(defer.maybeDeferred(self.dht_node.stop))
if self.rate_limiter is not None:
ds.append(defer.maybeDeferred(self.rate_limiter.stop))
if self.wallet is not None:
ds.append(defer.maybeDeferred(self.wallet.stop))
if self.blob_manager is not None:
ds.append(defer.maybeDeferred(self.blob_manager.stop))
if self.use_upnp is True:
ds.append(defer.maybeDeferred(self._unset_upnp))
return defer.DeferredList(ds)
def _try_upnp(self):
log.debug("In _try_upnp")
def get_free_port(upnp, port, protocol):
# returns an existing mapping if it exists
mapping = upnp.getspecificportmapping(port, protocol)
if not mapping:
return port
if upnp.lanaddr == mapping[0]:
return mapping[1]
return get_free_port(upnp, port + 1, protocol)
def get_port_mapping(upnp, port, protocol, description):
# try to map to the requested port, if there is already a mapping use the next external
# port available
if protocol not in ['UDP', 'TCP']:
raise Exception("invalid protocol")
port = get_free_port(upnp, port, protocol)
if isinstance(port, tuple):
log.info("Found existing UPnP redirect %s:%i (%s) to %s:%i, using it",
self.external_ip, port, protocol, upnp.lanaddr, port)
return port
upnp.addportmapping(port, protocol, upnp.lanaddr, port,
description, '')
log.info("Set UPnP redirect %s:%i (%s) to %s:%i", self.external_ip, port,
protocol, upnp.lanaddr, port)
return port
def threaded_try_upnp():
if self.use_upnp is False:
log.debug("Not using upnp")
return False
u = miniupnpc.UPnP()
num_devices_found = u.discover()
if num_devices_found > 0:
u.selectigd()
external_ip = u.externalipaddress()
if external_ip != '0.0.0.0' and not self.external_ip:
# best not to rely on this external ip, the router can be behind layers of NATs
self.external_ip = external_ip
if self.peer_port:
self.peer_port = get_port_mapping(u, self.peer_port, 'TCP', 'LBRY peer port')
self.upnp_redirects.append((self.peer_port, 'TCP'))
if self.dht_node_port:
self.dht_node_port = get_port_mapping(u, self.dht_node_port, 'UDP', 'LBRY DHT port')
self.upnp_redirects.append((self.dht_node_port, 'UDP'))
return True
return False
def upnp_failed(err):
log.warning("UPnP failed. Reason: %s", err.getErrorMessage())
return False
d = threads.deferToThread(threaded_try_upnp)
d.addErrback(upnp_failed)
return d
def _setup_dht(self): # does not block startup, the dht will re-attempt if necessary
self.dht_node = self.dht_node_class(
node_id=self.node_id,
udpPort=self.dht_node_port,
externalIP=self.external_ip,
peerPort=self.peer_port,
peer_manager=self.peer_manager,
peer_finder=self.peer_finder,
)
if not self.hash_announcer:
self.hash_announcer = hashannouncer.DHTHashAnnouncer(self.dht_node, self.storage)
self.peer_manager = self.dht_node.peer_manager
self.peer_finder = self.dht_node.peer_finder
d = self.dht_node.start(self.known_dht_nodes)
d.addCallback(lambda _: log.info("Joined the dht"))
d.addCallback(lambda _: self.hash_announcer.start())
def _setup_other_components(self):
log.debug("Setting up the rest of the components")
@ -244,39 +144,6 @@ class Session(object):
else:
self.blob_manager = DiskBlobManager(self.blob_dir, self.storage, self.dht_node._dataStore)
# if self.blob_tracker is None:
# self.blob_tracker = self.blob_tracker_class(
# self.blob_manager, self.dht_node.peer_finder, self.dht_node
# )
# if self.payment_rate_manager is None:
# self.payment_rate_manager = self.payment_rate_manager_class(
# self.base_payment_rate_manager, self.blob_tracker, self.is_generous
# )
self.rate_limiter.start()
d = self.blob_manager.setup()
d.addCallback(lambda _: self.wallet.start())
# d.addCallback(lambda _: self.blob_tracker.start())
return d
def _unset_upnp(self):
log.info("Unsetting upnp for session")
def threaded_unset_upnp():
u = miniupnpc.UPnP()
num_devices_found = u.discover()
if num_devices_found > 0:
u.selectigd()
for port, protocol in self.upnp_redirects:
if u.getspecificportmapping(port, protocol) is None:
log.warning(
"UPnP redirect for %s %d was removed by something else.",
protocol, port)
else:
u.deleteportmapping(port, protocol)
log.info("Removed UPnP redirect for %s %d.", protocol, port)
self.upnp_redirects = []
d = threads.deferToThread(threaded_unset_upnp)
d.addErrback(lambda err: str(err))
return d

View file

@ -938,9 +938,7 @@ class LBRYumWallet(Wallet):
self._lag_counter = 0
self.blocks_behind = 0
self.catchup_progress = 0
# fired when the wallet actually unlocks (wallet_unlocked_d can be called multiple times)
self.wallet_unlock_success = defer.Deferred()
self.is_wallet_unlocked = None
def _is_first_run(self):
return (not self.printed_retrieving_headers and
@ -953,21 +951,23 @@ class LBRYumWallet(Wallet):
return self._cmd_runner
def check_locked(self):
if not self.wallet.use_encryption:
log.info("Wallet is not encrypted")
self.wallet_unlock_success.callback(True)
elif not self._cmd_runner:
"""
Checks if the wallet is encrypted(locked) or not
:return: (boolean) indicating whether the wallet is locked or not
"""
if not self._cmd_runner:
raise Exception("Command runner hasn't been initialized yet")
elif self._cmd_runner.locked:
log.info("Waiting for wallet password")
self.wallet_unlocked_d.addCallback(self.unlock)
return self.wallet_unlock_success
return self.is_wallet_unlocked
def unlock(self, password):
if self._cmd_runner and self._cmd_runner.locked:
try:
self._cmd_runner.unlock_wallet(password)
self.wallet_unlock_success.callback(True)
self.is_wallet_unlocked = True
log.info("Unlocked the wallet!")
except InvalidPassword:
log.warning("Incorrect password, try again")
@ -1054,6 +1054,7 @@ class LBRYumWallet(Wallet):
wallet.create_main_account()
wallet.synchronize()
self.wallet = wallet
self.is_wallet_unlocked = not self.wallet.use_encryption
self._check_large_wallet()
return defer.succeed(True)

View file

@ -5,3 +5,5 @@ This includes classes for connecting to other peers and downloading blobs from t
connections from peers and responding to their requests, managing locally stored blobs, sending
and receiving payments, and locating peers in the DHT.
"""
from lbrynet import custom_logger

View file

@ -1,8 +1,6 @@
import inspect
import json
import logging
import logging.handlers
import os
import sys
import traceback
@ -13,25 +11,6 @@ import twisted.python.log
from lbrynet import __version__ as lbrynet_version, build_type, conf
from lbrynet.core import utils
####
# This code is copied from logging/__init__.py in the python source code
####
#
# _srcfile is used when walking the stack to check when we've got the first
# caller stack frame.
#
if hasattr(sys, 'frozen'): # support for py2exe
_srcfile = "logging%s__init__%s" % (os.sep, __file__[-4:])
elif __file__[-4:].lower() in ['.pyc', '.pyo']:
_srcfile = __file__[:-4] + '.py'
else:
_srcfile = __file__
_srcfile = os.path.normcase(_srcfile)
#####
TRACE = 5
class HTTPSHandler(logging.Handler):
def __init__(self, url, fqdn=False, localname=None, facility=None, cookies=None):
@ -139,6 +118,8 @@ def get_loggly_url(token=None, version=None):
def configure_loggly_handler():
if build_type.BUILD == 'dev':
return
if not conf.settings['share_usage_data']:
return
level = logging.ERROR
handler = get_loggly_handler(level=level, installation_id=conf.settings.installation_id,
session_id=conf.settings.get_session_id())
@ -185,33 +166,6 @@ class JsonFormatter(logging.Formatter):
return json.dumps(data)
####
# This code is copied from logging/__init__.py in the python source code
####
def findCaller(srcfile=None):
"""Returns the filename, line number and function name of the caller"""
srcfile = srcfile or _srcfile
f = inspect.currentframe()
# On some versions of IronPython, currentframe() returns None if
# IronPython isn't run with -X:Frames.
if f is not None:
f = f.f_back
rv = "(unknown file)", 0, "(unknown function)"
while hasattr(f, "f_code"):
co = f.f_code
filename = os.path.normcase(co.co_filename)
# ignore any function calls that are in this file
if filename == srcfile:
f = f.f_back
continue
rv = (filename, f.f_lineno, co.co_name)
break
return rv
###
def failure(failure, log, msg, *args):
"""Log a failure message from a deferred.
@ -316,65 +270,3 @@ def get_parent(logger_name):
return ''
names = names[:-1]
return '.'.join(names)
class Logger(logging.Logger):
"""A logger that has an extra `fail` method useful for handling twisted failures."""
def fail(self, callback=None, *args, **kwargs):
"""Returns a function to log a failure from an errback.
The returned function appends the error message and extracts
the traceback from `err`.
Example usage:
d.addErrback(log.fail(), 'This is an error message')
Although odd, making the method call is necessary to extract
out useful filename and line number information; otherwise the
reported values are from inside twisted's deferred handling
code.
Args:
callback: callable to call after making the log. The first argument
will be the `err` from the deferred
args: extra arguments to pass into `callback`
Returns: a function that takes the following arguments:
err: twisted.python.failure.Failure
msg: the message to log, using normal logging string iterpolation.
msg_args: the values to subtitute into `msg`
msg_kwargs: set `level` to change from the default ERROR severity. Other
keywoards are treated as normal log kwargs.
"""
fn, lno, func = findCaller()
def _fail(err, msg, *msg_args, **msg_kwargs):
level = msg_kwargs.pop('level', logging.ERROR)
msg += ": %s"
msg_args += (err.getErrorMessage(),)
exc_info = (err.type, err.value, err.getTracebackObject())
record = self.makeRecord(
self.name, level, fn, lno, msg, msg_args, exc_info, func, msg_kwargs)
self.handle(record)
if callback:
try:
return callback(err, *args, **kwargs)
except Exception:
# log.fail is almost always called within an
# errback. If callback fails and we didn't catch
# the exception we would need to attach a second
# errback to deal with that, which we will almost
# never do and then we end up with an unhandled
# error that will get swallowed by twisted
self.exception('Failed to run callback')
return _fail
def trace(self, msg, *args, **kwargs):
if self.isEnabledFor(TRACE):
self._log(TRACE, msg, args, **kwargs)
logging.setLoggerClass(Logger)
logging.addLevelName(TRACE, 'TRACE')

106
lbrynet/custom_logger.py Normal file
View file

@ -0,0 +1,106 @@
import os
import sys
import inspect
import logging
TRACE = 5
####
# This code is copied from logging/__init__.py in the python source code
####
#
# _srcfile is used when walking the stack to check when we've got the first
# caller stack frame.
#
if hasattr(sys, 'frozen'): # support for py2exe
_srcfile = "logging%s__init__%s" % (os.sep, __file__[-4:])
elif __file__[-4:].lower() in ['.pyc', '.pyo']:
_srcfile = __file__[:-4] + '.py'
else:
_srcfile = __file__
_srcfile = os.path.normcase(_srcfile)
def findCaller(srcfile=None):
"""Returns the filename, line number and function name of the caller"""
srcfile = srcfile or _srcfile
f = inspect.currentframe()
# On some versions of IronPython, currentframe() returns None if
# IronPython isn't run with -X:Frames.
if f is not None:
f = f.f_back
rv = "(unknown file)", 0, "(unknown function)"
while hasattr(f, "f_code"):
co = f.f_code
filename = os.path.normcase(co.co_filename)
# ignore any function calls that are in this file
if filename == srcfile:
f = f.f_back
continue
rv = (filename, f.f_lineno, co.co_name)
break
return rv
###
class Logger(logging.Logger):
"""A logger that has an extra `fail` method useful for handling twisted failures."""
def fail(self, callback=None, *args, **kwargs):
"""Returns a function to log a failure from an errback.
The returned function appends the error message and extracts
the traceback from `err`.
Example usage:
d.addErrback(log.fail(), 'This is an error message')
Although odd, making the method call is necessary to extract
out useful filename and line number information; otherwise the
reported values are from inside twisted's deferred handling
code.
Args:
callback: callable to call after making the log. The first argument
will be the `err` from the deferred
args: extra arguments to pass into `callback`
Returns: a function that takes the following arguments:
err: twisted.python.failure.Failure
msg: the message to log, using normal logging string iterpolation.
msg_args: the values to subtitute into `msg`
msg_kwargs: set `level` to change from the default ERROR severity. Other
keywoards are treated as normal log kwargs.
"""
fn, lno, func = findCaller()
def _fail(err, msg, *msg_args, **msg_kwargs):
level = msg_kwargs.pop('level', logging.ERROR)
msg += ": %s"
msg_args += (err.getErrorMessage(),)
exc_info = (err.type, err.value, err.getTracebackObject())
record = self.makeRecord(
self.name, level, fn, lno, msg, msg_args, exc_info, func, msg_kwargs)
self.handle(record)
if callback:
try:
return callback(err, *args, **kwargs)
except Exception:
# log.fail is almost always called within an
# errback. If callback fails and we didn't catch
# the exception we would need to attach a second
# errback to deal with that, which we will almost
# never do and then we end up with an unhandled
# error that will get swallowed by twisted
self.exception('Failed to run callback')
return _fail
def trace(self, msg, *args, **kwargs):
if self.isEnabledFor(TRACE):
self._log(TRACE, msg, args, **kwargs)
logging.setLoggerClass(Logger)
logging.addLevelName(TRACE, 'TRACE')

View file

@ -0,0 +1,72 @@
import logging
from twisted.internet import defer
from twisted._threads import AlreadyQuit
from ComponentManager import ComponentManager
log = logging.getLogger(__name__)
class ComponentType(type):
def __new__(mcs, name, bases, newattrs):
klass = type.__new__(mcs, name, bases, newattrs)
if name != "Component":
ComponentManager.default_component_classes[klass.component_name] = klass
return klass
class Component(object):
"""
lbrynet-daemon component helper
Inheriting classes will be automatically registered with the ComponentManager and must implement setup and stop
methods
"""
__metaclass__ = ComponentType
depends_on = []
component_name = None
def __init__(self, component_manager):
self.component_manager = component_manager
self._running = False
def __lt__(self, other):
return self.component_name < other.component_name
@property
def running(self):
return self._running
def start(self):
raise NotImplementedError()
def stop(self):
raise NotImplementedError()
@property
def component(self):
raise NotImplementedError()
@defer.inlineCallbacks
def _setup(self):
try:
result = yield defer.maybeDeferred(self.start)
self._running = True
defer.returnValue(result)
except (defer.CancelledError, AlreadyQuit):
pass
except Exception as err:
log.exception("Error setting up %s", self.component_name or self.__class__.__name__)
raise err
@defer.inlineCallbacks
def _stop(self):
try:
result = yield defer.maybeDeferred(self.stop)
self._running = False
defer.returnValue(result)
except (defer.CancelledError, AlreadyQuit):
pass
except Exception as err:
log.exception("Error stopping %s", self.__class__.__name__)
raise err

View file

@ -0,0 +1,177 @@
import logging
from twisted.internet import defer
from lbrynet.core.Error import ComponentStartConditionNotMet
log = logging.getLogger(__name__)
class RegisteredConditions(object):
conditions = {}
class RequiredConditionType(type):
def __new__(mcs, name, bases, newattrs):
klass = type.__new__(mcs, name, bases, newattrs)
if name != "RequiredCondition":
if klass.name in RegisteredConditions.conditions:
raise SyntaxError("already have a component registered for \"%s\"" % klass.name)
RegisteredConditions.conditions[klass.name] = klass
return klass
class RequiredCondition(object):
name = ""
component = ""
message = ""
@staticmethod
def evaluate(component):
raise NotImplementedError()
__metaclass__ = RequiredConditionType
class ComponentManager(object):
default_component_classes = {}
def __init__(self, reactor=None, analytics_manager=None, skip_components=None, **override_components):
self.skip_components = skip_components or []
self.reactor = reactor
self.component_classes = {}
self.components = set()
self.analytics_manager = analytics_manager
for component_name, component_class in self.default_component_classes.iteritems():
if component_name in override_components:
component_class = override_components.pop(component_name)
if component_name not in self.skip_components:
self.component_classes[component_name] = component_class
if override_components:
raise SyntaxError("unexpected components: %s" % override_components)
for component_class in self.component_classes.itervalues():
self.components.add(component_class(self))
@defer.inlineCallbacks
def evaluate_condition(self, condition_name):
if condition_name not in RegisteredConditions.conditions:
raise NameError(condition_name)
condition = RegisteredConditions.conditions[condition_name]
try:
component = self.get_component(condition.component)
result = yield defer.maybeDeferred(condition.evaluate, component)
except Exception as err:
result = False
defer.returnValue((result, "" if result else condition.message))
def sort_components(self, reverse=False):
"""
Sort components by requirements
"""
steps = []
staged = set()
components = set(self.components)
# components with no requirements
step = []
for component in set(components):
if not component.depends_on:
step.append(component)
staged.add(component.component_name)
components.remove(component)
if step:
step.sort()
steps.append(step)
while components:
step = []
to_stage = set()
for component in set(components):
reqs_met = 0
for needed in component.depends_on:
if needed in staged:
reqs_met += 1
if reqs_met == len(component.depends_on):
step.append(component)
to_stage.add(component.component_name)
components.remove(component)
if step:
step.sort()
staged.update(to_stage)
steps.append(step)
elif components:
raise ComponentStartConditionNotMet("Unresolved dependencies for: %s" % components)
if reverse:
steps.reverse()
return steps
@defer.inlineCallbacks
def setup(self, **callbacks):
"""
Start Components in sequence sorted by requirements
:return: (defer.Deferred)
"""
for component_name, cb in callbacks.iteritems():
if component_name not in self.component_classes:
raise NameError("unknown component: %s" % component_name)
if not callable(cb):
raise ValueError("%s is not callable" % cb)
def _setup(component):
if component.component_name in callbacks:
d = component._setup()
d.addCallback(callbacks[component.component_name], component)
return d
return component._setup()
stages = self.sort_components()
for stage in stages:
yield defer.DeferredList([_setup(component) for component in stage])
@defer.inlineCallbacks
def stop(self):
"""
Stop Components in reversed startup order
:return: (defer.Deferred)
"""
stages = self.sort_components(reverse=True)
for stage in stages:
yield defer.DeferredList([component._stop() for component in stage if component.running])
def all_components_running(self, *component_names):
"""
Check if components are running
:return: (bool) True if all specified components are running
"""
components = {component.component_name: component for component in self.components}
for component in component_names:
if component not in components:
raise NameError("%s is not a known Component" % component)
if not components[component].running:
return False
return True
def get_components_status(self):
"""
List status of all the components, whether they are running or not
:return: (dict) {(str) component_name: (bool) True is running else False}
"""
return {
component.component_name: component.running
for component in self.components
}
def get_component(self, component_name):
for component in self.components:
if component.component_name == component_name:
return component.component
raise NameError(component_name)

View file

@ -0,0 +1,552 @@
import os
import logging
import miniupnpc
from twisted.internet import defer, threads, reactor, error
from lbrynet import conf
from lbrynet.core.Session import Session
from lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier, EncryptedFileStreamType
from lbrynet.core.Wallet import LBRYumWallet
from lbrynet.core.server.BlobRequestHandler import BlobRequestHandlerFactory
from lbrynet.core.server.ServerProtocol import ServerProtocolFactory
from lbrynet.daemon.Component import Component
from lbrynet.daemon.ExchangeRateManager import ExchangeRateManager
from lbrynet.database.storage import SQLiteStorage
from lbrynet.dht import node, hashannouncer
from lbrynet.file_manager.EncryptedFileManager import EncryptedFileManager
from lbrynet.lbry_file.client.EncryptedFileDownloader import EncryptedFileSaverFactory
from lbrynet.lbry_file.client.EncryptedFileOptions import add_lbry_file_to_sd_identifier
from lbrynet.reflector import ServerFactory as reflector_server_factory
from lbrynet.core.utils import generate_id
log = logging.getLogger(__name__)
# settings must be initialized before this file is imported
DATABASE_COMPONENT = "database"
WALLET_COMPONENT = "wallet"
SESSION_COMPONENT = "session"
DHT_COMPONENT = "dht"
HASH_ANNOUNCER_COMPONENT = "hash_announcer"
STREAM_IDENTIFIER_COMPONENT = "stream_identifier"
FILE_MANAGER_COMPONENT = "file_manager"
PEER_PROTOCOL_SERVER_COMPONENT = "peer_protocol_server"
REFLECTOR_COMPONENT = "reflector"
UPNP_COMPONENT = "upnp"
EXCHANGE_RATE_MANAGER_COMPONENT = "exchange_rate_manager"
class ConfigSettings(object):
@staticmethod
def get_conf_setting(setting_name):
return conf.settings[setting_name]
@staticmethod
def get_blobfiles_dir():
if conf.settings['BLOBFILES_DIR'] == "blobfiles":
return os.path.join(GCS("data_dir"), "blobfiles")
else:
log.info("Using non-default blobfiles directory: %s", conf.settings['BLOBFILES_DIR'])
return conf.settings['BLOBFILES_DIR']
@staticmethod
def get_node_id():
return conf.settings.node_id
@staticmethod
def get_external_ip():
from lbrynet.core.system_info import get_platform
platform = get_platform(get_ip=True)
return platform['ip']
# Shorthand for common ConfigSettings methods
CS = ConfigSettings
GCS = ConfigSettings.get_conf_setting
class DatabaseComponent(Component):
component_name = DATABASE_COMPONENT
def __init__(self, component_manager):
Component.__init__(self, component_manager)
self.storage = None
@property
def component(self):
return self.storage
@staticmethod
def get_current_db_revision():
return 9
@staticmethod
def get_revision_filename():
return conf.settings.get_db_revision_filename()
@staticmethod
def _write_db_revision_file(version_num):
with open(conf.settings.get_db_revision_filename(), mode='w') as db_revision:
db_revision.write(str(version_num))
@defer.inlineCallbacks
def start(self):
# check directories exist, create them if they don't
log.info("Loading databases")
if not os.path.exists(GCS('download_directory')):
os.mkdir(GCS('download_directory'))
if not os.path.exists(GCS('data_dir')):
os.mkdir(GCS('data_dir'))
self._write_db_revision_file(self.get_current_db_revision())
log.debug("Created the db revision file: %s", self.get_revision_filename())
if not os.path.exists(CS.get_blobfiles_dir()):
os.mkdir(CS.get_blobfiles_dir())
log.debug("Created the blobfile directory: %s", str(CS.get_blobfiles_dir()))
if not os.path.exists(self.get_revision_filename()):
log.warning("db_revision file not found. Creating it")
self._write_db_revision_file(self.get_current_db_revision())
# check the db migration and run any needed migrations
with open(self.get_revision_filename(), "r") as revision_read_handle:
old_revision = int(revision_read_handle.read().strip())
if old_revision > self.get_current_db_revision():
raise Exception('This version of lbrynet is not compatible with the database\n'
'Your database is revision %i, expected %i' %
(old_revision, self.get_current_db_revision()))
if old_revision < self.get_current_db_revision():
from lbrynet.database.migrator import dbmigrator
log.info("Upgrading your databases (revision %i to %i)", old_revision, self.get_current_db_revision())
yield threads.deferToThread(
dbmigrator.migrate_db, GCS('data_dir'), old_revision, self.get_current_db_revision()
)
self._write_db_revision_file(self.get_current_db_revision())
log.info("Finished upgrading the databases.")
# start SQLiteStorage
self.storage = SQLiteStorage(GCS('data_dir'))
yield self.storage.setup()
@defer.inlineCallbacks
def stop(self):
yield self.storage.stop()
self.storage = None
class WalletComponent(Component):
component_name = WALLET_COMPONENT
depends_on = [DATABASE_COMPONENT]
def __init__(self, component_manager):
Component.__init__(self, component_manager)
self.wallet = None
@property
def component(self):
return self.wallet
@defer.inlineCallbacks
def start(self):
storage = self.component_manager.get_component(DATABASE_COMPONENT)
wallet_type = GCS('wallet')
if wallet_type == conf.LBRYCRD_WALLET:
raise ValueError('LBRYcrd Wallet is no longer supported')
elif wallet_type == conf.LBRYUM_WALLET:
log.info("Using lbryum wallet")
lbryum_servers = {address: {'t': str(port)}
for address, port in GCS('lbryum_servers')}
config = {
'auto_connect': True,
'chain': GCS('blockchain_name'),
'default_servers': lbryum_servers
}
if 'use_keyring' in conf.settings:
config['use_keyring'] = GCS('use_keyring')
if conf.settings['lbryum_wallet_dir']:
config['lbryum_path'] = GCS('lbryum_wallet_dir')
self.wallet = LBRYumWallet(storage, config)
yield self.wallet.start()
else:
raise ValueError('Wallet Type {} is not valid'.format(wallet_type))
@defer.inlineCallbacks
def stop(self):
yield self.wallet.stop()
self.wallet = None
class SessionComponent(Component):
component_name = SESSION_COMPONENT
depends_on = [DATABASE_COMPONENT, WALLET_COMPONENT, DHT_COMPONENT, HASH_ANNOUNCER_COMPONENT]
def __init__(self, component_manager):
Component.__init__(self, component_manager)
self.session = None
@property
def component(self):
return self.session
@defer.inlineCallbacks
def start(self):
self.session = Session(
GCS('data_rate'),
db_dir=GCS('data_dir'),
node_id=CS.get_node_id(),
blob_dir=CS.get_blobfiles_dir(),
dht_node=self.component_manager.get_component(DHT_COMPONENT),
hash_announcer=self.component_manager.get_component(HASH_ANNOUNCER_COMPONENT),
dht_node_port=GCS('dht_node_port'),
known_dht_nodes=GCS('known_dht_nodes'),
peer_port=GCS('peer_port'),
wallet=self.component_manager.get_component(WALLET_COMPONENT),
external_ip=CS.get_external_ip(),
storage=self.component_manager.get_component(DATABASE_COMPONENT)
)
yield self.session.setup()
@defer.inlineCallbacks
def stop(self):
yield self.session.shut_down()
class DHTComponent(Component):
component_name = DHT_COMPONENT
depends_on = [UPNP_COMPONENT]
def __init__(self, component_manager):
Component.__init__(self, component_manager)
self.dht_node = None
self.upnp_component = None
self.udp_port, self.peer_port = None, None
@property
def component(self):
return self.dht_node
@defer.inlineCallbacks
def start(self):
self.upnp_component = self.component_manager.get_component(UPNP_COMPONENT)
self.peer_port, self.udp_port = self.upnp_component.get_redirects()
node_id = CS.get_node_id()
if node_id is None:
node_id = generate_id()
self.dht_node = node.Node(
node_id=node_id,
udpPort=self.udp_port,
externalIP=CS.get_external_ip(),
peerPort=self.peer_port
)
self.dht_node.start_listening()
yield self.dht_node._protocol._listening
d = self.dht_node.joinNetwork(GCS('known_dht_nodes'))
d.addCallback(lambda _: self.dht_node.start_looping_calls())
d.addCallback(lambda _: log.info("Joined the dht"))
log.info("Started the dht")
@defer.inlineCallbacks
def stop(self):
yield self.dht_node.stop()
class HashAnnouncerComponent(Component):
component_name = HASH_ANNOUNCER_COMPONENT
depends_on = [DHT_COMPONENT, DATABASE_COMPONENT]
def __init__(self, component_manager):
Component.__init__(self, component_manager)
self.hash_announcer = None
@property
def component(self):
return self.hash_announcer
@defer.inlineCallbacks
def start(self):
storage = self.component_manager.get_component(DATABASE_COMPONENT)
dht_node = self.component_manager.get_component(DHT_COMPONENT)
self.hash_announcer = hashannouncer.DHTHashAnnouncer(dht_node, storage)
yield self.hash_announcer.start()
@defer.inlineCallbacks
def stop(self):
yield self.hash_announcer.stop()
class StreamIdentifierComponent(Component):
component_name = STREAM_IDENTIFIER_COMPONENT
depends_on = [SESSION_COMPONENT]
def __init__(self, component_manager):
Component.__init__(self, component_manager)
self.sd_identifier = StreamDescriptorIdentifier()
@property
def component(self):
return self.sd_identifier
@defer.inlineCallbacks
def start(self):
session = self.component_manager.get_component(SESSION_COMPONENT)
add_lbry_file_to_sd_identifier(self.sd_identifier)
file_saver_factory = EncryptedFileSaverFactory(
session.peer_finder,
session.rate_limiter,
session.blob_manager,
session.storage,
session.wallet,
GCS('download_directory')
)
yield self.sd_identifier.add_stream_downloader_factory(EncryptedFileStreamType, file_saver_factory)
def stop(self):
pass
class FileManagerComponent(Component):
component_name = FILE_MANAGER_COMPONENT
depends_on = [SESSION_COMPONENT, STREAM_IDENTIFIER_COMPONENT]
def __init__(self, component_manager):
Component.__init__(self, component_manager)
self.file_manager = None
@property
def component(self):
return self.file_manager
@defer.inlineCallbacks
def start(self):
session = self.component_manager.get_component(SESSION_COMPONENT)
sd_identifier = self.component_manager.get_component(STREAM_IDENTIFIER_COMPONENT)
log.info('Starting the file manager')
self.file_manager = EncryptedFileManager(session, sd_identifier)
yield self.file_manager.setup()
log.info('Done setting up file manager')
@defer.inlineCallbacks
def stop(self):
yield self.file_manager.stop()
class PeerProtocolServerComponent(Component):
component_name = PEER_PROTOCOL_SERVER_COMPONENT
depends_on = [SESSION_COMPONENT, UPNP_COMPONENT]
def __init__(self, component_manager):
Component.__init__(self, component_manager)
self.lbry_server_port = None
@property
def component(self):
return self.lbry_server_port
@defer.inlineCallbacks
def start(self):
query_handlers = {}
upnp_component = self.component_manager.get_component(UPNP_COMPONENT)
peer_port, udp_port = upnp_component.get_redirects()
session = self.component_manager.get_component(SESSION_COMPONENT)
handlers = [
BlobRequestHandlerFactory(
session.blob_manager,
session.wallet,
session.payment_rate_manager,
self.component_manager.analytics_manager
),
session.wallet.get_wallet_info_query_handler_factory(),
]
for handler in handlers:
query_id = handler.get_primary_query_identifier()
query_handlers[query_id] = handler
if peer_port is not None:
server_factory = ServerProtocolFactory(session.rate_limiter, query_handlers, session.peer_manager)
try:
log.info("Peer protocol listening on TCP %d", peer_port)
self.lbry_server_port = yield reactor.listenTCP(peer_port, server_factory)
except error.CannotListenError as e:
import traceback
log.error("Couldn't bind to port %d. Visit lbry.io/faq/how-to-change-port for"
" more details.", peer_port)
log.error("%s", traceback.format_exc())
raise ValueError("%s lbrynet may already be running on your computer." % str(e))
@defer.inlineCallbacks
def stop(self):
if self.lbry_server_port is not None:
self.lbry_server_port, old_port = None, self.lbry_server_port
log.info('Stop listening on port %s', old_port.port)
yield old_port.stopListening()
class ReflectorComponent(Component):
component_name = REFLECTOR_COMPONENT
depends_on = [SESSION_COMPONENT, FILE_MANAGER_COMPONENT]
def __init__(self, component_manager):
Component.__init__(self, component_manager)
self.reflector_server_port = GCS('reflector_port')
self.reflector_server = None
@property
def component(self):
return self.reflector_server
@defer.inlineCallbacks
def start(self):
log.info("Starting reflector server")
session = self.component_manager.get_component(SESSION_COMPONENT)
file_manager = self.component_manager.get_component(FILE_MANAGER_COMPONENT)
reflector_factory = reflector_server_factory(session.peer_manager, session.blob_manager, file_manager)
try:
self.reflector_server = yield reactor.listenTCP(self.reflector_server_port, reflector_factory)
log.info('Started reflector on port %s', self.reflector_server_port)
except error.CannotListenError as e:
log.exception("Couldn't bind reflector to port %d", self.reflector_server_port)
raise ValueError("{} lbrynet may already be running on your computer.".format(e))
@defer.inlineCallbacks
def stop(self):
if self.reflector_server is not None:
log.info("Stopping reflector server")
self.reflector_server, p = None, self.reflector_server
yield p.stopListening
class UPnPComponent(Component):
component_name = UPNP_COMPONENT
def __init__(self, component_manager):
Component.__init__(self, component_manager)
self.peer_port = GCS('peer_port')
self.dht_node_port = GCS('dht_node_port')
self.use_upnp = GCS('use_upnp')
self.external_ip = CS.get_external_ip()
self.upnp_redirects = []
@property
def component(self):
return self
def get_redirects(self):
return self.peer_port, self.dht_node_port
def start(self):
log.debug("In _try_upnp")
def get_free_port(upnp, port, protocol):
# returns an existing mapping if it exists
mapping = upnp.getspecificportmapping(port, protocol)
if not mapping:
return port
if upnp.lanaddr == mapping[0]:
return mapping[1]
return get_free_port(upnp, port + 1, protocol)
def get_port_mapping(upnp, port, protocol, description):
# try to map to the requested port, if there is already a mapping use the next external
# port available
if protocol not in ['UDP', 'TCP']:
raise Exception("invalid protocol")
port = get_free_port(upnp, port, protocol)
if isinstance(port, tuple):
log.info("Found existing UPnP redirect %s:%i (%s) to %s:%i, using it",
self.external_ip, port, protocol, upnp.lanaddr, port)
return port
upnp.addportmapping(port, protocol, upnp.lanaddr, port,
description, '')
log.info("Set UPnP redirect %s:%i (%s) to %s:%i", self.external_ip, port,
protocol, upnp.lanaddr, port)
return port
def threaded_try_upnp():
if self.use_upnp is False:
log.debug("Not using upnp")
return False
u = miniupnpc.UPnP()
num_devices_found = u.discover()
if num_devices_found > 0:
u.selectigd()
external_ip = u.externalipaddress()
if external_ip != '0.0.0.0' and not self.external_ip:
# best not to rely on this external ip, the router can be behind layers of NATs
self.external_ip = external_ip
if self.peer_port:
self.peer_port = get_port_mapping(u, self.peer_port, 'TCP', 'LBRY peer port')
self.upnp_redirects.append((self.peer_port, 'TCP'))
if self.dht_node_port:
self.dht_node_port = get_port_mapping(u, self.dht_node_port, 'UDP', 'LBRY DHT port')
self.upnp_redirects.append((self.dht_node_port, 'UDP'))
return True
return False
def upnp_failed(err):
log.warning("UPnP failed. Reason: %s", err.getErrorMessage())
return False
d = threads.deferToThread(threaded_try_upnp)
d.addErrback(upnp_failed)
return d
def stop(self):
log.info("Unsetting upnp for session")
def threaded_unset_upnp():
if self.use_upnp is False:
log.debug("Not using upnp")
return False
u = miniupnpc.UPnP()
num_devices_found = u.discover()
if num_devices_found > 0:
u.selectigd()
for port, protocol in self.upnp_redirects:
if u.getspecificportmapping(port, protocol) is None:
log.warning(
"UPnP redirect for %s %d was removed by something else.",
protocol, port)
else:
u.deleteportmapping(port, protocol)
log.info("Removed UPnP redirect for %s %d.", protocol, port)
self.upnp_redirects = []
d = threads.deferToThread(threaded_unset_upnp)
d.addErrback(lambda err: str(err))
return d
class ExchangeRateManagerComponent(Component):
component_name = EXCHANGE_RATE_MANAGER_COMPONENT
def __init__(self, component_manager):
Component.__init__(self, component_manager)
self.exchange_rate_manager = ExchangeRateManager()
@property
def component(self):
return self.exchange_rate_manager
@defer.inlineCallbacks
def start(self):
yield self.exchange_rate_manager.start()
@defer.inlineCallbacks
def stop(self):
yield self.exchange_rate_manager.stop()

File diff suppressed because it is too large Load diff

View file

@ -7,7 +7,7 @@ from collections import OrderedDict
from lbrynet import conf
from lbrynet.core import utils
from lbrynet.daemon.auth.client import JSONRPCException, LBRYAPIClient, AuthAPIClient
from lbrynet.daemon.Daemon import LOADING_WALLET_CODE, Daemon
from lbrynet.daemon.Daemon import Daemon
from lbrynet.core.system_info import get_platform
from jsonrpc.common import RPCError
from requests.exceptions import ConnectionError
@ -21,17 +21,13 @@ def remove_brackets(key):
return key
def set_flag_vals(flag_names, parsed_args):
def set_kwargs(parsed_args):
kwargs = OrderedDict()
for key, arg in parsed_args.iteritems():
if arg is None:
continue
elif key.startswith("--"):
if remove_brackets(key[2:]) not in kwargs:
k = remove_brackets(key[2:])
elif key in flag_names:
if remove_brackets(flag_names[key]) not in kwargs:
k = remove_brackets(flag_names[key])
elif key.startswith("--") and remove_brackets(key[2:]) not in kwargs:
k = remove_brackets(key[2:])
elif remove_brackets(key) not in kwargs:
k = remove_brackets(key)
kwargs[k] = guess_type(arg, k)
@ -79,26 +75,22 @@ def main():
method = new_method
fn = Daemon.callable_methods[method]
if hasattr(fn, "_flags"):
flag_names = fn._flags
else:
flag_names = {}
parsed = docopt(fn.__doc__, args)
kwargs = set_flag_vals(flag_names, parsed)
kwargs = set_kwargs(parsed)
colorama.init()
conf.initialize_settings()
try:
api = LBRYAPIClient.get_client()
status = api.status()
api.status()
except (URLError, ConnectionError) as err:
if isinstance(err, HTTPError) and err.code == UNAUTHORIZED:
api = AuthAPIClient.config()
# this can happen if the daemon is using auth with the --http-auth flag
# when the config setting is to not use it
try:
status = api.status()
api.status()
except:
print_error("Daemon requires authentication, but none was provided.",
suggest_help=False)
@ -108,20 +100,6 @@ def main():
suggest_help=False)
return 1
status_code = status['startup_status']['code']
if status_code != "started" and method not in Daemon.allowed_during_startup:
print "Daemon is in the process of starting. Please try again in a bit."
message = status['startup_status']['message']
if message:
if (
status['startup_status']['code'] == LOADING_WALLET_CODE
and status['blockchain_status']['blocks_behind'] > 0
):
message += '. Blocks left: ' + str(status['blockchain_status']['blocks_behind'])
print " Status: " + message
return 1
# TODO: check if port is bound. Error if its not
try:

View file

@ -10,7 +10,6 @@ from lbrynet import analytics
from lbrynet import conf
from lbrynet.core import utils
from lbrynet.core import log_support
from lbrynet.daemon.DaemonServer import DaemonServer
from lbrynet.daemon.auth.client import LBRYAPIClient
from lbrynet.daemon.Daemon import Daemon
@ -175,18 +174,7 @@ def start_server_and_listen(use_auth, analytics_manager, quiet):
logging.getLogger("requests").setLevel(logging.CRITICAL)
analytics_manager.send_server_startup()
daemon_server = DaemonServer(analytics_manager)
try:
yield daemon_server.start(use_auth)
analytics_manager.send_server_startup_success()
if not quiet:
print "Started lbrynet-daemon!"
defer.returnValue(True)
except Exception as e:
log.exception('Failed to start lbrynet-daemon')
analytics_manager.send_server_startup_error(str(e))
daemon_server.stop()
raise
yield Daemon().start_listening()
def threaded_terminal(started_daemon, quiet):

View file

@ -12,13 +12,12 @@ from lbrynet.core import log_support
import argparse
import logging.handlers
from twisted.internet import defer, reactor
from twisted.internet import reactor
from jsonrpc.proxy import JSONRPCProxy
from lbrynet import analytics
from lbrynet import conf
from lbrynet.core import utils, system_info
from lbrynet.daemon.DaemonServer import DaemonServer
from lbrynet.daemon.Daemon import Daemon
log = logging.getLogger(__name__)
@ -71,6 +70,7 @@ def start():
lbrynet_log = conf.settings.get_log_filename()
log_support.configure_logging(lbrynet_log, not args.quiet, args.verbose)
log_support.configure_loggly_handler()
log.debug('Final Settings: %s', conf.settings.get_current_settings_dict())
try:
@ -84,8 +84,8 @@ def start():
log.info("Starting lbrynet-daemon from command line")
if test_internet_connection():
analytics_manager = analytics.Manager.new_instance()
start_server_and_listen(analytics_manager)
daemon = Daemon()
daemon.start_listening()
reactor.run()
else:
log.info("Not connected to internet, unable to start")
@ -101,24 +101,5 @@ def update_settings_from_args(args):
}, data_types=(conf.TYPE_CLI,))
@defer.inlineCallbacks
def start_server_and_listen(analytics_manager):
"""
Args:
use_auth: set to true to enable http authentication
analytics_manager: to send analytics
"""
analytics_manager.send_server_startup()
daemon_server = DaemonServer(analytics_manager)
try:
yield daemon_server.start(conf.settings['use_auth_http'])
analytics_manager.send_server_startup_success()
except Exception as e:
log.exception('Failed to start lbrynet-daemon')
analytics_manager.send_server_startup_error(str(e))
daemon_server.stop()
if __name__ == "__main__":
start()

View file

@ -1,77 +0,0 @@
import logging
import os
from twisted.web import server, guard, resource
from twisted.internet import defer, reactor, error
from twisted.cred import portal
from lbrynet import conf
from lbrynet.daemon.Daemon import Daemon
from lbrynet.daemon.auth.auth import PasswordChecker, HttpPasswordRealm
from lbrynet.daemon.auth.util import initialize_api_key_file
log = logging.getLogger(__name__)
class IndexResource(resource.Resource):
def getChild(self, name, request):
request.setHeader('cache-control', 'no-cache, no-store, must-revalidate')
request.setHeader('expires', '0')
return self if name == '' else resource.Resource.getChild(self, name, request)
class DaemonServer(object):
def __init__(self, analytics_manager=None):
self._daemon = None
self.root = None
self.server_port = None
self.analytics_manager = analytics_manager
def _setup_server(self, use_auth):
self.root = IndexResource()
self._daemon = Daemon(self.analytics_manager)
self.root.putChild("", self._daemon)
# TODO: DEPRECATED, remove this and just serve the API at the root
self.root.putChild(conf.settings['API_ADDRESS'], self._daemon)
lbrynet_server = get_site_base(use_auth, self.root)
try:
self.server_port = reactor.listenTCP(
conf.settings['api_port'], lbrynet_server, interface=conf.settings['api_host'])
log.info("lbrynet API listening on TCP %s:%i", conf.settings['api_host'], conf.settings['api_port'])
except error.CannotListenError:
log.info('Daemon already running, exiting app')
raise
return defer.succeed(True)
@defer.inlineCallbacks
def start(self, use_auth):
yield self._setup_server(use_auth)
yield self._daemon.setup()
def stop(self):
if reactor.running:
log.info("Stopping the reactor")
reactor.fireSystemEvent("shutdown")
def get_site_base(use_auth, root):
if use_auth:
log.info("Using authenticated API")
root = create_auth_session(root)
else:
log.info("Using non-authenticated API")
return server.Site(root)
def create_auth_session(root):
pw_path = os.path.join(conf.settings['data_dir'], ".api_keys")
initialize_api_key_file(pw_path)
checker = PasswordChecker.load_file(pw_path)
realm = HttpPasswordRealm(root)
portal_to_realm = portal.Portal(realm, [checker, ])
factory = guard.BasicCredentialFactory('Login to lbrynet api')
_lbrynet_server = guard.HTTPAuthSessionWrapper(portal_to_realm, [factory, ])
return _lbrynet_server

View file

@ -12,7 +12,7 @@ log = logging.getLogger(__name__)
CURRENCY_PAIRS = ["USDBTC", "BTCLBC"]
BITTREX_FEE = 0.0025
COINBASE_FEE = 0.0 #add fee
COINBASE_FEE = 0.0 # add fee
class ExchangeRate(object):
@ -37,6 +37,7 @@ class ExchangeRate(object):
class MarketFeed(object):
REQUESTS_TIMEOUT = 20
EXCHANGE_RATE_UPDATE_RATE_SEC = 300
def __init__(self, market, name, url, params, fee):
self.market = market
self.name = name
@ -115,7 +116,7 @@ class BittrexFeed(MarketFeed):
qtys = sum([i['Quantity'] for i in trades])
if totals <= 0 or qtys <= 0:
raise InvalidExchangeRateResponse(self.market, 'quantities were not positive')
vwap = totals/qtys
vwap = totals / qtys
return defer.succeed(float(1.0 / vwap))
@ -175,12 +176,11 @@ class CryptonatorBTCFeed(MarketFeed):
except ValueError:
raise InvalidExchangeRateResponse(self.name, "invalid rate response")
if 'ticker' not in json_response or len(json_response['ticker']) == 0 or \
'success' not in json_response or json_response['success'] is not True:
'success' not in json_response or json_response['success'] is not True:
raise InvalidExchangeRateResponse(self.name, 'result not found')
return defer.succeed(float(json_response['ticker']['price']))
class CryptonatorFeed(MarketFeed):
def __init__(self):
MarketFeed.__init__(
@ -198,7 +198,7 @@ class CryptonatorFeed(MarketFeed):
except ValueError:
raise InvalidExchangeRateResponse(self.name, "invalid rate response")
if 'ticker' not in json_response or len(json_response['ticker']) == 0 or \
'success' not in json_response or json_response['success'] is not True:
'success' not in json_response or json_response['success'] is not True:
raise InvalidExchangeRateResponse(self.name, 'result not found')
return defer.succeed(float(json_response['ticker']['price']))
@ -231,11 +231,11 @@ class ExchangeRateManager(object):
for market in self.market_feeds:
if (market.rate_is_initialized() and market.is_online() and
market.rate.currency_pair == (from_currency, to_currency)):
market.rate.currency_pair == (from_currency, to_currency)):
return amount * market.rate.spot
for market in self.market_feeds:
if (market.rate_is_initialized() and market.is_online() and
market.rate.currency_pair[0] == from_currency):
market.rate.currency_pair[0] == from_currency):
return self.convert_currency(
market.rate.currency_pair[1], to_currency, amount * market.rate.spot)
raise Exception(

View file

@ -1,3 +1,4 @@
from lbrynet import custom_logger
import Components # register Component classes
from lbrynet.daemon.auth.client import LBRYAPIClient
get_client = LBRYAPIClient.get_client

View file

@ -0,0 +1,38 @@
import logging
import os
from twisted.web import server, guard, resource
from twisted.cred import portal
from lbrynet import conf
from .auth import PasswordChecker, HttpPasswordRealm
from .util import initialize_api_key_file
log = logging.getLogger(__name__)
class AuthJSONRPCResource(resource.Resource):
def __init__(self, protocol):
resource.Resource.__init__(self)
self.putChild("", protocol)
self.putChild(conf.settings['API_ADDRESS'], protocol)
def getChild(self, name, request):
request.setHeader('cache-control', 'no-cache, no-store, must-revalidate')
request.setHeader('expires', '0')
return self if name == '' else resource.Resource.getChild(self, name, request)
def getServerFactory(self):
if conf.settings['use_auth_http']:
log.info("Using authenticated API")
pw_path = os.path.join(conf.settings['data_dir'], ".api_keys")
initialize_api_key_file(pw_path)
checker = PasswordChecker.load_file(pw_path)
realm = HttpPasswordRealm(self)
portal_to_realm = portal.Portal(realm, [checker, ])
factory = guard.BasicCredentialFactory('Login to lbrynet api')
root = guard.HTTPAuthSessionWrapper(portal_to_realm, [factory, ])
else:
log.info("Using non-authenticated API")
root = self
return server.Site(root)

View file

@ -2,8 +2,10 @@ import logging
import urlparse
import json
import inspect
import signal
from decimal import Decimal
from functools import wraps
from zope.interface import implements
from twisted.web import server, resource
from twisted.internet import defer
@ -12,13 +14,16 @@ from twisted.internet.error import ConnectionDone, ConnectionLost
from txjsonrpc import jsonrpclib
from traceback import format_exc
from lbrynet import conf
from lbrynet import conf, analytics
from lbrynet.core.Error import InvalidAuthenticationToken
from lbrynet.core import utils
from lbrynet.daemon.auth.util import APIKey, get_auth_message
from lbrynet.daemon.auth.client import LBRY_SECRET
from lbrynet.core.Error import ComponentsNotStarted, ComponentStartConditionNotMet
from lbrynet.core.looping_call_manager import LoopingCallManager
from lbrynet.daemon.ComponentManager import ComponentManager
from lbrynet.undecorated import undecorated
from .util import APIKey, get_auth_message
from .client import LBRY_SECRET
from .factory import AuthJSONRPCResource
log = logging.getLogger(__name__)
EMPTY_PARAMS = [{}]
@ -91,10 +96,6 @@ class UnknownAPIMethodError(Exception):
pass
class NotAllowedDuringStartupError(Exception):
pass
def trap(err, *to_trap):
err.trap(*to_trap)
@ -141,6 +142,29 @@ class AuthorizedBase(object):
return f
return _deprecated_wrapper
@staticmethod
def requires(*components, **conditions):
if conditions and ["conditions"] != conditions.keys():
raise SyntaxError("invalid conditions argument")
condition_names = conditions.get("conditions", [])
def _wrap(fn):
@defer.inlineCallbacks
@wraps(fn)
def _inner(*args, **kwargs):
component_manager = args[0].component_manager
for condition_name in condition_names:
condition_result, err_msg = yield component_manager.evaluate_condition(condition_name)
if not condition_result:
raise ComponentStartConditionNotMet(err_msg)
if not component_manager.all_components_running(*components):
raise ComponentsNotStarted("the following required components have not yet started: "
"%s" % json.dumps(components))
result = yield fn(*args, **kwargs)
defer.returnValue(result)
return _inner
return _wrap
class AuthJSONRPCServer(AuthorizedBase):
"""
@ -149,7 +173,6 @@ class AuthJSONRPCServer(AuthorizedBase):
API methods are named with a leading "jsonrpc_"
Attributes:
allowed_during_startup (list): list of api methods that are callable before the server has finished startup
sessions (dict): (dict): {<session id>: <lbrynet.daemon.auth.util.APIKey>}
callable_methods (dict): {<api method name>: <api method>}
@ -170,14 +193,85 @@ class AuthJSONRPCServer(AuthorizedBase):
isLeaf = True
allowed_during_startup = []
component_attributes = {}
def __init__(self, use_authentication=None):
def __init__(self, analytics_manager=None, component_manager=None, use_authentication=None, to_skip=None,
looping_calls=None):
self.analytics_manager = analytics_manager or analytics.Manager.new_instance()
self.component_manager = component_manager or ComponentManager(
analytics_manager=self.analytics_manager,
skip_components=to_skip or []
)
self.looping_call_manager = LoopingCallManager({n: lc for n, (lc, t) in (looping_calls or {}).iteritems()})
self._looping_call_times = {n: t for n, (lc, t) in (looping_calls or {}).iteritems()}
self._use_authentication = use_authentication or conf.settings['use_auth_http']
self._component_setup_deferred = None
self.announced_startup = False
self.sessions = {}
@defer.inlineCallbacks
def start_listening(self):
from twisted.internet import reactor, error as tx_error
try:
reactor.listenTCP(
conf.settings['api_port'], self.get_server_factory(), interface=conf.settings['api_host']
)
log.info("lbrynet API listening on TCP %s:%i", conf.settings['api_host'], conf.settings['api_port'])
yield self.setup()
self.analytics_manager.send_server_startup_success()
except tx_error.CannotListenError:
log.error('lbrynet API failed to bind TCP %s:%i for listening', conf.settings['api_host'],
conf.settings['api_port'])
reactor.fireSystemEvent("shutdown")
except defer.CancelledError:
log.info("shutting down before finished starting")
reactor.fireSystemEvent("shutdown")
except Exception as err:
self.analytics_manager.send_server_startup_error(str(err))
log.exception('Failed to start lbrynet-daemon')
reactor.fireSystemEvent("shutdown")
def setup(self):
return NotImplementedError()
from twisted.internet import reactor
reactor.addSystemEventTrigger('before', 'shutdown', self._shutdown)
if not self.analytics_manager.is_started:
self.analytics_manager.start()
for lc_name, lc_time in self._looping_call_times.iteritems():
self.looping_call_manager.start(lc_name, lc_time)
def update_attribute(setup_result, component):
setattr(self, self.component_attributes[component.component_name], component.component)
kwargs = {component: update_attribute for component in self.component_attributes.keys()}
self._component_setup_deferred = self.component_manager.setup(**kwargs)
return self._component_setup_deferred
@staticmethod
def _already_shutting_down(sig_num, frame):
log.info("Already shutting down")
def _shutdown(self):
# ignore INT/TERM signals once shutdown has started
signal.signal(signal.SIGINT, self._already_shutting_down)
signal.signal(signal.SIGTERM, self._already_shutting_down)
self.looping_call_manager.shutdown()
if self.analytics_manager:
self.analytics_manager.shutdown()
try:
self._component_setup_deferred.cancel()
except (AttributeError, defer.CancelledError):
pass
if self.component_manager is not None:
d = self.component_manager.stop()
d.addErrback(log.fail(), 'Failure while shutting down')
else:
d = defer.succeed(None)
return d
def get_server_factory(self):
return AuthJSONRPCResource(self).getServerFactory()
def _set_headers(self, request, data, update_secret=False):
if conf.settings['allowed_origin']:
@ -207,8 +301,9 @@ class AuthJSONRPCServer(AuthorizedBase):
else:
# last resort, just cast it as a string
error = JSONRPCError(str(failure))
log.warning("error processing api request: %s\ntraceback: %s", error.message,
"\n".join(error.traceback))
if not failure.check(ComponentsNotStarted, ComponentStartConditionNotMet):
log.warning("error processing api request: %s\ntraceback: %s", error.message,
"\n".join(error.traceback))
response_content = jsonrpc_dumps_pretty(error, id=id_)
self._set_headers(request, response_content)
request.setResponseCode(200)
@ -304,14 +399,6 @@ class AuthJSONRPCServer(AuthorizedBase):
request, request_id
)
return server.NOT_DONE_YET
except NotAllowedDuringStartupError:
log.warning('Function not allowed during startup: %s', function_name)
self._render_error(
JSONRPCError("This method is unavailable until the daemon is fully started",
code=JSONRPCError.CODE_INVALID_REQUEST),
request, request_id
)
return server.NOT_DONE_YET
if args == EMPTY_PARAMS or args == []:
_args, _kwargs = (), {}
@ -416,9 +503,6 @@ class AuthJSONRPCServer(AuthorizedBase):
def _verify_method_is_callable(self, function_path):
if function_path not in self.callable_methods:
raise UnknownAPIMethodError(function_path)
if not self.announced_startup:
if function_path not in self.allowed_during_startup:
raise NotAllowedDuringStartupError(function_path)
def _get_jsonrpc_method(self, function_path):
if function_path in self.deprecated_methods:

View file

@ -281,7 +281,9 @@ class Node(MockKademliaHelper):
yield self._protocol._listening
# TODO: Refresh all k-buckets further away than this node's closest neighbour
yield self.joinNetwork(known_node_addresses or [])
self.start_looping_calls()
def start_looping_calls(self):
self.safe_start_looping_call(self._change_token_lc, constants.tokenSecretChangeInterval)
# Start refreshing k-buckets periodically, if necessary
self.safe_start_looping_call(self._refresh_node_lc, constants.checkRefreshInterval)

View file

@ -39,6 +39,7 @@ DummyBlobAvailabilityTracker = mocks.BlobAvailabilityTracker
log_format = "%(funcName)s(): %(message)s"
logging.basicConfig(level=logging.CRITICAL, format=log_format)
TEST_SKIP_STRING_ANDROID = "Test cannot pass on Android because multiprocessing is not supported at the OS level."
def require_system(system):
def wrapper(fn):
@ -103,13 +104,14 @@ class LbryUploader(object):
rate_limiter = RateLimiter()
self.sd_identifier = StreamDescriptorIdentifier()
self.db_dir, self.blob_dir = mk_db_and_blob_dir()
dht_node = FakeNode(peer_finder=peer_finder, peer_manager=peer_manager, udpPort=4445, peerPort=5553,
node_id="abcd", externalIP="127.0.0.1")
self.session = Session(
conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=self.db_dir, blob_dir=self.blob_dir,
node_id="abcd", peer_finder=peer_finder, hash_announcer=hash_announcer,
peer_port=5553, dht_node_port=4445, use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
blob_tracker_class=DummyBlobAvailabilityTracker,
dht_node_class=FakeNode, is_generous=self.is_generous, external_ip="127.0.0.1")
peer_port=5553, dht_node_port=4445, rate_limiter=rate_limiter, wallet=wallet,
dht_node=dht_node, external_ip="127.0.0.1")
self.lbry_file_manager = EncryptedFileManager(self.session, self.sd_identifier)
if self.ul_rate_limit is not None:
self.session.rate_limiter.set_ul_limit(self.ul_rate_limit)
@ -197,12 +199,10 @@ def start_lbry_reuploader(sd_hash, kill_event, dead_event,
db_dir, blob_dir = mk_db_and_blob_dir()
session = Session(conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir,
node_id="abcd" + str(n), dht_node_port=4446, dht_node_class=FakeNode,
node_id="abcd" + str(n), dht_node_port=4446,
peer_finder=peer_finder, hash_announcer=hash_announcer,
blob_dir=blob_dir, peer_port=peer_port,
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
blob_tracker_class=DummyBlobAvailabilityTracker,
is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1],
rate_limiter=rate_limiter, wallet=wallet,
external_ip="127.0.0.1")
lbry_file_manager = EncryptedFileManager(session, sd_identifier)
@ -303,13 +303,14 @@ def start_blob_uploader(blob_hash_queue, kill_event, dead_event, slow, is_genero
db_dir, blob_dir = mk_db_and_blob_dir()
dht_node = FakeNode(peer_finder=peer_finder, peer_manager=peer_manager, udpPort=4445, peerPort=5553,
node_id="abcd", externalIP="127.0.0.1")
session = Session(conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, node_id="efgh",
peer_finder=peer_finder, hash_announcer=hash_announcer, dht_node_class=FakeNode,
peer_finder=peer_finder, hash_announcer=hash_announcer,
blob_dir=blob_dir, peer_port=peer_port, dht_node_port=4446,
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
blob_tracker_class=DummyBlobAvailabilityTracker,
is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1],
external_ip="127.0.0.1")
rate_limiter=rate_limiter, wallet=wallet,
external_ip="127.0.0.1", dht_node=dht_node)
if slow is True:
session.rate_limiter.set_ul_limit(2 ** 11)
@ -478,15 +479,16 @@ class TestTransfer(TestCase):
hash_announcer = FakeAnnouncer()
rate_limiter = DummyRateLimiter()
sd_identifier = StreamDescriptorIdentifier()
dht_node = FakeNode(peer_finder=peer_finder, peer_manager=peer_manager, udpPort=4445, peerPort=5553,
node_id="abcd", externalIP="127.0.0.1")
db_dir, blob_dir = mk_db_and_blob_dir()
self.session = Session(
conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir,
node_id="abcd", peer_finder=peer_finder, hash_announcer=hash_announcer,
blob_dir=blob_dir, peer_port=5553, dht_node_port=4445,
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
blob_tracker_class=DummyBlobAvailabilityTracker,
dht_node_class=FakeNode, is_generous=self.is_generous, external_ip="127.0.0.1")
rate_limiter=rate_limiter, wallet=wallet,
dht_node=dht_node, external_ip="127.0.0.1")
self.lbry_file_manager = EncryptedFileManager(
self.session, sd_identifier)
@ -566,15 +568,16 @@ class TestTransfer(TestCase):
peer_finder = FakePeerFinder(5553, peer_manager, 2)
hash_announcer = FakeAnnouncer()
rate_limiter = DummyRateLimiter()
dht_node = FakeNode(peer_finder=peer_finder, peer_manager=peer_manager, udpPort=4445, peerPort=5553,
node_id="abcd", externalIP="127.0.0.1")
db_dir, blob_dir = mk_db_and_blob_dir()
self.session = Session(
conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, node_id="abcd",
peer_finder=peer_finder, hash_announcer=hash_announcer,
blob_dir=blob_dir, peer_port=5553, dht_node_port=4445, dht_node_class=FakeNode,
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
blob_tracker_class=DummyBlobAvailabilityTracker,
is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1], external_ip="127.0.0.1")
blob_dir=blob_dir, peer_port=5553, dht_node_port=4445,
rate_limiter=rate_limiter, wallet=wallet,
dht_node=dht_node, external_ip="127.0.0.1")
d1 = self.wait_for_hash_from_queue(blob_hash_queue_1)
d2 = self.wait_for_hash_from_queue(blob_hash_queue_2)
@ -646,17 +649,17 @@ class TestTransfer(TestCase):
hash_announcer = FakeAnnouncer()
rate_limiter = DummyRateLimiter()
sd_identifier = StreamDescriptorIdentifier()
dht_node = FakeNode(peer_finder=peer_finder, peer_manager=peer_manager, udpPort=4445, peerPort=5553,
node_id="abcd", externalIP="127.0.0.1")
downloaders = []
db_dir, blob_dir = mk_db_and_blob_dir()
self.session = Session(conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir,
node_id="abcd", peer_finder=peer_finder, dht_node_port=4445, dht_node_class=FakeNode,
node_id="abcd", peer_finder=peer_finder, dht_node_port=4445,
hash_announcer=hash_announcer, blob_dir=blob_dir, peer_port=5553,
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
blob_tracker_class=DummyBlobAvailabilityTracker,
is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1],
external_ip="127.0.0.1")
rate_limiter=rate_limiter, wallet=wallet,
external_ip="127.0.0.1", dht_node=dht_node)
self.lbry_file_manager = EncryptedFileManager(self.session, sd_identifier)
@ -758,13 +761,11 @@ class TestTransfer(TestCase):
sd_identifier = StreamDescriptorIdentifier()
db_dir, blob_dir = mk_db_and_blob_dir()
self.session = Session(conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir, dht_node_class=FakeNode,
self.session = Session(conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=db_dir,
node_id="abcd", peer_finder=peer_finder, dht_node_port=4445,
hash_announcer=hash_announcer, blob_dir=blob_dir,
peer_port=5553, use_upnp=False, rate_limiter=rate_limiter,
wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker,
is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1],
external_ip="127.0.0.1")
peer_port=5553, rate_limiter=rate_limiter,
wallet=wallet, external_ip="127.0.0.1")
self.lbry_file_manager = EncryptedFileManager(
self.session, sd_identifier)
@ -842,3 +843,10 @@ class TestTransfer(TestCase):
d.addBoth(stop)
return d
if is_android():
test_lbry_transfer.skip = TEST_SKIP_STRING_ANDROID
test_last_blob_retrieval.skip = TEST_SKIP_STRING_ANDROID
test_double_download.skip = TEST_SKIP_STRING_ANDROID
test_multiple_uploaders.skip = TEST_SKIP_STRING_ANDROID

View file

@ -53,13 +53,13 @@ class TestReflector(unittest.TestCase):
db_dir=self.db_dir,
node_id="abcd",
peer_finder=peer_finder,
peer_manager=peer_manager,
blob_dir=self.blob_dir,
peer_port=5553,
dht_node_port=4444,
use_upnp=False,
wallet=wallet,
blob_tracker_class=mocks.BlobAvailabilityTracker,
external_ip="127.0.0.1",
dht_node=mocks.Node(),
hash_announcer=mocks.Announcer(),
)
@ -73,13 +73,13 @@ class TestReflector(unittest.TestCase):
db_dir=self.server_db_dir,
node_id="abcd",
peer_finder=peer_finder,
peer_manager=peer_manager,
blob_dir=self.server_blob_dir,
peer_port=5554,
dht_node_port=4443,
use_upnp=False,
wallet=wallet,
blob_tracker_class=mocks.BlobAvailabilityTracker,
external_ip="127.0.0.1",
dht_node=mocks.Node(),
hash_announcer=mocks.Announcer(),
)

View file

@ -30,6 +30,7 @@ DummyBlobAvailabilityTracker = mocks.BlobAvailabilityTracker
class TestStreamify(TestCase):
maxDiff = 5000
def setUp(self):
mocks.mock_conf_settings(self)
self.session = None
@ -37,6 +38,12 @@ class TestStreamify(TestCase):
self.is_generous = True
self.db_dir = tempfile.mkdtemp()
self.blob_dir = os.path.join(self.db_dir, "blobfiles")
self.dht_node = FakeNode()
self.wallet = FakeWallet()
self.peer_manager = PeerManager()
self.peer_finder = FakePeerFinder(5553, self.peer_manager, 2)
self.rate_limiter = DummyRateLimiter()
self.sd_identifier = StreamDescriptorIdentifier()
os.mkdir(self.blob_dir)
@defer.inlineCallbacks
@ -54,26 +61,17 @@ class TestStreamify(TestCase):
os.remove("test_file")
def test_create_stream(self):
wallet = FakeWallet()
peer_manager = PeerManager()
peer_finder = FakePeerFinder(5553, peer_manager, 2)
hash_announcer = FakeAnnouncer()
rate_limiter = DummyRateLimiter()
sd_identifier = StreamDescriptorIdentifier()
self.session = Session(
conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=self.db_dir, node_id="abcd",
peer_finder=peer_finder, hash_announcer=hash_announcer,
blob_dir=self.blob_dir, peer_port=5553,
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
blob_tracker_class=DummyBlobAvailabilityTracker,
is_generous=self.is_generous, external_ip="127.0.0.1", dht_node_class=mocks.Node
conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=self.db_dir, node_id="abcd", peer_finder=self.peer_finder,
blob_dir=self.blob_dir, peer_port=5553, rate_limiter=self.rate_limiter, wallet=self.wallet,
external_ip="127.0.0.1", dht_node=self.dht_node
)
self.lbry_file_manager = EncryptedFileManager(self.session, sd_identifier)
self.lbry_file_manager = EncryptedFileManager(self.session, self.sd_identifier)
d = self.session.setup()
d.addCallback(lambda _: add_lbry_file_to_sd_identifier(sd_identifier))
d.addCallback(lambda _: add_lbry_file_to_sd_identifier(self.sd_identifier))
d.addCallback(lambda _: self.lbry_file_manager.setup())
def verify_equal(sd_info):
@ -102,22 +100,14 @@ class TestStreamify(TestCase):
return d
def test_create_and_combine_stream(self):
wallet = FakeWallet()
peer_manager = PeerManager()
peer_finder = FakePeerFinder(5553, peer_manager, 2)
hash_announcer = FakeAnnouncer()
rate_limiter = DummyRateLimiter()
sd_identifier = StreamDescriptorIdentifier()
self.session = Session(
conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=self.db_dir, node_id="abcd",
peer_finder=peer_finder, hash_announcer=hash_announcer,
blob_dir=self.blob_dir, peer_port=5553, dht_node_class=mocks.Node,
use_upnp=False, rate_limiter=rate_limiter, wallet=wallet,
blob_tracker_class=DummyBlobAvailabilityTracker, external_ip="127.0.0.1"
conf.ADJUSTABLE_SETTINGS['data_rate'][1], db_dir=self.db_dir, node_id="abcd", peer_finder=self.peer_finder,
blob_dir=self.blob_dir, peer_port=5553, rate_limiter=self.rate_limiter, wallet=self.wallet,
external_ip="127.0.0.1", dht_node=self.dht_node
)
self.lbry_file_manager = EncryptedFileManager(self.session, sd_identifier)
self.lbry_file_manager = EncryptedFileManager(self.session, self.sd_identifier)
@defer.inlineCallbacks
def create_stream():
@ -132,7 +122,7 @@ class TestStreamify(TestCase):
self.assertEqual(hashsum.hexdigest(), "68959747edc73df45e45db6379dd7b3b")
d = self.session.setup()
d.addCallback(lambda _: add_lbry_file_to_sd_identifier(sd_identifier))
d.addCallback(lambda _: add_lbry_file_to_sd_identifier(self.sd_identifier))
d.addCallback(lambda _: self.lbry_file_manager.setup())
d.addCallback(lambda _: create_stream())
return d

View file

@ -1,5 +1,6 @@
import base64
import io
import mock
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import rsa
@ -10,6 +11,7 @@ from twisted.python.failure import Failure
from lbrynet.core.client.ClientRequest import ClientRequest
from lbrynet.core.Error import RequestCanceledError
from lbrynet.core import BlobAvailability
from lbrynet.file_manager.EncryptedFileManager import EncryptedFileManager
from lbrynet.dht.node import Node as RealNode
from lbrynet.daemon import ExchangeRateManager as ERM
from lbrynet import conf
@ -63,6 +65,7 @@ class BTCLBCFeed(ERM.MarketFeed):
0.0
)
class USDBTCFeed(ERM.MarketFeed):
def __init__(self):
ERM.MarketFeed.__init__(
@ -74,6 +77,7 @@ class USDBTCFeed(ERM.MarketFeed):
0.0
)
class ExchangeRateManager(ERM.ExchangeRateManager):
def __init__(self, market_feeds, rates):
self.market_feeds = market_feeds
@ -360,6 +364,96 @@ class BlobAvailabilityTracker(BlobAvailability.BlobAvailabilityTracker):
pass
# The components below viz. FakeWallet, FakeSession, FakeFileManager are just for testing Component Manager's
# startup and stop
class FakeComponent(object):
depends_on = []
component_name = None
def __init__(self, component_manager):
self.component_manager = component_manager
self._running = False
@property
def running(self):
return self._running
def start(self):
raise NotImplementedError # Override
def stop(self):
return defer.succeed(None)
@property
def component(self):
return self
@defer.inlineCallbacks
def _setup(self):
result = yield defer.maybeDeferred(self.start)
self._running = True
defer.returnValue(result)
@defer.inlineCallbacks
def _stop(self):
result = yield defer.maybeDeferred(self.stop)
self._running = False
defer.returnValue(result)
class FakeDelayedWallet(FakeComponent):
component_name = "wallet"
depends_on = []
def start(self):
return defer.succeed(True)
def stop(self):
d = defer.Deferred()
self.component_manager.reactor.callLater(1, d.callback, True)
return d
class FakeDelayedSession(FakeComponent):
component_name = "session"
depends_on = [FakeDelayedWallet.component_name]
def start(self):
d = defer.Deferred()
self.component_manager.reactor.callLater(1, d.callback, True)
return d
def stop(self):
d = defer.Deferred()
self.component_manager.reactor.callLater(1, d.callback, True)
return d
class FakeDelayedFileManager(FakeComponent):
component_name = "file_manager"
depends_on = [FakeDelayedSession.component_name]
def start(self):
d = defer.Deferred()
self.component_manager.reactor.callLater(1, d.callback, True)
return d
def stop(self):
return defer.succeed(True)
class FakeFileManager(FakeComponent):
component_name = "file_manager"
depends_on = []
@property
def component(self):
return mock.Mock(spec=EncryptedFileManager)
def start(self):
return defer.succeed(True)
def stop(self):
pass
create_stream_sd_file = {
'stream_name': '746573745f66696c65',

View file

@ -0,0 +1,133 @@
from twisted.internet.task import Clock
from twisted.trial import unittest
from lbrynet.daemon.ComponentManager import ComponentManager
from lbrynet.daemon.Components import DATABASE_COMPONENT, DHT_COMPONENT, STREAM_IDENTIFIER_COMPONENT
from lbrynet.daemon.Components import HASH_ANNOUNCER_COMPONENT, REFLECTOR_COMPONENT, UPNP_COMPONENT
from lbrynet.daemon.Components import PEER_PROTOCOL_SERVER_COMPONENT, EXCHANGE_RATE_MANAGER_COMPONENT
from lbrynet.daemon import Components
from lbrynet.tests import mocks
class TestComponentManager(unittest.TestCase):
def setUp(self):
mocks.mock_conf_settings(self)
self.default_components_sort = [
[Components.DatabaseComponent,
Components.ExchangeRateManagerComponent,
Components.UPnPComponent],
[Components.DHTComponent,
Components.WalletComponent],
[Components.HashAnnouncerComponent],
[Components.SessionComponent],
[Components.PeerProtocolServerComponent,
Components.StreamIdentifierComponent],
[Components.FileManagerComponent],
[Components.ReflectorComponent]
]
self.component_manager = ComponentManager()
def tearDown(self):
pass
def test_sort_components(self):
stages = self.component_manager.sort_components()
for stage_list, sorted_stage_list in zip(stages, self.default_components_sort):
self.assertEqual([type(stage) for stage in stage_list], sorted_stage_list)
def test_sort_components_reverse(self):
rev_stages = self.component_manager.sort_components(reverse=True)
reverse_default_components_sort = reversed(self.default_components_sort)
for stage_list, sorted_stage_list in zip(rev_stages, reverse_default_components_sort):
self.assertEqual([type(stage) for stage in stage_list], sorted_stage_list)
def test_get_component_not_exists(self):
with self.assertRaises(NameError):
self.component_manager.get_component("random_component")
class TestComponentManagerOverrides(unittest.TestCase):
def setUp(self):
mocks.mock_conf_settings(self)
def test_init_with_overrides(self):
class FakeWallet(object):
component_name = "wallet"
depends_on = []
def __init__(self, component_manager):
self.component_manager = component_manager
@property
def component(self):
return self
new_component_manager = ComponentManager(wallet=FakeWallet)
fake_wallet = new_component_manager.get_component("wallet")
# wallet should be an instance of FakeWallet and not WalletComponent from Components.py
self.assertIsInstance(fake_wallet, FakeWallet)
self.assertNotIsInstance(fake_wallet, Components.WalletComponent)
def test_init_with_wrong_overrides(self):
class FakeRandomComponent(object):
component_name = "someComponent"
depends_on = []
with self.assertRaises(SyntaxError):
ComponentManager(randomComponent=FakeRandomComponent)
class TestComponentManagerProperStart(unittest.TestCase):
def setUp(self):
self.reactor = Clock()
mocks.mock_conf_settings(self)
self.component_manager = ComponentManager(
skip_components=[DATABASE_COMPONENT, DHT_COMPONENT, HASH_ANNOUNCER_COMPONENT, STREAM_IDENTIFIER_COMPONENT,
PEER_PROTOCOL_SERVER_COMPONENT, REFLECTOR_COMPONENT, UPNP_COMPONENT,
EXCHANGE_RATE_MANAGER_COMPONENT],
reactor=self.reactor,
wallet=mocks.FakeDelayedWallet,
session=mocks.FakeDelayedSession,
file_manager=mocks.FakeDelayedFileManager
)
def tearDown(self):
pass
def test_proper_starting_of_components(self):
self.component_manager.setup()
self.assertTrue(self.component_manager.get_component('wallet').running)
self.assertFalse(self.component_manager.get_component('session').running)
self.assertFalse(self.component_manager.get_component('file_manager').running)
self.reactor.advance(1)
self.assertTrue(self.component_manager.get_component('wallet').running)
self.assertTrue(self.component_manager.get_component('session').running)
self.assertFalse(self.component_manager.get_component('file_manager').running)
self.reactor.advance(1)
self.assertTrue(self.component_manager.get_component('wallet').running)
self.assertTrue(self.component_manager.get_component('session').running)
self.assertTrue(self.component_manager.get_component('file_manager').running)
def test_proper_stopping_of_components(self):
self.component_manager.setup()
self.reactor.advance(1)
self.reactor.advance(1)
self.component_manager.stop()
self.assertFalse(self.component_manager.get_component('file_manager').running)
self.assertTrue(self.component_manager.get_component('session').running)
self.assertTrue(self.component_manager.get_component('wallet').running)
self.reactor.advance(1)
self.assertFalse(self.component_manager.get_component('file_manager').running)
self.assertFalse(self.component_manager.get_component('session').running)
self.assertTrue(self.component_manager.get_component('wallet').running)
self.reactor.advance(1)
self.assertFalse(self.component_manager.get_component('file_manager').running)
self.assertFalse(self.component_manager.get_component('session').running)
self.assertFalse(self.component_manager.get_component('wallet').running)

View file

@ -11,7 +11,7 @@ class AuthJSONRPCServerTest(unittest.TestCase):
# onto it.
def setUp(self):
conf.initialize_settings(False)
self.server = server.AuthJSONRPCServer(use_authentication=False)
self.server = server.AuthJSONRPCServer(True, use_authentication=False)
def test_get_server_port(self):
self.assertSequenceEqual(

View file

@ -1,11 +1,10 @@
import mock
import json
import unittest
import random
from os import path
from twisted.internet import defer
from twisted import trial
from twisted.trial import unittest
from faker import Faker
@ -14,12 +13,15 @@ from lbryum.wallet import NewWallet
from lbrynet import conf
from lbrynet.core import Session, PaymentRateManager, Wallet
from lbrynet.database.storage import SQLiteStorage
from lbrynet.daemon.ComponentManager import ComponentManager
from lbrynet.daemon.Components import DATABASE_COMPONENT, DHT_COMPONENT, WALLET_COMPONENT, STREAM_IDENTIFIER_COMPONENT
from lbrynet.daemon.Components import HASH_ANNOUNCER_COMPONENT, REFLECTOR_COMPONENT, UPNP_COMPONENT, SESSION_COMPONENT
from lbrynet.daemon.Components import PEER_PROTOCOL_SERVER_COMPONENT, EXCHANGE_RATE_MANAGER_COMPONENT
from lbrynet.daemon.Daemon import Daemon as LBRYDaemon
from lbrynet.file_manager.EncryptedFileManager import EncryptedFileManager
from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloader
from lbrynet.tests import util
from lbrynet.tests.mocks import mock_conf_settings, FakeNetwork
from lbrynet.tests.mocks import mock_conf_settings, FakeNetwork, FakeFileManager
from lbrynet.tests.mocks import BlobAvailabilityTracker as DummyBlobAvailabilityTracker
from lbrynet.tests.mocks import ExchangeRateManager as DummyExchangeRateManager
from lbrynet.tests.mocks import BTCLBCFeed, USDBTCFeed
@ -40,10 +42,10 @@ def get_test_daemon(data_rate=None, generous=True, with_fee=False):
}
daemon = LBRYDaemon(None)
daemon.session = mock.Mock(spec=Session.Session)
daemon.session.wallet = mock.Mock(spec=Wallet.LBRYumWallet)
daemon.session.wallet.wallet = mock.Mock(spec=NewWallet)
daemon.session.wallet.wallet.use_encryption = False
daemon.session.wallet.network = FakeNetwork()
daemon.wallet = mock.Mock(spec=Wallet.LBRYumWallet)
daemon.wallet.wallet = mock.Mock(spec=NewWallet)
daemon.wallet.wallet.use_encryption = False
daemon.wallet.network = FakeNetwork()
daemon.session.storage = mock.Mock(spec=SQLiteStorage)
market_feeds = [BTCLBCFeed(), USDBTCFeed()]
daemon.exchange_rate_manager = DummyExchangeRateManager(market_feeds, rates)
@ -73,12 +75,12 @@ def get_test_daemon(data_rate=None, generous=True, with_fee=False):
{"fee": {"USD": {"address": "bQ6BGboPV2SpTMEP7wLNiAcnsZiH8ye6eA", "amount": 0.75}}})
daemon._resolve_name = lambda _: defer.succeed(metadata)
migrated = smart_decode(json.dumps(metadata))
daemon.session.wallet.resolve = lambda *_: defer.succeed(
daemon.wallet.resolve = lambda *_: defer.succeed(
{"test": {'claim': {'value': migrated.claim_dict}}})
return daemon
class TestCostEst(trial.unittest.TestCase):
class TestCostEst(unittest.TestCase):
def setUp(self):
mock_conf_settings(self)
util.resetTime(self)
@ -111,7 +113,8 @@ class TestCostEst(trial.unittest.TestCase):
self.assertEquals(daemon.get_est_cost("test", size).result, correct_result)
class TestJsonRpc(trial.unittest.TestCase):
class TestJsonRpc(unittest.TestCase):
def setUp(self):
def noop():
return None
@ -119,30 +122,39 @@ class TestJsonRpc(trial.unittest.TestCase):
mock_conf_settings(self)
util.resetTime(self)
self.test_daemon = get_test_daemon()
self.test_daemon.session.wallet.is_first_run = False
self.test_daemon.session.wallet.get_best_blockhash = noop
self.test_daemon.wallet.is_first_run = False
self.test_daemon.wallet.get_best_blockhash = noop
def test_status(self):
d = defer.maybeDeferred(self.test_daemon.jsonrpc_status)
d.addCallback(lambda status: self.assertDictContainsSubset({'is_running': False}, status))
@unittest.skipIf(is_android(),
'Test cannot pass on Android because PYTHONOPTIMIZE removes the docstrings.')
def test_help(self):
d = defer.maybeDeferred(self.test_daemon.jsonrpc_help, command='status')
d.addCallback(lambda result: self.assertSubstring('daemon status', result['help']))
# self.assertSubstring('daemon status', d.result)
if is_android():
test_help.skip = "Test cannot pass on Android because PYTHONOPTIMIZE removes the docstrings."
class TestFileListSorting(trial.unittest.TestCase):
class TestFileListSorting(unittest.TestCase):
def setUp(self):
mock_conf_settings(self)
util.resetTime(self)
self.faker = Faker('en_US')
self.faker.seed(66410)
self.test_daemon = get_test_daemon()
self.test_daemon.lbry_file_manager = mock.Mock(spec=EncryptedFileManager)
self.test_daemon.lbry_file_manager.lbry_files = self._get_fake_lbry_files()
component_manager = ComponentManager(
skip_components=[DATABASE_COMPONENT, DHT_COMPONENT, WALLET_COMPONENT, SESSION_COMPONENT, UPNP_COMPONENT,
PEER_PROTOCOL_SERVER_COMPONENT, REFLECTOR_COMPONENT, HASH_ANNOUNCER_COMPONENT,
STREAM_IDENTIFIER_COMPONENT, EXCHANGE_RATE_MANAGER_COMPONENT],
file_manager=FakeFileManager
)
component_manager.setup()
self.test_daemon.component_manager = component_manager
self.test_daemon.file_manager = component_manager.get_component("file_manager")
self.test_daemon.file_manager.lbry_files = self._get_fake_lbry_files()
# Pre-sorted lists of prices and file names in ascending order produced by
# faker with seed 66410. This seed was chosen becacuse it produces 3 results

View file

@ -6,7 +6,7 @@ import unittest
from twisted.internet import defer
from twisted import trial
from lbrynet.core import log_support
from lbrynet import custom_logger
from lbrynet.tests.util import is_android
@ -22,7 +22,7 @@ class TestLogger(trial.unittest.TestCase):
return d
def setUp(self):
self.log = log_support.Logger('test')
self.log = custom_logger.Logger('test')
self.stream = StringIO.StringIO()
handler = logging.StreamHandler(self.stream)
handler.setFormatter(logging.Formatter("%(filename)s:%(lineno)d - %(message)s"))
@ -36,7 +36,7 @@ class TestLogger(trial.unittest.TestCase):
return self.stream.getvalue().split('\n')
# the line number could change if this file gets refactored
expected_first_line = 'test_log_support.py:20 - My message: terrible things happened'
expected_first_line = 'test_customLogger.py:20 - My message: terrible things happened'
# testing the entirety of the message is futile as the
# traceback will depend on the system the test is being run on