diff --git a/lbry/wallet/server/db/elasticsearch/search.py b/lbry/wallet/server/db/elasticsearch/search.py index 302512082..02ed4d741 100644 --- a/lbry/wallet/server/db/elasticsearch/search.py +++ b/lbry/wallet/server/db/elasticsearch/search.py @@ -68,9 +68,9 @@ class SearchIndex: self.index, body={'version': version, 'index_patterns': ['ignored']}, ignore=400 ) - async def start(self): + async def start(self) -> bool: if self.sync_client: - return + return False hosts = [{'host': self._elastic_host, 'port': self._elastic_port}] self.sync_client = AsyncElasticsearch(hosts, timeout=self.sync_timeout) self.search_client = AsyncElasticsearch(hosts, timeout=self.search_timeout) diff --git a/lbry/wallet/server/db/elasticsearch/sync.py b/lbry/wallet/server/db/elasticsearch/sync.py index 8e8134e09..05d95dacd 100644 --- a/lbry/wallet/server/db/elasticsearch/sync.py +++ b/lbry/wallet/server/db/elasticsearch/sync.py @@ -53,7 +53,7 @@ async def consume(producer, index_name): es = AsyncElasticsearch([{'host': env.elastic_host, 'port': env.elastic_port}]) try: await async_bulk(es, producer, request_timeout=120) - print(await es.indices.refresh(index=index_name)) + await es.indices.refresh(index=index_name) finally: await es.close() @@ -70,6 +70,7 @@ async def make_es_index(index=None): "dropping ES search index (version %s) for upgrade to version %s", err.got_version, err.expected_version ) await index.delete_index() + await index.stop() return await index.start() finally: index.stop() diff --git a/tests/integration/blockchain/test_wallet_server_sessions.py b/tests/integration/blockchain/test_wallet_server_sessions.py index f4b3db185..a846b3178 100644 --- a/tests/integration/blockchain/test_wallet_server_sessions.py +++ b/tests/integration/blockchain/test_wallet_server_sessions.py @@ -88,7 +88,6 @@ class TestUsagePayment(CommandTestCase): class TestESSync(CommandTestCase): - VERBOSITY = 'DEBUG' async def test_es_sync_utility(self): for i in range(10): await self.stream_create(f"stream{i}", bid='0.001') @@ -100,7 +99,17 @@ class TestESSync(CommandTestCase): self.assertEqual(0, len(await self.claim_search(order_by=['height']))) await db.search_index.stop() self.assertTrue(await make_es_index(db.search_index)) + + async def resync(): + await db.search_index.start() + db.search_index.clear_caches() + await run_sync(db.sql._db_path, 1, 0, 0, index_name=db.search_index.index) + self.assertEqual(10, len(await self.claim_search(order_by=['height']))) + await resync() + + # this time we will test a migration from unversioned to v1 + await db.search_index.sync_client.indices.delete_template(db.search_index.index) + await db.search_index.stop() + self.assertTrue(await make_es_index(db.search_index)) await db.search_index.start() - db.search_index.clear_caches() - await run_sync(db.sql._db_path, 1, 0, 0, index_name=db.search_index.index) - self.assertEqual(10, len(await self.claim_search(order_by=['height']))) + await resync()