remove lbry.wallet.server.history

This commit is contained in:
Jack Robison 2021-01-12 12:24:08 -05:00 committed by Victor Shyba
parent bc7fe680c0
commit eda1b0b3fc
3 changed files with 54 additions and 355 deletions

View file

@ -1,5 +1,6 @@
import time import time
import asyncio import asyncio
import typing
from struct import pack, unpack from struct import pack, unpack
from concurrent.futures.thread import ThreadPoolExecutor from concurrent.futures.thread import ThreadPoolExecutor
from typing import Optional, List, Tuple from typing import Optional, List, Tuple
@ -14,6 +15,8 @@ from lbry.wallet.server.util import chunks, class_logger
from lbry.wallet.server.leveldb import FlushData from lbry.wallet.server.leveldb import FlushData
from lbry.wallet.transaction import Transaction from lbry.wallet.transaction import Transaction
from lbry.wallet.server.udp import StatusServer from lbry.wallet.server.udp import StatusServer
if typing.TYPE_CHECKING:
from lbry.wallet.server.leveldb import LevelDB
class Prefetcher: class Prefetcher:
@ -155,7 +158,7 @@ class BlockProcessor:
"reorg_count", "Number of reorgs", namespace=NAMESPACE "reorg_count", "Number of reorgs", namespace=NAMESPACE
) )
def __init__(self, env, db, daemon, notifications): def __init__(self, env, db: 'LevelDB', daemon, notifications):
self.env = env self.env = env
self.db = db self.db = db
self.daemon = daemon self.daemon = daemon
@ -259,7 +262,6 @@ class BlockProcessor:
else: else:
self.logger.info(f'faking a reorg of {count:,d} blocks') self.logger.info(f'faking a reorg of {count:,d} blocks')
async def get_raw_blocks(last_height, hex_hashes): async def get_raw_blocks(last_height, hex_hashes):
heights = range(last_height, last_height - len(hex_hashes), -1) heights = range(last_height, last_height - len(hex_hashes), -1)
try: try:
@ -277,7 +279,6 @@ class BlockProcessor:
try: try:
await self.flush(True) await self.flush(True)
start, last, hashes = await self.reorg_hashes(count) start, last, hashes = await self.reorg_hashes(count)
# Reverse and convert to hex strings. # Reverse and convert to hex strings.
hashes = [hash_to_hex_str(hash) for hash in reversed(hashes)] hashes = [hash_to_hex_str(hash) for hash in reversed(hashes)]
@ -364,8 +365,7 @@ class BlockProcessor:
async def flush(self, flush_utxos): async def flush(self, flush_utxos):
def flush(): def flush():
self.db.flush_dbs(self.flush_data(), flush_utxos, self.db.flush_dbs(self.flush_data(), self.estimate_txs_remaining)
self.estimate_txs_remaining)
await self.run_in_thread_with_lock(flush) await self.run_in_thread_with_lock(flush)
async def _maybe_flush(self): async def _maybe_flush(self):
@ -384,7 +384,7 @@ class BlockProcessor:
one_MB = 1000*1000 one_MB = 1000*1000
utxo_cache_size = len(self.utxo_cache) * 205 utxo_cache_size = len(self.utxo_cache) * 205
db_deletes_size = len(self.db_deletes) * 57 db_deletes_size = len(self.db_deletes) * 57
hist_cache_size = len(self.db.history.unflushed) * 180 + self.db.history.unflushed_count * 4 hist_cache_size = len(self.db.hist_unflushed) * 180 + self.db.hist_unflushed_count * 4
# Roughly ntxs * 32 + nblocks * 42 # Roughly ntxs * 32 + nblocks * 42
tx_hash_size = ((self.tx_count - self.db.fs_tx_count) * 32 tx_hash_size = ((self.tx_count - self.db.fs_tx_count) * 32
+ (self.height - self.db.fs_height) * 42) + (self.height - self.db.fs_height) * 42)
@ -426,7 +426,7 @@ class BlockProcessor:
self.headers.extend(headers) self.headers.extend(headers)
self.tip = self.coin.header_hash(headers[-1]) self.tip = self.coin.header_hash(headers[-1])
self.db.flush_dbs(self.flush_data(), True, self.estimate_txs_remaining) self.db.flush_dbs(self.flush_data(), self.estimate_txs_remaining)
for cache in self.search_cache.values(): for cache in self.search_cache.values():
cache.clear() cache.clear()
@ -477,13 +477,13 @@ class BlockProcessor:
# self.db.add_unflushed(hashXs_by_tx, self.tx_count) # self.db.add_unflushed(hashXs_by_tx, self.tx_count)
first_tx_num = self.tx_count first_tx_num = self.tx_count
_unflushed = self.db.history.unflushed _unflushed = self.db.hist_unflushed
_count = 0 _count = 0
for _tx_num, _hashXs in enumerate(hashXs_by_tx, start=first_tx_num): for _tx_num, _hashXs in enumerate(hashXs_by_tx, start=first_tx_num):
for _hashX in set(_hashXs): for _hashX in set(_hashXs):
_unflushed[_hashX].append(_tx_num) _unflushed[_hashX].append(_tx_num)
_count += len(_hashXs) _count += len(_hashXs)
self.db.history.unflushed_count += _count self.db.hist_unflushed_count += _count
self.tx_count = tx_num self.tx_count = tx_num
self.db.tx_counts.append(tx_num) self.db.tx_counts.append(tx_num)

View file

@ -1,304 +0,0 @@
# Copyright (c) 2016-2018, Neil Booth
# Copyright (c) 2017, the ElectrumX authors
#
# All rights reserved.
#
# See the file "LICENCE" for information about the copyright
# and warranty status of this software.
"""History by script hash (address)."""
import array
import ast
import bisect
import time
from collections import defaultdict
from functools import partial
from lbry.wallet.server import util
from lbry.wallet.server.util import pack_be_uint16, unpack_be_uint16_from
from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN
HASHX_HISTORY_PREFIX = b'x'
HIST_STATE = b'state-hist'
class History:
DB_VERSIONS = [0]
def __init__(self):
self.logger = util.class_logger(__name__, self.__class__.__name__)
# For history compaction
self.max_hist_row_entries = 12500
self.unflushed = defaultdict(partial(array.array, 'I'))
self.unflushed_count = 0
self.flush_count = 0
self.comp_flush_count = -1
self.comp_cursor = -1
# self.db = None
# def close_db(self):
# if self.db:
# # self.db.close()
# self.db = None
# def read_state(self):
# state = self.db.get(HIST_STATE)
# if state:
# state = ast.literal_eval(state.decode())
# if not isinstance(state, dict):
# raise RuntimeError('failed reading state from history DB')
# self.flush_count = state['flush_count']
# self.comp_flush_count = state.get('comp_flush_count', -1)
# self.comp_cursor = state.get('comp_cursor', -1)
# self.db_version = state.get('db_version', 0)
# else:
# self.flush_count = 0
# self.comp_flush_count = -1
# self.comp_cursor = -1
# self.db_version = max(self.DB_VERSIONS)
#
# self.logger.info(f'history DB version: {self.db_version}')
# if self.db_version not in self.DB_VERSIONS:
# msg = f'this software only handles DB versions {self.DB_VERSIONS}'
# self.logger.error(msg)
# raise RuntimeError(msg)
# self.logger.info(f'flush count: {self.flush_count:,d}')
# def clear_excess(self, utxo_flush_count):
# # < might happen at end of compaction as both DBs cannot be
# # updated atomically
# if self.flush_count <= utxo_flush_count:
# return
#
# self.logger.info('DB shut down uncleanly. Scanning for '
# 'excess history flushes...')
#
# keys = []
# for key, hist in self.db.iterator(prefix=HASHX_HISTORY_PREFIX):
# k = key[1:]
# flush_id, = unpack_be_uint16_from(k[-2:])
# if flush_id > utxo_flush_count:
# keys.append(k)
#
# self.logger.info(f'deleting {len(keys):,d} history entries')
#
# self.flush_count = utxo_flush_count
# with self.db.write_batch() as batch:
# for key in keys:
# batch.delete(HASHX_HISTORY_PREFIX + key)
# self.write_state(batch)
#
# self.logger.info('deleted excess history entries')
#
# def write_state(self, batch):
# """Write state to the history DB."""
# state = {
# 'flush_count': self.flush_count,
# 'comp_flush_count': self.comp_flush_count,
# 'comp_cursor': self.comp_cursor,
# 'db_version': self.db_version,
# }
# # History entries are not prefixed; the suffix \0\0 ensures we
# # look similar to other entries and aren't interfered with
# batch.put(HIST_STATE, repr(state).encode())
# def add_unflushed(self, hashXs_by_tx, first_tx_num):
# unflushed = self.unflushed
# count = 0
# for tx_num, hashXs in enumerate(hashXs_by_tx, start=first_tx_num):
# hashXs = set(hashXs)
# for hashX in hashXs:
# unflushed[hashX].append(tx_num)
# count += len(hashXs)
# self.unflushed_count += count
# def unflushed_memsize(self):
# return len(self.unflushed) * 180 + self.unflushed_count * 4
# def assert_flushed(self):
# assert not self.unflushed
# def backup(self, hashXs, tx_count):
# # Not certain this is needed, but it doesn't hurt
# self.flush_count += 1
# nremoves = 0
# bisect_left = bisect.bisect_left
#
# with self.db.write_batch() as batch:
# for hashX in sorted(hashXs):
# deletes = []
# puts = {}
# for key, hist in self.db.iterator(prefix=HASHX_HISTORY_PREFIX + hashX, reverse=True):
# k = key[1:]
# a = array.array('I')
# a.frombytes(hist)
# # Remove all history entries >= tx_count
# idx = bisect_left(a, tx_count)
# nremoves += len(a) - idx
# if idx > 0:
# puts[k] = a[:idx].tobytes()
# break
# deletes.append(k)
#
# for key in deletes:
# batch.delete(key)
# for key, value in puts.items():
# batch.put(key, value)
# self.write_state(batch)
#
# self.logger.info(f'backing up removed {nremoves:,d} history entries')
# def get_txnums(self, hashX, limit=1000):
# """Generator that returns an unpruned, sorted list of tx_nums in the
# history of a hashX. Includes both spending and receiving
# transactions. By default yields at most 1000 entries. Set
# limit to None to get them all. """
# limit = util.resolve_limit(limit)
# for key, hist in self.db.iterator(prefix=hashX):
# a = array.array('I')
# a.frombytes(hist)
# for tx_num in a:
# if limit == 0:
# return
# yield tx_num
# limit -= 1
#
# History compaction
#
# comp_cursor is a cursor into compaction progress.
# -1: no compaction in progress
# 0-65535: Compaction in progress; all prefixes < comp_cursor have
# been compacted, and later ones have not.
# 65536: compaction complete in-memory but not flushed
#
# comp_flush_count applies during compaction, and is a flush count
# for history with prefix < comp_cursor. flush_count applies
# to still uncompacted history. It is -1 when no compaction is
# taking place. Key suffixes up to and including comp_flush_count
# are used, so a parallel history flush must first increment this
#
# When compaction is complete and the final flush takes place,
# flush_count is reset to comp_flush_count, and comp_flush_count to -1
# def _flush_compaction(self, cursor, write_items, keys_to_delete):
# """Flush a single compaction pass as a batch."""
# # Update compaction state
# if cursor == 65536:
# self.flush_count = self.comp_flush_count
# self.comp_cursor = -1
# self.comp_flush_count = -1
# else:
# self.comp_cursor = cursor
#
# # History DB. Flush compacted history and updated state
# with self.db.write_batch() as batch:
# # Important: delete first! The keyspace may overlap.
# for key in keys_to_delete:
# batch.delete(HASHX_HISTORY_PREFIX + key)
# for key, value in write_items:
# batch.put(HASHX_HISTORY_PREFIX + key, value)
# self.write_state(batch)
# def _compact_hashX(self, hashX, hist_map, hist_list,
# write_items, keys_to_delete):
# """Compress history for a hashX. hist_list is an ordered list of
# the histories to be compressed."""
# # History entries (tx numbers) are 4 bytes each. Distribute
# # over rows of up to 50KB in size. A fixed row size means
# # future compactions will not need to update the first N - 1
# # rows.
# max_row_size = self.max_hist_row_entries * 4
# full_hist = b''.join(hist_list)
# nrows = (len(full_hist) + max_row_size - 1) // max_row_size
# if nrows > 4:
# self.logger.info('hashX {} is large: {:,d} entries across '
# '{:,d} rows'
# .format(hash_to_hex_str(hashX),
# len(full_hist) // 4, nrows))
#
# # Find what history needs to be written, and what keys need to
# # be deleted. Start by assuming all keys are to be deleted,
# # and then remove those that are the same on-disk as when
# # compacted.
# write_size = 0
# keys_to_delete.update(hist_map)
# for n, chunk in enumerate(util.chunks(full_hist, max_row_size)):
# key = hashX + pack_be_uint16(n)
# if hist_map.get(key) == chunk:
# keys_to_delete.remove(key)
# else:
# write_items.append((key, chunk))
# write_size += len(chunk)
#
# assert n + 1 == nrows
# self.comp_flush_count = max(self.comp_flush_count, n)
#
# return write_size
# def _compact_prefix(self, prefix, write_items, keys_to_delete):
# """Compact all history entries for hashXs beginning with the
# given prefix. Update keys_to_delete and write."""
# prior_hashX = None
# hist_map = {}
# hist_list = []
#
# key_len = HASHX_LEN + 2
# write_size = 0
# for key, hist in self.db.iterator(prefix=HASHX_HISTORY_PREFIX + prefix):
# k = key[1:]
# # Ignore non-history entries
# if len(k) != key_len:
# continue
# hashX = k[:-2]
# if hashX != prior_hashX and prior_hashX:
# write_size += self._compact_hashX(prior_hashX, hist_map,
# hist_list, write_items,
# keys_to_delete)
# hist_map.clear()
# hist_list.clear()
# prior_hashX = hashX
# hist_map[k] = hist
# hist_list.append(hist)
#
# if prior_hashX:
# write_size += self._compact_hashX(prior_hashX, hist_map, hist_list,
# write_items, keys_to_delete)
# return write_size
# def _compact_history(self, limit):
# """Inner loop of history compaction. Loops until limit bytes have
# been processed.
# """
# fnord
# keys_to_delete = set()
# write_items = [] # A list of (key, value) pairs
# write_size = 0
#
# # Loop over 2-byte prefixes
# cursor = self.comp_cursor
# while write_size < limit and cursor < 65536:
# prefix = pack_be_uint16(cursor)
# write_size += self._compact_prefix(prefix, write_items,
# keys_to_delete)
# cursor += 1
#
# max_rows = self.comp_flush_count + 1
# self._flush_compaction(cursor, write_items, keys_to_delete)
#
# self.logger.info('history compaction: wrote {:,d} rows ({:.1f} MB), '
# 'removed {:,d} rows, largest: {:,d}, {:.1f}% complete'
# .format(len(write_items), write_size / 1000000,
# len(keys_to_delete), max_rows,
# 100 * cursor / 65536))
# return write_size
# def _cancel_compaction(self):
# if self.comp_cursor != -1:
# self.logger.warning('cancelling in-progress history compaction')
# self.comp_flush_count = -1
# self.comp_cursor = -1

View file

@ -18,9 +18,10 @@ import time
import zlib import zlib
import typing import typing
from typing import Optional, List, Tuple, Iterable from typing import Optional, List, Tuple, Iterable
from functools import partial
from asyncio import sleep from asyncio import sleep
from bisect import bisect_right, bisect_left from bisect import bisect_right, bisect_left
from collections import namedtuple from collections import namedtuple, defaultdict
from glob import glob from glob import glob
from struct import pack, unpack from struct import pack, unpack
from concurrent.futures.thread import ThreadPoolExecutor from concurrent.futures.thread import ThreadPoolExecutor
@ -31,7 +32,6 @@ from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN
from lbry.wallet.server.merkle import Merkle, MerkleCache from lbry.wallet.server.merkle import Merkle, MerkleCache
from lbry.wallet.server.util import formatted_time, pack_be_uint16, unpack_be_uint16_from from lbry.wallet.server.util import formatted_time, pack_be_uint16, unpack_be_uint16_from
from lbry.wallet.server.storage import db_class from lbry.wallet.server.storage import db_class
from lbry.wallet.server.history import History
UTXO = namedtuple("UTXO", "tx_num tx_pos tx_hash height value") UTXO = namedtuple("UTXO", "tx_num tx_pos tx_hash height value")
@ -73,6 +73,7 @@ class LevelDB:
""" """
DB_VERSIONS = [6] DB_VERSIONS = [6]
HIST_DB_VERSIONS = [0]
class DBError(Exception): class DBError(Exception):
"""Raised on general DB errors generally indicating corruption.""" """Raised on general DB errors generally indicating corruption."""
@ -86,8 +87,14 @@ class LevelDB:
self.logger.info(f'switching current directory to {env.db_dir}') self.logger.info(f'switching current directory to {env.db_dir}')
self.db_class = db_class(env.db_dir, self.env.db_engine) self.db_class = db_class(env.db_dir, self.env.db_engine)
self.history = History()
self.db = None self.db = None
self.hist_unflushed = defaultdict(partial(array.array, 'I'))
self.hist_unflushed_count = 0
self.hist_flush_count = 0
self.hist_comp_flush_count = -1
self.hist_comp_cursor = -1
self.tx_counts = None self.tx_counts = None
self.headers = None self.headers = None
self.encoded_headers = LRUCacheWithMetrics(1 << 21, metric_name='encoded_headers', namespace='wallet_server') self.encoded_headers = LRUCacheWithMetrics(1 << 21, metric_name='encoded_headers', namespace='wallet_server')
@ -188,27 +195,27 @@ class LevelDB:
state = ast.literal_eval(state.decode()) state = ast.literal_eval(state.decode())
if not isinstance(state, dict): if not isinstance(state, dict):
raise RuntimeError('failed reading state from history DB') raise RuntimeError('failed reading state from history DB')
self.history.flush_count = state['flush_count'] self.hist_flush_count = state['flush_count']
self.history.comp_flush_count = state.get('comp_flush_count', -1) self.hist_comp_flush_count = state.get('comp_flush_count', -1)
self.history.comp_cursor = state.get('comp_cursor', -1) self.hist_comp_cursor = state.get('comp_cursor', -1)
self.history.db_version = state.get('db_version', 0) self.hist_db_version = state.get('db_version', 0)
else: else:
self.history.flush_count = 0 self.hist_flush_count = 0
self.history.comp_flush_count = -1 self.hist_comp_flush_count = -1
self.history.comp_cursor = -1 self.hist_comp_cursor = -1
self.history.db_version = max(self.DB_VERSIONS) self.hist_db_version = max(self.HIST_DB_VERSIONS)
self.logger.info(f'history DB version: {self.history.db_version}') self.logger.info(f'history DB version: {self.hist_db_version}')
if self.history.db_version not in self.DB_VERSIONS: if self.hist_db_version not in self.HIST_DB_VERSIONS:
msg = f'this software only handles DB versions {self.DB_VERSIONS}' msg = f'this software only handles DB versions {self.HIST_DB_VERSIONS}'
self.logger.error(msg) self.logger.error(msg)
raise RuntimeError(msg) raise RuntimeError(msg)
self.logger.info(f'flush count: {self.history.flush_count:,d}') self.logger.info(f'flush count: {self.hist_flush_count:,d}')
# self.history.clear_excess(self.utxo_flush_count) # self.history.clear_excess(self.utxo_flush_count)
# < might happen at end of compaction as both DBs cannot be # < might happen at end of compaction as both DBs cannot be
# updated atomically # updated atomically
if self.history.flush_count > self.utxo_flush_count: if self.hist_flush_count > self.utxo_flush_count:
self.logger.info('DB shut down uncleanly. Scanning for ' self.logger.info('DB shut down uncleanly. Scanning for '
'excess history flushes...') 'excess history flushes...')
@ -221,15 +228,15 @@ class LevelDB:
self.logger.info(f'deleting {len(keys):,d} history entries') self.logger.info(f'deleting {len(keys):,d} history entries')
self.history.flush_count = self.utxo_flush_count self.hist_flush_count = self.utxo_flush_count
with self.db.write_batch() as batch: with self.db.write_batch() as batch:
for key in keys: for key in keys:
batch.delete(HASHX_HISTORY_PREFIX + key) batch.delete(HASHX_HISTORY_PREFIX + key)
state = { state = {
'flush_count': self.history.flush_count, 'flush_count': self.hist_flush_count,
'comp_flush_count': self.history.comp_flush_count, 'comp_flush_count': self.hist_comp_flush_count,
'comp_cursor': self.history.comp_cursor, 'comp_cursor': self.hist_comp_cursor,
'db_version': self.history.db_version, 'db_version': self.hist_db_version,
} }
# History entries are not prefixed; the suffix \0\0 ensures we # History entries are not prefixed; the suffix \0\0 ensures we
# look similar to other entries and aren't interfered with # look similar to other entries and aren't interfered with
@ -237,7 +244,7 @@ class LevelDB:
self.logger.info('deleted excess history entries') self.logger.info('deleted excess history entries')
self.utxo_flush_count = self.history.flush_count self.utxo_flush_count = self.hist_flush_count
min_height = self.min_undo_height(self.db_height) min_height = self.min_undo_height(self.db_height)
keys = [] keys = []
@ -328,7 +335,7 @@ class LevelDB:
assert not flush_data.adds assert not flush_data.adds
assert not flush_data.deletes assert not flush_data.deletes
assert not flush_data.undo_infos assert not flush_data.undo_infos
assert not self.history.unflushed assert not self.hist_unflushed
def flush_utxo_db(self, batch, flush_data): def flush_utxo_db(self, batch, flush_data):
"""Flush the cached DB writes and UTXO set to the batch.""" """Flush the cached DB writes and UTXO set to the batch."""
@ -369,23 +376,23 @@ class LevelDB:
f'{spend_count:,d} spends in ' f'{spend_count:,d} spends in '
f'{elapsed:.1f}s, committing...') f'{elapsed:.1f}s, committing...')
self.utxo_flush_count = self.history.flush_count self.utxo_flush_count = self.hist_flush_count
self.db_height = flush_data.height self.db_height = flush_data.height
self.db_tx_count = flush_data.tx_count self.db_tx_count = flush_data.tx_count
self.db_tip = flush_data.tip self.db_tip = flush_data.tip
def write_history_state(self, batch): def write_history_state(self, batch):
state = { state = {
'flush_count': self.history.flush_count, 'flush_count': self.hist_flush_count,
'comp_flush_count': self.history.comp_flush_count, 'comp_flush_count': self.hist_comp_flush_count,
'comp_cursor': self.history.comp_cursor, 'comp_cursor': self.hist_comp_cursor,
'db_version': self.db_version, 'db_version': self.db_version,
} }
# History entries are not prefixed; the suffix \0\0 ensures we # History entries are not prefixed; the suffix \0\0 ensures we
# look similar to other entries and aren't interfered with # look similar to other entries and aren't interfered with
batch.put(HIST_STATE, repr(state).encode()) batch.put(HIST_STATE, repr(state).encode())
def flush_dbs(self, flush_data, flush_utxos, estimate_txs_remaining): def flush_dbs(self, flush_data: FlushData, estimate_txs_remaining):
"""Flush out cached state. History is always flushed; UTXOs are """Flush out cached state. History is always flushed; UTXOs are
flushed if flush_utxos.""" flushed if flush_utxos."""
if flush_data.height == self.db_height: if flush_data.height == self.db_height:
@ -444,9 +451,9 @@ class LevelDB:
# Then history # Then history
self.history.flush_count += 1 self.hist_flush_count += 1
flush_id = pack_be_uint16(self.history.flush_count) flush_id = pack_be_uint16(self.hist_flush_count)
unflushed = self.history.unflushed unflushed = self.hist_unflushed
for hashX in sorted(unflushed): for hashX in sorted(unflushed):
key = hashX + flush_id key = hashX + flush_id
@ -454,14 +461,13 @@ class LevelDB:
self.write_history_state(batch) self.write_history_state(batch)
unflushed.clear() unflushed.clear()
self.history.unflushed_count = 0 self.hist_unflushed_count = 0
######################### #########################
# Flush state last as it reads the wall time. # Flush state last as it reads the wall time.
if flush_utxos: self.flush_utxo_db(batch, flush_data)
self.flush_utxo_db(batch, flush_data)
# self.flush_state(batch) # self.flush_state(batch)
# #
@ -481,7 +487,7 @@ class LevelDB:
# self.write_utxo_state(batch) # self.write_utxo_state(batch)
elapsed = self.last_flush - start_time elapsed = self.last_flush - start_time
self.logger.info(f'flush #{self.history.flush_count:,d} took ' self.logger.info(f'flush #{self.hist_flush_count:,d} took '
f'{elapsed:.1f}s. Height {flush_data.height:,d} ' f'{elapsed:.1f}s. Height {flush_data.height:,d} '
f'txs: {flush_data.tx_count:,d} ({tx_delta:+,d})') f'txs: {flush_data.tx_count:,d} ({tx_delta:+,d})')
@ -509,7 +515,7 @@ class LevelDB:
assert not flush_data.headers assert not flush_data.headers
assert not flush_data.block_txs assert not flush_data.block_txs
assert flush_data.height < self.db_height assert flush_data.height < self.db_height
assert not self.history.unflushed assert not self.hist_unflushed
start_time = time.time() start_time = time.time()
tx_delta = flush_data.tx_count - self.last_flush_tx_count tx_delta = flush_data.tx_count - self.last_flush_tx_count
@ -523,7 +529,7 @@ class LevelDB:
### ###
# Not certain this is needed, but it doesn't hurt # Not certain this is needed, but it doesn't hurt
self.history.flush_count += 1 self.hist_flush_count += 1
nremoves = 0 nremoves = 0
with self.db.write_batch() as batch: with self.db.write_batch() as batch:
@ -556,13 +562,10 @@ class LevelDB:
self.last_flush = now self.last_flush = now
self.last_flush_tx_count = self.fs_tx_count self.last_flush_tx_count = self.fs_tx_count
self.write_utxo_state(batch) self.write_utxo_state(batch)
self.logger.info(f'backing up removed {nremoves:,d} history entries') self.logger.info(f'backing up removed {nremoves:,d} history entries')
elapsed = self.last_flush - start_time elapsed = self.last_flush - start_time
self.logger.info(f'backup flush #{self.history.flush_count:,d} took ' self.logger.info(f'backup flush #{self.hist_flush_count:,d} took {elapsed:.1f}s. '
f'{elapsed:.1f}s. Height {flush_data.height:,d} ' f'Height {flush_data.height:,d} txs: {flush_data.tx_count:,d} ({tx_delta:+,d})')
f'txs: {flush_data.tx_count:,d} ({tx_delta:+,d})')
def raw_header(self, height): def raw_header(self, height):
"""Return the binary header at the given height.""" """Return the binary header at the given height."""