diff --git a/torba/basedatabase.py b/torba/basedatabase.py index d28b368e3..ba9fbed1b 100644 --- a/torba/basedatabase.py +++ b/torba/basedatabase.py @@ -2,6 +2,7 @@ import logging import asyncio from asyncio import wrap_future from concurrent.futures.thread import ThreadPoolExecutor +from operator import itemgetter from typing import Tuple, List, Union, Callable, Any, Awaitable, Iterable @@ -48,7 +49,7 @@ class AIOSQLite: parameters = parameters if parameters is not None else [] return self.run(lambda conn, sql, parameters: conn.execute(sql, parameters), sql, parameters) - def run(self, fun: Callable[[sqlite3.Connection, Any, Any], Any], *args, **kwargs) -> Awaitable: + def run(self, fun, *args, **kwargs) -> Awaitable: return wrap_future(self.executor.submit(self.__run_transaction, fun, *args, **kwargs)) def __run_transaction(self, fun: Callable[[sqlite3.Connection, Any, Any], Any], *args, **kwargs): @@ -272,56 +273,65 @@ class BaseDatabase(SQLiteMixin): 'script': sqlite3.Binary(txo.script.source) } - async def save_transaction_io(self, save_tx, tx: BaseTransaction, address, txhash, history): + def save_transaction_io(self, save_tx, tx: BaseTransaction, address, txhash, history): - if save_tx == 'insert': - await self.db.execute(*self._insert_sql('tx', { - 'txid': tx.id, - 'raw': sqlite3.Binary(tx.raw), - 'height': tx.height, - 'position': tx.position, - 'is_verified': tx.is_verified - })) - elif save_tx == 'update': - await self.db.execute(*self._update_sql("tx", { - 'height': tx.height, 'position': tx.position, 'is_verified': tx.is_verified - }, 'txid = ?', (tx.id,))) - - existing_txos = [r[0] for r in await self.db.execute_fetchall(*query( - "SELECT position FROM txo", txid=tx.id - ))] - - for txo in tx.outputs: - if txo.position in existing_txos: - continue - if txo.script.is_pay_pubkey_hash and txo.script.values['pubkey_hash'] == txhash: - await self.db.execute(*self._insert_sql("txo", self.txo_to_row(tx, address, txo))) - elif txo.script.is_pay_script_hash: - # TODO: implement script hash payments - log.warning('Database.save_transaction_io: pay script hash is not implemented!') - - # lookup the address associated with each TXI (via its TXO) - txoid_to_address = {r[0]: r[1] for r in await self.db.execute_fetchall(*query( - "SELECT txoid, address FROM txo", txoid__in=[txi.txo_ref.id for txi in tx.inputs] - ))} - - # list of TXIs that have already been added - existing_txis = [r[0] for r in await self.db.execute_fetchall(*query( - "SELECT txoid FROM txi", txid=tx.id - ))] - - for txi in tx.inputs: - txoid = txi.txo_ref.id - new_txi = txoid not in existing_txis - address_matches = txoid_to_address.get(txoid) == address - if new_txi and address_matches: - await self.db.execute(*self._insert_sql("txi", { + def _transaction(conn: sqlite3.Connection, save_tx, tx: BaseTransaction, address, txhash, history): + if save_tx == 'insert': + conn.execute(*self._insert_sql('tx', { 'txid': tx.id, - 'txoid': txoid, - 'address': address, + 'raw': sqlite3.Binary(tx.raw), + 'height': tx.height, + 'position': tx.position, + 'is_verified': tx.is_verified })) + elif save_tx == 'update': + conn.execute(*self._update_sql("tx", { + 'height': tx.height, 'position': tx.position, 'is_verified': tx.is_verified + }, 'txid = ?', (tx.id,))) - await self._set_address_history(address, history) + existing_txos = set(map(itemgetter(0), conn.execute(*query( + "SELECT position FROM txo", txid=tx.id + )))) + + txos_insert = [] + for txo in tx.outputs: + if txo.position in existing_txos: + continue + if txo.script.is_pay_pubkey_hash and txo.script.values['pubkey_hash'] == txhash: + txos_insert.append( + (tx.id, txo.id, address, txo.position, txo.amount, sqlite3.Binary(txo.script.source)) + ) + elif txo.script.is_pay_script_hash: + # TODO: implement script hash payments + log.warning('Database.save_transaction_io: pay script hash is not implemented!') + conn.executemany( + 'INSERT INTO txo (txid, txoid, address, position, amount, script) values (?, ?, ?, ?, ?, ?)', + txos_insert + ) + + # lookup the address associated with each TXI (via its TXO) + txoid_to_address = {r[0]: r[1] for r in conn.execute(*query( + "SELECT txoid, address FROM txo", txoid__in=[txi.txo_ref.id for txi in tx.inputs] + ))} + + # list of TXIs that have already been added + existing_txis = {r[0] for r in conn.execute(*query( + "SELECT txoid FROM txi", txid=tx.id + ))} + + txis_insert = [] + for txi in tx.inputs: + txoid = txi.txo_ref.id + new_txi = txoid not in existing_txis + address_matches = txoid_to_address.get(txoid) == address + if new_txi and address_matches: + txis_insert.append((tx.id, txoid, address)) + conn.executemany('INSERT INTO txi (txid, txoid, address) VALUES (?, ?, ?)', txis_insert) + conn.execute( + "UPDATE pubkey_address SET history = ?, used_times = ? WHERE address = ?", + (history, history.count(':')//2, address) + ) + return self.db.run(_transaction, save_tx, tx, address, txhash, history) async def reserve_outputs(self, txos, is_reserved=True): txoids = [txo.id for txo in txos]