fix skipping dht, peer_protocol_server, and hash_announcer components

-move PeerFinder and PeerManager initialization to the ComponentManager

-remove dht component requirement from all but the hash_announcer component. This allows running the file manager component without either of the server components.
This commit is contained in:
Jack Robison 2018-10-30 13:41:38 -04:00
parent 0bf8416d9f
commit 253912b52e
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
8 changed files with 104 additions and 127 deletions

View file

@ -13,7 +13,6 @@ from . import constants
from . import routingtable from . import routingtable
from . import datastore from . import datastore
from . import protocol from . import protocol
from .peerfinder import DHTPeerFinder
from .contact import ContactManager from .contact import ContactManager
from .iterativefind import iterativeFind from .iterativefind import iterativeFind
@ -83,8 +82,8 @@ class Node(MockKademliaHelper):
def __init__(self, node_id=None, udpPort=4000, dataStore=None, def __init__(self, node_id=None, udpPort=4000, dataStore=None,
routingTableClass=None, networkProtocol=None, routingTableClass=None, networkProtocol=None,
externalIP=None, peerPort=3333, listenUDP=None, externalIP=None, peerPort=3333, listenUDP=None,
callLater=None, resolve=None, clock=None, peer_finder=None, callLater=None, resolve=None, clock=None,
peer_manager=None, interface='', externalUDPPort=None): interface='', externalUDPPort=None):
""" """
@param dataStore: The data store to use. This must be class inheriting @param dataStore: The data store to use. This must be class inheriting
from the C{DataStore} interface (or providing the from the C{DataStore} interface (or providing the
@ -124,20 +123,13 @@ class Node(MockKademliaHelper):
else: else:
self._routingTable = routingTableClass(self.node_id, self.clock.seconds) self._routingTable = routingTableClass(self.node_id, self.clock.seconds)
# Initialize this node's network access mechanisms self._protocol = networkProtocol or protocol.KademliaProtocol(self)
if networkProtocol is None:
self._protocol = protocol.KademliaProtocol(self)
else:
self._protocol = networkProtocol
# Initialize the data storage mechanism used by this node
self.token_secret = self._generateID() self.token_secret = self._generateID()
self.old_token_secret = None self.old_token_secret = None
self.externalIP = externalIP self.externalIP = externalIP
self.peerPort = peerPort self.peerPort = peerPort
self.externalUDPPort = externalUDPPort or self.port self.externalUDPPort = externalUDPPort or self.port
self._dataStore = dataStore or datastore.DictDataStore(self.clock.seconds) self._dataStore = dataStore or datastore.DictDataStore(self.clock.seconds)
self.peer_manager = peer_manager or PeerManager()
self.peer_finder = peer_finder or DHTPeerFinder(self, self.peer_manager)
self._join_deferred = None self._join_deferred = None
#def __del__(self): #def __del__(self):

View file

@ -18,13 +18,12 @@ class DHTPeerFinder(DummyPeerFinder):
"""This class finds peers which have announced to the DHT that they have certain blobs""" """This class finds peers which have announced to the DHT that they have certain blobs"""
#implements(IPeerFinder) #implements(IPeerFinder)
def __init__(self, dht_node, peer_manager): def __init__(self, component_manager):
""" """
dht_node - an instance of dht.Node class component_manager - an instance of ComponentManager
peer_manager - an instance of PeerManager class
""" """
self.dht_node = dht_node self.component_manager = component_manager
self.peer_manager = peer_manager self.peer_manager = component_manager.peer_manager
self.peers = {} self.peers = {}
self._ongoing_searchs = {} self._ongoing_searchs = {}
@ -39,19 +38,30 @@ class DHTPeerFinder(DummyPeerFinder):
Returns: Returns:
list of peers for the blob list of peers for the blob
""" """
self.peers.setdefault(blob_hash, {(self.dht_node.externalIP, self.dht_node.peerPort,)}) if "dht" in self.component_manager.skip_components:
return defer.succeed([])
if not self.component_manager.all_components_running("dht"):
return defer.succeed([])
dht_node = self.component_manager.get_component("dht")
self.peers.setdefault(blob_hash, {(dht_node.externalIP, dht_node.peerPort,)})
if not blob_hash in self._ongoing_searchs or self._ongoing_searchs[blob_hash].called: if not blob_hash in self._ongoing_searchs or self._ongoing_searchs[blob_hash].called:
self._ongoing_searchs[blob_hash] = self._execute_peer_search(blob_hash, timeout) self._ongoing_searchs[blob_hash] = self._execute_peer_search(dht_node, blob_hash, timeout)
peers = set(self._filter_self(blob_hash) if filter_self else self.peers[blob_hash])
def _filter_self(blob_hash):
my_host, my_port = dht_node.externalIP, dht_node.peerPort
return {(host, port) for host, port in self.peers[blob_hash] if (host, port) != (my_host, my_port)}
peers = set(_filter_self(blob_hash) if filter_self else self.peers[blob_hash])
return defer.succeed([self.peer_manager.get_peer(*peer) for peer in peers]) return defer.succeed([self.peer_manager.get_peer(*peer) for peer in peers])
@defer.inlineCallbacks @defer.inlineCallbacks
def _execute_peer_search(self, blob_hash, timeout): def _execute_peer_search(self, dht_node, blob_hash, timeout):
bin_hash = binascii.unhexlify(blob_hash) bin_hash = binascii.unhexlify(blob_hash)
finished_deferred = self.dht_node.iterativeFindValue(bin_hash, exclude=self.peers[blob_hash]) finished_deferred = dht_node.iterativeFindValue(bin_hash, exclude=self.peers[blob_hash])
timeout = timeout or conf.settings['peer_search_timeout'] timeout = timeout or conf.settings['peer_search_timeout']
if timeout: if timeout:
finished_deferred.addTimeout(timeout, self.dht_node.clock) finished_deferred.addTimeout(timeout, dht_node.clock)
try: try:
peer_list = yield finished_deferred peer_list = yield finished_deferred
self.peers[blob_hash].update({(host, port) for _, host, port in peer_list}) self.peers[blob_hash].update({(host, port) for _, host, port in peer_list})
@ -59,7 +69,3 @@ class DHTPeerFinder(DummyPeerFinder):
log.debug("DHT timed out while looking peers for blob %s after %s seconds", blob_hash, timeout) log.debug("DHT timed out while looking peers for blob %s after %s seconds", blob_hash, timeout)
finally: finally:
del self._ongoing_searchs[blob_hash] del self._ongoing_searchs[blob_hash]
def _filter_self(self, blob_hash):
my_host, my_port = self.dht_node.externalIP, self.dht_node.peerPort
return {(host, port) for host, port in self.peers[blob_hash] if (host, port) != (my_host, my_port)}

View file

@ -1,7 +1,8 @@
import logging import logging
from twisted.internet import defer from twisted.internet import defer
from lbrynet.p2p.Error import ComponentStartConditionNotMet from lbrynet.p2p.Error import ComponentStartConditionNotMet
from lbrynet.core.PeerManager import PeerManager
from lbrynet.dht.peerfinder import DHTPeerFinder
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -33,13 +34,15 @@ class RequiredCondition(metaclass=RequiredConditionType):
class ComponentManager: class ComponentManager:
default_component_classes = {} default_component_classes = {}
def __init__(self, reactor=None, analytics_manager=None, skip_components=None, **override_components): def __init__(self, reactor=None, analytics_manager=None, skip_components=None,
peer_manager=None, peer_finder=None, **override_components):
self.skip_components = skip_components or [] self.skip_components = skip_components or []
self.reactor = reactor self.reactor = reactor
self.component_classes = {} self.component_classes = {}
self.components = set() self.components = set()
self.analytics_manager = analytics_manager self.analytics_manager = analytics_manager
self.peer_manager = peer_manager or PeerManager()
self.peer_finder = peer_finder or DHTPeerFinder(self)
for component_name, component_class in self.default_component_classes.items(): for component_name, component_class in self.default_component_classes.items():
if component_name in override_components: if component_name in override_components:
@ -114,9 +117,9 @@ class ComponentManager:
:return: (defer.Deferred) :return: (defer.Deferred)
""" """
for component_name, cb in callbacks.items(): for component_name, cb in callbacks.items():
if component_name not in self.component_classes: if component_name not in self.component_classes:
if component_name not in self.skip_components:
raise NameError("unknown component: %s" % component_name) raise NameError("unknown component: %s" % component_name)
if not callable(cb): if not callable(cb):
raise ValueError("%s is not callable" % cb) raise ValueError("%s is not callable" % cb)

View file

@ -42,7 +42,6 @@ HEADERS_COMPONENT = "blockchain_headers"
WALLET_COMPONENT = "wallet" WALLET_COMPONENT = "wallet"
DHT_COMPONENT = "dht" DHT_COMPONENT = "dht"
HASH_ANNOUNCER_COMPONENT = "hash_announcer" HASH_ANNOUNCER_COMPONENT = "hash_announcer"
STREAM_IDENTIFIER_COMPONENT = "stream_identifier"
FILE_MANAGER_COMPONENT = "file_manager" FILE_MANAGER_COMPONENT = "file_manager"
PEER_PROTOCOL_SERVER_COMPONENT = "peer_protocol_server" PEER_PROTOCOL_SERVER_COMPONENT = "peer_protocol_server"
REFLECTOR_COMPONENT = "reflector" REFLECTOR_COMPONENT = "reflector"
@ -366,7 +365,7 @@ class WalletComponent(Component):
class BlobComponent(Component): class BlobComponent(Component):
component_name = BLOB_COMPONENT component_name = BLOB_COMPONENT
depends_on = [DATABASE_COMPONENT, DHT_COMPONENT] depends_on = [DATABASE_COMPONENT]
def __init__(self, component_manager): def __init__(self, component_manager):
super().__init__(component_manager) super().__init__(component_manager)
@ -378,8 +377,12 @@ class BlobComponent(Component):
def start(self): def start(self):
storage = self.component_manager.get_component(DATABASE_COMPONENT) storage = self.component_manager.get_component(DATABASE_COMPONENT)
datastore = None
if DHT_COMPONENT not in self.component_manager.skip_components:
dht_node = self.component_manager.get_component(DHT_COMPONENT) dht_node = self.component_manager.get_component(DHT_COMPONENT)
self.blob_manager = DiskBlobManager(CS.get_blobfiles_dir(), storage, dht_node._dataStore) if dht_node:
datastore = dht_node._dataStore
self.blob_manager = DiskBlobManager(CS.get_blobfiles_dir(), storage, datastore)
return self.blob_manager.setup() return self.blob_manager.setup()
def stop(self): def stop(self):
@ -439,11 +442,7 @@ class DHTComponent(Component):
peerPort=self.external_peer_port peerPort=self.external_peer_port
) )
self.dht_node.start_listening() yield self.dht_node.start(GCS('known_dht_nodes'))
yield self.dht_node._protocol._listening
d = self.dht_node.joinNetwork(GCS('known_dht_nodes'))
d.addCallback(lambda _: self.dht_node.start_looping_calls())
d.addCallback(lambda _: log.info("Joined the dht"))
log.info("Started the dht") log.info("Started the dht")
@defer.inlineCallbacks @defer.inlineCallbacks
@ -500,41 +499,6 @@ class RateLimiterComponent(Component):
return defer.succeed(None) return defer.succeed(None)
class StreamIdentifierComponent(Component):
component_name = STREAM_IDENTIFIER_COMPONENT
depends_on = [DHT_COMPONENT, RATE_LIMITER_COMPONENT, BLOB_COMPONENT, DATABASE_COMPONENT, WALLET_COMPONENT]
def __init__(self, component_manager):
super().__init__(component_manager)
self.sd_identifier = StreamDescriptorIdentifier()
@property
def component(self):
return self.sd_identifier
@defer.inlineCallbacks
def start(self):
dht_node = self.component_manager.get_component(DHT_COMPONENT)
rate_limiter = self.component_manager.get_component(RATE_LIMITER_COMPONENT)
blob_manager = self.component_manager.get_component(BLOB_COMPONENT)
storage = self.component_manager.get_component(DATABASE_COMPONENT)
wallet = self.component_manager.get_component(WALLET_COMPONENT)
add_lbry_file_to_sd_identifier(self.sd_identifier)
file_saver_factory = EncryptedFileSaverFactory(
dht_node.peer_finder,
rate_limiter,
blob_manager,
storage,
wallet,
GCS('download_directory')
)
yield self.sd_identifier.add_stream_downloader_factory(EncryptedFileStreamType, file_saver_factory)
def stop(self):
pass
class PaymentRateComponent(Component): class PaymentRateComponent(Component):
component_name = PAYMENT_RATE_COMPONENT component_name = PAYMENT_RATE_COMPONENT
@ -555,8 +519,8 @@ class PaymentRateComponent(Component):
class FileManagerComponent(Component): class FileManagerComponent(Component):
component_name = FILE_MANAGER_COMPONENT component_name = FILE_MANAGER_COMPONENT
depends_on = [DHT_COMPONENT, RATE_LIMITER_COMPONENT, BLOB_COMPONENT, DATABASE_COMPONENT, WALLET_COMPONENT, depends_on = [RATE_LIMITER_COMPONENT, BLOB_COMPONENT, DATABASE_COMPONENT, WALLET_COMPONENT,
STREAM_IDENTIFIER_COMPONENT, PAYMENT_RATE_COMPONENT] PAYMENT_RATE_COMPONENT]
def __init__(self, component_manager): def __init__(self, component_manager):
super().__init__(component_manager) super().__init__(component_manager)
@ -575,15 +539,26 @@ class FileManagerComponent(Component):
@defer.inlineCallbacks @defer.inlineCallbacks
def start(self): def start(self):
dht_node = self.component_manager.get_component(DHT_COMPONENT)
rate_limiter = self.component_manager.get_component(RATE_LIMITER_COMPONENT) rate_limiter = self.component_manager.get_component(RATE_LIMITER_COMPONENT)
blob_manager = self.component_manager.get_component(BLOB_COMPONENT) blob_manager = self.component_manager.get_component(BLOB_COMPONENT)
storage = self.component_manager.get_component(DATABASE_COMPONENT) storage = self.component_manager.get_component(DATABASE_COMPONENT)
wallet = self.component_manager.get_component(WALLET_COMPONENT) wallet = self.component_manager.get_component(WALLET_COMPONENT)
sd_identifier = self.component_manager.get_component(STREAM_IDENTIFIER_COMPONENT)
sd_identifier = StreamDescriptorIdentifier()
add_lbry_file_to_sd_identifier(sd_identifier)
file_saver_factory = EncryptedFileSaverFactory(
self.component_manager.peer_finder,
rate_limiter,
blob_manager,
storage,
wallet,
GCS('download_directory')
)
yield sd_identifier.add_stream_downloader_factory(EncryptedFileStreamType, file_saver_factory)
payment_rate_manager = self.component_manager.get_component(PAYMENT_RATE_COMPONENT) payment_rate_manager = self.component_manager.get_component(PAYMENT_RATE_COMPONENT)
log.info('Starting the file manager') log.info('Starting the file manager')
self.file_manager = EncryptedFileManager(dht_node.peer_finder, rate_limiter, blob_manager, wallet, self.file_manager = EncryptedFileManager(self.component_manager.peer_finder, rate_limiter, blob_manager, wallet,
payment_rate_manager, storage, sd_identifier) payment_rate_manager, storage, sd_identifier)
yield self.file_manager.setup() yield self.file_manager.setup()
log.info('Done setting up file manager') log.info('Done setting up file manager')
@ -595,7 +570,7 @@ class FileManagerComponent(Component):
class PeerProtocolServerComponent(Component): class PeerProtocolServerComponent(Component):
component_name = PEER_PROTOCOL_SERVER_COMPONENT component_name = PEER_PROTOCOL_SERVER_COMPONENT
depends_on = [UPNP_COMPONENT, DHT_COMPONENT, RATE_LIMITER_COMPONENT, BLOB_COMPONENT, WALLET_COMPONENT, depends_on = [UPNP_COMPONENT, RATE_LIMITER_COMPONENT, BLOB_COMPONENT, WALLET_COMPONENT,
PAYMENT_RATE_COMPONENT] PAYMENT_RATE_COMPONENT]
def __init__(self, component_manager): def __init__(self, component_manager):
@ -624,7 +599,7 @@ class PeerProtocolServerComponent(Component):
} }
server_factory = ServerProtocolFactory( server_factory = ServerProtocolFactory(
self.component_manager.get_component(RATE_LIMITER_COMPONENT), query_handlers, self.component_manager.get_component(RATE_LIMITER_COMPONENT), query_handlers,
self.component_manager.get_component(DHT_COMPONENT).peer_manager self.component_manager.peer_manager
) )
try: try:
@ -648,7 +623,7 @@ class PeerProtocolServerComponent(Component):
class ReflectorComponent(Component): class ReflectorComponent(Component):
component_name = REFLECTOR_COMPONENT component_name = REFLECTOR_COMPONENT
depends_on = [DHT_COMPONENT, BLOB_COMPONENT, FILE_MANAGER_COMPONENT] depends_on = [BLOB_COMPONENT, FILE_MANAGER_COMPONENT]
def __init__(self, component_manager): def __init__(self, component_manager):
super().__init__(component_manager) super().__init__(component_manager)
@ -662,10 +637,9 @@ class ReflectorComponent(Component):
@defer.inlineCallbacks @defer.inlineCallbacks
def start(self): def start(self):
log.info("Starting reflector server") log.info("Starting reflector server")
dht_node = self.component_manager.get_component(DHT_COMPONENT)
blob_manager = self.component_manager.get_component(BLOB_COMPONENT) blob_manager = self.component_manager.get_component(BLOB_COMPONENT)
file_manager = self.component_manager.get_component(FILE_MANAGER_COMPONENT) file_manager = self.component_manager.get_component(FILE_MANAGER_COMPONENT)
reflector_factory = reflector_server_factory(dht_node.peer_manager, blob_manager, file_manager) reflector_factory = reflector_server_factory(self.component_manager.peer_manager, blob_manager, file_manager)
try: try:
self.reflector_server = yield reactor.listenTCP(self.reflector_server_port, reflector_factory) self.reflector_server = yield reactor.listenTCP(self.reflector_server_port, reflector_factory)
log.info('Started reflector on port %s', self.reflector_server_port) log.info('Started reflector on port %s', self.reflector_server_port)
@ -701,10 +675,12 @@ class UPnPComponent(Component):
@defer.inlineCallbacks @defer.inlineCallbacks
def _setup_redirects(self): def _setup_redirects(self):
upnp_redirects = yield DeferredDict({ d = {}
"UDP": from_future(self.upnp.get_next_mapping(self._int_dht_node_port, "UDP", "LBRY DHT port")), if PEER_PROTOCOL_SERVER_COMPONENT not in self.component_manager.skip_components:
"TCP": from_future(self.upnp.get_next_mapping(self._int_peer_port, "TCP", "LBRY peer port")) d["TCP"] = from_future(self.upnp.get_next_mapping(self._int_peer_port, "TCP", "LBRY peer port"))
}) if DHT_COMPONENT not in self.component_manager.skip_components:
d["UDP"] = from_future(self.upnp.get_next_mapping(self._int_dht_node_port, "UDP", "LBRY DHT port"))
upnp_redirects = yield DeferredDict(d)
self.upnp_redirects.update(upnp_redirects) self.upnp_redirects.update(upnp_redirects)
@defer.inlineCallbacks @defer.inlineCallbacks
@ -739,10 +715,12 @@ class UPnPComponent(Component):
if not self.upnp_redirects and self.upnp: # setup missing redirects if not self.upnp_redirects and self.upnp: # setup missing redirects
try: try:
log.info("add UPnP port mappings") log.info("add UPnP port mappings")
upnp_redirects = yield DeferredDict({ d = {}
"UDP": from_future(self.upnp.get_next_mapping(self._int_dht_node_port, "UDP", "LBRY DHT port")), if PEER_PROTOCOL_SERVER_COMPONENT not in self.component_manager.skip_components:
"TCP": from_future(self.upnp.get_next_mapping(self._int_peer_port, "TCP", "LBRY peer port")) d["TCP"] = from_future(self.upnp.get_next_mapping(self._int_peer_port, "TCP", "LBRY peer port"))
}) if DHT_COMPONENT not in self.component_manager.skip_components:
d["UDP"] = from_future(self.upnp.get_next_mapping(self._int_dht_node_port, "UDP", "LBRY DHT port"))
upnp_redirects = yield DeferredDict(d)
log.info("set up redirects: %s", upnp_redirects) log.info("set up redirects: %s", upnp_redirects)
self.upnp_redirects.update(upnp_redirects) self.upnp_redirects.update(upnp_redirects)
except (asyncio.TimeoutError, UPnPError): except (asyncio.TimeoutError, UPnPError):
@ -756,7 +734,7 @@ class UPnPComponent(Component):
if proto in self.upnp_redirects and mapping['NewExternalPort'] == self.upnp_redirects[proto]: if proto in self.upnp_redirects and mapping['NewExternalPort'] == self.upnp_redirects[proto]:
if mapping['NewInternalClient'] == self.upnp.lan_address: if mapping['NewInternalClient'] == self.upnp.lan_address:
found.add(proto) found.add(proto)
if 'UDP' not in found: if 'UDP' not in found and DHT_COMPONENT not in self.component_manager.skip_components:
try: try:
udp_port = yield from_future( udp_port = yield from_future(
self.upnp.get_next_mapping(self._int_dht_node_port, "UDP", "LBRY DHT port") self.upnp.get_next_mapping(self._int_dht_node_port, "UDP", "LBRY DHT port")
@ -765,7 +743,7 @@ class UPnPComponent(Component):
log.info("refreshed upnp redirect for dht port: %i", udp_port) log.info("refreshed upnp redirect for dht port: %i", udp_port)
except (asyncio.TimeoutError, UPnPError): except (asyncio.TimeoutError, UPnPError):
del self.upnp_redirects['UDP'] del self.upnp_redirects['UDP']
if 'TCP' not in found: if 'TCP' not in found and PEER_PROTOCOL_SERVER_COMPONENT not in self.component_manager.skip_components:
try: try:
tcp_port = yield from_future( tcp_port = yield from_future(
self.upnp.get_next_mapping(self._int_peer_port, "TCP", "LBRY peer port") self.upnp.get_next_mapping(self._int_peer_port, "TCP", "LBRY peer port")
@ -774,7 +752,10 @@ class UPnPComponent(Component):
log.info("refreshed upnp redirect for peer port: %i", tcp_port) log.info("refreshed upnp redirect for peer port: %i", tcp_port)
except (asyncio.TimeoutError, UPnPError): except (asyncio.TimeoutError, UPnPError):
del self.upnp_redirects['TCP'] del self.upnp_redirects['TCP']
if 'TCP' in self.upnp_redirects and 'UDP' in self.upnp_redirects: if ('TCP' in self.upnp_redirects
and PEER_PROTOCOL_SERVER_COMPONENT not in self.component_manager.skip_components) and (
'UDP' in self.upnp_redirects and DHT_COMPONENT not in self.component_manager.skip_components):
if self.upnp_redirects:
log.debug("upnp redirects are still active") log.debug("upnp redirects are still active")
@defer.inlineCallbacks @defer.inlineCallbacks
@ -786,10 +767,12 @@ class UPnPComponent(Component):
success = False success = False
yield self._maintain_redirects() yield self._maintain_redirects()
if self.upnp: if self.upnp:
if not self.upnp_redirects: if not self.upnp_redirects and not all([x in self.component_manager.skip_components for x in
(DHT_COMPONENT, PEER_PROTOCOL_SERVER_COMPONENT)]):
log.error("failed to setup upnp, debugging infomation: %s", self.upnp.zipped_debugging_info) log.error("failed to setup upnp, debugging infomation: %s", self.upnp.zipped_debugging_info)
else: else:
success = True success = True
if self.upnp_redirects:
log.debug("set up upnp port redirects for gateway: %s", self.upnp.gateway.manufacturer_string) log.debug("set up upnp port redirects for gateway: %s", self.upnp.gateway.manufacturer_string)
else: else:
log.error("failed to setup upnp") log.error("failed to setup upnp")

View file

@ -210,7 +210,6 @@ class Daemon(AuthJSONRPCServer):
DATABASE_COMPONENT: "storage", DATABASE_COMPONENT: "storage",
DHT_COMPONENT: "dht_node", DHT_COMPONENT: "dht_node",
WALLET_COMPONENT: "wallet_manager", WALLET_COMPONENT: "wallet_manager",
STREAM_IDENTIFIER_COMPONENT: "sd_identifier",
FILE_MANAGER_COMPONENT: "file_manager", FILE_MANAGER_COMPONENT: "file_manager",
EXCHANGE_RATE_MANAGER_COMPONENT: "exchange_rate_manager", EXCHANGE_RATE_MANAGER_COMPONENT: "exchange_rate_manager",
PAYMENT_RATE_COMPONENT: "payment_rate_manager", PAYMENT_RATE_COMPONENT: "payment_rate_manager",
@ -241,7 +240,6 @@ class Daemon(AuthJSONRPCServer):
self.storage = None self.storage = None
self.dht_node = None self.dht_node = None
self.wallet_manager: LbryWalletManager = None self.wallet_manager: LbryWalletManager = None
self.sd_identifier = None
self.file_manager = None self.file_manager = None
self.exchange_rate_manager = None self.exchange_rate_manager = None
self.payment_rate_manager = None self.payment_rate_manager = None
@ -305,7 +303,7 @@ class Daemon(AuthJSONRPCServer):
rate_manager = rate_manager or self.payment_rate_manager rate_manager = rate_manager or self.payment_rate_manager
timeout = timeout or 30 timeout = timeout or 30
downloader = StandaloneBlobDownloader( downloader = StandaloneBlobDownloader(
blob_hash, self.blob_manager, self.dht_node.peer_finder, self.rate_limiter, blob_hash, self.blob_manager, self.component_manager.peer_finder, self.rate_limiter,
rate_manager, self.wallet_manager, timeout rate_manager, self.wallet_manager, timeout
) )
return downloader.download() return downloader.download()
@ -372,8 +370,8 @@ class Daemon(AuthJSONRPCServer):
self.analytics_manager.send_download_started(download_id, name, claim_dict) self.analytics_manager.send_download_started(download_id, name, claim_dict)
self.analytics_manager.send_new_download_start(download_id, name, claim_dict) self.analytics_manager.send_new_download_start(download_id, name, claim_dict)
self.streams[sd_hash] = GetStream( self.streams[sd_hash] = GetStream(
self.sd_identifier, self.wallet_manager, self.exchange_rate_manager, self.blob_manager, self.file_manager.sd_identifier, self.wallet_manager, self.exchange_rate_manager, self.blob_manager,
self.dht_node.peer_finder, self.rate_limiter, self.payment_rate_manager, self.storage, self.component_manager.peer_finder, self.rate_limiter, self.payment_rate_manager, self.storage,
conf.settings['max_key_fee'], conf.settings['disable_max_key_fee'], conf.settings['data_rate'], conf.settings['max_key_fee'], conf.settings['disable_max_key_fee'], conf.settings['data_rate'],
timeout timeout
) )
@ -432,7 +430,7 @@ class Daemon(AuthJSONRPCServer):
if blob: if blob:
return self.blob_manager.get_blob(blob[0]) return self.blob_manager.get_blob(blob[0])
return download_sd_blob( return download_sd_blob(
sd_hash.decode(), self.blob_manager, self.dht_node.peer_finder, self.rate_limiter, sd_hash.decode(), self.blob_manager, self.component_manager.peer_finder, self.rate_limiter,
self.payment_rate_manager, self.wallet_manager, timeout=conf.settings['peer_search_timeout'], self.payment_rate_manager, self.wallet_manager, timeout=conf.settings['peer_search_timeout'],
download_mirrors=conf.settings['download_mirrors'] download_mirrors=conf.settings['download_mirrors']
) )
@ -819,7 +817,7 @@ class Daemon(AuthJSONRPCServer):
'ip': (str) remote ip, if available, 'ip': (str) remote ip, if available,
'lbrynet_version': (str) lbrynet_version, 'lbrynet_version': (str) lbrynet_version,
'lbryum_version': (str) lbryum_version, 'lbryum_version': (str) lbryum_version,
'lbrynet.schema_version': (str) lbrynet.schema_version, 'lbryschema_version': (str) lbryschema_version,
'os_release': (str) os release string 'os_release': (str) os release string
'os_system': (str) os name 'os_system': (str) os name
'platform': (str) platform string 'platform': (str) platform string
@ -1776,8 +1774,8 @@ class Daemon(AuthJSONRPCServer):
results[resolved_uri] = resolved[resolved_uri] results[resolved_uri] = resolved[resolved_uri]
return results return results
@requires(STREAM_IDENTIFIER_COMPONENT, WALLET_COMPONENT, EXCHANGE_RATE_MANAGER_COMPONENT, BLOB_COMPONENT, @requires(WALLET_COMPONENT, EXCHANGE_RATE_MANAGER_COMPONENT, BLOB_COMPONENT,
DHT_COMPONENT, RATE_LIMITER_COMPONENT, PAYMENT_RATE_COMPONENT, DATABASE_COMPONENT, RATE_LIMITER_COMPONENT, PAYMENT_RATE_COMPONENT, DATABASE_COMPONENT,
conditions=[WALLET_IS_UNLOCKED]) conditions=[WALLET_IS_UNLOCKED])
async def jsonrpc_get(self, uri, file_name=None, timeout=None): async def jsonrpc_get(self, uri, file_name=None, timeout=None):
""" """
@ -1962,7 +1960,7 @@ class Daemon(AuthJSONRPCServer):
response = yield self._render_response(result) response = yield self._render_response(result)
defer.returnValue(response) defer.returnValue(response)
@requires(STREAM_IDENTIFIER_COMPONENT, WALLET_COMPONENT, EXCHANGE_RATE_MANAGER_COMPONENT, BLOB_COMPONENT, @requires(WALLET_COMPONENT, EXCHANGE_RATE_MANAGER_COMPONENT, BLOB_COMPONENT,
DHT_COMPONENT, RATE_LIMITER_COMPONENT, PAYMENT_RATE_COMPONENT, DATABASE_COMPONENT, DHT_COMPONENT, RATE_LIMITER_COMPONENT, PAYMENT_RATE_COMPONENT, DATABASE_COMPONENT,
conditions=[WALLET_IS_UNLOCKED]) conditions=[WALLET_IS_UNLOCKED])
def jsonrpc_stream_cost_estimate(self, uri, size=None): def jsonrpc_stream_cost_estimate(self, uri, size=None):

View file

@ -3,10 +3,10 @@ from twisted.trial import unittest
from io import StringIO from io import StringIO
from twisted.internet import defer from twisted.internet import defer
from lbrynet.extras.daemon import conf from lbrynet import conf
from lbrynet.extras import cli from lbrynet import cli
from lbrynet.extras.daemon.Components import DATABASE_COMPONENT, BLOB_COMPONENT, HEADERS_COMPONENT, WALLET_COMPONENT, \ from lbrynet.extras.daemon.Components import DATABASE_COMPONENT, BLOB_COMPONENT, HEADERS_COMPONENT, WALLET_COMPONENT, \
DHT_COMPONENT, HASH_ANNOUNCER_COMPONENT, STREAM_IDENTIFIER_COMPONENT, FILE_MANAGER_COMPONENT, \ DHT_COMPONENT, HASH_ANNOUNCER_COMPONENT, FILE_MANAGER_COMPONENT, \
PEER_PROTOCOL_SERVER_COMPONENT, REFLECTOR_COMPONENT, UPNP_COMPONENT, EXCHANGE_RATE_MANAGER_COMPONENT, \ PEER_PROTOCOL_SERVER_COMPONENT, REFLECTOR_COMPONENT, UPNP_COMPONENT, EXCHANGE_RATE_MANAGER_COMPONENT, \
RATE_LIMITER_COMPONENT, PAYMENT_RATE_COMPONENT RATE_LIMITER_COMPONENT, PAYMENT_RATE_COMPONENT
from lbrynet.extras.daemon.Daemon import Daemon from lbrynet.extras.daemon.Daemon import Daemon
@ -35,7 +35,7 @@ class CLIIntegrationTest(unittest.TestCase):
def setUp(self): def setUp(self):
skip = [ skip = [
DATABASE_COMPONENT, BLOB_COMPONENT, HEADERS_COMPONENT, WALLET_COMPONENT, DATABASE_COMPONENT, BLOB_COMPONENT, HEADERS_COMPONENT, WALLET_COMPONENT,
DHT_COMPONENT, HASH_ANNOUNCER_COMPONENT, STREAM_IDENTIFIER_COMPONENT, FILE_MANAGER_COMPONENT, DHT_COMPONENT, HASH_ANNOUNCER_COMPONENT, FILE_MANAGER_COMPONENT,
PEER_PROTOCOL_SERVER_COMPONENT, REFLECTOR_COMPONENT, UPNP_COMPONENT, EXCHANGE_RATE_MANAGER_COMPONENT, PEER_PROTOCOL_SERVER_COMPONENT, REFLECTOR_COMPONENT, UPNP_COMPONENT, EXCHANGE_RATE_MANAGER_COMPONENT,
RATE_LIMITER_COMPONENT, PAYMENT_RATE_COMPONENT RATE_LIMITER_COMPONENT, PAYMENT_RATE_COMPONENT
] ]

View file

@ -2,7 +2,7 @@ from twisted.internet.task import Clock
from twisted.trial import unittest from twisted.trial import unittest
from lbrynet.extras.daemon.ComponentManager import ComponentManager from lbrynet.extras.daemon.ComponentManager import ComponentManager
from lbrynet.extras.daemon.Components import DATABASE_COMPONENT, DHT_COMPONENT, STREAM_IDENTIFIER_COMPONENT from lbrynet.extras.daemon.Components import DATABASE_COMPONENT, DHT_COMPONENT
from lbrynet.extras.daemon.Components import HASH_ANNOUNCER_COMPONENT, REFLECTOR_COMPONENT, UPNP_COMPONENT from lbrynet.extras.daemon.Components import HASH_ANNOUNCER_COMPONENT, REFLECTOR_COMPONENT, UPNP_COMPONENT
from lbrynet.extras.daemon.Components import PEER_PROTOCOL_SERVER_COMPONENT, EXCHANGE_RATE_MANAGER_COMPONENT from lbrynet.extras.daemon.Components import PEER_PROTOCOL_SERVER_COMPONENT, EXCHANGE_RATE_MANAGER_COMPONENT
from lbrynet.extras.daemon.Components import RATE_LIMITER_COMPONENT, HEADERS_COMPONENT, PAYMENT_RATE_COMPONENT from lbrynet.extras.daemon.Components import RATE_LIMITER_COMPONENT, HEADERS_COMPONENT, PAYMENT_RATE_COMPONENT
@ -13,6 +13,7 @@ from tests import mocks
class TestComponentManager(unittest.TestCase): class TestComponentManager(unittest.TestCase):
def setUp(self): def setUp(self):
mocks.mock_conf_settings(self) mocks.mock_conf_settings(self)
self.default_components_sort = [ self.default_components_sort = [
[ [
Components.HeadersComponent, Components.HeadersComponent,
@ -23,19 +24,14 @@ class TestComponentManager(unittest.TestCase):
Components.UPnPComponent Components.UPnPComponent
], ],
[ [
Components.BlobComponent,
Components.DHTComponent, Components.DHTComponent,
Components.WalletComponent Components.WalletComponent
], ],
[ [
Components.BlobComponent, Components.FileManagerComponent,
Components.HashAnnouncerComponent Components.HashAnnouncerComponent,
], Components.PeerProtocolServerComponent
[
Components.PeerProtocolServerComponent,
Components.StreamIdentifierComponent
],
[
Components.FileManagerComponent
], ],
[ [
Components.ReflectorComponent Components.ReflectorComponent
@ -48,7 +44,6 @@ class TestComponentManager(unittest.TestCase):
def test_sort_components(self): def test_sort_components(self):
stages = self.component_manager.sort_components() stages = self.component_manager.sort_components()
for stage_list, sorted_stage_list in zip(stages, self.default_components_sort): for stage_list, sorted_stage_list in zip(stages, self.default_components_sort):
self.assertEqual([type(stage) for stage in stage_list], sorted_stage_list) self.assertEqual([type(stage) for stage in stage_list], sorted_stage_list)
@ -101,7 +96,7 @@ class TestComponentManagerProperStart(unittest.TestCase):
self.reactor = Clock() self.reactor = Clock()
mocks.mock_conf_settings(self) mocks.mock_conf_settings(self)
self.component_manager = ComponentManager( self.component_manager = ComponentManager(
skip_components=[DATABASE_COMPONENT, DHT_COMPONENT, HASH_ANNOUNCER_COMPONENT, STREAM_IDENTIFIER_COMPONENT, skip_components=[DATABASE_COMPONENT, DHT_COMPONENT, HASH_ANNOUNCER_COMPONENT,
PEER_PROTOCOL_SERVER_COMPONENT, REFLECTOR_COMPONENT, UPNP_COMPONENT, PEER_PROTOCOL_SERVER_COMPONENT, REFLECTOR_COMPONENT, UPNP_COMPONENT,
HEADERS_COMPONENT, PAYMENT_RATE_COMPONENT, RATE_LIMITER_COMPONENT, HEADERS_COMPONENT, PAYMENT_RATE_COMPONENT, RATE_LIMITER_COMPONENT,
EXCHANGE_RATE_MANAGER_COMPONENT], EXCHANGE_RATE_MANAGER_COMPONENT],

View file

@ -12,7 +12,7 @@ from lbrynet.schema.decode import smart_decode
from lbrynet.extras.daemon import conf from lbrynet.extras.daemon import conf
from lbrynet.extras.daemon.storage import SQLiteStorage from lbrynet.extras.daemon.storage import SQLiteStorage
from lbrynet.extras.daemon.ComponentManager import ComponentManager from lbrynet.extras.daemon.ComponentManager import ComponentManager
from lbrynet.extras.daemon.Components import DATABASE_COMPONENT, DHT_COMPONENT, WALLET_COMPONENT, STREAM_IDENTIFIER_COMPONENT from lbrynet.extras.daemon.Components import DATABASE_COMPONENT, DHT_COMPONENT, WALLET_COMPONENT
from lbrynet.extras.daemon.Components import f2d from lbrynet.extras.daemon.Components import f2d
from lbrynet.extras.daemon.Components import HASH_ANNOUNCER_COMPONENT, REFLECTOR_COMPONENT, UPNP_COMPONENT, BLOB_COMPONENT from lbrynet.extras.daemon.Components import HASH_ANNOUNCER_COMPONENT, REFLECTOR_COMPONENT, UPNP_COMPONENT, BLOB_COMPONENT
from lbrynet.extras.daemon.Components import PEER_PROTOCOL_SERVER_COMPONENT, EXCHANGE_RATE_MANAGER_COMPONENT from lbrynet.extras.daemon.Components import PEER_PROTOCOL_SERVER_COMPONENT, EXCHANGE_RATE_MANAGER_COMPONENT
@ -45,7 +45,7 @@ def get_test_daemon(data_rate=None, generous=True, with_fee=False):
component_manager = ComponentManager( component_manager = ComponentManager(
skip_components=[DATABASE_COMPONENT, DHT_COMPONENT, WALLET_COMPONENT, UPNP_COMPONENT, skip_components=[DATABASE_COMPONENT, DHT_COMPONENT, WALLET_COMPONENT, UPNP_COMPONENT,
PEER_PROTOCOL_SERVER_COMPONENT, REFLECTOR_COMPONENT, HASH_ANNOUNCER_COMPONENT, PEER_PROTOCOL_SERVER_COMPONENT, REFLECTOR_COMPONENT, HASH_ANNOUNCER_COMPONENT,
STREAM_IDENTIFIER_COMPONENT, EXCHANGE_RATE_MANAGER_COMPONENT, BLOB_COMPONENT, EXCHANGE_RATE_MANAGER_COMPONENT, BLOB_COMPONENT,
HEADERS_COMPONENT, RATE_LIMITER_COMPONENT], HEADERS_COMPONENT, RATE_LIMITER_COMPONENT],
file_manager=FakeFileManager file_manager=FakeFileManager
) )