forked from LBRYCommunity/lbry-sdk
Refactor dht.node.iterativeFind
Move nested functions into a helper class. Add new, smaller functions to increase readability
This commit is contained in:
parent
fa8190356a
commit
740fad5cbe
1 changed files with 210 additions and 168 deletions
|
@ -34,6 +34,7 @@ def rpcmethod(func):
|
||||||
func.rpcmethod = True
|
func.rpcmethod = True
|
||||||
return func
|
return func
|
||||||
|
|
||||||
|
|
||||||
class Node(object):
|
class Node(object):
|
||||||
""" Local node in the Kademlia network
|
""" Local node in the Kademlia network
|
||||||
|
|
||||||
|
@ -591,176 +592,11 @@ class Node(object):
|
||||||
# This is used during the bootstrap process; node ID's are most probably fake
|
# This is used during the bootstrap process; node ID's are most probably fake
|
||||||
shortlist = startupShortlist
|
shortlist = startupShortlist
|
||||||
|
|
||||||
# List of active queries; len() indicates number of active probes
|
|
||||||
#
|
|
||||||
# n.b: using lists for these variables, because Python doesn't
|
|
||||||
# allow binding a new value to a name in an enclosing
|
|
||||||
# (non-global) scope
|
|
||||||
activeProbes = []
|
|
||||||
# List of contact IDs that have already been queried
|
|
||||||
alreadyContacted = []
|
|
||||||
# Probes that were active during the previous iteration
|
|
||||||
# A list of found and known-to-be-active remote nodes
|
|
||||||
activeContacts = []
|
|
||||||
# This should only contain one entry; the next scheduled iteration call
|
|
||||||
pendingIterationCalls = []
|
|
||||||
prevClosestNode = [None]
|
|
||||||
findValueResult = {}
|
|
||||||
slowNodeCount = [0]
|
|
||||||
|
|
||||||
def extendShortlist(responseTuple):
|
|
||||||
""" @type responseMsg: kademlia.msgtypes.ResponseMessage """
|
|
||||||
# The "raw response" tuple contains the response message,
|
|
||||||
# and the originating address info
|
|
||||||
responseMsg = responseTuple[0]
|
|
||||||
originAddress = responseTuple[1] # tuple: (ip adress, udp port)
|
|
||||||
# Make sure the responding node is valid, and abort the operation if it isn't
|
|
||||||
if responseMsg.nodeID in activeContacts or responseMsg.nodeID == self.id:
|
|
||||||
return responseMsg.nodeID
|
|
||||||
|
|
||||||
# Mark this node as active
|
|
||||||
if responseMsg.nodeID in shortlist:
|
|
||||||
# Get the contact information from the shortlist...
|
|
||||||
aContact = shortlist[shortlist.index(responseMsg.nodeID)]
|
|
||||||
else:
|
|
||||||
# If it's not in the shortlist; we probably used a fake ID to reach it
|
|
||||||
# - reconstruct the contact, using the real node ID this time
|
|
||||||
aContact = Contact(
|
|
||||||
responseMsg.nodeID, originAddress[0], originAddress[1], self._protocol)
|
|
||||||
activeContacts.append(aContact)
|
|
||||||
# This makes sure "bootstrap"-nodes with "fake" IDs don't get queried twice
|
|
||||||
if responseMsg.nodeID not in alreadyContacted:
|
|
||||||
alreadyContacted.append(responseMsg.nodeID)
|
|
||||||
# Now grow extend the (unverified) shortlist with the returned contacts
|
|
||||||
result = responseMsg.response
|
|
||||||
#TODO: some validation on the result (for guarding against attacks)
|
|
||||||
# If we are looking for a value, first see if this result is the value
|
|
||||||
# we are looking for before treating it as a list of contact triples
|
|
||||||
if findValue is True and key in result and not 'contacts' in result:
|
|
||||||
# We have found the value
|
|
||||||
findValueResult[key] = result[key]
|
|
||||||
findValueResult['from_peer'] = aContact.address
|
|
||||||
else:
|
|
||||||
if findValue is True:
|
|
||||||
# We are looking for a value, and the remote node didn't have it
|
|
||||||
# - mark it as the closest "empty" node, if it is
|
|
||||||
if 'closestNodeNoValue' in findValueResult:
|
|
||||||
is_closer = (
|
|
||||||
self._routingTable.distance(key, responseMsg.nodeID) <
|
|
||||||
self._routingTable.distance(key, activeContacts[0].id))
|
|
||||||
if is_closer:
|
|
||||||
findValueResult['closestNodeNoValue'] = aContact
|
|
||||||
else:
|
|
||||||
findValueResult['closestNodeNoValue'] = aContact
|
|
||||||
contactTriples = result['contacts']
|
|
||||||
else:
|
|
||||||
contactTriples = result
|
|
||||||
for contactTriple in contactTriples:
|
|
||||||
if isinstance(contactTriple, (list, tuple)) and len(contactTriple) == 3:
|
|
||||||
testContact = Contact(
|
|
||||||
contactTriple[0], contactTriple[1], contactTriple[2], self._protocol)
|
|
||||||
if testContact not in shortlist:
|
|
||||||
shortlist.append(testContact)
|
|
||||||
return responseMsg.nodeID
|
|
||||||
|
|
||||||
def removeFromShortlist(failure):
|
|
||||||
""" @type failure: twisted.python.failure.Failure """
|
|
||||||
failure.trap(protocol.TimeoutError)
|
|
||||||
deadContactID = failure.getErrorMessage()
|
|
||||||
if deadContactID in shortlist:
|
|
||||||
shortlist.remove(deadContactID)
|
|
||||||
return deadContactID
|
|
||||||
|
|
||||||
def cancelActiveProbe(contactID):
|
|
||||||
activeProbes.pop()
|
|
||||||
if len(activeProbes) <= constants.alpha/2 and len(pendingIterationCalls):
|
|
||||||
# Force the iteration
|
|
||||||
pendingIterationCalls[0].cancel()
|
|
||||||
del pendingIterationCalls[0]
|
|
||||||
searchIteration()
|
|
||||||
|
|
||||||
def log_error(err):
|
|
||||||
log.error(err.getErrorMessage())
|
|
||||||
|
|
||||||
# Send parallel, asynchronous FIND_NODE RPCs to the shortlist of contacts
|
|
||||||
def searchIteration():
|
|
||||||
slowNodeCount[0] = len(activeProbes)
|
|
||||||
# TODO: move sort_key to be a method on the class
|
|
||||||
def sort_key(firstContact, secondContact, targetKey=key):
|
|
||||||
return cmp(
|
|
||||||
self._routingTable.distance(firstContact.id, targetKey),
|
|
||||||
self._routingTable.distance(secondContact.id, targetKey)
|
|
||||||
)
|
|
||||||
# Sort the discovered active nodes from closest to furthest
|
|
||||||
activeContacts.sort(sort_key)
|
|
||||||
# This makes sure a returning probe doesn't force calling this function by mistake
|
|
||||||
while len(pendingIterationCalls):
|
|
||||||
del pendingIterationCalls[0]
|
|
||||||
# See if should continue the search
|
|
||||||
if key in findValueResult:
|
|
||||||
outerDf.callback(findValueResult)
|
|
||||||
return
|
|
||||||
elif len(activeContacts) and findValue == False:
|
|
||||||
is_all_done = (
|
|
||||||
len(activeContacts) >= constants.k or
|
|
||||||
(
|
|
||||||
activeContacts[0] == prevClosestNode[0] and
|
|
||||||
len(activeProbes) == slowNodeCount[0]
|
|
||||||
)
|
|
||||||
)
|
|
||||||
if is_all_done:
|
|
||||||
# TODO: Re-send the FIND_NODEs to all of the k closest nodes not already queried
|
|
||||||
#
|
|
||||||
# Ok, we're done; either we have accumulated k
|
|
||||||
# active contacts or no improvement in closestNode
|
|
||||||
# has been noted
|
|
||||||
outerDf.callback(activeContacts)
|
|
||||||
return
|
|
||||||
# The search continues...
|
|
||||||
if len(activeContacts):
|
|
||||||
prevClosestNode[0] = activeContacts[0]
|
|
||||||
contactedNow = 0
|
|
||||||
shortlist.sort(sort_key)
|
|
||||||
# Store the current shortList length before contacting other nodes
|
|
||||||
prevShortlistLength = len(shortlist)
|
|
||||||
for contact in shortlist:
|
|
||||||
if contact.id not in alreadyContacted:
|
|
||||||
activeProbes.append(contact.id)
|
|
||||||
rpcMethod = getattr(contact, rpc)
|
|
||||||
df = rpcMethod(key, rawResponse=True)
|
|
||||||
df.addCallback(extendShortlist)
|
|
||||||
df.addErrback(removeFromShortlist)
|
|
||||||
df.addCallback(cancelActiveProbe)
|
|
||||||
df.addErrback(log_error)
|
|
||||||
alreadyContacted.append(contact.id)
|
|
||||||
contactedNow += 1
|
|
||||||
if contactedNow == constants.alpha:
|
|
||||||
break
|
|
||||||
should_lookup_active_calls = (
|
|
||||||
len(activeProbes) > slowNodeCount[0] or
|
|
||||||
(
|
|
||||||
len(shortlist) < constants.k and
|
|
||||||
len(activeContacts) < len(shortlist) and
|
|
||||||
len(activeProbes) > 0
|
|
||||||
)
|
|
||||||
)
|
|
||||||
if should_lookup_active_calls:
|
|
||||||
# Schedule the next iteration if there are any active
|
|
||||||
# calls (Kademlia uses loose parallelism)
|
|
||||||
call = twisted.internet.reactor.callLater(
|
|
||||||
constants.iterativeLookupDelay, searchIteration) #IGNORE:E1101
|
|
||||||
pendingIterationCalls.append(call)
|
|
||||||
# Check for a quick contact response that made an update to the shortList
|
|
||||||
elif prevShortlistLength < len(shortlist):
|
|
||||||
# Ensure that the closest contacts are taken from the updated shortList
|
|
||||||
searchIteration()
|
|
||||||
else:
|
|
||||||
# If no probes were sent, there will not be any improvement, so we're done
|
|
||||||
outerDf.callback(activeContacts)
|
|
||||||
|
|
||||||
outerDf = defer.Deferred()
|
outerDf = defer.Deferred()
|
||||||
|
|
||||||
|
helper = _IterativeFindHelper(self, outerDf, shortlist, key, findValue, rpc)
|
||||||
# Start the iterations
|
# Start the iterations
|
||||||
searchIteration()
|
helper.searchIteration()
|
||||||
return outerDf
|
return outerDf
|
||||||
|
|
||||||
def _refreshNode(self):
|
def _refreshNode(self):
|
||||||
|
@ -796,6 +632,212 @@ class Node(object):
|
||||||
return df
|
return df
|
||||||
|
|
||||||
|
|
||||||
|
# This was originally a set of nested methods in _iterativeFind
|
||||||
|
# but they have been moved into this helper class in-order to
|
||||||
|
# have better scoping and readability
|
||||||
|
class _IterativeFindHelper(object):
|
||||||
|
# TODO: use polymorphism to search for a value or node
|
||||||
|
# instead of using a find_value flag
|
||||||
|
def __init__(self, node, outer_d, shortlist, key, find_value, rpc):
|
||||||
|
self.node = node
|
||||||
|
self.outer_d = outer_d
|
||||||
|
self.shortlist = shortlist
|
||||||
|
self.key = key
|
||||||
|
self.find_value = find_value
|
||||||
|
self.rpc = rpc
|
||||||
|
# List of active queries; len() indicates number of active probes
|
||||||
|
#
|
||||||
|
# n.b: using lists for these variables, because Python doesn't
|
||||||
|
# allow binding a new value to a name in an enclosing
|
||||||
|
# (non-global) scope
|
||||||
|
self.active_probes = []
|
||||||
|
# List of contact IDs that have already been queried
|
||||||
|
self.already_contacted = []
|
||||||
|
# Probes that were active during the previous iteration
|
||||||
|
# A list of found and known-to-be-active remote nodes
|
||||||
|
self.active_contacts = []
|
||||||
|
# This should only contain one entry; the next scheduled iteration call
|
||||||
|
self.pending_iteration_calls = []
|
||||||
|
self.prev_closest_node = [None]
|
||||||
|
self.find_value_result = {}
|
||||||
|
self.slow_node_count = [0]
|
||||||
|
|
||||||
|
def extendShortlist(self, responseTuple):
|
||||||
|
""" @type responseMsg: kademlia.msgtypes.ResponseMessage """
|
||||||
|
# The "raw response" tuple contains the response message,
|
||||||
|
# and the originating address info
|
||||||
|
responseMsg = responseTuple[0]
|
||||||
|
originAddress = responseTuple[1] # tuple: (ip adress, udp port)
|
||||||
|
# Make sure the responding node is valid, and abort the operation if it isn't
|
||||||
|
if responseMsg.nodeID in self.active_contacts or responseMsg.nodeID == self.node.id:
|
||||||
|
return responseMsg.nodeID
|
||||||
|
|
||||||
|
# Mark this node as active
|
||||||
|
aContact = self._getActiveContact(responseMsg, originAddress)
|
||||||
|
self.active_contacts.append(aContact)
|
||||||
|
|
||||||
|
# This makes sure "bootstrap"-nodes with "fake" IDs don't get queried twice
|
||||||
|
if responseMsg.nodeID not in self.already_contacted:
|
||||||
|
self.already_contacted.append(responseMsg.nodeID)
|
||||||
|
# Now grow extend the (unverified) shortlist with the returned contacts
|
||||||
|
result = responseMsg.response
|
||||||
|
#TODO: some validation on the result (for guarding against attacks)
|
||||||
|
# If we are looking for a value, first see if this result is the value
|
||||||
|
# we are looking for before treating it as a list of contact triples
|
||||||
|
if self.find_value is True and self.key in result and not 'contacts' in result:
|
||||||
|
# We have found the value
|
||||||
|
self.find_value_result[self.key] = result[self.key]
|
||||||
|
self.find_value_result['from_peer'] = aContact.address
|
||||||
|
else:
|
||||||
|
if self.find_value is True:
|
||||||
|
self._setClosestNodeValue(responseMsg, aContact)
|
||||||
|
self._keepSearching(result)
|
||||||
|
return responseMsg.nodeID
|
||||||
|
|
||||||
|
def _getActiveContact(self, responseMsg, originAddress):
|
||||||
|
if responseMsg.nodeID in self.shortlist:
|
||||||
|
# Get the contact information from the shortlist...
|
||||||
|
return self.shortlist[self.shortlist.index(responseMsg.nodeID)]
|
||||||
|
else:
|
||||||
|
# If it's not in the shortlist; we probably used a fake ID to reach it
|
||||||
|
# - reconstruct the contact, using the real node ID this time
|
||||||
|
return Contact(
|
||||||
|
responseMsg.nodeID, originAddress[0], originAddress[1], self.node._protocol)
|
||||||
|
|
||||||
|
def _keepSearching(self, result):
|
||||||
|
contactTriples = self._getContactTriples(result)
|
||||||
|
for contactTriple in contactTriples:
|
||||||
|
self._addIfValid(contactTriple)
|
||||||
|
|
||||||
|
def _getContactTriples(self, result):
|
||||||
|
if self.find_value is True:
|
||||||
|
return result['contacts']
|
||||||
|
else:
|
||||||
|
return result
|
||||||
|
|
||||||
|
def _setClosestNodeValue(self, responseMsg, aContact):
|
||||||
|
# We are looking for a value, and the remote node didn't have it
|
||||||
|
# - mark it as the closest "empty" node, if it is
|
||||||
|
if 'closestNodeNoValue' in self.find_value_result:
|
||||||
|
if self._is_closer(responseMsg):
|
||||||
|
self.find_value_result['closestNodeNoValue'] = aContact
|
||||||
|
else:
|
||||||
|
self.find_value_result['closestNodeNoValue'] = aContact
|
||||||
|
|
||||||
|
def _is_closer(self, responseMsg):
|
||||||
|
return (
|
||||||
|
self.node._routingTable.distance(self.key, responseMsg.nodeID) <
|
||||||
|
self.node._routingTable.distance(self.key, self.active_contacts[0].id)
|
||||||
|
)
|
||||||
|
|
||||||
|
def _addIfValid(self, contactTriple):
|
||||||
|
if isinstance(contactTriple, (list, tuple)) and len(contactTriple) == 3:
|
||||||
|
testContact = Contact(
|
||||||
|
contactTriple[0], contactTriple[1], contactTriple[2], self.node._protocol)
|
||||||
|
if testContact not in self.shortlist:
|
||||||
|
self.shortlist.append(testContact)
|
||||||
|
|
||||||
|
def removeFromShortlist(self, failure):
|
||||||
|
""" @type failure: twisted.python.failure.Failure """
|
||||||
|
failure.trap(protocol.TimeoutError)
|
||||||
|
deadContactID = failure.getErrorMessage()
|
||||||
|
if deadContactID in self.shortlist:
|
||||||
|
self.shortlist.remove(deadContactID)
|
||||||
|
return deadContactID
|
||||||
|
|
||||||
|
def cancelActiveProbe(self, contactID):
|
||||||
|
self.active_probes.pop()
|
||||||
|
if len(self.active_probes) <= constants.alpha/2 and len(self.pending_iteration_calls):
|
||||||
|
# Force the iteration
|
||||||
|
self.pending_iteration_calls[0].cancel()
|
||||||
|
del self.pending_iteration_calls[0]
|
||||||
|
self.searchIteration()
|
||||||
|
|
||||||
|
# Send parallel, asynchronous FIND_NODE RPCs to the shortlist of contacts
|
||||||
|
def searchIteration(self):
|
||||||
|
self.slow_node_count[0] = len(self.active_probes)
|
||||||
|
# TODO: move sort_key to be a method on the class
|
||||||
|
def sort_key(firstContact, secondContact, targetKey=self.key):
|
||||||
|
return cmp(
|
||||||
|
self.node._routingTable.distance(firstContact.id, targetKey),
|
||||||
|
self.node._routingTable.distance(secondContact.id, targetKey)
|
||||||
|
)
|
||||||
|
# Sort the discovered active nodes from closest to furthest
|
||||||
|
self.active_contacts.sort(sort_key)
|
||||||
|
# This makes sure a returning probe doesn't force calling this function by mistake
|
||||||
|
while len(self.pending_iteration_calls):
|
||||||
|
del self.pending_iteration_calls[0]
|
||||||
|
# See if should continue the search
|
||||||
|
if self.key in self.find_value_result:
|
||||||
|
self.outer_d.callback(self.find_value_result)
|
||||||
|
return
|
||||||
|
elif len(self.active_contacts) and self.find_value == False:
|
||||||
|
if self._is_all_done():
|
||||||
|
# TODO: Re-send the FIND_NODEs to all of the k closest nodes not already queried
|
||||||
|
#
|
||||||
|
# Ok, we're done; either we have accumulated k active
|
||||||
|
# contacts or no improvement in closestNode has been
|
||||||
|
# noted
|
||||||
|
self.outer_d.callback(self.active_contacts)
|
||||||
|
return
|
||||||
|
# The search continues...
|
||||||
|
if len(self.active_contacts):
|
||||||
|
self.prev_closest_node[0] = self.active_contacts[0]
|
||||||
|
contactedNow = 0
|
||||||
|
self.shortlist.sort(sort_key)
|
||||||
|
# Store the current shortList length before contacting other nodes
|
||||||
|
prevShortlistLength = len(self.shortlist)
|
||||||
|
for contact in self.shortlist:
|
||||||
|
if contact.id not in self.already_contacted:
|
||||||
|
self._probeContact(contact)
|
||||||
|
contactedNow += 1
|
||||||
|
if contactedNow == constants.alpha:
|
||||||
|
break
|
||||||
|
if self._should_lookup_active_calls():
|
||||||
|
# Schedule the next iteration if there are any active
|
||||||
|
# calls (Kademlia uses loose parallelism)
|
||||||
|
call = twisted.internet.reactor.callLater(
|
||||||
|
constants.iterativeLookupDelay, self.searchIteration) #IGNORE:E1101
|
||||||
|
self.pending_iteration_calls.append(call)
|
||||||
|
# Check for a quick contact response that made an update to the shortList
|
||||||
|
elif prevShortlistLength < len(self.shortlist):
|
||||||
|
# Ensure that the closest contacts are taken from the updated shortList
|
||||||
|
self.searchIteration()
|
||||||
|
else:
|
||||||
|
# If no probes were sent, there will not be any improvement, so we're done
|
||||||
|
self.outer_d.callback(self.active_contacts)
|
||||||
|
|
||||||
|
def _probeContact(self, contact):
|
||||||
|
self.active_probes.append(contact.id)
|
||||||
|
rpcMethod = getattr(contact, self.rpc)
|
||||||
|
df = rpcMethod(self.key, rawResponse=True)
|
||||||
|
df.addCallback(self.extendShortlist)
|
||||||
|
df.addErrback(self.removeFromShortlist)
|
||||||
|
df.addCallback(self.cancelActiveProbe)
|
||||||
|
df.addErrback(log.fail(), 'Failed to contact %s', contact)
|
||||||
|
self.already_contacted.append(contact.id)
|
||||||
|
|
||||||
|
def _should_lookup_active_calls(self):
|
||||||
|
return (
|
||||||
|
len(self.active_probes) > self.slow_node_count[0] or
|
||||||
|
(
|
||||||
|
len(self.shortlist) < constants.k and
|
||||||
|
len(self.active_contacts) < len(self.shortlist) and
|
||||||
|
len(self.active_probes) > 0
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
def _is_all_done(self):
|
||||||
|
return (
|
||||||
|
len(self.active_contacts) >= constants.k or
|
||||||
|
(
|
||||||
|
self.active_contacts[0] == self.prev_closest_node[0] and
|
||||||
|
len(self.active_probes) == self.slow_node_count[0]
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
parser = argparse.ArgumentParser(description="Launch a dht node")
|
parser = argparse.ArgumentParser(description="Launch a dht node")
|
||||||
parser.add_argument("udp_port", help="The UDP port on which the node will listen",
|
parser.add_argument("udp_port", help="The UDP port on which the node will listen",
|
||||||
|
|
Loading…
Reference in a new issue