From c321758afd0406b4f85b634ffcd0d4a8e2afe82f Mon Sep 17 00:00:00 2001
From: Miroslav Kovar
Date: Mon, 11 Nov 2019 15:32:24 +0100
Subject: [PATCH] Rename, fix tests / deduplicate set_joined, resolve conditionally

---
 lbry/lbry/dht/node.py                          | 37 +++++++-----------
 lbry/lbry/extras/daemon/storage.py             | 12 +++---
 lbry/tests/integration/test_dht.py             | 38 ++++++++++---------
 .../tests/unit/database/test_SQLiteStorage.py  |  4 +-
 4 files changed, 42 insertions(+), 49 deletions(-)

diff --git a/lbry/lbry/dht/node.py b/lbry/lbry/dht/node.py
index f9c44c72e..d5d2564fb 100644
--- a/lbry/lbry/dht/node.py
+++ b/lbry/lbry/dht/node.py
@@ -53,7 +53,7 @@ class Node:
                     node_ids.append(self.protocol.routing_table.random_id_in_bucket_range(i))
 
             if self._storage:
-                await self._storage.update_peers(self.protocol.routing_table.get_peers())
+                await self._storage.save_kademlia_peers(self.protocol.routing_table.get_peers())
 
             if self.protocol.routing_table.get_peers():
                 # if we have node ids to look up, perform the iterative search until we have k results
@@ -136,42 +136,33 @@ class Node:
                 peer_addresses.append((node_id, address, udp_port, tcp_port))
             return [make_kademlia_peer(*peer_address) for peer_address in peer_addresses]
 
-        def set_joined():
-            self.joined.set()
-            log.info(
-                "joined dht, %i peers known in %i buckets", len(self.protocol.routing_table.get_peers()),
-                self.protocol.routing_table.buckets_with_contacts()
-            )
-
         if not self.listening_port:
             await self.start_listening(interface)
         self.protocol.ping_queue.start()
         self._refresh_task = self.loop.create_task(self.refresh_node())
 
-        restored_peers = peers_from_urls(await self._storage.get_peers()) if self._storage else []
-
-        fixed_peers = peers_from_urls([
-            (None, await resolve_host(address, udp_port, 'udp'), udp_port, None)
-            for address, udp_port in known_node_urls or []
-        ])
-
-        seed_peers = restored_peers or fixed_peers
-        fallback = False
-        while seed_peers:
+        seed_peers = peers_from_urls(await self._storage.get_persisted_kademlia_peers()) if self._storage else []
+        while True:
             if self.protocol.routing_table.get_peers():
                 if not self.joined.is_set():
-                    set_joined()
+                    self.joined.set()
+                    log.info(
+                        "joined dht, %i peers known in %i buckets", len(self.protocol.routing_table.get_peers()),
+                        self.protocol.routing_table.buckets_with_contacts()
+                    )
             else:
                 if self.joined.is_set():
                     self.joined.clear()
-                seed_peers = fixed_peers if fallback else seed_peers
                 self.protocol.peer_manager.reset()
                 self.protocol.ping_queue.enqueue_maybe_ping(*seed_peers, delay=0.0)
                 seed_peers.extend(await self.peer_search(self.protocol.node_id, shortlist=seed_peers, count=32))
-            fallback = not self.protocol.routing_table.get_peers()
-            await asyncio.sleep(1, loop=self.loop)
+            if not seed_peers or not self.protocol.routing_table.get_peers():
+                seed_peers.extend(peers_from_urls([
+                    (None, await resolve_host(address, udp_port, 'udp'), udp_port, None)
+                    for address, udp_port in known_node_urls or []
+                ]))
 
-        set_joined()
+            await asyncio.sleep(1, loop=self.loop)
 
     def start(self, interface: str, known_node_urls: typing.Optional[typing.List[typing.Tuple[str, int]]] = None):
         self._join_task = self.loop.create_task(self.join_network(interface, known_node_urls))
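For illustration, the reworked join_network() above settles into a single steady-state loop: it seeds itself from peers persisted by the storage layer, marks the node joined whenever the routing table has contacts, and re-resolves the hard-coded known_node_urls only when the seed list is empty or the table has gone dry, which is what the commit subject's "resolve conditionally" refers to. Below is a minimal runnable sketch of that control flow only; load_persisted_peers, resolve_known_urls and routing_table_peers are illustrative stand-ins, not lbry APIs, and the real loop also resets the peer manager and pings the seeds.

import asyncio
import random

# Illustrative stand-ins, not lbry APIs: they fake the storage layer, the
# seed-host resolution and the routing table so the control flow can run.
async def load_persisted_peers():        # stands in for get_persisted_kademlia_peers()
    return ["peer-restored-from-db"]

async def resolve_known_urls():          # stands in for resolve_host() over known_node_urls
    return ["seed-host-a", "seed-host-b"]

def routing_table_peers():               # stands in for routing_table.get_peers()
    return random.choice([[], [], ["some-contact"]])

async def join_network_sketch(iterations: int = 10):
    joined = asyncio.Event()
    # 1. Prefer peers persisted from the previous run over the fixed seed URLs.
    seed_peers = await load_persisted_peers()
    for _ in range(iterations):          # the real loop runs forever
        if routing_table_peers():
            if not joined.is_set():
                joined.set()             # 2. Joined as soon as the table has contacts.
        elif joined.is_set():
            joined.clear()               # 3. Table emptied out: back to "not joined".
        # 4. Only re-resolve the hard-coded seeds when there is nothing else to try.
        if not seed_peers or not routing_table_peers():
            seed_peers += await resolve_known_urls()
        await asyncio.sleep(0.01)        # the patch sleeps 1s between iterations
    return joined.is_set()

print(asyncio.run(join_network_sketch()))
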
diff --git a/lbry/lbry/extras/daemon/storage.py b/lbry/lbry/extras/daemon/storage.py
index 2dfee2ce3..3afd7ab11 100644
--- a/lbry/lbry/extras/daemon/storage.py
+++ b/lbry/lbry/extras/daemon/storage.py
@@ -331,11 +331,11 @@ class SQLiteStorage(SQLiteMixin):
         );
 
         create table if not exists peer (
+            node_id char(96) not null primary key,
             address text not null,
             udp_port integer not null,
             tcp_port integer,
-            node_id char(96) unique not null,
-            primary key (address, udp_port)
+            unique (address, udp_port)
         );
     """
 
@@ -815,12 +815,12 @@ class SQLiteStorage(SQLiteMixin):
         )
 
     # # # # # # # # # # dht functions # # # # # # # # # #
 
-    async def get_peers(self):
+    async def get_persisted_kademlia_peers(self) -> typing.List[typing.Tuple[bytes, str, int, int]]:
         query = 'select node_id, address, udp_port, tcp_port from peer'
         return [(binascii.unhexlify(n), a, u, t) for n, a, u, t in await self.db.execute_fetchall(query)]
 
-    async def update_peers(self, peers: typing.List['KademliaPeer']):
-        def _update_peers(transaction: sqlite3.Connection):
+    async def save_kademlia_peers(self, peers: typing.List['KademliaPeer']):
+        def _save_kademlia_peers(transaction: sqlite3.Connection):
             transaction.execute('delete from peer').fetchall()
             transaction.executemany(
                 'insert into peer(node_id, address, udp_port, tcp_port) values (?, ?, ?, ?)', (
@@ -829,4 +829,4 @@ class SQLiteStorage(SQLiteMixin):
                     )
                 )
             ).fetchall()
-        return await self.db.run(_update_peers)
+        return await self.db.run(_save_kademlia_peers)
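For illustration, the schema change above keys the peer table by node_id, the peer's identity, with (address, udp_port) demoted to a unique constraint; char(96) fits a hex-encoded 48-byte node id, and save_kademlia_peers() clears and rewrites the table in one transaction. Below is a self-contained sqlite3 sketch of that round trip. The DDL is adapted from the patch, but the surrounding script is illustrative, not the SQLiteStorage implementation.

import binascii
import hashlib
import sqlite3

db = sqlite3.connect(":memory:")
# DDL adapted from the patched schema: node_id is the primary key and
# (address, udp_port) is only a unique constraint.
db.execute("""
    create table if not exists peer (
        node_id char(96) not null primary key,
        address text not null,
        udp_port integer not null,
        tcp_port integer,
        unique (address, udp_port)
    )
""")

# A sha384 digest is 48 bytes, i.e. 96 hex characters, hence char(96).
node_id = hashlib.sha384(b"1234").digest()
row = (binascii.hexlify(node_id).decode(), "73.186.148.72", 4444, None)

# Mirrors _save_kademlia_peers: wipe, then bulk insert, in one transaction.
with db:
    db.execute("delete from peer")
    db.executemany(
        "insert into peer(node_id, address, udp_port, tcp_port) values (?, ?, ?, ?)", [row]
    )

# Mirrors get_persisted_kademlia_peers: hex-decode node_id on the way out.
peers = [(binascii.unhexlify(n), a, u, t) for n, a, u, t in
         db.execute("select node_id, address, udp_port, tcp_port from peer")]
assert peers == [(node_id, "73.186.148.72", 4444, None)]
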
diff --git a/lbry/tests/integration/test_dht.py b/lbry/tests/integration/test_dht.py
index 080562451..4caff5fd4 100644
--- a/lbry/tests/integration/test_dht.py
+++ b/lbry/tests/integration/test_dht.py
@@ -44,10 +44,10 @@ class DHTIntegrationTest(AsyncioTestCase):
 
         for node in self.nodes:
             node.start(external_ip, self.known_node_addresses[:seed_nodes])
-        await asyncio.gather(*[node.joined.wait() for node in self.nodes])
 
     async def test_replace_bad_nodes(self):
         await self.setup_network(20)
+        await asyncio.gather(*[node.joined.wait() for node in self.nodes])
         self.assertEqual(len(self.nodes), 20)
         node = self.nodes[0]
         bad_peers = []
@@ -68,6 +68,7 @@ class DHTIntegrationTest(AsyncioTestCase):
 
     async def test_re_join(self):
         await self.setup_network(20, seed_nodes=10)
+        await asyncio.gather(*[node.joined.wait() for node in self.nodes])
         node = self.nodes[-1]
         self.assertTrue(node.joined.is_set())
         self.assertTrue(node.protocol.routing_table.get_peers())
@@ -95,6 +96,7 @@ class DHTIntegrationTest(AsyncioTestCase):
 
     async def test_get_token_on_announce(self):
         await self.setup_network(2, seed_nodes=2)
+        await asyncio.gather(*[node.joined.wait() for node in self.nodes])
         node1, node2 = self.nodes
         node1.protocol.peer_manager.clear_token(node2.protocol.node_id)
         blob_hash = hexlify(constants.generate_id(1337)).decode()
@@ -112,6 +114,7 @@ class DHTIntegrationTest(AsyncioTestCase):
         # imagine that you only got bad peers and refresh will happen in one hour
         # instead of failing for one hour we should be able to recover by scheduling pings to bad peers we find
         await self.setup_network(2, seed_nodes=2)
+        await asyncio.gather(*[node.joined.wait() for node in self.nodes])
         node1, node2 = self.nodes
 
         node2.stop()  # forcefully make it a bad peer but don't remove it from routing table
@@ -129,38 +132,36 @@ class DHTIntegrationTest(AsyncioTestCase):
         self.assertFalse(node1.protocol.routing_table.get_peers())
 
     async def test_peer_persistance(self):
-        num_peers = 5
+        num_nodes = 6
         start_port = 40000
+        num_seeds = 2
         external_ip = '127.0.0.1'
 
         # Start a node
-        node1 = await self.create_node(constants.generate_id(num_peers), start_port+num_peers)
-        node1.start(external_ip)
+        await self.setup_network(num_nodes, start_port=start_port, seed_nodes=num_seeds)
+        await asyncio.gather(*[node.joined.wait() for node in self.nodes])
 
-        # Add peers
-        peer_args = [(n.protocol.nodeid, n.protocol.external_ip, n.protocol.udp_port) for n in self.nodes]
+        node1 = self.nodes[-1]
+        peer_args = [(n.protocol.node_id, n.protocol.external_ip, n.protocol.udp_port, n.protocol.peer_port) for n in
+                     self.nodes[:num_seeds]]
         peers = [make_kademlia_peer(*args) for args in peer_args]
-        for peer in peers:
-            await node1.protocol._add_peer(peer)
-        await asyncio.sleep(.3)
-        self.assertTrue(node1.joined.is_set())
+        # node1 is bootstrapped from the fixed seeds
         self.assertCountEqual(peers, node1.protocol.routing_table.get_peers())
 
         # Refresh and assert that the peers were persisted
         await node1.refresh_node(True)
-        self.assertCountEqual(peer_args, await node1._storage.get_peers())
+        self.assertEqual(len(peer_args), len(await node1._storage.get_persisted_kademlia_peers()))
         node1.stop()
 
-        # Start a fresh node with the same node_id and storage
-        node2 = await self.create_node(constants.generate_id(num_peers), start_port+num_peers+1)
+        # Start a fresh node with the same node_id and storage, but no known peers
+        node2 = await self.create_node(constants.generate_id(num_nodes-1), start_port+num_nodes-1)
         node2._storage = node1._storage
-        node2.start(external_ip)
+        node2.start(external_ip, [])
+        await node2.joined.wait()
 
         # The peers are restored
-        await asyncio.sleep(.3)
-        self.assertTrue(node2.joined.is_set())
-        self.assertCountEqual(peers, node2.protocol.routing_table.get_peers())
+        self.assertEqual(num_seeds, len(node2.protocol.routing_table.get_peers()))
         for bucket1, bucket2 in zip(node1.protocol.routing_table.buckets, node2.protocol.routing_table.buckets):
             self.assertEqual((bucket1.range_min, bucket1.range_max), (bucket2.range_min, bucket2.range_max))
@@ -170,6 +171,7 @@ class DHTIntegrationTest(AsyncioTestCase):
         external_ip = '127.0.0.1'
 
         await self.setup_network(num_peers, seed_nodes=num_peers // 2, start_port=start_port)
+        await asyncio.gather(*[node.joined.wait() for node in self.nodes])
         peer_args = [
             (n.protocol.node_id, n.protocol.external_ip, n.protocol.udp_port) for n in self.nodes
         ]
@@ -180,7 +182,7 @@ class DHTIntegrationTest(AsyncioTestCase):
 
         # Create node with the persisted nodes in storage
         node = await self.create_node(constants.generate_id(num_peers), start_port+num_peers)
-        await node._storage.update_peers(persisted_peers)
+        await node._storage.save_kademlia_peers(persisted_peers)
 
         # Stop known peers so they stop replying and won't be added
         for n in known_nodes:
diff --git a/lbry/tests/unit/database/test_SQLiteStorage.py b/lbry/tests/unit/database/test_SQLiteStorage.py
index bd5dfd67f..ab63512e7 100644
--- a/lbry/tests/unit/database/test_SQLiteStorage.py
+++ b/lbry/tests/unit/database/test_SQLiteStorage.py
@@ -256,6 +256,6 @@ class UpdatePeersTest(StorageTest):
         node_id = hashlib.sha384("1234".encode()).digest()
         args = (node_id, '73.186.148.72', 4444, None)
         fake_peer = make_kademlia_peer(*args)
-        await self.storage.update_peers([fake_peer])
-        peers = await self.storage.get_peers()
+        await self.storage.save_kademlia_peers([fake_peer])
+        peers = await self.storage.get_persisted_kademlia_peers()
         self.assertTupleEqual(args, peers[0])
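For illustration, the test changes above move the join wait out of setup_network(): each test now gathers node.joined.wait() explicitly, and test_peer_persistance relies on node2.joined.wait() to confirm that a node restarted with empty known_node_urls rejoins from persisted peers alone. Below is a generic runnable sketch of that explicit wait pattern; FakeNode is a stand-in, not the lbry Node class or its test harness.

import asyncio

# FakeNode is a hypothetical stand-in: it only models "start now, join later"
# via an asyncio.Event, the same signal the integration tests gather on.
class FakeNode:
    def __init__(self, join_delay: float):
        self.joined = asyncio.Event()
        self._join_delay = join_delay

    def start(self):
        # Pretend joining completes a little later, as in a real DHT bootstrap.
        asyncio.get_running_loop().call_later(self._join_delay, self.joined.set)

async def main():
    nodes = [FakeNode(0.01 * i) for i in range(5)]
    for node in nodes:
        node.start()
    # The gather that setup_network() used to do implicitly, here with an
    # explicit timeout so a node that never joins fails the test quickly.
    await asyncio.wait_for(
        asyncio.gather(*[node.joined.wait() for node in nodes]),
        timeout=5,
    )
    print("all nodes joined")

asyncio.run(main())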