From 27520c835e4cd54417ebad5b86dc99b84826189d Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Tue, 6 Sep 2022 12:49:49 -0400 Subject: [PATCH] use effective_amount index in scribe for faster takeover calculations --- hub/db/db.py | 12 ++++++++++++ hub/db/prefixes.py | 11 ++++++----- hub/scribe/service.py | 31 +++++++++++++++++++++++-------- 3 files changed, 41 insertions(+), 13 deletions(-) diff --git a/hub/db/db.py b/hub/db/db.py index 825dab9..e2d37f5 100644 --- a/hub/db/db.py +++ b/hub/db/db.py @@ -494,6 +494,13 @@ class SecondaryDB: return self.prefix_db.claim_to_txo.get(claim_hash) def _get_active_amount(self, claim_hash: bytes, txo_type: int, height: int) -> int: + if height == self.db_height + 1: + v = self.prefix_db.effective_amount.get(claim_hash) + if not v: + return 0 + if txo_type is ACTIVATED_SUPPORT_TXO_TYPE: + return v.support_sum + return v.effective_amount - v.support_sum return sum( v.amount for v in self.prefix_db.active_amount.iterate( start=(claim_hash, txo_type, 0), stop=(claim_hash, txo_type, height), include_key=False @@ -501,6 +508,11 @@ class SecondaryDB: ) def get_active_amount_as_of_height(self, claim_hash: bytes, height: int) -> int: + if height == self.db_height + 1: + v = self.prefix_db.effective_amount.get(claim_hash) + if not v: + return 0 + return v.effective_amount - v.support_sum for v in self.prefix_db.active_amount.iterate( start=(claim_hash, ACTIVATED_CLAIM_TXO_TYPE, 0), stop=(claim_hash, ACTIVATED_CLAIM_TXO_TYPE, height), include_key=False, reverse=True): diff --git a/hub/db/prefixes.py b/hub/db/prefixes.py index 67a6ff3..0e2b67f 100644 --- a/hub/db/prefixes.py +++ b/hub/db/prefixes.py @@ -1776,12 +1776,13 @@ class EffectiveAmountKey(NamedTuple): class EffectiveAmountValue(NamedTuple): effective_amount: int + support_sum: int class EffectiveAmountPrefixRow(PrefixRow): prefix = DB_PREFIXES.effective_amount.value key_struct = struct.Struct(b'>20s') - value_struct = struct.Struct(b'>Q') + value_struct = struct.Struct(b'>QQ') key_part_lambdas = [ lambda: b'', @@ -1797,16 +1798,16 @@ class EffectiveAmountPrefixRow(PrefixRow): return EffectiveAmountKey(*super().unpack_key(key)) @classmethod - def pack_value(cls, effective_amount: int) -> bytes: - return super().pack_value(effective_amount) + def pack_value(cls, effective_amount: int, support_sum: int) -> bytes: + return super().pack_value(effective_amount, support_sum) @classmethod def unpack_value(cls, data: bytes) -> EffectiveAmountValue: return EffectiveAmountValue(*cls.value_struct.unpack(data)) @classmethod - def pack_item(cls, claim_hash: bytes, effective_amount: int): - return cls.pack_key(claim_hash), cls.pack_value(effective_amount) + def pack_item(cls, claim_hash: bytes, effective_amount: int, support_sum: int): + return cls.pack_key(claim_hash), cls.pack_value(effective_amount, support_sum) class PrefixDB(BasePrefixDB): diff --git a/hub/scribe/service.py b/hub/scribe/service.py index 204eed3..40737ff 100644 --- a/hub/scribe/service.py +++ b/hub/scribe/service.py @@ -126,6 +126,7 @@ class BlockchainProcessorService(BlockchainService): self.reposted_count_delta = defaultdict(int) # {claim_hash: } self.effective_amount_delta = defaultdict(int) + self.active_support_amount_delta = defaultdict(int) self.hashX_history_cache = LFUCacheWithMetrics(max(100, env.hashX_history_cache_size), 'hashX_history', NAMESPACE) self.hashX_full_cache = LFUCacheWithMetrics(max(100, env.hashX_history_cache_size), 'hashX_full', NAMESPACE) @@ -522,6 +523,7 @@ class BlockchainProcessorService(BlockchainService): if 0 < activation <= self.height: self.removed_active_support_amount_by_claim[spent_support].append(support_amount) self.effective_amount_delta[spent_support] -= support_amount + self.active_support_amount_delta[spent_support] -= support_amount if supported_name is not None and activation > 0: self.get_remove_activate_ops( ACTIVATED_SUPPORT_TXO_TYPE, spent_support, txin_num, txin.prev_idx, activation, supported_name, @@ -942,6 +944,7 @@ class BlockchainProcessorService(BlockchainService): # print("\tskip activate support for non existent claim") continue self.activated_support_amount_by_claim[activated.claim_hash].append(amount) + self.active_support_amount_delta[activated.claim_hash] += amount self.effective_amount_delta[activated.claim_hash] += amount activated_added_to_effective_amount.add(activated.claim_hash) self.activation_by_claim_by_name[activated.normalized_name][activated.claim_hash].append((activated_txo, amount)) @@ -1094,6 +1097,7 @@ class BlockchainProcessorService(BlockchainService): k.position, height, name, future_amount ) self.effective_amount_delta[winning_including_future_activations] += future_amount + self.active_support_amount_delta[winning_including_future_activations] += future_amount self.taken_over_names.add(name) if controlling: @@ -1288,28 +1292,38 @@ class BlockchainProcessorService(BlockchainService): (height, touched), (prev_effective_amount, new_effective_amount) ) - current_effective_amounts = { - claim: None if v is None else v.effective_amount - for claim, v in zip( + current_effective_amount_values = { + claim_hash: v for claim_hash, v in zip( self.effective_amount_delta, self.db.prefix_db.effective_amount.multi_get( [(claim_hash,) for claim_hash in self.effective_amount_delta] ) ) } + current_effective_amounts = { + claim_hash: 0 if not v else v.effective_amount + for claim_hash, v in current_effective_amount_values.items() + } + current_supports_amount = { + claim_hash: 0 if not v else v.support_sum + for claim_hash, v in current_effective_amount_values.items() + } + delete_effective_amounts = [ - self.db.prefix_db.effective_amount.pack_item(claim_hash, amount) - for claim_hash, amount in current_effective_amounts.items() if amount is not None + self.db.prefix_db.effective_amount.pack_item(claim_hash, v.effective_amount, v.support_sum) + for claim_hash, v in current_effective_amount_values.items() if v is not None ] + claims = set(self.effective_amount_delta.keys()).union(self.active_support_amount_delta.keys()) new_effective_amounts = { - claim_hash: (current_effective_amounts.get(claim_hash, 0) or 0) + delta - for claim_hash, delta in self.effective_amount_delta.items() + claim_hash: ((current_effective_amounts.get(claim_hash, 0) or 0) + self.effective_amount_delta.get(claim_hash, 0), + (current_supports_amount.get(claim_hash, 0) or 0) + self.active_support_amount_delta.get(claim_hash, 0)) + for claim_hash in claims } if delete_effective_amounts: self.db.prefix_db.multi_delete(delete_effective_amounts) if new_effective_amounts: self.db.prefix_db.effective_amount.stage_multi_put( - [((claim_hash,), (amount,)) for claim_hash, amount in new_effective_amounts.items()] + [((claim_hash,), (amount, support_sum)) for claim_hash, (amount, support_sum) in new_effective_amounts.items()] ) # update or insert channel counts @@ -1629,6 +1643,7 @@ class BlockchainProcessorService(BlockchainService): self.hashX_full_cache.clear() self.reposted_count_delta.clear() self.effective_amount_delta.clear() + self.active_support_amount_delta.clear() def backup_block(self): assert len(self.db.prefix_db._op_stack) == 0