forked from LBRYCommunity/lbry-sdk
Merge pull request #3294 from lbryio/versioned-es-search-index
add versioning to ES search index and automate resync on version bumps
This commit is contained in:
commit
48c9e9f3cc
2 changed files with 38 additions and 2 deletions
|
@ -33,7 +33,15 @@ class StreamResolution(str):
|
||||||
return LookupError(f'Could not find claim at "{url}".')
|
return LookupError(f'Could not find claim at "{url}".')
|
||||||
|
|
||||||
|
|
||||||
|
class IndexVersionMismatch(Exception):
|
||||||
|
def __init__(self, got_version, expected_version):
|
||||||
|
self.got_version = got_version
|
||||||
|
self.expected_version = expected_version
|
||||||
|
|
||||||
|
|
||||||
class SearchIndex:
|
class SearchIndex:
|
||||||
|
VERSION = 1
|
||||||
|
|
||||||
def __init__(self, index_prefix: str, search_timeout=3.0, elastic_host='localhost', elastic_port=9200):
|
def __init__(self, index_prefix: str, search_timeout=3.0, elastic_host='localhost', elastic_port=9200):
|
||||||
self.search_timeout = search_timeout
|
self.search_timeout = search_timeout
|
||||||
self.sync_timeout = 600 # wont hit that 99% of the time, but can hit on a fresh import
|
self.sync_timeout = 600 # wont hit that 99% of the time, but can hit on a fresh import
|
||||||
|
@ -48,6 +56,18 @@ class SearchIndex:
|
||||||
self._elastic_host = elastic_host
|
self._elastic_host = elastic_host
|
||||||
self._elastic_port = elastic_port
|
self._elastic_port = elastic_port
|
||||||
|
|
||||||
|
async def get_index_version(self) -> int:
|
||||||
|
try:
|
||||||
|
template = await self.sync_client.indices.get_template(self.index)
|
||||||
|
return template[self.index]['version']
|
||||||
|
except NotFoundError:
|
||||||
|
return 0
|
||||||
|
|
||||||
|
async def set_index_version(self, version):
|
||||||
|
await self.sync_client.indices.put_template(
|
||||||
|
self.index, body={'version': version, 'index_patterns': ['ignored']}, ignore=400
|
||||||
|
)
|
||||||
|
|
||||||
async def start(self):
|
async def start(self):
|
||||||
if self.sync_client:
|
if self.sync_client:
|
||||||
return
|
return
|
||||||
|
@ -61,8 +81,17 @@ class SearchIndex:
|
||||||
except ConnectionError:
|
except ConnectionError:
|
||||||
self.logger.warning("Failed to connect to Elasticsearch. Waiting for it!")
|
self.logger.warning("Failed to connect to Elasticsearch. Waiting for it!")
|
||||||
await asyncio.sleep(1)
|
await asyncio.sleep(1)
|
||||||
|
|
||||||
res = await self.sync_client.indices.create(self.index, INDEX_DEFAULT_SETTINGS, ignore=400)
|
res = await self.sync_client.indices.create(self.index, INDEX_DEFAULT_SETTINGS, ignore=400)
|
||||||
return res.get('acknowledged', False)
|
acked = res.get('acknowledged', False)
|
||||||
|
if acked:
|
||||||
|
await self.set_index_version(self.VERSION)
|
||||||
|
return acked
|
||||||
|
index_version = await self.get_index_version()
|
||||||
|
if index_version != self.VERSION:
|
||||||
|
self.logger.error("es search index has an incompatible version: %s vs %s", index_version, self.VERSION)
|
||||||
|
raise IndexVersionMismatch(f"{index_version} vs {self.VERSION}")
|
||||||
|
return acked
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
clients = [self.sync_client, self.search_client]
|
clients = [self.sync_client, self.search_client]
|
||||||
|
|
|
@ -10,7 +10,7 @@ from elasticsearch import AsyncElasticsearch
|
||||||
from elasticsearch.helpers import async_bulk
|
from elasticsearch.helpers import async_bulk
|
||||||
from lbry.wallet.server.env import Env
|
from lbry.wallet.server.env import Env
|
||||||
from lbry.wallet.server.coin import LBC
|
from lbry.wallet.server.coin import LBC
|
||||||
from lbry.wallet.server.db.elasticsearch.search import extract_doc, SearchIndex
|
from lbry.wallet.server.db.elasticsearch.search import extract_doc, SearchIndex, IndexVersionMismatch
|
||||||
|
|
||||||
INDEX = 'claims'
|
INDEX = 'claims'
|
||||||
|
|
||||||
|
@ -63,8 +63,15 @@ async def consume(producer):
|
||||||
async def make_es_index():
|
async def make_es_index():
|
||||||
env = Env(LBC)
|
env = Env(LBC)
|
||||||
index = SearchIndex('', elastic_host=env.elastic_host, elastic_port=env.elastic_port)
|
index = SearchIndex('', elastic_host=env.elastic_host, elastic_port=env.elastic_port)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
return await index.start()
|
return await index.start()
|
||||||
|
except IndexVersionMismatch as err:
|
||||||
|
logging.info(
|
||||||
|
"dropping ES search index (version %s) for upgrade to version %s", err.got_version, err.expected_version
|
||||||
|
)
|
||||||
|
await index.delete_index()
|
||||||
|
return await index.start()
|
||||||
finally:
|
finally:
|
||||||
index.stop()
|
index.stop()
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue