From 2d8ebe25ed38852cb7e87dc962eaa5e81ef72bb4 Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Fri, 22 Feb 2019 00:03:40 -0300
Subject: [PATCH] refactor so we can stop trapping CancelledError everywhere

---
 lbrynet/blob_exchange/downloader.py    | 14 ++----------
 lbrynet/dht/node.py                    | 32 +++++++++-----------------
 lbrynet/dht/protocol/iterative_find.py |  2 --
 3 files changed, 13 insertions(+), 35 deletions(-)

diff --git a/lbrynet/blob_exchange/downloader.py b/lbrynet/blob_exchange/downloader.py
index 76bc6218f..a2661e455 100644
--- a/lbrynet/blob_exchange/downloader.py
+++ b/lbrynet/blob_exchange/downloader.py
@@ -116,22 +116,12 @@ class BlobDownloader:
                     self.peer_queue.put_nowait(set(batch).difference(self.ignored))
                 else:
                     self.clearbanned()
-                while self.active_connections:
-                    peer, task = self.active_connections.popitem()
-                    if task and not task.done():
-                        task.cancel()
             blob.close()
             log.debug("downloaded %s", blob_hash[:8])
             return blob
-        except asyncio.CancelledError:
+        finally:
             while self.active_connections:
-                peer, task = self.active_connections.popitem()
-                if task and not task.done():
-                    task.cancel()
-            raise
-        except (OSError, Exception) as e:
-            log.exception(e)
-            raise e
+                self.active_connections.popitem()[1].cancel()
 
     def close(self):
         self.scores.clear()
diff --git a/lbrynet/dht/node.py b/lbrynet/dht/node.py
index f1d1a9574..05bcdb0f5 100644
--- a/lbrynet/dht/node.py
+++ b/lbrynet/dht/node.py
@@ -190,10 +190,7 @@ class Node:
 
         async def _add_hashes_from_queue():
             while True:
-                try:
-                    blob_hash = await hash_queue.get()
-                except asyncio.CancelledError:
-                    break
+                blob_hash = await hash_queue.get()
                 peer_generator.add_generator(
                     self.get_iterative_value_finder(
                         binascii.unhexlify(blob_hash.encode()), bottom_out_limit=bottom_out_limit,
@@ -205,10 +202,6 @@
             async with peer_generator as junction:
                 yield junction
             await peer_generator.finished.wait()
-        except asyncio.CancelledError:
-            if add_hashes_task and not (add_hashes_task.done() or add_hashes_task.cancelled()):
-                add_hashes_task.cancel()
-            raise
         finally:
             if add_hashes_task and not (add_hashes_task.done() or add_hashes_task.cancelled()):
                 add_hashes_task.cancel()
@@ -236,19 +229,16 @@
 
     async def _accumulate_search_junction(self, search_queue: asyncio.Queue,
                                           result_queue: asyncio.Queue):
-        try:
-            async with self.stream_peer_search_junction(search_queue) as search_junction: # pylint: disable=E1701
-                async for peers in search_junction:
-                    if peers:
-                        result_queue.put_nowait([
-                            peer for peer in peers
-                            if not (
-                                peer.address == self.protocol.external_ip
-                                and peer.tcp_port == self.protocol.peer_port
-                            )
-                        ])
-        except asyncio.CancelledError:
-            return
+        async with self.stream_peer_search_junction(search_queue) as search_junction: # pylint: disable=E1701
+            async for peers in search_junction:
+                if peers:
+                    result_queue.put_nowait([
+                        peer for peer in peers
+                        if not (
+                            peer.address == self.protocol.external_ip
+                            and peer.tcp_port == self.protocol.peer_port
+                        )
+                    ])
 
     def accumulate_peers(self, search_queue: asyncio.Queue,
                          peer_queue: typing.Optional[asyncio.Queue] = None) -> typing.Tuple[
diff --git a/lbrynet/dht/protocol/iterative_find.py b/lbrynet/dht/protocol/iterative_find.py
index ad5fa551c..a899f210d 100644
--- a/lbrynet/dht/protocol/iterative_find.py
+++ b/lbrynet/dht/protocol/iterative_find.py
@@ -162,8 +162,6 @@ class IterativeFinder:
     async def _send_probe(self, peer: 'KademliaPeer'):
         try:
             response = await self.send_probe(peer)
-        except asyncio.CancelledError:
-            return
         except asyncio.TimeoutError:
             self.active.discard(peer)
             return