From 43a1385b79126ae9b2477e5a00027b43a9758d2c Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Tue, 11 May 2021 21:38:05 -0300 Subject: [PATCH 1/3] test sync helper --- lbry/wallet/server/db/elasticsearch/search.py | 3 +++ lbry/wallet/server/db/elasticsearch/sync.py | 25 +++++++++---------- .../blockchain/test_wallet_server_sessions.py | 23 ++++++++++++++--- 3 files changed, 35 insertions(+), 16 deletions(-) diff --git a/lbry/wallet/server/db/elasticsearch/search.py b/lbry/wallet/server/db/elasticsearch/search.py index 6e79cb39e..302512082 100644 --- a/lbry/wallet/server/db/elasticsearch/search.py +++ b/lbry/wallet/server/db/elasticsearch/search.py @@ -162,6 +162,9 @@ class SearchIndex: await self.sync_client.update_by_query( self.index, body=self.update_filter_query(Censor.RESOLVE, blocked_channels, True), slices=4) await self.sync_client.indices.refresh(self.index) + self.clear_caches() + + def clear_caches(self): self.search_cache.clear() self.short_id_cache.clear() self.claim_cache.clear() diff --git a/lbry/wallet/server/db/elasticsearch/sync.py b/lbry/wallet/server/db/elasticsearch/sync.py index 2ca7644f6..8e8134e09 100644 --- a/lbry/wallet/server/db/elasticsearch/sync.py +++ b/lbry/wallet/server/db/elasticsearch/sync.py @@ -12,10 +12,8 @@ from lbry.wallet.server.env import Env from lbry.wallet.server.coin import LBC from lbry.wallet.server.db.elasticsearch.search import extract_doc, SearchIndex, IndexVersionMismatch -INDEX = 'claims' - -async def get_all(db, shard_num, shards_total, limit=0): +async def get_all(db, shard_num, shards_total, limit=0, index_name='claims'): logging.info("shard %d starting", shard_num) def exec_factory(cursor, statement, bindings): tpl = namedtuple('row', (d[0] for d in cursor.getdescription())) @@ -44,25 +42,26 @@ ORDER BY claim.height desc claim['languages'] = claim['languages'].split(' ') if claim['languages'] else [] if num % 10_000 == 0: logging.info("%d/%d", num, total) - yield extract_doc(claim, INDEX) + yield extract_doc(claim, index_name) if 0 < limit <= num: break -async def consume(producer): +async def consume(producer, index_name): env = Env(LBC) logging.info("ES sync host: %s:%i", env.elastic_host, env.elastic_port) es = AsyncElasticsearch([{'host': env.elastic_host, 'port': env.elastic_port}]) try: await async_bulk(es, producer, request_timeout=120) - await es.indices.refresh(index=INDEX) + print(await es.indices.refresh(index=index_name)) finally: await es.close() -async def make_es_index(): +async def make_es_index(index=None): env = Env(LBC) - index = SearchIndex('', elastic_host=env.elastic_host, elastic_port=env.elastic_port) + if index is None: + index = SearchIndex('', elastic_host=env.elastic_host, elastic_port=env.elastic_port) try: return await index.start() @@ -76,21 +75,21 @@ async def make_es_index(): index.stop() -async def run(args, shard): +async def run(db_path, clients, blocks, shard, index_name='claims'): def itsbusy(*_): logging.info("shard %d: db is busy, retry", shard) return True - db = apsw.Connection(args.db_path, flags=apsw.SQLITE_OPEN_READONLY | apsw.SQLITE_OPEN_URI) + db = apsw.Connection(db_path, flags=apsw.SQLITE_OPEN_READONLY | apsw.SQLITE_OPEN_URI) db.setbusyhandler(itsbusy) db.cursor().execute('pragma journal_mode=wal;') db.cursor().execute('pragma temp_store=memory;') - producer = get_all(db.cursor(), shard, args.clients, limit=args.blocks) - await asyncio.gather(*(consume(producer) for _ in range(min(8, args.clients)))) + producer = get_all(db.cursor(), shard, clients, limit=blocks, index_name=index_name) + await asyncio.gather(*(consume(producer, index_name=index_name) for _ in range(min(8, clients)))) def __run(args, shard): - asyncio.run(run(args, shard)) + asyncio.run(run(args.db_path, args.clients, args.blocks, shard)) def run_elastic_sync(): diff --git a/tests/integration/blockchain/test_wallet_server_sessions.py b/tests/integration/blockchain/test_wallet_server_sessions.py index b0a770558..f4b3db185 100644 --- a/tests/integration/blockchain/test_wallet_server_sessions.py +++ b/tests/integration/blockchain/test_wallet_server_sessions.py @@ -4,6 +4,7 @@ import lbry import lbry.wallet from lbry.error import ServerPaymentFeeAboveMaxAllowedError from lbry.wallet.network import ClientSession +from lbry.wallet.server.db.elasticsearch.sync import run as run_sync, make_es_index from lbry.wallet.server.session import LBRYElectrumX from lbry.testcase import IntegrationTestCase, CommandTestCase from lbry.wallet.orchstr8.node import SPVNode @@ -13,9 +14,6 @@ class TestSessions(IntegrationTestCase): """ Tests that server cleans up stale connections after session timeout and client times out too. """ - - LEDGER = lbry.wallet - async def test_session_bloat_from_socket_timeout(self): await self.conductor.stop_spv() await self.ledger.stop() @@ -87,3 +85,22 @@ class TestUsagePayment(CommandTestCase): self.assertIsNotNone(await self.blockchain.get_raw_transaction(tx.id)) # verify its broadcasted self.assertEqual(tx.outputs[0].amount, 100000000) self.assertEqual(tx.outputs[0].get_address(self.ledger), address) + + +class TestESSync(CommandTestCase): + VERBOSITY = 'DEBUG' + async def test_es_sync_utility(self): + for i in range(10): + await self.stream_create(f"stream{i}", bid='0.001') + await self.generate(1) + self.assertEqual(10, len(await self.claim_search(order_by=['height']))) + db = self.conductor.spv_node.server.db + await db.search_index.delete_index() + db.search_index.clear_caches() + self.assertEqual(0, len(await self.claim_search(order_by=['height']))) + await db.search_index.stop() + self.assertTrue(await make_es_index(db.search_index)) + await db.search_index.start() + db.search_index.clear_caches() + await run_sync(db.sql._db_path, 1, 0, 0, index_name=db.search_index.index) + self.assertEqual(10, len(await self.claim_search(order_by=['height']))) From cc02a0efc26d94fe11511cbcd1940ca3f4ee02d1 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Wed, 12 May 2021 00:21:03 -0300 Subject: [PATCH 2/3] fix es migration bug, expand test case --- lbry/wallet/server/db/elasticsearch/search.py | 4 ++-- lbry/wallet/server/db/elasticsearch/sync.py | 3 ++- .../blockchain/test_wallet_server_sessions.py | 17 +++++++++++++---- 3 files changed, 17 insertions(+), 7 deletions(-) diff --git a/lbry/wallet/server/db/elasticsearch/search.py b/lbry/wallet/server/db/elasticsearch/search.py index 302512082..02ed4d741 100644 --- a/lbry/wallet/server/db/elasticsearch/search.py +++ b/lbry/wallet/server/db/elasticsearch/search.py @@ -68,9 +68,9 @@ class SearchIndex: self.index, body={'version': version, 'index_patterns': ['ignored']}, ignore=400 ) - async def start(self): + async def start(self) -> bool: if self.sync_client: - return + return False hosts = [{'host': self._elastic_host, 'port': self._elastic_port}] self.sync_client = AsyncElasticsearch(hosts, timeout=self.sync_timeout) self.search_client = AsyncElasticsearch(hosts, timeout=self.search_timeout) diff --git a/lbry/wallet/server/db/elasticsearch/sync.py b/lbry/wallet/server/db/elasticsearch/sync.py index 8e8134e09..05d95dacd 100644 --- a/lbry/wallet/server/db/elasticsearch/sync.py +++ b/lbry/wallet/server/db/elasticsearch/sync.py @@ -53,7 +53,7 @@ async def consume(producer, index_name): es = AsyncElasticsearch([{'host': env.elastic_host, 'port': env.elastic_port}]) try: await async_bulk(es, producer, request_timeout=120) - print(await es.indices.refresh(index=index_name)) + await es.indices.refresh(index=index_name) finally: await es.close() @@ -70,6 +70,7 @@ async def make_es_index(index=None): "dropping ES search index (version %s) for upgrade to version %s", err.got_version, err.expected_version ) await index.delete_index() + await index.stop() return await index.start() finally: index.stop() diff --git a/tests/integration/blockchain/test_wallet_server_sessions.py b/tests/integration/blockchain/test_wallet_server_sessions.py index f4b3db185..a846b3178 100644 --- a/tests/integration/blockchain/test_wallet_server_sessions.py +++ b/tests/integration/blockchain/test_wallet_server_sessions.py @@ -88,7 +88,6 @@ class TestUsagePayment(CommandTestCase): class TestESSync(CommandTestCase): - VERBOSITY = 'DEBUG' async def test_es_sync_utility(self): for i in range(10): await self.stream_create(f"stream{i}", bid='0.001') @@ -100,7 +99,17 @@ class TestESSync(CommandTestCase): self.assertEqual(0, len(await self.claim_search(order_by=['height']))) await db.search_index.stop() self.assertTrue(await make_es_index(db.search_index)) + + async def resync(): + await db.search_index.start() + db.search_index.clear_caches() + await run_sync(db.sql._db_path, 1, 0, 0, index_name=db.search_index.index) + self.assertEqual(10, len(await self.claim_search(order_by=['height']))) + await resync() + + # this time we will test a migration from unversioned to v1 + await db.search_index.sync_client.indices.delete_template(db.search_index.index) + await db.search_index.stop() + self.assertTrue(await make_es_index(db.search_index)) await db.search_index.start() - db.search_index.clear_caches() - await run_sync(db.sql._db_path, 1, 0, 0, index_name=db.search_index.index) - self.assertEqual(10, len(await self.claim_search(order_by=['height']))) + await resync() From ccadd88af5073ce8abb8a70bae8d1b3c0ea04554 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Wed, 12 May 2021 04:40:43 -0300 Subject: [PATCH 3/3] fix cache call --- lbry/wallet/server/db/elasticsearch/search.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lbry/wallet/server/db/elasticsearch/search.py b/lbry/wallet/server/db/elasticsearch/search.py index 02ed4d741..ff9ccfcdb 100644 --- a/lbry/wallet/server/db/elasticsearch/search.py +++ b/lbry/wallet/server/db/elasticsearch/search.py @@ -162,7 +162,7 @@ class SearchIndex: await self.sync_client.update_by_query( self.index, body=self.update_filter_query(Censor.RESOLVE, blocked_channels, True), slices=4) await self.sync_client.indices.refresh(self.index) - self.clear_caches() + self.clear_caches() def clear_caches(self): self.search_cache.clear()