From 7d9bf0357494941bfcd68c42d131edf06bbe000a Mon Sep 17 00:00:00 2001 From: Lex Berezhny Date: Sun, 5 Jul 2020 23:03:45 -0400 Subject: [PATCH] refactored --- lbry/blockchain/database.py | 58 +- lbry/blockchain/sync/queries.py | 82 --- lbry/blockchain/sync/steps.py | 674 ++++++++++-------- lbry/blockchain/sync/synchronizer.py | 55 +- lbry/blockchain/transaction.py | 15 +- lbry/console.py | 26 +- lbry/db/database.py | 9 +- lbry/db/queries.py | 15 +- lbry/db/query_context.py | 354 +++++---- lbry/db/sync.py | 35 +- lbry/db/tables.py | 42 +- scripts/simulate_sync_console.py | 60 +- .../integration/blockchain/test_blockchain.py | 118 +-- 13 files changed, 750 insertions(+), 793 deletions(-) delete mode 100644 lbry/blockchain/sync/queries.py diff --git a/lbry/blockchain/database.py b/lbry/blockchain/database.py index f18c6ef13..900889323 100644 --- a/lbry/blockchain/database.py +++ b/lbry/blockchain/database.py @@ -86,7 +86,7 @@ class BlockchainDB: return await self.run_in_executor(self.sync_execute, sql, *args) def sync_execute_fetchall(self, sql: str, *args) -> List[dict]: - return [dict(r) for r in self.connection.execute(sql, *args).fetchall()] + return self.connection.execute(sql, *args).fetchall() async def execute_fetchall(self, sql: str, *args) -> List[dict]: return await self.run_in_executor(self.sync_execute_fetchall, sql, *args) @@ -112,7 +112,7 @@ class BlockchainDB: if file_number is not None and start_height is not None: sql += "AND file = ? AND height >= ?" args = (file_number, start_height) - return self.sync_execute_fetchall(sql + " GROUP BY file ORDER BY file ASC;", args) + return [dict(r) for r in self.sync_execute_fetchall(sql + " GROUP BY file ORDER BY file ASC;", args)] async def get_block_files(self, file_number: int = None, start_height: int = None) -> List[dict]: return await self.run_in_executor( @@ -120,14 +120,14 @@ class BlockchainDB: ) def sync_get_blocks_in_file(self, block_file: int, start_height=0) -> List[dict]: - return self.sync_execute_fetchall( + return [dict(r) for r in self.sync_execute_fetchall( """ SELECT datapos as data_offset, height, hash as block_hash, txCount as txs FROM block_info WHERE file = ? AND height >= ? AND status&1 AND status&4 ORDER BY datapos ASC; """, (block_file, start_height) - ) + )] async def get_blocks_in_file(self, block_file: int, start_height=0) -> List[dict]: return await self.run_in_executor(self.sync_get_blocks_in_file, block_file, start_height) @@ -145,24 +145,27 @@ class BlockchainDB: } def sync_get_takeover_count(self, start_height: int, end_height: int) -> int: - sql = "SELECT COUNT(*) FROM takeover WHERE height BETWEEN ? AND ?" - return self.connection.execute(sql, (start_height, end_height)).fetchone()[0] + sql = """ + SELECT COUNT(*) FROM claim WHERE name IN ( + SELECT name FROM takeover WHERE claimID IS NOT NULL AND height BETWEEN ? AND ? + ) + """, (start_height, end_height) + return self.connection.execute(*sql).fetchone()[0] async def get_takeover_count(self, start_height: int, end_height: int) -> int: return await self.run_in_executor(self.sync_get_takeover_count, start_height, end_height) def sync_get_takeovers(self, start_height: int, end_height: int) -> List[dict]: - return self.sync_execute_fetchall( - """ - SELECT - takeover.name, - takeover.claimID AS claim_hash, - takeover.height - FROM takeover - WHERE height BETWEEN ? AND ? - ORDER BY height, name - """, (start_height, end_height) - ) + sql = """ + SELECT name, claimID, MAX(height) AS height FROM takeover + WHERE claimID IS NOT NULL AND height BETWEEN ? 
AND ? + GROUP BY name + """, (start_height, end_height) + return [{ + 'normalized': normalize_name(r['name'].decode()), + 'claim_hash': r['claimID'], + 'height': r['height'] + } for r in self.sync_execute_fetchall(*sql)] async def get_takeovers(self, start_height: int, end_height: int) -> List[dict]: return await self.run_in_executor(self.sync_get_takeovers, start_height, end_height) @@ -174,19 +177,15 @@ class BlockchainDB: async def get_claim_metadata_count(self, start_height: int, end_height: int) -> int: return await self.run_in_executor(self.sync_get_claim_metadata_count, start_height, end_height) - def sync_get_claim_metadata(self, start_height: int, end_height: int) -> List[dict]: - sql = """ + def sync_get_claim_metadata(self, claim_hashes) -> List[dict]: + sql = f""" SELECT - name, claimID, activationHeight, expirationHeight, + name, claimID, activationHeight, expirationHeight, originalHeight, (SELECT CASE WHEN takeover.claimID = claim.claimID THEN takeover.height END FROM takeover WHERE takeover.name = claim.name ORDER BY height DESC LIMIT 1 ) AS takeoverHeight, - (SELECT CASE WHEN takeover.claimID = claim.claimID THEN 1 ELSE 0 END - FROM takeover WHERE takeover.name = claim.name - ORDER BY height DESC LIMIT 1 - ) AS isControlling, (SELECT find_shortest_id(c.claimid, claim.claimid) FROM claim AS c WHERE c.nodename = claim.nodename AND @@ -194,18 +193,17 @@ class BlockchainDB: c.claimid != claim.claimid ) AS shortestID FROM claim - WHERE originalHeight BETWEEN ? AND ? - ORDER BY originalHeight - """, (start_height, end_height) + WHERE claimID IN ({','.join(['?' for _ in claim_hashes])}) + ORDER BY claimID + """, claim_hashes return [{ "name": r["name"], - "claim_hash_": r["claimID"], + "claim_hash": r["claimID"], "activation_height": r["activationHeight"], "expiration_height": r["expirationHeight"], "takeover_height": r["takeoverHeight"], - "is_controlling": r["isControlling"], + "creation_height": r["originalHeight"], "short_url": f'{normalize_name(r["name"].decode())}#{r["shortestID"] or r["claimID"][::-1].hex()[0]}', - "short_url_": f'{normalize_name(r["name"].decode())}#{r["shortestID"] or r["claimID"][::-1].hex()[0]}', } for r in self.sync_execute_fetchall(*sql)] async def get_claim_metadata(self, start_height: int, end_height: int) -> List[dict]: diff --git a/lbry/blockchain/sync/queries.py b/lbry/blockchain/sync/queries.py deleted file mode 100644 index 89472e5a8..000000000 --- a/lbry/blockchain/sync/queries.py +++ /dev/null @@ -1,82 +0,0 @@ -# pylint: disable=singleton-comparison -from sqlalchemy import func, desc -from sqlalchemy.future import select - -from lbry.db import TXO_TYPES, CLAIM_TYPE_CODES -from lbry.db.tables import Claim, Support, TXO - - -def condition_unvalidated_signables(signable): - return ( - (signable.c.is_signature_valid == None) & - (signable.c.channel_hash != None) - ) - - -def get_unvalidated_signable_count(ctx, signable): - sql = ( - select(func.count('*').label('total')) - .select_from(signable) - .where(condition_unvalidated_signables(signable)) - ) - return ctx.fetchone(sql)['total'] - - -def select_unvalidated_signables(signable, pk, include_urls=False, include_previous=False): - sql = ( - select( - pk, signable.c.signature, signable.c.signature_digest, signable.c.channel_hash, ( - select(TXO.c.public_key).select_from(TXO) - .where( - (TXO.c.claim_hash == signable.c.channel_hash) & - (TXO.c.txo_type == TXO_TYPES['channel']) & - (TXO.c.height <= signable.c.height) - ) - .order_by(desc(TXO.c.height)) - .limit(1) - 
.scalar_subquery().label('public_key') - ), - ) - .where(condition_unvalidated_signables(signable)) - ) - if include_previous: - assert signable.name != 'support', "Supports cannot be updated and don't have a previous." - sql = sql.add_columns( - select(TXO.c.channel_hash).select_from(TXO) - .where( - (TXO.c.claim_hash == signable.c.claim_hash) & - (TXO.c.txo_type.in_(CLAIM_TYPE_CODES)) & - (TXO.c.height <= signable.c.height) - ) - .order_by(desc(TXO.c.height)).offset(1).limit(1) - .scalar_subquery().label('previous_channel_hash') - ) - if include_urls: - channel = Claim.alias('channel') - return sql.add_columns( - signable.c.short_url.label('claim_url'), - channel.c.short_url.label('channel_url') - ).select_from(signable.join(channel, signable.c.channel_hash == channel.c.claim_hash)) - return sql.select_from(signable) - - -def channel_content_count_calc(signable): - return ( - select(func.count('*')) - .select_from(signable) - .where((signable.c.channel_hash == Claim.c.claim_hash) & signable.c.is_signature_valid) - .scalar_subquery() - ) - - -def claim_support_aggregation(*cols): - return ( - select(*cols) - .select_from(Support) - .where(Support.c.claim_hash == Claim.c.claim_hash) - .scalar_subquery() - ) - - -staked_support_amount_calc = claim_support_aggregation(func.coalesce(func.sum(Support.c.amount), 0)) -staked_support_count_calc = claim_support_aggregation(func.count('*')) diff --git a/lbry/blockchain/sync/steps.py b/lbry/blockchain/sync/steps.py index 9a9b4f2b2..b093d05f7 100644 --- a/lbry/blockchain/sync/steps.py +++ b/lbry/blockchain/sync/steps.py @@ -1,80 +1,39 @@ # pylint: disable=singleton-comparison import logging -import functools from contextvars import ContextVar -from typing import Set +from functools import partial +from typing import Optional, Tuple -from sqlalchemy import bindparam, case, distinct, text +from sqlalchemy import bindparam, case, distinct, text, func, between, desc +from sqlalchemy.future import select from sqlalchemy.schema import CreateTable from lbry.db import queries -from lbry.db.tables import Block as BlockTable, TXO, TXI -from lbry.db.query_context import progress, context, Event +from lbry.db.tables import ( + Block as BlockTable, TX, TXO, TXI, Claim, Support, + pg_add_txo_constraints_and_indexes, pg_add_txi_constraints_and_indexes +) +from lbry.db.query_context import ProgressContext, context, event_emitter from lbry.db.queries import rows_to_txos from lbry.db.sync import ( select_missing_supports, - select_missing_claims, select_stale_claims, - condition_spent_claims, condition_spent_supports, - set_input_addresses, update_spent_outputs + condition_spent_claims, + condition_spent_supports, condition_missing_supports, + set_input_addresses, update_spent_outputs, ) from lbry.db.utils import least -from lbry.schema.url import normalize_name +from lbry.db.constants import TXO_TYPES, CLAIM_TYPE_CODES from lbry.blockchain.lbrycrd import Lbrycrd from lbry.blockchain.block import Block, create_block_filter from lbry.blockchain.bcd_data_stream import BCDataStream -from lbry.blockchain.transaction import Output - -from .queries import ( - select, Claim, Support, - TXO_TYPES, CLAIM_TYPE_CODES, - channel_content_count_calc, - staked_support_amount_calc, staked_support_count_calc, - select_unvalidated_signables, get_unvalidated_signable_count -) +from lbry.blockchain.transaction import Output, OutputScript, TXRefImmutable log = logging.getLogger(__name__) _chain: ContextVar[Lbrycrd] = ContextVar('chain') -SYNC_STEPS = { - "initial_sync": [], - 
"ongoing_sync": [], - "events": [], -} - - -def sync_step(event: Event, step_size=1, initial_sync=False, ongoing_sync=False): - assert event.label not in SYNC_STEPS['events'], f"Event {event.label} used more than once." - assert initial_sync or ongoing_sync, "At least one of initial_sync or ongoing_sync must be true." - SYNC_STEPS['events'].append(event.label) - if initial_sync: - SYNC_STEPS['initial_sync'].append(event.label) - if ongoing_sync: - SYNC_STEPS['ongoing_sync'].append(event.label) - - def wrapper(f): - @functools.wraps(f) - def with_progress(*args, **kwargs): - with progress(event, step_size=step_size) as p: - return f(*args, **kwargs, p=p) - return with_progress - - return wrapper - - -class ClaimChanges: - deleted_channels: Set[bytes] - channels_with_changed_content: Set[bytes] - claims_with_changed_supports: Set[bytes] - - def __init__(self): - self.deleted_channels = set() - self.channels_with_changed_content = set() - self.claims_with_changed_supports = set() - - def get_or_initialize_lbrycrd(ctx=None) -> Lbrycrd: chain = _chain.get(None) if chain is not None: @@ -85,37 +44,16 @@ def get_or_initialize_lbrycrd(ctx=None) -> Lbrycrd: return chain -def process_block_file(block_file_number: int, starting_height: int, initial_sync: bool): +def process_block_file(block_file_number: int, starting_height: int): ctx = context() loader = ctx.get_bulk_loader() - last_block_processed = process_block_read(block_file_number, starting_height, initial_sync, loader) + last_block_processed = process_block_read(block_file_number, starting_height, loader) process_block_save(block_file_number, loader) return last_block_processed -def process_metadata(starting_height: int, ending_height: int, initial_sync: bool): - chain = get_or_initialize_lbrycrd() - process_inputs_outputs(initial_sync) - changes = None - if not initial_sync: - changes = ClaimChanges() - process_claim_delete(changes) - process_claim_insert(changes) - process_claim_update(changes) - process_support_delete(changes) - process_support_insert(changes) - process_takeovers(starting_height, ending_height, chain) - process_claim_metadata(starting_height, ending_height, chain) - process_claim_signatures(changes) - process_support_signatures(changes) - if not initial_sync: - # these depend on signature validation - process_stake_calc(changes) - process_channel_content(changes) - - -@sync_step(Event.BLOCK_READ, step_size=100, initial_sync=True, ongoing_sync=True) -def process_block_read(block_file_number: int, starting_height: int, initial_sync: bool, loader, p=None): +@event_emitter("blockchain.sync.block.read", "blocks", step_size=100) +def process_block_read(block_file_number: int, starting_height: int, loader, p: ProgressContext): chain = get_or_initialize_lbrycrd(p.ctx) stop = p.ctx.stop_event new_blocks = chain.db.sync_get_blocks_in_file(block_file_number, starting_height) @@ -132,22 +70,46 @@ def process_block_read(block_file_number: int, starting_height: int, initial_syn block_height = block_info['height'] fp.seek(block_info['data_offset']) block = Block.from_data_stream(stream, block_height, block_file_number) - loader.add_block( - block, initial_sync and chain.db.sync_get_claim_support_txo_hashes(block_height) - ) + loader.add_block(block) last_block_processed = block_height p.step(done) return last_block_processed -@sync_step(Event.BLOCK_SAVE, initial_sync=True, ongoing_sync=True) -def process_block_save(block_file_number: int, loader, p=None): +@event_emitter("blockchain.sync.block.save", "txs") +def 
process_block_save(block_file_number: int, loader, p: ProgressContext): p.extra = {'block_file': block_file_number} - loader.save() + loader.save(TX) -@sync_step(Event.INPUT_UPDATE, initial_sync=True, ongoing_sync=True) -def process_inputs_outputs(initial_sync=False, p=None): +@event_emitter("blockchain.sync.block.filters", "blocks") +def process_block_filters(p: ProgressContext): + blocks = [] + all_filters = [] + all_addresses = [] + for block in queries.get_blocks_without_filters(): + addresses = { + p.ctx.ledger.address_to_hash160(r['address']) + for r in queries.get_block_tx_addresses(block_hash=block['block_hash']) + } + all_addresses.extend(addresses) + block_filter = create_block_filter(addresses) + all_filters.append(block_filter) + blocks.append({'pk': block['block_hash'], 'block_filter': block_filter}) + # filters = [get_block_filter(f) for f in all_filters] + p.ctx.execute(BlockTable.update().where(BlockTable.c.block_hash == bindparam('pk')), blocks) + +# txs = [] +# for tx in queries.get_transactions_without_filters(): +# tx_filter = create_block_filter( +# {r['address'] for r in queries.get_block_tx_addresses(tx_hash=tx['tx_hash'])} +# ) +# txs.append({'pk': tx['tx_hash'], 'tx_filter': tx_filter}) +# execute(TX.update().where(TX.c.tx_hash == bindparam('pk')), txs) + + +@event_emitter("blockchain.sync.spends", "steps") +def process_spends(initial_sync: bool, p: ProgressContext): step = 0 @@ -184,18 +146,22 @@ def process_inputs_outputs(initial_sync=False, p=None): str(insert_txis.compile(p.ctx.engine)).replace('txi AS old_txi', 'old_txi') )) p.step(next_step()) - # D. drop old txi + # D. drop old txi and vacuum p.ctx.execute(text("DROP TABLE old_txi;")) + if p.ctx.is_postgres: + with p.ctx.engine.connect() as c: + c.execute(text("COMMIT;")) + c.execute(text("VACUUM ANALYZE txi;")) p.step(next_step()) # E. restore integrity constraint if p.ctx.is_postgres: - p.ctx.execute(text("ALTER TABLE txi ADD PRIMARY KEY (txo_hash);")) + pg_add_txi_constraints_and_indexes(p.ctx.execute) p.step(next_step()) else: set_input_addresses(p.ctx) p.step(next_step()) - # 2. Update spent TXOs setting is_spent = True + # 2. Update spent TXOs setting spent_height if initial_sync: # F. txo table reshuffling p.ctx.execute(text("ALTER TABLE txo RENAME TO old_txo;")) @@ -205,217 +171,348 @@ def process_inputs_outputs(initial_sync=False, p=None): p.step(next_step()) # G. insert old_txo = TXO.alias('old_txo') - columns = ( - [c for c in old_txo.columns if c.name != 'is_spent'] + - [(TXI.c.txo_hash != None).label('is_spent')] - ) - select_txos = select(*columns).select_from(old_txo.join(TXI, isouter=True)) - insert_txos = TXO.insert().from_select(columns, select_txos) + columns = [c for c in old_txo.columns if c.name != 'spent_height'] + select_columns = columns + [func.coalesce(TXI.c.height, 0).label('spent_height')] + insert_columns = columns + [TXO.c.spent_height] + select_txos = select(*select_columns).select_from(old_txo.join(TXI, isouter=True)) + insert_txos = TXO.insert().from_select(insert_columns, select_txos) p.ctx.execute(text( - str(insert_txos.compile(p.ctx.engine)).replace('txo AS old_txo', 'old_txo') + str(insert_txos.compile(p.ctx.engine)) + .replace('txo AS old_txo', 'old_txo') + .replace('%(coalesce_1)s', '0') )) p.step(next_step()) # H. drop old txo p.ctx.execute(text("DROP TABLE old_txo;")) + if p.ctx.is_postgres: + with p.ctx.engine.connect() as c: + c.execute(text("COMMIT;")) + c.execute(text("VACUUM ANALYZE txo;")) p.step(next_step()) # I. 
restore integrity constraint if p.ctx.is_postgres: - p.ctx.execute(text("ALTER TABLE txo ADD PRIMARY KEY (txo_hash);")) + pg_add_txo_constraints_and_indexes(p.ctx.execute) p.step(next_step()) else: update_spent_outputs(p.ctx) p.step(next_step()) -@sync_step(Event.BLOCK_FILTER, initial_sync=True, ongoing_sync=True) -def process_block_filters(p=None): - blocks = [] - all_filters = [] - all_addresses = [] - for block in queries.get_blocks_without_filters(): - addresses = { - p.ctx.ledger.address_to_hash160(r['address']) - for r in queries.get_block_tx_addresses(block_hash=block['block_hash']) - } - all_addresses.extend(addresses) - block_filter = create_block_filter(addresses) - all_filters.append(block_filter) - blocks.append({'pk': block['block_hash'], 'block_filter': block_filter}) - # filters = [get_block_filter(f) for f in all_filters] - p.ctx.execute(BlockTable.update().where(BlockTable.c.block_hash == bindparam('pk')), blocks) - -# txs = [] -# for tx in queries.get_transactions_without_filters(): -# tx_filter = create_block_filter( -# {r['address'] for r in queries.get_block_tx_addresses(tx_hash=tx['tx_hash'])} -# ) -# txs.append({'pk': tx['tx_hash'], 'tx_filter': tx_filter}) -# execute(TX.update().where(TX.c.tx_hash == bindparam('pk')), txs) - - -@sync_step(Event.CLAIM_DELETE, ongoing_sync=True) -def process_claim_delete(changes: ClaimChanges, p=None): - changes.channels_with_changed_content |= { - r['channel_hash'] for r in p.ctx.fetchall( - select(distinct(Claim.c.channel_hash)) - .where(condition_spent_claims( - list(set(CLAIM_TYPE_CODES) - {TXO_TYPES['channel']}) - ) & (Claim.c.channel_hash != None)) - ) - } - changes.deleted_channels |= { - r['claim_hash'] for r in p.ctx.fetchall( - select(distinct(Claim.c.claim_hash)).where( - (Claim.c.claim_type == TXO_TYPES['channel']) & - condition_spent_claims([TXO_TYPES['channel']]) +def insert_claims_with_lbrycrd(done, chain, p: ProgressContext, cursor): + loader = p.ctx.get_bulk_loader() + for rows in cursor.partitions(900): + claim_metadata = iter(chain.db.sync_get_claim_metadata(claim_hashes=[row['claim_hash'] for row in rows])) + for row in rows: + metadata = next(claim_metadata, None) + if metadata is None or metadata['claim_hash'] != row.claim_hash: + log.error( + r"During sync'ing a claim in our db couldn't find a " + r"match in lbrycrd's db. This could be because lbrycrd " + r"moved a block forward and updated its own claim table " + r"while we were still on a previous block, or it could be " + r"a more fundamental issue... 
¯\_(ツ)_/¯" + ) + if metadata is None: + break + if metadata['claim_hash'] != row.claim_hash: + continue + txo = Output( + amount=row.amount, + script=OutputScript(row.src), + tx_ref=TXRefImmutable.from_hash(row.tx_hash, row.height), + position=row.txo_position, ) + extra = { + 'timestamp': row.timestamp, + 'staked_support_amount': int(row.staked_support_amount), + 'staked_support_count': int(row.staked_support_count), + 'short_url': metadata['short_url'], + 'creation_height': metadata['creation_height'], + 'activation_height': metadata['activation_height'], + 'expiration_height': metadata['expiration_height'], + 'takeover_height': metadata['takeover_height'], + } + if hasattr(row, 'signature'): + extra.update({ + 'signature': row.signature, + 'signature_digest': row.signature_digest, + 'channel_public_key': row.channel_public_key, + 'channel_url': row.channel_url + }) + loader.add_claim(txo, **extra) + if len(loader.claims) >= 10_000: + done += loader.flush(Claim) + p.step(done) + done += loader.flush(Claim) + p.step(done) + return done + + +def channel_content_count_calc(signable): + return ( + select(func.count('*')) + .select_from(signable) + .where((signable.c.channel_hash == Claim.c.claim_hash) & signable.c.is_signature_valid) + .scalar_subquery() + ) + + +@event_emitter("blockchain.sync.claims", "claims") +def process_claims(starting_height: int, blocks_added: Optional[Tuple[int, int]], p: ProgressContext): + chain = get_or_initialize_lbrycrd(p.ctx) + initial_sync = not p.ctx.has_records(Claim) + to_be_modified = p.ctx.fetchtotal( + (TXO.c.txo_type.in_(CLAIM_TYPE_CODES)) & + (TXO.c.spent_height == 0) & + (TXO.c.txo_hash.notin_(select(Claim.c.txo_hash))) + ) + to_be_deleted = to_be_synced = to_be_overtaken = to_be_counted_channel_members = 0 + condition_changed_stakes = condition_changed_channel_content = None + if initial_sync: + to_be_counted_channel_members = p.ctx.fetchtotal( + (TXO.c.txo_type == TXO_TYPES['channel']) & + (TXO.c.spent_height == 0) ) - } - p.start(1) - p.ctx.execute(Claim.delete().where(condition_spent_claims())) - - -@sync_step(Event.CLAIM_INSERT, ongoing_sync=True) -def process_claim_insert(_, p=None): - loader = p.ctx.get_bulk_loader() - for claim in rows_to_txos(p.ctx.fetchall(select_missing_claims)): - loader.add_claim(claim) - loader.save() - - -@sync_step(Event.CLAIM_UPDATE, ongoing_sync=True) -def process_claim_update(_, p=None): - loader = p.ctx.get_bulk_loader() - for claim in rows_to_txos(p.ctx.fetchall(select_stale_claims)): - loader.update_claim(claim) - loader.save() - - -@sync_step(Event.SUPPORT_DELETE, ongoing_sync=True) -def process_support_delete(changes: ClaimChanges, p=None): - changes.claims_with_changed_supports |= { - r['claim_hash'] for r in p.ctx.fetchall( - select(distinct(Support.c.claim_hash)).where(condition_spent_supports) - ) - } - changes.channels_with_changed_content |= { - r['channel_hash'] for r in p.ctx.fetchall( - select(distinct(Support.c.channel_hash)) - .where(condition_spent_supports & (Support.c.channel_hash != None)) - ) - } - p.start(1) - sql = Support.delete().where(condition_spent_supports) - p.ctx.execute(sql) - - -@sync_step(Event.SUPPORT_INSERT, ongoing_sync=True) -def process_support_insert(changes: ClaimChanges, p=None): - loader = p.ctx.get_bulk_loader() - for txo in rows_to_txos(p.ctx.fetchall(select_missing_supports)): - loader.add_support(txo) - changes.claims_with_changed_supports.add(txo.claim_hash) - loader.save() - - -@sync_step(Event.CLAIM_TRIE, step_size=100, ongoing_sync=True) -def 
process_takeovers(starting_height: int, ending_height: int, chain, p=None): - p.start(chain.db.sync_get_takeover_count(start_height=starting_height, end_height=ending_height)) - for offset in range(starting_height, ending_height + 1): - for takeover in chain.db.sync_get_takeovers(start_height=offset, end_height=offset): - update_claims = ( - Claim.update() - .where(Claim.c.normalized == normalize_name(takeover['name'].decode())) - .values( - is_controlling=case( - [(Claim.c.claim_hash == takeover['claim_hash'], True)], - else_=False - ), - takeover_height=case( - [(Claim.c.claim_hash == takeover['claim_hash'], takeover['height'])], - else_=None - ), - activation_height=least(Claim.c.activation_height, takeover['height']), + else: + to_be_deleted = p.ctx.fetchtotal(condition_spent_claims()) + if blocks_added: + condition_changed_stakes = ( + (TXO.c.txo_type == TXO_TYPES['support']) & ( + between(TXO.c.height, blocks_added[0], blocks_added[-1]) | + between(TXO.c.spent_height, blocks_added[0], blocks_added[-1]) ) ) - p.ctx.execute(update_claims) - p.step(1) + sql = ( + select(func.count(distinct(TXO.c.claim_hash)).label('total')) + .where(condition_changed_stakes) + ) + to_be_synced = p.ctx.fetchone(sql)['total'] + condition_changed_channel_content = ( + (TXO.c.channel_hash != None) & ( + between(TXO.c.height, blocks_added[0], blocks_added[-1]) | + between(TXO.c.spent_height, blocks_added[0], blocks_added[-1]) + ) + ) + sql = ( + select(func.count(distinct(TXO.c.channel_hash)).label('total')) + .where(condition_changed_channel_content) + ) + to_be_synced += p.ctx.fetchone(sql)['total'] -@sync_step(Event.CLAIM_META, initial_sync=True, ongoing_sync=True) -def process_claim_metadata(starting_height: int, ending_height: int, chain, p=None): - channel = Claim.alias('channel') - stream = Claim.alias('stream') - p.start(chain.db.sync_get_claim_metadata_count(start_height=starting_height, end_height=ending_height)) - claim_update_sql = ( - Claim.update().where(Claim.c.claim_hash == bindparam('claim_hash_')) - .values( - canonical_url=case([( - ((Claim.c.canonical_url == None) & (Claim.c.channel_hash != None)), - select(channel.c.short_url).select_from(channel) - .where(channel.c.claim_hash == Claim.c.channel_hash) - .scalar_subquery() + '/' + bindparam('short_url_') - )], else_=Claim.c.canonical_url), - staked_support_amount=staked_support_amount_calc, - staked_support_count=staked_support_count_calc, - signed_claim_count=case([( - (Claim.c.claim_type == TXO_TYPES['channel']), - channel_content_count_calc(stream) - )], else_=0), - signed_support_count=case([( - (Claim.c.claim_type == TXO_TYPES['channel']), - channel_content_count_calc(Support) - )], else_=0), + to_be_overtaken = chain.db.sync_get_takeover_count( + start_height=blocks_added[0], end_height=blocks_added[-1]) + + p.start(to_be_deleted + to_be_modified + to_be_synced + to_be_overtaken + to_be_counted_channel_members) + + done = 0 + + if to_be_deleted: + deleted = p.ctx.execute(Claim.delete().where(condition_spent_claims())) + assert to_be_deleted == deleted.rowcount, \ + f"Expected claims to be deleted {to_be_deleted}, actual deleted {deleted.rowcount}." 
+ done += deleted.rowcount + p.step(done) + + support = TXO.alias('support') + staked_support_amount_calc = ( + select(func.coalesce(func.sum(support.c.amount), 0)).where( + (support.c.txo_type == TXO_TYPES['support']) & + (support.c.spent_height == 0) ) ) - done, step_size = 0, 500 - for offset in range(starting_height, ending_height + 1, step_size): - claims = chain.db.sync_get_claim_metadata( - start_height=offset, end_height=min(offset + step_size, ending_height) + staked_support_count_calc = ( + select(func.coalesce(func.count('*'), 0)).where( + (support.c.txo_type == TXO_TYPES['support']) & + (support.c.spent_height == 0) ) - if claims: - p.ctx.execute(claim_update_sql, claims) - done += len(claims) + ) + select_claims = ( + select( + TXO.c.claim_hash, TXO.c.amount, TXO.c.position.label('txo_position'), + TX.c.tx_hash, TX.c.height, TX.c.timestamp, + func.substr(TX.c.raw, TXO.c.script_offset+1, TXO.c.script_length).label('src'), + (staked_support_amount_calc + .where(support.c.claim_hash == TXO.c.claim_hash) + .label('staked_support_amount')), + (staked_support_count_calc + .where(support.c.claim_hash == TXO.c.claim_hash) + .label('staked_support_count')) + ).order_by(TXO.c.claim_hash) + ) + + streaming_connection = p.ctx.connection.execution_options(stream_results=True) + + # all channels need to be inserted first because channel short_url will needed to + # set the contained claims canonical_urls when those are inserted next + done = insert_claims_with_lbrycrd( + done, chain, p, streaming_connection.execute( + select_claims.select_from(TXO.join(TX)).where( + (TXO.c.txo_type == TXO_TYPES['channel']) & + (TXO.c.spent_height == 0) & + (TXO.c.claim_hash.notin_(select(Claim.c.claim_hash))) + ) + ) + ) + + channel_txo = TXO.alias('channel_txo') + channel_claim = Claim.alias('channel_claim') + select_claims = ( + select_claims.add_columns( + TXO.c.signature, TXO.c.signature_digest, + case([( + TXO.c.channel_hash != None, + select(channel_txo.c.public_key).select_from(channel_txo).where( + (channel_txo.c.txo_type == TXO_TYPES['channel']) & + (channel_txo.c.claim_hash == TXO.c.channel_hash) & + (channel_txo.c.height <= TXO.c.height) + ).order_by(desc(channel_txo.c.height)).limit(1).scalar_subquery() + )]).label('channel_public_key'), + channel_claim.c.short_url.label('channel_url') + ).select_from( + TXO + .join(TX) + .join(channel_claim, channel_claim.c.claim_hash == TXO.c.channel_hash, isouter=True) + ) + ) + + done = insert_claims_with_lbrycrd( + done, chain, p, streaming_connection.execute( + select_claims.where( + (TXO.c.txo_type.in_(list(set(CLAIM_TYPE_CODES) - {TXO_TYPES['channel']}))) & + (TXO.c.spent_height == 0) & + (TXO.c.claim_hash.notin_(select(Claim.c.claim_hash))) + ) + ) + ) + + if initial_sync: + channel_update_member_count_sql = ( + Claim.update() + .where(Claim.c.claim_type == TXO_TYPES['channel']) + .values( + signed_claim_count=channel_content_count_calc(Claim.alias('content')), + signed_support_count=channel_content_count_calc(Support), + ) + ) + result = p.ctx.execute(channel_update_member_count_sql) + done += result.rowcount + p.step(done) + + if initial_sync: + return + + select_stale_claims = select_claims.where( + (TXO.c.txo_type.in_(CLAIM_TYPE_CODES)) & + (TXO.c.spent_height == 0) & + (TXO.c.txo_hash.notin_(select(Claim.c.txo_hash))) + ) + loader = p.ctx.get_bulk_loader() + for row in p.ctx.connection.execution_options(stream_results=True).execute(select_stale_claims): + txo = Output( + amount=row['amount'], + script=OutputScript(row['src']), + 
tx_ref=TXRefImmutable.from_hash(row['tx_hash'], row['height']), + position=row['txo_position'], + ) + loader.update_claim( + txo, channel_url=row['channel_url'], timestamp=row['timestamp'], + staked_support_amount=int(row['staked_support_amount']), + staked_support_count=int(row['staked_support_count']), + signature=row['signature'], signature_digest=row['signature_digest'], + channel_public_key=row['channel_public_key'], + ) + if len(loader.update_claims) >= 1000: + done += loader.flush(Claim) p.step(done) + done += loader.flush(Claim) + p.step(done) - -def signature_validation(d: dict, row: dict, public_key) -> dict: - d['is_signature_valid'] = False - if Output.is_signature_valid(bytes(row['signature']), bytes(row['signature_digest']), public_key): - d['is_signature_valid'] = True - return d - - -@sync_step(Event.CLAIM_SIGN, initial_sync=True, ongoing_sync=True) -def process_claim_signatures(changes: ClaimChanges, p=None): - p.start(get_unvalidated_signable_count(p.ctx, Claim)) - claim_updates = [] - sql = select_unvalidated_signables( - Claim, Claim.c.claim_hash, include_urls=True, include_previous=changes is not None - ) - steps = 0 - for claim in p.ctx.execute(sql): - claim_updates.append( - signature_validation({ - 'pk': claim['claim_hash'], - 'canonical_url': claim['channel_url'] + '/' + claim['claim_url'] - }, claim, claim['public_key']) + for takeover in chain.db.sync_get_takeovers(start_height=blocks_added[0], end_height=blocks_added[-1]): + update_claims = ( + Claim.update() + .where(Claim.c.normalized == takeover['normalized']) + .values( + is_controlling=case( + [(Claim.c.claim_hash == takeover['claim_hash'], True)], + else_=False + ), + takeover_height=case( + [(Claim.c.claim_hash == takeover['claim_hash'], takeover['height'])], + else_=None + ), + activation_height=least(Claim.c.activation_height, takeover['height']), + ) ) - if changes is not None: - changes.channels_with_changed_content.add(claim['channel_hash']) - if claim['previous_channel_hash']: - changes.channels_with_changed_content.add(claim['previous_channel_hash']) - if len(claim_updates) > 1000: - p.ctx.execute(Claim.update().where(Claim.c.claim_hash == bindparam('pk')), claim_updates) - steps += len(claim_updates) - p.step(steps) - claim_updates.clear() - if claim_updates: - p.ctx.execute(Claim.update().where(Claim.c.claim_hash == bindparam('pk')), claim_updates) + result = p.ctx.execute(update_claims) + done += result.rowcount + p.step(done) + + channel_update_member_count_sql = ( + Claim.update() + .where( + (Claim.c.claim_type == TXO_TYPES['channel']) & + Claim.c.claim_hash.in_(select(TXO.c.channel_hash).where(condition_changed_channel_content)) + ).values( + signed_claim_count=channel_content_count_calc(Claim.alias('content')), + signed_support_count=channel_content_count_calc(Support), + ) + ) + p.ctx.execute(channel_update_member_count_sql) + + claim_update_supports_sql = ( + Claim.update() + .where(Claim.c.claim_hash.in_(select(TXO.c.claim_hash).where(condition_changed_stakes))) + .values( + staked_support_amount=( + staked_support_amount_calc + .where(support.c.claim_hash == Claim.c.claim_hash) + .scalar_subquery()) + , + staked_support_count=( + staked_support_count_calc + .where(support.c.claim_hash == Claim.c.claim_hash) + .scalar_subquery() + ), + ) + ) + result = p.ctx.execute(claim_update_supports_sql) + p.step(done+result.rowcount) -@sync_step(Event.SUPPORT_SIGN, initial_sync=True, ongoing_sync=True) -def process_support_signatures(changes: ClaimChanges, p=None): 
+@event_emitter("blockchain.sync.supports", "supports") +def process_supports(starting_height: int, blocks_added: Optional[Tuple[int, int]], p: ProgressContext): + done = 0 + to_be_deleted = p.ctx.fetchtotal(condition_spent_supports) + to_be_inserted = p.ctx.fetchtotal(condition_missing_supports) + p.start(to_be_deleted + to_be_inserted) + + sql = Support.delete().where(condition_spent_supports) + deleted = p.ctx.execute(sql) + assert to_be_deleted == deleted.rowcount,\ + f"Expected supports to be deleted {to_be_deleted}, actual deleted {deleted.rowcount}." + done += deleted.rowcount + p.step(done) + + if p.ctx.is_postgres: + insert_supports = partial(p.ctx.pg_copy, Support) + else: + insert_supports = partial(p.ctx.execute, Support.insert()) + loader = p.ctx.get_bulk_loader() + inserted_supports, supports = 0, [] + for txo in rows_to_txos(p.ctx.fetchall(select_missing_supports)): + supports.append(loader.support_to_row(txo)) + if len(supports) >= 50_000: + insert_supports(supports) + inserted_supports += len(supports) + supports = [] + if supports: + insert_supports(supports) + inserted_supports += len(supports) + assert to_be_inserted == inserted_supports, \ + f"Expected supports to be inserted {to_be_inserted}, actual inserted {inserted_supports}." + return + p.start(get_unvalidated_signable_count(p.ctx, Support)) support_updates = [] for support in p.ctx.execute(select_unvalidated_signables(Support, Support.c.txo_hash)): @@ -430,32 +527,3 @@ def process_support_signatures(changes: ClaimChanges, p=None): support_updates.clear() if support_updates: p.ctx.execute(Support.update().where(Support.c.txo_hash == bindparam('pk')), support_updates) - - -@sync_step(Event.STAKE_CALC, ongoing_sync=True) -def process_stake_calc(changes: ClaimChanges, p=None): - p.start(len(changes.claims_with_changed_supports)) - sql = ( - Claim.update() - .where((Claim.c.claim_hash.in_(changes.claims_with_changed_supports))) - .values( - staked_support_amount=staked_support_amount_calc, - staked_support_count=staked_support_count_calc, - ) - ) - p.ctx.execute(sql) - - -@sync_step(Event.CLAIM_CHAN, ongoing_sync=True) -def process_channel_content(changes: ClaimChanges, p=None): - p.start(len(changes.channels_with_changed_content)) - stream = Claim.alias('stream') - sql = ( - Claim.update() - .where((Claim.c.claim_hash.in_(changes.channels_with_changed_content))) - .values( - signed_claim_count=channel_content_count_calc(stream), - signed_support_count=channel_content_count_calc(Support), - ) - ) - p.ctx.execute(sql) diff --git a/lbry/blockchain/sync/synchronizer.py b/lbry/blockchain/sync/synchronizer.py index 63635e8b5..00ad8fc08 100644 --- a/lbry/blockchain/sync/synchronizer.py +++ b/lbry/blockchain/sync/synchronizer.py @@ -9,12 +9,7 @@ from lbry.event import BroadcastSubscription from lbry.service.base import Sync, BlockEvent from lbry.blockchain.lbrycrd import Lbrycrd -from .steps import ( - SYNC_STEPS, - process_block_file, - process_block_filters, - process_metadata, -) +from . import steps log = logging.getLogger(__name__) @@ -35,9 +30,7 @@ class BlockchainSync(Sync): # created while sync was running; therefore, run a second sync # after first one finishes to possibly sync those new blocks. # run advance as a task so that it can be stop()'ed if necessary. 
- self.advance_loop_task = asyncio.create_task( - self.advance(await self.db.needs_initial_sync()) - ) + self.advance_loop_task = asyncio.create_task(self.advance()) await self.advance_loop_task self.chain.subscribe() self.advance_loop_task = asyncio.create_task(self.advance_loop()) @@ -58,7 +51,7 @@ class BlockchainSync(Sync): self.db.executor, f, *args ) - async def load_blocks(self, sync_steps: int, initial_sync: bool) -> Optional[Tuple[int, int]]: + async def load_blocks(self) -> Optional[Tuple[int, int]]: tasks = [] starting_height, ending_height = None, await self.chain.db.get_best_height() tx_count = block_count = 0 @@ -81,16 +74,15 @@ class BlockchainSync(Sync): our_best_file_height+1 if starting_height is None else starting_height, our_best_file_height+1 ) tasks.append(self.run( - process_block_file, chain_file['file_number'], our_best_file_height+1, initial_sync + steps.process_block_file, chain_file['file_number'], our_best_file_height+1 )) if not tasks: return await self._on_progress_controller.add({ - "event": Event.START.label, # pylint: disable=no-member + "event": "blockchain.sync.start", "data": { "starting_height": starting_height, "ending_height": ending_height, - "sync_steps": sync_steps, "files": len(tasks), "blocks": block_count, "txs": tx_count, @@ -111,27 +103,22 @@ class BlockchainSync(Sync): best_height_processed = max(f.result() for f in done) return starting_height, best_height_processed - def get_steps(self, initial_sync: bool): - if initial_sync: - sync_steps = SYNC_STEPS['initial_sync'].copy() - else: - sync_steps = SYNC_STEPS['ongoing_sync'].copy() - if not self.conf.spv_address_filters: - sync_steps.remove(Event.BLOCK_FILTER.label) # pylint: disable=no-member - return sync_steps - - async def advance(self, initial_sync=False): - sync_steps = self.get_steps(initial_sync) - heights = await self.load_blocks(sync_steps, initial_sync) - if heights: - starting_height, ending_height = heights - await self.run(process_metadata, starting_height, ending_height, initial_sync) - if self.conf.spv_address_filters: - await self.run(process_block_filters) - await self._on_block_controller.add(BlockEvent(ending_height)) - self.db.message_queue.put(( - Event.COMPLETE.value, os.getpid(), len(sync_steps), len(sync_steps) - )) + async def advance(self): + starting_height = await self.db.get_best_block_height() + blocks_added = await self.load_blocks() + process_block_filters = ( + self.run(steps.process_block_filters) + if blocks_added and self.conf.spv_address_filters else asyncio.sleep(0) + ) + if blocks_added: + await self.run(steps.process_spends, blocks_added[0] == 0) + await asyncio.wait([ + process_block_filters, + self.run(steps.process_claims, starting_height, blocks_added), + self.run(steps.process_supports, starting_height, blocks_added), + ]) + if blocks_added: + await self._on_block_controller.add(BlockEvent(blocks_added[-1])) async def advance_loop(self): while True: diff --git a/lbry/blockchain/transaction.py b/lbry/blockchain/transaction.py index 5348c63fa..928a574df 100644 --- a/lbry/blockchain/transaction.py +++ b/lbry/blockchain/transaction.py @@ -196,7 +196,7 @@ class Input(InputOutput): class Output(InputOutput): __slots__ = ( - 'amount', 'script', 'is_internal_transfer', 'is_spent', 'is_my_output', 'is_my_input', + 'amount', 'script', 'is_internal_transfer', 'spent_height', 'is_my_output', 'is_my_input', 'channel', 'private_key', 'meta', 'sent_supports', 'sent_tips', 'received_tips', 'purchase', 'purchased_claim', 'purchase_receipt', 'reposted_claim', 
'claims', '_signable' @@ -204,7 +204,7 @@ class Output(InputOutput): def __init__(self, amount: int, script: OutputScript, tx_ref: TXRef = None, position: int = None, - is_internal_transfer: Optional[bool] = None, is_spent: Optional[bool] = None, + is_internal_transfer: Optional[bool] = None, spent_height: Optional[bool] = None, is_my_output: Optional[bool] = None, is_my_input: Optional[bool] = None, sent_supports: Optional[int] = None, sent_tips: Optional[int] = None, received_tips: Optional[int] = None, @@ -214,7 +214,7 @@ class Output(InputOutput): self.amount = amount self.script = script self.is_internal_transfer = is_internal_transfer - self.is_spent = is_spent + self.spent_height = spent_height self.is_my_output = is_my_output self.is_my_input = is_my_input self.sent_supports = sent_supports @@ -233,7 +233,7 @@ class Output(InputOutput): def update_annotations(self, annotated: 'Output'): if annotated is None: self.is_internal_transfer = None - self.is_spent = None + self.spent_height = None self.is_my_output = None self.is_my_input = None self.sent_supports = None @@ -241,7 +241,7 @@ class Output(InputOutput): self.received_tips = None else: self.is_internal_transfer = annotated.is_internal_transfer - self.is_spent = annotated.is_spent + self.spent_height = annotated.spent_height self.is_my_output = annotated.is_my_output self.is_my_input = annotated.is_my_input self.sent_supports = annotated.sent_supports @@ -262,6 +262,11 @@ class Output(InputOutput): def hash(self): return self.ref.hash + @property + def is_spent(self): + if self.spent_height is not None: + return self.spent_height > 0 + @property def pubkey_hash(self): return self.script.values['pubkey_hash'] diff --git a/lbry/console.py b/lbry/console.py index 8e597369c..44cdeb72c 100644 --- a/lbry/console.py +++ b/lbry/console.py @@ -112,7 +112,6 @@ class Advanced(Basic): self.is_single_sync_bar = False self.single_bar_relative_steps = 0 self.last_stats = "" - self.sync_steps = [] self.block_savers = 0 self.block_readers = 0 self.stderr = RedirectOutput('stderr') @@ -123,7 +122,7 @@ class Advanced(Basic): def stopping(self): super().stopping() - self.stderr.flush(self.bars['sync'].write, True) + self.stderr.flush(self.bars['read'].write, True) self.stderr.release() def get_or_create_bar(self, name, desc, unit, total, leave=False, bar_format=None, postfix=None, position=None): @@ -138,16 +137,12 @@ class Advanced(Basic): def sync_start(self, d): self.bars.clear() - self.sync_steps = d['sync_steps'] if d['ending_height']-d['starting_height'] > 0: label = f"sync {d['starting_height']:,d}-{d['ending_height']:,d}" else: label = f"sync {d['ending_height']:,d}" + print(label) self.last_stats = f"{d['txs']:,d} txs, {d['claims']:,d} claims and {d['supports']:,d} supports" - self.get_or_create_bar( - "sync", label, "tasks", len(d['sync_steps']), True, - "{l_bar}{bar}| {postfix[0]:<55}", (self.last_stats,) - ) self.get_or_create_bar("read", "├─ blocks read", "blocks", d['blocks'], True) self.get_or_create_bar("save", "└─┬ txs saved", "txs", d['txs'], True) @@ -155,16 +150,8 @@ class Advanced(Basic): if e in ('blockchain.sync.block.read', 'blockchain.sync.block.save'): self.update_block_bars(e, d) else: - self.update_steps_bar(e, d) self.update_other_bars(e, d) - def update_steps_bar(self, e, d): - sync_bar = self.bars['sync'] - if d['step'] == d['total']: - sync_done = (self.sync_steps.index(e)+1)-sync_bar.last_print_n - sync_bar.postfix = (f'finished: {e}',) - sync_bar.update(sync_done) - def update_block_bars(self, event, d): total_bar 
= self.bars[event[-4:]] if event.endswith("read") and self.block_readers == 0: @@ -198,7 +185,6 @@ class Advanced(Basic): total_bar.update(diff) if total_bar.total == total_bar.last_print_n: - self.update_steps_bar(event, {'step': 1, 'total': 1}) if total_bar.desc.endswith('txs saved'): total_bar.desc = "├─ txs saved" total_bar.refresh() @@ -221,7 +207,7 @@ class Advanced(Basic): #bar.close() def sync_complete(self): - self.bars['sync'].postfix = (self.last_stats,) + self.bars['read'].postfix = (self.last_stats,) for bar in self.bars.values(): bar.close() @@ -229,10 +215,10 @@ class Advanced(Basic): e, d = event['event'], event.get('data', {}) if e.endswith("sync.start"): self.sync_start(d) - self.stderr.flush(self.bars['sync'].write) + self.stderr.flush(self.bars['read'].write) elif e.endswith("sync.complete"): - self.stderr.flush(self.bars['sync'].write, True) + self.stderr.flush(self.bars['read'].write, True) self.sync_complete() else: - self.stderr.flush(self.bars['sync'].write) + self.stderr.flush(self.bars['read'].write) self.update_progress(e, d) diff --git a/lbry/db/database.py b/lbry/db/database.py index df1470b46..0cb6f6f8e 100644 --- a/lbry/db/database.py +++ b/lbry/db/database.py @@ -196,11 +196,8 @@ class Database: async def process_all_things_after_sync(self): return await self.run_in_executor(sync.process_all_things_after_sync) - async def needs_initial_sync(self) -> bool: - return (await self.get_best_tx_height()) == -1 - - async def get_best_tx_height(self) -> int: - return await self.run_in_executor(q.get_best_tx_height) + async def get_best_block_height(self) -> int: + return await self.run_in_executor(q.get_best_block_height) async def get_best_block_height_for_file(self, file_number) -> int: return await self.run_in_executor(q.get_best_block_height_for_file, file_number) @@ -307,7 +304,7 @@ class Database: return txos async def get_utxos(self, **constraints) -> Result[Output]: - return await self.get_txos(is_spent=False, **constraints) + return await self.get_txos(spent_height=0, **constraints) async def get_supports(self, **constraints) -> Result[Output]: return await self.get_utxos(txo_type=TXO_TYPES['support'], **constraints) diff --git a/lbry/db/queries.py b/lbry/db/queries.py index 2c2d750b6..e5365dd49 100644 --- a/lbry/db/queries.py +++ b/lbry/db/queries.py @@ -78,16 +78,15 @@ def execute_fetchall(sql): return context().fetchall(text(sql)) -def get_best_tx_height(): +def get_best_block_height(): return context().fetchone( - select(func.coalesce(func.max(TX.c.height), -1).label('height')).select_from(TX) + select(func.coalesce(func.max(Block.c.height), -1).label('height')) )['height'] def get_best_block_height_for_file(file_number): return context().fetchone( select(func.coalesce(func.max(Block.c.height), -1).label('height')) - .select_from(Block) .where(Block.c.file_number == file_number) )['height'] @@ -268,7 +267,7 @@ def get_transaction_count(**constraints): BASE_SELECT_TXO_COLUMNS = [ TX.c.tx_hash, TX.c.raw, TX.c.height, TX.c.position.label('tx_position'), TX.c.is_verified, TX.c.timestamp, - TXO.c.txo_type, TXO.c.position.label('txo_position'), TXO.c.amount, TXO.c.is_spent, + TXO.c.txo_type, TXO.c.position.label('txo_position'), TXO.c.amount, TXO.c.spent_height, TXO.c.script_offset, TXO.c.script_length, ] @@ -357,7 +356,7 @@ def rows_to_txos(rows: List[dict], include_tx=True) -> List[Output]: tx_ref=TXRefImmutable.from_hash(row['tx_hash'], row['height']), position=row['txo_position'], ) - txo.is_spent = bool(row['is_spent']) + txo.spent_height = 
bool(row['spent_height']) if 'is_my_input' in row: txo.is_my_input = bool(row['is_my_input']) if 'is_my_output' in row: @@ -437,7 +436,7 @@ def get_txos(no_tx=False, include_total=False, **constraints) -> Tuple[List[Outp channels = { txo.claim_hash: txo for txo in get_txos( - txo_type=TXO_TYPES['channel'], is_spent=False, + txo_type=TXO_TYPES['channel'], spent_height=0, wallet_account_ids=wallet_account_ids, claim_hash__in=channel_hashes )[0] } @@ -471,7 +470,7 @@ def get_txo_sum(**constraints): def get_balance(**constraints): - return get_txo_sum(is_spent=False, **constraints) + return get_txo_sum(spent_height=0, **constraints) def get_report(account_ids): @@ -793,7 +792,7 @@ def add_keys(account, chain, pubkeys): def get_supports_summary(self, **constraints): return get_txos( txo_type=TXO_TYPES['support'], - is_spent=False, is_my_output=True, + spent_height=0, is_my_output=True, include_is_my_input=True, no_tx=True, **constraints diff --git a/lbry/db/query_context.py b/lbry/db/query_context.py index 3eff04856..2e0362262 100644 --- a/lbry/db/query_context.py +++ b/lbry/db/query_context.py @@ -1,14 +1,15 @@ import os import time +import functools from io import BytesIO import multiprocessing as mp -from enum import Enum from decimal import Decimal from typing import Dict, List, Optional, Tuple from dataclasses import dataclass, field from contextvars import ContextVar -from sqlalchemy import create_engine, inspect, bindparam +from sqlalchemy import create_engine, inspect, bindparam, func, exists, case +from sqlalchemy.future import select from sqlalchemy.engine import Engine, Connection from sqlalchemy.sql import Insert try: @@ -93,6 +94,18 @@ class QueryContext: rows = self.connection.execute(sql, *args).fetchall() return [dict(row._mapping) for row in rows] + def fetchtotal(self, condition): + sql = select(func.count('*').label('total')).where(condition) + return self.fetchone(sql)['total'] + + def fetchmax(self, column): + sql = select(func.max(column).label('max_result')) + return self.fetchone(sql)['max_result'] + + def has_records(self, table): + sql = select(exists([1], from_obj=table).label('result')) + return self.fetchone(sql)['result'] + def insert_or_ignore(self, table): if self.is_sqlite: return table.insert().prefix_with("OR IGNORE") @@ -178,68 +191,58 @@ def uninitialize(): _context.set(None) -class ProgressUnit(Enum): - NONE = "", None - TASKS = "tasks", None - BLOCKS = "blocks", Block - TXS = "txs", TX - TXIS = "txis", TXI - CLAIMS = "claims", Claim - SUPPORTS = "supports", Support +class Event: + _events: List['Event'] = [] + __slots__ = 'id', 'name', 'unit', 'step_size' - def __new__(cls, value, table): - next_id = len(cls.__members__) + 1 - obj = object.__new__(cls) - obj._value_ = next_id - obj.label = value - obj.table = table - return obj + def __init__(self, name: str, unit: str, step_size: int): + self.name = name + self.unit = unit + self.step_size = step_size + + @classmethod + def get_by_id(cls, event_id) -> 'Event': + return cls._events[event_id] + + @classmethod + def get_by_name(cls, name) -> 'Event': + for event in cls._events: + if event.name == name: + return event + + @classmethod + def add(cls, name: str, unit: str, step_size: int) -> 'Event': + assert cls.get_by_name(name) is None, f"Event {name} already exists." 
+ event = cls(name, unit, step_size) + cls._events.append(event) + event.id = cls._events.index(event) + return event -class Event(Enum): - START = "blockchain.sync.start", ProgressUnit.BLOCKS - COMPLETE = "blockchain.sync.complete", ProgressUnit.TASKS +def event_emitter(name: str, unit: str, step_size=1): + event = Event.add(name, unit, step_size) - # full node specific sync events - BLOCK_READ = "blockchain.sync.block.read", ProgressUnit.BLOCKS - BLOCK_SAVE = "blockchain.sync.block.save", ProgressUnit.TXS - BLOCK_FILTER = "blockchain.sync.block.filter", ProgressUnit.BLOCKS - CLAIM_META = "blockchain.sync.claim.meta", ProgressUnit.CLAIMS - CLAIM_TRIE = "blockchain.sync.claim.trie", ProgressUnit.CLAIMS - STAKE_CALC = "blockchain.sync.claim.stakes", ProgressUnit.CLAIMS - CLAIM_CHAN = "blockchain.sync.claim.channels", ProgressUnit.CLAIMS - CLAIM_SIGN = "blockchain.sync.claim.signatures", ProgressUnit.CLAIMS - SUPPORT_SIGN = "blockchain.sync.support.signatures", ProgressUnit.SUPPORTS - TRENDING_CALC = "blockchain.sync.trending", ProgressUnit.BLOCKS + def wrapper(f): + @functools.wraps(f) + def with_progress(*args, **kwargs): + with progress(event, step_size=step_size) as p: + return f(*args, **kwargs, p=p) + return with_progress - # full node + light client sync events - INPUT_UPDATE = "db.sync.input", ProgressUnit.TXIS - CLAIM_DELETE = "db.sync.claim.delete", ProgressUnit.CLAIMS - CLAIM_INSERT = "db.sync.claim.insert", ProgressUnit.CLAIMS - CLAIM_UPDATE = "db.sync.claim.update", ProgressUnit.CLAIMS - SUPPORT_DELETE = "db.sync.support.delete", ProgressUnit.SUPPORTS - SUPPORT_INSERT = "db.sync.support.insert", ProgressUnit.SUPPORTS - - def __new__(cls, value, unit: ProgressUnit): - next_id = len(cls.__members__) + 1 - obj = object.__new__(cls) - obj._value_ = next_id - obj.label = value - obj.unit = unit - return obj + return wrapper class ProgressPublisher(EventQueuePublisher): def message_to_event(self, message): - event = Event(message[0]) # pylint: disable=no-value-for-parameter + event = Event.get_by_id(message[0]) d = { - "event": event.label, + "event": event.name, "data": { "pid": message[1], "step": message[2], "total": message[3], - "unit": event.unit.label + "unit": event.unit } } if len(message) > 4 and isinstance(message[4], dict): @@ -294,12 +297,12 @@ class ProgressContext: def get_event_args(self, done): if self.extra is not None: - return self.event.value, self.ctx.pid, done, self.total, self.extra - return self.event.value, self.ctx.pid, done, self.total + return self.event.id, self.ctx.pid, done, self.total, self.extra + return self.event.id, self.ctx.pid, done, self.total def progress(e: Event, step_size=1) -> ProgressContext: - ctx = context(e.label) + ctx = context(e.name) ctx.current_progress = ProgressContext(ctx, e, step_size=step_size) return ctx.current_progress @@ -354,6 +357,7 @@ class BulkLoader: 'tx_hash': tx.hash, 'txo_hash': txi.txo_ref.hash, 'position': txi.position, + 'height': tx.height, } def txo_to_row(self, tx: Transaction, txo: Output) -> dict: @@ -371,6 +375,8 @@ class BulkLoader: 'claim_hash': None, 'claim_name': None, 'channel_hash': None, + 'signature': None, + 'signature_digest': None, 'public_key': None, 'public_key_hash': None } @@ -380,6 +386,8 @@ class BulkLoader: row['txo_type'] = TXO_TYPES.get(claim.claim_type, TXO_TYPES['stream']) if claim.is_signed: row['channel_hash'] = claim.signing_channel_hash + row['signature'] = txo.get_encoded_signature() + row['signature_digest'] = txo.get_signature_digest(self.ledger) if claim.is_channel: 
row['public_key'] = claim.channel.public_key_bytes row['public_key_hash'] = self.ledger.address_to_hash160( @@ -406,111 +414,89 @@ class BulkLoader: pass return row - def claim_to_rows(self, txo: Output) -> Tuple[dict, List]: - try: - claim_name = txo.claim_name.replace('\x00', '') - normalized_name = txo.normalized_name - except UnicodeDecodeError: - return {}, [] - tx = txo.tx_ref.tx - claim_hash = txo.claim_hash - claim_record = { - 'claim_hash': claim_hash, - 'claim_id': txo.claim_id, - 'claim_name': claim_name, - 'normalized': normalized_name, - 'address': txo.get_address(self.ledger), - 'txo_hash': txo.ref.hash, - 'amount': txo.amount, - 'timestamp': tx.timestamp, - 'release_time': None, - 'height': tx.height, - 'title': None, - 'author': None, - 'description': None, + def claim_to_rows( + self, txo: Output, timestamp: int, staked_support_amount: int, staked_support_count: int, + signature: bytes = None, signature_digest: bytes = None, channel_public_key: bytes = None, + ) -> Tuple[dict, List]: + + d = { 'claim_type': None, + 'address': txo.get_address(self.ledger), + 'txo_hash': txo.hash, + 'amount': txo.amount, + 'height': txo.tx_ref.height, + 'timestamp': timestamp, + # support + 'staked_amount': txo.amount + staked_support_amount, + 'staked_support_amount': staked_support_amount, + 'staked_support_count': staked_support_count, + # basic metadata + 'title': None, + 'description': None, + 'author': None, # streams 'stream_type': None, 'media_type': None, + 'duration': None, + 'release_time': None, 'fee_amount': 0, 'fee_currency': None, - 'duration': None, # reposts 'reposted_claim_hash': None, # signed claims 'channel_hash': None, - 'signature': None, - 'signature_digest': None, 'is_signature_valid': None, } - try: - claim = txo.claim - except Exception: - #self.logger.exception(f"Could not parse claim protobuf for {tx.id}:{txo.position}.") - return claim_record, [] + claim = txo.can_decode_claim + if not claim: + return d, [] if claim.is_stream: - claim_record['claim_type'] = TXO_TYPES['stream'] - claim_record['stream_type'] = STREAM_TYPES[guess_stream_type(claim_record['media_type'])] - claim_record['media_type'] = claim.stream.source.media_type - claim_record['title'] = claim.stream.title.replace('\x00', '') - claim_record['description'] = claim.stream.description.replace('\x00', '') - claim_record['author'] = claim.stream.author.replace('\x00', '') + d['claim_type'] = TXO_TYPES['stream'] + d['stream_type'] = STREAM_TYPES[guess_stream_type(d['media_type'])] + d['media_type'] = claim.stream.source.media_type + d['title'] = claim.stream.title.replace('\x00', '') + d['description'] = claim.stream.description.replace('\x00', '') + d['author'] = claim.stream.author.replace('\x00', '') if claim.stream.video and claim.stream.video.duration: - claim_record['duration'] = claim.stream.video.duration + d['duration'] = claim.stream.video.duration if claim.stream.audio and claim.stream.audio.duration: - claim_record['duration'] = claim.stream.audio.duration + d['duration'] = claim.stream.audio.duration if claim.stream.release_time: - claim_record['release_time'] = claim.stream.release_time + d['release_time'] = claim.stream.release_time if claim.stream.has_fee: fee = claim.stream.fee - if isinstance(fee.currency, str): - claim_record['fee_currency'] = fee.currency.lower() if isinstance(fee.amount, Decimal): - claim_record['fee_amount'] = int(fee.amount*1000) + d['fee_amount'] = int(fee.amount*1000) + if isinstance(fee.currency, str): + d['fee_currency'] = fee.currency.lower() elif 
claim.is_repost: - claim_record['claim_type'] = TXO_TYPES['repost'] - claim_record['reposted_claim_hash'] = claim.repost.reference.claim_hash + d['claim_type'] = TXO_TYPES['repost'] + d['reposted_claim_hash'] = claim.repost.reference.claim_hash elif claim.is_channel: - claim_record['claim_type'] = TXO_TYPES['channel'] + d['claim_type'] = TXO_TYPES['channel'] if claim.is_signed: - claim_record['channel_hash'] = claim.signing_channel_hash - claim_record['signature'] = txo.get_encoded_signature() - claim_record['signature_digest'] = txo.get_signature_digest(self.ledger) + d['channel_hash'] = claim.signing_channel_hash + d['is_signature_valid'] = Output.is_signature_valid( + signature, signature_digest, channel_public_key + ) - tags = [ - {'claim_hash': claim_hash, 'tag': tag} for tag in clean_tags(claim.message.tags) - ] + tags = [] + if claim.message.tags: + claim_hash = txo.claim_hash + tags = [ + {'claim_hash': claim_hash, 'tag': tag} + for tag in clean_tags(claim.message.tags) + ] - return claim_record, tags + return d, tags - def add_block(self, block: Block, add_claims_supports: set = None): - self.blocks.append(self.block_to_row(block)) - for tx in block.txs: - self.add_transaction(block.block_hash, tx, add_claims_supports) - return self - - def add_transaction(self, block_hash: bytes, tx: Transaction, add_claims_supports: set = None): - self.txs.append(self.tx_to_row(block_hash, tx)) - for txi in tx.inputs: - if txi.coinbase is None: - self.txis.append(self.txi_to_row(tx, txi)) - for txo in tx.outputs: - self.txos.append(self.txo_to_row(tx, txo)) - if add_claims_supports: - if txo.is_support and txo.hash in add_claims_supports: - self.add_support(txo) - elif txo.is_claim and txo.hash in add_claims_supports: - self.add_claim(txo) - return self - - def add_support(self, txo: Output): + def support_to_row(self, txo): tx = txo.tx_ref.tx - claim_hash = txo.claim_hash - support_record = { + d = { 'txo_hash': txo.ref.hash, - 'claim_hash': claim_hash, + 'claim_hash': txo.claim_hash, 'address': txo.get_address(self.ledger), 'amount': txo.amount, 'height': tx.height, @@ -519,55 +505,94 @@ class BulkLoader: 'signature': None, 'signature_digest': None, } - self.supports.append(support_record) support = txo.can_decode_support if support: - support_record['emoji'] = support.emoji + d['emoji'] = support.emoji if support.is_signed: - support_record['channel_hash'] = support.signing_channel_hash - support_record['signature'] = txo.get_encoded_signature() - support_record['signature_digest'] = txo.get_signature_digest(None) + d['channel_hash'] = support.signing_channel_hash + d['signature'] = txo.get_encoded_signature() + d['signature_digest'] = txo.get_signature_digest(None) + return d - def add_claim(self, txo: Output): - claim, tags = self.claim_to_rows(txo) - if claim: - tx = txo.tx_ref.tx - if txo.script.is_claim_name: - claim['creation_height'] = tx.height - claim['creation_timestamp'] = tx.timestamp - else: - claim['creation_height'] = None - claim['creation_timestamp'] = None - self.claims.append(claim) - self.tags.extend(tags) + def add_block(self, block: Block): + self.blocks.append(self.block_to_row(block)) + for tx in block.txs: + self.add_transaction(block.block_hash, tx) return self - def update_claim(self, txo: Output): - claim, tags = self.claim_to_rows(txo) - if claim: - claim['claim_hash_'] = claim.pop('claim_hash') - self.update_claims.append(claim) - self.delete_tags.append({'claim_hash_': claim['claim_hash_']}) - self.tags.extend(tags) + def add_transaction(self, block_hash: bytes, 
tx: Transaction): + self.txs.append(self.tx_to_row(block_hash, tx)) + for txi in tx.inputs: + if txi.coinbase is None: + self.txis.append(self.txi_to_row(tx, txi)) + for txo in tx.outputs: + self.txos.append(self.txo_to_row(tx, txo)) return self - def save(self, batch_size=10000): - queries = ( + def add_support(self, txo: Output): + self.supports.append(self.support_to_row(txo)) + + def add_claim( + self, txo: Output, short_url: str, + creation_height: int, activation_height: int, expiration_height: int, + takeover_height: int = None, channel_url: str = None, **extra): + try: + claim_name = txo.claim_name.replace('\x00', '') + normalized_name = txo.normalized_name + except UnicodeDecodeError: + return self + d, tags = self.claim_to_rows(txo, **extra) + d['claim_hash'] = txo.claim_hash + d['claim_id'] = txo.claim_id + d['claim_name'] = claim_name + d['normalized'] = normalized_name + d['short_url'] = short_url + d['creation_height'] = creation_height + d['activation_height'] = activation_height + d['expiration_height'] = expiration_height + d['takeover_height'] = takeover_height + d['is_controlling'] = takeover_height is not None + if d['is_signature_valid']: + d['canonical_url'] = channel_url + '/' + short_url + else: + d['canonical_url'] = None + self.claims.append(d) + self.tags.extend(tags) + return self + + def update_claim(self, txo: Output, channel_url: Optional[str], **extra): + d, tags = self.claim_to_rows(txo, **extra) + d['pk'] = txo.claim_hash + d['channel_url'] = channel_url + d['set_canonical_url'] = d['is_signature_valid'] + self.update_claims.append(d) + self.delete_tags.append({'pk': txo.claim_hash}) + self.tags.extend(tags) + return self + + def get_queries(self): + return ( (Block.insert(), self.blocks), (TX.insert(), self.txs), (TXO.insert(), self.txos), (TXI.insert(), self.txis), (Claim.insert(), self.claims), - (Tag.delete().where(Tag.c.claim_hash == bindparam('claim_hash_')), self.delete_tags), - (Claim.update().where(Claim.c.claim_hash == bindparam('claim_hash_')), self.update_claims), + (Tag.delete().where(Tag.c.claim_hash == bindparam('pk')), self.delete_tags), + (Claim.update().where(Claim.c.claim_hash == bindparam('pk')).values( + canonical_url=case([ + (bindparam('set_canonical_url'), bindparam('channel_url') + '/' + Claim.c.short_url) + ], else_=None) + ), self.update_claims), (Tag.insert(), self.tags), (Support.insert(), self.supports), ) + def save(self, unit_table, batch_size=10000): + queries = self.get_queries() + p = self.ctx.current_progress done = row_scale = 0 if p: - unit_table = p.event.unit.table progress_total, row_total = 0, sum(len(q[1]) for q in queries) for sql, rows in queries: if sql.table == unit_table: @@ -610,3 +635,18 @@ class BulkLoader: if p: done += int(len(chunk_rows)/row_scale) p.step(done) + + def flush(self, done_counter_table) -> int: + execute = self.ctx.connection.execute + done = 0 + for sql, rows in self.get_queries(): + if not rows: + continue + if self.ctx.is_postgres and isinstance(sql, Insert): + self.ctx.pg_copy(sql.table, rows) + else: + execute(sql, rows) + if sql.table == done_counter_table: + done += len(rows) + rows.clear() + return done diff --git a/lbry/db/sync.py b/lbry/db/sync.py index 354e8d67f..492c8c670 100644 --- a/lbry/db/sync.py +++ b/lbry/db/sync.py @@ -61,16 +61,20 @@ def set_input_addresses(ctx): def update_spent_outputs(ctx): - # Update spent TXOs setting is_spent = True - set_is_spent = ( + # Update spent TXOs setting spent_height + set_spent_height = ( TXO.update() - .values({TXO.c.is_spent: True}) - 
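Editor's note on get_queries()/flush(): the (statement, rows) pairs rely on SQLAlchemy's executemany behaviour, where a single UPDATE or DELETE compiled with bindparam() placeholders runs once per accumulated row dict. A toy, SQLite-backed demonstration of that pattern (the item table and its column names are invented for the example):

from sqlalchemy import MetaData, Table, Column, Text, bindparam, create_engine

metadata = MetaData()
Item = Table(
    'item', metadata,
    Column('item_hash', Text, primary_key=True),
    Column('label', Text),
)

engine = create_engine('sqlite:///:memory:')
metadata.create_all(engine)

with engine.begin() as conn:
    conn.execute(Item.insert(), [
        {'item_hash': 'aa', 'label': 'old-a'},
        {'item_hash': 'bb', 'label': 'old-b'},
    ])
    # one UPDATE statement, executed once per row dict; the WHERE clause is
    # driven by the bound 'pk' parameter, analogous to the Claim/Tag statements
    stmt = (
        Item.update()
        .where(Item.c.item_hash == bindparam('pk'))
        .values(label=bindparam('new_label'))
    )
    conn.execute(stmt, [
        {'pk': 'aa', 'new_label': 'new-a'},
        {'pk': 'bb', 'new_label': 'new-b'},
    ])
    print(conn.execute(Item.select()).fetchall())  # [('aa', 'new-a'), ('bb', 'new-b')]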
.where( - (TXO.c.is_spent == False) & + .values({ + TXO.c.spent_height: ( + select(TXI.c.height) + .where(TXI.c.txo_hash == TXO.c.txo_hash) + ) + }).where( + (TXO.c.spent_height == 0) & (TXO.c.txo_hash.in_(select(TXI.c.txo_hash))) ) ) - ctx.execute(set_is_spent) + ctx.execute(set_spent_height) def condition_spent_claims(claim_type: list = None): @@ -84,14 +88,14 @@ def condition_spent_claims(claim_type: list = None): else: type_filter = TXO.c.txo_type.in_(CLAIM_TYPE_CODES) return Claim.c.claim_hash.notin_( - select(TXO.c.claim_hash).where(type_filter & (TXO.c.is_spent == False)) + select(TXO.c.claim_hash).where(type_filter & (TXO.c.spent_height == 0)) ) # find UTXOs that are claims and their claim_id is not in claim table, # this means they need to be inserted select_missing_claims = ( - select_txos(txo_type__in=CLAIM_TYPE_CODES, is_spent=False, claim_id_not_in_claim_table=True) + select_txos(txo_type__in=CLAIM_TYPE_CODES, spent_height=0, claim_id_not_in_claim_table=True) ) @@ -100,7 +104,7 @@ select_missing_claims = ( # all claims_ids should match between TXO and Claim table but txo_hashes will not match for # claims that are not up-to-date select_stale_claims = ( - select_txos(txo_type__in=CLAIM_TYPE_CODES, is_spent=False, txo_id_not_in_claim_table=True) + select_txos(txo_type__in=CLAIM_TYPE_CODES, spent_height=0, txo_id_not_in_claim_table=True) ) @@ -108,12 +112,19 @@ condition_spent_supports = ( Support.c.txo_hash.notin_( select(TXO.c.txo_hash).where( (TXO.c.txo_type == TXO_TYPES['support']) & - (TXO.c.is_spent == False) + (TXO.c.spent_height == 0) ) ) ) -select_missing_supports = ( - select_txos(txo_type=TXO_TYPES['support'], is_spent=False, txo_id_not_in_support_table=True) +condition_missing_supports = ( + (TXO.c.txo_type == TXO_TYPES['support']) & + (TXO.c.spent_height == 0) & + (TXO.c.txo_hash.notin_(select(Support.c.txo_hash))) +) + + +select_missing_supports = ( + select_txos(txo_type=TXO_TYPES['support'], spent_height=0, txo_id_not_in_support_table=True) ) diff --git a/lbry/db/tables.py b/lbry/db/tables.py index 7d62f83ed..d0e5c6769 100644 --- a/lbry/db/tables.py +++ b/lbry/db/tables.py @@ -1,9 +1,11 @@ # pylint: skip-file from sqlalchemy import ( - MetaData, Table, Column, ForeignKey, - LargeBinary, Text, SmallInteger, Integer, BigInteger, Boolean + MetaData, Table, Column, ForeignKey, PrimaryKeyConstraint, + LargeBinary, Text, SmallInteger, Integer, BigInteger, Boolean, + text ) +from .constants import TXO_TYPES SCHEMA_VERSION = '1.4' @@ -71,9 +73,9 @@ TXO = Table( Column('position', SmallInteger), Column('amount', BigInteger), Column('height', Integer), + Column('spent_height', Integer, server_default='0'), Column('script_offset', Integer), Column('script_length', Integer), - Column('is_spent', Boolean, server_default='0'), Column('is_reserved', Boolean, server_default='0'), # claims @@ -82,6 +84,8 @@ TXO = Table( Column('claim_hash', LargeBinary, nullable=True), Column('claim_name', Text, nullable=True), Column('channel_hash', LargeBinary, nullable=True), # claims in channel + Column('signature', LargeBinary, nullable=True), + Column('signature_digest', LargeBinary, nullable=True), # channels Column('public_key', LargeBinary, nullable=True), @@ -91,17 +95,36 @@ TXO = Table( txo_join_account = TXO.join(AccountAddress, TXO.columns.address == AccountAddress.columns.address) +def pg_add_txo_constraints_and_indexes(execute): + execute(text("ALTER TABLE txo ADD PRIMARY KEY (txo_hash);")) + execute(text(f""" + CREATE INDEX txo_channel_hash_w_height_desc_and_pub_key + ON txo 
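Editor's note on update_spent_outputs(): the new spent_height logic is a correlated UPDATE, so each TXO that is still marked unspent but now appears in the TXI table gets its spent_height copied from the matching input's height. A stripped-down, runnable model of that statement (assuming SQLAlchemy 1.4-style select(); the two toy tables carry only the columns needed here and are not the full lbry schema):

from sqlalchemy import MetaData, Table, Column, Integer, Text, create_engine, select

metadata = MetaData()
TXO = Table(
    'txo', metadata,
    Column('txo_hash', Text, primary_key=True),
    Column('spent_height', Integer, server_default='0'),
)
TXI = Table(
    'txi', metadata,
    Column('txo_hash', Text, primary_key=True),
    Column('height', Integer),
)

engine = create_engine('sqlite:///:memory:')
metadata.create_all(engine)

with engine.begin() as conn:
    conn.execute(TXO.insert(), [
        {'txo_hash': 'a', 'spent_height': 0},
        {'txo_hash': 'b', 'spent_height': 0},
    ])
    conn.execute(TXI.insert(), [{'txo_hash': 'a', 'height': 105}])  # only 'a' was spent
    conn.execute(
        TXO.update()
        .values(spent_height=(
            select(TXI.c.height)
            .where(TXI.c.txo_hash == TXO.c.txo_hash)
            .scalar_subquery()
        ))
        .where(
            (TXO.c.spent_height == 0) &
            TXO.c.txo_hash.in_(select(TXI.c.txo_hash))
        )
    )
    print(conn.execute(TXO.select()).fetchall())  # [('a', 105), ('b', 0)]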
(claim_hash, height desc) INCLUDE (public_key) + WHERE txo_type={TXO_TYPES['channel']}; + """)) + execute(text(f""" + CREATE INDEX txo_unspent_supports + ON txo (claim_hash) INCLUDE (amount) + WHERE spent_height = 0 AND txo_type={TXO_TYPES['support']}; + """)) + + TXI = Table( 'txi', metadata, Column('tx_hash', LargeBinary, ForeignKey(TX.columns.tx_hash)), Column('txo_hash', LargeBinary, ForeignKey(TXO.columns.txo_hash), primary_key=True), Column('address', Text, nullable=True), Column('position', SmallInteger), + Column('height', Integer), ) txi_join_account = TXI.join(AccountAddress, TXI.columns.address == AccountAddress.columns.address) +def pg_add_txi_constraints_and_indexes(execute): + execute(text("ALTER TABLE txi ADD PRIMARY KEY (txo_hash);")) + + Claim = Table( 'claim', metadata, Column('claim_hash', LargeBinary, primary_key=True), @@ -111,19 +134,20 @@ Claim = Table( Column('address', Text), Column('txo_hash', LargeBinary, ForeignKey(TXO.columns.txo_hash)), Column('amount', BigInteger), - Column('staked_amount', BigInteger, server_default='0'), + Column('staked_amount', BigInteger), Column('timestamp', Integer), # last updated timestamp Column('creation_timestamp', Integer), Column('release_time', Integer, nullable=True), Column('height', Integer), # last updated height Column('creation_height', Integer), - Column('activation_height', Integer, nullable=True), - Column('expiration_height', Integer, nullable=True), + Column('activation_height', Integer), + Column('expiration_height', Integer), Column('takeover_height', Integer, nullable=True), - Column('is_controlling', Boolean, server_default='0'), + Column('sync_height', Integer), # claim dynamic values up-to-date as of this height (eg. staked_amount) + Column('is_controlling', Boolean), # normalized#shortest-unique-claim_id - Column('short_url', Text, nullable=True), + Column('short_url', Text), # channel's-short_url/normalized#shortest-unique-claim_id-within-channel Column('canonical_url', Text, nullable=True), @@ -152,8 +176,6 @@ Claim = Table( # claims which are inside channels Column('channel_hash', LargeBinary, nullable=True), - Column('signature', LargeBinary, nullable=True), - Column('signature_digest', LargeBinary, nullable=True), Column('is_signature_valid', Boolean, nullable=True), Column('trending_group', BigInteger, server_default='0'), diff --git a/scripts/simulate_sync_console.py b/scripts/simulate_sync_console.py index 75ba49fd1..ee7f3dba8 100644 --- a/scripts/simulate_sync_console.py +++ b/scripts/simulate_sync_console.py @@ -10,6 +10,7 @@ from lbry.service import FullNode from lbry.console import Advanced, Basic from lbry.conf import Config from lbry.db.utils import chunk +from lbry.db.query_context import Event def cause_protobuf_stderr(): @@ -33,7 +34,6 @@ class Simulator: self.starting_file = 0 self.processes = console.service.db.processes - self.steps = [] self.txs = 0 self.claims = 0 self.supports = 0 @@ -46,7 +46,6 @@ class Simulator: async def advance(self, initial_sync: bool, ending_height: int, files: List[int], txs: int): self.ending_height = ending_height - self.steps = self.sync.get_steps(initial_sync) self.txs = txs self.claims = int(txs/4) self.supports = int(txs/2) @@ -57,7 +56,6 @@ class Simulator: "ending_height": ending_height, "files": len(files), "blocks": self.blocks, - "sync_steps": self.steps, "txs": self.txs, "claims": self.claims, "supports": self.supports, @@ -78,14 +76,14 @@ class Simulator: txs_synced += txs tasks.append(self.sync_block_file(file, blocks, txs)) await 
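Editor's note on pg_add_txo_constraints_and_indexes(): these are partial, covering indexes, so the WHERE predicate keeps the index restricted to the rows a query actually touches (for example, unspent supports) and INCLUDE lets amount be read from the index without visiting the table. A hedged helper in the same spirit (assumes a PostgreSQL 11+ engine; the integer type code is passed in rather than hard-coded because the real value comes from TXO_TYPES):

from sqlalchemy import text

def add_unspent_support_index(engine, support_txo_type: int):
    # Partial + covering index: only unspent support TXOs are indexed, and
    # `amount` rides along so per-claim support sums can be index-only scans.
    with engine.begin() as conn:
        conn.execute(text(f"""
            CREATE INDEX IF NOT EXISTS txo_unspent_supports
            ON txo (claim_hash) INCLUDE (amount)
            WHERE spent_height = 0 AND txo_type = {int(support_txo_type)};
        """))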
asyncio.wait(tasks) - for step in self.steps: - if step in ("blockchain.sync.block.read", "blockchain.sync.block.save"): + for step in Event._events: + if step.name in ("blockchain.sync.block.read", "blockchain.sync.block.save"): continue - await getattr(self, step.replace('.', '_'))() - await self.progress.add({ - "event": "blockchain.sync.complete", - "data": {"step": len(self.steps), "total": len(self.steps), "unit": "tasks"} - }) + await getattr(self, step.name.replace('.', '_'))() + #await self.progress.add({ + # "event": "blockchain.sync.complete", + # "data": {"step": len(self.steps), "total": len(self.steps), "unit": "tasks"} + #}) self.ending_height = ending_height+1 self.starting_height = self.ending_height @@ -123,50 +121,26 @@ class Simulator: await asyncio.sleep(delay) await self.progress.add({"event": event, "data": {"step": steps, "total": steps, "unit": unit}}) - async def db_sync_input(self): - await self.generate_steps("db.sync.input", 2, "txis") + async def blockchain_sync_block_filters(self): + await self.generate_steps("blockchain.sync.block.filters", 5, "blocks") - async def db_sync_claim_delete(self): - await self.generate_steps("db.sync.claim.delete", 1, "claims") + async def blockchain_sync_spends(self): + await self.generate_steps("blockchain.sync.spends", 5, "steps") - async def db_sync_claim_insert(self): - await self.generate_steps("db.sync.claim.insert", 1, "claims") - - async def db_sync_claim_update(self): - await self.generate_steps("db.sync.claim.update", 0, "claims") - - async def db_sync_support_delete(self): - await self.generate_steps("db.sync.support.delete", 1, "supports") - - async def db_sync_support_insert(self): - await self.generate_steps("db.sync.support.insert", 1, "supports") - - async def blockchain_sync_claim_trie(self): - await self.generate_steps("blockchain.sync.claim.trie", 1, "claims") - - async def blockchain_sync_claim_meta(self): + async def blockchain_sync_claims(self): for i in range(0, self.claims, 1_000): await self.progress.add({ - "event": "blockchain.sync.claim.meta", + "event": "blockchain.sync.claims", "data": {"step": i, "total": self.claims, "unit": "claims"} }) await asyncio.sleep(0.1) await self.progress.add({ - "event": "blockchain.sync.claim.meta", + "event": "blockchain.sync.claims", "data": {"step": self.claims, "total": self.claims, "unit": "claims"} }) - async def blockchain_sync_claim_signatures(self): - await self.generate_steps("blockchain.sync.claim.signatures", self.claims, "claims", 0.5, 1000) - - async def blockchain_sync_support_signatures(self): - await self.generate_steps("blockchain.sync.support.signatures", self.supports, "supports", 0.5, 1000) - - async def blockchain_sync_claim_stakes(self): - await self.generate_steps("blockchain.sync.claim.stakes", 1, "claims", 0.5) - - async def blockchain_sync_claim_channels(self): - await self.generate_steps("blockchain.sync.claim.channels", 0, "supports", 0.5) + async def blockchain_sync_supports(self): + await self.generate_steps("blockchain.sync.supports", 5, "supports") async def main(): diff --git a/tests/integration/blockchain/test_blockchain.py b/tests/integration/blockchain/test_blockchain.py index 09287951a..0bf92a98a 100644 --- a/tests/integration/blockchain/test_blockchain.py +++ b/tests/integration/blockchain/test_blockchain.py @@ -469,7 +469,7 @@ class TestMultiBlockFileSyncing(BasicBlockchainTestCase): self.sync.on_progress.listen(events.append) # initial_sync = True - await self.sync.advance(True) + await self.sync.advance() await asyncio.sleep(1) 
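Editor's note on the assertions below: they compare [step, total] pairs per event name. The real helper lives in the test base class and may differ; a minimal version consistent with the payload shape emitted by ProgressPublisher would be:

def extract_events(name, events):
    return [
        [e['data']['step'], e['data']['total']]
        for e in events if e['event'] == name
    ]

# e.g. for the claims sync:
events = [
    {'event': 'blockchain.sync.claims', 'data': {'pid': 0, 'step': 0, 'total': 3610, 'unit': 'claims'}},
    {'event': 'blockchain.sync.claims', 'data': {'pid': 0, 'step': 3610, 'total': 3610, 'unit': 'claims'}},
]
assert extract_events('blockchain.sync.claims', events) == [[0, 3610], [3610, 3610]]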
# give it time to collect events self.assertEqual( events[0], { @@ -477,13 +477,6 @@ class TestMultiBlockFileSyncing(BasicBlockchainTestCase): "data": { "starting_height": 0, "ending_height": 352, - "sync_steps": [ - "blockchain.sync.block.read", - "blockchain.sync.block.save", - "db.sync.input", - "blockchain.sync.claim.meta", - "blockchain.sync.claim.signatures", - "blockchain.sync.support.signatures"], "files": 3, "blocks": 353, "txs": 544, @@ -508,33 +501,28 @@ class TestMultiBlockFileSyncing(BasicBlockchainTestCase): self.assertEventsAlmostEqual( self.extract_block_events('blockchain.sync.block.save', events), [ [0, 0, 280], - [0, 5, 280], - [0, 13, 280], - [0, 76, 280], - [0, 79, 280], - [0, 128, 280], - [0, 277, 280], + [0, 19, 280], + [0, 47, 280], + [0, 267, 280], + [0, 278, 280], [0, 280, 280], [1, 0, 178], - [1, 1, 178], - [1, 4, 178], - [1, 42, 178], - [1, 44, 178], - [1, 77, 178], - [1, 176, 178], + [1, 6, 178], + [1, 19, 178], + [1, 167, 178], + [1, 175, 178], [1, 178, 178], [2, 0, 86], - [2, 5, 86], - [2, 9, 86], - [2, 31, 86], - [2, 44, 86], + [2, 11, 86], + [2, 24, 86], [2, 83, 86], + [2, 85, 86], [2, 86, 86], ] ) - # 3 - db.sync.input + # 3 - blockchain.sync.spends self.assertEventsAlmostEqual( - self.extract_events('db.sync.input', events), [ + self.extract_events('blockchain.sync.spends', events), [ [0, 9], [1, 9], [2, 9], @@ -547,31 +535,26 @@ class TestMultiBlockFileSyncing(BasicBlockchainTestCase): [9, 9], ] ) - # 4 - blockchain.sync.claim.meta + # 4 - blockchain.sync.claims self.assertEqual( [[0, 3610], [3610, 3610]], - self.extract_events('blockchain.sync.claim.meta', events) + self.extract_events('blockchain.sync.claims', events) ) - # 5 - blockchain.sync.claim.signatures + # 4 - blockchain.sync.supports self.assertEqual( - [[0, 0]], self.extract_events('blockchain.sync.claim.signatures', events) - ) - # 6 - blockchain.sync.support.signatures - self.assertEqual( - [[0, 0]], self.extract_events('blockchain.sync.support.signatures', events) - ) - # Complete - self.assertEqual( - [[6, 6]], self.extract_events('blockchain.sync.complete', events) + [[0, 2], [2, 2]], + self.extract_events('blockchain.sync.supports', events) ) # initial_sync = False & no new blocks events.clear() await self.sync.advance() # should be no-op await asyncio.sleep(1) # give it time to collect events - self.assertListEqual([], events) + self.assertEqual([[0, 0]], self.extract_events('blockchain.sync.claims', events)) + self.assertEqual([[0, 0]], self.extract_events('blockchain.sync.supports', events)) # initial_sync = False + events.clear() txid = await self.chain.claim_name('foo', 'beef', '0.01') await self.chain.generate(1) tx = Transaction(unhexlify(await self.chain.get_raw_transaction(txid))) @@ -586,21 +569,6 @@ class TestMultiBlockFileSyncing(BasicBlockchainTestCase): "data": { "starting_height": 353, "ending_height": 354, - "sync_steps": [ - "blockchain.sync.block.read", - "blockchain.sync.block.save", - "db.sync.input", - "db.sync.claim.delete", - "db.sync.claim.insert", - "db.sync.claim.update", - "db.sync.support.delete", - "db.sync.support.insert", - "blockchain.sync.claim.trie", - "blockchain.sync.claim.meta", - "blockchain.sync.claim.signatures", - "blockchain.sync.support.signatures", - "blockchain.sync.claim.stakes", - "blockchain.sync.claim.channels"], "files": 1, "blocks": 2, "txs": 4, @@ -625,38 +593,22 @@ class TestMultiBlockFileSyncing(BasicBlockchainTestCase): [2, 4, 4], ] ) - # 3 - db.sync.input + # 3 - blockchain.sync.spends self.assertEqual( - 
self.extract_events('db.sync.input', events), [ - [0, 2], # started - [1, 2], # updating addresses on inputs - [2, 2], # connect spent outputs to their inputs + self.extract_events('blockchain.sync.spends', events), [ + [0, 2], + [1, 2], + [2, 2], ] ) - # 4 - db.sync.claim.delete - self.assertEqual([[0, 1], [1, 1]], self.extract_events('db.sync.claim.delete', events)) - # 5 - db.sync.claim.insert - self.assertEqual([[0, 1], [1, 1]], self.extract_events('db.sync.claim.insert', events)) - # 6 - db.sync.claim.update - self.assertEqual([[0, 0]], self.extract_events('db.sync.claim.update', events)) - # 7 - db.sync.support.delete - self.assertEqual([[0, 1], [1, 1]], self.extract_events('db.sync.support.delete', events)) - # 8 - db.sync.support.insert - self.assertEqual([[0, 1], [1, 1]], self.extract_events('db.sync.support.insert', events)) - # 9 - blockchain.sync.claim.trie - self.assertEqual([[0, 1], [1, 1]], self.extract_events('blockchain.sync.claim.trie', events)) - # 10 - blockchain.sync.claim.meta - self.assertEqual([[0, 1], [1, 1]], self.extract_events('blockchain.sync.claim.meta', events)) - # 11 - blockchain.sync.claim.signatures - self.assertEqual([[0, 0]], self.extract_events("blockchain.sync.claim.signatures", events)) - # 12 - blockchain.sync.support.signatures - self.assertEqual([[0, 0]], self.extract_events("blockchain.sync.support.signatures", events)) - # 13 - blockchain.sync.claim.stakes - self.assertEqual([[0, 1], [1, 1]], self.extract_events("blockchain.sync.claim.stakes", events)) - # 14 - blockchain.sync.claim.channels - self.assertEqual([[0, 0]], self.extract_events("blockchain.sync.claim.channels", events)) - # Complete - self.assertEqual([[14, 14]], self.extract_events('blockchain.sync.complete', events)) + # 4 - blockchain.sync.claims + self.assertEqual( + self.extract_events('blockchain.sync.claims', events), [ + [0, 3], [1, 3], [2, 3], [3, 3] + ] + ) + # 5 - blockchain.sync.supports + self.assertEqual([[0, 1], [1, 1]], self.extract_events('blockchain.sync.supports', events)) class TestGeneralBlockchainSync(SyncingBlockchainTestCase):
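Editor's note on consuming the consolidated sync events: with the sync_steps list gone from the start event, a listener only needs to react to the event names it cares about. A small sketch of such a listener (the rendering and the choice of tracked events are illustrative; the payload keys match what ProgressPublisher emits):

TRACKED = (
    "blockchain.sync.block.read",
    "blockchain.sync.block.save",
    "blockchain.sync.spends",
    "blockchain.sync.claims",
    "blockchain.sync.supports",
)

def print_progress(message: dict):
    if message.get("event") not in TRACKED:
        return
    data = message["data"]
    done, total, unit = data["step"], data["total"], data.get("unit", "")
    pct = 100.0 * done / total if total else 100.0
    print(f"{message['event']:<32} {done}/{total} {unit} ({pct:.0f}%)")

# wiring it up mirrors the tests above:
#     sync.on_progress.listen(print_progress)
print_progress({"event": "blockchain.sync.claims",
                "data": {"pid": 0, "step": 3610, "total": 3610, "unit": "claims"}})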