merged channel and content syncs and no longer storing canonical_url in column

This commit is contained in:
Lex Berezhny 2020-07-13 09:30:32 -04:00
parent 09a2b2fa46
commit e63151a370
6 changed files with 80 additions and 161 deletions

View file

@ -1,5 +1,5 @@
import logging
from typing import Tuple, Union
from typing import Tuple
from sqlalchemy import case, func, desc, text
from sqlalchemy.future import select
@ -8,12 +8,12 @@ from lbry.db.queries.txio import (
minimum_txo_columns, row_to_txo,
where_unspent_txos, where_claims_with_changed_supports,
count_unspent_txos, where_channels_with_changed_content,
where_abandoned_claims
where_abandoned_claims, count_channels_with_changed_content
)
from lbry.db.query_context import ProgressContext, event_emitter
from lbry.db.tables import TX, TXO, Claim, Support, pg_add_claim_constraints_and_indexes
from lbry.db.utils import least
from lbry.db.constants import TXO_TYPES
from lbry.db.constants import TXO_TYPES, CLAIM_TYPE_CODES
from lbry.blockchain.transaction import Output
from .context import get_or_initialize_lbrycrd
@ -64,58 +64,45 @@ def make_label(action, blocks):
def select_claims_for_saving(
txo_types: Union[int, Tuple[int, ...]],
blocks: Tuple[int, int],
missing_in_claims_table=False,
missing_or_stale_in_claims_table=False,
):
select_claims = select(
channel_txo = TXO.alias('channel_txo')
return select(
*minimum_txo_columns, TXO.c.claim_hash,
staked_support_amount_calc(TXO).label('staked_support_amount'),
staked_support_count_calc(TXO).label('staked_support_count')
staked_support_count_calc(TXO).label('staked_support_count'),
TXO.c.signature, TXO.c.signature_digest,
case([(
TXO.c.channel_hash.isnot(None),
select(channel_txo.c.public_key).select_from(channel_txo).where(
(channel_txo.c.txo_type == TXO_TYPES['channel']) &
(channel_txo.c.claim_hash == TXO.c.channel_hash) &
(channel_txo.c.height <= TXO.c.height)
).order_by(desc(channel_txo.c.height)).limit(1).scalar_subquery()
)]).label('channel_public_key')
).where(
where_unspent_txos(
txo_types, blocks,
CLAIM_TYPE_CODES, blocks,
missing_in_claims_table=missing_in_claims_table,
missing_or_stale_in_claims_table=missing_or_stale_in_claims_table,
)
).select_from(TXO.join(TX))
if txo_types != TXO_TYPES['channel']:
channel_txo = TXO.alias('channel_txo')
return (
select_claims.add_columns(
TXO.c.signature, TXO.c.signature_digest,
case([(
TXO.c.channel_hash.isnot(None),
select(channel_txo.c.public_key).select_from(channel_txo).where(
(channel_txo.c.txo_type == TXO_TYPES['channel']) &
(channel_txo.c.claim_hash == TXO.c.channel_hash) &
(channel_txo.c.height <= TXO.c.height)
).order_by(desc(channel_txo.c.height)).limit(1).scalar_subquery()
)]).label('channel_public_key')
)
)
return select_claims
def row_to_claim_for_saving(row) -> Tuple[Output, dict]:
txo = row_to_txo(row)
extra = {
return row_to_txo(row), {
'staked_support_amount': int(row.staked_support_amount),
'staked_support_count': int(row.staked_support_count),
'signature': row.signature,
'signature_digest': row.signature_digest,
'channel_public_key': row.channel_public_key
}
if hasattr(row, 'signature'):
extra.update({
'signature': row.signature,
'signature_digest': row.signature_digest,
'channel_public_key': row.channel_public_key
})
return txo, extra
@event_emitter("blockchain.sync.claims.insert", "claims")
def claims_insert(
txo_types: Union[int, Tuple[int, ...]],
blocks: Tuple[int, int],
missing_in_claims_table: bool,
p: ProgressContext
@ -124,23 +111,21 @@ def claims_insert(
p.start(
count_unspent_txos(
txo_types, blocks,
CLAIM_TYPE_CODES, blocks,
missing_in_claims_table=missing_in_claims_table,
), progress_id=blocks[0], label=make_label("add claims at", blocks)
)
channel_url_cache = {}
with p.ctx.engine.connect().execution_options(stream_results=True) as c:
loader = p.ctx.get_bulk_loader()
cursor = c.execute(select_claims_for_saving(
txo_types, blocks, missing_in_claims_table=missing_in_claims_table
blocks, missing_in_claims_table=missing_in_claims_table
).order_by(TXO.c.claim_hash))
for rows in cursor.partitions(900):
claim_metadata = chain.db.sync_get_claim_metadata(
claim_hashes=[row['claim_hash'] for row in rows]
)
i, txos_w_extra, unknown_channel_urls, txos_wo_channel_url = 0, [], set(), []
i = 0
for row in rows:
metadata = claim_metadata[i] if i < len(claim_metadata) else None
if metadata is None:
@ -157,37 +142,12 @@ def claims_insert(
'expiration_height': metadata['expiration_height'],
'takeover_height': metadata['takeover_height'],
})
txos_w_extra.append((txo, extra))
set_or_add_to_url_lookup(
channel_url_cache, txo, extra, unknown_channel_urls, txos_wo_channel_url
)
perform_url_lookup(chain, channel_url_cache, unknown_channel_urls, txos_wo_channel_url)
for txo, extra in txos_w_extra:
loader.add_claim(txo, **extra)
if len(loader.claims) >= 25_000:
p.add(loader.flush(Claim))
p.add(loader.flush(Claim))
def set_or_add_to_url_lookup(cache: dict, txo: Output, extra: dict, to_lookup: set, to_set: list):
claim = txo.can_decode_claim
if claim and claim.is_signed:
if claim.signing_channel_hash not in cache:
to_lookup.add(claim.signing_channel_hash)
to_set.append((claim.signing_channel_hash, extra))
else:
extra['channel_url'] = cache[claim.signing_channel_hash]
def perform_url_lookup(chain, cache, to_lookup: set, to_set: list):
if to_lookup:
channels = chain.db.sync_get_claim_metadata(claim_hashes=list(to_lookup))
for channel in channels:
cache[channel['claim_hash']] = channel['short_url']
for channel_hash, extra in to_set:
extra['channel_url'] = cache.get(channel_hash)
@event_emitter("blockchain.sync.claims.indexes", "steps")
def claims_constraints_and_indexes(p: ProgressContext):
p.start(2)
@ -202,27 +162,18 @@ def claims_constraints_and_indexes(p: ProgressContext):
@event_emitter("blockchain.sync.claims.update", "claims")
def claims_update(txo_types: Union[int, Tuple[int, ...]], blocks: Tuple[int, int], p: ProgressContext):
def claims_update(blocks: Tuple[int, int], p: ProgressContext):
p.start(
count_unspent_txos(txo_types, blocks, missing_or_stale_in_claims_table=True),
count_unspent_txos(CLAIM_TYPE_CODES, blocks, missing_or_stale_in_claims_table=True),
progress_id=blocks[0], label=make_label("update claims at", blocks)
)
chain = get_or_initialize_lbrycrd(p.ctx)
with p.ctx.engine.connect().execution_options(stream_results=True) as c:
loader = p.ctx.get_bulk_loader()
cursor = c.execute(select_claims_for_saving(
txo_types, blocks, missing_or_stale_in_claims_table=True
blocks, missing_or_stale_in_claims_table=True
))
channel_url_cache = {}
for row in cursor:
txo, extra = row_to_claim_for_saving(row)
claim = txo.can_decode_claim
if claim and claim.is_signed:
if claim.signing_channel_hash not in channel_url_cache:
channels = chain.db.sync_get_claim_metadata(claim_hashes=[claim.signing_channel_hash])
if channels:
channel_url_cache[channels[0]['claim_hash']] = channels[0]['short_url']
extra['channel_url'] = channel_url_cache.get(claim.signing_channel_hash)
loader.update_claim(txo, **extra)
if len(loader.update_claims) >= 500:
p.add(loader.flush(Claim))
@ -276,15 +227,18 @@ def update_stakes(blocks: Tuple[int, int], claims: int, p: ProgressContext):
@event_emitter("blockchain.sync.claims.channels", "channels")
def update_channel_stats(blocks: Tuple[int, int], initial_sync: int, channels: int, p: ProgressContext):
p.start(channels, label="channel stats")
def update_channel_stats(blocks: Tuple[int, int], initial_sync: int, p: ProgressContext):
update_sql = Claim.update().values(
signed_claim_count=channel_content_count_calc(Claim.alias('content')),
signed_support_count=channel_content_count_calc(Support),
)
if initial_sync:
p.start(p.ctx.fetchtotal(Claim.c.claim_type == TXO_TYPES['channel']), label="channel stats")
update_sql = update_sql.where(Claim.c.claim_type == TXO_TYPES['channel'])
else:
elif blocks:
p.start(count_channels_with_changed_content(blocks), label="channel stats")
update_sql = update_sql.where(where_channels_with_changed_content(blocks))
else:
return
result = p.ctx.execute(update_sql)
p.step(result.rowcount)

View file

@ -5,7 +5,7 @@ from typing import Optional, Tuple, Set, List, Coroutine
from lbry.db import Database
from lbry.db import queries as q
from lbry.db.constants import TXO_TYPES, CONTENT_TYPE_CODES
from lbry.db.constants import TXO_TYPES, CLAIM_TYPE_CODES
from lbry.db.query_context import Event, Progress
from lbry.event import BroadcastSubscription
from lbry.service.base import Sync, BlockEvent
@ -187,73 +187,44 @@ class BlockchainSync(Sync):
start_height=blocks[0], end_height=blocks[-1]
)
async def sync_claims(self, blocks):
total = delete_claims = takeovers = claims_with_changed_supports = 0
async def sync_claims(self, blocks) -> bool:
delete_claims = takeovers = claims_with_changed_supports = 0
initial_sync = not await self.db.has_claims()
with Progress(self.db.message_queue, CLAIMS_INIT_EVENT) as p:
if initial_sync:
p.start(2)
# 1. distribute channel insertion load
channels, channel_batches = await self.distribute_unspent_txos(TXO_TYPES['channel'])
channels_with_changed_content = channels
total += channels + channels_with_changed_content
p.step()
# 2. distribute content insertion load
content, content_batches = await self.distribute_unspent_txos(CONTENT_TYPE_CODES)
total += content
p.step()
total, batches = await self.distribute_unspent_txos(CLAIM_TYPE_CODES)
elif blocks:
p.start(6)
# 1. channel claims to be inserted or updated
channels = await self.count_unspent_txos(
TXO_TYPES['channel'], blocks, missing_or_stale_in_claims_table=True
p.start(4)
# 1. content claims to be inserted or updated
total = await self.count_unspent_txos(
CLAIM_TYPE_CODES, blocks, missing_or_stale_in_claims_table=True
)
channel_batches = [blocks] if channels else []
total += channels
batches = [blocks] if total else []
p.step()
# 2. content claims to be inserted or updated
content = await self.count_unspent_txos(
CONTENT_TYPE_CODES, blocks, missing_or_stale_in_claims_table=True
)
content_batches = [blocks] if content else []
total += content
p.step()
# 3. claims to be deleted
# 2. claims to be deleted
delete_claims = await self.count_abandoned_claims()
total += delete_claims
p.step()
# 4. claims to be updated with new support totals
# 3. claims to be updated with new support totals
claims_with_changed_supports = await self.count_claims_with_changed_supports(blocks)
total += claims_with_changed_supports
p.step()
# 5. channels to be updated with changed content totals
channels_with_changed_content = await self.count_channels_with_changed_content(blocks)
total += channels_with_changed_content
p.step()
# 6. claims to be updated due to name takeovers
# 5. claims to be updated due to name takeovers
takeovers = await self.count_takeovers(blocks)
total += takeovers
p.step()
else:
return
return initial_sync
with Progress(self.db.message_queue, CLAIMS_MAIN_EVENT) as p:
p.start(total)
insertions = [
(TXO_TYPES['channel'], channel_batches),
(CONTENT_TYPE_CODES, content_batches),
]
for txo_type, batches in insertions:
if batches:
if batches:
await self.run_tasks([
self.db.run(claim_phase.claims_insert, batch, not initial_sync) for batch in batches
])
if not initial_sync:
await self.run_tasks([
self.db.run(
claim_phase.claims_insert, txo_type, batch, not initial_sync
) for batch in batches
self.db.run(claim_phase.claims_update, batch) for batch in batches
])
if not initial_sync:
await self.run_tasks([
self.db.run(claim_phase.claims_update, txo_type, batch)
for batch in batches
])
if delete_claims:
await self.db.run(claim_phase.claims_delete, delete_claims)
if takeovers:
@ -262,8 +233,7 @@ class BlockchainSync(Sync):
await self.db.run(claim_phase.update_stakes, blocks, claims_with_changed_supports)
if initial_sync:
await self.db.run(claim_phase.claims_constraints_and_indexes)
if channels_with_changed_content:
return initial_sync, channels_with_changed_content
return initial_sync
async def sync_supports(self, blocks):
delete_supports = 0
@ -298,11 +268,8 @@ class BlockchainSync(Sync):
if initial_sync:
await self.db.run(support_phase.supports_constraints_and_indexes)
async def sync_channel_stats(self, blocks, initial_sync, channels_with_changed_content):
if channels_with_changed_content:
await self.db.run(
claim_phase.update_channel_stats, blocks, initial_sync, channels_with_changed_content
)
async def sync_channel_stats(self, blocks, initial_sync):
await self.db.run(claim_phase.update_channel_stats, blocks, initial_sync)
async def sync_trends(self):
pass
@ -312,10 +279,9 @@ class BlockchainSync(Sync):
sync_filters_task = asyncio.create_task(self.sync_filters())
sync_trends_task = asyncio.create_task(self.sync_trends())
await self.sync_spends(blocks_added)
channel_stats = await self.sync_claims(blocks_added)
initial_claim_sync = await self.sync_claims(blocks_added)
await self.sync_supports(blocks_added)
if channel_stats:
await self.sync_channel_stats(blocks_added, *channel_stats)
await self.sync_channel_stats(blocks_added, initial_claim_sync)
await sync_trends_task
await sync_filters_task
if blocks_added:

View file

@ -4,7 +4,7 @@ from decimal import Decimal
from binascii import unhexlify
from typing import Tuple, List, Optional
from sqlalchemy import func
from sqlalchemy import func, case
from sqlalchemy.future import select, Select
from lbry.schema.tags import clean_tags
@ -56,6 +56,7 @@ def search_support_count(**constraints) -> int:
return count[0]['total'] or 0
channel_claim = Claim.alias('channel')
BASE_SELECT_CLAIM_COLUMNS = BASE_SELECT_TXO_COLUMNS + [
Claim.c.activation_height,
Claim.c.takeover_height,
@ -64,15 +65,19 @@ BASE_SELECT_CLAIM_COLUMNS = BASE_SELECT_TXO_COLUMNS + [
Claim.c.channel_hash,
Claim.c.reposted_claim_hash,
Claim.c.short_url,
Claim.c.canonical_url,
Claim.c.signed_claim_count,
Claim.c.signed_support_count,
(Claim.c.amount + Claim.c.staked_support_amount).label('staked_amount'),
Claim.c.staked_support_amount,
Claim.c.staked_support_count,
Claim.c.is_signature_valid,
case([(
channel_claim.c.short_url.isnot(None),
channel_claim.c.short_url + '/' + Claim.c.short_url
)]).label('canonical_url'),
]
def select_claims(cols: List = None, for_count=False, **constraints) -> Select:
if cols is None:
cols = BASE_SELECT_CLAIM_COLUMNS
@ -203,8 +208,14 @@ def select_claims(cols: List = None, for_count=False, **constraints) -> Select:
# TODO: fix
constraints["search"] = constraints.pop("text")
joins = Claim.join(TXO).join(TX)
return query([Claim], select(*cols).select_from(joins), **constraints)
return query(
[Claim],
select(*cols)
.select_from(
Claim.join(TXO).join(TX)
.join(channel_claim, Claim.c.channel_hash == channel_claim.c.claim_hash, isouter=True)
), **constraints
)
def search_claims(**constraints) -> Tuple[List[Output], Optional[int], Optional[Censor]]:
@ -226,6 +237,8 @@ def search_claim_count(**constraints) -> int:
constraints.pop('order_by', None)
count = context().fetchall(select_claims([func.count().label('total')], **constraints))
return count[0]['total'] or 0
CLAIM_HASH_OR_REPOST_HASH_SQL = f"""
CASE WHEN claim.claim_type = {TXO_TYPES['repost']}
THEN claim.reposted_claim_hash

View file

@ -612,7 +612,7 @@ class BulkLoader:
def add_claim(
self, txo: Output, short_url: str,
creation_height: int, activation_height: int, expiration_height: int,
takeover_height: int = None, channel_url: str = None, **extra
takeover_height: int = None, **extra
):
try:
claim_name = txo.claim_name.replace('\x00', '')
@ -630,18 +630,13 @@ class BulkLoader:
d['expiration_height'] = expiration_height
d['takeover_height'] = takeover_height
d['is_controlling'] = takeover_height is not None
if d['is_signature_valid'] and channel_url is not None:
d['canonical_url'] = channel_url + '/' + short_url
else:
d['canonical_url'] = None
self.claims.append(d)
self.tags.extend(tags)
return self
def update_claim(self, txo: Output, channel_url: str = None, **extra):
def update_claim(self, txo: Output, **extra):
d, tags = self.claim_to_rows(txo, **extra)
d['pk'] = txo.claim_hash
d['channel_url'] = channel_url
d['set_canonical_url'] = d['is_signature_valid']
self.update_claims.append(d)
self.delete_tags.append({'pk': txo.claim_hash})
@ -656,11 +651,7 @@ class BulkLoader:
(TXI.insert(), self.txis),
(Claim.insert(), self.claims),
(Tag.delete().where(Tag.c.claim_hash == bindparam('pk')), self.delete_tags),
(Claim.update().where(Claim.c.claim_hash == bindparam('pk')).values(
canonical_url=case([
(bindparam('set_canonical_url'), bindparam('channel_url') + '/' + Claim.c.short_url)
], else_=None)
), self.update_claims),
(Claim.update().where(Claim.c.claim_hash == bindparam('pk')), self.update_claims),
(Tag.insert(), self.tags),
(Support.insert(), self.supports),
)

View file

@ -146,10 +146,10 @@ Claim = Table(
Column('sync_height', Integer), # claim dynamic values up-to-date as of this height (eg. staked_amount)
Column('is_controlling', Boolean),
# normalized#shortest-unique-claim_id
# short_url: normalized#shortest-unique-claim_id
Column('short_url', Text),
# channel's-short_url/normalized#shortest-unique-claim_id-within-channel
Column('canonical_url', Text, nullable=True),
# canonical_url: channel's-short_url/normalized#shortest-unique-claim_id-within-channel
# canonical_url is computed dynamically
Column('title', Text, nullable=True),
Column('author', Text, nullable=True),

View file

@ -536,11 +536,6 @@ class TestMultiBlockFileSyncing(BasicBlockchainTestCase):
(0, None, (9,), (1,), (2,), (3,), (4,), (5,), (6,), (7,), (8,), (9,))
]
)
self.assertConsumingEvents(
events, "blockchain.sync.claims.init", ("steps",), [
(0, None, (2,), (1,), (2,))
]
)
self.assertEqual(
events.pop(0), {
"event": "blockchain.sync.claims.main",
@ -648,7 +643,7 @@ class TestMultiBlockFileSyncing(BasicBlockchainTestCase):
)
self.assertConsumingEvents(
events, "blockchain.sync.claims.init", ("steps",), [
(0, None, (6,), (1,), (2,), (3,), (4,), (5,), (6,))
(0, None, (4,), (1,), (2,), (3,), (4,))
]
)
self.assertEqual(