diff --git a/lbrynet/dht/iterativefind.py b/lbrynet/dht/iterativefind.py index 608fd5418..957c69d5b 100644 --- a/lbrynet/dht/iterativefind.py +++ b/lbrynet/dht/iterativefind.py @@ -43,7 +43,6 @@ class _IterativeFind(object): self._iteration_count = 0 self.find_value_result = {} self.pending_iteration_calls = [] - self._lock = defer.DeferredLock() @property def is_find_node_request(self): @@ -83,8 +82,6 @@ class _IterativeFind(object): if contact.id == self.node.node_id: defer.returnValue(contact.id) - yield self._lock.acquire() - if contact not in self.active_contacts: self.active_contacts.append(contact) if contact not in self.shortlist: @@ -97,7 +94,6 @@ class _IterativeFind(object): if self.is_find_value_request and self.key in result: # We have found the value self.find_value_result[self.key] = result[self.key] - self._lock.release() self.finished_deferred.callback(self.find_value_result) else: if self.is_find_value_request: @@ -121,12 +117,9 @@ class _IterativeFind(object): if found_contact not in self.shortlist: self.shortlist.append(found_contact) - self._lock.release() - - if not self.finished_deferred.called: - if self.should_stop(): - self.sortByDistance(self.active_contacts) - self.finished_deferred.callback(self.active_contacts[:min(constants.k, len(self.active_contacts))]) + if not self.finished_deferred.called and self.should_stop(): + self.sortByDistance(self.active_contacts) + self.finished_deferred.callback(self.active_contacts[:min(constants.k, len(self.active_contacts))]) defer.returnValue(contact.id) @@ -141,20 +134,15 @@ class _IterativeFind(object): defer.returnValue(contact.id) def should_stop(self): - active_contacts_len = len(self.active_contacts) - if active_contacts_len >= constants.k: - # log.info("there are enough results %s(%s)", self.rpc, self.key.encode('hex')) + if self.prev_closest_node and self.closest_node and self.distance.is_closer(self.prev_closest_node.id, + self.closest_node.id): return True - if self.prev_closest_node and self.closest_node and self.distance.is_closer( - self.prev_closest_node.id, self.closest_node.id): - # log.info("not getting any closer %s(%s)", self.rpc, self.key.encode('hex')) + if len(self.active_contacts) >= constants.k: return True return False # Send parallel, asynchronous FIND_NODE RPCs to the shortlist of contacts - @defer.inlineCallbacks def _searchIteration(self): - yield self._lock.acquire() # Sort the discovered active nodes from closest to furthest if len(self.active_contacts): self.sortByDistance(self.active_contacts) @@ -178,25 +166,18 @@ class _IterativeFind(object): for contact in to_remove: # these contacts will be re-added to the shortlist when they reply successfully self.shortlist.remove(contact) - # log.info("Active probes: %i, contacted %i/%i (%s)", len(self.active_probes), - # len(self.active_contacts), len(self.already_contacted), hex(id(self))) - # run the probes if probes: # Schedule the next iteration if there are any active # calls (Kademlia uses loose parallelism) self.searchIteration() - self._lock.release() - d = defer.gatherResults(probes) + d = defer.DeferredList(probes, consumeErrors=True) - @defer.inlineCallbacks def _remove_probes(results): - yield self._lock.acquire() for probe in probes: self.active_probes.remove(probe) - self._lock.release() - defer.returnValue(results) + return results d.addCallback(_remove_probes) @@ -204,8 +185,11 @@ class _IterativeFind(object): # If no probes were sent, there will not be any improvement, so we're done self.sortByDistance(self.active_contacts) self.finished_deferred.callback(self.active_contacts[:min(constants.k, len(self.active_contacts))]) + elif not self.finished_deferred.called and self.should_stop(): + self.sortByDistance(self.active_contacts) + self.finished_deferred.callback(self.active_contacts[:min(constants.k, len(self.active_contacts))]) - def searchIteration(self): + def searchIteration(self, delay=constants.iterativeLookupDelay): def _cancel_pending_iterations(result): while self.pending_iteration_calls: canceller = self.pending_iteration_calls.pop() @@ -213,12 +197,11 @@ class _IterativeFind(object): return result self.finished_deferred.addBoth(_cancel_pending_iterations) self._iteration_count += 1 - # log.debug("iteration %i %s(%s...)", self._iteration_count, self.rpc, self.key.encode('hex')[:8]) - call, cancel = self.node.reactor_callLater(1, self._search_iteration_semaphore.run, self._searchIteration) + call, cancel = self.node.reactor_callLater(delay, self._search_iteration_semaphore.run, self._searchIteration) self.pending_iteration_calls.append(cancel) def iterativeFind(node, shortlist, key, rpc): helper = _IterativeFind(node, shortlist, key, rpc) - helper.searchIteration() + helper.searchIteration(0) return helper.finished_deferred