diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index e44f2bd0b..6615e7b0a 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -1,12 +1,14 @@ import time import asyncio import typing +import struct from bisect import bisect_right from struct import pack, unpack from concurrent.futures.thread import ThreadPoolExecutor from typing import Optional, List, Tuple, Set, DefaultDict, Dict from prometheus_client import Gauge, Histogram from collections import defaultdict +import array import lbry from lbry.schema.claim import Claim from lbry.schema.mime_types import guess_stream_type @@ -195,21 +197,18 @@ class BlockProcessor: # Meta self.next_cache_check = 0 self.touched = set() - self.reorg_count = 0 # Caches of unflushed items. - self.headers = [] self.block_hashes = [] self.block_txs = [] self.undo_infos = [] # UTXO cache - self.utxo_cache = {} + self.utxo_cache: Dict[Tuple[bytes, int], bytes] = {} self.db_deletes = [] # Claimtrie cache self.db_op_stack: Optional[RevertableOpStack] = None - self.undo_claims = [] # If the lock is successfully acquired, in-memory chain state # is consistent with self.height @@ -263,6 +262,7 @@ class BlockProcessor: self.doesnt_have_valid_signature: Set[bytes] = set() self.claim_channels: Dict[bytes, bytes] = {} + self.hashXs_by_tx: DefaultDict[bytes, List[int]] = defaultdict(list) def claim_producer(self): if self.db.db_height <= 1: @@ -295,6 +295,7 @@ class BlockProcessor: """Process the list of raw blocks passed. Detects and handles reorgs. """ + if not raw_blocks: return first = self.height + 1 @@ -305,7 +306,7 @@ class BlockProcessor: chain = [self.tip] + [self.coin.header_hash(h) for h in headers[:-1]] if hprevs == chain: - start = time.perf_counter() + total_start = time.perf_counter() try: for block in blocks: start = time.perf_counter() @@ -323,14 +324,7 @@ class BlockProcessor: except: self.logger.exception("advance blocks failed") raise - # if self.sql: - - # for cache in self.search_cache.values(): - # cache.clear() - self.history_cache.clear() # TODO: is this needed? 
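# Illustrative sketch (not part of the patch): the timing rename above. The
# outer timer becomes total_start so the per-block `start` set inside the loop
# no longer clobbers it; this standalone version uses a stand-in advance().
import time

def advance(block) -> None:
    """Stand-in for the per-block work done by advance_block."""

def process_blocks(blocks) -> float:
    total_start = time.perf_counter()
    for block in blocks:
        start = time.perf_counter()              # per-block timer
        advance(block)
        print(f"block took {time.perf_counter() - start:.3f}s")
    return time.perf_counter() - total_start     # whole-batch time, not just the last block

assert process_blocks([1, 2, 3]) >= 0.0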
- self.notifications.notified_mempool_txs.clear() - - processed_time = time.perf_counter() - start + processed_time = time.perf_counter() - total_start self.block_count_metric.set(self.height) self.block_update_time_metric.observe(processed_time) self.status_server.set_height(self.db.fs_height, self.db.db_tip) @@ -338,13 +332,32 @@ class BlockProcessor: s = '' if len(blocks) == 1 else 's' self.logger.info('processed {:,d} block{} in {:.1f}s'.format(len(blocks), s, processed_time)) if self._caught_up_event.is_set(): - # if self.sql: - # await self.db.search_index.apply_filters(self.sql.blocked_streams, self.sql.blocked_channels, - # self.sql.filtered_streams, self.sql.filtered_channels) await self.notifications.on_block(self.touched, self.height) self.touched = set() elif hprevs[0] != chain[0]: - await self.reorg_chain() + min_start_height = max(self.height - self.coin.REORG_LIMIT, 0) + count = 1 + block_hashes_from_lbrycrd = await self.daemon.block_hex_hashes( + min_start_height, self.coin.REORG_LIMIT + ) + for height, block_hash in zip( + reversed(range(min_start_height, min_start_height + self.coin.REORG_LIMIT)), + reversed(block_hashes_from_lbrycrd)): + if self.block_hashes[height][::-1].hex() == block_hash: + break + count += 1 + self.logger.warning(f"blockchain reorg detected at {self.height}, unwinding last {count} blocks") + try: + assert count > 0, count + for _ in range(count): + await self.run_in_thread_with_lock(self.backup_block) + await self.prefetcher.reset_height(self.height) + self.reorg_count_metric.inc() + except: + self.logger.exception("reorg blocks failed") + raise + finally: + self.logger.info("backed up to block %i", self.height) else: # It is probably possible but extremely rare that what # bitcoind returns doesn't form a chain because it @@ -355,101 +368,26 @@ class BlockProcessor: 'resetting the prefetcher') await self.prefetcher.reset_height(self.height) - async def reorg_chain(self, count: Optional[int] = None): - """Handle a chain reorganisation. - - Count is the number of blocks to simulate a reorg, or None for - a real reorg.""" - if count is None: - self.logger.info('chain reorg detected') - else: - self.logger.info(f'faking a reorg of {count:,d} blocks') - - async def get_raw_blocks(last_height, hex_hashes): - heights = range(last_height, last_height - len(hex_hashes), -1) - try: - blocks = [await self.db.read_raw_block(height) for height in heights] - self.logger.info(f'read {len(blocks)} blocks from disk') - return blocks - except FileNotFoundError: - return await self.daemon.raw_blocks(hex_hashes) - - try: - start, last, hashes = await self.reorg_hashes(count) - # Reverse and convert to hex strings. - hashes = [hash_to_hex_str(hash) for hash in reversed(hashes)] - self.logger.info("reorg %i block hashes", len(hashes)) - - for hex_hashes in chunks(hashes, 50): - raw_blocks = await get_raw_blocks(last, hex_hashes) - self.logger.info("got %i raw blocks", len(raw_blocks)) - await self.run_in_thread_with_lock(self.backup_blocks, raw_blocks) - last -= len(raw_blocks) - - await self.prefetcher.reset_height(self.height) - self.reorg_count_metric.inc() - except: - self.logger.exception("boom") - raise - finally: - self.logger.info("done with reorg") - - async def reorg_hashes(self, count): - """Return a pair (start, last, hashes) of blocks to back up during a - reorg. - - The hashes are returned in order of increasing height. Start - is the height of the first hash, last of the last. 
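# Illustrative sketch: the fork-point search that replaces reorg_chain() above.
# Standalone mirror of the loop, assuming both hash lists are hex strings for
# the heights just below the tip, oldest first; the function name is made up.
from typing import List

def blocks_to_unwind(local_hashes: List[str], daemon_hashes: List[str]) -> int:
    # The caller already knows the tip itself disagrees with lbrycrd, so the
    # count starts at one and grows for each additional mismatched ancestor.
    count = 1
    for local_hash, daemon_hash in zip(reversed(local_hashes), reversed(daemon_hashes)):
        if local_hash == daemon_hash:
            break
        count += 1
    return count

# Tip differs but its parent agrees: unwind exactly one block.
assert blocks_to_unwind(['a', 'b'], ['a', 'b']) == 1
# Tip and its parent both differ: unwind two blocks.
assert blocks_to_unwind(['a', 'b'], ['a', 'x']) == 2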
- """ - - """Calculate the reorg range""" - - def diff_pos(hashes1, hashes2): - """Returns the index of the first difference in the hash lists. - If both lists match returns their length.""" - for n, (hash1, hash2) in enumerate(zip(hashes1, hashes2)): - if hash1 != hash2: - return n - return len(hashes) - - if count is None: - # A real reorg - start = self.height - 1 - count = 1 - while start > 0: - hashes = await self.db.fs_block_hashes(start, count) - hex_hashes = [hash_to_hex_str(hash) for hash in hashes] - d_hex_hashes = await self.daemon.block_hex_hashes(start, count) - n = diff_pos(hex_hashes, d_hex_hashes) - if n > 0: - start += n - break - count = min(count * 2, start) - start -= count - - count = (self.height - start) + 1 - else: - start = (self.height - count) + 1 - last = start + count - 1 - s = '' if count == 1 else 's' - self.logger.info(f'chain was reorganised replacing {count:,d} ' - f'block{s} at heights {start:,d}-{last:,d}') - - return start, last, await self.db.fs_block_hashes(start, count) # - Flushing def flush_data(self): """The data for a flush. The lock must be taken.""" assert self.state_lock.locked() - return FlushData(self.height, self.tx_count, self.headers, self.block_hashes, - self.block_txs, self.db_op_stack, self.undo_infos, self.utxo_cache, - self.db_deletes, self.tip, self.undo_claims) + return FlushData(self.height, self.tx_count, self.block_hashes, + self.block_txs, self.db_op_stack, self.tip) async def flush(self): def flush(): self.db.flush_dbs(self.flush_data()) await self.run_in_thread_with_lock(flush) + async def write_state(self): + def flush(): + with self.db.db.write_batch() as batch: + self.db.write_db_state(batch) + + await self.run_in_thread_with_lock(flush) + def _add_claim_or_update(self, height: int, txo: 'Output', tx_hash: bytes, tx_num: int, nout: int, spent_claims: typing.Dict[bytes, typing.Tuple[int, int, str]]): try: @@ -1167,51 +1105,51 @@ class BlockProcessor: block_hash = self.coin.header_hash(block.header) self.block_hashes.append(block_hash) - self.block_txs.append((b''.join(tx_hash for tx, tx_hash in txs), [tx.raw for tx, _ in txs])) + self.db_op_stack.append(RevertablePut(*Prefixes.block_hash.pack_item(height, block_hash))) - first_tx_num = self.tx_count - undo_info = [] - hashXs_by_tx = [] tx_count = self.tx_count # Use local vars for speed in the loops - put_utxo = self.utxo_cache.__setitem__ - claimtrie_stash_extend = self.db_op_stack.extend spend_utxo = self.spend_utxo - undo_info_append = undo_info.append - update_touched = self.touched.update - append_hashX_by_tx = hashXs_by_tx.append - hashX_from_script = self.coin.hashX_from_script + add_utxo = self.add_utxo + + spend_claim_or_support_txo = self._spend_claim_or_support_txo + add_claim_or_support = self._add_claim_or_support for tx, tx_hash in txs: spent_claims = {} - - hashXs = [] # hashXs touched by spent inputs/rx outputs - append_hashX = hashXs.append - tx_numb = pack('= self.daemon.cached_height() - self.env.reorg_limit: - self.undo_infos.append((undo_info, height)) - self.undo_claims.append((undo_claims, height)) - self.db.write_raw_block(block.raw, height) + self.db_op_stack.append(RevertablePut(*Prefixes.undo.pack_item(height, self.db_op_stack.get_undo_ops()))) self.height = height - self.headers.append(block.header) + self.db.headers.append(block.header) self.tip = self.coin.header_hash(block.header) self.db.flush_dbs(self.flush_data()) + self.clear_after_advance_or_reorg() + def clear_after_advance_or_reorg(self): self.db_op_stack.clear() 
self.txo_to_claim.clear() self.claim_hash_to_txo.clear() @@ -1277,186 +1216,83 @@ class BlockProcessor: self.expired_claim_hashes.clear() self.doesnt_have_valid_signature.clear() self.claim_channels.clear() - - # for cache in self.search_cache.values(): - # cache.clear() + self.utxo_cache.clear() + self.hashXs_by_tx.clear() self.history_cache.clear() self.notifications.notified_mempool_txs.clear() - def backup_blocks(self, raw_blocks): - """Backup the raw blocks and flush. - - The blocks should be in order of decreasing height, starting at. - self.height. A flush is performed once the blocks are backed up. - """ + def backup_block(self): self.db.assert_flushed(self.flush_data()) - assert self.height >= len(raw_blocks) - - coin = self.coin - for raw_block in raw_blocks: - self.logger.info("backup block %i", self.height) - # Check and update self.tip - block = coin.block(raw_block, self.height) - header_hash = coin.header_hash(block.header) - if header_hash != self.tip: - raise ChainError('backup block {} not tip {} at height {:,d}' - .format(hash_to_hex_str(header_hash), - hash_to_hex_str(self.tip), - self.height)) - self.tip = coin.header_prevhash(block.header) - self.backup_txs(block.transactions) - self.height -= 1 - self.db.tx_counts.pop() - - # self.touched can include other addresses which is - # harmless, but remove None. - self.touched.discard(None) - - self.db.flush_backup(self.flush_data(), self.touched) - self.logger.info(f'backed up to height {self.height:,d}') - - def backup_txs(self, txs): - # Prevout values, in order down the block (coinbase first if present) - # undo_info is in reverse block order - undo_info, undo_claims = self.db.read_undo_info(self.height) - if undo_info is None: + self.logger.info("backup block %i", self.height) + # Check and update self.tip + undo_ops = self.db.read_undo_info(self.height) + if undo_ops is None: raise ChainError(f'no undo information found for height {self.height:,d}') - n = len(undo_info) - - # Use local vars for speed in the loops - s_pack = pack - undo_entry_len = 12 + HASHX_LEN - - for tx, tx_hash in reversed(txs): - for idx, txout in enumerate(tx.outputs): - # Spend the TX outputs. Be careful with unspendable - # outputs - we didn't save those in the first place. - hashX = self.coin.hashX_from_script(txout.pk_script) - if hashX: - cache_value = self.spend_utxo(tx_hash, idx) - self.touched.add(cache_value[:-12]) - - # Restore the inputs - for txin in reversed(tx.inputs): - if txin.is_generation(): - continue - n -= undo_entry_len - undo_item = undo_info[n:n + undo_entry_len] - self.utxo_cache[txin.prev_hash + s_pack(' self.db.tx_counts[-1]: self.db.transaction_num_mapping.pop(self.db.total_transactions.pop()) + self.tx_count -= 1 + self.height -= 1 + # self.touched can include other addresses which is + # harmless, but remove None. + self.touched.discard(None) + self.db.flush_backup(self.flush_data()) + self.clear_after_advance_or_reorg() + self.logger.info(f'backed up to height {self.height:,d}') - assert n == 0 - self.tx_count -= len(txs) - self.undo_claims.append((undo_claims, self.height)) - - """An in-memory UTXO cache, representing all changes to UTXO state - since the last DB flush. - - We want to store millions of these in memory for optimal - performance during initial sync, because then it is possible to - spend UTXOs without ever going to the database (other than as an - entry in the address history, and there is only one such entry per - TX not per UTXO). So store them in a Python dictionary with - binary keys and values. 
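# Illustrative sketch: the in-memory bookkeeping backup_block unwinds for one
# height. Plain lists/dicts stand in for the LevelDB attributes of the same
# names; the heights, hashes and counts are made-up example data.
def rewind_one_block(headers, tx_counts, total_transactions, transaction_num_mapping) -> int:
    headers.pop()
    tx_counts.pop()
    # Drop every transaction that belonged to the popped block, keeping the
    # ordered hash list and the hash -> tx_num mapping in sync.
    while len(total_transactions) > tx_counts[-1]:
        transaction_num_mapping.pop(total_transactions.pop())
    return len(headers) - 1   # the new height

headers = [b'genesis-header', b'block-1-header']
tx_counts = [1, 3]                                   # cumulative txs through each height
total_transactions = [b'aa', b'bb', b'cc']
transaction_num_mapping = {h: n for n, h in enumerate(total_transactions)}

assert rewind_one_block(headers, tx_counts, total_transactions, transaction_num_mapping) == 0
assert total_transactions == [b'aa'] and b'bb' not in transaction_num_mapping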
- - Key: TX_HASH + TX_IDX (32 + 2 = 34 bytes) - Value: HASHX + TX_NUM + VALUE (11 + 4 + 8 = 23 bytes) - - That's 57 bytes of raw data in-memory. Python dictionary overhead - means each entry actually uses about 205 bytes of memory. So - almost 5 million UTXOs can fit in 1GB of RAM. There are - approximately 42 million UTXOs on bitcoin mainnet at height - 433,000. - - Semantics: - - add: Add it to the cache dictionary. - - spend: Remove it if in the cache dictionary. Otherwise it's - been flushed to the DB. Each UTXO is responsible for two - entries in the DB. Mark them for deletion in the next - cache flush. - - The UTXO database format has to be able to do two things efficiently: - - 1. Given an address be able to list its UTXOs and their values - so its balance can be efficiently computed. - - 2. When processing transactions, for each prevout spent - a (tx_hash, - idx) pair - we have to be able to remove it from the DB. To send - notifications to clients we also need to know any address it paid - to. - - To this end we maintain two "tables", one for each point above: - - 1. Key: b'u' + address_hashX + tx_idx + tx_num - Value: the UTXO value as a 64-bit unsigned integer - - 2. Key: b'h' + compressed_tx_hash + tx_idx + tx_num - Value: hashX - - The compressed tx hash is just the first few bytes of the hash of - the tx in which the UTXO was created. As this is not unique there - will be potential collisions so tx_num is also in the key. When - looking up a UTXO the prefix space of the compressed hash needs to - be searched and resolved if necessary with the tx_num. The - collision rate is low (<0.1%). - """ - - def spend_utxo(self, tx_hash, tx_idx): - """Spend a UTXO and return the 33-byte value. - - If the UTXO is not in the cache it must be on disk. We store - all UTXOs so not finding one indicates a logic error or DB - corruption. - """ + def add_utxo(self, tx_hash: bytes, tx_num: int, nout: int, txout: 'TxOutput') -> Optional[bytes]: + hashX = self.coin.hashX_from_script(txout.pk_script) + if hashX: + self.utxo_cache[(tx_hash, nout)] = hashX + self.db_op_stack.extend([ + RevertablePut( + *Prefixes.utxo.pack_item(hashX, tx_num, nout, txout.value) + ), + RevertablePut( + *Prefixes.hashX_utxo.pack_item(tx_hash[:4], tx_num, nout, hashX) + ) + ]) + return hashX + def spend_utxo(self, tx_hash: bytes, nout: int): # Fast track is it being in the cache - idx_packed = pack(' 1: - tx_num, = unpack('False state. 
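# Illustrative sketch: why add_utxo above writes two rows per output. One table
# answers "list the UTXOs paying to this hashX" (for balances), the other maps
# an outpoint back to its hashX when an input spends it. Dicts stand in for the
# DB and the key layouts are assumptions, not the exact Prefixes encodings; the
# real spend_utxo also checks the in-memory utxo_cache before touching the DB.
import struct
from typing import Dict

UTXO: Dict[bytes, bytes] = {}        # hashX + tx_num + nout -> amount
HASHX_UTXO: Dict[bytes, bytes] = {}  # tx_hash[:4] + tx_num + nout -> hashX

def add_utxo(hashX: bytes, tx_hash: bytes, tx_num: int, nout: int, amount: int) -> None:
    suffix = struct.pack('>IH', tx_num, nout)
    UTXO[hashX + suffix] = struct.pack('>Q', amount)
    HASHX_UTXO[tx_hash[:4] + suffix] = hashX

def spend_utxo(tx_hash: bytes, tx_num: int, nout: int) -> bytes:
    suffix = struct.pack('>IH', tx_num, nout)
    hashX = HASHX_UTXO.pop(tx_hash[:4] + suffix)
    del UTXO[hashX + suffix]
    return hashX

add_utxo(b'\xaa' * 11, b'\x22' * 32, tx_num=7, nout=0, amount=5000)
assert spend_utxo(b'\x22' * 32, tx_num=7, nout=0) == b'\xaa' * 11
assert not UTXO and not HASHX_UTXO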
first_sync = self.db.first_sync self.db.first_sync = False - await self.flush() + await self.write_state() if first_sync: self.logger.info(f'{lbry.__version__} synced to ' f'height {self.height:,d}, halting here.') diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py index 2faa3bff6..4441e7c4f 100644 --- a/lbry/wallet/server/leveldb.py +++ b/lbry/wallet/server/leveldb.py @@ -65,16 +65,10 @@ TXO_STRUCT_pack = TXO_STRUCT.pack class FlushData: height = attr.ib() tx_count = attr.ib() - headers = attr.ib() block_hashes = attr.ib() block_txs = attr.ib() - claimtrie_stash = attr.ib() - # The following are flushed to the UTXO DB if undo_infos is not None - undo_infos = attr.ib() - adds = attr.ib() - deletes = attr.ib() + put_and_delete_ops = attr.ib() tip = attr.ib() - undo = attr.ib() OptionalResolveResultOrError = Optional[typing.Union[ResolveResult, LookupError, ValueError]] @@ -143,9 +137,6 @@ class LevelDB: self.merkle = Merkle() self.header_mc = MerkleCache(self.merkle, self.fs_block_hashes) - self.headers_db = None - self.tx_db = None - self._tx_and_merkle_cache = LRUCacheWithMetrics(2 ** 17, metric_name='tx_and_merkle', namespace="wallet_server") self.total_transactions = None self.transaction_num_mapping = {} @@ -748,61 +739,8 @@ class LevelDB: raise RuntimeError(msg) self.logger.info(f'flush count: {self.hist_flush_count:,d}') - # self.history.clear_excess(self.utxo_flush_count) - # < might happen at end of compaction as both DBs cannot be - # updated atomically - if self.hist_flush_count > self.utxo_flush_count: - self.logger.info('DB shut down uncleanly. Scanning for excess history flushes...') - - keys = [] - for key, hist in self.db.iterator(prefix=DB_PREFIXES.HASHX_HISTORY_PREFIX.value): - k = key[1:] - flush_id = int.from_bytes(k[-4:], byteorder='big') - if flush_id > self.utxo_flush_count: - keys.append(k) - - self.logger.info(f'deleting {len(keys):,d} history entries') - - self.hist_flush_count = self.utxo_flush_count - with self.db.write_batch() as batch: - for key in keys: - batch.delete(DB_PREFIXES.HASHX_HISTORY_PREFIX.value + key) - if keys: - self.logger.info('deleted %i excess history entries', len(keys)) - self.utxo_flush_count = self.hist_flush_count - min_height = self.min_undo_height(self.db_height) - keys = [] - for key, hist in self.db.iterator(prefix=DB_PREFIXES.UNDO_PREFIX.value): - height, = unpack('>I', key[-4:]) - if height >= min_height: - break - keys.append(key) - if min_height > 0: - for key in self.db.iterator(start=DB_PREFIXES.undo_claimtrie.value, - stop=Prefixes.undo.pack_key(min_height), - include_value=False): - keys.append(key) - if keys: - with self.db.write_batch() as batch: - for key in keys: - batch.delete(key) - self.logger.info(f'deleted {len(keys):,d} stale undo entries') - - # delete old block files - prefix = self.raw_block_prefix() - paths = [path for path in glob(f'{prefix}[0-9]*') - if len(path) > len(prefix) - and int(path[len(prefix):]) < min_height] - if paths: - for path in paths: - try: - os.remove(path) - except FileNotFoundError: - pass - self.logger.info(f'deleted {len(paths):,d} stale block files') - # Read TX counts (requires meta directory) await self._read_tx_counts() if self.total_transactions is None: @@ -836,129 +774,50 @@ class LevelDB: assert flush_data.tx_count == self.fs_tx_count == self.db_tx_count assert flush_data.height == self.fs_height == self.db_height assert flush_data.tip == self.db_tip - assert not flush_data.headers assert not flush_data.block_txs - assert not flush_data.adds - assert not 
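# Illustrative sketch: the slimmed-down flush payload. The real FlushData uses
# attr.ib(); a dataclass stand-in is shown here so the snippet runs without
# attrs installed. Headers, undo blobs and raw UTXO add/delete maps no longer
# travel separately; what the flush writes rides in put_and_delete_ops.
from dataclasses import dataclass, field
from typing import List, Tuple

@dataclass
class FlushDataSketch:
    height: int
    tx_count: int
    block_hashes: List[bytes] = field(default_factory=list)
    block_txs: List[Tuple[bytes, List[bytes]]] = field(default_factory=list)
    put_and_delete_ops: list = field(default_factory=list)   # the op stack
    tip: bytes = bytes(32)

flush = FlushDataSketch(height=-1, tx_count=0)
assert not flush.put_and_delete_ops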
flush_data.deletes - assert not flush_data.undo_infos - assert not self.hist_unflushed + assert not len(flush_data.put_and_delete_ops) def flush_dbs(self, flush_data: FlushData): - """Flush out cached state. History is always flushed; UTXOs are - flushed if flush_utxos.""" - if flush_data.height == self.db_height: self.assert_flushed(flush_data) return - # start_time = time.time() - prior_flush = self.last_flush - tx_delta = flush_data.tx_count - self.last_flush_tx_count - - # Flush to file system - # self.flush_fs(flush_data) - prior_tx_count = (self.tx_counts[self.fs_height] - if self.fs_height >= 0 else 0) - - assert len(flush_data.block_txs) == len(flush_data.headers) - assert flush_data.height == self.fs_height + len(flush_data.headers) - assert flush_data.tx_count == (self.tx_counts[-1] if self.tx_counts - else 0) - assert len(self.tx_counts) == flush_data.height + 1 - assert len( - b''.join(hashes for hashes, _ in flush_data.block_txs) - ) // 32 == flush_data.tx_count - prior_tx_count, f"{len(b''.join(hashes for hashes, _ in flush_data.block_txs)) // 32} != {flush_data.tx_count}" - - # Write the headers - # start_time = time.perf_counter() + min_height = self.min_undo_height(self.db_height) + delete_undo_keys = [] + if min_height > 0: + delete_undo_keys.extend( + self.db.iterator( + start=Prefixes.undo.pack_key(0), stop=Prefixes.undo.pack_key(min_height), include_value=False + ) + ) with self.db.write_batch() as batch: - self.put = batch.put - batch_put = self.put + batch_put = batch.put batch_delete = batch.delete - height_start = self.fs_height + 1 - tx_num = prior_tx_count - for i, (header, block_hash, (tx_hashes, txs)) in enumerate( - zip(flush_data.headers, flush_data.block_hashes, flush_data.block_txs)): - batch_put(DB_PREFIXES.HEADER_PREFIX.value + util.pack_be_uint64(height_start), header) - self.headers.append(header) - tx_count = self.tx_counts[height_start] - batch_put(DB_PREFIXES.BLOCK_HASH_PREFIX.value + util.pack_be_uint64(height_start), block_hash[::-1]) - batch_put(DB_PREFIXES.TX_COUNT_PREFIX.value + util.pack_be_uint64(height_start), util.pack_be_uint64(tx_count)) - height_start += 1 - offset = 0 - while offset < len(tx_hashes): - batch_put(DB_PREFIXES.TX_HASH_PREFIX.value + util.pack_be_uint64(tx_num), tx_hashes[offset:offset + 32]) - batch_put(DB_PREFIXES.TX_NUM_PREFIX.value + tx_hashes[offset:offset + 32], util.pack_be_uint64(tx_num)) - batch_put(DB_PREFIXES.TX_PREFIX.value + tx_hashes[offset:offset + 32], txs[offset // 32]) - tx_num += 1 - offset += 32 - flush_data.headers.clear() - flush_data.block_txs.clear() - flush_data.block_hashes.clear() - for staged_change in flush_data.claimtrie_stash: - # print("ADVANCE", staged_change) + + for staged_change in flush_data.put_and_delete_ops: if staged_change.is_put: batch_put(staged_change.key, staged_change.value) else: batch_delete(staged_change.key) - flush_data.claimtrie_stash.clear() - - for undo_ops, height in flush_data.undo: - batch_put(*Prefixes.undo.pack_item(height, undo_ops)) - flush_data.undo.clear() + for delete_key in delete_undo_keys: + batch_delete(delete_key) self.fs_height = flush_data.height self.fs_tx_count = flush_data.tx_count - - # Then history self.hist_flush_count += 1 - flush_id = util.pack_be_uint32(self.hist_flush_count) - unflushed = self.hist_unflushed - - for hashX in sorted(unflushed): - key = hashX + flush_id - batch_put(DB_PREFIXES.HASHX_HISTORY_PREFIX.value + key, unflushed[hashX].tobytes()) - - unflushed.clear() self.hist_unflushed_count = 0 - - ######################### - - # New 
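# Illustrative sketch: the undo retention window enforced inside flush_dbs.
# Heights are packed big-endian so lexicographic key order matches numeric
# order, letting one range scan collect every stale undo key. The prefix byte
# and the min_undo_height policy below are stand-ins for Prefixes.undo and the
# real REORG_LIMIT-based rule.
import struct

REORG_LIMIT = 200

def undo_key(height: int) -> bytes:
    return b'u' + struct.pack('>I', height)       # assumed one-byte table prefix

def min_undo_height(db_height: int) -> int:
    # Keep undo information only for the most recent REORG_LIMIT blocks.
    return db_height - REORG_LIMIT + 1

def stale_undo_keys(db: dict, db_height: int) -> list:
    lo, hi = undo_key(0), undo_key(max(min_undo_height(db_height), 0))
    return [key for key in sorted(db) if lo <= key < hi]

db = {undo_key(h): b'packed undo ops' for h in range(1000)}
for key in stale_undo_keys(db, db_height=1000):
    del db[key]
assert len(db) == 199 and min(db) == undo_key(801)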
undo information - for undo_info, height in flush_data.undo_infos: - batch_put(self.undo_key(height), b''.join(undo_info)) - flush_data.undo_infos.clear() - - # Spends - for key in sorted(flush_data.deletes): - batch_delete(key) - flush_data.deletes.clear() - - # New UTXOs - for key, value in flush_data.adds.items(): - # suffix = tx_idx + tx_num - hashX = value[:-12] - suffix = key[-2:] + value[-12:-8] - batch_put(DB_PREFIXES.HASHX_UTXO_PREFIX.value + key[:4] + suffix, hashX) - batch_put(DB_PREFIXES.UTXO_PREFIX.value + hashX + suffix, value[-8:]) - flush_data.adds.clear() - self.utxo_flush_count = self.hist_flush_count self.db_height = flush_data.height self.db_tx_count = flush_data.tx_count self.db_tip = flush_data.tip - + self.last_flush_tx_count = self.fs_tx_count now = time.time() self.wall_time += now - self.last_flush self.last_flush = now - self.last_flush_tx_count = self.fs_tx_count - self.write_db_state(batch) - def flush_backup(self, flush_data, touched): - """Like flush_dbs() but when backing up. All UTXOs are flushed.""" - assert not flush_data.headers + def flush_backup(self, flush_data): assert not flush_data.block_txs assert flush_data.height < self.db_height assert not self.hist_unflushed @@ -974,82 +833,25 @@ class LevelDB: self.hist_flush_count += 1 nremoves = 0 - undo_ops = RevertableOpStack(self.db.get) - - for (packed_ops, height) in reversed(flush_data.undo): - undo_ops.extend(reversed(RevertableOp.unpack_stack(packed_ops))) - undo_ops.append( - RevertableDelete(*Prefixes.undo.pack_item(height, packed_ops)) - ) - with self.db.write_batch() as batch: batch_put = batch.put batch_delete = batch.delete - - # print("flush undos", flush_data.undo_claimtrie) - for op in undo_ops: + for op in flush_data.put_and_delete_ops: # print("REWIND", op) if op.is_put: batch_put(op.key, op.value) else: batch_delete(op.key) - - flush_data.undo.clear() - while self.fs_height > flush_data.height: self.fs_height -= 1 - self.headers.pop() - tx_count = flush_data.tx_count - for hashX in sorted(touched): - deletes = [] - puts = {} - for key, hist in self.db.iterator(prefix=DB_PREFIXES.HASHX_HISTORY_PREFIX.value + hashX, reverse=True): - k = key[1:] - a = array.array('I') - a.frombytes(hist) - # Remove all history entries >= tx_count - idx = bisect_left(a, tx_count) - nremoves += len(a) - idx - if idx > 0: - puts[k] = a[:idx].tobytes() - break - deletes.append(k) - - for key in deletes: - batch_delete(key) - for key, value in puts.items(): - batch_put(key, value) - - # New undo information - for undo_info, height in flush_data.undo: - batch.put(self.undo_key(height), b''.join(undo_info)) - flush_data.undo.clear() - - # Spends - for key in sorted(flush_data.deletes): - batch_delete(key) - flush_data.deletes.clear() - - # New UTXOs - for key, value in flush_data.adds.items(): - # suffix = tx_idx + tx_num - hashX = value[:-12] - suffix = key[-2:] + value[-12:-8] - batch_put(DB_PREFIXES.HASHX_UTXO_PREFIX.value + key[:4] + suffix, hashX) - batch_put(DB_PREFIXES.UTXO_PREFIX.value + hashX + suffix, value[-8:]) - flush_data.adds.clear() start_time = time.time() - add_count = len(flush_data.adds) - spend_count = len(flush_data.deletes) // 2 - if self.db.for_sync: block_count = flush_data.height - self.db_height tx_count = flush_data.tx_count - self.db_tx_count elapsed = time.time() - start_time self.logger.info(f'flushed {block_count:,d} blocks with ' - f'{tx_count:,d} txs, {add_count:,d} UTXO adds, ' - f'{spend_count:,d} spends in ' + f'{tx_count:,d} txs in ' f'{elapsed:.1f}s, committing...') 
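# Illustrative sketch: how flush_backup can now rewind purely from stored undo
# information. A toy dict "database" and (is_put, key, value) tuples stand in
# for the write batch and the packed RevertableOp stacks kept under each height.
from typing import Dict, List, Tuple

Op = Tuple[bool, bytes, bytes]   # (is_put, key, value)

def apply_ops(db: Dict[bytes, bytes], ops: List[Op]) -> None:
    for is_put, key, value in ops:
        if is_put:
            db[key] = value
        else:
            del db[key]

def invert(ops: List[Op]) -> List[Op]:
    return [(not is_put, key, value) for is_put, key, value in reversed(ops)]

db: Dict[bytes, bytes] = {}
undo_by_height: Dict[int, List[Op]] = {}

# Advance at height 42: apply the block's writes and remember their inverse.
block_ops: List[Op] = [(True, b'claim:A', b'1'), (True, b'utxo:A', b'5000')]
apply_ops(db, block_ops)
undo_by_height[42] = invert(block_ops)

# Backup of height 42: replay the stored inverse ops, which is what the batch
# loop over flush_data.put_and_delete_ops does during a reorg.
apply_ops(db, undo_by_height.pop(42))
assert db == {}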
self.utxo_flush_count = self.hist_flush_count @@ -1121,7 +923,6 @@ class LevelDB: return None, tx_height def _fs_transactions(self, txids: Iterable[str]): - unpack_be_uint64 = util.unpack_be_uint64 tx_counts = self.tx_counts tx_db_get = self.db.get tx_cache = self._tx_and_merkle_cache @@ -1133,14 +934,12 @@ class LevelDB: tx, merkle = cached_tx else: tx_hash_bytes = bytes.fromhex(tx_hash)[::-1] - tx_num = tx_db_get(DB_PREFIXES.TX_NUM_PREFIX.value + tx_hash_bytes) + tx_num = self.transaction_num_mapping.get(tx_hash_bytes) tx = None tx_height = -1 if tx_num is not None: - tx_num = unpack_be_uint64(tx_num) tx_height = bisect_right(tx_counts, tx_num) - if tx_height < self.db_height: - tx = tx_db_get(DB_PREFIXES.TX_PREFIX.value + tx_hash_bytes) + tx = tx_db_get(Prefixes.tx.pack_key(tx_hash_bytes)) if tx_height == -1: merkle = { 'block_height': -1 @@ -1204,67 +1003,10 @@ class LevelDB: def undo_key(self, height: int) -> bytes: """DB key for undo information at the given height.""" - return DB_PREFIXES.UNDO_PREFIX.value + pack('>I', height) + return Prefixes.undo.pack_key(height) - def read_undo_info(self, height): - """Read undo information from a file for the current height.""" - return self.db.get(self.undo_key(height)), self.db.get(Prefixes.undo.pack_key(self.fs_height)) - - def raw_block_prefix(self): - return 'block' - - def raw_block_path(self, height): - return os.path.join(self.env.db_dir, f'{self.raw_block_prefix()}{height:d}') - - async def read_raw_block(self, height): - """Returns a raw block read from disk. Raises FileNotFoundError - if the block isn't on-disk.""" - - def read(): - with util.open_file(self.raw_block_path(height)) as f: - return f.read(-1) - - return await asyncio.get_event_loop().run_in_executor(self.executor, read) - - def write_raw_block(self, block, height): - """Write a raw block to disk.""" - with util.open_truncate(self.raw_block_path(height)) as f: - f.write(block) - # Delete old blocks to prevent them accumulating - try: - del_height = self.min_undo_height(height) - 1 - os.remove(self.raw_block_path(del_height)) - except FileNotFoundError: - pass - - def clear_excess_undo_info(self): - """Clear excess undo info. 
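# Illustrative sketch: how _fs_transactions recovers a block height from a
# global tx_num. tx_counts[h] is the cumulative transaction count through
# block h, so bisect_right gives the first height whose cumulative count
# exceeds tx_num; the example numbers are made up.
from bisect import bisect_right

tx_counts = [1, 3, 6]   # genesis: 1 tx, block 1: 2 txs, block 2: 3 txs

def tx_height(tx_num: int) -> int:
    return bisect_right(tx_counts, tx_num)

assert tx_height(0) == 0    # the genesis coinbase
assert tx_height(2) == 1    # second tx of block 1
assert tx_height(5) == 2    # last tx of block 2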
Only most recent N are kept.""" - min_height = self.min_undo_height(self.db_height) - keys = [] - for key, hist in self.db.iterator(prefix=DB_PREFIXES.UNDO_PREFIX.value): - height, = unpack('>I', key[-4:]) - if height >= min_height: - break - keys.append(key) - - if keys: - with self.db.write_batch() as batch: - for key in keys: - batch.delete(key) - self.logger.info(f'deleted {len(keys):,d} stale undo entries') - - # delete old block files - prefix = self.raw_block_prefix() - paths = [path for path in glob(f'{prefix}[0-9]*') - if len(path) > len(prefix) - and int(path[len(prefix):]) < min_height] - if paths: - for path in paths: - try: - os.remove(path) - except FileNotFoundError: - pass - self.logger.info(f'deleted {len(paths):,d} stale block files') + def read_undo_info(self, height: int): + return self.db.get(Prefixes.undo.pack_key(height)) # -- UTXO database diff --git a/tests/integration/blockchain/test_resolve_command.py b/tests/integration/blockchain/test_resolve_command.py index a78299efb..5f230f53f 100644 --- a/tests/integration/blockchain/test_resolve_command.py +++ b/tests/integration/blockchain/test_resolve_command.py @@ -82,7 +82,7 @@ class BaseResolveTestCase(CommandTestCase): check_supports(c['claimId'], c['supports']) claim_hash = bytes.fromhex(c['claimId']) self.assertEqual(c['validAtHeight'], db.get_activation( - db.total_transactions.index(bytes.fromhex(c['txId'])[::-1]), c['n'] + db.transaction_num_mapping[bytes.fromhex(c['txId'])[::-1]], c['n'] )) self.assertEqual(c['effectiveAmount'], db.get_effective_amount(claim_hash))
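# Illustrative sketch: the lookup swap made in the test above. list.index()
# scans total_transactions on every call, while transaction_num_mapping returns
# the same tx_num from a dict in constant time. The attribute names mirror the
# LevelDB ones; the hashes are example data.
total_transactions = [b'\x01' * 32, b'\x02' * 32, b'\x03' * 32]
transaction_num_mapping = {tx_hash: tx_num for tx_num, tx_hash in enumerate(total_transactions)}

needle = b'\x02' * 32
assert transaction_num_mapping[needle] == total_transactions.index(needle) == 1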