improve sync script for no-downtime maintenance
parent a916c1f4ad
commit ec89bcac8e
1 changed file with 9 additions and 4 deletions

@@ -14,7 +14,7 @@ from lbry.wallet.server.db.elastic_search import extract_doc, SearchIndex
 INDEX = 'claims'
 
 
-async def get_all(db, shard_num, shards_total):
+async def get_all(db, shard_num, shards_total, limit=0):
     logging.info("shard %d starting", shard_num)
     def exec_factory(cursor, statement, bindings):
         tpl = namedtuple('row', (d[0] for d in cursor.getdescription()))
@@ -31,6 +31,7 @@ SELECT claimtrie.claim_hash as is_controlling,
        claim.*
 FROM claim LEFT JOIN claimtrie USING (claim_hash)
 WHERE claim.height % {shards_total} = {shard_num}
+ORDER BY claim.height desc
 """)):
         claim = dict(claim._asdict())
         claim['censor_type'] = 0
@@ -40,6 +41,8 @@ WHERE claim.height % {shards_total} = {shard_num}
         if num % 10_000 == 0:
             logging.info("%d/%d", num, total)
         yield extract_doc(claim, INDEX)
+        if 0 < limit <= num:
+            break
 
 
 async def consume(producer):
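
Because the query now orders by claim.height desc, the new limit guard makes each shard yield only its newest claims before stopping, while limit=0 keeps the old unbounded behaviour. A minimal sketch of the chained-comparison cutoff, with a plain generator and hypothetical names standing in for the real producer:

# Sketch of the bounded-producer pattern added above; `rows` stands in
# for the database cursor and `bounded_producer` for get_all.
def bounded_producer(rows, limit=0):
    for num, row in enumerate(rows):
        yield row
        # limit=0 means "no limit"; otherwise stop once num reaches limit.
        if 0 < limit <= num:
            break

docs = list(bounded_producer(range(100), limit=10))
assert len(docs) == 11  # enumerate starts at 0, so limit=10 yields 11 rows
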
@@ -68,7 +71,7 @@ async def run(args, shard):
     db.cursor().execute('pragma journal_mode=wal;')
     db.cursor().execute('pragma temp_store=memory;')
 
-    producer = get_all(db.cursor(), shard, args.clients)
+    producer = get_all(db.cursor(), shard, args.clients, limit=args.blocks)
     await asyncio.gather(*(consume(producer) for _ in range(min(8, args.clients))))
 
 
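
In run, a single producer is shared by up to eight consume tasks, so the stream of documents is split among them rather than duplicated. A self-contained sketch of that fan-out, assuming a stand-in async generator and a sleep in place of the real Elasticsearch bulk call (consume's body is not shown in this diff):

import asyncio

async def rows(n):
    # Stand-in for the real get_all(...) async generator.
    for num in range(n):
        yield num

async def consume(producer):
    async for doc in producer:   # all tasks iterate one shared generator,
        await asyncio.sleep(0)   # so each document is consumed exactly once

async def main():
    producer = rows(100)
    await asyncio.gather(*(consume(producer) for _ in range(8)))

asyncio.run(main())
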
@@ -82,14 +85,16 @@ def run_elastic_sync():
     parser = argparse.ArgumentParser()
     parser.add_argument("db_path", type=str)
     parser.add_argument("-c", "--clients", type=int, default=16)
+    parser.add_argument("-b", "--blocks", type=int, default=0)
+    parser.add_argument("-f", "--force", default=False, action='store_true')
     args = parser.parse_args()
     processes = []
 
-    if not os.path.exists(args.db_path):
+    if not args.force and not os.path.exists(args.db_path):
         logging.info("DB path doesnt exist")
         return
 
-    if not asyncio.run(make_es_index()):
+    if not args.force and not asyncio.run(make_es_index()):
         logging.info("ES is already initialized")
         return
     for i in range(args.clients):
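
For the no-downtime maintenance named in the commit title, the two new flags work together: --blocks bounds how many of the newest claims each shard re-syncs, and --force bypasses the early returns so the script can run against a database and Elasticsearch index that already exist. A hypothetical invocation; the module path and DB path are assumptions, since the changed file is not named in this diff:

# Hypothetical usage sketch of the updated entry point.
import sys
from lbry.wallet.server.db.elastic_search.sync import run_elastic_sync  # assumed path

# Top up a live index with the ~1000 newest claims per shard, skipping
# the DB-path and fresh-index checks.
sys.argv = ["elastic-sync", "/var/lib/lbry/claims.db",
            "--clients", "16", "--blocks", "1000", "--force"]
run_elastic_sync()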