don't save streams for network blobs and bypass disk space manager
parent 3b27d6a9b5
commit 34d18a3a9a

4 changed files with 48 additions and 18 deletions
@@ -419,7 +419,7 @@ class BackgroundDownloader(Component):
         if self.component_manager.has_component(DHT_COMPONENT):
             node = self.component_manager.get_component(DHT_COMPONENT)
         try:
-            await downloader.start(node)
+            await downloader.start(node, save_stream=False)
         except ValueError:
             return
         for blob_info in downloader.descriptor.blobs[:-1]:
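Note: the hunk above routes the background downloader through the new save_stream=False flag, so blobs fetched only for seeding never get stream/file bookkeeping. A sketch of the surrounding flow follows; the downloader construction and the per-blob download_stream_blob() call are assumptions, since they fall outside the diff context:

async def download_blobs(self, sd_hash):
    # hypothetical setup; the real construction of the downloader is not shown in this hunk
    downloader = StreamDownloader(self.loop, self.conf, self.blob_manager, sd_hash)
    node = None
    if self.component_manager.has_component(DHT_COMPONENT):
        node = self.component_manager.get_component(DHT_COMPONENT)
    try:
        # save_stream=False: fetch and verify blobs without creating
        # stream/file rows, so the disk space manager never sees them
        await downloader.start(node, save_stream=False)
    except ValueError:
        return  # the hash wasn't an sd blob; only that single blob was fetched
    for blob_info in downloader.descriptor.blobs[:-1]:
        await downloader.download_stream_blob(blob_info)  # assumed per-blob fetch call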
@@ -437,19 +437,28 @@ class SQLiteStorage(SQLiteMixin):
     def get_all_blob_hashes(self):
         return self.run_and_return_list("select blob_hash from blob")
 
-    async def get_stored_blobs(self, is_mine: bool):
+    async def get_stored_blobs(self, is_mine: bool, orphans=False):
         is_mine = 1 if is_mine else 0
-        return await self.db.execute_fetchall(
-            "select blob_hash, blob_length, added_on from blob where is_mine=? order by added_on asc",
+        sd_blobs = await self.db.execute_fetchall(
+            "select blob.blob_hash, blob.blob_length, blob.added_on "
+            "from blob join stream on blob.blob_hash=stream.sd_hash join file using (stream_hash) "
+            "where blob.is_mine=? order by blob.added_on asc",
             (is_mine,)
         )
+        normal_blobs = await self.db.execute_fetchall(
+            "select blob.blob_hash, blob.blob_length, blob.added_on "
+            "from blob join stream_blob using (blob_hash) cross join stream using (stream_hash) "
+            "cross join file using (stream_hash) where blob.is_mine=? order by blob.added_on asc",
+            (is_mine,)
+        )
+        return normal_blobs + sd_blobs
 
     async def get_stored_blob_disk_usage(self, is_mine: Optional[bool] = None):
         if is_mine is None:
-            sql, args = "select coalesce(sum(blob_length), 0) from blob", ()
+            sql = "select coalesce(sum(blob_length), 0) from blob join stream_blob using (blob_hash)"
         else:
-            is_mine = 1 if is_mine else 0
-            sql, args = "select coalesce(sum(blob_length), 0) from blob where is_mine=?", (is_mine,)
+            sql = "select coalesce(sum(blob_length), 0) from blob join stream_blob using (blob_hash) where is_mine=?"
+        args = (1 if is_mine else 0,) if is_mine is not None else ()
         return (await self.db.execute_fetchone(sql, args))[0]
 
     async def update_blob_ownership(self, sd_hash, is_mine: bool):
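Note: get_stored_blobs() now only reports blobs reachable from a saved file (sd blobs via stream/file, data blobs via stream_blob/stream/file), and the disk-usage totals only count blobs referenced by stream_blob; the new orphans parameter is added to the signature but unused within this hunk. Background/orphan blobs drop out of both queries, which is what keeps the disk space manager from cleaning them. A minimal, self-contained illustration of the disk-usage join; the table shapes below are assumptions covering only the columns the query touches:

import sqlite3

db = sqlite3.connect(":memory:")
db.executescript("""
    create table blob (blob_hash text primary key, blob_length integer, added_on integer, is_mine integer);
    create table stream_blob (stream_hash text, blob_hash text);
""")
# 'aa' belongs to a stored stream; 'bb' is an orphan fetched only for seeding
db.executescript("""
    insert into blob values ('aa', 100, 1, 0), ('bb', 200, 2, 0);
    insert into stream_blob values ('s1', 'aa');
""")
used, = db.execute(
    "select coalesce(sum(blob_length), 0) from blob join stream_blob using (blob_hash)"
).fetchone()
assert used == 100  # the orphan blob 'bb' no longer counts toward disk usage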
@@ -83,7 +83,7 @@ class StreamDownloader:
         )
         log.info("loaded stream manifest %s", self.sd_hash)
 
-    async def start(self, node: typing.Optional['Node'] = None, connection_id: int = 0):
+    async def start(self, node: typing.Optional['Node'] = None, connection_id: int = 0, save_stream=True):
         # set up peer accumulation
         self.node = node or self.node  # fixme: this shouldn't be set here!
         if self.node:
@@ -102,7 +102,7 @@ class StreamDownloader:
         self.search_queue.put_nowait(self.descriptor.blobs[0].blob_hash)
         log.info("added head blob to peer search for stream %s", self.sd_hash)
 
-        if not await self.blob_manager.storage.stream_exists(self.sd_hash):
+        if not await self.blob_manager.storage.stream_exists(self.sd_hash) and save_stream:
             await self.blob_manager.storage.store_stream(
                 self.blob_manager.get_blob(self.sd_hash, length=self.descriptor.length), self.descriptor
             )
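Note: save_stream defaults to True, so every existing caller of StreamDownloader.start() keeps the old behavior; only the background downloader opts out. An illustrative fragment:

await downloader.start(node)                     # normal download: stream row stored if missing
await downloader.start(node, save_stream=False)  # seeding-only: store_stream() is skipped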
@@ -4,6 +4,7 @@ import os
 from binascii import hexlify
 
 from lbry.schema import Claim
+from lbry.stream.descriptor import StreamDescriptor
 from lbry.testcase import CommandTestCase
 from lbry.extras.daemon.components import TorrentSession, BACKGROUND_DOWNLOADER_COMPONENT
 from lbry.wallet import Transaction
@@ -574,19 +575,24 @@ class DiskSpaceManagement(CommandTestCase):
 
 
 class TestProactiveDownloaderComponent(CommandTestCase):
+    async def get_blobs_from_sd_blob(self, sd_blob):
+        descriptor = await StreamDescriptor.from_stream_descriptor_blob(
+            asyncio.get_running_loop(), self.daemon.blob_manager.blob_dir, sd_blob
+        )
+        return descriptor.blobs
+
     async def assertBlobs(self, *sd_hashes, no_files=True):
         # checks that we have only the finished blobs needed for the referenced streams
         seen = set(sd_hashes)
         for sd_hash in sd_hashes:
-            self.assertTrue(self.daemon.blob_manager.get_blob(sd_hash).get_is_verified())
-            blobs = await self.daemon.storage.get_blobs_for_stream(
-                await self.daemon.storage.get_stream_hash_for_sd_hash(sd_hash)
-            )
+            sd_blob = self.daemon.blob_manager.get_blob(sd_hash)
+            self.assertTrue(sd_blob.get_is_verified())
+            blobs = await self.get_blobs_from_sd_blob(sd_blob)
             for blob in blobs[:-1]:
                 self.assertTrue(self.daemon.blob_manager.get_blob(blob.blob_hash).get_is_verified())
             seen.update(blob.blob_hash for blob in blobs if blob.blob_hash)
-        self.assertEqual(seen, self.daemon.blob_manager.completed_blob_hashes)
         if no_files:
+            self.assertEqual(seen, self.daemon.blob_manager.completed_blob_hashes)
             self.assertEqual(0, len(await self.file_list()))
 
     async def clear(self):
@@ -596,25 +602,28 @@ class TestProactiveDownloaderComponent(CommandTestCase):
         self.assertEqual(0, len((await self.daemon.jsonrpc_blob_list())['items']))
 
     async def test_ensure_download(self):
-        content1 = await self.stream_create('content1', '0.01', data=bytes([0] * (2 << 23)))
+        content1 = await self.stream_create('content1', '0.01', data=bytes([0] * (2 << 24)))
         content1 = content1['outputs'][0]['value']['source']['sd_hash']
         content2 = await self.stream_create('content2', '0.01', data=bytes([0] * (2 << 23)))
         content2 = content2['outputs'][0]['value']['source']['sd_hash']
+        self.assertEqual('48', (await self.status())['disk_space']['space_used'])
 
         proactive_downloader = self.daemon.component_manager.get_component(BACKGROUND_DOWNLOADER_COMPONENT)
         await self.clear()
+        self.assertEqual('0', (await self.status())['disk_space']['space_used'])
         await proactive_downloader.download_blobs(content1)
         await self.assertBlobs(content1)
+        self.assertEqual('0', (await self.status())['disk_space']['space_used'])
         await proactive_downloader.download_blobs(content2)
         await self.assertBlobs(content1, content2)
+        self.assertEqual('0', (await self.status())['disk_space']['space_used'])
         await self.clear()
         await proactive_downloader.download_blobs(content2)
         await self.assertBlobs(content2)
+        self.assertEqual('0', (await self.status())['disk_space']['space_used'])
 
         # tests that an attempt to download something that isn't a sd blob will download the single blob and stop
-        blobs = await self.daemon.storage.get_blobs_for_stream(
-            await self.daemon.storage.get_stream_hash_for_sd_hash(content1)
-        )
+        blobs = await self.get_blobs_from_sd_blob(self.reflector.blob_manager.get_blob(content1))
         await self.clear()
         await proactive_downloader.download_blobs(blobs[0].blob_hash)
         self.assertEqual({blobs[0].blob_hash}, self.daemon.blob_manager.completed_blob_hashes)
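Note: the size arithmetic behind the new '48' assertion, assuming space_used reports whole mebibytes. content1 was bumped from 2 << 23 to 2 << 24 bytes so the two streams have distinguishable sizes:

assert (2 << 24) == 32 * 1024 * 1024  # content1: 32 MiB
assert (2 << 23) == 16 * 1024 * 1024  # content2: 16 MiB
# 32 + 16 == 48 -> space_used is '48' while both files are saved, and '0'
# after clear(), since blobs without file rows no longer count toward usage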
@@ -627,3 +636,15 @@ class TestProactiveDownloaderComponent(CommandTestCase):
         await proactive_downloader.start()
         await finished
         await self.assertBlobs(content1)
+        await self.clear()
+        # test that the disk space manager doesn't delete orphan network blobs
+        await proactive_downloader.download_blobs(content1)
+        await self.daemon.storage.db.execute_fetchall("update blob set added_on=0")  # so it is preferred for cleaning
+        await self.daemon.jsonrpc_get("content2", save_file=False)
+        while (await self.file_list())[0]['status'] == 'running':
+            await asyncio.sleep(0.5)
+        await self.assertBlobs(content1, no_files=False)
+
+        self.daemon.conf.blob_storage_limit = 1
+        await self.blob_clean()
+        await self.assertBlobs(content1, no_files=False)