diff --git a/lbrynet/core/Session.py b/lbrynet/core/Session.py index 0543ad211..d05c49239 100644 --- a/lbrynet/core/Session.py +++ b/lbrynet/core/Session.py @@ -234,7 +234,7 @@ class Session(object): self.hash_announcer = hashannouncer.DHTHashAnnouncer(self.dht_node, self.storage) self.peer_manager = self.dht_node.peer_manager self.peer_finder = self.dht_node.peer_finder - self._join_dht_deferred = self.dht_node.joinNetwork(self.known_dht_nodes) + self._join_dht_deferred = self.dht_node.start(self.known_dht_nodes) self._join_dht_deferred.addCallback(lambda _: log.info("Joined the dht")) self._join_dht_deferred.addCallback(lambda _: self.hash_announcer.start()) diff --git a/lbrynet/dht/node.py b/lbrynet/dht/node.py index 34327a964..a60b32dfd 100644 --- a/lbrynet/dht/node.py +++ b/lbrynet/dht/node.py @@ -8,21 +8,19 @@ # may be created by processing this file with epydoc: http://epydoc.sf.net import binascii import hashlib -import operator import struct import time import logging from twisted.internet import defer, error, task -from lbrynet.core.utils import generate_id +from lbrynet.core.utils import generate_id, DeferredDict from lbrynet.core.call_later_manager import CallLaterManager from lbrynet.core.PeerManager import PeerManager - +from error import TimeoutError import constants import routingtable import datastore import protocol -from error import TimeoutError from peerfinder import DHTPeerFinder from contact import ContactManager from distance import Distance @@ -172,45 +170,93 @@ class Node(MockKademliaHelper): log.error("Couldn't bind to port %d. %s", self.port, traceback.format_exc()) raise ValueError("%s lbrynet may already be running." % str(e)) else: - log.warning("Already bound to port %d", self._listeningPort.port) + log.warning("Already bound to port %s", self._listeningPort) - def bootstrap_join(self, known_node_addresses, finished_d): + @defer.inlineCallbacks + def joinNetwork(self, known_node_addresses=(('jack.lbry.tech', 4455), )): """ Attempt to join the dht, retry every 30 seconds if unsuccessful :param known_node_addresses: [(str, int)] list of hostnames and ports for known dht seed nodes - :param finished_d: (defer.Deferred) called when join succeeds """ + + self._join_deferred = defer.Deferred() + known_node_resolution = {} + @defer.inlineCallbacks def _resolve_seeds(): + result = {} + for host, port in known_node_addresses: + node_address = yield self.reactor_resolve(host) + result[(host, port)] = node_address + defer.returnValue(result) + + if not known_node_resolution: + known_node_resolution = yield _resolve_seeds() + # we are one of the seed nodes, don't add ourselves + if (self.externalIP, self.port) in known_node_resolution.itervalues(): + del known_node_resolution[(self.externalIP, self.port)] + known_node_addresses.remove((self.externalIP, self.port)) + + def _ping_contacts(contacts): + d = DeferredDict({contact: contact.ping() for contact in contacts}, consumeErrors=True) + d.addErrback(lambda err: err.trap(TimeoutError)) + return d + + @defer.inlineCallbacks + def _initialize_routing(): bootstrap_contacts = [] - for node_address, port in known_node_addresses: - host = yield self.reactor_resolve(node_address) - # Create temporary contact information for the list of addresses of known nodes - contact = Contact(self._generateID(), host, port, self._protocol) - bootstrap_contacts.append(contact) - if not bootstrap_contacts: - if not self.hasContacts(): - log.warning("No known contacts!") + contact_addresses = {(c.address, c.port): c for c in self.contacts} + for (host, port), ip_address in known_node_resolution.iteritems(): + if (host, port) not in contact_addresses: + # Create temporary contact information for the list of addresses of known nodes + # The contact node id will be set with the responding node id when we initialize it to None + contact = self.contact_manager.make_contact(None, ip_address, port, self._protocol) + bootstrap_contacts.append(contact) else: - log.info("found contacts") - bootstrap_contacts = self.contacts - defer.returnValue(bootstrap_contacts) + for contact in self.contacts: + if contact.address == ip_address and contact.port == port: + if not contact.id: + bootstrap_contacts.append(contact) + break + if not bootstrap_contacts: + log.warning("no bootstrap contacts to ping") + ping_result = yield _ping_contacts(bootstrap_contacts) + shortlist = ping_result.keys() + if not shortlist: + log.warning("failed to ping %i bootstrap contacts", len(bootstrap_contacts)) + defer.returnValue(None) + else: + # find the closest peers to us + closest = yield self._iterativeFind(self.node_id, shortlist) + yield _ping_contacts(closest) + # query random hashes in our bucket key ranges to fill or split them + random_ids_in_range = self._routingTable.getRefreshList(force=True) + while random_ids_in_range: + yield self.iterativeFindNode(random_ids_in_range.pop()) + defer.returnValue(None) - def _rerun(closest_nodes): - if not closest_nodes: - log.info("Failed to join the dht, re-attempting in 30 seconds") - self.reactor_callLater(30, self.bootstrap_join, known_node_addresses, finished_d) - elif not finished_d.called: - finished_d.callback(closest_nodes) + @defer.inlineCallbacks + def _iterative_join(joined_d=None, last_buckets_with_contacts=None): + log.info("Attempting to join the DHT network, %i contacts known so far", len(self.contacts)) + joined_d = joined_d or defer.Deferred() + yield _initialize_routing() + buckets_with_contacts = self.bucketsWithContacts() + if last_buckets_with_contacts and last_buckets_with_contacts == buckets_with_contacts: + if not joined_d.called: + joined_d.callback(True) + elif buckets_with_contacts < 4: + self.reactor_callLater(1, _iterative_join, joined_d, buckets_with_contacts) + elif not joined_d.called: + joined_d.callback(None) + yield joined_d + if not self._join_deferred.called: + self._join_deferred.callback(True) + defer.returnValue(None) - log.info("Attempting to join the DHT network") - d = _resolve_seeds() - # Initiate the Kademlia joining sequence - perform a search for this node's own ID - d.addCallback(lambda contacts: self._iterativeFind(self.node_id, contacts)) - d.addCallback(_rerun) + yield _iterative_join() @defer.inlineCallbacks - def joinNetwork(self, known_node_addresses=None): + def start(self, known_node_addresses=None): """ Causes the Node to attempt to join the DHT network by contacting the known DHT nodes. This can be called multiple times if the previous attempt has failed or if the Node has lost all the contacts. @@ -225,9 +271,10 @@ class Node(MockKademliaHelper): self.start_listening() yield self._protocol._listening # TODO: Refresh all k-buckets further away than this node's closest neighbour + yield self.joinNetwork(known_node_addresses or []) + self.safe_start_looping_call(self._change_token_lc, constants.tokenSecretChangeInterval) # Start refreshing k-buckets periodically, if necessary - self.bootstrap_join(known_node_addresses or [], self._joinDeferred) self.safe_start_looping_call(self._refresh_node_lc, constants.checkRefreshInterval) @property @@ -244,6 +291,8 @@ class Node(MockKademliaHelper): return True return False + def bucketsWithContacts(self): + return self._routingTable.bucketsWithContacts() def announceHaveBlob(self, key): return self.iterativeAnnounceHaveBlob( key, {