forked from LBRYCommunity/lbry-sdk
Merge pull request #3631 from lbryio/bootstrap_node
Add all peers when running as a bootstrap node
This commit is contained in:
commit
a334a93757
8 changed files with 165 additions and 164 deletions
|
@ -624,6 +624,10 @@ class Config(CLIConfig):
|
||||||
"will increase. This setting is used by seed nodes, you probably don't want to change it during normal "
|
"will increase. This setting is used by seed nodes, you probably don't want to change it during normal "
|
||||||
"use.", 2
|
"use.", 2
|
||||||
)
|
)
|
||||||
|
is_bootstrap_node = Toggle(
|
||||||
|
"When running as a bootstrap node, disable all logic related to balancing the routing table, so we can "
|
||||||
|
"add as many peers as possible and better help first-runs.", False
|
||||||
|
)
|
||||||
|
|
||||||
# protocol timeouts
|
# protocol timeouts
|
||||||
download_timeout = Float("Cumulative timeout for a stream to begin downloading before giving up", 30.0)
|
download_timeout = Float("Cumulative timeout for a stream to begin downloading before giving up", 30.0)
|
||||||
|
|
|
@ -30,12 +30,12 @@ class Node:
|
||||||
)
|
)
|
||||||
def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', node_id: bytes, udp_port: int,
|
def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', node_id: bytes, udp_port: int,
|
||||||
internal_udp_port: int, peer_port: int, external_ip: str, rpc_timeout: float = constants.RPC_TIMEOUT,
|
internal_udp_port: int, peer_port: int, external_ip: str, rpc_timeout: float = constants.RPC_TIMEOUT,
|
||||||
split_buckets_under_index: int = constants.SPLIT_BUCKETS_UNDER_INDEX,
|
split_buckets_under_index: int = constants.SPLIT_BUCKETS_UNDER_INDEX, is_bootstrap_node: bool = False,
|
||||||
storage: typing.Optional['SQLiteStorage'] = None):
|
storage: typing.Optional['SQLiteStorage'] = None):
|
||||||
self.loop = loop
|
self.loop = loop
|
||||||
self.internal_udp_port = internal_udp_port
|
self.internal_udp_port = internal_udp_port
|
||||||
self.protocol = KademliaProtocol(loop, peer_manager, node_id, external_ip, udp_port, peer_port, rpc_timeout,
|
self.protocol = KademliaProtocol(loop, peer_manager, node_id, external_ip, udp_port, peer_port, rpc_timeout,
|
||||||
split_buckets_under_index)
|
split_buckets_under_index, is_bootstrap_node)
|
||||||
self.listening_port: asyncio.DatagramTransport = None
|
self.listening_port: asyncio.DatagramTransport = None
|
||||||
self.joined = asyncio.Event(loop=self.loop)
|
self.joined = asyncio.Event(loop=self.loop)
|
||||||
self._join_task: asyncio.Task = None
|
self._join_task: asyncio.Task = None
|
||||||
|
@ -70,13 +70,6 @@ class Node:
|
||||||
|
|
||||||
# get ids falling in the midpoint of each bucket that hasn't been recently updated
|
# get ids falling in the midpoint of each bucket that hasn't been recently updated
|
||||||
node_ids = self.protocol.routing_table.get_refresh_list(0, True)
|
node_ids = self.protocol.routing_table.get_refresh_list(0, True)
|
||||||
# if we have 3 or fewer populated buckets get two random ids in the range of each to try and
|
|
||||||
# populate/split the buckets further
|
|
||||||
buckets_with_contacts = self.protocol.routing_table.buckets_with_contacts()
|
|
||||||
if buckets_with_contacts <= 3:
|
|
||||||
for i in range(buckets_with_contacts):
|
|
||||||
node_ids.append(self.protocol.routing_table.random_id_in_bucket_range(i))
|
|
||||||
node_ids.append(self.protocol.routing_table.random_id_in_bucket_range(i))
|
|
||||||
|
|
||||||
if self.protocol.routing_table.get_peers():
|
if self.protocol.routing_table.get_peers():
|
||||||
# if we have node ids to look up, perform the iterative search until we have k results
|
# if we have node ids to look up, perform the iterative search until we have k results
|
||||||
|
@ -203,15 +196,13 @@ class Node:
|
||||||
|
|
||||||
def get_iterative_node_finder(self, key: bytes, shortlist: typing.Optional[typing.List['KademliaPeer']] = None,
|
def get_iterative_node_finder(self, key: bytes, shortlist: typing.Optional[typing.List['KademliaPeer']] = None,
|
||||||
max_results: int = constants.K) -> IterativeNodeFinder:
|
max_results: int = constants.K) -> IterativeNodeFinder:
|
||||||
|
shortlist = shortlist or self.protocol.routing_table.find_close_peers(key)
|
||||||
return IterativeNodeFinder(self.loop, self.protocol.peer_manager, self.protocol.routing_table, self.protocol,
|
return IterativeNodeFinder(self.loop, self.protocol, key, max_results, shortlist)
|
||||||
key, max_results, None, shortlist)
|
|
||||||
|
|
||||||
def get_iterative_value_finder(self, key: bytes, shortlist: typing.Optional[typing.List['KademliaPeer']] = None,
|
def get_iterative_value_finder(self, key: bytes, shortlist: typing.Optional[typing.List['KademliaPeer']] = None,
|
||||||
max_results: int = -1) -> IterativeValueFinder:
|
max_results: int = -1) -> IterativeValueFinder:
|
||||||
|
shortlist = shortlist or self.protocol.routing_table.find_close_peers(key)
|
||||||
return IterativeValueFinder(self.loop, self.protocol.peer_manager, self.protocol.routing_table, self.protocol,
|
return IterativeValueFinder(self.loop, self.protocol, key, max_results, shortlist)
|
||||||
key, max_results, None, shortlist)
|
|
||||||
|
|
||||||
async def peer_search(self, node_id: bytes, count=constants.K, max_results=constants.K * 2,
|
async def peer_search(self, node_id: bytes, count=constants.K, max_results=constants.K * 2,
|
||||||
shortlist: typing.Optional[typing.List['KademliaPeer']] = None
|
shortlist: typing.Optional[typing.List['KademliaPeer']] = None
|
||||||
|
|
|
@ -12,7 +12,6 @@ from lbry.dht.peer import make_kademlia_peer
|
||||||
from lbry.dht.serialization.datagram import PAGE_KEY
|
from lbry.dht.serialization.datagram import PAGE_KEY
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from lbry.dht.protocol.routing_table import TreeRoutingTable
|
|
||||||
from lbry.dht.protocol.protocol import KademliaProtocol
|
from lbry.dht.protocol.protocol import KademliaProtocol
|
||||||
from lbry.dht.peer import PeerManager, KademliaPeer
|
from lbry.dht.peer import PeerManager, KademliaPeer
|
||||||
|
|
||||||
|
@ -57,37 +56,19 @@ class FindValueResponse(FindResponse):
|
||||||
return [(node_id, address.decode(), port) for node_id, address, port in self.close_triples]
|
return [(node_id, address.decode(), port) for node_id, address, port in self.close_triples]
|
||||||
|
|
||||||
|
|
||||||
def get_shortlist(routing_table: 'TreeRoutingTable', key: bytes,
|
|
||||||
shortlist: typing.Optional[typing.List['KademliaPeer']]) -> typing.List['KademliaPeer']:
|
|
||||||
"""
|
|
||||||
If not provided, initialize the shortlist of peers to probe to the (up to) k closest peers in the routing table
|
|
||||||
|
|
||||||
:param routing_table: a TreeRoutingTable
|
|
||||||
:param key: a 48 byte hash
|
|
||||||
:param shortlist: optional manually provided shortlist, this is done during bootstrapping when there are no
|
|
||||||
peers in the routing table. During bootstrap the shortlist is set to be the seed nodes.
|
|
||||||
"""
|
|
||||||
if len(key) != constants.HASH_LENGTH:
|
|
||||||
raise ValueError("invalid key length: %i" % len(key))
|
|
||||||
return shortlist or routing_table.find_close_peers(key)
|
|
||||||
|
|
||||||
|
|
||||||
class IterativeFinder(AsyncIterator):
|
class IterativeFinder(AsyncIterator):
|
||||||
def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager',
|
def __init__(self, loop: asyncio.AbstractEventLoop,
|
||||||
routing_table: 'TreeRoutingTable', protocol: 'KademliaProtocol', key: bytes,
|
protocol: 'KademliaProtocol', key: bytes,
|
||||||
max_results: typing.Optional[int] = constants.K,
|
max_results: typing.Optional[int] = constants.K,
|
||||||
exclude: typing.Optional[typing.List[typing.Tuple[str, int]]] = None,
|
|
||||||
shortlist: typing.Optional[typing.List['KademliaPeer']] = None):
|
shortlist: typing.Optional[typing.List['KademliaPeer']] = None):
|
||||||
if len(key) != constants.HASH_LENGTH:
|
if len(key) != constants.HASH_LENGTH:
|
||||||
raise ValueError("invalid key length: %i" % len(key))
|
raise ValueError("invalid key length: %i" % len(key))
|
||||||
self.loop = loop
|
self.loop = loop
|
||||||
self.peer_manager = peer_manager
|
self.peer_manager = protocol.peer_manager
|
||||||
self.routing_table = routing_table
|
|
||||||
self.protocol = protocol
|
self.protocol = protocol
|
||||||
|
|
||||||
self.key = key
|
self.key = key
|
||||||
self.max_results = max(constants.K, max_results)
|
self.max_results = max(constants.K, max_results)
|
||||||
self.exclude = exclude or []
|
|
||||||
|
|
||||||
self.active: typing.Dict['KademliaPeer', int] = OrderedDict() # peer: distance, sorted
|
self.active: typing.Dict['KademliaPeer', int] = OrderedDict() # peer: distance, sorted
|
||||||
self.contacted: typing.Set['KademliaPeer'] = set()
|
self.contacted: typing.Set['KademliaPeer'] = set()
|
||||||
|
@ -99,7 +80,7 @@ class IterativeFinder(AsyncIterator):
|
||||||
self.iteration_count = 0
|
self.iteration_count = 0
|
||||||
self.running = False
|
self.running = False
|
||||||
self.tasks: typing.List[asyncio.Task] = []
|
self.tasks: typing.List[asyncio.Task] = []
|
||||||
for peer in get_shortlist(routing_table, key, shortlist):
|
for peer in shortlist:
|
||||||
if peer.node_id:
|
if peer.node_id:
|
||||||
self._add_active(peer, force=True)
|
self._add_active(peer, force=True)
|
||||||
else:
|
else:
|
||||||
|
@ -198,8 +179,6 @@ class IterativeFinder(AsyncIterator):
|
||||||
if index > (constants.K + len(self.running_probes)):
|
if index > (constants.K + len(self.running_probes)):
|
||||||
break
|
break
|
||||||
origin_address = (peer.address, peer.udp_port)
|
origin_address = (peer.address, peer.udp_port)
|
||||||
if origin_address in self.exclude:
|
|
||||||
continue
|
|
||||||
if peer.node_id == self.protocol.node_id:
|
if peer.node_id == self.protocol.node_id:
|
||||||
continue
|
continue
|
||||||
if origin_address == (self.protocol.external_ip, self.protocol.udp_port):
|
if origin_address == (self.protocol.external_ip, self.protocol.udp_port):
|
||||||
|
@ -277,13 +256,11 @@ class IterativeFinder(AsyncIterator):
|
||||||
type(self).__name__, id(self), self.key.hex()[:8])
|
type(self).__name__, id(self), self.key.hex()[:8])
|
||||||
|
|
||||||
class IterativeNodeFinder(IterativeFinder):
|
class IterativeNodeFinder(IterativeFinder):
|
||||||
def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager',
|
def __init__(self, loop: asyncio.AbstractEventLoop,
|
||||||
routing_table: 'TreeRoutingTable', protocol: 'KademliaProtocol', key: bytes,
|
protocol: 'KademliaProtocol', key: bytes,
|
||||||
max_results: typing.Optional[int] = constants.K,
|
max_results: typing.Optional[int] = constants.K,
|
||||||
exclude: typing.Optional[typing.List[typing.Tuple[str, int]]] = None,
|
|
||||||
shortlist: typing.Optional[typing.List['KademliaPeer']] = None):
|
shortlist: typing.Optional[typing.List['KademliaPeer']] = None):
|
||||||
super().__init__(loop, peer_manager, routing_table, protocol, key, max_results, exclude,
|
super().__init__(loop, protocol, key, max_results, shortlist)
|
||||||
shortlist)
|
|
||||||
self.yielded_peers: typing.Set['KademliaPeer'] = set()
|
self.yielded_peers: typing.Set['KademliaPeer'] = set()
|
||||||
|
|
||||||
async def send_probe(self, peer: 'KademliaPeer') -> FindNodeResponse:
|
async def send_probe(self, peer: 'KademliaPeer') -> FindNodeResponse:
|
||||||
|
@ -319,13 +296,11 @@ class IterativeNodeFinder(IterativeFinder):
|
||||||
|
|
||||||
|
|
||||||
class IterativeValueFinder(IterativeFinder):
|
class IterativeValueFinder(IterativeFinder):
|
||||||
def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager',
|
def __init__(self, loop: asyncio.AbstractEventLoop,
|
||||||
routing_table: 'TreeRoutingTable', protocol: 'KademliaProtocol', key: bytes,
|
protocol: 'KademliaProtocol', key: bytes,
|
||||||
max_results: typing.Optional[int] = constants.K,
|
max_results: typing.Optional[int] = constants.K,
|
||||||
exclude: typing.Optional[typing.List[typing.Tuple[str, int]]] = None,
|
|
||||||
shortlist: typing.Optional[typing.List['KademliaPeer']] = None):
|
shortlist: typing.Optional[typing.List['KademliaPeer']] = None):
|
||||||
super().__init__(loop, peer_manager, routing_table, protocol, key, max_results, exclude,
|
super().__init__(loop, protocol, key, max_results, shortlist)
|
||||||
shortlist)
|
|
||||||
self.blob_peers: typing.Set['KademliaPeer'] = set()
|
self.blob_peers: typing.Set['KademliaPeer'] = set()
|
||||||
# this tracks the index of the most recent page we requested from each peer
|
# this tracks the index of the most recent page we requested from each peer
|
||||||
self.peer_pages: typing.DefaultDict['KademliaPeer', int] = defaultdict(int)
|
self.peer_pages: typing.DefaultDict['KademliaPeer', int] = defaultdict(int)
|
||||||
|
|
|
@ -218,6 +218,10 @@ class PingQueue:
|
||||||
def running(self):
|
def running(self):
|
||||||
return self._running
|
return self._running
|
||||||
|
|
||||||
|
@property
|
||||||
|
def busy(self):
|
||||||
|
return self._running and (any(self._running_pings) or any(self._pending_contacts))
|
||||||
|
|
||||||
def enqueue_maybe_ping(self, *peers: 'KademliaPeer', delay: typing.Optional[float] = None):
|
def enqueue_maybe_ping(self, *peers: 'KademliaPeer', delay: typing.Optional[float] = None):
|
||||||
delay = delay if delay is not None else self._default_delay
|
delay = delay if delay is not None else self._default_delay
|
||||||
now = self._loop.time()
|
now = self._loop.time()
|
||||||
|
@ -229,7 +233,7 @@ class PingQueue:
|
||||||
async def ping_task():
|
async def ping_task():
|
||||||
try:
|
try:
|
||||||
if self._protocol.peer_manager.peer_is_good(peer):
|
if self._protocol.peer_manager.peer_is_good(peer):
|
||||||
if peer not in self._protocol.routing_table.get_peers():
|
if not self._protocol.routing_table.get_peer(peer.node_id):
|
||||||
self._protocol.add_peer(peer)
|
self._protocol.add_peer(peer)
|
||||||
return
|
return
|
||||||
await self._protocol.get_rpc_peer(peer).ping()
|
await self._protocol.get_rpc_peer(peer).ping()
|
||||||
|
@ -294,7 +298,7 @@ class KademliaProtocol(DatagramProtocol):
|
||||||
|
|
||||||
def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', node_id: bytes, external_ip: str,
|
def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', node_id: bytes, external_ip: str,
|
||||||
udp_port: int, peer_port: int, rpc_timeout: float = constants.RPC_TIMEOUT,
|
udp_port: int, peer_port: int, rpc_timeout: float = constants.RPC_TIMEOUT,
|
||||||
split_buckets_under_index: int = constants.SPLIT_BUCKETS_UNDER_INDEX):
|
split_buckets_under_index: int = constants.SPLIT_BUCKETS_UNDER_INDEX, is_boostrap_node: bool = False):
|
||||||
self.peer_manager = peer_manager
|
self.peer_manager = peer_manager
|
||||||
self.loop = loop
|
self.loop = loop
|
||||||
self.node_id = node_id
|
self.node_id = node_id
|
||||||
|
@ -309,7 +313,8 @@ class KademliaProtocol(DatagramProtocol):
|
||||||
self.transport: DatagramTransport = None
|
self.transport: DatagramTransport = None
|
||||||
self.old_token_secret = constants.generate_id()
|
self.old_token_secret = constants.generate_id()
|
||||||
self.token_secret = constants.generate_id()
|
self.token_secret = constants.generate_id()
|
||||||
self.routing_table = TreeRoutingTable(self.loop, self.peer_manager, self.node_id, split_buckets_under_index)
|
self.routing_table = TreeRoutingTable(
|
||||||
|
self.loop, self.peer_manager, self.node_id, split_buckets_under_index, is_bootstrap_node=is_boostrap_node)
|
||||||
self.data_store = DictDataStore(self.loop, self.peer_manager)
|
self.data_store = DictDataStore(self.loop, self.peer_manager)
|
||||||
self.ping_queue = PingQueue(self.loop, self)
|
self.ping_queue = PingQueue(self.loop, self)
|
||||||
self.node_rpc = KademliaRPC(self, self.loop, self.peer_port)
|
self.node_rpc = KademliaRPC(self, self.loop, self.peer_port)
|
||||||
|
@ -356,72 +361,10 @@ class KademliaProtocol(DatagramProtocol):
|
||||||
return args, {}
|
return args, {}
|
||||||
|
|
||||||
async def _add_peer(self, peer: 'KademliaPeer'):
|
async def _add_peer(self, peer: 'KademliaPeer'):
|
||||||
if not peer.node_id:
|
async def probe(some_peer: 'KademliaPeer'):
|
||||||
log.warning("Tried adding a peer with no node id!")
|
rpc_peer = self.get_rpc_peer(some_peer)
|
||||||
return False
|
await rpc_peer.ping()
|
||||||
for my_peer in self.routing_table.get_peers():
|
return await self.routing_table.add_peer(peer, probe)
|
||||||
if (my_peer.address, my_peer.udp_port) == (peer.address, peer.udp_port) and my_peer.node_id != peer.node_id:
|
|
||||||
self.routing_table.remove_peer(my_peer)
|
|
||||||
self.routing_table.join_buckets()
|
|
||||||
bucket_index = self.routing_table.kbucket_index(peer.node_id)
|
|
||||||
if self.routing_table.buckets[bucket_index].add_peer(peer):
|
|
||||||
return True
|
|
||||||
|
|
||||||
# The bucket is full; see if it can be split (by checking if its range includes the host node's node_id)
|
|
||||||
if self.routing_table.should_split(bucket_index, peer.node_id):
|
|
||||||
self.routing_table.split_bucket(bucket_index)
|
|
||||||
# Retry the insertion attempt
|
|
||||||
result = await self._add_peer(peer)
|
|
||||||
self.routing_table.join_buckets()
|
|
||||||
return result
|
|
||||||
else:
|
|
||||||
# We can't split the k-bucket
|
|
||||||
#
|
|
||||||
# The 13 page kademlia paper specifies that the least recently contacted node in the bucket
|
|
||||||
# shall be pinged. If it fails to reply it is replaced with the new contact. If the ping is successful
|
|
||||||
# the new contact is ignored and not added to the bucket (sections 2.2 and 2.4).
|
|
||||||
#
|
|
||||||
# A reasonable extension to this is BEP 0005, which extends the above:
|
|
||||||
#
|
|
||||||
# Not all nodes that we learn about are equal. Some are "good" and some are not.
|
|
||||||
# Many nodes using the DHT are able to send queries and receive responses,
|
|
||||||
# but are not able to respond to queries from other nodes. It is important that
|
|
||||||
# each node's routing table must contain only known good nodes. A good node is
|
|
||||||
# a node has responded to one of our queries within the last 15 minutes. A node
|
|
||||||
# is also good if it has ever responded to one of our queries and has sent us a
|
|
||||||
# query within the last 15 minutes. After 15 minutes of inactivity, a node becomes
|
|
||||||
# questionable. Nodes become bad when they fail to respond to multiple queries
|
|
||||||
# in a row. Nodes that we know are good are given priority over nodes with unknown status.
|
|
||||||
#
|
|
||||||
# When there are bad or questionable nodes in the bucket, the least recent is selected for
|
|
||||||
# potential replacement (BEP 0005). When all nodes in the bucket are fresh, the head (least recent)
|
|
||||||
# contact is selected as described in section 2.2 of the kademlia paper. In both cases the new contact
|
|
||||||
# is ignored if the pinged node replies.
|
|
||||||
|
|
||||||
not_good_contacts = self.routing_table.buckets[bucket_index].get_bad_or_unknown_peers()
|
|
||||||
not_recently_replied = []
|
|
||||||
for my_peer in not_good_contacts:
|
|
||||||
last_replied = self.peer_manager.get_last_replied(my_peer.address, my_peer.udp_port)
|
|
||||||
if not last_replied or last_replied + 60 < self.loop.time():
|
|
||||||
not_recently_replied.append(my_peer)
|
|
||||||
if not_recently_replied:
|
|
||||||
to_replace = not_recently_replied[0]
|
|
||||||
else:
|
|
||||||
to_replace = self.routing_table.buckets[bucket_index].peers[0]
|
|
||||||
last_replied = self.peer_manager.get_last_replied(to_replace.address, to_replace.udp_port)
|
|
||||||
if last_replied and last_replied + 60 > self.loop.time():
|
|
||||||
return False
|
|
||||||
log.debug("pinging %s:%s", to_replace.address, to_replace.udp_port)
|
|
||||||
try:
|
|
||||||
to_replace_rpc = self.get_rpc_peer(to_replace)
|
|
||||||
await to_replace_rpc.ping()
|
|
||||||
return False
|
|
||||||
except asyncio.TimeoutError:
|
|
||||||
log.debug("Replacing dead contact in bucket %i: %s:%i with %s:%i ", bucket_index,
|
|
||||||
to_replace.address, to_replace.udp_port, peer.address, peer.udp_port)
|
|
||||||
if to_replace in self.routing_table.buckets[bucket_index]:
|
|
||||||
self.routing_table.buckets[bucket_index].remove_peer(to_replace)
|
|
||||||
return await self._add_peer(peer)
|
|
||||||
|
|
||||||
def add_peer(self, peer: 'KademliaPeer'):
|
def add_peer(self, peer: 'KademliaPeer'):
|
||||||
if peer.node_id == self.node_id:
|
if peer.node_id == self.node_id:
|
||||||
|
@ -439,7 +382,6 @@ class KademliaProtocol(DatagramProtocol):
|
||||||
async with self._split_lock:
|
async with self._split_lock:
|
||||||
peer = self._to_remove.pop()
|
peer = self._to_remove.pop()
|
||||||
self.routing_table.remove_peer(peer)
|
self.routing_table.remove_peer(peer)
|
||||||
self.routing_table.join_buckets()
|
|
||||||
while self._to_add:
|
while self._to_add:
|
||||||
async with self._split_lock:
|
async with self._split_lock:
|
||||||
await self._add_peer(self._to_add.pop())
|
await self._add_peer(self._to_add.pop())
|
||||||
|
@ -482,9 +424,8 @@ class KademliaProtocol(DatagramProtocol):
|
||||||
# This is an RPC method request
|
# This is an RPC method request
|
||||||
self.received_request_metric.labels(method=request_datagram.method).inc()
|
self.received_request_metric.labels(method=request_datagram.method).inc()
|
||||||
self.peer_manager.report_last_requested(address[0], address[1])
|
self.peer_manager.report_last_requested(address[0], address[1])
|
||||||
try:
|
|
||||||
peer = self.routing_table.get_peer(request_datagram.node_id)
|
peer = self.routing_table.get_peer(request_datagram.node_id)
|
||||||
except IndexError:
|
if not peer:
|
||||||
try:
|
try:
|
||||||
peer = make_kademlia_peer(request_datagram.node_id, address[0], address[1])
|
peer = make_kademlia_peer(request_datagram.node_id, address[0], address[1])
|
||||||
except ValueError as err:
|
except ValueError as err:
|
||||||
|
|
|
@ -28,7 +28,8 @@ class KBucket:
|
||||||
namespace="dht_node", labelnames=("amount",)
|
namespace="dht_node", labelnames=("amount",)
|
||||||
)
|
)
|
||||||
|
|
||||||
def __init__(self, peer_manager: 'PeerManager', range_min: int, range_max: int, node_id: bytes):
|
def __init__(self, peer_manager: 'PeerManager', range_min: int, range_max: int,
|
||||||
|
node_id: bytes, capacity: int = constants.K):
|
||||||
"""
|
"""
|
||||||
@param range_min: The lower boundary for the range in the n-bit ID
|
@param range_min: The lower boundary for the range in the n-bit ID
|
||||||
space covered by this k-bucket
|
space covered by this k-bucket
|
||||||
|
@ -36,12 +37,12 @@ class KBucket:
|
||||||
covered by this k-bucket
|
covered by this k-bucket
|
||||||
"""
|
"""
|
||||||
self._peer_manager = peer_manager
|
self._peer_manager = peer_manager
|
||||||
self.last_accessed = 0
|
|
||||||
self.range_min = range_min
|
self.range_min = range_min
|
||||||
self.range_max = range_max
|
self.range_max = range_max
|
||||||
self.peers: typing.List['KademliaPeer'] = []
|
self.peers: typing.List['KademliaPeer'] = []
|
||||||
self._node_id = node_id
|
self._node_id = node_id
|
||||||
self._distance_to_self = Distance(node_id)
|
self._distance_to_self = Distance(node_id)
|
||||||
|
self.capacity = capacity
|
||||||
|
|
||||||
def add_peer(self, peer: 'KademliaPeer') -> bool:
|
def add_peer(self, peer: 'KademliaPeer') -> bool:
|
||||||
""" Add contact to _contact list in the right order. This will move the
|
""" Add contact to _contact list in the right order. This will move the
|
||||||
|
@ -68,7 +69,7 @@ class KBucket:
|
||||||
self.peers.remove(local_peer)
|
self.peers.remove(local_peer)
|
||||||
self.peers.append(peer)
|
self.peers.append(peer)
|
||||||
return True
|
return True
|
||||||
if len(self.peers) < constants.K:
|
if len(self.peers) < self.capacity:
|
||||||
self.peers.append(peer)
|
self.peers.append(peer)
|
||||||
self.peer_in_routing_table_metric.labels("global").inc()
|
self.peer_in_routing_table_metric.labels("global").inc()
|
||||||
bits_colliding = utils.get_colliding_prefix_bits(peer.node_id, self._node_id)
|
bits_colliding = utils.get_colliding_prefix_bits(peer.node_id, self._node_id)
|
||||||
|
@ -76,13 +77,11 @@ class KBucket:
|
||||||
return True
|
return True
|
||||||
else:
|
else:
|
||||||
return False
|
return False
|
||||||
# raise BucketFull("No space in bucket to insert contact")
|
|
||||||
|
|
||||||
def get_peer(self, node_id: bytes) -> 'KademliaPeer':
|
def get_peer(self, node_id: bytes) -> 'KademliaPeer':
|
||||||
for peer in self.peers:
|
for peer in self.peers:
|
||||||
if peer.node_id == node_id:
|
if peer.node_id == node_id:
|
||||||
return peer
|
return peer
|
||||||
raise IndexError(node_id)
|
|
||||||
|
|
||||||
def get_peers(self, count=-1, exclude_contact=None, sort_distance_to=None) -> typing.List['KademliaPeer']:
|
def get_peers(self, count=-1, exclude_contact=None, sort_distance_to=None) -> typing.List['KademliaPeer']:
|
||||||
""" Returns a list containing up to the first count number of contacts
|
""" Returns a list containing up to the first count number of contacts
|
||||||
|
@ -179,6 +178,13 @@ class TreeRoutingTable:
|
||||||
version of the Kademlia paper, in section 2.4. It does, however, use the
|
version of the Kademlia paper, in section 2.4. It does, however, use the
|
||||||
ping RPC-based k-bucket eviction algorithm described in section 2.2 of
|
ping RPC-based k-bucket eviction algorithm described in section 2.2 of
|
||||||
that paper.
|
that paper.
|
||||||
|
|
||||||
|
BOOTSTRAP MODE: if set to True, we always add all peers. This is so a
|
||||||
|
bootstrap node does not get a bias towards its own node id and replies are
|
||||||
|
the best it can provide (joining peer knows its neighbors immediately).
|
||||||
|
Over time, this will need to be optimized so we use the disk as holding
|
||||||
|
everything in memory won't be feasible anymore.
|
||||||
|
See: https://github.com/bittorrent/bootstrap-dht
|
||||||
"""
|
"""
|
||||||
bucket_in_routing_table_metric = Gauge(
|
bucket_in_routing_table_metric = Gauge(
|
||||||
"buckets_in_routing_table", "Number of buckets on routing table", namespace="dht_node",
|
"buckets_in_routing_table", "Number of buckets on routing table", namespace="dht_node",
|
||||||
|
@ -186,21 +192,22 @@ class TreeRoutingTable:
|
||||||
)
|
)
|
||||||
|
|
||||||
def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', parent_node_id: bytes,
|
def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', parent_node_id: bytes,
|
||||||
split_buckets_under_index: int = constants.SPLIT_BUCKETS_UNDER_INDEX):
|
split_buckets_under_index: int = constants.SPLIT_BUCKETS_UNDER_INDEX, is_bootstrap_node: bool = False):
|
||||||
self._loop = loop
|
self._loop = loop
|
||||||
self._peer_manager = peer_manager
|
self._peer_manager = peer_manager
|
||||||
self._parent_node_id = parent_node_id
|
self._parent_node_id = parent_node_id
|
||||||
self._split_buckets_under_index = split_buckets_under_index
|
self._split_buckets_under_index = split_buckets_under_index
|
||||||
self.buckets: typing.List[KBucket] = [
|
self.buckets: typing.List[KBucket] = [
|
||||||
KBucket(
|
KBucket(
|
||||||
self._peer_manager, range_min=0, range_max=2 ** constants.HASH_BITS, node_id=self._parent_node_id
|
self._peer_manager, range_min=0, range_max=2 ** constants.HASH_BITS, node_id=self._parent_node_id,
|
||||||
|
capacity=1 << 32 if is_bootstrap_node else constants.K
|
||||||
)
|
)
|
||||||
]
|
]
|
||||||
|
|
||||||
def get_peers(self) -> typing.List['KademliaPeer']:
|
def get_peers(self) -> typing.List['KademliaPeer']:
|
||||||
return list(itertools.chain.from_iterable(map(lambda bucket: bucket.peers, self.buckets)))
|
return list(itertools.chain.from_iterable(map(lambda bucket: bucket.peers, self.buckets)))
|
||||||
|
|
||||||
def should_split(self, bucket_index: int, to_add: bytes) -> bool:
|
def _should_split(self, bucket_index: int, to_add: bytes) -> bool:
|
||||||
# https://stackoverflow.com/questions/32129978/highly-unbalanced-kademlia-routing-table/32187456#32187456
|
# https://stackoverflow.com/questions/32129978/highly-unbalanced-kademlia-routing-table/32187456#32187456
|
||||||
if bucket_index < self._split_buckets_under_index:
|
if bucket_index < self._split_buckets_under_index:
|
||||||
return True
|
return True
|
||||||
|
@ -225,39 +232,32 @@ class TreeRoutingTable:
|
||||||
return []
|
return []
|
||||||
|
|
||||||
def get_peer(self, contact_id: bytes) -> 'KademliaPeer':
|
def get_peer(self, contact_id: bytes) -> 'KademliaPeer':
|
||||||
"""
|
return self.buckets[self._kbucket_index(contact_id)].get_peer(contact_id)
|
||||||
@raise IndexError: No contact with the specified contact ID is known
|
|
||||||
by this node
|
|
||||||
"""
|
|
||||||
return self.buckets[self.kbucket_index(contact_id)].get_peer(contact_id)
|
|
||||||
|
|
||||||
def get_refresh_list(self, start_index: int = 0, force: bool = False) -> typing.List[bytes]:
|
def get_refresh_list(self, start_index: int = 0, force: bool = False) -> typing.List[bytes]:
|
||||||
bucket_index = start_index
|
|
||||||
refresh_ids = []
|
refresh_ids = []
|
||||||
now = int(self._loop.time())
|
for offset, _ in enumerate(self.buckets[start_index:]):
|
||||||
for bucket in self.buckets[start_index:]:
|
refresh_ids.append(self._midpoint_id_in_bucket_range(start_index + offset))
|
||||||
if force or now - bucket.last_accessed >= constants.REFRESH_INTERVAL:
|
# if we have 3 or fewer populated buckets get two random ids in the range of each to try and
|
||||||
to_search = self.midpoint_id_in_bucket_range(bucket_index)
|
# populate/split the buckets further
|
||||||
refresh_ids.append(to_search)
|
buckets_with_contacts = self.buckets_with_contacts()
|
||||||
bucket_index += 1
|
if buckets_with_contacts <= 3:
|
||||||
|
for i in range(buckets_with_contacts):
|
||||||
|
refresh_ids.append(self._random_id_in_bucket_range(i))
|
||||||
|
refresh_ids.append(self._random_id_in_bucket_range(i))
|
||||||
return refresh_ids
|
return refresh_ids
|
||||||
|
|
||||||
def remove_peer(self, peer: 'KademliaPeer') -> None:
|
def remove_peer(self, peer: 'KademliaPeer') -> None:
|
||||||
if not peer.node_id:
|
if not peer.node_id:
|
||||||
return
|
return
|
||||||
bucket_index = self.kbucket_index(peer.node_id)
|
bucket_index = self._kbucket_index(peer.node_id)
|
||||||
try:
|
try:
|
||||||
self.buckets[bucket_index].remove_peer(peer)
|
self.buckets[bucket_index].remove_peer(peer)
|
||||||
|
self._join_buckets()
|
||||||
except ValueError:
|
except ValueError:
|
||||||
return
|
return
|
||||||
|
|
||||||
def touch_kbucket(self, key: bytes) -> None:
|
def _kbucket_index(self, key: bytes) -> int:
|
||||||
self.touch_kbucket_by_index(self.kbucket_index(key))
|
|
||||||
|
|
||||||
def touch_kbucket_by_index(self, bucket_index: int):
|
|
||||||
self.buckets[bucket_index].last_accessed = int(self._loop.time())
|
|
||||||
|
|
||||||
def kbucket_index(self, key: bytes) -> int:
|
|
||||||
i = 0
|
i = 0
|
||||||
for bucket in self.buckets:
|
for bucket in self.buckets:
|
||||||
if bucket.key_in_range(key):
|
if bucket.key_in_range(key):
|
||||||
|
@ -266,19 +266,19 @@ class TreeRoutingTable:
|
||||||
i += 1
|
i += 1
|
||||||
return i
|
return i
|
||||||
|
|
||||||
def random_id_in_bucket_range(self, bucket_index: int) -> bytes:
|
def _random_id_in_bucket_range(self, bucket_index: int) -> bytes:
|
||||||
random_id = int(random.randrange(self.buckets[bucket_index].range_min, self.buckets[bucket_index].range_max))
|
random_id = int(random.randrange(self.buckets[bucket_index].range_min, self.buckets[bucket_index].range_max))
|
||||||
return Distance(
|
return Distance(
|
||||||
self._parent_node_id
|
self._parent_node_id
|
||||||
)(random_id.to_bytes(constants.HASH_LENGTH, 'big')).to_bytes(constants.HASH_LENGTH, 'big')
|
)(random_id.to_bytes(constants.HASH_LENGTH, 'big')).to_bytes(constants.HASH_LENGTH, 'big')
|
||||||
|
|
||||||
def midpoint_id_in_bucket_range(self, bucket_index: int) -> bytes:
|
def _midpoint_id_in_bucket_range(self, bucket_index: int) -> bytes:
|
||||||
half = int((self.buckets[bucket_index].range_max - self.buckets[bucket_index].range_min) // 2)
|
half = int((self.buckets[bucket_index].range_max - self.buckets[bucket_index].range_min) // 2)
|
||||||
return Distance(self._parent_node_id)(
|
return Distance(self._parent_node_id)(
|
||||||
int(self.buckets[bucket_index].range_min + half).to_bytes(constants.HASH_LENGTH, 'big')
|
int(self.buckets[bucket_index].range_min + half).to_bytes(constants.HASH_LENGTH, 'big')
|
||||||
).to_bytes(constants.HASH_LENGTH, 'big')
|
).to_bytes(constants.HASH_LENGTH, 'big')
|
||||||
|
|
||||||
def split_bucket(self, old_bucket_index: int) -> None:
|
def _split_bucket(self, old_bucket_index: int) -> None:
|
||||||
""" Splits the specified k-bucket into two new buckets which together
|
""" Splits the specified k-bucket into two new buckets which together
|
||||||
cover the same range in the key/ID space
|
cover the same range in the key/ID space
|
||||||
|
|
||||||
|
@ -303,7 +303,7 @@ class TreeRoutingTable:
|
||||||
old_bucket.remove_peer(contact)
|
old_bucket.remove_peer(contact)
|
||||||
self.bucket_in_routing_table_metric.labels("global").set(len(self.buckets))
|
self.bucket_in_routing_table_metric.labels("global").set(len(self.buckets))
|
||||||
|
|
||||||
def join_buckets(self):
|
def _join_buckets(self):
|
||||||
if len(self.buckets) == 1:
|
if len(self.buckets) == 1:
|
||||||
return
|
return
|
||||||
to_pop = [i for i, bucket in enumerate(self.buckets) if len(bucket) == 0]
|
to_pop = [i for i, bucket in enumerate(self.buckets) if len(bucket) == 0]
|
||||||
|
@ -326,14 +326,7 @@ class TreeRoutingTable:
|
||||||
self.buckets[bucket_index_to_pop + 1].range_min = bucket.range_min
|
self.buckets[bucket_index_to_pop + 1].range_min = bucket.range_min
|
||||||
self.buckets.remove(bucket)
|
self.buckets.remove(bucket)
|
||||||
self.bucket_in_routing_table_metric.labels("global").set(len(self.buckets))
|
self.bucket_in_routing_table_metric.labels("global").set(len(self.buckets))
|
||||||
return self.join_buckets()
|
return self._join_buckets()
|
||||||
|
|
||||||
def contact_in_routing_table(self, address_tuple: typing.Tuple[str, int]) -> bool:
|
|
||||||
for bucket in self.buckets:
|
|
||||||
for contact in bucket.get_peers(sort_distance_to=False):
|
|
||||||
if address_tuple[0] == contact.address and address_tuple[1] == contact.udp_port:
|
|
||||||
return True
|
|
||||||
return False
|
|
||||||
|
|
||||||
def buckets_with_contacts(self) -> int:
|
def buckets_with_contacts(self) -> int:
|
||||||
count = 0
|
count = 0
|
||||||
|
@ -341,3 +334,70 @@ class TreeRoutingTable:
|
||||||
if len(bucket) > 0:
|
if len(bucket) > 0:
|
||||||
count += 1
|
count += 1
|
||||||
return count
|
return count
|
||||||
|
|
||||||
|
async def add_peer(self, peer: 'KademliaPeer', probe: typing.Callable[['KademliaPeer'], typing.Awaitable]):
|
||||||
|
if not peer.node_id:
|
||||||
|
log.warning("Tried adding a peer with no node id!")
|
||||||
|
return False
|
||||||
|
for my_peer in self.get_peers():
|
||||||
|
if (my_peer.address, my_peer.udp_port) == (peer.address, peer.udp_port) and my_peer.node_id != peer.node_id:
|
||||||
|
self.remove_peer(my_peer)
|
||||||
|
self._join_buckets()
|
||||||
|
bucket_index = self._kbucket_index(peer.node_id)
|
||||||
|
if self.buckets[bucket_index].add_peer(peer):
|
||||||
|
return True
|
||||||
|
|
||||||
|
# The bucket is full; see if it can be split (by checking if its range includes the host node's node_id)
|
||||||
|
if self._should_split(bucket_index, peer.node_id):
|
||||||
|
self._split_bucket(bucket_index)
|
||||||
|
# Retry the insertion attempt
|
||||||
|
result = await self.add_peer(peer, probe)
|
||||||
|
self._join_buckets()
|
||||||
|
return result
|
||||||
|
else:
|
||||||
|
# We can't split the k-bucket
|
||||||
|
#
|
||||||
|
# The 13 page kademlia paper specifies that the least recently contacted node in the bucket
|
||||||
|
# shall be pinged. If it fails to reply it is replaced with the new contact. If the ping is successful
|
||||||
|
# the new contact is ignored and not added to the bucket (sections 2.2 and 2.4).
|
||||||
|
#
|
||||||
|
# A reasonable extension to this is BEP 0005, which extends the above:
|
||||||
|
#
|
||||||
|
# Not all nodes that we learn about are equal. Some are "good" and some are not.
|
||||||
|
# Many nodes using the DHT are able to send queries and receive responses,
|
||||||
|
# but are not able to respond to queries from other nodes. It is important that
|
||||||
|
# each node's routing table must contain only known good nodes. A good node is
|
||||||
|
# a node has responded to one of our queries within the last 15 minutes. A node
|
||||||
|
# is also good if it has ever responded to one of our queries and has sent us a
|
||||||
|
# query within the last 15 minutes. After 15 minutes of inactivity, a node becomes
|
||||||
|
# questionable. Nodes become bad when they fail to respond to multiple queries
|
||||||
|
# in a row. Nodes that we know are good are given priority over nodes with unknown status.
|
||||||
|
#
|
||||||
|
# When there are bad or questionable nodes in the bucket, the least recent is selected for
|
||||||
|
# potential replacement (BEP 0005). When all nodes in the bucket are fresh, the head (least recent)
|
||||||
|
# contact is selected as described in section 2.2 of the kademlia paper. In both cases the new contact
|
||||||
|
# is ignored if the pinged node replies.
|
||||||
|
|
||||||
|
not_good_contacts = self.buckets[bucket_index].get_bad_or_unknown_peers()
|
||||||
|
not_recently_replied = []
|
||||||
|
for my_peer in not_good_contacts:
|
||||||
|
last_replied = self._peer_manager.get_last_replied(my_peer.address, my_peer.udp_port)
|
||||||
|
if not last_replied or last_replied + 60 < self._loop.time():
|
||||||
|
not_recently_replied.append(my_peer)
|
||||||
|
if not_recently_replied:
|
||||||
|
to_replace = not_recently_replied[0]
|
||||||
|
else:
|
||||||
|
to_replace = self.buckets[bucket_index].peers[0]
|
||||||
|
last_replied = self._peer_manager.get_last_replied(to_replace.address, to_replace.udp_port)
|
||||||
|
if last_replied and last_replied + 60 > self._loop.time():
|
||||||
|
return False
|
||||||
|
log.debug("pinging %s:%s", to_replace.address, to_replace.udp_port)
|
||||||
|
try:
|
||||||
|
await probe(to_replace)
|
||||||
|
return False
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
log.debug("Replacing dead contact in bucket %i: %s:%i with %s:%i ", bucket_index,
|
||||||
|
to_replace.address, to_replace.udp_port, peer.address, peer.udp_port)
|
||||||
|
if to_replace in self.buckets[bucket_index]:
|
||||||
|
self.buckets[bucket_index].remove_peer(to_replace)
|
||||||
|
return await self.add_peer(peer, probe)
|
||||||
|
|
|
@ -297,6 +297,7 @@ class DHTComponent(Component):
|
||||||
peer_port=self.external_peer_port,
|
peer_port=self.external_peer_port,
|
||||||
rpc_timeout=self.conf.node_rpc_timeout,
|
rpc_timeout=self.conf.node_rpc_timeout,
|
||||||
split_buckets_under_index=self.conf.split_buckets_under_index,
|
split_buckets_under_index=self.conf.split_buckets_under_index,
|
||||||
|
is_bootstrap_node=self.conf.is_bootstrap_node,
|
||||||
storage=storage
|
storage=storage
|
||||||
)
|
)
|
||||||
self.dht_node.start(self.conf.network_interface, self.conf.known_dht_nodes)
|
self.dht_node.start(self.conf.network_interface, self.conf.known_dht_nodes)
|
||||||
|
|
|
@ -83,7 +83,7 @@ async def main(host: str, port: int, db_file_path: str, bootstrap_node: Optional
|
||||||
await storage.open()
|
await storage.open()
|
||||||
node = Node(
|
node = Node(
|
||||||
loop, PeerManager(loop), node_id, port, port, 3333, None,
|
loop, PeerManager(loop), node_id, port, port, 3333, None,
|
||||||
storage=storage
|
storage=storage, is_bootstrap_node=True
|
||||||
)
|
)
|
||||||
if prometheus_port > 0:
|
if prometheus_port > 0:
|
||||||
metrics = SimpleMetrics(prometheus_port, node if export else None)
|
metrics = SimpleMetrics(prometheus_port, node if export else None)
|
||||||
|
|
|
@ -11,6 +11,35 @@ from lbry.dht.peer import PeerManager, make_kademlia_peer
|
||||||
from lbry.extras.daemon.storage import SQLiteStorage
|
from lbry.extras.daemon.storage import SQLiteStorage
|
||||||
|
|
||||||
|
|
||||||
|
class TestBootstrapNode(AsyncioTestCase):
|
||||||
|
TIMEOUT = 10.0 # do not increase. Hitting a timeout is a real failure
|
||||||
|
|
||||||
|
async def test_bootstrap_node_adds_all_peers(self):
|
||||||
|
loop = asyncio.get_event_loop()
|
||||||
|
loop.set_debug(False)
|
||||||
|
|
||||||
|
with dht_mocks.mock_network_loop(loop):
|
||||||
|
advance = dht_mocks.get_time_accelerator(loop)
|
||||||
|
self.bootstrap_node = Node(self.loop, PeerManager(loop), constants.generate_id(),
|
||||||
|
4444, 4444, 3333, '1.2.3.4', is_bootstrap_node=True)
|
||||||
|
self.bootstrap_node.start('1.2.3.4', [])
|
||||||
|
self.bootstrap_node.protocol.ping_queue._default_delay = 0
|
||||||
|
self.addCleanup(self.bootstrap_node.stop)
|
||||||
|
|
||||||
|
# start the nodes
|
||||||
|
nodes = {}
|
||||||
|
futs = []
|
||||||
|
for i in range(100):
|
||||||
|
nodes[i] = Node(loop, PeerManager(loop), constants.generate_id(i), 4444, 4444, 3333, f'1.3.3.{i}')
|
||||||
|
nodes[i].start(f'1.3.3.{i}', [('1.2.3.4', 4444)])
|
||||||
|
self.addCleanup(nodes[i].stop)
|
||||||
|
futs.append(nodes[i].joined.wait())
|
||||||
|
await asyncio.gather(*futs)
|
||||||
|
while self.bootstrap_node.protocol.ping_queue.busy:
|
||||||
|
await advance(1)
|
||||||
|
self.assertEqual(100, len(self.bootstrap_node.protocol.routing_table.get_peers()))
|
||||||
|
|
||||||
|
|
||||||
class TestNodePingQueueDiscover(AsyncioTestCase):
|
class TestNodePingQueueDiscover(AsyncioTestCase):
|
||||||
async def test_ping_queue_discover(self):
|
async def test_ping_queue_discover(self):
|
||||||
loop = asyncio.get_event_loop()
|
loop = asyncio.get_event_loop()
|
||||||
|
|
Loading…
Reference in a new issue