From ce0af77aa9f457c2b89f6f553a0091fa1974fc54 Mon Sep 17 00:00:00 2001 From: Antonio Quartulli Date: Thu, 1 Feb 2018 17:17:17 +0800 Subject: [PATCH 1/3] dht_node: split network start and join procedures In order to attempt to join the DHT several times (i.e. when the first attempt has failed) we need to split the components initialization from the real joining operation. Create node.startNetwork() to initialize the node and keep the rest in node.joinNetwork() Signed-off-by: Antonio Quartulli --- lbrynet/core/Session.py | 41 +++++++++++++++++++++++----------- lbrynet/dht/node.py | 49 +++++++++++++++++++++++++---------------- 2 files changed, 58 insertions(+), 32 deletions(-) diff --git a/lbrynet/core/Session.py b/lbrynet/core/Session.py index d85b7aca2..c462cd8e5 100644 --- a/lbrynet/core/Session.py +++ b/lbrynet/core/Session.py @@ -247,12 +247,11 @@ class Session(object): d.addErrback(upnp_failed) return d - def _setup_dht(self): - + # the callback, if any, will be invoked once the joining procedure + # has terminated + def join_dht(self, cb=None): from twisted.internet import reactor - log.info("Starting DHT") - def join_resolved_addresses(result): addresses = [] for success, value in result: @@ -260,10 +259,11 @@ class Session(object): addresses.append(value) return addresses - def start_dht(join_network_result): - self.peer_finder.run_manage_loop() - self.hash_announcer.run_manage_loop() - return True + @defer.inlineCallbacks + def join_network(knownNodes): + log.debug("join DHT using known nodes: " + str(knownNodes)) + result = yield self.dht_node.joinNetwork(knownNodes) + defer.returnValue(result) ds = [] for host, port in self.known_dht_nodes: @@ -271,6 +271,22 @@ class Session(object): d.addCallback(lambda h: (h, port)) # match host to port ds.append(d) + dl = defer.DeferredList(ds) + dl.addCallback(join_resolved_addresses) + dl.addCallback(join_network) + if cb: + dl.addCallback(cb) + + return dl + + def _setup_dht(self): + log.info("Starting DHT") + + def start_dht(join_network_result): + self.peer_finder.run_manage_loop() + self.hash_announcer.run_manage_loop() + return True + self.dht_node = self.dht_node_class( udpPort=self.dht_node_port, node_id=self.node_id, @@ -281,11 +297,10 @@ class Session(object): if self.hash_announcer is None: self.hash_announcer = DHTHashAnnouncer(self.dht_node, self.peer_port) - dl = defer.DeferredList(ds) - dl.addCallback(join_resolved_addresses) - dl.addCallback(self.dht_node.joinNetwork) - dl.addCallback(start_dht) - return dl + self.dht_node.startNetwork() + + # pass start_dht() as callback to start the remaining components after joining the DHT + return self.join_dht(start_dht) def _setup_other_components(self): log.debug("Setting up the rest of the components") diff --git a/lbrynet/dht/node.py b/lbrynet/dht/node.py index 0d094e4e7..052521420 100644 --- a/lbrynet/dht/node.py +++ b/lbrynet/dht/node.py @@ -132,17 +132,12 @@ class Node(object): self._listeningPort.stopListening() self.hash_watcher.stop() - @defer.inlineCallbacks - def joinNetwork(self, knownNodeAddresses=None): - """ Causes the Node to join the Kademlia network; normally, this - should be called before any other DHT operations. - - @param knownNodeAddresses: A sequence of tuples containing IP address - information for existing nodes on the - Kademlia network, in the format: - C{(, (udp port>)} - @type knownNodeAddresses: tuple + def startNetwork(self): + """ Causes the Node to start all the underlying components needed for the DHT + to work. This should be called before any other DHT operations. """ + log.info("Starting DHT underlying components") + # Prepare the underlying Kademlia protocol if self.port is not None: try: @@ -151,6 +146,29 @@ class Node(object): import traceback log.error("Couldn't bind to port %d. %s", self.port, traceback.format_exc()) raise ValueError("%s lbrynet may already be running." % str(e)) + + # Start the token looping call + self.change_token_lc.start(constants.tokenSecretChangeInterval) + # #TODO: Refresh all k-buckets further away than this node's closest neighbour + # Start refreshing k-buckets periodically, if necessary + self.next_refresh_call = reactor.callLater(constants.checkRefreshInterval, + self._refreshNode) + self.hash_watcher.tick() + + @defer.inlineCallbacks + def joinNetwork(self, knownNodeAddresses=None): + """ Causes the Node to attempt to join the DHT network by contacting the + known DHT nodes. This can be called multiple times if the previous attempt + has failed or if the Node has lost all the contacts. + + @param knownNodeAddresses: A sequence of tuples containing IP address + information for existing nodes on the + Kademlia network, in the format: + C{(, (udp port>)} + @type knownNodeAddresses: tuple + """ + log.info("Attempting to join the DHT network") + # IGNORE:E1101 # Create temporary contact information for the list of addresses of known nodes if knownNodeAddresses != None: @@ -161,18 +179,11 @@ class Node(object): else: bootstrapContacts = None - # Start the token looping call - self.change_token_lc.start(constants.tokenSecretChangeInterval) - # Initiate the Kademlia joining sequence - perform a search for this node's own ID self._joinDeferred = self._iterativeFind(self.node_id, bootstrapContacts) - # #TODO: Refresh all k-buckets further away than this node's closest neighbour - # Start refreshing k-buckets periodically, if necessary - self.next_refresh_call = reactor.callLater(constants.checkRefreshInterval, - self._refreshNode) - self.hash_watcher.tick() - yield self._joinDeferred + result = yield self._joinDeferred + defer.returnValue(result) @property def contacts(self): From 4f7885e4990384e464a6765ba7888f70a87c6f2e Mon Sep 17 00:00:00 2001 From: Antonio Quartulli Date: Thu, 1 Feb 2018 17:21:52 +0800 Subject: [PATCH 2/3] dht_node: add hasContacts method This method can be used by other components to check if in the Node routing table there is at least one peer. Signed-off-by: Antonio Quartulli --- lbrynet/dht/node.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/lbrynet/dht/node.py b/lbrynet/dht/node.py index 052521420..a13a0227f 100644 --- a/lbrynet/dht/node.py +++ b/lbrynet/dht/node.py @@ -201,6 +201,12 @@ class Node(object): print " %s:%i" % (contact.address, contact.port) print '==================================' + def hasContacts(self): + for bucket in self._routingTable._buckets: + if bucket._contacts: + return True + return False + def getApproximateTotalDHTNodes(self): # get the deepest bucket and the number of contacts in that bucket and multiply it # by the number of equivalently deep buckets in the whole DHT to get a really bad From 8a7e7075048d5151002f7e57fac15fb14e116822 Mon Sep 17 00:00:00 2001 From: Antonio Quartulli Date: Thu, 1 Feb 2018 17:22:39 +0800 Subject: [PATCH 3/3] DHTPeerFinder: re-join the network if we have no peers Periodically check if the routing table is empty and, if so, re-attempt joining the DHT network. This check is performed in the main DHTPeerFinder loop every 60 secs. Closes: #1093 Signed-off-by: Antonio Quartulli --- CHANGELOG.md | 1 + lbrynet/core/client/DHTPeerFinder.py | 9 ++++++++- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 647f1c7bc..9e417c1df 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -40,6 +40,7 @@ at anytime. * Remove manual saving of the wallet in from lbrynet, let lbryum handle it * Block wallet startup on being unlocked if it is encrypted * Use reworked lbryum payto command + * Re-attempt joining the DHT every 60 secs if the Node has no peers ### Added * Add link to instructions on how to change the default peer port diff --git a/lbrynet/core/client/DHTPeerFinder.py b/lbrynet/core/client/DHTPeerFinder.py index 1682006cb..f273e5900 100644 --- a/lbrynet/core/client/DHTPeerFinder.py +++ b/lbrynet/core/client/DHTPeerFinder.py @@ -34,8 +34,15 @@ class DHTPeerFinder(object): self.next_manage_call.cancel() self.next_manage_call = None + @defer.inlineCallbacks def _manage_peers(self): - pass + """ + If we don't know any active peer, let's try to reconnect to the preconfigured + known DHT nodes + """ + if not self.dht_node.hasContacts(): + log.info("No active peer. Re-attempt joining DHT") + yield self.dht_node.join_dht() @defer.inlineCallbacks def find_peers_for_blob(self, blob_hash, timeout=None, filter_self=False):