From ea6b2b98fb3d93f0bbee2001a5c358b1f84839d4 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sat, 29 Sep 2018 23:53:40 -0300 Subject: [PATCH] refactor ping queue --- lbrynet/dht/node.py | 4 +- lbrynet/dht/protocol.py | 67 +++++++------------- tests/functional/dht/dht_test_environment.py | 4 +- 3 files changed, 26 insertions(+), 49 deletions(-) diff --git a/lbrynet/dht/node.py b/lbrynet/dht/node.py index 096e46b91..a085ed1b2 100644 --- a/lbrynet/dht/node.py +++ b/lbrynet/dht/node.py @@ -644,10 +644,10 @@ class Node(MockKademliaHelper): defer.returnValue(None) def _refreshContacts(self): - self._protocol._ping_queue.enqueue_maybe_ping(*self.contacts, delay=0) + self._protocol._ping_queue.enqueue_maybe_ping(*self.contacts) def _refreshStoringPeers(self): - self._protocol._ping_queue.enqueue_maybe_ping(*self._dataStore.getStoringContacts(), delay=0) + self._protocol._ping_queue.enqueue_maybe_ping(*self._dataStore.getStoringContacts()) @defer.inlineCallbacks def _refreshRoutingTable(self): diff --git a/lbrynet/dht/protocol.py b/lbrynet/dht/protocol.py index 3538269b6..0bfdbf953 100644 --- a/lbrynet/dht/protocol.py +++ b/lbrynet/dht/protocol.py @@ -1,7 +1,6 @@ import logging import errno from binascii import hexlify -from collections import deque from twisted.internet import protocol, defer from .error import BUILTIN_EXCEPTIONS, UnknownRemoteException, TimeoutError, TransportNotConnected @@ -22,58 +21,36 @@ class PingQueue: def __init__(self, node): self._node = node - self._get_time = self._node.clock.seconds - self._queue = deque() self._enqueued_contacts = {} self._process_lc = node.get_looping_call(self._process) def enqueue_maybe_ping(self, *contacts, **kwargs): - schedule = self._get_time() + (kwargs.get('delay', constants.checkRefreshInterval)) + no_op = (defer.succeed(None), lambda: None) for contact in contacts: - if contact not in self._enqueued_contacts: - self._enqueued_contacts[contact] = schedule - self._queue.append(contact) + self._enqueued_contacts.setdefault(contact, no_op) + + @defer.inlineCallbacks + def _ping(self, contact): + if contact.contact_is_good: + return + try: + yield contact.ping() + except TimeoutError: + pass + except Exception as err: + log.warning("unexpected error: %s", err) + finally: + if contact in self._enqueued_contacts: + del self._enqueued_contacts[contact] def _process(self): - if not len(self._queue): + if not self._enqueued_contacts: return - contact = self._queue.popleft() - now = self._get_time() - - # if the oldest contact in the queue isn't old enough to be pinged, add it back to the queue and return - if now < self._enqueued_contacts[contact]: - self._queue.appendleft(contact) - return - - pinged = [] - checked = [] - while now > self._enqueued_contacts[contact]: - checked.append(contact) - if not contact.contact_is_good: - pinged.append(contact) - if not len(self._queue): - break - contact = self._queue.popleft() - if not now > self._enqueued_contacts[contact]: - checked.append(contact) - - @defer.inlineCallbacks - def _ping(contact): - try: - yield contact.ping() - except TimeoutError: - pass - except Exception as err: - log.warning("unexpected error: %s", err) - - d = defer.DeferredList([_ping(contact) for contact in pinged]) - - for contact in checked: - if contact in self._enqueued_contacts and contact in pinged: - del self._enqueued_contacts[contact] - elif contact not in self._queue: - self._queue.appendleft(contact) - return d + # spread pings across 60 seconds to avoid flood and/or false negatives + step = 60.0/float(len(self._enqueued_contacts)) + for index, (contact, (call, _)) in enumerate(self._enqueued_contacts.items()): + if call.called and not contact.contact_is_good: + self._enqueued_contacts[contact] = self._node.reactor_callLater(index*step, self._ping, contact) def start(self): return self._node.safe_start_looping_call(self._process_lc, 60) diff --git a/tests/functional/dht/dht_test_environment.py b/tests/functional/dht/dht_test_environment.py index 8c431c5ec..7dd138dc0 100644 --- a/tests/functional/dht/dht_test_environment.py +++ b/tests/functional/dht/dht_test_environment.py @@ -47,8 +47,8 @@ class TestKademliaBase(unittest.TestCase): :param step: reactor tick rate (in seconds) """ advanced = 0.0 + self.clock._sortCalls() while advanced < n: - self.clock._sortCalls() if step: next_step = step elif self.clock.getDelayedCalls(): @@ -120,7 +120,7 @@ class TestKademliaBase(unittest.TestCase): seed_dl = [] seeds = sorted(list(self.seed_dns.keys())) known_addresses = [(seed_name, 4444) for seed_name in seeds] - for seed_dns in seeds: + for _ in range(len(seeds)): self._add_next_node() seed = self.nodes.pop() self._seeds.append(seed)