diff --git a/lbrynet/blob_exchange/downloader.py b/lbrynet/blob_exchange/downloader.py index 3bcdc97f0..5b9467742 100644 --- a/lbrynet/blob_exchange/downloader.py +++ b/lbrynet/blob_exchange/downloader.py @@ -13,11 +13,6 @@ if typing.TYPE_CHECKING: log = logging.getLogger(__name__) -def drain_into(a: list, b: list): - while a: - b.append(a.pop()) - - class BlobDownloader: def __init__(self, loop: asyncio.BaseEventLoop, config: 'Config', blob_manager: 'BlobFileManager', peer_queue: asyncio.Queue): @@ -52,11 +47,17 @@ class BlobDownloader: self.peer_queue.put_nowait(new_peers) tasks = [self.loop.create_task(get_and_re_add_peers()), self.loop.create_task(blob.verified.wait())] try: - await asyncio.wait(tasks, loop=self.loop, return_when='FIRST_COMPLETED') + done, pending = await asyncio.wait(tasks, loop=self.loop, return_when='FIRST_COMPLETED') + drain_tasks(pending) except asyncio.CancelledError: drain_tasks(tasks) raise + def cleanup_active(self): + to_remove = [peer for (peer, task) in self.active_connections.items() if task.done()] + for peer in to_remove: + del self.active_connections[peer] + async def download_blob(self, blob_hash: str, length: typing.Optional[int] = None) -> 'BlobFile': blob = self.blob_manager.get_blob(blob_hash, length) if blob.get_is_verified(): @@ -65,7 +66,11 @@ class BlobDownloader: while not blob.get_is_verified(): batch: typing.List['KademliaPeer'] = [] while not self.peer_queue.empty(): - batch.extend(await self.peer_queue.get()) + batch.extend(self.peer_queue.get_nowait()) + log.debug( + "running, %d peers, %d ignored, %d active", + len(batch), len(self.ignored), len(self.active_connections) + ) for peer in batch: if len(self.active_connections) >= self.config.max_connections_per_download: break @@ -73,11 +78,11 @@ class BlobDownloader: log.debug("request %s from %s:%i", blob_hash[:8], peer.address, peer.tcp_port) t = self.loop.create_task(self.request_blob_from_peer(blob, peer)) self.active_connections[peer] = t - t.add_done_callback( - lambda _: - None if peer not in self.active_connections else self.active_connections.pop(peer) - ) - await self.new_peer_or_finished(blob) + if self.active_connections: + await asyncio.wait(self.active_connections.values(), return_when='FIRST_COMPLETED') + self.cleanup_active() + else: + await self.new_peer_or_finished(blob) to_re_add = list(set(filter(lambda peer: peer not in self.ignored, batch))) to_re_add.sort(key=lambda peer: self.scores.get(peer, 0), reverse=True) if to_re_add: @@ -95,6 +100,9 @@ class BlobDownloader: if task and not task.done(): task.cancel() raise + except (OSError, Exception) as e: + log.exception(e) + raise e async def download_blob(loop, config: 'Config', blob_manager: 'BlobFileManager', node: 'Node',