forked from LBRYCommunity/lbry-sdk
db state struct
-remove dead code
This commit is contained in:
parent
c681041b48
commit
b7df277a5c
3 changed files with 110 additions and 189 deletions
|
@ -9,6 +9,7 @@ from prometheus_client import Gauge, Histogram
|
|||
from collections import defaultdict
|
||||
import lbry
|
||||
from lbry.schema.claim import Claim
|
||||
from lbry.wallet.transaction import OutputScript, Output
|
||||
from lbry.wallet.server.tx import Tx
|
||||
from lbry.wallet.server.db.writer import SQLDB
|
||||
from lbry.wallet.server.daemon import DaemonError
|
||||
|
@ -702,8 +703,6 @@ class BlockProcessor:
|
|||
return ops
|
||||
|
||||
def advance_block(self, block, height: int):
|
||||
from lbry.wallet.transaction import OutputScript, Output
|
||||
|
||||
txs: List[Tuple[Tx, bytes]] = block.transactions
|
||||
# header = self.coin.electrum_header(block.header, height)
|
||||
block_hash = self.coin.header_hash(block.header)
|
||||
|
|
|
@ -27,7 +27,6 @@ class DB_PREFIXES(enum.Enum):
|
|||
TX_HASH_PREFIX = b'X'
|
||||
|
||||
HASHX_UTXO_PREFIX = b'h'
|
||||
HIST_STATE = b'state-hist'
|
||||
UTXO_STATE = b'state-utxo'
|
||||
db_state = b's'
|
||||
UTXO_PREFIX = b'u'
|
||||
HASHX_HISTORY_PREFIX = b'x'
|
||||
|
|
|
@ -11,21 +11,19 @@
|
|||
|
||||
import asyncio
|
||||
import array
|
||||
import ast
|
||||
import base64
|
||||
import os
|
||||
import time
|
||||
import typing
|
||||
import struct
|
||||
import attr
|
||||
from typing import Optional, Iterable
|
||||
from functools import partial
|
||||
from asyncio import sleep
|
||||
from bisect import bisect_right, bisect_left
|
||||
from collections import namedtuple, defaultdict
|
||||
from collections import defaultdict
|
||||
from glob import glob
|
||||
from struct import pack, unpack
|
||||
from concurrent.futures.thread import ThreadPoolExecutor
|
||||
import attr
|
||||
from lbry.utils import LRUCacheWithMetrics
|
||||
from lbry.schema.url import URL
|
||||
from lbry.wallet.server import util
|
||||
|
@ -40,7 +38,14 @@ from lbry.wallet.server.db.prefixes import Prefixes
|
|||
from lbry.wallet.server.db.claimtrie import StagedClaimtrieItem, get_update_effective_amount_ops, length_encoded_name
|
||||
from lbry.wallet.server.db.claimtrie import get_expiration_height
|
||||
|
||||
UTXO = namedtuple("UTXO", "tx_num tx_pos tx_hash height value")
|
||||
|
||||
class UTXO(typing.NamedTuple):
|
||||
tx_num: int
|
||||
tx_pos: int
|
||||
tx_hash: bytes
|
||||
height: int
|
||||
value: int
|
||||
|
||||
|
||||
TXO_STRUCT = struct.Struct(b'>LH')
|
||||
TXO_STRUCT_unpack = TXO_STRUCT.unpack
|
||||
|
@ -55,10 +60,7 @@ TX_NUM_PREFIX = b'N'
|
|||
TX_COUNT_PREFIX = b'T'
|
||||
UNDO_PREFIX = b'U'
|
||||
TX_HASH_PREFIX = b'X'
|
||||
|
||||
HASHX_UTXO_PREFIX = b'h'
|
||||
HIST_STATE = b'state-hist'
|
||||
UTXO_STATE = b'state-utxo'
|
||||
UTXO_PREFIX = b'u'
|
||||
HASHX_HISTORY_PREFIX = b'x'
|
||||
|
||||
|
@ -100,6 +102,35 @@ class ResolveResult(typing.NamedTuple):
|
|||
reposted_claim_hash: Optional[bytes]
|
||||
|
||||
|
||||
DB_STATE_STRUCT = struct.Struct(b'>32sLL32sHLBBlll')
|
||||
DB_STATE_STRUCT_SIZE = 92
|
||||
|
||||
|
||||
class DBState(typing.NamedTuple):
|
||||
genesis: bytes
|
||||
height: int
|
||||
tx_count: int
|
||||
tip: bytes
|
||||
utxo_flush_count: int
|
||||
wall_time: int
|
||||
first_sync: bool
|
||||
db_version: int
|
||||
hist_flush_count: int
|
||||
comp_flush_count: int
|
||||
comp_cursor: int
|
||||
|
||||
def pack(self) -> bytes:
|
||||
return DB_STATE_STRUCT.pack(
|
||||
self.genesis, self.height, self.tx_count, self.tip, self.utxo_flush_count,
|
||||
self.wall_time, 1 if self.first_sync else 0, self.db_version, self.hist_flush_count,
|
||||
self.comp_flush_count, self.comp_cursor
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def unpack(cls, packed: bytes) -> 'DBState':
|
||||
return cls(*DB_STATE_STRUCT.unpack(packed[:DB_STATE_STRUCT_SIZE]))
|
||||
|
||||
|
||||
class LevelDB:
|
||||
"""Simple wrapper of the backend database for querying.
|
||||
|
||||
|
@ -107,8 +138,7 @@ class LevelDB:
|
|||
it was shutdown uncleanly.
|
||||
"""
|
||||
|
||||
DB_VERSIONS = [6]
|
||||
HIST_DB_VERSIONS = [0, 6]
|
||||
DB_VERSIONS = HIST_DB_VERSIONS = [7]
|
||||
|
||||
class DBError(Exception):
|
||||
"""Raised on general DB errors generally indicating corruption."""
|
||||
|
@ -156,15 +186,14 @@ class LevelDB:
|
|||
return
|
||||
return claim_hash_and_name[:CLAIM_HASH_LEN], claim_hash_and_name[CLAIM_HASH_LEN:]
|
||||
|
||||
def get_supported_claim_from_txo(self, tx_num, tx_idx: int):
|
||||
supported_claim_hash = self.db.get(
|
||||
DB_PREFIXES.support_to_claim.value + TXO_STRUCT_pack(tx_num, tx_idx)
|
||||
)
|
||||
def get_supported_claim_from_txo(self, tx_num: int, position: int) -> typing.Tuple[Optional[bytes], Optional[int]]:
|
||||
key = Prefixes.support_to_claim.pack_key(tx_num, position)
|
||||
supported_claim_hash = self.db.get(key)
|
||||
if supported_claim_hash:
|
||||
packed_support_amount = self.db.get(
|
||||
Prefixes.claim_to_support.pack_key(supported_claim_hash, tx_num, tx_idx)
|
||||
Prefixes.claim_to_support.pack_key(supported_claim_hash, tx_num, position)
|
||||
)
|
||||
if packed_support_amount is not None:
|
||||
if packed_support_amount:
|
||||
return supported_claim_hash, Prefixes.claim_to_support.unpack_value(packed_support_amount).amount
|
||||
return None, None
|
||||
|
||||
|
@ -382,19 +411,6 @@ class LevelDB:
|
|||
)
|
||||
return expired
|
||||
|
||||
# def add_unflushed(self, hashXs_by_tx, first_tx_num):
|
||||
# unflushed = self.history.unflushed
|
||||
# count = 0
|
||||
# for tx_num, hashXs in enumerate(hashXs_by_tx, start=first_tx_num):
|
||||
# hashXs = set(hashXs)
|
||||
# for hashX in hashXs:
|
||||
# unflushed[hashX].append(tx_num)
|
||||
# count += len(hashXs)
|
||||
# self.history.unflushed_count += count
|
||||
|
||||
# def unflushed_memsize(self):
|
||||
# return len(self.history.unflushed) * 180 + self.history.unflushed_count * 4
|
||||
|
||||
async def _read_tx_counts(self):
|
||||
if self.tx_counts is not None:
|
||||
return
|
||||
|
@ -455,32 +471,33 @@ class LevelDB:
|
|||
f'{self.coin.NAME} {self.coin.NET}'.encode())
|
||||
|
||||
assert self.db is None
|
||||
self.db = self.db_class(f'lbry-{self.env.db_engine}', for_sync)
|
||||
self.db = self.db_class(f'lbry-{self.env.db_engine}', True)
|
||||
if self.db.is_new:
|
||||
self.logger.info('created new db: %s', f'lbry-{self.env.db_engine}')
|
||||
self.logger.info(f'opened DB (for sync: {for_sync})')
|
||||
|
||||
self.read_utxo_state()
|
||||
|
||||
# Then history DB
|
||||
state = self.db.get(HIST_STATE)
|
||||
if state:
|
||||
state = ast.literal_eval(state.decode())
|
||||
if not isinstance(state, dict):
|
||||
raise RuntimeError('failed reading state from history DB')
|
||||
self.hist_flush_count = state['flush_count']
|
||||
self.hist_comp_flush_count = state.get('comp_flush_count', -1)
|
||||
self.hist_comp_cursor = state.get('comp_cursor', -1)
|
||||
self.hist_db_version = state.get('db_version', 0)
|
||||
else:
|
||||
self.hist_flush_count = 0
|
||||
self.hist_comp_flush_count = -1
|
||||
self.hist_comp_cursor = -1
|
||||
self.hist_db_version = max(self.HIST_DB_VERSIONS)
|
||||
self.logger.info(f'opened db: %s', f'lbry-{self.env.db_engine}')
|
||||
|
||||
self.logger.info(f'history DB version: {self.hist_db_version}')
|
||||
if self.hist_db_version not in self.HIST_DB_VERSIONS:
|
||||
msg = f'this software only handles DB versions {self.HIST_DB_VERSIONS}'
|
||||
# read db state
|
||||
self.read_db_state()
|
||||
|
||||
# These are our state as we move ahead of DB state
|
||||
self.fs_height = self.db_height
|
||||
self.fs_tx_count = self.db_tx_count
|
||||
self.last_flush_tx_count = self.fs_tx_count
|
||||
|
||||
# Log some stats
|
||||
self.logger.info(f'DB version: {self.db_version:d}')
|
||||
self.logger.info(f'coin: {self.coin.NAME}')
|
||||
self.logger.info(f'network: {self.coin.NET}')
|
||||
self.logger.info(f'height: {self.db_height:,d}')
|
||||
self.logger.info(f'tip: {hash_to_hex_str(self.db_tip)}')
|
||||
self.logger.info(f'tx count: {self.db_tx_count:,d}')
|
||||
if self.db.for_sync:
|
||||
self.logger.info(f'flushing DB cache at {self.env.cache_MB:,d} MB')
|
||||
if self.first_sync:
|
||||
self.logger.info(f'sync time so far: {util.formatted_time(self.wall_time)}')
|
||||
if self.hist_db_version not in self.DB_VERSIONS:
|
||||
msg = f'this software only handles DB versions {self.DB_VERSIONS}'
|
||||
self.logger.error(msg)
|
||||
raise RuntimeError(msg)
|
||||
self.logger.info(f'flush count: {self.hist_flush_count:,d}')
|
||||
|
@ -492,7 +509,7 @@ class LevelDB:
|
|||
self.logger.info('DB shut down uncleanly. Scanning for excess history flushes...')
|
||||
|
||||
keys = []
|
||||
for key, hist in self.db.iterator(prefix=HASHX_HISTORY_PREFIX):
|
||||
for key, hist in self.db.iterator(prefix=DB_PREFIXES.HASHX_HISTORY_PREFIX.value):
|
||||
k = key[1:]
|
||||
flush_id, = unpack_be_uint16_from(k[-2:])
|
||||
if flush_id > self.utxo_flush_count:
|
||||
|
@ -503,29 +520,19 @@ class LevelDB:
|
|||
self.hist_flush_count = self.utxo_flush_count
|
||||
with self.db.write_batch() as batch:
|
||||
for key in keys:
|
||||
batch.delete(HASHX_HISTORY_PREFIX + key)
|
||||
state = {
|
||||
'flush_count': self.hist_flush_count,
|
||||
'comp_flush_count': self.hist_comp_flush_count,
|
||||
'comp_cursor': self.hist_comp_cursor,
|
||||
'db_version': self.hist_db_version,
|
||||
}
|
||||
# History entries are not prefixed; the suffix \0\0 ensures we
|
||||
# look similar to other entries and aren't interfered with
|
||||
batch.put(HIST_STATE, repr(state).encode())
|
||||
|
||||
self.logger.info('deleted excess history entries')
|
||||
batch.delete(DB_PREFIXES.HASHX_HISTORY_PREFIX.value + key)
|
||||
if keys:
|
||||
self.logger.info('deleted %i excess history entries', len(keys))
|
||||
|
||||
self.utxo_flush_count = self.hist_flush_count
|
||||
|
||||
min_height = self.min_undo_height(self.db_height)
|
||||
keys = []
|
||||
for key, hist in self.db.iterator(prefix=UNDO_PREFIX):
|
||||
for key, hist in self.db.iterator(prefix=DB_PREFIXES.UNDO_PREFIX.value):
|
||||
height, = unpack('>I', key[-4:])
|
||||
if height >= min_height:
|
||||
break
|
||||
keys.append(key)
|
||||
|
||||
if keys:
|
||||
with self.db.write_batch() as batch:
|
||||
for key in keys:
|
||||
|
@ -609,40 +616,6 @@ class LevelDB:
|
|||
assert not flush_data.undo_infos
|
||||
assert not self.hist_unflushed
|
||||
|
||||
def flush_utxo_db(self, batch, flush_data):
|
||||
"""Flush the cached DB writes and UTXO set to the batch."""
|
||||
# Care is needed because the writes generated by flushing the
|
||||
# UTXO state may have keys in common with our write cache or
|
||||
# may be in the DB already.
|
||||
start_time = time.time()
|
||||
add_count = len(flush_data.adds)
|
||||
spend_count = len(flush_data.deletes) // 2
|
||||
|
||||
if self.db.for_sync:
|
||||
block_count = flush_data.height - self.db_height
|
||||
tx_count = flush_data.tx_count - self.db_tx_count
|
||||
elapsed = time.time() - start_time
|
||||
self.logger.info(f'flushed {block_count:,d} blocks with '
|
||||
f'{tx_count:,d} txs, {add_count:,d} UTXO adds, '
|
||||
f'{spend_count:,d} spends in '
|
||||
f'{elapsed:.1f}s, committing...')
|
||||
|
||||
self.utxo_flush_count = self.hist_flush_count
|
||||
self.db_height = flush_data.height
|
||||
self.db_tx_count = flush_data.tx_count
|
||||
self.db_tip = flush_data.tip
|
||||
|
||||
def write_history_state(self, batch):
|
||||
state = {
|
||||
'flush_count': self.hist_flush_count,
|
||||
'comp_flush_count': self.hist_comp_flush_count,
|
||||
'comp_cursor': self.hist_comp_cursor,
|
||||
'db_version': self.db_version,
|
||||
}
|
||||
# History entries are not prefixed; the suffix \0\0 ensures we
|
||||
# look similar to other entries and aren't interfered with
|
||||
batch.put(DB_PREFIXES.HIST_STATE.value, repr(state).encode())
|
||||
|
||||
def flush_dbs(self, flush_data: FlushData, estimate_txs_remaining):
|
||||
"""Flush out cached state. History is always flushed; UTXOs are
|
||||
flushed if flush_utxos."""
|
||||
|
@ -704,9 +677,11 @@ class LevelDB:
|
|||
else:
|
||||
batch_delete(staged_change.key)
|
||||
flush_data.claimtrie_stash.clear()
|
||||
|
||||
for undo_claims, height in flush_data.undo_claimtrie:
|
||||
batch_put(DB_PREFIXES.undo_claimtrie.value + util.pack_be_uint64(height), undo_claims)
|
||||
flush_data.undo_claimtrie.clear()
|
||||
|
||||
self.fs_height = flush_data.height
|
||||
self.fs_tx_count = flush_data.tx_count
|
||||
|
||||
|
@ -718,7 +693,6 @@ class LevelDB:
|
|||
for hashX in sorted(unflushed):
|
||||
key = hashX + flush_id
|
||||
batch_put(DB_PREFIXES.HASHX_HISTORY_PREFIX.value + key, unflushed[hashX].tobytes())
|
||||
self.write_history_state(batch)
|
||||
|
||||
unflushed.clear()
|
||||
self.hist_unflushed_count = 0
|
||||
|
@ -762,29 +736,18 @@ class LevelDB:
|
|||
self.db_height = flush_data.height
|
||||
self.db_tx_count = flush_data.tx_count
|
||||
self.db_tip = flush_data.tip
|
||||
|
||||
# self.flush_state(batch)
|
||||
#
|
||||
now = time.time()
|
||||
self.wall_time += now - self.last_flush
|
||||
self.last_flush = now
|
||||
self.last_flush_tx_count = self.fs_tx_count
|
||||
self.write_utxo_state(batch)
|
||||
|
||||
# # Update and put the wall time again - otherwise we drop the
|
||||
# # time it took to commit the batch
|
||||
# # self.flush_state(self.db)
|
||||
# now = time.time()
|
||||
# self.wall_time += now - self.last_flush
|
||||
# self.last_flush = now
|
||||
# self.last_flush_tx_count = self.fs_tx_count
|
||||
# self.write_utxo_state(batch)
|
||||
|
||||
self.write_db_state(batch)
|
||||
elapsed = self.last_flush - start_time
|
||||
self.logger.info(f'flush #{self.hist_flush_count:,d} took '
|
||||
f'{elapsed:.1f}s. Height {flush_data.height:,d} '
|
||||
f'txs: {flush_data.tx_count:,d} ({tx_delta:+,d})')
|
||||
|
||||
# Catch-up stats
|
||||
if self.db.for_sync:
|
||||
flush_interval = self.last_flush - prior_flush
|
||||
|
@ -796,14 +759,6 @@ class LevelDB:
|
|||
self.logger.info(f'sync time: {formatted_time(self.wall_time)} '
|
||||
f'ETA: {formatted_time(eta)}')
|
||||
|
||||
# def flush_state(self, batch):
|
||||
# """Flush chain state to the batch."""
|
||||
# now = time.time()
|
||||
# self.wall_time += now - self.last_flush
|
||||
# self.last_flush = now
|
||||
# self.last_flush_tx_count = self.fs_tx_count
|
||||
# self.write_utxo_state(batch)
|
||||
|
||||
def flush_backup(self, flush_data, touched):
|
||||
"""Like flush_dbs() but when backing up. All UTXOs are flushed."""
|
||||
assert not flush_data.headers
|
||||
|
@ -827,7 +782,7 @@ class LevelDB:
|
|||
batch_delete = batch.delete
|
||||
|
||||
claim_reorg_height = self.fs_height
|
||||
print("flush undos", flush_data.undo_claimtrie)
|
||||
# print("flush undos", flush_data.undo_claimtrie)
|
||||
for (ops, height) in reversed(flush_data.undo_claimtrie):
|
||||
claimtrie_ops = RevertableOp.unpack_stack(ops)
|
||||
print("%i undo ops for %i" % (len(claimtrie_ops), height))
|
||||
|
@ -867,9 +822,6 @@ class LevelDB:
|
|||
for key, value in puts.items():
|
||||
batch_put(key, value)
|
||||
|
||||
|
||||
self.write_history_state(batch)
|
||||
|
||||
# New undo information
|
||||
for undo_info, height in flush_data.undo_infos:
|
||||
batch.put(self.undo_key(height), b''.join(undo_info))
|
||||
|
@ -889,7 +841,6 @@ class LevelDB:
|
|||
batch_put(DB_PREFIXES.UTXO_PREFIX.value + hashX + suffix, value[-8:])
|
||||
flush_data.adds.clear()
|
||||
|
||||
self.flush_utxo_db(batch, flush_data)
|
||||
start_time = time.time()
|
||||
add_count = len(flush_data.adds)
|
||||
spend_count = len(flush_data.deletes) // 2
|
||||
|
@ -908,15 +859,12 @@ class LevelDB:
|
|||
self.db_tx_count = flush_data.tx_count
|
||||
self.db_tip = flush_data.tip
|
||||
|
||||
|
||||
|
||||
# Flush state last as it reads the wall time.
|
||||
now = time.time()
|
||||
self.wall_time += now - self.last_flush
|
||||
self.last_flush = now
|
||||
self.last_flush_tx_count = self.fs_tx_count
|
||||
self.write_utxo_state(batch)
|
||||
|
||||
self.write_db_state(batch)
|
||||
|
||||
self.logger.info(f'backing up removed {nremoves:,d} history entries')
|
||||
elapsed = self.last_flush - start_time
|
||||
|
@ -1037,8 +985,6 @@ class LevelDB:
|
|||
def read_history():
|
||||
db_height = self.db_height
|
||||
tx_counts = self.tx_counts
|
||||
tx_db_get = self.db.get
|
||||
pack_be_uint64 = util.pack_be_uint64
|
||||
|
||||
cnt = 0
|
||||
txs = []
|
||||
|
@ -1139,8 +1085,17 @@ class LevelDB:
|
|||
|
||||
# -- UTXO database
|
||||
|
||||
def read_utxo_state(self):
|
||||
state = self.db.get(UTXO_STATE)
|
||||
def write_db_state(self, batch):
|
||||
"""Write (UTXO) state to the batch."""
|
||||
db_state = DBState(
|
||||
bytes.fromhex(self.coin.GENESIS_HASH), self.db_height, self.db_tx_count, self.db_tip,
|
||||
self.utxo_flush_count, int(self.wall_time), self.first_sync, self.db_version,
|
||||
self.hist_flush_count, self.hist_comp_flush_count, self.hist_comp_cursor
|
||||
)
|
||||
batch.put(DB_PREFIXES.db_state.value, db_state.pack())
|
||||
|
||||
def read_db_state(self):
|
||||
state = self.db.get(DB_PREFIXES.db_state.value)
|
||||
if not state:
|
||||
self.db_height = -1
|
||||
self.db_tx_count = 0
|
||||
|
@ -1149,63 +1104,31 @@ class LevelDB:
|
|||
self.utxo_flush_count = 0
|
||||
self.wall_time = 0
|
||||
self.first_sync = True
|
||||
self.hist_flush_count = 0
|
||||
self.hist_comp_flush_count = -1
|
||||
self.hist_comp_cursor = -1
|
||||
self.hist_db_version = max(self.DB_VERSIONS)
|
||||
else:
|
||||
state = ast.literal_eval(state.decode())
|
||||
if not isinstance(state, dict):
|
||||
raise self.DBError('failed reading state from DB')
|
||||
self.db_version = state['db_version']
|
||||
state = DBState.unpack(state)
|
||||
self.db_version = state.db_version
|
||||
if self.db_version not in self.DB_VERSIONS:
|
||||
raise self.DBError(f'your UTXO DB version is {self.db_version} but this '
|
||||
raise self.DBError(f'your DB version is {self.db_version} but this '
|
||||
f'software only handles versions {self.DB_VERSIONS}')
|
||||
# backwards compat
|
||||
genesis_hash = state['genesis']
|
||||
if isinstance(genesis_hash, bytes):
|
||||
genesis_hash = genesis_hash.decode()
|
||||
if genesis_hash != self.coin.GENESIS_HASH:
|
||||
genesis_hash = state.genesis
|
||||
if genesis_hash.hex() != self.coin.GENESIS_HASH:
|
||||
raise self.DBError(f'DB genesis hash {genesis_hash} does not '
|
||||
f'match coin {self.coin.GENESIS_HASH}')
|
||||
self.db_height = state['height']
|
||||
self.db_tx_count = state['tx_count']
|
||||
self.db_tip = state['tip']
|
||||
self.utxo_flush_count = state['utxo_flush_count']
|
||||
self.wall_time = state['wall_time']
|
||||
self.first_sync = state['first_sync']
|
||||
|
||||
# These are our state as we move ahead of DB state
|
||||
self.fs_height = self.db_height
|
||||
self.fs_tx_count = self.db_tx_count
|
||||
self.last_flush_tx_count = self.fs_tx_count
|
||||
|
||||
# Log some stats
|
||||
self.logger.info(f'DB version: {self.db_version:d}')
|
||||
self.logger.info(f'coin: {self.coin.NAME}')
|
||||
self.logger.info(f'network: {self.coin.NET}')
|
||||
self.logger.info(f'height: {self.db_height:,d}')
|
||||
self.logger.info(f'tip: {hash_to_hex_str(self.db_tip)}')
|
||||
self.logger.info(f'tx count: {self.db_tx_count:,d}')
|
||||
if self.db.for_sync:
|
||||
self.logger.info(f'flushing DB cache at {self.env.cache_MB:,d} MB')
|
||||
if self.first_sync:
|
||||
self.logger.info(f'sync time so far: {util.formatted_time(self.wall_time)}')
|
||||
|
||||
def write_utxo_state(self, batch):
|
||||
"""Write (UTXO) state to the batch."""
|
||||
state = {
|
||||
'genesis': self.coin.GENESIS_HASH,
|
||||
'height': self.db_height,
|
||||
'tx_count': self.db_tx_count,
|
||||
'tip': self.db_tip,
|
||||
'utxo_flush_count': self.utxo_flush_count,
|
||||
'wall_time': self.wall_time,
|
||||
'first_sync': self.first_sync,
|
||||
'db_version': self.db_version,
|
||||
}
|
||||
batch.put(DB_PREFIXES.UTXO_STATE.value, repr(state).encode())
|
||||
|
||||
def set_flush_count(self, count):
|
||||
self.utxo_flush_count = count
|
||||
with self.db.write_batch() as batch:
|
||||
self.write_utxo_state(batch)
|
||||
self.db_height = state.height
|
||||
self.db_tx_count = state.tx_count
|
||||
self.db_tip = state.tip
|
||||
self.utxo_flush_count = state.utxo_flush_count
|
||||
self.wall_time = state.wall_time
|
||||
self.first_sync = state.first_sync
|
||||
self.hist_flush_count = state.hist_flush_count
|
||||
self.hist_comp_flush_count = state.comp_flush_count
|
||||
self.hist_comp_cursor = state.comp_cursor
|
||||
self.hist_db_version = state.db_version
|
||||
|
||||
async def all_utxos(self, hashX):
|
||||
"""Return all UTXOs for an address sorted in no particular order."""
|
||||
|
|
Loading…
Reference in a new issue