Rework and improve async script validation logic.

The previous script validation logic entailed starting up a hard-coded
number of goroutines to process the transaction scripts in parallel.  In
particular, one goroutine (up to 8 max) was started per transaction in a
block and another one was started for each input script pair in the
each transaction.  This resulted in 64 goroutines simultaneously running
scripts and verifying cryptographic signatures.  This could easily lead to
the overall system feeling sluggish.

Further the previous design could also result in bursty behavior since the
number of inputs to a transaction as well as its complexity can vary
widely between transactions.  For example, starting 2 goroutines (one to
process the transaction and one for actual script pair validation) to
verify a transaction with a single input was not desirable.

Finally, the previous design validated all transactions and inputs
regardless of a failure in one of the other scripts.  This really didn't
have a big impact since it's quite rare that blocks with invalid
verifications are being processed, but it was a potential way DoS vector.

This commit changes the logic in a few ways to improve things:

- The max number of validation goroutines is now based on the number of
  cores in the system
- All transaction inputs from all transactions in the block are collated
  into a single list which is fed through the aforementioned validation
  goroutines
- The validation CPU usage is much more consistent due to the collation of
  inputs
- A validation error in any goroutine immediately stops validation of all
  remaining inputs
- The errors have been improved to include context about what tx script
  pair failed as opposed to showing the information as a warning

This closes conformal/btcd#59.
This commit is contained in:
Dave Collins 2014-01-16 12:48:37 -06:00
parent 28f485a1d1
commit 84f6089bc9

View file

@ -10,108 +10,199 @@ import (
"github.com/conformal/btcutil"
"github.com/conformal/btcwire"
"math"
"runtime"
)
// txValidate is used to track results of validating scripts for each
// transaction input index.
type txValidate struct {
txIndex int
err error
// txValidateItem holds a transaction along with which input to validate.
type txValidateItem struct {
txInIndex int
txIn *btcwire.TxIn
tx *btcutil.Tx
}
// validateTxIn validates a the script pair for the passed spending transaction
// (along with the specific input index) and origin transaction (with the
// specific output index).
func validateTxIn(txInIdx int, txin *btcwire.TxIn, tx *btcutil.Tx, originTx *btcutil.Tx, flags btcscript.ScriptFlags) error {
// If the input transaction has no previous input, there is nothing
// to check.
originTxIdx := txin.PreviousOutpoint.Index
if originTxIdx == math.MaxUint32 {
// txValidator provides a type which asynchronously validates transaction
// inputs. It provides several channels for communication and a processing
// function that is intended to be in run multiple goroutines.
type txValidator struct {
validateChan chan *txValidateItem
quitChan chan bool
resultChan chan error
txStore TxStore
flags btcscript.ScriptFlags
}
// sendResult sends the result of a script pair validation on the internal
// result channel while respecting the quit channel. The allows orderly
// shutdown when the validation process is aborted early due to a validation
// error in one of the other goroutines.
func (v *txValidator) sendResult(result error) {
select {
case v.resultChan <- result:
case <-v.quitChan:
}
}
// validateHandler consumes items to validate from the internal validate channel
// and returns the result of the validation on the internal result channel. It
// must be run as a goroutine.
func (v *txValidator) validateHandler() {
out:
for {
select {
case txVI := <-v.validateChan:
// Ensure the referenced input transaction is available.
//txIn := txVI.tx.MsgTx().TxIn[txVI.txInIdx]
txIn := txVI.txIn
txInHash := &txIn.PreviousOutpoint.Hash
originTx, exists := v.txStore[*txInHash]
if !exists || originTx.Err != nil || originTx.Tx == nil {
err := fmt.Errorf("unable to find input "+
"transaction %v referenced from "+
"transaction %v", txInHash,
txVI.tx.Sha())
v.sendResult(err)
break out
}
originMsgTx := originTx.Tx.MsgTx()
// Ensure the output index in the referenced transaction
// is available.
originTxIndex := txIn.PreviousOutpoint.Index
if originTxIndex >= uint32(len(originMsgTx.TxOut)) {
err := fmt.Errorf("out of bounds "+
"input index %d in transaction %v "+
"referenced from transaction %v",
originTxIndex, txInHash, txVI.tx.Sha())
v.sendResult(err)
break out
}
// Create a new script engine for the script pair.
sigScript := txIn.SignatureScript
pkScript := originMsgTx.TxOut[originTxIndex].PkScript
engine, err := btcscript.NewScript(sigScript, pkScript,
txVI.txInIndex, txVI.tx.MsgTx(), v.flags)
if err != nil {
v.sendResult(err)
break out
}
// Execute the script pair.
if err := engine.Execute(); err != nil {
err := fmt.Errorf("validate of input "+
"%d failed: %v", txVI.txInIndex, err)
v.sendResult(err)
break out
}
// Validation succeeded.
v.sendResult(nil)
case <-v.quitChan:
break out
}
}
}
// Validate validates the scripts for all of the passed transaction inputs using
// multiple goroutines.
func (v *txValidator) Validate(items []*txValidateItem) error {
if len(items) == 0 {
return nil
}
if originTxIdx >= uint32(len(originTx.MsgTx().TxOut)) {
originTxSha := &txin.PreviousOutpoint.Hash
log.Warnf("unable to locate source tx %v spending tx %v",
originTxSha, tx.Sha())
return fmt.Errorf("invalid index %x", originTxIdx)
// Limit the number of goroutines to do script validation based on the
// number of processor cores. This help ensure the system stays
// reasonably responsive under heavy load.
maxGoRoutines := runtime.NumCPU() * 3
if maxGoRoutines <= 0 {
maxGoRoutines = 1
}
if maxGoRoutines > len(items) {
maxGoRoutines = len(items)
}
sigScript := txin.SignatureScript
pkScript := originTx.MsgTx().TxOut[originTxIdx].PkScript
engine, err := btcscript.NewScript(sigScript, pkScript, txInIdx,
tx.MsgTx(), flags)
if err != nil {
return err
// Start up validation handlers that are used to asynchronously
// validate each transaction input.
for i := 0; i < maxGoRoutines; i++ {
go v.validateHandler()
}
err = engine.Execute()
if err != nil {
log.Warnf("validate of input %v failed: %v", txInIdx, err)
return err
// Validate each of the inputs. The quit channel is closed when any
// errors occur so all processing goroutines exit regardless of which
// input had the validation error.
numInputs := len(items)
currentItem := 0
processedItems := 0
for processedItems < numInputs {
// Only send items while there are still items that need to
// be processed. The select statement will never select a nil
// channel.
var validateChan chan *txValidateItem
var item *txValidateItem
if currentItem < numInputs {
validateChan = v.validateChan
item = items[currentItem]
}
select {
case validateChan <- item:
currentItem++
case err := <-v.resultChan:
processedItems++
if err != nil {
close(v.quitChan)
return err
}
}
}
close(v.quitChan)
return nil
}
// newTxValidator returns a new instance of txValidator to be used for
// validating transaction scripts asynchronously.
func newTxValidator(txStore TxStore, flags btcscript.ScriptFlags) *txValidator {
return &txValidator{
validateChan: make(chan *txValidateItem),
quitChan: make(chan bool),
resultChan: make(chan error),
txStore: txStore,
flags: flags,
}
}
// ValidateTransactionScripts validates the scripts for the passed transaction
// using multiple goroutines.
func ValidateTransactionScripts(tx *btcutil.Tx, txStore TxStore, flags btcscript.ScriptFlags) (err error) {
c := make(chan txValidate)
job := tx.MsgTx().TxIn
resultErrors := make([]error, len(job))
var currentItem int
var completedItems int
processFunc := func(txInIdx int) {
log.Tracef("validating tx %v input %v len %v",
tx.Sha(), txInIdx, len(job))
txin := job[txInIdx]
originTxSha := &txin.PreviousOutpoint.Hash
origintxidx := txin.PreviousOutpoint.Index
var originTx *btcutil.Tx
if origintxidx != math.MaxUint32 {
txInfo, ok := txStore[*originTxSha]
if !ok {
//wtf?
fmt.Printf("obj not found in txStore %v",
originTxSha)
}
originTx = txInfo.Tx
func ValidateTransactionScripts(tx *btcutil.Tx, txStore TxStore, flags btcscript.ScriptFlags) error {
// Collect all of the transaction inputs and required information for
// validation.
txIns := tx.MsgTx().TxIn
txValItems := make([]*txValidateItem, 0, len(txIns))
for txInIdx, txIn := range txIns {
// Skip coinbases.
if txIn.PreviousOutpoint.Index == math.MaxUint32 {
continue
}
err := validateTxIn(txInIdx, txin, tx, originTx, flags)
r := txValidate{txInIdx, err}
c <- r
}
for currentItem = 0; currentItem < len(job) && currentItem < 16; currentItem++ {
go processFunc(currentItem)
}
for completedItems < len(job) {
select {
case result := <-c:
completedItems++
resultErrors[result.txIndex] = result.err
// would be nice to determine if we could stop
// on early errors here instead of running more.
if err == nil {
err = result.err
}
if currentItem < len(job) {
go processFunc(currentItem)
currentItem++
}
txVI := &txValidateItem{
txInIndex: txInIdx,
txIn: txIn,
tx: tx,
}
txValItems = append(txValItems, txVI)
}
for i := 0; i < len(job); i++ {
if resultErrors[i] != nil {
log.Warnf("tx %v failed input %v, err %v", tx.Sha(), i,
resultErrors[i])
}
// Validate all of the inputs.
validator := newTxValidator(txStore, flags)
if err := validator.Validate(txValItems); err != nil {
return err
}
return
return nil
}
// checkBlockScripts executes and validates the scripts for all transactions in
@ -124,38 +215,33 @@ func checkBlockScripts(block *btcutil.Block, txStore TxStore) error {
flags |= btcscript.ScriptBip16
}
txList := block.Transactions()
c := make(chan txValidate)
resultErrors := make([]error, len(txList))
var currentItem int
var completedItems int
processFunc := func(txIdx int) {
err := ValidateTransactionScripts(txList[txIdx], txStore, flags)
r := txValidate{txIdx, err}
c <- r
// Collect all of the transaction inputs and required information for
// validation for all transactions in the block into a single slice.
numInputs := 0
for _, tx := range block.Transactions() {
numInputs += len(tx.MsgTx().TxIn)
}
for currentItem = 0; currentItem < len(txList) && currentItem < 8; currentItem++ {
go processFunc(currentItem)
}
for completedItems < len(txList) {
select {
case result := <-c:
completedItems++
resultErrors[result.txIndex] = result.err
// would be nice to determine if we could stop
// on early errors here instead of running more.
if currentItem < len(txList) {
go processFunc(currentItem)
currentItem++
txValItems := make([]*txValidateItem, 0, numInputs)
for _, tx := range block.Transactions() {
for txInIdx, txIn := range tx.MsgTx().TxIn {
// Skip coinbases.
if txIn.PreviousOutpoint.Index == math.MaxUint32 {
continue
}
txVI := &txValidateItem{
txInIndex: txInIdx,
txIn: txIn,
tx: tx,
}
txValItems = append(txValItems, txVI)
}
}
for i := 0; i < len(txList); i++ {
if resultErrors[i] != nil {
return resultErrors[i]
}
// Validate all of the inputs.
validator := newTxValidator(txStore, flags)
if err := validator.Validate(txValItems); err != nil {
return err
}
return nil