Merge pull request #3127 from lbryio/lru-cache-metrics

Add metrics for lru caches
Jack Robison authored on 2020-12-23 21:08:01 -05:00, committed by GitHub
commit 364f484f04
7 changed files with 77 additions and 23 deletions


@@ -638,7 +638,7 @@ class Config(CLIConfig):
        "Strategy to use when selecting UTXOs for a transaction",
        STRATEGIES, "standard")
    transaction_cache_size = Integer("Transaction cache size", 100_000)
    transaction_cache_size = Integer("Transaction cache size", 2 ** 17)
    save_resolved_claims = Toggle(
        "Save content claims to the database when they are resolved to keep file_list up to date, "
        "only disable this if file_x commands are not needed", True


@@ -40,7 +40,7 @@ class StreamDownloader:
        async def cached_read_blob(blob_info: 'BlobInfo') -> bytes:
            return await self.read_blob(blob_info, 2)

        if self.blob_manager.decrypted_blob_lru_cache:
        if self.blob_manager.decrypted_blob_lru_cache is not None:
            cached_read_blob = lru_cache_concurrent(override_lru_cache=self.blob_manager.decrypted_blob_lru_cache)(
                cached_read_blob
            )
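The `is not None` check matters because LRUCache defines __len__ (visible in the utils.py hunk below), which makes an empty cache falsy: with the old truthiness test the wrapper would be skipped whenever the injected cache started out empty. An illustration with a hypothetical stand-in class:

# Any object that defines __len__ is falsy when empty, so a bare
# truthiness test can silently skip the branch.
class SizedCache:
    def __init__(self):
        self._entries = {}

    def __len__(self):
        return len(self._entries)

cache = SizedCache()
assert not cache           # empty -> falsy: `if cache:` would not take the branch
assert cache is not None   # the identity check used here still passes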


@@ -20,8 +20,11 @@ import pkg_resources
import certifi
import aiohttp
from prometheus_client import Counter
from prometheus_client.registry import REGISTRY
from lbry.schema.claim import Claim
log = logging.getLogger(__name__)
@@ -206,15 +209,41 @@ async def resolve_host(url: str, port: int, proto: str) -> str:
class LRUCache:
    __slots__ = [
        'capacity',
        'cache'
        'cache',
        '_track_metrics',
        'hits',
        'misses'
    ]

    def __init__(self, capacity):
    def __init__(self, capacity: int, metric_name: typing.Optional[str] = None, namespace: str = "daemon_cache"):
        self.capacity = capacity
        self.cache = collections.OrderedDict()
        if metric_name is None:
            self._track_metrics = False
            self.hits = self.misses = None
        else:
            self._track_metrics = True
            try:
                self.hits = Counter(
                    f"{metric_name}_cache_hit_count", "Number of cache hits", namespace=namespace
                )
                self.misses = Counter(
                    f"{metric_name}_cache_miss_count", "Number of cache misses", namespace=namespace
                )
            except ValueError as err:
                log.warning("failed to set up prometheus %s_cache_miss_count metric: %s", metric_name, err)
                self._track_metrics = False
                self.hits = self.misses = None

    def get(self, key):
        value = self.cache.pop(key)
    def get(self, key, default=None):
        try:
            value = self.cache.pop(key)
            if self._track_metrics:
                self.hits.inc()
        except KeyError:
            if self._track_metrics:
                self.misses.inc()
            return default
        self.cache[key] = value
        return value
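For reference, a small usage sketch of the new API (names are illustrative): when metric_name is given, every get() increments either the hit or the miss counter, and misses now return a default instead of raising KeyError.

from lbry.utils import LRUCache

cache = LRUCache(2, metric_name='example')   # registers hit/miss Counters
cache['a'] = 1
cache.get('a')               # hit: hits.inc(), 'a' moves to most-recently-used
cache.get('b')               # miss: misses.inc(), returns None
cache.get('b', 'fallback')   # miss again, returns the supplied default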
@@ -226,16 +255,43 @@ class LRUCache:
                self.cache.popitem(last=False)
        self.cache[key] = value

    def clear(self):
        self.cache.clear()

    def pop(self, key):
        return self.cache.pop(key)

    def __setitem__(self, key, value):
        return self.set(key, value)

    def __getitem__(self, item):
        return self.get(item)

    def __contains__(self, item) -> bool:
        return item in self.cache

    def __len__(self):
        return len(self.cache)

    def __delitem__(self, key):
        self.cache.pop(key)

    def __del__(self):
        self.clear()
        if self._track_metrics:  # needed for tests
            try:
                REGISTRY.unregister(self.hits)
                REGISTRY.unregister(self.misses)
            except AttributeError:
                pass


def lru_cache_concurrent(cache_size: typing.Optional[int] = None,
                         override_lru_cache: typing.Optional[LRUCache] = None):
    if not cache_size and override_lru_cache is None:
        raise ValueError("invalid cache size")
    concurrent_cache = {}
    lru_cache = override_lru_cache or LRUCache(cache_size)
    lru_cache = override_lru_cache if override_lru_cache is not None else LRUCache(cache_size)

    def wrapper(async_fn):
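The change from `or` to an explicit `is not None` ternary mirrors the StreamDownloader fix above: an injected cache that happens to be empty is falsy, so `or` would silently discard it and allocate a fresh one. A hedged sketch of injecting a shared, metered cache through override_lru_cache (the decorated coroutine and its metric name are illustrative):

import asyncio
from lbry.utils import LRUCache, lru_cache_concurrent

shared_cache = LRUCache(2 ** 10, metric_name='blob_read')  # illustrative metric name

@lru_cache_concurrent(override_lru_cache=shared_cache)
async def read_blob(blob_hash: str) -> bytes:
    await asyncio.sleep(0.1)      # stand-in for the real decrypt/read work
    return blob_hash.encode()

# Results are stored in shared_cache, so its prometheus counters reflect the hit rate.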


@@ -10,11 +10,11 @@ from collections import defaultdict
from binascii import hexlify, unhexlify
from typing import Dict, Tuple, Type, Iterable, List, Optional, DefaultDict, NamedTuple
import pylru
from lbry.schema.result import Outputs, INVALID, NOT_FOUND
from lbry.schema.url import URL
from lbry.crypto.hash import hash160, double_sha256, sha256
from lbry.crypto.base58 import Base58
from lbry.utils import LRUCache
from .tasks import TaskGroup
from .database import Database
@@ -155,7 +155,7 @@ class Ledger(metaclass=LedgerRegistry):
        self._on_ready_controller = StreamController()
        self.on_ready = self._on_ready_controller.stream
        self._tx_cache = pylru.lrucache(self.config.get("tx_cache_size", 1024))
        self._tx_cache = LRUCache(self.config.get("tx_cache_size", 1024), metric_name='tx')
        self._update_tasks = TaskGroup()
        self._other_tasks = TaskGroup()  # that we dont need to start
        self._utxo_reservation_lock = asyncio.Lock()
@@ -167,7 +167,7 @@ class Ledger(metaclass=LedgerRegistry):
        self._known_addresses_out_of_sync = set()
        self.fee_per_name_char = self.config.get('fee_per_name_char', self.default_fee_per_name_char)
        self._balance_cache = pylru.lrucache(100000)
        self._balance_cache = LRUCache(2 ** 15)

    @classmethod
    def get_id(cls):
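For comparison, the two ledger caches after this change (a sketch using the 1024 fallback from the code above): the transaction cache now reports hit/miss metrics, while the balance cache is swapped without metrics and shrinks from 100,000 to 2 ** 15 = 32,768 entries.

from lbry.utils import LRUCache

tx_cache = LRUCache(1024, metric_name='tx')   # fallback tx_cache_size; exports hit/miss counters
balance_cache = LRUCache(2 ** 15)             # 32,768 entries, down from 100,000; no counters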
@@ -614,8 +614,9 @@ class Ledger(metaclass=LedgerRegistry):
        for txid, height in sorted(to_request, key=lambda x: x[1]):
            if cached:
                if txid in self._tx_cache:
                    if self._tx_cache[txid].tx is not None and self._tx_cache[txid].tx.is_verified:
                cached_tx = self._tx_cache.get(txid)
                if cached_tx is not None:
                    if cached_tx.tx is not None and cached_tx.tx.is_verified:
                        cache_hits.add(txid)
                        continue
                else:
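The rewritten lookup is a single-call pattern: one .get() both answers "is it cached?" and returns the entry, so each request is counted exactly once by the new hit/miss metrics, and unlike an `in` test it also refreshes the entry's recency. An illustrative contrast:

from lbry.utils import LRUCache

cache = LRUCache(10, metric_name='demo')
cache['k'] = 'value'

# Old shape: membership test plus indexing. `in` neither counts nor reorders
# the LRU; only the follow-up indexing on a hit is metered.
if 'k' in cache:
    value = cache['k']        # __getitem__ -> get(): counts a hit

# New shape: a single metered lookup that also refreshes recency on a hit.
value = cache.get('k')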


@@ -3,11 +3,10 @@ import itertools
import json
import time
from functools import wraps
from pylru import lrucache
import aiohttp
from prometheus_client import Gauge, Histogram
from lbry.utils import LRUCache
from lbry.wallet.rpc.jsonrpc import RPCError
from lbry.wallet.server.util import hex_to_bytes, class_logger
from lbry.wallet.rpc import JSONRPC
@@ -55,8 +54,8 @@ class Daemon:
        self._height = None
        self.available_rpcs = {}
        self.connector = aiohttp.TCPConnector()
        self._block_hash_cache = lrucache(100000)
        self._block_cache = lrucache(10000)
        self._block_hash_cache = LRUCache(100000)
        self._block_cache = LRUCache(10000)

    async def close(self):
        if self.connector:
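These two caches are swapped without a metric_name, so no prometheus counters are registered for them; the replacement is drop-in because LRUCache supports the same mapping-style operations the Daemon used with pylru. A small sketch (keys and values are illustrative):

from lbry.utils import LRUCache

block_cache = LRUCache(10000)              # no metric_name -> no counters registered
block_cache['<block hash>'] = b'<raw block>'
assert '<block hash>' in block_cache
raw = block_cache['<block hash>']          # same item access pylru.lrucache offered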


@@ -15,7 +15,6 @@ import ast
import os
import time
import zlib
import pylru
import typing
from typing import Optional, List, Tuple, Iterable
from asyncio import sleep
@@ -25,7 +24,7 @@ from glob import glob
from struct import pack, unpack
from concurrent.futures.thread import ThreadPoolExecutor
import attr
from lbry.utils import LRUCache
from lbry.wallet.server import util
from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN
from lbry.wallet.server.merkle import Merkle, MerkleCache
@@ -94,7 +93,7 @@ class LevelDB:
        self.headers_db = None
        self.tx_db = None
        self._tx_and_merkle_cache = pylru.lrucache(100000)
        self._tx_and_merkle_cache = LRUCache(2**17, metric_name='tx_and_merkle', namespace="wallet_server")
        self.total_transactions = None

    async def _read_tx_counts(self):
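With both a metric_name and a non-default namespace, the counters registered here are wallet_server_tx_and_merkle_cache_hit_count and wallet_server_tx_and_merkle_cache_miss_count (the prometheus text exposition may append a _total suffix, depending on the client version). A quick sketch:

from lbry.utils import LRUCache

cache = LRUCache(2 ** 17, metric_name='tx_and_merkle', namespace="wallet_server")
cache.get(b'missing txid')   # increments wallet_server_tx_and_merkle_cache_miss_count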


@@ -4,7 +4,6 @@ import math
import time
import json
import zlib
import pylru
import base64
import codecs
import typing
@@ -18,11 +17,11 @@ from collections import defaultdict
from functools import partial
from binascii import hexlify, unhexlify
from pylru import lrucache
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from prometheus_client import Counter, Info, Histogram, Gauge
import lbry
from lbry.utils import LRUCache
from lbry.build_info import BUILD, COMMIT_HASH, DOCKER_TAG
from lbry.wallet.server.block_processor import LBRYBlockProcessor
from lbry.wallet.server.db.writer import LBRYLevelDB
@@ -811,8 +810,8 @@ class LBRYSessionManager(SessionManager):
        if self.env.websocket_host is not None and self.env.websocket_port is not None:
            self.websocket = AdminWebSocket(self)
        self.search_cache = self.bp.search_cache
        self.search_cache['search'] = lrucache(10000)
        self.search_cache['resolve'] = lrucache(10000)
        self.search_cache['search'] = LRUCache(2**14, metric_name='search', namespace=NAMESPACE)
        self.search_cache['resolve'] = LRUCache(2**16, metric_name='resolve', namespace=NAMESPACE)

    async def process_metrics(self):
        while self.running:
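The session caches also move to powers of two and gain metrics under NAMESPACE (presumably the wallet server's prometheus namespace defined earlier in session.py, not shown in this diff), so they export counters of the form {NAMESPACE}_search_cache_hit_count / _miss_count. Size sketch:

# The old flat 10000-entry pylru caches become:
assert 2 ** 14 == 16_384   # 'search' cache
assert 2 ** 16 == 65_536   # 'resolve' cache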