diff --git a/lbry/wallet/server/db/elasticsearch/sync.py b/lbry/wallet/server/db/elasticsearch/sync.py index 6a2c4113a..15e076de9 100644 --- a/lbry/wallet/server/db/elasticsearch/sync.py +++ b/lbry/wallet/server/db/elasticsearch/sync.py @@ -45,21 +45,18 @@ async def make_es_index(index=None): index.stop() -async def run_sync(index_name='claims', db=None): +async def run_sync(index_name='claims', db=None, clients=32): env = Env(LBC) logging.info("ES sync host: %s:%i", env.elastic_host, env.elastic_port) es = AsyncElasticsearch([{'host': env.elastic_host, 'port': env.elastic_port}]) + claim_generator = get_all_claims(index_name=index_name, db=db) try: - await async_bulk(es, get_all_claims(index_name=index_name, db=db), request_timeout=120) + await asyncio.gather(*(async_bulk(es, claim_generator, request_timeout=600) for _ in range(clients))) await es.indices.refresh(index=index_name) finally: await es.close() -def __run(args, shard): - asyncio.run(run_sync()) - - def run_elastic_sync(): logging.basicConfig(level=logging.INFO) logging.getLogger('aiohttp').setLevel(logging.WARNING) @@ -68,7 +65,7 @@ def run_elastic_sync(): logging.info('lbry.server starting') parser = argparse.ArgumentParser(prog="lbry-hub-elastic-sync") # parser.add_argument("db_path", type=str) - parser.add_argument("-c", "--clients", type=int, default=16) + parser.add_argument("-c", "--clients", type=int, default=32) parser.add_argument("-b", "--blocks", type=int, default=0) parser.add_argument("-f", "--force", default=False, action='store_true') args = parser.parse_args() @@ -80,4 +77,4 @@ def run_elastic_sync(): if not args.force and not asyncio.run(make_es_index()): logging.info("ES is already initialized") return - asyncio.run(run_sync()) + asyncio.run(run_sync(clients=args.clients))