diff --git a/lbry/dht/node.py b/lbry/dht/node.py index 47cbff39e..74270c404 100644 --- a/lbry/dht/node.py +++ b/lbry/dht/node.py @@ -30,12 +30,12 @@ class Node: ) def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', node_id: bytes, udp_port: int, internal_udp_port: int, peer_port: int, external_ip: str, rpc_timeout: float = constants.RPC_TIMEOUT, - split_buckets_under_index: int = constants.SPLIT_BUCKETS_UNDER_INDEX, + split_buckets_under_index: int = constants.SPLIT_BUCKETS_UNDER_INDEX, is_bootstrap_node: bool = False, storage: typing.Optional['SQLiteStorage'] = None): self.loop = loop self.internal_udp_port = internal_udp_port self.protocol = KademliaProtocol(loop, peer_manager, node_id, external_ip, udp_port, peer_port, rpc_timeout, - split_buckets_under_index) + split_buckets_under_index, is_bootstrap_node) self.listening_port: asyncio.DatagramTransport = None self.joined = asyncio.Event(loop=self.loop) self._join_task: asyncio.Task = None diff --git a/lbry/dht/protocol/protocol.py b/lbry/dht/protocol/protocol.py index 813fc5813..89563c89b 100644 --- a/lbry/dht/protocol/protocol.py +++ b/lbry/dht/protocol/protocol.py @@ -218,6 +218,10 @@ class PingQueue: def running(self): return self._running + @property + def busy(self): + return self._running and (any(self._running_pings) or any(self._pending_contacts)) + def enqueue_maybe_ping(self, *peers: 'KademliaPeer', delay: typing.Optional[float] = None): delay = delay if delay is not None else self._default_delay now = self._loop.time() @@ -294,7 +298,7 @@ class KademliaProtocol(DatagramProtocol): def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', node_id: bytes, external_ip: str, udp_port: int, peer_port: int, rpc_timeout: float = constants.RPC_TIMEOUT, - split_buckets_under_index: int = constants.SPLIT_BUCKETS_UNDER_INDEX): + split_buckets_under_index: int = constants.SPLIT_BUCKETS_UNDER_INDEX, is_boostrap_node: bool = False): self.peer_manager = peer_manager self.loop = loop self.node_id = node_id @@ -309,7 +313,8 @@ class KademliaProtocol(DatagramProtocol): self.transport: DatagramTransport = None self.old_token_secret = constants.generate_id() self.token_secret = constants.generate_id() - self.routing_table = TreeRoutingTable(self.loop, self.peer_manager, self.node_id, split_buckets_under_index) + self.routing_table = TreeRoutingTable( + self.loop, self.peer_manager, self.node_id, split_buckets_under_index, is_bootstrap_node=is_boostrap_node) self.data_store = DictDataStore(self.loop, self.peer_manager) self.ping_queue = PingQueue(self.loop, self) self.node_rpc = KademliaRPC(self, self.loop, self.peer_port) diff --git a/lbry/dht/protocol/routing_table.py b/lbry/dht/protocol/routing_table.py index 73c18ac47..b2e927b57 100644 --- a/lbry/dht/protocol/routing_table.py +++ b/lbry/dht/protocol/routing_table.py @@ -28,7 +28,8 @@ class KBucket: namespace="dht_node", labelnames=("amount",) ) - def __init__(self, peer_manager: 'PeerManager', range_min: int, range_max: int, node_id: bytes): + def __init__(self, peer_manager: 'PeerManager', range_min: int, range_max: int, + node_id: bytes, capacity: int = constants.K): """ @param range_min: The lower boundary for the range in the n-bit ID space covered by this k-bucket @@ -42,6 +43,7 @@ class KBucket: self.peers: typing.List['KademliaPeer'] = [] self._node_id = node_id self._distance_to_self = Distance(node_id) + self.capacity = capacity def add_peer(self, peer: 'KademliaPeer') -> bool: """ Add contact to _contact list in the right order. This will move the @@ -68,7 +70,7 @@ class KBucket: self.peers.remove(local_peer) self.peers.append(peer) return True - if len(self.peers) < constants.K: + if len(self.peers) < self.capacity: self.peers.append(peer) self.peer_in_routing_table_metric.labels("global").inc() bits_colliding = utils.get_colliding_prefix_bits(peer.node_id, self._node_id) @@ -76,7 +78,6 @@ class KBucket: return True else: return False - # raise BucketFull("No space in bucket to insert contact") def get_peer(self, node_id: bytes) -> 'KademliaPeer': for peer in self.peers: @@ -178,6 +179,13 @@ class TreeRoutingTable: version of the Kademlia paper, in section 2.4. It does, however, use the ping RPC-based k-bucket eviction algorithm described in section 2.2 of that paper. + + BOOTSTRAP MODE: if set to True, we always add all peers. This is so a + bootstrap node does not get a bias towards its own node id and replies are + the best it can provide (joining peer knows its neighbors immediately). + Over time, this will need to be optimized so we use the disk as holding + everything in memory won't be feasible anymore. + See: https://github.com/bittorrent/bootstrap-dht """ bucket_in_routing_table_metric = Gauge( "buckets_in_routing_table", "Number of buckets on routing table", namespace="dht_node", @@ -185,14 +193,15 @@ class TreeRoutingTable: ) def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', parent_node_id: bytes, - split_buckets_under_index: int = constants.SPLIT_BUCKETS_UNDER_INDEX): + split_buckets_under_index: int = constants.SPLIT_BUCKETS_UNDER_INDEX, is_bootstrap_node: bool = False): self._loop = loop self._peer_manager = peer_manager self._parent_node_id = parent_node_id self._split_buckets_under_index = split_buckets_under_index self.buckets: typing.List[KBucket] = [ KBucket( - self._peer_manager, range_min=0, range_max=2 ** constants.HASH_BITS, node_id=self._parent_node_id + self._peer_manager, range_min=0, range_max=2 ** constants.HASH_BITS, node_id=self._parent_node_id, + capacity=1 << 32 if is_bootstrap_node else constants.K ) ] diff --git a/scripts/dht_node.py b/scripts/dht_node.py index 3dd6dca82..8e03dad93 100644 --- a/scripts/dht_node.py +++ b/scripts/dht_node.py @@ -83,7 +83,7 @@ async def main(host: str, port: int, db_file_path: str, bootstrap_node: Optional await storage.open() node = Node( loop, PeerManager(loop), node_id, port, port, 3333, None, - storage=storage + storage=storage, is_bootstrap_node=True ) if prometheus_port > 0: metrics = SimpleMetrics(prometheus_port, node if export else None) diff --git a/tests/unit/dht/test_node.py b/tests/unit/dht/test_node.py index 5ecad5181..051a2e6db 100644 --- a/tests/unit/dht/test_node.py +++ b/tests/unit/dht/test_node.py @@ -11,6 +11,34 @@ from lbry.dht.peer import PeerManager, make_kademlia_peer from lbry.extras.daemon.storage import SQLiteStorage +class TestBootstrapNode(AsyncioTestCase): + TIMEOUT = 10.0 # do not increase. Hitting a timeout is a real failure + async def test_it_adds_all(self): + loop = asyncio.get_event_loop() + loop.set_debug(False) + + with dht_mocks.mock_network_loop(loop): + advance = dht_mocks.get_time_accelerator(loop) + self.bootstrap_node = Node(self.loop, PeerManager(loop), constants.generate_id(), + 4444, 4444, 3333, '1.2.3.4', is_bootstrap_node=True) + self.bootstrap_node.start('1.2.3.4', []) + self.bootstrap_node.protocol.ping_queue._default_delay = 0 + self.addCleanup(self.bootstrap_node.stop) + + # start the nodes + nodes = {} + futs = [] + for i in range(100): + nodes[i] = Node(loop, PeerManager(loop), constants.generate_id(i), 4444, 4444, 3333, f'1.3.3.{i}') + nodes[i].start(f'1.3.3.{i}', [('1.2.3.4', 4444)]) + self.addCleanup(nodes[i].stop) + futs.append(nodes[i].joined.wait()) + await asyncio.gather(*futs) + while self.bootstrap_node.protocol.ping_queue.busy: + await advance(1) + self.assertEqual(100, len(self.bootstrap_node.protocol.routing_table.get_peers())) + + class TestNodePingQueueDiscover(AsyncioTestCase): async def test_ping_queue_discover(self): loop = asyncio.get_event_loop()