add chain reader and reader server, new mempool, update block processor
This commit is contained in:
parent
e06c8e8303
commit
611ad5c655
6 changed files with 563 additions and 520 deletions
|
@ -92,7 +92,6 @@ class BlockProcessor:
|
|||
)
|
||||
self._chain_executor = ThreadPoolExecutor(1, thread_name_prefix='block-processor')
|
||||
self._sync_reader_executor = ThreadPoolExecutor(1, thread_name_prefix='hub-es-sync')
|
||||
self.mempool = MemPool(env.coin, daemon, db, self.state_lock)
|
||||
self.shutdown_event = asyncio.Event()
|
||||
self.coin = env.coin
|
||||
if env.coin.NET == 'mainnet':
|
||||
|
@ -118,14 +117,13 @@ class BlockProcessor:
|
|||
self.utxo_cache: Dict[Tuple[bytes, int], Tuple[bytes, int]] = {}
|
||||
|
||||
# Claimtrie cache
|
||||
self.db_op_stack: Optional[RevertableOpStack] = None
|
||||
self.db_op_stack: Optional['RevertableOpStack'] = None
|
||||
|
||||
# self.search_cache = {}
|
||||
self.resolve_cache = LRUCache(2**16)
|
||||
self.resolve_outputs_cache = LRUCache(2 ** 16)
|
||||
|
||||
self.history_cache = {}
|
||||
self.status_server = StatusServer()
|
||||
|
||||
#################################
|
||||
# attributes used for calculating stake activations and takeovers per block
|
||||
|
@ -162,7 +160,6 @@ class BlockProcessor:
|
|||
|
||||
self.removed_claims_to_send_es = set() # cumulative changes across blocks to send ES
|
||||
self.touched_claims_to_send_es = set()
|
||||
self.activation_info_to_send_es: DefaultDict[str, List[TrendingNotification]] = defaultdict(list)
|
||||
|
||||
self.removed_claim_hashes: Set[bytes] = set() # per block changes
|
||||
self.touched_claim_hashes: Set[bytes] = set()
|
||||
|
@ -251,29 +248,12 @@ class BlockProcessor:
|
|||
await self.run_in_thread(self.advance_block, block)
|
||||
await self.flush()
|
||||
|
||||
self.logger.info("advanced to %i in %0.3fs", self.height, time.perf_counter() - start)
|
||||
self.logger.warning("writer advanced to %i in %0.3fs", self.height, time.perf_counter() - start)
|
||||
if self.height == self.coin.nExtendedClaimExpirationForkHeight:
|
||||
self.logger.warning(
|
||||
"applying extended claim expiration fork on claims accepted by, %i", self.height
|
||||
)
|
||||
await self.run_in_thread_with_lock(self.db.apply_expiration_extension_fork)
|
||||
if self.db.first_sync:
|
||||
self.db.search_index.clear_caches()
|
||||
self.touched_claims_to_send_es.clear()
|
||||
self.removed_claims_to_send_es.clear()
|
||||
self.activation_info_to_send_es.clear()
|
||||
# TODO: we shouldnt wait on the search index updating before advancing to the next block
|
||||
if not self.db.first_sync:
|
||||
await self.db.reload_blocking_filtering_streams()
|
||||
await self.db.search_index.claim_consumer(self.claim_producer())
|
||||
await self.db.search_index.apply_filters(self.db.blocked_streams, self.db.blocked_channels,
|
||||
self.db.filtered_streams, self.db.filtered_channels)
|
||||
await self.db.search_index.update_trending_score(self.activation_info_to_send_es)
|
||||
await self._es_caught_up()
|
||||
self.db.search_index.clear_caches()
|
||||
self.touched_claims_to_send_es.clear()
|
||||
self.removed_claims_to_send_es.clear()
|
||||
self.activation_info_to_send_es.clear()
|
||||
# print("******************\n")
|
||||
except:
|
||||
self.logger.exception("advance blocks failed")
|
||||
|
@ -281,12 +261,9 @@ class BlockProcessor:
|
|||
processed_time = time.perf_counter() - total_start
|
||||
self.block_count_metric.set(self.height)
|
||||
self.block_update_time_metric.observe(processed_time)
|
||||
self.status_server.set_height(self.db.fs_height, self.db.db_tip)
|
||||
if not self.db.first_sync:
|
||||
s = '' if len(blocks) == 1 else 's'
|
||||
self.logger.info('processed {:,d} block{} in {:.1f}s'.format(len(blocks), s, processed_time))
|
||||
if self._caught_up_event.is_set():
|
||||
await self.mempool.on_block(self.touched_hashXs, self.height)
|
||||
self.touched_hashXs.clear()
|
||||
elif hprevs[0] != chain[0]:
|
||||
min_start_height = max(self.height - self.coin.REORG_LIMIT, 0)
|
||||
|
@ -309,15 +286,6 @@ class BlockProcessor:
|
|||
|
||||
if self.env.cache_all_claim_txos:
|
||||
await self.db._read_claim_txos() # TODO: don't do this
|
||||
for touched in self.touched_claims_to_send_es:
|
||||
if not self.db.get_claim_txo(touched):
|
||||
self.removed_claims_to_send_es.add(touched)
|
||||
self.touched_claims_to_send_es.difference_update(self.removed_claims_to_send_es)
|
||||
await self.db.search_index.claim_consumer(self.claim_producer())
|
||||
self.db.search_index.clear_caches()
|
||||
self.touched_claims_to_send_es.clear()
|
||||
self.removed_claims_to_send_es.clear()
|
||||
self.activation_info_to_send_es.clear()
|
||||
await self.prefetcher.reset_height(self.height)
|
||||
self.reorg_count_metric.inc()
|
||||
except:
|
||||
|
@ -652,8 +620,6 @@ class BlockProcessor:
|
|||
self.support_txo_to_claim.pop(support_txo_to_clear)
|
||||
self.support_txos_by_claim[claim_hash].clear()
|
||||
self.support_txos_by_claim.pop(claim_hash)
|
||||
if claim_hash.hex() in self.activation_info_to_send_es:
|
||||
self.activation_info_to_send_es.pop(claim_hash.hex())
|
||||
if normalized_name.startswith('@'): # abandon a channel, invalidate signatures
|
||||
self._invalidate_channel_signatures(claim_hash)
|
||||
|
||||
|
@ -1226,10 +1192,6 @@ class BlockProcessor:
|
|||
self.touched_claim_hashes.add(controlling.claim_hash)
|
||||
self.touched_claim_hashes.add(winning)
|
||||
|
||||
def _add_claim_activation_change_notification(self, claim_id: str, height: int, prev_amount: int,
|
||||
new_amount: int):
|
||||
self.activation_info_to_send_es[claim_id].append(TrendingNotification(height, prev_amount, new_amount))
|
||||
|
||||
def _get_cumulative_update_ops(self, height: int):
|
||||
# update the last takeover height for names with takeovers
|
||||
for name in self.taken_over_names:
|
||||
|
@ -1493,7 +1455,6 @@ class BlockProcessor:
|
|||
self.utxo_cache.clear()
|
||||
self.hashXs_by_tx.clear()
|
||||
self.history_cache.clear()
|
||||
self.mempool.notified_mempool_txs.clear()
|
||||
self.removed_claim_hashes.clear()
|
||||
self.touched_claim_hashes.clear()
|
||||
self.pending_reposted.clear()
|
||||
|
@ -1518,8 +1479,8 @@ class BlockProcessor:
|
|||
self.logger.info("backup block %i", self.height)
|
||||
# Check and update self.tip
|
||||
|
||||
self.db.headers.pop()
|
||||
self.db.tx_counts.pop()
|
||||
reverted_block_hash = self.coin.header_hash(self.db.headers.pop())
|
||||
self.tip = self.coin.header_hash(self.db.headers[-1])
|
||||
if self.env.cache_all_tx_hashes:
|
||||
while len(self.db.total_transactions) > self.db.tx_counts[-1]:
|
||||
|
@ -1613,21 +1574,32 @@ class BlockProcessor:
|
|||
self.touched_hashXs.add(hashX)
|
||||
return hashX
|
||||
|
||||
async def _process_prefetched_blocks(self):
|
||||
async def process_blocks_and_mempool_forever(self):
|
||||
"""Loop forever processing blocks as they arrive."""
|
||||
while True:
|
||||
if self.height == self.daemon.cached_height():
|
||||
if not self._caught_up_event.is_set():
|
||||
await self._first_caught_up()
|
||||
self._caught_up_event.set()
|
||||
await self.blocks_event.wait()
|
||||
try:
|
||||
await asyncio.wait_for(self.blocks_event.wait(), 0.25)
|
||||
except asyncio.TimeoutError:
|
||||
pass
|
||||
self.blocks_event.clear()
|
||||
blocks = self.prefetcher.get_prefetched_blocks()
|
||||
try:
|
||||
await self.check_and_advance_blocks(blocks)
|
||||
except Exception:
|
||||
self.logger.exception("error while processing txs")
|
||||
raise
|
||||
if not blocks:
|
||||
try:
|
||||
await self.check_mempool()
|
||||
self.db.prefix_db.unsafe_commit()
|
||||
except Exception:
|
||||
self.logger.exception("error while updating mempool txs")
|
||||
raise
|
||||
else:
|
||||
try:
|
||||
await self.check_and_advance_blocks(blocks)
|
||||
except Exception:
|
||||
self.logger.exception("error while processing txs")
|
||||
raise
|
||||
|
||||
async def _es_caught_up(self):
|
||||
self.db.es_sync_height = self.height
|
||||
|
@ -1659,6 +1631,13 @@ class BlockProcessor:
|
|||
f'height {self.height:,d}, halting here.')
|
||||
self.shutdown_event.set()
|
||||
|
||||
async def open(self):
|
||||
self.db.open_db()
|
||||
self.height = self.db.db_height
|
||||
self.tip = self.db.db_tip
|
||||
self.tx_count = self.db.db_tx_count
|
||||
await self.db.initialize_caches()
|
||||
|
||||
async def fetch_and_process_blocks(self, caught_up_event):
|
||||
"""Fetch, process and index blocks from the daemon.
|
||||
|
||||
|
@ -1674,16 +1653,9 @@ class BlockProcessor:
|
|||
|
||||
self._caught_up_event = caught_up_event
|
||||
try:
|
||||
self.db.open_db()
|
||||
self.height = self.db.db_height
|
||||
self.tip = self.db.db_tip
|
||||
self.tx_count = self.db.db_tx_count
|
||||
self.status_server.set_height(self.db.fs_height, self.db.db_tip)
|
||||
await self.db.initialize_caches()
|
||||
await self.db.search_index.start()
|
||||
await asyncio.wait([
|
||||
self.prefetcher.main_loop(self.height),
|
||||
self._process_prefetched_blocks()
|
||||
self.process_blocks_and_mempool_forever()
|
||||
])
|
||||
except asyncio.CancelledError:
|
||||
raise
|
||||
|
@ -1691,9 +1663,47 @@ class BlockProcessor:
|
|||
self.logger.exception("Block processing failed!")
|
||||
raise
|
||||
finally:
|
||||
self.status_server.stop()
|
||||
# Shut down block processing
|
||||
self.logger.info('closing the DB for a clean shutdown...')
|
||||
self._sync_reader_executor.shutdown(wait=True)
|
||||
self._chain_executor.shutdown(wait=True)
|
||||
self.db.close()
|
||||
|
||||
async def start(self):
|
||||
env = self.env
|
||||
min_str, max_str = env.coin.SESSIONCLS.protocol_min_max_strings()
|
||||
self.logger.info(f'software version: {lbry.__version__}')
|
||||
self.logger.info(f'supported protocol versions: {min_str}-{max_str}')
|
||||
self.logger.info(f'event loop policy: {env.loop_policy}')
|
||||
self.logger.info(f'reorg limit is {env.reorg_limit:,d} blocks')
|
||||
|
||||
await self.daemon.height()
|
||||
|
||||
def _start_cancellable(run, *args):
|
||||
_flag = asyncio.Event()
|
||||
self.cancellable_tasks.append(asyncio.ensure_future(run(*args, _flag)))
|
||||
return _flag.wait()
|
||||
|
||||
await _start_cancellable(self.fetch_and_process_blocks)
|
||||
|
||||
async def stop(self):
|
||||
for task in reversed(self.cancellable_tasks):
|
||||
task.cancel()
|
||||
await asyncio.wait(self.cancellable_tasks)
|
||||
self.shutdown_event.set()
|
||||
await self.daemon.close()
|
||||
|
||||
def run(self):
|
||||
loop = asyncio.get_event_loop()
|
||||
|
||||
def __exit():
|
||||
raise SystemExit()
|
||||
try:
|
||||
loop.add_signal_handler(signal.SIGINT, __exit)
|
||||
loop.add_signal_handler(signal.SIGTERM, __exit)
|
||||
loop.run_until_complete(self.start())
|
||||
loop.run_until_complete(self.shutdown_event.wait())
|
||||
except (SystemExit, KeyboardInterrupt):
|
||||
pass
|
||||
finally:
|
||||
loop.run_until_complete(self.stop())
|
||||
|
|
243
lbry/wallet/server/chain_reader.py
Normal file
243
lbry/wallet/server/chain_reader.py
Normal file
|
@ -0,0 +1,243 @@
|
|||
import signal
|
||||
import logging
|
||||
import asyncio
|
||||
from concurrent.futures.thread import ThreadPoolExecutor
|
||||
import typing
|
||||
|
||||
import lbry
|
||||
from lbry.wallet.server.mempool import MemPool
|
||||
from lbry.wallet.server.db.prefixes import DBState
|
||||
from lbry.wallet.server.udp import StatusServer
|
||||
from lbry.wallet.server.db.db import HubDB
|
||||
from lbry.wallet.server.db.elasticsearch.notifier import ElasticNotifierClientProtocol
|
||||
from lbry.wallet.server.session import LBRYSessionManager
|
||||
from lbry.prometheus import PrometheusServer
|
||||
|
||||
|
||||
class BlockchainReader:
|
||||
def __init__(self, env, secondary_name: str):
|
||||
self.env = env
|
||||
self.log = logging.getLogger(__name__).getChild(self.__class__.__name__)
|
||||
self.shutdown_event = asyncio.Event()
|
||||
self.cancellable_tasks = []
|
||||
|
||||
self.db = HubDB(
|
||||
env.coin, env.db_dir, env.cache_MB, env.reorg_limit, env.cache_all_claim_txos, env.cache_all_tx_hashes,
|
||||
secondary_name=secondary_name, max_open_files=-1
|
||||
)
|
||||
self.last_state: typing.Optional[DBState] = None
|
||||
self._refresh_interval = 0.1
|
||||
|
||||
def _detect_changes(self):
|
||||
try:
|
||||
self.db.prefix_db.try_catch_up_with_primary()
|
||||
except:
|
||||
self.log.exception('failed to update secondary db')
|
||||
raise
|
||||
state = self.db.prefix_db.db_state.get()
|
||||
if not state or state.height <= 0:
|
||||
return
|
||||
# if state and self.last_state and self.db.headers and self.last_state.tip == self.db.coin.header_hash(self.db.headers[-1]):
|
||||
# return
|
||||
if self.last_state and self.last_state.height > state.height: # FIXME: derp
|
||||
self.log.debug("reorg detected, waiting until the writer has flushed the new blocks to advance")
|
||||
return
|
||||
last_height = 0 if not self.last_state else self.last_state.height
|
||||
if self.last_state:
|
||||
while True:
|
||||
if self.db.headers[-1] == self.db.prefix_db.header.get(last_height, deserialize_value=False):
|
||||
self.log.debug("connects to block %i", last_height)
|
||||
break
|
||||
else:
|
||||
self.log.warning("disconnect block %i", last_height)
|
||||
self.unwind()
|
||||
last_height -= 1
|
||||
self.db.read_db_state()
|
||||
if not self.last_state or self.last_state.height < state.height:
|
||||
for height in range(last_height + 1, state.height + 1):
|
||||
self.log.warning("advancing to %i", height)
|
||||
self.advance(height)
|
||||
self.clear_caches()
|
||||
self.last_state = state
|
||||
|
||||
# elif self.last_state and self.last_state.height > state.height:
|
||||
# last_height = self.last_state.height
|
||||
# for height in range(last_height, state.height, -1):
|
||||
# self.log.warning("unwind %i", height)
|
||||
# self.unwind()
|
||||
# self.clear_caches()
|
||||
# self.last_state = state
|
||||
# self.log.warning("unwound to %i", self.last_state.height)
|
||||
|
||||
# print("reader rewound to ", self.last_state.height)
|
||||
|
||||
async def poll_for_changes(self):
|
||||
await asyncio.get_event_loop().run_in_executor(None, self._detect_changes)
|
||||
|
||||
async def refresh_blocks_forever(self, synchronized: asyncio.Event):
|
||||
self.log.warning("start refresh blocks forever")
|
||||
while True:
|
||||
try:
|
||||
await self.poll_for_changes()
|
||||
except:
|
||||
self.log.exception("boom")
|
||||
raise
|
||||
await asyncio.sleep(self._refresh_interval)
|
||||
synchronized.set()
|
||||
|
||||
def clear_caches(self):
|
||||
pass
|
||||
|
||||
def advance(self, height: int):
|
||||
tx_count = self.db.prefix_db.tx_count.get(height).tx_count
|
||||
assert tx_count not in self.db.tx_counts, f'boom {tx_count} in {len(self.db.tx_counts)} tx counts'
|
||||
assert len(self.db.tx_counts) == height, f"{len(self.db.tx_counts)} != {height}"
|
||||
self.db.tx_counts.append(tx_count)
|
||||
self.db.headers.append(self.db.prefix_db.header.get(height, deserialize_value=False))
|
||||
|
||||
def unwind(self):
|
||||
self.db.tx_counts.pop()
|
||||
self.db.headers.pop()
|
||||
|
||||
|
||||
class BlockchainReaderServer(BlockchainReader):
|
||||
def __init__(self, env):
|
||||
super().__init__(env, 'lbry-reader')
|
||||
self.history_cache = {}
|
||||
self.resolve_outputs_cache = {}
|
||||
self.resolve_cache = {}
|
||||
self.notifications_to_send = []
|
||||
self.status_server = StatusServer()
|
||||
self.daemon = env.coin.DAEMON(env.coin, env.daemon_url) # only needed for broadcasting txs
|
||||
self.prometheus_server: typing.Optional[PrometheusServer] = None
|
||||
self.mempool = MemPool(self.env.coin, self.db)
|
||||
self.session_manager = LBRYSessionManager(
|
||||
env, self.db, self.mempool, self.history_cache, self.resolve_cache,
|
||||
self.resolve_outputs_cache, self.daemon,
|
||||
self.shutdown_event,
|
||||
on_available_callback=self.status_server.set_available,
|
||||
on_unavailable_callback=self.status_server.set_unavailable
|
||||
)
|
||||
self.mempool.session_manager = self.session_manager
|
||||
self.es_notifications = asyncio.Queue()
|
||||
self.es_notification_client = ElasticNotifierClientProtocol(self.es_notifications)
|
||||
self.synchronized = asyncio.Event()
|
||||
self._es_height = None
|
||||
|
||||
def clear_caches(self):
|
||||
self.history_cache.clear()
|
||||
self.resolve_outputs_cache.clear()
|
||||
self.resolve_cache.clear()
|
||||
# self.clear_search_cache()
|
||||
# self.mempool.notified_mempool_txs.clear()
|
||||
|
||||
def clear_search_cache(self):
|
||||
self.session_manager.search_index.clear_caches()
|
||||
|
||||
def advance(self, height: int):
|
||||
super().advance(height)
|
||||
touched_hashXs = self.db.prefix_db.touched_hashX.get(height).touched_hashXs
|
||||
self.notifications_to_send.append((set(touched_hashXs), height))
|
||||
|
||||
def _detect_changes(self):
|
||||
super()._detect_changes()
|
||||
self.mempool.raw_mempool.clear()
|
||||
self.mempool.raw_mempool.update(
|
||||
{k.tx_hash: v.raw_tx for k, v in self.db.prefix_db.mempool_tx.iterate()}
|
||||
)
|
||||
|
||||
async def poll_for_changes(self):
|
||||
await super().poll_for_changes()
|
||||
self.status_server.set_height(self.db.fs_height, self.db.db_tip)
|
||||
if self.notifications_to_send:
|
||||
for (touched, height) in self.notifications_to_send:
|
||||
await self.mempool.on_block(touched, height)
|
||||
self.log.warning("reader advanced to %i", height)
|
||||
if self._es_height == self.db.db_height:
|
||||
self.synchronized.set()
|
||||
# print("reader notified")
|
||||
await self.mempool.refresh_hashes(self.db.db_height)
|
||||
self.notifications_to_send.clear()
|
||||
|
||||
async def receive_es_notifications(self, synchronized: asyncio.Event):
|
||||
await asyncio.get_event_loop().create_connection(
|
||||
lambda: self.es_notification_client, '127.0.0.1', self.env.elastic_notifier_port
|
||||
)
|
||||
synchronized.set()
|
||||
try:
|
||||
while True:
|
||||
self._es_height = await self.es_notifications.get()
|
||||
self.clear_search_cache()
|
||||
if self._es_height == self.db.db_height:
|
||||
self.synchronized.set()
|
||||
self.log.warning("es and reader are in sync")
|
||||
else:
|
||||
self.log.warning("es and reader are not yet in sync %s vs %s", self._es_height, self.db.db_height)
|
||||
finally:
|
||||
self.es_notification_client.close()
|
||||
|
||||
async def start(self):
|
||||
env = self.env
|
||||
min_str, max_str = env.coin.SESSIONCLS.protocol_min_max_strings()
|
||||
self.log.info(f'software version: {lbry.__version__}')
|
||||
self.log.info(f'supported protocol versions: {min_str}-{max_str}')
|
||||
self.log.info(f'event loop policy: {env.loop_policy}')
|
||||
self.log.info(f'reorg limit is {env.reorg_limit:,d} blocks')
|
||||
await self.daemon.height()
|
||||
|
||||
def _start_cancellable(run, *args):
|
||||
_flag = asyncio.Event()
|
||||
self.cancellable_tasks.append(asyncio.ensure_future(run(*args, _flag)))
|
||||
return _flag.wait()
|
||||
|
||||
self.db.open_db()
|
||||
await self.db.initialize_caches()
|
||||
|
||||
self.last_state = self.db.read_db_state()
|
||||
|
||||
await self.start_prometheus()
|
||||
if self.env.udp_port:
|
||||
await self.status_server.start(
|
||||
0, bytes.fromhex(self.env.coin.GENESIS_HASH)[::-1], self.env.country,
|
||||
self.env.host, self.env.udp_port, self.env.allow_lan_udp
|
||||
)
|
||||
await _start_cancellable(self.receive_es_notifications)
|
||||
await _start_cancellable(self.refresh_blocks_forever)
|
||||
await self.session_manager.search_index.start()
|
||||
await _start_cancellable(self.session_manager.serve, self.mempool)
|
||||
|
||||
async def stop(self):
|
||||
self.status_server.stop()
|
||||
for task in reversed(self.cancellable_tasks):
|
||||
task.cancel()
|
||||
await asyncio.wait(self.cancellable_tasks)
|
||||
self.session_manager.search_index.stop()
|
||||
self.db.close()
|
||||
if self.prometheus_server:
|
||||
await self.prometheus_server.stop()
|
||||
self.prometheus_server = None
|
||||
self.shutdown_event.set()
|
||||
await self.daemon.close()
|
||||
|
||||
def run(self):
|
||||
loop = asyncio.get_event_loop()
|
||||
executor = ThreadPoolExecutor(self.env.max_query_workers, thread_name_prefix='hub-worker')
|
||||
loop.set_default_executor(executor)
|
||||
|
||||
def __exit():
|
||||
raise SystemExit()
|
||||
try:
|
||||
loop.add_signal_handler(signal.SIGINT, __exit)
|
||||
loop.add_signal_handler(signal.SIGTERM, __exit)
|
||||
loop.run_until_complete(self.start())
|
||||
loop.run_until_complete(self.shutdown_event.wait())
|
||||
except (SystemExit, KeyboardInterrupt):
|
||||
pass
|
||||
finally:
|
||||
loop.run_until_complete(self.stop())
|
||||
executor.shutdown(True)
|
||||
|
||||
async def start_prometheus(self):
|
||||
if not self.prometheus_server and self.env.prometheus_port:
|
||||
self.prometheus_server = PrometheusServer()
|
||||
await self.prometheus_server.start("0.0.0.0", self.env.prometheus_port)
|
|
@ -13,7 +13,6 @@ from lbry.wallet.server.hash import Base58, hash160, double_sha256, hash_to_hex_
|
|||
from lbry.wallet.server.daemon import Daemon, LBCDaemon
|
||||
from lbry.wallet.server.script import ScriptPubKey, OpCodes
|
||||
from lbry.wallet.server.session import LBRYElectrumX, LBRYSessionManager
|
||||
from lbry.wallet.server.block_processor import BlockProcessor
|
||||
|
||||
|
||||
Block = namedtuple("Block", "raw header transactions")
|
||||
|
@ -37,7 +36,6 @@ class Coin:
|
|||
SESSIONCLS = LBRYElectrumX
|
||||
DESERIALIZER = lib_tx.Deserializer
|
||||
DAEMON = Daemon
|
||||
BLOCK_PROCESSOR = BlockProcessor
|
||||
SESSION_MANAGER = LBRYSessionManager
|
||||
HEADER_VALUES = [
|
||||
'version', 'prev_block_hash', 'merkle_root', 'timestamp', 'bits', 'nonce'
|
||||
|
|
|
@ -11,14 +11,13 @@ import itertools
|
|||
import time
|
||||
import attr
|
||||
import typing
|
||||
from typing import Set, Optional, Callable, Awaitable
|
||||
from collections import defaultdict
|
||||
from prometheus_client import Histogram
|
||||
from lbry.wallet.server.hash import hash_to_hex_str, hex_str_to_hash
|
||||
from lbry.wallet.server.util import class_logger, chunks
|
||||
from lbry.wallet.server.leveldb import UTXO
|
||||
from lbry.wallet.server.util import class_logger
|
||||
|
||||
if typing.TYPE_CHECKING:
|
||||
from lbry.wallet.server.session import LBRYSessionManager
|
||||
from wallet.server.db.db import LevelDB
|
||||
|
||||
|
||||
@attr.s(slots=True)
|
||||
|
@ -50,70 +49,99 @@ mempool_process_time_metric = Histogram(
|
|||
|
||||
|
||||
class MemPool:
|
||||
def __init__(self, coin, daemon, db, state_lock: asyncio.Lock, refresh_secs=1.0, log_status_secs=120.0):
|
||||
def __init__(self, coin, db: 'LevelDB', refresh_secs=1.0):
|
||||
self.coin = coin
|
||||
self._daemon = daemon
|
||||
self._db = db
|
||||
self._touched_mp = {}
|
||||
self._touched_bp = {}
|
||||
self._highest_block = -1
|
||||
|
||||
self.logger = class_logger(__name__, self.__class__.__name__)
|
||||
self.txs = {}
|
||||
self.hashXs = defaultdict(set) # None can be a key
|
||||
self.cached_compact_histogram = []
|
||||
self.raw_mempool = {}
|
||||
self.touched_hashXs: typing.DefaultDict[bytes, typing.Set[bytes]] = defaultdict(set) # None can be a key
|
||||
self.refresh_secs = refresh_secs
|
||||
self.log_status_secs = log_status_secs
|
||||
# Prevents mempool refreshes during fee histogram calculation
|
||||
self.lock = state_lock
|
||||
self.wakeup = asyncio.Event()
|
||||
self.mempool_process_time_metric = mempool_process_time_metric
|
||||
self.notified_mempool_txs = set()
|
||||
self.notify_sessions: Optional[Callable[[int, Set[bytes], Set[bytes]], Awaitable[None]]] = None
|
||||
self.session_manager: typing.Optional['LBRYSessionManager'] = None
|
||||
|
||||
async def _logging(self, synchronized_event):
|
||||
"""Print regular logs of mempool stats."""
|
||||
self.logger.info('beginning processing of daemon mempool. '
|
||||
'This can take some time...')
|
||||
async def refresh_hashes(self, height: int):
|
||||
start = time.perf_counter()
|
||||
await synchronized_event.wait()
|
||||
elapsed = time.perf_counter() - start
|
||||
self.logger.info(f'synced in {elapsed:.2f}s')
|
||||
while True:
|
||||
self.logger.info(f'{len(self.txs):,d} txs '
|
||||
f'touching {len(self.hashXs):,d} addresses')
|
||||
await asyncio.sleep(self.log_status_secs)
|
||||
await synchronized_event.wait()
|
||||
new_touched = await self._process_mempool()
|
||||
await self.on_mempool(set(self.touched_hashXs), new_touched, height)
|
||||
duration = time.perf_counter() - start
|
||||
self.mempool_process_time_metric.observe(duration)
|
||||
|
||||
def _accept_transactions(self, tx_map, utxo_map, touched):
|
||||
"""Accept transactions in tx_map to the mempool if all their inputs
|
||||
can be found in the existing mempool or a utxo_map from the
|
||||
DB.
|
||||
async def _process_mempool(self) -> typing.Set[bytes]: # returns list of new touched hashXs
|
||||
# Re-sync with the new set of hashes
|
||||
|
||||
Returns an (unprocessed tx_map, unspent utxo_map) pair.
|
||||
"""
|
||||
hashXs = self.hashXs
|
||||
txs = self.txs
|
||||
# hashXs = self.hashXs # hashX: [tx_hash, ...]
|
||||
touched_hashXs = set()
|
||||
|
||||
deferred = {}
|
||||
unspent = set(utxo_map)
|
||||
# Try to find all prevouts so we can accept the TX
|
||||
for hash, tx in tx_map.items():
|
||||
in_pairs = []
|
||||
try:
|
||||
for prevout in tx.prevouts:
|
||||
utxo = utxo_map.get(prevout)
|
||||
if not utxo:
|
||||
prev_hash, prev_index = prevout
|
||||
# Raises KeyError if prev_hash is not in txs
|
||||
utxo = txs[prev_hash].out_pairs[prev_index]
|
||||
in_pairs.append(utxo)
|
||||
except KeyError:
|
||||
deferred[hash] = tx
|
||||
# Remove txs that aren't in mempool anymore
|
||||
for tx_hash in set(self.txs).difference(self.raw_mempool.keys()):
|
||||
tx = self.txs.pop(tx_hash)
|
||||
tx_hashXs = {hashX for hashX, value in tx.in_pairs}.union({hashX for hashX, value in tx.out_pairs})
|
||||
for hashX in tx_hashXs:
|
||||
if hashX in self.touched_hashXs and tx_hash in self.touched_hashXs[hashX]:
|
||||
self.touched_hashXs[hashX].remove(tx_hash)
|
||||
if not self.touched_hashXs[hashX]:
|
||||
self.touched_hashXs.pop(hashX)
|
||||
touched_hashXs.update(tx_hashXs)
|
||||
|
||||
tx_map = {}
|
||||
for tx_hash, raw_tx in self.raw_mempool.items():
|
||||
if tx_hash in self.txs:
|
||||
continue
|
||||
tx, tx_size = self.coin.DESERIALIZER(raw_tx).read_tx_and_vsize()
|
||||
# Convert the inputs and outputs into (hashX, value) pairs
|
||||
# Drop generation-like inputs from MemPoolTx.prevouts
|
||||
txin_pairs = tuple((txin.prev_hash, txin.prev_idx)
|
||||
for txin in tx.inputs
|
||||
if not txin.is_generation())
|
||||
txout_pairs = tuple((self.coin.hashX_from_script(txout.pk_script), txout.value)
|
||||
for txout in tx.outputs)
|
||||
|
||||
# Spend the prevouts
|
||||
unspent.difference_update(tx.prevouts)
|
||||
tx_map[tx_hash] = MemPoolTx(txin_pairs, None, txout_pairs, 0, tx_size, raw_tx)
|
||||
|
||||
# Determine all prevouts not in the mempool, and fetch the
|
||||
# UTXO information from the database. Failed prevout lookups
|
||||
# return None - concurrent database updates happen - which is
|
||||
# relied upon by _accept_transactions. Ignore prevouts that are
|
||||
# generation-like.
|
||||
# prevouts = tuple(prevout for tx in tx_map.values()
|
||||
# for prevout in tx.prevouts
|
||||
# if prevout[0] not in self.raw_mempool)
|
||||
# utxos = await self._db.lookup_utxos(prevouts)
|
||||
# utxo_map = dict(zip(prevouts, utxos))
|
||||
# unspent = set(utxo_map)
|
||||
|
||||
for tx_hash, tx in tx_map.items():
|
||||
in_pairs = []
|
||||
for prevout in tx.prevouts:
|
||||
# utxo = utxo_map.get(prevout)
|
||||
# if not utxo:
|
||||
prev_hash, prev_index = prevout
|
||||
if prev_hash in self.txs: # accepted mempool
|
||||
utxo = self.txs[prev_hash].out_pairs[prev_index]
|
||||
elif prev_hash in tx_map: # this set of changes
|
||||
utxo = tx_map[prev_hash].out_pairs[prev_index]
|
||||
else: # get it from the db
|
||||
prev_tx_num = self._db.prefix_db.tx_num.get(prev_hash)
|
||||
if not prev_tx_num:
|
||||
continue
|
||||
prev_tx_num = prev_tx_num.tx_num
|
||||
hashX_val = self._db.prefix_db.hashX_utxo.get(tx_hash[:4], prev_tx_num, prev_index)
|
||||
if not hashX_val:
|
||||
continue
|
||||
hashX = hashX_val.hashX
|
||||
utxo_value = self._db.prefix_db.utxo.get(hashX, prev_tx_num, prev_index)
|
||||
utxo = (hashX, utxo_value.amount)
|
||||
# if not prev_raw:
|
||||
# print("derp", prev_hash[::-1].hex())
|
||||
# print(self._db.get_tx_num(prev_hash))
|
||||
# prev_tx, prev_tx_size = self.coin.DESERIALIZER(prev_raw.raw_tx).read_tx_and_vsize()
|
||||
# prev_txo = prev_tx.outputs[prev_index]
|
||||
# utxo = (self.coin.hashX_from_script(prev_txo.pk_script), prev_txo.value)
|
||||
in_pairs.append(utxo)
|
||||
|
||||
# # Spend the prevouts
|
||||
# unspent.difference_update(tx.prevouts)
|
||||
|
||||
# Save the in_pairs, compute the fee and accept the TX
|
||||
tx.in_pairs = tuple(in_pairs)
|
||||
|
@ -121,198 +149,26 @@ class MemPool:
|
|||
# because some in_parts would be missing
|
||||
tx.fee = max(0, (sum(v for _, v in tx.in_pairs) -
|
||||
sum(v for _, v in tx.out_pairs)))
|
||||
txs[hash] = tx
|
||||
self.txs[tx_hash] = tx
|
||||
# print(f"added {tx_hash[::-1].hex()} reader to mempool")
|
||||
|
||||
for hashX, value in itertools.chain(tx.in_pairs, tx.out_pairs):
|
||||
touched.add(hashX)
|
||||
hashXs[hashX].add(hash)
|
||||
self.touched_hashXs[hashX].add(tx_hash)
|
||||
touched_hashXs.add(hashX)
|
||||
# utxo_map = {prevout: utxo_map[prevout] for prevout in unspent}
|
||||
|
||||
return deferred, {prevout: utxo_map[prevout] for prevout in unspent}
|
||||
|
||||
async def _mempool_loop(self, synchronized_event):
|
||||
try:
|
||||
return await self._refresh_hashes(synchronized_event)
|
||||
except asyncio.CancelledError:
|
||||
raise
|
||||
except Exception as e:
|
||||
self.logger.exception("MEMPOOL DIED")
|
||||
raise e
|
||||
|
||||
async def _refresh_hashes(self, synchronized_event):
|
||||
"""Refresh our view of the daemon's mempool."""
|
||||
while True:
|
||||
start = time.perf_counter()
|
||||
height = self._daemon.cached_height()
|
||||
hex_hashes = await self._daemon.mempool_hashes()
|
||||
if height != await self._daemon.height():
|
||||
continue
|
||||
hashes = {hex_str_to_hash(hh) for hh in hex_hashes}
|
||||
async with self.lock:
|
||||
new_hashes = hashes.difference(self.notified_mempool_txs)
|
||||
touched = await self._process_mempool(hashes)
|
||||
self.notified_mempool_txs.update(new_hashes)
|
||||
new_touched = {
|
||||
touched_hashx for touched_hashx, txs in self.hashXs.items() if txs.intersection(new_hashes)
|
||||
}
|
||||
synchronized_event.set()
|
||||
synchronized_event.clear()
|
||||
await self.on_mempool(touched, new_touched, height)
|
||||
duration = time.perf_counter() - start
|
||||
self.mempool_process_time_metric.observe(duration)
|
||||
try:
|
||||
# we wait up to `refresh_secs` but go early if a broadcast happens (which triggers wakeup event)
|
||||
await asyncio.wait_for(self.wakeup.wait(), timeout=self.refresh_secs)
|
||||
except asyncio.TimeoutError:
|
||||
pass
|
||||
finally:
|
||||
self.wakeup.clear()
|
||||
|
||||
async def _process_mempool(self, all_hashes):
|
||||
# Re-sync with the new set of hashes
|
||||
txs = self.txs
|
||||
|
||||
hashXs = self.hashXs # hashX: [tx_hash, ...]
|
||||
touched = set()
|
||||
|
||||
# First handle txs that have disappeared
|
||||
for tx_hash in set(txs).difference(all_hashes):
|
||||
tx = txs.pop(tx_hash)
|
||||
tx_hashXs = {hashX for hashX, value in tx.in_pairs}
|
||||
tx_hashXs.update(hashX for hashX, value in tx.out_pairs)
|
||||
for hashX in tx_hashXs:
|
||||
hashXs[hashX].remove(tx_hash)
|
||||
if not hashXs[hashX]:
|
||||
del hashXs[hashX]
|
||||
touched.update(tx_hashXs)
|
||||
|
||||
# Process new transactions
|
||||
new_hashes = list(all_hashes.difference(txs))
|
||||
if new_hashes:
|
||||
fetches = []
|
||||
for hashes in chunks(new_hashes, 200):
|
||||
fetches.append(self._fetch_and_accept(hashes, all_hashes, touched))
|
||||
tx_map = {}
|
||||
utxo_map = {}
|
||||
for fetch in asyncio.as_completed(fetches):
|
||||
deferred, unspent = await fetch
|
||||
tx_map.update(deferred)
|
||||
utxo_map.update(unspent)
|
||||
|
||||
prior_count = 0
|
||||
# FIXME: this is not particularly efficient
|
||||
while tx_map and len(tx_map) != prior_count:
|
||||
prior_count = len(tx_map)
|
||||
tx_map, utxo_map = self._accept_transactions(tx_map, utxo_map, touched)
|
||||
|
||||
if tx_map:
|
||||
self.logger.info(f'{len(tx_map)} txs dropped')
|
||||
|
||||
return touched
|
||||
|
||||
async def _fetch_and_accept(self, hashes, all_hashes, touched):
|
||||
"""Fetch a list of mempool transactions."""
|
||||
raw_txs = await self._daemon.getrawtransactions((hash_to_hex_str(hash) for hash in hashes))
|
||||
|
||||
to_hashX = self.coin.hashX_from_script
|
||||
deserializer = self.coin.DESERIALIZER
|
||||
|
||||
tx_map = {}
|
||||
for hash, raw_tx in zip(hashes, raw_txs):
|
||||
# The daemon may have evicted the tx from its
|
||||
# mempool or it may have gotten in a block
|
||||
if not raw_tx:
|
||||
continue
|
||||
tx, tx_size = deserializer(raw_tx).read_tx_and_vsize()
|
||||
# Convert the inputs and outputs into (hashX, value) pairs
|
||||
# Drop generation-like inputs from MemPoolTx.prevouts
|
||||
txin_pairs = tuple((txin.prev_hash, txin.prev_idx)
|
||||
for txin in tx.inputs
|
||||
if not txin.is_generation())
|
||||
txout_pairs = tuple((to_hashX(txout.pk_script), txout.value)
|
||||
for txout in tx.outputs)
|
||||
tx_map[hash] = MemPoolTx(txin_pairs, None, txout_pairs,
|
||||
0, tx_size, raw_tx)
|
||||
|
||||
# Determine all prevouts not in the mempool, and fetch the
|
||||
# UTXO information from the database. Failed prevout lookups
|
||||
# return None - concurrent database updates happen - which is
|
||||
# relied upon by _accept_transactions. Ignore prevouts that are
|
||||
# generation-like.
|
||||
prevouts = tuple(prevout for tx in tx_map.values()
|
||||
for prevout in tx.prevouts
|
||||
if prevout[0] not in all_hashes)
|
||||
utxos = await self._db.lookup_utxos(prevouts)
|
||||
utxo_map = dict(zip(prevouts, utxos))
|
||||
|
||||
return self._accept_transactions(tx_map, utxo_map, touched)
|
||||
|
||||
#
|
||||
# External interface
|
||||
#
|
||||
|
||||
async def keep_synchronized(self, synchronized_event):
|
||||
"""Keep the mempool synchronized with the daemon."""
|
||||
await asyncio.wait([
|
||||
self._mempool_loop(synchronized_event),
|
||||
# self._refresh_histogram(synchronized_event),
|
||||
self._logging(synchronized_event)
|
||||
])
|
||||
|
||||
async def balance_delta(self, hashX):
|
||||
"""Return the unconfirmed amount in the mempool for hashX.
|
||||
|
||||
Can be positive or negative.
|
||||
"""
|
||||
value = 0
|
||||
if hashX in self.hashXs:
|
||||
for hash in self.hashXs[hashX]:
|
||||
tx = self.txs[hash]
|
||||
value -= sum(v for h168, v in tx.in_pairs if h168 == hashX)
|
||||
value += sum(v for h168, v in tx.out_pairs if h168 == hashX)
|
||||
return value
|
||||
|
||||
def compact_fee_histogram(self):
|
||||
"""Return a compact fee histogram of the current mempool."""
|
||||
return self.cached_compact_histogram
|
||||
|
||||
async def potential_spends(self, hashX):
|
||||
"""Return a set of (prev_hash, prev_idx) pairs from mempool
|
||||
transactions that touch hashX.
|
||||
|
||||
None, some or all of these may be spends of the hashX, but all
|
||||
actual spends of it (in the DB or mempool) will be included.
|
||||
"""
|
||||
result = set()
|
||||
for tx_hash in self.hashXs.get(hashX, ()):
|
||||
tx = self.txs[tx_hash]
|
||||
result.update(tx.prevouts)
|
||||
return result
|
||||
return touched_hashXs
|
||||
|
||||
def transaction_summaries(self, hashX):
|
||||
"""Return a list of MemPoolTxSummary objects for the hashX."""
|
||||
result = []
|
||||
for tx_hash in self.hashXs.get(hashX, ()):
|
||||
for tx_hash in self.touched_hashXs.get(hashX, ()):
|
||||
tx = self.txs[tx_hash]
|
||||
has_ui = any(hash in self.txs for hash, idx in tx.prevouts)
|
||||
result.append(MemPoolTxSummary(tx_hash, tx.fee, has_ui))
|
||||
return result
|
||||
|
||||
async def unordered_UTXOs(self, hashX):
|
||||
"""Return an unordered list of UTXO named tuples from mempool
|
||||
transactions that pay to hashX.
|
||||
|
||||
This does not consider if any other mempool transactions spend
|
||||
the outputs.
|
||||
"""
|
||||
utxos = []
|
||||
for tx_hash in self.hashXs.get(hashX, ()):
|
||||
tx = self.txs.get(tx_hash)
|
||||
for pos, (hX, value) in enumerate(tx.out_pairs):
|
||||
if hX == hashX:
|
||||
utxos.append(UTXO(-1, pos, tx_hash, 0, value))
|
||||
return utxos
|
||||
|
||||
def get_mempool_height(self, tx_hash):
|
||||
def get_mempool_height(self, tx_hash: bytes) -> int:
|
||||
# Height Progression
|
||||
# -2: not broadcast
|
||||
# -1: in mempool but has unconfirmed inputs
|
||||
|
@ -321,41 +177,57 @@ class MemPool:
|
|||
if tx_hash not in self.txs:
|
||||
return -2
|
||||
tx = self.txs[tx_hash]
|
||||
unspent_inputs = sum(1 if hash in self.txs else 0 for hash, idx in tx.prevouts)
|
||||
unspent_inputs = any(hash in self.raw_mempool for hash, idx in tx.prevouts)
|
||||
if unspent_inputs:
|
||||
return -1
|
||||
return 0
|
||||
|
||||
async def _maybe_notify(self, new_touched):
|
||||
tmp, tbp = self._touched_mp, self._touched_bp
|
||||
common = set(tmp).intersection(tbp)
|
||||
if common:
|
||||
height = max(common)
|
||||
elif tmp and max(tmp) == self._highest_block:
|
||||
height = self._highest_block
|
||||
else:
|
||||
# Either we are processing a block and waiting for it to
|
||||
# come in, or we have not yet had a mempool update for the
|
||||
# new block height
|
||||
return
|
||||
touched = tmp.pop(height)
|
||||
for old in [h for h in tmp if h <= height]:
|
||||
del tmp[old]
|
||||
for old in [h for h in tbp if h <= height]:
|
||||
touched.update(tbp.pop(old))
|
||||
# print("notify", height, len(touched), len(new_touched))
|
||||
await self.notify_sessions(height, touched, new_touched)
|
||||
|
||||
async def start(self, height, session_manager: 'LBRYSessionManager'):
|
||||
self._highest_block = height
|
||||
self.notify_sessions = session_manager._notify_sessions
|
||||
await self.notify_sessions(height, set(), set())
|
||||
await self._notify_sessions(height, set(), set())
|
||||
|
||||
async def on_mempool(self, touched, new_touched, height):
|
||||
self._touched_mp[height] = touched
|
||||
await self._maybe_notify(new_touched)
|
||||
await self._notify_sessions(height, touched, new_touched)
|
||||
|
||||
async def on_block(self, touched, height):
|
||||
self._touched_bp[height] = touched
|
||||
self._highest_block = height
|
||||
await self._maybe_notify(set())
|
||||
await self._notify_sessions(height, touched, set())
|
||||
|
||||
async def _notify_sessions(self, height, touched, new_touched):
|
||||
"""Notify sessions about height changes and touched addresses."""
|
||||
height_changed = height != self.session_manager.notified_height
|
||||
if height_changed:
|
||||
await self.session_manager._refresh_hsub_results(height)
|
||||
|
||||
if not self.session_manager.sessions:
|
||||
return
|
||||
|
||||
if height_changed:
|
||||
header_tasks = [
|
||||
session.send_notification('blockchain.headers.subscribe', (self.session_manager.hsub_results[session.subscribe_headers_raw], ))
|
||||
for session in self.session_manager.sessions.values() if session.subscribe_headers
|
||||
]
|
||||
if header_tasks:
|
||||
self.logger.warning(f'notify {len(header_tasks)} sessions of new header')
|
||||
asyncio.create_task(asyncio.wait(header_tasks))
|
||||
for hashX in touched.intersection(self.session_manager.mempool_statuses.keys()):
|
||||
self.session_manager.mempool_statuses.pop(hashX, None)
|
||||
# self.bp._chain_executor
|
||||
await asyncio.get_event_loop().run_in_executor(
|
||||
None, touched.intersection_update, self.session_manager.hashx_subscriptions_by_session.keys()
|
||||
)
|
||||
|
||||
if touched or new_touched or (height_changed and self.session_manager.mempool_statuses):
|
||||
notified_hashxs = 0
|
||||
session_hashxes_to_notify = defaultdict(list)
|
||||
to_notify = touched if height_changed else new_touched
|
||||
|
||||
for hashX in to_notify:
|
||||
if hashX not in self.session_manager.hashx_subscriptions_by_session:
|
||||
continue
|
||||
for session_id in self.session_manager.hashx_subscriptions_by_session[hashX]:
|
||||
session_hashxes_to_notify[session_id].append(hashX)
|
||||
notified_hashxs += 1
|
||||
for session_id, hashXes in session_hashxes_to_notify.items():
|
||||
asyncio.create_task(self.session_manager.sessions[session_id].send_history_notifications(*hashXes))
|
||||
if session_hashxes_to_notify:
|
||||
self.logger.warning(f'notified {len(session_hashxes_to_notify)} sessions/{notified_hashxs:,d} touched addresses')
|
||||
|
|
|
@ -1,94 +0,0 @@
|
|||
import signal
|
||||
import logging
|
||||
import asyncio
|
||||
from concurrent.futures.thread import ThreadPoolExecutor
|
||||
import typing
|
||||
|
||||
import lbry
|
||||
from lbry.wallet.server.mempool import MemPool
|
||||
from lbry.wallet.server.block_processor import BlockProcessor
|
||||
from lbry.wallet.server.leveldb import LevelDB
|
||||
from lbry.wallet.server.session import LBRYSessionManager
|
||||
from lbry.prometheus import PrometheusServer
|
||||
|
||||
|
||||
class Server:
|
||||
|
||||
def __init__(self, env):
|
||||
self.env = env
|
||||
self.log = logging.getLogger(__name__).getChild(self.__class__.__name__)
|
||||
self.shutdown_event = asyncio.Event()
|
||||
self.cancellable_tasks = []
|
||||
|
||||
self.daemon = daemon = env.coin.DAEMON(env.coin, env.daemon_url)
|
||||
self.db = db = LevelDB(env)
|
||||
self.bp = bp = BlockProcessor(env, db, daemon, self.shutdown_event)
|
||||
self.prometheus_server: typing.Optional[PrometheusServer] = None
|
||||
|
||||
self.session_mgr = LBRYSessionManager(
|
||||
env, db, bp.mempool, bp.history_cache, bp.resolve_cache, bp.resolve_outputs_cache, daemon,
|
||||
self.shutdown_event,
|
||||
on_available_callback=bp.status_server.set_available,
|
||||
on_unavailable_callback=bp.status_server.set_unavailable
|
||||
)
|
||||
self._indexer_task = None
|
||||
|
||||
async def start(self):
|
||||
env = self.env
|
||||
min_str, max_str = env.coin.SESSIONCLS.protocol_min_max_strings()
|
||||
self.log.info(f'software version: {lbry.__version__}')
|
||||
self.log.info(f'supported protocol versions: {min_str}-{max_str}')
|
||||
self.log.info(f'event loop policy: {env.loop_policy}')
|
||||
self.log.info(f'reorg limit is {env.reorg_limit:,d} blocks')
|
||||
|
||||
await self.daemon.height()
|
||||
|
||||
def _start_cancellable(run, *args):
|
||||
_flag = asyncio.Event()
|
||||
self.cancellable_tasks.append(asyncio.ensure_future(run(*args, _flag)))
|
||||
return _flag.wait()
|
||||
|
||||
await self.start_prometheus()
|
||||
if self.env.udp_port:
|
||||
await self.bp.status_server.start(
|
||||
0, bytes.fromhex(self.bp.coin.GENESIS_HASH)[::-1], self.env.country,
|
||||
self.env.host, self.env.udp_port, self.env.allow_lan_udp
|
||||
)
|
||||
await _start_cancellable(self.bp.fetch_and_process_blocks)
|
||||
|
||||
await self.db.populate_header_merkle_cache()
|
||||
await _start_cancellable(self.bp.mempool.keep_synchronized)
|
||||
await _start_cancellable(self.session_mgr.serve, self.bp.mempool)
|
||||
|
||||
async def stop(self):
|
||||
for task in reversed(self.cancellable_tasks):
|
||||
task.cancel()
|
||||
await asyncio.wait(self.cancellable_tasks)
|
||||
if self.prometheus_server:
|
||||
await self.prometheus_server.stop()
|
||||
self.prometheus_server = None
|
||||
self.shutdown_event.set()
|
||||
await self.daemon.close()
|
||||
|
||||
def run(self):
|
||||
loop = asyncio.get_event_loop()
|
||||
executor = ThreadPoolExecutor(self.env.max_query_workers, thread_name_prefix='hub-worker')
|
||||
loop.set_default_executor(executor)
|
||||
|
||||
def __exit():
|
||||
raise SystemExit()
|
||||
try:
|
||||
loop.add_signal_handler(signal.SIGINT, __exit)
|
||||
loop.add_signal_handler(signal.SIGTERM, __exit)
|
||||
loop.run_until_complete(self.start())
|
||||
loop.run_until_complete(self.shutdown_event.wait())
|
||||
except (SystemExit, KeyboardInterrupt):
|
||||
pass
|
||||
finally:
|
||||
loop.run_until_complete(self.stop())
|
||||
executor.shutdown(True)
|
||||
|
||||
async def start_prometheus(self):
|
||||
if not self.prometheus_server and self.env.prometheus_port:
|
||||
self.prometheus_server = PrometheusServer()
|
||||
await self.prometheus_server.start("0.0.0.0", self.env.prometheus_port)
|
|
@ -20,8 +20,7 @@ import lbry
|
|||
from lbry.error import ResolveCensoredError, TooManyClaimSearchParametersError
|
||||
from lbry.build_info import BUILD, COMMIT_HASH, DOCKER_TAG
|
||||
from lbry.schema.result import Outputs
|
||||
from lbry.wallet.server.block_processor import BlockProcessor
|
||||
from lbry.wallet.server.leveldb import LevelDB
|
||||
from lbry.wallet.server.db.db import HubDB
|
||||
from lbry.wallet.server.websocket import AdminWebSocket
|
||||
from lbry.wallet.rpc.framing import NewlineFramer
|
||||
|
||||
|
@ -34,9 +33,12 @@ from lbry.wallet.rpc import (
|
|||
from lbry.wallet.server import util
|
||||
from lbry.wallet.server.hash import sha256, hash_to_hex_str, hex_str_to_hash, HASHX_LEN, Base58Error
|
||||
from lbry.wallet.server.daemon import DaemonError
|
||||
from lbry.wallet.server.db.elasticsearch import SearchIndex
|
||||
|
||||
if typing.TYPE_CHECKING:
|
||||
from lbry.wallet.server.env import Env
|
||||
from lbry.wallet.server.daemon import Daemon
|
||||
from lbry.wallet.server.mempool import MemPool
|
||||
|
||||
BAD_REQUEST = 1
|
||||
DAEMON_ERROR = 2
|
||||
|
@ -170,7 +172,7 @@ class SessionManager:
|
|||
namespace=NAMESPACE, buckets=HISTOGRAM_BUCKETS
|
||||
)
|
||||
|
||||
def __init__(self, env: 'Env', db: LevelDB, mempool, history_cache, resolve_cache, resolve_outputs_cache,
|
||||
def __init__(self, env: 'Env', db: HubDB, mempool: 'MemPool', history_cache, resolve_cache, resolve_outputs_cache,
|
||||
daemon: 'Daemon', shutdown_event: asyncio.Event,
|
||||
on_available_callback: typing.Callable[[], None], on_unavailable_callback: typing.Callable[[], None]):
|
||||
env.max_send = max(350000, env.max_send)
|
||||
|
@ -183,7 +185,7 @@ class SessionManager:
|
|||
self.shutdown_event = shutdown_event
|
||||
self.logger = util.class_logger(__name__, self.__class__.__name__)
|
||||
self.servers: typing.Dict[str, asyncio.AbstractServer] = {}
|
||||
self.sessions: typing.Dict[int, 'SessionBase'] = {}
|
||||
self.sessions: typing.Dict[int, 'LBRYElectrumX'] = {}
|
||||
self.hashx_subscriptions_by_session: typing.DefaultDict[str, typing.Set[int]] = defaultdict(set)
|
||||
self.mempool_statuses = {}
|
||||
self.cur_group = SessionGroup(0)
|
||||
|
@ -198,8 +200,15 @@ class SessionManager:
|
|||
|
||||
self.session_event = Event()
|
||||
|
||||
# Search index
|
||||
self.search_index = SearchIndex(
|
||||
self.env.es_index_prefix, self.env.database_query_timeout,
|
||||
elastic_host=env.elastic_host, elastic_port=env.elastic_port
|
||||
)
|
||||
|
||||
async def _start_server(self, kind, *args, **kw_args):
|
||||
loop = asyncio.get_event_loop()
|
||||
|
||||
if kind == 'RPC':
|
||||
protocol_class = LocalRPC
|
||||
else:
|
||||
|
@ -578,7 +587,7 @@ class SessionManager:
|
|||
async def raw_header(self, height):
|
||||
"""Return the binary header at the given height."""
|
||||
try:
|
||||
return self.db.raw_header(height)
|
||||
return await self.db.raw_header(height)
|
||||
except IndexError:
|
||||
raise RPCError(BAD_REQUEST, f'height {height:,d} '
|
||||
'out of range') from None
|
||||
|
@ -686,12 +695,12 @@ class SessionBase(RPCSession):
|
|||
request_handlers: typing.Dict[str, typing.Callable] = {}
|
||||
version = '0.5.7'
|
||||
|
||||
def __init__(self, session_mgr, db, mempool, kind):
|
||||
def __init__(self, session_manager: 'LBRYSessionManager', db: 'LevelDB', mempool: 'MemPool', kind: str):
|
||||
connection = JSONRPCConnection(JSONRPCAutoDetect)
|
||||
self.env = session_mgr.env
|
||||
self.env = session_manager.env
|
||||
super().__init__(connection=connection)
|
||||
self.logger = util.class_logger(__name__, self.__class__.__name__)
|
||||
self.session_mgr = session_mgr
|
||||
self.session_manager = session_manager
|
||||
self.db = db
|
||||
self.mempool = mempool
|
||||
self.kind = kind # 'RPC', 'TCP' etc.
|
||||
|
@ -699,7 +708,7 @@ class SessionBase(RPCSession):
|
|||
self.anon_logs = self.env.anon_logs
|
||||
self.txs_sent = 0
|
||||
self.log_me = False
|
||||
self.daemon_request = self.session_mgr.daemon_request
|
||||
self.daemon_request = self.session_manager.daemon_request
|
||||
# Hijack the connection so we can log messages
|
||||
self._receive_message_orig = self.connection.receive_message
|
||||
self.connection.receive_message = self.receive_message
|
||||
|
@ -729,17 +738,17 @@ class SessionBase(RPCSession):
|
|||
self.session_id = next(self.session_counter)
|
||||
context = {'conn_id': f'{self.session_id}'}
|
||||
self.logger = util.ConnectionLogger(self.logger, context)
|
||||
self.group = self.session_mgr.add_session(self)
|
||||
self.session_mgr.session_count_metric.labels(version=self.client_version).inc()
|
||||
self.group = self.session_manager.add_session(self)
|
||||
self.session_manager.session_count_metric.labels(version=self.client_version).inc()
|
||||
peer_addr_str = self.peer_address_str()
|
||||
self.logger.info(f'{self.kind} {peer_addr_str}, '
|
||||
f'{self.session_mgr.session_count():,d} total')
|
||||
f'{self.session_manager.session_count():,d} total')
|
||||
|
||||
def connection_lost(self, exc):
|
||||
"""Handle client disconnection."""
|
||||
super().connection_lost(exc)
|
||||
self.session_mgr.remove_session(self)
|
||||
self.session_mgr.session_count_metric.labels(version=self.client_version).dec()
|
||||
self.session_manager.remove_session(self)
|
||||
self.session_manager.session_count_metric.labels(version=self.client_version).dec()
|
||||
msg = ''
|
||||
if not self._can_send.is_set():
|
||||
msg += ' whilst paused'
|
||||
|
@ -763,7 +772,7 @@ class SessionBase(RPCSession):
|
|||
"""Handle an incoming request. ElectrumX doesn't receive
|
||||
notifications from client sessions.
|
||||
"""
|
||||
self.session_mgr.request_count_metric.labels(method=request.method, version=self.client_version).inc()
|
||||
self.session_manager.request_count_metric.labels(method=request.method, version=self.client_version).inc()
|
||||
if isinstance(request, Request):
|
||||
handler = self.request_handlers.get(request.method)
|
||||
handler = partial(handler, self)
|
||||
|
@ -811,7 +820,7 @@ class LBRYElectrumX(SessionBase):
|
|||
PROTOCOL_MIN = VERSION.PROTOCOL_MIN
|
||||
PROTOCOL_MAX = VERSION.PROTOCOL_MAX
|
||||
max_errors = math.inf # don't disconnect people for errors! let them happen...
|
||||
session_mgr: LBRYSessionManager
|
||||
session_manager: LBRYSessionManager
|
||||
version = lbry.__version__
|
||||
cached_server_features = {}
|
||||
|
||||
|
@ -822,17 +831,17 @@ class LBRYElectrumX(SessionBase):
|
|||
'blockchain.block.get_header': cls.block_get_header,
|
||||
'blockchain.estimatefee': cls.estimatefee,
|
||||
'blockchain.relayfee': cls.relayfee,
|
||||
'blockchain.scripthash.get_balance': cls.scripthash_get_balance,
|
||||
# 'blockchain.scripthash.get_balance': cls.scripthash_get_balance,
|
||||
'blockchain.scripthash.get_history': cls.scripthash_get_history,
|
||||
'blockchain.scripthash.get_mempool': cls.scripthash_get_mempool,
|
||||
'blockchain.scripthash.listunspent': cls.scripthash_listunspent,
|
||||
# 'blockchain.scripthash.listunspent': cls.scripthash_listunspent,
|
||||
'blockchain.scripthash.subscribe': cls.scripthash_subscribe,
|
||||
'blockchain.transaction.broadcast': cls.transaction_broadcast,
|
||||
'blockchain.transaction.get': cls.transaction_get,
|
||||
'blockchain.transaction.get_batch': cls.transaction_get_batch,
|
||||
'blockchain.transaction.info': cls.transaction_info,
|
||||
'blockchain.transaction.get_merkle': cls.transaction_merkle,
|
||||
'server.add_peer': cls.add_peer,
|
||||
# 'server.add_peer': cls.add_peer,
|
||||
'server.banner': cls.banner,
|
||||
'server.payment_address': cls.payment_address,
|
||||
'server.donation_address': cls.donation_address,
|
||||
|
@ -849,10 +858,10 @@ class LBRYElectrumX(SessionBase):
|
|||
'blockchain.block.headers': cls.block_headers,
|
||||
'server.ping': cls.ping,
|
||||
'blockchain.headers.subscribe': cls.headers_subscribe_False,
|
||||
'blockchain.address.get_balance': cls.address_get_balance,
|
||||
# 'blockchain.address.get_balance': cls.address_get_balance,
|
||||
'blockchain.address.get_history': cls.address_get_history,
|
||||
'blockchain.address.get_mempool': cls.address_get_mempool,
|
||||
'blockchain.address.listunspent': cls.address_listunspent,
|
||||
# 'blockchain.address.listunspent': cls.address_listunspent,
|
||||
'blockchain.address.subscribe': cls.address_subscribe,
|
||||
'blockchain.address.unsubscribe': cls.address_unsubscribe,
|
||||
})
|
||||
|
@ -871,8 +880,8 @@ class LBRYElectrumX(SessionBase):
|
|||
self.sv_seen = False
|
||||
self.protocol_tuple = self.PROTOCOL_MIN
|
||||
self.protocol_string = None
|
||||
self.daemon = self.session_mgr.daemon
|
||||
self.db: LevelDB = self.session_mgr.db
|
||||
self.daemon = self.session_manager.daemon
|
||||
self.db: LevelDB = self.session_manager.db
|
||||
|
||||
@classmethod
|
||||
def protocol_min_max_strings(cls):
|
||||
|
@ -921,7 +930,7 @@ class LBRYElectrumX(SessionBase):
|
|||
else:
|
||||
method = 'blockchain.address.subscribe'
|
||||
start = time.perf_counter()
|
||||
db_history = await self.session_mgr.limited_history(hashX)
|
||||
db_history = await self.session_manager.limited_history(hashX)
|
||||
mempool = self.mempool.transaction_summaries(hashX)
|
||||
|
||||
status = ''.join(f'{hash_to_hex_str(tx_hash)}:'
|
||||
|
@ -935,24 +944,25 @@ class LBRYElectrumX(SessionBase):
|
|||
else:
|
||||
status = None
|
||||
if mempool:
|
||||
self.session_mgr.mempool_statuses[hashX] = status
|
||||
self.session_manager.mempool_statuses[hashX] = status
|
||||
else:
|
||||
self.session_mgr.mempool_statuses.pop(hashX, None)
|
||||
self.session_manager.mempool_statuses.pop(hashX, None)
|
||||
|
||||
self.session_mgr.address_history_metric.observe(time.perf_counter() - start)
|
||||
self.session_manager.address_history_metric.observe(time.perf_counter() - start)
|
||||
notifications.append((method, (alias, status)))
|
||||
print(f"notify {alias} {method}")
|
||||
|
||||
start = time.perf_counter()
|
||||
self.session_mgr.notifications_in_flight_metric.inc()
|
||||
self.session_manager.notifications_in_flight_metric.inc()
|
||||
for method, args in notifications:
|
||||
self.NOTIFICATION_COUNT.labels(method=method, version=self.client_version).inc()
|
||||
try:
|
||||
await self.send_notifications(
|
||||
Batch([Notification(method, (alias, status)) for (method, (alias, status)) in notifications])
|
||||
)
|
||||
self.session_mgr.notifications_sent_metric.observe(time.perf_counter() - start)
|
||||
self.session_manager.notifications_sent_metric.observe(time.perf_counter() - start)
|
||||
finally:
|
||||
self.session_mgr.notifications_in_flight_metric.dec()
|
||||
self.session_manager.notifications_in_flight_metric.dec()
|
||||
|
||||
# def get_metrics_or_placeholder_for_api(self, query_name):
|
||||
# """ Do not hold on to a reference to the metrics
|
||||
|
@ -960,7 +970,7 @@ class LBRYElectrumX(SessionBase):
|
|||
# you may be working with a stale metrics object.
|
||||
# """
|
||||
# if self.env.track_metrics:
|
||||
# # return self.session_mgr.metrics.for_api(query_name)
|
||||
# # return self.session_manager.metrics.for_api(query_name)
|
||||
# else:
|
||||
# return APICallMetrics(query_name)
|
||||
|
||||
|
@ -970,17 +980,17 @@ class LBRYElectrumX(SessionBase):
|
|||
# if isinstance(kwargs, dict):
|
||||
# kwargs['release_time'] = format_release_time(kwargs.get('release_time'))
|
||||
# try:
|
||||
# self.session_mgr.pending_query_metric.inc()
|
||||
# self.session_manager.pending_query_metric.inc()
|
||||
# return await self.db.search_index.session_query(query_name, kwargs)
|
||||
# except ConnectionTimeout:
|
||||
# self.session_mgr.interrupt_count_metric.inc()
|
||||
# self.session_manager.interrupt_count_metric.inc()
|
||||
# raise RPCError(JSONRPC.QUERY_TIMEOUT, 'query timed out')
|
||||
# finally:
|
||||
# self.session_mgr.pending_query_metric.dec()
|
||||
# self.session_mgr.executor_time_metric.observe(time.perf_counter() - start)
|
||||
# self.session_manager.pending_query_metric.dec()
|
||||
# self.session_manager.executor_time_metric.observe(time.perf_counter() - start)
|
||||
|
||||
async def mempool_compact_histogram(self):
|
||||
return self.mempool.compact_fee_histogram()
|
||||
return [] #self.mempool.compact_fee_histogram()
|
||||
|
||||
async def claimtrie_search(self, **kwargs):
|
||||
start = time.perf_counter()
|
||||
|
@ -992,16 +1002,16 @@ class LBRYElectrumX(SessionBase):
|
|||
except ValueError:
|
||||
pass
|
||||
try:
|
||||
self.session_mgr.pending_query_metric.inc()
|
||||
self.session_manager.pending_query_metric.inc()
|
||||
if 'channel' in kwargs:
|
||||
channel_url = kwargs.pop('channel')
|
||||
_, channel_claim, _, _ = await self.db.resolve(channel_url)
|
||||
if not channel_claim or isinstance(channel_claim, (ResolveCensoredError, LookupError, ValueError)):
|
||||
return Outputs.to_base64([], [], 0, None, None)
|
||||
kwargs['channel_id'] = channel_claim.claim_hash.hex()
|
||||
return await self.db.search_index.cached_search(kwargs)
|
||||
return await self.session_manager.search_index.cached_search(kwargs)
|
||||
except ConnectionTimeout:
|
||||
self.session_mgr.interrupt_count_metric.inc()
|
||||
self.session_manager.interrupt_count_metric.inc()
|
||||
raise RPCError(JSONRPC.QUERY_TIMEOUT, 'query timed out')
|
||||
except TooManyClaimSearchParametersError as err:
|
||||
await asyncio.sleep(2)
|
||||
|
@ -1009,25 +1019,25 @@ class LBRYElectrumX(SessionBase):
|
|||
self.peer_address()[0], err.key, err.limit)
|
||||
return RPCError(1, str(err))
|
||||
finally:
|
||||
self.session_mgr.pending_query_metric.dec()
|
||||
self.session_mgr.executor_time_metric.observe(time.perf_counter() - start)
|
||||
self.session_manager.pending_query_metric.dec()
|
||||
self.session_manager.executor_time_metric.observe(time.perf_counter() - start)
|
||||
|
||||
async def _cached_resolve_url(self, url):
|
||||
if url not in self.session_mgr.resolve_cache:
|
||||
self.session_mgr.resolve_cache[url] = await self.loop.run_in_executor(None, self.db._resolve, url)
|
||||
return self.session_mgr.resolve_cache[url]
|
||||
if url not in self.session_manager.resolve_cache:
|
||||
self.session_manager.resolve_cache[url] = await self.loop.run_in_executor(None, self.db._resolve, url)
|
||||
return self.session_manager.resolve_cache[url]
|
||||
|
||||
async def claimtrie_resolve(self, *urls) -> str:
|
||||
sorted_urls = tuple(sorted(urls))
|
||||
self.session_mgr.urls_to_resolve_count_metric.inc(len(sorted_urls))
|
||||
self.session_manager.urls_to_resolve_count_metric.inc(len(sorted_urls))
|
||||
try:
|
||||
if sorted_urls in self.session_mgr.resolve_outputs_cache:
|
||||
return self.session_mgr.resolve_outputs_cache[sorted_urls]
|
||||
if sorted_urls in self.session_manager.resolve_outputs_cache:
|
||||
return self.session_manager.resolve_outputs_cache[sorted_urls]
|
||||
rows, extra = [], []
|
||||
for url in urls:
|
||||
if url not in self.session_mgr.resolve_cache:
|
||||
self.session_mgr.resolve_cache[url] = await self._cached_resolve_url(url)
|
||||
stream, channel, repost, reposted_channel = self.session_mgr.resolve_cache[url]
|
||||
if url not in self.session_manager.resolve_cache:
|
||||
self.session_manager.resolve_cache[url] = await self._cached_resolve_url(url)
|
||||
stream, channel, repost, reposted_channel = self.session_manager.resolve_cache[url]
|
||||
if isinstance(channel, ResolveCensoredError):
|
||||
rows.append(channel)
|
||||
extra.append(channel.censor_row)
|
||||
|
@ -1052,12 +1062,12 @@ class LBRYElectrumX(SessionBase):
|
|||
if reposted_channel:
|
||||
extra.append(reposted_channel)
|
||||
await asyncio.sleep(0)
|
||||
self.session_mgr.resolve_outputs_cache[sorted_urls] = result = await self.loop.run_in_executor(
|
||||
self.session_manager.resolve_outputs_cache[sorted_urls] = result = await self.loop.run_in_executor(
|
||||
None, Outputs.to_base64, rows, extra, 0, None, None
|
||||
)
|
||||
return result
|
||||
finally:
|
||||
self.session_mgr.resolved_url_count_metric.inc(len(sorted_urls))
|
||||
self.session_manager.resolved_url_count_metric.inc(len(sorted_urls))
|
||||
|
||||
async def get_server_height(self):
|
||||
return self.db.db_height
|
||||
|
@ -1093,7 +1103,7 @@ class LBRYElectrumX(SessionBase):
|
|||
|
||||
async def subscribe_headers_result(self):
|
||||
"""The result of a header subscription or notification."""
|
||||
return self.session_mgr.hsub_results[self.subscribe_headers_raw]
|
||||
return self.session_manager.hsub_results[self.subscribe_headers_raw]
|
||||
|
||||
async def _headers_subscribe(self, raw):
|
||||
"""Subscribe to get headers of new blocks."""
|
||||
|
@ -1130,7 +1140,7 @@ class LBRYElectrumX(SessionBase):
|
|||
# Note history is ordered and mempool unordered in electrum-server
|
||||
# For mempool, height is -1 if it has unconfirmed inputs, otherwise 0
|
||||
|
||||
db_history = await self.session_mgr.limited_history(hashX)
|
||||
db_history = await self.session_manager.limited_history(hashX)
|
||||
mempool = self.mempool.transaction_summaries(hashX)
|
||||
|
||||
status = ''.join(f'{hash_to_hex_str(tx_hash)}:'
|
||||
|
@ -1145,32 +1155,32 @@ class LBRYElectrumX(SessionBase):
|
|||
status = None
|
||||
|
||||
if mempool:
|
||||
self.session_mgr.mempool_statuses[hashX] = status
|
||||
self.session_manager.mempool_statuses[hashX] = status
|
||||
else:
|
||||
self.session_mgr.mempool_statuses.pop(hashX, None)
|
||||
self.session_manager.mempool_statuses.pop(hashX, None)
|
||||
return status
|
||||
|
||||
async def hashX_listunspent(self, hashX):
|
||||
"""Return the list of UTXOs of a script hash, including mempool
|
||||
effects."""
|
||||
utxos = await self.db.all_utxos(hashX)
|
||||
utxos = sorted(utxos)
|
||||
utxos.extend(await self.mempool.unordered_UTXOs(hashX))
|
||||
spends = await self.mempool.potential_spends(hashX)
|
||||
|
||||
return [{'tx_hash': hash_to_hex_str(utxo.tx_hash),
|
||||
'tx_pos': utxo.tx_pos,
|
||||
'height': utxo.height, 'value': utxo.value}
|
||||
for utxo in utxos
|
||||
if (utxo.tx_hash, utxo.tx_pos) not in spends]
|
||||
# async def hashX_listunspent(self, hashX):
|
||||
# """Return the list of UTXOs of a script hash, including mempool
|
||||
# effects."""
|
||||
# utxos = await self.db.all_utxos(hashX)
|
||||
# utxos = sorted(utxos)
|
||||
# utxos.extend(await self.mempool.unordered_UTXOs(hashX))
|
||||
# spends = await self.mempool.potential_spends(hashX)
|
||||
#
|
||||
# return [{'tx_hash': hash_to_hex_str(utxo.tx_hash),
|
||||
# 'tx_pos': utxo.tx_pos,
|
||||
# 'height': utxo.height, 'value': utxo.value}
|
||||
# for utxo in utxos
|
||||
# if (utxo.tx_hash, utxo.tx_pos) not in spends]
|
||||
|
||||
async def hashX_subscribe(self, hashX, alias):
|
||||
self.hashX_subs[hashX] = alias
|
||||
self.session_mgr.hashx_subscriptions_by_session[hashX].add(id(self))
|
||||
self.session_manager.hashx_subscriptions_by_session[hashX].add(id(self))
|
||||
return await self.address_status(hashX)
|
||||
|
||||
async def hashX_unsubscribe(self, hashX, alias):
|
||||
sessions = self.session_mgr.hashx_subscriptions_by_session[hashX]
|
||||
sessions = self.session_manager.hashx_subscriptions_by_session[hashX]
|
||||
sessions.remove(id(self))
|
||||
if not sessions:
|
||||
self.hashX_subs.pop(hashX, None)
|
||||
|
@ -1182,10 +1192,10 @@ class LBRYElectrumX(SessionBase):
|
|||
pass
|
||||
raise RPCError(BAD_REQUEST, f'{address} is not a valid address')
|
||||
|
||||
async def address_get_balance(self, address):
|
||||
"""Return the confirmed and unconfirmed balance of an address."""
|
||||
hashX = self.address_to_hashX(address)
|
||||
return await self.get_balance(hashX)
|
||||
# async def address_get_balance(self, address):
|
||||
# """Return the confirmed and unconfirmed balance of an address."""
|
||||
# hashX = self.address_to_hashX(address)
|
||||
# return await self.get_balance(hashX)
|
||||
|
||||
async def address_get_history(self, address):
|
||||
"""Return the confirmed and unconfirmed history of an address."""
|
||||
|
@ -1197,10 +1207,10 @@ class LBRYElectrumX(SessionBase):
|
|||
hashX = self.address_to_hashX(address)
|
||||
return self.unconfirmed_history(hashX)
|
||||
|
||||
async def address_listunspent(self, address):
|
||||
"""Return the list of UTXOs of an address."""
|
||||
hashX = self.address_to_hashX(address)
|
||||
return await self.hashX_listunspent(hashX)
|
||||
# async def address_listunspent(self, address):
|
||||
# """Return the list of UTXOs of an address."""
|
||||
# hashX = self.address_to_hashX(address)
|
||||
# return await self.hashX_listunspent(hashX)
|
||||
|
||||
async def address_subscribe(self, *addresses):
|
||||
"""Subscribe to an address.
|
||||
|
@ -1221,16 +1231,16 @@ class LBRYElectrumX(SessionBase):
|
|||
hashX = self.address_to_hashX(address)
|
||||
return await self.hashX_unsubscribe(hashX, address)
|
||||
|
||||
async def get_balance(self, hashX):
|
||||
utxos = await self.db.all_utxos(hashX)
|
||||
confirmed = sum(utxo.value for utxo in utxos)
|
||||
unconfirmed = await self.mempool.balance_delta(hashX)
|
||||
return {'confirmed': confirmed, 'unconfirmed': unconfirmed}
|
||||
# async def get_balance(self, hashX):
|
||||
# utxos = await self.db.all_utxos(hashX)
|
||||
# confirmed = sum(utxo.value for utxo in utxos)
|
||||
# unconfirmed = await self.mempool.balance_delta(hashX)
|
||||
# return {'confirmed': confirmed, 'unconfirmed': unconfirmed}
|
||||
|
||||
async def scripthash_get_balance(self, scripthash):
|
||||
"""Return the confirmed and unconfirmed balance of a scripthash."""
|
||||
hashX = scripthash_to_hashX(scripthash)
|
||||
return await self.get_balance(hashX)
|
||||
# async def scripthash_get_balance(self, scripthash):
|
||||
# """Return the confirmed and unconfirmed balance of a scripthash."""
|
||||
# hashX = scripthash_to_hashX(scripthash)
|
||||
# return await self.get_balance(hashX)
|
||||
|
||||
def unconfirmed_history(self, hashX):
|
||||
# Note unconfirmed history is unordered in electrum-server
|
||||
|
@ -1242,7 +1252,7 @@ class LBRYElectrumX(SessionBase):
|
|||
|
||||
async def confirmed_and_unconfirmed_history(self, hashX):
|
||||
# Note history is ordered but unconfirmed is unordered in e-s
|
||||
history = await self.session_mgr.limited_history(hashX)
|
||||
history = await self.session_manager.limited_history(hashX)
|
||||
conf = [{'tx_hash': hash_to_hex_str(tx_hash), 'height': height}
|
||||
for tx_hash, height in history]
|
||||
return conf + self.unconfirmed_history(hashX)
|
||||
|
@ -1257,10 +1267,10 @@ class LBRYElectrumX(SessionBase):
|
|||
hashX = scripthash_to_hashX(scripthash)
|
||||
return self.unconfirmed_history(hashX)
|
||||
|
||||
async def scripthash_listunspent(self, scripthash):
|
||||
"""Return the list of UTXOs of a scripthash."""
|
||||
hashX = scripthash_to_hashX(scripthash)
|
||||
return await self.hashX_listunspent(hashX)
|
||||
# async def scripthash_listunspent(self, scripthash):
|
||||
# """Return the list of UTXOs of a scripthash."""
|
||||
# hashX = scripthash_to_hashX(scripthash)
|
||||
# return await self.hashX_listunspent(hashX)
|
||||
|
||||
async def scripthash_subscribe(self, scripthash):
|
||||
"""Subscribe to a script hash.
|
||||
|
@ -1295,7 +1305,7 @@ class LBRYElectrumX(SessionBase):
|
|||
|
||||
max_size = self.MAX_CHUNK_SIZE
|
||||
count = min(count, max_size)
|
||||
headers, count = self.db.read_headers(start_height, count)
|
||||
headers, count = await self.db.read_headers(start_height, count)
|
||||
|
||||
if b64:
|
||||
headers = self.db.encode_headers(start_height, count, headers)
|
||||
|
@ -1318,7 +1328,7 @@ class LBRYElectrumX(SessionBase):
|
|||
index = non_negative_integer(index)
|
||||
size = self.coin.CHUNK_SIZE
|
||||
start_height = index * size
|
||||
headers, _ = self.db.read_headers(start_height, size)
|
||||
headers, _ = await self.db.read_headers(start_height, size)
|
||||
return headers.hex()
|
||||
|
||||
async def block_get_header(self, height):
|
||||
|
@ -1326,7 +1336,7 @@ class LBRYElectrumX(SessionBase):
|
|||
|
||||
height: the header's height"""
|
||||
height = non_negative_integer(height)
|
||||
return await self.session_mgr.electrum_header(height)
|
||||
return await self.session_manager.electrum_header(height)
|
||||
|
||||
def is_tor(self):
|
||||
"""Try to detect if the connection is to a tor hidden service we are
|
||||
|
@ -1416,10 +1426,10 @@ class LBRYElectrumX(SessionBase):
|
|||
self.close_after_send = True
|
||||
raise RPCError(BAD_REQUEST, f'unsupported client: {client_name}')
|
||||
if self.client_version != client_name[:17]:
|
||||
self.session_mgr.session_count_metric.labels(version=self.client_version).dec()
|
||||
self.session_manager.session_count_metric.labels(version=self.client_version).dec()
|
||||
self.client_version = client_name[:17]
|
||||
self.session_mgr.session_count_metric.labels(version=self.client_version).inc()
|
||||
self.session_mgr.client_version_metric.labels(version=self.client_version).inc()
|
||||
self.session_manager.session_count_metric.labels(version=self.client_version).inc()
|
||||
self.session_manager.client_version_metric.labels(version=self.client_version).inc()
|
||||
|
||||
# Find the highest common protocol version. Disconnect if
|
||||
# that protocol version in unsupported.
|
||||
|
@ -1440,9 +1450,10 @@ class LBRYElectrumX(SessionBase):
|
|||
raw_tx: the raw transaction as a hexadecimal string"""
|
||||
# This returns errors as JSON RPC errors, as is natural
|
||||
try:
|
||||
hex_hash = await self.session_mgr.broadcast_transaction(raw_tx)
|
||||
hex_hash = await self.session_manager.broadcast_transaction(raw_tx)
|
||||
self.txs_sent += 1
|
||||
self.mempool.wakeup.set()
|
||||
# self.mempool.wakeup.set()
|
||||
# await asyncio.sleep(0.5)
|
||||
self.logger.info(f'sent tx: {hex_hash}')
|
||||
return hex_hash
|
||||
except DaemonError as e:
|
||||
|
@ -1456,7 +1467,7 @@ class LBRYElectrumX(SessionBase):
|
|||
return (await self.transaction_get_batch(tx_hash))[tx_hash]
|
||||
|
||||
async def transaction_get_batch(self, *tx_hashes):
|
||||
self.session_mgr.tx_request_count_metric.inc(len(tx_hashes))
|
||||
self.session_manager.tx_request_count_metric.inc(len(tx_hashes))
|
||||
if len(tx_hashes) > 100:
|
||||
raise RPCError(BAD_REQUEST, f'too many tx hashes in request: {len(tx_hashes)}')
|
||||
for tx_hash in tx_hashes:
|
||||
|
@ -1495,8 +1506,11 @@ class LBRYElectrumX(SessionBase):
|
|||
'block_height': block_height
|
||||
}
|
||||
await asyncio.sleep(0) # heavy call, give other tasks a chance
|
||||
print("return tx batch")
|
||||
for tx_hash, (_, info) in batch_result.items():
|
||||
print(tx_hash, info['block_height'])
|
||||
|
||||
self.session_mgr.tx_replied_count_metric.inc(len(tx_hashes))
|
||||
self.session_manager.tx_replied_count_metric.inc(len(tx_hashes))
|
||||
return batch_result
|
||||
|
||||
async def transaction_get(self, tx_hash, verbose=False):
|
||||
|
|
Loading…
Add table
Reference in a new issue