diff --git a/lbrynet/dht/node.py b/lbrynet/dht/node.py index cf0cc8e37..a4f2acf3c 100644 --- a/lbrynet/dht/node.py +++ b/lbrynet/dht/node.py @@ -11,8 +11,7 @@ import hashlib import operator import struct import time - -from twisted.internet import defer, error, reactor, threads, task +from twisted.internet import defer, error, reactor, task import constants import routingtable @@ -86,8 +85,8 @@ class Node(object): # network (add callbacks to this deferred if scheduling such # operations before the node has finished joining the network) self._joinDeferred = None - self.next_refresh_call = None self.change_token_lc = task.LoopingCall(self.change_token) + self.refresh_node_lc = task.LoopingCall(self._refreshNode) # Create k-buckets (for storing contacts) if routingTableClass is None: self._routingTable = routingtable.OptimizedTreeRoutingTable(self.node_id) @@ -123,10 +122,9 @@ class Node(object): self._listeningPort.stopListening() def stop(self): - # cancel callLaters: - if self.next_refresh_call is not None: - self.next_refresh_call.cancel() - self.next_refresh_call = None + # stop LoopingCalls: + if self.refresh_node_lc.running: + self.refresh_node_lc.stop() if self.change_token_lc.running: self.change_token_lc.stop() if self._listeningPort is not None: @@ -182,9 +180,11 @@ class Node(object): # Initiate the Kademlia joining sequence - perform a search for this node's own ID self._joinDeferred = self._iterativeFind(self.node_id, bootstrapContacts) - - result = yield self._joinDeferred - defer.returnValue(result) + # #TODO: Refresh all k-buckets further away than this node's closest neighbour + # Start refreshing k-buckets periodically, if necessary + self.hash_watcher.tick() + yield self._joinDeferred + self.refresh_node_lc.start(constants.checkRefreshInterval) @property def contacts(self): @@ -629,13 +629,14 @@ class Node(object): result = yield outerDf defer.returnValue(result) + @defer.inlineCallbacks def _refreshNode(self): """ Periodically called to perform k-bucket refreshes and data replication/republishing as necessary """ - df = self._refreshRoutingTable() - df.addCallback(self._removeExpiredPeers) - df.addCallback(self._scheduleNextNodeRefresh) + yield self._refreshRoutingTable() + self._dataStore.removeExpiredPeers() + defer.returnValue(None) def _refreshRoutingTable(self): nodeIDs = self._routingTable.getRefreshList(0, False) @@ -654,9 +655,6 @@ class Node(object): searchForNextNodeID() return outerDf - def _scheduleNextNodeRefresh(self, *args): - self.next_refresh_call = reactor.callLater(constants.checkRefreshInterval, - self._refreshNode) # args put here because _refreshRoutingTable does outerDF.callback(None) def _removeExpiredPeers(self, *args):