Better dht shutdown.

The old code relied on the timing of DelayedCalls
to see what had been called or not, but unfortunately
we don't have a real-time OS so the timing on DelayedCalls
can only be approximate. Changed to explicitly keep
track of which calls had been made.

This simplifies the shutdown logic drastically, and I believe
we can take out the reactor.iterate() now
This commit is contained in:
Job Evers-Meltzer 2016-12-15 23:44:35 -06:00
parent 3dde7af576
commit 8fe15f507b

View file

@ -13,6 +13,7 @@ import time
from twisted.internet import protocol, defer
from twisted.python import failure
from twisted.internet import error
import twisted.internet.reactor
import constants
@ -36,11 +37,31 @@ class TimeoutError(Exception):
self.remote_contact_id = remote_contact_id
class Delay(object):
maxToSendDelay = 10**-3 #0.05
minToSendDelay = 10**-5 #0.01
def __init__(self, start=0):
self._next = start
# TODO: explain why this logic is like it is. And add tests that
# show that it actually does what it needs to do.
def __call__(self):
ts = time.time()
delay = 0
if ts >= self._next:
delay = self.minToSendDelay
self._next = ts + self.minToSendDelay
else:
delay = (self._next - ts) + self.maxToSendDelay
self._next += self.maxToSendDelay
return delay
class KademliaProtocol(protocol.DatagramProtocol):
""" Implements all low-level network-related functions of a Kademlia node """
msgSizeLimit = constants.udpDatagramMaxSize-26
maxToSendDelay = 10**-3#0.05
minToSendDelay = 10**-5#0.01
def __init__(self, node, msgEncoder=encoding.Bencode(),
msgTranslator=msgformat.DefaultFormat()):
@ -50,8 +71,10 @@ class KademliaProtocol(protocol.DatagramProtocol):
self._sentMessages = {}
self._partialMessages = {}
self._partialMessagesProgress = {}
self._next = 0
self._callLaterList = {}
self._delay = Delay()
# keep track of outstanding writes so that they
# can be cancelled on shutdown
self._call_later_list = {}
def sendRPC(self, contact, method, args, rawResponse=False):
""" Sends an RPC to the specified contact
@ -211,29 +234,24 @@ class KademliaProtocol(protocol.DatagramProtocol):
packetData = data[startPos:startPos+self.msgSizeLimit]
encSeqNumber = chr(seqNumber >> 8) + chr(seqNumber & 0xff)
txData = '\x00%s%s%s\x00%s' % (encTotalPackets, encSeqNumber, rpcID, packetData)
self._sendNext(txData, address)
self._scheduleSendNext(txData, address)
startPos += self.msgSizeLimit
seqNumber += 1
else:
self._sendNext(data, address)
self._scheduleSendNext(data, address)
def _sendNext(self, txData, address):
""" Send the next UDP packet """
ts = time.time()
delay = 0
if ts >= self._next:
delay = self.minToSendDelay
self._next = ts + self.minToSendDelay
else:
delay = (self._next-ts) + self.maxToSendDelay
self._next += self.maxToSendDelay
def _scheduleSendNext(self, txData, address):
"""Schedule the sending of the next UDP packet """
delay = self._delay()
key = object()
delayed_call = reactor.callLater(delay, self._write_and_remove, key, txData, address)
self._call_later_list[key] = delayed_call
def _write_and_remove(self, key, txData, address):
del self._call_later_list[key]
if self.transport:
laterCall = reactor.callLater(delay, self.transport.write, txData, address)
for key in self._callLaterList.keys():
if key <= ts:
del self._callLaterList[key]
self._callLaterList[self._next] = laterCall
self.transport.write(txData, address)
def _sendResponse(self, contact, rpcID, response):
""" Send a RPC response to the specified contact
@ -324,16 +342,14 @@ class KademliaProtocol(protocol.DatagramProtocol):
Will only be called once, after all ports are disconnected.
"""
log.info('Stopping dht')
for key in self._callLaterList.keys():
for delayed_call in self._call_later_list.values():
try:
if key > time.time():
log.info('Cancelling %s', self._callLaterList[key])
self._callLaterList[key].cancel()
except Exception, e:
log.exception('Failed to cancel %s', self._callLaterList[key])
del self._callLaterList[key]
delayed_call.cancel()
except (error.AlreadyCalled, error.AlreadyCancelled):
log.debug('Attempted to cancel a DelayedCall that was not active')
except Exception:
log.exception('Failed to cancel a DelayedCall')
# not sure why this is needed, but taking this out sometimes causes
# exceptions.AttributeError: 'Port' object has no attribute 'socket'
# to happen on shutdown
reactor.iterate()
# reactor.iterate()