claim_search progress

Lex Berezhny 2020-05-01 23:25:07 -04:00
parent 219c7cf37d
commit 2a0c653c37
10 changed files with 248 additions and 65 deletions

View file

@@ -1,5 +1,8 @@
from .database import Database
from .constants import TXO_TYPES, CLAIM_TYPE_CODES, CLAIM_TYPE_NAMES
from .constants import (
TXO_TYPES, SPENDABLE_TYPE_CODES,
CLAIM_TYPE_CODES, CLAIM_TYPE_NAMES
)
from .tables import (
Table, Version, metadata,
AccountAddress, PubkeyAddress,

View file

@@ -19,6 +19,11 @@ CLAIM_TYPE_CODES = [
TXO_TYPES[name] for name in CLAIM_TYPE_NAMES
]
SPENDABLE_TYPE_CODES = [
TXO_TYPES['other'],
TXO_TYPES['purchase']
]
STREAM_TYPES = {
'video': 1,
'audio': 2,

View file

@@ -7,9 +7,10 @@ from functools import partial
from sqlalchemy import create_engine, text
from lbry.crypto.bip32 import PubKey
from lbry.schema.result import Censor
from lbry.blockchain.ledger import Ledger
from lbry.blockchain.transaction import Transaction, Output
from .constants import TXO_TYPES, CLAIM_TYPE_CODES
from .constants import TXO_TYPES
from . import queries as q
@@ -182,7 +183,7 @@ class Database:
async def get_purchases(self, **constraints) -> Tuple[List[Output], Optional[int]]:
return await self.run_in_executor(q.get_purchases, **constraints)
async def search_claims(self, **constraints):
async def search_claims(self, **constraints) -> Tuple[List[Output], Optional[int], Censor]:
return await self.run_in_executor(q.search, **constraints)
async def get_txo_sum(self, **constraints):
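
For context, the retyped Database.search_claims above now hands back a (txos, total, censor) triple. A minimal sketch of consuming it, assuming an initialized lbry.db.Database instance named db (the keyword arguments are illustrative, not exhaustive):

    # hypothetical caller; db is an initialized lbry.db.Database
    txos, total, censor = await db.search_claims(claim_type=['stream'], limit=10)
    for txo in txos:   # List[Output], rebuilt from raw transaction bytes
        print(txo.amount)
    print(total)       # Optional[int]; stays None when no_totals=True is passed
    print(censor)      # the Censor applied while filtering results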

View file

@@ -15,6 +15,7 @@ from sqlalchemy.future import select
from lbry.schema.tags import clean_tags
from lbry.schema.result import Censor, Outputs
from lbry.schema.url import URL, normalize_name
from lbry.schema.mime_types import guess_stream_type
from lbry.error import ResolveCensoredError
from lbry.blockchain.ledger import Ledger
from lbry.blockchain.transaction import Transaction, Output, Input, OutputScript, TXRefImmutable
@@ -141,6 +142,8 @@ class RowCollector:
self.txs = []
self.txos = []
self.txis = []
self.claims = []
self.tags = []
@staticmethod
def block_to_row(block):
@@ -231,12 +234,95 @@
self.txos.append(self.txo_to_row(tx, txo))
return self
def add_claim(self, txo):
try:
assert txo.claim_name
assert txo.normalized_name
except Exception:
#self.logger.exception(f"Could not decode claim name for {tx.id}:{txo.position}.")
return
tx = txo.tx_ref.tx
claim_hash = txo.claim_hash
claim_record = {
'claim_hash': claim_hash,
'claim_id': txo.claim_id,
'claim_name': txo.claim_name,
'normalized': txo.normalized_name,
'address': txo.get_address(self.ledger),
'txo_hash': txo.ref.hash,
'tx_position': tx.position,
'amount': txo.amount,
'timestamp': 0, # TODO: fix
'creation_timestamp': 0, # TODO: fix
'height': tx.height,
'creation_height': tx.height,
'release_time': None,
'title': None,
'author': None,
'description': None,
'claim_type': None,
# streams
'stream_type': None,
'media_type': None,
'fee_currency': None,
'fee_amount': 0,
'duration': None,
# reposts
'reposted_claim_hash': None,
# claims which are channels
'public_key_bytes': None,
'public_key_hash': None,
}
self.claims.append(claim_record)
try:
claim = txo.claim
except Exception:
#self.logger.exception(f"Could not parse claim protobuf for {tx.id}:{txo.position}.")
return
if claim.is_stream:
claim_record['claim_type'] = TXO_TYPES['stream']
claim_record['media_type'] = claim.stream.source.media_type
claim_record['stream_type'] = STREAM_TYPES[guess_stream_type(claim_record['media_type'])]
claim_record['title'] = claim.stream.title
claim_record['description'] = claim.stream.description
claim_record['author'] = claim.stream.author
if claim.stream.video and claim.stream.video.duration:
claim_record['duration'] = claim.stream.video.duration
if claim.stream.audio and claim.stream.audio.duration:
claim_record['duration'] = claim.stream.audio.duration
if claim.stream.release_time:
claim_record['release_time'] = claim.stream.release_time
if claim.stream.has_fee:
fee = claim.stream.fee
if isinstance(fee.currency, str):
claim_record['fee_currency'] = fee.currency.lower()
if isinstance(fee.amount, Decimal):
claim_record['fee_amount'] = int(fee.amount*1000)
elif claim.is_repost:
claim_record['claim_type'] = TXO_TYPES['repost']
claim_record['reposted_claim_hash'] = claim.repost.reference.claim_hash
elif claim.is_channel:
claim_record['claim_type'] = TXO_TYPES['channel']
claim_record['public_key_bytes'] = txo.claim.channel.public_key_bytes
claim_record['public_key_hash'] = self.ledger.address_to_hash160(
self.ledger.public_key_to_address(txo.claim.channel.public_key_bytes)
)
for tag in clean_tags(claim.message.tags):
self.tags.append({'claim_hash': claim_hash, 'tag': tag})
return self
def save(self, progress: Callable = None):
queries = (
(Block.insert(), self.blocks),
(TX.insert(), self.txs),
(TXO.insert(), self.txos),
(TXI.insert(), self.txis),
(Claim.insert(), self.claims),
(Tag.insert(), self.tags),
)
total_rows = sum(len(query[1]) for query in queries)
inserted_rows = 0
@@ -273,15 +359,10 @@ def process_claims_and_supports(block_range=None):
context.execute(sql)
context.execute(Claim.delete())
rows = RowCollector(ctx())
for claim in get_txos(txo_type__in=CLAIM_TYPE_CODES, is_spent=False)[0]:
context.execute(
Claim.insert(), {
'claim_hash': claim.claim_hash,
'claim_name': claim.claim_name,
'amount': claim.amount,
'txo_hash': claim.hash
}
)
rows.add_claim(claim)
rows.save()
def execute_fetchall(sql):
@@ -805,9 +886,9 @@ def claims_query(cols, for_count=False, **constraints) -> Tuple[str, Dict]:
if column not in SEARCH_ORDER_FIELDS:
raise NameError(f'{column} is not a valid order_by field')
if column == 'name':
column = 'claim.claim_name'
column = 'claim_name'
sql_order_by.append(
f"{column} ASC" if is_asc else f"{column} DESC"
f"claim.{column} ASC" if is_asc else f"claim.{column} DESC"
)
constraints['order_by'] = sql_order_by
@@ -823,45 +904,45 @@ def claims_query(cols, for_count=False, **constraints) -> Tuple[str, Dict]:
postfix, value = ops[value[0]], value[1:]
if constraint == 'fee_amount':
value = Decimal(value)*1000
constraints[f'claim.{constraint}{postfix}'] = int(value)
constraints[f'{constraint}{postfix}'] = int(value)
if constraints.pop('is_controlling', False):
if {'sequence', 'amount_order'}.isdisjoint(constraints):
for_count = False
constraints['claimtrie.claim_hash__is_not_null'] = ''
constraints['Claimtrie.claim_hash__is_not_null'] = ''
if 'sequence' in constraints:
constraints['order_by'] = 'claim.activation_height ASC'
constraints['order_by'] = 'activation_height ASC'
constraints['offset'] = int(constraints.pop('sequence')) - 1
constraints['limit'] = 1
if 'amount_order' in constraints:
constraints['order_by'] = 'claim.effective_amount DESC'
constraints['order_by'] = 'effective_amount DESC'
constraints['offset'] = int(constraints.pop('amount_order')) - 1
constraints['limit'] = 1
if 'claim_id' in constraints:
claim_id = constraints.pop('claim_id')
if len(claim_id) == 40:
constraints['claim.claim_id'] = claim_id
constraints['claim_id'] = claim_id
else:
constraints['claim.claim_id__like'] = f'{claim_id[:40]}%'
constraints['claim_id__like'] = f'{claim_id[:40]}%'
elif 'claim_ids' in constraints:
constraints['claim.claim_id__in'] = set(constraints.pop('claim_ids'))
constraints['claim_id__in'] = set(constraints.pop('claim_ids'))
if 'reposted_claim_id' in constraints:
constraints['claim.reposted_claim_hash'] = unhexlify(constraints.pop('reposted_claim_id'))[::-1]
constraints['reposted_claim_hash'] = unhexlify(constraints.pop('reposted_claim_id'))[::-1]
if 'name' in constraints:
constraints['claim_name'] = normalize_name(constraints.pop('name'))
if 'public_key_id' in constraints:
constraints['claim.public_key_hash'] = (
constraints['public_key_hash'] = (
ctx().ledger.address_to_hash160(constraints.pop('public_key_id')))
if 'channel_hash' in constraints:
constraints['claim.channel_hash'] = constraints.pop('channel_hash')
constraints['channel_hash'] = constraints.pop('channel_hash')
if 'channel_ids' in constraints:
channel_ids = constraints.pop('channel_ids')
if channel_ids:
constraints['claim.channel_hash__in'] = {
constraints['channel_hash__in'] = {
unhexlify(cid)[::-1] for cid in channel_ids
}
if 'not_channel_ids' in constraints:
@@ -870,52 +951,52 @@ def claims_query(cols, for_count=False, **constraints) -> Tuple[str, Dict]:
not_channel_ids_binary = {
unhexlify(ncid)[::-1] for ncid in not_channel_ids
}
constraints['claim.claim_hash__not_in#not_channel_ids'] = not_channel_ids_binary
constraints['claim_hash__not_in#not_channel_ids'] = not_channel_ids_binary
if constraints.get('has_channel_signature', False):
constraints['claim.channel_hash__not_in'] = not_channel_ids_binary
constraints['channel_hash__not_in'] = not_channel_ids_binary
else:
constraints['null_or_not_channel__or'] = {
'claim.signature_valid__is_null': True,
'claim.channel_hash__not_in': not_channel_ids_binary
'signature_valid__is_null': True,
'channel_hash__not_in': not_channel_ids_binary
}
if 'signature_valid' in constraints:
has_channel_signature = constraints.pop('has_channel_signature', False)
if has_channel_signature:
constraints['claim.signature_valid'] = constraints.pop('signature_valid')
constraints['signature_valid'] = constraints.pop('signature_valid')
else:
constraints['null_or_signature__or'] = {
'claim.signature_valid__is_null': True,
'claim.signature_valid': constraints.pop('signature_valid')
'signature_valid__is_null': True,
'signature_valid': constraints.pop('signature_valid')
}
elif constraints.pop('has_channel_signature', False):
constraints['claim.signature_valid__is_not_null'] = True
constraints['signature_valid__is_not_null'] = True
if 'txid' in constraints:
tx_hash = unhexlify(constraints.pop('txid'))[::-1]
nout = constraints.pop('nout', 0)
constraints['claim.txo_hash'] = tx_hash + struct.pack('<I', nout)
constraints['txo_hash'] = tx_hash + struct.pack('<I', nout)
if 'claim_type' in constraints:
claim_types = constraints.pop('claim_type')
if isinstance(claim_types, str):
claim_types = [claim_types]
if claim_types:
constraints['claim.claim_type__in'] = {
constraints['claim_type__in'] = {
CLAIM_TYPES[claim_type] for claim_type in claim_types
}
if 'stream_types' in constraints:
stream_types = constraints.pop('stream_types')
if stream_types:
constraints['claim.stream_type__in'] = {
constraints['stream_type__in'] = {
STREAM_TYPES[stream_type] for stream_type in stream_types
}
if 'media_types' in constraints:
media_types = constraints.pop('media_types')
if media_types:
constraints['claim.media_type__in'] = set(media_types)
constraints['media_type__in'] = set(media_types)
if 'fee_currency' in constraints:
constraints['claim.fee_currency'] = constraints.pop('fee_currency').lower()
constraints['fee_currency'] = constraints.pop('fee_currency').lower()
_apply_constraints_for_array_attributes(constraints, 'tag', clean_tags, for_count)
_apply_constraints_for_array_attributes(constraints, 'language', lambda _: _, for_count)
@@ -926,8 +1007,8 @@ def claims_query(cols, for_count=False, **constraints) -> Tuple[str, Dict]:
constraints["search"] = constraints.pop("text")
return query(
[Claim, TXO, TX],
select(*cols).select_from(Claim.join(Claimtrie).join(TXO).join(TX)),
[Claim, Claimtrie],
select(*cols).select_from(Claim.join(Claimtrie, isouter=True).join(TXO).join(TX)),
**constraints
)
@@ -961,6 +1042,13 @@ def search_claims(censor: Censor, **constraints) -> List:
censor, [
Claimtrie.c.claim_hash.label('is_controlling'),
Claimtrie.c.last_take_over_height,
TX.c.raw,
TX.c.height,
TX.c.tx_hash,
TXO.c.script_offset,
TXO.c.script_length,
TXO.c.amount,
TXO.c.position.label('txo_position'),
Claim.c.claim_hash,
Claim.c.txo_hash,
# Claim.c.claims_in_channel,
@@ -1009,7 +1097,7 @@ def _get_referenced_rows(txo_rows: List[dict], censor_channels: List[bytes]):
return channel_txos + reposted_txos
def search(**constraints) -> Tuple[List, List, int, int, Censor]:
def old_search(**constraints) -> Tuple[List, List, int, int, Censor]:
assert set(constraints).issubset(SEARCH_PARAMS), \
f"Search query contains invalid arguments: {set(constraints).difference(SEARCH_PARAMS)}"
total = None
@@ -1024,6 +1112,30 @@ def search(**constraints) -> Tuple[List, List, int, int, Censor]:
return txo_rows, extra_txo_rows, constraints['offset'], total, search_censor
def search(**constraints) -> Tuple[List, Optional[int], Censor]:
assert set(constraints).issubset(SEARCH_PARAMS), \
f"Search query contains invalid arguments: {set(constraints).difference(SEARCH_PARAMS)}"
total = None
if not constraints.pop('no_totals', False):
total = count_claims(**constraints)
constraints['offset'] = abs(constraints.get('offset', 0))
constraints['limit'] = min(abs(constraints.get('limit', 10)), 50)
context = ctx()
search_censor = context.get_search_censor()
txos = []
for row in search_claims(search_censor, **constraints):
source = row['raw'][row['script_offset']:row['script_offset']+row['script_length']]
txo = Output(
amount=row['amount'],
script=OutputScript(source),
tx_ref=TXRefImmutable.from_hash(row['tx_hash'], row['height']),
position=row['txo_position']
)
txos.append(txo)
#extra_txo_rows = _get_referenced_rows(txo_rows, search_censor.censored.keys())
return txos, total, search_censor
def resolve(urls) -> Tuple[List, List]:
txo_rows = [resolve_url(raw_url) for raw_url in urls]
extra_txo_rows = _get_referenced_rows(
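
Two shape changes land in this file: the old row-based entry point survives as old_search(), while the new search() returns fully-formed Output objects whose scripts are sliced straight out of the raw transaction bytes. The slicing step in isolation, with made-up values (a sketch, not code from this commit):

    # standalone illustration of the script recovery used by search() above
    raw = bytes(100)                       # stands in for TX.c.raw, a serialized tx
    script_offset, script_length = 40, 25  # per-output columns added in tables.py
    source = raw[script_offset:script_offset + script_length]
    assert len(source) == script_length    # these bytes feed OutputScript(source)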

View file

@@ -29,11 +29,11 @@ AccountAddress = Table(
'account_address', metadata,
Column('account', Text, primary_key=True),
Column('address', Text, ForeignKey(PubkeyAddress.columns.address), primary_key=True),
Column('chain', Integer),
Column('chain', SmallInteger),
Column('pubkey', LargeBinary),
Column('chain_code', LargeBinary),
Column('n', Integer),
Column('depth', Integer),
Column('depth', SmallInteger),
)
@@ -66,12 +66,12 @@ TXO = Table(
Column('tx_hash', LargeBinary, ForeignKey(TX.columns.tx_hash)),
Column('txo_hash', LargeBinary, primary_key=True),
Column('address', Text),
Column('position', Integer),
Column('position', SmallInteger),
Column('amount', BigInteger),
Column('script_offset', BigInteger),
Column('script_length', BigInteger),
Column('script_offset', Integer),
Column('script_length', Integer),
Column('is_reserved', Boolean, server_default='0'),
Column('txo_type', Integer, server_default='0'),
Column('txo_type', SmallInteger, server_default='0'),
Column('claim_id', Text, nullable=True),
Column('claim_hash', LargeBinary, nullable=True),
Column('claim_name', Text, nullable=True),
@@ -85,7 +85,7 @@ TXI = Table(
Column('tx_hash', LargeBinary, ForeignKey(TX.columns.tx_hash)),
Column('txo_hash', LargeBinary, ForeignKey(TXO.columns.txo_hash), primary_key=True),
Column('address', Text, nullable=True),
Column('position', Integer),
Column('position', SmallInteger),
)
txi_join_account = TXI.join(AccountAddress, TXI.columns.address == AccountAddress.columns.address)
@@ -94,14 +94,61 @@ txi_join_account = TXI.join(AccountAddress, TXI.columns.address == AccountAddres
Claim = Table(
'claim', metadata,
Column('claim_hash', LargeBinary, primary_key=True),
Column('claim_id', Text),
Column('claim_name', Text),
Column('normalized', Text),
Column('address', Text),
Column('txo_hash', LargeBinary, ForeignKey(TXO.columns.txo_hash)),
Column('tx_position', SmallInteger),
Column('amount', BigInteger),
Column('channel_hash', LargeBinary, nullable=True),
Column('effective_amount', BigInteger, server_default='0'),
Column('timestamp', Integer), # last updated timestamp
Column('creation_timestamp', Integer),
Column('height', Integer), # last updated height
Column('creation_height', Integer),
Column('activation_height', Integer, nullable=True),
Column('expiration_height', Integer),
Column('release_time', Integer, nullable=True),
# normalized#shortest-unique-claim_id
Column('short_url', Text),
# channel's-short_url/normalized#shortest-unique-claim_id-within-channel
Column('canonical_url', Text, nullable=True),
Column('title', Text, nullable=True),
Column('author', Text, nullable=True),
Column('description', Text, nullable=True),
Column('claim_type', SmallInteger),
Column('reposted', Integer, server_default='0'),
# streams
Column('stream_type', Text, nullable=True),
Column('media_type', Text, nullable=True),
Column('fee_amount', BigInteger, server_default='0'),
Column('fee_currency', Text, nullable=True),
Column('duration', Integer, nullable=True),
# reposts
Column('reposted_claim_hash', LargeBinary, nullable=True),
Column('activation_height', Integer, server_default='0'),
Column('expiration_height', Integer, server_default='0'),
# claims which are channels
Column('public_key_bytes', LargeBinary, nullable=True),
Column('public_key_hash', LargeBinary, nullable=True),
Column('claims_in_channel', Integer, server_default='0'),
# claims which are inside channels
Column('channel_hash', LargeBinary, nullable=True),
Column('channel_join', Integer, nullable=True), # height at which claim got valid signature / joined channel
Column('signature', LargeBinary, nullable=True),
Column('signature_digest', LargeBinary, nullable=True),
Column('signature_valid', Boolean, nullable=True),
Column('effective_amount', BigInteger, server_default='0'),
Column('support_amount', BigInteger, server_default='0'),
Column('trending_group', BigInteger, server_default='0'),
Column('trending_mixed', BigInteger, server_default='0'),
Column('trending_local', BigInteger, server_default='0'),
Column('trending_global', BigInteger, server_default='0'),
)

View file

@@ -119,11 +119,23 @@ def constraints_to_clause(tables, constraints):
else:
col, op = key, '__eq__'
attr = None
for table in tables:
attr = getattr(table.c, col, None)
if attr is not None:
clause.append(getattr(attr, op)(constraint))
break
if '.' in col:
table_name, col = col.split('.')
_table = None
for table in tables:
if table.name == table_name.lower():
_table = table
break
if _table is not None:
attr = getattr(_table.c, col, None)  # a missing column falls through to the shared error below
else:
raise ValueError(f"Table '{table_name}' not available: {', '.join([t.name for t in tables])}.")
else:
for table in tables:
attr = getattr(table.c, col, None)
if attr is not None:
break
if attr is None:
raise ValueError(f"Attribute '{col}' not found on tables: {', '.join([t.name for t in tables])}.")
clause.append(getattr(attr, op)(constraint))
return and_(*clause)
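
With this change a constraint key may carry a table-name prefix to disambiguate columns that exist on more than one table, while bare keys keep the old first-match scan. A sketch of both paths, using the Claim and Claimtrie table objects referenced above (the keys are illustrative):

    # bare key: scanned across all passed tables, first match wins
    # dotted key: resolved only against the named table; the prefix is
    # lower()ed first, so 'Claimtrie.claim_hash__is_not_null' also resolves
    clause = constraints_to_clause(
        [Claim, Claimtrie],
        {'claim.claim_name': 'test', 'claimtrie.claim_hash__is_not_null': ''}
    )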

View file

@@ -1603,10 +1603,10 @@ class API:
kwargs['signature_valid'] = 0
page_num, page_size = abs(kwargs.pop('page', 1)), min(abs(kwargs.pop('page_size', DEFAULT_PAGE_SIZE)), 50)
kwargs.update({'offset': page_size * (page_num - 1), 'limit': page_size})
txos, blocked, _, total = await self.service.search_claims(wallet.accounts, **kwargs)
txos, total, censored = await self.service.search_claims(wallet.accounts, **kwargs)
result = {
"items": txos,
"blocked": blocked,
#"blocked": censored,
"page": page_num,
"page_size": page_size
}

View file

@@ -2,10 +2,11 @@ import os
import asyncio
import logging
from datetime import datetime
from typing import Iterable, List, Optional, NamedTuple
from typing import Iterable, List, Optional, Tuple, NamedTuple
from lbry.db import Database
from lbry.db.constants import TXO_TYPES
from lbry.schema.result import Censor
from lbry.blockchain.dewies import dewies_to_lbc
from lbry.blockchain.transaction import Transaction, Output
from lbry.blockchain.ledger import Ledger
@@ -198,8 +199,7 @@ class Service:
async def resolve(self, accounts, urls, **kwargs):
raise NotImplementedError
async def search_claims(
self, accounts, include_purchase_receipt=False, include_is_my_output=False, **kwargs):
async def search_claims(self, accounts, **kwargs) -> Tuple[List[Output], Optional[int], Censor]:
raise NotImplementedError
async def get_claim_by_claim_id(self, accounts, claim_id, **kwargs) -> Output:

View file

@@ -43,8 +43,7 @@ class FullNode(Service):
}
async def search_claims(self, accounts, **kwargs):
txo_rows, extra_txo_rows, offset, total, search_censor = await self.db.search_claims(**kwargs)
return txo_rows, None, None, total
return await self.db.search_claims(**kwargs)
async def get_transaction_address_filters(self, block_hash):
return {

View file

@@ -11,7 +11,7 @@ from hashlib import sha256
from operator import attrgetter
from decimal import Decimal
from lbry.db import Database
from lbry.db import Database, SPENDABLE_TYPE_CODES
from lbry.blockchain.ledger import Ledger
from lbry.constants import COIN, NULL_HASH32
from lbry.blockchain.transaction import Transaction, Input, Output
@@ -294,7 +294,11 @@ class Wallet:
async def get_effective_amount_estimators(self, funding_accounts: Iterable[Account]):
estimators = []
for utxo in (await self.db.get_utxos(accounts=funding_accounts))[0]:
utxos = await self.db.get_utxos(
accounts=funding_accounts,
txo_type__in=SPENDABLE_TYPE_CODES
)
for utxo in utxos[0]:
estimators.append(OutputEffectiveAmountEstimator(self.ledger, utxo))
return estimators
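
Net effect in the wallet: fee-funding UTXO selection is restricted to the spendable codes added in constants.py, so claim-type outputs are no longer candidates for funding a transaction. A small sketch of the invariant, with imports as wired up in this commit:

    from lbry.db import SPENDABLE_TYPE_CODES
    from lbry.db.constants import TXO_TYPES

    # only plain and purchase outputs count as spendable for funding:
    assert TXO_TYPES['other'] in SPENDABLE_TYPE_CODES
    assert TXO_TYPES['purchase'] in SPENDABLE_TYPE_CODES
    assert TXO_TYPES['channel'] not in SPENDABLE_TYPE_CODES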