diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index 4dc16a27a..1d2355d6f 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -5,8 +5,6 @@ from concurrent.futures.thread import ThreadPoolExecutor from typing import Optional from prometheus_client import Gauge, Histogram import lbry -from lbry.schema.claim import Claim -from lbry.wallet.server.db.elastic_search import SearchIndex from lbry.wallet.server.db.writer import SQLDB from lbry.wallet.server.daemon import DaemonError from lbry.wallet.server.hash import hash_to_hex_str, HASHX_LEN @@ -165,6 +163,7 @@ class BlockProcessor: self.prefetcher = Prefetcher(daemon, env.coin, self.blocks_event) self.logger = class_logger(__name__, self.__class__.__name__) self.executor = ThreadPoolExecutor(1) + self.index_executor = ThreadPoolExecutor(8) # Meta self.next_cache_check = 0 @@ -216,6 +215,10 @@ class BlockProcessor: if hprevs == chain: start = time.perf_counter() await self.run_in_thread_with_lock(self.advance_blocks, blocks) + pending = [] + for height in range(first, first + len(blocks)): + pending.append(asyncio.get_event_loop().run_in_executor(self.index_executor, self.db.sql.enqueue_changes, height)) + await asyncio.gather(*pending) await self.db.search_index.sync_queue(self.sql.claim_queue) for cache in self.search_cache.values(): cache.clear() diff --git a/lbry/wallet/server/db/writer.py b/lbry/wallet/server/db/writer.py index 7b7fb6056..d3a44ab72 100644 --- a/lbry/wallet/server/db/writer.py +++ b/lbry/wallet/server/db/writer.py @@ -806,7 +806,7 @@ class SQLDB: f"SELECT claim_hash, normalized FROM claim WHERE expiration_height = {height}" ) - def enqueue_changes(self, height, deleted_claims): + def enqueue_changes(self, height): for claim in self.execute(f""" SELECT claimtrie.claim_hash as is_controlling, claimtrie.last_take_over_height, @@ -838,6 +838,8 @@ class SQLDB: claim['languages'] = claim['languages'].split(' ') if claim['languages'] else [] if not self.claim_queue.full(): self.claim_queue.put_nowait(('update', claim)) + + def enqueue_deleted(self, deleted_claims): for claim_hash in deleted_claims: if not self.claim_queue.full(): self.claim_queue.put_nowait(('delete', hexlify(claim_hash[::-1]).decode())) @@ -937,7 +939,7 @@ class SQLDB: if not self._fts_synced and self.main.first_sync and height == daemon_height: r(first_sync_finished, self.db.cursor()) self._fts_synced = True - r(self.enqueue_changes, height, delete_claim_hashes) + r(self.enqueue_deleted, delete_claim_hashes) class LBRYLevelDB(LevelDB):