wip lbry.db

This commit is contained in:
Lex Berezhny 2020-05-01 09:29:44 -04:00
parent ccd32eae70
commit 6a33d86bfe
7 changed files with 1582 additions and 1266 deletions

View file

@ -1,6 +1,7 @@
from .database import Database, in_account from .database import Database
from .constants import TXO_TYPES, CLAIM_TYPE_CODES, CLAIM_TYPE_NAMES
from .tables import ( from .tables import (
Table, Version, metadata, Table, Version, metadata,
AccountAddress, PubkeyAddress, AccountAddress, PubkeyAddress,
Block, TX, TXO, TXI Block, TX, TXO, TXI, Claim, Tag, Claimtrie
) )

59
lbry/db/constants.py Normal file
View file

@ -0,0 +1,59 @@
TXO_TYPES = {
"other": 0,
"stream": 1,
"channel": 2,
"support": 3,
"purchase": 4,
"collection": 5,
"repost": 6,
}
CLAIM_TYPE_NAMES = [
'stream',
'channel',
'collection',
'repost',
]
CLAIM_TYPE_CODES = [
TXO_TYPES[name] for name in CLAIM_TYPE_NAMES
]
STREAM_TYPES = {
'video': 1,
'audio': 2,
'image': 3,
'document': 4,
'binary': 5,
'model': 6
}
MATURE_TAGS = (
'nsfw', 'porn', 'xxx', 'mature', 'adult', 'sex'
)
ATTRIBUTE_ARRAY_MAX_LENGTH = 100
SEARCH_INTEGER_PARAMS = {
'height', 'creation_height', 'activation_height', 'expiration_height',
'timestamp', 'creation_timestamp', 'duration', 'release_time', 'fee_amount',
'tx_position', 'channel_join', 'reposted',
'amount', 'effective_amount', 'support_amount',
'trending_group', 'trending_mixed',
'trending_local', 'trending_global',
}
SEARCH_PARAMS = {
'name', 'text', 'claim_id', 'claim_ids', 'txid', 'nout', 'channel', 'channel_ids', 'not_channel_ids',
'public_key_id', 'claim_type', 'stream_types', 'media_types', 'fee_currency',
'has_channel_signature', 'signature_valid',
'any_tags', 'all_tags', 'not_tags', 'reposted_claim_id',
'any_locations', 'all_locations', 'not_locations',
'any_languages', 'all_languages', 'not_languages',
'is_controlling', 'limit', 'offset', 'order_by',
'no_totals',
} | SEARCH_INTEGER_PARAMS
SEARCH_ORDER_FIELDS = {
'name', 'claim_hash'
} | SEARCH_INTEGER_PARAMS

File diff suppressed because it is too large Load diff

1201
lbry/db/queries.py Normal file

File diff suppressed because it is too large Load diff

View file

@ -18,7 +18,7 @@ from lbry.schema.tags import clean_tags
from lbry.schema.result import Outputs, Censor from lbry.schema.result import Outputs, Censor
from lbry.blockchain.ledger import Ledger, RegTestLedger from lbry.blockchain.ledger import Ledger, RegTestLedger
from .common import CLAIM_TYPES, STREAM_TYPES, COMMON_TAGS from .constants import CLAIM_TYPES, STREAM_TYPES
from .full_text_search import FTS_ORDER_BY from .full_text_search import FTS_ORDER_BY
@ -34,32 +34,6 @@ class SQLiteInterruptedError(apsw.InterruptError):
self.metrics = metrics self.metrics = metrics
ATTRIBUTE_ARRAY_MAX_LENGTH = 100
INTEGER_PARAMS = {
'height', 'creation_height', 'activation_height', 'expiration_height',
'timestamp', 'creation_timestamp', 'duration', 'release_time', 'fee_amount',
'tx_position', 'channel_join', 'reposted',
'amount', 'effective_amount', 'support_amount',
'trending_group', 'trending_mixed',
'trending_local', 'trending_global',
}
SEARCH_PARAMS = {
'name', 'text', 'claim_id', 'claim_ids', 'txid', 'nout', 'channel', 'channel_ids', 'not_channel_ids',
'public_key_id', 'claim_type', 'stream_types', 'media_types', 'fee_currency',
'has_channel_signature', 'signature_valid',
'any_tags', 'all_tags', 'not_tags', 'reposted_claim_id',
'any_locations', 'all_locations', 'not_locations',
'any_languages', 'all_languages', 'not_languages',
'is_controlling', 'limit', 'offset', 'order_by',
'no_totals',
} | INTEGER_PARAMS
ORDER_FIELDS = {
'name', 'claim_hash'
} | INTEGER_PARAMS
@dataclass @dataclass
@ -166,446 +140,3 @@ def reports_metrics(func):
return wrapper return wrapper
@reports_metrics
def search_to_bytes(constraints) -> Union[bytes, Tuple[bytes, Dict]]:
return encode_result(search(constraints))
@reports_metrics
def resolve_to_bytes(urls) -> Union[bytes, Tuple[bytes, Dict]]:
return encode_result(resolve(urls))
def encode_result(result):
return Outputs.to_bytes(*result)
@measure
def execute_query(sql, values, row_offset: int, row_limit: int, censor: Censor) -> List:
context = ctx.get()
context.set_query_timeout()
try:
c = context.db.cursor()
def row_filter(cursor, row):
nonlocal row_offset
row = row_factory(cursor, row)
if len(row) > 1 and censor.censor(row):
return
if row_offset:
row_offset -= 1
return
return row
c.setrowtrace(row_filter)
i, rows = 0, []
for row in c.execute(sql, values):
i += 1
rows.append(row)
if i >= row_limit:
break
return rows
except apsw.Error as err:
plain_sql = interpolate(sql, values)
if context.is_tracking_metrics:
context.metrics['execute_query'][-1]['sql'] = plain_sql
if isinstance(err, apsw.InterruptError):
context.log.warning("interrupted slow sqlite query:\n%s", plain_sql)
raise SQLiteInterruptedError(context.metrics)
context.log.exception('failed running query', exc_info=err)
raise SQLiteOperationalError(context.metrics)
def claims_query(cols, for_count=False, **constraints) -> Tuple[str, Dict]:
if 'order_by' in constraints:
order_by_parts = constraints['order_by']
if isinstance(order_by_parts, str):
order_by_parts = [order_by_parts]
sql_order_by = []
for order_by in order_by_parts:
is_asc = order_by.startswith('^')
column = order_by[1:] if is_asc else order_by
if column not in ORDER_FIELDS:
raise NameError(f'{column} is not a valid order_by field')
if column == 'name':
column = 'normalized'
sql_order_by.append(
f"claim.{column} ASC" if is_asc else f"claim.{column} DESC"
)
constraints['order_by'] = sql_order_by
ops = {'<=': '__lte', '>=': '__gte', '<': '__lt', '>': '__gt'}
for constraint in INTEGER_PARAMS:
if constraint in constraints:
value = constraints.pop(constraint)
postfix = ''
if isinstance(value, str):
if len(value) >= 2 and value[:2] in ops:
postfix, value = ops[value[:2]], value[2:]
elif len(value) >= 1 and value[0] in ops:
postfix, value = ops[value[0]], value[1:]
if constraint == 'fee_amount':
value = Decimal(value)*1000
constraints[f'claim.{constraint}{postfix}'] = int(value)
if constraints.pop('is_controlling', False):
if {'sequence', 'amount_order'}.isdisjoint(constraints):
for_count = False
constraints['claimtrie.claim_hash__is_not_null'] = ''
if 'sequence' in constraints:
constraints['order_by'] = 'claim.activation_height ASC'
constraints['offset'] = int(constraints.pop('sequence')) - 1
constraints['limit'] = 1
if 'amount_order' in constraints:
constraints['order_by'] = 'claim.effective_amount DESC'
constraints['offset'] = int(constraints.pop('amount_order')) - 1
constraints['limit'] = 1
if 'claim_id' in constraints:
claim_id = constraints.pop('claim_id')
if len(claim_id) == 40:
constraints['claim.claim_id'] = claim_id
else:
constraints['claim.claim_id__like'] = f'{claim_id[:40]}%'
elif 'claim_ids' in constraints:
constraints['claim.claim_id__in'] = set(constraints.pop('claim_ids'))
if 'reposted_claim_id' in constraints:
constraints['claim.reposted_claim_hash'] = unhexlify(constraints.pop('reposted_claim_id'))[::-1]
if 'name' in constraints:
constraints['claim.normalized'] = normalize_name(constraints.pop('name'))
if 'public_key_id' in constraints:
constraints['claim.public_key_hash'] = (
ctx.get().ledger.address_to_hash160(constraints.pop('public_key_id')))
if 'channel_hash' in constraints:
constraints['claim.channel_hash'] = constraints.pop('channel_hash')
if 'channel_ids' in constraints:
channel_ids = constraints.pop('channel_ids')
if channel_ids:
constraints['claim.channel_hash__in'] = {
unhexlify(cid)[::-1] for cid in channel_ids
}
if 'not_channel_ids' in constraints:
not_channel_ids = constraints.pop('not_channel_ids')
if not_channel_ids:
not_channel_ids_binary = {
unhexlify(ncid)[::-1] for ncid in not_channel_ids
}
constraints['claim.claim_hash__not_in#not_channel_ids'] = not_channel_ids_binary
if constraints.get('has_channel_signature', False):
constraints['claim.channel_hash__not_in'] = not_channel_ids_binary
else:
constraints['null_or_not_channel__or'] = {
'claim.signature_valid__is_null': True,
'claim.channel_hash__not_in': not_channel_ids_binary
}
if 'signature_valid' in constraints:
has_channel_signature = constraints.pop('has_channel_signature', False)
if has_channel_signature:
constraints['claim.signature_valid'] = constraints.pop('signature_valid')
else:
constraints['null_or_signature__or'] = {
'claim.signature_valid__is_null': True,
'claim.signature_valid': constraints.pop('signature_valid')
}
elif constraints.pop('has_channel_signature', False):
constraints['claim.signature_valid__is_not_null'] = True
if 'txid' in constraints:
tx_hash = unhexlify(constraints.pop('txid'))[::-1]
nout = constraints.pop('nout', 0)
constraints['claim.txo_hash'] = tx_hash + struct.pack('<I', nout)
if 'claim_type' in constraints:
claim_types = constraints.pop('claim_type')
if isinstance(claim_types, str):
claim_types = [claim_types]
if claim_types:
constraints['claim.claim_type__in'] = {
CLAIM_TYPES[claim_type] for claim_type in claim_types
}
if 'stream_types' in constraints:
stream_types = constraints.pop('stream_types')
if stream_types:
constraints['claim.stream_type__in'] = {
STREAM_TYPES[stream_type] for stream_type in stream_types
}
if 'media_types' in constraints:
media_types = constraints.pop('media_types')
if media_types:
constraints['claim.media_type__in'] = set(media_types)
if 'fee_currency' in constraints:
constraints['claim.fee_currency'] = constraints.pop('fee_currency').lower()
_apply_constraints_for_array_attributes(constraints, 'tag', clean_tags, for_count)
_apply_constraints_for_array_attributes(constraints, 'language', lambda _: _, for_count)
_apply_constraints_for_array_attributes(constraints, 'location', lambda _: _, for_count)
if 'text' in constraints:
constraints["search"] = constraints.pop("text")
constraints["order_by"] = FTS_ORDER_BY
select = f"SELECT {cols} FROM search JOIN claim ON (search.rowid=claim.rowid)"
else:
select = f"SELECT {cols} FROM claim"
if not for_count:
select += " LEFT JOIN claimtrie USING (claim_hash)"
return query(select, **constraints)
def select_claims(censor: Censor, cols: str, for_count=False, **constraints) -> List:
if 'channel' in constraints:
channel_url = constraints.pop('channel')
match = resolve_url(channel_url)
if isinstance(match, dict):
constraints['channel_hash'] = match['claim_hash']
else:
return [{'row_count': 0}] if cols == 'count(*) as row_count' else []
row_offset = constraints.pop('offset', 0)
row_limit = constraints.pop('limit', 20)
sql, values = claims_query(cols, for_count, **constraints)
return execute_query(sql, values, row_offset, row_limit, censor)
@measure
def count_claims(**constraints) -> int:
constraints.pop('offset', None)
constraints.pop('limit', None)
constraints.pop('order_by', None)
count = select_claims(Censor(), 'count(*) as row_count', for_count=True, **constraints)
return count[0]['row_count']
def search_claims(censor: Censor, **constraints) -> List:
return select_claims(
censor,
"""
claimtrie.claim_hash as is_controlling,
claimtrie.last_take_over_height,
claim.claim_hash, claim.txo_hash,
claim.claims_in_channel, claim.reposted,
claim.height, claim.creation_height,
claim.activation_height, claim.expiration_height,
claim.effective_amount, claim.support_amount,
claim.trending_group, claim.trending_mixed,
claim.trending_local, claim.trending_global,
claim.short_url, claim.canonical_url,
claim.channel_hash, claim.reposted_claim_hash,
claim.signature_valid
""", **constraints
)
def _get_referenced_rows(txo_rows: List[dict], censor_channels: List[bytes]):
censor = ctx.get().get_resolve_censor()
repost_hashes = set(filter(None, map(itemgetter('reposted_claim_hash'), txo_rows)))
channel_hashes = set(chain(
filter(None, map(itemgetter('channel_hash'), txo_rows)),
censor_channels
))
reposted_txos = []
if repost_hashes:
reposted_txos = search_claims(censor, **{'claim.claim_hash__in': repost_hashes})
channel_hashes |= set(filter(None, map(itemgetter('channel_hash'), reposted_txos)))
channel_txos = []
if channel_hashes:
channel_txos = search_claims(censor, **{'claim.claim_hash__in': channel_hashes})
# channels must come first for client side inflation to work properly
return channel_txos + reposted_txos
@measure
def search(constraints) -> Tuple[List, List, int, int, Censor]:
assert set(constraints).issubset(SEARCH_PARAMS), \
f"Search query contains invalid arguments: {set(constraints).difference(SEARCH_PARAMS)}"
total = None
if not constraints.pop('no_totals', False):
total = count_claims(**constraints)
constraints['offset'] = abs(constraints.get('offset', 0))
constraints['limit'] = min(abs(constraints.get('limit', 10)), 50)
context = ctx.get()
search_censor = context.get_search_censor()
txo_rows = search_claims(search_censor, **constraints)
extra_txo_rows = _get_referenced_rows(txo_rows, search_censor.censored.keys())
return txo_rows, extra_txo_rows, constraints['offset'], total, search_censor
@measure
def resolve(urls) -> Tuple[List, List]:
txo_rows = [resolve_url(raw_url) for raw_url in urls]
extra_txo_rows = _get_referenced_rows(
[txo for txo in txo_rows if isinstance(txo, dict)],
[txo.censor_hash for txo in txo_rows if isinstance(txo, ResolveCensoredError)]
)
return txo_rows, extra_txo_rows
@measure
def resolve_url(raw_url):
censor = ctx.get().get_resolve_censor()
try:
url = URL.parse(raw_url)
except ValueError as e:
return e
channel = None
if url.has_channel:
query = url.channel.to_dict()
if set(query) == {'name'}:
query['is_controlling'] = True
else:
query['order_by'] = ['^creation_height']
matches = search_claims(censor, **query, limit=1)
if matches:
channel = matches[0]
elif censor.censored:
return ResolveCensoredError(raw_url, next(iter(censor.censored)))
else:
return LookupError(f'Could not find channel in "{raw_url}".')
if url.has_stream:
query = url.stream.to_dict()
if channel is not None:
if set(query) == {'name'}:
# temporarily emulate is_controlling for claims in channel
query['order_by'] = ['effective_amount', '^height']
else:
query['order_by'] = ['^channel_join']
query['channel_hash'] = channel['claim_hash']
query['signature_valid'] = 1
elif set(query) == {'name'}:
query['is_controlling'] = 1
matches = search_claims(censor, **query, limit=1)
if matches:
return matches[0]
elif censor.censored:
return ResolveCensoredError(raw_url, next(iter(censor.censored)))
else:
return LookupError(f'Could not find claim at "{raw_url}".')
return channel
CLAIM_HASH_OR_REPOST_HASH_SQL = f"""
CASE WHEN claim.claim_type = {CLAIM_TYPES['repost']}
THEN claim.reposted_claim_hash
ELSE claim.claim_hash
END
"""
def _apply_constraints_for_array_attributes(constraints, attr, cleaner, for_count=False):
any_items = set(cleaner(constraints.pop(f'any_{attr}s', []))[:ATTRIBUTE_ARRAY_MAX_LENGTH])
all_items = set(cleaner(constraints.pop(f'all_{attr}s', []))[:ATTRIBUTE_ARRAY_MAX_LENGTH])
not_items = set(cleaner(constraints.pop(f'not_{attr}s', []))[:ATTRIBUTE_ARRAY_MAX_LENGTH])
all_items = {item for item in all_items if item not in not_items}
any_items = {item for item in any_items if item not in not_items}
any_queries = {}
if attr == 'tag':
common_tags = any_items & COMMON_TAGS.keys()
if common_tags:
any_items -= common_tags
if len(common_tags) < 5:
for item in common_tags:
index_name = COMMON_TAGS[item]
any_queries[f'#_common_tag_{index_name}'] = f"""
EXISTS(
SELECT 1 FROM tag INDEXED BY tag_{index_name}_idx
WHERE {CLAIM_HASH_OR_REPOST_HASH_SQL}=tag.claim_hash
AND tag = '{item}'
)
"""
elif len(common_tags) >= 5:
constraints.update({
f'$any_common_tag{i}': item for i, item in enumerate(common_tags)
})
values = ', '.join(
f':$any_common_tag{i}' for i in range(len(common_tags))
)
any_queries[f'#_any_common_tags'] = f"""
EXISTS(
SELECT 1 FROM tag WHERE {CLAIM_HASH_OR_REPOST_HASH_SQL}=tag.claim_hash
AND tag IN ({values})
)
"""
if any_items:
constraints.update({
f'$any_{attr}{i}': item for i, item in enumerate(any_items)
})
values = ', '.join(
f':$any_{attr}{i}' for i in range(len(any_items))
)
if for_count or attr == 'tag':
any_queries[f'#_any_{attr}'] = f"""
{CLAIM_HASH_OR_REPOST_HASH_SQL} IN (
SELECT claim_hash FROM {attr} WHERE {attr} IN ({values})
)
"""
else:
any_queries[f'#_any_{attr}'] = f"""
EXISTS(
SELECT 1 FROM {attr} WHERE
{CLAIM_HASH_OR_REPOST_HASH_SQL}={attr}.claim_hash
AND {attr} IN ({values})
)
"""
if len(any_queries) == 1:
constraints.update(any_queries)
elif len(any_queries) > 1:
constraints[f'ORed_{attr}_queries__any'] = any_queries
if all_items:
constraints[f'$all_{attr}_count'] = len(all_items)
constraints.update({
f'$all_{attr}{i}': item for i, item in enumerate(all_items)
})
values = ', '.join(
f':$all_{attr}{i}' for i in range(len(all_items))
)
if for_count:
constraints[f'#_all_{attr}'] = f"""
{CLAIM_HASH_OR_REPOST_HASH_SQL} IN (
SELECT claim_hash FROM {attr} WHERE {attr} IN ({values})
GROUP BY claim_hash HAVING COUNT({attr}) = :$all_{attr}_count
)
"""
else:
constraints[f'#_all_{attr}'] = f"""
{len(all_items)}=(
SELECT count(*) FROM {attr} WHERE
{CLAIM_HASH_OR_REPOST_HASH_SQL}={attr}.claim_hash
AND {attr} IN ({values})
)
"""
if not_items:
constraints.update({
f'$not_{attr}{i}': item for i, item in enumerate(not_items)
})
values = ', '.join(
f':$not_{attr}{i}' for i in range(len(not_items))
)
if for_count:
constraints[f'#_not_{attr}'] = f"""
{CLAIM_HASH_OR_REPOST_HASH_SQL} NOT IN (
SELECT claim_hash FROM {attr} WHERE {attr} IN ({values})
)
"""
else:
constraints[f'#_not_{attr}'] = f"""
NOT EXISTS(
SELECT 1 FROM {attr} WHERE
{CLAIM_HASH_OR_REPOST_HASH_SQL}={attr}.claim_hash
AND {attr} IN ({values})
)
"""

View file

@ -6,6 +6,9 @@ from sqlalchemy import (
) )
SCHEMA_VERSION = '1.4'
metadata = MetaData() metadata = MetaData()
@ -18,7 +21,6 @@ Version = Table(
PubkeyAddress = Table( PubkeyAddress = Table(
'pubkey_address', metadata, 'pubkey_address', metadata,
Column('address', Text, primary_key=True), Column('address', Text, primary_key=True),
Column('history', Text, nullable=True),
Column('used_times', Integer, server_default='0'), Column('used_times', Integer, server_default='0'),
) )
@ -41,6 +43,7 @@ Block = Table(
Column('previous_hash', LargeBinary), Column('previous_hash', LargeBinary),
Column('file_number', SmallInteger), Column('file_number', SmallInteger),
Column('height', Integer), Column('height', Integer),
Column('block_filter', LargeBinary, nullable=True)
) )
@ -54,6 +57,7 @@ TX = Table(
Column('is_verified', Boolean, server_default='FALSE'), Column('is_verified', Boolean, server_default='FALSE'),
Column('purchased_claim_hash', LargeBinary, nullable=True), Column('purchased_claim_hash', LargeBinary, nullable=True),
Column('day', Integer, nullable=True), Column('day', Integer, nullable=True),
Column('tx_filter', LargeBinary, nullable=True)
) )
@ -64,14 +68,13 @@ TXO = Table(
Column('address', Text), Column('address', Text),
Column('position', Integer), Column('position', Integer),
Column('amount', BigInteger), Column('amount', BigInteger),
Column('script', LargeBinary), Column('script_offset', BigInteger),
Column('script_length', BigInteger),
Column('is_reserved', Boolean, server_default='0'), Column('is_reserved', Boolean, server_default='0'),
Column('txo_type', Integer, server_default='0'), Column('txo_type', Integer, server_default='0'),
Column('claim_id', Text, nullable=True), Column('claim_id', Text, nullable=True),
Column('claim_hash', LargeBinary, nullable=True), Column('claim_hash', LargeBinary, nullable=True),
Column('claim_name', Text, nullable=True), Column('claim_name', Text, nullable=True),
Column('channel_hash', LargeBinary, nullable=True),
Column('reposted_claim_hash', LargeBinary, nullable=True),
) )
txo_join_account = TXO.join(AccountAddress, TXO.columns.address == AccountAddress.columns.address) txo_join_account = TXO.join(AccountAddress, TXO.columns.address == AccountAddress.columns.address)
@ -81,8 +84,37 @@ TXI = Table(
'txi', metadata, 'txi', metadata,
Column('tx_hash', LargeBinary, ForeignKey(TX.columns.tx_hash)), Column('tx_hash', LargeBinary, ForeignKey(TX.columns.tx_hash)),
Column('txo_hash', LargeBinary, ForeignKey(TXO.columns.txo_hash), primary_key=True), Column('txo_hash', LargeBinary, ForeignKey(TXO.columns.txo_hash), primary_key=True),
Column('address', Text), Column('address', Text, nullable=True),
Column('position', Integer), Column('position', Integer),
) )
txi_join_account = TXI.join(AccountAddress, TXI.columns.address == AccountAddress.columns.address) txi_join_account = TXI.join(AccountAddress, TXI.columns.address == AccountAddress.columns.address)
Claim = Table(
'claim', metadata,
Column('claim_hash', LargeBinary, primary_key=True),
Column('claim_name', Text),
Column('txo_hash', LargeBinary, ForeignKey(TXO.columns.txo_hash)),
Column('amount', BigInteger),
Column('channel_hash', LargeBinary, nullable=True),
Column('effective_amount', BigInteger, server_default='0'),
Column('reposted_claim_hash', LargeBinary, nullable=True),
Column('activation_height', Integer, server_default='0'),
Column('expiration_height', Integer, server_default='0'),
)
Tag = Table(
'tag', metadata,
Column('claim_hash', LargeBinary),
Column('tag', Text),
)
Claimtrie = Table(
'claimtrie', metadata,
Column('normalized', Text, primary_key=True),
Column('claim_hash', LargeBinary, ForeignKey(Claim.columns.claim_hash)),
Column('last_take_over_height', Integer),
)

129
lbry/db/utils.py Normal file
View file

@ -0,0 +1,129 @@
from itertools import islice
from typing import List, Union
from sqlalchemy import text, and_
from sqlalchemy.sql.expression import Select
try:
from sqlalchemy.dialects.postgresql import insert as pg_insert
except ImportError:
pg_insert = None
from .tables import AccountAddress
def chunk(rows, step):
it, total = iter(rows), len(rows)
for _ in range(0, total, step):
yield min(step, total), islice(it, step)
total -= step
def constrain_single_or_list(constraints, column, value, convert=lambda x: x):
if value is not None:
if isinstance(value, list):
value = [convert(v) for v in value]
if len(value) == 1:
constraints[column] = value[0]
elif len(value) > 1:
constraints[f"{column}__in"] = value
else:
constraints[column] = convert(value)
return constraints
def in_account_ids(account_ids: Union[List[str], str]):
if isinstance(account_ids, list):
if len(account_ids) > 1:
return AccountAddress.c.account.in_(account_ids)
account_ids = account_ids[0]
return AccountAddress.c.account == account_ids
def query(table, s: Select, **constraints) -> Select:
limit = constraints.pop('limit', None)
if limit is not None:
s = s.limit(limit)
offset = constraints.pop('offset', None)
if offset is not None:
s = s.offset(offset)
order_by = constraints.pop('order_by', None)
if order_by:
if isinstance(order_by, str):
s = s.order_by(text(order_by))
elif isinstance(order_by, list):
s = s.order_by(text(', '.join(order_by)))
else:
raise ValueError("order_by must be string or list")
group_by = constraints.pop('group_by', None)
if group_by is not None:
s = s.group_by(text(group_by))
account_ids = constraints.pop('account_ids', [])
if account_ids:
s = s.where(in_account_ids(account_ids))
if constraints:
s = s.where(
constraints_to_clause(table, constraints)
)
return s
def constraints_to_clause(tables, constraints):
clause = []
for key, constraint in constraints.items():
if key.endswith('__not'):
col, op = key[:-len('__not')], '__ne__'
elif key.endswith('__is_null'):
col = key[:-len('__is_null')]
op = '__eq__'
constraint = None
elif key.endswith('__is_not_null'):
col = key[:-len('__is_not_null')]
op = '__ne__'
constraint = None
elif key.endswith('__lt'):
col, op = key[:-len('__lt')], '__lt__'
elif key.endswith('__lte'):
col, op = key[:-len('__lte')], '__le__'
elif key.endswith('__gt'):
col, op = key[:-len('__gt')], '__gt__'
elif key.endswith('__gte'):
col, op = key[:-len('__gte')], '__ge__'
elif key.endswith('__like'):
col, op = key[:-len('__like')], 'like'
elif key.endswith('__not_like'):
col, op = key[:-len('__not_like')], 'notlike'
elif key.endswith('__in') or key.endswith('__not_in'):
if key.endswith('__in'):
col, op, one_val_op = key[:-len('__in')], 'in_', '__eq__'
else:
col, op, one_val_op = key[:-len('__not_in')], 'notin_', '__ne__'
if isinstance(constraint, Select):
pass
elif constraint:
if isinstance(constraint, (list, set, tuple)):
if len(constraint) == 1:
op = one_val_op
constraint = next(iter(constraint))
elif isinstance(constraint, str):
constraint = text(constraint)
else:
raise ValueError(f"{col} requires a list, set or string as constraint value.")
else:
continue
else:
col, op = key, '__eq__'
attr = None
for table in tables:
attr = getattr(table.c, col, None)
if attr is not None:
clause.append(getattr(attr, op)(constraint))
break
if attr is None:
raise ValueError(f"Attribute '{col}' not found on tables: {', '.join([t.name for t in tables])}.")
return and_(*clause)