integration/rpctest: queue block disconnected ntfns in memwallet
This commit is contained in:
parent
8aeb8c3ef2
commit
6cea610774
1 changed files with 54 additions and 27 deletions
|
@ -56,6 +56,7 @@ func (u *utxo) isMature(height int32) bool {
|
|||
type chainUpdate struct {
|
||||
blockHeight int32
|
||||
filteredTxns []*btcutil.Tx
|
||||
isConnect bool // True if connect, false if disconnect
|
||||
}
|
||||
|
||||
// undoEntry is functionally the opposite of a chainUpdate. An undoEntry is
|
||||
|
@ -176,13 +177,14 @@ func (m *memWallet) SetRPCClient(rpcClient *rpcclient.Client) {
|
|||
}
|
||||
|
||||
// IngestBlock is a call-back which is to be triggered each time a new block is
|
||||
// connected to the main chain. Ingesting a block updates the wallet's internal
|
||||
// utxo state based on the outputs created and destroyed within each block.
|
||||
// connected to the main chain. It queues the update for the chain syncer,
|
||||
// calling the private version in sequential order.
|
||||
func (m *memWallet) IngestBlock(height int32, header *wire.BlockHeader, filteredTxns []*btcutil.Tx) {
|
||||
// Append this new chain update to the end of the queue of new chain
|
||||
// updates.
|
||||
m.chainMtx.Lock()
|
||||
m.chainUpdates = append(m.chainUpdates, &chainUpdate{height, filteredTxns})
|
||||
m.chainUpdates = append(m.chainUpdates, &chainUpdate{height,
|
||||
filteredTxns, true})
|
||||
m.chainMtx.Unlock()
|
||||
|
||||
// Launch a goroutine to signal the chainSyncer that a new update is
|
||||
|
@ -193,6 +195,30 @@ func (m *memWallet) IngestBlock(height int32, header *wire.BlockHeader, filtered
|
|||
}()
|
||||
}
|
||||
|
||||
// ingestBlock updates the wallet's internal utxo state based on the outputs
|
||||
// created and destroyed within each block.
|
||||
func (m *memWallet) ingestBlock(update *chainUpdate) {
|
||||
// Update the latest synced height, then process each filtered
|
||||
// transaction in the block creating and destroying utxos within
|
||||
// the wallet as a result.
|
||||
m.currentHeight = update.blockHeight
|
||||
undo := &undoEntry{
|
||||
utxosDestroyed: make(map[wire.OutPoint]*utxo),
|
||||
}
|
||||
for _, tx := range update.filteredTxns {
|
||||
mtx := tx.MsgTx()
|
||||
isCoinbase := blockchain.IsCoinBaseTx(mtx)
|
||||
txHash := mtx.TxHash()
|
||||
m.evalOutputs(mtx.TxOut, &txHash, isCoinbase, undo)
|
||||
m.evalInputs(mtx.TxIn, undo)
|
||||
}
|
||||
|
||||
// Finally, record the undo entry for this block so we can
|
||||
// properly update our internal state in response to the block
|
||||
// being re-org'd from the main chain.
|
||||
m.reorgJournal[update.blockHeight] = undo
|
||||
}
|
||||
|
||||
// chainSyncer is a goroutine dedicated to processing new blocks in order to
|
||||
// keep the wallet's utxo state up to date.
|
||||
//
|
||||
|
@ -209,26 +235,12 @@ func (m *memWallet) chainSyncer() {
|
|||
m.chainUpdates = m.chainUpdates[1:]
|
||||
m.chainMtx.Unlock()
|
||||
|
||||
// Update the latest synced height, then process each filtered
|
||||
// transaction in the block creating and destroying utxos within
|
||||
// the wallet as a result.
|
||||
m.Lock()
|
||||
m.currentHeight = update.blockHeight
|
||||
undo := &undoEntry{
|
||||
utxosDestroyed: make(map[wire.OutPoint]*utxo),
|
||||
if update.isConnect {
|
||||
m.ingestBlock(update)
|
||||
} else {
|
||||
m.unwindBlock(update)
|
||||
}
|
||||
for _, tx := range update.filteredTxns {
|
||||
mtx := tx.MsgTx()
|
||||
isCoinbase := blockchain.IsCoinBaseTx(mtx)
|
||||
txHash := mtx.TxHash()
|
||||
m.evalOutputs(mtx.TxOut, &txHash, isCoinbase, undo)
|
||||
m.evalInputs(mtx.TxIn, undo)
|
||||
}
|
||||
|
||||
// Finally, record the undo entry for this block so we can
|
||||
// properly update our internal state in response to the block
|
||||
// being re-org'd from the main chain.
|
||||
m.reorgJournal[update.blockHeight] = undo
|
||||
m.Unlock()
|
||||
}
|
||||
}
|
||||
|
@ -285,13 +297,28 @@ func (m *memWallet) evalInputs(inputs []*wire.TxIn, undo *undoEntry) {
|
|||
}
|
||||
|
||||
// UnwindBlock is a call-back which is to be executed each time a block is
|
||||
// disconnected from the main chain. Unwinding a block undoes the effect that a
|
||||
// particular block had on the wallet's internal utxo state.
|
||||
// disconnected from the main chain. It queues the update for the chain syncer,
|
||||
// calling the private version in sequential order.
|
||||
func (m *memWallet) UnwindBlock(height int32, header *wire.BlockHeader) {
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
// Append this new chain update to the end of the queue of new chain
|
||||
// updates.
|
||||
m.chainMtx.Lock()
|
||||
m.chainUpdates = append(m.chainUpdates, &chainUpdate{height,
|
||||
nil, false})
|
||||
m.chainMtx.Unlock()
|
||||
|
||||
undo := m.reorgJournal[height]
|
||||
// Launch a goroutine to signal the chainSyncer that a new update is
|
||||
// available. We do this in a new goroutine in order to avoid blocking
|
||||
// the main loop of the rpc client.
|
||||
go func() {
|
||||
m.chainUpdateSignal <- struct{}{}
|
||||
}()
|
||||
}
|
||||
|
||||
// unwindBlock undoes the effect that a particular block had on the wallet's
|
||||
// internal utxo state.
|
||||
func (m *memWallet) unwindBlock(update *chainUpdate) {
|
||||
undo := m.reorgJournal[update.blockHeight]
|
||||
|
||||
for _, utxo := range undo.utxosCreated {
|
||||
delete(m.utxos, utxo)
|
||||
|
@ -301,7 +328,7 @@ func (m *memWallet) UnwindBlock(height int32, header *wire.BlockHeader) {
|
|||
m.utxos[outPoint] = utxo
|
||||
}
|
||||
|
||||
delete(m.reorgJournal, height)
|
||||
delete(m.reorgJournal, update.blockHeight)
|
||||
}
|
||||
|
||||
// newAddress returns a new address from the wallet's hd key chain. It also
|
||||
|
|
Loading…
Add table
Reference in a new issue