forked from LBRYCommunity/lbry-sdk
consolidate leveldb block advance/reorg
-move methods from History to LevelDB
This commit is contained in:
parent
3ed748f2fd
commit
bc7fe680c0
3 changed files with 403 additions and 298 deletions
|
@ -384,7 +384,7 @@ class BlockProcessor:
|
|||
one_MB = 1000*1000
|
||||
utxo_cache_size = len(self.utxo_cache) * 205
|
||||
db_deletes_size = len(self.db_deletes) * 57
|
||||
hist_cache_size = self.db.history.unflushed_memsize()
|
||||
hist_cache_size = len(self.db.history.unflushed) * 180 + self.db.history.unflushed_count * 4
|
||||
# Roughly ntxs * 32 + nblocks * 42
|
||||
tx_hash_size = ((self.tx_count - self.db.fs_tx_count) * 32
|
||||
+ (self.height - self.db.fs_height) * 42)
|
||||
|
@ -475,7 +475,16 @@ class BlockProcessor:
|
|||
self.db.total_transactions.append(tx_hash)
|
||||
tx_num += 1
|
||||
|
||||
self.db.history.add_unflushed(hashXs_by_tx, self.tx_count)
|
||||
# self.db.add_unflushed(hashXs_by_tx, self.tx_count)
|
||||
first_tx_num = self.tx_count
|
||||
_unflushed = self.db.history.unflushed
|
||||
_count = 0
|
||||
for _tx_num, _hashXs in enumerate(hashXs_by_tx, start=first_tx_num):
|
||||
for _hashX in set(_hashXs):
|
||||
_unflushed[_hashX].append(_tx_num)
|
||||
_count += len(_hashXs)
|
||||
self.db.history.unflushed_count += _count
|
||||
|
||||
self.tx_count = tx_num
|
||||
self.db.tx_counts.append(tx_num)
|
||||
|
||||
|
|
|
@ -34,150 +34,122 @@ class History:
|
|||
self.max_hist_row_entries = 12500
|
||||
self.unflushed = defaultdict(partial(array.array, 'I'))
|
||||
self.unflushed_count = 0
|
||||
self.db = None
|
||||
|
||||
def open_db(self, db, for_sync, utxo_flush_count, compacting):
|
||||
self.db = db #db_class('hist', for_sync)
|
||||
self.read_state()
|
||||
self.clear_excess(utxo_flush_count)
|
||||
# An incomplete compaction needs to be cancelled otherwise
|
||||
# restarting it will corrupt the history
|
||||
if not compacting:
|
||||
self._cancel_compaction()
|
||||
return self.flush_count
|
||||
|
||||
def close_db(self):
|
||||
if self.db:
|
||||
# self.db.close()
|
||||
self.db = None
|
||||
|
||||
def read_state(self):
|
||||
state = self.db.get(HIST_STATE)
|
||||
if state:
|
||||
state = ast.literal_eval(state.decode())
|
||||
if not isinstance(state, dict):
|
||||
raise RuntimeError('failed reading state from history DB')
|
||||
self.flush_count = state['flush_count']
|
||||
self.comp_flush_count = state.get('comp_flush_count', -1)
|
||||
self.comp_cursor = state.get('comp_cursor', -1)
|
||||
self.db_version = state.get('db_version', 0)
|
||||
else:
|
||||
self.flush_count = 0
|
||||
self.comp_flush_count = -1
|
||||
self.comp_cursor = -1
|
||||
self.db_version = max(self.DB_VERSIONS)
|
||||
# self.db = None
|
||||
|
||||
self.logger.info(f'history DB version: {self.db_version}')
|
||||
if self.db_version not in self.DB_VERSIONS:
|
||||
msg = f'this software only handles DB versions {self.DB_VERSIONS}'
|
||||
self.logger.error(msg)
|
||||
raise RuntimeError(msg)
|
||||
self.logger.info(f'flush count: {self.flush_count:,d}')
|
||||
# def close_db(self):
|
||||
# if self.db:
|
||||
# # self.db.close()
|
||||
# self.db = None
|
||||
|
||||
def clear_excess(self, utxo_flush_count):
|
||||
# < might happen at end of compaction as both DBs cannot be
|
||||
# updated atomically
|
||||
if self.flush_count <= utxo_flush_count:
|
||||
return
|
||||
# def read_state(self):
|
||||
# state = self.db.get(HIST_STATE)
|
||||
# if state:
|
||||
# state = ast.literal_eval(state.decode())
|
||||
# if not isinstance(state, dict):
|
||||
# raise RuntimeError('failed reading state from history DB')
|
||||
# self.flush_count = state['flush_count']
|
||||
# self.comp_flush_count = state.get('comp_flush_count', -1)
|
||||
# self.comp_cursor = state.get('comp_cursor', -1)
|
||||
# self.db_version = state.get('db_version', 0)
|
||||
# else:
|
||||
# self.flush_count = 0
|
||||
# self.comp_flush_count = -1
|
||||
# self.comp_cursor = -1
|
||||
# self.db_version = max(self.DB_VERSIONS)
|
||||
#
|
||||
# self.logger.info(f'history DB version: {self.db_version}')
|
||||
# if self.db_version not in self.DB_VERSIONS:
|
||||
# msg = f'this software only handles DB versions {self.DB_VERSIONS}'
|
||||
# self.logger.error(msg)
|
||||
# raise RuntimeError(msg)
|
||||
# self.logger.info(f'flush count: {self.flush_count:,d}')
|
||||
|
||||
self.logger.info('DB shut down uncleanly. Scanning for '
|
||||
'excess history flushes...')
|
||||
# def clear_excess(self, utxo_flush_count):
|
||||
# # < might happen at end of compaction as both DBs cannot be
|
||||
# # updated atomically
|
||||
# if self.flush_count <= utxo_flush_count:
|
||||
# return
|
||||
#
|
||||
# self.logger.info('DB shut down uncleanly. Scanning for '
|
||||
# 'excess history flushes...')
|
||||
#
|
||||
# keys = []
|
||||
# for key, hist in self.db.iterator(prefix=HASHX_HISTORY_PREFIX):
|
||||
# k = key[1:]
|
||||
# flush_id, = unpack_be_uint16_from(k[-2:])
|
||||
# if flush_id > utxo_flush_count:
|
||||
# keys.append(k)
|
||||
#
|
||||
# self.logger.info(f'deleting {len(keys):,d} history entries')
|
||||
#
|
||||
# self.flush_count = utxo_flush_count
|
||||
# with self.db.write_batch() as batch:
|
||||
# for key in keys:
|
||||
# batch.delete(HASHX_HISTORY_PREFIX + key)
|
||||
# self.write_state(batch)
|
||||
#
|
||||
# self.logger.info('deleted excess history entries')
|
||||
#
|
||||
# def write_state(self, batch):
|
||||
# """Write state to the history DB."""
|
||||
# state = {
|
||||
# 'flush_count': self.flush_count,
|
||||
# 'comp_flush_count': self.comp_flush_count,
|
||||
# 'comp_cursor': self.comp_cursor,
|
||||
# 'db_version': self.db_version,
|
||||
# }
|
||||
# # History entries are not prefixed; the suffix \0\0 ensures we
|
||||
# # look similar to other entries and aren't interfered with
|
||||
# batch.put(HIST_STATE, repr(state).encode())
|
||||
|
||||
keys = []
|
||||
for key, hist in self.db.iterator(prefix=HASHX_HISTORY_PREFIX):
|
||||
k = key[1:]
|
||||
flush_id, = unpack_be_uint16_from(k[-2:])
|
||||
if flush_id > utxo_flush_count:
|
||||
keys.append(k)
|
||||
# def add_unflushed(self, hashXs_by_tx, first_tx_num):
|
||||
# unflushed = self.unflushed
|
||||
# count = 0
|
||||
# for tx_num, hashXs in enumerate(hashXs_by_tx, start=first_tx_num):
|
||||
# hashXs = set(hashXs)
|
||||
# for hashX in hashXs:
|
||||
# unflushed[hashX].append(tx_num)
|
||||
# count += len(hashXs)
|
||||
# self.unflushed_count += count
|
||||
|
||||
self.logger.info(f'deleting {len(keys):,d} history entries')
|
||||
# def unflushed_memsize(self):
|
||||
# return len(self.unflushed) * 180 + self.unflushed_count * 4
|
||||
|
||||
self.flush_count = utxo_flush_count
|
||||
with self.db.write_batch() as batch:
|
||||
for key in keys:
|
||||
batch.delete(HASHX_HISTORY_PREFIX + key)
|
||||
self.write_state(batch)
|
||||
# def assert_flushed(self):
|
||||
# assert not self.unflushed
|
||||
|
||||
self.logger.info('deleted excess history entries')
|
||||
|
||||
def write_state(self, batch):
|
||||
"""Write state to the history DB."""
|
||||
state = {
|
||||
'flush_count': self.flush_count,
|
||||
'comp_flush_count': self.comp_flush_count,
|
||||
'comp_cursor': self.comp_cursor,
|
||||
'db_version': self.db_version,
|
||||
}
|
||||
# History entries are not prefixed; the suffix \0\0 ensures we
|
||||
# look similar to other entries and aren't interfered with
|
||||
batch.put(HIST_STATE, repr(state).encode())
|
||||
|
||||
def add_unflushed(self, hashXs_by_tx, first_tx_num):
|
||||
unflushed = self.unflushed
|
||||
count = 0
|
||||
for tx_num, hashXs in enumerate(hashXs_by_tx, start=first_tx_num):
|
||||
hashXs = set(hashXs)
|
||||
for hashX in hashXs:
|
||||
unflushed[hashX].append(tx_num)
|
||||
count += len(hashXs)
|
||||
self.unflushed_count += count
|
||||
|
||||
def unflushed_memsize(self):
|
||||
return len(self.unflushed) * 180 + self.unflushed_count * 4
|
||||
|
||||
def assert_flushed(self):
|
||||
assert not self.unflushed
|
||||
|
||||
def flush(self):
|
||||
start_time = time.time()
|
||||
self.flush_count += 1
|
||||
flush_id = pack_be_uint16(self.flush_count)
|
||||
unflushed = self.unflushed
|
||||
|
||||
with self.db.write_batch() as batch:
|
||||
for hashX in sorted(unflushed):
|
||||
key = hashX + flush_id
|
||||
batch.put(HASHX_HISTORY_PREFIX + key, unflushed[hashX].tobytes())
|
||||
self.write_state(batch)
|
||||
|
||||
count = len(unflushed)
|
||||
unflushed.clear()
|
||||
self.unflushed_count = 0
|
||||
|
||||
if self.db.for_sync:
|
||||
elapsed = time.time() - start_time
|
||||
self.logger.info(f'flushed history in {elapsed:.1f}s '
|
||||
f'for {count:,d} addrs')
|
||||
|
||||
def backup(self, hashXs, tx_count):
|
||||
# Not certain this is needed, but it doesn't hurt
|
||||
self.flush_count += 1
|
||||
nremoves = 0
|
||||
bisect_left = bisect.bisect_left
|
||||
|
||||
with self.db.write_batch() as batch:
|
||||
for hashX in sorted(hashXs):
|
||||
deletes = []
|
||||
puts = {}
|
||||
for key, hist in self.db.iterator(prefix=HASHX_HISTORY_PREFIX + hashX, reverse=True):
|
||||
k = key[1:]
|
||||
a = array.array('I')
|
||||
a.frombytes(hist)
|
||||
# Remove all history entries >= tx_count
|
||||
idx = bisect_left(a, tx_count)
|
||||
nremoves += len(a) - idx
|
||||
if idx > 0:
|
||||
puts[k] = a[:idx].tobytes()
|
||||
break
|
||||
deletes.append(k)
|
||||
|
||||
for key in deletes:
|
||||
batch.delete(key)
|
||||
for key, value in puts.items():
|
||||
batch.put(key, value)
|
||||
self.write_state(batch)
|
||||
|
||||
self.logger.info(f'backing up removed {nremoves:,d} history entries')
|
||||
# def backup(self, hashXs, tx_count):
|
||||
# # Not certain this is needed, but it doesn't hurt
|
||||
# self.flush_count += 1
|
||||
# nremoves = 0
|
||||
# bisect_left = bisect.bisect_left
|
||||
#
|
||||
# with self.db.write_batch() as batch:
|
||||
# for hashX in sorted(hashXs):
|
||||
# deletes = []
|
||||
# puts = {}
|
||||
# for key, hist in self.db.iterator(prefix=HASHX_HISTORY_PREFIX + hashX, reverse=True):
|
||||
# k = key[1:]
|
||||
# a = array.array('I')
|
||||
# a.frombytes(hist)
|
||||
# # Remove all history entries >= tx_count
|
||||
# idx = bisect_left(a, tx_count)
|
||||
# nremoves += len(a) - idx
|
||||
# if idx > 0:
|
||||
# puts[k] = a[:idx].tobytes()
|
||||
# break
|
||||
# deletes.append(k)
|
||||
#
|
||||
# for key in deletes:
|
||||
# batch.delete(key)
|
||||
# for key, value in puts.items():
|
||||
# batch.put(key, value)
|
||||
# self.write_state(batch)
|
||||
#
|
||||
# self.logger.info(f'backing up removed {nremoves:,d} history entries')
|
||||
|
||||
# def get_txnums(self, hashX, limit=1000):
|
||||
# """Generator that returns an unpruned, sorted list of tx_nums in the
|
||||
|
@ -213,119 +185,120 @@ class History:
|
|||
# When compaction is complete and the final flush takes place,
|
||||
# flush_count is reset to comp_flush_count, and comp_flush_count to -1
|
||||
|
||||
def _flush_compaction(self, cursor, write_items, keys_to_delete):
|
||||
"""Flush a single compaction pass as a batch."""
|
||||
# Update compaction state
|
||||
if cursor == 65536:
|
||||
self.flush_count = self.comp_flush_count
|
||||
self.comp_cursor = -1
|
||||
self.comp_flush_count = -1
|
||||
else:
|
||||
self.comp_cursor = cursor
|
||||
# def _flush_compaction(self, cursor, write_items, keys_to_delete):
|
||||
# """Flush a single compaction pass as a batch."""
|
||||
# # Update compaction state
|
||||
# if cursor == 65536:
|
||||
# self.flush_count = self.comp_flush_count
|
||||
# self.comp_cursor = -1
|
||||
# self.comp_flush_count = -1
|
||||
# else:
|
||||
# self.comp_cursor = cursor
|
||||
#
|
||||
# # History DB. Flush compacted history and updated state
|
||||
# with self.db.write_batch() as batch:
|
||||
# # Important: delete first! The keyspace may overlap.
|
||||
# for key in keys_to_delete:
|
||||
# batch.delete(HASHX_HISTORY_PREFIX + key)
|
||||
# for key, value in write_items:
|
||||
# batch.put(HASHX_HISTORY_PREFIX + key, value)
|
||||
# self.write_state(batch)
|
||||
|
||||
# History DB. Flush compacted history and updated state
|
||||
with self.db.write_batch() as batch:
|
||||
# Important: delete first! The keyspace may overlap.
|
||||
for key in keys_to_delete:
|
||||
batch.delete(HASHX_HISTORY_PREFIX + key)
|
||||
for key, value in write_items:
|
||||
batch.put(HASHX_HISTORY_PREFIX + key, value)
|
||||
self.write_state(batch)
|
||||
# def _compact_hashX(self, hashX, hist_map, hist_list,
|
||||
# write_items, keys_to_delete):
|
||||
# """Compress history for a hashX. hist_list is an ordered list of
|
||||
# the histories to be compressed."""
|
||||
# # History entries (tx numbers) are 4 bytes each. Distribute
|
||||
# # over rows of up to 50KB in size. A fixed row size means
|
||||
# # future compactions will not need to update the first N - 1
|
||||
# # rows.
|
||||
# max_row_size = self.max_hist_row_entries * 4
|
||||
# full_hist = b''.join(hist_list)
|
||||
# nrows = (len(full_hist) + max_row_size - 1) // max_row_size
|
||||
# if nrows > 4:
|
||||
# self.logger.info('hashX {} is large: {:,d} entries across '
|
||||
# '{:,d} rows'
|
||||
# .format(hash_to_hex_str(hashX),
|
||||
# len(full_hist) // 4, nrows))
|
||||
#
|
||||
# # Find what history needs to be written, and what keys need to
|
||||
# # be deleted. Start by assuming all keys are to be deleted,
|
||||
# # and then remove those that are the same on-disk as when
|
||||
# # compacted.
|
||||
# write_size = 0
|
||||
# keys_to_delete.update(hist_map)
|
||||
# for n, chunk in enumerate(util.chunks(full_hist, max_row_size)):
|
||||
# key = hashX + pack_be_uint16(n)
|
||||
# if hist_map.get(key) == chunk:
|
||||
# keys_to_delete.remove(key)
|
||||
# else:
|
||||
# write_items.append((key, chunk))
|
||||
# write_size += len(chunk)
|
||||
#
|
||||
# assert n + 1 == nrows
|
||||
# self.comp_flush_count = max(self.comp_flush_count, n)
|
||||
#
|
||||
# return write_size
|
||||
|
||||
def _compact_hashX(self, hashX, hist_map, hist_list,
|
||||
write_items, keys_to_delete):
|
||||
"""Compress history for a hashX. hist_list is an ordered list of
|
||||
the histories to be compressed."""
|
||||
# History entries (tx numbers) are 4 bytes each. Distribute
|
||||
# over rows of up to 50KB in size. A fixed row size means
|
||||
# future compactions will not need to update the first N - 1
|
||||
# rows.
|
||||
max_row_size = self.max_hist_row_entries * 4
|
||||
full_hist = b''.join(hist_list)
|
||||
nrows = (len(full_hist) + max_row_size - 1) // max_row_size
|
||||
if nrows > 4:
|
||||
self.logger.info('hashX {} is large: {:,d} entries across '
|
||||
'{:,d} rows'
|
||||
.format(hash_to_hex_str(hashX),
|
||||
len(full_hist) // 4, nrows))
|
||||
# def _compact_prefix(self, prefix, write_items, keys_to_delete):
|
||||
# """Compact all history entries for hashXs beginning with the
|
||||
# given prefix. Update keys_to_delete and write."""
|
||||
# prior_hashX = None
|
||||
# hist_map = {}
|
||||
# hist_list = []
|
||||
#
|
||||
# key_len = HASHX_LEN + 2
|
||||
# write_size = 0
|
||||
# for key, hist in self.db.iterator(prefix=HASHX_HISTORY_PREFIX + prefix):
|
||||
# k = key[1:]
|
||||
# # Ignore non-history entries
|
||||
# if len(k) != key_len:
|
||||
# continue
|
||||
# hashX = k[:-2]
|
||||
# if hashX != prior_hashX and prior_hashX:
|
||||
# write_size += self._compact_hashX(prior_hashX, hist_map,
|
||||
# hist_list, write_items,
|
||||
# keys_to_delete)
|
||||
# hist_map.clear()
|
||||
# hist_list.clear()
|
||||
# prior_hashX = hashX
|
||||
# hist_map[k] = hist
|
||||
# hist_list.append(hist)
|
||||
#
|
||||
# if prior_hashX:
|
||||
# write_size += self._compact_hashX(prior_hashX, hist_map, hist_list,
|
||||
# write_items, keys_to_delete)
|
||||
# return write_size
|
||||
|
||||
# Find what history needs to be written, and what keys need to
|
||||
# be deleted. Start by assuming all keys are to be deleted,
|
||||
# and then remove those that are the same on-disk as when
|
||||
# compacted.
|
||||
write_size = 0
|
||||
keys_to_delete.update(hist_map)
|
||||
for n, chunk in enumerate(util.chunks(full_hist, max_row_size)):
|
||||
key = hashX + pack_be_uint16(n)
|
||||
if hist_map.get(key) == chunk:
|
||||
keys_to_delete.remove(key)
|
||||
else:
|
||||
write_items.append((key, chunk))
|
||||
write_size += len(chunk)
|
||||
# def _compact_history(self, limit):
|
||||
# """Inner loop of history compaction. Loops until limit bytes have
|
||||
# been processed.
|
||||
# """
|
||||
# fnord
|
||||
# keys_to_delete = set()
|
||||
# write_items = [] # A list of (key, value) pairs
|
||||
# write_size = 0
|
||||
#
|
||||
# # Loop over 2-byte prefixes
|
||||
# cursor = self.comp_cursor
|
||||
# while write_size < limit and cursor < 65536:
|
||||
# prefix = pack_be_uint16(cursor)
|
||||
# write_size += self._compact_prefix(prefix, write_items,
|
||||
# keys_to_delete)
|
||||
# cursor += 1
|
||||
#
|
||||
# max_rows = self.comp_flush_count + 1
|
||||
# self._flush_compaction(cursor, write_items, keys_to_delete)
|
||||
#
|
||||
# self.logger.info('history compaction: wrote {:,d} rows ({:.1f} MB), '
|
||||
# 'removed {:,d} rows, largest: {:,d}, {:.1f}% complete'
|
||||
# .format(len(write_items), write_size / 1000000,
|
||||
# len(keys_to_delete), max_rows,
|
||||
# 100 * cursor / 65536))
|
||||
# return write_size
|
||||
|
||||
assert n + 1 == nrows
|
||||
self.comp_flush_count = max(self.comp_flush_count, n)
|
||||
|
||||
return write_size
|
||||
|
||||
def _compact_prefix(self, prefix, write_items, keys_to_delete):
|
||||
"""Compact all history entries for hashXs beginning with the
|
||||
given prefix. Update keys_to_delete and write."""
|
||||
prior_hashX = None
|
||||
hist_map = {}
|
||||
hist_list = []
|
||||
|
||||
key_len = HASHX_LEN + 2
|
||||
write_size = 0
|
||||
for key, hist in self.db.iterator(prefix=HASHX_HISTORY_PREFIX + prefix):
|
||||
k = key[1:]
|
||||
# Ignore non-history entries
|
||||
if len(k) != key_len:
|
||||
continue
|
||||
hashX = k[:-2]
|
||||
if hashX != prior_hashX and prior_hashX:
|
||||
write_size += self._compact_hashX(prior_hashX, hist_map,
|
||||
hist_list, write_items,
|
||||
keys_to_delete)
|
||||
hist_map.clear()
|
||||
hist_list.clear()
|
||||
prior_hashX = hashX
|
||||
hist_map[k] = hist
|
||||
hist_list.append(hist)
|
||||
|
||||
if prior_hashX:
|
||||
write_size += self._compact_hashX(prior_hashX, hist_map, hist_list,
|
||||
write_items, keys_to_delete)
|
||||
return write_size
|
||||
|
||||
def _compact_history(self, limit):
|
||||
"""Inner loop of history compaction. Loops until limit bytes have
|
||||
been processed.
|
||||
"""
|
||||
keys_to_delete = set()
|
||||
write_items = [] # A list of (key, value) pairs
|
||||
write_size = 0
|
||||
|
||||
# Loop over 2-byte prefixes
|
||||
cursor = self.comp_cursor
|
||||
while write_size < limit and cursor < 65536:
|
||||
prefix = pack_be_uint16(cursor)
|
||||
write_size += self._compact_prefix(prefix, write_items,
|
||||
keys_to_delete)
|
||||
cursor += 1
|
||||
|
||||
max_rows = self.comp_flush_count + 1
|
||||
self._flush_compaction(cursor, write_items, keys_to_delete)
|
||||
|
||||
self.logger.info('history compaction: wrote {:,d} rows ({:.1f} MB), '
|
||||
'removed {:,d} rows, largest: {:,d}, {:.1f}% complete'
|
||||
.format(len(write_items), write_size / 1000000,
|
||||
len(keys_to_delete), max_rows,
|
||||
100 * cursor / 65536))
|
||||
return write_size
|
||||
|
||||
def _cancel_compaction(self):
|
||||
if self.comp_cursor != -1:
|
||||
self.logger.warning('cancelling in-progress history compaction')
|
||||
self.comp_flush_count = -1
|
||||
self.comp_cursor = -1
|
||||
# def _cancel_compaction(self):
|
||||
# if self.comp_cursor != -1:
|
||||
# self.logger.warning('cancelling in-progress history compaction')
|
||||
# self.comp_flush_count = -1
|
||||
# self.comp_cursor = -1
|
||||
|
|
|
@ -19,7 +19,7 @@ import zlib
|
|||
import typing
|
||||
from typing import Optional, List, Tuple, Iterable
|
||||
from asyncio import sleep
|
||||
from bisect import bisect_right
|
||||
from bisect import bisect_right, bisect_left
|
||||
from collections import namedtuple
|
||||
from glob import glob
|
||||
from struct import pack, unpack
|
||||
|
@ -29,7 +29,7 @@ from lbry.utils import LRUCacheWithMetrics
|
|||
from lbry.wallet.server import util
|
||||
from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN
|
||||
from lbry.wallet.server.merkle import Merkle, MerkleCache
|
||||
from lbry.wallet.server.util import formatted_time, pack_be_uint16
|
||||
from lbry.wallet.server.util import formatted_time, pack_be_uint16, unpack_be_uint16_from
|
||||
from lbry.wallet.server.storage import db_class
|
||||
from lbry.wallet.server.history import History
|
||||
|
||||
|
@ -41,6 +41,7 @@ BLOCK_HASH_PREFIX = b'C'
|
|||
HEADER_PREFIX = b'H'
|
||||
TX_NUM_PREFIX = b'N'
|
||||
TX_COUNT_PREFIX = b'T'
|
||||
UNDO_PREFIX = b'U'
|
||||
TX_HASH_PREFIX = b'X'
|
||||
|
||||
HASHX_UTXO_PREFIX = b'h'
|
||||
|
@ -50,9 +51,6 @@ UTXO_PREFIX = b'u'
|
|||
HASHX_HISTORY_PREFIX = b'x'
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
@attr.s(slots=True)
|
||||
class FlushData:
|
||||
height = attr.ib()
|
||||
|
@ -107,6 +105,19 @@ class LevelDB:
|
|||
self._tx_and_merkle_cache = LRUCacheWithMetrics(2 ** 17, metric_name='tx_and_merkle', namespace="wallet_server")
|
||||
self.total_transactions = None
|
||||
|
||||
# def add_unflushed(self, hashXs_by_tx, first_tx_num):
|
||||
# unflushed = self.history.unflushed
|
||||
# count = 0
|
||||
# for tx_num, hashXs in enumerate(hashXs_by_tx, start=first_tx_num):
|
||||
# hashXs = set(hashXs)
|
||||
# for hashX in hashXs:
|
||||
# unflushed[hashX].append(tx_num)
|
||||
# count += len(hashXs)
|
||||
# self.history.unflushed_count += count
|
||||
|
||||
# def unflushed_memsize(self):
|
||||
# return len(self.history.unflushed) * 180 + self.history.unflushed_count * 4
|
||||
|
||||
async def _read_tx_counts(self):
|
||||
if self.tx_counts is not None:
|
||||
return
|
||||
|
@ -172,10 +183,88 @@ class LevelDB:
|
|||
self.read_utxo_state()
|
||||
|
||||
# Then history DB
|
||||
self.utxo_flush_count = self.history.open_db(
|
||||
self.db, for_sync, self.utxo_flush_count, compacting
|
||||
)
|
||||
self.clear_excess_undo_info()
|
||||
state = self.db.get(HIST_STATE)
|
||||
if state:
|
||||
state = ast.literal_eval(state.decode())
|
||||
if not isinstance(state, dict):
|
||||
raise RuntimeError('failed reading state from history DB')
|
||||
self.history.flush_count = state['flush_count']
|
||||
self.history.comp_flush_count = state.get('comp_flush_count', -1)
|
||||
self.history.comp_cursor = state.get('comp_cursor', -1)
|
||||
self.history.db_version = state.get('db_version', 0)
|
||||
else:
|
||||
self.history.flush_count = 0
|
||||
self.history.comp_flush_count = -1
|
||||
self.history.comp_cursor = -1
|
||||
self.history.db_version = max(self.DB_VERSIONS)
|
||||
|
||||
self.logger.info(f'history DB version: {self.history.db_version}')
|
||||
if self.history.db_version not in self.DB_VERSIONS:
|
||||
msg = f'this software only handles DB versions {self.DB_VERSIONS}'
|
||||
self.logger.error(msg)
|
||||
raise RuntimeError(msg)
|
||||
self.logger.info(f'flush count: {self.history.flush_count:,d}')
|
||||
|
||||
# self.history.clear_excess(self.utxo_flush_count)
|
||||
# < might happen at end of compaction as both DBs cannot be
|
||||
# updated atomically
|
||||
if self.history.flush_count > self.utxo_flush_count:
|
||||
self.logger.info('DB shut down uncleanly. Scanning for '
|
||||
'excess history flushes...')
|
||||
|
||||
keys = []
|
||||
for key, hist in self.db.iterator(prefix=HASHX_HISTORY_PREFIX):
|
||||
k = key[1:]
|
||||
flush_id, = unpack_be_uint16_from(k[-2:])
|
||||
if flush_id > self.utxo_flush_count:
|
||||
keys.append(k)
|
||||
|
||||
self.logger.info(f'deleting {len(keys):,d} history entries')
|
||||
|
||||
self.history.flush_count = self.utxo_flush_count
|
||||
with self.db.write_batch() as batch:
|
||||
for key in keys:
|
||||
batch.delete(HASHX_HISTORY_PREFIX + key)
|
||||
state = {
|
||||
'flush_count': self.history.flush_count,
|
||||
'comp_flush_count': self.history.comp_flush_count,
|
||||
'comp_cursor': self.history.comp_cursor,
|
||||
'db_version': self.history.db_version,
|
||||
}
|
||||
# History entries are not prefixed; the suffix \0\0 ensures we
|
||||
# look similar to other entries and aren't interfered with
|
||||
batch.put(HIST_STATE, repr(state).encode())
|
||||
|
||||
self.logger.info('deleted excess history entries')
|
||||
|
||||
self.utxo_flush_count = self.history.flush_count
|
||||
|
||||
min_height = self.min_undo_height(self.db_height)
|
||||
keys = []
|
||||
for key, hist in self.db.iterator(prefix=UNDO_PREFIX):
|
||||
height, = unpack('>I', key[-4:])
|
||||
if height >= min_height:
|
||||
break
|
||||
keys.append(key)
|
||||
|
||||
if keys:
|
||||
with self.db.write_batch() as batch:
|
||||
for key in keys:
|
||||
batch.delete(key)
|
||||
self.logger.info(f'deleted {len(keys):,d} stale undo entries')
|
||||
|
||||
# delete old block files
|
||||
prefix = self.raw_block_prefix()
|
||||
paths = [path for path in glob(f'{prefix}[0-9]*')
|
||||
if len(path) > len(prefix)
|
||||
and int(path[len(prefix):]) < min_height]
|
||||
if paths:
|
||||
for path in paths:
|
||||
try:
|
||||
os.remove(path)
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
self.logger.info(f'deleted {len(paths):,d} stale block files')
|
||||
|
||||
# Read TX counts (requires meta directory)
|
||||
await self._read_tx_counts()
|
||||
|
@ -185,7 +274,6 @@ class LevelDB:
|
|||
|
||||
def close(self):
|
||||
self.db.close()
|
||||
self.history.close_db()
|
||||
self.executor.shutdown(wait=True)
|
||||
self.executor = None
|
||||
|
||||
|
@ -240,7 +328,7 @@ class LevelDB:
|
|||
assert not flush_data.adds
|
||||
assert not flush_data.deletes
|
||||
assert not flush_data.undo_infos
|
||||
self.history.assert_flushed()
|
||||
assert not self.history.unflushed
|
||||
|
||||
def flush_utxo_db(self, batch, flush_data):
|
||||
"""Flush the cached DB writes and UTXO set to the batch."""
|
||||
|
@ -263,12 +351,13 @@ class LevelDB:
|
|||
# suffix = tx_idx + tx_num
|
||||
hashX = value[:-12]
|
||||
suffix = key[-2:] + value[-12:-8]
|
||||
batch_put(b'h' + key[:4] + suffix, hashX)
|
||||
batch_put(b'u' + hashX + suffix, value[-8:])
|
||||
batch_put(HASHX_UTXO_PREFIX + key[:4] + suffix, hashX)
|
||||
batch_put(UTXO_PREFIX + hashX + suffix, value[-8:])
|
||||
flush_data.adds.clear()
|
||||
|
||||
# New undo information
|
||||
self.flush_undo_infos(batch_put, flush_data.undo_infos)
|
||||
for undo_info, height in flush_data.undo_infos:
|
||||
batch_put(self.undo_key(height), b''.join(undo_info))
|
||||
flush_data.undo_infos.clear()
|
||||
|
||||
if self.db.for_sync:
|
||||
|
@ -285,6 +374,17 @@ class LevelDB:
|
|||
self.db_tx_count = flush_data.tx_count
|
||||
self.db_tip = flush_data.tip
|
||||
|
||||
def write_history_state(self, batch):
|
||||
state = {
|
||||
'flush_count': self.history.flush_count,
|
||||
'comp_flush_count': self.history.comp_flush_count,
|
||||
'comp_cursor': self.history.comp_cursor,
|
||||
'db_version': self.db_version,
|
||||
}
|
||||
# History entries are not prefixed; the suffix \0\0 ensures we
|
||||
# look similar to other entries and aren't interfered with
|
||||
batch.put(HIST_STATE, repr(state).encode())
|
||||
|
||||
def flush_dbs(self, flush_data, flush_utxos, estimate_txs_remaining):
|
||||
"""Flush out cached state. History is always flushed; UTXOs are
|
||||
flushed if flush_utxos."""
|
||||
|
@ -351,7 +451,7 @@ class LevelDB:
|
|||
for hashX in sorted(unflushed):
|
||||
key = hashX + flush_id
|
||||
batch_put(HASHX_HISTORY_PREFIX + key, unflushed[hashX].tobytes())
|
||||
self.history.write_state(batch)
|
||||
self.write_history_state(batch)
|
||||
|
||||
unflushed.clear()
|
||||
self.history.unflushed_count = 0
|
||||
|
@ -396,45 +496,74 @@ class LevelDB:
|
|||
self.logger.info(f'sync time: {formatted_time(self.wall_time)} '
|
||||
f'ETA: {formatted_time(eta)}')
|
||||
|
||||
def flush_state(self, batch):
|
||||
"""Flush chain state to the batch."""
|
||||
now = time.time()
|
||||
self.wall_time += now - self.last_flush
|
||||
self.last_flush = now
|
||||
self.last_flush_tx_count = self.fs_tx_count
|
||||
self.write_utxo_state(batch)
|
||||
# def flush_state(self, batch):
|
||||
# """Flush chain state to the batch."""
|
||||
# now = time.time()
|
||||
# self.wall_time += now - self.last_flush
|
||||
# self.last_flush = now
|
||||
# self.last_flush_tx_count = self.fs_tx_count
|
||||
# self.write_utxo_state(batch)
|
||||
|
||||
def flush_backup(self, flush_data, touched):
|
||||
"""Like flush_dbs() but when backing up. All UTXOs are flushed."""
|
||||
assert not flush_data.headers
|
||||
assert not flush_data.block_txs
|
||||
assert flush_data.height < self.db_height
|
||||
self.history.assert_flushed()
|
||||
assert not self.history.unflushed
|
||||
|
||||
start_time = time.time()
|
||||
tx_delta = flush_data.tx_count - self.last_flush_tx_count
|
||||
###
|
||||
while self.fs_height > flush_data.height:
|
||||
self.fs_height -= 1
|
||||
self.headers.pop()
|
||||
self.fs_tx_count = flush_data.tx_count
|
||||
# Truncate header_mc: header count is 1 more than the height.
|
||||
self.header_mc.truncate(flush_data.height + 1)
|
||||
|
||||
###
|
||||
# Not certain this is needed, but it doesn't hurt
|
||||
self.history.flush_count += 1
|
||||
nremoves = 0
|
||||
|
||||
self.backup_fs(flush_data.height, flush_data.tx_count)
|
||||
self.history.backup(touched, flush_data.tx_count)
|
||||
with self.db.write_batch() as batch:
|
||||
tx_count = flush_data.tx_count
|
||||
for hashX in sorted(touched):
|
||||
deletes = []
|
||||
puts = {}
|
||||
for key, hist in self.db.iterator(prefix=HASHX_HISTORY_PREFIX + hashX, reverse=True):
|
||||
k = key[1:]
|
||||
a = array.array('I')
|
||||
a.frombytes(hist)
|
||||
# Remove all history entries >= tx_count
|
||||
idx = bisect_left(a, tx_count)
|
||||
nremoves += len(a) - idx
|
||||
if idx > 0:
|
||||
puts[k] = a[:idx].tobytes()
|
||||
break
|
||||
deletes.append(k)
|
||||
|
||||
for key in deletes:
|
||||
batch.delete(key)
|
||||
for key, value in puts.items():
|
||||
batch.put(key, value)
|
||||
self.write_history_state(batch)
|
||||
|
||||
self.flush_utxo_db(batch, flush_data)
|
||||
# Flush state last as it reads the wall time.
|
||||
self.flush_state(batch)
|
||||
now = time.time()
|
||||
self.wall_time += now - self.last_flush
|
||||
self.last_flush = now
|
||||
self.last_flush_tx_count = self.fs_tx_count
|
||||
self.write_utxo_state(batch)
|
||||
|
||||
|
||||
self.logger.info(f'backing up removed {nremoves:,d} history entries')
|
||||
elapsed = self.last_flush - start_time
|
||||
self.logger.info(f'backup flush #{self.history.flush_count:,d} took '
|
||||
f'{elapsed:.1f}s. Height {flush_data.height:,d} '
|
||||
f'txs: {flush_data.tx_count:,d} ({tx_delta:+,d})')
|
||||
|
||||
def backup_fs(self, height, tx_count):
|
||||
"""Back up during a reorg. This just updates our pointers."""
|
||||
while self.fs_height > height:
|
||||
self.fs_height -= 1
|
||||
self.headers.pop()
|
||||
self.fs_tx_count = tx_count
|
||||
# Truncate header_mc: header count is 1 more than the height.
|
||||
self.header_mc.truncate(height + 1)
|
||||
|
||||
def raw_header(self, height):
|
||||
"""Return the binary header at the given height."""
|
||||
header, n = self.read_headers(height, 1)
|
||||
|
@ -555,7 +684,7 @@ class LevelDB:
|
|||
cnt = 0
|
||||
txs = []
|
||||
|
||||
for hist in self.history.db.iterator(prefix=HASHX_HISTORY_PREFIX + hashX, include_key=False):
|
||||
for hist in self.db.iterator(prefix=HASHX_HISTORY_PREFIX + hashX, include_key=False):
|
||||
a = array.array('I')
|
||||
a.frombytes(hist)
|
||||
for tx_num in a:
|
||||
|
@ -586,17 +715,12 @@ class LevelDB:
|
|||
|
||||
def undo_key(self, height):
|
||||
"""DB key for undo information at the given height."""
|
||||
return b'U' + pack('>I', height)
|
||||
return UNDO_PREFIX + pack('>I', height)
|
||||
|
||||
def read_undo_info(self, height):
|
||||
"""Read undo information from a file for the current height."""
|
||||
return self.db.get(self.undo_key(height))
|
||||
|
||||
def flush_undo_infos(self, batch_put, undo_infos):
|
||||
"""undo_infos is a list of (undo_info, height) pairs."""
|
||||
for undo_info, height in undo_infos:
|
||||
batch_put(self.undo_key(height), b''.join(undo_info))
|
||||
|
||||
def raw_block_prefix(self):
|
||||
return 'block'
|
||||
|
||||
|
@ -626,10 +750,9 @@ class LevelDB:
|
|||
|
||||
def clear_excess_undo_info(self):
|
||||
"""Clear excess undo info. Only most recent N are kept."""
|
||||
prefix = b'U'
|
||||
min_height = self.min_undo_height(self.db_height)
|
||||
keys = []
|
||||
for key, hist in self.db.iterator(prefix=prefix):
|
||||
for key, hist in self.db.iterator(prefix=UNDO_PREFIX):
|
||||
height, = unpack('>I', key[-4:])
|
||||
if height >= min_height:
|
||||
break
|
||||
|
@ -733,7 +856,7 @@ class LevelDB:
|
|||
fs_tx_hash = self.fs_tx_hash
|
||||
# Key: b'u' + address_hashX + tx_idx + tx_num
|
||||
# Value: the UTXO value as a 64-bit unsigned integer
|
||||
prefix = b'u' + hashX
|
||||
prefix = UTXO_PREFIX + hashX
|
||||
for db_key, db_value in self.db.iterator(prefix=prefix):
|
||||
tx_pos, tx_num = s_unpack('<HI', db_key[-6:])
|
||||
value, = unpack('<Q', db_value)
|
||||
|
@ -764,7 +887,7 @@ class LevelDB:
|
|||
|
||||
# Key: b'h' + compressed_tx_hash + tx_idx + tx_num
|
||||
# Value: hashX
|
||||
prefix = b'h' + tx_hash[:4] + idx_packed
|
||||
prefix = HASHX_UTXO_PREFIX + tx_hash[:4] + idx_packed
|
||||
|
||||
# Find which entry, if any, the TX_HASH matches.
|
||||
for db_key, hashX in self.db.iterator(prefix=prefix):
|
||||
|
@ -785,7 +908,7 @@ class LevelDB:
|
|||
return None
|
||||
# Key: b'u' + address_hashX + tx_idx + tx_num
|
||||
# Value: the UTXO value as a 64-bit unsigned integer
|
||||
key = b'u' + hashX + suffix
|
||||
key = UTXO_PREFIX + hashX + suffix
|
||||
db_value = self.db.get(key)
|
||||
if not db_value:
|
||||
# This can happen if the DB was updated between
|
||||
|
|
Loading…
Reference in a new issue