From 1992b83faf500f6ae0696c3177701ccaf4503fd7 Mon Sep 17 00:00:00 2001
From: Jack Robison
Date: Mon, 8 Nov 2021 14:18:22 -0500
Subject: [PATCH] use rocksdb instead of leveldb

-dont use block processor directly from session manager
---
Review notes (free text after the `---` marker; not part of the commit message):
* The line structure of this patch was restored; leading whitespace on
  context lines was re-derived from the code's syntax.
* RocksDBIterator.__init__: `start` is annotated as bytes instead of bool,
  because it is passed to iterator.seek() and compared against key slices.
* setup.py: the platform check uses one startswith() call with a tuple.
* Left unchanged, worth a follow-up: RocksDBStore.iterator() accepts
  `fill_cache` but does not forward it; RocksDBIterator stores
  include_start/include_stop but never reads them (see the TODO in
  __next__); RocksDBStore.__init__ accepts `cache_mb` but does not use it.

 docker/Dockerfile.wallet_server   |   1 -
 lbry/wallet/server/db/db.py       | 123 +++++++++++++++++++++++++++---
 lbry/wallet/server/db/prefixes.py |  38 +--------
 lbry/wallet/server/server.py      |   5 +-
 lbry/wallet/server/session.py     |  45 ++++++-----
 setup.py                          |  10 ++-
 6 files changed, 151 insertions(+), 71 deletions(-)

diff --git a/docker/Dockerfile.wallet_server b/docker/Dockerfile.wallet_server
index 185a184a8..b928844c1 100644
--- a/docker/Dockerfile.wallet_server
+++ b/docker/Dockerfile.wallet_server
@@ -15,7 +15,6 @@ RUN apt-get update && \
       build-essential \
       automake libtool \
       pkg-config \
-      libleveldb-dev \
       python3.7 \
       python3-dev \
       python3-pip \
diff --git a/lbry/wallet/server/db/db.py b/lbry/wallet/server/db/db.py
index 6d613df93..1545ee05f 100644
--- a/lbry/wallet/server/db/db.py
+++ b/lbry/wallet/server/db/db.py
@@ -1,32 +1,130 @@
 import struct
+import rocksdb
 from typing import Optional
 from lbry.wallet.server.db import DB_PREFIXES
 from lbry.wallet.server.db.revertable import RevertableOpStack, RevertablePut, RevertableDelete
 
 
-class KeyValueStorage:
+class RocksDBStore:
+    def __init__(self, path: str, cache_mb: int, max_open_files: int, secondary_path: str = ''):
+        # Use snappy compression (the default)
+        self.path = path
+        self._max_open_files = max_open_files
+        self.db = rocksdb.DB(path, self.get_options(), secondary_name=secondary_path)
+        # self.multi_get = self.db.multi_get
+
+    def get_options(self):
+        return rocksdb.Options(
+            create_if_missing=True, use_fsync=True, target_file_size_base=33554432,
+            max_open_files=self._max_open_files
+        )
+
     def get(self, key: bytes, fill_cache: bool = True) -> Optional[bytes]:
-        raise NotImplemented()
+        return self.db.get(key, fill_cache=fill_cache)
 
     def iterator(self, reverse=False, start=None, stop=None, include_start=True, include_stop=False, prefix=None,
                  include_key=True, include_value=True, fill_cache=True):
-        raise NotImplemented()
+        return RocksDBIterator(
+            self.db, reverse=reverse, start=start, stop=stop, include_start=include_start, include_stop=include_stop,
+            prefix=prefix, include_key=include_key, include_value=include_value
+        )
 
-    def write_batch(self, transaction: bool = False):
-        raise NotImplemented()
+    def write_batch(self, disable_wal: bool = False, sync: bool = False):
+        return RocksDBWriteBatch(self.db, sync=sync, disable_wal=disable_wal)
 
     def close(self):
-        raise NotImplemented()
+        self.db.close()
+        self.db = None
 
     @property
     def closed(self) -> bool:
-        raise NotImplemented()
+        return self.db is None
+
+    def try_catch_up_with_primary(self):
+        self.db.try_catch_up_with_primary()
+
+
+class RocksDBWriteBatch:
+    def __init__(self, db: rocksdb.DB, sync: bool = False, disable_wal: bool = False):
+        self.batch = rocksdb.WriteBatch()
+        self.db = db
+        self.sync = sync
+        self.disable_wal = disable_wal
+
+    def __enter__(self):
+        return self.batch
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        if not exc_val:
+            self.db.write(self.batch, sync=self.sync, disable_wal=self.disable_wal)
+
+
+class RocksDBIterator:
+    """An iterator for RocksDB."""
+
+    __slots__ = [
+        'start',
+        'prefix',
+        'stop',
+        'iterator',
+        'include_key',
+        'include_value',
+        'prev_k',
+        'reverse',
+        'include_start',
+        'include_stop'
+    ]
+
+    def __init__(self, db: rocksdb.DB, prefix: bytes = None, start: bytes = None, stop: bytes = None,
+                 include_key: bool = True, include_value: bool = True, reverse: bool = False,
+                 include_start: bool = True, include_stop: bool = False):
+        assert (start is None and stop is None) or (prefix is None), 'cannot use start/stop and prefix'
+        self.start = start
+        self.prefix = prefix
+        self.stop = stop
+        self.iterator = db.iteritems() if not reverse else reversed(db.iteritems())
+        if prefix is not None:
+            self.iterator.seek(prefix)
+        elif start is not None:
+            self.iterator.seek(start)
+        self.include_key = include_key
+        self.include_value = include_value
+        self.prev_k = None
+        self.reverse = reverse
+        self.include_start = include_start
+        self.include_stop = include_stop
+
+    def __iter__(self):
+        return self
+
+    def _check_stop_iteration(self, key: bytes):
+        if self.stop is not None and (key.startswith(self.stop) or self.stop < key[:len(self.stop)]):
+            raise StopIteration
+        elif self.start is not None and self.start > key[:len(self.start)]:
+            raise StopIteration
+        elif self.prefix is not None and not key.startswith(self.prefix):
+            raise StopIteration
+
+    def __next__(self):
+        # TODO: include start/stop on/off
+        # check for needing to stop from previous iteration
+        if self.prev_k is not None:
+            self._check_stop_iteration(self.prev_k)
+        k, v = next(self.iterator)
+        self._check_stop_iteration(k)
+        self.prev_k = k
+
+        if self.include_key and self.include_value:
+            return k, v
+        elif self.include_key:
+            return k
+        return v
 
 
 class PrefixDB:
     UNDO_KEY_STRUCT = struct.Struct(b'>Q')
 
-    def __init__(self, db: KeyValueStorage, max_undo_depth: int = 200, unsafe_prefixes=None):
+    def __init__(self, db: RocksDBStore, max_undo_depth: int = 200, unsafe_prefixes=None):
         self._db = db
         self._op_stack = RevertableOpStack(db.get, unsafe_prefixes=unsafe_prefixes)
         self._max_undo_depth = max_undo_depth
@@ -37,7 +135,7 @@ class PrefixDB:
         Changes written cannot be undone
         """
         try:
-            with self._db.write_batch(transaction=True) as batch:
+            with self._db.write_batch(sync=True) as batch:
                 batch_put = batch.put
                 batch_delete = batch.delete
                 for staged_change in self._op_stack:
@@ -61,7 +159,7 @@ class PrefixDB:
                 include_value=False
             ))
         try:
-            with self._db.write_batch(transaction=True) as batch:
+            with self._db.write_batch(sync=True) as batch:
                 batch_put = batch.put
                 batch_delete = batch.delete
                 for staged_change in self._op_stack:
@@ -82,7 +180,7 @@ class PrefixDB:
         undo_key = DB_PREFIXES.undo.value + self.UNDO_KEY_STRUCT.pack(height)
         self._op_stack.apply_packed_undo_ops(self._db.get(undo_key))
         try:
-            with self._db.write_batch(transaction=True) as batch:
+            with self._db.write_batch(sync=True) as batch:
                 batch_put = batch.put
                 batch_delete = batch.delete
                 for staged_change in self._op_stack:
@@ -108,6 +206,9 @@ class PrefixDB:
         if not self._db.closed:
             self._db.close()
 
+    def try_catch_up_with_primary(self):
+        self._db.try_catch_up_with_primary()
+
     @property
     def closed(self):
         return self._db.closed
diff --git a/lbry/wallet/server/db/prefixes.py b/lbry/wallet/server/db/prefixes.py
index 4dbfe707e..e51313097 100644
--- a/lbry/wallet/server/db/prefixes.py
+++ b/lbry/wallet/server/db/prefixes.py
@@ -4,7 +4,7 @@ import array
 import base64
 from typing import Union, Tuple, NamedTuple, Optional
 from lbry.wallet.server.db import DB_PREFIXES
-from lbry.wallet.server.db.db import KeyValueStorage, PrefixDB
+from lbry.wallet.server.db.db import RocksDBStore, PrefixDB
 from lbry.wallet.server.db.revertable import RevertableOpStack, RevertablePut, RevertableDelete
 from lbry.schema.url import normalize_name
 
@@ -38,7 +38,7 @@ class PrefixRow(metaclass=PrefixRowType):
     value_struct: struct.Struct
     key_part_lambdas = []
 
-    def __init__(self, db: KeyValueStorage, op_stack: RevertableOpStack):
+    def __init__(self, db: RocksDBStore, op_stack: RevertableOpStack):
         self._db = db
         self._op_stack = op_stack
 
@@ -1595,40 +1595,10 @@ class BlockTxsPrefixRow(PrefixRow):
         return cls.pack_key(height), cls.pack_value(tx_hashes)
 
 
-class LevelDBStore(KeyValueStorage):
-    def __init__(self, path: str, cache_mb: int, max_open_files: int):
-        import plyvel
-        self.db = plyvel.DB(
-            path, create_if_missing=True, max_open_files=max_open_files,
-            lru_cache_size=cache_mb * 1024 * 1024, write_buffer_size=64 * 1024 * 1024,
-            max_file_size=1024 * 1024 * 64, bloom_filter_bits=32
-        )
-
-    def get(self, key: bytes, fill_cache: bool = True) -> Optional[bytes]:
-        return self.db.get(key, fill_cache=fill_cache)
-
-    def iterator(self, reverse=False, start=None, stop=None, include_start=True, include_stop=False, prefix=None,
-                 include_key=True, include_value=True, fill_cache=True):
-        return self.db.iterator(
-            reverse=reverse, start=start, stop=stop, include_start=include_start, include_stop=include_stop,
-            prefix=prefix, include_key=include_key, include_value=include_value, fill_cache=fill_cache
-        )
-
-    def write_batch(self, transaction: bool = False, sync: bool = False):
-        return self.db.write_batch(transaction=transaction, sync=sync)
-
-    def close(self):
-        return self.db.close()
-
-    @property
-    def closed(self) -> bool:
-        return self.db.closed
-
-
 class HubDB(PrefixDB):
     def __init__(self, path: str, cache_mb: int = 128, reorg_limit: int = 200, max_open_files: int = 512,
-                 unsafe_prefixes: Optional[typing.Set[bytes]] = None):
-        db = LevelDBStore(path, cache_mb, max_open_files)
+                 secondary_path: str = '', unsafe_prefixes: Optional[typing.Set[bytes]] = None):
+        db = RocksDBStore(path, cache_mb, max_open_files, secondary_path=secondary_path)
         super().__init__(db, reorg_limit, unsafe_prefixes=unsafe_prefixes)
         self.claim_to_support = ClaimToSupportPrefixRow(db, self._op_stack)
         self.support_to_claim = SupportToClaimPrefixRow(db, self._op_stack)
diff --git a/lbry/wallet/server/server.py b/lbry/wallet/server/server.py
index 966d5c31e..e53297541 100644
--- a/lbry/wallet/server/server.py
+++ b/lbry/wallet/server/server.py
@@ -26,7 +26,10 @@ class Server:
         self.prometheus_server: typing.Optional[PrometheusServer] = None
 
         self.session_mgr = LBRYSessionManager(
-            env, db, bp, daemon, self.shutdown_event
+            env, db, bp.mempool, bp.history_cache, bp.resolve_cache, bp.resolve_outputs_cache, daemon,
+            self.shutdown_event,
+            on_available_callback=bp.status_server.set_available,
+            on_unavailable_callback=bp.status_server.set_unavailable
         )
         self._indexer_task = None
 
diff --git a/lbry/wallet/server/session.py b/lbry/wallet/server/session.py
index 6218b3992..4ff192dc5 100644
--- a/lbry/wallet/server/session.py
+++ b/lbry/wallet/server/session.py
@@ -170,13 +170,16 @@ class SessionManager:
         namespace=NAMESPACE, buckets=HISTOGRAM_BUCKETS
     )
 
-    def __init__(self, env: 'Env', db: LevelDB, bp: BlockProcessor, daemon: 'Daemon', shutdown_event: asyncio.Event):
+    def __init__(self, env: 'Env', db: LevelDB, mempool, history_cache, resolve_cache, resolve_outputs_cache,
+                 daemon: 'Daemon', shutdown_event: asyncio.Event,
+                 on_available_callback: typing.Callable[[], None], on_unavailable_callback: typing.Callable[[], None]):
         env.max_send = max(350000, env.max_send)
         self.env = env
         self.db = db
-        self.bp = bp
+        self.on_available_callback = on_available_callback
+        self.on_unavailable_callback = on_unavailable_callback
         self.daemon = daemon
-        self.mempool = bp.mempool
+        self.mempool = mempool
         self.shutdown_event = shutdown_event
         self.logger = util.class_logger(__name__, self.__class__.__name__)
         self.servers: typing.Dict[str, asyncio.AbstractServer] = {}
@@ -186,7 +189,9 @@ class SessionManager:
         self.cur_group = SessionGroup(0)
         self.txs_sent = 0
         self.start_time = time.time()
-        self.history_cache = self.bp.history_cache
+        self.history_cache = history_cache
+        self.resolve_cache = resolve_cache
+        self.resolve_outputs_cache = resolve_outputs_cache
         self.notified_height: typing.Optional[int] = None
         # Cache some idea of room to avoid recounting on each subscription
         self.subs_room = 0
@@ -243,7 +248,7 @@ class SessionManager:
             await self.session_event.wait()
             self.session_event.clear()
             if not paused and len(self.sessions) >= max_sessions:
-                self.bp.status_server.set_unavailable()
+                self.on_unavailable_callback()
                 self.logger.info(f'maximum sessions {max_sessions:,d} '
                                  f'reached, stopping new connections until '
                                  f'count drops to {low_watermark:,d}')
@@ -252,7 +257,7 @@ class SessionManager:
             # Start listening for incoming connections if paused and
             # session count has fallen
             if paused and len(self.sessions) <= low_watermark:
-                self.bp.status_server.set_available()
+                self.on_available_callback()
                 self.logger.info('resuming listening for incoming connections')
                 await self._start_external_servers()
                 paused = False
@@ -533,7 +538,7 @@ class SessionManager:
             await self.start_other()
             await self._start_external_servers()
             server_listening_event.set()
-            self.bp.status_server.set_available()
+            self.on_available_callback()
             # Peer discovery should start after the external servers
             # because we connect to ourself
             await asyncio.wait([
@@ -628,8 +633,9 @@ class SessionManager:
             for hashX in touched.intersection(self.mempool_statuses.keys()):
                 self.mempool_statuses.pop(hashX, None)
 
+        # self.bp._chain_executor
         await asyncio.get_event_loop().run_in_executor(
-            self.bp._chain_executor, touched.intersection_update, self.hashx_subscriptions_by_session.keys()
+            None, touched.intersection_update, self.hashx_subscriptions_by_session.keys()
         )
 
         if touched or new_touched or (height_changed and self.mempool_statuses):
@@ -866,8 +872,7 @@ class LBRYElectrumX(SessionBase):
         self.protocol_tuple = self.PROTOCOL_MIN
         self.protocol_string = None
         self.daemon = self.session_mgr.daemon
-        self.bp: BlockProcessor = self.session_mgr.bp
-        self.db: LevelDB = self.bp.db
+        self.db: LevelDB = self.session_mgr.db
 
     @classmethod
     def protocol_min_max_strings(cls):
@@ -1008,21 +1013,21 @@ class LBRYElectrumX(SessionBase):
             self.session_mgr.executor_time_metric.observe(time.perf_counter() - start)
 
     async def _cached_resolve_url(self, url):
-        if url not in self.bp.resolve_cache:
-            self.bp.resolve_cache[url] = await self.loop.run_in_executor(None, self.db._resolve, url)
-        return self.bp.resolve_cache[url]
+        if url not in self.session_mgr.resolve_cache:
+            self.session_mgr.resolve_cache[url] = await self.loop.run_in_executor(None, self.db._resolve, url)
+        return self.session_mgr.resolve_cache[url]
 
     async def claimtrie_resolve(self, *urls) -> str:
         sorted_urls = tuple(sorted(urls))
         self.session_mgr.urls_to_resolve_count_metric.inc(len(sorted_urls))
         try:
-            if sorted_urls in self.bp.resolve_outputs_cache:
-                return self.bp.resolve_outputs_cache[sorted_urls]
+            if sorted_urls in self.session_mgr.resolve_outputs_cache:
+                return self.session_mgr.resolve_outputs_cache[sorted_urls]
             rows, extra = [], []
             for url in urls:
-                if url not in self.bp.resolve_cache:
-                    self.bp.resolve_cache[url] = await self._cached_resolve_url(url)
-                stream, channel, repost, reposted_channel = self.bp.resolve_cache[url]
+                if url not in self.session_mgr.resolve_cache:
+                    self.session_mgr.resolve_cache[url] = await self._cached_resolve_url(url)
+                stream, channel, repost, reposted_channel = self.session_mgr.resolve_cache[url]
                 if isinstance(channel, ResolveCensoredError):
                     rows.append(channel)
                     extra.append(channel.censor_row)
@@ -1047,7 +1052,7 @@ class LBRYElectrumX(SessionBase):
                     if reposted_channel:
                         extra.append(reposted_channel)
                 await asyncio.sleep(0)
-            self.bp.resolve_outputs_cache[sorted_urls] = result = await self.loop.run_in_executor(
+            self.session_mgr.resolve_outputs_cache[sorted_urls] = result = await self.loop.run_in_executor(
                 None, Outputs.to_base64, rows, extra, 0, None, None
             )
             return result
@@ -1055,7 +1060,7 @@ class LBRYElectrumX(SessionBase):
             self.session_mgr.resolved_url_count_metric.inc(len(sorted_urls))
 
     async def get_server_height(self):
-        return self.bp.height
+        return self.db.db_height
 
     async def transaction_get_height(self, tx_hash):
         self.assert_tx_hash(tx_hash)
diff --git a/setup.py b/setup.py
index 56832e8eb..91e661e9a 100644
--- a/setup.py
+++ b/setup.py
@@ -7,9 +7,11 @@ BASE = os.path.dirname(__file__)
 with open(os.path.join(BASE, 'README.md'), encoding='utf-8') as fh:
     long_description = fh.read()
 
-PLYVEL = []
-if sys.platform.startswith('linux'):
-    PLYVEL.append('plyvel==1.3.0')
+
+ROCKSDB = []
+if sys.platform.startswith(('linux', 'darwin')):
+    ROCKSDB.append('lbry-rocksdb==0.8.1')
+
 
 setup(
     name=__name__,
@@ -57,7 +59,7 @@ setup(
         'pylru==1.1.0',
         'elasticsearch==7.10.1',
         'grpcio==1.38.0'
-    ] + PLYVEL,
+    ] + ROCKSDB,
     extras_require={
         'torrent': ['lbry-libtorrent'],
         'lint': ['pylint==2.10.0'],