faster es sync

This commit is contained in:
Jack Robison 2021-07-21 12:54:10 -04:00 committed by Victor Shyba
parent fb5c008fc5
commit 7ad2234983

View file

@ -45,21 +45,18 @@ async def make_es_index(index=None):
index.stop() index.stop()
async def run_sync(index_name='claims', db=None): async def run_sync(index_name='claims', db=None, clients=32):
env = Env(LBC) env = Env(LBC)
logging.info("ES sync host: %s:%i", env.elastic_host, env.elastic_port) logging.info("ES sync host: %s:%i", env.elastic_host, env.elastic_port)
es = AsyncElasticsearch([{'host': env.elastic_host, 'port': env.elastic_port}]) es = AsyncElasticsearch([{'host': env.elastic_host, 'port': env.elastic_port}])
claim_generator = get_all_claims(index_name=index_name, db=db)
try: try:
await async_bulk(es, get_all_claims(index_name=index_name, db=db), request_timeout=120) await asyncio.gather(*(async_bulk(es, claim_generator, request_timeout=600) for _ in range(clients)))
await es.indices.refresh(index=index_name) await es.indices.refresh(index=index_name)
finally: finally:
await es.close() await es.close()
def __run(args, shard):
asyncio.run(run_sync())
def run_elastic_sync(): def run_elastic_sync():
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.INFO)
logging.getLogger('aiohttp').setLevel(logging.WARNING) logging.getLogger('aiohttp').setLevel(logging.WARNING)
@ -68,7 +65,7 @@ def run_elastic_sync():
logging.info('lbry.server starting') logging.info('lbry.server starting')
parser = argparse.ArgumentParser(prog="lbry-hub-elastic-sync") parser = argparse.ArgumentParser(prog="lbry-hub-elastic-sync")
# parser.add_argument("db_path", type=str) # parser.add_argument("db_path", type=str)
parser.add_argument("-c", "--clients", type=int, default=16) parser.add_argument("-c", "--clients", type=int, default=32)
parser.add_argument("-b", "--blocks", type=int, default=0) parser.add_argument("-b", "--blocks", type=int, default=0)
parser.add_argument("-f", "--force", default=False, action='store_true') parser.add_argument("-f", "--force", default=False, action='store_true')
args = parser.parse_args() args = parser.parse_args()
@ -80,4 +77,4 @@ def run_elastic_sync():
if not args.force and not asyncio.run(make_es_index()): if not args.force and not asyncio.run(make_es_index()):
logging.info("ES is already initialized") logging.info("ES is already initialized")
return return
asyncio.run(run_sync()) asyncio.run(run_sync(clients=args.clients))