diff --git a/hub/db/common.py b/hub/db/common.py index 685b5af..0b20ca9 100644 --- a/hub/db/common.py +++ b/hub/db/common.py @@ -49,6 +49,7 @@ class DB_PREFIXES(enum.Enum): hashX_status = b'f' hashX_mempool_status = b'g' reposted_count = b'j' + effective_amount = b'i' COLUMN_SETTINGS = {} # this is updated by the PrefixRow metaclass diff --git a/hub/db/db.py b/hub/db/db.py index fd70c53..0b23383 100644 --- a/hub/db/db.py +++ b/hub/db/db.py @@ -508,9 +508,10 @@ class SecondaryDB: return 0 def get_effective_amount(self, claim_hash: bytes) -> int: - return self._get_active_amount( - claim_hash, ACTIVATED_SUPPORT_TXO_TYPE, self.db_height + 1 - ) + self._get_active_amount(claim_hash, ACTIVATED_CLAIM_TXO_TYPE, self.db_height + 1) + v = self.prefix_db.effective_amount.get(claim_hash) + if v: + return v.effective_amount + return 0 def get_url_effective_amount(self, name: str, claim_hash: bytes) -> Optional['BidOrderKey']: for k, v in self.prefix_db.bid_order.iterate(prefix=(name,)): @@ -641,11 +642,12 @@ class SecondaryDB: ) } + # collect all of the effective amounts effective_amounts = { - claim_hash: await run_in_executor( - self._executor, self.get_effective_amount, claim_hash + claim_hash: 0 if not v else v.effective_amount + async for (claim_hash, ), v in self.prefix_db.effective_amount.multi_get_async_gen( + self._executor, [(claim_hash,) for claim_hash in claims] ) - for claim_hash, claim in claims.items() if claim is not None } censoring_reasons = {} diff --git a/hub/db/interface.py b/hub/db/interface.py index fef99b7..7031e66 100644 --- a/hub/db/interface.py +++ b/hub/db/interface.py @@ -280,6 +280,8 @@ class BasePrefixDB: return self._db.get((cf, key), fill_cache=fill_cache) def multi_get(self, keys: typing.List[bytes], fill_cache=True): + if len(keys) == 0: + return [] first_key = keys[0] if not all(first_key[0] == key[0] for key in keys): raise ValueError('cannot multi-delete across column families') diff --git a/hub/db/prefixes.py b/hub/db/prefixes.py index db4327a..67a6ff3 100644 --- a/hub/db/prefixes.py +++ b/hub/db/prefixes.py @@ -1770,6 +1770,45 @@ class HashXMempoolStatusPrefixRow(PrefixRow): return cls.pack_key(hashX), cls.pack_value(status) +class EffectiveAmountKey(NamedTuple): + claim_hash: bytes + + +class EffectiveAmountValue(NamedTuple): + effective_amount: int + + +class EffectiveAmountPrefixRow(PrefixRow): + prefix = DB_PREFIXES.effective_amount.value + key_struct = struct.Struct(b'>20s') + value_struct = struct.Struct(b'>Q') + + key_part_lambdas = [ + lambda: b'', + struct.Struct(b'>20s').pack + ] + + @classmethod + def pack_key(cls, claim_hash: bytes): + return super().pack_key(claim_hash) + + @classmethod + def unpack_key(cls, key: bytes) -> EffectiveAmountKey: + return EffectiveAmountKey(*super().unpack_key(key)) + + @classmethod + def pack_value(cls, effective_amount: int) -> bytes: + return super().pack_value(effective_amount) + + @classmethod + def unpack_value(cls, data: bytes) -> EffectiveAmountValue: + return EffectiveAmountValue(*cls.value_struct.unpack(data)) + + @classmethod + def pack_item(cls, claim_hash: bytes, effective_amount: int): + return cls.pack_key(claim_hash), cls.pack_value(effective_amount) + + class PrefixDB(BasePrefixDB): def __init__(self, path: str, reorg_limit: int = 200, max_open_files: int = 64, secondary_path: str = '', unsafe_prefixes: Optional[typing.Set[bytes]] = None): @@ -1812,6 +1851,7 @@ class PrefixDB(BasePrefixDB): self.touched_hashX = TouchedHashXPrefixRow(db, self._op_stack) self.hashX_status = HashXStatusPrefixRow(db, self._op_stack) self.hashX_mempool_status = HashXMempoolStatusPrefixRow(db, self._op_stack) + self.effective_amount = EffectiveAmountPrefixRow(db, self._op_stack) def auto_decode_item(key: bytes, value: bytes) -> Union[Tuple[NamedTuple, NamedTuple], Tuple[bytes, bytes]]: diff --git a/hub/scribe/service.py b/hub/scribe/service.py index 62e8845..7256160 100644 --- a/hub/scribe/service.py +++ b/hub/scribe/service.py @@ -124,6 +124,8 @@ class BlockchainProcessorService(BlockchainService): # {claim_hash: } self.reposted_count_delta = defaultdict(int) + # {claim_hash: } + self.effective_amount_delta = defaultdict(int) self.hashX_history_cache = LFUCacheWithMetrics(max(100, env.hashX_history_cache_size), 'hashX_history', NAMESPACE) self.hashX_full_cache = LFUCacheWithMetrics(max(100, env.hashX_history_cache_size), 'hashX_full', NAMESPACE) @@ -458,7 +460,7 @@ class BlockchainProcessorService(BlockchainService): ) # short url resolution - for prefix_len in range(10): + for prefix_len in range(10): # TODO: multi_delete self.db.prefix_db.claim_short_id.stage_delete( (pending.normalized_name, pending.claim_hash.hex()[:prefix_len + 1], pending.root_tx_num, pending.root_position), @@ -517,8 +519,9 @@ class BlockchainProcessorService(BlockchainService): self.removed_support_txos_by_name_by_claim[supported_name][spent_support].append( (txin_num, txin.prev_idx)) activation = self.db.get_activation(txin_num, txin.prev_idx, is_support=True) - if 0 < activation < self.height + 1: + if 0 < activation <= self.height: self.removed_active_support_amount_by_claim[spent_support].append(support_amount) + self.effective_amount_delta[spent_support] -= support_amount if supported_name is not None and activation > 0: self.get_remove_activate_ops( ACTIVATED_SUPPORT_TXO_TYPE, spent_support, txin_num, txin.prev_idx, activation, supported_name, @@ -542,7 +545,9 @@ class BlockchainProcessorService(BlockchainService): ) assert spent_claim_hash_and_name is not None spent = self._make_pending_claim_txo(spent_claim_hash_and_name.claim_hash) - + activation = self.db.get_activation(txin_num, txin.prev_idx, is_support=False) + if 0 < activation <= self.height + 1: + self.effective_amount_delta[spent_claim_hash_and_name.claim_hash] -= spent.amount if self.env.cache_all_claim_txos: claim_hash = self.db.txo_to_claim[txin_num].pop(txin.prev_idx) if not self.db.txo_to_claim[txin_num]: @@ -613,7 +618,7 @@ class BlockchainProcessorService(BlockchainService): False, pending.name) ) - def _invalidate_channel_signatures(self, claim_hash: bytes): + def _invalidate_channel_signatures(self, claim_hash: bytes): # TODO: multi_put for (signed_claim_hash, ) in self.db.prefix_db.channel_to_claim.iterate( prefix=(claim_hash, ), include_key=False): if signed_claim_hash in self.abandoned_claims or signed_claim_hash in self.expired_claim_hashes: @@ -781,6 +786,7 @@ class BlockchainProcessorService(BlockchainService): activated_at_height = self.db.get_activated_at_height(height) activate_in_future = defaultdict(lambda: defaultdict(list)) future_activations = defaultdict(dict) + total_activated = set() def get_delayed_activate_ops(name: str, claim_hash: bytes, is_new_claim: bool, tx_num: int, nout: int, amount: int, is_support: bool): @@ -895,6 +901,8 @@ class BlockchainProcessorService(BlockchainService): name, claim_hash, staged_is_new_claim, tx_num, nout, amount, is_support=True ) + activated_added_to_effective_amount = set() + # add the activation/delayed-activation ops for activated, activated_txos in activated_at_height.items(): controlling = get_controlling(activated.normalized_name) @@ -922,6 +930,7 @@ class BlockchainProcessorService(BlockchainService): # print("\tskip activate for non existent claim") continue self.activated_claim_amount_by_name_and_hash[(activated.normalized_name, activated.claim_hash)] = amount + total_activated.add(activated.claim_hash) else: txo_type = ACTIVATED_SUPPORT_TXO_TYPE txo_tup = (activated_txo.tx_num, activated_txo.position) @@ -935,6 +944,8 @@ class BlockchainProcessorService(BlockchainService): # print("\tskip activate support for non existent claim") continue self.activated_support_amount_by_claim[activated.claim_hash].append(amount) + self.effective_amount_delta[activated.claim_hash] += amount + activated_added_to_effective_amount.add(activated.claim_hash) self.activation_by_claim_by_name[activated.normalized_name][activated.claim_hash].append((activated_txo, amount)) # print(f"\tactivate {'support' if txo_type == ACTIVATED_SUPPORT_TXO_TYPE else 'claim'} " # f"{activated.claim_hash.hex()} @ {activated_txo.height}") @@ -1072,18 +1083,20 @@ class BlockchainProcessorService(BlockchainService): ACTIVATED_CLAIM_TXO_TYPE, winning_including_future_activations, tx_num, position, height, name, amount ) - - for (k, amount) in activate_in_future[name][winning_including_future_activations]: + self.effective_amount_delta[winning_including_future_activations] += amount + for (k, future_amount) in activate_in_future[name][winning_including_future_activations]: txo = (k.tx_num, k.position) if txo in self.possible_future_support_txos_by_claim_hash[winning_including_future_activations]: self.get_remove_activate_ops( ACTIVATED_SUPPORT_TXO_TYPE, winning_including_future_activations, k.tx_num, - k.position, k.height, name, amount + k.position, k.height, name, future_amount ) self.get_activate_ops( ACTIVATED_SUPPORT_TXO_TYPE, winning_including_future_activations, k.tx_num, - k.position, height, name, amount + k.position, height, name, future_amount ) + self.effective_amount_delta[winning_including_future_activations] += future_amount + self.taken_over_names.add(name) if controlling: self.db.prefix_db.claim_takeover.stage_delete( @@ -1118,6 +1131,8 @@ class BlockchainProcessorService(BlockchainService): ACTIVATED_CLAIM_TXO_TYPE, winning_claim_hash, tx_num, position, height, name, amount ) + if winning_claim_hash not in activated_added_to_effective_amount: + self.effective_amount_delta[winning_claim_hash] += amount self.taken_over_names.add(name) if controlling: self.db.prefix_db.claim_takeover.stage_delete( @@ -1223,6 +1238,8 @@ class BlockchainProcessorService(BlockchainService): self.db.prefix_db.support_amount.stage_put((supported_claim,), (total,)) # use the cumulative changes to update bid ordered resolve + # + # first remove bid orders for removed claims for removed in self.removed_claim_hashes: removed_claim = self.db.get_claim_txo(removed) if removed_claim: @@ -1233,7 +1250,7 @@ class BlockchainProcessorService(BlockchainService): self.db.prefix_db.bid_order.stage_delete( (removed_claim.normalized_name, amt.effective_amount, amt.tx_num, amt.position), (removed,) ) - + # update or insert new bid orders for touched in self.touched_claim_hashes: prev_effective_amount = 0 @@ -1273,6 +1290,31 @@ class BlockchainProcessorService(BlockchainService): (height, touched), (prev_effective_amount, new_effective_amount) ) + current_effective_amounts = { + claim: None if v is None else v.effective_amount + for claim, v in zip( + self.effective_amount_delta, + self.db.prefix_db.effective_amount.multi_get( + [(claim_hash,) for claim_hash in self.effective_amount_delta] + ) + ) + } + delete_effective_amounts = [ + self.db.prefix_db.effective_amount.pack_item(claim_hash, amount) + for claim_hash, amount in current_effective_amounts.items() if amount is not None + ] + new_effective_amounts = { + claim_hash: (current_effective_amounts.get(claim_hash, 0) or 0) + delta + for claim_hash, delta in self.effective_amount_delta.items() + } + if delete_effective_amounts: + self.db.prefix_db.multi_delete(delete_effective_amounts) + if new_effective_amounts: + self.db.prefix_db.effective_amount.stage_multi_put( + [((claim_hash,), (amount,)) for claim_hash, amount in new_effective_amounts.items()] + ) + + # update or insert channel counts for channel_hash, count in self.pending_channel_counts.items(): if count != 0: channel_count_val = self.db.prefix_db.channel_count.get(channel_hash) @@ -1281,6 +1323,7 @@ class BlockchainProcessorService(BlockchainService): self.db.prefix_db.channel_count.stage_delete((channel_hash,), (channel_count,)) self.db.prefix_db.channel_count.stage_put((channel_hash,), (channel_count + count,)) + # update the sets of touched and removed claims for es sync self.touched_claim_hashes.update( {k for k in self.pending_reposted if k not in self.removed_claim_hashes} ) @@ -1587,6 +1630,7 @@ class BlockchainProcessorService(BlockchainService): self.hashX_history_cache.clear() self.hashX_full_cache.clear() self.reposted_count_delta.clear() + self.effective_amount_delta.clear() def backup_block(self): assert len(self.db.prefix_db._op_stack) == 0