add chain reader and reader server, new mempool, update block processor

This commit is contained in:
Jack Robison 2022-01-06 12:39:24 -05:00
parent d3da442727
commit 499ee74dfc
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
6 changed files with 563 additions and 520 deletions

View file

@ -92,7 +92,6 @@ class BlockProcessor:
) )
self._chain_executor = ThreadPoolExecutor(1, thread_name_prefix='block-processor') self._chain_executor = ThreadPoolExecutor(1, thread_name_prefix='block-processor')
self._sync_reader_executor = ThreadPoolExecutor(1, thread_name_prefix='hub-es-sync') self._sync_reader_executor = ThreadPoolExecutor(1, thread_name_prefix='hub-es-sync')
self.mempool = MemPool(env.coin, daemon, db, self.state_lock)
self.shutdown_event = asyncio.Event() self.shutdown_event = asyncio.Event()
self.coin = env.coin self.coin = env.coin
if env.coin.NET == 'mainnet': if env.coin.NET == 'mainnet':
@ -118,14 +117,13 @@ class BlockProcessor:
self.utxo_cache: Dict[Tuple[bytes, int], Tuple[bytes, int]] = {} self.utxo_cache: Dict[Tuple[bytes, int], Tuple[bytes, int]] = {}
# Claimtrie cache # Claimtrie cache
self.db_op_stack: Optional[RevertableOpStack] = None self.db_op_stack: Optional['RevertableOpStack'] = None
# self.search_cache = {} # self.search_cache = {}
self.resolve_cache = LRUCache(2**16) self.resolve_cache = LRUCache(2**16)
self.resolve_outputs_cache = LRUCache(2 ** 16) self.resolve_outputs_cache = LRUCache(2 ** 16)
self.history_cache = {} self.history_cache = {}
self.status_server = StatusServer()
################################# #################################
# attributes used for calculating stake activations and takeovers per block # attributes used for calculating stake activations and takeovers per block
@ -162,7 +160,6 @@ class BlockProcessor:
self.removed_claims_to_send_es = set() # cumulative changes across blocks to send ES self.removed_claims_to_send_es = set() # cumulative changes across blocks to send ES
self.touched_claims_to_send_es = set() self.touched_claims_to_send_es = set()
self.activation_info_to_send_es: DefaultDict[str, List[TrendingNotification]] = defaultdict(list)
self.removed_claim_hashes: Set[bytes] = set() # per block changes self.removed_claim_hashes: Set[bytes] = set() # per block changes
self.touched_claim_hashes: Set[bytes] = set() self.touched_claim_hashes: Set[bytes] = set()
@ -251,29 +248,12 @@ class BlockProcessor:
await self.run_in_thread(self.advance_block, block) await self.run_in_thread(self.advance_block, block)
await self.flush() await self.flush()
self.logger.info("advanced to %i in %0.3fs", self.height, time.perf_counter() - start) self.logger.warning("writer advanced to %i in %0.3fs", self.height, time.perf_counter() - start)
if self.height == self.coin.nExtendedClaimExpirationForkHeight: if self.height == self.coin.nExtendedClaimExpirationForkHeight:
self.logger.warning( self.logger.warning(
"applying extended claim expiration fork on claims accepted by, %i", self.height "applying extended claim expiration fork on claims accepted by, %i", self.height
) )
await self.run_in_thread_with_lock(self.db.apply_expiration_extension_fork) await self.run_in_thread_with_lock(self.db.apply_expiration_extension_fork)
if self.db.first_sync:
self.db.search_index.clear_caches()
self.touched_claims_to_send_es.clear()
self.removed_claims_to_send_es.clear()
self.activation_info_to_send_es.clear()
# TODO: we shouldnt wait on the search index updating before advancing to the next block
if not self.db.first_sync:
await self.db.reload_blocking_filtering_streams()
await self.db.search_index.claim_consumer(self.claim_producer())
await self.db.search_index.apply_filters(self.db.blocked_streams, self.db.blocked_channels,
self.db.filtered_streams, self.db.filtered_channels)
await self.db.search_index.update_trending_score(self.activation_info_to_send_es)
await self._es_caught_up()
self.db.search_index.clear_caches()
self.touched_claims_to_send_es.clear()
self.removed_claims_to_send_es.clear()
self.activation_info_to_send_es.clear()
# print("******************\n") # print("******************\n")
except: except:
self.logger.exception("advance blocks failed") self.logger.exception("advance blocks failed")
@ -281,12 +261,9 @@ class BlockProcessor:
processed_time = time.perf_counter() - total_start processed_time = time.perf_counter() - total_start
self.block_count_metric.set(self.height) self.block_count_metric.set(self.height)
self.block_update_time_metric.observe(processed_time) self.block_update_time_metric.observe(processed_time)
self.status_server.set_height(self.db.fs_height, self.db.db_tip)
if not self.db.first_sync: if not self.db.first_sync:
s = '' if len(blocks) == 1 else 's' s = '' if len(blocks) == 1 else 's'
self.logger.info('processed {:,d} block{} in {:.1f}s'.format(len(blocks), s, processed_time)) self.logger.info('processed {:,d} block{} in {:.1f}s'.format(len(blocks), s, processed_time))
if self._caught_up_event.is_set():
await self.mempool.on_block(self.touched_hashXs, self.height)
self.touched_hashXs.clear() self.touched_hashXs.clear()
elif hprevs[0] != chain[0]: elif hprevs[0] != chain[0]:
min_start_height = max(self.height - self.coin.REORG_LIMIT, 0) min_start_height = max(self.height - self.coin.REORG_LIMIT, 0)
@ -309,15 +286,6 @@ class BlockProcessor:
if self.env.cache_all_claim_txos: if self.env.cache_all_claim_txos:
await self.db._read_claim_txos() # TODO: don't do this await self.db._read_claim_txos() # TODO: don't do this
for touched in self.touched_claims_to_send_es:
if not self.db.get_claim_txo(touched):
self.removed_claims_to_send_es.add(touched)
self.touched_claims_to_send_es.difference_update(self.removed_claims_to_send_es)
await self.db.search_index.claim_consumer(self.claim_producer())
self.db.search_index.clear_caches()
self.touched_claims_to_send_es.clear()
self.removed_claims_to_send_es.clear()
self.activation_info_to_send_es.clear()
await self.prefetcher.reset_height(self.height) await self.prefetcher.reset_height(self.height)
self.reorg_count_metric.inc() self.reorg_count_metric.inc()
except: except:
@ -652,8 +620,6 @@ class BlockProcessor:
self.support_txo_to_claim.pop(support_txo_to_clear) self.support_txo_to_claim.pop(support_txo_to_clear)
self.support_txos_by_claim[claim_hash].clear() self.support_txos_by_claim[claim_hash].clear()
self.support_txos_by_claim.pop(claim_hash) self.support_txos_by_claim.pop(claim_hash)
if claim_hash.hex() in self.activation_info_to_send_es:
self.activation_info_to_send_es.pop(claim_hash.hex())
if normalized_name.startswith('@'): # abandon a channel, invalidate signatures if normalized_name.startswith('@'): # abandon a channel, invalidate signatures
self._invalidate_channel_signatures(claim_hash) self._invalidate_channel_signatures(claim_hash)
@ -1226,10 +1192,6 @@ class BlockProcessor:
self.touched_claim_hashes.add(controlling.claim_hash) self.touched_claim_hashes.add(controlling.claim_hash)
self.touched_claim_hashes.add(winning) self.touched_claim_hashes.add(winning)
def _add_claim_activation_change_notification(self, claim_id: str, height: int, prev_amount: int,
new_amount: int):
self.activation_info_to_send_es[claim_id].append(TrendingNotification(height, prev_amount, new_amount))
def _get_cumulative_update_ops(self, height: int): def _get_cumulative_update_ops(self, height: int):
# update the last takeover height for names with takeovers # update the last takeover height for names with takeovers
for name in self.taken_over_names: for name in self.taken_over_names:
@ -1493,7 +1455,6 @@ class BlockProcessor:
self.utxo_cache.clear() self.utxo_cache.clear()
self.hashXs_by_tx.clear() self.hashXs_by_tx.clear()
self.history_cache.clear() self.history_cache.clear()
self.mempool.notified_mempool_txs.clear()
self.removed_claim_hashes.clear() self.removed_claim_hashes.clear()
self.touched_claim_hashes.clear() self.touched_claim_hashes.clear()
self.pending_reposted.clear() self.pending_reposted.clear()
@ -1518,8 +1479,8 @@ class BlockProcessor:
self.logger.info("backup block %i", self.height) self.logger.info("backup block %i", self.height)
# Check and update self.tip # Check and update self.tip
self.db.headers.pop()
self.db.tx_counts.pop() self.db.tx_counts.pop()
reverted_block_hash = self.coin.header_hash(self.db.headers.pop())
self.tip = self.coin.header_hash(self.db.headers[-1]) self.tip = self.coin.header_hash(self.db.headers[-1])
if self.env.cache_all_tx_hashes: if self.env.cache_all_tx_hashes:
while len(self.db.total_transactions) > self.db.tx_counts[-1]: while len(self.db.total_transactions) > self.db.tx_counts[-1]:
@ -1613,16 +1574,27 @@ class BlockProcessor:
self.touched_hashXs.add(hashX) self.touched_hashXs.add(hashX)
return hashX return hashX
async def _process_prefetched_blocks(self): async def process_blocks_and_mempool_forever(self):
"""Loop forever processing blocks as they arrive.""" """Loop forever processing blocks as they arrive."""
while True: while True:
if self.height == self.daemon.cached_height(): if self.height == self.daemon.cached_height():
if not self._caught_up_event.is_set(): if not self._caught_up_event.is_set():
await self._first_caught_up() await self._first_caught_up()
self._caught_up_event.set() self._caught_up_event.set()
await self.blocks_event.wait() try:
await asyncio.wait_for(self.blocks_event.wait(), 0.25)
except asyncio.TimeoutError:
pass
self.blocks_event.clear() self.blocks_event.clear()
blocks = self.prefetcher.get_prefetched_blocks() blocks = self.prefetcher.get_prefetched_blocks()
if not blocks:
try:
await self.check_mempool()
self.db.prefix_db.unsafe_commit()
except Exception:
self.logger.exception("error while updating mempool txs")
raise
else:
try: try:
await self.check_and_advance_blocks(blocks) await self.check_and_advance_blocks(blocks)
except Exception: except Exception:
@ -1659,6 +1631,13 @@ class BlockProcessor:
f'height {self.height:,d}, halting here.') f'height {self.height:,d}, halting here.')
self.shutdown_event.set() self.shutdown_event.set()
async def open(self):
self.db.open_db()
self.height = self.db.db_height
self.tip = self.db.db_tip
self.tx_count = self.db.db_tx_count
await self.db.initialize_caches()
async def fetch_and_process_blocks(self, caught_up_event): async def fetch_and_process_blocks(self, caught_up_event):
"""Fetch, process and index blocks from the daemon. """Fetch, process and index blocks from the daemon.
@ -1674,16 +1653,9 @@ class BlockProcessor:
self._caught_up_event = caught_up_event self._caught_up_event = caught_up_event
try: try:
self.db.open_db()
self.height = self.db.db_height
self.tip = self.db.db_tip
self.tx_count = self.db.db_tx_count
self.status_server.set_height(self.db.fs_height, self.db.db_tip)
await self.db.initialize_caches()
await self.db.search_index.start()
await asyncio.wait([ await asyncio.wait([
self.prefetcher.main_loop(self.height), self.prefetcher.main_loop(self.height),
self._process_prefetched_blocks() self.process_blocks_and_mempool_forever()
]) ])
except asyncio.CancelledError: except asyncio.CancelledError:
raise raise
@ -1691,9 +1663,47 @@ class BlockProcessor:
self.logger.exception("Block processing failed!") self.logger.exception("Block processing failed!")
raise raise
finally: finally:
self.status_server.stop()
# Shut down block processing # Shut down block processing
self.logger.info('closing the DB for a clean shutdown...') self.logger.info('closing the DB for a clean shutdown...')
self._sync_reader_executor.shutdown(wait=True) self._sync_reader_executor.shutdown(wait=True)
self._chain_executor.shutdown(wait=True) self._chain_executor.shutdown(wait=True)
self.db.close() self.db.close()
async def start(self):
env = self.env
min_str, max_str = env.coin.SESSIONCLS.protocol_min_max_strings()
self.logger.info(f'software version: {lbry.__version__}')
self.logger.info(f'supported protocol versions: {min_str}-{max_str}')
self.logger.info(f'event loop policy: {env.loop_policy}')
self.logger.info(f'reorg limit is {env.reorg_limit:,d} blocks')
await self.daemon.height()
def _start_cancellable(run, *args):
_flag = asyncio.Event()
self.cancellable_tasks.append(asyncio.ensure_future(run(*args, _flag)))
return _flag.wait()
await _start_cancellable(self.fetch_and_process_blocks)
async def stop(self):
for task in reversed(self.cancellable_tasks):
task.cancel()
await asyncio.wait(self.cancellable_tasks)
self.shutdown_event.set()
await self.daemon.close()
def run(self):
loop = asyncio.get_event_loop()
def __exit():
raise SystemExit()
try:
loop.add_signal_handler(signal.SIGINT, __exit)
loop.add_signal_handler(signal.SIGTERM, __exit)
loop.run_until_complete(self.start())
loop.run_until_complete(self.shutdown_event.wait())
except (SystemExit, KeyboardInterrupt):
pass
finally:
loop.run_until_complete(self.stop())

View file

@ -0,0 +1,243 @@
import signal
import logging
import asyncio
from concurrent.futures.thread import ThreadPoolExecutor
import typing
import lbry
from lbry.wallet.server.mempool import MemPool
from lbry.wallet.server.db.prefixes import DBState
from lbry.wallet.server.udp import StatusServer
from lbry.wallet.server.db.db import HubDB
from lbry.wallet.server.db.elasticsearch.notifier import ElasticNotifierClientProtocol
from lbry.wallet.server.session import LBRYSessionManager
from lbry.prometheus import PrometheusServer
class BlockchainReader:
def __init__(self, env, secondary_name: str):
self.env = env
self.log = logging.getLogger(__name__).getChild(self.__class__.__name__)
self.shutdown_event = asyncio.Event()
self.cancellable_tasks = []
self.db = HubDB(
env.coin, env.db_dir, env.cache_MB, env.reorg_limit, env.cache_all_claim_txos, env.cache_all_tx_hashes,
secondary_name=secondary_name, max_open_files=-1
)
self.last_state: typing.Optional[DBState] = None
self._refresh_interval = 0.1
def _detect_changes(self):
try:
self.db.prefix_db.try_catch_up_with_primary()
except:
self.log.exception('failed to update secondary db')
raise
state = self.db.prefix_db.db_state.get()
if not state or state.height <= 0:
return
# if state and self.last_state and self.db.headers and self.last_state.tip == self.db.coin.header_hash(self.db.headers[-1]):
# return
if self.last_state and self.last_state.height > state.height: # FIXME: derp
self.log.debug("reorg detected, waiting until the writer has flushed the new blocks to advance")
return
last_height = 0 if not self.last_state else self.last_state.height
if self.last_state:
while True:
if self.db.headers[-1] == self.db.prefix_db.header.get(last_height, deserialize_value=False):
self.log.debug("connects to block %i", last_height)
break
else:
self.log.warning("disconnect block %i", last_height)
self.unwind()
last_height -= 1
self.db.read_db_state()
if not self.last_state or self.last_state.height < state.height:
for height in range(last_height + 1, state.height + 1):
self.log.warning("advancing to %i", height)
self.advance(height)
self.clear_caches()
self.last_state = state
# elif self.last_state and self.last_state.height > state.height:
# last_height = self.last_state.height
# for height in range(last_height, state.height, -1):
# self.log.warning("unwind %i", height)
# self.unwind()
# self.clear_caches()
# self.last_state = state
# self.log.warning("unwound to %i", self.last_state.height)
# print("reader rewound to ", self.last_state.height)
async def poll_for_changes(self):
await asyncio.get_event_loop().run_in_executor(None, self._detect_changes)
async def refresh_blocks_forever(self, synchronized: asyncio.Event):
self.log.warning("start refresh blocks forever")
while True:
try:
await self.poll_for_changes()
except:
self.log.exception("boom")
raise
await asyncio.sleep(self._refresh_interval)
synchronized.set()
def clear_caches(self):
pass
def advance(self, height: int):
tx_count = self.db.prefix_db.tx_count.get(height).tx_count
assert tx_count not in self.db.tx_counts, f'boom {tx_count} in {len(self.db.tx_counts)} tx counts'
assert len(self.db.tx_counts) == height, f"{len(self.db.tx_counts)} != {height}"
self.db.tx_counts.append(tx_count)
self.db.headers.append(self.db.prefix_db.header.get(height, deserialize_value=False))
def unwind(self):
self.db.tx_counts.pop()
self.db.headers.pop()
class BlockchainReaderServer(BlockchainReader):
def __init__(self, env):
super().__init__(env, 'lbry-reader')
self.history_cache = {}
self.resolve_outputs_cache = {}
self.resolve_cache = {}
self.notifications_to_send = []
self.status_server = StatusServer()
self.daemon = env.coin.DAEMON(env.coin, env.daemon_url) # only needed for broadcasting txs
self.prometheus_server: typing.Optional[PrometheusServer] = None
self.mempool = MemPool(self.env.coin, self.db)
self.session_manager = LBRYSessionManager(
env, self.db, self.mempool, self.history_cache, self.resolve_cache,
self.resolve_outputs_cache, self.daemon,
self.shutdown_event,
on_available_callback=self.status_server.set_available,
on_unavailable_callback=self.status_server.set_unavailable
)
self.mempool.session_manager = self.session_manager
self.es_notifications = asyncio.Queue()
self.es_notification_client = ElasticNotifierClientProtocol(self.es_notifications)
self.synchronized = asyncio.Event()
self._es_height = None
def clear_caches(self):
self.history_cache.clear()
self.resolve_outputs_cache.clear()
self.resolve_cache.clear()
# self.clear_search_cache()
# self.mempool.notified_mempool_txs.clear()
def clear_search_cache(self):
self.session_manager.search_index.clear_caches()
def advance(self, height: int):
super().advance(height)
touched_hashXs = self.db.prefix_db.touched_hashX.get(height).touched_hashXs
self.notifications_to_send.append((set(touched_hashXs), height))
def _detect_changes(self):
super()._detect_changes()
self.mempool.raw_mempool.clear()
self.mempool.raw_mempool.update(
{k.tx_hash: v.raw_tx for k, v in self.db.prefix_db.mempool_tx.iterate()}
)
async def poll_for_changes(self):
await super().poll_for_changes()
self.status_server.set_height(self.db.fs_height, self.db.db_tip)
if self.notifications_to_send:
for (touched, height) in self.notifications_to_send:
await self.mempool.on_block(touched, height)
self.log.warning("reader advanced to %i", height)
if self._es_height == self.db.db_height:
self.synchronized.set()
# print("reader notified")
await self.mempool.refresh_hashes(self.db.db_height)
self.notifications_to_send.clear()
async def receive_es_notifications(self, synchronized: asyncio.Event):
await asyncio.get_event_loop().create_connection(
lambda: self.es_notification_client, '127.0.0.1', self.env.elastic_notifier_port
)
synchronized.set()
try:
while True:
self._es_height = await self.es_notifications.get()
self.clear_search_cache()
if self._es_height == self.db.db_height:
self.synchronized.set()
self.log.warning("es and reader are in sync")
else:
self.log.warning("es and reader are not yet in sync %s vs %s", self._es_height, self.db.db_height)
finally:
self.es_notification_client.close()
async def start(self):
env = self.env
min_str, max_str = env.coin.SESSIONCLS.protocol_min_max_strings()
self.log.info(f'software version: {lbry.__version__}')
self.log.info(f'supported protocol versions: {min_str}-{max_str}')
self.log.info(f'event loop policy: {env.loop_policy}')
self.log.info(f'reorg limit is {env.reorg_limit:,d} blocks')
await self.daemon.height()
def _start_cancellable(run, *args):
_flag = asyncio.Event()
self.cancellable_tasks.append(asyncio.ensure_future(run(*args, _flag)))
return _flag.wait()
self.db.open_db()
await self.db.initialize_caches()
self.last_state = self.db.read_db_state()
await self.start_prometheus()
if self.env.udp_port:
await self.status_server.start(
0, bytes.fromhex(self.env.coin.GENESIS_HASH)[::-1], self.env.country,
self.env.host, self.env.udp_port, self.env.allow_lan_udp
)
await _start_cancellable(self.receive_es_notifications)
await _start_cancellable(self.refresh_blocks_forever)
await self.session_manager.search_index.start()
await _start_cancellable(self.session_manager.serve, self.mempool)
async def stop(self):
self.status_server.stop()
for task in reversed(self.cancellable_tasks):
task.cancel()
await asyncio.wait(self.cancellable_tasks)
self.session_manager.search_index.stop()
self.db.close()
if self.prometheus_server:
await self.prometheus_server.stop()
self.prometheus_server = None
self.shutdown_event.set()
await self.daemon.close()
def run(self):
loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor(self.env.max_query_workers, thread_name_prefix='hub-worker')
loop.set_default_executor(executor)
def __exit():
raise SystemExit()
try:
loop.add_signal_handler(signal.SIGINT, __exit)
loop.add_signal_handler(signal.SIGTERM, __exit)
loop.run_until_complete(self.start())
loop.run_until_complete(self.shutdown_event.wait())
except (SystemExit, KeyboardInterrupt):
pass
finally:
loop.run_until_complete(self.stop())
executor.shutdown(True)
async def start_prometheus(self):
if not self.prometheus_server and self.env.prometheus_port:
self.prometheus_server = PrometheusServer()
await self.prometheus_server.start("0.0.0.0", self.env.prometheus_port)

View file

@ -13,7 +13,6 @@ from lbry.wallet.server.hash import Base58, hash160, double_sha256, hash_to_hex_
from lbry.wallet.server.daemon import Daemon, LBCDaemon from lbry.wallet.server.daemon import Daemon, LBCDaemon
from lbry.wallet.server.script import ScriptPubKey, OpCodes from lbry.wallet.server.script import ScriptPubKey, OpCodes
from lbry.wallet.server.session import LBRYElectrumX, LBRYSessionManager from lbry.wallet.server.session import LBRYElectrumX, LBRYSessionManager
from lbry.wallet.server.block_processor import BlockProcessor
Block = namedtuple("Block", "raw header transactions") Block = namedtuple("Block", "raw header transactions")
@ -37,7 +36,6 @@ class Coin:
SESSIONCLS = LBRYElectrumX SESSIONCLS = LBRYElectrumX
DESERIALIZER = lib_tx.Deserializer DESERIALIZER = lib_tx.Deserializer
DAEMON = Daemon DAEMON = Daemon
BLOCK_PROCESSOR = BlockProcessor
SESSION_MANAGER = LBRYSessionManager SESSION_MANAGER = LBRYSessionManager
HEADER_VALUES = [ HEADER_VALUES = [
'version', 'prev_block_hash', 'merkle_root', 'timestamp', 'bits', 'nonce' 'version', 'prev_block_hash', 'merkle_root', 'timestamp', 'bits', 'nonce'

View file

@ -11,14 +11,13 @@ import itertools
import time import time
import attr import attr
import typing import typing
from typing import Set, Optional, Callable, Awaitable
from collections import defaultdict from collections import defaultdict
from prometheus_client import Histogram from prometheus_client import Histogram
from lbry.wallet.server.hash import hash_to_hex_str, hex_str_to_hash from lbry.wallet.server.util import class_logger
from lbry.wallet.server.util import class_logger, chunks
from lbry.wallet.server.leveldb import UTXO
if typing.TYPE_CHECKING: if typing.TYPE_CHECKING:
from lbry.wallet.server.session import LBRYSessionManager from lbry.wallet.server.session import LBRYSessionManager
from wallet.server.db.db import LevelDB
@attr.s(slots=True) @attr.s(slots=True)
@ -50,70 +49,99 @@ mempool_process_time_metric = Histogram(
class MemPool: class MemPool:
def __init__(self, coin, daemon, db, state_lock: asyncio.Lock, refresh_secs=1.0, log_status_secs=120.0): def __init__(self, coin, db: 'LevelDB', refresh_secs=1.0):
self.coin = coin self.coin = coin
self._daemon = daemon
self._db = db self._db = db
self._touched_mp = {}
self._touched_bp = {}
self._highest_block = -1
self.logger = class_logger(__name__, self.__class__.__name__) self.logger = class_logger(__name__, self.__class__.__name__)
self.txs = {} self.txs = {}
self.hashXs = defaultdict(set) # None can be a key self.raw_mempool = {}
self.cached_compact_histogram = [] self.touched_hashXs: typing.DefaultDict[bytes, typing.Set[bytes]] = defaultdict(set) # None can be a key
self.refresh_secs = refresh_secs self.refresh_secs = refresh_secs
self.log_status_secs = log_status_secs
# Prevents mempool refreshes during fee histogram calculation
self.lock = state_lock
self.wakeup = asyncio.Event()
self.mempool_process_time_metric = mempool_process_time_metric self.mempool_process_time_metric = mempool_process_time_metric
self.notified_mempool_txs = set() self.session_manager: typing.Optional['LBRYSessionManager'] = None
self.notify_sessions: Optional[Callable[[int, Set[bytes], Set[bytes]], Awaitable[None]]] = None
async def _logging(self, synchronized_event): async def refresh_hashes(self, height: int):
"""Print regular logs of mempool stats."""
self.logger.info('beginning processing of daemon mempool. '
'This can take some time...')
start = time.perf_counter() start = time.perf_counter()
await synchronized_event.wait() new_touched = await self._process_mempool()
elapsed = time.perf_counter() - start await self.on_mempool(set(self.touched_hashXs), new_touched, height)
self.logger.info(f'synced in {elapsed:.2f}s') duration = time.perf_counter() - start
while True: self.mempool_process_time_metric.observe(duration)
self.logger.info(f'{len(self.txs):,d} txs '
f'touching {len(self.hashXs):,d} addresses')
await asyncio.sleep(self.log_status_secs)
await synchronized_event.wait()
def _accept_transactions(self, tx_map, utxo_map, touched): async def _process_mempool(self) -> typing.Set[bytes]: # returns list of new touched hashXs
"""Accept transactions in tx_map to the mempool if all their inputs # Re-sync with the new set of hashes
can be found in the existing mempool or a utxo_map from the
DB.
Returns an (unprocessed tx_map, unspent utxo_map) pair. # hashXs = self.hashXs # hashX: [tx_hash, ...]
""" touched_hashXs = set()
hashXs = self.hashXs
txs = self.txs
deferred = {} # Remove txs that aren't in mempool anymore
unspent = set(utxo_map) for tx_hash in set(self.txs).difference(self.raw_mempool.keys()):
# Try to find all prevouts so we can accept the TX tx = self.txs.pop(tx_hash)
for hash, tx in tx_map.items(): tx_hashXs = {hashX for hashX, value in tx.in_pairs}.union({hashX for hashX, value in tx.out_pairs})
in_pairs = [] for hashX in tx_hashXs:
try: if hashX in self.touched_hashXs and tx_hash in self.touched_hashXs[hashX]:
for prevout in tx.prevouts: self.touched_hashXs[hashX].remove(tx_hash)
utxo = utxo_map.get(prevout) if not self.touched_hashXs[hashX]:
if not utxo: self.touched_hashXs.pop(hashX)
prev_hash, prev_index = prevout touched_hashXs.update(tx_hashXs)
# Raises KeyError if prev_hash is not in txs
utxo = txs[prev_hash].out_pairs[prev_index] tx_map = {}
in_pairs.append(utxo) for tx_hash, raw_tx in self.raw_mempool.items():
except KeyError: if tx_hash in self.txs:
deferred[hash] = tx
continue continue
tx, tx_size = self.coin.DESERIALIZER(raw_tx).read_tx_and_vsize()
# Convert the inputs and outputs into (hashX, value) pairs
# Drop generation-like inputs from MemPoolTx.prevouts
txin_pairs = tuple((txin.prev_hash, txin.prev_idx)
for txin in tx.inputs
if not txin.is_generation())
txout_pairs = tuple((self.coin.hashX_from_script(txout.pk_script), txout.value)
for txout in tx.outputs)
# Spend the prevouts tx_map[tx_hash] = MemPoolTx(txin_pairs, None, txout_pairs, 0, tx_size, raw_tx)
unspent.difference_update(tx.prevouts)
# Determine all prevouts not in the mempool, and fetch the
# UTXO information from the database. Failed prevout lookups
# return None - concurrent database updates happen - which is
# relied upon by _accept_transactions. Ignore prevouts that are
# generation-like.
# prevouts = tuple(prevout for tx in tx_map.values()
# for prevout in tx.prevouts
# if prevout[0] not in self.raw_mempool)
# utxos = await self._db.lookup_utxos(prevouts)
# utxo_map = dict(zip(prevouts, utxos))
# unspent = set(utxo_map)
for tx_hash, tx in tx_map.items():
in_pairs = []
for prevout in tx.prevouts:
# utxo = utxo_map.get(prevout)
# if not utxo:
prev_hash, prev_index = prevout
if prev_hash in self.txs: # accepted mempool
utxo = self.txs[prev_hash].out_pairs[prev_index]
elif prev_hash in tx_map: # this set of changes
utxo = tx_map[prev_hash].out_pairs[prev_index]
else: # get it from the db
prev_tx_num = self._db.prefix_db.tx_num.get(prev_hash)
if not prev_tx_num:
continue
prev_tx_num = prev_tx_num.tx_num
hashX_val = self._db.prefix_db.hashX_utxo.get(tx_hash[:4], prev_tx_num, prev_index)
if not hashX_val:
continue
hashX = hashX_val.hashX
utxo_value = self._db.prefix_db.utxo.get(hashX, prev_tx_num, prev_index)
utxo = (hashX, utxo_value.amount)
# if not prev_raw:
# print("derp", prev_hash[::-1].hex())
# print(self._db.get_tx_num(prev_hash))
# prev_tx, prev_tx_size = self.coin.DESERIALIZER(prev_raw.raw_tx).read_tx_and_vsize()
# prev_txo = prev_tx.outputs[prev_index]
# utxo = (self.coin.hashX_from_script(prev_txo.pk_script), prev_txo.value)
in_pairs.append(utxo)
# # Spend the prevouts
# unspent.difference_update(tx.prevouts)
# Save the in_pairs, compute the fee and accept the TX # Save the in_pairs, compute the fee and accept the TX
tx.in_pairs = tuple(in_pairs) tx.in_pairs = tuple(in_pairs)
@ -121,198 +149,26 @@ class MemPool:
# because some in_parts would be missing # because some in_parts would be missing
tx.fee = max(0, (sum(v for _, v in tx.in_pairs) - tx.fee = max(0, (sum(v for _, v in tx.in_pairs) -
sum(v for _, v in tx.out_pairs))) sum(v for _, v in tx.out_pairs)))
txs[hash] = tx self.txs[tx_hash] = tx
# print(f"added {tx_hash[::-1].hex()} reader to mempool")
for hashX, value in itertools.chain(tx.in_pairs, tx.out_pairs): for hashX, value in itertools.chain(tx.in_pairs, tx.out_pairs):
touched.add(hashX) self.touched_hashXs[hashX].add(tx_hash)
hashXs[hashX].add(hash) touched_hashXs.add(hashX)
# utxo_map = {prevout: utxo_map[prevout] for prevout in unspent}
return deferred, {prevout: utxo_map[prevout] for prevout in unspent} return touched_hashXs
async def _mempool_loop(self, synchronized_event):
try:
return await self._refresh_hashes(synchronized_event)
except asyncio.CancelledError:
raise
except Exception as e:
self.logger.exception("MEMPOOL DIED")
raise e
async def _refresh_hashes(self, synchronized_event):
"""Refresh our view of the daemon's mempool."""
while True:
start = time.perf_counter()
height = self._daemon.cached_height()
hex_hashes = await self._daemon.mempool_hashes()
if height != await self._daemon.height():
continue
hashes = {hex_str_to_hash(hh) for hh in hex_hashes}
async with self.lock:
new_hashes = hashes.difference(self.notified_mempool_txs)
touched = await self._process_mempool(hashes)
self.notified_mempool_txs.update(new_hashes)
new_touched = {
touched_hashx for touched_hashx, txs in self.hashXs.items() if txs.intersection(new_hashes)
}
synchronized_event.set()
synchronized_event.clear()
await self.on_mempool(touched, new_touched, height)
duration = time.perf_counter() - start
self.mempool_process_time_metric.observe(duration)
try:
# we wait up to `refresh_secs` but go early if a broadcast happens (which triggers wakeup event)
await asyncio.wait_for(self.wakeup.wait(), timeout=self.refresh_secs)
except asyncio.TimeoutError:
pass
finally:
self.wakeup.clear()
async def _process_mempool(self, all_hashes):
# Re-sync with the new set of hashes
txs = self.txs
hashXs = self.hashXs # hashX: [tx_hash, ...]
touched = set()
# First handle txs that have disappeared
for tx_hash in set(txs).difference(all_hashes):
tx = txs.pop(tx_hash)
tx_hashXs = {hashX for hashX, value in tx.in_pairs}
tx_hashXs.update(hashX for hashX, value in tx.out_pairs)
for hashX in tx_hashXs:
hashXs[hashX].remove(tx_hash)
if not hashXs[hashX]:
del hashXs[hashX]
touched.update(tx_hashXs)
# Process new transactions
new_hashes = list(all_hashes.difference(txs))
if new_hashes:
fetches = []
for hashes in chunks(new_hashes, 200):
fetches.append(self._fetch_and_accept(hashes, all_hashes, touched))
tx_map = {}
utxo_map = {}
for fetch in asyncio.as_completed(fetches):
deferred, unspent = await fetch
tx_map.update(deferred)
utxo_map.update(unspent)
prior_count = 0
# FIXME: this is not particularly efficient
while tx_map and len(tx_map) != prior_count:
prior_count = len(tx_map)
tx_map, utxo_map = self._accept_transactions(tx_map, utxo_map, touched)
if tx_map:
self.logger.info(f'{len(tx_map)} txs dropped')
return touched
async def _fetch_and_accept(self, hashes, all_hashes, touched):
"""Fetch a list of mempool transactions."""
raw_txs = await self._daemon.getrawtransactions((hash_to_hex_str(hash) for hash in hashes))
to_hashX = self.coin.hashX_from_script
deserializer = self.coin.DESERIALIZER
tx_map = {}
for hash, raw_tx in zip(hashes, raw_txs):
# The daemon may have evicted the tx from its
# mempool or it may have gotten in a block
if not raw_tx:
continue
tx, tx_size = deserializer(raw_tx).read_tx_and_vsize()
# Convert the inputs and outputs into (hashX, value) pairs
# Drop generation-like inputs from MemPoolTx.prevouts
txin_pairs = tuple((txin.prev_hash, txin.prev_idx)
for txin in tx.inputs
if not txin.is_generation())
txout_pairs = tuple((to_hashX(txout.pk_script), txout.value)
for txout in tx.outputs)
tx_map[hash] = MemPoolTx(txin_pairs, None, txout_pairs,
0, tx_size, raw_tx)
# Determine all prevouts not in the mempool, and fetch the
# UTXO information from the database. Failed prevout lookups
# return None - concurrent database updates happen - which is
# relied upon by _accept_transactions. Ignore prevouts that are
# generation-like.
prevouts = tuple(prevout for tx in tx_map.values()
for prevout in tx.prevouts
if prevout[0] not in all_hashes)
utxos = await self._db.lookup_utxos(prevouts)
utxo_map = dict(zip(prevouts, utxos))
return self._accept_transactions(tx_map, utxo_map, touched)
#
# External interface
#
async def keep_synchronized(self, synchronized_event):
"""Keep the mempool synchronized with the daemon."""
await asyncio.wait([
self._mempool_loop(synchronized_event),
# self._refresh_histogram(synchronized_event),
self._logging(synchronized_event)
])
async def balance_delta(self, hashX):
"""Return the unconfirmed amount in the mempool for hashX.
Can be positive or negative.
"""
value = 0
if hashX in self.hashXs:
for hash in self.hashXs[hashX]:
tx = self.txs[hash]
value -= sum(v for h168, v in tx.in_pairs if h168 == hashX)
value += sum(v for h168, v in tx.out_pairs if h168 == hashX)
return value
def compact_fee_histogram(self):
"""Return a compact fee histogram of the current mempool."""
return self.cached_compact_histogram
async def potential_spends(self, hashX):
"""Return a set of (prev_hash, prev_idx) pairs from mempool
transactions that touch hashX.
None, some or all of these may be spends of the hashX, but all
actual spends of it (in the DB or mempool) will be included.
"""
result = set()
for tx_hash in self.hashXs.get(hashX, ()):
tx = self.txs[tx_hash]
result.update(tx.prevouts)
return result
def transaction_summaries(self, hashX): def transaction_summaries(self, hashX):
"""Return a list of MemPoolTxSummary objects for the hashX.""" """Return a list of MemPoolTxSummary objects for the hashX."""
result = [] result = []
for tx_hash in self.hashXs.get(hashX, ()): for tx_hash in self.touched_hashXs.get(hashX, ()):
tx = self.txs[tx_hash] tx = self.txs[tx_hash]
has_ui = any(hash in self.txs for hash, idx in tx.prevouts) has_ui = any(hash in self.txs for hash, idx in tx.prevouts)
result.append(MemPoolTxSummary(tx_hash, tx.fee, has_ui)) result.append(MemPoolTxSummary(tx_hash, tx.fee, has_ui))
return result return result
async def unordered_UTXOs(self, hashX): def get_mempool_height(self, tx_hash: bytes) -> int:
"""Return an unordered list of UTXO named tuples from mempool
transactions that pay to hashX.
This does not consider if any other mempool transactions spend
the outputs.
"""
utxos = []
for tx_hash in self.hashXs.get(hashX, ()):
tx = self.txs.get(tx_hash)
for pos, (hX, value) in enumerate(tx.out_pairs):
if hX == hashX:
utxos.append(UTXO(-1, pos, tx_hash, 0, value))
return utxos
def get_mempool_height(self, tx_hash):
# Height Progression # Height Progression
# -2: not broadcast # -2: not broadcast
# -1: in mempool but has unconfirmed inputs # -1: in mempool but has unconfirmed inputs
@ -321,41 +177,57 @@ class MemPool:
if tx_hash not in self.txs: if tx_hash not in self.txs:
return -2 return -2
tx = self.txs[tx_hash] tx = self.txs[tx_hash]
unspent_inputs = sum(1 if hash in self.txs else 0 for hash, idx in tx.prevouts) unspent_inputs = any(hash in self.raw_mempool for hash, idx in tx.prevouts)
if unspent_inputs: if unspent_inputs:
return -1 return -1
return 0 return 0
async def _maybe_notify(self, new_touched):
tmp, tbp = self._touched_mp, self._touched_bp
common = set(tmp).intersection(tbp)
if common:
height = max(common)
elif tmp and max(tmp) == self._highest_block:
height = self._highest_block
else:
# Either we are processing a block and waiting for it to
# come in, or we have not yet had a mempool update for the
# new block height
return
touched = tmp.pop(height)
for old in [h for h in tmp if h <= height]:
del tmp[old]
for old in [h for h in tbp if h <= height]:
touched.update(tbp.pop(old))
# print("notify", height, len(touched), len(new_touched))
await self.notify_sessions(height, touched, new_touched)
async def start(self, height, session_manager: 'LBRYSessionManager'): async def start(self, height, session_manager: 'LBRYSessionManager'):
self._highest_block = height
self.notify_sessions = session_manager._notify_sessions self.notify_sessions = session_manager._notify_sessions
await self.notify_sessions(height, set(), set()) await self._notify_sessions(height, set(), set())
async def on_mempool(self, touched, new_touched, height): async def on_mempool(self, touched, new_touched, height):
self._touched_mp[height] = touched await self._notify_sessions(height, touched, new_touched)
await self._maybe_notify(new_touched)
async def on_block(self, touched, height): async def on_block(self, touched, height):
self._touched_bp[height] = touched await self._notify_sessions(height, touched, set())
self._highest_block = height
await self._maybe_notify(set()) async def _notify_sessions(self, height, touched, new_touched):
"""Notify sessions about height changes and touched addresses."""
height_changed = height != self.session_manager.notified_height
if height_changed:
await self.session_manager._refresh_hsub_results(height)
if not self.session_manager.sessions:
return
if height_changed:
header_tasks = [
session.send_notification('blockchain.headers.subscribe', (self.session_manager.hsub_results[session.subscribe_headers_raw], ))
for session in self.session_manager.sessions.values() if session.subscribe_headers
]
if header_tasks:
self.logger.warning(f'notify {len(header_tasks)} sessions of new header')
asyncio.create_task(asyncio.wait(header_tasks))
for hashX in touched.intersection(self.session_manager.mempool_statuses.keys()):
self.session_manager.mempool_statuses.pop(hashX, None)
# self.bp._chain_executor
await asyncio.get_event_loop().run_in_executor(
None, touched.intersection_update, self.session_manager.hashx_subscriptions_by_session.keys()
)
if touched or new_touched or (height_changed and self.session_manager.mempool_statuses):
notified_hashxs = 0
session_hashxes_to_notify = defaultdict(list)
to_notify = touched if height_changed else new_touched
for hashX in to_notify:
if hashX not in self.session_manager.hashx_subscriptions_by_session:
continue
for session_id in self.session_manager.hashx_subscriptions_by_session[hashX]:
session_hashxes_to_notify[session_id].append(hashX)
notified_hashxs += 1
for session_id, hashXes in session_hashxes_to_notify.items():
asyncio.create_task(self.session_manager.sessions[session_id].send_history_notifications(*hashXes))
if session_hashxes_to_notify:
self.logger.warning(f'notified {len(session_hashxes_to_notify)} sessions/{notified_hashxs:,d} touched addresses')

View file

@ -1,94 +0,0 @@
import signal
import logging
import asyncio
from concurrent.futures.thread import ThreadPoolExecutor
import typing
import lbry
from lbry.wallet.server.mempool import MemPool
from lbry.wallet.server.block_processor import BlockProcessor
from lbry.wallet.server.leveldb import LevelDB
from lbry.wallet.server.session import LBRYSessionManager
from lbry.prometheus import PrometheusServer
class Server:
def __init__(self, env):
self.env = env
self.log = logging.getLogger(__name__).getChild(self.__class__.__name__)
self.shutdown_event = asyncio.Event()
self.cancellable_tasks = []
self.daemon = daemon = env.coin.DAEMON(env.coin, env.daemon_url)
self.db = db = LevelDB(env)
self.bp = bp = BlockProcessor(env, db, daemon, self.shutdown_event)
self.prometheus_server: typing.Optional[PrometheusServer] = None
self.session_mgr = LBRYSessionManager(
env, db, bp.mempool, bp.history_cache, bp.resolve_cache, bp.resolve_outputs_cache, daemon,
self.shutdown_event,
on_available_callback=bp.status_server.set_available,
on_unavailable_callback=bp.status_server.set_unavailable
)
self._indexer_task = None
async def start(self):
env = self.env
min_str, max_str = env.coin.SESSIONCLS.protocol_min_max_strings()
self.log.info(f'software version: {lbry.__version__}')
self.log.info(f'supported protocol versions: {min_str}-{max_str}')
self.log.info(f'event loop policy: {env.loop_policy}')
self.log.info(f'reorg limit is {env.reorg_limit:,d} blocks')
await self.daemon.height()
def _start_cancellable(run, *args):
_flag = asyncio.Event()
self.cancellable_tasks.append(asyncio.ensure_future(run(*args, _flag)))
return _flag.wait()
await self.start_prometheus()
if self.env.udp_port:
await self.bp.status_server.start(
0, bytes.fromhex(self.bp.coin.GENESIS_HASH)[::-1], self.env.country,
self.env.host, self.env.udp_port, self.env.allow_lan_udp
)
await _start_cancellable(self.bp.fetch_and_process_blocks)
await self.db.populate_header_merkle_cache()
await _start_cancellable(self.bp.mempool.keep_synchronized)
await _start_cancellable(self.session_mgr.serve, self.bp.mempool)
async def stop(self):
for task in reversed(self.cancellable_tasks):
task.cancel()
await asyncio.wait(self.cancellable_tasks)
if self.prometheus_server:
await self.prometheus_server.stop()
self.prometheus_server = None
self.shutdown_event.set()
await self.daemon.close()
def run(self):
loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor(self.env.max_query_workers, thread_name_prefix='hub-worker')
loop.set_default_executor(executor)
def __exit():
raise SystemExit()
try:
loop.add_signal_handler(signal.SIGINT, __exit)
loop.add_signal_handler(signal.SIGTERM, __exit)
loop.run_until_complete(self.start())
loop.run_until_complete(self.shutdown_event.wait())
except (SystemExit, KeyboardInterrupt):
pass
finally:
loop.run_until_complete(self.stop())
executor.shutdown(True)
async def start_prometheus(self):
if not self.prometheus_server and self.env.prometheus_port:
self.prometheus_server = PrometheusServer()
await self.prometheus_server.start("0.0.0.0", self.env.prometheus_port)

View file

@ -20,8 +20,7 @@ import lbry
from lbry.error import ResolveCensoredError, TooManyClaimSearchParametersError from lbry.error import ResolveCensoredError, TooManyClaimSearchParametersError
from lbry.build_info import BUILD, COMMIT_HASH, DOCKER_TAG from lbry.build_info import BUILD, COMMIT_HASH, DOCKER_TAG
from lbry.schema.result import Outputs from lbry.schema.result import Outputs
from lbry.wallet.server.block_processor import BlockProcessor from lbry.wallet.server.db.db import HubDB
from lbry.wallet.server.leveldb import LevelDB
from lbry.wallet.server.websocket import AdminWebSocket from lbry.wallet.server.websocket import AdminWebSocket
from lbry.wallet.rpc.framing import NewlineFramer from lbry.wallet.rpc.framing import NewlineFramer
@ -34,9 +33,12 @@ from lbry.wallet.rpc import (
from lbry.wallet.server import util from lbry.wallet.server import util
from lbry.wallet.server.hash import sha256, hash_to_hex_str, hex_str_to_hash, HASHX_LEN, Base58Error from lbry.wallet.server.hash import sha256, hash_to_hex_str, hex_str_to_hash, HASHX_LEN, Base58Error
from lbry.wallet.server.daemon import DaemonError from lbry.wallet.server.daemon import DaemonError
from lbry.wallet.server.db.elasticsearch import SearchIndex
if typing.TYPE_CHECKING: if typing.TYPE_CHECKING:
from lbry.wallet.server.env import Env from lbry.wallet.server.env import Env
from lbry.wallet.server.daemon import Daemon from lbry.wallet.server.daemon import Daemon
from lbry.wallet.server.mempool import MemPool
BAD_REQUEST = 1 BAD_REQUEST = 1
DAEMON_ERROR = 2 DAEMON_ERROR = 2
@ -170,7 +172,7 @@ class SessionManager:
namespace=NAMESPACE, buckets=HISTOGRAM_BUCKETS namespace=NAMESPACE, buckets=HISTOGRAM_BUCKETS
) )
def __init__(self, env: 'Env', db: LevelDB, mempool, history_cache, resolve_cache, resolve_outputs_cache, def __init__(self, env: 'Env', db: HubDB, mempool: 'MemPool', history_cache, resolve_cache, resolve_outputs_cache,
daemon: 'Daemon', shutdown_event: asyncio.Event, daemon: 'Daemon', shutdown_event: asyncio.Event,
on_available_callback: typing.Callable[[], None], on_unavailable_callback: typing.Callable[[], None]): on_available_callback: typing.Callable[[], None], on_unavailable_callback: typing.Callable[[], None]):
env.max_send = max(350000, env.max_send) env.max_send = max(350000, env.max_send)
@ -183,7 +185,7 @@ class SessionManager:
self.shutdown_event = shutdown_event self.shutdown_event = shutdown_event
self.logger = util.class_logger(__name__, self.__class__.__name__) self.logger = util.class_logger(__name__, self.__class__.__name__)
self.servers: typing.Dict[str, asyncio.AbstractServer] = {} self.servers: typing.Dict[str, asyncio.AbstractServer] = {}
self.sessions: typing.Dict[int, 'SessionBase'] = {} self.sessions: typing.Dict[int, 'LBRYElectrumX'] = {}
self.hashx_subscriptions_by_session: typing.DefaultDict[str, typing.Set[int]] = defaultdict(set) self.hashx_subscriptions_by_session: typing.DefaultDict[str, typing.Set[int]] = defaultdict(set)
self.mempool_statuses = {} self.mempool_statuses = {}
self.cur_group = SessionGroup(0) self.cur_group = SessionGroup(0)
@ -198,8 +200,15 @@ class SessionManager:
self.session_event = Event() self.session_event = Event()
# Search index
self.search_index = SearchIndex(
self.env.es_index_prefix, self.env.database_query_timeout,
elastic_host=env.elastic_host, elastic_port=env.elastic_port
)
async def _start_server(self, kind, *args, **kw_args): async def _start_server(self, kind, *args, **kw_args):
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
if kind == 'RPC': if kind == 'RPC':
protocol_class = LocalRPC protocol_class = LocalRPC
else: else:
@ -578,7 +587,7 @@ class SessionManager:
async def raw_header(self, height): async def raw_header(self, height):
"""Return the binary header at the given height.""" """Return the binary header at the given height."""
try: try:
return self.db.raw_header(height) return await self.db.raw_header(height)
except IndexError: except IndexError:
raise RPCError(BAD_REQUEST, f'height {height:,d} ' raise RPCError(BAD_REQUEST, f'height {height:,d} '
'out of range') from None 'out of range') from None
@ -686,12 +695,12 @@ class SessionBase(RPCSession):
request_handlers: typing.Dict[str, typing.Callable] = {} request_handlers: typing.Dict[str, typing.Callable] = {}
version = '0.5.7' version = '0.5.7'
def __init__(self, session_mgr, db, mempool, kind): def __init__(self, session_manager: 'LBRYSessionManager', db: 'LevelDB', mempool: 'MemPool', kind: str):
connection = JSONRPCConnection(JSONRPCAutoDetect) connection = JSONRPCConnection(JSONRPCAutoDetect)
self.env = session_mgr.env self.env = session_manager.env
super().__init__(connection=connection) super().__init__(connection=connection)
self.logger = util.class_logger(__name__, self.__class__.__name__) self.logger = util.class_logger(__name__, self.__class__.__name__)
self.session_mgr = session_mgr self.session_manager = session_manager
self.db = db self.db = db
self.mempool = mempool self.mempool = mempool
self.kind = kind # 'RPC', 'TCP' etc. self.kind = kind # 'RPC', 'TCP' etc.
@ -699,7 +708,7 @@ class SessionBase(RPCSession):
self.anon_logs = self.env.anon_logs self.anon_logs = self.env.anon_logs
self.txs_sent = 0 self.txs_sent = 0
self.log_me = False self.log_me = False
self.daemon_request = self.session_mgr.daemon_request self.daemon_request = self.session_manager.daemon_request
# Hijack the connection so we can log messages # Hijack the connection so we can log messages
self._receive_message_orig = self.connection.receive_message self._receive_message_orig = self.connection.receive_message
self.connection.receive_message = self.receive_message self.connection.receive_message = self.receive_message
@ -729,17 +738,17 @@ class SessionBase(RPCSession):
self.session_id = next(self.session_counter) self.session_id = next(self.session_counter)
context = {'conn_id': f'{self.session_id}'} context = {'conn_id': f'{self.session_id}'}
self.logger = util.ConnectionLogger(self.logger, context) self.logger = util.ConnectionLogger(self.logger, context)
self.group = self.session_mgr.add_session(self) self.group = self.session_manager.add_session(self)
self.session_mgr.session_count_metric.labels(version=self.client_version).inc() self.session_manager.session_count_metric.labels(version=self.client_version).inc()
peer_addr_str = self.peer_address_str() peer_addr_str = self.peer_address_str()
self.logger.info(f'{self.kind} {peer_addr_str}, ' self.logger.info(f'{self.kind} {peer_addr_str}, '
f'{self.session_mgr.session_count():,d} total') f'{self.session_manager.session_count():,d} total')
def connection_lost(self, exc): def connection_lost(self, exc):
"""Handle client disconnection.""" """Handle client disconnection."""
super().connection_lost(exc) super().connection_lost(exc)
self.session_mgr.remove_session(self) self.session_manager.remove_session(self)
self.session_mgr.session_count_metric.labels(version=self.client_version).dec() self.session_manager.session_count_metric.labels(version=self.client_version).dec()
msg = '' msg = ''
if not self._can_send.is_set(): if not self._can_send.is_set():
msg += ' whilst paused' msg += ' whilst paused'
@ -763,7 +772,7 @@ class SessionBase(RPCSession):
"""Handle an incoming request. ElectrumX doesn't receive """Handle an incoming request. ElectrumX doesn't receive
notifications from client sessions. notifications from client sessions.
""" """
self.session_mgr.request_count_metric.labels(method=request.method, version=self.client_version).inc() self.session_manager.request_count_metric.labels(method=request.method, version=self.client_version).inc()
if isinstance(request, Request): if isinstance(request, Request):
handler = self.request_handlers.get(request.method) handler = self.request_handlers.get(request.method)
handler = partial(handler, self) handler = partial(handler, self)
@ -811,7 +820,7 @@ class LBRYElectrumX(SessionBase):
PROTOCOL_MIN = VERSION.PROTOCOL_MIN PROTOCOL_MIN = VERSION.PROTOCOL_MIN
PROTOCOL_MAX = VERSION.PROTOCOL_MAX PROTOCOL_MAX = VERSION.PROTOCOL_MAX
max_errors = math.inf # don't disconnect people for errors! let them happen... max_errors = math.inf # don't disconnect people for errors! let them happen...
session_mgr: LBRYSessionManager session_manager: LBRYSessionManager
version = lbry.__version__ version = lbry.__version__
cached_server_features = {} cached_server_features = {}
@ -822,17 +831,17 @@ class LBRYElectrumX(SessionBase):
'blockchain.block.get_header': cls.block_get_header, 'blockchain.block.get_header': cls.block_get_header,
'blockchain.estimatefee': cls.estimatefee, 'blockchain.estimatefee': cls.estimatefee,
'blockchain.relayfee': cls.relayfee, 'blockchain.relayfee': cls.relayfee,
'blockchain.scripthash.get_balance': cls.scripthash_get_balance, # 'blockchain.scripthash.get_balance': cls.scripthash_get_balance,
'blockchain.scripthash.get_history': cls.scripthash_get_history, 'blockchain.scripthash.get_history': cls.scripthash_get_history,
'blockchain.scripthash.get_mempool': cls.scripthash_get_mempool, 'blockchain.scripthash.get_mempool': cls.scripthash_get_mempool,
'blockchain.scripthash.listunspent': cls.scripthash_listunspent, # 'blockchain.scripthash.listunspent': cls.scripthash_listunspent,
'blockchain.scripthash.subscribe': cls.scripthash_subscribe, 'blockchain.scripthash.subscribe': cls.scripthash_subscribe,
'blockchain.transaction.broadcast': cls.transaction_broadcast, 'blockchain.transaction.broadcast': cls.transaction_broadcast,
'blockchain.transaction.get': cls.transaction_get, 'blockchain.transaction.get': cls.transaction_get,
'blockchain.transaction.get_batch': cls.transaction_get_batch, 'blockchain.transaction.get_batch': cls.transaction_get_batch,
'blockchain.transaction.info': cls.transaction_info, 'blockchain.transaction.info': cls.transaction_info,
'blockchain.transaction.get_merkle': cls.transaction_merkle, 'blockchain.transaction.get_merkle': cls.transaction_merkle,
'server.add_peer': cls.add_peer, # 'server.add_peer': cls.add_peer,
'server.banner': cls.banner, 'server.banner': cls.banner,
'server.payment_address': cls.payment_address, 'server.payment_address': cls.payment_address,
'server.donation_address': cls.donation_address, 'server.donation_address': cls.donation_address,
@ -849,10 +858,10 @@ class LBRYElectrumX(SessionBase):
'blockchain.block.headers': cls.block_headers, 'blockchain.block.headers': cls.block_headers,
'server.ping': cls.ping, 'server.ping': cls.ping,
'blockchain.headers.subscribe': cls.headers_subscribe_False, 'blockchain.headers.subscribe': cls.headers_subscribe_False,
'blockchain.address.get_balance': cls.address_get_balance, # 'blockchain.address.get_balance': cls.address_get_balance,
'blockchain.address.get_history': cls.address_get_history, 'blockchain.address.get_history': cls.address_get_history,
'blockchain.address.get_mempool': cls.address_get_mempool, 'blockchain.address.get_mempool': cls.address_get_mempool,
'blockchain.address.listunspent': cls.address_listunspent, # 'blockchain.address.listunspent': cls.address_listunspent,
'blockchain.address.subscribe': cls.address_subscribe, 'blockchain.address.subscribe': cls.address_subscribe,
'blockchain.address.unsubscribe': cls.address_unsubscribe, 'blockchain.address.unsubscribe': cls.address_unsubscribe,
}) })
@ -871,8 +880,8 @@ class LBRYElectrumX(SessionBase):
self.sv_seen = False self.sv_seen = False
self.protocol_tuple = self.PROTOCOL_MIN self.protocol_tuple = self.PROTOCOL_MIN
self.protocol_string = None self.protocol_string = None
self.daemon = self.session_mgr.daemon self.daemon = self.session_manager.daemon
self.db: LevelDB = self.session_mgr.db self.db: LevelDB = self.session_manager.db
@classmethod @classmethod
def protocol_min_max_strings(cls): def protocol_min_max_strings(cls):
@ -921,7 +930,7 @@ class LBRYElectrumX(SessionBase):
else: else:
method = 'blockchain.address.subscribe' method = 'blockchain.address.subscribe'
start = time.perf_counter() start = time.perf_counter()
db_history = await self.session_mgr.limited_history(hashX) db_history = await self.session_manager.limited_history(hashX)
mempool = self.mempool.transaction_summaries(hashX) mempool = self.mempool.transaction_summaries(hashX)
status = ''.join(f'{hash_to_hex_str(tx_hash)}:' status = ''.join(f'{hash_to_hex_str(tx_hash)}:'
@ -935,24 +944,25 @@ class LBRYElectrumX(SessionBase):
else: else:
status = None status = None
if mempool: if mempool:
self.session_mgr.mempool_statuses[hashX] = status self.session_manager.mempool_statuses[hashX] = status
else: else:
self.session_mgr.mempool_statuses.pop(hashX, None) self.session_manager.mempool_statuses.pop(hashX, None)
self.session_mgr.address_history_metric.observe(time.perf_counter() - start) self.session_manager.address_history_metric.observe(time.perf_counter() - start)
notifications.append((method, (alias, status))) notifications.append((method, (alias, status)))
print(f"notify {alias} {method}")
start = time.perf_counter() start = time.perf_counter()
self.session_mgr.notifications_in_flight_metric.inc() self.session_manager.notifications_in_flight_metric.inc()
for method, args in notifications: for method, args in notifications:
self.NOTIFICATION_COUNT.labels(method=method, version=self.client_version).inc() self.NOTIFICATION_COUNT.labels(method=method, version=self.client_version).inc()
try: try:
await self.send_notifications( await self.send_notifications(
Batch([Notification(method, (alias, status)) for (method, (alias, status)) in notifications]) Batch([Notification(method, (alias, status)) for (method, (alias, status)) in notifications])
) )
self.session_mgr.notifications_sent_metric.observe(time.perf_counter() - start) self.session_manager.notifications_sent_metric.observe(time.perf_counter() - start)
finally: finally:
self.session_mgr.notifications_in_flight_metric.dec() self.session_manager.notifications_in_flight_metric.dec()
# def get_metrics_or_placeholder_for_api(self, query_name): # def get_metrics_or_placeholder_for_api(self, query_name):
# """ Do not hold on to a reference to the metrics # """ Do not hold on to a reference to the metrics
@ -960,7 +970,7 @@ class LBRYElectrumX(SessionBase):
# you may be working with a stale metrics object. # you may be working with a stale metrics object.
# """ # """
# if self.env.track_metrics: # if self.env.track_metrics:
# # return self.session_mgr.metrics.for_api(query_name) # # return self.session_manager.metrics.for_api(query_name)
# else: # else:
# return APICallMetrics(query_name) # return APICallMetrics(query_name)
@ -970,17 +980,17 @@ class LBRYElectrumX(SessionBase):
# if isinstance(kwargs, dict): # if isinstance(kwargs, dict):
# kwargs['release_time'] = format_release_time(kwargs.get('release_time')) # kwargs['release_time'] = format_release_time(kwargs.get('release_time'))
# try: # try:
# self.session_mgr.pending_query_metric.inc() # self.session_manager.pending_query_metric.inc()
# return await self.db.search_index.session_query(query_name, kwargs) # return await self.db.search_index.session_query(query_name, kwargs)
# except ConnectionTimeout: # except ConnectionTimeout:
# self.session_mgr.interrupt_count_metric.inc() # self.session_manager.interrupt_count_metric.inc()
# raise RPCError(JSONRPC.QUERY_TIMEOUT, 'query timed out') # raise RPCError(JSONRPC.QUERY_TIMEOUT, 'query timed out')
# finally: # finally:
# self.session_mgr.pending_query_metric.dec() # self.session_manager.pending_query_metric.dec()
# self.session_mgr.executor_time_metric.observe(time.perf_counter() - start) # self.session_manager.executor_time_metric.observe(time.perf_counter() - start)
async def mempool_compact_histogram(self): async def mempool_compact_histogram(self):
return self.mempool.compact_fee_histogram() return [] #self.mempool.compact_fee_histogram()
async def claimtrie_search(self, **kwargs): async def claimtrie_search(self, **kwargs):
start = time.perf_counter() start = time.perf_counter()
@ -992,16 +1002,16 @@ class LBRYElectrumX(SessionBase):
except ValueError: except ValueError:
pass pass
try: try:
self.session_mgr.pending_query_metric.inc() self.session_manager.pending_query_metric.inc()
if 'channel' in kwargs: if 'channel' in kwargs:
channel_url = kwargs.pop('channel') channel_url = kwargs.pop('channel')
_, channel_claim, _, _ = await self.db.resolve(channel_url) _, channel_claim, _, _ = await self.db.resolve(channel_url)
if not channel_claim or isinstance(channel_claim, (ResolveCensoredError, LookupError, ValueError)): if not channel_claim or isinstance(channel_claim, (ResolveCensoredError, LookupError, ValueError)):
return Outputs.to_base64([], [], 0, None, None) return Outputs.to_base64([], [], 0, None, None)
kwargs['channel_id'] = channel_claim.claim_hash.hex() kwargs['channel_id'] = channel_claim.claim_hash.hex()
return await self.db.search_index.cached_search(kwargs) return await self.session_manager.search_index.cached_search(kwargs)
except ConnectionTimeout: except ConnectionTimeout:
self.session_mgr.interrupt_count_metric.inc() self.session_manager.interrupt_count_metric.inc()
raise RPCError(JSONRPC.QUERY_TIMEOUT, 'query timed out') raise RPCError(JSONRPC.QUERY_TIMEOUT, 'query timed out')
except TooManyClaimSearchParametersError as err: except TooManyClaimSearchParametersError as err:
await asyncio.sleep(2) await asyncio.sleep(2)
@ -1009,25 +1019,25 @@ class LBRYElectrumX(SessionBase):
self.peer_address()[0], err.key, err.limit) self.peer_address()[0], err.key, err.limit)
return RPCError(1, str(err)) return RPCError(1, str(err))
finally: finally:
self.session_mgr.pending_query_metric.dec() self.session_manager.pending_query_metric.dec()
self.session_mgr.executor_time_metric.observe(time.perf_counter() - start) self.session_manager.executor_time_metric.observe(time.perf_counter() - start)
async def _cached_resolve_url(self, url): async def _cached_resolve_url(self, url):
if url not in self.session_mgr.resolve_cache: if url not in self.session_manager.resolve_cache:
self.session_mgr.resolve_cache[url] = await self.loop.run_in_executor(None, self.db._resolve, url) self.session_manager.resolve_cache[url] = await self.loop.run_in_executor(None, self.db._resolve, url)
return self.session_mgr.resolve_cache[url] return self.session_manager.resolve_cache[url]
async def claimtrie_resolve(self, *urls) -> str: async def claimtrie_resolve(self, *urls) -> str:
sorted_urls = tuple(sorted(urls)) sorted_urls = tuple(sorted(urls))
self.session_mgr.urls_to_resolve_count_metric.inc(len(sorted_urls)) self.session_manager.urls_to_resolve_count_metric.inc(len(sorted_urls))
try: try:
if sorted_urls in self.session_mgr.resolve_outputs_cache: if sorted_urls in self.session_manager.resolve_outputs_cache:
return self.session_mgr.resolve_outputs_cache[sorted_urls] return self.session_manager.resolve_outputs_cache[sorted_urls]
rows, extra = [], [] rows, extra = [], []
for url in urls: for url in urls:
if url not in self.session_mgr.resolve_cache: if url not in self.session_manager.resolve_cache:
self.session_mgr.resolve_cache[url] = await self._cached_resolve_url(url) self.session_manager.resolve_cache[url] = await self._cached_resolve_url(url)
stream, channel, repost, reposted_channel = self.session_mgr.resolve_cache[url] stream, channel, repost, reposted_channel = self.session_manager.resolve_cache[url]
if isinstance(channel, ResolveCensoredError): if isinstance(channel, ResolveCensoredError):
rows.append(channel) rows.append(channel)
extra.append(channel.censor_row) extra.append(channel.censor_row)
@ -1052,12 +1062,12 @@ class LBRYElectrumX(SessionBase):
if reposted_channel: if reposted_channel:
extra.append(reposted_channel) extra.append(reposted_channel)
await asyncio.sleep(0) await asyncio.sleep(0)
self.session_mgr.resolve_outputs_cache[sorted_urls] = result = await self.loop.run_in_executor( self.session_manager.resolve_outputs_cache[sorted_urls] = result = await self.loop.run_in_executor(
None, Outputs.to_base64, rows, extra, 0, None, None None, Outputs.to_base64, rows, extra, 0, None, None
) )
return result return result
finally: finally:
self.session_mgr.resolved_url_count_metric.inc(len(sorted_urls)) self.session_manager.resolved_url_count_metric.inc(len(sorted_urls))
async def get_server_height(self): async def get_server_height(self):
return self.db.db_height return self.db.db_height
@ -1093,7 +1103,7 @@ class LBRYElectrumX(SessionBase):
async def subscribe_headers_result(self): async def subscribe_headers_result(self):
"""The result of a header subscription or notification.""" """The result of a header subscription or notification."""
return self.session_mgr.hsub_results[self.subscribe_headers_raw] return self.session_manager.hsub_results[self.subscribe_headers_raw]
async def _headers_subscribe(self, raw): async def _headers_subscribe(self, raw):
"""Subscribe to get headers of new blocks.""" """Subscribe to get headers of new blocks."""
@ -1130,7 +1140,7 @@ class LBRYElectrumX(SessionBase):
# Note history is ordered and mempool unordered in electrum-server # Note history is ordered and mempool unordered in electrum-server
# For mempool, height is -1 if it has unconfirmed inputs, otherwise 0 # For mempool, height is -1 if it has unconfirmed inputs, otherwise 0
db_history = await self.session_mgr.limited_history(hashX) db_history = await self.session_manager.limited_history(hashX)
mempool = self.mempool.transaction_summaries(hashX) mempool = self.mempool.transaction_summaries(hashX)
status = ''.join(f'{hash_to_hex_str(tx_hash)}:' status = ''.join(f'{hash_to_hex_str(tx_hash)}:'
@ -1145,32 +1155,32 @@ class LBRYElectrumX(SessionBase):
status = None status = None
if mempool: if mempool:
self.session_mgr.mempool_statuses[hashX] = status self.session_manager.mempool_statuses[hashX] = status
else: else:
self.session_mgr.mempool_statuses.pop(hashX, None) self.session_manager.mempool_statuses.pop(hashX, None)
return status return status
async def hashX_listunspent(self, hashX): # async def hashX_listunspent(self, hashX):
"""Return the list of UTXOs of a script hash, including mempool # """Return the list of UTXOs of a script hash, including mempool
effects.""" # effects."""
utxos = await self.db.all_utxos(hashX) # utxos = await self.db.all_utxos(hashX)
utxos = sorted(utxos) # utxos = sorted(utxos)
utxos.extend(await self.mempool.unordered_UTXOs(hashX)) # utxos.extend(await self.mempool.unordered_UTXOs(hashX))
spends = await self.mempool.potential_spends(hashX) # spends = await self.mempool.potential_spends(hashX)
#
return [{'tx_hash': hash_to_hex_str(utxo.tx_hash), # return [{'tx_hash': hash_to_hex_str(utxo.tx_hash),
'tx_pos': utxo.tx_pos, # 'tx_pos': utxo.tx_pos,
'height': utxo.height, 'value': utxo.value} # 'height': utxo.height, 'value': utxo.value}
for utxo in utxos # for utxo in utxos
if (utxo.tx_hash, utxo.tx_pos) not in spends] # if (utxo.tx_hash, utxo.tx_pos) not in spends]
async def hashX_subscribe(self, hashX, alias): async def hashX_subscribe(self, hashX, alias):
self.hashX_subs[hashX] = alias self.hashX_subs[hashX] = alias
self.session_mgr.hashx_subscriptions_by_session[hashX].add(id(self)) self.session_manager.hashx_subscriptions_by_session[hashX].add(id(self))
return await self.address_status(hashX) return await self.address_status(hashX)
async def hashX_unsubscribe(self, hashX, alias): async def hashX_unsubscribe(self, hashX, alias):
sessions = self.session_mgr.hashx_subscriptions_by_session[hashX] sessions = self.session_manager.hashx_subscriptions_by_session[hashX]
sessions.remove(id(self)) sessions.remove(id(self))
if not sessions: if not sessions:
self.hashX_subs.pop(hashX, None) self.hashX_subs.pop(hashX, None)
@ -1182,10 +1192,10 @@ class LBRYElectrumX(SessionBase):
pass pass
raise RPCError(BAD_REQUEST, f'{address} is not a valid address') raise RPCError(BAD_REQUEST, f'{address} is not a valid address')
async def address_get_balance(self, address): # async def address_get_balance(self, address):
"""Return the confirmed and unconfirmed balance of an address.""" # """Return the confirmed and unconfirmed balance of an address."""
hashX = self.address_to_hashX(address) # hashX = self.address_to_hashX(address)
return await self.get_balance(hashX) # return await self.get_balance(hashX)
async def address_get_history(self, address): async def address_get_history(self, address):
"""Return the confirmed and unconfirmed history of an address.""" """Return the confirmed and unconfirmed history of an address."""
@ -1197,10 +1207,10 @@ class LBRYElectrumX(SessionBase):
hashX = self.address_to_hashX(address) hashX = self.address_to_hashX(address)
return self.unconfirmed_history(hashX) return self.unconfirmed_history(hashX)
async def address_listunspent(self, address): # async def address_listunspent(self, address):
"""Return the list of UTXOs of an address.""" # """Return the list of UTXOs of an address."""
hashX = self.address_to_hashX(address) # hashX = self.address_to_hashX(address)
return await self.hashX_listunspent(hashX) # return await self.hashX_listunspent(hashX)
async def address_subscribe(self, *addresses): async def address_subscribe(self, *addresses):
"""Subscribe to an address. """Subscribe to an address.
@ -1221,16 +1231,16 @@ class LBRYElectrumX(SessionBase):
hashX = self.address_to_hashX(address) hashX = self.address_to_hashX(address)
return await self.hashX_unsubscribe(hashX, address) return await self.hashX_unsubscribe(hashX, address)
async def get_balance(self, hashX): # async def get_balance(self, hashX):
utxos = await self.db.all_utxos(hashX) # utxos = await self.db.all_utxos(hashX)
confirmed = sum(utxo.value for utxo in utxos) # confirmed = sum(utxo.value for utxo in utxos)
unconfirmed = await self.mempool.balance_delta(hashX) # unconfirmed = await self.mempool.balance_delta(hashX)
return {'confirmed': confirmed, 'unconfirmed': unconfirmed} # return {'confirmed': confirmed, 'unconfirmed': unconfirmed}
async def scripthash_get_balance(self, scripthash): # async def scripthash_get_balance(self, scripthash):
"""Return the confirmed and unconfirmed balance of a scripthash.""" # """Return the confirmed and unconfirmed balance of a scripthash."""
hashX = scripthash_to_hashX(scripthash) # hashX = scripthash_to_hashX(scripthash)
return await self.get_balance(hashX) # return await self.get_balance(hashX)
def unconfirmed_history(self, hashX): def unconfirmed_history(self, hashX):
# Note unconfirmed history is unordered in electrum-server # Note unconfirmed history is unordered in electrum-server
@ -1242,7 +1252,7 @@ class LBRYElectrumX(SessionBase):
async def confirmed_and_unconfirmed_history(self, hashX): async def confirmed_and_unconfirmed_history(self, hashX):
# Note history is ordered but unconfirmed is unordered in e-s # Note history is ordered but unconfirmed is unordered in e-s
history = await self.session_mgr.limited_history(hashX) history = await self.session_manager.limited_history(hashX)
conf = [{'tx_hash': hash_to_hex_str(tx_hash), 'height': height} conf = [{'tx_hash': hash_to_hex_str(tx_hash), 'height': height}
for tx_hash, height in history] for tx_hash, height in history]
return conf + self.unconfirmed_history(hashX) return conf + self.unconfirmed_history(hashX)
@ -1257,10 +1267,10 @@ class LBRYElectrumX(SessionBase):
hashX = scripthash_to_hashX(scripthash) hashX = scripthash_to_hashX(scripthash)
return self.unconfirmed_history(hashX) return self.unconfirmed_history(hashX)
async def scripthash_listunspent(self, scripthash): # async def scripthash_listunspent(self, scripthash):
"""Return the list of UTXOs of a scripthash.""" # """Return the list of UTXOs of a scripthash."""
hashX = scripthash_to_hashX(scripthash) # hashX = scripthash_to_hashX(scripthash)
return await self.hashX_listunspent(hashX) # return await self.hashX_listunspent(hashX)
async def scripthash_subscribe(self, scripthash): async def scripthash_subscribe(self, scripthash):
"""Subscribe to a script hash. """Subscribe to a script hash.
@ -1295,7 +1305,7 @@ class LBRYElectrumX(SessionBase):
max_size = self.MAX_CHUNK_SIZE max_size = self.MAX_CHUNK_SIZE
count = min(count, max_size) count = min(count, max_size)
headers, count = self.db.read_headers(start_height, count) headers, count = await self.db.read_headers(start_height, count)
if b64: if b64:
headers = self.db.encode_headers(start_height, count, headers) headers = self.db.encode_headers(start_height, count, headers)
@ -1318,7 +1328,7 @@ class LBRYElectrumX(SessionBase):
index = non_negative_integer(index) index = non_negative_integer(index)
size = self.coin.CHUNK_SIZE size = self.coin.CHUNK_SIZE
start_height = index * size start_height = index * size
headers, _ = self.db.read_headers(start_height, size) headers, _ = await self.db.read_headers(start_height, size)
return headers.hex() return headers.hex()
async def block_get_header(self, height): async def block_get_header(self, height):
@ -1326,7 +1336,7 @@ class LBRYElectrumX(SessionBase):
height: the header's height""" height: the header's height"""
height = non_negative_integer(height) height = non_negative_integer(height)
return await self.session_mgr.electrum_header(height) return await self.session_manager.electrum_header(height)
def is_tor(self): def is_tor(self):
"""Try to detect if the connection is to a tor hidden service we are """Try to detect if the connection is to a tor hidden service we are
@ -1416,10 +1426,10 @@ class LBRYElectrumX(SessionBase):
self.close_after_send = True self.close_after_send = True
raise RPCError(BAD_REQUEST, f'unsupported client: {client_name}') raise RPCError(BAD_REQUEST, f'unsupported client: {client_name}')
if self.client_version != client_name[:17]: if self.client_version != client_name[:17]:
self.session_mgr.session_count_metric.labels(version=self.client_version).dec() self.session_manager.session_count_metric.labels(version=self.client_version).dec()
self.client_version = client_name[:17] self.client_version = client_name[:17]
self.session_mgr.session_count_metric.labels(version=self.client_version).inc() self.session_manager.session_count_metric.labels(version=self.client_version).inc()
self.session_mgr.client_version_metric.labels(version=self.client_version).inc() self.session_manager.client_version_metric.labels(version=self.client_version).inc()
# Find the highest common protocol version. Disconnect if # Find the highest common protocol version. Disconnect if
# that protocol version in unsupported. # that protocol version in unsupported.
@ -1440,9 +1450,10 @@ class LBRYElectrumX(SessionBase):
raw_tx: the raw transaction as a hexadecimal string""" raw_tx: the raw transaction as a hexadecimal string"""
# This returns errors as JSON RPC errors, as is natural # This returns errors as JSON RPC errors, as is natural
try: try:
hex_hash = await self.session_mgr.broadcast_transaction(raw_tx) hex_hash = await self.session_manager.broadcast_transaction(raw_tx)
self.txs_sent += 1 self.txs_sent += 1
self.mempool.wakeup.set() # self.mempool.wakeup.set()
# await asyncio.sleep(0.5)
self.logger.info(f'sent tx: {hex_hash}') self.logger.info(f'sent tx: {hex_hash}')
return hex_hash return hex_hash
except DaemonError as e: except DaemonError as e:
@ -1456,7 +1467,7 @@ class LBRYElectrumX(SessionBase):
return (await self.transaction_get_batch(tx_hash))[tx_hash] return (await self.transaction_get_batch(tx_hash))[tx_hash]
async def transaction_get_batch(self, *tx_hashes): async def transaction_get_batch(self, *tx_hashes):
self.session_mgr.tx_request_count_metric.inc(len(tx_hashes)) self.session_manager.tx_request_count_metric.inc(len(tx_hashes))
if len(tx_hashes) > 100: if len(tx_hashes) > 100:
raise RPCError(BAD_REQUEST, f'too many tx hashes in request: {len(tx_hashes)}') raise RPCError(BAD_REQUEST, f'too many tx hashes in request: {len(tx_hashes)}')
for tx_hash in tx_hashes: for tx_hash in tx_hashes:
@ -1495,8 +1506,11 @@ class LBRYElectrumX(SessionBase):
'block_height': block_height 'block_height': block_height
} }
await asyncio.sleep(0) # heavy call, give other tasks a chance await asyncio.sleep(0) # heavy call, give other tasks a chance
print("return tx batch")
for tx_hash, (_, info) in batch_result.items():
print(tx_hash, info['block_height'])
self.session_mgr.tx_replied_count_metric.inc(len(tx_hashes)) self.session_manager.tx_replied_count_metric.inc(len(tx_hashes))
return batch_result return batch_result
async def transaction_get(self, tx_hash, verbose=False): async def transaction_get(self, tx_hash, verbose=False):