diff --git a/lbrynet/dht/routingtable.py b/lbrynet/dht/routingtable.py index 8c905a114..5a4db509a 100644 --- a/lbrynet/dht/routingtable.py +++ b/lbrynet/dht/routingtable.py @@ -7,9 +7,11 @@ import random from zope.interface import implements +from twisted.internet import defer import constants import kbucket -import protocol +from error import TimeoutError +from distance import Distance from interface import IRoutingTable import logging @@ -73,58 +75,75 @@ class TreeRoutingTable(object): @param contact: The contact to add to this node's k-buckets @type contact: kademlia.contact.Contact - """ - if contact.id == self._parentNodeID: - return + @rtype: defer.Deferred + """ + + if contact.id == self._parentNodeID: + return defer.succeed(None) bucketIndex = self._kbucketIndex(contact.id) try: self._buckets[bucketIndex].addContact(contact) except kbucket.BucketFull: - # The bucket is full; see if it can be split (by checking - # if its range includes the host node's id) + # The bucket is full; see if it can be split (by checking if its range includes the host node's id) if self._shouldSplit(bucketIndex, contact.id): self._splitBucket(bucketIndex) # Retry the insertion attempt - self.addContact(contact) + return self.addContact(contact) else: - # We can't split the k-bucket - # NOTE: - # In section 2.4 of the 13-page version of the - # Kademlia paper, it is specified that in this case, - # the new contact should simply be dropped. However, - # in section 2.2, it states that the head contact in - # the k-bucket (i.e. the least-recently seen node) - # should be pinged - if it does not reply, it should - # be dropped, and the new contact added to the tail of - # the k-bucket. This implementation follows section - # 2.2 regarding this point. - def replaceContact(failure, deadContactID): - """ Callback for the deferred PING RPC to see if the head - node in the k-bucket is still responding + # We can't split the k-bucket + # + # The 13 page kademlia paper specifies that the least recently contacted node in the bucket + # shall be pinged. If it fails to reply it is replaced with the new contact. If the ping is successful + # the new contact is ignored and not added to the bucket (sections 2.2 and 2.4). + # + # A reasonable extension to this is BEP 0005, which extends the above: + # + # Not all nodes that we learn about are equal. Some are "good" and some are not. + # Many nodes using the DHT are able to send queries and receive responses, + # but are not able to respond to queries from other nodes. It is important that + # each node's routing table must contain only known good nodes. A good node is + # a node has responded to one of our queries within the last 15 minutes. A node + # is also good if it has ever responded to one of our queries and has sent us a + # query within the last 15 minutes. After 15 minutes of inactivity, a node becomes + # questionable. Nodes become bad when they fail to respond to multiple queries + # in a row. Nodes that we know are good are given priority over nodes with unknown status. + # + # When there are bad or questionable nodes in the bucket, the least recent is selected for + # potential replacement (BEP 0005). When all nodes in the bucket are fresh, the head (least recent) + # contact is selected as described in section 2.2 of the kademlia paper. In both cases the new contact + # is ignored if the pinged node replies. + + def replaceContact(failure, deadContact): + """ + Callback for the deferred PING RPC to see if the node to be replaced in the k-bucket is still + responding @type failure: twisted.python.failure.Failure """ - failure.trap(protocol.TimeoutError) - if len(deadContactID) != constants.key_bits / 8: - raise ValueError("invalid contact id") - log.debug("Replacing dead contact: %s", deadContactID.encode('hex')) + failure.trap(TimeoutError) + log.debug("Replacing dead contact in bucket %i: %s:%i (%s) with %s:%i (%s)", bucketIndex, + deadContact.address, deadContact.port, deadContact.log_id(), contact.address, contact.port, + contact.log_id()) try: - # Remove the old contact... - self._buckets[bucketIndex].removeContact(deadContactID) + self._buckets[bucketIndex].removeContact(deadContact) except ValueError: # The contact has already been removed (probably due to a timeout) pass - # ...and add the new one at the tail of the bucket - self.addContact(contact) + return self.addContact(contact) - # Ping the least-recently seen contact in this k-bucket - head_contact = self._buckets[bucketIndex]._contacts[0] - df = head_contact.ping() - # If there's an error (i.e. timeout), remove the head - # contact, and append the new one - df.addErrback(replaceContact, head_contact.id) + not_good_contacts = self._buckets[bucketIndex].getBadOrUnknownContacts() + if not_good_contacts: + to_replace = not_good_contacts[0] + else: + to_replace = self._buckets[bucketIndex]._contacts[0] + df = to_replace.ping() + df.addErrback(replaceContact, to_replace) + return df + else: + self.touchKBucketByIndex(bucketIndex) + return defer.succeed(None) def findCloseNodes(self, key, count, sender_node_id=None): """ Finds a number of known nodes closest to the node/value with the @@ -250,7 +269,9 @@ class TreeRoutingTable(object): @param key: A key in the range of the target k-bucket @type key: str """ - bucketIndex = self._kbucketIndex(key) + self.touchKBucketByIndex(self._kbucketIndex(key)) + + def touchKBucketByIndex(self, bucketIndex): self._buckets[bucketIndex].lastAccessed = int(self._getTime()) def _kbucketIndex(self, key): @@ -312,4 +333,16 @@ class TreeRoutingTable(object): for contact in newBucket._contacts: oldBucket.removeContact(contact) + def contactInRoutingTable(self, address_tuple): + for bucket in self._buckets: + for contact in bucket.getContacts(sort_distance_to=False): + if address_tuple[0] == contact.address and address_tuple[1] == contact.port: + return True + return False + def bucketsWithContacts(self): + count = 0 + for bucket in self._buckets: + if len(bucket): + count += 1 + return count