From ca8f59a64370b7c34eb5008adca1587a2b8ae55c Mon Sep 17 00:00:00 2001 From: Miroslav Kovar Date: Thu, 31 Oct 2019 13:57:30 +0100 Subject: [PATCH 01/10] Persist and restore routing table --- lbry/lbry/dht/node.py | 88 ++++++------- lbry/lbry/extras/daemon/Components.py | 10 +- lbry/lbry/extras/daemon/storage.py | 25 ++++ lbry/tests/integration/test_dht.py | 116 ++++++++++++++++-- .../tests/unit/database/test_SQLiteStorage.py | 12 ++ 5 files changed, 194 insertions(+), 57 deletions(-) diff --git a/lbry/lbry/dht/node.py b/lbry/lbry/dht/node.py index 2159cb86a..f9c44c72e 100644 --- a/lbry/lbry/dht/node.py +++ b/lbry/lbry/dht/node.py @@ -19,7 +19,8 @@ log = logging.getLogger(__name__) class Node: def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', node_id: bytes, udp_port: int, internal_udp_port: int, peer_port: int, external_ip: str, rpc_timeout: float = constants.rpc_timeout, - split_buckets_under_index: int = constants.split_buckets_under_index): + split_buckets_under_index: int = constants.split_buckets_under_index, + storage: typing.Optional['SQLiteStorage'] = None): self.loop = loop self.internal_udp_port = internal_udp_port self.protocol = KademliaProtocol(loop, peer_manager, node_id, external_ip, udp_port, peer_port, rpc_timeout, @@ -28,6 +29,7 @@ class Node: self.joined = asyncio.Event(loop=self.loop) self._join_task: asyncio.Task = None self._refresh_task: asyncio.Task = None + self._storage = storage async def refresh_node(self, force_once=False): while True: @@ -50,6 +52,9 @@ class Node: node_ids.append(self.protocol.routing_table.random_id_in_bucket_range(i)) node_ids.append(self.protocol.routing_table.random_id_in_bucket_range(i)) + if self._storage: + await self._storage.update_peers(self.protocol.routing_table.get_peers()) + if self.protocol.routing_table.get_peers(): # if we have node ids to look up, perform the iterative search until we have k results while node_ids: @@ -111,7 +116,7 @@ class Node: self.listening_port = None log.info("Stopped DHT node") - async def start_listening(self, interface: str = '') -> None: + async def start_listening(self, interface: str = '0.0.0.0') -> None: if not self.listening_port: self.listening_port, _ = await self.loop.create_datagram_endpoint( lambda: self.protocol, (interface, self.internal_udp_port) @@ -121,56 +126,55 @@ class Node: else: log.warning("Already bound to port %s", self.listening_port) - async def join_network(self, interface: typing.Optional[str] = '', + async def join_network(self, interface: str = '0.0.0.0', known_node_urls: typing.Optional[typing.List[typing.Tuple[str, int]]] = None): + def peers_from_urls(urls: typing.Optional[typing.List[typing.Tuple[bytes, str, int, int]]]): + peer_addresses = [] + for node_id, address, udp_port, tcp_port in urls: + if (node_id, address, udp_port, tcp_port) not in peer_addresses and \ + (address, udp_port) != (self.protocol.external_ip, self.protocol.udp_port): + peer_addresses.append((node_id, address, udp_port, tcp_port)) + return [make_kademlia_peer(*peer_address) for peer_address in peer_addresses] + + def set_joined(): + self.joined.set() + log.info( + "joined dht, %i peers known in %i buckets", len(self.protocol.routing_table.get_peers()), + self.protocol.routing_table.buckets_with_contacts() + ) + if not self.listening_port: await self.start_listening(interface) self.protocol.ping_queue.start() self._refresh_task = self.loop.create_task(self.refresh_node()) - # resolve the known node urls - known_node_addresses = [] - url_to_addr = {} + 
restored_peers = peers_from_urls(await self._storage.get_peers()) if self._storage else [] - if known_node_urls: - for host, port in known_node_urls: - address = await resolve_host(host, port, proto='udp') - if (address, port) not in known_node_addresses and\ - (address, port) != (self.protocol.external_ip, self.protocol.udp_port): - known_node_addresses.append((address, port)) - url_to_addr[address] = host + fixed_peers = peers_from_urls([ + (None, await resolve_host(address, udp_port, 'udp'), udp_port, None) + for address, udp_port in known_node_urls or [] + ]) - if known_node_addresses: - peers = [ - make_kademlia_peer(None, address, port) - for (address, port) in known_node_addresses - ] - while True: - if not self.protocol.routing_table.get_peers(): - if self.joined.is_set(): - self.joined.clear() - self.protocol.peer_manager.reset() - self.protocol.ping_queue.enqueue_maybe_ping(*peers, delay=0.0) - peers.extend(await self.peer_search(self.protocol.node_id, shortlist=peers, count=32)) - if self.protocol.routing_table.get_peers(): - self.joined.set() - log.info( - "Joined DHT, %i peers known in %i buckets", len(self.protocol.routing_table.get_peers()), - self.protocol.routing_table.buckets_with_contacts()) - else: - continue - await asyncio.sleep(1, loop=self.loop) + seed_peers = restored_peers or fixed_peers + fallback = False + while seed_peers: + if self.protocol.routing_table.get_peers(): + if not self.joined.is_set(): + set_joined() + else: + if self.joined.is_set(): + self.joined.clear() + seed_peers = fixed_peers if fallback else seed_peers + self.protocol.peer_manager.reset() + self.protocol.ping_queue.enqueue_maybe_ping(*seed_peers, delay=0.0) + seed_peers.extend(await self.peer_search(self.protocol.node_id, shortlist=seed_peers, count=32)) + fallback = not self.protocol.routing_table.get_peers() + await asyncio.sleep(1, loop=self.loop) - log.info("Joined DHT, %i peers known in %i buckets", len(self.protocol.routing_table.get_peers()), - self.protocol.routing_table.buckets_with_contacts()) - self.joined.set() + set_joined() - def start(self, interface: str, known_node_urls: typing.List[typing.Tuple[str, int]]): - self._join_task = self.loop.create_task( - self.join_network( - interface=interface, known_node_urls=known_node_urls - ) - ) + def start(self, interface: str, known_node_urls: typing.Optional[typing.List[typing.Tuple[str, int]]] = None): + self._join_task = self.loop.create_task(self.join_network(interface, known_node_urls)) def get_iterative_node_finder(self, key: bytes, shortlist: typing.Optional[typing.List['KademliaPeer']] = None, bottom_out_limit: int = constants.bottom_out_limit, diff --git a/lbry/lbry/extras/daemon/Components.py b/lbry/lbry/extras/daemon/Components.py index 78a793381..98b1d5250 100644 --- a/lbry/lbry/extras/daemon/Components.py +++ b/lbry/lbry/extras/daemon/Components.py @@ -189,7 +189,7 @@ class BlobComponent(Component): class DHTComponent(Component): component_name = DHT_COMPONENT - depends_on = [UPNP_COMPONENT] + depends_on = [UPNP_COMPONENT, DATABASE_COMPONENT] def __init__(self, component_manager): super().__init__(component_manager) @@ -223,6 +223,7 @@ class DHTComponent(Component): self.external_peer_port = upnp_component.upnp_redirects.get("TCP", self.conf.tcp_port) self.external_udp_port = upnp_component.upnp_redirects.get("UDP", self.conf.udp_port) external_ip = upnp_component.external_ip + storage = self.component_manager.get_component(DATABASE_COMPONENT) if not external_ip: external_ip = await utils.get_external_ip() if not 
external_ip: @@ -237,11 +238,10 @@ class DHTComponent(Component): external_ip=external_ip, peer_port=self.external_peer_port, rpc_timeout=self.conf.node_rpc_timeout, - split_buckets_under_index=self.conf.split_buckets_under_index - ) - self.dht_node.start( - interface=self.conf.network_interface, known_node_urls=self.conf.known_dht_nodes + split_buckets_under_index=self.conf.split_buckets_under_index, + storage=storage ) + self.dht_node.start(self.conf.network_interface, self.conf.known_dht_nodes) log.info("Started the dht") async def stop(self): diff --git a/lbry/lbry/extras/daemon/storage.py b/lbry/lbry/extras/daemon/storage.py index b5859d6c1..2dfee2ce3 100644 --- a/lbry/lbry/extras/daemon/storage.py +++ b/lbry/lbry/extras/daemon/storage.py @@ -329,6 +329,14 @@ class SQLiteStorage(SQLiteMixin): timestamp integer, primary key (sd_hash, reflector_address) ); + + create table if not exists peer ( + address text not null, + udp_port integer not null, + tcp_port integer, + node_id char(96) unique not null, + primary key (address, udp_port) + ); """ def __init__(self, conf: Config, path, loop=None, time_getter: typing.Optional[typing.Callable[[], float]] = None): @@ -805,3 +813,20 @@ class SQLiteStorage(SQLiteMixin): "where r.timestamp is null or r.timestamp < ?", int(self.time_getter()) - 86400 ) + + # # # # # # # # # # dht functions # # # # # # # # # # # + async def get_peers(self): + query = 'select node_id, address, udp_port, tcp_port from peer' + return [(binascii.unhexlify(n), a, u, t) for n, a, u, t in await self.db.execute_fetchall(query)] + + async def update_peers(self, peers: typing.List['KademliaPeer']): + def _update_peers(transaction: sqlite3.Connection): + transaction.execute('delete from peer').fetchall() + transaction.executemany( + 'insert into peer(node_id, address, udp_port, tcp_port) values (?, ?, ?, ?)', ( + tuple( + [(binascii.hexlify(p.node_id), p.address, p.udp_port, p.tcp_port) for p in peers] + ) + ) + ).fetchall() + return await self.db.run(_update_peers) diff --git a/lbry/tests/integration/test_dht.py b/lbry/tests/integration/test_dht.py index 25443ee6c..080562451 100644 --- a/lbry/tests/integration/test_dht.py +++ b/lbry/tests/integration/test_dht.py @@ -1,6 +1,8 @@ import asyncio from binascii import hexlify +from lbry.extras.daemon.storage import SQLiteStorage +from lbry.conf import Config from lbry.dht import constants from lbry.dht.node import Node from lbry.dht import peer as dht_peer @@ -19,20 +21,29 @@ class DHTIntegrationTest(AsyncioTestCase): self.nodes = [] self.known_node_addresses = [] - async def setup_network(self, size: int, start_port=40000, seed_nodes=1): + async def create_node(self, node_id, port, external_ip='127.0.0.1'): + storage = SQLiteStorage(Config(), ":memory:", self.loop, self.loop.time) + await storage.open() + node = Node(self.loop, PeerManager(self.loop), node_id=node_id, + udp_port=port, internal_udp_port=port, + peer_port=3333, external_ip=external_ip, + storage=storage) + self.addCleanup(node.stop) + node.protocol.rpc_timeout = .5 + node.protocol.ping_queue._default_delay = .5 + node._peer_search_timeout = .5 + return node + + async def setup_network(self, size: int, start_port=40000, seed_nodes=1, external_ip='127.0.0.1'): for i in range(size): node_port = start_port + i - node = Node(self.loop, PeerManager(self.loop), node_id=constants.generate_id(i), - udp_port=node_port, internal_udp_port=node_port, - peer_port=3333, external_ip='127.0.0.1') + node_id = constants.generate_id(i) + node = await self.create_node(node_id, 
node_port) self.nodes.append(node) - self.known_node_addresses.append(('127.0.0.1', node_port)) - await node.start_listening('127.0.0.1') - self.addCleanup(node.stop) + self.known_node_addresses.append((external_ip, node_port)) + for node in self.nodes: - node.protocol.rpc_timeout = .5 - node.protocol.ping_queue._default_delay = .5 - node.start('127.0.0.1', self.known_node_addresses[:seed_nodes]) + node.start(external_ip, self.known_node_addresses[:seed_nodes]) await asyncio.gather(*[node.joined.wait() for node in self.nodes]) async def test_replace_bad_nodes(self): @@ -116,3 +127,88 @@ class DHTIntegrationTest(AsyncioTestCase): await node1.peer_search(node2.protocol.node_id) await asyncio.sleep(.3) # let pending events settle self.assertFalse(node1.protocol.routing_table.get_peers()) + + async def test_peer_persistance(self): + num_peers = 5 + start_port = 40000 + external_ip = '127.0.0.1' + + # Start a node + node1 = await self.create_node(constants.generate_id(num_peers), start_port+num_peers) + node1.start(external_ip) + + # Add peers + peer_args = [(n.protocol.nodeid, n.protocol.external_ip, n.protocol.udp_port) for n in self.nodes] + peers = [make_kademlia_peer(*args) for args in peer_args] + for peer in peers: + await node1.protocol._add_peer(peer) + + await asyncio.sleep(.3) + self.assertTrue(node1.joined.is_set()) + self.assertCountEqual(peers, node1.protocol.routing_table.get_peers()) + + # Refresh and assert that the peers were persisted + await node1.refresh_node(True) + self.assertCountEqual(peer_args, await node1._storage.get_peers()) + node1.stop() + + # Start a fresh node with the same node_id and storage + node2 = await self.create_node(constants.generate_id(num_peers), start_port+num_peers+1) + node2._storage = node1._storage + node2.start(external_ip) + + # The peers are restored + await asyncio.sleep(.3) + self.assertTrue(node2.joined.is_set()) + self.assertCountEqual(peers, node2.protocol.routing_table.get_peers()) + for bucket1, bucket2 in zip(node1.protocol.routing_table.buckets, node2.protocol.routing_table.buckets): + self.assertEqual((bucket1.range_min, bucket1.range_max), (bucket2.range_min, bucket2.range_max)) + + async def test_switch_to_known_seeds(self): + num_peers = 10 + start_port = 40000 + external_ip = '127.0.0.1' + + await self.setup_network(num_peers, seed_nodes=num_peers // 2, start_port=start_port) + peer_args = [ + (n.protocol.node_id, n.protocol.external_ip, n.protocol.udp_port) for n in self.nodes + ] + known_peers = [make_kademlia_peer(*args) for args in peer_args[:num_peers // 2]] + known_nodes = self.nodes[:num_peers // 2] + persisted_peers = [make_kademlia_peer(*args) for args in peer_args[num_peers // 2:]] + persisted_nodes = self.nodes[num_peers // 2:] + + # Create node with the persisted nodes in storage + node = await self.create_node(constants.generate_id(num_peers), start_port+num_peers) + await node._storage.update_peers(persisted_peers) + + # Stop known peers so they stop replying and won't be added + for n in known_nodes: + n.stop() + + node.start(external_ip, self.known_node_addresses[:num_peers // 2]) + await node.joined.wait() + self.assertTrue(node.joined.is_set()) + + # Only persisted ones are added to the routing table + self.assertCountEqual(persisted_peers, node.protocol.routing_table.get_peers()) + + # Start the known ones, stop the persisted + for n1, n2 in zip(known_nodes, persisted_nodes): + n1.start(external_ip) + n2.stop() + asyncio.gather(*[n.joined.wait() for n in known_nodes]) + await asyncio.sleep(3) + 
self.assertTrue(all(known.joined.is_set() for known in known_nodes)) + self.assertTrue(all(not persisted.joined.is_set() for persisted in persisted_nodes)) + + # Remove persisted from node's routing table, set them as bad + for peer in persisted_peers: + node.protocol.routing_table.remove_peer(peer) + node.protocol.peer_manager.report_failure(peer.address, peer.udp_port) + self.assertFalse(node.protocol.routing_table.get_peers()) + + # The known_peers replace the persisted ones + await node.joined.wait() + await asyncio.sleep(3) + self.assertCountEqual(known_peers, node.protocol.routing_table.get_peers()) diff --git a/lbry/tests/unit/database/test_SQLiteStorage.py b/lbry/tests/unit/database/test_SQLiteStorage.py index e0a11ad84..bd5dfd67f 100644 --- a/lbry/tests/unit/database/test_SQLiteStorage.py +++ b/lbry/tests/unit/database/test_SQLiteStorage.py @@ -3,6 +3,7 @@ import tempfile import unittest import asyncio import logging +import hashlib from torba.testcase import AsyncioTestCase from lbry.conf import Config from lbry.extras.daemon.storage import SQLiteStorage @@ -10,6 +11,7 @@ from lbry.blob.blob_info import BlobInfo from lbry.blob.blob_manager import BlobManager from lbry.stream.descriptor import StreamDescriptor from tests.test_utils import random_lbry_hash +from lbry.dht.peer import make_kademlia_peer log = logging.getLogger() @@ -247,3 +249,13 @@ class ContentClaimStorageTests(StorageTest): current_claim_info = await self.storage.get_content_claim(stream_hash) # this should still be the previous update self.assertDictEqual(current_claim_info, update_info) + + +class UpdatePeersTest(StorageTest): + async def test_update_get_peers(self): + node_id = hashlib.sha384("1234".encode()).digest() + args = (node_id, '73.186.148.72', 4444, None) + fake_peer = make_kademlia_peer(*args) + await self.storage.update_peers([fake_peer]) + peers = await self.storage.get_peers() + self.assertTupleEqual(args, peers[0]) From c321758afd0406b4f85b634ffcd0d4a8e2afe82f Mon Sep 17 00:00:00 2001 From: Miroslav Kovar Date: Mon, 11 Nov 2019 15:32:24 +0100 Subject: [PATCH 02/10] Rename, fix tests / deduplicate set_joined, resolve conditionally --- lbry/lbry/dht/node.py | 37 +++++++----------- lbry/lbry/extras/daemon/storage.py | 12 +++--- lbry/tests/integration/test_dht.py | 38 ++++++++++--------- .../tests/unit/database/test_SQLiteStorage.py | 4 +- 4 files changed, 42 insertions(+), 49 deletions(-) diff --git a/lbry/lbry/dht/node.py b/lbry/lbry/dht/node.py index f9c44c72e..d5d2564fb 100644 --- a/lbry/lbry/dht/node.py +++ b/lbry/lbry/dht/node.py @@ -53,7 +53,7 @@ class Node: node_ids.append(self.protocol.routing_table.random_id_in_bucket_range(i)) if self._storage: - await self._storage.update_peers(self.protocol.routing_table.get_peers()) + await self._storage.save_kademlia_peers(self.protocol.routing_table.get_peers()) if self.protocol.routing_table.get_peers(): # if we have node ids to look up, perform the iterative search until we have k results @@ -136,42 +136,33 @@ class Node: peer_addresses.append((node_id, address, udp_port, tcp_port)) return [make_kademlia_peer(*peer_address) for peer_address in peer_addresses] - def set_joined(): - self.joined.set() - log.info( - "joined dht, %i peers known in %i buckets", len(self.protocol.routing_table.get_peers()), - self.protocol.routing_table.buckets_with_contacts() - ) - if not self.listening_port: await self.start_listening(interface) self.protocol.ping_queue.start() self._refresh_task = self.loop.create_task(self.refresh_node()) - restored_peers = 
peers_from_urls(await self._storage.get_peers()) if self._storage else [] - - fixed_peers = peers_from_urls([ - (None, await resolve_host(address, udp_port, 'udp'), udp_port, None) - for address, udp_port in known_node_urls or [] - ]) - - seed_peers = restored_peers or fixed_peers - fallback = False - while seed_peers: + seed_peers = peers_from_urls(await self._storage.get_persisted_kademlia_peers()) if self._storage else [] + while True: if self.protocol.routing_table.get_peers(): if not self.joined.is_set(): - set_joined() + self.joined.set() + log.info( + "joined dht, %i peers known in %i buckets", len(self.protocol.routing_table.get_peers()), + self.protocol.routing_table.buckets_with_contacts() + ) else: if self.joined.is_set(): self.joined.clear() - seed_peers = fixed_peers if fallback else seed_peers self.protocol.peer_manager.reset() self.protocol.ping_queue.enqueue_maybe_ping(*seed_peers, delay=0.0) seed_peers.extend(await self.peer_search(self.protocol.node_id, shortlist=seed_peers, count=32)) - fallback = not self.protocol.routing_table.get_peers() - await asyncio.sleep(1, loop=self.loop) + if not seed_peers or not self.protocol.routing_table.get_peers(): + seed_peers.extend(peers_from_urls([ + (None, await resolve_host(address, udp_port, 'udp'), udp_port, None) + for address, udp_port in known_node_urls or [] + ])) - set_joined() + await asyncio.sleep(1, loop=self.loop) def start(self, interface: str, known_node_urls: typing.Optional[typing.List[typing.Tuple[str, int]]] = None): self._join_task = self.loop.create_task(self.join_network(interface, known_node_urls)) diff --git a/lbry/lbry/extras/daemon/storage.py b/lbry/lbry/extras/daemon/storage.py index 2dfee2ce3..3afd7ab11 100644 --- a/lbry/lbry/extras/daemon/storage.py +++ b/lbry/lbry/extras/daemon/storage.py @@ -331,11 +331,11 @@ class SQLiteStorage(SQLiteMixin): ); create table if not exists peer ( + node_id char(96) not null primary key, address text not null, udp_port integer not null, tcp_port integer, - node_id char(96) unique not null, - primary key (address, udp_port) + unique (address, udp_port) ); """ @@ -815,12 +815,12 @@ class SQLiteStorage(SQLiteMixin): ) # # # # # # # # # # dht functions # # # # # # # # # # # - async def get_peers(self): + async def get_persisted_kademlia_peers(self) -> typing.List[typing.Tuple[bytes, str, int, int]]: query = 'select node_id, address, udp_port, tcp_port from peer' return [(binascii.unhexlify(n), a, u, t) for n, a, u, t in await self.db.execute_fetchall(query)] - async def update_peers(self, peers: typing.List['KademliaPeer']): - def _update_peers(transaction: sqlite3.Connection): + async def save_kademlia_peers(self, peers: typing.List['KademliaPeer']): + def _save_kademlia_peers(transaction: sqlite3.Connection): transaction.execute('delete from peer').fetchall() transaction.executemany( 'insert into peer(node_id, address, udp_port, tcp_port) values (?, ?, ?, ?)', ( @@ -829,4 +829,4 @@ class SQLiteStorage(SQLiteMixin): ) ) ).fetchall() - return await self.db.run(_update_peers) + return await self.db.run(_save_kademlia_peers) diff --git a/lbry/tests/integration/test_dht.py b/lbry/tests/integration/test_dht.py index 080562451..4caff5fd4 100644 --- a/lbry/tests/integration/test_dht.py +++ b/lbry/tests/integration/test_dht.py @@ -44,10 +44,10 @@ class DHTIntegrationTest(AsyncioTestCase): for node in self.nodes: node.start(external_ip, self.known_node_addresses[:seed_nodes]) - await asyncio.gather(*[node.joined.wait() for node in self.nodes]) async def test_replace_bad_nodes(self): 
await self.setup_network(20) + await asyncio.gather(*[node.joined.wait() for node in self.nodes]) self.assertEqual(len(self.nodes), 20) node = self.nodes[0] bad_peers = [] @@ -68,6 +68,7 @@ class DHTIntegrationTest(AsyncioTestCase): async def test_re_join(self): await self.setup_network(20, seed_nodes=10) + await asyncio.gather(*[node.joined.wait() for node in self.nodes]) node = self.nodes[-1] self.assertTrue(node.joined.is_set()) self.assertTrue(node.protocol.routing_table.get_peers()) @@ -95,6 +96,7 @@ class DHTIntegrationTest(AsyncioTestCase): async def test_get_token_on_announce(self): await self.setup_network(2, seed_nodes=2) + await asyncio.gather(*[node.joined.wait() for node in self.nodes]) node1, node2 = self.nodes node1.protocol.peer_manager.clear_token(node2.protocol.node_id) blob_hash = hexlify(constants.generate_id(1337)).decode() @@ -112,6 +114,7 @@ class DHTIntegrationTest(AsyncioTestCase): # imagine that you only got bad peers and refresh will happen in one hour # instead of failing for one hour we should be able to recover by scheduling pings to bad peers we find await self.setup_network(2, seed_nodes=2) + await asyncio.gather(*[node.joined.wait() for node in self.nodes]) node1, node2 = self.nodes node2.stop() # forcefully make it a bad peer but don't remove it from routing table @@ -129,38 +132,36 @@ class DHTIntegrationTest(AsyncioTestCase): self.assertFalse(node1.protocol.routing_table.get_peers()) async def test_peer_persistance(self): - num_peers = 5 + num_nodes = 6 start_port = 40000 + num_seeds = 2 external_ip = '127.0.0.1' # Start a node - node1 = await self.create_node(constants.generate_id(num_peers), start_port+num_peers) - node1.start(external_ip) + await self.setup_network(num_nodes, start_port=start_port, seed_nodes=num_seeds) + await asyncio.gather(*[node.joined.wait() for node in self.nodes]) - # Add peers - peer_args = [(n.protocol.nodeid, n.protocol.external_ip, n.protocol.udp_port) for n in self.nodes] + node1 = self.nodes[-1] + peer_args = [(n.protocol.node_id, n.protocol.external_ip, n.protocol.udp_port, n.protocol.peer_port) for n in + self.nodes[:num_seeds]] peers = [make_kademlia_peer(*args) for args in peer_args] - for peer in peers: - await node1.protocol._add_peer(peer) - await asyncio.sleep(.3) - self.assertTrue(node1.joined.is_set()) + # node1 is bootstrapped from the fixed seeds self.assertCountEqual(peers, node1.protocol.routing_table.get_peers()) # Refresh and assert that the peers were persisted await node1.refresh_node(True) - self.assertCountEqual(peer_args, await node1._storage.get_peers()) + self.assertEqual(len(peer_args), len(await node1._storage.get_persisted_kademlia_peers())) node1.stop() - # Start a fresh node with the same node_id and storage - node2 = await self.create_node(constants.generate_id(num_peers), start_port+num_peers+1) + # Start a fresh node with the same node_id and storage, but no known peers + node2 = await self.create_node(constants.generate_id(num_nodes-1), start_port+num_nodes-1) node2._storage = node1._storage - node2.start(external_ip) + node2.start(external_ip, []) + await node2.joined.wait() # The peers are restored - await asyncio.sleep(.3) - self.assertTrue(node2.joined.is_set()) - self.assertCountEqual(peers, node2.protocol.routing_table.get_peers()) + self.assertEqual(num_seeds, len(node2.protocol.routing_table.get_peers())) for bucket1, bucket2 in zip(node1.protocol.routing_table.buckets, node2.protocol.routing_table.buckets): self.assertEqual((bucket1.range_min, bucket1.range_max), 
(bucket2.range_min, bucket2.range_max)) @@ -170,6 +171,7 @@ class DHTIntegrationTest(AsyncioTestCase): external_ip = '127.0.0.1' await self.setup_network(num_peers, seed_nodes=num_peers // 2, start_port=start_port) + await asyncio.gather(*[node.joined.wait() for node in self.nodes]) peer_args = [ (n.protocol.node_id, n.protocol.external_ip, n.protocol.udp_port) for n in self.nodes ] @@ -180,7 +182,7 @@ class DHTIntegrationTest(AsyncioTestCase): # Create node with the persisted nodes in storage node = await self.create_node(constants.generate_id(num_peers), start_port+num_peers) - await node._storage.update_peers(persisted_peers) + await node._storage.save_kademlia_peers(persisted_peers) # Stop known peers so they stop replying and won't be added for n in known_nodes: diff --git a/lbry/tests/unit/database/test_SQLiteStorage.py b/lbry/tests/unit/database/test_SQLiteStorage.py index bd5dfd67f..ab63512e7 100644 --- a/lbry/tests/unit/database/test_SQLiteStorage.py +++ b/lbry/tests/unit/database/test_SQLiteStorage.py @@ -256,6 +256,6 @@ class UpdatePeersTest(StorageTest): node_id = hashlib.sha384("1234".encode()).digest() args = (node_id, '73.186.148.72', 4444, None) fake_peer = make_kademlia_peer(*args) - await self.storage.update_peers([fake_peer]) - peers = await self.storage.get_peers() + await self.storage.save_kademlia_peers([fake_peer]) + peers = await self.storage.get_persisted_kademlia_peers() self.assertTupleEqual(args, peers[0]) From a80fbcc2524fc99b8ec5cd8303e2026a8ff55ee2 Mon Sep 17 00:00:00 2001 From: Miroslav Kovar Date: Mon, 11 Nov 2019 17:01:41 +0100 Subject: [PATCH 03/10] Catch resolve timeouts --- lbry/lbry/dht/node.py | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/lbry/lbry/dht/node.py b/lbry/lbry/dht/node.py index d5d2564fb..e963af445 100644 --- a/lbry/lbry/dht/node.py +++ b/lbry/lbry/dht/node.py @@ -141,7 +141,6 @@ class Node: self.protocol.ping_queue.start() self._refresh_task = self.loop.create_task(self.refresh_node()) - seed_peers = peers_from_urls(await self._storage.get_persisted_kademlia_peers()) if self._storage else [] while True: if self.protocol.routing_table.get_peers(): if not self.joined.is_set(): @@ -153,14 +152,22 @@ class Node: else: if self.joined.is_set(): self.joined.clear() + seed_peers = peers_from_urls( + await self._storage.get_persisted_kademlia_peers() + ) if self._storage else [] + if not seed_peers: + try: + seed_peers.extend(peers_from_urls([ + (None, await resolve_host(address, udp_port, 'udp'), udp_port, None) + for address, udp_port in known_node_urls or [] + ])) + except asyncio.TimeoutError: + await asyncio.sleep(30) + continue + self.protocol.peer_manager.reset() self.protocol.ping_queue.enqueue_maybe_ping(*seed_peers, delay=0.0) - seed_peers.extend(await self.peer_search(self.protocol.node_id, shortlist=seed_peers, count=32)) - if not seed_peers or not self.protocol.routing_table.get_peers(): - seed_peers.extend(peers_from_urls([ - (None, await resolve_host(address, udp_port, 'udp'), udp_port, None) - for address, udp_port in known_node_urls or [] - ])) + await self.peer_search(self.protocol.node_id, shortlist=seed_peers, count=32) await asyncio.sleep(1, loop=self.loop) From 7191042bb821e152f1f1d9cd5adfdce9f4bef608 Mon Sep 17 00:00:00 2001 From: Miroslav Kovar Date: Tue, 12 Nov 2019 22:29:56 +0100 Subject: [PATCH 04/10] Add new test, remove old test, change error type --- lbry/lbry/dht/node.py | 8 ++-- lbry/tests/dht_mocks.py | 5 ++- lbry/tests/integration/test_dht.py | 50 ----------------------- 
lbry/tests/unit/dht/test_node.py | 65 ++++++++++++++++++++++++++++++ 4 files changed, 72 insertions(+), 56 deletions(-) diff --git a/lbry/lbry/dht/node.py b/lbry/lbry/dht/node.py index e963af445..bf8afba61 100644 --- a/lbry/lbry/dht/node.py +++ b/lbry/lbry/dht/node.py @@ -2,6 +2,7 @@ import logging import asyncio import typing import binascii +import socket from lbry.utils import resolve_host from lbry.dht import constants from lbry.dht.peer import make_kademlia_peer @@ -52,9 +53,6 @@ class Node: node_ids.append(self.protocol.routing_table.random_id_in_bucket_range(i)) node_ids.append(self.protocol.routing_table.random_id_in_bucket_range(i)) - if self._storage: - await self._storage.save_kademlia_peers(self.protocol.routing_table.get_peers()) - if self.protocol.routing_table.get_peers(): # if we have node ids to look up, perform the iterative search until we have k results while node_ids: @@ -72,6 +70,8 @@ class Node: to_ping = [peer for peer in set(total_peers) if self.protocol.peer_manager.peer_is_good(peer) is not True] if to_ping: self.protocol.ping_queue.enqueue_maybe_ping(*to_ping, delay=0) + if self._storage: + await self._storage.save_kademlia_peers(self.protocol.routing_table.get_peers()) if force_once: break @@ -161,7 +161,7 @@ class Node: (None, await resolve_host(address, udp_port, 'udp'), udp_port, None) for address, udp_port in known_node_urls or [] ])) - except asyncio.TimeoutError: + except socket.gaierror: await asyncio.sleep(30) continue diff --git a/lbry/tests/dht_mocks.py b/lbry/tests/dht_mocks.py index cec9b8e80..c725ec32e 100644 --- a/lbry/tests/dht_mocks.py +++ b/lbry/tests/dht_mocks.py @@ -48,8 +48,9 @@ def get_time_accelerator(loop: asyncio.AbstractEventLoop, @contextlib.contextmanager -def mock_network_loop(loop: asyncio.AbstractEventLoop): - dht_network: typing.Dict[typing.Tuple[str, int], 'KademliaProtocol'] = {} +def mock_network_loop(loop: asyncio.AbstractEventLoop, + dht_network: typing.Optional[typing.Dict[typing.Tuple[str, int], 'KademliaProtocol']] = None): + dht_network: typing.Dict[typing.Tuple[str, int], 'KademliaProtocol'] = dht_network or {} async def create_datagram_endpoint(proto_lam: typing.Callable[[], 'KademliaProtocol'], from_addr: typing.Tuple[str, int]): diff --git a/lbry/tests/integration/test_dht.py b/lbry/tests/integration/test_dht.py index 4caff5fd4..3e75acb91 100644 --- a/lbry/tests/integration/test_dht.py +++ b/lbry/tests/integration/test_dht.py @@ -164,53 +164,3 @@ class DHTIntegrationTest(AsyncioTestCase): self.assertEqual(num_seeds, len(node2.protocol.routing_table.get_peers())) for bucket1, bucket2 in zip(node1.protocol.routing_table.buckets, node2.protocol.routing_table.buckets): self.assertEqual((bucket1.range_min, bucket1.range_max), (bucket2.range_min, bucket2.range_max)) - - async def test_switch_to_known_seeds(self): - num_peers = 10 - start_port = 40000 - external_ip = '127.0.0.1' - - await self.setup_network(num_peers, seed_nodes=num_peers // 2, start_port=start_port) - await asyncio.gather(*[node.joined.wait() for node in self.nodes]) - peer_args = [ - (n.protocol.node_id, n.protocol.external_ip, n.protocol.udp_port) for n in self.nodes - ] - known_peers = [make_kademlia_peer(*args) for args in peer_args[:num_peers // 2]] - known_nodes = self.nodes[:num_peers // 2] - persisted_peers = [make_kademlia_peer(*args) for args in peer_args[num_peers // 2:]] - persisted_nodes = self.nodes[num_peers // 2:] - - # Create node with the persisted nodes in storage - node = await self.create_node(constants.generate_id(num_peers), 
start_port+num_peers) - await node._storage.save_kademlia_peers(persisted_peers) - - # Stop known peers so they stop replying and won't be added - for n in known_nodes: - n.stop() - - node.start(external_ip, self.known_node_addresses[:num_peers // 2]) - await node.joined.wait() - self.assertTrue(node.joined.is_set()) - - # Only persisted ones are added to the routing table - self.assertCountEqual(persisted_peers, node.protocol.routing_table.get_peers()) - - # Start the known ones, stop the persisted - for n1, n2 in zip(known_nodes, persisted_nodes): - n1.start(external_ip) - n2.stop() - asyncio.gather(*[n.joined.wait() for n in known_nodes]) - await asyncio.sleep(3) - self.assertTrue(all(known.joined.is_set() for known in known_nodes)) - self.assertTrue(all(not persisted.joined.is_set() for persisted in persisted_nodes)) - - # Remove persisted from node's routing table, set them as bad - for peer in persisted_peers: - node.protocol.routing_table.remove_peer(peer) - node.protocol.peer_manager.report_failure(peer.address, peer.udp_port) - self.assertFalse(node.protocol.routing_table.get_peers()) - - # The known_peers replace the persisted ones - await node.joined.wait() - await asyncio.sleep(3) - self.assertCountEqual(known_peers, node.protocol.routing_table.get_peers()) diff --git a/lbry/tests/unit/dht/test_node.py b/lbry/tests/unit/dht/test_node.py index 03902b553..28bd19bbe 100644 --- a/lbry/tests/unit/dht/test_node.py +++ b/lbry/tests/unit/dht/test_node.py @@ -2,9 +2,11 @@ import asyncio import typing from torba.testcase import AsyncioTestCase from tests import dht_mocks +from lbry.conf import Config from lbry.dht import constants from lbry.dht.node import Node from lbry.dht.peer import PeerManager, make_kademlia_peer +from lbry.extras.daemon.storage import SQLiteStorage class TestNodePingQueueDiscover(AsyncioTestCase): @@ -84,3 +86,66 @@ class TestNodePingQueueDiscover(AsyncioTestCase): # teardown for n in nodes.values(): n.stop() + + +class TestTemporarilyLosingConnetction(AsyncioTestCase): + + async def test_losing_connection(self): + loop = asyncio.get_event_loop() + loop.set_debug(False) + + peer_addresses = [ + (f'127.0.0.1', 40000+i) for i in range(10) + ] + node_ids = [constants.generate_id(i) for i in range(10)] + + nodes = [ + Node( + loop, PeerManager(loop), node_id, udp_port, udp_port, 3333, address, + storage=SQLiteStorage(Config(), ":memory:", self.loop, self.loop.time) + ) + for node_id, (address, udp_port) in zip(node_ids, peer_addresses) + ] + dht_network = {peer_addresses[i]: node.protocol for i, node in enumerate(nodes)} + num_seeds = 3 + + with dht_mocks.mock_network_loop(loop, dht_network): + for i, n in enumerate(nodes): + await n._storage.open() + n.start(peer_addresses[i][0], peer_addresses[:num_seeds]) + + node = nodes[-1] + advance = dht_mocks.get_time_accelerator(loop, loop.time()) + await advance(1000) + + # Join the network, assert that at least the known peers are in RT + self.assertTrue(node.joined.is_set()) + self.assertTrue(len(node.protocol.routing_table.get_peers()) >= num_seeds) + + # Refresh, so that the peers are persisted + await advance(4000) + self.assertTrue(len(await node._storage.get_persisted_kademlia_peers()) > num_seeds) + + # We lost internet connection - all the peers stop responding + popped_protocols = [] + for peer_address in peer_addresses[:-1]: + popped_protocols.append(dht_network.pop(peer_address)) + + # The peers are cleared on refresh from RT and storage + await advance(4000) + 
self.assertFalse(node.protocol.routing_table.get_peers()) + self.assertFalse(await node._storage.get_persisted_kademlia_peers()) + + # Reconnect some of the previously stored - node shouldn't connect + for peer_address, protocol in zip(peer_addresses[num_seeds+1:-2], popped_protocols[num_seeds+1:-2]): + dht_network[peer_address] = protocol + await advance(1000) + self.assertEqual(0, len(node.protocol.routing_table.get_peers())) + + # Reconnect some of the seed nodes + for peer_address, protocol in zip(peer_addresses[:num_seeds], popped_protocols[:num_seeds]): + dht_network[peer_address] = protocol + + # Check that node reconnects at least to them + await advance(1000) + self.assertTrue(len(node.protocol.routing_table.get_peers()) >= num_seeds) From 5951186463128d073d5502d8d1e0e4f6341a3bf3 Mon Sep 17 00:00:00 2001 From: Miroslav Kovar Date: Wed, 13 Nov 2019 19:14:43 +0100 Subject: [PATCH 05/10] Minor changes Fix typos --- lbry/lbry/dht/node.py | 2 +- lbry/lbry/extras/daemon/storage.py | 7 ++----- lbry/tests/integration/test_dht.py | 1 - lbry/tests/unit/dht/test_node.py | 17 ++++++++++------- 4 files changed, 13 insertions(+), 14 deletions(-) diff --git a/lbry/lbry/dht/node.py b/lbry/lbry/dht/node.py index bf8afba61..9f439e25d 100644 --- a/lbry/lbry/dht/node.py +++ b/lbry/lbry/dht/node.py @@ -162,7 +162,7 @@ class Node: for address, udp_port in known_node_urls or [] ])) except socket.gaierror: - await asyncio.sleep(30) + await asyncio.sleep(30, loop=self.loop) continue self.protocol.peer_manager.reset() diff --git a/lbry/lbry/extras/daemon/storage.py b/lbry/lbry/extras/daemon/storage.py index 3afd7ab11..0c8eeb02f 100644 --- a/lbry/lbry/extras/daemon/storage.py +++ b/lbry/lbry/extras/daemon/storage.py @@ -823,10 +823,7 @@ class SQLiteStorage(SQLiteMixin): def _save_kademlia_peers(transaction: sqlite3.Connection): transaction.execute('delete from peer').fetchall() transaction.executemany( - 'insert into peer(node_id, address, udp_port, tcp_port) values (?, ?, ?, ?)', ( - tuple( - [(binascii.hexlify(p.node_id), p.address, p.udp_port, p.tcp_port) for p in peers] - ) - ) + 'insert into peer(node_id, address, udp_port, tcp_port) values (?, ?, ?, ?)', + tuple([(binascii.hexlify(p.node_id), p.address, p.udp_port, p.tcp_port) for p in peers]) ).fetchall() return await self.db.run(_save_kademlia_peers) diff --git a/lbry/tests/integration/test_dht.py b/lbry/tests/integration/test_dht.py index 3e75acb91..761a3e315 100644 --- a/lbry/tests/integration/test_dht.py +++ b/lbry/tests/integration/test_dht.py @@ -31,7 +31,6 @@ class DHTIntegrationTest(AsyncioTestCase): self.addCleanup(node.stop) node.protocol.rpc_timeout = .5 node.protocol.ping_queue._default_delay = .5 - node._peer_search_timeout = .5 return node async def setup_network(self, size: int, start_port=40000, seed_nodes=1, external_ip='127.0.0.1'): diff --git a/lbry/tests/unit/dht/test_node.py b/lbry/tests/unit/dht/test_node.py index 28bd19bbe..7bb0463e0 100644 --- a/lbry/tests/unit/dht/test_node.py +++ b/lbry/tests/unit/dht/test_node.py @@ -88,14 +88,14 @@ class TestNodePingQueueDiscover(AsyncioTestCase): n.stop() -class TestTemporarilyLosingConnetction(AsyncioTestCase): +class TestTemporarilyLosingConnection(AsyncioTestCase): async def test_losing_connection(self): - loop = asyncio.get_event_loop() + loop = self.loop loop.set_debug(False) peer_addresses = [ - (f'127.0.0.1', 40000+i) for i in range(10) + ('127.0.0.1', 40000+i) for i in range(10) ] node_ids = [constants.generate_id(i) for i in range(10)] @@ -112,17 +112,20 @@ class 
TestTemporarilyLosingConnetction(AsyncioTestCase): with dht_mocks.mock_network_loop(loop, dht_network): for i, n in enumerate(nodes): await n._storage.open() + self.addCleanup(n.stop) n.start(peer_addresses[i][0], peer_addresses[:num_seeds]) + await asyncio.gather(*[n.joined.wait() for n in nodes]) node = nodes[-1] advance = dht_mocks.get_time_accelerator(loop, loop.time()) - await advance(1000) + await advance(500) # Join the network, assert that at least the known peers are in RT self.assertTrue(node.joined.is_set()) self.assertTrue(len(node.protocol.routing_table.get_peers()) >= num_seeds) # Refresh, so that the peers are persisted + self.assertFalse(len(await node._storage.get_persisted_kademlia_peers()) > num_seeds) await advance(4000) self.assertTrue(len(await node._storage.get_persisted_kademlia_peers()) > num_seeds) @@ -133,14 +136,14 @@ class TestTemporarilyLosingConnetction(AsyncioTestCase): # The peers are cleared on refresh from RT and storage await advance(4000) - self.assertFalse(node.protocol.routing_table.get_peers()) - self.assertFalse(await node._storage.get_persisted_kademlia_peers()) + self.assertListEqual([], node.protocol.routing_table.get_peers()) + self.assertListEqual([], await node._storage.get_persisted_kademlia_peers()) # Reconnect some of the previously stored - node shouldn't connect for peer_address, protocol in zip(peer_addresses[num_seeds+1:-2], popped_protocols[num_seeds+1:-2]): dht_network[peer_address] = protocol await advance(1000) - self.assertEqual(0, len(node.protocol.routing_table.get_peers())) + self.assertListEqual([], node.protocol.routing_table.get_peers()) # Reconnect some of the seed nodes for peer_address, protocol in zip(peer_addresses[:num_seeds], popped_protocols[:num_seeds]): From d7fe46dbde09df3714dce7813effd379024222f7 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Fri, 22 Nov 2019 01:19:22 -0500 Subject: [PATCH 06/10] fix drop/reconnect in test_losing_connection --- lbry/tests/unit/dht/test_node.py | 15 +++------------ 1 file changed, 3 insertions(+), 12 deletions(-) diff --git a/lbry/tests/unit/dht/test_node.py b/lbry/tests/unit/dht/test_node.py index 7bb0463e0..c26b81045 100644 --- a/lbry/tests/unit/dht/test_node.py +++ b/lbry/tests/unit/dht/test_node.py @@ -130,24 +130,15 @@ class TestTemporarilyLosingConnection(AsyncioTestCase): self.assertTrue(len(await node._storage.get_persisted_kademlia_peers()) > num_seeds) # We lost internet connection - all the peers stop responding - popped_protocols = [] - for peer_address in peer_addresses[:-1]: - popped_protocols.append(dht_network.pop(peer_address)) + dht_network.pop((node.protocol.external_ip, node.protocol.udp_port)) # The peers are cleared on refresh from RT and storage await advance(4000) self.assertListEqual([], node.protocol.routing_table.get_peers()) self.assertListEqual([], await node._storage.get_persisted_kademlia_peers()) - # Reconnect some of the previously stored - node shouldn't connect - for peer_address, protocol in zip(peer_addresses[num_seeds+1:-2], popped_protocols[num_seeds+1:-2]): - dht_network[peer_address] = protocol - await advance(1000) - self.assertListEqual([], node.protocol.routing_table.get_peers()) - - # Reconnect some of the seed nodes - for peer_address, protocol in zip(peer_addresses[:num_seeds], popped_protocols[:num_seeds]): - dht_network[peer_address] = protocol + # Reconnect + dht_network[(node.protocol.external_ip, node.protocol.udp_port)] = node.protocol # Check that node reconnects at least to them await advance(1000) From 
c832f8ffbbcbec7b592c3203ff3da495505a36ba Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Fri, 22 Nov 2019 01:20:15 -0500 Subject: [PATCH 07/10] fix mock_network_loop param --- lbry/tests/dht_mocks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lbry/tests/dht_mocks.py b/lbry/tests/dht_mocks.py index c725ec32e..2e01986c0 100644 --- a/lbry/tests/dht_mocks.py +++ b/lbry/tests/dht_mocks.py @@ -50,7 +50,7 @@ def get_time_accelerator(loop: asyncio.AbstractEventLoop, @contextlib.contextmanager def mock_network_loop(loop: asyncio.AbstractEventLoop, dht_network: typing.Optional[typing.Dict[typing.Tuple[str, int], 'KademliaProtocol']] = None): - dht_network: typing.Dict[typing.Tuple[str, int], 'KademliaProtocol'] = dht_network or {} + dht_network: typing.Dict[typing.Tuple[str, int], 'KademliaProtocol'] = dht_network if dht_network is not None else {} async def create_datagram_endpoint(proto_lam: typing.Callable[[], 'KademliaProtocol'], from_addr: typing.Tuple[str, int]): From 6bff298d1efd3e42e6b07485332caa747b9ec7cc Mon Sep 17 00:00:00 2001 From: Miroslav Kovar Date: Sat, 23 Nov 2019 20:51:03 +0100 Subject: [PATCH 08/10] Add migrator for the new `peer` table --- lbry/lbry/extras/daemon/Components.py | 2 +- .../lbry/extras/daemon/migrator/dbmigrator.py | 2 ++ .../extras/daemon/migrator/migrate13to14.py | 21 +++++++++++++++++++ 3 files changed, 24 insertions(+), 1 deletion(-) create mode 100644 lbry/lbry/extras/daemon/migrator/migrate13to14.py diff --git a/lbry/lbry/extras/daemon/Components.py b/lbry/lbry/extras/daemon/Components.py index 98b1d5250..66d9acd9b 100644 --- a/lbry/lbry/extras/daemon/Components.py +++ b/lbry/lbry/extras/daemon/Components.py @@ -50,7 +50,7 @@ class DatabaseComponent(Component): @staticmethod def get_current_db_revision(): - return 13 + return 14 @property def revision_filename(self): diff --git a/lbry/lbry/extras/daemon/migrator/dbmigrator.py b/lbry/lbry/extras/daemon/migrator/dbmigrator.py index 99a1bb2b4..726cc1974 100644 --- a/lbry/lbry/extras/daemon/migrator/dbmigrator.py +++ b/lbry/lbry/extras/daemon/migrator/dbmigrator.py @@ -33,6 +33,8 @@ def migrate_db(conf, start, end): from .migrate11to12 import do_migration elif current == 12: from .migrate12to13 import do_migration + elif current == 13: + from .migrate13to14 import do_migration else: raise Exception(f"DB migration of version {current} to {current+1} is not available") try: diff --git a/lbry/lbry/extras/daemon/migrator/migrate13to14.py b/lbry/lbry/extras/daemon/migrator/migrate13to14.py new file mode 100644 index 000000000..5cbd6d3fa --- /dev/null +++ b/lbry/lbry/extras/daemon/migrator/migrate13to14.py @@ -0,0 +1,21 @@ +import os +import sqlite3 + + +def do_migration(conf): + db_path = os.path.join(conf.data_dir, "lbrynet.sqlite") + connection = sqlite3.connect(db_path) + cursor = connection.cursor() + + cursor.executescript(""" + create table if not exists peer ( + node_id char(96) not null primary key, + address text not null, + udp_port integer not null, + tcp_port integer, + unique (address, udp_port) + ); + """) + + connection.commit() + connection.close() From 36101db500ee129aced8b93c6d89aed6fd936f1b Mon Sep 17 00:00:00 2001 From: Miroslav Kovar Date: Sun, 24 Nov 2019 14:20:29 +0100 Subject: [PATCH 09/10] Wait for routing table to clear with timeout --- lbry/tests/unit/dht/test_node.py | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/lbry/tests/unit/dht/test_node.py b/lbry/tests/unit/dht/test_node.py index c26b81045..a24bf1155 100644 --- 
a/lbry/tests/unit/dht/test_node.py +++ b/lbry/tests/unit/dht/test_node.py @@ -91,6 +91,15 @@ class TestNodePingQueueDiscover(AsyncioTestCase): class TestTemporarilyLosingConnection(AsyncioTestCase): async def test_losing_connection(self): + async def wait_for(check_ok, insist, timeout=20): + start = loop.time() + while loop.time() - start < timeout: + if check_ok(): + break + await asyncio.sleep(0) + else: + insist() + loop = self.loop loop.set_debug(False) @@ -134,12 +143,18 @@ class TestTemporarilyLosingConnection(AsyncioTestCase): # The peers are cleared on refresh from RT and storage await advance(4000) - self.assertListEqual([], node.protocol.routing_table.get_peers()) self.assertListEqual([], await node._storage.get_persisted_kademlia_peers()) + await wait_for( + lambda: len(node.protocol.routing_table.get_peers()) == 0, + lambda: self.assertListEqual(node.protocol.routing_table.get_peers(), []) + ) # Reconnect dht_network[(node.protocol.external_ip, node.protocol.udp_port)] = node.protocol # Check that node reconnects at least to them await advance(1000) - self.assertTrue(len(node.protocol.routing_table.get_peers()) >= num_seeds) + await wait_for( + lambda: len(node.protocol.routing_table.get_peers()) >= num_seeds, + lambda: self.assertTrue(len(node.protocol.routing_table.get_peers()) >= num_seeds) + ) From 66a4c98bee1603c0edd0d58512f6b1b91d0cd7f7 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Tue, 3 Dec 2019 13:11:34 -0500 Subject: [PATCH 10/10] update test --- lbry/tests/unit/dht/test_node.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lbry/tests/unit/dht/test_node.py b/lbry/tests/unit/dht/test_node.py index a24bf1155..9872666f8 100644 --- a/lbry/tests/unit/dht/test_node.py +++ b/lbry/tests/unit/dht/test_node.py @@ -104,7 +104,7 @@ class TestTemporarilyLosingConnection(AsyncioTestCase): loop.set_debug(False) peer_addresses = [ - ('127.0.0.1', 40000+i) for i in range(10) + ('1.2.3.4', 40000+i) for i in range(10) ] node_ids = [constants.generate_id(i) for i in range(10)]
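
Reviewer notes, appended after the series (not part of any patch above):

The series lands its behaviour piecemeal across ten diffs, so two consolidated sketches
follow. Both are illustrative rather than authoritative: FakePeer and the
save_peers/get_peers helper names below are stand-ins invented for the sketch, while the
SQL schema and the reassembled join loop are taken directly from the diffs above.

1) The peer-persistence round-trip that patches 01/02 add to SQLiteStorage
(save_kademlia_peers / get_persisted_kademlia_peers) and that patch 08 migrates, reduced
to plain sqlite3 so it runs outside the lbry codebase. The table schema is copied
verbatim from migrate13to14.py; node_id is a 48-byte DHT id, hex-encoded to the 96
characters the column expects:

import sqlite3
import hashlib
import binascii
from collections import namedtuple

# Stand-in for lbry.dht.peer.KademliaPeer -- only the four persisted fields matter here.
FakePeer = namedtuple('FakePeer', 'node_id address udp_port tcp_port')

db = sqlite3.connect(':memory:')
db.executescript("""
    create table if not exists peer (
        node_id char(96) not null primary key,
        address text not null,
        udp_port integer not null,
        tcp_port integer,
        unique (address, udp_port)
    );
""")

def save_peers(peers):
    # The patch replaces the whole table on every routing-table refresh.
    db.execute('delete from peer')
    db.executemany(
        'insert into peer(node_id, address, udp_port, tcp_port) values (?, ?, ?, ?)',
        [(binascii.hexlify(p.node_id), p.address, p.udp_port, p.tcp_port) for p in peers]
    )
    db.commit()

def get_peers():
    rows = db.execute('select node_id, address, udp_port, tcp_port from peer')
    return [(binascii.unhexlify(n), a, u, t) for n, a, u, t in rows]

node_id = hashlib.sha384(b'1234').digest()  # 48 bytes -> 96 hex chars
save_peers([FakePeer(node_id, '73.186.148.72', 4444, None)])
assert get_peers() == [(node_id, '73.186.148.72', 4444, None)]

2) Node.join_network() as it stands once patches 01, 03 and 05 are stacked, reconstructed
by applying the diffs in order (a reading aid, not a separately verified file). Persisted
peers seed the search first; the configured known_node_urls are resolved only when
storage is empty or absent; a DNS failure backs off for 30 seconds instead of killing
the join task:

async def join_network(self, interface: str = '0.0.0.0',
                       known_node_urls: typing.Optional[typing.List[typing.Tuple[str, int]]] = None):
    def peers_from_urls(urls):
        # Deduplicate and drop our own address before building peer objects.
        peer_addresses = []
        for node_id, address, udp_port, tcp_port in urls:
            if (node_id, address, udp_port, tcp_port) not in peer_addresses and \
                    (address, udp_port) != (self.protocol.external_ip, self.protocol.udp_port):
                peer_addresses.append((node_id, address, udp_port, tcp_port))
        return [make_kademlia_peer(*peer_address) for peer_address in peer_addresses]

    if not self.listening_port:
        await self.start_listening(interface)
    self.protocol.ping_queue.start()
    self._refresh_task = self.loop.create_task(self.refresh_node())

    while True:
        if self.protocol.routing_table.get_peers():
            if not self.joined.is_set():
                self.joined.set()
                log.info(
                    "joined dht, %i peers known in %i buckets",
                    len(self.protocol.routing_table.get_peers()),
                    self.protocol.routing_table.buckets_with_contacts()
                )
        else:
            if self.joined.is_set():
                self.joined.clear()
            # Patch 03: persisted peers win; fall back to the fixed seeds
            # only when storage yields nothing.
            seed_peers = peers_from_urls(
                await self._storage.get_persisted_kademlia_peers()
            ) if self._storage else []
            if not seed_peers:
                try:
                    seed_peers.extend(peers_from_urls([
                        (None, await resolve_host(address, udp_port, 'udp'), udp_port, None)
                        for address, udp_port in known_node_urls or []
                    ]))
                except socket.gaierror:  # patch 04 narrowed this from asyncio.TimeoutError
                    await asyncio.sleep(30, loop=self.loop)
                    continue
            self.protocol.peer_manager.reset()
            self.protocol.ping_queue.enqueue_maybe_ping(*seed_peers, delay=0.0)
            await self.peer_search(self.protocol.node_id, shortlist=seed_peers, count=32)
        await asyncio.sleep(1, loop=self.loop)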