forked from LBRYCommunity/lbry-sdk
move more logic out of the downloader component
This commit is contained in:
parent
b0bf6eff16
commit
c3b89a896b
7 changed files with 52 additions and 77 deletions
|
@ -15,8 +15,12 @@ class DiskSpaceManager:
|
||||||
self.task = None
|
self.task = None
|
||||||
self.analytics = analytics
|
self.analytics = analytics
|
||||||
|
|
||||||
|
async def get_free_space_bytes(self, is_network_blob=False):
|
||||||
|
limit_mb = self.config.network_storage_limit if is_network_blob else self.config.blob_storage_limit
|
||||||
|
return max(0, limit_mb*1024*1024 - (await self.get_space_used_mb(is_network_blob)))
|
||||||
|
|
||||||
async def get_space_used_bytes(self, is_network_blob=False):
|
async def get_space_used_bytes(self, is_network_blob=False):
|
||||||
return await self.db.get_stored_blob_disk_usage(is_orphan_blob=is_network_blob)
|
return await self.db.get_stored_blob_disk_usage(is_network_blob=is_network_blob)
|
||||||
|
|
||||||
async def get_space_used_mb(self, is_network_blob=False):
|
async def get_space_used_mb(self, is_network_blob=False):
|
||||||
return int(await self.get_space_used_bytes(is_network_blob)/1024.0/1024.0)
|
return int(await self.get_space_used_bytes(is_network_blob)/1024.0/1024.0)
|
||||||
|
@ -25,15 +29,13 @@ class DiskSpaceManager:
|
||||||
await self._clean(False)
|
await self._clean(False)
|
||||||
await self._clean(True)
|
await self._clean(True)
|
||||||
|
|
||||||
async def _clean(self, from_network_storage=False):
|
async def _clean(self, is_network_blob=False):
|
||||||
space_used_bytes = await self.get_space_used_bytes(from_network_storage)
|
space_used_bytes = await self.get_space_used_bytes(is_network_blob)
|
||||||
if from_network_storage:
|
storage_limit_mb = self.config.network_storage_limit if is_network_blob else self.config.blob_storage_limit
|
||||||
storage_limit = self.config.network_storage_limit*1024*1024 if self.config.network_storage_limit else None
|
storage_limit = storage_limit_mb*1024*1024 if storage_limit_mb else None
|
||||||
else:
|
|
||||||
storage_limit = self.config.blob_storage_limit*1024*1024 if self.config.blob_storage_limit else None
|
|
||||||
if self.analytics:
|
if self.analytics:
|
||||||
asyncio.create_task(
|
asyncio.create_task(
|
||||||
self.analytics.send_disk_space_used(space_used_bytes, storage_limit, from_network_storage)
|
self.analytics.send_disk_space_used(space_used_bytes, storage_limit, is_network_blob)
|
||||||
)
|
)
|
||||||
if not storage_limit:
|
if not storage_limit:
|
||||||
return 0
|
return 0
|
||||||
|
@ -41,7 +43,7 @@ class DiskSpaceManager:
|
||||||
available = storage_limit - space_used_bytes
|
available = storage_limit - space_used_bytes
|
||||||
if available > 0:
|
if available > 0:
|
||||||
return 0
|
return 0
|
||||||
for blob_hash, file_size, _ in await self.db.get_stored_blobs(is_mine=False, orphans=from_network_storage):
|
for blob_hash, file_size, _ in await self.db.get_stored_blobs(is_mine=False, is_network_blob=is_network_blob):
|
||||||
delete.append(blob_hash)
|
delete.append(blob_hash)
|
||||||
available += file_size
|
available += file_size
|
||||||
if available > 0:
|
if available > 0:
|
||||||
|
|
|
@ -31,6 +31,11 @@ class Node:
|
||||||
self._refresh_task: asyncio.Task = None
|
self._refresh_task: asyncio.Task = None
|
||||||
self._storage = storage
|
self._storage = storage
|
||||||
|
|
||||||
|
@property
|
||||||
|
def last_requested_blob_hash(self):
|
||||||
|
if len(self.protocol.data_store.requested_blobs) > 0:
|
||||||
|
return self.protocol.data_store.requested_blobs[-1]
|
||||||
|
|
||||||
async def refresh_node(self, force_once=False):
|
async def refresh_node(self, force_once=False):
|
||||||
while True:
|
while True:
|
||||||
# remove peers with expired blob announcements from the datastore
|
# remove peers with expired blob announcements from the datastore
|
||||||
|
|
|
@ -4,7 +4,6 @@ import asyncio
|
||||||
import logging
|
import logging
|
||||||
import binascii
|
import binascii
|
||||||
import typing
|
import typing
|
||||||
from collections import deque
|
|
||||||
|
|
||||||
import base58
|
import base58
|
||||||
|
|
||||||
|
@ -19,7 +18,7 @@ from lbry.dht.blob_announcer import BlobAnnouncer
|
||||||
from lbry.blob.blob_manager import BlobManager
|
from lbry.blob.blob_manager import BlobManager
|
||||||
from lbry.blob.disk_space_manager import DiskSpaceManager
|
from lbry.blob.disk_space_manager import DiskSpaceManager
|
||||||
from lbry.blob_exchange.server import BlobServer
|
from lbry.blob_exchange.server import BlobServer
|
||||||
from lbry.stream.downloader import StreamDownloader
|
from lbry.stream.background_downloader import BackgroundDownloader
|
||||||
from lbry.stream.stream_manager import StreamManager
|
from lbry.stream.stream_manager import StreamManager
|
||||||
from lbry.file.file_manager import FileManager
|
from lbry.file.file_manager import FileManager
|
||||||
from lbry.extras.daemon.component import Component
|
from lbry.extras.daemon.component import Component
|
||||||
|
@ -381,7 +380,7 @@ class FileManagerComponent(Component):
|
||||||
self.file_manager.stop()
|
self.file_manager.stop()
|
||||||
|
|
||||||
|
|
||||||
class BackgroundDownloader(Component):
|
class BackgroundDownloaderComponent(Component):
|
||||||
component_name = BACKGROUND_DOWNLOADER_COMPONENT
|
component_name = BACKGROUND_DOWNLOADER_COMPONENT
|
||||||
depends_on = [DATABASE_COMPONENT, BLOB_COMPONENT, DISK_SPACE_COMPONENT]
|
depends_on = [DATABASE_COMPONENT, BLOB_COMPONENT, DISK_SPACE_COMPONENT]
|
||||||
|
|
||||||
|
@ -389,57 +388,46 @@ class BackgroundDownloader(Component):
|
||||||
super().__init__(component_manager)
|
super().__init__(component_manager)
|
||||||
self.task: typing.Optional[asyncio.Task] = None
|
self.task: typing.Optional[asyncio.Task] = None
|
||||||
self.download_loop_delay_seconds = 60
|
self.download_loop_delay_seconds = 60
|
||||||
self.finished_iteration = asyncio.Event()
|
|
||||||
self.requested_blobs = deque(maxlen=10)
|
|
||||||
self.ongoing_download: typing.Optional[asyncio.Task] = None
|
self.ongoing_download: typing.Optional[asyncio.Task] = None
|
||||||
|
self.space_manager: typing.Optional[DiskSpaceManager] = None
|
||||||
|
self.background_downloader: typing.Optional[BackgroundDownloader] = None
|
||||||
|
self.dht_node: typing.Optional[Node] = None
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def component(self) -> 'BackgroundDownloader':
|
def is_busy(self):
|
||||||
|
return bool(self.ongoing_download and not self.ongoing_download.done())
|
||||||
|
|
||||||
|
@property
|
||||||
|
def component(self) -> 'BackgroundDownloaderComponent':
|
||||||
return self
|
return self
|
||||||
|
|
||||||
async def get_status(self):
|
async def get_status(self):
|
||||||
return {'running': self.task is not None and not self.task.done(), 'enqueued': len(self.requested_blobs)}
|
return {'running': self.task is not None and not self.task.done(),
|
||||||
|
'ongoing_download': self.is_busy}
|
||||||
|
|
||||||
async def loop(self):
|
async def loop(self):
|
||||||
while True:
|
while True:
|
||||||
if self.component_manager.has_component(DHT_COMPONENT):
|
if not self.is_busy and await self.space_manager.get_free_space_bytes(True) > 0:
|
||||||
node = self.component_manager.get_component(DHT_COMPONENT)
|
blob_hash = self.dht_node.last_requested_blob_hash
|
||||||
self.requested_blobs = node.protocol.data_store.requested_blobs
|
if blob_hash:
|
||||||
if self.requested_blobs and (not self.ongoing_download or self.ongoing_download.done()):
|
self.ongoing_download = asyncio.create_task(self.background_downloader.download_blobs(blob_hash))
|
||||||
blob_hash = self.requested_blobs.pop()
|
|
||||||
self.ongoing_download = asyncio.create_task(self.download_blobs(blob_hash))
|
|
||||||
self.ongoing_download.add_done_callback(lambda _: self.finished_iteration.set())
|
|
||||||
self.finished_iteration.clear()
|
|
||||||
await asyncio.sleep(self.download_loop_delay_seconds)
|
await asyncio.sleep(self.download_loop_delay_seconds)
|
||||||
|
|
||||||
async def download_blobs(self, sd_hash):
|
|
||||||
if self.conf.network_storage_limit <= 0:
|
|
||||||
return
|
|
||||||
space_manager: DiskSpaceManager = self.component_manager.get_component(DISK_SPACE_COMPONENT)
|
|
||||||
if (await space_manager.get_space_used_mb(True)) >= self.conf.network_storage_limit:
|
|
||||||
log.info("Allocated space for proactive downloader is full. Background download aborted.")
|
|
||||||
return
|
|
||||||
blob_manager = self.component_manager.get_component(BLOB_COMPONENT)
|
|
||||||
downloader = StreamDownloader(asyncio.get_running_loop(), self.conf, blob_manager, sd_hash)
|
|
||||||
storage = blob_manager.storage
|
|
||||||
node = None
|
|
||||||
if self.component_manager.has_component(DHT_COMPONENT):
|
|
||||||
node = self.component_manager.get_component(DHT_COMPONENT)
|
|
||||||
try:
|
|
||||||
await downloader.start(node, save_stream=False)
|
|
||||||
except ValueError:
|
|
||||||
return
|
|
||||||
for blob_info in downloader.descriptor.blobs[:-1]:
|
|
||||||
await downloader.download_stream_blob(blob_info)
|
|
||||||
await storage.set_announce(sd_hash, downloader.descriptor.blobs[0].blob_hash)
|
|
||||||
|
|
||||||
async def start(self):
|
async def start(self):
|
||||||
|
self.space_manager: DiskSpaceManager = self.component_manager.get_component(DISK_SPACE_COMPONENT)
|
||||||
|
if not self.component_manager.has_component(DHT_COMPONENT):
|
||||||
|
return
|
||||||
|
self.dht_node = self.component_manager.get_component(DHT_COMPONENT)
|
||||||
|
blob_manager = self.component_manager.get_component(BLOB_COMPONENT)
|
||||||
|
storage = self.component_manager.get_component(DATABASE_COMPONENT)
|
||||||
|
self.background_downloader = BackgroundDownloader(self.conf, storage, blob_manager, self.dht_node)
|
||||||
self.task = asyncio.create_task(self.loop())
|
self.task = asyncio.create_task(self.loop())
|
||||||
|
|
||||||
async def stop(self):
|
async def stop(self):
|
||||||
if self.ongoing_download and not self.ongoing_download.done():
|
if self.ongoing_download and not self.ongoing_download.done():
|
||||||
self.ongoing_download.cancel()
|
self.ongoing_download.cancel()
|
||||||
self.task.cancel()
|
if self.task:
|
||||||
|
self.task.cancel()
|
||||||
|
|
||||||
|
|
||||||
class DiskSpaceComponent(Component):
|
class DiskSpaceComponent(Component):
|
||||||
|
|
|
@ -442,9 +442,9 @@ class SQLiteStorage(SQLiteMixin):
|
||||||
def get_all_blob_hashes(self):
|
def get_all_blob_hashes(self):
|
||||||
return self.run_and_return_list("select blob_hash from blob")
|
return self.run_and_return_list("select blob_hash from blob")
|
||||||
|
|
||||||
async def get_stored_blobs(self, is_mine: bool, orphans=False):
|
async def get_stored_blobs(self, is_mine: bool, is_network_blob=False):
|
||||||
is_mine = 1 if is_mine else 0
|
is_mine = 1 if is_mine else 0
|
||||||
if orphans:
|
if is_network_blob:
|
||||||
return await self.db.execute_fetchall(
|
return await self.db.execute_fetchall(
|
||||||
"select blob.blob_hash, blob.blob_length, blob.added_on "
|
"select blob.blob_hash, blob.blob_length, blob.added_on "
|
||||||
"from blob left join stream_blob using (blob_hash) "
|
"from blob left join stream_blob using (blob_hash) "
|
||||||
|
@ -466,14 +466,14 @@ class SQLiteStorage(SQLiteMixin):
|
||||||
)
|
)
|
||||||
return normal_blobs + sd_blobs
|
return normal_blobs + sd_blobs
|
||||||
|
|
||||||
async def get_stored_blob_disk_usage(self, is_mine: Optional[bool] = None, is_orphan_blob: bool = False):
|
async def get_stored_blob_disk_usage(self, is_mine: Optional[bool] = None, is_network_blob: bool = False):
|
||||||
sql = "select coalesce(sum(blob_length), 0) "
|
sql = "select coalesce(sum(blob_length), 0) "
|
||||||
if is_orphan_blob:
|
if is_network_blob:
|
||||||
sql += "from blob left join stream_blob using (blob_hash) where stream_blob.stream_hash is null"
|
sql += "from blob left join stream_blob using (blob_hash) where stream_blob.stream_hash is null"
|
||||||
else:
|
else:
|
||||||
sql += "from blob join stream_blob using (blob_hash)"
|
sql += "from blob join stream_blob using (blob_hash)"
|
||||||
if is_mine is not None:
|
if is_mine is not None:
|
||||||
sql += f'{(" and " if is_orphan_blob else " where ")} is_mine=?'
|
sql += f'{(" and " if is_network_blob else " where ")} is_mine=?'
|
||||||
args = (1 if is_mine else 0,) if is_mine is not None else ()
|
args = (1 if is_mine else 0,) if is_mine is not None else ()
|
||||||
return (await self.db.execute_fetchone(sql, args))[0]
|
return (await self.db.execute_fetchone(sql, args))[0]
|
||||||
|
|
||||||
|
|
|
@ -4,7 +4,7 @@ from lbry.stream.downloader import StreamDownloader
|
||||||
|
|
||||||
|
|
||||||
class BackgroundDownloader:
|
class BackgroundDownloader:
|
||||||
def __init__(self, conf, storage, blob_manager, dht_node):
|
def __init__(self, conf, storage, blob_manager, dht_node=None):
|
||||||
self.storage = storage
|
self.storage = storage
|
||||||
self.blob_manager = blob_manager
|
self.blob_manager = blob_manager
|
||||||
self.node = dht_node
|
self.node = dht_node
|
||||||
|
@ -18,4 +18,4 @@ class BackgroundDownloader:
|
||||||
return
|
return
|
||||||
for blob_info in downloader.descriptor.blobs[:-1]:
|
for blob_info in downloader.descriptor.blobs[:-1]:
|
||||||
await downloader.download_stream_blob(blob_info)
|
await downloader.download_stream_blob(blob_info)
|
||||||
await self.storage.set_announce(sd_hash, downloader.descriptor.blobs[0].blob_hash)
|
await self.storage.set_announce(sd_hash, downloader.descriptor.blobs[0].blob_hash)
|
||||||
|
|
|
@ -4,6 +4,7 @@ import os
|
||||||
from binascii import hexlify
|
from binascii import hexlify
|
||||||
|
|
||||||
from lbry.schema import Claim
|
from lbry.schema import Claim
|
||||||
|
from lbry.stream.background_downloader import BackgroundDownloader
|
||||||
from lbry.stream.descriptor import StreamDescriptor
|
from lbry.stream.descriptor import StreamDescriptor
|
||||||
from lbry.testcase import CommandTestCase
|
from lbry.testcase import CommandTestCase
|
||||||
from lbry.extras.daemon.components import TorrentSession, BACKGROUND_DOWNLOADER_COMPONENT
|
from lbry.extras.daemon.components import TorrentSession, BACKGROUND_DOWNLOADER_COMPONENT
|
||||||
|
@ -611,8 +612,7 @@ class TestProactiveDownloaderComponent(CommandTestCase):
|
||||||
content2 = content2['outputs'][0]['value']['source']['sd_hash']
|
content2 = content2['outputs'][0]['value']['source']['sd_hash']
|
||||||
self.assertEqual('48', (await self.status())['disk_space']['space_used'])
|
self.assertEqual('48', (await self.status())['disk_space']['space_used'])
|
||||||
|
|
||||||
proactive_downloader = self.daemon.component_manager.get_component(BACKGROUND_DOWNLOADER_COMPONENT)
|
proactive_downloader = BackgroundDownloader(self.daemon.conf, self.daemon.storage, self.daemon.blob_manager)
|
||||||
self.daemon.conf.network_storage_limit = 100
|
|
||||||
await self.clear()
|
await self.clear()
|
||||||
self.assertEqual('0', (await self.status())['disk_space']['space_used'])
|
self.assertEqual('0', (await self.status())['disk_space']['space_used'])
|
||||||
self.assertEqual('0', (await self.status())['disk_space']['network_seeding_space_used'])
|
self.assertEqual('0', (await self.status())['disk_space']['network_seeding_space_used'])
|
||||||
|
@ -636,34 +636,14 @@ class TestProactiveDownloaderComponent(CommandTestCase):
|
||||||
await proactive_downloader.download_blobs(blobs[0].blob_hash)
|
await proactive_downloader.download_blobs(blobs[0].blob_hash)
|
||||||
self.assertEqual({blobs[0].blob_hash}, self.daemon.blob_manager.completed_blob_hashes)
|
self.assertEqual({blobs[0].blob_hash}, self.daemon.blob_manager.completed_blob_hashes)
|
||||||
|
|
||||||
# trigger from requested blobs
|
|
||||||
await self.clear()
|
|
||||||
await proactive_downloader.stop()
|
|
||||||
proactive_downloader.requested_blobs.append(content1)
|
|
||||||
finished = proactive_downloader.finished_iteration.wait()
|
|
||||||
await proactive_downloader.start()
|
|
||||||
await finished
|
|
||||||
await self.assertBlobs(content1)
|
|
||||||
await self.clear()
|
|
||||||
# test that disk space manager doesn't delete orphan network blobs
|
# test that disk space manager doesn't delete orphan network blobs
|
||||||
await proactive_downloader.download_blobs(content1)
|
await proactive_downloader.download_blobs(content1)
|
||||||
await self.daemon.storage.db.execute_fetchall("update blob set added_on=0") # so it is preferred for cleaning
|
await self.daemon.storage.db.execute_fetchall("update blob set added_on=0") # so it is preferred for cleaning
|
||||||
await self.daemon.jsonrpc_get("content2", save_file=False)
|
await self.daemon.jsonrpc_get("content2", save_file=False)
|
||||||
while (await self.file_list())[0]['status'] == 'running':
|
while (await self.file_list())[0]['status'] != 'stopped':
|
||||||
await asyncio.sleep(0.5)
|
await asyncio.sleep(0.5)
|
||||||
await self.assertBlobs(content1, no_files=False)
|
await self.assertBlobs(content1, no_files=False)
|
||||||
|
|
||||||
self.daemon.conf.blob_storage_limit = 1
|
self.daemon.conf.blob_storage_limit = 1
|
||||||
await self.blob_clean()
|
await self.blob_clean()
|
||||||
await self.assertBlobs(content1, no_files=False)
|
await self.assertBlobs(content1, no_files=False)
|
||||||
|
|
||||||
# downloading above limit triggers cleanup
|
|
||||||
self.daemon.conf.network_storage_limit = 6
|
|
||||||
with self.assertLogs() as log:
|
|
||||||
await proactive_downloader.download_blobs(content2)
|
|
||||||
self.assertIn('Allocated space for proactive downloader is full.', log.output[0])
|
|
||||||
await self.assertBlobs(content1, no_files=False)
|
|
||||||
self.assertEqual('32', (await self.status())['disk_space']['network_seeding_space_used'])
|
|
||||||
await self.blob_clean()
|
|
||||||
self.assertLessEqual(int((await self.status())['disk_space']['network_seeding_space_used']),
|
|
||||||
self.daemon.conf.network_storage_limit)
|
|
||||||
|
|
|
@ -33,7 +33,7 @@ class TestComponentManager(AsyncioTestCase):
|
||||||
components.WalletServerPaymentsComponent
|
components.WalletServerPaymentsComponent
|
||||||
],
|
],
|
||||||
[
|
[
|
||||||
components.BackgroundDownloader,
|
components.BackgroundDownloaderComponent,
|
||||||
]
|
]
|
||||||
]
|
]
|
||||||
self.component_manager = ComponentManager(Config())
|
self.component_manager = ComponentManager(Config())
|
||||||
|
|
Loading…
Reference in a new issue