From c68334b421e43f3f279a293ec18b90ef00cd09ab Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Mon, 2 Aug 2021 12:08:55 -0400 Subject: [PATCH] fix bulk es sync --- lbry/wallet/server/leveldb.py | 45 ++++++++++------------------------- 1 file changed, 13 insertions(+), 32 deletions(-) diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py index 541edf2df..12e2050ee 100644 --- a/lbry/wallet/server/leveldb.py +++ b/lbry/wallet/server/leveldb.py @@ -702,39 +702,25 @@ class LevelDB: # TODO: fix the couple of claim txos that dont have controlling names if not self.db.get(Prefixes.claim_takeover.pack_key(Prefixes.claim_to_txo.unpack_value(v).name)): continue - tasks.append( - loop.run_in_executor(None, self._fs_get_claim_by_hash, claim_hash[1:]) - ) - if len(tasks) == batch_size: - for t in asyncio.as_completed(tasks): - claim = await t - if claim: - batch.append(claim) - tasks.clear() + claim = self._fs_get_claim_by_hash(claim_hash[1:]) + if claim: + batch.append(claim) + if len(batch) == batch_size: batch.sort(key=lambda x: x.tx_hash) - for claim_fut in asyncio.as_completed( - [loop.run_in_executor(None, self._prepare_claim_metadata, claim.claim_hash, claim) - for claim in batch]): - meta = await claim_fut + for claim in batch: + meta = self._prepare_claim_metadata(claim.claim_hash, claim) if meta: yield meta batch.clear() - for t in asyncio.as_completed(tasks): - claim = await t - if claim: - batch.append(claim) batch.sort(key=lambda x: x.tx_hash) - for claim_fut in asyncio.as_completed( - [loop.run_in_executor(None, self._prepare_claim_metadata, claim.claim_hash, claim) - for claim in batch]): - meta = await claim_fut + for claim in batch: + meta = self._prepare_claim_metadata(claim.claim_hash, claim) if meta: yield meta + batch.clear() async def claims_producer(self, claim_hashes: Set[bytes]): batch = [] - loop = asyncio.get_event_loop() - tasks = [] for claim_hash in claim_hashes: if claim_hash not in self.claim_to_txo: self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex()) @@ -743,20 +729,15 @@ class LevelDB: if not self.db.get(Prefixes.claim_takeover.pack_key(name)): self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex()) continue - tasks.append( - loop.run_in_executor(None, self._fs_get_claim_by_hash, claim_hash) - ) - for t in asyncio.as_completed(tasks): - claim = await t + claim = self._fs_get_claim_by_hash(claim_hash) if claim: batch.append(claim) batch.sort(key=lambda x: x.tx_hash) - for claim_fut in asyncio.as_completed( - [loop.run_in_executor(None, self._prepare_claim_metadata, claim.claim_hash, claim) - for claim in batch]): - meta = await claim_fut + for claim in batch: + meta = self._prepare_claim_metadata(claim.claim_hash, claim) if meta: yield meta + batch.clear() def get_activated_at_height(self, height: int) -> DefaultDict[PendingActivationValue, List[PendingActivationKey]]: activated = defaultdict(list)