diff --git a/scribe/blockchain/mempool.py b/scribe/blockchain/mempool.py new file mode 100644 index 0000000..848f929 --- /dev/null +++ b/scribe/blockchain/mempool.py @@ -0,0 +1,110 @@ +import itertools +import attr +import typing +from collections import defaultdict +from scribe.blockchain.transaction.deserializer import Deserializer + +if typing.TYPE_CHECKING: + from scribe.db import HubDB + + +@attr.s(slots=True) +class MemPoolTx: + prevouts = attr.ib() + # A pair is a (hashX, value) tuple + in_pairs = attr.ib() + out_pairs = attr.ib() + fee = attr.ib() + size = attr.ib() + raw_tx = attr.ib() + + +@attr.s(slots=True) +class MemPoolTxSummary: + hash = attr.ib() + fee = attr.ib() + has_unconfirmed_inputs = attr.ib() + + +class MemPool: + def __init__(self, coin, db: 'HubDB'): + self.coin = coin + self._db = db + self.txs = {} + self.touched_hashXs: typing.DefaultDict[bytes, typing.Set[bytes]] = defaultdict(set) # None can be a key + + def mempool_history(self, hashX: bytes) -> str: + result = '' + for tx_hash in self.touched_hashXs.get(hashX, ()): + if tx_hash not in self.txs: + continue # the tx hash for the touched address is an input that isn't in mempool anymore + result += f'{tx_hash[::-1].hex()}:{-any(_hash in self.txs for _hash, idx in self.txs[tx_hash].in_pairs):d}:' + return result + + def remove(self, to_remove: typing.Dict[bytes, bytes]): + # Remove txs that aren't in mempool anymore + for tx_hash in set(self.txs).intersection(to_remove.keys()): + tx = self.txs.pop(tx_hash) + tx_hashXs = {hashX for hashX, value in tx.in_pairs}.union({hashX for hashX, value in tx.out_pairs}) + for hashX in tx_hashXs: + if hashX in self.touched_hashXs and tx_hash in self.touched_hashXs[hashX]: + self.touched_hashXs[hashX].remove(tx_hash) + if not self.touched_hashXs[hashX]: + self.touched_hashXs.pop(hashX) + + def update_mempool(self, to_add: typing.List[typing.Tuple[bytes, bytes]]) -> typing.Set[bytes]: + prefix_db = self._db.prefix_db + touched_hashXs = set() + + # Re-sync with the new set of hashes + tx_map = {} + for tx_hash, raw_tx in to_add: + if tx_hash in self.txs: + continue + tx, tx_size = Deserializer(raw_tx).read_tx_and_vsize() + # Convert the inputs and outputs into (hashX, value) pairs + # Drop generation-like inputs from MemPoolTx.prevouts + txin_pairs = tuple((txin.prev_hash, txin.prev_idx) + for txin in tx.inputs + if not txin.is_generation()) + txout_pairs = tuple((self.coin.hashX_from_txo(txout), txout.value) + for txout in tx.outputs if txout.pk_script) + tx_map[tx_hash] = MemPoolTx(None, txin_pairs, txout_pairs, 0, tx_size, raw_tx) + + for tx_hash, tx in tx_map.items(): + prevouts = [] + # Look up the prevouts + for prev_hash, prev_index in tx.in_pairs: + if prev_hash in self.txs: # accepted mempool + utxo = self.txs[prev_hash].out_pairs[prev_index] + elif prev_hash in tx_map: # this set of changes + utxo = tx_map[prev_hash].out_pairs[prev_index] + else: # get it from the db + prev_tx_num = prefix_db.tx_num.get(prev_hash) + if not prev_tx_num: + continue + prev_tx_num = prev_tx_num.tx_num + hashX_val = prefix_db.hashX_utxo.get(prev_hash[:4], prev_tx_num, prev_index) + if not hashX_val: + continue + hashX = hashX_val.hashX + utxo_value = prefix_db.utxo.get(hashX, prev_tx_num, prev_index) + utxo = (hashX, utxo_value.amount) + prevouts.append(utxo) + + # Save the prevouts, compute the fee and accept the TX + tx.prevouts = tuple(prevouts) + # Avoid negative fees if dealing with generation-like transactions + # because some in_parts would be missing + tx.fee = max(0, (sum(v for _, v in tx.prevouts) - + sum(v for _, v in tx.out_pairs))) + self.txs[tx_hash] = tx + for hashX, value in itertools.chain(tx.prevouts, tx.out_pairs): + self.touched_hashXs[hashX].add(tx_hash) + touched_hashXs.add(hashX) + + return touched_hashXs + + def clear(self): + self.txs.clear() + self.touched_hashXs.clear() diff --git a/scribe/blockchain/service.py b/scribe/blockchain/service.py index 629bccd..b539ff0 100644 --- a/scribe/blockchain/service.py +++ b/scribe/blockchain/service.py @@ -11,10 +11,11 @@ from scribe import PROMETHEUS_NAMESPACE from scribe.db.prefixes import ACTIVATED_SUPPORT_TXO_TYPE, ACTIVATED_CLAIM_TXO_TYPE from scribe.db.prefixes import PendingActivationKey, PendingActivationValue, ClaimToTXOValue from scribe.error.base import ChainError -from scribe.common import hash_to_hex_str, hash160, RPCError, HISTOGRAM_BUCKETS, StagedClaimtrieItem +from scribe.common import hash_to_hex_str, hash160, RPCError, HISTOGRAM_BUCKETS, StagedClaimtrieItem, sha256 from scribe.blockchain.daemon import LBCDaemon from scribe.blockchain.transaction import Tx, TxOutput, TxInput, Block from scribe.blockchain.prefetcher import Prefetcher +from scribe.blockchain.mempool import MemPool from scribe.schema.url import normalize_name from scribe.service import BlockchainService if typing.TYPE_CHECKING: @@ -45,6 +46,7 @@ class BlockchainProcessorService(BlockchainService): def __init__(self, env: 'Env'): super().__init__(env, secondary_name='', thread_workers=1, thread_prefix='block-processor') self.daemon = LBCDaemon(env.coin, env.daemon_url) + self.mempool = MemPool(env.coin, self.db) self.coin = env.coin self.wait_for_blocks_duration = 0.1 self._ready_to_stop = asyncio.Event() @@ -147,6 +149,10 @@ class BlockchainProcessorService(BlockchainService): } def update_mempool(unsafe_commit, mempool_prefix, to_put, to_delete): + self.mempool.remove(to_delete) + touched_hashXs = self.mempool.update_mempool(to_put) + for hashX in touched_hashXs: + self._get_update_hashX_mempool_status_ops(hashX) for tx_hash, raw_tx in to_put: mempool_prefix.stage_put((tx_hash,), (raw_tx,)) for tx_hash, raw_tx in to_delete.items(): @@ -157,17 +163,17 @@ class BlockchainProcessorService(BlockchainService): current_mempool = await self.run_in_thread(fetch_mempool, self.db.prefix_db.mempool_tx) _to_put = [] try: - mempool_hashes = await self.daemon.mempool_hashes() + mempool_txids = await self.daemon.mempool_hashes() except (TypeError, RPCError) as err: self.log.exception("failed to get mempool tx hashes, reorg underway? (%s)", err) return - for hh in mempool_hashes: - tx_hash = bytes.fromhex(hh)[::-1] + for mempool_txid in mempool_txids: + tx_hash = bytes.fromhex(mempool_txid)[::-1] if tx_hash in current_mempool: current_mempool.pop(tx_hash) else: try: - _to_put.append((tx_hash, bytes.fromhex(await self.daemon.getrawtransaction(hh)))) + _to_put.append((tx_hash, bytes.fromhex(await self.daemon.getrawtransaction(mempool_txid)))) except (TypeError, RPCError): self.log.warning("failed to get a mempool tx, reorg underway?") return @@ -1238,6 +1244,50 @@ class BlockchainProcessorService(BlockchainService): self.touched_claims_to_send_es.difference_update(self.removed_claim_hashes) self.removed_claims_to_send_es.update(self.removed_claim_hashes) + def _get_update_hashX_status_ops(self, hashX: bytes, new_history: List[Tuple[bytes, int]]): + existing = self.db.prefix_db.hashX_status.get(hashX) + if existing: + self.db.prefix_db.hashX_status.stage_delete((hashX,), existing) + tx_nums = self.db.read_history(hashX, limit=None) + history = '' + for tx_num in tx_nums: + history += f'{hash_to_hex_str(self.db.get_tx_hash(tx_num) )}:{bisect_right(self.db.tx_counts, tx_num):d}:' + for tx_hash, height in new_history: + history += f'{hash_to_hex_str(tx_hash)}:{height:d}:' + if history: + status = sha256(history.encode()) + self.db.prefix_db.hashX_status.stage_put((hashX,), (status,)) + + def _get_update_hashX_mempool_status_ops(self, hashX: bytes): + existing = self.db.prefix_db.hashX_mempool_status.get(hashX) + if existing: + self.db.prefix_db.hashX_mempool_status.stage_delete((hashX,), existing) + tx_nums = self.db.read_history(hashX, limit=None) + history = '' + for tx_num in tx_nums: + history += f'{hash_to_hex_str(self.db.get_tx_hash(tx_num) )}:{bisect_right(self.db.tx_counts, tx_num):d}:' + history += self.mempool.mempool_history(hashX) + if history: + status = sha256(history.encode()) + self.db.prefix_db.hashX_mempool_status.stage_put((hashX,), (status,)) + + def _get_compactify_hashX_history_ops(self, height: int, hashX: bytes): + if height > self.env.reorg_limit: # compactify existing history + hist_txs = b'' + # accumulate and delete all of the tx histories between height 1 and current - reorg_limit + for k, hist in self.db.prefix_db.hashX_history.iterate( + start=(hashX, 1), stop=(hashX, height - self.env.reorg_limit), + deserialize_key=False, deserialize_value=False): + hist_txs += hist + self.db.prefix_db.stage_raw_delete(k, hist) + if hist_txs: + # add the accumulated histories onto the existing compacted history at height 0 + key = self.db.prefix_db.hashX_history.pack_key(hashX, 0) + existing = self.db.prefix_db.get(key) + if existing is not None: + self.db.prefix_db.stage_raw_delete(key, existing) + self.db.prefix_db.stage_raw_put(key, (existing or b'') + hist_txs) + def advance_block(self, block: Block): height = self.height + 1 # print("advance ", height) @@ -1326,22 +1376,16 @@ class BlockchainProcessorService(BlockchainService): self.db.prefix_db.tx_count.stage_put(key_args=(height,), value_args=(tx_count,)) + for k, v in self.db.prefix_db.hashX_mempool_status.iterate( + start=(b'\x00' * 20, ), stop=(b'\xff' * 20, ), deserialize_key=False, deserialize_value=False): + self.db.prefix_db.stage_raw_delete(k, v) + for hashX, new_history in self.hashXs_by_tx.items(): - if height > self.env.reorg_limit: # compactify existing history - hist_txs = b'' - # accumulate and delete all of the tx histories between height 1 and current - reorg_limit - for k, hist in self.db.prefix_db.hashX_history.iterate( - start=(hashX, 1), stop=(hashX, height - self.env.reorg_limit), - deserialize_key=False, deserialize_value=False): - hist_txs += hist - self.db.prefix_db.stage_raw_delete(k, hist) - if hist_txs: - # add the accumulated histories onto the existing compacted history at height 0 - key = self.db.prefix_db.hashX_history.pack_key(hashX, 0) - existing = self.db.prefix_db.get(key) - if existing is not None: - self.db.prefix_db.stage_raw_delete(key, existing) - self.db.prefix_db.stage_raw_put(key, (existing or b'') + hist_txs) + # TODO: combine this with compaction so that we only read the history once + self._get_update_hashX_status_ops( + hashX, [(self.pending_transactions[tx_num], height) for tx_num in new_history] + ) + self._get_compactify_hashX_history_ops(height, hashX) if not new_history: continue self.db.prefix_db.hashX_history.stage_put(key_args=(hashX, height), value_args=(new_history,)) @@ -1418,6 +1462,7 @@ class BlockchainProcessorService(BlockchainService): self.pending_transactions.clear() self.pending_support_amount_change.clear() self.touched_hashXs.clear() + self.mempool.clear() def backup_block(self): assert len(self.db.prefix_db._op_stack) == 0 @@ -1592,6 +1637,16 @@ class BlockchainProcessorService(BlockchainService): await self.run_in_thread_with_lock(flush) def _iter_start_tasks(self): + while self.db.db_version < max(self.db.DB_VERSIONS): + if self.db.db_version == 7: + from scribe.db.migrators.migrate7to8 import migrate, FROM_VERSION, TO_VERSION + else: + raise RuntimeError("unknown db version") + self.log.warning(f"migrating database from version {FROM_VERSION} to version {TO_VERSION}") + migrate(self.db) + self.log.info("finished migration") + self.db.read_db_state() + self.height = self.db.db_height self.tip = self.db.db_tip self.tx_count = self.db.db_tx_count diff --git a/scribe/db/common.py b/scribe/db/common.py index 3cf530e..23f9164 100644 --- a/scribe/db/common.py +++ b/scribe/db/common.py @@ -46,6 +46,8 @@ class DB_PREFIXES(enum.Enum): trending_notifications = b'c' mempool_tx = b'd' touched_hashX = b'e' + hashX_status = b'f' + hashX_mempool_status = b'g' COLUMN_SETTINGS = {} # this is updated by the PrefixRow metaclass diff --git a/scribe/db/db.py b/scribe/db/db.py index f3827b1..98fccae 100644 --- a/scribe/db/db.py +++ b/scribe/db/db.py @@ -23,6 +23,7 @@ from scribe.db.common import ResolveResult, STREAM_TYPES, CLAIM_TYPES, ExpandedR from scribe.db.prefixes import PendingActivationValue, ClaimTakeoverValue, ClaimToTXOValue, PrefixDB from scribe.db.prefixes import ACTIVATED_CLAIM_TXO_TYPE, ACTIVATED_SUPPORT_TXO_TYPE, EffectiveAmountKey from scribe.db.prefixes import PendingActivationKey, TXOToClaimValue, DBStatePrefixRow, MempoolTXPrefixRow +from scribe.db.prefixes import HashXMempoolStatusPrefixRow TXO_STRUCT = struct.Struct(b'>LH') @@ -31,7 +32,7 @@ TXO_STRUCT_pack = TXO_STRUCT.pack class HubDB: - DB_VERSIONS = HIST_DB_VERSIONS = [7] + DB_VERSIONS = [7, 8] def __init__(self, coin, db_dir: str, cache_MB: int = 512, reorg_limit: int = 200, cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False, @@ -804,7 +805,8 @@ class HubDB: self.prefix_db = PrefixDB( db_path, cache_mb=self._cache_MB, reorg_limit=self._reorg_limit, max_open_files=self._db_max_open_files, - unsafe_prefixes={DBStatePrefixRow.prefix, MempoolTXPrefixRow.prefix}, secondary_path=secondary_path + unsafe_prefixes={DBStatePrefixRow.prefix, MempoolTXPrefixRow.prefix, HashXMempoolStatusPrefixRow.prefix}, + secondary_path=secondary_path ) if secondary_path != '': @@ -848,6 +850,14 @@ class HubDB: self.prefix_db.close() self.prefix_db = None + def get_hashX_status(self, hashX: bytes): + mempool_status = self.prefix_db.hashX_mempool_status.get(hashX, deserialize_value=False) + if mempool_status: + return mempool_status.hex() + status = self.prefix_db.hashX_status.get(hashX, deserialize_value=False) + if status: + return status.hex() + def get_tx_hash(self, tx_num: int) -> bytes: if self._cache_all_tx_hashes: return self.total_transactions[tx_num] @@ -1017,7 +1027,7 @@ class HubDB: txs_extend = txs.extend for hist in self.prefix_db.hashX_history.iterate(prefix=(hashX,), include_key=False): txs_extend(hist) - if len(txs) >= limit: + if limit and len(txs) >= limit: break return txs diff --git a/scribe/db/migrators/__init__.py b/scribe/db/migrators/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/scribe/db/migrators/migrate7to8.py b/scribe/db/migrators/migrate7to8.py new file mode 100644 index 0000000..58e1627 --- /dev/null +++ b/scribe/db/migrators/migrate7to8.py @@ -0,0 +1,88 @@ +import logging +import time +import array +import typing +from bisect import bisect_right +from scribe.common import sha256 +if typing.TYPE_CHECKING: + from scribe.db.db import HubDB + +FROM_VERSION = 7 +TO_VERSION = 8 + + +def get_all_hashXs(db): + def iterator(): + last_hashX = None + for k in db.prefix_db.hashX_history.iterate(deserialize_key=False, include_value=False): + hashX = k[1:12] + if last_hashX is None: + last_hashX = hashX + if last_hashX != hashX: + yield hashX + last_hashX = hashX + if last_hashX: + yield last_hashX + return [hashX for hashX in iterator()] + + +def hashX_history(db: 'HubDB', hashX: bytes): + history = b'' + to_delete = [] + for k, v in db.prefix_db.hashX_history.iterate(prefix=(hashX,), deserialize_value=False, deserialize_key=False): + to_delete.append((k, v)) + history += v + return history, to_delete + + +def hashX_status_from_history(db: 'HubDB', history: bytes) -> bytes: + tx_counts = db.tx_counts + hist_tx_nums = array.array('I') + hist_tx_nums.frombytes(history) + hist = '' + for tx_num in hist_tx_nums: + hist += f'{db.get_tx_hash(tx_num)[::-1].hex()}:{bisect_right(tx_counts, tx_num)}:' + return sha256(hist.encode()) + + +def migrate(db): + log = logging.getLogger(__name__) + start = time.perf_counter() + prefix_db = db.prefix_db + hashXs = get_all_hashXs(db) + log.info(f"loaded {len(hashXs)} hashXs in {round(time.perf_counter() - start, 2)}s, " + f"now building the status index...") + op_cnt = 0 + hashX_cnt = 0 + for hashX in hashXs: + hashX_cnt += 1 + key = prefix_db.hashX_status.pack_key(hashX) + history, to_delete = hashX_history(db, hashX) + status = hashX_status_from_history(db, history) + existing_status = prefix_db.hashX_status.get(hashX, deserialize_value=False) + if existing_status and existing_status != status: + prefix_db.stage_raw_delete(key, existing_status) + op_cnt += 1 + elif existing_status == status: + pass + else: + prefix_db.stage_raw_put(key, status) + op_cnt += 1 + if len(to_delete) > 1: + for k, v in to_delete: + prefix_db.stage_raw_delete(k, v) + op_cnt += 1 + if history: + prefix_db.stage_raw_put(prefix_db.hashX_history.pack_key(hashX, 0), history) + op_cnt += 1 + if op_cnt > 100000: + prefix_db.unsafe_commit() + log.info(f"wrote {hashX_cnt}/{len(hashXs)} hashXs statuses") + op_cnt = 0 + if op_cnt: + prefix_db.unsafe_commit() + log.info(f"wrote {hashX_cnt}/{len(hashXs)} hashXs statuses") + db.db_version = 8 + db.write_db_state() + db.prefix_db.unsafe_commit() + log.info("finished migration") diff --git a/scribe/db/prefixes.py b/scribe/db/prefixes.py index 21c8f89..098d89c 100644 --- a/scribe/db/prefixes.py +++ b/scribe/db/prefixes.py @@ -1623,6 +1623,76 @@ class TouchedHashXPrefixRow(PrefixRow): return cls.pack_key(height), cls.pack_value(touched) +class HashXStatusKey(NamedTuple): + hashX: bytes + + +class HashXStatusValue(NamedTuple): + status: bytes + + +class HashXStatusPrefixRow(PrefixRow): + prefix = DB_PREFIXES.hashX_status.value + key_struct = struct.Struct(b'>20s') + value_struct = struct.Struct(b'32s') + + key_part_lambdas = [ + lambda: b'', + struct.Struct(b'>20s').pack + ] + + @classmethod + def pack_key(cls, hashX: bytes): + return super().pack_key(hashX) + + @classmethod + def unpack_key(cls, key: bytes) -> HashXStatusKey: + return HashXStatusKey(*super().unpack_key(key)) + + @classmethod + def pack_value(cls, status: bytes) -> bytes: + return super().pack_value(status) + + @classmethod + def unpack_value(cls, data: bytes) -> HashXStatusValue: + return HashXStatusValue(*cls.value_struct.unpack(data)) + + @classmethod + def pack_item(cls, hashX: bytes, status: bytes): + return cls.pack_key(hashX), cls.pack_value(status) + + +class HashXMempoolStatusPrefixRow(PrefixRow): + prefix = DB_PREFIXES.hashX_mempool_status.value + key_struct = struct.Struct(b'>20s') + value_struct = struct.Struct(b'32s') + + key_part_lambdas = [ + lambda: b'', + struct.Struct(b'>20s').pack + ] + + @classmethod + def pack_key(cls, hashX: bytes): + return super().pack_key(hashX) + + @classmethod + def unpack_key(cls, key: bytes) -> HashXStatusKey: + return HashXStatusKey(*super().unpack_key(key)) + + @classmethod + def pack_value(cls, status: bytes) -> bytes: + return super().pack_value(status) + + @classmethod + def unpack_value(cls, data: bytes) -> HashXStatusValue: + return HashXStatusValue(*cls.value_struct.unpack(data)) + + @classmethod + def pack_item(cls, hashX: bytes, status: bytes): + return cls.pack_key(hashX), cls.pack_value(status) + + class PrefixDB(BasePrefixDB): def __init__(self, path: str, cache_mb: int = 128, reorg_limit: int = 200, max_open_files: int = 64, secondary_path: str = '', unsafe_prefixes: Optional[typing.Set[bytes]] = None): @@ -1662,6 +1732,8 @@ class PrefixDB(BasePrefixDB): self.mempool_tx = MempoolTXPrefixRow(db, self._op_stack) self.trending_notification = TrendingNotificationPrefixRow(db, self._op_stack) self.touched_hashX = TouchedHashXPrefixRow(db, self._op_stack) + self.hashX_status = HashXStatusPrefixRow(db, self._op_stack) + self.hashX_mempool_status = HashXMempoolStatusPrefixRow(db, self._op_stack) def auto_decode_item(key: bytes, value: bytes) -> Union[Tuple[NamedTuple, NamedTuple], Tuple[bytes, bytes]]: diff --git a/scribe/hub/mempool.py b/scribe/hub/mempool.py index c31f47e..e98b632 100644 --- a/scribe/hub/mempool.py +++ b/scribe/hub/mempool.py @@ -41,7 +41,7 @@ mempool_process_time_metric = Histogram( ) -class MemPool: +class HubMemPool: def __init__(self, coin, db: 'HubDB', refresh_secs=1.0): self.coin = coin self._db = db diff --git a/scribe/hub/service.py b/scribe/hub/service.py index cb7fcd7..b209995 100644 --- a/scribe/hub/service.py +++ b/scribe/hub/service.py @@ -3,7 +3,7 @@ import time import asyncio from scribe.blockchain.daemon import LBCDaemon from scribe.hub.session import SessionManager -from scribe.hub.mempool import MemPool +from scribe.hub.mempool import HubMemPool from scribe.hub.udp import StatusServer from scribe.service import BlockchainReaderService from scribe.elasticsearch import ElasticNotifierClientProtocol @@ -16,7 +16,7 @@ class HubServerService(BlockchainReaderService): self.mempool_notifications = set() self.status_server = StatusServer() self.daemon = LBCDaemon(env.coin, env.daemon_url) # only needed for broadcasting txs - self.mempool = MemPool(self.env.coin, self.db) + self.mempool = HubMemPool(self.env.coin, self.db) self.session_manager = SessionManager( env, self.db, self.mempool, self.daemon, self.shutdown_event, diff --git a/scribe/hub/session.py b/scribe/hub/session.py index 949ac5f..ed246a1 100644 --- a/scribe/hub/session.py +++ b/scribe/hub/session.py @@ -30,7 +30,7 @@ if typing.TYPE_CHECKING: from scribe.db import HubDB from scribe.env import Env from scribe.blockchain.daemon import LBCDaemon - from scribe.hub.mempool import MemPool + from scribe.hub.mempool import HubMemPool BAD_REQUEST = 1 DAEMON_ERROR = 2 @@ -38,13 +38,10 @@ DAEMON_ERROR = 2 log = logging.getLogger(__name__) - SignatureInfo = namedtuple('SignatureInfo', 'min_args max_args ' 'required_names other_names') - - def scripthash_to_hashX(scripthash: str) -> bytes: try: bin_hash = hex_str_to_hash(scripthash) @@ -136,7 +133,6 @@ class SessionManager: tx_replied_count_metric = Counter("replied_transaction", "Number of transactions responded", namespace=NAMESPACE) urls_to_resolve_count_metric = Counter("urls_to_resolve", "Number of urls to resolve", namespace=NAMESPACE) resolved_url_count_metric = Counter("resolved_url", "Number of resolved urls", namespace=NAMESPACE) - interrupt_count_metric = Counter("interrupt", "Number of interrupted queries", namespace=NAMESPACE) db_operational_error_metric = Counter( "operational_error", "Number of queries that raised operational errors", namespace=NAMESPACE @@ -168,7 +164,7 @@ class SessionManager: namespace=NAMESPACE, buckets=HISTOGRAM_BUCKETS ) - def __init__(self, env: 'Env', db: 'HubDB', mempool: 'MemPool', + def __init__(self, env: 'Env', db: 'HubDB', mempool: 'HubMemPool', daemon: 'LBCDaemon', shutdown_event: asyncio.Event, on_available_callback: typing.Callable[[], None], on_unavailable_callback: typing.Callable[[], None]): env.max_send = max(350000, env.max_send) @@ -1105,18 +1101,7 @@ class LBRYElectrumX(asyncio.Protocol): return len(self.hashX_subs) async def get_hashX_status(self, hashX: bytes): - mempool_history = self.mempool.transaction_summaries(hashX) - history = ''.join(f'{hash_to_hex_str(tx_hash)}:' - f'{height:d}:' - for tx_hash, height in await self.session_manager.limited_history(hashX)) - history += ''.join(f'{hash_to_hex_str(tx.hash)}:' - f'{-tx.has_unconfirmed_inputs:d}:' - for tx in mempool_history) - if history: - status = sha256(history.encode()).hex() - else: - status = None - return history, status, len(mempool_history) > 0 + return await self.loop.run_in_executor(self.db._executor, self.db.get_hashX_status, hashX) async def send_history_notifications(self, *hashXes: typing.Iterable[bytes]): notifications = [] @@ -1127,14 +1112,12 @@ class LBRYElectrumX(asyncio.Protocol): else: method = 'blockchain.address.subscribe' start = time.perf_counter() - history, status, mempool_status = await self.get_hashX_status(hashX) - if mempool_status: - self.session_manager.mempool_statuses[hashX] = status - else: - self.session_manager.mempool_statuses.pop(hashX, None) - - self.session_manager.address_history_metric.observe(time.perf_counter() - start) + status = await self.get_hashX_status(hashX) + duration = time.perf_counter() - start + self.session_manager.address_history_metric.observe(duration) notifications.append((method, (alias, status))) + if duration > 30: + self.logger.warning("slow history notification (%s) for '%s'", duration, alias) start = time.perf_counter() self.session_manager.notifications_in_flight_metric.inc() @@ -1340,11 +1323,7 @@ class LBRYElectrumX(asyncio.Protocol): """ # Note history is ordered and mempool unordered in electrum-server # For mempool, height is -1 if it has unconfirmed inputs, otherwise 0 - _, status, has_mempool_history = await self.get_hashX_status(hashX) - if has_mempool_history: - self.session_manager.mempool_statuses[hashX] = status - else: - self.session_manager.mempool_statuses.pop(hashX, None) + status = await self.get_hashX_status(hashX) return status async def hashX_listunspent(self, hashX: bytes):