From 68f1661452db70bb8acaa7a9face0c951ad6988d Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Sat, 16 Jan 2021 16:25:46 -0500 Subject: [PATCH] add LRUCache with no prometheus metrics --- lbry/blob/blob_manager.py | 4 +-- lbry/utils.py | 57 +++++++++++++++++++++++++++++++++-- lbry/wallet/ledger.py | 6 ++-- lbry/wallet/server/daemon.py | 6 ++-- lbry/wallet/server/leveldb.py | 4 +-- lbry/wallet/server/session.py | 6 ++-- 6 files changed, 67 insertions(+), 16 deletions(-) diff --git a/lbry/blob/blob_manager.py b/lbry/blob/blob_manager.py index 49a67b18a..4a7e54740 100644 --- a/lbry/blob/blob_manager.py +++ b/lbry/blob/blob_manager.py @@ -2,7 +2,7 @@ import os import typing import asyncio import logging -from lbry.utils import LRUCache +from lbry.utils import LRUCacheWithMetrics from lbry.blob.blob_file import is_valid_blobhash, BlobFile, BlobBuffer, AbstractBlob from lbry.stream.descriptor import StreamDescriptor from lbry.connection_manager import ConnectionManager @@ -32,7 +32,7 @@ class BlobManager: else self._node_data_store.completed_blobs self.blobs: typing.Dict[str, AbstractBlob] = {} self.config = config - self.decrypted_blob_lru_cache = None if not self.config.blob_lru_cache_size else LRUCache( + self.decrypted_blob_lru_cache = None if not self.config.blob_lru_cache_size else LRUCacheWithMetrics( self.config.blob_lru_cache_size) self.connection_manager = ConnectionManager(loop) diff --git a/lbry/utils.py b/lbry/utils.py index a2fd2cc86..4202a2e27 100644 --- a/lbry/utils.py +++ b/lbry/utils.py @@ -206,7 +206,7 @@ async def resolve_host(url: str, port: int, proto: str) -> str: ))[0][4][0] -class LRUCache: +class LRUCacheWithMetrics: __slots__ = [ 'capacity', 'cache', @@ -286,12 +286,63 @@ class LRUCache: pass +class LRUCache: + __slots__ = [ + 'capacity', + 'cache' + ] + + def __init__(self, capacity: int): + self.capacity = capacity + self.cache = collections.OrderedDict() + + def get(self, key, default=None): + try: + value = self.cache.pop(key) + except KeyError: + return default + self.cache[key] = value + return value + + def set(self, key, value): + try: + self.cache.pop(key) + except KeyError: + if len(self.cache) >= self.capacity: + self.cache.popitem(last=False) + self.cache[key] = value + + def clear(self): + self.cache.clear() + + def pop(self, key): + return self.cache.pop(key) + + def __setitem__(self, key, value): + return self.set(key, value) + + def __getitem__(self, item): + return self.get(item) + + def __contains__(self, item) -> bool: + return item in self.cache + + def __len__(self): + return len(self.cache) + + def __delitem__(self, key): + self.cache.pop(key) + + def __del__(self): + self.clear() + + def lru_cache_concurrent(cache_size: typing.Optional[int] = None, - override_lru_cache: typing.Optional[LRUCache] = None): + override_lru_cache: typing.Optional[LRUCacheWithMetrics] = None): if not cache_size and override_lru_cache is None: raise ValueError("invalid cache size") concurrent_cache = {} - lru_cache = override_lru_cache if override_lru_cache is not None else LRUCache(cache_size) + lru_cache = override_lru_cache if override_lru_cache is not None else LRUCacheWithMetrics(cache_size) def wrapper(async_fn): diff --git a/lbry/wallet/ledger.py b/lbry/wallet/ledger.py index 3d6824681..c29c835c9 100644 --- a/lbry/wallet/ledger.py +++ b/lbry/wallet/ledger.py @@ -14,7 +14,7 @@ from lbry.schema.result import Outputs, INVALID, NOT_FOUND from lbry.schema.url import URL from lbry.crypto.hash import hash160, double_sha256, sha256 from lbry.crypto.base58 import Base58 -from lbry.utils import LRUCache +from lbry.utils import LRUCacheWithMetrics from .tasks import TaskGroup from .database import Database @@ -155,7 +155,7 @@ class Ledger(metaclass=LedgerRegistry): self._on_ready_controller = StreamController() self.on_ready = self._on_ready_controller.stream - self._tx_cache = LRUCache(self.config.get("tx_cache_size", 1024), metric_name='tx') + self._tx_cache = LRUCacheWithMetrics(self.config.get("tx_cache_size", 1024), metric_name='tx') self._update_tasks = TaskGroup() self._other_tasks = TaskGroup() # that we dont need to start self._utxo_reservation_lock = asyncio.Lock() @@ -167,7 +167,7 @@ class Ledger(metaclass=LedgerRegistry): self._known_addresses_out_of_sync = set() self.fee_per_name_char = self.config.get('fee_per_name_char', self.default_fee_per_name_char) - self._balance_cache = LRUCache(2 ** 15) + self._balance_cache = LRUCacheWithMetrics(2 ** 15) @classmethod def get_id(cls): diff --git a/lbry/wallet/server/daemon.py b/lbry/wallet/server/daemon.py index c6f4db3d2..abcfdf71a 100644 --- a/lbry/wallet/server/daemon.py +++ b/lbry/wallet/server/daemon.py @@ -6,7 +6,7 @@ from functools import wraps import aiohttp from prometheus_client import Gauge, Histogram -from lbry.utils import LRUCache +from lbry.utils import LRUCacheWithMetrics from lbry.wallet.rpc.jsonrpc import RPCError from lbry.wallet.server.util import hex_to_bytes, class_logger from lbry.wallet.rpc import JSONRPC @@ -54,8 +54,8 @@ class Daemon: self._height = None self.available_rpcs = {} self.connector = aiohttp.TCPConnector() - self._block_hash_cache = LRUCache(100000) - self._block_cache = LRUCache(2**16, metric_name='block', namespace=NAMESPACE) + self._block_hash_cache = LRUCacheWithMetrics(100000) + self._block_cache = LRUCacheWithMetrics(2 ** 16, metric_name='block', namespace=NAMESPACE) async def close(self): if self.connector: diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py index 8f9cff42e..109ae9a8c 100644 --- a/lbry/wallet/server/leveldb.py +++ b/lbry/wallet/server/leveldb.py @@ -24,7 +24,7 @@ from glob import glob from struct import pack, unpack from concurrent.futures.thread import ThreadPoolExecutor import attr -from lbry.utils import LRUCache +from lbry.utils import LRUCacheWithMetrics from lbry.wallet.server import util from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN from lbry.wallet.server.merkle import Merkle, MerkleCache @@ -93,7 +93,7 @@ class LevelDB: self.headers_db = None self.tx_db = None - self._tx_and_merkle_cache = LRUCache(2**17, metric_name='tx_and_merkle', namespace="wallet_server") + self._tx_and_merkle_cache = LRUCacheWithMetrics(2 ** 17, metric_name='tx_and_merkle', namespace="wallet_server") self.total_transactions = None async def _read_tx_counts(self): diff --git a/lbry/wallet/server/session.py b/lbry/wallet/server/session.py index 0fee8484a..37c06e19d 100644 --- a/lbry/wallet/server/session.py +++ b/lbry/wallet/server/session.py @@ -21,7 +21,7 @@ from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor from prometheus_client import Counter, Info, Histogram, Gauge import lbry -from lbry.utils import LRUCache +from lbry.utils import LRUCacheWithMetrics from lbry.build_info import BUILD, COMMIT_HASH, DOCKER_TAG from lbry.wallet.server.block_processor import LBRYBlockProcessor from lbry.wallet.server.db.writer import LBRYLevelDB @@ -810,8 +810,8 @@ class LBRYSessionManager(SessionManager): if self.env.websocket_host is not None and self.env.websocket_port is not None: self.websocket = AdminWebSocket(self) self.search_cache = self.bp.search_cache - self.search_cache['search'] = LRUCache(2**14, metric_name='search', namespace=NAMESPACE) - self.search_cache['resolve'] = LRUCache(2**16, metric_name='resolve', namespace=NAMESPACE) + self.search_cache['search'] = LRUCacheWithMetrics(2 ** 14, metric_name='search', namespace=NAMESPACE) + self.search_cache['resolve'] = LRUCacheWithMetrics(2 ** 16, metric_name='resolve', namespace=NAMESPACE) async def process_metrics(self): while self.running: