diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index 827f654db..f988b15aa 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -92,7 +92,6 @@ class BlockProcessor: ) self._chain_executor = ThreadPoolExecutor(1, thread_name_prefix='block-processor') self._sync_reader_executor = ThreadPoolExecutor(1, thread_name_prefix='hub-es-sync') - self.mempool = MemPool(env.coin, daemon, db, self.state_lock) self.shutdown_event = asyncio.Event() self.coin = env.coin if env.coin.NET == 'mainnet': @@ -118,14 +117,13 @@ class BlockProcessor: self.utxo_cache: Dict[Tuple[bytes, int], Tuple[bytes, int]] = {} # Claimtrie cache - self.db_op_stack: Optional[RevertableOpStack] = None + self.db_op_stack: Optional['RevertableOpStack'] = None # self.search_cache = {} self.resolve_cache = LRUCache(2**16) self.resolve_outputs_cache = LRUCache(2 ** 16) self.history_cache = {} - self.status_server = StatusServer() ################################# # attributes used for calculating stake activations and takeovers per block @@ -162,7 +160,6 @@ class BlockProcessor: self.removed_claims_to_send_es = set() # cumulative changes across blocks to send ES self.touched_claims_to_send_es = set() - self.activation_info_to_send_es: DefaultDict[str, List[TrendingNotification]] = defaultdict(list) self.removed_claim_hashes: Set[bytes] = set() # per block changes self.touched_claim_hashes: Set[bytes] = set() @@ -251,29 +248,12 @@ class BlockProcessor: await self.run_in_thread(self.advance_block, block) await self.flush() - self.logger.info("advanced to %i in %0.3fs", self.height, time.perf_counter() - start) + self.logger.warning("writer advanced to %i in %0.3fs", self.height, time.perf_counter() - start) if self.height == self.coin.nExtendedClaimExpirationForkHeight: self.logger.warning( "applying extended claim expiration fork on claims accepted by, %i", self.height ) await self.run_in_thread_with_lock(self.db.apply_expiration_extension_fork) - if self.db.first_sync: - self.db.search_index.clear_caches() - self.touched_claims_to_send_es.clear() - self.removed_claims_to_send_es.clear() - self.activation_info_to_send_es.clear() - # TODO: we shouldnt wait on the search index updating before advancing to the next block - if not self.db.first_sync: - await self.db.reload_blocking_filtering_streams() - await self.db.search_index.claim_consumer(self.claim_producer()) - await self.db.search_index.apply_filters(self.db.blocked_streams, self.db.blocked_channels, - self.db.filtered_streams, self.db.filtered_channels) - await self.db.search_index.update_trending_score(self.activation_info_to_send_es) - await self._es_caught_up() - self.db.search_index.clear_caches() - self.touched_claims_to_send_es.clear() - self.removed_claims_to_send_es.clear() - self.activation_info_to_send_es.clear() # print("******************\n") except: self.logger.exception("advance blocks failed") @@ -281,12 +261,9 @@ class BlockProcessor: processed_time = time.perf_counter() - total_start self.block_count_metric.set(self.height) self.block_update_time_metric.observe(processed_time) - self.status_server.set_height(self.db.fs_height, self.db.db_tip) if not self.db.first_sync: s = '' if len(blocks) == 1 else 's' self.logger.info('processed {:,d} block{} in {:.1f}s'.format(len(blocks), s, processed_time)) - if self._caught_up_event.is_set(): - await self.mempool.on_block(self.touched_hashXs, self.height) self.touched_hashXs.clear() elif hprevs[0] != chain[0]: min_start_height = max(self.height - self.coin.REORG_LIMIT, 0) @@ -309,15 +286,6 @@ class BlockProcessor: if self.env.cache_all_claim_txos: await self.db._read_claim_txos() # TODO: don't do this - for touched in self.touched_claims_to_send_es: - if not self.db.get_claim_txo(touched): - self.removed_claims_to_send_es.add(touched) - self.touched_claims_to_send_es.difference_update(self.removed_claims_to_send_es) - await self.db.search_index.claim_consumer(self.claim_producer()) - self.db.search_index.clear_caches() - self.touched_claims_to_send_es.clear() - self.removed_claims_to_send_es.clear() - self.activation_info_to_send_es.clear() await self.prefetcher.reset_height(self.height) self.reorg_count_metric.inc() except: @@ -652,8 +620,6 @@ class BlockProcessor: self.support_txo_to_claim.pop(support_txo_to_clear) self.support_txos_by_claim[claim_hash].clear() self.support_txos_by_claim.pop(claim_hash) - if claim_hash.hex() in self.activation_info_to_send_es: - self.activation_info_to_send_es.pop(claim_hash.hex()) if normalized_name.startswith('@'): # abandon a channel, invalidate signatures self._invalidate_channel_signatures(claim_hash) @@ -1226,10 +1192,6 @@ class BlockProcessor: self.touched_claim_hashes.add(controlling.claim_hash) self.touched_claim_hashes.add(winning) - def _add_claim_activation_change_notification(self, claim_id: str, height: int, prev_amount: int, - new_amount: int): - self.activation_info_to_send_es[claim_id].append(TrendingNotification(height, prev_amount, new_amount)) - def _get_cumulative_update_ops(self, height: int): # update the last takeover height for names with takeovers for name in self.taken_over_names: @@ -1493,7 +1455,6 @@ class BlockProcessor: self.utxo_cache.clear() self.hashXs_by_tx.clear() self.history_cache.clear() - self.mempool.notified_mempool_txs.clear() self.removed_claim_hashes.clear() self.touched_claim_hashes.clear() self.pending_reposted.clear() @@ -1518,8 +1479,8 @@ class BlockProcessor: self.logger.info("backup block %i", self.height) # Check and update self.tip - self.db.headers.pop() self.db.tx_counts.pop() + reverted_block_hash = self.coin.header_hash(self.db.headers.pop()) self.tip = self.coin.header_hash(self.db.headers[-1]) if self.env.cache_all_tx_hashes: while len(self.db.total_transactions) > self.db.tx_counts[-1]: @@ -1613,21 +1574,32 @@ class BlockProcessor: self.touched_hashXs.add(hashX) return hashX - async def _process_prefetched_blocks(self): + async def process_blocks_and_mempool_forever(self): """Loop forever processing blocks as they arrive.""" while True: if self.height == self.daemon.cached_height(): if not self._caught_up_event.is_set(): await self._first_caught_up() self._caught_up_event.set() - await self.blocks_event.wait() + try: + await asyncio.wait_for(self.blocks_event.wait(), 0.25) + except asyncio.TimeoutError: + pass self.blocks_event.clear() blocks = self.prefetcher.get_prefetched_blocks() - try: - await self.check_and_advance_blocks(blocks) - except Exception: - self.logger.exception("error while processing txs") - raise + if not blocks: + try: + await self.check_mempool() + self.db.prefix_db.unsafe_commit() + except Exception: + self.logger.exception("error while updating mempool txs") + raise + else: + try: + await self.check_and_advance_blocks(blocks) + except Exception: + self.logger.exception("error while processing txs") + raise async def _es_caught_up(self): self.db.es_sync_height = self.height @@ -1659,6 +1631,13 @@ class BlockProcessor: f'height {self.height:,d}, halting here.') self.shutdown_event.set() + async def open(self): + self.db.open_db() + self.height = self.db.db_height + self.tip = self.db.db_tip + self.tx_count = self.db.db_tx_count + await self.db.initialize_caches() + async def fetch_and_process_blocks(self, caught_up_event): """Fetch, process and index blocks from the daemon. @@ -1674,16 +1653,9 @@ class BlockProcessor: self._caught_up_event = caught_up_event try: - self.db.open_db() - self.height = self.db.db_height - self.tip = self.db.db_tip - self.tx_count = self.db.db_tx_count - self.status_server.set_height(self.db.fs_height, self.db.db_tip) - await self.db.initialize_caches() - await self.db.search_index.start() await asyncio.wait([ self.prefetcher.main_loop(self.height), - self._process_prefetched_blocks() + self.process_blocks_and_mempool_forever() ]) except asyncio.CancelledError: raise @@ -1691,9 +1663,47 @@ class BlockProcessor: self.logger.exception("Block processing failed!") raise finally: - self.status_server.stop() # Shut down block processing self.logger.info('closing the DB for a clean shutdown...') self._sync_reader_executor.shutdown(wait=True) self._chain_executor.shutdown(wait=True) self.db.close() + + async def start(self): + env = self.env + min_str, max_str = env.coin.SESSIONCLS.protocol_min_max_strings() + self.logger.info(f'software version: {lbry.__version__}') + self.logger.info(f'supported protocol versions: {min_str}-{max_str}') + self.logger.info(f'event loop policy: {env.loop_policy}') + self.logger.info(f'reorg limit is {env.reorg_limit:,d} blocks') + + await self.daemon.height() + + def _start_cancellable(run, *args): + _flag = asyncio.Event() + self.cancellable_tasks.append(asyncio.ensure_future(run(*args, _flag))) + return _flag.wait() + + await _start_cancellable(self.fetch_and_process_blocks) + + async def stop(self): + for task in reversed(self.cancellable_tasks): + task.cancel() + await asyncio.wait(self.cancellable_tasks) + self.shutdown_event.set() + await self.daemon.close() + + def run(self): + loop = asyncio.get_event_loop() + + def __exit(): + raise SystemExit() + try: + loop.add_signal_handler(signal.SIGINT, __exit) + loop.add_signal_handler(signal.SIGTERM, __exit) + loop.run_until_complete(self.start()) + loop.run_until_complete(self.shutdown_event.wait()) + except (SystemExit, KeyboardInterrupt): + pass + finally: + loop.run_until_complete(self.stop()) diff --git a/lbry/wallet/server/chain_reader.py b/lbry/wallet/server/chain_reader.py new file mode 100644 index 000000000..5f4a31aad --- /dev/null +++ b/lbry/wallet/server/chain_reader.py @@ -0,0 +1,243 @@ +import signal +import logging +import asyncio +from concurrent.futures.thread import ThreadPoolExecutor +import typing + +import lbry +from lbry.wallet.server.mempool import MemPool +from lbry.wallet.server.db.prefixes import DBState +from lbry.wallet.server.udp import StatusServer +from lbry.wallet.server.db.db import HubDB +from lbry.wallet.server.db.elasticsearch.notifier import ElasticNotifierClientProtocol +from lbry.wallet.server.session import LBRYSessionManager +from lbry.prometheus import PrometheusServer + + +class BlockchainReader: + def __init__(self, env, secondary_name: str): + self.env = env + self.log = logging.getLogger(__name__).getChild(self.__class__.__name__) + self.shutdown_event = asyncio.Event() + self.cancellable_tasks = [] + + self.db = HubDB( + env.coin, env.db_dir, env.cache_MB, env.reorg_limit, env.cache_all_claim_txos, env.cache_all_tx_hashes, + secondary_name=secondary_name, max_open_files=-1 + ) + self.last_state: typing.Optional[DBState] = None + self._refresh_interval = 0.1 + + def _detect_changes(self): + try: + self.db.prefix_db.try_catch_up_with_primary() + except: + self.log.exception('failed to update secondary db') + raise + state = self.db.prefix_db.db_state.get() + if not state or state.height <= 0: + return + # if state and self.last_state and self.db.headers and self.last_state.tip == self.db.coin.header_hash(self.db.headers[-1]): + # return + if self.last_state and self.last_state.height > state.height: # FIXME: derp + self.log.debug("reorg detected, waiting until the writer has flushed the new blocks to advance") + return + last_height = 0 if not self.last_state else self.last_state.height + if self.last_state: + while True: + if self.db.headers[-1] == self.db.prefix_db.header.get(last_height, deserialize_value=False): + self.log.debug("connects to block %i", last_height) + break + else: + self.log.warning("disconnect block %i", last_height) + self.unwind() + last_height -= 1 + self.db.read_db_state() + if not self.last_state or self.last_state.height < state.height: + for height in range(last_height + 1, state.height + 1): + self.log.warning("advancing to %i", height) + self.advance(height) + self.clear_caches() + self.last_state = state + + # elif self.last_state and self.last_state.height > state.height: + # last_height = self.last_state.height + # for height in range(last_height, state.height, -1): + # self.log.warning("unwind %i", height) + # self.unwind() + # self.clear_caches() + # self.last_state = state + # self.log.warning("unwound to %i", self.last_state.height) + + # print("reader rewound to ", self.last_state.height) + + async def poll_for_changes(self): + await asyncio.get_event_loop().run_in_executor(None, self._detect_changes) + + async def refresh_blocks_forever(self, synchronized: asyncio.Event): + self.log.warning("start refresh blocks forever") + while True: + try: + await self.poll_for_changes() + except: + self.log.exception("boom") + raise + await asyncio.sleep(self._refresh_interval) + synchronized.set() + + def clear_caches(self): + pass + + def advance(self, height: int): + tx_count = self.db.prefix_db.tx_count.get(height).tx_count + assert tx_count not in self.db.tx_counts, f'boom {tx_count} in {len(self.db.tx_counts)} tx counts' + assert len(self.db.tx_counts) == height, f"{len(self.db.tx_counts)} != {height}" + self.db.tx_counts.append(tx_count) + self.db.headers.append(self.db.prefix_db.header.get(height, deserialize_value=False)) + + def unwind(self): + self.db.tx_counts.pop() + self.db.headers.pop() + + +class BlockchainReaderServer(BlockchainReader): + def __init__(self, env): + super().__init__(env, 'lbry-reader') + self.history_cache = {} + self.resolve_outputs_cache = {} + self.resolve_cache = {} + self.notifications_to_send = [] + self.status_server = StatusServer() + self.daemon = env.coin.DAEMON(env.coin, env.daemon_url) # only needed for broadcasting txs + self.prometheus_server: typing.Optional[PrometheusServer] = None + self.mempool = MemPool(self.env.coin, self.db) + self.session_manager = LBRYSessionManager( + env, self.db, self.mempool, self.history_cache, self.resolve_cache, + self.resolve_outputs_cache, self.daemon, + self.shutdown_event, + on_available_callback=self.status_server.set_available, + on_unavailable_callback=self.status_server.set_unavailable + ) + self.mempool.session_manager = self.session_manager + self.es_notifications = asyncio.Queue() + self.es_notification_client = ElasticNotifierClientProtocol(self.es_notifications) + self.synchronized = asyncio.Event() + self._es_height = None + + def clear_caches(self): + self.history_cache.clear() + self.resolve_outputs_cache.clear() + self.resolve_cache.clear() + # self.clear_search_cache() + # self.mempool.notified_mempool_txs.clear() + + def clear_search_cache(self): + self.session_manager.search_index.clear_caches() + + def advance(self, height: int): + super().advance(height) + touched_hashXs = self.db.prefix_db.touched_hashX.get(height).touched_hashXs + self.notifications_to_send.append((set(touched_hashXs), height)) + + def _detect_changes(self): + super()._detect_changes() + self.mempool.raw_mempool.clear() + self.mempool.raw_mempool.update( + {k.tx_hash: v.raw_tx for k, v in self.db.prefix_db.mempool_tx.iterate()} + ) + + async def poll_for_changes(self): + await super().poll_for_changes() + self.status_server.set_height(self.db.fs_height, self.db.db_tip) + if self.notifications_to_send: + for (touched, height) in self.notifications_to_send: + await self.mempool.on_block(touched, height) + self.log.warning("reader advanced to %i", height) + if self._es_height == self.db.db_height: + self.synchronized.set() + # print("reader notified") + await self.mempool.refresh_hashes(self.db.db_height) + self.notifications_to_send.clear() + + async def receive_es_notifications(self, synchronized: asyncio.Event): + await asyncio.get_event_loop().create_connection( + lambda: self.es_notification_client, '127.0.0.1', self.env.elastic_notifier_port + ) + synchronized.set() + try: + while True: + self._es_height = await self.es_notifications.get() + self.clear_search_cache() + if self._es_height == self.db.db_height: + self.synchronized.set() + self.log.warning("es and reader are in sync") + else: + self.log.warning("es and reader are not yet in sync %s vs %s", self._es_height, self.db.db_height) + finally: + self.es_notification_client.close() + + async def start(self): + env = self.env + min_str, max_str = env.coin.SESSIONCLS.protocol_min_max_strings() + self.log.info(f'software version: {lbry.__version__}') + self.log.info(f'supported protocol versions: {min_str}-{max_str}') + self.log.info(f'event loop policy: {env.loop_policy}') + self.log.info(f'reorg limit is {env.reorg_limit:,d} blocks') + await self.daemon.height() + + def _start_cancellable(run, *args): + _flag = asyncio.Event() + self.cancellable_tasks.append(asyncio.ensure_future(run(*args, _flag))) + return _flag.wait() + + self.db.open_db() + await self.db.initialize_caches() + + self.last_state = self.db.read_db_state() + + await self.start_prometheus() + if self.env.udp_port: + await self.status_server.start( + 0, bytes.fromhex(self.env.coin.GENESIS_HASH)[::-1], self.env.country, + self.env.host, self.env.udp_port, self.env.allow_lan_udp + ) + await _start_cancellable(self.receive_es_notifications) + await _start_cancellable(self.refresh_blocks_forever) + await self.session_manager.search_index.start() + await _start_cancellable(self.session_manager.serve, self.mempool) + + async def stop(self): + self.status_server.stop() + for task in reversed(self.cancellable_tasks): + task.cancel() + await asyncio.wait(self.cancellable_tasks) + self.session_manager.search_index.stop() + self.db.close() + if self.prometheus_server: + await self.prometheus_server.stop() + self.prometheus_server = None + self.shutdown_event.set() + await self.daemon.close() + + def run(self): + loop = asyncio.get_event_loop() + executor = ThreadPoolExecutor(self.env.max_query_workers, thread_name_prefix='hub-worker') + loop.set_default_executor(executor) + + def __exit(): + raise SystemExit() + try: + loop.add_signal_handler(signal.SIGINT, __exit) + loop.add_signal_handler(signal.SIGTERM, __exit) + loop.run_until_complete(self.start()) + loop.run_until_complete(self.shutdown_event.wait()) + except (SystemExit, KeyboardInterrupt): + pass + finally: + loop.run_until_complete(self.stop()) + executor.shutdown(True) + + async def start_prometheus(self): + if not self.prometheus_server and self.env.prometheus_port: + self.prometheus_server = PrometheusServer() + await self.prometheus_server.start("0.0.0.0", self.env.prometheus_port) diff --git a/lbry/wallet/server/coin.py b/lbry/wallet/server/coin.py index 059b95502..afe2a9423 100644 --- a/lbry/wallet/server/coin.py +++ b/lbry/wallet/server/coin.py @@ -13,7 +13,6 @@ from lbry.wallet.server.hash import Base58, hash160, double_sha256, hash_to_hex_ from lbry.wallet.server.daemon import Daemon, LBCDaemon from lbry.wallet.server.script import ScriptPubKey, OpCodes from lbry.wallet.server.session import LBRYElectrumX, LBRYSessionManager -from lbry.wallet.server.block_processor import BlockProcessor Block = namedtuple("Block", "raw header transactions") @@ -37,7 +36,6 @@ class Coin: SESSIONCLS = LBRYElectrumX DESERIALIZER = lib_tx.Deserializer DAEMON = Daemon - BLOCK_PROCESSOR = BlockProcessor SESSION_MANAGER = LBRYSessionManager HEADER_VALUES = [ 'version', 'prev_block_hash', 'merkle_root', 'timestamp', 'bits', 'nonce' diff --git a/lbry/wallet/server/mempool.py b/lbry/wallet/server/mempool.py index 27d7a2352..cecb9f299 100644 --- a/lbry/wallet/server/mempool.py +++ b/lbry/wallet/server/mempool.py @@ -11,14 +11,13 @@ import itertools import time import attr import typing -from typing import Set, Optional, Callable, Awaitable from collections import defaultdict from prometheus_client import Histogram -from lbry.wallet.server.hash import hash_to_hex_str, hex_str_to_hash -from lbry.wallet.server.util import class_logger, chunks -from lbry.wallet.server.leveldb import UTXO +from lbry.wallet.server.util import class_logger + if typing.TYPE_CHECKING: from lbry.wallet.server.session import LBRYSessionManager + from wallet.server.db.db import LevelDB @attr.s(slots=True) @@ -50,70 +49,99 @@ mempool_process_time_metric = Histogram( class MemPool: - def __init__(self, coin, daemon, db, state_lock: asyncio.Lock, refresh_secs=1.0, log_status_secs=120.0): + def __init__(self, coin, db: 'LevelDB', refresh_secs=1.0): self.coin = coin - self._daemon = daemon self._db = db - self._touched_mp = {} - self._touched_bp = {} - self._highest_block = -1 - self.logger = class_logger(__name__, self.__class__.__name__) self.txs = {} - self.hashXs = defaultdict(set) # None can be a key - self.cached_compact_histogram = [] + self.raw_mempool = {} + self.touched_hashXs: typing.DefaultDict[bytes, typing.Set[bytes]] = defaultdict(set) # None can be a key self.refresh_secs = refresh_secs - self.log_status_secs = log_status_secs - # Prevents mempool refreshes during fee histogram calculation - self.lock = state_lock - self.wakeup = asyncio.Event() self.mempool_process_time_metric = mempool_process_time_metric - self.notified_mempool_txs = set() - self.notify_sessions: Optional[Callable[[int, Set[bytes], Set[bytes]], Awaitable[None]]] = None + self.session_manager: typing.Optional['LBRYSessionManager'] = None - async def _logging(self, synchronized_event): - """Print regular logs of mempool stats.""" - self.logger.info('beginning processing of daemon mempool. ' - 'This can take some time...') + async def refresh_hashes(self, height: int): start = time.perf_counter() - await synchronized_event.wait() - elapsed = time.perf_counter() - start - self.logger.info(f'synced in {elapsed:.2f}s') - while True: - self.logger.info(f'{len(self.txs):,d} txs ' - f'touching {len(self.hashXs):,d} addresses') - await asyncio.sleep(self.log_status_secs) - await synchronized_event.wait() + new_touched = await self._process_mempool() + await self.on_mempool(set(self.touched_hashXs), new_touched, height) + duration = time.perf_counter() - start + self.mempool_process_time_metric.observe(duration) - def _accept_transactions(self, tx_map, utxo_map, touched): - """Accept transactions in tx_map to the mempool if all their inputs - can be found in the existing mempool or a utxo_map from the - DB. + async def _process_mempool(self) -> typing.Set[bytes]: # returns list of new touched hashXs + # Re-sync with the new set of hashes - Returns an (unprocessed tx_map, unspent utxo_map) pair. - """ - hashXs = self.hashXs - txs = self.txs + # hashXs = self.hashXs # hashX: [tx_hash, ...] + touched_hashXs = set() - deferred = {} - unspent = set(utxo_map) - # Try to find all prevouts so we can accept the TX - for hash, tx in tx_map.items(): - in_pairs = [] - try: - for prevout in tx.prevouts: - utxo = utxo_map.get(prevout) - if not utxo: - prev_hash, prev_index = prevout - # Raises KeyError if prev_hash is not in txs - utxo = txs[prev_hash].out_pairs[prev_index] - in_pairs.append(utxo) - except KeyError: - deferred[hash] = tx + # Remove txs that aren't in mempool anymore + for tx_hash in set(self.txs).difference(self.raw_mempool.keys()): + tx = self.txs.pop(tx_hash) + tx_hashXs = {hashX for hashX, value in tx.in_pairs}.union({hashX for hashX, value in tx.out_pairs}) + for hashX in tx_hashXs: + if hashX in self.touched_hashXs and tx_hash in self.touched_hashXs[hashX]: + self.touched_hashXs[hashX].remove(tx_hash) + if not self.touched_hashXs[hashX]: + self.touched_hashXs.pop(hashX) + touched_hashXs.update(tx_hashXs) + + tx_map = {} + for tx_hash, raw_tx in self.raw_mempool.items(): + if tx_hash in self.txs: continue + tx, tx_size = self.coin.DESERIALIZER(raw_tx).read_tx_and_vsize() + # Convert the inputs and outputs into (hashX, value) pairs + # Drop generation-like inputs from MemPoolTx.prevouts + txin_pairs = tuple((txin.prev_hash, txin.prev_idx) + for txin in tx.inputs + if not txin.is_generation()) + txout_pairs = tuple((self.coin.hashX_from_script(txout.pk_script), txout.value) + for txout in tx.outputs) - # Spend the prevouts - unspent.difference_update(tx.prevouts) + tx_map[tx_hash] = MemPoolTx(txin_pairs, None, txout_pairs, 0, tx_size, raw_tx) + + # Determine all prevouts not in the mempool, and fetch the + # UTXO information from the database. Failed prevout lookups + # return None - concurrent database updates happen - which is + # relied upon by _accept_transactions. Ignore prevouts that are + # generation-like. + # prevouts = tuple(prevout for tx in tx_map.values() + # for prevout in tx.prevouts + # if prevout[0] not in self.raw_mempool) + # utxos = await self._db.lookup_utxos(prevouts) + # utxo_map = dict(zip(prevouts, utxos)) + # unspent = set(utxo_map) + + for tx_hash, tx in tx_map.items(): + in_pairs = [] + for prevout in tx.prevouts: + # utxo = utxo_map.get(prevout) + # if not utxo: + prev_hash, prev_index = prevout + if prev_hash in self.txs: # accepted mempool + utxo = self.txs[prev_hash].out_pairs[prev_index] + elif prev_hash in tx_map: # this set of changes + utxo = tx_map[prev_hash].out_pairs[prev_index] + else: # get it from the db + prev_tx_num = self._db.prefix_db.tx_num.get(prev_hash) + if not prev_tx_num: + continue + prev_tx_num = prev_tx_num.tx_num + hashX_val = self._db.prefix_db.hashX_utxo.get(tx_hash[:4], prev_tx_num, prev_index) + if not hashX_val: + continue + hashX = hashX_val.hashX + utxo_value = self._db.prefix_db.utxo.get(hashX, prev_tx_num, prev_index) + utxo = (hashX, utxo_value.amount) + # if not prev_raw: + # print("derp", prev_hash[::-1].hex()) + # print(self._db.get_tx_num(prev_hash)) + # prev_tx, prev_tx_size = self.coin.DESERIALIZER(prev_raw.raw_tx).read_tx_and_vsize() + # prev_txo = prev_tx.outputs[prev_index] + # utxo = (self.coin.hashX_from_script(prev_txo.pk_script), prev_txo.value) + in_pairs.append(utxo) + + # # Spend the prevouts + # unspent.difference_update(tx.prevouts) # Save the in_pairs, compute the fee and accept the TX tx.in_pairs = tuple(in_pairs) @@ -121,198 +149,26 @@ class MemPool: # because some in_parts would be missing tx.fee = max(0, (sum(v for _, v in tx.in_pairs) - sum(v for _, v in tx.out_pairs))) - txs[hash] = tx + self.txs[tx_hash] = tx + # print(f"added {tx_hash[::-1].hex()} reader to mempool") for hashX, value in itertools.chain(tx.in_pairs, tx.out_pairs): - touched.add(hashX) - hashXs[hashX].add(hash) + self.touched_hashXs[hashX].add(tx_hash) + touched_hashXs.add(hashX) + # utxo_map = {prevout: utxo_map[prevout] for prevout in unspent} - return deferred, {prevout: utxo_map[prevout] for prevout in unspent} - - async def _mempool_loop(self, synchronized_event): - try: - return await self._refresh_hashes(synchronized_event) - except asyncio.CancelledError: - raise - except Exception as e: - self.logger.exception("MEMPOOL DIED") - raise e - - async def _refresh_hashes(self, synchronized_event): - """Refresh our view of the daemon's mempool.""" - while True: - start = time.perf_counter() - height = self._daemon.cached_height() - hex_hashes = await self._daemon.mempool_hashes() - if height != await self._daemon.height(): - continue - hashes = {hex_str_to_hash(hh) for hh in hex_hashes} - async with self.lock: - new_hashes = hashes.difference(self.notified_mempool_txs) - touched = await self._process_mempool(hashes) - self.notified_mempool_txs.update(new_hashes) - new_touched = { - touched_hashx for touched_hashx, txs in self.hashXs.items() if txs.intersection(new_hashes) - } - synchronized_event.set() - synchronized_event.clear() - await self.on_mempool(touched, new_touched, height) - duration = time.perf_counter() - start - self.mempool_process_time_metric.observe(duration) - try: - # we wait up to `refresh_secs` but go early if a broadcast happens (which triggers wakeup event) - await asyncio.wait_for(self.wakeup.wait(), timeout=self.refresh_secs) - except asyncio.TimeoutError: - pass - finally: - self.wakeup.clear() - - async def _process_mempool(self, all_hashes): - # Re-sync with the new set of hashes - txs = self.txs - - hashXs = self.hashXs # hashX: [tx_hash, ...] - touched = set() - - # First handle txs that have disappeared - for tx_hash in set(txs).difference(all_hashes): - tx = txs.pop(tx_hash) - tx_hashXs = {hashX for hashX, value in tx.in_pairs} - tx_hashXs.update(hashX for hashX, value in tx.out_pairs) - for hashX in tx_hashXs: - hashXs[hashX].remove(tx_hash) - if not hashXs[hashX]: - del hashXs[hashX] - touched.update(tx_hashXs) - - # Process new transactions - new_hashes = list(all_hashes.difference(txs)) - if new_hashes: - fetches = [] - for hashes in chunks(new_hashes, 200): - fetches.append(self._fetch_and_accept(hashes, all_hashes, touched)) - tx_map = {} - utxo_map = {} - for fetch in asyncio.as_completed(fetches): - deferred, unspent = await fetch - tx_map.update(deferred) - utxo_map.update(unspent) - - prior_count = 0 - # FIXME: this is not particularly efficient - while tx_map and len(tx_map) != prior_count: - prior_count = len(tx_map) - tx_map, utxo_map = self._accept_transactions(tx_map, utxo_map, touched) - - if tx_map: - self.logger.info(f'{len(tx_map)} txs dropped') - - return touched - - async def _fetch_and_accept(self, hashes, all_hashes, touched): - """Fetch a list of mempool transactions.""" - raw_txs = await self._daemon.getrawtransactions((hash_to_hex_str(hash) for hash in hashes)) - - to_hashX = self.coin.hashX_from_script - deserializer = self.coin.DESERIALIZER - - tx_map = {} - for hash, raw_tx in zip(hashes, raw_txs): - # The daemon may have evicted the tx from its - # mempool or it may have gotten in a block - if not raw_tx: - continue - tx, tx_size = deserializer(raw_tx).read_tx_and_vsize() - # Convert the inputs and outputs into (hashX, value) pairs - # Drop generation-like inputs from MemPoolTx.prevouts - txin_pairs = tuple((txin.prev_hash, txin.prev_idx) - for txin in tx.inputs - if not txin.is_generation()) - txout_pairs = tuple((to_hashX(txout.pk_script), txout.value) - for txout in tx.outputs) - tx_map[hash] = MemPoolTx(txin_pairs, None, txout_pairs, - 0, tx_size, raw_tx) - - # Determine all prevouts not in the mempool, and fetch the - # UTXO information from the database. Failed prevout lookups - # return None - concurrent database updates happen - which is - # relied upon by _accept_transactions. Ignore prevouts that are - # generation-like. - prevouts = tuple(prevout for tx in tx_map.values() - for prevout in tx.prevouts - if prevout[0] not in all_hashes) - utxos = await self._db.lookup_utxos(prevouts) - utxo_map = dict(zip(prevouts, utxos)) - - return self._accept_transactions(tx_map, utxo_map, touched) - - # - # External interface - # - - async def keep_synchronized(self, synchronized_event): - """Keep the mempool synchronized with the daemon.""" - await asyncio.wait([ - self._mempool_loop(synchronized_event), - # self._refresh_histogram(synchronized_event), - self._logging(synchronized_event) - ]) - - async def balance_delta(self, hashX): - """Return the unconfirmed amount in the mempool for hashX. - - Can be positive or negative. - """ - value = 0 - if hashX in self.hashXs: - for hash in self.hashXs[hashX]: - tx = self.txs[hash] - value -= sum(v for h168, v in tx.in_pairs if h168 == hashX) - value += sum(v for h168, v in tx.out_pairs if h168 == hashX) - return value - - def compact_fee_histogram(self): - """Return a compact fee histogram of the current mempool.""" - return self.cached_compact_histogram - - async def potential_spends(self, hashX): - """Return a set of (prev_hash, prev_idx) pairs from mempool - transactions that touch hashX. - - None, some or all of these may be spends of the hashX, but all - actual spends of it (in the DB or mempool) will be included. - """ - result = set() - for tx_hash in self.hashXs.get(hashX, ()): - tx = self.txs[tx_hash] - result.update(tx.prevouts) - return result + return touched_hashXs def transaction_summaries(self, hashX): """Return a list of MemPoolTxSummary objects for the hashX.""" result = [] - for tx_hash in self.hashXs.get(hashX, ()): + for tx_hash in self.touched_hashXs.get(hashX, ()): tx = self.txs[tx_hash] has_ui = any(hash in self.txs for hash, idx in tx.prevouts) result.append(MemPoolTxSummary(tx_hash, tx.fee, has_ui)) return result - async def unordered_UTXOs(self, hashX): - """Return an unordered list of UTXO named tuples from mempool - transactions that pay to hashX. - - This does not consider if any other mempool transactions spend - the outputs. - """ - utxos = [] - for tx_hash in self.hashXs.get(hashX, ()): - tx = self.txs.get(tx_hash) - for pos, (hX, value) in enumerate(tx.out_pairs): - if hX == hashX: - utxos.append(UTXO(-1, pos, tx_hash, 0, value)) - return utxos - - def get_mempool_height(self, tx_hash): + def get_mempool_height(self, tx_hash: bytes) -> int: # Height Progression # -2: not broadcast # -1: in mempool but has unconfirmed inputs @@ -321,41 +177,57 @@ class MemPool: if tx_hash not in self.txs: return -2 tx = self.txs[tx_hash] - unspent_inputs = sum(1 if hash in self.txs else 0 for hash, idx in tx.prevouts) + unspent_inputs = any(hash in self.raw_mempool for hash, idx in tx.prevouts) if unspent_inputs: return -1 return 0 - async def _maybe_notify(self, new_touched): - tmp, tbp = self._touched_mp, self._touched_bp - common = set(tmp).intersection(tbp) - if common: - height = max(common) - elif tmp and max(tmp) == self._highest_block: - height = self._highest_block - else: - # Either we are processing a block and waiting for it to - # come in, or we have not yet had a mempool update for the - # new block height - return - touched = tmp.pop(height) - for old in [h for h in tmp if h <= height]: - del tmp[old] - for old in [h for h in tbp if h <= height]: - touched.update(tbp.pop(old)) - # print("notify", height, len(touched), len(new_touched)) - await self.notify_sessions(height, touched, new_touched) - async def start(self, height, session_manager: 'LBRYSessionManager'): - self._highest_block = height self.notify_sessions = session_manager._notify_sessions - await self.notify_sessions(height, set(), set()) + await self._notify_sessions(height, set(), set()) async def on_mempool(self, touched, new_touched, height): - self._touched_mp[height] = touched - await self._maybe_notify(new_touched) + await self._notify_sessions(height, touched, new_touched) async def on_block(self, touched, height): - self._touched_bp[height] = touched - self._highest_block = height - await self._maybe_notify(set()) + await self._notify_sessions(height, touched, set()) + + async def _notify_sessions(self, height, touched, new_touched): + """Notify sessions about height changes and touched addresses.""" + height_changed = height != self.session_manager.notified_height + if height_changed: + await self.session_manager._refresh_hsub_results(height) + + if not self.session_manager.sessions: + return + + if height_changed: + header_tasks = [ + session.send_notification('blockchain.headers.subscribe', (self.session_manager.hsub_results[session.subscribe_headers_raw], )) + for session in self.session_manager.sessions.values() if session.subscribe_headers + ] + if header_tasks: + self.logger.warning(f'notify {len(header_tasks)} sessions of new header') + asyncio.create_task(asyncio.wait(header_tasks)) + for hashX in touched.intersection(self.session_manager.mempool_statuses.keys()): + self.session_manager.mempool_statuses.pop(hashX, None) + # self.bp._chain_executor + await asyncio.get_event_loop().run_in_executor( + None, touched.intersection_update, self.session_manager.hashx_subscriptions_by_session.keys() + ) + + if touched or new_touched or (height_changed and self.session_manager.mempool_statuses): + notified_hashxs = 0 + session_hashxes_to_notify = defaultdict(list) + to_notify = touched if height_changed else new_touched + + for hashX in to_notify: + if hashX not in self.session_manager.hashx_subscriptions_by_session: + continue + for session_id in self.session_manager.hashx_subscriptions_by_session[hashX]: + session_hashxes_to_notify[session_id].append(hashX) + notified_hashxs += 1 + for session_id, hashXes in session_hashxes_to_notify.items(): + asyncio.create_task(self.session_manager.sessions[session_id].send_history_notifications(*hashXes)) + if session_hashxes_to_notify: + self.logger.warning(f'notified {len(session_hashxes_to_notify)} sessions/{notified_hashxs:,d} touched addresses') diff --git a/lbry/wallet/server/server.py b/lbry/wallet/server/server.py deleted file mode 100644 index e53297541..000000000 --- a/lbry/wallet/server/server.py +++ /dev/null @@ -1,94 +0,0 @@ -import signal -import logging -import asyncio -from concurrent.futures.thread import ThreadPoolExecutor -import typing - -import lbry -from lbry.wallet.server.mempool import MemPool -from lbry.wallet.server.block_processor import BlockProcessor -from lbry.wallet.server.leveldb import LevelDB -from lbry.wallet.server.session import LBRYSessionManager -from lbry.prometheus import PrometheusServer - - -class Server: - - def __init__(self, env): - self.env = env - self.log = logging.getLogger(__name__).getChild(self.__class__.__name__) - self.shutdown_event = asyncio.Event() - self.cancellable_tasks = [] - - self.daemon = daemon = env.coin.DAEMON(env.coin, env.daemon_url) - self.db = db = LevelDB(env) - self.bp = bp = BlockProcessor(env, db, daemon, self.shutdown_event) - self.prometheus_server: typing.Optional[PrometheusServer] = None - - self.session_mgr = LBRYSessionManager( - env, db, bp.mempool, bp.history_cache, bp.resolve_cache, bp.resolve_outputs_cache, daemon, - self.shutdown_event, - on_available_callback=bp.status_server.set_available, - on_unavailable_callback=bp.status_server.set_unavailable - ) - self._indexer_task = None - - async def start(self): - env = self.env - min_str, max_str = env.coin.SESSIONCLS.protocol_min_max_strings() - self.log.info(f'software version: {lbry.__version__}') - self.log.info(f'supported protocol versions: {min_str}-{max_str}') - self.log.info(f'event loop policy: {env.loop_policy}') - self.log.info(f'reorg limit is {env.reorg_limit:,d} blocks') - - await self.daemon.height() - - def _start_cancellable(run, *args): - _flag = asyncio.Event() - self.cancellable_tasks.append(asyncio.ensure_future(run(*args, _flag))) - return _flag.wait() - - await self.start_prometheus() - if self.env.udp_port: - await self.bp.status_server.start( - 0, bytes.fromhex(self.bp.coin.GENESIS_HASH)[::-1], self.env.country, - self.env.host, self.env.udp_port, self.env.allow_lan_udp - ) - await _start_cancellable(self.bp.fetch_and_process_blocks) - - await self.db.populate_header_merkle_cache() - await _start_cancellable(self.bp.mempool.keep_synchronized) - await _start_cancellable(self.session_mgr.serve, self.bp.mempool) - - async def stop(self): - for task in reversed(self.cancellable_tasks): - task.cancel() - await asyncio.wait(self.cancellable_tasks) - if self.prometheus_server: - await self.prometheus_server.stop() - self.prometheus_server = None - self.shutdown_event.set() - await self.daemon.close() - - def run(self): - loop = asyncio.get_event_loop() - executor = ThreadPoolExecutor(self.env.max_query_workers, thread_name_prefix='hub-worker') - loop.set_default_executor(executor) - - def __exit(): - raise SystemExit() - try: - loop.add_signal_handler(signal.SIGINT, __exit) - loop.add_signal_handler(signal.SIGTERM, __exit) - loop.run_until_complete(self.start()) - loop.run_until_complete(self.shutdown_event.wait()) - except (SystemExit, KeyboardInterrupt): - pass - finally: - loop.run_until_complete(self.stop()) - executor.shutdown(True) - - async def start_prometheus(self): - if not self.prometheus_server and self.env.prometheus_port: - self.prometheus_server = PrometheusServer() - await self.prometheus_server.start("0.0.0.0", self.env.prometheus_port) diff --git a/lbry/wallet/server/session.py b/lbry/wallet/server/session.py index 4ff192dc5..0318e932c 100644 --- a/lbry/wallet/server/session.py +++ b/lbry/wallet/server/session.py @@ -20,8 +20,7 @@ import lbry from lbry.error import ResolveCensoredError, TooManyClaimSearchParametersError from lbry.build_info import BUILD, COMMIT_HASH, DOCKER_TAG from lbry.schema.result import Outputs -from lbry.wallet.server.block_processor import BlockProcessor -from lbry.wallet.server.leveldb import LevelDB +from lbry.wallet.server.db.db import HubDB from lbry.wallet.server.websocket import AdminWebSocket from lbry.wallet.rpc.framing import NewlineFramer @@ -34,9 +33,12 @@ from lbry.wallet.rpc import ( from lbry.wallet.server import util from lbry.wallet.server.hash import sha256, hash_to_hex_str, hex_str_to_hash, HASHX_LEN, Base58Error from lbry.wallet.server.daemon import DaemonError +from lbry.wallet.server.db.elasticsearch import SearchIndex + if typing.TYPE_CHECKING: from lbry.wallet.server.env import Env from lbry.wallet.server.daemon import Daemon + from lbry.wallet.server.mempool import MemPool BAD_REQUEST = 1 DAEMON_ERROR = 2 @@ -170,7 +172,7 @@ class SessionManager: namespace=NAMESPACE, buckets=HISTOGRAM_BUCKETS ) - def __init__(self, env: 'Env', db: LevelDB, mempool, history_cache, resolve_cache, resolve_outputs_cache, + def __init__(self, env: 'Env', db: HubDB, mempool: 'MemPool', history_cache, resolve_cache, resolve_outputs_cache, daemon: 'Daemon', shutdown_event: asyncio.Event, on_available_callback: typing.Callable[[], None], on_unavailable_callback: typing.Callable[[], None]): env.max_send = max(350000, env.max_send) @@ -183,7 +185,7 @@ class SessionManager: self.shutdown_event = shutdown_event self.logger = util.class_logger(__name__, self.__class__.__name__) self.servers: typing.Dict[str, asyncio.AbstractServer] = {} - self.sessions: typing.Dict[int, 'SessionBase'] = {} + self.sessions: typing.Dict[int, 'LBRYElectrumX'] = {} self.hashx_subscriptions_by_session: typing.DefaultDict[str, typing.Set[int]] = defaultdict(set) self.mempool_statuses = {} self.cur_group = SessionGroup(0) @@ -198,8 +200,15 @@ class SessionManager: self.session_event = Event() + # Search index + self.search_index = SearchIndex( + self.env.es_index_prefix, self.env.database_query_timeout, + elastic_host=env.elastic_host, elastic_port=env.elastic_port + ) + async def _start_server(self, kind, *args, **kw_args): loop = asyncio.get_event_loop() + if kind == 'RPC': protocol_class = LocalRPC else: @@ -578,7 +587,7 @@ class SessionManager: async def raw_header(self, height): """Return the binary header at the given height.""" try: - return self.db.raw_header(height) + return await self.db.raw_header(height) except IndexError: raise RPCError(BAD_REQUEST, f'height {height:,d} ' 'out of range') from None @@ -686,12 +695,12 @@ class SessionBase(RPCSession): request_handlers: typing.Dict[str, typing.Callable] = {} version = '0.5.7' - def __init__(self, session_mgr, db, mempool, kind): + def __init__(self, session_manager: 'LBRYSessionManager', db: 'LevelDB', mempool: 'MemPool', kind: str): connection = JSONRPCConnection(JSONRPCAutoDetect) - self.env = session_mgr.env + self.env = session_manager.env super().__init__(connection=connection) self.logger = util.class_logger(__name__, self.__class__.__name__) - self.session_mgr = session_mgr + self.session_manager = session_manager self.db = db self.mempool = mempool self.kind = kind # 'RPC', 'TCP' etc. @@ -699,7 +708,7 @@ class SessionBase(RPCSession): self.anon_logs = self.env.anon_logs self.txs_sent = 0 self.log_me = False - self.daemon_request = self.session_mgr.daemon_request + self.daemon_request = self.session_manager.daemon_request # Hijack the connection so we can log messages self._receive_message_orig = self.connection.receive_message self.connection.receive_message = self.receive_message @@ -729,17 +738,17 @@ class SessionBase(RPCSession): self.session_id = next(self.session_counter) context = {'conn_id': f'{self.session_id}'} self.logger = util.ConnectionLogger(self.logger, context) - self.group = self.session_mgr.add_session(self) - self.session_mgr.session_count_metric.labels(version=self.client_version).inc() + self.group = self.session_manager.add_session(self) + self.session_manager.session_count_metric.labels(version=self.client_version).inc() peer_addr_str = self.peer_address_str() self.logger.info(f'{self.kind} {peer_addr_str}, ' - f'{self.session_mgr.session_count():,d} total') + f'{self.session_manager.session_count():,d} total') def connection_lost(self, exc): """Handle client disconnection.""" super().connection_lost(exc) - self.session_mgr.remove_session(self) - self.session_mgr.session_count_metric.labels(version=self.client_version).dec() + self.session_manager.remove_session(self) + self.session_manager.session_count_metric.labels(version=self.client_version).dec() msg = '' if not self._can_send.is_set(): msg += ' whilst paused' @@ -763,7 +772,7 @@ class SessionBase(RPCSession): """Handle an incoming request. ElectrumX doesn't receive notifications from client sessions. """ - self.session_mgr.request_count_metric.labels(method=request.method, version=self.client_version).inc() + self.session_manager.request_count_metric.labels(method=request.method, version=self.client_version).inc() if isinstance(request, Request): handler = self.request_handlers.get(request.method) handler = partial(handler, self) @@ -811,7 +820,7 @@ class LBRYElectrumX(SessionBase): PROTOCOL_MIN = VERSION.PROTOCOL_MIN PROTOCOL_MAX = VERSION.PROTOCOL_MAX max_errors = math.inf # don't disconnect people for errors! let them happen... - session_mgr: LBRYSessionManager + session_manager: LBRYSessionManager version = lbry.__version__ cached_server_features = {} @@ -822,17 +831,17 @@ class LBRYElectrumX(SessionBase): 'blockchain.block.get_header': cls.block_get_header, 'blockchain.estimatefee': cls.estimatefee, 'blockchain.relayfee': cls.relayfee, - 'blockchain.scripthash.get_balance': cls.scripthash_get_balance, + # 'blockchain.scripthash.get_balance': cls.scripthash_get_balance, 'blockchain.scripthash.get_history': cls.scripthash_get_history, 'blockchain.scripthash.get_mempool': cls.scripthash_get_mempool, - 'blockchain.scripthash.listunspent': cls.scripthash_listunspent, + # 'blockchain.scripthash.listunspent': cls.scripthash_listunspent, 'blockchain.scripthash.subscribe': cls.scripthash_subscribe, 'blockchain.transaction.broadcast': cls.transaction_broadcast, 'blockchain.transaction.get': cls.transaction_get, 'blockchain.transaction.get_batch': cls.transaction_get_batch, 'blockchain.transaction.info': cls.transaction_info, 'blockchain.transaction.get_merkle': cls.transaction_merkle, - 'server.add_peer': cls.add_peer, + # 'server.add_peer': cls.add_peer, 'server.banner': cls.banner, 'server.payment_address': cls.payment_address, 'server.donation_address': cls.donation_address, @@ -849,10 +858,10 @@ class LBRYElectrumX(SessionBase): 'blockchain.block.headers': cls.block_headers, 'server.ping': cls.ping, 'blockchain.headers.subscribe': cls.headers_subscribe_False, - 'blockchain.address.get_balance': cls.address_get_balance, + # 'blockchain.address.get_balance': cls.address_get_balance, 'blockchain.address.get_history': cls.address_get_history, 'blockchain.address.get_mempool': cls.address_get_mempool, - 'blockchain.address.listunspent': cls.address_listunspent, + # 'blockchain.address.listunspent': cls.address_listunspent, 'blockchain.address.subscribe': cls.address_subscribe, 'blockchain.address.unsubscribe': cls.address_unsubscribe, }) @@ -871,8 +880,8 @@ class LBRYElectrumX(SessionBase): self.sv_seen = False self.protocol_tuple = self.PROTOCOL_MIN self.protocol_string = None - self.daemon = self.session_mgr.daemon - self.db: LevelDB = self.session_mgr.db + self.daemon = self.session_manager.daemon + self.db: LevelDB = self.session_manager.db @classmethod def protocol_min_max_strings(cls): @@ -921,7 +930,7 @@ class LBRYElectrumX(SessionBase): else: method = 'blockchain.address.subscribe' start = time.perf_counter() - db_history = await self.session_mgr.limited_history(hashX) + db_history = await self.session_manager.limited_history(hashX) mempool = self.mempool.transaction_summaries(hashX) status = ''.join(f'{hash_to_hex_str(tx_hash)}:' @@ -935,24 +944,25 @@ class LBRYElectrumX(SessionBase): else: status = None if mempool: - self.session_mgr.mempool_statuses[hashX] = status + self.session_manager.mempool_statuses[hashX] = status else: - self.session_mgr.mempool_statuses.pop(hashX, None) + self.session_manager.mempool_statuses.pop(hashX, None) - self.session_mgr.address_history_metric.observe(time.perf_counter() - start) + self.session_manager.address_history_metric.observe(time.perf_counter() - start) notifications.append((method, (alias, status))) + print(f"notify {alias} {method}") start = time.perf_counter() - self.session_mgr.notifications_in_flight_metric.inc() + self.session_manager.notifications_in_flight_metric.inc() for method, args in notifications: self.NOTIFICATION_COUNT.labels(method=method, version=self.client_version).inc() try: await self.send_notifications( Batch([Notification(method, (alias, status)) for (method, (alias, status)) in notifications]) ) - self.session_mgr.notifications_sent_metric.observe(time.perf_counter() - start) + self.session_manager.notifications_sent_metric.observe(time.perf_counter() - start) finally: - self.session_mgr.notifications_in_flight_metric.dec() + self.session_manager.notifications_in_flight_metric.dec() # def get_metrics_or_placeholder_for_api(self, query_name): # """ Do not hold on to a reference to the metrics @@ -960,7 +970,7 @@ class LBRYElectrumX(SessionBase): # you may be working with a stale metrics object. # """ # if self.env.track_metrics: - # # return self.session_mgr.metrics.for_api(query_name) + # # return self.session_manager.metrics.for_api(query_name) # else: # return APICallMetrics(query_name) @@ -970,17 +980,17 @@ class LBRYElectrumX(SessionBase): # if isinstance(kwargs, dict): # kwargs['release_time'] = format_release_time(kwargs.get('release_time')) # try: - # self.session_mgr.pending_query_metric.inc() + # self.session_manager.pending_query_metric.inc() # return await self.db.search_index.session_query(query_name, kwargs) # except ConnectionTimeout: - # self.session_mgr.interrupt_count_metric.inc() + # self.session_manager.interrupt_count_metric.inc() # raise RPCError(JSONRPC.QUERY_TIMEOUT, 'query timed out') # finally: - # self.session_mgr.pending_query_metric.dec() - # self.session_mgr.executor_time_metric.observe(time.perf_counter() - start) + # self.session_manager.pending_query_metric.dec() + # self.session_manager.executor_time_metric.observe(time.perf_counter() - start) async def mempool_compact_histogram(self): - return self.mempool.compact_fee_histogram() + return [] #self.mempool.compact_fee_histogram() async def claimtrie_search(self, **kwargs): start = time.perf_counter() @@ -992,16 +1002,16 @@ class LBRYElectrumX(SessionBase): except ValueError: pass try: - self.session_mgr.pending_query_metric.inc() + self.session_manager.pending_query_metric.inc() if 'channel' in kwargs: channel_url = kwargs.pop('channel') _, channel_claim, _, _ = await self.db.resolve(channel_url) if not channel_claim or isinstance(channel_claim, (ResolveCensoredError, LookupError, ValueError)): return Outputs.to_base64([], [], 0, None, None) kwargs['channel_id'] = channel_claim.claim_hash.hex() - return await self.db.search_index.cached_search(kwargs) + return await self.session_manager.search_index.cached_search(kwargs) except ConnectionTimeout: - self.session_mgr.interrupt_count_metric.inc() + self.session_manager.interrupt_count_metric.inc() raise RPCError(JSONRPC.QUERY_TIMEOUT, 'query timed out') except TooManyClaimSearchParametersError as err: await asyncio.sleep(2) @@ -1009,25 +1019,25 @@ class LBRYElectrumX(SessionBase): self.peer_address()[0], err.key, err.limit) return RPCError(1, str(err)) finally: - self.session_mgr.pending_query_metric.dec() - self.session_mgr.executor_time_metric.observe(time.perf_counter() - start) + self.session_manager.pending_query_metric.dec() + self.session_manager.executor_time_metric.observe(time.perf_counter() - start) async def _cached_resolve_url(self, url): - if url not in self.session_mgr.resolve_cache: - self.session_mgr.resolve_cache[url] = await self.loop.run_in_executor(None, self.db._resolve, url) - return self.session_mgr.resolve_cache[url] + if url not in self.session_manager.resolve_cache: + self.session_manager.resolve_cache[url] = await self.loop.run_in_executor(None, self.db._resolve, url) + return self.session_manager.resolve_cache[url] async def claimtrie_resolve(self, *urls) -> str: sorted_urls = tuple(sorted(urls)) - self.session_mgr.urls_to_resolve_count_metric.inc(len(sorted_urls)) + self.session_manager.urls_to_resolve_count_metric.inc(len(sorted_urls)) try: - if sorted_urls in self.session_mgr.resolve_outputs_cache: - return self.session_mgr.resolve_outputs_cache[sorted_urls] + if sorted_urls in self.session_manager.resolve_outputs_cache: + return self.session_manager.resolve_outputs_cache[sorted_urls] rows, extra = [], [] for url in urls: - if url not in self.session_mgr.resolve_cache: - self.session_mgr.resolve_cache[url] = await self._cached_resolve_url(url) - stream, channel, repost, reposted_channel = self.session_mgr.resolve_cache[url] + if url not in self.session_manager.resolve_cache: + self.session_manager.resolve_cache[url] = await self._cached_resolve_url(url) + stream, channel, repost, reposted_channel = self.session_manager.resolve_cache[url] if isinstance(channel, ResolveCensoredError): rows.append(channel) extra.append(channel.censor_row) @@ -1052,12 +1062,12 @@ class LBRYElectrumX(SessionBase): if reposted_channel: extra.append(reposted_channel) await asyncio.sleep(0) - self.session_mgr.resolve_outputs_cache[sorted_urls] = result = await self.loop.run_in_executor( + self.session_manager.resolve_outputs_cache[sorted_urls] = result = await self.loop.run_in_executor( None, Outputs.to_base64, rows, extra, 0, None, None ) return result finally: - self.session_mgr.resolved_url_count_metric.inc(len(sorted_urls)) + self.session_manager.resolved_url_count_metric.inc(len(sorted_urls)) async def get_server_height(self): return self.db.db_height @@ -1093,7 +1103,7 @@ class LBRYElectrumX(SessionBase): async def subscribe_headers_result(self): """The result of a header subscription or notification.""" - return self.session_mgr.hsub_results[self.subscribe_headers_raw] + return self.session_manager.hsub_results[self.subscribe_headers_raw] async def _headers_subscribe(self, raw): """Subscribe to get headers of new blocks.""" @@ -1130,7 +1140,7 @@ class LBRYElectrumX(SessionBase): # Note history is ordered and mempool unordered in electrum-server # For mempool, height is -1 if it has unconfirmed inputs, otherwise 0 - db_history = await self.session_mgr.limited_history(hashX) + db_history = await self.session_manager.limited_history(hashX) mempool = self.mempool.transaction_summaries(hashX) status = ''.join(f'{hash_to_hex_str(tx_hash)}:' @@ -1145,32 +1155,32 @@ class LBRYElectrumX(SessionBase): status = None if mempool: - self.session_mgr.mempool_statuses[hashX] = status + self.session_manager.mempool_statuses[hashX] = status else: - self.session_mgr.mempool_statuses.pop(hashX, None) + self.session_manager.mempool_statuses.pop(hashX, None) return status - async def hashX_listunspent(self, hashX): - """Return the list of UTXOs of a script hash, including mempool - effects.""" - utxos = await self.db.all_utxos(hashX) - utxos = sorted(utxos) - utxos.extend(await self.mempool.unordered_UTXOs(hashX)) - spends = await self.mempool.potential_spends(hashX) - - return [{'tx_hash': hash_to_hex_str(utxo.tx_hash), - 'tx_pos': utxo.tx_pos, - 'height': utxo.height, 'value': utxo.value} - for utxo in utxos - if (utxo.tx_hash, utxo.tx_pos) not in spends] + # async def hashX_listunspent(self, hashX): + # """Return the list of UTXOs of a script hash, including mempool + # effects.""" + # utxos = await self.db.all_utxos(hashX) + # utxos = sorted(utxos) + # utxos.extend(await self.mempool.unordered_UTXOs(hashX)) + # spends = await self.mempool.potential_spends(hashX) + # + # return [{'tx_hash': hash_to_hex_str(utxo.tx_hash), + # 'tx_pos': utxo.tx_pos, + # 'height': utxo.height, 'value': utxo.value} + # for utxo in utxos + # if (utxo.tx_hash, utxo.tx_pos) not in spends] async def hashX_subscribe(self, hashX, alias): self.hashX_subs[hashX] = alias - self.session_mgr.hashx_subscriptions_by_session[hashX].add(id(self)) + self.session_manager.hashx_subscriptions_by_session[hashX].add(id(self)) return await self.address_status(hashX) async def hashX_unsubscribe(self, hashX, alias): - sessions = self.session_mgr.hashx_subscriptions_by_session[hashX] + sessions = self.session_manager.hashx_subscriptions_by_session[hashX] sessions.remove(id(self)) if not sessions: self.hashX_subs.pop(hashX, None) @@ -1182,10 +1192,10 @@ class LBRYElectrumX(SessionBase): pass raise RPCError(BAD_REQUEST, f'{address} is not a valid address') - async def address_get_balance(self, address): - """Return the confirmed and unconfirmed balance of an address.""" - hashX = self.address_to_hashX(address) - return await self.get_balance(hashX) + # async def address_get_balance(self, address): + # """Return the confirmed and unconfirmed balance of an address.""" + # hashX = self.address_to_hashX(address) + # return await self.get_balance(hashX) async def address_get_history(self, address): """Return the confirmed and unconfirmed history of an address.""" @@ -1197,10 +1207,10 @@ class LBRYElectrumX(SessionBase): hashX = self.address_to_hashX(address) return self.unconfirmed_history(hashX) - async def address_listunspent(self, address): - """Return the list of UTXOs of an address.""" - hashX = self.address_to_hashX(address) - return await self.hashX_listunspent(hashX) + # async def address_listunspent(self, address): + # """Return the list of UTXOs of an address.""" + # hashX = self.address_to_hashX(address) + # return await self.hashX_listunspent(hashX) async def address_subscribe(self, *addresses): """Subscribe to an address. @@ -1221,16 +1231,16 @@ class LBRYElectrumX(SessionBase): hashX = self.address_to_hashX(address) return await self.hashX_unsubscribe(hashX, address) - async def get_balance(self, hashX): - utxos = await self.db.all_utxos(hashX) - confirmed = sum(utxo.value for utxo in utxos) - unconfirmed = await self.mempool.balance_delta(hashX) - return {'confirmed': confirmed, 'unconfirmed': unconfirmed} + # async def get_balance(self, hashX): + # utxos = await self.db.all_utxos(hashX) + # confirmed = sum(utxo.value for utxo in utxos) + # unconfirmed = await self.mempool.balance_delta(hashX) + # return {'confirmed': confirmed, 'unconfirmed': unconfirmed} - async def scripthash_get_balance(self, scripthash): - """Return the confirmed and unconfirmed balance of a scripthash.""" - hashX = scripthash_to_hashX(scripthash) - return await self.get_balance(hashX) + # async def scripthash_get_balance(self, scripthash): + # """Return the confirmed and unconfirmed balance of a scripthash.""" + # hashX = scripthash_to_hashX(scripthash) + # return await self.get_balance(hashX) def unconfirmed_history(self, hashX): # Note unconfirmed history is unordered in electrum-server @@ -1242,7 +1252,7 @@ class LBRYElectrumX(SessionBase): async def confirmed_and_unconfirmed_history(self, hashX): # Note history is ordered but unconfirmed is unordered in e-s - history = await self.session_mgr.limited_history(hashX) + history = await self.session_manager.limited_history(hashX) conf = [{'tx_hash': hash_to_hex_str(tx_hash), 'height': height} for tx_hash, height in history] return conf + self.unconfirmed_history(hashX) @@ -1257,10 +1267,10 @@ class LBRYElectrumX(SessionBase): hashX = scripthash_to_hashX(scripthash) return self.unconfirmed_history(hashX) - async def scripthash_listunspent(self, scripthash): - """Return the list of UTXOs of a scripthash.""" - hashX = scripthash_to_hashX(scripthash) - return await self.hashX_listunspent(hashX) + # async def scripthash_listunspent(self, scripthash): + # """Return the list of UTXOs of a scripthash.""" + # hashX = scripthash_to_hashX(scripthash) + # return await self.hashX_listunspent(hashX) async def scripthash_subscribe(self, scripthash): """Subscribe to a script hash. @@ -1295,7 +1305,7 @@ class LBRYElectrumX(SessionBase): max_size = self.MAX_CHUNK_SIZE count = min(count, max_size) - headers, count = self.db.read_headers(start_height, count) + headers, count = await self.db.read_headers(start_height, count) if b64: headers = self.db.encode_headers(start_height, count, headers) @@ -1318,7 +1328,7 @@ class LBRYElectrumX(SessionBase): index = non_negative_integer(index) size = self.coin.CHUNK_SIZE start_height = index * size - headers, _ = self.db.read_headers(start_height, size) + headers, _ = await self.db.read_headers(start_height, size) return headers.hex() async def block_get_header(self, height): @@ -1326,7 +1336,7 @@ class LBRYElectrumX(SessionBase): height: the header's height""" height = non_negative_integer(height) - return await self.session_mgr.electrum_header(height) + return await self.session_manager.electrum_header(height) def is_tor(self): """Try to detect if the connection is to a tor hidden service we are @@ -1416,10 +1426,10 @@ class LBRYElectrumX(SessionBase): self.close_after_send = True raise RPCError(BAD_REQUEST, f'unsupported client: {client_name}') if self.client_version != client_name[:17]: - self.session_mgr.session_count_metric.labels(version=self.client_version).dec() + self.session_manager.session_count_metric.labels(version=self.client_version).dec() self.client_version = client_name[:17] - self.session_mgr.session_count_metric.labels(version=self.client_version).inc() - self.session_mgr.client_version_metric.labels(version=self.client_version).inc() + self.session_manager.session_count_metric.labels(version=self.client_version).inc() + self.session_manager.client_version_metric.labels(version=self.client_version).inc() # Find the highest common protocol version. Disconnect if # that protocol version in unsupported. @@ -1440,9 +1450,10 @@ class LBRYElectrumX(SessionBase): raw_tx: the raw transaction as a hexadecimal string""" # This returns errors as JSON RPC errors, as is natural try: - hex_hash = await self.session_mgr.broadcast_transaction(raw_tx) + hex_hash = await self.session_manager.broadcast_transaction(raw_tx) self.txs_sent += 1 - self.mempool.wakeup.set() + # self.mempool.wakeup.set() + # await asyncio.sleep(0.5) self.logger.info(f'sent tx: {hex_hash}') return hex_hash except DaemonError as e: @@ -1456,7 +1467,7 @@ class LBRYElectrumX(SessionBase): return (await self.transaction_get_batch(tx_hash))[tx_hash] async def transaction_get_batch(self, *tx_hashes): - self.session_mgr.tx_request_count_metric.inc(len(tx_hashes)) + self.session_manager.tx_request_count_metric.inc(len(tx_hashes)) if len(tx_hashes) > 100: raise RPCError(BAD_REQUEST, f'too many tx hashes in request: {len(tx_hashes)}') for tx_hash in tx_hashes: @@ -1495,8 +1506,11 @@ class LBRYElectrumX(SessionBase): 'block_height': block_height } await asyncio.sleep(0) # heavy call, give other tasks a chance + print("return tx batch") + for tx_hash, (_, info) in batch_result.items(): + print(tx_hash, info['block_height']) - self.session_mgr.tx_replied_count_metric.inc(len(tx_hashes)) + self.session_manager.tx_replied_count_metric.inc(len(tx_hashes)) return batch_result async def transaction_get(self, tx_hash, verbose=False):