diff --git a/lbrynet/dht/protocol.py b/lbrynet/dht/protocol.py index 3530f78f6..1939ead2e 100644 --- a/lbrynet/dht/protocol.py +++ b/lbrynet/dht/protocol.py @@ -13,6 +13,7 @@ import time from twisted.internet import protocol, defer from twisted.python import failure +from twisted.internet import error import twisted.internet.reactor import constants @@ -36,11 +37,31 @@ class TimeoutError(Exception): self.remote_contact_id = remote_contact_id +class Delay(object): + maxToSendDelay = 10**-3 #0.05 + minToSendDelay = 10**-5 #0.01 + + def __init__(self, start=0): + self._next = start + + # TODO: explain why this logic is like it is. And add tests that + # show that it actually does what it needs to do. + def __call__(self): + ts = time.time() + delay = 0 + if ts >= self._next: + delay = self.minToSendDelay + self._next = ts + self.minToSendDelay + else: + delay = (self._next - ts) + self.maxToSendDelay + self._next += self.maxToSendDelay + return delay + + class KademliaProtocol(protocol.DatagramProtocol): """ Implements all low-level network-related functions of a Kademlia node """ msgSizeLimit = constants.udpDatagramMaxSize-26 - maxToSendDelay = 10**-3#0.05 - minToSendDelay = 10**-5#0.01 + def __init__(self, node, msgEncoder=encoding.Bencode(), msgTranslator=msgformat.DefaultFormat()): @@ -50,8 +71,10 @@ class KademliaProtocol(protocol.DatagramProtocol): self._sentMessages = {} self._partialMessages = {} self._partialMessagesProgress = {} - self._next = 0 - self._callLaterList = {} + self._delay = Delay() + # keep track of outstanding writes so that they + # can be cancelled on shutdown + self._call_later_list = {} def sendRPC(self, contact, method, args, rawResponse=False): """ Sends an RPC to the specified contact @@ -211,29 +234,24 @@ class KademliaProtocol(protocol.DatagramProtocol): packetData = data[startPos:startPos+self.msgSizeLimit] encSeqNumber = chr(seqNumber >> 8) + chr(seqNumber & 0xff) txData = '\x00%s%s%s\x00%s' % (encTotalPackets, encSeqNumber, rpcID, packetData) - self._sendNext(txData, address) + self._scheduleSendNext(txData, address) startPos += self.msgSizeLimit seqNumber += 1 else: - self._sendNext(data, address) + self._scheduleSendNext(data, address) - def _sendNext(self, txData, address): - """ Send the next UDP packet """ - ts = time.time() - delay = 0 - if ts >= self._next: - delay = self.minToSendDelay - self._next = ts + self.minToSendDelay - else: - delay = (self._next-ts) + self.maxToSendDelay - self._next += self.maxToSendDelay + def _scheduleSendNext(self, txData, address): + """Schedule the sending of the next UDP packet """ + delay = self._delay() + key = object() + delayed_call = reactor.callLater(delay, self._write_and_remove, key, txData, address) + self._call_later_list[key] = delayed_call + + def _write_and_remove(self, key, txData, address): + del self._call_later_list[key] if self.transport: - laterCall = reactor.callLater(delay, self.transport.write, txData, address) - for key in self._callLaterList.keys(): - if key <= ts: - del self._callLaterList[key] - self._callLaterList[self._next] = laterCall + self.transport.write(txData, address) def _sendResponse(self, contact, rpcID, response): """ Send a RPC response to the specified contact @@ -324,16 +342,14 @@ class KademliaProtocol(protocol.DatagramProtocol): Will only be called once, after all ports are disconnected. """ log.info('Stopping dht') - for key in self._callLaterList.keys(): + for delayed_call in self._call_later_list.values(): try: - if key > time.time(): - log.info('Cancelling %s', self._callLaterList[key]) - self._callLaterList[key].cancel() - except Exception, e: - log.exception('Failed to cancel %s', self._callLaterList[key]) - del self._callLaterList[key] - + delayed_call.cancel() + except (error.AlreadyCalled, error.AlreadyCancelled): + log.debug('Attempted to cancel a DelayedCall that was not active') + except Exception: + log.exception('Failed to cancel a DelayedCall') # not sure why this is needed, but taking this out sometimes causes # exceptions.AttributeError: 'Port' object has no attribute 'socket' # to happen on shutdown - reactor.iterate() + # reactor.iterate()