diff --git a/lbry/blockchain/sync/claims.py b/lbry/blockchain/sync/claims.py index 7b07951ea..e85d1ce4b 100644 --- a/lbry/blockchain/sync/claims.py +++ b/lbry/blockchain/sync/claims.py @@ -38,26 +38,46 @@ def channel_content_count_calc(signable): support = TXO.alias('support') -def staked_support_aggregation(aggregate): +def staked_support_subquery(claim_hash_column, aggregate): + """Return a query that selects unspent supports""" + content = Claim.alias("content") return ( - select(aggregate).where( - (support.c.txo_type == TXO_TYPES['support']) & + select( + aggregate + ).select_from( + support + .join(content, support.c.claim_hash == content.c.claim_hash) + ).where( + ((content.c.claim_hash == claim_hash_column) | (content.c.channel_hash == claim_hash_column)) & + (support.c.txo_type == TXO_TYPES["support"]) & (support.c.spent_height == 0) - ).scalar_subquery() + ) + .scalar_subquery() ) -def staked_support_amount_calc(other): - return ( - staked_support_aggregation(func.coalesce(func.sum(support.c.amount), 0)) - .where(support.c.claim_hash == other.c.claim_hash) - ) +def staked_support_amount_calc(claim_hash): + """Return a query that sums unspent supports for a claim""" + return staked_support_subquery(claim_hash, func.coalesce(func.sum(support.c.amount), 0)) -def staked_support_count_calc(other): +def staked_support_count_calc(claim_hash): + """Return a query that counts unspent supports for a claim""" + return staked_support_subquery(claim_hash, func.coalesce(func.count('*'), 0)) + + +def claims_in_channel_amount_calc(claim_hash): + """Return a query that sums the amount of all the claims in a channel""" + content = Claim.alias("content") return ( - staked_support_aggregation(func.coalesce(func.count('*'), 0)) - .where(support.c.claim_hash == other.c.claim_hash) + select( + func.coalesce(func.sum(content.c.amount), 0) + ).select_from( + content + ).where( + content.c.channel_hash == claim_hash + ) + .scalar_subquery() ) @@ -87,8 +107,9 @@ def select_claims_for_saving( channel_txo = TXO.alias('channel_txo') return select( *minimum_txo_columns, TXO.c.claim_hash, - staked_support_amount_calc(TXO).label('staked_support_amount'), - staked_support_count_calc(TXO).label('staked_support_count'), + claims_in_channel_amount_calc(TXO.c.claim_hash).label('claims_in_channel_amount'), + staked_support_amount_calc(TXO.c.claim_hash).label('staked_support_amount'), + staked_support_count_calc(TXO.c.claim_hash).label('staked_support_count'), reposted_claim_count_calc(TXO).label('reposted_count'), TXO.c.signature, TXO.c.signature_digest, case([( @@ -110,6 +131,7 @@ def select_claims_for_saving( def row_to_claim_for_saving(row) -> Tuple[Output, dict]: return row_to_txo(row), { + 'claims_in_channel_amount': int(row.claims_in_channel_amount), 'staked_support_amount': int(row.staked_support_amount), 'staked_support_count': int(row.staked_support_count), 'reposted_count': int(row.reposted_count), @@ -247,8 +269,13 @@ def update_stakes(blocks: Tuple[int, int], claims: int, p: ProgressContext): Claim.update() .where(where_claims_with_changed_supports(blocks)) .values( - staked_support_amount=staked_support_amount_calc(Claim), - staked_support_count=staked_support_count_calc(Claim), + staked_amount=( + Claim.c.amount + + claims_in_channel_amount_calc(Claim.c.claim_hash) + + staked_support_amount_calc(Claim.c.claim_hash) + ), + staked_support_amount=staked_support_amount_calc(Claim.c.claim_hash), + staked_support_count=staked_support_count_calc(Claim.c.claim_hash), ) ) result = p.ctx.execute(sql) diff --git a/lbry/db/queries/search.py b/lbry/db/queries/search.py index 19dac918e..fe6247dbf 100644 --- a/lbry/db/queries/search.py +++ b/lbry/db/queries/search.py @@ -120,7 +120,7 @@ BASE_SELECT_CLAIM_COLUMNS = BASE_SELECT_TXO_COLUMNS + [ Claim.c.short_url, Claim.c.signed_claim_count, Claim.c.signed_support_count, - (Claim.c.amount + Claim.c.staked_support_amount).label('staked_amount'), + Claim.c.staked_amount, Claim.c.staked_support_amount, Claim.c.staked_support_count, Claim.c.is_signature_valid, diff --git a/lbry/db/queries/txio.py b/lbry/db/queries/txio.py index 99db5c38b..2aa102c35 100644 --- a/lbry/db/queries/txio.py +++ b/lbry/db/queries/txio.py @@ -143,28 +143,35 @@ def distribute_unspent_txos( return total, buckets -def where_changed_support_txos(blocks: Optional[Tuple[int, int]]): +def claims_with_changed_supports(blocks: Optional[Tuple[int, int]]) -> Select: + has_changed_supports = ( + select(Claim.c.claim_hash.label("claim_hash"), Claim.c.channel_hash.label("channel_hash")) + .join(Claim, Claim.c.claim_hash == TXO.c.claim_hash) + .where( + (TXO.c.txo_type == TXO_TYPES['support']) & + (between(TXO.c.height, blocks[0], blocks[-1]) | between(TXO.c.spent_height, blocks[0], blocks[-1])) + ) + .cte("has_changed_supports") + ) + return ( - (TXO.c.txo_type == TXO_TYPES['support']) & ( - between(TXO.c.height, blocks[0], blocks[-1]) | - between(TXO.c.spent_height, blocks[0], blocks[-1]) + select(has_changed_supports.c.claim_hash.label("claim_hash")) + .union_all( # UNION ALL is faster than UNION because it does not remove duplicates + select(has_changed_supports.c.channel_hash) + .where(has_changed_supports.c.channel_hash.isnot(None)) ) ) -def where_claims_with_changed_supports(blocks: Optional[Tuple[int, int]]): +def where_claims_with_changed_supports(blocks: Optional[Tuple[int, int]]) -> Select: return Claim.c.claim_hash.in_( - select(TXO.c.claim_hash).where( - where_changed_support_txos(blocks) - ) + claims_with_changed_supports(blocks) ) def count_claims_with_changed_supports(blocks: Optional[Tuple[int, int]]) -> int: - sql = ( - select(func.count(distinct(TXO.c.claim_hash)).label('total')) - .where(where_changed_support_txos(blocks)) - ) + sub_query = claims_with_changed_supports(blocks).subquery() + sql = select(func.count(distinct(sub_query.c.claim_hash)).label('total')).select_from(sub_query) return context().fetchone(sql)['total'] diff --git a/lbry/db/query_context.py b/lbry/db/query_context.py index 68e192500..b4098290d 100644 --- a/lbry/db/query_context.py +++ b/lbry/db/query_context.py @@ -532,8 +532,8 @@ class BulkLoader: return row def claim_to_rows( - self, txo: Output, staked_support_amount: int, staked_support_count: int, reposted_count: int, - signature: bytes = None, signature_digest: bytes = None, channel_public_key: bytes = None, + self, txo: Output, claims_in_channel_amount: int, staked_support_amount: int, staked_support_count: int, + reposted_count: int, signature: bytes = None, signature_digest: bytes = None, channel_public_key: bytes = None, ) -> Tuple[dict, List]: tx = txo.tx_ref @@ -545,7 +545,7 @@ class BulkLoader: 'height': tx.height, 'timestamp': tx.timestamp, # support - 'staked_amount': txo.amount + staked_support_amount, + 'staked_amount': txo.amount + claims_in_channel_amount + staked_support_amount, 'staked_support_amount': staked_support_amount, 'staked_support_count': staked_support_count, # basic metadata diff --git a/tests/integration/blockchain/test_blockchain.py b/tests/integration/blockchain/test_blockchain.py index 3add108d3..58ada45dc 100644 --- a/tests/integration/blockchain/test_blockchain.py +++ b/tests/integration/blockchain/test_blockchain.py @@ -1032,7 +1032,7 @@ class TestGeneralBlockchainSync(SyncingBlockchainTestCase): self.assertEqual(total, 1600000000) # create a claim in channel A and have channel B support that claim - claim_a = await self.get_claim(await self.create_claim(name="bob", amount='2.0', sign=channel_a)) + claim_a = await self.get_claim(await self.create_claim(name="alfred", amount='2.0', sign=channel_a)) await self.support_claim(claim_a, '1.0', sign=channel_b) await self.generate(1) @@ -1097,6 +1097,22 @@ class TestGeneralBlockchainSync(SyncingBlockchainTestCase): ]) self.assertEqual(total, 1100000000) + # add a few supports and make sure `staked_amount` is right + claim_b = await self.get_claim(await self.create_claim(name="bob", amount='0.7', sign=channel_a)) + await self.generate(1) + await self.support_claim(claim_b, '1.3', sign=channel_b) + await self.support_claim(claim_b, '1.3', sign=channel_c) + await self.support_claim(channel_b, '1.7', sign=channel_c) # this should not be counted for channel a + await self.generate(1) + + ch_c, ch_b, ch_a = await self.db.search_claims(order_by=['name'], claim_type="channel", limit=3) + _, total = await self.db.sum_supports(channel_a.claim_hash, include_channel_content=True, exclude_own_supports=False) + self.assertEqual('@A', ch_a.claim_name) + self.assertEqual(1000000, ch_a.amount) + self.assertEqual(2460000000, total) # sum just the supports, including unlocked (abandoned) tips + self.assertEqual(2031000000, ch_a.meta['staked_amount']) # sum claims and supports + + async def test_meta_fields_are_translated_to_protobuf(self): chan_ab = await self.get_claim( await self.create_claim(claim_id_startswith='ab', is_channel=True))