forked from LBRYCommunity/lbry-sdk
remove semaphore from ping queue
This commit is contained in:
parent
2f95c3a9d1
commit
4890fdfb50
2 changed files with 14 additions and 27 deletions
|
@ -640,19 +640,14 @@ class Node(MockKademliaHelper):
|
||||||
replication/republishing as necessary """
|
replication/republishing as necessary """
|
||||||
yield self._refreshRoutingTable()
|
yield self._refreshRoutingTable()
|
||||||
self._dataStore.removeExpiredPeers()
|
self._dataStore.removeExpiredPeers()
|
||||||
yield self._refreshStoringPeers()
|
self._refreshStoringPeers()
|
||||||
defer.returnValue(None)
|
defer.returnValue(None)
|
||||||
|
|
||||||
def _refreshContacts(self):
|
def _refreshContacts(self):
|
||||||
return defer.DeferredList(
|
self._protocol._ping_queue.enqueue_maybe_ping(*self.contacts, delay=0)
|
||||||
[self._protocol._ping_queue.enqueue_maybe_ping(contact, delay=0) for contact in self.contacts]
|
|
||||||
)
|
|
||||||
|
|
||||||
def _refreshStoringPeers(self):
|
def _refreshStoringPeers(self):
|
||||||
storing_contacts = self._dataStore.getStoringContacts()
|
self._protocol._ping_queue.enqueue_maybe_ping(*self._dataStore.getStoringContacts(), delay=0)
|
||||||
return defer.DeferredList(
|
|
||||||
[self._protocol._ping_queue.enqueue_maybe_ping(contact, delay=0) for contact in storing_contacts]
|
|
||||||
)
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _refreshRoutingTable(self):
|
def _refreshRoutingTable(self):
|
||||||
|
|
|
@ -25,29 +25,25 @@ class PingQueue:
|
||||||
self._get_time = self._node.clock.seconds
|
self._get_time = self._node.clock.seconds
|
||||||
self._queue = deque()
|
self._queue = deque()
|
||||||
self._enqueued_contacts = {}
|
self._enqueued_contacts = {}
|
||||||
self._semaphore = defer.DeferredSemaphore(1)
|
self._process_lc = node.get_looping_call(self._process)
|
||||||
self._ping_semaphore = defer.DeferredSemaphore(constants.alpha)
|
|
||||||
self._process_lc = node.get_looping_call(self._semaphore.run, self._process)
|
|
||||||
|
|
||||||
def _add_contact(self, contact, delay=None):
|
def enqueue_maybe_ping(self, *contacts, **kwargs):
|
||||||
if (contact.address, contact.port) in [(c.address, c.port) for c in self._enqueued_contacts]:
|
schedule = self._get_time() + (kwargs.get('delay', constants.checkRefreshInterval))
|
||||||
return defer.succeed(None)
|
for contact in contacts:
|
||||||
delay = delay or constants.checkRefreshInterval
|
if contact not in self._enqueued_contacts:
|
||||||
self._enqueued_contacts[contact] = self._get_time() + delay
|
self._enqueued_contacts[contact] = schedule
|
||||||
self._queue.append(contact)
|
self._queue.append(contact)
|
||||||
return defer.succeed(None)
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
|
||||||
def _process(self):
|
def _process(self):
|
||||||
if not len(self._queue):
|
if not len(self._queue):
|
||||||
defer.returnValue(None)
|
return
|
||||||
contact = self._queue.popleft()
|
contact = self._queue.popleft()
|
||||||
now = self._get_time()
|
now = self._get_time()
|
||||||
|
|
||||||
# if the oldest contact in the queue isn't old enough to be pinged, add it back to the queue and return
|
# if the oldest contact in the queue isn't old enough to be pinged, add it back to the queue and return
|
||||||
if now < self._enqueued_contacts[contact]:
|
if now < self._enqueued_contacts[contact]:
|
||||||
self._queue.appendleft(contact)
|
self._queue.appendleft(contact)
|
||||||
defer.returnValue(None)
|
return
|
||||||
|
|
||||||
pinged = []
|
pinged = []
|
||||||
checked = []
|
checked = []
|
||||||
|
@ -70,15 +66,14 @@ class PingQueue:
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
log.warning("unexpected error: %s", err)
|
log.warning("unexpected error: %s", err)
|
||||||
|
|
||||||
yield defer.DeferredList([_ping(contact) for contact in pinged])
|
d = defer.DeferredList([_ping(contact) for contact in pinged])
|
||||||
|
|
||||||
for contact in checked:
|
for contact in checked:
|
||||||
if contact in self._enqueued_contacts and contact in pinged:
|
if contact in self._enqueued_contacts and contact in pinged:
|
||||||
del self._enqueued_contacts[contact]
|
del self._enqueued_contacts[contact]
|
||||||
elif contact not in self._queue:
|
elif contact not in self._queue:
|
||||||
self._queue.appendleft(contact)
|
self._queue.appendleft(contact)
|
||||||
|
return d
|
||||||
defer.returnValue(None)
|
|
||||||
|
|
||||||
def start(self):
|
def start(self):
|
||||||
return self._node.safe_start_looping_call(self._process_lc, 60)
|
return self._node.safe_start_looping_call(self._process_lc, 60)
|
||||||
|
@ -86,9 +81,6 @@ class PingQueue:
|
||||||
def stop(self):
|
def stop(self):
|
||||||
return self._node.safe_stop_looping_call(self._process_lc)
|
return self._node.safe_stop_looping_call(self._process_lc)
|
||||||
|
|
||||||
def enqueue_maybe_ping(self, contact, delay=None):
|
|
||||||
return self._semaphore.run(self._add_contact, contact, delay)
|
|
||||||
|
|
||||||
|
|
||||||
class KademliaProtocol(protocol.DatagramProtocol):
|
class KademliaProtocol(protocol.DatagramProtocol):
|
||||||
""" Implements all low-level network-related functions of a Kademlia node """
|
""" Implements all low-level network-related functions of a Kademlia node """
|
||||||
|
|
Loading…
Add table
Reference in a new issue