From 10dcb64715bcf0679a43aaa7e70c062206cb0c21 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Wed, 23 Dec 2020 16:37:31 -0500 Subject: [PATCH] lru cache metrics --- lbry/conf.py | 2 +- lbry/stream/downloader.py | 2 +- lbry/utils.py | 66 ++++++++++++++++++++++++++++++++++++--- lbry/wallet/ledger.py | 11 ++++--- 4 files changed, 69 insertions(+), 12 deletions(-) diff --git a/lbry/conf.py b/lbry/conf.py index 7c05389fb..7a4abd080 100644 --- a/lbry/conf.py +++ b/lbry/conf.py @@ -638,7 +638,7 @@ class Config(CLIConfig): "Strategy to use when selecting UTXOs for a transaction", STRATEGIES, "standard") - transaction_cache_size = Integer("Transaction cache size", 100_000) + transaction_cache_size = Integer("Transaction cache size", 2 ** 17) save_resolved_claims = Toggle( "Save content claims to the database when they are resolved to keep file_list up to date, " "only disable this if file_x commands are not needed", True diff --git a/lbry/stream/downloader.py b/lbry/stream/downloader.py index 94537e034..4e6bf1641 100644 --- a/lbry/stream/downloader.py +++ b/lbry/stream/downloader.py @@ -40,7 +40,7 @@ class StreamDownloader: async def cached_read_blob(blob_info: 'BlobInfo') -> bytes: return await self.read_blob(blob_info, 2) - if self.blob_manager.decrypted_blob_lru_cache: + if self.blob_manager.decrypted_blob_lru_cache is not None: cached_read_blob = lru_cache_concurrent(override_lru_cache=self.blob_manager.decrypted_blob_lru_cache)( cached_read_blob ) diff --git a/lbry/utils.py b/lbry/utils.py index 0db443cd9..d449076f3 100644 --- a/lbry/utils.py +++ b/lbry/utils.py @@ -20,8 +20,11 @@ import pkg_resources import certifi import aiohttp +from prometheus_client import Counter +from prometheus_client.registry import REGISTRY from lbry.schema.claim import Claim + log = logging.getLogger(__name__) @@ -206,15 +209,41 @@ async def resolve_host(url: str, port: int, proto: str) -> str: class LRUCache: __slots__ = [ 'capacity', - 'cache' + 'cache', + '_track_metrics', + 'hits', + 'misses' ] - def __init__(self, capacity): + def __init__(self, capacity: int, metric_name: typing.Optional[str] = None): self.capacity = capacity self.cache = collections.OrderedDict() + if metric_name is None: + self._track_metrics = False + self.hits = self.misses = None + else: + self._track_metrics = True + try: + self.hits = Counter( + f"{metric_name}_cache_hit_count", "Number of cache hits", namespace="daemon_cache" + ) + self.misses = Counter( + f"{metric_name}_cache_miss_count", "Number of cache misses", namespace="daemon_cache" + ) + except ValueError as err: + log.warning("failed to set up prometheus %s_cache_miss_count metric: %s", metric_name, err) + self._track_metrics = False + self.hits = self.misses = None - def get(self, key): - value = self.cache.pop(key) + def get(self, key, default=None): + try: + value = self.cache.pop(key) + if self._track_metrics: + self.hits.inc() + except KeyError: + if self._track_metrics: + self.misses.inc() + return default self.cache[key] = value return value @@ -226,16 +255,43 @@ class LRUCache: self.cache.popitem(last=False) self.cache[key] = value + def clear(self): + self.cache.clear() + + def pop(self, key): + return self.cache.pop(key) + + def __setitem__(self, key, value): + return self.set(key, value) + + def __getitem__(self, item): + return self.get(item) + def __contains__(self, item) -> bool: return item in self.cache + def __len__(self): + return len(self.cache) + + def __delitem__(self, key): + self.cache.pop(key) + + def __del__(self): + self.clear() + if self._track_metrics: # needed for tests + try: + REGISTRY.unregister(self.hits) + REGISTRY.unregister(self.misses) + except AttributeError: + pass + def lru_cache_concurrent(cache_size: typing.Optional[int] = None, override_lru_cache: typing.Optional[LRUCache] = None): if not cache_size and override_lru_cache is None: raise ValueError("invalid cache size") concurrent_cache = {} - lru_cache = override_lru_cache or LRUCache(cache_size) + lru_cache = override_lru_cache if override_lru_cache is not None else LRUCache(cache_size) def wrapper(async_fn): diff --git a/lbry/wallet/ledger.py b/lbry/wallet/ledger.py index 4dee98dc1..3d6824681 100644 --- a/lbry/wallet/ledger.py +++ b/lbry/wallet/ledger.py @@ -10,11 +10,11 @@ from collections import defaultdict from binascii import hexlify, unhexlify from typing import Dict, Tuple, Type, Iterable, List, Optional, DefaultDict, NamedTuple -import pylru from lbry.schema.result import Outputs, INVALID, NOT_FOUND from lbry.schema.url import URL from lbry.crypto.hash import hash160, double_sha256, sha256 from lbry.crypto.base58 import Base58 +from lbry.utils import LRUCache from .tasks import TaskGroup from .database import Database @@ -155,7 +155,7 @@ class Ledger(metaclass=LedgerRegistry): self._on_ready_controller = StreamController() self.on_ready = self._on_ready_controller.stream - self._tx_cache = pylru.lrucache(self.config.get("tx_cache_size", 1024)) + self._tx_cache = LRUCache(self.config.get("tx_cache_size", 1024), metric_name='tx') self._update_tasks = TaskGroup() self._other_tasks = TaskGroup() # that we dont need to start self._utxo_reservation_lock = asyncio.Lock() @@ -167,7 +167,7 @@ class Ledger(metaclass=LedgerRegistry): self._known_addresses_out_of_sync = set() self.fee_per_name_char = self.config.get('fee_per_name_char', self.default_fee_per_name_char) - self._balance_cache = pylru.lrucache(100000) + self._balance_cache = LRUCache(2 ** 15) @classmethod def get_id(cls): @@ -614,8 +614,9 @@ class Ledger(metaclass=LedgerRegistry): for txid, height in sorted(to_request, key=lambda x: x[1]): if cached: - if txid in self._tx_cache: - if self._tx_cache[txid].tx is not None and self._tx_cache[txid].tx.is_verified: + cached_tx = self._tx_cache.get(txid) + if cached_tx is not None: + if cached_tx.tx is not None and cached_tx.tx.is_verified: cache_hits.add(txid) continue else: