convert node manage function to a looping call

This commit is contained in:
Jack Robison 2017-10-23 13:13:06 -04:00
parent ecbe4113ce
commit 67ef8be7b7
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2

View file

@ -11,8 +11,7 @@ import hashlib
import operator import operator
import struct import struct
import time import time
from twisted.internet import defer, error, reactor, task
from twisted.internet import defer, error, reactor, threads, task
import constants import constants
import routingtable import routingtable
@ -86,8 +85,8 @@ class Node(object):
# network (add callbacks to this deferred if scheduling such # network (add callbacks to this deferred if scheduling such
# operations before the node has finished joining the network) # operations before the node has finished joining the network)
self._joinDeferred = None self._joinDeferred = None
self.next_refresh_call = None
self.change_token_lc = task.LoopingCall(self.change_token) self.change_token_lc = task.LoopingCall(self.change_token)
self.refresh_node_lc = task.LoopingCall(self._refreshNode)
# Create k-buckets (for storing contacts) # Create k-buckets (for storing contacts)
if routingTableClass is None: if routingTableClass is None:
self._routingTable = routingtable.OptimizedTreeRoutingTable(self.node_id) self._routingTable = routingtable.OptimizedTreeRoutingTable(self.node_id)
@ -123,10 +122,9 @@ class Node(object):
self._listeningPort.stopListening() self._listeningPort.stopListening()
def stop(self): def stop(self):
# cancel callLaters: # stop LoopingCalls:
if self.next_refresh_call is not None: if self.refresh_node_lc.running:
self.next_refresh_call.cancel() self.refresh_node_lc.stop()
self.next_refresh_call = None
if self.change_token_lc.running: if self.change_token_lc.running:
self.change_token_lc.stop() self.change_token_lc.stop()
if self._listeningPort is not None: if self._listeningPort is not None:
@ -182,9 +180,11 @@ class Node(object):
# Initiate the Kademlia joining sequence - perform a search for this node's own ID # Initiate the Kademlia joining sequence - perform a search for this node's own ID
self._joinDeferred = self._iterativeFind(self.node_id, bootstrapContacts) self._joinDeferred = self._iterativeFind(self.node_id, bootstrapContacts)
# #TODO: Refresh all k-buckets further away than this node's closest neighbour
result = yield self._joinDeferred # Start refreshing k-buckets periodically, if necessary
defer.returnValue(result) self.hash_watcher.tick()
yield self._joinDeferred
self.refresh_node_lc.start(constants.checkRefreshInterval)
@property @property
def contacts(self): def contacts(self):
@ -629,13 +629,14 @@ class Node(object):
result = yield outerDf result = yield outerDf
defer.returnValue(result) defer.returnValue(result)
@defer.inlineCallbacks
def _refreshNode(self): def _refreshNode(self):
""" Periodically called to perform k-bucket refreshes and data """ Periodically called to perform k-bucket refreshes and data
replication/republishing as necessary """ replication/republishing as necessary """
df = self._refreshRoutingTable() yield self._refreshRoutingTable()
df.addCallback(self._removeExpiredPeers) self._dataStore.removeExpiredPeers()
df.addCallback(self._scheduleNextNodeRefresh) defer.returnValue(None)
def _refreshRoutingTable(self): def _refreshRoutingTable(self):
nodeIDs = self._routingTable.getRefreshList(0, False) nodeIDs = self._routingTable.getRefreshList(0, False)
@ -654,9 +655,6 @@ class Node(object):
searchForNextNodeID() searchForNextNodeID()
return outerDf return outerDf
def _scheduleNextNodeRefresh(self, *args):
self.next_refresh_call = reactor.callLater(constants.checkRefreshInterval,
self._refreshNode)
# args put here because _refreshRoutingTable does outerDF.callback(None) # args put here because _refreshRoutingTable does outerDF.callback(None)
def _removeExpiredPeers(self, *args): def _removeExpiredPeers(self, *args):