Merge pull request #2648 from lbryio/fix-blob-udp-ping
Fix UDP ping port in blob peer accumulator
This commit is contained in:
commit
7f526be879
1 changed files with 24 additions and 15 deletions
|
@ -197,25 +197,28 @@ class Node:
|
||||||
peers.sort(key=lambda peer: distance(peer.node_id))
|
peers.sort(key=lambda peer: distance(peer.node_id))
|
||||||
return peers[:count]
|
return peers[:count]
|
||||||
|
|
||||||
async def _accumulate_search_junction(self, search_queue: asyncio.Queue,
|
async def _accumulate_peers_for_value(self, search_queue: asyncio.Queue, result_queue: asyncio.Queue):
|
||||||
result_queue: asyncio.Queue):
|
|
||||||
tasks = []
|
tasks = []
|
||||||
try:
|
try:
|
||||||
while True:
|
while True:
|
||||||
blob_hash = await search_queue.get()
|
blob_hash = await search_queue.get()
|
||||||
tasks.append(self.loop.create_task(self._value_producer(blob_hash, result_queue)))
|
tasks.append(self.loop.create_task(self._peers_for_value_producer(blob_hash, result_queue)))
|
||||||
finally:
|
finally:
|
||||||
for task in tasks:
|
for task in tasks:
|
||||||
task.cancel()
|
task.cancel()
|
||||||
|
|
||||||
async def _value_producer(self, blob_hash: str, result_queue: asyncio.Queue):
|
async def _peers_for_value_producer(self, blob_hash: str, result_queue: asyncio.Queue):
|
||||||
async def ping(_peer):
|
async def put_into_result_queue_after_pong(_peer):
|
||||||
try:
|
try:
|
||||||
await self.protocol.get_rpc_peer(_peer).ping()
|
await self.protocol.get_rpc_peer(_peer).ping()
|
||||||
result_queue.put_nowait([_peer])
|
result_queue.put_nowait([_peer])
|
||||||
|
log.debug("pong from %s:%i for %s", _peer.address, _peer.udp_port, blob_hash)
|
||||||
except asyncio.TimeoutError:
|
except asyncio.TimeoutError:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
# prioritize peers who reply to a dht ping first
|
||||||
|
# this minimizes attempting to make tcp connections that won't work later to dead or unreachable peers
|
||||||
|
|
||||||
async for results in self.get_iterative_value_finder(binascii.unhexlify(blob_hash.encode())):
|
async for results in self.get_iterative_value_finder(binascii.unhexlify(blob_hash.encode())):
|
||||||
to_put = []
|
to_put = []
|
||||||
for peer in results:
|
for peer in results:
|
||||||
|
@ -223,19 +226,25 @@ class Node:
|
||||||
continue
|
continue
|
||||||
is_good = self.protocol.peer_manager.peer_is_good(peer)
|
is_good = self.protocol.peer_manager.peer_is_good(peer)
|
||||||
if is_good:
|
if is_good:
|
||||||
|
# the peer has replied recently over UDP, it can probably be reached on the TCP port
|
||||||
to_put.append(peer)
|
to_put.append(peer)
|
||||||
elif is_good is None:
|
elif is_good is None:
|
||||||
udp_port_to_try = 4444
|
|
||||||
if 3400 > peer.tcp_port > 3332:
|
|
||||||
if not peer.udp_port:
|
|
||||||
udp_port_to_try = (peer.tcp_port - 3333) + 4444
|
|
||||||
elif 4500 > peer.tcp_port > 4443:
|
|
||||||
if not peer.udp_port:
|
|
||||||
udp_port_to_try = peer.tcp_port
|
|
||||||
if not peer.udp_port:
|
if not peer.udp_port:
|
||||||
peer = make_kademlia_peer(peer.node_id, peer.address, udp_port_to_try, peer.tcp_port)
|
# TODO: use the same port for TCP and UDP
|
||||||
self.loop.create_task(ping(peer))
|
# the udp port must be guessed
|
||||||
|
# default to the ports being the same. if the TCP port appears to be <=0.48.0 default,
|
||||||
|
# including on a network with several nodes, then assume the udp port is proportionately
|
||||||
|
# based on a starting port of 4444
|
||||||
|
udp_port_to_try = peer.tcp_port
|
||||||
|
if 3400 > peer.tcp_port > 3332:
|
||||||
|
udp_port_to_try = (peer.tcp_port - 3333) + 4444
|
||||||
|
self.loop.create_task(put_into_result_queue_after_pong(
|
||||||
|
make_kademlia_peer(peer.node_id, peer.address, udp_port_to_try, peer.tcp_port)
|
||||||
|
))
|
||||||
|
else:
|
||||||
|
self.loop.create_task(put_into_result_queue_after_pong(peer))
|
||||||
else:
|
else:
|
||||||
|
# the peer is known to be bad/unreachable, skip trying to connect to it over TCP
|
||||||
log.debug("skip bad peer %s:%i for %s", peer.address, peer.tcp_port, blob_hash)
|
log.debug("skip bad peer %s:%i for %s", peer.address, peer.tcp_port, blob_hash)
|
||||||
if to_put:
|
if to_put:
|
||||||
result_queue.put_nowait(to_put)
|
result_queue.put_nowait(to_put)
|
||||||
|
@ -244,4 +253,4 @@ class Node:
|
||||||
peer_queue: typing.Optional[asyncio.Queue] = None) -> typing.Tuple[
|
peer_queue: typing.Optional[asyncio.Queue] = None) -> typing.Tuple[
|
||||||
asyncio.Queue, asyncio.Task]:
|
asyncio.Queue, asyncio.Task]:
|
||||||
q = peer_queue or asyncio.Queue(loop=self.loop)
|
q = peer_queue or asyncio.Queue(loop=self.loop)
|
||||||
return q, self.loop.create_task(self._accumulate_search_junction(search_queue, q))
|
return q, self.loop.create_task(self._accumulate_peers_for_value(search_queue, q))
|
||||||
|
|
Loading…
Reference in a new issue