From 7143b475a1676350e1b7e66df0871579f1f43087 Mon Sep 17 00:00:00 2001 From: Jack Robison <jackrobison@lbry.io> Date: Sun, 29 Nov 2020 13:41:24 -0500 Subject: [PATCH] RocksReaderContext --- lbry/wallet/server/leveldb.py | 119 +++++++++++++++++++++++++++++++++- 1 file changed, 117 insertions(+), 2 deletions(-) diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py index f06517edb..5fc1885f1 100644 --- a/lbry/wallet/server/leveldb.py +++ b/lbry/wallet/server/leveldb.py @@ -23,16 +23,20 @@ from bisect import bisect_right from collections import namedtuple from glob import glob from struct import pack, unpack +from contextvars import ContextVar +from dataclasses import dataclass +from concurrent.futures.process import ProcessPoolExecutor from concurrent.futures.thread import ThreadPoolExecutor import attr from lbry.wallet.server import util from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN from lbry.wallet.server.merkle import Merkle, MerkleCache -from lbry.wallet.server.util import formatted_time +from lbry.wallet.server.util import formatted_time, unpack_be_uint64, unpack_le_int32_from from lbry.wallet.server.storage import db_class from lbry.wallet.server.history import History - +if typing.TYPE_CHECKING: + from lbry.wallet.server.storage import RocksDB UTXO = namedtuple("UTXO", "tx_num tx_pos tx_hash height value") HEADER_PREFIX = b'H' @@ -41,6 +45,117 @@ TX_HASH_PREFIX = b'X' TX_PREFIX = b'B' TX_NUM_PREFIX = b'N' BLOCK_HASH_PREFIX = b'C' +HISTORY_PREFIX = b'A' +HASHX_UTXO_PREFIX = b'h' +UTXO_PREFIX = b'u' +HASHX_HISTORY_PREFIX = b'x' +STATE_PREFIX = b'state' + + +class RocksDBState(typing.NamedTuple): + db_version: int + genesis_hash: str + height: int + tx_count: int + tip: bytes + utxo_flush_count: int + wall_time: int + first_sync: bool + + flush_count: int + comp_flush_count: int + comp_cursor: int + + +@dataclass +class RocksReaderContext: + db: 'RocksDB' + name: str + merkle: Merkle + tx_counts: List[int] + state: RocksDBState + block_txs_cache: pylru.lrucache + merkle_tx_cache: pylru.lrucache + + def close(self): + self.db.close() + + def reopen(self): + self.db.close() + self.db.open(self.name, create=False, read_only=True) + + def update_state(self): + self.tx_counts = [ + unpack_be_uint64(tx_count) + for tx_count in self.db.iterator(prefix=TX_COUNT_PREFIX, include_key=False) + ] + + flush_count, = unpack_le_int32_from(self.db.get(STATE_PREFIX + b'flush_count', b'\xff\xff\xff\xff')) + comp_flush_count, = unpack_le_int32_from( + self.db.get(STATE_PREFIX + b'comp_flush_count', b'\xff\xff\xff\xff') + ) + comp_cursor, = unpack_le_int32_from(self.db.get(STATE_PREFIX + b'comp_cursor', b'\xff\xff\xff\xff')) + db_version, = unpack_le_int32_from(self.db.get(STATE_PREFIX + b'db_version', b'\xff\xff\xff\xff')) + genesis = self.db.get(STATE_PREFIX + b'genesis') + tip = self.db.get(STATE_PREFIX + b'tip') + height, = unpack_le_int32_from(self.db.get(STATE_PREFIX + b'height', b'\xff\xff\xff\xff')) + tx_count, = unpack_le_int32_from(self.db.get(STATE_PREFIX + b'tx_count', b'\xff\xff\xff\xff')) + utxo_flush_count, = unpack_le_int32_from(self.db.get(STATE_PREFIX + b'utxo_flush_count', b'\xff\xff\xff\xff')) + wall_time, = unpack_le_int32_from(self.db.get(STATE_PREFIX + b'wall_time', b'\xff\xff\xff\xff')) + first_sync, = unpack_le_int32_from(self.db.get(STATE_PREFIX + b'first_sync', b'\xff\xff\xff\xff')) + + self.state = RocksDBState( + db_version, genesis, height, tx_count, tip, utxo_flush_count, wall_time, first_sync == -1, + flush_count, comp_flush_count, comp_cursor + ) + + +proc_ctx: ContextVar[Optional[RocksReaderContext]] = ContextVar('proc_ctx') + + +def _update_rocksdb_ctx(): + ctx = proc_ctx.get() + ctx.update_state() + + +async def update_rocksdb_ctx(executor: ProcessPoolExecutor): + loop = asyncio.get_event_loop() + await asyncio.wait([loop.run_in_executor(executor, _update_rocksdb_ctx) for _ in range(executor._max_workers)]) + + + + +def _initializer(path, name): + db = RocksDB(path, name, for_sync=False, read_only=True) + state = RocksDBState(-1, '', -1, -1, b'', -1, -1, True, -1, -1, -1) + proc_ctx.set(RocksReaderContext(db, name, Merkle(), [], state, pylru.lrucache(50000), pylru.lrucache(100000))) + + +def _teardown(): + proc_ctx.get().close() + proc_ctx.set(None) + + +async def initialize_executor(workers, db_dir='/media/jack/evo970/spv_data', for_sync=False, name='lbryrocks'): + executor = ProcessPoolExecutor(workers, initializer=_initializer, initargs=(db_dir, name)) + try: + writer = RocksDB(db_dir, name, for_sync=for_sync, read_only=False) + await update_rocksdb_ctx(executor) + except Exception as err: + await teardown_executor(executor) + executor.shutdown(True) + raise err + return executor, writer + + +async def teardown_executor(executor: ProcessPoolExecutor): + try: + await asyncio.wait( + [asyncio.get_event_loop().run_in_executor(executor, _teardown) for _ in range(executor._max_workers)] + ) + finally: + executor.shutdown(True) +