dht_node: split network start and join procedures

In order to attempt to join the DHT several times
(i.e. when the first attempt has failed) we need to
split the components initialization from the real
joining operation.

Create node.startNetwork() to initialize the node
and keep the rest in node.joinNetwork()

Signed-off-by: Antonio Quartulli <antonio@mandelbit.com>
This commit is contained in:
Antonio Quartulli 2018-02-01 17:17:17 +08:00
parent 1f68d29d73
commit ce0af77aa9
No known key found for this signature in database
GPG key ID: 07A53C580EF2CD74
2 changed files with 58 additions and 32 deletions

View file

@ -247,12 +247,11 @@ class Session(object):
d.addErrback(upnp_failed) d.addErrback(upnp_failed)
return d return d
def _setup_dht(self): # the callback, if any, will be invoked once the joining procedure
# has terminated
def join_dht(self, cb=None):
from twisted.internet import reactor from twisted.internet import reactor
log.info("Starting DHT")
def join_resolved_addresses(result): def join_resolved_addresses(result):
addresses = [] addresses = []
for success, value in result: for success, value in result:
@ -260,10 +259,11 @@ class Session(object):
addresses.append(value) addresses.append(value)
return addresses return addresses
def start_dht(join_network_result): @defer.inlineCallbacks
self.peer_finder.run_manage_loop() def join_network(knownNodes):
self.hash_announcer.run_manage_loop() log.debug("join DHT using known nodes: " + str(knownNodes))
return True result = yield self.dht_node.joinNetwork(knownNodes)
defer.returnValue(result)
ds = [] ds = []
for host, port in self.known_dht_nodes: for host, port in self.known_dht_nodes:
@ -271,6 +271,22 @@ class Session(object):
d.addCallback(lambda h: (h, port)) # match host to port d.addCallback(lambda h: (h, port)) # match host to port
ds.append(d) ds.append(d)
dl = defer.DeferredList(ds)
dl.addCallback(join_resolved_addresses)
dl.addCallback(join_network)
if cb:
dl.addCallback(cb)
return dl
def _setup_dht(self):
log.info("Starting DHT")
def start_dht(join_network_result):
self.peer_finder.run_manage_loop()
self.hash_announcer.run_manage_loop()
return True
self.dht_node = self.dht_node_class( self.dht_node = self.dht_node_class(
udpPort=self.dht_node_port, udpPort=self.dht_node_port,
node_id=self.node_id, node_id=self.node_id,
@ -281,11 +297,10 @@ class Session(object):
if self.hash_announcer is None: if self.hash_announcer is None:
self.hash_announcer = DHTHashAnnouncer(self.dht_node, self.peer_port) self.hash_announcer = DHTHashAnnouncer(self.dht_node, self.peer_port)
dl = defer.DeferredList(ds) self.dht_node.startNetwork()
dl.addCallback(join_resolved_addresses)
dl.addCallback(self.dht_node.joinNetwork) # pass start_dht() as callback to start the remaining components after joining the DHT
dl.addCallback(start_dht) return self.join_dht(start_dht)
return dl
def _setup_other_components(self): def _setup_other_components(self):
log.debug("Setting up the rest of the components") log.debug("Setting up the rest of the components")

View file

@ -132,17 +132,12 @@ class Node(object):
self._listeningPort.stopListening() self._listeningPort.stopListening()
self.hash_watcher.stop() self.hash_watcher.stop()
@defer.inlineCallbacks def startNetwork(self):
def joinNetwork(self, knownNodeAddresses=None): """ Causes the Node to start all the underlying components needed for the DHT
""" Causes the Node to join the Kademlia network; normally, this to work. This should be called before any other DHT operations.
should be called before any other DHT operations.
@param knownNodeAddresses: A sequence of tuples containing IP address
information for existing nodes on the
Kademlia network, in the format:
C{(<ip address>, (udp port>)}
@type knownNodeAddresses: tuple
""" """
log.info("Starting DHT underlying components")
# Prepare the underlying Kademlia protocol # Prepare the underlying Kademlia protocol
if self.port is not None: if self.port is not None:
try: try:
@ -151,6 +146,29 @@ class Node(object):
import traceback import traceback
log.error("Couldn't bind to port %d. %s", self.port, traceback.format_exc()) log.error("Couldn't bind to port %d. %s", self.port, traceback.format_exc())
raise ValueError("%s lbrynet may already be running." % str(e)) raise ValueError("%s lbrynet may already be running." % str(e))
# Start the token looping call
self.change_token_lc.start(constants.tokenSecretChangeInterval)
# #TODO: Refresh all k-buckets further away than this node's closest neighbour
# Start refreshing k-buckets periodically, if necessary
self.next_refresh_call = reactor.callLater(constants.checkRefreshInterval,
self._refreshNode)
self.hash_watcher.tick()
@defer.inlineCallbacks
def joinNetwork(self, knownNodeAddresses=None):
""" Causes the Node to attempt to join the DHT network by contacting the
known DHT nodes. This can be called multiple times if the previous attempt
has failed or if the Node has lost all the contacts.
@param knownNodeAddresses: A sequence of tuples containing IP address
information for existing nodes on the
Kademlia network, in the format:
C{(<ip address>, (udp port>)}
@type knownNodeAddresses: tuple
"""
log.info("Attempting to join the DHT network")
# IGNORE:E1101 # IGNORE:E1101
# Create temporary contact information for the list of addresses of known nodes # Create temporary contact information for the list of addresses of known nodes
if knownNodeAddresses != None: if knownNodeAddresses != None:
@ -161,18 +179,11 @@ class Node(object):
else: else:
bootstrapContacts = None bootstrapContacts = None
# Start the token looping call
self.change_token_lc.start(constants.tokenSecretChangeInterval)
# Initiate the Kademlia joining sequence - perform a search for this node's own ID # Initiate the Kademlia joining sequence - perform a search for this node's own ID
self._joinDeferred = self._iterativeFind(self.node_id, bootstrapContacts) self._joinDeferred = self._iterativeFind(self.node_id, bootstrapContacts)
# #TODO: Refresh all k-buckets further away than this node's closest neighbour
# Start refreshing k-buckets periodically, if necessary
self.next_refresh_call = reactor.callLater(constants.checkRefreshInterval,
self._refreshNode)
self.hash_watcher.tick() result = yield self._joinDeferred
yield self._joinDeferred defer.returnValue(result)
@property @property
def contacts(self): def contacts(self):