add integration tests, fix bug that made refresh not exclude bad peers
This commit is contained in:
parent
e675f1387c
commit
b7d76fd09f
4 changed files with 72 additions and 9 deletions
|
@ -28,7 +28,7 @@ class Node:
|
|||
self._join_task: asyncio.Task = None
|
||||
self._refresh_task: asyncio.Task = None
|
||||
|
||||
async def refresh_node(self):
|
||||
async def refresh_node(self, force_once=False):
|
||||
while True:
|
||||
# remove peers with expired blob announcements from the datastore
|
||||
self.protocol.data_store.removed_expired_peers()
|
||||
|
@ -55,6 +55,8 @@ class Node:
|
|||
peers = await self.peer_search(node_ids.pop())
|
||||
total_peers.extend(peers)
|
||||
else:
|
||||
if force_once:
|
||||
break
|
||||
fut = asyncio.Future(loop=self.loop)
|
||||
self.loop.call_later(constants.refresh_interval // 4, fut.set_result, None)
|
||||
await fut
|
||||
|
@ -64,6 +66,8 @@ class Node:
|
|||
to_ping = [peer for peer in set(total_peers) if self.protocol.peer_manager.peer_is_good(peer) is not True]
|
||||
if to_ping:
|
||||
self.protocol.ping_queue.enqueue_maybe_ping(*to_ping, delay=0)
|
||||
if force_once:
|
||||
break
|
||||
|
||||
fut = asyncio.Future(loop=self.loop)
|
||||
self.loop.call_later(constants.refresh_interval, fut.set_result, None)
|
||||
|
|
|
@ -31,7 +31,6 @@ class PeerManager:
|
|||
self._node_id_mapping: typing.Dict[typing.Tuple[str, int], bytes] = {}
|
||||
self._node_id_reverse_mapping: typing.Dict[bytes, typing.Tuple[str, int]] = {}
|
||||
self._node_tokens: typing.Dict[bytes, (float, bytes)] = {}
|
||||
self._kademlia_peers: typing.Dict[typing.Tuple[bytes, str, int], 'KademliaPeer']
|
||||
|
||||
def report_failure(self, address: str, udp_port: int):
|
||||
now = self._loop.time()
|
||||
|
@ -104,11 +103,12 @@ class PeerManager:
|
|||
|
||||
delay = self._loop.time() - constants.check_refresh_interval
|
||||
|
||||
if node_id not in self._node_id_reverse_mapping or (address, udp_port) not in self._node_id_mapping:
|
||||
return
|
||||
addr_tup = (address, udp_port)
|
||||
if self._node_id_reverse_mapping[node_id] != addr_tup or self._node_id_mapping[addr_tup] != node_id:
|
||||
return
|
||||
# fixme: find a way to re-enable that without breaking other parts
|
||||
#if node_id not in self._node_id_reverse_mapping or (address, udp_port) not in self._node_id_mapping:
|
||||
# return
|
||||
#addr_tup = (address, udp_port)
|
||||
#if self._node_id_reverse_mapping[node_id] != addr_tup or self._node_id_mapping[addr_tup] != node_id:
|
||||
# return
|
||||
previous_failure, most_recent_failure = self._rpc_failures.get((address, udp_port), (None, None))
|
||||
last_requested = self._last_requested.get((address, udp_port))
|
||||
last_replied = self._last_replied.get((address, udp_port))
|
||||
|
|
|
@ -191,12 +191,14 @@ class PingQueue:
|
|||
self._process_task: asyncio.Task = None
|
||||
self._running = False
|
||||
self._running_pings: typing.Set[asyncio.Task] = set()
|
||||
self._default_delay = constants.maybe_ping_delay
|
||||
|
||||
@property
|
||||
def running(self):
|
||||
return self._running
|
||||
|
||||
def enqueue_maybe_ping(self, *peers: 'KademliaPeer', delay: float = constants.maybe_ping_delay):
|
||||
def enqueue_maybe_ping(self, *peers: 'KademliaPeer', delay: typing.Optional[float] = None):
|
||||
delay = delay if delay is not None else self._default_delay
|
||||
now = self._loop.time()
|
||||
for peer in peers:
|
||||
if peer not in self._pending_contacts or now + delay < self._pending_contacts[peer]:
|
||||
|
@ -390,7 +392,7 @@ class KademliaProtocol(DatagramProtocol):
|
|||
while self._to_add:
|
||||
async with self._split_lock:
|
||||
await self._add_peer(self._to_add.pop())
|
||||
await asyncio.gather(self._wakeup_routing_task.wait(), asyncio.sleep(0.2))
|
||||
await asyncio.gather(self._wakeup_routing_task.wait(), asyncio.sleep(.1))
|
||||
self._wakeup_routing_task.clear()
|
||||
|
||||
def _handle_rpc(self, sender_contact: 'KademliaPeer', message: RequestDatagram):
|
||||
|
|
57
tests/integration/test_dht.py
Normal file
57
tests/integration/test_dht.py
Normal file
|
@ -0,0 +1,57 @@
|
|||
import asyncio
|
||||
|
||||
from lbrynet.dht import constants
|
||||
from lbrynet.dht.node import Node
|
||||
from lbrynet.dht.peer import PeerManager, KademliaPeer
|
||||
from torba.testcase import AsyncioTestCase
|
||||
|
||||
|
||||
class CLIIntegrationTest(AsyncioTestCase):
|
||||
|
||||
async def asyncSetUp(self):
|
||||
import logging
|
||||
logging.getLogger('asyncio').setLevel(logging.ERROR)
|
||||
logging.getLogger('lbrynet.dht').setLevel(logging.DEBUG)
|
||||
self.nodes = []
|
||||
self.known_node_addresses = []
|
||||
|
||||
async def setup_network(self, size: int, start_port=40000):
|
||||
for i in range(size):
|
||||
node_port = start_port + i
|
||||
node = Node(self.loop, PeerManager(self.loop), node_id=constants.generate_id(i),
|
||||
udp_port=node_port, internal_udp_port=node_port,
|
||||
peer_port=3333, external_ip='127.0.0.1')
|
||||
self.nodes.append(node)
|
||||
self.known_node_addresses.append(('127.0.0.1', node_port))
|
||||
await node.start_listening('127.0.0.1')
|
||||
for node in self.nodes:
|
||||
node.protocol.rpc_timeout = .2
|
||||
node.protocol.ping_queue._default_delay = .5
|
||||
node.start('127.0.0.1', self.known_node_addresses[:1])
|
||||
await asyncio.gather(*[node.joined.wait() for node in self.nodes])
|
||||
|
||||
async def asyncTearDown(self):
|
||||
for node in self.nodes:
|
||||
node.stop()
|
||||
|
||||
async def test_replace_bad_nodes(self):
|
||||
await self.setup_network(20)
|
||||
self.assertEquals(len(self.nodes), 20)
|
||||
node = self.nodes[0]
|
||||
bad_peers = []
|
||||
for candidate in self.nodes[1:10]:
|
||||
address, port, node_id = candidate.protocol.external_ip, candidate.protocol.udp_port, candidate.protocol.node_id
|
||||
peer = KademliaPeer(self.loop, address, node_id, port)
|
||||
bad_peers.append(peer)
|
||||
node.protocol.add_peer(peer)
|
||||
candidate.stop()
|
||||
await asyncio.sleep(.3) # let pending events settle
|
||||
for bad_peer in bad_peers:
|
||||
self.assertIn(bad_peer, node.protocol.routing_table.get_peers())
|
||||
await node.refresh_node(True)
|
||||
await asyncio.sleep(.3) # let pending events settle
|
||||
good_nodes = {good_node.protocol.node_id for good_node in self.nodes[10:]}
|
||||
for peer in node.protocol.routing_table.get_peers():
|
||||
self.assertIn(peer.node_id, good_nodes)
|
||||
|
||||
|
Loading…
Reference in a new issue