use effective_amount index in scribe for faster takeover calculations

This commit is contained in:
Jack Robison 2022-09-06 12:49:49 -04:00
parent c686187e35
commit 27520c835e
3 changed files with 41 additions and 13 deletions

View file

@ -494,6 +494,13 @@ class SecondaryDB:
return self.prefix_db.claim_to_txo.get(claim_hash) return self.prefix_db.claim_to_txo.get(claim_hash)
def _get_active_amount(self, claim_hash: bytes, txo_type: int, height: int) -> int: def _get_active_amount(self, claim_hash: bytes, txo_type: int, height: int) -> int:
if height == self.db_height + 1:
v = self.prefix_db.effective_amount.get(claim_hash)
if not v:
return 0
if txo_type is ACTIVATED_SUPPORT_TXO_TYPE:
return v.support_sum
return v.effective_amount - v.support_sum
return sum( return sum(
v.amount for v in self.prefix_db.active_amount.iterate( v.amount for v in self.prefix_db.active_amount.iterate(
start=(claim_hash, txo_type, 0), stop=(claim_hash, txo_type, height), include_key=False start=(claim_hash, txo_type, 0), stop=(claim_hash, txo_type, height), include_key=False
@ -501,6 +508,11 @@ class SecondaryDB:
) )
def get_active_amount_as_of_height(self, claim_hash: bytes, height: int) -> int: def get_active_amount_as_of_height(self, claim_hash: bytes, height: int) -> int:
if height == self.db_height + 1:
v = self.prefix_db.effective_amount.get(claim_hash)
if not v:
return 0
return v.effective_amount - v.support_sum
for v in self.prefix_db.active_amount.iterate( for v in self.prefix_db.active_amount.iterate(
start=(claim_hash, ACTIVATED_CLAIM_TXO_TYPE, 0), stop=(claim_hash, ACTIVATED_CLAIM_TXO_TYPE, height), start=(claim_hash, ACTIVATED_CLAIM_TXO_TYPE, 0), stop=(claim_hash, ACTIVATED_CLAIM_TXO_TYPE, height),
include_key=False, reverse=True): include_key=False, reverse=True):

View file

@ -1776,12 +1776,13 @@ class EffectiveAmountKey(NamedTuple):
class EffectiveAmountValue(NamedTuple): class EffectiveAmountValue(NamedTuple):
effective_amount: int effective_amount: int
support_sum: int
class EffectiveAmountPrefixRow(PrefixRow): class EffectiveAmountPrefixRow(PrefixRow):
prefix = DB_PREFIXES.effective_amount.value prefix = DB_PREFIXES.effective_amount.value
key_struct = struct.Struct(b'>20s') key_struct = struct.Struct(b'>20s')
value_struct = struct.Struct(b'>Q') value_struct = struct.Struct(b'>QQ')
key_part_lambdas = [ key_part_lambdas = [
lambda: b'', lambda: b'',
@ -1797,16 +1798,16 @@ class EffectiveAmountPrefixRow(PrefixRow):
return EffectiveAmountKey(*super().unpack_key(key)) return EffectiveAmountKey(*super().unpack_key(key))
@classmethod @classmethod
def pack_value(cls, effective_amount: int) -> bytes: def pack_value(cls, effective_amount: int, support_sum: int) -> bytes:
return super().pack_value(effective_amount) return super().pack_value(effective_amount, support_sum)
@classmethod @classmethod
def unpack_value(cls, data: bytes) -> EffectiveAmountValue: def unpack_value(cls, data: bytes) -> EffectiveAmountValue:
return EffectiveAmountValue(*cls.value_struct.unpack(data)) return EffectiveAmountValue(*cls.value_struct.unpack(data))
@classmethod @classmethod
def pack_item(cls, claim_hash: bytes, effective_amount: int): def pack_item(cls, claim_hash: bytes, effective_amount: int, support_sum: int):
return cls.pack_key(claim_hash), cls.pack_value(effective_amount) return cls.pack_key(claim_hash), cls.pack_value(effective_amount, support_sum)
class PrefixDB(BasePrefixDB): class PrefixDB(BasePrefixDB):

View file

@ -126,6 +126,7 @@ class BlockchainProcessorService(BlockchainService):
self.reposted_count_delta = defaultdict(int) self.reposted_count_delta = defaultdict(int)
# {claim_hash: <change in effective amount>} # {claim_hash: <change in effective amount>}
self.effective_amount_delta = defaultdict(int) self.effective_amount_delta = defaultdict(int)
self.active_support_amount_delta = defaultdict(int)
self.hashX_history_cache = LFUCacheWithMetrics(max(100, env.hashX_history_cache_size), 'hashX_history', NAMESPACE) self.hashX_history_cache = LFUCacheWithMetrics(max(100, env.hashX_history_cache_size), 'hashX_history', NAMESPACE)
self.hashX_full_cache = LFUCacheWithMetrics(max(100, env.hashX_history_cache_size), 'hashX_full', NAMESPACE) self.hashX_full_cache = LFUCacheWithMetrics(max(100, env.hashX_history_cache_size), 'hashX_full', NAMESPACE)
@ -522,6 +523,7 @@ class BlockchainProcessorService(BlockchainService):
if 0 < activation <= self.height: if 0 < activation <= self.height:
self.removed_active_support_amount_by_claim[spent_support].append(support_amount) self.removed_active_support_amount_by_claim[spent_support].append(support_amount)
self.effective_amount_delta[spent_support] -= support_amount self.effective_amount_delta[spent_support] -= support_amount
self.active_support_amount_delta[spent_support] -= support_amount
if supported_name is not None and activation > 0: if supported_name is not None and activation > 0:
self.get_remove_activate_ops( self.get_remove_activate_ops(
ACTIVATED_SUPPORT_TXO_TYPE, spent_support, txin_num, txin.prev_idx, activation, supported_name, ACTIVATED_SUPPORT_TXO_TYPE, spent_support, txin_num, txin.prev_idx, activation, supported_name,
@ -942,6 +944,7 @@ class BlockchainProcessorService(BlockchainService):
# print("\tskip activate support for non existent claim") # print("\tskip activate support for non existent claim")
continue continue
self.activated_support_amount_by_claim[activated.claim_hash].append(amount) self.activated_support_amount_by_claim[activated.claim_hash].append(amount)
self.active_support_amount_delta[activated.claim_hash] += amount
self.effective_amount_delta[activated.claim_hash] += amount self.effective_amount_delta[activated.claim_hash] += amount
activated_added_to_effective_amount.add(activated.claim_hash) activated_added_to_effective_amount.add(activated.claim_hash)
self.activation_by_claim_by_name[activated.normalized_name][activated.claim_hash].append((activated_txo, amount)) self.activation_by_claim_by_name[activated.normalized_name][activated.claim_hash].append((activated_txo, amount))
@ -1094,6 +1097,7 @@ class BlockchainProcessorService(BlockchainService):
k.position, height, name, future_amount k.position, height, name, future_amount
) )
self.effective_amount_delta[winning_including_future_activations] += future_amount self.effective_amount_delta[winning_including_future_activations] += future_amount
self.active_support_amount_delta[winning_including_future_activations] += future_amount
self.taken_over_names.add(name) self.taken_over_names.add(name)
if controlling: if controlling:
@ -1288,28 +1292,38 @@ class BlockchainProcessorService(BlockchainService):
(height, touched), (prev_effective_amount, new_effective_amount) (height, touched), (prev_effective_amount, new_effective_amount)
) )
current_effective_amounts = { current_effective_amount_values = {
claim: None if v is None else v.effective_amount claim_hash: v for claim_hash, v in zip(
for claim, v in zip(
self.effective_amount_delta, self.effective_amount_delta,
self.db.prefix_db.effective_amount.multi_get( self.db.prefix_db.effective_amount.multi_get(
[(claim_hash,) for claim_hash in self.effective_amount_delta] [(claim_hash,) for claim_hash in self.effective_amount_delta]
) )
) )
} }
current_effective_amounts = {
claim_hash: 0 if not v else v.effective_amount
for claim_hash, v in current_effective_amount_values.items()
}
current_supports_amount = {
claim_hash: 0 if not v else v.support_sum
for claim_hash, v in current_effective_amount_values.items()
}
delete_effective_amounts = [ delete_effective_amounts = [
self.db.prefix_db.effective_amount.pack_item(claim_hash, amount) self.db.prefix_db.effective_amount.pack_item(claim_hash, v.effective_amount, v.support_sum)
for claim_hash, amount in current_effective_amounts.items() if amount is not None for claim_hash, v in current_effective_amount_values.items() if v is not None
] ]
claims = set(self.effective_amount_delta.keys()).union(self.active_support_amount_delta.keys())
new_effective_amounts = { new_effective_amounts = {
claim_hash: (current_effective_amounts.get(claim_hash, 0) or 0) + delta claim_hash: ((current_effective_amounts.get(claim_hash, 0) or 0) + self.effective_amount_delta.get(claim_hash, 0),
for claim_hash, delta in self.effective_amount_delta.items() (current_supports_amount.get(claim_hash, 0) or 0) + self.active_support_amount_delta.get(claim_hash, 0))
for claim_hash in claims
} }
if delete_effective_amounts: if delete_effective_amounts:
self.db.prefix_db.multi_delete(delete_effective_amounts) self.db.prefix_db.multi_delete(delete_effective_amounts)
if new_effective_amounts: if new_effective_amounts:
self.db.prefix_db.effective_amount.stage_multi_put( self.db.prefix_db.effective_amount.stage_multi_put(
[((claim_hash,), (amount,)) for claim_hash, amount in new_effective_amounts.items()] [((claim_hash,), (amount, support_sum)) for claim_hash, (amount, support_sum) in new_effective_amounts.items()]
) )
# update or insert channel counts # update or insert channel counts
@ -1629,6 +1643,7 @@ class BlockchainProcessorService(BlockchainService):
self.hashX_full_cache.clear() self.hashX_full_cache.clear()
self.reposted_count_delta.clear() self.reposted_count_delta.clear()
self.effective_amount_delta.clear() self.effective_amount_delta.clear()
self.active_support_amount_delta.clear()
def backup_block(self): def backup_block(self):
assert len(self.db.prefix_db._op_stack) == 0 assert len(self.db.prefix_db._op_stack) == 0