From 358fa21eaf6c844adfb021f74068fd7480702d08 Mon Sep 17 00:00:00 2001
From: Jack Robison
Date: Thu, 6 Jan 2022 12:34:08 -0500
Subject: [PATCH] move Prefetcher to own file

---
Reviewer notes (text in this position is not part of the commit message
and is skipped when the patch is applied):

* NOTE(review): BlockProcessor.__init__ keeps the unchanged context line
  `self.mempool = MemPool(env.coin, daemon, db, self.state_lock)`, but this
  patch removes the `MemPool` import and the `daemon` / `db` parameters.
  Unless those names are defined elsewhere in the module, constructing a
  BlockProcessor raises NameError. Presumably it should use `self.daemon`
  and `self.db` and keep the import -- confirm against the follow-up commit.
* NOTE(review): Prefetcher.main_loop used to catch DaemonError, log it and
  keep polling. The moved version has no `except`, so a DaemonError now
  ends the loop (only the shutdown message is logged) -- confirm intended.
* NOTE(review): the new prefetcher.py imports `chunks` but never uses it.

 lbry/wallet/server/block_processor.py | 150 ++++----------------------
 lbry/wallet/server/prefetcher.py      | 119 ++++++++++++++++++++
 2 files changed, 140 insertions(+), 129 deletions(-)
 create mode 100644 lbry/wallet/server/prefetcher.py

diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py
index a9aa1e7e8..52965968c 100644
--- a/lbry/wallet/server/block_processor.py
+++ b/lbry/wallet/server/block_processor.py
@@ -1,144 +1,31 @@
 import time
 import asyncio
 import typing
+import signal
+
 from bisect import bisect_right
-from struct import pack, unpack
+from struct import pack
 from concurrent.futures.thread import ThreadPoolExecutor
-from typing import Optional, List, Tuple, Set, DefaultDict, Dict, NamedTuple
+from typing import Optional, List, Tuple, Set, DefaultDict, Dict
 from prometheus_client import Gauge, Histogram
 from collections import defaultdict
 
 import lbry
-from lbry.schema.url import URL
 from lbry.schema.claim import Claim
 from lbry.wallet.ledger import Ledger, TestNetLedger, RegTestLedger
 from lbry.utils import LRUCache
 from lbry.wallet.transaction import OutputScript, Output, Transaction
 from lbry.wallet.server.tx import Tx, TxOutput, TxInput
-from lbry.wallet.server.daemon import DaemonError
-from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN
-from lbry.wallet.server.util import chunks, class_logger
+from lbry.wallet.server.hash import hash_to_hex_str
+from lbry.wallet.server.util import class_logger
 from lbry.crypto.hash import hash160
-from lbry.wallet.server.mempool import MemPool
-from lbry.wallet.server.db.common import TrendingNotification
 from lbry.wallet.server.db.prefixes import ACTIVATED_SUPPORT_TXO_TYPE, ACTIVATED_CLAIM_TXO_TYPE
 from lbry.wallet.server.db.prefixes import PendingActivationKey, PendingActivationValue, ClaimToTXOValue
-from lbry.wallet.server.udp import StatusServer
-from lbry.wallet.server.db.revertable import RevertableOpStack
+from lbry.wallet.server.prefetcher import Prefetcher
+from lbry.wallet.server.db.db import HubDB
+
 if typing.TYPE_CHECKING:
-    from lbry.wallet.server.leveldb import LevelDB
-
-
-class Prefetcher:
-    """Prefetches blocks (in the forward direction only)."""
-
-    def __init__(self, daemon, coin, blocks_event):
-        self.logger = class_logger(__name__, self.__class__.__name__)
-        self.daemon = daemon
-        self.coin = coin
-        self.blocks_event = blocks_event
-        self.blocks = []
-        self.caught_up = False
-        # Access to fetched_height should be protected by the semaphore
-        self.fetched_height = None
-        self.semaphore = asyncio.Semaphore()
-        self.refill_event = asyncio.Event()
-        # The prefetched block cache size. The min cache size has
-        # little effect on sync time.
-        self.cache_size = 0
-        self.min_cache_size = 10 * 1024 * 1024
-        # This makes the first fetch be 10 blocks
-        self.ave_size = self.min_cache_size // 10
-        self.polling_delay = 5
-
-    async def main_loop(self, bp_height):
-        """Loop forever polling for more blocks."""
-        await self.reset_height(bp_height)
-        while True:
-            try:
-                # Sleep a while if there is nothing to prefetch
-                await self.refill_event.wait()
-                if not await self._prefetch_blocks():
-                    await asyncio.sleep(self.polling_delay)
-            except DaemonError as e:
-                self.logger.info(f'ignoring daemon error: {e}')
-
-    def get_prefetched_blocks(self):
-        """Called by block processor when it is processing queued blocks."""
-        blocks = self.blocks
-        self.blocks = []
-        self.cache_size = 0
-        self.refill_event.set()
-        return blocks
-
-    async def reset_height(self, height):
-        """Reset to prefetch blocks from the block processor's height.
-
-        Used in blockchain reorganisations. This coroutine can be
-        called asynchronously to the _prefetch_blocks coroutine so we
-        must synchronize with a semaphore.
-        """
-        async with self.semaphore:
-            self.blocks.clear()
-            self.cache_size = 0
-            self.fetched_height = height
-            self.refill_event.set()
-
-        daemon_height = await self.daemon.height()
-        behind = daemon_height - height
-        if behind > 0:
-            self.logger.info(f'catching up to daemon height {daemon_height:,d} '
-                             f'({behind:,d} blocks behind)')
-        else:
-            self.logger.info(f'caught up to daemon height {daemon_height:,d}')
-
-    async def _prefetch_blocks(self):
-        """Prefetch some blocks and put them on the queue.
-
-        Repeats until the queue is full or caught up.
-        """
-        daemon = self.daemon
-        daemon_height = await daemon.height()
-        async with self.semaphore:
-            while self.cache_size < self.min_cache_size:
-                # Try and catch up all blocks but limit to room in cache.
-                # Constrain fetch count to between 0 and 500 regardless;
-                # testnet can be lumpy.
-                cache_room = self.min_cache_size // self.ave_size
-                count = min(daemon_height - self.fetched_height, cache_room)
-                count = min(500, max(count, 0))
-                if not count:
-                    self.caught_up = True
-                    return False
-
-                first = self.fetched_height + 1
-                hex_hashes = await daemon.block_hex_hashes(first, count)
-                if self.caught_up:
-                    self.logger.info('new block height {:,d} hash {}'
-                                     .format(first + count-1, hex_hashes[-1]))
-                blocks = await daemon.raw_blocks(hex_hashes)
-
-                assert count == len(blocks)
-
-                # Special handling for genesis block
-                if first == 0:
-                    blocks[0] = self.coin.genesis_block(blocks[0])
-                    self.logger.info(f'verified genesis block with hash {hex_hashes[0]}')
-
-                # Update our recent average block size estimate
-                size = sum(len(block) for block in blocks)
-                if count >= 10:
-                    self.ave_size = size // count
-                else:
-                    self.ave_size = (size + (10 - count) * self.ave_size) // 10
-
-                self.blocks.extend(blocks)
-                self.cache_size += size
-                self.fetched_height += count
-                self.blocks_event.set()
-
-        self.refill_event.clear()
-        return True
+    from lbry.wallet.server.db.revertable import RevertableOpStack
 
 
 class ChainError(Exception):
@@ -193,15 +80,20 @@ class BlockProcessor:
         "reorg_count", "Number of reorgs", namespace=NAMESPACE
     )
 
-    def __init__(self, env, db: 'LevelDB', daemon, shutdown_event: asyncio.Event):
-        self.state_lock = asyncio.Lock()
+    def __init__(self, env: 'Env'):
+        self.cancellable_tasks = []
+
         self.env = env
-        self.db = db
-        self.daemon = daemon
+        self.state_lock = asyncio.Lock()
+        self.daemon = env.coin.DAEMON(env.coin, env.daemon_url)
+        self.db = HubDB(
+            env.coin, env.db_dir, env.cache_MB, env.reorg_limit, env.cache_all_claim_txos, env.cache_all_tx_hashes,
+            max_open_files=env.db_max_open_files
+        )
         self._chain_executor = ThreadPoolExecutor(1, thread_name_prefix='block-processor')
         self._sync_reader_executor = ThreadPoolExecutor(1, thread_name_prefix='hub-es-sync')
         self.mempool = MemPool(env.coin, daemon, db, self.state_lock)
-        self.shutdown_event = shutdown_event
+        self.shutdown_event = asyncio.Event()
         self.coin = env.coin
         if env.coin.NET == 'mainnet':
             self.ledger = Ledger
@@ -216,7 +108,7 @@ class BlockProcessor:
         self.tx_count = 0
 
         self.blocks_event = asyncio.Event()
-        self.prefetcher = Prefetcher(daemon, env.coin, self.blocks_event)
+        self.prefetcher = Prefetcher(self.daemon, env.coin, self.blocks_event)
         self.logger = class_logger(__name__, self.__class__.__name__)
 
         # Meta
diff --git a/lbry/wallet/server/prefetcher.py b/lbry/wallet/server/prefetcher.py
new file mode 100644
index 000000000..7d986b0c3
--- /dev/null
+++ b/lbry/wallet/server/prefetcher.py
@@ -0,0 +1,119 @@
+import asyncio
+import typing
+
+from lbry.wallet.server.util import chunks, class_logger
+if typing.TYPE_CHECKING:
+    from lbry.wallet.server.daemon import LBCDaemon
+    from lbry.wallet.server.coin import Coin
+
+
+class Prefetcher:
+    """Prefetches blocks (in the forward direction only)."""
+
+    def __init__(self, daemon: 'LBCDaemon', coin: 'Coin', blocks_event: asyncio.Event):
+        self.logger = class_logger(__name__, self.__class__.__name__)
+        self.daemon = daemon
+        self.coin = coin
+        self.blocks_event = blocks_event
+        self.blocks = []
+        self.caught_up = False
+        # Access to fetched_height should be protected by the semaphore
+        self.fetched_height = None
+        self.semaphore = asyncio.Semaphore()
+        self.refill_event = asyncio.Event()
+        # The prefetched block cache size. The min cache size has
+        # little effect on sync time.
+        self.cache_size = 0
+        self.min_cache_size = 10 * 1024 * 1024
+        # This makes the first fetch be 10 blocks
+        self.ave_size = self.min_cache_size // 10
+        self.polling_delay = 5
+
+    async def main_loop(self, bp_height):
+        """Loop forever polling for more blocks."""
+        await self.reset_height(bp_height)
+        try:
+            while True:
+                # Sleep a while if there is nothing to prefetch
+                await self.refill_event.wait()
+                if not await self._prefetch_blocks():
+                    await asyncio.sleep(self.polling_delay)
+        finally:
+            self.logger.info("block pre-fetcher is shutting down")
+
+    def get_prefetched_blocks(self):
+        """Called by block processor when it is processing queued blocks."""
+        blocks = self.blocks
+        self.blocks = []
+        self.cache_size = 0
+        self.refill_event.set()
+        return blocks
+
+    async def reset_height(self, height):
+        """Reset to prefetch blocks from the block processor's height.
+
+        Used in blockchain reorganisations. This coroutine can be
+        called asynchronously to the _prefetch_blocks coroutine so we
+        must synchronize with a semaphore.
+        """
+        async with self.semaphore:
+            self.blocks.clear()
+            self.cache_size = 0
+            self.fetched_height = height
+            self.refill_event.set()
+
+        daemon_height = await self.daemon.height()
+        behind = daemon_height - height
+        if behind > 0:
+            self.logger.info(f'catching up to daemon height {daemon_height:,d} '
+                             f'({behind:,d} blocks behind)')
+        else:
+            self.logger.info(f'caught up to daemon height {daemon_height:,d}')
+
+    async def _prefetch_blocks(self):
+        """Prefetch some blocks and put them on the queue.
+
+        Repeats until the queue is full or caught up.
+        """
+        daemon = self.daemon
+        daemon_height = await daemon.height()
+        async with self.semaphore:
+            while self.cache_size < self.min_cache_size:
+                # Try and catch up all blocks but limit to room in cache.
+                # Constrain fetch count to between 0 and 500 regardless;
+                # testnet can be lumpy.
+                cache_room = self.min_cache_size // self.ave_size
+                count = min(daemon_height - self.fetched_height, cache_room)
+                count = min(500, max(count, 0))
+                if not count:
+                    self.caught_up = True
+                    return False
+
+                first = self.fetched_height + 1
+                hex_hashes = await daemon.block_hex_hashes(first, count)
+                if self.caught_up:
+                    self.logger.info('new block height {:,d} hash {}'
+                                     .format(first + count-1, hex_hashes[-1]))
+                blocks = await daemon.raw_blocks(hex_hashes)
+
+                assert count == len(blocks)
+
+                # Special handling for genesis block
+                if first == 0:
+                    blocks[0] = self.coin.genesis_block(blocks[0])
+                    self.logger.info(f'verified genesis block with hash {hex_hashes[0]}')
+
+                # Update our recent average block size estimate
+                size = sum(len(block) for block in blocks)
+                if count >= 10:
+                    self.ave_size = size // count
+                else:
+                    self.ave_size = (size + (10 - count) * self.ave_size) // 10
+
+                self.blocks.extend(blocks)
+                self.cache_size += size
+                self.fetched_height += count
+                self.blocks_event.set()
+
+        self.refill_event.clear()
+        return True