forked from LBRYCommunity/lbry-sdk
Merge pull request #345 from lbryio/better-dht-shutdown
Better dht shutdown.
This commit is contained in:
commit
aa3353ae49
1 changed files with 46 additions and 30 deletions
|
@ -13,6 +13,7 @@ import time
|
||||||
|
|
||||||
from twisted.internet import protocol, defer
|
from twisted.internet import protocol, defer
|
||||||
from twisted.python import failure
|
from twisted.python import failure
|
||||||
|
from twisted.internet import error
|
||||||
import twisted.internet.reactor
|
import twisted.internet.reactor
|
||||||
|
|
||||||
import constants
|
import constants
|
||||||
|
@ -36,11 +37,31 @@ class TimeoutError(Exception):
|
||||||
self.remote_contact_id = remote_contact_id
|
self.remote_contact_id = remote_contact_id
|
||||||
|
|
||||||
|
|
||||||
|
class Delay(object):
|
||||||
|
maxToSendDelay = 10**-3 #0.05
|
||||||
|
minToSendDelay = 10**-5 #0.01
|
||||||
|
|
||||||
|
def __init__(self, start=0):
|
||||||
|
self._next = start
|
||||||
|
|
||||||
|
# TODO: explain why this logic is like it is. And add tests that
|
||||||
|
# show that it actually does what it needs to do.
|
||||||
|
def __call__(self):
|
||||||
|
ts = time.time()
|
||||||
|
delay = 0
|
||||||
|
if ts >= self._next:
|
||||||
|
delay = self.minToSendDelay
|
||||||
|
self._next = ts + self.minToSendDelay
|
||||||
|
else:
|
||||||
|
delay = (self._next - ts) + self.maxToSendDelay
|
||||||
|
self._next += self.maxToSendDelay
|
||||||
|
return delay
|
||||||
|
|
||||||
|
|
||||||
class KademliaProtocol(protocol.DatagramProtocol):
|
class KademliaProtocol(protocol.DatagramProtocol):
|
||||||
""" Implements all low-level network-related functions of a Kademlia node """
|
""" Implements all low-level network-related functions of a Kademlia node """
|
||||||
msgSizeLimit = constants.udpDatagramMaxSize-26
|
msgSizeLimit = constants.udpDatagramMaxSize-26
|
||||||
maxToSendDelay = 10**-3#0.05
|
|
||||||
minToSendDelay = 10**-5#0.01
|
|
||||||
|
|
||||||
def __init__(self, node, msgEncoder=encoding.Bencode(),
|
def __init__(self, node, msgEncoder=encoding.Bencode(),
|
||||||
msgTranslator=msgformat.DefaultFormat()):
|
msgTranslator=msgformat.DefaultFormat()):
|
||||||
|
@ -50,8 +71,10 @@ class KademliaProtocol(protocol.DatagramProtocol):
|
||||||
self._sentMessages = {}
|
self._sentMessages = {}
|
||||||
self._partialMessages = {}
|
self._partialMessages = {}
|
||||||
self._partialMessagesProgress = {}
|
self._partialMessagesProgress = {}
|
||||||
self._next = 0
|
self._delay = Delay()
|
||||||
self._callLaterList = {}
|
# keep track of outstanding writes so that they
|
||||||
|
# can be cancelled on shutdown
|
||||||
|
self._call_later_list = {}
|
||||||
|
|
||||||
def sendRPC(self, contact, method, args, rawResponse=False):
|
def sendRPC(self, contact, method, args, rawResponse=False):
|
||||||
""" Sends an RPC to the specified contact
|
""" Sends an RPC to the specified contact
|
||||||
|
@ -211,29 +234,24 @@ class KademliaProtocol(protocol.DatagramProtocol):
|
||||||
packetData = data[startPos:startPos+self.msgSizeLimit]
|
packetData = data[startPos:startPos+self.msgSizeLimit]
|
||||||
encSeqNumber = chr(seqNumber >> 8) + chr(seqNumber & 0xff)
|
encSeqNumber = chr(seqNumber >> 8) + chr(seqNumber & 0xff)
|
||||||
txData = '\x00%s%s%s\x00%s' % (encTotalPackets, encSeqNumber, rpcID, packetData)
|
txData = '\x00%s%s%s\x00%s' % (encTotalPackets, encSeqNumber, rpcID, packetData)
|
||||||
self._sendNext(txData, address)
|
self._scheduleSendNext(txData, address)
|
||||||
|
|
||||||
startPos += self.msgSizeLimit
|
startPos += self.msgSizeLimit
|
||||||
seqNumber += 1
|
seqNumber += 1
|
||||||
else:
|
else:
|
||||||
self._sendNext(data, address)
|
self._scheduleSendNext(data, address)
|
||||||
|
|
||||||
def _sendNext(self, txData, address):
|
def _scheduleSendNext(self, txData, address):
|
||||||
""" Send the next UDP packet """
|
"""Schedule the sending of the next UDP packet """
|
||||||
ts = time.time()
|
delay = self._delay()
|
||||||
delay = 0
|
key = object()
|
||||||
if ts >= self._next:
|
delayed_call = reactor.callLater(delay, self._write_and_remove, key, txData, address)
|
||||||
delay = self.minToSendDelay
|
self._call_later_list[key] = delayed_call
|
||||||
self._next = ts + self.minToSendDelay
|
|
||||||
else:
|
def _write_and_remove(self, key, txData, address):
|
||||||
delay = (self._next-ts) + self.maxToSendDelay
|
del self._call_later_list[key]
|
||||||
self._next += self.maxToSendDelay
|
|
||||||
if self.transport:
|
if self.transport:
|
||||||
laterCall = reactor.callLater(delay, self.transport.write, txData, address)
|
self.transport.write(txData, address)
|
||||||
for key in self._callLaterList.keys():
|
|
||||||
if key <= ts:
|
|
||||||
del self._callLaterList[key]
|
|
||||||
self._callLaterList[self._next] = laterCall
|
|
||||||
|
|
||||||
def _sendResponse(self, contact, rpcID, response):
|
def _sendResponse(self, contact, rpcID, response):
|
||||||
""" Send a RPC response to the specified contact
|
""" Send a RPC response to the specified contact
|
||||||
|
@ -324,16 +342,14 @@ class KademliaProtocol(protocol.DatagramProtocol):
|
||||||
Will only be called once, after all ports are disconnected.
|
Will only be called once, after all ports are disconnected.
|
||||||
"""
|
"""
|
||||||
log.info('Stopping dht')
|
log.info('Stopping dht')
|
||||||
for key in self._callLaterList.keys():
|
for delayed_call in self._call_later_list.values():
|
||||||
try:
|
try:
|
||||||
if key > time.time():
|
delayed_call.cancel()
|
||||||
log.info('Cancelling %s', self._callLaterList[key])
|
except (error.AlreadyCalled, error.AlreadyCancelled):
|
||||||
self._callLaterList[key].cancel()
|
log.debug('Attempted to cancel a DelayedCall that was not active')
|
||||||
except Exception, e:
|
except Exception:
|
||||||
log.exception('Failed to cancel %s', self._callLaterList[key])
|
log.exception('Failed to cancel a DelayedCall')
|
||||||
del self._callLaterList[key]
|
|
||||||
|
|
||||||
# not sure why this is needed, but taking this out sometimes causes
|
# not sure why this is needed, but taking this out sometimes causes
|
||||||
# exceptions.AttributeError: 'Port' object has no attribute 'socket'
|
# exceptions.AttributeError: 'Port' object has no attribute 'socket'
|
||||||
# to happen on shutdown
|
# to happen on shutdown
|
||||||
reactor.iterate()
|
# reactor.iterate()
|
||||||
|
|
Loading…
Add table
Reference in a new issue