diff --git a/lbry/blockchain/sync/claims.py b/lbry/blockchain/sync/claims.py index 11c755a8a..de09b0526 100644 --- a/lbry/blockchain/sync/claims.py +++ b/lbry/blockchain/sync/claims.py @@ -1,5 +1,5 @@ import logging -from typing import Tuple, Union +from typing import Tuple from sqlalchemy import case, func, desc, text from sqlalchemy.future import select @@ -8,12 +8,12 @@ from lbry.db.queries.txio import ( minimum_txo_columns, row_to_txo, where_unspent_txos, where_claims_with_changed_supports, count_unspent_txos, where_channels_with_changed_content, - where_abandoned_claims + where_abandoned_claims, count_channels_with_changed_content ) from lbry.db.query_context import ProgressContext, event_emitter from lbry.db.tables import TX, TXO, Claim, Support, pg_add_claim_constraints_and_indexes from lbry.db.utils import least -from lbry.db.constants import TXO_TYPES +from lbry.db.constants import TXO_TYPES, CLAIM_TYPE_CODES from lbry.blockchain.transaction import Output from .context import get_or_initialize_lbrycrd @@ -64,58 +64,45 @@ def make_label(action, blocks): def select_claims_for_saving( - txo_types: Union[int, Tuple[int, ...]], blocks: Tuple[int, int], missing_in_claims_table=False, missing_or_stale_in_claims_table=False, ): - select_claims = select( + channel_txo = TXO.alias('channel_txo') + return select( *minimum_txo_columns, TXO.c.claim_hash, staked_support_amount_calc(TXO).label('staked_support_amount'), - staked_support_count_calc(TXO).label('staked_support_count') + staked_support_count_calc(TXO).label('staked_support_count'), + TXO.c.signature, TXO.c.signature_digest, + case([( + TXO.c.channel_hash.isnot(None), + select(channel_txo.c.public_key).select_from(channel_txo).where( + (channel_txo.c.txo_type == TXO_TYPES['channel']) & + (channel_txo.c.claim_hash == TXO.c.channel_hash) & + (channel_txo.c.height <= TXO.c.height) + ).order_by(desc(channel_txo.c.height)).limit(1).scalar_subquery() + )]).label('channel_public_key') ).where( where_unspent_txos( - txo_types, blocks, + CLAIM_TYPE_CODES, blocks, missing_in_claims_table=missing_in_claims_table, missing_or_stale_in_claims_table=missing_or_stale_in_claims_table, ) ).select_from(TXO.join(TX)) - if txo_types != TXO_TYPES['channel']: - channel_txo = TXO.alias('channel_txo') - return ( - select_claims.add_columns( - TXO.c.signature, TXO.c.signature_digest, - case([( - TXO.c.channel_hash.isnot(None), - select(channel_txo.c.public_key).select_from(channel_txo).where( - (channel_txo.c.txo_type == TXO_TYPES['channel']) & - (channel_txo.c.claim_hash == TXO.c.channel_hash) & - (channel_txo.c.height <= TXO.c.height) - ).order_by(desc(channel_txo.c.height)).limit(1).scalar_subquery() - )]).label('channel_public_key') - ) - ) - return select_claims def row_to_claim_for_saving(row) -> Tuple[Output, dict]: - txo = row_to_txo(row) - extra = { + return row_to_txo(row), { 'staked_support_amount': int(row.staked_support_amount), 'staked_support_count': int(row.staked_support_count), + 'signature': row.signature, + 'signature_digest': row.signature_digest, + 'channel_public_key': row.channel_public_key } - if hasattr(row, 'signature'): - extra.update({ - 'signature': row.signature, - 'signature_digest': row.signature_digest, - 'channel_public_key': row.channel_public_key - }) - return txo, extra @event_emitter("blockchain.sync.claims.insert", "claims") def claims_insert( - txo_types: Union[int, Tuple[int, ...]], blocks: Tuple[int, int], missing_in_claims_table: bool, p: ProgressContext @@ -124,23 +111,21 @@ def claims_insert( p.start( count_unspent_txos( - txo_types, blocks, + CLAIM_TYPE_CODES, blocks, missing_in_claims_table=missing_in_claims_table, ), progress_id=blocks[0], label=make_label("add claims at", blocks) ) - channel_url_cache = {} - with p.ctx.engine.connect().execution_options(stream_results=True) as c: loader = p.ctx.get_bulk_loader() cursor = c.execute(select_claims_for_saving( - txo_types, blocks, missing_in_claims_table=missing_in_claims_table + blocks, missing_in_claims_table=missing_in_claims_table ).order_by(TXO.c.claim_hash)) for rows in cursor.partitions(900): claim_metadata = chain.db.sync_get_claim_metadata( claim_hashes=[row['claim_hash'] for row in rows] ) - i, txos_w_extra, unknown_channel_urls, txos_wo_channel_url = 0, [], set(), [] + i = 0 for row in rows: metadata = claim_metadata[i] if i < len(claim_metadata) else None if metadata is None: @@ -157,37 +142,12 @@ def claims_insert( 'expiration_height': metadata['expiration_height'], 'takeover_height': metadata['takeover_height'], }) - txos_w_extra.append((txo, extra)) - set_or_add_to_url_lookup( - channel_url_cache, txo, extra, unknown_channel_urls, txos_wo_channel_url - ) - perform_url_lookup(chain, channel_url_cache, unknown_channel_urls, txos_wo_channel_url) - for txo, extra in txos_w_extra: loader.add_claim(txo, **extra) if len(loader.claims) >= 25_000: p.add(loader.flush(Claim)) p.add(loader.flush(Claim)) -def set_or_add_to_url_lookup(cache: dict, txo: Output, extra: dict, to_lookup: set, to_set: list): - claim = txo.can_decode_claim - if claim and claim.is_signed: - if claim.signing_channel_hash not in cache: - to_lookup.add(claim.signing_channel_hash) - to_set.append((claim.signing_channel_hash, extra)) - else: - extra['channel_url'] = cache[claim.signing_channel_hash] - - -def perform_url_lookup(chain, cache, to_lookup: set, to_set: list): - if to_lookup: - channels = chain.db.sync_get_claim_metadata(claim_hashes=list(to_lookup)) - for channel in channels: - cache[channel['claim_hash']] = channel['short_url'] - for channel_hash, extra in to_set: - extra['channel_url'] = cache.get(channel_hash) - - @event_emitter("blockchain.sync.claims.indexes", "steps") def claims_constraints_and_indexes(p: ProgressContext): p.start(2) @@ -202,27 +162,18 @@ def claims_constraints_and_indexes(p: ProgressContext): @event_emitter("blockchain.sync.claims.update", "claims") -def claims_update(txo_types: Union[int, Tuple[int, ...]], blocks: Tuple[int, int], p: ProgressContext): +def claims_update(blocks: Tuple[int, int], p: ProgressContext): p.start( - count_unspent_txos(txo_types, blocks, missing_or_stale_in_claims_table=True), + count_unspent_txos(CLAIM_TYPE_CODES, blocks, missing_or_stale_in_claims_table=True), progress_id=blocks[0], label=make_label("update claims at", blocks) ) - chain = get_or_initialize_lbrycrd(p.ctx) with p.ctx.engine.connect().execution_options(stream_results=True) as c: loader = p.ctx.get_bulk_loader() cursor = c.execute(select_claims_for_saving( - txo_types, blocks, missing_or_stale_in_claims_table=True + blocks, missing_or_stale_in_claims_table=True )) - channel_url_cache = {} for row in cursor: txo, extra = row_to_claim_for_saving(row) - claim = txo.can_decode_claim - if claim and claim.is_signed: - if claim.signing_channel_hash not in channel_url_cache: - channels = chain.db.sync_get_claim_metadata(claim_hashes=[claim.signing_channel_hash]) - if channels: - channel_url_cache[channels[0]['claim_hash']] = channels[0]['short_url'] - extra['channel_url'] = channel_url_cache.get(claim.signing_channel_hash) loader.update_claim(txo, **extra) if len(loader.update_claims) >= 500: p.add(loader.flush(Claim)) @@ -276,15 +227,18 @@ def update_stakes(blocks: Tuple[int, int], claims: int, p: ProgressContext): @event_emitter("blockchain.sync.claims.channels", "channels") -def update_channel_stats(blocks: Tuple[int, int], initial_sync: int, channels: int, p: ProgressContext): - p.start(channels, label="channel stats") +def update_channel_stats(blocks: Tuple[int, int], initial_sync: int, p: ProgressContext): update_sql = Claim.update().values( signed_claim_count=channel_content_count_calc(Claim.alias('content')), signed_support_count=channel_content_count_calc(Support), ) if initial_sync: + p.start(p.ctx.fetchtotal(Claim.c.claim_type == TXO_TYPES['channel']), label="channel stats") update_sql = update_sql.where(Claim.c.claim_type == TXO_TYPES['channel']) - else: + elif blocks: + p.start(count_channels_with_changed_content(blocks), label="channel stats") update_sql = update_sql.where(where_channels_with_changed_content(blocks)) + else: + return result = p.ctx.execute(update_sql) p.step(result.rowcount) diff --git a/lbry/blockchain/sync/synchronizer.py b/lbry/blockchain/sync/synchronizer.py index 5b0c5c45b..e348006d4 100644 --- a/lbry/blockchain/sync/synchronizer.py +++ b/lbry/blockchain/sync/synchronizer.py @@ -5,7 +5,7 @@ from typing import Optional, Tuple, Set, List, Coroutine from lbry.db import Database from lbry.db import queries as q -from lbry.db.constants import TXO_TYPES, CONTENT_TYPE_CODES +from lbry.db.constants import TXO_TYPES, CLAIM_TYPE_CODES from lbry.db.query_context import Event, Progress from lbry.event import BroadcastSubscription from lbry.service.base import Sync, BlockEvent @@ -187,73 +187,44 @@ class BlockchainSync(Sync): start_height=blocks[0], end_height=blocks[-1] ) - async def sync_claims(self, blocks): - total = delete_claims = takeovers = claims_with_changed_supports = 0 + async def sync_claims(self, blocks) -> bool: + delete_claims = takeovers = claims_with_changed_supports = 0 initial_sync = not await self.db.has_claims() with Progress(self.db.message_queue, CLAIMS_INIT_EVENT) as p: if initial_sync: - p.start(2) - # 1. distribute channel insertion load - channels, channel_batches = await self.distribute_unspent_txos(TXO_TYPES['channel']) - channels_with_changed_content = channels - total += channels + channels_with_changed_content - p.step() - # 2. distribute content insertion load - content, content_batches = await self.distribute_unspent_txos(CONTENT_TYPE_CODES) - total += content - p.step() + total, batches = await self.distribute_unspent_txos(CLAIM_TYPE_CODES) elif blocks: - p.start(6) - # 1. channel claims to be inserted or updated - channels = await self.count_unspent_txos( - TXO_TYPES['channel'], blocks, missing_or_stale_in_claims_table=True + p.start(4) + # 1. content claims to be inserted or updated + total = await self.count_unspent_txos( + CLAIM_TYPE_CODES, blocks, missing_or_stale_in_claims_table=True ) - channel_batches = [blocks] if channels else [] - total += channels + batches = [blocks] if total else [] p.step() - # 2. content claims to be inserted or updated - content = await self.count_unspent_txos( - CONTENT_TYPE_CODES, blocks, missing_or_stale_in_claims_table=True - ) - content_batches = [blocks] if content else [] - total += content - p.step() - # 3. claims to be deleted + # 2. claims to be deleted delete_claims = await self.count_abandoned_claims() total += delete_claims p.step() - # 4. claims to be updated with new support totals + # 3. claims to be updated with new support totals claims_with_changed_supports = await self.count_claims_with_changed_supports(blocks) total += claims_with_changed_supports p.step() - # 5. channels to be updated with changed content totals - channels_with_changed_content = await self.count_channels_with_changed_content(blocks) - total += channels_with_changed_content - p.step() - # 6. claims to be updated due to name takeovers + # 5. claims to be updated due to name takeovers takeovers = await self.count_takeovers(blocks) total += takeovers p.step() else: - return + return initial_sync with Progress(self.db.message_queue, CLAIMS_MAIN_EVENT) as p: p.start(total) - insertions = [ - (TXO_TYPES['channel'], channel_batches), - (CONTENT_TYPE_CODES, content_batches), - ] - for txo_type, batches in insertions: - if batches: + if batches: + await self.run_tasks([ + self.db.run(claim_phase.claims_insert, batch, not initial_sync) for batch in batches + ]) + if not initial_sync: await self.run_tasks([ - self.db.run( - claim_phase.claims_insert, txo_type, batch, not initial_sync - ) for batch in batches + self.db.run(claim_phase.claims_update, batch) for batch in batches ]) - if not initial_sync: - await self.run_tasks([ - self.db.run(claim_phase.claims_update, txo_type, batch) - for batch in batches - ]) if delete_claims: await self.db.run(claim_phase.claims_delete, delete_claims) if takeovers: @@ -262,8 +233,7 @@ class BlockchainSync(Sync): await self.db.run(claim_phase.update_stakes, blocks, claims_with_changed_supports) if initial_sync: await self.db.run(claim_phase.claims_constraints_and_indexes) - if channels_with_changed_content: - return initial_sync, channels_with_changed_content + return initial_sync async def sync_supports(self, blocks): delete_supports = 0 @@ -298,11 +268,8 @@ class BlockchainSync(Sync): if initial_sync: await self.db.run(support_phase.supports_constraints_and_indexes) - async def sync_channel_stats(self, blocks, initial_sync, channels_with_changed_content): - if channels_with_changed_content: - await self.db.run( - claim_phase.update_channel_stats, blocks, initial_sync, channels_with_changed_content - ) + async def sync_channel_stats(self, blocks, initial_sync): + await self.db.run(claim_phase.update_channel_stats, blocks, initial_sync) async def sync_trends(self): pass @@ -312,10 +279,9 @@ class BlockchainSync(Sync): sync_filters_task = asyncio.create_task(self.sync_filters()) sync_trends_task = asyncio.create_task(self.sync_trends()) await self.sync_spends(blocks_added) - channel_stats = await self.sync_claims(blocks_added) + initial_claim_sync = await self.sync_claims(blocks_added) await self.sync_supports(blocks_added) - if channel_stats: - await self.sync_channel_stats(blocks_added, *channel_stats) + await self.sync_channel_stats(blocks_added, initial_claim_sync) await sync_trends_task await sync_filters_task if blocks_added: diff --git a/lbry/db/queries/search.py b/lbry/db/queries/search.py index 6eb17cb9b..ba2a434ea 100644 --- a/lbry/db/queries/search.py +++ b/lbry/db/queries/search.py @@ -4,7 +4,7 @@ from decimal import Decimal from binascii import unhexlify from typing import Tuple, List, Optional -from sqlalchemy import func +from sqlalchemy import func, case from sqlalchemy.future import select, Select from lbry.schema.tags import clean_tags @@ -56,6 +56,7 @@ def search_support_count(**constraints) -> int: return count[0]['total'] or 0 +channel_claim = Claim.alias('channel') BASE_SELECT_CLAIM_COLUMNS = BASE_SELECT_TXO_COLUMNS + [ Claim.c.activation_height, Claim.c.takeover_height, @@ -64,15 +65,19 @@ BASE_SELECT_CLAIM_COLUMNS = BASE_SELECT_TXO_COLUMNS + [ Claim.c.channel_hash, Claim.c.reposted_claim_hash, Claim.c.short_url, - Claim.c.canonical_url, Claim.c.signed_claim_count, Claim.c.signed_support_count, (Claim.c.amount + Claim.c.staked_support_amount).label('staked_amount'), Claim.c.staked_support_amount, Claim.c.staked_support_count, Claim.c.is_signature_valid, + case([( + channel_claim.c.short_url.isnot(None), + channel_claim.c.short_url + '/' + Claim.c.short_url + )]).label('canonical_url'), ] + def select_claims(cols: List = None, for_count=False, **constraints) -> Select: if cols is None: cols = BASE_SELECT_CLAIM_COLUMNS @@ -203,8 +208,14 @@ def select_claims(cols: List = None, for_count=False, **constraints) -> Select: # TODO: fix constraints["search"] = constraints.pop("text") - joins = Claim.join(TXO).join(TX) - return query([Claim], select(*cols).select_from(joins), **constraints) + return query( + [Claim], + select(*cols) + .select_from( + Claim.join(TXO).join(TX) + .join(channel_claim, Claim.c.channel_hash == channel_claim.c.claim_hash, isouter=True) + ), **constraints + ) def search_claims(**constraints) -> Tuple[List[Output], Optional[int], Optional[Censor]]: @@ -226,6 +237,8 @@ def search_claim_count(**constraints) -> int: constraints.pop('order_by', None) count = context().fetchall(select_claims([func.count().label('total')], **constraints)) return count[0]['total'] or 0 + + CLAIM_HASH_OR_REPOST_HASH_SQL = f""" CASE WHEN claim.claim_type = {TXO_TYPES['repost']} THEN claim.reposted_claim_hash diff --git a/lbry/db/query_context.py b/lbry/db/query_context.py index b658fa019..a33909aea 100644 --- a/lbry/db/query_context.py +++ b/lbry/db/query_context.py @@ -612,7 +612,7 @@ class BulkLoader: def add_claim( self, txo: Output, short_url: str, creation_height: int, activation_height: int, expiration_height: int, - takeover_height: int = None, channel_url: str = None, **extra + takeover_height: int = None, **extra ): try: claim_name = txo.claim_name.replace('\x00', '') @@ -630,18 +630,13 @@ class BulkLoader: d['expiration_height'] = expiration_height d['takeover_height'] = takeover_height d['is_controlling'] = takeover_height is not None - if d['is_signature_valid'] and channel_url is not None: - d['canonical_url'] = channel_url + '/' + short_url - else: - d['canonical_url'] = None self.claims.append(d) self.tags.extend(tags) return self - def update_claim(self, txo: Output, channel_url: str = None, **extra): + def update_claim(self, txo: Output, **extra): d, tags = self.claim_to_rows(txo, **extra) d['pk'] = txo.claim_hash - d['channel_url'] = channel_url d['set_canonical_url'] = d['is_signature_valid'] self.update_claims.append(d) self.delete_tags.append({'pk': txo.claim_hash}) @@ -656,11 +651,7 @@ class BulkLoader: (TXI.insert(), self.txis), (Claim.insert(), self.claims), (Tag.delete().where(Tag.c.claim_hash == bindparam('pk')), self.delete_tags), - (Claim.update().where(Claim.c.claim_hash == bindparam('pk')).values( - canonical_url=case([ - (bindparam('set_canonical_url'), bindparam('channel_url') + '/' + Claim.c.short_url) - ], else_=None) - ), self.update_claims), + (Claim.update().where(Claim.c.claim_hash == bindparam('pk')), self.update_claims), (Tag.insert(), self.tags), (Support.insert(), self.supports), ) diff --git a/lbry/db/tables.py b/lbry/db/tables.py index 9944ba760..0d83f65ab 100644 --- a/lbry/db/tables.py +++ b/lbry/db/tables.py @@ -146,10 +146,10 @@ Claim = Table( Column('sync_height', Integer), # claim dynamic values up-to-date as of this height (eg. staked_amount) Column('is_controlling', Boolean), - # normalized#shortest-unique-claim_id + # short_url: normalized#shortest-unique-claim_id Column('short_url', Text), - # channel's-short_url/normalized#shortest-unique-claim_id-within-channel - Column('canonical_url', Text, nullable=True), + # canonical_url: channel's-short_url/normalized#shortest-unique-claim_id-within-channel + # canonical_url is computed dynamically Column('title', Text, nullable=True), Column('author', Text, nullable=True), diff --git a/tests/integration/blockchain/test_blockchain.py b/tests/integration/blockchain/test_blockchain.py index 5f4efee48..d35410861 100644 --- a/tests/integration/blockchain/test_blockchain.py +++ b/tests/integration/blockchain/test_blockchain.py @@ -536,11 +536,6 @@ class TestMultiBlockFileSyncing(BasicBlockchainTestCase): (0, None, (9,), (1,), (2,), (3,), (4,), (5,), (6,), (7,), (8,), (9,)) ] ) - self.assertConsumingEvents( - events, "blockchain.sync.claims.init", ("steps",), [ - (0, None, (2,), (1,), (2,)) - ] - ) self.assertEqual( events.pop(0), { "event": "blockchain.sync.claims.main", @@ -648,7 +643,7 @@ class TestMultiBlockFileSyncing(BasicBlockchainTestCase): ) self.assertConsumingEvents( events, "blockchain.sync.claims.init", ("steps",), [ - (0, None, (6,), (1,), (2,), (3,), (4,), (5,), (6,)) + (0, None, (4,), (1,), (2,), (3,), (4,)) ] ) self.assertEqual(