forked from LBRYCommunity/lbry-sdk
improve claims_producer performance
This commit is contained in:
parent
15ac2ade59
commit
e0086682b9
1 changed files with 32 additions and 37 deletions
|
@ -697,49 +697,44 @@ class LevelDB:
|
||||||
batch.clear()
|
batch.clear()
|
||||||
|
|
||||||
async def claims_producer(self, claim_hashes: Set[bytes]):
|
async def claims_producer(self, claim_hashes: Set[bytes]):
|
||||||
batch = []
|
|
||||||
results = []
|
|
||||||
|
|
||||||
loop = asyncio.get_event_loop()
|
loop = asyncio.get_event_loop()
|
||||||
|
|
||||||
def produce_claim(claim_hash):
|
def produce_claims(claims):
|
||||||
if claim_hash not in self.claim_to_txo:
|
batch = []
|
||||||
self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex())
|
_results = []
|
||||||
return
|
|
||||||
name = self.claim_to_txo[claim_hash].normalized_name
|
|
||||||
if not self.prefix_db.claim_takeover.get(name):
|
|
||||||
self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex())
|
|
||||||
return
|
|
||||||
claim_txo = self.claim_to_txo.get(claim_hash)
|
|
||||||
if not claim_txo:
|
|
||||||
return
|
|
||||||
activation = self.get_activation(claim_txo.tx_num, claim_txo.position)
|
|
||||||
claim = self._prepare_resolve_result(
|
|
||||||
claim_txo.tx_num, claim_txo.position, claim_hash, claim_txo.name, claim_txo.root_tx_num,
|
|
||||||
claim_txo.root_position, activation, claim_txo.channel_signature_is_valid
|
|
||||||
)
|
|
||||||
if claim:
|
|
||||||
batch.append(claim)
|
|
||||||
|
|
||||||
def get_metadata(claim):
|
for claim_hash in claims:
|
||||||
meta = self._prepare_claim_metadata(claim.claim_hash, claim)
|
if claim_hash not in self.claim_to_txo:
|
||||||
if meta:
|
self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex())
|
||||||
results.append(meta)
|
continue
|
||||||
|
name = self.claim_to_txo[claim_hash].normalized_name
|
||||||
|
if not self.prefix_db.claim_takeover.get(name):
|
||||||
|
self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex())
|
||||||
|
continue
|
||||||
|
claim_txo = self.claim_to_txo.get(claim_hash)
|
||||||
|
if not claim_txo:
|
||||||
|
continue
|
||||||
|
activation = self.get_activation(claim_txo.tx_num, claim_txo.position)
|
||||||
|
claim = self._prepare_resolve_result(
|
||||||
|
claim_txo.tx_num, claim_txo.position, claim_hash, claim_txo.name, claim_txo.root_tx_num,
|
||||||
|
claim_txo.root_position, activation, claim_txo.channel_signature_is_valid
|
||||||
|
)
|
||||||
|
if claim:
|
||||||
|
batch.append(claim)
|
||||||
|
|
||||||
|
batch.sort(key=lambda x: x.tx_hash)
|
||||||
|
|
||||||
|
for claim in batch:
|
||||||
|
_meta = self._prepare_claim_metadata(claim.claim_hash, claim)
|
||||||
|
if _meta:
|
||||||
|
_results.append(_meta)
|
||||||
|
return _results
|
||||||
|
|
||||||
if claim_hashes:
|
if claim_hashes:
|
||||||
await asyncio.wait(
|
results = await loop.run_in_executor(None, produce_claims, claim_hashes)
|
||||||
[loop.run_in_executor(None, produce_claim, claim_hash) for claim_hash in claim_hashes]
|
|
||||||
)
|
|
||||||
batch.sort(key=lambda x: x.tx_hash)
|
|
||||||
|
|
||||||
if batch:
|
for meta in results:
|
||||||
await asyncio.wait(
|
yield meta
|
||||||
[loop.run_in_executor(None, get_metadata, claim) for claim in batch]
|
|
||||||
)
|
|
||||||
for meta in results:
|
|
||||||
yield meta
|
|
||||||
|
|
||||||
batch.clear()
|
|
||||||
|
|
||||||
def get_activated_at_height(self, height: int) -> DefaultDict[PendingActivationValue, List[PendingActivationKey]]:
|
def get_activated_at_height(self, height: int) -> DefaultDict[PendingActivationValue, List[PendingActivationKey]]:
|
||||||
activated = defaultdict(list)
|
activated = defaultdict(list)
|
||||||
|
|
Loading…
Reference in a new issue