diff --git a/lbrynet/dht/node.py b/lbrynet/dht/node.py
index 4db51a55a..10af7d1c2 100644
--- a/lbrynet/dht/node.py
+++ b/lbrynet/dht/node.py
@@ -101,6 +101,7 @@ class Node:
             self._refresh_task.cancel()
         if self.protocol and self.protocol.ping_queue.running:
             self.protocol.ping_queue.stop()
+            self.protocol.stop()
         if self.listening_port is not None:
             self.listening_port.close()
         self._join_task = None
@@ -113,6 +114,7 @@ class Node:
                 lambda: self.protocol, (interface, self.internal_udp_port)
             )
             log.info("DHT node listening on UDP %s:%i", interface, self.internal_udp_port)
+            self.protocol.start()
         else:
             log.warning("Already bound to port %s", self.listening_port)
 
@@ -130,7 +132,8 @@ class Node:
         if known_node_urls:
             for host, port in known_node_urls:
                 address = await resolve_host(host, port, proto='udp')
-                if (address, port) not in known_node_addresses and address != self.protocol.external_ip:
+                if (address, port) not in known_node_addresses and\
+                        (address, port) != (self.protocol.external_ip, self.protocol.udp_port):
                     known_node_addresses.append((address, port))
                     url_to_addr[address] = host
 
diff --git a/lbrynet/dht/protocol/protocol.py b/lbrynet/dht/protocol/protocol.py
index f1aab10ad..2c5f85a7f 100644
--- a/lbrynet/dht/protocol/protocol.py
+++ b/lbrynet/dht/protocol/protocol.py
@@ -207,7 +207,7 @@ class PingQueue:
             try:
                 if self._protocol.peer_manager.peer_is_good(peer):
                     if peer not in self._protocol.routing_table.get_peers():
-                        await self._protocol.add_peer(peer)
+                        self._protocol.add_peer(peer)
                     return
                 await self._protocol.get_rpc_peer(peer).ping()
             except asyncio.TimeoutError:
@@ -268,11 +268,19 @@ class KademliaProtocol(DatagramProtocol):
         self.node_rpc = KademliaRPC(self, self.loop, self.peer_port)
         self.rpc_timeout = rpc_timeout
         self._split_lock = asyncio.Lock(loop=self.loop)
+        self._to_remove: typing.Set['KademliaPeer'] = set()
+        self._to_add: typing.Set['KademliaPeer'] = set()
+        self.maintaining_routing_task: typing.Optional[asyncio.Task] = None
 
     def get_rpc_peer(self, peer: 'KademliaPeer') -> RemoteKademliaRPC:
         return RemoteKademliaRPC(self.loop, self.peer_manager, self, peer)
 
+    def start(self, force_delay=None):
+        self.maintaining_routing_task = self.loop.create_task(self.routing_table_task(force_delay))
+
     def stop(self):
+        if self.maintaining_routing_task:
+            self.maintaining_routing_task.cancel()
         if self.transport:
             self.disconnect()
 
@@ -299,6 +307,7 @@ class KademliaProtocol(DatagramProtocol):
         return args, {}
 
     async def _add_peer(self, peer: 'KademliaPeer'):
+        log.debug("Trying to add %s:%d", peer.address, peer.udp_port)
         for p in self.routing_table.get_peers():
             if (p.address, p.udp_port) == (peer.address, peer.udp_port) and p.node_id != peer.node_id:
                 self.routing_table.remove_peer(p)
@@ -363,11 +372,24 @@ class KademliaProtocol(DatagramProtocol):
                     self.routing_table.buckets[bucket_index].remove_peer(to_replace)
                 return await self._add_peer(peer)
 
-    async def add_peer(self, peer: 'KademliaPeer') -> bool:
+    def add_peer(self, peer: 'KademliaPeer') -> bool:
         if peer.node_id == self.node_id:
             return False
-        async with self._split_lock:
-            return await self._add_peer(peer)
+        self._to_add.add(peer)
+        return True
+
+    async def routing_table_task(self, force_delay=None):
+        while True:
+            while self._to_remove:
+                async with self._split_lock:
+                    peer = self._to_remove.pop()
+                    log.debug("Trying to remove %s:%d", peer.address, peer.udp_port)
+                    self.routing_table.remove_peer(peer)
+                    self.routing_table.join_buckets()
+            while self._to_add:
+                async with self._split_lock:
+                    await self._add_peer(self._to_add.pop())
+            await asyncio.sleep(force_delay or constants.rpc_timeout)
 
     async def _handle_rpc(self, sender_contact: 'KademliaPeer', message: RequestDatagram):
         assert sender_contact.node_id != self.node_id, (binascii.hexlify(sender_contact.node_id)[:8].decode(),
@@ -416,7 +438,7 @@ class KademliaProtocol(DatagramProtocol):
                 self.ping_queue.enqueue_maybe_ping(peer)
             # only add a requesting contact to the routing table if it has replied to one of our requests
             elif is_good is True:
-                await self.add_peer(peer)
+                self.add_peer(peer)
         except ValueError as err:
             log.debug("error raised handling %s request from %s:%i - %s(%s)",
                       request_datagram.method, peer.address, peer.udp_port, str(type(err)),
@@ -459,7 +481,7 @@ class KademliaProtocol(DatagramProtocol):
             self.peer_manager.update_contact_triple(peer.node_id, address[0], address[1])
             if not df.cancelled():
                 df.set_result(response_datagram)
-                await self.add_peer(peer)
+                self.add_peer(peer)
             else:
                 log.warning("%s:%i replied, but after we cancelled the request attempt",
                             peer.address, peer.udp_port)
@@ -531,9 +553,7 @@ class KademliaProtocol(DatagramProtocol):
         except (asyncio.TimeoutError, RemoteException):
             self.peer_manager.report_failure(peer.address, peer.udp_port)
             if self.peer_manager.peer_is_good(peer) is False:
-                async with self._split_lock:
-                    self.routing_table.remove_peer(peer)
-                    self.routing_table.join_buckets()
+                self._to_remove.add(peer)
             raise
 
     async def send_response(self, peer: 'KademliaPeer', response: ResponseDatagram):
diff --git a/tests/unit/dht/protocol/test_protocol.py b/tests/unit/dht/protocol/test_protocol.py
index d5c4f2da8..e183a05bd 100644
--- a/tests/unit/dht/protocol/test_protocol.py
+++ b/tests/unit/dht/protocol/test_protocol.py
@@ -99,6 +99,7 @@ class TestProtocol(AsyncioTestCase):
             self.loop, PeerManager(self.loop), node_id, address, udp_port, tcp_port
         )
         await self.loop.create_datagram_endpoint(lambda: proto, (address, 4444))
+        proto.start(0.1)
         return proto, other_peer.peer_manager.get_kademlia_peer(node_id, address, udp_port=udp_port)
 
     async def test_add_peer_after_handle_request(self):
@@ -112,6 +113,7 @@ class TestProtocol(AsyncioTestCase):
             self.loop, PeerManager(self.loop), node_id1, '1.2.3.4', 4444, 3333
         )
         await self.loop.create_datagram_endpoint(lambda: peer1, ('1.2.3.4', 4444))
+        peer1.start(0.1)
         peer2, peer_2_from_peer_1 = await self._make_protocol(peer1, node_id2, '1.2.3.5', 4444, 3333)
         peer3, peer_3_from_peer_1 = await self._make_protocol(peer1, node_id3, '1.2.3.6', 4444, 3333)
         peer4, peer_4_from_peer_1 = await self._make_protocol(peer1, node_id4, '1.2.3.7', 4444, 3333)
@@ -119,6 +121,7 @@ class TestProtocol(AsyncioTestCase):
         # peers who reply should be added
         await peer1.get_rpc_peer(peer_2_from_peer_1).ping()
+        await asyncio.sleep(0.5)
         self.assertListEqual([peer_2_from_peer_1], peer1.routing_table.get_peers())
         peer1.routing_table.remove_peer(peer_2_from_peer_1)
 
         await peer1.get_rpc_peer(peer_3_from_peer_1).ping()
@@ -137,6 +140,7 @@ class TestProtocol(AsyncioTestCase):
         self.assertEqual(0, len(peer1.ping_queue._pending_contacts))
         pong = await peer1_from_peer4.ping()
         self.assertEqual(b'pong', pong)
+        await asyncio.sleep(0.5)
         self.assertEqual(1, len(peer1.routing_table.get_peers()))
         self.assertEqual(0, len(peer1.ping_queue._pending_contacts))
         peer1.routing_table.buckets[0].peers.clear()
diff --git a/tests/unit/dht/protocol/test_routing_table.py b/tests/unit/dht/protocol/test_routing_table.py
index a27b316e2..c5d832783 100644
--- a/tests/unit/dht/protocol/test_routing_table.py
+++ b/tests/unit/dht/protocol/test_routing_table.py
@@ -57,7 +57,7 @@ class TestRouting(AsyncioTestCase):
                 node.protocol.node_id, node.protocol.external_ip,
                 udp_port=node.protocol.udp_port
             )
-            added = await node_1.protocol.add_peer(peer)
+            added = await node_1.protocol._add_peer(peer)
             self.assertEqual(True, added)
             contact_cnt += 1
 
@@ -88,7 +88,7 @@ class TestRouting(AsyncioTestCase):
             # set all of the peers to good (as to not attempt pinging stale ones during split)
             node_1.protocol.peer_manager.report_last_replied(peer.address, peer.udp_port)
             node_1.protocol.peer_manager.report_last_replied(peer.address, peer.udp_port)
-            await node_1.protocol.add_peer(peer)
+            await node_1.protocol._add_peer(peer)
         # check that bucket 0 is always the one covering the local node id
         self.assertEqual(True, node_1.protocol.routing_table.buckets[0].key_in_range(node_1.protocol.node_id))
         self.assertEqual(40, len(node_1.protocol.routing_table.get_peers()))
diff --git a/tests/unit/dht/test_blob_announcer.py b/tests/unit/dht/test_blob_announcer.py
index f1fca0717..840e4ae75 100644
--- a/tests/unit/dht/test_blob_announcer.py
+++ b/tests/unit/dht/test_blob_announcer.py
@@ -21,6 +21,7 @@ class TestBlobAnnouncer(AsyncioTestCase):
         await self.storage.open()
         self.peer_manager = PeerManager(self.loop)
         self.node = Node(self.loop, self.peer_manager, node_id, 4444, 4444, 3333, address)
+        self.node.protocol.start(0.1)
         await self.node.start_listening(address)
         self.blob_announcer = BlobAnnouncer(self.loop, self.node, self.storage)
         for node_id, address in peer_addresses:
@@ -30,9 +31,10 @@ class TestBlobAnnouncer(AsyncioTestCase):
     async def add_peer(self, node_id, address, add_to_routing_table=True):
         n = Node(self.loop, PeerManager(self.loop), node_id, 4444, 4444, 3333, address)
         await n.start_listening(address)
+        n.protocol.start(0.1)
         self.nodes.update({len(self.nodes): n})
         if add_to_routing_table:
-            await self.node.protocol.add_peer(
+            self.node.protocol.add_peer(
                 self.peer_manager.get_kademlia_peer(
                     n.protocol.node_id, n.protocol.external_ip, n.protocol.udp_port
                 )
@@ -98,6 +100,7 @@ class TestBlobAnnouncer(AsyncioTestCase):
         await self.chain_peer(constants.generate_id(12), '1.2.3.12')
         await self.chain_peer(constants.generate_id(13), '1.2.3.13')
         await self.chain_peer(constants.generate_id(14), '1.2.3.14')
+        await self.advance(61.0)
 
         last = self.nodes[len(self.nodes) - 1]
         search_q, peer_q = asyncio.Queue(loop=self.loop), asyncio.Queue(loop=self.loop)