diff --git a/lbry/lbry/dht/node.py b/lbry/lbry/dht/node.py index 152ff0b57..2159cb86a 100644 --- a/lbry/lbry/dht/node.py +++ b/lbry/lbry/dht/node.py @@ -197,25 +197,28 @@ class Node: peers.sort(key=lambda peer: distance(peer.node_id)) return peers[:count] - async def _accumulate_search_junction(self, search_queue: asyncio.Queue, - result_queue: asyncio.Queue): + async def _accumulate_peers_for_value(self, search_queue: asyncio.Queue, result_queue: asyncio.Queue): tasks = [] try: while True: blob_hash = await search_queue.get() - tasks.append(self.loop.create_task(self._value_producer(blob_hash, result_queue))) + tasks.append(self.loop.create_task(self._peers_for_value_producer(blob_hash, result_queue))) finally: for task in tasks: task.cancel() - async def _value_producer(self, blob_hash: str, result_queue: asyncio.Queue): - async def ping(_peer): + async def _peers_for_value_producer(self, blob_hash: str, result_queue: asyncio.Queue): + async def put_into_result_queue_after_pong(_peer): try: await self.protocol.get_rpc_peer(_peer).ping() result_queue.put_nowait([_peer]) + log.debug("pong from %s:%i for %s", _peer.address, _peer.udp_port, blob_hash) except asyncio.TimeoutError: pass + # prioritize peers who reply to a dht ping first + # this minimizes attempting to make tcp connections that won't work later to dead or unreachable peers + async for results in self.get_iterative_value_finder(binascii.unhexlify(blob_hash.encode())): to_put = [] for peer in results: @@ -223,19 +226,25 @@ class Node: continue is_good = self.protocol.peer_manager.peer_is_good(peer) if is_good: + # the peer has replied recently over UDP, it can probably be reached on the TCP port to_put.append(peer) elif is_good is None: - udp_port_to_try = 4444 - if 3400 > peer.tcp_port > 3332: - if not peer.udp_port: - udp_port_to_try = (peer.tcp_port - 3333) + 4444 - elif 4500 > peer.tcp_port > 4443: - if not peer.udp_port: - udp_port_to_try = peer.tcp_port if not peer.udp_port: - peer = make_kademlia_peer(peer.node_id, peer.address, udp_port_to_try, peer.tcp_port) - self.loop.create_task(ping(peer)) + # TODO: use the same port for TCP and UDP + # the udp port must be guessed + # default to the ports being the same. if the TCP port appears to be <=0.48.0 default, + # including on a network with several nodes, then assume the udp port is proportionately + # based on a starting port of 4444 + udp_port_to_try = peer.tcp_port + if 3400 > peer.tcp_port > 3332: + udp_port_to_try = (peer.tcp_port - 3333) + 4444 + self.loop.create_task(put_into_result_queue_after_pong( + make_kademlia_peer(peer.node_id, peer.address, udp_port_to_try, peer.tcp_port) + )) + else: + self.loop.create_task(put_into_result_queue_after_pong(peer)) else: + # the peer is known to be bad/unreachable, skip trying to connect to it over TCP log.debug("skip bad peer %s:%i for %s", peer.address, peer.tcp_port, blob_hash) if to_put: result_queue.put_nowait(to_put) @@ -244,4 +253,4 @@ class Node: peer_queue: typing.Optional[asyncio.Queue] = None) -> typing.Tuple[ asyncio.Queue, asyncio.Task]: q = peer_queue or asyncio.Queue(loop=self.loop) - return q, self.loop.create_task(self._accumulate_search_junction(search_queue, q)) + return q, self.loop.create_task(self._accumulate_peers_for_value(search_queue, q))