From cc104369cb0d39652608e05aa7c6c1980e3b894f Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Fri, 18 Feb 2022 18:53:10 -0300 Subject: [PATCH] fix and enable test_blob_announcer --- tests/unit/dht/test_blob_announcer.py | 135 +++++++++++++++----------- tests/unit/dht/test_node.py | 4 +- 2 files changed, 78 insertions(+), 61 deletions(-) diff --git a/tests/unit/dht/test_blob_announcer.py b/tests/unit/dht/test_blob_announcer.py index 5c7d921aa..d5b2c5e17 100644 --- a/tests/unit/dht/test_blob_announcer.py +++ b/tests/unit/dht/test_blob_announcer.py @@ -1,60 +1,70 @@ import contextlib +import logging import typing import binascii import socket import asyncio + from lbry.testcase import AsyncioTestCase from tests import dht_mocks +from lbry.dht.protocol.distance import Distance from lbry.conf import Config from lbry.dht import constants from lbry.dht.node import Node from lbry.dht.peer import PeerManager, make_kademlia_peer from lbry.dht.blob_announcer import BlobAnnouncer from lbry.extras.daemon.storage import SQLiteStorage -from unittest import skip + class TestBlobAnnouncer(AsyncioTestCase): + TIMEOUT = 20.0 # lower than default + async def setup_node(self, peer_addresses, address, node_id): self.nodes: typing.Dict[int, Node] = {} - self.advance = dht_mocks.get_time_accelerator(self.loop, self.loop.time()) + self.advance = dht_mocks.get_time_accelerator(self.loop) + self.instant_advance = dht_mocks.get_time_accelerator(self.loop) self.conf = Config() - self.storage = SQLiteStorage(self.conf, ":memory:", self.loop, self.loop.time) - await self.storage.open() self.peer_manager = PeerManager(self.loop) self.node = Node(self.loop, self.peer_manager, node_id, 4444, 4444, 3333, address) await self.node.start_listening(address) - self.blob_announcer = BlobAnnouncer(self.loop, self.node, self.storage) - for node_id, address in peer_addresses: - await self.add_peer(node_id, address) + await asyncio.gather(*[self.add_peer(node_id, address) for node_id, address in peer_addresses]) + for first_peer in self.nodes.values(): + for second_peer in self.nodes.values(): + if first_peer == second_peer: + continue + self.add_peer_to_routing_table(first_peer, second_peer) + self.add_peer_to_routing_table(second_peer, first_peer) + await self.advance(0.1) # just to make pings go through self.node.joined.set() self.node._refresh_task = self.loop.create_task(self.node.refresh_node()) + self.storage = SQLiteStorage(self.conf, ":memory:", self.loop, self.loop.time) + await self.storage.open() + self.blob_announcer = BlobAnnouncer(self.loop, self.node, self.storage) async def add_peer(self, node_id, address, add_to_routing_table=True): + #print('add', node_id.hex()[:8], address) n = Node(self.loop, PeerManager(self.loop), node_id, 4444, 4444, 3333, address) await n.start_listening(address) self.nodes.update({len(self.nodes): n}) if add_to_routing_table: - self.node.protocol.add_peer( - make_kademlia_peer( - n.protocol.node_id, n.protocol.external_ip, n.protocol.udp_port - ) + self.add_peer_to_routing_table(self.node, n) + + def add_peer_to_routing_table(self, adder, being_added): + adder.protocol.add_peer( + make_kademlia_peer( + being_added.protocol.node_id, being_added.protocol.external_ip, being_added.protocol.udp_port ) + ) @contextlib.asynccontextmanager - async def _test_network_context(self, peer_addresses=None): - self.peer_addresses = peer_addresses or [ - (constants.generate_id(2), '1.2.3.2'), - (constants.generate_id(3), '1.2.3.3'), - (constants.generate_id(4), '1.2.3.4'), - (constants.generate_id(5), '1.2.3.5'), - (constants.generate_id(6), '1.2.3.6'), - (constants.generate_id(7), '1.2.3.7'), - (constants.generate_id(8), '1.2.3.8'), - (constants.generate_id(9), '1.2.3.9'), + async def _test_network_context(self, peer_count=200): + self.peer_addresses = [ + (constants.generate_id(i), socket.inet_ntoa(int(i + 0x01000001).to_bytes(length=4, byteorder='big'))) + for i in range(1, peer_count + 1) ] try: with dht_mocks.mock_network_loop(self.loop): - await self.setup_node(self.peer_addresses, '1.2.3.1', constants.generate_id(1)) + await self.setup_node(self.peer_addresses, '1.2.3.1', constants.generate_id(1000)) yield finally: self.blob_announcer.stop() @@ -73,43 +83,58 @@ class TestBlobAnnouncer(AsyncioTestCase): ) ) await peer.ping() - return peer + return last_node - @skip("Something from a previous test is leaking into this test and causing it to fail intermittently") async def test_announce_blobs(self): blob1 = binascii.hexlify(b'1' * 48).decode() blob2 = binascii.hexlify(b'2' * 48).decode() - async with self._test_network_context(): - await self.storage.add_blobs((blob1, 1024), (blob2, 1024), finished=True) - await self.storage.db.execute( - "update blob set next_announce_time=0, should_announce=1 where blob_hash in (?, ?)", - (blob1, blob2) - ) + async with self._test_network_context(peer_count=100): + await self.storage.add_blobs((blob1, 1024, 0, True), (blob2, 1024, 0, True), finished=True) + await self.storage.add_blobs( + *((constants.generate_id(value).hex(), 1024, 0, True) for value in range(1000, 1090)), + finished=True) + await self.storage.db.execute("update blob set next_announce_time=0, should_announce=1") to_announce = await self.storage.get_blobs_to_announce() - self.assertEqual(2, len(to_announce)) - self.blob_announcer.start(batch_size=1) # so it covers batching logic + self.assertEqual(92, len(to_announce)) + self.blob_announcer.start(batch_size=10) # so it covers batching logic # takes 60 seconds to start, but we advance 120 to ensure it processed all batches - await self.advance(60.0 * 2) + ongoing_announcements = asyncio.ensure_future(self.blob_announcer.wait()) + await self.instant_advance(60.0) + await ongoing_announcements to_announce = await self.storage.get_blobs_to_announce() self.assertEqual(0, len(to_announce)) self.blob_announcer.stop() + # as routing table pollution will cause some peers to be hard to reach, we add a tolerance for CI + tolerance = 0.8 # at least 80% of the announcements are within the top K + for blob in await self.storage.get_all_blob_hashes(): + distance = Distance(bytes.fromhex(blob)) + candidates = list(self.nodes.values()) + candidates.sort(key=lambda sorting_node: distance(sorting_node.protocol.node_id)) + has_it = 0 + for index, node in enumerate(candidates[:constants.K], start=1): + if node.protocol.data_store.get_peers_for_blob(bytes.fromhex(blob)): + has_it += 1 + else: + logging.warning("blob %s wasnt found between the best K (%s)", blob[:8], node.protocol.node_id.hex()[:8]) + self.assertGreaterEqual(has_it, int(tolerance * constants.K)) + + # test that we can route from a poorly connected peer all the way to the announced blob - await self.chain_peer(constants.generate_id(10), '1.2.3.10') - await self.chain_peer(constants.generate_id(11), '1.2.3.11') - await self.chain_peer(constants.generate_id(12), '1.2.3.12') - await self.chain_peer(constants.generate_id(13), '1.2.3.13') - await self.chain_peer(constants.generate_id(14), '1.2.3.14') - await self.advance(61.0) + current = len(self.nodes) + await self.chain_peer(constants.generate_id(current + 1), '1.2.3.10') + await self.chain_peer(constants.generate_id(current + 2), '1.2.3.11') + await self.chain_peer(constants.generate_id(current + 3), '1.2.3.12') + await self.chain_peer(constants.generate_id(current + 4), '1.2.3.13') + last = await self.chain_peer(constants.generate_id(current + 5), '1.2.3.14') - last = self.nodes[len(self.nodes) - 1] search_q, peer_q = asyncio.Queue(loop=self.loop), asyncio.Queue(loop=self.loop) search_q.put_nowait(blob1) _, task = last.accumulate_peers(search_q, peer_q) - found_peers = await peer_q.get() + found_peers = await asyncio.wait_for(peer_q.get(), 1.0) task.cancel() self.assertEqual(1, len(found_peers)) @@ -119,21 +144,13 @@ class TestBlobAnnouncer(AsyncioTestCase): async def test_popular_blob(self): peer_count = 150 - addresses = [ - (constants.generate_id(i + 1), socket.inet_ntoa(int(i + 0x01000001).to_bytes(length=4, byteorder='big'))) - for i in range(peer_count) - ] - blob_hash = b'1' * 48 + blob_hash = constants.generate_id(99999) - async with self._test_network_context(peer_addresses=addresses): + async with self._test_network_context(peer_count=peer_count): total_seen = set() - announced_to = self.nodes[0] - for i in range(1, peer_count): - node = self.nodes[i] - kad_peer = make_kademlia_peer( - node.protocol.node_id, node.protocol.external_ip, node.protocol.udp_port - ) - await announced_to.protocol._add_peer(kad_peer) + announced_to = self.nodes.pop(0) + for i, node in enumerate(self.nodes.values()): + self.add_peer_to_routing_table(announced_to, node) peer = node.protocol.get_rpc_peer( make_kademlia_peer( announced_to.protocol.node_id, @@ -144,15 +161,15 @@ class TestBlobAnnouncer(AsyncioTestCase): response = await peer.store(blob_hash) self.assertEqual(response, b'OK') peers_for_blob = await peer.find_value(blob_hash, 0) - if i == 1: + if i == 0: self.assertNotIn(blob_hash, peers_for_blob) self.assertEqual(peers_for_blob[b'p'], 0) else: - self.assertEqual(len(peers_for_blob[blob_hash]), min(i - 1, constants.K)) - self.assertEqual(len(announced_to.protocol.data_store.get_peers_for_blob(blob_hash)), i) + self.assertEqual(len(peers_for_blob[blob_hash]), min(i, constants.K)) + self.assertEqual(len(announced_to.protocol.data_store.get_peers_for_blob(blob_hash)), i + 1) if i - 1 > constants.K: self.assertEqual(len(peers_for_blob[b'contacts']), constants.K) - self.assertEqual(peers_for_blob[b'p'], ((i - 1) // (constants.K + 1)) + 1) + self.assertEqual(peers_for_blob[b'p'], (i // (constants.K + 1)) + 1) seen = set(peers_for_blob[blob_hash]) self.assertEqual(len(seen), constants.K) self.assertEqual(len(peers_for_blob[blob_hash]), len(seen)) @@ -167,5 +184,5 @@ class TestBlobAnnouncer(AsyncioTestCase): seen.intersection_update(page_x_set) total_seen.update(page_x_set) else: - self.assertEqual(len(peers_for_blob[b'contacts']), i - 1) + self.assertEqual(len(peers_for_blob[b'contacts']), 8) # we always add 8 on first page self.assertEqual(len(total_seen), peer_count - 2) diff --git a/tests/unit/dht/test_node.py b/tests/unit/dht/test_node.py index c862305ec..fcf65ff10 100644 --- a/tests/unit/dht/test_node.py +++ b/tests/unit/dht/test_node.py @@ -29,7 +29,7 @@ class TestNodePingQueueDiscover(AsyncioTestCase): (constants.generate_id(9), '1.2.3.9'), ] with dht_mocks.mock_network_loop(loop): - advance = dht_mocks.get_time_accelerator(loop, loop.time()) + advance = dht_mocks.get_time_accelerator(loop) # start the nodes nodes: typing.Dict[int, Node] = { i: Node(loop, PeerManager(loop), node_id, 4444, 4444, 3333, address) @@ -131,7 +131,7 @@ class TestTemporarilyLosingConnection(AsyncioTestCase): await asyncio.gather(*[n.joined.wait() for n in nodes]) node = nodes[-1] - advance = dht_mocks.get_time_accelerator(loop, loop.time()) + advance = dht_mocks.get_time_accelerator(loop) await advance(500) # Join the network, assert that at least the known peers are in RT