From ec89bcac8e466df095fafefbaea30e5a7d658841 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Wed, 17 Feb 2021 01:09:12 -0300 Subject: [PATCH] improve sync script for no-downtime maintenance --- lbry/wallet/server/db/elastic_sync.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/lbry/wallet/server/db/elastic_sync.py b/lbry/wallet/server/db/elastic_sync.py index 9a49da2c9..b5ccce1bd 100644 --- a/lbry/wallet/server/db/elastic_sync.py +++ b/lbry/wallet/server/db/elastic_sync.py @@ -14,7 +14,7 @@ from lbry.wallet.server.db.elastic_search import extract_doc, SearchIndex INDEX = 'claims' -async def get_all(db, shard_num, shards_total): +async def get_all(db, shard_num, shards_total, limit=0): logging.info("shard %d starting", shard_num) def exec_factory(cursor, statement, bindings): tpl = namedtuple('row', (d[0] for d in cursor.getdescription())) @@ -31,6 +31,7 @@ SELECT claimtrie.claim_hash as is_controlling, claim.* FROM claim LEFT JOIN claimtrie USING (claim_hash) WHERE claim.height % {shards_total} = {shard_num} +ORDER BY claim.height desc """)): claim = dict(claim._asdict()) claim['censor_type'] = 0 @@ -40,6 +41,8 @@ WHERE claim.height % {shards_total} = {shard_num} if num % 10_000 == 0: logging.info("%d/%d", num, total) yield extract_doc(claim, INDEX) + if 0 < limit <= num: + break async def consume(producer): @@ -68,7 +71,7 @@ async def run(args, shard): db.cursor().execute('pragma journal_mode=wal;') db.cursor().execute('pragma temp_store=memory;') - producer = get_all(db.cursor(), shard, args.clients) + producer = get_all(db.cursor(), shard, args.clients, limit=args.blocks) await asyncio.gather(*(consume(producer) for _ in range(min(8, args.clients)))) @@ -82,14 +85,16 @@ def run_elastic_sync(): parser = argparse.ArgumentParser() parser.add_argument("db_path", type=str) parser.add_argument("-c", "--clients", type=int, default=16) + parser.add_argument("-b", "--blocks", type=int, default=0) + parser.add_argument("-f", "--force", default=False, action='store_true') args = parser.parse_args() processes = [] - if not os.path.exists(args.db_path): + if not args.force and not os.path.exists(args.db_path): logging.info("DB path doesnt exist") return - if not asyncio.run(make_es_index()): + if not args.force and not asyncio.run(make_es_index()): logging.info("ES is already initialized") return for i in range(args.clients):