diff --git a/scribe/blockchain/mempool.py b/scribe/blockchain/mempool.py new file mode 100644 index 0000000..848f929 --- /dev/null +++ b/scribe/blockchain/mempool.py @@ -0,0 +1,110 @@ +import itertools +import attr +import typing +from collections import defaultdict +from scribe.blockchain.transaction.deserializer import Deserializer + +if typing.TYPE_CHECKING: + from scribe.db import HubDB + + +@attr.s(slots=True) +class MemPoolTx: + prevouts = attr.ib() + # A pair is a (hashX, value) tuple + in_pairs = attr.ib() + out_pairs = attr.ib() + fee = attr.ib() + size = attr.ib() + raw_tx = attr.ib() + + +@attr.s(slots=True) +class MemPoolTxSummary: + hash = attr.ib() + fee = attr.ib() + has_unconfirmed_inputs = attr.ib() + + +class MemPool: + def __init__(self, coin, db: 'HubDB'): + self.coin = coin + self._db = db + self.txs = {} + self.touched_hashXs: typing.DefaultDict[bytes, typing.Set[bytes]] = defaultdict(set) # None can be a key + + def mempool_history(self, hashX: bytes) -> str: + result = '' + for tx_hash in self.touched_hashXs.get(hashX, ()): + if tx_hash not in self.txs: + continue # the tx hash for the touched address is an input that isn't in mempool anymore + result += f'{tx_hash[::-1].hex()}:{-any(_hash in self.txs for _hash, idx in self.txs[tx_hash].in_pairs):d}:' + return result + + def remove(self, to_remove: typing.Dict[bytes, bytes]): + # Remove txs that aren't in mempool anymore + for tx_hash in set(self.txs).intersection(to_remove.keys()): + tx = self.txs.pop(tx_hash) + tx_hashXs = {hashX for hashX, value in tx.in_pairs}.union({hashX for hashX, value in tx.out_pairs}) + for hashX in tx_hashXs: + if hashX in self.touched_hashXs and tx_hash in self.touched_hashXs[hashX]: + self.touched_hashXs[hashX].remove(tx_hash) + if not self.touched_hashXs[hashX]: + self.touched_hashXs.pop(hashX) + + def update_mempool(self, to_add: typing.List[typing.Tuple[bytes, bytes]]) -> typing.Set[bytes]: + prefix_db = self._db.prefix_db + touched_hashXs = set() + + # Re-sync with the new set of hashes + tx_map = {} + for tx_hash, raw_tx in to_add: + if tx_hash in self.txs: + continue + tx, tx_size = Deserializer(raw_tx).read_tx_and_vsize() + # Convert the inputs and outputs into (hashX, value) pairs + # Drop generation-like inputs from MemPoolTx.prevouts + txin_pairs = tuple((txin.prev_hash, txin.prev_idx) + for txin in tx.inputs + if not txin.is_generation()) + txout_pairs = tuple((self.coin.hashX_from_txo(txout), txout.value) + for txout in tx.outputs if txout.pk_script) + tx_map[tx_hash] = MemPoolTx(None, txin_pairs, txout_pairs, 0, tx_size, raw_tx) + + for tx_hash, tx in tx_map.items(): + prevouts = [] + # Look up the prevouts + for prev_hash, prev_index in tx.in_pairs: + if prev_hash in self.txs: # accepted mempool + utxo = self.txs[prev_hash].out_pairs[prev_index] + elif prev_hash in tx_map: # this set of changes + utxo = tx_map[prev_hash].out_pairs[prev_index] + else: # get it from the db + prev_tx_num = prefix_db.tx_num.get(prev_hash) + if not prev_tx_num: + continue + prev_tx_num = prev_tx_num.tx_num + hashX_val = prefix_db.hashX_utxo.get(prev_hash[:4], prev_tx_num, prev_index) + if not hashX_val: + continue + hashX = hashX_val.hashX + utxo_value = prefix_db.utxo.get(hashX, prev_tx_num, prev_index) + utxo = (hashX, utxo_value.amount) + prevouts.append(utxo) + + # Save the prevouts, compute the fee and accept the TX + tx.prevouts = tuple(prevouts) + # Avoid negative fees if dealing with generation-like transactions + # because some in_parts would be missing + tx.fee = max(0, (sum(v for _, v in tx.prevouts) - + sum(v for _, v in tx.out_pairs))) + self.txs[tx_hash] = tx + for hashX, value in itertools.chain(tx.prevouts, tx.out_pairs): + self.touched_hashXs[hashX].add(tx_hash) + touched_hashXs.add(hashX) + + return touched_hashXs + + def clear(self): + self.txs.clear() + self.touched_hashXs.clear() diff --git a/scribe/blockchain/service.py b/scribe/blockchain/service.py index 629bccd..e6da88d 100644 --- a/scribe/blockchain/service.py +++ b/scribe/blockchain/service.py @@ -11,10 +11,11 @@ from scribe import PROMETHEUS_NAMESPACE from scribe.db.prefixes import ACTIVATED_SUPPORT_TXO_TYPE, ACTIVATED_CLAIM_TXO_TYPE from scribe.db.prefixes import PendingActivationKey, PendingActivationValue, ClaimToTXOValue from scribe.error.base import ChainError -from scribe.common import hash_to_hex_str, hash160, RPCError, HISTOGRAM_BUCKETS, StagedClaimtrieItem +from scribe.common import hash_to_hex_str, hash160, RPCError, HISTOGRAM_BUCKETS, StagedClaimtrieItem, sha256 from scribe.blockchain.daemon import LBCDaemon from scribe.blockchain.transaction import Tx, TxOutput, TxInput, Block from scribe.blockchain.prefetcher import Prefetcher +from scribe.blockchain.mempool import MemPool from scribe.schema.url import normalize_name from scribe.service import BlockchainService if typing.TYPE_CHECKING: @@ -45,6 +46,7 @@ class BlockchainProcessorService(BlockchainService): def __init__(self, env: 'Env'): super().__init__(env, secondary_name='', thread_workers=1, thread_prefix='block-processor') self.daemon = LBCDaemon(env.coin, env.daemon_url) + self.mempool = MemPool(env.coin, self.db) self.coin = env.coin self.wait_for_blocks_duration = 0.1 self._ready_to_stop = asyncio.Event() @@ -147,6 +149,10 @@ class BlockchainProcessorService(BlockchainService): } def update_mempool(unsafe_commit, mempool_prefix, to_put, to_delete): + self.mempool.remove(to_delete) + touched_hashXs = self.mempool.update_mempool(to_put) + for hashX in touched_hashXs: + self._get_update_hashX_mempool_status_ops(hashX) for tx_hash, raw_tx in to_put: mempool_prefix.stage_put((tx_hash,), (raw_tx,)) for tx_hash, raw_tx in to_delete.items(): @@ -157,17 +163,17 @@ class BlockchainProcessorService(BlockchainService): current_mempool = await self.run_in_thread(fetch_mempool, self.db.prefix_db.mempool_tx) _to_put = [] try: - mempool_hashes = await self.daemon.mempool_hashes() + mempool_txids = await self.daemon.mempool_hashes() except (TypeError, RPCError) as err: self.log.exception("failed to get mempool tx hashes, reorg underway? (%s)", err) return - for hh in mempool_hashes: - tx_hash = bytes.fromhex(hh)[::-1] + for mempool_txid in mempool_txids: + tx_hash = bytes.fromhex(mempool_txid)[::-1] if tx_hash in current_mempool: current_mempool.pop(tx_hash) else: try: - _to_put.append((tx_hash, bytes.fromhex(await self.daemon.getrawtransaction(hh)))) + _to_put.append((tx_hash, bytes.fromhex(await self.daemon.getrawtransaction(mempool_txid)))) except (TypeError, RPCError): self.log.warning("failed to get a mempool tx, reorg underway?") return @@ -1238,6 +1244,33 @@ class BlockchainProcessorService(BlockchainService): self.touched_claims_to_send_es.difference_update(self.removed_claim_hashes) self.removed_claims_to_send_es.update(self.removed_claim_hashes) + def _get_update_hashX_status_ops(self, hashX: bytes, new_history: List[Tuple[bytes, int]]): + existing = self.db.prefix_db.hashX_status.get(hashX) + if existing: + self.db.prefix_db.hashX_status.stage_delete((hashX,), existing) + tx_nums = self.db.read_history(hashX, limit=None) + history = '' + for tx_num in tx_nums: + history += f'{hash_to_hex_str(self.db.get_tx_hash(tx_num) )}:{bisect_right(self.db.tx_counts, tx_num):d}:' + for tx_hash, height in new_history: + history += f'{hash_to_hex_str(tx_hash)}:{height:d}:' + if history: + status = sha256(history.encode()) + self.db.prefix_db.hashX_status.stage_put((hashX,), (status,)) + + def _get_update_hashX_mempool_status_ops(self, hashX: bytes): + existing = self.db.prefix_db.hashX_mempool_status.get(hashX) + if existing: + self.db.prefix_db.hashX_mempool_status.stage_delete((hashX,), existing) + tx_nums = self.db.read_history(hashX, limit=None) + history = '' + for tx_num in tx_nums: + history += f'{hash_to_hex_str(self.db.get_tx_hash(tx_num) )}:{bisect_right(self.db.tx_counts, tx_num):d}:' + history += self.mempool.mempool_history(hashX) + if history: + status = sha256(history.encode()) + self.db.prefix_db.hashX_mempool_status.stage_put((hashX,), (status,)) + def advance_block(self, block: Block): height = self.height + 1 # print("advance ", height) @@ -1326,7 +1359,14 @@ class BlockchainProcessorService(BlockchainService): self.db.prefix_db.tx_count.stage_put(key_args=(height,), value_args=(tx_count,)) + for k, v in self.db.prefix_db.hashX_mempool_status.iterate( + start=(b'\x00' * 20, ), stop=(b'\xff' * 20, ), deserialize_key=False, deserialize_value=False): + self.db.prefix_db.stage_raw_delete(k, v) + for hashX, new_history in self.hashXs_by_tx.items(): + self._get_update_hashX_status_ops( + hashX, [(self.pending_transactions[tx_num], height) for tx_num in new_history] + ) if height > self.env.reorg_limit: # compactify existing history hist_txs = b'' # accumulate and delete all of the tx histories between height 1 and current - reorg_limit @@ -1418,6 +1458,7 @@ class BlockchainProcessorService(BlockchainService): self.pending_transactions.clear() self.pending_support_amount_change.clear() self.touched_hashXs.clear() + self.mempool.clear() def backup_block(self): assert len(self.db.prefix_db._op_stack) == 0 diff --git a/scribe/db/common.py b/scribe/db/common.py index 3cf530e..23f9164 100644 --- a/scribe/db/common.py +++ b/scribe/db/common.py @@ -46,6 +46,8 @@ class DB_PREFIXES(enum.Enum): trending_notifications = b'c' mempool_tx = b'd' touched_hashX = b'e' + hashX_status = b'f' + hashX_mempool_status = b'g' COLUMN_SETTINGS = {} # this is updated by the PrefixRow metaclass diff --git a/scribe/db/db.py b/scribe/db/db.py index f3827b1..ab22ae7 100644 --- a/scribe/db/db.py +++ b/scribe/db/db.py @@ -23,6 +23,7 @@ from scribe.db.common import ResolveResult, STREAM_TYPES, CLAIM_TYPES, ExpandedR from scribe.db.prefixes import PendingActivationValue, ClaimTakeoverValue, ClaimToTXOValue, PrefixDB from scribe.db.prefixes import ACTIVATED_CLAIM_TXO_TYPE, ACTIVATED_SUPPORT_TXO_TYPE, EffectiveAmountKey from scribe.db.prefixes import PendingActivationKey, TXOToClaimValue, DBStatePrefixRow, MempoolTXPrefixRow +from scribe.db.prefixes import HashXMempoolStatusPrefixRow TXO_STRUCT = struct.Struct(b'>LH') @@ -804,7 +805,8 @@ class HubDB: self.prefix_db = PrefixDB( db_path, cache_mb=self._cache_MB, reorg_limit=self._reorg_limit, max_open_files=self._db_max_open_files, - unsafe_prefixes={DBStatePrefixRow.prefix, MempoolTXPrefixRow.prefix}, secondary_path=secondary_path + unsafe_prefixes={DBStatePrefixRow.prefix, MempoolTXPrefixRow.prefix, HashXMempoolStatusPrefixRow.prefix}, + secondary_path=secondary_path ) if secondary_path != '': @@ -1017,7 +1019,7 @@ class HubDB: txs_extend = txs.extend for hist in self.prefix_db.hashX_history.iterate(prefix=(hashX,), include_key=False): txs_extend(hist) - if len(txs) >= limit: + if limit and len(txs) >= limit: break return txs diff --git a/scribe/db/prefixes.py b/scribe/db/prefixes.py index 21c8f89..098d89c 100644 --- a/scribe/db/prefixes.py +++ b/scribe/db/prefixes.py @@ -1623,6 +1623,76 @@ class TouchedHashXPrefixRow(PrefixRow): return cls.pack_key(height), cls.pack_value(touched) +class HashXStatusKey(NamedTuple): + hashX: bytes + + +class HashXStatusValue(NamedTuple): + status: bytes + + +class HashXStatusPrefixRow(PrefixRow): + prefix = DB_PREFIXES.hashX_status.value + key_struct = struct.Struct(b'>20s') + value_struct = struct.Struct(b'32s') + + key_part_lambdas = [ + lambda: b'', + struct.Struct(b'>20s').pack + ] + + @classmethod + def pack_key(cls, hashX: bytes): + return super().pack_key(hashX) + + @classmethod + def unpack_key(cls, key: bytes) -> HashXStatusKey: + return HashXStatusKey(*super().unpack_key(key)) + + @classmethod + def pack_value(cls, status: bytes) -> bytes: + return super().pack_value(status) + + @classmethod + def unpack_value(cls, data: bytes) -> HashXStatusValue: + return HashXStatusValue(*cls.value_struct.unpack(data)) + + @classmethod + def pack_item(cls, hashX: bytes, status: bytes): + return cls.pack_key(hashX), cls.pack_value(status) + + +class HashXMempoolStatusPrefixRow(PrefixRow): + prefix = DB_PREFIXES.hashX_mempool_status.value + key_struct = struct.Struct(b'>20s') + value_struct = struct.Struct(b'32s') + + key_part_lambdas = [ + lambda: b'', + struct.Struct(b'>20s').pack + ] + + @classmethod + def pack_key(cls, hashX: bytes): + return super().pack_key(hashX) + + @classmethod + def unpack_key(cls, key: bytes) -> HashXStatusKey: + return HashXStatusKey(*super().unpack_key(key)) + + @classmethod + def pack_value(cls, status: bytes) -> bytes: + return super().pack_value(status) + + @classmethod + def unpack_value(cls, data: bytes) -> HashXStatusValue: + return HashXStatusValue(*cls.value_struct.unpack(data)) + + @classmethod + def pack_item(cls, hashX: bytes, status: bytes): + return cls.pack_key(hashX), cls.pack_value(status) + + class PrefixDB(BasePrefixDB): def __init__(self, path: str, cache_mb: int = 128, reorg_limit: int = 200, max_open_files: int = 64, secondary_path: str = '', unsafe_prefixes: Optional[typing.Set[bytes]] = None): @@ -1662,6 +1732,8 @@ class PrefixDB(BasePrefixDB): self.mempool_tx = MempoolTXPrefixRow(db, self._op_stack) self.trending_notification = TrendingNotificationPrefixRow(db, self._op_stack) self.touched_hashX = TouchedHashXPrefixRow(db, self._op_stack) + self.hashX_status = HashXStatusPrefixRow(db, self._op_stack) + self.hashX_mempool_status = HashXMempoolStatusPrefixRow(db, self._op_stack) def auto_decode_item(key: bytes, value: bytes) -> Union[Tuple[NamedTuple, NamedTuple], Tuple[bytes, bytes]]: diff --git a/scribe/hub/session.py b/scribe/hub/session.py index c0e90e1..e212023 100644 --- a/scribe/hub/session.py +++ b/scribe/hub/session.py @@ -1101,18 +1101,12 @@ class LBRYElectrumX(asyncio.Protocol): return len(self.hashX_subs) async def get_hashX_status(self, hashX: bytes): - mempool_history = self.mempool.transaction_summaries(hashX) - history = ''.join(f'{hash_to_hex_str(tx_hash)}:' - f'{height:d}:' - for tx_hash, height in await self.session_manager.limited_history(hashX)) - history += ''.join(f'{hash_to_hex_str(tx.hash)}:' - f'{-tx.has_unconfirmed_inputs:d}:' - for tx in mempool_history) - if history: - status = sha256(history.encode()).hex() - else: - status = None - return history, status, len(mempool_history) > 0 + mempool_status = self.db.prefix_db.hashX_mempool_status.get(hashX) + if mempool_status: + return mempool_status.status.hex() + status = self.db.prefix_db.hashX_status.get(hashX) + if status: + return status.status.hex() async def send_history_notifications(self, *hashXes: typing.Iterable[bytes]): notifications = [] @@ -1123,14 +1117,12 @@ class LBRYElectrumX(asyncio.Protocol): else: method = 'blockchain.address.subscribe' start = time.perf_counter() - history, status, mempool_status = await self.get_hashX_status(hashX) - if mempool_status: - self.session_manager.mempool_statuses[hashX] = status - else: - self.session_manager.mempool_statuses.pop(hashX, None) - - self.session_manager.address_history_metric.observe(time.perf_counter() - start) + status = await self.get_hashX_status(hashX) + duration = time.perf_counter() - start + self.session_manager.address_history_metric.observe(duration) notifications.append((method, (alias, status))) + if duration > 30: + self.logger.warning("slow history notification (%s) for '%s'", duration, alias) start = time.perf_counter() self.session_manager.notifications_in_flight_metric.inc() @@ -1336,11 +1328,7 @@ class LBRYElectrumX(asyncio.Protocol): """ # Note history is ordered and mempool unordered in electrum-server # For mempool, height is -1 if it has unconfirmed inputs, otherwise 0 - _, status, has_mempool_history = await self.get_hashX_status(hashX) - if has_mempool_history: - self.session_manager.mempool_statuses[hashX] = status - else: - self.session_manager.mempool_statuses.pop(hashX, None) + status = await self.get_hashX_status(hashX) return status async def hashX_listunspent(self, hashX: bytes):