From a3de065c93a7796cee57c147e25b2ccfd23abb7f Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Mon, 30 Jul 2018 17:58:17 -0400 Subject: [PATCH] use txupnp instead of miniupnpc --- lbrynet/core/Wallet.py | 1 - lbrynet/core/log_support.py | 1 + lbrynet/daemon/Components.py | 187 ++++++++++++---------------------- lbrynet/daemon/auth/server.py | 7 +- setup.py | 3 +- 5 files changed, 74 insertions(+), 125 deletions(-) diff --git a/lbrynet/core/Wallet.py b/lbrynet/core/Wallet.py index eba48ed0f..338232a5f 100644 --- a/lbrynet/core/Wallet.py +++ b/lbrynet/core/Wallet.py @@ -1,4 +1,3 @@ -import os from collections import defaultdict, deque import datetime import logging diff --git a/lbrynet/core/log_support.py b/lbrynet/core/log_support.py index add93ea84..7b192136f 100644 --- a/lbrynet/core/log_support.py +++ b/lbrynet/core/log_support.py @@ -89,6 +89,7 @@ def disable_third_party_loggers(): logging.getLogger('BitcoinRPC').setLevel(logging.INFO) logging.getLogger('lbryum').setLevel(logging.WARNING) logging.getLogger('twisted').setLevel(logging.CRITICAL) + logging.getLogger('txupnp').setLevel(logging.WARNING) @_log_decorator diff --git a/lbrynet/daemon/Components.py b/lbrynet/daemon/Components.py index 1de589cf8..8b33e4909 100644 --- a/lbrynet/daemon/Components.py +++ b/lbrynet/daemon/Components.py @@ -1,13 +1,15 @@ import os import logging from hashlib import sha256 -import miniupnpc import treq import math from twisted.internet import defer, threads, reactor, error +from txupnp.fault import UPnPError +from txupnp.upnp import UPnP from lbryum.simple_config import SimpleConfig from lbryum.constants import HEADERS_URL, HEADER_SIZE from lbrynet import conf +from lbrynet.core.utils import DeferredDict from lbrynet.core.PaymentRateManager import OnlyFreePaymentsManager from lbrynet.core.RateLimiter import RateLimiter from lbrynet.core.BlobManager import DiskBlobManager @@ -24,7 +26,6 @@ from lbrynet.lbry_file.client.EncryptedFileDownloader import EncryptedFileSaverF from lbrynet.lbry_file.client.EncryptedFileOptions import add_lbry_file_to_sd_identifier from lbrynet.reflector import ServerFactory as reflector_server_factory from lbrynet.txlbryum.factory import StratumClient - from lbrynet.core.utils import generate_id log = logging.getLogger(__name__) @@ -245,7 +246,6 @@ class HeadersComponent(Component): @defer.inlineCallbacks def should_download_headers_from_s3(self): - from lbrynet import conf if conf.settings['blockchain_name'] != "lbrycrd_main": defer.returnValue(False) self._check_header_file_integrity() @@ -266,7 +266,6 @@ class HeadersComponent(Component): def _check_header_file_integrity(self): # TODO: temporary workaround for usability. move to txlbryum and check headers instead of file integrity - from lbrynet import conf if conf.settings['blockchain_name'] != "lbrycrd_main": return hashsum = sha256() @@ -370,7 +369,8 @@ class DHTComponent(Component): Component.__init__(self, component_manager) self.dht_node = None self.upnp_component = None - self.udp_port, self.peer_port = None, None + self.udp_port = None + self.peer_port = None @property def component(self): @@ -561,42 +561,33 @@ class PeerProtocolServerComponent(Component): @defer.inlineCallbacks def start(self): - query_handlers = {} - upnp_component = self.component_manager.get_component(UPNP_COMPONENT) - dht_node = self.component_manager.get_component(DHT_COMPONENT) - rate_limiter = self.component_manager.get_component(RATE_LIMITER_COMPONENT) - blob_manager = self.component_manager.get_component(BLOB_COMPONENT) wallet = self.component_manager.get_component(WALLET_COMPONENT) - payment_rate_manager = self.component_manager.get_component(PAYMENT_RATE_COMPONENT) + peer_port = self.component_manager.get_component(UPNP_COMPONENT).upnp_redirects["TCP"] + query_handlers = { + handler.get_primary_query_identifier(): handler for handler in [ + BlobRequestHandlerFactory( + self.component_manager.get_component(BLOB_COMPONENT), + wallet, + self.component_manager.get_component(PAYMENT_RATE_COMPONENT), + self.component_manager.analytics_manager + ), + wallet.get_wallet_info_query_handler_factory(), + ] + } + server_factory = ServerProtocolFactory( + self.component_manager.get_component(RATE_LIMITER_COMPONENT), query_handlers, + self.component_manager.get_component(DHT_COMPONENT).peer_manager + ) - peer_port, udp_port = upnp_component.get_redirects() - - handlers = [ - BlobRequestHandlerFactory( - blob_manager, - wallet, - payment_rate_manager, - self.component_manager.analytics_manager - ), - wallet.get_wallet_info_query_handler_factory(), - ] - - for handler in handlers: - query_id = handler.get_primary_query_identifier() - query_handlers[query_id] = handler - - if peer_port is not None: - server_factory = ServerProtocolFactory(rate_limiter, query_handlers, dht_node.peer_manager) - - try: - log.info("Peer protocol listening on TCP %d", peer_port) - self.lbry_server_port = yield reactor.listenTCP(peer_port, server_factory) - except error.CannotListenError as e: - import traceback - log.error("Couldn't bind to port %d. Visit lbry.io/faq/how-to-change-port for" - " more details.", peer_port) - log.error("%s", traceback.format_exc()) - raise ValueError("%s lbrynet may already be running on your computer." % str(e)) + try: + log.info("Peer protocol listening on TCP %d", peer_port) + self.lbry_server_port = yield reactor.listenTCP(peer_port, server_factory) + except error.CannotListenError as e: + import traceback + log.error("Couldn't bind to port %d. Visit lbry.io/faq/how-to-change-port for" + " more details.", peer_port) + log.error("%s", traceback.format_exc()) + raise ValueError("%s lbrynet may already be running on your computer." % str(e)) @defer.inlineCallbacks def stop(self): @@ -646,100 +637,54 @@ class UPnPComponent(Component): def __init__(self, component_manager): Component.__init__(self, component_manager) - self.peer_port = GCS('peer_port') - self.dht_node_port = GCS('dht_node_port') + self._default_peer_port = GCS('peer_port') + self._default_dht_node_port = GCS('dht_node_port') self.use_upnp = GCS('use_upnp') - self.external_ip = CS.get_external_ip() - self.upnp_redirects = [] + self.external_ip = None + self.upnp = UPnP(self.component_manager.reactor, try_miniupnpc_fallback=True) + self.upnp_redirects = {} @property def component(self): return self def get_redirects(self): - return self.peer_port, self.dht_node_port + if not self.use_upnp or not self.upnp_redirects: + return self._default_peer_port, self._default_dht_node_port + return self.upnp_redirects["TCP"], self.upnp_redirects["UDP"] + @defer.inlineCallbacks + def _setup_redirects(self): + self.external_ip = yield self.upnp.get_external_ip() + upnp_redirects = yield DeferredDict({ + "UDP": self.upnp.get_next_mapping(self._default_dht_node_port, "UDP", "LBRY DHT port"), + "TCP": self.upnp.get_next_mapping(self._default_peer_port, "TCP", "LBRY peer port") + }) + self.upnp_redirects.update(upnp_redirects) + + @defer.inlineCallbacks def start(self): log.debug("In _try_upnp") - - def get_free_port(upnp, port, protocol): - # returns an existing mapping if it exists - mapping = upnp.getspecificportmapping(port, protocol) - if not mapping: - return port - if upnp.lanaddr == mapping[0]: - return mapping[1] - return get_free_port(upnp, port + 1, protocol) - - def get_port_mapping(upnp, port, protocol, description): - # try to map to the requested port, if there is already a mapping use the next external - # port available - if protocol not in ['UDP', 'TCP']: - raise Exception("invalid protocol") - port = get_free_port(upnp, port, protocol) - if isinstance(port, tuple): - log.info("Found existing UPnP redirect %s:%i (%s) to %s:%i, using it", - self.external_ip, port, protocol, upnp.lanaddr, port) - return port - upnp.addportmapping(port, protocol, upnp.lanaddr, port, - description, '') - log.info("Set UPnP redirect %s:%i (%s) to %s:%i", self.external_ip, port, - protocol, upnp.lanaddr, port) - return port - - def threaded_try_upnp(): - if self.use_upnp is False: - log.debug("Not using upnp") - return False - u = miniupnpc.UPnP() - num_devices_found = u.discover() - if num_devices_found > 0: - u.selectigd() - external_ip = u.externalipaddress() - if external_ip != '0.0.0.0' and not self.external_ip: - # best not to rely on this external ip, the router can be behind layers of NATs - self.external_ip = external_ip - if self.peer_port: - self.peer_port = get_port_mapping(u, self.peer_port, 'TCP', 'LBRY peer port') - self.upnp_redirects.append((self.peer_port, 'TCP')) - if self.dht_node_port: - self.dht_node_port = get_port_mapping(u, self.dht_node_port, 'UDP', 'LBRY DHT port') - self.upnp_redirects.append((self.dht_node_port, 'UDP')) - return True - return False - - def upnp_failed(err): - log.warning("UPnP failed. Reason: %s", err.getErrorMessage()) - return False - - d = threads.deferToThread(threaded_try_upnp) - d.addErrback(upnp_failed) - return d + found = yield self.upnp.discover() + if found and not self.upnp.miniupnpc_runner: + log.info("set up redirects using txupnp") + elif found and self.upnp.miniupnpc_runner: + log.warning("failed to set up redirect with txupnp, miniupnpc fallback was successful") + if found: + try: + yield self._setup_redirects() + except Exception as err: + if not self.upnp.miniupnpc_runner: + started_fallback = yield self.upnp.start_miniupnpc_fallback() + if started_fallback: + yield self._setup_redirects() + else: + log.warning("failed to set up upnp redirects") def stop(self): - log.info("Unsetting upnp for session") - - def threaded_unset_upnp(): - if self.use_upnp is False: - log.debug("Not using upnp") - return False - u = miniupnpc.UPnP() - num_devices_found = u.discover() - if num_devices_found > 0: - u.selectigd() - for port, protocol in self.upnp_redirects: - if u.getspecificportmapping(port, protocol) is None: - log.warning( - "UPnP redirect for %s %d was removed by something else.", - protocol, port) - else: - u.deleteportmapping(port, protocol) - log.info("Removed UPnP redirect for %s %d.", protocol, port) - self.upnp_redirects = [] - - d = threads.deferToThread(threaded_unset_upnp) - d.addErrback(lambda err: str(err)) - return d + return defer.DeferredList( + [self.upnp.delete_port_mapping(port, protocol) for protocol, port in self.upnp_redirects.items()] + ) class ExchangeRateManagerComponent(Component): diff --git a/lbrynet/daemon/auth/server.py b/lbrynet/daemon/auth/server.py index db76a618c..4315c7d92 100644 --- a/lbrynet/daemon/auth/server.py +++ b/lbrynet/daemon/auth/server.py @@ -196,11 +196,14 @@ class AuthJSONRPCServer(AuthorizedBase): component_attributes = {} def __init__(self, analytics_manager=None, component_manager=None, use_authentication=None, to_skip=None, - looping_calls=None): + looping_calls=None, reactor=None): + if not reactor: + from twisted.internet import reactor self.analytics_manager = analytics_manager or analytics.Manager.new_instance() self.component_manager = component_manager or ComponentManager( analytics_manager=self.analytics_manager, - skip_components=to_skip or [] + skip_components=to_skip or [], + reactor=reactor ) self.looping_call_manager = LoopingCallManager({n: lc for n, (lc, t) in (looping_calls or {}).iteritems()}) self._looping_call_times = {n: t for n, (lc, t) in (looping_calls or {}).iteritems()} diff --git a/setup.py b/setup.py index e72f4a9d6..98d9b46c2 100644 --- a/setup.py +++ b/setup.py @@ -24,13 +24,14 @@ requires = [ 'lbryschema==0.0.16', 'lbryum==3.2.3', 'miniupnpc', + 'txupnp==0.0.1a6', 'pyyaml', 'requests', 'txJSON-RPC', 'zope.interface', 'treq', 'docopt', - 'six' + 'six', ] console_scripts = [