move setup and _shutdown to AuthJSONRPCServer

This commit is contained in:
Jack Robison 2018-07-24 18:24:51 -04:00
parent a285db1b08
commit a9c94ca22d
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
2 changed files with 70 additions and 57 deletions

View file

@ -7,7 +7,6 @@ import requests
import urllib import urllib
import json import json
import textwrap import textwrap
import signal
from copy import deepcopy from copy import deepcopy
from decimal import Decimal, InvalidOperation from decimal import Decimal, InvalidOperation
from twisted.web import server from twisted.web import server
@ -26,7 +25,6 @@ from lbryschema.decode import smart_decode
from lbrynet.core.system_info import get_lbrynet_version from lbrynet.core.system_info import get_lbrynet_version
from lbrynet import conf from lbrynet import conf
from lbrynet.reflector import reupload from lbrynet.reflector import reupload
from lbrynet.daemon.Component import ComponentManager
from lbrynet.daemon.Components import WALLET_COMPONENT, DATABASE_COMPONENT, SESSION_COMPONENT, DHT_COMPONENT from lbrynet.daemon.Components import WALLET_COMPONENT, DATABASE_COMPONENT, SESSION_COMPONENT, DHT_COMPONENT
from lbrynet.daemon.Components import STREAM_IDENTIFIER_COMPONENT, FILE_MANAGER_COMPONENT from lbrynet.daemon.Components import STREAM_IDENTIFIER_COMPONENT, FILE_MANAGER_COMPONENT
from lbrynet.daemon.Components import EXCHANGE_RATE_MANAGER_COMPONENT from lbrynet.daemon.Components import EXCHANGE_RATE_MANAGER_COMPONENT
@ -36,7 +34,6 @@ from lbrynet.daemon.auth.server import AuthJSONRPCServer
from lbrynet.core.PaymentRateManager import OnlyFreePaymentsManager from lbrynet.core.PaymentRateManager import OnlyFreePaymentsManager
from lbrynet.core import utils, system_info from lbrynet.core import utils, system_info
from lbrynet.core.StreamDescriptor import download_sd_blob from lbrynet.core.StreamDescriptor import download_sd_blob
from lbrynet.core.looping_call_manager import LoopingCallManager
from lbrynet.core.Error import InsufficientFundsError, UnknownNameError from lbrynet.core.Error import InsufficientFundsError, UnknownNameError
from lbrynet.core.Error import DownloadDataTimeout, DownloadSDTimeout from lbrynet.core.Error import DownloadDataTimeout, DownloadSDTimeout
from lbrynet.core.Error import NullFundsError, NegativeFundsError from lbrynet.core.Error import NullFundsError, NegativeFundsError
@ -92,8 +89,8 @@ class IterableContainer(object):
class Checker(object): class Checker(object):
"""The looping calls the daemon runs""" """The looping calls the daemon runs"""
INTERNET_CONNECTION = 'internet_connection_checker' INTERNET_CONNECTION = 'internet_connection_checker', 3600
CONNECTION_STATUS = 'connection_status_checker' # CONNECTION_STATUS = 'connection_status_checker'
class _FileID(IterableContainer): class _FileID(IterableContainer):
@ -162,20 +159,28 @@ class Daemon(AuthJSONRPCServer):
LBRYnet daemon, a jsonrpc interface to lbry functions LBRYnet daemon, a jsonrpc interface to lbry functions
""" """
component_attributes = {
EXCHANGE_RATE_MANAGER_COMPONENT: "exchange_rate_manager",
DATABASE_COMPONENT: "storage",
SESSION_COMPONENT: "session",
WALLET_COMPONENT: "wallet",
DHT_COMPONENT: "dht_node",
STREAM_IDENTIFIER_COMPONENT: "sd_identifier",
FILE_MANAGER_COMPONENT: "file_manager",
}
def __init__(self, analytics_manager=None, component_manager=None): def __init__(self, analytics_manager=None, component_manager=None):
AuthJSONRPCServer.__init__(self, analytics_manager, conf.settings['use_auth_http'])
self.looping_call_manager = LoopingCallManager({
Checker.INTERNET_CONNECTION: LoopingCall(CheckInternetConnection(self)),
})
to_skip = list(conf.settings['components_to_skip']) to_skip = list(conf.settings['components_to_skip'])
if 'reflector' not in to_skip and not conf.settings['run_reflector_server']: if 'reflector' not in to_skip and not conf.settings['run_reflector_server']:
to_skip.append('reflector') to_skip.append('reflector')
self.component_manager = component_manager or ComponentManager( looping_calls = {
analytics_manager=self.analytics_manager, Checker.INTERNET_CONNECTION[0]: (LoopingCall(CheckInternetConnection(self)),
skip_components=to_skip Checker.INTERNET_CONNECTION[1])
) }
AuthJSONRPCServer.__init__(self, analytics_manager=analytics_manager, component_manager=component_manager,
use_authentication=conf.settings['use_auth_http'], to_skip=to_skip,
looping_calls=looping_calls)
self.is_first_run = is_first_run() self.is_first_run = is_first_run()
self._component_setup_deferred = None
# TODO: move this to a component # TODO: move this to a component
self.connected_to_internet = True self.connected_to_internet = True
@ -196,58 +201,19 @@ class Daemon(AuthJSONRPCServer):
@defer.inlineCallbacks @defer.inlineCallbacks
def setup(self): def setup(self):
reactor.addSystemEventTrigger('before', 'shutdown', self._shutdown)
if not self.analytics_manager.is_started:
self.analytics_manager.start()
self.looping_call_manager.start(Checker.INTERNET_CONNECTION, 3600)
components = {
EXCHANGE_RATE_MANAGER_COMPONENT: "exchange_rate_manager",
DATABASE_COMPONENT: "storage",
SESSION_COMPONENT: "session",
WALLET_COMPONENT: "wallet",
DHT_COMPONENT: "dht_node",
STREAM_IDENTIFIER_COMPONENT: "sd_identifier",
FILE_MANAGER_COMPONENT: "file_manager",
}
log.info("Starting lbrynet-daemon") log.info("Starting lbrynet-daemon")
log.info("Platform: %s", json.dumps(system_info.get_platform())) log.info("Platform: %s", json.dumps(system_info.get_platform()))
self._component_setup_deferred = self.component_manager.setup(**{ yield super(Daemon, self).setup()
n: lambda _, c: setattr(self, components[c.component_name], c.component) for n in components.keys()})
yield self._component_setup_deferred
log.info("Started lbrynet-daemon") log.info("Started lbrynet-daemon")
@staticmethod
def _already_shutting_down(sig_num, frame):
log.info("Already shutting down")
def _stop_streams(self): def _stop_streams(self):
"""stop pending GetStream downloads""" """stop pending GetStream downloads"""
for sd_hash, stream in self.streams.iteritems(): for sd_hash, stream in self.streams.iteritems():
stream.cancel(reason="daemon shutdown") stream.cancel(reason="daemon shutdown")
def _shutdown(self): def _shutdown(self):
# ignore INT/TERM signals once shutdown has started
signal.signal(signal.SIGINT, self._already_shutting_down)
signal.signal(signal.SIGTERM, self._already_shutting_down)
log.info("Closing lbrynet session")
self._stop_streams() self._stop_streams()
self.looping_call_manager.shutdown() return super(Daemon, self)._shutdown()
if self.analytics_manager:
self.analytics_manager.shutdown()
try:
self._component_setup_deferred.cancel()
except defer.CancelledError:
pass
if self.component_manager is not None:
d = self.component_manager.stop()
d.addErrback(log.fail(), 'Failure while shutting down')
return d
def _download_blob(self, blob_hash, rate_manager=None, timeout=None): def _download_blob(self, blob_hash, rate_manager=None, timeout=None):
""" """

View file

@ -2,6 +2,7 @@ import logging
import urlparse import urlparse
import json import json
import inspect import inspect
import signal
from decimal import Decimal from decimal import Decimal
from functools import wraps from functools import wraps
@ -17,6 +18,8 @@ from lbrynet import conf, analytics
from lbrynet.core.Error import InvalidAuthenticationToken from lbrynet.core.Error import InvalidAuthenticationToken
from lbrynet.core import utils from lbrynet.core import utils
from lbrynet.core.Error import ComponentsNotStarted, ComponentStartConditionNotMet from lbrynet.core.Error import ComponentsNotStarted, ComponentStartConditionNotMet
from lbrynet.core.looping_call_manager import LoopingCallManager
from lbrynet.daemon.ComponentManager import ComponentManager
from lbrynet.undecorated import undecorated from lbrynet.undecorated import undecorated
from .util import APIKey, get_auth_message from .util import APIKey, get_auth_message
from .client import LBRY_SECRET from .client import LBRY_SECRET
@ -192,10 +195,19 @@ class AuthJSONRPCServer(AuthorizedBase):
isLeaf = True isLeaf = True
allowed_during_startup = [] allowed_during_startup = []
component_attributes = {}
def __init__(self, analytics_manager, use_authentication=None): def __init__(self, analytics_manager=None, component_manager=None, use_authentication=None, to_skip=None,
looping_calls=None):
self.analytics_manager = analytics_manager or analytics.Manager.new_instance() self.analytics_manager = analytics_manager or analytics.Manager.new_instance()
self.component_manager = component_manager or ComponentManager(
analytics_manager=self.analytics_manager,
skip_components=to_skip or []
)
self.looping_call_manager = LoopingCallManager({n: lc for n, (lc, t) in (looping_calls or {}).iteritems()})
self._looping_call_times = {n: t for n, (lc, t) in (looping_calls or {}).iteritems()}
self._use_authentication = use_authentication or conf.settings['use_auth_http'] self._use_authentication = use_authentication or conf.settings['use_auth_http']
self._component_setup_deferred = None
self.announced_startup = False self.announced_startup = False
self.sessions = {} self.sessions = {}
@ -223,7 +235,42 @@ class AuthJSONRPCServer(AuthorizedBase):
reactor.fireSystemEvent("shutdown") reactor.fireSystemEvent("shutdown")
def setup(self): def setup(self):
raise NotImplementedError() from twisted.internet import reactor
reactor.addSystemEventTrigger('before', 'shutdown', self._shutdown)
if not self.analytics_manager.is_started:
self.analytics_manager.start()
for lc_name, lc_time in self._looping_call_times.iteritems():
self.looping_call_manager.start(lc_name, lc_time)
def update_attribute(setup_result, component):
setattr(self, self.component_attributes[component.component_name], component.component)
kwargs = {component: update_attribute for component in self.component_attributes.keys()}
self._component_setup_deferred = self.component_manager.setup(**kwargs)
return self._component_setup_deferred
@staticmethod
def _already_shutting_down(sig_num, frame):
log.info("Already shutting down")
def _shutdown(self):
# ignore INT/TERM signals once shutdown has started
signal.signal(signal.SIGINT, self._already_shutting_down)
signal.signal(signal.SIGTERM, self._already_shutting_down)
self.looping_call_manager.shutdown()
if self.analytics_manager:
self.analytics_manager.shutdown()
try:
self._component_setup_deferred.cancel()
except (AttributeError, defer.CancelledError):
pass
if self.component_manager is not None:
d = self.component_manager.stop()
d.addErrback(log.fail(), 'Failure while shutting down')
else:
d = defer.succeed(None)
return d
def get_server_factory(self): def get_server_factory(self):
return AuthJSONRPCResource(self).getServerFactory() return AuthJSONRPCResource(self).getServerFactory()