From b7df277a5c1b7b6816d5dfc7f31dc3ad56a05dad Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Wed, 24 Feb 2021 15:20:44 -0500 Subject: [PATCH] db state struct -remove dead code --- lbry/wallet/server/block_processor.py | 3 +- lbry/wallet/server/db/__init__.py | 3 +- lbry/wallet/server/leveldb.py | 293 ++++++++++---------------- 3 files changed, 110 insertions(+), 189 deletions(-) diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index 117f5fb0a..7f5806553 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -9,6 +9,7 @@ from prometheus_client import Gauge, Histogram from collections import defaultdict import lbry from lbry.schema.claim import Claim +from lbry.wallet.transaction import OutputScript, Output from lbry.wallet.server.tx import Tx from lbry.wallet.server.db.writer import SQLDB from lbry.wallet.server.daemon import DaemonError @@ -702,8 +703,6 @@ class BlockProcessor: return ops def advance_block(self, block, height: int): - from lbry.wallet.transaction import OutputScript, Output - txs: List[Tuple[Tx, bytes]] = block.transactions # header = self.coin.electrum_header(block.header, height) block_hash = self.coin.header_hash(block.header) diff --git a/lbry/wallet/server/db/__init__.py b/lbry/wallet/server/db/__init__.py index 47293217d..f41fb5b7a 100644 --- a/lbry/wallet/server/db/__init__.py +++ b/lbry/wallet/server/db/__init__.py @@ -27,7 +27,6 @@ class DB_PREFIXES(enum.Enum): TX_HASH_PREFIX = b'X' HASHX_UTXO_PREFIX = b'h' - HIST_STATE = b'state-hist' - UTXO_STATE = b'state-utxo' + db_state = b's' UTXO_PREFIX = b'u' HASHX_HISTORY_PREFIX = b'x' diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py index 456de2526..9518310ad 100644 --- a/lbry/wallet/server/leveldb.py +++ b/lbry/wallet/server/leveldb.py @@ -11,21 +11,19 @@ import asyncio import array -import ast -import base64 import os import time import typing import struct +import attr from typing import Optional, Iterable from functools import partial from asyncio import sleep from bisect import bisect_right, bisect_left -from collections import namedtuple, defaultdict +from collections import defaultdict from glob import glob from struct import pack, unpack from concurrent.futures.thread import ThreadPoolExecutor -import attr from lbry.utils import LRUCacheWithMetrics from lbry.schema.url import URL from lbry.wallet.server import util @@ -40,7 +38,14 @@ from lbry.wallet.server.db.prefixes import Prefixes from lbry.wallet.server.db.claimtrie import StagedClaimtrieItem, get_update_effective_amount_ops, length_encoded_name from lbry.wallet.server.db.claimtrie import get_expiration_height -UTXO = namedtuple("UTXO", "tx_num tx_pos tx_hash height value") + +class UTXO(typing.NamedTuple): + tx_num: int + tx_pos: int + tx_hash: bytes + height: int + value: int + TXO_STRUCT = struct.Struct(b'>LH') TXO_STRUCT_unpack = TXO_STRUCT.unpack @@ -55,10 +60,7 @@ TX_NUM_PREFIX = b'N' TX_COUNT_PREFIX = b'T' UNDO_PREFIX = b'U' TX_HASH_PREFIX = b'X' - HASHX_UTXO_PREFIX = b'h' -HIST_STATE = b'state-hist' -UTXO_STATE = b'state-utxo' UTXO_PREFIX = b'u' HASHX_HISTORY_PREFIX = b'x' @@ -100,6 +102,35 @@ class ResolveResult(typing.NamedTuple): reposted_claim_hash: Optional[bytes] +DB_STATE_STRUCT = struct.Struct(b'>32sLL32sHLBBlll') +DB_STATE_STRUCT_SIZE = 92 + + +class DBState(typing.NamedTuple): + genesis: bytes + height: int + tx_count: int + tip: bytes + utxo_flush_count: int + wall_time: int + first_sync: bool + db_version: int + hist_flush_count: int + comp_flush_count: int + comp_cursor: int + + def pack(self) -> bytes: + return DB_STATE_STRUCT.pack( + self.genesis, self.height, self.tx_count, self.tip, self.utxo_flush_count, + self.wall_time, 1 if self.first_sync else 0, self.db_version, self.hist_flush_count, + self.comp_flush_count, self.comp_cursor + ) + + @classmethod + def unpack(cls, packed: bytes) -> 'DBState': + return cls(*DB_STATE_STRUCT.unpack(packed[:DB_STATE_STRUCT_SIZE])) + + class LevelDB: """Simple wrapper of the backend database for querying. @@ -107,8 +138,7 @@ class LevelDB: it was shutdown uncleanly. """ - DB_VERSIONS = [6] - HIST_DB_VERSIONS = [0, 6] + DB_VERSIONS = HIST_DB_VERSIONS = [7] class DBError(Exception): """Raised on general DB errors generally indicating corruption.""" @@ -156,15 +186,14 @@ class LevelDB: return return claim_hash_and_name[:CLAIM_HASH_LEN], claim_hash_and_name[CLAIM_HASH_LEN:] - def get_supported_claim_from_txo(self, tx_num, tx_idx: int): - supported_claim_hash = self.db.get( - DB_PREFIXES.support_to_claim.value + TXO_STRUCT_pack(tx_num, tx_idx) - ) + def get_supported_claim_from_txo(self, tx_num: int, position: int) -> typing.Tuple[Optional[bytes], Optional[int]]: + key = Prefixes.support_to_claim.pack_key(tx_num, position) + supported_claim_hash = self.db.get(key) if supported_claim_hash: packed_support_amount = self.db.get( - Prefixes.claim_to_support.pack_key(supported_claim_hash, tx_num, tx_idx) + Prefixes.claim_to_support.pack_key(supported_claim_hash, tx_num, position) ) - if packed_support_amount is not None: + if packed_support_amount: return supported_claim_hash, Prefixes.claim_to_support.unpack_value(packed_support_amount).amount return None, None @@ -382,19 +411,6 @@ class LevelDB: ) return expired - # def add_unflushed(self, hashXs_by_tx, first_tx_num): - # unflushed = self.history.unflushed - # count = 0 - # for tx_num, hashXs in enumerate(hashXs_by_tx, start=first_tx_num): - # hashXs = set(hashXs) - # for hashX in hashXs: - # unflushed[hashX].append(tx_num) - # count += len(hashXs) - # self.history.unflushed_count += count - - # def unflushed_memsize(self): - # return len(self.history.unflushed) * 180 + self.history.unflushed_count * 4 - async def _read_tx_counts(self): if self.tx_counts is not None: return @@ -455,32 +471,33 @@ class LevelDB: f'{self.coin.NAME} {self.coin.NET}'.encode()) assert self.db is None - self.db = self.db_class(f'lbry-{self.env.db_engine}', for_sync) + self.db = self.db_class(f'lbry-{self.env.db_engine}', True) if self.db.is_new: self.logger.info('created new db: %s', f'lbry-{self.env.db_engine}') - self.logger.info(f'opened DB (for sync: {for_sync})') - - self.read_utxo_state() - - # Then history DB - state = self.db.get(HIST_STATE) - if state: - state = ast.literal_eval(state.decode()) - if not isinstance(state, dict): - raise RuntimeError('failed reading state from history DB') - self.hist_flush_count = state['flush_count'] - self.hist_comp_flush_count = state.get('comp_flush_count', -1) - self.hist_comp_cursor = state.get('comp_cursor', -1) - self.hist_db_version = state.get('db_version', 0) else: - self.hist_flush_count = 0 - self.hist_comp_flush_count = -1 - self.hist_comp_cursor = -1 - self.hist_db_version = max(self.HIST_DB_VERSIONS) + self.logger.info(f'opened db: %s', f'lbry-{self.env.db_engine}') - self.logger.info(f'history DB version: {self.hist_db_version}') - if self.hist_db_version not in self.HIST_DB_VERSIONS: - msg = f'this software only handles DB versions {self.HIST_DB_VERSIONS}' + # read db state + self.read_db_state() + + # These are our state as we move ahead of DB state + self.fs_height = self.db_height + self.fs_tx_count = self.db_tx_count + self.last_flush_tx_count = self.fs_tx_count + + # Log some stats + self.logger.info(f'DB version: {self.db_version:d}') + self.logger.info(f'coin: {self.coin.NAME}') + self.logger.info(f'network: {self.coin.NET}') + self.logger.info(f'height: {self.db_height:,d}') + self.logger.info(f'tip: {hash_to_hex_str(self.db_tip)}') + self.logger.info(f'tx count: {self.db_tx_count:,d}') + if self.db.for_sync: + self.logger.info(f'flushing DB cache at {self.env.cache_MB:,d} MB') + if self.first_sync: + self.logger.info(f'sync time so far: {util.formatted_time(self.wall_time)}') + if self.hist_db_version not in self.DB_VERSIONS: + msg = f'this software only handles DB versions {self.DB_VERSIONS}' self.logger.error(msg) raise RuntimeError(msg) self.logger.info(f'flush count: {self.hist_flush_count:,d}') @@ -492,7 +509,7 @@ class LevelDB: self.logger.info('DB shut down uncleanly. Scanning for excess history flushes...') keys = [] - for key, hist in self.db.iterator(prefix=HASHX_HISTORY_PREFIX): + for key, hist in self.db.iterator(prefix=DB_PREFIXES.HASHX_HISTORY_PREFIX.value): k = key[1:] flush_id, = unpack_be_uint16_from(k[-2:]) if flush_id > self.utxo_flush_count: @@ -503,29 +520,19 @@ class LevelDB: self.hist_flush_count = self.utxo_flush_count with self.db.write_batch() as batch: for key in keys: - batch.delete(HASHX_HISTORY_PREFIX + key) - state = { - 'flush_count': self.hist_flush_count, - 'comp_flush_count': self.hist_comp_flush_count, - 'comp_cursor': self.hist_comp_cursor, - 'db_version': self.hist_db_version, - } - # History entries are not prefixed; the suffix \0\0 ensures we - # look similar to other entries and aren't interfered with - batch.put(HIST_STATE, repr(state).encode()) - - self.logger.info('deleted excess history entries') + batch.delete(DB_PREFIXES.HASHX_HISTORY_PREFIX.value + key) + if keys: + self.logger.info('deleted %i excess history entries', len(keys)) self.utxo_flush_count = self.hist_flush_count min_height = self.min_undo_height(self.db_height) keys = [] - for key, hist in self.db.iterator(prefix=UNDO_PREFIX): + for key, hist in self.db.iterator(prefix=DB_PREFIXES.UNDO_PREFIX.value): height, = unpack('>I', key[-4:]) if height >= min_height: break keys.append(key) - if keys: with self.db.write_batch() as batch: for key in keys: @@ -609,40 +616,6 @@ class LevelDB: assert not flush_data.undo_infos assert not self.hist_unflushed - def flush_utxo_db(self, batch, flush_data): - """Flush the cached DB writes and UTXO set to the batch.""" - # Care is needed because the writes generated by flushing the - # UTXO state may have keys in common with our write cache or - # may be in the DB already. - start_time = time.time() - add_count = len(flush_data.adds) - spend_count = len(flush_data.deletes) // 2 - - if self.db.for_sync: - block_count = flush_data.height - self.db_height - tx_count = flush_data.tx_count - self.db_tx_count - elapsed = time.time() - start_time - self.logger.info(f'flushed {block_count:,d} blocks with ' - f'{tx_count:,d} txs, {add_count:,d} UTXO adds, ' - f'{spend_count:,d} spends in ' - f'{elapsed:.1f}s, committing...') - - self.utxo_flush_count = self.hist_flush_count - self.db_height = flush_data.height - self.db_tx_count = flush_data.tx_count - self.db_tip = flush_data.tip - - def write_history_state(self, batch): - state = { - 'flush_count': self.hist_flush_count, - 'comp_flush_count': self.hist_comp_flush_count, - 'comp_cursor': self.hist_comp_cursor, - 'db_version': self.db_version, - } - # History entries are not prefixed; the suffix \0\0 ensures we - # look similar to other entries and aren't interfered with - batch.put(DB_PREFIXES.HIST_STATE.value, repr(state).encode()) - def flush_dbs(self, flush_data: FlushData, estimate_txs_remaining): """Flush out cached state. History is always flushed; UTXOs are flushed if flush_utxos.""" @@ -704,9 +677,11 @@ class LevelDB: else: batch_delete(staged_change.key) flush_data.claimtrie_stash.clear() + for undo_claims, height in flush_data.undo_claimtrie: batch_put(DB_PREFIXES.undo_claimtrie.value + util.pack_be_uint64(height), undo_claims) flush_data.undo_claimtrie.clear() + self.fs_height = flush_data.height self.fs_tx_count = flush_data.tx_count @@ -718,7 +693,6 @@ class LevelDB: for hashX in sorted(unflushed): key = hashX + flush_id batch_put(DB_PREFIXES.HASHX_HISTORY_PREFIX.value + key, unflushed[hashX].tobytes()) - self.write_history_state(batch) unflushed.clear() self.hist_unflushed_count = 0 @@ -762,29 +736,18 @@ class LevelDB: self.db_height = flush_data.height self.db_tx_count = flush_data.tx_count self.db_tip = flush_data.tip - # self.flush_state(batch) # now = time.time() self.wall_time += now - self.last_flush self.last_flush = now self.last_flush_tx_count = self.fs_tx_count - self.write_utxo_state(batch) - - # # Update and put the wall time again - otherwise we drop the - # # time it took to commit the batch - # # self.flush_state(self.db) - # now = time.time() - # self.wall_time += now - self.last_flush - # self.last_flush = now - # self.last_flush_tx_count = self.fs_tx_count - # self.write_utxo_state(batch) + self.write_db_state(batch) elapsed = self.last_flush - start_time self.logger.info(f'flush #{self.hist_flush_count:,d} took ' f'{elapsed:.1f}s. Height {flush_data.height:,d} ' f'txs: {flush_data.tx_count:,d} ({tx_delta:+,d})') - # Catch-up stats if self.db.for_sync: flush_interval = self.last_flush - prior_flush @@ -796,14 +759,6 @@ class LevelDB: self.logger.info(f'sync time: {formatted_time(self.wall_time)} ' f'ETA: {formatted_time(eta)}') - # def flush_state(self, batch): - # """Flush chain state to the batch.""" - # now = time.time() - # self.wall_time += now - self.last_flush - # self.last_flush = now - # self.last_flush_tx_count = self.fs_tx_count - # self.write_utxo_state(batch) - def flush_backup(self, flush_data, touched): """Like flush_dbs() but when backing up. All UTXOs are flushed.""" assert not flush_data.headers @@ -827,7 +782,7 @@ class LevelDB: batch_delete = batch.delete claim_reorg_height = self.fs_height - print("flush undos", flush_data.undo_claimtrie) + # print("flush undos", flush_data.undo_claimtrie) for (ops, height) in reversed(flush_data.undo_claimtrie): claimtrie_ops = RevertableOp.unpack_stack(ops) print("%i undo ops for %i" % (len(claimtrie_ops), height)) @@ -867,9 +822,6 @@ class LevelDB: for key, value in puts.items(): batch_put(key, value) - - self.write_history_state(batch) - # New undo information for undo_info, height in flush_data.undo_infos: batch.put(self.undo_key(height), b''.join(undo_info)) @@ -889,7 +841,6 @@ class LevelDB: batch_put(DB_PREFIXES.UTXO_PREFIX.value + hashX + suffix, value[-8:]) flush_data.adds.clear() - self.flush_utxo_db(batch, flush_data) start_time = time.time() add_count = len(flush_data.adds) spend_count = len(flush_data.deletes) // 2 @@ -908,15 +859,12 @@ class LevelDB: self.db_tx_count = flush_data.tx_count self.db_tip = flush_data.tip - - # Flush state last as it reads the wall time. now = time.time() self.wall_time += now - self.last_flush self.last_flush = now self.last_flush_tx_count = self.fs_tx_count - self.write_utxo_state(batch) - + self.write_db_state(batch) self.logger.info(f'backing up removed {nremoves:,d} history entries') elapsed = self.last_flush - start_time @@ -1037,8 +985,6 @@ class LevelDB: def read_history(): db_height = self.db_height tx_counts = self.tx_counts - tx_db_get = self.db.get - pack_be_uint64 = util.pack_be_uint64 cnt = 0 txs = [] @@ -1139,8 +1085,17 @@ class LevelDB: # -- UTXO database - def read_utxo_state(self): - state = self.db.get(UTXO_STATE) + def write_db_state(self, batch): + """Write (UTXO) state to the batch.""" + db_state = DBState( + bytes.fromhex(self.coin.GENESIS_HASH), self.db_height, self.db_tx_count, self.db_tip, + self.utxo_flush_count, int(self.wall_time), self.first_sync, self.db_version, + self.hist_flush_count, self.hist_comp_flush_count, self.hist_comp_cursor + ) + batch.put(DB_PREFIXES.db_state.value, db_state.pack()) + + def read_db_state(self): + state = self.db.get(DB_PREFIXES.db_state.value) if not state: self.db_height = -1 self.db_tx_count = 0 @@ -1149,63 +1104,31 @@ class LevelDB: self.utxo_flush_count = 0 self.wall_time = 0 self.first_sync = True + self.hist_flush_count = 0 + self.hist_comp_flush_count = -1 + self.hist_comp_cursor = -1 + self.hist_db_version = max(self.DB_VERSIONS) else: - state = ast.literal_eval(state.decode()) - if not isinstance(state, dict): - raise self.DBError('failed reading state from DB') - self.db_version = state['db_version'] + state = DBState.unpack(state) + self.db_version = state.db_version if self.db_version not in self.DB_VERSIONS: - raise self.DBError(f'your UTXO DB version is {self.db_version} but this ' + raise self.DBError(f'your DB version is {self.db_version} but this ' f'software only handles versions {self.DB_VERSIONS}') # backwards compat - genesis_hash = state['genesis'] - if isinstance(genesis_hash, bytes): - genesis_hash = genesis_hash.decode() - if genesis_hash != self.coin.GENESIS_HASH: + genesis_hash = state.genesis + if genesis_hash.hex() != self.coin.GENESIS_HASH: raise self.DBError(f'DB genesis hash {genesis_hash} does not ' f'match coin {self.coin.GENESIS_HASH}') - self.db_height = state['height'] - self.db_tx_count = state['tx_count'] - self.db_tip = state['tip'] - self.utxo_flush_count = state['utxo_flush_count'] - self.wall_time = state['wall_time'] - self.first_sync = state['first_sync'] - - # These are our state as we move ahead of DB state - self.fs_height = self.db_height - self.fs_tx_count = self.db_tx_count - self.last_flush_tx_count = self.fs_tx_count - - # Log some stats - self.logger.info(f'DB version: {self.db_version:d}') - self.logger.info(f'coin: {self.coin.NAME}') - self.logger.info(f'network: {self.coin.NET}') - self.logger.info(f'height: {self.db_height:,d}') - self.logger.info(f'tip: {hash_to_hex_str(self.db_tip)}') - self.logger.info(f'tx count: {self.db_tx_count:,d}') - if self.db.for_sync: - self.logger.info(f'flushing DB cache at {self.env.cache_MB:,d} MB') - if self.first_sync: - self.logger.info(f'sync time so far: {util.formatted_time(self.wall_time)}') - - def write_utxo_state(self, batch): - """Write (UTXO) state to the batch.""" - state = { - 'genesis': self.coin.GENESIS_HASH, - 'height': self.db_height, - 'tx_count': self.db_tx_count, - 'tip': self.db_tip, - 'utxo_flush_count': self.utxo_flush_count, - 'wall_time': self.wall_time, - 'first_sync': self.first_sync, - 'db_version': self.db_version, - } - batch.put(DB_PREFIXES.UTXO_STATE.value, repr(state).encode()) - - def set_flush_count(self, count): - self.utxo_flush_count = count - with self.db.write_batch() as batch: - self.write_utxo_state(batch) + self.db_height = state.height + self.db_tx_count = state.tx_count + self.db_tip = state.tip + self.utxo_flush_count = state.utxo_flush_count + self.wall_time = state.wall_time + self.first_sync = state.first_sync + self.hist_flush_count = state.hist_flush_count + self.hist_comp_flush_count = state.comp_flush_count + self.hist_comp_cursor = state.comp_cursor + self.hist_db_version = state.db_version async def all_utxos(self, hashX): """Return all UTXOs for an address sorted in no particular order."""