diff --git a/lbry/blockchain/database.py b/lbry/blockchain/database.py index 7843995ef..4a09aacdf 100644 --- a/lbry/blockchain/database.py +++ b/lbry/blockchain/database.py @@ -56,7 +56,7 @@ class BlockchainDB: ) self.connection.create_aggregate("find_shortest_id", 2, FindShortestID) #self.connection.execute( - # "CREATE INDEX IF NOT EXISTS claim_originalheight ON claim (originalheight);" + # "CREATE INDEX IF NOT EXISTS claim_originalheight_claimid ON claim (originalheight, claimid);" #) self.connection.row_factory = sqlite3.Row @@ -205,7 +205,7 @@ class BlockchainDB: ) AS shortestID FROM claim WHERE originalHeight BETWEEN ? AND ? - ORDER BY originalHeight + ORDER BY originalHeight, claimid """, (start_height, end_height) )] diff --git a/lbry/blockchain/sync.py b/lbry/blockchain/sync.py index 768fa2953..2f5cf7719 100644 --- a/lbry/blockchain/sync.py +++ b/lbry/blockchain/sync.py @@ -4,7 +4,7 @@ import logging from contextvars import ContextVar from typing import Optional, Tuple, Set, NamedTuple -from sqlalchemy import func, bindparam, case, distinct, between +from sqlalchemy import func, bindparam, case, distinct, desc from sqlalchemy.future import select from lbry.event import BroadcastSubscription @@ -15,7 +15,7 @@ from lbry.db.query_context import progress, context, Event from lbry.db.queries import rows_to_txos from lbry.db.sync import ( condition_spent_claims, condition_spent_supports, - select_missing_claims, select_stale_claims, select_missing_supports + select_missing_supports, process_claim_changes ) from lbry.schema.url import normalize_name @@ -90,79 +90,9 @@ def process_takeovers(starting_height: int, ending_height: int): p.step(done) -def signature_validation(d: dict, row: dict, public_key) -> dict: - d['is_signature_valid'] = False - if Output.is_signature_valid(bytes(row['signature']), bytes(row['signature_digest']), public_key): - d['is_signature_valid'] = True - return d - - -def select_updated_channel_keys(starting_height, ending_height, *cols): - return ( - select(*cols).select_from(Claim) - .where( - (Claim.c.claim_type == TXO_TYPES['channel']) & - between(Claim.c.public_key_height, starting_height, ending_height) - ) - ) - - -def get_updated_channel_key_count(ctx, starting_height, ending_height): - sql = select_updated_channel_keys( - starting_height, ending_height, func.count('*').label('total') - ) - return ctx.fetchone(sql)['total'] - - -def get_updated_channel_keys(ctx, starting_height, ending_height): - sql = select_updated_channel_keys( - starting_height, ending_height, - Claim.c.claim_hash, Claim.c.public_key, Claim.c.height - ) - return ctx.fetchall(sql) - - -def get_signables_for_channel(ctx, table, pk, channel): - sql = ( - select(pk, table.c.signature, table.c.signature_digest) - .where(table.c.channel_hash == channel['claim_hash']) - ) - return ctx.fetchall(sql) - - -def select_unvalidated_signables(signable, starting_height: int, ending_height: int, *cols): - channel = Claim.alias('channel') - if len(cols) > 1: - cols += (channel.c.public_key,) - return ( - select(*cols) - .select_from(signable.join(channel, signable.c.channel_hash == channel.c.claim_hash)) - .where( - (signable.c.signature != None) & - (signable.c.is_signature_valid == False) & - between(signable.c.height, starting_height, ending_height) - ) - ) - - -def get_unvalidated_signable_count(ctx, signable, starting_height: int, ending_height: int): - sql = select_unvalidated_signables( - signable, starting_height, ending_height, func.count('*').label('total') - ) - return ctx.fetchone(sql)['total'] - - -def get_unvalidated_signables(ctx, signable, starting_height: int, ending_height: int, pk): - sql = select_unvalidated_signables( - signable, starting_height, ending_height, - pk, signable.c.signature, signable.c.signature_digest - ) - return ctx.fetchall(sql) - - class ClaimChanges(NamedTuple): deleted_channels: Set[bytes] - channels_with_changed_claims: Set[bytes] + channels_with_changed_content: Set[bytes] claims_with_changed_supports: Set[bytes] @@ -187,20 +117,7 @@ def process_claims_and_supports(): p.start(1) p.ctx.execute(Claim.delete().where(condition_spent_claims())) - with progress(Event.CLAIM_INSERT) as p: - channels_with_added_claims = set() - loader = p.ctx.get_bulk_loader() - for txo in rows_to_txos(p.ctx.fetchall(select_missing_claims)): - loader.add_claim(txo) - if txo.can_decode_claim and txo.claim.is_signed: - channels_with_added_claims.add(txo.claim.signing_channel_hash) - loader.save() - - with progress(Event.CLAIM_UPDATE) as p: - loader = p.ctx.get_bulk_loader() - for claim in rows_to_txos(p.ctx.fetchall(select_stale_claims)): - loader.update_claim(claim) - loader.save() + process_claim_changes() with progress(Event.SUPPORT_DELETE) as p: claims_with_deleted_supports = { @@ -208,25 +125,28 @@ def process_claims_and_supports(): select(distinct(Support.c.claim_hash)).where(condition_spent_supports) ) } + channels_with_deleted_supports = { + r['channel_hash'] for r in p.ctx.fetchall( + select(distinct(Support.c.channel_hash)) + .where(condition_spent_supports & (Support.c.channel_hash != None)) + ) + } p.start(1) sql = Support.delete().where(condition_spent_supports) p.ctx.execute(sql) with progress(Event.SUPPORT_INSERT) as p: - claims_with_added_supports = { - r['claim_hash'] for r in p.ctx.fetchall( - select(distinct(Support.c.claim_hash)).where(condition_spent_supports) - ) - } + claims_with_added_supports = set() loader = p.ctx.get_bulk_loader() - for support in rows_to_txos(p.ctx.fetchall(select_missing_supports)): - loader.add_support(support) + for txo in rows_to_txos(p.ctx.fetchall(select_missing_supports)): + loader.add_support(txo) + claims_with_added_supports.add(txo.claim_hash) loader.save() return ClaimChanges( deleted_channels=deleted_channels, - channels_with_changed_claims=( - channels_with_added_claims | channels_with_deleted_claims + channels_with_changed_content=( + channels_with_deleted_supports | channels_with_deleted_claims ), claims_with_changed_supports=( claims_with_added_supports | claims_with_deleted_supports @@ -234,35 +154,137 @@ def process_claims_and_supports(): ) +def condition_unvalidated_signables(signable): + return ( + (signable.c.is_signature_valid == None) & + (signable.c.channel_hash != None) + ) + + +def get_unvalidated_signable_count(ctx, signable): + sql = ( + select(func.count('*').label('total')) + .select_from(signable) + .where(condition_unvalidated_signables(signable)) + ) + return ctx.fetchone(sql)['total'] + + +def select_unvalidated_signables(signable, pk, include_urls=False, include_previous=False): + sql = ( + select( + pk, signable.c.signature, signable.c.signature_digest, signable.c.channel_hash, ( + select(TXO.c.public_key).select_from(TXO) + .where( + (TXO.c.claim_hash == signable.c.channel_hash) & + (TXO.c.txo_type == TXO_TYPES['channel']) & + (TXO.c.height <= signable.c.height) + ) + .order_by(desc(TXO.c.height)) + .scalar_subquery().label('public_key') + ), + ) + .where(condition_unvalidated_signables(signable)) + ) + if include_previous: + assert signable.name != 'support', "Supports cannot be updated and don't have a previous." + sql = sql.add_columns( + select(TXO.c.channel_hash).select_from(TXO) + .where( + (TXO.c.claim_hash == signable.c.claim_hash) & + (TXO.c.txo_type.in_(CLAIM_TYPE_CODES)) & + (TXO.c.height <= signable.c.height) + ) + .order_by(desc(TXO.c.height)).offset(1) + .scalar_subquery().label('previous_channel_hash') + ) + if include_urls: + channel = Claim.alias('channel') + return sql.add_columns( + signable.c.short_url.label('claim_url'), + channel.c.short_url.label('channel_url') + ).select_from(signable.join(channel, signable.c.channel_hash == channel.c.claim_hash)) + return sql.select_from(signable) + + +def signature_validation(d: dict, row: dict, public_key) -> dict: + d['is_signature_valid'] = False + if Output.is_signature_valid(bytes(row['signature']), bytes(row['signature_digest']), public_key): + d['is_signature_valid'] = True + return d + + +def process_signature_validation(changes: ClaimChanges): + + with progress(Event.CLAIM_SIGN) as p: + p.start(get_unvalidated_signable_count(p.ctx, Claim)) + claim_updates = [] + sql = select_unvalidated_signables( + Claim, Claim.c.claim_hash, include_urls=True, include_previous=changes is not None + ) + for claim in p.ctx.execute(sql): + claim_updates.append( + signature_validation({ + 'pk': claim['claim_hash'], + 'canonical_url': claim['channel_url'] + '/' + claim['claim_url'] + }, claim, claim['public_key']) + ) + if changes is not None: + changes.channels_with_changed_content.add(claim['channel_hash']) + if claim['previous_channel_hash']: + changes.channels_with_changed_content.add(claim['previous_channel_hash']) + if len(claim_updates) > 500: + p.ctx.execute(Claim.update().where(Claim.c.claim_hash == bindparam('pk')), claim_updates) + p.step(len(claim_updates)) + claim_updates.clear() + if claim_updates: + p.ctx.execute(Claim.update().where(Claim.c.claim_hash == bindparam('pk')), claim_updates) + del claim_updates + + with progress(Event.SUPPORT_SIGN) as p: + p.start(get_unvalidated_signable_count(p.ctx, Support)) + support_updates = [] + for support in p.ctx.execute(select_unvalidated_signables(Support, Support.c.txo_hash)): + support_updates.append( + signature_validation({'pk': support['txo_hash']}, support, support['public_key']) + ) + if changes is not None: + changes.channels_with_changed_content.add(support['channel_hash']) + if len(support_updates) > 500: + p.ctx.execute(Support.update().where(Support.c.txo_hash == bindparam('pk')), support_updates) + p.step(len(support_updates)) + support_updates.clear() + if support_updates: + p.ctx.execute(Support.update().where(Support.c.txo_hash == bindparam('pk')), support_updates) + del support_updates + + +def channel_content_count_calc(signable): + return ( + select(func.count('*')) + .select_from(signable) + .where((signable.c.channel_hash == Claim.c.claim_hash) & signable.c.is_signature_valid) + .scalar_subquery() + ) + + +def claim_support_aggregation(*cols): + return ( + select(*cols) + .select_from(Support) + .where(Support.c.claim_hash == Claim.c.claim_hash) + .scalar_subquery() + ) + + def process_metadata(starting_height: int, ending_height: int, initial_sync: bool): - # TODO: - # - claim updates to point to a different channel - # - deleting a channel should invalidate contained claim signatures chain = get_or_initialize_lbrycrd() channel = Claim.alias('channel') + stream = Claim.alias('stream') changes = process_claims_and_supports() if not initial_sync else None - support_amount_calculator = ( - select(func.coalesce(func.sum(Support.c.amount), 0) + Claim.c.amount) - .select_from(Support) - .where(Support.c.claim_hash == Claim.c.claim_hash) - .scalar_subquery() - ) - - supports_in_claim_calculator = ( - select(func.count('*')) - .select_from(Support) - .where(Support.c.claim_hash == Claim.c.claim_hash) - .scalar_subquery() - ) - - stream = Claim.alias('stream') - claims_in_channel_calculator = ( - select(func.count('*')) - .select_from(stream) - .where(stream.c.channel_hash == Claim.c.claim_hash) - .scalar_subquery() - ) + staked_support_amount_calc = claim_support_aggregation(func.coalesce(func.sum(Support.c.amount), 0)) + staked_support_count_calc = claim_support_aggregation(func.count('*')) with progress(Event.CLAIM_META) as p: p.start(chain.db.sync_get_claim_metadata_count(start_height=starting_height, end_height=ending_height)) @@ -275,10 +297,15 @@ def process_metadata(starting_height: int, ending_height: int, initial_sync: boo .where(channel.c.claim_hash == Claim.c.channel_hash) .scalar_subquery() + '/' + bindparam('short_url_') )], else_=Claim.c.canonical_url), - support_amount=support_amount_calculator, - supports_in_claim_count=supports_in_claim_calculator, - claims_in_channel_count=case([( - (Claim.c.claim_type == TXO_TYPES['channel']), claims_in_channel_calculator + staked_support_amount=staked_support_amount_calc, + staked_support_count=staked_support_count_calc, + signed_claim_count=case([( + (Claim.c.claim_type == TXO_TYPES['channel']), + channel_content_count_calc(stream) + )], else_=0), + signed_support_count=case([( + (Claim.c.claim_type == TXO_TYPES['channel']), + channel_content_count_calc(Support) )], else_=0), ) ) @@ -292,6 +319,8 @@ def process_metadata(starting_height: int, ending_height: int, initial_sync: boo done += len(claims) p.step(done) + process_signature_validation(changes) + if not initial_sync and changes.claims_with_changed_supports: # covered by Event.CLAIM_META during initial_sync, then only run if supports change with progress(Event.CLAIM_CALC) as p: @@ -300,20 +329,23 @@ def process_metadata(starting_height: int, ending_height: int, initial_sync: boo Claim.update() .where((Claim.c.claim_hash.in_(changes.claims_with_changed_supports))) .values( - support_amount=support_amount_calculator, - supports_in_claim_count=supports_in_claim_calculator, + staked_support_amount=staked_support_amount_calc, + staked_support_count=staked_support_count_calc, ) ) p.ctx.execute(sql) - if not initial_sync and changes.channels_with_changed_claims: + if not initial_sync and changes.channels_with_changed_content: # covered by Event.CLAIM_META during initial_sync, then only run if claims are deleted with progress(Event.CLAIM_CALC) as p: - p.start(len(changes.channels_with_changed_claims)) + p.start(len(changes.channels_with_changed_content)) sql = ( Claim.update() - .where((Claim.c.claim_hash.in_(changes.channels_with_changed_claims))) - .values(claims_in_channel_count=claims_in_channel_calculator) + .where((Claim.c.claim_hash.in_(changes.channels_with_changed_content))) + .values( + signed_claim_count=channel_content_count_calc(stream), + signed_support_count=channel_content_count_calc(Support), + ) ) p.ctx.execute(sql) @@ -356,69 +388,6 @@ def process_metadata(starting_height: int, ending_height: int, initial_sync: boo # done += len(supports) # p.step(done) - with progress(Event.CHANNEL_SIGN) as p: - p.start(get_updated_channel_key_count(p.ctx, starting_height, ending_height)) - done, step_size = 0, 500 - for offset in range(starting_height, ending_height+1, step_size): - channels = get_updated_channel_keys(p.ctx, offset, min(offset+step_size, ending_height)) - for channel in channels: - - claim_updates = [] - for claim in get_signables_for_channel(p.ctx, Claim, Claim.c.claim_hash, channel): - claim_updates.append( - signature_validation({'pk': claim['claim_hash']}, claim, channel['public_key']) - ) - if claim_updates: - p.ctx.execute( - Claim.update().where(Claim.c.claim_hash == bindparam('pk')), claim_updates - ) - - support_updates = [] - for support in get_signables_for_channel(p.ctx, Support, Support.c.txo_hash, channel): - support_updates.append( - signature_validation({'pk': support['txo_hash']}, support, channel['public_key']) - ) - if support_updates: - p.ctx.execute( - Support.update().where(Support.c.txo_hash == bindparam('pk')), support_updates - ) - - p.step(len(channels)) - - with progress(Event.CLAIM_SIGN) as p: - p.start(get_unvalidated_signable_count(p.ctx, Claim, starting_height, ending_height)) - done, step_size = 0, 500 - for offset in range(starting_height, ending_height+1, step_size): - claims = get_unvalidated_signables( - p.ctx, Claim, offset, min(offset+step_size, ending_height), Claim.c.claim_hash) - claim_updates = [] - for claim in claims: - claim_updates.append( - signature_validation({'pk': claim['claim_hash']}, claim, claim['public_key']) - ) - if claim_updates: - p.ctx.execute( - Claim.update().where(Claim.c.claim_hash == bindparam('pk')), claim_updates - ) - p.step(done) - - with progress(Event.SUPPORT_SIGN) as p: - p.start(get_unvalidated_signable_count(p.ctx, Support, starting_height, ending_height)) - done, step_size = 0, 500 - for offset in range(starting_height, ending_height+1, step_size): - supports = get_unvalidated_signables( - p.ctx, Support, offset, min(offset+step_size, ending_height), Support.c.txo_hash) - support_updates = [] - for support in supports: - support_updates.append( - signature_validation({'pk': support['txo_hash']}, support, support['public_key']) - ) - if support_updates: - p.ctx.execute( - Support.update().where(Support.c.txo_hash == bindparam('pk')), support_updates - ) - p.step(done) - def process_block_and_tx_filters(): @@ -498,7 +467,7 @@ class BlockchainSync(Sync): if -1 < our_best_file_height < chain_file['best_height']: # we have some blocks, need to figure out what we're missing # call get_block_files again limited to this file and current_height - file = (await self.chain.db.get_block_files( + chain_file = (await self.chain.db.get_block_files( file_number=chain_file['file_number'], start_height=our_best_file_height+1 ))[0] tx_count += chain_file['txs']