add mempool, trending, and touched address indexes to the hub db

This commit is contained in:
Jack Robison 2021-12-01 16:47:29 -05:00
parent 0a71e2ff91
commit 77e64ef028
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
4 changed files with 160 additions and 9 deletions

View file

@ -20,6 +20,7 @@ from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN
from lbry.wallet.server.util import chunks, class_logger from lbry.wallet.server.util import chunks, class_logger
from lbry.crypto.hash import hash160 from lbry.crypto.hash import hash160
from lbry.wallet.server.mempool import MemPool from lbry.wallet.server.mempool import MemPool
from lbry.wallet.server.db.common import TrendingNotification
from lbry.wallet.server.db.prefixes import ACTIVATED_SUPPORT_TXO_TYPE, ACTIVATED_CLAIM_TXO_TYPE from lbry.wallet.server.db.prefixes import ACTIVATED_SUPPORT_TXO_TYPE, ACTIVATED_CLAIM_TXO_TYPE
from lbry.wallet.server.db.prefixes import PendingActivationKey, PendingActivationValue, ClaimToTXOValue from lbry.wallet.server.db.prefixes import PendingActivationKey, PendingActivationValue, ClaimToTXOValue
from lbry.wallet.server.udp import StatusServer from lbry.wallet.server.udp import StatusServer
@ -28,12 +29,6 @@ if typing.TYPE_CHECKING:
from lbry.wallet.server.leveldb import LevelDB from lbry.wallet.server.leveldb import LevelDB
class TrendingNotification(NamedTuple):
height: int
prev_amount: int
new_amount: int
class Prefetcher: class Prefetcher:
"""Prefetches blocks (in the forward direction only).""" """Prefetches blocks (in the forward direction only)."""
@ -326,6 +321,22 @@ class BlockProcessor:
return await asyncio.get_event_loop().run_in_executor(self._chain_executor, func, *args) return await asyncio.get_event_loop().run_in_executor(self._chain_executor, func, *args)
return await asyncio.shield(run_in_thread()) return await asyncio.shield(run_in_thread())
async def check_mempool(self):
if self.db.prefix_db.closed:
return
current_mempool = {
k.tx_hash: v.raw_tx for (k, v) in self.db.prefix_db.mempool_tx.iterate()
}
for hh in await self.daemon.mempool_hashes():
tx_hash = bytes.fromhex(hh)[::-1]
if tx_hash in current_mempool:
current_mempool.pop(tx_hash)
else:
raw_tx = bytes.fromhex(await self.daemon.getrawtransaction(hh))
self.db.prefix_db.mempool_tx.stage_put((tx_hash,), (raw_tx,))
for tx_hash, raw_tx in current_mempool.items():
self.db.prefix_db.mempool_tx.stage_delete((tx_hash,), (raw_tx,))
async def check_and_advance_blocks(self, raw_blocks): async def check_and_advance_blocks(self, raw_blocks):
"""Process the list of raw blocks passed. Detects and handles """Process the list of raw blocks passed. Detects and handles
reorgs. reorgs.
@ -1413,8 +1424,8 @@ class BlockProcessor:
or touched in self.pending_support_amount_change: or touched in self.pending_support_amount_change:
# exclude sending notifications for claims/supports that activated but # exclude sending notifications for claims/supports that activated but
# weren't added/spent in this block # weren't added/spent in this block
self._add_claim_activation_change_notification( self.db.prefix_db.trending_notification.stage_put(
touched.hex(), height, prev_effective_amount, new_effective_amount (height, touched), (prev_effective_amount, new_effective_amount)
) )
for channel_hash, count in self.pending_channel_counts.items(): for channel_hash, count in self.pending_channel_counts.items():
@ -1454,6 +1465,12 @@ class BlockProcessor:
spent_claims = {} spent_claims = {}
txos = Transaction(tx.raw).outputs txos = Transaction(tx.raw).outputs
# clean up mempool, delete txs that were already in mempool/staged to be added
# leave txs in mempool that werent in the block
mempool_tx = self.db.prefix_db.mempool_tx.get_pending(tx_hash)
if mempool_tx:
self.db.prefix_db.mempool_tx.stage_delete((tx_hash,), mempool_tx)
self.db.prefix_db.tx.stage_put(key_args=(tx_hash,), value_args=(tx.raw,)) self.db.prefix_db.tx.stage_put(key_args=(tx_hash,), value_args=(tx.raw,))
self.db.prefix_db.tx_num.stage_put(key_args=(tx_hash,), value_args=(tx_count,)) self.db.prefix_db.tx_num.stage_put(key_args=(tx_hash,), value_args=(tx_count,))
self.db.prefix_db.tx_hash.stage_put(key_args=(tx_count,), value_args=(tx_hash,)) self.db.prefix_db.tx_hash.stage_put(key_args=(tx_count,), value_args=(tx_hash,))
@ -1513,6 +1530,8 @@ class BlockProcessor:
# update effective amount and update sets of touched and deleted claims # update effective amount and update sets of touched and deleted claims
self._get_cumulative_update_ops(height) self._get_cumulative_update_ops(height)
self.db.prefix_db.touched_hashX.stage_put((height,), (list(sorted(self.touched_hashXs)),))
self.db.prefix_db.tx_count.stage_put(key_args=(height,), value_args=(tx_count,)) self.db.prefix_db.tx_count.stage_put(key_args=(height,), value_args=(tx_count,))
for hashX, new_history in self.hashXs_by_tx.items(): for hashX, new_history in self.hashXs_by_tx.items():
@ -1594,6 +1613,7 @@ class BlockProcessor:
self.pending_support_amount_change.clear() self.pending_support_amount_change.clear()
self.resolve_cache.clear() self.resolve_cache.clear()
self.resolve_outputs_cache.clear() self.resolve_outputs_cache.clear()
self.touched_hashXs.clear()
async def backup_block(self): async def backup_block(self):
assert len(self.db.prefix_db._op_stack) == 0 assert len(self.db.prefix_db._op_stack) == 0

View file

@ -40,3 +40,6 @@ class DB_PREFIXES(enum.Enum):
channel_count = b'Z' channel_count = b'Z'
support_amount = b'a' support_amount = b'a'
block_txs = b'b' block_txs = b'b'
trending_notifications = b'c'
mempool_tx = b'd'
touched_hashX = b'e'

View file

@ -445,3 +445,9 @@ class ResolveResult(typing.NamedTuple):
channel_hash: typing.Optional[bytes] channel_hash: typing.Optional[bytes]
reposted_claim_hash: typing.Optional[bytes] reposted_claim_hash: typing.Optional[bytes]
signature_valid: typing.Optional[bool] signature_valid: typing.Optional[bool]
class TrendingNotification(typing.NamedTuple):
height: int
prev_amount: int
new_amount: int

View file

@ -5,6 +5,7 @@ import base64
from typing import Union, Tuple, NamedTuple, Optional from typing import Union, Tuple, NamedTuple, Optional
from lbry.wallet.server.db import DB_PREFIXES from lbry.wallet.server.db import DB_PREFIXES
from lbry.wallet.server.db.db import RocksDBStore, PrefixDB from lbry.wallet.server.db.db import RocksDBStore, PrefixDB
from lbry.wallet.server.db.common import TrendingNotification
from lbry.wallet.server.db.revertable import RevertableOpStack, RevertablePut, RevertableDelete from lbry.wallet.server.db.revertable import RevertableOpStack, RevertablePut, RevertableDelete
from lbry.schema.url import normalize_name from lbry.schema.url import normalize_name
@ -230,7 +231,7 @@ class TxValue(NamedTuple):
raw_tx: bytes raw_tx: bytes
def __str__(self): def __str__(self):
return f"{self.__class__.__name__}(raw_tx={base64.b64encode(self.raw_tx)})" return f"{self.__class__.__name__}(raw_tx={base64.b64encode(self.raw_tx).decode()})"
class BlockHeaderKey(NamedTuple): class BlockHeaderKey(NamedTuple):
@ -1595,6 +1596,124 @@ class BlockTxsPrefixRow(PrefixRow):
return cls.pack_key(height), cls.pack_value(tx_hashes) return cls.pack_key(height), cls.pack_value(tx_hashes)
class MempoolTxKey(TxKey):
pass
class MempoolTxValue(TxValue):
pass
class MempoolTXPrefixRow(PrefixRow):
prefix = DB_PREFIXES.mempool_tx.value
key_struct = struct.Struct(b'>32s')
key_part_lambdas = [
lambda: b'',
struct.Struct(b'>32s').pack
]
@classmethod
def pack_key(cls, tx_hash: bytes) -> bytes:
return super().pack_key(tx_hash)
@classmethod
def unpack_key(cls, tx_hash: bytes) -> MempoolTxKey:
return MempoolTxKey(*super().unpack_key(tx_hash))
@classmethod
def pack_value(cls, tx: bytes) -> bytes:
return tx
@classmethod
def unpack_value(cls, data: bytes) -> MempoolTxValue:
return MempoolTxValue(data)
@classmethod
def pack_item(cls, tx_hash: bytes, raw_tx: bytes):
return cls.pack_key(tx_hash), cls.pack_value(raw_tx)
class TrendingNotificationKey(typing.NamedTuple):
height: int
claim_hash: bytes
class TrendingNotificationValue(typing.NamedTuple):
previous_amount: int
new_amount: int
class TrendingNotificationPrefixRow(PrefixRow):
prefix = DB_PREFIXES.trending_notifications.value
key_struct = struct.Struct(b'>L20s')
value_struct = struct.Struct(b'>QQ')
key_part_lambdas = [
lambda: b'',
struct.Struct(b'>L').pack,
struct.Struct(b'>L20s').pack
]
@classmethod
def pack_key(cls, height: int, claim_hash: bytes):
return super().pack_key(height, claim_hash)
@classmethod
def unpack_key(cls, key: bytes) -> TrendingNotificationKey:
return TrendingNotificationKey(*super().unpack_key(key))
@classmethod
def pack_value(cls, previous_amount: int, new_amount: int) -> bytes:
return super().pack_value(previous_amount, new_amount)
@classmethod
def unpack_value(cls, data: bytes) -> TrendingNotificationValue:
return TrendingNotificationValue(*super().unpack_value(data))
@classmethod
def pack_item(cls, height, claim_hash, previous_amount, new_amount):
return cls.pack_key(height, claim_hash), cls.pack_value(previous_amount, new_amount)
class TouchedHashXKey(NamedTuple):
height: int
class TouchedHashXValue(NamedTuple):
touched_hashXs: typing.List[bytes]
class TouchedHashXPrefixRow(PrefixRow):
prefix = DB_PREFIXES.touched_hashX.value
key_struct = struct.Struct(b'>L')
key_part_lambdas = [
lambda: b'',
struct.Struct(b'>L').pack
]
@classmethod
def pack_key(cls, height: int):
return super().pack_key(height)
@classmethod
def unpack_key(cls, key: bytes) -> TouchedHashXKey:
return TouchedHashXKey(*super().unpack_key(key))
@classmethod
def pack_value(cls, touched: typing.List[bytes]) -> bytes:
assert all(map(lambda item: len(item) == 11, touched))
return b''.join(touched)
@classmethod
def unpack_value(cls, data: bytes) -> TouchedHashXValue:
return TouchedHashXValue([data[idx*11:(idx*11)+11] for idx in range(len(data) // 11)])
@classmethod
def pack_item(cls, height: int, touched: typing.List[bytes]):
return cls.pack_key(height), cls.pack_value(touched)
class HubDB(PrefixDB): class HubDB(PrefixDB):
def __init__(self, path: str, cache_mb: int = 128, reorg_limit: int = 200, max_open_files: int = 512, def __init__(self, path: str, cache_mb: int = 128, reorg_limit: int = 200, max_open_files: int = 512,
secondary_path: str = '', unsafe_prefixes: Optional[typing.Set[bytes]] = None): secondary_path: str = '', unsafe_prefixes: Optional[typing.Set[bytes]] = None):
@ -1630,6 +1749,9 @@ class HubDB(PrefixDB):
self.db_state = DBStatePrefixRow(db, self._op_stack) self.db_state = DBStatePrefixRow(db, self._op_stack)
self.support_amount = SupportAmountPrefixRow(db, self._op_stack) self.support_amount = SupportAmountPrefixRow(db, self._op_stack)
self.block_txs = BlockTxsPrefixRow(db, self._op_stack) self.block_txs = BlockTxsPrefixRow(db, self._op_stack)
self.mempool_tx = MempoolTXPrefixRow(db, self._op_stack)
self.trending_notification = TrendingNotificationPrefixRow(db, self._op_stack)
self.touched_hashX = TouchedHashXPrefixRow(db, self._op_stack)
def auto_decode_item(key: bytes, value: bytes) -> Union[Tuple[NamedTuple, NamedTuple], Tuple[bytes, bytes]]: def auto_decode_item(key: bytes, value: bytes) -> Union[Tuple[NamedTuple, NamedTuple], Tuple[bytes, bytes]]: