From 8b1307e1caaf702cd46e987b9da215a02cebe24d Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 15 Jul 2019 04:45:22 -0300 Subject: [PATCH] file status completed set only if all bytes were written --- lbry/lbry/stream/managed_stream.py | 22 ++++++------------- lbry/tests/unit/stream/test_managed_stream.py | 11 +++++++++- 2 files changed, 17 insertions(+), 16 deletions(-) diff --git a/lbry/lbry/stream/managed_stream.py b/lbry/lbry/stream/managed_stream.py index b7c842c3e..21bfddeb6 100644 --- a/lbry/lbry/stream/managed_stream.py +++ b/lbry/lbry/stream/managed_stream.py @@ -56,7 +56,6 @@ class ManagedStream: 'stream_claim_info', 'download_id', 'rowid', - 'written_bytes', 'content_fee', 'downloader', 'analytics_manager', @@ -89,7 +88,6 @@ class ManagedStream: self.stream_claim_info = claim self.download_id = download_id or binascii.hexlify(generate_id()).decode() self.rowid = rowid - self.written_bytes = 0 self.content_fee = content_fee self.downloader = StreamDownloader(self.loop, self.config, self.blob_manager, sd_hash, descriptor) self.analytics_manager = analytics_manager @@ -121,6 +119,10 @@ class ManagedStream: def status(self) -> str: return self._status + @property + def written_bytes(self) -> int: + return 0 if not self.output_file_exists else os.stat(self.full_path).st_size + async def update_status(self, status: str): assert status in [self.STATUS_RUNNING, self.STATUS_STOPPED, self.STATUS_FINISHED] self._status = status @@ -205,21 +207,14 @@ class ManagedStream: full_path = self.full_path file_name = self.file_name download_directory = self.download_directory - if self.full_path and self.output_file_exists: - if self.written_bytes: - written_bytes = self.written_bytes - else: - written_bytes = os.stat(self.full_path).st_size - else: + if not self.output_file_exists: full_path = None file_name = None download_directory = None written_bytes = None return { 'streaming_url': f"http://{self.config.streaming_host}:{self.config.streaming_port}/stream/{self.sd_hash}", - 'completed': (self.output_file_exists and (self.status in ('stopped', 'finished')) - or not self.saving.is_set()) or all( - self.blob_manager.is_blob_verified(b.blob_hash) for b in self.descriptor.blobs[:-1]), + 'completed': self.written_bytes >= self.descriptor.lower_bound_decrypted_length(), 'file_name': file_name, 'download_directory': download_directory, 'points_paid': 0.0, @@ -233,7 +228,7 @@ class ManagedStream: 'key': self.descriptor.key, 'total_bytes_lower_bound': self.descriptor.lower_bound_decrypted_length(), 'total_bytes': self.descriptor.upper_bound_decrypted_length(), - 'written_bytes': written_bytes, + 'written_bytes': self.written_bytes, 'blobs_completed': self.blobs_completed, 'blobs_in_stream': self.blobs_in_stream, 'blobs_remaining': self.blobs_remaining, @@ -371,7 +366,6 @@ class ManagedStream: async for blob_info, decrypted in self._aiter_read_stream(connection_id=self.SAVING_ID): log.info("write blob %i/%i", blob_info.blob_num + 1, len(self.descriptor.blobs) - 1) await self.loop.run_in_executor(None, self._write_decrypted_blob, file_write_handle, decrypted) - self.written_bytes += len(decrypted) if not self.started_writing.is_set(): self.started_writing.set() await self.update_status(ManagedStream.STATUS_FINISHED) @@ -387,7 +381,6 @@ class ManagedStream: if os.path.isfile(output_path): log.warning("removing incomplete download %s for %s", output_path, self.sd_hash) os.remove(output_path) - self.written_bytes = 0 if isinstance(err, asyncio.TimeoutError): self.downloader.stop() await self.blob_manager.storage.change_file_download_dir_and_file_name( @@ -425,7 +418,6 @@ class ManagedStream: self.stream_hash, self.download_directory, self.file_name ) await self.update_status(ManagedStream.STATUS_RUNNING) - self.written_bytes = 0 self.file_output_task = self.loop.create_task(self._save_file(self.full_path)) await self.started_writing.wait() diff --git a/lbry/tests/unit/stream/test_managed_stream.py b/lbry/tests/unit/stream/test_managed_stream.py index 3e5f56f4b..d91a1dea8 100644 --- a/lbry/tests/unit/stream/test_managed_stream.py +++ b/lbry/tests/unit/stream/test_managed_stream.py @@ -40,6 +40,15 @@ class TestManagedStream(BlobExchangeTestBase): self.loop, self.client_config, self.client_blob_manager, self.sd_hash, self.client_dir ) + async def test_status_file_completed(self): + await self._test_transfer_stream(10) + self.assertTrue(self.stream.output_file_exists) + self.assertTrue(self.stream.as_dict()['completed']) + with open(self.stream.full_path, 'w+b') as outfile: + outfile.truncate(1) + self.assertTrue(self.stream.output_file_exists) + self.assertFalse(self.stream.as_dict()['completed']) + async def _test_transfer_stream(self, blob_count: int, mock_accumulate_peers=None, stop_when_done=True): await self.setup_stream(blob_count) mock_node = mock.Mock(spec=Node) @@ -52,7 +61,7 @@ class TestManagedStream(BlobExchangeTestBase): mock_node.accumulate_peers = mock_accumulate_peers or _mock_accumulate_peers await self.stream.save_file(node=mock_node) - await self.stream.finished_writing.wait() + await self.stream.finished_write_attempt.wait() self.assertTrue(os.path.isfile(self.stream.full_path)) if stop_when_done: await self.stream.stop()