Compare commits
6 commits
master
...
combined-l
Author | SHA1 | Date | |
---|---|---|---|
|
7c96fd92c2 | ||
|
e213ec6525 | ||
|
0398d8a897 | ||
|
c611ab6423 | ||
|
e2864b76dc | ||
|
d3ac65e431 |
4 changed files with 411 additions and 590 deletions
|
@ -1,17 +1,22 @@
|
|||
import time
|
||||
import asyncio
|
||||
import typing
|
||||
from struct import pack, unpack
|
||||
from concurrent.futures.thread import ThreadPoolExecutor
|
||||
from typing import Optional
|
||||
from typing import Optional, List, Tuple
|
||||
from prometheus_client import Gauge, Histogram
|
||||
import lbry
|
||||
from lbry.schema.claim import Claim
|
||||
from lbry.wallet.server.tx import Tx
|
||||
from lbry.wallet.server.db.writer import SQLDB
|
||||
from lbry.wallet.server.daemon import DaemonError
|
||||
from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN
|
||||
from lbry.wallet.server.util import chunks, class_logger
|
||||
from lbry.wallet.server.leveldb import FlushData
|
||||
from lbry.wallet.transaction import Transaction
|
||||
from lbry.wallet.server.udp import StatusServer
|
||||
if typing.TYPE_CHECKING:
|
||||
from lbry.wallet.server.leveldb import LevelDB
|
||||
|
||||
|
||||
class Prefetcher:
|
||||
|
@ -153,7 +158,7 @@ class BlockProcessor:
|
|||
"reorg_count", "Number of reorgs", namespace=NAMESPACE
|
||||
)
|
||||
|
||||
def __init__(self, env, db, daemon, notifications):
|
||||
def __init__(self, env, db: 'LevelDB', daemon, notifications):
|
||||
self.env = env
|
||||
self.db = db
|
||||
self.daemon = daemon
|
||||
|
@ -213,13 +218,10 @@ class BlockProcessor:
|
|||
chain = [self.tip] + [self.coin.header_hash(h) for h in headers[:-1]]
|
||||
|
||||
if hprevs == chain:
|
||||
|
||||
start = time.perf_counter()
|
||||
await self.run_in_thread_with_lock(self.advance_blocks, blocks)
|
||||
for cache in self.search_cache.values():
|
||||
cache.clear()
|
||||
self.history_cache.clear()
|
||||
self.notifications.notified_mempool_txs.clear()
|
||||
await self._maybe_flush()
|
||||
|
||||
processed_time = time.perf_counter() - start
|
||||
self.block_count_metric.set(self.height)
|
||||
self.block_update_time_metric.observe(processed_time)
|
||||
|
@ -252,7 +254,6 @@ class BlockProcessor:
|
|||
else:
|
||||
self.logger.info(f'faking a reorg of {count:,d} blocks')
|
||||
|
||||
|
||||
async def get_raw_blocks(last_height, hex_hashes):
|
||||
heights = range(last_height, last_height - len(hex_hashes), -1)
|
||||
try:
|
||||
|
@ -262,24 +263,17 @@ class BlockProcessor:
|
|||
except FileNotFoundError:
|
||||
return await self.daemon.raw_blocks(hex_hashes)
|
||||
|
||||
def flush_backup():
|
||||
# self.touched can include other addresses which is
|
||||
# harmless, but remove None.
|
||||
self.touched.discard(None)
|
||||
self.db.flush_backup(self.flush_data(), self.touched)
|
||||
|
||||
try:
|
||||
await self.flush(True)
|
||||
|
||||
start, last, hashes = await self.reorg_hashes(count)
|
||||
# Reverse and convert to hex strings.
|
||||
hashes = [hash_to_hex_str(hash) for hash in reversed(hashes)]
|
||||
self.logger.info("reorg %i block hashes", len(hashes))
|
||||
|
||||
for hex_hashes in chunks(hashes, 50):
|
||||
raw_blocks = await get_raw_blocks(last, hex_hashes)
|
||||
self.logger.info("got %i raw blocks", len(raw_blocks))
|
||||
await self.run_in_thread_with_lock(self.backup_blocks, raw_blocks)
|
||||
await self.run_in_thread_with_lock(flush_backup)
|
||||
last -= len(raw_blocks)
|
||||
|
||||
await self.run_in_thread_with_lock(self.db.sql.delete_claims_above_height, self.height)
|
||||
|
@ -358,8 +352,7 @@ class BlockProcessor:
|
|||
|
||||
async def flush(self, flush_utxos):
|
||||
def flush():
|
||||
self.db.flush_dbs(self.flush_data(), flush_utxos,
|
||||
self.estimate_txs_remaining)
|
||||
self.db.flush_dbs(self.flush_data(), self.estimate_txs_remaining)
|
||||
await self.run_in_thread_with_lock(flush)
|
||||
|
||||
async def _maybe_flush(self):
|
||||
|
@ -378,7 +371,7 @@ class BlockProcessor:
|
|||
one_MB = 1000*1000
|
||||
utxo_cache_size = len(self.utxo_cache) * 205
|
||||
db_deletes_size = len(self.db_deletes) * 57
|
||||
hist_cache_size = self.db.history.unflushed_memsize()
|
||||
hist_cache_size = len(self.db.hist_unflushed) * 180 + self.db.hist_unflushed_count * 4
|
||||
# Roughly ntxs * 32 + nblocks * 42
|
||||
tx_hash_size = ((self.tx_count - self.db.fs_tx_count) * 32
|
||||
+ (self.height - self.db.fs_height) * 42)
|
||||
|
@ -420,13 +413,20 @@ class BlockProcessor:
|
|||
self.headers.extend(headers)
|
||||
self.tip = self.coin.header_hash(headers[-1])
|
||||
|
||||
def advance_txs(self, height, txs, header, block_hash):
|
||||
self.db.flush_dbs(self.flush_data(), self.estimate_txs_remaining)
|
||||
|
||||
for cache in self.search_cache.values():
|
||||
cache.clear()
|
||||
self.history_cache.clear()
|
||||
self.notifications.notified_mempool_txs.clear()
|
||||
|
||||
def advance_txs(self, height, txs: List[Tuple[Tx, bytes]], header, block_hash):
|
||||
self.block_hashes.append(block_hash)
|
||||
self.block_txs.append((b''.join(tx_hash for tx, tx_hash in txs), [tx.raw for tx, _ in txs]))
|
||||
|
||||
undo_info = []
|
||||
tx_num = self.tx_count
|
||||
hashXs_by_tx = []
|
||||
tx_num = self.tx_count
|
||||
|
||||
# Use local vars for speed in the loops
|
||||
put_utxo = self.utxo_cache.__setitem__
|
||||
|
@ -462,7 +462,15 @@ class BlockProcessor:
|
|||
self.db.total_transactions.append(tx_hash)
|
||||
tx_num += 1
|
||||
|
||||
self.db.history.add_unflushed(hashXs_by_tx, self.tx_count)
|
||||
# self.db.add_unflushed(hashXs_by_tx, self.tx_count)
|
||||
first_tx_num = self.tx_count
|
||||
_unflushed = self.db.hist_unflushed
|
||||
_count = 0
|
||||
for _tx_num, _hashXs in enumerate(hashXs_by_tx, start=first_tx_num):
|
||||
for _hashX in set(_hashXs):
|
||||
_unflushed[_hashX].append(_tx_num)
|
||||
_count += len(_hashXs)
|
||||
self.db.hist_unflushed_count += _count
|
||||
self.tx_count = tx_num
|
||||
self.db.tx_counts.append(tx_num)
|
||||
|
||||
|
@ -493,6 +501,10 @@ class BlockProcessor:
|
|||
self.height -= 1
|
||||
self.db.tx_counts.pop()
|
||||
|
||||
# self.touched can include other addresses which is
|
||||
# harmless, but remove None.
|
||||
self.touched.discard(None)
|
||||
self.db.flush_backup(self.flush_data(), self.touched)
|
||||
self.logger.info(f'backed up to height {self.height:,d}')
|
||||
|
||||
def backup_txs(self, txs):
|
||||
|
@ -603,11 +615,11 @@ class BlockProcessor:
|
|||
# Value: hashX
|
||||
prefix = b'h' + tx_hash[:4] + idx_packed
|
||||
candidates = {db_key: hashX for db_key, hashX
|
||||
in self.db.utxo_db.iterator(prefix=prefix)}
|
||||
in self.db.db.iterator(prefix=prefix)}
|
||||
|
||||
for hdb_key, hashX in candidates.items():
|
||||
tx_num_packed = hdb_key[-4:]
|
||||
if len(candidates) > 1:
|
||||
|
||||
tx_num, = unpack('<I', tx_num_packed)
|
||||
try:
|
||||
hash, height = self.db.fs_tx_hash(tx_num)
|
||||
|
@ -622,7 +634,7 @@ class BlockProcessor:
|
|||
# Key: b'u' + address_hashX + tx_idx + tx_num
|
||||
# Value: the UTXO value as a 64-bit unsigned integer
|
||||
udb_key = b'u' + hashX + hdb_key[-6:]
|
||||
utxo_value_packed = self.db.utxo_db.get(udb_key)
|
||||
utxo_value_packed = self.db.db.get(udb_key)
|
||||
if utxo_value_packed is None:
|
||||
self.logger.warning(
|
||||
"%s:%s is not found in UTXO db for %s", hash_to_hex_str(tx_hash), tx_idx, hash_to_hex_str(hashX)
|
||||
|
@ -631,6 +643,7 @@ class BlockProcessor:
|
|||
# Remove both entries for this UTXO
|
||||
self.db_deletes.append(hdb_key)
|
||||
self.db_deletes.append(udb_key)
|
||||
|
||||
return hashX + tx_num_packed + utxo_value_packed
|
||||
|
||||
self.logger.error('UTXO {hash_to_hex_str(tx_hash)} / {tx_idx} not found in "h" table')
|
||||
|
|
|
@ -1,324 +0,0 @@
|
|||
# Copyright (c) 2016-2018, Neil Booth
|
||||
# Copyright (c) 2017, the ElectrumX authors
|
||||
#
|
||||
# All rights reserved.
|
||||
#
|
||||
# See the file "LICENCE" for information about the copyright
|
||||
# and warranty status of this software.
|
||||
|
||||
"""History by script hash (address)."""
|
||||
|
||||
import array
|
||||
import ast
|
||||
import bisect
|
||||
import time
|
||||
from collections import defaultdict
|
||||
from functools import partial
|
||||
|
||||
from lbry.wallet.server import util
|
||||
from lbry.wallet.server.util import pack_be_uint16, unpack_be_uint16_from
|
||||
from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN
|
||||
|
||||
|
||||
class History:
|
||||
|
||||
DB_VERSIONS = [0]
|
||||
|
||||
def __init__(self):
|
||||
self.logger = util.class_logger(__name__, self.__class__.__name__)
|
||||
# For history compaction
|
||||
self.max_hist_row_entries = 12500
|
||||
self.unflushed = defaultdict(partial(array.array, 'I'))
|
||||
self.unflushed_count = 0
|
||||
self.db = None
|
||||
|
||||
def open_db(self, db_class, for_sync, utxo_flush_count, compacting):
|
||||
self.db = db_class('hist', for_sync)
|
||||
self.read_state()
|
||||
self.clear_excess(utxo_flush_count)
|
||||
# An incomplete compaction needs to be cancelled otherwise
|
||||
# restarting it will corrupt the history
|
||||
if not compacting:
|
||||
self._cancel_compaction()
|
||||
return self.flush_count
|
||||
|
||||
def close_db(self):
|
||||
if self.db:
|
||||
self.db.close()
|
||||
self.db = None
|
||||
|
||||
def read_state(self):
|
||||
state = self.db.get(b'state\0\0')
|
||||
if state:
|
||||
state = ast.literal_eval(state.decode())
|
||||
if not isinstance(state, dict):
|
||||
raise RuntimeError('failed reading state from history DB')
|
||||
self.flush_count = state['flush_count']
|
||||
self.comp_flush_count = state.get('comp_flush_count', -1)
|
||||
self.comp_cursor = state.get('comp_cursor', -1)
|
||||
self.db_version = state.get('db_version', 0)
|
||||
else:
|
||||
self.flush_count = 0
|
||||
self.comp_flush_count = -1
|
||||
self.comp_cursor = -1
|
||||
self.db_version = max(self.DB_VERSIONS)
|
||||
|
||||
self.logger.info(f'history DB version: {self.db_version}')
|
||||
if self.db_version not in self.DB_VERSIONS:
|
||||
msg = f'this software only handles DB versions {self.DB_VERSIONS}'
|
||||
self.logger.error(msg)
|
||||
raise RuntimeError(msg)
|
||||
self.logger.info(f'flush count: {self.flush_count:,d}')
|
||||
|
||||
def clear_excess(self, utxo_flush_count):
|
||||
# < might happen at end of compaction as both DBs cannot be
|
||||
# updated atomically
|
||||
if self.flush_count <= utxo_flush_count:
|
||||
return
|
||||
|
||||
self.logger.info('DB shut down uncleanly. Scanning for '
|
||||
'excess history flushes...')
|
||||
|
||||
keys = []
|
||||
for key, hist in self.db.iterator(prefix=b''):
|
||||
flush_id, = unpack_be_uint16_from(key[-2:])
|
||||
if flush_id > utxo_flush_count:
|
||||
keys.append(key)
|
||||
|
||||
self.logger.info(f'deleting {len(keys):,d} history entries')
|
||||
|
||||
self.flush_count = utxo_flush_count
|
||||
with self.db.write_batch() as batch:
|
||||
for key in keys:
|
||||
batch.delete(key)
|
||||
self.write_state(batch)
|
||||
|
||||
self.logger.info('deleted excess history entries')
|
||||
|
||||
def write_state(self, batch):
|
||||
"""Write state to the history DB."""
|
||||
state = {
|
||||
'flush_count': self.flush_count,
|
||||
'comp_flush_count': self.comp_flush_count,
|
||||
'comp_cursor': self.comp_cursor,
|
||||
'db_version': self.db_version,
|
||||
}
|
||||
# History entries are not prefixed; the suffix \0\0 ensures we
|
||||
# look similar to other entries and aren't interfered with
|
||||
batch.put(b'state\0\0', repr(state).encode())
|
||||
|
||||
def add_unflushed(self, hashXs_by_tx, first_tx_num):
|
||||
unflushed = self.unflushed
|
||||
count = 0
|
||||
for tx_num, hashXs in enumerate(hashXs_by_tx, start=first_tx_num):
|
||||
hashXs = set(hashXs)
|
||||
for hashX in hashXs:
|
||||
unflushed[hashX].append(tx_num)
|
||||
count += len(hashXs)
|
||||
self.unflushed_count += count
|
||||
|
||||
def unflushed_memsize(self):
|
||||
return len(self.unflushed) * 180 + self.unflushed_count * 4
|
||||
|
||||
def assert_flushed(self):
|
||||
assert not self.unflushed
|
||||
|
||||
def flush(self):
|
||||
start_time = time.time()
|
||||
self.flush_count += 1
|
||||
flush_id = pack_be_uint16(self.flush_count)
|
||||
unflushed = self.unflushed
|
||||
|
||||
with self.db.write_batch() as batch:
|
||||
for hashX in sorted(unflushed):
|
||||
key = hashX + flush_id
|
||||
batch.put(key, unflushed[hashX].tobytes())
|
||||
self.write_state(batch)
|
||||
|
||||
count = len(unflushed)
|
||||
unflushed.clear()
|
||||
self.unflushed_count = 0
|
||||
|
||||
if self.db.for_sync:
|
||||
elapsed = time.time() - start_time
|
||||
self.logger.info(f'flushed history in {elapsed:.1f}s '
|
||||
f'for {count:,d} addrs')
|
||||
|
||||
def backup(self, hashXs, tx_count):
|
||||
# Not certain this is needed, but it doesn't hurt
|
||||
self.flush_count += 1
|
||||
nremoves = 0
|
||||
bisect_left = bisect.bisect_left
|
||||
|
||||
with self.db.write_batch() as batch:
|
||||
for hashX in sorted(hashXs):
|
||||
deletes = []
|
||||
puts = {}
|
||||
for key, hist in self.db.iterator(prefix=hashX, reverse=True):
|
||||
a = array.array('I')
|
||||
a.frombytes(hist)
|
||||
# Remove all history entries >= tx_count
|
||||
idx = bisect_left(a, tx_count)
|
||||
nremoves += len(a) - idx
|
||||
if idx > 0:
|
||||
puts[key] = a[:idx].tobytes()
|
||||
break
|
||||
deletes.append(key)
|
||||
|
||||
for key in deletes:
|
||||
batch.delete(key)
|
||||
for key, value in puts.items():
|
||||
batch.put(key, value)
|
||||
self.write_state(batch)
|
||||
|
||||
self.logger.info(f'backing up removed {nremoves:,d} history entries')
|
||||
|
||||
# def get_txnums(self, hashX, limit=1000):
|
||||
# """Generator that returns an unpruned, sorted list of tx_nums in the
|
||||
# history of a hashX. Includes both spending and receiving
|
||||
# transactions. By default yields at most 1000 entries. Set
|
||||
# limit to None to get them all. """
|
||||
# limit = util.resolve_limit(limit)
|
||||
# for key, hist in self.db.iterator(prefix=hashX):
|
||||
# a = array.array('I')
|
||||
# a.frombytes(hist)
|
||||
# for tx_num in a:
|
||||
# if limit == 0:
|
||||
# return
|
||||
# yield tx_num
|
||||
# limit -= 1
|
||||
|
||||
#
|
||||
# History compaction
|
||||
#
|
||||
|
||||
# comp_cursor is a cursor into compaction progress.
|
||||
# -1: no compaction in progress
|
||||
# 0-65535: Compaction in progress; all prefixes < comp_cursor have
|
||||
# been compacted, and later ones have not.
|
||||
# 65536: compaction complete in-memory but not flushed
|
||||
#
|
||||
# comp_flush_count applies during compaction, and is a flush count
|
||||
# for history with prefix < comp_cursor. flush_count applies
|
||||
# to still uncompacted history. It is -1 when no compaction is
|
||||
# taking place. Key suffixes up to and including comp_flush_count
|
||||
# are used, so a parallel history flush must first increment this
|
||||
#
|
||||
# When compaction is complete and the final flush takes place,
|
||||
# flush_count is reset to comp_flush_count, and comp_flush_count to -1
|
||||
|
||||
def _flush_compaction(self, cursor, write_items, keys_to_delete):
|
||||
"""Flush a single compaction pass as a batch."""
|
||||
# Update compaction state
|
||||
if cursor == 65536:
|
||||
self.flush_count = self.comp_flush_count
|
||||
self.comp_cursor = -1
|
||||
self.comp_flush_count = -1
|
||||
else:
|
||||
self.comp_cursor = cursor
|
||||
|
||||
# History DB. Flush compacted history and updated state
|
||||
with self.db.write_batch() as batch:
|
||||
# Important: delete first! The keyspace may overlap.
|
||||
for key in keys_to_delete:
|
||||
batch.delete(key)
|
||||
for key, value in write_items:
|
||||
batch.put(key, value)
|
||||
self.write_state(batch)
|
||||
|
||||
def _compact_hashX(self, hashX, hist_map, hist_list,
|
||||
write_items, keys_to_delete):
|
||||
"""Compress history for a hashX. hist_list is an ordered list of
|
||||
the histories to be compressed."""
|
||||
# History entries (tx numbers) are 4 bytes each. Distribute
|
||||
# over rows of up to 50KB in size. A fixed row size means
|
||||
# future compactions will not need to update the first N - 1
|
||||
# rows.
|
||||
max_row_size = self.max_hist_row_entries * 4
|
||||
full_hist = b''.join(hist_list)
|
||||
nrows = (len(full_hist) + max_row_size - 1) // max_row_size
|
||||
if nrows > 4:
|
||||
self.logger.info('hashX {} is large: {:,d} entries across '
|
||||
'{:,d} rows'
|
||||
.format(hash_to_hex_str(hashX),
|
||||
len(full_hist) // 4, nrows))
|
||||
|
||||
# Find what history needs to be written, and what keys need to
|
||||
# be deleted. Start by assuming all keys are to be deleted,
|
||||
# and then remove those that are the same on-disk as when
|
||||
# compacted.
|
||||
write_size = 0
|
||||
keys_to_delete.update(hist_map)
|
||||
for n, chunk in enumerate(util.chunks(full_hist, max_row_size)):
|
||||
key = hashX + pack_be_uint16(n)
|
||||
if hist_map.get(key) == chunk:
|
||||
keys_to_delete.remove(key)
|
||||
else:
|
||||
write_items.append((key, chunk))
|
||||
write_size += len(chunk)
|
||||
|
||||
assert n + 1 == nrows
|
||||
self.comp_flush_count = max(self.comp_flush_count, n)
|
||||
|
||||
return write_size
|
||||
|
||||
def _compact_prefix(self, prefix, write_items, keys_to_delete):
|
||||
"""Compact all history entries for hashXs beginning with the
|
||||
given prefix. Update keys_to_delete and write."""
|
||||
prior_hashX = None
|
||||
hist_map = {}
|
||||
hist_list = []
|
||||
|
||||
key_len = HASHX_LEN + 2
|
||||
write_size = 0
|
||||
for key, hist in self.db.iterator(prefix=prefix):
|
||||
# Ignore non-history entries
|
||||
if len(key) != key_len:
|
||||
continue
|
||||
hashX = key[:-2]
|
||||
if hashX != prior_hashX and prior_hashX:
|
||||
write_size += self._compact_hashX(prior_hashX, hist_map,
|
||||
hist_list, write_items,
|
||||
keys_to_delete)
|
||||
hist_map.clear()
|
||||
hist_list.clear()
|
||||
prior_hashX = hashX
|
||||
hist_map[key] = hist
|
||||
hist_list.append(hist)
|
||||
|
||||
if prior_hashX:
|
||||
write_size += self._compact_hashX(prior_hashX, hist_map, hist_list,
|
||||
write_items, keys_to_delete)
|
||||
return write_size
|
||||
|
||||
def _compact_history(self, limit):
|
||||
"""Inner loop of history compaction. Loops until limit bytes have
|
||||
been processed.
|
||||
"""
|
||||
keys_to_delete = set()
|
||||
write_items = [] # A list of (key, value) pairs
|
||||
write_size = 0
|
||||
|
||||
# Loop over 2-byte prefixes
|
||||
cursor = self.comp_cursor
|
||||
while write_size < limit and cursor < 65536:
|
||||
prefix = pack_be_uint16(cursor)
|
||||
write_size += self._compact_prefix(prefix, write_items,
|
||||
keys_to_delete)
|
||||
cursor += 1
|
||||
|
||||
max_rows = self.comp_flush_count + 1
|
||||
self._flush_compaction(cursor, write_items, keys_to_delete)
|
||||
|
||||
self.logger.info('history compaction: wrote {:,d} rows ({:.1f} MB), '
|
||||
'removed {:,d} rows, largest: {:,d}, {:.1f}% complete'
|
||||
.format(len(write_items), write_size / 1000000,
|
||||
len(keys_to_delete), max_rows,
|
||||
100 * cursor / 65536))
|
||||
return write_size
|
||||
|
||||
def _cancel_compaction(self):
|
||||
if self.comp_cursor != -1:
|
||||
self.logger.warning('cancelling in-progress history compaction')
|
||||
self.comp_flush_count = -1
|
||||
self.comp_cursor = -1
|
|
@ -17,9 +17,10 @@ import time
|
|||
import zlib
|
||||
import typing
|
||||
from typing import Optional, List, Tuple, Iterable
|
||||
from functools import partial
|
||||
from asyncio import sleep
|
||||
from bisect import bisect_right
|
||||
from collections import namedtuple
|
||||
from bisect import bisect_right, bisect_left
|
||||
from collections import namedtuple, defaultdict
|
||||
from glob import glob
|
||||
from struct import pack, unpack
|
||||
from concurrent.futures.thread import ThreadPoolExecutor
|
||||
|
@ -28,19 +29,25 @@ from lbry.utils import LRUCacheWithMetrics
|
|||
from lbry.wallet.server import util
|
||||
from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN
|
||||
from lbry.wallet.server.merkle import Merkle, MerkleCache
|
||||
from lbry.wallet.server.util import formatted_time
|
||||
from lbry.wallet.server.util import formatted_time, pack_be_uint16, unpack_be_uint16_from
|
||||
from lbry.wallet.server.storage import db_class
|
||||
from lbry.wallet.server.history import History
|
||||
|
||||
|
||||
UTXO = namedtuple("UTXO", "tx_num tx_pos tx_hash height value")
|
||||
HEADER_PREFIX = b'H'
|
||||
TX_COUNT_PREFIX = b'T'
|
||||
TX_HASH_PREFIX = b'X'
|
||||
HISTORY_PREFIX = b'A'
|
||||
TX_PREFIX = b'B'
|
||||
TX_NUM_PREFIX = b'N'
|
||||
BLOCK_HASH_PREFIX = b'C'
|
||||
HEADER_PREFIX = b'H'
|
||||
TX_NUM_PREFIX = b'N'
|
||||
TX_COUNT_PREFIX = b'T'
|
||||
UNDO_PREFIX = b'U'
|
||||
TX_HASH_PREFIX = b'X'
|
||||
|
||||
HASHX_UTXO_PREFIX = b'h'
|
||||
HIST_STATE = b'state-hist'
|
||||
UTXO_STATE = b'state-utxo'
|
||||
UTXO_PREFIX = b'u'
|
||||
HASHX_HISTORY_PREFIX = b'x'
|
||||
|
||||
|
||||
@attr.s(slots=True)
|
||||
|
@ -65,6 +72,7 @@ class LevelDB:
|
|||
"""
|
||||
|
||||
DB_VERSIONS = [6]
|
||||
HIST_DB_VERSIONS = [0]
|
||||
|
||||
class DBError(Exception):
|
||||
"""Raised on general DB errors generally indicating corruption."""
|
||||
|
@ -78,8 +86,14 @@ class LevelDB:
|
|||
self.logger.info(f'switching current directory to {env.db_dir}')
|
||||
|
||||
self.db_class = db_class(env.db_dir, self.env.db_engine)
|
||||
self.history = History()
|
||||
self.utxo_db = None
|
||||
self.db = None
|
||||
|
||||
self.hist_unflushed = defaultdict(partial(array.array, 'I'))
|
||||
self.hist_unflushed_count = 0
|
||||
self.hist_flush_count = 0
|
||||
self.hist_comp_flush_count = -1
|
||||
self.hist_comp_cursor = -1
|
||||
|
||||
self.tx_counts = None
|
||||
self.headers = None
|
||||
self.last_flush = time.time()
|
||||
|
@ -96,6 +110,19 @@ class LevelDB:
|
|||
self._tx_and_merkle_cache = LRUCacheWithMetrics(2 ** 17, metric_name='tx_and_merkle', namespace="wallet_server")
|
||||
self.total_transactions = None
|
||||
|
||||
# def add_unflushed(self, hashXs_by_tx, first_tx_num):
|
||||
# unflushed = self.history.unflushed
|
||||
# count = 0
|
||||
# for tx_num, hashXs in enumerate(hashXs_by_tx, start=first_tx_num):
|
||||
# hashXs = set(hashXs)
|
||||
# for hashX in hashXs:
|
||||
# unflushed[hashX].append(tx_num)
|
||||
# count += len(hashXs)
|
||||
# self.history.unflushed_count += count
|
||||
|
||||
# def unflushed_memsize(self):
|
||||
# return len(self.history.unflushed) * 180 + self.history.unflushed_count * 4
|
||||
|
||||
async def _read_tx_counts(self):
|
||||
if self.tx_counts is not None:
|
||||
return
|
||||
|
@ -105,7 +132,7 @@ class LevelDB:
|
|||
def get_counts():
|
||||
return tuple(
|
||||
util.unpack_be_uint64(tx_count)
|
||||
for tx_count in self.tx_db.iterator(prefix=TX_COUNT_PREFIX, include_key=False)
|
||||
for tx_count in self.db.iterator(prefix=TX_COUNT_PREFIX, include_key=False)
|
||||
)
|
||||
|
||||
tx_counts = await asyncio.get_event_loop().run_in_executor(self.executor, get_counts)
|
||||
|
@ -120,7 +147,7 @@ class LevelDB:
|
|||
|
||||
async def _read_txids(self):
|
||||
def get_txids():
|
||||
return list(self.tx_db.iterator(prefix=TX_HASH_PREFIX, include_key=False))
|
||||
return list(self.db.iterator(prefix=TX_HASH_PREFIX, include_key=False))
|
||||
|
||||
start = time.perf_counter()
|
||||
self.logger.info("loading txids")
|
||||
|
@ -136,7 +163,7 @@ class LevelDB:
|
|||
|
||||
def get_headers():
|
||||
return [
|
||||
header for header in self.headers_db.iterator(prefix=HEADER_PREFIX, include_key=False)
|
||||
header for header in self.db.iterator(prefix=HEADER_PREFIX, include_key=False)
|
||||
]
|
||||
|
||||
headers = await asyncio.get_event_loop().run_in_executor(self.executor, get_headers)
|
||||
|
@ -152,31 +179,97 @@ class LevelDB:
|
|||
f.write(f'ElectrumX databases and metadata for '
|
||||
f'{self.coin.NAME} {self.coin.NET}'.encode())
|
||||
|
||||
assert self.headers_db is None
|
||||
self.headers_db = self.db_class('headers', for_sync)
|
||||
if self.headers_db.is_new:
|
||||
self.logger.info('created new headers db')
|
||||
self.logger.info(f'opened headers DB (for sync: {for_sync})')
|
||||
assert self.db is None
|
||||
self.db = self.db_class(f'lbry-{self.env.db_engine}', for_sync)
|
||||
if self.db.is_new:
|
||||
self.logger.info('created new db: %s', f'lbry-{self.env.db_engine}')
|
||||
self.logger.info(f'opened DB (for sync: {for_sync})')
|
||||
|
||||
assert self.tx_db is None
|
||||
self.tx_db = self.db_class('tx', for_sync)
|
||||
if self.tx_db.is_new:
|
||||
self.logger.info('created new tx db')
|
||||
self.logger.info(f'opened tx DB (for sync: {for_sync})')
|
||||
|
||||
assert self.utxo_db is None
|
||||
# First UTXO DB
|
||||
self.utxo_db = self.db_class('utxo', for_sync)
|
||||
if self.utxo_db.is_new:
|
||||
self.logger.info('created new utxo db')
|
||||
self.logger.info(f'opened utxo db (for sync: {for_sync})')
|
||||
self.read_utxo_state()
|
||||
|
||||
# Then history DB
|
||||
self.utxo_flush_count = self.history.open_db(
|
||||
self.db_class, for_sync, self.utxo_flush_count, compacting
|
||||
)
|
||||
self.clear_excess_undo_info()
|
||||
state = self.db.get(HIST_STATE)
|
||||
if state:
|
||||
state = ast.literal_eval(state.decode())
|
||||
if not isinstance(state, dict):
|
||||
raise RuntimeError('failed reading state from history DB')
|
||||
self.hist_flush_count = state['flush_count']
|
||||
self.hist_comp_flush_count = state.get('comp_flush_count', -1)
|
||||
self.hist_comp_cursor = state.get('comp_cursor', -1)
|
||||
self.hist_db_version = state.get('db_version', 0)
|
||||
else:
|
||||
self.hist_flush_count = 0
|
||||
self.hist_comp_flush_count = -1
|
||||
self.hist_comp_cursor = -1
|
||||
self.hist_db_version = max(self.HIST_DB_VERSIONS)
|
||||
|
||||
self.logger.info(f'history DB version: {self.hist_db_version}')
|
||||
if self.hist_db_version not in self.HIST_DB_VERSIONS:
|
||||
msg = f'this software only handles DB versions {self.HIST_DB_VERSIONS}'
|
||||
self.logger.error(msg)
|
||||
raise RuntimeError(msg)
|
||||
self.logger.info(f'flush count: {self.hist_flush_count:,d}')
|
||||
|
||||
# self.history.clear_excess(self.utxo_flush_count)
|
||||
# < might happen at end of compaction as both DBs cannot be
|
||||
# updated atomically
|
||||
if self.hist_flush_count > self.utxo_flush_count:
|
||||
self.logger.info('DB shut down uncleanly. Scanning for '
|
||||
'excess history flushes...')
|
||||
|
||||
keys = []
|
||||
for key, hist in self.db.iterator(prefix=HASHX_HISTORY_PREFIX):
|
||||
k = key[1:]
|
||||
flush_id, = unpack_be_uint16_from(k[-2:])
|
||||
if flush_id > self.utxo_flush_count:
|
||||
keys.append(k)
|
||||
|
||||
self.logger.info(f'deleting {len(keys):,d} history entries')
|
||||
|
||||
self.hist_flush_count = self.utxo_flush_count
|
||||
with self.db.write_batch() as batch:
|
||||
for key in keys:
|
||||
batch.delete(HASHX_HISTORY_PREFIX + key)
|
||||
state = {
|
||||
'flush_count': self.hist_flush_count,
|
||||
'comp_flush_count': self.hist_comp_flush_count,
|
||||
'comp_cursor': self.hist_comp_cursor,
|
||||
'db_version': self.hist_db_version,
|
||||
}
|
||||
# History entries are not prefixed; the suffix \0\0 ensures we
|
||||
# look similar to other entries and aren't interfered with
|
||||
batch.put(HIST_STATE, repr(state).encode())
|
||||
|
||||
self.logger.info('deleted excess history entries')
|
||||
|
||||
self.utxo_flush_count = self.hist_flush_count
|
||||
|
||||
min_height = self.min_undo_height(self.db_height)
|
||||
keys = []
|
||||
for key, hist in self.db.iterator(prefix=UNDO_PREFIX):
|
||||
height, = unpack('>I', key[-4:])
|
||||
if height >= min_height:
|
||||
break
|
||||
keys.append(key)
|
||||
|
||||
if keys:
|
||||
with self.db.write_batch() as batch:
|
||||
for key in keys:
|
||||
batch.delete(key)
|
||||
self.logger.info(f'deleted {len(keys):,d} stale undo entries')
|
||||
|
||||
# delete old block files
|
||||
prefix = self.raw_block_prefix()
|
||||
paths = [path for path in glob(f'{prefix}[0-9]*')
|
||||
if len(path) > len(prefix)
|
||||
and int(path[len(prefix):]) < min_height]
|
||||
if paths:
|
||||
for path in paths:
|
||||
try:
|
||||
os.remove(path)
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
self.logger.info(f'deleted {len(paths):,d} stale block files')
|
||||
|
||||
# Read TX counts (requires meta directory)
|
||||
await self._read_tx_counts()
|
||||
|
@ -185,10 +278,7 @@ class LevelDB:
|
|||
await self._read_headers()
|
||||
|
||||
def close(self):
|
||||
self.utxo_db.close()
|
||||
self.history.close_db()
|
||||
self.headers_db.close()
|
||||
self.tx_db.close()
|
||||
self.db.close()
|
||||
self.executor.shutdown(wait=True)
|
||||
self.executor = None
|
||||
|
||||
|
@ -209,18 +299,12 @@ class LevelDB:
|
|||
"""Open the databases for serving. If they are already open they are
|
||||
closed first.
|
||||
"""
|
||||
self.logger.info('closing DBs to re-open for serving')
|
||||
if self.utxo_db:
|
||||
self.logger.info('closing DBs to re-open for serving')
|
||||
self.utxo_db.close()
|
||||
self.history.close_db()
|
||||
self.utxo_db = None
|
||||
if self.headers_db:
|
||||
self.headers_db.close()
|
||||
self.headers_db = None
|
||||
if self.tx_db:
|
||||
self.tx_db.close()
|
||||
self.tx_db = None
|
||||
if self.db:
|
||||
return
|
||||
# self.logger.info('closing DBs to re-open for serving')
|
||||
# self.db.close()
|
||||
# self.history.close_db()
|
||||
# self.db = None
|
||||
|
||||
await self._open_dbs(False, False)
|
||||
self.logger.info("opened for serving")
|
||||
|
@ -249,107 +333,7 @@ class LevelDB:
|
|||
assert not flush_data.adds
|
||||
assert not flush_data.deletes
|
||||
assert not flush_data.undo_infos
|
||||
self.history.assert_flushed()
|
||||
|
||||
def flush_dbs(self, flush_data, flush_utxos, estimate_txs_remaining):
|
||||
"""Flush out cached state. History is always flushed; UTXOs are
|
||||
flushed if flush_utxos."""
|
||||
if flush_data.height == self.db_height:
|
||||
self.assert_flushed(flush_data)
|
||||
return
|
||||
|
||||
start_time = time.time()
|
||||
prior_flush = self.last_flush
|
||||
tx_delta = flush_data.tx_count - self.last_flush_tx_count
|
||||
|
||||
# Flush to file system
|
||||
self.flush_fs(flush_data)
|
||||
|
||||
# Then history
|
||||
self.flush_history()
|
||||
|
||||
# Flush state last as it reads the wall time.
|
||||
with self.utxo_db.write_batch() as batch:
|
||||
if flush_utxos:
|
||||
self.flush_utxo_db(batch, flush_data)
|
||||
self.flush_state(batch)
|
||||
|
||||
# Update and put the wall time again - otherwise we drop the
|
||||
# time it took to commit the batch
|
||||
self.flush_state(self.utxo_db)
|
||||
|
||||
elapsed = self.last_flush - start_time
|
||||
self.logger.info(f'flush #{self.history.flush_count:,d} took '
|
||||
f'{elapsed:.1f}s. Height {flush_data.height:,d} '
|
||||
f'txs: {flush_data.tx_count:,d} ({tx_delta:+,d})')
|
||||
|
||||
# Catch-up stats
|
||||
if self.utxo_db.for_sync:
|
||||
flush_interval = self.last_flush - prior_flush
|
||||
tx_per_sec_gen = int(flush_data.tx_count / self.wall_time)
|
||||
tx_per_sec_last = 1 + int(tx_delta / flush_interval)
|
||||
eta = estimate_txs_remaining() / tx_per_sec_last
|
||||
self.logger.info(f'tx/sec since genesis: {tx_per_sec_gen:,d}, '
|
||||
f'since last flush: {tx_per_sec_last:,d}')
|
||||
self.logger.info(f'sync time: {formatted_time(self.wall_time)} '
|
||||
f'ETA: {formatted_time(eta)}')
|
||||
|
||||
def flush_fs(self, flush_data):
|
||||
"""Write headers, tx counts and block tx hashes to the filesystem.
|
||||
|
||||
The first height to write is self.fs_height + 1. The FS
|
||||
metadata is all append-only, so in a crash we just pick up
|
||||
again from the height stored in the DB.
|
||||
"""
|
||||
prior_tx_count = (self.tx_counts[self.fs_height]
|
||||
if self.fs_height >= 0 else 0)
|
||||
assert len(flush_data.block_txs) == len(flush_data.headers)
|
||||
assert flush_data.height == self.fs_height + len(flush_data.headers)
|
||||
assert flush_data.tx_count == (self.tx_counts[-1] if self.tx_counts
|
||||
else 0)
|
||||
assert len(self.tx_counts) == flush_data.height + 1
|
||||
assert len(
|
||||
b''.join(hashes for hashes, _ in flush_data.block_txs)
|
||||
) // 32 == flush_data.tx_count - prior_tx_count
|
||||
|
||||
# Write the headers
|
||||
start_time = time.perf_counter()
|
||||
|
||||
with self.headers_db.write_batch() as batch:
|
||||
batch_put = batch.put
|
||||
for i, header in enumerate(flush_data.headers):
|
||||
batch_put(HEADER_PREFIX + util.pack_be_uint64(self.fs_height + i + 1), header)
|
||||
self.headers.append(header)
|
||||
flush_data.headers.clear()
|
||||
|
||||
height_start = self.fs_height + 1
|
||||
tx_num = prior_tx_count
|
||||
|
||||
with self.tx_db.write_batch() as batch:
|
||||
batch_put = batch.put
|
||||
for block_hash, (tx_hashes, txs) in zip(flush_data.block_hashes, flush_data.block_txs):
|
||||
tx_count = self.tx_counts[height_start]
|
||||
batch_put(BLOCK_HASH_PREFIX + util.pack_be_uint64(height_start), block_hash[::-1])
|
||||
batch_put(TX_COUNT_PREFIX + util.pack_be_uint64(height_start), util.pack_be_uint64(tx_count))
|
||||
height_start += 1
|
||||
offset = 0
|
||||
while offset < len(tx_hashes):
|
||||
batch_put(TX_HASH_PREFIX + util.pack_be_uint64(tx_num), tx_hashes[offset:offset+32])
|
||||
batch_put(TX_NUM_PREFIX + tx_hashes[offset:offset+32], util.pack_be_uint64(tx_num))
|
||||
batch_put(TX_PREFIX + tx_hashes[offset:offset+32], txs[offset // 32])
|
||||
tx_num += 1
|
||||
offset += 32
|
||||
|
||||
flush_data.block_txs.clear()
|
||||
flush_data.block_hashes.clear()
|
||||
|
||||
self.fs_height = flush_data.height
|
||||
self.fs_tx_count = flush_data.tx_count
|
||||
elapsed = time.perf_counter() - start_time
|
||||
self.logger.info(f'flushed filesystem data in {elapsed:.2f}s')
|
||||
|
||||
def flush_history(self):
|
||||
self.history.flush()
|
||||
assert not self.hist_unflushed
|
||||
|
||||
def flush_utxo_db(self, batch, flush_data):
|
||||
"""Flush the cached DB writes and UTXO set to the batch."""
|
||||
|
@ -372,15 +356,16 @@ class LevelDB:
|
|||
# suffix = tx_idx + tx_num
|
||||
hashX = value[:-12]
|
||||
suffix = key[-2:] + value[-12:-8]
|
||||
batch_put(b'h' + key[:4] + suffix, hashX)
|
||||
batch_put(b'u' + hashX + suffix, value[-8:])
|
||||
batch_put(HASHX_UTXO_PREFIX + key[:4] + suffix, hashX)
|
||||
batch_put(UTXO_PREFIX + hashX + suffix, value[-8:])
|
||||
flush_data.adds.clear()
|
||||
|
||||
# New undo information
|
||||
self.flush_undo_infos(batch_put, flush_data.undo_infos)
|
||||
for undo_info, height in flush_data.undo_infos:
|
||||
batch_put(self.undo_key(height), b''.join(undo_info))
|
||||
flush_data.undo_infos.clear()
|
||||
|
||||
if self.utxo_db.for_sync:
|
||||
if self.db.for_sync:
|
||||
block_count = flush_data.height - self.db_height
|
||||
tx_count = flush_data.tx_count - self.db_tx_count
|
||||
elapsed = time.time() - start_time
|
||||
|
@ -389,49 +374,196 @@ class LevelDB:
|
|||
f'{spend_count:,d} spends in '
|
||||
f'{elapsed:.1f}s, committing...')
|
||||
|
||||
self.utxo_flush_count = self.history.flush_count
|
||||
self.utxo_flush_count = self.hist_flush_count
|
||||
self.db_height = flush_data.height
|
||||
self.db_tx_count = flush_data.tx_count
|
||||
self.db_tip = flush_data.tip
|
||||
|
||||
def flush_state(self, batch):
|
||||
"""Flush chain state to the batch."""
|
||||
def write_history_state(self, batch):
|
||||
state = {
|
||||
'flush_count': self.hist_flush_count,
|
||||
'comp_flush_count': self.hist_comp_flush_count,
|
||||
'comp_cursor': self.hist_comp_cursor,
|
||||
'db_version': self.db_version,
|
||||
}
|
||||
# History entries are not prefixed; the suffix \0\0 ensures we
|
||||
# look similar to other entries and aren't interfered with
|
||||
batch.put(HIST_STATE, repr(state).encode())
|
||||
|
||||
def flush_dbs(self, flush_data: FlushData, estimate_txs_remaining):
|
||||
"""Flush out cached state. History is always flushed; UTXOs are
|
||||
flushed if flush_utxos."""
|
||||
if flush_data.height == self.db_height:
|
||||
self.assert_flushed(flush_data)
|
||||
return
|
||||
|
||||
start_time = time.time()
|
||||
prior_flush = self.last_flush
|
||||
tx_delta = flush_data.tx_count - self.last_flush_tx_count
|
||||
|
||||
# Flush to file system
|
||||
# self.flush_fs(flush_data)
|
||||
prior_tx_count = (self.tx_counts[self.fs_height]
|
||||
if self.fs_height >= 0 else 0)
|
||||
|
||||
assert len(flush_data.block_txs) == len(flush_data.headers)
|
||||
assert flush_data.height == self.fs_height + len(flush_data.headers)
|
||||
assert flush_data.tx_count == (self.tx_counts[-1] if self.tx_counts
|
||||
else 0)
|
||||
assert len(self.tx_counts) == flush_data.height + 1
|
||||
assert len(
|
||||
b''.join(hashes for hashes, _ in flush_data.block_txs)
|
||||
) // 32 == flush_data.tx_count - prior_tx_count
|
||||
|
||||
|
||||
# Write the headers
|
||||
start_time = time.perf_counter()
|
||||
|
||||
with self.db.write_batch() as batch:
|
||||
batch_put = batch.put
|
||||
height_start = self.fs_height + 1
|
||||
tx_num = prior_tx_count
|
||||
for i, (header, block_hash, (tx_hashes, txs)) in enumerate(zip(flush_data.headers, flush_data.block_hashes, flush_data.block_txs)):
|
||||
batch_put(HEADER_PREFIX + util.pack_be_uint64(height_start), header)
|
||||
self.headers.append(header)
|
||||
tx_count = self.tx_counts[height_start]
|
||||
batch_put(BLOCK_HASH_PREFIX + util.pack_be_uint64(height_start), block_hash[::-1])
|
||||
batch_put(TX_COUNT_PREFIX + util.pack_be_uint64(height_start), util.pack_be_uint64(tx_count))
|
||||
height_start += 1
|
||||
offset = 0
|
||||
while offset < len(tx_hashes):
|
||||
batch_put(TX_HASH_PREFIX + util.pack_be_uint64(tx_num), tx_hashes[offset:offset + 32])
|
||||
batch_put(TX_NUM_PREFIX + tx_hashes[offset:offset + 32], util.pack_be_uint64(tx_num))
|
||||
batch_put(TX_PREFIX + tx_hashes[offset:offset + 32], txs[offset // 32])
|
||||
|
||||
tx_num += 1
|
||||
offset += 32
|
||||
flush_data.headers.clear()
|
||||
flush_data.block_txs.clear()
|
||||
flush_data.block_hashes.clear()
|
||||
# flush_data.claim_txo_cache.clear()
|
||||
# flush_data.support_txo_cache.clear()
|
||||
|
||||
self.fs_height = flush_data.height
|
||||
self.fs_tx_count = flush_data.tx_count
|
||||
|
||||
|
||||
# Then history
|
||||
self.hist_flush_count += 1
|
||||
flush_id = pack_be_uint16(self.hist_flush_count)
|
||||
unflushed = self.hist_unflushed
|
||||
|
||||
for hashX in sorted(unflushed):
|
||||
key = hashX + flush_id
|
||||
batch_put(HASHX_HISTORY_PREFIX + key, unflushed[hashX].tobytes())
|
||||
self.write_history_state(batch)
|
||||
|
||||
unflushed.clear()
|
||||
self.hist_unflushed_count = 0
|
||||
|
||||
|
||||
#########################
|
||||
|
||||
# Flush state last as it reads the wall time.
|
||||
self.flush_utxo_db(batch, flush_data)
|
||||
|
||||
# self.flush_state(batch)
|
||||
#
|
||||
now = time.time()
|
||||
self.wall_time += now - self.last_flush
|
||||
self.last_flush = now
|
||||
self.last_flush_tx_count = self.fs_tx_count
|
||||
self.write_utxo_state(batch)
|
||||
|
||||
# # Update and put the wall time again - otherwise we drop the
|
||||
# # time it took to commit the batch
|
||||
# # self.flush_state(self.db)
|
||||
# now = time.time()
|
||||
# self.wall_time += now - self.last_flush
|
||||
# self.last_flush = now
|
||||
# self.last_flush_tx_count = self.fs_tx_count
|
||||
# self.write_utxo_state(batch)
|
||||
|
||||
elapsed = self.last_flush - start_time
|
||||
self.logger.info(f'flush #{self.hist_flush_count:,d} took '
|
||||
f'{elapsed:.1f}s. Height {flush_data.height:,d} '
|
||||
f'txs: {flush_data.tx_count:,d} ({tx_delta:+,d})')
|
||||
|
||||
# Catch-up stats
|
||||
if self.db.for_sync:
|
||||
flush_interval = self.last_flush - prior_flush
|
||||
tx_per_sec_gen = int(flush_data.tx_count / self.wall_time)
|
||||
tx_per_sec_last = 1 + int(tx_delta / flush_interval)
|
||||
eta = estimate_txs_remaining() / tx_per_sec_last
|
||||
self.logger.info(f'tx/sec since genesis: {tx_per_sec_gen:,d}, '
|
||||
f'since last flush: {tx_per_sec_last:,d}')
|
||||
self.logger.info(f'sync time: {formatted_time(self.wall_time)} '
|
||||
f'ETA: {formatted_time(eta)}')
|
||||
|
||||
# def flush_state(self, batch):
|
||||
# """Flush chain state to the batch."""
|
||||
# now = time.time()
|
||||
# self.wall_time += now - self.last_flush
|
||||
# self.last_flush = now
|
||||
# self.last_flush_tx_count = self.fs_tx_count
|
||||
# self.write_utxo_state(batch)
|
||||
|
||||
def flush_backup(self, flush_data, touched):
|
||||
"""Like flush_dbs() but when backing up. All UTXOs are flushed."""
|
||||
assert not flush_data.headers
|
||||
assert not flush_data.block_txs
|
||||
assert flush_data.height < self.db_height
|
||||
self.history.assert_flushed()
|
||||
assert not self.hist_unflushed
|
||||
|
||||
start_time = time.time()
|
||||
tx_delta = flush_data.tx_count - self.last_flush_tx_count
|
||||
|
||||
self.backup_fs(flush_data.height, flush_data.tx_count)
|
||||
self.history.backup(touched, flush_data.tx_count)
|
||||
with self.utxo_db.write_batch() as batch:
|
||||
self.flush_utxo_db(batch, flush_data)
|
||||
# Flush state last as it reads the wall time.
|
||||
self.flush_state(batch)
|
||||
|
||||
elapsed = self.last_flush - start_time
|
||||
self.logger.info(f'backup flush #{self.history.flush_count:,d} took '
|
||||
f'{elapsed:.1f}s. Height {flush_data.height:,d} '
|
||||
f'txs: {flush_data.tx_count:,d} ({tx_delta:+,d})')
|
||||
|
||||
def backup_fs(self, height, tx_count):
|
||||
"""Back up during a reorg. This just updates our pointers."""
|
||||
while self.fs_height > height:
|
||||
###
|
||||
while self.fs_height > flush_data.height:
|
||||
self.fs_height -= 1
|
||||
self.headers.pop()
|
||||
self.fs_tx_count = tx_count
|
||||
self.fs_tx_count = flush_data.tx_count
|
||||
# Truncate header_mc: header count is 1 more than the height.
|
||||
self.header_mc.truncate(height + 1)
|
||||
self.header_mc.truncate(flush_data.height + 1)
|
||||
|
||||
###
|
||||
# Not certain this is needed, but it doesn't hurt
|
||||
self.hist_flush_count += 1
|
||||
nremoves = 0
|
||||
|
||||
with self.db.write_batch() as batch:
|
||||
tx_count = flush_data.tx_count
|
||||
for hashX in sorted(touched):
|
||||
deletes = []
|
||||
puts = {}
|
||||
for key, hist in self.db.iterator(prefix=HASHX_HISTORY_PREFIX + hashX, reverse=True):
|
||||
k = key[1:]
|
||||
a = array.array('I')
|
||||
a.frombytes(hist)
|
||||
# Remove all history entries >= tx_count
|
||||
idx = bisect_left(a, tx_count)
|
||||
nremoves += len(a) - idx
|
||||
if idx > 0:
|
||||
puts[k] = a[:idx].tobytes()
|
||||
break
|
||||
deletes.append(k)
|
||||
|
||||
for key in deletes:
|
||||
batch.delete(key)
|
||||
for key, value in puts.items():
|
||||
batch.put(key, value)
|
||||
self.write_history_state(batch)
|
||||
|
||||
self.flush_utxo_db(batch, flush_data)
|
||||
# Flush state last as it reads the wall time.
|
||||
now = time.time()
|
||||
self.wall_time += now - self.last_flush
|
||||
self.last_flush = now
|
||||
self.last_flush_tx_count = self.fs_tx_count
|
||||
self.write_utxo_state(batch)
|
||||
self.logger.info(f'backing up removed {nremoves:,d} history entries')
|
||||
elapsed = self.last_flush - start_time
|
||||
self.logger.info(f'backup flush #{self.hist_flush_count:,d} took {elapsed:.1f}s. '
|
||||
f'Height {flush_data.height:,d} txs: {flush_data.tx_count:,d} ({tx_delta:+,d})')
|
||||
|
||||
def raw_header(self, height):
|
||||
"""Return the binary header at the given height."""
|
||||
|
@ -471,9 +603,8 @@ class LevelDB:
|
|||
def _fs_transactions(self, txids: Iterable[str]):
|
||||
unpack_be_uint64 = util.unpack_be_uint64
|
||||
tx_counts = self.tx_counts
|
||||
tx_db_get = self.tx_db.get
|
||||
tx_db_get = self.db.get
|
||||
tx_cache = self._tx_and_merkle_cache
|
||||
|
||||
tx_infos = {}
|
||||
|
||||
for tx_hash in txids:
|
||||
|
@ -531,11 +662,13 @@ class LevelDB:
|
|||
def read_history():
|
||||
db_height = self.db_height
|
||||
tx_counts = self.tx_counts
|
||||
tx_db_get = self.db.get
|
||||
pack_be_uint64 = util.pack_be_uint64
|
||||
|
||||
cnt = 0
|
||||
txs = []
|
||||
|
||||
for hist in self.history.db.iterator(prefix=hashX, include_key=False):
|
||||
for hist in self.db.iterator(prefix=HASHX_HISTORY_PREFIX + hashX, include_key=False):
|
||||
a = array.array('I')
|
||||
a.frombytes(hist)
|
||||
for tx_num in a:
|
||||
|
@ -566,16 +699,11 @@ class LevelDB:
|
|||
|
||||
def undo_key(self, height):
|
||||
"""DB key for undo information at the given height."""
|
||||
return b'U' + pack('>I', height)
|
||||
return UNDO_PREFIX + pack('>I', height)
|
||||
|
||||
def read_undo_info(self, height):
|
||||
"""Read undo information from a file for the current height."""
|
||||
return self.utxo_db.get(self.undo_key(height))
|
||||
|
||||
def flush_undo_infos(self, batch_put, undo_infos):
|
||||
"""undo_infos is a list of (undo_info, height) pairs."""
|
||||
for undo_info, height in undo_infos:
|
||||
batch_put(self.undo_key(height), b''.join(undo_info))
|
||||
return self.db.get(self.undo_key(height))
|
||||
|
||||
def raw_block_prefix(self):
|
||||
return 'block'
|
||||
|
@ -606,17 +734,16 @@ class LevelDB:
|
|||
|
||||
def clear_excess_undo_info(self):
|
||||
"""Clear excess undo info. Only most recent N are kept."""
|
||||
prefix = b'U'
|
||||
min_height = self.min_undo_height(self.db_height)
|
||||
keys = []
|
||||
for key, hist in self.utxo_db.iterator(prefix=prefix):
|
||||
for key, hist in self.db.iterator(prefix=UNDO_PREFIX):
|
||||
height, = unpack('>I', key[-4:])
|
||||
if height >= min_height:
|
||||
break
|
||||
keys.append(key)
|
||||
|
||||
if keys:
|
||||
with self.utxo_db.write_batch() as batch:
|
||||
with self.db.write_batch() as batch:
|
||||
for key in keys:
|
||||
batch.delete(key)
|
||||
self.logger.info(f'deleted {len(keys):,d} stale undo entries')
|
||||
|
@ -637,7 +764,7 @@ class LevelDB:
|
|||
# -- UTXO database
|
||||
|
||||
def read_utxo_state(self):
|
||||
state = self.utxo_db.get(b'state')
|
||||
state = self.db.get(UTXO_STATE)
|
||||
if not state:
|
||||
self.db_height = -1
|
||||
self.db_tx_count = 0
|
||||
|
@ -680,7 +807,7 @@ class LevelDB:
|
|||
self.logger.info(f'height: {self.db_height:,d}')
|
||||
self.logger.info(f'tip: {hash_to_hex_str(self.db_tip)}')
|
||||
self.logger.info(f'tx count: {self.db_tx_count:,d}')
|
||||
if self.utxo_db.for_sync:
|
||||
if self.db.for_sync:
|
||||
self.logger.info(f'flushing DB cache at {self.env.cache_MB:,d} MB')
|
||||
if self.first_sync:
|
||||
self.logger.info(f'sync time so far: {util.formatted_time(self.wall_time)}')
|
||||
|
@ -697,11 +824,11 @@ class LevelDB:
|
|||
'first_sync': self.first_sync,
|
||||
'db_version': self.db_version,
|
||||
}
|
||||
batch.put(b'state', repr(state).encode())
|
||||
batch.put(UTXO_STATE, repr(state).encode())
|
||||
|
||||
def set_flush_count(self, count):
|
||||
self.utxo_flush_count = count
|
||||
with self.utxo_db.write_batch() as batch:
|
||||
with self.db.write_batch() as batch:
|
||||
self.write_utxo_state(batch)
|
||||
|
||||
async def all_utxos(self, hashX):
|
||||
|
@ -713,8 +840,8 @@ class LevelDB:
|
|||
fs_tx_hash = self.fs_tx_hash
|
||||
# Key: b'u' + address_hashX + tx_idx + tx_num
|
||||
# Value: the UTXO value as a 64-bit unsigned integer
|
||||
prefix = b'u' + hashX
|
||||
for db_key, db_value in self.utxo_db.iterator(prefix=prefix):
|
||||
prefix = UTXO_PREFIX + hashX
|
||||
for db_key, db_value in self.db.iterator(prefix=prefix):
|
||||
tx_pos, tx_num = s_unpack('<HI', db_key[-6:])
|
||||
value, = unpack('<Q', db_value)
|
||||
tx_hash, height = fs_tx_hash(tx_num)
|
||||
|
@ -744,10 +871,10 @@ class LevelDB:
|
|||
|
||||
# Key: b'h' + compressed_tx_hash + tx_idx + tx_num
|
||||
# Value: hashX
|
||||
prefix = b'h' + tx_hash[:4] + idx_packed
|
||||
prefix = HASHX_UTXO_PREFIX + tx_hash[:4] + idx_packed
|
||||
|
||||
# Find which entry, if any, the TX_HASH matches.
|
||||
for db_key, hashX in self.utxo_db.iterator(prefix=prefix):
|
||||
for db_key, hashX in self.db.iterator(prefix=prefix):
|
||||
tx_num_packed = db_key[-4:]
|
||||
tx_num, = unpack('<I', tx_num_packed)
|
||||
hash, height = self.fs_tx_hash(tx_num)
|
||||
|
@ -765,8 +892,8 @@ class LevelDB:
|
|||
return None
|
||||
# Key: b'u' + address_hashX + tx_idx + tx_num
|
||||
# Value: the UTXO value as a 64-bit unsigned integer
|
||||
key = b'u' + hashX + suffix
|
||||
db_value = self.utxo_db.get(key)
|
||||
key = UTXO_PREFIX + hashX + suffix
|
||||
db_value = self.db.get(key)
|
||||
if not db_value:
|
||||
# This can happen if the DB was updated between
|
||||
# getting the hashXs and getting the UTXOs
|
||||
|
|
|
@ -43,10 +43,12 @@ class Merkle:
|
|||
def __init__(self, hash_func=double_sha256):
|
||||
self.hash_func = hash_func
|
||||
|
||||
def tree_depth(self, hash_count):
|
||||
return self.branch_length(hash_count) + 1
|
||||
@staticmethod
|
||||
def tree_depth(hash_count):
|
||||
return Merkle.branch_length(hash_count) + 1
|
||||
|
||||
def branch_length(self, hash_count):
|
||||
@staticmethod
|
||||
def branch_length(hash_count):
|
||||
"""Return the length of a merkle branch given the number of hashes."""
|
||||
if not isinstance(hash_count, int):
|
||||
raise TypeError('hash_count must be an integer')
|
||||
|
@ -54,7 +56,8 @@ class Merkle:
|
|||
raise ValueError('hash_count must be at least 1')
|
||||
return ceil(log(hash_count, 2))
|
||||
|
||||
def branch_and_root(self, hashes, index, length=None):
|
||||
@staticmethod
|
||||
def branch_and_root(hashes, index, length=None, hash_func=double_sha256):
|
||||
"""Return a (merkle branch, merkle_root) pair given hashes, and the
|
||||
index of one of those hashes.
|
||||
"""
|
||||
|
@ -64,7 +67,7 @@ class Merkle:
|
|||
# This also asserts hashes is not empty
|
||||
if not 0 <= index < len(hashes):
|
||||
raise ValueError(f"index '{index}/{len(hashes)}' out of range")
|
||||
natural_length = self.branch_length(len(hashes))
|
||||
natural_length = Merkle.branch_length(len(hashes))
|
||||
if length is None:
|
||||
length = natural_length
|
||||
else:
|
||||
|
@ -73,7 +76,6 @@ class Merkle:
|
|||
if length < natural_length:
|
||||
raise ValueError('length out of range')
|
||||
|
||||
hash_func = self.hash_func
|
||||
branch = []
|
||||
for _ in range(length):
|
||||
if len(hashes) & 1:
|
||||
|
@ -85,44 +87,47 @@ class Merkle:
|
|||
|
||||
return branch, hashes[0]
|
||||
|
||||
def root(self, hashes, length=None):
|
||||
@staticmethod
|
||||
def root(hashes, length=None):
|
||||
"""Return the merkle root of a non-empty iterable of binary hashes."""
|
||||
branch, root = self.branch_and_root(hashes, 0, length)
|
||||
branch, root = Merkle.branch_and_root(hashes, 0, length)
|
||||
return root
|
||||
|
||||
def root_from_proof(self, hash, branch, index):
|
||||
"""Return the merkle root given a hash, a merkle branch to it, and
|
||||
its index in the hashes array.
|
||||
# @staticmethod
|
||||
# def root_from_proof(hash, branch, index, hash_func=double_sha256):
|
||||
# """Return the merkle root given a hash, a merkle branch to it, and
|
||||
# its index in the hashes array.
|
||||
#
|
||||
# branch is an iterable sorted deepest to shallowest. If the
|
||||
# returned root is the expected value then the merkle proof is
|
||||
# verified.
|
||||
#
|
||||
# The caller should have confirmed the length of the branch with
|
||||
# branch_length(). Unfortunately this is not easily done for
|
||||
# bitcoin transactions as the number of transactions in a block
|
||||
# is unknown to an SPV client.
|
||||
# """
|
||||
# for elt in branch:
|
||||
# if index & 1:
|
||||
# hash = hash_func(elt + hash)
|
||||
# else:
|
||||
# hash = hash_func(hash + elt)
|
||||
# index >>= 1
|
||||
# if index:
|
||||
# raise ValueError('index out of range for branch')
|
||||
# return hash
|
||||
|
||||
branch is an iterable sorted deepest to shallowest. If the
|
||||
returned root is the expected value then the merkle proof is
|
||||
verified.
|
||||
|
||||
The caller should have confirmed the length of the branch with
|
||||
branch_length(). Unfortunately this is not easily done for
|
||||
bitcoin transactions as the number of transactions in a block
|
||||
is unknown to an SPV client.
|
||||
"""
|
||||
hash_func = self.hash_func
|
||||
for elt in branch:
|
||||
if index & 1:
|
||||
hash = hash_func(elt + hash)
|
||||
else:
|
||||
hash = hash_func(hash + elt)
|
||||
index >>= 1
|
||||
if index:
|
||||
raise ValueError('index out of range for branch')
|
||||
return hash
|
||||
|
||||
def level(self, hashes, depth_higher):
|
||||
@staticmethod
|
||||
def level(hashes, depth_higher):
|
||||
"""Return a level of the merkle tree of hashes the given depth
|
||||
higher than the bottom row of the original tree."""
|
||||
size = 1 << depth_higher
|
||||
root = self.root
|
||||
root = Merkle.root
|
||||
return [root(hashes[n: n + size], depth_higher)
|
||||
for n in range(0, len(hashes), size)]
|
||||
|
||||
def branch_and_root_from_level(self, level, leaf_hashes, index,
|
||||
@staticmethod
|
||||
def branch_and_root_from_level(level, leaf_hashes, index,
|
||||
depth_higher):
|
||||
"""Return a (merkle branch, merkle_root) pair when a merkle-tree has a
|
||||
level cached.
|
||||
|
@ -146,10 +151,10 @@ class Merkle:
|
|||
if not isinstance(leaf_hashes, list):
|
||||
raise TypeError("leaf_hashes must be a list")
|
||||
leaf_index = (index >> depth_higher) << depth_higher
|
||||
leaf_branch, leaf_root = self.branch_and_root(
|
||||
leaf_branch, leaf_root = Merkle.branch_and_root(
|
||||
leaf_hashes, index - leaf_index, depth_higher)
|
||||
index >>= depth_higher
|
||||
level_branch, root = self.branch_and_root(level, index)
|
||||
level_branch, root = Merkle.branch_and_root(level, index)
|
||||
# Check last so that we know index is in-range
|
||||
if leaf_root != level[index]:
|
||||
raise ValueError('leaf hashes inconsistent with level')
|
||||
|
|
Loading…
Add table
Reference in a new issue