From fd7bfbea782a2c999f2efc7d4b7cf93c923722ab Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Fri, 18 Jun 2021 13:54:02 -0400 Subject: [PATCH] faster claim producer -make batches of claim txos from the iterator, and sort by tx hash before fetching to maximize cache and read ahead hits --- lbry/wallet/server/leveldb.py | 42 +++++++++++++++++++++++++++-------- 1 file changed, 33 insertions(+), 9 deletions(-) diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py index e791989ee..5b0494120 100644 --- a/lbry/wallet/server/leveldb.py +++ b/lbry/wallet/server/leveldb.py @@ -442,7 +442,7 @@ class LevelDB: txos[claim_hash] = tx_num, nout return txos - def get_claim_output_script(self, tx_hash, nout): + def get_claim_metadata(self, tx_hash, nout): raw = self.db.get( DB_PREFIXES.TX_PREFIX.value + tx_hash ) @@ -463,9 +463,13 @@ class LevelDB: if not claim: print("wat") return - metadata = self.get_claim_output_script(claim.tx_hash, claim.position) + return self._prepare_claim_metadata(claim_hash, claim) + + def _prepare_claim_metadata(self, claim_hash: bytes, claim: ResolveResult): + metadata = self.get_claim_metadata(claim.tx_hash, claim.position) if not metadata: return + reposted_claim_hash = None if not metadata.is_repost else metadata.repost.reference.claim_hash[::-1] reposted_claim = None reposted_metadata = None @@ -473,7 +477,7 @@ class LevelDB: reposted_claim = self.get_claim_txo(reposted_claim_hash) if not reposted_claim: return - reposted_metadata = self.get_claim_output_script( + reposted_metadata = self.get_claim_metadata( self.total_transactions[reposted_claim.tx_num], reposted_claim.position ) if not reposted_metadata: @@ -592,17 +596,37 @@ class LevelDB: value['release_time'] = metadata.stream.release_time return value - def all_claims_producer(self): + def all_claims_producer(self, batch_size=500_000): + batch = [] for claim_hash in self.db.iterator(prefix=Prefixes.claim_to_txo.prefix, include_value=False): - claim = self._prepare_claim_for_sync(claim_hash[1:]) + claim = self._fs_get_claim_by_hash(claim_hash[1:]) if claim: - yield claim + batch.append(claim) + if len(batch) == batch_size: + batch.sort(key=lambda x: x.tx_hash) + for claim in batch: + meta = self._prepare_claim_metadata(claim.claim_hash, claim) + if meta: + yield meta + batch.clear() + batch.sort(key=lambda x: x.tx_hash) + for claim in batch: + meta = self._prepare_claim_metadata(claim.claim_hash, claim) + if meta: + yield meta + batch.clear() def claims_producer(self, claim_hashes: Set[bytes]): + batch = [] for claim_hash in claim_hashes: - result = self._prepare_claim_for_sync(claim_hash) - if result: - yield result + claim = self._fs_get_claim_by_hash(claim_hash) + if claim: + batch.append(claim) + batch.sort(key=lambda x: x.tx_hash) + for claim in batch: + meta = self._prepare_claim_metadata(claim.claim_hash, claim) + if meta: + yield meta def get_activated_at_height(self, height: int) -> DefaultDict[PendingActivationValue, List[PendingActivationKey]]: activated = defaultdict(list)