cancel starting components if the reactor is stopped before startup has finished
-don't block starting the dht component on having found enough peers, only on setting up the protocol
This commit is contained in:
parent
a800f6ddf0
commit
b06dcf0a0d
5 changed files with 26 additions and 4 deletions
|
@ -1,5 +1,6 @@
|
||||||
import logging
|
import logging
|
||||||
from twisted.internet import defer
|
from twisted.internet import defer
|
||||||
|
from twisted._threads import AlreadyQuit
|
||||||
from ComponentManager import ComponentManager
|
from ComponentManager import ComponentManager
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
@ -52,6 +53,8 @@ class Component(object):
|
||||||
result = yield defer.maybeDeferred(self.start)
|
result = yield defer.maybeDeferred(self.start)
|
||||||
self._running = True
|
self._running = True
|
||||||
defer.returnValue(result)
|
defer.returnValue(result)
|
||||||
|
except (defer.CancelledError, AlreadyQuit):
|
||||||
|
pass
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
log.exception("Error setting up %s", self.component_name or self.__class__.__name__)
|
log.exception("Error setting up %s", self.component_name or self.__class__.__name__)
|
||||||
raise err
|
raise err
|
||||||
|
@ -62,6 +65,8 @@ class Component(object):
|
||||||
result = yield defer.maybeDeferred(self.stop)
|
result = yield defer.maybeDeferred(self.stop)
|
||||||
self._running = False
|
self._running = False
|
||||||
defer.returnValue(result)
|
defer.returnValue(result)
|
||||||
|
except (defer.CancelledError, AlreadyQuit):
|
||||||
|
pass
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
log.exception("Error stopping %s", self.__class__.__name__)
|
log.exception("Error stopping %s", self.__class__.__name__)
|
||||||
raise err
|
raise err
|
||||||
|
|
|
@ -248,8 +248,13 @@ class DHTComponent(Component):
|
||||||
externalIP=CS.get_external_ip(),
|
externalIP=CS.get_external_ip(),
|
||||||
peerPort=self.peer_port
|
peerPort=self.peer_port
|
||||||
)
|
)
|
||||||
yield self.dht_node.start(GCS('known_dht_nodes'))
|
|
||||||
log.info("Joined the dht")
|
self.dht_node.start_listening()
|
||||||
|
yield self.dht_node._protocol._listening
|
||||||
|
d = self.dht_node.joinNetwork(GCS('known_dht_nodes'))
|
||||||
|
d.addCallback(lambda _: self.dht_node.start_looping_calls())
|
||||||
|
d.addCallback(lambda _: log.info("Joined the dht"))
|
||||||
|
log.info("Started the dht")
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def stop(self):
|
def stop(self):
|
||||||
|
|
|
@ -161,6 +161,7 @@ class Daemon(AuthJSONRPCServer):
|
||||||
analytics_manager=self.analytics_manager,
|
analytics_manager=self.analytics_manager,
|
||||||
skip_components=conf.settings['components_to_skip']
|
skip_components=conf.settings['components_to_skip']
|
||||||
)
|
)
|
||||||
|
self._component_setup_deferred = None
|
||||||
|
|
||||||
# TODO: move this to a component
|
# TODO: move this to a component
|
||||||
self.connected_to_internet = True
|
self.connected_to_internet = True
|
||||||
|
@ -198,8 +199,9 @@ class Daemon(AuthJSONRPCServer):
|
||||||
|
|
||||||
log.info("Starting lbrynet-daemon")
|
log.info("Starting lbrynet-daemon")
|
||||||
log.info("Platform: %s", json.dumps(system_info.get_platform()))
|
log.info("Platform: %s", json.dumps(system_info.get_platform()))
|
||||||
yield self.component_manager.setup(**{n: lambda _, c: setattr(self, components[c.component_name], c.component)
|
self._component_setup_deferred = self.component_manager.setup(**{
|
||||||
for n in components.keys()})
|
n: lambda _, c: setattr(self, components[c.component_name], c.component) for n in components.keys()})
|
||||||
|
yield self._component_setup_deferred
|
||||||
log.info("Started lbrynet-daemon")
|
log.info("Started lbrynet-daemon")
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
|
@ -223,6 +225,11 @@ class Daemon(AuthJSONRPCServer):
|
||||||
if self.analytics_manager:
|
if self.analytics_manager:
|
||||||
self.analytics_manager.shutdown()
|
self.analytics_manager.shutdown()
|
||||||
|
|
||||||
|
try:
|
||||||
|
self._component_setup_deferred.cancel()
|
||||||
|
except defer.CancelledError:
|
||||||
|
pass
|
||||||
|
|
||||||
if self.component_manager is not None:
|
if self.component_manager is not None:
|
||||||
d = self.component_manager.stop()
|
d = self.component_manager.stop()
|
||||||
d.addErrback(log.fail(), 'Failure while shutting down')
|
d.addErrback(log.fail(), 'Failure while shutting down')
|
||||||
|
|
|
@ -214,6 +214,9 @@ class AuthJSONRPCServer(AuthorizedBase):
|
||||||
log.error('lbrynet API failed to bind TCP %s:%i for listening', conf.settings['api_host'],
|
log.error('lbrynet API failed to bind TCP %s:%i for listening', conf.settings['api_host'],
|
||||||
conf.settings['api_port'])
|
conf.settings['api_port'])
|
||||||
reactor.fireSystemEvent("shutdown")
|
reactor.fireSystemEvent("shutdown")
|
||||||
|
except defer.CancelledError:
|
||||||
|
log.info("shutting down before finished starting")
|
||||||
|
reactor.fireSystemEvent("shutdown")
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
self.analytics_manager.send_server_startup_error(str(err))
|
self.analytics_manager.send_server_startup_error(str(err))
|
||||||
log.exception('Failed to start lbrynet-daemon')
|
log.exception('Failed to start lbrynet-daemon')
|
||||||
|
|
|
@ -281,7 +281,9 @@ class Node(MockKademliaHelper):
|
||||||
yield self._protocol._listening
|
yield self._protocol._listening
|
||||||
# TODO: Refresh all k-buckets further away than this node's closest neighbour
|
# TODO: Refresh all k-buckets further away than this node's closest neighbour
|
||||||
yield self.joinNetwork(known_node_addresses or [])
|
yield self.joinNetwork(known_node_addresses or [])
|
||||||
|
self.start_looping_calls()
|
||||||
|
|
||||||
|
def start_looping_calls(self):
|
||||||
self.safe_start_looping_call(self._change_token_lc, constants.tokenSecretChangeInterval)
|
self.safe_start_looping_call(self._change_token_lc, constants.tokenSecretChangeInterval)
|
||||||
# Start refreshing k-buckets periodically, if necessary
|
# Start refreshing k-buckets periodically, if necessary
|
||||||
self.safe_start_looping_call(self._refresh_node_lc, constants.checkRefreshInterval)
|
self.safe_start_looping_call(self._refresh_node_lc, constants.checkRefreshInterval)
|
||||||
|
|
Loading…
Reference in a new issue