lbry-sdk/lbry/db/queries/search.py

468 lines
18 KiB
Python
Raw Normal View History

2020-07-11 18:18:33 -04:00
import struct
import logging
from decimal import Decimal
from binascii import unhexlify
from typing import Tuple, List, Optional, Dict
2020-07-11 18:18:33 -04:00
2020-10-13 14:34:19 -04:00
from sqlalchemy import func, case, text
2020-07-11 18:18:33 -04:00
from sqlalchemy.future import select, Select
from lbry.schema.tags import clean_tags
2020-08-04 13:49:59 -04:00
from lbry.schema.result import Censor, Outputs as ResultOutput
2020-07-11 18:18:33 -04:00
from lbry.schema.url import normalize_name
from lbry.blockchain.transaction import Output
from ..utils import query
from ..query_context import context
2020-12-07 07:39:27 -05:00
from ..tables import TX, TXO, Claim, Support, Trend, CensoredClaim
2020-07-11 18:18:33 -04:00
from ..constants import (
TXO_TYPES, STREAM_TYPES, ATTRIBUTE_ARRAY_MAX_LENGTH,
SEARCH_INTEGER_PARAMS, SEARCH_ORDER_FIELDS
)
from .txio import BASE_SELECT_TXO_COLUMNS, rows_to_txos
log = logging.getLogger(__name__)

# Columns selected by support queries: every generic TXO column, followed by
# the support's signing channel and whether that signature is valid.
BASE_SELECT_SUPPORT_COLUMNS = [
    *BASE_SELECT_TXO_COLUMNS,
    Support.c.channel_hash,
    Support.c.is_signature_valid,
]
def compat_layer(**constraints):
    """Rename constraint fields used by older SDK clients to their current names.

    For old SDKs; to be removed later. Currently ``effective_amount`` is
    renamed to ``staked_amount``. The rename applies both as a constraint key
    and as an ``order_by`` field, with or without the ``^`` (ascending) prefix.

    :param constraints: search constraints; ``order_by`` may be a single
        string or a sequence of strings.
    :return: the constraints dict with old names replaced. When a rename
        touches ``order_by``, it is returned as a list; otherwise ``order_by``
        is left exactly as given.
    """
    replacements = {"effective_amount": "staked_amount"}
    for old_key, new_key in replacements.items():
        if old_key in constraints:
            constraints[new_key] = constraints.pop(old_key)
        order_by = constraints.get("order_by")
        if not order_by:
            continue
        # A bare string must be wrapped: ``old_key in "<str>"`` is a substring
        # test, and iterating a string would yield single characters.
        order_by_parts = [order_by] if isinstance(order_by, str) else list(order_by)
        renamed = []
        for part in order_by_parts:
            prefix = '^' if part.startswith('^') else ''
            field = part[len(prefix):]
            renamed.append(prefix + new_key if field == old_key else part)
        if renamed != order_by_parts:
            constraints["order_by"] = renamed
    return constraints
def select_supports(cols: List = None, **constraints) -> Select:
    """Build a SELECT over supports joined with their TXO and TX rows.

    :param cols: columns to select; BASE_SELECT_SUPPORT_COLUMNS when omitted.
    :param constraints: filters forwarded to ``query`` for the Support table.
    """
    selected = BASE_SELECT_SUPPORT_COLUMNS if cols is None else cols
    support_txo_tx = Support.join(TXO).join(TX)
    base_select = select(*selected).select_from(support_txo_tx)
    return query([Support], base_select, **constraints)
def search_supports(**constraints) -> Tuple[List[Output], Optional[int]]:
    """Search supports matching ``constraints``.

    :param constraints: filters for ``select_supports``. Special keys:
        ``include_total`` (bool) also computes the total match count;
        ``claim_id`` (hex string) is converted to a byte-reversed ``claim_hash``.
    :return: ``(txos, total)``; ``total`` is None unless ``include_total`` was set.
    """
    # Convert claim_id first so the count query receives the same claim_hash
    # filter as the row query (it previously counted with the raw claim_id).
    if 'claim_id' in constraints:
        constraints['claim_hash'] = unhexlify(constraints.pop('claim_id'))[::-1]
    total = None
    if constraints.pop('include_total', False):
        total = search_support_count(**constraints)
    rows = context().fetchall(select_supports(**constraints))
    txos = rows_to_txos(rows, include_tx=False)
    return txos, total
def sum_supports(claim_hash, include_channel_content=False, exclude_own_supports=False) -> Tuple[List[Dict], int]:
    """Sum support amounts for a claim, grouped by the supporting channel.

    :param claim_hash: hash of the supported claim (or channel).
    :param include_channel_content: also count supports on claims whose
        ``channel_hash`` equals ``claim_hash``.
    :param exclude_own_supports: ignore supports signed by ``claim_hash`` itself.
    :return: ``(rows, total)``. ``rows`` has one row per supporter
        ``short_url`` with its summed ``staked`` amount, ordered by stake
        descending and then supporter ascending. ``total`` is the sum of all
        ``staked`` values.
    """
    supporter = Claim.alias("supporter")
    content = Claim.alias("content")

    matches_content = (content.c.claim_hash == claim_hash)
    if include_channel_content:
        matches_content |= (content.c.channel_hash == claim_hash)

    joins_supporter = TXO.c.channel_hash == supporter.c.claim_hash
    if exclude_own_supports:
        joins_supporter &= TXO.c.channel_hash != claim_hash

    # Supports sent from the content's own address always count; others only
    # when spent_height == 0 (presumably meaning still unspent -- confirm).
    same_address = (TXO.c.address == content.c.address)
    other_address_unspent = (TXO.c.address != content.c.address) & (TXO.c.spent_height == 0)

    stmt = select(
        supporter.c.short_url.label("supporter"),
        func.sum(TXO.c.amount).label("staked"),
    ).select_from(
        TXO
        .join(content, TXO.c.claim_hash == content.c.claim_hash)
        .join(supporter, joins_supporter)
    ).where(
        matches_content &
        (TXO.c.txo_type == TXO_TYPES["support"]) &
        (same_address | other_address_unspent)
    ).group_by(
        supporter.c.short_url
    ).order_by(
        text("staked DESC, supporter ASC")
    )
    rows = context().fetchall(stmt)
    return rows, sum(row['staked'] for row in rows)
def search_support_count(**constraints) -> int:
    """Return the number of supports matching ``constraints``.

    Paging and ordering keys are discarded because they don't affect a count.
    """
    for paging_key in ('offset', 'limit', 'order_by'):
        constraints.pop(paging_key, None)
    count_rows = context().fetchall(
        select_supports([func.count().label('total')], **constraints)
    )
    return count_rows[0]['total'] or 0
# Second alias of the claim table, used to look up a claim's signing channel
# (select_claims outer-joins it on Claim.channel_hash).
channel_claim = Claim.alias('channel')

# Columns selected by claim searches: the generic TXO columns plus claim,
# canonical-url, trending and censorship data.
BASE_SELECT_CLAIM_COLUMNS = BASE_SELECT_TXO_COLUMNS + [
    Claim.c.activation_height,
    Claim.c.takeover_height,
    Claim.c.creation_height,
    Claim.c.expiration_height,
    Claim.c.is_controlling,
    Claim.c.channel_hash,
    Claim.c.reposted_count,
    Claim.c.reposted_claim_hash,
    Claim.c.short_url,
    Claim.c.signed_claim_count,
    Claim.c.signed_support_count,
    (Claim.c.amount + Claim.c.staked_support_amount).label('staked_amount'),
    Claim.c.staked_support_amount,
    Claim.c.staked_support_count,
    Claim.c.is_signature_valid,
    # "<channel short_url>/<claim short_url>" when a channel row was joined;
    # the CASE has no ELSE, so it is NULL otherwise.
    case([(
        channel_claim.c.short_url.isnot(None),
        channel_claim.c.short_url + '/' + Claim.c.short_url
    )]).label('canonical_url'),
    # Trend is outer-joined, so claims without a trend row report 0.
    *(
        func.coalesce(getattr(Trend.c, trend_name), 0).label(trend_name)
        for trend_name in ('trend_local', 'trend_mixed', 'trend_global', 'trend_group')
    ),
    CensoredClaim.c.censor_type,
    CensoredClaim.c.censoring_channel_hash,
]
# Comparison prefix accepted on integer search parameters -> query() key suffix.
_INTEGER_COMPARISON_SUFFIXES = {'<=': '__lte', '>=': '__gte', '<': '__lt', '>': '__gt'}

# Order-by columns, in the table-qualified form built by _order_by_to_sql,
# whose NULLs are replaced with the sentinel 1<<32 before sorting.
_COALESCED_ORDER_COLUMNS = ('trend.trend_group', 'trend.trend_mixed', 'claim.release_time')


def _order_by_to_sql(order_by_parts) -> List[str]:
    """Convert user-facing ``order_by`` fields into SQL ORDER BY fragments.

    A leading ``^`` requests ascending order; otherwise the order is descending.

    :raises NameError: if a field is not in SEARCH_ORDER_FIELDS.
    """
    if isinstance(order_by_parts, str):
        order_by_parts = [order_by_parts]
    sql_order_by = []
    for order_by in order_by_parts:
        is_asc = order_by.startswith('^')
        column = order_by[1:] if is_asc else order_by
        if column not in SEARCH_ORDER_FIELDS:
            raise NameError(f'{column} is not a valid order_by field')
        if column == 'name':
            column = 'claim_name'
        table = "trend" if column.startswith('trend') else "claim"
        column = f"{table}.{column}"
        # Compare against the qualified name: the previous check used bare
        # names after qualification and so never applied the COALESCE.
        # NOTE(review): the sentinel makes NULLs sort ahead of smaller values
        # in DESC order -- confirm that is the intended placement.
        if column in _COALESCED_ORDER_COLUMNS:
            column = f"COALESCE({column}, {1<<32})"
        sql_order_by.append(f"{column} {'ASC' if is_asc else 'DESC'}")
    return sql_order_by


def _apply_integer_constraints(constraints):
    """Rewrite SEARCH_INTEGER_PARAMS entries in place as integer constraints.

    String values may start with ``<``, ``<=``, ``>`` or ``>=``; the prefix
    becomes an operator suffix on the key (``'>=5'`` -> ``<name>__gte=5``).
    ``fee_amount`` is multiplied by 1000 (through Decimal) before ``int()``.
    """
    for constraint in SEARCH_INTEGER_PARAMS:
        if constraint not in constraints:
            continue
        value = constraints.pop(constraint)
        postfix = ''
        if isinstance(value, str):
            if len(value) >= 2 and value[:2] in _INTEGER_COMPARISON_SUFFIXES:
                postfix, value = _INTEGER_COMPARISON_SUFFIXES[value[:2]], value[2:]
            elif len(value) >= 1 and value[0] in _INTEGER_COMPARISON_SUFFIXES:
                postfix, value = _INTEGER_COMPARISON_SUFFIXES[value[0]], value[1:]
        if constraint == 'fee_amount':
            value = Decimal(value) * 1000
        constraints[f'{constraint}{postfix}'] = int(value)


def _pop_as_collection(constraints, key):
    """Pop ``key`` (None if absent); a lone string is wrapped in a one-item set."""
    value = constraints.pop(key, None)
    if isinstance(value, str):
        return {value}
    return value


def _apply_channel_constraints(constraints):
    """Translate channel and signature filters into query() constraints in place.

    Handles ``channel_id``, ``not_channel_id``, ``is_signature_valid`` and
    ``has_channel_signature``. Channel ids are hex strings or sequences of them.
    """
    if 'channel_id' in constraints:
        channel_ids = _pop_as_collection(constraints, 'channel_id')
        if channel_ids:
            constraints['channel_hash__in'] = {
                unhexlify(cid)[::-1] for cid in channel_ids
            }
    if 'not_channel_id' in constraints:
        # Accept a single id string, as channel_id does (previously a string
        # was iterated character by character).
        not_channel_ids = _pop_as_collection(constraints, 'not_channel_id')
        if not_channel_ids:
            not_channel_hashes = {
                unhexlify(ncid)[::-1] for ncid in not_channel_ids
            }
            constraints['claim_hash__not_in#not_channel_ids'] = not_channel_hashes
            if constraints.get('has_channel_signature', False):
                constraints['channel_hash__not_in'] = not_channel_hashes
            else:
                # Column is is_signature_valid (the key previously said
                # 'signature_valid', which no other filter here uses).
                constraints['null_or_not_channel__or'] = {
                    'is_signature_valid__is_null': True,
                    'channel_hash__not_in': not_channel_hashes,
                }
    if 'is_signature_valid' in constraints:
        has_channel_signature = constraints.pop('has_channel_signature', False)
        is_signature_valid = constraints.pop('is_signature_valid')
        if has_channel_signature:
            constraints['is_signature_valid'] = is_signature_valid
        else:
            # Also keep claims with no signature state (NULL).
            constraints['null_or_signature__or'] = {
                'is_signature_valid__is_null': True,
                'is_signature_valid': is_signature_valid,
            }
    elif constraints.pop('has_channel_signature', False):
        constraints['is_signature_valid__is_not_null'] = True


def select_claims(cols: List = None, for_count=False, **constraints) -> Select:
    """Build the SELECT used for claim search.

    Translates user-facing search parameters (``claim_id``, ``channel_id``,
    ``order_by``, ``any_tag`` ...) into the constraint keys passed to
    ``query()``. It then selects ``cols`` over claims joined to their TXO and
    TX rows, outer-joined to trend data, the signing channel and
    censorship records.

    :param cols: columns to select; BASE_SELECT_CLAIM_COLUMNS when omitted.
    :param for_count: build tag/language/location filters in their
        ``IN (subquery)`` form.
    :param constraints: search parameters.
    :raises NameError: if an ``order_by`` field is not in SEARCH_ORDER_FIELDS.
    """
    constraints = compat_layer(**constraints)
    if cols is None:
        cols = BASE_SELECT_CLAIM_COLUMNS
    if 'order_by' in constraints:
        constraints['order_by'] = _order_by_to_sql(constraints['order_by'])
    _apply_integer_constraints(constraints)
    # 'sequence' / 'amount_order' select the N-th claim (1-based) by
    # activation height / staked amount, overriding any order_by and paging.
    if 'sequence' in constraints:
        constraints['order_by'] = 'activation_height ASC'
        constraints['offset'] = int(constraints.pop('sequence')) - 1
        constraints['limit'] = 1
    if 'amount_order' in constraints:
        constraints['order_by'] = 'staked_amount DESC'
        constraints['offset'] = int(constraints.pop('amount_order')) - 1
        constraints['limit'] = 1
    if 'claim_id' in constraints:
        claim_id = constraints.pop('claim_id')
        if len(claim_id) == 40:
            constraints['claim_id'] = claim_id
        else:
            # Shorter ids are matched as a prefix.
            constraints['claim_id__like'] = f'{claim_id[:40]}%'
    elif 'claim_ids' in constraints:
        constraints['claim_id__in'] = set(constraints.pop('claim_ids'))
    if 'reposted_claim_id' in constraints:
        constraints['reposted_claim_hash'] = unhexlify(constraints.pop('reposted_claim_id'))[::-1]
    if 'name' in constraints:
        constraints['normalized'] = normalize_name(constraints.pop('name'))
    if 'public_key_id' in constraints:
        constraints['public_key_hash'] = (
            context().ledger.address_to_hash160(constraints.pop('public_key_id')))
    _apply_channel_constraints(constraints)
    if 'txid' in constraints:
        tx_hash = unhexlify(constraints.pop('txid'))[::-1]
        # struct.pack needs an int; accept numeric strings as well.
        nout = int(constraints.pop('nout', 0))
        constraints['txo_hash'] = tx_hash + struct.pack('<I', nout)
    claim_types = _pop_as_collection(constraints, 'claim_type')
    if claim_types:
        constraints['claim_type__in'] = {
            TXO_TYPES[claim_type] for claim_type in claim_types
        }
    stream_types = _pop_as_collection(constraints, 'stream_type')
    if stream_types:
        constraints['stream_type__in'] = {
            STREAM_TYPES[stream_type] for stream_type in stream_types
        }
    media_types = _pop_as_collection(constraints, 'media_type')
    if media_types:
        constraints['media_type__in'] = set(media_types)
    if 'fee_currency' in constraints:
        constraints['fee_currency'] = constraints.pop('fee_currency').lower()
    _apply_constraints_for_array_attributes(constraints, 'tag', clean_tags, for_count)
    _apply_constraints_for_array_attributes(constraints, 'language', lambda _: _, for_count)
    _apply_constraints_for_array_attributes(constraints, 'location', lambda _: _, for_count)
    if 'text' in constraints:
        # TODO: fix (carried over) -- 'text' is forwarded to query() as 'search'.
        constraints["search"] = constraints.pop("text")
    return query(
        [Claim, TXO],
        select(*cols)
        .select_from(
            Claim.join(TXO).join(TX)
            .join(Trend, Trend.c.claim_hash == Claim.c.claim_hash, isouter=True)
            .join(channel_claim, Claim.c.channel_hash == channel_claim.c.claim_hash, isouter=True)
            .join(
                CensoredClaim,
                (CensoredClaim.c.claim_hash == Claim.c.claim_hash) |
                (CensoredClaim.c.claim_hash == Claim.c.reposted_claim_hash) |
                (CensoredClaim.c.claim_hash == Claim.c.channel_hash),
                isouter=True
            )
        ), **constraints
    )
def protobuf_search_claims(**constraints) -> str:
    """Run ``search_claims`` and return the result encoded by ``ResultOutput.to_base64``.

    The matching txos are passed with an empty second list, and the search
    censor is passed as ``blocked``. The total count is discarded.
    """
    txos, _total, censor = search_claims(**constraints)
    return ResultOutput.to_base64(txos, [], blocked=censor)
def search_claims(**constraints) -> Tuple[List[Output], Optional[int], Optional[Censor]]:
    """Search claims and return ``(txos, total, censor)``.

    :param constraints: search parameters for ``select_claims``, plus:
        ``include_total`` (bool) to also compute the match count; and
        ``channel`` (URL) to restrict results to claims whose ``channel_hash``
        is the claim that URL resolves to.
        ``offset`` and ``limit`` are made non-negative; ``limit`` defaults to
        10 and is capped at 50.
    :return: matching txos (annotated with their channels), the total count
        (None unless ``include_total``) and the context's search censor.
    """
    ctx = context()
    search_censor = ctx.get_search_censor()
    include_total = constraints.pop('include_total', False)

    # Resolve the channel URL before counting, so the count query receives
    # the same channel_hash filter as the row query instead of a raw
    # 'channel' URL.
    channel_url = constraints.pop('channel', None)
    if channel_url:
        from .resolve import resolve_url  # pylint: disable=import-outside-toplevel
        channel = resolve_url(channel_url)
        if not isinstance(channel, Output):
            return [], (0 if include_total else None), search_censor
        constraints['channel_hash'] = channel.claim_hash

    total = search_claim_count(**constraints) if include_total else None

    constraints['offset'] = abs(constraints.get('offset', 0))
    constraints['limit'] = min(abs(constraints.get('limit', 10)), 50)
    rows = ctx.fetchall(select_claims(**constraints))
    rows = search_censor.apply(rows)
    txos = rows_to_txos(rows, include_tx=False)
    annotate_with_channels(txos)
    return txos, total, search_censor
def annotate_with_channels(txos):
    """Set ``txo.channel`` on every decodable, signed claim in ``txos``.

    All signing channels are fetched with one ``select_claims`` query. A
    signed txo whose channel is not found gets ``channel = None``. Txos that
    cannot be decoded or are unsigned are left untouched.
    """
    def is_signed(txo):
        return txo.can_decode_claim and txo.claim.is_signed

    wanted_hashes = {txo.claim.signing_channel_hash for txo in txos if is_signed(txo)}
    if not wanted_hashes:
        return
    channel_rows = context().fetchall(select_claims(claim_hash__in=wanted_hashes))
    channels_by_hash = {
        channel.claim_hash: channel
        for channel in rows_to_txos(channel_rows, include_tx=False)
    }
    for txo in txos:
        if is_signed(txo):
            txo.channel = channels_by_hash.get(txo.claim.signing_channel_hash, None)
def search_claim_count(**constraints) -> int:
    """Return how many claims match ``constraints``.

    ``offset``, ``limit`` and ``order_by`` are discarded since they do not
    change the count.
    """
    for ignored_key in ('offset', 'limit', 'order_by'):
        constraints.pop(ignored_key, None)
    count_select = select_claims([func.count().label('total')], **constraints)
    first_row = context().fetchall(count_select)[0]
    return first_row['total'] or 0
# SQL expression for the claim hash that tag/language/location rows are matched
# against. For repost claims it is the reposted claim's hash; for every other
# claim it is the claim's own hash. It is interpolated into the subqueries built
# by _apply_constraints_for_array_attributes.
CLAIM_HASH_OR_REPOST_HASH_SQL = f"""
CASE WHEN claim.claim_type = {TXO_TYPES['repost']}
THEN claim.reposted_claim_hash
ELSE claim.claim_hash
END
"""
def _pop_attribute_values(constraints, key, cleaner):
    """Pop ``key`` from ``constraints`` as a cleaned set of at most
    ATTRIBUTE_ARRAY_MAX_LENGTH values.

    A missing key or None yields an empty set. A lone string is treated as
    one value rather than as a sequence of characters. Any iterable returned
    by ``cleaner`` is accepted.
    """
    values = constraints.pop(key, [])
    if values is None:
        values = []
    elif isinstance(values, str):
        values = [values]
    return set(list(cleaner(values))[:ATTRIBUTE_ARRAY_MAX_LENGTH])


def _bind_attribute_values(constraints, prefix, items):
    """Add ``items`` to ``constraints`` as ``$<prefix><i>`` bind parameters.

    :return: the matching comma-separated ``:$<prefix><i>`` placeholders.
    """
    names = [f'{prefix}{i}' for i in range(len(items))]
    constraints.update({f'${name}': item for name, item in zip(names, items)})
    return ', '.join(f':${name}' for name in names)


def _apply_constraints_for_array_attributes(constraints, attr, cleaner, for_count=False):
    """Translate ``any_<attr>``, ``all_<attr>`` and ``not_<attr>`` filters into SQL.

    ``attr`` names both the attribute table and its value column (this module
    passes ``tag``, ``language`` and ``location``). Because it is interpolated
    into SQL text, it must be a trusted identifier. Filter values are only
    ever bound as ``$``-prefixed parameters; the SQL fragments are stored under
    ``#``-prefixed keys. Rows are matched on CLAIM_HASH_OR_REPOST_HASH_SQL, so
    reposts are filtered by the reposted claim's attributes.

    :param cleaner: normalizes each filter's list of values.
    :param for_count: emit ``IN (subquery)`` forms instead of correlated
        ``EXISTS`` / ``count(*)`` subqueries.
    """
    any_items = _pop_attribute_values(constraints, f'any_{attr}', cleaner)
    all_items = _pop_attribute_values(constraints, f'all_{attr}', cleaner)
    not_items = _pop_attribute_values(constraints, f'not_{attr}', cleaner)
    # Exclusions take precedence over inclusions.
    all_items -= not_items
    any_items -= not_items

    # NOTE: a commented-out common-tag index optimization used to live here;
    # see version history if it is revived.
    if any_items:
        values = _bind_attribute_values(constraints, f'any_{attr}', any_items)
        # Tags always use the IN-subquery form, even outside count queries.
        if for_count or attr == 'tag':
            constraints[f'#_any_{attr}'] = f"""
{CLAIM_HASH_OR_REPOST_HASH_SQL} IN (
SELECT claim_hash FROM {attr} WHERE {attr} IN ({values})
)
"""
        else:
            constraints[f'#_any_{attr}'] = f"""
EXISTS(
SELECT 1 FROM {attr} WHERE
{CLAIM_HASH_OR_REPOST_HASH_SQL}={attr}.claim_hash
AND {attr} IN ({values})
)
"""
    if all_items:
        constraints[f'$all_{attr}_count'] = len(all_items)
        values = _bind_attribute_values(constraints, f'all_{attr}', all_items)
        if for_count:
            constraints[f'#_all_{attr}'] = f"""
{CLAIM_HASH_OR_REPOST_HASH_SQL} IN (
SELECT claim_hash FROM {attr} WHERE {attr} IN ({values})
GROUP BY claim_hash HAVING COUNT({attr}) = :$all_{attr}_count
)
"""
        else:
            constraints[f'#_all_{attr}'] = f"""
{len(all_items)}=(
SELECT count(*) FROM {attr} WHERE
{CLAIM_HASH_OR_REPOST_HASH_SQL}={attr}.claim_hash
AND {attr} IN ({values})
)
"""
    if not_items:
        values = _bind_attribute_values(constraints, f'not_{attr}', not_items)
        if for_count:
            constraints[f'#_not_{attr}'] = f"""
{CLAIM_HASH_OR_REPOST_HASH_SQL} NOT IN (
SELECT claim_hash FROM {attr} WHERE {attr} IN ({values})
)
"""
        else:
            constraints[f'#_not_{attr}'] = f"""
NOT EXISTS(
SELECT 1 FROM {attr} WHERE
{CLAIM_HASH_OR_REPOST_HASH_SQL}={attr}.claim_hash
AND {attr} IN ({values})
)
"""