From 8b1307e1caaf702cd46e987b9da215a02cebe24d Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 15 Jul 2019 04:45:22 -0300 Subject: [PATCH 1/3] file status completed set only if all bytes were written --- lbry/lbry/stream/managed_stream.py | 22 ++++++------------- lbry/tests/unit/stream/test_managed_stream.py | 11 +++++++++- 2 files changed, 17 insertions(+), 16 deletions(-) diff --git a/lbry/lbry/stream/managed_stream.py b/lbry/lbry/stream/managed_stream.py index b7c842c3e..21bfddeb6 100644 --- a/lbry/lbry/stream/managed_stream.py +++ b/lbry/lbry/stream/managed_stream.py @@ -56,7 +56,6 @@ class ManagedStream: 'stream_claim_info', 'download_id', 'rowid', - 'written_bytes', 'content_fee', 'downloader', 'analytics_manager', @@ -89,7 +88,6 @@ class ManagedStream: self.stream_claim_info = claim self.download_id = download_id or binascii.hexlify(generate_id()).decode() self.rowid = rowid - self.written_bytes = 0 self.content_fee = content_fee self.downloader = StreamDownloader(self.loop, self.config, self.blob_manager, sd_hash, descriptor) self.analytics_manager = analytics_manager @@ -121,6 +119,10 @@ class ManagedStream: def status(self) -> str: return self._status + @property + def written_bytes(self) -> int: + return 0 if not self.output_file_exists else os.stat(self.full_path).st_size + async def update_status(self, status: str): assert status in [self.STATUS_RUNNING, self.STATUS_STOPPED, self.STATUS_FINISHED] self._status = status @@ -205,21 +207,14 @@ class ManagedStream: full_path = self.full_path file_name = self.file_name download_directory = self.download_directory - if self.full_path and self.output_file_exists: - if self.written_bytes: - written_bytes = self.written_bytes - else: - written_bytes = os.stat(self.full_path).st_size - else: + if not self.output_file_exists: full_path = None file_name = None download_directory = None written_bytes = None return { 'streaming_url': f"http://{self.config.streaming_host}:{self.config.streaming_port}/stream/{self.sd_hash}", - 'completed': (self.output_file_exists and (self.status in ('stopped', 'finished')) - or not self.saving.is_set()) or all( - self.blob_manager.is_blob_verified(b.blob_hash) for b in self.descriptor.blobs[:-1]), + 'completed': self.written_bytes >= self.descriptor.lower_bound_decrypted_length(), 'file_name': file_name, 'download_directory': download_directory, 'points_paid': 0.0, @@ -233,7 +228,7 @@ class ManagedStream: 'key': self.descriptor.key, 'total_bytes_lower_bound': self.descriptor.lower_bound_decrypted_length(), 'total_bytes': self.descriptor.upper_bound_decrypted_length(), - 'written_bytes': written_bytes, + 'written_bytes': self.written_bytes, 'blobs_completed': self.blobs_completed, 'blobs_in_stream': self.blobs_in_stream, 'blobs_remaining': self.blobs_remaining, @@ -371,7 +366,6 @@ class ManagedStream: async for blob_info, decrypted in self._aiter_read_stream(connection_id=self.SAVING_ID): log.info("write blob %i/%i", blob_info.blob_num + 1, len(self.descriptor.blobs) - 1) await self.loop.run_in_executor(None, self._write_decrypted_blob, file_write_handle, decrypted) - self.written_bytes += len(decrypted) if not self.started_writing.is_set(): self.started_writing.set() await self.update_status(ManagedStream.STATUS_FINISHED) @@ -387,7 +381,6 @@ class ManagedStream: if os.path.isfile(output_path): log.warning("removing incomplete download %s for %s", output_path, self.sd_hash) os.remove(output_path) - self.written_bytes = 0 if isinstance(err, asyncio.TimeoutError): self.downloader.stop() await self.blob_manager.storage.change_file_download_dir_and_file_name( @@ -425,7 +418,6 @@ class ManagedStream: self.stream_hash, self.download_directory, self.file_name ) await self.update_status(ManagedStream.STATUS_RUNNING) - self.written_bytes = 0 self.file_output_task = self.loop.create_task(self._save_file(self.full_path)) await self.started_writing.wait() diff --git a/lbry/tests/unit/stream/test_managed_stream.py b/lbry/tests/unit/stream/test_managed_stream.py index 3e5f56f4b..d91a1dea8 100644 --- a/lbry/tests/unit/stream/test_managed_stream.py +++ b/lbry/tests/unit/stream/test_managed_stream.py @@ -40,6 +40,15 @@ class TestManagedStream(BlobExchangeTestBase): self.loop, self.client_config, self.client_blob_manager, self.sd_hash, self.client_dir ) + async def test_status_file_completed(self): + await self._test_transfer_stream(10) + self.assertTrue(self.stream.output_file_exists) + self.assertTrue(self.stream.as_dict()['completed']) + with open(self.stream.full_path, 'w+b') as outfile: + outfile.truncate(1) + self.assertTrue(self.stream.output_file_exists) + self.assertFalse(self.stream.as_dict()['completed']) + async def _test_transfer_stream(self, blob_count: int, mock_accumulate_peers=None, stop_when_done=True): await self.setup_stream(blob_count) mock_node = mock.Mock(spec=Node) @@ -52,7 +61,7 @@ class TestManagedStream(BlobExchangeTestBase): mock_node.accumulate_peers = mock_accumulate_peers or _mock_accumulate_peers await self.stream.save_file(node=mock_node) - await self.stream.finished_writing.wait() + await self.stream.finished_write_attempt.wait() self.assertTrue(os.path.isfile(self.stream.full_path)) if stop_when_done: await self.stream.stop() From 4d64fca9de6ad9d8fdc493e322f144125ac55a2f Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Wed, 17 Jul 2019 02:49:08 -0300 Subject: [PATCH 2/3] fix memory blob showing as completed/verified --- lbry/lbry/blob/blob_manager.py | 4 ++-- lbry/tests/unit/blob/test_blob_manager.py | 12 ++++++++++++ 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/lbry/lbry/blob/blob_manager.py b/lbry/lbry/blob/blob_manager.py index 50d414fe3..64ef5f2cb 100644 --- a/lbry/lbry/blob/blob_manager.py +++ b/lbry/lbry/blob/blob_manager.py @@ -69,10 +69,10 @@ class BlobManager: def is_blob_verified(self, blob_hash: str, length: typing.Optional[int] = None) -> bool: if not is_valid_blobhash(blob_hash): raise ValueError(blob_hash) - if blob_hash in self.blobs: - return self.blobs[blob_hash].get_is_verified() if not os.path.isfile(os.path.join(self.blob_dir, blob_hash)): return False + if blob_hash in self.blobs: + return self.blobs[blob_hash].get_is_verified() return self._get_blob(blob_hash, length).get_is_verified() async def setup(self) -> bool: diff --git a/lbry/tests/unit/blob/test_blob_manager.py b/lbry/tests/unit/blob/test_blob_manager.py index 0c9a26cf1..eabcbb625 100644 --- a/lbry/tests/unit/blob/test_blob_manager.py +++ b/lbry/tests/unit/blob/test_blob_manager.py @@ -17,6 +17,18 @@ class TestBlobManager(AsyncioTestCase): self.blob_manager = BlobManager(self.loop, tmp_dir, self.storage, self.config) await self.storage.open() + async def test_memory_blobs_arent_verifie_but_real_ones_are(self): + for save_blobs in (False, True): + await self.setup_blob_manager(save_blobs=save_blobs) + # add a blob file + blob_hash = "7f5ab2def99f0ddd008da71db3a3772135f4002b19b7605840ed1034c8955431bd7079549e65e6b2a3b9c17c773073ed" + blob_bytes = b'1' * ((2 * 2 ** 20) - 1) + blob = self.blob_manager.get_blob(blob_hash, len(blob_bytes)) + blob.save_verified_blob(blob_bytes) + self.assertTrue(blob.get_is_verified()) + self.blob_manager.blob_completed(blob) + self.assertEqual(self.blob_manager.is_blob_verified(blob_hash), save_blobs) + async def test_sync_blob_file_manager_on_startup(self): await self.setup_blob_manager(save_blobs=True) From 576ef9c4ed424cf8319ffacd5f0f94a288c15751 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Wed, 17 Jul 2019 11:11:47 -0300 Subject: [PATCH 3/3] remove unused variable --- lbry/lbry/stream/managed_stream.py | 1 - 1 file changed, 1 deletion(-) diff --git a/lbry/lbry/stream/managed_stream.py b/lbry/lbry/stream/managed_stream.py index 21bfddeb6..f82d6a51b 100644 --- a/lbry/lbry/stream/managed_stream.py +++ b/lbry/lbry/stream/managed_stream.py @@ -211,7 +211,6 @@ class ManagedStream: full_path = None file_name = None download_directory = None - written_bytes = None return { 'streaming_url': f"http://{self.config.streaming_host}:{self.config.streaming_port}/stream/{self.sd_hash}", 'completed': self.written_bytes >= self.descriptor.lower_bound_decrypted_length(),