diff --git a/lbrynet/dht/node.py b/lbrynet/dht/node.py index fcd4afe8f..9d890bab6 100644 --- a/lbrynet/dht/node.py +++ b/lbrynet/dht/node.py @@ -236,3 +236,19 @@ class Node: distance = Distance(node_id) accumulated.sort(key=lambda peer: distance(peer.node_id)) return accumulated[:count] + + async def _accumulate_search_junction(self, search_queue: asyncio.Queue, + result_queue: asyncio.Queue): + try: + async with self.stream_peer_search_junction(search_queue) as search_junction: + async for peers in search_junction: + if peers: + result_queue.put_nowait(peers) + except asyncio.CancelledError: + return + + def accumulate_peers(self, search_queue: asyncio.Queue, + peer_queue: typing.Optional[asyncio.Queue] = None) -> typing.Tuple[ + asyncio.Queue, asyncio.Task]: + q = peer_queue or asyncio.Queue() + return q, asyncio.create_task(self._accumulate_search_junction(search_queue, q))