forked from LBRYCommunity/lbry-sdk
add PingQueue to KademliaProtocol
This commit is contained in:
parent
1adf4f7818
commit
c65274e9e5
1 changed files with 74 additions and 0 deletions
|
@ -1,6 +1,7 @@
|
||||||
import logging
|
import logging
|
||||||
import socket
|
import socket
|
||||||
import errno
|
import errno
|
||||||
|
from collections import deque
|
||||||
|
|
||||||
from twisted.internet import protocol, defer
|
from twisted.internet import protocol, defer
|
||||||
from lbrynet.core.call_later_manager import CallLaterManager
|
from lbrynet.core.call_later_manager import CallLaterManager
|
||||||
|
@ -14,6 +15,76 @@ import msgformat
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class PingQueue(object):
|
||||||
|
"""
|
||||||
|
Schedules a 15 minute delayed ping after a new node sends us a query. This is so the new node gets added to the
|
||||||
|
routing table after having been given enough time for a pinhole to expire.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, node):
|
||||||
|
self._node = node
|
||||||
|
self._get_time = self._node.clock.seconds
|
||||||
|
self._queue = deque()
|
||||||
|
self._enqueued_contacts = {}
|
||||||
|
self._semaphore = defer.DeferredSemaphore(1)
|
||||||
|
self._ping_semaphore = defer.DeferredSemaphore(constants.alpha)
|
||||||
|
self._process_lc = node.get_looping_call(self._semaphore.run, self._process)
|
||||||
|
self._delay = 300
|
||||||
|
|
||||||
|
def _add_contact(self, contact):
|
||||||
|
if contact in self._enqueued_contacts:
|
||||||
|
return defer.succeed(None)
|
||||||
|
self._enqueued_contacts[contact] = self._get_time() + self._delay
|
||||||
|
self._queue.append(contact)
|
||||||
|
return defer.succeed(None)
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def _process(self):
|
||||||
|
if not len(self._queue):
|
||||||
|
defer.returnValue(None)
|
||||||
|
contact = self._queue.popleft()
|
||||||
|
now = self._get_time()
|
||||||
|
|
||||||
|
# if the oldest contact in the queue isn't old enough to be pinged, add it back to the queue and return
|
||||||
|
if now < self._enqueued_contacts[contact]:
|
||||||
|
self._queue.appendleft(contact)
|
||||||
|
defer.returnValue(None)
|
||||||
|
|
||||||
|
def _ping(contact):
|
||||||
|
d = contact.ping()
|
||||||
|
d.addErrback(lambda err: err.trap(TimeoutError))
|
||||||
|
return d
|
||||||
|
|
||||||
|
pinged = []
|
||||||
|
checked = []
|
||||||
|
while now > self._enqueued_contacts[contact]:
|
||||||
|
checked.append(contact)
|
||||||
|
if contact.contact_is_good is None:
|
||||||
|
pinged.append(contact)
|
||||||
|
if not len(self._queue):
|
||||||
|
break
|
||||||
|
contact = self._queue.popleft()
|
||||||
|
if not now > self._enqueued_contacts[contact]:
|
||||||
|
checked.append(contact)
|
||||||
|
# log.info("ping %i/%i peers", len(pinged), len(checked))
|
||||||
|
|
||||||
|
yield defer.DeferredList([self._ping_semaphore.run(_ping, contact) for contact in pinged])
|
||||||
|
|
||||||
|
for contact in checked:
|
||||||
|
if contact in self._enqueued_contacts:
|
||||||
|
del self._enqueued_contacts[contact]
|
||||||
|
|
||||||
|
defer.returnValue(None)
|
||||||
|
|
||||||
|
def start(self):
|
||||||
|
return self._node.safe_start_looping_call(self._process_lc, 60)
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
return self._node.safe_stop_looping_call(self._process_lc)
|
||||||
|
|
||||||
|
def enqueue_maybe_ping(self, contact):
|
||||||
|
return self._semaphore.run(self._add_contact, contact)
|
||||||
|
|
||||||
class KademliaProtocol(protocol.DatagramProtocol):
|
class KademliaProtocol(protocol.DatagramProtocol):
|
||||||
""" Implements all low-level network-related functions of a Kademlia node """
|
""" Implements all low-level network-related functions of a Kademlia node """
|
||||||
|
|
||||||
|
@ -27,6 +98,7 @@ class KademliaProtocol(protocol.DatagramProtocol):
|
||||||
self._partialMessages = {}
|
self._partialMessages = {}
|
||||||
self._partialMessagesProgress = {}
|
self._partialMessagesProgress = {}
|
||||||
self._listening = defer.Deferred(None)
|
self._listening = defer.Deferred(None)
|
||||||
|
self._ping_queue = PingQueue(self._node)
|
||||||
|
|
||||||
def sendRPC(self, contact, method, args, rawResponse=False):
|
def sendRPC(self, contact, method, args, rawResponse=False):
|
||||||
"""
|
"""
|
||||||
|
@ -100,6 +172,7 @@ class KademliaProtocol(protocol.DatagramProtocol):
|
||||||
def startProtocol(self):
|
def startProtocol(self):
|
||||||
log.info("DHT listening on UDP %s:%i", self._node.externalIP, self._node.port)
|
log.info("DHT listening on UDP %s:%i", self._node.externalIP, self._node.port)
|
||||||
self._listening.callback(True)
|
self._listening.callback(True)
|
||||||
|
return self._ping_queue.start()
|
||||||
|
|
||||||
def datagramReceived(self, datagram, address):
|
def datagramReceived(self, datagram, address):
|
||||||
""" Handles and parses incoming RPC messages (and responses)
|
""" Handles and parses incoming RPC messages (and responses)
|
||||||
|
@ -386,5 +459,6 @@ class KademliaProtocol(protocol.DatagramProtocol):
|
||||||
Will only be called once, after all ports are disconnected.
|
Will only be called once, after all ports are disconnected.
|
||||||
"""
|
"""
|
||||||
log.info('Stopping DHT')
|
log.info('Stopping DHT')
|
||||||
|
self._ping_queue.stop()
|
||||||
CallLaterManager.stop()
|
CallLaterManager.stop()
|
||||||
log.info('DHT stopped')
|
log.info('DHT stopped')
|
||||||
|
|
Loading…
Add table
Reference in a new issue