add LRUCache with no prometheus metrics

This commit is contained in:
Jack Robison 2021-01-16 16:25:46 -05:00
parent 36fd1b91ae
commit 68f1661452
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
6 changed files with 67 additions and 16 deletions

View file

@ -2,7 +2,7 @@ import os
import typing
import asyncio
import logging
from lbry.utils import LRUCache
from lbry.utils import LRUCacheWithMetrics
from lbry.blob.blob_file import is_valid_blobhash, BlobFile, BlobBuffer, AbstractBlob
from lbry.stream.descriptor import StreamDescriptor
from lbry.connection_manager import ConnectionManager
@ -32,7 +32,7 @@ class BlobManager:
else self._node_data_store.completed_blobs
self.blobs: typing.Dict[str, AbstractBlob] = {}
self.config = config
self.decrypted_blob_lru_cache = None if not self.config.blob_lru_cache_size else LRUCache(
self.decrypted_blob_lru_cache = None if not self.config.blob_lru_cache_size else LRUCacheWithMetrics(
self.config.blob_lru_cache_size)
self.connection_manager = ConnectionManager(loop)

View file

@ -206,7 +206,7 @@ async def resolve_host(url: str, port: int, proto: str) -> str:
))[0][4][0]
class LRUCache:
class LRUCacheWithMetrics:
__slots__ = [
'capacity',
'cache',
@ -286,12 +286,63 @@ class LRUCache:
pass
class LRUCache:
__slots__ = [
'capacity',
'cache'
]
def __init__(self, capacity: int):
self.capacity = capacity
self.cache = collections.OrderedDict()
def get(self, key, default=None):
try:
value = self.cache.pop(key)
except KeyError:
return default
self.cache[key] = value
return value
def set(self, key, value):
try:
self.cache.pop(key)
except KeyError:
if len(self.cache) >= self.capacity:
self.cache.popitem(last=False)
self.cache[key] = value
def clear(self):
self.cache.clear()
def pop(self, key):
return self.cache.pop(key)
def __setitem__(self, key, value):
return self.set(key, value)
def __getitem__(self, item):
return self.get(item)
def __contains__(self, item) -> bool:
return item in self.cache
def __len__(self):
return len(self.cache)
def __delitem__(self, key):
self.cache.pop(key)
def __del__(self):
self.clear()
def lru_cache_concurrent(cache_size: typing.Optional[int] = None,
override_lru_cache: typing.Optional[LRUCache] = None):
override_lru_cache: typing.Optional[LRUCacheWithMetrics] = None):
if not cache_size and override_lru_cache is None:
raise ValueError("invalid cache size")
concurrent_cache = {}
lru_cache = override_lru_cache if override_lru_cache is not None else LRUCache(cache_size)
lru_cache = override_lru_cache if override_lru_cache is not None else LRUCacheWithMetrics(cache_size)
def wrapper(async_fn):

View file

@ -14,7 +14,7 @@ from lbry.schema.result import Outputs, INVALID, NOT_FOUND
from lbry.schema.url import URL
from lbry.crypto.hash import hash160, double_sha256, sha256
from lbry.crypto.base58 import Base58
from lbry.utils import LRUCache
from lbry.utils import LRUCacheWithMetrics
from .tasks import TaskGroup
from .database import Database
@ -155,7 +155,7 @@ class Ledger(metaclass=LedgerRegistry):
self._on_ready_controller = StreamController()
self.on_ready = self._on_ready_controller.stream
self._tx_cache = LRUCache(self.config.get("tx_cache_size", 1024), metric_name='tx')
self._tx_cache = LRUCacheWithMetrics(self.config.get("tx_cache_size", 1024), metric_name='tx')
self._update_tasks = TaskGroup()
self._other_tasks = TaskGroup() # that we dont need to start
self._utxo_reservation_lock = asyncio.Lock()
@ -167,7 +167,7 @@ class Ledger(metaclass=LedgerRegistry):
self._known_addresses_out_of_sync = set()
self.fee_per_name_char = self.config.get('fee_per_name_char', self.default_fee_per_name_char)
self._balance_cache = LRUCache(2 ** 15)
self._balance_cache = LRUCacheWithMetrics(2 ** 15)
@classmethod
def get_id(cls):

View file

@ -6,7 +6,7 @@ from functools import wraps
import aiohttp
from prometheus_client import Gauge, Histogram
from lbry.utils import LRUCache
from lbry.utils import LRUCacheWithMetrics
from lbry.wallet.rpc.jsonrpc import RPCError
from lbry.wallet.server.util import hex_to_bytes, class_logger
from lbry.wallet.rpc import JSONRPC
@ -54,8 +54,8 @@ class Daemon:
self._height = None
self.available_rpcs = {}
self.connector = aiohttp.TCPConnector()
self._block_hash_cache = LRUCache(100000)
self._block_cache = LRUCache(2**16, metric_name='block', namespace=NAMESPACE)
self._block_hash_cache = LRUCacheWithMetrics(100000)
self._block_cache = LRUCacheWithMetrics(2 ** 16, metric_name='block', namespace=NAMESPACE)
async def close(self):
if self.connector:

View file

@ -24,7 +24,7 @@ from glob import glob
from struct import pack, unpack
from concurrent.futures.thread import ThreadPoolExecutor
import attr
from lbry.utils import LRUCache
from lbry.utils import LRUCacheWithMetrics
from lbry.wallet.server import util
from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN
from lbry.wallet.server.merkle import Merkle, MerkleCache
@ -93,7 +93,7 @@ class LevelDB:
self.headers_db = None
self.tx_db = None
self._tx_and_merkle_cache = LRUCache(2**17, metric_name='tx_and_merkle', namespace="wallet_server")
self._tx_and_merkle_cache = LRUCacheWithMetrics(2 ** 17, metric_name='tx_and_merkle', namespace="wallet_server")
self.total_transactions = None
async def _read_tx_counts(self):

View file

@ -21,7 +21,7 @@ from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from prometheus_client import Counter, Info, Histogram, Gauge
import lbry
from lbry.utils import LRUCache
from lbry.utils import LRUCacheWithMetrics
from lbry.build_info import BUILD, COMMIT_HASH, DOCKER_TAG
from lbry.wallet.server.block_processor import LBRYBlockProcessor
from lbry.wallet.server.db.writer import LBRYLevelDB
@ -810,8 +810,8 @@ class LBRYSessionManager(SessionManager):
if self.env.websocket_host is not None and self.env.websocket_port is not None:
self.websocket = AdminWebSocket(self)
self.search_cache = self.bp.search_cache
self.search_cache['search'] = LRUCache(2**14, metric_name='search', namespace=NAMESPACE)
self.search_cache['resolve'] = LRUCache(2**16, metric_name='resolve', namespace=NAMESPACE)
self.search_cache['search'] = LRUCacheWithMetrics(2 ** 14, metric_name='search', namespace=NAMESPACE)
self.search_cache['resolve'] = LRUCacheWithMetrics(2 ** 16, metric_name='resolve', namespace=NAMESPACE)
async def process_metrics(self):
while self.running: