generate from queue

This commit is contained in:
Victor Shyba 2021-02-22 16:42:43 -03:00
parent 920dad524a
commit 19494088bd

View file

@ -80,20 +80,21 @@ class SearchIndex:
def delete_index(self): def delete_index(self):
return self.client.indices.delete(self.index, ignore_unavailable=True) return self.client.indices.delete(self.index, ignore_unavailable=True)
async def _queue_consumer_doc_producer(self, queue: asyncio.Queue):
while not queue.empty():
op, doc = queue.get_nowait()
if op == 'delete':
yield {'_index': self.index, '_op_type': 'delete', '_id': doc}
else:
yield extract_doc(doc, self.index)
async def sync_queue(self, claim_queue): async def sync_queue(self, claim_queue):
self.logger.info("Writing to index from a queue with %d elements.", claim_queue.qsize()) self.logger.info("Writing to index from a queue with %d elements.", claim_queue.qsize())
if claim_queue.empty():
return
actions = []
while not claim_queue.empty():
operation, doc = claim_queue.get_nowait()
actions.append(extract_doc(doc, self.index))
self.logger.info("prepare update: %d elements. Queue: %d elements", len(actions), claim_queue.qsize())
await self.client.indices.refresh(self.index) await self.client.indices.refresh(self.index)
self.logger.info("update done: %d elements. Queue: %d elements", len(actions), claim_queue.qsize()) await async_bulk(self.client, self._queue_consumer_doc_producer(claim_queue))
await async_bulk(self.client, actions)
await self.client.indices.refresh(self.index) await self.client.indices.refresh(self.index)
await self.client.indices.flush(self.index) await self.client.indices.flush(self.index)
self.logger.info("Indexing done. Queue: %d elements", claim_queue.qsize())
async def apply_filters(self, blocked_streams, blocked_channels, filtered_streams, filtered_channels): async def apply_filters(self, blocked_streams, blocked_channels, filtered_streams, filtered_channels):
def make_query(censor_type, blockdict, channels=False): def make_query(censor_type, blockdict, channels=False):