move peer manager class to lbrynet.core

This commit is contained in:
Jack Robison 2018-02-21 14:53:12 -05:00
parent d2a6dd3ed3
commit 88970cb0a8
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
9 changed files with 24 additions and 22 deletions

View file

@ -2,7 +2,7 @@ import logging
import miniupnpc import miniupnpc
from twisted.internet import threads, defer from twisted.internet import threads, defer
from lbrynet.core.BlobManager import DiskBlobManager from lbrynet.core.BlobManager import DiskBlobManager
from lbrynet.dht import node, peermanager, hashannouncer from lbrynet.dht import node
from lbrynet.database.storage import SQLiteStorage from lbrynet.database.storage import SQLiteStorage
from lbrynet.core.RateLimiter import RateLimiter from lbrynet.core.RateLimiter import RateLimiter
from lbrynet.core.utils import generate_id from lbrynet.core.utils import generate_id
@ -122,6 +122,7 @@ class Session(object):
self.payment_rate_manager_class = payment_rate_manager_class or NegotiatedPaymentRateManager self.payment_rate_manager_class = payment_rate_manager_class or NegotiatedPaymentRateManager
self.is_generous = is_generous self.is_generous = is_generous
self.storage = storage or SQLiteStorage(self.db_dir) self.storage = storage or SQLiteStorage(self.db_dir)
self._join_dht_deferred = None
def setup(self): def setup(self):
"""Create the blob directory and database if necessary, start all desired services""" """Create the blob directory and database if necessary, start all desired services"""
@ -221,9 +222,7 @@ class Session(object):
d.addErrback(upnp_failed) d.addErrback(upnp_failed)
return d return d
@defer.inlineCallbacks def _setup_dht(self): # does not block startup, the dht will re-attempt if necessary
def _setup_dht(self):
log.info("Starting DHT")
self.dht_node = self.dht_node_class( self.dht_node = self.dht_node_class(
self.hash_announcer, self.hash_announcer,
udpPort=self.dht_node_port, udpPort=self.dht_node_port,
@ -233,7 +232,11 @@ class Session(object):
peer_manager=self.peer_manager, peer_manager=self.peer_manager,
peer_finder=self.peer_finder, peer_finder=self.peer_finder,
) )
yield self.dht_node.joinNetwork(self.known_dht_nodes) self.peer_manager = self.dht_node.peer_manager
self.peer_finder = self.dht_node.peer_finder
self.hash_announcer = self.dht_node.hash_announcer
self._join_dht_deferred = self.dht_node.joinNetwork(self.known_dht_nodes)
self._join_dht_deferred.addCallback(lambda _: log.info("Joined the dht"))
def _setup_other_components(self): def _setup_other_components(self):
log.debug("Setting up the rest of the components") log.debug("Setting up the rest of the components")

View file

@ -313,7 +313,7 @@ class Daemon(AuthJSONRPCServer):
self.session.peer_manager) self.session.peer_manager)
try: try:
log.info("Daemon bound to port: %d", self.peer_port) log.info("Peer protocol listening on TCP %d", self.peer_port)
self.lbry_server_port = reactor.listenTCP(self.peer_port, server_factory) self.lbry_server_port = reactor.listenTCP(self.peer_port, server_factory)
except error.CannotListenError as e: except error.CannotListenError as e:
import traceback import traceback

View file

@ -39,6 +39,7 @@ class DaemonServer(object):
try: try:
self.server_port = reactor.listenTCP( self.server_port = reactor.listenTCP(
conf.settings['api_port'], lbrynet_server, interface=conf.settings['api_host']) conf.settings['api_port'], lbrynet_server, interface=conf.settings['api_host'])
log.info("lbrynet API listening on TCP %s:%i", conf.settings['api_host'], conf.settings['api_port'])
except error.CannotListenError: except error.CannotListenError:
log.info('Daemon already running, exiting app') log.info('Daemon already running, exiting app')
raise raise

View file

@ -11,23 +11,23 @@ import hashlib
import operator import operator
import struct import struct
import time import time
import logging
from twisted.internet import defer, error, task from twisted.internet import defer, error, task
from lbrynet.core.utils import generate_id
from lbrynet.core.PeerManager import PeerManager
import constants import constants
import routingtable import routingtable
import datastore import datastore
import protocol import protocol
from error import TimeoutError from error import TimeoutError
from peermanager import PeerManager
from hashannouncer import DHTHashAnnouncer from hashannouncer import DHTHashAnnouncer
from peerfinder import DHTPeerFinder from peerfinder import DHTPeerFinder
from contact import Contact from contact import Contact
from hashwatcher import HashWatcher from hashwatcher import HashWatcher
from distance import Distance from distance import Distance
import logging
from lbrynet.core.utils import generate_id
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -168,7 +168,6 @@ class Node(object):
def start_listening(self): def start_listening(self):
try: try:
self._listeningPort = self.reactor_listenUDP(self.port, self._protocol) self._listeningPort = self.reactor_listenUDP(self.port, self._protocol)
log.info("DHT node listening on %i", self.port)
except error.CannotListenError as e: except error.CannotListenError as e:
import traceback import traceback
log.error("Couldn't bind to port %d. %s", self.port, traceback.format_exc()) log.error("Couldn't bind to port %d. %s", self.port, traceback.format_exc())
@ -191,12 +190,12 @@ class Node(object):
bootstrap_contacts = self.contacts bootstrap_contacts = self.contacts
defer.returnValue(bootstrap_contacts) defer.returnValue(bootstrap_contacts)
def _rerun(bootstrap_contacts): def _rerun(closest_nodes):
if not bootstrap_contacts: if not closest_nodes:
log.info("Failed to join the dht, re-attempting in 60 seconds") log.info("Failed to join the dht, re-attempting in 30 seconds")
self.reactor_callLater(60, self.bootstrap_join, known_node_addresses, finished_d) self.reactor_callLater(30, self.bootstrap_join, known_node_addresses, finished_d)
elif not finished_d.called: elif not finished_d.called:
finished_d.callback(bootstrap_contacts) finished_d.callback(closest_nodes)
log.info("Attempting to join the DHT network") log.info("Attempting to join the DHT network")
d = _resolve_seeds() d = _resolve_seeds()

View file

@ -23,7 +23,7 @@ from twisted.trial.unittest import TestCase
from twisted.python.failure import Failure from twisted.python.failure import Failure
from lbrynet.dht.node import Node from lbrynet.dht.node import Node
from lbrynet.dht.peermanager import PeerManager from lbrynet.core.PeerManager import PeerManager
from lbrynet.core.RateLimiter import DummyRateLimiter, RateLimiter from lbrynet.core.RateLimiter import DummyRateLimiter, RateLimiter
from lbrynet.core.server.BlobRequestHandler import BlobRequestHandlerFactory from lbrynet.core.server.BlobRequestHandler import BlobRequestHandlerFactory
from lbrynet.core.server.ServerProtocol import ServerProtocolFactory from lbrynet.core.server.ServerProtocol import ServerProtocolFactory

View file

@ -4,8 +4,7 @@ from twisted.trial import unittest
from lbrynet import conf from lbrynet import conf
from lbrynet.core.StreamDescriptor import get_sd_info from lbrynet.core.StreamDescriptor import get_sd_info
from lbrynet import reflector from lbrynet import reflector
from lbrynet.core import BlobManager from lbrynet.core import BlobManager, PeerManager
from lbrynet.dht import peermanager
from lbrynet.core import Session from lbrynet.core import Session
from lbrynet.core import StreamDescriptor from lbrynet.core import StreamDescriptor
from lbrynet.lbry_file.client import EncryptedFileOptions from lbrynet.lbry_file.client import EncryptedFileOptions
@ -28,7 +27,7 @@ class TestReflector(unittest.TestCase):
self.port = None self.port = None
self.addCleanup(self.take_down_env) self.addCleanup(self.take_down_env)
wallet = mocks.Wallet() wallet = mocks.Wallet()
peer_manager = peermanager.PeerManager() peer_manager = PeerManager.PeerManager()
peer_finder = mocks.PeerFinder(5553, peer_manager, 2) peer_finder = mocks.PeerFinder(5553, peer_manager, 2)
hash_announcer = mocks.Announcer() hash_announcer = mocks.Announcer()
sd_identifier = StreamDescriptor.StreamDescriptorIdentifier() sd_identifier = StreamDescriptor.StreamDescriptorIdentifier()

View file

@ -13,7 +13,7 @@ from lbrynet.core.StreamDescriptor import StreamDescriptorIdentifier
from lbrynet.file_manager.EncryptedFileCreator import create_lbry_file from lbrynet.file_manager.EncryptedFileCreator import create_lbry_file
from lbrynet.lbry_file.client.EncryptedFileOptions import add_lbry_file_to_sd_identifier from lbrynet.lbry_file.client.EncryptedFileOptions import add_lbry_file_to_sd_identifier
from lbrynet.core.StreamDescriptor import get_sd_info from lbrynet.core.StreamDescriptor import get_sd_info
from lbrynet.dht.peermanager import PeerManager from lbrynet.core.PeerManager import PeerManager
from lbrynet.core.RateLimiter import DummyRateLimiter from lbrynet.core.RateLimiter import DummyRateLimiter
from lbrynet.tests import mocks from lbrynet.tests import mocks

View file

@ -3,7 +3,7 @@ from lbrynet.core.server.ServerProtocol import ServerProtocol
from lbrynet.core.client.ClientProtocol import ClientProtocol from lbrynet.core.client.ClientProtocol import ClientProtocol
from lbrynet.core.RateLimiter import RateLimiter from lbrynet.core.RateLimiter import RateLimiter
from lbrynet.core.Peer import Peer from lbrynet.core.Peer import Peer
from lbrynet.dht.peermanager import PeerManager from lbrynet.core.PeerManager import PeerManager
from lbrynet.core.Error import NoResponseError from lbrynet.core.Error import NoResponseError
from twisted.trial import unittest from twisted.trial import unittest