# lbry-sdk/lbry/db/database.py
# 2020-04-11 17:27:41 -04:00
#
# 1004 lines
# 39 KiB
# Python

import logging
import asyncio
import sqlite3
from binascii import hexlify
from concurrent.futures.thread import ThreadPoolExecutor
from typing import Tuple, List, Union, Any, Iterable, Dict, Optional
from datetime import date
import sqlalchemy
from sqlalchemy import select, text, and_, union, func
from sqlalchemy.sql.expression import Select
try:
from sqlalchemy.dialects.postgresql import insert as pg_insert
except ImportError:
pg_insert = None
from lbry.wallet import PubKey
from lbry.wallet.transaction import Transaction, Output, OutputScript, TXRefImmutable
from lbry.wallet.constants import TXO_TYPES, CLAIM_TYPES
from .tables import metadata, Version, TX, TXI, TXO, PubkeyAddress, AccountAddress
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# Have the sqlite3 module print tracebacks for exceptions raised inside
# user-defined callbacks instead of dropping them silently.
sqlite3.enable_callback_tracebacks(True)
def insert_or_ignore(conn, table):
    """Build an INSERT on ``table`` that skips rows which would conflict.

    The statement is chosen from ``conn.dialect.name``: ``INSERT OR IGNORE``
    for sqlite, ``ON CONFLICT DO NOTHING`` for postgresql.

    Raises:
        RuntimeError: for any other dialect name.
    """
    dialect = conn.dialect.name
    if dialect == 'sqlite':
        return table.insert(prefixes=("OR IGNORE",))
    if dialect == 'postgresql':
        return pg_insert(table).on_conflict_do_nothing()
    raise RuntimeError(f'Unknown database dialect: {dialect}.')
def insert_or_replace(conn, table, replace):
    """Build an INSERT on ``table`` that overwrites a conflicting row.

    For sqlite this is ``INSERT OR REPLACE`` and ``replace`` is not consulted.
    For postgresql it is an ``ON CONFLICT ... DO UPDATE`` on the table's
    primary key that sets each column named in ``replace`` to the value from
    the row being inserted.

    Raises:
        RuntimeError: for any other dialect name.
    """
    dialect = conn.dialect.name
    if dialect == 'sqlite':
        return table.insert(prefixes=("OR REPLACE",))
    if dialect == 'postgresql':
        statement = pg_insert(table)
        overwritten = {
            column: getattr(statement.excluded, column) for column in replace
        }
        return statement.on_conflict_do_update(table.primary_key, set_=overwritten)
    raise RuntimeError(f'Unknown database dialect: {dialect}.')
def dict_to_clause(t, d):
    """AND together one ``column == value`` test per item of ``d``.

    Each key of ``d`` is looked up as an attribute of ``t.c``.
    """
    return and_(*(getattr(t.c, name) == expected for name, expected in d.items()))
def constrain_single_or_list(constraints, column, value, convert=lambda x: x):
    """Add a constraint on ``column`` for a scalar or a list of values.

    * ``None`` leaves ``constraints`` untouched.
    * A non-list value is stored under ``column`` after ``convert``.
    * A one-element list is stored under ``column`` as that single converted
      element; a longer list is stored under ``f"{column}__in"``; an empty
      list adds nothing.

    Returns the same ``constraints`` dict, which is mutated in place.
    """
    if value is None:
        return constraints
    if not isinstance(value, list):
        constraints[column] = convert(value)
        return constraints
    converted = [convert(item) for item in value]
    if len(converted) == 1:
        constraints[column] = converted[0]
    elif len(converted) > 1:
        constraints[f"{column}__in"] = converted
    return constraints
def constraints_to_sql(constraints, joiner=' AND ', prepend_key=''):
    """Translate a constraints dict into a SQL condition and its bind values.

    Key syntax, as handled below:

    * ``name#tag`` - text after the first ``#`` becomes the placeholder tag
      (default ``'0'``), so the same column can be constrained more than once.
    * empty name - the value is appended verbatim as a SQL fragment.
    * ``$name`` - the value is only added to the bind values.
    * ``$$name`` - the first two characters are dropped from the column name
      and one ``$`` is kept in the placeholder name.
    * suffixes ``__not``, ``__lt``, ``__lte``, ``__gt``, ``__gte``, ``__like``,
      ``__not_like`` select the comparison operator.
    * ``__is_null`` / ``__is_not_null`` emit ``IS [NOT] NULL`` with no value.
    * ``__in`` / ``__not_in`` accept a list/set/tuple (one element collapses to
      ``=`` / ``!=``) or a string that is embedded as a sub-query; a falsy
      value adds nothing.
    * ``__any`` / ``__or`` / ``__and`` recurse into a nested constraints dict
      joined with ``OR`` / ``OR`` / ``AND``.

    Dots in a key are kept in the column name and replaced with underscores in
    the placeholder name.

    Returns:
        ``(sql, values)`` where ``sql`` is the fragments joined by ``joiner``
        (``''`` when there are none).

    Raises:
        ValueError: when an ``__in`` / ``__not_in`` value is truthy but is not
            a list, set, tuple or string.
    """
    # Suffixes whose only effect is to swap the comparison operator.
    comparison_suffixes = (
        ('__not', '!='),
        ('__lt', '<'),
        ('__lte', '<='),
        ('__gt', '>'),
        ('__gte', '>='),
        ('__like', 'LIKE'),
        ('__not_like', 'NOT LIKE'),
    )
    fragments, params = [], {}
    for raw_key, constraint in constraints.items():
        tag = '0'
        if '#' in raw_key:
            raw_key, _, tag = raw_key.partition('#')
        col, op, key = raw_key, '=', raw_key.replace('.', '_')
        if not key:
            fragments.append(constraint)
            continue
        if key.startswith('$$'):
            col, key = col[2:], key[1:]
        elif key.startswith('$'):
            params[key] = constraint
            continue

        if key.endswith('__is_null'):
            col = col[:-len('__is_null')]
            fragments.append(f'{col} IS NULL')
            continue
        if key.endswith('__is_not_null'):
            col = col[:-len('__is_not_null')]
            fragments.append(f'{col} IS NOT NULL')
            continue

        if key.endswith(('__in', '__not_in')):
            if key.endswith('__in'):
                col, op, one_val_op = col[:-len('__in')], 'IN', '='
            else:
                col, op, one_val_op = col[:-len('__not_in')], 'NOT IN', '!='
            if constraint:
                if isinstance(constraint, (list, set, tuple)):
                    if len(constraint) == 1:
                        params[f'{key}{tag}'] = next(iter(constraint))
                        fragments.append(f'{col} {one_val_op} :{key}{tag}')
                    else:
                        placeholders = []
                        for i, val in enumerate(constraint):
                            placeholders.append(f':{key}{tag}_{i}')
                            params[f'{key}{tag}_{i}'] = val
                        fragments.append(f'{col} {op} ({", ".join(placeholders)})')
                elif isinstance(constraint, str):
                    fragments.append(f'{col} {op} ({constraint})')
                else:
                    raise ValueError(f"{col} requires a list, set or string as constraint value.")
            continue

        if key.endswith(('__any', '__or', '__and')):
            nested_joiner = ' AND ' if key.endswith('__and') else ' OR '
            where, subvalues = constraints_to_sql(constraint, nested_joiner, key+tag+'_')
            fragments.append(f'({where})')
            params.update(subvalues)
            continue

        for suffix, operator in comparison_suffixes:
            if key.endswith(suffix):
                col, op = col[:-len(suffix)], operator
                break
        fragments.append(f'{col} {op} :{prepend_key}{key}{tag}')
        params[prepend_key+key+tag] = constraint
    return joiner.join(fragments) if fragments else '', params
def query(select, **constraints) -> Tuple[str, Dict[str, Any]]:
    """Assemble a textual SQL statement from ``select`` plus constraints.

    The keys ``limit``, ``offset``, ``order_by``, ``group_by`` and ``accounts``
    are consumed here; every remaining key is passed to ``constraints_to_sql``
    to form the WHERE clause. A non-empty ``accounts`` becomes an
    ``account__in`` constraint on each account's public key address.

    Returns:
        ``(sql, values)``: the statement text and its bind values.

    Raises:
        ValueError: when ``order_by`` is truthy but neither a string nor a list.
    """
    limit = constraints.pop('limit', None)
    offset = constraints.pop('offset', None)
    order_by = constraints.pop('order_by', None)
    group_by = constraints.pop('group_by', None)
    accounts = constraints.pop('accounts', [])
    if accounts:
        constraints['account__in'] = [a.public_key.address for a in accounts]

    where, values = constraints_to_sql(constraints)

    parts = [select]
    if where:
        parts.extend(('WHERE', where))
    if group_by is not None:
        parts.append(f'GROUP BY {group_by}')
    if order_by:
        if isinstance(order_by, str):
            ordering = order_by
        elif isinstance(order_by, list):
            ordering = ', '.join(order_by)
        else:
            raise ValueError("order_by must be string or list")
        parts.extend(('ORDER BY', ordering))
    if limit is not None:
        parts.append(f'LIMIT {limit}')
    if offset is not None:
        parts.append(f'OFFSET {offset}')
    return ' '.join(parts), values
def in_account(accounts: Union[List[PubKey], PubKey]):
    """Build a condition restricting ``AccountAddress.account`` to ``accounts``.

    A list with more than one entry yields an ``IN`` test over the entries'
    ``public_key.address`` values; a one-element list or a bare account yields
    an equality test. An empty list is not handled and raises ``IndexError``.
    """
    if isinstance(accounts, list) and len(accounts) > 1:
        return AccountAddress.c.account.in_({a.public_key.address for a in accounts})
    account = accounts[0] if isinstance(accounts, list) else accounts
    return AccountAddress.c.account == account.public_key.address
def query2(table, select: Select, **constraints) -> Select:
    """Apply paging, ordering, grouping and WHERE constraints to ``select``.

    ``limit``, ``offset``, ``order_by``, ``group_by`` and ``accounts`` are
    consumed here. Whatever remains is turned into a WHERE clause by
    ``constraints_to_clause2`` using ``table``, which despite its singular
    name is iterated there as a collection of tables.

    Raises:
        ValueError: when ``order_by`` is truthy but neither a string nor a list.
    """
    limit = constraints.pop('limit', None)
    offset = constraints.pop('offset', None)
    order_by = constraints.pop('order_by', None)
    group_by = constraints.pop('group_by', None)
    accounts = constraints.pop('accounts', [])

    if limit is not None:
        select = select.limit(limit)
    if offset is not None:
        select = select.offset(offset)
    if order_by:
        if isinstance(order_by, str):
            ordering = order_by
        elif isinstance(order_by, list):
            ordering = ', '.join(order_by)
        else:
            raise ValueError("order_by must be string or list")
        select = select.order_by(text(ordering))
    if group_by is not None:
        select = select.group_by(text(group_by))

    # append_whereclause() is called for its effect on ``select``; its return
    # value is not used.
    if accounts:
        select.append_whereclause(in_account(accounts))
    if constraints:
        select.append_whereclause(constraints_to_clause2(table, constraints))
    return select
def constraints_to_clause2(tables, constraints):
    """Translate a constraints dict into one AND-ed SQLAlchemy clause.

    The suffix of each key picks the column method to call (``__not``,
    ``__lt``, ``__lte``, ``__gt``, ``__gte``, ``__like``, ``__not_like``,
    ``__in``, ``__not_in``, ``__is_null``, ``__is_not_null``); a key without a
    recognised suffix is an equality test. The column is taken from the first
    table in ``tables`` whose ``.c`` has that attribute.

    ``__in`` / ``__not_in`` accept a ``Select``, a list/set/tuple (a single
    element collapses to ``==`` / ``!=``) or a string wrapped in ``text()``;
    any other falsy value skips the constraint.

    Raises:
        ValueError: for an unsupported ``__in`` value, or when no table has
            the requested column.
    """
    # Suffixes that map straight onto a column method taking the value.
    method_by_suffix = (
        ('__not', '__ne__'),
        ('__lt', '__lt__'),
        ('__lte', '__le__'),
        ('__gt', '__gt__'),
        ('__gte', '__ge__'),
        ('__like', 'like'),
        ('__not_like', 'notlike'),
    )
    clauses = []
    for key, constraint in constraints.items():
        if key.endswith('__is_null'):
            # Comparing with None renders as IS NULL.
            col, op, constraint = key[:-len('__is_null')], '__eq__', None
        elif key.endswith('__is_not_null'):
            col, op, constraint = key[:-len('__is_not_null')], '__ne__', None
        elif key.endswith(('__in', '__not_in')):
            if key.endswith('__in'):
                col, op, one_val_op = key[:-len('__in')], 'in_', '__eq__'
            else:
                col, op, one_val_op = key[:-len('__not_in')], 'notin_', '__ne__'
            if isinstance(constraint, sqlalchemy.sql.expression.Select):
                pass
            elif not constraint:
                continue
            elif isinstance(constraint, (list, set, tuple)):
                if len(constraint) == 1:
                    op = one_val_op
                    constraint = next(iter(constraint))
            elif isinstance(constraint, str):
                constraint = text(constraint)
            else:
                raise ValueError(f"{col} requires a list, set or string as constraint value.")
        else:
            col, op = key, '__eq__'
            for suffix, method in method_by_suffix:
                if key.endswith(suffix):
                    col, op = key[:-len(suffix)], method
                    break

        column = None
        for table in tables:
            column = getattr(table.c, col, None)
            if column is not None:
                clauses.append(getattr(column, op)(constraint))
                break
        if column is None:
            raise ValueError(f"Attribute '{col}' not found on tables: {', '.join([t.name for t in tables])}.")
    return and_(*clauses)
class Database:
    """Asynchronous wallet database facade over one SQLAlchemy connection.

    Blocking database calls are submitted to a single-worker thread pool, so
    they run one at a time on one worker thread while the public coroutine
    methods await the result.
    """

    # Schema version stored in the ``version`` table; on mismatch sync_open()
    # drops and recreates every table.
    SCHEMA_VERSION = "1.3"
    # Chunk size used when splitting long ``__in`` lists across several queries.
    MAX_QUERY_VARIABLES = 900

    def __init__(self, url):
        """Remember the database ``url``; nothing is connected until open()."""
        self.url = url
        self.ledger = None
        self.executor = ThreadPoolExecutor(max_workers=1)
        self.engine = None
        self.db: Optional[sqlalchemy.engine.Connection] = None

    async def execute_fetchall(self, sql, params=None) -> List[dict]:
        """Run ``sql`` on the worker thread and return its rows as dicts.

        Statements that return no rows yield ``[]``; for those a commit is
        attempted on a best-effort basis.
        """
        def _execute():
            if params:
                result = self.db.execute(sql, params)
            else:
                result = self.db.execute(sql)
            if result.returns_rows:
                return [dict(r) for r in result.fetchall()]
            try:
                self.db.commit()
            except Exception as error:  # pylint: disable=broad-except
                # Best effort only: the connection may not offer commit().
                # Unlike a bare ``except``, this lets KeyboardInterrupt and
                # SystemExit propagate.
                log.debug("explicit commit skipped: %s", error)
            return []
        return await asyncio.get_event_loop().run_in_executor(self.executor, _execute)

    def sync_executemany(self, sql, parameters):
        """Execute ``sql`` with ``parameters`` on the calling thread."""
        self.db.execute(sql, parameters)

    async def executemany(self, sql: str, parameters: Optional[Iterable] = None):
        """Run sync_executemany() on the worker thread."""
        return await asyncio.get_event_loop().run_in_executor(
            self.executor, self.sync_executemany, sql, parameters
        )

    def sync_open(self):
        """Connect to ``self.url`` and make sure the schema is current.

        With a truthy SCHEMA_VERSION, the tables are dropped and recreated
        unless the ``version`` table already holds that version. Otherwise
        the tables are only created.

        Returns:
            ``self``, on every path.
        """
        log.info("connecting to database: %s", self.url)
        self.engine = sqlalchemy.create_engine(self.url)
        self.db = self.engine.connect()
        if self.SCHEMA_VERSION:
            if self.engine.has_table('version'):
                version = self.db.execute(Version.select().limit(1)).fetchone()
                if version and version.version == self.SCHEMA_VERSION:
                    # Schema is current; keep the existing data.
                    return self
            metadata.drop_all(self.engine)
            metadata.create_all(self.engine)
            self.db.execute(Version.insert().values(version=self.SCHEMA_VERSION))
        else:
            metadata.create_all(self.engine)
        return self

    async def open(self):
        """Run sync_open() on the worker thread."""
        return await asyncio.get_event_loop().run_in_executor(
            self.executor, self.sync_open
        )

    def sync_close(self):
        """Close the connection, dispose of the engine and forget both.

        The connection is closed explicitly before the engine is disposed so
        it does not stay open after this object lets go of it.
        """
        try:
            if self.db is not None:
                self.db.close()
        finally:
            self.db = None
            if self.engine is not None:
                self.engine.dispose()
                self.engine = None

    async def close(self):
        """Run sync_close() on the worker thread."""
        await asyncio.get_event_loop().run_in_executor(
            self.executor, self.sync_close
        )

    def _sync_execute_on_fresh_connection(self, statement):
        """Run ``statement`` on a throwaway engine/connection and clean up."""
        engine = sqlalchemy.create_engine(self.url)
        try:
            db = engine.connect()
            try:
                # NOTE(review): presumably ends the implicit transaction so the
                # database-level statement runs outside of one - confirm.
                db.execute('commit')
                db.execute(statement)
            finally:
                db.close()
        finally:
            engine.dispose()

    def sync_create(self, name):
        """Issue ``create database <name>`` against ``self.url``.

        ``name`` is interpolated into the statement text as-is, so it must
        come from a trusted source.
        """
        self._sync_execute_on_fresh_connection(f'create database {name}')

    async def create(self, name):
        """Run sync_create() on the worker thread."""
        await asyncio.get_event_loop().run_in_executor(
            self.executor, self.sync_create, name
        )

    def sync_drop(self, name):
        """Issue ``drop database if exists <name>`` against ``self.url``.

        ``name`` is interpolated into the statement text as-is, so it must
        come from a trusted source.
        """
        self._sync_execute_on_fresh_connection(f'drop database if exists {name}')

    async def drop(self, name):
        """Run sync_drop() on the worker thread."""
        await asyncio.get_event_loop().run_in_executor(
            self.executor, self.sync_drop, name
        )

    def txo_to_row(self, tx, txo):
        """Build the ``txo`` table row dict for output ``txo`` of ``tx``."""
        row = {
            'tx_hash': tx.hash,
            'txo_hash': txo.hash,
            'address': txo.get_address(self.ledger),
            'position': txo.position,
            'amount': txo.amount,
            'script': txo.script.source
        }
        if txo.is_claim:
            if txo.can_decode_claim:
                claim = txo.claim
                # Unknown claim types are stored as streams.
                row['txo_type'] = TXO_TYPES.get(claim.claim_type, TXO_TYPES['stream'])
                if claim.is_repost:
                    row['reposted_claim_hash'] = claim.repost.reference.claim_hash
                if claim.is_signed:
                    row['channel_hash'] = claim.signing_channel_hash
            else:
                row['txo_type'] = TXO_TYPES['stream']
        elif txo.is_support:
            row['txo_type'] = TXO_TYPES['support']
        elif txo.purchase is not None:
            row['txo_type'] = TXO_TYPES['purchase']
            row['claim_id'] = txo.purchased_claim_id
            row['claim_hash'] = txo.purchased_claim_hash
        if txo.script.is_claim_involved:
            row['claim_id'] = txo.claim_id
            row['claim_hash'] = txo.claim_hash
            row['claim_name'] = txo.claim_name
        return row

    def tx_to_row(self, tx):
        """Build the ``tx`` table row dict for ``tx``.

        Side effect: when the second output carries decodable purchase data it
        is attached to the first output as ``purchase``.
        """
        row = {
            'tx_hash': tx.hash,
            'raw': tx.raw,
            'height': tx.height,
            'position': tx.position,
            'is_verified': tx.is_verified,
            'day': tx.get_ordinal_day(self.ledger),
        }
        txos = tx.outputs
        if len(txos) >= 2 and txos[1].can_decode_purchase_data:
            txos[0].purchase = txos[1]
            row['purchased_claim_hash'] = txos[1].purchase_data.claim_hash
        return row

    async def insert_transaction(self, tx):
        """Insert the row for ``tx`` into the ``tx`` table."""
        await self.execute_fetchall(TX.insert().values(self.tx_to_row(tx)))

    def _transaction_io(self, conn: sqlite3.Connection, tx: Transaction, address, txhash):
        """Store ``tx`` and those of its inputs/outputs that relate to ``address``.

        NOTE(review): ``conn`` is annotated as a sqlite3 connection, but
        ``conn.dialect`` is read by the insert helpers, so a SQLAlchemy
        connection is what is actually passed (see save_transaction_io_batch).
        """
        conn.execute(
            insert_or_replace(conn, TX, ('block_hash', 'height', 'position', 'is_verified', 'day')).values(
                self.tx_to_row(tx)
            )
        )

        is_my_input = False

        for txi in tx.inputs:
            if txi.txo_ref.txo is not None:
                txo = txi.txo_ref.txo
                if txo.has_address and txo.get_address(self.ledger) == address:
                    is_my_input = True
                    conn.execute(
                        insert_or_ignore(conn, TXI).values({
                            'tx_hash': tx.hash,
                            'txo_hash': txo.hash,
                            'address': address,
                            'position': txi.position
                        })
                    )

        for txo in tx.outputs:
            # An output is stored when it pays ``txhash``, or when any input
            # of this transaction was spent from ``address``.
            if txo.script.is_pay_pubkey_hash and (txo.pubkey_hash == txhash or is_my_input):
                conn.execute(insert_or_ignore(conn, TXO).values(self.txo_to_row(tx, txo)))
            elif txo.script.is_pay_script_hash:
                # TODO: implement script hash payments
                log.warning('Database.save_transaction_io: pay script hash is not implemented!')

    def save_transaction_io(self, tx: Transaction, address, txhash, history):
        """Single-transaction form of save_transaction_io_batch()."""
        return self.save_transaction_io_batch([tx], address, txhash, history)

    def save_transaction_io_batch(self, txs: Iterable[Transaction], address, txhash, history):
        """Store ``txs`` for ``address`` and update that address's history.

        ``used_times`` is set to half the number of ``:`` characters in
        ``history``. Returns the awaitable produced by ``run_in_executor``.
        """
        history_count = history.count(':') // 2

        def __many():
            for tx in txs:
                self._transaction_io(self.db, tx, address, txhash)
            self.db.execute(
                PubkeyAddress.update()
                .values(history=history, used_times=history_count)
                .where(PubkeyAddress.c.address == address)
            )

        return asyncio.get_event_loop().run_in_executor(self.executor, __many)

    async def reserve_outputs(self, txos, is_reserved=True):
        """Set ``is_reserved`` on the rows of ``txos``; no-op for an empty list."""
        txo_hashes = [txo.hash for txo in txos]
        if txo_hashes:
            await self.execute_fetchall(
                TXO.update().values(is_reserved=is_reserved).where(TXO.c.txo_hash.in_(txo_hashes))
            )

    async def release_outputs(self, txos):
        """Clear the reserved flag on ``txos``."""
        await self.reserve_outputs(txos, is_reserved=False)

    async def rewind_blockchain(self, above_height):  # pylint: disable=no-self-use
        """Placeholder: currently does nothing and returns ``True``."""
        # TODO:
        # 1. delete transactions above_height
        # 2. update address histories removing deleted TXs
        return True

    async def select_transactions(self, cols, accounts=None, **constraints):
        """Select ``cols`` from ``tx``, limited to ``accounts`` unless a
        ``tx_hash`` / ``tx_hash__in`` constraint is given.
        """
        s: Select = select(cols).select_from(TX)
        if not {'tx_hash', 'tx_hash__in'}.intersection(constraints):
            assert accounts, "'accounts' argument required when no 'tx_hash' constraint is present"
            where = in_account(accounts)
            # Transactions having an output or an input on an address that
            # belongs to one of the accounts.
            tx_hashes = union(
                select([TXO.c.tx_hash], where, TXO.join(AccountAddress, TXO.c.address == AccountAddress.c.address)),
                select([TXI.c.tx_hash], where, TXI.join(AccountAddress, TXI.c.address == AccountAddress.c.address))
            )
            s.append_whereclause(TX.c.tx_hash.in_(tx_hashes))
        return await self.execute_fetchall(query2([TX], s, **constraints))

    # Annotation source for outputs that were not found among the wallet's txos.
    TXO_NOT_MINE = Output(None, None, is_my_output=False)

    async def get_transactions(self, wallet=None, **constraints):
        """Load transactions and annotate their inputs and outputs.

        The ``include_is_spent`` / ``include_is_my_input`` /
        ``include_is_my_output`` flags are forwarded to get_txos(). Outputs
        with no matching stored txo are annotated from TXO_NOT_MINE.
        """
        include_is_spent = constraints.pop('include_is_spent', False)
        include_is_my_input = constraints.pop('include_is_my_input', False)
        include_is_my_output = constraints.pop('include_is_my_output', False)

        tx_rows = await self.select_transactions(
            [TX.c.tx_hash, TX.c.raw, TX.c.height, TX.c.position, TX.c.is_verified],
            order_by=constraints.pop('order_by', ["height=0 DESC", "height DESC", "position DESC"]),
            **constraints
        )

        if not tx_rows:
            return []

        txids, txs, txi_txoids = [], [], []
        for row in tx_rows:
            txids.append(row['tx_hash'])
            txs.append(Transaction(
                raw=row['raw'], height=row['height'], position=row['position'],
                is_verified=bool(row['is_verified'])
            ))
            for txi in txs[-1].inputs:
                txi_txoids.append(txi.txo_ref.hash)

        # Long hash lists are queried in chunks of MAX_QUERY_VARIABLES.
        step = self.MAX_QUERY_VARIABLES
        annotated_txos = {}
        for offset in range(0, len(txids), step):
            annotated_txos.update({
                txo.id: txo for txo in
                (await self.get_txos(
                    wallet=wallet,
                    tx_hash__in=txids[offset:offset+step], order_by='txo.tx_hash',
                    include_is_spent=include_is_spent,
                    include_is_my_input=include_is_my_input,
                    include_is_my_output=include_is_my_output,
                ))
            })

        referenced_txos = {}
        for offset in range(0, len(txi_txoids), step):
            referenced_txos.update({
                txo.id: txo for txo in
                (await self.get_txos(
                    wallet=wallet,
                    txo_hash__in=txi_txoids[offset:offset+step], order_by='txo.txo_hash',
                    include_is_my_output=include_is_my_output,
                ))
            })

        for tx in txs:
            for txi in tx.inputs:
                txo = referenced_txos.get(txi.txo_ref.id)
                if txo:
                    txi.txo_ref = txo.ref
            for txo in tx.outputs:
                _txo = annotated_txos.get(txo.id)
                if _txo:
                    txo.update_annotations(_txo)
                else:
                    txo.update_annotations(self.TXO_NOT_MINE)

        for tx in txs:
            txos = tx.outputs
            if len(txos) >= 2 and txos[1].can_decode_purchase_data:
                txos[0].purchase = txos[1]

        return txs

    async def get_transaction_count(self, **constraints):
        """Count matching transactions, ignoring paging and ordering keys."""
        constraints.pop('wallet', None)
        constraints.pop('offset', None)
        constraints.pop('limit', None)
        constraints.pop('order_by', None)
        count = await self.select_transactions([func.count().label('total')], **constraints)
        return count[0]['total'] or 0

    async def get_transaction(self, **constraints):
        """Return the first matching transaction, or ``None``."""
        txs = await self.get_transactions(limit=1, **constraints)
        if txs:
            return txs[0]
        return None

    async def select_txos(
            self, cols, accounts=None, is_my_input=None, is_my_output=True,
            is_my_input_or_output=None, exclude_internal_transfers=False,
            include_is_spent=False, include_is_my_input=False,
            is_spent=None, spent=None, **constraints):
        """Select ``cols`` from ``txo`` joined to ``tx``, with ownership and
        spent-state filters.

        The ownership filters (``is_my_output``, ``is_my_input``,
        ``is_my_input_or_output``, ``exclude_internal_transfers``) only take
        effect when ``accounts`` is given. ``spent`` is the ``txi`` alias to
        join for spent checks; one is created when it is omitted.
        """
        # SQLAlchemy overloads ``== None`` / ``!= None`` into IS [NOT] NULL, so
        # the comparisons below deliberately do not use ``is``.
        s: Select = select(cols)
        if accounts:
            my_addresses = select([AccountAddress.c.address]).where(in_account(accounts))
            if is_my_input_or_output:
                include_is_my_input = True
                s.append_whereclause(
                    TXO.c.address.in_(my_addresses) | (
                        (TXI.c.address != None) &
                        (TXI.c.address.in_(my_addresses))
                    )
                )
            else:
                if is_my_output:
                    s.append_whereclause(TXO.c.address.in_(my_addresses))
                elif is_my_output is False:
                    s.append_whereclause(TXO.c.address.notin_(my_addresses))
                if is_my_input:
                    include_is_my_input = True
                    s.append_whereclause(
                        (TXI.c.address != None) &
                        (TXI.c.address.in_(my_addresses))
                    )
                elif is_my_input is False:
                    include_is_my_input = True
                    s.append_whereclause(
                        (TXI.c.address == None) |
                        (TXI.c.address.notin_(my_addresses))
                    )
            if exclude_internal_transfers:
                include_is_my_input = True
                s.append_whereclause(
                    (TXO.c.txo_type != TXO_TYPES['other']) |
                    (TXI.c.address == None) |
                    (TXI.c.address.notin_(my_addresses))
                )
        joins = TXO.join(TX)
        if spent is None:
            spent = TXI.alias('spent')
        if is_spent:
            s.append_whereclause(spent.c.txo_hash != None)
        elif is_spent is False:
            # Unspent here also excludes reserved outputs.
            s.append_whereclause((spent.c.txo_hash == None) & (TXO.c.is_reserved == False))
        if include_is_spent or is_spent is not None:
            # LEFT JOIN txi AS spent ON (spent.txo_hash=txo.txo_hash)
            joins = joins.join(spent, spent.c.txo_hash == TXO.c.txo_hash, isouter=True)
        if include_is_my_input:
            # LEFT JOIN txi ON (txi.position=0 AND txi.tx_hash=txo.tx_hash)
            joins = joins.join(TXI, (TXI.c.position == 0) & (TXI.c.tx_hash == TXO.c.tx_hash), isouter=True)
        s.append_from(joins)
        return await self.execute_fetchall(query2([TXO, TX], s, **constraints))

    async def get_txos(self, wallet=None, no_tx=False, **constraints):
        """Load outputs as ``Output`` objects with optional annotations.

        ``no_tx=True`` builds standalone outputs from the row instead of
        parsing the raw transaction. The ``is_my_input``, ``is_my_output`` and
        ``received_tips`` annotations need ``wallet``; without it they are
        left unset rather than read from columns that were never selected.
        Claims additionally get ``channel`` set and, for channels the wallet
        holds a key for, ``private_key``.
        """
        include_is_spent = constraints.get('include_is_spent', False)
        include_is_my_input = constraints.get('include_is_my_input', False)
        include_is_my_output = constraints.pop('include_is_my_output', False)
        include_received_tips = constraints.pop('include_received_tips', False)

        select_columns = [
            TX.c.tx_hash, TX.c.raw, TX.c.height, TX.c.position.label('tx_position'), TX.c.is_verified,
            TXO.c.txo_type, TXO.c.position.label('txo_position'), TXO.c.amount, TXO.c.script
        ]

        my_accounts = None
        if wallet is not None:
            my_accounts = select([AccountAddress.c.address], in_account(wallet.accounts))

        # A wallet-dependent column is selected, and later read back, only
        # when the wallet's addresses are actually available.
        has_is_my_output = bool(include_is_my_output and my_accounts is not None)
        has_is_my_input = bool(include_is_my_input and my_accounts is not None)
        has_received_tips = bool(include_received_tips and my_accounts is not None)

        if has_is_my_output:
            if constraints.get('is_my_output', None) in (True, False):
                # The filter already fixes the answer, so select a constant.
                select_columns.append(text(f"{1 if constraints['is_my_output'] else 0} AS is_my_output"))
            else:
                select_columns.append(TXO.c.address.in_(my_accounts).label('is_my_output'))

        if has_is_my_input:
            if constraints.get('is_my_input', None) in (True, False):
                select_columns.append(text(f"{1 if constraints['is_my_input'] else 0} AS is_my_input"))
            else:
                select_columns.append((
                    (TXI.c.address != None) &
                    (TXI.c.address.in_(my_accounts))
                ).label('is_my_input'))

        spent = TXI.alias('spent')
        if include_is_spent:
            select_columns.append((spent.c.txo_hash != None).label('is_spent'))

        if has_received_tips:
            # Sum of supports on the same claim, at the wallet's addresses,
            # that do not appear as an input of any stored transaction.
            support = TXO.alias('support')
            select_columns.append(select(
                [func.coalesce(func.sum(support.c.amount), 0)],
                (support.c.claim_hash == TXO.c.claim_hash) &
                (support.c.txo_type == TXO_TYPES['support']) &
                (support.c.address.in_(my_accounts)) &
                (support.c.txo_hash.notin_(select([TXI.c.txo_hash]))),
                support
            ).label('received_tips'))

        if 'order_by' not in constraints or constraints['order_by'] == 'height':
            constraints['order_by'] = [
                "tx.height=0 DESC", "tx.height DESC", "tx.position DESC", "txo.position"
            ]
        elif constraints.get('order_by', None) == 'none':
            del constraints['order_by']

        rows = await self.select_txos(select_columns, spent=spent, **constraints)

        txos = []
        txs = {}
        for row in rows:
            if no_tx:
                txo = Output(
                    amount=row['amount'],
                    script=OutputScript(row['script']),
                    tx_ref=TXRefImmutable.from_hash(row['tx_hash'], row['height']),
                    position=row['txo_position']
                )
            else:
                # Parse each transaction once and reuse it for all its outputs.
                if row['tx_hash'] not in txs:
                    txs[row['tx_hash']] = Transaction(
                        row['raw'], height=row['height'], position=row['tx_position'],
                        is_verified=bool(row['is_verified'])
                    )
                txo = txs[row['tx_hash']].outputs[row['txo_position']]
            if include_is_spent:
                txo.is_spent = bool(row['is_spent'])
            if has_is_my_input:
                txo.is_my_input = bool(row['is_my_input'])
            if has_is_my_output:
                txo.is_my_output = bool(row['is_my_output'])
            if has_is_my_input and has_is_my_output:
                txo.is_internal_transfer = bool(
                    txo.is_my_input and txo.is_my_output and
                    row['txo_type'] == TXO_TYPES['other']
                )
            if has_received_tips:
                txo.received_tips = row['received_tips']
            txos.append(txo)

        channel_hashes = set()
        for txo in txos:
            if txo.is_claim and txo.can_decode_claim:
                if txo.claim.is_signed:
                    channel_hashes.add(txo.claim.signing_channel_hash)
                if txo.claim.is_channel and wallet:
                    for account in wallet.accounts:
                        private_key = account.get_channel_private_key(
                            txo.claim.channel.public_key_bytes
                        )
                        if private_key:
                            txo.private_key = private_key
                            break

        if channel_hashes:
            channels = {
                txo.claim_hash: txo for txo in
                (await self.get_channels(
                    wallet=wallet,
                    claim_hash__in=channel_hashes,
                ))
            }
            for txo in txos:
                if txo.is_claim and txo.can_decode_claim:
                    txo.channel = channels.get(txo.claim.signing_channel_hash, None)

        return txos

    @staticmethod
    def _clean_txo_constraints_for_aggregation(constraints):
        """Drop, in place, the keys that make no sense for an aggregate query."""
        for key in (
                'include_is_spent', 'include_is_my_input', 'include_is_my_output',
                'include_received_tips', 'wallet', 'resolve',
                'offset', 'limit', 'order_by'):
            constraints.pop(key, None)

    async def get_txo_count(self, **constraints):
        """Count the outputs matching ``constraints``."""
        self._clean_txo_constraints_for_aggregation(constraints)
        count = await self.select_txos([func.count().label('total')], **constraints)
        return count[0]['total'] or 0

    async def get_txo_sum(self, **constraints):
        """Sum the amounts of the outputs matching ``constraints`` (0 if none)."""
        self._clean_txo_constraints_for_aggregation(constraints)
        result = await self.select_txos([func.sum(TXO.c.amount).label('total')], **constraints)
        return result[0]['total'] or 0

    async def get_txo_plot(self, start_day=None, days_back=0, end_day=None, days_after=None, **constraints):
        """Return per-day amount totals as rows of ``{'day': date, 'total': ...}``.

        Without ``start_day`` the range starts ``days_back`` days before the
        ledger's estimated current date. With ``start_day`` (ISO date string)
        the range may be closed by ``end_day`` or by ``days_after``.
        """
        self._clean_txo_constraints_for_aggregation(constraints)
        if start_day is None:
            current_ordinal = self.ledger.headers.estimated_date(self.ledger.headers.height).toordinal()
            constraints['day__gte'] = current_ordinal - days_back
        else:
            constraints['day__gte'] = date.fromisoformat(start_day).toordinal()
            if end_day is not None:
                constraints['day__lte'] = date.fromisoformat(end_day).toordinal()
            elif days_after is not None:
                constraints['day__lte'] = constraints['day__gte'] + days_after
        plot = await self.select_txos(
            [TX.c.day, func.sum(TXO.c.amount).label('total')],
            group_by='day', order_by='day', **constraints
        )
        for row in plot:
            # Days are stored as proleptic Gregorian ordinals.
            row['day'] = date.fromordinal(row['day'])
        return plot

    def get_utxos(self, **constraints):
        """get_txos() restricted to unspent outputs."""
        return self.get_txos(is_spent=False, **constraints)

    def get_utxo_count(self, **constraints):
        """get_txo_count() restricted to unspent outputs."""
        return self.get_txo_count(is_spent=False, **constraints)

    async def get_balance(self, wallet=None, accounts=None, **constraints):
        """Sum unspent output amounts for ``accounts`` (or the wallet's accounts)."""
        assert wallet or accounts, \
            "'wallet' or 'accounts' constraints required to calculate balance"
        constraints['accounts'] = accounts or wallet.accounts
        balance = await self.select_txos(
            [func.sum(TXO.c.amount).label('total')], is_spent=False, **constraints
        )
        return balance[0]['total'] or 0

    async def select_addresses(self, cols, **constraints):
        """Select ``cols`` from ``pubkey_address`` joined to ``account_address``."""
        return await self.execute_fetchall(query2(
            [AccountAddress, PubkeyAddress],
            select(cols).select_from(PubkeyAddress.join(AccountAddress)),
            **constraints
        ))

    async def get_addresses(self, cols=None, **constraints):
        """Return address rows as dicts.

        When the ``pubkey`` column is selected, the ``pubkey``, ``chain_code``,
        ``n`` and ``depth`` entries are replaced by one ``PubKey`` object
        stored under ``'pubkey'``.
        """
        if cols is None:
            cols = (
                PubkeyAddress.c.address,
                PubkeyAddress.c.history,
                PubkeyAddress.c.used_times,
                AccountAddress.c.account,
                AccountAddress.c.chain,
                AccountAddress.c.pubkey,
                AccountAddress.c.chain_code,
                AccountAddress.c.n,
                AccountAddress.c.depth
            )
        addresses = await self.select_addresses(cols, **constraints)
        if AccountAddress.c.pubkey in cols:
            for address in addresses:
                address['pubkey'] = PubKey(
                    self.ledger, bytes(address.pop('pubkey')), bytes(address.pop('chain_code')),
                    address.pop('n'), address.pop('depth')
                )
        return addresses

    async def get_address_count(self, cols=None, **constraints):
        """Count matching addresses; ``cols`` is accepted but not used."""
        count = await self.select_addresses([func.count().label('total')], **constraints)
        return count[0]['total'] or 0

    async def get_address(self, **constraints):
        """Return the first matching address row, or ``None``."""
        addresses = await self.get_addresses(limit=1, **constraints)
        if addresses:
            return addresses[0]
        return None

    async def add_keys(self, account, chain, pubkeys):
        """Insert ``pubkeys`` into both address tables, skipping existing rows."""
        await self.execute_fetchall(
            insert_or_ignore(self.db, PubkeyAddress).values([{
                'address': k.address
            } for k in pubkeys])
        )
        await self.execute_fetchall(
            insert_or_ignore(self.db, AccountAddress).values([{
                'account': account.id,
                'address': k.address,
                'chain': chain,
                'pubkey': k.pubkey_bytes,
                'chain_code': k.chain_code,
                'n': k.n,
                'depth': k.depth
            } for k in pubkeys])
        )

    async def _set_address_history(self, address, history):
        """Store ``history`` for ``address`` and derive ``used_times`` from it."""
        await self.execute_fetchall(
            PubkeyAddress.update()
            .values(history=history, used_times=history.count(':')//2)
            .where(PubkeyAddress.c.address == address)
        )

    async def set_address_history(self, address, history):
        """Public wrapper around _set_address_history()."""
        await self._set_address_history(address, history)

    @staticmethod
    def constrain_purchases(constraints):
        """Rewrite ``constraints`` in place to select purchase transactions.

        Requires ``accounts``, which is replaced by a ``tx_hash__in`` sub-query
        over inputs on those accounts' addresses.
        """
        accounts = constraints.pop('accounts', None)
        assert accounts, "'accounts' argument required to find purchases"
        if not {'purchased_claim_hash', 'purchased_claim_hash__in'}.intersection(constraints):
            constraints['purchased_claim_hash__is_not_null'] = True
        constraints['tx_hash__in'] = select(
            [TXI.c.tx_hash], in_account(accounts),
            TXI.join(AccountAddress, TXI.c.address == AccountAddress.c.address)
        )

    async def get_purchases(self, **constraints):
        """Return the first output of every purchase transaction."""
        self.constrain_purchases(constraints)
        return [tx.outputs[0] for tx in await self.get_transactions(**constraints)]

    def get_purchase_count(self, **constraints):
        """Count purchase transactions."""
        self.constrain_purchases(constraints)
        return self.get_transaction_count(**constraints)

    @staticmethod
    def constrain_claims(constraints):
        """Restrict ``constraints`` in place to claim outputs.

        An existing ``txo_type`` / ``txo_type__in`` wins. Otherwise
        ``claim_type`` names are mapped through TXO_TYPES, and all CLAIM_TYPES
        are used when none is given.
        """
        if {'txo_type', 'txo_type__in'}.intersection(constraints):
            return
        claim_types = constraints.pop('claim_type', None)
        if claim_types:
            constrain_single_or_list(
                constraints, 'txo_type', claim_types, lambda x: TXO_TYPES[x]
            )
        else:
            constraints['txo_type__in'] = CLAIM_TYPES

    async def get_claims(self, **constraints) -> List[Output]:
        """Return unspent claim outputs."""
        self.constrain_claims(constraints)
        return await self.get_utxos(**constraints)

    def get_claim_count(self, **constraints):
        """Count unspent claim outputs."""
        self.constrain_claims(constraints)
        return self.get_utxo_count(**constraints)

    @staticmethod
    def constrain_streams(constraints):
        """Force ``txo_type`` to the stream type."""
        constraints['txo_type'] = TXO_TYPES['stream']

    def get_streams(self, **constraints):
        """Return unspent stream claims."""
        self.constrain_streams(constraints)
        return self.get_claims(**constraints)

    def get_stream_count(self, **constraints):
        """Count unspent stream claims."""
        self.constrain_streams(constraints)
        return self.get_claim_count(**constraints)

    @staticmethod
    def constrain_channels(constraints):
        """Force ``txo_type`` to the channel type."""
        constraints['txo_type'] = TXO_TYPES['channel']

    def get_channels(self, **constraints):
        """Return unspent channel claims."""
        self.constrain_channels(constraints)
        return self.get_claims(**constraints)

    def get_channel_count(self, **constraints):
        """Count unspent channel claims."""
        self.constrain_channels(constraints)
        return self.get_claim_count(**constraints)

    @staticmethod
    def constrain_supports(constraints):
        """Force ``txo_type`` to the support type."""
        constraints['txo_type'] = TXO_TYPES['support']

    def get_supports(self, **constraints):
        """Return unspent supports."""
        self.constrain_supports(constraints)
        return self.get_utxos(**constraints)

    def get_support_count(self, **constraints):
        """Count unspent supports."""
        self.constrain_supports(constraints)
        return self.get_utxo_count(**constraints)

    @staticmethod
    def constrain_collections(constraints):
        """Force ``txo_type`` to the collection type."""
        constraints['txo_type'] = TXO_TYPES['collection']

    def get_collections(self, **constraints):
        """Return unspent collections."""
        self.constrain_collections(constraints)
        return self.get_utxos(**constraints)

    def get_collection_count(self, **constraints):
        """Count unspent collections."""
        self.constrain_collections(constraints)
        return self.get_utxo_count(**constraints)

    async def release_all_outputs(self, account):
        """Clear ``is_reserved`` on every reserved output at ``account``'s addresses.

        Built as a SQLAlchemy expression, like reserve_outputs(), so the
        statement does not depend on one driver's placeholder style.
        """
        account_addresses = select([AccountAddress.c.address]).where(in_account(account))
        await self.execute_fetchall(
            TXO.update().values(is_reserved=False).where(
                (TXO.c.is_reserved == True) &  # pylint: disable=singleton-comparison
                TXO.c.address.in_(account_addresses)
            )
        )

    def get_supports_summary(self, **constraints):
        """Return the wallet's unspent supports as standalone outputs,
        annotated with ``is_my_input``.
        """
        return self.get_txos(
            txo_type=TXO_TYPES['support'],
            is_spent=False, is_my_output=True,
            include_is_my_input=True,
            no_tx=True,
            **constraints
        )