From 5a0a987f0cb133f1ac59459b7a49f620a3c81bd0 Mon Sep 17 00:00:00 2001 From: Lex Berezhny Date: Fri, 26 Jun 2020 10:39:58 -0400 Subject: [PATCH] refactored and simplified the blockchain sync --- lbry/blockchain/sync.py | 539 ------------------ lbry/blockchain/sync/__init__.py | 1 + lbry/blockchain/sync/queries.py | 81 +++ lbry/blockchain/sync/steps.py | 383 +++++++++++++ lbry/blockchain/sync/synchronizer.py | 144 +++++ .../integration/blockchain/test_blockchain.py | 200 +++++-- 6 files changed, 752 insertions(+), 596 deletions(-) delete mode 100644 lbry/blockchain/sync.py create mode 100644 lbry/blockchain/sync/__init__.py create mode 100644 lbry/blockchain/sync/queries.py create mode 100644 lbry/blockchain/sync/steps.py create mode 100644 lbry/blockchain/sync/synchronizer.py diff --git a/lbry/blockchain/sync.py b/lbry/blockchain/sync.py deleted file mode 100644 index 8ca5837e3..000000000 --- a/lbry/blockchain/sync.py +++ /dev/null @@ -1,539 +0,0 @@ -# pylint: disable=singleton-comparison -import os -import asyncio -import logging -from contextvars import ContextVar -from typing import Optional, Tuple, Set, NamedTuple - -from sqlalchemy import func, bindparam, case, distinct, desc -from sqlalchemy.future import select - -from lbry.event import BroadcastSubscription -from lbry.service.base import Sync, BlockEvent -from lbry.db import Database, queries, TXO_TYPES, CLAIM_TYPE_CODES -from lbry.db.tables import Claim, Takeover, Support, TXO, Block as BlockTable -from lbry.db.query_context import progress, context, Event -from lbry.db.queries import rows_to_txos -from lbry.db.sync import ( - condition_spent_claims, condition_spent_supports, - select_missing_supports, process_claim_changes -) -from lbry.db.utils import least -from lbry.schema.url import normalize_name - -from .lbrycrd import Lbrycrd -from .block import Block, create_block_filter -from .bcd_data_stream import BCDataStream -from .transaction import Output - - -log = logging.getLogger(__name__) -_chain: ContextVar[Lbrycrd] = ContextVar('chain') - - -def get_or_initialize_lbrycrd(ctx=None) -> Lbrycrd: - chain = _chain.get(None) - if chain is not None: - return chain - chain = Lbrycrd((ctx or context()).ledger) - chain.db.sync_open() - _chain.set(chain) - return chain - - -def process_block_file(block_file_number: int, starting_height: int, initial_sync: bool): - ctx = context() - chain = get_or_initialize_lbrycrd(ctx) - stop = ctx.stop_event - loader = ctx.get_bulk_loader() - - with progress(Event.BLOCK_READ, 100) as p: - new_blocks = chain.db.sync_get_blocks_in_file(block_file_number, starting_height) - if not new_blocks: - return -1 - done, total, last_block_processed = 0, len(new_blocks), -1 - block_file_path = chain.get_block_file_path_from_number(block_file_number) - p.start(total, {'block_file': block_file_number}) - with open(block_file_path, 'rb') as fp: - stream = BCDataStream(fp=fp) - for done, block_info in enumerate(new_blocks, start=1): - if stop.is_set(): - return -1 - block_height = block_info['height'] - fp.seek(block_info['data_offset']) - block = Block.from_data_stream(stream, block_height, block_file_number) - loader.add_block( - block, initial_sync and chain.db.sync_get_claim_support_txo_hashes(block_height) - ) - last_block_processed = block_height - p.step(done) - - with progress(Event.BLOCK_SAVE) as p: - p.extra = {'block_file': block_file_number} - loader.save() - - return last_block_processed - - -def process_takeovers(starting_height: int, ending_height: int): - chain = get_or_initialize_lbrycrd() - with progress(Event.TAKEOVER_INSERT) as p: - p.start(chain.db.sync_get_takeover_count( - above_height=starting_height, limit_height=ending_height - )) - done, step_size = 0, 500 - for offset in range(starting_height, ending_height+1, step_size): - takeovers = chain.db.sync_get_takeovers( - above_height=offset, limit_height=min(offset+step_size, ending_height), - ) - if takeovers: - p.ctx.execute(Takeover.insert(), takeovers) - done += len(takeovers) - p.step(done) - - -class ClaimChanges(NamedTuple): - deleted_channels: Set[bytes] - channels_with_changed_content: Set[bytes] - claims_with_changed_supports: Set[bytes] - - -def process_claims_and_supports(): - with progress(Event.CLAIM_DELETE) as p: - channels_with_deleted_claims = { - r['channel_hash'] for r in p.ctx.fetchall( - select(distinct(Claim.c.channel_hash)) - .where(condition_spent_claims( - list(set(CLAIM_TYPE_CODES) - {TXO_TYPES['channel']}) - ) & (Claim.c.channel_hash != None)) - ) - } - deleted_channels = { - r['claim_hash'] for r in p.ctx.fetchall( - select(distinct(Claim.c.claim_hash)).where( - (Claim.c.claim_type == TXO_TYPES['channel']) & - condition_spent_claims([TXO_TYPES['channel']]) - ) - ) - } - p.start(1) - p.ctx.execute(Claim.delete().where(condition_spent_claims())) - - process_claim_changes() - - with progress(Event.SUPPORT_DELETE) as p: - claims_with_deleted_supports = { - r['claim_hash'] for r in p.ctx.fetchall( - select(distinct(Support.c.claim_hash)).where(condition_spent_supports) - ) - } - channels_with_deleted_supports = { - r['channel_hash'] for r in p.ctx.fetchall( - select(distinct(Support.c.channel_hash)) - .where(condition_spent_supports & (Support.c.channel_hash != None)) - ) - } - p.start(1) - sql = Support.delete().where(condition_spent_supports) - p.ctx.execute(sql) - - with progress(Event.SUPPORT_INSERT) as p: - claims_with_added_supports = set() - loader = p.ctx.get_bulk_loader() - for txo in rows_to_txos(p.ctx.fetchall(select_missing_supports)): - loader.add_support(txo) - claims_with_added_supports.add(txo.claim_hash) - loader.save() - - return ClaimChanges( - deleted_channels=deleted_channels, - channels_with_changed_content=( - channels_with_deleted_supports | channels_with_deleted_claims - ), - claims_with_changed_supports=( - claims_with_added_supports | claims_with_deleted_supports - ) - ) - - -def condition_unvalidated_signables(signable): - return ( - (signable.c.is_signature_valid == None) & - (signable.c.channel_hash != None) - ) - - -def get_unvalidated_signable_count(ctx, signable): - sql = ( - select(func.count('*').label('total')) - .select_from(signable) - .where(condition_unvalidated_signables(signable)) - ) - return ctx.fetchone(sql)['total'] - - -def select_unvalidated_signables(signable, pk, include_urls=False, include_previous=False): - sql = ( - select( - pk, signable.c.signature, signable.c.signature_digest, signable.c.channel_hash, ( - select(TXO.c.public_key).select_from(TXO) - .where( - (TXO.c.claim_hash == signable.c.channel_hash) & - (TXO.c.txo_type == TXO_TYPES['channel']) & - (TXO.c.height <= signable.c.height) - ) - .order_by(desc(TXO.c.height)) - .scalar_subquery().label('public_key') - ), - ) - .where(condition_unvalidated_signables(signable)) - ) - if include_previous: - assert signable.name != 'support', "Supports cannot be updated and don't have a previous." - sql = sql.add_columns( - select(TXO.c.channel_hash).select_from(TXO) - .where( - (TXO.c.claim_hash == signable.c.claim_hash) & - (TXO.c.txo_type.in_(CLAIM_TYPE_CODES)) & - (TXO.c.height <= signable.c.height) - ) - .order_by(desc(TXO.c.height)).offset(1) - .scalar_subquery().label('previous_channel_hash') - ) - if include_urls: - channel = Claim.alias('channel') - return sql.add_columns( - signable.c.short_url.label('claim_url'), - channel.c.short_url.label('channel_url') - ).select_from(signable.join(channel, signable.c.channel_hash == channel.c.claim_hash)) - return sql.select_from(signable) - - -def signature_validation(d: dict, row: dict, public_key) -> dict: - d['is_signature_valid'] = False - if Output.is_signature_valid(bytes(row['signature']), bytes(row['signature_digest']), public_key): - d['is_signature_valid'] = True - return d - - -def process_signature_validation(changes: ClaimChanges): - - with progress(Event.CLAIM_SIGN) as p: - p.start(get_unvalidated_signable_count(p.ctx, Claim)) - claim_updates = [] - sql = select_unvalidated_signables( - Claim, Claim.c.claim_hash, include_urls=True, include_previous=changes is not None - ) - for claim in p.ctx.execute(sql): - claim_updates.append( - signature_validation({ - 'pk': claim['claim_hash'], - 'canonical_url': claim['channel_url'] + '/' + claim['claim_url'] - }, claim, claim['public_key']) - ) - if changes is not None: - changes.channels_with_changed_content.add(claim['channel_hash']) - if claim['previous_channel_hash']: - changes.channels_with_changed_content.add(claim['previous_channel_hash']) - if len(claim_updates) > 500: - p.ctx.execute(Claim.update().where(Claim.c.claim_hash == bindparam('pk')), claim_updates) - p.step(len(claim_updates)) - claim_updates.clear() - if claim_updates: - p.ctx.execute(Claim.update().where(Claim.c.claim_hash == bindparam('pk')), claim_updates) - del claim_updates - - with progress(Event.SUPPORT_SIGN) as p: - p.start(get_unvalidated_signable_count(p.ctx, Support)) - support_updates = [] - for support in p.ctx.execute(select_unvalidated_signables(Support, Support.c.txo_hash)): - support_updates.append( - signature_validation({'pk': support['txo_hash']}, support, support['public_key']) - ) - if changes is not None: - changes.channels_with_changed_content.add(support['channel_hash']) - if len(support_updates) > 500: - p.ctx.execute(Support.update().where(Support.c.txo_hash == bindparam('pk')), support_updates) - p.step(len(support_updates)) - support_updates.clear() - if support_updates: - p.ctx.execute(Support.update().where(Support.c.txo_hash == bindparam('pk')), support_updates) - del support_updates - - -def channel_content_count_calc(signable): - return ( - select(func.count('*')) - .select_from(signable) - .where((signable.c.channel_hash == Claim.c.claim_hash) & signable.c.is_signature_valid) - .scalar_subquery() - ) - - -def claim_support_aggregation(*cols): - return ( - select(*cols) - .select_from(Support) - .where(Support.c.claim_hash == Claim.c.claim_hash) - .scalar_subquery() - ) - - -def process_metadata(starting_height: int, ending_height: int, initial_sync: bool): - chain = get_or_initialize_lbrycrd() - channel = Claim.alias('channel') - stream = Claim.alias('stream') - changes = process_claims_and_supports() if not initial_sync else None - - staked_support_amount_calc = claim_support_aggregation(func.coalesce(func.sum(Support.c.amount), 0)) - staked_support_count_calc = claim_support_aggregation(func.count('*')) - - with progress(Event.CLAIM_META) as p: - p.start(chain.db.sync_get_claim_metadata_count(start_height=starting_height, end_height=ending_height)) - claim_update_sql = ( - Claim.update().where(Claim.c.claim_hash == bindparam('claim_hash_')) - .values( - canonical_url=case([( - ((Claim.c.canonical_url == None) & (Claim.c.channel_hash != None)), - select(channel.c.short_url).select_from(channel) - .where(channel.c.claim_hash == Claim.c.channel_hash) - .scalar_subquery() + '/' + bindparam('short_url_') - )], else_=Claim.c.canonical_url), - staked_support_amount=staked_support_amount_calc, - staked_support_count=staked_support_count_calc, - signed_claim_count=case([( - (Claim.c.claim_type == TXO_TYPES['channel']), - channel_content_count_calc(stream) - )], else_=0), - signed_support_count=case([( - (Claim.c.claim_type == TXO_TYPES['channel']), - channel_content_count_calc(Support) - )], else_=0), - ) - ) - done, step_size = 0, 500 - for offset in range(starting_height, ending_height+1, step_size): - claims = chain.db.sync_get_claim_metadata( - start_height=offset, end_height=min(offset+step_size, ending_height) - ) - if claims: - p.ctx.execute(claim_update_sql, claims) - done += len(claims) - p.step(done) - - process_signature_validation(changes) - - if not initial_sync and changes.claims_with_changed_supports: - # covered by Event.CLAIM_META during initial_sync, then only run if supports change - with progress(Event.CLAIM_CALC) as p: - p.start(len(changes.claims_with_changed_supports)) - sql = ( - Claim.update() - .where((Claim.c.claim_hash.in_(changes.claims_with_changed_supports))) - .values( - staked_support_amount=staked_support_amount_calc, - staked_support_count=staked_support_count_calc, - ) - ) - p.ctx.execute(sql) - - if not initial_sync and changes.channels_with_changed_content: - # covered by Event.CLAIM_META during initial_sync, then only run if claims are deleted - with progress(Event.CLAIM_CALC) as p: - p.start(len(changes.channels_with_changed_content)) - sql = ( - Claim.update() - .where((Claim.c.claim_hash.in_(changes.channels_with_changed_content))) - .values( - signed_claim_count=channel_content_count_calc(stream), - signed_support_count=channel_content_count_calc(Support), - ) - ) - p.ctx.execute(sql) - - if not initial_sync: - # covered by Event.CLAIM_META during initial_sync, otherwise loop over every block - with progress(Event.CLAIM_TRIE, 100) as p: - p.start(chain.db.sync_get_takeover_count(start_height=starting_height, end_height=ending_height)) - for offset in range(starting_height, ending_height+1): - for takeover in chain.db.sync_get_takeovers(start_height=offset, end_height=offset): - update_claims = ( - Claim.update() - .where(Claim.c.normalized == normalize_name(takeover['name'].decode())) - .values( - is_controlling=case( - [(Claim.c.claim_hash == takeover['claim_hash'], True)], - else_=False - ), - takeover_height=case( - [(Claim.c.claim_hash == takeover['claim_hash'], takeover['height'])], - else_=None - ), - activation_height=least(Claim.c.activation_height, takeover['height']), - ) - ) - p.ctx.execute(update_claims) - p.step(1) - -# with progress(Event.SUPPORT_META) as p: -# p.start(chain.db.sync_get_support_metadata_count(start_height=starting_height, end_height=ending_height)) -# done, step_size = 0, 500 -# for offset in range(starting_height, ending_height+1, step_size): -# supports = chain.db.sync_get_support_metadata( -# start_height=offset, end_height=min(offset+step_size, ending_height) -# ) -# if supports: -# p.ctx.execute( -# Support.update().where(Support.c.txo_hash == bindparam('txo_hash_pk')), -# supports -# ) -# done += len(supports) -# p.step(done) - - -def process_block_and_tx_filters(): - - with context("effective amount update") as ctx: - blocks = [] - all_filters = [] - all_addresses = [] - for block in queries.get_blocks_without_filters(): - addresses = { - ctx.ledger.address_to_hash160(r['address']) - for r in queries.get_block_tx_addresses(block_hash=block['block_hash']) - } - all_addresses.extend(addresses) - block_filter = create_block_filter(addresses) - all_filters.append(block_filter) - blocks.append({'pk': block['block_hash'], 'block_filter': block_filter}) - # filters = [get_block_filter(f) for f in all_filters] - ctx.execute(BlockTable.update().where(BlockTable.c.block_hash == bindparam('pk')), blocks) - -# txs = [] -# for tx in queries.get_transactions_without_filters(): -# tx_filter = create_block_filter( -# {r['address'] for r in queries.get_block_tx_addresses(tx_hash=tx['tx_hash'])} -# ) -# txs.append({'pk': tx['tx_hash'], 'tx_filter': tx_filter}) -# execute(TX.update().where(TX.c.tx_hash == bindparam('pk')), txs) - - -class BlockchainSync(Sync): - - def __init__(self, chain: Lbrycrd, db: Database): - super().__init__(chain.ledger, db) - self.chain = chain - self.on_block_subscription: Optional[BroadcastSubscription] = None - self.advance_loop_task: Optional[asyncio.Task] = None - self.advance_loop_event = asyncio.Event() - - async def start(self): - for _ in range(1): # range(2): - # initial sync can take a long time, new blocks may have been - # created while sync was running; therefore, run a second sync - # after first one finishes to possibly sync those new blocks. - # run advance as a task so that it can be stop()'ed if necessary. - self.advance_loop_task = asyncio.create_task( - self.advance(await self.db.needs_initial_sync()) - ) - await self.advance_loop_task - self.chain.subscribe() - self.advance_loop_task = asyncio.create_task(self.advance_loop()) - self.on_block_subscription = self.chain.on_block.listen( - lambda e: self.advance_loop_event.set() - ) - - async def stop(self): - self.chain.unsubscribe() - if self.on_block_subscription is not None: - self.on_block_subscription.cancel() - self.db.stop_event.set() - if self.advance_loop_task is not None: - self.advance_loop_task.cancel() - - async def run(self, f, *args): - return await asyncio.get_running_loop().run_in_executor( - self.db.executor, f, *args - ) - - async def load_blocks(self, initial_sync: bool) -> Optional[Tuple[int, int]]: - tasks = [] - starting_height, ending_height = None, await self.chain.db.get_best_height() - tx_count = block_count = 0 - for chain_file in await self.chain.db.get_block_files(): - # block files may be read and saved out of order, need to check - # each file individually to see if we have missing blocks - our_best_file_height = await self.db.get_best_block_height_for_file(chain_file['file_number']) - if our_best_file_height == chain_file['best_height']: - # we have all blocks in this file, skipping - continue - if -1 < our_best_file_height < chain_file['best_height']: - # we have some blocks, need to figure out what we're missing - # call get_block_files again limited to this file and current_height - chain_file = (await self.chain.db.get_block_files( - file_number=chain_file['file_number'], start_height=our_best_file_height+1 - ))[0] - tx_count += chain_file['txs'] - block_count += chain_file['blocks'] - starting_height = min( - our_best_file_height+1 if starting_height is None else starting_height, our_best_file_height+1 - ) - tasks.append(self.run( - process_block_file, chain_file['file_number'], our_best_file_height+1, initial_sync - )) - if not tasks: - return - await self._on_progress_controller.add({ - "event": "blockchain.sync.start", - "data": { - "starting_height": starting_height, - "ending_height": ending_height, - "files": len(tasks), - "blocks": block_count, - "txs": tx_count - } - }) - done, pending = await asyncio.wait( - tasks, return_when=asyncio.FIRST_EXCEPTION - ) - if pending: - self.db.stop_event.set() - for future in pending: - future.cancel() - for future in done: - future.result() - return - best_height_processed = max(f.result() for f in done) - # putting event in queue instead of add to progress_controller because - # we want this message to appear after all of the queued messages from workers - self.db.message_queue.put(( - Event.BLOCK_DONE.value, os.getpid(), - len(done), len(tasks), - {"best_height_processed": best_height_processed} - )) - return starting_height, best_height_processed - - async def process(self, starting_height: int, ending_height: int, initial_sync: bool): - await self.db.process_inputs_outputs() - await self.run(process_metadata, starting_height, ending_height, initial_sync) - if self.conf.spv_address_filters: - await self.run(process_block_and_tx_filters) - - async def advance(self, initial_sync=False): - heights = await self.load_blocks(initial_sync) - if heights: - starting_height, ending_height = heights - await self.process(starting_height, ending_height, initial_sync) - await self._on_block_controller.add(BlockEvent(ending_height)) - - async def advance_loop(self): - while True: - await self.advance_loop_event.wait() - self.advance_loop_event.clear() - try: - await self.advance() - except asyncio.CancelledError: - return - except Exception as e: - log.exception(e) - await self.stop() diff --git a/lbry/blockchain/sync/__init__.py b/lbry/blockchain/sync/__init__.py new file mode 100644 index 000000000..06c5770a4 --- /dev/null +++ b/lbry/blockchain/sync/__init__.py @@ -0,0 +1 @@ +from .synchronizer import BlockchainSync diff --git a/lbry/blockchain/sync/queries.py b/lbry/blockchain/sync/queries.py new file mode 100644 index 000000000..f04a224d6 --- /dev/null +++ b/lbry/blockchain/sync/queries.py @@ -0,0 +1,81 @@ +# pylint: disable=singleton-comparison +from sqlalchemy import func, desc +from sqlalchemy.future import select + +from lbry.db import TXO_TYPES, CLAIM_TYPE_CODES +from lbry.db.tables import Claim, Support, TXO + + +def condition_unvalidated_signables(signable): + return ( + (signable.c.is_signature_valid == None) & + (signable.c.channel_hash != None) + ) + + +def get_unvalidated_signable_count(ctx, signable): + sql = ( + select(func.count('*').label('total')) + .select_from(signable) + .where(condition_unvalidated_signables(signable)) + ) + return ctx.fetchone(sql)['total'] + + +def select_unvalidated_signables(signable, pk, include_urls=False, include_previous=False): + sql = ( + select( + pk, signable.c.signature, signable.c.signature_digest, signable.c.channel_hash, ( + select(TXO.c.public_key).select_from(TXO) + .where( + (TXO.c.claim_hash == signable.c.channel_hash) & + (TXO.c.txo_type == TXO_TYPES['channel']) & + (TXO.c.height <= signable.c.height) + ) + .order_by(desc(TXO.c.height)) + .scalar_subquery().label('public_key') + ), + ) + .where(condition_unvalidated_signables(signable)) + ) + if include_previous: + assert signable.name != 'support', "Supports cannot be updated and don't have a previous." + sql = sql.add_columns( + select(TXO.c.channel_hash).select_from(TXO) + .where( + (TXO.c.claim_hash == signable.c.claim_hash) & + (TXO.c.txo_type.in_(CLAIM_TYPE_CODES)) & + (TXO.c.height <= signable.c.height) + ) + .order_by(desc(TXO.c.height)).offset(1) + .scalar_subquery().label('previous_channel_hash') + ) + if include_urls: + channel = Claim.alias('channel') + return sql.add_columns( + signable.c.short_url.label('claim_url'), + channel.c.short_url.label('channel_url') + ).select_from(signable.join(channel, signable.c.channel_hash == channel.c.claim_hash)) + return sql.select_from(signable) + + +def channel_content_count_calc(signable): + return ( + select(func.count('*')) + .select_from(signable) + .where((signable.c.channel_hash == Claim.c.claim_hash) & signable.c.is_signature_valid) + .scalar_subquery() + ) + + +def claim_support_aggregation(*cols): + return ( + select(*cols) + .select_from(Support) + .where(Support.c.claim_hash == Claim.c.claim_hash) + .scalar_subquery() + ) + + +staked_support_amount_calc = claim_support_aggregation(func.coalesce(func.sum(Support.c.amount), 0)) +staked_support_count_calc = claim_support_aggregation(func.count('*')) diff --git a/lbry/blockchain/sync/steps.py b/lbry/blockchain/sync/steps.py new file mode 100644 index 000000000..6ab03d52c --- /dev/null +++ b/lbry/blockchain/sync/steps.py @@ -0,0 +1,383 @@ +# pylint: disable=singleton-comparison +import logging +import functools +from contextvars import ContextVar +from typing import Set + +from sqlalchemy import bindparam, case, distinct + +from lbry.db import queries +from lbry.db.tables import Block as BlockTable +from lbry.db.query_context import progress, context, Event +from lbry.db.queries import rows_to_txos +from lbry.db.sync import ( + select_missing_supports, + select_missing_claims, select_stale_claims, + condition_spent_claims, condition_spent_supports, + set_input_addresses, update_spent_outputs +) +from lbry.db.utils import least +from lbry.schema.url import normalize_name + +from lbry.blockchain.lbrycrd import Lbrycrd +from lbry.blockchain.block import Block, create_block_filter +from lbry.blockchain.bcd_data_stream import BCDataStream +from lbry.blockchain.transaction import Output + +from .queries import * + + +log = logging.getLogger(__name__) +_chain: ContextVar[Lbrycrd] = ContextVar('chain') + + +SYNC_STEPS = { + "initial_sync": 0, + "ongoing_sync": 0, + "events": [], +} + + +def sync_step(event: Event, step_size=1, initial_sync=False, ongoing_sync=False): + assert event not in SYNC_STEPS['events'], f"Event {event} used more than once." + assert initial_sync or ongoing_sync, "At least one of initial_sync or ongoing_sync must be true." + if initial_sync: + SYNC_STEPS['initial_sync'] += 1 + if ongoing_sync: + SYNC_STEPS['ongoing_sync'] += 1 + + def wrapper(f): + @functools.wraps(f) + def with_progress(*args, **kwargs): + with progress(event, step_size=step_size) as p: + return f(*args, **kwargs, p=p) + return with_progress + + return wrapper + + +class ClaimChanges: + deleted_channels: Set[bytes] + channels_with_changed_content: Set[bytes] + claims_with_changed_supports: Set[bytes] + + def __init__(self): + self.deleted_channels = set() + self.channels_with_changed_content = set() + self.claims_with_changed_supports = set() + + +def get_or_initialize_lbrycrd(ctx=None) -> Lbrycrd: + chain = _chain.get(None) + if chain is not None: + return chain + chain = Lbrycrd((ctx or context()).ledger) + chain.db.sync_open() + _chain.set(chain) + return chain + + +def process_block_file(block_file_number: int, starting_height: int, initial_sync: bool): + ctx = context() + loader = ctx.get_bulk_loader() + last_block_processed = process_block_read(block_file_number, starting_height, initial_sync, loader) + process_block_save(block_file_number, loader) + return last_block_processed + + +def process_metadata(starting_height: int, ending_height: int, initial_sync: bool): + chain = get_or_initialize_lbrycrd() + process_inputs_outputs() + changes = None + if not initial_sync: + changes = ClaimChanges() + process_claim_delete(changes) + process_claim_insert(changes) + process_claim_update(changes) + process_support_delete(changes) + process_support_insert(changes) + process_takeovers(starting_height, ending_height, chain) + process_claim_metadata(starting_height, ending_height, chain) + process_claim_signatures(changes) + process_support_signatures(changes) + if not initial_sync: + # these depend on signature validation + process_stake_calc(changes) + process_channel_content(changes) + + +@sync_step(Event.BLOCK_READ, step_size=100, initial_sync=True, ongoing_sync=True) +def process_block_read(block_file_number: int, starting_height: int, initial_sync: bool, loader, p=None): + chain = get_or_initialize_lbrycrd(p.ctx) + stop = p.ctx.stop_event + new_blocks = chain.db.sync_get_blocks_in_file(block_file_number, starting_height) + if not new_blocks: + return -1 + done, total, last_block_processed = 0, len(new_blocks), -1 + block_file_path = chain.get_block_file_path_from_number(block_file_number) + p.start(total, {'block_file': block_file_number}) + with open(block_file_path, 'rb') as fp: + stream = BCDataStream(fp=fp) + for done, block_info in enumerate(new_blocks, start=1): + if stop.is_set(): + return -1 + block_height = block_info['height'] + fp.seek(block_info['data_offset']) + block = Block.from_data_stream(stream, block_height, block_file_number) + loader.add_block( + block, initial_sync and chain.db.sync_get_claim_support_txo_hashes(block_height) + ) + last_block_processed = block_height + p.step(done) + return last_block_processed + + +@sync_step(Event.BLOCK_SAVE, initial_sync=True, ongoing_sync=True) +def process_block_save(block_file_number: int, loader, p=None): + p.extra = {'block_file': block_file_number} + loader.save() + + +@sync_step(Event.BLOCK_FILTER, initial_sync=True, ongoing_sync=True) +def process_block_filters(p=None): + blocks = [] + all_filters = [] + all_addresses = [] + for block in queries.get_blocks_without_filters(): + addresses = { + p.ctx.ledger.address_to_hash160(r['address']) + for r in queries.get_block_tx_addresses(block_hash=block['block_hash']) + } + all_addresses.extend(addresses) + block_filter = create_block_filter(addresses) + all_filters.append(block_filter) + blocks.append({'pk': block['block_hash'], 'block_filter': block_filter}) + # filters = [get_block_filter(f) for f in all_filters] + p.ctx.execute(BlockTable.update().where(BlockTable.c.block_hash == bindparam('pk')), blocks) + +# txs = [] +# for tx in queries.get_transactions_without_filters(): +# tx_filter = create_block_filter( +# {r['address'] for r in queries.get_block_tx_addresses(tx_hash=tx['tx_hash'])} +# ) +# txs.append({'pk': tx['tx_hash'], 'tx_filter': tx_filter}) +# execute(TX.update().where(TX.c.tx_hash == bindparam('pk')), txs) + + +def signature_validation(d: dict, row: dict, public_key) -> dict: + d['is_signature_valid'] = False + if Output.is_signature_valid(bytes(row['signature']), bytes(row['signature_digest']), public_key): + d['is_signature_valid'] = True + return d + + +@sync_step(Event.CLAIM_SIGN, initial_sync=True, ongoing_sync=True) +def process_claim_signatures(changes: ClaimChanges, p=None): + p.start(get_unvalidated_signable_count(p.ctx, Claim)) + claim_updates = [] + sql = select_unvalidated_signables( + Claim, Claim.c.claim_hash, include_urls=True, include_previous=changes is not None + ) + steps = 0 + for claim in p.ctx.execute(sql): + claim_updates.append( + signature_validation({ + 'pk': claim['claim_hash'], + 'canonical_url': claim['channel_url'] + '/' + claim['claim_url'] + }, claim, claim['public_key']) + ) + if changes is not None: + changes.channels_with_changed_content.add(claim['channel_hash']) + if claim['previous_channel_hash']: + changes.channels_with_changed_content.add(claim['previous_channel_hash']) + if len(claim_updates) > 500: + p.ctx.execute(Claim.update().where(Claim.c.claim_hash == bindparam('pk')), claim_updates) + steps += len(claim_updates) + p.step(steps) + claim_updates.clear() + if claim_updates: + p.ctx.execute(Claim.update().where(Claim.c.claim_hash == bindparam('pk')), claim_updates) + + +@sync_step(Event.SUPPORT_SIGN, initial_sync=True, ongoing_sync=True) +def process_support_signatures(changes: ClaimChanges, p=None): + p.start(get_unvalidated_signable_count(p.ctx, Support)) + support_updates = [] + for support in p.ctx.execute(select_unvalidated_signables(Support, Support.c.txo_hash)): + support_updates.append( + signature_validation({'pk': support['txo_hash']}, support, support['public_key']) + ) + if changes is not None: + changes.channels_with_changed_content.add(support['channel_hash']) + if len(support_updates) > 500: + p.ctx.execute(Support.update().where(Support.c.txo_hash == bindparam('pk')), support_updates) + p.step(len(support_updates)) + support_updates.clear() + if support_updates: + p.ctx.execute(Support.update().where(Support.c.txo_hash == bindparam('pk')), support_updates) + + +@sync_step(Event.INPUT_UPDATE, initial_sync=True, ongoing_sync=True) +def process_inputs_outputs(p=None): + p.start(2) + # 1. Update TXIs to have the address of TXO they are spending. + set_input_addresses(p.ctx) + p.step(1) + # 2. Update spent TXOs setting is_spent = True + update_spent_outputs(p.ctx) + p.step(2) + + +@sync_step(Event.CLAIM_META, initial_sync=True, ongoing_sync=True) +def process_claim_metadata(starting_height: int, ending_height: int, chain, p=None): + channel = Claim.alias('channel') + stream = Claim.alias('stream') + p.start(chain.db.sync_get_claim_metadata_count(start_height=starting_height, end_height=ending_height)) + claim_update_sql = ( + Claim.update().where(Claim.c.claim_hash == bindparam('claim_hash_')) + .values( + canonical_url=case([( + ((Claim.c.canonical_url == None) & (Claim.c.channel_hash != None)), + select(channel.c.short_url).select_from(channel) + .where(channel.c.claim_hash == Claim.c.channel_hash) + .scalar_subquery() + '/' + bindparam('short_url_') + )], else_=Claim.c.canonical_url), + staked_support_amount=staked_support_amount_calc, + staked_support_count=staked_support_count_calc, + signed_claim_count=case([( + (Claim.c.claim_type == TXO_TYPES['channel']), + channel_content_count_calc(stream) + )], else_=0), + signed_support_count=case([( + (Claim.c.claim_type == TXO_TYPES['channel']), + channel_content_count_calc(Support) + )], else_=0), + ) + ) + done, step_size = 0, 500 + for offset in range(starting_height, ending_height + 1, step_size): + claims = chain.db.sync_get_claim_metadata( + start_height=offset, end_height=min(offset + step_size, ending_height) + ) + if claims: + p.ctx.execute(claim_update_sql, claims) + done += len(claims) + p.step(done) + + +@sync_step(Event.STAKE_CALC, ongoing_sync=True) +def process_stake_calc(changes: ClaimChanges, p=None): + p.start(len(changes.claims_with_changed_supports)) + sql = ( + Claim.update() + .where((Claim.c.claim_hash.in_(changes.claims_with_changed_supports))) + .values( + staked_support_amount=staked_support_amount_calc, + staked_support_count=staked_support_count_calc, + ) + ) + p.ctx.execute(sql) + + +@sync_step(Event.CLAIM_CHAN, ongoing_sync=True) +def process_channel_content(changes: ClaimChanges, p=None): + p.start(len(changes.channels_with_changed_content)) + stream = Claim.alias('stream') + sql = ( + Claim.update() + .where((Claim.c.claim_hash.in_(changes.channels_with_changed_content))) + .values( + signed_claim_count=channel_content_count_calc(stream), + signed_support_count=channel_content_count_calc(Support), + ) + ) + p.ctx.execute(sql) + + +@sync_step(Event.CLAIM_TRIE, step_size=100, ongoing_sync=True) +def process_takeovers(starting_height: int, ending_height: int, chain, p=None): + p.start(chain.db.sync_get_takeover_count(start_height=starting_height, end_height=ending_height)) + for offset in range(starting_height, ending_height + 1): + for takeover in chain.db.sync_get_takeovers(start_height=offset, end_height=offset): + update_claims = ( + Claim.update() + .where(Claim.c.normalized == normalize_name(takeover['name'].decode())) + .values( + is_controlling=case( + [(Claim.c.claim_hash == takeover['claim_hash'], True)], + else_=False + ), + takeover_height=case( + [(Claim.c.claim_hash == takeover['claim_hash'], takeover['height'])], + else_=None + ), + activation_height=least(Claim.c.activation_height, takeover['height']), + ) + ) + p.ctx.execute(update_claims) + p.step(1) + + +@sync_step(Event.CLAIM_DELETE, ongoing_sync=True) +def process_claim_delete(changes: ClaimChanges, p=None): + changes.channels_with_changed_content |= { + r['channel_hash'] for r in p.ctx.fetchall( + select(distinct(Claim.c.channel_hash)) + .where(condition_spent_claims( + list(set(CLAIM_TYPE_CODES) - {TXO_TYPES['channel']}) + ) & (Claim.c.channel_hash != None)) + ) + } + changes.deleted_channels |= { + r['claim_hash'] for r in p.ctx.fetchall( + select(distinct(Claim.c.claim_hash)).where( + (Claim.c.claim_type == TXO_TYPES['channel']) & + condition_spent_claims([TXO_TYPES['channel']]) + ) + ) + } + p.start(1) + p.ctx.execute(Claim.delete().where(condition_spent_claims())) + + +@sync_step(Event.CLAIM_INSERT, ongoing_sync=True) +def process_claim_insert(_, p=None): + loader = p.ctx.get_bulk_loader() + for claim in rows_to_txos(p.ctx.fetchall(select_missing_claims)): + loader.add_claim(claim) + loader.save() + + +@sync_step(Event.CLAIM_UPDATE, ongoing_sync=True) +def process_claim_update(_, p=None): + loader = p.ctx.get_bulk_loader() + for claim in rows_to_txos(p.ctx.fetchall(select_stale_claims)): + loader.update_claim(claim) + loader.save() + + +@sync_step(Event.SUPPORT_DELETE, ongoing_sync=True) +def process_support_delete(changes: ClaimChanges, p=None): + changes.claims_with_changed_supports |= { + r['claim_hash'] for r in p.ctx.fetchall( + select(distinct(Support.c.claim_hash)).where(condition_spent_supports) + ) + } + changes.channels_with_changed_content |= { + r['channel_hash'] for r in p.ctx.fetchall( + select(distinct(Support.c.channel_hash)) + .where(condition_spent_supports & (Support.c.channel_hash != None)) + ) + } + p.start(1) + sql = Support.delete().where(condition_spent_supports) + p.ctx.execute(sql) + + +@sync_step(Event.SUPPORT_INSERT, ongoing_sync=True) +def process_support_insert(changes: ClaimChanges, p=None): + loader = p.ctx.get_bulk_loader() + for txo in rows_to_txos(p.ctx.fetchall(select_missing_supports)): + loader.add_support(txo) + changes.claims_with_changed_supports.add(txo.claim_hash) + loader.save() diff --git a/lbry/blockchain/sync/synchronizer.py b/lbry/blockchain/sync/synchronizer.py new file mode 100644 index 000000000..a3b8cb0fb --- /dev/null +++ b/lbry/blockchain/sync/synchronizer.py @@ -0,0 +1,144 @@ +import os +import asyncio +import logging +from typing import Optional, Tuple + +from lbry.db import Database +from lbry.db.query_context import Event +from lbry.event import BroadcastSubscription +from lbry.service.base import Sync, BlockEvent +from lbry.blockchain.lbrycrd import Lbrycrd + +from .steps import ( + SYNC_STEPS, + process_block_file, + process_block_filters, + process_metadata, +) + + +log = logging.getLogger(__name__) + + +class BlockchainSync(Sync): + + def __init__(self, chain: Lbrycrd, db: Database): + super().__init__(chain.ledger, db) + self.chain = chain + self.on_block_subscription: Optional[BroadcastSubscription] = None + self.advance_loop_task: Optional[asyncio.Task] = None + self.advance_loop_event = asyncio.Event() + + async def start(self): + for _ in range(1): # range(2): + # initial sync can take a long time, new blocks may have been + # created while sync was running; therefore, run a second sync + # after first one finishes to possibly sync those new blocks. + # run advance as a task so that it can be stop()'ed if necessary. + self.advance_loop_task = asyncio.create_task( + self.advance(await self.db.needs_initial_sync()) + ) + await self.advance_loop_task + self.chain.subscribe() + self.advance_loop_task = asyncio.create_task(self.advance_loop()) + self.on_block_subscription = self.chain.on_block.listen( + lambda e: self.advance_loop_event.set() + ) + + async def stop(self): + self.chain.unsubscribe() + if self.on_block_subscription is not None: + self.on_block_subscription.cancel() + self.db.stop_event.set() + if self.advance_loop_task is not None: + self.advance_loop_task.cancel() + + async def run(self, f, *args): + return await asyncio.get_running_loop().run_in_executor( + self.db.executor, f, *args + ) + + async def load_blocks(self, sync_steps: int, initial_sync: bool) -> Optional[Tuple[int, int]]: + tasks = [] + starting_height, ending_height = None, await self.chain.db.get_best_height() + tx_count = block_count = 0 + for chain_file in await self.chain.db.get_block_files(): + # block files may be read and saved out of order, need to check + # each file individually to see if we have missing blocks + our_best_file_height = await self.db.get_best_block_height_for_file(chain_file['file_number']) + if our_best_file_height == chain_file['best_height']: + # we have all blocks in this file, skipping + continue + if -1 < our_best_file_height < chain_file['best_height']: + # we have some blocks, need to figure out what we're missing + # call get_block_files again limited to this file and current_height + chain_file = (await self.chain.db.get_block_files( + file_number=chain_file['file_number'], start_height=our_best_file_height+1 + ))[0] + tx_count += chain_file['txs'] + block_count += chain_file['blocks'] + starting_height = min( + our_best_file_height+1 if starting_height is None else starting_height, our_best_file_height+1 + ) + tasks.append(self.run( + process_block_file, chain_file['file_number'], our_best_file_height+1, initial_sync + )) + if not tasks: + return + await self._on_progress_controller.add({ + "event": Event.START.label, + "data": { + "starting_height": starting_height, + "ending_height": ending_height, + "sync_steps": sync_steps, + "files": len(tasks), + "blocks": block_count, + "txs": tx_count, + "claims": await self.chain.db.get_claim_metadata_count(starting_height, ending_height), + "supports": await self.chain.db.get_support_metadata_count(starting_height, ending_height), + } + }) + done, pending = await asyncio.wait( + tasks, return_when=asyncio.FIRST_EXCEPTION + ) + if pending: + self.db.stop_event.set() + for future in pending: + future.cancel() + for future in done: + future.result() + return + best_height_processed = max(f.result() for f in done) + return starting_height, best_height_processed + + def count_steps(self, initial_sync: bool): + if initial_sync: + sync_steps = SYNC_STEPS['initial_sync'] + else: + sync_steps = SYNC_STEPS['ongoing_sync'] + if not self.conf.spv_address_filters: + sync_steps -= 1 + return sync_steps + + async def advance(self, initial_sync=False): + sync_steps = self.count_steps(initial_sync) + heights = await self.load_blocks(sync_steps, initial_sync) + if heights: + starting_height, ending_height = heights + await self.run(process_metadata, starting_height, ending_height, initial_sync) + if self.conf.spv_address_filters: + await self.run(process_block_filters) + await self._on_block_controller.add(BlockEvent(ending_height)) + self.db.message_queue.put((Event.COMPLETE.value, os.getpid(), sync_steps, sync_steps)) + + async def advance_loop(self): + while True: + await self.advance_loop_event.wait() + self.advance_loop_event.clear() + try: + await self.advance() + except asyncio.CancelledError: + return + except Exception as e: + log.exception(e) + await self.stop() diff --git a/tests/integration/blockchain/test_blockchain.py b/tests/integration/blockchain/test_blockchain.py index 27d909f8c..fd5b59508 100644 --- a/tests/integration/blockchain/test_blockchain.py +++ b/tests/integration/blockchain/test_blockchain.py @@ -17,7 +17,7 @@ from lbry.blockchain.dewies import dewies_to_lbc, lbc_to_dewies from lbry.constants import CENT, COIN from lbry.testcase import AsyncioTestCase -import logging + #logging.getLogger('lbry.blockchain').setLevel(logging.DEBUG) @@ -43,6 +43,18 @@ class BasicBlockchainTestCase(AsyncioTestCase): self.addCleanup(db.close) return db + @staticmethod + def find_claim_txo(tx) -> Optional[Output]: + for txo in tx.outputs: + if txo.is_claim: + return txo + + @staticmethod + def find_support_txo(tx) -> Optional[Output]: + for txo in tx.outputs: + if txo.is_support: + return txo + class SyncingBlockchainTestCase(BasicBlockchainTestCase): @@ -74,12 +86,6 @@ class SyncingBlockchainTestCase(BasicBlockchainTestCase): async def get_last_block(self): return await self.chain.get_block(self.last_block_hash) - @staticmethod - def find_claim_txo(tx) -> Optional[Output]: - for txo in tx.outputs: - if txo.is_claim: - return txo - async def get_claim(self, txid: str) -> Output: raw = await self.chain.get_raw_transaction(txid) tx = Transaction(unhexlify(raw)) @@ -88,12 +94,6 @@ class SyncingBlockchainTestCase(BasicBlockchainTestCase): txo.private_key = self.channel_keys.get(txo.claim_hash) return txo - @staticmethod - def find_support_txo(tx) -> Optional[Output]: - for txo in tx.outputs: - if txo.is_support: - return txo - async def get_support(self, txid: str) -> Output: raw = await self.chain.get_raw_transaction(txid) return self.find_support_txo(Transaction(unhexlify(raw))) @@ -339,6 +339,8 @@ class TestMultiBlockFileSyncing(BasicBlockchainTestCase): await self.chain.send_raw_transaction(signed['hex']) await self.chain.generate(1) + # supports \w data aren't supported until block 350, fast forward a little + await self.chain.generate(60) claim = tx.outputs[0] tx = Transaction().add_outputs([ Output.pay_support_pubkey_hash(CENT, claim.claim_name, claim.claim_id, address), @@ -357,11 +359,18 @@ class TestMultiBlockFileSyncing(BasicBlockchainTestCase): copy_tree(self.chain.ledger.conf.lbrycrd_dir, self.TEST_DATA_CACHE_DIR) await self.chain.start(*self.LBRYCRD_ARGS) + @staticmethod + def extract_block_events(name, events): + return sorted([ + [p['data']['block_file'], p['data']['step'], p['data']['total']] + for p in events if p['event'] == name + ]) + @staticmethod def extract_events(name, events): return sorted([ - [p['data'].get('block_file'), p['data']['step'], p['data']['total']] - for p in events if p['event'].endswith(name) + [p['data']['step'], p['data']['total']] + for p in events if p['event'] == name ]) def assertEventsAlmostEqual(self, actual, expected): @@ -378,11 +387,11 @@ class TestMultiBlockFileSyncing(BasicBlockchainTestCase): db = self.chain.db # get_best_height - self.assertEqual(292, await db.get_best_height()) + self.assertEqual(352, await db.get_best_height()) # get_block_files self.assertEqual( - [(0, 191, 280), (1, 89, 178), (2, 13, 26)], + [(0, 191, 280), (1, 89, 178), (2, 73, 86)], [(file['file_number'], file['blocks'], file['txs']) for file in await db.get_block_files()] ) @@ -430,7 +439,7 @@ class TestMultiBlockFileSyncing(BasicBlockchainTestCase): ) # get_support_metadata_count - self.assertEqual(1, await db.get_support_metadata_count(0, 500)) + self.assertEqual(2, await db.get_support_metadata_count(0, 500)) self.assertEqual(0, await db.get_support_metadata_count(500, 1000)) def foo(c): @@ -438,7 +447,8 @@ class TestMultiBlockFileSyncing(BasicBlockchainTestCase): # get_support_metadata self.assertEqual( - [{'name': b'two', 'activation_height': 297, 'expiration_height': 792}], + [{'name': b'two', 'activation_height': 359, 'expiration_height': 852}, + {'name': b'two', 'activation_height': 359, 'expiration_height': 852}], [{'name': c['name'], 'activation_height': c['activation_height'], 'expiration_height': c['expiration_height']} for c in await db.get_support_metadata(0, 500)] ) @@ -447,92 +457,168 @@ class TestMultiBlockFileSyncing(BasicBlockchainTestCase): events = [] self.sync.on_progress.listen(events.append) - await self.sync.advance() + # initial_sync = True + await self.sync.advance(True) await asyncio.sleep(1) # give it time to collect events self.assertEqual( events[0], { "event": "blockchain.sync.start", "data": { "starting_height": 0, - "ending_height": 292, + "ending_height": 352, + "sync_steps": 6, "files": 3, - "blocks": 293, - "txs": 484 + "blocks": 353, + "txs": 544, + "claims": 3610, + "supports": 2, } } ) + # 1 - blockchain.sync.block.read self.assertEqual( - self.extract_events('block.read', events), [ + self.extract_block_events('blockchain.sync.block.read', events), [ [0, 0, 191], [0, 100, 191], [0, 191, 191], [1, 0, 89], [1, 89, 89], - [2, 0, 13], - [2, 13, 13], + [2, 0, 73], + [2, 73, 73], ] ) + # 2 - blockchain.sync.block.save self.assertEventsAlmostEqual( - self.extract_events('block.save', events), [ + self.extract_block_events('blockchain.sync.block.save', events), [ [0, 0, 280], - [0, 19, 280], - [0, 47, 280], - [0, 267, 280], - [0, 278, 280], + [0, 5, 280], + [0, 13, 280], + [0, 76, 280], + [0, 79, 280], + [0, 128, 280], + [0, 277, 280], [0, 280, 280], [1, 0, 178], - [1, 6, 178], - [1, 19, 178], - [1, 166, 178], - [1, 175, 178], + [1, 1, 178], + [1, 4, 178], + [1, 42, 178], + [1, 44, 178], + [1, 77, 178], + [1, 176, 178], [1, 178, 178], - [2, 0, 26], - [2, 1, 26], - [2, 3, 26], - [2, 24, 26], - [2, 26, 26], - [2, 26, 26] + [2, 0, 86], + [2, 5, 86], + [2, 9, 86], + [2, 31, 86], + [2, 44, 86], + [2, 83, 86], + [2, 86, 86], ] ) - claim_events = self.extract_events('claim.insert', events) - self.assertEqual([3402, 3610], claim_events[2][1:]) - self.assertEqual([3610, 3610], claim_events[-1][1:]) + # 3 - db.sync.input + self.assertEventsAlmostEqual( + self.extract_events('db.sync.input', events), [ + [0, 2], + [1, 2], + [2, 2], + ] + ) + # 4 - blockchain.sync.claim.update + self.assertEqual( + [[0, 3610], [3610, 3610]], + self.extract_events('blockchain.sync.claim.update', events) + ) + # 5 - blockchain.sync.claim.signatures + self.assertEqual( + [[0, 0]], self.extract_events('blockchain.sync.claim.signatures', events) + ) + # 6 - blockchain.sync.support.signatures + self.assertEqual( + [[0, 0]], self.extract_events('blockchain.sync.support.signatures', events) + ) + # Complete + self.assertEqual( + [[6, 6]], self.extract_events('blockchain.sync.complete', events) + ) + # initial_sync = False & no new blocks events.clear() await self.sync.advance() # should be no-op await asyncio.sleep(1) # give it time to collect events self.assertListEqual([], events) + # initial_sync = False + txid = await self.chain.claim_name('foo', 'beef', '0.01') + await self.chain.generate(1) + tx = Transaction(unhexlify(await self.chain.get_raw_transaction(txid))) + txo = self.find_claim_txo(tx) + await self.chain.support_claim('foo', txo.claim_id, '0.01') await self.chain.generate(1) - - events.clear() - await self.sync.advance() await asyncio.sleep(1) # give it time to collect events self.assertEqual( events[0], { "event": "blockchain.sync.start", "data": { - "starting_height": 293, - "ending_height": 293, + "starting_height": 353, + "ending_height": 354, + "sync_steps": 14, "files": 1, - "blocks": 1, - "txs": 1 + "blocks": 2, + "txs": 4, + "claims": 1, + "supports": 1, } } ) + # 1 - blockchain.sync.block.read self.assertEqual( - self.extract_events('block.read', events), [ - [2, 0, 1], - [2, 1, 1], + self.extract_block_events('blockchain.sync.block.read', events), [ + [2, 0, 2], + [2, 2, 2], ] ) + # 2 - blockchain.sync.block.save self.assertEqual( - self.extract_events('block.save', events), [ - [2, 0, 1], - [2, 1, 1], + self.extract_block_events('blockchain.sync.block.save', events), [ + [2, 0, 4], + [2, 1, 4], + [2, 3, 4], + [2, 4, 4], ] ) + # 3 - db.sync.input + self.assertEqual( + self.extract_events('db.sync.input', events), [ + [0, 2], # started + [1, 2], # updating addresses on inputs + [2, 2], # connect spent outputs to their inputs + ] + ) + # 4 - db.sync.claim.delete + self.assertEqual([[0, 1], [1, 1]], self.extract_events('db.sync.claim.delete', events)) + # 5 - db.sync.claim.insert + self.assertEqual([[0, 1], [1, 1]], self.extract_events('db.sync.claim.insert', events)) + # 6 - db.sync.claim.update + self.assertEqual([[0, 0]], self.extract_events('db.sync.claim.update', events)) + # 7 - db.sync.support.delete + self.assertEqual([[0, 1], [1, 1]], self.extract_events('db.sync.support.delete', events)) + # 8 - db.sync.support.insert + self.assertEqual([[0, 1], [1, 1]], self.extract_events('db.sync.support.insert', events)) + # 9 - blockchain.sync.claim.takeovers + self.assertEqual([[0, 1], [1, 1]], self.extract_events('blockchain.sync.claim.takeovers', events)) + # 10 - blockchain.sync.claim.update + self.assertEqual([[0, 1], [1, 1]], self.extract_events('blockchain.sync.claim.update', events)) + # 11 - blockchain.sync.claim.signatures + self.assertEqual([[0, 0]], self.extract_events("blockchain.sync.claim.signatures", events)) + # 12 - blockchain.sync.support.signatures + self.assertEqual([[0, 0]], self.extract_events("blockchain.sync.support.signatures", events)) + # 13 - blockchain.sync.claim.stakes + self.assertEqual([[0, 1], [1, 1]], self.extract_events("blockchain.sync.claim.stakes", events)) + # 14 - blockchain.sync.claim.channels + self.assertEqual([[0, 0]], self.extract_events("blockchain.sync.claim.channels", events)) + # Complete + self.assertEqual([[14, 14]], self.extract_events('blockchain.sync.complete', events)) class TestGeneralBlockchainSync(SyncingBlockchainTestCase):