diff --git a/lbry/extras/daemon/json_response_encoder.py b/lbry/extras/daemon/json_response_encoder.py index d2080db8c..a4246bfab 100644 --- a/lbry/extras/daemon/json_response_encoder.py +++ b/lbry/extras/daemon/json_response_encoder.py @@ -108,7 +108,8 @@ def encode_file_doc(): 'metadata': '(dict) None if claim is not found else the claim metadata', 'channel_claim_id': '(str) None if claim is not found or not signed', 'channel_name': '(str) None if claim is not found or not signed', - 'claim_name': '(str) None if claim is not found else the claim name' + 'claim_name': '(str) None if claim is not found else the claim name', + 'reflector_progress': '(int) reflector upload progress, 0 to 100' } @@ -307,7 +308,8 @@ class JSONResponseEncoder(JSONEncoder): 'height': tx_height, 'confirmations': (best_height + 1) - tx_height if tx_height > 0 else tx_height, 'timestamp': self.ledger.headers.estimated_timestamp(tx_height), - 'is_fully_reflected': managed_stream.is_fully_reflected + 'is_fully_reflected': managed_stream.is_fully_reflected, + 'reflector_progress': managed_stream.reflector_progress } def encode_claim(self, claim): diff --git a/lbry/stream/managed_stream.py b/lbry/stream/managed_stream.py index bf6955a9b..5071d879e 100644 --- a/lbry/stream/managed_stream.py +++ b/lbry/stream/managed_stream.py @@ -65,6 +65,7 @@ class ManagedStream: 'downloader', 'analytics_manager', 'fully_reflected', + 'reflector_progress', 'file_output_task', 'delayed_stop_task', 'streaming_responses', @@ -101,6 +102,7 @@ class ManagedStream: self.analytics_manager = analytics_manager self.fully_reflected = asyncio.Event(loop=self.loop) + self.reflector_progress = 0 self.file_output_task: typing.Optional[asyncio.Task] = None self.delayed_stop_task: typing.Optional[asyncio.Task] = None self.streaming_responses: typing.List[typing.Tuple[Request, StreamResponse]] = [] @@ -445,9 +447,10 @@ class ManagedStream: ] log.info("we have %i/%i needed blobs needed by reflector for lbry://%s#%s", len(we_have), len(needed), self.claim_name, self.claim_id) - for blob_hash in we_have: + for i, blob_hash in enumerate(we_have): await protocol.send_blob(blob_hash) sent.append(blob_hash) + self.reflector_progress = int((i + 1) / len(we_have) * 100) except (asyncio.TimeoutError, ValueError): return sent except ConnectionRefusedError: diff --git a/tests/unit/stream/test_reflector.py b/tests/unit/stream/test_reflector.py index 8c228f92c..4845948d1 100644 --- a/tests/unit/stream/test_reflector.py +++ b/tests/unit/stream/test_reflector.py @@ -46,7 +46,9 @@ class TestStreamAssembler(AsyncioTestCase): reflector.start_server(5566, '127.0.0.1') await reflector.started_listening.wait() self.addCleanup(reflector.stop_server) + self.assertEqual(0, self.stream.reflector_progress) sent = await self.stream.upload_to_reflector('127.0.0.1', 5566) + self.assertEqual(100, self.stream.reflector_progress) self.assertSetEqual( set(sent), set(map(lambda b: b.blob_hash,