diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index 6cc3a4a48..14e87cee3 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -164,7 +164,6 @@ class BlockProcessor: self.prefetcher = Prefetcher(daemon, env.coin, self.blocks_event) self.logger = class_logger(__name__, self.__class__.__name__) self.executor = ThreadPoolExecutor(1) - self.index_executor = ThreadPoolExecutor(os.cpu_count()) # Meta self.next_cache_check = 0 @@ -216,10 +215,6 @@ class BlockProcessor: if hprevs == chain: start = time.perf_counter() await self.run_in_thread_with_lock(self.advance_blocks, blocks) - pending = [] - for height in range(first, first + len(blocks)): - pending.append(asyncio.get_event_loop().run_in_executor(self.index_executor, self.db.sql.enqueue_changes, height)) - await asyncio.gather(*pending) await self.db.search_index.sync_queue(self.sql.claim_queue) for cache in self.search_cache.values(): cache.clear() diff --git a/lbry/wallet/server/db/writer.py b/lbry/wallet/server/db/writer.py index d3a44ab72..667364ee7 100644 --- a/lbry/wallet/server/db/writer.py +++ b/lbry/wallet/server/db/writer.py @@ -135,6 +135,17 @@ class SQLDB: create index if not exists claimtrie_claim_hash_idx on claimtrie (claim_hash); """ + CREATE_CHANGELOG_TRIGGER = """ + create table if not exists changelog ( + claim_hash bytes primary key + ); + create index if not exists claimtrie_claim_hash_idx on claimtrie (claim_hash); + create trigger if not exists claim_changelog after update on claim + begin + insert or ignore into changelog (claim_hash) values (new.claim_hash); + end; + """ + SEARCH_INDEXES = """ -- used by any tag clouds create index if not exists tag_tag_idx on tag (tag, claim_hash); @@ -194,6 +205,7 @@ class SQLDB: CREATE_SUPPORT_TABLE + CREATE_CLAIMTRIE_TABLE + CREATE_TAG_TABLE + + CREATE_CHANGELOG_TRIGGER + CREATE_LANGUAGE_TABLE ) @@ -806,7 +818,7 @@ class SQLDB: f"SELECT claim_hash, normalized FROM claim WHERE expiration_height = {height}" ) - def enqueue_changes(self, height): + def enqueue_changes(self): for claim in self.execute(f""" SELECT claimtrie.claim_hash as is_controlling, claimtrie.last_take_over_height, @@ -814,7 +826,7 @@ class SQLDB: (select group_concat(language, ' ') from language where language.claim_hash in (claim.claim_hash, claim.reposted_claim_hash)) as languages, claim.* FROM claim LEFT JOIN claimtrie USING (claim_hash) - WHERE claim.height = {height} OR claim.claim_hash in (SELECT claim_hash FROM support WHERE height = {height}) + WHERE claim.claim_hash in (SELECT claim_hash FROM changelog) """): claim = claim._asdict() id_set = set(filter(None, (claim['claim_hash'], claim['channel_hash'], claim['reposted_claim_hash']))) @@ -838,6 +850,7 @@ class SQLDB: claim['languages'] = claim['languages'].split(' ') if claim['languages'] else [] if not self.claim_queue.full(): self.claim_queue.put_nowait(('update', claim)) + self.execute("delete from changelog;") def enqueue_deleted(self, deleted_claims): for claim_hash in deleted_claims: @@ -940,6 +953,7 @@ class SQLDB: r(first_sync_finished, self.db.cursor()) self._fts_synced = True r(self.enqueue_deleted, delete_claim_hashes) + r(self.enqueue_changes) class LBRYLevelDB(LevelDB):