forked from LBRYCommunity/lbry-sdk
lru cache metrics
This commit is contained in:
parent
7551b51e7d
commit
10dcb64715
4 changed files with 69 additions and 12 deletions
|
@ -638,7 +638,7 @@ class Config(CLIConfig):
|
||||||
"Strategy to use when selecting UTXOs for a transaction",
|
"Strategy to use when selecting UTXOs for a transaction",
|
||||||
STRATEGIES, "standard")
|
STRATEGIES, "standard")
|
||||||
|
|
||||||
transaction_cache_size = Integer("Transaction cache size", 100_000)
|
transaction_cache_size = Integer("Transaction cache size", 2 ** 17)
|
||||||
save_resolved_claims = Toggle(
|
save_resolved_claims = Toggle(
|
||||||
"Save content claims to the database when they are resolved to keep file_list up to date, "
|
"Save content claims to the database when they are resolved to keep file_list up to date, "
|
||||||
"only disable this if file_x commands are not needed", True
|
"only disable this if file_x commands are not needed", True
|
||||||
|
|
|
@ -40,7 +40,7 @@ class StreamDownloader:
|
||||||
async def cached_read_blob(blob_info: 'BlobInfo') -> bytes:
|
async def cached_read_blob(blob_info: 'BlobInfo') -> bytes:
|
||||||
return await self.read_blob(blob_info, 2)
|
return await self.read_blob(blob_info, 2)
|
||||||
|
|
||||||
if self.blob_manager.decrypted_blob_lru_cache:
|
if self.blob_manager.decrypted_blob_lru_cache is not None:
|
||||||
cached_read_blob = lru_cache_concurrent(override_lru_cache=self.blob_manager.decrypted_blob_lru_cache)(
|
cached_read_blob = lru_cache_concurrent(override_lru_cache=self.blob_manager.decrypted_blob_lru_cache)(
|
||||||
cached_read_blob
|
cached_read_blob
|
||||||
)
|
)
|
||||||
|
|
|
@ -20,8 +20,11 @@ import pkg_resources
|
||||||
|
|
||||||
import certifi
|
import certifi
|
||||||
import aiohttp
|
import aiohttp
|
||||||
|
from prometheus_client import Counter
|
||||||
|
from prometheus_client.registry import REGISTRY
|
||||||
from lbry.schema.claim import Claim
|
from lbry.schema.claim import Claim
|
||||||
|
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
@ -206,15 +209,41 @@ async def resolve_host(url: str, port: int, proto: str) -> str:
|
||||||
class LRUCache:
|
class LRUCache:
|
||||||
__slots__ = [
|
__slots__ = [
|
||||||
'capacity',
|
'capacity',
|
||||||
'cache'
|
'cache',
|
||||||
|
'_track_metrics',
|
||||||
|
'hits',
|
||||||
|
'misses'
|
||||||
]
|
]
|
||||||
|
|
||||||
def __init__(self, capacity):
|
def __init__(self, capacity: int, metric_name: typing.Optional[str] = None):
|
||||||
self.capacity = capacity
|
self.capacity = capacity
|
||||||
self.cache = collections.OrderedDict()
|
self.cache = collections.OrderedDict()
|
||||||
|
if metric_name is None:
|
||||||
|
self._track_metrics = False
|
||||||
|
self.hits = self.misses = None
|
||||||
|
else:
|
||||||
|
self._track_metrics = True
|
||||||
|
try:
|
||||||
|
self.hits = Counter(
|
||||||
|
f"{metric_name}_cache_hit_count", "Number of cache hits", namespace="daemon_cache"
|
||||||
|
)
|
||||||
|
self.misses = Counter(
|
||||||
|
f"{metric_name}_cache_miss_count", "Number of cache misses", namespace="daemon_cache"
|
||||||
|
)
|
||||||
|
except ValueError as err:
|
||||||
|
log.warning("failed to set up prometheus %s_cache_miss_count metric: %s", metric_name, err)
|
||||||
|
self._track_metrics = False
|
||||||
|
self.hits = self.misses = None
|
||||||
|
|
||||||
def get(self, key):
|
def get(self, key, default=None):
|
||||||
value = self.cache.pop(key)
|
try:
|
||||||
|
value = self.cache.pop(key)
|
||||||
|
if self._track_metrics:
|
||||||
|
self.hits.inc()
|
||||||
|
except KeyError:
|
||||||
|
if self._track_metrics:
|
||||||
|
self.misses.inc()
|
||||||
|
return default
|
||||||
self.cache[key] = value
|
self.cache[key] = value
|
||||||
return value
|
return value
|
||||||
|
|
||||||
|
@ -226,16 +255,43 @@ class LRUCache:
|
||||||
self.cache.popitem(last=False)
|
self.cache.popitem(last=False)
|
||||||
self.cache[key] = value
|
self.cache[key] = value
|
||||||
|
|
||||||
|
def clear(self):
|
||||||
|
self.cache.clear()
|
||||||
|
|
||||||
|
def pop(self, key):
|
||||||
|
return self.cache.pop(key)
|
||||||
|
|
||||||
|
def __setitem__(self, key, value):
|
||||||
|
return self.set(key, value)
|
||||||
|
|
||||||
|
def __getitem__(self, item):
|
||||||
|
return self.get(item)
|
||||||
|
|
||||||
def __contains__(self, item) -> bool:
|
def __contains__(self, item) -> bool:
|
||||||
return item in self.cache
|
return item in self.cache
|
||||||
|
|
||||||
|
def __len__(self):
|
||||||
|
return len(self.cache)
|
||||||
|
|
||||||
|
def __delitem__(self, key):
|
||||||
|
self.cache.pop(key)
|
||||||
|
|
||||||
|
def __del__(self):
|
||||||
|
self.clear()
|
||||||
|
if self._track_metrics: # needed for tests
|
||||||
|
try:
|
||||||
|
REGISTRY.unregister(self.hits)
|
||||||
|
REGISTRY.unregister(self.misses)
|
||||||
|
except AttributeError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
def lru_cache_concurrent(cache_size: typing.Optional[int] = None,
|
def lru_cache_concurrent(cache_size: typing.Optional[int] = None,
|
||||||
override_lru_cache: typing.Optional[LRUCache] = None):
|
override_lru_cache: typing.Optional[LRUCache] = None):
|
||||||
if not cache_size and override_lru_cache is None:
|
if not cache_size and override_lru_cache is None:
|
||||||
raise ValueError("invalid cache size")
|
raise ValueError("invalid cache size")
|
||||||
concurrent_cache = {}
|
concurrent_cache = {}
|
||||||
lru_cache = override_lru_cache or LRUCache(cache_size)
|
lru_cache = override_lru_cache if override_lru_cache is not None else LRUCache(cache_size)
|
||||||
|
|
||||||
def wrapper(async_fn):
|
def wrapper(async_fn):
|
||||||
|
|
||||||
|
|
|
@ -10,11 +10,11 @@ from collections import defaultdict
|
||||||
from binascii import hexlify, unhexlify
|
from binascii import hexlify, unhexlify
|
||||||
from typing import Dict, Tuple, Type, Iterable, List, Optional, DefaultDict, NamedTuple
|
from typing import Dict, Tuple, Type, Iterable, List, Optional, DefaultDict, NamedTuple
|
||||||
|
|
||||||
import pylru
|
|
||||||
from lbry.schema.result import Outputs, INVALID, NOT_FOUND
|
from lbry.schema.result import Outputs, INVALID, NOT_FOUND
|
||||||
from lbry.schema.url import URL
|
from lbry.schema.url import URL
|
||||||
from lbry.crypto.hash import hash160, double_sha256, sha256
|
from lbry.crypto.hash import hash160, double_sha256, sha256
|
||||||
from lbry.crypto.base58 import Base58
|
from lbry.crypto.base58 import Base58
|
||||||
|
from lbry.utils import LRUCache
|
||||||
|
|
||||||
from .tasks import TaskGroup
|
from .tasks import TaskGroup
|
||||||
from .database import Database
|
from .database import Database
|
||||||
|
@ -155,7 +155,7 @@ class Ledger(metaclass=LedgerRegistry):
|
||||||
self._on_ready_controller = StreamController()
|
self._on_ready_controller = StreamController()
|
||||||
self.on_ready = self._on_ready_controller.stream
|
self.on_ready = self._on_ready_controller.stream
|
||||||
|
|
||||||
self._tx_cache = pylru.lrucache(self.config.get("tx_cache_size", 1024))
|
self._tx_cache = LRUCache(self.config.get("tx_cache_size", 1024), metric_name='tx')
|
||||||
self._update_tasks = TaskGroup()
|
self._update_tasks = TaskGroup()
|
||||||
self._other_tasks = TaskGroup() # that we dont need to start
|
self._other_tasks = TaskGroup() # that we dont need to start
|
||||||
self._utxo_reservation_lock = asyncio.Lock()
|
self._utxo_reservation_lock = asyncio.Lock()
|
||||||
|
@ -167,7 +167,7 @@ class Ledger(metaclass=LedgerRegistry):
|
||||||
self._known_addresses_out_of_sync = set()
|
self._known_addresses_out_of_sync = set()
|
||||||
|
|
||||||
self.fee_per_name_char = self.config.get('fee_per_name_char', self.default_fee_per_name_char)
|
self.fee_per_name_char = self.config.get('fee_per_name_char', self.default_fee_per_name_char)
|
||||||
self._balance_cache = pylru.lrucache(100000)
|
self._balance_cache = LRUCache(2 ** 15)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def get_id(cls):
|
def get_id(cls):
|
||||||
|
@ -614,8 +614,9 @@ class Ledger(metaclass=LedgerRegistry):
|
||||||
|
|
||||||
for txid, height in sorted(to_request, key=lambda x: x[1]):
|
for txid, height in sorted(to_request, key=lambda x: x[1]):
|
||||||
if cached:
|
if cached:
|
||||||
if txid in self._tx_cache:
|
cached_tx = self._tx_cache.get(txid)
|
||||||
if self._tx_cache[txid].tx is not None and self._tx_cache[txid].tx.is_verified:
|
if cached_tx is not None:
|
||||||
|
if cached_tx.tx is not None and cached_tx.tx.is_verified:
|
||||||
cache_hits.add(txid)
|
cache_hits.add(txid)
|
||||||
continue
|
continue
|
||||||
else:
|
else:
|
||||||
|
|
Loading…
Reference in a new issue