From 2d48e93f747fa54f86b327d695bd7efc244b9da5 Mon Sep 17 00:00:00 2001
From: Jack Robison <jackrobison@lbry.io>
Date: Mon, 2 Aug 2021 12:08:55 -0400
Subject: [PATCH] fix bulk es sync

---
 lbry/wallet/server/leveldb.py | 45 ++++++++++-------------------------
 1 file changed, 13 insertions(+), 32 deletions(-)

diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py
index 541edf2df..12e2050ee 100644
--- a/lbry/wallet/server/leveldb.py
+++ b/lbry/wallet/server/leveldb.py
@@ -702,39 +702,25 @@ class LevelDB:
             # TODO: fix the couple of claim txos that dont have controlling names
             if not self.db.get(Prefixes.claim_takeover.pack_key(Prefixes.claim_to_txo.unpack_value(v).name)):
                 continue
-            tasks.append(
-                loop.run_in_executor(None, self._fs_get_claim_by_hash, claim_hash[1:])
-            )
-            if len(tasks) == batch_size:
-                for t in asyncio.as_completed(tasks):
-                    claim = await t
-                    if claim:
-                        batch.append(claim)
-                tasks.clear()
+            claim = self._fs_get_claim_by_hash(claim_hash[1:])
+            if claim:
+                batch.append(claim)
+            if len(batch) == batch_size:
                 batch.sort(key=lambda x: x.tx_hash)
-                for claim_fut in asyncio.as_completed(
-                    [loop.run_in_executor(None, self._prepare_claim_metadata, claim.claim_hash, claim)
-                     for claim in batch]):
-                    meta = await claim_fut
+                for claim in batch:
+                    meta = self._prepare_claim_metadata(claim.claim_hash, claim)
                     if meta:
                         yield meta
                 batch.clear()
-        for t in asyncio.as_completed(tasks):
-            claim = await t
-            if claim:
-                batch.append(claim)
         batch.sort(key=lambda x: x.tx_hash)
-        for claim_fut in asyncio.as_completed(
-                [loop.run_in_executor(None, self._prepare_claim_metadata, claim.claim_hash, claim)
-                 for claim in batch]):
-            meta = await claim_fut
+        for claim in batch:
+            meta = self._prepare_claim_metadata(claim.claim_hash, claim)
             if meta:
                 yield meta
+        batch.clear()
 
     async def claims_producer(self, claim_hashes: Set[bytes]):
         batch = []
-        loop = asyncio.get_event_loop()
-        tasks = []
         for claim_hash in claim_hashes:
             if claim_hash not in self.claim_to_txo:
                 self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex())
@@ -743,20 +729,15 @@ class LevelDB:
             if not self.db.get(Prefixes.claim_takeover.pack_key(name)):
                 self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex())
                 continue
-            tasks.append(
-                loop.run_in_executor(None, self._fs_get_claim_by_hash, claim_hash)
-            )
-        for t in asyncio.as_completed(tasks):
-            claim = await t
+            claim = self._fs_get_claim_by_hash(claim_hash)
             if claim:
                 batch.append(claim)
         batch.sort(key=lambda x: x.tx_hash)
-        for claim_fut in asyncio.as_completed(
-            [loop.run_in_executor(None, self._prepare_claim_metadata, claim.claim_hash, claim)
-                 for claim in batch]):
-            meta = await claim_fut
+        for claim in batch:
+            meta = self._prepare_claim_metadata(claim.claim_hash, claim)
             if meta:
                 yield meta
+        batch.clear()
 
     def get_activated_at_height(self, height: int) -> DefaultDict[PendingActivationValue, List[PendingActivationKey]]:
         activated = defaultdict(list)