From 19494088bdc8a0ccc9ed097c5cc398d9bba1a525 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 22 Feb 2021 16:42:43 -0300 Subject: [PATCH] generate from queue --- lbry/wallet/server/db/elastic_search.py | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/lbry/wallet/server/db/elastic_search.py b/lbry/wallet/server/db/elastic_search.py index a2a6eb642..ceb5ce85f 100644 --- a/lbry/wallet/server/db/elastic_search.py +++ b/lbry/wallet/server/db/elastic_search.py @@ -80,20 +80,21 @@ class SearchIndex: def delete_index(self): return self.client.indices.delete(self.index, ignore_unavailable=True) + async def _queue_consumer_doc_producer(self, queue: asyncio.Queue): + while not queue.empty(): + op, doc = queue.get_nowait() + if op == 'delete': + yield {'_index': self.index, '_op_type': 'delete', '_id': doc} + else: + yield extract_doc(doc, self.index) + async def sync_queue(self, claim_queue): self.logger.info("Writing to index from a queue with %d elements.", claim_queue.qsize()) - if claim_queue.empty(): - return - actions = [] - while not claim_queue.empty(): - operation, doc = claim_queue.get_nowait() - actions.append(extract_doc(doc, self.index)) - self.logger.info("prepare update: %d elements. Queue: %d elements", len(actions), claim_queue.qsize()) await self.client.indices.refresh(self.index) - self.logger.info("update done: %d elements. Queue: %d elements", len(actions), claim_queue.qsize()) - await async_bulk(self.client, actions) + await async_bulk(self.client, self._queue_consumer_doc_producer(claim_queue)) await self.client.indices.refresh(self.index) await self.client.indices.flush(self.index) + self.logger.info("Indexing done. Queue: %d elements", claim_queue.qsize()) async def apply_filters(self, blocked_streams, blocked_channels, filtered_streams, filtered_channels): def make_query(censor_type, blockdict, channels=False):