forked from LBRYCommunity/lbry-sdk
Merge branch 'fix-announce-lockup'
This commit is contained in:
commit
b4b1f1a2c1
4 changed files with 30 additions and 4 deletions
|
@ -1,6 +1,27 @@
|
|||
import logging
|
||||
|
||||
log = logging.getLogger()
|
||||
|
||||
MIN_DELAY = 0.0
|
||||
MAX_DELAY = 0.01
|
||||
DELAY_INCREMENT = 0.0001
|
||||
QUEUE_SIZE_THRESHOLD = 100
|
||||
|
||||
|
||||
class CallLaterManager(object):
|
||||
_callLater = None
|
||||
_pendingCallLaters = []
|
||||
_delay = MIN_DELAY
|
||||
|
||||
@classmethod
|
||||
def get_min_delay(cls):
|
||||
cls._pendingCallLaters = [cl for cl in cls._pendingCallLaters if cl.active()]
|
||||
queue_size = len(cls._pendingCallLaters)
|
||||
if queue_size > QUEUE_SIZE_THRESHOLD:
|
||||
cls._delay = min((cls._delay + DELAY_INCREMENT), MAX_DELAY)
|
||||
else:
|
||||
cls._delay = max((cls._delay - 2.0 * DELAY_INCREMENT), MIN_DELAY)
|
||||
return cls._delay
|
||||
|
||||
@classmethod
|
||||
def _cancel(cls, call_later):
|
||||
|
@ -53,6 +74,11 @@ class CallLaterManager(object):
|
|||
cls._pendingCallLaters.append(call_later)
|
||||
return call_later, canceller
|
||||
|
||||
@classmethod
|
||||
def call_soon(cls, what, *args, **kwargs):
|
||||
delay = cls.get_min_delay()
|
||||
return cls.call_later(delay, what, *args, **kwargs)
|
||||
|
||||
@classmethod
|
||||
def setup(cls, callLater):
|
||||
"""
|
||||
|
|
|
@ -227,7 +227,6 @@ class Daemon(AuthJSONRPCServer):
|
|||
@defer.inlineCallbacks
|
||||
def setup(self):
|
||||
reactor.addSystemEventTrigger('before', 'shutdown', self._shutdown)
|
||||
|
||||
configure_loggly_handler()
|
||||
|
||||
log.info("Starting lbrynet-daemon")
|
||||
|
@ -412,7 +411,6 @@ class Daemon(AuthJSONRPCServer):
|
|||
log.info("Status at time of shutdown: " + self.startup_status[0])
|
||||
|
||||
self._stop_streams()
|
||||
|
||||
self.looping_call_manager.shutdown()
|
||||
if self.analytics_manager:
|
||||
self.analytics_manager.shutdown()
|
||||
|
|
|
@ -94,6 +94,7 @@ class Node(object):
|
|||
self.reactor_resolve = resolve
|
||||
self.reactor_listenUDP = listenUDP
|
||||
self.reactor_callLater = CallLaterManager.call_later
|
||||
self.reactor_callSoon = CallLaterManager.call_soon
|
||||
self.node_id = node_id or self._generateID()
|
||||
self.port = udpPort
|
||||
self._listeningPort = None # object implementing Twisted
|
||||
|
|
|
@ -108,7 +108,8 @@ class KademliaProtocol(protocol.DatagramProtocol):
|
|||
message = self._translator.fromPrimitive(msgPrimitive)
|
||||
except (encoding.DecodeError, ValueError) as err:
|
||||
# We received some rubbish here
|
||||
log.exception("Error decoding datagram from %s:%i - %s", address[0], address[1], err)
|
||||
log.warning("Error decoding datagram %s from %s:%i - %s", datagram.encode('hex'),
|
||||
address[0], address[1], err)
|
||||
return
|
||||
except (IndexError, KeyError):
|
||||
log.warning("Couldn't decode dht datagram from %s", address)
|
||||
|
@ -206,7 +207,7 @@ class KademliaProtocol(protocol.DatagramProtocol):
|
|||
|
||||
def _scheduleSendNext(self, txData, address):
|
||||
"""Schedule the sending of the next UDP packet """
|
||||
delayed_call, _ = self._node.reactor_callLater(0, self._write, txData, address)
|
||||
delayed_call, _ = self._node.reactor_callSoon(self._write, txData, address)
|
||||
|
||||
def _write(self, txData, address):
|
||||
if self.transport:
|
||||
|
|
Loading…
Reference in a new issue