diff --git a/lbry/wallet/server/db/__init__.py b/lbry/wallet/server/db/__init__.py index f286840be..8daf91d1f 100644 --- a/lbry/wallet/server/db/__init__.py +++ b/lbry/wallet/server/db/__init__.py @@ -1,4 +1,5 @@ import enum +from typing import NamedTuple, Optional @enum.unique @@ -43,3 +44,6 @@ class DB_PREFIXES(enum.Enum): trending_notifications = b'c' mempool_tx = b'd' touched_hashX = b'e' + + +COLUMN_SETTINGS = {} # this is updated by the PrefixRow metaclass diff --git a/lbry/wallet/server/db/db.py b/lbry/wallet/server/db/db.py index abe8b6388..b6ed270f5 100644 --- a/lbry/wallet/server/db/db.py +++ b/lbry/wallet/server/db/db.py @@ -44,7 +44,7 @@ class HubDB: def __init__(self, coin, db_dir: str, cache_MB: int = 512, reorg_limit: int = 200, cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False, - secondary_name: str = '', max_open_files: int = 256, blocking_channel_ids: List[str] = None, + secondary_name: str = '', max_open_files: int = 64, blocking_channel_ids: List[str] = None, filtering_channel_ids: List[str] = None, executor: ThreadPoolExecutor = None): self.logger = util.class_logger(__name__, self.__class__.__name__) self.coin = coin diff --git a/lbry/wallet/server/db/interface.py b/lbry/wallet/server/db/interface.py index b926af4e8..53fec1639 100644 --- a/lbry/wallet/server/db/interface.py +++ b/lbry/wallet/server/db/interface.py @@ -3,7 +3,7 @@ import typing import rocksdb from typing import Optional -from lbry.wallet.server.db import DB_PREFIXES +from lbry.wallet.server.db import DB_PREFIXES, COLUMN_SETTINGS from lbry.wallet.server.db.revertable import RevertableOpStack, RevertablePut, RevertableDelete @@ -15,20 +15,25 @@ class BasePrefixDB: PARTIAL_UNDO_KEY_STRUCT = struct.Struct(b'>Q') def __init__(self, path, max_open_files=64, secondary_path='', max_undo_depth: int = 200, unsafe_prefixes=None): - column_family_options = { - prefix.value: rocksdb.ColumnFamilyOptions() for prefix in DB_PREFIXES - } + column_family_options = {} + for prefix in DB_PREFIXES: + settings = COLUMN_SETTINGS[prefix.value] + column_family_options[prefix.value] = rocksdb.ColumnFamilyOptions() + column_family_options[prefix.value].table_factory = rocksdb.BlockBasedTableFactory( + block_cache=rocksdb.LRUCache(settings['cache_size']), + ) self.column_families: typing.Dict[bytes, 'rocksdb.ColumnFamilyHandle'] = {} + options = rocksdb.Options( + create_if_missing=True, use_fsync=False, target_file_size_base=33554432, + max_open_files=max_open_files if not secondary_path else -1, create_missing_column_families=True + ) self._db = rocksdb.DB( - path, rocksdb.Options( - create_if_missing=True, use_fsync=False, target_file_size_base=33554432, - max_open_files=max_open_files if not secondary_path else -1, create_missing_column_families=True - ), secondary_name=secondary_path, column_families=column_family_options + path, options, secondary_name=secondary_path, column_families=column_family_options ) for prefix in DB_PREFIXES: cf = self._db.get_column_family(prefix.value) if cf is None and not secondary_path: - self._db.create_column_family(prefix.value, rocksdb.ColumnFamilyOptions()) + self._db.create_column_family(prefix.value, column_family_options[prefix.value]) cf = self._db.get_column_family(prefix.value) self.column_families[prefix.value] = cf @@ -116,7 +121,7 @@ class BasePrefixDB: def iterator(self, start: bytes, column_family: 'rocksdb.ColumnFamilyHandle' = None, iterate_lower_bound: bytes = None, iterate_upper_bound: bytes = None, reverse: bool = False, include_key: bool = True, include_value: bool = True, - fill_cache: bool = True, prefix_same_as_start: bool = False, auto_prefix_mode: bool = False): + fill_cache: bool = True, prefix_same_as_start: bool = False, auto_prefix_mode: bool = True): return self._db.iterator( start=start, column_family=column_family, iterate_lower_bound=iterate_lower_bound, iterate_upper_bound=iterate_upper_bound, reverse=reverse, include_key=include_key, diff --git a/lbry/wallet/server/db/prefixes.py b/lbry/wallet/server/db/prefixes.py index b88d31551..3b091fa77 100644 --- a/lbry/wallet/server/db/prefixes.py +++ b/lbry/wallet/server/db/prefixes.py @@ -2,14 +2,13 @@ import typing import struct import array import base64 +import rocksdb +import rocksdb.interfaces from typing import Union, Tuple, NamedTuple, Optional -from lbry.wallet.server.db import DB_PREFIXES +from lbry.wallet.server.db import DB_PREFIXES, COLUMN_SETTINGS from lbry.wallet.server.db.interface import BasePrefixDB -from lbry.wallet.server.db.common import TrendingNotification from lbry.wallet.server.db.revertable import RevertableOpStack, RevertablePut, RevertableDelete from lbry.schema.url import normalize_name -if typing.TYPE_CHECKING: - import rocksdb ACTIVATED_CLAIM_TXO_TYPE = 1 ACTIVATED_SUPPORT_TXO_TYPE = 2 @@ -32,6 +31,10 @@ class PrefixRowType(type): klass = super().__new__(cls, name, bases, kwargs) if name != "PrefixRow": ROW_TYPES[klass.prefix] = klass + cache_size = klass.cache_size + COLUMN_SETTINGS[klass.prefix] = { + 'cache_size': cache_size, + } return klass @@ -40,6 +43,7 @@ class PrefixRow(metaclass=PrefixRowType): key_struct: struct.Struct value_struct: struct.Struct key_part_lambdas = [] + cache_size: int = 1024 * 1024 * 64 def __init__(self, db: 'rocksdb.DB', op_stack: RevertableOpStack): self._db = db @@ -568,6 +572,7 @@ class ActiveAmountPrefixRow(PrefixRow): struct.Struct(b'>20sBLL').pack, struct.Struct(b'>20sBLLH').pack ] + cache_size = 1024 * 1024 * 128 @classmethod def pack_key(cls, claim_hash: bytes, txo_type: int, activation_height: int, tx_num: int, position: int): @@ -598,6 +603,7 @@ class ClaimToTXOPrefixRow(PrefixRow): lambda: b'', struct.Struct(b'>20s').pack ] + cache_size = 1024 * 1024 * 128 @classmethod def pack_key(cls, claim_hash: bytes): @@ -637,6 +643,7 @@ class TXOToClaimPrefixRow(PrefixRow): prefix = DB_PREFIXES.txo_to_claim.value key_struct = struct.Struct(b'>LH') value_struct = struct.Struct(b'>20s') + cache_size = 1024 * 1024 * 128 @classmethod def pack_key(cls, tx_num: int, position: int): @@ -1031,6 +1038,7 @@ class EffectiveAmountPrefixRow(PrefixRow): shortid_key_helper(b'>QL'), shortid_key_helper(b'>QLH'), ] + cache_size = 1024 * 1024 * 128 @classmethod def pack_key(cls, name: str, effective_amount: int, tx_num: int, position: int): @@ -1061,6 +1069,7 @@ class EffectiveAmountPrefixRow(PrefixRow): class RepostPrefixRow(PrefixRow): prefix = DB_PREFIXES.repost.value + key_struct = struct.Struct(b'>20s') key_part_lambdas = [ lambda: b'', @@ -1069,13 +1078,11 @@ class RepostPrefixRow(PrefixRow): @classmethod def pack_key(cls, claim_hash: bytes): - return cls.prefix + claim_hash + return super().pack_key(claim_hash) @classmethod def unpack_key(cls, key: bytes) -> RepostKey: - assert key[:1] == cls.prefix - assert len(key) == 21 - return RepostKey(key[1:]) + return RepostKey(*super().unpack_key(key)) @classmethod def pack_value(cls, reposted_claim_hash: bytes) -> bytes: @@ -1746,7 +1753,7 @@ class TouchedHashXPrefixRow(PrefixRow): class PrefixDB(BasePrefixDB): - def __init__(self, path: str, cache_mb: int = 128, reorg_limit: int = 200, max_open_files: int = 512, + def __init__(self, path: str, cache_mb: int = 128, reorg_limit: int = 200, max_open_files: int = 64, secondary_path: str = '', unsafe_prefixes: Optional[typing.Set[bytes]] = None): super().__init__(path, max_open_files=max_open_files, secondary_path=secondary_path, max_undo_depth=reorg_limit, unsafe_prefixes=unsafe_prefixes) diff --git a/lbry/wallet/server/env.py b/lbry/wallet/server/env.py index 74f88b938..00f5a8989 100644 --- a/lbry/wallet/server/env.py +++ b/lbry/wallet/server/env.py @@ -36,7 +36,7 @@ class Env: allow_lan_udp=None, cache_all_tx_hashes=None, cache_all_claim_txos=None, country=None, payment_address=None, donation_address=None, max_send=None, max_receive=None, max_sessions=None, session_timeout=None, drop_client=None, description=None, daily_fee=None, - database_query_timeout=None, db_max_open_files=512, elastic_notifier_port=None, + database_query_timeout=None, db_max_open_files=64, elastic_notifier_port=None, blocking_channel_ids=None, filtering_channel_ids=None, peer_hubs=None, peer_announce=None): self.logger = class_logger(__name__, self.__class__.__name__) @@ -303,8 +303,8 @@ class Env: parser.add_argument('--daemon_url', help='URL for rpc from lbrycrd, :@', default=cls.default('DAEMON_URL', None)) - parser.add_argument('--db_max_open_files', type=int, default=512, - help='number of files leveldb can have open at a time') + parser.add_argument('--db_max_open_files', type=int, default=64, + help='number of files rocksdb can have open at a time') parser.add_argument('--host', type=str, default=cls.default('HOST', 'localhost'), help='Interface for hub server to listen on') parser.add_argument('--tcp_port', type=int, default=cls.integer('TCP_PORT', 50001),