improve reflector upload cancellation handling
This commit is contained in:
parent
da391bcc8d
commit
8426b674a3
3 changed files with 48 additions and 15 deletions
|
@ -356,13 +356,23 @@ class ManagedStream(ManagedDownloadSource):
|
|||
return sent
|
||||
except ConnectionRefusedError:
|
||||
return sent
|
||||
except OSError:
|
||||
# raised if a blob is deleted while it's being sent
|
||||
return sent
|
||||
except (OSError, Exception) as err:
|
||||
if isinstance(err, asyncio.CancelledError):
|
||||
log.warning("stopped uploading %s#%s to reflector", self.claim_name, self.claim_id)
|
||||
raise err
|
||||
elif isinstance(err, OSError):
|
||||
log.warning(
|
||||
"stopped uploading %s#%s to reflector because blobs were deleted or moved", self.claim_name,
|
||||
self.claim_id
|
||||
)
|
||||
else:
|
||||
log.exception("unexpected error reflecting %s#%s", self.claim_name, self.claim_id)
|
||||
raise asyncio.CancelledError()
|
||||
finally:
|
||||
if protocol.transport:
|
||||
protocol.transport.close()
|
||||
self.uploading_to_reflector = False
|
||||
|
||||
if not self.fully_reflected.is_set():
|
||||
self.fully_reflected.set()
|
||||
await self.blob_manager.storage.update_reflected_stream(self.sd_hash, f"{host}:{port}")
|
||||
|
|
|
@ -64,12 +64,12 @@ class StreamReflectorClient(asyncio.Protocol):
|
|||
self.transport.write(msg.encode())
|
||||
self.pending_request = self.loop.create_task(asyncio.wait_for(self.response_queue.get(), timeout))
|
||||
return await self.pending_request
|
||||
except (AttributeError, asyncio.CancelledError):
|
||||
except (AttributeError, asyncio.CancelledError) as err:
|
||||
# attribute error happens when we transport.write after disconnect
|
||||
# cancelled error happens when the pending_request task is cancelled by a disconnect
|
||||
if self.transport:
|
||||
self.transport.close()
|
||||
raise asyncio.TimeoutError()
|
||||
raise err if isinstance(err, asyncio.CancelledError) else asyncio.CancelledError()
|
||||
finally:
|
||||
self.pending_request = None
|
||||
|
||||
|
@ -94,8 +94,16 @@ class StreamReflectorClient(asyncio.Protocol):
|
|||
needed = response.get('needed_blobs', [])
|
||||
sent_sd = False
|
||||
if response['send_sd_blob']:
|
||||
await sd_blob.sendfile(self)
|
||||
received = await asyncio.wait_for(self.response_queue.get(), 30)
|
||||
try:
|
||||
sent = await sd_blob.sendfile(self)
|
||||
if sent == -1:
|
||||
log.warning("failed to send sd blob")
|
||||
raise asyncio.CancelledError()
|
||||
received = await asyncio.wait_for(self.response_queue.get(), 30)
|
||||
except asyncio.CancelledError as err:
|
||||
if self.transport:
|
||||
self.transport.close()
|
||||
raise err
|
||||
if received.get('received_sd_blob'):
|
||||
sent_sd = True
|
||||
if not needed:
|
||||
|
@ -118,8 +126,16 @@ class StreamReflectorClient(asyncio.Protocol):
|
|||
if 'send_blob' not in response:
|
||||
raise ValueError("I don't know whether to send the blob or not!")
|
||||
if response['send_blob']:
|
||||
await blob.sendfile(self)
|
||||
received = await asyncio.wait_for(self.response_queue.get(), 30)
|
||||
try:
|
||||
sent = await blob.sendfile(self)
|
||||
if sent == -1:
|
||||
log.warning("failed to send blob")
|
||||
raise asyncio.CancelledError()
|
||||
received = await asyncio.wait_for(self.response_queue.get(), 30)
|
||||
except asyncio.CancelledError as err:
|
||||
if self.transport:
|
||||
self.transport.close()
|
||||
raise err
|
||||
if received.get('received_blob'):
|
||||
self.reflected_blobs.append(blob.blob_hash)
|
||||
log.info("Sent reflector blob %s", blob.blob_hash[:8])
|
||||
|
|
|
@ -40,7 +40,7 @@ class TestReflector(AsyncioTestCase):
|
|||
file_path = os.path.join(tmp_dir, "test_file")
|
||||
with open(file_path, 'wb') as f:
|
||||
f.write(self.cleartext)
|
||||
|
||||
self.stream_manager.config.reflect_streams = False
|
||||
self.stream = await self.stream_manager.create(file_path)
|
||||
|
||||
async def _test_reflect_stream(self, response_chunk_size):
|
||||
|
@ -93,8 +93,10 @@ class TestReflector(AsyncioTestCase):
|
|||
await incoming.wait()
|
||||
stop.set()
|
||||
# this used to raise (and then propagate) a CancelledError
|
||||
self.assertListEqual(await reflect_task, [])
|
||||
with self.assertRaises(asyncio.CancelledError):
|
||||
await reflect_task
|
||||
self.assertFalse(self.stream.is_fully_reflected)
|
||||
self.assertFalse(self.server_blob_manager.get_blob(self.stream.sd_hash).get_is_verified())
|
||||
|
||||
async def test_result_from_disconnect_after_sd_transfer(self):
|
||||
stop = asyncio.Event()
|
||||
|
@ -112,7 +114,8 @@ class TestReflector(AsyncioTestCase):
|
|||
await incoming.wait()
|
||||
await not_incoming.wait()
|
||||
stop.set()
|
||||
self.assertListEqual(await reflect_task, [self.stream.sd_hash])
|
||||
with self.assertRaises(asyncio.CancelledError):
|
||||
await reflect_task
|
||||
self.assertTrue(self.server_blob_manager.get_blob(self.stream.sd_hash).get_is_verified())
|
||||
self.assertFalse(self.stream.is_fully_reflected)
|
||||
|
||||
|
@ -134,7 +137,8 @@ class TestReflector(AsyncioTestCase):
|
|||
await incoming.wait()
|
||||
await not_incoming.wait()
|
||||
stop.set()
|
||||
self.assertListEqual(await reflect_task, [self.stream.sd_hash, self.stream.descriptor.blobs[0].blob_hash])
|
||||
with self.assertRaises(asyncio.CancelledError):
|
||||
await reflect_task
|
||||
self.assertTrue(self.server_blob_manager.get_blob(self.stream.sd_hash).get_is_verified())
|
||||
self.assertTrue(self.server_blob_manager.get_blob(self.stream.descriptor.blobs[0].blob_hash).get_is_verified())
|
||||
self.assertFalse(self.stream.is_fully_reflected)
|
||||
|
@ -156,7 +160,9 @@ class TestReflector(AsyncioTestCase):
|
|||
await not_incoming.wait()
|
||||
await incoming.wait()
|
||||
stop.set()
|
||||
self.assertListEqual(await reflect_task, [self.stream.sd_hash])
|
||||
with self.assertRaises(asyncio.CancelledError):
|
||||
await reflect_task
|
||||
# self.assertListEqual(await reflect_task, [self.stream.sd_hash])
|
||||
self.assertTrue(self.server_blob_manager.get_blob(self.stream.sd_hash).get_is_verified())
|
||||
self.assertFalse(
|
||||
self.server_blob_manager.get_blob(self.stream.descriptor.blobs[0].blob_hash).get_is_verified()
|
||||
|
@ -181,7 +187,8 @@ class TestReflector(AsyncioTestCase):
|
|||
await incoming.wait()
|
||||
await self.stream_manager.delete(self.stream, delete_file=True)
|
||||
# this used to raise OSError when it can't read the deleted blob for the upload
|
||||
self.assertListEqual(await reflect_task, [self.stream.sd_hash])
|
||||
with self.assertRaises(asyncio.CancelledError):
|
||||
await reflect_task
|
||||
self.assertTrue(self.server_blob_manager.get_blob(self.stream.sd_hash).get_is_verified())
|
||||
self.assertFalse(
|
||||
self.server_blob_manager.get_blob(self.stream.descriptor.blobs[0].blob_hash).get_is_verified()
|
||||
|
|
Loading…
Add table
Reference in a new issue