diff --git a/lbry/blockchain/sync/claims.py b/lbry/blockchain/sync/claims.py index 0594989fd..86ead1cce 100644 --- a/lbry/blockchain/sync/claims.py +++ b/lbry/blockchain/sync/claims.py @@ -8,7 +8,8 @@ from lbry.db.queries.txio import ( minimum_txo_columns, row_to_txo, where_unspent_txos, where_claims_with_changed_supports, count_unspent_txos, where_channels_with_changed_content, - where_abandoned_claims, count_channels_with_changed_content + where_abandoned_claims, count_channels_with_changed_content, + where_claims_with_changed_reposts, ) from lbry.db.query_context import ProgressContext, event_emitter from lbry.db.tables import TX, TXO, Claim, Support, pg_add_claim_and_tag_constraints_and_indexes @@ -56,6 +57,17 @@ def staked_support_count_calc(other): ) +def reposted_claim_count_calc(other): + repost = TXO.alias('repost') + return ( + select(func.coalesce(func.count(repost.c.reposted_claim_hash), 0)) + .where( + (repost.c.reposted_claim_hash == other.c.claim_hash) & + (repost.c.spent_height == 0) + ).scalar_subquery() + ) + + def make_label(action, blocks): if blocks[0] == blocks[-1]: return f"{action} {blocks[0]:>6}" @@ -73,6 +85,7 @@ def select_claims_for_saving( *minimum_txo_columns, TXO.c.claim_hash, staked_support_amount_calc(TXO).label('staked_support_amount'), staked_support_count_calc(TXO).label('staked_support_count'), + reposted_claim_count_calc(TXO).label('reposted_count'), TXO.c.signature, TXO.c.signature_digest, case([( TXO.c.channel_hash.isnot(None), @@ -95,6 +108,7 @@ def row_to_claim_for_saving(row) -> Tuple[Output, dict]: return row_to_txo(row), { 'staked_support_amount': int(row.staked_support_amount), 'staked_support_count': int(row.staked_support_count), + 'reposted_count': int(row.reposted_count), 'signature': row.signature, 'signature_digest': row.signature_digest, 'channel_public_key': row.channel_public_key @@ -237,6 +251,18 @@ def update_stakes(blocks: Tuple[int, int], claims: int, p: ProgressContext): p.step(result.rowcount) +@event_emitter("blockchain.sync.claims.reposts", "claims") +def update_reposts(blocks: Tuple[int, int], claims: int, p: ProgressContext): + p.start(claims) + sql = ( + Claim.update() + .where(where_claims_with_changed_reposts(blocks)) + .values(reposted_count=reposted_claim_count_calc(Claim)) + ) + result = p.ctx.execute(sql) + p.step(result.rowcount) + + @event_emitter("blockchain.sync.claims.channels", "channels") def update_channel_stats(blocks: Tuple[int, int], initial_sync: int, p: ProgressContext): update_sql = Claim.update().values( diff --git a/lbry/blockchain/sync/synchronizer.py b/lbry/blockchain/sync/synchronizer.py index f55299320..ee55d23c5 100644 --- a/lbry/blockchain/sync/synchronizer.py +++ b/lbry/blockchain/sync/synchronizer.py @@ -217,6 +217,9 @@ class BlockchainSync(Sync): async def count_claims_with_changed_supports(self, blocks) -> int: return await self.db.run(q.count_claims_with_changed_supports, blocks) + async def count_claims_with_changed_reposts(self, blocks) -> int: + return await self.db.run(q.count_claims_with_changed_reposts, blocks) + async def count_channels_with_changed_content(self, blocks) -> int: return await self.db.run(q.count_channels_with_changed_content, blocks) @@ -226,13 +229,13 @@ class BlockchainSync(Sync): ) async def sync_claims(self, blocks) -> bool: - delete_claims = takeovers = claims_with_changed_supports = 0 + delete_claims = takeovers = claims_with_changed_supports = claims_with_changed_reposts = 0 initial_sync = not await self.db.has_claims() with Progress(self.db.message_queue, CLAIMS_INIT_EVENT) as p: if initial_sync: total, batches = await self.distribute_unspent_txos(CLAIM_TYPE_CODES) elif blocks: - p.start(4) + p.start(5) # 1. content claims to be inserted or updated total = await self.count_unspent_txos( CLAIM_TYPE_CODES, blocks, missing_or_stale_in_claims_table=True @@ -247,6 +250,10 @@ class BlockchainSync(Sync): claims_with_changed_supports = await self.count_claims_with_changed_supports(blocks) total += claims_with_changed_supports p.step() + # 4. claims to be updated with new repost totals + claims_with_changed_reposts = await self.count_claims_with_changed_reposts(blocks) + total += claims_with_changed_reposts + p.step() # 5. claims to be updated due to name takeovers takeovers = await self.count_takeovers(blocks) total += takeovers @@ -270,6 +277,8 @@ class BlockchainSync(Sync): await self.db.run(claim_phase.update_takeovers, blocks, takeovers) if claims_with_changed_supports: await self.db.run(claim_phase.update_stakes, blocks, claims_with_changed_supports) + if claims_with_changed_reposts: + await self.db.run(claim_phase.update_reposts, blocks, claims_with_changed_reposts) if initial_sync: await self.db.run(claim_phase.claims_constraints_and_indexes) else: diff --git a/lbry/db/queries/txio.py b/lbry/db/queries/txio.py index a8c8f6ad8..e77f834d7 100644 --- a/lbry/db/queries/txio.py +++ b/lbry/db/queries/txio.py @@ -193,6 +193,31 @@ def count_channels_with_changed_content(blocks: Optional[Tuple[int, int]]): return context().fetchone(sql)['total'] +def where_changed_repost_txos(blocks: Optional[Tuple[int, int]]): + return ( + (TXO.c.txo_type == TXO_TYPES['repost']) & ( + between(TXO.c.height, blocks[0], blocks[-1]) | + between(TXO.c.spent_height, blocks[0], blocks[-1]) + ) + ) + + +def where_claims_with_changed_reposts(blocks: Optional[Tuple[int, int]]): + return Claim.c.claim_hash.in_( + select(TXO.c.reposted_claim_hash).where( + where_changed_repost_txos(blocks) + ) + ) + + +def count_claims_with_changed_reposts(blocks: Optional[Tuple[int, int]]): + sql = ( + select(func.count(distinct(TXO.c.reposted_claim_hash)).label('total')) + .where(where_changed_repost_txos(blocks)) + ) + return context().fetchone(sql)['total'] + + def select_transactions(cols, account_ids=None, **constraints): s: Select = select(*cols).select_from(TX) if not {'tx_hash', 'tx_hash__in'}.intersection(constraints): diff --git a/lbry/db/query_context.py b/lbry/db/query_context.py index 5bbd51c5c..cef79243c 100644 --- a/lbry/db/query_context.py +++ b/lbry/db/query_context.py @@ -476,6 +476,7 @@ class BulkLoader: 'channel_hash': None, 'signature': None, 'signature_digest': None, + 'reposted_claim_hash': None, 'public_key': None, 'public_key_hash': None } @@ -488,6 +489,8 @@ class BulkLoader: row['public_key_hash'] = self.ledger.address_to_hash160( self.ledger.public_key_to_address(claim.channel.public_key_bytes) ) + elif claim.is_repost: + row['reposted_claim_hash'] = claim.repost.reference.claim_hash else: row['txo_type'] = TXO_TYPES['stream'] elif txo.is_support: @@ -511,7 +514,7 @@ class BulkLoader: return row def claim_to_rows( - self, txo: Output, staked_support_amount: int, staked_support_count: int, + self, txo: Output, staked_support_amount: int, staked_support_count: int, reposted_count: int, signature: bytes = None, signature_digest: bytes = None, channel_public_key: bytes = None, ) -> Tuple[dict, List]: @@ -540,6 +543,7 @@ class BulkLoader: 'fee_currency': None, # reposts 'reposted_claim_hash': None, + 'reposted_count': reposted_count, # signed claims 'channel_hash': None, 'is_signature_valid': None, diff --git a/lbry/db/tables.py b/lbry/db/tables.py index 0a2d3d169..fdc2c9609 100644 --- a/lbry/db/tables.py +++ b/lbry/db/tables.py @@ -133,6 +133,9 @@ TXO = Table( Column('signature', LargeBinary, nullable=True), Column('signature_digest', LargeBinary, nullable=True), + # reposts + Column('reposted_claim_hash', LargeBinary, nullable=True), + # channels Column('public_key', LargeBinary, nullable=True), Column('public_key_hash', LargeBinary, nullable=True), @@ -158,6 +161,13 @@ pg_add_txo_constraints_and_indexes = [ f"INCLUDE (claim_hash) WHERE txo_type={TXO_TYPES['support']};", f"CREATE INDEX txo_spent_supports_by_height ON txo (spent_height DESC) " f"INCLUDE (claim_hash) WHERE txo_type={TXO_TYPES['support']};", + # for finding claims which need repost totals re-calculated in a block range + f"CREATE INDEX txo_added_reposts_by_height ON txo (height DESC) " + f"INCLUDE (reposted_claim_hash) WHERE txo_type={TXO_TYPES['repost']};", + f"CREATE INDEX txo_spent_reposts_by_height ON txo (spent_height DESC) " + f"INCLUDE (reposted_claim_hash) WHERE txo_type={TXO_TYPES['repost']};", + "CREATE INDEX txo_reposted_claim_hash ON txo (reposted_claim_hash)" + "WHERE reposted_claim_hash IS NOT NULL AND spent_height = 0;", "CREATE INDEX txo_height ON txo (height);", ] @@ -209,7 +219,6 @@ Claim = Table( Column('description', Text, nullable=True), Column('claim_type', SmallInteger), - Column('claim_reposted_count', Integer, server_default='0'), Column('staked_support_count', Integer, server_default='0'), Column('staked_support_amount', BigInteger, server_default='0'), @@ -221,8 +230,8 @@ Claim = Table( Column('duration', Integer, nullable=True), # reposts - Column('reposted_claim_hash', LargeBinary, nullable=True), - Column('reposted_count', Integer, server_default='0'), + Column('reposted_claim_hash', LargeBinary, nullable=True), # on claim doing the repost + Column('reposted_count', Integer, server_default='0'), # on claim being reposted # claims which are channels Column('signed_claim_count', Integer, server_default='0'), @@ -255,6 +264,8 @@ pg_add_claim_and_tag_constraints_and_indexes = [ # used to count()/sum() claims signed by channel "CREATE INDEX signed_content ON claim (channel_hash) " "INCLUDE (amount) WHERE is_signature_valid;", + # used to count()/sum() reposted claims + "CREATE INDEX reposted_content ON claim (reposted_claim_hash);", # basic tag indexes "ALTER TABLE tag ADD PRIMARY KEY (claim_hash, tag);", "CREATE INDEX tags ON tag (tag) INCLUDE (claim_hash);", diff --git a/lbry/testcase.py b/lbry/testcase.py index 8b17175f8..64468c557 100644 --- a/lbry/testcase.py +++ b/lbry/testcase.py @@ -792,7 +792,7 @@ class EventGenerator: yield from self.claims_main_start() yield from self.claims_insert(self.claims) if self.initial_sync: - yield from self.generate("blockchain.sync.claims.indexes", ("steps",), 0, None, (9,), (1,)) + yield from self.generate("blockchain.sync.claims.indexes", ("steps",), 0, None, (10,), (1,)) else: yield from self.claims_takeovers(self.takeovers) yield from self.claims_stakes() @@ -920,12 +920,12 @@ class EventGenerator: def spends_steps(self): yield from self.generate( "blockchain.sync.spends.main", ("steps",), 0, None, - (17 if self.initial_sync else 5,), + (20 if self.initial_sync else 5,), (1,) ) def claims_init(self): - yield from self.generate("blockchain.sync.claims.init", ("steps",), 0, None, (4,), (1,)) + yield from self.generate("blockchain.sync.claims.init", ("steps",), 0, None, (5,), (1,)) def claims_main_start(self): total = ( diff --git a/tests/integration/blockchain/test_blockchain.py b/tests/integration/blockchain/test_blockchain.py index c25e8cc61..148c62130 100644 --- a/tests/integration/blockchain/test_blockchain.py +++ b/tests/integration/blockchain/test_blockchain.py @@ -9,7 +9,7 @@ from distutils.dir_util import copy_tree, remove_tree from lbry import Config, Database, RegTestLedger, Transaction, Output, Input from lbry.crypto.base58 import Base58 -from lbry.schema.claim import Stream, Channel +from lbry.schema.claim import Claim, Stream, Channel from lbry.schema.result import Outputs from lbry.schema.support import Support from lbry.error import LbrycrdEventSubscriptionError, LbrycrdUnauthorizedError @@ -115,10 +115,14 @@ class SyncingBlockchainTestCase(BasicBlockchainTestCase): async def create_claim( self, title='', amount='0.01', name=None, author='', desc='', - claim_id_startswith='', sign=None, is_channel=False) -> str: + claim_id_startswith='', sign=None, is_channel=False, repost=None) -> str: name = name or ('@foo' if is_channel else 'foo') if not claim_id_startswith and sign is None and not is_channel: - claim = Stream().update(title=title, author=author, description=desc).claim + if repost: + claim = Claim() + claim.repost.reference.claim_id = repost + else: + claim = Stream().update(title=title, author=author, description=desc).claim return await self.chain.claim_name( name, hexlify(claim.to_bytes()).decode(), amount ) @@ -872,6 +876,22 @@ class TestGeneralBlockchainSync(SyncingBlockchainTestCase): self.assertEqual(0, r.meta['signed_claim_count']) # channel2 lost abandoned claim self.assertEqual(0, r.meta['signed_support_count']) + async def test_reposts(self): + self.stream1 = await self.get_claim(await self.create_claim()) + claim_id = self.stream1.claim_id + + # in same block + self.stream2 = await self.get_claim(await self.create_claim(repost=claim_id)) + await self.generate(1) + r, = await self.db.search_claims(claim_id=claim_id) + self.assertEqual(1, r.meta['reposted_count']) + + # in subsequent block + self.stream3 = await self.get_claim(await self.create_claim(repost=claim_id)) + await self.generate(1) + r, = await self.db.search_claims(claim_id=claim_id) + self.assertEqual(2, r.meta['reposted_count']) + async def resolve_to_claim_id(self, url): return (await self.db.resolve([url]))[url].claim_id