diff --git a/lbrynet/daemon/Daemon.py b/lbrynet/daemon/Daemon.py index 57dbd8971..d62b2b2c3 100644 --- a/lbrynet/daemon/Daemon.py +++ b/lbrynet/daemon/Daemon.py @@ -7,7 +7,6 @@ import requests import urllib import json import textwrap -import signal from copy import deepcopy from decimal import Decimal, InvalidOperation from twisted.web import server @@ -26,7 +25,6 @@ from lbryschema.decode import smart_decode from lbrynet.core.system_info import get_lbrynet_version from lbrynet import conf from lbrynet.reflector import reupload -from lbrynet.daemon.Component import ComponentManager from lbrynet.daemon.Components import WALLET_COMPONENT, DATABASE_COMPONENT, SESSION_COMPONENT, DHT_COMPONENT from lbrynet.daemon.Components import STREAM_IDENTIFIER_COMPONENT, FILE_MANAGER_COMPONENT from lbrynet.daemon.Components import EXCHANGE_RATE_MANAGER_COMPONENT @@ -36,7 +34,6 @@ from lbrynet.daemon.auth.server import AuthJSONRPCServer from lbrynet.core.PaymentRateManager import OnlyFreePaymentsManager from lbrynet.core import utils, system_info from lbrynet.core.StreamDescriptor import download_sd_blob -from lbrynet.core.looping_call_manager import LoopingCallManager from lbrynet.core.Error import InsufficientFundsError, UnknownNameError from lbrynet.core.Error import DownloadDataTimeout, DownloadSDTimeout from lbrynet.core.Error import NullFundsError, NegativeFundsError @@ -92,8 +89,8 @@ class IterableContainer(object): class Checker(object): """The looping calls the daemon runs""" - INTERNET_CONNECTION = 'internet_connection_checker' - CONNECTION_STATUS = 'connection_status_checker' + INTERNET_CONNECTION = 'internet_connection_checker', 3600 + # CONNECTION_STATUS = 'connection_status_checker' class _FileID(IterableContainer): @@ -162,20 +159,28 @@ class Daemon(AuthJSONRPCServer): LBRYnet daemon, a jsonrpc interface to lbry functions """ + component_attributes = { + EXCHANGE_RATE_MANAGER_COMPONENT: "exchange_rate_manager", + DATABASE_COMPONENT: "storage", + SESSION_COMPONENT: "session", + WALLET_COMPONENT: "wallet", + DHT_COMPONENT: "dht_node", + STREAM_IDENTIFIER_COMPONENT: "sd_identifier", + FILE_MANAGER_COMPONENT: "file_manager", + } + def __init__(self, analytics_manager=None, component_manager=None): - AuthJSONRPCServer.__init__(self, analytics_manager, conf.settings['use_auth_http']) - self.looping_call_manager = LoopingCallManager({ - Checker.INTERNET_CONNECTION: LoopingCall(CheckInternetConnection(self)), - }) to_skip = list(conf.settings['components_to_skip']) if 'reflector' not in to_skip and not conf.settings['run_reflector_server']: to_skip.append('reflector') - self.component_manager = component_manager or ComponentManager( - analytics_manager=self.analytics_manager, - skip_components=to_skip - ) + looping_calls = { + Checker.INTERNET_CONNECTION[0]: (LoopingCall(CheckInternetConnection(self)), + Checker.INTERNET_CONNECTION[1]) + } + AuthJSONRPCServer.__init__(self, analytics_manager=analytics_manager, component_manager=component_manager, + use_authentication=conf.settings['use_auth_http'], to_skip=to_skip, + looping_calls=looping_calls) self.is_first_run = is_first_run() - self._component_setup_deferred = None # TODO: move this to a component self.connected_to_internet = True @@ -196,58 +201,19 @@ class Daemon(AuthJSONRPCServer): @defer.inlineCallbacks def setup(self): - reactor.addSystemEventTrigger('before', 'shutdown', self._shutdown) - if not self.analytics_manager.is_started: - self.analytics_manager.start() - self.looping_call_manager.start(Checker.INTERNET_CONNECTION, 3600) - - components = { - EXCHANGE_RATE_MANAGER_COMPONENT: "exchange_rate_manager", - DATABASE_COMPONENT: "storage", - SESSION_COMPONENT: "session", - WALLET_COMPONENT: "wallet", - DHT_COMPONENT: "dht_node", - STREAM_IDENTIFIER_COMPONENT: "sd_identifier", - FILE_MANAGER_COMPONENT: "file_manager", - } - log.info("Starting lbrynet-daemon") log.info("Platform: %s", json.dumps(system_info.get_platform())) - self._component_setup_deferred = self.component_manager.setup(**{ - n: lambda _, c: setattr(self, components[c.component_name], c.component) for n in components.keys()}) - yield self._component_setup_deferred + yield super(Daemon, self).setup() log.info("Started lbrynet-daemon") - @staticmethod - def _already_shutting_down(sig_num, frame): - log.info("Already shutting down") - def _stop_streams(self): """stop pending GetStream downloads""" for sd_hash, stream in self.streams.iteritems(): stream.cancel(reason="daemon shutdown") def _shutdown(self): - # ignore INT/TERM signals once shutdown has started - signal.signal(signal.SIGINT, self._already_shutting_down) - signal.signal(signal.SIGTERM, self._already_shutting_down) - - log.info("Closing lbrynet session") - self._stop_streams() - self.looping_call_manager.shutdown() - if self.analytics_manager: - self.analytics_manager.shutdown() - - try: - self._component_setup_deferred.cancel() - except defer.CancelledError: - pass - - if self.component_manager is not None: - d = self.component_manager.stop() - d.addErrback(log.fail(), 'Failure while shutting down') - return d + return super(Daemon, self)._shutdown() def _download_blob(self, blob_hash, rate_manager=None, timeout=None): """ diff --git a/lbrynet/daemon/auth/server.py b/lbrynet/daemon/auth/server.py index d5470fddd..7a91c858b 100644 --- a/lbrynet/daemon/auth/server.py +++ b/lbrynet/daemon/auth/server.py @@ -2,6 +2,7 @@ import logging import urlparse import json import inspect +import signal from decimal import Decimal from functools import wraps @@ -17,6 +18,8 @@ from lbrynet import conf, analytics from lbrynet.core.Error import InvalidAuthenticationToken from lbrynet.core import utils from lbrynet.core.Error import ComponentsNotStarted, ComponentStartConditionNotMet +from lbrynet.core.looping_call_manager import LoopingCallManager +from lbrynet.daemon.ComponentManager import ComponentManager from lbrynet.undecorated import undecorated from .util import APIKey, get_auth_message from .client import LBRY_SECRET @@ -192,10 +195,19 @@ class AuthJSONRPCServer(AuthorizedBase): isLeaf = True allowed_during_startup = [] + component_attributes = {} - def __init__(self, analytics_manager, use_authentication=None): + def __init__(self, analytics_manager=None, component_manager=None, use_authentication=None, to_skip=None, + looping_calls=None): self.analytics_manager = analytics_manager or analytics.Manager.new_instance() + self.component_manager = component_manager or ComponentManager( + analytics_manager=self.analytics_manager, + skip_components=to_skip or [] + ) + self.looping_call_manager = LoopingCallManager({n: lc for n, (lc, t) in (looping_calls or {}).iteritems()}) + self._looping_call_times = {n: t for n, (lc, t) in (looping_calls or {}).iteritems()} self._use_authentication = use_authentication or conf.settings['use_auth_http'] + self._component_setup_deferred = None self.announced_startup = False self.sessions = {} @@ -223,7 +235,42 @@ class AuthJSONRPCServer(AuthorizedBase): reactor.fireSystemEvent("shutdown") def setup(self): - raise NotImplementedError() + from twisted.internet import reactor + + reactor.addSystemEventTrigger('before', 'shutdown', self._shutdown) + if not self.analytics_manager.is_started: + self.analytics_manager.start() + for lc_name, lc_time in self._looping_call_times.iteritems(): + self.looping_call_manager.start(lc_name, lc_time) + + def update_attribute(setup_result, component): + setattr(self, self.component_attributes[component.component_name], component.component) + + kwargs = {component: update_attribute for component in self.component_attributes.keys()} + self._component_setup_deferred = self.component_manager.setup(**kwargs) + return self._component_setup_deferred + + @staticmethod + def _already_shutting_down(sig_num, frame): + log.info("Already shutting down") + + def _shutdown(self): + # ignore INT/TERM signals once shutdown has started + signal.signal(signal.SIGINT, self._already_shutting_down) + signal.signal(signal.SIGTERM, self._already_shutting_down) + self.looping_call_manager.shutdown() + if self.analytics_manager: + self.analytics_manager.shutdown() + try: + self._component_setup_deferred.cancel() + except (AttributeError, defer.CancelledError): + pass + if self.component_manager is not None: + d = self.component_manager.stop() + d.addErrback(log.fail(), 'Failure while shutting down') + else: + d = defer.succeed(None) + return d def get_server_factory(self): return AuthJSONRPCResource(self).getServerFactory()