update CallLaterManager to be an object
This commit is contained in:
parent
760417ff3a
commit
3dfc6bd2cc
3 changed files with 31 additions and 42 deletions
|
@ -9,22 +9,25 @@ QUEUE_SIZE_THRESHOLD = 100
|
||||||
|
|
||||||
|
|
||||||
class CallLaterManager(object):
|
class CallLaterManager(object):
|
||||||
_callLater = None
|
def __init__(self, callLater):
|
||||||
_pendingCallLaters = []
|
"""
|
||||||
_delay = MIN_DELAY
|
:param callLater: (IReactorTime.callLater)
|
||||||
|
"""
|
||||||
|
|
||||||
@classmethod
|
self._callLater = callLater
|
||||||
def get_min_delay(cls):
|
self._pendingCallLaters = []
|
||||||
cls._pendingCallLaters = [cl for cl in cls._pendingCallLaters if cl.active()]
|
self._delay = MIN_DELAY
|
||||||
queue_size = len(cls._pendingCallLaters)
|
|
||||||
|
def get_min_delay(self):
|
||||||
|
self._pendingCallLaters = [cl for cl in self._pendingCallLaters if cl.active()]
|
||||||
|
queue_size = len(self._pendingCallLaters)
|
||||||
if queue_size > QUEUE_SIZE_THRESHOLD:
|
if queue_size > QUEUE_SIZE_THRESHOLD:
|
||||||
cls._delay = min((cls._delay + DELAY_INCREMENT), MAX_DELAY)
|
self._delay = min((self._delay + DELAY_INCREMENT), MAX_DELAY)
|
||||||
else:
|
else:
|
||||||
cls._delay = max((cls._delay - 2.0 * DELAY_INCREMENT), MIN_DELAY)
|
self._delay = max((self._delay - 2.0 * DELAY_INCREMENT), MIN_DELAY)
|
||||||
return cls._delay
|
return self._delay
|
||||||
|
|
||||||
@classmethod
|
def _cancel(self, call_later):
|
||||||
def _cancel(cls, call_later):
|
|
||||||
"""
|
"""
|
||||||
:param call_later: DelayedCall
|
:param call_later: DelayedCall
|
||||||
:return: (callable) canceller function
|
:return: (callable) canceller function
|
||||||
|
@ -38,27 +41,25 @@ class CallLaterManager(object):
|
||||||
|
|
||||||
if call_later.active():
|
if call_later.active():
|
||||||
call_later.cancel()
|
call_later.cancel()
|
||||||
if call_later in cls._pendingCallLaters:
|
if call_later in self._pendingCallLaters:
|
||||||
cls._pendingCallLaters.remove(call_later)
|
self._pendingCallLaters.remove(call_later)
|
||||||
return reason
|
return reason
|
||||||
return cancel
|
return cancel
|
||||||
|
|
||||||
@classmethod
|
def stop(self):
|
||||||
def stop(cls):
|
|
||||||
"""
|
"""
|
||||||
Cancel any callLaters that are still running
|
Cancel any callLaters that are still running
|
||||||
"""
|
"""
|
||||||
|
|
||||||
from twisted.internet import defer
|
from twisted.internet import defer
|
||||||
while cls._pendingCallLaters:
|
while self._pendingCallLaters:
|
||||||
canceller = cls._cancel(cls._pendingCallLaters[0])
|
canceller = self._cancel(self._pendingCallLaters[0])
|
||||||
try:
|
try:
|
||||||
canceller()
|
canceller()
|
||||||
except (defer.CancelledError, defer.AlreadyCalledError, ValueError):
|
except (defer.CancelledError, defer.AlreadyCalledError, ValueError):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@classmethod
|
def call_later(self, when, what, *args, **kwargs):
|
||||||
def call_later(cls, when, what, *args, **kwargs):
|
|
||||||
"""
|
"""
|
||||||
Schedule a call later and get a canceller callback function
|
Schedule a call later and get a canceller callback function
|
||||||
|
|
||||||
|
@ -70,21 +71,11 @@ class CallLaterManager(object):
|
||||||
:return: (tuple) twisted.internet.base.DelayedCall object, canceller function
|
:return: (tuple) twisted.internet.base.DelayedCall object, canceller function
|
||||||
"""
|
"""
|
||||||
|
|
||||||
call_later = cls._callLater(when, what, *args, **kwargs)
|
call_later = self._callLater(when, what, *args, **kwargs)
|
||||||
canceller = cls._cancel(call_later)
|
canceller = self._cancel(call_later)
|
||||||
cls._pendingCallLaters.append(call_later)
|
self._pendingCallLaters.append(call_later)
|
||||||
return call_later, canceller
|
return call_later, canceller
|
||||||
|
|
||||||
@classmethod
|
def call_soon(self, what, *args, **kwargs):
|
||||||
def call_soon(cls, what, *args, **kwargs):
|
delay = self.get_min_delay()
|
||||||
delay = cls.get_min_delay()
|
return self.call_later(delay, what, *args, **kwargs)
|
||||||
return cls.call_later(delay, what, *args, **kwargs)
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def setup(cls, callLater):
|
|
||||||
"""
|
|
||||||
Setup the callLater function to use, supports the real reactor as well as task.Clock
|
|
||||||
|
|
||||||
:param callLater: (IReactorTime.callLater)
|
|
||||||
"""
|
|
||||||
cls._callLater = callLater
|
|
||||||
|
|
|
@ -53,10 +53,9 @@ class MockKademliaHelper(object):
|
||||||
self.contact_manager = ContactManager(self.clock.seconds)
|
self.contact_manager = ContactManager(self.clock.seconds)
|
||||||
self.reactor_listenUDP = listenUDP
|
self.reactor_listenUDP = listenUDP
|
||||||
self.reactor_resolve = resolve
|
self.reactor_resolve = resolve
|
||||||
|
self.call_later_manager = CallLaterManager(callLater)
|
||||||
CallLaterManager.setup(callLater)
|
self.reactor_callLater = self.call_later_manager.call_later
|
||||||
self.reactor_callLater = CallLaterManager.call_later
|
self.reactor_callSoon = self.call_later_manager.call_soon
|
||||||
self.reactor_callSoon = CallLaterManager.call_soon
|
|
||||||
|
|
||||||
self._listeningPort = None # object implementing Twisted
|
self._listeningPort = None # object implementing Twisted
|
||||||
# IListeningPort This will contain a deferred created when
|
# IListeningPort This will contain a deferred created when
|
||||||
|
|
|
@ -4,7 +4,6 @@ import errno
|
||||||
from collections import deque
|
from collections import deque
|
||||||
|
|
||||||
from twisted.internet import protocol, defer
|
from twisted.internet import protocol, defer
|
||||||
from lbrynet.core.call_later_manager import CallLaterManager
|
|
||||||
from error import BUILTIN_EXCEPTIONS, UnknownRemoteException, TimeoutError, TransportNotConnected
|
from error import BUILTIN_EXCEPTIONS, UnknownRemoteException, TimeoutError, TransportNotConnected
|
||||||
|
|
||||||
import constants
|
import constants
|
||||||
|
@ -461,5 +460,5 @@ class KademliaProtocol(protocol.DatagramProtocol):
|
||||||
"""
|
"""
|
||||||
log.info('Stopping DHT')
|
log.info('Stopping DHT')
|
||||||
self._ping_queue.stop()
|
self._ping_queue.stop()
|
||||||
CallLaterManager.stop()
|
self._node.call_later_manager.stop()
|
||||||
log.info('DHT stopped')
|
log.info('DHT stopped')
|
||||||
|
|
Loading…
Add table
Reference in a new issue