Merge pull request #1096 from lbryio/re-join-dht

Re-join the DHT if the node has no active peers
This commit is contained in:
Alex Grin 2018-02-06 09:13:50 -05:00 committed by GitHub
commit 433defa7c8
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 73 additions and 33 deletions

View file

@ -40,6 +40,7 @@ at anytime.
* Remove manual saving of the wallet in from lbrynet, let lbryum handle it * Remove manual saving of the wallet in from lbrynet, let lbryum handle it
* Block wallet startup on being unlocked if it is encrypted * Block wallet startup on being unlocked if it is encrypted
* Use reworked lbryum payto command * Use reworked lbryum payto command
* Re-attempt joining the DHT every 60 secs if the Node has no peers
### Added ### Added
* Add link to instructions on how to change the default peer port * Add link to instructions on how to change the default peer port

View file

@ -247,12 +247,11 @@ class Session(object):
d.addErrback(upnp_failed) d.addErrback(upnp_failed)
return d return d
def _setup_dht(self): # the callback, if any, will be invoked once the joining procedure
# has terminated
def join_dht(self, cb=None):
from twisted.internet import reactor from twisted.internet import reactor
log.info("Starting DHT")
def join_resolved_addresses(result): def join_resolved_addresses(result):
addresses = [] addresses = []
for success, value in result: for success, value in result:
@ -260,10 +259,11 @@ class Session(object):
addresses.append(value) addresses.append(value)
return addresses return addresses
def start_dht(join_network_result): @defer.inlineCallbacks
self.peer_finder.run_manage_loop() def join_network(knownNodes):
self.hash_announcer.run_manage_loop() log.debug("join DHT using known nodes: " + str(knownNodes))
return True result = yield self.dht_node.joinNetwork(knownNodes)
defer.returnValue(result)
ds = [] ds = []
for host, port in self.known_dht_nodes: for host, port in self.known_dht_nodes:
@ -271,6 +271,22 @@ class Session(object):
d.addCallback(lambda h: (h, port)) # match host to port d.addCallback(lambda h: (h, port)) # match host to port
ds.append(d) ds.append(d)
dl = defer.DeferredList(ds)
dl.addCallback(join_resolved_addresses)
dl.addCallback(join_network)
if cb:
dl.addCallback(cb)
return dl
def _setup_dht(self):
log.info("Starting DHT")
def start_dht(join_network_result):
self.peer_finder.run_manage_loop()
self.hash_announcer.run_manage_loop()
return True
self.dht_node = self.dht_node_class( self.dht_node = self.dht_node_class(
udpPort=self.dht_node_port, udpPort=self.dht_node_port,
node_id=self.node_id, node_id=self.node_id,
@ -281,11 +297,10 @@ class Session(object):
if self.hash_announcer is None: if self.hash_announcer is None:
self.hash_announcer = DHTHashAnnouncer(self.dht_node, self.peer_port) self.hash_announcer = DHTHashAnnouncer(self.dht_node, self.peer_port)
dl = defer.DeferredList(ds) self.dht_node.startNetwork()
dl.addCallback(join_resolved_addresses)
dl.addCallback(self.dht_node.joinNetwork) # pass start_dht() as callback to start the remaining components after joining the DHT
dl.addCallback(start_dht) return self.join_dht(start_dht)
return dl
def _setup_other_components(self): def _setup_other_components(self):
log.debug("Setting up the rest of the components") log.debug("Setting up the rest of the components")

View file

@ -34,8 +34,15 @@ class DHTPeerFinder(object):
self.next_manage_call.cancel() self.next_manage_call.cancel()
self.next_manage_call = None self.next_manage_call = None
@defer.inlineCallbacks
def _manage_peers(self): def _manage_peers(self):
pass """
If we don't know any active peer, let's try to reconnect to the preconfigured
known DHT nodes
"""
if not self.dht_node.hasContacts():
log.info("No active peer. Re-attempt joining DHT")
yield self.dht_node.join_dht()
@defer.inlineCallbacks @defer.inlineCallbacks
def find_peers_for_blob(self, blob_hash, timeout=None, filter_self=False): def find_peers_for_blob(self, blob_hash, timeout=None, filter_self=False):

View file

@ -132,17 +132,12 @@ class Node(object):
self._listeningPort.stopListening() self._listeningPort.stopListening()
self.hash_watcher.stop() self.hash_watcher.stop()
@defer.inlineCallbacks def startNetwork(self):
def joinNetwork(self, knownNodeAddresses=None): """ Causes the Node to start all the underlying components needed for the DHT
""" Causes the Node to join the Kademlia network; normally, this to work. This should be called before any other DHT operations.
should be called before any other DHT operations.
@param knownNodeAddresses: A sequence of tuples containing IP address
information for existing nodes on the
Kademlia network, in the format:
C{(<ip address>, (udp port>)}
@type knownNodeAddresses: tuple
""" """
log.info("Starting DHT underlying components")
# Prepare the underlying Kademlia protocol # Prepare the underlying Kademlia protocol
if self.port is not None: if self.port is not None:
try: try:
@ -151,6 +146,29 @@ class Node(object):
import traceback import traceback
log.error("Couldn't bind to port %d. %s", self.port, traceback.format_exc()) log.error("Couldn't bind to port %d. %s", self.port, traceback.format_exc())
raise ValueError("%s lbrynet may already be running." % str(e)) raise ValueError("%s lbrynet may already be running." % str(e))
# Start the token looping call
self.change_token_lc.start(constants.tokenSecretChangeInterval)
# #TODO: Refresh all k-buckets further away than this node's closest neighbour
# Start refreshing k-buckets periodically, if necessary
self.next_refresh_call = reactor.callLater(constants.checkRefreshInterval,
self._refreshNode)
self.hash_watcher.tick()
@defer.inlineCallbacks
def joinNetwork(self, knownNodeAddresses=None):
""" Causes the Node to attempt to join the DHT network by contacting the
known DHT nodes. This can be called multiple times if the previous attempt
has failed or if the Node has lost all the contacts.
@param knownNodeAddresses: A sequence of tuples containing IP address
information for existing nodes on the
Kademlia network, in the format:
C{(<ip address>, (udp port>)}
@type knownNodeAddresses: tuple
"""
log.info("Attempting to join the DHT network")
# IGNORE:E1101 # IGNORE:E1101
# Create temporary contact information for the list of addresses of known nodes # Create temporary contact information for the list of addresses of known nodes
if knownNodeAddresses != None: if knownNodeAddresses != None:
@ -161,18 +179,11 @@ class Node(object):
else: else:
bootstrapContacts = None bootstrapContacts = None
# Start the token looping call
self.change_token_lc.start(constants.tokenSecretChangeInterval)
# Initiate the Kademlia joining sequence - perform a search for this node's own ID # Initiate the Kademlia joining sequence - perform a search for this node's own ID
self._joinDeferred = self._iterativeFind(self.node_id, bootstrapContacts) self._joinDeferred = self._iterativeFind(self.node_id, bootstrapContacts)
# #TODO: Refresh all k-buckets further away than this node's closest neighbour
# Start refreshing k-buckets periodically, if necessary
self.next_refresh_call = reactor.callLater(constants.checkRefreshInterval,
self._refreshNode)
self.hash_watcher.tick() result = yield self._joinDeferred
yield self._joinDeferred defer.returnValue(result)
@property @property
def contacts(self): def contacts(self):
@ -190,6 +201,12 @@ class Node(object):
print " %s:%i" % (contact.address, contact.port) print " %s:%i" % (contact.address, contact.port)
print '==================================' print '=================================='
def hasContacts(self):
for bucket in self._routingTable._buckets:
if bucket._contacts:
return True
return False
def getApproximateTotalDHTNodes(self): def getApproximateTotalDHTNodes(self):
# get the deepest bucket and the number of contacts in that bucket and multiply it # get the deepest bucket and the number of contacts in that bucket and multiply it
# by the number of equivalently deep buckets in the whole DHT to get a really bad # by the number of equivalently deep buckets in the whole DHT to get a really bad