Merge pull request #17 from lbryio/precomputed-hashx-status

Precomputed hashx status
This commit is contained in:
Jack Robison 2022-04-07 12:06:18 -04:00 committed by GitHub
commit 311db529a0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 372 additions and 56 deletions

View file

@ -0,0 +1,110 @@
import itertools
import attr
import typing
from collections import defaultdict
from scribe.blockchain.transaction.deserializer import Deserializer
if typing.TYPE_CHECKING:
from scribe.db import HubDB
@attr.s(slots=True)
class MemPoolTx:
prevouts = attr.ib()
# A pair is a (hashX, value) tuple
in_pairs = attr.ib()
out_pairs = attr.ib()
fee = attr.ib()
size = attr.ib()
raw_tx = attr.ib()
@attr.s(slots=True)
class MemPoolTxSummary:
hash = attr.ib()
fee = attr.ib()
has_unconfirmed_inputs = attr.ib()
class MemPool:
def __init__(self, coin, db: 'HubDB'):
self.coin = coin
self._db = db
self.txs = {}
self.touched_hashXs: typing.DefaultDict[bytes, typing.Set[bytes]] = defaultdict(set) # None can be a key
def mempool_history(self, hashX: bytes) -> str:
result = ''
for tx_hash in self.touched_hashXs.get(hashX, ()):
if tx_hash not in self.txs:
continue # the tx hash for the touched address is an input that isn't in mempool anymore
result += f'{tx_hash[::-1].hex()}:{-any(_hash in self.txs for _hash, idx in self.txs[tx_hash].in_pairs):d}:'
return result
def remove(self, to_remove: typing.Dict[bytes, bytes]):
# Remove txs that aren't in mempool anymore
for tx_hash in set(self.txs).intersection(to_remove.keys()):
tx = self.txs.pop(tx_hash)
tx_hashXs = {hashX for hashX, value in tx.in_pairs}.union({hashX for hashX, value in tx.out_pairs})
for hashX in tx_hashXs:
if hashX in self.touched_hashXs and tx_hash in self.touched_hashXs[hashX]:
self.touched_hashXs[hashX].remove(tx_hash)
if not self.touched_hashXs[hashX]:
self.touched_hashXs.pop(hashX)
def update_mempool(self, to_add: typing.List[typing.Tuple[bytes, bytes]]) -> typing.Set[bytes]:
prefix_db = self._db.prefix_db
touched_hashXs = set()
# Re-sync with the new set of hashes
tx_map = {}
for tx_hash, raw_tx in to_add:
if tx_hash in self.txs:
continue
tx, tx_size = Deserializer(raw_tx).read_tx_and_vsize()
# Convert the inputs and outputs into (hashX, value) pairs
# Drop generation-like inputs from MemPoolTx.prevouts
txin_pairs = tuple((txin.prev_hash, txin.prev_idx)
for txin in tx.inputs
if not txin.is_generation())
txout_pairs = tuple((self.coin.hashX_from_txo(txout), txout.value)
for txout in tx.outputs if txout.pk_script)
tx_map[tx_hash] = MemPoolTx(None, txin_pairs, txout_pairs, 0, tx_size, raw_tx)
for tx_hash, tx in tx_map.items():
prevouts = []
# Look up the prevouts
for prev_hash, prev_index in tx.in_pairs:
if prev_hash in self.txs: # accepted mempool
utxo = self.txs[prev_hash].out_pairs[prev_index]
elif prev_hash in tx_map: # this set of changes
utxo = tx_map[prev_hash].out_pairs[prev_index]
else: # get it from the db
prev_tx_num = prefix_db.tx_num.get(prev_hash)
if not prev_tx_num:
continue
prev_tx_num = prev_tx_num.tx_num
hashX_val = prefix_db.hashX_utxo.get(prev_hash[:4], prev_tx_num, prev_index)
if not hashX_val:
continue
hashX = hashX_val.hashX
utxo_value = prefix_db.utxo.get(hashX, prev_tx_num, prev_index)
utxo = (hashX, utxo_value.amount)
prevouts.append(utxo)
# Save the prevouts, compute the fee and accept the TX
tx.prevouts = tuple(prevouts)
# Avoid negative fees if dealing with generation-like transactions
# because some in_parts would be missing
tx.fee = max(0, (sum(v for _, v in tx.prevouts) -
sum(v for _, v in tx.out_pairs)))
self.txs[tx_hash] = tx
for hashX, value in itertools.chain(tx.prevouts, tx.out_pairs):
self.touched_hashXs[hashX].add(tx_hash)
touched_hashXs.add(hashX)
return touched_hashXs
def clear(self):
self.txs.clear()
self.touched_hashXs.clear()

View file

@ -11,10 +11,11 @@ from scribe import PROMETHEUS_NAMESPACE
from scribe.db.prefixes import ACTIVATED_SUPPORT_TXO_TYPE, ACTIVATED_CLAIM_TXO_TYPE from scribe.db.prefixes import ACTIVATED_SUPPORT_TXO_TYPE, ACTIVATED_CLAIM_TXO_TYPE
from scribe.db.prefixes import PendingActivationKey, PendingActivationValue, ClaimToTXOValue from scribe.db.prefixes import PendingActivationKey, PendingActivationValue, ClaimToTXOValue
from scribe.error.base import ChainError from scribe.error.base import ChainError
from scribe.common import hash_to_hex_str, hash160, RPCError, HISTOGRAM_BUCKETS, StagedClaimtrieItem from scribe.common import hash_to_hex_str, hash160, RPCError, HISTOGRAM_BUCKETS, StagedClaimtrieItem, sha256
from scribe.blockchain.daemon import LBCDaemon from scribe.blockchain.daemon import LBCDaemon
from scribe.blockchain.transaction import Tx, TxOutput, TxInput, Block from scribe.blockchain.transaction import Tx, TxOutput, TxInput, Block
from scribe.blockchain.prefetcher import Prefetcher from scribe.blockchain.prefetcher import Prefetcher
from scribe.blockchain.mempool import MemPool
from scribe.schema.url import normalize_name from scribe.schema.url import normalize_name
from scribe.service import BlockchainService from scribe.service import BlockchainService
if typing.TYPE_CHECKING: if typing.TYPE_CHECKING:
@ -45,6 +46,7 @@ class BlockchainProcessorService(BlockchainService):
def __init__(self, env: 'Env'): def __init__(self, env: 'Env'):
super().__init__(env, secondary_name='', thread_workers=1, thread_prefix='block-processor') super().__init__(env, secondary_name='', thread_workers=1, thread_prefix='block-processor')
self.daemon = LBCDaemon(env.coin, env.daemon_url) self.daemon = LBCDaemon(env.coin, env.daemon_url)
self.mempool = MemPool(env.coin, self.db)
self.coin = env.coin self.coin = env.coin
self.wait_for_blocks_duration = 0.1 self.wait_for_blocks_duration = 0.1
self._ready_to_stop = asyncio.Event() self._ready_to_stop = asyncio.Event()
@ -147,6 +149,10 @@ class BlockchainProcessorService(BlockchainService):
} }
def update_mempool(unsafe_commit, mempool_prefix, to_put, to_delete): def update_mempool(unsafe_commit, mempool_prefix, to_put, to_delete):
self.mempool.remove(to_delete)
touched_hashXs = self.mempool.update_mempool(to_put)
for hashX in touched_hashXs:
self._get_update_hashX_mempool_status_ops(hashX)
for tx_hash, raw_tx in to_put: for tx_hash, raw_tx in to_put:
mempool_prefix.stage_put((tx_hash,), (raw_tx,)) mempool_prefix.stage_put((tx_hash,), (raw_tx,))
for tx_hash, raw_tx in to_delete.items(): for tx_hash, raw_tx in to_delete.items():
@ -157,17 +163,17 @@ class BlockchainProcessorService(BlockchainService):
current_mempool = await self.run_in_thread(fetch_mempool, self.db.prefix_db.mempool_tx) current_mempool = await self.run_in_thread(fetch_mempool, self.db.prefix_db.mempool_tx)
_to_put = [] _to_put = []
try: try:
mempool_hashes = await self.daemon.mempool_hashes() mempool_txids = await self.daemon.mempool_hashes()
except (TypeError, RPCError) as err: except (TypeError, RPCError) as err:
self.log.exception("failed to get mempool tx hashes, reorg underway? (%s)", err) self.log.exception("failed to get mempool tx hashes, reorg underway? (%s)", err)
return return
for hh in mempool_hashes: for mempool_txid in mempool_txids:
tx_hash = bytes.fromhex(hh)[::-1] tx_hash = bytes.fromhex(mempool_txid)[::-1]
if tx_hash in current_mempool: if tx_hash in current_mempool:
current_mempool.pop(tx_hash) current_mempool.pop(tx_hash)
else: else:
try: try:
_to_put.append((tx_hash, bytes.fromhex(await self.daemon.getrawtransaction(hh)))) _to_put.append((tx_hash, bytes.fromhex(await self.daemon.getrawtransaction(mempool_txid))))
except (TypeError, RPCError): except (TypeError, RPCError):
self.log.warning("failed to get a mempool tx, reorg underway?") self.log.warning("failed to get a mempool tx, reorg underway?")
return return
@ -1238,6 +1244,50 @@ class BlockchainProcessorService(BlockchainService):
self.touched_claims_to_send_es.difference_update(self.removed_claim_hashes) self.touched_claims_to_send_es.difference_update(self.removed_claim_hashes)
self.removed_claims_to_send_es.update(self.removed_claim_hashes) self.removed_claims_to_send_es.update(self.removed_claim_hashes)
def _get_update_hashX_status_ops(self, hashX: bytes, new_history: List[Tuple[bytes, int]]):
existing = self.db.prefix_db.hashX_status.get(hashX)
if existing:
self.db.prefix_db.hashX_status.stage_delete((hashX,), existing)
tx_nums = self.db.read_history(hashX, limit=None)
history = ''
for tx_num in tx_nums:
history += f'{hash_to_hex_str(self.db.get_tx_hash(tx_num) )}:{bisect_right(self.db.tx_counts, tx_num):d}:'
for tx_hash, height in new_history:
history += f'{hash_to_hex_str(tx_hash)}:{height:d}:'
if history:
status = sha256(history.encode())
self.db.prefix_db.hashX_status.stage_put((hashX,), (status,))
def _get_update_hashX_mempool_status_ops(self, hashX: bytes):
existing = self.db.prefix_db.hashX_mempool_status.get(hashX)
if existing:
self.db.prefix_db.hashX_mempool_status.stage_delete((hashX,), existing)
tx_nums = self.db.read_history(hashX, limit=None)
history = ''
for tx_num in tx_nums:
history += f'{hash_to_hex_str(self.db.get_tx_hash(tx_num) )}:{bisect_right(self.db.tx_counts, tx_num):d}:'
history += self.mempool.mempool_history(hashX)
if history:
status = sha256(history.encode())
self.db.prefix_db.hashX_mempool_status.stage_put((hashX,), (status,))
def _get_compactify_hashX_history_ops(self, height: int, hashX: bytes):
if height > self.env.reorg_limit: # compactify existing history
hist_txs = b''
# accumulate and delete all of the tx histories between height 1 and current - reorg_limit
for k, hist in self.db.prefix_db.hashX_history.iterate(
start=(hashX, 1), stop=(hashX, height - self.env.reorg_limit),
deserialize_key=False, deserialize_value=False):
hist_txs += hist
self.db.prefix_db.stage_raw_delete(k, hist)
if hist_txs:
# add the accumulated histories onto the existing compacted history at height 0
key = self.db.prefix_db.hashX_history.pack_key(hashX, 0)
existing = self.db.prefix_db.get(key)
if existing is not None:
self.db.prefix_db.stage_raw_delete(key, existing)
self.db.prefix_db.stage_raw_put(key, (existing or b'') + hist_txs)
def advance_block(self, block: Block): def advance_block(self, block: Block):
height = self.height + 1 height = self.height + 1
# print("advance ", height) # print("advance ", height)
@ -1326,22 +1376,16 @@ class BlockchainProcessorService(BlockchainService):
self.db.prefix_db.tx_count.stage_put(key_args=(height,), value_args=(tx_count,)) self.db.prefix_db.tx_count.stage_put(key_args=(height,), value_args=(tx_count,))
for k, v in self.db.prefix_db.hashX_mempool_status.iterate(
start=(b'\x00' * 20, ), stop=(b'\xff' * 20, ), deserialize_key=False, deserialize_value=False):
self.db.prefix_db.stage_raw_delete(k, v)
for hashX, new_history in self.hashXs_by_tx.items(): for hashX, new_history in self.hashXs_by_tx.items():
if height > self.env.reorg_limit: # compactify existing history # TODO: combine this with compaction so that we only read the history once
hist_txs = b'' self._get_update_hashX_status_ops(
# accumulate and delete all of the tx histories between height 1 and current - reorg_limit hashX, [(self.pending_transactions[tx_num], height) for tx_num in new_history]
for k, hist in self.db.prefix_db.hashX_history.iterate( )
start=(hashX, 1), stop=(hashX, height - self.env.reorg_limit), self._get_compactify_hashX_history_ops(height, hashX)
deserialize_key=False, deserialize_value=False):
hist_txs += hist
self.db.prefix_db.stage_raw_delete(k, hist)
if hist_txs:
# add the accumulated histories onto the existing compacted history at height 0
key = self.db.prefix_db.hashX_history.pack_key(hashX, 0)
existing = self.db.prefix_db.get(key)
if existing is not None:
self.db.prefix_db.stage_raw_delete(key, existing)
self.db.prefix_db.stage_raw_put(key, (existing or b'') + hist_txs)
if not new_history: if not new_history:
continue continue
self.db.prefix_db.hashX_history.stage_put(key_args=(hashX, height), value_args=(new_history,)) self.db.prefix_db.hashX_history.stage_put(key_args=(hashX, height), value_args=(new_history,))
@ -1418,6 +1462,7 @@ class BlockchainProcessorService(BlockchainService):
self.pending_transactions.clear() self.pending_transactions.clear()
self.pending_support_amount_change.clear() self.pending_support_amount_change.clear()
self.touched_hashXs.clear() self.touched_hashXs.clear()
self.mempool.clear()
def backup_block(self): def backup_block(self):
assert len(self.db.prefix_db._op_stack) == 0 assert len(self.db.prefix_db._op_stack) == 0
@ -1592,6 +1637,16 @@ class BlockchainProcessorService(BlockchainService):
await self.run_in_thread_with_lock(flush) await self.run_in_thread_with_lock(flush)
def _iter_start_tasks(self): def _iter_start_tasks(self):
while self.db.db_version < max(self.db.DB_VERSIONS):
if self.db.db_version == 7:
from scribe.db.migrators.migrate7to8 import migrate, FROM_VERSION, TO_VERSION
else:
raise RuntimeError("unknown db version")
self.log.warning(f"migrating database from version {FROM_VERSION} to version {TO_VERSION}")
migrate(self.db)
self.log.info("finished migration")
self.db.read_db_state()
self.height = self.db.db_height self.height = self.db.db_height
self.tip = self.db.db_tip self.tip = self.db.db_tip
self.tx_count = self.db.db_tx_count self.tx_count = self.db.db_tx_count

View file

@ -46,6 +46,8 @@ class DB_PREFIXES(enum.Enum):
trending_notifications = b'c' trending_notifications = b'c'
mempool_tx = b'd' mempool_tx = b'd'
touched_hashX = b'e' touched_hashX = b'e'
hashX_status = b'f'
hashX_mempool_status = b'g'
COLUMN_SETTINGS = {} # this is updated by the PrefixRow metaclass COLUMN_SETTINGS = {} # this is updated by the PrefixRow metaclass

View file

@ -23,6 +23,7 @@ from scribe.db.common import ResolveResult, STREAM_TYPES, CLAIM_TYPES, ExpandedR
from scribe.db.prefixes import PendingActivationValue, ClaimTakeoverValue, ClaimToTXOValue, PrefixDB from scribe.db.prefixes import PendingActivationValue, ClaimTakeoverValue, ClaimToTXOValue, PrefixDB
from scribe.db.prefixes import ACTIVATED_CLAIM_TXO_TYPE, ACTIVATED_SUPPORT_TXO_TYPE, EffectiveAmountKey from scribe.db.prefixes import ACTIVATED_CLAIM_TXO_TYPE, ACTIVATED_SUPPORT_TXO_TYPE, EffectiveAmountKey
from scribe.db.prefixes import PendingActivationKey, TXOToClaimValue, DBStatePrefixRow, MempoolTXPrefixRow from scribe.db.prefixes import PendingActivationKey, TXOToClaimValue, DBStatePrefixRow, MempoolTXPrefixRow
from scribe.db.prefixes import HashXMempoolStatusPrefixRow
TXO_STRUCT = struct.Struct(b'>LH') TXO_STRUCT = struct.Struct(b'>LH')
@ -31,7 +32,7 @@ TXO_STRUCT_pack = TXO_STRUCT.pack
class HubDB: class HubDB:
DB_VERSIONS = HIST_DB_VERSIONS = [7] DB_VERSIONS = [7, 8]
def __init__(self, coin, db_dir: str, cache_MB: int = 512, reorg_limit: int = 200, def __init__(self, coin, db_dir: str, cache_MB: int = 512, reorg_limit: int = 200,
cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False, cache_all_claim_txos: bool = False, cache_all_tx_hashes: bool = False,
@ -804,7 +805,8 @@ class HubDB:
self.prefix_db = PrefixDB( self.prefix_db = PrefixDB(
db_path, cache_mb=self._cache_MB, db_path, cache_mb=self._cache_MB,
reorg_limit=self._reorg_limit, max_open_files=self._db_max_open_files, reorg_limit=self._reorg_limit, max_open_files=self._db_max_open_files,
unsafe_prefixes={DBStatePrefixRow.prefix, MempoolTXPrefixRow.prefix}, secondary_path=secondary_path unsafe_prefixes={DBStatePrefixRow.prefix, MempoolTXPrefixRow.prefix, HashXMempoolStatusPrefixRow.prefix},
secondary_path=secondary_path
) )
if secondary_path != '': if secondary_path != '':
@ -848,6 +850,14 @@ class HubDB:
self.prefix_db.close() self.prefix_db.close()
self.prefix_db = None self.prefix_db = None
def get_hashX_status(self, hashX: bytes):
mempool_status = self.prefix_db.hashX_mempool_status.get(hashX, deserialize_value=False)
if mempool_status:
return mempool_status.hex()
status = self.prefix_db.hashX_status.get(hashX, deserialize_value=False)
if status:
return status.hex()
def get_tx_hash(self, tx_num: int) -> bytes: def get_tx_hash(self, tx_num: int) -> bytes:
if self._cache_all_tx_hashes: if self._cache_all_tx_hashes:
return self.total_transactions[tx_num] return self.total_transactions[tx_num]
@ -1017,7 +1027,7 @@ class HubDB:
txs_extend = txs.extend txs_extend = txs.extend
for hist in self.prefix_db.hashX_history.iterate(prefix=(hashX,), include_key=False): for hist in self.prefix_db.hashX_history.iterate(prefix=(hashX,), include_key=False):
txs_extend(hist) txs_extend(hist)
if len(txs) >= limit: if limit and len(txs) >= limit:
break break
return txs return txs

View file

View file

@ -0,0 +1,88 @@
import logging
import time
import array
import typing
from bisect import bisect_right
from scribe.common import sha256
if typing.TYPE_CHECKING:
from scribe.db.db import HubDB
FROM_VERSION = 7
TO_VERSION = 8
def get_all_hashXs(db):
def iterator():
last_hashX = None
for k in db.prefix_db.hashX_history.iterate(deserialize_key=False, include_value=False):
hashX = k[1:12]
if last_hashX is None:
last_hashX = hashX
if last_hashX != hashX:
yield hashX
last_hashX = hashX
if last_hashX:
yield last_hashX
return [hashX for hashX in iterator()]
def hashX_history(db: 'HubDB', hashX: bytes):
history = b''
to_delete = []
for k, v in db.prefix_db.hashX_history.iterate(prefix=(hashX,), deserialize_value=False, deserialize_key=False):
to_delete.append((k, v))
history += v
return history, to_delete
def hashX_status_from_history(db: 'HubDB', history: bytes) -> bytes:
tx_counts = db.tx_counts
hist_tx_nums = array.array('I')
hist_tx_nums.frombytes(history)
hist = ''
for tx_num in hist_tx_nums:
hist += f'{db.get_tx_hash(tx_num)[::-1].hex()}:{bisect_right(tx_counts, tx_num)}:'
return sha256(hist.encode())
def migrate(db):
log = logging.getLogger(__name__)
start = time.perf_counter()
prefix_db = db.prefix_db
hashXs = get_all_hashXs(db)
log.info(f"loaded {len(hashXs)} hashXs in {round(time.perf_counter() - start, 2)}s, "
f"now building the status index...")
op_cnt = 0
hashX_cnt = 0
for hashX in hashXs:
hashX_cnt += 1
key = prefix_db.hashX_status.pack_key(hashX)
history, to_delete = hashX_history(db, hashX)
status = hashX_status_from_history(db, history)
existing_status = prefix_db.hashX_status.get(hashX, deserialize_value=False)
if existing_status and existing_status != status:
prefix_db.stage_raw_delete(key, existing_status)
op_cnt += 1
elif existing_status == status:
pass
else:
prefix_db.stage_raw_put(key, status)
op_cnt += 1
if len(to_delete) > 1:
for k, v in to_delete:
prefix_db.stage_raw_delete(k, v)
op_cnt += 1
if history:
prefix_db.stage_raw_put(prefix_db.hashX_history.pack_key(hashX, 0), history)
op_cnt += 1
if op_cnt > 100000:
prefix_db.unsafe_commit()
log.info(f"wrote {hashX_cnt}/{len(hashXs)} hashXs statuses")
op_cnt = 0
if op_cnt:
prefix_db.unsafe_commit()
log.info(f"wrote {hashX_cnt}/{len(hashXs)} hashXs statuses")
db.db_version = 8
db.write_db_state()
db.prefix_db.unsafe_commit()
log.info("finished migration")

View file

@ -1623,6 +1623,76 @@ class TouchedHashXPrefixRow(PrefixRow):
return cls.pack_key(height), cls.pack_value(touched) return cls.pack_key(height), cls.pack_value(touched)
class HashXStatusKey(NamedTuple):
hashX: bytes
class HashXStatusValue(NamedTuple):
status: bytes
class HashXStatusPrefixRow(PrefixRow):
prefix = DB_PREFIXES.hashX_status.value
key_struct = struct.Struct(b'>20s')
value_struct = struct.Struct(b'32s')
key_part_lambdas = [
lambda: b'',
struct.Struct(b'>20s').pack
]
@classmethod
def pack_key(cls, hashX: bytes):
return super().pack_key(hashX)
@classmethod
def unpack_key(cls, key: bytes) -> HashXStatusKey:
return HashXStatusKey(*super().unpack_key(key))
@classmethod
def pack_value(cls, status: bytes) -> bytes:
return super().pack_value(status)
@classmethod
def unpack_value(cls, data: bytes) -> HashXStatusValue:
return HashXStatusValue(*cls.value_struct.unpack(data))
@classmethod
def pack_item(cls, hashX: bytes, status: bytes):
return cls.pack_key(hashX), cls.pack_value(status)
class HashXMempoolStatusPrefixRow(PrefixRow):
prefix = DB_PREFIXES.hashX_mempool_status.value
key_struct = struct.Struct(b'>20s')
value_struct = struct.Struct(b'32s')
key_part_lambdas = [
lambda: b'',
struct.Struct(b'>20s').pack
]
@classmethod
def pack_key(cls, hashX: bytes):
return super().pack_key(hashX)
@classmethod
def unpack_key(cls, key: bytes) -> HashXStatusKey:
return HashXStatusKey(*super().unpack_key(key))
@classmethod
def pack_value(cls, status: bytes) -> bytes:
return super().pack_value(status)
@classmethod
def unpack_value(cls, data: bytes) -> HashXStatusValue:
return HashXStatusValue(*cls.value_struct.unpack(data))
@classmethod
def pack_item(cls, hashX: bytes, status: bytes):
return cls.pack_key(hashX), cls.pack_value(status)
class PrefixDB(BasePrefixDB): class PrefixDB(BasePrefixDB):
def __init__(self, path: str, cache_mb: int = 128, reorg_limit: int = 200, max_open_files: int = 64, def __init__(self, path: str, cache_mb: int = 128, reorg_limit: int = 200, max_open_files: int = 64,
secondary_path: str = '', unsafe_prefixes: Optional[typing.Set[bytes]] = None): secondary_path: str = '', unsafe_prefixes: Optional[typing.Set[bytes]] = None):
@ -1662,6 +1732,8 @@ class PrefixDB(BasePrefixDB):
self.mempool_tx = MempoolTXPrefixRow(db, self._op_stack) self.mempool_tx = MempoolTXPrefixRow(db, self._op_stack)
self.trending_notification = TrendingNotificationPrefixRow(db, self._op_stack) self.trending_notification = TrendingNotificationPrefixRow(db, self._op_stack)
self.touched_hashX = TouchedHashXPrefixRow(db, self._op_stack) self.touched_hashX = TouchedHashXPrefixRow(db, self._op_stack)
self.hashX_status = HashXStatusPrefixRow(db, self._op_stack)
self.hashX_mempool_status = HashXMempoolStatusPrefixRow(db, self._op_stack)
def auto_decode_item(key: bytes, value: bytes) -> Union[Tuple[NamedTuple, NamedTuple], Tuple[bytes, bytes]]: def auto_decode_item(key: bytes, value: bytes) -> Union[Tuple[NamedTuple, NamedTuple], Tuple[bytes, bytes]]:

View file

@ -41,7 +41,7 @@ mempool_process_time_metric = Histogram(
) )
class MemPool: class HubMemPool:
def __init__(self, coin, db: 'HubDB', refresh_secs=1.0): def __init__(self, coin, db: 'HubDB', refresh_secs=1.0):
self.coin = coin self.coin = coin
self._db = db self._db = db

View file

@ -3,7 +3,7 @@ import time
import asyncio import asyncio
from scribe.blockchain.daemon import LBCDaemon from scribe.blockchain.daemon import LBCDaemon
from scribe.hub.session import SessionManager from scribe.hub.session import SessionManager
from scribe.hub.mempool import MemPool from scribe.hub.mempool import HubMemPool
from scribe.hub.udp import StatusServer from scribe.hub.udp import StatusServer
from scribe.service import BlockchainReaderService from scribe.service import BlockchainReaderService
from scribe.elasticsearch import ElasticNotifierClientProtocol from scribe.elasticsearch import ElasticNotifierClientProtocol
@ -16,7 +16,7 @@ class HubServerService(BlockchainReaderService):
self.mempool_notifications = set() self.mempool_notifications = set()
self.status_server = StatusServer() self.status_server = StatusServer()
self.daemon = LBCDaemon(env.coin, env.daemon_url) # only needed for broadcasting txs self.daemon = LBCDaemon(env.coin, env.daemon_url) # only needed for broadcasting txs
self.mempool = MemPool(self.env.coin, self.db) self.mempool = HubMemPool(self.env.coin, self.db)
self.session_manager = SessionManager( self.session_manager = SessionManager(
env, self.db, self.mempool, self.daemon, env, self.db, self.mempool, self.daemon,
self.shutdown_event, self.shutdown_event,

View file

@ -30,7 +30,7 @@ if typing.TYPE_CHECKING:
from scribe.db import HubDB from scribe.db import HubDB
from scribe.env import Env from scribe.env import Env
from scribe.blockchain.daemon import LBCDaemon from scribe.blockchain.daemon import LBCDaemon
from scribe.hub.mempool import MemPool from scribe.hub.mempool import HubMemPool
BAD_REQUEST = 1 BAD_REQUEST = 1
DAEMON_ERROR = 2 DAEMON_ERROR = 2
@ -38,13 +38,10 @@ DAEMON_ERROR = 2
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
SignatureInfo = namedtuple('SignatureInfo', 'min_args max_args ' SignatureInfo = namedtuple('SignatureInfo', 'min_args max_args '
'required_names other_names') 'required_names other_names')
def scripthash_to_hashX(scripthash: str) -> bytes: def scripthash_to_hashX(scripthash: str) -> bytes:
try: try:
bin_hash = hex_str_to_hash(scripthash) bin_hash = hex_str_to_hash(scripthash)
@ -136,7 +133,6 @@ class SessionManager:
tx_replied_count_metric = Counter("replied_transaction", "Number of transactions responded", namespace=NAMESPACE) tx_replied_count_metric = Counter("replied_transaction", "Number of transactions responded", namespace=NAMESPACE)
urls_to_resolve_count_metric = Counter("urls_to_resolve", "Number of urls to resolve", namespace=NAMESPACE) urls_to_resolve_count_metric = Counter("urls_to_resolve", "Number of urls to resolve", namespace=NAMESPACE)
resolved_url_count_metric = Counter("resolved_url", "Number of resolved urls", namespace=NAMESPACE) resolved_url_count_metric = Counter("resolved_url", "Number of resolved urls", namespace=NAMESPACE)
interrupt_count_metric = Counter("interrupt", "Number of interrupted queries", namespace=NAMESPACE) interrupt_count_metric = Counter("interrupt", "Number of interrupted queries", namespace=NAMESPACE)
db_operational_error_metric = Counter( db_operational_error_metric = Counter(
"operational_error", "Number of queries that raised operational errors", namespace=NAMESPACE "operational_error", "Number of queries that raised operational errors", namespace=NAMESPACE
@ -168,7 +164,7 @@ class SessionManager:
namespace=NAMESPACE, buckets=HISTOGRAM_BUCKETS namespace=NAMESPACE, buckets=HISTOGRAM_BUCKETS
) )
def __init__(self, env: 'Env', db: 'HubDB', mempool: 'MemPool', def __init__(self, env: 'Env', db: 'HubDB', mempool: 'HubMemPool',
daemon: 'LBCDaemon', shutdown_event: asyncio.Event, daemon: 'LBCDaemon', shutdown_event: asyncio.Event,
on_available_callback: typing.Callable[[], None], on_unavailable_callback: typing.Callable[[], None]): on_available_callback: typing.Callable[[], None], on_unavailable_callback: typing.Callable[[], None]):
env.max_send = max(350000, env.max_send) env.max_send = max(350000, env.max_send)
@ -1105,18 +1101,7 @@ class LBRYElectrumX(asyncio.Protocol):
return len(self.hashX_subs) return len(self.hashX_subs)
async def get_hashX_status(self, hashX: bytes): async def get_hashX_status(self, hashX: bytes):
mempool_history = self.mempool.transaction_summaries(hashX) return await self.loop.run_in_executor(self.db._executor, self.db.get_hashX_status, hashX)
history = ''.join(f'{hash_to_hex_str(tx_hash)}:'
f'{height:d}:'
for tx_hash, height in await self.session_manager.limited_history(hashX))
history += ''.join(f'{hash_to_hex_str(tx.hash)}:'
f'{-tx.has_unconfirmed_inputs:d}:'
for tx in mempool_history)
if history:
status = sha256(history.encode()).hex()
else:
status = None
return history, status, len(mempool_history) > 0
async def send_history_notifications(self, *hashXes: typing.Iterable[bytes]): async def send_history_notifications(self, *hashXes: typing.Iterable[bytes]):
notifications = [] notifications = []
@ -1127,14 +1112,12 @@ class LBRYElectrumX(asyncio.Protocol):
else: else:
method = 'blockchain.address.subscribe' method = 'blockchain.address.subscribe'
start = time.perf_counter() start = time.perf_counter()
history, status, mempool_status = await self.get_hashX_status(hashX) status = await self.get_hashX_status(hashX)
if mempool_status: duration = time.perf_counter() - start
self.session_manager.mempool_statuses[hashX] = status self.session_manager.address_history_metric.observe(duration)
else:
self.session_manager.mempool_statuses.pop(hashX, None)
self.session_manager.address_history_metric.observe(time.perf_counter() - start)
notifications.append((method, (alias, status))) notifications.append((method, (alias, status)))
if duration > 30:
self.logger.warning("slow history notification (%s) for '%s'", duration, alias)
start = time.perf_counter() start = time.perf_counter()
self.session_manager.notifications_in_flight_metric.inc() self.session_manager.notifications_in_flight_metric.inc()
@ -1340,11 +1323,7 @@ class LBRYElectrumX(asyncio.Protocol):
""" """
# Note history is ordered and mempool unordered in electrum-server # Note history is ordered and mempool unordered in electrum-server
# For mempool, height is -1 if it has unconfirmed inputs, otherwise 0 # For mempool, height is -1 if it has unconfirmed inputs, otherwise 0
_, status, has_mempool_history = await self.get_hashX_status(hashX) status = await self.get_hashX_status(hashX)
if has_mempool_history:
self.session_manager.mempool_statuses[hashX] = status
else:
self.session_manager.mempool_statuses.pop(hashX, None)
return status return status
async def hashX_listunspent(self, hashX: bytes): async def hashX_listunspent(self, hashX: bytes):