forked from LBRYCommunity/lbry-sdk
fix es migration bug, expand test case
This commit is contained in:
parent
43a1385b79
commit
cc02a0efc2
3 changed files with 17 additions and 7 deletions
|
@ -68,9 +68,9 @@ class SearchIndex:
|
||||||
self.index, body={'version': version, 'index_patterns': ['ignored']}, ignore=400
|
self.index, body={'version': version, 'index_patterns': ['ignored']}, ignore=400
|
||||||
)
|
)
|
||||||
|
|
||||||
async def start(self):
|
async def start(self) -> bool:
|
||||||
if self.sync_client:
|
if self.sync_client:
|
||||||
return
|
return False
|
||||||
hosts = [{'host': self._elastic_host, 'port': self._elastic_port}]
|
hosts = [{'host': self._elastic_host, 'port': self._elastic_port}]
|
||||||
self.sync_client = AsyncElasticsearch(hosts, timeout=self.sync_timeout)
|
self.sync_client = AsyncElasticsearch(hosts, timeout=self.sync_timeout)
|
||||||
self.search_client = AsyncElasticsearch(hosts, timeout=self.search_timeout)
|
self.search_client = AsyncElasticsearch(hosts, timeout=self.search_timeout)
|
||||||
|
|
|
@ -53,7 +53,7 @@ async def consume(producer, index_name):
|
||||||
es = AsyncElasticsearch([{'host': env.elastic_host, 'port': env.elastic_port}])
|
es = AsyncElasticsearch([{'host': env.elastic_host, 'port': env.elastic_port}])
|
||||||
try:
|
try:
|
||||||
await async_bulk(es, producer, request_timeout=120)
|
await async_bulk(es, producer, request_timeout=120)
|
||||||
print(await es.indices.refresh(index=index_name))
|
await es.indices.refresh(index=index_name)
|
||||||
finally:
|
finally:
|
||||||
await es.close()
|
await es.close()
|
||||||
|
|
||||||
|
@ -70,6 +70,7 @@ async def make_es_index(index=None):
|
||||||
"dropping ES search index (version %s) for upgrade to version %s", err.got_version, err.expected_version
|
"dropping ES search index (version %s) for upgrade to version %s", err.got_version, err.expected_version
|
||||||
)
|
)
|
||||||
await index.delete_index()
|
await index.delete_index()
|
||||||
|
await index.stop()
|
||||||
return await index.start()
|
return await index.start()
|
||||||
finally:
|
finally:
|
||||||
index.stop()
|
index.stop()
|
||||||
|
|
|
@ -88,7 +88,6 @@ class TestUsagePayment(CommandTestCase):
|
||||||
|
|
||||||
|
|
||||||
class TestESSync(CommandTestCase):
|
class TestESSync(CommandTestCase):
|
||||||
VERBOSITY = 'DEBUG'
|
|
||||||
async def test_es_sync_utility(self):
|
async def test_es_sync_utility(self):
|
||||||
for i in range(10):
|
for i in range(10):
|
||||||
await self.stream_create(f"stream{i}", bid='0.001')
|
await self.stream_create(f"stream{i}", bid='0.001')
|
||||||
|
@ -100,7 +99,17 @@ class TestESSync(CommandTestCase):
|
||||||
self.assertEqual(0, len(await self.claim_search(order_by=['height'])))
|
self.assertEqual(0, len(await self.claim_search(order_by=['height'])))
|
||||||
await db.search_index.stop()
|
await db.search_index.stop()
|
||||||
self.assertTrue(await make_es_index(db.search_index))
|
self.assertTrue(await make_es_index(db.search_index))
|
||||||
|
|
||||||
|
async def resync():
|
||||||
await db.search_index.start()
|
await db.search_index.start()
|
||||||
db.search_index.clear_caches()
|
db.search_index.clear_caches()
|
||||||
await run_sync(db.sql._db_path, 1, 0, 0, index_name=db.search_index.index)
|
await run_sync(db.sql._db_path, 1, 0, 0, index_name=db.search_index.index)
|
||||||
self.assertEqual(10, len(await self.claim_search(order_by=['height'])))
|
self.assertEqual(10, len(await self.claim_search(order_by=['height'])))
|
||||||
|
await resync()
|
||||||
|
|
||||||
|
# this time we will test a migration from unversioned to v1
|
||||||
|
await db.search_index.sync_client.indices.delete_template(db.search_index.index)
|
||||||
|
await db.search_index.stop()
|
||||||
|
self.assertTrue(await make_es_index(db.search_index))
|
||||||
|
await db.search_index.start()
|
||||||
|
await resync()
|
||||||
|
|
Loading…
Reference in a new issue