diff --git a/lbry/lbry/extras/daemon/Components.py b/lbry/lbry/extras/daemon/Components.py index 7710fecd0..0feafb8e0 100644 --- a/lbry/lbry/extras/daemon/Components.py +++ b/lbry/lbry/extras/daemon/Components.py @@ -42,14 +42,6 @@ UPNP_COMPONENT = "upnp" EXCHANGE_RATE_MANAGER_COMPONENT = "exchange_rate_manager" -async def gather_dict(tasks: dict): - async def wait_value(key, value): - return key, await value - return dict(await asyncio.gather(*( - wait_value(*kv) for kv in tasks.items() - ))) - - class DatabaseComponent(Component): component_name = DATABASE_COMPONENT @@ -316,8 +308,7 @@ class DHTComponent(Component): def __init__(self, component_manager): super().__init__(component_manager) - self.dht_node: Node = None - self.upnp_component = None + self.dht_node: typing.Optional[Node] = None self.external_udp_port = None self.external_peer_port = None @@ -343,10 +334,10 @@ class DHTComponent(Component): async def start(self): log.info("start the dht") - self.upnp_component = self.component_manager.get_component(UPNP_COMPONENT) - self.external_peer_port = self.upnp_component.upnp_redirects.get("TCP", self.conf.tcp_port) - self.external_udp_port = self.upnp_component.upnp_redirects.get("UDP", self.conf.udp_port) - external_ip = self.upnp_component.external_ip + upnp_component = self.component_manager.get_component(UPNP_COMPONENT) + self.external_peer_port = upnp_component.upnp_redirects.get("TCP", self.conf.tcp_port) + self.external_udp_port = upnp_component.upnp_redirects.get("UDP", self.conf.udp_port) + external_ip = upnp_component.external_ip if not external_ip: log.warning("UPnP component failed to get external ip") external_ip = await utils.get_external_ip() @@ -354,7 +345,7 @@ class DHTComponent(Component): log.warning("failed to get external ip") self.dht_node = Node( - asyncio.get_event_loop(), + self.component_manager.loop, self.component_manager.peer_manager, node_id=self.get_node_id(), internal_udp_port=self.conf.udp_port, @@ -379,7 +370,7 @@ class HashAnnouncerComponent(Component): def __init__(self, component_manager): super().__init__(component_manager) - self.hash_announcer: BlobAnnouncer = None + self.hash_announcer: typing.Optional[BlobAnnouncer] = None @property def component(self) -> typing.Optional[BlobAnnouncer]: @@ -388,7 +379,7 @@ class HashAnnouncerComponent(Component): async def start(self): storage = self.component_manager.get_component(DATABASE_COMPONENT) dht_node = self.component_manager.get_component(DHT_COMPONENT) - self.hash_announcer = BlobAnnouncer(asyncio.get_event_loop(), dht_node, storage) + self.hash_announcer = BlobAnnouncer(self.component_manager.loop, dht_node, storage) self.hash_announcer.start(self.conf.concurrent_blob_announcers) log.info("Started blob announcer") @@ -477,9 +468,9 @@ class UPnPComponent(Component): self._int_peer_port = self.conf.tcp_port self._int_dht_node_port = self.conf.udp_port self.use_upnp = self.conf.use_upnp - self.upnp = None + self.upnp: typing.Optional[UPnP] = None self.upnp_redirects = {} - self.external_ip = None + self.external_ip: typing.Optional[str] = None self._maintain_redirects_task = None @property @@ -496,9 +487,11 @@ class UPnPComponent(Component): # setup the gateway if necessary if not self.upnp: try: - self.upnp = await UPnP.discover() + self.upnp = await UPnP.discover(loop=self.component_manager.loop) log.info("found upnp gateway: %s", self.upnp.gateway.manufacturer_string) except Exception as err: + if isinstance(err, asyncio.CancelledError): + raise log.warning("upnp discovery failed: %s", err) self.upnp = None @@ -509,52 +502,59 @@ class UPnPComponent(Component): external_ip = await self.upnp.get_external_ip() if external_ip != "0.0.0.0" and not self.external_ip: log.info("got external ip from UPnP: %s", external_ip) - except (asyncio.TimeoutError, UPnPError): + except (asyncio.TimeoutError, UPnPError, NotImplementedError): pass - if external_ip == "0.0.0.0" or not external_ip: + if external_ip == "0.0.0.0" or (external_ip and external_ip.startswith("192.")): log.warning("unable to get external ip from UPnP, checking lbry.com fallback") external_ip = await utils.get_external_ip() if self.external_ip and self.external_ip != external_ip: log.info("external ip changed from %s to %s", self.external_ip, external_ip) - self.external_ip = external_ip - assert self.external_ip is not None # TODO: handle going/starting offline + if self.external_ip: + self.external_ip = external_ip + # assert self.external_ip is not None # TODO: handle going/starting offline if not self.upnp_redirects and self.upnp: # setup missing redirects - try: - log.info("add UPnP port mappings") - d = {} - if PEER_PROTOCOL_SERVER_COMPONENT not in self.component_manager.skip_components: - d["TCP"] = self.upnp.get_next_mapping(self._int_peer_port, "TCP", "LBRY peer port") - if DHT_COMPONENT not in self.component_manager.skip_components: - d["UDP"] = self.upnp.get_next_mapping(self._int_dht_node_port, "UDP", "LBRY DHT port") - upnp_redirects = await gather_dict(d) + log.info("add UPnP port mappings") + upnp_redirects = {} + if PEER_PROTOCOL_SERVER_COMPONENT not in self.component_manager.skip_components: + try: + upnp_redirects["TCP"] = await self.upnp.get_next_mapping( + self._int_peer_port, "TCP", "LBRY peer port", self._int_peer_port + ) + except (UPnPError, asyncio.TimeoutError, NotImplementedError): + pass + if DHT_COMPONENT not in self.component_manager.skip_components: + try: + upnp_redirects["UDP"] = await self.upnp.get_next_mapping( + self._int_dht_node_port, "UDP", "LBRY DHT port", self._int_dht_node_port + ) + except (UPnPError, asyncio.TimeoutError, NotImplementedError): + pass + if upnp_redirects: log.info("set up redirects: %s", upnp_redirects) self.upnp_redirects.update(upnp_redirects) - except (asyncio.TimeoutError, UPnPError): - self.upnp = None - return elif self.upnp: # check existing redirects are still active found = set() mappings = await self.upnp.get_redirects() for mapping in mappings: - proto = mapping['NewProtocol'] - if proto in self.upnp_redirects and mapping['NewExternalPort'] == self.upnp_redirects[proto]: - if mapping['NewInternalClient'] == self.upnp.lan_address: + proto = mapping.protocol + if proto in self.upnp_redirects and mapping.external_port == self.upnp_redirects[proto]: + if mapping.lan_address == self.upnp.lan_address: found.add(proto) if 'UDP' not in found and DHT_COMPONENT not in self.component_manager.skip_components: try: udp_port = await self.upnp.get_next_mapping(self._int_dht_node_port, "UDP", "LBRY DHT port") self.upnp_redirects['UDP'] = udp_port log.info("refreshed upnp redirect for dht port: %i", udp_port) - except (asyncio.TimeoutError, UPnPError): + except (asyncio.TimeoutError, UPnPError, NotImplementedError): del self.upnp_redirects['UDP'] if 'TCP' not in found and PEER_PROTOCOL_SERVER_COMPONENT not in self.component_manager.skip_components: try: tcp_port = await self.upnp.get_next_mapping(self._int_peer_port, "TCP", "LBRY peer port") self.upnp_redirects['TCP'] = tcp_port log.info("refreshed upnp redirect for peer port: %i", tcp_port) - except (asyncio.TimeoutError, UPnPError): + except (asyncio.TimeoutError, UPnPError, NotImplementedError): del self.upnp_redirects['TCP'] if ('TCP' in self.upnp_redirects and PEER_PROTOCOL_SERVER_COMPONENT not in self.component_manager.skip_components) and ( @@ -572,7 +572,7 @@ class UPnPComponent(Component): if self.upnp: if not self.upnp_redirects and not all([x in self.component_manager.skip_components for x in (DHT_COMPONENT, PEER_PROTOCOL_SERVER_COMPONENT)]): - log.error("failed to setup upnp, debugging infomation: %s", self.upnp.zipped_debugging_info) + log.error("failed to setup upnp") else: success = True if self.upnp_redirects: @@ -583,13 +583,16 @@ class UPnPComponent(Component): await self.component_manager.analytics_manager.send_upnp_setup_success_fail( success, await self.get_status() ) - self._maintain_redirects_task = asyncio.create_task(self._repeatedly_maintain_redirects(now=False)) + self._maintain_redirects_task = self.component_manager.loop.create_task( + self._repeatedly_maintain_redirects(now=False) + ) async def stop(self): if self.upnp_redirects: + log.info("Removing upnp redirects: %s", self.upnp_redirects) await asyncio.wait([ self.upnp.delete_port_mapping(port, protocol) for protocol, port in self.upnp_redirects.items() - ]) + ], loop=self.component_manager.loop) if self._maintain_redirects_task and not self._maintain_redirects_task.done(): self._maintain_redirects_task.cancel() diff --git a/lbry/setup.py b/lbry/setup.py index 6aad561e9..ce51f8626 100644 --- a/lbry/setup.py +++ b/lbry/setup.py @@ -25,7 +25,7 @@ setup( install_requires=[ 'torba', 'aiohttp==3.5.4', - 'aioupnp==0.0.12', + 'aioupnp==0.0.13', 'appdirs==1.4.3', 'certifi>=2018.11.29', 'colorama==0.3.7',