From bc7fe680c067c4272bf46feca39865f1427af10a Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Mon, 11 Jan 2021 18:13:39 -0500 Subject: [PATCH] consolidate leveldb block advance/reorg -move methods from History to LevelDB --- lbry/wallet/server/block_processor.py | 13 +- lbry/wallet/server/history.py | 471 ++++++++++++-------------- lbry/wallet/server/leveldb.py | 217 +++++++++--- 3 files changed, 403 insertions(+), 298 deletions(-) diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index 5289cc8f6..de8ee6980 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -384,7 +384,7 @@ class BlockProcessor: one_MB = 1000*1000 utxo_cache_size = len(self.utxo_cache) * 205 db_deletes_size = len(self.db_deletes) * 57 - hist_cache_size = self.db.history.unflushed_memsize() + hist_cache_size = len(self.db.history.unflushed) * 180 + self.db.history.unflushed_count * 4 # Roughly ntxs * 32 + nblocks * 42 tx_hash_size = ((self.tx_count - self.db.fs_tx_count) * 32 + (self.height - self.db.fs_height) * 42) @@ -475,7 +475,16 @@ class BlockProcessor: self.db.total_transactions.append(tx_hash) tx_num += 1 - self.db.history.add_unflushed(hashXs_by_tx, self.tx_count) + # self.db.add_unflushed(hashXs_by_tx, self.tx_count) + first_tx_num = self.tx_count + _unflushed = self.db.history.unflushed + _count = 0 + for _tx_num, _hashXs in enumerate(hashXs_by_tx, start=first_tx_num): + for _hashX in set(_hashXs): + _unflushed[_hashX].append(_tx_num) + _count += len(_hashXs) + self.db.history.unflushed_count += _count + self.tx_count = tx_num self.db.tx_counts.append(tx_num) diff --git a/lbry/wallet/server/history.py b/lbry/wallet/server/history.py index 312eaed03..82f7ceb78 100644 --- a/lbry/wallet/server/history.py +++ b/lbry/wallet/server/history.py @@ -34,150 +34,122 @@ class History: self.max_hist_row_entries = 12500 self.unflushed = defaultdict(partial(array.array, 'I')) self.unflushed_count = 0 - self.db = None + self.flush_count = 0 + self.comp_flush_count = -1 + self.comp_cursor = -1 + # self.db = None - def open_db(self, db, for_sync, utxo_flush_count, compacting): - self.db = db #db_class('hist', for_sync) - self.read_state() - self.clear_excess(utxo_flush_count) - # An incomplete compaction needs to be cancelled otherwise - # restarting it will corrupt the history - if not compacting: - self._cancel_compaction() - return self.flush_count + # def close_db(self): + # if self.db: + # # self.db.close() + # self.db = None - def close_db(self): - if self.db: - # self.db.close() - self.db = None + # def read_state(self): + # state = self.db.get(HIST_STATE) + # if state: + # state = ast.literal_eval(state.decode()) + # if not isinstance(state, dict): + # raise RuntimeError('failed reading state from history DB') + # self.flush_count = state['flush_count'] + # self.comp_flush_count = state.get('comp_flush_count', -1) + # self.comp_cursor = state.get('comp_cursor', -1) + # self.db_version = state.get('db_version', 0) + # else: + # self.flush_count = 0 + # self.comp_flush_count = -1 + # self.comp_cursor = -1 + # self.db_version = max(self.DB_VERSIONS) + # + # self.logger.info(f'history DB version: {self.db_version}') + # if self.db_version not in self.DB_VERSIONS: + # msg = f'this software only handles DB versions {self.DB_VERSIONS}' + # self.logger.error(msg) + # raise RuntimeError(msg) + # self.logger.info(f'flush count: {self.flush_count:,d}') - def read_state(self): - state = self.db.get(HIST_STATE) - if state: - state = ast.literal_eval(state.decode()) - if not isinstance(state, dict): - raise RuntimeError('failed reading state from history DB') - self.flush_count = state['flush_count'] - self.comp_flush_count = state.get('comp_flush_count', -1) - self.comp_cursor = state.get('comp_cursor', -1) - self.db_version = state.get('db_version', 0) - else: - self.flush_count = 0 - self.comp_flush_count = -1 - self.comp_cursor = -1 - self.db_version = max(self.DB_VERSIONS) + # def clear_excess(self, utxo_flush_count): + # # < might happen at end of compaction as both DBs cannot be + # # updated atomically + # if self.flush_count <= utxo_flush_count: + # return + # + # self.logger.info('DB shut down uncleanly. Scanning for ' + # 'excess history flushes...') + # + # keys = [] + # for key, hist in self.db.iterator(prefix=HASHX_HISTORY_PREFIX): + # k = key[1:] + # flush_id, = unpack_be_uint16_from(k[-2:]) + # if flush_id > utxo_flush_count: + # keys.append(k) + # + # self.logger.info(f'deleting {len(keys):,d} history entries') + # + # self.flush_count = utxo_flush_count + # with self.db.write_batch() as batch: + # for key in keys: + # batch.delete(HASHX_HISTORY_PREFIX + key) + # self.write_state(batch) + # + # self.logger.info('deleted excess history entries') + # + # def write_state(self, batch): + # """Write state to the history DB.""" + # state = { + # 'flush_count': self.flush_count, + # 'comp_flush_count': self.comp_flush_count, + # 'comp_cursor': self.comp_cursor, + # 'db_version': self.db_version, + # } + # # History entries are not prefixed; the suffix \0\0 ensures we + # # look similar to other entries and aren't interfered with + # batch.put(HIST_STATE, repr(state).encode()) - self.logger.info(f'history DB version: {self.db_version}') - if self.db_version not in self.DB_VERSIONS: - msg = f'this software only handles DB versions {self.DB_VERSIONS}' - self.logger.error(msg) - raise RuntimeError(msg) - self.logger.info(f'flush count: {self.flush_count:,d}') + # def add_unflushed(self, hashXs_by_tx, first_tx_num): + # unflushed = self.unflushed + # count = 0 + # for tx_num, hashXs in enumerate(hashXs_by_tx, start=first_tx_num): + # hashXs = set(hashXs) + # for hashX in hashXs: + # unflushed[hashX].append(tx_num) + # count += len(hashXs) + # self.unflushed_count += count - def clear_excess(self, utxo_flush_count): - # < might happen at end of compaction as both DBs cannot be - # updated atomically - if self.flush_count <= utxo_flush_count: - return + # def unflushed_memsize(self): + # return len(self.unflushed) * 180 + self.unflushed_count * 4 - self.logger.info('DB shut down uncleanly. Scanning for ' - 'excess history flushes...') + # def assert_flushed(self): + # assert not self.unflushed - keys = [] - for key, hist in self.db.iterator(prefix=HASHX_HISTORY_PREFIX): - k = key[1:] - flush_id, = unpack_be_uint16_from(k[-2:]) - if flush_id > utxo_flush_count: - keys.append(k) - - self.logger.info(f'deleting {len(keys):,d} history entries') - - self.flush_count = utxo_flush_count - with self.db.write_batch() as batch: - for key in keys: - batch.delete(HASHX_HISTORY_PREFIX + key) - self.write_state(batch) - - self.logger.info('deleted excess history entries') - - def write_state(self, batch): - """Write state to the history DB.""" - state = { - 'flush_count': self.flush_count, - 'comp_flush_count': self.comp_flush_count, - 'comp_cursor': self.comp_cursor, - 'db_version': self.db_version, - } - # History entries are not prefixed; the suffix \0\0 ensures we - # look similar to other entries and aren't interfered with - batch.put(HIST_STATE, repr(state).encode()) - - def add_unflushed(self, hashXs_by_tx, first_tx_num): - unflushed = self.unflushed - count = 0 - for tx_num, hashXs in enumerate(hashXs_by_tx, start=first_tx_num): - hashXs = set(hashXs) - for hashX in hashXs: - unflushed[hashX].append(tx_num) - count += len(hashXs) - self.unflushed_count += count - - def unflushed_memsize(self): - return len(self.unflushed) * 180 + self.unflushed_count * 4 - - def assert_flushed(self): - assert not self.unflushed - - def flush(self): - start_time = time.time() - self.flush_count += 1 - flush_id = pack_be_uint16(self.flush_count) - unflushed = self.unflushed - - with self.db.write_batch() as batch: - for hashX in sorted(unflushed): - key = hashX + flush_id - batch.put(HASHX_HISTORY_PREFIX + key, unflushed[hashX].tobytes()) - self.write_state(batch) - - count = len(unflushed) - unflushed.clear() - self.unflushed_count = 0 - - if self.db.for_sync: - elapsed = time.time() - start_time - self.logger.info(f'flushed history in {elapsed:.1f}s ' - f'for {count:,d} addrs') - - def backup(self, hashXs, tx_count): - # Not certain this is needed, but it doesn't hurt - self.flush_count += 1 - nremoves = 0 - bisect_left = bisect.bisect_left - - with self.db.write_batch() as batch: - for hashX in sorted(hashXs): - deletes = [] - puts = {} - for key, hist in self.db.iterator(prefix=HASHX_HISTORY_PREFIX + hashX, reverse=True): - k = key[1:] - a = array.array('I') - a.frombytes(hist) - # Remove all history entries >= tx_count - idx = bisect_left(a, tx_count) - nremoves += len(a) - idx - if idx > 0: - puts[k] = a[:idx].tobytes() - break - deletes.append(k) - - for key in deletes: - batch.delete(key) - for key, value in puts.items(): - batch.put(key, value) - self.write_state(batch) - - self.logger.info(f'backing up removed {nremoves:,d} history entries') + # def backup(self, hashXs, tx_count): + # # Not certain this is needed, but it doesn't hurt + # self.flush_count += 1 + # nremoves = 0 + # bisect_left = bisect.bisect_left + # + # with self.db.write_batch() as batch: + # for hashX in sorted(hashXs): + # deletes = [] + # puts = {} + # for key, hist in self.db.iterator(prefix=HASHX_HISTORY_PREFIX + hashX, reverse=True): + # k = key[1:] + # a = array.array('I') + # a.frombytes(hist) + # # Remove all history entries >= tx_count + # idx = bisect_left(a, tx_count) + # nremoves += len(a) - idx + # if idx > 0: + # puts[k] = a[:idx].tobytes() + # break + # deletes.append(k) + # + # for key in deletes: + # batch.delete(key) + # for key, value in puts.items(): + # batch.put(key, value) + # self.write_state(batch) + # + # self.logger.info(f'backing up removed {nremoves:,d} history entries') # def get_txnums(self, hashX, limit=1000): # """Generator that returns an unpruned, sorted list of tx_nums in the @@ -213,119 +185,120 @@ class History: # When compaction is complete and the final flush takes place, # flush_count is reset to comp_flush_count, and comp_flush_count to -1 - def _flush_compaction(self, cursor, write_items, keys_to_delete): - """Flush a single compaction pass as a batch.""" - # Update compaction state - if cursor == 65536: - self.flush_count = self.comp_flush_count - self.comp_cursor = -1 - self.comp_flush_count = -1 - else: - self.comp_cursor = cursor + # def _flush_compaction(self, cursor, write_items, keys_to_delete): + # """Flush a single compaction pass as a batch.""" + # # Update compaction state + # if cursor == 65536: + # self.flush_count = self.comp_flush_count + # self.comp_cursor = -1 + # self.comp_flush_count = -1 + # else: + # self.comp_cursor = cursor + # + # # History DB. Flush compacted history and updated state + # with self.db.write_batch() as batch: + # # Important: delete first! The keyspace may overlap. + # for key in keys_to_delete: + # batch.delete(HASHX_HISTORY_PREFIX + key) + # for key, value in write_items: + # batch.put(HASHX_HISTORY_PREFIX + key, value) + # self.write_state(batch) - # History DB. Flush compacted history and updated state - with self.db.write_batch() as batch: - # Important: delete first! The keyspace may overlap. - for key in keys_to_delete: - batch.delete(HASHX_HISTORY_PREFIX + key) - for key, value in write_items: - batch.put(HASHX_HISTORY_PREFIX + key, value) - self.write_state(batch) + # def _compact_hashX(self, hashX, hist_map, hist_list, + # write_items, keys_to_delete): + # """Compress history for a hashX. hist_list is an ordered list of + # the histories to be compressed.""" + # # History entries (tx numbers) are 4 bytes each. Distribute + # # over rows of up to 50KB in size. A fixed row size means + # # future compactions will not need to update the first N - 1 + # # rows. + # max_row_size = self.max_hist_row_entries * 4 + # full_hist = b''.join(hist_list) + # nrows = (len(full_hist) + max_row_size - 1) // max_row_size + # if nrows > 4: + # self.logger.info('hashX {} is large: {:,d} entries across ' + # '{:,d} rows' + # .format(hash_to_hex_str(hashX), + # len(full_hist) // 4, nrows)) + # + # # Find what history needs to be written, and what keys need to + # # be deleted. Start by assuming all keys are to be deleted, + # # and then remove those that are the same on-disk as when + # # compacted. + # write_size = 0 + # keys_to_delete.update(hist_map) + # for n, chunk in enumerate(util.chunks(full_hist, max_row_size)): + # key = hashX + pack_be_uint16(n) + # if hist_map.get(key) == chunk: + # keys_to_delete.remove(key) + # else: + # write_items.append((key, chunk)) + # write_size += len(chunk) + # + # assert n + 1 == nrows + # self.comp_flush_count = max(self.comp_flush_count, n) + # + # return write_size - def _compact_hashX(self, hashX, hist_map, hist_list, - write_items, keys_to_delete): - """Compress history for a hashX. hist_list is an ordered list of - the histories to be compressed.""" - # History entries (tx numbers) are 4 bytes each. Distribute - # over rows of up to 50KB in size. A fixed row size means - # future compactions will not need to update the first N - 1 - # rows. - max_row_size = self.max_hist_row_entries * 4 - full_hist = b''.join(hist_list) - nrows = (len(full_hist) + max_row_size - 1) // max_row_size - if nrows > 4: - self.logger.info('hashX {} is large: {:,d} entries across ' - '{:,d} rows' - .format(hash_to_hex_str(hashX), - len(full_hist) // 4, nrows)) + # def _compact_prefix(self, prefix, write_items, keys_to_delete): + # """Compact all history entries for hashXs beginning with the + # given prefix. Update keys_to_delete and write.""" + # prior_hashX = None + # hist_map = {} + # hist_list = [] + # + # key_len = HASHX_LEN + 2 + # write_size = 0 + # for key, hist in self.db.iterator(prefix=HASHX_HISTORY_PREFIX + prefix): + # k = key[1:] + # # Ignore non-history entries + # if len(k) != key_len: + # continue + # hashX = k[:-2] + # if hashX != prior_hashX and prior_hashX: + # write_size += self._compact_hashX(prior_hashX, hist_map, + # hist_list, write_items, + # keys_to_delete) + # hist_map.clear() + # hist_list.clear() + # prior_hashX = hashX + # hist_map[k] = hist + # hist_list.append(hist) + # + # if prior_hashX: + # write_size += self._compact_hashX(prior_hashX, hist_map, hist_list, + # write_items, keys_to_delete) + # return write_size - # Find what history needs to be written, and what keys need to - # be deleted. Start by assuming all keys are to be deleted, - # and then remove those that are the same on-disk as when - # compacted. - write_size = 0 - keys_to_delete.update(hist_map) - for n, chunk in enumerate(util.chunks(full_hist, max_row_size)): - key = hashX + pack_be_uint16(n) - if hist_map.get(key) == chunk: - keys_to_delete.remove(key) - else: - write_items.append((key, chunk)) - write_size += len(chunk) + # def _compact_history(self, limit): + # """Inner loop of history compaction. Loops until limit bytes have + # been processed. + # """ + # fnord + # keys_to_delete = set() + # write_items = [] # A list of (key, value) pairs + # write_size = 0 + # + # # Loop over 2-byte prefixes + # cursor = self.comp_cursor + # while write_size < limit and cursor < 65536: + # prefix = pack_be_uint16(cursor) + # write_size += self._compact_prefix(prefix, write_items, + # keys_to_delete) + # cursor += 1 + # + # max_rows = self.comp_flush_count + 1 + # self._flush_compaction(cursor, write_items, keys_to_delete) + # + # self.logger.info('history compaction: wrote {:,d} rows ({:.1f} MB), ' + # 'removed {:,d} rows, largest: {:,d}, {:.1f}% complete' + # .format(len(write_items), write_size / 1000000, + # len(keys_to_delete), max_rows, + # 100 * cursor / 65536)) + # return write_size - assert n + 1 == nrows - self.comp_flush_count = max(self.comp_flush_count, n) - - return write_size - - def _compact_prefix(self, prefix, write_items, keys_to_delete): - """Compact all history entries for hashXs beginning with the - given prefix. Update keys_to_delete and write.""" - prior_hashX = None - hist_map = {} - hist_list = [] - - key_len = HASHX_LEN + 2 - write_size = 0 - for key, hist in self.db.iterator(prefix=HASHX_HISTORY_PREFIX + prefix): - k = key[1:] - # Ignore non-history entries - if len(k) != key_len: - continue - hashX = k[:-2] - if hashX != prior_hashX and prior_hashX: - write_size += self._compact_hashX(prior_hashX, hist_map, - hist_list, write_items, - keys_to_delete) - hist_map.clear() - hist_list.clear() - prior_hashX = hashX - hist_map[k] = hist - hist_list.append(hist) - - if prior_hashX: - write_size += self._compact_hashX(prior_hashX, hist_map, hist_list, - write_items, keys_to_delete) - return write_size - - def _compact_history(self, limit): - """Inner loop of history compaction. Loops until limit bytes have - been processed. - """ - keys_to_delete = set() - write_items = [] # A list of (key, value) pairs - write_size = 0 - - # Loop over 2-byte prefixes - cursor = self.comp_cursor - while write_size < limit and cursor < 65536: - prefix = pack_be_uint16(cursor) - write_size += self._compact_prefix(prefix, write_items, - keys_to_delete) - cursor += 1 - - max_rows = self.comp_flush_count + 1 - self._flush_compaction(cursor, write_items, keys_to_delete) - - self.logger.info('history compaction: wrote {:,d} rows ({:.1f} MB), ' - 'removed {:,d} rows, largest: {:,d}, {:.1f}% complete' - .format(len(write_items), write_size / 1000000, - len(keys_to_delete), max_rows, - 100 * cursor / 65536)) - return write_size - - def _cancel_compaction(self): - if self.comp_cursor != -1: - self.logger.warning('cancelling in-progress history compaction') - self.comp_flush_count = -1 - self.comp_cursor = -1 + # def _cancel_compaction(self): + # if self.comp_cursor != -1: + # self.logger.warning('cancelling in-progress history compaction') + # self.comp_flush_count = -1 + # self.comp_cursor = -1 diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py index e3104de4c..9c3d26cd7 100644 --- a/lbry/wallet/server/leveldb.py +++ b/lbry/wallet/server/leveldb.py @@ -19,7 +19,7 @@ import zlib import typing from typing import Optional, List, Tuple, Iterable from asyncio import sleep -from bisect import bisect_right +from bisect import bisect_right, bisect_left from collections import namedtuple from glob import glob from struct import pack, unpack @@ -29,7 +29,7 @@ from lbry.utils import LRUCacheWithMetrics from lbry.wallet.server import util from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN from lbry.wallet.server.merkle import Merkle, MerkleCache -from lbry.wallet.server.util import formatted_time, pack_be_uint16 +from lbry.wallet.server.util import formatted_time, pack_be_uint16, unpack_be_uint16_from from lbry.wallet.server.storage import db_class from lbry.wallet.server.history import History @@ -41,6 +41,7 @@ BLOCK_HASH_PREFIX = b'C' HEADER_PREFIX = b'H' TX_NUM_PREFIX = b'N' TX_COUNT_PREFIX = b'T' +UNDO_PREFIX = b'U' TX_HASH_PREFIX = b'X' HASHX_UTXO_PREFIX = b'h' @@ -50,9 +51,6 @@ UTXO_PREFIX = b'u' HASHX_HISTORY_PREFIX = b'x' - - - @attr.s(slots=True) class FlushData: height = attr.ib() @@ -107,6 +105,19 @@ class LevelDB: self._tx_and_merkle_cache = LRUCacheWithMetrics(2 ** 17, metric_name='tx_and_merkle', namespace="wallet_server") self.total_transactions = None + # def add_unflushed(self, hashXs_by_tx, first_tx_num): + # unflushed = self.history.unflushed + # count = 0 + # for tx_num, hashXs in enumerate(hashXs_by_tx, start=first_tx_num): + # hashXs = set(hashXs) + # for hashX in hashXs: + # unflushed[hashX].append(tx_num) + # count += len(hashXs) + # self.history.unflushed_count += count + + # def unflushed_memsize(self): + # return len(self.history.unflushed) * 180 + self.history.unflushed_count * 4 + async def _read_tx_counts(self): if self.tx_counts is not None: return @@ -172,10 +183,88 @@ class LevelDB: self.read_utxo_state() # Then history DB - self.utxo_flush_count = self.history.open_db( - self.db, for_sync, self.utxo_flush_count, compacting - ) - self.clear_excess_undo_info() + state = self.db.get(HIST_STATE) + if state: + state = ast.literal_eval(state.decode()) + if not isinstance(state, dict): + raise RuntimeError('failed reading state from history DB') + self.history.flush_count = state['flush_count'] + self.history.comp_flush_count = state.get('comp_flush_count', -1) + self.history.comp_cursor = state.get('comp_cursor', -1) + self.history.db_version = state.get('db_version', 0) + else: + self.history.flush_count = 0 + self.history.comp_flush_count = -1 + self.history.comp_cursor = -1 + self.history.db_version = max(self.DB_VERSIONS) + + self.logger.info(f'history DB version: {self.history.db_version}') + if self.history.db_version not in self.DB_VERSIONS: + msg = f'this software only handles DB versions {self.DB_VERSIONS}' + self.logger.error(msg) + raise RuntimeError(msg) + self.logger.info(f'flush count: {self.history.flush_count:,d}') + + # self.history.clear_excess(self.utxo_flush_count) + # < might happen at end of compaction as both DBs cannot be + # updated atomically + if self.history.flush_count > self.utxo_flush_count: + self.logger.info('DB shut down uncleanly. Scanning for ' + 'excess history flushes...') + + keys = [] + for key, hist in self.db.iterator(prefix=HASHX_HISTORY_PREFIX): + k = key[1:] + flush_id, = unpack_be_uint16_from(k[-2:]) + if flush_id > self.utxo_flush_count: + keys.append(k) + + self.logger.info(f'deleting {len(keys):,d} history entries') + + self.history.flush_count = self.utxo_flush_count + with self.db.write_batch() as batch: + for key in keys: + batch.delete(HASHX_HISTORY_PREFIX + key) + state = { + 'flush_count': self.history.flush_count, + 'comp_flush_count': self.history.comp_flush_count, + 'comp_cursor': self.history.comp_cursor, + 'db_version': self.history.db_version, + } + # History entries are not prefixed; the suffix \0\0 ensures we + # look similar to other entries and aren't interfered with + batch.put(HIST_STATE, repr(state).encode()) + + self.logger.info('deleted excess history entries') + + self.utxo_flush_count = self.history.flush_count + + min_height = self.min_undo_height(self.db_height) + keys = [] + for key, hist in self.db.iterator(prefix=UNDO_PREFIX): + height, = unpack('>I', key[-4:]) + if height >= min_height: + break + keys.append(key) + + if keys: + with self.db.write_batch() as batch: + for key in keys: + batch.delete(key) + self.logger.info(f'deleted {len(keys):,d} stale undo entries') + + # delete old block files + prefix = self.raw_block_prefix() + paths = [path for path in glob(f'{prefix}[0-9]*') + if len(path) > len(prefix) + and int(path[len(prefix):]) < min_height] + if paths: + for path in paths: + try: + os.remove(path) + except FileNotFoundError: + pass + self.logger.info(f'deleted {len(paths):,d} stale block files') # Read TX counts (requires meta directory) await self._read_tx_counts() @@ -185,7 +274,6 @@ class LevelDB: def close(self): self.db.close() - self.history.close_db() self.executor.shutdown(wait=True) self.executor = None @@ -240,7 +328,7 @@ class LevelDB: assert not flush_data.adds assert not flush_data.deletes assert not flush_data.undo_infos - self.history.assert_flushed() + assert not self.history.unflushed def flush_utxo_db(self, batch, flush_data): """Flush the cached DB writes and UTXO set to the batch.""" @@ -263,12 +351,13 @@ class LevelDB: # suffix = tx_idx + tx_num hashX = value[:-12] suffix = key[-2:] + value[-12:-8] - batch_put(b'h' + key[:4] + suffix, hashX) - batch_put(b'u' + hashX + suffix, value[-8:]) + batch_put(HASHX_UTXO_PREFIX + key[:4] + suffix, hashX) + batch_put(UTXO_PREFIX + hashX + suffix, value[-8:]) flush_data.adds.clear() # New undo information - self.flush_undo_infos(batch_put, flush_data.undo_infos) + for undo_info, height in flush_data.undo_infos: + batch_put(self.undo_key(height), b''.join(undo_info)) flush_data.undo_infos.clear() if self.db.for_sync: @@ -285,6 +374,17 @@ class LevelDB: self.db_tx_count = flush_data.tx_count self.db_tip = flush_data.tip + def write_history_state(self, batch): + state = { + 'flush_count': self.history.flush_count, + 'comp_flush_count': self.history.comp_flush_count, + 'comp_cursor': self.history.comp_cursor, + 'db_version': self.db_version, + } + # History entries are not prefixed; the suffix \0\0 ensures we + # look similar to other entries and aren't interfered with + batch.put(HIST_STATE, repr(state).encode()) + def flush_dbs(self, flush_data, flush_utxos, estimate_txs_remaining): """Flush out cached state. History is always flushed; UTXOs are flushed if flush_utxos.""" @@ -351,7 +451,7 @@ class LevelDB: for hashX in sorted(unflushed): key = hashX + flush_id batch_put(HASHX_HISTORY_PREFIX + key, unflushed[hashX].tobytes()) - self.history.write_state(batch) + self.write_history_state(batch) unflushed.clear() self.history.unflushed_count = 0 @@ -396,45 +496,74 @@ class LevelDB: self.logger.info(f'sync time: {formatted_time(self.wall_time)} ' f'ETA: {formatted_time(eta)}') - def flush_state(self, batch): - """Flush chain state to the batch.""" - now = time.time() - self.wall_time += now - self.last_flush - self.last_flush = now - self.last_flush_tx_count = self.fs_tx_count - self.write_utxo_state(batch) + # def flush_state(self, batch): + # """Flush chain state to the batch.""" + # now = time.time() + # self.wall_time += now - self.last_flush + # self.last_flush = now + # self.last_flush_tx_count = self.fs_tx_count + # self.write_utxo_state(batch) def flush_backup(self, flush_data, touched): """Like flush_dbs() but when backing up. All UTXOs are flushed.""" assert not flush_data.headers assert not flush_data.block_txs assert flush_data.height < self.db_height - self.history.assert_flushed() + assert not self.history.unflushed start_time = time.time() tx_delta = flush_data.tx_count - self.last_flush_tx_count + ### + while self.fs_height > flush_data.height: + self.fs_height -= 1 + self.headers.pop() + self.fs_tx_count = flush_data.tx_count + # Truncate header_mc: header count is 1 more than the height. + self.header_mc.truncate(flush_data.height + 1) + + ### + # Not certain this is needed, but it doesn't hurt + self.history.flush_count += 1 + nremoves = 0 - self.backup_fs(flush_data.height, flush_data.tx_count) - self.history.backup(touched, flush_data.tx_count) with self.db.write_batch() as batch: + tx_count = flush_data.tx_count + for hashX in sorted(touched): + deletes = [] + puts = {} + for key, hist in self.db.iterator(prefix=HASHX_HISTORY_PREFIX + hashX, reverse=True): + k = key[1:] + a = array.array('I') + a.frombytes(hist) + # Remove all history entries >= tx_count + idx = bisect_left(a, tx_count) + nremoves += len(a) - idx + if idx > 0: + puts[k] = a[:idx].tobytes() + break + deletes.append(k) + + for key in deletes: + batch.delete(key) + for key, value in puts.items(): + batch.put(key, value) + self.write_history_state(batch) + self.flush_utxo_db(batch, flush_data) # Flush state last as it reads the wall time. - self.flush_state(batch) + now = time.time() + self.wall_time += now - self.last_flush + self.last_flush = now + self.last_flush_tx_count = self.fs_tx_count + self.write_utxo_state(batch) + + self.logger.info(f'backing up removed {nremoves:,d} history entries') elapsed = self.last_flush - start_time self.logger.info(f'backup flush #{self.history.flush_count:,d} took ' f'{elapsed:.1f}s. Height {flush_data.height:,d} ' f'txs: {flush_data.tx_count:,d} ({tx_delta:+,d})') - def backup_fs(self, height, tx_count): - """Back up during a reorg. This just updates our pointers.""" - while self.fs_height > height: - self.fs_height -= 1 - self.headers.pop() - self.fs_tx_count = tx_count - # Truncate header_mc: header count is 1 more than the height. - self.header_mc.truncate(height + 1) - def raw_header(self, height): """Return the binary header at the given height.""" header, n = self.read_headers(height, 1) @@ -555,7 +684,7 @@ class LevelDB: cnt = 0 txs = [] - for hist in self.history.db.iterator(prefix=HASHX_HISTORY_PREFIX + hashX, include_key=False): + for hist in self.db.iterator(prefix=HASHX_HISTORY_PREFIX + hashX, include_key=False): a = array.array('I') a.frombytes(hist) for tx_num in a: @@ -586,17 +715,12 @@ class LevelDB: def undo_key(self, height): """DB key for undo information at the given height.""" - return b'U' + pack('>I', height) + return UNDO_PREFIX + pack('>I', height) def read_undo_info(self, height): """Read undo information from a file for the current height.""" return self.db.get(self.undo_key(height)) - def flush_undo_infos(self, batch_put, undo_infos): - """undo_infos is a list of (undo_info, height) pairs.""" - for undo_info, height in undo_infos: - batch_put(self.undo_key(height), b''.join(undo_info)) - def raw_block_prefix(self): return 'block' @@ -626,10 +750,9 @@ class LevelDB: def clear_excess_undo_info(self): """Clear excess undo info. Only most recent N are kept.""" - prefix = b'U' min_height = self.min_undo_height(self.db_height) keys = [] - for key, hist in self.db.iterator(prefix=prefix): + for key, hist in self.db.iterator(prefix=UNDO_PREFIX): height, = unpack('>I', key[-4:]) if height >= min_height: break @@ -733,7 +856,7 @@ class LevelDB: fs_tx_hash = self.fs_tx_hash # Key: b'u' + address_hashX + tx_idx + tx_num # Value: the UTXO value as a 64-bit unsigned integer - prefix = b'u' + hashX + prefix = UTXO_PREFIX + hashX for db_key, db_value in self.db.iterator(prefix=prefix): tx_pos, tx_num = s_unpack('