diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py index 5de62bce6..16176b3b5 100644 --- a/lbry/wallet/server/leveldb.py +++ b/lbry/wallet/server/leveldb.py @@ -697,49 +697,44 @@ class LevelDB: batch.clear() async def claims_producer(self, claim_hashes: Set[bytes]): - batch = [] - results = [] - loop = asyncio.get_event_loop() - def produce_claim(claim_hash): - if claim_hash not in self.claim_to_txo: - self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex()) - return - name = self.claim_to_txo[claim_hash].normalized_name - if not self.prefix_db.claim_takeover.get(name): - self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex()) - return - claim_txo = self.claim_to_txo.get(claim_hash) - if not claim_txo: - return - activation = self.get_activation(claim_txo.tx_num, claim_txo.position) - claim = self._prepare_resolve_result( - claim_txo.tx_num, claim_txo.position, claim_hash, claim_txo.name, claim_txo.root_tx_num, - claim_txo.root_position, activation, claim_txo.channel_signature_is_valid - ) - if claim: - batch.append(claim) + def produce_claims(claims): + batch = [] + _results = [] - def get_metadata(claim): - meta = self._prepare_claim_metadata(claim.claim_hash, claim) - if meta: - results.append(meta) + for claim_hash in claims: + if claim_hash not in self.claim_to_txo: + self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex()) + continue + name = self.claim_to_txo[claim_hash].normalized_name + if not self.prefix_db.claim_takeover.get(name): + self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex()) + continue + claim_txo = self.claim_to_txo.get(claim_hash) + if not claim_txo: + continue + activation = self.get_activation(claim_txo.tx_num, claim_txo.position) + claim = self._prepare_resolve_result( + claim_txo.tx_num, claim_txo.position, claim_hash, claim_txo.name, claim_txo.root_tx_num, + claim_txo.root_position, activation, claim_txo.channel_signature_is_valid + ) + if claim: + batch.append(claim) + + batch.sort(key=lambda x: x.tx_hash) + + for claim in batch: + _meta = self._prepare_claim_metadata(claim.claim_hash, claim) + if _meta: + _results.append(_meta) + return _results if claim_hashes: - await asyncio.wait( - [loop.run_in_executor(None, produce_claim, claim_hash) for claim_hash in claim_hashes] - ) - batch.sort(key=lambda x: x.tx_hash) + results = await loop.run_in_executor(None, produce_claims, claim_hashes) - if batch: - await asyncio.wait( - [loop.run_in_executor(None, get_metadata, claim) for claim in batch] - ) - for meta in results: - yield meta - - batch.clear() + for meta in results: + yield meta def get_activated_at_height(self, height: int) -> DefaultDict[PendingActivationValue, List[PendingActivationKey]]: activated = defaultdict(list)