diff --git a/lbrynet/dht/constants.py b/lbrynet/dht/constants.py
index e52fdd377..c64720706 100644
--- a/lbrynet/dht/constants.py
+++ b/lbrynet/dht/constants.py
@@ -16,6 +16,7 @@ refresh_interval = 3600  # 1 hour
 replicate_interval = refresh_interval
 data_expiration = 86400  # 24 hours
 token_secret_refresh_interval = 300  # 5 minutes
+maybe_ping_delay = 300  # 5 minutes
 check_refresh_interval = refresh_interval / 5
 max_datagram_size = 8192  # 8 KB
 rpc_id_length = 20
diff --git a/lbrynet/dht/protocol/protocol.py b/lbrynet/dht/protocol/protocol.py
index 4e7a212bb..0c95be813 100644
--- a/lbrynet/dht/protocol/protocol.py
+++ b/lbrynet/dht/protocol/protocol.py
@@ -187,56 +187,46 @@ class PingQueue:
     def __init__(self, loop: asyncio.BaseEventLoop, protocol: 'KademliaProtocol'):
         self._loop = loop
         self._protocol = protocol
-        self._enqueued_contacts: typing.List['KademliaPeer'] = []
         self._pending_contacts: typing.Dict['KademliaPeer', float] = {}
         self._process_task: asyncio.Task = None
-        self._next_task: asyncio.Future = None
-        self._next_timer: asyncio.TimerHandle = None
         self._running = False
+        self._running_pings: typing.List[asyncio.Task] = []
 
     @property
     def running(self):
         return self._running
 
-    def enqueue_maybe_ping(self, *peers: 'KademliaPeer', delay: typing.Optional[float] = None):
-        delay = constants.check_refresh_interval if delay is None else delay
+    def enqueue_maybe_ping(self, *peers: 'KademliaPeer', delay: float = constants.maybe_ping_delay):
+        now = self._loop.time()
         for peer in peers:
-            if delay and peer not in self._enqueued_contacts:
-                self._pending_contacts[peer] = self._loop.time() + delay
-            elif peer not in self._enqueued_contacts:
-                self._enqueued_contacts.append(peer)
-                if peer in self._pending_contacts:
-                    del self._pending_contacts[peer]
+            if peer not in self._pending_contacts or now + delay < self._pending_contacts[peer]:
+                self._pending_contacts[peer] = delay + now
 
-    async def _process(self):
-        async def _ping(p: 'KademliaPeer'):
+    def maybe_ping(self, peer: 'KademliaPeer'):
+        async def ping_task():
             try:
-                if self._protocol.peer_manager.peer_is_good(p):
-                    await self._protocol.add_peer(p)
+                if self._protocol.peer_manager.peer_is_good(peer):
+                    if peer not in self._protocol.routing_table.get_peers():
+                        await self._protocol.add_peer(peer)
                     return
-                await self._protocol.get_rpc_peer(p).ping()
+                await self._protocol.get_rpc_peer(peer).ping()
             except asyncio.TimeoutError:
                 pass
 
+        task = self._loop.create_task(ping_task())
+        task.add_done_callback(lambda _: None if task not in self._running_pings else self._running_pings.remove(task))
+        self._running_pings.append(task)
+
+    async def _process(self):  # send up to 1 ping per second
         while True:
-            tasks = []
-
-            if self._enqueued_contacts or self._pending_contacts:
-                now = self._loop.time()
-                scheduled = [k for k, d in self._pending_contacts.items() if now >= d]
-                for k in scheduled:
-                    del self._pending_contacts[k]
-                    if k not in self._enqueued_contacts:
-                        self._enqueued_contacts.append(k)
-                while self._enqueued_contacts:
-                    peer = self._enqueued_contacts.pop()
-                    tasks.append(self._loop.create_task(_ping(peer)))
-            if tasks:
-                await asyncio.wait(tasks, loop=self._loop)
-
-            f = self._loop.create_future()
-            self._loop.call_later(1.0, lambda: None if f.done() else f.set_result(None))
-            await f
+            enqueued = list(self._pending_contacts.keys())
+            now = self._loop.time()
+            for peer in enqueued:
+                if self._pending_contacts[peer] <= now:
+                    del self._pending_contacts[peer]
+                    self.maybe_ping(peer)
+                    break
+            await asyncio.sleep(1, loop=self._loop)
 
     def start(self):
         assert not self._running
@@ -250,12 +240,9 @@ class PingQueue:
         if self._process_task:
             self._process_task.cancel()
             self._process_task = None
-        if self._next_task:
-            self._next_task.cancel()
-            self._next_task = None
-        if self._next_timer:
-            self._next_timer.cancel()
-            self._next_timer = None
+        while self._running_pings:
+            # pop before cancelling: the done callback that prunes this list runs via call_soon, not inline
+            self._running_pings.pop().cancel()
 
 
 class KademliaProtocol(DatagramProtocol):
@@ -313,9 +299,13 @@ class KademliaProtocol(DatagramProtocol):
         return args, {}
 
     async def _add_peer(self, peer: 'KademliaPeer'):
+        for p in self.routing_table.get_peers():
+            if (p.address, p.udp_port) == (peer.address, peer.udp_port) and p.node_id != peer.node_id:
+                self.routing_table.remove_peer(p)
         bucket_index = self.routing_table.kbucket_index(peer.node_id)
         if self.routing_table.buckets[bucket_index].add_peer(peer):
             return True
 
+        # The bucket is full; see if it can be split (by checking if its range includes the host node's node_id)
         if self.routing_table.should_split(bucket_index, peer.node_id):
             self.routing_table.split_bucket(bucket_index)
@@ -409,12 +399,13 @@ class KademliaProtocol(DatagramProtocol):
                 sender_contact, ResponseDatagram(RESPONSE_TYPE, message.rpc_id, self.node_id, result),
             )
 
-    async def handle_request_datagram(self, address, request_datagram: RequestDatagram):
+    async def handle_request_datagram(self, address: typing.Tuple[str, int], request_datagram: RequestDatagram):
         # This is an RPC method request
         self.peer_manager.report_last_requested(address[0], address[1])
-        self.peer_manager.update_contact_triple(request_datagram.node_id, address[0], address[1])
-        # only add a requesting contact to the routing table if it has replied to one of our requests
-        peer = self.peer_manager.get_kademlia_peer(request_datagram.node_id, address[0], address[1])
+        try:
+            peer = self.routing_table.get_peer(request_datagram.node_id)
+        except IndexError:
+            peer = self.peer_manager.get_kademlia_peer(request_datagram.node_id, address[0], address[1])
         try:
             await self._handle_rpc(peer, request_datagram)
             # if the contact is not known to be bad (yet) and we haven't yet queried it, send it a ping so that it
@@ -422,6 +413,7 @@
             is_good = self.peer_manager.peer_is_good(peer)
             if is_good is None:
                 self.ping_queue.enqueue_maybe_ping(peer)
+            # only add a requesting contact to the routing table if it has replied to one of our requests
             elif is_good is True:
                 await self.add_peer(peer)
         except ValueError as err:
@@ -449,11 +441,10 @@
             peer, df, request = self.sent_messages[response_datagram.rpc_id]
             if peer.address != address[0]:
                 df.set_exception(RemoteException(
-                    f"response from {address[0]}:{address[1]}, "
-                    f"expected {peer.address}:{peer.udp_port}")
+                    f"response from {address[0]}, expected {peer.address}")
                 )
                 return
-            peer.set_id(response_datagram.node_id)
+
             # We got a result from the RPC
             if peer.node_id == self.node_id:
                 df.set_exception(RemoteException("node has our node id"))
@@ -461,6 +452,8 @@
             elif response_datagram.node_id == self.node_id:
                 df.set_exception(RemoteException("incoming message is from our node id"))
                 return
+            peer.set_id(response_datagram.node_id)
+            peer.update_udp_port(address[1])
             self.peer_manager.report_last_replied(address[0], address[1])
             self.peer_manager.update_contact_triple(peer.node_id, address[0], address[1])
             if not df.cancelled():
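
For review context: the refactored PingQueue reduces to two rules — enqueue_maybe_ping() only ever moves a peer's deadline earlier, and _process() wakes roughly once per second and services at most one due peer per pass. A stdlib-only sketch of that contract (MiniPingQueue, drain() and the string peer names are illustrative stand-ins, not code from this patch):

    import asyncio
    import typing


    class MiniPingQueue:
        """Deadline bookkeeping only; the real PingQueue pings through KademliaProtocol."""

        def __init__(self, loop: asyncio.AbstractEventLoop):
            self._loop = loop
            self._pending: typing.Dict[str, float] = {}  # peer -> absolute deadline

        def enqueue_maybe_ping(self, *peers: str, delay: float = 300.0):
            now = self._loop.time()
            for peer in peers:
                # re-enqueueing can only move a deadline earlier, never later
                if peer not in self._pending or now + delay < self._pending[peer]:
                    self._pending[peer] = now + delay

        async def drain(self, on_ping: typing.Callable[[str], None]):
            # the real _process() loops forever; this variant stops once empty
            while self._pending:
                now = self._loop.time()
                for peer in list(self._pending):
                    if self._pending[peer] <= now:
                        del self._pending[peer]
                        on_ping(peer)  # the real code spawns a maybe_ping() task here
                        break  # at most one peer per pass -> at most one ping per sleep interval
                await asyncio.sleep(0.1)  # the patch sleeps 1s; shortened for the demo


    async def demo():
        queue = MiniPingQueue(asyncio.get_running_loop())
        queue.enqueue_maybe_ping("peer-a", "peer-b", delay=0.2)
        queue.enqueue_maybe_ping("peer-a", delay=5.0)  # ignored: an earlier deadline is already stored
        pinged = []
        await queue.drain(pinged.append)
        print(pinged)  # ['peer-a', 'peer-b']


    asyncio.run(demo())
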
diff --git a/lbrynet/dht/protocol/routing_table.py b/lbrynet/dht/protocol/routing_table.py
index 600abc7bf..b21b86559 100644
--- a/lbrynet/dht/protocol/routing_table.py
+++ b/lbrynet/dht/protocol/routing_table.py
@@ -49,7 +49,14 @@ class KBucket:
             self.peers.remove(peer)
             self.peers.append(peer)
             return True
-        elif len(self.peers) < constants.k:
+        else:
+            for i in range(len(self.peers)):
+                p = self.peers[i]
+                if p.node_id == peer.node_id:
+                    self.peers.remove(p)
+                    self.peers.append(peer)
+                    return True
+        if len(self.peers) < constants.k:
             self.peers.append(peer)
             return True
         else:
diff --git a/tests/unit/dht/protocol/test_protocol.py b/tests/unit/dht/protocol/test_protocol.py
index 8a19dce74..d5c4f2da8 100644
--- a/tests/unit/dht/protocol/test_protocol.py
+++ b/tests/unit/dht/protocol/test_protocol.py
@@ -93,3 +93,65 @@ class TestProtocol(AsyncioTestCase):
         peer2.stop()
         peer1.disconnect()
         peer2.disconnect()
+
+    async def _make_protocol(self, other_peer, node_id, address, udp_port, tcp_port):
+        proto = KademliaProtocol(
+            self.loop, PeerManager(self.loop), node_id, address, udp_port, tcp_port
+        )
+        await self.loop.create_datagram_endpoint(lambda: proto, (address, udp_port))
+        return proto, other_peer.peer_manager.get_kademlia_peer(node_id, address, udp_port=udp_port)
+
+    async def test_add_peer_after_handle_request(self):
+        with dht_mocks.mock_network_loop(self.loop):
+            node_id1 = constants.generate_id()
+            node_id2 = constants.generate_id()
+            node_id3 = constants.generate_id()
+            node_id4 = constants.generate_id()
+
+            peer1 = KademliaProtocol(
+                self.loop, PeerManager(self.loop), node_id1, '1.2.3.4', 4444, 3333
+            )
+            await self.loop.create_datagram_endpoint(lambda: peer1, ('1.2.3.4', 4444))
+
+            peer2, peer_2_from_peer_1 = await self._make_protocol(peer1, node_id2, '1.2.3.5', 4444, 3333)
+            peer3, peer_3_from_peer_1 = await self._make_protocol(peer1, node_id3, '1.2.3.6', 4444, 3333)
+            peer4, peer_4_from_peer_1 = await self._make_protocol(peer1, node_id4, '1.2.3.7', 4444, 3333)
+
+            # peers who reply should be added
+            await peer1.get_rpc_peer(peer_2_from_peer_1).ping()
+            self.assertListEqual([peer_2_from_peer_1], peer1.routing_table.get_peers())
+            peer1.routing_table.remove_peer(peer_2_from_peer_1)
+
+            # peers not known to be good/bad should be enqueued to maybe-ping
+            peer1_from_peer3 = peer3.get_rpc_peer(peer3.peer_manager.get_kademlia_peer(node_id1, '1.2.3.4', 4444))
+            self.assertEqual(0, len(peer1.ping_queue._pending_contacts))
+            pong = await peer1_from_peer3.ping()
+            self.assertEqual(b'pong', pong)
+            self.assertEqual(1, len(peer1.ping_queue._pending_contacts))
+            peer1.ping_queue._pending_contacts.clear()
+
+            # peers already known to be good should be added
+            peer1_from_peer4 = peer4.get_rpc_peer(peer4.peer_manager.get_kademlia_peer(node_id1, '1.2.3.4', 4444))
+            peer1.peer_manager.update_contact_triple(node_id4, '1.2.3.7', 4444)
+            peer1.peer_manager.report_last_replied('1.2.3.7', 4444)
+            self.assertEqual(0, len(peer1.ping_queue._pending_contacts))
+            pong = await peer1_from_peer4.ping()
+            self.assertEqual(b'pong', pong)
+            self.assertEqual(1, len(peer1.routing_table.get_peers()))
+            self.assertEqual(0, len(peer1.ping_queue._pending_contacts))
+            peer1.routing_table.buckets[0].peers.clear()
+
+            # peers recently known to be bad should not be added or maybe-pinged
+            peer1_from_peer4 = peer4.get_rpc_peer(peer4.peer_manager.get_kademlia_peer(node_id1, '1.2.3.4', 4444))
+            peer1.peer_manager.update_contact_triple(node_id4, '1.2.3.7', 4444)
+            peer1.peer_manager.report_failure('1.2.3.7', 4444)
+            peer1.peer_manager.report_failure('1.2.3.7', 4444)
+            self.assertEqual(0, len(peer1.ping_queue._pending_contacts))
+            pong = await peer1_from_peer4.ping()
+            self.assertEqual(b'pong', pong)
+            self.assertEqual(0, len(peer1.routing_table.get_peers()))
+            self.assertEqual(0, len(peer1.ping_queue._pending_contacts))
+
+            for p in [peer1, peer2, peer3, peer4]:
+                p.stop()
+                p.disconnect()
diff --git a/tests/unit/dht/routing/test_kbucket.py b/tests/unit/dht/routing/test_kbucket.py
index 88644132c..9f1fda60a 100644
--- a/tests/unit/dht/routing/test_kbucket.py
+++ b/tests/unit/dht/routing/test_kbucket.py
@@ -2,7 +2,7 @@ import struct
 import asyncio
 from lbrynet.utils import generate_id
 from lbrynet.dht.protocol.routing_table import KBucket
-from lbrynet.dht.peer import PeerManager
+from lbrynet.dht.peer import PeerManager, KademliaPeer
 from lbrynet.dht import constants
 from torba.testcase import AsyncioTestCase
 
@@ -29,6 +29,33 @@ class TestKBucket(AsyncioTestCase):
         self.kbucket = KBucket(self.peer_manager, 0, 2**constants.hash_bits, generate_id())
 
     def test_add_peer(self):
+        peer = KademliaPeer(None, '1.2.3.4', constants.generate_id(2), udp_port=4444)
+        peer_update2 = KademliaPeer(None, '1.2.3.4', constants.generate_id(2), udp_port=4445)
+
+        self.assertListEqual([], self.kbucket.peers)
+
+        # add the peer
+        self.kbucket.add_peer(peer)
+        self.assertListEqual([peer], self.kbucket.peers)
+
+        # re-add it
+        self.kbucket.add_peer(peer)
+        self.assertListEqual([peer], self.kbucket.peers)
+        self.assertEqual(self.kbucket.peers[0].udp_port, 4444)
+
+        # add a new peer object with the same id and address but a different port
+        self.kbucket.add_peer(peer_update2)
+        self.assertListEqual([peer_update2], self.kbucket.peers)
+        self.assertEqual(self.kbucket.peers[0].udp_port, 4445)
+
+        # modify the peer object to have a different port
+        peer_update2.udp_port = 4444
+        self.kbucket.add_peer(peer_update2)
+        self.assertListEqual([peer_update2], self.kbucket.peers)
+        self.assertEqual(self.kbucket.peers[0].udp_port, 4444)
+
+        self.kbucket.peers.clear()
+
         # Test if contacts can be added to empty list
         # Add k contacts to bucket
         for i in range(constants.k):
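
One caveat worth noting on the PingQueue.stop() hunk above: Task.cancel() never runs done callbacks inline (they are dispatched with call_soon), so the callback that prunes _running_pings cannot fire while stop() holds the thread — stop() has to pop each task out of the list itself rather than wait for the callbacks to shrink it. A stdlib-only illustration (sleeper() and the demo scaffolding are hypothetical, not code from this patch):

    import asyncio


    async def main():
        async def sleeper():
            await asyncio.sleep(3600)

        loop = asyncio.get_running_loop()
        tasks = [loop.create_task(sleeper()) for _ in range(3)]
        for t in tasks:
            # like PingQueue's bookkeeping: prune each task once it finishes;
            # done callbacks are scheduled onto the loop, never run inline
            t.add_done_callback(lambda fut: fut in tasks and tasks.remove(fut))

        # synchronous teardown: pop-and-cancel empties the list immediately.
        # `while tasks: tasks[0].cancel()` would spin forever, because the
        # callbacks that remove entries cannot run until we yield to the loop.
        while tasks:
            tasks.pop().cancel()

        await asyncio.sleep(0)  # yield once so the cancellations are delivered


    asyncio.run(main())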