Merge pull request #1942 from lbryio/split_buckets_under_index
add split_buckets_under_index config setting for seed nodes
This commit is contained in:
commit
8944a36291
6 changed files with 22 additions and 7 deletions
|
@ -473,6 +473,14 @@ class Config(CLIConfig):
|
||||||
tcp_port = Integer("TCP port to listen for incoming blob requests", 3333, previous_names=['peer_port'])
|
tcp_port = Integer("TCP port to listen for incoming blob requests", 3333, previous_names=['peer_port'])
|
||||||
network_interface = String("Interface to use for the DHT and blob exchange", '0.0.0.0')
|
network_interface = String("Interface to use for the DHT and blob exchange", '0.0.0.0')
|
||||||
|
|
||||||
|
# routing table
|
||||||
|
split_buckets_under_index = Integer(
|
||||||
|
"Routing table bucket index below which we always split the bucket if given a new key to add to it and "
|
||||||
|
"the bucket is full. As this value is raised the depth of the routing table (and number of peers in it) "
|
||||||
|
"will increase. This setting is used by seed nodes, you probably don't want to change it during normal "
|
||||||
|
"use.", 1
|
||||||
|
)
|
||||||
|
|
||||||
# protocol timeouts
|
# protocol timeouts
|
||||||
download_timeout = Float("Cumulative timeout for a stream to begin downloading before giving up", 30.0)
|
download_timeout = Float("Cumulative timeout for a stream to begin downloading before giving up", 30.0)
|
||||||
blob_download_timeout = Float("Timeout to download a blob from a peer", 30.0)
|
blob_download_timeout = Float("Timeout to download a blob from a peer", 30.0)
|
||||||
|
|
|
@ -6,6 +6,7 @@ hash_length = hash_class().digest_size
|
||||||
hash_bits = hash_length * 8
|
hash_bits = hash_length * 8
|
||||||
alpha = 5
|
alpha = 5
|
||||||
k = 8
|
k = 8
|
||||||
|
split_buckets_under_index = 1
|
||||||
replacement_cache_size = 8
|
replacement_cache_size = 8
|
||||||
rpc_timeout = 5.0
|
rpc_timeout = 5.0
|
||||||
rpc_attempts = 5
|
rpc_attempts = 5
|
||||||
|
|
|
@ -20,10 +20,12 @@ log = logging.getLogger(__name__)
|
||||||
|
|
||||||
class Node:
|
class Node:
|
||||||
def __init__(self, loop: asyncio.BaseEventLoop, peer_manager: 'PeerManager', node_id: bytes, udp_port: int,
|
def __init__(self, loop: asyncio.BaseEventLoop, peer_manager: 'PeerManager', node_id: bytes, udp_port: int,
|
||||||
internal_udp_port: int, peer_port: int, external_ip: str, rpc_timeout: typing.Optional[float] = 5.0):
|
internal_udp_port: int, peer_port: int, external_ip: str, rpc_timeout: float = constants.rpc_timeout,
|
||||||
|
split_buckets_under_index: int = constants.split_buckets_under_index):
|
||||||
self.loop = loop
|
self.loop = loop
|
||||||
self.internal_udp_port = internal_udp_port
|
self.internal_udp_port = internal_udp_port
|
||||||
self.protocol = KademliaProtocol(loop, peer_manager, node_id, external_ip, udp_port, peer_port, rpc_timeout)
|
self.protocol = KademliaProtocol(loop, peer_manager, node_id, external_ip, udp_port, peer_port, rpc_timeout,
|
||||||
|
split_buckets_under_index)
|
||||||
self.listening_port: asyncio.DatagramTransport = None
|
self.listening_port: asyncio.DatagramTransport = None
|
||||||
self.joined = asyncio.Event(loop=self.loop)
|
self.joined = asyncio.Event(loop=self.loop)
|
||||||
self._join_task: asyncio.Task = None
|
self._join_task: asyncio.Task = None
|
||||||
|
|
|
@ -260,7 +260,8 @@ class PingQueue:
|
||||||
|
|
||||||
class KademliaProtocol(DatagramProtocol):
|
class KademliaProtocol(DatagramProtocol):
|
||||||
def __init__(self, loop: asyncio.BaseEventLoop, peer_manager: 'PeerManager', node_id: bytes, external_ip: str,
|
def __init__(self, loop: asyncio.BaseEventLoop, peer_manager: 'PeerManager', node_id: bytes, external_ip: str,
|
||||||
udp_port: int, peer_port: int, rpc_timeout: float = 5.0):
|
udp_port: int, peer_port: int, rpc_timeout: float = constants.rpc_timeout,
|
||||||
|
split_buckets_under_index: int = constants.split_buckets_under_index):
|
||||||
self.peer_manager = peer_manager
|
self.peer_manager = peer_manager
|
||||||
self.loop = loop
|
self.loop = loop
|
||||||
self.node_id = node_id
|
self.node_id = node_id
|
||||||
|
@ -275,7 +276,7 @@ class KademliaProtocol(DatagramProtocol):
|
||||||
self.transport: DatagramTransport = None
|
self.transport: DatagramTransport = None
|
||||||
self.old_token_secret = constants.generate_id()
|
self.old_token_secret = constants.generate_id()
|
||||||
self.token_secret = constants.generate_id()
|
self.token_secret = constants.generate_id()
|
||||||
self.routing_table = TreeRoutingTable(self.loop, self.peer_manager, self.node_id)
|
self.routing_table = TreeRoutingTable(self.loop, self.peer_manager, self.node_id, split_buckets_under_index)
|
||||||
self.data_store = DictDataStore(self.loop, self.peer_manager)
|
self.data_store = DictDataStore(self.loop, self.peer_manager)
|
||||||
self.ping_queue = PingQueue(self.loop, self)
|
self.ping_queue = PingQueue(self.loop, self)
|
||||||
self.node_rpc = KademliaRPC(self, self.loop, self.peer_port)
|
self.node_rpc = KademliaRPC(self, self.loop, self.peer_port)
|
||||||
|
|
|
@ -156,10 +156,12 @@ class TreeRoutingTable:
|
||||||
that paper.
|
that paper.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, loop: asyncio.BaseEventLoop, peer_manager: 'PeerManager', parent_node_id: bytes):
|
def __init__(self, loop: asyncio.BaseEventLoop, peer_manager: 'PeerManager', parent_node_id: bytes,
|
||||||
|
split_buckets_under_index: int = constants.split_buckets_under_index):
|
||||||
self._loop = loop
|
self._loop = loop
|
||||||
self._peer_manager = peer_manager
|
self._peer_manager = peer_manager
|
||||||
self._parent_node_id = parent_node_id
|
self._parent_node_id = parent_node_id
|
||||||
|
self._split_buckets_under_index = split_buckets_under_index
|
||||||
self.buckets: typing.List[KBucket] = [
|
self.buckets: typing.List[KBucket] = [
|
||||||
KBucket(
|
KBucket(
|
||||||
self._peer_manager, range_min=0, range_max=2 ** constants.hash_bits, node_id=self._parent_node_id
|
self._peer_manager, range_min=0, range_max=2 ** constants.hash_bits, node_id=self._parent_node_id
|
||||||
|
@ -171,7 +173,7 @@ class TreeRoutingTable:
|
||||||
|
|
||||||
def should_split(self, bucket_index: int, to_add: bytes) -> bool:
|
def should_split(self, bucket_index: int, to_add: bytes) -> bool:
|
||||||
# https://stackoverflow.com/questions/32129978/highly-unbalanced-kademlia-routing-table/32187456#32187456
|
# https://stackoverflow.com/questions/32129978/highly-unbalanced-kademlia-routing-table/32187456#32187456
|
||||||
if not bucket_index:
|
if bucket_index < self._split_buckets_under_index:
|
||||||
return True
|
return True
|
||||||
contacts = self.get_peers()
|
contacts = self.get_peers()
|
||||||
distance = Distance(self._parent_node_id)
|
distance = Distance(self._parent_node_id)
|
||||||
|
|
|
@ -372,7 +372,8 @@ class DHTComponent(Component):
|
||||||
udp_port=self.external_udp_port,
|
udp_port=self.external_udp_port,
|
||||||
external_ip=external_ip,
|
external_ip=external_ip,
|
||||||
peer_port=self.external_peer_port,
|
peer_port=self.external_peer_port,
|
||||||
rpc_timeout=self.conf.node_rpc_timeout
|
rpc_timeout=self.conf.node_rpc_timeout,
|
||||||
|
split_buckets_under_index=self.conf.split_buckets_under_index
|
||||||
)
|
)
|
||||||
self.dht_node.start(
|
self.dht_node.start(
|
||||||
interface=self.conf.network_interface, known_node_urls=self.conf.known_dht_nodes
|
interface=self.conf.network_interface, known_node_urls=self.conf.known_dht_nodes
|
||||||
|
|
Loading…
Reference in a new issue