Wrap "async for" over IterativeXXXFinder in try/finally ensuring aclose().

This commit is contained in:
Jonathan Moody 2022-04-12 12:49:32 -04:00
parent 82d7f81f41
commit 4767bb9dee

View file

@ -217,9 +217,13 @@ class Node:
shortlist: typing.Optional[typing.List['KademliaPeer']] = None shortlist: typing.Optional[typing.List['KademliaPeer']] = None
) -> typing.List['KademliaPeer']: ) -> typing.List['KademliaPeer']:
peers = [] peers = []
async for iteration_peers in self.get_iterative_node_finder( node_finder = self.get_iterative_node_finder(
node_id, shortlist=shortlist, max_results=max_results): node_id, shortlist=shortlist, max_results=max_results)
peers.extend(iteration_peers) try:
async for iteration_peers in node_finder:
peers.extend(iteration_peers)
finally:
await node_finder.aclose()
distance = Distance(node_id) distance = Distance(node_id)
peers.sort(key=lambda peer: distance(peer.node_id)) peers.sort(key=lambda peer: distance(peer.node_id))
return peers[:count] return peers[:count]
@ -245,36 +249,39 @@ class Node:
# prioritize peers who reply to a dht ping first # prioritize peers who reply to a dht ping first
# this minimizes attempting to make tcp connections that won't work later to dead or unreachable peers # this minimizes attempting to make tcp connections that won't work later to dead or unreachable peers
value_finder = self.get_iterative_value_finder(bytes.fromhex(blob_hash))
async for results in self.get_iterative_value_finder(bytes.fromhex(blob_hash)): try:
to_put = [] async for results in value_finder:
for peer in results: to_put = []
if peer.address == self.protocol.external_ip and self.protocol.peer_port == peer.tcp_port: for peer in results:
continue if peer.address == self.protocol.external_ip and self.protocol.peer_port == peer.tcp_port:
is_good = self.protocol.peer_manager.peer_is_good(peer) continue
if is_good: is_good = self.protocol.peer_manager.peer_is_good(peer)
# the peer has replied recently over UDP, it can probably be reached on the TCP port if is_good:
to_put.append(peer) # the peer has replied recently over UDP, it can probably be reached on the TCP port
elif is_good is None: to_put.append(peer)
if not peer.udp_port: elif is_good is None:
# TODO: use the same port for TCP and UDP if not peer.udp_port:
# the udp port must be guessed # TODO: use the same port for TCP and UDP
# default to the ports being the same. if the TCP port appears to be <=0.48.0 default, # the udp port must be guessed
# including on a network with several nodes, then assume the udp port is proportionately # default to the ports being the same. if the TCP port appears to be <=0.48.0 default,
# based on a starting port of 4444 # including on a network with several nodes, then assume the udp port is proportionately
udp_port_to_try = peer.tcp_port # based on a starting port of 4444
if 3400 > peer.tcp_port > 3332: udp_port_to_try = peer.tcp_port
udp_port_to_try = (peer.tcp_port - 3333) + 4444 if 3400 > peer.tcp_port > 3332:
self.loop.create_task(put_into_result_queue_after_pong( udp_port_to_try = (peer.tcp_port - 3333) + 4444
make_kademlia_peer(peer.node_id, peer.address, udp_port_to_try, peer.tcp_port) self.loop.create_task(put_into_result_queue_after_pong(
)) make_kademlia_peer(peer.node_id, peer.address, udp_port_to_try, peer.tcp_port)
))
else:
self.loop.create_task(put_into_result_queue_after_pong(peer))
else: else:
self.loop.create_task(put_into_result_queue_after_pong(peer)) # the peer is known to be bad/unreachable, skip trying to connect to it over TCP
else: log.debug("skip bad peer %s:%i for %s", peer.address, peer.tcp_port, blob_hash)
# the peer is known to be bad/unreachable, skip trying to connect to it over TCP if to_put:
log.debug("skip bad peer %s:%i for %s", peer.address, peer.tcp_port, blob_hash) result_queue.put_nowait(to_put)
if to_put: finally:
result_queue.put_nowait(to_put) await value_finder.aclose()
def accumulate_peers(self, search_queue: asyncio.Queue, def accumulate_peers(self, search_queue: asyncio.Queue,
peer_queue: typing.Optional[asyncio.Queue] = None peer_queue: typing.Optional[asyncio.Queue] = None