refactor joinNetwork into smaller functions

-try to re-join network if no contacts are known
This commit is contained in:
Jack Robison 2018-02-20 13:37:02 -05:00
parent 04e76443c6
commit 43896c8d17
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2

View file

@ -100,7 +100,7 @@ class Node(object):
# information from the DHT as soon as the node is part of the # information from the DHT as soon as the node is part of the
# network (add callbacks to this deferred if scheduling such # network (add callbacks to this deferred if scheduling such
# operations before the node has finished joining the network) # operations before the node has finished joining the network)
self._joinDeferred = None self._joinDeferred = defer.Deferred(None)
self.change_token_lc = task.LoopingCall(self.change_token) self.change_token_lc = task.LoopingCall(self.change_token)
self.change_token_lc.clock = self.clock self.change_token_lc.clock = self.clock
self.refresh_node_lc = task.LoopingCall(self._refreshNode) self.refresh_node_lc = task.LoopingCall(self._refreshNode)
@ -164,6 +164,45 @@ class Node(object):
if self.hash_watcher.lc.running: if self.hash_watcher.lc.running:
yield self.hash_watcher.stop() yield self.hash_watcher.stop()
def start_listening(self):
try:
self._listeningPort = self.reactor_listenUDP(self.port, self._protocol)
log.info("DHT node listening on %i", self.port)
except error.CannotListenError as e:
import traceback
log.error("Couldn't bind to port %d. %s", self.port, traceback.format_exc())
raise ValueError("%s lbrynet may already be running." % str(e))
def bootstrap_join(self, known_node_addresses, finished_d):
@defer.inlineCallbacks
def _resolve_seeds():
bootstrap_contacts = []
for node_address, port in known_node_addresses:
host = yield self.reactor_resolve(node_address)
# Create temporary contact information for the list of addresses of known nodes
contact = Contact(self._generateID(), host, port, self._protocol)
bootstrap_contacts.append(contact)
if not bootstrap_contacts:
if not self.hasContacts():
log.warning("No known contacts!")
else:
log.info("found contacts")
bootstrap_contacts = self.contacts
defer.returnValue(bootstrap_contacts)
def _rerun(bootstrap_contacts):
if not bootstrap_contacts:
log.info("Failed to join the dht, re-attempting in 60 seconds")
self.reactor_callLater(60, self.bootstrap_join, known_node_addresses, finished_d)
elif not finished_d.called:
finished_d.callback(bootstrap_contacts)
log.info("Attempting to join the DHT network")
d = _resolve_seeds()
# Initiate the Kademlia joining sequence - perform a search for this node's own ID
d.addCallback(lambda contacts: self._iterativeFind(self.node_id, contacts))
d.addCallback(_rerun)
@defer.inlineCallbacks @defer.inlineCallbacks
def joinNetwork(self, known_node_addresses=None): def joinNetwork(self, known_node_addresses=None):
""" Causes the Node to attempt to join the DHT network by contacting the """ Causes the Node to attempt to join the DHT network by contacting the
@ -177,38 +216,16 @@ class Node(object):
@type known_node_addresses: list @type known_node_addresses: list
""" """
try: self.start_listening()
self._listeningPort = reactor.listenUDP(self.port, self._protocol)
log.info("DHT node listening on %i", self.port)
except error.CannotListenError as e:
import traceback
log.error("Couldn't bind to port %d. %s", self.port, traceback.format_exc())
raise ValueError("%s lbrynet may already be running." % str(e))
known_node_addresses = known_node_addresses or []
bootstrap_contacts = []
for node_address, port in known_node_addresses:
host = yield reactor.resolve(node_address)
# Create temporary contact information for the list of addresses of known nodes
contact = Contact(self._generateID(), host, port, self._protocol)
bootstrap_contacts.append(contact)
log.info("Attempting to join the DHT network")
# Initiate the Kademlia joining sequence - perform a search for this node's own ID
self._joinDeferred = self._iterativeFind(self.node_id, bootstrap_contacts)
# #TODO: Refresh all k-buckets further away than this node's closest neighbour # #TODO: Refresh all k-buckets further away than this node's closest neighbour
# Start refreshing k-buckets periodically, if necessary # Start refreshing k-buckets periodically, if necessary
self.hash_watcher.tick() self.bootstrap_join(known_node_addresses or [], self._joinDeferred)
yield self._joinDeferred yield self._joinDeferred
self.hash_watcher.start() self.hash_watcher.start()
self.change_token_lc.start(constants.tokenSecretChangeInterval) self.change_token_lc.start(constants.tokenSecretChangeInterval)
self.refresh_node_lc.start(constants.checkRefreshInterval) self.refresh_node_lc.start(constants.checkRefreshInterval)
self.hash_announcer.run_manage_loop() self.hash_announcer.run_manage_loop()
#TODO: re-attempt joining the network if it fails
@property @property
def contacts(self): def contacts(self):
def _inner(): def _inner():