From f274562c92171b83a8f0117eba2e821aa4872725 Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Tue, 25 Jan 2022 17:00:37 -0300
Subject: [PATCH 01/25] don't probe and ignore bad peers

---
 lbry/dht/protocol/iterative_find.py | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/lbry/dht/protocol/iterative_find.py b/lbry/dht/protocol/iterative_find.py
index f8bd2ba6b..117755797 100644
--- a/lbry/dht/protocol/iterative_find.py
+++ b/lbry/dht/protocol/iterative_find.py
@@ -142,6 +142,8 @@ class IterativeFinder:
         return not self.closest_peer or self.distance.is_closer(peer.node_id, self.closest_peer.node_id)
 
     def _add_active(self, peer):
+        if self.peer_manager.peer_is_good(peer) is False:
+            return
         if peer not in self.active and peer.node_id and peer.node_id != self.protocol.node_id:
             self.active.add(peer)
             if self._is_closer(peer):
@@ -193,6 +195,9 @@ class IterativeFinder:
                 continue
             if origin_address == (self.protocol.external_ip, self.protocol.udp_port):
                 continue
+            if self.peer_manager.peer_is_good(peer) is False:
+                self.active.discard(peer)
+                continue
             self._schedule_probe(peer)
             added += 1
         log.debug("running %d probes for key %s", len(self.running_probes), self.key.hex()[:8])

From e319b55db5d5279aeb7bd7120b04da511e2beb40 Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Fri, 4 Feb 2022 12:44:47 -0300
Subject: [PATCH 02/25] closest peer is only ready when it has been contacted
 and isn't known to be bad

---
 lbry/dht/protocol/iterative_find.py | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/lbry/dht/protocol/iterative_find.py b/lbry/dht/protocol/iterative_find.py
index 117755797..57d39ff7d 100644
--- a/lbry/dht/protocol/iterative_find.py
+++ b/lbry/dht/protocol/iterative_find.py
@@ -111,6 +111,12 @@ class IterativeFinder:
                 # seed nodes
                 self._schedule_probe(peer)
 
+    @property
+    def is_closest_peer_ready(self):
+        if not self.closest_peer or not self.prev_closest_peer:
+            return False
+        return self.closest_peer in self.contacted and self.peer_manager.peer_is_good(self.closest_peer) is not False
+
     async def send_probe(self, peer: 'KademliaPeer') -> FindResponse:
         """
         Send the rpc request to the peer and return an object with the FindResponse interface
@@ -308,7 +314,7 @@ class IterativeNodeFinder(IterativeFinder):
             # log.info("improving, %i %i %i %i %i", len(self.shortlist), len(self.active), len(self.contacted),
             #          self.bottom_out_count, self.iteration_count)
             self.bottom_out_count = 0
-        elif self.prev_closest_peer and self.closest_peer:
+        elif self.is_closest_peer_ready:
             self.bottom_out_count += 1
             log.info("bottom out %i %i %i", len(self.active), len(self.contacted), self.bottom_out_count)
         if self.bottom_out_count >= self.bottom_out_limit or self.iteration_count >= self.bottom_out_limit:
@@ -379,7 +385,7 @@ class IterativeValueFinder(IterativeFinder):
             # log.info("enough blob peers found")
             # if not self.finished.is_set():
             #     self.finished.set()
-        elif self.prev_closest_peer and self.closest_peer:
+        elif self.is_closest_peer_ready:
             self.bottom_out_count += 1
             if self.bottom_out_count >= self.bottom_out_limit:
                 log.info("blob peer search bottomed out")
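The `is False` / `is not False` comparisons in these two patches are deliberate: `peer_is_good()` is effectively tri-state, returning True for peers that recently responded, False for known-bad peers, and None for peers that were never contacted. A minimal sketch of that contract (the scoring below is invented for illustration, not the real PeerManager logic):

    from typing import Optional

    def peer_is_good(successes: int, failures: int) -> Optional[bool]:
        """Toy stand-in for PeerManager.peer_is_good()."""
        if successes == 0 and failures == 0:
            return None  # never contacted: unknown, not bad
        return successes >= failures

    def should_skip(quality: Optional[bool]) -> bool:
        # `quality is False` skips only known-bad peers; a bare
        # `not quality` would wrongly skip unknown (None) peers too.
        return quality is False

    assert not should_skip(peer_is_good(0, 0))  # unknown peer is kept
    assert should_skip(peer_is_good(1, 3))      # known-bad peer is dropped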
From 809a8c1226b5f13dff60c45ca66e43664464020e Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Fri, 4 Feb 2022 15:38:15 -0300
Subject: [PATCH 03/25] fix distance sorting and improve logging

---
 lbry/dht/protocol/iterative_find.py | 30 ++++++++++++++++++++++-------
 1 file changed, 23 insertions(+), 7 deletions(-)

diff --git a/lbry/dht/protocol/iterative_find.py b/lbry/dht/protocol/iterative_find.py
index 57d39ff7d..be1e3e8e7 100644
--- a/lbry/dht/protocol/iterative_find.py
+++ b/lbry/dht/protocol/iterative_find.py
@@ -150,6 +150,14 @@ class IterativeFinder:
     def _add_active(self, peer):
         if self.peer_manager.peer_is_good(peer) is False:
             return
+        if self.closest_peer and self.peer_manager.peer_is_good(self.closest_peer) is False:
+            log.debug("[%s] closest peer went bad", self.key.hex()[:8])
+            if self.prev_closest_peer and self.peer_manager.peer_is_good(self.prev_closest_peer) is not False:
+                log.debug("[%s] previous closest was bad too", self.key.hex()[:8])
+                self.closest_peer = self.prev_closest_peer
+            else:
+                self.closest_peer = None
+            self.prev_closest_peer = None
         if peer not in self.active and peer.node_id and peer.node_id != self.protocol.node_id:
             self.active.add(peer)
             if self._is_closer(peer):
@@ -166,6 +174,7 @@ class IterativeFinder:
                 log.warning("misbehaving peer %s:%i returned peer with reserved ip %s:%i", peer.address,
                             peer.udp_port, address, udp_port)
         self.check_result_ready(response)
+        self._log_state()
 
     async def _send_probe(self, peer: 'KademliaPeer'):
         try:
@@ -190,7 +199,8 @@ class IterativeFinder:
 
         added = 0
         to_probe = list(self.active - self.contacted)
-        to_probe.sort(key=lambda peer: self.distance(self.key))
+        to_probe.sort(key=lambda peer: self.distance(peer.node_id))
+        log.debug("closest to probe: %s", to_probe[0].node_id.hex()[:8] if to_probe else None)
         for peer in to_probe:
             if added >= constants.ALPHA:
                 break
@@ -236,6 +246,14 @@ class IterativeFinder:
         if self.running:
             self.loop.call_soon(self.aclose)
 
+    def _log_state(self):
+        log.debug("[%s] check result: %i active nodes %i contacted %i bottomed count",
+                  self.key.hex()[:8], len(self.active), len(self.contacted), self.bottom_out_count)
+        if self.closest_peer and self.prev_closest_peer:
+            log.debug("[%s] best node id: %s (contacted: %s, good: %s), previous best: %s",
+                      self.key.hex()[:8], self.closest_peer.node_id.hex()[:8], self.closest_peer in self.contacted,
+                      self.peer_manager.peer_is_good(self.closest_peer), self.prev_closest_peer.node_id.hex()[:8])
+
     def _search(self):
         self.tasks.append(self.loop.create_task(self._search_task()))
 
@@ -310,15 +328,13 @@ class IterativeNodeFinder(IterativeFinder):
         if found:
             log.debug("found")
             return self.put_result(self.active, finish=True)
-        if self.prev_closest_peer and self.closest_peer and not self._is_closer(self.prev_closest_peer):
-            # log.info("improving, %i %i %i %i %i", len(self.shortlist), len(self.active), len(self.contacted),
-            #          self.bottom_out_count, self.iteration_count)
-            self.bottom_out_count = 0
         elif self.is_closest_peer_ready:
             self.bottom_out_count += 1
-            log.info("bottom out %i %i %i", len(self.active), len(self.contacted), self.bottom_out_count)
+        else:
+            self.bottom_out_count = 0
+
         if self.bottom_out_count >= self.bottom_out_limit or self.iteration_count >= self.bottom_out_limit:
-            log.info("limit hit")
+            log.debug("limit hit")
             self.put_result(self.active, True)

From f5bf8b86840a341f5f67342e86fee0b9c07b352c Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Fri, 4 Feb 2022 16:43:19 -0300
Subject: [PATCH 04/25] bump split index to 2

---
 lbry/conf.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lbry/conf.py b/lbry/conf.py
index 234a1709e..15fe5f8b6 100644
--- a/lbry/conf.py
+++ b/lbry/conf.py
@@ -622,7 +622,7 @@ class Config(CLIConfig):
         "Routing table bucket index below which we always split the bucket if given a new key to add to it and "
         "the bucket is full. As this value is raised the depth of the routing table (and number of peers in it) "
         "will increase. This setting is used by seed nodes, you probably don't want to change it during normal "
-        "use.", 1
+        "use.", 2
     )
 
     # protocol timeouts
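The sorting bug fixed in PATCH 03 is easy to miss: `self.distance(self.key)` is the same number for every peer (a key XORed with itself is zero), so the probe list was never actually ordered. A short sketch of the Kademlia XOR metric, with illustrative names:

    class Distance:
        """Toy version of lbry.dht.protocol.distance.Distance."""
        def __init__(self, key: bytes):
            self.key = int.from_bytes(key, 'big')

        def __call__(self, node_id: bytes) -> int:
            return self.key ^ int.from_bytes(node_id, 'big')

    key = bytes.fromhex('aa' * 48)   # node ids are 48 bytes in this DHT
    distance = Distance(key)
    peers = [bytes.fromhex(h * 48) for h in ('ff', 'ab', '00')]
    peers.sort(key=lambda node_id: distance(node_id))  # the corrected sort key
    assert peers[0] == bytes.fromhex('ab' * 48)        # nearest to 0xaa...
    assert distance(key) == 0   # the old sort key was this constant, for every peer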
From 4987f5794466145726b24575ddfb635485a69bc6 Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Mon, 7 Feb 2022 14:54:57 -0300
Subject: [PATCH 05/25] add peers from shortlist regardless, but check from
 other nodes

---
 lbry/dht/protocol/iterative_find.py | 9 +++------
 1 file changed, 3 insertions(+), 6 deletions(-)

diff --git a/lbry/dht/protocol/iterative_find.py b/lbry/dht/protocol/iterative_find.py
index be1e3e8e7..f29f6c048 100644
--- a/lbry/dht/protocol/iterative_find.py
+++ b/lbry/dht/protocol/iterative_find.py
@@ -106,7 +106,7 @@ class IterativeFinder:
         self.delayed_calls: typing.List[asyncio.Handle] = []
         for peer in get_shortlist(routing_table, key, shortlist):
             if peer.node_id:
-                self._add_active(peer)
+                self._add_active(peer, force=True)
             else:
                 # seed nodes
                 self._schedule_probe(peer)
@@ -147,8 +147,8 @@ class IterativeFinder:
     def _is_closer(self, peer: 'KademliaPeer') -> bool:
         return not self.closest_peer or self.distance.is_closer(peer.node_id, self.closest_peer.node_id)
 
-    def _add_active(self, peer):
-        if self.peer_manager.peer_is_good(peer) is False:
+    def _add_active(self, peer, force=False):
+        if not force and self.peer_manager.peer_is_good(peer) is False:
             return
         if self.closest_peer and self.peer_manager.peer_is_good(self.closest_peer) is False:
             log.debug("[%s] closest peer went bad", self.key.hex()[:8])
@@ -211,9 +211,6 @@ class IterativeFinder:
                 continue
             if origin_address == (self.protocol.external_ip, self.protocol.udp_port):
                 continue
-            if self.peer_manager.peer_is_good(peer) is False:
-                self.active.discard(peer)
-                continue
             self._schedule_probe(peer)
             added += 1
         log.debug("running %d probes for key %s", len(self.running_probes), self.key.hex()[:8])

From 6ba8f9651127bcedaeb0a27f824a1fa82415bfb7 Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Mon, 7 Feb 2022 21:46:43 -0300
Subject: [PATCH 06/25] reset closest peer on failure

---
 lbry/dht/protocol/iterative_find.py | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/lbry/dht/protocol/iterative_find.py b/lbry/dht/protocol/iterative_find.py
index f29f6c048..c2fc10429 100644
--- a/lbry/dht/protocol/iterative_find.py
+++ b/lbry/dht/protocol/iterative_find.py
@@ -176,14 +176,22 @@ class IterativeFinder:
         self.check_result_ready(response)
         self._log_state()
 
+    def _reset_closest(self, peer):
+        if peer == self.prev_closest_peer:
+            self.prev_closest_peer = None
+        if peer == self.closest_peer:
+            self.closest_peer = self.prev_closest_peer
+
     async def _send_probe(self, peer: 'KademliaPeer'):
         try:
             response = await self.send_probe(peer)
         except asyncio.TimeoutError:
+            self._reset_closest(peer)
             self.active.discard(peer)
             return
         except ValueError as err:
             log.warning(str(err))
+            self._reset_closest(peer)
             self.active.discard(peer)
             return
         except TransportNotConnected:

From 44c4b03d447045ebdb0feef13b9649407e0b4f8e Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Mon, 7 Feb 2022 21:47:10 -0300
Subject: [PATCH 07/25] only return good (contacted) peers

---
 lbry/dht/protocol/iterative_find.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lbry/dht/protocol/iterative_find.py b/lbry/dht/protocol/iterative_find.py
index c2fc10429..ad0f03863 100644
--- a/lbry/dht/protocol/iterative_find.py
+++ b/lbry/dht/protocol/iterative_find.py
@@ -317,7 +317,7 @@ class IterativeNodeFinder(IterativeFinder):
             peer for peer in from_iter
             if peer not in self.yielded_peers
             and peer.node_id != self.protocol.node_id
-            and self.peer_manager.peer_is_good(peer) is not False
+            and self.peer_manager.peer_is_good(peer) is True  # return only peers who answered
         ]
         not_yet_yielded.sort(key=lambda peer: self.distance(peer.node_id))
         to_yield = not_yet_yielded[:max(constants.K, self.max_results)]
self.peer_manager.peer_is_good(peer) is not False + and self.peer_manager.peer_is_good(peer) is True # return only peers who answered ] not_yet_yielded.sort(key=lambda peer: self.distance(peer.node_id)) to_yield = not_yet_yielded[:max(constants.K, self.max_results)] From 2884dba52d4440be7be7f20a919c88aa627dcc0d Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 7 Feb 2022 23:13:58 -0300 Subject: [PATCH 08/25] wait until k peers are ready. do not double add peers --- lbry/dht/protocol/iterative_find.py | 28 +++++++++++++++++++++------- 1 file changed, 21 insertions(+), 7 deletions(-) diff --git a/lbry/dht/protocol/iterative_find.py b/lbry/dht/protocol/iterative_find.py index ad0f03863..31af402c3 100644 --- a/lbry/dht/protocol/iterative_find.py +++ b/lbry/dht/protocol/iterative_find.py @@ -115,7 +115,15 @@ class IterativeFinder: def is_closest_peer_ready(self): if not self.closest_peer or not self.prev_closest_peer: return False - return self.closest_peer in self.contacted and self.peer_manager.peer_is_good(self.closest_peer) is not False + return self.closest_peer in self.contacted and self.peer_manager.peer_is_good(self.closest_peer) + + @property + def are_k_closest_peers_ready(self): + if not self.is_closest_peer_ready: + return False + to_probe = list(self.active) + to_probe.sort(key=lambda peer: self.distance(peer.node_id)) + return all(self.peer_manager.peer_is_good(peer) for peer in to_probe[:self.max_results]) async def send_probe(self, peer: 'KademliaPeer') -> FindResponse: """ @@ -150,6 +158,8 @@ class IterativeFinder: def _add_active(self, peer, force=False): if not force and self.peer_manager.peer_is_good(peer) is False: return + if peer in self.contacted: + return if self.closest_peer and self.peer_manager.peer_is_good(self.closest_peer) is False: log.debug("[%s] closest peer went bad", self.key.hex()[:8]) if self.prev_closest_peer and self.peer_manager.peer_is_good(self.prev_closest_peer) is not False: @@ -177,6 +187,7 @@ class IterativeFinder: self._log_state() def _reset_closest(self, peer): + self.active.discard(peer) if peer == self.prev_closest_peer: self.prev_closest_peer = None if peer == self.closest_peer: @@ -187,16 +198,15 @@ class IterativeFinder: response = await self.send_probe(peer) except asyncio.TimeoutError: self._reset_closest(peer) - self.active.discard(peer) return except ValueError as err: log.warning(str(err)) self._reset_closest(peer) - self.active.discard(peer) return except TransportNotConnected: return self.aclose() except RemoteException: + self._reset_closest(peer) return return await self._handle_probe_result(peer, response) @@ -338,10 +348,11 @@ class IterativeNodeFinder(IterativeFinder): else: self.bottom_out_count = 0 - if self.bottom_out_count >= self.bottom_out_limit or self.iteration_count >= self.bottom_out_limit: - log.debug("limit hit") + if self.are_k_closest_peers_ready: + self.put_result(self.active, True) + elif self.bottom_out_count >= self.bottom_out_limit or self.iteration_count >= self.bottom_out_limit: + log.info("peer search bottomed out.") self.put_result(self.active, True) - class IterativeValueFinder(IterativeFinder): def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', @@ -408,7 +419,10 @@ class IterativeValueFinder(IterativeFinder): # self.finished.set() elif self.is_closest_peer_ready: self.bottom_out_count += 1 - if self.bottom_out_count >= self.bottom_out_limit: + if self.are_k_closest_peers_ready: + log.info("blob peer search finished.") + self.iteration_queue.put_nowait(None) + elif 
From 023cfb593adfb60bcc0808b68e0c3aaefad34d7a Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Mon, 7 Feb 2022 23:58:28 -0300
Subject: [PATCH 09/25] bump bottom out limit of peer search so people can use
 100 concurrent announcers

---
 lbry/dht/node.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lbry/dht/node.py b/lbry/dht/node.py
index 345662460..635adc2c4 100644
--- a/lbry/dht/node.py
+++ b/lbry/dht/node.py
@@ -216,7 +216,7 @@ class Node:
             key, bottom_out_limit, max_results, None, shortlist)
 
     async def peer_search(self, node_id: bytes, count=constants.K, max_results=constants.K * 2,
-                          bottom_out_limit=20, shortlist: typing.Optional[typing.List['KademliaPeer']] = None
+                          bottom_out_limit=60, shortlist: typing.Optional[typing.List['KademliaPeer']] = None
                           ) -> typing.List['KademliaPeer']:
         peers = []
         async for iteration_peers in self.get_iterative_node_finder(

From 2ed23fbc4bde90ba8514f9556f03e5b9458e52f9 Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Tue, 8 Feb 2022 17:10:33 -0300
Subject: [PATCH 10/25] log bottom out of peer search in debug, show short key
 id for find value

---
 lbry/dht/protocol/iterative_find.py | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/lbry/dht/protocol/iterative_find.py b/lbry/dht/protocol/iterative_find.py
index 31af402c3..31e6ab56a 100644
--- a/lbry/dht/protocol/iterative_find.py
+++ b/lbry/dht/protocol/iterative_find.py
@@ -351,9 +351,10 @@ class IterativeNodeFinder(IterativeFinder):
         if self.are_k_closest_peers_ready:
             self.put_result(self.active, True)
         elif self.bottom_out_count >= self.bottom_out_limit or self.iteration_count >= self.bottom_out_limit:
-            log.info("peer search bottomed out.")
+            log.debug("peer search bottomed out.")
             self.put_result(self.active, True)
 
+
 class IterativeValueFinder(IterativeFinder):
     def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager',
                  routing_table: 'TreeRoutingTable', protocol: 'KademliaProtocol', key: bytes,
@@ -420,10 +421,10 @@ class IterativeValueFinder(IterativeFinder):
         elif self.is_closest_peer_ready:
             self.bottom_out_count += 1
             if self.are_k_closest_peers_ready:
-                log.info("blob peer search finished.")
+                log.info("blob peer search finished for %s", self.key.hex()[:8])
                 self.iteration_queue.put_nowait(None)
             elif self.bottom_out_count >= self.bottom_out_limit:
-                log.info("blob peer search bottomed out")
+                log.info("blob peer search bottomed out for %s", self.key.hex()[:8])
                 self.iteration_queue.put_nowait(None)

From b7b8831109c790e5dbb8b4dc80836d5b24a3e75b Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Tue, 8 Feb 2022 19:57:17 -0300
Subject: [PATCH 11/25] use a dict for the active queue

---
 lbry/dht/protocol/iterative_find.py | 37 +++++++++++++++++------------
 1 file changed, 22 insertions(+), 15 deletions(-)

diff --git a/lbry/dht/protocol/iterative_find.py b/lbry/dht/protocol/iterative_find.py
index 31e6ab56a..62c13cd5d 100644
--- a/lbry/dht/protocol/iterative_find.py
+++ b/lbry/dht/protocol/iterative_find.py
@@ -89,7 +89,7 @@ class IterativeFinder:
         self.max_results = max_results
         self.exclude = exclude or []
 
-        self.active: typing.Set['KademliaPeer'] = set()
+        self.active: typing.Dict['KademliaPeer', int] = {}  # peer: distance, sorted
         self.contacted: typing.Set['KademliaPeer'] = set()
         self.distance = Distance(key)
@@ -119,11 +119,12 @@ class IterativeFinder:
 
     @property
     def are_k_closest_peers_ready(self):
-        if not self.is_closest_peer_ready:
+        if not self.is_closest_peer_ready or len(self.active) < self.max_results:
             return False
-        to_probe = list(self.active)
-        to_probe.sort(key=lambda peer: self.distance(peer.node_id))
-        return all(self.peer_manager.peer_is_good(peer) for peer in to_probe[:self.max_results])
+        for peer in list(self.active.keys())[:self.max_results]:
+            if peer not in self.contacted or not self.peer_manager.peer_is_good(peer):
+                return False
+        return True
 
     async def send_probe(self, peer: 'KademliaPeer') -> FindResponse:
         """
@@ -169,7 +170,8 @@ class IterativeFinder:
             self.closest_peer = None
             self.prev_closest_peer = None
         if peer not in self.active and peer.node_id and peer.node_id != self.protocol.node_id:
-            self.active.add(peer)
+            self.active[peer] = self.distance(peer.node_id)
+            self.active = dict(sorted(self.active.items(), key=lambda item: item[1]))
             if self._is_closer(peer):
                 self.prev_closest_peer = self.closest_peer
                 self.closest_peer = peer
@@ -187,7 +189,8 @@ class IterativeFinder:
     def _reset_closest(self, peer):
-        self.active.discard(peer)
+        if peer in self.active:
+            del self.active[peer]
         if peer == self.prev_closest_peer:
             self.prev_closest_peer = None
         if peer == self.closest_peer:
@@ -216,10 +219,14 @@ class IterativeFinder:
 
         added = 0
-        to_probe = list(self.active - self.contacted)
-        to_probe.sort(key=lambda peer: self.distance(peer.node_id))
-        log.debug("closest to probe: %s", to_probe[0].node_id.hex()[:8] if to_probe else None)
-        for peer in to_probe:
+        for index, peer in enumerate(self.active.keys()):
+            if index == 0:
+                log.debug("closest to probe: %s", peer.node_id.hex()[:8])
+            if self.closest_peer != peer:
+                self.prev_closest_peer = self.closest_peer
+                self.closest_peer = peer
+            if peer in self.contacted:
+                continue
             if added >= constants.ALPHA:
                 break
             origin_address = (peer.address, peer.udp_port)
@@ -320,7 +327,7 @@ class IterativeNodeFinder(IterativeFinder):
         return FindNodeResponse(self.key, response)
 
     def search_exhausted(self):
-        self.put_result(self.active, finish=True)
+        self.put_result(self.active.keys(), finish=True)
 
     def put_result(self, from_iter: typing.Iterable['KademliaPeer'], finish=False):
         not_yet_yielded = [
@@ -342,17 +349,17 @@ class IterativeNodeFinder(IterativeFinder):
 
         if found:
             log.debug("found")
-            return self.put_result(self.active, finish=True)
+            return self.put_result(self.active.keys(), finish=True)
         elif self.is_closest_peer_ready:
             self.bottom_out_count += 1
         else:
             self.bottom_out_count = 0
 
         if self.are_k_closest_peers_ready:
-            self.put_result(self.active, True)
+            self.put_result(self.active.keys(), True)
         elif self.bottom_out_count >= self.bottom_out_limit or self.iteration_count >= self.bottom_out_limit:
             log.debug("peer search bottomed out.")
-            self.put_result(self.active, True)
+            self.put_result(self.active.keys(), True)
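PATCH 11's data structure is worth spelling out: `active` becomes a mapping from peer to XOR distance that is rebuilt in sorted order on every insertion, so plain iteration always visits the closest known peers first. A sketch of the pattern:

    active = {}

    def add_active(peer: str, dist: int):
        global active
        if peer not in active:
            active[peer] = dist
            # rebuild in ascending-distance order, as the patch does
            active = dict(sorted(active.items(), key=lambda item: item[1]))

    for peer, dist in [("a", 30), ("b", 10), ("c", 20)]:
        add_active(peer, dist)
    assert list(active) == ["b", "c", "a"]  # closest first

Rebuilding the dict is O(n log n) per insert; a heap would be cheaper asymptotically, but the active set is small (bounded by the breadth of the search), so the simpler structure is a reasonable trade.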
From 6335590b65ef6f13fc3cc1c74d04eb491b295e15 Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Tue, 8 Feb 2022 19:58:28 -0300
Subject: [PATCH 12/25] don't probe peers too far from the top closest

---
 lbry/dht/protocol/iterative_find.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/lbry/dht/protocol/iterative_find.py b/lbry/dht/protocol/iterative_find.py
index 62c13cd5d..e334c3f61 100644
--- a/lbry/dht/protocol/iterative_find.py
+++ b/lbry/dht/protocol/iterative_find.py
@@ -86,7 +86,7 @@ class IterativeFinder:
         self.key = key
         self.bottom_out_limit = bottom_out_limit
-        self.max_results = max_results
+        self.max_results = max(constants.K, max_results)
         self.exclude = exclude or []
@@ -229,6 +229,8 @@ class IterativeFinder:
                 continue
             if added >= constants.ALPHA:
                 break
+            if index > self.max_results:
+                break
             origin_address = (peer.address, peer.udp_port)
             if origin_address in self.exclude:
                 continue
From c45f27d5cc3b5b98e4ae3944e2b6b61f44ed91ee Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Tue, 8 Feb 2022 20:00:29 -0300
Subject: [PATCH 13/25] bottoming out is now warning and no results for peer
 search

---
 lbry/dht/protocol/iterative_find.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/lbry/dht/protocol/iterative_find.py b/lbry/dht/protocol/iterative_find.py
index e334c3f61..dfeb4795a 100644
--- a/lbry/dht/protocol/iterative_find.py
+++ b/lbry/dht/protocol/iterative_find.py
@@ -360,8 +360,8 @@ class IterativeNodeFinder(IterativeFinder):
         if self.are_k_closest_peers_ready:
             self.put_result(self.active.keys(), True)
         elif self.bottom_out_count >= self.bottom_out_limit or self.iteration_count >= self.bottom_out_limit:
-            log.debug("peer search bottomed out.")
-            self.put_result(self.active.keys(), True)
+            log.warning("peer search bottomed out.")
+            self.put_result([], True)
From dc1c0e6851090a6b3837c1e6fe634966d126d3bd Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Thu, 10 Feb 2022 01:48:11 -0300
Subject: [PATCH 14/25] no stop condition, let it exhaust

---
 lbry/dht/protocol/iterative_find.py | 97 ++++++-----------------------
 1 file changed, 18 insertions(+), 79 deletions(-)

diff --git a/lbry/dht/protocol/iterative_find.py b/lbry/dht/protocol/iterative_find.py
index dfeb4795a..2fdc602eb 100644
--- a/lbry/dht/protocol/iterative_find.py
+++ b/lbry/dht/protocol/iterative_find.py
@@ -93,17 +93,14 @@ class IterativeFinder:
         self.contacted: typing.Set['KademliaPeer'] = set()
         self.distance = Distance(key)
 
-        self.closest_peer: typing.Optional['KademliaPeer'] = None
-        self.prev_closest_peer: typing.Optional['KademliaPeer'] = None
-
         self.iteration_queue = asyncio.Queue(loop=self.loop)
 
-        self.running_probes: typing.Set[asyncio.Task] = set()
+        self.running_probes: typing.Dict['KademliaPeer', asyncio.Task] = {}
         self.iteration_count = 0
         self.bottom_out_count = 0
         self.running = False
         self.tasks: typing.List[asyncio.Task] = []
-        self.delayed_calls: typing.List[asyncio.Handle] = []
+        self.delayed_call: asyncio.Handle = None
         for peer in get_shortlist(routing_table, key, shortlist):
             if peer.node_id:
                 self._add_active(peer, force=True)
             else:
                 # seed nodes
                 self._schedule_probe(peer)
@@ -111,21 +108,6 @@ class IterativeFinder:
                 # seed nodes
                 self._schedule_probe(peer)
 
-    @property
-    def is_closest_peer_ready(self):
-        if not self.closest_peer or not self.prev_closest_peer:
-            return False
-        return self.closest_peer in self.contacted and self.peer_manager.peer_is_good(self.closest_peer)
-
-    @property
-    def are_k_closest_peers_ready(self):
-        if not self.is_closest_peer_ready or len(self.active) < self.max_results:
-            return False
-        for peer in list(self.active.keys())[:self.max_results]:
-            if peer not in self.contacted or not self.peer_manager.peer_is_good(peer):
-                return False
-        return True
-
     async def send_probe(self, peer: 'KademliaPeer') -> FindResponse:
         """
         Send the rpc request to the peer and return an object with the FindResponse interface
@@ -135,28 +117,13 @@ class IterativeFinder:
         """
         return []
 
-    def _is_closer(self, peer: 'KademliaPeer') -> bool:
-        return not self.closest_peer or self.distance.is_closer(peer.node_id, self.closest_peer.node_id)
-
     def _add_active(self, peer, force=False):
         if not force and self.peer_manager.peer_is_good(peer) is False:
             return
         if peer in self.contacted:
             return
-        if self.closest_peer and self.peer_manager.peer_is_good(self.closest_peer) is False:
-            log.debug("[%s] closest peer went bad", self.key.hex()[:8])
-            if self.prev_closest_peer and self.peer_manager.peer_is_good(self.prev_closest_peer) is not False:
-                log.debug("[%s] previous closest was bad too", self.key.hex()[:8])
-                self.closest_peer = self.prev_closest_peer
-            else:
-                self.closest_peer = None
-            self.prev_closest_peer = None
         if peer not in self.active and peer.node_id and peer.node_id != self.protocol.node_id:
             self.active[peer] = self.distance(peer.node_id)
             self.active = dict(sorted(self.active.items(), key=lambda item: item[1]))
@@ -159,14 +126,10 @@ class IterativeFinder:
     def _reset_closest(self, peer):
         if peer in self.active:
             del self.active[peer]
-        if peer == self.prev_closest_peer:
-            self.prev_closest_peer = None
-        if peer == self.closest_peer:
-            self.closest_peer = self.prev_closest_peer
 
     async def _send_probe(self, peer: 'KademliaPeer'):
         try:
             response = await self.send_probe(peer)
@@ -177,7 +140,7 @@ class IterativeFinder:
         return await self._handle_probe_result(peer, response)
 
-    async def _search_round(self):
+    def _search_round(self):
         """
         Send up to constants.alpha (5) probes to closest active peers
         """
@@ -186,11 +149,8 @@ class IterativeFinder:
         for index, peer in enumerate(self.active.keys()):
             if index == 0:
                 log.debug("closest to probe: %s", peer.node_id.hex()[:8])
-            if self.closest_peer != peer:
-                self.prev_closest_peer = self.closest_peer
-                self.closest_peer = peer
             if peer in self.contacted:
                 continue
-            if added >= constants.ALPHA:
+            if len(self.running_probes) >= constants.ALPHA:
                 break
-            if index > self.max_results:
+            if index > (constants.K - 1):
                 break
             origin_address = (peer.address, peer.udp_port)
@@ -211,37 +171,20 @@ class IterativeFinder:
         t = self.loop.create_task(self._send_probe(peer))
 
         def callback(_):
-            self.running_probes.difference_update({
-                probe for probe in self.running_probes if probe.done() or probe == t
-            })
-            if not self.running_probes:
-                self.tasks.append(self.loop.create_task(self._search_task(0.0)))
+            for peer in [peer for peer, task in self.running_probes.items() if task.done() or task == t]:
+                del self.running_probes[peer]
+            self._search_task(0.0)
 
         t.add_done_callback(callback)
-        self.running_probes.add(t)
+        self.running_probes[peer] = t
 
-    async def _search_task(self, delay: typing.Optional[float] = constants.ITERATIVE_LOOKUP_DELAY):
+    def _search_task(self, delay: typing.Optional[float] = constants.ITERATIVE_LOOKUP_DELAY):
         try:
             if self.running:
-                await self._search_round()
-                if self.running:
-                    self.delayed_calls.append(self.loop.call_later(delay, self._search))
+                if self.delayed_call:
+                    self.delayed_call.cancel()  # ensure anything scheduled gets cancelled
+                self._search_round()
+            #if self.running:
+            #    self.delayed_call = self.loop.call_later(delay, self._search)
         except (asyncio.CancelledError, StopAsyncIteration, TransportNotConnected):
             if self.running:
                 self.loop.call_soon(self.aclose)
 
     def _log_state(self):
         log.debug("[%s] check result: %i active nodes %i contacted %i bottomed count",
                   self.key.hex()[:8], len(self.active), len(self.contacted), self.bottom_out_count)
-        if self.closest_peer and self.prev_closest_peer:
-            log.debug("[%s] best node id: %s (contacted: %s, good: %s), previous best: %s",
-                      self.key.hex()[:8], self.closest_peer.node_id.hex()[:8], self.closest_peer in self.contacted,
-                      self.peer_manager.peer_is_good(self.closest_peer), self.prev_closest_peer.node_id.hex()[:8])
 
     def _search(self):
-        self.tasks.append(self.loop.create_task(self._search_task()))
+        self._search_task()
@@ -261,11 +204,10 @@ class IterativeFinder:
     def aclose(self):
         self.running = False
         self.iteration_queue.put_nowait(None)
-        for task in chain(self.tasks, self.running_probes, self.delayed_calls):
+        for task in chain(self.tasks, self.running_probes.values(), filter(None, [self.delayed_call])):
             task.cancel()
         self.tasks.clear()
         self.running_probes.clear()
-        self.delayed_calls.clear()
+        self.delayed_call = None
@@ -309,13 +251,6 @@ class IterativeNodeFinder(IterativeFinder):
         if found:
             log.debug("found")
             return self.put_result(self.active.keys(), finish=True)
-        elif self.is_closest_peer_ready:
-            self.bottom_out_count += 1
-        else:
-            self.bottom_out_count = 0
-
-        if self.are_k_closest_peers_ready:
-            self.put_result(self.active.keys(), True)
-        elif self.bottom_out_count >= self.bottom_out_limit or self.iteration_count >= self.bottom_out_limit:
-            log.warning("peer search bottomed out.")
-            self.put_result([], True)
@@ -374,16 +309,6 @@ class IterativeValueFinder(IterativeFinder):
             # log.info("enough blob peers found")
             # if not self.finished.is_set():
             #     self.finished.set()
-        elif self.is_closest_peer_ready:
-            self.bottom_out_count += 1
-            if self.are_k_closest_peers_ready:
-                log.info("blob peer search finished for %s", self.key.hex()[:8])
-                self.iteration_queue.put_nowait(None)
-            elif self.bottom_out_count >= self.bottom_out_limit:
-                log.info("blob peer search bottomed out for %s", self.key.hex()[:8])
-                self.iteration_queue.put_nowait(None)
 
     def get_initial_result(self) -> typing.List['KademliaPeer']:
         if self.protocol.data_store.has_peers_for_blob(self.key):
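After PATCH 14 the lookup is no longer driven by a periodic search task: each finished probe removes itself from `running_probes` and immediately triggers another round, and the search simply runs until the candidate set is exhausted. A self-contained sketch of that completion-driven loop (names are illustrative, not the lbry API):

    import asyncio

    ALPHA = 3          # probe parallelism, as in Kademlia
    running = {}       # peer -> asyncio.Task

    async def probe(peer: str) -> None:
        await asyncio.sleep(0.01)  # stand-in for a UDP round trip

    def search_round(loop, pending: list) -> None:
        # launch probes closest-first until ALPHA are in flight
        while pending and len(running) < ALPHA:
            peer = pending.pop(0)
            task = loop.create_task(probe(peer))
            task.add_done_callback(
                lambda _, p=peer: (running.pop(p, None), search_round(loop, pending)))
            running[peer] = task

    async def main():
        loop = asyncio.get_running_loop()
        pending = [f"peer{i}" for i in range(10)]
        search_round(loop, pending)
        while running:             # exhausts instead of checking a stop condition
            await asyncio.sleep(0.01)

    asyncio.run(main())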
From 12f156257e35dd862dad399fbce5e5f6fcf68baa Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Fri, 11 Feb 2022 19:45:08 -0300
Subject: [PATCH 15/25] remove all references to bottoming out

---
 lbry/dht/constants.py               |  1 -
 lbry/dht/node.py                    | 10 ++++------
 lbry/dht/protocol/iterative_find.py | 22 +++++++---------------
 lbry/extras/daemon/daemon.py        | 13 +------------
 4 files changed, 12 insertions(+), 34 deletions(-)

diff --git a/lbry/dht/constants.py b/lbry/dht/constants.py
index 07dcec18a..7380ce60a 100644
--- a/lbry/dht/constants.py
+++ b/lbry/dht/constants.py
@@ -20,7 +20,6 @@ MAYBE_PING_DELAY = 300  # 5 minutes
 CHECK_REFRESH_INTERVAL = REFRESH_INTERVAL / 5
 RPC_ID_LENGTH = 20
 PROTOCOL_VERSION = 1
-BOTTOM_OUT_LIMIT = 3
 MSG_SIZE_LIMIT = 1400

diff --git a/lbry/dht/node.py b/lbry/dht/node.py
index 635adc2c4..864edc077 100644
--- a/lbry/dht/node.py
+++ b/lbry/dht/node.py
@@ -202,25 +202,23 @@ class Node:
         self._join_task = self.loop.create_task(self.join_network(interface, known_node_urls))
 
     def get_iterative_node_finder(self, key: bytes, shortlist: typing.Optional[typing.List['KademliaPeer']] = None,
-                                  bottom_out_limit: int = constants.BOTTOM_OUT_LIMIT,
                                   max_results: int = constants.K) -> IterativeNodeFinder:
         return IterativeNodeFinder(self.loop, self.protocol.peer_manager, self.protocol.routing_table, self.protocol,
-                                   key, bottom_out_limit, max_results, None, shortlist)
+                                   key, max_results, None, shortlist)
 
     def get_iterative_value_finder(self, key: bytes, shortlist: typing.Optional[typing.List['KademliaPeer']] = None,
-                                   bottom_out_limit: int = 40,
                                    max_results: int = -1) -> IterativeValueFinder:
         return IterativeValueFinder(self.loop, self.protocol.peer_manager, self.protocol.routing_table, self.protocol,
-                                    key, bottom_out_limit, max_results, None, shortlist)
+                                    key, max_results, None, shortlist)
 
     async def peer_search(self, node_id: bytes, count=constants.K, max_results=constants.K * 2,
-                          bottom_out_limit=60, shortlist: typing.Optional[typing.List['KademliaPeer']] = None
+                          shortlist: typing.Optional[typing.List['KademliaPeer']] = None
                           ) -> typing.List['KademliaPeer']:
         peers = []
         async for iteration_peers in self.get_iterative_node_finder(
-                node_id, shortlist=shortlist, bottom_out_limit=bottom_out_limit, max_results=max_results):
+                node_id, shortlist=shortlist, max_results=max_results):
             peers.extend(iteration_peers)
         distance = Distance(node_id)
         peers.sort(key=lambda peer: distance(peer.node_id))
diff --git a/lbry/dht/protocol/iterative_find.py b/lbry/dht/protocol/iterative_find.py
index 2fdc602eb..68c04692d 100644
--- a/lbry/dht/protocol/iterative_find.py
+++ b/lbry/dht/protocol/iterative_find.py
@@ -74,7 +74,7 @@ def get_shortlist(routing_table: 'TreeRoutingTable', key: bytes,
 class IterativeFinder:
     def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager',
                  routing_table: 'TreeRoutingTable', protocol: 'KademliaProtocol', key: bytes,
-                 bottom_out_limit: typing.Optional[int] = 2, max_results: typing.Optional[int] = constants.K,
+                 max_results: typing.Optional[int] = constants.K,
                  exclude: typing.Optional[typing.List[typing.Tuple[str, int]]] = None,
                  shortlist: typing.Optional[typing.List['KademliaPeer']] = None):
         if len(key) != constants.HASH_LENGTH:
@@ -85,7 +85,6 @@ class IterativeFinder:
         self.protocol = protocol
 
         self.key = key
-        self.bottom_out_limit = bottom_out_limit
         self.max_results = max(constants.K, max_results)
         self.exclude = exclude or []
@@ -97,7 +96,6 @@ class IterativeFinder:
         self.running_probes: typing.Dict['KademliaPeer', asyncio.Task] = {}
         self.iteration_count = 0
-        self.bottom_out_count = 0
         self.running = False
         self.tasks: typing.List[asyncio.Task] = []
@@ -232,8 +230,8 @@ class IterativeFinder:
     def _log_state(self):
-        log.debug("[%s] check result: %i active nodes %i contacted %i bottomed count",
-                  self.key.hex()[:8], len(self.active), len(self.contacted), self.bottom_out_count)
+        log.debug("[%s] check result: %i active nodes %i contacted",
+                  self.key.hex()[:8], len(self.active), len(self.contacted))
@@ -270,10 +268,10 @@ class IterativeNodeFinder(IterativeFinder):
     def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager',
                  routing_table: 'TreeRoutingTable', protocol: 'KademliaProtocol', key: bytes,
-                 bottom_out_limit: typing.Optional[int] = 2, max_results: typing.Optional[int] = constants.K,
+                 max_results: typing.Optional[int] = constants.K,
                  exclude: typing.Optional[typing.List[typing.Tuple[str, int]]] = None,
                  shortlist: typing.Optional[typing.List['KademliaPeer']] = None):
-        super().__init__(loop, peer_manager, routing_table, protocol, key, bottom_out_limit, max_results, exclude,
+        super().__init__(loop, peer_manager, routing_table, protocol, key, max_results, exclude,
                          shortlist)
         self.yielded_peers: typing.Set['KademliaPeer'] = set()
@@ -314,10 +312,10 @@ class IterativeNodeFinder(IterativeFinder):
 class IterativeValueFinder(IterativeFinder):
     def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager',
                  routing_table: 'TreeRoutingTable', protocol: 'KademliaProtocol', key: bytes,
-                 bottom_out_limit: typing.Optional[int] = 2, max_results: typing.Optional[int] = constants.K,
+                 max_results: typing.Optional[int] = constants.K,
                  exclude: typing.Optional[typing.List[typing.Tuple[str, int]]] = None,
                  shortlist: typing.Optional[typing.List['KademliaPeer']] = None):
-        super().__init__(loop, peer_manager, routing_table, protocol, key, bottom_out_limit, max_results, exclude,
+        super().__init__(loop, peer_manager, routing_table, protocol, key, max_results, exclude,
                          shortlist)
         self.blob_peers: typing.Set['KademliaPeer'] = set()
         # this tracks the index of the most recent page we requested from each peer
@@ -362,18 +360,12 @@ class IterativeValueFinder(IterativeFinder):
             blob_peers = [self.peer_manager.decode_tcp_peer_from_compact_address(compact_addr)
                           for compact_addr in response.found_compact_addresses]
             to_yield = []
-            self.bottom_out_count = 0
             for blob_peer in blob_peers:
                 if blob_peer not in self.blob_peers:
                     self.blob_peers.add(blob_peer)
                     to_yield.append(blob_peer)
             if to_yield:
-                # log.info("found %i new peers for blob", len(to_yield))
                 self.iteration_queue.put_nowait(to_yield)
-                # if self.max_results and len(self.blob_peers) >= self.max_results:
-                #     log.info("enough blob peers found")
-                #     if not self.finished.is_set():
-                #         self.finished.set()

diff --git a/lbry/extras/daemon/daemon.py b/lbry/extras/daemon/daemon.py
index 003ea9cf7..fb25207f7 100644
--- a/lbry/extras/daemon/daemon.py
+++ b/lbry/extras/daemon/daemon.py
@@ -4885,20 +4885,16 @@ class Daemon(metaclass=JSONRPCServerType):
         """
 
     @requires(DHT_COMPONENT)
-    async def jsonrpc_peer_list(self, blob_hash, search_bottom_out_limit=None, page=None, page_size=None):
+    async def jsonrpc_peer_list(self, blob_hash, page=None, page_size=None):
         """
         Get peers for blob hash
 
         Usage:
             peer_list (<blob_hash> | --blob_hash=<blob_hash>)
-                [<search_bottom_out_limit> | --search_bottom_out_limit=<search_bottom_out_limit>]
                 [--page=<page>] [--page_size=<page_size>]
 
         Options:
             --blob_hash=<blob_hash>                              : (str) find available peers for this blob hash
-            --search_bottom_out_limit=<search_bottom_out_limit>  : (int) the number of search probes in a row
-                                                                   that don't find any new peers
-                                                                   before giving up and returning
             --page=<page>                                        : (int) page to return during paginating
             --page_size=<page_size>                              : (int) number of items on page during pagination
 
@@ -4910,13 +4906,6 @@ class Daemon(metaclass=JSONRPCServerType):
         if not is_valid_blobhash(blob_hash):
             # TODO: use error from lbry.error
             raise Exception("invalid blob hash")
-        if search_bottom_out_limit is not None:
-            search_bottom_out_limit = int(search_bottom_out_limit)
-            if search_bottom_out_limit <= 0:
-                # TODO: use error from lbry.error
-                raise Exception("invalid bottom out limit")
-        else:
-            search_bottom_out_limit = 4
         peers = []
         peer_q = asyncio.Queue(loop=self.component_manager.loop)
         await self.dht_node._peers_for_value_producer(blob_hash, peer_q)
From 12f156257e35dd862dad399fbce5e5f6fcf68baa Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Sat, 12 Feb 2022 03:28:26 -0300
Subject: [PATCH 16/25] allow running some extra probes for k replacements

---
 lbry/dht/protocol/iterative_find.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/lbry/dht/protocol/iterative_find.py b/lbry/dht/protocol/iterative_find.py
index 68c04692d..bbfc93bd6 100644
--- a/lbry/dht/protocol/iterative_find.py
+++ b/lbry/dht/protocol/iterative_find.py
@@ -184,11 +184,12 @@ class IterativeFinder:
         for index, peer in enumerate(self.active.keys()):
             if index == 0:
                 log.debug("closest to probe: %s", peer.node_id.hex()[:8])
+
             if peer in self.contacted:
                 continue
             if len(self.running_probes) >= constants.ALPHA:
                 break
-            if index > (constants.K - 1):
+            if index > (constants.K + len(self.running_probes)):
                 break
             origin_address = (peer.address, peer.udp_port)
             if origin_address in self.exclude:

From 0120d989d8357b5450b2bd8136028b25c4687e83 Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Fri, 18 Feb 2022 17:21:37 -0300
Subject: [PATCH 17/25] make timeout handler immune to asyncio time tricks

---
 lbry/testcase.py | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/lbry/testcase.py b/lbry/testcase.py
index b10ea9b27..6214553e5 100644
--- a/lbry/testcase.py
+++ b/lbry/testcase.py
@@ -204,7 +204,13 @@ class AsyncioTestCase(unittest.TestCase):
 
     def add_timeout(self):
         if self.TIMEOUT:
-            self.loop.call_later(self.TIMEOUT, self.cancel)
+            self.loop.call_later(self.TIMEOUT, self.check_timeout, time())
+
+    def check_timeout(self, started):
+        if time() - started >= self.TIMEOUT:
+            self.cancel()
+        else:
+            self.loop.call_later(self.TIMEOUT, self.check_timeout, started)
 
 
 class AdvanceTimeTestCase(AsyncioTestCase):

From 51be734a0850c5a60b8048059d36078061af6607 Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Fri, 18 Feb 2022 18:47:01 -0300
Subject: [PATCH 18/25] add a way to wait announcements to finish so tests are
 reliable

---
 lbry/dht/blob_announcer.py | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/lbry/dht/blob_announcer.py b/lbry/dht/blob_announcer.py
index 24cf18bbe..e4da3cfa9 100644
--- a/lbry/dht/blob_announcer.py
+++ b/lbry/dht/blob_announcer.py
@@ -27,6 +27,7 @@ class BlobAnnouncer:
         self.storage = storage
         self.announce_task: asyncio.Task = None
         self.announce_queue: typing.List[str] = []
+        self._done = asyncio.Event()
 
     async def _submit_announcement(self, blob_hash):
         try:
@@ -64,6 +65,8 @@ class BlobAnnouncer:
             if announced:
                 await self.storage.update_last_announced_blobs(announced)
                 log.info("announced %i blobs", len(announced))
+            self._done.set()
+            self._done.clear()
 
     def start(self, batch_size: typing.Optional[int] = 10):
         assert not self.announce_task or self.announce_task.done(), "already running"
@@ -72,3 +75,6 @@ class BlobAnnouncer:
     def stop(self):
         if self.announce_task and not self.announce_task.done():
             self.announce_task.cancel()
+
+    def wait(self):
+        return self._done.wait()

From b574fb7771e3f82bcb0693ccc9fdba84d500f118 Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Fri, 18 Feb 2022 18:51:08 -0300
Subject: [PATCH 19/25] better representation of kademliapeer on debug logs

---
 lbry/dht/peer.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/lbry/dht/peer.py b/lbry/dht/peer.py
index 495f71eff..c5a9c9e84 100644
--- a/lbry/dht/peer.py
+++ b/lbry/dht/peer.py
@@ -190,3 +190,6 @@ class KademliaPeer:
 
     def compact_ip(self):
         return make_compact_ip(self.address)
+
+    def __str__(self):
+        return f"{self.__class__.__name__}({self.node_id.hex()[:8]}@{self.address}:{self.udp_port}-{self.tcp_port})"
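PATCH 17's fix matters because the DHT tests fast-forward `loop.time()`: a plain `call_later(TIMEOUT, cancel)` fires as soon as the clock is accelerated, not after real seconds. Re-checking against wall-clock `time()` and re-arming keeps the timeout tied to real elapsed time. A sketch of the same idea, self-contained:

    import asyncio
    from time import time

    TIMEOUT = 0.2

    def add_timeout(loop, on_timeout):
        def check_timeout(started):
            if time() - started >= TIMEOUT:
                on_timeout()          # only after real TIMEOUT seconds
            else:
                loop.call_later(TIMEOUT, check_timeout, started)
        loop.call_later(TIMEOUT, check_timeout, time())

    async def main():
        timed_out = asyncio.Event()
        add_timeout(asyncio.get_running_loop(), timed_out.set)
        await timed_out.wait()

    asyncio.run(main())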
From 586b09c1bc87148b9ba63a8cb1dc87693c6194e0 Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Fri, 18 Feb 2022 18:52:17 -0300
Subject: [PATCH 20/25] simplify dht mock and restore clock after accelerating

---
 tests/dht_mocks.py | 26 ++++++++------------------
 1 file changed, 8 insertions(+), 18 deletions(-)

diff --git a/tests/dht_mocks.py b/tests/dht_mocks.py
index 2e01986c0..4bebcfaf1 100644
--- a/tests/dht_mocks.py
+++ b/tests/dht_mocks.py
@@ -9,7 +9,7 @@ if typing.TYPE_CHECKING:
 
 
 def get_time_accelerator(loop: asyncio.AbstractEventLoop,
-                         now: typing.Optional[float] = None) -> typing.Callable[[float], typing.Awaitable[None]]:
+                         instant_step: bool = False) -> typing.Callable[[float], typing.Awaitable[None]]:
     """
     Returns an async advance() function
 
@@ -17,32 +17,22 @@ def get_time_accelerator(loop: asyncio.AbstractEventLoop,
     made by call_later, call_at, and call_soon.
     """
-    _time = now or loop.time()
-    loop.time = functools.wraps(loop.time)(lambda: _time)
+    original = loop.time
+    _drift = 0
+    loop.time = functools.wraps(loop.time)(lambda: original() + _drift)
 
     async def accelerate_time(seconds: float) -> None:
-        nonlocal _time
+        nonlocal _drift
         if seconds < 0:
             raise ValueError(f'Cannot go back in time ({seconds} seconds)')
-        _time += seconds
-        await past_events()
+        _drift += seconds
         await asyncio.sleep(0)
 
-    async def past_events() -> None:
-        while loop._scheduled:
-            timer: asyncio.TimerHandle = loop._scheduled[0]
-            if timer not in loop._ready and timer._when <= _time:
-                loop._scheduled.remove(timer)
-                loop._ready.append(timer)
-            if timer._when > _time:
-                break
-            await asyncio.sleep(0)
-
     async def accelerator(seconds: float):
-        steps = seconds * 10.0
+        steps = seconds * 10.0 if not instant_step else 1
         for _ in range(max(int(steps), 1)):
-            await accelerate_time(0.1)
+            await accelerate_time(seconds/steps)
 
     return accelerator
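The reworked mock replaces the frozen clock with a drift: `loop.time()` keeps ticking monotonically and the accelerator only adds an offset, so timers scheduled before and after an `advance()` stay mutually consistent, and the original clock can be restored afterwards. A condensed sketch of the idea (verifiable standalone):

    import asyncio
    import functools

    def install_accelerator(loop):
        original, drift = loop.time, 0.0

        def advance(seconds: float):
            nonlocal drift
            drift += seconds

        loop.time = functools.wraps(original)(lambda: original() + drift)
        return advance

    async def main():
        loop = asyncio.get_running_loop()
        advance = install_accelerator(loop)
        sleeper = loop.create_task(asyncio.sleep(3600))  # nominally an hour
        await asyncio.sleep(0)     # let the timer get scheduled
        advance(3601)              # jump the loop clock past the deadline
        await asyncio.sleep(0.1)   # one real tick for the timer to fire
        assert sleeper.done()

    asyncio.run(main())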
From cc104369cb0d39652608e05aa7c6c1980e3b894f Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Fri, 18 Feb 2022 18:53:10 -0300
Subject: [PATCH 21/25] fix and enable test_blob_announcer

---
 tests/unit/dht/test_blob_announcer.py | 135 +++++++++++++++-----------
 tests/unit/dht/test_node.py           |   4 +-
 2 files changed, 78 insertions(+), 61 deletions(-)

diff --git a/tests/unit/dht/test_blob_announcer.py b/tests/unit/dht/test_blob_announcer.py
index 5c7d921aa..d5b2c5e17 100644
--- a/tests/unit/dht/test_blob_announcer.py
+++ b/tests/unit/dht/test_blob_announcer.py
@@ -1,60 +1,70 @@
 import contextlib
+import logging
 import typing
 import binascii
 import socket
 import asyncio
+
 from lbry.testcase import AsyncioTestCase
 from tests import dht_mocks
+from lbry.dht.protocol.distance import Distance
 from lbry.conf import Config
 from lbry.dht import constants
 from lbry.dht.node import Node
 from lbry.dht.peer import PeerManager, make_kademlia_peer
 from lbry.dht.blob_announcer import BlobAnnouncer
 from lbry.extras.daemon.storage import SQLiteStorage
-from unittest import skip
+
 
 class TestBlobAnnouncer(AsyncioTestCase):
+    TIMEOUT = 20.0  # lower than default
+
     async def setup_node(self, peer_addresses, address, node_id):
         self.nodes: typing.Dict[int, Node] = {}
-        self.advance = dht_mocks.get_time_accelerator(self.loop, self.loop.time())
+        self.advance = dht_mocks.get_time_accelerator(self.loop)
+        self.instant_advance = dht_mocks.get_time_accelerator(self.loop)
         self.conf = Config()
-        self.storage = SQLiteStorage(self.conf, ":memory:", self.loop, self.loop.time)
-        await self.storage.open()
         self.peer_manager = PeerManager(self.loop)
         self.node = Node(self.loop, self.peer_manager, node_id, 4444, 4444, 3333, address)
         await self.node.start_listening(address)
-        self.blob_announcer = BlobAnnouncer(self.loop, self.node, self.storage)
-        for node_id, address in peer_addresses:
-            await self.add_peer(node_id, address)
+        await asyncio.gather(*[self.add_peer(node_id, address) for node_id, address in peer_addresses])
+        for first_peer in self.nodes.values():
+            for second_peer in self.nodes.values():
+                if first_peer == second_peer:
+                    continue
+                self.add_peer_to_routing_table(first_peer, second_peer)
+                self.add_peer_to_routing_table(second_peer, first_peer)
+        await self.advance(0.1)  # just to make pings go through
         self.node.joined.set()
         self.node._refresh_task = self.loop.create_task(self.node.refresh_node())
+        self.storage = SQLiteStorage(self.conf, ":memory:", self.loop, self.loop.time)
+        await self.storage.open()
+        self.blob_announcer = BlobAnnouncer(self.loop, self.node, self.storage)
 
     async def add_peer(self, node_id, address, add_to_routing_table=True):
+        #print('add', node_id.hex()[:8], address)
         n = Node(self.loop, PeerManager(self.loop), node_id, 4444, 4444, 3333, address)
         await n.start_listening(address)
         self.nodes.update({len(self.nodes): n})
         if add_to_routing_table:
-            self.node.protocol.add_peer(
-                make_kademlia_peer(
-                    n.protocol.node_id, n.protocol.external_ip, n.protocol.udp_port
-                )
+            self.add_peer_to_routing_table(self.node, n)
+
+    def add_peer_to_routing_table(self, adder, being_added):
+        adder.protocol.add_peer(
+            make_kademlia_peer(
+                being_added.protocol.node_id, being_added.protocol.external_ip, being_added.protocol.udp_port
             )
+        )
 
     @contextlib.asynccontextmanager
-    async def _test_network_context(self, peer_addresses=None):
-        self.peer_addresses = peer_addresses or [
-            (constants.generate_id(2), '1.2.3.2'),
-            (constants.generate_id(3), '1.2.3.3'),
-            (constants.generate_id(4), '1.2.3.4'),
-            (constants.generate_id(5), '1.2.3.5'),
-            (constants.generate_id(6), '1.2.3.6'),
-            (constants.generate_id(7), '1.2.3.7'),
-            (constants.generate_id(8), '1.2.3.8'),
-            (constants.generate_id(9), '1.2.3.9'),
+    async def _test_network_context(self, peer_count=200):
+        self.peer_addresses = [
+            (constants.generate_id(i), socket.inet_ntoa(int(i + 0x01000001).to_bytes(length=4, byteorder='big')))
+            for i in range(1, peer_count + 1)
         ]
         try:
             with dht_mocks.mock_network_loop(self.loop):
-                await self.setup_node(self.peer_addresses, '1.2.3.1', constants.generate_id(1))
+                await self.setup_node(self.peer_addresses, '1.2.3.1', constants.generate_id(1000))
                 yield
         finally:
             self.blob_announcer.stop()
@@ -73,43 +83,58 @@ class TestBlobAnnouncer(AsyncioTestCase):
             )
         )
         await peer.ping()
-        return peer
+        return last_node
 
-    @skip("Something from a previous test is leaking into this test and causing it to fail intermittently")
     async def test_announce_blobs(self):
         blob1 = binascii.hexlify(b'1' * 48).decode()
         blob2 = binascii.hexlify(b'2' * 48).decode()
 
-        async with self._test_network_context():
-            await self.storage.add_blobs((blob1, 1024), (blob2, 1024), finished=True)
-            await self.storage.db.execute(
-                "update blob set next_announce_time=0, should_announce=1 where blob_hash in (?, ?)",
-                (blob1, blob2)
-            )
+        async with self._test_network_context(peer_count=100):
+            await self.storage.add_blobs((blob1, 1024, 0, True), (blob2, 1024, 0, True), finished=True)
+            await self.storage.add_blobs(
+                *((constants.generate_id(value).hex(), 1024, 0, True) for value in range(1000, 1090)),
+                finished=True)
+            await self.storage.db.execute("update blob set next_announce_time=0, should_announce=1")
             to_announce = await self.storage.get_blobs_to_announce()
-            self.assertEqual(2, len(to_announce))
-            self.blob_announcer.start(batch_size=1)  # so it covers batching logic
+            self.assertEqual(92, len(to_announce))
+            self.blob_announcer.start(batch_size=10)  # so it covers batching logic
             # takes 60 seconds to start, but we advance 120 to ensure it processed all batches
-            await self.advance(60.0 * 2)
+            ongoing_announcements = asyncio.ensure_future(self.blob_announcer.wait())
+            await self.instant_advance(60.0)
+            await ongoing_announcements
             to_announce = await self.storage.get_blobs_to_announce()
             self.assertEqual(0, len(to_announce))
             self.blob_announcer.stop()
 
+            # as routing table pollution will cause some peers to be hard to reach, we add a tolerance for CI
+            tolerance = 0.8  # at least 80% of the announcements are within the top K
+            for blob in await self.storage.get_all_blob_hashes():
+                distance = Distance(bytes.fromhex(blob))
+                candidates = list(self.nodes.values())
+                candidates.sort(key=lambda sorting_node: distance(sorting_node.protocol.node_id))
+                has_it = 0
+                for index, node in enumerate(candidates[:constants.K], start=1):
+                    if node.protocol.data_store.get_peers_for_blob(bytes.fromhex(blob)):
+                        has_it += 1
+                    else:
+                        logging.warning("blob %s wasn't found between the best K (%s)", blob[:8],
+                                        node.protocol.node_id.hex()[:8])
+                self.assertGreaterEqual(has_it, int(tolerance * constants.K))
+
             # test that we can route from a poorly connected peer all the way to the announced blob
 
-            await self.chain_peer(constants.generate_id(10), '1.2.3.10')
-            await self.chain_peer(constants.generate_id(11), '1.2.3.11')
-            await self.chain_peer(constants.generate_id(12), '1.2.3.12')
-            await self.chain_peer(constants.generate_id(13), '1.2.3.13')
-            await self.chain_peer(constants.generate_id(14), '1.2.3.14')
-            await self.advance(61.0)
+            current = len(self.nodes)
+            await self.chain_peer(constants.generate_id(current + 1), '1.2.3.10')
+            await self.chain_peer(constants.generate_id(current + 2), '1.2.3.11')
+            await self.chain_peer(constants.generate_id(current + 3), '1.2.3.12')
+            await self.chain_peer(constants.generate_id(current + 4), '1.2.3.13')
+            last = await self.chain_peer(constants.generate_id(current + 5), '1.2.3.14')
 
-            last = self.nodes[len(self.nodes) - 1]
             search_q, peer_q = asyncio.Queue(loop=self.loop), asyncio.Queue(loop=self.loop)
             search_q.put_nowait(blob1)
 
             _, task = last.accumulate_peers(search_q, peer_q)
-            found_peers = await peer_q.get()
+            found_peers = await asyncio.wait_for(peer_q.get(), 1.0)
             task.cancel()
 
             self.assertEqual(1, len(found_peers))
@@ -119,21 +144,13 @@ class TestBlobAnnouncer(AsyncioTestCase):
 
     async def test_popular_blob(self):
         peer_count = 150
-        addresses = [
-            (constants.generate_id(i + 1), socket.inet_ntoa(int(i + 0x01000001).to_bytes(length=4, byteorder='big')))
-            for i in range(peer_count)
-        ]
-        blob_hash = b'1' * 48
+        blob_hash = constants.generate_id(99999)
 
-        async with self._test_network_context(peer_addresses=addresses):
+        async with self._test_network_context(peer_count=peer_count):
             total_seen = set()
-            announced_to = self.nodes[0]
-            for i in range(1, peer_count):
-                node = self.nodes[i]
-                kad_peer = make_kademlia_peer(
-                    node.protocol.node_id, node.protocol.external_ip, node.protocol.udp_port
-                )
-                await announced_to.protocol._add_peer(kad_peer)
+            announced_to = self.nodes.pop(0)
+            for i, node in enumerate(self.nodes.values()):
+                self.add_peer_to_routing_table(announced_to, node)
                 peer = node.protocol.get_rpc_peer(
                     make_kademlia_peer(
                         announced_to.protocol.node_id,
@@ -144,15 +161,15 @@ class TestBlobAnnouncer(AsyncioTestCase):
                 response = await peer.store(blob_hash)
                 self.assertEqual(response, b'OK')
                 peers_for_blob = await peer.find_value(blob_hash, 0)
-                if i == 1:
+                if i == 0:
                     self.assertNotIn(blob_hash, peers_for_blob)
                     self.assertEqual(peers_for_blob[b'p'], 0)
                 else:
-                    self.assertEqual(len(peers_for_blob[blob_hash]), min(i - 1, constants.K))
-                    self.assertEqual(len(announced_to.protocol.data_store.get_peers_for_blob(blob_hash)), i)
+                    self.assertEqual(len(peers_for_blob[blob_hash]), min(i, constants.K))
+                    self.assertEqual(len(announced_to.protocol.data_store.get_peers_for_blob(blob_hash)), i + 1)
                 if i - 1 > constants.K:
                     self.assertEqual(len(peers_for_blob[b'contacts']), constants.K)
-                    self.assertEqual(peers_for_blob[b'p'], ((i - 1) // (constants.K + 1)) + 1)
+                    self.assertEqual(peers_for_blob[b'p'], (i // (constants.K + 1)) + 1)
                     seen = set(peers_for_blob[blob_hash])
                     self.assertEqual(len(seen), constants.K)
                     self.assertEqual(len(peers_for_blob[blob_hash]), len(seen))
@@ -167,5 +184,5 @@ class TestBlobAnnouncer(AsyncioTestCase):
                         seen.intersection_update(page_x_set)
                         total_seen.update(page_x_set)
                 else:
-                    self.assertEqual(len(peers_for_blob[b'contacts']), i - 1)
+                    self.assertEqual(len(peers_for_blob[b'contacts']), 8)  # we always add 8 on first page
             self.assertEqual(len(total_seen), peer_count - 2)
diff --git a/tests/unit/dht/test_node.py b/tests/unit/dht/test_node.py
index c862305ec..fcf65ff10 100644
--- a/tests/unit/dht/test_node.py
+++ b/tests/unit/dht/test_node.py
@@ -29,7 +29,7 @@ class TestNodePingQueueDiscover(AsyncioTestCase):
             (constants.generate_id(9), '1.2.3.9'),
         ]
         with dht_mocks.mock_network_loop(loop):
-            advance = dht_mocks.get_time_accelerator(loop, loop.time())
+            advance = dht_mocks.get_time_accelerator(loop)
             # start the nodes
             nodes: typing.Dict[int, Node] = {
                 i: Node(loop, PeerManager(loop), node_id, 4444, 4444, 3333, address)
@@ -131,7 +131,7 @@ class TestTemporarilyLosingConnection(AsyncioTestCase):
         await asyncio.gather(*[n.joined.wait() for n in nodes])
 
         node = nodes[-1]
-        advance = dht_mocks.get_time_accelerator(loop, loop.time())
+        advance = dht_mocks.get_time_accelerator(loop)
         await advance(500)
 
         # Join the network, assert that at least the known peers are in RT

From 3e79dcd17975e3fe8366bf08dab1c99b8ae07822 Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Fri, 18 Feb 2022 19:10:11 -0300
Subject: [PATCH 22/25] timeout is now supported on dht tests

---
 tests/unit/dht/test_blob_announcer.py | 2 --
 tests/unit/dht/test_node.py           | 2 --
 2 files changed, 4 deletions(-)

diff --git a/tests/unit/dht/test_blob_announcer.py b/tests/unit/dht/test_blob_announcer.py
index d5b2c5e17..be445aae7 100644
--- a/tests/unit/dht/test_blob_announcer.py
+++ b/tests/unit/dht/test_blob_announcer.py
@@ -17,8 +17,6 @@ from lbry.extras.daemon.storage import SQLiteStorage
 
 
 class TestBlobAnnouncer(AsyncioTestCase):
-    TIMEOUT = 20.0  # lower than default
-
     async def setup_node(self, peer_addresses, address, node_id):
         self.nodes: typing.Dict[int, Node] = {}
         self.advance = dht_mocks.get_time_accelerator(self.loop)
diff --git a/tests/unit/dht/test_node.py b/tests/unit/dht/test_node.py
index fcf65ff10..5ecad5181 100644
--- a/tests/unit/dht/test_node.py
+++ b/tests/unit/dht/test_node.py
@@ -12,7 +12,6 @@ from lbry.extras.daemon.storage import SQLiteStorage
 
 
 class TestNodePingQueueDiscover(AsyncioTestCase):
-    TIMEOUT = None  # not supported as it advances time
     async def test_ping_queue_discover(self):
         loop = asyncio.get_event_loop()
         loop.set_debug(False)
@@ -93,7 +92,6 @@ class TestNodePingQueueDiscover(AsyncioTestCase):
 
 
 class TestTemporarilyLosingConnection(AsyncioTestCase):
-    TIMEOUT = None  # not supported as it advances time
     @unittest.SkipTest
     async def test_losing_connection(self):
         async def wait_for(check_ok, insist, timeout=20):
From c1e64df528fcdfacdd8d62001273083a66f589f6 Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Sat, 19 Feb 2022 02:30:31 -0300
Subject: [PATCH 23/25] remove unused search rounds

---
 lbry/dht/protocol/iterative_find.py | 28 +++++-----------------------
 1 file changed, 5 insertions(+), 23 deletions(-)

diff --git a/lbry/dht/protocol/iterative_find.py b/lbry/dht/protocol/iterative_find.py
index bbfc93bd6..409efcdb4 100644
--- a/lbry/dht/protocol/iterative_find.py
+++ b/lbry/dht/protocol/iterative_find.py
@@ -98,7 +98,6 @@ class IterativeFinder:
         self.iteration_count = 0
         self.running = False
         self.tasks: typing.List[asyncio.Task] = []
-        self.delayed_call: asyncio.Handle = None
         for peer in get_shortlist(routing_table, key, shortlist):
             if peer.node_id:
                 self._add_active(peer, force=True)
@@ -184,7 +183,6 @@ class IterativeFinder:
         for index, peer in enumerate(self.active.keys()):
             if index == 0:
                 log.debug("closest to probe: %s", peer.node_id.hex()[:8])
-
             if peer in self.contacted:
                 continue
@@ -211,37 +209,22 @@ class IterativeFinder:
         t = self.loop.create_task(self._send_probe(peer))
 
         def callback(_):
-            for peer in [peer for peer, task in self.running_probes.items() if task.done() or task == t]:
-                del self.running_probes[peer]
-            self._search_task(0.0)
+            self.running_probes.pop(peer, None)
+            if self.running:
+                self._search_round()
 
         t.add_done_callback(callback)
         self.running_probes[peer] = t
 
-    def _search_task(self, delay: typing.Optional[float] = constants.ITERATIVE_LOOKUP_DELAY):
-        try:
-            if self.running:
-                if self.delayed_call:
-                    self.delayed_call.cancel()  # ensure anything scheduled gets cancelled
-                self._search_round()
-            #if self.running:
-            #    self.delayed_call = self.loop.call_later(delay, self._search)
-        except (asyncio.CancelledError, StopAsyncIteration, TransportNotConnected):
-            if self.running:
-                self.loop.call_soon(self.aclose)
-
     def _log_state(self):
         log.debug("[%s] check result: %i active nodes %i contacted",
                   self.key.hex()[:8], len(self.active), len(self.contacted))
 
-    def _search(self):
-        self._search_task()
-
     def __aiter__(self):
         if self.running:
             raise Exception("already running")
         self.running = True
-        self._search()
+        self.loop.call_soon(self._search_round)
         return self
 
     async def __anext__(self) -> typing.List['KademliaPeer']:
@@ -261,11 +244,10 @@ class IterativeFinder:
     def aclose(self):
         self.running = False
         self.iteration_queue.put_nowait(None)
-        for task in chain(self.tasks, self.running_probes.values(), filter(None, [self.delayed_call])):
+        for task in chain(self.tasks, self.running_probes.values()):
             task.cancel()
         self.tasks.clear()
         self.running_probes.clear()
-        self.delayed_call = None
From d00b5befbf9dd36ed403c4a6625f75d4ac79fdea Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Sat, 19 Feb 2022 02:32:12 -0300
Subject: [PATCH 24/25] make active an explicit ordered dict

---
 lbry/dht/protocol/iterative_find.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/lbry/dht/protocol/iterative_find.py b/lbry/dht/protocol/iterative_find.py
index 409efcdb4..ab89edddc 100644
--- a/lbry/dht/protocol/iterative_find.py
+++ b/lbry/dht/protocol/iterative_find.py
@@ -1,6 +1,6 @@
 import asyncio
 from itertools import chain
-from collections import defaultdict
+from collections import defaultdict, OrderedDict
 import typing
 import logging
 from typing import TYPE_CHECKING
@@ -88,7 +88,7 @@ class IterativeFinder:
         self.max_results = max(constants.K, max_results)
         self.exclude = exclude or []
 
-        self.active: typing.Dict['KademliaPeer', int] = {}  # peer: distance, sorted
+        self.active: typing.Dict['KademliaPeer', int] = OrderedDict()  # peer: distance, sorted
         self.contacted: typing.Set['KademliaPeer'] = set()
         self.distance = Distance(key)
@@ -139,7 +139,7 @@ class IterativeFinder:
             return
         if peer not in self.active and peer.node_id and peer.node_id != self.protocol.node_id:
             self.active[peer] = self.distance(peer.node_id)
-            self.active = dict(sorted(self.active.items(), key=lambda item: item[1]))
+            self.active = OrderedDict(sorted(self.active.items(), key=lambda item: item[1]))

From f5b3e9bacd2afed72a62e2faa98bca8901954959 Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Tue, 15 Feb 2022 00:55:36 -0300
Subject: [PATCH 25/25] implement announcer as a consumer task on gather

---
 lbry/dht/blob_announcer.py | 38 +++++++++++++++++++-------------------
 1 file changed, 19 insertions(+), 19 deletions(-)

diff --git a/lbry/dht/blob_announcer.py b/lbry/dht/blob_announcer.py
index e4da3cfa9..9629e06b6 100644
--- a/lbry/dht/blob_announcer.py
+++ b/lbry/dht/blob_announcer.py
@@ -28,21 +28,23 @@ class BlobAnnouncer:
         self.announce_task: asyncio.Task = None
         self.announce_queue: typing.List[str] = []
         self._done = asyncio.Event()
+        self.announced = set()
 
-    async def _submit_announcement(self, blob_hash):
-        try:
-
-            peers = len(await self.node.announce_blob(blob_hash))
-            self.announcements_sent_metric.labels(peers=peers, error=False).inc()
-            if peers > 4:
-                return blob_hash
-            else:
-                log.debug("failed to announce %s, could only find %d peers, retrying soon.", blob_hash[:8], peers)
-        except Exception as err:
-            self.announcements_sent_metric.labels(peers=0, error=True).inc()
-            if isinstance(err, asyncio.CancelledError):  # TODO: remove when updated to 3.8
-                raise err
-            log.warning("error announcing %s: %s", blob_hash[:8], str(err))
+    async def _run_consumer(self):
+        while self.announce_queue:
+            try:
+                blob_hash = self.announce_queue.pop()
+                peers = len(await self.node.announce_blob(blob_hash))
+                self.announcements_sent_metric.labels(peers=peers, error=False).inc()
+                if peers > 4:
+                    self.announced.add(blob_hash)
+                else:
+                    log.debug("failed to announce %s, could only find %d peers, retrying soon.", blob_hash[:8], peers)
+            except Exception as err:
+                self.announcements_sent_metric.labels(peers=0, error=True).inc()
+                if isinstance(err, asyncio.CancelledError):  # TODO: remove when updated to 3.8
+                    raise err
+                log.warning("error announcing %s: %s", blob_hash[:8], str(err))
 
     async def _announce(self, batch_size: typing.Optional[int] = 10):
         while batch_size:
@@ -57,14 +59,12 @@ class BlobAnnouncer:
             log.debug("announcer task wake up, %d blobs to announce", len(self.announce_queue))
             while len(self.announce_queue) > 0:
                 log.info("%i blobs to announce", len(self.announce_queue))
-                announced = await asyncio.gather(*[
-                    self._submit_announcement(
-                        self.announce_queue.pop()) for _ in range(batch_size) if self.announce_queue
-                ], loop=self.loop)
-                announced = list(filter(None, announced))
+                await asyncio.gather(*[self._run_consumer() for _ in range(batch_size)], loop=self.loop)
+                announced = list(filter(None, self.announced))
                 if announced:
                     await self.storage.update_last_announced_blobs(announced)
                     log.info("announced %i blobs", len(announced))
+                self.announced.clear()
             self._done.set()
             self._done.clear()
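The final patch replaces one-task-per-blob with a small pool of consumers draining a shared queue, so `gather()` over `batch_size` coroutines bounds concurrency no matter how long the queue is. A self-contained sketch of that shape (fake announce logic, same structure):

    import asyncio
    import random

    async def announce(blob_hash: str) -> int:
        await asyncio.sleep(random.uniform(0.01, 0.05))  # fake a DHT announcement
        return random.randint(0, 8)                      # fake reached-peer count

    async def run_consumer(queue: list, announced: set) -> None:
        # single-threaded asyncio: pop() between awaits is race-free
        while queue:
            blob_hash = queue.pop()
            if await announce(blob_hash) > 4:            # same threshold as the patch
                announced.add(blob_hash)

    async def main():
        queue = [f"{i:02d}" * 48 for i in range(30)]     # pending fake blob hashes
        announced = set()
        batch_size = 10                                  # pool size, not chunk size
        await asyncio.gather(*(run_consumer(queue, announced) for _ in range(batch_size)))
        print(f"announced {len(announced)} of 30 blobs")

    asyncio.run(main())

Compared with the old batch approach, a slow blob no longer stalls its whole batch: each consumer moves on to the next queue item as soon as its current announcement finishes.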