2021-01-20 01:41:54 -03:00
|
|
|
import argparse
|
|
|
|
import asyncio
|
2021-02-12 14:41:03 -03:00
|
|
|
import logging
|
2021-01-20 01:41:54 -03:00
|
|
|
from elasticsearch import AsyncElasticsearch
|
|
|
|
from elasticsearch.helpers import async_bulk
|
2021-03-29 13:16:49 -04:00
|
|
|
from lbry.wallet.server.env import Env
|
|
|
|
from lbry.wallet.server.coin import LBC
|
2021-06-17 21:22:23 -04:00
|
|
|
from lbry.wallet.server.leveldb import LevelDB
|
2021-05-07 12:42:52 -04:00
|
|
|
from lbry.wallet.server.db.elasticsearch.search import extract_doc, SearchIndex, IndexVersionMismatch
|
2021-01-20 01:41:54 -03:00
|
|
|
|
|
|
|
|
2021-06-17 21:22:23 -04:00
|
|
|
async def get_all_claims(index_name='claims', db=None):
    """Asynchronously yield one search document per claim in the database.

    Args:
        index_name: Forwarded to ``extract_doc`` together with each claim.
        db: A database object supplied by the caller; it is used as-is and
            is neither opened nor closed here. When ``None``, a ``LevelDB``
            is built from the environment, opened before iteration and
            closed once iteration ends (normally or through an error).

    Yields:
        ``extract_doc(claim, index_name)`` for every claim produced by
        ``db.all_claims_producer()``.
    """
    env = Env(LBC)
    # Remember whether the handle is ours, so that a caller-supplied db is
    # left exactly as it was handed in.
    owns_db = db is None
    if not db:
        db = LevelDB(env)
    if owns_db:
        await db.open_dbs()

    cnt = 0
    try:
        async for claim in db.all_claims_producer():
            yield extract_doc(claim, index_name)
            cnt += 1
            # Progress report on stdout every 10000 documents.
            if not cnt % 10000:
                print(f"{cnt} claims sent")
    finally:
        if owns_db:
            db.close()
|
2021-02-11 23:10:30 -05:00
|
|
|
|
|
|
|
|
2021-05-11 21:38:05 -03:00
|
|
|
async def make_es_index(index=None):
    """Start the Elasticsearch search index, recreating it on a version mismatch.

    Args:
        index: Search index object exposing ``start()``, ``stop()`` and
            ``delete_index()``. When ``None``, a ``SearchIndex`` is built
            from the host and port found in the environment.

    Returns:
        Whatever ``index.start()`` returns. The command-line caller treats a
        falsy result as "ES is already initialized" (presumably ``start()``
        is truthy only when the index had to be created; confirm against
        ``SearchIndex.start``).

    Raises:
        Any exception from ``index.start()`` other than
        ``IndexVersionMismatch`` propagates after the index is stopped.
    """
    if index is None:
        # The environment is only needed to build the default index, so it
        # is not read when the caller supplies one.
        env = Env(LBC)
        index = SearchIndex('', elastic_host=env.elastic_host, elastic_port=env.elastic_port)

    try:
        return await index.start()
    except IndexVersionMismatch as err:
        logging.info(
            "dropping ES search index (version %s) for upgrade to version %s", err.got_version, err.expected_version
        )
        # Drop the outdated index and restart from a clean state.
        await index.delete_index()
        await index.stop()
        return await index.start()
    finally:
        # stop() is awaited in the mismatch branch above, so it returns an
        # awaitable; await it here as well so the shutdown actually finishes
        # before the caller's event loop goes away.
        await index.stop()
|
2021-01-24 23:19:28 -03:00
|
|
|
|
|
|
|
|
2021-07-21 12:54:10 -04:00
|
|
|
async def run_sync(index_name='claims', db=None, clients=32):
    """Bulk-upload every claim document to Elasticsearch, then refresh the index.

    Args:
        index_name: Index that receives the documents and is refreshed
            after the upload.
        db: Optional database object forwarded to ``get_all_claims``.
        clients: Number of ``async_bulk`` calls run concurrently.
    """
    env = Env(LBC)
    logging.info("ES sync host: %s:%i", env.elastic_host, env.elastic_port)
    es = AsyncElasticsearch([{'host': env.elastic_host, 'port': env.elastic_port}])
    claim_generator = get_all_claims(index_name=index_name, db=db)

    try:
        # NOTE(review): every uploader pulls from the same async generator;
        # confirm that concurrent consumption is safe with async_bulk.
        uploads = [
            async_bulk(es, claim_generator, request_timeout=600)
            for _ in range(clients)
        ]
        await asyncio.gather(*uploads)
        await es.indices.refresh(index=index_name)
    finally:
        # Close the client connection whether or not the upload succeeded.
        await es.close()
|
2021-01-27 01:43:06 -03:00
|
|
|
|
2021-02-11 23:10:30 -05:00
|
|
|
|
|
|
|
def run_elastic_sync():
    """Command-line entry point: prepare the ES index and sync all claims.

    Options:
        -c / --clients: number of concurrent bulk uploaders (default 32).
        -b / --blocks:  accepted, but not read anywhere in this function.
        -f / --force:   skip the index check and always run the sync.

    Without ``--force`` the sync only runs when ``make_es_index()`` returns
    a truthy value; otherwise "ES is already initialized" is logged and the
    function returns.
    """
    logging.basicConfig(level=logging.INFO)
    # Keep the HTTP and ES client libraries from flooding the INFO log.
    for noisy in ('aiohttp', 'elasticsearch'):
        logging.getLogger(noisy).setLevel(logging.WARNING)

    logging.info('lbry.server starting')

    parser = argparse.ArgumentParser(prog="lbry-hub-elastic-sync")
    parser.add_argument("-c", "--clients", type=int, default=32)
    parser.add_argument("-b", "--blocks", type=int, default=0)
    parser.add_argument("-f", "--force", default=False, action='store_true')
    args = parser.parse_args()

    if not args.force:
        index_ready = asyncio.run(make_es_index())
        if not index_ready:
            logging.info("ES is already initialized")
            return

    asyncio.run(run_sync(clients=args.clients))
|