Merge pull request #3021 from lbryio/debug-reflector
Improve reflector upload cancellation handling
This commit is contained in:
commit
52f6dcf092
4 changed files with 49 additions and 16 deletions
|
@ -337,9 +337,9 @@ class BlobFile(AbstractBlob):
|
||||||
return self.loop.create_task(write_blob())
|
return self.loop.create_task(write_blob())
|
||||||
|
|
||||||
def delete(self):
|
def delete(self):
|
||||||
|
super().delete()
|
||||||
if os.path.isfile(self.file_path):
|
if os.path.isfile(self.file_path):
|
||||||
os.remove(self.file_path)
|
os.remove(self.file_path)
|
||||||
return super().delete()
|
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
async def create_from_unencrypted(
|
async def create_from_unencrypted(
|
||||||
|
|
|
@ -356,13 +356,23 @@ class ManagedStream(ManagedDownloadSource):
|
||||||
return sent
|
return sent
|
||||||
except ConnectionRefusedError:
|
except ConnectionRefusedError:
|
||||||
return sent
|
return sent
|
||||||
except OSError:
|
except (OSError, Exception) as err:
|
||||||
# raised if a blob is deleted while it's being sent
|
if isinstance(err, asyncio.CancelledError):
|
||||||
return sent
|
log.warning("stopped uploading %s#%s to reflector", self.claim_name, self.claim_id)
|
||||||
|
raise err
|
||||||
|
if isinstance(err, OSError):
|
||||||
|
log.warning(
|
||||||
|
"stopped uploading %s#%s to reflector because blobs were deleted or moved", self.claim_name,
|
||||||
|
self.claim_id
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
log.exception("unexpected error reflecting %s#%s", self.claim_name, self.claim_id)
|
||||||
|
raise asyncio.CancelledError()
|
||||||
finally:
|
finally:
|
||||||
if protocol.transport:
|
if protocol.transport:
|
||||||
protocol.transport.close()
|
protocol.transport.close()
|
||||||
self.uploading_to_reflector = False
|
self.uploading_to_reflector = False
|
||||||
|
|
||||||
if not self.fully_reflected.is_set():
|
if not self.fully_reflected.is_set():
|
||||||
self.fully_reflected.set()
|
self.fully_reflected.set()
|
||||||
await self.blob_manager.storage.update_reflected_stream(self.sd_hash, f"{host}:{port}")
|
await self.blob_manager.storage.update_reflected_stream(self.sd_hash, f"{host}:{port}")
|
||||||
|
|
|
@ -64,12 +64,12 @@ class StreamReflectorClient(asyncio.Protocol):
|
||||||
self.transport.write(msg.encode())
|
self.transport.write(msg.encode())
|
||||||
self.pending_request = self.loop.create_task(asyncio.wait_for(self.response_queue.get(), timeout))
|
self.pending_request = self.loop.create_task(asyncio.wait_for(self.response_queue.get(), timeout))
|
||||||
return await self.pending_request
|
return await self.pending_request
|
||||||
except (AttributeError, asyncio.CancelledError):
|
except (AttributeError, asyncio.CancelledError) as err:
|
||||||
# attribute error happens when we transport.write after disconnect
|
# attribute error happens when we transport.write after disconnect
|
||||||
# cancelled error happens when the pending_request task is cancelled by a disconnect
|
# cancelled error happens when the pending_request task is cancelled by a disconnect
|
||||||
if self.transport:
|
if self.transport:
|
||||||
self.transport.close()
|
self.transport.close()
|
||||||
raise asyncio.TimeoutError()
|
raise err if isinstance(err, asyncio.CancelledError) else asyncio.CancelledError()
|
||||||
finally:
|
finally:
|
||||||
self.pending_request = None
|
self.pending_request = None
|
||||||
|
|
||||||
|
@ -94,8 +94,16 @@ class StreamReflectorClient(asyncio.Protocol):
|
||||||
needed = response.get('needed_blobs', [])
|
needed = response.get('needed_blobs', [])
|
||||||
sent_sd = False
|
sent_sd = False
|
||||||
if response['send_sd_blob']:
|
if response['send_sd_blob']:
|
||||||
await sd_blob.sendfile(self)
|
try:
|
||||||
received = await asyncio.wait_for(self.response_queue.get(), 30)
|
sent = await sd_blob.sendfile(self)
|
||||||
|
if sent == -1:
|
||||||
|
log.warning("failed to send sd blob")
|
||||||
|
raise asyncio.CancelledError()
|
||||||
|
received = await asyncio.wait_for(self.response_queue.get(), 30)
|
||||||
|
except asyncio.CancelledError as err:
|
||||||
|
if self.transport:
|
||||||
|
self.transport.close()
|
||||||
|
raise err
|
||||||
if received.get('received_sd_blob'):
|
if received.get('received_sd_blob'):
|
||||||
sent_sd = True
|
sent_sd = True
|
||||||
if not needed:
|
if not needed:
|
||||||
|
@ -118,8 +126,16 @@ class StreamReflectorClient(asyncio.Protocol):
|
||||||
if 'send_blob' not in response:
|
if 'send_blob' not in response:
|
||||||
raise ValueError("I don't know whether to send the blob or not!")
|
raise ValueError("I don't know whether to send the blob or not!")
|
||||||
if response['send_blob']:
|
if response['send_blob']:
|
||||||
await blob.sendfile(self)
|
try:
|
||||||
received = await asyncio.wait_for(self.response_queue.get(), 30)
|
sent = await blob.sendfile(self)
|
||||||
|
if sent == -1:
|
||||||
|
log.warning("failed to send blob")
|
||||||
|
raise asyncio.CancelledError()
|
||||||
|
received = await asyncio.wait_for(self.response_queue.get(), 30)
|
||||||
|
except asyncio.CancelledError as err:
|
||||||
|
if self.transport:
|
||||||
|
self.transport.close()
|
||||||
|
raise err
|
||||||
if received.get('received_blob'):
|
if received.get('received_blob'):
|
||||||
self.reflected_blobs.append(blob.blob_hash)
|
self.reflected_blobs.append(blob.blob_hash)
|
||||||
log.info("Sent reflector blob %s", blob.blob_hash[:8])
|
log.info("Sent reflector blob %s", blob.blob_hash[:8])
|
||||||
|
|
|
@ -40,7 +40,7 @@ class TestReflector(AsyncioTestCase):
|
||||||
file_path = os.path.join(tmp_dir, "test_file")
|
file_path = os.path.join(tmp_dir, "test_file")
|
||||||
with open(file_path, 'wb') as f:
|
with open(file_path, 'wb') as f:
|
||||||
f.write(self.cleartext)
|
f.write(self.cleartext)
|
||||||
|
self.stream_manager.config.reflect_streams = False
|
||||||
self.stream = await self.stream_manager.create(file_path)
|
self.stream = await self.stream_manager.create(file_path)
|
||||||
|
|
||||||
async def _test_reflect_stream(self, response_chunk_size):
|
async def _test_reflect_stream(self, response_chunk_size):
|
||||||
|
@ -93,8 +93,10 @@ class TestReflector(AsyncioTestCase):
|
||||||
await incoming.wait()
|
await incoming.wait()
|
||||||
stop.set()
|
stop.set()
|
||||||
# this used to raise (and then propagate) a CancelledError
|
# this used to raise (and then propagate) a CancelledError
|
||||||
self.assertListEqual(await reflect_task, [])
|
with self.assertRaises(asyncio.CancelledError):
|
||||||
|
await reflect_task
|
||||||
self.assertFalse(self.stream.is_fully_reflected)
|
self.assertFalse(self.stream.is_fully_reflected)
|
||||||
|
self.assertFalse(self.server_blob_manager.get_blob(self.stream.sd_hash).get_is_verified())
|
||||||
|
|
||||||
async def test_result_from_disconnect_after_sd_transfer(self):
|
async def test_result_from_disconnect_after_sd_transfer(self):
|
||||||
stop = asyncio.Event()
|
stop = asyncio.Event()
|
||||||
|
@ -112,7 +114,8 @@ class TestReflector(AsyncioTestCase):
|
||||||
await incoming.wait()
|
await incoming.wait()
|
||||||
await not_incoming.wait()
|
await not_incoming.wait()
|
||||||
stop.set()
|
stop.set()
|
||||||
self.assertListEqual(await reflect_task, [self.stream.sd_hash])
|
with self.assertRaises(asyncio.CancelledError):
|
||||||
|
await reflect_task
|
||||||
self.assertTrue(self.server_blob_manager.get_blob(self.stream.sd_hash).get_is_verified())
|
self.assertTrue(self.server_blob_manager.get_blob(self.stream.sd_hash).get_is_verified())
|
||||||
self.assertFalse(self.stream.is_fully_reflected)
|
self.assertFalse(self.stream.is_fully_reflected)
|
||||||
|
|
||||||
|
@ -134,7 +137,8 @@ class TestReflector(AsyncioTestCase):
|
||||||
await incoming.wait()
|
await incoming.wait()
|
||||||
await not_incoming.wait()
|
await not_incoming.wait()
|
||||||
stop.set()
|
stop.set()
|
||||||
self.assertListEqual(await reflect_task, [self.stream.sd_hash, self.stream.descriptor.blobs[0].blob_hash])
|
with self.assertRaises(asyncio.CancelledError):
|
||||||
|
await reflect_task
|
||||||
self.assertTrue(self.server_blob_manager.get_blob(self.stream.sd_hash).get_is_verified())
|
self.assertTrue(self.server_blob_manager.get_blob(self.stream.sd_hash).get_is_verified())
|
||||||
self.assertTrue(self.server_blob_manager.get_blob(self.stream.descriptor.blobs[0].blob_hash).get_is_verified())
|
self.assertTrue(self.server_blob_manager.get_blob(self.stream.descriptor.blobs[0].blob_hash).get_is_verified())
|
||||||
self.assertFalse(self.stream.is_fully_reflected)
|
self.assertFalse(self.stream.is_fully_reflected)
|
||||||
|
@ -156,7 +160,9 @@ class TestReflector(AsyncioTestCase):
|
||||||
await not_incoming.wait()
|
await not_incoming.wait()
|
||||||
await incoming.wait()
|
await incoming.wait()
|
||||||
stop.set()
|
stop.set()
|
||||||
self.assertListEqual(await reflect_task, [self.stream.sd_hash])
|
with self.assertRaises(asyncio.CancelledError):
|
||||||
|
await reflect_task
|
||||||
|
# self.assertListEqual(await reflect_task, [self.stream.sd_hash])
|
||||||
self.assertTrue(self.server_blob_manager.get_blob(self.stream.sd_hash).get_is_verified())
|
self.assertTrue(self.server_blob_manager.get_blob(self.stream.sd_hash).get_is_verified())
|
||||||
self.assertFalse(
|
self.assertFalse(
|
||||||
self.server_blob_manager.get_blob(self.stream.descriptor.blobs[0].blob_hash).get_is_verified()
|
self.server_blob_manager.get_blob(self.stream.descriptor.blobs[0].blob_hash).get_is_verified()
|
||||||
|
@ -181,7 +187,8 @@ class TestReflector(AsyncioTestCase):
|
||||||
await incoming.wait()
|
await incoming.wait()
|
||||||
await self.stream_manager.delete(self.stream, delete_file=True)
|
await self.stream_manager.delete(self.stream, delete_file=True)
|
||||||
# this used to raise OSError when it can't read the deleted blob for the upload
|
# this used to raise OSError when it can't read the deleted blob for the upload
|
||||||
self.assertListEqual(await reflect_task, [self.stream.sd_hash])
|
with self.assertRaises(asyncio.CancelledError):
|
||||||
|
await reflect_task
|
||||||
self.assertTrue(self.server_blob_manager.get_blob(self.stream.sd_hash).get_is_verified())
|
self.assertTrue(self.server_blob_manager.get_blob(self.stream.sd_hash).get_is_verified())
|
||||||
self.assertFalse(
|
self.assertFalse(
|
||||||
self.server_blob_manager.get_blob(self.stream.descriptor.blobs[0].blob_hash).get_is_verified()
|
self.server_blob_manager.get_blob(self.stream.descriptor.blobs[0].blob_hash).get_is_verified()
|
||||||
|
|
Loading…
Reference in a new issue