diff --git a/lbrynet/core/call_later_manager.py b/lbrynet/core/call_later_manager.py index d3f5d3c2d..2bf858a40 100644 --- a/lbrynet/core/call_later_manager.py +++ b/lbrynet/core/call_later_manager.py @@ -1,6 +1,27 @@ +import logging + +log = logging.getLogger() + +MIN_DELAY = 0.0 +MAX_DELAY = 0.01 +DELAY_INCREMENT = 0.0001 +QUEUE_SIZE_THRESHOLD = 100 + + class CallLaterManager(object): _callLater = None _pendingCallLaters = [] + _delay = MIN_DELAY + + @classmethod + def get_min_delay(cls): + cls._pendingCallLaters = [cl for cl in cls._pendingCallLaters if cl.active()] + queue_size = len(cls._pendingCallLaters) + if queue_size > QUEUE_SIZE_THRESHOLD: + cls._delay = min((cls._delay + DELAY_INCREMENT), MAX_DELAY) + else: + cls._delay = max((cls._delay - 2.0 * DELAY_INCREMENT), MIN_DELAY) + return cls._delay @classmethod def _cancel(cls, call_later): @@ -53,6 +74,11 @@ class CallLaterManager(object): cls._pendingCallLaters.append(call_later) return call_later, canceller + @classmethod + def call_soon(cls, what, *args, **kwargs): + delay = cls.get_min_delay() + return cls.call_later(delay, what, *args, **kwargs) + @classmethod def setup(cls, callLater): """ diff --git a/lbrynet/daemon/Daemon.py b/lbrynet/daemon/Daemon.py index 4efb16605..df39a6137 100644 --- a/lbrynet/daemon/Daemon.py +++ b/lbrynet/daemon/Daemon.py @@ -227,7 +227,6 @@ class Daemon(AuthJSONRPCServer): @defer.inlineCallbacks def setup(self): reactor.addSystemEventTrigger('before', 'shutdown', self._shutdown) - configure_loggly_handler() log.info("Starting lbrynet-daemon") @@ -412,7 +411,6 @@ class Daemon(AuthJSONRPCServer): log.info("Status at time of shutdown: " + self.startup_status[0]) self._stop_streams() - self.looping_call_manager.shutdown() if self.analytics_manager: self.analytics_manager.shutdown() diff --git a/lbrynet/dht/node.py b/lbrynet/dht/node.py index 9ad105c1e..7e088b152 100644 --- a/lbrynet/dht/node.py +++ b/lbrynet/dht/node.py @@ -94,6 +94,7 @@ class Node(object): self.reactor_resolve = resolve self.reactor_listenUDP = listenUDP self.reactor_callLater = CallLaterManager.call_later + self.reactor_callSoon = CallLaterManager.call_soon self.node_id = node_id or self._generateID() self.port = udpPort self._listeningPort = None # object implementing Twisted diff --git a/lbrynet/dht/protocol.py b/lbrynet/dht/protocol.py index f9e68b515..e1ca25d15 100644 --- a/lbrynet/dht/protocol.py +++ b/lbrynet/dht/protocol.py @@ -108,7 +108,8 @@ class KademliaProtocol(protocol.DatagramProtocol): message = self._translator.fromPrimitive(msgPrimitive) except (encoding.DecodeError, ValueError) as err: # We received some rubbish here - log.exception("Error decoding datagram from %s:%i - %s", address[0], address[1], err) + log.warning("Error decoding datagram %s from %s:%i - %s", datagram.encode('hex'), + address[0], address[1], err) return except (IndexError, KeyError): log.warning("Couldn't decode dht datagram from %s", address) @@ -206,7 +207,7 @@ class KademliaProtocol(protocol.DatagramProtocol): def _scheduleSendNext(self, txData, address): """Schedule the sending of the next UDP packet """ - delayed_call, _ = self._node.reactor_callLater(0, self._write, txData, address) + delayed_call, _ = self._node.reactor_callSoon(self._write, txData, address) def _write(self, txData, address): if self.transport: