forked from LBRYCommunity/lbry-sdk
basic logging
This commit is contained in:
parent
8f2cce7f61
commit
895719a13d
2 changed files with 78 additions and 39 deletions
|
@ -2,6 +2,7 @@ import os
|
|||
import sys
|
||||
import time
|
||||
import itertools
|
||||
import logging
|
||||
from typing import Dict, Any
|
||||
from tempfile import TemporaryFile
|
||||
|
||||
|
@ -14,6 +15,9 @@ from lbry.service.full_node import FullNode
|
|||
from lbry.service.light_client import LightClient
|
||||
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class RedirectOutput:
|
||||
|
||||
silence_lines = [
|
||||
|
@ -80,6 +84,8 @@ class Basic(Console):
|
|||
def __init__(self, service: Service):
|
||||
super().__init__(service)
|
||||
self.service.sync.on_progress.listen(self.on_sync_progress)
|
||||
self.tasks = {}
|
||||
logging.getLogger().setLevel(logging.INFO)
|
||||
|
||||
def starting(self):
|
||||
conf = self.service.conf
|
||||
|
@ -94,14 +100,44 @@ class Basic(Console):
|
|||
workers = os.cpu_count() if conf.workers == 0 else conf.workers
|
||||
s.append(f'{workers} Worker' if workers == 1 else f'{workers} Workers')
|
||||
s.append(f'({os.cpu_count()} CPUs available)')
|
||||
print(' '.join(s))
|
||||
log.info(' '.join(s))
|
||||
|
||||
def stopping(self):
|
||||
print('bye.')
|
||||
log.info('exiting')
|
||||
|
||||
@staticmethod
|
||||
def on_sync_progress(event):
|
||||
print(event)
|
||||
def maybe_log_progress(event, done, total, last):
|
||||
if done == 0:
|
||||
log.info("%s 0%%", event)
|
||||
return 0
|
||||
elif done == total:
|
||||
log.info("%s 100%%", event)
|
||||
return 1
|
||||
else:
|
||||
percent = done/total
|
||||
if percent >= 0.25 > last:
|
||||
log.info("%s 25%%", event)
|
||||
return 0.25
|
||||
elif percent >= 0.50 > last:
|
||||
log.info("%s 50%%", event)
|
||||
return 0.50
|
||||
elif percent >= 0.75 > last:
|
||||
log.info("%s 75%%", event)
|
||||
return 0.75
|
||||
return last
|
||||
|
||||
def on_sync_progress(self, event):
|
||||
e, data = event["event"], event["data"]
|
||||
name, current, total, last = e, data['done'][0], 0, 0
|
||||
if not e.endswith("init") and not e.endswith("main") and not e.endswith("indexes"):
|
||||
name = f"{e}#{data['id']}"
|
||||
if "total" in data:
|
||||
total, last = self.tasks[name] = (data["total"][0], last)
|
||||
elif name in self.tasks:
|
||||
total, last = self.tasks[name]
|
||||
elif total == 0:
|
||||
return
|
||||
self.tasks[name] = (total, self.maybe_log_progress(name, current, total, last))
|
||||
|
||||
|
||||
class Bar2(Bar):
|
||||
|
|
|
@ -9,7 +9,7 @@ from lbry import Config, Ledger, FullNode
|
|||
from lbry.console import Advanced, Basic
|
||||
from lbry.schema.claim import Claim
|
||||
from lbry.db.utils import chunk
|
||||
from lbry.db.query_context import Event
|
||||
from lbry.testcase import EventGenerator
|
||||
|
||||
|
||||
def cause_protobuf_stderr():
|
||||
|
@ -31,7 +31,7 @@ class Simulator:
|
|||
self.starting_height = 0
|
||||
self.ending_height = 0
|
||||
self.starting_file = 0
|
||||
self.processes = console.service.db.processes
|
||||
self.workers = console.service.db.workers
|
||||
|
||||
self.txs = 0
|
||||
self.claims = 0
|
||||
|
@ -77,7 +77,7 @@ class Simulator:
|
|||
"data": {"id": 0, "done": (0,), "total": (total,), "units": (unit,)}
|
||||
})
|
||||
tasks = []
|
||||
for group_range in self.make_ranges(total, max(int(total/self.processes), 1)):
|
||||
for group_range in self.make_ranges(total, max(int(total/self.workers), 1)):
|
||||
tasks.append(self.generate(
|
||||
f"{name}.insert", (unit,),
|
||||
group_range[0], f"add {unit} at {group_range[0]}-{group_range[1]}",
|
||||
|
@ -104,23 +104,40 @@ class Simulator:
|
|||
self.txs = txs
|
||||
self.claims = int(txs/4)
|
||||
self.supports = int(txs/2)
|
||||
await self.generate("blockchain.sync.block.init", ("steps",), 0, None, (3,), (1,))
|
||||
await self.progress.add({
|
||||
"event": "blockchain.sync.block.main",
|
||||
"data": {
|
||||
"id": 0,
|
||||
"done": (0, 0),
|
||||
"total": (self.blocks, self.txs),
|
||||
"units": ("blocks", "txs"),
|
||||
"starting_height": self.starting_height,
|
||||
"ending_height": ending_height,
|
||||
"files": len(files),
|
||||
"claims": self.claims,
|
||||
"supports": self.supports,
|
||||
}
|
||||
})
|
||||
eg = EventGenerator(
|
||||
initial_sync=initial_sync,
|
||||
start=self.starting_height,
|
||||
end=ending_height,
|
||||
block_files=[
|
||||
(0, 191, 280, ((100, 0), (191, 280))),
|
||||
(1, 89, 178, ((89, 178),)),
|
||||
(2, 73, 86, ((73, 86),)),
|
||||
(3, 73, 86, ((73, 86),)),
|
||||
(4, 73, 86, ((73, 86),)),
|
||||
(5, 73, 86, ((73, 86),)),
|
||||
],
|
||||
claims=[
|
||||
(102, 120, 361, 361),
|
||||
(121, 139, 361, 361),
|
||||
(140, 158, 361, 361),
|
||||
(159, 177, 361, 361),
|
||||
(178, 196, 361, 361),
|
||||
(197, 215, 361, 361),
|
||||
(216, 234, 361, 361),
|
||||
(235, 253, 361, 361),
|
||||
(254, 272, 361, 361),
|
||||
(273, 291, 361, 361),
|
||||
],
|
||||
supports=[
|
||||
(352, 352, 2, 2),
|
||||
]
|
||||
)
|
||||
for event in eg.events:
|
||||
await self.progress.add(event)
|
||||
await asyncio.sleep(0.5)
|
||||
return
|
||||
blocks_synced = txs_synced = 0
|
||||
for file_group in chunk(files, self.processes):
|
||||
for file_group in chunk(files, self.workers):
|
||||
tasks = []
|
||||
for file in file_group:
|
||||
if file == files[-1]:
|
||||
|
@ -141,20 +158,6 @@ class Simulator:
|
|||
(blocks, txs), (50, 100)
|
||||
))
|
||||
await asyncio.wait(tasks)
|
||||
await self.close_event("blockchain.sync.block.main")
|
||||
return
|
||||
if initial_sync:
|
||||
await self.generate("blockchain.sync.txoi.main", ("steps",), 0, None, (9,), (1,))
|
||||
else:
|
||||
await self.generate("blockchain.sync.txoi.main", ("steps",), 0, None, (2,), (1,))
|
||||
if initial_sync:
|
||||
await self.generate_group("blockchain.sync.claims", "claims", 4, self.claims, 100)
|
||||
else:
|
||||
await self.generate_group("blockchain.sync.claims", "claims", 2, self.claims, 100)
|
||||
if initial_sync:
|
||||
await self.generate_group("blockchain.sync.supports", "supports", 4, self.supports, 100)
|
||||
else:
|
||||
await self.generate_group("blockchain.sync.supports", "supports", 2, self.supports, 100)
|
||||
self.ending_height = ending_height+1
|
||||
self.starting_height = self.ending_height
|
||||
|
||||
|
@ -169,11 +172,11 @@ async def main(console):
|
|||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("--basic", default=False, action="store_true")
|
||||
parser.add_argument("--processes", default=3)
|
||||
parser.add_argument("--workers", default=3)
|
||||
args = parser.parse_args()
|
||||
|
||||
node = FullNode(Ledger(Config(
|
||||
processes=args.processes,
|
||||
workers=args.workers,
|
||||
spv_address_filters=False
|
||||
)))
|
||||
console = Basic(node) if args.basic else Advanced(node)
|
||||
|
|
Loading…
Reference in a new issue