log indexing errors

This commit is contained in:
Victor Shyba 2021-02-22 20:47:56 -03:00
parent 19494088bd
commit d388527ffa

View file

@ -6,7 +6,7 @@ from operator import itemgetter
from typing import Optional, List, Iterable from typing import Optional, List, Iterable
from elasticsearch import AsyncElasticsearch, NotFoundError, ConnectionError from elasticsearch import AsyncElasticsearch, NotFoundError, ConnectionError
from elasticsearch.helpers import async_bulk from elasticsearch.helpers import async_streaming_bulk
from lbry.crypto.base58 import Base58 from lbry.crypto.base58 import Base58
from lbry.error import ResolveCensoredError from lbry.error import ResolveCensoredError
@ -91,7 +91,9 @@ class SearchIndex:
async def sync_queue(self, claim_queue): async def sync_queue(self, claim_queue):
self.logger.info("Writing to index from a queue with %d elements.", claim_queue.qsize()) self.logger.info("Writing to index from a queue with %d elements.", claim_queue.qsize())
await self.client.indices.refresh(self.index) await self.client.indices.refresh(self.index)
await async_bulk(self.client, self._queue_consumer_doc_producer(claim_queue)) async for ok, item in async_streaming_bulk(self.client, self._queue_consumer_doc_producer(claim_queue)):
if not ok:
self.logger.warning("indexing failed for an item: %s", item)
await self.client.indices.refresh(self.index) await self.client.indices.refresh(self.index)
await self.client.indices.flush(self.index) await self.client.indices.flush(self.index)
self.logger.info("Indexing done. Queue: %d elements", claim_queue.qsize()) self.logger.info("Indexing done. Queue: %d elements", claim_queue.qsize())