diff --git a/lbrynet/dht/node.py b/lbrynet/dht/node.py index 3150a0226..73e639688 100644 --- a/lbrynet/dht/node.py +++ b/lbrynet/dht/node.py @@ -100,7 +100,7 @@ class Node(object): # information from the DHT as soon as the node is part of the # network (add callbacks to this deferred if scheduling such # operations before the node has finished joining the network) - self._joinDeferred = None + self._joinDeferred = defer.Deferred(None) self.change_token_lc = task.LoopingCall(self.change_token) self.change_token_lc.clock = self.clock self.refresh_node_lc = task.LoopingCall(self._refreshNode) @@ -164,6 +164,45 @@ class Node(object): if self.hash_watcher.lc.running: yield self.hash_watcher.stop() + def start_listening(self): + try: + self._listeningPort = self.reactor_listenUDP(self.port, self._protocol) + log.info("DHT node listening on %i", self.port) + except error.CannotListenError as e: + import traceback + log.error("Couldn't bind to port %d. %s", self.port, traceback.format_exc()) + raise ValueError("%s lbrynet may already be running." % str(e)) + + def bootstrap_join(self, known_node_addresses, finished_d): + @defer.inlineCallbacks + def _resolve_seeds(): + bootstrap_contacts = [] + for node_address, port in known_node_addresses: + host = yield self.reactor_resolve(node_address) + # Create temporary contact information for the list of addresses of known nodes + contact = Contact(self._generateID(), host, port, self._protocol) + bootstrap_contacts.append(contact) + if not bootstrap_contacts: + if not self.hasContacts(): + log.warning("No known contacts!") + else: + log.info("found contacts") + bootstrap_contacts = self.contacts + defer.returnValue(bootstrap_contacts) + + def _rerun(bootstrap_contacts): + if not bootstrap_contacts: + log.info("Failed to join the dht, re-attempting in 60 seconds") + self.reactor_callLater(60, self.bootstrap_join, known_node_addresses, finished_d) + elif not finished_d.called: + finished_d.callback(bootstrap_contacts) + + log.info("Attempting to join the DHT network") + d = _resolve_seeds() + # Initiate the Kademlia joining sequence - perform a search for this node's own ID + d.addCallback(lambda contacts: self._iterativeFind(self.node_id, contacts)) + d.addCallback(_rerun) + @defer.inlineCallbacks def joinNetwork(self, known_node_addresses=None): """ Causes the Node to attempt to join the DHT network by contacting the @@ -177,38 +216,16 @@ class Node(object): @type known_node_addresses: list """ - try: - self._listeningPort = reactor.listenUDP(self.port, self._protocol) - log.info("DHT node listening on %i", self.port) - except error.CannotListenError as e: - import traceback - log.error("Couldn't bind to port %d. %s", self.port, traceback.format_exc()) - raise ValueError("%s lbrynet may already be running." % str(e)) - - known_node_addresses = known_node_addresses or [] - bootstrap_contacts = [] - for node_address, port in known_node_addresses: - host = yield reactor.resolve(node_address) - # Create temporary contact information for the list of addresses of known nodes - contact = Contact(self._generateID(), host, port, self._protocol) - bootstrap_contacts.append(contact) - - log.info("Attempting to join the DHT network") - - # Initiate the Kademlia joining sequence - perform a search for this node's own ID - self._joinDeferred = self._iterativeFind(self.node_id, bootstrap_contacts) + self.start_listening() # #TODO: Refresh all k-buckets further away than this node's closest neighbour # Start refreshing k-buckets periodically, if necessary - self.hash_watcher.tick() + self.bootstrap_join(known_node_addresses or [], self._joinDeferred) yield self._joinDeferred self.hash_watcher.start() - self.change_token_lc.start(constants.tokenSecretChangeInterval) self.refresh_node_lc.start(constants.checkRefreshInterval) self.hash_announcer.run_manage_loop() - #TODO: re-attempt joining the network if it fails - @property def contacts(self): def _inner():