save_transaction_io back to a transaction

This commit is contained in:
Victor Shyba 2018-10-18 22:59:41 -03:00 committed by Lex Berezhny
parent e650562758
commit 8bbda75865

View file

@ -2,6 +2,7 @@ import logging
import asyncio import asyncio
from asyncio import wrap_future from asyncio import wrap_future
from concurrent.futures.thread import ThreadPoolExecutor from concurrent.futures.thread import ThreadPoolExecutor
from operator import itemgetter
from typing import Tuple, List, Union, Callable, Any, Awaitable, Iterable from typing import Tuple, List, Union, Callable, Any, Awaitable, Iterable
@ -48,7 +49,7 @@ class AIOSQLite:
parameters = parameters if parameters is not None else [] parameters = parameters if parameters is not None else []
return self.run(lambda conn, sql, parameters: conn.execute(sql, parameters), sql, parameters) return self.run(lambda conn, sql, parameters: conn.execute(sql, parameters), sql, parameters)
def run(self, fun: Callable[[sqlite3.Connection, Any, Any], Any], *args, **kwargs) -> Awaitable: def run(self, fun, *args, **kwargs) -> Awaitable:
return wrap_future(self.executor.submit(self.__run_transaction, fun, *args, **kwargs)) return wrap_future(self.executor.submit(self.__run_transaction, fun, *args, **kwargs))
def __run_transaction(self, fun: Callable[[sqlite3.Connection, Any, Any], Any], *args, **kwargs): def __run_transaction(self, fun: Callable[[sqlite3.Connection, Any, Any], Any], *args, **kwargs):
@ -272,10 +273,11 @@ class BaseDatabase(SQLiteMixin):
'script': sqlite3.Binary(txo.script.source) 'script': sqlite3.Binary(txo.script.source)
} }
async def save_transaction_io(self, save_tx, tx: BaseTransaction, address, txhash, history): def save_transaction_io(self, save_tx, tx: BaseTransaction, address, txhash, history):
def _transaction(conn: sqlite3.Connection, save_tx, tx: BaseTransaction, address, txhash, history):
if save_tx == 'insert': if save_tx == 'insert':
await self.db.execute(*self._insert_sql('tx', { conn.execute(*self._insert_sql('tx', {
'txid': tx.id, 'txid': tx.id,
'raw': sqlite3.Binary(tx.raw), 'raw': sqlite3.Binary(tx.raw),
'height': tx.height, 'height': tx.height,
@ -283,45 +285,53 @@ class BaseDatabase(SQLiteMixin):
'is_verified': tx.is_verified 'is_verified': tx.is_verified
})) }))
elif save_tx == 'update': elif save_tx == 'update':
await self.db.execute(*self._update_sql("tx", { conn.execute(*self._update_sql("tx", {
'height': tx.height, 'position': tx.position, 'is_verified': tx.is_verified 'height': tx.height, 'position': tx.position, 'is_verified': tx.is_verified
}, 'txid = ?', (tx.id,))) }, 'txid = ?', (tx.id,)))
existing_txos = [r[0] for r in await self.db.execute_fetchall(*query( existing_txos = set(map(itemgetter(0), conn.execute(*query(
"SELECT position FROM txo", txid=tx.id "SELECT position FROM txo", txid=tx.id
))] ))))
txos_insert = []
for txo in tx.outputs: for txo in tx.outputs:
if txo.position in existing_txos: if txo.position in existing_txos:
continue continue
if txo.script.is_pay_pubkey_hash and txo.script.values['pubkey_hash'] == txhash: if txo.script.is_pay_pubkey_hash and txo.script.values['pubkey_hash'] == txhash:
await self.db.execute(*self._insert_sql("txo", self.txo_to_row(tx, address, txo))) txos_insert.append(
(tx.id, txo.id, address, txo.position, txo.amount, sqlite3.Binary(txo.script.source))
)
elif txo.script.is_pay_script_hash: elif txo.script.is_pay_script_hash:
# TODO: implement script hash payments # TODO: implement script hash payments
log.warning('Database.save_transaction_io: pay script hash is not implemented!') log.warning('Database.save_transaction_io: pay script hash is not implemented!')
conn.executemany(
'INSERT INTO txo (txid, txoid, address, position, amount, script) values (?, ?, ?, ?, ?, ?)',
txos_insert
)
# lookup the address associated with each TXI (via its TXO) # lookup the address associated with each TXI (via its TXO)
txoid_to_address = {r[0]: r[1] for r in await self.db.execute_fetchall(*query( txoid_to_address = {r[0]: r[1] for r in conn.execute(*query(
"SELECT txoid, address FROM txo", txoid__in=[txi.txo_ref.id for txi in tx.inputs] "SELECT txoid, address FROM txo", txoid__in=[txi.txo_ref.id for txi in tx.inputs]
))} ))}
# list of TXIs that have already been added # list of TXIs that have already been added
existing_txis = [r[0] for r in await self.db.execute_fetchall(*query( existing_txis = {r[0] for r in conn.execute(*query(
"SELECT txoid FROM txi", txid=tx.id "SELECT txoid FROM txi", txid=tx.id
))] ))}
txis_insert = []
for txi in tx.inputs: for txi in tx.inputs:
txoid = txi.txo_ref.id txoid = txi.txo_ref.id
new_txi = txoid not in existing_txis new_txi = txoid not in existing_txis
address_matches = txoid_to_address.get(txoid) == address address_matches = txoid_to_address.get(txoid) == address
if new_txi and address_matches: if new_txi and address_matches:
await self.db.execute(*self._insert_sql("txi", { txis_insert.append((tx.id, txoid, address))
'txid': tx.id, conn.executemany('INSERT INTO txi (txid, txoid, address) VALUES (?, ?, ?)', txis_insert)
'txoid': txoid, conn.execute(
'address': address, "UPDATE pubkey_address SET history = ?, used_times = ? WHERE address = ?",
})) (history, history.count(':')//2, address)
)
await self._set_address_history(address, history) return self.db.run(_transaction, save_tx, tx, address, txhash, history)
async def reserve_outputs(self, txos, is_reserved=True): async def reserve_outputs(self, txos, is_reserved=True):
txoids = [txo.id for txo in txos] txoids = [txo.id for txo in txos]