update staked_amount to include supports for claims in the channel's stake. fixes #3072

This commit is contained in:
Alex Grintsvayg 2020-12-14 15:08:24 -05:00
parent 784343878b
commit 4e9977ac23
No known key found for this signature in database
GPG key ID: AEB3F089F86A22B5
5 changed files with 83 additions and 33 deletions

View file

@ -38,26 +38,46 @@ def channel_content_count_calc(signable):
support = TXO.alias('support') support = TXO.alias('support')
def staked_support_aggregation(aggregate): def staked_support_subquery(claim_hash_column, aggregate):
"""Return a query that selects unspent supports"""
content = Claim.alias("content")
return ( return (
select(aggregate).where( select(
(support.c.txo_type == TXO_TYPES['support']) & aggregate
).select_from(
support
.join(content, support.c.claim_hash == content.c.claim_hash)
).where(
((content.c.claim_hash == claim_hash_column) | (content.c.channel_hash == claim_hash_column)) &
(support.c.txo_type == TXO_TYPES["support"]) &
(support.c.spent_height == 0) (support.c.spent_height == 0)
).scalar_subquery() )
.scalar_subquery()
) )
def staked_support_amount_calc(other): def staked_support_amount_calc(claim_hash):
return ( """Return a query that sums unspent supports for a claim"""
staked_support_aggregation(func.coalesce(func.sum(support.c.amount), 0)) return staked_support_subquery(claim_hash, func.coalesce(func.sum(support.c.amount), 0))
.where(support.c.claim_hash == other.c.claim_hash)
)
def staked_support_count_calc(other): def staked_support_count_calc(claim_hash):
"""Return a query that counts unspent supports for a claim"""
return staked_support_subquery(claim_hash, func.coalesce(func.count('*'), 0))
def claims_in_channel_amount_calc(claim_hash):
"""Return a query that sums the amount of all the claims in a channel"""
content = Claim.alias("content")
return ( return (
staked_support_aggregation(func.coalesce(func.count('*'), 0)) select(
.where(support.c.claim_hash == other.c.claim_hash) func.coalesce(func.sum(content.c.amount), 0)
).select_from(
content
).where(
content.c.channel_hash == claim_hash
)
.scalar_subquery()
) )
@ -87,8 +107,9 @@ def select_claims_for_saving(
channel_txo = TXO.alias('channel_txo') channel_txo = TXO.alias('channel_txo')
return select( return select(
*minimum_txo_columns, TXO.c.claim_hash, *minimum_txo_columns, TXO.c.claim_hash,
staked_support_amount_calc(TXO).label('staked_support_amount'), claims_in_channel_amount_calc(TXO.c.claim_hash).label('claims_in_channel_amount'),
staked_support_count_calc(TXO).label('staked_support_count'), staked_support_amount_calc(TXO.c.claim_hash).label('staked_support_amount'),
staked_support_count_calc(TXO.c.claim_hash).label('staked_support_count'),
reposted_claim_count_calc(TXO).label('reposted_count'), reposted_claim_count_calc(TXO).label('reposted_count'),
TXO.c.signature, TXO.c.signature_digest, TXO.c.signature, TXO.c.signature_digest,
case([( case([(
@ -110,6 +131,7 @@ def select_claims_for_saving(
def row_to_claim_for_saving(row) -> Tuple[Output, dict]: def row_to_claim_for_saving(row) -> Tuple[Output, dict]:
return row_to_txo(row), { return row_to_txo(row), {
'claims_in_channel_amount': int(row.claims_in_channel_amount),
'staked_support_amount': int(row.staked_support_amount), 'staked_support_amount': int(row.staked_support_amount),
'staked_support_count': int(row.staked_support_count), 'staked_support_count': int(row.staked_support_count),
'reposted_count': int(row.reposted_count), 'reposted_count': int(row.reposted_count),
@ -247,8 +269,13 @@ def update_stakes(blocks: Tuple[int, int], claims: int, p: ProgressContext):
Claim.update() Claim.update()
.where(where_claims_with_changed_supports(blocks)) .where(where_claims_with_changed_supports(blocks))
.values( .values(
staked_support_amount=staked_support_amount_calc(Claim), staked_amount=(
staked_support_count=staked_support_count_calc(Claim), Claim.c.amount +
claims_in_channel_amount_calc(Claim.c.claim_hash) +
staked_support_amount_calc(Claim.c.claim_hash)
),
staked_support_amount=staked_support_amount_calc(Claim.c.claim_hash),
staked_support_count=staked_support_count_calc(Claim.c.claim_hash),
) )
) )
result = p.ctx.execute(sql) result = p.ctx.execute(sql)

View file

@ -120,7 +120,7 @@ BASE_SELECT_CLAIM_COLUMNS = BASE_SELECT_TXO_COLUMNS + [
Claim.c.short_url, Claim.c.short_url,
Claim.c.signed_claim_count, Claim.c.signed_claim_count,
Claim.c.signed_support_count, Claim.c.signed_support_count,
(Claim.c.amount + Claim.c.staked_support_amount).label('staked_amount'), Claim.c.staked_amount,
Claim.c.staked_support_amount, Claim.c.staked_support_amount,
Claim.c.staked_support_count, Claim.c.staked_support_count,
Claim.c.is_signature_valid, Claim.c.is_signature_valid,

View file

@ -143,28 +143,35 @@ def distribute_unspent_txos(
return total, buckets return total, buckets
def where_changed_support_txos(blocks: Optional[Tuple[int, int]]): def claims_with_changed_supports(blocks: Optional[Tuple[int, int]]) -> Select:
has_changed_supports = (
select(Claim.c.claim_hash.label("claim_hash"), Claim.c.channel_hash.label("channel_hash"))
.join(Claim, Claim.c.claim_hash == TXO.c.claim_hash)
.where(
(TXO.c.txo_type == TXO_TYPES['support']) &
(between(TXO.c.height, blocks[0], blocks[-1]) | between(TXO.c.spent_height, blocks[0], blocks[-1]))
)
.cte("has_changed_supports")
)
return ( return (
(TXO.c.txo_type == TXO_TYPES['support']) & ( select(has_changed_supports.c.claim_hash.label("claim_hash"))
between(TXO.c.height, blocks[0], blocks[-1]) | .union_all( # UNION ALL is faster than UNION because it does not remove duplicates
between(TXO.c.spent_height, blocks[0], blocks[-1]) select(has_changed_supports.c.channel_hash)
.where(has_changed_supports.c.channel_hash.isnot(None))
) )
) )
def where_claims_with_changed_supports(blocks: Optional[Tuple[int, int]]): def where_claims_with_changed_supports(blocks: Optional[Tuple[int, int]]) -> Select:
return Claim.c.claim_hash.in_( return Claim.c.claim_hash.in_(
select(TXO.c.claim_hash).where( claims_with_changed_supports(blocks)
where_changed_support_txos(blocks)
)
) )
def count_claims_with_changed_supports(blocks: Optional[Tuple[int, int]]) -> int: def count_claims_with_changed_supports(blocks: Optional[Tuple[int, int]]) -> int:
sql = ( sub_query = claims_with_changed_supports(blocks).subquery()
select(func.count(distinct(TXO.c.claim_hash)).label('total')) sql = select(func.count(distinct(sub_query.c.claim_hash)).label('total')).select_from(sub_query)
.where(where_changed_support_txos(blocks))
)
return context().fetchone(sql)['total'] return context().fetchone(sql)['total']

View file

@ -532,8 +532,8 @@ class BulkLoader:
return row return row
def claim_to_rows( def claim_to_rows(
self, txo: Output, staked_support_amount: int, staked_support_count: int, reposted_count: int, self, txo: Output, claims_in_channel_amount: int, staked_support_amount: int, staked_support_count: int,
signature: bytes = None, signature_digest: bytes = None, channel_public_key: bytes = None, reposted_count: int, signature: bytes = None, signature_digest: bytes = None, channel_public_key: bytes = None,
) -> Tuple[dict, List]: ) -> Tuple[dict, List]:
tx = txo.tx_ref tx = txo.tx_ref
@ -545,7 +545,7 @@ class BulkLoader:
'height': tx.height, 'height': tx.height,
'timestamp': tx.timestamp, 'timestamp': tx.timestamp,
# support # support
'staked_amount': txo.amount + staked_support_amount, 'staked_amount': txo.amount + claims_in_channel_amount + staked_support_amount,
'staked_support_amount': staked_support_amount, 'staked_support_amount': staked_support_amount,
'staked_support_count': staked_support_count, 'staked_support_count': staked_support_count,
# basic metadata # basic metadata

View file

@ -1032,7 +1032,7 @@ class TestGeneralBlockchainSync(SyncingBlockchainTestCase):
self.assertEqual(total, 1600000000) self.assertEqual(total, 1600000000)
# create a claim in channel A and have channel B support that claim # create a claim in channel A and have channel B support that claim
claim_a = await self.get_claim(await self.create_claim(name="bob", amount='2.0', sign=channel_a)) claim_a = await self.get_claim(await self.create_claim(name="alfred", amount='2.0', sign=channel_a))
await self.support_claim(claim_a, '1.0', sign=channel_b) await self.support_claim(claim_a, '1.0', sign=channel_b)
await self.generate(1) await self.generate(1)
@ -1097,6 +1097,22 @@ class TestGeneralBlockchainSync(SyncingBlockchainTestCase):
]) ])
self.assertEqual(total, 1100000000) self.assertEqual(total, 1100000000)
# add a few supports and make sure `staked_amount` is right
claim_b = await self.get_claim(await self.create_claim(name="bob", amount='0.7', sign=channel_a))
await self.generate(1)
await self.support_claim(claim_b, '1.3', sign=channel_b)
await self.support_claim(claim_b, '1.3', sign=channel_c)
await self.support_claim(channel_b, '1.7', sign=channel_c) # this should not be counted for channel a
await self.generate(1)
ch_c, ch_b, ch_a = await self.db.search_claims(order_by=['name'], claim_type="channel", limit=3)
_, total = await self.db.sum_supports(channel_a.claim_hash, include_channel_content=True, exclude_own_supports=False)
self.assertEqual('@A', ch_a.claim_name)
self.assertEqual(1000000, ch_a.amount)
self.assertEqual(2460000000, total) # sum just the supports, including unlocked (abandoned) tips
self.assertEqual(2031000000, ch_a.meta['staked_amount']) # sum claims and supports
async def test_meta_fields_are_translated_to_protobuf(self): async def test_meta_fields_are_translated_to_protobuf(self):
chan_ab = await self.get_claim( chan_ab = await self.get_claim(
await self.create_claim(claim_id_startswith='ab', is_channel=True)) await self.create_claim(claim_id_startswith='ab', is_channel=True))