forked from LBRYCommunity/lbry-sdk
cleanup background downloader blobs from conf
This commit is contained in:
parent
95703b3af5
commit
813108b9d8
5 changed files with 41 additions and 5 deletions
|
@ -22,9 +22,17 @@ class DiskSpaceManager:
|
||||||
return int(await self.get_space_used_bytes(is_network_blob)/1024.0/1024.0)
|
return int(await self.get_space_used_bytes(is_network_blob)/1024.0/1024.0)
|
||||||
|
|
||||||
async def clean(self):
|
async def clean(self):
|
||||||
space_used_bytes = await self.get_space_used_bytes()
|
await self._clean(False)
|
||||||
storage_limit = self.config.blob_storage_limit*1024*1024 if self.config.blob_storage_limit else None
|
await self._clean(True)
|
||||||
|
|
||||||
|
async def _clean(self, from_network_storage=False):
|
||||||
|
space_used_bytes = await self.get_space_used_bytes(from_network_storage)
|
||||||
|
if from_network_storage:
|
||||||
|
storage_limit = self.config.network_storage_limit*1024*1024 if self.config.network_storage_limit else None
|
||||||
|
else:
|
||||||
|
storage_limit = self.config.blob_storage_limit*1024*1024 if self.config.blob_storage_limit else None
|
||||||
if self.analytics:
|
if self.analytics:
|
||||||
|
# todo: add metrics for network case
|
||||||
asyncio.create_task(self.analytics.send_disk_space_used(space_used_bytes, storage_limit))
|
asyncio.create_task(self.analytics.send_disk_space_used(space_used_bytes, storage_limit))
|
||||||
if not storage_limit:
|
if not storage_limit:
|
||||||
return 0
|
return 0
|
||||||
|
@ -32,7 +40,7 @@ class DiskSpaceManager:
|
||||||
available = storage_limit - space_used_bytes
|
available = storage_limit - space_used_bytes
|
||||||
if available > 0:
|
if available > 0:
|
||||||
return 0
|
return 0
|
||||||
for blob_hash, file_size, _ in await self.db.get_stored_blobs(is_mine=False):
|
for blob_hash, file_size, _ in await self.db.get_stored_blobs(is_mine=False, orphans=from_network_storage):
|
||||||
delete.append(blob_hash)
|
delete.append(blob_hash)
|
||||||
available += file_size
|
available += file_size
|
||||||
if available > 0:
|
if available > 0:
|
||||||
|
|
|
@ -383,7 +383,7 @@ class FileManagerComponent(Component):
|
||||||
|
|
||||||
class BackgroundDownloader(Component):
|
class BackgroundDownloader(Component):
|
||||||
component_name = BACKGROUND_DOWNLOADER_COMPONENT
|
component_name = BACKGROUND_DOWNLOADER_COMPONENT
|
||||||
depends_on = [DATABASE_COMPONENT, BLOB_COMPONENT]
|
depends_on = [DATABASE_COMPONENT, BLOB_COMPONENT, DISK_SPACE_COMPONENT]
|
||||||
|
|
||||||
def __init__(self, component_manager):
|
def __init__(self, component_manager):
|
||||||
super().__init__(component_manager)
|
super().__init__(component_manager)
|
||||||
|
@ -413,6 +413,12 @@ class BackgroundDownloader(Component):
|
||||||
await asyncio.sleep(self.download_loop_delay_seconds)
|
await asyncio.sleep(self.download_loop_delay_seconds)
|
||||||
|
|
||||||
async def download_blobs(self, sd_hash):
|
async def download_blobs(self, sd_hash):
|
||||||
|
if self.conf.network_storage_limit <= 0:
|
||||||
|
return
|
||||||
|
space_manager: DiskSpaceManager = self.component_manager.get_component(DISK_SPACE_COMPONENT)
|
||||||
|
if (await space_manager.get_space_used_mb(True)) >= self.conf.network_storage_limit:
|
||||||
|
log.info("Allocated space for proactive downloader is full. Background download aborted.")
|
||||||
|
return
|
||||||
blob_manager = self.component_manager.get_component(BLOB_COMPONENT)
|
blob_manager = self.component_manager.get_component(BLOB_COMPONENT)
|
||||||
downloader = StreamDownloader(asyncio.get_running_loop(), self.conf, blob_manager, sd_hash)
|
downloader = StreamDownloader(asyncio.get_running_loop(), self.conf, blob_manager, sd_hash)
|
||||||
node = None
|
node = None
|
||||||
|
|
|
@ -439,6 +439,14 @@ class SQLiteStorage(SQLiteMixin):
|
||||||
|
|
||||||
async def get_stored_blobs(self, is_mine: bool, orphans=False):
|
async def get_stored_blobs(self, is_mine: bool, orphans=False):
|
||||||
is_mine = 1 if is_mine else 0
|
is_mine = 1 if is_mine else 0
|
||||||
|
if orphans:
|
||||||
|
return await self.db.execute_fetchall(
|
||||||
|
"select blob.blob_hash, blob.blob_length, blob.added_on "
|
||||||
|
"from blob left join stream_blob using (blob_hash) "
|
||||||
|
"where stream_blob.stream_hash is null and blob.is_mine=? order by blob.added_on asc",
|
||||||
|
(is_mine,)
|
||||||
|
)
|
||||||
|
|
||||||
sd_blobs = await self.db.execute_fetchall(
|
sd_blobs = await self.db.execute_fetchall(
|
||||||
"select blob.blob_hash, blob.blob_length, blob.added_on "
|
"select blob.blob_hash, blob.blob_length, blob.added_on "
|
||||||
"from blob join stream on blob.blob_hash=stream.sd_hash join file using (stream_hash) "
|
"from blob join stream on blob.blob_hash=stream.sd_hash join file using (stream_hash) "
|
||||||
|
|
|
@ -609,6 +609,7 @@ class TestProactiveDownloaderComponent(CommandTestCase):
|
||||||
self.assertEqual('48', (await self.status())['disk_space']['space_used'])
|
self.assertEqual('48', (await self.status())['disk_space']['space_used'])
|
||||||
|
|
||||||
proactive_downloader = self.daemon.component_manager.get_component(BACKGROUND_DOWNLOADER_COMPONENT)
|
proactive_downloader = self.daemon.component_manager.get_component(BACKGROUND_DOWNLOADER_COMPONENT)
|
||||||
|
self.daemon.conf.network_storage_limit = 100
|
||||||
await self.clear()
|
await self.clear()
|
||||||
self.assertEqual('0', (await self.status())['disk_space']['space_used'])
|
self.assertEqual('0', (await self.status())['disk_space']['space_used'])
|
||||||
self.assertEqual('0', (await self.status())['disk_space']['network_seeding_space_used'])
|
self.assertEqual('0', (await self.status())['disk_space']['network_seeding_space_used'])
|
||||||
|
@ -652,3 +653,14 @@ class TestProactiveDownloaderComponent(CommandTestCase):
|
||||||
self.daemon.conf.blob_storage_limit = 1
|
self.daemon.conf.blob_storage_limit = 1
|
||||||
await self.blob_clean()
|
await self.blob_clean()
|
||||||
await self.assertBlobs(content1, no_files=False)
|
await self.assertBlobs(content1, no_files=False)
|
||||||
|
|
||||||
|
# downloading above limit triggers cleanup
|
||||||
|
self.daemon.conf.network_storage_limit = 6
|
||||||
|
with self.assertLogs() as log:
|
||||||
|
await proactive_downloader.download_blobs(content2)
|
||||||
|
self.assertIn('Allocated space for proactive downloader is full.', log.output[0])
|
||||||
|
await self.assertBlobs(content1, no_files=False)
|
||||||
|
self.assertEqual('32', (await self.status())['disk_space']['network_seeding_space_used'])
|
||||||
|
await self.blob_clean()
|
||||||
|
self.assertLessEqual(int((await self.status())['disk_space']['network_seeding_space_used']),
|
||||||
|
self.daemon.conf.network_storage_limit)
|
||||||
|
|
|
@ -26,12 +26,14 @@ class TestComponentManager(AsyncioTestCase):
|
||||||
components.WalletComponent
|
components.WalletComponent
|
||||||
],
|
],
|
||||||
[
|
[
|
||||||
components.BackgroundDownloader,
|
|
||||||
components.DiskSpaceComponent,
|
components.DiskSpaceComponent,
|
||||||
components.FileManagerComponent,
|
components.FileManagerComponent,
|
||||||
components.HashAnnouncerComponent,
|
components.HashAnnouncerComponent,
|
||||||
components.PeerProtocolServerComponent,
|
components.PeerProtocolServerComponent,
|
||||||
components.WalletServerPaymentsComponent
|
components.WalletServerPaymentsComponent
|
||||||
|
],
|
||||||
|
[
|
||||||
|
components.BackgroundDownloader,
|
||||||
]
|
]
|
||||||
]
|
]
|
||||||
self.component_manager = ComponentManager(Config())
|
self.component_manager = ComponentManager(Config())
|
||||||
|
|
Loading…
Reference in a new issue