import logging import asyncio import sqlite3 from binascii import hexlify from concurrent.futures.thread import ThreadPoolExecutor from typing import Tuple, List, Union, Any, Iterable, Dict, Optional from datetime import date import sqlalchemy from sqlalchemy import select, text, and_, union, func from sqlalchemy.sql.expression import Select try: from sqlalchemy.dialects.postgresql import insert as pg_insert except ImportError: pg_insert = None from lbry.wallet import PubKey from lbry.wallet.transaction import Transaction, Output, OutputScript, TXRefImmutable from lbry.wallet.constants import TXO_TYPES, CLAIM_TYPES from .tables import metadata, Version, TX, TXI, TXO, PubkeyAddress, AccountAddress log = logging.getLogger(__name__) sqlite3.enable_callback_tracebacks(True) def insert_or_ignore(conn, table): if conn.dialect.name == 'sqlite': return table.insert(prefixes=("OR IGNORE",)) elif conn.dialect.name == 'postgresql': return pg_insert(table).on_conflict_do_nothing() else: raise RuntimeError(f'Unknown database dialect: {conn.dialect.name}.') def insert_or_replace(conn, table, replace): if conn.dialect.name == 'sqlite': return table.insert(prefixes=("OR REPLACE",)) elif conn.dialect.name == 'postgresql': insert = pg_insert(table) return insert.on_conflict_do_update( table.primary_key, set_={col: getattr(insert.excluded, col) for col in replace} ) else: raise RuntimeError(f'Unknown database dialect: {conn.dialect.name}.') def dict_to_clause(t, d): clauses = [] for key, value in d.items(): clauses.append(getattr(t.c, key) == value) return and_(*clauses) def constrain_single_or_list(constraints, column, value, convert=lambda x: x): if value is not None: if isinstance(value, list): value = [convert(v) for v in value] if len(value) == 1: constraints[column] = value[0] elif len(value) > 1: constraints[f"{column}__in"] = value else: constraints[column] = convert(value) return constraints def constraints_to_sql(constraints, joiner=' AND ', prepend_key=''): sql, values = [], {} for key, constraint in constraints.items(): tag = '0' if '#' in key: key, tag = key[:key.index('#')], key[key.index('#')+1:] col, op, key = key, '=', key.replace('.', '_') if not key: sql.append(constraint) continue if key.startswith('$$'): col, key = col[2:], key[1:] elif key.startswith('$'): values[key] = constraint continue if key.endswith('__not'): col, op = col[:-len('__not')], '!=' elif key.endswith('__is_null'): col = col[:-len('__is_null')] sql.append(f'{col} IS NULL') continue if key.endswith('__is_not_null'): col = col[:-len('__is_not_null')] sql.append(f'{col} IS NOT NULL') continue if key.endswith('__lt'): col, op = col[:-len('__lt')], '<' elif key.endswith('__lte'): col, op = col[:-len('__lte')], '<=' elif key.endswith('__gt'): col, op = col[:-len('__gt')], '>' elif key.endswith('__gte'): col, op = col[:-len('__gte')], '>=' elif key.endswith('__like'): col, op = col[:-len('__like')], 'LIKE' elif key.endswith('__not_like'): col, op = col[:-len('__not_like')], 'NOT LIKE' elif key.endswith('__in') or key.endswith('__not_in'): if key.endswith('__in'): col, op, one_val_op = col[:-len('__in')], 'IN', '=' else: col, op, one_val_op = col[:-len('__not_in')], 'NOT IN', '!=' if constraint: if isinstance(constraint, (list, set, tuple)): if len(constraint) == 1: values[f'{key}{tag}'] = next(iter(constraint)) sql.append(f'{col} {one_val_op} :{key}{tag}') else: keys = [] for i, val in enumerate(constraint): keys.append(f':{key}{tag}_{i}') values[f'{key}{tag}_{i}'] = val sql.append(f'{col} {op} ({", ".join(keys)})') elif isinstance(constraint, str): sql.append(f'{col} {op} ({constraint})') else: raise ValueError(f"{col} requires a list, set or string as constraint value.") continue elif key.endswith('__any') or key.endswith('__or'): where, subvalues = constraints_to_sql(constraint, ' OR ', key+tag+'_') sql.append(f'({where})') values.update(subvalues) continue if key.endswith('__and'): where, subvalues = constraints_to_sql(constraint, ' AND ', key+tag+'_') sql.append(f'({where})') values.update(subvalues) continue sql.append(f'{col} {op} :{prepend_key}{key}{tag}') values[prepend_key+key+tag] = constraint return joiner.join(sql) if sql else '', values def query(select, **constraints) -> Tuple[str, Dict[str, Any]]: sql = [select] limit = constraints.pop('limit', None) offset = constraints.pop('offset', None) order_by = constraints.pop('order_by', None) group_by = constraints.pop('group_by', None) accounts = constraints.pop('accounts', []) if accounts: constraints['account__in'] = [a.public_key.address for a in accounts] where, values = constraints_to_sql(constraints) if where: sql.append('WHERE') sql.append(where) if group_by is not None: sql.append(f'GROUP BY {group_by}') if order_by: sql.append('ORDER BY') if isinstance(order_by, str): sql.append(order_by) elif isinstance(order_by, list): sql.append(', '.join(order_by)) else: raise ValueError("order_by must be string or list") if limit is not None: sql.append(f'LIMIT {limit}') if offset is not None: sql.append(f'OFFSET {offset}') return ' '.join(sql), values def in_account(accounts: Union[List[PubKey], PubKey]): if isinstance(accounts, list): if len(accounts) > 1: return AccountAddress.c.account.in_({a.public_key.address for a in accounts}) accounts = accounts[0] return AccountAddress.c.account == accounts.public_key.address def query2(table, select: Select, **constraints) -> Select: limit = constraints.pop('limit', None) if limit is not None: select = select.limit(limit) offset = constraints.pop('offset', None) if offset is not None: select = select.offset(offset) order_by = constraints.pop('order_by', None) if order_by: if isinstance(order_by, str): select = select.order_by(text(order_by)) elif isinstance(order_by, list): select = select.order_by(text(', '.join(order_by))) else: raise ValueError("order_by must be string or list") group_by = constraints.pop('group_by', None) if group_by is not None: select = select.group_by(text(group_by)) accounts = constraints.pop('accounts', []) if accounts: select.append_whereclause(in_account(accounts)) if constraints: select.append_whereclause( constraints_to_clause2(table, constraints) ) return select def constraints_to_clause2(tables, constraints): clause = [] for key, constraint in constraints.items(): if key.endswith('__not'): col, op = key[:-len('__not')], '__ne__' elif key.endswith('__is_null'): col = key[:-len('__is_null')] op = '__eq__' constraint = None elif key.endswith('__is_not_null'): col = key[:-len('__is_not_null')] op = '__ne__' constraint = None elif key.endswith('__lt'): col, op = key[:-len('__lt')], '__lt__' elif key.endswith('__lte'): col, op = key[:-len('__lte')], '__le__' elif key.endswith('__gt'): col, op = key[:-len('__gt')], '__gt__' elif key.endswith('__gte'): col, op = key[:-len('__gte')], '__ge__' elif key.endswith('__like'): col, op = key[:-len('__like')], 'like' elif key.endswith('__not_like'): col, op = key[:-len('__not_like')], 'notlike' elif key.endswith('__in') or key.endswith('__not_in'): if key.endswith('__in'): col, op, one_val_op = key[:-len('__in')], 'in_', '__eq__' else: col, op, one_val_op = key[:-len('__not_in')], 'notin_', '__ne__' if isinstance(constraint, sqlalchemy.sql.expression.Select): pass elif constraint: if isinstance(constraint, (list, set, tuple)): if len(constraint) == 1: op = one_val_op constraint = next(iter(constraint)) elif isinstance(constraint, str): constraint = text(constraint) else: raise ValueError(f"{col} requires a list, set or string as constraint value.") else: continue else: col, op = key, '__eq__' attr = None for table in tables: attr = getattr(table.c, col, None) if attr is not None: clause.append(getattr(attr, op)(constraint)) break if attr is None: raise ValueError(f"Attribute '{col}' not found on tables: {', '.join([t.name for t in tables])}.") return and_(*clause) class Database: SCHEMA_VERSION = "1.3" MAX_QUERY_VARIABLES = 900 def __init__(self, url): self.url = url self.ledger = None self.executor = ThreadPoolExecutor(max_workers=1) self.engine = None self.db: Optional[sqlalchemy.engine.Connection] = None async def execute_fetchall(self, sql, params=None) -> List[dict]: def foo(): if params: result = self.db.execute(sql, params) else: result = self.db.execute(sql) if result.returns_rows: return [dict(r) for r in result.fetchall()] else: try: self.db.commit() except: pass return [] return await asyncio.get_event_loop().run_in_executor(self.executor, foo) def sync_executemany(self, sql, parameters): self.db.execute(sql, parameters) async def executemany(self, sql: str, parameters: Iterable = None): return await asyncio.get_event_loop().run_in_executor( self.executor, self.sync_executemany, sql, parameters ) def sync_open(self): log.info("connecting to database: %s", self.url) self.engine = sqlalchemy.create_engine(self.url) self.db = self.engine.connect() if self.SCHEMA_VERSION: if self.engine.has_table('version'): version = self.db.execute(Version.select().limit(1)).fetchone() if version and version.version == self.SCHEMA_VERSION: return metadata.drop_all(self.engine) metadata.create_all(self.engine) self.db.execute(Version.insert().values(version=self.SCHEMA_VERSION)) else: metadata.create_all(self.engine) return self async def open(self): return await asyncio.get_event_loop().run_in_executor( self.executor, self.sync_open ) def sync_close(self): if self.engine is not None: self.engine.dispose() self.engine = None self.db = None async def close(self): await asyncio.get_event_loop().run_in_executor( self.executor, self.sync_close ) def sync_create(self, name): engine = sqlalchemy.create_engine(self.url) db = engine.connect() db.execute('commit') db.execute(f'create database {name}') async def create(self, name): await asyncio.get_event_loop().run_in_executor( self.executor, self.sync_create, name ) def sync_drop(self, name): engine = sqlalchemy.create_engine(self.url) db = engine.connect() db.execute('commit') db.execute(f'drop database if exists {name}') async def drop(self, name): await asyncio.get_event_loop().run_in_executor( self.executor, self.sync_drop, name ) def txo_to_row(self, tx, txo): row = { 'tx_hash': tx.hash, 'txo_hash': txo.hash, 'address': txo.get_address(self.ledger), 'position': txo.position, 'amount': txo.amount, 'script': txo.script.source } if txo.is_claim: if txo.can_decode_claim: claim = txo.claim row['txo_type'] = TXO_TYPES.get(claim.claim_type, TXO_TYPES['stream']) if claim.is_repost: row['reposted_claim_hash'] = claim.repost.reference.claim_hash if claim.is_signed: row['channel_hash'] = claim.signing_channel_hash else: row['txo_type'] = TXO_TYPES['stream'] elif txo.is_support: row['txo_type'] = TXO_TYPES['support'] elif txo.purchase is not None: row['txo_type'] = TXO_TYPES['purchase'] row['claim_id'] = txo.purchased_claim_id row['claim_hash'] = txo.purchased_claim_hash if txo.script.is_claim_involved: row['claim_id'] = txo.claim_id row['claim_hash'] = txo.claim_hash row['claim_name'] = txo.claim_name return row def tx_to_row(self, tx): row = { 'tx_hash': tx.hash, 'raw': tx.raw, 'height': tx.height, 'position': tx.position, 'is_verified': tx.is_verified, 'day': tx.get_ordinal_day(self.ledger), } txos = tx.outputs if len(txos) >= 2 and txos[1].can_decode_purchase_data: txos[0].purchase = txos[1] row['purchased_claim_hash'] = txos[1].purchase_data.claim_hash return row async def insert_transaction(self, tx): await self.execute_fetchall(TX.insert().values(self.tx_to_row(tx))) def _transaction_io(self, conn: sqlite3.Connection, tx: Transaction, address, txhash): conn.execute( insert_or_replace(conn, TX, ('block_hash', 'height', 'position', 'is_verified', 'day')).values( self.tx_to_row(tx) ) ) is_my_input = False for txi in tx.inputs: if txi.txo_ref.txo is not None: txo = txi.txo_ref.txo if txo.has_address and txo.get_address(self.ledger) == address: is_my_input = True conn.execute( insert_or_ignore(conn, TXI).values({ 'tx_hash': tx.hash, 'txo_hash': txo.hash, 'address': address, 'position': txi.position }) ) for txo in tx.outputs: if txo.script.is_pay_pubkey_hash and (txo.pubkey_hash == txhash or is_my_input): conn.execute(insert_or_ignore(conn, TXO).values(self.txo_to_row(tx, txo))) elif txo.script.is_pay_script_hash: # TODO: implement script hash payments log.warning('Database.save_transaction_io: pay script hash is not implemented!') def save_transaction_io(self, tx: Transaction, address, txhash, history): return self.save_transaction_io_batch([tx], address, txhash, history) def save_transaction_io_batch(self, txs: Iterable[Transaction], address, txhash, history): history_count = history.count(':') // 2 def __many(): for tx in txs: self._transaction_io(self.db, tx, address, txhash) self.db.execute( PubkeyAddress.update() .values(history=history, used_times=history_count) .where(PubkeyAddress.c.address == address) ) return asyncio.get_event_loop().run_in_executor(self.executor, __many) async def reserve_outputs(self, txos, is_reserved=True): txo_hashes = [txo.hash for txo in txos] if txo_hashes: await self.execute_fetchall( TXO.update().values(is_reserved=is_reserved).where(TXO.c.txo_hash.in_(txo_hashes)) ) async def release_outputs(self, txos): await self.reserve_outputs(txos, is_reserved=False) async def rewind_blockchain(self, above_height): # pylint: disable=no-self-use # TODO: # 1. delete transactions above_height # 2. update address histories removing deleted TXs return True async def select_transactions(self, cols, accounts=None, **constraints): s: Select = select(cols).select_from(TX) if not {'tx_hash', 'tx_hash__in'}.intersection(constraints): assert accounts, "'accounts' argument required when no 'tx_hash' constraint is present" where = in_account(accounts) tx_hashes = union( select([TXO.c.tx_hash], where, TXO.join(AccountAddress, TXO.c.address == AccountAddress.c.address)), select([TXI.c.tx_hash], where, TXI.join(AccountAddress, TXI.c.address == AccountAddress.c.address)) ) s.append_whereclause(TX.c.tx_hash.in_(tx_hashes)) return await self.execute_fetchall(query2([TX], s, **constraints)) TXO_NOT_MINE = Output(None, None, is_my_output=False) async def get_transactions(self, wallet=None, **constraints): include_is_spent = constraints.pop('include_is_spent', False) include_is_my_input = constraints.pop('include_is_my_input', False) include_is_my_output = constraints.pop('include_is_my_output', False) tx_rows = await self.select_transactions( [TX.c.tx_hash, TX.c.raw, TX.c.height, TX.c.position, TX.c.is_verified], order_by=constraints.pop('order_by', ["height=0 DESC", "height DESC", "position DESC"]), **constraints ) if not tx_rows: return [] txids, txs, txi_txoids = [], [], [] for row in tx_rows: txids.append(row['tx_hash']) txs.append(Transaction( raw=row['raw'], height=row['height'], position=row['position'], is_verified=bool(row['is_verified']) )) for txi in txs[-1].inputs: txi_txoids.append(txi.txo_ref.hash) step = self.MAX_QUERY_VARIABLES annotated_txos = {} for offset in range(0, len(txids), step): annotated_txos.update({ txo.id: txo for txo in (await self.get_txos( wallet=wallet, tx_hash__in=txids[offset:offset+step], order_by='txo.tx_hash', include_is_spent=include_is_spent, include_is_my_input=include_is_my_input, include_is_my_output=include_is_my_output, )) }) referenced_txos = {} for offset in range(0, len(txi_txoids), step): referenced_txos.update({ txo.id: txo for txo in (await self.get_txos( wallet=wallet, txo_hash__in=txi_txoids[offset:offset+step], order_by='txo.txo_hash', include_is_my_output=include_is_my_output, )) }) for tx in txs: for txi in tx.inputs: txo = referenced_txos.get(txi.txo_ref.id) if txo: txi.txo_ref = txo.ref for txo in tx.outputs: _txo = annotated_txos.get(txo.id) if _txo: txo.update_annotations(_txo) else: txo.update_annotations(self.TXO_NOT_MINE) for tx in txs: txos = tx.outputs if len(txos) >= 2 and txos[1].can_decode_purchase_data: txos[0].purchase = txos[1] return txs async def get_transaction_count(self, **constraints): constraints.pop('wallet', None) constraints.pop('offset', None) constraints.pop('limit', None) constraints.pop('order_by', None) count = await self.select_transactions([func.count().label('total')], **constraints) return count[0]['total'] or 0 async def get_transaction(self, **constraints): txs = await self.get_transactions(limit=1, **constraints) if txs: return txs[0] async def select_txos( self, cols, accounts=None, is_my_input=None, is_my_output=True, is_my_input_or_output=None, exclude_internal_transfers=False, include_is_spent=False, include_is_my_input=False, is_spent=None, spent=None, **constraints): s: Select = select(cols) if accounts: #account_in_sql, values = constraints_to_sql({ # '$$account__in': [a.public_key.address for a in accounts] #}) my_addresses = select([AccountAddress.c.address]).where(in_account(accounts)) #f"SELECT address FROM account_address WHERE {account_in_sql}" if is_my_input_or_output: include_is_my_input = True s.append_whereclause( TXO.c.address.in_(my_addresses) | ( (TXI.c.address != None) & (TXI.c.address.in_(my_addresses)) ) ) #constraints['received_or_sent__or'] = { # 'txo.address__in': my_addresses, # 'sent__and': { # 'txi.address__is_not_null': True, # 'txi.address__in': my_addresses # } #} else: if is_my_output: s.append_whereclause(TXO.c.address.in_(my_addresses)) #constraints['txo.address__in'] = my_addresses elif is_my_output is False: s.append_whereclause(TXO.c.address.notin_(my_addresses)) #constraints['txo.address__not_in'] = my_addresses if is_my_input: include_is_my_input = True s.append_whereclause( (TXI.c.address != None) & (TXI.c.address.in_(my_addresses)) ) #constraints['txi.address__is_not_null'] = True #constraints['txi.address__in'] = my_addresses elif is_my_input is False: include_is_my_input = True s.append_whereclause( (TXI.c.address == None) | (TXI.c.address.notin_(my_addresses)) ) #constraints['is_my_input_false__or'] = { # 'txi.address__is_null': True, # 'txi.address__not_in': my_addresses #} if exclude_internal_transfers: include_is_my_input = True s.append_whereclause( (TXO.c.txo_type != TXO_TYPES['other']) | (TXI.c.address == None) | (TXI.c.address.notin_(my_addresses)) ) #constraints['exclude_internal_payments__or'] = { # 'txo.txo_type__not': TXO_TYPES['other'], # 'txi.address__is_null': True, # 'txi.address__not_in': my_addresses #} joins = TXO.join(TX) if spent is None: spent = TXI.alias('spent') #sql = [f"SELECT {cols} FROM txo JOIN tx ON (tx.tx_hash=txo.tx_hash)"] if is_spent: s.append_whereclause(spent.c.txo_hash != None) #constraints['spent.txo_hash__is_not_null'] = True elif is_spent is False: s.append_whereclause((spent.c.txo_hash == None) & (TXO.c.is_reserved == False)) #constraints['is_reserved'] = False #constraints['spent.txo_hash__is_null'] = True if include_is_spent or is_spent is not None: joins = joins.join(spent, spent.c.txo_hash == TXO.c.txo_hash, isouter=True) #sql.append("LEFT JOIN txi AS spent ON (spent.txo_hash=txo.txo_hash)") if include_is_my_input: joins = joins.join(TXI, (TXI.c.position == 0) & (TXI.c.tx_hash == TXO.c.tx_hash), isouter=True) #sql.append("LEFT JOIN txi ON (txi.position=0 AND txi.tx_hash=txo.tx_hash)") s.append_from(joins) return await self.execute_fetchall(query2([TXO, TX], s, **constraints)) async def get_txos(self, wallet=None, no_tx=False, **constraints): include_is_spent = constraints.get('include_is_spent', False) include_is_my_input = constraints.get('include_is_my_input', False) include_is_my_output = constraints.pop('include_is_my_output', False) include_received_tips = constraints.pop('include_received_tips', False) select_columns = [ TX.c.tx_hash, TX.c.raw, TX.c.height, TX.c.position.label('tx_position'), TX.c.is_verified, TXO.c.txo_type, TXO.c.position.label('txo_position'), TXO.c.amount, TXO.c.script ] my_accounts = None if wallet is not None: my_accounts = select([AccountAddress.c.address], in_account(wallet.accounts)) if include_is_my_output and my_accounts is not None: if constraints.get('is_my_output', None) in (True, False): select_columns.append(text(f"{1 if constraints['is_my_output'] else 0} AS is_my_output")) else: select_columns.append(TXO.c.address.in_(my_accounts).label('is_my_output')) if include_is_my_input and my_accounts is not None: if constraints.get('is_my_input', None) in (True, False): select_columns.append(text(f"{1 if constraints['is_my_input'] else 0} AS is_my_input")) else: select_columns.append(( (TXI.c.address != None) & (TXI.c.address.in_(my_accounts)) ).label('is_my_input')) spent = TXI.alias('spent') if include_is_spent: select_columns.append((spent.c.txo_hash != None).label('is_spent')) if include_received_tips: support = TXO.alias('support') select_columns.append(select( [func.coalesce(func.sum(support.c.amount), 0)], (support.c.claim_hash == TXO.c.claim_hash) & (support.c.txo_type == TXO_TYPES['support']) & (support.c.address.in_(my_accounts)) & (support.c.txo_hash.notin_(select([TXI.c.txo_hash]))), support ).label('received_tips')) #select_columns.append(f"""( #SELECT COALESCE(SUM(support.amount), 0) FROM txo AS support WHERE # support.claim_hash = txo.claim_hash AND # support.txo_type = {TXO_TYPES['support']} AND # support.address IN (SELECT address FROM account_address WHERE {my_accounts_sql}) AND # support.txo_hash NOT IN (SELECT txo_hash FROM txi) #) AS received_tips""") if 'order_by' not in constraints or constraints['order_by'] == 'height': constraints['order_by'] = [ "tx.height=0 DESC", "tx.height DESC", "tx.position DESC", "txo.position" ] elif constraints.get('order_by', None) == 'none': del constraints['order_by'] rows = await self.select_txos(select_columns, spent=spent, **constraints) txos = [] txs = {} for row in rows: if no_tx: txo = Output( amount=row['amount'], script=OutputScript(row['script']), tx_ref=TXRefImmutable.from_hash(row['tx_hash'], row['height']), position=row['txo_position'] ) else: if row['tx_hash'] not in txs: txs[row['tx_hash']] = Transaction( row['raw'], height=row['height'], position=row['tx_position'], is_verified=bool(row['is_verified']) ) txo = txs[row['tx_hash']].outputs[row['txo_position']] if include_is_spent: txo.is_spent = bool(row['is_spent']) if include_is_my_input: txo.is_my_input = bool(row['is_my_input']) if include_is_my_output: txo.is_my_output = bool(row['is_my_output']) if include_is_my_input and include_is_my_output: if txo.is_my_input and txo.is_my_output and row['txo_type'] == TXO_TYPES['other']: txo.is_internal_transfer = True else: txo.is_internal_transfer = False if include_received_tips: txo.received_tips = row['received_tips'] txos.append(txo) channel_hashes = set() for txo in txos: if txo.is_claim and txo.can_decode_claim: if txo.claim.is_signed: channel_hashes.add(txo.claim.signing_channel_hash) if txo.claim.is_channel and wallet: for account in wallet.accounts: private_key = account.get_channel_private_key( txo.claim.channel.public_key_bytes ) if private_key: txo.private_key = private_key break if channel_hashes: channels = { txo.claim_hash: txo for txo in (await self.get_channels( wallet=wallet, claim_hash__in=channel_hashes, )) } for txo in txos: if txo.is_claim and txo.can_decode_claim: txo.channel = channels.get(txo.claim.signing_channel_hash, None) return txos @staticmethod def _clean_txo_constraints_for_aggregation(constraints): constraints.pop('include_is_spent', None) constraints.pop('include_is_my_input', None) constraints.pop('include_is_my_output', None) constraints.pop('include_received_tips', None) constraints.pop('wallet', None) constraints.pop('resolve', None) constraints.pop('offset', None) constraints.pop('limit', None) constraints.pop('order_by', None) async def get_txo_count(self, **constraints): self._clean_txo_constraints_for_aggregation(constraints) count = await self.select_txos([func.count().label('total')], **constraints) return count[0]['total'] or 0 async def get_txo_sum(self, **constraints): self._clean_txo_constraints_for_aggregation(constraints) result = await self.select_txos([func.sum(TXO.c.amount).label('total')], **constraints) return result[0]['total'] or 0 async def get_txo_plot(self, start_day=None, days_back=0, end_day=None, days_after=None, **constraints): self._clean_txo_constraints_for_aggregation(constraints) if start_day is None: current_ordinal = self.ledger.headers.estimated_date(self.ledger.headers.height).toordinal() constraints['day__gte'] = current_ordinal - days_back else: constraints['day__gte'] = date.fromisoformat(start_day).toordinal() if end_day is not None: constraints['day__lte'] = date.fromisoformat(end_day).toordinal() elif days_after is not None: constraints['day__lte'] = constraints['day__gte'] + days_after plot = await self.select_txos( [TX.c.day, func.sum(TXO.c.amount).label('total')], group_by='day', order_by='day', **constraints ) for row in plot: row['day'] = date.fromordinal(row['day']) return plot def get_utxos(self, **constraints): return self.get_txos(is_spent=False, **constraints) def get_utxo_count(self, **constraints): return self.get_txo_count(is_spent=False, **constraints) async def get_balance(self, wallet=None, accounts=None, **constraints): assert wallet or accounts, \ "'wallet' or 'accounts' constraints required to calculate balance" constraints['accounts'] = accounts or wallet.accounts balance = await self.select_txos( [func.sum(TXO.c.amount).label('total')], is_spent=False, **constraints ) return balance[0]['total'] or 0 async def select_addresses(self, cols, **constraints): return await self.execute_fetchall(query2( [AccountAddress, PubkeyAddress], select(cols).select_from(PubkeyAddress.join(AccountAddress)), **constraints )) async def get_addresses(self, cols=None, **constraints): if cols is None: cols = ( PubkeyAddress.c.address, PubkeyAddress.c.history, PubkeyAddress.c.used_times, AccountAddress.c.account, AccountAddress.c.chain, AccountAddress.c.pubkey, AccountAddress.c.chain_code, AccountAddress.c.n, AccountAddress.c.depth ) addresses = await self.select_addresses(cols, **constraints) if AccountAddress.c.pubkey in cols: for address in addresses: address['pubkey'] = PubKey( self.ledger, bytes(address.pop('pubkey')), bytes(address.pop('chain_code')), address.pop('n'), address.pop('depth') ) return addresses async def get_address_count(self, cols=None, **constraints): count = await self.select_addresses([func.count().label('total')], **constraints) return count[0]['total'] or 0 async def get_address(self, **constraints): addresses = await self.get_addresses(limit=1, **constraints) if addresses: return addresses[0] async def add_keys(self, account, chain, pubkeys): await self.execute_fetchall( insert_or_ignore(self.db, PubkeyAddress).values([{ 'address': k.address } for k in pubkeys]) ) await self.execute_fetchall( insert_or_ignore(self.db, AccountAddress).values([{ 'account': account.id, 'address': k.address, 'chain': chain, 'pubkey': k.pubkey_bytes, 'chain_code': k.chain_code, 'n': k.n, 'depth': k.depth } for k in pubkeys]) ) async def _set_address_history(self, address, history): await self.execute_fetchall( PubkeyAddress.update() .values(history=history, used_times=history.count(':')//2) .where(PubkeyAddress.c.address == address) ) async def set_address_history(self, address, history): await self._set_address_history(address, history) @staticmethod def constrain_purchases(constraints): accounts = constraints.pop('accounts', None) assert accounts, "'accounts' argument required to find purchases" if not {'purchased_claim_hash', 'purchased_claim_hash__in'}.intersection(constraints): constraints['purchased_claim_hash__is_not_null'] = True constraints['tx_hash__in'] = select( [TXI.c.tx_hash], in_account(accounts), TXI.join(AccountAddress, TXI.c.address == AccountAddress.c.address) ) async def get_purchases(self, **constraints): self.constrain_purchases(constraints) return [tx.outputs[0] for tx in await self.get_transactions(**constraints)] def get_purchase_count(self, **constraints): self.constrain_purchases(constraints) return self.get_transaction_count(**constraints) @staticmethod def constrain_claims(constraints): if {'txo_type', 'txo_type__in'}.intersection(constraints): return claim_types = constraints.pop('claim_type', None) if claim_types: constrain_single_or_list( constraints, 'txo_type', claim_types, lambda x: TXO_TYPES[x] ) else: constraints['txo_type__in'] = CLAIM_TYPES async def get_claims(self, **constraints) -> List[Output]: self.constrain_claims(constraints) return await self.get_utxos(**constraints) def get_claim_count(self, **constraints): self.constrain_claims(constraints) return self.get_utxo_count(**constraints) @staticmethod def constrain_streams(constraints): constraints['txo_type'] = TXO_TYPES['stream'] def get_streams(self, **constraints): self.constrain_streams(constraints) return self.get_claims(**constraints) def get_stream_count(self, **constraints): self.constrain_streams(constraints) return self.get_claim_count(**constraints) @staticmethod def constrain_channels(constraints): constraints['txo_type'] = TXO_TYPES['channel'] def get_channels(self, **constraints): self.constrain_channels(constraints) return self.get_claims(**constraints) def get_channel_count(self, **constraints): self.constrain_channels(constraints) return self.get_claim_count(**constraints) @staticmethod def constrain_supports(constraints): constraints['txo_type'] = TXO_TYPES['support'] def get_supports(self, **constraints): self.constrain_supports(constraints) return self.get_utxos(**constraints) def get_support_count(self, **constraints): self.constrain_supports(constraints) return self.get_utxo_count(**constraints) @staticmethod def constrain_collections(constraints): constraints['txo_type'] = TXO_TYPES['collection'] def get_collections(self, **constraints): self.constrain_collections(constraints) return self.get_utxos(**constraints) def get_collection_count(self, **constraints): self.constrain_collections(constraints) return self.get_utxo_count(**constraints) async def release_all_outputs(self, account): await self.execute_fetchall( "UPDATE txo SET is_reserved = 0 WHERE" " is_reserved = 1 AND txo.address IN (" " SELECT address from account_address WHERE account = ?" " )", (account.public_key.address, ) ) def get_supports_summary(self, **constraints): return self.get_txos( txo_type=TXO_TYPES['support'], is_spent=False, is_my_output=True, include_is_my_input=True, no_tx=True, **constraints )