fix predicting udp port in _accumulate_peers_for_value

-add comments
-rename functions to be more descriptive of what they do
This commit is contained in:
Jack Robison 2019-11-26 14:45:39 -05:00
parent 2277d134cc
commit 71ae174e29
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2

View file

@ -197,25 +197,28 @@ class Node:
peers.sort(key=lambda peer: distance(peer.node_id)) peers.sort(key=lambda peer: distance(peer.node_id))
return peers[:count] return peers[:count]
async def _accumulate_search_junction(self, search_queue: asyncio.Queue, async def _accumulate_peers_for_value(self, search_queue: asyncio.Queue, result_queue: asyncio.Queue):
result_queue: asyncio.Queue):
tasks = [] tasks = []
try: try:
while True: while True:
blob_hash = await search_queue.get() blob_hash = await search_queue.get()
tasks.append(self.loop.create_task(self._value_producer(blob_hash, result_queue))) tasks.append(self.loop.create_task(self._peers_for_value_producer(blob_hash, result_queue)))
finally: finally:
for task in tasks: for task in tasks:
task.cancel() task.cancel()
async def _value_producer(self, blob_hash: str, result_queue: asyncio.Queue): async def _peers_for_value_producer(self, blob_hash: str, result_queue: asyncio.Queue):
async def ping(_peer): async def put_into_result_queue_after_pong(_peer):
try: try:
await self.protocol.get_rpc_peer(_peer).ping() await self.protocol.get_rpc_peer(_peer).ping()
result_queue.put_nowait([_peer]) result_queue.put_nowait([_peer])
log.debug("pong from %s:%i for %s", _peer.address, _peer.udp_port, blob_hash)
except asyncio.TimeoutError: except asyncio.TimeoutError:
pass pass
# prioritize peers who reply to a dht ping first
# this minimizes attempting to make tcp connections that won't work later to dead or unreachable peers
async for results in self.get_iterative_value_finder(binascii.unhexlify(blob_hash.encode())): async for results in self.get_iterative_value_finder(binascii.unhexlify(blob_hash.encode())):
to_put = [] to_put = []
for peer in results: for peer in results:
@ -223,19 +226,25 @@ class Node:
continue continue
is_good = self.protocol.peer_manager.peer_is_good(peer) is_good = self.protocol.peer_manager.peer_is_good(peer)
if is_good: if is_good:
# the peer has replied recently over UDP, it can probably be reached on the TCP port
to_put.append(peer) to_put.append(peer)
elif is_good is None: elif is_good is None:
udp_port_to_try = 4444
if 3400 > peer.tcp_port > 3332:
if not peer.udp_port:
udp_port_to_try = (peer.tcp_port - 3333) + 4444
elif 4500 > peer.tcp_port > 4443:
if not peer.udp_port: if not peer.udp_port:
# TODO: use the same port for TCP and UDP
# the udp port must be guessed
# default to the ports being the same. if the TCP port appears to be <=0.48.0 default,
# including on a network with several nodes, then assume the udp port is proportionately
# based on a starting port of 4444
udp_port_to_try = peer.tcp_port udp_port_to_try = peer.tcp_port
if not peer.udp_port: if 3400 > peer.tcp_port > 3332:
peer = make_kademlia_peer(peer.node_id, peer.address, udp_port_to_try, peer.tcp_port) udp_port_to_try = (peer.tcp_port - 3333) + 4444
self.loop.create_task(ping(peer)) self.loop.create_task(put_into_result_queue_after_pong(
make_kademlia_peer(peer.node_id, peer.address, udp_port_to_try, peer.tcp_port)
))
else: else:
self.loop.create_task(put_into_result_queue_after_pong(peer))
else:
# the peer is known to be bad/unreachable, skip trying to connect to it over TCP
log.debug("skip bad peer %s:%i for %s", peer.address, peer.tcp_port, blob_hash) log.debug("skip bad peer %s:%i for %s", peer.address, peer.tcp_port, blob_hash)
if to_put: if to_put:
result_queue.put_nowait(to_put) result_queue.put_nowait(to_put)
@ -244,4 +253,4 @@ class Node:
peer_queue: typing.Optional[asyncio.Queue] = None) -> typing.Tuple[ peer_queue: typing.Optional[asyncio.Queue] = None) -> typing.Tuple[
asyncio.Queue, asyncio.Task]: asyncio.Queue, asyncio.Task]:
q = peer_queue or asyncio.Queue(loop=self.loop) q = peer_queue or asyncio.Queue(loop=self.loop)
return q, self.loop.create_task(self._accumulate_search_junction(search_queue, q)) return q, self.loop.create_task(self._accumulate_peers_for_value(search_queue, q))