console improvements

This commit is contained in:
Lex Berezhny 2020-06-26 21:52:01 -04:00
parent 5a0a987f0c
commit 4b19861a74
6 changed files with 379 additions and 253 deletions

View file

@ -24,7 +24,13 @@ from lbry.blockchain.block import Block, create_block_filter
from lbry.blockchain.bcd_data_stream import BCDataStream from lbry.blockchain.bcd_data_stream import BCDataStream
from lbry.blockchain.transaction import Output from lbry.blockchain.transaction import Output
from .queries import * from .queries import (
select, Claim, Support,
TXO_TYPES, CLAIM_TYPE_CODES,
channel_content_count_calc,
staked_support_amount_calc, staked_support_count_calc,
select_unvalidated_signables, get_unvalidated_signable_count
)
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -32,19 +38,20 @@ _chain: ContextVar[Lbrycrd] = ContextVar('chain')
SYNC_STEPS = { SYNC_STEPS = {
"initial_sync": 0, "initial_sync": [],
"ongoing_sync": 0, "ongoing_sync": [],
"events": [], "events": [],
} }
def sync_step(event: Event, step_size=1, initial_sync=False, ongoing_sync=False): def sync_step(event: Event, step_size=1, initial_sync=False, ongoing_sync=False):
assert event not in SYNC_STEPS['events'], f"Event {event} used more than once." assert event.label not in SYNC_STEPS['events'], f"Event {event.label} used more than once."
assert initial_sync or ongoing_sync, "At least one of initial_sync or ongoing_sync must be true." assert initial_sync or ongoing_sync, "At least one of initial_sync or ongoing_sync must be true."
SYNC_STEPS['events'].append(event.label)
if initial_sync: if initial_sync:
SYNC_STEPS['initial_sync'] += 1 SYNC_STEPS['initial_sync'].append(event.label)
if ongoing_sync: if ongoing_sync:
SYNC_STEPS['ongoing_sync'] += 1 SYNC_STEPS['ongoing_sync'].append(event.label)
def wrapper(f): def wrapper(f):
@functools.wraps(f) @functools.wraps(f)
@ -138,6 +145,17 @@ def process_block_save(block_file_number: int, loader, p=None):
loader.save() loader.save()
@sync_step(Event.INPUT_UPDATE, initial_sync=True, ongoing_sync=True)
def process_inputs_outputs(p=None):
p.start(2)
# 1. Update TXIs to have the address of TXO they are spending.
set_input_addresses(p.ctx)
p.step(1)
# 2. Update spent TXOs setting is_spent = True
update_spent_outputs(p.ctx)
p.step(2)
@sync_step(Event.BLOCK_FILTER, initial_sync=True, ongoing_sync=True) @sync_step(Event.BLOCK_FILTER, initial_sync=True, ongoing_sync=True)
def process_block_filters(p=None): def process_block_filters(p=None):
blocks = [] blocks = []
@ -164,160 +182,6 @@ def process_block_filters(p=None):
# execute(TX.update().where(TX.c.tx_hash == bindparam('pk')), txs) # execute(TX.update().where(TX.c.tx_hash == bindparam('pk')), txs)
def signature_validation(d: dict, row: dict, public_key) -> dict:
d['is_signature_valid'] = False
if Output.is_signature_valid(bytes(row['signature']), bytes(row['signature_digest']), public_key):
d['is_signature_valid'] = True
return d
@sync_step(Event.CLAIM_SIGN, initial_sync=True, ongoing_sync=True)
def process_claim_signatures(changes: ClaimChanges, p=None):
p.start(get_unvalidated_signable_count(p.ctx, Claim))
claim_updates = []
sql = select_unvalidated_signables(
Claim, Claim.c.claim_hash, include_urls=True, include_previous=changes is not None
)
steps = 0
for claim in p.ctx.execute(sql):
claim_updates.append(
signature_validation({
'pk': claim['claim_hash'],
'canonical_url': claim['channel_url'] + '/' + claim['claim_url']
}, claim, claim['public_key'])
)
if changes is not None:
changes.channels_with_changed_content.add(claim['channel_hash'])
if claim['previous_channel_hash']:
changes.channels_with_changed_content.add(claim['previous_channel_hash'])
if len(claim_updates) > 500:
p.ctx.execute(Claim.update().where(Claim.c.claim_hash == bindparam('pk')), claim_updates)
steps += len(claim_updates)
p.step(steps)
claim_updates.clear()
if claim_updates:
p.ctx.execute(Claim.update().where(Claim.c.claim_hash == bindparam('pk')), claim_updates)
@sync_step(Event.SUPPORT_SIGN, initial_sync=True, ongoing_sync=True)
def process_support_signatures(changes: ClaimChanges, p=None):
p.start(get_unvalidated_signable_count(p.ctx, Support))
support_updates = []
for support in p.ctx.execute(select_unvalidated_signables(Support, Support.c.txo_hash)):
support_updates.append(
signature_validation({'pk': support['txo_hash']}, support, support['public_key'])
)
if changes is not None:
changes.channels_with_changed_content.add(support['channel_hash'])
if len(support_updates) > 500:
p.ctx.execute(Support.update().where(Support.c.txo_hash == bindparam('pk')), support_updates)
p.step(len(support_updates))
support_updates.clear()
if support_updates:
p.ctx.execute(Support.update().where(Support.c.txo_hash == bindparam('pk')), support_updates)
@sync_step(Event.INPUT_UPDATE, initial_sync=True, ongoing_sync=True)
def process_inputs_outputs(p=None):
p.start(2)
# 1. Update TXIs to have the address of TXO they are spending.
set_input_addresses(p.ctx)
p.step(1)
# 2. Update spent TXOs setting is_spent = True
update_spent_outputs(p.ctx)
p.step(2)
@sync_step(Event.CLAIM_META, initial_sync=True, ongoing_sync=True)
def process_claim_metadata(starting_height: int, ending_height: int, chain, p=None):
channel = Claim.alias('channel')
stream = Claim.alias('stream')
p.start(chain.db.sync_get_claim_metadata_count(start_height=starting_height, end_height=ending_height))
claim_update_sql = (
Claim.update().where(Claim.c.claim_hash == bindparam('claim_hash_'))
.values(
canonical_url=case([(
((Claim.c.canonical_url == None) & (Claim.c.channel_hash != None)),
select(channel.c.short_url).select_from(channel)
.where(channel.c.claim_hash == Claim.c.channel_hash)
.scalar_subquery() + '/' + bindparam('short_url_')
)], else_=Claim.c.canonical_url),
staked_support_amount=staked_support_amount_calc,
staked_support_count=staked_support_count_calc,
signed_claim_count=case([(
(Claim.c.claim_type == TXO_TYPES['channel']),
channel_content_count_calc(stream)
)], else_=0),
signed_support_count=case([(
(Claim.c.claim_type == TXO_TYPES['channel']),
channel_content_count_calc(Support)
)], else_=0),
)
)
done, step_size = 0, 500
for offset in range(starting_height, ending_height + 1, step_size):
claims = chain.db.sync_get_claim_metadata(
start_height=offset, end_height=min(offset + step_size, ending_height)
)
if claims:
p.ctx.execute(claim_update_sql, claims)
done += len(claims)
p.step(done)
@sync_step(Event.STAKE_CALC, ongoing_sync=True)
def process_stake_calc(changes: ClaimChanges, p=None):
p.start(len(changes.claims_with_changed_supports))
sql = (
Claim.update()
.where((Claim.c.claim_hash.in_(changes.claims_with_changed_supports)))
.values(
staked_support_amount=staked_support_amount_calc,
staked_support_count=staked_support_count_calc,
)
)
p.ctx.execute(sql)
@sync_step(Event.CLAIM_CHAN, ongoing_sync=True)
def process_channel_content(changes: ClaimChanges, p=None):
p.start(len(changes.channels_with_changed_content))
stream = Claim.alias('stream')
sql = (
Claim.update()
.where((Claim.c.claim_hash.in_(changes.channels_with_changed_content)))
.values(
signed_claim_count=channel_content_count_calc(stream),
signed_support_count=channel_content_count_calc(Support),
)
)
p.ctx.execute(sql)
@sync_step(Event.CLAIM_TRIE, step_size=100, ongoing_sync=True)
def process_takeovers(starting_height: int, ending_height: int, chain, p=None):
p.start(chain.db.sync_get_takeover_count(start_height=starting_height, end_height=ending_height))
for offset in range(starting_height, ending_height + 1):
for takeover in chain.db.sync_get_takeovers(start_height=offset, end_height=offset):
update_claims = (
Claim.update()
.where(Claim.c.normalized == normalize_name(takeover['name'].decode()))
.values(
is_controlling=case(
[(Claim.c.claim_hash == takeover['claim_hash'], True)],
else_=False
),
takeover_height=case(
[(Claim.c.claim_hash == takeover['claim_hash'], takeover['height'])],
else_=None
),
activation_height=least(Claim.c.activation_height, takeover['height']),
)
)
p.ctx.execute(update_claims)
p.step(1)
@sync_step(Event.CLAIM_DELETE, ongoing_sync=True) @sync_step(Event.CLAIM_DELETE, ongoing_sync=True)
def process_claim_delete(changes: ClaimChanges, p=None): def process_claim_delete(changes: ClaimChanges, p=None):
changes.channels_with_changed_content |= { changes.channels_with_changed_content |= {
@ -381,3 +245,146 @@ def process_support_insert(changes: ClaimChanges, p=None):
loader.add_support(txo) loader.add_support(txo)
changes.claims_with_changed_supports.add(txo.claim_hash) changes.claims_with_changed_supports.add(txo.claim_hash)
loader.save() loader.save()
@sync_step(Event.CLAIM_TRIE, step_size=100, ongoing_sync=True)
def process_takeovers(starting_height: int, ending_height: int, chain, p=None):
p.start(chain.db.sync_get_takeover_count(start_height=starting_height, end_height=ending_height))
for offset in range(starting_height, ending_height + 1):
for takeover in chain.db.sync_get_takeovers(start_height=offset, end_height=offset):
update_claims = (
Claim.update()
.where(Claim.c.normalized == normalize_name(takeover['name'].decode()))
.values(
is_controlling=case(
[(Claim.c.claim_hash == takeover['claim_hash'], True)],
else_=False
),
takeover_height=case(
[(Claim.c.claim_hash == takeover['claim_hash'], takeover['height'])],
else_=None
),
activation_height=least(Claim.c.activation_height, takeover['height']),
)
)
p.ctx.execute(update_claims)
p.step(1)
@sync_step(Event.CLAIM_META, initial_sync=True, ongoing_sync=True)
def process_claim_metadata(starting_height: int, ending_height: int, chain, p=None):
channel = Claim.alias('channel')
stream = Claim.alias('stream')
p.start(chain.db.sync_get_claim_metadata_count(start_height=starting_height, end_height=ending_height))
claim_update_sql = (
Claim.update().where(Claim.c.claim_hash == bindparam('claim_hash_'))
.values(
canonical_url=case([(
((Claim.c.canonical_url == None) & (Claim.c.channel_hash != None)),
select(channel.c.short_url).select_from(channel)
.where(channel.c.claim_hash == Claim.c.channel_hash)
.scalar_subquery() + '/' + bindparam('short_url_')
)], else_=Claim.c.canonical_url),
staked_support_amount=staked_support_amount_calc,
staked_support_count=staked_support_count_calc,
signed_claim_count=case([(
(Claim.c.claim_type == TXO_TYPES['channel']),
channel_content_count_calc(stream)
)], else_=0),
signed_support_count=case([(
(Claim.c.claim_type == TXO_TYPES['channel']),
channel_content_count_calc(Support)
)], else_=0),
)
)
done, step_size = 0, 500
for offset in range(starting_height, ending_height + 1, step_size):
claims = chain.db.sync_get_claim_metadata(
start_height=offset, end_height=min(offset + step_size, ending_height)
)
if claims:
p.ctx.execute(claim_update_sql, claims)
done += len(claims)
p.step(done)
def signature_validation(d: dict, row: dict, public_key) -> dict:
d['is_signature_valid'] = False
if Output.is_signature_valid(bytes(row['signature']), bytes(row['signature_digest']), public_key):
d['is_signature_valid'] = True
return d
@sync_step(Event.CLAIM_SIGN, initial_sync=True, ongoing_sync=True)
def process_claim_signatures(changes: ClaimChanges, p=None):
p.start(get_unvalidated_signable_count(p.ctx, Claim))
claim_updates = []
sql = select_unvalidated_signables(
Claim, Claim.c.claim_hash, include_urls=True, include_previous=changes is not None
)
steps = 0
for claim in p.ctx.execute(sql):
claim_updates.append(
signature_validation({
'pk': claim['claim_hash'],
'canonical_url': claim['channel_url'] + '/' + claim['claim_url']
}, claim, claim['public_key'])
)
if changes is not None:
changes.channels_with_changed_content.add(claim['channel_hash'])
if claim['previous_channel_hash']:
changes.channels_with_changed_content.add(claim['previous_channel_hash'])
if len(claim_updates) > 500:
p.ctx.execute(Claim.update().where(Claim.c.claim_hash == bindparam('pk')), claim_updates)
steps += len(claim_updates)
p.step(steps)
claim_updates.clear()
if claim_updates:
p.ctx.execute(Claim.update().where(Claim.c.claim_hash == bindparam('pk')), claim_updates)
@sync_step(Event.SUPPORT_SIGN, initial_sync=True, ongoing_sync=True)
def process_support_signatures(changes: ClaimChanges, p=None):
p.start(get_unvalidated_signable_count(p.ctx, Support))
support_updates = []
for support in p.ctx.execute(select_unvalidated_signables(Support, Support.c.txo_hash)):
support_updates.append(
signature_validation({'pk': support['txo_hash']}, support, support['public_key'])
)
if changes is not None:
changes.channels_with_changed_content.add(support['channel_hash'])
if len(support_updates) > 500:
p.ctx.execute(Support.update().where(Support.c.txo_hash == bindparam('pk')), support_updates)
p.step(len(support_updates))
support_updates.clear()
if support_updates:
p.ctx.execute(Support.update().where(Support.c.txo_hash == bindparam('pk')), support_updates)
@sync_step(Event.STAKE_CALC, ongoing_sync=True)
def process_stake_calc(changes: ClaimChanges, p=None):
p.start(len(changes.claims_with_changed_supports))
sql = (
Claim.update()
.where((Claim.c.claim_hash.in_(changes.claims_with_changed_supports)))
.values(
staked_support_amount=staked_support_amount_calc,
staked_support_count=staked_support_count_calc,
)
)
p.ctx.execute(sql)
@sync_step(Event.CLAIM_CHAN, ongoing_sync=True)
def process_channel_content(changes: ClaimChanges, p=None):
p.start(len(changes.channels_with_changed_content))
stream = Claim.alias('stream')
sql = (
Claim.update()
.where((Claim.c.claim_hash.in_(changes.channels_with_changed_content)))
.values(
signed_claim_count=channel_content_count_calc(stream),
signed_support_count=channel_content_count_calc(Support),
)
)
p.ctx.execute(sql)

View file

@ -86,7 +86,7 @@ class BlockchainSync(Sync):
if not tasks: if not tasks:
return return
await self._on_progress_controller.add({ await self._on_progress_controller.add({
"event": Event.START.label, "event": Event.START.label, # pylint: disable=no-member
"data": { "data": {
"starting_height": starting_height, "starting_height": starting_height,
"ending_height": ending_height, "ending_height": ending_height,
@ -111,17 +111,17 @@ class BlockchainSync(Sync):
best_height_processed = max(f.result() for f in done) best_height_processed = max(f.result() for f in done)
return starting_height, best_height_processed return starting_height, best_height_processed
def count_steps(self, initial_sync: bool): def get_steps(self, initial_sync: bool):
if initial_sync: if initial_sync:
sync_steps = SYNC_STEPS['initial_sync'] sync_steps = SYNC_STEPS['initial_sync'].copy()
else: else:
sync_steps = SYNC_STEPS['ongoing_sync'] sync_steps = SYNC_STEPS['ongoing_sync'].copy()
if not self.conf.spv_address_filters: if not self.conf.spv_address_filters:
sync_steps -= 1 sync_steps.remove(Event.BLOCK_FILTER.label) # pylint: disable=no-member
return sync_steps return sync_steps
async def advance(self, initial_sync=False): async def advance(self, initial_sync=False):
sync_steps = self.count_steps(initial_sync) sync_steps = self.get_steps(initial_sync)
heights = await self.load_blocks(sync_steps, initial_sync) heights = await self.load_blocks(sync_steps, initial_sync)
if heights: if heights:
starting_height, ending_height = heights starting_height, ending_height = heights
@ -129,7 +129,9 @@ class BlockchainSync(Sync):
if self.conf.spv_address_filters: if self.conf.spv_address_filters:
await self.run(process_block_filters) await self.run(process_block_filters)
await self._on_block_controller.add(BlockEvent(ending_height)) await self._on_block_controller.add(BlockEvent(ending_height))
self.db.message_queue.put((Event.COMPLETE.value, os.getpid(), sync_steps, sync_steps)) self.db.message_queue.put((
Event.COMPLETE.value, os.getpid(), len(sync_steps), len(sync_steps)
))
async def advance_loop(self): async def advance_loop(self):
while True: while True:

View file

@ -54,68 +54,110 @@ class Basic(Console):
class Advanced(Basic): class Advanced(Basic):
FORMAT = '{l_bar}{bar}| {n_fmt:>8}/{total_fmt:>8} [{elapsed:>7}<{remaining:>8}, {rate_fmt:>16}]' FORMAT = '{l_bar}{bar}| {n_fmt:>8}/{total_fmt:>8} [{elapsed:>7}<{remaining:>8}, {rate_fmt:>17}]'
def __init__(self, service: Service): def __init__(self, service: Service):
super().__init__(service) super().__init__(service)
self.bars: Dict[Any, tqdm.tqdm] = {} self.bars: Dict[Any, tqdm.tqdm] = {}
self.is_single_sync_bar = False
self.single_bar_relative_steps = 0
self.last_stats = ""
self.sync_steps = []
def get_or_create_bar(self, name, desc, unit, total, leave=False): def get_or_create_bar(self, name, desc, unit, total, leave=False, bar_format=None, postfix=None, position=None):
bar = self.bars.get(name) bar = self.bars.get(name)
if bar is None: if bar is None:
bar = self.bars[name] = tqdm.tqdm( bar = self.bars[name] = tqdm.tqdm(
desc=desc, unit=unit, total=total, desc=desc, unit=unit, total=total,
bar_format=self.FORMAT, leave=leave bar_format=bar_format or self.FORMAT, leave=leave,
postfix=postfix, position=position
) )
return bar return bar
def start_sync_block_bars(self, d): def sync_start(self, d):
self.bars.clear() self.bars.clear()
self.get_or_create_bar("read", "total reading", "blocks", d['blocks'], True) self.sync_steps = d['sync_steps']
self.get_or_create_bar("save", "total saving", "txs", d['txs'], True) if d['ending_height']-d['starting_height'] > 0:
label = f"sync {d['starting_height']:,d}-{d['ending_height']:,d}"
else:
label = f"sync {d['ending_height']:,d}"
self.last_stats = f"{d['txs']:,d} txs, {d['claims']:,d} claims and {d['supports']:,d} supports"
self.get_or_create_bar(
"sync", label, "tasks", len(d['sync_steps']), True,
"{l_bar}{bar}| {postfix[0]:<55}", (self.last_stats,)
)
self.get_or_create_bar("read", "├─ blocks read", "blocks", d['blocks'], True)
self.get_or_create_bar("save", "└─┬ txs saved", "txs", d['txs'], True)
def close_sync_block_bars(self): def update_progress(self, e, d):
self.bars.pop("read").close() if e in ('blockchain.sync.block.read', 'blockchain.sync.block.save'):
self.bars.pop("save").close() self.update_block_bars(e, d)
else:
self.update_steps_bar(e, d)
self.update_other_bars(e, d)
def update_sync_block_bars(self, event, d): def update_steps_bar(self, e, d):
sync_bar = self.bars['sync']
if d['step'] == d['total']:
sync_done = (self.sync_steps.index(e)+1)-sync_bar.last_print_n
sync_bar.postfix = (f'finished: {e}',)
sync_bar.update(sync_done)
def update_block_bars(self, event, d):
bar_name = f"block-{d['block_file']}" bar_name = f"block-{d['block_file']}"
bar = self.bars.get(bar_name) bar = self.bars.get(bar_name)
if bar is None: if bar is None:
return self.get_or_create_bar( return self.get_or_create_bar(
bar_name, bar_name,
f"├─ blk{d['block_file']:05}.dat parsing", 'blocks', d['total'] f" ├─ blk{d['block_file']:05}.dat reading", 'blocks', d['total']
) )
if event == "save" and bar.unit == "blocks": if event.endswith("save") and bar.unit == "blocks":
bar.desc = f"├─ blk{d['block_file']:05}.dat saving" bar.desc = f" ├─ blk{d['block_file']:05}.dat saving"
bar.unit = "txs" bar.unit = "txs"
bar.reset(d['total']) bar.reset(d['total'])
return return
diff = d['step']-bar.last_print_n diff = d['step']-bar.last_print_n
bar.update(diff) bar.update(diff)
self.bars[event].update(diff) if event.endswith("save") and d['step'] == d['total']:
if event == "save" and d['step'] == d['total']:
bar.close() bar.close()
total_bar = self.bars[event[-4:]]
total_bar.update(diff)
if total_bar.total == total_bar.last_print_n:
self.update_steps_bar(event, {'step': 1, 'total': 1})
if total_bar.desc.endswith('txs saved'):
total_bar.desc = "├─ txs saved"
total_bar.refresh()
def update_other_bars(self, e, d): def update_other_bars(self, e, d):
bar = self.get_or_create_bar(e, e[-13:], d['unit'], d['total'], leave=True) if d['total'] == 0:
return
bar = self.bars.get(e)
if not bar:
name = (
' '.join(e.split('.')[-2:])
.replace('support', 'suprt')
.replace('channels', 'chanls')
.replace('signatures', 'sigs')
)
bar = self.get_or_create_bar(e, f"├─ {name:>12}", d['unit'], d['total'], True)
diff = d['step']-bar.last_print_n diff = d['step']-bar.last_print_n
bar.update(diff) bar.update(diff)
if d['step'] == d['total']: #if d['step'] == d['total']:
#bar.close()
def sync_complete(self):
self.bars['sync'].postfix = (self.last_stats,)
for bar in self.bars.values():
bar.close() bar.close()
def on_sync_progress(self, event): def on_sync_progress(self, event):
e, d = event['event'], event.get('data', {}) e, d = event['event'], event.get('data', {})
if e.endswith("start"): if e.endswith("sync.start"):
self.start_sync_block_bars(d) self.sync_start(d)
elif e.endswith("block.done"): elif e.endswith("sync.complete"):
self.close_sync_block_bars() self.sync_complete()
elif e.endswith("block.read"):
self.update_sync_block_bars("read", d)
elif e.endswith("block.save"):
self.update_sync_block_bars("save", d)
else: else:
self.update_other_bars(e, d) self.update_progress(e, d)

View file

@ -178,14 +178,14 @@ class ProgressUnit(Enum):
class Event(Enum): class Event(Enum):
START = "blockchain.sync.start", ProgressUnit.BLOCKS START = "blockchain.sync.start", ProgressUnit.BLOCKS
COMPLETE = "blockchain.sync.complete", ProgressUnit.BLOCKS COMPLETE = "blockchain.sync.complete", ProgressUnit.TASKS
# full node specific sync events # full node specific sync events
BLOCK_READ = "blockchain.sync.block.read", ProgressUnit.BLOCKS BLOCK_READ = "blockchain.sync.block.read", ProgressUnit.BLOCKS
BLOCK_SAVE = "blockchain.sync.block.save", ProgressUnit.TXS BLOCK_SAVE = "blockchain.sync.block.save", ProgressUnit.TXS
BLOCK_FILTER = "blockchain.sync.block.filter", ProgressUnit.BLOCKS BLOCK_FILTER = "blockchain.sync.block.filter", ProgressUnit.BLOCKS
CLAIM_META = "blockchain.sync.claim.update", ProgressUnit.CLAIMS CLAIM_META = "blockchain.sync.claim.meta", ProgressUnit.CLAIMS
CLAIM_TRIE = "blockchain.sync.claim.takeovers", ProgressUnit.CLAIMS CLAIM_TRIE = "blockchain.sync.claim.trie", ProgressUnit.CLAIMS
STAKE_CALC = "blockchain.sync.claim.stakes", ProgressUnit.CLAIMS STAKE_CALC = "blockchain.sync.claim.stakes", ProgressUnit.CLAIMS
CLAIM_CHAN = "blockchain.sync.claim.channels", ProgressUnit.CLAIMS CLAIM_CHAN = "blockchain.sync.claim.channels", ProgressUnit.CLAIMS
CLAIM_SIGN = "blockchain.sync.claim.signatures", ProgressUnit.CLAIMS CLAIM_SIGN = "blockchain.sync.claim.signatures", ProgressUnit.CLAIMS

View file

@ -1,63 +1,77 @@
import asyncio import asyncio
from random import randrange from random import randrange
from typing import List
from lbry.blockchain import Ledger from lbry.blockchain import Ledger
from lbry.service import FullNode from lbry.service import FullNode
from lbry.console import Advanced from lbry.console import Advanced, Basic
from lbry.conf import Config from lbry.conf import Config
from lbry.db.utils import chunk
class Simulator: class Simulator:
def __init__(self, progress, ending_height, files, txs, processes): def __init__(self, console):
self.progress = progress self.console = console
self.sync = console.service.sync
self.progress = self.sync._on_progress_controller
self.starting_height = 0 self.starting_height = 0
self.ending_height = ending_height self.ending_height = 0
self.starting_file = 0 self.starting_file = 0
self.ending_file = files self.processes = console.service.db.processes
self.steps = []
self.txs = 0
self.claims = 0
self.supports = 0
@property
def blocks(self, ):
if self.starting_height == 0:
return self.ending_height-self.starting_height
return (self.ending_height-self.starting_height)+1
async def advance(self, initial_sync: bool, ending_height: int, files: List[int], txs: int):
self.ending_height = ending_height
self.steps = self.sync.get_steps(initial_sync)
self.txs = txs self.txs = txs
self.processes = processes self.claims = int(txs/4)
self.supports = int(txs/2)
@property
def blocks(self):
return self.ending_height-self.starting_height
@property
def files(self):
return self.ending_file-self.starting_file
async def advance(self):
await self.progress.add({ await self.progress.add({
"event": "blockchain.sync.start", "event": "blockchain.sync.start",
"data": { "data": {
"starting_height": self.starting_height, "starting_height": self.starting_height,
"ending_height": self.ending_height, "ending_height": ending_height,
"files": self.files, "files": len(files),
"blocks": self.blocks, "blocks": self.blocks,
"txs": self.txs "sync_steps": self.steps,
"txs": self.txs,
"claims": self.claims,
"supports": self.supports,
} }
}) })
blocks_synced = txs_synced = 0 blocks_synced = txs_synced = 0
for starting_file in range(self.starting_file, self.ending_file, self.processes): for file_group in chunk(files, self.processes):
tasks = [] tasks = []
for b in range(starting_file, min(self.ending_file, starting_file+self.processes)): for file in file_group:
if b == (self.ending_file-1): if file == files[-1]:
tasks.append(self.sync_block_file(b, self.blocks-blocks_synced, self.txs-txs_synced)) tasks.append(self.sync_block_file(file, self.blocks-blocks_synced, self.txs-txs_synced))
else: else:
blocks = int(self.blocks / self.files) blocks = int(self.blocks / len(files))
blocks_synced += blocks blocks_synced += blocks
txs = int(self.txs / self.files) txs = int(self.txs / len(files))
txs_synced += txs txs_synced += txs
tasks.append(self.sync_block_file(b, blocks, txs)) tasks.append(self.sync_block_file(file, blocks, txs))
await asyncio.wait(tasks) await asyncio.wait(tasks)
for step in self.steps:
if step in ("blockchain.sync.block.read", "blockchain.sync.block.save"):
continue
await getattr(self, step.replace('.', '_'))()
await self.progress.add({ await self.progress.add({
"event": "blockchain.sync.block.done", "event": "blockchain.sync.complete",
"data": {"step": 1, "total": 1, "unit": "tasks", "best_height_processed": self.ending_height} "data": {"step": len(self.steps), "total": len(self.steps), "unit": "tasks"}
}) })
await self.process() self.ending_height = ending_height+1
self.processes = 1
self.txs = 25
self.starting_height = self.ending_height self.starting_height = self.ending_height
self.ending_height += 1
async def sync_block_file(self, block_file, blocks, txs): async def sync_block_file(self, block_file, blocks, txs):
for i in range(0, blocks, 1000): for i in range(0, blocks, 1000):
@ -71,7 +85,7 @@ class Simulator:
"data": {"step": blocks, "total": blocks, "unit": "blocks", "block_file": block_file} "data": {"step": blocks, "total": blocks, "unit": "blocks", "block_file": block_file}
}) })
await asyncio.sleep(0.5) await asyncio.sleep(0.5)
for i in range(0, txs, 10000): for i in range(0, txs, 2000):
await self.progress.add({ await self.progress.add({
"event": "blockchain.sync.block.save", "event": "blockchain.sync.block.save",
"data": {"step": i, "total": txs, "unit": "txs", "block_file": block_file} "data": {"step": i, "total": txs, "unit": "txs", "block_file": block_file}
@ -82,28 +96,69 @@ class Simulator:
"data": {"step": txs, "total": txs, "unit": "txs", "block_file": block_file} "data": {"step": txs, "total": txs, "unit": "txs", "block_file": block_file}
}) })
async def process(self): async def generate_steps(self, event, steps, unit, delay=1.0, step=1):
for i in range(3): await self.progress.add({"event": event, "data": {"step": 0, "total": steps, "unit": unit}})
remaining = steps
for i in range(1, steps+1, step):
await asyncio.sleep(delay)
await self.progress.add({"event": event, "data": {"step": i, "total": steps, "unit": unit}})
remaining -= i
if remaining:
await asyncio.sleep(delay)
await self.progress.add({"event": event, "data": {"step": steps, "total": steps, "unit": unit}})
async def db_sync_input(self):
await self.generate_steps("db.sync.input", 2, "txis")
async def db_sync_claim_delete(self):
await self.generate_steps("db.sync.claim.delete", 1, "claims")
async def db_sync_claim_insert(self):
await self.generate_steps("db.sync.claim.insert", 1, "claims")
async def db_sync_claim_update(self):
await self.generate_steps("db.sync.claim.update", 0, "claims")
async def db_sync_support_delete(self):
await self.generate_steps("db.sync.support.delete", 1, "supports")
async def db_sync_support_insert(self):
await self.generate_steps("db.sync.support.insert", 1, "supports")
async def blockchain_sync_claim_trie(self):
await self.generate_steps("blockchain.sync.claim.trie", 1, "claims")
async def blockchain_sync_claim_meta(self):
for i in range(0, self.claims, 1_000):
await self.progress.add({ await self.progress.add({
"event": "db.sync.input", "event": "blockchain.sync.claim.meta",
"data": {"step": i, "total": 2, "unit": "txis"} "data": {"step": i, "total": self.claims, "unit": "claims"}
})
await asyncio.sleep(1)
claims = int(self.txs/4)
for i in range(0, claims+1, 10_000):
await self.progress.add({
"event": "blockchain.sync.claim.update",
"data": {"step": i, "total": claims, "unit": "claims"}
}) })
await asyncio.sleep(0.1) await asyncio.sleep(0.1)
await self.progress.add({
"event": "blockchain.sync.claim.meta",
"data": {"step": self.claims, "total": self.claims, "unit": "claims"}
})
async def blockchain_sync_claim_signatures(self):
await self.generate_steps("blockchain.sync.claim.signatures", self.claims, "claims", 0.5, 1000)
async def blockchain_sync_support_signatures(self):
await self.generate_steps("blockchain.sync.support.signatures", self.supports, "supports", 0.5, 1000)
async def blockchain_sync_claim_stakes(self):
await self.generate_steps("blockchain.sync.claim.stakes", 1, "claims", 0.5)
async def blockchain_sync_claim_channels(self):
await self.generate_steps("blockchain.sync.claim.channels", 0, "supports", 0.5)
async def main(): async def main():
console = Advanced(FullNode(Ledger(Config(processes=4)))) console = Advanced(FullNode(Ledger(Config(processes=3, spv_address_filters=False))))
progress = console.service.sync._on_progress_controller sim = Simulator(console)
sim = Simulator(progress, 200_000, 7, 1_000_000, console.service.db.processes)
console.starting() console.starting()
await sim.advance() await sim.advance(True, 100_000, [1, 2, 3, 4, 5], 100_000)
await sim.advance(False, 100_001, [5], 100)
console.stopping() console.stopping()

View file

@ -466,7 +466,13 @@ class TestMultiBlockFileSyncing(BasicBlockchainTestCase):
"data": { "data": {
"starting_height": 0, "starting_height": 0,
"ending_height": 352, "ending_height": 352,
"sync_steps": 6, "sync_steps": [
"blockchain.sync.block.read",
"blockchain.sync.block.save",
"db.sync.input",
"blockchain.sync.claim.meta",
"blockchain.sync.claim.signatures",
"blockchain.sync.support.signatures"],
"files": 3, "files": 3,
"blocks": 353, "blocks": 353,
"txs": 544, "txs": 544,
@ -523,10 +529,10 @@ class TestMultiBlockFileSyncing(BasicBlockchainTestCase):
[2, 2], [2, 2],
] ]
) )
# 4 - blockchain.sync.claim.update # 4 - blockchain.sync.claim.meta
self.assertEqual( self.assertEqual(
[[0, 3610], [3610, 3610]], [[0, 3610], [3610, 3610]],
self.extract_events('blockchain.sync.claim.update', events) self.extract_events('blockchain.sync.claim.meta', events)
) )
# 5 - blockchain.sync.claim.signatures # 5 - blockchain.sync.claim.signatures
self.assertEqual( self.assertEqual(
@ -562,7 +568,21 @@ class TestMultiBlockFileSyncing(BasicBlockchainTestCase):
"data": { "data": {
"starting_height": 353, "starting_height": 353,
"ending_height": 354, "ending_height": 354,
"sync_steps": 14, "sync_steps": [
"blockchain.sync.block.read",
"blockchain.sync.block.save",
"db.sync.input",
"db.sync.claim.delete",
"db.sync.claim.insert",
"db.sync.claim.update",
"db.sync.support.delete",
"db.sync.support.insert",
"blockchain.sync.claim.trie",
"blockchain.sync.claim.meta",
"blockchain.sync.claim.signatures",
"blockchain.sync.support.signatures",
"blockchain.sync.claim.stakes",
"blockchain.sync.claim.channels"],
"files": 1, "files": 1,
"blocks": 2, "blocks": 2,
"txs": 4, "txs": 4,
@ -605,10 +625,10 @@ class TestMultiBlockFileSyncing(BasicBlockchainTestCase):
self.assertEqual([[0, 1], [1, 1]], self.extract_events('db.sync.support.delete', events)) self.assertEqual([[0, 1], [1, 1]], self.extract_events('db.sync.support.delete', events))
# 8 - db.sync.support.insert # 8 - db.sync.support.insert
self.assertEqual([[0, 1], [1, 1]], self.extract_events('db.sync.support.insert', events)) self.assertEqual([[0, 1], [1, 1]], self.extract_events('db.sync.support.insert', events))
# 9 - blockchain.sync.claim.takeovers # 9 - blockchain.sync.claim.trie
self.assertEqual([[0, 1], [1, 1]], self.extract_events('blockchain.sync.claim.takeovers', events)) self.assertEqual([[0, 1], [1, 1]], self.extract_events('blockchain.sync.claim.trie', events))
# 10 - blockchain.sync.claim.update # 10 - blockchain.sync.claim.meta
self.assertEqual([[0, 1], [1, 1]], self.extract_events('blockchain.sync.claim.update', events)) self.assertEqual([[0, 1], [1, 1]], self.extract_events('blockchain.sync.claim.meta', events))
# 11 - blockchain.sync.claim.signatures # 11 - blockchain.sync.claim.signatures
self.assertEqual([[0, 0]], self.extract_events("blockchain.sync.claim.signatures", events)) self.assertEqual([[0, 0]], self.extract_events("blockchain.sync.claim.signatures", events))
# 12 - blockchain.sync.support.signatures # 12 - blockchain.sync.support.signatures