move Prefetcher to own file

This commit is contained in:
Jack Robison 2022-01-06 12:34:08 -05:00
parent 28be7d8993
commit 776dea58c2
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
2 changed files with 140 additions and 129 deletions

View file

@ -1,144 +1,31 @@
import time import time
import asyncio import asyncio
import typing import typing
import signal
from bisect import bisect_right from bisect import bisect_right
from struct import pack, unpack from struct import pack
from concurrent.futures.thread import ThreadPoolExecutor from concurrent.futures.thread import ThreadPoolExecutor
from typing import Optional, List, Tuple, Set, DefaultDict, Dict, NamedTuple from typing import Optional, List, Tuple, Set, DefaultDict, Dict
from prometheus_client import Gauge, Histogram from prometheus_client import Gauge, Histogram
from collections import defaultdict from collections import defaultdict
import lbry import lbry
from lbry.schema.url import URL
from lbry.schema.claim import Claim from lbry.schema.claim import Claim
from lbry.wallet.ledger import Ledger, TestNetLedger, RegTestLedger from lbry.wallet.ledger import Ledger, TestNetLedger, RegTestLedger
from lbry.utils import LRUCache from lbry.utils import LRUCache
from lbry.wallet.transaction import OutputScript, Output, Transaction from lbry.wallet.transaction import OutputScript, Output, Transaction
from lbry.wallet.server.tx import Tx, TxOutput, TxInput from lbry.wallet.server.tx import Tx, TxOutput, TxInput
from lbry.wallet.server.daemon import DaemonError from lbry.wallet.server.hash import hash_to_hex_str
from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN from lbry.wallet.server.util import class_logger
from lbry.wallet.server.util import chunks, class_logger
from lbry.crypto.hash import hash160 from lbry.crypto.hash import hash160
from lbry.wallet.server.mempool import MemPool
from lbry.wallet.server.db.common import TrendingNotification
from lbry.wallet.server.db.prefixes import ACTIVATED_SUPPORT_TXO_TYPE, ACTIVATED_CLAIM_TXO_TYPE from lbry.wallet.server.db.prefixes import ACTIVATED_SUPPORT_TXO_TYPE, ACTIVATED_CLAIM_TXO_TYPE
from lbry.wallet.server.db.prefixes import PendingActivationKey, PendingActivationValue, ClaimToTXOValue from lbry.wallet.server.db.prefixes import PendingActivationKey, PendingActivationValue, ClaimToTXOValue
from lbry.wallet.server.udp import StatusServer from lbry.wallet.server.prefetcher import Prefetcher
from lbry.wallet.server.db.revertable import RevertableOpStack from lbry.wallet.server.db.db import HubDB
if typing.TYPE_CHECKING: if typing.TYPE_CHECKING:
from lbry.wallet.server.leveldb import LevelDB from lbry.wallet.server.db.revertable import RevertableOpStack
class Prefetcher:
"""Prefetches blocks (in the forward direction only)."""
def __init__(self, daemon, coin, blocks_event):
self.logger = class_logger(__name__, self.__class__.__name__)
self.daemon = daemon
self.coin = coin
self.blocks_event = blocks_event
self.blocks = []
self.caught_up = False
# Access to fetched_height should be protected by the semaphore
self.fetched_height = None
self.semaphore = asyncio.Semaphore()
self.refill_event = asyncio.Event()
# The prefetched block cache size. The min cache size has
# little effect on sync time.
self.cache_size = 0
self.min_cache_size = 10 * 1024 * 1024
# This makes the first fetch be 10 blocks
self.ave_size = self.min_cache_size // 10
self.polling_delay = 5
async def main_loop(self, bp_height):
"""Loop forever polling for more blocks."""
await self.reset_height(bp_height)
while True:
try:
# Sleep a while if there is nothing to prefetch
await self.refill_event.wait()
if not await self._prefetch_blocks():
await asyncio.sleep(self.polling_delay)
except DaemonError as e:
self.logger.info(f'ignoring daemon error: {e}')
def get_prefetched_blocks(self):
"""Called by block processor when it is processing queued blocks."""
blocks = self.blocks
self.blocks = []
self.cache_size = 0
self.refill_event.set()
return blocks
async def reset_height(self, height):
"""Reset to prefetch blocks from the block processor's height.
Used in blockchain reorganisations. This coroutine can be
called asynchronously to the _prefetch_blocks coroutine so we
must synchronize with a semaphore.
"""
async with self.semaphore:
self.blocks.clear()
self.cache_size = 0
self.fetched_height = height
self.refill_event.set()
daemon_height = await self.daemon.height()
behind = daemon_height - height
if behind > 0:
self.logger.info(f'catching up to daemon height {daemon_height:,d} '
f'({behind:,d} blocks behind)')
else:
self.logger.info(f'caught up to daemon height {daemon_height:,d}')
async def _prefetch_blocks(self):
"""Prefetch some blocks and put them on the queue.
Repeats until the queue is full or caught up.
"""
daemon = self.daemon
daemon_height = await daemon.height()
async with self.semaphore:
while self.cache_size < self.min_cache_size:
# Try and catch up all blocks but limit to room in cache.
# Constrain fetch count to between 0 and 500 regardless;
# testnet can be lumpy.
cache_room = self.min_cache_size // self.ave_size
count = min(daemon_height - self.fetched_height, cache_room)
count = min(500, max(count, 0))
if not count:
self.caught_up = True
return False
first = self.fetched_height + 1
hex_hashes = await daemon.block_hex_hashes(first, count)
if self.caught_up:
self.logger.info('new block height {:,d} hash {}'
.format(first + count-1, hex_hashes[-1]))
blocks = await daemon.raw_blocks(hex_hashes)
assert count == len(blocks)
# Special handling for genesis block
if first == 0:
blocks[0] = self.coin.genesis_block(blocks[0])
self.logger.info(f'verified genesis block with hash {hex_hashes[0]}')
# Update our recent average block size estimate
size = sum(len(block) for block in blocks)
if count >= 10:
self.ave_size = size // count
else:
self.ave_size = (size + (10 - count) * self.ave_size) // 10
self.blocks.extend(blocks)
self.cache_size += size
self.fetched_height += count
self.blocks_event.set()
self.refill_event.clear()
return True
class ChainError(Exception): class ChainError(Exception):
@ -193,15 +80,20 @@ class BlockProcessor:
"reorg_count", "Number of reorgs", namespace=NAMESPACE "reorg_count", "Number of reorgs", namespace=NAMESPACE
) )
def __init__(self, env, db: 'LevelDB', daemon, shutdown_event: asyncio.Event): def __init__(self, env: 'Env'):
self.state_lock = asyncio.Lock() self.cancellable_tasks = []
self.env = env self.env = env
self.db = db self.state_lock = asyncio.Lock()
self.daemon = daemon self.daemon = env.coin.DAEMON(env.coin, env.daemon_url)
self.db = HubDB(
env.coin, env.db_dir, env.cache_MB, env.reorg_limit, env.cache_all_claim_txos, env.cache_all_tx_hashes,
max_open_files=env.db_max_open_files
)
self._chain_executor = ThreadPoolExecutor(1, thread_name_prefix='block-processor') self._chain_executor = ThreadPoolExecutor(1, thread_name_prefix='block-processor')
self._sync_reader_executor = ThreadPoolExecutor(1, thread_name_prefix='hub-es-sync') self._sync_reader_executor = ThreadPoolExecutor(1, thread_name_prefix='hub-es-sync')
self.mempool = MemPool(env.coin, daemon, db, self.state_lock) self.mempool = MemPool(env.coin, daemon, db, self.state_lock)
self.shutdown_event = shutdown_event self.shutdown_event = asyncio.Event()
self.coin = env.coin self.coin = env.coin
if env.coin.NET == 'mainnet': if env.coin.NET == 'mainnet':
self.ledger = Ledger self.ledger = Ledger
@ -216,7 +108,7 @@ class BlockProcessor:
self.tx_count = 0 self.tx_count = 0
self.blocks_event = asyncio.Event() self.blocks_event = asyncio.Event()
self.prefetcher = Prefetcher(daemon, env.coin, self.blocks_event) self.prefetcher = Prefetcher(self.daemon, env.coin, self.blocks_event)
self.logger = class_logger(__name__, self.__class__.__name__) self.logger = class_logger(__name__, self.__class__.__name__)
# Meta # Meta

View file

@ -0,0 +1,119 @@
import asyncio
import typing
from lbry.wallet.server.util import chunks, class_logger
if typing.TYPE_CHECKING:
from lbry.wallet.server.daemon import LBCDaemon
from lbry.wallet.server.coin import Coin
class Prefetcher:
"""Prefetches blocks (in the forward direction only)."""
def __init__(self, daemon: 'LBCDaemon', coin: 'Coin', blocks_event: asyncio.Event):
self.logger = class_logger(__name__, self.__class__.__name__)
self.daemon = daemon
self.coin = coin
self.blocks_event = blocks_event
self.blocks = []
self.caught_up = False
# Access to fetched_height should be protected by the semaphore
self.fetched_height = None
self.semaphore = asyncio.Semaphore()
self.refill_event = asyncio.Event()
# The prefetched block cache size. The min cache size has
# little effect on sync time.
self.cache_size = 0
self.min_cache_size = 10 * 1024 * 1024
# This makes the first fetch be 10 blocks
self.ave_size = self.min_cache_size // 10
self.polling_delay = 5
async def main_loop(self, bp_height):
"""Loop forever polling for more blocks."""
await self.reset_height(bp_height)
try:
while True:
# Sleep a while if there is nothing to prefetch
await self.refill_event.wait()
if not await self._prefetch_blocks():
await asyncio.sleep(self.polling_delay)
finally:
self.logger.info("block pre-fetcher is shutting down")
def get_prefetched_blocks(self):
"""Called by block processor when it is processing queued blocks."""
blocks = self.blocks
self.blocks = []
self.cache_size = 0
self.refill_event.set()
return blocks
async def reset_height(self, height):
"""Reset to prefetch blocks from the block processor's height.
Used in blockchain reorganisations. This coroutine can be
called asynchronously to the _prefetch_blocks coroutine so we
must synchronize with a semaphore.
"""
async with self.semaphore:
self.blocks.clear()
self.cache_size = 0
self.fetched_height = height
self.refill_event.set()
daemon_height = await self.daemon.height()
behind = daemon_height - height
if behind > 0:
self.logger.info(f'catching up to daemon height {daemon_height:,d} '
f'({behind:,d} blocks behind)')
else:
self.logger.info(f'caught up to daemon height {daemon_height:,d}')
async def _prefetch_blocks(self):
"""Prefetch some blocks and put them on the queue.
Repeats until the queue is full or caught up.
"""
daemon = self.daemon
daemon_height = await daemon.height()
async with self.semaphore:
while self.cache_size < self.min_cache_size:
# Try and catch up all blocks but limit to room in cache.
# Constrain fetch count to between 0 and 500 regardless;
# testnet can be lumpy.
cache_room = self.min_cache_size // self.ave_size
count = min(daemon_height - self.fetched_height, cache_room)
count = min(500, max(count, 0))
if not count:
self.caught_up = True
return False
first = self.fetched_height + 1
hex_hashes = await daemon.block_hex_hashes(first, count)
if self.caught_up:
self.logger.info('new block height {:,d} hash {}'
.format(first + count-1, hex_hashes[-1]))
blocks = await daemon.raw_blocks(hex_hashes)
assert count == len(blocks)
# Special handling for genesis block
if first == 0:
blocks[0] = self.coin.genesis_block(blocks[0])
self.logger.info(f'verified genesis block with hash {hex_hashes[0]}')
# Update our recent average block size estimate
size = sum(len(block) for block in blocks)
if count >= 10:
self.ave_size = size // count
else:
self.ave_size = (size + (10 - count) * self.ave_size) // 10
self.blocks.extend(blocks)
self.cache_size += size
self.fetched_height += count
self.blocks_event.set()
self.refill_event.clear()
return True