diff --git a/lbrynet/dht/node.py b/lbrynet/dht/node.py
index 955798c8e..b10884644 100644
--- a/lbrynet/dht/node.py
+++ b/lbrynet/dht/node.py
@@ -28,7 +28,7 @@ class Node:
         self._join_task: asyncio.Task = None
         self._refresh_task: asyncio.Task = None
 
-    async def refresh_node(self):
+    async def refresh_node(self, force_once=False):
         while True:
             # remove peers with expired blob announcements from the datastore
             self.protocol.data_store.removed_expired_peers()
@@ -55,6 +55,8 @@ class Node:
                     peers = await self.peer_search(node_ids.pop())
                     total_peers.extend(peers)
             else:
+                if force_once:
+                    break
                 fut = asyncio.Future(loop=self.loop)
                 self.loop.call_later(constants.refresh_interval // 4, fut.set_result, None)
                 await fut
@@ -64,6 +66,8 @@ class Node:
             to_ping = [peer for peer in set(total_peers) if self.protocol.peer_manager.peer_is_good(peer) is not True]
             if to_ping:
                 self.protocol.ping_queue.enqueue_maybe_ping(*to_ping, delay=0)
+            if force_once:
+                break
 
             fut = asyncio.Future(loop=self.loop)
             self.loop.call_later(constants.refresh_interval, fut.set_result, None)
diff --git a/lbrynet/dht/peer.py b/lbrynet/dht/peer.py
index 23bc29635..8253d1332 100644
--- a/lbrynet/dht/peer.py
+++ b/lbrynet/dht/peer.py
@@ -31,7 +31,6 @@ class PeerManager:
         self._node_id_mapping: typing.Dict[typing.Tuple[str, int], bytes] = {}
         self._node_id_reverse_mapping: typing.Dict[bytes, typing.Tuple[str, int]] = {}
         self._node_tokens: typing.Dict[bytes, (float, bytes)] = {}
-        self._kademlia_peers: typing.Dict[typing.Tuple[bytes, str, int], 'KademliaPeer']
 
     def report_failure(self, address: str, udp_port: int):
         now = self._loop.time()
@@ -104,11 +103,12 @@ class PeerManager:
         delay = self._loop.time() - constants.check_refresh_interval
-        if node_id not in self._node_id_reverse_mapping or (address, udp_port) not in self._node_id_mapping:
-            return
-        addr_tup = (address, udp_port)
-        if self._node_id_reverse_mapping[node_id] != addr_tup or self._node_id_mapping[addr_tup] != node_id:
-            return
+        # fixme: find a way to re-enable that without breaking other parts
+        #if node_id not in self._node_id_reverse_mapping or (address, udp_port) not in self._node_id_mapping:
+        #    return
+        #addr_tup = (address, udp_port)
+        #if self._node_id_reverse_mapping[node_id] != addr_tup or self._node_id_mapping[addr_tup] != node_id:
+        #    return
         previous_failure, most_recent_failure = self._rpc_failures.get((address, udp_port), (None, None))
         last_requested = self._last_requested.get((address, udp_port))
         last_replied = self._last_replied.get((address, udp_port))
diff --git a/lbrynet/dht/protocol/protocol.py b/lbrynet/dht/protocol/protocol.py
index 152965b3f..346c7c8b4 100644
--- a/lbrynet/dht/protocol/protocol.py
+++ b/lbrynet/dht/protocol/protocol.py
@@ -191,12 +191,14 @@ class PingQueue:
         self._process_task: asyncio.Task = None
         self._running = False
         self._running_pings: typing.Set[asyncio.Task] = set()
+        self._default_delay = constants.maybe_ping_delay
 
     @property
     def running(self):
         return self._running
 
-    def enqueue_maybe_ping(self, *peers: 'KademliaPeer', delay: float = constants.maybe_ping_delay):
+    def enqueue_maybe_ping(self, *peers: 'KademliaPeer', delay: typing.Optional[float] = None):
+        delay = delay if delay is not None else self._default_delay
         now = self._loop.time()
         for peer in peers:
             if peer not in self._pending_contacts or now + delay < self._pending_contacts[peer]:
@@ -390,7 +392,7 @@ class KademliaProtocol(DatagramProtocol):
         while self._to_add:
             async with self._split_lock:
                 await self._add_peer(self._to_add.pop())
-            await asyncio.gather(self._wakeup_routing_task.wait(), asyncio.sleep(0.2))
+            await asyncio.gather(self._wakeup_routing_task.wait(), asyncio.sleep(.1))
             self._wakeup_routing_task.clear()
 
     def _handle_rpc(self, sender_contact: 'KademliaPeer', message: RequestDatagram):
diff --git a/tests/integration/test_dht.py b/tests/integration/test_dht.py
new file mode 100644
index 000000000..8092116cb
--- /dev/null
+++ b/tests/integration/test_dht.py
@@ -0,0 +1,57 @@
+import asyncio
+
+from lbrynet.dht import constants
+from lbrynet.dht.node import Node
+from lbrynet.dht.peer import PeerManager, KademliaPeer
+from torba.testcase import AsyncioTestCase
+
+
+class DHTIntegrationTest(AsyncioTestCase):
+
+    async def asyncSetUp(self):
+        import logging
+        logging.getLogger('asyncio').setLevel(logging.ERROR)
+        logging.getLogger('lbrynet.dht').setLevel(logging.DEBUG)
+        self.nodes = []
+        self.known_node_addresses = []
+
+    async def setup_network(self, size: int, start_port=40000):
+        for i in range(size):
+            node_port = start_port + i
+            node = Node(self.loop, PeerManager(self.loop), node_id=constants.generate_id(i),
+                        udp_port=node_port, internal_udp_port=node_port,
+                        peer_port=3333, external_ip='127.0.0.1')
+            self.nodes.append(node)
+            self.known_node_addresses.append(('127.0.0.1', node_port))
+            await node.start_listening('127.0.0.1')
+
+        for node in self.nodes:
+            node.protocol.rpc_timeout = .2
+            node.protocol.ping_queue._default_delay = .5
+            node.start('127.0.0.1', self.known_node_addresses[:1])
+        await asyncio.gather(*[node.joined.wait() for node in self.nodes])
+
+    async def asyncTearDown(self):
+        for node in self.nodes:
+            node.stop()
+
+    async def test_replace_bad_nodes(self):
+        await self.setup_network(20)
+        self.assertEqual(len(self.nodes), 20)
+        node = self.nodes[0]
+        bad_peers = []
+        for candidate in self.nodes[1:10]:
+            address, port, node_id = candidate.protocol.external_ip, candidate.protocol.udp_port, candidate.protocol.node_id
+            peer = KademliaPeer(self.loop, address, node_id, port)
+            bad_peers.append(peer)
+            node.protocol.add_peer(peer)
+            candidate.stop()
+        await asyncio.sleep(.3)  # let pending events settle
+        for bad_peer in bad_peers:
+            self.assertIn(bad_peer, node.protocol.routing_table.get_peers())
+        await node.refresh_node(True)
+        await asyncio.sleep(.3)  # let pending events settle
+        good_nodes = {good_node.protocol.node_id for good_node in self.nodes[10:]}
+        for peer in node.protocol.routing_table.get_peers():
+            self.assertIn(peer.node_id, good_nodes)
+
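
A minimal sketch of how the new force_once flag is meant to be driven, mirroring the
integration test above. Only refresh_node(force_once=True) and the new
PingQueue._default_delay knob come from this change; the ports, bootstrap address and
the refresh_once() helper are illustrative assumptions:

    import asyncio

    from lbrynet.dht import constants
    from lbrynet.dht.node import Node
    from lbrynet.dht.peer import PeerManager


    async def refresh_once(bootstrap=('127.0.0.1', 40000)):
        # Hypothetical driver: the ports, bootstrap address and external_ip
        # here are placeholders, not part of this change.
        loop = asyncio.get_event_loop()
        node = Node(loop, PeerManager(loop), node_id=constants.generate_id(),
                    udp_port=4444, internal_udp_port=4444,
                    peer_port=3333, external_ip='127.0.0.1')
        await node.start_listening('127.0.0.1')
        # the new _default_delay attribute: shrink the default maybe-ping delay
        # so stale peers are probed quickly (the test uses .5 as well)
        node.protocol.ping_queue._default_delay = .5
        node.start('127.0.0.1', [bootstrap])
        await node.joined.wait()
        # one pass of refresh_node(): expired announcements are dropped, the
        # routing table is refreshed, peers not known to be good are pinged,
        # then the coroutine returns instead of sleeping refresh_interval
        await node.refresh_node(force_once=True)
        node.stop()


    if __name__ == '__main__':
        asyncio.get_event_loop().run_until_complete(refresh_once())

This is also why the test can assert on routing table contents deterministically: a
single forced pass enqueues pings with delay=0 for every not-known-good peer and
returns, rather than looping on refresh_interval.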