lbry-sdk/torba/basedatabase.py

305 lines
11 KiB
Python
Raw Normal View History

2018-06-08 05:47:46 +02:00
import logging
2018-06-12 16:02:04 +02:00
from typing import List, Union
2018-06-14 02:57:57 +02:00
from operator import itemgetter
2018-06-12 16:02:04 +02:00
2018-06-08 05:47:46 +02:00
import sqlite3
from twisted.internet import defer
from twisted.enterprise import adbapi
2018-06-12 16:02:04 +02:00
import torba.baseaccount
2018-06-08 05:47:46 +02:00
log = logging.getLogger(__name__)
2018-06-11 15:33:32 +02:00
class SQLiteMixin(object):
CREATE_TABLES_QUERY = None
def __init__(self, path):
self._db_path = path
self.db = None
def start(self):
log.info("connecting to database: %s", self._db_path)
self.db = adbapi.ConnectionPool(
'sqlite3', self._db_path, cp_min=1, cp_max=1, check_same_thread=False
)
return self.db.runInteraction(
lambda t: t.executescript(self.CREATE_TABLES_QUERY)
)
def stop(self):
self.db.close()
return defer.succeed(True)
def _debug_sql(self, sql):
""" For use during debugging to execute arbitrary SQL queries without waiting on reactor. """
conn = self.db.connectionFactory(self.db)
trans = self.db.transactionFactory(self, conn)
return trans.execute(sql).fetchall()
def _insert_sql(self, table, data):
2018-06-12 16:02:04 +02:00
# type: (str, dict) -> tuple[str, List]
2018-06-11 15:33:32 +02:00
columns, values = [], []
for column, value in data.items():
columns.append(column)
values.append(value)
2018-06-12 16:02:04 +02:00
sql = "REPLACE INTO {} ({}) VALUES ({})".format(
2018-06-11 15:33:32 +02:00
table, ', '.join(columns), ', '.join(['?'] * len(values))
)
return sql, values
@defer.inlineCallbacks
def query_one_value_list(self, query, params):
2018-06-12 16:02:04 +02:00
# type: (str, Union[dict,tuple]) -> defer.Deferred[List]
2018-06-11 15:33:32 +02:00
result = yield self.db.runQuery(query, params)
if result:
defer.returnValue([i[0] for i in result])
else:
defer.returnValue([])
@defer.inlineCallbacks
def query_one_value(self, query, params=None, default=None):
result = yield self.db.runQuery(query, params)
if result:
defer.returnValue(result[0][0])
else:
defer.returnValue(default)
@defer.inlineCallbacks
def query_dict_value_list(self, query, fields, params=None):
result = yield self.db.runQuery(query.format(', '.join(fields)), params)
if result:
defer.returnValue([dict(zip(fields, r)) for r in result])
else:
defer.returnValue([])
@defer.inlineCallbacks
def query_dict_value(self, query, fields, params=None, default=None):
result = yield self.query_dict_value_list(query, fields, params)
if result:
defer.returnValue(result[0])
else:
defer.returnValue(default)
def query_count(self, sql, params):
return self.query_one_value(
"SELECT count(*) FROM ({})".format(sql), params
)
def insert_and_return_id(self, table, data):
def do_insert(t):
t.execute(*self._insert_sql(table, data))
return t.lastrowid
return self.db.runInteraction(do_insert)
class BaseDatabase(SQLiteMixin):
2018-06-08 05:47:46 +02:00
CREATE_TX_TABLE = """
create table if not exists tx (
2018-06-14 02:57:57 +02:00
txhash blob primary key,
2018-06-08 05:47:46 +02:00
raw blob not null,
height integer not null,
2018-06-14 02:57:57 +02:00
is_verified boolean not null default false
2018-06-08 05:47:46 +02:00
);
2018-06-11 15:33:32 +02:00
"""
CREATE_PUBKEY_ADDRESS_TABLE = """
create table if not exists pubkey_address (
address blob primary key,
account blob not null,
chain integer not null,
position integer not null,
pubkey blob not null,
history text,
2018-06-14 02:57:57 +02:00
used_times integer not null default 0
2018-06-08 05:47:46 +02:00
);
"""
CREATE_TXO_TABLE = """
create table if not exists txo (
txoid integer primary key,
2018-06-14 02:57:57 +02:00
txhash blob references tx,
2018-06-11 15:33:32 +02:00
address blob references pubkey_address,
position integer not null,
2018-06-08 05:47:46 +02:00
amount integer not null,
script blob not null
);
"""
CREATE_TXI_TABLE = """
create table if not exists txi (
2018-06-14 02:57:57 +02:00
txhash blob references tx,
2018-06-11 15:33:32 +02:00
address blob references pubkey_address,
2018-06-08 05:47:46 +02:00
txoid integer references txo
);
"""
CREATE_TABLES_QUERY = (
CREATE_TX_TABLE +
2018-06-11 15:33:32 +02:00
CREATE_PUBKEY_ADDRESS_TABLE +
2018-06-08 05:47:46 +02:00
CREATE_TXO_TABLE +
CREATE_TXI_TABLE
)
2018-06-14 02:57:57 +02:00
def add_transaction(self, address, hash, tx, height, is_verified):
2018-06-11 15:33:32 +02:00
def _steps(t):
2018-06-14 02:57:57 +02:00
current_height = t.execute("SELECT height FROM tx WHERE txhash=?", (sqlite3.Binary(tx.hash),)).fetchone()
if current_height is None:
2018-06-11 15:33:32 +02:00
t.execute(*self._insert_sql('tx', {
2018-06-14 02:57:57 +02:00
'txhash': sqlite3.Binary(tx.hash),
2018-06-11 15:33:32 +02:00
'raw': sqlite3.Binary(tx.raw),
'height': height,
'is_verified': is_verified
}))
2018-06-14 02:57:57 +02:00
elif current_height[0] != height:
t.execute("UPDATE tx SET height = :height WHERE txhash = :txhash", {
'txhash': sqlite3.Binary(tx.hash),
'height': height,
})
existing_txos = list(map(itemgetter(0), t.execute(
"SELECT position FROM txo WHERE txhash = ?",
(sqlite3.Binary(tx.hash),)
).fetchall()))
2018-06-12 16:02:04 +02:00
for txo in tx.outputs:
2018-06-14 02:57:57 +02:00
if txo.index in existing_txos:
continue
2018-06-12 16:02:04 +02:00
if txo.script.is_pay_pubkey_hash and txo.script.values['pubkey_hash'] == hash:
t.execute(*self._insert_sql("txo", {
2018-06-14 02:57:57 +02:00
'txhash': sqlite3.Binary(tx.hash),
2018-06-12 16:02:04 +02:00
'address': sqlite3.Binary(address),
'position': txo.index,
'amount': txo.amount,
'script': sqlite3.Binary(txo.script.source)
}))
elif txo.script.is_pay_script_hash:
# TODO: implement script hash payments
print('Database.add_transaction pay script hash is not implemented!')
2018-06-14 02:57:57 +02:00
existing_txis = [txi[0] for txi in t.execute(
"SELECT txoid FROM txi WHERE txhash = ? AND address = ?",
(sqlite3.Binary(tx.hash), sqlite3.Binary(address))).fetchall()]
2018-06-12 16:02:04 +02:00
for txi in tx.inputs:
txoid = t.execute(
2018-06-14 02:57:57 +02:00
"SELECT txoid FROM txo WHERE txhash = ? AND position = ?",
(sqlite3.Binary(txi.output_txhash), txi.output_index)
2018-06-12 16:02:04 +02:00
).fetchone()
2018-06-14 02:57:57 +02:00
if txoid is not None and txoid[0] not in existing_txis:
2018-06-12 16:02:04 +02:00
t.execute(*self._insert_sql("txi", {
2018-06-14 02:57:57 +02:00
'txhash': sqlite3.Binary(tx.hash),
2018-06-12 16:02:04 +02:00
'address': sqlite3.Binary(address),
2018-06-14 02:57:57 +02:00
'txoid': txoid[0],
2018-06-12 16:02:04 +02:00
}))
2018-06-14 02:57:57 +02:00
2018-06-11 15:33:32 +02:00
return self.db.runInteraction(_steps)
2018-06-08 05:47:46 +02:00
@defer.inlineCallbacks
def get_balance_for_account(self, account):
result = yield self.db.runQuery(
2018-06-14 02:57:57 +02:00
"""
SELECT SUM(amount) FROM txo
JOIN pubkey_address ON pubkey_address.address=txo.address
WHERE account=:account AND
txoid NOT IN (SELECT txoid FROM txi)
""",
2018-06-08 05:47:46 +02:00
{'account': sqlite3.Binary(account.public_key.address)}
)
if result:
defer.returnValue(result[0][0] or 0)
else:
defer.returnValue(0)
@defer.inlineCallbacks
def get_utxos(self, account, output_class):
utxos = yield self.db.runQuery(
"""
2018-06-14 02:57:57 +02:00
SELECT amount, script, txhash, txo.position
FROM txo JOIN pubkey_address ON pubkey_address.address=txo.address
2018-06-12 16:02:04 +02:00
WHERE account=:account AND txoid NOT IN (SELECT txoid FROM txi)
2018-06-08 05:47:46 +02:00
""",
{'account': sqlite3.Binary(account.public_key.address)}
)
defer.returnValue([
output_class(
values[0],
output_class.script_class(values[1]),
2018-06-12 16:02:04 +02:00
values[2],
index=values[3]
2018-06-08 05:47:46 +02:00
) for values in utxos
])
2018-06-11 15:33:32 +02:00
def add_keys(self, account, chain, keys):
sql = (
"insert into pubkey_address "
"(address, account, chain, position, pubkey) "
"values "
) + ', '.join(['(?, ?, ?, ?, ?)'] * len(keys))
values = []
for position, pubkey in keys:
values.append(sqlite3.Binary(pubkey.address))
values.append(sqlite3.Binary(account.public_key.address))
values.append(chain)
values.append(position)
values.append(sqlite3.Binary(pubkey.pubkey_bytes))
return self.db.runOperation(sql, values)
2018-06-12 16:02:04 +02:00
def get_addresses(self, account, chain, limit=None, details=False):
sql = ["SELECT {} FROM pubkey_address WHERE account = :account"]
params = {'account': sqlite3.Binary(account.public_key.address)}
if chain is not None:
sql.append("AND chain = :chain")
params['chain'] = chain
sql.append("ORDER BY position DESC")
if limit is not None:
sql.append("LIMIT {}".format(limit))
if details:
return self.query_dict_value_list(' '.join(sql), ('address', 'position', 'used_times'), params)
else:
return self.query_one_value_list(' '.join(sql).format('address'), params)
def _used_address_sql(self, account, chain, comparison_op, used_times, limit=None):
sql = [
"SELECT address FROM pubkey_address",
"WHERE account = :account AND"
]
params = {
2018-06-11 15:33:32 +02:00
'account': sqlite3.Binary(account.public_key.address),
2018-06-12 16:02:04 +02:00
'used_times': used_times
2018-06-11 15:33:32 +02:00
}
2018-06-12 16:02:04 +02:00
if chain is not None:
sql.append("chain = :chain AND")
params['chain'] = chain
sql.append("used_times {} :used_times".format(comparison_op))
sql.append("ORDER BY used_times ASC")
if limit is not None:
sql.append('LIMIT {}'.format(limit))
return ' '.join(sql), params
def get_unused_addresses(self, account, chain):
# type: (torba.baseaccount.BaseAccount, int) -> defer.Deferred[List[str]]
return self.query_one_value_list(*self._used_address_sql(
account, chain, '=', 0
2018-06-11 15:33:32 +02:00
))
2018-06-12 16:02:04 +02:00
def get_usable_addresses(self, account, chain, max_used_times, limit):
return self.query_one_value_list(*self._used_address_sql(
account, chain, '<=', max_used_times, limit
2018-06-11 15:33:32 +02:00
))
2018-06-12 16:02:04 +02:00
def get_address(self, address):
return self.query_dict_value(
"SELECT {} FROM pubkey_address WHERE address= :address",
('address', 'account', 'chain', 'position', 'pubkey', 'history', 'used_times'),
{'address': sqlite3.Binary(address)}
2018-06-08 05:47:46 +02:00
)
2018-06-12 16:02:04 +02:00
def set_address_history(self, address, history):
2018-06-08 05:47:46 +02:00
return self.db.runOperation(
2018-06-12 16:02:04 +02:00
"UPDATE pubkey_address SET history = ?, used_times = ? WHERE address = ?",
(sqlite3.Binary(history), history.count(b':')//2, sqlite3.Binary(address))
2018-06-08 05:47:46 +02:00
)