forked from LBRYCommunity/lbry-sdk
catch reflector error if a blob is deleted while it's being sent
This commit is contained in:
parent
0c95d96f32
commit
1f82a8b99e
2 changed files with 28 additions and 0 deletions
|
@ -356,6 +356,9 @@ class ManagedStream(ManagedDownloadSource):
|
||||||
return sent
|
return sent
|
||||||
except ConnectionRefusedError:
|
except ConnectionRefusedError:
|
||||||
return sent
|
return sent
|
||||||
|
except OSError:
|
||||||
|
# raised if a blob is deleted while it's being sent
|
||||||
|
return sent
|
||||||
finally:
|
finally:
|
||||||
if protocol.transport:
|
if protocol.transport:
|
||||||
protocol.transport.close()
|
protocol.transport.close()
|
||||||
|
|
|
@ -162,3 +162,28 @@ class TestReflector(AsyncioTestCase):
|
||||||
self.server_blob_manager.get_blob(self.stream.descriptor.blobs[0].blob_hash).get_is_verified()
|
self.server_blob_manager.get_blob(self.stream.descriptor.blobs[0].blob_hash).get_is_verified()
|
||||||
)
|
)
|
||||||
self.assertFalse(self.stream.is_fully_reflected)
|
self.assertFalse(self.stream.is_fully_reflected)
|
||||||
|
|
||||||
|
async def test_delete_file_during_reflector_upload(self):
|
||||||
|
stop = asyncio.Event()
|
||||||
|
incoming = asyncio.Event()
|
||||||
|
not_incoming = asyncio.Event()
|
||||||
|
reflector = ReflectorServer(
|
||||||
|
self.server_blob_manager, response_chunk_size=50, stop_event=stop, incoming_event=incoming,
|
||||||
|
not_incoming_event=not_incoming
|
||||||
|
)
|
||||||
|
reflector.start_server(5566, '127.0.0.1')
|
||||||
|
await reflector.started_listening.wait()
|
||||||
|
self.addCleanup(reflector.stop_server)
|
||||||
|
self.assertEqual(0, self.stream.reflector_progress)
|
||||||
|
reflect_task = asyncio.create_task(self.stream.upload_to_reflector('127.0.0.1', 5566))
|
||||||
|
await incoming.wait()
|
||||||
|
await not_incoming.wait()
|
||||||
|
await incoming.wait()
|
||||||
|
await self.stream_manager.delete(self.stream, delete_file=True)
|
||||||
|
# this used to raise OSError when it can't read the deleted blob for the upload
|
||||||
|
self.assertListEqual(await reflect_task, [self.stream.sd_hash])
|
||||||
|
self.assertTrue(self.server_blob_manager.get_blob(self.stream.sd_hash).get_is_verified())
|
||||||
|
self.assertFalse(
|
||||||
|
self.server_blob_manager.get_blob(self.stream.descriptor.blobs[0].blob_hash).get_is_verified()
|
||||||
|
)
|
||||||
|
self.assertFalse(self.stream.is_fully_reflected)
|
||||||
|
|
Loading…
Reference in a new issue