consolidate leveldb block advance/reorg

-move methods from History to LevelDB
Jack Robison 2021-01-11 18:13:39 -05:00
parent 9f224a971b
commit 62cc6dfe76
3 changed files with 403 additions and 298 deletions

View file

@@ -384,7 +384,7 @@ class BlockProcessor:
one_MB = 1000*1000
utxo_cache_size = len(self.utxo_cache) * 205
db_deletes_size = len(self.db_deletes) * 57
hist_cache_size = self.db.history.unflushed_memsize()
hist_cache_size = len(self.db.history.unflushed) * 180 + self.db.history.unflushed_count * 4
# Roughly ntxs * 32 + nblocks * 42
tx_hash_size = ((self.tx_count - self.db.fs_tx_count) * 32
+ (self.height - self.db.fs_height) * 42)
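
Note: the new right-hand expression in this hunk inlines what History.unflushed_memsize() used to compute. A minimal standalone sketch of that estimate (the helper name and sample data are illustrative, not from this commit):

import array
from collections import defaultdict

# Rough heuristic from the line above: ~180 bytes of dict/array overhead per
# touched hashX, plus 4 bytes per buffered tx_num (array typecode 'I').
def estimate_unflushed_memsize(unflushed, unflushed_count):
    return len(unflushed) * 180 + unflushed_count * 4

unflushed = defaultdict(lambda: array.array('I'))
unflushed[b'\x01' * 11].extend([1, 2, 3])        # one hashX, three tx_nums
print(estimate_unflushed_memsize(unflushed, 3))  # 180 + 3 * 4 = 192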
@@ -475,7 +475,16 @@ class BlockProcessor:
self.db.total_transactions.append(tx_hash)
tx_num += 1
self.db.history.add_unflushed(hashXs_by_tx, self.tx_count)
# self.db.add_unflushed(hashXs_by_tx, self.tx_count)
first_tx_num = self.tx_count
_unflushed = self.db.history.unflushed
_count = 0
for _tx_num, _hashXs in enumerate(hashXs_by_tx, start=first_tx_num):
for _hashX in set(_hashXs):
_unflushed[_hashX].append(_tx_num)
_count += len(_hashXs)
self.db.history.unflushed_count += _count
self.tx_count = tx_num
self.db.tx_counts.append(tx_num)
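
For readers following the inlined bookkeeping above, a self-contained sketch of what the loop builds: a map from each touched hashX to an array('I') of the tx_nums that touched it (sample hashXs and numbers are made up):

import array
from collections import defaultdict
from functools import partial

unflushed = defaultdict(partial(array.array, 'I'))
unflushed_count = 0

hashXs_by_tx = [[b'aa', b'bb'], [b'aa']]   # hashXs touched by two hypothetical txs
first_tx_num = 100                         # tx_count before this block

for tx_num, hashXs in enumerate(hashXs_by_tx, start=first_tx_num):
    for hashX in set(hashXs):
        unflushed[hashX].append(tx_num)
    unflushed_count += len(hashXs)

print(dict(unflushed))   # b'aa' -> array('I', [100, 101]); b'bb' -> array('I', [100])
print(unflushed_count)   # 3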

View file

@@ -34,150 +34,122 @@ class History:
self.max_hist_row_entries = 12500
self.unflushed = defaultdict(partial(array.array, 'I'))
self.unflushed_count = 0
self.db = None
def open_db(self, db, for_sync, utxo_flush_count, compacting):
self.db = db #db_class('hist', for_sync)
self.read_state()
self.clear_excess(utxo_flush_count)
# An incomplete compaction needs to be cancelled otherwise
# restarting it will corrupt the history
if not compacting:
self._cancel_compaction()
return self.flush_count
def close_db(self):
if self.db:
# self.db.close()
self.db = None
def read_state(self):
state = self.db.get(HIST_STATE)
if state:
state = ast.literal_eval(state.decode())
if not isinstance(state, dict):
raise RuntimeError('failed reading state from history DB')
self.flush_count = state['flush_count']
self.comp_flush_count = state.get('comp_flush_count', -1)
self.comp_cursor = state.get('comp_cursor', -1)
self.db_version = state.get('db_version', 0)
else:
self.flush_count = 0
self.comp_flush_count = -1
self.comp_cursor = -1
self.db_version = max(self.DB_VERSIONS) # self.db = None
self.logger.info(f'history DB version: {self.db_version}') # def close_db(self):
if self.db_version not in self.DB_VERSIONS: # if self.db:
msg = f'this software only handles DB versions {self.DB_VERSIONS}' # # self.db.close()
self.logger.error(msg) # self.db = None
raise RuntimeError(msg)
self.logger.info(f'flush count: {self.flush_count:,d}')
def clear_excess(self, utxo_flush_count): # def read_state(self):
# < might happen at end of compaction as both DBs cannot be # state = self.db.get(HIST_STATE)
# updated atomically # if state:
if self.flush_count <= utxo_flush_count: # state = ast.literal_eval(state.decode())
return # if not isinstance(state, dict):
# raise RuntimeError('failed reading state from history DB')
# self.flush_count = state['flush_count']
# self.comp_flush_count = state.get('comp_flush_count', -1)
# self.comp_cursor = state.get('comp_cursor', -1)
# self.db_version = state.get('db_version', 0)
# else:
# self.flush_count = 0
# self.comp_flush_count = -1
# self.comp_cursor = -1
# self.db_version = max(self.DB_VERSIONS)
#
# self.logger.info(f'history DB version: {self.db_version}')
# if self.db_version not in self.DB_VERSIONS:
# msg = f'this software only handles DB versions {self.DB_VERSIONS}'
# self.logger.error(msg)
# raise RuntimeError(msg)
# self.logger.info(f'flush count: {self.flush_count:,d}')
self.logger.info('DB shut down uncleanly. Scanning for ' # def clear_excess(self, utxo_flush_count):
'excess history flushes...') # # < might happen at end of compaction as both DBs cannot be
# # updated atomically
# if self.flush_count <= utxo_flush_count:
# return
#
# self.logger.info('DB shut down uncleanly. Scanning for '
# 'excess history flushes...')
#
# keys = []
# for key, hist in self.db.iterator(prefix=HASHX_HISTORY_PREFIX):
# k = key[1:]
# flush_id, = unpack_be_uint16_from(k[-2:])
# if flush_id > utxo_flush_count:
# keys.append(k)
#
# self.logger.info(f'deleting {len(keys):,d} history entries')
#
# self.flush_count = utxo_flush_count
# with self.db.write_batch() as batch:
# for key in keys:
# batch.delete(HASHX_HISTORY_PREFIX + key)
# self.write_state(batch)
#
# self.logger.info('deleted excess history entries')
#
# def write_state(self, batch):
# """Write state to the history DB."""
# state = {
# 'flush_count': self.flush_count,
# 'comp_flush_count': self.comp_flush_count,
# 'comp_cursor': self.comp_cursor,
# 'db_version': self.db_version,
# }
# # History entries are not prefixed; the suffix \0\0 ensures we
# # look similar to other entries and aren't interfered with
# batch.put(HIST_STATE, repr(state).encode())
keys = [] # def add_unflushed(self, hashXs_by_tx, first_tx_num):
for key, hist in self.db.iterator(prefix=HASHX_HISTORY_PREFIX): # unflushed = self.unflushed
k = key[1:] # count = 0
flush_id, = unpack_be_uint16_from(k[-2:]) # for tx_num, hashXs in enumerate(hashXs_by_tx, start=first_tx_num):
if flush_id > utxo_flush_count: # hashXs = set(hashXs)
keys.append(k) # for hashX in hashXs:
# unflushed[hashX].append(tx_num)
# count += len(hashXs)
# self.unflushed_count += count
self.logger.info(f'deleting {len(keys):,d} history entries') # def unflushed_memsize(self):
# return len(self.unflushed) * 180 + self.unflushed_count * 4
self.flush_count = utxo_flush_count # def assert_flushed(self):
with self.db.write_batch() as batch: # assert not self.unflushed
for key in keys:
batch.delete(HASHX_HISTORY_PREFIX + key)
self.write_state(batch)
self.logger.info('deleted excess history entries') # def backup(self, hashXs, tx_count):
# # Not certain this is needed, but it doesn't hurt
def write_state(self, batch): # self.flush_count += 1
"""Write state to the history DB.""" # nremoves = 0
state = { # bisect_left = bisect.bisect_left
'flush_count': self.flush_count, #
'comp_flush_count': self.comp_flush_count, # with self.db.write_batch() as batch:
'comp_cursor': self.comp_cursor, # for hashX in sorted(hashXs):
'db_version': self.db_version, # deletes = []
} # puts = {}
# History entries are not prefixed; the suffix \0\0 ensures we # for key, hist in self.db.iterator(prefix=HASHX_HISTORY_PREFIX + hashX, reverse=True):
# look similar to other entries and aren't interfered with # k = key[1:]
batch.put(HIST_STATE, repr(state).encode()) # a = array.array('I')
# a.frombytes(hist)
def add_unflushed(self, hashXs_by_tx, first_tx_num): # # Remove all history entries >= tx_count
unflushed = self.unflushed # idx = bisect_left(a, tx_count)
count = 0 # nremoves += len(a) - idx
for tx_num, hashXs in enumerate(hashXs_by_tx, start=first_tx_num): # if idx > 0:
hashXs = set(hashXs) # puts[k] = a[:idx].tobytes()
for hashX in hashXs: # break
unflushed[hashX].append(tx_num) # deletes.append(k)
count += len(hashXs) #
self.unflushed_count += count # for key in deletes:
# batch.delete(key)
def unflushed_memsize(self): # for key, value in puts.items():
return len(self.unflushed) * 180 + self.unflushed_count * 4 # batch.put(key, value)
# self.write_state(batch)
def assert_flushed(self): #
assert not self.unflushed # self.logger.info(f'backing up removed {nremoves:,d} history entries')
def flush(self):
start_time = time.time()
self.flush_count += 1
flush_id = pack_be_uint16(self.flush_count)
unflushed = self.unflushed
with self.db.write_batch() as batch:
for hashX in sorted(unflushed):
key = hashX + flush_id
batch.put(HASHX_HISTORY_PREFIX + key, unflushed[hashX].tobytes())
self.write_state(batch)
count = len(unflushed)
unflushed.clear()
self.unflushed_count = 0
if self.db.for_sync:
elapsed = time.time() - start_time
self.logger.info(f'flushed history in {elapsed:.1f}s '
f'for {count:,d} addrs')
def backup(self, hashXs, tx_count):
# Not certain this is needed, but it doesn't hurt
self.flush_count += 1
nremoves = 0
bisect_left = bisect.bisect_left
with self.db.write_batch() as batch:
for hashX in sorted(hashXs):
deletes = []
puts = {}
for key, hist in self.db.iterator(prefix=HASHX_HISTORY_PREFIX + hashX, reverse=True):
k = key[1:]
a = array.array('I')
a.frombytes(hist)
# Remove all history entries >= tx_count
idx = bisect_left(a, tx_count)
nremoves += len(a) - idx
if idx > 0:
puts[k] = a[:idx].tobytes()
break
deletes.append(k)
for key in deletes:
batch.delete(key)
for key, value in puts.items():
batch.put(key, value)
self.write_state(batch)
self.logger.info(f'backing up removed {nremoves:,d} history entries')
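
A note on the read_state/write_state pair above (now being commented out): the history state record is an ordinary dict stored as its repr() and parsed back with ast.literal_eval(), so the round-trip is simply (values invented for illustration):

import ast

state = {
    'flush_count': 7,
    'comp_flush_count': -1,
    'comp_cursor': -1,
    'db_version': 0,
}
raw = repr(state).encode()                 # what batch.put(HIST_STATE, ...) stores
decoded = ast.literal_eval(raw.decode())   # what read_state recovers
assert decoded == state and isinstance(decoded, dict)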
# def get_txnums(self, hashX, limit=1000):
# """Generator that returns an unpruned, sorted list of tx_nums in the
@@ -213,119 +185,120 @@ class History:
# When compaction is complete and the final flush takes place,
# flush_count is reset to comp_flush_count, and comp_flush_count to -1
def _flush_compaction(self, cursor, write_items, keys_to_delete): # def _flush_compaction(self, cursor, write_items, keys_to_delete):
"""Flush a single compaction pass as a batch.""" # """Flush a single compaction pass as a batch."""
# Update compaction state # # Update compaction state
if cursor == 65536: # if cursor == 65536:
self.flush_count = self.comp_flush_count # self.flush_count = self.comp_flush_count
self.comp_cursor = -1 # self.comp_cursor = -1
self.comp_flush_count = -1 # self.comp_flush_count = -1
else: # else:
self.comp_cursor = cursor # self.comp_cursor = cursor
#
# # History DB. Flush compacted history and updated state
# with self.db.write_batch() as batch:
# # Important: delete first! The keyspace may overlap.
# for key in keys_to_delete:
# batch.delete(HASHX_HISTORY_PREFIX + key)
# for key, value in write_items:
# batch.put(HASHX_HISTORY_PREFIX + key, value)
# self.write_state(batch)
# History DB. Flush compacted history and updated state # def _compact_hashX(self, hashX, hist_map, hist_list,
with self.db.write_batch() as batch: # write_items, keys_to_delete):
# Important: delete first! The keyspace may overlap. # """Compress history for a hashX. hist_list is an ordered list of
for key in keys_to_delete: # the histories to be compressed."""
batch.delete(HASHX_HISTORY_PREFIX + key) # # History entries (tx numbers) are 4 bytes each. Distribute
for key, value in write_items: # # over rows of up to 50KB in size. A fixed row size means
batch.put(HASHX_HISTORY_PREFIX + key, value) # # future compactions will not need to update the first N - 1
self.write_state(batch) # # rows.
# max_row_size = self.max_hist_row_entries * 4
# full_hist = b''.join(hist_list)
# nrows = (len(full_hist) + max_row_size - 1) // max_row_size
# if nrows > 4:
# self.logger.info('hashX {} is large: {:,d} entries across '
# '{:,d} rows'
# .format(hash_to_hex_str(hashX),
# len(full_hist) // 4, nrows))
#
# # Find what history needs to be written, and what keys need to
# # be deleted. Start by assuming all keys are to be deleted,
# # and then remove those that are the same on-disk as when
# # compacted.
# write_size = 0
# keys_to_delete.update(hist_map)
# for n, chunk in enumerate(util.chunks(full_hist, max_row_size)):
# key = hashX + pack_be_uint16(n)
# if hist_map.get(key) == chunk:
# keys_to_delete.remove(key)
# else:
# write_items.append((key, chunk))
# write_size += len(chunk)
#
# assert n + 1 == nrows
# self.comp_flush_count = max(self.comp_flush_count, n)
#
# return write_size
def _compact_hashX(self, hashX, hist_map, hist_list, # def _compact_prefix(self, prefix, write_items, keys_to_delete):
write_items, keys_to_delete): # """Compact all history entries for hashXs beginning with the
"""Compress history for a hashX. hist_list is an ordered list of # given prefix. Update keys_to_delete and write."""
the histories to be compressed.""" # prior_hashX = None
# History entries (tx numbers) are 4 bytes each. Distribute # hist_map = {}
# over rows of up to 50KB in size. A fixed row size means # hist_list = []
# future compactions will not need to update the first N - 1 #
# rows. # key_len = HASHX_LEN + 2
max_row_size = self.max_hist_row_entries * 4 # write_size = 0
full_hist = b''.join(hist_list) # for key, hist in self.db.iterator(prefix=HASHX_HISTORY_PREFIX + prefix):
nrows = (len(full_hist) + max_row_size - 1) // max_row_size # k = key[1:]
if nrows > 4: # # Ignore non-history entries
self.logger.info('hashX {} is large: {:,d} entries across ' # if len(k) != key_len:
'{:,d} rows' # continue
.format(hash_to_hex_str(hashX), # hashX = k[:-2]
len(full_hist) // 4, nrows)) # if hashX != prior_hashX and prior_hashX:
# write_size += self._compact_hashX(prior_hashX, hist_map,
# hist_list, write_items,
# keys_to_delete)
# hist_map.clear()
# hist_list.clear()
# prior_hashX = hashX
# hist_map[k] = hist
# hist_list.append(hist)
#
# if prior_hashX:
# write_size += self._compact_hashX(prior_hashX, hist_map, hist_list,
# write_items, keys_to_delete)
# return write_size
# Find what history needs to be written, and what keys need to # def _compact_history(self, limit):
# be deleted. Start by assuming all keys are to be deleted, # """Inner loop of history compaction. Loops until limit bytes have
# and then remove those that are the same on-disk as when # been processed.
# compacted. # """
write_size = 0 # fnord
keys_to_delete.update(hist_map) # keys_to_delete = set()
for n, chunk in enumerate(util.chunks(full_hist, max_row_size)): # write_items = [] # A list of (key, value) pairs
key = hashX + pack_be_uint16(n) # write_size = 0
if hist_map.get(key) == chunk: #
keys_to_delete.remove(key) # # Loop over 2-byte prefixes
else: # cursor = self.comp_cursor
write_items.append((key, chunk)) # while write_size < limit and cursor < 65536:
write_size += len(chunk) # prefix = pack_be_uint16(cursor)
# write_size += self._compact_prefix(prefix, write_items,
# keys_to_delete)
# cursor += 1
#
# max_rows = self.comp_flush_count + 1
# self._flush_compaction(cursor, write_items, keys_to_delete)
#
# self.logger.info('history compaction: wrote {:,d} rows ({:.1f} MB), '
# 'removed {:,d} rows, largest: {:,d}, {:.1f}% complete'
# .format(len(write_items), write_size / 1000000,
# len(keys_to_delete), max_rows,
# 100 * cursor / 65536))
# return write_size
assert n + 1 == nrows # def _cancel_compaction(self):
self.comp_flush_count = max(self.comp_flush_count, n) # if self.comp_cursor != -1:
# self.logger.warning('cancelling in-progress history compaction')
return write_size # self.comp_flush_count = -1
# self.comp_cursor = -1
def _compact_prefix(self, prefix, write_items, keys_to_delete):
"""Compact all history entries for hashXs beginning with the
given prefix. Update keys_to_delete and write."""
prior_hashX = None
hist_map = {}
hist_list = []
key_len = HASHX_LEN + 2
write_size = 0
for key, hist in self.db.iterator(prefix=HASHX_HISTORY_PREFIX + prefix):
k = key[1:]
# Ignore non-history entries
if len(k) != key_len:
continue
hashX = k[:-2]
if hashX != prior_hashX and prior_hashX:
write_size += self._compact_hashX(prior_hashX, hist_map,
hist_list, write_items,
keys_to_delete)
hist_map.clear()
hist_list.clear()
prior_hashX = hashX
hist_map[k] = hist
hist_list.append(hist)
if prior_hashX:
write_size += self._compact_hashX(prior_hashX, hist_map, hist_list,
write_items, keys_to_delete)
return write_size
def _compact_history(self, limit):
"""Inner loop of history compaction. Loops until limit bytes have
been processed.
"""
keys_to_delete = set()
write_items = [] # A list of (key, value) pairs
write_size = 0
# Loop over 2-byte prefixes
cursor = self.comp_cursor
while write_size < limit and cursor < 65536:
prefix = pack_be_uint16(cursor)
write_size += self._compact_prefix(prefix, write_items,
keys_to_delete)
cursor += 1
max_rows = self.comp_flush_count + 1
self._flush_compaction(cursor, write_items, keys_to_delete)
self.logger.info('history compaction: wrote {:,d} rows ({:.1f} MB), '
'removed {:,d} rows, largest: {:,d}, {:.1f}% complete'
.format(len(write_items), write_size / 1000000,
len(keys_to_delete), max_rows,
100 * cursor / 65536))
return write_size
def _cancel_compaction(self):
if self.comp_cursor != -1:
self.logger.warning('cancelling in-progress history compaction')
self.comp_flush_count = -1
self.comp_cursor = -1
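
For orientation on the keys compacted and cleaned up above: each flushed history row is keyed by the hashX plus a big-endian flush id, which is why the excess-flush cleanup can read the flush id back out of the last two key bytes. A sketch under assumed values (the local pack helper and HASHX_LEN stand in for lbry.wallet.server.util / hash):

import struct

HASHX_HISTORY_PREFIX = b'x'
HASHX_LEN = 11                      # assumed hashX length used by the wallet server

def pack_be_uint16(n):              # stand-in for util.pack_be_uint16
    return struct.pack('>H', n)

hashX = b'\x01' * HASHX_LEN
flush_count = 3
key = HASHX_HISTORY_PREFIX + hashX + pack_be_uint16(flush_count)

flush_id, = struct.unpack_from('>H', key[-2:])   # as in the excess-flush scan
assert flush_id == flush_count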

View file

@@ -19,7 +19,7 @@ import zlib
import typing
from typing import Optional, List, Tuple, Iterable
from asyncio import sleep
from bisect import bisect_right
from bisect import bisect_right, bisect_left
from collections import namedtuple
from glob import glob
from struct import pack, unpack
@@ -29,7 +29,7 @@ from lbry.utils import LRUCacheWithMetrics
from lbry.wallet.server import util
from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN
from lbry.wallet.server.merkle import Merkle, MerkleCache
from lbry.wallet.server.util import formatted_time, pack_be_uint16
from lbry.wallet.server.util import formatted_time, pack_be_uint16, unpack_be_uint16_from
from lbry.wallet.server.storage import db_class
from lbry.wallet.server.history import History
@@ -41,6 +41,7 @@ BLOCK_HASH_PREFIX = b'C'
HEADER_PREFIX = b'H'
TX_NUM_PREFIX = b'N'
TX_COUNT_PREFIX = b'T'
UNDO_PREFIX = b'U'
TX_HASH_PREFIX = b'X'
HASHX_UTXO_PREFIX = b'h'
@@ -50,9 +51,6 @@ UTXO_PREFIX = b'u'
HASHX_HISTORY_PREFIX = b'x'
@attr.s(slots=True)
class FlushData:
height = attr.ib()
@@ -107,6 +105,19 @@ class LevelDB:
self._tx_and_merkle_cache = LRUCacheWithMetrics(2 ** 17, metric_name='tx_and_merkle', namespace="wallet_server")
self.total_transactions = None
# def add_unflushed(self, hashXs_by_tx, first_tx_num):
# unflushed = self.history.unflushed
# count = 0
# for tx_num, hashXs in enumerate(hashXs_by_tx, start=first_tx_num):
# hashXs = set(hashXs)
# for hashX in hashXs:
# unflushed[hashX].append(tx_num)
# count += len(hashXs)
# self.history.unflushed_count += count
# def unflushed_memsize(self):
# return len(self.history.unflushed) * 180 + self.history.unflushed_count * 4
async def _read_tx_counts(self):
if self.tx_counts is not None:
return
@@ -172,10 +183,88 @@ class LevelDB:
self.read_utxo_state()
# Then history DB
self.utxo_flush_count = self.history.open_db(
self.db, for_sync, self.utxo_flush_count, compacting
)
self.clear_excess_undo_info()
state = self.db.get(HIST_STATE)
if state:
state = ast.literal_eval(state.decode())
if not isinstance(state, dict):
raise RuntimeError('failed reading state from history DB')
self.history.flush_count = state['flush_count']
self.history.comp_flush_count = state.get('comp_flush_count', -1)
self.history.comp_cursor = state.get('comp_cursor', -1)
self.history.db_version = state.get('db_version', 0)
else:
self.history.flush_count = 0
self.history.comp_flush_count = -1
self.history.comp_cursor = -1
self.history.db_version = max(self.DB_VERSIONS)
self.logger.info(f'history DB version: {self.history.db_version}')
if self.history.db_version not in self.DB_VERSIONS:
msg = f'this software only handles DB versions {self.DB_VERSIONS}'
self.logger.error(msg)
raise RuntimeError(msg)
self.logger.info(f'flush count: {self.history.flush_count:,d}')
# self.history.clear_excess(self.utxo_flush_count)
# < might happen at end of compaction as both DBs cannot be
# updated atomically
if self.history.flush_count > self.utxo_flush_count:
self.logger.info('DB shut down uncleanly. Scanning for '
'excess history flushes...')
keys = []
for key, hist in self.db.iterator(prefix=HASHX_HISTORY_PREFIX):
k = key[1:]
flush_id, = unpack_be_uint16_from(k[-2:])
if flush_id > self.utxo_flush_count:
keys.append(k)
self.logger.info(f'deleting {len(keys):,d} history entries')
self.history.flush_count = self.utxo_flush_count
with self.db.write_batch() as batch:
for key in keys:
batch.delete(HASHX_HISTORY_PREFIX + key)
state = {
'flush_count': self.history.flush_count,
'comp_flush_count': self.history.comp_flush_count,
'comp_cursor': self.history.comp_cursor,
'db_version': self.history.db_version,
}
# History entries are not prefixed; the suffix \0\0 ensures we
# look similar to other entries and aren't interfered with
batch.put(HIST_STATE, repr(state).encode())
self.logger.info('deleted excess history entries')
self.utxo_flush_count = self.history.flush_count
min_height = self.min_undo_height(self.db_height)
keys = []
for key, hist in self.db.iterator(prefix=UNDO_PREFIX):
height, = unpack('>I', key[-4:])
if height >= min_height:
break
keys.append(key)
if keys:
with self.db.write_batch() as batch:
for key in keys:
batch.delete(key)
self.logger.info(f'deleted {len(keys):,d} stale undo entries')
# delete old block files
prefix = self.raw_block_prefix()
paths = [path for path in glob(f'{prefix}[0-9]*')
if len(path) > len(prefix)
and int(path[len(prefix):]) < min_height]
if paths:
for path in paths:
try:
os.remove(path)
except FileNotFoundError:
pass
self.logger.info(f'deleted {len(paths):,d} stale block files')
# Read TX counts (requires meta directory)
await self._read_tx_counts()
@@ -185,7 +274,6 @@ class LevelDB:
def close(self):
self.db.close()
self.history.close_db()
self.executor.shutdown(wait=True)
self.executor = None
@@ -240,7 +328,7 @@ class LevelDB:
assert not flush_data.adds
assert not flush_data.deletes
assert not flush_data.undo_infos
self.history.assert_flushed()
assert not self.history.unflushed
def flush_utxo_db(self, batch, flush_data):
"""Flush the cached DB writes and UTXO set to the batch."""
@@ -263,12 +351,13 @@ class LevelDB:
# suffix = tx_idx + tx_num
hashX = value[:-12]
suffix = key[-2:] + value[-12:-8]
batch_put(b'h' + key[:4] + suffix, hashX)
batch_put(HASHX_UTXO_PREFIX + key[:4] + suffix, hashX)
batch_put(b'u' + hashX + suffix, value[-8:])
batch_put(UTXO_PREFIX + hashX + suffix, value[-8:])
flush_data.adds.clear()
# New undo information
self.flush_undo_infos(batch_put, flush_data.undo_infos)
for undo_info, height in flush_data.undo_infos:
batch_put(self.undo_key(height), b''.join(undo_info))
flush_data.undo_infos.clear()
if self.db.for_sync:
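
The prefix renames above cover the two UTXO key shapes spelled out in the comments. A self-contained sketch of the b'u' row layout (HASHX_LEN and all sample values are assumptions for illustration):

from struct import pack, unpack

UTXO_PREFIX = b'u'
HASHX_LEN = 11                      # assumed hashX length

hashX = b'\x02' * HASHX_LEN
tx_idx, tx_num, amount = 1, 12345, 5_000_000

suffix = pack('<H', tx_idx) + pack('<I', tx_num)
key = UTXO_PREFIX + hashX + suffix              # b'u' + hashX + tx_idx + tx_num
value = pack('<Q', amount)                      # 64-bit little-endian amount

read_idx, read_num = unpack('<HI', key[-6:])    # how the UTXO readers decode it back
read_amount, = unpack('<Q', value)
assert (read_idx, read_num, read_amount) == (tx_idx, tx_num, amount)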
@@ -285,6 +374,17 @@ class LevelDB:
self.db_tx_count = flush_data.tx_count
self.db_tip = flush_data.tip
def write_history_state(self, batch):
state = {
'flush_count': self.history.flush_count,
'comp_flush_count': self.history.comp_flush_count,
'comp_cursor': self.history.comp_cursor,
'db_version': self.db_version,
}
# History entries are not prefixed; the suffix \0\0 ensures we
# look similar to other entries and aren't interfered with
batch.put(HIST_STATE, repr(state).encode())
def flush_dbs(self, flush_data, flush_utxos, estimate_txs_remaining):
"""Flush out cached state. History is always flushed; UTXOs are
flushed if flush_utxos."""
@@ -351,7 +451,7 @@ class LevelDB:
for hashX in sorted(unflushed):
key = hashX + flush_id
batch_put(HASHX_HISTORY_PREFIX + key, unflushed[hashX].tobytes())
self.history.write_state(batch)
self.write_history_state(batch)
unflushed.clear()
self.history.unflushed_count = 0
@@ -396,45 +496,74 @@ class LevelDB:
self.logger.info(f'sync time: {formatted_time(self.wall_time)} '
f'ETA: {formatted_time(eta)}')
def flush_state(self, batch):
"""Flush chain state to the batch."""
now = time.time()
self.wall_time += now - self.last_flush
self.last_flush = now
self.last_flush_tx_count = self.fs_tx_count
self.write_utxo_state(batch)
# def flush_state(self, batch):
# """Flush chain state to the batch."""
# now = time.time()
# self.wall_time += now - self.last_flush
# self.last_flush = now
# self.last_flush_tx_count = self.fs_tx_count
# self.write_utxo_state(batch)
def flush_backup(self, flush_data, touched):
"""Like flush_dbs() but when backing up. All UTXOs are flushed."""
assert not flush_data.headers
assert not flush_data.block_txs
assert flush_data.height < self.db_height
self.history.assert_flushed()
assert not self.history.unflushed
start_time = time.time()
tx_delta = flush_data.tx_count - self.last_flush_tx_count
###
while self.fs_height > flush_data.height:
self.fs_height -= 1
self.headers.pop()
self.fs_tx_count = flush_data.tx_count
# Truncate header_mc: header count is 1 more than the height.
self.header_mc.truncate(flush_data.height + 1)
###
# Not certain this is needed, but it doesn't hurt
self.history.flush_count += 1
nremoves = 0
self.backup_fs(flush_data.height, flush_data.tx_count)
self.history.backup(touched, flush_data.tx_count)
with self.db.write_batch() as batch:
tx_count = flush_data.tx_count
for hashX in sorted(touched):
deletes = []
puts = {}
for key, hist in self.db.iterator(prefix=HASHX_HISTORY_PREFIX + hashX, reverse=True):
k = key[1:]
a = array.array('I')
a.frombytes(hist)
# Remove all history entries >= tx_count
idx = bisect_left(a, tx_count)
nremoves += len(a) - idx
if idx > 0:
puts[k] = a[:idx].tobytes()
break
deletes.append(k)
for key in deletes:
batch.delete(key)
for key, value in puts.items():
batch.put(key, value)
self.write_history_state(batch)
self.flush_utxo_db(batch, flush_data)
# Flush state last as it reads the wall time.
self.flush_state(batch)
now = time.time()
self.wall_time += now - self.last_flush
self.last_flush = now
self.last_flush_tx_count = self.fs_tx_count
self.write_utxo_state(batch)
self.logger.info(f'backing up removed {nremoves:,d} history entries')
elapsed = self.last_flush - start_time
self.logger.info(f'backup flush #{self.history.flush_count:,d} took '
f'{elapsed:.1f}s. Height {flush_data.height:,d} '
f'txs: {flush_data.tx_count:,d} ({tx_delta:+,d})')
def backup_fs(self, height, tx_count):
"""Back up during a reorg. This just updates our pointers."""
while self.fs_height > height:
self.fs_height -= 1
self.headers.pop()
self.fs_tx_count = tx_count
# Truncate header_mc: header count is 1 more than the height.
self.header_mc.truncate(height + 1)
def raw_header(self, height):
"""Return the binary header at the given height."""
header, n = self.read_headers(height, 1)
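
The reorg path in flush_backup above trims each touched address's stored history with bisect_left; a minimal sketch of that per-row truncation (sample tx_nums invented):

import array
from bisect import bisect_left

hist = array.array('I', [10, 25, 99, 130, 131])   # stored tx_nums for one hashX
tx_count = 100                                    # tx count of the new chain tip

# tx_nums are stored in increasing order, so bisect_left finds the first
# entry >= tx_count; everything from there on is rolled back.
idx = bisect_left(hist, tx_count)
kept, nremoved = hist[:idx], len(hist) - idx
print(list(kept), nremoved)   # [10, 25, 99] 2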
@@ -555,7 +684,7 @@ class LevelDB:
cnt = 0
txs = []
for hist in self.history.db.iterator(prefix=HASHX_HISTORY_PREFIX + hashX, include_key=False):
for hist in self.db.iterator(prefix=HASHX_HISTORY_PREFIX + hashX, include_key=False):
a = array.array('I')
a.frombytes(hist)
for tx_num in a:
@@ -586,17 +715,12 @@ class LevelDB:
def undo_key(self, height):
"""DB key for undo information at the given height."""
return b'U' + pack('>I', height)
return UNDO_PREFIX + pack('>I', height)
def read_undo_info(self, height):
"""Read undo information from a file for the current height."""
return self.db.get(self.undo_key(height))
def flush_undo_infos(self, batch_put, undo_infos):
"""undo_infos is a list of (undo_info, height) pairs."""
for undo_info, height in undo_infos:
batch_put(self.undo_key(height), b''.join(undo_info))
def raw_block_prefix(self):
return 'block'
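
For reference, the undo-key layout used by undo_key() and clear_excess_undo_info() is the prefix plus the big-endian block height, so the height can be unpacked straight from the last four key bytes (height value arbitrary):

from struct import pack, unpack

UNDO_PREFIX = b'U'

height = 914000
key = UNDO_PREFIX + pack('>I', height)
recovered, = unpack('>I', key[-4:])
assert recovered == height and len(key) == 5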
@@ -626,10 +750,9 @@ class LevelDB:
def clear_excess_undo_info(self):
"""Clear excess undo info. Only most recent N are kept."""
prefix = b'U'
min_height = self.min_undo_height(self.db_height)
keys = []
for key, hist in self.db.iterator(prefix=prefix):
for key, hist in self.db.iterator(prefix=UNDO_PREFIX):
height, = unpack('>I', key[-4:])
if height >= min_height:
break
@@ -733,7 +856,7 @@ class LevelDB:
fs_tx_hash = self.fs_tx_hash
# Key: b'u' + address_hashX + tx_idx + tx_num
# Value: the UTXO value as a 64-bit unsigned integer
prefix = b'u' + hashX
prefix = UTXO_PREFIX + hashX
for db_key, db_value in self.db.iterator(prefix=prefix):
tx_pos, tx_num = s_unpack('<HI', db_key[-6:])
value, = unpack('<Q', db_value)
@@ -764,7 +887,7 @@ class LevelDB:
# Key: b'h' + compressed_tx_hash + tx_idx + tx_num
# Value: hashX
prefix = b'h' + tx_hash[:4] + idx_packed
prefix = HASHX_UTXO_PREFIX + tx_hash[:4] + idx_packed
# Find which entry, if any, the TX_HASH matches.
for db_key, hashX in self.db.iterator(prefix=prefix):
@@ -785,7 +908,7 @@ class LevelDB:
return None
# Key: b'u' + address_hashX + tx_idx + tx_num
# Value: the UTXO value as a 64-bit unsigned integer
key = b'u' + hashX + suffix
key = UTXO_PREFIX + hashX + suffix
db_value = self.db.get(key)
if not db_value:
# This can happen if the DB was updated between