diff --git a/lbrynet/dht/node.py b/lbrynet/dht/node.py index d51956077..866f6c82e 100644 --- a/lbrynet/dht/node.py +++ b/lbrynet/dht/node.py @@ -34,6 +34,7 @@ def rpcmethod(func): func.rpcmethod = True return func + class Node(object): """ Local node in the Kademlia network @@ -591,176 +592,11 @@ class Node(object): # This is used during the bootstrap process; node ID's are most probably fake shortlist = startupShortlist - # List of active queries; len() indicates number of active probes - # - # n.b: using lists for these variables, because Python doesn't - # allow binding a new value to a name in an enclosing - # (non-global) scope - activeProbes = [] - # List of contact IDs that have already been queried - alreadyContacted = [] - # Probes that were active during the previous iteration - # A list of found and known-to-be-active remote nodes - activeContacts = [] - # This should only contain one entry; the next scheduled iteration call - pendingIterationCalls = [] - prevClosestNode = [None] - findValueResult = {} - slowNodeCount = [0] - - def extendShortlist(responseTuple): - """ @type responseMsg: kademlia.msgtypes.ResponseMessage """ - # The "raw response" tuple contains the response message, - # and the originating address info - responseMsg = responseTuple[0] - originAddress = responseTuple[1] # tuple: (ip adress, udp port) - # Make sure the responding node is valid, and abort the operation if it isn't - if responseMsg.nodeID in activeContacts or responseMsg.nodeID == self.id: - return responseMsg.nodeID - - # Mark this node as active - if responseMsg.nodeID in shortlist: - # Get the contact information from the shortlist... - aContact = shortlist[shortlist.index(responseMsg.nodeID)] - else: - # If it's not in the shortlist; we probably used a fake ID to reach it - # - reconstruct the contact, using the real node ID this time - aContact = Contact( - responseMsg.nodeID, originAddress[0], originAddress[1], self._protocol) - activeContacts.append(aContact) - # This makes sure "bootstrap"-nodes with "fake" IDs don't get queried twice - if responseMsg.nodeID not in alreadyContacted: - alreadyContacted.append(responseMsg.nodeID) - # Now grow extend the (unverified) shortlist with the returned contacts - result = responseMsg.response - #TODO: some validation on the result (for guarding against attacks) - # If we are looking for a value, first see if this result is the value - # we are looking for before treating it as a list of contact triples - if findValue is True and key in result and not 'contacts' in result: - # We have found the value - findValueResult[key] = result[key] - findValueResult['from_peer'] = aContact.address - else: - if findValue is True: - # We are looking for a value, and the remote node didn't have it - # - mark it as the closest "empty" node, if it is - if 'closestNodeNoValue' in findValueResult: - is_closer = ( - self._routingTable.distance(key, responseMsg.nodeID) < - self._routingTable.distance(key, activeContacts[0].id)) - if is_closer: - findValueResult['closestNodeNoValue'] = aContact - else: - findValueResult['closestNodeNoValue'] = aContact - contactTriples = result['contacts'] - else: - contactTriples = result - for contactTriple in contactTriples: - if isinstance(contactTriple, (list, tuple)) and len(contactTriple) == 3: - testContact = Contact( - contactTriple[0], contactTriple[1], contactTriple[2], self._protocol) - if testContact not in shortlist: - shortlist.append(testContact) - return responseMsg.nodeID - - def removeFromShortlist(failure): - """ @type failure: twisted.python.failure.Failure """ - failure.trap(protocol.TimeoutError) - deadContactID = failure.getErrorMessage() - if deadContactID in shortlist: - shortlist.remove(deadContactID) - return deadContactID - - def cancelActiveProbe(contactID): - activeProbes.pop() - if len(activeProbes) <= constants.alpha/2 and len(pendingIterationCalls): - # Force the iteration - pendingIterationCalls[0].cancel() - del pendingIterationCalls[0] - searchIteration() - - def log_error(err): - log.error(err.getErrorMessage()) - - # Send parallel, asynchronous FIND_NODE RPCs to the shortlist of contacts - def searchIteration(): - slowNodeCount[0] = len(activeProbes) - # TODO: move sort_key to be a method on the class - def sort_key(firstContact, secondContact, targetKey=key): - return cmp( - self._routingTable.distance(firstContact.id, targetKey), - self._routingTable.distance(secondContact.id, targetKey) - ) - # Sort the discovered active nodes from closest to furthest - activeContacts.sort(sort_key) - # This makes sure a returning probe doesn't force calling this function by mistake - while len(pendingIterationCalls): - del pendingIterationCalls[0] - # See if should continue the search - if key in findValueResult: - outerDf.callback(findValueResult) - return - elif len(activeContacts) and findValue == False: - is_all_done = ( - len(activeContacts) >= constants.k or - ( - activeContacts[0] == prevClosestNode[0] and - len(activeProbes) == slowNodeCount[0] - ) - ) - if is_all_done: - # TODO: Re-send the FIND_NODEs to all of the k closest nodes not already queried - # - # Ok, we're done; either we have accumulated k - # active contacts or no improvement in closestNode - # has been noted - outerDf.callback(activeContacts) - return - # The search continues... - if len(activeContacts): - prevClosestNode[0] = activeContacts[0] - contactedNow = 0 - shortlist.sort(sort_key) - # Store the current shortList length before contacting other nodes - prevShortlistLength = len(shortlist) - for contact in shortlist: - if contact.id not in alreadyContacted: - activeProbes.append(contact.id) - rpcMethod = getattr(contact, rpc) - df = rpcMethod(key, rawResponse=True) - df.addCallback(extendShortlist) - df.addErrback(removeFromShortlist) - df.addCallback(cancelActiveProbe) - df.addErrback(log_error) - alreadyContacted.append(contact.id) - contactedNow += 1 - if contactedNow == constants.alpha: - break - should_lookup_active_calls = ( - len(activeProbes) > slowNodeCount[0] or - ( - len(shortlist) < constants.k and - len(activeContacts) < len(shortlist) and - len(activeProbes) > 0 - ) - ) - if should_lookup_active_calls: - # Schedule the next iteration if there are any active - # calls (Kademlia uses loose parallelism) - call = twisted.internet.reactor.callLater( - constants.iterativeLookupDelay, searchIteration) #IGNORE:E1101 - pendingIterationCalls.append(call) - # Check for a quick contact response that made an update to the shortList - elif prevShortlistLength < len(shortlist): - # Ensure that the closest contacts are taken from the updated shortList - searchIteration() - else: - # If no probes were sent, there will not be any improvement, so we're done - outerDf.callback(activeContacts) - outerDf = defer.Deferred() + + helper = _IterativeFindHelper(self, outerDf, shortlist, key, findValue, rpc) # Start the iterations - searchIteration() + helper.searchIteration() return outerDf def _refreshNode(self): @@ -796,6 +632,212 @@ class Node(object): return df +# This was originally a set of nested methods in _iterativeFind +# but they have been moved into this helper class in-order to +# have better scoping and readability +class _IterativeFindHelper(object): + # TODO: use polymorphism to search for a value or node + # instead of using a find_value flag + def __init__(self, node, outer_d, shortlist, key, find_value, rpc): + self.node = node + self.outer_d = outer_d + self.shortlist = shortlist + self.key = key + self.find_value = find_value + self.rpc = rpc + # List of active queries; len() indicates number of active probes + # + # n.b: using lists for these variables, because Python doesn't + # allow binding a new value to a name in an enclosing + # (non-global) scope + self.active_probes = [] + # List of contact IDs that have already been queried + self.already_contacted = [] + # Probes that were active during the previous iteration + # A list of found and known-to-be-active remote nodes + self.active_contacts = [] + # This should only contain one entry; the next scheduled iteration call + self.pending_iteration_calls = [] + self.prev_closest_node = [None] + self.find_value_result = {} + self.slow_node_count = [0] + + def extendShortlist(self, responseTuple): + """ @type responseMsg: kademlia.msgtypes.ResponseMessage """ + # The "raw response" tuple contains the response message, + # and the originating address info + responseMsg = responseTuple[0] + originAddress = responseTuple[1] # tuple: (ip adress, udp port) + # Make sure the responding node is valid, and abort the operation if it isn't + if responseMsg.nodeID in self.active_contacts or responseMsg.nodeID == self.node.id: + return responseMsg.nodeID + + # Mark this node as active + aContact = self._getActiveContact(responseMsg, originAddress) + self.active_contacts.append(aContact) + + # This makes sure "bootstrap"-nodes with "fake" IDs don't get queried twice + if responseMsg.nodeID not in self.already_contacted: + self.already_contacted.append(responseMsg.nodeID) + # Now grow extend the (unverified) shortlist with the returned contacts + result = responseMsg.response + #TODO: some validation on the result (for guarding against attacks) + # If we are looking for a value, first see if this result is the value + # we are looking for before treating it as a list of contact triples + if self.find_value is True and self.key in result and not 'contacts' in result: + # We have found the value + self.find_value_result[self.key] = result[self.key] + self.find_value_result['from_peer'] = aContact.address + else: + if self.find_value is True: + self._setClosestNodeValue(responseMsg, aContact) + self._keepSearching(result) + return responseMsg.nodeID + + def _getActiveContact(self, responseMsg, originAddress): + if responseMsg.nodeID in self.shortlist: + # Get the contact information from the shortlist... + return self.shortlist[self.shortlist.index(responseMsg.nodeID)] + else: + # If it's not in the shortlist; we probably used a fake ID to reach it + # - reconstruct the contact, using the real node ID this time + return Contact( + responseMsg.nodeID, originAddress[0], originAddress[1], self.node._protocol) + + def _keepSearching(self, result): + contactTriples = self._getContactTriples(result) + for contactTriple in contactTriples: + self._addIfValid(contactTriple) + + def _getContactTriples(self, result): + if self.find_value is True: + return result['contacts'] + else: + return result + + def _setClosestNodeValue(self, responseMsg, aContact): + # We are looking for a value, and the remote node didn't have it + # - mark it as the closest "empty" node, if it is + if 'closestNodeNoValue' in self.find_value_result: + if self._is_closer(responseMsg): + self.find_value_result['closestNodeNoValue'] = aContact + else: + self.find_value_result['closestNodeNoValue'] = aContact + + def _is_closer(self, responseMsg): + return ( + self.node._routingTable.distance(self.key, responseMsg.nodeID) < + self.node._routingTable.distance(self.key, self.active_contacts[0].id) + ) + + def _addIfValid(self, contactTriple): + if isinstance(contactTriple, (list, tuple)) and len(contactTriple) == 3: + testContact = Contact( + contactTriple[0], contactTriple[1], contactTriple[2], self.node._protocol) + if testContact not in self.shortlist: + self.shortlist.append(testContact) + + def removeFromShortlist(self, failure): + """ @type failure: twisted.python.failure.Failure """ + failure.trap(protocol.TimeoutError) + deadContactID = failure.getErrorMessage() + if deadContactID in self.shortlist: + self.shortlist.remove(deadContactID) + return deadContactID + + def cancelActiveProbe(self, contactID): + self.active_probes.pop() + if len(self.active_probes) <= constants.alpha/2 and len(self.pending_iteration_calls): + # Force the iteration + self.pending_iteration_calls[0].cancel() + del self.pending_iteration_calls[0] + self.searchIteration() + + # Send parallel, asynchronous FIND_NODE RPCs to the shortlist of contacts + def searchIteration(self): + self.slow_node_count[0] = len(self.active_probes) + # TODO: move sort_key to be a method on the class + def sort_key(firstContact, secondContact, targetKey=self.key): + return cmp( + self.node._routingTable.distance(firstContact.id, targetKey), + self.node._routingTable.distance(secondContact.id, targetKey) + ) + # Sort the discovered active nodes from closest to furthest + self.active_contacts.sort(sort_key) + # This makes sure a returning probe doesn't force calling this function by mistake + while len(self.pending_iteration_calls): + del self.pending_iteration_calls[0] + # See if should continue the search + if self.key in self.find_value_result: + self.outer_d.callback(self.find_value_result) + return + elif len(self.active_contacts) and self.find_value == False: + if self._is_all_done(): + # TODO: Re-send the FIND_NODEs to all of the k closest nodes not already queried + # + # Ok, we're done; either we have accumulated k active + # contacts or no improvement in closestNode has been + # noted + self.outer_d.callback(self.active_contacts) + return + # The search continues... + if len(self.active_contacts): + self.prev_closest_node[0] = self.active_contacts[0] + contactedNow = 0 + self.shortlist.sort(sort_key) + # Store the current shortList length before contacting other nodes + prevShortlistLength = len(self.shortlist) + for contact in self.shortlist: + if contact.id not in self.already_contacted: + self._probeContact(contact) + contactedNow += 1 + if contactedNow == constants.alpha: + break + if self._should_lookup_active_calls(): + # Schedule the next iteration if there are any active + # calls (Kademlia uses loose parallelism) + call = twisted.internet.reactor.callLater( + constants.iterativeLookupDelay, self.searchIteration) #IGNORE:E1101 + self.pending_iteration_calls.append(call) + # Check for a quick contact response that made an update to the shortList + elif prevShortlistLength < len(self.shortlist): + # Ensure that the closest contacts are taken from the updated shortList + self.searchIteration() + else: + # If no probes were sent, there will not be any improvement, so we're done + self.outer_d.callback(self.active_contacts) + + def _probeContact(self, contact): + self.active_probes.append(contact.id) + rpcMethod = getattr(contact, self.rpc) + df = rpcMethod(self.key, rawResponse=True) + df.addCallback(self.extendShortlist) + df.addErrback(self.removeFromShortlist) + df.addCallback(self.cancelActiveProbe) + df.addErrback(log.fail(), 'Failed to contact %s', contact) + self.already_contacted.append(contact.id) + + def _should_lookup_active_calls(self): + return ( + len(self.active_probes) > self.slow_node_count[0] or + ( + len(self.shortlist) < constants.k and + len(self.active_contacts) < len(self.shortlist) and + len(self.active_probes) > 0 + ) + ) + + def _is_all_done(self): + return ( + len(self.active_contacts) >= constants.k or + ( + self.active_contacts[0] == self.prev_closest_node[0] and + len(self.active_probes) == self.slow_node_count[0] + ) + ) + + + def main(): parser = argparse.ArgumentParser(description="Launch a dht node") parser.add_argument("udp_port", help="The UDP port on which the node will listen",