From f766dbefe0760062a58b83ca73814fa8d61107db Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Tue, 5 Feb 2019 00:23:04 -0300 Subject: [PATCH 01/10] announce and set completed after inserts --- lbrynet/stream/assembler.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lbrynet/stream/assembler.py b/lbrynet/stream/assembler.py index 39761fa1f..8c30c5875 100644 --- a/lbrynet/stream/assembler.py +++ b/lbrynet/stream/assembler.py @@ -74,17 +74,17 @@ class StreamAssembler: raise OSError(f"output directory does not exist: '{output_dir}' '{output_file_name}'") await self.setup() self.sd_blob = await self.get_blob(self.sd_hash) - await self.blob_manager.blob_completed(self.sd_blob) self.descriptor = await StreamDescriptor.from_stream_descriptor_blob(self.loop, self.blob_manager.blob_dir, self.sd_blob) self.output_path = await get_next_available_file_name(self.loop, output_dir, output_file_name or self.descriptor.suggested_file_name) if not self.got_descriptor.is_set(): self.got_descriptor.set() - await self.after_got_descriptor() await self.blob_manager.storage.store_stream( self.sd_blob, self.descriptor ) + await self.blob_manager.blob_completed(self.sd_blob) + await self.after_got_descriptor() with open(self.output_path, 'wb') as stream_handle: self.stream_handle = stream_handle for blob_info in self.descriptor.blobs[:-1]: From e9b58577dd72a969827586ccfe63dbf0caeaa4a6 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Tue, 5 Feb 2019 00:38:40 -0300 Subject: [PATCH 02/10] tests: verify completed and to announce after assembling streams --- tests/unit/stream/test_assembler.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/tests/unit/stream/test_assembler.py b/tests/unit/stream/test_assembler.py index c7764b978..ac318c45b 100644 --- a/tests/unit/stream/test_assembler.py +++ b/tests/unit/stream/test_assembler.py @@ -4,9 +4,9 @@ import tempfile import shutil from torba.testcase import AsyncioTestCase from lbrynet.conf import Config -from lbrynet.blob.blob_manager import BlobFileManager from lbrynet.blob.blob_file import MAX_BLOB_SIZE from lbrynet.extras.daemon.storage import SQLiteStorage +from lbrynet.blob.blob_manager import BlobFileManager from lbrynet.stream.assembler import StreamAssembler from lbrynet.stream.descriptor import StreamDescriptor @@ -55,6 +55,11 @@ class TestStreamAssembler(AsyncioTestCase): decrypted = f.read() self.assertEqual(decrypted, self.cleartext) self.assertEqual(True, self.blob_manager.get_blob(sd_hash).get_is_verified()) + self.assertEqual(True, self.blob_manager.get_blob(descriptor.blobs[0].blob_hash).get_is_verified()) + self.assertEqual(2, len(await downloader_storage.get_all_finished_blobs())) + self.assertEqual( + [descriptor.sd_hash, descriptor.blobs[0].blob_hash], await downloader_storage.get_blobs_to_announce() + ) await downloader_storage.close() await self.storage.close() From 6ead932ccb8c79b06d9a33a5cac0a59f567453ff Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Tue, 5 Feb 2019 00:39:43 -0300 Subject: [PATCH 03/10] announces naturally, removes redundant calls --- lbrynet/stream/assembler.py | 6 ++++-- lbrynet/stream/downloader.py | 2 -- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/lbrynet/stream/assembler.py b/lbrynet/stream/assembler.py index 8c30c5875..296686c6d 100644 --- a/lbrynet/stream/assembler.py +++ b/lbrynet/stream/assembler.py @@ -59,6 +59,7 @@ class StreamAssembler: self.wrote_bytes_event.set() await self.loop.run_in_executor(None, _decrypt_and_write) + return True async def setup(self): pass @@ -76,6 +77,7 @@ class StreamAssembler: self.sd_blob = await self.get_blob(self.sd_hash) self.descriptor = await StreamDescriptor.from_stream_descriptor_blob(self.loop, self.blob_manager.blob_dir, self.sd_blob) + await self.after_got_descriptor() self.output_path = await get_next_available_file_name(self.loop, output_dir, output_file_name or self.descriptor.suggested_file_name) if not self.got_descriptor.is_set(): @@ -84,14 +86,14 @@ class StreamAssembler: self.sd_blob, self.descriptor ) await self.blob_manager.blob_completed(self.sd_blob) - await self.after_got_descriptor() with open(self.output_path, 'wb') as stream_handle: self.stream_handle = stream_handle for blob_info in self.descriptor.blobs[:-1]: while True: try: blob = await self.get_blob(blob_info.blob_hash, blob_info.length) - await self._decrypt_blob(blob, blob_info, self.descriptor.key) + if await self._decrypt_blob(blob, blob_info, self.descriptor.key): + await self.blob_manager.blob_completed(blob) break except FileNotFoundError: log.debug("stream assembler stopped") diff --git a/lbrynet/stream/downloader.py b/lbrynet/stream/downloader.py index bab5776bd..c269275c4 100644 --- a/lbrynet/stream/downloader.py +++ b/lbrynet/stream/downloader.py @@ -47,11 +47,9 @@ class StreamDownloader(StreamAssembler): async def after_got_descriptor(self): self.search_queue.put_nowait(self.descriptor.blobs[0].blob_hash) log.info("added head blob to search") - await self.blob_manager.set_should_announce(self.sd_hash, 1) async def after_finished(self): log.info("downloaded stream %s -> %s", self.sd_hash, self.output_path) - await self.blob_manager.set_should_announce(self.descriptor.blobs[0].blob_hash, 1) await self.blob_manager.storage.change_file_status(self.descriptor.stream_hash, 'finished') def stop(self): From 30846f932b80b8cf1cba1647cad4deeb04d66f78 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Tue, 5 Feb 2019 01:07:37 -0300 Subject: [PATCH 04/10] test that creating a stream marks sd and head to announce --- tests/unit/stream/test_assembler.py | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/tests/unit/stream/test_assembler.py b/tests/unit/stream/test_assembler.py index ac318c45b..ce62f695d 100644 --- a/tests/unit/stream/test_assembler.py +++ b/tests/unit/stream/test_assembler.py @@ -2,6 +2,7 @@ import os import asyncio import tempfile import shutil + from torba.testcase import AsyncioTestCase from lbrynet.conf import Config from lbrynet.blob.blob_file import MAX_BLOB_SIZE @@ -9,6 +10,7 @@ from lbrynet.extras.daemon.storage import SQLiteStorage from lbrynet.blob.blob_manager import BlobFileManager from lbrynet.stream.assembler import StreamAssembler from lbrynet.stream.descriptor import StreamDescriptor +from lbrynet.stream.stream_manager import StreamManager class TestStreamAssembler(AsyncioTestCase): @@ -80,3 +82,23 @@ class TestStreamAssembler(AsyncioTestCase): async def test_create_and_decrypt_random(self): self.cleartext = os.urandom(20000000) await self.test_create_and_decrypt_one_blob_stream() + + async def test_create_managed_stream_announces(self): + # setup a blob manager + storage = SQLiteStorage(Config(), ":memory:") + await storage.open() + tmp_dir = tempfile.mkdtemp() + self.addCleanup(lambda: shutil.rmtree(tmp_dir)) + blob_manager = BlobFileManager(self.loop, tmp_dir, storage) + stream_manager = StreamManager(self.loop, Config(), blob_manager, None, storage, None) + # create the stream + download_dir = tempfile.mkdtemp() + self.addCleanup(lambda: shutil.rmtree(download_dir)) + file_path = os.path.join(download_dir, "test_file") + with open(file_path, 'wb') as f: + f.write(b'testtest') + + stream = await stream_manager.create_stream(file_path) + self.assertEqual( + [stream.sd_hash, stream.descriptor.blobs[0].blob_hash], + await storage.get_blobs_to_announce()) From 2c275efa488c14c094d189f08e0a1eb0a03beb7e Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Tue, 5 Feb 2019 01:07:56 -0300 Subject: [PATCH 05/10] remove redundant calls to should announce --- lbrynet/stream/managed_stream.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/lbrynet/stream/managed_stream.py b/lbrynet/stream/managed_stream.py index 941fe8281..4c166589e 100644 --- a/lbrynet/stream/managed_stream.py +++ b/lbrynet/stream/managed_stream.py @@ -160,8 +160,6 @@ class ManagedStream: await blob_manager.blob_completed(sd_blob) for blob in descriptor.blobs[:-1]: await blob_manager.blob_completed(blob_manager.get_blob(blob.blob_hash, blob.length)) - await blob_manager.set_should_announce(sd_blob.blob_hash, 1) - await blob_manager.set_should_announce(descriptor.blobs[0].blob_hash, 1) return cls(loop, blob_manager, descriptor, os.path.dirname(file_path), os.path.basename(file_path), status=cls.STATUS_FINISHED) From 7b8f42e6d0e0b17808f1d17829a052bd7a5c7bbf Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Tue, 5 Feb 2019 01:09:16 -0300 Subject: [PATCH 06/10] remove dead set_should_announce code --- lbrynet/blob/blob_manager.py | 3 --- lbrynet/extras/daemon/storage.py | 6 ------ 2 files changed, 9 deletions(-) diff --git a/lbrynet/blob/blob_manager.py b/lbrynet/blob/blob_manager.py index 27ed58d0d..44d8fa8b5 100644 --- a/lbrynet/blob/blob_manager.py +++ b/lbrynet/blob/blob_manager.py @@ -59,9 +59,6 @@ class BlobFileManager: blobs = [self.get_blob(b) for b in blob_hashes] return [blob.blob_hash for blob in blobs if blob.get_is_verified()] - async def set_should_announce(self, blob_hash: str, should_announce: int): - return await self.storage.set_should_announce(blob_hash, should_announce) - async def get_all_verified_blobs(self) -> typing.List[str]: blob_hashes = await self.storage.get_all_blob_hashes() return self.check_completed_blobs(blob_hashes) diff --git a/lbrynet/extras/daemon/storage.py b/lbrynet/extras/daemon/storage.py index 0a0359841..48364393f 100644 --- a/lbrynet/extras/daemon/storage.py +++ b/lbrynet/extras/daemon/storage.py @@ -208,12 +208,6 @@ class SQLiteStorage(SQLiteMixin): log.debug("Adding a completed blob. blob_hash=%s", blob_hash) return self.db.execute("update blob set status='finished' where blob.blob_hash=?", (blob_hash, )) - def set_should_announce(self, blob_hash: str, should_announce: int): - return self.db.execute( - "update blob set next_announce_time=?, should_announce=? where blob_hash=?", - (int(self.time_getter()), should_announce, blob_hash) - ) - def get_blob_status(self, blob_hash: str): return self.run_and_return_one_or_none( "select status from blob where blob_hash=?", blob_hash From c31e61586d6b5c3a98b24ab02f002ba2e034f2aa Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Tue, 5 Feb 2019 01:25:58 -0300 Subject: [PATCH 07/10] tests: fix value, compare to blobs len --- tests/unit/stream/test_assembler.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/unit/stream/test_assembler.py b/tests/unit/stream/test_assembler.py index ce62f695d..c82b9bbf6 100644 --- a/tests/unit/stream/test_assembler.py +++ b/tests/unit/stream/test_assembler.py @@ -22,7 +22,7 @@ class TestStreamAssembler(AsyncioTestCase): async def test_create_and_decrypt_one_blob_stream(self): tmp_dir = tempfile.mkdtemp() self.addCleanup(lambda: shutil.rmtree(tmp_dir)) - self.storage = SQLiteStorage(Config(), os.path.join(tmp_dir, "lbrynet.sqlite")) + self.storage = SQLiteStorage(Config(), ":memory:") await self.storage.open() self.blob_manager = BlobFileManager(self.loop, tmp_dir, self.storage) @@ -58,7 +58,8 @@ class TestStreamAssembler(AsyncioTestCase): self.assertEqual(decrypted, self.cleartext) self.assertEqual(True, self.blob_manager.get_blob(sd_hash).get_is_verified()) self.assertEqual(True, self.blob_manager.get_blob(descriptor.blobs[0].blob_hash).get_is_verified()) - self.assertEqual(2, len(await downloader_storage.get_all_finished_blobs())) + # its all blobs + sd blob - last blob, which is the same size as descriptor.blobs + self.assertEqual(len(descriptor.blobs), len(await downloader_storage.get_all_finished_blobs())) self.assertEqual( [descriptor.sd_hash, descriptor.blobs[0].blob_hash], await downloader_storage.get_blobs_to_announce() ) From 63d1ec3c28627e4d5359df5a46e6f39f76e13d15 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Tue, 5 Feb 2019 13:00:57 -0300 Subject: [PATCH 08/10] fixes from jack comments --- lbrynet/stream/assembler.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lbrynet/stream/assembler.py b/lbrynet/stream/assembler.py index 296686c6d..8b11ecb7e 100644 --- a/lbrynet/stream/assembler.py +++ b/lbrynet/stream/assembler.py @@ -89,12 +89,12 @@ class StreamAssembler: with open(self.output_path, 'wb') as stream_handle: self.stream_handle = stream_handle for blob_info in self.descriptor.blobs[:-1]: - while True: + while not stream_handle.closed: try: blob = await self.get_blob(blob_info.blob_hash, blob_info.length) if await self._decrypt_blob(blob, blob_info, self.descriptor.key): await self.blob_manager.blob_completed(blob) - break + break except FileNotFoundError: log.debug("stream assembler stopped") return From c6b910a5f3985244515244814242773d277317b6 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Tue, 5 Feb 2019 13:14:59 -0300 Subject: [PATCH 09/10] use time_getter on storage --- lbrynet/extras/daemon/storage.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lbrynet/extras/daemon/storage.py b/lbrynet/extras/daemon/storage.py index 48364393f..abdacfe6f 100644 --- a/lbrynet/extras/daemon/storage.py +++ b/lbrynet/extras/daemon/storage.py @@ -684,7 +684,7 @@ class SQLiteStorage(SQLiteMixin): if success: return self.db.execute( "insert or replace into reflected_stream values (?, ?, ?)", - (sd_hash, reflector_address, time.time()) + (sd_hash, reflector_address, self.time_getter()) ) return self.db.execute( "delete from reflected_stream where sd_hash=? and reflector_address=?", From a4ced4dbd255e5732e80e70e339458747cc03507 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Tue, 5 Feb 2019 13:17:00 -0300 Subject: [PATCH 10/10] set verified after callback --- lbrynet/blob/blob_file.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lbrynet/blob/blob_file.py b/lbrynet/blob/blob_file.py index ae610da9a..80702d0aa 100644 --- a/lbrynet/blob/blob_file.py +++ b/lbrynet/blob/blob_file.py @@ -111,9 +111,9 @@ class BlobFile: if self.verified.is_set(): return await self.loop.run_in_executor(None, _save_verified) - self.verified.set() if self.blob_completed_callback: await self.blob_completed_callback(self) + self.verified.set() def open_for_writing(self) -> HashBlobWriter: if os.path.exists(self.file_path):