forked from LBRYCommunity/lbry-sdk
update aioupnp to 0.0.13
This commit is contained in:
parent
21f02d3e6d
commit
103c93a06c
2 changed files with 47 additions and 44 deletions
|
@ -42,14 +42,6 @@ UPNP_COMPONENT = "upnp"
|
||||||
EXCHANGE_RATE_MANAGER_COMPONENT = "exchange_rate_manager"
|
EXCHANGE_RATE_MANAGER_COMPONENT = "exchange_rate_manager"
|
||||||
|
|
||||||
|
|
||||||
async def gather_dict(tasks: dict):
|
|
||||||
async def wait_value(key, value):
|
|
||||||
return key, await value
|
|
||||||
return dict(await asyncio.gather(*(
|
|
||||||
wait_value(*kv) for kv in tasks.items()
|
|
||||||
)))
|
|
||||||
|
|
||||||
|
|
||||||
class DatabaseComponent(Component):
|
class DatabaseComponent(Component):
|
||||||
component_name = DATABASE_COMPONENT
|
component_name = DATABASE_COMPONENT
|
||||||
|
|
||||||
|
@ -316,8 +308,7 @@ class DHTComponent(Component):
|
||||||
|
|
||||||
def __init__(self, component_manager):
|
def __init__(self, component_manager):
|
||||||
super().__init__(component_manager)
|
super().__init__(component_manager)
|
||||||
self.dht_node: Node = None
|
self.dht_node: typing.Optional[Node] = None
|
||||||
self.upnp_component = None
|
|
||||||
self.external_udp_port = None
|
self.external_udp_port = None
|
||||||
self.external_peer_port = None
|
self.external_peer_port = None
|
||||||
|
|
||||||
|
@ -343,10 +334,10 @@ class DHTComponent(Component):
|
||||||
|
|
||||||
async def start(self):
|
async def start(self):
|
||||||
log.info("start the dht")
|
log.info("start the dht")
|
||||||
self.upnp_component = self.component_manager.get_component(UPNP_COMPONENT)
|
upnp_component = self.component_manager.get_component(UPNP_COMPONENT)
|
||||||
self.external_peer_port = self.upnp_component.upnp_redirects.get("TCP", self.conf.tcp_port)
|
self.external_peer_port = upnp_component.upnp_redirects.get("TCP", self.conf.tcp_port)
|
||||||
self.external_udp_port = self.upnp_component.upnp_redirects.get("UDP", self.conf.udp_port)
|
self.external_udp_port = upnp_component.upnp_redirects.get("UDP", self.conf.udp_port)
|
||||||
external_ip = self.upnp_component.external_ip
|
external_ip = upnp_component.external_ip
|
||||||
if not external_ip:
|
if not external_ip:
|
||||||
log.warning("UPnP component failed to get external ip")
|
log.warning("UPnP component failed to get external ip")
|
||||||
external_ip = await utils.get_external_ip()
|
external_ip = await utils.get_external_ip()
|
||||||
|
@ -354,7 +345,7 @@ class DHTComponent(Component):
|
||||||
log.warning("failed to get external ip")
|
log.warning("failed to get external ip")
|
||||||
|
|
||||||
self.dht_node = Node(
|
self.dht_node = Node(
|
||||||
asyncio.get_event_loop(),
|
self.component_manager.loop,
|
||||||
self.component_manager.peer_manager,
|
self.component_manager.peer_manager,
|
||||||
node_id=self.get_node_id(),
|
node_id=self.get_node_id(),
|
||||||
internal_udp_port=self.conf.udp_port,
|
internal_udp_port=self.conf.udp_port,
|
||||||
|
@ -379,7 +370,7 @@ class HashAnnouncerComponent(Component):
|
||||||
|
|
||||||
def __init__(self, component_manager):
|
def __init__(self, component_manager):
|
||||||
super().__init__(component_manager)
|
super().__init__(component_manager)
|
||||||
self.hash_announcer: BlobAnnouncer = None
|
self.hash_announcer: typing.Optional[BlobAnnouncer] = None
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def component(self) -> typing.Optional[BlobAnnouncer]:
|
def component(self) -> typing.Optional[BlobAnnouncer]:
|
||||||
|
@ -388,7 +379,7 @@ class HashAnnouncerComponent(Component):
|
||||||
async def start(self):
|
async def start(self):
|
||||||
storage = self.component_manager.get_component(DATABASE_COMPONENT)
|
storage = self.component_manager.get_component(DATABASE_COMPONENT)
|
||||||
dht_node = self.component_manager.get_component(DHT_COMPONENT)
|
dht_node = self.component_manager.get_component(DHT_COMPONENT)
|
||||||
self.hash_announcer = BlobAnnouncer(asyncio.get_event_loop(), dht_node, storage)
|
self.hash_announcer = BlobAnnouncer(self.component_manager.loop, dht_node, storage)
|
||||||
self.hash_announcer.start(self.conf.concurrent_blob_announcers)
|
self.hash_announcer.start(self.conf.concurrent_blob_announcers)
|
||||||
log.info("Started blob announcer")
|
log.info("Started blob announcer")
|
||||||
|
|
||||||
|
@ -477,9 +468,9 @@ class UPnPComponent(Component):
|
||||||
self._int_peer_port = self.conf.tcp_port
|
self._int_peer_port = self.conf.tcp_port
|
||||||
self._int_dht_node_port = self.conf.udp_port
|
self._int_dht_node_port = self.conf.udp_port
|
||||||
self.use_upnp = self.conf.use_upnp
|
self.use_upnp = self.conf.use_upnp
|
||||||
self.upnp = None
|
self.upnp: typing.Optional[UPnP] = None
|
||||||
self.upnp_redirects = {}
|
self.upnp_redirects = {}
|
||||||
self.external_ip = None
|
self.external_ip: typing.Optional[str] = None
|
||||||
self._maintain_redirects_task = None
|
self._maintain_redirects_task = None
|
||||||
|
|
||||||
@property
|
@property
|
||||||
|
@ -496,9 +487,11 @@ class UPnPComponent(Component):
|
||||||
# setup the gateway if necessary
|
# setup the gateway if necessary
|
||||||
if not self.upnp:
|
if not self.upnp:
|
||||||
try:
|
try:
|
||||||
self.upnp = await UPnP.discover()
|
self.upnp = await UPnP.discover(loop=self.component_manager.loop)
|
||||||
log.info("found upnp gateway: %s", self.upnp.gateway.manufacturer_string)
|
log.info("found upnp gateway: %s", self.upnp.gateway.manufacturer_string)
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
|
if isinstance(err, asyncio.CancelledError):
|
||||||
|
raise
|
||||||
log.warning("upnp discovery failed: %s", err)
|
log.warning("upnp discovery failed: %s", err)
|
||||||
self.upnp = None
|
self.upnp = None
|
||||||
|
|
||||||
|
@ -509,52 +502,59 @@ class UPnPComponent(Component):
|
||||||
external_ip = await self.upnp.get_external_ip()
|
external_ip = await self.upnp.get_external_ip()
|
||||||
if external_ip != "0.0.0.0" and not self.external_ip:
|
if external_ip != "0.0.0.0" and not self.external_ip:
|
||||||
log.info("got external ip from UPnP: %s", external_ip)
|
log.info("got external ip from UPnP: %s", external_ip)
|
||||||
except (asyncio.TimeoutError, UPnPError):
|
except (asyncio.TimeoutError, UPnPError, NotImplementedError):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
if external_ip == "0.0.0.0" or not external_ip:
|
if external_ip == "0.0.0.0" or (external_ip and external_ip.startswith("192.")):
|
||||||
log.warning("unable to get external ip from UPnP, checking lbry.com fallback")
|
log.warning("unable to get external ip from UPnP, checking lbry.com fallback")
|
||||||
external_ip = await utils.get_external_ip()
|
external_ip = await utils.get_external_ip()
|
||||||
if self.external_ip and self.external_ip != external_ip:
|
if self.external_ip and self.external_ip != external_ip:
|
||||||
log.info("external ip changed from %s to %s", self.external_ip, external_ip)
|
log.info("external ip changed from %s to %s", self.external_ip, external_ip)
|
||||||
self.external_ip = external_ip
|
if self.external_ip:
|
||||||
assert self.external_ip is not None # TODO: handle going/starting offline
|
self.external_ip = external_ip
|
||||||
|
# assert self.external_ip is not None # TODO: handle going/starting offline
|
||||||
|
|
||||||
if not self.upnp_redirects and self.upnp: # setup missing redirects
|
if not self.upnp_redirects and self.upnp: # setup missing redirects
|
||||||
try:
|
log.info("add UPnP port mappings")
|
||||||
log.info("add UPnP port mappings")
|
upnp_redirects = {}
|
||||||
d = {}
|
if PEER_PROTOCOL_SERVER_COMPONENT not in self.component_manager.skip_components:
|
||||||
if PEER_PROTOCOL_SERVER_COMPONENT not in self.component_manager.skip_components:
|
try:
|
||||||
d["TCP"] = self.upnp.get_next_mapping(self._int_peer_port, "TCP", "LBRY peer port")
|
upnp_redirects["TCP"] = await self.upnp.get_next_mapping(
|
||||||
if DHT_COMPONENT not in self.component_manager.skip_components:
|
self._int_peer_port, "TCP", "LBRY peer port", self._int_peer_port
|
||||||
d["UDP"] = self.upnp.get_next_mapping(self._int_dht_node_port, "UDP", "LBRY DHT port")
|
)
|
||||||
upnp_redirects = await gather_dict(d)
|
except (UPnPError, asyncio.TimeoutError, NotImplementedError):
|
||||||
|
pass
|
||||||
|
if DHT_COMPONENT not in self.component_manager.skip_components:
|
||||||
|
try:
|
||||||
|
upnp_redirects["UDP"] = await self.upnp.get_next_mapping(
|
||||||
|
self._int_dht_node_port, "UDP", "LBRY DHT port", self._int_dht_node_port
|
||||||
|
)
|
||||||
|
except (UPnPError, asyncio.TimeoutError, NotImplementedError):
|
||||||
|
pass
|
||||||
|
if upnp_redirects:
|
||||||
log.info("set up redirects: %s", upnp_redirects)
|
log.info("set up redirects: %s", upnp_redirects)
|
||||||
self.upnp_redirects.update(upnp_redirects)
|
self.upnp_redirects.update(upnp_redirects)
|
||||||
except (asyncio.TimeoutError, UPnPError):
|
|
||||||
self.upnp = None
|
|
||||||
return
|
|
||||||
elif self.upnp: # check existing redirects are still active
|
elif self.upnp: # check existing redirects are still active
|
||||||
found = set()
|
found = set()
|
||||||
mappings = await self.upnp.get_redirects()
|
mappings = await self.upnp.get_redirects()
|
||||||
for mapping in mappings:
|
for mapping in mappings:
|
||||||
proto = mapping['NewProtocol']
|
proto = mapping.protocol
|
||||||
if proto in self.upnp_redirects and mapping['NewExternalPort'] == self.upnp_redirects[proto]:
|
if proto in self.upnp_redirects and mapping.external_port == self.upnp_redirects[proto]:
|
||||||
if mapping['NewInternalClient'] == self.upnp.lan_address:
|
if mapping.lan_address == self.upnp.lan_address:
|
||||||
found.add(proto)
|
found.add(proto)
|
||||||
if 'UDP' not in found and DHT_COMPONENT not in self.component_manager.skip_components:
|
if 'UDP' not in found and DHT_COMPONENT not in self.component_manager.skip_components:
|
||||||
try:
|
try:
|
||||||
udp_port = await self.upnp.get_next_mapping(self._int_dht_node_port, "UDP", "LBRY DHT port")
|
udp_port = await self.upnp.get_next_mapping(self._int_dht_node_port, "UDP", "LBRY DHT port")
|
||||||
self.upnp_redirects['UDP'] = udp_port
|
self.upnp_redirects['UDP'] = udp_port
|
||||||
log.info("refreshed upnp redirect for dht port: %i", udp_port)
|
log.info("refreshed upnp redirect for dht port: %i", udp_port)
|
||||||
except (asyncio.TimeoutError, UPnPError):
|
except (asyncio.TimeoutError, UPnPError, NotImplementedError):
|
||||||
del self.upnp_redirects['UDP']
|
del self.upnp_redirects['UDP']
|
||||||
if 'TCP' not in found and PEER_PROTOCOL_SERVER_COMPONENT not in self.component_manager.skip_components:
|
if 'TCP' not in found and PEER_PROTOCOL_SERVER_COMPONENT not in self.component_manager.skip_components:
|
||||||
try:
|
try:
|
||||||
tcp_port = await self.upnp.get_next_mapping(self._int_peer_port, "TCP", "LBRY peer port")
|
tcp_port = await self.upnp.get_next_mapping(self._int_peer_port, "TCP", "LBRY peer port")
|
||||||
self.upnp_redirects['TCP'] = tcp_port
|
self.upnp_redirects['TCP'] = tcp_port
|
||||||
log.info("refreshed upnp redirect for peer port: %i", tcp_port)
|
log.info("refreshed upnp redirect for peer port: %i", tcp_port)
|
||||||
except (asyncio.TimeoutError, UPnPError):
|
except (asyncio.TimeoutError, UPnPError, NotImplementedError):
|
||||||
del self.upnp_redirects['TCP']
|
del self.upnp_redirects['TCP']
|
||||||
if ('TCP' in self.upnp_redirects
|
if ('TCP' in self.upnp_redirects
|
||||||
and PEER_PROTOCOL_SERVER_COMPONENT not in self.component_manager.skip_components) and (
|
and PEER_PROTOCOL_SERVER_COMPONENT not in self.component_manager.skip_components) and (
|
||||||
|
@ -572,7 +572,7 @@ class UPnPComponent(Component):
|
||||||
if self.upnp:
|
if self.upnp:
|
||||||
if not self.upnp_redirects and not all([x in self.component_manager.skip_components for x in
|
if not self.upnp_redirects and not all([x in self.component_manager.skip_components for x in
|
||||||
(DHT_COMPONENT, PEER_PROTOCOL_SERVER_COMPONENT)]):
|
(DHT_COMPONENT, PEER_PROTOCOL_SERVER_COMPONENT)]):
|
||||||
log.error("failed to setup upnp, debugging infomation: %s", self.upnp.zipped_debugging_info)
|
log.error("failed to setup upnp")
|
||||||
else:
|
else:
|
||||||
success = True
|
success = True
|
||||||
if self.upnp_redirects:
|
if self.upnp_redirects:
|
||||||
|
@ -583,13 +583,16 @@ class UPnPComponent(Component):
|
||||||
await self.component_manager.analytics_manager.send_upnp_setup_success_fail(
|
await self.component_manager.analytics_manager.send_upnp_setup_success_fail(
|
||||||
success, await self.get_status()
|
success, await self.get_status()
|
||||||
)
|
)
|
||||||
self._maintain_redirects_task = asyncio.create_task(self._repeatedly_maintain_redirects(now=False))
|
self._maintain_redirects_task = self.component_manager.loop.create_task(
|
||||||
|
self._repeatedly_maintain_redirects(now=False)
|
||||||
|
)
|
||||||
|
|
||||||
async def stop(self):
|
async def stop(self):
|
||||||
if self.upnp_redirects:
|
if self.upnp_redirects:
|
||||||
|
log.info("Removing upnp redirects: %s", self.upnp_redirects)
|
||||||
await asyncio.wait([
|
await asyncio.wait([
|
||||||
self.upnp.delete_port_mapping(port, protocol) for protocol, port in self.upnp_redirects.items()
|
self.upnp.delete_port_mapping(port, protocol) for protocol, port in self.upnp_redirects.items()
|
||||||
])
|
], loop=self.component_manager.loop)
|
||||||
if self._maintain_redirects_task and not self._maintain_redirects_task.done():
|
if self._maintain_redirects_task and not self._maintain_redirects_task.done():
|
||||||
self._maintain_redirects_task.cancel()
|
self._maintain_redirects_task.cancel()
|
||||||
|
|
||||||
|
|
|
@ -25,7 +25,7 @@ setup(
|
||||||
install_requires=[
|
install_requires=[
|
||||||
'torba',
|
'torba',
|
||||||
'aiohttp==3.5.4',
|
'aiohttp==3.5.4',
|
||||||
'aioupnp==0.0.12',
|
'aioupnp==0.0.13',
|
||||||
'appdirs==1.4.3',
|
'appdirs==1.4.3',
|
||||||
'certifi>=2018.11.29',
|
'certifi>=2018.11.29',
|
||||||
'colorama==0.3.7',
|
'colorama==0.3.7',
|
||||||
|
|
Loading…
Reference in a new issue