From 2ceec7c3d4257af13f96e83bdcff4b74193b3741 Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Wed, 20 Oct 2021 02:18:34 -0300
Subject: [PATCH] don't save streams for network blobs and bypass disk space
 manager

---
 lbry/extras/daemon/components.py              |  2 +-
 lbry/extras/daemon/storage.py                 | 21 +++++++---
 lbry/stream/downloader.py                     |  4 +-
 .../datanetwork/test_file_commands.py         | 39 ++++++++++++++-----
 4 files changed, 48 insertions(+), 18 deletions(-)

diff --git a/lbry/extras/daemon/components.py b/lbry/extras/daemon/components.py
index 44f128f5d..5371e74b5 100644
--- a/lbry/extras/daemon/components.py
+++ b/lbry/extras/daemon/components.py
@@ -419,7 +419,7 @@ class BackgroundDownloader(Component):
         if self.component_manager.has_component(DHT_COMPONENT):
             node = self.component_manager.get_component(DHT_COMPONENT)
         try:
-            await downloader.start(node)
+            await downloader.start(node, save_stream=False)
         except ValueError:
             return
         for blob_info in downloader.descriptor.blobs[:-1]:
diff --git a/lbry/extras/daemon/storage.py b/lbry/extras/daemon/storage.py
index 758c25970..832dab1c8 100644
--- a/lbry/extras/daemon/storage.py
+++ b/lbry/extras/daemon/storage.py
@@ -437,19 +437,28 @@ class SQLiteStorage(SQLiteMixin):
     def get_all_blob_hashes(self):
         return self.run_and_return_list("select blob_hash from blob")
 
-    async def get_stored_blobs(self, is_mine: bool):
+    async def get_stored_blobs(self, is_mine: bool, orphans=False):
         is_mine = 1 if is_mine else 0
-        return await self.db.execute_fetchall(
-            "select blob_hash, blob_length, added_on from blob where is_mine=? order by added_on asc",
+        sd_blobs = await self.db.execute_fetchall(
+            "select blob.blob_hash, blob.blob_length, blob.added_on "
+            "from blob join stream on blob.blob_hash=stream.sd_hash join file using (stream_hash) "
+            "where blob.is_mine=? order by blob.added_on asc",
             (is_mine,)
         )
+        normal_blobs = await self.db.execute_fetchall(
+            "select blob.blob_hash, blob.blob_length, blob.added_on "
+            "from blob join stream_blob using (blob_hash) cross join stream using (stream_hash)"
+            "cross join file using (stream_hash) where blob.is_mine=? order by blob.added_on asc",
+            (is_mine,)
+        )
+        return normal_blobs + sd_blobs
 
     async def get_stored_blob_disk_usage(self, is_mine: Optional[bool] = None):
         if is_mine is None:
-            sql, args = "select coalesce(sum(blob_length), 0) from blob", ()
+            sql = "select coalesce(sum(blob_length), 0) from blob join stream_blob using (blob_hash)"
         else:
-            is_mine = 1 if is_mine else 0
-            sql, args = "select coalesce(sum(blob_length), 0) from blob where is_mine=?", (is_mine,)
+            sql = "select coalesce(sum(blob_length), 0) from blob join stream_blob using (blob_hash) where is_mine=?"
+        args = (1 if is_mine else 0,) if is_mine is not None else ()
         return (await self.db.execute_fetchone(sql, args))[0]
 
     async def update_blob_ownership(self, sd_hash, is_mine: bool):
diff --git a/lbry/stream/downloader.py b/lbry/stream/downloader.py
index 83ff67fef..12c63fb96 100644
--- a/lbry/stream/downloader.py
+++ b/lbry/stream/downloader.py
@@ -83,7 +83,7 @@ class StreamDownloader:
         )
         log.info("loaded stream manifest %s", self.sd_hash)
 
-    async def start(self, node: typing.Optional['Node'] = None, connection_id: int = 0):
+    async def start(self, node: typing.Optional['Node'] = None, connection_id: int = 0, save_stream=True):
         # set up peer accumulation
         self.node = node or self.node  # fixme: this shouldnt be set here!
         if self.node:
@@ -102,7 +102,7 @@ class StreamDownloader:
             self.search_queue.put_nowait(self.descriptor.blobs[0].blob_hash)
             log.info("added head blob to peer search for stream %s", self.sd_hash)
 
-        if not await self.blob_manager.storage.stream_exists(self.sd_hash):
+        if not await self.blob_manager.storage.stream_exists(self.sd_hash) and save_stream:
             await self.blob_manager.storage.store_stream(
                 self.blob_manager.get_blob(self.sd_hash, length=self.descriptor.length), self.descriptor
             )
diff --git a/tests/integration/datanetwork/test_file_commands.py b/tests/integration/datanetwork/test_file_commands.py
index 5c2ee3e0b..c4766983c 100644
--- a/tests/integration/datanetwork/test_file_commands.py
+++ b/tests/integration/datanetwork/test_file_commands.py
@@ -4,6 +4,7 @@ import os
 from binascii import hexlify
 
 from lbry.schema import Claim
+from lbry.stream.descriptor import StreamDescriptor
 from lbry.testcase import CommandTestCase
 from lbry.extras.daemon.components import TorrentSession, BACKGROUND_DOWNLOADER_COMPONENT
 from lbry.wallet import Transaction
@@ -574,19 +575,24 @@ class DiskSpaceManagement(CommandTestCase):
 
 
 class TestProactiveDownloaderComponent(CommandTestCase):
+    async def get_blobs_from_sd_blob(self, sd_blob):
+        descriptor = await StreamDescriptor.from_stream_descriptor_blob(
+            asyncio.get_running_loop(), self.daemon.blob_manager.blob_dir, sd_blob
+        )
+        return descriptor.blobs
+
     async def assertBlobs(self, *sd_hashes, no_files=True):
         # checks that we have ony the finished blobs needed for the the referenced streams
         seen = set(sd_hashes)
         for sd_hash in sd_hashes:
-            self.assertTrue(self.daemon.blob_manager.get_blob(sd_hash).get_is_verified())
-            blobs = await self.daemon.storage.get_blobs_for_stream(
-                await self.daemon.storage.get_stream_hash_for_sd_hash(sd_hash)
-            )
+            sd_blob = self.daemon.blob_manager.get_blob(sd_hash)
+            self.assertTrue(sd_blob.get_is_verified())
+            blobs = await self.get_blobs_from_sd_blob(sd_blob)
             for blob in blobs[:-1]:
                 self.assertTrue(self.daemon.blob_manager.get_blob(blob.blob_hash).get_is_verified())
             seen.update(blob.blob_hash for blob in blobs if blob.blob_hash)
-        self.assertEqual(seen, self.daemon.blob_manager.completed_blob_hashes)
         if no_files:
+            self.assertEqual(seen, self.daemon.blob_manager.completed_blob_hashes)
             self.assertEqual(0, len(await self.file_list()))
 
     async def clear(self):
@@ -596,25 +602,28 @@ class TestProactiveDownloaderComponent(CommandTestCase):
         self.assertEqual(0, len((await self.daemon.jsonrpc_blob_list())['items']))
 
     async def test_ensure_download(self):
-        content1 = await self.stream_create('content1', '0.01', data=bytes([0] * (2 << 23)))
+        content1 = await self.stream_create('content1', '0.01', data=bytes([0] * (2 << 24)))
         content1 = content1['outputs'][0]['value']['source']['sd_hash']
         content2 = await self.stream_create('content2', '0.01', data=bytes([0] * (2 << 23)))
         content2 = content2['outputs'][0]['value']['source']['sd_hash']
+        self.assertEqual('48', (await self.status())['disk_space']['space_used'])
 
         proactive_downloader = self.daemon.component_manager.get_component(BACKGROUND_DOWNLOADER_COMPONENT)
         await self.clear()
+        self.assertEqual('0', (await self.status())['disk_space']['space_used'])
         await proactive_downloader.download_blobs(content1)
         await self.assertBlobs(content1)
+        self.assertEqual('0', (await self.status())['disk_space']['space_used'])
         await proactive_downloader.download_blobs(content2)
        await self.assertBlobs(content1, content2)
+        self.assertEqual('0', (await self.status())['disk_space']['space_used'])
         await self.clear()
         await proactive_downloader.download_blobs(content2)
         await self.assertBlobs(content2)
+        self.assertEqual('0', (await self.status())['disk_space']['space_used'])
 
         # tests that an attempt to download something that isn't a sd blob will download the single blob and stop
-        blobs = await self.daemon.storage.get_blobs_for_stream(
-            await self.daemon.storage.get_stream_hash_for_sd_hash(content1)
-        )
+        blobs = await self.get_blobs_from_sd_blob(self.reflector.blob_manager.get_blob(content1))
         await self.clear()
         await proactive_downloader.download_blobs(blobs[0].blob_hash)
         self.assertEqual({blobs[0].blob_hash}, self.daemon.blob_manager.completed_blob_hashes)
@@ -627,3 +636,15 @@ class TestProactiveDownloaderComponent(CommandTestCase):
         await proactive_downloader.start()
         await finished
         await self.assertBlobs(content1)
+        await self.clear()
+        # test that disk space manager doesn't delete orphan network blobs
+        await proactive_downloader.download_blobs(content1)
+        await self.daemon.storage.db.execute_fetchall("update blob set added_on=0")  # so it is preferred for cleaning
+        await self.daemon.jsonrpc_get("content2", save_file=False)
+        while (await self.file_list())[0]['status'] == 'running':
+            await asyncio.sleep(0.5)
+        await self.assertBlobs(content1, no_files=False)
+
+        self.daemon.conf.blob_storage_limit = 1
+        await self.blob_clean()
+        await self.assertBlobs(content1, no_files=False)
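
Note (reviewer sketch, not part of the commit): the new save_stream flag is what
keeps background downloads out of the stream/file tables. A minimal call-site
sketch mirroring the components.py hunk above; the StreamDownloader constructor
arguments (loop, config, blob_manager, sd_hash) and the download_blobs_only
helper name are assumptions for illustration, not the patched code itself:

    import asyncio

    from lbry.stream.downloader import StreamDownloader

    async def download_blobs_only(conf, blob_manager, sd_hash, node=None):
        # hypothetical helper; BackgroundDownloader does this internally
        downloader = StreamDownloader(asyncio.get_running_loop(), conf, blob_manager, sd_hash)
        try:
            # save_stream=False: blobs are fetched and verified, but no
            # stream/file rows are written, so file_list stays empty and the
            # disk space manager's stream_blob joins never see these blobs
            await downloader.start(node, save_stream=False)
        except ValueError:
            # non-sd blobs end up here, as in the components.py hunk
            return
        # every descriptor entry except the terminator (empty blob_hash)
        # is a data blob that is now present locally
        return [info.blob_hash for info in downloader.descriptor.blobs[:-1]]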
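
The storage change is the other half: disk usage now sums only blobs that join
against stream_blob, so the orphan blobs left behind by the sketch above are
exempt from blob_storage_limit cleaning. A standalone sketch of that accounting,
using a trimmed-down toy schema rather than lbry's real migrations:

    import sqlite3

    db = sqlite3.connect(":memory:")
    # toy stand-ins for lbry's blob / stream_blob tables
    db.executescript("""
        create table blob (blob_hash text primary key, blob_length integer,
                           is_mine integer, added_on integer);
        create table stream_blob (stream_hash text, blob_hash text);
    """)
    db.execute("insert into blob values ('aa', 100, 0, 1)")    # blob of a saved stream
    db.execute("insert into blob values ('bb', 900, 0, 2)")    # orphan network blob
    db.execute("insert into stream_blob values ('s1', 'aa')")  # only 'aa' is linked

    # the patched query: the join drops the orphan from the total
    (used,) = db.execute(
        "select coalesce(sum(blob_length), 0) from blob join stream_blob using (blob_hash)"
    ).fetchone()
    print(used)  # 100, not 1000 -- why the test expects space_used == '0'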