1161 lines
46 KiB
Python
1161 lines
46 KiB
Python
import os
|
|
import logging
|
|
import asyncio
|
|
import sqlite3
|
|
import platform
|
|
from binascii import hexlify
|
|
from dataclasses import dataclass
|
|
from contextvars import ContextVar
|
|
from concurrent.futures.thread import ThreadPoolExecutor
|
|
from concurrent.futures.process import ProcessPoolExecutor
|
|
from typing import Tuple, List, Union, Callable, Any, Awaitable, Iterable, Dict, Optional
|
|
from datetime import date
|
|
from prometheus_client import Gauge, Counter, Histogram
|
|
from lbry.utils import LockWithMetrics
|
|
|
|
from .bip32 import PubKey
|
|
from .transaction import Transaction, Output, OutputScript, TXRefImmutable
|
|
from .constants import TXO_TYPES, CLAIM_TYPES
|
|
from .util import date_to_julian_day
|
|
|
|
|
|
log = logging.getLogger(__name__)
|
|
sqlite3.enable_callback_tracebacks(True)
|
|
|
|
HISTOGRAM_BUCKETS = (
|
|
.005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, 15.0, 20.0, 30.0, 60.0, float('inf')
|
|
)
|
|
|
|
|
|
@dataclass
|
|
class ReaderProcessState:
|
|
cursor: sqlite3.Cursor
|
|
|
|
|
|
reader_context: Optional[ContextVar[ReaderProcessState]] = ContextVar('reader_context')
|
|
|
|
|
|
def initializer(path):
|
|
db = sqlite3.connect(path)
|
|
db.row_factory = dict_row_factory
|
|
db.executescript("pragma journal_mode=WAL;")
|
|
reader = ReaderProcessState(db.cursor())
|
|
reader_context.set(reader)
|
|
|
|
|
|
def run_read_only_fetchall(sql, params):
|
|
cursor = reader_context.get().cursor
|
|
try:
|
|
return cursor.execute(sql, params).fetchall()
|
|
except (Exception, OSError) as e:
|
|
log.exception('Error running transaction:', exc_info=e)
|
|
raise
|
|
|
|
|
|
def run_read_only_fetchone(sql, params):
|
|
cursor = reader_context.get().cursor
|
|
try:
|
|
return cursor.execute(sql, params).fetchone()
|
|
except (Exception, OSError) as e:
|
|
log.exception('Error running transaction:', exc_info=e)
|
|
raise
|
|
|
|
|
|
if platform.system() == 'Windows' or 'ANDROID_ARGUMENT' in os.environ:
|
|
ReaderExecutorClass = ThreadPoolExecutor
|
|
else:
|
|
ReaderExecutorClass = ProcessPoolExecutor
|
|
|
|
|
|
class AIOSQLite:
|
|
reader_executor: ReaderExecutorClass
|
|
|
|
waiting_writes_metric = Gauge(
|
|
"waiting_writes_count", "Number of waiting db writes", namespace="daemon_database"
|
|
)
|
|
waiting_reads_metric = Gauge(
|
|
"waiting_reads_count", "Number of waiting db writes", namespace="daemon_database"
|
|
)
|
|
write_count_metric = Counter(
|
|
"write_count", "Number of database writes", namespace="daemon_database"
|
|
)
|
|
read_count_metric = Counter(
|
|
"read_count", "Number of database reads", namespace="daemon_database"
|
|
)
|
|
acquire_write_lock_metric = Histogram(
|
|
f'write_lock_acquired', 'Time to acquire the write lock', namespace="daemon_database", buckets=HISTOGRAM_BUCKETS
|
|
)
|
|
held_write_lock_metric = Histogram(
|
|
f'write_lock_held', 'Length of time the write lock is held for', namespace="daemon_database",
|
|
buckets=HISTOGRAM_BUCKETS
|
|
)
|
|
|
|
def __init__(self):
|
|
# has to be single threaded as there is no mapping of thread:connection
|
|
self.writer_executor = ThreadPoolExecutor(max_workers=1)
|
|
self.writer_connection: Optional[sqlite3.Connection] = None
|
|
self._closing = False
|
|
self.query_count = 0
|
|
self.write_lock = LockWithMetrics(self.acquire_write_lock_metric, self.held_write_lock_metric)
|
|
self.writers = 0
|
|
self.read_ready = asyncio.Event()
|
|
self.urgent_read_done = asyncio.Event()
|
|
|
|
@classmethod
|
|
async def connect(cls, path: Union[bytes, str], *args, **kwargs):
|
|
sqlite3.enable_callback_tracebacks(True)
|
|
db = cls()
|
|
|
|
def _connect_writer():
|
|
db.writer_connection = sqlite3.connect(path, *args, **kwargs)
|
|
|
|
readers = max(os.cpu_count() - 2, 2)
|
|
db.reader_executor = ReaderExecutorClass(
|
|
max_workers=readers, initializer=initializer, initargs=(path, )
|
|
)
|
|
await asyncio.get_event_loop().run_in_executor(db.writer_executor, _connect_writer)
|
|
db.read_ready.set()
|
|
db.urgent_read_done.set()
|
|
return db
|
|
|
|
async def close(self):
|
|
if self._closing:
|
|
return
|
|
self._closing = True
|
|
await asyncio.get_event_loop().run_in_executor(self.writer_executor, self.writer_connection.close)
|
|
self.writer_executor.shutdown(wait=True)
|
|
self.reader_executor.shutdown(wait=True)
|
|
self.read_ready.clear()
|
|
self.writer_connection = None
|
|
|
|
def executemany(self, sql: str, params: Iterable):
|
|
params = params if params is not None else []
|
|
# this fetchall is needed to prevent SQLITE_MISUSE
|
|
return self.run(lambda conn: conn.executemany(sql, params).fetchall())
|
|
|
|
def executescript(self, script: str) -> Awaitable:
|
|
return self.run(lambda conn: conn.executescript(script))
|
|
|
|
async def _execute_fetch(self, sql: str, parameters: Iterable = None,
|
|
read_only=False, fetch_all: bool = False) -> List[dict]:
|
|
read_only_fn = run_read_only_fetchall if fetch_all else run_read_only_fetchone
|
|
parameters = parameters if parameters is not None else []
|
|
still_waiting = False
|
|
urgent_read = False
|
|
if read_only:
|
|
self.waiting_reads_metric.inc()
|
|
self.read_count_metric.inc()
|
|
try:
|
|
while self.writers: # more writes can come in while we are waiting for the first
|
|
if not urgent_read and still_waiting and self.urgent_read_done.is_set():
|
|
# throttle the writes if they pile up
|
|
self.urgent_read_done.clear()
|
|
urgent_read = True
|
|
# wait until the running writes have finished
|
|
await self.read_ready.wait()
|
|
still_waiting = True
|
|
return await asyncio.get_event_loop().run_in_executor(
|
|
self.reader_executor, read_only_fn, sql, parameters
|
|
)
|
|
finally:
|
|
if urgent_read:
|
|
# unthrottle the writers if they had to be throttled
|
|
self.urgent_read_done.set()
|
|
self.waiting_reads_metric.dec()
|
|
if fetch_all:
|
|
return await self.run(lambda conn: conn.execute(sql, parameters).fetchall())
|
|
return await self.run(lambda conn: conn.execute(sql, parameters).fetchone())
|
|
|
|
async def execute_fetchall(self, sql: str, parameters: Iterable = None,
|
|
read_only=False) -> List[dict]:
|
|
return await self._execute_fetch(sql, parameters, read_only, fetch_all=True)
|
|
|
|
async def execute_fetchone(self, sql: str, parameters: Iterable = None,
|
|
read_only=False) -> List[dict]:
|
|
return await self._execute_fetch(sql, parameters, read_only, fetch_all=False)
|
|
|
|
def execute(self, sql: str, parameters: Iterable = None) -> Awaitable[sqlite3.Cursor]:
|
|
parameters = parameters if parameters is not None else []
|
|
return self.run(lambda conn: conn.execute(sql, parameters))
|
|
|
|
async def run(self, fun, *args, **kwargs):
|
|
self.write_count_metric.inc()
|
|
self.waiting_writes_metric.inc()
|
|
# it's possible many writes are coming in one after the other, these can
|
|
# block reader calls for a long time
|
|
# if the reader waits for the writers to finish and then has to wait for
|
|
# yet more, it will clear the urgent_read_done event to block more writers
|
|
# piling on
|
|
try:
|
|
await self.urgent_read_done.wait()
|
|
except Exception as e:
|
|
self.waiting_writes_metric.dec()
|
|
raise e
|
|
self.writers += 1
|
|
# block readers
|
|
self.read_ready.clear()
|
|
try:
|
|
async with self.write_lock:
|
|
return await asyncio.get_event_loop().run_in_executor(
|
|
self.writer_executor, lambda: self.__run_transaction(fun, *args, **kwargs)
|
|
)
|
|
finally:
|
|
self.writers -= 1
|
|
self.waiting_writes_metric.dec()
|
|
if not self.writers:
|
|
# unblock the readers once the last enqueued writer finishes
|
|
self.read_ready.set()
|
|
|
|
def __run_transaction(self, fun: Callable[[sqlite3.Connection, Any, Any], Any], *args, **kwargs):
|
|
self.writer_connection.execute('begin')
|
|
try:
|
|
self.query_count += 1
|
|
result = fun(self.writer_connection, *args, **kwargs) # type: ignore
|
|
self.writer_connection.commit()
|
|
return result
|
|
except (Exception, OSError) as e:
|
|
log.exception('Error running transaction:', exc_info=e)
|
|
self.writer_connection.rollback()
|
|
log.warning("rolled back")
|
|
raise
|
|
|
|
async def run_with_foreign_keys_disabled(self, fun, *args, **kwargs):
|
|
self.write_count_metric.inc()
|
|
self.waiting_writes_metric.inc()
|
|
try:
|
|
await self.urgent_read_done.wait()
|
|
except Exception as e:
|
|
self.waiting_writes_metric.dec()
|
|
raise e
|
|
self.writers += 1
|
|
self.read_ready.clear()
|
|
try:
|
|
async with self.write_lock:
|
|
return await asyncio.get_event_loop().run_in_executor(
|
|
self.writer_executor, self.__run_transaction_with_foreign_keys_disabled, fun, args, kwargs
|
|
)
|
|
finally:
|
|
self.writers -= 1
|
|
self.waiting_writes_metric.dec()
|
|
if not self.writers:
|
|
self.read_ready.set()
|
|
|
|
def __run_transaction_with_foreign_keys_disabled(self,
|
|
fun: Callable[[sqlite3.Connection, Any, Any], Any],
|
|
args, kwargs):
|
|
foreign_keys_enabled, = self.writer_connection.execute("pragma foreign_keys").fetchone()
|
|
if not foreign_keys_enabled:
|
|
raise sqlite3.IntegrityError("foreign keys are disabled, use `AIOSQLite.run` instead")
|
|
try:
|
|
self.writer_connection.execute('pragma foreign_keys=off').fetchone()
|
|
return self.__run_transaction(fun, *args, **kwargs)
|
|
finally:
|
|
self.writer_connection.execute('pragma foreign_keys=on').fetchone()
|
|
|
|
|
|
def constraints_to_sql(constraints, joiner=' AND ', prepend_key=''):
|
|
sql, values = [], {}
|
|
for key, constraint in constraints.items():
|
|
tag = '0'
|
|
if '#' in key:
|
|
key, tag = key[:key.index('#')], key[key.index('#')+1:]
|
|
col, op, key = key, '=', key.replace('.', '_')
|
|
if not key:
|
|
sql.append(constraint)
|
|
continue
|
|
if key.startswith('$$'):
|
|
col, key = col[2:], key[1:]
|
|
elif key.startswith('$'):
|
|
values[key] = constraint
|
|
continue
|
|
if key.endswith('__not'):
|
|
col, op = col[:-len('__not')], '!='
|
|
elif key.endswith('__is_null'):
|
|
col = col[:-len('__is_null')]
|
|
sql.append(f'{col} IS NULL')
|
|
continue
|
|
if key.endswith('__is_not_null'):
|
|
col = col[:-len('__is_not_null')]
|
|
sql.append(f'{col} IS NOT NULL')
|
|
continue
|
|
if key.endswith('__lt'):
|
|
col, op = col[:-len('__lt')], '<'
|
|
elif key.endswith('__lte'):
|
|
col, op = col[:-len('__lte')], '<='
|
|
elif key.endswith('__gt'):
|
|
col, op = col[:-len('__gt')], '>'
|
|
elif key.endswith('__gte'):
|
|
col, op = col[:-len('__gte')], '>='
|
|
elif key.endswith('__like'):
|
|
col, op = col[:-len('__like')], 'LIKE'
|
|
elif key.endswith('__not_like'):
|
|
col, op = col[:-len('__not_like')], 'NOT LIKE'
|
|
elif key.endswith('__in') or key.endswith('__not_in'):
|
|
if key.endswith('__in'):
|
|
col, op, one_val_op = col[:-len('__in')], 'IN', '='
|
|
else:
|
|
col, op, one_val_op = col[:-len('__not_in')], 'NOT IN', '!='
|
|
if constraint:
|
|
if isinstance(constraint, (list, set, tuple)):
|
|
if len(constraint) == 1:
|
|
values[f'{key}{tag}'] = next(iter(constraint))
|
|
sql.append(f'{col} {one_val_op} :{key}{tag}')
|
|
else:
|
|
keys = []
|
|
for i, val in enumerate(constraint):
|
|
keys.append(f':{key}{tag}_{i}')
|
|
values[f'{key}{tag}_{i}'] = val
|
|
sql.append(f'{col} {op} ({", ".join(keys)})')
|
|
elif isinstance(constraint, str):
|
|
sql.append(f'{col} {op} ({constraint})')
|
|
else:
|
|
raise ValueError(f"{col} requires a list, set or string as constraint value.")
|
|
continue
|
|
elif key.endswith('__any') or key.endswith('__or'):
|
|
where, subvalues = constraints_to_sql(constraint, ' OR ', key+tag+'_')
|
|
sql.append(f'({where})')
|
|
values.update(subvalues)
|
|
continue
|
|
if key.endswith('__and'):
|
|
where, subvalues = constraints_to_sql(constraint, ' AND ', key+tag+'_')
|
|
sql.append(f'({where})')
|
|
values.update(subvalues)
|
|
continue
|
|
sql.append(f'{col} {op} :{prepend_key}{key}{tag}')
|
|
values[prepend_key+key+tag] = constraint
|
|
return joiner.join(sql) if sql else '', values
|
|
|
|
|
|
def query(select, **constraints) -> Tuple[str, Dict[str, Any]]:
|
|
sql = [select]
|
|
limit = constraints.pop('limit', None)
|
|
offset = constraints.pop('offset', None)
|
|
order_by = constraints.pop('order_by', None)
|
|
group_by = constraints.pop('group_by', None)
|
|
|
|
accounts = constraints.pop('accounts', [])
|
|
if accounts:
|
|
constraints['account__in'] = [a.public_key.address for a in accounts]
|
|
|
|
where, values = constraints_to_sql(constraints)
|
|
if where:
|
|
sql.append('WHERE')
|
|
sql.append(where)
|
|
|
|
if group_by is not None:
|
|
sql.append(f'GROUP BY {group_by}')
|
|
|
|
if order_by:
|
|
sql.append('ORDER BY')
|
|
if isinstance(order_by, str):
|
|
sql.append(order_by)
|
|
elif isinstance(order_by, list):
|
|
sql.append(', '.join(order_by))
|
|
else:
|
|
raise ValueError("order_by must be string or list")
|
|
|
|
if limit is not None:
|
|
sql.append(f'LIMIT {limit}')
|
|
|
|
if offset is not None:
|
|
sql.append(f'OFFSET {offset}')
|
|
|
|
return ' '.join(sql), values
|
|
|
|
|
|
def interpolate(sql, values):
|
|
for k in sorted(values.keys(), reverse=True):
|
|
value = values[k]
|
|
if isinstance(value, bytes):
|
|
value = f"X'{hexlify(value).decode()}'"
|
|
elif isinstance(value, str):
|
|
value = f"'{value}'"
|
|
else:
|
|
value = str(value)
|
|
sql = sql.replace(f":{k}", value)
|
|
return sql
|
|
|
|
|
|
def constrain_single_or_list(constraints, column, value, convert=lambda x: x):
|
|
if value is not None:
|
|
if isinstance(value, list):
|
|
value = [convert(v) for v in value]
|
|
if len(value) == 1:
|
|
constraints[column] = value[0]
|
|
elif len(value) > 1:
|
|
constraints[f"{column}__in"] = value
|
|
else:
|
|
constraints[column] = convert(value)
|
|
return constraints
|
|
|
|
|
|
class SQLiteMixin:
|
|
|
|
SCHEMA_VERSION: Optional[str] = None
|
|
CREATE_TABLES_QUERY: str
|
|
MAX_QUERY_VARIABLES = 900
|
|
|
|
CREATE_VERSION_TABLE = """
|
|
create table if not exists version (
|
|
version text
|
|
);
|
|
"""
|
|
|
|
def __init__(self, path):
|
|
self._db_path = path
|
|
self.db: AIOSQLite = None
|
|
self.ledger = None
|
|
|
|
async def open(self):
|
|
log.info("connecting to database: %s", self._db_path)
|
|
self.db = await AIOSQLite.connect(self._db_path, isolation_level=None)
|
|
if self.SCHEMA_VERSION:
|
|
tables = [t[0] for t in await self.db.execute_fetchall(
|
|
"SELECT name FROM sqlite_master WHERE type='table';"
|
|
)]
|
|
if tables:
|
|
if 'version' in tables:
|
|
version = await self.db.execute_fetchone("SELECT version FROM version LIMIT 1;")
|
|
if version == (self.SCHEMA_VERSION,):
|
|
return
|
|
await self.db.executescript('\n'.join(
|
|
f"DROP TABLE {table};" for table in tables
|
|
))
|
|
await self.db.execute(self.CREATE_VERSION_TABLE)
|
|
await self.db.execute("INSERT INTO version VALUES (?)", (self.SCHEMA_VERSION,))
|
|
await self.db.executescript(self.CREATE_TABLES_QUERY)
|
|
|
|
async def close(self):
|
|
await self.db.close()
|
|
|
|
@staticmethod
|
|
def _insert_sql(table: str, data: dict, ignore_duplicate: bool = False,
|
|
replace: bool = False) -> Tuple[str, List]:
|
|
columns, values = [], []
|
|
for column, value in data.items():
|
|
columns.append(column)
|
|
values.append(value)
|
|
policy = ""
|
|
if ignore_duplicate:
|
|
policy = " OR IGNORE"
|
|
if replace:
|
|
policy = " OR REPLACE"
|
|
sql = "INSERT{} INTO {} ({}) VALUES ({})".format(
|
|
policy, table, ', '.join(columns), ', '.join(['?'] * len(values))
|
|
)
|
|
return sql, values
|
|
|
|
@staticmethod
|
|
def _update_sql(table: str, data: dict, where: str,
|
|
constraints: Union[list, tuple]) -> Tuple[str, list]:
|
|
columns, values = [], []
|
|
for column, value in data.items():
|
|
columns.append(f"{column} = ?")
|
|
values.append(value)
|
|
values.extend(constraints)
|
|
sql = "UPDATE {} SET {} WHERE {}".format(
|
|
table, ', '.join(columns), where
|
|
)
|
|
return sql, values
|
|
|
|
|
|
def dict_row_factory(cursor, row):
|
|
d = {}
|
|
for idx, col in enumerate(cursor.description):
|
|
d[col[0]] = row[idx]
|
|
return d
|
|
|
|
|
|
class Database(SQLiteMixin):
|
|
|
|
SCHEMA_VERSION = "1.3"
|
|
|
|
PRAGMAS = """
|
|
pragma journal_mode=WAL;
|
|
"""
|
|
|
|
CREATE_ACCOUNT_TABLE = """
|
|
create table if not exists account_address (
|
|
account text not null,
|
|
address text not null,
|
|
chain integer not null,
|
|
pubkey blob not null,
|
|
chain_code blob not null,
|
|
n integer not null,
|
|
depth integer not null,
|
|
primary key (account, address)
|
|
);
|
|
create index if not exists address_account_idx on account_address (address, account);
|
|
"""
|
|
|
|
CREATE_PUBKEY_ADDRESS_TABLE = """
|
|
create table if not exists pubkey_address (
|
|
address text primary key,
|
|
history text,
|
|
used_times integer not null default 0
|
|
);
|
|
"""
|
|
|
|
CREATE_TX_TABLE = """
|
|
create table if not exists tx (
|
|
txid text primary key,
|
|
raw blob not null,
|
|
height integer not null,
|
|
position integer not null,
|
|
is_verified boolean not null default 0,
|
|
purchased_claim_id text,
|
|
day integer
|
|
);
|
|
create index if not exists tx_purchased_claim_id_idx on tx (purchased_claim_id);
|
|
"""
|
|
|
|
CREATE_TXO_TABLE = """
|
|
create table if not exists txo (
|
|
txid text references tx,
|
|
txoid text primary key,
|
|
address text references pubkey_address,
|
|
position integer not null,
|
|
amount integer not null,
|
|
script blob not null,
|
|
is_reserved boolean not null default 0,
|
|
|
|
txo_type integer not null default 0,
|
|
claim_id text,
|
|
claim_name text,
|
|
|
|
channel_id text,
|
|
reposted_claim_id text
|
|
);
|
|
create index if not exists txo_txid_idx on txo (txid);
|
|
create index if not exists txo_address_idx on txo (address);
|
|
create index if not exists txo_claim_id_idx on txo (claim_id, txo_type);
|
|
create index if not exists txo_claim_name_idx on txo (claim_name);
|
|
create index if not exists txo_txo_type_idx on txo (txo_type);
|
|
create index if not exists txo_channel_id_idx on txo (channel_id);
|
|
create index if not exists txo_reposted_claim_idx on txo (reposted_claim_id);
|
|
"""
|
|
|
|
CREATE_TXI_TABLE = """
|
|
create table if not exists txi (
|
|
txid text references tx,
|
|
txoid text references txo primary key,
|
|
address text references pubkey_address,
|
|
position integer not null
|
|
);
|
|
create index if not exists txi_address_idx on txi (address);
|
|
create index if not exists first_input_idx on txi (txid, address) where position=0;
|
|
"""
|
|
|
|
CREATE_TABLES_QUERY = (
|
|
PRAGMAS +
|
|
CREATE_ACCOUNT_TABLE +
|
|
CREATE_PUBKEY_ADDRESS_TABLE +
|
|
CREATE_TX_TABLE +
|
|
CREATE_TXO_TABLE +
|
|
CREATE_TXI_TABLE
|
|
)
|
|
|
|
async def open(self):
|
|
await super().open()
|
|
self.db.writer_connection.row_factory = dict_row_factory
|
|
|
|
def txo_to_row(self, tx, txo):
|
|
row = {
|
|
'txid': tx.id,
|
|
'txoid': txo.id,
|
|
'address': txo.get_address(self.ledger),
|
|
'position': txo.position,
|
|
'amount': txo.amount,
|
|
'script': sqlite3.Binary(txo.script.source)
|
|
}
|
|
if txo.is_claim:
|
|
if txo.can_decode_claim:
|
|
claim = txo.claim
|
|
row['txo_type'] = TXO_TYPES.get(claim.claim_type, TXO_TYPES['stream'])
|
|
if claim.is_repost:
|
|
row['reposted_claim_id'] = claim.repost.reference.claim_id
|
|
if claim.is_signed:
|
|
row['channel_id'] = claim.signing_channel_id
|
|
else:
|
|
row['txo_type'] = TXO_TYPES['stream']
|
|
elif txo.is_support:
|
|
row['txo_type'] = TXO_TYPES['support']
|
|
elif txo.purchase is not None:
|
|
row['txo_type'] = TXO_TYPES['purchase']
|
|
row['claim_id'] = txo.purchased_claim_id
|
|
if txo.script.is_claim_involved:
|
|
row['claim_id'] = txo.claim_id
|
|
row['claim_name'] = txo.claim_name
|
|
return row
|
|
|
|
def tx_to_row(self, tx):
|
|
row = {
|
|
'txid': tx.id,
|
|
'raw': sqlite3.Binary(tx.raw),
|
|
'height': tx.height,
|
|
'position': tx.position,
|
|
'is_verified': tx.is_verified,
|
|
'day': tx.get_julian_day(self.ledger),
|
|
}
|
|
txos = tx.outputs
|
|
if len(txos) >= 2 and txos[1].can_decode_purchase_data:
|
|
txos[0].purchase = txos[1]
|
|
row['purchased_claim_id'] = txos[1].purchase_data.claim_id
|
|
return row
|
|
|
|
async def insert_transaction(self, tx):
|
|
await self.db.execute_fetchall(*self._insert_sql('tx', self.tx_to_row(tx)))
|
|
|
|
async def update_transaction(self, tx):
|
|
await self.db.execute_fetchall(*self._update_sql("tx", {
|
|
'height': tx.height, 'position': tx.position, 'is_verified': tx.is_verified
|
|
}, 'txid = ?', (tx.id,)))
|
|
|
|
def _transaction_io(self, conn: sqlite3.Connection, tx: Transaction, address, txhash):
|
|
conn.execute(*self._insert_sql('tx', self.tx_to_row(tx), replace=True)).fetchall()
|
|
|
|
is_my_input = False
|
|
|
|
for txi in tx.inputs:
|
|
if txi.txo_ref.txo is not None:
|
|
txo = txi.txo_ref.txo
|
|
if txo.has_address and txo.get_address(self.ledger) == address:
|
|
is_my_input = True
|
|
conn.execute(*self._insert_sql("txi", {
|
|
'txid': tx.id,
|
|
'txoid': txo.id,
|
|
'address': address,
|
|
'position': txi.position
|
|
}, ignore_duplicate=True)).fetchall()
|
|
|
|
for txo in tx.outputs:
|
|
if txo.script.is_pay_pubkey_hash and (txo.pubkey_hash == txhash or is_my_input):
|
|
conn.execute(*self._insert_sql(
|
|
"txo", self.txo_to_row(tx, txo), ignore_duplicate=True
|
|
)).fetchall()
|
|
elif txo.script.is_pay_script_hash:
|
|
# TODO: implement script hash payments
|
|
log.warning('Database.save_transaction_io: pay script hash is not implemented!')
|
|
|
|
def save_transaction_io(self, tx: Transaction, address, txhash, history):
|
|
return self.save_transaction_io_batch([tx], address, txhash, history)
|
|
|
|
def save_transaction_io_batch(self, txs: Iterable[Transaction], address, txhash, history):
|
|
history_count = history.count(':') // 2
|
|
|
|
def __many(conn):
|
|
for tx in txs:
|
|
self._transaction_io(conn, tx, address, txhash)
|
|
conn.execute(
|
|
"UPDATE pubkey_address SET history = ?, used_times = ? WHERE address = ?",
|
|
(history, history_count, address)
|
|
).fetchall()
|
|
|
|
return self.db.run(__many)
|
|
|
|
async def reserve_outputs(self, txos, is_reserved=True):
|
|
txoids = [(is_reserved, txo.id) for txo in txos]
|
|
await self.db.executemany("UPDATE txo SET is_reserved = ? WHERE txoid = ?", txoids)
|
|
|
|
async def release_outputs(self, txos):
|
|
await self.reserve_outputs(txos, is_reserved=False)
|
|
|
|
async def rewind_blockchain(self, above_height): # pylint: disable=no-self-use
|
|
# TODO:
|
|
# 1. delete transactions above_height
|
|
# 2. update address histories removing deleted TXs
|
|
return True
|
|
|
|
async def select_transactions(self, cols, accounts=None, read_only=False, **constraints):
|
|
if not {'txid', 'txid__in'}.intersection(constraints):
|
|
assert accounts, "'accounts' argument required when no 'txid' constraint is present"
|
|
where, values = constraints_to_sql({
|
|
'$$account_address.account__in': [a.public_key.address for a in accounts]
|
|
})
|
|
constraints['txid__in'] = f"""
|
|
SELECT txo.txid FROM txo JOIN account_address USING (address) WHERE {where}
|
|
UNION
|
|
SELECT txi.txid FROM txi JOIN account_address USING (address) WHERE {where}
|
|
"""
|
|
constraints.update(values)
|
|
return await self.db.execute_fetchall(
|
|
*query(f"SELECT {cols} FROM tx", **constraints), read_only=read_only
|
|
)
|
|
|
|
TXO_NOT_MINE = Output(None, None, is_my_output=False)
|
|
|
|
async def get_transactions(self, wallet=None, **constraints):
|
|
include_is_spent = constraints.pop('include_is_spent', False)
|
|
include_is_my_input = constraints.pop('include_is_my_input', False)
|
|
include_is_my_output = constraints.pop('include_is_my_output', False)
|
|
|
|
tx_rows = await self.select_transactions(
|
|
'txid, raw, height, position, is_verified',
|
|
order_by=constraints.pop('order_by', ["height=0 DESC", "height DESC", "position DESC"]),
|
|
**constraints
|
|
)
|
|
|
|
if not tx_rows:
|
|
return []
|
|
|
|
txids, txs, txi_txoids = [], [], []
|
|
for row in tx_rows:
|
|
txids.append(row['txid'])
|
|
txs.append(Transaction(
|
|
raw=row['raw'], height=row['height'], position=row['position'],
|
|
is_verified=bool(row['is_verified'])
|
|
))
|
|
for txi in txs[-1].inputs:
|
|
txi_txoids.append(txi.txo_ref.id)
|
|
|
|
step = self.MAX_QUERY_VARIABLES
|
|
annotated_txos = {}
|
|
for offset in range(0, len(txids), step):
|
|
annotated_txos.update({
|
|
txo.id: txo for txo in
|
|
(await self.get_txos(
|
|
wallet=wallet,
|
|
txid__in=txids[offset:offset+step], order_by='txo.txid',
|
|
include_is_spent=include_is_spent,
|
|
include_is_my_input=include_is_my_input,
|
|
include_is_my_output=include_is_my_output,
|
|
))
|
|
})
|
|
|
|
referenced_txos = {}
|
|
for offset in range(0, len(txi_txoids), step):
|
|
referenced_txos.update({
|
|
txo.id: txo for txo in
|
|
(await self.get_txos(
|
|
wallet=wallet,
|
|
txoid__in=txi_txoids[offset:offset+step], order_by='txo.txoid',
|
|
include_is_my_output=include_is_my_output,
|
|
))
|
|
})
|
|
|
|
for tx in txs:
|
|
for txi in tx.inputs:
|
|
txo = referenced_txos.get(txi.txo_ref.id)
|
|
if txo:
|
|
txi.txo_ref = txo.ref
|
|
for txo in tx.outputs:
|
|
_txo = annotated_txos.get(txo.id)
|
|
if _txo:
|
|
txo.update_annotations(_txo)
|
|
else:
|
|
txo.update_annotations(self.TXO_NOT_MINE)
|
|
|
|
for tx in txs:
|
|
txos = tx.outputs
|
|
if len(txos) >= 2 and txos[1].can_decode_purchase_data:
|
|
txos[0].purchase = txos[1]
|
|
|
|
return txs
|
|
|
|
async def get_transaction_count(self, **constraints):
|
|
constraints.pop('wallet', None)
|
|
constraints.pop('offset', None)
|
|
constraints.pop('limit', None)
|
|
constraints.pop('order_by', None)
|
|
count = await self.select_transactions('COUNT(*) as total', **constraints)
|
|
return count[0]['total'] or 0
|
|
|
|
async def get_transaction(self, **constraints):
|
|
txs = await self.get_transactions(limit=1, **constraints)
|
|
if txs:
|
|
return txs[0]
|
|
|
|
async def select_txos(
|
|
self, cols, accounts=None, is_my_input=None, is_my_output=True,
|
|
is_my_input_or_output=None, exclude_internal_transfers=False,
|
|
include_is_spent=False, include_is_my_input=False,
|
|
is_spent=None, read_only=False, **constraints):
|
|
for rename_col in ('txid', 'txoid'):
|
|
for rename_constraint in (rename_col, rename_col+'__in', rename_col+'__not_in'):
|
|
if rename_constraint in constraints:
|
|
constraints['txo.'+rename_constraint] = constraints.pop(rename_constraint)
|
|
if accounts:
|
|
account_in_sql, values = constraints_to_sql({
|
|
'$$account__in': [a.public_key.address for a in accounts]
|
|
})
|
|
my_addresses = f"SELECT address FROM account_address WHERE {account_in_sql}"
|
|
constraints.update(values)
|
|
if is_my_input_or_output:
|
|
include_is_my_input = True
|
|
constraints['received_or_sent__or'] = {
|
|
'txo.address__in': my_addresses,
|
|
'sent__and': {
|
|
'txi.address__is_not_null': True,
|
|
'txi.address__in': my_addresses
|
|
}
|
|
}
|
|
else:
|
|
if is_my_output:
|
|
constraints['txo.address__in'] = my_addresses
|
|
elif is_my_output is False:
|
|
constraints['txo.address__not_in'] = my_addresses
|
|
if is_my_input:
|
|
include_is_my_input = True
|
|
constraints['txi.address__is_not_null'] = True
|
|
constraints['txi.address__in'] = my_addresses
|
|
elif is_my_input is False:
|
|
include_is_my_input = True
|
|
constraints['is_my_input_false__or'] = {
|
|
'txi.address__is_null': True,
|
|
'txi.address__not_in': my_addresses
|
|
}
|
|
if exclude_internal_transfers:
|
|
include_is_my_input = True
|
|
constraints['exclude_internal_payments__or'] = {
|
|
'txo.txo_type__not': TXO_TYPES['other'],
|
|
'txo.address__not_in': my_addresses,
|
|
'txi.address__is_null': True,
|
|
'txi.address__not_in': my_addresses,
|
|
}
|
|
sql = [f"SELECT {cols} FROM txo JOIN tx ON (tx.txid=txo.txid)"]
|
|
if is_spent:
|
|
constraints['spent.txoid__is_not_null'] = True
|
|
elif is_spent is False:
|
|
constraints['is_reserved'] = False
|
|
constraints['spent.txoid__is_null'] = True
|
|
if include_is_spent or is_spent is not None:
|
|
sql.append("LEFT JOIN txi AS spent ON (spent.txoid=txo.txoid)")
|
|
if include_is_my_input:
|
|
sql.append("LEFT JOIN txi ON (txi.position=0 AND txi.txid=txo.txid)")
|
|
return await self.db.execute_fetchall(*query(' '.join(sql), **constraints), read_only=read_only)
|
|
|
|
async def get_txos(self, wallet=None, no_tx=False, read_only=False, **constraints):
|
|
include_is_spent = constraints.get('include_is_spent', False)
|
|
include_is_my_input = constraints.get('include_is_my_input', False)
|
|
include_is_my_output = constraints.pop('include_is_my_output', False)
|
|
include_received_tips = constraints.pop('include_received_tips', False)
|
|
|
|
select_columns = [
|
|
"tx.txid, raw, tx.height, tx.position as tx_position, tx.is_verified, "
|
|
"txo_type, txo.position as txo_position, amount, script"
|
|
]
|
|
|
|
my_accounts = {a.public_key.address for a in wallet.accounts} if wallet else set()
|
|
my_accounts_sql = ""
|
|
if include_is_my_output or include_is_my_input:
|
|
my_accounts_sql, values = constraints_to_sql({'$$account__in#_wallet': my_accounts})
|
|
constraints.update(values)
|
|
|
|
if include_is_my_output and my_accounts:
|
|
if constraints.get('is_my_output', None) in (True, False):
|
|
select_columns.append(f"{1 if constraints['is_my_output'] else 0} AS is_my_output")
|
|
else:
|
|
select_columns.append(f"""(
|
|
txo.address IN (SELECT address FROM account_address WHERE {my_accounts_sql})
|
|
) AS is_my_output""")
|
|
|
|
if include_is_my_input and my_accounts:
|
|
if constraints.get('is_my_input', None) in (True, False):
|
|
select_columns.append(f"{1 if constraints['is_my_input'] else 0} AS is_my_input")
|
|
else:
|
|
select_columns.append(f"""(
|
|
txi.address IS NOT NULL AND
|
|
txi.address IN (SELECT address FROM account_address WHERE {my_accounts_sql})
|
|
) AS is_my_input""")
|
|
|
|
if include_is_spent:
|
|
select_columns.append("spent.txoid IS NOT NULL AS is_spent")
|
|
|
|
if include_received_tips:
|
|
select_columns.append(f"""(
|
|
SELECT COALESCE(SUM(support.amount), 0) FROM txo AS support WHERE
|
|
support.claim_id = txo.claim_id AND
|
|
support.txo_type = {TXO_TYPES['support']} AND
|
|
support.address IN (SELECT address FROM account_address WHERE {my_accounts_sql}) AND
|
|
support.txoid NOT IN (SELECT txoid FROM txi)
|
|
) AS received_tips""")
|
|
|
|
if 'order_by' not in constraints or constraints['order_by'] == 'height':
|
|
constraints['order_by'] = [
|
|
"tx.height=0 DESC", "tx.height DESC", "tx.position DESC", "txo.position"
|
|
]
|
|
elif constraints.get('order_by', None) == 'none':
|
|
del constraints['order_by']
|
|
|
|
rows = await self.select_txos(', '.join(select_columns), read_only=read_only, **constraints)
|
|
|
|
txos = []
|
|
txs = {}
|
|
for row in rows:
|
|
if no_tx:
|
|
txo = Output(
|
|
amount=row['amount'],
|
|
script=OutputScript(row['script']),
|
|
tx_ref=TXRefImmutable.from_id(row['txid'], row['height']),
|
|
position=row['txo_position']
|
|
)
|
|
else:
|
|
if row['txid'] not in txs:
|
|
txs[row['txid']] = Transaction(
|
|
row['raw'], height=row['height'], position=row['tx_position'],
|
|
is_verified=bool(row['is_verified'])
|
|
)
|
|
txo = txs[row['txid']].outputs[row['txo_position']]
|
|
if include_is_spent:
|
|
txo.is_spent = bool(row['is_spent'])
|
|
if include_is_my_input:
|
|
txo.is_my_input = bool(row['is_my_input'])
|
|
if include_is_my_output:
|
|
txo.is_my_output = bool(row['is_my_output'])
|
|
if include_is_my_input and include_is_my_output:
|
|
if txo.is_my_input and txo.is_my_output and row['txo_type'] == TXO_TYPES['other']:
|
|
txo.is_internal_transfer = True
|
|
else:
|
|
txo.is_internal_transfer = False
|
|
if include_received_tips:
|
|
txo.received_tips = row['received_tips']
|
|
txos.append(txo)
|
|
|
|
channel_ids = set()
|
|
for txo in txos:
|
|
if txo.is_claim and txo.can_decode_claim:
|
|
if txo.claim.is_signed:
|
|
channel_ids.add(txo.claim.signing_channel_id)
|
|
if txo.claim.is_channel and wallet:
|
|
for account in wallet.accounts:
|
|
private_key = account.get_channel_private_key(
|
|
txo.claim.channel.public_key_bytes
|
|
)
|
|
if private_key:
|
|
txo.private_key = private_key
|
|
break
|
|
|
|
if channel_ids:
|
|
channels = {
|
|
txo.claim_id: txo for txo in
|
|
(await self.get_channels(
|
|
wallet=wallet,
|
|
claim_id__in=channel_ids,
|
|
read_only=read_only
|
|
))
|
|
}
|
|
for txo in txos:
|
|
if txo.is_claim and txo.can_decode_claim:
|
|
txo.channel = channels.get(txo.claim.signing_channel_id, None)
|
|
|
|
return txos
|
|
|
|
@staticmethod
|
|
def _clean_txo_constraints_for_aggregation(constraints):
|
|
constraints.pop('include_is_spent', None)
|
|
constraints.pop('include_is_my_input', None)
|
|
constraints.pop('include_is_my_output', None)
|
|
constraints.pop('include_received_tips', None)
|
|
constraints.pop('wallet', None)
|
|
constraints.pop('resolve', None)
|
|
constraints.pop('offset', None)
|
|
constraints.pop('limit', None)
|
|
constraints.pop('order_by', None)
|
|
|
|
async def get_txo_count(self, **constraints):
|
|
self._clean_txo_constraints_for_aggregation(constraints)
|
|
count = await self.select_txos('COUNT(*) AS total', **constraints)
|
|
return count[0]['total'] or 0
|
|
|
|
async def get_txo_sum(self, **constraints):
|
|
self._clean_txo_constraints_for_aggregation(constraints)
|
|
result = await self.select_txos('SUM(amount) AS total', **constraints)
|
|
return result[0]['total'] or 0
|
|
|
|
async def get_txo_plot(self, start_day=None, days_back=0, end_day=None, days_after=None, **constraints):
|
|
self._clean_txo_constraints_for_aggregation(constraints)
|
|
if start_day is None:
|
|
constraints['day__gte'] = self.ledger.headers.estimated_julian_day(
|
|
self.ledger.headers.height
|
|
) - days_back
|
|
else:
|
|
constraints['day__gte'] = date_to_julian_day(
|
|
date.fromisoformat(start_day)
|
|
)
|
|
if end_day is not None:
|
|
constraints['day__lte'] = date_to_julian_day(
|
|
date.fromisoformat(end_day)
|
|
)
|
|
elif days_after is not None:
|
|
constraints['day__lte'] = constraints['day__gte'] + days_after
|
|
return await self.select_txos(
|
|
"DATE(day) AS day, SUM(amount) AS total",
|
|
group_by='day', order_by='day', **constraints
|
|
)
|
|
|
|
def get_utxos(self, read_only=False, **constraints):
|
|
return self.get_txos(is_spent=False, read_only=read_only, **constraints)
|
|
|
|
def get_utxo_count(self, **constraints):
|
|
return self.get_txo_count(is_spent=False, **constraints)
|
|
|
|
async def get_balance(self, wallet=None, accounts=None, read_only=False, **constraints):
|
|
assert wallet or accounts, \
|
|
"'wallet' or 'accounts' constraints required to calculate balance"
|
|
constraints['accounts'] = accounts or wallet.accounts
|
|
balance = await self.select_txos(
|
|
'SUM(amount) as total', is_spent=False, read_only=read_only, **constraints
|
|
)
|
|
return balance[0]['total'] or 0
|
|
|
|
async def select_addresses(self, cols, read_only=False, **constraints):
|
|
return await self.db.execute_fetchall(*query(
|
|
f"SELECT {cols} FROM pubkey_address JOIN account_address USING (address)",
|
|
**constraints
|
|
), read_only=read_only)
|
|
|
|
async def get_addresses(self, cols=None, read_only=False, **constraints):
|
|
cols = cols or (
|
|
'address', 'account', 'chain', 'history', 'used_times',
|
|
'pubkey', 'chain_code', 'n', 'depth'
|
|
)
|
|
addresses = await self.select_addresses(', '.join(cols), read_only=read_only, **constraints)
|
|
if 'pubkey' in cols:
|
|
for address in addresses:
|
|
address['pubkey'] = PubKey(
|
|
self.ledger, address.pop('pubkey'), address.pop('chain_code'),
|
|
address.pop('n'), address.pop('depth')
|
|
)
|
|
return addresses
|
|
|
|
async def get_address_count(self, cols=None, read_only=False, **constraints):
|
|
count = await self.select_addresses('COUNT(*) as total', read_only=read_only, **constraints)
|
|
return count[0]['total'] or 0
|
|
|
|
async def get_address(self, read_only=False, **constraints):
|
|
addresses = await self.get_addresses(read_only=read_only, limit=1, **constraints)
|
|
if addresses:
|
|
return addresses[0]
|
|
|
|
async def add_keys(self, account, chain, pubkeys):
|
|
await self.db.executemany(
|
|
"insert or ignore into account_address "
|
|
"(account, address, chain, pubkey, chain_code, n, depth) values "
|
|
"(?, ?, ?, ?, ?, ?, ?)", ((
|
|
account.id, k.address, chain,
|
|
sqlite3.Binary(k.pubkey_bytes),
|
|
sqlite3.Binary(k.chain_code),
|
|
k.n, k.depth
|
|
) for k in pubkeys)
|
|
)
|
|
await self.db.executemany(
|
|
"insert or ignore into pubkey_address (address) values (?)",
|
|
((pubkey.address,) for pubkey in pubkeys)
|
|
)
|
|
|
|
async def _set_address_history(self, address, history):
|
|
await self.db.execute_fetchall(
|
|
"UPDATE pubkey_address SET history = ?, used_times = ? WHERE address = ?",
|
|
(history, history.count(':')//2, address)
|
|
)
|
|
|
|
async def set_address_history(self, address, history):
|
|
await self._set_address_history(address, history)
|
|
|
|
@staticmethod
|
|
def constrain_purchases(constraints):
|
|
accounts = constraints.pop('accounts', None)
|
|
assert accounts, "'accounts' argument required to find purchases"
|
|
if not {'purchased_claim_id', 'purchased_claim_id__in'}.intersection(constraints):
|
|
constraints['purchased_claim_id__is_not_null'] = True
|
|
constraints.update({
|
|
f'$account{i}': a.public_key.address for i, a in enumerate(accounts)
|
|
})
|
|
account_values = ', '.join([f':$account{i}' for i in range(len(accounts))])
|
|
constraints['txid__in'] = f"""
|
|
SELECT txid FROM txi JOIN account_address USING (address)
|
|
WHERE account_address.account IN ({account_values})
|
|
"""
|
|
|
|
async def get_purchases(self, **constraints):
|
|
self.constrain_purchases(constraints)
|
|
return [tx.outputs[0] for tx in await self.get_transactions(**constraints)]
|
|
|
|
def get_purchase_count(self, **constraints):
|
|
self.constrain_purchases(constraints)
|
|
return self.get_transaction_count(**constraints)
|
|
|
|
@staticmethod
|
|
def constrain_claims(constraints):
|
|
if {'txo_type', 'txo_type__in'}.intersection(constraints):
|
|
return
|
|
claim_types = constraints.pop('claim_type', None)
|
|
if claim_types:
|
|
constrain_single_or_list(
|
|
constraints, 'txo_type', claim_types, lambda x: TXO_TYPES[x]
|
|
)
|
|
else:
|
|
constraints['txo_type__in'] = CLAIM_TYPES
|
|
|
|
async def get_claims(self, read_only=False, **constraints) -> List[Output]:
|
|
self.constrain_claims(constraints)
|
|
return await self.get_utxos(read_only=read_only, **constraints)
|
|
|
|
def get_claim_count(self, **constraints):
|
|
self.constrain_claims(constraints)
|
|
return self.get_utxo_count(**constraints)
|
|
|
|
@staticmethod
|
|
def constrain_streams(constraints):
|
|
constraints['txo_type'] = TXO_TYPES['stream']
|
|
|
|
def get_streams(self, read_only=False, **constraints):
|
|
self.constrain_streams(constraints)
|
|
return self.get_claims(read_only=read_only, **constraints)
|
|
|
|
def get_stream_count(self, **constraints):
|
|
self.constrain_streams(constraints)
|
|
return self.get_claim_count(**constraints)
|
|
|
|
@staticmethod
|
|
def constrain_channels(constraints):
|
|
constraints['txo_type'] = TXO_TYPES['channel']
|
|
|
|
def get_channels(self, **constraints):
|
|
self.constrain_channels(constraints)
|
|
return self.get_claims(**constraints)
|
|
|
|
def get_channel_count(self, **constraints):
|
|
self.constrain_channels(constraints)
|
|
return self.get_claim_count(**constraints)
|
|
|
|
@staticmethod
|
|
def constrain_supports(constraints):
|
|
constraints['txo_type'] = TXO_TYPES['support']
|
|
|
|
def get_supports(self, **constraints):
|
|
self.constrain_supports(constraints)
|
|
return self.get_utxos(**constraints)
|
|
|
|
def get_support_count(self, **constraints):
|
|
self.constrain_supports(constraints)
|
|
return self.get_utxo_count(**constraints)
|
|
|
|
@staticmethod
|
|
def constrain_collections(constraints):
|
|
constraints['txo_type'] = TXO_TYPES['collection']
|
|
|
|
def get_collections(self, **constraints):
|
|
self.constrain_collections(constraints)
|
|
return self.get_utxos(**constraints)
|
|
|
|
def get_collection_count(self, **constraints):
|
|
self.constrain_collections(constraints)
|
|
return self.get_utxo_count(**constraints)
|
|
|
|
async def release_all_outputs(self, account):
|
|
await self.db.execute_fetchall(
|
|
"UPDATE txo SET is_reserved = 0 WHERE"
|
|
" is_reserved = 1 AND txo.address IN ("
|
|
" SELECT address from account_address WHERE account = ?"
|
|
" )", (account.public_key.address, )
|
|
)
|
|
|
|
def get_supports_summary(self, read_only=False, **constraints):
|
|
return self.get_txos(
|
|
txo_type=TXO_TYPES['support'],
|
|
is_spent=False, is_my_output=True,
|
|
include_is_my_input=True,
|
|
no_tx=True, read_only=read_only,
|
|
**constraints
|
|
)
|