forked from LBRYCommunity/lbry-sdk
224 lines
9.8 KiB
Python
224 lines
9.8 KiB
Python
import logging
|
|
from twisted.internet import defer
|
|
from distance import Distance
|
|
from error import TimeoutError
|
|
import constants
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
def get_contact(contact_list, node_id, address, port):
|
|
for contact in contact_list:
|
|
if contact.id == node_id and contact.address == address and contact.port == port:
|
|
return contact
|
|
raise IndexError(node_id)
|
|
|
|
|
|
class _IterativeFind(object):
|
|
# TODO: use polymorphism to search for a value or node
|
|
# instead of using a find_value flag
|
|
def __init__(self, node, shortlist, key, rpc):
|
|
self.node = node
|
|
self.finished_deferred = defer.Deferred()
|
|
# all distance operations in this class only care about the distance
|
|
# to self.key, so this makes it easier to calculate those
|
|
self.distance = Distance(key)
|
|
# The closest known and active node yet found
|
|
self.closest_node = None if not shortlist else shortlist[0]
|
|
self.prev_closest_node = None
|
|
# Shortlist of contact objects (the k closest known contacts to the key from the routing table)
|
|
self.shortlist = shortlist
|
|
# The search key
|
|
self.key = str(key)
|
|
# The rpc method name (findValue or findNode)
|
|
self.rpc = rpc
|
|
# List of active queries; len() indicates number of active probes
|
|
self.active_probes = []
|
|
# List of contact (address, port) tuples that have already been queried, includes contacts that didn't reply
|
|
self.already_contacted = []
|
|
# A list of found and known-to-be-active remote nodes (Contact objects)
|
|
self.active_contacts = []
|
|
# Ensure only one searchIteration call is running at a time
|
|
self._search_iteration_semaphore = defer.DeferredSemaphore(1)
|
|
self._iteration_count = 0
|
|
self.find_value_result = {}
|
|
self.pending_iteration_calls = []
|
|
self._lock = defer.DeferredLock()
|
|
|
|
@property
|
|
def is_find_node_request(self):
|
|
return self.rpc == "findNode"
|
|
|
|
@property
|
|
def is_find_value_request(self):
|
|
return self.rpc == "findValue"
|
|
|
|
def is_closer(self, contact):
|
|
if not self.closest_node:
|
|
return True
|
|
return self.distance.is_closer(contact.id, self.closest_node.id)
|
|
|
|
def getContactTriples(self, result):
|
|
if self.is_find_value_request:
|
|
contact_triples = result['contacts']
|
|
else:
|
|
contact_triples = result
|
|
for contact_tup in contact_triples:
|
|
if not isinstance(contact_tup, (list, tuple)) or len(contact_tup) != 3:
|
|
raise ValueError("invalid contact triple")
|
|
return contact_triples
|
|
|
|
def sortByDistance(self, contact_list):
|
|
"""Sort the list of contacts in order by distance from key"""
|
|
contact_list.sort(key=lambda c: self.distance(c.id))
|
|
|
|
@defer.inlineCallbacks
|
|
def extendShortlist(self, contact, result):
|
|
# The "raw response" tuple contains the response message and the originating address info
|
|
originAddress = (contact.address, contact.port)
|
|
if self.finished_deferred.called:
|
|
defer.returnValue(contact.id)
|
|
if self.node.contact_manager.is_ignored(originAddress):
|
|
raise ValueError("contact is ignored")
|
|
if contact.id == self.node.node_id:
|
|
defer.returnValue(contact.id)
|
|
|
|
yield self._lock.acquire()
|
|
|
|
if contact not in self.active_contacts:
|
|
self.active_contacts.append(contact)
|
|
if contact not in self.shortlist:
|
|
self.shortlist.append(contact)
|
|
|
|
# Now grow extend the (unverified) shortlist with the returned contacts
|
|
# TODO: some validation on the result (for guarding against attacks)
|
|
# If we are looking for a value, first see if this result is the value
|
|
# we are looking for before treating it as a list of contact triples
|
|
if self.is_find_value_request and self.key in result:
|
|
# We have found the value
|
|
self.find_value_result[self.key] = result[self.key]
|
|
self._lock.release()
|
|
self.finished_deferred.callback(self.find_value_result)
|
|
else:
|
|
if self.is_find_value_request:
|
|
# We are looking for a value, and the remote node didn't have it
|
|
# - mark it as the closest "empty" node, if it is
|
|
# TODO: store to this peer after finding the value as per the kademlia spec
|
|
if 'closestNodeNoValue' in self.find_value_result:
|
|
if self.is_closer(contact):
|
|
self.find_value_result['closestNodeNoValue'] = contact
|
|
else:
|
|
self.find_value_result['closestNodeNoValue'] = contact
|
|
contactTriples = self.getContactTriples(result)
|
|
for contactTriple in contactTriples:
|
|
if (contactTriple[1], contactTriple[2]) in ((c.address, c.port) for c in self.already_contacted):
|
|
continue
|
|
elif self.node.contact_manager.is_ignored((contactTriple[1], contactTriple[2])):
|
|
raise ValueError("contact is ignored")
|
|
else:
|
|
found_contact = self.node.contact_manager.make_contact(contactTriple[0], contactTriple[1],
|
|
contactTriple[2], self.node._protocol)
|
|
if found_contact not in self.shortlist:
|
|
self.shortlist.append(found_contact)
|
|
|
|
self._lock.release()
|
|
|
|
if not self.finished_deferred.called:
|
|
if self.should_stop():
|
|
self.sortByDistance(self.active_contacts)
|
|
self.finished_deferred.callback(self.active_contacts[:min(constants.k, len(self.active_contacts))])
|
|
|
|
defer.returnValue(contact.id)
|
|
|
|
@defer.inlineCallbacks
|
|
def probeContact(self, contact):
|
|
fn = getattr(contact, self.rpc)
|
|
try:
|
|
response = yield fn(self.key)
|
|
result = yield self.extendShortlist(contact, response)
|
|
defer.returnValue(result)
|
|
except (TimeoutError, defer.CancelledError, ValueError, IndexError):
|
|
defer.returnValue(contact.id)
|
|
|
|
def should_stop(self):
|
|
active_contacts_len = len(self.active_contacts)
|
|
if active_contacts_len >= constants.k:
|
|
# log.info("there are enough results %s(%s)", self.rpc, self.key.encode('hex'))
|
|
return True
|
|
if self.prev_closest_node and self.closest_node and self.distance.is_closer(
|
|
self.prev_closest_node.id, self.closest_node.id):
|
|
# log.info("not getting any closer %s(%s)", self.rpc, self.key.encode('hex'))
|
|
return True
|
|
return False
|
|
|
|
# Send parallel, asynchronous FIND_NODE RPCs to the shortlist of contacts
|
|
@defer.inlineCallbacks
|
|
def _searchIteration(self):
|
|
yield self._lock.acquire()
|
|
# Sort the discovered active nodes from closest to furthest
|
|
if len(self.active_contacts):
|
|
self.sortByDistance(self.active_contacts)
|
|
self.prev_closest_node = self.closest_node
|
|
self.closest_node = self.active_contacts[0]
|
|
|
|
# Sort and store the current shortList length before contacting other nodes
|
|
self.sortByDistance(self.shortlist)
|
|
probes = []
|
|
already_contacted_addresses = {(c.address, c.port) for c in self.already_contacted}
|
|
to_remove = []
|
|
for contact in self.shortlist:
|
|
if (contact.address, contact.port) not in already_contacted_addresses:
|
|
self.already_contacted.append(contact)
|
|
to_remove.append(contact)
|
|
probe = self.probeContact(contact)
|
|
probes.append(probe)
|
|
self.active_probes.append(probe)
|
|
if len(probes) == constants.alpha:
|
|
break
|
|
for contact in to_remove: # these contacts will be re-added to the shortlist when they reply successfully
|
|
self.shortlist.remove(contact)
|
|
|
|
# log.info("Active probes: %i, contacted %i/%i (%s)", len(self.active_probes),
|
|
# len(self.active_contacts), len(self.already_contacted), hex(id(self)))
|
|
|
|
# run the probes
|
|
if probes:
|
|
# Schedule the next iteration if there are any active
|
|
# calls (Kademlia uses loose parallelism)
|
|
self.searchIteration()
|
|
self._lock.release()
|
|
|
|
d = defer.gatherResults(probes)
|
|
|
|
@defer.inlineCallbacks
|
|
def _remove_probes(results):
|
|
yield self._lock.acquire()
|
|
for probe in probes:
|
|
self.active_probes.remove(probe)
|
|
self._lock.release()
|
|
defer.returnValue(results)
|
|
|
|
d.addCallback(_remove_probes)
|
|
|
|
elif not self.finished_deferred.called and not self.active_probes:
|
|
# If no probes were sent, there will not be any improvement, so we're done
|
|
self.sortByDistance(self.active_contacts)
|
|
self.finished_deferred.callback(self.active_contacts[:min(constants.k, len(self.active_contacts))])
|
|
|
|
def searchIteration(self):
|
|
def _cancel_pending_iterations(result):
|
|
while self.pending_iteration_calls:
|
|
canceller = self.pending_iteration_calls.pop()
|
|
canceller()
|
|
return result
|
|
self.finished_deferred.addBoth(_cancel_pending_iterations)
|
|
self._iteration_count += 1
|
|
# log.debug("iteration %i %s(%s...)", self._iteration_count, self.rpc, self.key.encode('hex')[:8])
|
|
call, cancel = self.node.reactor_callLater(1, self._search_iteration_semaphore.run, self._searchIteration)
|
|
self.pending_iteration_calls.append(cancel)
|
|
|
|
|
|
def iterativeFind(node, shortlist, key, rpc):
|
|
helper = _IterativeFind(node, shortlist, key, rpc)
|
|
helper.searchIteration()
|
|
return helper.finished_deferred
|