move dht node component setup from Session into Node

This commit is contained in:
Jack Robison 2018-02-20 13:35:03 -05:00
parent e6caedac91
commit 04e76443c6
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2

View file

@ -2,7 +2,7 @@ import logging
import miniupnpc import miniupnpc
from twisted.internet import threads, defer from twisted.internet import threads, defer
from lbrynet.core.BlobManager import DiskBlobManager from lbrynet.core.BlobManager import DiskBlobManager
from lbrynet.dht import node, peerfinder, peermanager, hashannouncer from lbrynet.dht import node, peermanager, hashannouncer
from lbrynet.database.storage import SQLiteStorage from lbrynet.database.storage import SQLiteStorage
from lbrynet.core.RateLimiter import RateLimiter from lbrynet.core.RateLimiter import RateLimiter
from lbrynet.core.utils import generate_id from lbrynet.core.utils import generate_id
@ -97,38 +97,26 @@ class Session(object):
""" """
self.db_dir = db_dir self.db_dir = db_dir
self.node_id = node_id self.node_id = node_id
self.peer_manager = peer_manager self.peer_manager = peer_manager
self.peer_finder = peer_finder
self.hash_announcer = hash_announcer
self.dht_node_port = dht_node_port self.dht_node_port = dht_node_port
self.known_dht_nodes = known_dht_nodes self.known_dht_nodes = known_dht_nodes
if self.known_dht_nodes is None: if self.known_dht_nodes is None:
self.known_dht_nodes = [] self.known_dht_nodes = []
self.peer_finder = peer_finder
self.hash_announcer = hash_announcer
self.blob_dir = blob_dir self.blob_dir = blob_dir
self.blob_manager = blob_manager self.blob_manager = blob_manager
self.blob_tracker = None self.blob_tracker = None
self.blob_tracker_class = blob_tracker_class or BlobAvailabilityTracker self.blob_tracker_class = blob_tracker_class or BlobAvailabilityTracker
self.peer_port = peer_port self.peer_port = peer_port
self.use_upnp = use_upnp self.use_upnp = use_upnp
self.rate_limiter = rate_limiter self.rate_limiter = rate_limiter
self.external_ip = external_ip self.external_ip = external_ip
self.upnp_redirects = [] self.upnp_redirects = []
self.wallet = wallet self.wallet = wallet
self.dht_node_class = dht_node_class self.dht_node_class = dht_node_class
self.dht_node = None self.dht_node = None
self.base_payment_rate_manager = BasePaymentRateManager(blob_data_payment_rate) self.base_payment_rate_manager = BasePaymentRateManager(blob_data_payment_rate)
self.payment_rate_manager = None self.payment_rate_manager = None
self.payment_rate_manager_class = payment_rate_manager_class or NegotiatedPaymentRateManager self.payment_rate_manager_class = payment_rate_manager_class or NegotiatedPaymentRateManager
@ -147,21 +135,11 @@ class Session(object):
from lbrynet.core.PTCWallet import PTCWallet from lbrynet.core.PTCWallet import PTCWallet
self.wallet = PTCWallet(self.db_dir) self.wallet = PTCWallet(self.db_dir)
if self.peer_manager is None:
self.peer_manager = peermanager.PeerManager()
if self.use_upnp is True: if self.use_upnp is True:
d = self._try_upnp() d = self._try_upnp()
else: else:
d = defer.succeed(True) d = defer.succeed(True)
d.addCallback(lambda _: self._setup_dht())
if self.peer_finder is None:
d.addCallback(lambda _: self._setup_dht())
else:
if self.hash_announcer is None and self.peer_port is not None:
log.warning("The server has no way to advertise its available blobs.")
self.hash_announcer = hashannouncer.DummyHashAnnouncer()
d.addCallback(lambda _: self._setup_other_components()) d.addCallback(lambda _: self._setup_other_components())
return d return d
@ -175,10 +153,6 @@ class Session(object):
ds.append(defer.maybeDeferred(self.dht_node.stop)) ds.append(defer.maybeDeferred(self.dht_node.stop))
if self.rate_limiter is not None: if self.rate_limiter is not None:
ds.append(defer.maybeDeferred(self.rate_limiter.stop)) ds.append(defer.maybeDeferred(self.rate_limiter.stop))
if self.peer_finder is not None:
ds.append(defer.maybeDeferred(self.peer_finder.stop))
if self.hash_announcer is not None:
ds.append(defer.maybeDeferred(self.hash_announcer.stop))
if self.wallet is not None: if self.wallet is not None:
ds.append(defer.maybeDeferred(self.wallet.stop)) ds.append(defer.maybeDeferred(self.wallet.stop))
if self.blob_manager is not None: if self.blob_manager is not None:
@ -250,17 +224,16 @@ class Session(object):
@defer.inlineCallbacks @defer.inlineCallbacks
def _setup_dht(self): def _setup_dht(self):
log.info("Starting DHT") log.info("Starting DHT")
self.dht_node = self.dht_node_class( self.dht_node = self.dht_node_class(
self.hash_announcer,
udpPort=self.dht_node_port, udpPort=self.dht_node_port,
node_id=self.node_id, node_id=self.node_id,
externalIP=self.external_ip, externalIP=self.external_ip,
peerPort=self.peer_port peerPort=self.peer_port,
peer_manager=self.peer_manager,
peer_finder=self.peer_finder,
) )
yield self.dht_node.joinNetwork(self.known_dht_nodes) yield self.dht_node.joinNetwork(self.known_dht_nodes)
self.peer_finder = self.dht_node.peer_finder
self.hash_announcer = self.dht_node.hash_announcer
def _setup_other_components(self): def _setup_other_components(self):
log.debug("Setting up the rest of the components") log.debug("Setting up the rest of the components")
@ -274,12 +247,12 @@ class Session(object):
"TempBlobManager is no longer supported, specify BlobManager or db_dir") "TempBlobManager is no longer supported, specify BlobManager or db_dir")
else: else:
self.blob_manager = DiskBlobManager( self.blob_manager = DiskBlobManager(
self.hash_announcer, self.blob_dir, self.storage self.dht_node.hash_announcer, self.blob_dir, self.storage
) )
if self.blob_tracker is None: if self.blob_tracker is None:
self.blob_tracker = self.blob_tracker_class( self.blob_tracker = self.blob_tracker_class(
self.blob_manager, self.peer_finder, self.dht_node self.blob_manager, self.dht_node.peer_finder, self.dht_node
) )
if self.payment_rate_manager is None: if self.payment_rate_manager is None:
self.payment_rate_manager = self.payment_rate_manager_class( self.payment_rate_manager = self.payment_rate_manager_class(