From 67817005b536341cff919766d3456b9af4b1bd20 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Fri, 12 Feb 2021 14:41:03 -0300 Subject: [PATCH] check ES synced without a process and wait for ES --- lbry/wallet/server/db/elastic_search.py | 3 +- lbry/wallet/server/db/elastic_sync.py | 39 +++++++++---------------- 2 files changed, 16 insertions(+), 26 deletions(-) diff --git a/lbry/wallet/server/db/elastic_search.py b/lbry/wallet/server/db/elastic_search.py index 50b04ba9b..c0d6783f2 100644 --- a/lbry/wallet/server/db/elastic_search.py +++ b/lbry/wallet/server/db/elastic_search.py @@ -35,7 +35,7 @@ class SearchIndex: except ConnectionError: self.logger.warning("Failed to connect to Elasticsearch. Waiting for it!") await asyncio.sleep(1) - await self.client.indices.create( + res = await self.client.indices.create( self.index, { "settings": @@ -70,6 +70,7 @@ class SearchIndex: } }, ignore=400 ) + return res.get('acknowledged', False) def stop(self): client = self.client diff --git a/lbry/wallet/server/db/elastic_sync.py b/lbry/wallet/server/db/elastic_sync.py index fd3d17a20..21cf0b353 100644 --- a/lbry/wallet/server/db/elastic_sync.py +++ b/lbry/wallet/server/db/elastic_sync.py @@ -1,5 +1,6 @@ import argparse import asyncio +import logging from collections import namedtuple from multiprocessing import Process @@ -13,6 +14,7 @@ INDEX = 'claims' async def get_all(db, shard_num, shards_total): + logging.info("shard %d starting", shard_num) def exec_factory(cursor, statement, bindings): tpl = namedtuple('row', (d[0] for d in cursor.getdescription())) cursor.setrowtrace(lambda cursor, row: tpl(*row)) @@ -35,7 +37,7 @@ WHERE claim.height % {shards_total} = {shard_num} claim['tags'] = claim['tags'].split(',,') if claim['tags'] else [] claim['languages'] = claim['languages'].split(' ') if claim['languages'] else [] if num % 10_000 == 0: - print(num, total) + logging.info("%d/%d", num, total) yield extract_doc(claim, INDEX) @@ -49,26 +51,21 @@ async def consume(producer): 
async def make_es_index(): - es = AsyncElasticsearch() + index = SearchIndex('') try: - if await es.indices.exists(index=INDEX): - print("already synced ES") - return 1 - index = SearchIndex('') - await index.start() - await index.stop() - return 0 + return await index.start() finally: - await es.close() + index.stop() async def run(args, shard): + def itsbusy(*_): + logging.info("shard %d: db is busy, retry", shard) + return True db = apsw.Connection(args.db_path, flags=apsw.SQLITE_OPEN_READONLY | apsw.SQLITE_OPEN_URI) + db.setbusyhandler(itsbusy) db.cursor().execute('pragma journal_mode=wal;') db.cursor().execute('pragma temp_store=memory;') - index = SearchIndex('') - await index.start() - await index.stop() producer = get_all(db.cursor(), shard, args.clients) await asyncio.gather(*(consume(producer) for _ in range(min(8, args.clients)))) @@ -78,26 +75,18 @@ def __run(args, shard): asyncio.run(run(args, shard)) -def __make_index(): - return asyncio.run(make_es_index()) - - def run_elastic_sync(): + logging.basicConfig(level=logging.INFO) + logging.info('lbry.server starting') parser = argparse.ArgumentParser() parser.add_argument("db_path", type=str) parser.add_argument("-c", "--clients", type=int, default=16) args = parser.parse_args() processes = [] - init_proc = Process(target=__make_index, args=()) - init_proc.start() - init_proc.join() - exitcode = init_proc.exitcode - init_proc.close() - if exitcode: - print("ES is already initialized") + if not asyncio.run(make_es_index()): + logging.info("ES is already initialized") return - print("bulk-loading ES") for i in range(args.clients): processes.append(Process(target=__run, args=(args, i))) processes[-1].start()