blockchain

commit 53b7d0a58b
parent d1a243247d
Author: Lex Berezhny
Date:   2020-06-21 19:50:53 -04:00

2 changed files with 165 additions and 196 deletions

File 1 of 2: BlockchainDB

@@ -56,7 +56,7 @@ class BlockchainDB:
             )
         )
         self.connection.create_aggregate("find_shortest_id", 2, FindShortestID)
         #self.connection.execute(
-        #    "CREATE INDEX IF NOT EXISTS claim_originalheight ON claim (originalheight);"
+        #    "CREATE INDEX IF NOT EXISTS claim_originalheight_claimid ON claim (originalheight, claimid);"
         #)
         self.connection.row_factory = sqlite3.Row
@@ -205,7 +205,7 @@ class BlockchainDB:
                 ) AS shortestID
                 FROM claim
                 WHERE originalHeight BETWEEN ? AND ?
-                ORDER BY originalHeight
+                ORDER BY originalHeight, claimid
                 """, (start_height, end_height)
             )]
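Why the commented-out index gained a claimid column: a composite index on (originalheight, claimid) lets SQLite answer both the BETWEEN range filter and the new two-column ORDER BY from a single index scan, with no separate sort pass. A throwaway sketch (simplified one-table schema, not the real claim table):

import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE claim (claimid TEXT, originalheight INTEGER)")
conn.execute(
    "CREATE INDEX IF NOT EXISTS claim_originalheight_claimid "
    "ON claim (originalheight, claimid)"
)
plan = conn.execute(
    "EXPLAIN QUERY PLAN "
    "SELECT claimid FROM claim "
    "WHERE originalheight BETWEEN ? AND ? "
    "ORDER BY originalheight, claimid",
    (100, 200),
).fetchall()
# Expect a single "SEARCH ... USING COVERING INDEX claim_originalheight_claimid"
# step and no "USE TEMP B-TREE FOR ORDER BY" step.
for row in plan:
    print(row)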

File 2 of 2: blockchain sync (process_metadata, BlockchainSync)

@@ -4,7 +4,7 @@ import logging
 from contextvars import ContextVar
 from typing import Optional, Tuple, Set, NamedTuple
-from sqlalchemy import func, bindparam, case, distinct, between
+from sqlalchemy import func, bindparam, case, distinct, desc
 from sqlalchemy.future import select
 from lbry.event import BroadcastSubscription
@@ -15,7 +15,7 @@ from lbry.db.query_context import progress, context, Event
 from lbry.db.queries import rows_to_txos
 from lbry.db.sync import (
     condition_spent_claims, condition_spent_supports,
-    select_missing_claims, select_stale_claims, select_missing_supports
+    select_missing_supports, process_claim_changes
 )
 from lbry.schema.url import normalize_name
@@ -90,79 +90,9 @@ def process_takeovers(starting_height: int, ending_height: int):
         p.step(done)
-
-def signature_validation(d: dict, row: dict, public_key) -> dict:
-    d['is_signature_valid'] = False
-    if Output.is_signature_valid(bytes(row['signature']), bytes(row['signature_digest']), public_key):
-        d['is_signature_valid'] = True
-    return d
-
-def select_updated_channel_keys(starting_height, ending_height, *cols):
-    return (
-        select(*cols).select_from(Claim)
-        .where(
-            (Claim.c.claim_type == TXO_TYPES['channel']) &
-            between(Claim.c.public_key_height, starting_height, ending_height)
-        )
-    )
-
-def get_updated_channel_key_count(ctx, starting_height, ending_height):
-    sql = select_updated_channel_keys(
-        starting_height, ending_height, func.count('*').label('total')
-    )
-    return ctx.fetchone(sql)['total']
-
-def get_updated_channel_keys(ctx, starting_height, ending_height):
-    sql = select_updated_channel_keys(
-        starting_height, ending_height,
-        Claim.c.claim_hash, Claim.c.public_key, Claim.c.height
-    )
-    return ctx.fetchall(sql)
-
-def get_signables_for_channel(ctx, table, pk, channel):
-    sql = (
-        select(pk, table.c.signature, table.c.signature_digest)
-        .where(table.c.channel_hash == channel['claim_hash'])
-    )
-    return ctx.fetchall(sql)
-
-def select_unvalidated_signables(signable, starting_height: int, ending_height: int, *cols):
-    channel = Claim.alias('channel')
-    if len(cols) > 1:
-        cols += (channel.c.public_key,)
-    return (
-        select(*cols)
-        .select_from(signable.join(channel, signable.c.channel_hash == channel.c.claim_hash))
-        .where(
-            (signable.c.signature != None) &
-            (signable.c.is_signature_valid == False) &
-            between(signable.c.height, starting_height, ending_height)
-        )
-    )
-
-def get_unvalidated_signable_count(ctx, signable, starting_height: int, ending_height: int):
-    sql = select_unvalidated_signables(
-        signable, starting_height, ending_height, func.count('*').label('total')
-    )
-    return ctx.fetchone(sql)['total']
-
-def get_unvalidated_signables(ctx, signable, starting_height: int, ending_height: int, pk):
-    sql = select_unvalidated_signables(
-        signable, starting_height, ending_height,
-        pk, signable.c.signature, signable.c.signature_digest
-    )
-    return ctx.fetchall(sql)
-

 class ClaimChanges(NamedTuple):
     deleted_channels: Set[bytes]
-    channels_with_changed_claims: Set[bytes]
+    channels_with_changed_content: Set[bytes]
     claims_with_changed_supports: Set[bytes]
@@ -187,20 +117,7 @@ def process_claims_and_supports():
         p.start(1)
         p.ctx.execute(Claim.delete().where(condition_spent_claims()))
-    with progress(Event.CLAIM_INSERT) as p:
-        channels_with_added_claims = set()
-        loader = p.ctx.get_bulk_loader()
-        for txo in rows_to_txos(p.ctx.fetchall(select_missing_claims)):
-            loader.add_claim(txo)
-            if txo.can_decode_claim and txo.claim.is_signed:
-                channels_with_added_claims.add(txo.claim.signing_channel_hash)
-        loader.save()
-    with progress(Event.CLAIM_UPDATE) as p:
-        loader = p.ctx.get_bulk_loader()
-        for claim in rows_to_txos(p.ctx.fetchall(select_stale_claims)):
-            loader.update_claim(claim)
-        loader.save()
+    process_claim_changes()

     with progress(Event.SUPPORT_DELETE) as p:
         claims_with_deleted_supports = {
@@ -208,25 +125,28 @@ def process_claims_and_supports():
             select(distinct(Support.c.claim_hash)).where(condition_spent_supports)
             )
         }
+        channels_with_deleted_supports = {
+            r['channel_hash'] for r in p.ctx.fetchall(
+                select(distinct(Support.c.channel_hash))
+                .where(condition_spent_supports & (Support.c.channel_hash != None))
+            )
+        }
         p.start(1)
         sql = Support.delete().where(condition_spent_supports)
         p.ctx.execute(sql)

     with progress(Event.SUPPORT_INSERT) as p:
-        claims_with_added_supports = {
-            r['claim_hash'] for r in p.ctx.fetchall(
-                select(distinct(Support.c.claim_hash)).where(condition_spent_supports)
-            )
-        }
+        claims_with_added_supports = set()
         loader = p.ctx.get_bulk_loader()
-        for support in rows_to_txos(p.ctx.fetchall(select_missing_supports)):
-            loader.add_support(support)
+        for txo in rows_to_txos(p.ctx.fetchall(select_missing_supports)):
+            loader.add_support(txo)
+            claims_with_added_supports.add(txo.claim_hash)
         loader.save()

     return ClaimChanges(
         deleted_channels=deleted_channels,
-        channels_with_changed_claims=(
-            channels_with_added_claims | channels_with_deleted_claims
+        channels_with_changed_content=(
+            channels_with_deleted_supports | channels_with_deleted_claims
         ),
         claims_with_changed_supports=(
             claims_with_added_supports | claims_with_deleted_supports
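For orientation, a minimal sketch (hypothetical hashes, not real data) of the shape process_claims_and_supports() now returns: channels are marked dirty when their claims are deleted or when supports on their content are spent, and those sets drive the targeted recalculations later in the file.

from typing import NamedTuple, Set

class ClaimChanges(NamedTuple):
    deleted_channels: Set[bytes]
    channels_with_changed_content: Set[bytes]
    claims_with_changed_supports: Set[bytes]

# hypothetical change sets gathered while deleting and inserting rows
channels_with_deleted_claims = {b'channel-a'}
channels_with_deleted_supports = {b'channel-b'}
claims_with_added_supports = {b'claim-1'}
claims_with_deleted_supports = {b'claim-1', b'claim-2'}

changes = ClaimChanges(
    deleted_channels=set(),
    channels_with_changed_content=(
        channels_with_deleted_supports | channels_with_deleted_claims
    ),
    claims_with_changed_supports=(
        claims_with_added_supports | claims_with_deleted_supports
    ),
)
print(sorted(changes.channels_with_changed_content))  # [b'channel-a', b'channel-b']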
@@ -234,35 +154,137 @@ def process_claims_and_supports():
         )
     )
+
+
+def condition_unvalidated_signables(signable):
+    return (
+        (signable.c.is_signature_valid == None) &
+        (signable.c.channel_hash != None)
+    )
+
+
+def get_unvalidated_signable_count(ctx, signable):
+    sql = (
+        select(func.count('*').label('total'))
+        .select_from(signable)
+        .where(condition_unvalidated_signables(signable))
+    )
+    return ctx.fetchone(sql)['total']
+
+
+def select_unvalidated_signables(signable, pk, include_urls=False, include_previous=False):
+    sql = (
+        select(
+            pk, signable.c.signature, signable.c.signature_digest, signable.c.channel_hash, (
+                select(TXO.c.public_key).select_from(TXO)
+                .where(
+                    (TXO.c.claim_hash == signable.c.channel_hash) &
+                    (TXO.c.txo_type == TXO_TYPES['channel']) &
+                    (TXO.c.height <= signable.c.height)
+                )
+                .order_by(desc(TXO.c.height))
+                .scalar_subquery().label('public_key')
+            ),
+        )
+        .where(condition_unvalidated_signables(signable))
+    )
+    if include_previous:
+        assert signable.name != 'support', "Supports cannot be updated and don't have a previous."
+        sql = sql.add_columns(
+            select(TXO.c.channel_hash).select_from(TXO)
+            .where(
+                (TXO.c.claim_hash == signable.c.claim_hash) &
+                (TXO.c.txo_type.in_(CLAIM_TYPE_CODES)) &
+                (TXO.c.height <= signable.c.height)
+            )
+            .order_by(desc(TXO.c.height)).offset(1)
+            .scalar_subquery().label('previous_channel_hash')
+        )
+    if include_urls:
+        channel = Claim.alias('channel')
+        return sql.add_columns(
+            signable.c.short_url.label('claim_url'),
+            channel.c.short_url.label('channel_url')
+        ).select_from(signable.join(channel, signable.c.channel_hash == channel.c.claim_hash))
+    return sql.select_from(signable)
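The nested select above implements "the channel's public key as of the signable's height": a correlated scalar subquery ordered newest-first. A self-contained sketch of the pattern (toy tables; SQLAlchemy 1.4 assumed, matching the sqlalchemy.future import above; an explicit LIMIT 1 is added here, where the code above relies on the scalar subquery yielding its first row):

from sqlalchemy import MetaData, Table, Column, Integer, LargeBinary, create_engine, desc
from sqlalchemy.future import select

metadata = MetaData()
txo = Table(
    'txo', metadata,
    Column('claim_hash', LargeBinary),
    Column('height', Integer),
    Column('public_key', LargeBinary),
)
claim = Table(
    'claim', metadata,
    Column('claim_hash', LargeBinary),
    Column('channel_hash', LargeBinary),
    Column('height', Integer),
)

engine = create_engine('sqlite://')
metadata.create_all(engine)
with engine.begin() as conn:
    conn.execute(txo.insert(), [
        {'claim_hash': b'chan', 'height': 5, 'public_key': b'old-key'},
        {'claim_hash': b'chan', 'height': 9, 'public_key': b'new-key'},
    ])
    conn.execute(claim.insert(), [
        {'claim_hash': b'stream', 'channel_hash': b'chan', 'height': 7},
    ])
    # newest channel key at or below the claim's height (here: the height-5 key)
    public_key_at_height = (
        select(txo.c.public_key)
        .where(
            (txo.c.claim_hash == claim.c.channel_hash) &
            (txo.c.height <= claim.c.height)
        )
        .order_by(desc(txo.c.height))
        .limit(1)
        .scalar_subquery()
        .label('public_key')
    )
    row = conn.execute(select(claim.c.claim_hash, public_key_at_height)).fetchone()
    print(row.public_key)  # b'old-key'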
+
+
+def signature_validation(d: dict, row: dict, public_key) -> dict:
+    d['is_signature_valid'] = False
+    if Output.is_signature_valid(bytes(row['signature']), bytes(row['signature_digest']), public_key):
+        d['is_signature_valid'] = True
+    return d
+
+
+def process_signature_validation(changes: ClaimChanges):
+
+    with progress(Event.CLAIM_SIGN) as p:
+        p.start(get_unvalidated_signable_count(p.ctx, Claim))
+        claim_updates = []
+        sql = select_unvalidated_signables(
+            Claim, Claim.c.claim_hash, include_urls=True, include_previous=changes is not None
+        )
+        for claim in p.ctx.execute(sql):
+            claim_updates.append(
+                signature_validation({
+                    'pk': claim['claim_hash'],
+                    'canonical_url': claim['channel_url'] + '/' + claim['claim_url']
+                }, claim, claim['public_key'])
+            )
+            if changes is not None:
+                changes.channels_with_changed_content.add(claim['channel_hash'])
+                if claim['previous_channel_hash']:
+                    changes.channels_with_changed_content.add(claim['previous_channel_hash'])
+            if len(claim_updates) > 500:
+                p.ctx.execute(Claim.update().where(Claim.c.claim_hash == bindparam('pk')), claim_updates)
+                p.step(len(claim_updates))
+                claim_updates.clear()
+        if claim_updates:
+            p.ctx.execute(Claim.update().where(Claim.c.claim_hash == bindparam('pk')), claim_updates)
+        del claim_updates
+
+    with progress(Event.SUPPORT_SIGN) as p:
+        p.start(get_unvalidated_signable_count(p.ctx, Support))
+        support_updates = []
+        for support in p.ctx.execute(select_unvalidated_signables(Support, Support.c.txo_hash)):
+            support_updates.append(
+                signature_validation({'pk': support['txo_hash']}, support, support['public_key'])
+            )
+            if changes is not None:
+                changes.channels_with_changed_content.add(support['channel_hash'])
+            if len(support_updates) > 500:
+                p.ctx.execute(Support.update().where(Support.c.txo_hash == bindparam('pk')), support_updates)
+                p.step(len(support_updates))
+                support_updates.clear()
+        if support_updates:
+            p.ctx.execute(Support.update().where(Support.c.txo_hash == bindparam('pk')), support_updates)
+        del support_updates
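The batching above compiles a single UPDATE ... WHERE claim_hash = :pk statement and feeds it executemany-style lists of dictionaries, flushing every ~500 rows so the pending batch stays bounded. A minimal sketch of the same pattern on a toy table (hypothetical data):

from sqlalchemy import MetaData, Table, Column, Integer, Boolean, bindparam, create_engine

metadata = MetaData()
claim = Table(
    'claim', metadata,
    Column('claim_hash', Integer, primary_key=True),
    Column('is_signature_valid', Boolean),
)

engine = create_engine('sqlite://')
metadata.create_all(engine)
with engine.begin() as conn:
    conn.execute(claim.insert(), [{'claim_hash': i} for i in range(1000)])

    # compiled once; each dict supplies 'pk' plus the columns to set
    sql = claim.update().where(claim.c.claim_hash == bindparam('pk'))
    updates = []
    for i in range(1000):
        updates.append({'pk': i, 'is_signature_valid': i % 2 == 0})
        if len(updates) > 500:  # flush a bounded batch, then keep going
            conn.execute(sql, updates)
            updates.clear()
    if updates:  # final partial batch
        conn.execute(sql, updates)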
+
+
+def channel_content_count_calc(signable):
+    return (
+        select(func.count('*'))
+        .select_from(signable)
+        .where((signable.c.channel_hash == Claim.c.claim_hash) & signable.c.is_signature_valid)
+        .scalar_subquery()
+    )
+
+
+def claim_support_aggregation(*cols):
+    return (
+        select(*cols)
+        .select_from(Support)
+        .where(Support.c.claim_hash == Claim.c.claim_hash)
+        .scalar_subquery()
+    )

 def process_metadata(starting_height: int, ending_height: int, initial_sync: bool):
+    # TODO:
+    #   - claim updates to point to a different channel
+    #   - deleting a channel should invalidate contained claim signatures
     chain = get_or_initialize_lbrycrd()
     channel = Claim.alias('channel')
+    stream = Claim.alias('stream')
     changes = process_claims_and_supports() if not initial_sync else None
-    support_amount_calculator = (
-        select(func.coalesce(func.sum(Support.c.amount), 0) + Claim.c.amount)
-        .select_from(Support)
-        .where(Support.c.claim_hash == Claim.c.claim_hash)
-        .scalar_subquery()
-    )
-    supports_in_claim_calculator = (
-        select(func.count('*'))
-        .select_from(Support)
-        .where(Support.c.claim_hash == Claim.c.claim_hash)
-        .scalar_subquery()
-    )
-    stream = Claim.alias('stream')
-    claims_in_channel_calculator = (
-        select(func.count('*'))
-        .select_from(stream)
-        .where(stream.c.channel_hash == Claim.c.claim_hash)
-        .scalar_subquery()
-    )
+    staked_support_amount_calc = claim_support_aggregation(func.coalesce(func.sum(Support.c.amount), 0))
+    staked_support_count_calc = claim_support_aggregation(func.count('*'))

     with progress(Event.CLAIM_META) as p:
         p.start(chain.db.sync_get_claim_metadata_count(start_height=starting_height, end_height=ending_height))
@@ -275,10 +297,15 @@ def process_metadata(starting_height: int, ending_height: int, initial_sync: bool):
                     .where(channel.c.claim_hash == Claim.c.channel_hash)
                     .scalar_subquery() + '/' + bindparam('short_url_')
                 )], else_=Claim.c.canonical_url),
-                support_amount=support_amount_calculator,
-                supports_in_claim_count=supports_in_claim_calculator,
-                claims_in_channel_count=case([(
-                    (Claim.c.claim_type == TXO_TYPES['channel']), claims_in_channel_calculator
+                staked_support_amount=staked_support_amount_calc,
+                staked_support_count=staked_support_count_calc,
+                signed_claim_count=case([(
+                    (Claim.c.claim_type == TXO_TYPES['channel']),
+                    channel_content_count_calc(stream)
+                )], else_=0),
+                signed_support_count=case([(
+                    (Claim.c.claim_type == TXO_TYPES['channel']),
+                    channel_content_count_calc(Support)
                 )], else_=0),
             )
         )
@@ -292,6 +319,8 @@ def process_metadata(starting_height: int, ending_height: int, initial_sync: bool):
             done += len(claims)
             p.step(done)

+    process_signature_validation(changes)
+
     if not initial_sync and changes.claims_with_changed_supports:
         # covered by Event.CLAIM_META during initial_sync, then only run if supports change
         with progress(Event.CLAIM_CALC) as p:
@@ -300,20 +329,23 @@ def process_metadata(starting_height: int, ending_height: int, initial_sync: bool):
                 Claim.update()
                 .where((Claim.c.claim_hash.in_(changes.claims_with_changed_supports)))
                 .values(
-                    support_amount=support_amount_calculator,
-                    supports_in_claim_count=supports_in_claim_calculator,
+                    staked_support_amount=staked_support_amount_calc,
+                    staked_support_count=staked_support_count_calc,
                 )
             )
             p.ctx.execute(sql)

-    if not initial_sync and changes.channels_with_changed_claims:
+    if not initial_sync and changes.channels_with_changed_content:
         # covered by Event.CLAIM_META during initial_sync, then only run if claims are deleted
         with progress(Event.CLAIM_CALC) as p:
-            p.start(len(changes.channels_with_changed_claims))
+            p.start(len(changes.channels_with_changed_content))
             sql = (
                 Claim.update()
-                .where((Claim.c.claim_hash.in_(changes.channels_with_changed_claims)))
-                .values(claims_in_channel_count=claims_in_channel_calculator)
+                .where((Claim.c.claim_hash.in_(changes.channels_with_changed_content)))
+                .values(
+                    signed_claim_count=channel_content_count_calc(stream),
+                    signed_support_count=channel_content_count_calc(Support),
+                )
             )
             p.ctx.execute(sql)
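A runnable sketch of the update pattern just above: a correlated COUNT subquery wrapped in case() so channel rows get real content counts and everything else gets 0. Simplified schema; the case([...]) list form matches the SQLAlchemy 1.3/1.4 style used in this file, CHANNEL is a stand-in for TXO_TYPES['channel'], and the is_signature_valid filter from the real channel_content_count_calc is omitted for brevity:

from sqlalchemy import MetaData, Table, Column, Integer, LargeBinary, case, func, create_engine
from sqlalchemy.future import select

metadata = MetaData()
claim = Table(
    'claim', metadata,
    Column('claim_hash', LargeBinary, primary_key=True),
    Column('claim_type', Integer),
    Column('channel_hash', LargeBinary, nullable=True),
    Column('signed_claim_count', Integer, default=0),
)
CHANNEL = 2  # stand-in for TXO_TYPES['channel']

stream = claim.alias('stream')
signed_claim_count_calc = (
    select(func.count('*'))
    .select_from(stream)
    .where(stream.c.channel_hash == claim.c.claim_hash)
    .scalar_subquery()
)

engine = create_engine('sqlite://')
metadata.create_all(engine)
with engine.begin() as conn:
    conn.execute(claim.insert(), [
        {'claim_hash': b'chan', 'claim_type': CHANNEL, 'channel_hash': None},
        {'claim_hash': b's1', 'claim_type': 1, 'channel_hash': b'chan'},
        {'claim_hash': b's2', 'claim_type': 1, 'channel_hash': b'chan'},
    ])
    conn.execute(
        claim.update().values(
            signed_claim_count=case(
                [(claim.c.claim_type == CHANNEL, signed_claim_count_calc)], else_=0
            )
        )
    )
    for row in conn.execute(select(claim.c.claim_hash, claim.c.signed_claim_count)):
        print(row.claim_hash, row.signed_claim_count)  # chan 2, s1 0, s2 0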
@@ -356,69 +388,6 @@ def process_metadata(starting_height: int, ending_height: int, initial_sync: bool):
     #         done += len(supports)
     #         p.step(done)
-
-    with progress(Event.CHANNEL_SIGN) as p:
-        p.start(get_updated_channel_key_count(p.ctx, starting_height, ending_height))
-        done, step_size = 0, 500
-        for offset in range(starting_height, ending_height+1, step_size):
-            channels = get_updated_channel_keys(p.ctx, offset, min(offset+step_size, ending_height))
-            for channel in channels:
-                claim_updates = []
-                for claim in get_signables_for_channel(p.ctx, Claim, Claim.c.claim_hash, channel):
-                    claim_updates.append(
-                        signature_validation({'pk': claim['claim_hash']}, claim, channel['public_key'])
-                    )
-                if claim_updates:
-                    p.ctx.execute(
-                        Claim.update().where(Claim.c.claim_hash == bindparam('pk')), claim_updates
-                    )
-                support_updates = []
-                for support in get_signables_for_channel(p.ctx, Support, Support.c.txo_hash, channel):
-                    support_updates.append(
-                        signature_validation({'pk': support['txo_hash']}, support, channel['public_key'])
-                    )
-                if support_updates:
-                    p.ctx.execute(
-                        Support.update().where(Support.c.txo_hash == bindparam('pk')), support_updates
-                    )
-            p.step(len(channels))
-
-    with progress(Event.CLAIM_SIGN) as p:
-        p.start(get_unvalidated_signable_count(p.ctx, Claim, starting_height, ending_height))
-        done, step_size = 0, 500
-        for offset in range(starting_height, ending_height+1, step_size):
-            claims = get_unvalidated_signables(
-                p.ctx, Claim, offset, min(offset+step_size, ending_height), Claim.c.claim_hash)
-            claim_updates = []
-            for claim in claims:
-                claim_updates.append(
-                    signature_validation({'pk': claim['claim_hash']}, claim, claim['public_key'])
-                )
-            if claim_updates:
-                p.ctx.execute(
-                    Claim.update().where(Claim.c.claim_hash == bindparam('pk')), claim_updates
-                )
-            p.step(done)
-
-    with progress(Event.SUPPORT_SIGN) as p:
-        p.start(get_unvalidated_signable_count(p.ctx, Support, starting_height, ending_height))
-        done, step_size = 0, 500
-        for offset in range(starting_height, ending_height+1, step_size):
-            supports = get_unvalidated_signables(
-                p.ctx, Support, offset, min(offset+step_size, ending_height), Support.c.txo_hash)
-            support_updates = []
-            for support in supports:
-                support_updates.append(
-                    signature_validation({'pk': support['txo_hash']}, support, support['public_key'])
-                )
-            if support_updates:
-                p.ctx.execute(
-                    Support.update().where(Support.c.txo_hash == bindparam('pk')), support_updates
-                )
-            p.step(done)

 def process_block_and_tx_filters():
@@ -498,7 +467,7 @@ class BlockchainSync(Sync):
                 if -1 < our_best_file_height < chain_file['best_height']:
                     # we have some blocks, need to figure out what we're missing
                     # call get_block_files again limited to this file and current_height
-                    file = (await self.chain.db.get_block_files(
+                    chain_file = (await self.chain.db.get_block_files(
                         file_number=chain_file['file_number'], start_height=our_best_file_height+1
                     ))[0]
                 tx_count += chain_file['txs']