effective amount index

fixes https://github.com/lbryio/hub/issues/64
This commit is contained in:
Jack Robison 2022-07-21 14:48:14 -04:00
parent 292ad2b9b6
commit 08f25a7c50
5 changed files with 104 additions and 15 deletions

View file

@ -49,6 +49,7 @@ class DB_PREFIXES(enum.Enum):
hashX_status = b'f' hashX_status = b'f'
hashX_mempool_status = b'g' hashX_mempool_status = b'g'
reposted_count = b'j' reposted_count = b'j'
effective_amount = b'i'
COLUMN_SETTINGS = {} # this is updated by the PrefixRow metaclass COLUMN_SETTINGS = {} # this is updated by the PrefixRow metaclass

View file

@ -508,9 +508,10 @@ class SecondaryDB:
return 0 return 0
def get_effective_amount(self, claim_hash: bytes) -> int: def get_effective_amount(self, claim_hash: bytes) -> int:
return self._get_active_amount( v = self.prefix_db.effective_amount.get(claim_hash)
claim_hash, ACTIVATED_SUPPORT_TXO_TYPE, self.db_height + 1 if v:
) + self._get_active_amount(claim_hash, ACTIVATED_CLAIM_TXO_TYPE, self.db_height + 1) return v.effective_amount
return 0
def get_url_effective_amount(self, name: str, claim_hash: bytes) -> Optional['BidOrderKey']: def get_url_effective_amount(self, name: str, claim_hash: bytes) -> Optional['BidOrderKey']:
for k, v in self.prefix_db.bid_order.iterate(prefix=(name,)): for k, v in self.prefix_db.bid_order.iterate(prefix=(name,)):
@ -641,11 +642,12 @@ class SecondaryDB:
) )
} }
# collect all of the effective amounts
effective_amounts = { effective_amounts = {
claim_hash: await run_in_executor( claim_hash: 0 if not v else v.effective_amount
self._executor, self.get_effective_amount, claim_hash async for (claim_hash, ), v in self.prefix_db.effective_amount.multi_get_async_gen(
self._executor, [(claim_hash,) for claim_hash in claims]
) )
for claim_hash, claim in claims.items() if claim is not None
} }
censoring_reasons = {} censoring_reasons = {}

View file

@ -280,6 +280,8 @@ class BasePrefixDB:
return self._db.get((cf, key), fill_cache=fill_cache) return self._db.get((cf, key), fill_cache=fill_cache)
def multi_get(self, keys: typing.List[bytes], fill_cache=True): def multi_get(self, keys: typing.List[bytes], fill_cache=True):
if len(keys) == 0:
return []
first_key = keys[0] first_key = keys[0]
if not all(first_key[0] == key[0] for key in keys): if not all(first_key[0] == key[0] for key in keys):
raise ValueError('cannot multi-delete across column families') raise ValueError('cannot multi-delete across column families')

View file

@ -1770,6 +1770,45 @@ class HashXMempoolStatusPrefixRow(PrefixRow):
return cls.pack_key(hashX), cls.pack_value(status) return cls.pack_key(hashX), cls.pack_value(status)
class EffectiveAmountKey(NamedTuple):
claim_hash: bytes
class EffectiveAmountValue(NamedTuple):
effective_amount: int
class EffectiveAmountPrefixRow(PrefixRow):
prefix = DB_PREFIXES.effective_amount.value
key_struct = struct.Struct(b'>20s')
value_struct = struct.Struct(b'>Q')
key_part_lambdas = [
lambda: b'',
struct.Struct(b'>20s').pack
]
@classmethod
def pack_key(cls, claim_hash: bytes):
return super().pack_key(claim_hash)
@classmethod
def unpack_key(cls, key: bytes) -> EffectiveAmountKey:
return EffectiveAmountKey(*super().unpack_key(key))
@classmethod
def pack_value(cls, effective_amount: int) -> bytes:
return super().pack_value(effective_amount)
@classmethod
def unpack_value(cls, data: bytes) -> EffectiveAmountValue:
return EffectiveAmountValue(*cls.value_struct.unpack(data))
@classmethod
def pack_item(cls, claim_hash: bytes, effective_amount: int):
return cls.pack_key(claim_hash), cls.pack_value(effective_amount)
class PrefixDB(BasePrefixDB): class PrefixDB(BasePrefixDB):
def __init__(self, path: str, reorg_limit: int = 200, max_open_files: int = 64, def __init__(self, path: str, reorg_limit: int = 200, max_open_files: int = 64,
secondary_path: str = '', unsafe_prefixes: Optional[typing.Set[bytes]] = None): secondary_path: str = '', unsafe_prefixes: Optional[typing.Set[bytes]] = None):
@ -1812,6 +1851,7 @@ class PrefixDB(BasePrefixDB):
self.touched_hashX = TouchedHashXPrefixRow(db, self._op_stack) self.touched_hashX = TouchedHashXPrefixRow(db, self._op_stack)
self.hashX_status = HashXStatusPrefixRow(db, self._op_stack) self.hashX_status = HashXStatusPrefixRow(db, self._op_stack)
self.hashX_mempool_status = HashXMempoolStatusPrefixRow(db, self._op_stack) self.hashX_mempool_status = HashXMempoolStatusPrefixRow(db, self._op_stack)
self.effective_amount = EffectiveAmountPrefixRow(db, self._op_stack)
def auto_decode_item(key: bytes, value: bytes) -> Union[Tuple[NamedTuple, NamedTuple], Tuple[bytes, bytes]]: def auto_decode_item(key: bytes, value: bytes) -> Union[Tuple[NamedTuple, NamedTuple], Tuple[bytes, bytes]]:

View file

@ -124,6 +124,8 @@ class BlockchainProcessorService(BlockchainService):
# {claim_hash: <count>} # {claim_hash: <count>}
self.reposted_count_delta = defaultdict(int) self.reposted_count_delta = defaultdict(int)
# {claim_hash: <change in effective amount>}
self.effective_amount_delta = defaultdict(int)
self.hashX_history_cache = LFUCacheWithMetrics(max(100, env.hashX_history_cache_size), 'hashX_history', NAMESPACE) self.hashX_history_cache = LFUCacheWithMetrics(max(100, env.hashX_history_cache_size), 'hashX_history', NAMESPACE)
self.hashX_full_cache = LFUCacheWithMetrics(max(100, env.hashX_history_cache_size), 'hashX_full', NAMESPACE) self.hashX_full_cache = LFUCacheWithMetrics(max(100, env.hashX_history_cache_size), 'hashX_full', NAMESPACE)
@ -458,7 +460,7 @@ class BlockchainProcessorService(BlockchainService):
) )
# short url resolution # short url resolution
for prefix_len in range(10): for prefix_len in range(10): # TODO: multi_delete
self.db.prefix_db.claim_short_id.stage_delete( self.db.prefix_db.claim_short_id.stage_delete(
(pending.normalized_name, pending.claim_hash.hex()[:prefix_len + 1], (pending.normalized_name, pending.claim_hash.hex()[:prefix_len + 1],
pending.root_tx_num, pending.root_position), pending.root_tx_num, pending.root_position),
@ -517,8 +519,9 @@ class BlockchainProcessorService(BlockchainService):
self.removed_support_txos_by_name_by_claim[supported_name][spent_support].append( self.removed_support_txos_by_name_by_claim[supported_name][spent_support].append(
(txin_num, txin.prev_idx)) (txin_num, txin.prev_idx))
activation = self.db.get_activation(txin_num, txin.prev_idx, is_support=True) activation = self.db.get_activation(txin_num, txin.prev_idx, is_support=True)
if 0 < activation < self.height + 1: if 0 < activation <= self.height:
self.removed_active_support_amount_by_claim[spent_support].append(support_amount) self.removed_active_support_amount_by_claim[spent_support].append(support_amount)
self.effective_amount_delta[spent_support] -= support_amount
if supported_name is not None and activation > 0: if supported_name is not None and activation > 0:
self.get_remove_activate_ops( self.get_remove_activate_ops(
ACTIVATED_SUPPORT_TXO_TYPE, spent_support, txin_num, txin.prev_idx, activation, supported_name, ACTIVATED_SUPPORT_TXO_TYPE, spent_support, txin_num, txin.prev_idx, activation, supported_name,
@ -542,7 +545,9 @@ class BlockchainProcessorService(BlockchainService):
) )
assert spent_claim_hash_and_name is not None assert spent_claim_hash_and_name is not None
spent = self._make_pending_claim_txo(spent_claim_hash_and_name.claim_hash) spent = self._make_pending_claim_txo(spent_claim_hash_and_name.claim_hash)
activation = self.db.get_activation(txin_num, txin.prev_idx, is_support=False)
if 0 < activation <= self.height + 1:
self.effective_amount_delta[spent_claim_hash_and_name.claim_hash] -= spent.amount
if self.env.cache_all_claim_txos: if self.env.cache_all_claim_txos:
claim_hash = self.db.txo_to_claim[txin_num].pop(txin.prev_idx) claim_hash = self.db.txo_to_claim[txin_num].pop(txin.prev_idx)
if not self.db.txo_to_claim[txin_num]: if not self.db.txo_to_claim[txin_num]:
@ -613,7 +618,7 @@ class BlockchainProcessorService(BlockchainService):
False, pending.name) False, pending.name)
) )
def _invalidate_channel_signatures(self, claim_hash: bytes): def _invalidate_channel_signatures(self, claim_hash: bytes): # TODO: multi_put
for (signed_claim_hash, ) in self.db.prefix_db.channel_to_claim.iterate( for (signed_claim_hash, ) in self.db.prefix_db.channel_to_claim.iterate(
prefix=(claim_hash, ), include_key=False): prefix=(claim_hash, ), include_key=False):
if signed_claim_hash in self.abandoned_claims or signed_claim_hash in self.expired_claim_hashes: if signed_claim_hash in self.abandoned_claims or signed_claim_hash in self.expired_claim_hashes:
@ -781,6 +786,7 @@ class BlockchainProcessorService(BlockchainService):
activated_at_height = self.db.get_activated_at_height(height) activated_at_height = self.db.get_activated_at_height(height)
activate_in_future = defaultdict(lambda: defaultdict(list)) activate_in_future = defaultdict(lambda: defaultdict(list))
future_activations = defaultdict(dict) future_activations = defaultdict(dict)
total_activated = set()
def get_delayed_activate_ops(name: str, claim_hash: bytes, is_new_claim: bool, tx_num: int, nout: int, def get_delayed_activate_ops(name: str, claim_hash: bytes, is_new_claim: bool, tx_num: int, nout: int,
amount: int, is_support: bool): amount: int, is_support: bool):
@ -895,6 +901,8 @@ class BlockchainProcessorService(BlockchainService):
name, claim_hash, staged_is_new_claim, tx_num, nout, amount, is_support=True name, claim_hash, staged_is_new_claim, tx_num, nout, amount, is_support=True
) )
activated_added_to_effective_amount = set()
# add the activation/delayed-activation ops # add the activation/delayed-activation ops
for activated, activated_txos in activated_at_height.items(): for activated, activated_txos in activated_at_height.items():
controlling = get_controlling(activated.normalized_name) controlling = get_controlling(activated.normalized_name)
@ -922,6 +930,7 @@ class BlockchainProcessorService(BlockchainService):
# print("\tskip activate for non existent claim") # print("\tskip activate for non existent claim")
continue continue
self.activated_claim_amount_by_name_and_hash[(activated.normalized_name, activated.claim_hash)] = amount self.activated_claim_amount_by_name_and_hash[(activated.normalized_name, activated.claim_hash)] = amount
total_activated.add(activated.claim_hash)
else: else:
txo_type = ACTIVATED_SUPPORT_TXO_TYPE txo_type = ACTIVATED_SUPPORT_TXO_TYPE
txo_tup = (activated_txo.tx_num, activated_txo.position) txo_tup = (activated_txo.tx_num, activated_txo.position)
@ -935,6 +944,8 @@ class BlockchainProcessorService(BlockchainService):
# print("\tskip activate support for non existent claim") # print("\tskip activate support for non existent claim")
continue continue
self.activated_support_amount_by_claim[activated.claim_hash].append(amount) self.activated_support_amount_by_claim[activated.claim_hash].append(amount)
self.effective_amount_delta[activated.claim_hash] += amount
activated_added_to_effective_amount.add(activated.claim_hash)
self.activation_by_claim_by_name[activated.normalized_name][activated.claim_hash].append((activated_txo, amount)) self.activation_by_claim_by_name[activated.normalized_name][activated.claim_hash].append((activated_txo, amount))
# print(f"\tactivate {'support' if txo_type == ACTIVATED_SUPPORT_TXO_TYPE else 'claim'} " # print(f"\tactivate {'support' if txo_type == ACTIVATED_SUPPORT_TXO_TYPE else 'claim'} "
# f"{activated.claim_hash.hex()} @ {activated_txo.height}") # f"{activated.claim_hash.hex()} @ {activated_txo.height}")
@ -1072,18 +1083,20 @@ class BlockchainProcessorService(BlockchainService):
ACTIVATED_CLAIM_TXO_TYPE, winning_including_future_activations, tx_num, ACTIVATED_CLAIM_TXO_TYPE, winning_including_future_activations, tx_num,
position, height, name, amount position, height, name, amount
) )
self.effective_amount_delta[winning_including_future_activations] += amount
for (k, amount) in activate_in_future[name][winning_including_future_activations]: for (k, future_amount) in activate_in_future[name][winning_including_future_activations]:
txo = (k.tx_num, k.position) txo = (k.tx_num, k.position)
if txo in self.possible_future_support_txos_by_claim_hash[winning_including_future_activations]: if txo in self.possible_future_support_txos_by_claim_hash[winning_including_future_activations]:
self.get_remove_activate_ops( self.get_remove_activate_ops(
ACTIVATED_SUPPORT_TXO_TYPE, winning_including_future_activations, k.tx_num, ACTIVATED_SUPPORT_TXO_TYPE, winning_including_future_activations, k.tx_num,
k.position, k.height, name, amount k.position, k.height, name, future_amount
) )
self.get_activate_ops( self.get_activate_ops(
ACTIVATED_SUPPORT_TXO_TYPE, winning_including_future_activations, k.tx_num, ACTIVATED_SUPPORT_TXO_TYPE, winning_including_future_activations, k.tx_num,
k.position, height, name, amount k.position, height, name, future_amount
) )
self.effective_amount_delta[winning_including_future_activations] += future_amount
self.taken_over_names.add(name) self.taken_over_names.add(name)
if controlling: if controlling:
self.db.prefix_db.claim_takeover.stage_delete( self.db.prefix_db.claim_takeover.stage_delete(
@ -1118,6 +1131,8 @@ class BlockchainProcessorService(BlockchainService):
ACTIVATED_CLAIM_TXO_TYPE, winning_claim_hash, tx_num, ACTIVATED_CLAIM_TXO_TYPE, winning_claim_hash, tx_num,
position, height, name, amount position, height, name, amount
) )
if winning_claim_hash not in activated_added_to_effective_amount:
self.effective_amount_delta[winning_claim_hash] += amount
self.taken_over_names.add(name) self.taken_over_names.add(name)
if controlling: if controlling:
self.db.prefix_db.claim_takeover.stage_delete( self.db.prefix_db.claim_takeover.stage_delete(
@ -1223,6 +1238,8 @@ class BlockchainProcessorService(BlockchainService):
self.db.prefix_db.support_amount.stage_put((supported_claim,), (total,)) self.db.prefix_db.support_amount.stage_put((supported_claim,), (total,))
# use the cumulative changes to update bid ordered resolve # use the cumulative changes to update bid ordered resolve
#
# first remove bid orders for removed claims
for removed in self.removed_claim_hashes: for removed in self.removed_claim_hashes:
removed_claim = self.db.get_claim_txo(removed) removed_claim = self.db.get_claim_txo(removed)
if removed_claim: if removed_claim:
@ -1233,7 +1250,7 @@ class BlockchainProcessorService(BlockchainService):
self.db.prefix_db.bid_order.stage_delete( self.db.prefix_db.bid_order.stage_delete(
(removed_claim.normalized_name, amt.effective_amount, amt.tx_num, amt.position), (removed,) (removed_claim.normalized_name, amt.effective_amount, amt.tx_num, amt.position), (removed,)
) )
# update or insert new bid orders
for touched in self.touched_claim_hashes: for touched in self.touched_claim_hashes:
prev_effective_amount = 0 prev_effective_amount = 0
@ -1273,6 +1290,31 @@ class BlockchainProcessorService(BlockchainService):
(height, touched), (prev_effective_amount, new_effective_amount) (height, touched), (prev_effective_amount, new_effective_amount)
) )
current_effective_amounts = {
claim: None if v is None else v.effective_amount
for claim, v in zip(
self.effective_amount_delta,
self.db.prefix_db.effective_amount.multi_get(
[(claim_hash,) for claim_hash in self.effective_amount_delta]
)
)
}
delete_effective_amounts = [
self.db.prefix_db.effective_amount.pack_item(claim_hash, amount)
for claim_hash, amount in current_effective_amounts.items() if amount is not None
]
new_effective_amounts = {
claim_hash: (current_effective_amounts.get(claim_hash, 0) or 0) + delta
for claim_hash, delta in self.effective_amount_delta.items()
}
if delete_effective_amounts:
self.db.prefix_db.multi_delete(delete_effective_amounts)
if new_effective_amounts:
self.db.prefix_db.effective_amount.stage_multi_put(
[((claim_hash,), (amount,)) for claim_hash, amount in new_effective_amounts.items()]
)
# update or insert channel counts
for channel_hash, count in self.pending_channel_counts.items(): for channel_hash, count in self.pending_channel_counts.items():
if count != 0: if count != 0:
channel_count_val = self.db.prefix_db.channel_count.get(channel_hash) channel_count_val = self.db.prefix_db.channel_count.get(channel_hash)
@ -1281,6 +1323,7 @@ class BlockchainProcessorService(BlockchainService):
self.db.prefix_db.channel_count.stage_delete((channel_hash,), (channel_count,)) self.db.prefix_db.channel_count.stage_delete((channel_hash,), (channel_count,))
self.db.prefix_db.channel_count.stage_put((channel_hash,), (channel_count + count,)) self.db.prefix_db.channel_count.stage_put((channel_hash,), (channel_count + count,))
# update the sets of touched and removed claims for es sync
self.touched_claim_hashes.update( self.touched_claim_hashes.update(
{k for k in self.pending_reposted if k not in self.removed_claim_hashes} {k for k in self.pending_reposted if k not in self.removed_claim_hashes}
) )
@ -1587,6 +1630,7 @@ class BlockchainProcessorService(BlockchainService):
self.hashX_history_cache.clear() self.hashX_history_cache.clear()
self.hashX_full_cache.clear() self.hashX_full_cache.clear()
self.reposted_count_delta.clear() self.reposted_count_delta.clear()
self.effective_amount_delta.clear()
def backup_block(self): def backup_block(self):
assert len(self.db.prefix_db._op_stack) == 0 assert len(self.db.prefix_db._op_stack) == 0