RocksReaderContext

This commit is contained in:
Jack Robison 2020-11-29 13:41:24 -05:00
parent 976387fefb
commit 7143b475a1
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2

View file

@ -23,16 +23,20 @@ from bisect import bisect_right
from collections import namedtuple
from glob import glob
from struct import pack, unpack
from contextvars import ContextVar
from dataclasses import dataclass
from concurrent.futures.process import ProcessPoolExecutor
from concurrent.futures.thread import ThreadPoolExecutor
import attr
from lbry.wallet.server import util
from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN
from lbry.wallet.server.merkle import Merkle, MerkleCache
from lbry.wallet.server.util import formatted_time
from lbry.wallet.server.util import formatted_time, unpack_be_uint64, unpack_le_int32_from
from lbry.wallet.server.storage import db_class
from lbry.wallet.server.history import History
if typing.TYPE_CHECKING:
from lbry.wallet.server.storage import RocksDB
UTXO = namedtuple("UTXO", "tx_num tx_pos tx_hash height value")
HEADER_PREFIX = b'H'
@ -41,6 +45,117 @@ TX_HASH_PREFIX = b'X'
TX_PREFIX = b'B'
TX_NUM_PREFIX = b'N'
BLOCK_HASH_PREFIX = b'C'
HISTORY_PREFIX = b'A'
HASHX_UTXO_PREFIX = b'h'
UTXO_PREFIX = b'u'
HASHX_HISTORY_PREFIX = b'x'
STATE_PREFIX = b'state'
class RocksDBState(typing.NamedTuple):
db_version: int
genesis_hash: str
height: int
tx_count: int
tip: bytes
utxo_flush_count: int
wall_time: int
first_sync: bool
flush_count: int
comp_flush_count: int
comp_cursor: int
@dataclass
class RocksReaderContext:
db: 'RocksDB'
name: str
merkle: Merkle
tx_counts: List[int]
state: RocksDBState
block_txs_cache: pylru.lrucache
merkle_tx_cache: pylru.lrucache
def close(self):
self.db.close()
def reopen(self):
self.db.close()
self.db.open(self.name, create=False, read_only=True)
def update_state(self):
self.tx_counts = [
unpack_be_uint64(tx_count)
for tx_count in self.db.iterator(prefix=TX_COUNT_PREFIX, include_key=False)
]
flush_count, = unpack_le_int32_from(self.db.get(STATE_PREFIX + b'flush_count', b'\xff\xff\xff\xff'))
comp_flush_count, = unpack_le_int32_from(
self.db.get(STATE_PREFIX + b'comp_flush_count', b'\xff\xff\xff\xff')
)
comp_cursor, = unpack_le_int32_from(self.db.get(STATE_PREFIX + b'comp_cursor', b'\xff\xff\xff\xff'))
db_version, = unpack_le_int32_from(self.db.get(STATE_PREFIX + b'db_version', b'\xff\xff\xff\xff'))
genesis = self.db.get(STATE_PREFIX + b'genesis')
tip = self.db.get(STATE_PREFIX + b'tip')
height, = unpack_le_int32_from(self.db.get(STATE_PREFIX + b'height', b'\xff\xff\xff\xff'))
tx_count, = unpack_le_int32_from(self.db.get(STATE_PREFIX + b'tx_count', b'\xff\xff\xff\xff'))
utxo_flush_count, = unpack_le_int32_from(self.db.get(STATE_PREFIX + b'utxo_flush_count', b'\xff\xff\xff\xff'))
wall_time, = unpack_le_int32_from(self.db.get(STATE_PREFIX + b'wall_time', b'\xff\xff\xff\xff'))
first_sync, = unpack_le_int32_from(self.db.get(STATE_PREFIX + b'first_sync', b'\xff\xff\xff\xff'))
self.state = RocksDBState(
db_version, genesis, height, tx_count, tip, utxo_flush_count, wall_time, first_sync == -1,
flush_count, comp_flush_count, comp_cursor
)
proc_ctx: ContextVar[Optional[RocksReaderContext]] = ContextVar('proc_ctx')
def _update_rocksdb_ctx():
ctx = proc_ctx.get()
ctx.update_state()
async def update_rocksdb_ctx(executor: ProcessPoolExecutor):
loop = asyncio.get_event_loop()
await asyncio.wait([loop.run_in_executor(executor, _update_rocksdb_ctx) for _ in range(executor._max_workers)])
def _initializer(path, name):
db = RocksDB(path, name, for_sync=False, read_only=True)
state = RocksDBState(-1, '', -1, -1, b'', -1, -1, True, -1, -1, -1)
proc_ctx.set(RocksReaderContext(db, name, Merkle(), [], state, pylru.lrucache(50000), pylru.lrucache(100000)))
def _teardown():
proc_ctx.get().close()
proc_ctx.set(None)
async def initialize_executor(workers, db_dir='/media/jack/evo970/spv_data', for_sync=False, name='lbryrocks'):
executor = ProcessPoolExecutor(workers, initializer=_initializer, initargs=(db_dir, name))
try:
writer = RocksDB(db_dir, name, for_sync=for_sync, read_only=False)
await update_rocksdb_ctx(executor)
except Exception as err:
await teardown_executor(executor)
executor.shutdown(True)
raise err
return executor, writer
async def teardown_executor(executor: ProcessPoolExecutor):
try:
await asyncio.wait(
[asyncio.get_event_loop().run_in_executor(executor, _teardown) for _ in range(executor._max_workers)]
)
finally:
executor.shutdown(True)