diff --git a/hub/db/common.py b/hub/db/common.py index 1edc5eb..3570b84 100644 --- a/hub/db/common.py +++ b/hub/db/common.py @@ -48,6 +48,7 @@ class DB_PREFIXES(enum.Enum): touched_hashX = b'e' hashX_status = b'f' hashX_mempool_status = b'g' + reposted_count = b'j' COLUMN_SETTINGS = {} # this is updated by the PrefixRow metaclass diff --git a/hub/db/db.py b/hub/db/db.py index 4a1e4bc..791cc09 100644 --- a/hub/db/db.py +++ b/hub/db/db.py @@ -116,9 +116,10 @@ class SecondaryDB: return def get_reposted_count(self, claim_hash: bytes) -> int: - return sum( - 1 for _ in self.prefix_db.reposted_claim.iterate(prefix=(claim_hash,), include_value=False) - ) + v = self.prefix_db.reposted_count.get(claim_hash) + if v: + return v.reposted_count + return 0 def get_activation(self, tx_num, position, is_support=False) -> int: activation = self.prefix_db.activated.get( @@ -632,11 +633,12 @@ class SecondaryDB: ) } + # collect all of the repost counts repost_counts = { - claim_hash: await run_in_executor( - self._executor, self.get_reposted_count, claim_hash + claim_hash: 0 if not v else v.reposted_count + async for (claim_hash, ), v in self.prefix_db.reposted_count.multi_get_async_gen( + self._executor, [(claim_hash,) for claim_hash in claims] ) - for claim_hash, claim in claims.items() if claim is not None } effective_amounts = { diff --git a/hub/db/prefixes.py b/hub/db/prefixes.py index fbf73f1..ed9238c 100644 --- a/hub/db/prefixes.py +++ b/hub/db/prefixes.py @@ -1025,6 +1025,44 @@ class RepostedPrefixRow(PrefixRow): return cls.pack_key(reposted_claim_hash, tx_num, position), cls.pack_value(claim_hash) +class RepostedCountKey(NamedTuple): + claim_hash: bytes + + +class RepostedCountValue(NamedTuple): + reposted_count: int + + +class RepostedCountPrefixRow(PrefixRow): + prefix = DB_PREFIXES.reposted_count.value + key_struct = struct.Struct(b'>20s') + value_struct = struct.Struct(b'>L') + key_part_lambdas = [ + lambda: b'', + struct.Struct(b'>20s').pack, + ] + + @classmethod + def pack_key(cls, claim_hash: bytes): + return super().pack_key(claim_hash) + + @classmethod + def unpack_key(cls, key: bytes) -> RepostedCountKey: + return RepostedCountKey(*super().unpack_key(key)) + + @classmethod + def pack_value(cls, reposted_count: int) -> bytes: + return super().pack_value(reposted_count) + + @classmethod + def unpack_value(cls, data: bytes) -> RepostedCountValue: + return RepostedCountValue(*super().unpack_value(data)) + + @classmethod + def pack_item(cls, claim_hash: bytes, reposted_count: int): + return cls.pack_key(claim_hash), cls.pack_value(reposted_count) + + class UndoKey(NamedTuple): height: int block_hash: bytes @@ -1753,6 +1791,7 @@ class PrefixDB(BasePrefixDB): self.effective_amount = EffectiveAmountPrefixRow(db, self._op_stack) self.repost = RepostPrefixRow(db, self._op_stack) self.reposted_claim = RepostedPrefixRow(db, self._op_stack) + self.reposted_count = RepostedCountPrefixRow(db, self._op_stack) self.undo = UndoPrefixRow(db, self._op_stack) self.utxo = UTXOPrefixRow(db, self._op_stack) self.hashX_utxo = HashXUTXOPrefixRow(db, self._op_stack) diff --git a/hub/scribe/service.py b/hub/scribe/service.py index 46c198b..3421432 100644 --- a/hub/scribe/service.py +++ b/hub/scribe/service.py @@ -122,6 +122,9 @@ class BlockchainProcessorService(BlockchainService): self.pending_transaction_num_mapping: Dict[bytes, int] = {} self.pending_transactions: Dict[int, bytes] = {} + # {claim_hash: } + self.reposted_count_delta = defaultdict(int) + self.hashX_history_cache = LFUCacheWithMetrics(max(100, env.hashX_history_cache_size), 'hashX_history', NAMESPACE) self.hashX_full_cache = LFUCacheWithMetrics(max(100, env.hashX_history_cache_size), 'hashX_full', NAMESPACE) self.history_tx_info_cache = LFUCacheWithMetrics(max(100, env.history_tx_cache_size), 'hashX_tx', NAMESPACE) @@ -307,6 +310,7 @@ class BlockchainProcessorService(BlockchainService): if is_repost: reposted_claim_hash = signable.repost.reference.claim_hash[::-1] self.pending_reposted.add(reposted_claim_hash) + self.reposted_count_delta[reposted_claim_hash] += 1 is_channel = signable.is_channel if is_channel: self.pending_channels[claim_hash] = signable.channel.public_key_bytes @@ -477,6 +481,7 @@ class BlockchainProcessorService(BlockchainService): self.db.prefix_db.reposted_claim.stage_delete( (pending.reposted_claim_hash, pending.tx_num, pending.position), (pending.claim_hash,) ) + self.reposted_count_delta[pending.reposted_claim_hash] -= 1 def _add_support(self, height: int, txo: 'TxOutput', tx_num: int, nout: int): supported_claim_hash = txo.support.claim_hash[::-1] @@ -1163,6 +1168,28 @@ class BlockchainProcessorService(BlockchainService): if claim_hash not in self.abandoned_claims} ) + # update the reposted counts + reposted_to_check = [(claim_hash,) for claim_hash, delta in self.reposted_count_delta.items() if delta != 0] + existing_repost_counts = {} + if reposted_to_check: + existing_repost_counts.update({ + claim_hash: v.reposted_count + for (claim_hash,), v in zip(reposted_to_check, self.db.prefix_db.reposted_count.multi_get( + reposted_to_check + )) if v is not None + }) + if existing_repost_counts: + self.db.prefix_db.multi_delete([ + self.db.prefix_db.reposted_count.pack_item(claim_hash, count) + for claim_hash, count in existing_repost_counts.items() + ]) + repost_count_puts = [] + for claim_hash, delta in self.reposted_count_delta.items(): + if delta != 0: + new_count = existing_repost_counts.get(claim_hash, 0) + delta + repost_count_puts.append(((claim_hash,), (new_count,))) + self.db.prefix_db.reposted_count.stage_multi_put(repost_count_puts) + # gather cumulative removed/touched sets to update the search index self.removed_claim_hashes.update(set(self.abandoned_claims.keys())) self.touched_claim_hashes.difference_update(self.removed_claim_hashes) @@ -1558,6 +1585,7 @@ class BlockchainProcessorService(BlockchainService): self.mempool.clear() self.hashX_history_cache.clear() self.hashX_full_cache.clear() + self.reposted_count_delta.clear() def backup_block(self): assert len(self.db.prefix_db._op_stack) == 0