forked from LBRYCommunity/lbry-sdk
Rename, fix tests / deduplicate set_joined, resolve conditionally
This commit is contained in:
parent
ca8f59a643
commit
c321758afd
4 changed files with 42 additions and 49 deletions
|
@ -53,7 +53,7 @@ class Node:
|
||||||
node_ids.append(self.protocol.routing_table.random_id_in_bucket_range(i))
|
node_ids.append(self.protocol.routing_table.random_id_in_bucket_range(i))
|
||||||
|
|
||||||
if self._storage:
|
if self._storage:
|
||||||
await self._storage.update_peers(self.protocol.routing_table.get_peers())
|
await self._storage.save_kademlia_peers(self.protocol.routing_table.get_peers())
|
||||||
|
|
||||||
if self.protocol.routing_table.get_peers():
|
if self.protocol.routing_table.get_peers():
|
||||||
# if we have node ids to look up, perform the iterative search until we have k results
|
# if we have node ids to look up, perform the iterative search until we have k results
|
||||||
|
@ -136,42 +136,33 @@ class Node:
|
||||||
peer_addresses.append((node_id, address, udp_port, tcp_port))
|
peer_addresses.append((node_id, address, udp_port, tcp_port))
|
||||||
return [make_kademlia_peer(*peer_address) for peer_address in peer_addresses]
|
return [make_kademlia_peer(*peer_address) for peer_address in peer_addresses]
|
||||||
|
|
||||||
def set_joined():
|
|
||||||
self.joined.set()
|
|
||||||
log.info(
|
|
||||||
"joined dht, %i peers known in %i buckets", len(self.protocol.routing_table.get_peers()),
|
|
||||||
self.protocol.routing_table.buckets_with_contacts()
|
|
||||||
)
|
|
||||||
|
|
||||||
if not self.listening_port:
|
if not self.listening_port:
|
||||||
await self.start_listening(interface)
|
await self.start_listening(interface)
|
||||||
self.protocol.ping_queue.start()
|
self.protocol.ping_queue.start()
|
||||||
self._refresh_task = self.loop.create_task(self.refresh_node())
|
self._refresh_task = self.loop.create_task(self.refresh_node())
|
||||||
|
|
||||||
restored_peers = peers_from_urls(await self._storage.get_peers()) if self._storage else []
|
seed_peers = peers_from_urls(await self._storage.get_persisted_kademlia_peers()) if self._storage else []
|
||||||
|
while True:
|
||||||
fixed_peers = peers_from_urls([
|
|
||||||
(None, await resolve_host(address, udp_port, 'udp'), udp_port, None)
|
|
||||||
for address, udp_port in known_node_urls or []
|
|
||||||
])
|
|
||||||
|
|
||||||
seed_peers = restored_peers or fixed_peers
|
|
||||||
fallback = False
|
|
||||||
while seed_peers:
|
|
||||||
if self.protocol.routing_table.get_peers():
|
if self.protocol.routing_table.get_peers():
|
||||||
if not self.joined.is_set():
|
if not self.joined.is_set():
|
||||||
set_joined()
|
self.joined.set()
|
||||||
|
log.info(
|
||||||
|
"joined dht, %i peers known in %i buckets", len(self.protocol.routing_table.get_peers()),
|
||||||
|
self.protocol.routing_table.buckets_with_contacts()
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
if self.joined.is_set():
|
if self.joined.is_set():
|
||||||
self.joined.clear()
|
self.joined.clear()
|
||||||
seed_peers = fixed_peers if fallback else seed_peers
|
|
||||||
self.protocol.peer_manager.reset()
|
self.protocol.peer_manager.reset()
|
||||||
self.protocol.ping_queue.enqueue_maybe_ping(*seed_peers, delay=0.0)
|
self.protocol.ping_queue.enqueue_maybe_ping(*seed_peers, delay=0.0)
|
||||||
seed_peers.extend(await self.peer_search(self.protocol.node_id, shortlist=seed_peers, count=32))
|
seed_peers.extend(await self.peer_search(self.protocol.node_id, shortlist=seed_peers, count=32))
|
||||||
fallback = not self.protocol.routing_table.get_peers()
|
if not seed_peers or not self.protocol.routing_table.get_peers():
|
||||||
await asyncio.sleep(1, loop=self.loop)
|
seed_peers.extend(peers_from_urls([
|
||||||
|
(None, await resolve_host(address, udp_port, 'udp'), udp_port, None)
|
||||||
|
for address, udp_port in known_node_urls or []
|
||||||
|
]))
|
||||||
|
|
||||||
set_joined()
|
await asyncio.sleep(1, loop=self.loop)
|
||||||
|
|
||||||
def start(self, interface: str, known_node_urls: typing.Optional[typing.List[typing.Tuple[str, int]]] = None):
|
def start(self, interface: str, known_node_urls: typing.Optional[typing.List[typing.Tuple[str, int]]] = None):
|
||||||
self._join_task = self.loop.create_task(self.join_network(interface, known_node_urls))
|
self._join_task = self.loop.create_task(self.join_network(interface, known_node_urls))
|
||||||
|
|
|
@ -331,11 +331,11 @@ class SQLiteStorage(SQLiteMixin):
|
||||||
);
|
);
|
||||||
|
|
||||||
create table if not exists peer (
|
create table if not exists peer (
|
||||||
|
node_id char(96) not null primary key,
|
||||||
address text not null,
|
address text not null,
|
||||||
udp_port integer not null,
|
udp_port integer not null,
|
||||||
tcp_port integer,
|
tcp_port integer,
|
||||||
node_id char(96) unique not null,
|
unique (address, udp_port)
|
||||||
primary key (address, udp_port)
|
|
||||||
);
|
);
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
@ -815,12 +815,12 @@ class SQLiteStorage(SQLiteMixin):
|
||||||
)
|
)
|
||||||
|
|
||||||
# # # # # # # # # # dht functions # # # # # # # # # # #
|
# # # # # # # # # # dht functions # # # # # # # # # # #
|
||||||
async def get_peers(self):
|
async def get_persisted_kademlia_peers(self) -> typing.List[typing.Tuple[bytes, str, int, int]]:
|
||||||
query = 'select node_id, address, udp_port, tcp_port from peer'
|
query = 'select node_id, address, udp_port, tcp_port from peer'
|
||||||
return [(binascii.unhexlify(n), a, u, t) for n, a, u, t in await self.db.execute_fetchall(query)]
|
return [(binascii.unhexlify(n), a, u, t) for n, a, u, t in await self.db.execute_fetchall(query)]
|
||||||
|
|
||||||
async def update_peers(self, peers: typing.List['KademliaPeer']):
|
async def save_kademlia_peers(self, peers: typing.List['KademliaPeer']):
|
||||||
def _update_peers(transaction: sqlite3.Connection):
|
def _save_kademlia_peers(transaction: sqlite3.Connection):
|
||||||
transaction.execute('delete from peer').fetchall()
|
transaction.execute('delete from peer').fetchall()
|
||||||
transaction.executemany(
|
transaction.executemany(
|
||||||
'insert into peer(node_id, address, udp_port, tcp_port) values (?, ?, ?, ?)', (
|
'insert into peer(node_id, address, udp_port, tcp_port) values (?, ?, ?, ?)', (
|
||||||
|
@ -829,4 +829,4 @@ class SQLiteStorage(SQLiteMixin):
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
).fetchall()
|
).fetchall()
|
||||||
return await self.db.run(_update_peers)
|
return await self.db.run(_save_kademlia_peers)
|
||||||
|
|
|
@ -44,10 +44,10 @@ class DHTIntegrationTest(AsyncioTestCase):
|
||||||
|
|
||||||
for node in self.nodes:
|
for node in self.nodes:
|
||||||
node.start(external_ip, self.known_node_addresses[:seed_nodes])
|
node.start(external_ip, self.known_node_addresses[:seed_nodes])
|
||||||
await asyncio.gather(*[node.joined.wait() for node in self.nodes])
|
|
||||||
|
|
||||||
async def test_replace_bad_nodes(self):
|
async def test_replace_bad_nodes(self):
|
||||||
await self.setup_network(20)
|
await self.setup_network(20)
|
||||||
|
await asyncio.gather(*[node.joined.wait() for node in self.nodes])
|
||||||
self.assertEqual(len(self.nodes), 20)
|
self.assertEqual(len(self.nodes), 20)
|
||||||
node = self.nodes[0]
|
node = self.nodes[0]
|
||||||
bad_peers = []
|
bad_peers = []
|
||||||
|
@ -68,6 +68,7 @@ class DHTIntegrationTest(AsyncioTestCase):
|
||||||
|
|
||||||
async def test_re_join(self):
|
async def test_re_join(self):
|
||||||
await self.setup_network(20, seed_nodes=10)
|
await self.setup_network(20, seed_nodes=10)
|
||||||
|
await asyncio.gather(*[node.joined.wait() for node in self.nodes])
|
||||||
node = self.nodes[-1]
|
node = self.nodes[-1]
|
||||||
self.assertTrue(node.joined.is_set())
|
self.assertTrue(node.joined.is_set())
|
||||||
self.assertTrue(node.protocol.routing_table.get_peers())
|
self.assertTrue(node.protocol.routing_table.get_peers())
|
||||||
|
@ -95,6 +96,7 @@ class DHTIntegrationTest(AsyncioTestCase):
|
||||||
|
|
||||||
async def test_get_token_on_announce(self):
|
async def test_get_token_on_announce(self):
|
||||||
await self.setup_network(2, seed_nodes=2)
|
await self.setup_network(2, seed_nodes=2)
|
||||||
|
await asyncio.gather(*[node.joined.wait() for node in self.nodes])
|
||||||
node1, node2 = self.nodes
|
node1, node2 = self.nodes
|
||||||
node1.protocol.peer_manager.clear_token(node2.protocol.node_id)
|
node1.protocol.peer_manager.clear_token(node2.protocol.node_id)
|
||||||
blob_hash = hexlify(constants.generate_id(1337)).decode()
|
blob_hash = hexlify(constants.generate_id(1337)).decode()
|
||||||
|
@ -112,6 +114,7 @@ class DHTIntegrationTest(AsyncioTestCase):
|
||||||
# imagine that you only got bad peers and refresh will happen in one hour
|
# imagine that you only got bad peers and refresh will happen in one hour
|
||||||
# instead of failing for one hour we should be able to recover by scheduling pings to bad peers we find
|
# instead of failing for one hour we should be able to recover by scheduling pings to bad peers we find
|
||||||
await self.setup_network(2, seed_nodes=2)
|
await self.setup_network(2, seed_nodes=2)
|
||||||
|
await asyncio.gather(*[node.joined.wait() for node in self.nodes])
|
||||||
node1, node2 = self.nodes
|
node1, node2 = self.nodes
|
||||||
node2.stop()
|
node2.stop()
|
||||||
# forcefully make it a bad peer but don't remove it from routing table
|
# forcefully make it a bad peer but don't remove it from routing table
|
||||||
|
@ -129,38 +132,36 @@ class DHTIntegrationTest(AsyncioTestCase):
|
||||||
self.assertFalse(node1.protocol.routing_table.get_peers())
|
self.assertFalse(node1.protocol.routing_table.get_peers())
|
||||||
|
|
||||||
async def test_peer_persistance(self):
|
async def test_peer_persistance(self):
|
||||||
num_peers = 5
|
num_nodes = 6
|
||||||
start_port = 40000
|
start_port = 40000
|
||||||
|
num_seeds = 2
|
||||||
external_ip = '127.0.0.1'
|
external_ip = '127.0.0.1'
|
||||||
|
|
||||||
# Start a node
|
# Start a node
|
||||||
node1 = await self.create_node(constants.generate_id(num_peers), start_port+num_peers)
|
await self.setup_network(num_nodes, start_port=start_port, seed_nodes=num_seeds)
|
||||||
node1.start(external_ip)
|
await asyncio.gather(*[node.joined.wait() for node in self.nodes])
|
||||||
|
|
||||||
# Add peers
|
node1 = self.nodes[-1]
|
||||||
peer_args = [(n.protocol.nodeid, n.protocol.external_ip, n.protocol.udp_port) for n in self.nodes]
|
peer_args = [(n.protocol.node_id, n.protocol.external_ip, n.protocol.udp_port, n.protocol.peer_port) for n in
|
||||||
|
self.nodes[:num_seeds]]
|
||||||
peers = [make_kademlia_peer(*args) for args in peer_args]
|
peers = [make_kademlia_peer(*args) for args in peer_args]
|
||||||
for peer in peers:
|
|
||||||
await node1.protocol._add_peer(peer)
|
|
||||||
|
|
||||||
await asyncio.sleep(.3)
|
# node1 is bootstrapped from the fixed seeds
|
||||||
self.assertTrue(node1.joined.is_set())
|
|
||||||
self.assertCountEqual(peers, node1.protocol.routing_table.get_peers())
|
self.assertCountEqual(peers, node1.protocol.routing_table.get_peers())
|
||||||
|
|
||||||
# Refresh and assert that the peers were persisted
|
# Refresh and assert that the peers were persisted
|
||||||
await node1.refresh_node(True)
|
await node1.refresh_node(True)
|
||||||
self.assertCountEqual(peer_args, await node1._storage.get_peers())
|
self.assertEqual(len(peer_args), len(await node1._storage.get_persisted_kademlia_peers()))
|
||||||
node1.stop()
|
node1.stop()
|
||||||
|
|
||||||
# Start a fresh node with the same node_id and storage
|
# Start a fresh node with the same node_id and storage, but no known peers
|
||||||
node2 = await self.create_node(constants.generate_id(num_peers), start_port+num_peers+1)
|
node2 = await self.create_node(constants.generate_id(num_nodes-1), start_port+num_nodes-1)
|
||||||
node2._storage = node1._storage
|
node2._storage = node1._storage
|
||||||
node2.start(external_ip)
|
node2.start(external_ip, [])
|
||||||
|
await node2.joined.wait()
|
||||||
|
|
||||||
# The peers are restored
|
# The peers are restored
|
||||||
await asyncio.sleep(.3)
|
self.assertEqual(num_seeds, len(node2.protocol.routing_table.get_peers()))
|
||||||
self.assertTrue(node2.joined.is_set())
|
|
||||||
self.assertCountEqual(peers, node2.protocol.routing_table.get_peers())
|
|
||||||
for bucket1, bucket2 in zip(node1.protocol.routing_table.buckets, node2.protocol.routing_table.buckets):
|
for bucket1, bucket2 in zip(node1.protocol.routing_table.buckets, node2.protocol.routing_table.buckets):
|
||||||
self.assertEqual((bucket1.range_min, bucket1.range_max), (bucket2.range_min, bucket2.range_max))
|
self.assertEqual((bucket1.range_min, bucket1.range_max), (bucket2.range_min, bucket2.range_max))
|
||||||
|
|
||||||
|
@ -170,6 +171,7 @@ class DHTIntegrationTest(AsyncioTestCase):
|
||||||
external_ip = '127.0.0.1'
|
external_ip = '127.0.0.1'
|
||||||
|
|
||||||
await self.setup_network(num_peers, seed_nodes=num_peers // 2, start_port=start_port)
|
await self.setup_network(num_peers, seed_nodes=num_peers // 2, start_port=start_port)
|
||||||
|
await asyncio.gather(*[node.joined.wait() for node in self.nodes])
|
||||||
peer_args = [
|
peer_args = [
|
||||||
(n.protocol.node_id, n.protocol.external_ip, n.protocol.udp_port) for n in self.nodes
|
(n.protocol.node_id, n.protocol.external_ip, n.protocol.udp_port) for n in self.nodes
|
||||||
]
|
]
|
||||||
|
@ -180,7 +182,7 @@ class DHTIntegrationTest(AsyncioTestCase):
|
||||||
|
|
||||||
# Create node with the persisted nodes in storage
|
# Create node with the persisted nodes in storage
|
||||||
node = await self.create_node(constants.generate_id(num_peers), start_port+num_peers)
|
node = await self.create_node(constants.generate_id(num_peers), start_port+num_peers)
|
||||||
await node._storage.update_peers(persisted_peers)
|
await node._storage.save_kademlia_peers(persisted_peers)
|
||||||
|
|
||||||
# Stop known peers so they stop replying and won't be added
|
# Stop known peers so they stop replying and won't be added
|
||||||
for n in known_nodes:
|
for n in known_nodes:
|
||||||
|
|
|
@ -256,6 +256,6 @@ class UpdatePeersTest(StorageTest):
|
||||||
node_id = hashlib.sha384("1234".encode()).digest()
|
node_id = hashlib.sha384("1234".encode()).digest()
|
||||||
args = (node_id, '73.186.148.72', 4444, None)
|
args = (node_id, '73.186.148.72', 4444, None)
|
||||||
fake_peer = make_kademlia_peer(*args)
|
fake_peer = make_kademlia_peer(*args)
|
||||||
await self.storage.update_peers([fake_peer])
|
await self.storage.save_kademlia_peers([fake_peer])
|
||||||
peers = await self.storage.get_peers()
|
peers = await self.storage.get_persisted_kademlia_peers()
|
||||||
self.assertTupleEqual(args, peers[0])
|
self.assertTupleEqual(args, peers[0])
|
||||||
|
|
Loading…
Reference in a new issue