add reposted_count index

fixes https://github.com/lbryio/hub/issues/63
Jack Robison 2022-07-18 15:36:59 -04:00
parent 18e0571e81
commit 2627f02a55
4 changed files with 76 additions and 6 deletions
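
What changed: instead of recomputing a claim's repost count on demand by iterating the `reposted_claim` index, the hub now maintains a dedicated `reposted_count` column (prefix `b'j'`) keyed by claim hash. A toy sketch of the two access patterns, with plain dicts standing in for the RocksDB column families (names and data here are illustrative, not from the codebase):

    # reposted_claim maps (reposted_claim_hash, tx_num, position) -> reposting claim hash;
    # reposted_count maps claim_hash -> count (the new index added by this commit).
    reposted_claim = {(b'A', 101, 0): b'X', (b'A', 205, 1): b'Y', (b'B', 310, 0): b'Z'}
    reposted_count = {b'A': 2, b'B': 1}

    def count_by_scan(claim_hash: bytes) -> int:
        # old approach: touch one row per repost under the claim's key prefix
        return sum(1 for key in reposted_claim if key[0] == claim_hash)

    def count_by_index(claim_hash: bytes) -> int:
        # new approach: a single point lookup; a missing row means zero reposts
        return reposted_count.get(claim_hash, 0)

    assert count_by_scan(b'A') == count_by_index(b'A') == 2
    assert count_by_scan(b'C') == count_by_index(b'C') == 0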


@@ -48,6 +48,7 @@ class DB_PREFIXES(enum.Enum):
     touched_hashX = b'e'
     hashX_status = b'f'
     hashX_mempool_status = b'g'
+    reposted_count = b'j'


 COLUMN_SETTINGS = {}  # this is updated by the PrefixRow metaclass


@@ -116,9 +116,10 @@ class SecondaryDB:
         return

     def get_reposted_count(self, claim_hash: bytes) -> int:
-        return sum(
-            1 for _ in self.prefix_db.reposted_claim.iterate(prefix=(claim_hash,), include_value=False)
-        )
+        v = self.prefix_db.reposted_count.get(claim_hash)
+        if v:
+            return v.reposted_count
+        return 0

     def get_activation(self, tx_num, position, is_support=False) -> int:
         activation = self.prefix_db.activated.get(
@@ -632,11 +633,12 @@ class SecondaryDB:
             )
         }
         # collect all of the repost counts
         repost_counts = {
-            claim_hash: await run_in_executor(
-                self._executor, self.get_reposted_count, claim_hash
-            )
-            for claim_hash, claim in claims.items() if claim is not None
+            claim_hash: 0 if not v else v.reposted_count
+            async for (claim_hash, ), v in self.prefix_db.reposted_count.multi_get_async_gen(
+                self._executor, [(claim_hash,) for claim_hash in claims]
+            )
         }

         effective_amounts = {

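In the bulk path above, one `run_in_executor` call per claim (each a full prefix scan) becomes a single batched read over all claim hashes. A minimal sketch of the batching semantics, assuming `multi_get` returns one slot per requested key, in order, with `None` for missing rows (which is what the `0 if not v` handling implies):

    column = {b'A': 2, b'B': 1}  # toy column: claim_hash -> repost count

    def multi_get(keys):
        # one result slot per requested key, in order; None where no row exists
        return [column.get(k) for k in keys]

    claims = [b'A', b'B', b'C']
    repost_counts = {
        claim_hash: 0 if v is None else v
        for claim_hash, v in zip(claims, multi_get(claims))
    }
    assert repost_counts == {b'A': 2, b'B': 1, b'C': 0}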

@@ -1025,6 +1025,44 @@ class RepostedPrefixRow(PrefixRow):
         return cls.pack_key(reposted_claim_hash, tx_num, position), cls.pack_value(claim_hash)


+class RepostedCountKey(NamedTuple):
+    claim_hash: bytes
+
+
+class RepostedCountValue(NamedTuple):
+    reposted_count: int
+
+
+class RepostedCountPrefixRow(PrefixRow):
+    prefix = DB_PREFIXES.reposted_count.value
+    key_struct = struct.Struct(b'>20s')
+    value_struct = struct.Struct(b'>L')
+    key_part_lambdas = [
+        lambda: b'',
+        struct.Struct(b'>20s').pack,
+    ]
+
+    @classmethod
+    def pack_key(cls, claim_hash: bytes):
+        return super().pack_key(claim_hash)
+
+    @classmethod
+    def unpack_key(cls, key: bytes) -> RepostedCountKey:
+        return RepostedCountKey(*super().unpack_key(key))
+
+    @classmethod
+    def pack_value(cls, reposted_count: int) -> bytes:
+        return super().pack_value(reposted_count)
+
+    @classmethod
+    def unpack_value(cls, data: bytes) -> RepostedCountValue:
+        return RepostedCountValue(*super().unpack_value(data))
+
+    @classmethod
+    def pack_item(cls, claim_hash: bytes, reposted_count: int):
+        return cls.pack_key(claim_hash), cls.pack_value(reposted_count)
+
+
 class UndoKey(NamedTuple):
     height: int
     block_hash: bytes
@@ -1753,6 +1791,7 @@ class PrefixDB(BasePrefixDB):
         self.effective_amount = EffectiveAmountPrefixRow(db, self._op_stack)
         self.repost = RepostPrefixRow(db, self._op_stack)
         self.reposted_claim = RepostedPrefixRow(db, self._op_stack)
+        self.reposted_count = RepostedCountPrefixRow(db, self._op_stack)
         self.undo = UndoPrefixRow(db, self._op_stack)
         self.utxo = UTXOPrefixRow(db, self._op_stack)
         self.hashX_utxo = HashXUTXOPrefixRow(db, self._op_stack)

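A standalone round-trip of the new row's encoding, using the structs and NamedTuples exactly as defined above; the `PrefixRow` plumbing is replaced by direct struct calls, and the 1-byte `b'j'` prefix comes from `DB_PREFIXES.reposted_count`:

    import struct
    from typing import NamedTuple

    class RepostedCountKey(NamedTuple):
        claim_hash: bytes

    class RepostedCountValue(NamedTuple):
        reposted_count: int

    PREFIX = b'j'                        # DB_PREFIXES.reposted_count.value
    key_struct = struct.Struct(b'>20s')  # fixed 20-byte claim hash
    value_struct = struct.Struct(b'>L')  # unsigned 32-bit big-endian count

    claim_hash = bytes(range(20))
    packed_key = PREFIX + key_struct.pack(claim_hash)
    packed_value = value_struct.pack(7)

    # unpacking strips the column prefix and rebuilds the typed tuples
    key = RepostedCountKey(*key_struct.unpack(packed_key[1:]))
    value = RepostedCountValue(*value_struct.unpack(packed_value))
    assert key.claim_hash == claim_hash and value.reposted_count == 7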

@@ -122,6 +122,9 @@ class BlockchainProcessorService(BlockchainService):
         self.pending_transaction_num_mapping: Dict[bytes, int] = {}
         self.pending_transactions: Dict[int, bytes] = {}

+        # {claim_hash: <count>}
+        self.reposted_count_delta = defaultdict(int)
+
         self.hashX_history_cache = LFUCacheWithMetrics(max(100, env.hashX_history_cache_size), 'hashX_history', NAMESPACE)
         self.hashX_full_cache = LFUCacheWithMetrics(max(100, env.hashX_history_cache_size), 'hashX_full', NAMESPACE)
         self.history_tx_info_cache = LFUCacheWithMetrics(max(100, env.history_tx_cache_size), 'hashX_tx', NAMESPACE)
@@ -307,6 +310,7 @@
         if is_repost:
             reposted_claim_hash = signable.repost.reference.claim_hash[::-1]
             self.pending_reposted.add(reposted_claim_hash)
+            self.reposted_count_delta[reposted_claim_hash] += 1
         is_channel = signable.is_channel
         if is_channel:
             self.pending_channels[claim_hash] = signable.channel.public_key_bytes
@@ -477,6 +481,7 @@
         self.db.prefix_db.reposted_claim.stage_delete(
             (pending.reposted_claim_hash, pending.tx_num, pending.position), (pending.claim_hash,)
         )
+        self.reposted_count_delta[pending.reposted_claim_hash] -= 1

     def _add_support(self, height: int, txo: 'TxOutput', tx_num: int, nout: int):
         supported_claim_hash = txo.support.claim_hash[::-1]
@@ -1163,6 +1168,28 @@
              if claim_hash not in self.abandoned_claims}
         )

+        # update the reposted counts
+        reposted_to_check = [(claim_hash,) for claim_hash, delta in self.reposted_count_delta.items() if delta != 0]
+        existing_repost_counts = {}
+        if reposted_to_check:
+            existing_repost_counts.update({
+                claim_hash: v.reposted_count
+                for (claim_hash,), v in zip(reposted_to_check, self.db.prefix_db.reposted_count.multi_get(
+                    reposted_to_check
+                )) if v is not None
+            })
+        if existing_repost_counts:
+            self.db.prefix_db.multi_delete([
+                self.db.prefix_db.reposted_count.pack_item(claim_hash, count)
+                for claim_hash, count in existing_repost_counts.items()
+            ])
+        repost_count_puts = []
+        for claim_hash, delta in self.reposted_count_delta.items():
+            if delta != 0:
+                new_count = existing_repost_counts.get(claim_hash, 0) + delta
+                repost_count_puts.append(((claim_hash,), (new_count,)))
+        self.db.prefix_db.reposted_count.stage_multi_put(repost_count_puts)
+
         # gather cumulative removed/touched sets to update the search index
         self.removed_claim_hashes.update(set(self.abandoned_claims.keys()))
         self.touched_claim_hashes.difference_update(self.removed_claim_hashes)
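
The write side accumulates per-claim deltas in memory (`+1` for each new repost, `-1` for each spent or abandoned one) and folds them into the column once per block in the hunk above. Stale rows are staged for deletion before the updated counts are put back, presumably so the op stack can record inverse operations for reorg undo rather than blindly overwriting. A condensed, dict-backed sketch of that fold (toy data; not the actual op-stack machinery):

    from collections import defaultdict

    db = {b'A': 2}  # persisted reposted_count rows (toy)
    reposted_count_delta = defaultdict(int, {b'A': 1, b'B': 1, b'C': 0})

    # only claims with a non-zero delta need to be read and rewritten
    to_check = [h for h, delta in reposted_count_delta.items() if delta != 0]
    existing = {h: db[h] for h in to_check if h in db}

    for h in existing:  # stage deletes of the stale rows
        del db[h]
    for h, delta in reposted_count_delta.items():  # stage puts of the updated counts
        if delta != 0:
            db[h] = existing.get(h, 0) + delta

    assert db == {b'A': 3, b'B': 1}
    reposted_count_delta.clear()  # reset for the next block, as in the cleanup hunk below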
@@ -1558,6 +1585,7 @@
         self.mempool.clear()
         self.hashX_history_cache.clear()
         self.hashX_full_cache.clear()
+        self.reposted_count_delta.clear()

     def backup_block(self):
         assert len(self.db.prefix_db._op_stack) == 0