diff --git a/scriptval.go b/scriptval.go index 26360eb2..3e0e7598 100644 --- a/scriptval.go +++ b/scriptval.go @@ -10,108 +10,199 @@ import ( "github.com/conformal/btcutil" "github.com/conformal/btcwire" "math" + "runtime" ) -// txValidate is used to track results of validating scripts for each -// transaction input index. -type txValidate struct { - txIndex int - err error +// txValidateItem holds a transaction along with which input to validate. +type txValidateItem struct { + txInIndex int + txIn *btcwire.TxIn + tx *btcutil.Tx } -// validateTxIn validates a the script pair for the passed spending transaction -// (along with the specific input index) and origin transaction (with the -// specific output index). -func validateTxIn(txInIdx int, txin *btcwire.TxIn, tx *btcutil.Tx, originTx *btcutil.Tx, flags btcscript.ScriptFlags) error { - // If the input transaction has no previous input, there is nothing - // to check. - originTxIdx := txin.PreviousOutpoint.Index - if originTxIdx == math.MaxUint32 { +// txValidator provides a type which asynchronously validates transaction +// inputs. It provides several channels for communication and a processing +// function that is intended to be in run multiple goroutines. +type txValidator struct { + validateChan chan *txValidateItem + quitChan chan bool + resultChan chan error + txStore TxStore + flags btcscript.ScriptFlags +} + +// sendResult sends the result of a script pair validation on the internal +// result channel while respecting the quit channel. The allows orderly +// shutdown when the validation process is aborted early due to a validation +// error in one of the other goroutines. +func (v *txValidator) sendResult(result error) { + select { + case v.resultChan <- result: + case <-v.quitChan: + } +} + +// validateHandler consumes items to validate from the internal validate channel +// and returns the result of the validation on the internal result channel. It +// must be run as a goroutine. +func (v *txValidator) validateHandler() { +out: + for { + select { + case txVI := <-v.validateChan: + // Ensure the referenced input transaction is available. + //txIn := txVI.tx.MsgTx().TxIn[txVI.txInIdx] + txIn := txVI.txIn + txInHash := &txIn.PreviousOutpoint.Hash + originTx, exists := v.txStore[*txInHash] + if !exists || originTx.Err != nil || originTx.Tx == nil { + err := fmt.Errorf("unable to find input "+ + "transaction %v referenced from "+ + "transaction %v", txInHash, + txVI.tx.Sha()) + v.sendResult(err) + break out + } + originMsgTx := originTx.Tx.MsgTx() + + // Ensure the output index in the referenced transaction + // is available. + originTxIndex := txIn.PreviousOutpoint.Index + if originTxIndex >= uint32(len(originMsgTx.TxOut)) { + err := fmt.Errorf("out of bounds "+ + "input index %d in transaction %v "+ + "referenced from transaction %v", + originTxIndex, txInHash, txVI.tx.Sha()) + v.sendResult(err) + break out + } + + // Create a new script engine for the script pair. + sigScript := txIn.SignatureScript + pkScript := originMsgTx.TxOut[originTxIndex].PkScript + engine, err := btcscript.NewScript(sigScript, pkScript, + txVI.txInIndex, txVI.tx.MsgTx(), v.flags) + if err != nil { + v.sendResult(err) + break out + } + + // Execute the script pair. + if err := engine.Execute(); err != nil { + err := fmt.Errorf("validate of input "+ + "%d failed: %v", txVI.txInIndex, err) + v.sendResult(err) + break out + } + + // Validation succeeded. + v.sendResult(nil) + + case <-v.quitChan: + break out + } + } +} + +// Validate validates the scripts for all of the passed transaction inputs using +// multiple goroutines. +func (v *txValidator) Validate(items []*txValidateItem) error { + if len(items) == 0 { return nil } - if originTxIdx >= uint32(len(originTx.MsgTx().TxOut)) { - originTxSha := &txin.PreviousOutpoint.Hash - log.Warnf("unable to locate source tx %v spending tx %v", - originTxSha, tx.Sha()) - return fmt.Errorf("invalid index %x", originTxIdx) + // Limit the number of goroutines to do script validation based on the + // number of processor cores. This help ensure the system stays + // reasonably responsive under heavy load. + maxGoRoutines := runtime.NumCPU() * 3 + if maxGoRoutines <= 0 { + maxGoRoutines = 1 + } + if maxGoRoutines > len(items) { + maxGoRoutines = len(items) } - sigScript := txin.SignatureScript - pkScript := originTx.MsgTx().TxOut[originTxIdx].PkScript - engine, err := btcscript.NewScript(sigScript, pkScript, txInIdx, - tx.MsgTx(), flags) - if err != nil { - return err + // Start up validation handlers that are used to asynchronously + // validate each transaction input. + for i := 0; i < maxGoRoutines; i++ { + go v.validateHandler() } - err = engine.Execute() - if err != nil { - log.Warnf("validate of input %v failed: %v", txInIdx, err) - return err + // Validate each of the inputs. The quit channel is closed when any + // errors occur so all processing goroutines exit regardless of which + // input had the validation error. + numInputs := len(items) + currentItem := 0 + processedItems := 0 + for processedItems < numInputs { + // Only send items while there are still items that need to + // be processed. The select statement will never select a nil + // channel. + var validateChan chan *txValidateItem + var item *txValidateItem + if currentItem < numInputs { + validateChan = v.validateChan + item = items[currentItem] + } + + select { + case validateChan <- item: + currentItem++ + + case err := <-v.resultChan: + processedItems++ + if err != nil { + close(v.quitChan) + return err + } + } } + close(v.quitChan) return nil } +// newTxValidator returns a new instance of txValidator to be used for +// validating transaction scripts asynchronously. +func newTxValidator(txStore TxStore, flags btcscript.ScriptFlags) *txValidator { + return &txValidator{ + validateChan: make(chan *txValidateItem), + quitChan: make(chan bool), + resultChan: make(chan error), + txStore: txStore, + flags: flags, + } +} + // ValidateTransactionScripts validates the scripts for the passed transaction // using multiple goroutines. -func ValidateTransactionScripts(tx *btcutil.Tx, txStore TxStore, flags btcscript.ScriptFlags) (err error) { - c := make(chan txValidate) - job := tx.MsgTx().TxIn - resultErrors := make([]error, len(job)) - - var currentItem int - var completedItems int - - processFunc := func(txInIdx int) { - log.Tracef("validating tx %v input %v len %v", - tx.Sha(), txInIdx, len(job)) - txin := job[txInIdx] - originTxSha := &txin.PreviousOutpoint.Hash - origintxidx := txin.PreviousOutpoint.Index - - var originTx *btcutil.Tx - if origintxidx != math.MaxUint32 { - txInfo, ok := txStore[*originTxSha] - if !ok { - //wtf? - fmt.Printf("obj not found in txStore %v", - originTxSha) - } - originTx = txInfo.Tx +func ValidateTransactionScripts(tx *btcutil.Tx, txStore TxStore, flags btcscript.ScriptFlags) error { + // Collect all of the transaction inputs and required information for + // validation. + txIns := tx.MsgTx().TxIn + txValItems := make([]*txValidateItem, 0, len(txIns)) + for txInIdx, txIn := range txIns { + // Skip coinbases. + if txIn.PreviousOutpoint.Index == math.MaxUint32 { + continue } - err := validateTxIn(txInIdx, txin, tx, originTx, flags) - r := txValidate{txInIdx, err} - c <- r - } - for currentItem = 0; currentItem < len(job) && currentItem < 16; currentItem++ { - go processFunc(currentItem) - } - for completedItems < len(job) { - select { - case result := <-c: - completedItems++ - resultErrors[result.txIndex] = result.err - // would be nice to determine if we could stop - // on early errors here instead of running more. - if err == nil { - err = result.err - } - if currentItem < len(job) { - go processFunc(currentItem) - currentItem++ - } + txVI := &txValidateItem{ + txInIndex: txInIdx, + txIn: txIn, + tx: tx, } + txValItems = append(txValItems, txVI) } - for i := 0; i < len(job); i++ { - if resultErrors[i] != nil { - log.Warnf("tx %v failed input %v, err %v", tx.Sha(), i, - resultErrors[i]) - } + + // Validate all of the inputs. + validator := newTxValidator(txStore, flags) + if err := validator.Validate(txValItems); err != nil { + return err } - return + + return nil + } // checkBlockScripts executes and validates the scripts for all transactions in @@ -124,38 +215,33 @@ func checkBlockScripts(block *btcutil.Block, txStore TxStore) error { flags |= btcscript.ScriptBip16 } - txList := block.Transactions() - c := make(chan txValidate) - resultErrors := make([]error, len(txList)) - - var currentItem int - var completedItems int - processFunc := func(txIdx int) { - err := ValidateTransactionScripts(txList[txIdx], txStore, flags) - r := txValidate{txIdx, err} - c <- r + // Collect all of the transaction inputs and required information for + // validation for all transactions in the block into a single slice. + numInputs := 0 + for _, tx := range block.Transactions() { + numInputs += len(tx.MsgTx().TxIn) } - for currentItem = 0; currentItem < len(txList) && currentItem < 8; currentItem++ { - go processFunc(currentItem) - } - for completedItems < len(txList) { - select { - case result := <-c: - completedItems++ - resultErrors[result.txIndex] = result.err - // would be nice to determine if we could stop - // on early errors here instead of running more. - - if currentItem < len(txList) { - go processFunc(currentItem) - currentItem++ + txValItems := make([]*txValidateItem, 0, numInputs) + for _, tx := range block.Transactions() { + for txInIdx, txIn := range tx.MsgTx().TxIn { + // Skip coinbases. + if txIn.PreviousOutpoint.Index == math.MaxUint32 { + continue } + + txVI := &txValidateItem{ + txInIndex: txInIdx, + txIn: txIn, + tx: tx, + } + txValItems = append(txValItems, txVI) } } - for i := 0; i < len(txList); i++ { - if resultErrors[i] != nil { - return resultErrors[i] - } + + // Validate all of the inputs. + validator := newTxValidator(txStore, flags) + if err := validator.Validate(txValItems); err != nil { + return err } return nil