db state struct
-remove dead code
This commit is contained in:
3 changed files with 110 additions and 189 deletions
@ -9,6 +9,7 @@ from prometheus_client import Gauge, Histogram
from collections import defaultdict
import lbry
from lbry.schema.claim import Claim
from lbry.wallet.transaction import OutputScript, Output
from lbry.wallet.server.tx import Tx
from lbry.wallet.server.db.writer import SQLDB
from lbry.wallet.server.daemon import DaemonError
@ -702,8 +703,6 @@ class BlockProcessor:
return ops
def advance_block(self, block, height: int):
from lbry.wallet.transaction import OutputScript, Output
txs: List[Tuple[Tx, bytes]] = block.transactions
# header = self.coin.electrum_header(block.header, height)
block_hash = self.coin.header_hash(block.header)
@ -27,7 +27,6 @@ class DB_PREFIXES(enum.Enum):
HIST_STATE = b'state-hist'
UTXO_STATE = b'state-utxo'
db_state = b's'
@ -11,21 +11,19 @@
import asyncio
import array
import ast
import base64
import os
import time
import typing
import struct
import attr
from typing import Optional, Iterable
from functools import partial
from asyncio import sleep
from bisect import bisect_right, bisect_left
from collections import namedtuple, defaultdict
from collections import defaultdict
from glob import glob
from struct import pack, unpack
from concurrent.futures.thread import ThreadPoolExecutor
import attr
from lbry.utils import LRUCacheWithMetrics
from lbry.schema.url import URL
from lbry.wallet.server import util
@ -40,7 +38,14 @@ from lbry.wallet.server.db.prefixes import Prefixes
from lbry.wallet.server.db.claimtrie import StagedClaimtrieItem, get_update_effective_amount_ops, length_encoded_name
from lbry.wallet.server.db.claimtrie import get_expiration_height
UTXO = namedtuple("UTXO", "tx_num tx_pos tx_hash height value")
class UTXO(typing.NamedTuple):
tx_num: int
tx_pos: int
tx_hash: bytes
height: int
value: int
TXO_STRUCT = struct.Struct(b'>LH')
TXO_STRUCT_unpack = TXO_STRUCT.unpack
@ -55,10 +60,7 @@ TX_NUM_PREFIX = b'N'
HIST_STATE = b'state-hist'
UTXO_STATE = b'state-utxo'
@ -100,6 +102,35 @@ class ResolveResult(typing.NamedTuple):
reposted_claim_hash: Optional[bytes]
DB_STATE_STRUCT = struct.Struct(b'>32sLL32sHLBBlll')
class DBState(typing.NamedTuple):
genesis: bytes
height: int
tx_count: int
tip: bytes
utxo_flush_count: int
wall_time: int
first_sync: bool
db_version: int
hist_flush_count: int
comp_flush_count: int
comp_cursor: int
def pack(self) -> bytes:
return DB_STATE_STRUCT.pack(
self.genesis, self.height, self.tx_count, self.tip, self.utxo_flush_count,
self.wall_time, 1 if self.first_sync else 0, self.db_version, self.hist_flush_count,
self.comp_flush_count, self.comp_cursor
def unpack(cls, packed: bytes) -> 'DBState':
return cls(*DB_STATE_STRUCT.unpack(packed[:DB_STATE_STRUCT_SIZE]))
class LevelDB:
"""Simple wrapper of the backend database for querying.
@ -107,8 +138,7 @@ class LevelDB:
it was shutdown uncleanly.
class DBError(Exception):
"""Raised on general DB errors generally indicating corruption."""
@ -156,15 +186,14 @@ class LevelDB:
return claim_hash_and_name[:CLAIM_HASH_LEN], claim_hash_and_name[CLAIM_HASH_LEN:]
def get_supported_claim_from_txo(self, tx_num, tx_idx: int):
supported_claim_hash = self.db.get(
DB_PREFIXES.support_to_claim.value + TXO_STRUCT_pack(tx_num, tx_idx)
def get_supported_claim_from_txo(self, tx_num: int, position: int) -> typing.Tuple[Optional[bytes], Optional[int]]:
key = Prefixes.support_to_claim.pack_key(tx_num, position)
supported_claim_hash = self.db.get(key)
if supported_claim_hash:
packed_support_amount = self.db.get(
Prefixes.claim_to_support.pack_key(supported_claim_hash, tx_num, tx_idx)
Prefixes.claim_to_support.pack_key(supported_claim_hash, tx_num, position)
if packed_support_amount is not None:
if packed_support_amount:
return supported_claim_hash, Prefixes.claim_to_support.unpack_value(packed_support_amount).amount
return None, None
@ -382,19 +411,6 @@ class LevelDB:
return expired
# def add_unflushed(self, hashXs_by_tx, first_tx_num):
# unflushed = self.history.unflushed
# count = 0
# for tx_num, hashXs in enumerate(hashXs_by_tx, start=first_tx_num):
# hashXs = set(hashXs)
# for hashX in hashXs:
# unflushed[hashX].append(tx_num)
# count += len(hashXs)
# self.history.unflushed_count += count
# def unflushed_memsize(self):
# return len(self.history.unflushed) * 180 + self.history.unflushed_count * 4
async def _read_tx_counts(self):
if self.tx_counts is not None:
@ -455,32 +471,33 @@ class LevelDB:
f'{self.coin.NAME} {self.coin.NET}'.encode())
assert self.db is None
self.db = self.db_class(f'lbry-{self.env.db_engine}', for_sync)
self.db = self.db_class(f'lbry-{self.env.db_engine}', True)
if self.db.is_new:
self.logger.info('created new db: %s', f'lbry-{self.env.db_engine}')
self.logger.info(f'opened DB (for sync: {for_sync})')
# Then history DB
state = self.db.get(HIST_STATE)
if state:
state = ast.literal_eval(state.decode())
if not isinstance(state, dict):
raise RuntimeError('failed reading state from history DB')
self.hist_flush_count = state['flush_count']
self.hist_comp_flush_count = state.get('comp_flush_count', -1)
self.hist_comp_cursor = state.get('comp_cursor', -1)
self.hist_db_version = state.get('db_version', 0)
self.hist_flush_count = 0
self.hist_comp_flush_count = -1
self.hist_comp_cursor = -1
self.hist_db_version = max(self.HIST_DB_VERSIONS)
self.logger.info(f'opened db: %s', f'lbry-{self.env.db_engine}')
self.logger.info(f'history DB version: {self.hist_db_version}')
if self.hist_db_version not in self.HIST_DB_VERSIONS:
msg = f'this software only handles DB versions {self.HIST_DB_VERSIONS}'
# read db state
# These are our state as we move ahead of DB state
self.fs_height = self.db_height
self.fs_tx_count = self.db_tx_count
self.last_flush_tx_count = self.fs_tx_count
# Log some stats
self.logger.info(f'DB version: {self.db_version:d}')
self.logger.info(f'coin: {self.coin.NAME}')
self.logger.info(f'network: {self.coin.NET}')
self.logger.info(f'height: {self.db_height:,d}')
self.logger.info(f'tip: {hash_to_hex_str(self.db_tip)}')
self.logger.info(f'tx count: {self.db_tx_count:,d}')
if self.db.for_sync:
self.logger.info(f'flushing DB cache at {self.env.cache_MB:,d} MB')
if self.first_sync:
self.logger.info(f'sync time so far: {util.formatted_time(self.wall_time)}')
if self.hist_db_version not in self.DB_VERSIONS:
msg = f'this software only handles DB versions {self.DB_VERSIONS}'
raise RuntimeError(msg)
self.logger.info(f'flush count: {self.hist_flush_count:,d}')
@ -492,7 +509,7 @@ class LevelDB:
self.logger.info('DB shut down uncleanly. Scanning for excess history flushes...')
keys = []
for key, hist in self.db.iterator(prefix=HASHX_HISTORY_PREFIX):
for key, hist in self.db.iterator(prefix=DB_PREFIXES.HASHX_HISTORY_PREFIX.value):
k = key[1:]
flush_id, = unpack_be_uint16_from(k[-2:])
if flush_id > self.utxo_flush_count:
@ -503,29 +520,19 @@ class LevelDB:
self.hist_flush_count = self.utxo_flush_count
with self.db.write_batch() as batch:
for key in keys:
batch.delete(HASHX_HISTORY_PREFIX + key)
state = {
'flush_count': self.hist_flush_count,
'comp_flush_count': self.hist_comp_flush_count,
'comp_cursor': self.hist_comp_cursor,
'db_version': self.hist_db_version,
# History entries are not prefixed; the suffix \0\0 ensures we
# look similar to other entries and aren't interfered with
batch.put(HIST_STATE, repr(state).encode())
self.logger.info('deleted excess history entries')
batch.delete(DB_PREFIXES.HASHX_HISTORY_PREFIX.value + key)
if keys:
self.logger.info('deleted %i excess history entries', len(keys))
self.utxo_flush_count = self.hist_flush_count
min_height = self.min_undo_height(self.db_height)
keys = []
for key, hist in self.db.iterator(prefix=UNDO_PREFIX):
for key, hist in self.db.iterator(prefix=DB_PREFIXES.UNDO_PREFIX.value):
height, = unpack('>I', key[-4:])
if height >= min_height:
if keys:
with self.db.write_batch() as batch:
for key in keys:
@ -609,40 +616,6 @@ class LevelDB:
assert not flush_data.undo_infos
assert not self.hist_unflushed
def flush_utxo_db(self, batch, flush_data):
"""Flush the cached DB writes and UTXO set to the batch."""
# Care is needed because the writes generated by flushing the
# UTXO state may have keys in common with our write cache or
# may be in the DB already.
start_time = time.time()
add_count = len(flush_data.adds)
spend_count = len(flush_data.deletes) // 2
if self.db.for_sync:
block_count = flush_data.height - self.db_height
tx_count = flush_data.tx_count - self.db_tx_count
elapsed = time.time() - start_time
self.logger.info(f'flushed {block_count:,d} blocks with '
f'{tx_count:,d} txs, {add_count:,d} UTXO adds, '
f'{spend_count:,d} spends in '
f'{elapsed:.1f}s, committing...')
self.utxo_flush_count = self.hist_flush_count
self.db_height = flush_data.height
self.db_tx_count = flush_data.tx_count
self.db_tip = flush_data.tip
def write_history_state(self, batch):
state = {
'flush_count': self.hist_flush_count,
'comp_flush_count': self.hist_comp_flush_count,
'comp_cursor': self.hist_comp_cursor,
'db_version': self.db_version,
# History entries are not prefixed; the suffix \0\0 ensures we
# look similar to other entries and aren't interfered with
batch.put(DB_PREFIXES.HIST_STATE.value, repr(state).encode())
def flush_dbs(self, flush_data: FlushData, estimate_txs_remaining):
"""Flush out cached state. History is always flushed; UTXOs are
flushed if flush_utxos."""
@ -704,9 +677,11 @@ class LevelDB:
for undo_claims, height in flush_data.undo_claimtrie:
batch_put(DB_PREFIXES.undo_claimtrie.value + util.pack_be_uint64(height), undo_claims)
self.fs_height = flush_data.height
self.fs_tx_count = flush_data.tx_count
@ -718,7 +693,6 @@ class LevelDB:
for hashX in sorted(unflushed):
key = hashX + flush_id
batch_put(DB_PREFIXES.HASHX_HISTORY_PREFIX.value + key, unflushed[hashX].tobytes())
self.hist_unflushed_count = 0
@ -762,29 +736,18 @@ class LevelDB:
self.db_height = flush_data.height
self.db_tx_count = flush_data.tx_count
self.db_tip = flush_data.tip
# self.flush_state(batch)
now = time.time()
self.wall_time += now - self.last_flush
self.last_flush = now
self.last_flush_tx_count = self.fs_tx_count
# # Update and put the wall time again - otherwise we drop the
# # time it took to commit the batch
# # self.flush_state(self.db)
# now = time.time()
# self.wall_time += now - self.last_flush
# self.last_flush = now
# self.last_flush_tx_count = self.fs_tx_count
# self.write_utxo_state(batch)
elapsed = self.last_flush - start_time
self.logger.info(f'flush #{self.hist_flush_count:,d} took '
f'{elapsed:.1f}s. Height {flush_data.height:,d} '
f'txs: {flush_data.tx_count:,d} ({tx_delta:+,d})')
# Catch-up stats
if self.db.for_sync:
flush_interval = self.last_flush - prior_flush
@ -796,14 +759,6 @@ class LevelDB:
self.logger.info(f'sync time: {formatted_time(self.wall_time)} '
f'ETA: {formatted_time(eta)}')
# def flush_state(self, batch):
# """Flush chain state to the batch."""
# now = time.time()
# self.wall_time += now - self.last_flush
# self.last_flush = now
# self.last_flush_tx_count = self.fs_tx_count
# self.write_utxo_state(batch)
def flush_backup(self, flush_data, touched):
"""Like flush_dbs() but when backing up. All UTXOs are flushed."""
assert not flush_data.headers
@ -827,7 +782,7 @@ class LevelDB:
batch_delete = batch.delete
claim_reorg_height = self.fs_height
print("flush undos", flush_data.undo_claimtrie)
# print("flush undos", flush_data.undo_claimtrie)
for (ops, height) in reversed(flush_data.undo_claimtrie):
claimtrie_ops = RevertableOp.unpack_stack(ops)
print("%i undo ops for %i" % (len(claimtrie_ops), height))
@ -867,9 +822,6 @@ class LevelDB:
for key, value in puts.items():
batch_put(key, value)
# New undo information
for undo_info, height in flush_data.undo_infos:
batch.put(self.undo_key(height), b''.join(undo_info))
@ -889,7 +841,6 @@ class LevelDB:
batch_put(DB_PREFIXES.UTXO_PREFIX.value + hashX + suffix, value[-8:])
self.flush_utxo_db(batch, flush_data)
start_time = time.time()
add_count = len(flush_data.adds)
spend_count = len(flush_data.deletes) // 2
@ -908,15 +859,12 @@ class LevelDB:
self.db_tx_count = flush_data.tx_count
self.db_tip = flush_data.tip
# Flush state last as it reads the wall time.
now = time.time()
self.wall_time += now - self.last_flush
self.last_flush = now
self.last_flush_tx_count = self.fs_tx_count
self.logger.info(f'backing up removed {nremoves:,d} history entries')
elapsed = self.last_flush - start_time
@ -1037,8 +985,6 @@ class LevelDB:
def read_history():
db_height = self.db_height
tx_counts = self.tx_counts
tx_db_get = self.db.get
pack_be_uint64 = util.pack_be_uint64
cnt = 0
txs = []
@ -1139,8 +1085,17 @@ class LevelDB:
# -- UTXO database
def read_utxo_state(self):
state = self.db.get(UTXO_STATE)
def write_db_state(self, batch):
"""Write (UTXO) state to the batch."""
db_state = DBState(
bytes.fromhex(self.coin.GENESIS_HASH), self.db_height, self.db_tx_count, self.db_tip,
self.utxo_flush_count, int(self.wall_time), self.first_sync, self.db_version,
self.hist_flush_count, self.hist_comp_flush_count, self.hist_comp_cursor
batch.put(DB_PREFIXES.db_state.value, db_state.pack())
def read_db_state(self):
state = self.db.get(DB_PREFIXES.db_state.value)
if not state:
self.db_height = -1
self.db_tx_count = 0
@ -1149,63 +1104,31 @@ class LevelDB:
self.utxo_flush_count = 0
self.wall_time = 0
self.first_sync = True
self.hist_flush_count = 0
self.hist_comp_flush_count = -1
self.hist_comp_cursor = -1
self.hist_db_version = max(self.DB_VERSIONS)
state = ast.literal_eval(state.decode())
if not isinstance(state, dict):
raise self.DBError('failed reading state from DB')
self.db_version = state['db_version']
state = DBState.unpack(state)
self.db_version = state.db_version
if self.db_version not in self.DB_VERSIONS:
raise self.DBError(f'your UTXO DB version is {self.db_version} but this '
raise self.DBError(f'your DB version is {self.db_version} but this '
f'software only handles versions {self.DB_VERSIONS}')
# backwards compat
genesis_hash = state['genesis']
if isinstance(genesis_hash, bytes):
genesis_hash = genesis_hash.decode()
if genesis_hash != self.coin.GENESIS_HASH:
genesis_hash = state.genesis
if genesis_hash.hex() != self.coin.GENESIS_HASH:
raise self.DBError(f'DB genesis hash {genesis_hash} does not '
f'match coin {self.coin.GENESIS_HASH}')
self.db_height = state['height']
self.db_tx_count = state['tx_count']
self.db_tip = state['tip']
self.utxo_flush_count = state['utxo_flush_count']
self.wall_time = state['wall_time']
self.first_sync = state['first_sync']
# These are our state as we move ahead of DB state
self.fs_height = self.db_height
self.fs_tx_count = self.db_tx_count
self.last_flush_tx_count = self.fs_tx_count
# Log some stats
self.logger.info(f'DB version: {self.db_version:d}')
self.logger.info(f'coin: {self.coin.NAME}')
self.logger.info(f'network: {self.coin.NET}')
self.logger.info(f'height: {self.db_height:,d}')
self.logger.info(f'tip: {hash_to_hex_str(self.db_tip)}')
self.logger.info(f'tx count: {self.db_tx_count:,d}')
if self.db.for_sync:
self.logger.info(f'flushing DB cache at {self.env.cache_MB:,d} MB')
if self.first_sync:
self.logger.info(f'sync time so far: {util.formatted_time(self.wall_time)}')
def write_utxo_state(self, batch):
"""Write (UTXO) state to the batch."""
state = {
'genesis': self.coin.GENESIS_HASH,
'height': self.db_height,
'tx_count': self.db_tx_count,
'tip': self.db_tip,
'utxo_flush_count': self.utxo_flush_count,
'wall_time': self.wall_time,
'first_sync': self.first_sync,
'db_version': self.db_version,
batch.put(DB_PREFIXES.UTXO_STATE.value, repr(state).encode())
def set_flush_count(self, count):
self.utxo_flush_count = count
with self.db.write_batch() as batch:
self.db_height = state.height
self.db_tx_count = state.tx_count
self.db_tip = state.tip
self.utxo_flush_count = state.utxo_flush_count
self.wall_time = state.wall_time
self.first_sync = state.first_sync
self.hist_flush_count = state.hist_flush_count
self.hist_comp_flush_count = state.comp_flush_count
self.hist_comp_cursor = state.comp_cursor
self.hist_db_version = state.db_version
async def all_utxos(self, hashX):
"""Return all UTXOs for an address sorted in no particular order."""
Add table
Reference in a new issue