forked from LBRYCommunity/lbry-sdk
move dht node setup back into node class
This commit is contained in:
parent
e30ea50ef4
commit
efaa97216f
2 changed files with 39 additions and 80 deletions
|
@ -247,59 +247,20 @@ class Session(object):
|
||||||
d.addErrback(upnp_failed)
|
d.addErrback(upnp_failed)
|
||||||
return d
|
return d
|
||||||
|
|
||||||
# the callback, if any, will be invoked once the joining procedure
|
@defer.inlineCallbacks
|
||||||
# has terminated
|
|
||||||
def join_dht(self, cb=None):
|
|
||||||
from twisted.internet import reactor
|
|
||||||
|
|
||||||
def join_resolved_addresses(result):
|
|
||||||
addresses = []
|
|
||||||
for success, value in result:
|
|
||||||
if success is True:
|
|
||||||
addresses.append(value)
|
|
||||||
return addresses
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
|
||||||
def join_network(knownNodes):
|
|
||||||
log.debug("join DHT using known nodes: " + str(knownNodes))
|
|
||||||
result = yield self.dht_node.joinNetwork(knownNodes)
|
|
||||||
defer.returnValue(result)
|
|
||||||
|
|
||||||
ds = []
|
|
||||||
for host, port in self.known_dht_nodes:
|
|
||||||
d = reactor.resolve(host)
|
|
||||||
d.addCallback(lambda h: (h, port)) # match host to port
|
|
||||||
ds.append(d)
|
|
||||||
|
|
||||||
dl = defer.DeferredList(ds)
|
|
||||||
dl.addCallback(join_resolved_addresses)
|
|
||||||
dl.addCallback(join_network)
|
|
||||||
if cb:
|
|
||||||
dl.addCallback(cb)
|
|
||||||
|
|
||||||
return dl
|
|
||||||
|
|
||||||
def _setup_dht(self):
|
def _setup_dht(self):
|
||||||
log.info("Starting DHT")
|
log.info("Starting DHT")
|
||||||
|
|
||||||
def start_dht(join_network_result):
|
|
||||||
self.hash_announcer.run_manage_loop()
|
|
||||||
return True
|
|
||||||
|
|
||||||
self.dht_node = self.dht_node_class(
|
self.dht_node = self.dht_node_class(
|
||||||
udpPort=self.dht_node_port,
|
udpPort=self.dht_node_port,
|
||||||
node_id=self.node_id,
|
node_id=self.node_id,
|
||||||
externalIP=self.external_ip,
|
externalIP=self.external_ip,
|
||||||
peerPort=self.peer_port
|
peerPort=self.peer_port
|
||||||
)
|
)
|
||||||
self.peer_finder = peerfinder.DHTPeerFinder(self.dht_node, self.peer_manager)
|
|
||||||
if self.hash_announcer is None:
|
|
||||||
self.hash_announcer = hashannouncer.DHTHashAnnouncer(self.dht_node, self.peer_port)
|
|
||||||
|
|
||||||
self.dht_node.startNetwork()
|
yield self.dht_node.joinNetwork(self.known_dht_nodes)
|
||||||
|
self.peer_finder = self.dht_node.peer_finder
|
||||||
# pass start_dht() as callback to start the remaining components after joining the DHT
|
self.hash_announcer = self.dht_node.hash_announcer
|
||||||
return self.join_dht(start_dht)
|
|
||||||
|
|
||||||
def _setup_other_components(self):
|
def _setup_other_components(self):
|
||||||
log.debug("Setting up the rest of the components")
|
log.debug("Setting up the rest of the components")
|
||||||
|
|
|
@ -18,6 +18,9 @@ import routingtable
|
||||||
import datastore
|
import datastore
|
||||||
import protocol
|
import protocol
|
||||||
|
|
||||||
|
from peermanager import PeerManager
|
||||||
|
from hashannouncer import DHTHashAnnouncer
|
||||||
|
from peerfinder import DHTPeerFinder
|
||||||
from contact import Contact
|
from contact import Contact
|
||||||
from hashwatcher import HashWatcher
|
from hashwatcher import HashWatcher
|
||||||
from distance import Distance
|
from distance import Distance
|
||||||
|
@ -120,7 +123,12 @@ class Node(object):
|
||||||
# will be used later
|
# will be used later
|
||||||
self._can_store = True
|
self._can_store = True
|
||||||
|
|
||||||
|
self.peer_manager = PeerManager()
|
||||||
|
self.peer_finder = DHTPeerFinder(self, self.peer_manager)
|
||||||
|
self.hash_announcer = DHTHashAnnouncer(self, self.port)
|
||||||
|
|
||||||
def __del__(self):
|
def __del__(self):
|
||||||
|
log.warning("unclean shutdown of the dht node")
|
||||||
if self._listeningPort is not None:
|
if self._listeningPort is not None:
|
||||||
self._listeningPort.stopListening()
|
self._listeningPort.stopListening()
|
||||||
|
|
||||||
|
@ -138,60 +146,50 @@ class Node(object):
|
||||||
self._listeningPort.stopListening()
|
self._listeningPort.stopListening()
|
||||||
self.hash_watcher.stop()
|
self.hash_watcher.stop()
|
||||||
|
|
||||||
def startNetwork(self):
|
|
||||||
""" Causes the Node to start all the underlying components needed for the DHT
|
|
||||||
to work. This should be called before any other DHT operations.
|
|
||||||
"""
|
|
||||||
log.info("Starting DHT underlying components")
|
|
||||||
|
|
||||||
# Prepare the underlying Kademlia protocol
|
|
||||||
if self.port is not None:
|
|
||||||
try:
|
|
||||||
self._listeningPort = reactor.listenUDP(self.port, self._protocol)
|
|
||||||
except error.CannotListenError as e:
|
|
||||||
import traceback
|
|
||||||
log.error("Couldn't bind to port %d. %s", self.port, traceback.format_exc())
|
|
||||||
raise ValueError("%s lbrynet may already be running." % str(e))
|
|
||||||
|
|
||||||
# Start the token looping call
|
|
||||||
self.change_token_lc.start(constants.tokenSecretChangeInterval)
|
|
||||||
# #TODO: Refresh all k-buckets further away than this node's closest neighbour
|
|
||||||
# Start refreshing k-buckets periodically, if necessary
|
|
||||||
self.next_refresh_call = reactor.callLater(constants.checkRefreshInterval,
|
|
||||||
self._refreshNode)
|
|
||||||
self.hash_watcher.tick()
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def joinNetwork(self, knownNodeAddresses=None):
|
def joinNetwork(self, known_node_addresses=None):
|
||||||
""" Causes the Node to attempt to join the DHT network by contacting the
|
""" Causes the Node to attempt to join the DHT network by contacting the
|
||||||
known DHT nodes. This can be called multiple times if the previous attempt
|
known DHT nodes. This can be called multiple times if the previous attempt
|
||||||
has failed or if the Node has lost all the contacts.
|
has failed or if the Node has lost all the contacts.
|
||||||
|
|
||||||
@param knownNodeAddresses: A sequence of tuples containing IP address
|
@param known_node_addresses: A sequence of tuples containing IP address
|
||||||
information for existing nodes on the
|
information for existing nodes on the
|
||||||
Kademlia network, in the format:
|
Kademlia network, in the format:
|
||||||
C{(<ip address>, (udp port>)}
|
C{(<ip address>, (udp port>)}
|
||||||
@type knownNodeAddresses: tuple
|
@type known_node_addresses: list
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
try:
|
||||||
|
self._listeningPort = reactor.listenUDP(self.port, self._protocol)
|
||||||
|
log.info("DHT node listening on %i", self.port)
|
||||||
|
except error.CannotListenError as e:
|
||||||
|
import traceback
|
||||||
|
log.error("Couldn't bind to port %d. %s", self.port, traceback.format_exc())
|
||||||
|
raise ValueError("%s lbrynet may already be running." % str(e))
|
||||||
|
|
||||||
|
known_node_addresses = known_node_addresses or []
|
||||||
|
bootstrap_contacts = []
|
||||||
|
for node_address, port in known_node_addresses:
|
||||||
|
host = yield reactor.resolve(node_address)
|
||||||
|
# Create temporary contact information for the list of addresses of known nodes
|
||||||
|
contact = Contact(self._generateID(), host, port, self._protocol)
|
||||||
|
bootstrap_contacts.append(contact)
|
||||||
|
|
||||||
log.info("Attempting to join the DHT network")
|
log.info("Attempting to join the DHT network")
|
||||||
|
|
||||||
# IGNORE:E1101
|
|
||||||
# Create temporary contact information for the list of addresses of known nodes
|
|
||||||
if knownNodeAddresses != None:
|
|
||||||
bootstrapContacts = []
|
|
||||||
for address, port in knownNodeAddresses:
|
|
||||||
contact = Contact(self._generateID(), address, port, self._protocol)
|
|
||||||
bootstrapContacts.append(contact)
|
|
||||||
else:
|
|
||||||
bootstrapContacts = None
|
|
||||||
|
|
||||||
# Initiate the Kademlia joining sequence - perform a search for this node's own ID
|
# Initiate the Kademlia joining sequence - perform a search for this node's own ID
|
||||||
self._joinDeferred = self._iterativeFind(self.node_id, bootstrapContacts)
|
self._joinDeferred = self._iterativeFind(self.node_id, bootstrap_contacts)
|
||||||
# #TODO: Refresh all k-buckets further away than this node's closest neighbour
|
# #TODO: Refresh all k-buckets further away than this node's closest neighbour
|
||||||
# Start refreshing k-buckets periodically, if necessary
|
# Start refreshing k-buckets periodically, if necessary
|
||||||
self.hash_watcher.tick()
|
self.hash_watcher.tick()
|
||||||
yield self._joinDeferred
|
yield self._joinDeferred
|
||||||
|
|
||||||
|
self.change_token_lc.start(constants.tokenSecretChangeInterval)
|
||||||
self.refresh_node_lc.start(constants.checkRefreshInterval)
|
self.refresh_node_lc.start(constants.checkRefreshInterval)
|
||||||
|
self.peer_finder.run_manage_loop()
|
||||||
|
self.hash_announcer.run_manage_loop()
|
||||||
|
|
||||||
|
#TODO: re-attempt joining the network if it fails
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def contacts(self):
|
def contacts(self):
|
||||||
|
|
Loading…
Add table
Reference in a new issue