forked from LBRYCommunity/lbry-sdk
update contact replacement in KBucket to follow BEP0005
http://www.bittorrent.org/beps/bep_0005.html
This commit is contained in:
parent
cf3359044d
commit
05241012a2
1 changed files with 69 additions and 36 deletions
|
@ -7,9 +7,11 @@
|
||||||
|
|
||||||
import random
|
import random
|
||||||
from zope.interface import implements
|
from zope.interface import implements
|
||||||
|
from twisted.internet import defer
|
||||||
import constants
|
import constants
|
||||||
import kbucket
|
import kbucket
|
||||||
import protocol
|
from error import TimeoutError
|
||||||
|
from distance import Distance
|
||||||
from interface import IRoutingTable
|
from interface import IRoutingTable
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
|
@ -73,58 +75,75 @@ class TreeRoutingTable(object):
|
||||||
|
|
||||||
@param contact: The contact to add to this node's k-buckets
|
@param contact: The contact to add to this node's k-buckets
|
||||||
@type contact: kademlia.contact.Contact
|
@type contact: kademlia.contact.Contact
|
||||||
"""
|
|
||||||
if contact.id == self._parentNodeID:
|
|
||||||
return
|
|
||||||
|
|
||||||
|
@rtype: defer.Deferred
|
||||||
|
"""
|
||||||
|
|
||||||
|
if contact.id == self._parentNodeID:
|
||||||
|
return defer.succeed(None)
|
||||||
bucketIndex = self._kbucketIndex(contact.id)
|
bucketIndex = self._kbucketIndex(contact.id)
|
||||||
try:
|
try:
|
||||||
self._buckets[bucketIndex].addContact(contact)
|
self._buckets[bucketIndex].addContact(contact)
|
||||||
except kbucket.BucketFull:
|
except kbucket.BucketFull:
|
||||||
# The bucket is full; see if it can be split (by checking
|
# The bucket is full; see if it can be split (by checking if its range includes the host node's id)
|
||||||
# if its range includes the host node's id)
|
|
||||||
if self._shouldSplit(bucketIndex, contact.id):
|
if self._shouldSplit(bucketIndex, contact.id):
|
||||||
self._splitBucket(bucketIndex)
|
self._splitBucket(bucketIndex)
|
||||||
# Retry the insertion attempt
|
# Retry the insertion attempt
|
||||||
self.addContact(contact)
|
return self.addContact(contact)
|
||||||
else:
|
else:
|
||||||
# We can't split the k-bucket
|
|
||||||
# NOTE:
|
|
||||||
# In section 2.4 of the 13-page version of the
|
|
||||||
# Kademlia paper, it is specified that in this case,
|
|
||||||
# the new contact should simply be dropped. However,
|
|
||||||
# in section 2.2, it states that the head contact in
|
|
||||||
# the k-bucket (i.e. the least-recently seen node)
|
|
||||||
# should be pinged - if it does not reply, it should
|
|
||||||
# be dropped, and the new contact added to the tail of
|
|
||||||
# the k-bucket. This implementation follows section
|
|
||||||
# 2.2 regarding this point.
|
|
||||||
|
|
||||||
def replaceContact(failure, deadContactID):
|
# We can't split the k-bucket
|
||||||
""" Callback for the deferred PING RPC to see if the head
|
#
|
||||||
node in the k-bucket is still responding
|
# The 13 page kademlia paper specifies that the least recently contacted node in the bucket
|
||||||
|
# shall be pinged. If it fails to reply it is replaced with the new contact. If the ping is successful
|
||||||
|
# the new contact is ignored and not added to the bucket (sections 2.2 and 2.4).
|
||||||
|
#
|
||||||
|
# A reasonable extension to this is BEP 0005, which extends the above:
|
||||||
|
#
|
||||||
|
# Not all nodes that we learn about are equal. Some are "good" and some are not.
|
||||||
|
# Many nodes using the DHT are able to send queries and receive responses,
|
||||||
|
# but are not able to respond to queries from other nodes. It is important that
|
||||||
|
# each node's routing table must contain only known good nodes. A good node is
|
||||||
|
# a node has responded to one of our queries within the last 15 minutes. A node
|
||||||
|
# is also good if it has ever responded to one of our queries and has sent us a
|
||||||
|
# query within the last 15 minutes. After 15 minutes of inactivity, a node becomes
|
||||||
|
# questionable. Nodes become bad when they fail to respond to multiple queries
|
||||||
|
# in a row. Nodes that we know are good are given priority over nodes with unknown status.
|
||||||
|
#
|
||||||
|
# When there are bad or questionable nodes in the bucket, the least recent is selected for
|
||||||
|
# potential replacement (BEP 0005). When all nodes in the bucket are fresh, the head (least recent)
|
||||||
|
# contact is selected as described in section 2.2 of the kademlia paper. In both cases the new contact
|
||||||
|
# is ignored if the pinged node replies.
|
||||||
|
|
||||||
|
def replaceContact(failure, deadContact):
|
||||||
|
"""
|
||||||
|
Callback for the deferred PING RPC to see if the node to be replaced in the k-bucket is still
|
||||||
|
responding
|
||||||
|
|
||||||
@type failure: twisted.python.failure.Failure
|
@type failure: twisted.python.failure.Failure
|
||||||
"""
|
"""
|
||||||
failure.trap(protocol.TimeoutError)
|
failure.trap(TimeoutError)
|
||||||
if len(deadContactID) != constants.key_bits / 8:
|
log.debug("Replacing dead contact in bucket %i: %s:%i (%s) with %s:%i (%s)", bucketIndex,
|
||||||
raise ValueError("invalid contact id")
|
deadContact.address, deadContact.port, deadContact.log_id(), contact.address, contact.port,
|
||||||
log.debug("Replacing dead contact: %s", deadContactID.encode('hex'))
|
contact.log_id())
|
||||||
try:
|
try:
|
||||||
# Remove the old contact...
|
self._buckets[bucketIndex].removeContact(deadContact)
|
||||||
self._buckets[bucketIndex].removeContact(deadContactID)
|
|
||||||
except ValueError:
|
except ValueError:
|
||||||
# The contact has already been removed (probably due to a timeout)
|
# The contact has already been removed (probably due to a timeout)
|
||||||
pass
|
pass
|
||||||
# ...and add the new one at the tail of the bucket
|
return self.addContact(contact)
|
||||||
self.addContact(contact)
|
|
||||||
|
|
||||||
# Ping the least-recently seen contact in this k-bucket
|
not_good_contacts = self._buckets[bucketIndex].getBadOrUnknownContacts()
|
||||||
head_contact = self._buckets[bucketIndex]._contacts[0]
|
if not_good_contacts:
|
||||||
df = head_contact.ping()
|
to_replace = not_good_contacts[0]
|
||||||
# If there's an error (i.e. timeout), remove the head
|
else:
|
||||||
# contact, and append the new one
|
to_replace = self._buckets[bucketIndex]._contacts[0]
|
||||||
df.addErrback(replaceContact, head_contact.id)
|
df = to_replace.ping()
|
||||||
|
df.addErrback(replaceContact, to_replace)
|
||||||
|
return df
|
||||||
|
else:
|
||||||
|
self.touchKBucketByIndex(bucketIndex)
|
||||||
|
return defer.succeed(None)
|
||||||
|
|
||||||
def findCloseNodes(self, key, count, sender_node_id=None):
|
def findCloseNodes(self, key, count, sender_node_id=None):
|
||||||
""" Finds a number of known nodes closest to the node/value with the
|
""" Finds a number of known nodes closest to the node/value with the
|
||||||
|
@ -250,7 +269,9 @@ class TreeRoutingTable(object):
|
||||||
@param key: A key in the range of the target k-bucket
|
@param key: A key in the range of the target k-bucket
|
||||||
@type key: str
|
@type key: str
|
||||||
"""
|
"""
|
||||||
bucketIndex = self._kbucketIndex(key)
|
self.touchKBucketByIndex(self._kbucketIndex(key))
|
||||||
|
|
||||||
|
def touchKBucketByIndex(self, bucketIndex):
|
||||||
self._buckets[bucketIndex].lastAccessed = int(self._getTime())
|
self._buckets[bucketIndex].lastAccessed = int(self._getTime())
|
||||||
|
|
||||||
def _kbucketIndex(self, key):
|
def _kbucketIndex(self, key):
|
||||||
|
@ -312,4 +333,16 @@ class TreeRoutingTable(object):
|
||||||
for contact in newBucket._contacts:
|
for contact in newBucket._contacts:
|
||||||
oldBucket.removeContact(contact)
|
oldBucket.removeContact(contact)
|
||||||
|
|
||||||
|
def contactInRoutingTable(self, address_tuple):
|
||||||
|
for bucket in self._buckets:
|
||||||
|
for contact in bucket.getContacts(sort_distance_to=False):
|
||||||
|
if address_tuple[0] == contact.address and address_tuple[1] == contact.port:
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
def bucketsWithContacts(self):
|
||||||
|
count = 0
|
||||||
|
for bucket in self._buckets:
|
||||||
|
if len(bucket):
|
||||||
|
count += 1
|
||||||
|
return count
|
||||||
|
|
Loading…
Reference in a new issue