precomputed address status upon new blocks and mempool

-adds HashXStatusPrefixRow and HashXMempoolStatusPrefixRow column families
Jack Robison 2022-04-05 14:10:44 -04:00
parent a57b69391b
commit 4cfc1b4765
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
6 changed files with 246 additions and 31 deletions


@@ -0,0 +1,110 @@
+import itertools
+import attr
+import typing
+from collections import defaultdict
+from scribe.blockchain.transaction.deserializer import Deserializer
+
+if typing.TYPE_CHECKING:
+    from scribe.db import HubDB
+
+
+@attr.s(slots=True)
+class MemPoolTx:
+    prevouts = attr.ib()
+    # A pair is a (hashX, value) tuple
+    in_pairs = attr.ib()
+    out_pairs = attr.ib()
+    fee = attr.ib()
+    size = attr.ib()
+    raw_tx = attr.ib()
+
+
+@attr.s(slots=True)
+class MemPoolTxSummary:
+    hash = attr.ib()
+    fee = attr.ib()
+    has_unconfirmed_inputs = attr.ib()
+
+
+class MemPool:
+    def __init__(self, coin, db: 'HubDB'):
+        self.coin = coin
+        self._db = db
+        self.txs = {}
+        self.touched_hashXs: typing.DefaultDict[bytes, typing.Set[bytes]] = defaultdict(set)  # None can be a key
+
+    def mempool_history(self, hashX: bytes) -> str:
+        result = ''
+        for tx_hash in self.touched_hashXs.get(hashX, ()):
+            if tx_hash not in self.txs:
+                continue  # the tx hash for the touched address is an input that isn't in mempool anymore
+            result += f'{tx_hash[::-1].hex()}:{-any(_hash in self.txs for _hash, idx in self.txs[tx_hash].in_pairs):d}:'
+        return result
+
+    def remove(self, to_remove: typing.Dict[bytes, bytes]):
+        # Remove txs that aren't in mempool anymore
+        for tx_hash in set(self.txs).intersection(to_remove.keys()):
+            tx = self.txs.pop(tx_hash)
+            # prevouts hold the resolved (hashX, value) pairs; in_pairs hold raw (prev_hash, prev_idx)
+            tx_hashXs = {hashX for hashX, value in tx.prevouts}.union({hashX for hashX, value in tx.out_pairs})
+            for hashX in tx_hashXs:
+                if hashX in self.touched_hashXs and tx_hash in self.touched_hashXs[hashX]:
+                    self.touched_hashXs[hashX].remove(tx_hash)
+                    if not self.touched_hashXs[hashX]:
+                        self.touched_hashXs.pop(hashX)
+
+    def update_mempool(self, to_add: typing.List[typing.Tuple[bytes, bytes]]) -> typing.Set[bytes]:
+        prefix_db = self._db.prefix_db
+        touched_hashXs = set()
+        # Re-sync with the new set of hashes
+        tx_map = {}
+        for tx_hash, raw_tx in to_add:
+            if tx_hash in self.txs:
+                continue
+            tx, tx_size = Deserializer(raw_tx).read_tx_and_vsize()
+            # Convert the inputs and outputs into (hashX, value) pairs
+            # Drop generation-like inputs from MemPoolTx.prevouts
+            txin_pairs = tuple((txin.prev_hash, txin.prev_idx)
+                               for txin in tx.inputs
+                               if not txin.is_generation())
+            txout_pairs = tuple((self.coin.hashX_from_txo(txout), txout.value)
+                                for txout in tx.outputs if txout.pk_script)
+            tx_map[tx_hash] = MemPoolTx(None, txin_pairs, txout_pairs, 0, tx_size, raw_tx)
+
+        for tx_hash, tx in tx_map.items():
+            prevouts = []
+            # Look up the prevouts
+            for prev_hash, prev_index in tx.in_pairs:
+                if prev_hash in self.txs:  # accepted mempool
+                    utxo = self.txs[prev_hash].out_pairs[prev_index]
+                elif prev_hash in tx_map:  # this set of changes
+                    utxo = tx_map[prev_hash].out_pairs[prev_index]
+                else:  # get it from the db
+                    prev_tx_num = prefix_db.tx_num.get(prev_hash)
+                    if not prev_tx_num:
+                        continue
+                    prev_tx_num = prev_tx_num.tx_num
+                    hashX_val = prefix_db.hashX_utxo.get(prev_hash[:4], prev_tx_num, prev_index)
+                    if not hashX_val:
+                        continue
+                    hashX = hashX_val.hashX
+                    utxo_value = prefix_db.utxo.get(hashX, prev_tx_num, prev_index)
+                    utxo = (hashX, utxo_value.amount)
+                prevouts.append(utxo)
+            # Save the prevouts, compute the fee and accept the TX
+            tx.prevouts = tuple(prevouts)
+            # Avoid negative fees if dealing with generation-like transactions
+            # because some in_pairs would be missing
+            tx.fee = max(0, (sum(v for _, v in tx.prevouts) -
+                             sum(v for _, v in tx.out_pairs)))
+            self.txs[tx_hash] = tx
+            for hashX, value in itertools.chain(tx.prevouts, tx.out_pairs):
+                self.touched_hashXs[hashX].add(tx_hash)
+                touched_hashXs.add(hashX)
+        return touched_hashXs
+
+    def clear(self):
+        self.txs.clear()
+        self.touched_hashXs.clear()
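
For orientation, a minimal sketch (not part of the commit) of the status-history string MemPool.mempool_history produces; the transaction below is made up, and only the fields mempool_history reads matter:

    from scribe.blockchain.mempool import MemPool, MemPoolTx

    mempool = MemPool(coin=None, db=None)  # coin/db are only needed by update_mempool
    tx_hash = bytes(range(32))
    hashX = b'\x11' * 20
    # one input that spends a confirmed (non-mempool) txo, one output paying hashX
    tx = MemPoolTx(((hashX, 1000),), ((b'\xaa' * 32, 0),), ((hashX, 900),), 100, 150, b'')
    mempool.txs[tx_hash] = tx
    mempool.touched_hashXs[hashX].add(tx_hash)
    # the txid hex is the reversed hash; the flag is -1 with unconfirmed inputs, else 0
    assert mempool.mempool_history(hashX) == f'{tx_hash[::-1].hex()}:0:'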


@@ -11,10 +11,11 @@ from scribe import PROMETHEUS_NAMESPACE
 from scribe.db.prefixes import ACTIVATED_SUPPORT_TXO_TYPE, ACTIVATED_CLAIM_TXO_TYPE
 from scribe.db.prefixes import PendingActivationKey, PendingActivationValue, ClaimToTXOValue
 from scribe.error.base import ChainError
-from scribe.common import hash_to_hex_str, hash160, RPCError, HISTOGRAM_BUCKETS, StagedClaimtrieItem
+from scribe.common import hash_to_hex_str, hash160, RPCError, HISTOGRAM_BUCKETS, StagedClaimtrieItem, sha256
 from scribe.blockchain.daemon import LBCDaemon
 from scribe.blockchain.transaction import Tx, TxOutput, TxInput, Block
 from scribe.blockchain.prefetcher import Prefetcher
+from scribe.blockchain.mempool import MemPool
 from scribe.schema.url import normalize_name
 from scribe.service import BlockchainService
 if typing.TYPE_CHECKING:
@@ -45,6 +46,7 @@ class BlockchainProcessorService(BlockchainService):
     def __init__(self, env: 'Env'):
         super().__init__(env, secondary_name='', thread_workers=1, thread_prefix='block-processor')
         self.daemon = LBCDaemon(env.coin, env.daemon_url)
+        self.mempool = MemPool(env.coin, self.db)
         self.coin = env.coin
         self.wait_for_blocks_duration = 0.1
         self._ready_to_stop = asyncio.Event()
@@ -147,6 +149,10 @@ class BlockchainProcessorService(BlockchainService):
            }

        def update_mempool(unsafe_commit, mempool_prefix, to_put, to_delete):
+           self.mempool.remove(to_delete)
+           touched_hashXs = self.mempool.update_mempool(to_put)
+           for hashX in touched_hashXs:
+               self._get_update_hashX_mempool_status_ops(hashX)
            for tx_hash, raw_tx in to_put:
                mempool_prefix.stage_put((tx_hash,), (raw_tx,))
            for tx_hash, raw_tx in to_delete.items():
@@ -157,17 +163,17 @@ class BlockchainProcessorService(BlockchainService):
        current_mempool = await self.run_in_thread(fetch_mempool, self.db.prefix_db.mempool_tx)
        _to_put = []
        try:
-           mempool_hashes = await self.daemon.mempool_hashes()
+           mempool_txids = await self.daemon.mempool_hashes()
        except (TypeError, RPCError) as err:
            self.log.exception("failed to get mempool tx hashes, reorg underway? (%s)", err)
            return
-       for hh in mempool_hashes:
-           tx_hash = bytes.fromhex(hh)[::-1]
+       for mempool_txid in mempool_txids:
+           tx_hash = bytes.fromhex(mempool_txid)[::-1]
            if tx_hash in current_mempool:
                current_mempool.pop(tx_hash)
            else:
                try:
-                   _to_put.append((tx_hash, bytes.fromhex(await self.daemon.getrawtransaction(hh))))
+                   _to_put.append((tx_hash, bytes.fromhex(await self.daemon.getrawtransaction(mempool_txid))))
                except (TypeError, RPCError):
                    self.log.warning("failed to get a mempool tx, reorg underway?")
                    return
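
The loop above diffs the daemon's mempool against the copy mirrored in the DB: txids the daemon reports but the DB lacks are fetched into _to_put, and whatever remains in current_mempool afterwards is stale and gets deleted. A standalone sketch of that diff, with made-up txids:

    daemon_txids = {'ab' * 32, 'cd' * 32}                       # hex txids from the daemon
    current_mempool = {bytes.fromhex('cd' * 32)[::-1]: b'raw'}  # tx_hash -> raw_tx from the DB

    to_put = [bytes.fromhex(txid)[::-1] for txid in daemon_txids
              if bytes.fromhex(txid)[::-1] not in current_mempool]   # fetch and store these
    to_delete = {tx_hash: raw for tx_hash, raw in current_mempool.items()
                 if tx_hash[::-1].hex() not in daemon_txids}         # evict these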
@@ -1238,6 +1244,33 @@ class BlockchainProcessorService(BlockchainService):
        self.touched_claims_to_send_es.difference_update(self.removed_claim_hashes)
        self.removed_claims_to_send_es.update(self.removed_claim_hashes)

+   def _get_update_hashX_status_ops(self, hashX: bytes, new_history: List[Tuple[bytes, int]]):
+       existing = self.db.prefix_db.hashX_status.get(hashX)
+       if existing:
+           self.db.prefix_db.hashX_status.stage_delete((hashX,), existing)
+       tx_nums = self.db.read_history(hashX, limit=None)
+       history = ''
+       for tx_num in tx_nums:
+           history += f'{hash_to_hex_str(self.db.get_tx_hash(tx_num))}:{bisect_right(self.db.tx_counts, tx_num):d}:'
+       for tx_hash, height in new_history:
+           history += f'{hash_to_hex_str(tx_hash)}:{height:d}:'
+       if history:
+           status = sha256(history.encode())
+           self.db.prefix_db.hashX_status.stage_put((hashX,), (status,))
+
+   def _get_update_hashX_mempool_status_ops(self, hashX: bytes):
+       existing = self.db.prefix_db.hashX_mempool_status.get(hashX)
+       if existing:
+           self.db.prefix_db.hashX_mempool_status.stage_delete((hashX,), existing)
+       tx_nums = self.db.read_history(hashX, limit=None)
+       history = ''
+       for tx_num in tx_nums:
+           history += f'{hash_to_hex_str(self.db.get_tx_hash(tx_num))}:{bisect_right(self.db.tx_counts, tx_num):d}:'
+       history += self.mempool.mempool_history(hashX)
+       if history:
+           status = sha256(history.encode())
+           self.db.prefix_db.hashX_mempool_status.stage_put((hashX,), (status,))
+
    def advance_block(self, block: Block):
        height = self.height + 1
        # print("advance ", height)
@@ -1326,7 +1359,14 @@ class BlockchainProcessorService(BlockchainService):
        self.db.prefix_db.tx_count.stage_put(key_args=(height,), value_args=(tx_count,))

+       for k, v in self.db.prefix_db.hashX_mempool_status.iterate(
+               start=(b'\x00' * 20,), stop=(b'\xff' * 20,), deserialize_key=False, deserialize_value=False):
+           self.db.prefix_db.stage_raw_delete(k, v)
+
        for hashX, new_history in self.hashXs_by_tx.items():
+           self._get_update_hashX_status_ops(
+               hashX, [(self.pending_transactions[tx_num], height) for tx_num in new_history]
+           )
            if height > self.env.reorg_limit:  # compactify existing history
                hist_txs = b''
                # accumulate and delete all of the tx histories between height 1 and current - reorg_limit
@@ -1418,6 +1458,7 @@ class BlockchainProcessorService(BlockchainService):
        self.pending_transactions.clear()
        self.pending_support_amount_change.clear()
        self.touched_hashXs.clear()
+       self.mempool.clear()

    def backup_block(self):
        assert len(self.db.prefix_db._op_stack) == 0


@@ -46,6 +46,8 @@ class DB_PREFIXES(enum.Enum):
    trending_notifications = b'c'
    mempool_tx = b'd'
    touched_hashX = b'e'
+   hashX_status = b'f'
+   hashX_mempool_status = b'g'

 COLUMN_SETTINGS = {}  # this is updated by the PrefixRow metaclass
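
Each enum value is a single namespace byte prepended to every key that row type writes, so the new rows get their own contiguous key ranges. A sketch (illustrative, not commit code) of what a hashX_status key looks like on disk:

    from scribe.db.common import DB_PREFIXES

    hashX = b'\x11' * 20                           # 20-byte script hash prefix
    key = DB_PREFIXES.hashX_status.value + hashX   # b'f' + hashX, 21 bytes total
    assert key[:1] == b'f' and len(key) == 21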


@@ -23,6 +23,7 @@ from scribe.db.common import ResolveResult, STREAM_TYPES, CLAIM_TYPES, ExpandedR
 from scribe.db.prefixes import PendingActivationValue, ClaimTakeoverValue, ClaimToTXOValue, PrefixDB
 from scribe.db.prefixes import ACTIVATED_CLAIM_TXO_TYPE, ACTIVATED_SUPPORT_TXO_TYPE, EffectiveAmountKey
 from scribe.db.prefixes import PendingActivationKey, TXOToClaimValue, DBStatePrefixRow, MempoolTXPrefixRow
+from scribe.db.prefixes import HashXMempoolStatusPrefixRow

 TXO_STRUCT = struct.Struct(b'>LH')
@@ -804,7 +805,8 @@ class HubDB:
        self.prefix_db = PrefixDB(
            db_path, cache_mb=self._cache_MB,
            reorg_limit=self._reorg_limit, max_open_files=self._db_max_open_files,
-           unsafe_prefixes={DBStatePrefixRow.prefix, MempoolTXPrefixRow.prefix}, secondary_path=secondary_path
+           unsafe_prefixes={DBStatePrefixRow.prefix, MempoolTXPrefixRow.prefix, HashXMempoolStatusPrefixRow.prefix},
+           secondary_path=secondary_path
        )

        if secondary_path != '':
@@ -1017,7 +1019,7 @@ class HubDB:
        txs_extend = txs.extend
        for hist in self.prefix_db.hashX_history.iterate(prefix=(hashX,), include_key=False):
            txs_extend(hist)
-           if len(txs) >= limit:
+           if limit and len(txs) >= limit:
                break
        return txs
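
The added guard matters because the new status helpers call read_history with limit=None to walk an address's entire history; previously that comparison would raise TypeError ('>=' between int and None). A sketch of the two call styles, assuming an open HubDB instance db and a 20-byte hashX:

    full = db.read_history(hashX, limit=None)   # complete history, used for status hashing
    page = db.read_history(hashX, limit=1000)   # bounded read, the pre-existing behavior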


@@ -1623,6 +1623,76 @@ class TouchedHashXPrefixRow(PrefixRow):
        return cls.pack_key(height), cls.pack_value(touched)

+class HashXStatusKey(NamedTuple):
+   hashX: bytes
+
+
+class HashXStatusValue(NamedTuple):
+   status: bytes
+
+
+class HashXStatusPrefixRow(PrefixRow):
+   prefix = DB_PREFIXES.hashX_status.value
+   key_struct = struct.Struct(b'>20s')
+   value_struct = struct.Struct(b'32s')
+
+   key_part_lambdas = [
+       lambda: b'',
+       struct.Struct(b'>20s').pack
+   ]
+
+   @classmethod
+   def pack_key(cls, hashX: bytes):
+       return super().pack_key(hashX)
+
+   @classmethod
+   def unpack_key(cls, key: bytes) -> HashXStatusKey:
+       return HashXStatusKey(*super().unpack_key(key))
+
+   @classmethod
+   def pack_value(cls, status: bytes) -> bytes:
+       return super().pack_value(status)
+
+   @classmethod
+   def unpack_value(cls, data: bytes) -> HashXStatusValue:
+       return HashXStatusValue(*cls.value_struct.unpack(data))
+
+   @classmethod
+   def pack_item(cls, hashX: bytes, status: bytes):
+       return cls.pack_key(hashX), cls.pack_value(status)
+
+
+class HashXMempoolStatusPrefixRow(PrefixRow):
+   prefix = DB_PREFIXES.hashX_mempool_status.value
+   key_struct = struct.Struct(b'>20s')
+   value_struct = struct.Struct(b'32s')
+
+   key_part_lambdas = [
+       lambda: b'',
+       struct.Struct(b'>20s').pack
+   ]
+
+   @classmethod
+   def pack_key(cls, hashX: bytes):
+       return super().pack_key(hashX)
+
+   @classmethod
+   def unpack_key(cls, key: bytes) -> HashXStatusKey:
+       return HashXStatusKey(*super().unpack_key(key))
+
+   @classmethod
+   def pack_value(cls, status: bytes) -> bytes:
+       return super().pack_value(status)
+
+   @classmethod
+   def unpack_value(cls, data: bytes) -> HashXStatusValue:
+       return HashXStatusValue(*cls.value_struct.unpack(data))
+
+   @classmethod
+   def pack_item(cls, hashX: bytes, status: bytes):
+       return cls.pack_key(hashX), cls.pack_value(status)
+
+
 class PrefixDB(BasePrefixDB):
    def __init__(self, path: str, cache_mb: int = 128, reorg_limit: int = 200, max_open_files: int = 64,
                 secondary_path: str = '', unsafe_prefixes: Optional[typing.Set[bytes]] = None):
@@ -1662,6 +1732,8 @@ class PrefixDB(BasePrefixDB):
        self.mempool_tx = MempoolTXPrefixRow(db, self._op_stack)
        self.trending_notification = TrendingNotificationPrefixRow(db, self._op_stack)
        self.touched_hashX = TouchedHashXPrefixRow(db, self._op_stack)
+       self.hashX_status = HashXStatusPrefixRow(db, self._op_stack)
+       self.hashX_mempool_status = HashXMempoolStatusPrefixRow(db, self._op_stack)

 def auto_decode_item(key: bytes, value: bytes) -> Union[Tuple[NamedTuple, NamedTuple], Tuple[bytes, bytes]]:
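
A round-trip sketch for the new rows (made-up values; as with the other rows in this file, pack_key prepends the one-byte prefix and unpack_key strips it):

    from scribe.db.common import DB_PREFIXES
    from scribe.db.prefixes import HashXStatusPrefixRow

    hashX = b'\x11' * 20
    status = b'\x22' * 32
    key, value = HashXStatusPrefixRow.pack_item(hashX, status)
    assert key == DB_PREFIXES.hashX_status.value + hashX   # b'f' + 20 bytes
    assert HashXStatusPrefixRow.unpack_key(key).hashX == hashX
    assert HashXStatusPrefixRow.unpack_value(value).status == status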


@@ -1101,18 +1101,12 @@ class LBRYElectrumX(asyncio.Protocol):
        return len(self.hashX_subs)

    async def get_hashX_status(self, hashX: bytes):
-       mempool_history = self.mempool.transaction_summaries(hashX)
-       history = ''.join(f'{hash_to_hex_str(tx_hash)}:'
-                         f'{height:d}:'
-                         for tx_hash, height in await self.session_manager.limited_history(hashX))
-       history += ''.join(f'{hash_to_hex_str(tx.hash)}:'
-                          f'{-tx.has_unconfirmed_inputs:d}:'
-                          for tx in mempool_history)
-       if history:
-           status = sha256(history.encode()).hex()
-       else:
-           status = None
-       return history, status, len(mempool_history) > 0
+       mempool_status = self.db.prefix_db.hashX_mempool_status.get(hashX)
+       if mempool_status:
+           return mempool_status.status.hex()
+       status = self.db.prefix_db.hashX_status.get(hashX)
+       if status:
+           return status.status.hex()

    async def send_history_notifications(self, *hashXes: typing.Iterable[bytes]):
        notifications = []
@@ -1123,14 +1117,12 @@ class LBRYElectrumX(asyncio.Protocol):
            else:
                method = 'blockchain.address.subscribe'
            start = time.perf_counter()
-           history, status, mempool_status = await self.get_hashX_status(hashX)
-           if mempool_status:
-               self.session_manager.mempool_statuses[hashX] = status
-           else:
-               self.session_manager.mempool_statuses.pop(hashX, None)
-           self.session_manager.address_history_metric.observe(time.perf_counter() - start)
+           status = await self.get_hashX_status(hashX)
+           duration = time.perf_counter() - start
+           self.session_manager.address_history_metric.observe(duration)
            notifications.append((method, (alias, status)))
+           if duration > 30:
+               self.logger.warning("slow history notification (%s) for '%s'", duration, alias)

        start = time.perf_counter()
        self.session_manager.notifications_in_flight_metric.inc()
@@ -1336,11 +1328,7 @@ class LBRYElectrumX(asyncio.Protocol):
        """
        # Note history is ordered and mempool unordered in electrum-server
        # For mempool, height is -1 if it has unconfirmed inputs, otherwise 0
-       _, status, has_mempool_history = await self.get_hashX_status(hashX)
-       if has_mempool_history:
-           self.session_manager.mempool_statuses[hashX] = status
-       else:
-           self.session_manager.mempool_statuses.pop(hashX, None)
+       status = await self.get_hashX_status(hashX)
        return status

    async def hashX_listunspent(self, hashX: bytes):
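
With the block processor precomputing both rows, the session no longer hashes history per request. A sketch of the new lookup semantics, assuming a connected LBRYElectrumX instance named session inside a coroutine; the hashX is hypothetical:

    status = await session.get_hashX_status(b'\x00' * 20)
    # hex of the hashX_mempool_status row if present (it already folds in the
    # confirmed history), else hex of the hashX_status row, else None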