Improve txstore unspent output bookkeeping.

This change "reverses" the mapping used by the transaction store to
reference and lookup unspent credits.  Rather than mapping slice
indexes of a block, and then another block map for slice indexes of
transactions with unspent credits, and requiring a lookup through each
credit for whether it is spent or unspent, keep a simple map of
outpoints to a lookup key to find the transaction in a block.

This has a positive effect on performance when searching for previous
transaction outputs that have been spent by a newly-inserted
transaction.  Rather than iterating through every block with an
unspent credit, and then every transaction with unspent credits, a
simple map lookup can be done to check whether a transaction input's
previous outpoint is marked as unspent by wallet, and then access the
transaction record itself by the lookup key.  While transactions
created by wallet with the sendfrom/many RPCs may mark debits with the
previous credits already known, the previous outputs may still not be
known if a debiting transaction was added by rescan, or notified as a
result of a create+sendrawtransaction.
This commit is contained in:
Josh Rickmar 2014-06-17 22:48:54 -05:00
parent cb717455c7
commit 9153a342e4
2 changed files with 141 additions and 225 deletions

View file

@ -95,7 +95,6 @@ func (s *Store) ReadFrom(r io.Reader) (int64, error) {
for i := uint32(0); i < blockCount; i++ {
b := &blockTxCollection{
txIndexes: map[int]uint32{},
unspent: map[int]uint32{},
}
tmpn64, err := b.ReadFrom(r)
n64 += tmpn64
@ -108,12 +107,24 @@ func (s *Store) ReadFrom(r io.Reader) (int64, error) {
s.blocks = append(s.blocks, b)
s.blockIndexes[b.Height] = i
// Recreate unspent map. If any of the block's transactions
// contain unspent credits, mark the store's unspent map to
// reflect that this block contains transactions with unspent
// credits.
if len(b.unspent) != 0 {
s.unspent[b.Height] = struct{}{}
// Recreate store unspent map.
for blockIndex, i := range b.txIndexes {
tx := b.txs[i]
for outputIdx, cred := range tx.credits {
if cred == nil {
continue
}
if cred.spentBy == nil {
op := btcwire.OutPoint{
Hash: *tx.tx.Sha(),
Index: uint32(outputIdx),
}
s.unspent[op] = BlockTxKey{
BlockIndex: blockIndex,
BlockHeight: b.Height,
}
}
}
}
}
@ -262,18 +273,6 @@ func (b *blockTxCollection) ReadFrom(r io.Reader) (int64, error) {
// block index of the underlying transaction to the slice index
// of the record.
b.txIndexes[t.tx.Index()] = i
// Recreate unspent map. For each credit of this transaction,
// if any credit is unspent, mark it in unspent map.
for _, c := range t.credits {
if c == nil {
continue
}
if c.spentBy == nil {
b.unspent[t.tx.Index()] = i
break
}
}
}
return n64, nil

View file

@ -161,9 +161,7 @@ type Store struct {
blocks []*blockTxCollection
blockIndexes map[int32]uint32
// unspent is a set of block heights which contain transactions with
// unspent outputs.
unspent map[int32]struct{}
unspent map[btcwire.OutPoint]BlockTxKey
// unconfirmed holds a collection of wallet transactions that have not
// been mined into a block yet.
@ -187,10 +185,6 @@ type blockTxCollection struct {
// index.
txs []*txRecord
txIndexes map[int]uint32
// unspents maps block indexes of transactions with unspent outputs to
// their index in the txs slice.
unspent map[int]uint32
}
// unconfirmedStore stores all unconfirmed transactions managed by the Store.
@ -264,7 +258,7 @@ type credit struct {
func New() *Store {
return &Store{
blockIndexes: map[int32]uint32{},
unspent: map[int32]struct{}{},
unspent: map[btcwire.OutPoint]BlockTxKey{},
unconfirmed: unconfirmedStore{
txs: map[btcwire.ShaHash]*txRecord{},
spentBlockOutPoints: map[BlockOutputKey]*txRecord{},
@ -329,7 +323,6 @@ func (s *Store) blockCollectionForInserts(block *Block) *blockTxCollection {
b = &blockTxCollection{
Block: *block,
txIndexes: map[int]uint32{},
unspent: map[int]uint32{},
}
// If this new block cannot be appended to the end of the blocks
@ -386,9 +379,6 @@ func (c *blockTxCollection) txRecordForInserts(tx *btcutil.Tx) *txRecord {
for i, r := range detached {
newIndex := uint32(i + len(c.txs))
c.txIndexes[r.Tx().Index()] = newIndex
if _, ok := c.unspent[r.Tx().Index()]; ok {
c.unspent[r.Tx().Index()] = newIndex
}
}
c.txs = append(c.txs, detached...)
} else {
@ -505,10 +495,8 @@ func (s *Store) moveMinedTx(r *txRecord, block *Block) error {
s.unconfirmed.spentBlockOutPoints[outputKey] = rr
credit.spentBy = &BlockTxKey{BlockHeight: -1}
} else if credit.spentBy == nil {
// Mark entire transaction as containing at least one
// unspent credit.
s.unspent[key.BlockHeight] = struct{}{}
b.unspent[key.BlockIndex] = txIndex
// Mark outpoint unspent.
s.unspent[*op] = key
// Increment spendable amount delta as a result of
// moving this credit to this block.
@ -677,53 +665,44 @@ func (t *TxRecord) AddDebits(spent []*Credit) (*Debits, error) {
}
// findPreviousCredits searches for all unspent credits that make up the inputs
// for tx. This lookup is very expensive and should be avoided at all costs.
// for tx.
func (s *Store) findPreviousCredits(tx *btcutil.Tx) ([]*Credit, error) {
unfound := make(map[btcwire.OutPoint]struct{}, len(tx.MsgTx().TxIn))
for _, txIn := range tx.MsgTx().TxIn {
unfound[txIn.PreviousOutpoint] = struct{}{}
type createdCredit struct {
credit *Credit
err error
}
spent := make([]*Credit, 0, len(unfound))
done:
for blockHeight := range s.unspent {
b, err := s.lookupBlock(blockHeight)
if err != nil {
return nil, err
}
for blockIndex, txIdx := range b.unspent {
if uint32(len(b.txs)) <= txIdx {
return nil, MissingBlockTxError{
BlockIndex: blockIndex,
BlockHeight: blockHeight,
}
inputs := tx.MsgTx().TxIn
creditChans := make([]chan createdCredit, len(inputs))
for i, txIn := range inputs {
creditChans[i] = make(chan createdCredit)
go func(i int, op btcwire.OutPoint) {
key, ok := s.unspent[op]
if !ok {
close(creditChans[i])
return
}
r := b.txs[txIdx]
op := btcwire.OutPoint{Hash: *r.Tx().Sha()}
for i, cred := range r.credits {
if cred == nil || cred.spentBy != nil {
continue
}
op.Index = uint32(i)
if _, ok := unfound[op]; ok {
key := BlockTxKey{blockIndex, b.Height}
t := &TxRecord{key, r, s}
c := &Credit{t, op.Index}
spent = append(spent, c)
delete(unfound, op)
if len(unfound) == 0 {
break done
}
}
r, err := s.lookupBlockTx(key)
if err != nil {
creditChans[i] <- createdCredit{err: err}
return
}
}
t := &TxRecord{key, r, s}
c := &Credit{t, op.Index}
creditChans[i] <- createdCredit{credit: c}
}(i, txIn.PreviousOutpoint)
}
spent := make([]*Credit, 0, len(inputs))
for _, c := range creditChans {
cc, ok := <-c
if !ok {
continue
}
if cc.err != nil {
return nil, cc.err
}
spent = append(spent, cc.credit)
}
return spent, nil
}
@ -732,24 +711,13 @@ done:
func (s *Store) markOutputsSpent(spent []*Credit, t *TxRecord) (btcutil.Amount, error) {
var a btcutil.Amount
for _, prev := range spent {
op := prev.OutPoint()
switch prev.BlockHeight {
case -1: // unconfirmed
op := prev.OutPoint()
s.unconfirmed.spentUnconfirmed[*op] = t.txRecord
default:
b, err := s.lookupBlock(prev.BlockHeight)
if err != nil {
return 0, err
}
r, _, err := b.lookupTxRecord(prev.BlockIndex)
if err != nil {
return 0, err
}
// Update spent info. If this transaction (and possibly
// block) no longer contains any unspent transactions,
// remove from bookkeeping maps.
// Update spent info.
credit := prev.txRecord.credits[prev.OutputIndex]
if credit.spentBy != nil {
if *credit.spentBy == t.BlockTxKey {
@ -758,12 +726,7 @@ func (s *Store) markOutputsSpent(spent []*Credit, t *TxRecord) (btcutil.Amount,
return 0, ErrInconsistentStore
}
credit.spentBy = &t.BlockTxKey
if !r.hasUnspents() {
delete(b.unspent, prev.BlockIndex)
if len(b.unspent) == 0 {
delete(s.unspent, b.Height)
}
}
delete(s.unspent, *op)
if t.BlockHeight == -1 { // unconfirmed
op := prev.OutPoint()
key := prev.outputKey()
@ -772,8 +735,7 @@ func (s *Store) markOutputsSpent(spent []*Credit, t *TxRecord) (btcutil.Amount,
}
// Increment total debited amount.
v := r.Tx().MsgTx().TxOut[prev.OutputIndex].Value
a += btcutil.Amount(v)
a += prev.Amount()
}
}
@ -790,18 +752,6 @@ func (s *Store) markOutputsSpent(spent []*Credit, t *TxRecord) (btcutil.Amount,
return a, nil
}
func (r *txRecord) hasUnspents() bool {
for _, credit := range r.credits {
if credit == nil {
continue
}
if credit.spentBy == nil {
return true
}
}
return false
}
// AddCredit marks the transaction record as containing a transaction output
// spendable by wallet. The output is added unspent, and is marked spent
// when a new transaction spending the output is inserted into the store.
@ -825,14 +775,10 @@ func (t *TxRecord) AddCredit(index uint32, change bool) (*Credit, error) {
if err != nil {
return nil, err
}
_, txsIndex, err := b.lookupTxRecord(t.Tx().Index())
if err != nil {
return nil, err
}
// New outputs are added unspent.
t.s.unspent[t.BlockTxKey.BlockHeight] = struct{}{}
b.unspent[t.Tx().Index()] = txsIndex
op := btcwire.OutPoint{Hash: *t.tx.Sha(), Index: index}
t.s.unspent[op] = t.BlockTxKey
switch a := t.tx.MsgTx().TxOut[index].Value; t.tx.Index() {
case 0: // Coinbase
b.amountDeltas.Reward += btcutil.Amount(a)
@ -855,7 +801,6 @@ func (s *Store) Rollback(height int32) error {
s.blocks = s.blocks[:i]
for _, b := range detached {
delete(s.blockIndexes, b.Block.Height)
delete(s.unspent, b.Block.Height)
for _, r := range b.txs {
oldTxIndex := r.Tx().Index()
@ -872,13 +817,21 @@ func (s *Store) Rollback(height int32) error {
s.unconfirmed.previousOutpoints[op] = r
}
// For each detached spent credit, lookup the spender
// and modify its debit record to reference spending an
// For each detached spent credit, remove from the
// store's unspent map, and lookup the spender and
// modify its debit record to reference spending an
// unconfirmed transaction.
for outIdx, credit := range r.credits {
if credit == nil {
continue
}
op := btcwire.OutPoint{
Hash: *r.Tx().Sha(),
Index: uint32(outIdx),
}
delete(s.unspent, op)
spenderKey := credit.spentBy
if spenderKey == nil {
continue
@ -903,10 +856,6 @@ func (s *Store) Rollback(height int32) error {
}
// Swap the maps the spender is saved in.
op := btcwire.OutPoint{
Hash: *r.Tx().Sha(),
Index: uint32(outIdx),
}
delete(s.unconfirmed.spentBlockOutPointKeys, op)
delete(s.unconfirmed.spentBlockOutPoints, prev)
s.unconfirmed.spentUnconfirmed[op] = spender
@ -1076,25 +1025,40 @@ func (s *Store) removeConflict(r *txRecord) error {
// UnspentOutputs returns all unspent received transaction outputs.
// The order is undefined.
func (s *Store) UnspentOutputs() ([]*Credit, error) {
unspent := make([]*Credit, 0, len(s.unspent))
for height := range s.unspent {
b, err := s.lookupBlock(height)
if err != nil {
return nil, err
}
for blockIndex, index := range b.unspent {
r := b.txs[index]
for outputIndex, credit := range r.credits {
if credit == nil || credit.spentBy != nil {
continue
}
key := BlockTxKey{blockIndex, b.Height}
txRecord := &TxRecord{key, r, s}
c := &Credit{txRecord, uint32(outputIndex)}
unspent = append(unspent, c)
}
}
type createdCredit struct {
credit *Credit
err error
}
creditChans := make([]chan createdCredit, len(s.unspent))
i := 0
for op, key := range s.unspent {
creditChans[i] = make(chan createdCredit)
go func(i int, opIndex uint32) {
r, err := s.lookupBlockTx(key)
if err != nil {
creditChans[i] <- createdCredit{err: err}
return
}
t := &TxRecord{key, r, s}
c := &Credit{t, opIndex}
creditChans[i] <- createdCredit{credit: c}
}(i, op.Index)
i++
}
unspent := make([]*Credit, 0, len(s.unspent))
for _, c := range creditChans {
cc, ok := <-c
if !ok {
continue
}
if cc.err != nil {
return nil, cc.err
}
unspent = append(unspent, cc.credit)
}
for _, r := range s.unconfirmed.txs {
for outputIndex, credit := range r.credits {
if credit == nil || credit.spentBy != nil {
@ -1106,6 +1070,7 @@ func (s *Store) UnspentOutputs() ([]*Credit, error) {
unspent = append(unspent, c)
}
}
return unspent, nil
}
@ -1117,29 +1082,41 @@ type unspentTx struct {
sliceIndex uint32
}
type unspentTxs []unspentTx
func (u unspentTxs) Len() int { return len(u) }
func (u unspentTxs) Less(i, j int) bool { return u[i].blockIndex < u[j].blockIndex }
func (u unspentTxs) Swap(i, j int) { u[i], u[j] = u[j], u[i] }
type int32Slice []int32
func (s int32Slice) Len() int { return len(s) }
func (s int32Slice) Less(i, j int) bool { return s[i] < s[j] }
func (s int32Slice) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
type txRecordSlice []*txRecord
func (s txRecordSlice) Len() int { return len(s) }
func (s txRecordSlice) Less(i, j int) bool { return s[i].received.Before(s[j].received) }
func (s txRecordSlice) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
type creditSlice []*Credit
func (s creditSlice) Len() int { return len(s) }
func (s creditSlice) Less(i, j int) bool { return s[i].received.Before(s[j].received) }
func (s creditSlice) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s creditSlice) Len() int {
return len(s)
}
func (s creditSlice) Less(i, j int) bool {
switch {
// If both credits are from the same tx, sort by output index.
case s[i].Tx().Sha() == s[j].Tx().Sha():
return s[i].OutputIndex < s[j].OutputIndex
// If both transactions are unmined, sort by their received date.
case s[i].BlockIndex == -1 && s[j].BlockIndex == -1:
return s[i].received.Before(s[j].received)
// Unmined (newer) txs always come last.
case s[i].BlockIndex == -1:
return false
case s[j].BlockIndex == -1:
return true
// If both txs are mined in different blocks, sort by block height.
case s[i].BlockHeight != s[j].BlockHeight:
return s[i].BlockHeight < s[j].BlockHeight
// Both txs share the same block, sort by block index.
default:
return s[i].BlockIndex < s[j].BlockIndex
}
}
func (s creditSlice) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}
// SortedUnspentOutputs returns all unspent recevied transaction outputs.
// The order is first unmined transactions (sorted by receive date), then
@ -1148,71 +1125,11 @@ func (s creditSlice) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
// index in increasing order. Credits (outputs) from the same transaction
// are sorted by output index in increasing order.
func (s *Store) SortedUnspentOutputs() ([]*Credit, error) {
// The cap isn't the actual max this slice can grow to, but should help
// by avoid some realloations after each append.
unspent := make([]*Credit, 0, len(s.unspent)+len(s.unconfirmed.txs))
// Create slice of unspent unconfirmed transactions sorted by receive
// date (newest to oldest).
unconfirmedTxs := make(txRecordSlice, 0, len(s.unconfirmed.txs))
for _, r := range s.unconfirmed.txs {
unconfirmedTxs = append(unconfirmedTxs, r)
unspent, err := s.UnspentOutputs()
if err != nil {
return []*Credit{}, err
}
sort.Sort(sort.Reverse(unconfirmedTxs))
// For each sorted unconfirmed tx, append its unspent credits.
for _, r := range unconfirmedTxs {
for outputIndex, credit := range r.credits {
if credit == nil || credit.spentBy != nil {
continue
}
key := BlockTxKey{BlockHeight: -1}
txRecord := &TxRecord{key, r, s}
c := &Credit{txRecord, uint32(outputIndex)}
unspent = append(unspent, c)
}
}
// Create slice of sorted block heights of blocks (decreasing order)
// that contain unspent txouts.
blockHeights := make([]int32, 0, len(s.unspent))
for height := range s.unspent {
blockHeights = append(blockHeights, height)
}
sort.Sort(sort.Reverse(int32Slice(blockHeights)))
for _, height := range blockHeights {
b, err := s.lookupBlock(height)
if err != nil {
return nil, err
}
// Create slice of sorted transaction indexes for txs with
// unspent outputs.
unspents := make([]unspentTx, 0, len(b.unspent))
for blockIndex, index := range b.unspent {
u := unspentTx{blockIndex, index}
unspents = append(unspents, u)
}
sort.Sort(unspentTxs(unspents))
// Iterate through each transaction from this block.
for _, unspentTx := range unspents {
r := b.txs[unspentTx.sliceIndex]
// Credits are alredy sorted, with non-credit outputs
// nilled, so no need to create a new sorted slice.
for outputIndex, credit := range r.credits {
if credit == nil || credit.spentBy != nil {
continue
}
key := BlockTxKey{unspentTx.blockIndex, b.Height}
txRecord := &TxRecord{key, r, s}
c := &Credit{txRecord, uint32(outputIndex)}
unspent = append(unspent, c)
}
}
}
sort.Sort(sort.Reverse(creditSlice(unspent)))
return unspent, nil
}