diff --git a/lbry/console.py b/lbry/console.py index 4e775444c..d56406c0b 100644 --- a/lbry/console.py +++ b/lbry/console.py @@ -2,6 +2,7 @@ import os import sys import time import itertools +import logging from typing import Dict, Any from tempfile import TemporaryFile @@ -14,6 +15,9 @@ from lbry.service.full_node import FullNode from lbry.service.light_client import LightClient +log = logging.getLogger(__name__) + + class RedirectOutput: silence_lines = [ @@ -80,6 +84,8 @@ class Basic(Console): def __init__(self, service: Service): super().__init__(service) self.service.sync.on_progress.listen(self.on_sync_progress) + self.tasks = {} + logging.getLogger().setLevel(logging.INFO) def starting(self): conf = self.service.conf @@ -94,14 +100,44 @@ class Basic(Console): workers = os.cpu_count() if conf.workers == 0 else conf.workers s.append(f'{workers} Worker' if workers == 1 else f'{workers} Workers') s.append(f'({os.cpu_count()} CPUs available)') - print(' '.join(s)) + log.info(' '.join(s)) def stopping(self): - print('bye.') + log.info('exiting') @staticmethod - def on_sync_progress(event): - print(event) + def maybe_log_progress(event, done, total, last): + if done == 0: + log.info("%s 0%%", event) + return 0 + elif done == total: + log.info("%s 100%%", event) + return 1 + else: + percent = done/total + if percent >= 0.25 > last: + log.info("%s 25%%", event) + return 0.25 + elif percent >= 0.50 > last: + log.info("%s 50%%", event) + return 0.50 + elif percent >= 0.75 > last: + log.info("%s 75%%", event) + return 0.75 + return last + + def on_sync_progress(self, event): + e, data = event["event"], event["data"] + name, current, total, last = e, data['done'][0], 0, 0 + if not e.endswith("init") and not e.endswith("main") and not e.endswith("indexes"): + name = f"{e}#{data['id']}" + if "total" in data: + total, last = self.tasks[name] = (data["total"][0], last) + elif name in self.tasks: + total, last = self.tasks[name] + elif total == 0: + return + self.tasks[name] = (total, self.maybe_log_progress(name, current, total, last)) class Bar2(Bar): diff --git a/scripts/simulate_sync_console.py b/scripts/simulate_sync_console.py index a5a065199..c19712e8f 100644 --- a/scripts/simulate_sync_console.py +++ b/scripts/simulate_sync_console.py @@ -9,7 +9,7 @@ from lbry import Config, Ledger, FullNode from lbry.console import Advanced, Basic from lbry.schema.claim import Claim from lbry.db.utils import chunk -from lbry.db.query_context import Event +from lbry.testcase import EventGenerator def cause_protobuf_stderr(): @@ -31,7 +31,7 @@ class Simulator: self.starting_height = 0 self.ending_height = 0 self.starting_file = 0 - self.processes = console.service.db.processes + self.workers = console.service.db.workers self.txs = 0 self.claims = 0 @@ -77,7 +77,7 @@ class Simulator: "data": {"id": 0, "done": (0,), "total": (total,), "units": (unit,)} }) tasks = [] - for group_range in self.make_ranges(total, max(int(total/self.processes), 1)): + for group_range in self.make_ranges(total, max(int(total/self.workers), 1)): tasks.append(self.generate( f"{name}.insert", (unit,), group_range[0], f"add {unit} at {group_range[0]}-{group_range[1]}", @@ -104,23 +104,40 @@ class Simulator: self.txs = txs self.claims = int(txs/4) self.supports = int(txs/2) - await self.generate("blockchain.sync.block.init", ("steps",), 0, None, (3,), (1,)) - await self.progress.add({ - "event": "blockchain.sync.block.main", - "data": { - "id": 0, - "done": (0, 0), - "total": (self.blocks, self.txs), - "units": ("blocks", "txs"), - "starting_height": self.starting_height, - "ending_height": ending_height, - "files": len(files), - "claims": self.claims, - "supports": self.supports, - } - }) + eg = EventGenerator( + initial_sync=initial_sync, + start=self.starting_height, + end=ending_height, + block_files=[ + (0, 191, 280, ((100, 0), (191, 280))), + (1, 89, 178, ((89, 178),)), + (2, 73, 86, ((73, 86),)), + (3, 73, 86, ((73, 86),)), + (4, 73, 86, ((73, 86),)), + (5, 73, 86, ((73, 86),)), + ], + claims=[ + (102, 120, 361, 361), + (121, 139, 361, 361), + (140, 158, 361, 361), + (159, 177, 361, 361), + (178, 196, 361, 361), + (197, 215, 361, 361), + (216, 234, 361, 361), + (235, 253, 361, 361), + (254, 272, 361, 361), + (273, 291, 361, 361), + ], + supports=[ + (352, 352, 2, 2), + ] + ) + for event in eg.events: + await self.progress.add(event) + await asyncio.sleep(0.5) + return blocks_synced = txs_synced = 0 - for file_group in chunk(files, self.processes): + for file_group in chunk(files, self.workers): tasks = [] for file in file_group: if file == files[-1]: @@ -141,20 +158,6 @@ class Simulator: (blocks, txs), (50, 100) )) await asyncio.wait(tasks) - await self.close_event("blockchain.sync.block.main") - return - if initial_sync: - await self.generate("blockchain.sync.txoi.main", ("steps",), 0, None, (9,), (1,)) - else: - await self.generate("blockchain.sync.txoi.main", ("steps",), 0, None, (2,), (1,)) - if initial_sync: - await self.generate_group("blockchain.sync.claims", "claims", 4, self.claims, 100) - else: - await self.generate_group("blockchain.sync.claims", "claims", 2, self.claims, 100) - if initial_sync: - await self.generate_group("blockchain.sync.supports", "supports", 4, self.supports, 100) - else: - await self.generate_group("blockchain.sync.supports", "supports", 2, self.supports, 100) self.ending_height = ending_height+1 self.starting_height = self.ending_height @@ -169,11 +172,11 @@ async def main(console): if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("--basic", default=False, action="store_true") - parser.add_argument("--processes", default=3) + parser.add_argument("--workers", default=3) args = parser.parse_args() node = FullNode(Ledger(Config( - processes=args.processes, + workers=args.workers, spv_address_filters=False ))) console = Basic(node) if args.basic else Advanced(node)