From 7a96e742f2c6398f0239ac7977c334ace0f2f4d3 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Thu, 21 Feb 2019 23:07:45 -0300 Subject: [PATCH 1/4] stop trapping CancelledError --- lbrynet/stream/managed_stream.py | 2 +- lbrynet/stream/reflector/server.py | 13 +++++-------- lbrynet/stream/stream_manager.py | 10 +++------- 3 files changed, 9 insertions(+), 16 deletions(-) diff --git a/lbrynet/stream/managed_stream.py b/lbrynet/stream/managed_stream.py index 5e7404e08..2ca0247ef 100644 --- a/lbrynet/stream/managed_stream.py +++ b/lbrynet/stream/managed_stream.py @@ -206,7 +206,7 @@ class ManagedStream: for blob_hash in we_have: await protocol.send_blob(blob_hash) sent.append(blob_hash) - except (asyncio.CancelledError, asyncio.TimeoutError, ValueError): + except (asyncio.TimeoutError, ValueError): return sent except ConnectionRefusedError: return sent diff --git a/lbrynet/stream/reflector/server.py b/lbrynet/stream/reflector/server.py index f403fa329..ff3226e29 100644 --- a/lbrynet/stream/reflector/server.py +++ b/lbrynet/stream/reflector/server.py @@ -71,17 +71,14 @@ class ReflectorServerProtocol(asyncio.Protocol): self.descriptor = await StreamDescriptor.from_stream_descriptor_blob( self.loop, self.blob_manager.blob_dir, self.sd_blob ) - self.incoming.clear() - self.writer.close_handle() - self.writer = None self.send_response({"received_sd_blob": True}) - except (asyncio.TimeoutError, asyncio.CancelledError): + except asyncio.TimeoutError: + self.send_response({"received_sd_blob": False}) + self.transport.close() + finally: self.incoming.clear() self.writer.close_handle() self.writer = None - self.transport.close() - self.send_response({"received_sd_blob": False}) - return else: self.descriptor = await StreamDescriptor.from_stream_descriptor_blob( self.loop, self.blob_manager.blob_dir, self.sd_blob @@ -111,7 +108,7 @@ class ReflectorServerProtocol(asyncio.Protocol): try: await asyncio.wait_for(blob.finished_writing.wait(), 30, loop=self.loop) self.send_response({"received_blob": True}) - except (asyncio.TimeoutError, asyncio.CancelledError): + except (asyncio.TimeoutError): self.send_response({"received_blob": False}) self.incoming.clear() self.writer.close_handle() diff --git a/lbrynet/stream/stream_manager.py b/lbrynet/stream/stream_manager.py index 436620fa4..386f0902e 100644 --- a/lbrynet/stream/stream_manager.py +++ b/lbrynet/stream/stream_manager.py @@ -275,11 +275,8 @@ class StreamManager: def wait_for_stream_finished(self, stream: ManagedStream): async def _wait_for_stream_finished(): if stream.downloader and stream.running: - try: - await stream.downloader.stream_finished_event.wait() - stream.update_status(ManagedStream.STATUS_FINISHED) - except asyncio.CancelledError: - pass + await stream.downloader.stream_finished_event.wait() + stream.update_status(ManagedStream.STATUS_FINISHED) task = self.loop.create_task(_wait_for_stream_finished()) self.update_stream_finished_futs.append(task) task.add_done_callback( @@ -358,10 +355,9 @@ class StreamManager: stream.tx = await self.wallet.send_amount_to_address( lbc_to_dewies(str(fee_amount)), fee_address.encode('latin1')) return stream - except (asyncio.TimeoutError, asyncio.CancelledError) as e: + except asyncio.TimeoutError as e: if stream_task.exception(): raise stream_task.exception() - return finally: if sd_hash in self.starting_streams: del self.starting_streams[sd_hash] From 2d8ebe25ed38852cb7e87dc962eaa5e81ef72bb4 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Fri, 22 Feb 2019 00:03:40 -0300 Subject: [PATCH 2/4] refactor so we can stop trapping CancelledError everywhere --- lbrynet/blob_exchange/downloader.py | 14 ++--------- lbrynet/dht/node.py | 32 +++++++++----------------- lbrynet/dht/protocol/iterative_find.py | 2 -- 3 files changed, 13 insertions(+), 35 deletions(-) diff --git a/lbrynet/blob_exchange/downloader.py b/lbrynet/blob_exchange/downloader.py index 76bc6218f..a2661e455 100644 --- a/lbrynet/blob_exchange/downloader.py +++ b/lbrynet/blob_exchange/downloader.py @@ -116,22 +116,12 @@ class BlobDownloader: self.peer_queue.put_nowait(set(batch).difference(self.ignored)) else: self.clearbanned() - while self.active_connections: - peer, task = self.active_connections.popitem() - if task and not task.done(): - task.cancel() blob.close() log.debug("downloaded %s", blob_hash[:8]) return blob - except asyncio.CancelledError: + finally: while self.active_connections: - peer, task = self.active_connections.popitem() - if task and not task.done(): - task.cancel() - raise - except (OSError, Exception) as e: - log.exception(e) - raise e + self.active_connections.popitem()[1].cancel() def close(self): self.scores.clear() diff --git a/lbrynet/dht/node.py b/lbrynet/dht/node.py index f1d1a9574..05bcdb0f5 100644 --- a/lbrynet/dht/node.py +++ b/lbrynet/dht/node.py @@ -190,10 +190,7 @@ class Node: async def _add_hashes_from_queue(): while True: - try: - blob_hash = await hash_queue.get() - except asyncio.CancelledError: - break + blob_hash = await hash_queue.get() peer_generator.add_generator( self.get_iterative_value_finder( binascii.unhexlify(blob_hash.encode()), bottom_out_limit=bottom_out_limit, @@ -205,10 +202,6 @@ class Node: async with peer_generator as junction: yield junction await peer_generator.finished.wait() - except asyncio.CancelledError: - if add_hashes_task and not (add_hashes_task.done() or add_hashes_task.cancelled()): - add_hashes_task.cancel() - raise finally: if add_hashes_task and not (add_hashes_task.done() or add_hashes_task.cancelled()): add_hashes_task.cancel() @@ -236,19 +229,16 @@ class Node: async def _accumulate_search_junction(self, search_queue: asyncio.Queue, result_queue: asyncio.Queue): - try: - async with self.stream_peer_search_junction(search_queue) as search_junction: # pylint: disable=E1701 - async for peers in search_junction: - if peers: - result_queue.put_nowait([ - peer for peer in peers - if not ( - peer.address == self.protocol.external_ip - and peer.tcp_port == self.protocol.peer_port - ) - ]) - except asyncio.CancelledError: - return + async with self.stream_peer_search_junction(search_queue) as search_junction: # pylint: disable=E1701 + async for peers in search_junction: + if peers: + result_queue.put_nowait([ + peer for peer in peers + if not ( + peer.address == self.protocol.external_ip + and peer.tcp_port == self.protocol.peer_port + ) + ]) def accumulate_peers(self, search_queue: asyncio.Queue, peer_queue: typing.Optional[asyncio.Queue] = None) -> typing.Tuple[ diff --git a/lbrynet/dht/protocol/iterative_find.py b/lbrynet/dht/protocol/iterative_find.py index ad5fa551c..a899f210d 100644 --- a/lbrynet/dht/protocol/iterative_find.py +++ b/lbrynet/dht/protocol/iterative_find.py @@ -162,8 +162,6 @@ class IterativeFinder: async def _send_probe(self, peer: 'KademliaPeer'): try: response = await self.send_probe(peer) - except asyncio.CancelledError: - return except asyncio.TimeoutError: self.active.discard(peer) return From 9f071bbe3652f437d34899a7d510c86a3ba6bb32 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Fri, 22 Feb 2019 00:08:36 -0300 Subject: [PATCH 3/4] pylint --- lbrynet/stream/reflector/server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lbrynet/stream/reflector/server.py b/lbrynet/stream/reflector/server.py index ff3226e29..d5effd0a4 100644 --- a/lbrynet/stream/reflector/server.py +++ b/lbrynet/stream/reflector/server.py @@ -108,7 +108,7 @@ class ReflectorServerProtocol(asyncio.Protocol): try: await asyncio.wait_for(blob.finished_writing.wait(), 30, loop=self.loop) self.send_response({"received_blob": True}) - except (asyncio.TimeoutError): + except asyncio.TimeoutError: self.send_response({"received_blob": False}) self.incoming.clear() self.writer.close_handle() From e04774896120b1a00207fa9668a44102ac1ace40 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Fri, 22 Feb 2019 00:36:23 -0300 Subject: [PATCH 4/4] log dropped api request instead of stack trace --- lbrynet/extras/daemon/Daemon.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/lbrynet/extras/daemon/Daemon.py b/lbrynet/extras/daemon/Daemon.py index 836053132..f4e590e2b 100644 --- a/lbrynet/extras/daemon/Daemon.py +++ b/lbrynet/extras/daemon/Daemon.py @@ -489,6 +489,9 @@ class Daemon(metaclass=JSONRPCServerType): if asyncio.iscoroutine(result): result = await result return result + except asyncio.CancelledError: + log.info("cancelled API call for: %s", function_name) + raise except Exception as e: # pylint: disable=broad-except log.exception("error handling api request") return JSONRPCError(