lbry-sdk/lbry/blockchain/sync/claims.py

243 lines
8.8 KiB
Python
Raw Normal View History

2020-07-12 00:18:33 +02:00
import logging
from typing import Tuple
2020-07-12 00:18:33 +02:00
2020-07-13 06:55:30 +02:00
from sqlalchemy import case, func, desc, text
2020-07-12 00:18:33 +02:00
from sqlalchemy.future import select
from lbry.db.queries.txio import (
minimum_txo_columns, row_to_txo,
where_unspent_txos, where_claims_with_changed_supports,
count_unspent_txos, where_channels_with_changed_content,
where_abandoned_claims, count_channels_with_changed_content
2020-07-12 00:18:33 +02:00
)
from lbry.db.query_context import ProgressContext, event_emitter
2020-07-14 03:00:24 +02:00
from lbry.db.tables import TX, TXO, Claim, Support, pg_add_claim_and_tag_constraints_and_indexes
2020-07-12 00:18:33 +02:00
from lbry.db.utils import least
from lbry.db.constants import TXO_TYPES, CLAIM_TYPE_CODES
2020-07-12 00:18:33 +02:00
from lbry.blockchain.transaction import Output
from .context import get_or_initialize_lbrycrd
log = logging.getLogger(__name__)
def channel_content_count_calc(signable):
return (
select(func.count(signable.c.claim_hash))
.where((signable.c.channel_hash == Claim.c.claim_hash) & signable.c.is_signature_valid)
.scalar_subquery()
)
support = TXO.alias('support')
def staked_support_aggregation(aggregate):
return (
select(aggregate).where(
(support.c.txo_type == TXO_TYPES['support']) &
(support.c.spent_height == 0)
).scalar_subquery()
)
def staked_support_amount_calc(other):
return (
staked_support_aggregation(func.coalesce(func.sum(support.c.amount), 0))
.where(support.c.claim_hash == other.c.claim_hash)
)
def staked_support_count_calc(other):
return (
staked_support_aggregation(func.coalesce(func.count('*'), 0))
.where(support.c.claim_hash == other.c.claim_hash)
)
def make_label(action, blocks):
if blocks[0] == blocks[-1]:
2020-07-13 20:29:38 +02:00
return f"{action} {blocks[0]:>6}"
2020-07-12 00:18:33 +02:00
else:
2020-07-13 20:29:38 +02:00
return f"{action} {blocks[0]:>6}-{blocks[-1]:>6}"
2020-07-12 00:18:33 +02:00
def select_claims_for_saving(
blocks: Tuple[int, int],
missing_in_claims_table=False,
missing_or_stale_in_claims_table=False,
):
channel_txo = TXO.alias('channel_txo')
return select(
2020-07-12 00:18:33 +02:00
*minimum_txo_columns, TXO.c.claim_hash,
staked_support_amount_calc(TXO).label('staked_support_amount'),
staked_support_count_calc(TXO).label('staked_support_count'),
TXO.c.signature, TXO.c.signature_digest,
case([(
TXO.c.channel_hash.isnot(None),
select(channel_txo.c.public_key).select_from(channel_txo).where(
(channel_txo.c.txo_type == TXO_TYPES['channel']) &
(channel_txo.c.claim_hash == TXO.c.channel_hash) &
(channel_txo.c.height <= TXO.c.height)
).order_by(desc(channel_txo.c.height)).limit(1).scalar_subquery()
)]).label('channel_public_key')
2020-07-12 00:18:33 +02:00
).where(
where_unspent_txos(
CLAIM_TYPE_CODES, blocks,
2020-07-12 00:18:33 +02:00
missing_in_claims_table=missing_in_claims_table,
missing_or_stale_in_claims_table=missing_or_stale_in_claims_table,
)
2020-07-13 06:55:30 +02:00
).select_from(TXO.join(TX))
2020-07-12 00:18:33 +02:00
def row_to_claim_for_saving(row) -> Tuple[Output, dict]:
return row_to_txo(row), {
2020-07-12 00:18:33 +02:00
'staked_support_amount': int(row.staked_support_amount),
'staked_support_count': int(row.staked_support_count),
'signature': row.signature,
'signature_digest': row.signature_digest,
'channel_public_key': row.channel_public_key
2020-07-12 00:18:33 +02:00
}
@event_emitter("blockchain.sync.claims.insert", "claims")
def claims_insert(
2020-07-13 06:55:30 +02:00
blocks: Tuple[int, int],
missing_in_claims_table: bool,
p: ProgressContext
2020-07-12 00:18:33 +02:00
):
chain = get_or_initialize_lbrycrd(p.ctx)
p.start(
count_unspent_txos(
CLAIM_TYPE_CODES, blocks,
2020-07-12 00:18:33 +02:00
missing_in_claims_table=missing_in_claims_table,
2020-07-13 19:12:01 +02:00
), progress_id=blocks[0], label=make_label("add claims", blocks)
2020-07-12 00:18:33 +02:00
)
with p.ctx.engine.connect().execution_options(stream_results=True) as c:
loader = p.ctx.get_bulk_loader()
cursor = c.execute(select_claims_for_saving(
blocks, missing_in_claims_table=missing_in_claims_table
2020-07-12 00:18:33 +02:00
).order_by(TXO.c.claim_hash))
for rows in cursor.partitions(900):
2020-07-13 06:55:30 +02:00
claim_metadata = chain.db.sync_get_claim_metadata(
2020-07-12 00:18:33 +02:00
claim_hashes=[row['claim_hash'] for row in rows]
2020-07-13 06:55:30 +02:00
)
i = 0
2020-07-12 00:18:33 +02:00
for row in rows:
metadata = claim_metadata[i] if i < len(claim_metadata) else {}
if metadata and metadata['claim_hash'] == row.claim_hash:
i += 1
2020-07-12 00:18:33 +02:00
txo, extra = row_to_claim_for_saving(row)
extra.update({
'short_url': metadata.get('short_url'),
'creation_height': metadata.get('creation_height'),
'activation_height': metadata.get('activation_height'),
'expiration_height': metadata.get('expiration_height'),
'takeover_height': metadata.get('takeover_height'),
2020-07-12 00:18:33 +02:00
})
loader.add_claim(txo, **extra)
if len(loader.claims) >= 25_000:
p.add(loader.flush(Claim))
p.add(loader.flush(Claim))
2020-07-13 06:55:30 +02:00
@event_emitter("blockchain.sync.claims.indexes", "steps")
def claims_constraints_and_indexes(p: ProgressContext):
2020-07-14 03:00:24 +02:00
p.start(1 + len(pg_add_claim_and_tag_constraints_and_indexes))
2020-07-13 06:55:30 +02:00
if p.ctx.is_postgres:
with p.ctx.engine.connect() as c:
c.execute(text("COMMIT;"))
c.execute(text("VACUUM ANALYZE claim;"))
c.execute(text("VACUUM ANALYZE tag;"))
2020-07-13 06:55:30 +02:00
p.step()
2020-07-14 03:00:24 +02:00
for constraint in pg_add_claim_and_tag_constraints_and_indexes:
if p.ctx.is_postgres:
p.ctx.execute(text(constraint))
2020-07-14 03:00:24 +02:00
p.step()
2020-07-13 06:55:30 +02:00
2020-07-12 00:18:33 +02:00
@event_emitter("blockchain.sync.claims.update", "claims")
def claims_update(blocks: Tuple[int, int], p: ProgressContext):
2020-07-12 00:18:33 +02:00
p.start(
count_unspent_txos(CLAIM_TYPE_CODES, blocks, missing_or_stale_in_claims_table=True),
2020-07-13 19:12:01 +02:00
progress_id=blocks[0], label=make_label("mod claims", blocks)
2020-07-12 00:18:33 +02:00
)
with p.ctx.engine.connect().execution_options(stream_results=True) as c:
loader = p.ctx.get_bulk_loader()
cursor = c.execute(select_claims_for_saving(
blocks, missing_or_stale_in_claims_table=True
2020-07-12 00:18:33 +02:00
))
for row in cursor:
txo, extra = row_to_claim_for_saving(row)
loader.update_claim(txo, **extra)
2020-07-13 20:29:38 +02:00
if len(loader.update_claims) >= 25:
2020-07-12 00:18:33 +02:00
p.add(loader.flush(Claim))
p.add(loader.flush(Claim))
@event_emitter("blockchain.sync.claims.delete", "claims")
def claims_delete(claims, p: ProgressContext):
2020-07-13 19:12:01 +02:00
p.start(claims, label="del claims")
2020-07-12 00:18:33 +02:00
deleted = p.ctx.execute(Claim.delete().where(where_abandoned_claims()))
p.step(deleted.rowcount)
@event_emitter("blockchain.sync.claims.takeovers", "claims")
def update_takeovers(blocks: Tuple[int, int], takeovers, p: ProgressContext):
2020-07-13 19:12:01 +02:00
p.start(takeovers, label=make_label("mod winner", blocks))
2020-07-12 00:18:33 +02:00
chain = get_or_initialize_lbrycrd(p.ctx)
for takeover in chain.db.sync_get_takeovers(start_height=blocks[0], end_height=blocks[-1]):
update_claims = (
Claim.update()
.where(Claim.c.normalized == takeover['normalized'])
.values(
is_controlling=case(
[(Claim.c.claim_hash == takeover['claim_hash'], True)],
else_=False
),
takeover_height=case(
[(Claim.c.claim_hash == takeover['claim_hash'], takeover['height'])],
else_=None
),
activation_height=least(Claim.c.activation_height, takeover['height']),
)
)
result = p.ctx.execute(update_claims)
p.add(result.rowcount)
@event_emitter("blockchain.sync.claims.stakes", "claims")
def update_stakes(blocks: Tuple[int, int], claims: int, p: ProgressContext):
p.start(claims)
sql = (
Claim.update()
.where(where_claims_with_changed_supports(blocks))
.values(
staked_support_amount=staked_support_amount_calc(Claim),
staked_support_count=staked_support_count_calc(Claim),
)
)
result = p.ctx.execute(sql)
p.step(result.rowcount)
@event_emitter("blockchain.sync.claims.channels", "channels")
def update_channel_stats(blocks: Tuple[int, int], initial_sync: int, p: ProgressContext):
2020-07-12 00:18:33 +02:00
update_sql = Claim.update().values(
signed_claim_count=channel_content_count_calc(Claim.alias('content')),
signed_support_count=channel_content_count_calc(Support),
)
if initial_sync:
p.start(p.ctx.fetchtotal(Claim.c.claim_type == TXO_TYPES['channel']), label="channel stats")
2020-07-12 00:18:33 +02:00
update_sql = update_sql.where(Claim.c.claim_type == TXO_TYPES['channel'])
elif blocks:
p.start(count_channels_with_changed_content(blocks), label="channel stats")
2020-07-12 00:18:33 +02:00
update_sql = update_sql.where(where_channels_with_changed_content(blocks))
else:
return
2020-07-12 00:18:33 +02:00
result = p.ctx.execute(update_sql)
p.step(result.rowcount)