add cache_size attribute to prefix classes to set the rocksdb lru cache size

-updates rocksdb column families to use custom sized `block_cache` (an lru cache) in a `BlockBasedTableFactory`
-lowers the default max open files to 64
This commit is contained in:
Jack Robison 2022-02-11 15:56:28 -05:00
parent e6c275f86e
commit 0d9d576436
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
5 changed files with 39 additions and 23 deletions

View file

@ -1,4 +1,5 @@
import enum import enum
from typing import NamedTuple, Optional
@enum.unique @enum.unique
@ -43,3 +44,6 @@ class DB_PREFIXES(enum.Enum):
trending_notifications = b'c' trending_notifications = b'c'
mempool_tx = b'd' mempool_tx = b'd'
touched_hashX = b'e' touched_hashX = b'e'
COLUMN_SETTINGS = {} # this is updated by the PrefixRow metaclass

View file

@ -44,7 +44,7 @@ class HubDB:
def __init__(self, coin, db_dir: str, cache_MB: int = 512, reorg_limit: int = 200, def __init__(self, coin, db_dir: str, cache_MB: int = 512, reorg_limit: int = 200,
cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False, cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False,
secondary_name: str = '', max_open_files: int = 256, blocking_channel_ids: List[str] = None, secondary_name: str = '', max_open_files: int = 64, blocking_channel_ids: List[str] = None,
filtering_channel_ids: List[str] = None, executor: ThreadPoolExecutor = None): filtering_channel_ids: List[str] = None, executor: ThreadPoolExecutor = None):
self.logger = util.class_logger(__name__, self.__class__.__name__) self.logger = util.class_logger(__name__, self.__class__.__name__)
self.coin = coin self.coin = coin

View file

@ -3,7 +3,7 @@ import typing
import rocksdb import rocksdb
from typing import Optional from typing import Optional
from lbry.wallet.server.db import DB_PREFIXES from lbry.wallet.server.db import DB_PREFIXES, COLUMN_SETTINGS
from lbry.wallet.server.db.revertable import RevertableOpStack, RevertablePut, RevertableDelete from lbry.wallet.server.db.revertable import RevertableOpStack, RevertablePut, RevertableDelete
@ -15,20 +15,25 @@ class BasePrefixDB:
PARTIAL_UNDO_KEY_STRUCT = struct.Struct(b'>Q') PARTIAL_UNDO_KEY_STRUCT = struct.Struct(b'>Q')
def __init__(self, path, max_open_files=64, secondary_path='', max_undo_depth: int = 200, unsafe_prefixes=None): def __init__(self, path, max_open_files=64, secondary_path='', max_undo_depth: int = 200, unsafe_prefixes=None):
column_family_options = { column_family_options = {}
prefix.value: rocksdb.ColumnFamilyOptions() for prefix in DB_PREFIXES for prefix in DB_PREFIXES:
} settings = COLUMN_SETTINGS[prefix.value]
column_family_options[prefix.value] = rocksdb.ColumnFamilyOptions()
column_family_options[prefix.value].table_factory = rocksdb.BlockBasedTableFactory(
block_cache=rocksdb.LRUCache(settings['cache_size']),
)
self.column_families: typing.Dict[bytes, 'rocksdb.ColumnFamilyHandle'] = {} self.column_families: typing.Dict[bytes, 'rocksdb.ColumnFamilyHandle'] = {}
options = rocksdb.Options(
create_if_missing=True, use_fsync=False, target_file_size_base=33554432,
max_open_files=max_open_files if not secondary_path else -1, create_missing_column_families=True
)
self._db = rocksdb.DB( self._db = rocksdb.DB(
path, rocksdb.Options( path, options, secondary_name=secondary_path, column_families=column_family_options
create_if_missing=True, use_fsync=False, target_file_size_base=33554432,
max_open_files=max_open_files if not secondary_path else -1, create_missing_column_families=True
), secondary_name=secondary_path, column_families=column_family_options
) )
for prefix in DB_PREFIXES: for prefix in DB_PREFIXES:
cf = self._db.get_column_family(prefix.value) cf = self._db.get_column_family(prefix.value)
if cf is None and not secondary_path: if cf is None and not secondary_path:
self._db.create_column_family(prefix.value, rocksdb.ColumnFamilyOptions()) self._db.create_column_family(prefix.value, column_family_options[prefix.value])
cf = self._db.get_column_family(prefix.value) cf = self._db.get_column_family(prefix.value)
self.column_families[prefix.value] = cf self.column_families[prefix.value] = cf
@ -116,7 +121,7 @@ class BasePrefixDB:
def iterator(self, start: bytes, column_family: 'rocksdb.ColumnFamilyHandle' = None, def iterator(self, start: bytes, column_family: 'rocksdb.ColumnFamilyHandle' = None,
iterate_lower_bound: bytes = None, iterate_upper_bound: bytes = None, iterate_lower_bound: bytes = None, iterate_upper_bound: bytes = None,
reverse: bool = False, include_key: bool = True, include_value: bool = True, reverse: bool = False, include_key: bool = True, include_value: bool = True,
fill_cache: bool = True, prefix_same_as_start: bool = False, auto_prefix_mode: bool = False): fill_cache: bool = True, prefix_same_as_start: bool = False, auto_prefix_mode: bool = True):
return self._db.iterator( return self._db.iterator(
start=start, column_family=column_family, iterate_lower_bound=iterate_lower_bound, start=start, column_family=column_family, iterate_lower_bound=iterate_lower_bound,
iterate_upper_bound=iterate_upper_bound, reverse=reverse, include_key=include_key, iterate_upper_bound=iterate_upper_bound, reverse=reverse, include_key=include_key,

View file

@ -2,14 +2,13 @@ import typing
import struct import struct
import array import array
import base64 import base64
import rocksdb
import rocksdb.interfaces
from typing import Union, Tuple, NamedTuple, Optional from typing import Union, Tuple, NamedTuple, Optional
from lbry.wallet.server.db import DB_PREFIXES from lbry.wallet.server.db import DB_PREFIXES, COLUMN_SETTINGS
from lbry.wallet.server.db.interface import BasePrefixDB from lbry.wallet.server.db.interface import BasePrefixDB
from lbry.wallet.server.db.common import TrendingNotification
from lbry.wallet.server.db.revertable import RevertableOpStack, RevertablePut, RevertableDelete from lbry.wallet.server.db.revertable import RevertableOpStack, RevertablePut, RevertableDelete
from lbry.schema.url import normalize_name from lbry.schema.url import normalize_name
if typing.TYPE_CHECKING:
import rocksdb
ACTIVATED_CLAIM_TXO_TYPE = 1 ACTIVATED_CLAIM_TXO_TYPE = 1
ACTIVATED_SUPPORT_TXO_TYPE = 2 ACTIVATED_SUPPORT_TXO_TYPE = 2
@ -32,6 +31,10 @@ class PrefixRowType(type):
klass = super().__new__(cls, name, bases, kwargs) klass = super().__new__(cls, name, bases, kwargs)
if name != "PrefixRow": if name != "PrefixRow":
ROW_TYPES[klass.prefix] = klass ROW_TYPES[klass.prefix] = klass
cache_size = klass.cache_size
COLUMN_SETTINGS[klass.prefix] = {
'cache_size': cache_size,
}
return klass return klass
@ -40,6 +43,7 @@ class PrefixRow(metaclass=PrefixRowType):
key_struct: struct.Struct key_struct: struct.Struct
value_struct: struct.Struct value_struct: struct.Struct
key_part_lambdas = [] key_part_lambdas = []
cache_size: int = 1024 * 1024 * 64
def __init__(self, db: 'rocksdb.DB', op_stack: RevertableOpStack): def __init__(self, db: 'rocksdb.DB', op_stack: RevertableOpStack):
self._db = db self._db = db
@ -568,6 +572,7 @@ class ActiveAmountPrefixRow(PrefixRow):
struct.Struct(b'>20sBLL').pack, struct.Struct(b'>20sBLL').pack,
struct.Struct(b'>20sBLLH').pack struct.Struct(b'>20sBLLH').pack
] ]
cache_size = 1024 * 1024 * 128
@classmethod @classmethod
def pack_key(cls, claim_hash: bytes, txo_type: int, activation_height: int, tx_num: int, position: int): def pack_key(cls, claim_hash: bytes, txo_type: int, activation_height: int, tx_num: int, position: int):
@ -598,6 +603,7 @@ class ClaimToTXOPrefixRow(PrefixRow):
lambda: b'', lambda: b'',
struct.Struct(b'>20s').pack struct.Struct(b'>20s').pack
] ]
cache_size = 1024 * 1024 * 128
@classmethod @classmethod
def pack_key(cls, claim_hash: bytes): def pack_key(cls, claim_hash: bytes):
@ -637,6 +643,7 @@ class TXOToClaimPrefixRow(PrefixRow):
prefix = DB_PREFIXES.txo_to_claim.value prefix = DB_PREFIXES.txo_to_claim.value
key_struct = struct.Struct(b'>LH') key_struct = struct.Struct(b'>LH')
value_struct = struct.Struct(b'>20s') value_struct = struct.Struct(b'>20s')
cache_size = 1024 * 1024 * 128
@classmethod @classmethod
def pack_key(cls, tx_num: int, position: int): def pack_key(cls, tx_num: int, position: int):
@ -1031,6 +1038,7 @@ class EffectiveAmountPrefixRow(PrefixRow):
shortid_key_helper(b'>QL'), shortid_key_helper(b'>QL'),
shortid_key_helper(b'>QLH'), shortid_key_helper(b'>QLH'),
] ]
cache_size = 1024 * 1024 * 128
@classmethod @classmethod
def pack_key(cls, name: str, effective_amount: int, tx_num: int, position: int): def pack_key(cls, name: str, effective_amount: int, tx_num: int, position: int):
@ -1061,6 +1069,7 @@ class EffectiveAmountPrefixRow(PrefixRow):
class RepostPrefixRow(PrefixRow): class RepostPrefixRow(PrefixRow):
prefix = DB_PREFIXES.repost.value prefix = DB_PREFIXES.repost.value
key_struct = struct.Struct(b'>20s')
key_part_lambdas = [ key_part_lambdas = [
lambda: b'', lambda: b'',
@ -1069,13 +1078,11 @@ class RepostPrefixRow(PrefixRow):
@classmethod @classmethod
def pack_key(cls, claim_hash: bytes): def pack_key(cls, claim_hash: bytes):
return cls.prefix + claim_hash return super().pack_key(claim_hash)
@classmethod @classmethod
def unpack_key(cls, key: bytes) -> RepostKey: def unpack_key(cls, key: bytes) -> RepostKey:
assert key[:1] == cls.prefix return RepostKey(*super().unpack_key(key))
assert len(key) == 21
return RepostKey(key[1:])
@classmethod @classmethod
def pack_value(cls, reposted_claim_hash: bytes) -> bytes: def pack_value(cls, reposted_claim_hash: bytes) -> bytes:
@ -1746,7 +1753,7 @@ class TouchedHashXPrefixRow(PrefixRow):
class PrefixDB(BasePrefixDB): class PrefixDB(BasePrefixDB):
def __init__(self, path: str, cache_mb: int = 128, reorg_limit: int = 200, max_open_files: int = 512, def __init__(self, path: str, cache_mb: int = 128, reorg_limit: int = 200, max_open_files: int = 64,
secondary_path: str = '', unsafe_prefixes: Optional[typing.Set[bytes]] = None): secondary_path: str = '', unsafe_prefixes: Optional[typing.Set[bytes]] = None):
super().__init__(path, max_open_files=max_open_files, secondary_path=secondary_path, super().__init__(path, max_open_files=max_open_files, secondary_path=secondary_path,
max_undo_depth=reorg_limit, unsafe_prefixes=unsafe_prefixes) max_undo_depth=reorg_limit, unsafe_prefixes=unsafe_prefixes)

View file

@ -36,7 +36,7 @@ class Env:
allow_lan_udp=None, cache_all_tx_hashes=None, cache_all_claim_txos=None, country=None, allow_lan_udp=None, cache_all_tx_hashes=None, cache_all_claim_txos=None, country=None,
payment_address=None, donation_address=None, max_send=None, max_receive=None, max_sessions=None, payment_address=None, donation_address=None, max_send=None, max_receive=None, max_sessions=None,
session_timeout=None, drop_client=None, description=None, daily_fee=None, session_timeout=None, drop_client=None, description=None, daily_fee=None,
database_query_timeout=None, db_max_open_files=512, elastic_notifier_port=None, database_query_timeout=None, db_max_open_files=64, elastic_notifier_port=None,
blocking_channel_ids=None, filtering_channel_ids=None, peer_hubs=None, peer_announce=None): blocking_channel_ids=None, filtering_channel_ids=None, peer_hubs=None, peer_announce=None):
self.logger = class_logger(__name__, self.__class__.__name__) self.logger = class_logger(__name__, self.__class__.__name__)
@ -303,8 +303,8 @@ class Env:
parser.add_argument('--daemon_url', parser.add_argument('--daemon_url',
help='URL for rpc from lbrycrd, <rpcuser>:<rpcpassword>@<lbrycrd rpc ip><lbrycrd rpc port>', help='URL for rpc from lbrycrd, <rpcuser>:<rpcpassword>@<lbrycrd rpc ip><lbrycrd rpc port>',
default=cls.default('DAEMON_URL', None)) default=cls.default('DAEMON_URL', None))
parser.add_argument('--db_max_open_files', type=int, default=512, parser.add_argument('--db_max_open_files', type=int, default=64,
help='number of files leveldb can have open at a time') help='number of files rocksdb can have open at a time')
parser.add_argument('--host', type=str, default=cls.default('HOST', 'localhost'), parser.add_argument('--host', type=str, default=cls.default('HOST', 'localhost'),
help='Interface for hub server to listen on') help='Interface for hub server to listen on')
parser.add_argument('--tcp_port', type=int, default=cls.integer('TCP_PORT', 50001), parser.add_argument('--tcp_port', type=int, default=cls.integer('TCP_PORT', 50001),