diff --git a/lbry/db/database.py b/lbry/db/database.py index ba64742a2..0fe7dbe59 100644 --- a/lbry/db/database.py +++ b/lbry/db/database.py @@ -1,13 +1,15 @@ +# pylint: disable=singleton-comparison + import logging import asyncio import sqlite3 -from binascii import hexlify from concurrent.futures.thread import ThreadPoolExecutor -from typing import Tuple, List, Union, Any, Iterable, Dict, Optional +from typing import List, Union, Iterable, Optional from datetime import date import sqlalchemy -from sqlalchemy import select, text, and_, union, func +from sqlalchemy.future import select +from sqlalchemy import text, and_, union, func, inspect from sqlalchemy.sql.expression import Select try: from sqlalchemy.dialects.postgresql import insert as pg_insert @@ -28,7 +30,7 @@ sqlite3.enable_callback_tracebacks(True) def insert_or_ignore(conn, table): if conn.dialect.name == 'sqlite': - return table.insert(prefixes=("OR IGNORE",)) + return table.insert().prefix_with("OR IGNORE") elif conn.dialect.name == 'postgresql': return pg_insert(table).on_conflict_do_nothing() else: @@ -37,7 +39,7 @@ def insert_or_ignore(conn, table): def insert_or_replace(conn, table, replace): if conn.dialect.name == 'sqlite': - return table.insert(prefixes=("OR REPLACE",)) + return table.insert().prefix_with("OR REPLACE") elif conn.dialect.name == 'postgresql': insert = pg_insert(table) return insert.on_conflict_do_update( @@ -47,13 +49,6 @@ def insert_or_replace(conn, table, replace): raise RuntimeError(f'Unknown database dialect: {conn.dialect.name}.') -def dict_to_clause(t, d): - clauses = [] - for key, value in d.items(): - clauses.append(getattr(t.c, key) == value) - return and_(*clauses) - - def constrain_single_or_list(constraints, column, value, convert=lambda x: x): if value is not None: if isinstance(value, list): @@ -67,116 +62,6 @@ def constrain_single_or_list(constraints, column, value, convert=lambda x: x): return constraints -def constraints_to_sql(constraints, joiner=' AND ', prepend_key=''): - sql, values = [], {} - for key, constraint in constraints.items(): - tag = '0' - if '#' in key: - key, tag = key[:key.index('#')], key[key.index('#')+1:] - col, op, key = key, '=', key.replace('.', '_') - if not key: - sql.append(constraint) - continue - if key.startswith('$$'): - col, key = col[2:], key[1:] - elif key.startswith('$'): - values[key] = constraint - continue - if key.endswith('__not'): - col, op = col[:-len('__not')], '!=' - elif key.endswith('__is_null'): - col = col[:-len('__is_null')] - sql.append(f'{col} IS NULL') - continue - if key.endswith('__is_not_null'): - col = col[:-len('__is_not_null')] - sql.append(f'{col} IS NOT NULL') - continue - if key.endswith('__lt'): - col, op = col[:-len('__lt')], '<' - elif key.endswith('__lte'): - col, op = col[:-len('__lte')], '<=' - elif key.endswith('__gt'): - col, op = col[:-len('__gt')], '>' - elif key.endswith('__gte'): - col, op = col[:-len('__gte')], '>=' - elif key.endswith('__like'): - col, op = col[:-len('__like')], 'LIKE' - elif key.endswith('__not_like'): - col, op = col[:-len('__not_like')], 'NOT LIKE' - elif key.endswith('__in') or key.endswith('__not_in'): - if key.endswith('__in'): - col, op, one_val_op = col[:-len('__in')], 'IN', '=' - else: - col, op, one_val_op = col[:-len('__not_in')], 'NOT IN', '!=' - if constraint: - if isinstance(constraint, (list, set, tuple)): - if len(constraint) == 1: - values[f'{key}{tag}'] = next(iter(constraint)) - sql.append(f'{col} {one_val_op} :{key}{tag}') - else: - keys = [] - for i, val in enumerate(constraint): - keys.append(f':{key}{tag}_{i}') - values[f'{key}{tag}_{i}'] = val - sql.append(f'{col} {op} ({", ".join(keys)})') - elif isinstance(constraint, str): - sql.append(f'{col} {op} ({constraint})') - else: - raise ValueError(f"{col} requires a list, set or string as constraint value.") - continue - elif key.endswith('__any') or key.endswith('__or'): - where, subvalues = constraints_to_sql(constraint, ' OR ', key+tag+'_') - sql.append(f'({where})') - values.update(subvalues) - continue - if key.endswith('__and'): - where, subvalues = constraints_to_sql(constraint, ' AND ', key+tag+'_') - sql.append(f'({where})') - values.update(subvalues) - continue - sql.append(f'{col} {op} :{prepend_key}{key}{tag}') - values[prepend_key+key+tag] = constraint - return joiner.join(sql) if sql else '', values - - -def query(select, **constraints) -> Tuple[str, Dict[str, Any]]: - sql = [select] - limit = constraints.pop('limit', None) - offset = constraints.pop('offset', None) - order_by = constraints.pop('order_by', None) - group_by = constraints.pop('group_by', None) - - accounts = constraints.pop('accounts', []) - if accounts: - constraints['account__in'] = [a.public_key.address for a in accounts] - - where, values = constraints_to_sql(constraints) - if where: - sql.append('WHERE') - sql.append(where) - - if group_by is not None: - sql.append(f'GROUP BY {group_by}') - - if order_by: - sql.append('ORDER BY') - if isinstance(order_by, str): - sql.append(order_by) - elif isinstance(order_by, list): - sql.append(', '.join(order_by)) - else: - raise ValueError("order_by must be string or list") - - if limit is not None: - sql.append(f'LIMIT {limit}') - - if offset is not None: - sql.append(f'OFFSET {offset}') - - return ' '.join(sql), values - - def in_account(accounts: Union[List[PubKey], PubKey]): if isinstance(accounts, list): if len(accounts) > 1: @@ -185,38 +70,38 @@ def in_account(accounts: Union[List[PubKey], PubKey]): return AccountAddress.c.account == accounts.public_key.address -def query2(table, select: Select, **constraints) -> Select: +def query2(table, s: Select, **constraints) -> Select: limit = constraints.pop('limit', None) if limit is not None: - select = select.limit(limit) + s = s.limit(limit) offset = constraints.pop('offset', None) if offset is not None: - select = select.offset(offset) + s = s.offset(offset) order_by = constraints.pop('order_by', None) if order_by: if isinstance(order_by, str): - select = select.order_by(text(order_by)) + s = s.order_by(text(order_by)) elif isinstance(order_by, list): - select = select.order_by(text(', '.join(order_by))) + s = s.order_by(text(', '.join(order_by))) else: raise ValueError("order_by must be string or list") group_by = constraints.pop('group_by', None) if group_by is not None: - select = select.group_by(text(group_by)) + s = s.group_by(text(group_by)) accounts = constraints.pop('accounts', []) if accounts: - select.append_whereclause(in_account(accounts)) + s = s.where(in_account(accounts)) if constraints: - select.append_whereclause( + s = s.where( constraints_to_clause2(table, constraints) ) - return select + return s def constraints_to_clause2(tables, constraints): @@ -287,21 +172,19 @@ class Database: self.engine = None self.db: Optional[sqlalchemy.engine.Connection] = None + def sync_execute_fetchall(self, sql, params=None): + if params: + result = self.db.execute(sql, params) + else: + result = self.db.execute(sql) + if result.returns_rows: + return [dict(r._mapping) for r in result.fetchall()] + return [] + async def execute_fetchall(self, sql, params=None) -> List[dict]: - def foo(): - if params: - result = self.db.execute(sql, params) - else: - result = self.db.execute(sql) - if result.returns_rows: - return [dict(r) for r in result.fetchall()] - else: - try: - self.db.commit() - except: - pass - return [] - return await asyncio.get_event_loop().run_in_executor(self.executor, foo) + return await asyncio.get_event_loop().run_in_executor( + self.executor, self.sync_execute_fetchall, sql, params + ) def sync_executemany(self, sql, parameters): self.db.execute(sql, parameters) @@ -316,7 +199,7 @@ class Database: self.engine = sqlalchemy.create_engine(self.url) self.db = self.engine.connect() if self.SCHEMA_VERSION: - if self.engine.has_table('version'): + if inspect(self.engine).has_table('version'): version = self.db.execute(Version.select().limit(1)).fetchone() if version and version.version == self.SCHEMA_VERSION: return @@ -478,15 +361,15 @@ class Database: return True async def select_transactions(self, cols, accounts=None, **constraints): - s: Select = select(cols).select_from(TX) + s: Select = select(*cols).select_from(TX) if not {'tx_hash', 'tx_hash__in'}.intersection(constraints): assert accounts, "'accounts' argument required when no 'tx_hash' constraint is present" where = in_account(accounts) tx_hashes = union( - select([TXO.c.tx_hash], where, TXO.join(AccountAddress, TXO.c.address == AccountAddress.c.address)), - select([TXI.c.tx_hash], where, TXI.join(AccountAddress, TXI.c.address == AccountAddress.c.address)) + select(TXO.c.tx_hash).select_from(TXO.join(AccountAddress)).where(where), + select(TXI.c.tx_hash).select_from(TXI.join(AccountAddress)).where(where) ) - s.append_whereclause(TX.c.tx_hash.in_(tx_hashes)) + s = s.where(TX.c.tx_hash.in_(tx_hashes)) return await self.execute_fetchall(query2([TX], s, **constraints)) TXO_NOT_MINE = Output(None, None, is_my_output=False) @@ -577,83 +460,53 @@ class Database: is_my_input_or_output=None, exclude_internal_transfers=False, include_is_spent=False, include_is_my_input=False, is_spent=None, spent=None, **constraints): - s: Select = select(cols) + s: Select = select(*cols) if accounts: - #account_in_sql, values = constraints_to_sql({ - # '$$account__in': [a.public_key.address for a in accounts] - #}) - my_addresses = select([AccountAddress.c.address]).where(in_account(accounts)) - #f"SELECT address FROM account_address WHERE {account_in_sql}" + my_addresses = select(AccountAddress.c.address).where(in_account(accounts)) if is_my_input_or_output: include_is_my_input = True - s.append_whereclause( + s = s.where( TXO.c.address.in_(my_addresses) | ( (TXI.c.address != None) & (TXI.c.address.in_(my_addresses)) ) ) - #constraints['received_or_sent__or'] = { - # 'txo.address__in': my_addresses, - # 'sent__and': { - # 'txi.address__is_not_null': True, - # 'txi.address__in': my_addresses - # } - #} else: if is_my_output: - s.append_whereclause(TXO.c.address.in_(my_addresses)) - #constraints['txo.address__in'] = my_addresses + s = s.where(TXO.c.address.in_(my_addresses)) elif is_my_output is False: - s.append_whereclause(TXO.c.address.notin_(my_addresses)) - #constraints['txo.address__not_in'] = my_addresses + s = s.where(TXO.c.address.notin_(my_addresses)) if is_my_input: include_is_my_input = True - s.append_whereclause( + s = s.where( (TXI.c.address != None) & (TXI.c.address.in_(my_addresses)) ) - #constraints['txi.address__is_not_null'] = True - #constraints['txi.address__in'] = my_addresses elif is_my_input is False: include_is_my_input = True - s.append_whereclause( + s = s.where( (TXI.c.address == None) | (TXI.c.address.notin_(my_addresses)) ) - #constraints['is_my_input_false__or'] = { - # 'txi.address__is_null': True, - # 'txi.address__not_in': my_addresses - #} if exclude_internal_transfers: include_is_my_input = True - s.append_whereclause( + s = s.where( (TXO.c.txo_type != TXO_TYPES['other']) | (TXI.c.address == None) | (TXI.c.address.notin_(my_addresses)) ) - #constraints['exclude_internal_payments__or'] = { - # 'txo.txo_type__not': TXO_TYPES['other'], - # 'txi.address__is_null': True, - # 'txi.address__not_in': my_addresses - #} joins = TXO.join(TX) if spent is None: spent = TXI.alias('spent') - #sql = [f"SELECT {cols} FROM txo JOIN tx ON (tx.tx_hash=txo.tx_hash)"] if is_spent: - s.append_whereclause(spent.c.txo_hash != None) - #constraints['spent.txo_hash__is_not_null'] = True + s = s.where(spent.c.txo_hash != None) elif is_spent is False: - s.append_whereclause((spent.c.txo_hash == None) & (TXO.c.is_reserved == False)) - #constraints['is_reserved'] = False - #constraints['spent.txo_hash__is_null'] = True + s = s.where((spent.c.txo_hash == None) & (TXO.c.is_reserved == False)) if include_is_spent or is_spent is not None: joins = joins.join(spent, spent.c.txo_hash == TXO.c.txo_hash, isouter=True) - #sql.append("LEFT JOIN txi AS spent ON (spent.txo_hash=txo.txo_hash)") if include_is_my_input: joins = joins.join(TXI, (TXI.c.position == 0) & (TXI.c.tx_hash == TXO.c.tx_hash), isouter=True) - #sql.append("LEFT JOIN txi ON (txi.position=0 AND txi.tx_hash=txo.tx_hash)") - s.append_from(joins) + s = s.select_from(joins) return await self.execute_fetchall(query2([TXO, TX], s, **constraints)) async def get_txos(self, wallet=None, no_tx=False, **constraints): @@ -669,7 +522,7 @@ class Database: my_accounts = None if wallet is not None: - my_accounts = select([AccountAddress.c.address], in_account(wallet.accounts)) + my_accounts = select(AccountAddress.c.address).where(in_account(wallet.accounts)) if include_is_my_output and my_accounts is not None: if constraints.get('is_my_output', None) in (True, False): @@ -692,21 +545,15 @@ class Database: if include_received_tips: support = TXO.alias('support') - select_columns.append(select( - [func.coalesce(func.sum(support.c.amount), 0)], - (support.c.claim_hash == TXO.c.claim_hash) & - (support.c.txo_type == TXO_TYPES['support']) & - (support.c.address.in_(my_accounts)) & - (support.c.txo_hash.notin_(select([TXI.c.txo_hash]))), - support - ).label('received_tips')) - #select_columns.append(f"""( - #SELECT COALESCE(SUM(support.amount), 0) FROM txo AS support WHERE - # support.claim_hash = txo.claim_hash AND - # support.txo_type = {TXO_TYPES['support']} AND - # support.address IN (SELECT address FROM account_address WHERE {my_accounts_sql}) AND - # support.txo_hash NOT IN (SELECT txo_hash FROM txi) - #) AS received_tips""") + select_columns.append( + select(func.coalesce(func.sum(support.c.amount), 0)) + .select_from(support).where( + (support.c.claim_hash == TXO.c.claim_hash) & + (support.c.txo_type == TXO_TYPES['support']) & + (support.c.address.in_(my_accounts)) & + (support.c.txo_hash.notin_(select(TXI.c.txo_hash))) + ).label('received_tips') + ) if 'order_by' not in constraints or constraints['order_by'] == 'height': constraints['order_by'] = [ @@ -836,7 +683,7 @@ class Database: async def select_addresses(self, cols, **constraints): return await self.execute_fetchall(query2( [AccountAddress, PubkeyAddress], - select(cols).select_from(PubkeyAddress.join(AccountAddress)), + select(*cols).select_from(PubkeyAddress.join(AccountAddress)), **constraints )) @@ -905,9 +752,8 @@ class Database: assert accounts, "'accounts' argument required to find purchases" if not {'purchased_claim_hash', 'purchased_claim_hash__in'}.intersection(constraints): constraints['purchased_claim_hash__is_not_null'] = True - constraints['tx_hash__in'] = select( - [TXI.c.tx_hash], in_account(accounts), - TXI.join(AccountAddress, TXI.c.address == AccountAddress.c.address) + constraints['tx_hash__in'] = ( + select(TXI.c.tx_hash).select_from(TXI.join(AccountAddress)).where(in_account(accounts)) ) async def get_purchases(self, **constraints): @@ -988,10 +834,10 @@ class Database: async def release_all_outputs(self, account): await self.execute_fetchall( - "UPDATE txo SET is_reserved = 0 WHERE" - " is_reserved = 1 AND txo.address IN (" - " SELECT address from account_address WHERE account = ?" - " )", (account.public_key.address, ) + TXO.update().values(is_reserved=False).where( + (TXO.c.is_reserved == True) & + (TXO.c.address.in_(select(AccountAddress.c.address).where(in_account(account)))) + ) ) def get_supports_summary(self, **constraints): diff --git a/lbry/db/tables.py b/lbry/db/tables.py index b17a2533f..eca336f52 100644 --- a/lbry/db/tables.py +++ b/lbry/db/tables.py @@ -1,6 +1,8 @@ +# pylint: skip-file + from sqlalchemy import ( MetaData, Table, Column, ForeignKey, - Binary, Text, SmallInteger, Integer, Boolean + BINARY, TEXT, SMALLINT, INTEGER, BOOLEAN ) @@ -9,74 +11,74 @@ metadata = MetaData() Version = Table( 'version', metadata, - Column('version', Text, primary_key=True), + Column('version', TEXT, primary_key=True), ) PubkeyAddress = Table( 'pubkey_address', metadata, - Column('address', Text, primary_key=True), - Column('history', Text, nullable=True), - Column('used_times', Integer, server_default='0'), + Column('address', TEXT, primary_key=True), + Column('history', TEXT, nullable=True), + Column('used_times', INTEGER, server_default='0'), ) AccountAddress = Table( 'account_address', metadata, - Column('account', Text, primary_key=True), - Column('address', Text, ForeignKey(PubkeyAddress.columns.address), primary_key=True), - Column('chain', Integer), - Column('pubkey', Binary), - Column('chain_code', Binary), - Column('n', Integer), - Column('depth', Integer), + Column('account', TEXT, primary_key=True), + Column('address', TEXT, ForeignKey(PubkeyAddress.columns.address), primary_key=True), + Column('chain', INTEGER), + Column('pubkey', BINARY), + Column('chain_code', BINARY), + Column('n', INTEGER), + Column('depth', INTEGER), ) Block = Table( 'block', metadata, - Column('block_hash', Binary, primary_key=True), - Column('previous_hash', Binary), - Column('file_number', SmallInteger), - Column('height', Integer), + Column('block_hash', BINARY, primary_key=True), + Column('previous_hash', BINARY), + Column('file_number', SMALLINT), + Column('height', INTEGER), ) TX = Table( 'tx', metadata, - Column('block_hash', Binary, nullable=True), - Column('tx_hash', Binary, primary_key=True), - Column('raw', Binary), - Column('height', Integer), - Column('position', SmallInteger), - Column('is_verified', Boolean, server_default='FALSE'), - Column('purchased_claim_hash', Binary, nullable=True), - Column('day', Integer, nullable=True), + Column('block_hash', BINARY, nullable=True), + Column('tx_hash', BINARY, primary_key=True), + Column('raw', BINARY), + Column('height', INTEGER), + Column('position', SMALLINT), + Column('is_verified', BOOLEAN, server_default='FALSE'), + Column('purchased_claim_hash', BINARY, nullable=True), + Column('day', INTEGER, nullable=True), ) TXO = Table( 'txo', metadata, - Column('tx_hash', Binary, ForeignKey(TX.columns.tx_hash)), - Column('txo_hash', Binary, primary_key=True), - Column('address', Text), - Column('position', Integer), - Column('amount', Integer), - Column('script', Binary), - Column('is_reserved', Boolean, server_default='0'), - Column('txo_type', Integer, server_default='0'), - Column('claim_id', Text, nullable=True), - Column('claim_hash', Binary, nullable=True), - Column('claim_name', Text, nullable=True), - Column('channel_hash', Binary, nullable=True), - Column('reposted_claim_hash', Binary, nullable=True), + Column('tx_hash', BINARY, ForeignKey(TX.columns.tx_hash)), + Column('txo_hash', BINARY, primary_key=True), + Column('address', TEXT, ForeignKey(AccountAddress.columns.address)), + Column('position', INTEGER), + Column('amount', INTEGER), + Column('script', BINARY), + Column('is_reserved', BOOLEAN, server_default='0'), + Column('txo_type', INTEGER, server_default='0'), + Column('claim_id', TEXT, nullable=True), + Column('claim_hash', BINARY, nullable=True), + Column('claim_name', TEXT, nullable=True), + Column('channel_hash', BINARY, nullable=True), + Column('reposted_claim_hash', BINARY, nullable=True), ) TXI = Table( 'txi', metadata, - Column('tx_hash', Binary, ForeignKey(TX.columns.tx_hash)), - Column('txo_hash', Binary, ForeignKey(TXO.columns.txo_hash), primary_key=True), - Column('address', Text), - Column('position', Integer), + Column('tx_hash', BINARY, ForeignKey(TX.columns.tx_hash)), + Column('txo_hash', BINARY, ForeignKey(TXO.columns.txo_hash), primary_key=True), + Column('address', TEXT, ForeignKey(AccountAddress.columns.address)), + Column('position', INTEGER), ) diff --git a/setup.py b/setup.py index 9d7700e48..9f81b6042 100644 --- a/setup.py +++ b/setup.py @@ -50,7 +50,7 @@ setup( 'attrs==18.2.0', 'pylru==1.1.0', 'pyzmq==18.1.1', - 'sqlalchemy', + 'sqlalchemy @ git+https://github.com/sqlalchemy/sqlalchemy.git', ], classifiers=[ 'Framework :: AsyncIO',