improve claims_producer performance

This commit is contained in:
Jack Robison 2021-10-13 14:09:16 -04:00
parent ba1d0a12d1
commit 3dc3792478
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2

View file

@ -697,49 +697,44 @@ class LevelDB:
batch.clear() batch.clear()
async def claims_producer(self, claim_hashes: Set[bytes]): async def claims_producer(self, claim_hashes: Set[bytes]):
batch = []
results = []
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
def produce_claim(claim_hash): def produce_claims(claims):
if claim_hash not in self.claim_to_txo: batch = []
self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex()) results = []
return
name = self.claim_to_txo[claim_hash].normalized_name
if not self.prefix_db.claim_takeover.get(name):
self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex())
return
claim_txo = self.claim_to_txo.get(claim_hash)
if not claim_txo:
return
activation = self.get_activation(claim_txo.tx_num, claim_txo.position)
claim = self._prepare_resolve_result(
claim_txo.tx_num, claim_txo.position, claim_hash, claim_txo.name, claim_txo.root_tx_num,
claim_txo.root_position, activation, claim_txo.channel_signature_is_valid
)
if claim:
batch.append(claim)
def get_metadata(claim): for claim_hash in claims:
meta = self._prepare_claim_metadata(claim.claim_hash, claim) if claim_hash not in self.claim_to_txo:
if meta: self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex())
results.append(meta) return
name = self.claim_to_txo[claim_hash].normalized_name
if not self.prefix_db.claim_takeover.get(name):
self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex())
return
claim_txo = self.claim_to_txo.get(claim_hash)
if not claim_txo:
return
activation = self.get_activation(claim_txo.tx_num, claim_txo.position)
claim = self._prepare_resolve_result(
claim_txo.tx_num, claim_txo.position, claim_hash, claim_txo.name, claim_txo.root_tx_num,
claim_txo.root_position, activation, claim_txo.channel_signature_is_valid
)
if claim:
batch.append(claim)
batch.sort(key=lambda x: x.tx_hash)
for claim in batch:
meta = self._prepare_claim_metadata(claim.claim_hash, claim)
if meta:
results.append(meta)
return results
if claim_hashes: if claim_hashes:
await asyncio.wait( results = await loop.run_in_executor(None, produce_claims, claim_hashes)
[loop.run_in_executor(None, produce_claim, claim_hash) for claim_hash in claim_hashes]
)
batch.sort(key=lambda x: x.tx_hash)
if batch: for meta in results:
await asyncio.wait( yield meta
[loop.run_in_executor(None, get_metadata, claim) for claim in batch]
)
for meta in results:
yield meta
batch.clear()
def get_activated_at_height(self, height: int) -> DefaultDict[PendingActivationValue, List[PendingActivationKey]]: def get_activated_at_height(self, height: int) -> DefaultDict[PendingActivationValue, List[PendingActivationKey]]:
activated = defaultdict(list) activated = defaultdict(list)