From 2a0c653c37f81c8b6835708d774d6011f55376ad Mon Sep 17 00:00:00 2001 From: Lex Berezhny Date: Fri, 1 May 2020 23:25:07 -0400 Subject: [PATCH] claim_search progress --- lbry/db/__init__.py | 5 +- lbry/db/constants.py | 5 + lbry/db/database.py | 5 +- lbry/db/queries.py | 186 ++++++++++++++++++++++++++++++-------- lbry/db/tables.py | 69 +++++++++++--- lbry/db/utils.py | 22 ++++- lbry/service/api.py | 4 +- lbry/service/base.py | 6 +- lbry/service/full_node.py | 3 +- lbry/wallet/wallet.py | 8 +- 10 files changed, 248 insertions(+), 65 deletions(-) diff --git a/lbry/db/__init__.py b/lbry/db/__init__.py index d95063beb..c93a317ee 100644 --- a/lbry/db/__init__.py +++ b/lbry/db/__init__.py @@ -1,5 +1,8 @@ from .database import Database -from .constants import TXO_TYPES, CLAIM_TYPE_CODES, CLAIM_TYPE_NAMES +from .constants import ( + TXO_TYPES, SPENDABLE_TYPE_CODES, + CLAIM_TYPE_CODES, CLAIM_TYPE_NAMES +) from .tables import ( Table, Version, metadata, AccountAddress, PubkeyAddress, diff --git a/lbry/db/constants.py b/lbry/db/constants.py index 909bfb458..8f04f2ec7 100644 --- a/lbry/db/constants.py +++ b/lbry/db/constants.py @@ -19,6 +19,11 @@ CLAIM_TYPE_CODES = [ TXO_TYPES[name] for name in CLAIM_TYPE_NAMES ] +SPENDABLE_TYPE_CODES = [ + TXO_TYPES['other'], + TXO_TYPES['purchase'] +] + STREAM_TYPES = { 'video': 1, 'audio': 2, diff --git a/lbry/db/database.py b/lbry/db/database.py index 5d199c3a6..97a89c6c4 100644 --- a/lbry/db/database.py +++ b/lbry/db/database.py @@ -7,9 +7,10 @@ from functools import partial from sqlalchemy import create_engine, text from lbry.crypto.bip32 import PubKey +from lbry.schema.result import Censor from lbry.blockchain.ledger import Ledger from lbry.blockchain.transaction import Transaction, Output -from .constants import TXO_TYPES, CLAIM_TYPE_CODES +from .constants import TXO_TYPES from . import queries as q @@ -182,7 +183,7 @@ class Database: async def get_purchases(self, **constraints) -> Tuple[List[Output], Optional[int]]: return await self.run_in_executor(q.get_purchases, **constraints) - async def search_claims(self, **constraints): + async def search_claims(self, **constraints) -> Tuple[List[Output], Optional[int], Censor]: return await self.run_in_executor(q.search, **constraints) async def get_txo_sum(self, **constraints): diff --git a/lbry/db/queries.py b/lbry/db/queries.py index d6dcf8407..81b2beda5 100644 --- a/lbry/db/queries.py +++ b/lbry/db/queries.py @@ -15,6 +15,7 @@ from sqlalchemy.future import select from lbry.schema.tags import clean_tags from lbry.schema.result import Censor, Outputs from lbry.schema.url import URL, normalize_name +from lbry.schema.mime_types import guess_stream_type from lbry.error import ResolveCensoredError from lbry.blockchain.ledger import Ledger from lbry.blockchain.transaction import Transaction, Output, Input, OutputScript, TXRefImmutable @@ -141,6 +142,8 @@ class RowCollector: self.txs = [] self.txos = [] self.txis = [] + self.claims = [] + self.tags = [] @staticmethod def block_to_row(block): @@ -231,12 +234,95 @@ class RowCollector: self.txos.append(self.txo_to_row(tx, txo)) return self + def add_claim(self, txo): + try: + assert txo.claim_name + assert txo.normalized_name + except: + #self.logger.exception(f"Could not decode claim name for {tx.id}:{txo.position}.") + return + tx = txo.tx_ref.tx + claim_hash = txo.claim_hash + claim_record = { + 'claim_hash': claim_hash, + 'claim_id': txo.claim_id, + 'claim_name': txo.claim_name, + 'normalized': txo.normalized_name, + 'address': txo.get_address(self.ledger), + 'txo_hash': txo.ref.hash, + 'tx_position': tx.position, + 'amount': txo.amount, + 'timestamp': 0, # TODO: fix + 'creation_timestamp': 0, # TODO: fix + 'height': tx.height, + 'creation_height': tx.height, + 'release_time': None, + 'title': None, + 'author': None, + 'description': None, + 'claim_type': None, + # streams + 'stream_type': None, + 'media_type': None, + 'fee_currency': None, + 'fee_amount': 0, + 'duration': None, + # reposts + 'reposted_claim_hash': None, + # claims which are channels + 'public_key_bytes': None, + 'public_key_hash': None, + } + self.claims.append(claim_record) + + try: + claim = txo.claim + except: + #self.logger.exception(f"Could not parse claim protobuf for {tx.id}:{txo.position}.") + return + + if claim.is_stream: + claim_record['claim_type'] = TXO_TYPES['stream'] + claim_record['media_type'] = claim.stream.source.media_type + claim_record['stream_type'] = STREAM_TYPES[guess_stream_type(claim_record['media_type'])] + claim_record['title'] = claim.stream.title + claim_record['description'] = claim.stream.description + claim_record['author'] = claim.stream.author + if claim.stream.video and claim.stream.video.duration: + claim_record['duration'] = claim.stream.video.duration + if claim.stream.audio and claim.stream.audio.duration: + claim_record['duration'] = claim.stream.audio.duration + if claim.stream.release_time: + claim_record['release_time'] = claim.stream.release_time + if claim.stream.has_fee: + fee = claim.stream.fee + if isinstance(fee.currency, str): + claim_record['fee_currency'] = fee.currency.lower() + if isinstance(fee.amount, Decimal): + claim_record['fee_amount'] = int(fee.amount*1000) + elif claim.is_repost: + claim_record['claim_type'] = TXO_TYPES['repost'] + claim_record['reposted_claim_hash'] = claim.repost.reference.claim_hash + elif claim.is_channel: + claim_record['claim_type'] = TXO_TYPES['channel'] + claim_record['public_key_bytes'] = txo.claim.channel.public_key_bytes + claim_record['public_key_hash'] = self.ledger.address_to_hash160( + self.ledger.public_key_to_address(txo.claim.channel.public_key_bytes) + ) + + for tag in clean_tags(claim.message.tags): + self.tags.append({'claim_hash': claim_hash, 'tag': tag}) + + return self + def save(self, progress: Callable = None): queries = ( (Block.insert(), self.blocks), (TX.insert(), self.txs), (TXO.insert(), self.txos), (TXI.insert(), self.txis), + (Claim.insert(), self.claims), + (Tag.insert(), self.tags), ) total_rows = sum(len(query[1]) for query in queries) inserted_rows = 0 @@ -273,15 +359,10 @@ def process_claims_and_supports(block_range=None): context.execute(sql) context.execute(Claim.delete()) + rows = RowCollector(ctx()) for claim in get_txos(txo_type__in=CLAIM_TYPE_CODES, is_spent=False)[0]: - context.execute( - Claim.insert(), { - 'claim_hash': claim.claim_hash, - 'claim_name': claim.claim_name, - 'amount': claim.amount, - 'txo_hash': claim.hash - } - ) + rows.add_claim(claim) + rows.save() def execute_fetchall(sql): @@ -805,9 +886,9 @@ def claims_query(cols, for_count=False, **constraints) -> Tuple[str, Dict]: if column not in SEARCH_ORDER_FIELDS: raise NameError(f'{column} is not a valid order_by field') if column == 'name': - column = 'claim.claim_name' + column = 'claim_name' sql_order_by.append( - f"{column} ASC" if is_asc else f"{column} DESC" + f"claim.{column} ASC" if is_asc else f"claim.{column} DESC" ) constraints['order_by'] = sql_order_by @@ -823,45 +904,45 @@ def claims_query(cols, for_count=False, **constraints) -> Tuple[str, Dict]: postfix, value = ops[value[0]], value[1:] if constraint == 'fee_amount': value = Decimal(value)*1000 - constraints[f'claim.{constraint}{postfix}'] = int(value) + constraints[f'{constraint}{postfix}'] = int(value) if constraints.pop('is_controlling', False): if {'sequence', 'amount_order'}.isdisjoint(constraints): for_count = False - constraints['claimtrie.claim_hash__is_not_null'] = '' + constraints['Claimtrie.claim_hash__is_not_null'] = '' if 'sequence' in constraints: - constraints['order_by'] = 'claim.activation_height ASC' + constraints['order_by'] = 'activation_height ASC' constraints['offset'] = int(constraints.pop('sequence')) - 1 constraints['limit'] = 1 if 'amount_order' in constraints: - constraints['order_by'] = 'claim.effective_amount DESC' + constraints['order_by'] = 'effective_amount DESC' constraints['offset'] = int(constraints.pop('amount_order')) - 1 constraints['limit'] = 1 if 'claim_id' in constraints: claim_id = constraints.pop('claim_id') if len(claim_id) == 40: - constraints['claim.claim_id'] = claim_id + constraints['claim_id'] = claim_id else: - constraints['claim.claim_id__like'] = f'{claim_id[:40]}%' + constraints['claim_id__like'] = f'{claim_id[:40]}%' elif 'claim_ids' in constraints: - constraints['claim.claim_id__in'] = set(constraints.pop('claim_ids')) + constraints['claim_id__in'] = set(constraints.pop('claim_ids')) if 'reposted_claim_id' in constraints: - constraints['claim.reposted_claim_hash'] = unhexlify(constraints.pop('reposted_claim_id'))[::-1] + constraints['reposted_claim_hash'] = unhexlify(constraints.pop('reposted_claim_id'))[::-1] if 'name' in constraints: constraints['claim_name'] = normalize_name(constraints.pop('name')) if 'public_key_id' in constraints: - constraints['claim.public_key_hash'] = ( + constraints['public_key_hash'] = ( ctx().ledger.address_to_hash160(constraints.pop('public_key_id'))) if 'channel_hash' in constraints: - constraints['claim.channel_hash'] = constraints.pop('channel_hash') + constraints['channel_hash'] = constraints.pop('channel_hash') if 'channel_ids' in constraints: channel_ids = constraints.pop('channel_ids') if channel_ids: - constraints['claim.channel_hash__in'] = { + constraints['channel_hash__in'] = { unhexlify(cid)[::-1] for cid in channel_ids } if 'not_channel_ids' in constraints: @@ -870,52 +951,52 @@ def claims_query(cols, for_count=False, **constraints) -> Tuple[str, Dict]: not_channel_ids_binary = { unhexlify(ncid)[::-1] for ncid in not_channel_ids } - constraints['claim.claim_hash__not_in#not_channel_ids'] = not_channel_ids_binary + constraints['claim_hash__not_in#not_channel_ids'] = not_channel_ids_binary if constraints.get('has_channel_signature', False): - constraints['claim.channel_hash__not_in'] = not_channel_ids_binary + constraints['channel_hash__not_in'] = not_channel_ids_binary else: constraints['null_or_not_channel__or'] = { - 'claim.signature_valid__is_null': True, - 'claim.channel_hash__not_in': not_channel_ids_binary + 'signature_valid__is_null': True, + 'channel_hash__not_in': not_channel_ids_binary } if 'signature_valid' in constraints: has_channel_signature = constraints.pop('has_channel_signature', False) if has_channel_signature: - constraints['claim.signature_valid'] = constraints.pop('signature_valid') + constraints['signature_valid'] = constraints.pop('signature_valid') else: constraints['null_or_signature__or'] = { - 'claim.signature_valid__is_null': True, - 'claim.signature_valid': constraints.pop('signature_valid') + 'signature_valid__is_null': True, + 'signature_valid': constraints.pop('signature_valid') } elif constraints.pop('has_channel_signature', False): - constraints['claim.signature_valid__is_not_null'] = True + constraints['signature_valid__is_not_null'] = True if 'txid' in constraints: tx_hash = unhexlify(constraints.pop('txid'))[::-1] nout = constraints.pop('nout', 0) - constraints['claim.txo_hash'] = tx_hash + struct.pack(' Tuple[str, Dict]: constraints["search"] = constraints.pop("text") return query( - [Claim, TXO, TX], - select(*cols).select_from(Claim.join(Claimtrie).join(TXO).join(TX)), + [Claim, Claimtrie], + select(*cols).select_from(Claim.join(Claimtrie, isouter=True).join(TXO).join(TX)), **constraints ) @@ -961,6 +1042,13 @@ def search_claims(censor: Censor, **constraints) -> List: censor, [ Claimtrie.c.claim_hash.label('is_controlling'), Claimtrie.c.last_take_over_height, + TX.c.raw, + TX.c.height, + TX.c.tx_hash, + TXO.c.script_offset, + TXO.c.script_length, + TXO.c.amount, + TXO.c.position.label('txo_position'), Claim.c.claim_hash, Claim.c.txo_hash, # Claim.c.claims_in_channel, @@ -1009,7 +1097,7 @@ def _get_referenced_rows(txo_rows: List[dict], censor_channels: List[bytes]): return channel_txos + reposted_txos -def search(**constraints) -> Tuple[List, List, int, int, Censor]: +def old_search(**constraints) -> Tuple[List, List, int, int, Censor]: assert set(constraints).issubset(SEARCH_PARAMS), \ f"Search query contains invalid arguments: {set(constraints).difference(SEARCH_PARAMS)}" total = None @@ -1024,6 +1112,30 @@ def search(**constraints) -> Tuple[List, List, int, int, Censor]: return txo_rows, extra_txo_rows, constraints['offset'], total, search_censor +def search(**constraints) -> Tuple[List, int, Censor]: + assert set(constraints).issubset(SEARCH_PARAMS), \ + f"Search query contains invalid arguments: {set(constraints).difference(SEARCH_PARAMS)}" + total = None + if not constraints.pop('no_totals', False): + total = count_claims(**constraints) + constraints['offset'] = abs(constraints.get('offset', 0)) + constraints['limit'] = min(abs(constraints.get('limit', 10)), 50) + context = ctx() + search_censor = context.get_search_censor() + txos = [] + for row in search_claims(search_censor, **constraints): + source = row['raw'][row['script_offset']:row['script_offset']+row['script_length']] + txo = Output( + amount=row['amount'], + script=OutputScript(source), + tx_ref=TXRefImmutable.from_hash(row['tx_hash'], row['height']), + position=row['txo_position'] + ) + txos.append(txo) + #extra_txo_rows = _get_referenced_rows(txo_rows, search_censor.censored.keys()) + return txos, total, search_censor + + def resolve(urls) -> Tuple[List, List]: txo_rows = [resolve_url(raw_url) for raw_url in urls] extra_txo_rows = _get_referenced_rows( diff --git a/lbry/db/tables.py b/lbry/db/tables.py index 845475e41..8548b5dbd 100644 --- a/lbry/db/tables.py +++ b/lbry/db/tables.py @@ -29,11 +29,11 @@ AccountAddress = Table( 'account_address', metadata, Column('account', Text, primary_key=True), Column('address', Text, ForeignKey(PubkeyAddress.columns.address), primary_key=True), - Column('chain', Integer), + Column('chain', SmallInteger), Column('pubkey', LargeBinary), Column('chain_code', LargeBinary), Column('n', Integer), - Column('depth', Integer), + Column('depth', SmallInteger), ) @@ -66,12 +66,12 @@ TXO = Table( Column('tx_hash', LargeBinary, ForeignKey(TX.columns.tx_hash)), Column('txo_hash', LargeBinary, primary_key=True), Column('address', Text), - Column('position', Integer), + Column('position', SmallInteger), Column('amount', BigInteger), - Column('script_offset', BigInteger), - Column('script_length', BigInteger), + Column('script_offset', Integer), + Column('script_length', Integer), Column('is_reserved', Boolean, server_default='0'), - Column('txo_type', Integer, server_default='0'), + Column('txo_type', SmallInteger, server_default='0'), Column('claim_id', Text, nullable=True), Column('claim_hash', LargeBinary, nullable=True), Column('claim_name', Text, nullable=True), @@ -85,7 +85,7 @@ TXI = Table( Column('tx_hash', LargeBinary, ForeignKey(TX.columns.tx_hash)), Column('txo_hash', LargeBinary, ForeignKey(TXO.columns.txo_hash), primary_key=True), Column('address', Text, nullable=True), - Column('position', Integer), + Column('position', SmallInteger), ) txi_join_account = TXI.join(AccountAddress, TXI.columns.address == AccountAddress.columns.address) @@ -94,14 +94,61 @@ txi_join_account = TXI.join(AccountAddress, TXI.columns.address == AccountAddres Claim = Table( 'claim', metadata, Column('claim_hash', LargeBinary, primary_key=True), + Column('claim_id', Text), Column('claim_name', Text), + Column('normalized', Text), + Column('address', Text), Column('txo_hash', LargeBinary, ForeignKey(TXO.columns.txo_hash)), + Column('tx_position', SmallInteger), Column('amount', BigInteger), - Column('channel_hash', LargeBinary, nullable=True), - Column('effective_amount', BigInteger, server_default='0'), + Column('timestamp', Integer), # last updated timestamp + Column('creation_timestamp', Integer), + Column('height', Integer), # last updated height + Column('creation_height', Integer), + Column('activation_height', Integer, nullable=True), + Column('expiration_height', Integer), + Column('release_time', Integer, nullable=True), + + # normalized#shortest-unique-claim_id + Column('short_url', Text), + # channel's-short_url/normalized#shortest-unique-claim_id-within-channel + Column('canonical_url', Text, nullable=True), + + Column('title', Text, nullable=True), + Column('author', Text, nullable=True), + Column('description', Text, nullable=True), + + Column('claim_type', SmallInteger), + Column('reposted', Integer, server_default='0'), + + # streams + Column('stream_type', Text, nullable=True), + Column('media_type', Text, nullable=True), + Column('fee_amount', BigInteger, server_default='0'), + Column('fee_currency', Text, nullable=True), + Column('duration', Integer, nullable=True), + + # reposts Column('reposted_claim_hash', LargeBinary, nullable=True), - Column('activation_height', Integer, server_default='0'), - Column('expiration_height', Integer, server_default='0'), + + # claims which are channels + Column('public_key_bytes', LargeBinary, nullable=True), + Column('public_key_hash', LargeBinary, nullable=True), + Column('claims_in_channel', Integer, server_default='0'), + + # claims which are inside channels + Column('channel_hash', LargeBinary, nullable=True), + Column('channel_join', Integer, nullable=True), # height at which claim got valid signature / joined channel + Column('signature', LargeBinary, nullable=True), + Column('signature_digest', LargeBinary, nullable=True), + Column('signature_valid', Boolean, nullable=True), + + Column('effective_amount', BigInteger, server_default='0'), + Column('support_amount', BigInteger, server_default='0'), + Column('trending_group', BigInteger, server_default='0'), + Column('trending_mixed', BigInteger, server_default='0'), + Column('trending_local', BigInteger, server_default='0'), + Column('trending_global', BigInteger, server_default='0'), ) diff --git a/lbry/db/utils.py b/lbry/db/utils.py index a9cfff6b8..41e3ff9da 100644 --- a/lbry/db/utils.py +++ b/lbry/db/utils.py @@ -119,11 +119,23 @@ def constraints_to_clause(tables, constraints): else: col, op = key, '__eq__' attr = None - for table in tables: - attr = getattr(table.c, col, None) - if attr is not None: - clause.append(getattr(attr, op)(constraint)) - break + if '.' in col: + table_name, col = col.split('.') + _table = None + for table in tables: + if table.name == table_name.lower(): + _table = table + break + if _table is not None: + attr = getattr(_table.c, col) + else: + raise ValueError(f"Table '{table_name}' not available: {', '.join([t.name for t in tables])}.") + else: + for table in tables: + attr = getattr(table.c, col, None) + if attr is not None: + break if attr is None: raise ValueError(f"Attribute '{col}' not found on tables: {', '.join([t.name for t in tables])}.") + clause.append(getattr(attr, op)(constraint)) return and_(*clause) diff --git a/lbry/service/api.py b/lbry/service/api.py index 0294dae63..65e5a08a0 100644 --- a/lbry/service/api.py +++ b/lbry/service/api.py @@ -1603,10 +1603,10 @@ class API: kwargs['signature_valid'] = 0 page_num, page_size = abs(kwargs.pop('page', 1)), min(abs(kwargs.pop('page_size', DEFAULT_PAGE_SIZE)), 50) kwargs.update({'offset': page_size * (page_num - 1), 'limit': page_size}) - txos, blocked, _, total = await self.service.search_claims(wallet.accounts, **kwargs) + txos, total, censored = await self.service.search_claims(wallet.accounts, **kwargs) result = { "items": txos, - "blocked": blocked, + #"blocked": censored, "page": page_num, "page_size": page_size } diff --git a/lbry/service/base.py b/lbry/service/base.py index c2e7a2e3f..81288a92d 100644 --- a/lbry/service/base.py +++ b/lbry/service/base.py @@ -2,10 +2,11 @@ import os import asyncio import logging from datetime import datetime -from typing import Iterable, List, Optional, NamedTuple +from typing import Iterable, List, Optional, Tuple, NamedTuple from lbry.db import Database from lbry.db.constants import TXO_TYPES +from lbry.schema.result import Censor from lbry.blockchain.dewies import dewies_to_lbc from lbry.blockchain.transaction import Transaction, Output from lbry.blockchain.ledger import Ledger @@ -198,8 +199,7 @@ class Service: async def resolve(self, accounts, urls, **kwargs): raise NotImplementedError - async def search_claims( - self, accounts, include_purchase_receipt=False, include_is_my_output=False, **kwargs): + async def search_claims(self, accounts, **kwargs) -> Tuple[List[Output], Optional[int], Censor]: raise NotImplementedError async def get_claim_by_claim_id(self, accounts, claim_id, **kwargs) -> Output: diff --git a/lbry/service/full_node.py b/lbry/service/full_node.py index 2a1028d61..354531735 100644 --- a/lbry/service/full_node.py +++ b/lbry/service/full_node.py @@ -43,8 +43,7 @@ class FullNode(Service): } async def search_claims(self, accounts, **kwargs): - txo_rows, extra_txo_rows, offset, total, search_censor = await self.db.search_claims(**kwargs) - return txo_rows, None, None, total + return await self.db.search_claims(**kwargs) async def get_transaction_address_filters(self, block_hash): return { diff --git a/lbry/wallet/wallet.py b/lbry/wallet/wallet.py index 5bfd8e457..a065db846 100644 --- a/lbry/wallet/wallet.py +++ b/lbry/wallet/wallet.py @@ -11,7 +11,7 @@ from hashlib import sha256 from operator import attrgetter from decimal import Decimal -from lbry.db import Database +from lbry.db import Database, SPENDABLE_TYPE_CODES from lbry.blockchain.ledger import Ledger from lbry.constants import COIN, NULL_HASH32 from lbry.blockchain.transaction import Transaction, Input, Output @@ -294,7 +294,11 @@ class Wallet: async def get_effective_amount_estimators(self, funding_accounts: Iterable[Account]): estimators = [] - for utxo in (await self.db.get_utxos(accounts=funding_accounts))[0]: + utxos = await self.db.get_utxos( + accounts=funding_accounts, + txo_type__in=SPENDABLE_TYPE_CODES + ) + for utxo in utxos[0]: estimators.append(OutputEffectiveAmountEstimator(self.ledger, utxo)) return estimators