refactor so we can stop trapping CancelledError everywhere
This commit is contained in:
parent
7a96e742f2
commit
2d8ebe25ed
3 changed files with 13 additions and 35 deletions
|
@ -116,22 +116,12 @@ class BlobDownloader:
|
|||
self.peer_queue.put_nowait(set(batch).difference(self.ignored))
|
||||
else:
|
||||
self.clearbanned()
|
||||
while self.active_connections:
|
||||
peer, task = self.active_connections.popitem()
|
||||
if task and not task.done():
|
||||
task.cancel()
|
||||
blob.close()
|
||||
log.debug("downloaded %s", blob_hash[:8])
|
||||
return blob
|
||||
except asyncio.CancelledError:
|
||||
finally:
|
||||
while self.active_connections:
|
||||
peer, task = self.active_connections.popitem()
|
||||
if task and not task.done():
|
||||
task.cancel()
|
||||
raise
|
||||
except (OSError, Exception) as e:
|
||||
log.exception(e)
|
||||
raise e
|
||||
self.active_connections.popitem()[1].cancel()
|
||||
|
||||
def close(self):
|
||||
self.scores.clear()
|
||||
|
|
|
@ -190,10 +190,7 @@ class Node:
|
|||
|
||||
async def _add_hashes_from_queue():
|
||||
while True:
|
||||
try:
|
||||
blob_hash = await hash_queue.get()
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
blob_hash = await hash_queue.get()
|
||||
peer_generator.add_generator(
|
||||
self.get_iterative_value_finder(
|
||||
binascii.unhexlify(blob_hash.encode()), bottom_out_limit=bottom_out_limit,
|
||||
|
@ -205,10 +202,6 @@ class Node:
|
|||
async with peer_generator as junction:
|
||||
yield junction
|
||||
await peer_generator.finished.wait()
|
||||
except asyncio.CancelledError:
|
||||
if add_hashes_task and not (add_hashes_task.done() or add_hashes_task.cancelled()):
|
||||
add_hashes_task.cancel()
|
||||
raise
|
||||
finally:
|
||||
if add_hashes_task and not (add_hashes_task.done() or add_hashes_task.cancelled()):
|
||||
add_hashes_task.cancel()
|
||||
|
@ -236,19 +229,16 @@ class Node:
|
|||
|
||||
async def _accumulate_search_junction(self, search_queue: asyncio.Queue,
|
||||
result_queue: asyncio.Queue):
|
||||
try:
|
||||
async with self.stream_peer_search_junction(search_queue) as search_junction: # pylint: disable=E1701
|
||||
async for peers in search_junction:
|
||||
if peers:
|
||||
result_queue.put_nowait([
|
||||
peer for peer in peers
|
||||
if not (
|
||||
peer.address == self.protocol.external_ip
|
||||
and peer.tcp_port == self.protocol.peer_port
|
||||
)
|
||||
])
|
||||
except asyncio.CancelledError:
|
||||
return
|
||||
async with self.stream_peer_search_junction(search_queue) as search_junction: # pylint: disable=E1701
|
||||
async for peers in search_junction:
|
||||
if peers:
|
||||
result_queue.put_nowait([
|
||||
peer for peer in peers
|
||||
if not (
|
||||
peer.address == self.protocol.external_ip
|
||||
and peer.tcp_port == self.protocol.peer_port
|
||||
)
|
||||
])
|
||||
|
||||
def accumulate_peers(self, search_queue: asyncio.Queue,
|
||||
peer_queue: typing.Optional[asyncio.Queue] = None) -> typing.Tuple[
|
||||
|
|
|
@ -162,8 +162,6 @@ class IterativeFinder:
|
|||
async def _send_probe(self, peer: 'KademliaPeer'):
|
||||
try:
|
||||
response = await self.send_probe(peer)
|
||||
except asyncio.CancelledError:
|
||||
return
|
||||
except asyncio.TimeoutError:
|
||||
self.active.discard(peer)
|
||||
return
|
||||
|
|
Loading…
Add table
Reference in a new issue