lbry-sdk/lbrynet/dht/routingtable.py
2018-11-09 16:52:46 -05:00

318 lines
14 KiB
Python

# This library is free software, distributed under the terms of
# the GNU Lesser General Public License Version 3, or any later version.
# See the COPYING file included in this archive
#
# The docstrings in this module contain epytext markup; API documentation
# may be created by processing this file with epydoc: http://epydoc.sf.net
import random
import logging
from binascii import unhexlify
from twisted.internet import defer
from lbrynet.dht import constants, kbucket
from lbrynet.dht.error import TimeoutError
from lbrynet.dht.distance import Distance
log = logging.getLogger(__name__)
class TreeRoutingTable:
""" This class implements a routing table used by a Node class.
The Kademlia routing table is a binary tree whFose leaves are k-buckets,
where each k-bucket contains nodes with some common prefix of their IDs.
This prefix is the k-bucket's position in the binary tree; it therefore
covers some range of ID values, and together all of the k-buckets cover
the entire n-bit ID (or key) space (with no overlap).
@note: In this implementation, nodes in the tree (the k-buckets) are
added dynamically, as needed; this technique is described in the 13-page
version of the Kademlia paper, in section 2.4. It does, however, use the
C{PING} RPC-based k-bucket eviction algorithm described in section 2.2 of
that paper.
"""
#implements(IRoutingTable)
def __init__(self, parentNodeID, getTime=None):
"""
@param parentNodeID: The n-bit node ID of the node to which this
routing table belongs
@type parentNodeID: str
"""
# Create the initial (single) k-bucket covering the range of the entire n-bit ID space
self._parentNodeID = parentNodeID
self._buckets = [kbucket.KBucket(rangeMin=0, rangeMax=2 ** constants.key_bits, node_id=self._parentNodeID)]
if not getTime:
from twisted.internet import reactor
getTime = reactor.seconds
self._getTime = getTime
self._ongoing_replacements = set()
def get_contacts(self):
contacts = []
for i in range(len(self._buckets)):
for contact in self._buckets[i]._contacts:
contacts.append(contact)
return contacts
def _shouldSplit(self, bucketIndex, toAdd):
# https://stackoverflow.com/questions/32129978/highly-unbalanced-kademlia-routing-table/32187456#32187456
if self._buckets[bucketIndex].keyInRange(self._parentNodeID):
return True
contacts = self.get_contacts()
distance = Distance(self._parentNodeID)
contacts.sort(key=lambda c: distance(c.id))
kth_contact = contacts[-1] if len(contacts) < constants.k else contacts[constants.k-1]
return distance(toAdd) < distance(kth_contact.id)
def addContact(self, contact):
""" Add the given contact to the correct k-bucket; if it already
exists, its status will be updated
@param contact: The contact to add to this node's k-buckets
@type contact: kademlia.contact.Contact
@rtype: defer.Deferred
"""
if contact.id == self._parentNodeID:
return defer.succeed(None)
bucketIndex = self._kbucketIndex(contact.id)
try:
self._buckets[bucketIndex].addContact(contact)
except kbucket.BucketFull:
# The bucket is full; see if it can be split (by checking if its range includes the host node's id)
if self._shouldSplit(bucketIndex, contact.id):
self._splitBucket(bucketIndex)
# Retry the insertion attempt
return self.addContact(contact)
else:
# We can't split the k-bucket
#
# The 13 page kademlia paper specifies that the least recently contacted node in the bucket
# shall be pinged. If it fails to reply it is replaced with the new contact. If the ping is successful
# the new contact is ignored and not added to the bucket (sections 2.2 and 2.4).
#
# A reasonable extension to this is BEP 0005, which extends the above:
#
# Not all nodes that we learn about are equal. Some are "good" and some are not.
# Many nodes using the DHT are able to send queries and receive responses,
# but are not able to respond to queries from other nodes. It is important that
# each node's routing table must contain only known good nodes. A good node is
# a node has responded to one of our queries within the last 15 minutes. A node
# is also good if it has ever responded to one of our queries and has sent us a
# query within the last 15 minutes. After 15 minutes of inactivity, a node becomes
# questionable. Nodes become bad when they fail to respond to multiple queries
# in a row. Nodes that we know are good are given priority over nodes with unknown status.
#
# When there are bad or questionable nodes in the bucket, the least recent is selected for
# potential replacement (BEP 0005). When all nodes in the bucket are fresh, the head (least recent)
# contact is selected as described in section 2.2 of the kademlia paper. In both cases the new contact
# is ignored if the pinged node replies.
def replaceContact(failure, deadContact):
"""
Callback for the deferred PING RPC to see if the node to be replaced in the k-bucket is still
responding
@type failure: twisted.python.failure.Failure
"""
failure.trap(TimeoutError)
log.debug("Replacing dead contact in bucket %i: %s:%i (%s) with %s:%i (%s)", bucketIndex,
deadContact.address, deadContact.port, deadContact.log_id(), contact.address,
contact.port, contact.log_id())
try:
self._buckets[bucketIndex].removeContact(deadContact)
except ValueError:
# The contact has already been removed (probably due to a timeout)
pass
return self.addContact(contact)
not_good_contacts = self._buckets[bucketIndex].getBadOrUnknownContacts()
if not_good_contacts:
to_replace = not_good_contacts[0]
else:
to_replace = self._buckets[bucketIndex]._contacts[0]
if to_replace not in self._ongoing_replacements:
log.debug("pinging %s:%s", to_replace.address, to_replace.port)
self._ongoing_replacements.add(to_replace)
df = to_replace.ping()
df.addErrback(replaceContact, to_replace)
df.addBoth(lambda _: self._ongoing_replacements.remove(to_replace))
else:
df = defer.succeed(None)
return df
else:
self.touchKBucketByIndex(bucketIndex)
return defer.succeed(None)
def findCloseNodes(self, key, count=None, sender_node_id=None):
""" Finds a number of known nodes closest to the node/value with the
specified key.
@param key: the n-bit key (i.e. the node or value ID) to search for
@type key: str
@param count: the amount of contacts to return, default of k (8)
@type count: int
@param sender_node_id: Used during RPC, this is be the sender's Node ID
Whatever ID is passed in the parameter will get
excluded from the list of returned contacts.
@type sender_node_id: str
@return: A list of node contacts (C{kademlia.contact.Contact instances})
closest to the specified key.
This method will return C{k} (or C{count}, if specified)
contacts if at all possible; it will only return fewer if the
node is returning all of the contacts that it knows of.
@rtype: list
"""
exclude = [self._parentNodeID]
if sender_node_id:
exclude.append(sender_node_id)
if key in exclude:
exclude.remove(key)
count = count or constants.k
distance = Distance(key)
contacts = self.get_contacts()
contacts = [c for c in contacts if c.id not in exclude]
contacts.sort(key=lambda c: distance(c.id))
return contacts[:min(count, len(contacts))]
def getContact(self, contactID):
""" Returns the (known) contact with the specified node ID
@raise ValueError: No contact with the specified contact ID is known
by this node
"""
bucketIndex = self._kbucketIndex(contactID)
return self._buckets[bucketIndex].getContact(contactID)
def getRefreshList(self, startIndex=0, force=False):
""" Finds all k-buckets that need refreshing, starting at the
k-bucket with the specified index, and returns IDs to be searched for
in order to refresh those k-buckets
@param startIndex: The index of the bucket to start refreshing at;
this bucket and those further away from it will
be refreshed. For example, when joining the
network, this node will set this to the index of
the bucket after the one containing it's closest
neighbour.
@type startIndex: index
@param force: If this is C{True}, all buckets (in the specified range)
will be refreshed, regardless of the time they were last
accessed.
@type force: bool
@return: A list of node ID's that the parent node should search for
in order to refresh the routing Table
@rtype: list
"""
bucketIndex = startIndex
refreshIDs = []
now = int(self._getTime())
for bucket in self._buckets[startIndex:]:
if force or now - bucket.lastAccessed >= constants.refreshTimeout:
searchID = self._randomIDInBucketRange(bucketIndex)
refreshIDs.append(searchID)
bucketIndex += 1
return refreshIDs
def removeContact(self, contact):
"""
Remove the contact from the routing table
@param contact: The contact to remove
@type contact: dht.contact._Contact
"""
bucketIndex = self._kbucketIndex(contact.id)
try:
self._buckets[bucketIndex].removeContact(contact)
except ValueError:
return
def touchKBucket(self, key):
""" Update the "last accessed" timestamp of the k-bucket which covers
the range containing the specified key in the key/ID space
@param key: A key in the range of the target k-bucket
@type key: str
"""
self.touchKBucketByIndex(self._kbucketIndex(key))
def touchKBucketByIndex(self, bucketIndex):
self._buckets[bucketIndex].lastAccessed = int(self._getTime())
def _kbucketIndex(self, key):
""" Calculate the index of the k-bucket which is responsible for the
specified key (or ID)
@param key: The key for which to find the appropriate k-bucket index
@type key: str
@return: The index of the k-bucket responsible for the specified key
@rtype: int
"""
i = 0
for bucket in self._buckets:
if bucket.keyInRange(key):
return i
else:
i += 1
return i
def _randomIDInBucketRange(self, bucketIndex):
""" Returns a random ID in the specified k-bucket's range
@param bucketIndex: The index of the k-bucket to use
@type bucketIndex: int
"""
idValue = random.randrange(
self._buckets[bucketIndex].rangeMin, self._buckets[bucketIndex].rangeMax)
randomID = hex(idValue)[2:]
if randomID[-1] == 'L':
randomID = randomID[:-1]
if len(randomID) % 2 != 0:
randomID = '0' + randomID
randomID = unhexlify(randomID)
randomID = ((constants.key_bits // 8) - len(randomID)) * b'\x00' + randomID
return randomID
def _splitBucket(self, oldBucketIndex):
""" Splits the specified k-bucket into two new buckets which together
cover the same range in the key/ID space
@param oldBucketIndex: The index of k-bucket to split (in this table's
list of k-buckets)
@type oldBucketIndex: int
"""
# Resize the range of the current (old) k-bucket
oldBucket = self._buckets[oldBucketIndex]
splitPoint = oldBucket.rangeMax - (oldBucket.rangeMax - oldBucket.rangeMin) / 2
# Create a new k-bucket to cover the range split off from the old bucket
newBucket = kbucket.KBucket(splitPoint, oldBucket.rangeMax, self._parentNodeID)
oldBucket.rangeMax = splitPoint
# Now, add the new bucket into the routing table tree
self._buckets.insert(oldBucketIndex + 1, newBucket)
# Finally, copy all nodes that belong to the new k-bucket into it...
for contact in oldBucket._contacts:
if newBucket.keyInRange(contact.id):
newBucket.addContact(contact)
# ...and remove them from the old bucket
for contact in newBucket._contacts:
oldBucket.removeContact(contact)
def contactInRoutingTable(self, address_tuple):
for bucket in self._buckets:
for contact in bucket.getContacts(sort_distance_to=False):
if address_tuple[0] == contact.address and address_tuple[1] == contact.port:
return True
return False
def bucketsWithContacts(self):
count = 0
for bucket in self._buckets:
if len(bucket):
count += 1
return count