remove all references to bottoming out
This commit is contained in:
parent
dc1c0e6851
commit
dcde0e78e3
4 changed files with 12 additions and 34 deletions
|
@ -20,7 +20,6 @@ MAYBE_PING_DELAY = 300 # 5 minutes
|
||||||
CHECK_REFRESH_INTERVAL = REFRESH_INTERVAL / 5
|
CHECK_REFRESH_INTERVAL = REFRESH_INTERVAL / 5
|
||||||
RPC_ID_LENGTH = 20
|
RPC_ID_LENGTH = 20
|
||||||
PROTOCOL_VERSION = 1
|
PROTOCOL_VERSION = 1
|
||||||
BOTTOM_OUT_LIMIT = 3
|
|
||||||
MSG_SIZE_LIMIT = 1400
|
MSG_SIZE_LIMIT = 1400
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -202,25 +202,23 @@ class Node:
|
||||||
self._join_task = self.loop.create_task(self.join_network(interface, known_node_urls))
|
self._join_task = self.loop.create_task(self.join_network(interface, known_node_urls))
|
||||||
|
|
||||||
def get_iterative_node_finder(self, key: bytes, shortlist: typing.Optional[typing.List['KademliaPeer']] = None,
|
def get_iterative_node_finder(self, key: bytes, shortlist: typing.Optional[typing.List['KademliaPeer']] = None,
|
||||||
bottom_out_limit: int = constants.BOTTOM_OUT_LIMIT,
|
|
||||||
max_results: int = constants.K) -> IterativeNodeFinder:
|
max_results: int = constants.K) -> IterativeNodeFinder:
|
||||||
|
|
||||||
return IterativeNodeFinder(self.loop, self.protocol.peer_manager, self.protocol.routing_table, self.protocol,
|
return IterativeNodeFinder(self.loop, self.protocol.peer_manager, self.protocol.routing_table, self.protocol,
|
||||||
key, bottom_out_limit, max_results, None, shortlist)
|
key, max_results, None, shortlist)
|
||||||
|
|
||||||
def get_iterative_value_finder(self, key: bytes, shortlist: typing.Optional[typing.List['KademliaPeer']] = None,
|
def get_iterative_value_finder(self, key: bytes, shortlist: typing.Optional[typing.List['KademliaPeer']] = None,
|
||||||
bottom_out_limit: int = 40,
|
|
||||||
max_results: int = -1) -> IterativeValueFinder:
|
max_results: int = -1) -> IterativeValueFinder:
|
||||||
|
|
||||||
return IterativeValueFinder(self.loop, self.protocol.peer_manager, self.protocol.routing_table, self.protocol,
|
return IterativeValueFinder(self.loop, self.protocol.peer_manager, self.protocol.routing_table, self.protocol,
|
||||||
key, bottom_out_limit, max_results, None, shortlist)
|
key, max_results, None, shortlist)
|
||||||
|
|
||||||
async def peer_search(self, node_id: bytes, count=constants.K, max_results=constants.K * 2,
|
async def peer_search(self, node_id: bytes, count=constants.K, max_results=constants.K * 2,
|
||||||
bottom_out_limit=60, shortlist: typing.Optional[typing.List['KademliaPeer']] = None
|
shortlist: typing.Optional[typing.List['KademliaPeer']] = None
|
||||||
) -> typing.List['KademliaPeer']:
|
) -> typing.List['KademliaPeer']:
|
||||||
peers = []
|
peers = []
|
||||||
async for iteration_peers in self.get_iterative_node_finder(
|
async for iteration_peers in self.get_iterative_node_finder(
|
||||||
node_id, shortlist=shortlist, bottom_out_limit=bottom_out_limit, max_results=max_results):
|
node_id, shortlist=shortlist, max_results=max_results):
|
||||||
peers.extend(iteration_peers)
|
peers.extend(iteration_peers)
|
||||||
distance = Distance(node_id)
|
distance = Distance(node_id)
|
||||||
peers.sort(key=lambda peer: distance(peer.node_id))
|
peers.sort(key=lambda peer: distance(peer.node_id))
|
||||||
|
|
|
@ -74,7 +74,7 @@ def get_shortlist(routing_table: 'TreeRoutingTable', key: bytes,
|
||||||
class IterativeFinder:
|
class IterativeFinder:
|
||||||
def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager',
|
def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager',
|
||||||
routing_table: 'TreeRoutingTable', protocol: 'KademliaProtocol', key: bytes,
|
routing_table: 'TreeRoutingTable', protocol: 'KademliaProtocol', key: bytes,
|
||||||
bottom_out_limit: typing.Optional[int] = 2, max_results: typing.Optional[int] = constants.K,
|
max_results: typing.Optional[int] = constants.K,
|
||||||
exclude: typing.Optional[typing.List[typing.Tuple[str, int]]] = None,
|
exclude: typing.Optional[typing.List[typing.Tuple[str, int]]] = None,
|
||||||
shortlist: typing.Optional[typing.List['KademliaPeer']] = None):
|
shortlist: typing.Optional[typing.List['KademliaPeer']] = None):
|
||||||
if len(key) != constants.HASH_LENGTH:
|
if len(key) != constants.HASH_LENGTH:
|
||||||
|
@ -85,7 +85,6 @@ class IterativeFinder:
|
||||||
self.protocol = protocol
|
self.protocol = protocol
|
||||||
|
|
||||||
self.key = key
|
self.key = key
|
||||||
self.bottom_out_limit = bottom_out_limit
|
|
||||||
self.max_results = max(constants.K, max_results)
|
self.max_results = max(constants.K, max_results)
|
||||||
self.exclude = exclude or []
|
self.exclude = exclude or []
|
||||||
|
|
||||||
|
@ -97,7 +96,6 @@ class IterativeFinder:
|
||||||
|
|
||||||
self.running_probes: typing.Dict['KademliaPeer', asyncio.Task] = {}
|
self.running_probes: typing.Dict['KademliaPeer', asyncio.Task] = {}
|
||||||
self.iteration_count = 0
|
self.iteration_count = 0
|
||||||
self.bottom_out_count = 0
|
|
||||||
self.running = False
|
self.running = False
|
||||||
self.tasks: typing.List[asyncio.Task] = []
|
self.tasks: typing.List[asyncio.Task] = []
|
||||||
self.delayed_call: asyncio.Handle = None
|
self.delayed_call: asyncio.Handle = None
|
||||||
|
@ -232,8 +230,8 @@ class IterativeFinder:
|
||||||
self.loop.call_soon(self.aclose)
|
self.loop.call_soon(self.aclose)
|
||||||
|
|
||||||
def _log_state(self):
|
def _log_state(self):
|
||||||
log.debug("[%s] check result: %i active nodes %i contacted %i bottomed count",
|
log.debug("[%s] check result: %i active nodes %i contacted",
|
||||||
self.key.hex()[:8], len(self.active), len(self.contacted), self.bottom_out_count)
|
self.key.hex()[:8], len(self.active), len(self.contacted))
|
||||||
|
|
||||||
def _search(self):
|
def _search(self):
|
||||||
self._search_task()
|
self._search_task()
|
||||||
|
@ -272,10 +270,10 @@ class IterativeFinder:
|
||||||
class IterativeNodeFinder(IterativeFinder):
|
class IterativeNodeFinder(IterativeFinder):
|
||||||
def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager',
|
def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager',
|
||||||
routing_table: 'TreeRoutingTable', protocol: 'KademliaProtocol', key: bytes,
|
routing_table: 'TreeRoutingTable', protocol: 'KademliaProtocol', key: bytes,
|
||||||
bottom_out_limit: typing.Optional[int] = 2, max_results: typing.Optional[int] = constants.K,
|
max_results: typing.Optional[int] = constants.K,
|
||||||
exclude: typing.Optional[typing.List[typing.Tuple[str, int]]] = None,
|
exclude: typing.Optional[typing.List[typing.Tuple[str, int]]] = None,
|
||||||
shortlist: typing.Optional[typing.List['KademliaPeer']] = None):
|
shortlist: typing.Optional[typing.List['KademliaPeer']] = None):
|
||||||
super().__init__(loop, peer_manager, routing_table, protocol, key, bottom_out_limit, max_results, exclude,
|
super().__init__(loop, peer_manager, routing_table, protocol, key, max_results, exclude,
|
||||||
shortlist)
|
shortlist)
|
||||||
self.yielded_peers: typing.Set['KademliaPeer'] = set()
|
self.yielded_peers: typing.Set['KademliaPeer'] = set()
|
||||||
|
|
||||||
|
@ -314,10 +312,10 @@ class IterativeNodeFinder(IterativeFinder):
|
||||||
class IterativeValueFinder(IterativeFinder):
|
class IterativeValueFinder(IterativeFinder):
|
||||||
def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager',
|
def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager',
|
||||||
routing_table: 'TreeRoutingTable', protocol: 'KademliaProtocol', key: bytes,
|
routing_table: 'TreeRoutingTable', protocol: 'KademliaProtocol', key: bytes,
|
||||||
bottom_out_limit: typing.Optional[int] = 2, max_results: typing.Optional[int] = constants.K,
|
max_results: typing.Optional[int] = constants.K,
|
||||||
exclude: typing.Optional[typing.List[typing.Tuple[str, int]]] = None,
|
exclude: typing.Optional[typing.List[typing.Tuple[str, int]]] = None,
|
||||||
shortlist: typing.Optional[typing.List['KademliaPeer']] = None):
|
shortlist: typing.Optional[typing.List['KademliaPeer']] = None):
|
||||||
super().__init__(loop, peer_manager, routing_table, protocol, key, bottom_out_limit, max_results, exclude,
|
super().__init__(loop, peer_manager, routing_table, protocol, key, max_results, exclude,
|
||||||
shortlist)
|
shortlist)
|
||||||
self.blob_peers: typing.Set['KademliaPeer'] = set()
|
self.blob_peers: typing.Set['KademliaPeer'] = set()
|
||||||
# this tracks the index of the most recent page we requested from each peer
|
# this tracks the index of the most recent page we requested from each peer
|
||||||
|
@ -362,18 +360,12 @@ class IterativeValueFinder(IterativeFinder):
|
||||||
blob_peers = [self.peer_manager.decode_tcp_peer_from_compact_address(compact_addr)
|
blob_peers = [self.peer_manager.decode_tcp_peer_from_compact_address(compact_addr)
|
||||||
for compact_addr in response.found_compact_addresses]
|
for compact_addr in response.found_compact_addresses]
|
||||||
to_yield = []
|
to_yield = []
|
||||||
self.bottom_out_count = 0
|
|
||||||
for blob_peer in blob_peers:
|
for blob_peer in blob_peers:
|
||||||
if blob_peer not in self.blob_peers:
|
if blob_peer not in self.blob_peers:
|
||||||
self.blob_peers.add(blob_peer)
|
self.blob_peers.add(blob_peer)
|
||||||
to_yield.append(blob_peer)
|
to_yield.append(blob_peer)
|
||||||
if to_yield:
|
if to_yield:
|
||||||
# log.info("found %i new peers for blob", len(to_yield))
|
|
||||||
self.iteration_queue.put_nowait(to_yield)
|
self.iteration_queue.put_nowait(to_yield)
|
||||||
# if self.max_results and len(self.blob_peers) >= self.max_results:
|
|
||||||
# log.info("enough blob peers found")
|
|
||||||
# if not self.finished.is_set():
|
|
||||||
# self.finished.set()
|
|
||||||
|
|
||||||
def get_initial_result(self) -> typing.List['KademliaPeer']:
|
def get_initial_result(self) -> typing.List['KademliaPeer']:
|
||||||
if self.protocol.data_store.has_peers_for_blob(self.key):
|
if self.protocol.data_store.has_peers_for_blob(self.key):
|
||||||
|
|
|
@ -4885,20 +4885,16 @@ class Daemon(metaclass=JSONRPCServerType):
|
||||||
"""
|
"""
|
||||||
|
|
||||||
@requires(DHT_COMPONENT)
|
@requires(DHT_COMPONENT)
|
||||||
async def jsonrpc_peer_list(self, blob_hash, search_bottom_out_limit=None, page=None, page_size=None):
|
async def jsonrpc_peer_list(self, blob_hash, page=None, page_size=None):
|
||||||
"""
|
"""
|
||||||
Get peers for blob hash
|
Get peers for blob hash
|
||||||
|
|
||||||
Usage:
|
Usage:
|
||||||
peer_list (<blob_hash> | --blob_hash=<blob_hash>)
|
peer_list (<blob_hash> | --blob_hash=<blob_hash>)
|
||||||
[<search_bottom_out_limit> | --search_bottom_out_limit=<search_bottom_out_limit>]
|
|
||||||
[--page=<page>] [--page_size=<page_size>]
|
[--page=<page>] [--page_size=<page_size>]
|
||||||
|
|
||||||
Options:
|
Options:
|
||||||
--blob_hash=<blob_hash> : (str) find available peers for this blob hash
|
--blob_hash=<blob_hash> : (str) find available peers for this blob hash
|
||||||
--search_bottom_out_limit=<search_bottom_out_limit> : (int) the number of search probes in a row
|
|
||||||
that don't find any new peers
|
|
||||||
before giving up and returning
|
|
||||||
--page=<page> : (int) page to return during paginating
|
--page=<page> : (int) page to return during paginating
|
||||||
--page_size=<page_size> : (int) number of items on page during pagination
|
--page_size=<page_size> : (int) number of items on page during pagination
|
||||||
|
|
||||||
|
@ -4910,13 +4906,6 @@ class Daemon(metaclass=JSONRPCServerType):
|
||||||
if not is_valid_blobhash(blob_hash):
|
if not is_valid_blobhash(blob_hash):
|
||||||
# TODO: use error from lbry.error
|
# TODO: use error from lbry.error
|
||||||
raise Exception("invalid blob hash")
|
raise Exception("invalid blob hash")
|
||||||
if search_bottom_out_limit is not None:
|
|
||||||
search_bottom_out_limit = int(search_bottom_out_limit)
|
|
||||||
if search_bottom_out_limit <= 0:
|
|
||||||
# TODO: use error from lbry.error
|
|
||||||
raise Exception("invalid bottom out limit")
|
|
||||||
else:
|
|
||||||
search_bottom_out_limit = 4
|
|
||||||
peers = []
|
peers = []
|
||||||
peer_q = asyncio.Queue(loop=self.component_manager.loop)
|
peer_q = asyncio.Queue(loop=self.component_manager.loop)
|
||||||
await self.dht_node._peers_for_value_producer(blob_hash, peer_q)
|
await self.dht_node._peers_for_value_producer(blob_hash, peer_q)
|
||||||
|
|
Loading…
Add table
Reference in a new issue