From 24d11de5a76fe1b7b2366ede082673fee1d7a961 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 11 Feb 2021 23:10:30 -0500 Subject: [PATCH] torba-elastic-sync --- docker/wallet_server_entrypoint.sh | 2 + .../wallet/server/db/elastic_sync.py | 44 +++++++++++++++---- setup.py | 1 + 3 files changed, 39 insertions(+), 8 deletions(-) rename scripts/sync.py => lbry/wallet/server/db/elastic_sync.py (76%) diff --git a/docker/wallet_server_entrypoint.sh b/docker/wallet_server_entrypoint.sh index 8bcbd8a96..b33ff87a7 100755 --- a/docker/wallet_server_entrypoint.sh +++ b/docker/wallet_server_entrypoint.sh @@ -20,4 +20,6 @@ if [[ -n "$SNAPSHOT_URL" ]] && [[ ! -f /database/claims.db ]]; then rm "$filename" fi +/home/lbry/.local/bin/torba-elastic-sync /database/claims.db +echo 'starting server' /home/lbry/.local/bin/torba-server "$@" diff --git a/scripts/sync.py b/lbry/wallet/server/db/elastic_sync.py similarity index 76% rename from scripts/sync.py rename to lbry/wallet/server/db/elastic_sync.py index 109e79cbb..fd3d17a20 100644 --- a/scripts/sync.py +++ b/lbry/wallet/server/db/elastic_sync.py @@ -41,9 +41,25 @@ WHERE claim.height % {shards_total} = {shard_num} async def consume(producer): es = AsyncElasticsearch() - await async_bulk(es, producer, request_timeout=120) - await es.indices.refresh(index=INDEX) - await es.close() + try: + await async_bulk(es, producer, request_timeout=120) + await es.indices.refresh(index=INDEX) + finally: + await es.close() + + +async def make_es_index(): + es = AsyncElasticsearch() + try: + if await es.indices.exists(index=INDEX): + print("already synced ES") + return 1 + index = SearchIndex('') + await index.start() + await index.stop() + return 0 + finally: + await es.close() async def run(args, shard): @@ -53,26 +69,38 @@ async def run(args, shard): index = SearchIndex('') await index.start() await index.stop() + producer = get_all(db.cursor(), shard, args.clients) await asyncio.gather(*(consume(producer) for _ in range(min(8, args.clients)))) + def __run(args, shard): asyncio.run(run(args, shard)) -def main(): +def __make_index(): + return asyncio.run(make_es_index()) + + +def run_elastic_sync(): parser = argparse.ArgumentParser() parser.add_argument("db_path", type=str) parser.add_argument("-c", "--clients", type=int, default=16) args = parser.parse_args() processes = [] + + init_proc = Process(target=__make_index, args=()) + init_proc.start() + init_proc.join() + exitcode = init_proc.exitcode + init_proc.close() + if exitcode: + print("ES is already initialized") + return + print("bulk-loading ES") for i in range(args.clients): processes.append(Process(target=__run, args=(args, i))) processes[-1].start() for process in processes: process.join() process.close() - - -if __name__ == '__main__': - main() diff --git a/setup.py b/setup.py index 59af5be45..56a42c7e4 100644 --- a/setup.py +++ b/setup.py @@ -30,6 +30,7 @@ setup( 'lbrynet=lbry.extras.cli:main', 'torba-server=lbry.wallet.server.cli:main', 'orchstr8=lbry.wallet.orchstr8.cli:main', + 'torba-elastic-sync=lbry.wallet.server.db.elastic_sync:run_elastic_sync' ], }, install_requires=[