diff --git a/lbrynet/dht/node.py b/lbrynet/dht/node.py index a085ed1b2..096e46b91 100644 --- a/lbrynet/dht/node.py +++ b/lbrynet/dht/node.py @@ -644,10 +644,10 @@ class Node(MockKademliaHelper): defer.returnValue(None) def _refreshContacts(self): - self._protocol._ping_queue.enqueue_maybe_ping(*self.contacts) + self._protocol._ping_queue.enqueue_maybe_ping(*self.contacts, delay=0) def _refreshStoringPeers(self): - self._protocol._ping_queue.enqueue_maybe_ping(*self._dataStore.getStoringContacts()) + self._protocol._ping_queue.enqueue_maybe_ping(*self._dataStore.getStoringContacts(), delay=0) @defer.inlineCallbacks def _refreshRoutingTable(self): diff --git a/lbrynet/dht/protocol.py b/lbrynet/dht/protocol.py index 0bfdbf953..6a9fd61fe 100644 --- a/lbrynet/dht/protocol.py +++ b/lbrynet/dht/protocol.py @@ -22,12 +22,17 @@ class PingQueue: def __init__(self, node): self._node = node self._enqueued_contacts = {} + self._pending_contacts = {} self._process_lc = node.get_looping_call(self._process) def enqueue_maybe_ping(self, *contacts, **kwargs): + delay = kwargs.get('delay', constants.checkRefreshInterval) no_op = (defer.succeed(None), lambda: None) for contact in contacts: - self._enqueued_contacts.setdefault(contact, no_op) + if delay and contact not in self._enqueued_contacts: + self._pending_contacts.setdefault(contact, self._node.clock.seconds() + delay) + else: + self._enqueued_contacts.setdefault(contact, no_op) @defer.inlineCallbacks def _ping(self, contact): @@ -44,10 +49,14 @@ class PingQueue: del self._enqueued_contacts[contact] def _process(self): - if not self._enqueued_contacts: - return + # move contacts that are scheduled to join the queue + if self._pending_contacts: + now = self._node.clock.seconds() + for contact in [contact for contact, schedule in self._pending_contacts.items() if schedule <= now]: + del self._pending_contacts[contact] + self._enqueued_contacts.setdefault(contact, (defer.succeed(None), lambda: None)) # spread pings across 60 seconds to avoid flood and/or false negatives - step = 60.0/float(len(self._enqueued_contacts)) + step = 60.0/float(len(self._enqueued_contacts)) if self._enqueued_contacts else 0 for index, (contact, (call, _)) in enumerate(self._enqueued_contacts.items()): if call.called and not contact.contact_is_good: self._enqueued_contacts[contact] = self._node.reactor_callLater(index*step, self._ping, contact) @@ -56,6 +65,7 @@ class PingQueue: return self._node.safe_start_looping_call(self._process_lc, 60) def stop(self): + map(None, (cancel() for _, (call, cancel) in self._enqueued_contacts.items() if not call.called)) return self._node.safe_stop_looping_call(self._process_lc)