diff --git a/lbrynet/dht/protocol.py b/lbrynet/dht/protocol.py index 365360339..829c02e9b 100644 --- a/lbrynet/dht/protocol.py +++ b/lbrynet/dht/protocol.py @@ -28,12 +28,12 @@ class PingQueue(object): self._semaphore = defer.DeferredSemaphore(1) self._ping_semaphore = defer.DeferredSemaphore(constants.alpha) self._process_lc = node.get_looping_call(self._semaphore.run, self._process) - self._delay = 300 - def _add_contact(self, contact): + def _add_contact(self, contact, delay=None): if contact in self._enqueued_contacts: return defer.succeed(None) - self._enqueued_contacts[contact] = self._get_time() + self._delay + delay = delay or constants.checkRefreshInterval + self._enqueued_contacts[contact] = self._get_time() + delay self._queue.append(contact) return defer.succeed(None) @@ -49,11 +49,6 @@ class PingQueue(object): self._queue.appendleft(contact) defer.returnValue(None) - def _ping(contact): - d = contact.ping() - d.addErrback(lambda err: err.trap(TimeoutError)) - return d - pinged = [] checked = [] while now > self._enqueued_contacts[contact]: @@ -65,9 +60,17 @@ class PingQueue(object): contact = self._queue.popleft() if not now > self._enqueued_contacts[contact]: checked.append(contact) - # log.info("ping %i/%i peers", len(pinged), len(checked)) - yield defer.DeferredList([self._ping_semaphore.run(_ping, contact) for contact in pinged]) + @defer.inlineCallbacks + def _ping(contact): + try: + yield contact.ping() + except TimeoutError: + pass + except Exception as err: + log.warning("unexpected error: %s", err) + + yield defer.DeferredList([_ping(contact) for contact in pinged]) for contact in checked: if contact in self._enqueued_contacts: @@ -81,8 +84,8 @@ class PingQueue(object): def stop(self): return self._node.safe_stop_looping_call(self._process_lc) - def enqueue_maybe_ping(self, contact): - return self._semaphore.run(self._add_contact, contact) + def enqueue_maybe_ping(self, contact, delay=None): + return self._semaphore.run(self._add_contact, contact, delay) class KademliaProtocol(protocol.DatagramProtocol):