diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index de8ee6980..b456bac99 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -1,5 +1,6 @@ import time import asyncio +import typing from struct import pack, unpack from concurrent.futures.thread import ThreadPoolExecutor from typing import Optional, List, Tuple @@ -14,6 +15,8 @@ from lbry.wallet.server.util import chunks, class_logger from lbry.wallet.server.leveldb import FlushData from lbry.wallet.transaction import Transaction from lbry.wallet.server.udp import StatusServer +if typing.TYPE_CHECKING: + from lbry.wallet.server.leveldb import LevelDB class Prefetcher: @@ -155,7 +158,7 @@ class BlockProcessor: "reorg_count", "Number of reorgs", namespace=NAMESPACE ) - def __init__(self, env, db, daemon, notifications): + def __init__(self, env, db: 'LevelDB', daemon, notifications): self.env = env self.db = db self.daemon = daemon @@ -259,7 +262,6 @@ class BlockProcessor: else: self.logger.info(f'faking a reorg of {count:,d} blocks') - async def get_raw_blocks(last_height, hex_hashes): heights = range(last_height, last_height - len(hex_hashes), -1) try: @@ -277,7 +279,6 @@ class BlockProcessor: try: await self.flush(True) - start, last, hashes = await self.reorg_hashes(count) # Reverse and convert to hex strings. hashes = [hash_to_hex_str(hash) for hash in reversed(hashes)] @@ -364,8 +365,7 @@ class BlockProcessor: async def flush(self, flush_utxos): def flush(): - self.db.flush_dbs(self.flush_data(), flush_utxos, - self.estimate_txs_remaining) + self.db.flush_dbs(self.flush_data(), self.estimate_txs_remaining) await self.run_in_thread_with_lock(flush) async def _maybe_flush(self): @@ -384,7 +384,7 @@ class BlockProcessor: one_MB = 1000*1000 utxo_cache_size = len(self.utxo_cache) * 205 db_deletes_size = len(self.db_deletes) * 57 - hist_cache_size = len(self.db.history.unflushed) * 180 + self.db.history.unflushed_count * 4 + hist_cache_size = len(self.db.hist_unflushed) * 180 + self.db.hist_unflushed_count * 4 # Roughly ntxs * 32 + nblocks * 42 tx_hash_size = ((self.tx_count - self.db.fs_tx_count) * 32 + (self.height - self.db.fs_height) * 42) @@ -426,7 +426,7 @@ class BlockProcessor: self.headers.extend(headers) self.tip = self.coin.header_hash(headers[-1]) - self.db.flush_dbs(self.flush_data(), True, self.estimate_txs_remaining) + self.db.flush_dbs(self.flush_data(), self.estimate_txs_remaining) for cache in self.search_cache.values(): cache.clear() @@ -477,13 +477,13 @@ class BlockProcessor: # self.db.add_unflushed(hashXs_by_tx, self.tx_count) first_tx_num = self.tx_count - _unflushed = self.db.history.unflushed + _unflushed = self.db.hist_unflushed _count = 0 for _tx_num, _hashXs in enumerate(hashXs_by_tx, start=first_tx_num): for _hashX in set(_hashXs): _unflushed[_hashX].append(_tx_num) _count += len(_hashXs) - self.db.history.unflushed_count += _count + self.db.hist_unflushed_count += _count self.tx_count = tx_num self.db.tx_counts.append(tx_num) diff --git a/lbry/wallet/server/history.py b/lbry/wallet/server/history.py deleted file mode 100644 index 82f7ceb78..000000000 --- a/lbry/wallet/server/history.py +++ /dev/null @@ -1,304 +0,0 @@ -# Copyright (c) 2016-2018, Neil Booth -# Copyright (c) 2017, the ElectrumX authors -# -# All rights reserved. -# -# See the file "LICENCE" for information about the copyright -# and warranty status of this software. - -"""History by script hash (address).""" - -import array -import ast -import bisect -import time -from collections import defaultdict -from functools import partial - -from lbry.wallet.server import util -from lbry.wallet.server.util import pack_be_uint16, unpack_be_uint16_from -from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN - - -HASHX_HISTORY_PREFIX = b'x' -HIST_STATE = b'state-hist' - - -class History: - - DB_VERSIONS = [0] - - def __init__(self): - self.logger = util.class_logger(__name__, self.__class__.__name__) - # For history compaction - self.max_hist_row_entries = 12500 - self.unflushed = defaultdict(partial(array.array, 'I')) - self.unflushed_count = 0 - self.flush_count = 0 - self.comp_flush_count = -1 - self.comp_cursor = -1 - # self.db = None - - # def close_db(self): - # if self.db: - # # self.db.close() - # self.db = None - - # def read_state(self): - # state = self.db.get(HIST_STATE) - # if state: - # state = ast.literal_eval(state.decode()) - # if not isinstance(state, dict): - # raise RuntimeError('failed reading state from history DB') - # self.flush_count = state['flush_count'] - # self.comp_flush_count = state.get('comp_flush_count', -1) - # self.comp_cursor = state.get('comp_cursor', -1) - # self.db_version = state.get('db_version', 0) - # else: - # self.flush_count = 0 - # self.comp_flush_count = -1 - # self.comp_cursor = -1 - # self.db_version = max(self.DB_VERSIONS) - # - # self.logger.info(f'history DB version: {self.db_version}') - # if self.db_version not in self.DB_VERSIONS: - # msg = f'this software only handles DB versions {self.DB_VERSIONS}' - # self.logger.error(msg) - # raise RuntimeError(msg) - # self.logger.info(f'flush count: {self.flush_count:,d}') - - # def clear_excess(self, utxo_flush_count): - # # < might happen at end of compaction as both DBs cannot be - # # updated atomically - # if self.flush_count <= utxo_flush_count: - # return - # - # self.logger.info('DB shut down uncleanly. Scanning for ' - # 'excess history flushes...') - # - # keys = [] - # for key, hist in self.db.iterator(prefix=HASHX_HISTORY_PREFIX): - # k = key[1:] - # flush_id, = unpack_be_uint16_from(k[-2:]) - # if flush_id > utxo_flush_count: - # keys.append(k) - # - # self.logger.info(f'deleting {len(keys):,d} history entries') - # - # self.flush_count = utxo_flush_count - # with self.db.write_batch() as batch: - # for key in keys: - # batch.delete(HASHX_HISTORY_PREFIX + key) - # self.write_state(batch) - # - # self.logger.info('deleted excess history entries') - # - # def write_state(self, batch): - # """Write state to the history DB.""" - # state = { - # 'flush_count': self.flush_count, - # 'comp_flush_count': self.comp_flush_count, - # 'comp_cursor': self.comp_cursor, - # 'db_version': self.db_version, - # } - # # History entries are not prefixed; the suffix \0\0 ensures we - # # look similar to other entries and aren't interfered with - # batch.put(HIST_STATE, repr(state).encode()) - - # def add_unflushed(self, hashXs_by_tx, first_tx_num): - # unflushed = self.unflushed - # count = 0 - # for tx_num, hashXs in enumerate(hashXs_by_tx, start=first_tx_num): - # hashXs = set(hashXs) - # for hashX in hashXs: - # unflushed[hashX].append(tx_num) - # count += len(hashXs) - # self.unflushed_count += count - - # def unflushed_memsize(self): - # return len(self.unflushed) * 180 + self.unflushed_count * 4 - - # def assert_flushed(self): - # assert not self.unflushed - - # def backup(self, hashXs, tx_count): - # # Not certain this is needed, but it doesn't hurt - # self.flush_count += 1 - # nremoves = 0 - # bisect_left = bisect.bisect_left - # - # with self.db.write_batch() as batch: - # for hashX in sorted(hashXs): - # deletes = [] - # puts = {} - # for key, hist in self.db.iterator(prefix=HASHX_HISTORY_PREFIX + hashX, reverse=True): - # k = key[1:] - # a = array.array('I') - # a.frombytes(hist) - # # Remove all history entries >= tx_count - # idx = bisect_left(a, tx_count) - # nremoves += len(a) - idx - # if idx > 0: - # puts[k] = a[:idx].tobytes() - # break - # deletes.append(k) - # - # for key in deletes: - # batch.delete(key) - # for key, value in puts.items(): - # batch.put(key, value) - # self.write_state(batch) - # - # self.logger.info(f'backing up removed {nremoves:,d} history entries') - - # def get_txnums(self, hashX, limit=1000): - # """Generator that returns an unpruned, sorted list of tx_nums in the - # history of a hashX. Includes both spending and receiving - # transactions. By default yields at most 1000 entries. Set - # limit to None to get them all. """ - # limit = util.resolve_limit(limit) - # for key, hist in self.db.iterator(prefix=hashX): - # a = array.array('I') - # a.frombytes(hist) - # for tx_num in a: - # if limit == 0: - # return - # yield tx_num - # limit -= 1 - - # - # History compaction - # - - # comp_cursor is a cursor into compaction progress. - # -1: no compaction in progress - # 0-65535: Compaction in progress; all prefixes < comp_cursor have - # been compacted, and later ones have not. - # 65536: compaction complete in-memory but not flushed - # - # comp_flush_count applies during compaction, and is a flush count - # for history with prefix < comp_cursor. flush_count applies - # to still uncompacted history. It is -1 when no compaction is - # taking place. Key suffixes up to and including comp_flush_count - # are used, so a parallel history flush must first increment this - # - # When compaction is complete and the final flush takes place, - # flush_count is reset to comp_flush_count, and comp_flush_count to -1 - - # def _flush_compaction(self, cursor, write_items, keys_to_delete): - # """Flush a single compaction pass as a batch.""" - # # Update compaction state - # if cursor == 65536: - # self.flush_count = self.comp_flush_count - # self.comp_cursor = -1 - # self.comp_flush_count = -1 - # else: - # self.comp_cursor = cursor - # - # # History DB. Flush compacted history and updated state - # with self.db.write_batch() as batch: - # # Important: delete first! The keyspace may overlap. - # for key in keys_to_delete: - # batch.delete(HASHX_HISTORY_PREFIX + key) - # for key, value in write_items: - # batch.put(HASHX_HISTORY_PREFIX + key, value) - # self.write_state(batch) - - # def _compact_hashX(self, hashX, hist_map, hist_list, - # write_items, keys_to_delete): - # """Compress history for a hashX. hist_list is an ordered list of - # the histories to be compressed.""" - # # History entries (tx numbers) are 4 bytes each. Distribute - # # over rows of up to 50KB in size. A fixed row size means - # # future compactions will not need to update the first N - 1 - # # rows. - # max_row_size = self.max_hist_row_entries * 4 - # full_hist = b''.join(hist_list) - # nrows = (len(full_hist) + max_row_size - 1) // max_row_size - # if nrows > 4: - # self.logger.info('hashX {} is large: {:,d} entries across ' - # '{:,d} rows' - # .format(hash_to_hex_str(hashX), - # len(full_hist) // 4, nrows)) - # - # # Find what history needs to be written, and what keys need to - # # be deleted. Start by assuming all keys are to be deleted, - # # and then remove those that are the same on-disk as when - # # compacted. - # write_size = 0 - # keys_to_delete.update(hist_map) - # for n, chunk in enumerate(util.chunks(full_hist, max_row_size)): - # key = hashX + pack_be_uint16(n) - # if hist_map.get(key) == chunk: - # keys_to_delete.remove(key) - # else: - # write_items.append((key, chunk)) - # write_size += len(chunk) - # - # assert n + 1 == nrows - # self.comp_flush_count = max(self.comp_flush_count, n) - # - # return write_size - - # def _compact_prefix(self, prefix, write_items, keys_to_delete): - # """Compact all history entries for hashXs beginning with the - # given prefix. Update keys_to_delete and write.""" - # prior_hashX = None - # hist_map = {} - # hist_list = [] - # - # key_len = HASHX_LEN + 2 - # write_size = 0 - # for key, hist in self.db.iterator(prefix=HASHX_HISTORY_PREFIX + prefix): - # k = key[1:] - # # Ignore non-history entries - # if len(k) != key_len: - # continue - # hashX = k[:-2] - # if hashX != prior_hashX and prior_hashX: - # write_size += self._compact_hashX(prior_hashX, hist_map, - # hist_list, write_items, - # keys_to_delete) - # hist_map.clear() - # hist_list.clear() - # prior_hashX = hashX - # hist_map[k] = hist - # hist_list.append(hist) - # - # if prior_hashX: - # write_size += self._compact_hashX(prior_hashX, hist_map, hist_list, - # write_items, keys_to_delete) - # return write_size - - # def _compact_history(self, limit): - # """Inner loop of history compaction. Loops until limit bytes have - # been processed. - # """ - # fnord - # keys_to_delete = set() - # write_items = [] # A list of (key, value) pairs - # write_size = 0 - # - # # Loop over 2-byte prefixes - # cursor = self.comp_cursor - # while write_size < limit and cursor < 65536: - # prefix = pack_be_uint16(cursor) - # write_size += self._compact_prefix(prefix, write_items, - # keys_to_delete) - # cursor += 1 - # - # max_rows = self.comp_flush_count + 1 - # self._flush_compaction(cursor, write_items, keys_to_delete) - # - # self.logger.info('history compaction: wrote {:,d} rows ({:.1f} MB), ' - # 'removed {:,d} rows, largest: {:,d}, {:.1f}% complete' - # .format(len(write_items), write_size / 1000000, - # len(keys_to_delete), max_rows, - # 100 * cursor / 65536)) - # return write_size - - # def _cancel_compaction(self): - # if self.comp_cursor != -1: - # self.logger.warning('cancelling in-progress history compaction') - # self.comp_flush_count = -1 - # self.comp_cursor = -1 diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py index 9c3d26cd7..6dc30eaac 100644 --- a/lbry/wallet/server/leveldb.py +++ b/lbry/wallet/server/leveldb.py @@ -18,9 +18,10 @@ import time import zlib import typing from typing import Optional, List, Tuple, Iterable +from functools import partial from asyncio import sleep from bisect import bisect_right, bisect_left -from collections import namedtuple +from collections import namedtuple, defaultdict from glob import glob from struct import pack, unpack from concurrent.futures.thread import ThreadPoolExecutor @@ -31,7 +32,6 @@ from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN from lbry.wallet.server.merkle import Merkle, MerkleCache from lbry.wallet.server.util import formatted_time, pack_be_uint16, unpack_be_uint16_from from lbry.wallet.server.storage import db_class -from lbry.wallet.server.history import History UTXO = namedtuple("UTXO", "tx_num tx_pos tx_hash height value") @@ -73,6 +73,7 @@ class LevelDB: """ DB_VERSIONS = [6] + HIST_DB_VERSIONS = [0] class DBError(Exception): """Raised on general DB errors generally indicating corruption.""" @@ -86,8 +87,14 @@ class LevelDB: self.logger.info(f'switching current directory to {env.db_dir}') self.db_class = db_class(env.db_dir, self.env.db_engine) - self.history = History() self.db = None + + self.hist_unflushed = defaultdict(partial(array.array, 'I')) + self.hist_unflushed_count = 0 + self.hist_flush_count = 0 + self.hist_comp_flush_count = -1 + self.hist_comp_cursor = -1 + self.tx_counts = None self.headers = None self.encoded_headers = LRUCacheWithMetrics(1 << 21, metric_name='encoded_headers', namespace='wallet_server') @@ -188,27 +195,27 @@ class LevelDB: state = ast.literal_eval(state.decode()) if not isinstance(state, dict): raise RuntimeError('failed reading state from history DB') - self.history.flush_count = state['flush_count'] - self.history.comp_flush_count = state.get('comp_flush_count', -1) - self.history.comp_cursor = state.get('comp_cursor', -1) - self.history.db_version = state.get('db_version', 0) + self.hist_flush_count = state['flush_count'] + self.hist_comp_flush_count = state.get('comp_flush_count', -1) + self.hist_comp_cursor = state.get('comp_cursor', -1) + self.hist_db_version = state.get('db_version', 0) else: - self.history.flush_count = 0 - self.history.comp_flush_count = -1 - self.history.comp_cursor = -1 - self.history.db_version = max(self.DB_VERSIONS) + self.hist_flush_count = 0 + self.hist_comp_flush_count = -1 + self.hist_comp_cursor = -1 + self.hist_db_version = max(self.HIST_DB_VERSIONS) - self.logger.info(f'history DB version: {self.history.db_version}') - if self.history.db_version not in self.DB_VERSIONS: - msg = f'this software only handles DB versions {self.DB_VERSIONS}' + self.logger.info(f'history DB version: {self.hist_db_version}') + if self.hist_db_version not in self.HIST_DB_VERSIONS: + msg = f'this software only handles DB versions {self.HIST_DB_VERSIONS}' self.logger.error(msg) raise RuntimeError(msg) - self.logger.info(f'flush count: {self.history.flush_count:,d}') + self.logger.info(f'flush count: {self.hist_flush_count:,d}') # self.history.clear_excess(self.utxo_flush_count) # < might happen at end of compaction as both DBs cannot be # updated atomically - if self.history.flush_count > self.utxo_flush_count: + if self.hist_flush_count > self.utxo_flush_count: self.logger.info('DB shut down uncleanly. Scanning for ' 'excess history flushes...') @@ -221,15 +228,15 @@ class LevelDB: self.logger.info(f'deleting {len(keys):,d} history entries') - self.history.flush_count = self.utxo_flush_count + self.hist_flush_count = self.utxo_flush_count with self.db.write_batch() as batch: for key in keys: batch.delete(HASHX_HISTORY_PREFIX + key) state = { - 'flush_count': self.history.flush_count, - 'comp_flush_count': self.history.comp_flush_count, - 'comp_cursor': self.history.comp_cursor, - 'db_version': self.history.db_version, + 'flush_count': self.hist_flush_count, + 'comp_flush_count': self.hist_comp_flush_count, + 'comp_cursor': self.hist_comp_cursor, + 'db_version': self.hist_db_version, } # History entries are not prefixed; the suffix \0\0 ensures we # look similar to other entries and aren't interfered with @@ -237,7 +244,7 @@ class LevelDB: self.logger.info('deleted excess history entries') - self.utxo_flush_count = self.history.flush_count + self.utxo_flush_count = self.hist_flush_count min_height = self.min_undo_height(self.db_height) keys = [] @@ -328,7 +335,7 @@ class LevelDB: assert not flush_data.adds assert not flush_data.deletes assert not flush_data.undo_infos - assert not self.history.unflushed + assert not self.hist_unflushed def flush_utxo_db(self, batch, flush_data): """Flush the cached DB writes and UTXO set to the batch.""" @@ -369,23 +376,23 @@ class LevelDB: f'{spend_count:,d} spends in ' f'{elapsed:.1f}s, committing...') - self.utxo_flush_count = self.history.flush_count + self.utxo_flush_count = self.hist_flush_count self.db_height = flush_data.height self.db_tx_count = flush_data.tx_count self.db_tip = flush_data.tip def write_history_state(self, batch): state = { - 'flush_count': self.history.flush_count, - 'comp_flush_count': self.history.comp_flush_count, - 'comp_cursor': self.history.comp_cursor, + 'flush_count': self.hist_flush_count, + 'comp_flush_count': self.hist_comp_flush_count, + 'comp_cursor': self.hist_comp_cursor, 'db_version': self.db_version, } # History entries are not prefixed; the suffix \0\0 ensures we # look similar to other entries and aren't interfered with batch.put(HIST_STATE, repr(state).encode()) - def flush_dbs(self, flush_data, flush_utxos, estimate_txs_remaining): + def flush_dbs(self, flush_data: FlushData, estimate_txs_remaining): """Flush out cached state. History is always flushed; UTXOs are flushed if flush_utxos.""" if flush_data.height == self.db_height: @@ -444,9 +451,9 @@ class LevelDB: # Then history - self.history.flush_count += 1 - flush_id = pack_be_uint16(self.history.flush_count) - unflushed = self.history.unflushed + self.hist_flush_count += 1 + flush_id = pack_be_uint16(self.hist_flush_count) + unflushed = self.hist_unflushed for hashX in sorted(unflushed): key = hashX + flush_id @@ -454,14 +461,13 @@ class LevelDB: self.write_history_state(batch) unflushed.clear() - self.history.unflushed_count = 0 + self.hist_unflushed_count = 0 ######################### # Flush state last as it reads the wall time. - if flush_utxos: - self.flush_utxo_db(batch, flush_data) + self.flush_utxo_db(batch, flush_data) # self.flush_state(batch) # @@ -481,7 +487,7 @@ class LevelDB: # self.write_utxo_state(batch) elapsed = self.last_flush - start_time - self.logger.info(f'flush #{self.history.flush_count:,d} took ' + self.logger.info(f'flush #{self.hist_flush_count:,d} took ' f'{elapsed:.1f}s. Height {flush_data.height:,d} ' f'txs: {flush_data.tx_count:,d} ({tx_delta:+,d})') @@ -509,7 +515,7 @@ class LevelDB: assert not flush_data.headers assert not flush_data.block_txs assert flush_data.height < self.db_height - assert not self.history.unflushed + assert not self.hist_unflushed start_time = time.time() tx_delta = flush_data.tx_count - self.last_flush_tx_count @@ -523,7 +529,7 @@ class LevelDB: ### # Not certain this is needed, but it doesn't hurt - self.history.flush_count += 1 + self.hist_flush_count += 1 nremoves = 0 with self.db.write_batch() as batch: @@ -556,13 +562,10 @@ class LevelDB: self.last_flush = now self.last_flush_tx_count = self.fs_tx_count self.write_utxo_state(batch) - - self.logger.info(f'backing up removed {nremoves:,d} history entries') elapsed = self.last_flush - start_time - self.logger.info(f'backup flush #{self.history.flush_count:,d} took ' - f'{elapsed:.1f}s. Height {flush_data.height:,d} ' - f'txs: {flush_data.tx_count:,d} ({tx_delta:+,d})') + self.logger.info(f'backup flush #{self.hist_flush_count:,d} took {elapsed:.1f}s. ' + f'Height {flush_data.height:,d} txs: {flush_data.tx_count:,d} ({tx_delta:+,d})') def raw_header(self, height): """Return the binary header at the given height."""