Compare commits


6 commits

Author         SHA1         Message                                                                          Date
Jack Robison   7c96fd92c2   consolidate flush_backup                                                         2021-01-24 13:42:49 -05:00
Jack Robison   e213ec6525   remove lbry.wallet.server.history                                                2021-01-24 13:42:49 -05:00
Jack Robison   0398d8a897   consolidate leveldb block advance/reorg (move methods from History to LevelDB)  2021-01-24 13:42:49 -05:00
Jack Robison   c611ab6423   atomic flush_dbs                                                                 2021-01-24 13:42:49 -05:00
Jack Robison   e2864b76dc   combine leveldb databases                                                        2021-01-24 13:42:49 -05:00
Jack Robison   d3ac65e431   Merkle staticmethods                                                             2021-01-24 13:42:49 -05:00
4 changed files with 411 additions and 590 deletions
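The central change is that the separate headers, tx, utxo and history LevelDB instances are merged into a single database keyed by one-byte prefixes. As a quick orientation before reading the diffs, a sketch of the key families this revision uses (constants and key shapes taken from the leveldb.py hunks below; the value descriptions are inferred from how each prefix is written):

    # One combined LevelDB; each record family gets a one-byte key prefix.
    TX_PREFIX = b'B'             # tx_hash -> raw transaction
    BLOCK_HASH_PREFIX = b'C'     # be_uint64(height) -> block hash (byte-reversed)
    HEADER_PREFIX = b'H'         # be_uint64(height) -> raw header
    TX_NUM_PREFIX = b'N'         # tx_hash -> be_uint64(tx_num)
    TX_COUNT_PREFIX = b'T'       # be_uint64(height) -> be_uint64(cumulative tx count)
    UNDO_PREFIX = b'U'           # pack('>I', height) -> undo info
    TX_HASH_PREFIX = b'X'        # be_uint64(tx_num) -> tx_hash
    HASHX_UTXO_PREFIX = b'h'     # tx_hash[:4] + tx_idx + tx_num -> hashX
    UTXO_PREFIX = b'u'           # hashX + tx_idx + tx_num -> pack('<Q', value)
    HASHX_HISTORY_PREFIX = b'x'  # hashX + be_uint16(flush_id) -> array('I') of tx_nums
    HISTORY_PREFIX = b'A'        # declared but not referenced elsewhere in this diff
    HIST_STATE = b'state-hist'   # history flush/compaction state dict, repr()-encoded
    UTXO_STATE = b'state-utxo'   # chain/UTXO state dict, repr()-encoded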

lbry/wallet/server/block_processor.py

@ -1,17 +1,22 @@
import time import time
import asyncio import asyncio
import typing
from struct import pack, unpack from struct import pack, unpack
from concurrent.futures.thread import ThreadPoolExecutor from concurrent.futures.thread import ThreadPoolExecutor
from typing import Optional from typing import Optional, List, Tuple
from prometheus_client import Gauge, Histogram from prometheus_client import Gauge, Histogram
import lbry import lbry
from lbry.schema.claim import Claim from lbry.schema.claim import Claim
from lbry.wallet.server.tx import Tx
from lbry.wallet.server.db.writer import SQLDB from lbry.wallet.server.db.writer import SQLDB
from lbry.wallet.server.daemon import DaemonError from lbry.wallet.server.daemon import DaemonError
from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN
from lbry.wallet.server.util import chunks, class_logger from lbry.wallet.server.util import chunks, class_logger
from lbry.wallet.server.leveldb import FlushData from lbry.wallet.server.leveldb import FlushData
from lbry.wallet.transaction import Transaction
from lbry.wallet.server.udp import StatusServer from lbry.wallet.server.udp import StatusServer
if typing.TYPE_CHECKING:
from lbry.wallet.server.leveldb import LevelDB
class Prefetcher: class Prefetcher:
@ -153,7 +158,7 @@ class BlockProcessor:
"reorg_count", "Number of reorgs", namespace=NAMESPACE "reorg_count", "Number of reorgs", namespace=NAMESPACE
) )
def __init__(self, env, db, daemon, notifications): def __init__(self, env, db: 'LevelDB', daemon, notifications):
self.env = env self.env = env
self.db = db self.db = db
self.daemon = daemon self.daemon = daemon
@ -213,13 +218,10 @@ class BlockProcessor:
chain = [self.tip] + [self.coin.header_hash(h) for h in headers[:-1]] chain = [self.tip] + [self.coin.header_hash(h) for h in headers[:-1]]
if hprevs == chain: if hprevs == chain:
start = time.perf_counter() start = time.perf_counter()
await self.run_in_thread_with_lock(self.advance_blocks, blocks) await self.run_in_thread_with_lock(self.advance_blocks, blocks)
for cache in self.search_cache.values():
cache.clear()
self.history_cache.clear()
self.notifications.notified_mempool_txs.clear()
await self._maybe_flush()
processed_time = time.perf_counter() - start processed_time = time.perf_counter() - start
self.block_count_metric.set(self.height) self.block_count_metric.set(self.height)
self.block_update_time_metric.observe(processed_time) self.block_update_time_metric.observe(processed_time)
@ -252,7 +254,6 @@ class BlockProcessor:
else: else:
self.logger.info(f'faking a reorg of {count:,d} blocks') self.logger.info(f'faking a reorg of {count:,d} blocks')
async def get_raw_blocks(last_height, hex_hashes): async def get_raw_blocks(last_height, hex_hashes):
heights = range(last_height, last_height - len(hex_hashes), -1) heights = range(last_height, last_height - len(hex_hashes), -1)
try: try:
@ -262,24 +263,17 @@ class BlockProcessor:
except FileNotFoundError: except FileNotFoundError:
return await self.daemon.raw_blocks(hex_hashes) return await self.daemon.raw_blocks(hex_hashes)
def flush_backup():
# self.touched can include other addresses which is
# harmless, but remove None.
self.touched.discard(None)
self.db.flush_backup(self.flush_data(), self.touched)
try: try:
await self.flush(True) await self.flush(True)
start, last, hashes = await self.reorg_hashes(count) start, last, hashes = await self.reorg_hashes(count)
# Reverse and convert to hex strings. # Reverse and convert to hex strings.
hashes = [hash_to_hex_str(hash) for hash in reversed(hashes)] hashes = [hash_to_hex_str(hash) for hash in reversed(hashes)]
self.logger.info("reorg %i block hashes", len(hashes)) self.logger.info("reorg %i block hashes", len(hashes))
for hex_hashes in chunks(hashes, 50): for hex_hashes in chunks(hashes, 50):
raw_blocks = await get_raw_blocks(last, hex_hashes) raw_blocks = await get_raw_blocks(last, hex_hashes)
self.logger.info("got %i raw blocks", len(raw_blocks)) self.logger.info("got %i raw blocks", len(raw_blocks))
await self.run_in_thread_with_lock(self.backup_blocks, raw_blocks) await self.run_in_thread_with_lock(self.backup_blocks, raw_blocks)
await self.run_in_thread_with_lock(flush_backup)
last -= len(raw_blocks) last -= len(raw_blocks)
await self.run_in_thread_with_lock(self.db.sql.delete_claims_above_height, self.height) await self.run_in_thread_with_lock(self.db.sql.delete_claims_above_height, self.height)
@ -358,8 +352,7 @@ class BlockProcessor:
async def flush(self, flush_utxos): async def flush(self, flush_utxos):
def flush(): def flush():
self.db.flush_dbs(self.flush_data(), flush_utxos, self.db.flush_dbs(self.flush_data(), self.estimate_txs_remaining)
self.estimate_txs_remaining)
await self.run_in_thread_with_lock(flush) await self.run_in_thread_with_lock(flush)
async def _maybe_flush(self): async def _maybe_flush(self):
@ -378,7 +371,7 @@ class BlockProcessor:
one_MB = 1000*1000 one_MB = 1000*1000
utxo_cache_size = len(self.utxo_cache) * 205 utxo_cache_size = len(self.utxo_cache) * 205
db_deletes_size = len(self.db_deletes) * 57 db_deletes_size = len(self.db_deletes) * 57
hist_cache_size = self.db.history.unflushed_memsize() hist_cache_size = len(self.db.hist_unflushed) * 180 + self.db.hist_unflushed_count * 4
# Roughly ntxs * 32 + nblocks * 42 # Roughly ntxs * 32 + nblocks * 42
tx_hash_size = ((self.tx_count - self.db.fs_tx_count) * 32 tx_hash_size = ((self.tx_count - self.db.fs_tx_count) * 32
+ (self.height - self.db.fs_height) * 42) + (self.height - self.db.fs_height) * 42)
@ -420,13 +413,20 @@ class BlockProcessor:
self.headers.extend(headers) self.headers.extend(headers)
self.tip = self.coin.header_hash(headers[-1]) self.tip = self.coin.header_hash(headers[-1])
def advance_txs(self, height, txs, header, block_hash): self.db.flush_dbs(self.flush_data(), self.estimate_txs_remaining)
for cache in self.search_cache.values():
cache.clear()
self.history_cache.clear()
self.notifications.notified_mempool_txs.clear()
def advance_txs(self, height, txs: List[Tuple[Tx, bytes]], header, block_hash):
self.block_hashes.append(block_hash) self.block_hashes.append(block_hash)
self.block_txs.append((b''.join(tx_hash for tx, tx_hash in txs), [tx.raw for tx, _ in txs])) self.block_txs.append((b''.join(tx_hash for tx, tx_hash in txs), [tx.raw for tx, _ in txs]))
undo_info = [] undo_info = []
tx_num = self.tx_count
hashXs_by_tx = [] hashXs_by_tx = []
tx_num = self.tx_count
# Use local vars for speed in the loops # Use local vars for speed in the loops
put_utxo = self.utxo_cache.__setitem__ put_utxo = self.utxo_cache.__setitem__
@ -462,7 +462,15 @@ class BlockProcessor:
self.db.total_transactions.append(tx_hash) self.db.total_transactions.append(tx_hash)
tx_num += 1 tx_num += 1
self.db.history.add_unflushed(hashXs_by_tx, self.tx_count) # self.db.add_unflushed(hashXs_by_tx, self.tx_count)
first_tx_num = self.tx_count
_unflushed = self.db.hist_unflushed
_count = 0
for _tx_num, _hashXs in enumerate(hashXs_by_tx, start=first_tx_num):
for _hashX in set(_hashXs):
_unflushed[_hashX].append(_tx_num)
_count += len(_hashXs)
self.db.hist_unflushed_count += _count
self.tx_count = tx_num self.tx_count = tx_num
self.db.tx_counts.append(tx_num) self.db.tx_counts.append(tx_num)
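The loop added above is History.add_unflushed (from the deleted history.py below) inlined onto the new LevelDB attributes; note that it now counts len(_hashXs) before deduplication, where the old method counted the deduplicated set, a difference that only feeds the flush-size estimate. A helper-function sketch of the same logic (the helper name is hypothetical, not part of the diff):

    from array import array
    from typing import Dict, List

    def add_unflushed(hist_unflushed: Dict[bytes, array],
                      hashXs_by_tx: List[List[bytes]],
                      first_tx_num: int) -> int:
        """Append each tx number to the unflushed history of every hashX it touches."""
        count = 0
        for tx_num, hashXs in enumerate(hashXs_by_tx, start=first_tx_num):
            for hashX in set(hashXs):
                hist_unflushed[hashX].append(tx_num)
            count += len(hashXs)  # mirrors the inlined loop above
        return count

    # usage, as in advance_txs:
    #   self.db.hist_unflushed_count += add_unflushed(self.db.hist_unflushed, hashXs_by_tx, self.tx_count)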
@ -493,6 +501,10 @@ class BlockProcessor:
self.height -= 1 self.height -= 1
self.db.tx_counts.pop() self.db.tx_counts.pop()
# self.touched can include other addresses which is
# harmless, but remove None.
self.touched.discard(None)
self.db.flush_backup(self.flush_data(), self.touched)
self.logger.info(f'backed up to height {self.height:,d}') self.logger.info(f'backed up to height {self.height:,d}')
def backup_txs(self, txs): def backup_txs(self, txs):
@ -603,11 +615,11 @@ class BlockProcessor:
# Value: hashX # Value: hashX
prefix = b'h' + tx_hash[:4] + idx_packed prefix = b'h' + tx_hash[:4] + idx_packed
candidates = {db_key: hashX for db_key, hashX candidates = {db_key: hashX for db_key, hashX
in self.db.utxo_db.iterator(prefix=prefix)} in self.db.db.iterator(prefix=prefix)}
for hdb_key, hashX in candidates.items(): for hdb_key, hashX in candidates.items():
tx_num_packed = hdb_key[-4:] tx_num_packed = hdb_key[-4:]
if len(candidates) > 1: if len(candidates) > 1:
tx_num, = unpack('<I', tx_num_packed) tx_num, = unpack('<I', tx_num_packed)
try: try:
hash, height = self.db.fs_tx_hash(tx_num) hash, height = self.db.fs_tx_hash(tx_num)
@ -622,7 +634,7 @@ class BlockProcessor:
# Key: b'u' + address_hashX + tx_idx + tx_num # Key: b'u' + address_hashX + tx_idx + tx_num
# Value: the UTXO value as a 64-bit unsigned integer # Value: the UTXO value as a 64-bit unsigned integer
udb_key = b'u' + hashX + hdb_key[-6:] udb_key = b'u' + hashX + hdb_key[-6:]
utxo_value_packed = self.db.utxo_db.get(udb_key) utxo_value_packed = self.db.db.get(udb_key)
if utxo_value_packed is None: if utxo_value_packed is None:
self.logger.warning( self.logger.warning(
"%s:%s is not found in UTXO db for %s", hash_to_hex_str(tx_hash), tx_idx, hash_to_hex_str(hashX) "%s:%s is not found in UTXO db for %s", hash_to_hex_str(tx_hash), tx_idx, hash_to_hex_str(hashX)
@ -631,6 +643,7 @@ class BlockProcessor:
# Remove both entries for this UTXO # Remove both entries for this UTXO
self.db_deletes.append(hdb_key) self.db_deletes.append(hdb_key)
self.db_deletes.append(udb_key) self.db_deletes.append(udb_key)
return hashX + tx_num_packed + utxo_value_packed return hashX + tx_num_packed + utxo_value_packed
self.logger.error('UTXO {hash_to_hex_str(tx_hash)} / {tx_idx} not found in "h" table') self.logger.error('UTXO {hash_to_hex_str(tx_hash)} / {tx_idx} not found in "h" table')
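Taken together, the block_processor.py hunks move all flushing into the two block-walking methods: advance_blocks now ends by calling flush_dbs and clearing the caches itself, and backup_blocks ends by calling flush_backup, so check_and_advance_blocks and the reorg loop no longer flush on their own. A condensed sketch of the two new tails (abridged from the hunks above; `self` is the BlockProcessor):

    # Illustrative condensation only; these lines live at the end of
    # advance_blocks and backup_blocks respectively in the diff above.
    def advance_blocks_tail(self):
        self.db.flush_dbs(self.flush_data(), self.estimate_txs_remaining)
        for cache in self.search_cache.values():
            cache.clear()
        self.history_cache.clear()
        self.notifications.notified_mempool_txs.clear()

    def backup_blocks_tail(self):
        # self.touched can include other addresses, which is harmless, but remove None
        self.touched.discard(None)
        self.db.flush_backup(self.flush_data(), self.touched)
        self.logger.info(f'backed up to height {self.height:,d}')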

lbry/wallet/server/history.py (deleted)

@ -1,324 +0,0 @@
# Copyright (c) 2016-2018, Neil Booth
# Copyright (c) 2017, the ElectrumX authors
#
# All rights reserved.
#
# See the file "LICENCE" for information about the copyright
# and warranty status of this software.
"""History by script hash (address)."""
import array
import ast
import bisect
import time
from collections import defaultdict
from functools import partial
from lbry.wallet.server import util
from lbry.wallet.server.util import pack_be_uint16, unpack_be_uint16_from
from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN
class History:
DB_VERSIONS = [0]
def __init__(self):
self.logger = util.class_logger(__name__, self.__class__.__name__)
# For history compaction
self.max_hist_row_entries = 12500
self.unflushed = defaultdict(partial(array.array, 'I'))
self.unflushed_count = 0
self.db = None
def open_db(self, db_class, for_sync, utxo_flush_count, compacting):
self.db = db_class('hist', for_sync)
self.read_state()
self.clear_excess(utxo_flush_count)
# An incomplete compaction needs to be cancelled otherwise
# restarting it will corrupt the history
if not compacting:
self._cancel_compaction()
return self.flush_count
def close_db(self):
if self.db:
self.db.close()
self.db = None
def read_state(self):
state = self.db.get(b'state\0\0')
if state:
state = ast.literal_eval(state.decode())
if not isinstance(state, dict):
raise RuntimeError('failed reading state from history DB')
self.flush_count = state['flush_count']
self.comp_flush_count = state.get('comp_flush_count', -1)
self.comp_cursor = state.get('comp_cursor', -1)
self.db_version = state.get('db_version', 0)
else:
self.flush_count = 0
self.comp_flush_count = -1
self.comp_cursor = -1
self.db_version = max(self.DB_VERSIONS)
self.logger.info(f'history DB version: {self.db_version}')
if self.db_version not in self.DB_VERSIONS:
msg = f'this software only handles DB versions {self.DB_VERSIONS}'
self.logger.error(msg)
raise RuntimeError(msg)
self.logger.info(f'flush count: {self.flush_count:,d}')
def clear_excess(self, utxo_flush_count):
# < might happen at end of compaction as both DBs cannot be
# updated atomically
if self.flush_count <= utxo_flush_count:
return
self.logger.info('DB shut down uncleanly. Scanning for '
'excess history flushes...')
keys = []
for key, hist in self.db.iterator(prefix=b''):
flush_id, = unpack_be_uint16_from(key[-2:])
if flush_id > utxo_flush_count:
keys.append(key)
self.logger.info(f'deleting {len(keys):,d} history entries')
self.flush_count = utxo_flush_count
with self.db.write_batch() as batch:
for key in keys:
batch.delete(key)
self.write_state(batch)
self.logger.info('deleted excess history entries')
def write_state(self, batch):
"""Write state to the history DB."""
state = {
'flush_count': self.flush_count,
'comp_flush_count': self.comp_flush_count,
'comp_cursor': self.comp_cursor,
'db_version': self.db_version,
}
# History entries are not prefixed; the suffix \0\0 ensures we
# look similar to other entries and aren't interfered with
batch.put(b'state\0\0', repr(state).encode())
def add_unflushed(self, hashXs_by_tx, first_tx_num):
unflushed = self.unflushed
count = 0
for tx_num, hashXs in enumerate(hashXs_by_tx, start=first_tx_num):
hashXs = set(hashXs)
for hashX in hashXs:
unflushed[hashX].append(tx_num)
count += len(hashXs)
self.unflushed_count += count
def unflushed_memsize(self):
return len(self.unflushed) * 180 + self.unflushed_count * 4
def assert_flushed(self):
assert not self.unflushed
def flush(self):
start_time = time.time()
self.flush_count += 1
flush_id = pack_be_uint16(self.flush_count)
unflushed = self.unflushed
with self.db.write_batch() as batch:
for hashX in sorted(unflushed):
key = hashX + flush_id
batch.put(key, unflushed[hashX].tobytes())
self.write_state(batch)
count = len(unflushed)
unflushed.clear()
self.unflushed_count = 0
if self.db.for_sync:
elapsed = time.time() - start_time
self.logger.info(f'flushed history in {elapsed:.1f}s '
f'for {count:,d} addrs')
def backup(self, hashXs, tx_count):
# Not certain this is needed, but it doesn't hurt
self.flush_count += 1
nremoves = 0
bisect_left = bisect.bisect_left
with self.db.write_batch() as batch:
for hashX in sorted(hashXs):
deletes = []
puts = {}
for key, hist in self.db.iterator(prefix=hashX, reverse=True):
a = array.array('I')
a.frombytes(hist)
# Remove all history entries >= tx_count
idx = bisect_left(a, tx_count)
nremoves += len(a) - idx
if idx > 0:
puts[key] = a[:idx].tobytes()
break
deletes.append(key)
for key in deletes:
batch.delete(key)
for key, value in puts.items():
batch.put(key, value)
self.write_state(batch)
self.logger.info(f'backing up removed {nremoves:,d} history entries')
# def get_txnums(self, hashX, limit=1000):
# """Generator that returns an unpruned, sorted list of tx_nums in the
# history of a hashX. Includes both spending and receiving
# transactions. By default yields at most 1000 entries. Set
# limit to None to get them all. """
# limit = util.resolve_limit(limit)
# for key, hist in self.db.iterator(prefix=hashX):
# a = array.array('I')
# a.frombytes(hist)
# for tx_num in a:
# if limit == 0:
# return
# yield tx_num
# limit -= 1
#
# History compaction
#
# comp_cursor is a cursor into compaction progress.
# -1: no compaction in progress
# 0-65535: Compaction in progress; all prefixes < comp_cursor have
# been compacted, and later ones have not.
# 65536: compaction complete in-memory but not flushed
#
# comp_flush_count applies during compaction, and is a flush count
# for history with prefix < comp_cursor. flush_count applies
# to still uncompacted history. It is -1 when no compaction is
# taking place. Key suffixes up to and including comp_flush_count
# are used, so a parallel history flush must first increment this
#
# When compaction is complete and the final flush takes place,
# flush_count is reset to comp_flush_count, and comp_flush_count to -1
def _flush_compaction(self, cursor, write_items, keys_to_delete):
"""Flush a single compaction pass as a batch."""
# Update compaction state
if cursor == 65536:
self.flush_count = self.comp_flush_count
self.comp_cursor = -1
self.comp_flush_count = -1
else:
self.comp_cursor = cursor
# History DB. Flush compacted history and updated state
with self.db.write_batch() as batch:
# Important: delete first! The keyspace may overlap.
for key in keys_to_delete:
batch.delete(key)
for key, value in write_items:
batch.put(key, value)
self.write_state(batch)
def _compact_hashX(self, hashX, hist_map, hist_list,
write_items, keys_to_delete):
"""Compress history for a hashX. hist_list is an ordered list of
the histories to be compressed."""
# History entries (tx numbers) are 4 bytes each. Distribute
# over rows of up to 50KB in size. A fixed row size means
# future compactions will not need to update the first N - 1
# rows.
max_row_size = self.max_hist_row_entries * 4
full_hist = b''.join(hist_list)
nrows = (len(full_hist) + max_row_size - 1) // max_row_size
if nrows > 4:
self.logger.info('hashX {} is large: {:,d} entries across '
'{:,d} rows'
.format(hash_to_hex_str(hashX),
len(full_hist) // 4, nrows))
# Find what history needs to be written, and what keys need to
# be deleted. Start by assuming all keys are to be deleted,
# and then remove those that are the same on-disk as when
# compacted.
write_size = 0
keys_to_delete.update(hist_map)
for n, chunk in enumerate(util.chunks(full_hist, max_row_size)):
key = hashX + pack_be_uint16(n)
if hist_map.get(key) == chunk:
keys_to_delete.remove(key)
else:
write_items.append((key, chunk))
write_size += len(chunk)
assert n + 1 == nrows
self.comp_flush_count = max(self.comp_flush_count, n)
return write_size
def _compact_prefix(self, prefix, write_items, keys_to_delete):
"""Compact all history entries for hashXs beginning with the
given prefix. Update keys_to_delete and write."""
prior_hashX = None
hist_map = {}
hist_list = []
key_len = HASHX_LEN + 2
write_size = 0
for key, hist in self.db.iterator(prefix=prefix):
# Ignore non-history entries
if len(key) != key_len:
continue
hashX = key[:-2]
if hashX != prior_hashX and prior_hashX:
write_size += self._compact_hashX(prior_hashX, hist_map,
hist_list, write_items,
keys_to_delete)
hist_map.clear()
hist_list.clear()
prior_hashX = hashX
hist_map[key] = hist
hist_list.append(hist)
if prior_hashX:
write_size += self._compact_hashX(prior_hashX, hist_map, hist_list,
write_items, keys_to_delete)
return write_size
def _compact_history(self, limit):
"""Inner loop of history compaction. Loops until limit bytes have
been processed.
"""
keys_to_delete = set()
write_items = [] # A list of (key, value) pairs
write_size = 0
# Loop over 2-byte prefixes
cursor = self.comp_cursor
while write_size < limit and cursor < 65536:
prefix = pack_be_uint16(cursor)
write_size += self._compact_prefix(prefix, write_items,
keys_to_delete)
cursor += 1
max_rows = self.comp_flush_count + 1
self._flush_compaction(cursor, write_items, keys_to_delete)
self.logger.info('history compaction: wrote {:,d} rows ({:.1f} MB), '
'removed {:,d} rows, largest: {:,d}, {:.1f}% complete'
.format(len(write_items), write_size / 1000000,
len(keys_to_delete), max_rows,
100 * cursor / 65536))
return write_size
def _cancel_compaction(self):
if self.comp_cursor != -1:
self.logger.warning('cancelling in-progress history compaction')
self.comp_flush_count = -1
self.comp_cursor = -1
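The deleted History class is not replaced one-for-one; its state and logic are folded into LevelDB in the next file. A rough map of where each piece lands (old names from the file above, new names from the leveldb.py diff below; the compaction helpers, _compact_* and _flush_compaction, do not reappear in the hunks shown here):

    # Illustrative mapping only.
    HISTORY_TO_LEVELDB = {
        'History.unflushed':        'LevelDB.hist_unflushed',
        'History.unflushed_count':  'LevelDB.hist_unflushed_count',
        'History.flush_count':      'LevelDB.hist_flush_count',
        'History.comp_flush_count': 'LevelDB.hist_comp_flush_count',
        'History.comp_cursor':      'LevelDB.hist_comp_cursor',
        'History.db':               "LevelDB.db, under the b'x' (HASHX_HISTORY_PREFIX) and b'state-hist' keys",
        'History.add_unflushed':    'inlined in BlockProcessor.advance_txs',
        'History.flush':            'inlined in LevelDB.flush_dbs',
        'History.backup':           'inlined in LevelDB.flush_backup',
        'History.clear_excess':     'inlined in LevelDB._open_dbs',
        'History.read_state / write_state': 'LevelDB._open_dbs / LevelDB.write_history_state',
    }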

lbry/wallet/server/leveldb.py

@ -17,9 +17,10 @@ import time
import zlib import zlib
import typing import typing
from typing import Optional, List, Tuple, Iterable from typing import Optional, List, Tuple, Iterable
from functools import partial
from asyncio import sleep from asyncio import sleep
from bisect import bisect_right from bisect import bisect_right, bisect_left
from collections import namedtuple from collections import namedtuple, defaultdict
from glob import glob from glob import glob
from struct import pack, unpack from struct import pack, unpack
from concurrent.futures.thread import ThreadPoolExecutor from concurrent.futures.thread import ThreadPoolExecutor
@ -28,19 +29,25 @@ from lbry.utils import LRUCacheWithMetrics
from lbry.wallet.server import util from lbry.wallet.server import util
from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN
from lbry.wallet.server.merkle import Merkle, MerkleCache from lbry.wallet.server.merkle import Merkle, MerkleCache
from lbry.wallet.server.util import formatted_time from lbry.wallet.server.util import formatted_time, pack_be_uint16, unpack_be_uint16_from
from lbry.wallet.server.storage import db_class from lbry.wallet.server.storage import db_class
from lbry.wallet.server.history import History
UTXO = namedtuple("UTXO", "tx_num tx_pos tx_hash height value") UTXO = namedtuple("UTXO", "tx_num tx_pos tx_hash height value")
HEADER_PREFIX = b'H' HISTORY_PREFIX = b'A'
TX_COUNT_PREFIX = b'T'
TX_HASH_PREFIX = b'X'
TX_PREFIX = b'B' TX_PREFIX = b'B'
TX_NUM_PREFIX = b'N'
BLOCK_HASH_PREFIX = b'C' BLOCK_HASH_PREFIX = b'C'
HEADER_PREFIX = b'H'
TX_NUM_PREFIX = b'N'
TX_COUNT_PREFIX = b'T'
UNDO_PREFIX = b'U'
TX_HASH_PREFIX = b'X'
HASHX_UTXO_PREFIX = b'h'
HIST_STATE = b'state-hist'
UTXO_STATE = b'state-utxo'
UTXO_PREFIX = b'u'
HASHX_HISTORY_PREFIX = b'x'
@attr.s(slots=True) @attr.s(slots=True)
@ -65,6 +72,7 @@ class LevelDB:
""" """
DB_VERSIONS = [6] DB_VERSIONS = [6]
HIST_DB_VERSIONS = [0]
class DBError(Exception): class DBError(Exception):
"""Raised on general DB errors generally indicating corruption.""" """Raised on general DB errors generally indicating corruption."""
@ -78,8 +86,14 @@ class LevelDB:
self.logger.info(f'switching current directory to {env.db_dir}') self.logger.info(f'switching current directory to {env.db_dir}')
self.db_class = db_class(env.db_dir, self.env.db_engine) self.db_class = db_class(env.db_dir, self.env.db_engine)
self.history = History() self.db = None
self.utxo_db = None
self.hist_unflushed = defaultdict(partial(array.array, 'I'))
self.hist_unflushed_count = 0
self.hist_flush_count = 0
self.hist_comp_flush_count = -1
self.hist_comp_cursor = -1
self.tx_counts = None self.tx_counts = None
self.headers = None self.headers = None
self.last_flush = time.time() self.last_flush = time.time()
@ -96,6 +110,19 @@ class LevelDB:
self._tx_and_merkle_cache = LRUCacheWithMetrics(2 ** 17, metric_name='tx_and_merkle', namespace="wallet_server") self._tx_and_merkle_cache = LRUCacheWithMetrics(2 ** 17, metric_name='tx_and_merkle', namespace="wallet_server")
self.total_transactions = None self.total_transactions = None
# def add_unflushed(self, hashXs_by_tx, first_tx_num):
# unflushed = self.history.unflushed
# count = 0
# for tx_num, hashXs in enumerate(hashXs_by_tx, start=first_tx_num):
# hashXs = set(hashXs)
# for hashX in hashXs:
# unflushed[hashX].append(tx_num)
# count += len(hashXs)
# self.history.unflushed_count += count
# def unflushed_memsize(self):
# return len(self.history.unflushed) * 180 + self.history.unflushed_count * 4
async def _read_tx_counts(self): async def _read_tx_counts(self):
if self.tx_counts is not None: if self.tx_counts is not None:
return return
@ -105,7 +132,7 @@ class LevelDB:
def get_counts(): def get_counts():
return tuple( return tuple(
util.unpack_be_uint64(tx_count) util.unpack_be_uint64(tx_count)
for tx_count in self.tx_db.iterator(prefix=TX_COUNT_PREFIX, include_key=False) for tx_count in self.db.iterator(prefix=TX_COUNT_PREFIX, include_key=False)
) )
tx_counts = await asyncio.get_event_loop().run_in_executor(self.executor, get_counts) tx_counts = await asyncio.get_event_loop().run_in_executor(self.executor, get_counts)
@ -120,7 +147,7 @@ class LevelDB:
async def _read_txids(self): async def _read_txids(self):
def get_txids(): def get_txids():
return list(self.tx_db.iterator(prefix=TX_HASH_PREFIX, include_key=False)) return list(self.db.iterator(prefix=TX_HASH_PREFIX, include_key=False))
start = time.perf_counter() start = time.perf_counter()
self.logger.info("loading txids") self.logger.info("loading txids")
@ -136,7 +163,7 @@ class LevelDB:
def get_headers(): def get_headers():
return [ return [
header for header in self.headers_db.iterator(prefix=HEADER_PREFIX, include_key=False) header for header in self.db.iterator(prefix=HEADER_PREFIX, include_key=False)
] ]
headers = await asyncio.get_event_loop().run_in_executor(self.executor, get_headers) headers = await asyncio.get_event_loop().run_in_executor(self.executor, get_headers)
@ -152,31 +179,97 @@ class LevelDB:
f.write(f'ElectrumX databases and metadata for ' f.write(f'ElectrumX databases and metadata for '
f'{self.coin.NAME} {self.coin.NET}'.encode()) f'{self.coin.NAME} {self.coin.NET}'.encode())
assert self.headers_db is None assert self.db is None
self.headers_db = self.db_class('headers', for_sync) self.db = self.db_class(f'lbry-{self.env.db_engine}', for_sync)
if self.headers_db.is_new: if self.db.is_new:
self.logger.info('created new headers db') self.logger.info('created new db: %s', f'lbry-{self.env.db_engine}')
self.logger.info(f'opened headers DB (for sync: {for_sync})') self.logger.info(f'opened DB (for sync: {for_sync})')
assert self.tx_db is None
self.tx_db = self.db_class('tx', for_sync)
if self.tx_db.is_new:
self.logger.info('created new tx db')
self.logger.info(f'opened tx DB (for sync: {for_sync})')
assert self.utxo_db is None
# First UTXO DB
self.utxo_db = self.db_class('utxo', for_sync)
if self.utxo_db.is_new:
self.logger.info('created new utxo db')
self.logger.info(f'opened utxo db (for sync: {for_sync})')
self.read_utxo_state() self.read_utxo_state()
# Then history DB # Then history DB
self.utxo_flush_count = self.history.open_db( state = self.db.get(HIST_STATE)
self.db_class, for_sync, self.utxo_flush_count, compacting if state:
) state = ast.literal_eval(state.decode())
self.clear_excess_undo_info() if not isinstance(state, dict):
raise RuntimeError('failed reading state from history DB')
self.hist_flush_count = state['flush_count']
self.hist_comp_flush_count = state.get('comp_flush_count', -1)
self.hist_comp_cursor = state.get('comp_cursor', -1)
self.hist_db_version = state.get('db_version', 0)
else:
self.hist_flush_count = 0
self.hist_comp_flush_count = -1
self.hist_comp_cursor = -1
self.hist_db_version = max(self.HIST_DB_VERSIONS)
self.logger.info(f'history DB version: {self.hist_db_version}')
if self.hist_db_version not in self.HIST_DB_VERSIONS:
msg = f'this software only handles DB versions {self.HIST_DB_VERSIONS}'
self.logger.error(msg)
raise RuntimeError(msg)
self.logger.info(f'flush count: {self.hist_flush_count:,d}')
# self.history.clear_excess(self.utxo_flush_count)
# < might happen at end of compaction as both DBs cannot be
# updated atomically
if self.hist_flush_count > self.utxo_flush_count:
self.logger.info('DB shut down uncleanly. Scanning for '
'excess history flushes...')
keys = []
for key, hist in self.db.iterator(prefix=HASHX_HISTORY_PREFIX):
k = key[1:]
flush_id, = unpack_be_uint16_from(k[-2:])
if flush_id > self.utxo_flush_count:
keys.append(k)
self.logger.info(f'deleting {len(keys):,d} history entries')
self.hist_flush_count = self.utxo_flush_count
with self.db.write_batch() as batch:
for key in keys:
batch.delete(HASHX_HISTORY_PREFIX + key)
state = {
'flush_count': self.hist_flush_count,
'comp_flush_count': self.hist_comp_flush_count,
'comp_cursor': self.hist_comp_cursor,
'db_version': self.hist_db_version,
}
# History entries are not prefixed; the suffix \0\0 ensures we
# look similar to other entries and aren't interfered with
batch.put(HIST_STATE, repr(state).encode())
self.logger.info('deleted excess history entries')
self.utxo_flush_count = self.hist_flush_count
min_height = self.min_undo_height(self.db_height)
keys = []
for key, hist in self.db.iterator(prefix=UNDO_PREFIX):
height, = unpack('>I', key[-4:])
if height >= min_height:
break
keys.append(key)
if keys:
with self.db.write_batch() as batch:
for key in keys:
batch.delete(key)
self.logger.info(f'deleted {len(keys):,d} stale undo entries')
# delete old block files
prefix = self.raw_block_prefix()
paths = [path for path in glob(f'{prefix}[0-9]*')
if len(path) > len(prefix)
and int(path[len(prefix):]) < min_height]
if paths:
for path in paths:
try:
os.remove(path)
except FileNotFoundError:
pass
self.logger.info(f'deleted {len(paths):,d} stale block files')
# Read TX counts (requires meta directory) # Read TX counts (requires meta directory)
await self._read_tx_counts() await self._read_tx_counts()
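The history state that History.read_state/write_state managed in its own database is now a plain repr()-encoded dict stored in the combined database under HIST_STATE. A minimal sketch of reading it back, following the hunk above (the function name is illustrative; db is the combined database handle):

    import ast

    HIST_STATE = b'state-hist'

    def read_hist_state(db) -> dict:
        """Return the history flush/compaction state, or the defaults used for a new DB."""
        raw = db.get(HIST_STATE)
        if not raw:
            return {'flush_count': 0, 'comp_flush_count': -1, 'comp_cursor': -1, 'db_version': 0}
        state = ast.literal_eval(raw.decode())
        if not isinstance(state, dict):
            raise RuntimeError('failed reading state from history DB')
        return state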
@ -185,10 +278,7 @@ class LevelDB:
await self._read_headers() await self._read_headers()
def close(self): def close(self):
self.utxo_db.close() self.db.close()
self.history.close_db()
self.headers_db.close()
self.tx_db.close()
self.executor.shutdown(wait=True) self.executor.shutdown(wait=True)
self.executor = None self.executor = None
@ -209,18 +299,12 @@ class LevelDB:
"""Open the databases for serving. If they are already open they are """Open the databases for serving. If they are already open they are
closed first. closed first.
""" """
self.logger.info('closing DBs to re-open for serving') if self.db:
if self.utxo_db: return
self.logger.info('closing DBs to re-open for serving') # self.logger.info('closing DBs to re-open for serving')
self.utxo_db.close() # self.db.close()
self.history.close_db() # self.history.close_db()
self.utxo_db = None # self.db = None
if self.headers_db:
self.headers_db.close()
self.headers_db = None
if self.tx_db:
self.tx_db.close()
self.tx_db = None
await self._open_dbs(False, False) await self._open_dbs(False, False)
self.logger.info("opened for serving") self.logger.info("opened for serving")
@ -249,107 +333,7 @@ class LevelDB:
assert not flush_data.adds assert not flush_data.adds
assert not flush_data.deletes assert not flush_data.deletes
assert not flush_data.undo_infos assert not flush_data.undo_infos
self.history.assert_flushed() assert not self.hist_unflushed
def flush_dbs(self, flush_data, flush_utxos, estimate_txs_remaining):
"""Flush out cached state. History is always flushed; UTXOs are
flushed if flush_utxos."""
if flush_data.height == self.db_height:
self.assert_flushed(flush_data)
return
start_time = time.time()
prior_flush = self.last_flush
tx_delta = flush_data.tx_count - self.last_flush_tx_count
# Flush to file system
self.flush_fs(flush_data)
# Then history
self.flush_history()
# Flush state last as it reads the wall time.
with self.utxo_db.write_batch() as batch:
if flush_utxos:
self.flush_utxo_db(batch, flush_data)
self.flush_state(batch)
# Update and put the wall time again - otherwise we drop the
# time it took to commit the batch
self.flush_state(self.utxo_db)
elapsed = self.last_flush - start_time
self.logger.info(f'flush #{self.history.flush_count:,d} took '
f'{elapsed:.1f}s. Height {flush_data.height:,d} '
f'txs: {flush_data.tx_count:,d} ({tx_delta:+,d})')
# Catch-up stats
if self.utxo_db.for_sync:
flush_interval = self.last_flush - prior_flush
tx_per_sec_gen = int(flush_data.tx_count / self.wall_time)
tx_per_sec_last = 1 + int(tx_delta / flush_interval)
eta = estimate_txs_remaining() / tx_per_sec_last
self.logger.info(f'tx/sec since genesis: {tx_per_sec_gen:,d}, '
f'since last flush: {tx_per_sec_last:,d}')
self.logger.info(f'sync time: {formatted_time(self.wall_time)} '
f'ETA: {formatted_time(eta)}')
def flush_fs(self, flush_data):
"""Write headers, tx counts and block tx hashes to the filesystem.
The first height to write is self.fs_height + 1. The FS
metadata is all append-only, so in a crash we just pick up
again from the height stored in the DB.
"""
prior_tx_count = (self.tx_counts[self.fs_height]
if self.fs_height >= 0 else 0)
assert len(flush_data.block_txs) == len(flush_data.headers)
assert flush_data.height == self.fs_height + len(flush_data.headers)
assert flush_data.tx_count == (self.tx_counts[-1] if self.tx_counts
else 0)
assert len(self.tx_counts) == flush_data.height + 1
assert len(
b''.join(hashes for hashes, _ in flush_data.block_txs)
) // 32 == flush_data.tx_count - prior_tx_count
# Write the headers
start_time = time.perf_counter()
with self.headers_db.write_batch() as batch:
batch_put = batch.put
for i, header in enumerate(flush_data.headers):
batch_put(HEADER_PREFIX + util.pack_be_uint64(self.fs_height + i + 1), header)
self.headers.append(header)
flush_data.headers.clear()
height_start = self.fs_height + 1
tx_num = prior_tx_count
with self.tx_db.write_batch() as batch:
batch_put = batch.put
for block_hash, (tx_hashes, txs) in zip(flush_data.block_hashes, flush_data.block_txs):
tx_count = self.tx_counts[height_start]
batch_put(BLOCK_HASH_PREFIX + util.pack_be_uint64(height_start), block_hash[::-1])
batch_put(TX_COUNT_PREFIX + util.pack_be_uint64(height_start), util.pack_be_uint64(tx_count))
height_start += 1
offset = 0
while offset < len(tx_hashes):
batch_put(TX_HASH_PREFIX + util.pack_be_uint64(tx_num), tx_hashes[offset:offset+32])
batch_put(TX_NUM_PREFIX + tx_hashes[offset:offset+32], util.pack_be_uint64(tx_num))
batch_put(TX_PREFIX + tx_hashes[offset:offset+32], txs[offset // 32])
tx_num += 1
offset += 32
flush_data.block_txs.clear()
flush_data.block_hashes.clear()
self.fs_height = flush_data.height
self.fs_tx_count = flush_data.tx_count
elapsed = time.perf_counter() - start_time
self.logger.info(f'flushed filesystem data in {elapsed:.2f}s')
def flush_history(self):
self.history.flush()
def flush_utxo_db(self, batch, flush_data): def flush_utxo_db(self, batch, flush_data):
"""Flush the cached DB writes and UTXO set to the batch.""" """Flush the cached DB writes and UTXO set to the batch."""
@ -372,15 +356,16 @@ class LevelDB:
# suffix = tx_idx + tx_num # suffix = tx_idx + tx_num
hashX = value[:-12] hashX = value[:-12]
suffix = key[-2:] + value[-12:-8] suffix = key[-2:] + value[-12:-8]
batch_put(b'h' + key[:4] + suffix, hashX) batch_put(HASHX_UTXO_PREFIX + key[:4] + suffix, hashX)
batch_put(b'u' + hashX + suffix, value[-8:]) batch_put(UTXO_PREFIX + hashX + suffix, value[-8:])
flush_data.adds.clear() flush_data.adds.clear()
# New undo information # New undo information
self.flush_undo_infos(batch_put, flush_data.undo_infos) for undo_info, height in flush_data.undo_infos:
batch_put(self.undo_key(height), b''.join(undo_info))
flush_data.undo_infos.clear() flush_data.undo_infos.clear()
if self.utxo_db.for_sync: if self.db.for_sync:
block_count = flush_data.height - self.db_height block_count = flush_data.height - self.db_height
tx_count = flush_data.tx_count - self.db_tx_count tx_count = flush_data.tx_count - self.db_tx_count
elapsed = time.time() - start_time elapsed = time.time() - start_time
@ -389,49 +374,196 @@ class LevelDB:
f'{spend_count:,d} spends in ' f'{spend_count:,d} spends in '
f'{elapsed:.1f}s, committing...') f'{elapsed:.1f}s, committing...')
self.utxo_flush_count = self.history.flush_count self.utxo_flush_count = self.hist_flush_count
self.db_height = flush_data.height self.db_height = flush_data.height
self.db_tx_count = flush_data.tx_count self.db_tx_count = flush_data.tx_count
self.db_tip = flush_data.tip self.db_tip = flush_data.tip
def flush_state(self, batch): def write_history_state(self, batch):
"""Flush chain state to the batch.""" state = {
now = time.time() 'flush_count': self.hist_flush_count,
self.wall_time += now - self.last_flush 'comp_flush_count': self.hist_comp_flush_count,
self.last_flush = now 'comp_cursor': self.hist_comp_cursor,
self.last_flush_tx_count = self.fs_tx_count 'db_version': self.db_version,
self.write_utxo_state(batch) }
# History entries are not prefixed; the suffix \0\0 ensures we
# look similar to other entries and aren't interfered with
batch.put(HIST_STATE, repr(state).encode())
def flush_dbs(self, flush_data: FlushData, estimate_txs_remaining):
"""Flush out cached state. History is always flushed; UTXOs are
flushed if flush_utxos."""
if flush_data.height == self.db_height:
self.assert_flushed(flush_data)
return
start_time = time.time()
prior_flush = self.last_flush
tx_delta = flush_data.tx_count - self.last_flush_tx_count
# Flush to file system
# self.flush_fs(flush_data)
prior_tx_count = (self.tx_counts[self.fs_height]
if self.fs_height >= 0 else 0)
assert len(flush_data.block_txs) == len(flush_data.headers)
assert flush_data.height == self.fs_height + len(flush_data.headers)
assert flush_data.tx_count == (self.tx_counts[-1] if self.tx_counts
else 0)
assert len(self.tx_counts) == flush_data.height + 1
assert len(
b''.join(hashes for hashes, _ in flush_data.block_txs)
) // 32 == flush_data.tx_count - prior_tx_count
# Write the headers
start_time = time.perf_counter()
with self.db.write_batch() as batch:
batch_put = batch.put
height_start = self.fs_height + 1
tx_num = prior_tx_count
for i, (header, block_hash, (tx_hashes, txs)) in enumerate(zip(flush_data.headers, flush_data.block_hashes, flush_data.block_txs)):
batch_put(HEADER_PREFIX + util.pack_be_uint64(height_start), header)
self.headers.append(header)
tx_count = self.tx_counts[height_start]
batch_put(BLOCK_HASH_PREFIX + util.pack_be_uint64(height_start), block_hash[::-1])
batch_put(TX_COUNT_PREFIX + util.pack_be_uint64(height_start), util.pack_be_uint64(tx_count))
height_start += 1
offset = 0
while offset < len(tx_hashes):
batch_put(TX_HASH_PREFIX + util.pack_be_uint64(tx_num), tx_hashes[offset:offset + 32])
batch_put(TX_NUM_PREFIX + tx_hashes[offset:offset + 32], util.pack_be_uint64(tx_num))
batch_put(TX_PREFIX + tx_hashes[offset:offset + 32], txs[offset // 32])
tx_num += 1
offset += 32
flush_data.headers.clear()
flush_data.block_txs.clear()
flush_data.block_hashes.clear()
# flush_data.claim_txo_cache.clear()
# flush_data.support_txo_cache.clear()
self.fs_height = flush_data.height
self.fs_tx_count = flush_data.tx_count
# Then history
self.hist_flush_count += 1
flush_id = pack_be_uint16(self.hist_flush_count)
unflushed = self.hist_unflushed
for hashX in sorted(unflushed):
key = hashX + flush_id
batch_put(HASHX_HISTORY_PREFIX + key, unflushed[hashX].tobytes())
self.write_history_state(batch)
unflushed.clear()
self.hist_unflushed_count = 0
#########################
# Flush state last as it reads the wall time.
self.flush_utxo_db(batch, flush_data)
# self.flush_state(batch)
#
now = time.time()
self.wall_time += now - self.last_flush
self.last_flush = now
self.last_flush_tx_count = self.fs_tx_count
self.write_utxo_state(batch)
# # Update and put the wall time again - otherwise we drop the
# # time it took to commit the batch
# # self.flush_state(self.db)
# now = time.time()
# self.wall_time += now - self.last_flush
# self.last_flush = now
# self.last_flush_tx_count = self.fs_tx_count
# self.write_utxo_state(batch)
elapsed = self.last_flush - start_time
self.logger.info(f'flush #{self.hist_flush_count:,d} took '
f'{elapsed:.1f}s. Height {flush_data.height:,d} '
f'txs: {flush_data.tx_count:,d} ({tx_delta:+,d})')
# Catch-up stats
if self.db.for_sync:
flush_interval = self.last_flush - prior_flush
tx_per_sec_gen = int(flush_data.tx_count / self.wall_time)
tx_per_sec_last = 1 + int(tx_delta / flush_interval)
eta = estimate_txs_remaining() / tx_per_sec_last
self.logger.info(f'tx/sec since genesis: {tx_per_sec_gen:,d}, '
f'since last flush: {tx_per_sec_last:,d}')
self.logger.info(f'sync time: {formatted_time(self.wall_time)} '
f'ETA: {formatted_time(eta)}')
# def flush_state(self, batch):
# """Flush chain state to the batch."""
# now = time.time()
# self.wall_time += now - self.last_flush
# self.last_flush = now
# self.last_flush_tx_count = self.fs_tx_count
# self.write_utxo_state(batch)
def flush_backup(self, flush_data, touched): def flush_backup(self, flush_data, touched):
"""Like flush_dbs() but when backing up. All UTXOs are flushed.""" """Like flush_dbs() but when backing up. All UTXOs are flushed."""
assert not flush_data.headers assert not flush_data.headers
assert not flush_data.block_txs assert not flush_data.block_txs
assert flush_data.height < self.db_height assert flush_data.height < self.db_height
self.history.assert_flushed() assert not self.hist_unflushed
start_time = time.time() start_time = time.time()
tx_delta = flush_data.tx_count - self.last_flush_tx_count tx_delta = flush_data.tx_count - self.last_flush_tx_count
###
self.backup_fs(flush_data.height, flush_data.tx_count) while self.fs_height > flush_data.height:
self.history.backup(touched, flush_data.tx_count)
with self.utxo_db.write_batch() as batch:
self.flush_utxo_db(batch, flush_data)
# Flush state last as it reads the wall time.
self.flush_state(batch)
elapsed = self.last_flush - start_time
self.logger.info(f'backup flush #{self.history.flush_count:,d} took '
f'{elapsed:.1f}s. Height {flush_data.height:,d} '
f'txs: {flush_data.tx_count:,d} ({tx_delta:+,d})')
def backup_fs(self, height, tx_count):
"""Back up during a reorg. This just updates our pointers."""
while self.fs_height > height:
self.fs_height -= 1 self.fs_height -= 1
self.headers.pop() self.headers.pop()
self.fs_tx_count = tx_count self.fs_tx_count = flush_data.tx_count
# Truncate header_mc: header count is 1 more than the height. # Truncate header_mc: header count is 1 more than the height.
self.header_mc.truncate(height + 1) self.header_mc.truncate(flush_data.height + 1)
###
# Not certain this is needed, but it doesn't hurt
self.hist_flush_count += 1
nremoves = 0
with self.db.write_batch() as batch:
tx_count = flush_data.tx_count
for hashX in sorted(touched):
deletes = []
puts = {}
for key, hist in self.db.iterator(prefix=HASHX_HISTORY_PREFIX + hashX, reverse=True):
k = key[1:]
a = array.array('I')
a.frombytes(hist)
# Remove all history entries >= tx_count
idx = bisect_left(a, tx_count)
nremoves += len(a) - idx
if idx > 0:
puts[k] = a[:idx].tobytes()
break
deletes.append(k)
for key in deletes:
batch.delete(key)
for key, value in puts.items():
batch.put(key, value)
self.write_history_state(batch)
self.flush_utxo_db(batch, flush_data)
# Flush state last as it reads the wall time.
now = time.time()
self.wall_time += now - self.last_flush
self.last_flush = now
self.last_flush_tx_count = self.fs_tx_count
self.write_utxo_state(batch)
self.logger.info(f'backing up removed {nremoves:,d} history entries')
elapsed = self.last_flush - start_time
self.logger.info(f'backup flush #{self.hist_flush_count:,d} took {elapsed:.1f}s. '
f'Height {flush_data.height:,d} txs: {flush_data.tx_count:,d} ({tx_delta:+,d})')
def raw_header(self, height): def raw_header(self, height):
"""Return the binary header at the given height.""" """Return the binary header at the given height."""
@ -471,9 +603,8 @@ class LevelDB:
def _fs_transactions(self, txids: Iterable[str]): def _fs_transactions(self, txids: Iterable[str]):
unpack_be_uint64 = util.unpack_be_uint64 unpack_be_uint64 = util.unpack_be_uint64
tx_counts = self.tx_counts tx_counts = self.tx_counts
tx_db_get = self.tx_db.get tx_db_get = self.db.get
tx_cache = self._tx_and_merkle_cache tx_cache = self._tx_and_merkle_cache
tx_infos = {} tx_infos = {}
for tx_hash in txids: for tx_hash in txids:
@ -531,11 +662,13 @@ class LevelDB:
def read_history(): def read_history():
db_height = self.db_height db_height = self.db_height
tx_counts = self.tx_counts tx_counts = self.tx_counts
tx_db_get = self.db.get
pack_be_uint64 = util.pack_be_uint64
cnt = 0 cnt = 0
txs = [] txs = []
for hist in self.history.db.iterator(prefix=hashX, include_key=False): for hist in self.db.iterator(prefix=HASHX_HISTORY_PREFIX + hashX, include_key=False):
a = array.array('I') a = array.array('I')
a.frombytes(hist) a.frombytes(hist)
for tx_num in a: for tx_num in a:
@ -566,16 +699,11 @@ class LevelDB:
def undo_key(self, height): def undo_key(self, height):
"""DB key for undo information at the given height.""" """DB key for undo information at the given height."""
return b'U' + pack('>I', height) return UNDO_PREFIX + pack('>I', height)
def read_undo_info(self, height): def read_undo_info(self, height):
"""Read undo information from a file for the current height.""" """Read undo information from a file for the current height."""
return self.utxo_db.get(self.undo_key(height)) return self.db.get(self.undo_key(height))
def flush_undo_infos(self, batch_put, undo_infos):
"""undo_infos is a list of (undo_info, height) pairs."""
for undo_info, height in undo_infos:
batch_put(self.undo_key(height), b''.join(undo_info))
def raw_block_prefix(self): def raw_block_prefix(self):
return 'block' return 'block'
@ -606,17 +734,16 @@ class LevelDB:
def clear_excess_undo_info(self): def clear_excess_undo_info(self):
"""Clear excess undo info. Only most recent N are kept.""" """Clear excess undo info. Only most recent N are kept."""
prefix = b'U'
min_height = self.min_undo_height(self.db_height) min_height = self.min_undo_height(self.db_height)
keys = [] keys = []
for key, hist in self.utxo_db.iterator(prefix=prefix): for key, hist in self.db.iterator(prefix=UNDO_PREFIX):
height, = unpack('>I', key[-4:]) height, = unpack('>I', key[-4:])
if height >= min_height: if height >= min_height:
break break
keys.append(key) keys.append(key)
if keys: if keys:
with self.utxo_db.write_batch() as batch: with self.db.write_batch() as batch:
for key in keys: for key in keys:
batch.delete(key) batch.delete(key)
self.logger.info(f'deleted {len(keys):,d} stale undo entries') self.logger.info(f'deleted {len(keys):,d} stale undo entries')
@ -637,7 +764,7 @@ class LevelDB:
# -- UTXO database # -- UTXO database
def read_utxo_state(self): def read_utxo_state(self):
state = self.utxo_db.get(b'state') state = self.db.get(UTXO_STATE)
if not state: if not state:
self.db_height = -1 self.db_height = -1
self.db_tx_count = 0 self.db_tx_count = 0
@ -680,7 +807,7 @@ class LevelDB:
self.logger.info(f'height: {self.db_height:,d}') self.logger.info(f'height: {self.db_height:,d}')
self.logger.info(f'tip: {hash_to_hex_str(self.db_tip)}') self.logger.info(f'tip: {hash_to_hex_str(self.db_tip)}')
self.logger.info(f'tx count: {self.db_tx_count:,d}') self.logger.info(f'tx count: {self.db_tx_count:,d}')
if self.utxo_db.for_sync: if self.db.for_sync:
self.logger.info(f'flushing DB cache at {self.env.cache_MB:,d} MB') self.logger.info(f'flushing DB cache at {self.env.cache_MB:,d} MB')
if self.first_sync: if self.first_sync:
self.logger.info(f'sync time so far: {util.formatted_time(self.wall_time)}') self.logger.info(f'sync time so far: {util.formatted_time(self.wall_time)}')
@ -697,11 +824,11 @@ class LevelDB:
'first_sync': self.first_sync, 'first_sync': self.first_sync,
'db_version': self.db_version, 'db_version': self.db_version,
} }
batch.put(b'state', repr(state).encode()) batch.put(UTXO_STATE, repr(state).encode())
def set_flush_count(self, count): def set_flush_count(self, count):
self.utxo_flush_count = count self.utxo_flush_count = count
with self.utxo_db.write_batch() as batch: with self.db.write_batch() as batch:
self.write_utxo_state(batch) self.write_utxo_state(batch)
async def all_utxos(self, hashX): async def all_utxos(self, hashX):
@ -713,8 +840,8 @@ class LevelDB:
fs_tx_hash = self.fs_tx_hash fs_tx_hash = self.fs_tx_hash
# Key: b'u' + address_hashX + tx_idx + tx_num # Key: b'u' + address_hashX + tx_idx + tx_num
# Value: the UTXO value as a 64-bit unsigned integer # Value: the UTXO value as a 64-bit unsigned integer
prefix = b'u' + hashX prefix = UTXO_PREFIX + hashX
for db_key, db_value in self.utxo_db.iterator(prefix=prefix): for db_key, db_value in self.db.iterator(prefix=prefix):
tx_pos, tx_num = s_unpack('<HI', db_key[-6:]) tx_pos, tx_num = s_unpack('<HI', db_key[-6:])
value, = unpack('<Q', db_value) value, = unpack('<Q', db_value)
tx_hash, height = fs_tx_hash(tx_num) tx_hash, height = fs_tx_hash(tx_num)
@ -744,10 +871,10 @@ class LevelDB:
# Key: b'h' + compressed_tx_hash + tx_idx + tx_num # Key: b'h' + compressed_tx_hash + tx_idx + tx_num
# Value: hashX # Value: hashX
prefix = b'h' + tx_hash[:4] + idx_packed prefix = HASHX_UTXO_PREFIX + tx_hash[:4] + idx_packed
# Find which entry, if any, the TX_HASH matches. # Find which entry, if any, the TX_HASH matches.
for db_key, hashX in self.utxo_db.iterator(prefix=prefix): for db_key, hashX in self.db.iterator(prefix=prefix):
tx_num_packed = db_key[-4:] tx_num_packed = db_key[-4:]
tx_num, = unpack('<I', tx_num_packed) tx_num, = unpack('<I', tx_num_packed)
hash, height = self.fs_tx_hash(tx_num) hash, height = self.fs_tx_hash(tx_num)
@ -765,8 +892,8 @@ class LevelDB:
return None return None
# Key: b'u' + address_hashX + tx_idx + tx_num # Key: b'u' + address_hashX + tx_idx + tx_num
# Value: the UTXO value as a 64-bit unsigned integer # Value: the UTXO value as a 64-bit unsigned integer
key = b'u' + hashX + suffix key = UTXO_PREFIX + hashX + suffix
db_value = self.utxo_db.get(key) db_value = self.db.get(key)
if not db_value: if not db_value:
# This can happen if the DB was updated between # This can happen if the DB was updated between
# getting the hashXs and getting the UTXOs # getting the hashXs and getting the UTXOs
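Reading an address's history back out of the combined database follows the same key shape. A minimal sketch based on the read_history hunk above (the function name is illustrative; the limit handling is omitted):

    import array

    HASHX_HISTORY_PREFIX = b'x'

    def read_tx_nums(db, hashX: bytes):
        """Yield the tx numbers recorded for hashX, in flush order."""
        for hist in db.iterator(prefix=HASHX_HISTORY_PREFIX + hashX, include_key=False):
            a = array.array('I')
            a.frombytes(hist)
            yield from a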

lbry/wallet/server/merkle.py

@ -43,10 +43,12 @@ class Merkle:
def __init__(self, hash_func=double_sha256): def __init__(self, hash_func=double_sha256):
self.hash_func = hash_func self.hash_func = hash_func
def tree_depth(self, hash_count): @staticmethod
return self.branch_length(hash_count) + 1 def tree_depth(hash_count):
return Merkle.branch_length(hash_count) + 1
def branch_length(self, hash_count): @staticmethod
def branch_length(hash_count):
"""Return the length of a merkle branch given the number of hashes.""" """Return the length of a merkle branch given the number of hashes."""
if not isinstance(hash_count, int): if not isinstance(hash_count, int):
raise TypeError('hash_count must be an integer') raise TypeError('hash_count must be an integer')
@ -54,7 +56,8 @@ class Merkle:
raise ValueError('hash_count must be at least 1') raise ValueError('hash_count must be at least 1')
return ceil(log(hash_count, 2)) return ceil(log(hash_count, 2))
def branch_and_root(self, hashes, index, length=None): @staticmethod
def branch_and_root(hashes, index, length=None, hash_func=double_sha256):
"""Return a (merkle branch, merkle_root) pair given hashes, and the """Return a (merkle branch, merkle_root) pair given hashes, and the
index of one of those hashes. index of one of those hashes.
""" """
@ -64,7 +67,7 @@ class Merkle:
# This also asserts hashes is not empty # This also asserts hashes is not empty
if not 0 <= index < len(hashes): if not 0 <= index < len(hashes):
raise ValueError(f"index '{index}/{len(hashes)}' out of range") raise ValueError(f"index '{index}/{len(hashes)}' out of range")
natural_length = self.branch_length(len(hashes)) natural_length = Merkle.branch_length(len(hashes))
if length is None: if length is None:
length = natural_length length = natural_length
else: else:
@ -73,7 +76,6 @@ class Merkle:
if length < natural_length: if length < natural_length:
raise ValueError('length out of range') raise ValueError('length out of range')
hash_func = self.hash_func
branch = [] branch = []
for _ in range(length): for _ in range(length):
if len(hashes) & 1: if len(hashes) & 1:
@ -85,44 +87,47 @@ class Merkle:
return branch, hashes[0] return branch, hashes[0]
def root(self, hashes, length=None): @staticmethod
def root(hashes, length=None):
"""Return the merkle root of a non-empty iterable of binary hashes.""" """Return the merkle root of a non-empty iterable of binary hashes."""
branch, root = self.branch_and_root(hashes, 0, length) branch, root = Merkle.branch_and_root(hashes, 0, length)
return root return root
def root_from_proof(self, hash, branch, index): # @staticmethod
"""Return the merkle root given a hash, a merkle branch to it, and # def root_from_proof(hash, branch, index, hash_func=double_sha256):
its index in the hashes array. # """Return the merkle root given a hash, a merkle branch to it, and
# its index in the hashes array.
#
# branch is an iterable sorted deepest to shallowest. If the
# returned root is the expected value then the merkle proof is
# verified.
#
# The caller should have confirmed the length of the branch with
# branch_length(). Unfortunately this is not easily done for
# bitcoin transactions as the number of transactions in a block
# is unknown to an SPV client.
# """
# for elt in branch:
# if index & 1:
# hash = hash_func(elt + hash)
# else:
# hash = hash_func(hash + elt)
# index >>= 1
# if index:
# raise ValueError('index out of range for branch')
# return hash
branch is an iterable sorted deepest to shallowest. If the @staticmethod
returned root is the expected value then the merkle proof is def level(hashes, depth_higher):
verified.
The caller should have confirmed the length of the branch with
branch_length(). Unfortunately this is not easily done for
bitcoin transactions as the number of transactions in a block
is unknown to an SPV client.
"""
hash_func = self.hash_func
for elt in branch:
if index & 1:
hash = hash_func(elt + hash)
else:
hash = hash_func(hash + elt)
index >>= 1
if index:
raise ValueError('index out of range for branch')
return hash
def level(self, hashes, depth_higher):
"""Return a level of the merkle tree of hashes the given depth """Return a level of the merkle tree of hashes the given depth
higher than the bottom row of the original tree.""" higher than the bottom row of the original tree."""
size = 1 << depth_higher size = 1 << depth_higher
root = self.root root = Merkle.root
return [root(hashes[n: n + size], depth_higher) return [root(hashes[n: n + size], depth_higher)
for n in range(0, len(hashes), size)] for n in range(0, len(hashes), size)]
def branch_and_root_from_level(self, level, leaf_hashes, index, @staticmethod
def branch_and_root_from_level(level, leaf_hashes, index,
depth_higher): depth_higher):
"""Return a (merkle branch, merkle_root) pair when a merkle-tree has a """Return a (merkle branch, merkle_root) pair when a merkle-tree has a
level cached. level cached.
@ -146,10 +151,10 @@ class Merkle:
if not isinstance(leaf_hashes, list): if not isinstance(leaf_hashes, list):
raise TypeError("leaf_hashes must be a list") raise TypeError("leaf_hashes must be a list")
leaf_index = (index >> depth_higher) << depth_higher leaf_index = (index >> depth_higher) << depth_higher
leaf_branch, leaf_root = self.branch_and_root( leaf_branch, leaf_root = Merkle.branch_and_root(
leaf_hashes, index - leaf_index, depth_higher) leaf_hashes, index - leaf_index, depth_higher)
index >>= depth_higher index >>= depth_higher
level_branch, root = self.branch_and_root(level, index) level_branch, root = Merkle.branch_and_root(level, index)
# Check last so that we know index is in-range # Check last so that we know index is in-range
if leaf_root != level[index]: if leaf_root != level[index]:
raise ValueError('leaf hashes inconsistent with level') raise ValueError('leaf hashes inconsistent with level')
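With branch_and_root, root, level and branch_and_root_from_level converted to staticmethods, callers that only need the default double-sha256 hashing no longer have to construct a Merkle instance. A minimal usage sketch (the double_sha256 import path is assumed from the surrounding code):

    from lbry.wallet.server.hash import double_sha256
    from lbry.wallet.server.merkle import Merkle

    tx_hashes = [double_sha256(bytes([i])) for i in range(4)]
    branch, root = Merkle.branch_and_root(tx_hashes, 0)
    assert Merkle.root(tx_hashes) == root
    assert len(branch) == Merkle.branch_length(len(tx_hashes))  # 2 for 4 hashes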