diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index 7ac468b8b..5289cc8f6 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -2,14 +2,17 @@ import time import asyncio from struct import pack, unpack from concurrent.futures.thread import ThreadPoolExecutor -from typing import Optional +from typing import Optional, List, Tuple from prometheus_client import Gauge, Histogram import lbry +from lbry.schema.claim import Claim +from lbry.wallet.server.tx import Tx from lbry.wallet.server.db.writer import SQLDB from lbry.wallet.server.daemon import DaemonError from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN from lbry.wallet.server.util import chunks, class_logger from lbry.wallet.server.leveldb import FlushData +from lbry.wallet.transaction import Transaction from lbry.wallet.server.udp import StatusServer @@ -212,15 +215,15 @@ class BlockProcessor: chain = [self.tip] + [self.coin.header_hash(h) for h in headers[:-1]] if hprevs == chain: + start = time.perf_counter() await self.run_in_thread_with_lock(self.advance_blocks, blocks) if self.sql: await self.db.search_index.claim_consumer(self.sql.claim_producer()) for cache in self.search_cache.values(): cache.clear() - self.history_cache.clear() + self.history_cache.clear() # TODO: is this needed? self.notifications.notified_mempool_txs.clear() - await self._maybe_flush() processed_time = time.perf_counter() - start self.block_count_metric.set(self.height) self.block_update_time_metric.observe(processed_time) @@ -423,7 +426,14 @@ class BlockProcessor: self.headers.extend(headers) self.tip = self.coin.header_hash(headers[-1]) - def advance_txs(self, height, txs, header, block_hash): + self.db.flush_dbs(self.flush_data(), True, self.estimate_txs_remaining) + + for cache in self.search_cache.values(): + cache.clear() + self.history_cache.clear() + self.notifications.notified_mempool_txs.clear() + + def advance_txs(self, height, txs: List[Tuple[Tx, bytes]], header, block_hash): self.block_hashes.append(block_hash) self.block_txs.append((b''.join(tx_hash for tx, tx_hash in txs), [tx.raw for tx, _ in txs])) @@ -611,7 +621,6 @@ class BlockProcessor: for hdb_key, hashX in candidates.items(): tx_num_packed = hdb_key[-4:] if len(candidates) > 1: - tx_num, = unpack('= 0 else 0) - assert len(flush_data.block_txs) == len(flush_data.headers) - assert flush_data.height == self.fs_height + len(flush_data.headers) - assert flush_data.tx_count == (self.tx_counts[-1] if self.tx_counts - else 0) - assert len(self.tx_counts) == flush_data.height + 1 - assert len( - b''.join(hashes for hashes, _ in flush_data.block_txs) - ) // 32 == flush_data.tx_count - prior_tx_count - - # Write the headers - start_time = time.perf_counter() - - with self.db.write_batch() as batch: - batch_put = batch.put - for i, header in enumerate(flush_data.headers): - batch_put(HEADER_PREFIX + util.pack_be_uint64(self.fs_height + i + 1), header) - self.headers.append(header) - flush_data.headers.clear() - - height_start = self.fs_height + 1 - tx_num = prior_tx_count - - with self.db.write_batch() as batch: - batch_put = batch.put - for block_hash, (tx_hashes, txs) in zip(flush_data.block_hashes, flush_data.block_txs): - tx_count = self.tx_counts[height_start] - batch_put(BLOCK_HASH_PREFIX + util.pack_be_uint64(height_start), block_hash[::-1]) - batch_put(TX_COUNT_PREFIX + util.pack_be_uint64(height_start), util.pack_be_uint64(tx_count)) - height_start += 1 - offset = 0 - while offset < len(tx_hashes): - batch_put(TX_HASH_PREFIX + util.pack_be_uint64(tx_num), tx_hashes[offset:offset+32]) - batch_put(TX_NUM_PREFIX + tx_hashes[offset:offset+32], util.pack_be_uint64(tx_num)) - batch_put(TX_PREFIX + tx_hashes[offset:offset+32], txs[offset // 32]) - tx_num += 1 - offset += 32 - - flush_data.block_txs.clear() - flush_data.block_hashes.clear() - - self.fs_height = flush_data.height - self.fs_tx_count = flush_data.tx_count - elapsed = time.perf_counter() - start_time - self.logger.info(f'flushed filesystem data in {elapsed:.2f}s') - - def flush_history(self): - self.history.flush() - def flush_utxo_db(self, batch, flush_data): """Flush the cached DB writes and UTXO set to the batch.""" # Care is needed because the writes generated by flushing the @@ -384,6 +285,117 @@ class LevelDB: self.db_tx_count = flush_data.tx_count self.db_tip = flush_data.tip + def flush_dbs(self, flush_data, flush_utxos, estimate_txs_remaining): + """Flush out cached state. History is always flushed; UTXOs are + flushed if flush_utxos.""" + if flush_data.height == self.db_height: + self.assert_flushed(flush_data) + return + + start_time = time.time() + prior_flush = self.last_flush + tx_delta = flush_data.tx_count - self.last_flush_tx_count + + # Flush to file system + # self.flush_fs(flush_data) + prior_tx_count = (self.tx_counts[self.fs_height] + if self.fs_height >= 0 else 0) + + assert len(flush_data.block_txs) == len(flush_data.headers) + assert flush_data.height == self.fs_height + len(flush_data.headers) + assert flush_data.tx_count == (self.tx_counts[-1] if self.tx_counts + else 0) + assert len(self.tx_counts) == flush_data.height + 1 + assert len( + b''.join(hashes for hashes, _ in flush_data.block_txs) + ) // 32 == flush_data.tx_count - prior_tx_count + + + # Write the headers + start_time = time.perf_counter() + + with self.db.write_batch() as batch: + batch_put = batch.put + height_start = self.fs_height + 1 + tx_num = prior_tx_count + for i, (header, block_hash, (tx_hashes, txs)) in enumerate(zip(flush_data.headers, flush_data.block_hashes, flush_data.block_txs)): + batch_put(HEADER_PREFIX + util.pack_be_uint64(height_start), header) + self.headers.append(header) + tx_count = self.tx_counts[height_start] + batch_put(BLOCK_HASH_PREFIX + util.pack_be_uint64(height_start), block_hash[::-1]) + batch_put(TX_COUNT_PREFIX + util.pack_be_uint64(height_start), util.pack_be_uint64(tx_count)) + height_start += 1 + offset = 0 + while offset < len(tx_hashes): + batch_put(TX_HASH_PREFIX + util.pack_be_uint64(tx_num), tx_hashes[offset:offset + 32]) + batch_put(TX_NUM_PREFIX + tx_hashes[offset:offset + 32], util.pack_be_uint64(tx_num)) + batch_put(TX_PREFIX + tx_hashes[offset:offset + 32], txs[offset // 32]) + + tx_num += 1 + offset += 32 + flush_data.headers.clear() + flush_data.block_txs.clear() + flush_data.block_hashes.clear() + # flush_data.claim_txo_cache.clear() + # flush_data.support_txo_cache.clear() + + self.fs_height = flush_data.height + self.fs_tx_count = flush_data.tx_count + + + # Then history + self.history.flush_count += 1 + flush_id = pack_be_uint16(self.history.flush_count) + unflushed = self.history.unflushed + + for hashX in sorted(unflushed): + key = hashX + flush_id + batch_put(HASHX_HISTORY_PREFIX + key, unflushed[hashX].tobytes()) + self.history.write_state(batch) + + unflushed.clear() + self.history.unflushed_count = 0 + + + ######################### + + # Flush state last as it reads the wall time. + if flush_utxos: + self.flush_utxo_db(batch, flush_data) + + # self.flush_state(batch) + # + now = time.time() + self.wall_time += now - self.last_flush + self.last_flush = now + self.last_flush_tx_count = self.fs_tx_count + self.write_utxo_state(batch) + + # # Update and put the wall time again - otherwise we drop the + # # time it took to commit the batch + # # self.flush_state(self.db) + # now = time.time() + # self.wall_time += now - self.last_flush + # self.last_flush = now + # self.last_flush_tx_count = self.fs_tx_count + # self.write_utxo_state(batch) + + elapsed = self.last_flush - start_time + self.logger.info(f'flush #{self.history.flush_count:,d} took ' + f'{elapsed:.1f}s. Height {flush_data.height:,d} ' + f'txs: {flush_data.tx_count:,d} ({tx_delta:+,d})') + + # Catch-up stats + if self.db.for_sync: + flush_interval = self.last_flush - prior_flush + tx_per_sec_gen = int(flush_data.tx_count / self.wall_time) + tx_per_sec_last = 1 + int(tx_delta / flush_interval) + eta = estimate_txs_remaining() / tx_per_sec_last + self.logger.info(f'tx/sec since genesis: {tx_per_sec_gen:,d}, ' + f'since last flush: {tx_per_sec_last:,d}') + self.logger.info(f'sync time: {formatted_time(self.wall_time)} ' + f'ETA: {formatted_time(eta)}') + def flush_state(self, batch): """Flush chain state to the batch.""" now = time.time()