diff --git a/lbry/wallet/server/db/elasticsearch/search.py b/lbry/wallet/server/db/elasticsearch/search.py index e03d4cbec..6f9820e54 100644 --- a/lbry/wallet/server/db/elasticsearch/search.py +++ b/lbry/wallet/server/db/elasticsearch/search.py @@ -33,7 +33,15 @@ class StreamResolution(str): return LookupError(f'Could not find claim at "{url}".') +class IndexVersionMismatch(Exception): + def __init__(self, got_version, expected_version): + self.got_version = got_version + self.expected_version = expected_version + + class SearchIndex: + VERSION = 1 + def __init__(self, index_prefix: str, search_timeout=3.0, elastic_host='localhost', elastic_port=9200): self.search_timeout = search_timeout self.sync_timeout = 600 # wont hit that 99% of the time, but can hit on a fresh import @@ -48,6 +56,18 @@ class SearchIndex: self._elastic_host = elastic_host self._elastic_port = elastic_port + async def get_index_version(self) -> int: + try: + template = await self.sync_client.indices.get_template(self.index) + return template[self.index]['version'] + except NotFoundError: + return 0 + + async def set_index_version(self, version): + await self.sync_client.indices.put_template( + self.index, body={'version': version, 'index_patterns': ['ignored']}, ignore=400 + ) + async def start(self): if self.sync_client: return @@ -61,8 +81,17 @@ class SearchIndex: except ConnectionError: self.logger.warning("Failed to connect to Elasticsearch. Waiting for it!") await asyncio.sleep(1) + res = await self.sync_client.indices.create(self.index, INDEX_DEFAULT_SETTINGS, ignore=400) - return res.get('acknowledged', False) + acked = res.get('acknowledged', False) + if acked: + await self.set_index_version(self.VERSION) + return acked + index_version = await self.get_index_version() + if index_version != self.VERSION: + self.logger.error("es search index has an incompatible version: %s vs %s", index_version, self.VERSION) + raise IndexVersionMismatch(f"{index_version} vs {self.VERSION}") + return acked def stop(self): clients = [self.sync_client, self.search_client] diff --git a/lbry/wallet/server/db/elasticsearch/sync.py b/lbry/wallet/server/db/elasticsearch/sync.py index bc716021a..2ca7644f6 100644 --- a/lbry/wallet/server/db/elasticsearch/sync.py +++ b/lbry/wallet/server/db/elasticsearch/sync.py @@ -10,7 +10,7 @@ from elasticsearch import AsyncElasticsearch from elasticsearch.helpers import async_bulk from lbry.wallet.server.env import Env from lbry.wallet.server.coin import LBC -from lbry.wallet.server.db.elasticsearch.search import extract_doc, SearchIndex +from lbry.wallet.server.db.elasticsearch.search import extract_doc, SearchIndex, IndexVersionMismatch INDEX = 'claims' @@ -63,8 +63,15 @@ async def consume(producer): async def make_es_index(): env = Env(LBC) index = SearchIndex('', elastic_host=env.elastic_host, elastic_port=env.elastic_port) + try: return await index.start() + except IndexVersionMismatch as err: + logging.info( + "dropping ES search index (version %s) for upgrade to version %s", err.got_version, err.expected_version + ) + await index.delete_index() + return await index.start() finally: index.stop()