Merge pull request #3039 from lbryio/fwss_fix_resolve

Include enough information in resolve protobuf so its possible to verify signatures
This commit is contained in:
Lex Berezhny 2020-09-24 20:57:56 -04:00 committed by GitHub
commit 85b8877933
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
16 changed files with 272 additions and 67 deletions

View file

@ -16,7 +16,9 @@ FILES = [
def make_short_url(r): def make_short_url(r):
try: try:
return f'{normalize_name(r["name"].decode())}#{r["shortestID"] or r["claimID"][::-1].hex()[0]}' # TODO: we describe it as normalized but the old SDK didnt do that
name = r["name"].decode().replace("\x00", "")
return f'{name}#{r["shortestID"] or r["claimID"][::-1].hex()[0]}'
except UnicodeDecodeError: except UnicodeDecodeError:
# print(f'failed making short url due to name parse error for claim_id: {r["claimID"][::-1].hex()}') # print(f'failed making short url due to name parse error for claim_id: {r["claimID"][::-1].hex()}')
return "INVALID NAME" return "INVALID NAME"

View file

@ -4,7 +4,10 @@ from binascii import hexlify
from collections import namedtuple from collections import namedtuple
from .bcd_data_stream import BCDataStream from .bcd_data_stream import BCDataStream
from .util import subclass_tuple
def subclass_tuple(name, base):
return type(name, (base,), {'__slots__': ()})
# bitcoin opcodes # bitcoin opcodes

View file

@ -8,7 +8,8 @@ from lbry.db.queries.txio import (
minimum_txo_columns, row_to_txo, minimum_txo_columns, row_to_txo,
where_unspent_txos, where_claims_with_changed_supports, where_unspent_txos, where_claims_with_changed_supports,
count_unspent_txos, where_channels_with_changed_content, count_unspent_txos, where_channels_with_changed_content,
where_abandoned_claims, count_channels_with_changed_content where_abandoned_claims, count_channels_with_changed_content,
where_claims_with_changed_reposts,
) )
from lbry.db.query_context import ProgressContext, event_emitter from lbry.db.query_context import ProgressContext, event_emitter
from lbry.db.tables import TX, TXO, Claim, Support, pg_add_claim_and_tag_constraints_and_indexes from lbry.db.tables import TX, TXO, Claim, Support, pg_add_claim_and_tag_constraints_and_indexes
@ -56,6 +57,17 @@ def staked_support_count_calc(other):
) )
def reposted_claim_count_calc(other):
repost = TXO.alias('repost')
return (
select(func.coalesce(func.count(repost.c.reposted_claim_hash), 0))
.where(
(repost.c.reposted_claim_hash == other.c.claim_hash) &
(repost.c.spent_height == 0)
).scalar_subquery()
)
def make_label(action, blocks): def make_label(action, blocks):
if blocks[0] == blocks[-1]: if blocks[0] == blocks[-1]:
return f"{action} {blocks[0]:>6}" return f"{action} {blocks[0]:>6}"
@ -73,6 +85,7 @@ def select_claims_for_saving(
*minimum_txo_columns, TXO.c.claim_hash, *minimum_txo_columns, TXO.c.claim_hash,
staked_support_amount_calc(TXO).label('staked_support_amount'), staked_support_amount_calc(TXO).label('staked_support_amount'),
staked_support_count_calc(TXO).label('staked_support_count'), staked_support_count_calc(TXO).label('staked_support_count'),
reposted_claim_count_calc(TXO).label('reposted_count'),
TXO.c.signature, TXO.c.signature_digest, TXO.c.signature, TXO.c.signature_digest,
case([( case([(
TXO.c.channel_hash.isnot(None), TXO.c.channel_hash.isnot(None),
@ -95,6 +108,7 @@ def row_to_claim_for_saving(row) -> Tuple[Output, dict]:
return row_to_txo(row), { return row_to_txo(row), {
'staked_support_amount': int(row.staked_support_amount), 'staked_support_amount': int(row.staked_support_amount),
'staked_support_count': int(row.staked_support_count), 'staked_support_count': int(row.staked_support_count),
'reposted_count': int(row.reposted_count),
'signature': row.signature, 'signature': row.signature,
'signature_digest': row.signature_digest, 'signature_digest': row.signature_digest,
'channel_public_key': row.channel_public_key 'channel_public_key': row.channel_public_key
@ -237,6 +251,18 @@ def update_stakes(blocks: Tuple[int, int], claims: int, p: ProgressContext):
p.step(result.rowcount) p.step(result.rowcount)
@event_emitter("blockchain.sync.claims.reposts", "claims")
def update_reposts(blocks: Tuple[int, int], claims: int, p: ProgressContext):
p.start(claims)
sql = (
Claim.update()
.where(where_claims_with_changed_reposts(blocks))
.values(reposted_count=reposted_claim_count_calc(Claim))
)
result = p.ctx.execute(sql)
p.step(result.rowcount)
@event_emitter("blockchain.sync.claims.channels", "channels") @event_emitter("blockchain.sync.claims.channels", "channels")
def update_channel_stats(blocks: Tuple[int, int], initial_sync: int, p: ProgressContext): def update_channel_stats(blocks: Tuple[int, int], initial_sync: int, p: ProgressContext):
update_sql = Claim.update().values( update_sql = Claim.update().values(

View file

@ -217,6 +217,9 @@ class BlockchainSync(Sync):
async def count_claims_with_changed_supports(self, blocks) -> int: async def count_claims_with_changed_supports(self, blocks) -> int:
return await self.db.run(q.count_claims_with_changed_supports, blocks) return await self.db.run(q.count_claims_with_changed_supports, blocks)
async def count_claims_with_changed_reposts(self, blocks) -> int:
return await self.db.run(q.count_claims_with_changed_reposts, blocks)
async def count_channels_with_changed_content(self, blocks) -> int: async def count_channels_with_changed_content(self, blocks) -> int:
return await self.db.run(q.count_channels_with_changed_content, blocks) return await self.db.run(q.count_channels_with_changed_content, blocks)
@ -226,13 +229,13 @@ class BlockchainSync(Sync):
) )
async def sync_claims(self, blocks) -> bool: async def sync_claims(self, blocks) -> bool:
delete_claims = takeovers = claims_with_changed_supports = 0 delete_claims = takeovers = claims_with_changed_supports = claims_with_changed_reposts = 0
initial_sync = not await self.db.has_claims() initial_sync = not await self.db.has_claims()
with Progress(self.db.message_queue, CLAIMS_INIT_EVENT) as p: with Progress(self.db.message_queue, CLAIMS_INIT_EVENT) as p:
if initial_sync: if initial_sync:
total, batches = await self.distribute_unspent_txos(CLAIM_TYPE_CODES) total, batches = await self.distribute_unspent_txos(CLAIM_TYPE_CODES)
elif blocks: elif blocks:
p.start(4) p.start(5)
# 1. content claims to be inserted or updated # 1. content claims to be inserted or updated
total = await self.count_unspent_txos( total = await self.count_unspent_txos(
CLAIM_TYPE_CODES, blocks, missing_or_stale_in_claims_table=True CLAIM_TYPE_CODES, blocks, missing_or_stale_in_claims_table=True
@ -247,6 +250,10 @@ class BlockchainSync(Sync):
claims_with_changed_supports = await self.count_claims_with_changed_supports(blocks) claims_with_changed_supports = await self.count_claims_with_changed_supports(blocks)
total += claims_with_changed_supports total += claims_with_changed_supports
p.step() p.step()
# 4. claims to be updated with new repost totals
claims_with_changed_reposts = await self.count_claims_with_changed_reposts(blocks)
total += claims_with_changed_reposts
p.step()
# 5. claims to be updated due to name takeovers # 5. claims to be updated due to name takeovers
takeovers = await self.count_takeovers(blocks) takeovers = await self.count_takeovers(blocks)
total += takeovers total += takeovers
@ -270,6 +277,8 @@ class BlockchainSync(Sync):
await self.db.run(claim_phase.update_takeovers, blocks, takeovers) await self.db.run(claim_phase.update_takeovers, blocks, takeovers)
if claims_with_changed_supports: if claims_with_changed_supports:
await self.db.run(claim_phase.update_stakes, blocks, claims_with_changed_supports) await self.db.run(claim_phase.update_stakes, blocks, claims_with_changed_supports)
if claims_with_changed_reposts:
await self.db.run(claim_phase.update_reposts, blocks, claims_with_changed_reposts)
if initial_sync: if initial_sync:
await self.db.run(claim_phase.claims_constraints_and_indexes) await self.db.run(claim_phase.claims_constraints_and_indexes)
else: else:

View file

@ -16,10 +16,6 @@ class ReadOnlyList(Sequence[T]):
return len(self.lst) return len(self.lst)
def subclass_tuple(name, base):
return type(name, (base,), {'__slots__': ()})
class ArithUint256: class ArithUint256:
# https://github.com/bitcoin/bitcoin/blob/master/src/arith_uint256.cpp # https://github.com/bitcoin/bitcoin/blob/master/src/arith_uint256.cpp

View file

@ -53,7 +53,7 @@ SEARCH_INTEGER_PARAMS = {
'height', 'creation_height', 'activation_height', 'expiration_height', 'height', 'creation_height', 'activation_height', 'expiration_height',
'timestamp', 'creation_timestamp', 'duration', 'release_time', 'fee_amount', 'timestamp', 'creation_timestamp', 'duration', 'release_time', 'fee_amount',
'tx_position', 'channel_join', 'reposted', 'tx_position', 'channel_join', 'reposted',
'amount', 'effective_amount', 'support_amount', 'amount', 'staked_amount', 'support_amount',
'trending_group', 'trending_mixed', 'trending_group', 'trending_mixed',
'trending_local', 'trending_global', 'trending_local', 'trending_global',
} }

View file

@ -1,58 +1,63 @@
import logging import logging
import itertools import itertools
from operator import itemgetter
from typing import List, Dict from typing import List, Dict
from lbry.schema.url import URL from lbry.schema.url import URL
from lbry.schema.result import Outputs as ResultOutput from lbry.schema.result import Outputs as ResultOutput
from lbry.error import ResolveCensoredError from lbry.error import ResolveCensoredError
from lbry.blockchain.transaction import Output from lbry.blockchain.transaction import Output
from . import rows_to_txos
from ..query_context import context from ..query_context import context
from .search import search_claims from .search import select_claims
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
def _get_referenced_rows(txo_rows: List[dict], censor_channels: List[bytes]): def resolve_claims(**constraints):
# censor = context().get_resolve_censor() censor = context().get_resolve_censor()
repost_hashes = set(filter(None, map(itemgetter('reposted_claim_hash'), txo_rows))) rows = context().fetchall(select_claims(**constraints))
return rows_to_txos(rows), censor
def _get_referenced_rows(txo_rows: List[Output], censor_channels: List[bytes]):
repost_hashes = set(txo.reposted_claim.claim_hash for txo in txo_rows if txo.reposted_claim)
channel_hashes = set(itertools.chain( channel_hashes = set(itertools.chain(
filter(None, map(itemgetter('channel_hash'), txo_rows)), (txo.channel.claim_hash for txo in txo_rows if txo.channel),
censor_channels censor_channels
)) ))
reposted_txos = [] reposted_txos = []
if repost_hashes: if repost_hashes:
reposted_txos = search_claims(**{'claim.claim_hash__in': repost_hashes}) reposted_txos = resolve_claims(**{'claim.claim_hash__in': repost_hashes})
channel_hashes |= set(filter(None, map(itemgetter('channel_hash'), reposted_txos))) if reposted_txos:
reposted_txos = reposted_txos[0]
channel_hashes |= set(txo.channel.claim_hash for txo in reposted_txos if txo.channel)
channel_txos = [] channel_txos = []
if channel_hashes: if channel_hashes:
channel_txos = search_claims(**{'claim.claim_hash__in': channel_hashes}) channel_txos = resolve_claims(**{'claim.claim_hash__in': channel_hashes})
channel_txos = channel_txos[0] if channel_txos else []
# channels must come first for client side inflation to work properly # channels must come first for client side inflation to work properly
return channel_txos + reposted_txos return channel_txos + reposted_txos
def protobuf_resolve(urls, **kwargs) -> str: def protobuf_resolve(urls, **kwargs) -> str:
return ResultOutput.to_base64([resolve_url(raw_url) for raw_url in urls], []) txo_rows = [resolve_url(raw_url) for raw_url in urls]
extra_txo_rows = _get_referenced_rows(
[txo_row for txo_row in txo_rows if isinstance(txo_row, Output)],
[txo.censor_hash for txo in txo_rows if isinstance(txo, ResolveCensoredError)]
)
return ResultOutput.to_base64(txo_rows, extra_txo_rows)
def resolve(urls, **kwargs) -> Dict[str, Output]: def resolve(urls, **kwargs) -> Dict[str, Output]:
return {url: resolve_url(url) for url in urls} return {url: resolve_url(url) for url in urls}
#txo_rows = [resolve_url(raw_url) for raw_url in urls]
#extra_txo_rows = _get_referenced_rows(
# [txo for txo in txo_rows if isinstance(txo, dict)],
# [txo.censor_hash for txo in txo_rows if isinstance(txo, ResolveCensoredError)]
#)
#return txo_rows, extra_txo_rows
def resolve_url(raw_url): def resolve_url(raw_url):
censor = context().get_resolve_censor()
try: try:
url = URL.parse(raw_url) url = URL.parse(raw_url)
except ValueError as e: except ValueError as e:
@ -66,13 +71,12 @@ def resolve_url(raw_url):
q['is_controlling'] = True q['is_controlling'] = True
else: else:
q['order_by'] = ['^creation_height'] q['order_by'] = ['^creation_height']
#matches = search_claims(censor, **q, limit=1) matches, censor = resolve_claims(**q, limit=1)
matches = search_claims(**q, limit=1)[0]
if matches: if matches:
channel = matches[0] channel = matches[0]
elif censor.censored: elif censor.censored:
return ResolveCensoredError(raw_url, next(iter(censor.censored))) return ResolveCensoredError(raw_url, next(iter(censor.censored)))
else: elif not channel:
return LookupError(f'Could not find channel in "{raw_url}".') return LookupError(f'Could not find channel in "{raw_url}".')
if url.has_stream: if url.has_stream:
@ -83,10 +87,11 @@ def resolve_url(raw_url):
q['is_signature_valid'] = True q['is_signature_valid'] = True
elif set(q) == {'name'}: elif set(q) == {'name'}:
q['is_controlling'] = True q['is_controlling'] = True
# matches = search_claims(censor, **q, limit=1) matches, censor = resolve_claims(**q, limit=1)
matches = search_claims(**q, limit=1)[0]
if matches: if matches:
return matches[0] stream = matches[0]
stream.channel = channel
return stream
elif censor.censored: elif censor.censored:
return ResolveCensoredError(raw_url, next(iter(censor.censored))) return ResolveCensoredError(raw_url, next(iter(censor.censored)))
else: else:

View file

@ -32,6 +32,18 @@ BASE_SELECT_SUPPORT_COLUMNS = BASE_SELECT_TXO_COLUMNS + [
] ]
def compat_layer(**constraints):
# for old sdk, to be removed later
replacements = {"effective_amount": "staked_amount"}
for old_key, new_key in replacements.items():
if old_key in constraints:
constraints[new_key] = constraints.pop(old_key)
order_by = constraints.get("order_by", [])
if old_key in order_by:
constraints["order_by"] = [order_key if order_key != old_key else new_key for order_key in order_by]
return constraints
def select_supports(cols: List = None, **constraints) -> Select: def select_supports(cols: List = None, **constraints) -> Select:
if cols is None: if cols is None:
cols = BASE_SELECT_SUPPORT_COLUMNS cols = BASE_SELECT_SUPPORT_COLUMNS
@ -63,6 +75,7 @@ BASE_SELECT_CLAIM_COLUMNS = BASE_SELECT_TXO_COLUMNS + [
Claim.c.activation_height, Claim.c.activation_height,
Claim.c.takeover_height, Claim.c.takeover_height,
Claim.c.creation_height, Claim.c.creation_height,
Claim.c.expiration_height,
Claim.c.is_controlling, Claim.c.is_controlling,
Claim.c.channel_hash, Claim.c.channel_hash,
Claim.c.reposted_count, Claim.c.reposted_count,
@ -82,6 +95,7 @@ BASE_SELECT_CLAIM_COLUMNS = BASE_SELECT_TXO_COLUMNS + [
def select_claims(cols: List = None, for_count=False, **constraints) -> Select: def select_claims(cols: List = None, for_count=False, **constraints) -> Select:
constraints = compat_layer(**constraints)
if cols is None: if cols is None:
cols = BASE_SELECT_CLAIM_COLUMNS cols = BASE_SELECT_CLAIM_COLUMNS
if 'order_by' in constraints: if 'order_by' in constraints:
@ -123,7 +137,7 @@ def select_claims(cols: List = None, for_count=False, **constraints) -> Select:
constraints['offset'] = int(constraints.pop('sequence')) - 1 constraints['offset'] = int(constraints.pop('sequence')) - 1
constraints['limit'] = 1 constraints['limit'] = 1
if 'amount_order' in constraints: if 'amount_order' in constraints:
constraints['order_by'] = 'effective_amount DESC' constraints['order_by'] = 'staked_amount DESC'
constraints['offset'] = int(constraints.pop('amount_order')) - 1 constraints['offset'] = int(constraints.pop('amount_order')) - 1
constraints['limit'] = 1 constraints['limit'] = 1

View file

@ -193,6 +193,31 @@ def count_channels_with_changed_content(blocks: Optional[Tuple[int, int]]):
return context().fetchone(sql)['total'] return context().fetchone(sql)['total']
def where_changed_repost_txos(blocks: Optional[Tuple[int, int]]):
return (
(TXO.c.txo_type == TXO_TYPES['repost']) & (
between(TXO.c.height, blocks[0], blocks[-1]) |
between(TXO.c.spent_height, blocks[0], blocks[-1])
)
)
def where_claims_with_changed_reposts(blocks: Optional[Tuple[int, int]]):
return Claim.c.claim_hash.in_(
select(TXO.c.reposted_claim_hash).where(
where_changed_repost_txos(blocks)
)
)
def count_claims_with_changed_reposts(blocks: Optional[Tuple[int, int]]):
sql = (
select(func.count(distinct(TXO.c.reposted_claim_hash)).label('total'))
.where(where_changed_repost_txos(blocks))
)
return context().fetchone(sql)['total']
def select_transactions(cols, account_ids=None, **constraints): def select_transactions(cols, account_ids=None, **constraints):
s: Select = select(*cols).select_from(TX) s: Select = select(*cols).select_from(TX)
if not {'tx_hash', 'tx_hash__in'}.intersection(constraints): if not {'tx_hash', 'tx_hash__in'}.intersection(constraints):
@ -372,7 +397,7 @@ META_ATTRS = (
'activation_height', 'takeover_height', 'creation_height', 'staked_amount', 'activation_height', 'takeover_height', 'creation_height', 'staked_amount',
'short_url', 'canonical_url', 'staked_support_amount', 'staked_support_count', 'short_url', 'canonical_url', 'staked_support_amount', 'staked_support_count',
'signed_claim_count', 'signed_support_count', 'is_signature_valid', 'signed_claim_count', 'signed_support_count', 'is_signature_valid',
'reposted_count', 'reposted_count', 'expiration_height'
) )

View file

@ -476,6 +476,7 @@ class BulkLoader:
'channel_hash': None, 'channel_hash': None,
'signature': None, 'signature': None,
'signature_digest': None, 'signature_digest': None,
'reposted_claim_hash': None,
'public_key': None, 'public_key': None,
'public_key_hash': None 'public_key_hash': None
} }
@ -488,6 +489,8 @@ class BulkLoader:
row['public_key_hash'] = self.ledger.address_to_hash160( row['public_key_hash'] = self.ledger.address_to_hash160(
self.ledger.public_key_to_address(claim.channel.public_key_bytes) self.ledger.public_key_to_address(claim.channel.public_key_bytes)
) )
elif claim.is_repost:
row['reposted_claim_hash'] = claim.repost.reference.claim_hash
else: else:
row['txo_type'] = TXO_TYPES['stream'] row['txo_type'] = TXO_TYPES['stream']
elif txo.is_support: elif txo.is_support:
@ -511,7 +514,7 @@ class BulkLoader:
return row return row
def claim_to_rows( def claim_to_rows(
self, txo: Output, staked_support_amount: int, staked_support_count: int, self, txo: Output, staked_support_amount: int, staked_support_count: int, reposted_count: int,
signature: bytes = None, signature_digest: bytes = None, channel_public_key: bytes = None, signature: bytes = None, signature_digest: bytes = None, channel_public_key: bytes = None,
) -> Tuple[dict, List]: ) -> Tuple[dict, List]:
@ -540,6 +543,7 @@ class BulkLoader:
'fee_currency': None, 'fee_currency': None,
# reposts # reposts
'reposted_claim_hash': None, 'reposted_claim_hash': None,
'reposted_count': reposted_count,
# signed claims # signed claims
'channel_hash': None, 'channel_hash': None,
'is_signature_valid': None, 'is_signature_valid': None,

View file

@ -133,6 +133,9 @@ TXO = Table(
Column('signature', LargeBinary, nullable=True), Column('signature', LargeBinary, nullable=True),
Column('signature_digest', LargeBinary, nullable=True), Column('signature_digest', LargeBinary, nullable=True),
# reposts
Column('reposted_claim_hash', LargeBinary, nullable=True),
# channels # channels
Column('public_key', LargeBinary, nullable=True), Column('public_key', LargeBinary, nullable=True),
Column('public_key_hash', LargeBinary, nullable=True), Column('public_key_hash', LargeBinary, nullable=True),
@ -158,6 +161,13 @@ pg_add_txo_constraints_and_indexes = [
f"INCLUDE (claim_hash) WHERE txo_type={TXO_TYPES['support']};", f"INCLUDE (claim_hash) WHERE txo_type={TXO_TYPES['support']};",
f"CREATE INDEX txo_spent_supports_by_height ON txo (spent_height DESC) " f"CREATE INDEX txo_spent_supports_by_height ON txo (spent_height DESC) "
f"INCLUDE (claim_hash) WHERE txo_type={TXO_TYPES['support']};", f"INCLUDE (claim_hash) WHERE txo_type={TXO_TYPES['support']};",
# for finding claims which need repost totals re-calculated in a block range
f"CREATE INDEX txo_added_reposts_by_height ON txo (height DESC) "
f"INCLUDE (reposted_claim_hash) WHERE txo_type={TXO_TYPES['repost']};",
f"CREATE INDEX txo_spent_reposts_by_height ON txo (spent_height DESC) "
f"INCLUDE (reposted_claim_hash) WHERE txo_type={TXO_TYPES['repost']};",
"CREATE INDEX txo_reposted_claim_hash ON txo (reposted_claim_hash)"
"WHERE reposted_claim_hash IS NOT NULL AND spent_height = 0;",
"CREATE INDEX txo_height ON txo (height);", "CREATE INDEX txo_height ON txo (height);",
] ]
@ -209,7 +219,6 @@ Claim = Table(
Column('description', Text, nullable=True), Column('description', Text, nullable=True),
Column('claim_type', SmallInteger), Column('claim_type', SmallInteger),
Column('claim_reposted_count', Integer, server_default='0'),
Column('staked_support_count', Integer, server_default='0'), Column('staked_support_count', Integer, server_default='0'),
Column('staked_support_amount', BigInteger, server_default='0'), Column('staked_support_amount', BigInteger, server_default='0'),
@ -221,8 +230,8 @@ Claim = Table(
Column('duration', Integer, nullable=True), Column('duration', Integer, nullable=True),
# reposts # reposts
Column('reposted_claim_hash', LargeBinary, nullable=True), Column('reposted_claim_hash', LargeBinary, nullable=True), # on claim doing the repost
Column('reposted_count', Integer, server_default='0'), Column('reposted_count', Integer, server_default='0'), # on claim being reposted
# claims which are channels # claims which are channels
Column('signed_claim_count', Integer, server_default='0'), Column('signed_claim_count', Integer, server_default='0'),
@ -255,6 +264,8 @@ pg_add_claim_and_tag_constraints_and_indexes = [
# used to count()/sum() claims signed by channel # used to count()/sum() claims signed by channel
"CREATE INDEX signed_content ON claim (channel_hash) " "CREATE INDEX signed_content ON claim (channel_hash) "
"INCLUDE (amount) WHERE is_signature_valid;", "INCLUDE (amount) WHERE is_signature_valid;",
# used to count()/sum() reposted claims
"CREATE INDEX reposted_content ON claim (reposted_claim_hash);",
# basic tag indexes # basic tag indexes
"ALTER TABLE tag ADD PRIMARY KEY (claim_hash, tag);", "ALTER TABLE tag ADD PRIMARY KEY (claim_hash, tag);",
"CREATE INDEX tags ON tag (tag) INCLUDE (claim_hash);", "CREATE INDEX tags ON tag (tag) INCLUDE (claim_hash);",

View file

@ -1,7 +1,6 @@
import base64 import base64
import struct import struct
from typing import List from typing import List
from binascii import hexlify
from itertools import chain from itertools import chain
from lbry.error import ResolveCensoredError from lbry.error import ResolveCensoredError
@ -16,10 +15,10 @@ BLOCKED = ErrorMessage.Code.Name(ErrorMessage.BLOCKED)
def set_reference(reference, claim_hash, rows): def set_reference(reference, claim_hash, rows):
if claim_hash: if claim_hash:
for txo in rows: for txo in rows:
if claim_hash == txo['claim_hash']: if claim_hash == txo.claim_hash:
reference.tx_hash = txo['txo_hash'][:32] reference.tx_hash = txo.tx_ref.hash
reference.nout = struct.unpack('<I', txo['txo_hash'][32:])[0] reference.nout = txo.position
reference.height = txo['height'] reference.height = txo.spent_height
return return
@ -193,11 +192,11 @@ class Outputs:
if txo.meta['canonical_url'] is not None: if txo.meta['canonical_url'] is not None:
txo_message.claim.canonical_url = txo.meta['canonical_url'] txo_message.claim.canonical_url = txo.meta['canonical_url']
txo_message.claim.is_controlling = bool(txo.meta['takeover_height']) txo_message.claim.is_controlling = bool(txo.meta['takeover_height'])
#if txo['last_take_over_height'] is not None: if txo_message.claim.is_controlling:
# txo_message.claim.take_over_height = txo['last_take_over_height'] txo_message.claim.take_over_height = txo.meta['takeover_height']
txo_message.claim.creation_height = txo.meta['creation_height'] txo_message.claim.creation_height = txo.meta['creation_height']
txo_message.claim.activation_height = txo.meta['activation_height'] txo_message.claim.activation_height = txo.meta['activation_height']
#txo_message.claim.expiration_height = txo['expiration_height'] txo_message.claim.expiration_height = txo.meta['expiration_height']
if txo.meta['signed_claim_count'] is not None: if txo.meta['signed_claim_count'] is not None:
txo_message.claim.claims_in_channel = txo.meta['signed_claim_count'] txo_message.claim.claims_in_channel = txo.meta['signed_claim_count']
txo_message.claim.effective_amount = txo.meta['staked_amount'] txo_message.claim.effective_amount = txo.meta['staked_amount']
@ -206,5 +205,6 @@ class Outputs:
#txo_message.claim.trending_mixed = txo['trending_mixed'] #txo_message.claim.trending_mixed = txo['trending_mixed']
#txo_message.claim.trending_local = txo['trending_local'] #txo_message.claim.trending_local = txo['trending_local']
#txo_message.claim.trending_global = txo['trending_global'] #txo_message.claim.trending_global = txo['trending_global']
#set_reference(txo_message.claim.channel, txo['channel_hash'], extra_txo_rows) if txo.channel:
set_reference(txo_message.claim.channel, txo.claim.signing_channel_hash, extra_txo_rows)
#set_reference(txo_message.claim.repost, txo['reposted_claim_hash'], extra_txo_rows) #set_reference(txo_message.claim.repost, txo['reposted_claim_hash'], extra_txo_rows)

View file

@ -137,12 +137,16 @@ class Daemon:
data = await request.json() data = await request.json()
params = data.get('params', {}) params = data.get('params', {})
method = getattr(self.api, data['method']) method = getattr(self.api, data['method'])
try:
result = await method(**params) result = await method(**params)
encoded_result = jsonrpc_dumps_pretty(result, service=self.service) encoded_result = jsonrpc_dumps_pretty(result, service=self.service)
return Response( return Response(
text=encoded_result, text=encoded_result,
content_type='application/json' content_type='application/json'
) )
except Exception as e:
log.exception("RPC error")
raise e
async def on_connect(self, request): async def on_connect(self, request):
web_socket = WebSocketManager() web_socket = WebSocketManager()

View file

@ -792,7 +792,7 @@ class EventGenerator:
yield from self.claims_main_start() yield from self.claims_main_start()
yield from self.claims_insert(self.claims) yield from self.claims_insert(self.claims)
if self.initial_sync: if self.initial_sync:
yield from self.generate("blockchain.sync.claims.indexes", ("steps",), 0, None, (9,), (1,)) yield from self.generate("blockchain.sync.claims.indexes", ("steps",), 0, None, (10,), (1,))
else: else:
yield from self.claims_takeovers(self.takeovers) yield from self.claims_takeovers(self.takeovers)
yield from self.claims_stakes() yield from self.claims_stakes()
@ -920,12 +920,12 @@ class EventGenerator:
def spends_steps(self): def spends_steps(self):
yield from self.generate( yield from self.generate(
"blockchain.sync.spends.main", ("steps",), 0, None, "blockchain.sync.spends.main", ("steps",), 0, None,
(17 if self.initial_sync else 5,), (20 if self.initial_sync else 5,),
(1,) (1,)
) )
def claims_init(self): def claims_init(self):
yield from self.generate("blockchain.sync.claims.init", ("steps",), 0, None, (4,), (1,)) yield from self.generate("blockchain.sync.claims.init", ("steps",), 0, None, (5,), (1,))
def claims_main_start(self): def claims_main_start(self):
total = ( total = (

View file

@ -9,7 +9,8 @@ from distutils.dir_util import copy_tree, remove_tree
from lbry import Config, Database, RegTestLedger, Transaction, Output, Input from lbry import Config, Database, RegTestLedger, Transaction, Output, Input
from lbry.crypto.base58 import Base58 from lbry.crypto.base58 import Base58
from lbry.schema.claim import Stream, Channel from lbry.schema.claim import Claim, Stream, Channel
from lbry.schema.result import Outputs
from lbry.schema.support import Support from lbry.schema.support import Support
from lbry.error import LbrycrdEventSubscriptionError, LbrycrdUnauthorizedError from lbry.error import LbrycrdEventSubscriptionError, LbrycrdUnauthorizedError
from lbry.blockchain.lbrycrd import Lbrycrd from lbry.blockchain.lbrycrd import Lbrycrd
@ -114,9 +115,13 @@ class SyncingBlockchainTestCase(BasicBlockchainTestCase):
async def create_claim( async def create_claim(
self, title='', amount='0.01', name=None, author='', desc='', self, title='', amount='0.01', name=None, author='', desc='',
claim_id_startswith='', sign=None, is_channel=False) -> str: claim_id_startswith='', sign=None, is_channel=False, repost=None) -> str:
name = name or ('@foo' if is_channel else 'foo') name = name or ('@foo' if is_channel else 'foo')
if not claim_id_startswith and sign is None and not is_channel: if not claim_id_startswith and sign is None and not is_channel:
if repost:
claim = Claim()
claim.repost.reference.claim_id = repost
else:
claim = Stream().update(title=title, author=author, description=desc).claim claim = Stream().update(title=title, author=author, description=desc).claim
return await self.chain.claim_name( return await self.chain.claim_name(
name, hexlify(claim.to_bytes()).decode(), amount name, hexlify(claim.to_bytes()).decode(), amount
@ -400,11 +405,7 @@ class TestMultiBlockFileSyncing(BasicBlockchainTestCase):
signed = await self.chain.sign_raw_transaction_with_wallet(funded['hex']) signed = await self.chain.sign_raw_transaction_with_wallet(funded['hex'])
await self.chain.send_raw_transaction(signed['hex']) await self.chain.send_raw_transaction(signed['hex'])
tx = Transaction(unhexlify(signed['hex'])) tx = Transaction(unhexlify(signed['hex']))
claim = None claim = self.find_claim_txo(tx)
for txo in tx.outputs:
if txo.is_claim:
claim = txo
break
support_tx = Transaction().add_outputs([ support_tx = Transaction().add_outputs([
Output.pay_support_pubkey_hash(CENT, claim.claim_name, claim.claim_id, address), Output.pay_support_pubkey_hash(CENT, claim.claim_name, claim.claim_id, address),
]) ])
@ -415,7 +416,7 @@ class TestMultiBlockFileSyncing(BasicBlockchainTestCase):
# supports \w data aren't supported until block 350, fast forward a little # supports \w data aren't supported until block 350, fast forward a little
await self.chain.generate(60) await self.chain.generate(60)
claim = tx.outputs[0] claim = self.find_claim_txo(tx)
tx = Transaction().add_outputs([ tx = Transaction().add_outputs([
Output.pay_support_pubkey_hash(CENT, claim.claim_name, claim.claim_id, address), Output.pay_support_pubkey_hash(CENT, claim.claim_name, claim.claim_id, address),
Output.pay_support_data_pubkey_hash( Output.pay_support_data_pubkey_hash(
@ -875,6 +876,22 @@ class TestGeneralBlockchainSync(SyncingBlockchainTestCase):
self.assertEqual(0, r.meta['signed_claim_count']) # channel2 lost abandoned claim self.assertEqual(0, r.meta['signed_claim_count']) # channel2 lost abandoned claim
self.assertEqual(0, r.meta['signed_support_count']) self.assertEqual(0, r.meta['signed_support_count'])
async def test_reposts(self):
self.stream1 = await self.get_claim(await self.create_claim())
claim_id = self.stream1.claim_id
# in same block
self.stream2 = await self.get_claim(await self.create_claim(repost=claim_id))
await self.generate(1)
r, = await self.db.search_claims(claim_id=claim_id)
self.assertEqual(1, r.meta['reposted_count'])
# in subsequent block
self.stream3 = await self.get_claim(await self.create_claim(repost=claim_id))
await self.generate(1)
r, = await self.db.search_claims(claim_id=claim_id)
self.assertEqual(2, r.meta['reposted_count'])
async def resolve_to_claim_id(self, url): async def resolve_to_claim_id(self, url):
return (await self.db.resolve([url]))[url].claim_id return (await self.db.resolve([url]))[url].claim_id
@ -897,6 +914,73 @@ class TestGeneralBlockchainSync(SyncingBlockchainTestCase):
self.assertEqual(stream_c.claim_id, await self.resolve_to_claim_id("@foo#a/foo#c")) self.assertEqual(stream_c.claim_id, await self.resolve_to_claim_id("@foo#a/foo#c"))
self.assertEqual(stream_cd.claim_id, await self.resolve_to_claim_id("@foo#ab/foo#cd")) self.assertEqual(stream_cd.claim_id, await self.resolve_to_claim_id("@foo#ab/foo#cd"))
async def test_resolve_protobuf_includes_enough_information_for_signature_validation(self):
chan_ab = await self.get_claim(
await self.create_claim(claim_id_startswith='ab', is_channel=True))
await self.create_claim(claim_id_startswith='cd', sign=chan_ab)
await self.generate(1)
resolutions = await self.db.protobuf_resolve(["@foo#ab/foo#cd"])
resolutions = Outputs.from_base64(resolutions)
txs = await self.db.get_transactions(tx_hash__in=[tx[0] for tx in resolutions.txs])
self.assertEqual(len(txs), 2)
resolutions = resolutions.inflate(txs)
claim = resolutions[0][0]
self.assertTrue(claim.is_signed_by(claim.channel, self.chain.ledger))
async def test_resolve_not_found(self):
await self.get_claim(await self.create_claim(claim_id_startswith='ab', is_channel=True))
await self.generate(1)
resolutions = Outputs.from_base64(await self.db.protobuf_resolve(["@foo#ab/notfound"]))
self.assertEqual(resolutions.txos[0].error.text, "Could not find claim at \"@foo#ab/notfound\".")
resolutions = Outputs.from_base64(await self.db.protobuf_resolve(["@notfound#ab/notfound"]))
self.assertEqual(resolutions.txos[0].error.text, "Could not find channel in \"@notfound#ab/notfound\".")
async def test_claim_search_effective_amount(self):
claim = await self.get_claim(await self.create_claim(claim_id_startswith='ab', is_channel=True, amount='0.42'))
await self.generate(1)
results = await self.db.search_claims(staked_amount=42000000)
self.assertEqual(claim.claim_id, results[0].claim_id)
# compat layer
results = await self.db.search_claims(effective_amount=42000000, amount_order=1, order_by=["effective_amount"])
self.assertEqual(claim.claim_id, results[0].claim_id)
async def test_meta_fields_are_translated_to_protobuf(self):
chan_ab = await self.get_claim(
await self.create_claim(claim_id_startswith='ab', is_channel=True))
await self.create_claim(claim_id_startswith='cd', sign=chan_ab)
await self.generate(1)
resolutions = Outputs.from_base64(await self.db.protobuf_resolve(["@foo#ab/foo#cd"]))
claim = resolutions.txos[0].claim
self.assertEqual(claim.effective_amount, 1000000)
self.assertEqual(claim.expiration_height, 602)
self.assertEqual(claim.take_over_height, 102)
self.assertTrue(claim.is_controlling)
# takeover
await self.create_claim(claim_id_startswith='ad', sign=chan_ab, amount='1.1')
await self.generate(1)
resolutions = Outputs.from_base64(await self.db.protobuf_resolve(["@foo#ab/foo#cd"]))
claim = resolutions.txos[0].claim
self.assertEqual(claim.take_over_height, 0)
self.assertFalse(claim.is_controlling)
resolutions = Outputs.from_base64(await self.db.protobuf_resolve(["@foo#ab/foo#ad"]))
claim = resolutions.txos[0].claim
self.assertEqual(claim.take_over_height, 103)
self.assertTrue(claim.is_controlling)
async def test_uris_and_uppercase(self):
# fixme: this is a bug but its how the old SDK expects it (non-normalized URIs)
# to be decided if we are going to ignore it or how its used today
chan_ab = await self.get_claim(
await self.create_claim(claim_id_startswith='ab', is_channel=True, name="@Chá"))
await self.create_claim(claim_id_startswith='cd', sign=chan_ab, name="Hortelã")
await self.generate(1)
resolutions = Outputs.from_base64(await self.db.protobuf_resolve(["Hortelã"]))
self.assertEqual(1, len(resolutions.txos))
claim = resolutions.txos[0].claim
self.assertEqual("@Chá#a/Hortelã#c", claim.canonical_url)
self.assertEqual("Hortelã#c", claim.short_url)
class TestClaimtrieSync(SyncingBlockchainTestCase): class TestClaimtrieSync(SyncingBlockchainTestCase):

View file

@ -1,4 +1,5 @@
from unittest import TestCase from unittest import TestCase
from pickle import dumps
from binascii import hexlify, unhexlify from binascii import hexlify, unhexlify
from lbry import Config, Ledger, Transaction from lbry import Config, Ledger, Transaction
@ -253,3 +254,24 @@ class TestTransactionSerialization(TestCase):
txo = t.outputs[0] txo = t.outputs[0]
self.assertEqual(txo.script.template.name, 'pay_script_hash') self.assertEqual(txo.script.template.name, 'pay_script_hash')
self.assertEqual(txo.get_address(self.ledger), 'rVBhueRT9E8RPdVcpCdXV5gRiiXVjE6VD9') self.assertEqual(txo.get_address(self.ledger), 'rVBhueRT9E8RPdVcpCdXV5gRiiXVjE6VD9')
def test_tx_with_claim_can_pickle(self):
# used to fail with this error:
# _pickle.PicklingError: Can't pickle <class 'lbry.blockchain.util.PUSH_SINGLE'>:
# attribute lookup PUSH_SINGLE on lbry.blockchain.util failed
raw = unhexlify(
"01000000012433e1b327603843b083344dbae5306ff7927f87ebbc5ae9eb50856c5b53fd1d000000006a4"
"7304402201a91e1023d11c383a11e26bf8f9034087b15d8ada78fa565e0610455ffc8505e0220038a63a6"
"ecb399723d4f1f78a20ddec0a78bf8fb6c75e63e166ef780f3944fbf0121021810150a2e4b088ec51b20c"
"be1b335962b634545860733367824d5dc3eda767dffffffff028096980000000000fdff00b50463617473"
"4cdc080110011a7808011230080410011a084d616361726f6e6922002a003214416c6c207269676874732"
"072657365727665642e38004a0052005a001a42080110011a30add80aaf02559ba09853636a0658c42b72"
"7cb5bb4ba8acedb4b7fe656065a47a31878dbf9912135ddb9e13806cc1479d220a696d6167652f6a70656"
"72a5c080110031a404180cc0fa4d3839ee29cca866baed25fafb43fca1eb3b608ee889d351d3573d042c7"
"b83e2e643db0d8e062a04e6e9ae6b90540a2f95fe28638d0f18af4361a1c2214f73de93f4299fb32c32f9"
"49e02198a8e91101abd6d7576a914be16e4b0f9bd8f6d47d02b3a887049c36d3b84cb88ac0cd2520b0000"
"00001976a914f521178feb733a719964e1da4a9efb09dcc39cfa88ac00000000"
)
tx = Transaction(raw)
tx.outputs[0].script.values # triggers parsing, needed to reproduce pickle error
dumps(tx)