Merge pull request #1952 from lbryio/no_trap

stop trapping CancelledError
Jack Robison 2019-02-27 11:06:02 -05:00 committed by GitHub
commit 591e66bfbb
7 changed files with 25 additions and 51 deletions
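
The change is the same across all seven files: asyncio.CancelledError must be allowed to propagate for Task.cancel() to work. A coroutine that catches it and returns normally reports itself as finished rather than cancelled, so callers that await the task during shutdown see the wrong state. A minimal sketch of the trap (illustrative, not from the diff):

    import asyncio

    async def stubborn():
        try:
            await asyncio.sleep(3600)
        except asyncio.CancelledError:
            return  # the trap: cancellation is swallowed here

    async def main():
        task = asyncio.ensure_future(stubborn())
        await asyncio.sleep(0)   # let the task start
        task.cancel()
        await task               # does NOT raise CancelledError
        print(task.cancelled())  # False: the task "finished normally"

    asyncio.get_event_loop().run_until_complete(main())

Remove the except (or re-raise from it) and task.cancelled() is True as expected.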

@@ -116,22 +116,12 @@ class BlobDownloader:
                     self.peer_queue.put_nowait(set(batch).difference(self.ignored))
                 else:
                     self.clearbanned()
-                while self.active_connections:
-                    peer, task = self.active_connections.popitem()
-                    if task and not task.done():
-                        task.cancel()
                 blob.close()
                 log.debug("downloaded %s", blob_hash[:8])
                 return blob
-        except asyncio.CancelledError:
+        finally:
             while self.active_connections:
-                peer, task = self.active_connections.popitem()
-                if task and not task.done():
-                    task.cancel()
-            raise
-        except (OSError, Exception) as e:
-            log.exception(e)
-            raise e
+                self.active_connections.popitem()[1].cancel()

     def close(self):
         self.scores.clear()
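
In the downloader, a single finally now covers success, failure, and cancellation, which is why both the dedicated CancelledError handler and the broad except (OSError, Exception) could be deleted. A runnable sketch of the same shape (names are illustrative, not from the codebase):

    import asyncio

    async def download(active_connections: dict):
        try:
            await asyncio.sleep(3600)  # stand-in for the fetch loop
        finally:
            # runs on success, on error, and on cancellation alike
            while active_connections:
                active_connections.popitem()[1].cancel()

    async def main():
        helpers = {"peer": asyncio.ensure_future(asyncio.sleep(3600))}
        downloader = asyncio.ensure_future(download(helpers))
        await asyncio.sleep(0)
        downloader.cancel()
        try:
            await downloader
        except asyncio.CancelledError:
            pass
        await asyncio.sleep(0)
        print(helpers)  # {}: helper tasks were reaped by the finally block

    asyncio.get_event_loop().run_until_complete(main())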

@@ -190,10 +190,7 @@ class Node:
         async def _add_hashes_from_queue():
             while True:
-                try:
                 blob_hash = await hash_queue.get()
-                except asyncio.CancelledError:
-                    break
                 peer_generator.add_generator(
                     self.get_iterative_value_finder(
                         binascii.unhexlify(blob_hash.encode()), bottom_out_limit=bottom_out_limit,
@@ -205,10 +202,6 @@ class Node:
             async with peer_generator as junction:
                 yield junction
             await peer_generator.finished.wait()
-        except asyncio.CancelledError:
-            if add_hashes_task and not (add_hashes_task.done() or add_hashes_task.cancelled()):
-                add_hashes_task.cancel()
-            raise
         finally:
             if add_hashes_task and not (add_hashes_task.done() or add_hashes_task.cancelled()):
                 add_hashes_task.cancel()
@@ -236,7 +229,6 @@ class Node:
     async def _accumulate_search_junction(self, search_queue: asyncio.Queue,
                                           result_queue: asyncio.Queue):
-        try:
         async with self.stream_peer_search_junction(search_queue) as search_junction:  # pylint: disable=E1701
             async for peers in search_junction:
                 if peers:
@@ -247,8 +239,6 @@ class Node:
                             and peer.tcp_port == self.protocol.peer_port
                         )
                     ])
-        except asyncio.CancelledError:
-            return

     def accumulate_peers(self, search_queue: asyncio.Queue,
                          peer_queue: typing.Optional[asyncio.Queue] = None) -> typing.Tuple[
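
With the traps removed from Node, cancelling the accumulator raises straight out of await hash_queue.get() (or out of the junction iteration), and the surviving finally still reaps add_hashes_task. A sketch of the queue-draining piece (simplified, hypothetical body):

    import asyncio

    async def add_hashes_from_queue(hash_queue: asyncio.Queue):
        # no "except asyncio.CancelledError: break" -- cancellation
        # simply raises out of get() and ends the loop
        while True:
            blob_hash = await hash_queue.get()
            print("would start a value finder for", blob_hash)

    async def main():
        queue = asyncio.Queue()
        task = asyncio.ensure_future(add_hashes_from_queue(queue))
        await queue.put("deadbeef")
        await asyncio.sleep(0)
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            print("accumulator cancelled cleanly")

    asyncio.get_event_loop().run_until_complete(main())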

@@ -162,8 +162,6 @@ class IterativeFinder:
     async def _send_probe(self, peer: 'KademliaPeer'):
         try:
             response = await self.send_probe(peer)
-        except asyncio.CancelledError:
-            return
         except asyncio.TimeoutError:
             self.active.discard(peer)
             return
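
_send_probe now handles only the failure it can recover from; cancellation tears the whole search down from outside. Sketch of the narrowed handler (illustrative signature):

    import asyncio

    async def send_probe_safely(peer, send_probe, active: set):
        # handle only recoverable failures; let CancelledError propagate
        # so the iterative search can be torn down as a whole
        try:
            return await send_probe(peer)
        except asyncio.TimeoutError:
            active.discard(peer)  # unresponsive peer: drop it and move on
            return None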

@@ -489,6 +489,9 @@ class Daemon(metaclass=JSONRPCServerType):
             if asyncio.iscoroutine(result):
                 result = await result
             return result
+        except asyncio.CancelledError:
+            log.info("cancelled API call for: %s", function_name)
+            raise
         except Exception as e:  # pylint: disable=broad-except
             log.exception("error handling api request")
             return JSONRPCError(
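
This is the one place the commit adds a CancelledError handler, and it is deliberately not a trap: it logs the interrupted call and immediately re-raises. The shape of the dispatcher (hypothetical, simplified):

    import asyncio
    import logging

    log = logging.getLogger(__name__)

    async def dispatch(fn, function_name, *args, **kwargs):
        try:
            result = fn(*args, **kwargs)
            if asyncio.iscoroutine(result):
                result = await result
            return result
        except asyncio.CancelledError:
            log.info("cancelled API call for: %s", function_name)
            raise  # the re-raise is what keeps this from being a trap
        except Exception:  # pylint: disable=broad-except
            log.exception("error handling api request")
            return {"error": "internal error"}  # stand-in for JSONRPCError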

@@ -206,7 +206,7 @@ class ManagedStream:
             for blob_hash in we_have:
                 await protocol.send_blob(blob_hash)
                 sent.append(blob_hash)
-        except (asyncio.CancelledError, asyncio.TimeoutError, ValueError):
+        except (asyncio.TimeoutError, ValueError):
             return sent
         except ConnectionRefusedError:
             return sent
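
Dropping CancelledError from the tuple keeps the partial-result behaviour for expected failures while letting cancellation abort the upload outright. Sketch (illustrative names):

    import asyncio

    async def send_blobs(we_have, send_blob):
        sent = []
        try:
            for blob_hash in we_have:
                await send_blob(blob_hash)
                sent.append(blob_hash)
        except (asyncio.TimeoutError, ValueError):
            return sent  # expected failures: report what made it through
        except ConnectionRefusedError:
            return sent
        return sent  # cancellation, by contrast, now propagates to the caller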

@@ -71,17 +71,14 @@ class ReflectorServerProtocol(asyncio.Protocol):
                 self.descriptor = await StreamDescriptor.from_stream_descriptor_blob(
                     self.loop, self.blob_manager.blob_dir, self.sd_blob
                 )
-                self.incoming.clear()
-                self.writer.close_handle()
-                self.writer = None
                 self.send_response({"received_sd_blob": True})
-            except (asyncio.TimeoutError, asyncio.CancelledError):
+            except asyncio.TimeoutError:
+                self.send_response({"received_sd_blob": False})
+                self.transport.close()
+            finally:
                 self.incoming.clear()
                 self.writer.close_handle()
                 self.writer = None
-                self.transport.close()
-                self.send_response({"received_sd_blob": False})
-                return
         else:
             self.descriptor = await StreamDescriptor.from_stream_descriptor_blob(
                 self.loop, self.blob_manager.blob_dir, self.sd_blob
@@ -111,7 +108,7 @@ class ReflectorServerProtocol(asyncio.Protocol):
             try:
                 await asyncio.wait_for(blob.finished_writing.wait(), 30, loop=self.loop)
                 self.send_response({"received_blob": True})
-            except (asyncio.TimeoutError, asyncio.CancelledError):
+            except asyncio.TimeoutError:
                 self.send_response({"received_blob": False})
             self.incoming.clear()
             self.writer.close_handle()
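
The reflector gets the same finally refactor as the downloader: the success and timeout branches keep their distinct responses, while clearing incoming and releasing the writer moves to a block that also runs if the coroutine is cancelled mid-transfer. A condensed sketch (hypothetical free function, not the real protocol class):

    import asyncio

    async def receive_sd_blob(finished: asyncio.Event, send_response, writer):
        try:
            await asyncio.wait_for(finished.wait(), 30)
            send_response({"received_sd_blob": True})
        except asyncio.TimeoutError:
            send_response({"received_sd_blob": False})
        finally:
            # cleanup now runs on success, timeout, AND cancellation
            writer.close_handle()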

@@ -275,11 +275,8 @@ class StreamManager:
     def wait_for_stream_finished(self, stream: ManagedStream):
         async def _wait_for_stream_finished():
             if stream.downloader and stream.running:
-                try:
                 await stream.downloader.stream_finished_event.wait()
                 stream.update_status(ManagedStream.STATUS_FINISHED)
-                except asyncio.CancelledError:
-                    pass
         task = self.loop.create_task(_wait_for_stream_finished())
         self.update_stream_finished_futs.append(task)
         task.add_done_callback(
@@ -358,10 +355,9 @@ class StreamManager:
                     stream.tx = await self.wallet.send_amount_to_address(
                         lbc_to_dewies(str(fee_amount)), fee_address.encode('latin1'))
                 return stream
-            except (asyncio.TimeoutError, asyncio.CancelledError) as e:
+            except asyncio.TimeoutError as e:
                 if stream_task.exception():
                     raise stream_task.exception()
-                return
             finally:
                 if sd_hash in self.starting_streams:
                     del self.starting_streams[sd_hash]
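
With the try/except/pass gone from the status watcher, cancelling it during shutdown skips the status update instead of recording a stream as finished. Minimal sketch:

    import asyncio

    async def wait_for_stream_finished(finished: asyncio.Event, update_status):
        # no CancelledError trap: if the manager cancels this watcher during
        # shutdown, the wait raises and the status update below never runs
        await finished.wait()
        update_status("finished")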