From 4294bf742d1176eee8d9794028d77bc022448180 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 21 Feb 2019 12:35:22 -0500 Subject: [PATCH] simplify/fix ping queue --- lbrynet/dht/constants.py | 1 + lbrynet/dht/protocol/protocol.py | 66 +++++++++++++------------------- 2 files changed, 27 insertions(+), 40 deletions(-) diff --git a/lbrynet/dht/constants.py b/lbrynet/dht/constants.py index e52fdd377..c64720706 100644 --- a/lbrynet/dht/constants.py +++ b/lbrynet/dht/constants.py @@ -16,6 +16,7 @@ refresh_interval = 3600 # 1 hour replicate_interval = refresh_interval data_expiration = 86400 # 24 hours token_secret_refresh_interval = 300 # 5 minutes +maybe_ping_delay = 300 # 5 minutes check_refresh_interval = refresh_interval / 5 max_datagram_size = 8192 # 8 KB rpc_id_length = 20 diff --git a/lbrynet/dht/protocol/protocol.py b/lbrynet/dht/protocol/protocol.py index cedf5882a..0c95be813 100644 --- a/lbrynet/dht/protocol/protocol.py +++ b/lbrynet/dht/protocol/protocol.py @@ -187,56 +187,46 @@ class PingQueue: def __init__(self, loop: asyncio.BaseEventLoop, protocol: 'KademliaProtocol'): self._loop = loop self._protocol = protocol - self._enqueued_contacts: typing.List['KademliaPeer'] = [] self._pending_contacts: typing.Dict['KademliaPeer', float] = {} self._process_task: asyncio.Task = None - self._next_task: asyncio.Future = None - self._next_timer: asyncio.TimerHandle = None self._running = False + self._running_pings: typing.List[asyncio.Task] = [] @property def running(self): return self._running - def enqueue_maybe_ping(self, *peers: 'KademliaPeer', delay: typing.Optional[float] = None): - delay = constants.check_refresh_interval if delay is None else delay + def enqueue_maybe_ping(self, *peers: 'KademliaPeer', delay: float = constants.maybe_ping_delay): + now = self._loop.time() for peer in peers: - if delay and peer not in self._enqueued_contacts: - self._pending_contacts[peer] = self._loop.time() + delay - elif peer not in self._enqueued_contacts: - self._enqueued_contacts.append(peer) - if peer in self._pending_contacts: - del self._pending_contacts[peer] + if peer not in self._pending_contacts or now + delay < self._pending_contacts[peer]: + self._pending_contacts[peer] = delay + now - async def _process(self): - async def _ping(p: 'KademliaPeer'): + def maybe_ping(self, peer: 'KademliaPeer'): + async def ping_task(): try: - if self._protocol.peer_manager.peer_is_good(p): - await self._protocol.add_peer(p) + if self._protocol.peer_manager.peer_is_good(peer): + if peer not in self._protocol.routing_table.get_peers(): + await self._protocol.add_peer(peer) return - await self._protocol.get_rpc_peer(p).ping() + await self._protocol.get_rpc_peer(peer).ping() except asyncio.TimeoutError: pass + task = self._loop.create_task(ping_task()) + task.add_done_callback(lambda _: None if task not in self._running_pings else self._running_pings.remove(task)) + self._running_pings.append(task) + + async def _process(self): # send up to 1 ping per second while True: - tasks = [] - - if self._enqueued_contacts or self._pending_contacts: - now = self._loop.time() - scheduled = [k for k, d in self._pending_contacts.items() if now >= d] - for k in scheduled: - del self._pending_contacts[k] - if k not in self._enqueued_contacts: - self._enqueued_contacts.append(k) - while self._enqueued_contacts: - peer = self._enqueued_contacts.pop() - tasks.append(self._loop.create_task(_ping(peer))) - if tasks: - await asyncio.wait(tasks, loop=self._loop) - - f = self._loop.create_future() - self._loop.call_later(1.0, lambda: None if f.done() else f.set_result(None)) - await f + enqueued = list(self._pending_contacts.keys()) + now = self._loop.time() + for peer in enqueued: + if self._pending_contacts[peer] <= now: + del self._pending_contacts[peer] + self.maybe_ping(peer) + break + await asyncio.sleep(1, loop=self._loop) def start(self): assert not self._running @@ -250,12 +240,8 @@ class PingQueue: if self._process_task: self._process_task.cancel() self._process_task = None - if self._next_task: - self._next_task.cancel() - self._next_task = None - if self._next_timer: - self._next_timer.cancel() - self._next_timer = None + while self._running_pings: + self._running_pings[0].cancel() class KademliaProtocol(DatagramProtocol):