remove deferredLock from iterativeFind
-fire the first iteration right away
This commit is contained in:
parent
adca5f5993
commit
e8b402f998
1 changed files with 14 additions and 31 deletions
|
@ -43,7 +43,6 @@ class _IterativeFind(object):
|
|||
self._iteration_count = 0
|
||||
self.find_value_result = {}
|
||||
self.pending_iteration_calls = []
|
||||
self._lock = defer.DeferredLock()
|
||||
|
||||
@property
|
||||
def is_find_node_request(self):
|
||||
|
@ -83,8 +82,6 @@ class _IterativeFind(object):
|
|||
if contact.id == self.node.node_id:
|
||||
defer.returnValue(contact.id)
|
||||
|
||||
yield self._lock.acquire()
|
||||
|
||||
if contact not in self.active_contacts:
|
||||
self.active_contacts.append(contact)
|
||||
if contact not in self.shortlist:
|
||||
|
@ -97,7 +94,6 @@ class _IterativeFind(object):
|
|||
if self.is_find_value_request and self.key in result:
|
||||
# We have found the value
|
||||
self.find_value_result[self.key] = result[self.key]
|
||||
self._lock.release()
|
||||
self.finished_deferred.callback(self.find_value_result)
|
||||
else:
|
||||
if self.is_find_value_request:
|
||||
|
@ -121,12 +117,9 @@ class _IterativeFind(object):
|
|||
if found_contact not in self.shortlist:
|
||||
self.shortlist.append(found_contact)
|
||||
|
||||
self._lock.release()
|
||||
|
||||
if not self.finished_deferred.called:
|
||||
if self.should_stop():
|
||||
self.sortByDistance(self.active_contacts)
|
||||
self.finished_deferred.callback(self.active_contacts[:min(constants.k, len(self.active_contacts))])
|
||||
if not self.finished_deferred.called and self.should_stop():
|
||||
self.sortByDistance(self.active_contacts)
|
||||
self.finished_deferred.callback(self.active_contacts[:min(constants.k, len(self.active_contacts))])
|
||||
|
||||
defer.returnValue(contact.id)
|
||||
|
||||
|
@ -141,20 +134,15 @@ class _IterativeFind(object):
|
|||
defer.returnValue(contact.id)
|
||||
|
||||
def should_stop(self):
|
||||
active_contacts_len = len(self.active_contacts)
|
||||
if active_contacts_len >= constants.k:
|
||||
# log.info("there are enough results %s(%s)", self.rpc, self.key.encode('hex'))
|
||||
if self.prev_closest_node and self.closest_node and self.distance.is_closer(self.prev_closest_node.id,
|
||||
self.closest_node.id):
|
||||
return True
|
||||
if self.prev_closest_node and self.closest_node and self.distance.is_closer(
|
||||
self.prev_closest_node.id, self.closest_node.id):
|
||||
# log.info("not getting any closer %s(%s)", self.rpc, self.key.encode('hex'))
|
||||
if len(self.active_contacts) >= constants.k:
|
||||
return True
|
||||
return False
|
||||
|
||||
# Send parallel, asynchronous FIND_NODE RPCs to the shortlist of contacts
|
||||
@defer.inlineCallbacks
|
||||
def _searchIteration(self):
|
||||
yield self._lock.acquire()
|
||||
# Sort the discovered active nodes from closest to furthest
|
||||
if len(self.active_contacts):
|
||||
self.sortByDistance(self.active_contacts)
|
||||
|
@ -178,25 +166,18 @@ class _IterativeFind(object):
|
|||
for contact in to_remove: # these contacts will be re-added to the shortlist when they reply successfully
|
||||
self.shortlist.remove(contact)
|
||||
|
||||
# log.info("Active probes: %i, contacted %i/%i (%s)", len(self.active_probes),
|
||||
# len(self.active_contacts), len(self.already_contacted), hex(id(self)))
|
||||
|
||||
# run the probes
|
||||
if probes:
|
||||
# Schedule the next iteration if there are any active
|
||||
# calls (Kademlia uses loose parallelism)
|
||||
self.searchIteration()
|
||||
self._lock.release()
|
||||
|
||||
d = defer.gatherResults(probes)
|
||||
d = defer.DeferredList(probes, consumeErrors=True)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _remove_probes(results):
|
||||
yield self._lock.acquire()
|
||||
for probe in probes:
|
||||
self.active_probes.remove(probe)
|
||||
self._lock.release()
|
||||
defer.returnValue(results)
|
||||
return results
|
||||
|
||||
d.addCallback(_remove_probes)
|
||||
|
||||
|
@ -204,8 +185,11 @@ class _IterativeFind(object):
|
|||
# If no probes were sent, there will not be any improvement, so we're done
|
||||
self.sortByDistance(self.active_contacts)
|
||||
self.finished_deferred.callback(self.active_contacts[:min(constants.k, len(self.active_contacts))])
|
||||
elif not self.finished_deferred.called and self.should_stop():
|
||||
self.sortByDistance(self.active_contacts)
|
||||
self.finished_deferred.callback(self.active_contacts[:min(constants.k, len(self.active_contacts))])
|
||||
|
||||
def searchIteration(self):
|
||||
def searchIteration(self, delay=constants.iterativeLookupDelay):
|
||||
def _cancel_pending_iterations(result):
|
||||
while self.pending_iteration_calls:
|
||||
canceller = self.pending_iteration_calls.pop()
|
||||
|
@ -213,12 +197,11 @@ class _IterativeFind(object):
|
|||
return result
|
||||
self.finished_deferred.addBoth(_cancel_pending_iterations)
|
||||
self._iteration_count += 1
|
||||
# log.debug("iteration %i %s(%s...)", self._iteration_count, self.rpc, self.key.encode('hex')[:8])
|
||||
call, cancel = self.node.reactor_callLater(1, self._search_iteration_semaphore.run, self._searchIteration)
|
||||
call, cancel = self.node.reactor_callLater(delay, self._search_iteration_semaphore.run, self._searchIteration)
|
||||
self.pending_iteration_calls.append(cancel)
|
||||
|
||||
|
||||
def iterativeFind(node, shortlist, key, rpc):
|
||||
helper = _IterativeFind(node, shortlist, key, rpc)
|
||||
helper.searchIteration()
|
||||
helper.searchIteration(0)
|
||||
return helper.finished_deferred
|
||||
|
|
Loading…
Reference in a new issue