diff --git a/lbry/blockchain/sync/steps.py b/lbry/blockchain/sync/steps.py index 6ab03d52c..30b70b1f5 100644 --- a/lbry/blockchain/sync/steps.py +++ b/lbry/blockchain/sync/steps.py @@ -24,7 +24,13 @@ from lbry.blockchain.block import Block, create_block_filter from lbry.blockchain.bcd_data_stream import BCDataStream from lbry.blockchain.transaction import Output -from .queries import * +from .queries import ( + select, Claim, Support, + TXO_TYPES, CLAIM_TYPE_CODES, + channel_content_count_calc, + staked_support_amount_calc, staked_support_count_calc, + select_unvalidated_signables, get_unvalidated_signable_count +) log = logging.getLogger(__name__) @@ -32,19 +38,20 @@ _chain: ContextVar[Lbrycrd] = ContextVar('chain') SYNC_STEPS = { - "initial_sync": 0, - "ongoing_sync": 0, + "initial_sync": [], + "ongoing_sync": [], "events": [], } def sync_step(event: Event, step_size=1, initial_sync=False, ongoing_sync=False): - assert event not in SYNC_STEPS['events'], f"Event {event} used more than once." + assert event.label not in SYNC_STEPS['events'], f"Event {event.label} used more than once." assert initial_sync or ongoing_sync, "At least one of initial_sync or ongoing_sync must be true." + SYNC_STEPS['events'].append(event.label) if initial_sync: - SYNC_STEPS['initial_sync'] += 1 + SYNC_STEPS['initial_sync'].append(event.label) if ongoing_sync: - SYNC_STEPS['ongoing_sync'] += 1 + SYNC_STEPS['ongoing_sync'].append(event.label) def wrapper(f): @functools.wraps(f) @@ -138,6 +145,17 @@ def process_block_save(block_file_number: int, loader, p=None): loader.save() +@sync_step(Event.INPUT_UPDATE, initial_sync=True, ongoing_sync=True) +def process_inputs_outputs(p=None): + p.start(2) + # 1. Update TXIs to have the address of TXO they are spending. + set_input_addresses(p.ctx) + p.step(1) + # 2. Update spent TXOs setting is_spent = True + update_spent_outputs(p.ctx) + p.step(2) + + @sync_step(Event.BLOCK_FILTER, initial_sync=True, ongoing_sync=True) def process_block_filters(p=None): blocks = [] @@ -164,160 +182,6 @@ def process_block_filters(p=None): # execute(TX.update().where(TX.c.tx_hash == bindparam('pk')), txs) -def signature_validation(d: dict, row: dict, public_key) -> dict: - d['is_signature_valid'] = False - if Output.is_signature_valid(bytes(row['signature']), bytes(row['signature_digest']), public_key): - d['is_signature_valid'] = True - return d - - -@sync_step(Event.CLAIM_SIGN, initial_sync=True, ongoing_sync=True) -def process_claim_signatures(changes: ClaimChanges, p=None): - p.start(get_unvalidated_signable_count(p.ctx, Claim)) - claim_updates = [] - sql = select_unvalidated_signables( - Claim, Claim.c.claim_hash, include_urls=True, include_previous=changes is not None - ) - steps = 0 - for claim in p.ctx.execute(sql): - claim_updates.append( - signature_validation({ - 'pk': claim['claim_hash'], - 'canonical_url': claim['channel_url'] + '/' + claim['claim_url'] - }, claim, claim['public_key']) - ) - if changes is not None: - changes.channels_with_changed_content.add(claim['channel_hash']) - if claim['previous_channel_hash']: - changes.channels_with_changed_content.add(claim['previous_channel_hash']) - if len(claim_updates) > 500: - p.ctx.execute(Claim.update().where(Claim.c.claim_hash == bindparam('pk')), claim_updates) - steps += len(claim_updates) - p.step(steps) - claim_updates.clear() - if claim_updates: - p.ctx.execute(Claim.update().where(Claim.c.claim_hash == bindparam('pk')), claim_updates) - - -@sync_step(Event.SUPPORT_SIGN, initial_sync=True, ongoing_sync=True) -def process_support_signatures(changes: ClaimChanges, p=None): - p.start(get_unvalidated_signable_count(p.ctx, Support)) - support_updates = [] - for support in p.ctx.execute(select_unvalidated_signables(Support, Support.c.txo_hash)): - support_updates.append( - signature_validation({'pk': support['txo_hash']}, support, support['public_key']) - ) - if changes is not None: - changes.channels_with_changed_content.add(support['channel_hash']) - if len(support_updates) > 500: - p.ctx.execute(Support.update().where(Support.c.txo_hash == bindparam('pk')), support_updates) - p.step(len(support_updates)) - support_updates.clear() - if support_updates: - p.ctx.execute(Support.update().where(Support.c.txo_hash == bindparam('pk')), support_updates) - - -@sync_step(Event.INPUT_UPDATE, initial_sync=True, ongoing_sync=True) -def process_inputs_outputs(p=None): - p.start(2) - # 1. Update TXIs to have the address of TXO they are spending. - set_input_addresses(p.ctx) - p.step(1) - # 2. Update spent TXOs setting is_spent = True - update_spent_outputs(p.ctx) - p.step(2) - - -@sync_step(Event.CLAIM_META, initial_sync=True, ongoing_sync=True) -def process_claim_metadata(starting_height: int, ending_height: int, chain, p=None): - channel = Claim.alias('channel') - stream = Claim.alias('stream') - p.start(chain.db.sync_get_claim_metadata_count(start_height=starting_height, end_height=ending_height)) - claim_update_sql = ( - Claim.update().where(Claim.c.claim_hash == bindparam('claim_hash_')) - .values( - canonical_url=case([( - ((Claim.c.canonical_url == None) & (Claim.c.channel_hash != None)), - select(channel.c.short_url).select_from(channel) - .where(channel.c.claim_hash == Claim.c.channel_hash) - .scalar_subquery() + '/' + bindparam('short_url_') - )], else_=Claim.c.canonical_url), - staked_support_amount=staked_support_amount_calc, - staked_support_count=staked_support_count_calc, - signed_claim_count=case([( - (Claim.c.claim_type == TXO_TYPES['channel']), - channel_content_count_calc(stream) - )], else_=0), - signed_support_count=case([( - (Claim.c.claim_type == TXO_TYPES['channel']), - channel_content_count_calc(Support) - )], else_=0), - ) - ) - done, step_size = 0, 500 - for offset in range(starting_height, ending_height + 1, step_size): - claims = chain.db.sync_get_claim_metadata( - start_height=offset, end_height=min(offset + step_size, ending_height) - ) - if claims: - p.ctx.execute(claim_update_sql, claims) - done += len(claims) - p.step(done) - - -@sync_step(Event.STAKE_CALC, ongoing_sync=True) -def process_stake_calc(changes: ClaimChanges, p=None): - p.start(len(changes.claims_with_changed_supports)) - sql = ( - Claim.update() - .where((Claim.c.claim_hash.in_(changes.claims_with_changed_supports))) - .values( - staked_support_amount=staked_support_amount_calc, - staked_support_count=staked_support_count_calc, - ) - ) - p.ctx.execute(sql) - - -@sync_step(Event.CLAIM_CHAN, ongoing_sync=True) -def process_channel_content(changes: ClaimChanges, p=None): - p.start(len(changes.channels_with_changed_content)) - stream = Claim.alias('stream') - sql = ( - Claim.update() - .where((Claim.c.claim_hash.in_(changes.channels_with_changed_content))) - .values( - signed_claim_count=channel_content_count_calc(stream), - signed_support_count=channel_content_count_calc(Support), - ) - ) - p.ctx.execute(sql) - - -@sync_step(Event.CLAIM_TRIE, step_size=100, ongoing_sync=True) -def process_takeovers(starting_height: int, ending_height: int, chain, p=None): - p.start(chain.db.sync_get_takeover_count(start_height=starting_height, end_height=ending_height)) - for offset in range(starting_height, ending_height + 1): - for takeover in chain.db.sync_get_takeovers(start_height=offset, end_height=offset): - update_claims = ( - Claim.update() - .where(Claim.c.normalized == normalize_name(takeover['name'].decode())) - .values( - is_controlling=case( - [(Claim.c.claim_hash == takeover['claim_hash'], True)], - else_=False - ), - takeover_height=case( - [(Claim.c.claim_hash == takeover['claim_hash'], takeover['height'])], - else_=None - ), - activation_height=least(Claim.c.activation_height, takeover['height']), - ) - ) - p.ctx.execute(update_claims) - p.step(1) - - @sync_step(Event.CLAIM_DELETE, ongoing_sync=True) def process_claim_delete(changes: ClaimChanges, p=None): changes.channels_with_changed_content |= { @@ -381,3 +245,146 @@ def process_support_insert(changes: ClaimChanges, p=None): loader.add_support(txo) changes.claims_with_changed_supports.add(txo.claim_hash) loader.save() + + +@sync_step(Event.CLAIM_TRIE, step_size=100, ongoing_sync=True) +def process_takeovers(starting_height: int, ending_height: int, chain, p=None): + p.start(chain.db.sync_get_takeover_count(start_height=starting_height, end_height=ending_height)) + for offset in range(starting_height, ending_height + 1): + for takeover in chain.db.sync_get_takeovers(start_height=offset, end_height=offset): + update_claims = ( + Claim.update() + .where(Claim.c.normalized == normalize_name(takeover['name'].decode())) + .values( + is_controlling=case( + [(Claim.c.claim_hash == takeover['claim_hash'], True)], + else_=False + ), + takeover_height=case( + [(Claim.c.claim_hash == takeover['claim_hash'], takeover['height'])], + else_=None + ), + activation_height=least(Claim.c.activation_height, takeover['height']), + ) + ) + p.ctx.execute(update_claims) + p.step(1) + + +@sync_step(Event.CLAIM_META, initial_sync=True, ongoing_sync=True) +def process_claim_metadata(starting_height: int, ending_height: int, chain, p=None): + channel = Claim.alias('channel') + stream = Claim.alias('stream') + p.start(chain.db.sync_get_claim_metadata_count(start_height=starting_height, end_height=ending_height)) + claim_update_sql = ( + Claim.update().where(Claim.c.claim_hash == bindparam('claim_hash_')) + .values( + canonical_url=case([( + ((Claim.c.canonical_url == None) & (Claim.c.channel_hash != None)), + select(channel.c.short_url).select_from(channel) + .where(channel.c.claim_hash == Claim.c.channel_hash) + .scalar_subquery() + '/' + bindparam('short_url_') + )], else_=Claim.c.canonical_url), + staked_support_amount=staked_support_amount_calc, + staked_support_count=staked_support_count_calc, + signed_claim_count=case([( + (Claim.c.claim_type == TXO_TYPES['channel']), + channel_content_count_calc(stream) + )], else_=0), + signed_support_count=case([( + (Claim.c.claim_type == TXO_TYPES['channel']), + channel_content_count_calc(Support) + )], else_=0), + ) + ) + done, step_size = 0, 500 + for offset in range(starting_height, ending_height + 1, step_size): + claims = chain.db.sync_get_claim_metadata( + start_height=offset, end_height=min(offset + step_size, ending_height) + ) + if claims: + p.ctx.execute(claim_update_sql, claims) + done += len(claims) + p.step(done) + + +def signature_validation(d: dict, row: dict, public_key) -> dict: + d['is_signature_valid'] = False + if Output.is_signature_valid(bytes(row['signature']), bytes(row['signature_digest']), public_key): + d['is_signature_valid'] = True + return d + + +@sync_step(Event.CLAIM_SIGN, initial_sync=True, ongoing_sync=True) +def process_claim_signatures(changes: ClaimChanges, p=None): + p.start(get_unvalidated_signable_count(p.ctx, Claim)) + claim_updates = [] + sql = select_unvalidated_signables( + Claim, Claim.c.claim_hash, include_urls=True, include_previous=changes is not None + ) + steps = 0 + for claim in p.ctx.execute(sql): + claim_updates.append( + signature_validation({ + 'pk': claim['claim_hash'], + 'canonical_url': claim['channel_url'] + '/' + claim['claim_url'] + }, claim, claim['public_key']) + ) + if changes is not None: + changes.channels_with_changed_content.add(claim['channel_hash']) + if claim['previous_channel_hash']: + changes.channels_with_changed_content.add(claim['previous_channel_hash']) + if len(claim_updates) > 500: + p.ctx.execute(Claim.update().where(Claim.c.claim_hash == bindparam('pk')), claim_updates) + steps += len(claim_updates) + p.step(steps) + claim_updates.clear() + if claim_updates: + p.ctx.execute(Claim.update().where(Claim.c.claim_hash == bindparam('pk')), claim_updates) + + +@sync_step(Event.SUPPORT_SIGN, initial_sync=True, ongoing_sync=True) +def process_support_signatures(changes: ClaimChanges, p=None): + p.start(get_unvalidated_signable_count(p.ctx, Support)) + support_updates = [] + for support in p.ctx.execute(select_unvalidated_signables(Support, Support.c.txo_hash)): + support_updates.append( + signature_validation({'pk': support['txo_hash']}, support, support['public_key']) + ) + if changes is not None: + changes.channels_with_changed_content.add(support['channel_hash']) + if len(support_updates) > 500: + p.ctx.execute(Support.update().where(Support.c.txo_hash == bindparam('pk')), support_updates) + p.step(len(support_updates)) + support_updates.clear() + if support_updates: + p.ctx.execute(Support.update().where(Support.c.txo_hash == bindparam('pk')), support_updates) + + +@sync_step(Event.STAKE_CALC, ongoing_sync=True) +def process_stake_calc(changes: ClaimChanges, p=None): + p.start(len(changes.claims_with_changed_supports)) + sql = ( + Claim.update() + .where((Claim.c.claim_hash.in_(changes.claims_with_changed_supports))) + .values( + staked_support_amount=staked_support_amount_calc, + staked_support_count=staked_support_count_calc, + ) + ) + p.ctx.execute(sql) + + +@sync_step(Event.CLAIM_CHAN, ongoing_sync=True) +def process_channel_content(changes: ClaimChanges, p=None): + p.start(len(changes.channels_with_changed_content)) + stream = Claim.alias('stream') + sql = ( + Claim.update() + .where((Claim.c.claim_hash.in_(changes.channels_with_changed_content))) + .values( + signed_claim_count=channel_content_count_calc(stream), + signed_support_count=channel_content_count_calc(Support), + ) + ) + p.ctx.execute(sql) diff --git a/lbry/blockchain/sync/synchronizer.py b/lbry/blockchain/sync/synchronizer.py index a3b8cb0fb..63635e8b5 100644 --- a/lbry/blockchain/sync/synchronizer.py +++ b/lbry/blockchain/sync/synchronizer.py @@ -86,7 +86,7 @@ class BlockchainSync(Sync): if not tasks: return await self._on_progress_controller.add({ - "event": Event.START.label, + "event": Event.START.label, # pylint: disable=no-member "data": { "starting_height": starting_height, "ending_height": ending_height, @@ -111,17 +111,17 @@ class BlockchainSync(Sync): best_height_processed = max(f.result() for f in done) return starting_height, best_height_processed - def count_steps(self, initial_sync: bool): + def get_steps(self, initial_sync: bool): if initial_sync: - sync_steps = SYNC_STEPS['initial_sync'] + sync_steps = SYNC_STEPS['initial_sync'].copy() else: - sync_steps = SYNC_STEPS['ongoing_sync'] + sync_steps = SYNC_STEPS['ongoing_sync'].copy() if not self.conf.spv_address_filters: - sync_steps -= 1 + sync_steps.remove(Event.BLOCK_FILTER.label) # pylint: disable=no-member return sync_steps async def advance(self, initial_sync=False): - sync_steps = self.count_steps(initial_sync) + sync_steps = self.get_steps(initial_sync) heights = await self.load_blocks(sync_steps, initial_sync) if heights: starting_height, ending_height = heights @@ -129,7 +129,9 @@ class BlockchainSync(Sync): if self.conf.spv_address_filters: await self.run(process_block_filters) await self._on_block_controller.add(BlockEvent(ending_height)) - self.db.message_queue.put((Event.COMPLETE.value, os.getpid(), sync_steps, sync_steps)) + self.db.message_queue.put(( + Event.COMPLETE.value, os.getpid(), len(sync_steps), len(sync_steps) + )) async def advance_loop(self): while True: diff --git a/lbry/console.py b/lbry/console.py index e636e1e50..c60dd172d 100644 --- a/lbry/console.py +++ b/lbry/console.py @@ -54,68 +54,110 @@ class Basic(Console): class Advanced(Basic): - FORMAT = '{l_bar}{bar}| {n_fmt:>8}/{total_fmt:>8} [{elapsed:>7}<{remaining:>8}, {rate_fmt:>16}]' + FORMAT = '{l_bar}{bar}| {n_fmt:>8}/{total_fmt:>8} [{elapsed:>7}<{remaining:>8}, {rate_fmt:>17}]' def __init__(self, service: Service): super().__init__(service) self.bars: Dict[Any, tqdm.tqdm] = {} + self.is_single_sync_bar = False + self.single_bar_relative_steps = 0 + self.last_stats = "" + self.sync_steps = [] - def get_or_create_bar(self, name, desc, unit, total, leave=False): + def get_or_create_bar(self, name, desc, unit, total, leave=False, bar_format=None, postfix=None, position=None): bar = self.bars.get(name) if bar is None: bar = self.bars[name] = tqdm.tqdm( desc=desc, unit=unit, total=total, - bar_format=self.FORMAT, leave=leave + bar_format=bar_format or self.FORMAT, leave=leave, + postfix=postfix, position=position ) return bar - def start_sync_block_bars(self, d): + def sync_start(self, d): self.bars.clear() - self.get_or_create_bar("read", "total reading", "blocks", d['blocks'], True) - self.get_or_create_bar("save", "total saving", "txs", d['txs'], True) + self.sync_steps = d['sync_steps'] + if d['ending_height']-d['starting_height'] > 0: + label = f"sync {d['starting_height']:,d}-{d['ending_height']:,d}" + else: + label = f"sync {d['ending_height']:,d}" + self.last_stats = f"{d['txs']:,d} txs, {d['claims']:,d} claims and {d['supports']:,d} supports" + self.get_or_create_bar( + "sync", label, "tasks", len(d['sync_steps']), True, + "{l_bar}{bar}| {postfix[0]:<55}", (self.last_stats,) + ) + self.get_or_create_bar("read", "├─ blocks read", "blocks", d['blocks'], True) + self.get_or_create_bar("save", "└─┬ txs saved", "txs", d['txs'], True) - def close_sync_block_bars(self): - self.bars.pop("read").close() - self.bars.pop("save").close() + def update_progress(self, e, d): + if e in ('blockchain.sync.block.read', 'blockchain.sync.block.save'): + self.update_block_bars(e, d) + else: + self.update_steps_bar(e, d) + self.update_other_bars(e, d) - def update_sync_block_bars(self, event, d): + def update_steps_bar(self, e, d): + sync_bar = self.bars['sync'] + if d['step'] == d['total']: + sync_done = (self.sync_steps.index(e)+1)-sync_bar.last_print_n + sync_bar.postfix = (f'finished: {e}',) + sync_bar.update(sync_done) + + def update_block_bars(self, event, d): bar_name = f"block-{d['block_file']}" bar = self.bars.get(bar_name) if bar is None: return self.get_or_create_bar( bar_name, - f"├─ blk{d['block_file']:05}.dat parsing", 'blocks', d['total'] + f" ├─ blk{d['block_file']:05}.dat reading", 'blocks', d['total'] ) - if event == "save" and bar.unit == "blocks": - bar.desc = f"├─ blk{d['block_file']:05}.dat saving" + if event.endswith("save") and bar.unit == "blocks": + bar.desc = f" ├─ blk{d['block_file']:05}.dat saving" bar.unit = "txs" bar.reset(d['total']) return diff = d['step']-bar.last_print_n bar.update(diff) - self.bars[event].update(diff) - - if event == "save" and d['step'] == d['total']: + if event.endswith("save") and d['step'] == d['total']: bar.close() + total_bar = self.bars[event[-4:]] + total_bar.update(diff) + if total_bar.total == total_bar.last_print_n: + self.update_steps_bar(event, {'step': 1, 'total': 1}) + if total_bar.desc.endswith('txs saved'): + total_bar.desc = "├─ txs saved" + total_bar.refresh() + def update_other_bars(self, e, d): - bar = self.get_or_create_bar(e, e[-13:], d['unit'], d['total'], leave=True) + if d['total'] == 0: + return + bar = self.bars.get(e) + if not bar: + name = ( + ' '.join(e.split('.')[-2:]) + .replace('support', 'suprt') + .replace('channels', 'chanls') + .replace('signatures', 'sigs') + ) + bar = self.get_or_create_bar(e, f"├─ {name:>12}", d['unit'], d['total'], True) diff = d['step']-bar.last_print_n bar.update(diff) - if d['step'] == d['total']: + #if d['step'] == d['total']: + #bar.close() + + def sync_complete(self): + self.bars['sync'].postfix = (self.last_stats,) + for bar in self.bars.values(): bar.close() def on_sync_progress(self, event): e, d = event['event'], event.get('data', {}) - if e.endswith("start"): - self.start_sync_block_bars(d) - elif e.endswith("block.done"): - self.close_sync_block_bars() - elif e.endswith("block.read"): - self.update_sync_block_bars("read", d) - elif e.endswith("block.save"): - self.update_sync_block_bars("save", d) + if e.endswith("sync.start"): + self.sync_start(d) + elif e.endswith("sync.complete"): + self.sync_complete() else: - self.update_other_bars(e, d) + self.update_progress(e, d) diff --git a/lbry/db/query_context.py b/lbry/db/query_context.py index 8cf74ac33..1fba3161a 100644 --- a/lbry/db/query_context.py +++ b/lbry/db/query_context.py @@ -178,14 +178,14 @@ class ProgressUnit(Enum): class Event(Enum): START = "blockchain.sync.start", ProgressUnit.BLOCKS - COMPLETE = "blockchain.sync.complete", ProgressUnit.BLOCKS + COMPLETE = "blockchain.sync.complete", ProgressUnit.TASKS # full node specific sync events BLOCK_READ = "blockchain.sync.block.read", ProgressUnit.BLOCKS BLOCK_SAVE = "blockchain.sync.block.save", ProgressUnit.TXS BLOCK_FILTER = "blockchain.sync.block.filter", ProgressUnit.BLOCKS - CLAIM_META = "blockchain.sync.claim.update", ProgressUnit.CLAIMS - CLAIM_TRIE = "blockchain.sync.claim.takeovers", ProgressUnit.CLAIMS + CLAIM_META = "blockchain.sync.claim.meta", ProgressUnit.CLAIMS + CLAIM_TRIE = "blockchain.sync.claim.trie", ProgressUnit.CLAIMS STAKE_CALC = "blockchain.sync.claim.stakes", ProgressUnit.CLAIMS CLAIM_CHAN = "blockchain.sync.claim.channels", ProgressUnit.CLAIMS CLAIM_SIGN = "blockchain.sync.claim.signatures", ProgressUnit.CLAIMS diff --git a/scripts/simulate_sync_console.py b/scripts/simulate_sync_console.py index 056cea964..ee88edf39 100644 --- a/scripts/simulate_sync_console.py +++ b/scripts/simulate_sync_console.py @@ -1,63 +1,77 @@ import asyncio from random import randrange +from typing import List from lbry.blockchain import Ledger from lbry.service import FullNode -from lbry.console import Advanced +from lbry.console import Advanced, Basic from lbry.conf import Config +from lbry.db.utils import chunk class Simulator: - def __init__(self, progress, ending_height, files, txs, processes): - self.progress = progress + def __init__(self, console): + self.console = console + self.sync = console.service.sync + self.progress = self.sync._on_progress_controller self.starting_height = 0 - self.ending_height = ending_height + self.ending_height = 0 self.starting_file = 0 - self.ending_file = files + self.processes = console.service.db.processes + + self.steps = [] + self.txs = 0 + self.claims = 0 + self.supports = 0 + + @property + def blocks(self, ): + if self.starting_height == 0: + return self.ending_height-self.starting_height + return (self.ending_height-self.starting_height)+1 + + async def advance(self, initial_sync: bool, ending_height: int, files: List[int], txs: int): + self.ending_height = ending_height + self.steps = self.sync.get_steps(initial_sync) self.txs = txs - self.processes = processes - - @property - def blocks(self): - return self.ending_height-self.starting_height - - @property - def files(self): - return self.ending_file-self.starting_file - - async def advance(self): + self.claims = int(txs/4) + self.supports = int(txs/2) await self.progress.add({ "event": "blockchain.sync.start", "data": { "starting_height": self.starting_height, - "ending_height": self.ending_height, - "files": self.files, + "ending_height": ending_height, + "files": len(files), "blocks": self.blocks, - "txs": self.txs + "sync_steps": self.steps, + "txs": self.txs, + "claims": self.claims, + "supports": self.supports, } }) blocks_synced = txs_synced = 0 - for starting_file in range(self.starting_file, self.ending_file, self.processes): + for file_group in chunk(files, self.processes): tasks = [] - for b in range(starting_file, min(self.ending_file, starting_file+self.processes)): - if b == (self.ending_file-1): - tasks.append(self.sync_block_file(b, self.blocks-blocks_synced, self.txs-txs_synced)) + for file in file_group: + if file == files[-1]: + tasks.append(self.sync_block_file(file, self.blocks-blocks_synced, self.txs-txs_synced)) else: - blocks = int(self.blocks / self.files) + blocks = int(self.blocks / len(files)) blocks_synced += blocks - txs = int(self.txs / self.files) + txs = int(self.txs / len(files)) txs_synced += txs - tasks.append(self.sync_block_file(b, blocks, txs)) + tasks.append(self.sync_block_file(file, blocks, txs)) await asyncio.wait(tasks) + for step in self.steps: + if step in ("blockchain.sync.block.read", "blockchain.sync.block.save"): + continue + await getattr(self, step.replace('.', '_'))() await self.progress.add({ - "event": "blockchain.sync.block.done", - "data": {"step": 1, "total": 1, "unit": "tasks", "best_height_processed": self.ending_height} + "event": "blockchain.sync.complete", + "data": {"step": len(self.steps), "total": len(self.steps), "unit": "tasks"} }) - await self.process() - self.processes = 1 - self.txs = 25 + self.ending_height = ending_height+1 self.starting_height = self.ending_height - self.ending_height += 1 async def sync_block_file(self, block_file, blocks, txs): for i in range(0, blocks, 1000): @@ -71,7 +85,7 @@ class Simulator: "data": {"step": blocks, "total": blocks, "unit": "blocks", "block_file": block_file} }) await asyncio.sleep(0.5) - for i in range(0, txs, 10000): + for i in range(0, txs, 2000): await self.progress.add({ "event": "blockchain.sync.block.save", "data": {"step": i, "total": txs, "unit": "txs", "block_file": block_file} @@ -82,28 +96,69 @@ class Simulator: "data": {"step": txs, "total": txs, "unit": "txs", "block_file": block_file} }) - async def process(self): - for i in range(3): + async def generate_steps(self, event, steps, unit, delay=1.0, step=1): + await self.progress.add({"event": event, "data": {"step": 0, "total": steps, "unit": unit}}) + remaining = steps + for i in range(1, steps+1, step): + await asyncio.sleep(delay) + await self.progress.add({"event": event, "data": {"step": i, "total": steps, "unit": unit}}) + remaining -= i + if remaining: + await asyncio.sleep(delay) + await self.progress.add({"event": event, "data": {"step": steps, "total": steps, "unit": unit}}) + + async def db_sync_input(self): + await self.generate_steps("db.sync.input", 2, "txis") + + async def db_sync_claim_delete(self): + await self.generate_steps("db.sync.claim.delete", 1, "claims") + + async def db_sync_claim_insert(self): + await self.generate_steps("db.sync.claim.insert", 1, "claims") + + async def db_sync_claim_update(self): + await self.generate_steps("db.sync.claim.update", 0, "claims") + + async def db_sync_support_delete(self): + await self.generate_steps("db.sync.support.delete", 1, "supports") + + async def db_sync_support_insert(self): + await self.generate_steps("db.sync.support.insert", 1, "supports") + + async def blockchain_sync_claim_trie(self): + await self.generate_steps("blockchain.sync.claim.trie", 1, "claims") + + async def blockchain_sync_claim_meta(self): + for i in range(0, self.claims, 1_000): await self.progress.add({ - "event": "db.sync.input", - "data": {"step": i, "total": 2, "unit": "txis"} - }) - await asyncio.sleep(1) - claims = int(self.txs/4) - for i in range(0, claims+1, 10_000): - await self.progress.add({ - "event": "blockchain.sync.claim.update", - "data": {"step": i, "total": claims, "unit": "claims"} + "event": "blockchain.sync.claim.meta", + "data": {"step": i, "total": self.claims, "unit": "claims"} }) await asyncio.sleep(0.1) + await self.progress.add({ + "event": "blockchain.sync.claim.meta", + "data": {"step": self.claims, "total": self.claims, "unit": "claims"} + }) + + async def blockchain_sync_claim_signatures(self): + await self.generate_steps("blockchain.sync.claim.signatures", self.claims, "claims", 0.5, 1000) + + async def blockchain_sync_support_signatures(self): + await self.generate_steps("blockchain.sync.support.signatures", self.supports, "supports", 0.5, 1000) + + async def blockchain_sync_claim_stakes(self): + await self.generate_steps("blockchain.sync.claim.stakes", 1, "claims", 0.5) + + async def blockchain_sync_claim_channels(self): + await self.generate_steps("blockchain.sync.claim.channels", 0, "supports", 0.5) async def main(): - console = Advanced(FullNode(Ledger(Config(processes=4)))) - progress = console.service.sync._on_progress_controller - sim = Simulator(progress, 200_000, 7, 1_000_000, console.service.db.processes) + console = Advanced(FullNode(Ledger(Config(processes=3, spv_address_filters=False)))) + sim = Simulator(console) console.starting() - await sim.advance() + await sim.advance(True, 100_000, [1, 2, 3, 4, 5], 100_000) + await sim.advance(False, 100_001, [5], 100) console.stopping() diff --git a/tests/integration/blockchain/test_blockchain.py b/tests/integration/blockchain/test_blockchain.py index fd5b59508..d07f624b1 100644 --- a/tests/integration/blockchain/test_blockchain.py +++ b/tests/integration/blockchain/test_blockchain.py @@ -466,7 +466,13 @@ class TestMultiBlockFileSyncing(BasicBlockchainTestCase): "data": { "starting_height": 0, "ending_height": 352, - "sync_steps": 6, + "sync_steps": [ + "blockchain.sync.block.read", + "blockchain.sync.block.save", + "db.sync.input", + "blockchain.sync.claim.meta", + "blockchain.sync.claim.signatures", + "blockchain.sync.support.signatures"], "files": 3, "blocks": 353, "txs": 544, @@ -523,10 +529,10 @@ class TestMultiBlockFileSyncing(BasicBlockchainTestCase): [2, 2], ] ) - # 4 - blockchain.sync.claim.update + # 4 - blockchain.sync.claim.meta self.assertEqual( [[0, 3610], [3610, 3610]], - self.extract_events('blockchain.sync.claim.update', events) + self.extract_events('blockchain.sync.claim.meta', events) ) # 5 - blockchain.sync.claim.signatures self.assertEqual( @@ -562,7 +568,21 @@ class TestMultiBlockFileSyncing(BasicBlockchainTestCase): "data": { "starting_height": 353, "ending_height": 354, - "sync_steps": 14, + "sync_steps": [ + "blockchain.sync.block.read", + "blockchain.sync.block.save", + "db.sync.input", + "db.sync.claim.delete", + "db.sync.claim.insert", + "db.sync.claim.update", + "db.sync.support.delete", + "db.sync.support.insert", + "blockchain.sync.claim.trie", + "blockchain.sync.claim.meta", + "blockchain.sync.claim.signatures", + "blockchain.sync.support.signatures", + "blockchain.sync.claim.stakes", + "blockchain.sync.claim.channels"], "files": 1, "blocks": 2, "txs": 4, @@ -605,10 +625,10 @@ class TestMultiBlockFileSyncing(BasicBlockchainTestCase): self.assertEqual([[0, 1], [1, 1]], self.extract_events('db.sync.support.delete', events)) # 8 - db.sync.support.insert self.assertEqual([[0, 1], [1, 1]], self.extract_events('db.sync.support.insert', events)) - # 9 - blockchain.sync.claim.takeovers - self.assertEqual([[0, 1], [1, 1]], self.extract_events('blockchain.sync.claim.takeovers', events)) - # 10 - blockchain.sync.claim.update - self.assertEqual([[0, 1], [1, 1]], self.extract_events('blockchain.sync.claim.update', events)) + # 9 - blockchain.sync.claim.trie + self.assertEqual([[0, 1], [1, 1]], self.extract_events('blockchain.sync.claim.trie', events)) + # 10 - blockchain.sync.claim.meta + self.assertEqual([[0, 1], [1, 1]], self.extract_events('blockchain.sync.claim.meta', events)) # 11 - blockchain.sync.claim.signatures self.assertEqual([[0, 0]], self.extract_events("blockchain.sync.claim.signatures", events)) # 12 - blockchain.sync.support.signatures