forked from LBRYCommunity/lbry-sdk
add CallLaterManager
This commit is contained in:
parent
88970cb0a8
commit
5628d0825b
5 changed files with 80 additions and 31 deletions
63
lbrynet/core/call_later_manager.py
Normal file
63
lbrynet/core/call_later_manager.py
Normal file
|
@ -0,0 +1,63 @@
|
||||||
|
class CallLaterManager(object):
|
||||||
|
_callLater = None
|
||||||
|
_pendingCallLaters = []
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def _cancel(cls, call_later):
|
||||||
|
"""
|
||||||
|
:param call_later: DelayedCall
|
||||||
|
:return: (callable) canceller function
|
||||||
|
"""
|
||||||
|
|
||||||
|
def cancel(reason=None):
|
||||||
|
"""
|
||||||
|
:param reason: reason for cancellation, this is returned after cancelling the DelayedCall
|
||||||
|
:return: reason
|
||||||
|
"""
|
||||||
|
|
||||||
|
if call_later.active():
|
||||||
|
call_later.cancel()
|
||||||
|
cls._pendingCallLaters.remove(call_later)
|
||||||
|
return reason
|
||||||
|
return cancel
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def stop(cls):
|
||||||
|
"""
|
||||||
|
Cancel any callLaters that are still running
|
||||||
|
"""
|
||||||
|
|
||||||
|
from twisted.internet import defer
|
||||||
|
while cls._pendingCallLaters:
|
||||||
|
canceller = cls._cancel(cls._pendingCallLaters[0])
|
||||||
|
try:
|
||||||
|
canceller()
|
||||||
|
except (defer.CancelledError, defer.AlreadyCalledError):
|
||||||
|
pass
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def call_later(cls, when, what, *args, **kwargs):
|
||||||
|
"""
|
||||||
|
Schedule a call later and get a canceller callback function
|
||||||
|
|
||||||
|
:param when: (float) delay in seconds
|
||||||
|
:param what: (callable)
|
||||||
|
:param args: (*tuple) args to be passed to the callable
|
||||||
|
:param kwargs: (**dict) kwargs to be passed to the callable
|
||||||
|
|
||||||
|
:return: (tuple) twisted.internet.base.DelayedCall object, canceller function
|
||||||
|
"""
|
||||||
|
|
||||||
|
call_later = cls._callLater(when, what, *args, **kwargs)
|
||||||
|
canceller = cls._cancel(call_later)
|
||||||
|
cls._pendingCallLaters.append(call_later)
|
||||||
|
return call_later, canceller
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def setup(cls, callLater):
|
||||||
|
"""
|
||||||
|
Setup the callLater function to use, supports the real reactor as well as task.Clock
|
||||||
|
|
||||||
|
:param callLater: (IReactorTime.callLater)
|
||||||
|
"""
|
||||||
|
cls._callLater = callLater
|
|
@ -1,7 +1,6 @@
|
||||||
import binascii
|
import binascii
|
||||||
import collections
|
import collections
|
||||||
import logging
|
import logging
|
||||||
import time
|
|
||||||
import datetime
|
import datetime
|
||||||
|
|
||||||
from twisted.internet import defer, task
|
from twisted.internet import defer, task
|
||||||
|
|
|
@ -15,6 +15,7 @@ import logging
|
||||||
from twisted.internet import defer, error, task
|
from twisted.internet import defer, error, task
|
||||||
|
|
||||||
from lbrynet.core.utils import generate_id
|
from lbrynet.core.utils import generate_id
|
||||||
|
from lbrynet.core.call_later_manager import CallLaterManager
|
||||||
from lbrynet.core.PeerManager import PeerManager
|
from lbrynet.core.PeerManager import PeerManager
|
||||||
|
|
||||||
import constants
|
import constants
|
||||||
|
@ -89,10 +90,11 @@ class Node(object):
|
||||||
resolve = resolve or reactor.resolve
|
resolve = resolve or reactor.resolve
|
||||||
callLater = callLater or reactor.callLater
|
callLater = callLater or reactor.callLater
|
||||||
clock = clock or reactor
|
clock = clock or reactor
|
||||||
|
self.clock = clock
|
||||||
|
CallLaterManager.setup(callLater)
|
||||||
self.reactor_resolve = resolve
|
self.reactor_resolve = resolve
|
||||||
self.reactor_listenUDP = listenUDP
|
self.reactor_listenUDP = listenUDP
|
||||||
self.reactor_callLater = callLater
|
self.reactor_callLater = CallLaterManager.call_later
|
||||||
self.clock = clock
|
|
||||||
self.node_id = node_id or self._generateID()
|
self.node_id = node_id or self._generateID()
|
||||||
self.port = udpPort
|
self.port = udpPort
|
||||||
self._listeningPort = None # object implementing Twisted
|
self._listeningPort = None # object implementing Twisted
|
||||||
|
@ -856,7 +858,7 @@ class _IterativeFindHelper(object):
|
||||||
if self._should_lookup_active_calls():
|
if self._should_lookup_active_calls():
|
||||||
# Schedule the next iteration if there are any active
|
# Schedule the next iteration if there are any active
|
||||||
# calls (Kademlia uses loose parallelism)
|
# calls (Kademlia uses loose parallelism)
|
||||||
call = self.node.reactor_callLater(constants.iterativeLookupDelay, self.searchIteration)
|
call, _ = self.node.reactor_callLater(constants.iterativeLookupDelay, self.searchIteration)
|
||||||
self.pending_iteration_calls.append(call)
|
self.pending_iteration_calls.append(call)
|
||||||
# Check for a quick contact response that made an update to the shortList
|
# Check for a quick contact response that made an update to the shortList
|
||||||
elif prevShortlistLength < len(self.shortlist):
|
elif prevShortlistLength < len(self.shortlist):
|
||||||
|
|
|
@ -3,7 +3,8 @@ import time
|
||||||
import socket
|
import socket
|
||||||
import errno
|
import errno
|
||||||
|
|
||||||
from twisted.internet import protocol, defer, error, task
|
from twisted.internet import protocol, defer, task
|
||||||
|
from lbrynet.core.call_later_manager import CallLaterManager
|
||||||
|
|
||||||
import constants
|
import constants
|
||||||
import encoding
|
import encoding
|
||||||
|
@ -169,17 +170,11 @@ class KademliaProtocol(protocol.DatagramProtocol):
|
||||||
df._rpcRawResponse = True
|
df._rpcRawResponse = True
|
||||||
|
|
||||||
# Set the RPC timeout timer
|
# Set the RPC timeout timer
|
||||||
timeoutCall = self._node.reactor_callLater(constants.rpcTimeout, self._msgTimeout, msg.id)
|
timeoutCall, cancelTimeout = self._node.reactor_callLater(constants.rpcTimeout, self._msgTimeout, msg.id)
|
||||||
# Transmit the data
|
# Transmit the data
|
||||||
self._send(encodedMsg, msg.id, (contact.address, contact.port))
|
self._send(encodedMsg, msg.id, (contact.address, contact.port))
|
||||||
self._sentMessages[msg.id] = (contact.id, df, timeoutCall, method, args)
|
self._sentMessages[msg.id] = (contact.id, df, timeoutCall, method, args)
|
||||||
|
df.addErrback(cancelTimeout)
|
||||||
def cancel(err):
|
|
||||||
if timeoutCall.cancelled or timeoutCall.called:
|
|
||||||
return err
|
|
||||||
timeoutCall.cancel()
|
|
||||||
|
|
||||||
df.addErrback(cancel)
|
|
||||||
return df
|
return df
|
||||||
|
|
||||||
def startProtocol(self):
|
def startProtocol(self):
|
||||||
|
@ -340,12 +335,9 @@ class KademliaProtocol(protocol.DatagramProtocol):
|
||||||
"""Schedule the sending of the next UDP packet """
|
"""Schedule the sending of the next UDP packet """
|
||||||
delay = self._delay()
|
delay = self._delay()
|
||||||
key = object()
|
key = object()
|
||||||
delayed_call = self._node.reactor_callLater(delay, self._write_and_remove, key, txData, address)
|
delayed_call, _ = self._node.reactor_callLater(delay, self._write_and_remove, key, txData, address)
|
||||||
self._call_later_list[key] = delayed_call
|
|
||||||
|
|
||||||
def _write_and_remove(self, key, txData, address):
|
def _write_and_remove(self, key, txData, address):
|
||||||
if key in self._call_later_list:
|
|
||||||
del self._call_later_list[key]
|
|
||||||
if self.transport:
|
if self.transport:
|
||||||
try:
|
try:
|
||||||
self.transport.write(txData, address)
|
self.transport.write(txData, address)
|
||||||
|
@ -440,7 +432,7 @@ class KademliaProtocol(protocol.DatagramProtocol):
|
||||||
# See if any progress has been made; if not, kill the message
|
# See if any progress has been made; if not, kill the message
|
||||||
if self._hasProgressBeenMade(messageID):
|
if self._hasProgressBeenMade(messageID):
|
||||||
# Reset the RPC timeout timer
|
# Reset the RPC timeout timer
|
||||||
timeoutCall = self._node.reactor_callLater(constants.rpcTimeout, self._msgTimeout, messageID)
|
timeoutCall, _ = self._node.reactor_callLater(constants.rpcTimeout, self._msgTimeout, messageID)
|
||||||
self._sentMessages[messageID] = (remoteContactID, df, timeoutCall, method, args)
|
self._sentMessages[messageID] = (remoteContactID, df, timeoutCall, method, args)
|
||||||
else:
|
else:
|
||||||
# No progress has been made
|
# No progress has been made
|
||||||
|
@ -469,15 +461,6 @@ class KademliaProtocol(protocol.DatagramProtocol):
|
||||||
if self._bandwidth_stats_update_lc.running:
|
if self._bandwidth_stats_update_lc.running:
|
||||||
self._bandwidth_stats_update_lc.stop()
|
self._bandwidth_stats_update_lc.stop()
|
||||||
|
|
||||||
for delayed_call in self._call_later_list.values():
|
CallLaterManager.stop()
|
||||||
try:
|
|
||||||
delayed_call.cancel()
|
|
||||||
except (error.AlreadyCalled, error.AlreadyCancelled):
|
|
||||||
log.debug('Attempted to cancel a DelayedCall that was not active')
|
|
||||||
except Exception:
|
|
||||||
log.exception('Failed to cancel a DelayedCall')
|
|
||||||
# not sure why this is needed, but taking this out sometimes causes
|
|
||||||
# exceptions.AttributeError: 'Port' object has no attribute 'socket'
|
|
||||||
# to happen on shutdown
|
|
||||||
# reactor.iterate()
|
|
||||||
log.info('DHT stopped')
|
log.info('DHT stopped')
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
import time
|
import time
|
||||||
import unittest
|
import unittest
|
||||||
from twisted.internet.task import Clock
|
from twisted.internet.task import Clock
|
||||||
from twisted.internet import defer
|
from twisted.internet import defer, threads
|
||||||
import lbrynet.dht.protocol
|
import lbrynet.dht.protocol
|
||||||
import lbrynet.dht.contact
|
import lbrynet.dht.contact
|
||||||
import lbrynet.dht.constants
|
import lbrynet.dht.constants
|
||||||
|
@ -9,6 +9,7 @@ import lbrynet.dht.msgtypes
|
||||||
from lbrynet.dht.error import TimeoutError
|
from lbrynet.dht.error import TimeoutError
|
||||||
from lbrynet.dht.node import Node, rpcmethod
|
from lbrynet.dht.node import Node, rpcmethod
|
||||||
from lbrynet.tests.mocks import listenUDP, resolve
|
from lbrynet.tests.mocks import listenUDP, resolve
|
||||||
|
from lbrynet.core.call_later_manager import CallLaterManager
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
|
@ -22,10 +23,12 @@ class KademliaProtocolTest(unittest.TestCase):
|
||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
self._reactor = Clock()
|
self._reactor = Clock()
|
||||||
|
CallLaterManager.setup(self._reactor.callLater)
|
||||||
self.node = Node(node_id='1' * 48, udpPort=self.udpPort, externalIP="127.0.0.1", listenUDP=listenUDP,
|
self.node = Node(node_id='1' * 48, udpPort=self.udpPort, externalIP="127.0.0.1", listenUDP=listenUDP,
|
||||||
resolve=resolve, clock=self._reactor, callLater=self._reactor.callLater)
|
resolve=resolve, clock=self._reactor, callLater=self._reactor.callLater)
|
||||||
|
|
||||||
def tearDown(self):
|
def tearDown(self):
|
||||||
|
CallLaterManager.stop()
|
||||||
del self._reactor
|
del self._reactor
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
|
@ -40,7 +43,6 @@ class KademliaProtocolTest(unittest.TestCase):
|
||||||
|
|
||||||
def testRPCTimeout(self):
|
def testRPCTimeout(self):
|
||||||
""" Tests if a RPC message sent to a dead remote node times out correctly """
|
""" Tests if a RPC message sent to a dead remote node times out correctly """
|
||||||
|
|
||||||
dead_node = Node(node_id='2' * 48, udpPort=self.udpPort, externalIP="127.0.0.2", listenUDP=listenUDP,
|
dead_node = Node(node_id='2' * 48, udpPort=self.udpPort, externalIP="127.0.0.2", listenUDP=listenUDP,
|
||||||
resolve=resolve, clock=self._reactor, callLater=self._reactor.callLater)
|
resolve=resolve, clock=self._reactor, callLater=self._reactor.callLater)
|
||||||
dead_node.start_listening()
|
dead_node.start_listening()
|
||||||
|
|
Loading…
Reference in a new issue