From 7191042bb821e152f1f1d9cd5adfdce9f4bef608 Mon Sep 17 00:00:00 2001 From: Miroslav Kovar Date: Tue, 12 Nov 2019 22:29:56 +0100 Subject: [PATCH] Add new test, remove old test, change error type --- lbry/lbry/dht/node.py | 8 ++-- lbry/tests/dht_mocks.py | 5 ++- lbry/tests/integration/test_dht.py | 50 ----------------------- lbry/tests/unit/dht/test_node.py | 65 ++++++++++++++++++++++++++++++ 4 files changed, 72 insertions(+), 56 deletions(-) diff --git a/lbry/lbry/dht/node.py b/lbry/lbry/dht/node.py index e963af445..bf8afba61 100644 --- a/lbry/lbry/dht/node.py +++ b/lbry/lbry/dht/node.py @@ -2,6 +2,7 @@ import logging import asyncio import typing import binascii +import socket from lbry.utils import resolve_host from lbry.dht import constants from lbry.dht.peer import make_kademlia_peer @@ -52,9 +53,6 @@ class Node: node_ids.append(self.protocol.routing_table.random_id_in_bucket_range(i)) node_ids.append(self.protocol.routing_table.random_id_in_bucket_range(i)) - if self._storage: - await self._storage.save_kademlia_peers(self.protocol.routing_table.get_peers()) - if self.protocol.routing_table.get_peers(): # if we have node ids to look up, perform the iterative search until we have k results while node_ids: @@ -72,6 +70,8 @@ class Node: to_ping = [peer for peer in set(total_peers) if self.protocol.peer_manager.peer_is_good(peer) is not True] if to_ping: self.protocol.ping_queue.enqueue_maybe_ping(*to_ping, delay=0) + if self._storage: + await self._storage.save_kademlia_peers(self.protocol.routing_table.get_peers()) if force_once: break @@ -161,7 +161,7 @@ class Node: (None, await resolve_host(address, udp_port, 'udp'), udp_port, None) for address, udp_port in known_node_urls or [] ])) - except asyncio.TimeoutError: + except socket.gaierror: await asyncio.sleep(30) continue diff --git a/lbry/tests/dht_mocks.py b/lbry/tests/dht_mocks.py index cec9b8e80..c725ec32e 100644 --- a/lbry/tests/dht_mocks.py +++ b/lbry/tests/dht_mocks.py @@ -48,8 +48,9 @@ def get_time_accelerator(loop: asyncio.AbstractEventLoop, @contextlib.contextmanager -def mock_network_loop(loop: asyncio.AbstractEventLoop): - dht_network: typing.Dict[typing.Tuple[str, int], 'KademliaProtocol'] = {} +def mock_network_loop(loop: asyncio.AbstractEventLoop, + dht_network: typing.Optional[typing.Dict[typing.Tuple[str, int], 'KademliaProtocol']] = None): + dht_network: typing.Dict[typing.Tuple[str, int], 'KademliaProtocol'] = dht_network or {} async def create_datagram_endpoint(proto_lam: typing.Callable[[], 'KademliaProtocol'], from_addr: typing.Tuple[str, int]): diff --git a/lbry/tests/integration/test_dht.py b/lbry/tests/integration/test_dht.py index 4caff5fd4..3e75acb91 100644 --- a/lbry/tests/integration/test_dht.py +++ b/lbry/tests/integration/test_dht.py @@ -164,53 +164,3 @@ class DHTIntegrationTest(AsyncioTestCase): self.assertEqual(num_seeds, len(node2.protocol.routing_table.get_peers())) for bucket1, bucket2 in zip(node1.protocol.routing_table.buckets, node2.protocol.routing_table.buckets): self.assertEqual((bucket1.range_min, bucket1.range_max), (bucket2.range_min, bucket2.range_max)) - - async def test_switch_to_known_seeds(self): - num_peers = 10 - start_port = 40000 - external_ip = '127.0.0.1' - - await self.setup_network(num_peers, seed_nodes=num_peers // 2, start_port=start_port) - await asyncio.gather(*[node.joined.wait() for node in self.nodes]) - peer_args = [ - (n.protocol.node_id, n.protocol.external_ip, n.protocol.udp_port) for n in self.nodes - ] - known_peers = [make_kademlia_peer(*args) for args in peer_args[:num_peers // 2]] - known_nodes = self.nodes[:num_peers // 2] - persisted_peers = [make_kademlia_peer(*args) for args in peer_args[num_peers // 2:]] - persisted_nodes = self.nodes[num_peers // 2:] - - # Create node with the persisted nodes in storage - node = await self.create_node(constants.generate_id(num_peers), start_port+num_peers) - await node._storage.save_kademlia_peers(persisted_peers) - - # Stop known peers so they stop replying and won't be added - for n in known_nodes: - n.stop() - - node.start(external_ip, self.known_node_addresses[:num_peers // 2]) - await node.joined.wait() - self.assertTrue(node.joined.is_set()) - - # Only persisted ones are added to the routing table - self.assertCountEqual(persisted_peers, node.protocol.routing_table.get_peers()) - - # Start the known ones, stop the persisted - for n1, n2 in zip(known_nodes, persisted_nodes): - n1.start(external_ip) - n2.stop() - asyncio.gather(*[n.joined.wait() for n in known_nodes]) - await asyncio.sleep(3) - self.assertTrue(all(known.joined.is_set() for known in known_nodes)) - self.assertTrue(all(not persisted.joined.is_set() for persisted in persisted_nodes)) - - # Remove persisted from node's routing table, set them as bad - for peer in persisted_peers: - node.protocol.routing_table.remove_peer(peer) - node.protocol.peer_manager.report_failure(peer.address, peer.udp_port) - self.assertFalse(node.protocol.routing_table.get_peers()) - - # The known_peers replace the persisted ones - await node.joined.wait() - await asyncio.sleep(3) - self.assertCountEqual(known_peers, node.protocol.routing_table.get_peers()) diff --git a/lbry/tests/unit/dht/test_node.py b/lbry/tests/unit/dht/test_node.py index 03902b553..28bd19bbe 100644 --- a/lbry/tests/unit/dht/test_node.py +++ b/lbry/tests/unit/dht/test_node.py @@ -2,9 +2,11 @@ import asyncio import typing from torba.testcase import AsyncioTestCase from tests import dht_mocks +from lbry.conf import Config from lbry.dht import constants from lbry.dht.node import Node from lbry.dht.peer import PeerManager, make_kademlia_peer +from lbry.extras.daemon.storage import SQLiteStorage class TestNodePingQueueDiscover(AsyncioTestCase): @@ -84,3 +86,66 @@ class TestNodePingQueueDiscover(AsyncioTestCase): # teardown for n in nodes.values(): n.stop() + + +class TestTemporarilyLosingConnetction(AsyncioTestCase): + + async def test_losing_connection(self): + loop = asyncio.get_event_loop() + loop.set_debug(False) + + peer_addresses = [ + (f'127.0.0.1', 40000+i) for i in range(10) + ] + node_ids = [constants.generate_id(i) for i in range(10)] + + nodes = [ + Node( + loop, PeerManager(loop), node_id, udp_port, udp_port, 3333, address, + storage=SQLiteStorage(Config(), ":memory:", self.loop, self.loop.time) + ) + for node_id, (address, udp_port) in zip(node_ids, peer_addresses) + ] + dht_network = {peer_addresses[i]: node.protocol for i, node in enumerate(nodes)} + num_seeds = 3 + + with dht_mocks.mock_network_loop(loop, dht_network): + for i, n in enumerate(nodes): + await n._storage.open() + n.start(peer_addresses[i][0], peer_addresses[:num_seeds]) + + node = nodes[-1] + advance = dht_mocks.get_time_accelerator(loop, loop.time()) + await advance(1000) + + # Join the network, assert that at least the known peers are in RT + self.assertTrue(node.joined.is_set()) + self.assertTrue(len(node.protocol.routing_table.get_peers()) >= num_seeds) + + # Refresh, so that the peers are persisted + await advance(4000) + self.assertTrue(len(await node._storage.get_persisted_kademlia_peers()) > num_seeds) + + # We lost internet connection - all the peers stop responding + popped_protocols = [] + for peer_address in peer_addresses[:-1]: + popped_protocols.append(dht_network.pop(peer_address)) + + # The peers are cleared on refresh from RT and storage + await advance(4000) + self.assertFalse(node.protocol.routing_table.get_peers()) + self.assertFalse(await node._storage.get_persisted_kademlia_peers()) + + # Reconnect some of the previously stored - node shouldn't connect + for peer_address, protocol in zip(peer_addresses[num_seeds+1:-2], popped_protocols[num_seeds+1:-2]): + dht_network[peer_address] = protocol + await advance(1000) + self.assertEqual(0, len(node.protocol.routing_table.get_peers())) + + # Reconnect some of the seed nodes + for peer_address, protocol in zip(peer_addresses[:num_seeds], popped_protocols[:num_seeds]): + dht_network[peer_address] = protocol + + # Check that node reconnects at least to them + await advance(1000) + self.assertTrue(len(node.protocol.routing_table.get_peers()) >= num_seeds)