From ff960fda0e7432800fd9e28f4662b5078aea8e94 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Wed, 8 Sep 2021 16:49:38 -0400 Subject: [PATCH] non blocking claim producer --- lbry/wallet/server/env.py | 2 +- lbry/wallet/server/leveldb.py | 39 ++++++++++++++++++++++++++++------- lbry/wallet/server/server.py | 2 +- 3 files changed, 34 insertions(+), 9 deletions(-) diff --git a/lbry/wallet/server/env.py b/lbry/wallet/server/env.py index d2de19254..2b4c489b3 100644 --- a/lbry/wallet/server/env.py +++ b/lbry/wallet/server/env.py @@ -46,7 +46,7 @@ class Env: self.trending_whale_half_life = math.log2(0.1 ** (1 / (3 + self.integer('TRENDING_WHALE_DECAY_RATE', 24)))) + 1 self.trending_whale_threshold = float(self.integer('TRENDING_WHALE_THRESHOLD', 10000)) * 1E8 - self.max_query_workers = self.integer('MAX_QUERY_WORKERS', None) + self.max_query_workers = self.integer('MAX_QUERY_WORKERS', 4) self.individual_tag_indexes = self.boolean('INDIVIDUAL_TAG_INDEXES', True) self.track_metrics = self.boolean('TRACK_METRICS', False) self.websocket_host = self.default('WEBSOCKET_HOST', self.host) diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py index 225d32d02..51a397c84 100644 --- a/lbry/wallet/server/leveldb.py +++ b/lbry/wallet/server/leveldb.py @@ -742,22 +742,47 @@ class LevelDB: async def claims_producer(self, claim_hashes: Set[bytes]): batch = [] - for claim_hash in claim_hashes: + results = [] + + loop = asyncio.get_event_loop() + + def produce_claim(claim_hash): if claim_hash not in self.claim_to_txo: self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex()) - continue + return name = self.claim_to_txo[claim_hash].normalized_name if not self.prefix_db.claim_takeover.get(name): self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex()) - continue - claim = self._fs_get_claim_by_hash(claim_hash) + return + claim_txo = self.claim_to_txo.get(claim_hash) + if not claim_txo: + return + activation = self.get_activation(claim_txo.tx_num, claim_txo.position) + claim = self._prepare_resolve_result( + claim_txo.tx_num, claim_txo.position, claim_hash, claim_txo.name, claim_txo.root_tx_num, + claim_txo.root_position, activation, claim_txo.channel_signature_is_valid + ) if claim: batch.append(claim) - batch.sort(key=lambda x: x.tx_hash) - for claim in batch: + + def get_metadata(claim): meta = self._prepare_claim_metadata(claim.claim_hash, claim) if meta: - yield meta + results.append(meta) + + if claim_hashes: + await asyncio.wait( + [loop.run_in_executor(None, produce_claim, claim_hash) for claim_hash in claim_hashes] + ) + batch.sort(key=lambda x: x.tx_hash) + + if batch: + await asyncio.wait( + [loop.run_in_executor(None, get_metadata, claim) for claim in batch] + ) + for meta in results: + yield meta + batch.clear() def get_activated_at_height(self, height: int) -> DefaultDict[PendingActivationValue, List[PendingActivationKey]]: diff --git a/lbry/wallet/server/server.py b/lbry/wallet/server/server.py index 21572feca..2a0a2111e 100644 --- a/lbry/wallet/server/server.py +++ b/lbry/wallet/server/server.py @@ -69,7 +69,7 @@ class Server: def run(self): loop = asyncio.get_event_loop() - executor = ThreadPoolExecutor(4) + executor = ThreadPoolExecutor(self.env.max_query_workers) loop.set_default_executor(executor) def __exit():