lbry-sdk/lbry/tests/unit/dht/test_node.py

152 lines
6.2 KiB
Python
Raw Normal View History

2019-01-22 18:49:43 +01:00
import asyncio
import typing
from torba.testcase import AsyncioTestCase
from tests import dht_mocks
from lbry.conf import Config
from lbry.dht import constants
from lbry.dht.node import Node
2019-10-01 02:00:10 +02:00
from lbry.dht.peer import PeerManager, make_kademlia_peer
from lbry.extras.daemon.storage import SQLiteStorage
2015-08-20 17:27:15 +02:00
2019-01-22 18:49:43 +01:00
class TestNodePingQueueDiscover(AsyncioTestCase):
    """Check that nodes learn about each other after one node pings the rest."""

    async def test_ping_queue_discover(self):
        """Ping every node from the first one and verify routing tables fill up.

        The first node pings each of the other nodes. After the loop time is
        advanced, every pinged node is expected to hold exactly the first node
        in its routing table; after a further advance (long enough for the
        refresh loop) every node is expected to know all the others.
        """
        loop = asyncio.get_event_loop()
        loop.set_debug(False)

        # nine (node_id, address) pairs: ids 1..9 at 1.2.3.1 .. 1.2.3.9
        peer_addresses = [
            (constants.generate_id(i), f'1.2.3.{i}') for i in range(1, 10)
        ]
        # every node should end up knowing all nodes except itself
        expected_peer_count = len(peer_addresses) - 1

        with dht_mocks.mock_network_loop(loop):
            advance = dht_mocks.get_time_accelerator(loop, loop.time())
            # start the nodes
            nodes: typing.Dict[int, Node] = {
                i: Node(loop, PeerManager(loop), node_id, 4444, 4444, 3333, address)
                for i, (node_id, address) in enumerate(peer_addresses)
            }
            for i, n in nodes.items():
                n.start(peer_addresses[i][1], [])

            # From here on all nodes are started; the finally clause guarantees
            # they are stopped even when an assertion below fails.
            try:
                await advance(1)

                node_1 = nodes[0]

                # ping the other nodes from node_1, this will result in a delayed return ping
                futs = []
                for i in range(1, len(peer_addresses)):
                    node = nodes[i]
                    self.assertNotEqual(node_1.protocol.node_id, node.protocol.node_id)
                    peer = make_kademlia_peer(
                        node.protocol.node_id, node.protocol.external_ip, udp_port=node.protocol.udp_port
                    )
                    futs.append(node_1.protocol.get_rpc_peer(peer).ping())
                await advance(3)
                replies = await asyncio.gather(*futs)
                self.assertEqual([b"pong"] * len(futs), list(replies))

                # run for long enough for the delayed pings to have been sent by node 1
                await advance(1000)

                # verify all of the previously pinged peers have node_1 in their routing tables
                node_1_contact = (
                    node_1.protocol.node_id, node_1.protocol.external_ip, node_1.protocol.udp_port
                )
                for n in nodes.values():
                    peers = n.protocol.routing_table.get_peers()
                    if n is node_1:
                        self.assertEqual(expected_peer_count, len(peers))
                    else:
                        self.assertEqual(1, len(peers))
                        self.assertEqual(
                            (peers[0].node_id, peers[0].address, peers[0].udp_port),
                            node_1_contact
                        )

                # run long enough for the refresh loop to run
                await advance(3600)

                # verify all the nodes know about each other
                for n in nodes.values():
                    if n is node_1:
                        continue
                    peers = n.protocol.routing_table.get_peers()
                    self.assertEqual(expected_peer_count, len(peers))
                    self.assertSetEqual(
                        {node_id for node_id, _ in peer_addresses if node_id != n.protocol.node_id},
                        {c.node_id for c in peers}
                    )
                    self.assertSetEqual(
                        {address for _, address in peer_addresses if address != n.protocol.external_ip},
                        {c.address for c in peers}
                    )
            finally:
                # teardown
                for n in nodes.values():
                    n.stop()
class TestTemporarilyLosingConnetction(AsyncioTestCase):
    """Check how a node's routing table and persisted peers react to losing all peers."""

    async def test_losing_connection(self):
        """Drop every other peer from the mock network, then bring some back.

        Asserts, for the last node of the set, that:
        * after joining, it has at least ``num_seeds`` peers in its routing table
          and more than ``num_seeds`` persisted peers after a refresh;
        * once all other peers are removed from the mock network, both the
          routing table and the persisted peers become empty;
        * re-adding a few non-seed peers does not repopulate the routing table;
        * re-adding the seed peers brings back at least ``num_seeds`` peers.
        """
        loop = asyncio.get_event_loop()
        loop.set_debug(False)

        peer_addresses = [
            ('127.0.0.1', 40000 + i) for i in range(10)
        ]
        node_ids = [constants.generate_id(i) for i in range(10)]
        nodes = [
            Node(
                loop, PeerManager(loop), node_id, udp_port, udp_port, 3333, address,
                storage=SQLiteStorage(Config(), ":memory:", self.loop, self.loop.time)
            )
            for node_id, (address, udp_port) in zip(node_ids, peer_addresses)
        ]
        dht_network = {peer_addresses[i]: node.protocol for i, node in enumerate(nodes)}
        num_seeds = 3

        with dht_mocks.mock_network_loop(loop, dht_network):
            for i, n in enumerate(nodes):
                await n._storage.open()
                n.start(peer_addresses[i][0], peer_addresses[:num_seeds])

            # All nodes are started at this point; make sure they are stopped
            # again whether the assertions below pass or fail.
            try:
                node = nodes[-1]
                advance = dht_mocks.get_time_accelerator(loop, loop.time())
                await advance(1000)

                # Join the network, assert that at least the known peers are in RT
                self.assertTrue(node.joined.is_set())
                self.assertGreaterEqual(len(node.protocol.routing_table.get_peers()), num_seeds)

                # Refresh, so that the peers are persisted
                await advance(4000)
                self.assertGreater(len(await node._storage.get_persisted_kademlia_peers()), num_seeds)

                # We lost internet connection - all the peers stop responding
                popped_protocols = []
                for peer_address in peer_addresses[:-1]:
                    popped_protocols.append(dht_network.pop(peer_address))

                # The peers are cleared on refresh from RT and storage
                await advance(4000)
                self.assertFalse(node.protocol.routing_table.get_peers())
                self.assertFalse(await node._storage.get_persisted_kademlia_peers())

                # Reconnect some of the previously stored - node shouldn't connect.
                # popped_protocols is one entry shorter than peer_addresses, so zip()
                # pairs only as many addresses as the shorter protocol slice holds.
                for peer_address, protocol in zip(
                        peer_addresses[num_seeds + 1:-2], popped_protocols[num_seeds + 1:-2]):
                    dht_network[peer_address] = protocol
                await advance(1000)
                self.assertEqual(0, len(node.protocol.routing_table.get_peers()))

                # Reconnect some of the seed nodes
                for peer_address, protocol in zip(peer_addresses[:num_seeds], popped_protocols[:num_seeds]):
                    dht_network[peer_address] = protocol

                # Check that node reconnects at least to them
                await advance(1000)
                self.assertGreaterEqual(len(node.protocol.routing_table.get_peers()), num_seeds)
            finally:
                # teardown
                for n in nodes:
                    n.stop()