diff --git a/lbrynet/core/call_later_manager.py b/lbrynet/core/call_later_manager.py index de7395322..d82b456ee 100644 --- a/lbrynet/core/call_later_manager.py +++ b/lbrynet/core/call_later_manager.py @@ -9,22 +9,25 @@ QUEUE_SIZE_THRESHOLD = 100 class CallLaterManager(object): - _callLater = None - _pendingCallLaters = [] - _delay = MIN_DELAY + def __init__(self, callLater): + """ + :param callLater: (IReactorTime.callLater) + """ - @classmethod - def get_min_delay(cls): - cls._pendingCallLaters = [cl for cl in cls._pendingCallLaters if cl.active()] - queue_size = len(cls._pendingCallLaters) + self._callLater = callLater + self._pendingCallLaters = [] + self._delay = MIN_DELAY + + def get_min_delay(self): + self._pendingCallLaters = [cl for cl in self._pendingCallLaters if cl.active()] + queue_size = len(self._pendingCallLaters) if queue_size > QUEUE_SIZE_THRESHOLD: - cls._delay = min((cls._delay + DELAY_INCREMENT), MAX_DELAY) + self._delay = min((self._delay + DELAY_INCREMENT), MAX_DELAY) else: - cls._delay = max((cls._delay - 2.0 * DELAY_INCREMENT), MIN_DELAY) - return cls._delay + self._delay = max((self._delay - 2.0 * DELAY_INCREMENT), MIN_DELAY) + return self._delay - @classmethod - def _cancel(cls, call_later): + def _cancel(self, call_later): """ :param call_later: DelayedCall :return: (callable) canceller function @@ -38,27 +41,25 @@ class CallLaterManager(object): if call_later.active(): call_later.cancel() - if call_later in cls._pendingCallLaters: - cls._pendingCallLaters.remove(call_later) + if call_later in self._pendingCallLaters: + self._pendingCallLaters.remove(call_later) return reason return cancel - @classmethod - def stop(cls): + def stop(self): """ Cancel any callLaters that are still running """ from twisted.internet import defer - while cls._pendingCallLaters: - canceller = cls._cancel(cls._pendingCallLaters[0]) + while self._pendingCallLaters: + canceller = self._cancel(self._pendingCallLaters[0]) try: canceller() except (defer.CancelledError, defer.AlreadyCalledError, ValueError): pass - @classmethod - def call_later(cls, when, what, *args, **kwargs): + def call_later(self, when, what, *args, **kwargs): """ Schedule a call later and get a canceller callback function @@ -70,21 +71,11 @@ class CallLaterManager(object): :return: (tuple) twisted.internet.base.DelayedCall object, canceller function """ - call_later = cls._callLater(when, what, *args, **kwargs) - canceller = cls._cancel(call_later) - cls._pendingCallLaters.append(call_later) + call_later = self._callLater(when, what, *args, **kwargs) + canceller = self._cancel(call_later) + self._pendingCallLaters.append(call_later) return call_later, canceller - @classmethod - def call_soon(cls, what, *args, **kwargs): - delay = cls.get_min_delay() - return cls.call_later(delay, what, *args, **kwargs) - - @classmethod - def setup(cls, callLater): - """ - Setup the callLater function to use, supports the real reactor as well as task.Clock - - :param callLater: (IReactorTime.callLater) - """ - cls._callLater = callLater + def call_soon(self, what, *args, **kwargs): + delay = self.get_min_delay() + return self.call_later(delay, what, *args, **kwargs) diff --git a/lbrynet/dht/node.py b/lbrynet/dht/node.py index 4286e69ac..1a6544ab1 100644 --- a/lbrynet/dht/node.py +++ b/lbrynet/dht/node.py @@ -53,10 +53,9 @@ class MockKademliaHelper(object): self.contact_manager = ContactManager(self.clock.seconds) self.reactor_listenUDP = listenUDP self.reactor_resolve = resolve - - CallLaterManager.setup(callLater) - self.reactor_callLater = CallLaterManager.call_later - self.reactor_callSoon = CallLaterManager.call_soon + self.call_later_manager = CallLaterManager(callLater) + self.reactor_callLater = self.call_later_manager.call_later + self.reactor_callSoon = self.call_later_manager.call_soon self._listeningPort = None # object implementing Twisted # IListeningPort This will contain a deferred created when diff --git a/lbrynet/dht/protocol.py b/lbrynet/dht/protocol.py index 536315c44..9bd4a6b46 100644 --- a/lbrynet/dht/protocol.py +++ b/lbrynet/dht/protocol.py @@ -4,7 +4,6 @@ import errno from collections import deque from twisted.internet import protocol, defer -from lbrynet.core.call_later_manager import CallLaterManager from error import BUILTIN_EXCEPTIONS, UnknownRemoteException, TimeoutError, TransportNotConnected import constants @@ -461,5 +460,5 @@ class KademliaProtocol(protocol.DatagramProtocol): """ log.info('Stopping DHT') self._ping_queue.stop() - CallLaterManager.stop() + self._node.call_later_manager.stop() log.info('DHT stopped')