mempool: Have ProcessTransaction return accepted transactions. (#547)

It is not the responsibility of mempool to relay transactions, so
return a slice of transactions accepted to the mempool due to the
passed transaction to the caller.
This commit is contained in:
David Hill 2016-04-14 13:58:09 -04:00 committed by Dave Collins
parent e15d3008cf
commit a1bb291b28
4 changed files with 90 additions and 93 deletions

View file

@ -454,7 +454,7 @@ func (b *blockManager) handleTxMsg(tmsg *txMsg) {
// Process the transaction to include validation, insertion in the // Process the transaction to include validation, insertion in the
// memory pool, orphan handling, etc. // memory pool, orphan handling, etc.
allowOrphans := cfg.MaxOrphanTxs > 0 allowOrphans := cfg.MaxOrphanTxs > 0
err := b.server.txMemPool.ProcessTransaction(tmsg.tx, acceptedTxs, err := b.server.txMemPool.ProcessTransaction(tmsg.tx,
allowOrphans, true) allowOrphans, true)
// Remove transaction from request maps. Either the mempool/chain // Remove transaction from request maps. Either the mempool/chain
@ -489,6 +489,8 @@ func (b *blockManager) handleTxMsg(tmsg *txMsg) {
false) false)
return return
} }
b.server.AnnounceNewTransactions(acceptedTxs)
} }
// current returns true if we believe we are synced with our peers, false if we // current returns true if we believe we are synced with our peers, false if we
@ -1209,7 +1211,8 @@ func (b *blockManager) handleNotifyMsg(notification *blockchain.Notification) {
b.server.txMemPool.RemoveTransaction(tx, false) b.server.txMemPool.RemoveTransaction(tx, false)
b.server.txMemPool.RemoveDoubleSpends(tx) b.server.txMemPool.RemoveDoubleSpends(tx)
b.server.txMemPool.RemoveOrphan(tx.Sha()) b.server.txMemPool.RemoveOrphan(tx.Sha())
b.server.txMemPool.ProcessOrphans(tx.Sha()) acceptedTxs := b.server.txMemPool.ProcessOrphans(tx.Sha())
b.server.AnnounceNewTransactions(acceptedTxs)
} }
if r := b.server.rpcServer; r != nil { if r := b.server.rpcServer; r != nil {

View file

@ -52,10 +52,6 @@ type mempoolConfig struct {
// the current best chain. // the current best chain.
Chain *blockchain.BlockChain Chain *blockchain.BlockChain
// RelayNtfnChan defines the channel to send newly accepted transactions
// to. If unset or set to nil, notifications will not be sent.
RelayNtfnChan chan *btcutil.Tx
// SigCache defines a signature cache to use. // SigCache defines a signature cache to use.
SigCache *txscript.SigCache SigCache *txscript.SigCache
@ -751,7 +747,9 @@ func (mp *txMemPool) MaybeAcceptTransaction(tx *btcutil.Tx, isNew, rateLimit boo
// ProcessOrphans. See the comment for ProcessOrphans for more details. // ProcessOrphans. See the comment for ProcessOrphans for more details.
// //
// This function MUST be called with the mempool lock held (for writes). // This function MUST be called with the mempool lock held (for writes).
func (mp *txMemPool) processOrphans(hash *wire.ShaHash) { func (mp *txMemPool) processOrphans(hash *wire.ShaHash) []*btcutil.Tx {
var acceptedTxns []*btcutil.Tx
// Start with processing at least the passed hash. // Start with processing at least the passed hash.
processHashes := list.New() processHashes := list.New()
processHashes.PushBack(hash) processHashes.PushBack(hash)
@ -807,10 +805,9 @@ func (mp *txMemPool) processOrphans(hash *wire.ShaHash) {
continue continue
} }
// Notify the caller of the new tx added to mempool. // Add this transaction to the list of transactions
if mp.cfg.RelayNtfnChan != nil { // that are no longer orphans.
mp.cfg.RelayNtfnChan <- tx acceptedTxns = append(acceptedTxns, tx)
}
// Add this transaction to the list of transactions to // Add this transaction to the list of transactions to
// process so any orphans that depend on this one are // process so any orphans that depend on this one are
@ -828,6 +825,8 @@ func (mp *txMemPool) processOrphans(hash *wire.ShaHash) {
processHashes.PushBack(orphanHash) processHashes.PushBack(orphanHash)
} }
} }
return acceptedTxns
} }
// ProcessOrphans determines if there are any orphans which depend on the passed // ProcessOrphans determines if there are any orphans which depend on the passed
@ -836,11 +835,16 @@ func (mp *txMemPool) processOrphans(hash *wire.ShaHash) {
// newly accepted transactions (to detect further orphans which may no longer be // newly accepted transactions (to detect further orphans which may no longer be
// orphans) until there are no more. // orphans) until there are no more.
// //
// It returns a slice of transactions added to the mempool. A nil slice means
// no transactions were moved from the orphan pool to the mempool.
//
// This function is safe for concurrent access. // This function is safe for concurrent access.
func (mp *txMemPool) ProcessOrphans(hash *wire.ShaHash) { func (mp *txMemPool) ProcessOrphans(hash *wire.ShaHash) []*btcutil.Tx {
mp.Lock() mp.Lock()
mp.processOrphans(hash) acceptedTxns := mp.processOrphans(hash)
mp.Unlock() mp.Unlock()
return acceptedTxns
} }
// ProcessTransaction is the main workhorse for handling insertion of new // ProcessTransaction is the main workhorse for handling insertion of new
@ -848,8 +852,13 @@ func (mp *txMemPool) ProcessOrphans(hash *wire.ShaHash) {
// such as rejecting duplicate transactions, ensuring transactions follow all // such as rejecting duplicate transactions, ensuring transactions follow all
// rules, orphan transaction handling, and insertion into the memory pool. // rules, orphan transaction handling, and insertion into the memory pool.
// //
// It returns a slice of transactions added to the mempool. When the
// error is nil, the list will include the passed transaction itself along
// with any additional orphan transaactions that were added as a result of
// the passed one being accepted.
//
// This function is safe for concurrent access. // This function is safe for concurrent access.
func (mp *txMemPool) ProcessTransaction(tx *btcutil.Tx, allowOrphan, rateLimit bool) error { func (mp *txMemPool) ProcessTransaction(tx *btcutil.Tx, allowOrphan, rateLimit bool) ([]*btcutil.Tx, error) {
// Protect concurrent access. // Protect concurrent access.
mp.Lock() mp.Lock()
defer mp.Unlock() defer mp.Unlock()
@ -859,21 +868,25 @@ func (mp *txMemPool) ProcessTransaction(tx *btcutil.Tx, allowOrphan, rateLimit b
// Potentially accept the transaction to the memory pool. // Potentially accept the transaction to the memory pool.
missingParents, err := mp.maybeAcceptTransaction(tx, true, rateLimit) missingParents, err := mp.maybeAcceptTransaction(tx, true, rateLimit)
if err != nil { if err != nil {
return err return nil, err
} }
if len(missingParents) == 0 { if len(missingParents) == 0 {
// Notify the caller that the tx was added to the mempool.
if mp.cfg.RelayNtfnChan != nil {
mp.cfg.RelayNtfnChan <- tx
}
// Accept any orphan transactions that depend on this // Accept any orphan transactions that depend on this
// transaction (they may no longer be orphans if all inputs // transaction (they may no longer be orphans if all inputs
// are now available) and repeat for those accepted // are now available) and repeat for those accepted
// transactions until there are no more. // transactions until there are no more.
mp.processOrphans(tx.Sha()) newTxs := mp.processOrphans(tx.Sha())
} else { acceptedTxs := make([]*btcutil.Tx, len(newTxs)+1)
// Add the parent transaction first so remote nodes
// do not add orphans.
acceptedTxs[0] = tx
copy(acceptedTxs[1:], newTxs)
return acceptedTxs, nil
}
// The transaction is an orphan (has inputs missing). Reject // The transaction is an orphan (has inputs missing). Reject
// it if the flag to allow orphans is not set. // it if the flag to allow orphans is not set.
if !allowOrphan { if !allowOrphan {
@ -889,17 +902,16 @@ func (mp *txMemPool) ProcessTransaction(tx *btcutil.Tx, allowOrphan, rateLimit b
str := fmt.Sprintf("orphan transaction %v references "+ str := fmt.Sprintf("orphan transaction %v references "+
"outputs of unknown or fully-spent "+ "outputs of unknown or fully-spent "+
"transaction %v", tx.Sha(), missingParents[0]) "transaction %v", tx.Sha(), missingParents[0])
return txRuleError(wire.RejectDuplicate, str) return nil, txRuleError(wire.RejectDuplicate, str)
} }
// Potentially add the orphan transaction to the orphan pool. // Potentially add the orphan transaction to the orphan pool.
err := mp.maybeAddOrphan(tx) err = mp.maybeAddOrphan(tx)
if err != nil { if err != nil {
return err return nil, err
}
} }
return nil return nil, nil
} }
// Count returns the number of transactions in the main pool. It does not // Count returns the number of transactions in the main pool. It does not

View file

@ -3438,7 +3438,7 @@ func handleSendRawTransaction(s *rpcServer, cmd interface{}, closeChan <-chan st
} }
tx := btcutil.NewTx(msgtx) tx := btcutil.NewTx(msgtx)
err = s.server.txMemPool.ProcessTransaction(tx, false, false) acceptedTxs, err := s.server.txMemPool.ProcessTransaction(tx, false, false)
if err != nil { if err != nil {
// When the error is a rule error, it means the transaction was // When the error is a rule error, it means the transaction was
// simply rejected as opposed to something actually going wrong, // simply rejected as opposed to something actually going wrong,
@ -3459,6 +3459,8 @@ func handleSendRawTransaction(s *rpcServer, cmd interface{}, closeChan <-chan st
} }
} }
s.server.AnnounceNewTransactions(acceptedTxs)
// Keep track of all the sendrawtransaction request txns so that they // Keep track of all the sendrawtransaction request txns so that they
// can be rebroadcast if they don't make their way into a block. // can be rebroadcast if they don't make their way into a block.
iv := wire.NewInvVect(wire.InvTypeTx, tx.Sha()) iv := wire.NewInvVect(wire.InvTypeTx, tx.Sha())

View file

@ -185,7 +185,6 @@ type server struct {
blockManager *blockManager blockManager *blockManager
txMemPool *txMemPool txMemPool *txMemPool
cpuMiner *CPUMiner cpuMiner *CPUMiner
relayNtfnChan chan *btcutil.Tx
modifyRebroadcastInv chan interface{} modifyRebroadcastInv chan interface{}
pendingPeers chan *serverPeer pendingPeers chan *serverPeer
newPeers chan *serverPeer newPeers chan *serverPeer
@ -961,6 +960,31 @@ func (s *server) RemoveRebroadcastInventory(iv *wire.InvVect) {
s.modifyRebroadcastInv <- broadcastInventoryDel(iv) s.modifyRebroadcastInv <- broadcastInventoryDel(iv)
} }
// AnnounceNewTransactions generates and relays inventory vectors and notifies
// both websocket and getblocktemplate long poll clients of the passed
// transactions. This function should be called whenever new transactions
// are added to the mempool.
func (s *server) AnnounceNewTransactions(newTxs []*btcutil.Tx) {
// Generate and relay inventory vectors for all newly accepted
// transactions into the memory pool due to the original being
// accepted.
for _, tx := range newTxs {
// Generate the inventory vector and relay it.
iv := wire.NewInvVect(wire.InvTypeTx, tx.Sha())
s.RelayInventory(iv, tx)
if s.rpcServer != nil {
// Notify websocket clients about mempool transactions.
s.rpcServer.ntfnMgr.NotifyMempoolTx(tx, true)
// Potentially notify any getblocktemplate long poll clients
// about stale block templates due to the new transaction.
s.rpcServer.gbtWorkState.NotifyMempoolTx(
s.txMemPool.LastUpdated())
}
}
}
// pushTxMsg sends a tx message for the provided transaction hash to the // pushTxMsg sends a tx message for the provided transaction hash to the
// connected peer. An error is returned if the transaction hash is not known. // connected peer. An error is returned if the transaction hash is not known.
func (s *server) pushTxMsg(sp *serverPeer, sha *wire.ShaHash, doneChan chan<- struct{}, waitChan <-chan struct{}) error { func (s *server) pushTxMsg(sp *serverPeer, sha *wire.ShaHash, doneChan chan<- struct{}, waitChan <-chan struct{}) error {
@ -1871,8 +1895,8 @@ func (s *server) BanPeer(sp *serverPeer) {
s.banPeers <- sp s.banPeers <- sp
} }
// RelayInventory relays the passed inventory to all connected peers that are // RelayInventory relays the passed inventory vector to all connected peers
// not already known to have it. // that are not already known to have it.
func (s *server) RelayInventory(invVect *wire.InvVect, data interface{}) { func (s *server) RelayInventory(invVect *wire.InvVect, data interface{}) {
s.relayInv <- relayMsg{invVect: invVect, data: data} s.relayInv <- relayMsg{invVect: invVect, data: data}
} }
@ -2008,45 +2032,6 @@ func (s *server) UpdatePeerHeights(latestBlkSha *wire.ShaHash, latestHeight int3
} }
} }
// relayTransactionsHandler relays transactions sent on relayNtfnChan to other
// peers and to the RPC infrastructure if necessary. It must be run as a
// goroutine.
func (s *server) relayTransactionsHandler() {
out:
for {
select {
case tx := <-s.relayNtfnChan:
// Generate an inv and relay it.
inv := wire.NewInvVect(wire.InvTypeTx, tx.Sha())
s.RelayInventory(inv, tx)
if s.rpcServer != nil {
// Notify websocket clients about mempool transactions.
s.rpcServer.ntfnMgr.NotifyMempoolTx(tx, true)
// Potentially notify any getblocktemplate long poll clients
// about stale block templates due to the new transaction.
s.rpcServer.gbtWorkState.NotifyMempoolTx(
s.txMemPool.LastUpdated())
}
case <-s.quit:
break out
}
}
// Drain channels before exiting so nothing is left waiting around to send.
cleanup:
for {
select {
case <-s.relayNtfnChan:
default:
break cleanup
}
}
s.wg.Done()
}
// rebroadcastHandler keeps track of user submitted inventories that we have // rebroadcastHandler keeps track of user submitted inventories that we have
// sent out but have not yet made it into a block. We periodically rebroadcast // sent out but have not yet made it into a block. We periodically rebroadcast
// them in case our peers restarted or otherwise lost track of them. // them in case our peers restarted or otherwise lost track of them.
@ -2126,9 +2111,6 @@ func (s *server) Start() {
s.wg.Add(1) s.wg.Add(1)
go s.peerHandler() go s.peerHandler()
s.wg.Add(1)
go s.relayTransactionsHandler()
if s.nat != nil { if s.nat != nil {
s.wg.Add(1) s.wg.Add(1)
go s.upnpUpdateThread() go s.upnpUpdateThread()
@ -2480,7 +2462,6 @@ func newServer(listenAddrs []string, db database.DB, chainParams *chaincfg.Param
relayInv: make(chan relayMsg, cfg.MaxPeers), relayInv: make(chan relayMsg, cfg.MaxPeers),
broadcast: make(chan broadcastMsg, cfg.MaxPeers), broadcast: make(chan broadcastMsg, cfg.MaxPeers),
quit: make(chan struct{}), quit: make(chan struct{}),
relayNtfnChan: make(chan *btcutil.Tx, cfg.MaxPeers),
modifyRebroadcastInv: make(chan interface{}), modifyRebroadcastInv: make(chan interface{}),
peerHeightsUpdate: make(chan updatePeerHeightsMsg), peerHeightsUpdate: make(chan updatePeerHeightsMsg),
nat: nat, nat: nat,
@ -2539,7 +2520,6 @@ func newServer(listenAddrs []string, db database.DB, chainParams *chaincfg.Param
}, },
FetchUtxoView: s.blockManager.chain.FetchUtxoView, FetchUtxoView: s.blockManager.chain.FetchUtxoView,
Chain: s.blockManager.chain, Chain: s.blockManager.chain,
RelayNtfnChan: s.relayNtfnChan,
SigCache: s.sigCache, SigCache: s.sigCache,
TimeSource: s.timeSource, TimeSource: s.timeSource,
AddrIndex: s.addrIndex, AddrIndex: s.addrIndex,