diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py
index b76181063..8558afe5c 100644
--- a/lbry/wallet/server/block_processor.py
+++ b/lbry/wallet/server/block_processor.py
@@ -215,7 +215,7 @@ class BlockProcessor:
             start = time.perf_counter()
             await self.run_in_thread_with_lock(self.advance_blocks, blocks)
             if self.sql:
-                await self.db.search_index.sync_queue(self.sql.claim_queue)
+                await self.db.search_index.claim_consumer(self.sql.claim_producer())
             for cache in self.search_cache.values():
                 cache.clear()
             self.history_cache.clear()
diff --git a/lbry/wallet/server/db/elastic_search.py b/lbry/wallet/server/db/elastic_search.py
index 76871fa66..34d3254ed 100644
--- a/lbry/wallet/server/db/elastic_search.py
+++ b/lbry/wallet/server/db/elastic_search.py
@@ -83,25 +83,26 @@ class SearchIndex:
     def delete_index(self):
         return self.client.indices.delete(self.index, ignore_unavailable=True)
 
-    async def _queue_consumer_doc_producer(self, queue: asyncio.Queue):
-        while not queue.empty():
-            op, doc = queue.get_nowait()
+    async def _consume_claim_producer(self, claim_producer):
+        count = 0
+        for op, doc in claim_producer:
             if op == 'delete':
                 yield {'_index': self.index, '_op_type': 'delete', '_id': doc}
             else:
                 yield extract_doc(doc, self.index)
+            count += 1
+            if count % 100 == 0:
+                self.logger.info("Indexing in progress, %d claims.", count)
+        self.logger.info("Indexing done for %d claims.", count)
 
-    async def sync_queue(self, claim_queue):
-        self.logger.info("Writing to index from a queue with %d elements.", claim_queue.qsize())
+    async def claim_consumer(self, claim_producer):
         await self.client.indices.refresh(self.index)
-        async for ok, item in async_streaming_bulk(self.client, self._queue_consumer_doc_producer(claim_queue)):
+        async for ok, item in async_streaming_bulk(self.client, self._consume_claim_producer(claim_producer)):
             if not ok:
                 self.logger.warning("indexing failed for an item: %s", item)
         await self.client.indices.refresh(self.index)
         await self.client.indices.flush(self.index)
-        self.logger.info("Indexing done. Queue: %d elements", claim_queue.qsize())
-        self.search_cache.clear()
-        self.channel_cache.clear()
+        self.logger.info("Indexing done.")
 
     async def apply_filters(self, blocked_streams, blocked_channels, filtered_streams, filtered_channels):
         def make_query(censor_type, blockdict, channels=False):
@@ -134,6 +135,8 @@ class SearchIndex:
         await self.client.indices.refresh(self.index)
         await self.client.update_by_query(self.index, body=make_query(2, blocked_channels, True), slices=32)
         await self.client.indices.refresh(self.index)
+        self.search_cache.clear()
+        self.channel_cache.clear()
 
     async def delete_above_height(self, height):
         await self.client.delete_by_query(self.index, expand_query(height='>'+str(height)))
diff --git a/lbry/wallet/server/db/writer.py b/lbry/wallet/server/db/writer.py
index d7a0809ed..831038525 100644
--- a/lbry/wallet/server/db/writer.py
+++ b/lbry/wallet/server/db/writer.py
@@ -233,7 +233,7 @@ class SQLDB:
             unhexlify(channel_id)[::-1] for channel_id in filtering_channels if channel_id
         }
         self.trending = trending
-        self.claim_queue = Queue()
+        self.pending_deletes = set()
 
     def open(self):
         self.db = apsw.Connection(
@@ -853,17 +853,23 @@ class SQLDB:
             claim['tags'] = claim['tags'].split(',,') if claim['tags'] else []
             claim['languages'] = claim['languages'].split(' ') if claim['languages'] else []
-            self.claim_queue.put_nowait(('update', claim))
+            yield 'update', claim
+
+    def clear_changelog(self):
         self.execute("delete from changelog;")
 
-    def enqueue_deleted(self, deleted_claims):
-        for claim_hash in deleted_claims:
-            self.claim_queue.put_nowait(('delete', hexlify(claim_hash[::-1]).decode()))
+    def claim_producer(self):
+        while self.pending_deletes:
+            claim_hash = self.pending_deletes.pop()
+            yield 'delete', hexlify(claim_hash[::-1]).decode()
+        for claim in self.enqueue_changes():
+            yield claim
+        self.clear_changelog()
 
     def advance_txs(self, height, all_txs, header, daemon_height, timer):
         insert_claims = []
         update_claims = []
         update_claim_hashes = set()
-        delete_claim_hashes = set()
+        delete_claim_hashes = self.pending_deletes
         insert_supports = []
         delete_support_txo_hashes = set()
         recalculate_claim_hashes = set()  # added/deleted supports, added/updated claim
@@ -943,8 +949,6 @@ class SQLDB:
         r(self.update_claimtrie, height, recalculate_claim_hashes, deleted_claim_names, forward_timer=True)
         for algorithm in self.trending:
             r(algorithm.run, self.db.cursor(), height, daemon_height, recalculate_claim_hashes)
-        r(self.enqueue_deleted, delete_claim_hashes)
-        r(self.enqueue_changes)
 
 
 class LBRYLevelDB(LevelDB):