# Copyright (c) 2016-2018, Neil Booth
#
# All rights reserved.
#
# See the file "LICENCE" for information about the copyright
# and warranty status of this software.

"""Mempool handling."""

import asyncio
import itertools
import time
import attr
import typing
from typing import Set, Optional, Callable, Awaitable
from collections import defaultdict
from prometheus_client import Histogram
from lbry.wallet.server.hash import hash_to_hex_str, hex_str_to_hash
from lbry.wallet.server.util import class_logger, chunks
from lbry.wallet.server.leveldb import UTXO

if typing.TYPE_CHECKING:
    from lbry.wallet.server.session import LBRYSessionManager


@attr.s(slots=True)
class MemPoolTx:
    prevouts = attr.ib()
    # A pair is a (hashX, value) tuple
    in_pairs = attr.ib()
    out_pairs = attr.ib()
    fee = attr.ib()
    size = attr.ib()
    raw_tx = attr.ib()
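
# Illustrative sketch (hypothetical values, not real data): a one-input,
# two-output payment would be held roughly as
#   MemPoolTx(
#       prevouts=((prev_tx_hash, 0),),                     # raw (tx_hash, idx) pairs
#       in_pairs=((hashX_a, 50_000),),                     # resolved by _accept_transactions
#       out_pairs=((hashX_b, 30_000), (hashX_c, 19_000)),
#       fee=1_000, size=226, raw_tx=b'...',
#   )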


@attr.s(slots=True)
class MemPoolTxSummary:
    hash = attr.ib()
    fee = attr.ib()
    has_unconfirmed_inputs = attr.ib()


NAMESPACE = "wallet_server"
HISTOGRAM_BUCKETS = (
    .005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, 15.0, 20.0, 30.0, 60.0, float('inf')
)
mempool_process_time_metric = Histogram(
    "processed_mempool", "Time to process mempool and notify touched addresses",
    namespace=NAMESPACE, buckets=HISTOGRAM_BUCKETS
)
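
# The buckets above span 5 ms to 60 s.  A minimal usage sketch, assuming the
# standard prometheus_client exporter is started elsewhere in the server:
#   with mempool_process_time_metric.time():   # context-manager form of observe()
#       ...process one mempool refresh...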


class MemPool:
    def __init__(self, coin, daemon, db, state_lock: asyncio.Lock, refresh_secs=1.0, log_status_secs=120.0):
        self.coin = coin
        self._daemon = daemon
        self._db = db
        self._touched_mp = {}
        self._touched_bp = {}
        self._highest_block = -1

        self.logger = class_logger(__name__, self.__class__.__name__)
        self.txs = {}
        self.hashXs = defaultdict(set)  # None can be a key
        self.cached_compact_histogram = []
        self.refresh_secs = refresh_secs
        self.log_status_secs = log_status_secs
        # Prevents mempool refreshes during fee histogram calculation
        self.lock = state_lock
        self.wakeup = asyncio.Event()
        self.mempool_process_time_metric = mempool_process_time_metric
        self.notified_mempool_txs = set()
        self.notify_sessions: Optional[Callable[[int, Set[bytes], Set[bytes]], Awaitable[None]]] = None

    async def _logging(self, synchronized_event):
        """Print regular logs of mempool stats."""
        self.logger.info('beginning processing of daemon mempool. '
                         'This can take some time...')
        start = time.perf_counter()
        await synchronized_event.wait()
        elapsed = time.perf_counter() - start
        self.logger.info(f'synced in {elapsed:.2f}s')
        while True:
            self.logger.info(f'{len(self.txs):,d} txs '
                             f'touching {len(self.hashXs):,d} addresses')
            await asyncio.sleep(self.log_status_secs)
            await synchronized_event.wait()

    def _accept_transactions(self, tx_map, utxo_map, touched):
        """Accept transactions in tx_map into the mempool if all their inputs
        can be found in the existing mempool or a utxo_map from the
        DB.

        Returns an (unprocessed tx_map, unspent utxo_map) pair.
        """
        hashXs = self.hashXs
        txs = self.txs

        deferred = {}
        unspent = set(utxo_map)
        # Try to find all prevouts so we can accept the TX
        for hash, tx in tx_map.items():
            in_pairs = []
            try:
                for prevout in tx.prevouts:
                    utxo = utxo_map.get(prevout)
                    if not utxo:
                        prev_hash, prev_index = prevout
                        # Raises KeyError if prev_hash is not in txs
                        utxo = txs[prev_hash].out_pairs[prev_index]
                    in_pairs.append(utxo)
            except KeyError:
                deferred[hash] = tx
                continue

            # Spend the prevouts
            unspent.difference_update(tx.prevouts)

            # Save the in_pairs, compute the fee and accept the TX
            tx.in_pairs = tuple(in_pairs)
            # Avoid negative fees if dealing with generation-like transactions
            # because some in_pairs would be missing
            tx.fee = max(0, (sum(v for _, v in tx.in_pairs) -
                             sum(v for _, v in tx.out_pairs)))
            txs[hash] = tx

            for hashX, value in itertools.chain(tx.in_pairs, tx.out_pairs):
                touched.add(hashX)
                hashXs[hashX].add(hash)

        return deferred, {prevout: utxo_map[prevout] for prevout in unspent}
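
    # Deferral sketch (hypothetical hashes): if tx B spends an output of tx A
    # and both arrive in the same batch, iterating {A: ..., B: ...} may visit B
    # first; B's prevout lookup raises KeyError, B is deferred, and A is
    # accepted.  The retry loop in _process_mempool re-runs this method until
    # the returned tx_map stops shrinking, so B is accepted on the next pass.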

    async def _mempool_loop(self, synchronized_event):
        try:
            return await self._refresh_hashes(synchronized_event)
        except asyncio.CancelledError:
            raise
        except Exception as e:
            self.logger.exception("MEMPOOL DIED")
            raise e

    async def _refresh_hashes(self, synchronized_event):
        """Refresh our view of the daemon's mempool."""
        while True:
            start = time.perf_counter()
            height = self._daemon.cached_height()
            hex_hashes = await self._daemon.mempool_hashes()
            if height != await self._daemon.height():
                continue
            hashes = {hex_str_to_hash(hh) for hh in hex_hashes}
            async with self.lock:
                new_hashes = hashes.difference(self.notified_mempool_txs)
                touched = await self._process_mempool(hashes)
                self.notified_mempool_txs.update(new_hashes)
                new_touched = {
                    touched_hashx for touched_hashx, txs in self.hashXs.items() if txs.intersection(new_hashes)
                }
            # Pulse the event: set() wakes everything currently awaiting a sync,
            # clear() re-arms it for the next refresh
            synchronized_event.set()
            synchronized_event.clear()
            await self.on_mempool(touched, new_touched, height)
            duration = time.perf_counter() - start
            self.mempool_process_time_metric.observe(duration)
            try:
                # we wait up to `refresh_secs` but go early if a broadcast happens (which triggers wakeup event)
                await asyncio.wait_for(self.wakeup.wait(), timeout=self.refresh_secs)
            except asyncio.TimeoutError:
                pass
            finally:
                self.wakeup.clear()
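
    # Broadcast paths elsewhere in the server can shorten the refresh latency
    # by setting self.wakeup right after handing a transaction to the daemon
    # (hypothetical call site, not from this module):
    #   await daemon.sendrawtransaction(raw_tx)
    #   mempool.wakeup.set()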

    async def _process_mempool(self, all_hashes):
        # Re-sync with the new set of hashes
        txs = self.txs
        hashXs = self.hashXs  # hashX: {tx_hash, ...}
        touched = set()

        # First handle txs that have disappeared
        for tx_hash in set(txs).difference(all_hashes):
            tx = txs.pop(tx_hash)
            tx_hashXs = {hashX for hashX, value in tx.in_pairs}
            tx_hashXs.update(hashX for hashX, value in tx.out_pairs)
            for hashX in tx_hashXs:
                hashXs[hashX].remove(tx_hash)
                if not hashXs[hashX]:
                    del hashXs[hashX]
            touched.update(tx_hashXs)

        # Process new transactions
        new_hashes = list(all_hashes.difference(txs))
        if new_hashes:
            # Fetch in chunks of 200 so one oversized batch cannot stall the loop
            fetches = []
            for hashes in chunks(new_hashes, 200):
                fetches.append(self._fetch_and_accept(hashes, all_hashes, touched))
            tx_map = {}
            utxo_map = {}
            for fetch in asyncio.as_completed(fetches):
                deferred, unspent = await fetch
                tx_map.update(deferred)
                utxo_map.update(unspent)

            prior_count = 0
            # FIXME: this is not particularly efficient
            while tx_map and len(tx_map) != prior_count:
                prior_count = len(tx_map)
                tx_map, utxo_map = self._accept_transactions(tx_map, utxo_map, touched)

            if tx_map:
                self.logger.info(f'{len(tx_map)} txs dropped')

        return touched

    async def _fetch_and_accept(self, hashes, all_hashes, touched):
        """Fetch a list of mempool transactions."""
        raw_txs = await self._daemon.getrawtransactions((hash_to_hex_str(hash) for hash in hashes))

        to_hashX = self.coin.hashX_from_script
        deserializer = self.coin.DESERIALIZER

        tx_map = {}
        for hash, raw_tx in zip(hashes, raw_txs):
            # The daemon may have evicted the tx from its
            # mempool or it may have gotten in a block
            if not raw_tx:
                continue
            tx, tx_size = deserializer(raw_tx).read_tx_and_vsize()
            # Convert the inputs and outputs into (hashX, value) pairs
            # Drop generation-like inputs from MemPoolTx.prevouts
            txin_pairs = tuple((txin.prev_hash, txin.prev_idx)
                               for txin in tx.inputs
                               if not txin.is_generation())
            txout_pairs = tuple((to_hashX(txout.pk_script), txout.value)
                                for txout in tx.outputs)
            tx_map[hash] = MemPoolTx(txin_pairs, None, txout_pairs,
                                     0, tx_size, raw_tx)

        # Determine all prevouts not in the mempool, and fetch the
        # UTXO information from the database.  Failed prevout lookups
        # return None (concurrent database updates can happen), which
        # _accept_transactions relies upon.  Ignore prevouts that are
        # generation-like.
        prevouts = tuple(prevout for tx in tx_map.values()
                         for prevout in tx.prevouts
                         if prevout[0] not in all_hashes)
        utxos = await self._db.lookup_utxos(prevouts)
        utxo_map = dict(zip(prevouts, utxos))

        return self._accept_transactions(tx_map, utxo_map, touched)

    #
    # External interface
    #

    async def keep_synchronized(self, synchronized_event):
        """Keep the mempool synchronized with the daemon."""
        await asyncio.wait([
            self._mempool_loop(synchronized_event),
            # self._refresh_histogram(synchronized_event),
            self._logging(synchronized_event)
        ])
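
    # Usage sketch (hypothetical wiring; names assumed, not from this module):
    #   synchronized = asyncio.Event()
    #   loop_task = asyncio.create_task(mempool.keep_synchronized(synchronized))
    #   await synchronized.wait()   # blocks until the first full sync completes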

    async def balance_delta(self, hashX):
        """Return the unconfirmed amount in the mempool for hashX.

        Can be positive or negative.
        """
        value = 0
        if hashX in self.hashXs:
            for hash in self.hashXs[hashX]:
                tx = self.txs[hash]
                value -= sum(v for h168, v in tx.in_pairs if h168 == hashX)
                value += sum(v for h168, v in tx.out_pairs if h168 == hashX)
        return value
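
    # Example (hypothetical amounts): an address that receives 2_000 in one
    # unconfirmed tx and spends a 5_000 input in another has a delta of -3_000.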

    def compact_fee_histogram(self):
        """Return a compact fee histogram of the current mempool."""
        return self.cached_compact_histogram
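
    # The task that would populate this cache is disabled in keep_synchronized
    # above.  A minimal sketch of such a computation, assuming the electrum-style
    # [(fee_rate, total_vsize), ...] shape (not this module's implementation):
    #   histogram = defaultdict(int)
    #   for tx in self.txs.values():
    #       histogram[tx.fee // tx.size] += tx.size
    #   self.cached_compact_histogram = sorted(histogram.items(), reverse=True)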

    async def potential_spends(self, hashX):
        """Return a set of (prev_hash, prev_idx) pairs from mempool
        transactions that touch hashX.

        None, some or all of these may be spends of the hashX, but all
        actual spends of it (in the DB or mempool) will be included.
        """
        result = set()
        for tx_hash in self.hashXs.get(hashX, ()):
            tx = self.txs[tx_hash]
            result.update(tx.prevouts)
        return result

    def transaction_summaries(self, hashX):
        """Return a list of MemPoolTxSummary objects for the hashX."""
        result = []
        for tx_hash in self.hashXs.get(hashX, ()):
            tx = self.txs[tx_hash]
            has_ui = any(hash in self.txs for hash, idx in tx.prevouts)
            result.append(MemPoolTxSummary(tx_hash, tx.fee, has_ui))
        return result

    async def unordered_UTXOs(self, hashX):
        """Return an unordered list of UTXO named tuples from mempool
        transactions that pay to hashX.

        This does not consider if any other mempool transactions spend
        the outputs.
        """
        utxos = []
        for tx_hash in self.hashXs.get(hashX, ()):
            tx = self.txs.get(tx_hash)
            for pos, (hX, value) in enumerate(tx.out_pairs):
                if hX == hashX:
                    # A tx_num of -1 and height of 0 mark the output as unconfirmed
                    utxos.append(UTXO(-1, pos, tx_hash, 0, value))
        return utxos
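
    # Example shape (hypothetical values): two unconfirmed outputs paying hashX
    # would come back as
    #   [UTXO(-1, 0, tx_hash_1, 0, 5000), UTXO(-1, 1, tx_hash_2, 0, 1200)]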

    def get_mempool_height(self, tx_hash):
        # Height Progression
        #   -2: not broadcast
        #   -1: in mempool but has unconfirmed inputs
        #    0: in mempool and all inputs confirmed
        # +num: confirmed in a specific block (height)
        if tx_hash not in self.txs:
            return -2
        tx = self.txs[tx_hash]
        unconfirmed_inputs = sum(1 for hash, idx in tx.prevouts if hash in self.txs)
        if unconfirmed_inputs:
            return -1
        return 0
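
    # Example: a tx whose inputs all come from confirmed transactions returns 0;
    # if any funding tx is itself still in self.txs the result is -1, the same
    # predicate as MemPoolTxSummary.has_unconfirmed_inputs above.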

    async def _maybe_notify(self, new_touched):
        tmp, tbp = self._touched_mp, self._touched_bp
        common = set(tmp).intersection(tbp)
        if common:
            height = max(common)
        elif tmp and max(tmp) == self._highest_block:
            height = self._highest_block
        else:
            # Either we are processing a block and waiting for it to
            # come in, or we have not yet had a mempool update for the
            # new block height
            return
        touched = tmp.pop(height)
        for old in [h for h in tmp if h <= height]:
            del tmp[old]
        for old in [h for h in tbp if h <= height]:
            touched.update(tbp.pop(old))
        # print("notify", height, len(touched), len(new_touched))
        await self.notify_sessions(height, touched, new_touched)
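
    # Reconciliation sketch (hypothetical heights): after on_block(touched, 100)
    # and a mempool refresh at height 100, both _touched_bp and _touched_mp hold
    # an entry for 100, so sessions are notified once at height 100 and every
    # entry at or below that height is drained from both maps.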

    async def start(self, height, session_manager: 'LBRYSessionManager'):
        self._highest_block = height
        self.notify_sessions = session_manager._notify_sessions
        await self.notify_sessions(height, set(), set())

    async def on_mempool(self, touched, new_touched, height):
        self._touched_mp[height] = touched
        await self._maybe_notify(new_touched)

    async def on_block(self, touched, height):
        self._touched_bp[height] = touched
        self._highest_block = height
        await self._maybe_notify(set())