From 53d78e9194a5024d376661cdf2002c787cabdc2d Mon Sep 17 00:00:00 2001
From: Victor Shyba <victor.shyba@gmail.com>
Date: Fri, 11 Feb 2022 16:18:14 -0300
Subject: [PATCH 01/32] check that the stored blob hash shares at least the
 first prefix byte with the peer id
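
An illustrative sketch of the heuristic, outside the diff (node id and
hashes are made up): a stored blob hash qualifies for background download
only if its first byte equals the first byte of our DHT node id.

    node_id = bytes.fromhex("a1b2c3d4")                # hypothetical prefix of a node id
    stored = [bytes.fromhex("a17f11aa"), bytes.fromhex("ff00ee00")]
    close = [h for h in stored if h[0] == node_id[0]]  # same first byte as node_id
    assert close == [bytes.fromhex("a17f11aa")]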

---
 lbry/extras/daemon/components.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/lbry/extras/daemon/components.py b/lbry/extras/daemon/components.py
index 780b7e5d5..662a1762b 100644
--- a/lbry/extras/daemon/components.py
+++ b/lbry/extras/daemon/components.py
@@ -413,7 +413,8 @@ class BackgroundDownloaderComponent(Component):
             self.space_available = await self.space_manager.get_free_space_mb(True)
             if not self.is_busy and self.space_available > 10:
                 blob_hash = next((key.hex() for key in self.dht_node.stored_blob_hashes if
-                                 key.hex() not in self.blob_manager.completed_blob_hashes), None)
+                                 key[0] == self.dht_node.protocol.node_id[0]
+                                 and key.hex() not in self.blob_manager.completed_blob_hashes), None)
                 if blob_hash:
                     self.ongoing_download = asyncio.create_task(self.background_downloader.download_blobs(blob_hash))
             await asyncio.sleep(self.download_loop_delay_seconds)

From d1bc981b11cf3368793cb6ca7d1c83fcc561dddf Mon Sep 17 00:00:00 2001
From: Victor Shyba <victor.shyba@gmail.com>
Date: Sat, 19 Feb 2022 03:09:29 -0300
Subject: [PATCH 02/32] extract min_prefix_colliding_bits to a constant
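
A worked example of the prefix arithmetic, outside the diff (values are
made up): with 4-byte prefixes, the number of leading bits two ids share
is 32 minus the bit length of their XOR, which is what gets compared
against MIN_PREFIX_COLLIDING_BITS.

    a = int.from_bytes(bytes.fromhex("a1b2c3d4"), "big")
    b = int.from_bytes(bytes.fromhex("a1b00000"), "big")
    colliding_bits = 32 - (a ^ b).bit_length()
    assert colliding_bits == 14  # the prefixes agree on their first 14 bits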

---
 lbry/extras/daemon/components.py | 14 +++++++++-----
 1 file changed, 9 insertions(+), 5 deletions(-)

diff --git a/lbry/extras/daemon/components.py b/lbry/extras/daemon/components.py
index 662a1762b..96b6646cf 100644
--- a/lbry/extras/daemon/components.py
+++ b/lbry/extras/daemon/components.py
@@ -381,6 +381,7 @@ class FileManagerComponent(Component):
 
 
 class BackgroundDownloaderComponent(Component):
+    MIN_PREFIX_COLLIDING_BITS = 8
     component_name = BACKGROUND_DOWNLOADER_COMPONENT
     depends_on = [DATABASE_COMPONENT, BLOB_COMPONENT, DISK_SPACE_COMPONENT]
 
@@ -412,11 +413,14 @@ class BackgroundDownloaderComponent(Component):
         while True:
             self.space_available = await self.space_manager.get_free_space_mb(True)
             if not self.is_busy and self.space_available > 10:
-                blob_hash = next((key.hex() for key in self.dht_node.stored_blob_hashes if
-                                 key[0] == self.dht_node.protocol.node_id[0]
-                                 and key.hex() not in self.blob_manager.completed_blob_hashes), None)
-                if blob_hash:
-                    self.ongoing_download = asyncio.create_task(self.background_downloader.download_blobs(blob_hash))
+                node_id_prefix = int.from_bytes(self.dht_node.protocol.node_id[:4], "big")
+                for hash in self.dht_node.stored_blob_hashes:
+                    colliding_bits = 32 - int(node_id_prefix ^ int.from_bytes(hash[:4], "big")).bit_length()
+                    if hash.hex() in self.blob_manager.completed_blob_hashes:
+                        continue
+                    if colliding_bits >= self.MIN_PREFIX_COLLIDING_BITS:
+                        self.ongoing_download = asyncio.create_task(self.background_downloader.download_blobs(hash))
+                        break
             await asyncio.sleep(self.download_loop_delay_seconds)
 
     async def start(self):

From f274562c92171b83a8f0117eba2e821aa4872725 Mon Sep 17 00:00:00 2001
From: Victor Shyba <victor.shyba@gmail.com>
Date: Tue, 25 Jan 2022 17:00:37 -0300
Subject: [PATCH 03/32] don't probe bad peers and ignore them

---
 lbry/dht/protocol/iterative_find.py | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/lbry/dht/protocol/iterative_find.py b/lbry/dht/protocol/iterative_find.py
index f8bd2ba6b..117755797 100644
--- a/lbry/dht/protocol/iterative_find.py
+++ b/lbry/dht/protocol/iterative_find.py
@@ -142,6 +142,8 @@ class IterativeFinder:
         return not self.closest_peer or self.distance.is_closer(peer.node_id, self.closest_peer.node_id)
 
     def _add_active(self, peer):
+        if self.peer_manager.peer_is_good(peer) is False:
+            return
         if peer not in self.active and peer.node_id and peer.node_id != self.protocol.node_id:
             self.active.add(peer)
             if self._is_closer(peer):
@@ -193,6 +195,9 @@ class IterativeFinder:
                 continue
             if origin_address == (self.protocol.external_ip, self.protocol.udp_port):
                 continue
+            if self.peer_manager.peer_is_good(peer) is False:
+                self.active.discard(peer)
+                continue
             self._schedule_probe(peer)
             added += 1
         log.debug("running %d probes for key %s", len(self.running_probes), self.key.hex()[:8])

From e319b55db5d5279aeb7bd7120b04da511e2beb40 Mon Sep 17 00:00:00 2001
From: Victor Shyba <victor.shyba@gmail.com>
Date: Fri, 4 Feb 2022 12:44:47 -0300
Subject: [PATCH 04/32] closest peer is only ready when it was contacted and
 isn't known to be bad

---
 lbry/dht/protocol/iterative_find.py | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/lbry/dht/protocol/iterative_find.py b/lbry/dht/protocol/iterative_find.py
index 117755797..57d39ff7d 100644
--- a/lbry/dht/protocol/iterative_find.py
+++ b/lbry/dht/protocol/iterative_find.py
@@ -111,6 +111,12 @@ class IterativeFinder:
                 # seed nodes
                 self._schedule_probe(peer)
 
+    @property
+    def is_closest_peer_ready(self):
+        if not self.closest_peer or not self.prev_closest_peer:
+            return False
+        return self.closest_peer in self.contacted and self.peer_manager.peer_is_good(self.closest_peer) is not False
+
     async def send_probe(self, peer: 'KademliaPeer') -> FindResponse:
         """
         Send the rpc request to the peer and return an object with the FindResponse interface
@@ -308,7 +314,7 @@ class IterativeNodeFinder(IterativeFinder):
             # log.info("improving, %i %i %i %i %i", len(self.shortlist), len(self.active), len(self.contacted),
             #          self.bottom_out_count, self.iteration_count)
             self.bottom_out_count = 0
-        elif self.prev_closest_peer and self.closest_peer:
+        elif self.is_closest_peer_ready:
             self.bottom_out_count += 1
             log.info("bottom out %i %i %i", len(self.active), len(self.contacted), self.bottom_out_count)
         if self.bottom_out_count >= self.bottom_out_limit or self.iteration_count >= self.bottom_out_limit:
@@ -379,7 +385,7 @@ class IterativeValueFinder(IterativeFinder):
                 #     log.info("enough blob peers found")
                 #     if not self.finished.is_set():
                 #         self.finished.set()
-        elif self.prev_closest_peer and self.closest_peer:
+        elif self.is_closest_peer_ready:
             self.bottom_out_count += 1
             if self.bottom_out_count >= self.bottom_out_limit:
                 log.info("blob peer search bottomed out")

From 809a8c1226b5f13dff60c45ca66e43664464020e Mon Sep 17 00:00:00 2001
From: Victor Shyba <victor.shyba@gmail.com>
Date: Fri, 4 Feb 2022 15:38:15 -0300
Subject: [PATCH 05/32] fix distance sorting and improve logging
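
A minimal sketch of the bug, with hypothetical one-byte ids: the old sort
key, self.distance(self.key), evaluated to the same constant (zero) for
every peer, so the list stayed unsorted; the key must be each peer's
node_id.

    def distance(key: bytes, node_id: bytes) -> int:
        return int.from_bytes(key, "big") ^ int.from_bytes(node_id, "big")

    key = bytes.fromhex("aa")
    peers = [bytes.fromhex("ff"), bytes.fromhex("ab"), bytes.fromhex("aa")]
    peers.sort(key=lambda node_id: distance(key, node_id))
    assert peers == [bytes.fromhex("aa"), bytes.fromhex("ab"), bytes.fromhex("ff")]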

---
 lbry/dht/protocol/iterative_find.py | 30 ++++++++++++++++++++++-------
 1 file changed, 23 insertions(+), 7 deletions(-)

diff --git a/lbry/dht/protocol/iterative_find.py b/lbry/dht/protocol/iterative_find.py
index 57d39ff7d..be1e3e8e7 100644
--- a/lbry/dht/protocol/iterative_find.py
+++ b/lbry/dht/protocol/iterative_find.py
@@ -150,6 +150,14 @@ class IterativeFinder:
     def _add_active(self, peer):
         if self.peer_manager.peer_is_good(peer) is False:
             return
+        if self.closest_peer and self.peer_manager.peer_is_good(self.closest_peer) is False:
+            log.debug("[%s] closest peer went bad", self.key.hex()[:8])
+            if self.prev_closest_peer and self.peer_manager.peer_is_good(self.prev_closest_peer) is not False:
+                log.debug("[%s] previous closest was bad too", self.key.hex()[:8])
+                self.closest_peer = self.prev_closest_peer
+            else:
+                self.closest_peer = None
+            self.prev_closest_peer = None
         if peer not in self.active and peer.node_id and peer.node_id != self.protocol.node_id:
             self.active.add(peer)
             if self._is_closer(peer):
@@ -166,6 +174,7 @@ class IterativeFinder:
                 log.warning("misbehaving peer %s:%i returned peer with reserved ip %s:%i", peer.address,
                             peer.udp_port, address, udp_port)
         self.check_result_ready(response)
+        self._log_state()
 
     async def _send_probe(self, peer: 'KademliaPeer'):
         try:
@@ -190,7 +199,8 @@ class IterativeFinder:
 
         added = 0
         to_probe = list(self.active - self.contacted)
-        to_probe.sort(key=lambda peer: self.distance(self.key))
+        to_probe.sort(key=lambda peer: self.distance(peer.node_id))
+        log.debug("closest to probe: %s", to_probe[0].node_id.hex()[:8] if to_probe else None)
         for peer in to_probe:
             if added >= constants.ALPHA:
                 break
@@ -236,6 +246,14 @@ class IterativeFinder:
             if self.running:
                 self.loop.call_soon(self.aclose)
 
+    def _log_state(self):
+        log.debug("[%s] check result: %i active nodes %i contacted %i bottomed count",
+                  self.key.hex()[:8], len(self.active), len(self.contacted), self.bottom_out_count)
+        if self.closest_peer and self.prev_closest_peer:
+            log.debug("[%s] best node id: %s (contacted: %s, good: %s), previous best: %s",
+                      self.key.hex()[:8], self.closest_peer.node_id.hex()[:8], self.closest_peer in self.contacted,
+                      self.peer_manager.peer_is_good(self.closest_peer), self.prev_closest_peer.node_id.hex()[:8])
+
     def _search(self):
         self.tasks.append(self.loop.create_task(self._search_task()))
 
@@ -310,15 +328,13 @@ class IterativeNodeFinder(IterativeFinder):
         if found:
             log.debug("found")
             return self.put_result(self.active, finish=True)
-        if self.prev_closest_peer and self.closest_peer and not self._is_closer(self.prev_closest_peer):
-            # log.info("improving, %i %i %i %i %i", len(self.shortlist), len(self.active), len(self.contacted),
-            #          self.bottom_out_count, self.iteration_count)
-            self.bottom_out_count = 0
         elif self.is_closest_peer_ready:
             self.bottom_out_count += 1
-            log.info("bottom out %i %i %i", len(self.active), len(self.contacted), self.bottom_out_count)
+        else:
+            self.bottom_out_count = 0
+
         if self.bottom_out_count >= self.bottom_out_limit or self.iteration_count >= self.bottom_out_limit:
-            log.info("limit hit")
+            log.debug("limit hit")
             self.put_result(self.active, True)
 
 

From f5bf8b86840a341f5f67342e86fee0b9c07b352c Mon Sep 17 00:00:00 2001
From: Victor Shyba <victor.shyba@gmail.com>
Date: Fri, 4 Feb 2022 16:43:19 -0300
Subject: [PATCH 06/32] bump split index to 2

---
 lbry/conf.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lbry/conf.py b/lbry/conf.py
index 234a1709e..15fe5f8b6 100644
--- a/lbry/conf.py
+++ b/lbry/conf.py
@@ -622,7 +622,7 @@ class Config(CLIConfig):
         "Routing table bucket index below which we always split the bucket if given a new key to add to it and "
         "the bucket is full. As this value is raised the depth of the routing table (and number of peers in it) "
         "will increase. This setting is used by seed nodes, you probably don't want to change it during normal "
-        "use.", 1
+        "use.", 2
     )
 
     # protocol timeouts

From 4987f5794466145726b24575ddfb635485a69bc6 Mon Sep 17 00:00:00 2001
From: Victor Shyba <victor.shyba@gmail.com>
Date: Mon, 7 Feb 2022 14:54:57 -0300
Subject: [PATCH 07/32] add peers from the shortlist regardless, but check
 peers learned from other nodes

---
 lbry/dht/protocol/iterative_find.py | 9 +++------
 1 file changed, 3 insertions(+), 6 deletions(-)

diff --git a/lbry/dht/protocol/iterative_find.py b/lbry/dht/protocol/iterative_find.py
index be1e3e8e7..f29f6c048 100644
--- a/lbry/dht/protocol/iterative_find.py
+++ b/lbry/dht/protocol/iterative_find.py
@@ -106,7 +106,7 @@ class IterativeFinder:
         self.delayed_calls: typing.List[asyncio.Handle] = []
         for peer in get_shortlist(routing_table, key, shortlist):
             if peer.node_id:
-                self._add_active(peer)
+                self._add_active(peer, force=True)
             else:
                 # seed nodes
                 self._schedule_probe(peer)
@@ -147,8 +147,8 @@ class IterativeFinder:
     def _is_closer(self, peer: 'KademliaPeer') -> bool:
         return not self.closest_peer or self.distance.is_closer(peer.node_id, self.closest_peer.node_id)
 
-    def _add_active(self, peer):
-        if self.peer_manager.peer_is_good(peer) is False:
+    def _add_active(self, peer, force=False):
+        if not force and self.peer_manager.peer_is_good(peer) is False:
             return
         if self.closest_peer and self.peer_manager.peer_is_good(self.closest_peer) is False:
             log.debug("[%s] closest peer went bad", self.key.hex()[:8])
@@ -211,9 +211,6 @@ class IterativeFinder:
                 continue
             if origin_address == (self.protocol.external_ip, self.protocol.udp_port):
                 continue
-            if self.peer_manager.peer_is_good(peer) is False:
-                self.active.discard(peer)
-                continue
             self._schedule_probe(peer)
             added += 1
         log.debug("running %d probes for key %s", len(self.running_probes), self.key.hex()[:8])

From 6ba8f9651127bcedaeb0a27f824a1fa82415bfb7 Mon Sep 17 00:00:00 2001
From: Victor Shyba <victor.shyba@gmail.com>
Date: Mon, 7 Feb 2022 21:46:43 -0300
Subject: [PATCH 08/32] reset closest peer on failure

---
 lbry/dht/protocol/iterative_find.py | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/lbry/dht/protocol/iterative_find.py b/lbry/dht/protocol/iterative_find.py
index f29f6c048..c2fc10429 100644
--- a/lbry/dht/protocol/iterative_find.py
+++ b/lbry/dht/protocol/iterative_find.py
@@ -176,14 +176,22 @@ class IterativeFinder:
         self.check_result_ready(response)
         self._log_state()
 
+    def _reset_closest(self, peer):
+        if peer == self.prev_closest_peer:
+            self.prev_closest_peer = None
+        if peer == self.closest_peer:
+            self.closest_peer = self.prev_closest_peer
+
     async def _send_probe(self, peer: 'KademliaPeer'):
         try:
             response = await self.send_probe(peer)
         except asyncio.TimeoutError:
+            self._reset_closest(peer)
             self.active.discard(peer)
             return
         except ValueError as err:
             log.warning(str(err))
+            self._reset_closest(peer)
             self.active.discard(peer)
             return
         except TransportNotConnected:

From 44c4b03d447045ebdb0feef13b9649407e0b4f8e Mon Sep 17 00:00:00 2001
From: Victor Shyba <victor.shyba@gmail.com>
Date: Mon, 7 Feb 2022 21:47:10 -0300
Subject: [PATCH 09/32] only return good (contacted) peers

---
 lbry/dht/protocol/iterative_find.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lbry/dht/protocol/iterative_find.py b/lbry/dht/protocol/iterative_find.py
index c2fc10429..ad0f03863 100644
--- a/lbry/dht/protocol/iterative_find.py
+++ b/lbry/dht/protocol/iterative_find.py
@@ -317,7 +317,7 @@ class IterativeNodeFinder(IterativeFinder):
             peer for peer in from_iter
             if peer not in self.yielded_peers
             and peer.node_id != self.protocol.node_id
-            and self.peer_manager.peer_is_good(peer) is not False
+            and self.peer_manager.peer_is_good(peer) is True  # return only peers who answered
         ]
         not_yet_yielded.sort(key=lambda peer: self.distance(peer.node_id))
         to_yield = not_yet_yielded[:max(constants.K, self.max_results)]

From 2884dba52d4440be7be7f20a919c88aa627dcc0d Mon Sep 17 00:00:00 2001
From: Victor Shyba <victor.shyba@gmail.com>
Date: Mon, 7 Feb 2022 23:13:58 -0300
Subject: [PATCH 10/32] wait until k peers are ready; do not double-add peers
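
Roughly, the readiness check the series converges on can be sketched as
follows (standalone, names are illustrative): the search may stop once the
max_results closest known peers have all been contacted and none is known
to be bad.

    def are_k_closest_ready(active_by_distance, contacted, is_good, k):
        closest = active_by_distance[:k]
        return len(active_by_distance) >= k and all(
            peer in contacted and is_good(peer) for peer in closest)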

---
 lbry/dht/protocol/iterative_find.py | 28 +++++++++++++++++++++-------
 1 file changed, 21 insertions(+), 7 deletions(-)

diff --git a/lbry/dht/protocol/iterative_find.py b/lbry/dht/protocol/iterative_find.py
index ad0f03863..31af402c3 100644
--- a/lbry/dht/protocol/iterative_find.py
+++ b/lbry/dht/protocol/iterative_find.py
@@ -115,7 +115,15 @@ class IterativeFinder:
     def is_closest_peer_ready(self):
         if not self.closest_peer or not self.prev_closest_peer:
             return False
-        return self.closest_peer in self.contacted and self.peer_manager.peer_is_good(self.closest_peer) is not False
+        return self.closest_peer in self.contacted and self.peer_manager.peer_is_good(self.closest_peer)
+
+    @property
+    def are_k_closest_peers_ready(self):
+        if not self.is_closest_peer_ready:
+            return False
+        to_probe = list(self.active)
+        to_probe.sort(key=lambda peer: self.distance(peer.node_id))
+        return all(self.peer_manager.peer_is_good(peer) for peer in to_probe[:self.max_results])
 
     async def send_probe(self, peer: 'KademliaPeer') -> FindResponse:
         """
@@ -150,6 +158,8 @@ class IterativeFinder:
     def _add_active(self, peer, force=False):
         if not force and self.peer_manager.peer_is_good(peer) is False:
             return
+        if peer in self.contacted:
+            return
         if self.closest_peer and self.peer_manager.peer_is_good(self.closest_peer) is False:
             log.debug("[%s] closest peer went bad", self.key.hex()[:8])
             if self.prev_closest_peer and self.peer_manager.peer_is_good(self.prev_closest_peer) is not False:
@@ -177,6 +187,7 @@ class IterativeFinder:
         self._log_state()
 
     def _reset_closest(self, peer):
+        self.active.discard(peer)
         if peer == self.prev_closest_peer:
             self.prev_closest_peer = None
         if peer == self.closest_peer:
@@ -187,16 +198,15 @@ class IterativeFinder:
             response = await self.send_probe(peer)
         except asyncio.TimeoutError:
             self._reset_closest(peer)
-            self.active.discard(peer)
             return
         except ValueError as err:
             log.warning(str(err))
             self._reset_closest(peer)
-            self.active.discard(peer)
             return
         except TransportNotConnected:
             return self.aclose()
         except RemoteException:
+            self._reset_closest(peer)
             return
         return await self._handle_probe_result(peer, response)
 
@@ -338,10 +348,11 @@ class IterativeNodeFinder(IterativeFinder):
         else:
             self.bottom_out_count = 0
 
-        if self.bottom_out_count >= self.bottom_out_limit or self.iteration_count >= self.bottom_out_limit:
-            log.debug("limit hit")
+        if self.are_k_closest_peers_ready:
+            self.put_result(self.active, True)
+        elif self.bottom_out_count >= self.bottom_out_limit or self.iteration_count >= self.bottom_out_limit:
+            log.info("peer search bottomed out.")
             self.put_result(self.active, True)
-
 
 class IterativeValueFinder(IterativeFinder):
     def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager',
@@ -408,7 +419,10 @@ class IterativeValueFinder(IterativeFinder):
                 #         self.finished.set()
         elif self.is_closest_peer_ready:
             self.bottom_out_count += 1
-            if self.bottom_out_count >= self.bottom_out_limit:
+            if self.are_k_closest_peers_ready:
+                log.info("blob peer search finished.")
+                self.iteration_queue.put_nowait(None)
+            elif self.bottom_out_count >= self.bottom_out_limit:
                 log.info("blob peer search bottomed out")
                 self.iteration_queue.put_nowait(None)
 

From 023cfb593adfb60bcc0808b68e0c3aaefad34d7a Mon Sep 17 00:00:00 2001
From: Victor Shyba <victor.shyba@gmail.com>
Date: Mon, 7 Feb 2022 23:58:28 -0300
Subject: [PATCH 11/32] bump bottom out limit of peer search so people can use
 100 concurrent announcers

---
 lbry/dht/node.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lbry/dht/node.py b/lbry/dht/node.py
index 345662460..635adc2c4 100644
--- a/lbry/dht/node.py
+++ b/lbry/dht/node.py
@@ -216,7 +216,7 @@ class Node:
                                     key, bottom_out_limit, max_results, None, shortlist)
 
     async def peer_search(self, node_id: bytes, count=constants.K, max_results=constants.K * 2,
-                          bottom_out_limit=20, shortlist: typing.Optional[typing.List['KademliaPeer']] = None
+                          bottom_out_limit=60, shortlist: typing.Optional[typing.List['KademliaPeer']] = None
                           ) -> typing.List['KademliaPeer']:
         peers = []
         async for iteration_peers in self.get_iterative_node_finder(

From 2ed23fbc4bde90ba8514f9556f03e5b9458e52f9 Mon Sep 17 00:00:00 2001
From: Victor Shyba <victor.shyba@gmail.com>
Date: Tue, 8 Feb 2022 17:10:33 -0300
Subject: [PATCH 12/32] log bottom out of peer search in debug, show short key
 id for find value

---
 lbry/dht/protocol/iterative_find.py | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/lbry/dht/protocol/iterative_find.py b/lbry/dht/protocol/iterative_find.py
index 31af402c3..31e6ab56a 100644
--- a/lbry/dht/protocol/iterative_find.py
+++ b/lbry/dht/protocol/iterative_find.py
@@ -351,9 +351,10 @@ class IterativeNodeFinder(IterativeFinder):
         if self.are_k_closest_peers_ready:
             self.put_result(self.active, True)
         elif self.bottom_out_count >= self.bottom_out_limit or self.iteration_count >= self.bottom_out_limit:
-            log.info("peer search bottomed out.")
+            log.debug("peer search bottomed out.")
             self.put_result(self.active, True)
 
+
 class IterativeValueFinder(IterativeFinder):
     def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager',
                  routing_table: 'TreeRoutingTable', protocol: 'KademliaProtocol', key: bytes,
@@ -420,10 +421,10 @@ class IterativeValueFinder(IterativeFinder):
         elif self.is_closest_peer_ready:
             self.bottom_out_count += 1
             if self.are_k_closest_peers_ready:
-                log.info("blob peer search finished.")
+                log.info("blob peer search finished for %s", self.key.hex()[:8])
                 self.iteration_queue.put_nowait(None)
             elif self.bottom_out_count >= self.bottom_out_limit:
-                log.info("blob peer search bottomed out")
+                log.info("blob peer search bottomed out for %s", self.key.hex()[:8])
                 self.iteration_queue.put_nowait(None)
 
     def get_initial_result(self) -> typing.List['KademliaPeer']:

From b7b8831109c790e5dbb8b4dc80836d5b24a3e75b Mon Sep 17 00:00:00 2001
From: Victor Shyba <victor.shyba@gmail.com>
Date: Tue, 8 Feb 2022 19:57:17 -0300
Subject: [PATCH 13/32] use a dict for the active queue
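
A standalone sketch of the data structure change (made-up values): the
active set becomes a dict mapping each peer to its XOR distance from the
search key, re-sorted on insert so iteration always visits the closest
peers first (Python dicts preserve insertion order).

    def add_active(active: dict, peer_id: bytes, key: bytes) -> dict:
        active[peer_id] = int.from_bytes(peer_id, "big") ^ int.from_bytes(key, "big")
        return dict(sorted(active.items(), key=lambda item: item[1]))

    active = add_active({}, bytes.fromhex("ff"), bytes.fromhex("aa"))
    active = add_active(active, bytes.fromhex("ab"), bytes.fromhex("aa"))
    assert list(active) == [bytes.fromhex("ab"), bytes.fromhex("ff")]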

---
 lbry/dht/protocol/iterative_find.py | 37 +++++++++++++++++------------
 1 file changed, 22 insertions(+), 15 deletions(-)

diff --git a/lbry/dht/protocol/iterative_find.py b/lbry/dht/protocol/iterative_find.py
index 31e6ab56a..62c13cd5d 100644
--- a/lbry/dht/protocol/iterative_find.py
+++ b/lbry/dht/protocol/iterative_find.py
@@ -89,7 +89,7 @@ class IterativeFinder:
         self.max_results = max_results
         self.exclude = exclude or []
 
-        self.active: typing.Set['KademliaPeer'] = set()
+        self.active: typing.Dict['KademliaPeer', int] = {}  # peer: distance, sorted
         self.contacted: typing.Set['KademliaPeer'] = set()
         self.distance = Distance(key)
 
@@ -119,11 +119,12 @@ class IterativeFinder:
 
     @property
     def are_k_closest_peers_ready(self):
-        if not self.is_closest_peer_ready:
+        if not self.is_closest_peer_ready or len(self.active) < self.max_results:
             return False
-        to_probe = list(self.active)
-        to_probe.sort(key=lambda peer: self.distance(peer.node_id))
-        return all(self.peer_manager.peer_is_good(peer) for peer in to_probe[:self.max_results])
+        for peer in list(self.active.keys())[:self.max_results]:
+            if peer not in self.contacted or not self.peer_manager.peer_is_good(peer):
+                return False
+        return True
 
     async def send_probe(self, peer: 'KademliaPeer') -> FindResponse:
         """
@@ -169,7 +170,8 @@ class IterativeFinder:
                 self.closest_peer = None
             self.prev_closest_peer = None
         if peer not in self.active and peer.node_id and peer.node_id != self.protocol.node_id:
-            self.active.add(peer)
+            self.active[peer] = self.distance(peer.node_id)
+            self.active = dict(sorted(self.active.items(), key=lambda item: item[1]))
             if self._is_closer(peer):
                 self.prev_closest_peer = self.closest_peer
                 self.closest_peer = peer
@@ -187,7 +189,8 @@ class IterativeFinder:
         self._log_state()
 
     def _reset_closest(self, peer):
-        self.active.discard(peer)
+        if peer in self.active:
+            del self.active[peer]
         if peer == self.prev_closest_peer:
             self.prev_closest_peer = None
         if peer == self.closest_peer:
@@ -216,10 +219,14 @@ class IterativeFinder:
         """
 
         added = 0
-        to_probe = list(self.active - self.contacted)
-        to_probe.sort(key=lambda peer: self.distance(peer.node_id))
-        log.debug("closest to probe: %s", to_probe[0].node_id.hex()[:8] if to_probe else None)
-        for peer in to_probe:
+        for index, peer in enumerate(self.active.keys()):
+            if index == 0:
+                log.debug("closest to probe: %s", peer.node_id.hex()[:8])
+                if self.closest_peer != peer:
+                    self.prev_closest_peer = self.closest_peer
+                    self.closest_peer = peer
+            if peer in self.contacted:
+                continue
             if added >= constants.ALPHA:
                 break
             origin_address = (peer.address, peer.udp_port)
@@ -320,7 +327,7 @@ class IterativeNodeFinder(IterativeFinder):
         return FindNodeResponse(self.key, response)
 
     def search_exhausted(self):
-        self.put_result(self.active, finish=True)
+        self.put_result(self.active.keys(), finish=True)
 
     def put_result(self, from_iter: typing.Iterable['KademliaPeer'], finish=False):
         not_yet_yielded = [
@@ -342,17 +349,17 @@ class IterativeNodeFinder(IterativeFinder):
 
         if found:
             log.debug("found")
-            return self.put_result(self.active, finish=True)
+            return self.put_result(self.active.keys(), finish=True)
         elif self.is_closest_peer_ready:
             self.bottom_out_count += 1
         else:
             self.bottom_out_count = 0
 
         if self.are_k_closest_peers_ready:
-            self.put_result(self.active, True)
+            self.put_result(self.active.keys(), True)
         elif self.bottom_out_count >= self.bottom_out_limit or self.iteration_count >= self.bottom_out_limit:
             log.debug("peer search bottomed out.")
-            self.put_result(self.active, True)
+            self.put_result(self.active.keys(), True)
 
 
 class IterativeValueFinder(IterativeFinder):

From 6335590b65ef6f13fc3cc1c74d04eb491b295e15 Mon Sep 17 00:00:00 2001
From: Victor Shyba <victor.shyba@gmail.com>
Date: Tue, 8 Feb 2022 19:58:28 -0300
Subject: [PATCH 14/32] don't probe peers too far from the top closest

---
 lbry/dht/protocol/iterative_find.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/lbry/dht/protocol/iterative_find.py b/lbry/dht/protocol/iterative_find.py
index 62c13cd5d..e334c3f61 100644
--- a/lbry/dht/protocol/iterative_find.py
+++ b/lbry/dht/protocol/iterative_find.py
@@ -86,7 +86,7 @@ class IterativeFinder:
 
         self.key = key
         self.bottom_out_limit = bottom_out_limit
-        self.max_results = max_results
+        self.max_results = max(constants.K, max_results)
         self.exclude = exclude or []
 
         self.active: typing.Dict['KademliaPeer', int] = {}  # peer: distance, sorted
@@ -229,6 +229,8 @@ class IterativeFinder:
                 continue
             if added >= constants.ALPHA:
                 break
+            if index > self.max_results:
+                break
             origin_address = (peer.address, peer.udp_port)
             if origin_address in self.exclude:
                 continue

From c45f27d5cc3b5b98e4ae3944e2b6b61f44ed91ee Mon Sep 17 00:00:00 2001
From: Victor Shyba <victor.shyba@gmail.com>
Date: Tue, 8 Feb 2022 20:00:29 -0300
Subject: [PATCH 15/32] bottoming out now logs a warning and returns no
 results for peer search

---
 lbry/dht/protocol/iterative_find.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/lbry/dht/protocol/iterative_find.py b/lbry/dht/protocol/iterative_find.py
index e334c3f61..dfeb4795a 100644
--- a/lbry/dht/protocol/iterative_find.py
+++ b/lbry/dht/protocol/iterative_find.py
@@ -360,8 +360,8 @@ class IterativeNodeFinder(IterativeFinder):
         if self.are_k_closest_peers_ready:
             self.put_result(self.active.keys(), True)
         elif self.bottom_out_count >= self.bottom_out_limit or self.iteration_count >= self.bottom_out_limit:
-            log.debug("peer search bottomed out.")
-            self.put_result(self.active.keys(), True)
+            log.warning("peer search bottomed out.")
+            self.put_result([], True)
 
 
 class IterativeValueFinder(IterativeFinder):

From dc1c0e6851090a6b3837c1e6fe634966d126d3bd Mon Sep 17 00:00:00 2001
From: Victor Shyba <victor.shyba@gmail.com>
Date: Thu, 10 Feb 2022 01:48:11 -0300
Subject: [PATCH 16/32] no stop condition, let it exhaust
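
With the readiness checks and bottom-out counters gone, termination is by
exhaustion. A rough sketch of the resulting condition (names here are
illustrative, not the patch's):

    def search_exhausted(active, contacted, running_probes) -> bool:
        # nothing in flight and nothing left that we haven't contacted
        return not running_probes and all(peer in contacted for peer in active)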

---
 lbry/dht/protocol/iterative_find.py | 97 ++++++-----------------------
 1 file changed, 18 insertions(+), 79 deletions(-)

diff --git a/lbry/dht/protocol/iterative_find.py b/lbry/dht/protocol/iterative_find.py
index dfeb4795a..2fdc602eb 100644
--- a/lbry/dht/protocol/iterative_find.py
+++ b/lbry/dht/protocol/iterative_find.py
@@ -93,17 +93,14 @@ class IterativeFinder:
         self.contacted: typing.Set['KademliaPeer'] = set()
         self.distance = Distance(key)
 
-        self.closest_peer: typing.Optional['KademliaPeer'] = None
-        self.prev_closest_peer: typing.Optional['KademliaPeer'] = None
-
         self.iteration_queue = asyncio.Queue(loop=self.loop)
 
-        self.running_probes: typing.Set[asyncio.Task] = set()
+        self.running_probes: typing.Dict['KademliaPeer', asyncio.Task] = {}
         self.iteration_count = 0
         self.bottom_out_count = 0
         self.running = False
         self.tasks: typing.List[asyncio.Task] = []
-        self.delayed_calls: typing.List[asyncio.Handle] = []
+        self.delayed_call: asyncio.Handle = None
         for peer in get_shortlist(routing_table, key, shortlist):
             if peer.node_id:
                 self._add_active(peer, force=True)
@@ -111,21 +108,6 @@ class IterativeFinder:
                 # seed nodes
                 self._schedule_probe(peer)
 
-    @property
-    def is_closest_peer_ready(self):
-        if not self.closest_peer or not self.prev_closest_peer:
-            return False
-        return self.closest_peer in self.contacted and self.peer_manager.peer_is_good(self.closest_peer)
-
-    @property
-    def are_k_closest_peers_ready(self):
-        if not self.is_closest_peer_ready or len(self.active) < self.max_results:
-            return False
-        for peer in list(self.active.keys())[:self.max_results]:
-            if peer not in self.contacted or not self.peer_manager.peer_is_good(peer):
-                return False
-        return True
-
     async def send_probe(self, peer: 'KademliaPeer') -> FindResponse:
         """
         Send the rpc request to the peer and return an object with the FindResponse interface
@@ -153,28 +135,14 @@ class IterativeFinder:
         """
         return []
 
-    def _is_closer(self, peer: 'KademliaPeer') -> bool:
-        return not self.closest_peer or self.distance.is_closer(peer.node_id, self.closest_peer.node_id)
-
     def _add_active(self, peer, force=False):
         if not force and self.peer_manager.peer_is_good(peer) is False:
             return
         if peer in self.contacted:
             return
-        if self.closest_peer and self.peer_manager.peer_is_good(self.closest_peer) is False:
-            log.debug("[%s] closest peer went bad", self.key.hex()[:8])
-            if self.prev_closest_peer and self.peer_manager.peer_is_good(self.prev_closest_peer) is not False:
-                log.debug("[%s] previous closest was bad too", self.key.hex()[:8])
-                self.closest_peer = self.prev_closest_peer
-            else:
-                self.closest_peer = None
-            self.prev_closest_peer = None
         if peer not in self.active and peer.node_id and peer.node_id != self.protocol.node_id:
             self.active[peer] = self.distance(peer.node_id)
             self.active = dict(sorted(self.active.items(), key=lambda item: item[1]))
-            if self._is_closer(peer):
-                self.prev_closest_peer = self.closest_peer
-                self.closest_peer = peer
 
     async def _handle_probe_result(self, peer: 'KademliaPeer', response: FindResponse):
         self._add_active(peer)
@@ -191,10 +159,6 @@ class IterativeFinder:
     def _reset_closest(self, peer):
         if peer in self.active:
             del self.active[peer]
-        if peer == self.prev_closest_peer:
-            self.prev_closest_peer = None
-        if peer == self.closest_peer:
-            self.closest_peer = self.prev_closest_peer
 
     async def _send_probe(self, peer: 'KademliaPeer'):
         try:
@@ -213,7 +177,7 @@ class IterativeFinder:
             return
         return await self._handle_probe_result(peer, response)
 
-    async def _search_round(self):
+    def _search_round(self):
         """
         Send up to constants.alpha (5) probes to closest active peers
         """
@@ -222,14 +186,11 @@ class IterativeFinder:
         for index, peer in enumerate(self.active.keys()):
             if index == 0:
                 log.debug("closest to probe: %s", peer.node_id.hex()[:8])
-                if self.closest_peer != peer:
-                    self.prev_closest_peer = self.closest_peer
-                    self.closest_peer = peer
             if peer in self.contacted:
                 continue
-            if added >= constants.ALPHA:
+            if len(self.running_probes) >= constants.ALPHA:
                 break
-            if index > self.max_results:
+            if index > (constants.K - 1):
                 break
             origin_address = (peer.address, peer.udp_port)
             if origin_address in self.exclude:
@@ -251,21 +212,21 @@ class IterativeFinder:
         t = self.loop.create_task(self._send_probe(peer))
 
         def callback(_):
-            self.running_probes.difference_update({
-                probe for probe in self.running_probes if probe.done() or probe == t
-            })
-            if not self.running_probes:
-                self.tasks.append(self.loop.create_task(self._search_task(0.0)))
+            for peer in [peer for peer, task in self.running_probes.items() if task.done() or task == t]:
+                del self.running_probes[peer]
+            self._search_task(0.0)
 
         t.add_done_callback(callback)
-        self.running_probes.add(t)
+        self.running_probes[peer] = t
 
-    async def _search_task(self, delay: typing.Optional[float] = constants.ITERATIVE_LOOKUP_DELAY):
+    def _search_task(self, delay: typing.Optional[float] = constants.ITERATIVE_LOOKUP_DELAY):
         try:
             if self.running:
-                await self._search_round()
-            if self.running:
-                self.delayed_calls.append(self.loop.call_later(delay, self._search))
+                if self.delayed_call:
+                    self.delayed_call.cancel()  # ensure anything scheduled gets cancelled
+                self._search_round()
+            #if self.running:
+            #    self.delayed_call = self.loop.call_later(delay, self._search)
         except (asyncio.CancelledError, StopAsyncIteration, TransportNotConnected):
             if self.running:
                 self.loop.call_soon(self.aclose)
@@ -273,13 +234,9 @@ class IterativeFinder:
     def _log_state(self):
         log.debug("[%s] check result: %i active nodes %i contacted %i bottomed count",
                   self.key.hex()[:8], len(self.active), len(self.contacted), self.bottom_out_count)
-        if self.closest_peer and self.prev_closest_peer:
-            log.debug("[%s] best node id: %s (contacted: %s, good: %s), previous best: %s",
-                      self.key.hex()[:8], self.closest_peer.node_id.hex()[:8], self.closest_peer in self.contacted,
-                      self.peer_manager.peer_is_good(self.closest_peer), self.prev_closest_peer.node_id.hex()[:8])
 
     def _search(self):
-        self.tasks.append(self.loop.create_task(self._search_task()))
+        self._search_task()
 
     def __aiter__(self):
         if self.running:
@@ -305,11 +262,11 @@ class IterativeFinder:
     def aclose(self):
         self.running = False
         self.iteration_queue.put_nowait(None)
-        for task in chain(self.tasks, self.running_probes, self.delayed_calls):
+        for task in chain(self.tasks, self.running_probes.values(), filter(None, [self.delayed_call])):
             task.cancel()
         self.tasks.clear()
         self.running_probes.clear()
-        self.delayed_calls.clear()
+        self.delayed_call = None
 
 
 class IterativeNodeFinder(IterativeFinder):
@@ -352,16 +309,6 @@ class IterativeNodeFinder(IterativeFinder):
         if found:
             log.debug("found")
             return self.put_result(self.active.keys(), finish=True)
-        elif self.is_closest_peer_ready:
-            self.bottom_out_count += 1
-        else:
-            self.bottom_out_count = 0
-
-        if self.are_k_closest_peers_ready:
-            self.put_result(self.active.keys(), True)
-        elif self.bottom_out_count >= self.bottom_out_limit or self.iteration_count >= self.bottom_out_limit:
-            log.warning("peer search bottomed out.")
-            self.put_result([], True)
 
 
 class IterativeValueFinder(IterativeFinder):
@@ -427,14 +374,6 @@ class IterativeValueFinder(IterativeFinder):
                 #     log.info("enough blob peers found")
                 #     if not self.finished.is_set():
                 #         self.finished.set()
-        elif self.is_closest_peer_ready:
-            self.bottom_out_count += 1
-            if self.are_k_closest_peers_ready:
-                log.info("blob peer search finished for %s", self.key.hex()[:8])
-                self.iteration_queue.put_nowait(None)
-            elif self.bottom_out_count >= self.bottom_out_limit:
-                log.info("blob peer search bottomed out for %s", self.key.hex()[:8])
-                self.iteration_queue.put_nowait(None)
 
     def get_initial_result(self) -> typing.List['KademliaPeer']:
         if self.protocol.data_store.has_peers_for_blob(self.key):

From dcde0e78e303238b29803aef541153e327a11a3e Mon Sep 17 00:00:00 2001
From: Victor Shyba <victor.shyba@gmail.com>
Date: Fri, 11 Feb 2022 19:45:08 -0300
Subject: [PATCH 17/32] remove all references to bottoming out

---
 lbry/dht/constants.py               |  1 -
 lbry/dht/node.py                    | 10 ++++------
 lbry/dht/protocol/iterative_find.py | 22 +++++++---------------
 lbry/extras/daemon/daemon.py        | 13 +------------
 4 files changed, 12 insertions(+), 34 deletions(-)

diff --git a/lbry/dht/constants.py b/lbry/dht/constants.py
index 07dcec18a..7380ce60a 100644
--- a/lbry/dht/constants.py
+++ b/lbry/dht/constants.py
@@ -20,7 +20,6 @@ MAYBE_PING_DELAY = 300  # 5 minutes
 CHECK_REFRESH_INTERVAL = REFRESH_INTERVAL / 5
 RPC_ID_LENGTH = 20
 PROTOCOL_VERSION = 1
-BOTTOM_OUT_LIMIT = 3
 MSG_SIZE_LIMIT = 1400
 
 
diff --git a/lbry/dht/node.py b/lbry/dht/node.py
index 635adc2c4..864edc077 100644
--- a/lbry/dht/node.py
+++ b/lbry/dht/node.py
@@ -202,25 +202,23 @@ class Node:
         self._join_task = self.loop.create_task(self.join_network(interface, known_node_urls))
 
     def get_iterative_node_finder(self, key: bytes, shortlist: typing.Optional[typing.List['KademliaPeer']] = None,
-                                  bottom_out_limit: int = constants.BOTTOM_OUT_LIMIT,
                                   max_results: int = constants.K) -> IterativeNodeFinder:
 
         return IterativeNodeFinder(self.loop, self.protocol.peer_manager, self.protocol.routing_table, self.protocol,
-                                   key, bottom_out_limit, max_results, None, shortlist)
+                                   key, max_results, None, shortlist)
 
     def get_iterative_value_finder(self, key: bytes, shortlist: typing.Optional[typing.List['KademliaPeer']] = None,
-                                   bottom_out_limit: int = 40,
                                    max_results: int = -1) -> IterativeValueFinder:
 
         return IterativeValueFinder(self.loop, self.protocol.peer_manager, self.protocol.routing_table, self.protocol,
-                                    key, bottom_out_limit, max_results, None, shortlist)
+                                    key, max_results, None, shortlist)
 
     async def peer_search(self, node_id: bytes, count=constants.K, max_results=constants.K * 2,
-                          bottom_out_limit=60, shortlist: typing.Optional[typing.List['KademliaPeer']] = None
+                          shortlist: typing.Optional[typing.List['KademliaPeer']] = None
                           ) -> typing.List['KademliaPeer']:
         peers = []
         async for iteration_peers in self.get_iterative_node_finder(
-                node_id, shortlist=shortlist, bottom_out_limit=bottom_out_limit, max_results=max_results):
+                node_id, shortlist=shortlist, max_results=max_results):
             peers.extend(iteration_peers)
         distance = Distance(node_id)
         peers.sort(key=lambda peer: distance(peer.node_id))
diff --git a/lbry/dht/protocol/iterative_find.py b/lbry/dht/protocol/iterative_find.py
index 2fdc602eb..68c04692d 100644
--- a/lbry/dht/protocol/iterative_find.py
+++ b/lbry/dht/protocol/iterative_find.py
@@ -74,7 +74,7 @@ def get_shortlist(routing_table: 'TreeRoutingTable', key: bytes,
 class IterativeFinder:
     def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager',
                  routing_table: 'TreeRoutingTable', protocol: 'KademliaProtocol', key: bytes,
-                 bottom_out_limit: typing.Optional[int] = 2, max_results: typing.Optional[int] = constants.K,
+                 max_results: typing.Optional[int] = constants.K,
                  exclude: typing.Optional[typing.List[typing.Tuple[str, int]]] = None,
                  shortlist: typing.Optional[typing.List['KademliaPeer']] = None):
         if len(key) != constants.HASH_LENGTH:
@@ -85,7 +85,6 @@ class IterativeFinder:
         self.protocol = protocol
 
         self.key = key
-        self.bottom_out_limit = bottom_out_limit
         self.max_results = max(constants.K, max_results)
         self.exclude = exclude or []
 
@@ -97,7 +96,6 @@ class IterativeFinder:
 
         self.running_probes: typing.Dict['KademliaPeer', asyncio.Task] = {}
         self.iteration_count = 0
-        self.bottom_out_count = 0
         self.running = False
         self.tasks: typing.List[asyncio.Task] = []
         self.delayed_call: asyncio.Handle = None
@@ -232,8 +230,8 @@ class IterativeFinder:
                 self.loop.call_soon(self.aclose)
 
     def _log_state(self):
-        log.debug("[%s] check result: %i active nodes %i contacted %i bottomed count",
-                  self.key.hex()[:8], len(self.active), len(self.contacted), self.bottom_out_count)
+        log.debug("[%s] check result: %i active nodes %i contacted",
+                  self.key.hex()[:8], len(self.active), len(self.contacted))
 
     def _search(self):
         self._search_task()
@@ -272,10 +270,10 @@ class IterativeFinder:
 class IterativeNodeFinder(IterativeFinder):
     def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager',
                  routing_table: 'TreeRoutingTable', protocol: 'KademliaProtocol', key: bytes,
-                 bottom_out_limit: typing.Optional[int] = 2, max_results: typing.Optional[int] = constants.K,
+                 max_results: typing.Optional[int] = constants.K,
                  exclude: typing.Optional[typing.List[typing.Tuple[str, int]]] = None,
                  shortlist: typing.Optional[typing.List['KademliaPeer']] = None):
-        super().__init__(loop, peer_manager, routing_table, protocol, key, bottom_out_limit, max_results, exclude,
+        super().__init__(loop, peer_manager, routing_table, protocol, key, max_results, exclude,
                          shortlist)
         self.yielded_peers: typing.Set['KademliaPeer'] = set()
 
@@ -314,10 +312,10 @@ class IterativeNodeFinder(IterativeFinder):
 class IterativeValueFinder(IterativeFinder):
     def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager',
                  routing_table: 'TreeRoutingTable', protocol: 'KademliaProtocol', key: bytes,
-                 bottom_out_limit: typing.Optional[int] = 2, max_results: typing.Optional[int] = constants.K,
+                 max_results: typing.Optional[int] = constants.K,
                  exclude: typing.Optional[typing.List[typing.Tuple[str, int]]] = None,
                  shortlist: typing.Optional[typing.List['KademliaPeer']] = None):
-        super().__init__(loop, peer_manager, routing_table, protocol, key, bottom_out_limit, max_results, exclude,
+        super().__init__(loop, peer_manager, routing_table, protocol, key, max_results, exclude,
                          shortlist)
         self.blob_peers: typing.Set['KademliaPeer'] = set()
         # this tracks the index of the most recent page we requested from each peer
@@ -362,18 +360,12 @@ class IterativeValueFinder(IterativeFinder):
             blob_peers = [self.peer_manager.decode_tcp_peer_from_compact_address(compact_addr)
                           for compact_addr in response.found_compact_addresses]
             to_yield = []
-            self.bottom_out_count = 0
             for blob_peer in blob_peers:
                 if blob_peer not in self.blob_peers:
                     self.blob_peers.add(blob_peer)
                     to_yield.append(blob_peer)
             if to_yield:
-                # log.info("found %i new peers for blob", len(to_yield))
                 self.iteration_queue.put_nowait(to_yield)
-                # if self.max_results and len(self.blob_peers) >= self.max_results:
-                #     log.info("enough blob peers found")
-                #     if not self.finished.is_set():
-                #         self.finished.set()
 
     def get_initial_result(self) -> typing.List['KademliaPeer']:
         if self.protocol.data_store.has_peers_for_blob(self.key):
diff --git a/lbry/extras/daemon/daemon.py b/lbry/extras/daemon/daemon.py
index 003ea9cf7..fb25207f7 100644
--- a/lbry/extras/daemon/daemon.py
+++ b/lbry/extras/daemon/daemon.py
@@ -4885,20 +4885,16 @@ class Daemon(metaclass=JSONRPCServerType):
     """
 
     @requires(DHT_COMPONENT)
-    async def jsonrpc_peer_list(self, blob_hash, search_bottom_out_limit=None, page=None, page_size=None):
+    async def jsonrpc_peer_list(self, blob_hash, page=None, page_size=None):
         """
         Get peers for blob hash
 
         Usage:
             peer_list (<blob_hash> | --blob_hash=<blob_hash>)
-                [<search_bottom_out_limit> | --search_bottom_out_limit=<search_bottom_out_limit>]
                 [--page=<page>] [--page_size=<page_size>]
 
         Options:
             --blob_hash=<blob_hash>                                  : (str) find available peers for this blob hash
-            --search_bottom_out_limit=<search_bottom_out_limit>      : (int) the number of search probes in a row
-                                                                             that don't find any new peers
-                                                                             before giving up and returning
             --page=<page>                                            : (int) page to return during paginating
             --page_size=<page_size>                                  : (int) number of items on page during pagination
 
@@ -4910,13 +4906,6 @@ class Daemon(metaclass=JSONRPCServerType):
         if not is_valid_blobhash(blob_hash):
             # TODO: use error from lbry.error
             raise Exception("invalid blob hash")
-        if search_bottom_out_limit is not None:
-            search_bottom_out_limit = int(search_bottom_out_limit)
-            if search_bottom_out_limit <= 0:
-                # TODO: use error from lbry.error
-                raise Exception("invalid bottom out limit")
-        else:
-            search_bottom_out_limit = 4
         peers = []
         peer_q = asyncio.Queue(loop=self.component_manager.loop)
         await self.dht_node._peers_for_value_producer(blob_hash, peer_q)

From 12f156257e35dd862dad399fbce5e5f6fcf68baa Mon Sep 17 00:00:00 2001
From: Victor Shyba <victor.shyba@gmail.com>
Date: Sat, 12 Feb 2022 03:28:26 -0300
Subject: [PATCH 18/32] allow running some extra probes for k replacements

---
 lbry/dht/protocol/iterative_find.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/lbry/dht/protocol/iterative_find.py b/lbry/dht/protocol/iterative_find.py
index 68c04692d..bbfc93bd6 100644
--- a/lbry/dht/protocol/iterative_find.py
+++ b/lbry/dht/protocol/iterative_find.py
@@ -184,11 +184,12 @@ class IterativeFinder:
         for index, peer in enumerate(self.active.keys()):
             if index == 0:
                 log.debug("closest to probe: %s", peer.node_id.hex()[:8])
+
             if peer in self.contacted:
                 continue
             if len(self.running_probes) >= constants.ALPHA:
                 break
-            if index > (constants.K - 1):
+            if index > (constants.K + len(self.running_probes)):
                 break
             origin_address = (peer.address, peer.udp_port)
             if origin_address in self.exclude:

From 0120d989d8357b5450b2bd8136028b25c4687e83 Mon Sep 17 00:00:00 2001
From: Victor Shyba <victor.shyba@gmail.com>
Date: Fri, 18 Feb 2022 17:21:37 -0300
Subject: [PATCH 19/32] make timeout handler immune to asyncio time tricks
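
A condensed sketch of the pattern (standalone, simplified names): the
handler compares against the wall clock and re-arms itself, so tests that
fast-forward the event loop's clock no longer fire spurious timeouts.

    import time

    def arm_timeout(loop, timeout, cancel, started=None):
        started = started or time.time()
        def check():
            if time.time() - started >= timeout:
                cancel()                                     # really timed out
            else:
                arm_timeout(loop, timeout, cancel, started)  # re-arm and re-check
        loop.call_later(timeout, check)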

---
 lbry/testcase.py | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/lbry/testcase.py b/lbry/testcase.py
index b10ea9b27..6214553e5 100644
--- a/lbry/testcase.py
+++ b/lbry/testcase.py
@@ -204,7 +204,13 @@ class AsyncioTestCase(unittest.TestCase):
 
     def add_timeout(self):
         if self.TIMEOUT:
-            self.loop.call_later(self.TIMEOUT, self.cancel)
+            self.loop.call_later(self.TIMEOUT, self.check_timeout, time())
+
+    def check_timeout(self, started):
+        if time() - started >= self.TIMEOUT:
+            self.cancel()
+        else:
+            self.loop.call_later(self.TIMEOUT, self.check_timeout, started)
 
 
 class AdvanceTimeTestCase(AsyncioTestCase):

From 51be734a0850c5a60b8048059d36078061af6607 Mon Sep 17 00:00:00 2001
From: Victor Shyba <victor.shyba@gmail.com>
Date: Fri, 18 Feb 2022 18:47:01 -0300
Subject: [PATCH 20/32] add a way to wait for announcements to finish so
 tests are reliable
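
The waiting mechanism is the usual asyncio set-then-clear pattern, sketched
here outside the diff: set() wakes everything currently awaiting wait(),
and the immediate clear() makes later wait() calls block until the next
announce round completes.

    import asyncio

    done = asyncio.Event()

    async def announce_round():
        # ... announce a batch of blobs ...
        done.set()    # release all current waiters
        done.clear()  # the next wait() blocks until the next round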

---
 lbry/dht/blob_announcer.py | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/lbry/dht/blob_announcer.py b/lbry/dht/blob_announcer.py
index 24cf18bbe..e4da3cfa9 100644
--- a/lbry/dht/blob_announcer.py
+++ b/lbry/dht/blob_announcer.py
@@ -27,6 +27,7 @@ class BlobAnnouncer:
         self.storage = storage
         self.announce_task: asyncio.Task = None
         self.announce_queue: typing.List[str] = []
+        self._done = asyncio.Event()
 
     async def _submit_announcement(self, blob_hash):
         try:
@@ -64,6 +65,8 @@ class BlobAnnouncer:
                 if announced:
                     await self.storage.update_last_announced_blobs(announced)
                     log.info("announced %i blobs", len(announced))
+            self._done.set()
+            self._done.clear()
 
     def start(self, batch_size: typing.Optional[int] = 10):
         assert not self.announce_task or self.announce_task.done(), "already running"
@@ -72,3 +75,6 @@ class BlobAnnouncer:
     def stop(self):
         if self.announce_task and not self.announce_task.done():
             self.announce_task.cancel()
+
+    def wait(self):
+        return self._done.wait()

From b574fb7771e3f82bcb0693ccc9fdba84d500f118 Mon Sep 17 00:00:00 2001
From: Victor Shyba <victor.shyba@gmail.com>
Date: Fri, 18 Feb 2022 18:51:08 -0300
Subject: [PATCH 21/32] better representation of KademliaPeer in debug logs

---
 lbry/dht/peer.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/lbry/dht/peer.py b/lbry/dht/peer.py
index 495f71eff..c5a9c9e84 100644
--- a/lbry/dht/peer.py
+++ b/lbry/dht/peer.py
@@ -190,3 +190,6 @@ class KademliaPeer:
 
     def compact_ip(self):
         return make_compact_ip(self.address)
+
+    def __str__(self):
+        return f"{self.__class__.__name__}({self.node_id.hex()[:8]}@{self.address}:{self.udp_port}-{self.tcp_port})"

From 586b09c1bc87148b9ba63a8cb1dc87693c6194e0 Mon Sep 17 00:00:00 2001
From: Victor Shyba <victor.shyba@gmail.com>
Date: Fri, 18 Feb 2022 18:52:17 -0300
Subject: [PATCH 22/32] simplify dht mock and restore clock after accelerating
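
A minimal standalone sketch of the accelerator's core trick: instead of
freezing loop.time() at a fake value, it adds a drift on top of the real
clock, so time keeps flowing and the patch is trivially reversible.

    import asyncio

    loop = asyncio.new_event_loop()
    original, drift = loop.time, 10.0
    loop.time = lambda: original() + drift  # the clock now reads 10s ahead
    assert loop.time() - original() >= 10.0
    loop.time = original                    # restore the real clock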

---
 tests/dht_mocks.py | 26 ++++++++------------------
 1 file changed, 8 insertions(+), 18 deletions(-)

diff --git a/tests/dht_mocks.py b/tests/dht_mocks.py
index 2e01986c0..4bebcfaf1 100644
--- a/tests/dht_mocks.py
+++ b/tests/dht_mocks.py
@@ -9,7 +9,7 @@ if typing.TYPE_CHECKING:
 
 
 def get_time_accelerator(loop: asyncio.AbstractEventLoop,
-                         now: typing.Optional[float] = None) -> typing.Callable[[float], typing.Awaitable[None]]:
+                         instant_step: bool = False) -> typing.Callable[[float], typing.Awaitable[None]]:
     """
     Returns an async advance() function
 
@@ -17,32 +17,22 @@ def get_time_accelerator(loop: asyncio.AbstractEventLoop,
     made by call_later, call_at, and call_soon.
     """
 
-    _time = now or loop.time()
-    loop.time = functools.wraps(loop.time)(lambda: _time)
+    original = loop.time
+    _drift = 0
+    loop.time = functools.wraps(loop.time)(lambda: original() + _drift)
 
     async def accelerate_time(seconds: float) -> None:
-        nonlocal _time
+        nonlocal _drift
         if seconds < 0:
             raise ValueError(f'Cannot go back in time ({seconds} seconds)')
-        _time += seconds
-        await past_events()
+        _drift += seconds
         await asyncio.sleep(0)
 
-    async def past_events() -> None:
-        while loop._scheduled:
-            timer: asyncio.TimerHandle = loop._scheduled[0]
-            if timer not in loop._ready and timer._when <= _time:
-                loop._scheduled.remove(timer)
-                loop._ready.append(timer)
-            if timer._when > _time:
-                break
-            await asyncio.sleep(0)
-
     async def accelerator(seconds: float):
-        steps = seconds * 10.0
+        steps = seconds * 10.0 if not instant_step else 1
 
         for _ in range(max(int(steps), 1)):
-            await accelerate_time(0.1)
+            await accelerate_time(seconds/steps)
 
     return accelerator
 

From cc104369cb0d39652608e05aa7c6c1980e3b894f Mon Sep 17 00:00:00 2001
From: Victor Shyba <victor.shyba@gmail.com>
Date: Fri, 18 Feb 2022 18:53:10 -0300
Subject: [PATCH 23/32] fix and enable test_blob_announcer

---
 tests/unit/dht/test_blob_announcer.py | 135 +++++++++++++++-----------
 tests/unit/dht/test_node.py           |   4 +-
 2 files changed, 78 insertions(+), 61 deletions(-)

diff --git a/tests/unit/dht/test_blob_announcer.py b/tests/unit/dht/test_blob_announcer.py
index 5c7d921aa..d5b2c5e17 100644
--- a/tests/unit/dht/test_blob_announcer.py
+++ b/tests/unit/dht/test_blob_announcer.py
@@ -1,60 +1,70 @@
 import contextlib
+import logging
 import typing
 import binascii
 import socket
 import asyncio
+
 from lbry.testcase import AsyncioTestCase
 from tests import dht_mocks
+from lbry.dht.protocol.distance import Distance
 from lbry.conf import Config
 from lbry.dht import constants
 from lbry.dht.node import Node
 from lbry.dht.peer import PeerManager, make_kademlia_peer
 from lbry.dht.blob_announcer import BlobAnnouncer
 from lbry.extras.daemon.storage import SQLiteStorage
-from unittest import skip
+
 
 class TestBlobAnnouncer(AsyncioTestCase):
+    TIMEOUT = 20.0  # lower than default
+
     async def setup_node(self, peer_addresses, address, node_id):
         self.nodes: typing.Dict[int, Node] = {}
-        self.advance = dht_mocks.get_time_accelerator(self.loop, self.loop.time())
+        self.advance = dht_mocks.get_time_accelerator(self.loop)
+        self.instant_advance = dht_mocks.get_time_accelerator(self.loop, instant_step=True)
         self.conf = Config()
-        self.storage = SQLiteStorage(self.conf, ":memory:", self.loop, self.loop.time)
-        await self.storage.open()
         self.peer_manager = PeerManager(self.loop)
         self.node = Node(self.loop, self.peer_manager, node_id, 4444, 4444, 3333, address)
         await self.node.start_listening(address)
-        self.blob_announcer = BlobAnnouncer(self.loop, self.node, self.storage)
-        for node_id, address in peer_addresses:
-            await self.add_peer(node_id, address)
+        await asyncio.gather(*[self.add_peer(node_id, address) for node_id, address in peer_addresses])
+        for first_peer in self.nodes.values():
+            for second_peer in self.nodes.values():
+                if first_peer == second_peer:
+                    continue
+                self.add_peer_to_routing_table(first_peer, second_peer)
+                self.add_peer_to_routing_table(second_peer, first_peer)
+        await self.advance(0.1)  # just to make pings go through
         self.node.joined.set()
         self.node._refresh_task = self.loop.create_task(self.node.refresh_node())
+        self.storage = SQLiteStorage(self.conf, ":memory:", self.loop, self.loop.time)
+        await self.storage.open()
+        self.blob_announcer = BlobAnnouncer(self.loop, self.node, self.storage)
 
     async def add_peer(self, node_id, address, add_to_routing_table=True):
+        #print('add', node_id.hex()[:8], address)
         n = Node(self.loop, PeerManager(self.loop), node_id, 4444, 4444, 3333, address)
         await n.start_listening(address)
         self.nodes.update({len(self.nodes): n})
         if add_to_routing_table:
-            self.node.protocol.add_peer(
-                make_kademlia_peer(
-                    n.protocol.node_id, n.protocol.external_ip, n.protocol.udp_port
-                )
+            self.add_peer_to_routing_table(self.node, n)
+
+    def add_peer_to_routing_table(self, adder, being_added):
+        adder.protocol.add_peer(
+            make_kademlia_peer(
+                being_added.protocol.node_id, being_added.protocol.external_ip, being_added.protocol.udp_port
             )
+        )
 
     @contextlib.asynccontextmanager
-    async def _test_network_context(self, peer_addresses=None):
-        self.peer_addresses = peer_addresses or [
-            (constants.generate_id(2), '1.2.3.2'),
-            (constants.generate_id(3), '1.2.3.3'),
-            (constants.generate_id(4), '1.2.3.4'),
-            (constants.generate_id(5), '1.2.3.5'),
-            (constants.generate_id(6), '1.2.3.6'),
-            (constants.generate_id(7), '1.2.3.7'),
-            (constants.generate_id(8), '1.2.3.8'),
-            (constants.generate_id(9), '1.2.3.9'),
+    async def _test_network_context(self, peer_count=200):
+        self.peer_addresses = [
+            (constants.generate_id(i), socket.inet_ntoa(int(i + 0x01000001).to_bytes(length=4, byteorder='big')))
+            for i in range(1, peer_count + 1)
         ]
         try:
             with dht_mocks.mock_network_loop(self.loop):
-                await self.setup_node(self.peer_addresses, '1.2.3.1', constants.generate_id(1))
+                await self.setup_node(self.peer_addresses, '1.2.3.1', constants.generate_id(1000))
                 yield
         finally:
             self.blob_announcer.stop()
@@ -73,43 +83,58 @@ class TestBlobAnnouncer(AsyncioTestCase):
             )
         )
         await peer.ping()
-        return peer
+        return last_node
 
-    @skip("Something from a previous test is leaking into this test and causing it to fail intermittently")
     async def test_announce_blobs(self):
         blob1 = binascii.hexlify(b'1' * 48).decode()
         blob2 = binascii.hexlify(b'2' * 48).decode()
 
-        async with self._test_network_context():
-            await self.storage.add_blobs((blob1, 1024), (blob2, 1024), finished=True)
-            await self.storage.db.execute(
-                "update blob set next_announce_time=0, should_announce=1 where blob_hash in (?, ?)",
-                (blob1, blob2)
-            )
+        async with self._test_network_context(peer_count=100):
+            await self.storage.add_blobs((blob1, 1024, 0, True), (blob2, 1024, 0, True), finished=True)
+            await self.storage.add_blobs(
+                *((constants.generate_id(value).hex(), 1024, 0, True) for value in range(1000, 1090)),
+                finished=True)
+            await self.storage.db.execute("update blob set next_announce_time=0, should_announce=1")
             to_announce = await self.storage.get_blobs_to_announce()
-            self.assertEqual(2, len(to_announce))
-            self.blob_announcer.start(batch_size=1)  # so it covers batching logic
+            self.assertEqual(92, len(to_announce))
+            self.blob_announcer.start(batch_size=10)  # so it covers batching logic
-            # takes 60 seconds to start, but we advance 120 to ensure it processed all batches
-            await self.advance(60.0 * 2)
+            # takes 60 seconds to start; advance the clock, then wait for the announcer to finish
+            ongoing_announcements = asyncio.ensure_future(self.blob_announcer.wait())
+            await self.instant_advance(60.0)
+            await ongoing_announcements
             to_announce = await self.storage.get_blobs_to_announce()
             self.assertEqual(0, len(to_announce))
             self.blob_announcer.stop()
 
+            # as routing table pollution will cause some peers to be hard to reach, we add a tolerance for CI
+            tolerance = 0.8  # at least 80% of the announcements are within the top K
+            for blob in await self.storage.get_all_blob_hashes():
+                distance = Distance(bytes.fromhex(blob))
+                candidates = list(self.nodes.values())
+                candidates.sort(key=lambda sorting_node: distance(sorting_node.protocol.node_id))
+                has_it = 0
+                for index, node in enumerate(candidates[:constants.K], start=1):
+                    if node.protocol.data_store.get_peers_for_blob(bytes.fromhex(blob)):
+                        has_it += 1
+                    else:
+                        logging.warning("blob %s wasn't found among the best K (%s)", blob[:8], node.protocol.node_id.hex()[:8])
+                self.assertGreaterEqual(has_it, int(tolerance * constants.K))
+
+
             # test that we can route from a poorly connected peer all the way to the announced blob
 
-            await self.chain_peer(constants.generate_id(10), '1.2.3.10')
-            await self.chain_peer(constants.generate_id(11), '1.2.3.11')
-            await self.chain_peer(constants.generate_id(12), '1.2.3.12')
-            await self.chain_peer(constants.generate_id(13), '1.2.3.13')
-            await self.chain_peer(constants.generate_id(14), '1.2.3.14')
-            await self.advance(61.0)
+            current = len(self.nodes)
+            await self.chain_peer(constants.generate_id(current + 1), '1.2.3.10')
+            await self.chain_peer(constants.generate_id(current + 2), '1.2.3.11')
+            await self.chain_peer(constants.generate_id(current + 3), '1.2.3.12')
+            await self.chain_peer(constants.generate_id(current + 4), '1.2.3.13')
+            last = await self.chain_peer(constants.generate_id(current + 5), '1.2.3.14')
 
-            last = self.nodes[len(self.nodes) - 1]
             search_q, peer_q = asyncio.Queue(loop=self.loop), asyncio.Queue(loop=self.loop)
             search_q.put_nowait(blob1)
 
             _, task = last.accumulate_peers(search_q, peer_q)
-            found_peers = await peer_q.get()
+            found_peers = await asyncio.wait_for(peer_q.get(), 1.0)
             task.cancel()
 
             self.assertEqual(1, len(found_peers))
@@ -119,21 +144,13 @@ class TestBlobAnnouncer(AsyncioTestCase):
 
     async def test_popular_blob(self):
         peer_count = 150
-        addresses = [
-            (constants.generate_id(i + 1), socket.inet_ntoa(int(i + 0x01000001).to_bytes(length=4, byteorder='big')))
-            for i in range(peer_count)
-        ]
-        blob_hash = b'1' * 48
+        blob_hash = constants.generate_id(99999)
 
-        async with self._test_network_context(peer_addresses=addresses):
+        async with self._test_network_context(peer_count=peer_count):
             total_seen = set()
-            announced_to = self.nodes[0]
-            for i in range(1, peer_count):
-                node = self.nodes[i]
-                kad_peer = make_kademlia_peer(
-                    node.protocol.node_id, node.protocol.external_ip, node.protocol.udp_port
-                )
-                await announced_to.protocol._add_peer(kad_peer)
+            announced_to = self.nodes.pop(0)
+            for i, node in enumerate(self.nodes.values()):
+                self.add_peer_to_routing_table(announced_to, node)
                 peer = node.protocol.get_rpc_peer(
                     make_kademlia_peer(
                         announced_to.protocol.node_id,
@@ -144,15 +161,15 @@ class TestBlobAnnouncer(AsyncioTestCase):
                 response = await peer.store(blob_hash)
                 self.assertEqual(response, b'OK')
                 peers_for_blob = await peer.find_value(blob_hash, 0)
-                if i == 1:
+                if i == 0:
                     self.assertNotIn(blob_hash, peers_for_blob)
                     self.assertEqual(peers_for_blob[b'p'], 0)
                 else:
-                    self.assertEqual(len(peers_for_blob[blob_hash]), min(i - 1, constants.K))
-                    self.assertEqual(len(announced_to.protocol.data_store.get_peers_for_blob(blob_hash)), i)
+                    self.assertEqual(len(peers_for_blob[blob_hash]), min(i, constants.K))
+                    self.assertEqual(len(announced_to.protocol.data_store.get_peers_for_blob(blob_hash)), i + 1)
                 if i - 1 > constants.K:
                     self.assertEqual(len(peers_for_blob[b'contacts']), constants.K)
-                    self.assertEqual(peers_for_blob[b'p'], ((i - 1) // (constants.K + 1)) + 1)
+                    self.assertEqual(peers_for_blob[b'p'], (i // (constants.K + 1)) + 1)
                     seen = set(peers_for_blob[blob_hash])
                     self.assertEqual(len(seen), constants.K)
                     self.assertEqual(len(peers_for_blob[blob_hash]), len(seen))
@@ -167,5 +184,5 @@ class TestBlobAnnouncer(AsyncioTestCase):
                         seen.intersection_update(page_x_set)
                         total_seen.update(page_x_set)
                 else:
-                    self.assertEqual(len(peers_for_blob[b'contacts']), i - 1)
+                    self.assertEqual(len(peers_for_blob[b'contacts']), 8)  # we always add 8 on first page
             self.assertEqual(len(total_seen), peer_count - 2)
diff --git a/tests/unit/dht/test_node.py b/tests/unit/dht/test_node.py
index c862305ec..fcf65ff10 100644
--- a/tests/unit/dht/test_node.py
+++ b/tests/unit/dht/test_node.py
@@ -29,7 +29,7 @@ class TestNodePingQueueDiscover(AsyncioTestCase):
             (constants.generate_id(9), '1.2.3.9'),
         ]
         with dht_mocks.mock_network_loop(loop):
-            advance = dht_mocks.get_time_accelerator(loop, loop.time())
+            advance = dht_mocks.get_time_accelerator(loop)
             # start the nodes
             nodes: typing.Dict[int, Node] = {
                 i: Node(loop, PeerManager(loop), node_id, 4444, 4444, 3333, address)
@@ -131,7 +131,7 @@ class TestTemporarilyLosingConnection(AsyncioTestCase):
             await asyncio.gather(*[n.joined.wait() for n in nodes])
 
             node = nodes[-1]
-            advance = dht_mocks.get_time_accelerator(loop, loop.time())
+            advance = dht_mocks.get_time_accelerator(loop)
             await advance(500)
 
             # Join the network, assert that at least the known peers are in RT

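The new tolerance check ranks every node by XOR distance to the blob and requires at least 80% of the best K to hold the announcement. A sketch of that ranking with stand-in data; K mirrors lbry.dht.constants.K (8), and node ids and blob hashes are 48 bytes in this DHT:

    import os

    K = 8  # mirrors lbry.dht.constants.K

    def xor_distance(a: bytes, b: bytes) -> int:
        return int.from_bytes(a, "big") ^ int.from_bytes(b, "big")

    blob = os.urandom(48)
    node_ids = [os.urandom(48) for _ in range(100)]
    best_k = sorted(node_ids, key=lambda node_id: xor_distance(blob, node_id))[:K]
    # the test counts how many of best_k actually stored the blob
    # and asserts that count >= int(0.8 * K)
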
From 3e79dcd17975e3fe8366bf08dab1c99b8ae07822 Mon Sep 17 00:00:00 2001
From: Victor Shyba <victor.shyba@gmail.com>
Date: Fri, 18 Feb 2022 19:10:11 -0300
Subject: [PATCH 24/32] timeouts are now supported in dht tests

---
 tests/unit/dht/test_blob_announcer.py | 2 --
 tests/unit/dht/test_node.py           | 2 --
 2 files changed, 4 deletions(-)

diff --git a/tests/unit/dht/test_blob_announcer.py b/tests/unit/dht/test_blob_announcer.py
index d5b2c5e17..be445aae7 100644
--- a/tests/unit/dht/test_blob_announcer.py
+++ b/tests/unit/dht/test_blob_announcer.py
@@ -17,8 +17,6 @@ from lbry.extras.daemon.storage import SQLiteStorage
 
 
 class TestBlobAnnouncer(AsyncioTestCase):
-    TIMEOUT = 20.0  # lower than default
-
     async def setup_node(self, peer_addresses, address, node_id):
         self.nodes: typing.Dict[int, Node] = {}
         self.advance = dht_mocks.get_time_accelerator(self.loop)
diff --git a/tests/unit/dht/test_node.py b/tests/unit/dht/test_node.py
index fcf65ff10..5ecad5181 100644
--- a/tests/unit/dht/test_node.py
+++ b/tests/unit/dht/test_node.py
@@ -12,7 +12,6 @@ from lbry.extras.daemon.storage import SQLiteStorage
 
 
 class TestNodePingQueueDiscover(AsyncioTestCase):
-    TIMEOUT = None  # not supported as it advances time
     async def test_ping_queue_discover(self):
         loop = asyncio.get_event_loop()
         loop.set_debug(False)
@@ -93,7 +92,6 @@ class TestNodePingQueueDiscover(AsyncioTestCase):
 
 
 class TestTemporarilyLosingConnection(AsyncioTestCase):
-    TIMEOUT = None  # not supported as it advances time
     @unittest.SkipTest
     async def test_losing_connection(self):
         async def wait_for(check_ok, insist, timeout=20):

From c1e64df528fcdfacdd8d62001273083a66f589f6 Mon Sep 17 00:00:00 2001
From: Victor Shyba <victor.shyba@gmail.com>
Date: Sat, 19 Feb 2022 02:30:31 -0300
Subject: [PATCH 25/32] remove unused search rounds

---
 lbry/dht/protocol/iterative_find.py | 28 +++++-----------------------
 1 file changed, 5 insertions(+), 23 deletions(-)

diff --git a/lbry/dht/protocol/iterative_find.py b/lbry/dht/protocol/iterative_find.py
index bbfc93bd6..409efcdb4 100644
--- a/lbry/dht/protocol/iterative_find.py
+++ b/lbry/dht/protocol/iterative_find.py
@@ -98,7 +98,6 @@ class IterativeFinder:
         self.iteration_count = 0
         self.running = False
         self.tasks: typing.List[asyncio.Task] = []
-        self.delayed_call: asyncio.Handle = None
         for peer in get_shortlist(routing_table, key, shortlist):
             if peer.node_id:
                 self._add_active(peer, force=True)
@@ -184,7 +183,6 @@ class IterativeFinder:
         for index, peer in enumerate(self.active.keys()):
             if index == 0:
                 log.debug("closest to probe: %s", peer.node_id.hex()[:8])
-
             if peer in self.contacted:
                 continue
             if len(self.running_probes) >= constants.ALPHA:
@@ -211,37 +209,22 @@ class IterativeFinder:
         t = self.loop.create_task(self._send_probe(peer))
 
         def callback(_):
-            for peer in [peer for peer, task in self.running_probes.items() if task.done() or task == t]:
-                del self.running_probes[peer]
-            self._search_task(0.0)
+            self.running_probes.pop(peer, None)
+            if self.running:
+                self._search_round()
 
         t.add_done_callback(callback)
         self.running_probes[peer] = t
 
-    def _search_task(self, delay: typing.Optional[float] = constants.ITERATIVE_LOOKUP_DELAY):
-        try:
-            if self.running:
-                if self.delayed_call:
-                    self.delayed_call.cancel()  # ensure anything scheduled gets cancelled
-                self._search_round()
-            #if self.running:
-            #    self.delayed_call = self.loop.call_later(delay, self._search)
-        except (asyncio.CancelledError, StopAsyncIteration, TransportNotConnected):
-            if self.running:
-                self.loop.call_soon(self.aclose)
-
     def _log_state(self):
         log.debug("[%s] check result: %i active nodes %i contacted",
                   self.key.hex()[:8], len(self.active), len(self.contacted))
 
-    def _search(self):
-        self._search_task()
-
     def __aiter__(self):
         if self.running:
             raise Exception("already running")
         self.running = True
-        self._search()
+        self.loop.call_soon(self._search_round)
         return self
 
     async def __anext__(self) -> typing.List['KademliaPeer']:
@@ -261,11 +244,10 @@ class IterativeFinder:
     def aclose(self):
         self.running = False
         self.iteration_queue.put_nowait(None)
-        for task in chain(self.tasks, self.running_probes.values(), filter(None, [self.delayed_call])):
+        for task in chain(self.tasks, self.running_probes.values()):
             task.cancel()
         self.tasks.clear()
         self.running_probes.clear()
-        self.delayed_call = None
 
 
 class IterativeNodeFinder(IterativeFinder):

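With the delayed call gone, scheduling lives entirely in the probes' done callbacks: a finished probe removes itself and immediately kicks another round, keeping at most ALPHA probes in flight. A standalone sketch of that shape; probe(), the peer names and the event are stand-ins, and ALPHA is hard-coded here in place of lbry.dht.constants.ALPHA:

    import asyncio

    ALPHA = 3  # stand-in for lbry.dht.constants.ALPHA

    async def probe(peer: str) -> None:
        await asyncio.sleep(0.01)  # stand-in for a findNode/findValue round trip

    def search_round(loop, pending, running_probes, finished):
        if not pending and not running_probes:
            finished.set()
            return
        while pending and len(running_probes) < ALPHA:
            peer = pending.pop()
            task = loop.create_task(probe(peer))

            def callback(_, peer=peer):
                # same shape as the patched callback: drop this probe, run another round
                running_probes.pop(peer, None)
                search_round(loop, pending, running_probes, finished)

            task.add_done_callback(callback)
            running_probes[peer] = task

    async def main():
        loop = asyncio.get_running_loop()
        finished = asyncio.Event()
        search_round(loop, [f"peer-{i}" for i in range(10)], {}, finished)
        await finished.wait()

    asyncio.run(main())
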
From d00b5befbf9dd36ed403c4a6625f75d4ac79fdea Mon Sep 17 00:00:00 2001
From: Victor Shyba <victor.shyba@gmail.com>
Date: Sat, 19 Feb 2022 02:32:12 -0300
Subject: [PATCH 26/32] make active an explicit ordered dict

---
 lbry/dht/protocol/iterative_find.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/lbry/dht/protocol/iterative_find.py b/lbry/dht/protocol/iterative_find.py
index 409efcdb4..ab89edddc 100644
--- a/lbry/dht/protocol/iterative_find.py
+++ b/lbry/dht/protocol/iterative_find.py
@@ -1,6 +1,6 @@
 import asyncio
 from itertools import chain
-from collections import defaultdict
+from collections import defaultdict, OrderedDict
 import typing
 import logging
 from typing import TYPE_CHECKING
@@ -88,7 +88,7 @@ class IterativeFinder:
         self.max_results = max(constants.K, max_results)
         self.exclude = exclude or []
 
-        self.active: typing.Dict['KademliaPeer', int] = {}  # peer: distance, sorted
+        self.active: typing.Dict['KademliaPeer', int] = OrderedDict()  # peer: distance, sorted
         self.contacted: typing.Set['KademliaPeer'] = set()
         self.distance = Distance(key)
 
@@ -139,7 +139,7 @@ class IterativeFinder:
             return
         if peer not in self.active and peer.node_id and peer.node_id != self.protocol.node_id:
             self.active[peer] = self.distance(peer.node_id)
-            self.active = dict(sorted(self.active.items(), key=lambda item: item[1]))
+            self.active = OrderedDict(sorted(self.active.items(), key=lambda item: item[1]))
 
     async def _handle_probe_result(self, peer: 'KademliaPeer', response: FindResponse):
         self._add_active(peer)

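Using OrderedDict spells out the invariant the code already relied on: the mapping is rebuilt in ascending-distance order on every insert. A small sketch of that insert-then-resort step, with stand-in names:

    from collections import OrderedDict

    def insert_sorted(active: OrderedDict, peer: str, distance: int) -> OrderedDict:
        # add the new peer, then rebuild sorted by distance, as the patch does
        active[peer] = distance
        return OrderedDict(sorted(active.items(), key=lambda item: item[1]))

    active = OrderedDict()
    for peer, distance in [("a", 30), ("b", 10), ("c", 20)]:
        active = insert_sorted(active, peer, distance)
    print(list(active))  # -> ['b', 'c', 'a']
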
From f5b3e9bacd2afed72a62e2faa98bca8901954959 Mon Sep 17 00:00:00 2001
From: Victor Shyba <victor.shyba@gmail.com>
Date: Tue, 15 Feb 2022 00:55:36 -0300
Subject: [PATCH 27/32] implement announcer as consumer tasks on gather

---
 lbry/dht/blob_announcer.py | 38 +++++++++++++++++++-------------------
 1 file changed, 19 insertions(+), 19 deletions(-)

diff --git a/lbry/dht/blob_announcer.py b/lbry/dht/blob_announcer.py
index e4da3cfa9..9629e06b6 100644
--- a/lbry/dht/blob_announcer.py
+++ b/lbry/dht/blob_announcer.py
@@ -28,21 +28,23 @@ class BlobAnnouncer:
         self.announce_task: asyncio.Task = None
         self.announce_queue: typing.List[str] = []
         self._done = asyncio.Event()
+        self.announced = set()
 
-    async def _submit_announcement(self, blob_hash):
-        try:
-
-            peers = len(await self.node.announce_blob(blob_hash))
-            self.announcements_sent_metric.labels(peers=peers, error=False).inc()
-            if peers > 4:
-                return blob_hash
-            else:
-                log.debug("failed to announce %s, could only find %d peers, retrying soon.", blob_hash[:8], peers)
-        except Exception as err:
-            self.announcements_sent_metric.labels(peers=0, error=True).inc()
-            if isinstance(err, asyncio.CancelledError):  # TODO: remove when updated to 3.8
-                raise err
-            log.warning("error announcing %s: %s", blob_hash[:8], str(err))
+    async def _run_consumer(self):
+        while self.announce_queue:
+            try:
+                blob_hash = self.announce_queue.pop()
+                peers = len(await self.node.announce_blob(blob_hash))
+                self.announcements_sent_metric.labels(peers=peers, error=False).inc()
+                if peers > 4:
+                    self.announced.add(blob_hash)
+                else:
+                    log.debug("failed to announce %s, could only find %d peers, retrying soon.", blob_hash[:8], peers)
+            except Exception as err:
+                self.announcements_sent_metric.labels(peers=0, error=True).inc()
+                if isinstance(err, asyncio.CancelledError):  # TODO: remove when updated to 3.8
+                    raise err
+                log.warning("error announcing %s: %s", blob_hash[:8], str(err))
 
     async def _announce(self, batch_size: typing.Optional[int] = 10):
         while batch_size:
@@ -57,14 +59,12 @@ class BlobAnnouncer:
             log.debug("announcer task wake up, %d blobs to announce", len(self.announce_queue))
             while len(self.announce_queue) > 0:
                 log.info("%i blobs to announce", len(self.announce_queue))
-                announced = await asyncio.gather(*[
-                    self._submit_announcement(
-                        self.announce_queue.pop()) for _ in range(batch_size) if self.announce_queue
-                ], loop=self.loop)
-                announced = list(filter(None, announced))
+                await asyncio.gather(*[self._run_consumer() for _ in range(batch_size)], loop=self.loop)
+                announced = list(filter(None, self.announced))
                 if announced:
                     await self.storage.update_last_announced_blobs(announced)
                     log.info("announced %i blobs", len(announced))
+                    self.announced.clear()
             self._done.set()
             self._done.clear()
 

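The gather now runs batch_size identical consumers that drain one shared list; since the loop is single-threaded and list.pop() never yields, no locking is needed. A runnable sketch of the pattern; announce() and the 4-peer threshold stand in for node.announce_blob and the patch's success check:

    import asyncio

    async def announce(blob_hash: str) -> int:
        await asyncio.sleep(0.01)  # stand-in for node.announce_blob()
        return 5                   # pretend the announcement reached 5 peers

    async def run_consumer(queue: list, announced: set):
        while queue:
            blob_hash = queue.pop()  # atomic between awaits, so no lock needed
            if await announce(blob_hash) > 4:
                announced.add(blob_hash)

    async def main():
        queue = [f"blob-{i}" for i in range(25)]
        announced = set()
        await asyncio.gather(*[run_consumer(queue, announced) for _ in range(10)])
        print(f"announced {len(announced)} blobs")

    asyncio.run(main())
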
From 2a1e1d542ffcec2846b1871724dc6e5749690dd2 Mon Sep 17 00:00:00 2001
From: Victor Shyba <victor.shyba@gmail.com>
Date: Sun, 20 Feb 2022 23:03:55 -0300
Subject: [PATCH 28/32] extract method and avoid shadowing the hash builtin

---
 lbry/extras/daemon/components.py | 18 ++++++++++--------
 1 file changed, 10 insertions(+), 8 deletions(-)

diff --git a/lbry/extras/daemon/components.py b/lbry/extras/daemon/components.py
index 96b6646cf..f51af4f3f 100644
--- a/lbry/extras/daemon/components.py
+++ b/lbry/extras/daemon/components.py
@@ -413,16 +413,18 @@ class BackgroundDownloaderComponent(Component):
         while True:
             self.space_available = await self.space_manager.get_free_space_mb(True)
             if not self.is_busy and self.space_available > 10:
-                node_id_prefix = int.from_bytes(self.dht_node.protocol.node_id[:4], "big")
-                for hash in self.dht_node.stored_blob_hashes:
-                    colliding_bits = 16 - int(node_id_prefix ^ int.from_bytes(hash[:4], "big")).bit_length()
-                    if hash.hex() in self.blob_manager.completed_blob_hashes:
-                        continue
-                    if colliding_bits >= self.MIN_PREFIX_COLLIDING_BITS:
-                        self.ongoing_download = asyncio.create_task(self.background_downloader.download_blobs(hash))
-                        break
+                self._download_next_close_blob_hash()
             await asyncio.sleep(self.download_loop_delay_seconds)
 
+    def _download_next_close_blob_hash(self):
+        node_id_prefix = int.from_bytes(self.dht_node.protocol.node_id[:4], "big")
+        for blob_hash in self.dht_node.stored_blob_hashes:
+            colliding_bits = 32 - int(node_id_prefix ^ int.from_bytes(blob_hash[:4], "big")).bit_length()
+            if blob_hash.hex() in self.blob_manager.completed_blob_hashes:
+                continue
+            if colliding_bits >= self.MIN_PREFIX_COLLIDING_BITS:
+                self.ongoing_download = asyncio.create_task(self.background_downloader.download_blobs(blob_hash.hex()))
+
     async def start(self):
         self.space_manager: DiskSpaceManager = self.component_manager.get_component(DISK_SPACE_COMPONENT)
         if not self.component_manager.has_component(DHT_COMPONENT):

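Besides renaming hash to blob_hash, this fixes the bit count: the compared prefixes are built from four bytes, so identical prefixes must score 32 colliding bits, not 16. A two-line check of the arithmetic:

    prefix = int.from_bytes(b"\x12\x34\x56\x78", "big")
    print(32 - (prefix ^ prefix).bit_length())  # identical 4-byte prefixes -> 32
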
From af9cc457ec88fb51e4e218db93104523609fb14b Mon Sep 17 00:00:00 2001
From: Victor Shyba <victor.shyba@gmail.com>
Date: Tue, 22 Feb 2022 15:46:08 -0300
Subject: [PATCH 29/32] add get_colliding_prefix_bits, docs and tests

---
 lbry/utils.py            | 18 ++++++++++++++++++
 tests/unit/test_utils.py | 19 +++++++++++++++++++
 2 files changed, 37 insertions(+)
 create mode 100644 tests/unit/test_utils.py

diff --git a/lbry/utils.py b/lbry/utils.py
index a5dc1a26e..19cacb689 100644
--- a/lbry/utils.py
+++ b/lbry/utils.py
@@ -474,3 +474,21 @@ class LockWithMetrics(asyncio.Lock):
             return super().release()
         finally:
             self._lock_held_time_metric.observe(time.perf_counter() - self._lock_acquired_time)
+
+
+def get_colliding_prefix_bits(first_value: bytes, second_value: bytes, size: int):
+    """
+    Calculates the number of colliding prefix bits between <first_value> and <second_value> over the first <size> bits.
+    This is the number of leading bits that match before the first differing one (found via XOR).
+    :param first_value: first value to compare, at least <size> bits long.
+    :param second_value: second value to compare, at least <size> bits long.
+    :param size: prefix size in bits.
+    :return: number of colliding prefix bits.
+    """
+    assert size % 8 == 0, "size has to be a multiple of 8"
+    size_in_bytes = size // 8
+    assert len(first_value) >= size_in_bytes, "first_value has to be at least <size> bits long"
+    first_value = int.from_bytes(first_value[:size_in_bytes], "big")
+    assert len(second_value) >= size_in_bytes, "second_value has to be at least <size> bits long"
+    second_value = int.from_bytes(second_value[:size_in_bytes], "big")
+    return size - (first_value ^ second_value).bit_length()
diff --git a/tests/unit/test_utils.py b/tests/unit/test_utils.py
new file mode 100644
index 000000000..8aab6ff99
--- /dev/null
+++ b/tests/unit/test_utils.py
@@ -0,0 +1,19 @@
+import unittest
+from lbry import utils
+
+
+class UtilsTestCase(unittest.TestCase):
+
+    def test_get_colliding_prefix_bits(self):
+        self.assertEqual(
+            0, utils.get_colliding_prefix_bits(0xffffff.to_bytes(4, "big"), 0x00000000.to_bytes(4, "big"), 32))
+        self.assertEqual(
+            1, utils.get_colliding_prefix_bits(0xefffff.to_bytes(4, "big"), 0x00000000.to_bytes(4, "big"), 32))
+        self.assertEqual(
+            8, utils.get_colliding_prefix_bits(0x00ffff.to_bytes(4, "big"), 0x00000000.to_bytes(4, "big"), 16))
+        self.assertEqual(
+            8, utils.get_colliding_prefix_bits(0x00ffff.to_bytes(4, "big"), 0x00000000.to_bytes(4, "big"), 8))
+        self.assertEqual(
+            1, utils.get_colliding_prefix_bits(0xefffff.to_bytes(4, "big"), 0x00000000.to_bytes(4, "big"), 16))
+        self.assertEqual(
+            1, utils.get_colliding_prefix_bits(0xefffff.to_bytes(4, "big"), 0x00000000.to_bytes(4, "big"), 8))

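For intuition about the helper: 0b11100000 and 0b11111111 share their first three bits, the XOR is 0b00011111 with bit_length 5, and 8 - 5 = 3. The same arithmetic by hand, independent of lbry:

    first, second = bytes([0b11100000]), bytes([0b11111111])
    size = 8
    a, b = int.from_bytes(first, "big"), int.from_bytes(second, "big")
    print(size - (a ^ b).bit_length())  # -> 3 colliding prefix bits
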
From af7574dc9db8977fa3e1a2f97f732d97c88eceae Mon Sep 17 00:00:00 2001
From: Victor Shyba <victor.shyba@gmail.com>
Date: Tue, 22 Feb 2022 15:51:36 -0300
Subject: [PATCH 30/32] replace duplicated code

---
 lbry/dht/protocol/routing_table.py | 11 +++++------
 lbry/extras/daemon/components.py   |  5 ++---
 2 files changed, 7 insertions(+), 9 deletions(-)

diff --git a/lbry/dht/protocol/routing_table.py b/lbry/dht/protocol/routing_table.py
index 344158a95..b16efb37c 100644
--- a/lbry/dht/protocol/routing_table.py
+++ b/lbry/dht/protocol/routing_table.py
@@ -6,6 +6,7 @@ import itertools
 
 from prometheus_client import Gauge
 
+from lbry import utils
 from lbry.dht import constants
 from lbry.dht.protocol.distance import Distance
 if typing.TYPE_CHECKING:
@@ -70,9 +71,8 @@ class KBucket:
         if len(self.peers) < constants.K:
             self.peers.append(peer)
             self.peer_in_routing_table_metric.labels("global").inc()
-            if peer.node_id[0] == self._node_id[0]:
-                bits_colliding = 8 - (peer.node_id[1] ^ self._node_id[1]).bit_length()
-                self.peer_with_x_bit_colliding_metric.labels(amount=(bits_colliding + 8)).inc()
+            bits_colliding = utils.get_colliding_prefix_bits(peer.node_id, self._node_id, 32)
+            self.peer_with_x_bit_colliding_metric.labels(amount=bits_colliding).inc()
             return True
         else:
             return False
@@ -140,9 +140,8 @@ class KBucket:
     def remove_peer(self, peer: 'KademliaPeer') -> None:
         self.peers.remove(peer)
         self.peer_in_routing_table_metric.labels("global").dec()
-        if peer.node_id[0] == self._node_id[0]:
-            bits_colliding = 8 - (peer.node_id[1] ^ self._node_id[1]).bit_length()
-            self.peer_with_x_bit_colliding_metric.labels(amount=(bits_colliding + 8)).dec()
+        bits_colliding = utils.get_colliding_prefix_bits(peer.node_id, self._node_id, 32)
+        self.peer_with_x_bit_colliding_metric.labels(amount=bits_colliding).dec()
 
     def key_in_range(self, key: bytes) -> bool:
         """ Tests whether the specified key (i.e. node ID) is in the range
diff --git a/lbry/extras/daemon/components.py b/lbry/extras/daemon/components.py
index f51af4f3f..c1e3fe046 100644
--- a/lbry/extras/daemon/components.py
+++ b/lbry/extras/daemon/components.py
@@ -417,12 +417,11 @@ class BackgroundDownloaderComponent(Component):
             await asyncio.sleep(self.download_loop_delay_seconds)
 
     def _download_next_close_blob_hash(self):
-        node_id_prefix = int.from_bytes(self.dht_node.protocol.node_id[:4], "big")
+        node_id = self.dht_node.protocol.node_id
         for blob_hash in self.dht_node.stored_blob_hashes:
-            colliding_bits = 32 - int(node_id_prefix ^ int.from_bytes(blob_hash[:4], "big")).bit_length()
             if blob_hash.hex() in self.blob_manager.completed_blob_hashes:
                 continue
-            if colliding_bits >= self.MIN_PREFIX_COLLIDING_BITS:
+            if utils.get_colliding_prefix_bits(node_id, blob_hash, 32) >= self.MIN_PREFIX_COLLIDING_BITS:
                 self.ongoing_download = asyncio.create_task(self.background_downloader.download_blobs(blob_hash.hex()))
 
     async def start(self):

From 6fb1443e6347e45b149b24497fa6040cb17536eb Mon Sep 17 00:00:00 2001
From: Victor Shyba <victor.shyba@gmail.com>
Date: Tue, 22 Feb 2022 15:54:39 -0300
Subject: [PATCH 31/32] stop after finding what to download

---
 lbry/extras/daemon/components.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/lbry/extras/daemon/components.py b/lbry/extras/daemon/components.py
index c1e3fe046..931140445 100644
--- a/lbry/extras/daemon/components.py
+++ b/lbry/extras/daemon/components.py
@@ -423,6 +423,7 @@ class BackgroundDownloaderComponent(Component):
                 continue
             if utils.get_colliding_prefix_bits(node_id, blob_hash, 32) >= self.MIN_PREFIX_COLLIDING_BITS:
                 self.ongoing_download = asyncio.create_task(self.background_downloader.download_blobs(blob_hash.hex()))
+                return
 
     async def start(self):
         self.space_manager: DiskSpaceManager = self.component_manager.get_component(DISK_SPACE_COMPONENT)

From f68ea01056f64806a1c3ff83cd26f28d62a7a447 Mon Sep 17 00:00:00 2001
From: Victor Shyba <victor.shyba@gmail.com>
Date: Tue, 22 Feb 2022 22:38:04 -0300
Subject: [PATCH 32/32] simplify, generalize to any size and fix tests

---
 lbry/dht/protocol/routing_table.py |  4 ++--
 lbry/extras/daemon/components.py   |  2 +-
 lbry/utils.py                      | 17 +++++++----------
 tests/unit/test_utils.py           | 12 ++++++------
 4 files changed, 16 insertions(+), 19 deletions(-)

diff --git a/lbry/dht/protocol/routing_table.py b/lbry/dht/protocol/routing_table.py
index b16efb37c..42c82e2bc 100644
--- a/lbry/dht/protocol/routing_table.py
+++ b/lbry/dht/protocol/routing_table.py
@@ -71,7 +71,7 @@ class KBucket:
         if len(self.peers) < constants.K:
             self.peers.append(peer)
             self.peer_in_routing_table_metric.labels("global").inc()
-            bits_colliding = utils.get_colliding_prefix_bits(peer.node_id, self._node_id, 32)
+            bits_colliding = utils.get_colliding_prefix_bits(peer.node_id, self._node_id)
             self.peer_with_x_bit_colliding_metric.labels(amount=bits_colliding).inc()
             return True
         else:
@@ -140,7 +140,7 @@ class KBucket:
     def remove_peer(self, peer: 'KademliaPeer') -> None:
         self.peers.remove(peer)
         self.peer_in_routing_table_metric.labels("global").dec()
-        bits_colliding = utils.get_colliding_prefix_bits(peer.node_id, self._node_id, 32)
+        bits_colliding = utils.get_colliding_prefix_bits(peer.node_id, self._node_id)
         self.peer_with_x_bit_colliding_metric.labels(amount=bits_colliding).dec()
 
     def key_in_range(self, key: bytes) -> bool:
diff --git a/lbry/extras/daemon/components.py b/lbry/extras/daemon/components.py
index 931140445..03bef1534 100644
--- a/lbry/extras/daemon/components.py
+++ b/lbry/extras/daemon/components.py
@@ -421,7 +421,7 @@ class BackgroundDownloaderComponent(Component):
         for blob_hash in self.dht_node.stored_blob_hashes:
             if blob_hash.hex() in self.blob_manager.completed_blob_hashes:
                 continue
-            if utils.get_colliding_prefix_bits(node_id, blob_hash, 32) >= self.MIN_PREFIX_COLLIDING_BITS:
+            if utils.get_colliding_prefix_bits(node_id, blob_hash) >= self.MIN_PREFIX_COLLIDING_BITS:
                 self.ongoing_download = asyncio.create_task(self.background_downloader.download_blobs(blob_hash.hex()))
                 return
 
diff --git a/lbry/utils.py b/lbry/utils.py
index 19cacb689..6a6cdd618 100644
--- a/lbry/utils.py
+++ b/lbry/utils.py
@@ -476,19 +476,16 @@ class LockWithMetrics(asyncio.Lock):
             self._lock_held_time_metric.observe(time.perf_counter() - self._lock_acquired_time)
 
 
-def get_colliding_prefix_bits(first_value: bytes, second_value: bytes, size: int):
+def get_colliding_prefix_bits(first_value: bytes, second_value: bytes):
     """
-    Calculates the number of colliding prefix bits between <first_value> and <second_value> over the first <size> bits.
-    This is the number of leading bits that match before the first differing one (found via XOR).
+    Calculates the number of colliding prefix bits between <first_value> and <second_value>.
+    This is the number of leading bits that match before the first differing one (found via XOR),
+    starting from the most significant bit down to the least significant bit.
-    :param first_value: first value to compare, at least <size> bits long.
-    :param second_value: second value to compare, at least <size> bits long.
-    :param size: prefix size in bits.
+    :param first_value: first value to compare, same length as <second_value>.
+    :param second_value: second value to compare, same length as <first_value>.
     :return: number of colliding prefix bits.
     """
-    assert size % 8 == 0, "size has to be a multiple of 8"
-    size_in_bytes = size // 8
-    assert len(first_value) >= size_in_bytes, "first_value has to be at least <size> bits long"
-    first_value = int.from_bytes(first_value[:size_in_bytes], "big")
-    assert len(second_value) >= size_in_bytes, "second_value has to be at least <size> bits long"
-    second_value = int.from_bytes(second_value[:size_in_bytes], "big")
+    assert len(first_value) == len(second_value), "values must have the same length"
+    size = len(first_value) * 8
+    first_value, second_value = int.from_bytes(first_value, "big"), int.from_bytes(second_value, "big")
     return size - (first_value ^ second_value).bit_length()
diff --git a/tests/unit/test_utils.py b/tests/unit/test_utils.py
index 8aab6ff99..ebcbeea16 100644
--- a/tests/unit/test_utils.py
+++ b/tests/unit/test_utils.py
@@ -6,14 +6,14 @@ class UtilsTestCase(unittest.TestCase):
 
     def test_get_colliding_prefix_bits(self):
         self.assertEqual(
-            0, utils.get_colliding_prefix_bits(0xffffff.to_bytes(4, "big"), 0x00000000.to_bytes(4, "big"), 32))
+            0, utils.get_colliding_prefix_bits(0xffffffff.to_bytes(4, "big"), 0x0000000000.to_bytes(4, "big")))
         self.assertEqual(
-            1, utils.get_colliding_prefix_bits(0xefffff.to_bytes(4, "big"), 0x00000000.to_bytes(4, "big"), 32))
+            1, utils.get_colliding_prefix_bits(0x7fffffff.to_bytes(4, "big"), 0x0000000000.to_bytes(4, "big")))
         self.assertEqual(
-            8, utils.get_colliding_prefix_bits(0x00ffff.to_bytes(4, "big"), 0x00000000.to_bytes(4, "big"), 16))
+            8, utils.get_colliding_prefix_bits(0x00ffffff.to_bytes(4, "big"), 0x0000000000.to_bytes(4, "big")))
         self.assertEqual(
-            8, utils.get_colliding_prefix_bits(0x00ffff.to_bytes(4, "big"), 0x00000000.to_bytes(4, "big"), 8))
+            8, utils.get_colliding_prefix_bits(0x00ffffff.to_bytes(4, "big"), 0x0000000000.to_bytes(4, "big")))
         self.assertEqual(
-            1, utils.get_colliding_prefix_bits(0xefffff.to_bytes(4, "big"), 0x00000000.to_bytes(4, "big"), 16))
+            1, utils.get_colliding_prefix_bits(0x7fffffff.to_bytes(4, "big"), 0x0000000000.to_bytes(4, "big")))
         self.assertEqual(
-            1, utils.get_colliding_prefix_bits(0xefffff.to_bytes(4, "big"), 0x00000000.to_bytes(4, "big"), 8))
+            1, utils.get_colliding_prefix_bits(0x7fffffff.to_bytes(4, "big"), 0x0000000000.to_bytes(4, "big")))
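
With the size parameter gone both inputs simply have to be the same length; for lbry's 48-byte node ids and blob hashes the comparison covers all 384 bits. A usage sketch of the final helper, with the body copied from the patch and arbitrary sample values:

    import os

    def get_colliding_prefix_bits(first_value: bytes, second_value: bytes):
        assert len(first_value) == len(second_value), "values must have the same length"
        size = len(first_value) * 8
        first_value, second_value = int.from_bytes(first_value, "big"), int.from_bytes(second_value, "big")
        return size - (first_value ^ second_value).bit_length()

    node_id = os.urandom(48)
    print(get_colliding_prefix_bits(node_id, node_id))  # 384: identical values
    print(get_colliding_prefix_bits(b"\x00" + node_id[1:], b"\x80" + node_id[1:]))  # 0: first bit differs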