Implement transaction pool and relay.

This commit is a rather large one which implements transaction pool and
relay according to the protocol rules of the reference implementation.
It makes use of btcchain to ensure the transactions are valid for the
block chain and includes several stricter checks which determine if they
are "standard" or not before admitting them into the pool and relaying
them.

There are still a few TODOs around the more strict rules which determine
which transactions are willing to be mined, but the core checks which
are imperative (everything except the all of the "standard" checks really)
to operate as a good citizen on the bitcoin network are in place.
This commit is contained in:
Dave Collins 2013-09-20 12:55:27 -05:00
parent 3678153ae4
commit 78e9b94d93
5 changed files with 837 additions and 29 deletions

View file

@ -10,9 +10,6 @@ yet.
The following is a list of major items remaining before production release: The following is a list of major items remaining before production release:
- Implement multi-peer support
- Implement transaction mempool & relay
- Complete address manager
- Documentation - Documentation
- A lot of code cleanup - A lot of code cleanup
- Optimize - Optimize

View file

@ -326,20 +326,25 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) {
// request parent blocks of orphans if we receive one we already have. // request parent blocks of orphans if we receive one we already have.
// Finally, attempt to detect potential stalls due to long side chains // Finally, attempt to detect potential stalls due to long side chains
// we already have and request more blocks to prevent them. // we already have and request more blocks to prevent them.
chain := b.blockChain
for i, iv := range invVects { for i, iv := range invVects {
switch iv.Type { // Ignore unsupported inventory types.
case btcwire.InvVect_Block: if iv.Type != btcwire.InvVect_Block && iv.Type != btcwire.InvVect_Tx {
// Add the inventory to the cache of known inventory continue
// for the peer. }
imsg.peer.addKnownInventory(iv)
// Request the inventory if we don't already have it. // Add the inventory to the cache of known inventory
if !b.blockChain.HaveInventory(iv) { // for the peer.
// Add it to the request queue. imsg.peer.addKnownInventory(iv)
imsg.peer.requestQueue.PushBack(iv)
continue
}
// Request the inventory if we don't already have it.
if !chain.HaveInventory(iv) {
// Add it to the request queue.
imsg.peer.requestQueue.PushBack(iv)
continue
}
if iv.Type == btcwire.InvVect_Block {
// The block is an orphan block that we already have. // The block is an orphan block that we already have.
// When the existing orphan was processed, it requested // When the existing orphan was processed, it requested
// the missing parent blocks. When this scenario // the missing parent blocks. When this scenario
@ -350,13 +355,12 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) {
// resending the orphan block as an available block // resending the orphan block as an available block
// to signal there are more missing blocks that need to // to signal there are more missing blocks that need to
// be requested. // be requested.
if b.blockChain.IsKnownOrphan(&iv.Hash) { if chain.IsKnownOrphan(&iv.Hash) {
// Request blocks starting at the latest known // Request blocks starting at the latest known
// up to the root of the orphan that just came // up to the root of the orphan that just came
// in. // in.
orphanRoot := b.blockChain.GetOrphanRoot( orphanRoot := chain.GetOrphanRoot(&iv.Hash)
&iv.Hash) locator, err := chain.LatestBlockLocator()
locator, err := b.blockChain.LatestBlockLocator()
if err != nil { if err != nil {
log.Errorf("[PEER] Failed to get block "+ log.Errorf("[PEER] Failed to get block "+
"locator for the latest block: "+ "locator for the latest block: "+
@ -375,14 +379,9 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) {
// Request blocks after this one up to the // Request blocks after this one up to the
// final one the remote peer knows about (zero // final one the remote peer knows about (zero
// stop hash). // stop hash).
locator := b.blockChain.BlockLocatorFromHash( locator := chain.BlockLocatorFromHash(&iv.Hash)
&iv.Hash)
imsg.peer.PushGetBlocksMsg(locator, &zeroHash) imsg.peer.PushGetBlocksMsg(locator, &zeroHash)
} }
// Ignore unsupported inventory types.
default:
continue
} }
} }
@ -390,7 +389,8 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) {
// the request will be requested on the next inv message. // the request will be requested on the next inv message.
numRequested := 0 numRequested := 0
gdmsg := btcwire.NewMsgGetData() gdmsg := btcwire.NewMsgGetData()
for e := imsg.peer.requestQueue.Front(); e != nil; e = imsg.peer.requestQueue.Front() { requestQueue := imsg.peer.requestQueue
for e := requestQueue.Front(); e != nil; e = requestQueue.Front() {
iv := e.Value.(*btcwire.InvVect) iv := e.Value.(*btcwire.InvVect)
imsg.peer.requestQueue.Remove(e) imsg.peer.requestQueue.Remove(e)
// check that no one else has asked for this. if so we don't // check that no one else has asked for this. if so we don't
@ -487,7 +487,7 @@ func (b *blockManager) handleNotifyMsg(notification *btcchain.Notification) {
block, ok := notification.Data.(*btcutil.Block) block, ok := notification.Data.(*btcutil.Block)
if !ok { if !ok {
log.Warnf("[BMGR] Chain notification type not a block.") log.Warnf("[BMGR] Chain accepted notification is not a block.")
break break
} }
@ -498,6 +498,40 @@ func (b *blockManager) handleNotifyMsg(notification *btcchain.Notification) {
// Generate the inventory vector and relay it. // Generate the inventory vector and relay it.
iv := btcwire.NewInvVect(btcwire.InvVect_Block, hash) iv := btcwire.NewInvVect(btcwire.InvVect_Block, hash)
b.server.RelayInventory(iv) b.server.RelayInventory(iv)
// A block has been connected to the main block chain.
case btcchain.NTBlockConnected:
block, ok := notification.Data.(*btcutil.Block)
if !ok {
log.Warnf("[BMGR] Chain connected notification is not a block.")
break
}
// Remove all of the transactions (except the coinbase) in the
// connected block from the transaction pool.
for _, tx := range block.MsgBlock().Transactions[1:] {
b.server.txMemPool.removeTransaction(tx)
}
// A block has been disconnected from the main block chain.
case btcchain.NTBlockDisconnected:
block, ok := notification.Data.(*btcutil.Block)
if !ok {
log.Warnf("[BMGR] Chain disconnected notification is not a block.")
break
}
// Reinsert all of the transactions (except the coinbase) into
// the transaction pool.
for _, tx := range block.MsgBlock().Transactions[1:] {
err := b.server.txMemPool.ProcessTransaction(tx)
if err != nil {
// Remove the transaction and all transactions
// that depend on it if it wasn't accepted into
// the transaction pool.
b.server.txMemPool.removeTransaction(tx)
}
}
} }
} }

751
mempool.go Normal file
View file

@ -0,0 +1,751 @@
// Copyright (c) 2013 Conformal Systems LLC.
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package main
import (
"bytes"
"container/list"
"crypto/rand"
"fmt"
"github.com/conformal/btcchain"
"github.com/conformal/btcdb"
"github.com/conformal/btcscript"
"github.com/conformal/btcwire"
"math"
"math/big"
"sync"
"time"
)
const (
// mempoolHeight is the height used for the "block" height field of the
// contextual transaction information provided in a transaction store.
mempoolHeight = 0x7fffffff
// maxOrphanTransactions is the maximum number of orphan transactions
// that can be queued. At the time this comment was written, this
// equates to 10,000 transactions, but will increase if the max allowed
// block payload increases.
maxOrphanTransactions = btcwire.MaxBlockPayload / 100
// maxOrphanTxSize is the maximum size allowed for orphan transactions.
// This helps prevent memory exhaustion attacks from sending a lot of
// of big orphans.
maxOrphanTxSize = 5000
// maxStandardTxSize is the maximum size allowed for transactions that
// are considered standard and will therefore be relayed and considered
// for mining.
maxStandardTxSize = btcwire.MaxBlockPayload / 10
// maxStandardSigScriptSize is the maximum size allowed for a
// transaction input signature script to be considered standard. This
// value allows for a CHECKMULTISIG pay-to-sript-hash with 3 signatures
// since each signature is about 80-bytes, the 3 corresponding public
// keys are 65-bytes each if uncompressed, and the script opcodes take
// a few extra bytes. This value also adds a few extra bytes for
// prosperity. 3*80 + 3*65 + 65 = 500
maxStandardSigScriptSize = 500
// maxStandardMultiSigs is the maximum number of signatures
// allowed in a multi-signature transaction output script for it to be
// considered standard.
maxStandardMultiSigs = 3
// minTxRelayFee is the minimum fee in satoshi that is required for
// a transaction to be treated as free for relay purposes. It is also
// used to help determine if a transation is considered dust.
minTxRelayFee = 10000
)
// txMemPool is used as a source of transactions that need to be mined into
// blocks and relayed to other peers. It is safe for concurrent access from
// multiple peers.
type txMemPool struct {
server *server
pool map[btcwire.ShaHash]*btcwire.MsgTx
orphans map[btcwire.ShaHash]*btcwire.MsgTx
orphansByPrev map[btcwire.ShaHash]*list.List
outpoints map[btcwire.OutPoint]*btcwire.MsgTx
lock sync.RWMutex
}
// isDust returns whether or not the passed transaction output amount is
// considered dust or not. Dust is defined in terms of the minimum transaction
// relay fee. In particular, if the cost to the network to spend coins is more
// than 1/3 of the minimum transaction relay fee, it is considered dust.
func isDust(txOut *btcwire.TxOut) bool {
// Get the serialized size of the transaction output.
//
// TODO(davec): The serialized size should come from btcwire, but it
// currently doesn't provide a way to do so for transaction outputs, so
// calculate it here based on the current format.
// 8 bytes for value + 1 byte for script length + script length
txOutSize := 9 + len(txOut.PkScript)
// The total serialized size consists of the output and the associated
// input script to redeem it. Since there is no input script
// to redeem it yet, use the minimum size of a typical input script.
//
// Pay-to-pubkey-hash bytes breakdown:
//
// Output to hash (34 bytes):
// 8 value, 1 script len, 25 script [1 OP_DUP, 1 OP_HASH_160,
// 1 OP_DATA_20, 20 hash, 1 OP_EQUALVERIFY, 1 OP_CHECKSIG]
//
// Input with compressed pubkey (148 bytes):
// 36 prev outpoint, 1 script len, 107 script [1 OP_DATA_72, 72 sig,
// 1 OP_DATA_33, 33 compressed pubkey], 4 sequence
//
// Input with uncompressed pubkey (180 bytes):
// 36 prev outpoint, 1 script len, 139 script [1 OP_DATA_72, 72 sig,
// 1 OP_DATA_65, 65 compressed pubkey], 4 sequence
//
// Pay-to-pubkey bytes breakdown:
//
// Output to compressed pubkey (44 bytes):
// 8 value, 1 script len, 35 script [1 OP_DATA_33,
// 33 compressed pubkey, 1 OP_CHECKSIG]
//
// Output to uncompressed pubkey (76 bytes):
// 8 value, 1 script len, 67 script [1 OP_DATA_65, 65 pubkey,
// 1 OP_CHECKSIG]
//
// Input (114 bytes):
// 36 prev outpoint, 1 script len, 73 script [1 OP_DATA_72,
// 72 sig], 4 sequence
//
// Theoretically this could examine the script type of the output script
// and use a different size for the typical input script size for
// pay-to-pubkey vs pay-to-pubkey-hash inputs per the above breakdowns,
// but the only combinination which is less than the value chosen is
// a pay-to-pubkey script with a compressed pubkey, which is not very
// common.
//
// The most common scripts are pay-to-script-hash, and as per the above
// breakdown, the minimum size of a p2sh input script is 148 bytes. So
// that figure is used.
totalSize := txOutSize + 148
// The output is considered dust if the cost to the network to spend the
// coins is more than 1/3 of the minimum transaction relay fee.
// minTxRelayFee is in Satoshi/KB (kilobyte, not kibibyte), so
// multiply by 1000 to convert bytes.
//
// Using the typical values for a pay-to-script-hash transaction from
// the breakdown above and the default minimum transaction relay fee of
// 10000, this equates to values less than 5460 satoshi being considered
// dust.
//
// The following is equivalent to (value/totalSize) * (1/3) * 1000
// without needing to do floating point math.
return txOut.Value*1000/(3*int64(totalSize)) < minTxRelayFee
}
// checkPkScriptStandard performs a series of checks on a transaction ouput
// script (public key script) to ensure it is a "standard" public key script.
// A standard public key script is one that is a recognized form, and for
// multi-signature scripts, only contains from 1 to 3 signatures.
func checkPkScriptStandard(pkScript []byte) error {
scriptClass := btcscript.GetScriptClass(pkScript)
switch scriptClass {
case btcscript.MultiSigTy:
// TODO(davec): Need to get the actual number of signatures.
numSigs := 1
if numSigs < 1 {
return fmt.Errorf("multi-signature script with no " +
"signatures")
}
if numSigs > maxStandardMultiSigs {
fmt.Errorf("multi-signature script with %d signatures "+
"which is more than the allowed max of %d",
numSigs, maxStandardMultiSigs)
}
case btcscript.NonStandardTy:
return fmt.Errorf("non-standard script form")
}
return nil
}
// checkTransactionStandard performs a series of checks on a transaction to
// ensure it is a "standard" transaction. A standard transaction is one that
// conforms to several additional limiting cases over what is considered a
// "sane" transaction such as having a version in the supported range, being
// finalized, conforming to more stringent size constraints, having scripts
// of recognized forms, and not containing "dust" outputs (those that are
// so small it costs more to process them than they are worth).
func checkTransactionStandard(tx *btcwire.MsgTx, height int64) error {
// The transaction must be a currently supported version.
if tx.Version > btcwire.TxVersion || tx.Version < 1 {
return fmt.Errorf("transaction version %d is not in the "+
"valid range of %d-%d", tx.Version, 1,
btcwire.TxVersion)
}
// The transaction must be finalized to be standard and therefore
// considered for inclusion in a block.
if !btcchain.IsFinalizedTransaction(tx, height, time.Now()) {
return fmt.Errorf("transaction is not finalized")
}
// Since extremely large transactions with a lot of inputs can cost
// almost as much to process as the sender fees, limit the maximum
// size of a transaction. This also helps mitigate CPU exhaustion
// attacks.
var serializedTxBuf bytes.Buffer
err := tx.Serialize(&serializedTxBuf)
if err != nil {
return err
}
serializedLen := serializedTxBuf.Len()
if serializedLen > maxStandardTxSize {
return fmt.Errorf("transaction size of %v is larger than max "+
"allowed size of %v", serializedLen, maxStandardTxSize)
}
for i, txIn := range tx.TxIn {
// Each transaction input signature script must not exceed the
// maximum size allowed for a standard transaction. See
// the comment on maxStandardSigScriptSize for more details.
sigScriptLen := len(txIn.SignatureScript)
if sigScriptLen > maxStandardSigScriptSize {
return fmt.Errorf("transaction input %d: signature "+
"script size of %d bytes is large than max "+
"allowed size of %d bytes", i, sigScriptLen,
maxStandardSigScriptSize)
}
// Each transaction input signature script must only contain
// opcodes which push data onto the stack.
if !btcscript.IsPushOnlyScript(txIn.SignatureScript) {
return fmt.Errorf("transaction input %d: signature " +
"script is not push only")
}
}
// None of the output public key scripts can be a non-standard script or
// be "dust".
for i, txOut := range tx.TxOut {
err := checkPkScriptStandard(txOut.PkScript)
if err != nil {
return fmt.Errorf("transaction output %d: %v", i, err)
}
if isDust(txOut) {
return fmt.Errorf("transaction output %d: payment "+
"of %d is dust", i, txOut.Value)
}
}
return nil
}
// checkInputsStandard performs a series of checks on a transactions inputs
// to ensure they are "standard". A standard transaction input is one that
// that consumes the same number of outputs from the stack as the output script
// pushes. This help prevent resource exhaustion attacks by "creative" use of
// scripts that are super expensive to process like OP_DUP OP_CHECKSIG OP_DROP
// repeated a large number of times followed by a final OP_TRUE.
func checkInputsStandard(tx *btcwire.MsgTx) error {
// TODO(davec): Implement
return nil
}
// removeOrphan removes the passed orphan transaction from the orphan pool and
// previous orphan index.
func (mp *txMemPool) removeOrphan(txHash *btcwire.ShaHash) {
// Protect concurrent access.
mp.lock.Lock()
defer mp.lock.Unlock()
// Nothing to do if passed tx is not an orphan.
tx, exists := mp.orphans[*txHash]
if !exists {
return
}
// Remove the reference from the previous orphan index.
for _, txIn := range tx.TxIn {
originTxHash := txIn.PreviousOutpoint.Hash
if orphans, exists := mp.orphansByPrev[originTxHash]; exists {
for e := orphans.Front(); e != nil; e = e.Next() {
if e.Value.(*btcwire.MsgTx) == tx {
orphans.Remove(e)
break
}
}
// Remove the map entry altogether if there are no
// longer any orphans which depend on it.
if orphans.Len() == 0 {
delete(mp.orphansByPrev, originTxHash)
}
}
}
// Remove the transaction from the orphan pool.
delete(mp.orphans, *txHash)
}
// limitNumOrphans limits the number of orphan transactions by evicting a random
// orphan if adding a new one would cause it to overflow the max allowed.
func (mp *txMemPool) limitNumOrphans() error {
// Protect concurrent access.
mp.lock.Lock()
defer mp.lock.Unlock()
if len(mp.orphans)+1 > maxOrphanTransactions {
// Generate a cryptographically random hash.
randHashBytes := make([]byte, btcwire.HashSize)
_, err := rand.Read(randHashBytes)
if err != nil {
return err
}
randHashNum := new(big.Int).SetBytes(randHashBytes)
// Try to find the first entry that is greater than the random
// hash. Use the first entry (which is already pseudorandom due
// to Go's range statement over maps) as a fallback if none of
// the hashes in the orphan pool are larger than the random
// hash.
var foundHash *btcwire.ShaHash
for txHash := range mp.orphans {
if foundHash == nil {
foundHash = &txHash
}
txHashNum := btcchain.ShaHashToBig(&txHash)
if txHashNum.Cmp(randHashNum) > 0 {
foundHash = &txHash
break
}
}
// Need to unlock and relock since removeOrphan has its own
// locking.
mp.lock.Unlock()
mp.removeOrphan(foundHash)
mp.lock.Lock()
}
return nil
}
// addOrphan adds an orphan transaction to the orphan pool.
func (mp *txMemPool) addOrphan(tx *btcwire.MsgTx, txHash *btcwire.ShaHash) {
// Limit the number orphan transactions to prevent memory exhaustion. A
// random orphan is evicted to make room if needed.
mp.limitNumOrphans()
mp.lock.Lock()
defer mp.lock.Unlock()
mp.orphans[*txHash] = tx
for _, txIn := range tx.TxIn {
originTxHash := txIn.PreviousOutpoint.Hash
if mp.orphansByPrev[originTxHash] == nil {
mp.orphansByPrev[originTxHash] = list.New()
}
mp.orphansByPrev[originTxHash].PushBack(tx)
}
log.Debugf("[TXMP] Stored orphan transaction %v (total: %d)", txHash,
len(mp.orphans))
}
// maybeAddOrphan potentially adds an orphan to the orphan pool.
func (mp *txMemPool) maybeAddOrphan(tx *btcwire.MsgTx, txHash *btcwire.ShaHash) error {
// Ignore orphan transactions that are too large. This helps avoid
// a memory exhaustion attack based on sending a lot of really large
// orphans. In the case there is a valid transaction larger than this,
// it will ultimtely be rebroadcast after the parent transactions
// have been mined or otherwise received.
//
// Note that the number of orphan transactions in the orphan pool is
// also limited, so this equates to a maximum memory used of
// maxOrphanTxSize * maxOrphanTransactions (which is 500MB as of the
// time this comment was written).
var serializedTxBuf bytes.Buffer
err := tx.Serialize(&serializedTxBuf)
if err != nil {
return err
}
serializedLen := serializedTxBuf.Len()
if serializedLen > maxOrphanTxSize {
return fmt.Errorf("orphan transaction size of %d bytes is "+
"larger than max allowed size of %d bytes",
serializedLen, maxOrphanTxSize)
}
// Add the orphan if the none of the above disqualified it.
mp.addOrphan(tx, txHash)
return nil
}
// isTransactionInPool returns whether or not the passed transaction already
// exists in the memory pool.
func (mp *txMemPool) isTransactionInPool(hash *btcwire.ShaHash) bool {
mp.lock.RLock()
defer mp.lock.RUnlock()
if _, exists := mp.pool[*hash]; exists {
return true
}
if _, exists := mp.orphans[*hash]; exists {
return true
}
return false
}
// removeTransaction removes the passed transaction from the memory pool.
func (mp *txMemPool) removeTransaction(tx *btcwire.MsgTx) {
mp.lock.Lock()
defer mp.lock.Unlock()
// Remove any transactions which rely on this one.
txHash, _ := tx.TxSha()
for i := uint32(0); i < uint32(len(tx.TxOut)); i++ {
outpoint := btcwire.NewOutPoint(&txHash, i)
if txRedeemer, exists := mp.outpoints[*outpoint]; exists {
mp.lock.Unlock()
mp.removeTransaction(txRedeemer)
mp.lock.Lock()
}
}
// Remove the transaction and mark the referenced outpoints as unspent
// by the pool.
if tx, exists := mp.pool[txHash]; exists {
for _, txIn := range tx.TxIn {
delete(mp.outpoints, txIn.PreviousOutpoint)
}
delete(mp.pool, txHash)
}
}
// addTransaction adds the passed transaction to the memory pool. It should
// not be called directly as it doesn't perform any validation. This is a
// helper for maybeAcceptTransaction.
func (mp *txMemPool) addTransaction(tx *btcwire.MsgTx, txHash *btcwire.ShaHash) {
mp.lock.Lock()
defer mp.lock.Unlock()
// Add the transaction to the pool and mark the referenced outpoints
// as spent by the pool.
mp.pool[*txHash] = tx
for _, txIn := range tx.TxIn {
mp.outpoints[txIn.PreviousOutpoint] = tx
}
}
// checkPoolDoubleSpend checks whether or not the passed transaction is
// attempting to spend coins already spent by other transactions in the pool.
// Note it does not check for double spends against transactions already in the
// main chain.
func (mp *txMemPool) checkPoolDoubleSpend(tx *btcwire.MsgTx) error {
mp.lock.RLock()
defer mp.lock.RUnlock()
for _, txIn := range tx.TxIn {
if txR, exists := mp.outpoints[txIn.PreviousOutpoint]; exists {
hash, _ := txR.TxSha()
return fmt.Errorf("transaction %v in the pool "+
"already spends the same coins", hash)
}
}
return nil
}
// fetchInputTransactions fetches the input transactions referenced by the
// passed transaction. First, it fetches from the main chain, then it tries to
// fetch any missing inputs from the transaction pool.
func (mp *txMemPool) fetchInputTransactions(tx *btcwire.MsgTx) (btcchain.TxStore, error) {
mp.lock.RLock()
defer mp.lock.RUnlock()
txStore, err := mp.server.blockManager.blockChain.FetchTransactionStore(tx)
if err != nil {
return nil, err
}
// Attempt to populate any missing inputs from the transaction pool.
for _, txD := range txStore {
if txD.Err == btcdb.TxShaMissing || txD.Tx == nil {
if poolTx, exists := mp.pool[*txD.Hash]; exists {
txD.Tx = poolTx
txD.BlockHeight = mempoolHeight
txD.Spent = make([]bool, len(poolTx.TxOut))
txD.Err = nil
}
}
}
return txStore, nil
}
// maybeAcceptTransaction is the main workhorse for handling insertion of new
// free-standing transactions into a memory pool. It includes functionality
// such as rejecting duplicate transactions, ensuring transactions follow all
// rules, orphan transaction handling, and insertion into the memory pool.
func (mp *txMemPool) maybeAcceptTransaction(tx *btcwire.MsgTx, isOrphan *bool) error {
*isOrphan = false
txHash, err := tx.TxSha()
if err != nil {
return err
}
// Don't accept the transaction if it already exists in the pool. This
// applies to orphan transactions as well. This check is intended to
// be a quick check to weed out duplicates. It is more expensive to
// detect a duplicate transaction in the main chain, so that is done
// later.
if mp.isTransactionInPool(&txHash) {
return fmt.Errorf("already have transaction %v", txHash)
}
// Perform preliminary sanity checks on the transaction. This makes
// use of btcchain which contains the invariant rules for what
// transactions are allowed into blocks.
err = btcchain.CheckTransactionSanity(tx)
if err != nil {
return err
}
// A standalone transaction must not be a coinbase transaction.
if btcchain.IsCoinBase(tx) {
return fmt.Errorf("transaction %v is an individual coinbase",
txHash)
}
// Don't accept transactions with a lock time after the maximum int32
// value for now. This is an artifact of older bitcoind clients which
// treated this field as an int32 and would treat anything larger
// incorrectly (as negative).
if tx.LockTime > math.MaxInt32 {
return fmt.Errorf("transaction %v is has a lock time after "+
"2038 which is not accepted yet", txHash)
}
// Get the current height of the main chain. A standalone transaction
// will be mined into the next block at best, so
_, curHeight, err := mp.server.db.NewestSha()
if err != nil {
return err
}
nextBlockHeight := curHeight + 1
// Don't allow non-standard transactions on the main network.
if activeNetParams.btcnet == btcwire.MainNet {
err := checkTransactionStandard(tx, nextBlockHeight)
if err != nil {
return fmt.Errorf("transaction %v is not a standard "+
"transaction: %v", txHash, err)
}
}
// The transaction may not use any of the same outputs as other
// transactions already in the pool as that would ultimately result in a
// double spend. This check is intended to be quick and therefore only
// detects double spends within the transaction pool itself. The
// transaction could still be double spending coins from the main chain
// at this point. There is a more in-depth check that happens later
// after fetching the referenced transaction inputs from the main chain
// which examines the actual spend data and prevents double spends.
err = mp.checkPoolDoubleSpend(tx)
if err != nil {
return err
}
// Fetch all of the transactions referenced by the inputs to this
// transaction. This function also attempts to fetch the transaction
// itself to be used for detecting a duplicate transaction without
// needing to do a separate lookup.
txStore, err := mp.fetchInputTransactions(tx)
if err != nil {
return err
}
// Don't allow the transaction if it exists in the main chain and is not
// not already fully spent.
if txD, exists := txStore[txHash]; exists && txD.Err == nil {
for _, isOutputSpent := range txD.Spent {
if !isOutputSpent {
return fmt.Errorf("transaction already exists")
}
}
}
delete(txStore, txHash)
// Transaction is an orphan if any of the inputs don't exist.
for _, txD := range txStore {
if txD.Err == btcdb.TxShaMissing {
*isOrphan = true
return nil
}
}
// Perform several checks on the transaction inputs using the invariant
// rules in btcchain for what transactions are allowed into blocks.
// Also returns the fees associated with the transaction which will be
// used later.
txFee, err := btcchain.CheckTransactionInputs(tx, nextBlockHeight, txStore)
if err != nil {
return err
}
// Don't allow transactions with non-standard inputs on the main
// network.
if activeNetParams.btcnet == btcwire.MainNet {
err := checkInputsStandard(tx)
if err != nil {
return fmt.Errorf("transaction %v has a non-standard "+
"input: %v", txHash, err)
}
}
// Note: if you modify this code to accept non-standard transactions,
// you should add code here to check that the transaction does a
// reasonable number of ECDSA signature verifications.
// TODO(davec): Don't allow the transaction if the transation fee
// would be too low to get into an empty block.
_ = txFee
// Verify crypto signatures for each input and reject the transaction if
// any don't verify.
err = btcchain.ValidateTransactionScripts(tx, &txHash, time.Now(), txStore)
if err != nil {
return err
}
// TODO(davec): Rate-limit free transactions
// Add to transaction pool.
mp.addTransaction(tx, &txHash)
mp.lock.RLock()
log.Infof("[TXMP] Accepted transaction %v (pool size: %v)", txHash,
len(mp.pool))
mp.lock.RUnlock()
// TODO(davec): Notifications
// Generate the inventory vector and relay it.
iv := btcwire.NewInvVect(btcwire.InvVect_Tx, &txHash)
mp.server.RelayInventory(iv)
return nil
}
// processOrphans determines if there are any orphans which depend on the passed
// transaction hash (they are no longer orphans if true) and potentially accepts
// them. It repeats the process for the newly accepted transactions (to detect
// further orphans which may no longer be orphans) until there are no more.
func (mp *txMemPool) processOrphans(hash *btcwire.ShaHash) error {
// Start with processing at least the passed hash.
processHashes := list.New()
processHashes.PushBack(hash)
for processHashes.Len() > 0 {
// Pop the first hash to process.
firstElement := processHashes.Remove(processHashes.Front())
processHash := firstElement.(*btcwire.ShaHash)
// Look up all orphans that are referenced by the transaction we
// just accepted. This will typically only be one, but it could
// be multiple if the referenced transaction contains multiple
// outputs. Skip to the next item on the list of hashes to
// process if there are none.
orphans, exists := mp.orphansByPrev[*processHash]
if !exists || orphans == nil {
continue
}
for e := orphans.Front(); e != nil; e = e.Next() {
tx := e.Value.(*btcwire.MsgTx)
// Remove the orphan from the orphan pool.
orphanHash, err := tx.TxSha()
if err != nil {
return err
}
mp.removeOrphan(&orphanHash)
// Potentially accept the transaction into the
// transaction pool.
var isOrphan bool
err = mp.maybeAcceptTransaction(tx, &isOrphan)
if err != nil {
return err
}
if isOrphan {
mp.removeOrphan(&orphanHash)
}
// Add this transaction to the list of transactions to
// process so any orphans that depend on this one are
// handled too.
processHashes.PushBack(&orphanHash)
}
}
return nil
}
// ProcessTransaction is the main workhorse for handling insertion of new
// free-standing transactions into a memory pool. It includes functionality
// such as rejecting duplicate transactions, ensuring transactions follow all
// rules, orphan transaction handling, and insertion into the memory pool.
func (mp *txMemPool) ProcessTransaction(tx *btcwire.MsgTx) error {
txHash, err := tx.TxSha()
if err != nil {
return err
}
log.Tracef("[TXMP] Processing transaction %v", txHash)
// Potentially accept the transaction to the memory pool.
var isOrphan bool
err = mp.maybeAcceptTransaction(tx, &isOrphan)
if err != nil {
return err
}
if !isOrphan {
// Accept any orphan transactions that depend on this
// transaction (they are no longer orphans) and repeat for those
// accepted transactions until there are no more.
err = mp.processOrphans(&txHash)
if err != nil {
return err
}
} else {
// When the transaction is an orphan (has inputs missing),
// potentially add it to the orphan pool.
err := mp.maybeAddOrphan(tx, &txHash)
if err != nil {
return err
}
}
return nil
}
// newTxMemPool returns a new memory pool for validating and storing standalone
// transactions until they are mined into a block.
func newTxMemPool(server *server) *txMemPool {
return &txMemPool{
server: server,
pool: make(map[btcwire.ShaHash]*btcwire.MsgTx),
orphans: make(map[btcwire.ShaHash]*btcwire.MsgTx),
orphansByPrev: make(map[btcwire.ShaHash]*list.List),
outpoints: make(map[btcwire.OutPoint]*btcwire.MsgTx),
}
}

29
peer.go
View file

@ -400,6 +400,28 @@ func (p *peer) PushGetBlocksMsg(locator btcchain.BlockLocator, stopHash *btcwire
return nil return nil
} }
// handleTxMsg is invoked when a peer receives a tx bitcoin message. It blocks
// until the bitcoin transaction has been fully processed. Unlock the block
// handler this does not serialize all transactions through a single thread
// transactions don't rely on the previous one in a linear fashion like blocks.
func (p *peer) handleTxMsg(msg *btcwire.MsgTx) {
// Add the transaction to the known inventory for the peer.
hash, err := msg.TxSha()
if err != nil {
log.Errorf("Unable to get transaction hash: %v", err)
return
}
iv := btcwire.NewInvVect(btcwire.InvVect_Tx, &hash)
p.addKnownInventory(iv)
// Process the transaction.
err = p.server.txMemPool.ProcessTransaction(msg)
if err != nil {
log.Errorf("Failed to process transaction %v: %v", hash, err)
return
}
}
// handleBlockMsg is invoked when a peer receives a block bitcoin message. It // handleBlockMsg is invoked when a peer receives a block bitcoin message. It
// blocks until the bitcoin block has been fully processed. // blocks until the bitcoin block has been fully processed.
func (p *peer) handleBlockMsg(msg *btcwire.MsgBlock, buf []byte) { func (p *peer) handleBlockMsg(msg *btcwire.MsgBlock, buf []byte) {
@ -836,7 +858,7 @@ out:
// regression test mode and the error is one of the // regression test mode and the error is one of the
// allowed errors. // allowed errors.
if cfg.RegressionTest && p.isAllowedByRegression(err) { if cfg.RegressionTest && p.isAllowedByRegression(err) {
log.Errorf("[PEER] Allowed regression test"+ log.Errorf("[PEER] Allowed regression test "+
"error: %v", err) "error: %v", err)
continue continue
} }
@ -882,6 +904,9 @@ out:
case *btcwire.MsgAlert: case *btcwire.MsgAlert:
p.server.BroadcastMessage(msg, p) p.server.BroadcastMessage(msg, p)
case *btcwire.MsgTx:
p.handleTxMsg(msg)
case *btcwire.MsgBlock: case *btcwire.MsgBlock:
p.handleBlockMsg(msg, buf) p.handleBlockMsg(msg, buf)
@ -905,7 +930,7 @@ out:
} }
// Mark the address as currently connected and working as of // Mark the address as currently connected and working as of
// now if one of the messages that trigger // now if one of the messages that trigger it was processed.
if markConnected && atomic.LoadInt32(&p.disconnect) == 0 { if markConnected && atomic.LoadInt32(&p.disconnect) == 0 {
if p.na == nil { if p.na == nil {
log.Warnf("we're getting stuff before we " + log.Warnf("we're getting stuff before we " +

View file

@ -45,6 +45,7 @@ type server struct {
addrManager *AddrManager addrManager *AddrManager
rpcServer *rpcServer rpcServer *rpcServer
blockManager *blockManager blockManager *blockManager
txMemPool *txMemPool
newPeers chan *peer newPeers chan *peer
donePeers chan *peer donePeers chan *peer
banPeers chan *peer banPeers chan *peer
@ -117,7 +118,6 @@ func (s *server) handleAddPeerMsg(peers *list.List, banned map[string]time.Time,
func (s *server) handleDonePeerMsg(peers *list.List, p *peer) bool { func (s *server) handleDonePeerMsg(peers *list.List, p *peer) bool {
for e := peers.Front(); e != nil; e = e.Next() { for e := peers.Front(); e != nil; e = e.Next() {
if e.Value == p { if e.Value == p {
// Issue an asynchronous reconnect if the peer was a // Issue an asynchronous reconnect if the peer was a
// persistent outbound connection. // persistent outbound connection.
if !p.inbound && p.persistent && if !p.inbound && p.persistent &&
@ -589,6 +589,7 @@ func newServer(addr string, db btcdb.Db, btcnet btcwire.BitcoinNet) (*server, er
return nil, err return nil, err
} }
s.blockManager = bm s.blockManager = bm
s.txMemPool = newTxMemPool(&s)
if !cfg.DisableRPC { if !cfg.DisableRPC {
s.rpcServer, err = newRPCServer(&s) s.rpcServer, err = newRPCServer(&s)