From 10dcb64715bcf0679a43aaa7e70c062206cb0c21 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Wed, 23 Dec 2020 16:37:31 -0500 Subject: [PATCH 1/3] lru cache metrics --- lbry/conf.py | 2 +- lbry/stream/downloader.py | 2 +- lbry/utils.py | 66 ++++++++++++++++++++++++++++++++++++--- lbry/wallet/ledger.py | 11 ++++--- 4 files changed, 69 insertions(+), 12 deletions(-) diff --git a/lbry/conf.py b/lbry/conf.py index 7c05389fb..7a4abd080 100644 --- a/lbry/conf.py +++ b/lbry/conf.py @@ -638,7 +638,7 @@ class Config(CLIConfig): "Strategy to use when selecting UTXOs for a transaction", STRATEGIES, "standard") - transaction_cache_size = Integer("Transaction cache size", 100_000) + transaction_cache_size = Integer("Transaction cache size", 2 ** 17) save_resolved_claims = Toggle( "Save content claims to the database when they are resolved to keep file_list up to date, " "only disable this if file_x commands are not needed", True diff --git a/lbry/stream/downloader.py b/lbry/stream/downloader.py index 94537e034..4e6bf1641 100644 --- a/lbry/stream/downloader.py +++ b/lbry/stream/downloader.py @@ -40,7 +40,7 @@ class StreamDownloader: async def cached_read_blob(blob_info: 'BlobInfo') -> bytes: return await self.read_blob(blob_info, 2) - if self.blob_manager.decrypted_blob_lru_cache: + if self.blob_manager.decrypted_blob_lru_cache is not None: cached_read_blob = lru_cache_concurrent(override_lru_cache=self.blob_manager.decrypted_blob_lru_cache)( cached_read_blob ) diff --git a/lbry/utils.py b/lbry/utils.py index 0db443cd9..d449076f3 100644 --- a/lbry/utils.py +++ b/lbry/utils.py @@ -20,8 +20,11 @@ import pkg_resources import certifi import aiohttp +from prometheus_client import Counter +from prometheus_client.registry import REGISTRY from lbry.schema.claim import Claim + log = logging.getLogger(__name__) @@ -206,15 +209,41 @@ async def resolve_host(url: str, port: int, proto: str) -> str: class LRUCache: __slots__ = [ 'capacity', - 'cache' + 'cache', + '_track_metrics', + 'hits', + 'misses' ] - def __init__(self, capacity): + def __init__(self, capacity: int, metric_name: typing.Optional[str] = None): self.capacity = capacity self.cache = collections.OrderedDict() + if metric_name is None: + self._track_metrics = False + self.hits = self.misses = None + else: + self._track_metrics = True + try: + self.hits = Counter( + f"{metric_name}_cache_hit_count", "Number of cache hits", namespace="daemon_cache" + ) + self.misses = Counter( + f"{metric_name}_cache_miss_count", "Number of cache misses", namespace="daemon_cache" + ) + except ValueError as err: + log.warning("failed to set up prometheus %s_cache_miss_count metric: %s", metric_name, err) + self._track_metrics = False + self.hits = self.misses = None - def get(self, key): - value = self.cache.pop(key) + def get(self, key, default=None): + try: + value = self.cache.pop(key) + if self._track_metrics: + self.hits.inc() + except KeyError: + if self._track_metrics: + self.misses.inc() + return default self.cache[key] = value return value @@ -226,16 +255,43 @@ class LRUCache: self.cache.popitem(last=False) self.cache[key] = value + def clear(self): + self.cache.clear() + + def pop(self, key): + return self.cache.pop(key) + + def __setitem__(self, key, value): + return self.set(key, value) + + def __getitem__(self, item): + return self.get(item) + def __contains__(self, item) -> bool: return item in self.cache + def __len__(self): + return len(self.cache) + + def __delitem__(self, key): + self.cache.pop(key) + + def __del__(self): + self.clear() + if self._track_metrics: # needed for tests + try: + REGISTRY.unregister(self.hits) + REGISTRY.unregister(self.misses) + except AttributeError: + pass + def lru_cache_concurrent(cache_size: typing.Optional[int] = None, override_lru_cache: typing.Optional[LRUCache] = None): if not cache_size and override_lru_cache is None: raise ValueError("invalid cache size") concurrent_cache = {} - lru_cache = override_lru_cache or LRUCache(cache_size) + lru_cache = override_lru_cache if override_lru_cache is not None else LRUCache(cache_size) def wrapper(async_fn): diff --git a/lbry/wallet/ledger.py b/lbry/wallet/ledger.py index 4dee98dc1..3d6824681 100644 --- a/lbry/wallet/ledger.py +++ b/lbry/wallet/ledger.py @@ -10,11 +10,11 @@ from collections import defaultdict from binascii import hexlify, unhexlify from typing import Dict, Tuple, Type, Iterable, List, Optional, DefaultDict, NamedTuple -import pylru from lbry.schema.result import Outputs, INVALID, NOT_FOUND from lbry.schema.url import URL from lbry.crypto.hash import hash160, double_sha256, sha256 from lbry.crypto.base58 import Base58 +from lbry.utils import LRUCache from .tasks import TaskGroup from .database import Database @@ -155,7 +155,7 @@ class Ledger(metaclass=LedgerRegistry): self._on_ready_controller = StreamController() self.on_ready = self._on_ready_controller.stream - self._tx_cache = pylru.lrucache(self.config.get("tx_cache_size", 1024)) + self._tx_cache = LRUCache(self.config.get("tx_cache_size", 1024), metric_name='tx') self._update_tasks = TaskGroup() self._other_tasks = TaskGroup() # that we dont need to start self._utxo_reservation_lock = asyncio.Lock() @@ -167,7 +167,7 @@ class Ledger(metaclass=LedgerRegistry): self._known_addresses_out_of_sync = set() self.fee_per_name_char = self.config.get('fee_per_name_char', self.default_fee_per_name_char) - self._balance_cache = pylru.lrucache(100000) + self._balance_cache = LRUCache(2 ** 15) @classmethod def get_id(cls): @@ -614,8 +614,9 @@ class Ledger(metaclass=LedgerRegistry): for txid, height in sorted(to_request, key=lambda x: x[1]): if cached: - if txid in self._tx_cache: - if self._tx_cache[txid].tx is not None and self._tx_cache[txid].tx.is_verified: + cached_tx = self._tx_cache.get(txid) + if cached_tx is not None: + if cached_tx.tx is not None and cached_tx.tx.is_verified: cache_hits.add(txid) continue else: From 13e38d6fd82f12363975dc2273c7a7b8a01ff207 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Wed, 23 Dec 2020 17:45:05 -0500 Subject: [PATCH 2/3] use LRUCache instead of pylru in wallet server --- lbry/utils.py | 6 +++--- lbry/wallet/server/daemon.py | 7 +++---- lbry/wallet/server/leveldb.py | 5 ++--- lbry/wallet/server/session.py | 7 +++---- 4 files changed, 11 insertions(+), 14 deletions(-) diff --git a/lbry/utils.py b/lbry/utils.py index d449076f3..a2fd2cc86 100644 --- a/lbry/utils.py +++ b/lbry/utils.py @@ -215,7 +215,7 @@ class LRUCache: 'misses' ] - def __init__(self, capacity: int, metric_name: typing.Optional[str] = None): + def __init__(self, capacity: int, metric_name: typing.Optional[str] = None, namespace: str = "daemon_cache"): self.capacity = capacity self.cache = collections.OrderedDict() if metric_name is None: @@ -225,10 +225,10 @@ class LRUCache: self._track_metrics = True try: self.hits = Counter( - f"{metric_name}_cache_hit_count", "Number of cache hits", namespace="daemon_cache" + f"{metric_name}_cache_hit_count", "Number of cache hits", namespace=namespace ) self.misses = Counter( - f"{metric_name}_cache_miss_count", "Number of cache misses", namespace="daemon_cache" + f"{metric_name}_cache_miss_count", "Number of cache misses", namespace=namespace ) except ValueError as err: log.warning("failed to set up prometheus %s_cache_miss_count metric: %s", metric_name, err) diff --git a/lbry/wallet/server/daemon.py b/lbry/wallet/server/daemon.py index 44a366b6a..654b4337e 100644 --- a/lbry/wallet/server/daemon.py +++ b/lbry/wallet/server/daemon.py @@ -3,11 +3,10 @@ import itertools import json import time from functools import wraps -from pylru import lrucache import aiohttp from prometheus_client import Gauge, Histogram - +from lbry.utils import LRUCache from lbry.wallet.rpc.jsonrpc import RPCError from lbry.wallet.server.util import hex_to_bytes, class_logger from lbry.wallet.rpc import JSONRPC @@ -55,8 +54,8 @@ class Daemon: self._height = None self.available_rpcs = {} self.connector = aiohttp.TCPConnector() - self._block_hash_cache = lrucache(100000) - self._block_cache = lrucache(10000) + self._block_hash_cache = LRUCache(100000) + self._block_cache = LRUCache(10000) async def close(self): if self.connector: diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py index 50ac57a96..542f7d61f 100644 --- a/lbry/wallet/server/leveldb.py +++ b/lbry/wallet/server/leveldb.py @@ -15,7 +15,6 @@ import ast import os import time import zlib -import pylru import typing from typing import Optional, List, Tuple, Iterable from asyncio import sleep @@ -25,7 +24,7 @@ from glob import glob from struct import pack, unpack from concurrent.futures.thread import ThreadPoolExecutor import attr - +from lbry.utils import LRUCache from lbry.wallet.server import util from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN from lbry.wallet.server.merkle import Merkle, MerkleCache @@ -94,7 +93,7 @@ class LevelDB: self.headers_db = None self.tx_db = None - self._tx_and_merkle_cache = pylru.lrucache(100000) + self._tx_and_merkle_cache = LRUCache(100000, metric_name='tx_and_merkle', namespace="wallet_server") self.total_transactions = None async def _read_tx_counts(self): diff --git a/lbry/wallet/server/session.py b/lbry/wallet/server/session.py index 6f9808497..1436492eb 100644 --- a/lbry/wallet/server/session.py +++ b/lbry/wallet/server/session.py @@ -4,7 +4,6 @@ import math import time import json import zlib -import pylru import base64 import codecs import typing @@ -18,11 +17,11 @@ from collections import defaultdict from functools import partial from binascii import hexlify, unhexlify -from pylru import lrucache from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor from prometheus_client import Counter, Info, Histogram, Gauge import lbry +from lbry.utils import LRUCache from lbry.build_info import BUILD, COMMIT_HASH, DOCKER_TAG from lbry.wallet.server.block_processor import LBRYBlockProcessor from lbry.wallet.server.db.writer import LBRYLevelDB @@ -811,8 +810,8 @@ class LBRYSessionManager(SessionManager): if self.env.websocket_host is not None and self.env.websocket_port is not None: self.websocket = AdminWebSocket(self) self.search_cache = self.bp.search_cache - self.search_cache['search'] = lrucache(10000) - self.search_cache['resolve'] = lrucache(10000) + self.search_cache['search'] = LRUCache(10000, metric_name='search', namespace=NAMESPACE) + self.search_cache['resolve'] = LRUCache(10000, metric_name='resolve', namespace=NAMESPACE) async def process_metrics(self): while self.running: From 9dd5159414e741ab332a83ad81785606518ee3d9 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Wed, 23 Dec 2020 18:26:19 -0500 Subject: [PATCH 3/3] increase server cache sizes --- lbry/wallet/server/leveldb.py | 2 +- lbry/wallet/server/session.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py index 542f7d61f..8f9cff42e 100644 --- a/lbry/wallet/server/leveldb.py +++ b/lbry/wallet/server/leveldb.py @@ -93,7 +93,7 @@ class LevelDB: self.headers_db = None self.tx_db = None - self._tx_and_merkle_cache = LRUCache(100000, metric_name='tx_and_merkle', namespace="wallet_server") + self._tx_and_merkle_cache = LRUCache(2**17, metric_name='tx_and_merkle', namespace="wallet_server") self.total_transactions = None async def _read_tx_counts(self): diff --git a/lbry/wallet/server/session.py b/lbry/wallet/server/session.py index 1436492eb..8d9f58f92 100644 --- a/lbry/wallet/server/session.py +++ b/lbry/wallet/server/session.py @@ -810,8 +810,8 @@ class LBRYSessionManager(SessionManager): if self.env.websocket_host is not None and self.env.websocket_port is not None: self.websocket = AdminWebSocket(self) self.search_cache = self.bp.search_cache - self.search_cache['search'] = LRUCache(10000, metric_name='search', namespace=NAMESPACE) - self.search_cache['resolve'] = LRUCache(10000, metric_name='resolve', namespace=NAMESPACE) + self.search_cache['search'] = LRUCache(2**14, metric_name='search', namespace=NAMESPACE) + self.search_cache['resolve'] = LRUCache(2**16, metric_name='resolve', namespace=NAMESPACE) async def process_metrics(self): while self.running: