use txupnp instead of miniupnpc

This commit is contained in:
Jack Robison 2018-07-30 17:58:17 -04:00
parent 99207b7221
commit a3de065c93
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
5 changed files with 74 additions and 125 deletions

View file

@ -1,4 +1,3 @@
import os
from collections import defaultdict, deque from collections import defaultdict, deque
import datetime import datetime
import logging import logging

View file

@ -89,6 +89,7 @@ def disable_third_party_loggers():
logging.getLogger('BitcoinRPC').setLevel(logging.INFO) logging.getLogger('BitcoinRPC').setLevel(logging.INFO)
logging.getLogger('lbryum').setLevel(logging.WARNING) logging.getLogger('lbryum').setLevel(logging.WARNING)
logging.getLogger('twisted').setLevel(logging.CRITICAL) logging.getLogger('twisted').setLevel(logging.CRITICAL)
logging.getLogger('txupnp').setLevel(logging.WARNING)
@_log_decorator @_log_decorator

View file

@ -1,13 +1,15 @@
import os import os
import logging import logging
from hashlib import sha256 from hashlib import sha256
import miniupnpc
import treq import treq
import math import math
from twisted.internet import defer, threads, reactor, error from twisted.internet import defer, threads, reactor, error
from txupnp.fault import UPnPError
from txupnp.upnp import UPnP
from lbryum.simple_config import SimpleConfig from lbryum.simple_config import SimpleConfig
from lbryum.constants import HEADERS_URL, HEADER_SIZE from lbryum.constants import HEADERS_URL, HEADER_SIZE
from lbrynet import conf from lbrynet import conf
from lbrynet.core.utils import DeferredDict
from lbrynet.core.PaymentRateManager import OnlyFreePaymentsManager from lbrynet.core.PaymentRateManager import OnlyFreePaymentsManager
from lbrynet.core.RateLimiter import RateLimiter from lbrynet.core.RateLimiter import RateLimiter
from lbrynet.core.BlobManager import DiskBlobManager from lbrynet.core.BlobManager import DiskBlobManager
@ -24,7 +26,6 @@ from lbrynet.lbry_file.client.EncryptedFileDownloader import EncryptedFileSaverF
from lbrynet.lbry_file.client.EncryptedFileOptions import add_lbry_file_to_sd_identifier from lbrynet.lbry_file.client.EncryptedFileOptions import add_lbry_file_to_sd_identifier
from lbrynet.reflector import ServerFactory as reflector_server_factory from lbrynet.reflector import ServerFactory as reflector_server_factory
from lbrynet.txlbryum.factory import StratumClient from lbrynet.txlbryum.factory import StratumClient
from lbrynet.core.utils import generate_id from lbrynet.core.utils import generate_id
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -245,7 +246,6 @@ class HeadersComponent(Component):
@defer.inlineCallbacks @defer.inlineCallbacks
def should_download_headers_from_s3(self): def should_download_headers_from_s3(self):
from lbrynet import conf
if conf.settings['blockchain_name'] != "lbrycrd_main": if conf.settings['blockchain_name'] != "lbrycrd_main":
defer.returnValue(False) defer.returnValue(False)
self._check_header_file_integrity() self._check_header_file_integrity()
@ -266,7 +266,6 @@ class HeadersComponent(Component):
def _check_header_file_integrity(self): def _check_header_file_integrity(self):
# TODO: temporary workaround for usability. move to txlbryum and check headers instead of file integrity # TODO: temporary workaround for usability. move to txlbryum and check headers instead of file integrity
from lbrynet import conf
if conf.settings['blockchain_name'] != "lbrycrd_main": if conf.settings['blockchain_name'] != "lbrycrd_main":
return return
hashsum = sha256() hashsum = sha256()
@ -370,7 +369,8 @@ class DHTComponent(Component):
Component.__init__(self, component_manager) Component.__init__(self, component_manager)
self.dht_node = None self.dht_node = None
self.upnp_component = None self.upnp_component = None
self.udp_port, self.peer_port = None, None self.udp_port = None
self.peer_port = None
@property @property
def component(self): def component(self):
@ -561,32 +561,23 @@ class PeerProtocolServerComponent(Component):
@defer.inlineCallbacks @defer.inlineCallbacks
def start(self): def start(self):
query_handlers = {}
upnp_component = self.component_manager.get_component(UPNP_COMPONENT)
dht_node = self.component_manager.get_component(DHT_COMPONENT)
rate_limiter = self.component_manager.get_component(RATE_LIMITER_COMPONENT)
blob_manager = self.component_manager.get_component(BLOB_COMPONENT)
wallet = self.component_manager.get_component(WALLET_COMPONENT) wallet = self.component_manager.get_component(WALLET_COMPONENT)
payment_rate_manager = self.component_manager.get_component(PAYMENT_RATE_COMPONENT) peer_port = self.component_manager.get_component(UPNP_COMPONENT).upnp_redirects["TCP"]
query_handlers = {
peer_port, udp_port = upnp_component.get_redirects() handler.get_primary_query_identifier(): handler for handler in [
handlers = [
BlobRequestHandlerFactory( BlobRequestHandlerFactory(
blob_manager, self.component_manager.get_component(BLOB_COMPONENT),
wallet, wallet,
payment_rate_manager, self.component_manager.get_component(PAYMENT_RATE_COMPONENT),
self.component_manager.analytics_manager self.component_manager.analytics_manager
), ),
wallet.get_wallet_info_query_handler_factory(), wallet.get_wallet_info_query_handler_factory(),
] ]
}
for handler in handlers: server_factory = ServerProtocolFactory(
query_id = handler.get_primary_query_identifier() self.component_manager.get_component(RATE_LIMITER_COMPONENT), query_handlers,
query_handlers[query_id] = handler self.component_manager.get_component(DHT_COMPONENT).peer_manager
)
if peer_port is not None:
server_factory = ServerProtocolFactory(rate_limiter, query_handlers, dht_node.peer_manager)
try: try:
log.info("Peer protocol listening on TCP %d", peer_port) log.info("Peer protocol listening on TCP %d", peer_port)
@ -646,100 +637,54 @@ class UPnPComponent(Component):
def __init__(self, component_manager): def __init__(self, component_manager):
Component.__init__(self, component_manager) Component.__init__(self, component_manager)
self.peer_port = GCS('peer_port') self._default_peer_port = GCS('peer_port')
self.dht_node_port = GCS('dht_node_port') self._default_dht_node_port = GCS('dht_node_port')
self.use_upnp = GCS('use_upnp') self.use_upnp = GCS('use_upnp')
self.external_ip = CS.get_external_ip() self.external_ip = None
self.upnp_redirects = [] self.upnp = UPnP(self.component_manager.reactor, try_miniupnpc_fallback=True)
self.upnp_redirects = {}
@property @property
def component(self): def component(self):
return self return self
def get_redirects(self): def get_redirects(self):
return self.peer_port, self.dht_node_port if not self.use_upnp or not self.upnp_redirects:
return self._default_peer_port, self._default_dht_node_port
return self.upnp_redirects["TCP"], self.upnp_redirects["UDP"]
@defer.inlineCallbacks
def _setup_redirects(self):
self.external_ip = yield self.upnp.get_external_ip()
upnp_redirects = yield DeferredDict({
"UDP": self.upnp.get_next_mapping(self._default_dht_node_port, "UDP", "LBRY DHT port"),
"TCP": self.upnp.get_next_mapping(self._default_peer_port, "TCP", "LBRY peer port")
})
self.upnp_redirects.update(upnp_redirects)
@defer.inlineCallbacks
def start(self): def start(self):
log.debug("In _try_upnp") log.debug("In _try_upnp")
found = yield self.upnp.discover()
def get_free_port(upnp, port, protocol): if found and not self.upnp.miniupnpc_runner:
# returns an existing mapping if it exists log.info("set up redirects using txupnp")
mapping = upnp.getspecificportmapping(port, protocol) elif found and self.upnp.miniupnpc_runner:
if not mapping: log.warning("failed to set up redirect with txupnp, miniupnpc fallback was successful")
return port if found:
if upnp.lanaddr == mapping[0]: try:
return mapping[1] yield self._setup_redirects()
return get_free_port(upnp, port + 1, protocol) except Exception as err:
if not self.upnp.miniupnpc_runner:
def get_port_mapping(upnp, port, protocol, description): started_fallback = yield self.upnp.start_miniupnpc_fallback()
# try to map to the requested port, if there is already a mapping use the next external if started_fallback:
# port available yield self._setup_redirects()
if protocol not in ['UDP', 'TCP']: else:
raise Exception("invalid protocol") log.warning("failed to set up upnp redirects")
port = get_free_port(upnp, port, protocol)
if isinstance(port, tuple):
log.info("Found existing UPnP redirect %s:%i (%s) to %s:%i, using it",
self.external_ip, port, protocol, upnp.lanaddr, port)
return port
upnp.addportmapping(port, protocol, upnp.lanaddr, port,
description, '')
log.info("Set UPnP redirect %s:%i (%s) to %s:%i", self.external_ip, port,
protocol, upnp.lanaddr, port)
return port
def threaded_try_upnp():
if self.use_upnp is False:
log.debug("Not using upnp")
return False
u = miniupnpc.UPnP()
num_devices_found = u.discover()
if num_devices_found > 0:
u.selectigd()
external_ip = u.externalipaddress()
if external_ip != '0.0.0.0' and not self.external_ip:
# best not to rely on this external ip, the router can be behind layers of NATs
self.external_ip = external_ip
if self.peer_port:
self.peer_port = get_port_mapping(u, self.peer_port, 'TCP', 'LBRY peer port')
self.upnp_redirects.append((self.peer_port, 'TCP'))
if self.dht_node_port:
self.dht_node_port = get_port_mapping(u, self.dht_node_port, 'UDP', 'LBRY DHT port')
self.upnp_redirects.append((self.dht_node_port, 'UDP'))
return True
return False
def upnp_failed(err):
log.warning("UPnP failed. Reason: %s", err.getErrorMessage())
return False
d = threads.deferToThread(threaded_try_upnp)
d.addErrback(upnp_failed)
return d
def stop(self): def stop(self):
log.info("Unsetting upnp for session") return defer.DeferredList(
[self.upnp.delete_port_mapping(port, protocol) for protocol, port in self.upnp_redirects.items()]
def threaded_unset_upnp(): )
if self.use_upnp is False:
log.debug("Not using upnp")
return False
u = miniupnpc.UPnP()
num_devices_found = u.discover()
if num_devices_found > 0:
u.selectigd()
for port, protocol in self.upnp_redirects:
if u.getspecificportmapping(port, protocol) is None:
log.warning(
"UPnP redirect for %s %d was removed by something else.",
protocol, port)
else:
u.deleteportmapping(port, protocol)
log.info("Removed UPnP redirect for %s %d.", protocol, port)
self.upnp_redirects = []
d = threads.deferToThread(threaded_unset_upnp)
d.addErrback(lambda err: str(err))
return d
class ExchangeRateManagerComponent(Component): class ExchangeRateManagerComponent(Component):

View file

@ -196,11 +196,14 @@ class AuthJSONRPCServer(AuthorizedBase):
component_attributes = {} component_attributes = {}
def __init__(self, analytics_manager=None, component_manager=None, use_authentication=None, to_skip=None, def __init__(self, analytics_manager=None, component_manager=None, use_authentication=None, to_skip=None,
looping_calls=None): looping_calls=None, reactor=None):
if not reactor:
from twisted.internet import reactor
self.analytics_manager = analytics_manager or analytics.Manager.new_instance() self.analytics_manager = analytics_manager or analytics.Manager.new_instance()
self.component_manager = component_manager or ComponentManager( self.component_manager = component_manager or ComponentManager(
analytics_manager=self.analytics_manager, analytics_manager=self.analytics_manager,
skip_components=to_skip or [] skip_components=to_skip or [],
reactor=reactor
) )
self.looping_call_manager = LoopingCallManager({n: lc for n, (lc, t) in (looping_calls or {}).iteritems()}) self.looping_call_manager = LoopingCallManager({n: lc for n, (lc, t) in (looping_calls or {}).iteritems()})
self._looping_call_times = {n: t for n, (lc, t) in (looping_calls or {}).iteritems()} self._looping_call_times = {n: t for n, (lc, t) in (looping_calls or {}).iteritems()}

View file

@ -24,13 +24,14 @@ requires = [
'lbryschema==0.0.16', 'lbryschema==0.0.16',
'lbryum==3.2.3', 'lbryum==3.2.3',
'miniupnpc', 'miniupnpc',
'txupnp==0.0.1a6',
'pyyaml', 'pyyaml',
'requests', 'requests',
'txJSON-RPC', 'txJSON-RPC',
'zope.interface', 'zope.interface',
'treq', 'treq',
'docopt', 'docopt',
'six' 'six',
] ]
console_scripts = [ console_scripts = [