diff --git a/lbry/stream/managed_stream.py b/lbry/stream/managed_stream.py index 5e4a0057b..a00dbb29a 100644 --- a/lbry/stream/managed_stream.py +++ b/lbry/stream/managed_stream.py @@ -356,13 +356,23 @@ class ManagedStream(ManagedDownloadSource): return sent except ConnectionRefusedError: return sent - except OSError: - # raised if a blob is deleted while it's being sent - return sent + except (OSError, Exception) as err: + if isinstance(err, asyncio.CancelledError): + log.warning("stopped uploading %s#%s to reflector", self.claim_name, self.claim_id) + raise err + elif isinstance(err, OSError): + log.warning( + "stopped uploading %s#%s to reflector because blobs were deleted or moved", self.claim_name, + self.claim_id + ) + else: + log.exception("unexpected error reflecting %s#%s", self.claim_name, self.claim_id) + raise asyncio.CancelledError() finally: if protocol.transport: protocol.transport.close() self.uploading_to_reflector = False + if not self.fully_reflected.is_set(): self.fully_reflected.set() await self.blob_manager.storage.update_reflected_stream(self.sd_hash, f"{host}:{port}") diff --git a/lbry/stream/reflector/client.py b/lbry/stream/reflector/client.py index 07544733e..459e87b75 100644 --- a/lbry/stream/reflector/client.py +++ b/lbry/stream/reflector/client.py @@ -64,12 +64,12 @@ class StreamReflectorClient(asyncio.Protocol): self.transport.write(msg.encode()) self.pending_request = self.loop.create_task(asyncio.wait_for(self.response_queue.get(), timeout)) return await self.pending_request - except (AttributeError, asyncio.CancelledError): + except (AttributeError, asyncio.CancelledError) as err: # attribute error happens when we transport.write after disconnect # cancelled error happens when the pending_request task is cancelled by a disconnect if self.transport: self.transport.close() - raise asyncio.TimeoutError() + raise err if isinstance(err, asyncio.CancelledError) else asyncio.CancelledError() finally: self.pending_request = None @@ -94,8 +94,16 @@ class StreamReflectorClient(asyncio.Protocol): needed = response.get('needed_blobs', []) sent_sd = False if response['send_sd_blob']: - await sd_blob.sendfile(self) - received = await asyncio.wait_for(self.response_queue.get(), 30) + try: + sent = await sd_blob.sendfile(self) + if sent == -1: + log.warning("failed to send sd blob") + raise asyncio.CancelledError() + received = await asyncio.wait_for(self.response_queue.get(), 30) + except asyncio.CancelledError as err: + if self.transport: + self.transport.close() + raise err if received.get('received_sd_blob'): sent_sd = True if not needed: @@ -118,8 +126,16 @@ class StreamReflectorClient(asyncio.Protocol): if 'send_blob' not in response: raise ValueError("I don't know whether to send the blob or not!") if response['send_blob']: - await blob.sendfile(self) - received = await asyncio.wait_for(self.response_queue.get(), 30) + try: + sent = await blob.sendfile(self) + if sent == -1: + log.warning("failed to send blob") + raise asyncio.CancelledError() + received = await asyncio.wait_for(self.response_queue.get(), 30) + except asyncio.CancelledError as err: + if self.transport: + self.transport.close() + raise err if received.get('received_blob'): self.reflected_blobs.append(blob.blob_hash) log.info("Sent reflector blob %s", blob.blob_hash[:8]) diff --git a/tests/unit/stream/test_reflector.py b/tests/unit/stream/test_reflector.py index 078e26286..a02728c49 100644 --- a/tests/unit/stream/test_reflector.py +++ b/tests/unit/stream/test_reflector.py @@ -40,7 +40,7 @@ class TestReflector(AsyncioTestCase): file_path = os.path.join(tmp_dir, "test_file") with open(file_path, 'wb') as f: f.write(self.cleartext) - + self.stream_manager.config.reflect_streams = False self.stream = await self.stream_manager.create(file_path) async def _test_reflect_stream(self, response_chunk_size): @@ -93,8 +93,10 @@ class TestReflector(AsyncioTestCase): await incoming.wait() stop.set() # this used to raise (and then propagate) a CancelledError - self.assertListEqual(await reflect_task, []) + with self.assertRaises(asyncio.CancelledError): + await reflect_task self.assertFalse(self.stream.is_fully_reflected) + self.assertFalse(self.server_blob_manager.get_blob(self.stream.sd_hash).get_is_verified()) async def test_result_from_disconnect_after_sd_transfer(self): stop = asyncio.Event() @@ -112,7 +114,8 @@ class TestReflector(AsyncioTestCase): await incoming.wait() await not_incoming.wait() stop.set() - self.assertListEqual(await reflect_task, [self.stream.sd_hash]) + with self.assertRaises(asyncio.CancelledError): + await reflect_task self.assertTrue(self.server_blob_manager.get_blob(self.stream.sd_hash).get_is_verified()) self.assertFalse(self.stream.is_fully_reflected) @@ -134,7 +137,8 @@ class TestReflector(AsyncioTestCase): await incoming.wait() await not_incoming.wait() stop.set() - self.assertListEqual(await reflect_task, [self.stream.sd_hash, self.stream.descriptor.blobs[0].blob_hash]) + with self.assertRaises(asyncio.CancelledError): + await reflect_task self.assertTrue(self.server_blob_manager.get_blob(self.stream.sd_hash).get_is_verified()) self.assertTrue(self.server_blob_manager.get_blob(self.stream.descriptor.blobs[0].blob_hash).get_is_verified()) self.assertFalse(self.stream.is_fully_reflected) @@ -156,7 +160,9 @@ class TestReflector(AsyncioTestCase): await not_incoming.wait() await incoming.wait() stop.set() - self.assertListEqual(await reflect_task, [self.stream.sd_hash]) + with self.assertRaises(asyncio.CancelledError): + await reflect_task + # self.assertListEqual(await reflect_task, [self.stream.sd_hash]) self.assertTrue(self.server_blob_manager.get_blob(self.stream.sd_hash).get_is_verified()) self.assertFalse( self.server_blob_manager.get_blob(self.stream.descriptor.blobs[0].blob_hash).get_is_verified() @@ -181,7 +187,8 @@ class TestReflector(AsyncioTestCase): await incoming.wait() await self.stream_manager.delete(self.stream, delete_file=True) # this used to raise OSError when it can't read the deleted blob for the upload - self.assertListEqual(await reflect_task, [self.stream.sd_hash]) + with self.assertRaises(asyncio.CancelledError): + await reflect_task self.assertTrue(self.server_blob_manager.get_blob(self.stream.sd_hash).get_is_verified()) self.assertFalse( self.server_blob_manager.get_blob(self.stream.descriptor.blobs[0].blob_hash).get_is_verified()