restore delay on ping queue
This commit is contained in:
parent
ea6b2b98fb
commit
50003d2600
2 changed files with 16 additions and 6 deletions
|
@ -644,10 +644,10 @@ class Node(MockKademliaHelper):
|
||||||
defer.returnValue(None)
|
defer.returnValue(None)
|
||||||
|
|
||||||
def _refreshContacts(self):
|
def _refreshContacts(self):
|
||||||
self._protocol._ping_queue.enqueue_maybe_ping(*self.contacts)
|
self._protocol._ping_queue.enqueue_maybe_ping(*self.contacts, delay=0)
|
||||||
|
|
||||||
def _refreshStoringPeers(self):
|
def _refreshStoringPeers(self):
|
||||||
self._protocol._ping_queue.enqueue_maybe_ping(*self._dataStore.getStoringContacts())
|
self._protocol._ping_queue.enqueue_maybe_ping(*self._dataStore.getStoringContacts(), delay=0)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _refreshRoutingTable(self):
|
def _refreshRoutingTable(self):
|
||||||
|
|
|
@ -22,12 +22,17 @@ class PingQueue:
|
||||||
def __init__(self, node):
|
def __init__(self, node):
|
||||||
self._node = node
|
self._node = node
|
||||||
self._enqueued_contacts = {}
|
self._enqueued_contacts = {}
|
||||||
|
self._pending_contacts = {}
|
||||||
self._process_lc = node.get_looping_call(self._process)
|
self._process_lc = node.get_looping_call(self._process)
|
||||||
|
|
||||||
def enqueue_maybe_ping(self, *contacts, **kwargs):
|
def enqueue_maybe_ping(self, *contacts, **kwargs):
|
||||||
|
delay = kwargs.get('delay', constants.checkRefreshInterval)
|
||||||
no_op = (defer.succeed(None), lambda: None)
|
no_op = (defer.succeed(None), lambda: None)
|
||||||
for contact in contacts:
|
for contact in contacts:
|
||||||
self._enqueued_contacts.setdefault(contact, no_op)
|
if delay and contact not in self._enqueued_contacts:
|
||||||
|
self._pending_contacts.setdefault(contact, self._node.clock.seconds() + delay)
|
||||||
|
else:
|
||||||
|
self._enqueued_contacts.setdefault(contact, no_op)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _ping(self, contact):
|
def _ping(self, contact):
|
||||||
|
@ -44,10 +49,14 @@ class PingQueue:
|
||||||
del self._enqueued_contacts[contact]
|
del self._enqueued_contacts[contact]
|
||||||
|
|
||||||
def _process(self):
|
def _process(self):
|
||||||
if not self._enqueued_contacts:
|
# move contacts that are scheduled to join the queue
|
||||||
return
|
if self._pending_contacts:
|
||||||
|
now = self._node.clock.seconds()
|
||||||
|
for contact in [contact for contact, schedule in self._pending_contacts.items() if schedule <= now]:
|
||||||
|
del self._pending_contacts[contact]
|
||||||
|
self._enqueued_contacts.setdefault(contact, (defer.succeed(None), lambda: None))
|
||||||
# spread pings across 60 seconds to avoid flood and/or false negatives
|
# spread pings across 60 seconds to avoid flood and/or false negatives
|
||||||
step = 60.0/float(len(self._enqueued_contacts))
|
step = 60.0/float(len(self._enqueued_contacts)) if self._enqueued_contacts else 0
|
||||||
for index, (contact, (call, _)) in enumerate(self._enqueued_contacts.items()):
|
for index, (contact, (call, _)) in enumerate(self._enqueued_contacts.items()):
|
||||||
if call.called and not contact.contact_is_good:
|
if call.called and not contact.contact_is_good:
|
||||||
self._enqueued_contacts[contact] = self._node.reactor_callLater(index*step, self._ping, contact)
|
self._enqueued_contacts[contact] = self._node.reactor_callLater(index*step, self._ping, contact)
|
||||||
|
@ -56,6 +65,7 @@ class PingQueue:
|
||||||
return self._node.safe_start_looping_call(self._process_lc, 60)
|
return self._node.safe_start_looping_call(self._process_lc, 60)
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
|
map(None, (cancel() for _, (call, cancel) in self._enqueued_contacts.items() if not call.called))
|
||||||
return self._node.safe_stop_looping_call(self._process_lc)
|
return self._node.safe_stop_looping_call(self._process_lc)
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue