added respoted_count calculation

This commit is contained in:
Lex Berezhny 2020-09-24 15:19:30 -04:00
parent 2853bb437d
commit bba9aec4f2
7 changed files with 108 additions and 13 deletions

View file

@ -8,7 +8,8 @@ from lbry.db.queries.txio import (
minimum_txo_columns, row_to_txo, minimum_txo_columns, row_to_txo,
where_unspent_txos, where_claims_with_changed_supports, where_unspent_txos, where_claims_with_changed_supports,
count_unspent_txos, where_channels_with_changed_content, count_unspent_txos, where_channels_with_changed_content,
where_abandoned_claims, count_channels_with_changed_content where_abandoned_claims, count_channels_with_changed_content,
where_claims_with_changed_reposts,
) )
from lbry.db.query_context import ProgressContext, event_emitter from lbry.db.query_context import ProgressContext, event_emitter
from lbry.db.tables import TX, TXO, Claim, Support, pg_add_claim_and_tag_constraints_and_indexes from lbry.db.tables import TX, TXO, Claim, Support, pg_add_claim_and_tag_constraints_and_indexes
@ -56,6 +57,17 @@ def staked_support_count_calc(other):
) )
def reposted_claim_count_calc(other):
repost = TXO.alias('repost')
return (
select(func.coalesce(func.count(repost.c.reposted_claim_hash), 0))
.where(
(repost.c.reposted_claim_hash == other.c.claim_hash) &
(repost.c.spent_height == 0)
).scalar_subquery()
)
def make_label(action, blocks): def make_label(action, blocks):
if blocks[0] == blocks[-1]: if blocks[0] == blocks[-1]:
return f"{action} {blocks[0]:>6}" return f"{action} {blocks[0]:>6}"
@ -73,6 +85,7 @@ def select_claims_for_saving(
*minimum_txo_columns, TXO.c.claim_hash, *minimum_txo_columns, TXO.c.claim_hash,
staked_support_amount_calc(TXO).label('staked_support_amount'), staked_support_amount_calc(TXO).label('staked_support_amount'),
staked_support_count_calc(TXO).label('staked_support_count'), staked_support_count_calc(TXO).label('staked_support_count'),
reposted_claim_count_calc(TXO).label('reposted_count'),
TXO.c.signature, TXO.c.signature_digest, TXO.c.signature, TXO.c.signature_digest,
case([( case([(
TXO.c.channel_hash.isnot(None), TXO.c.channel_hash.isnot(None),
@ -95,6 +108,7 @@ def row_to_claim_for_saving(row) -> Tuple[Output, dict]:
return row_to_txo(row), { return row_to_txo(row), {
'staked_support_amount': int(row.staked_support_amount), 'staked_support_amount': int(row.staked_support_amount),
'staked_support_count': int(row.staked_support_count), 'staked_support_count': int(row.staked_support_count),
'reposted_count': int(row.reposted_count),
'signature': row.signature, 'signature': row.signature,
'signature_digest': row.signature_digest, 'signature_digest': row.signature_digest,
'channel_public_key': row.channel_public_key 'channel_public_key': row.channel_public_key
@ -237,6 +251,18 @@ def update_stakes(blocks: Tuple[int, int], claims: int, p: ProgressContext):
p.step(result.rowcount) p.step(result.rowcount)
@event_emitter("blockchain.sync.claims.reposts", "claims")
def update_reposts(blocks: Tuple[int, int], claims: int, p: ProgressContext):
p.start(claims)
sql = (
Claim.update()
.where(where_claims_with_changed_reposts(blocks))
.values(reposted_count=reposted_claim_count_calc(Claim))
)
result = p.ctx.execute(sql)
p.step(result.rowcount)
@event_emitter("blockchain.sync.claims.channels", "channels") @event_emitter("blockchain.sync.claims.channels", "channels")
def update_channel_stats(blocks: Tuple[int, int], initial_sync: int, p: ProgressContext): def update_channel_stats(blocks: Tuple[int, int], initial_sync: int, p: ProgressContext):
update_sql = Claim.update().values( update_sql = Claim.update().values(

View file

@ -217,6 +217,9 @@ class BlockchainSync(Sync):
async def count_claims_with_changed_supports(self, blocks) -> int: async def count_claims_with_changed_supports(self, blocks) -> int:
return await self.db.run(q.count_claims_with_changed_supports, blocks) return await self.db.run(q.count_claims_with_changed_supports, blocks)
async def count_claims_with_changed_reposts(self, blocks) -> int:
return await self.db.run(q.count_claims_with_changed_reposts, blocks)
async def count_channels_with_changed_content(self, blocks) -> int: async def count_channels_with_changed_content(self, blocks) -> int:
return await self.db.run(q.count_channels_with_changed_content, blocks) return await self.db.run(q.count_channels_with_changed_content, blocks)
@ -226,13 +229,13 @@ class BlockchainSync(Sync):
) )
async def sync_claims(self, blocks) -> bool: async def sync_claims(self, blocks) -> bool:
delete_claims = takeovers = claims_with_changed_supports = 0 delete_claims = takeovers = claims_with_changed_supports = claims_with_changed_reposts = 0
initial_sync = not await self.db.has_claims() initial_sync = not await self.db.has_claims()
with Progress(self.db.message_queue, CLAIMS_INIT_EVENT) as p: with Progress(self.db.message_queue, CLAIMS_INIT_EVENT) as p:
if initial_sync: if initial_sync:
total, batches = await self.distribute_unspent_txos(CLAIM_TYPE_CODES) total, batches = await self.distribute_unspent_txos(CLAIM_TYPE_CODES)
elif blocks: elif blocks:
p.start(4) p.start(5)
# 1. content claims to be inserted or updated # 1. content claims to be inserted or updated
total = await self.count_unspent_txos( total = await self.count_unspent_txos(
CLAIM_TYPE_CODES, blocks, missing_or_stale_in_claims_table=True CLAIM_TYPE_CODES, blocks, missing_or_stale_in_claims_table=True
@ -247,6 +250,10 @@ class BlockchainSync(Sync):
claims_with_changed_supports = await self.count_claims_with_changed_supports(blocks) claims_with_changed_supports = await self.count_claims_with_changed_supports(blocks)
total += claims_with_changed_supports total += claims_with_changed_supports
p.step() p.step()
# 4. claims to be updated with new repost totals
claims_with_changed_reposts = await self.count_claims_with_changed_reposts(blocks)
total += claims_with_changed_reposts
p.step()
# 5. claims to be updated due to name takeovers # 5. claims to be updated due to name takeovers
takeovers = await self.count_takeovers(blocks) takeovers = await self.count_takeovers(blocks)
total += takeovers total += takeovers
@ -270,6 +277,8 @@ class BlockchainSync(Sync):
await self.db.run(claim_phase.update_takeovers, blocks, takeovers) await self.db.run(claim_phase.update_takeovers, blocks, takeovers)
if claims_with_changed_supports: if claims_with_changed_supports:
await self.db.run(claim_phase.update_stakes, blocks, claims_with_changed_supports) await self.db.run(claim_phase.update_stakes, blocks, claims_with_changed_supports)
if claims_with_changed_reposts:
await self.db.run(claim_phase.update_reposts, blocks, claims_with_changed_reposts)
if initial_sync: if initial_sync:
await self.db.run(claim_phase.claims_constraints_and_indexes) await self.db.run(claim_phase.claims_constraints_and_indexes)
else: else:

View file

@ -193,6 +193,31 @@ def count_channels_with_changed_content(blocks: Optional[Tuple[int, int]]):
return context().fetchone(sql)['total'] return context().fetchone(sql)['total']
def where_changed_repost_txos(blocks: Optional[Tuple[int, int]]):
return (
(TXO.c.txo_type == TXO_TYPES['repost']) & (
between(TXO.c.height, blocks[0], blocks[-1]) |
between(TXO.c.spent_height, blocks[0], blocks[-1])
)
)
def where_claims_with_changed_reposts(blocks: Optional[Tuple[int, int]]):
return Claim.c.claim_hash.in_(
select(TXO.c.reposted_claim_hash).where(
where_changed_repost_txos(blocks)
)
)
def count_claims_with_changed_reposts(blocks: Optional[Tuple[int, int]]):
sql = (
select(func.count(distinct(TXO.c.reposted_claim_hash)).label('total'))
.where(where_changed_repost_txos(blocks))
)
return context().fetchone(sql)['total']
def select_transactions(cols, account_ids=None, **constraints): def select_transactions(cols, account_ids=None, **constraints):
s: Select = select(*cols).select_from(TX) s: Select = select(*cols).select_from(TX)
if not {'tx_hash', 'tx_hash__in'}.intersection(constraints): if not {'tx_hash', 'tx_hash__in'}.intersection(constraints):

View file

@ -476,6 +476,7 @@ class BulkLoader:
'channel_hash': None, 'channel_hash': None,
'signature': None, 'signature': None,
'signature_digest': None, 'signature_digest': None,
'reposted_claim_hash': None,
'public_key': None, 'public_key': None,
'public_key_hash': None 'public_key_hash': None
} }
@ -488,6 +489,8 @@ class BulkLoader:
row['public_key_hash'] = self.ledger.address_to_hash160( row['public_key_hash'] = self.ledger.address_to_hash160(
self.ledger.public_key_to_address(claim.channel.public_key_bytes) self.ledger.public_key_to_address(claim.channel.public_key_bytes)
) )
elif claim.is_repost:
row['reposted_claim_hash'] = claim.repost.reference.claim_hash
else: else:
row['txo_type'] = TXO_TYPES['stream'] row['txo_type'] = TXO_TYPES['stream']
elif txo.is_support: elif txo.is_support:
@ -511,7 +514,7 @@ class BulkLoader:
return row return row
def claim_to_rows( def claim_to_rows(
self, txo: Output, staked_support_amount: int, staked_support_count: int, self, txo: Output, staked_support_amount: int, staked_support_count: int, reposted_count: int,
signature: bytes = None, signature_digest: bytes = None, channel_public_key: bytes = None, signature: bytes = None, signature_digest: bytes = None, channel_public_key: bytes = None,
) -> Tuple[dict, List]: ) -> Tuple[dict, List]:
@ -540,6 +543,7 @@ class BulkLoader:
'fee_currency': None, 'fee_currency': None,
# reposts # reposts
'reposted_claim_hash': None, 'reposted_claim_hash': None,
'reposted_count': reposted_count,
# signed claims # signed claims
'channel_hash': None, 'channel_hash': None,
'is_signature_valid': None, 'is_signature_valid': None,

View file

@ -133,6 +133,9 @@ TXO = Table(
Column('signature', LargeBinary, nullable=True), Column('signature', LargeBinary, nullable=True),
Column('signature_digest', LargeBinary, nullable=True), Column('signature_digest', LargeBinary, nullable=True),
# reposts
Column('reposted_claim_hash', LargeBinary, nullable=True),
# channels # channels
Column('public_key', LargeBinary, nullable=True), Column('public_key', LargeBinary, nullable=True),
Column('public_key_hash', LargeBinary, nullable=True), Column('public_key_hash', LargeBinary, nullable=True),
@ -158,6 +161,13 @@ pg_add_txo_constraints_and_indexes = [
f"INCLUDE (claim_hash) WHERE txo_type={TXO_TYPES['support']};", f"INCLUDE (claim_hash) WHERE txo_type={TXO_TYPES['support']};",
f"CREATE INDEX txo_spent_supports_by_height ON txo (spent_height DESC) " f"CREATE INDEX txo_spent_supports_by_height ON txo (spent_height DESC) "
f"INCLUDE (claim_hash) WHERE txo_type={TXO_TYPES['support']};", f"INCLUDE (claim_hash) WHERE txo_type={TXO_TYPES['support']};",
# for finding claims which need repost totals re-calculated in a block range
f"CREATE INDEX txo_added_reposts_by_height ON txo (height DESC) "
f"INCLUDE (reposted_claim_hash) WHERE txo_type={TXO_TYPES['repost']};",
f"CREATE INDEX txo_spent_reposts_by_height ON txo (spent_height DESC) "
f"INCLUDE (reposted_claim_hash) WHERE txo_type={TXO_TYPES['repost']};",
"CREATE INDEX txo_reposted_claim_hash ON txo (reposted_claim_hash)"
"WHERE reposted_claim_hash IS NOT NULL AND spent_height = 0;",
"CREATE INDEX txo_height ON txo (height);", "CREATE INDEX txo_height ON txo (height);",
] ]
@ -209,7 +219,6 @@ Claim = Table(
Column('description', Text, nullable=True), Column('description', Text, nullable=True),
Column('claim_type', SmallInteger), Column('claim_type', SmallInteger),
Column('claim_reposted_count', Integer, server_default='0'),
Column('staked_support_count', Integer, server_default='0'), Column('staked_support_count', Integer, server_default='0'),
Column('staked_support_amount', BigInteger, server_default='0'), Column('staked_support_amount', BigInteger, server_default='0'),
@ -221,8 +230,8 @@ Claim = Table(
Column('duration', Integer, nullable=True), Column('duration', Integer, nullable=True),
# reposts # reposts
Column('reposted_claim_hash', LargeBinary, nullable=True), Column('reposted_claim_hash', LargeBinary, nullable=True), # on claim doing the repost
Column('reposted_count', Integer, server_default='0'), Column('reposted_count', Integer, server_default='0'), # on claim being reposted
# claims which are channels # claims which are channels
Column('signed_claim_count', Integer, server_default='0'), Column('signed_claim_count', Integer, server_default='0'),
@ -255,6 +264,8 @@ pg_add_claim_and_tag_constraints_and_indexes = [
# used to count()/sum() claims signed by channel # used to count()/sum() claims signed by channel
"CREATE INDEX signed_content ON claim (channel_hash) " "CREATE INDEX signed_content ON claim (channel_hash) "
"INCLUDE (amount) WHERE is_signature_valid;", "INCLUDE (amount) WHERE is_signature_valid;",
# used to count()/sum() reposted claims
"CREATE INDEX reposted_content ON claim (reposted_claim_hash);",
# basic tag indexes # basic tag indexes
"ALTER TABLE tag ADD PRIMARY KEY (claim_hash, tag);", "ALTER TABLE tag ADD PRIMARY KEY (claim_hash, tag);",
"CREATE INDEX tags ON tag (tag) INCLUDE (claim_hash);", "CREATE INDEX tags ON tag (tag) INCLUDE (claim_hash);",

View file

@ -792,7 +792,7 @@ class EventGenerator:
yield from self.claims_main_start() yield from self.claims_main_start()
yield from self.claims_insert(self.claims) yield from self.claims_insert(self.claims)
if self.initial_sync: if self.initial_sync:
yield from self.generate("blockchain.sync.claims.indexes", ("steps",), 0, None, (9,), (1,)) yield from self.generate("blockchain.sync.claims.indexes", ("steps",), 0, None, (10,), (1,))
else: else:
yield from self.claims_takeovers(self.takeovers) yield from self.claims_takeovers(self.takeovers)
yield from self.claims_stakes() yield from self.claims_stakes()
@ -920,12 +920,12 @@ class EventGenerator:
def spends_steps(self): def spends_steps(self):
yield from self.generate( yield from self.generate(
"blockchain.sync.spends.main", ("steps",), 0, None, "blockchain.sync.spends.main", ("steps",), 0, None,
(17 if self.initial_sync else 5,), (20 if self.initial_sync else 5,),
(1,) (1,)
) )
def claims_init(self): def claims_init(self):
yield from self.generate("blockchain.sync.claims.init", ("steps",), 0, None, (4,), (1,)) yield from self.generate("blockchain.sync.claims.init", ("steps",), 0, None, (5,), (1,))
def claims_main_start(self): def claims_main_start(self):
total = ( total = (

View file

@ -9,7 +9,7 @@ from distutils.dir_util import copy_tree, remove_tree
from lbry import Config, Database, RegTestLedger, Transaction, Output, Input from lbry import Config, Database, RegTestLedger, Transaction, Output, Input
from lbry.crypto.base58 import Base58 from lbry.crypto.base58 import Base58
from lbry.schema.claim import Stream, Channel from lbry.schema.claim import Claim, Stream, Channel
from lbry.schema.result import Outputs from lbry.schema.result import Outputs
from lbry.schema.support import Support from lbry.schema.support import Support
from lbry.error import LbrycrdEventSubscriptionError, LbrycrdUnauthorizedError from lbry.error import LbrycrdEventSubscriptionError, LbrycrdUnauthorizedError
@ -115,9 +115,13 @@ class SyncingBlockchainTestCase(BasicBlockchainTestCase):
async def create_claim( async def create_claim(
self, title='', amount='0.01', name=None, author='', desc='', self, title='', amount='0.01', name=None, author='', desc='',
claim_id_startswith='', sign=None, is_channel=False) -> str: claim_id_startswith='', sign=None, is_channel=False, repost=None) -> str:
name = name or ('@foo' if is_channel else 'foo') name = name or ('@foo' if is_channel else 'foo')
if not claim_id_startswith and sign is None and not is_channel: if not claim_id_startswith and sign is None and not is_channel:
if repost:
claim = Claim()
claim.repost.reference.claim_id = repost
else:
claim = Stream().update(title=title, author=author, description=desc).claim claim = Stream().update(title=title, author=author, description=desc).claim
return await self.chain.claim_name( return await self.chain.claim_name(
name, hexlify(claim.to_bytes()).decode(), amount name, hexlify(claim.to_bytes()).decode(), amount
@ -872,6 +876,22 @@ class TestGeneralBlockchainSync(SyncingBlockchainTestCase):
self.assertEqual(0, r.meta['signed_claim_count']) # channel2 lost abandoned claim self.assertEqual(0, r.meta['signed_claim_count']) # channel2 lost abandoned claim
self.assertEqual(0, r.meta['signed_support_count']) self.assertEqual(0, r.meta['signed_support_count'])
async def test_reposts(self):
self.stream1 = await self.get_claim(await self.create_claim())
claim_id = self.stream1.claim_id
# in same block
self.stream2 = await self.get_claim(await self.create_claim(repost=claim_id))
await self.generate(1)
r, = await self.db.search_claims(claim_id=claim_id)
self.assertEqual(1, r.meta['reposted_count'])
# in subsequent block
self.stream3 = await self.get_claim(await self.create_claim(repost=claim_id))
await self.generate(1)
r, = await self.db.search_claims(claim_id=claim_id)
self.assertEqual(2, r.meta['reposted_count'])
async def resolve_to_claim_id(self, url): async def resolve_to_claim_id(self, url):
return (await self.db.resolve([url]))[url].claim_id return (await self.db.resolve([url]))[url].claim_id