diff --git a/lbry/stream/managed_stream.py b/lbry/stream/managed_stream.py index da625c381..5e4a0057b 100644 --- a/lbry/stream/managed_stream.py +++ b/lbry/stream/managed_stream.py @@ -356,6 +356,9 @@ class ManagedStream(ManagedDownloadSource): return sent except ConnectionRefusedError: return sent + except OSError: + # raised if a blob is deleted while it's being sent + return sent finally: if protocol.transport: protocol.transport.close() diff --git a/tests/unit/stream/test_reflector.py b/tests/unit/stream/test_reflector.py index 6616d0d04..078e26286 100644 --- a/tests/unit/stream/test_reflector.py +++ b/tests/unit/stream/test_reflector.py @@ -162,3 +162,28 @@ class TestReflector(AsyncioTestCase): self.server_blob_manager.get_blob(self.stream.descriptor.blobs[0].blob_hash).get_is_verified() ) self.assertFalse(self.stream.is_fully_reflected) + + async def test_delete_file_during_reflector_upload(self): + stop = asyncio.Event() + incoming = asyncio.Event() + not_incoming = asyncio.Event() + reflector = ReflectorServer( + self.server_blob_manager, response_chunk_size=50, stop_event=stop, incoming_event=incoming, + not_incoming_event=not_incoming + ) + reflector.start_server(5566, '127.0.0.1') + await reflector.started_listening.wait() + self.addCleanup(reflector.stop_server) + self.assertEqual(0, self.stream.reflector_progress) + reflect_task = asyncio.create_task(self.stream.upload_to_reflector('127.0.0.1', 5566)) + await incoming.wait() + await not_incoming.wait() + await incoming.wait() + await self.stream_manager.delete(self.stream, delete_file=True) + # this used to raise OSError when it can't read the deleted blob for the upload + self.assertListEqual(await reflect_task, [self.stream.sd_hash]) + self.assertTrue(self.server_blob_manager.get_blob(self.stream.sd_hash).get_is_verified()) + self.assertFalse( + self.server_blob_manager.get_blob(self.stream.descriptor.blobs[0].blob_hash).get_is_verified() + ) + self.assertFalse(self.stream.is_fully_reflected)