diff --git a/CHANGELOG.md b/CHANGELOG.md index 647f1c7bc..9e417c1df 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -40,6 +40,7 @@ at anytime. * Remove manual saving of the wallet in from lbrynet, let lbryum handle it * Block wallet startup on being unlocked if it is encrypted * Use reworked lbryum payto command + * Re-attempt joining the DHT every 60 secs if the Node has no peers ### Added * Add link to instructions on how to change the default peer port diff --git a/lbrynet/core/Session.py b/lbrynet/core/Session.py index d85b7aca2..c462cd8e5 100644 --- a/lbrynet/core/Session.py +++ b/lbrynet/core/Session.py @@ -247,12 +247,11 @@ class Session(object): d.addErrback(upnp_failed) return d - def _setup_dht(self): - + # the callback, if any, will be invoked once the joining procedure + # has terminated + def join_dht(self, cb=None): from twisted.internet import reactor - log.info("Starting DHT") - def join_resolved_addresses(result): addresses = [] for success, value in result: @@ -260,10 +259,11 @@ class Session(object): addresses.append(value) return addresses - def start_dht(join_network_result): - self.peer_finder.run_manage_loop() - self.hash_announcer.run_manage_loop() - return True + @defer.inlineCallbacks + def join_network(knownNodes): + log.debug("join DHT using known nodes: " + str(knownNodes)) + result = yield self.dht_node.joinNetwork(knownNodes) + defer.returnValue(result) ds = [] for host, port in self.known_dht_nodes: @@ -271,6 +271,22 @@ class Session(object): d.addCallback(lambda h: (h, port)) # match host to port ds.append(d) + dl = defer.DeferredList(ds) + dl.addCallback(join_resolved_addresses) + dl.addCallback(join_network) + if cb: + dl.addCallback(cb) + + return dl + + def _setup_dht(self): + log.info("Starting DHT") + + def start_dht(join_network_result): + self.peer_finder.run_manage_loop() + self.hash_announcer.run_manage_loop() + return True + self.dht_node = self.dht_node_class( udpPort=self.dht_node_port, node_id=self.node_id, @@ -281,11 +297,10 @@ class Session(object): if self.hash_announcer is None: self.hash_announcer = DHTHashAnnouncer(self.dht_node, self.peer_port) - dl = defer.DeferredList(ds) - dl.addCallback(join_resolved_addresses) - dl.addCallback(self.dht_node.joinNetwork) - dl.addCallback(start_dht) - return dl + self.dht_node.startNetwork() + + # pass start_dht() as callback to start the remaining components after joining the DHT + return self.join_dht(start_dht) def _setup_other_components(self): log.debug("Setting up the rest of the components") diff --git a/lbrynet/core/client/DHTPeerFinder.py b/lbrynet/core/client/DHTPeerFinder.py index 1682006cb..f273e5900 100644 --- a/lbrynet/core/client/DHTPeerFinder.py +++ b/lbrynet/core/client/DHTPeerFinder.py @@ -34,8 +34,15 @@ class DHTPeerFinder(object): self.next_manage_call.cancel() self.next_manage_call = None + @defer.inlineCallbacks def _manage_peers(self): - pass + """ + If we don't know any active peer, let's try to reconnect to the preconfigured + known DHT nodes + """ + if not self.dht_node.hasContacts(): + log.info("No active peer. Re-attempt joining DHT") + yield self.dht_node.join_dht() @defer.inlineCallbacks def find_peers_for_blob(self, blob_hash, timeout=None, filter_self=False): diff --git a/lbrynet/dht/node.py b/lbrynet/dht/node.py index 0d094e4e7..a13a0227f 100644 --- a/lbrynet/dht/node.py +++ b/lbrynet/dht/node.py @@ -132,17 +132,12 @@ class Node(object): self._listeningPort.stopListening() self.hash_watcher.stop() - @defer.inlineCallbacks - def joinNetwork(self, knownNodeAddresses=None): - """ Causes the Node to join the Kademlia network; normally, this - should be called before any other DHT operations. - - @param knownNodeAddresses: A sequence of tuples containing IP address - information for existing nodes on the - Kademlia network, in the format: - C{(, (udp port>)} - @type knownNodeAddresses: tuple + def startNetwork(self): + """ Causes the Node to start all the underlying components needed for the DHT + to work. This should be called before any other DHT operations. """ + log.info("Starting DHT underlying components") + # Prepare the underlying Kademlia protocol if self.port is not None: try: @@ -151,6 +146,29 @@ class Node(object): import traceback log.error("Couldn't bind to port %d. %s", self.port, traceback.format_exc()) raise ValueError("%s lbrynet may already be running." % str(e)) + + # Start the token looping call + self.change_token_lc.start(constants.tokenSecretChangeInterval) + # #TODO: Refresh all k-buckets further away than this node's closest neighbour + # Start refreshing k-buckets periodically, if necessary + self.next_refresh_call = reactor.callLater(constants.checkRefreshInterval, + self._refreshNode) + self.hash_watcher.tick() + + @defer.inlineCallbacks + def joinNetwork(self, knownNodeAddresses=None): + """ Causes the Node to attempt to join the DHT network by contacting the + known DHT nodes. This can be called multiple times if the previous attempt + has failed or if the Node has lost all the contacts. + + @param knownNodeAddresses: A sequence of tuples containing IP address + information for existing nodes on the + Kademlia network, in the format: + C{(, (udp port>)} + @type knownNodeAddresses: tuple + """ + log.info("Attempting to join the DHT network") + # IGNORE:E1101 # Create temporary contact information for the list of addresses of known nodes if knownNodeAddresses != None: @@ -161,18 +179,11 @@ class Node(object): else: bootstrapContacts = None - # Start the token looping call - self.change_token_lc.start(constants.tokenSecretChangeInterval) - # Initiate the Kademlia joining sequence - perform a search for this node's own ID self._joinDeferred = self._iterativeFind(self.node_id, bootstrapContacts) - # #TODO: Refresh all k-buckets further away than this node's closest neighbour - # Start refreshing k-buckets periodically, if necessary - self.next_refresh_call = reactor.callLater(constants.checkRefreshInterval, - self._refreshNode) - self.hash_watcher.tick() - yield self._joinDeferred + result = yield self._joinDeferred + defer.returnValue(result) @property def contacts(self): @@ -190,6 +201,12 @@ class Node(object): print " %s:%i" % (contact.address, contact.port) print '==================================' + def hasContacts(self): + for bucket in self._routingTable._buckets: + if bucket._contacts: + return True + return False + def getApproximateTotalDHTNodes(self): # get the deepest bucket and the number of contacts in that bucket and multiply it # by the number of equivalently deep buckets in the whole DHT to get a really bad