forked from LBRYCommunity/lbry-sdk
Merge pull request #2178 from lbryio/lru-cached-read-blob
add a lru cache for decrypted blobs to minimize redownloading them, add `blob_lru_cache_size` to the config to set the cache size
Commit b922ea8e7e
7 changed files with 76 additions and 7 deletions
@@ -2,6 +2,7 @@ import os
 import typing
 import asyncio
 import logging
+from lbrynet.utils import LRUCache
 from lbrynet.blob.blob_file import is_valid_blobhash, BlobFile, BlobBuffer, AbstractBlob
 from lbrynet.stream.descriptor import StreamDescriptor
 
@@ -30,6 +31,8 @@ class BlobManager:
             else self._node_data_store.completed_blobs
         self.blobs: typing.Dict[str, AbstractBlob] = {}
         self.config = config
+        self.decrypted_blob_lru_cache = None if not self.config.blob_lru_cache_size else LRUCache(
+            self.config.blob_lru_cache_size)
 
     def _get_blob(self, blob_hash: str, length: typing.Optional[int] = None):
         if self.config.save_blobs:
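The cache is constructed only when `blob_lru_cache_size` is non-zero, so a disabled cache costs nothing at runtime. A minimal sketch of that optional-cache construction, using an illustrative stand-in class rather than the real lbrynet `LRUCache` (whose API may differ):

    # Illustrative only: a stand-in LRU cache built on OrderedDict.
    from collections import OrderedDict

    class SketchLRUCache:
        def __init__(self, capacity: int):
            self.capacity = capacity
            self.cache = OrderedDict()

        def get(self, key):
            if key in self.cache:
                self.cache.move_to_end(key)  # mark as most recently used
                return self.cache[key]
            return None

        def set(self, key, value):
            self.cache[key] = value
            self.cache.move_to_end(key)
            if len(self.cache) > self.capacity:
                self.cache.popitem(last=False)  # evict least recently used

    # Mirrors the BlobManager line above: size 0 disables caching entirely.
    cache_size = 32
    decrypted_blob_lru_cache = None if not cache_size else SketchLRUCache(cache_size)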
@@ -485,7 +485,10 @@ class Config(CLIConfig):
 
     # blob announcement and download
     save_blobs = Toggle("Save encrypted blob files for hosting, otherwise download blobs to memory only.", True)
+    blob_lru_cache_size = Integer(
+        "LRU cache size for decrypted downloaded blobs used to minimize re-downloading the same blobs when "
+        "replying to a range request. Set to 0 to disable.", 32
+    )
     announce_head_and_sd_only = Toggle(
         "Announce only the descriptor and first (rather than all) data blob for a stream to the DHT", True,
         previous_names=['announce_head_blobs_only']
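For reference, a hedged usage sketch: overriding the new setting from Python, assuming the standard `lbrynet.conf.Config` entry point shown in the hunk (daemon deployments would set it in their config file instead):

    from lbrynet.conf import Config

    conf = Config()
    conf.blob_lru_cache_size = 0   # disable the decrypted-blob cache
    conf.blob_lru_cache_size = 64  # or hold up to 64 decrypted blobs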
@@ -3,7 +3,7 @@ import typing
 import logging
 import binascii
 from lbrynet.error import DownloadSDTimeout
-from lbrynet.utils import resolve_host
+from lbrynet.utils import resolve_host, lru_cache_concurrent
 from lbrynet.stream.descriptor import StreamDescriptor
 from lbrynet.blob_exchange.downloader import BlobDownloader
 from lbrynet.dht.peer import KademliaPeer
@@ -36,6 +36,16 @@ class StreamDownloader:
         self.time_to_descriptor: typing.Optional[float] = None
         self.time_to_first_bytes: typing.Optional[float] = None
 
+        async def cached_read_blob(blob_info: 'BlobInfo') -> bytes:
+            return await self.read_blob(blob_info, 2)
+
+        if self.blob_manager.decrypted_blob_lru_cache:
+            cached_read_blob = lru_cache_concurrent(override_lru_cache=self.blob_manager.decrypted_blob_lru_cache)(
+                cached_read_blob
+            )
+
+        self.cached_read_blob = cached_read_blob
+
     async def add_fixed_peers(self):
         def _delayed_add_fixed_peers():
             self.added_fixed_peers = True
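Note the idiom here: the decorator is applied by hand, at runtime, only when the manager actually holds a cache, so the uncached path keeps the plain coroutine. A generic sketch of conditional decoration (hypothetical names, not the lbrynet signatures):

    import asyncio

    def logged(fn):
        # stand-in decorator; the diff uses lru_cache_concurrent here
        async def wrapped(*args, **kwargs):
            print(f"calling {fn.__name__}{args}")
            return await fn(*args, **kwargs)
        return wrapped

    class SketchDownloader:
        def __init__(self, use_cache: bool):
            async def read_blob(blob_hash: str) -> bytes:
                return b"decrypted payload"

            if use_cache:
                # decorate manually: the decision depends on runtime config,
                # so @-syntax at definition time would not work
                read_blob = logged(read_blob)
            self.read_blob = read_blob

    asyncio.run(SketchDownloader(use_cache=True).read_blob("deadbeef"))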
@@ -42,6 +42,9 @@ class ManagedStream:
     STATUS_STOPPED = "stopped"
     STATUS_FINISHED = "finished"
 
+    SAVING_ID = 1
+    STREAMING_ID = 2
+
     __slots__ = [
         'loop',
         'config',
@@ -304,7 +307,10 @@ class ManagedStream:
             raise IndexError(start_blob_num)
         for i, blob_info in enumerate(self.descriptor.blobs[start_blob_num:-1]):
             assert i + start_blob_num == blob_info.blob_num
-            decrypted = await self.downloader.read_blob(blob_info, connection_id)
+            if connection_id == self.STREAMING_ID:
+                decrypted = await self.downloader.cached_read_blob(blob_info)
+            else:
+                decrypted = await self.downloader.read_blob(blob_info, connection_id)
             yield (blob_info, decrypted)
 
     async def stream_file(self, request: Request, node: typing.Optional['Node'] = None) -> StreamResponse:
@@ -354,7 +360,7 @@ class ManagedStream:
         self.started_writing.clear()
         try:
             with open(output_path, 'wb') as file_write_handle:
-                async for blob_info, decrypted in self._aiter_read_stream(connection_id=1):
+                async for blob_info, decrypted in self._aiter_read_stream(connection_id=self.SAVING_ID):
                     log.info("write blob %i/%i", blob_info.blob_num + 1, len(self.descriptor.blobs) - 1)
                     await self.loop.run_in_executor(None, self._write_decrypted_blob, file_write_handle, decrypted)
                     self.written_bytes += len(decrypted)
@@ -62,6 +62,7 @@ class CommandTestCase(IntegrationTestCase):
     LEDGER = lbrynet.wallet
     MANAGER = LbryWalletManager
     VERBOSITY = logging.WARN
+    blob_lru_cache_size = 0
 
     async def asyncSetUp(self):
         await super().asyncSetUp()
@@ -81,6 +82,7 @@ class CommandTestCase(IntegrationTestCase):
         conf.lbryum_servers = [('127.0.0.1', 50001)]
         conf.reflector_servers = [('127.0.0.1', 5566)]
         conf.known_dht_nodes = []
+        conf.blob_lru_cache_size = self.blob_lru_cache_size
 
         await self.account.ensure_address_gap()
         address = (await self.account.receiving.get_addresses(limit=1, only_usable=True))[0]
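The test harness plumbs the setting through a class attribute defaulting to 0, so individual suites opt in by overriding it. A generic sketch of that pattern under stock unittest (hypothetical class names; the real base class is CommandTestCase):

    import unittest

    class SketchBaseCase(unittest.IsolatedAsyncioTestCase):
        blob_lru_cache_size = 0  # cache disabled for most suites

        async def asyncSetUp(self):
            # each subclass sees its own override via the class attribute
            self.conf_cache_size = self.blob_lru_cache_size

    class SketchCachedCase(SketchBaseCase):
        blob_lru_cache_size = 32  # opt in, as RangeRequestsLRUCache does

        async def test_override_applies(self):
            self.assertEqual(self.conf_cache_size, 32)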
@@ -229,11 +229,12 @@ class LRUCache:
         return item in self.cache
 
 
-def lru_cache_concurrent(cache_size: int):
-    if not cache_size > 0:
+def lru_cache_concurrent(cache_size: typing.Optional[int] = None,
+                         override_lru_cache: typing.Optional[LRUCache] = None):
+    if not cache_size and override_lru_cache is None:
         raise ValueError("invalid cache size")
     concurrent_cache = {}
-    lru_cache = LRUCache(cache_size)
+    lru_cache = override_lru_cache or LRUCache(cache_size)
 
     def wrapper(async_fn):
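For context, `lru_cache_concurrent` (in lbrynet/utils.py, judging by the hunk) both caches results and deduplicates in-flight calls, so simultaneous range requests for the same blob trigger a single read. A simplified sketch of that behavior, reconstructed from the signature above rather than copied from the real implementation:

    import asyncio
    import functools
    from collections import OrderedDict

    def lru_cache_concurrent_sketch(cache_size: int):
        results = OrderedDict()  # finished results, in LRU order
        in_flight = {}           # key -> Task for calls still running

        def wrapper(async_fn):
            @functools.wraps(async_fn)
            async def wrapped(*args):
                key = args
                if key in results:                 # cache hit
                    results.move_to_end(key)
                    return results[key]
                if key not in in_flight:           # first caller starts the work
                    in_flight[key] = asyncio.ensure_future(async_fn(*args))
                try:
                    result = await in_flight[key]  # later callers share the task
                finally:
                    in_flight.pop(key, None)
                results[key] = result
                while len(results) > cache_size:
                    results.popitem(last=False)    # evict least recently used
                return result
            return wrapped
        return wrapper

    @lru_cache_concurrent_sketch(32)
    async def read_blob(blob_hash: str) -> bytes:
        await asyncio.sleep(0.1)  # stand-in for a network download
        return b"decrypted payload"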
@@ -2,6 +2,7 @@ import os
 import hashlib
 import aiohttp
 import aiohttp.web
+import asyncio
 
 from lbrynet.utils import aiohttp_request
 from lbrynet.blob.blob_file import MAX_BLOB_SIZE
@@ -373,3 +374,46 @@ class RangeRequests(CommandTestCase):
         await stream.finished_writing.wait()
         with open(stream.full_path, 'rb') as f:
             self.assertEqual(self.data, f.read())
+
+
+class RangeRequestsLRUCache(CommandTestCase):
+    blob_lru_cache_size = 32
+
+    async def _request_stream(self):
+        name = 'foo'
+        url = f'http://{self.daemon.conf.streaming_host}:{self.daemon.conf.streaming_port}/get/{name}'
+
+        async with aiohttp_request('get', url) as req:
+            self.assertEqual(req.headers.get('Content-Type'), 'application/octet-stream')
+            content_range = req.headers.get('Content-Range')
+            content_length = int(req.headers.get('Content-Length'))
+            streamed_bytes = await req.content.read()
+        self.assertEqual(content_length, len(streamed_bytes))
+        self.assertEqual(15, content_length)
+        self.assertEqual(b'hi\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00', streamed_bytes)
+        self.assertEqual('bytes 0-14/15', content_range)
+
+    async def test_range_requests_with_blob_lru_cache(self):
+        self.data = b'hi'
+        self.daemon.conf.save_blobs = False
+        self.daemon.conf.save_files = False
+        await self.stream_create('foo', '0.01', data=self.data, file_size=0)
+        await self.daemon.jsonrpc_file_list()[0].fully_reflected.wait()
+        await self.daemon.jsonrpc_file_delete(delete_from_download_dir=True, claim_name='foo')
+        self.assertEqual(0, len(os.listdir(self.daemon.blob_manager.blob_dir)))
+
+        await self.daemon.streaming_runner.setup()
+        site = aiohttp.web.TCPSite(self.daemon.streaming_runner, self.daemon.conf.streaming_host,
+                                   self.daemon.conf.streaming_port)
+        await site.start()
+        self.assertListEqual(self.daemon.jsonrpc_file_list(), [])
+
+        await self._request_stream()
+        self.assertEqual(1, len(self.daemon.jsonrpc_file_list()))
+        self.server.stop_server()
+
+        # running with cache size 0 gets through without errors without
+        # this since the server doesnt stop immediately
+        await asyncio.sleep(1, loop=self.loop)
+
+        await self._request_stream()
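The `_request_stream` helper above issues a plain GET and checks the Content-Range header the streaming server volunteers; a client exercising the cache with explicit byte ranges would send a Range header instead. A hedged example using stock aiohttp (URL and port are illustrative):

    import asyncio
    import aiohttp

    async def fetch_range(url: str, start: int, end: int) -> bytes:
        async with aiohttp.ClientSession() as session:
            # request bytes [start, end] inclusive, per RFC 7233
            headers = {'Range': f'bytes={start}-{end}'}
            async with session.get(url, headers=headers) as resp:
                assert resp.status in (200, 206)  # 206 = Partial Content
                return await resp.content.read()

    # e.g. asyncio.run(fetch_range('http://localhost:5280/get/foo', 0, 14))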