simplify/fix ping queue
This commit is contained in:
parent
57ad9f1c52
commit
4294bf742d
2 changed files with 27 additions and 40 deletions
|
@ -16,6 +16,7 @@ refresh_interval = 3600 # 1 hour
|
||||||
replicate_interval = refresh_interval
|
replicate_interval = refresh_interval
|
||||||
data_expiration = 86400 # 24 hours
|
data_expiration = 86400 # 24 hours
|
||||||
token_secret_refresh_interval = 300 # 5 minutes
|
token_secret_refresh_interval = 300 # 5 minutes
|
||||||
|
maybe_ping_delay = 300 # 5 minutes
|
||||||
check_refresh_interval = refresh_interval / 5
|
check_refresh_interval = refresh_interval / 5
|
||||||
max_datagram_size = 8192 # 8 KB
|
max_datagram_size = 8192 # 8 KB
|
||||||
rpc_id_length = 20
|
rpc_id_length = 20
|
||||||
|
|
|
@ -187,56 +187,46 @@ class PingQueue:
|
||||||
def __init__(self, loop: asyncio.BaseEventLoop, protocol: 'KademliaProtocol'):
|
def __init__(self, loop: asyncio.BaseEventLoop, protocol: 'KademliaProtocol'):
|
||||||
self._loop = loop
|
self._loop = loop
|
||||||
self._protocol = protocol
|
self._protocol = protocol
|
||||||
self._enqueued_contacts: typing.List['KademliaPeer'] = []
|
|
||||||
self._pending_contacts: typing.Dict['KademliaPeer', float] = {}
|
self._pending_contacts: typing.Dict['KademliaPeer', float] = {}
|
||||||
self._process_task: asyncio.Task = None
|
self._process_task: asyncio.Task = None
|
||||||
self._next_task: asyncio.Future = None
|
|
||||||
self._next_timer: asyncio.TimerHandle = None
|
|
||||||
self._running = False
|
self._running = False
|
||||||
|
self._running_pings: typing.List[asyncio.Task] = []
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def running(self):
|
def running(self):
|
||||||
return self._running
|
return self._running
|
||||||
|
|
||||||
def enqueue_maybe_ping(self, *peers: 'KademliaPeer', delay: typing.Optional[float] = None):
|
def enqueue_maybe_ping(self, *peers: 'KademliaPeer', delay: float = constants.maybe_ping_delay):
|
||||||
delay = constants.check_refresh_interval if delay is None else delay
|
now = self._loop.time()
|
||||||
for peer in peers:
|
for peer in peers:
|
||||||
if delay and peer not in self._enqueued_contacts:
|
if peer not in self._pending_contacts or now + delay < self._pending_contacts[peer]:
|
||||||
self._pending_contacts[peer] = self._loop.time() + delay
|
self._pending_contacts[peer] = delay + now
|
||||||
elif peer not in self._enqueued_contacts:
|
|
||||||
self._enqueued_contacts.append(peer)
|
|
||||||
if peer in self._pending_contacts:
|
|
||||||
del self._pending_contacts[peer]
|
|
||||||
|
|
||||||
async def _process(self):
|
def maybe_ping(self, peer: 'KademliaPeer'):
|
||||||
async def _ping(p: 'KademliaPeer'):
|
async def ping_task():
|
||||||
try:
|
try:
|
||||||
if self._protocol.peer_manager.peer_is_good(p):
|
if self._protocol.peer_manager.peer_is_good(peer):
|
||||||
await self._protocol.add_peer(p)
|
if peer not in self._protocol.routing_table.get_peers():
|
||||||
|
await self._protocol.add_peer(peer)
|
||||||
return
|
return
|
||||||
await self._protocol.get_rpc_peer(p).ping()
|
await self._protocol.get_rpc_peer(peer).ping()
|
||||||
except asyncio.TimeoutError:
|
except asyncio.TimeoutError:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
task = self._loop.create_task(ping_task())
|
||||||
|
task.add_done_callback(lambda _: None if task not in self._running_pings else self._running_pings.remove(task))
|
||||||
|
self._running_pings.append(task)
|
||||||
|
|
||||||
|
async def _process(self): # send up to 1 ping per second
|
||||||
while True:
|
while True:
|
||||||
tasks = []
|
enqueued = list(self._pending_contacts.keys())
|
||||||
|
now = self._loop.time()
|
||||||
if self._enqueued_contacts or self._pending_contacts:
|
for peer in enqueued:
|
||||||
now = self._loop.time()
|
if self._pending_contacts[peer] <= now:
|
||||||
scheduled = [k for k, d in self._pending_contacts.items() if now >= d]
|
del self._pending_contacts[peer]
|
||||||
for k in scheduled:
|
self.maybe_ping(peer)
|
||||||
del self._pending_contacts[k]
|
break
|
||||||
if k not in self._enqueued_contacts:
|
await asyncio.sleep(1, loop=self._loop)
|
||||||
self._enqueued_contacts.append(k)
|
|
||||||
while self._enqueued_contacts:
|
|
||||||
peer = self._enqueued_contacts.pop()
|
|
||||||
tasks.append(self._loop.create_task(_ping(peer)))
|
|
||||||
if tasks:
|
|
||||||
await asyncio.wait(tasks, loop=self._loop)
|
|
||||||
|
|
||||||
f = self._loop.create_future()
|
|
||||||
self._loop.call_later(1.0, lambda: None if f.done() else f.set_result(None))
|
|
||||||
await f
|
|
||||||
|
|
||||||
def start(self):
|
def start(self):
|
||||||
assert not self._running
|
assert not self._running
|
||||||
|
@ -250,12 +240,8 @@ class PingQueue:
|
||||||
if self._process_task:
|
if self._process_task:
|
||||||
self._process_task.cancel()
|
self._process_task.cancel()
|
||||||
self._process_task = None
|
self._process_task = None
|
||||||
if self._next_task:
|
while self._running_pings:
|
||||||
self._next_task.cancel()
|
self._running_pings[0].cancel()
|
||||||
self._next_task = None
|
|
||||||
if self._next_timer:
|
|
||||||
self._next_timer.cancel()
|
|
||||||
self._next_timer = None
|
|
||||||
|
|
||||||
|
|
||||||
class KademliaProtocol(DatagramProtocol):
|
class KademliaProtocol(DatagramProtocol):
|
||||||
|
|
Loading…
Add table
Reference in a new issue