From a1bb291b282433035f63b17773c38fc9e0f103ff Mon Sep 17 00:00:00 2001 From: David Hill Date: Thu, 14 Apr 2016 13:58:09 -0400 Subject: [PATCH] mempool: Have ProcessTransaction return accepted transactions. (#547) It is not the responsibility of mempool to relay transactions, so return a slice of transactions accepted to the mempool due to the passed transaction to the caller. --- blockmanager.go | 7 +++- mempool.go | 98 +++++++++++++++++++++++++++---------------------- rpcserver.go | 4 +- server.go | 74 ++++++++++++++----------------------- 4 files changed, 90 insertions(+), 93 deletions(-) diff --git a/blockmanager.go b/blockmanager.go index be8c395a..b8c4a55b 100644 --- a/blockmanager.go +++ b/blockmanager.go @@ -454,7 +454,7 @@ func (b *blockManager) handleTxMsg(tmsg *txMsg) { // Process the transaction to include validation, insertion in the // memory pool, orphan handling, etc. allowOrphans := cfg.MaxOrphanTxs > 0 - err := b.server.txMemPool.ProcessTransaction(tmsg.tx, + acceptedTxs, err := b.server.txMemPool.ProcessTransaction(tmsg.tx, allowOrphans, true) // Remove transaction from request maps. Either the mempool/chain @@ -489,6 +489,8 @@ func (b *blockManager) handleTxMsg(tmsg *txMsg) { false) return } + + b.server.AnnounceNewTransactions(acceptedTxs) } // current returns true if we believe we are synced with our peers, false if we @@ -1209,7 +1211,8 @@ func (b *blockManager) handleNotifyMsg(notification *blockchain.Notification) { b.server.txMemPool.RemoveTransaction(tx, false) b.server.txMemPool.RemoveDoubleSpends(tx) b.server.txMemPool.RemoveOrphan(tx.Sha()) - b.server.txMemPool.ProcessOrphans(tx.Sha()) + acceptedTxs := b.server.txMemPool.ProcessOrphans(tx.Sha()) + b.server.AnnounceNewTransactions(acceptedTxs) } if r := b.server.rpcServer; r != nil { diff --git a/mempool.go b/mempool.go index 6947424d..9feeb675 100644 --- a/mempool.go +++ b/mempool.go @@ -52,10 +52,6 @@ type mempoolConfig struct { // the current best chain. Chain *blockchain.BlockChain - // RelayNtfnChan defines the channel to send newly accepted transactions - // to. If unset or set to nil, notifications will not be sent. - RelayNtfnChan chan *btcutil.Tx - // SigCache defines a signature cache to use. SigCache *txscript.SigCache @@ -751,7 +747,9 @@ func (mp *txMemPool) MaybeAcceptTransaction(tx *btcutil.Tx, isNew, rateLimit boo // ProcessOrphans. See the comment for ProcessOrphans for more details. // // This function MUST be called with the mempool lock held (for writes). -func (mp *txMemPool) processOrphans(hash *wire.ShaHash) { +func (mp *txMemPool) processOrphans(hash *wire.ShaHash) []*btcutil.Tx { + var acceptedTxns []*btcutil.Tx + // Start with processing at least the passed hash. processHashes := list.New() processHashes.PushBack(hash) @@ -807,10 +805,9 @@ func (mp *txMemPool) processOrphans(hash *wire.ShaHash) { continue } - // Notify the caller of the new tx added to mempool. - if mp.cfg.RelayNtfnChan != nil { - mp.cfg.RelayNtfnChan <- tx - } + // Add this transaction to the list of transactions + // that are no longer orphans. + acceptedTxns = append(acceptedTxns, tx) // Add this transaction to the list of transactions to // process so any orphans that depend on this one are @@ -828,6 +825,8 @@ func (mp *txMemPool) processOrphans(hash *wire.ShaHash) { processHashes.PushBack(orphanHash) } } + + return acceptedTxns } // ProcessOrphans determines if there are any orphans which depend on the passed @@ -836,11 +835,16 @@ func (mp *txMemPool) processOrphans(hash *wire.ShaHash) { // newly accepted transactions (to detect further orphans which may no longer be // orphans) until there are no more. // +// It returns a slice of transactions added to the mempool. A nil slice means +// no transactions were moved from the orphan pool to the mempool. +// // This function is safe for concurrent access. -func (mp *txMemPool) ProcessOrphans(hash *wire.ShaHash) { +func (mp *txMemPool) ProcessOrphans(hash *wire.ShaHash) []*btcutil.Tx { mp.Lock() - mp.processOrphans(hash) + acceptedTxns := mp.processOrphans(hash) mp.Unlock() + + return acceptedTxns } // ProcessTransaction is the main workhorse for handling insertion of new @@ -848,8 +852,13 @@ func (mp *txMemPool) ProcessOrphans(hash *wire.ShaHash) { // such as rejecting duplicate transactions, ensuring transactions follow all // rules, orphan transaction handling, and insertion into the memory pool. // +// It returns a slice of transactions added to the mempool. When the +// error is nil, the list will include the passed transaction itself along +// with any additional orphan transaactions that were added as a result of +// the passed one being accepted. +// // This function is safe for concurrent access. -func (mp *txMemPool) ProcessTransaction(tx *btcutil.Tx, allowOrphan, rateLimit bool) error { +func (mp *txMemPool) ProcessTransaction(tx *btcutil.Tx, allowOrphan, rateLimit bool) ([]*btcutil.Tx, error) { // Protect concurrent access. mp.Lock() defer mp.Unlock() @@ -859,47 +868,50 @@ func (mp *txMemPool) ProcessTransaction(tx *btcutil.Tx, allowOrphan, rateLimit b // Potentially accept the transaction to the memory pool. missingParents, err := mp.maybeAcceptTransaction(tx, true, rateLimit) if err != nil { - return err + return nil, err } if len(missingParents) == 0 { - // Notify the caller that the tx was added to the mempool. - if mp.cfg.RelayNtfnChan != nil { - mp.cfg.RelayNtfnChan <- tx - } - // Accept any orphan transactions that depend on this // transaction (they may no longer be orphans if all inputs // are now available) and repeat for those accepted // transactions until there are no more. - mp.processOrphans(tx.Sha()) - } else { - // The transaction is an orphan (has inputs missing). Reject - // it if the flag to allow orphans is not set. - if !allowOrphan { - // Only use the first missing parent transaction in - // the error message. - // - // NOTE: RejectDuplicate is really not an accurate - // reject code here, but it matches the reference - // implementation and there isn't a better choice due - // to the limited number of reject codes. Missing - // inputs is assumed to mean they are already spent - // which is not really always the case. - str := fmt.Sprintf("orphan transaction %v references "+ - "outputs of unknown or fully-spent "+ - "transaction %v", tx.Sha(), missingParents[0]) - return txRuleError(wire.RejectDuplicate, str) - } + newTxs := mp.processOrphans(tx.Sha()) + acceptedTxs := make([]*btcutil.Tx, len(newTxs)+1) - // Potentially add the orphan transaction to the orphan pool. - err := mp.maybeAddOrphan(tx) - if err != nil { - return err - } + // Add the parent transaction first so remote nodes + // do not add orphans. + acceptedTxs[0] = tx + copy(acceptedTxs[1:], newTxs) + + return acceptedTxs, nil } - return nil + // The transaction is an orphan (has inputs missing). Reject + // it if the flag to allow orphans is not set. + if !allowOrphan { + // Only use the first missing parent transaction in + // the error message. + // + // NOTE: RejectDuplicate is really not an accurate + // reject code here, but it matches the reference + // implementation and there isn't a better choice due + // to the limited number of reject codes. Missing + // inputs is assumed to mean they are already spent + // which is not really always the case. + str := fmt.Sprintf("orphan transaction %v references "+ + "outputs of unknown or fully-spent "+ + "transaction %v", tx.Sha(), missingParents[0]) + return nil, txRuleError(wire.RejectDuplicate, str) + } + + // Potentially add the orphan transaction to the orphan pool. + err = mp.maybeAddOrphan(tx) + if err != nil { + return nil, err + } + + return nil, nil } // Count returns the number of transactions in the main pool. It does not diff --git a/rpcserver.go b/rpcserver.go index 0533b153..2d1564c1 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -3438,7 +3438,7 @@ func handleSendRawTransaction(s *rpcServer, cmd interface{}, closeChan <-chan st } tx := btcutil.NewTx(msgtx) - err = s.server.txMemPool.ProcessTransaction(tx, false, false) + acceptedTxs, err := s.server.txMemPool.ProcessTransaction(tx, false, false) if err != nil { // When the error is a rule error, it means the transaction was // simply rejected as opposed to something actually going wrong, @@ -3459,6 +3459,8 @@ func handleSendRawTransaction(s *rpcServer, cmd interface{}, closeChan <-chan st } } + s.server.AnnounceNewTransactions(acceptedTxs) + // Keep track of all the sendrawtransaction request txns so that they // can be rebroadcast if they don't make their way into a block. iv := wire.NewInvVect(wire.InvTypeTx, tx.Sha()) diff --git a/server.go b/server.go index b7fe2400..f2c64ec9 100644 --- a/server.go +++ b/server.go @@ -185,7 +185,6 @@ type server struct { blockManager *blockManager txMemPool *txMemPool cpuMiner *CPUMiner - relayNtfnChan chan *btcutil.Tx modifyRebroadcastInv chan interface{} pendingPeers chan *serverPeer newPeers chan *serverPeer @@ -961,6 +960,31 @@ func (s *server) RemoveRebroadcastInventory(iv *wire.InvVect) { s.modifyRebroadcastInv <- broadcastInventoryDel(iv) } +// AnnounceNewTransactions generates and relays inventory vectors and notifies +// both websocket and getblocktemplate long poll clients of the passed +// transactions. This function should be called whenever new transactions +// are added to the mempool. +func (s *server) AnnounceNewTransactions(newTxs []*btcutil.Tx) { + // Generate and relay inventory vectors for all newly accepted + // transactions into the memory pool due to the original being + // accepted. + for _, tx := range newTxs { + // Generate the inventory vector and relay it. + iv := wire.NewInvVect(wire.InvTypeTx, tx.Sha()) + s.RelayInventory(iv, tx) + + if s.rpcServer != nil { + // Notify websocket clients about mempool transactions. + s.rpcServer.ntfnMgr.NotifyMempoolTx(tx, true) + + // Potentially notify any getblocktemplate long poll clients + // about stale block templates due to the new transaction. + s.rpcServer.gbtWorkState.NotifyMempoolTx( + s.txMemPool.LastUpdated()) + } + } +} + // pushTxMsg sends a tx message for the provided transaction hash to the // connected peer. An error is returned if the transaction hash is not known. func (s *server) pushTxMsg(sp *serverPeer, sha *wire.ShaHash, doneChan chan<- struct{}, waitChan <-chan struct{}) error { @@ -1871,8 +1895,8 @@ func (s *server) BanPeer(sp *serverPeer) { s.banPeers <- sp } -// RelayInventory relays the passed inventory to all connected peers that are -// not already known to have it. +// RelayInventory relays the passed inventory vector to all connected peers +// that are not already known to have it. func (s *server) RelayInventory(invVect *wire.InvVect, data interface{}) { s.relayInv <- relayMsg{invVect: invVect, data: data} } @@ -2008,45 +2032,6 @@ func (s *server) UpdatePeerHeights(latestBlkSha *wire.ShaHash, latestHeight int3 } } -// relayTransactionsHandler relays transactions sent on relayNtfnChan to other -// peers and to the RPC infrastructure if necessary. It must be run as a -// goroutine. -func (s *server) relayTransactionsHandler() { -out: - for { - select { - case tx := <-s.relayNtfnChan: - // Generate an inv and relay it. - inv := wire.NewInvVect(wire.InvTypeTx, tx.Sha()) - s.RelayInventory(inv, tx) - - if s.rpcServer != nil { - // Notify websocket clients about mempool transactions. - s.rpcServer.ntfnMgr.NotifyMempoolTx(tx, true) - - // Potentially notify any getblocktemplate long poll clients - // about stale block templates due to the new transaction. - s.rpcServer.gbtWorkState.NotifyMempoolTx( - s.txMemPool.LastUpdated()) - } - - case <-s.quit: - break out - } - } - - // Drain channels before exiting so nothing is left waiting around to send. -cleanup: - for { - select { - case <-s.relayNtfnChan: - default: - break cleanup - } - } - s.wg.Done() -} - // rebroadcastHandler keeps track of user submitted inventories that we have // sent out but have not yet made it into a block. We periodically rebroadcast // them in case our peers restarted or otherwise lost track of them. @@ -2126,9 +2111,6 @@ func (s *server) Start() { s.wg.Add(1) go s.peerHandler() - s.wg.Add(1) - go s.relayTransactionsHandler() - if s.nat != nil { s.wg.Add(1) go s.upnpUpdateThread() @@ -2480,7 +2462,6 @@ func newServer(listenAddrs []string, db database.DB, chainParams *chaincfg.Param relayInv: make(chan relayMsg, cfg.MaxPeers), broadcast: make(chan broadcastMsg, cfg.MaxPeers), quit: make(chan struct{}), - relayNtfnChan: make(chan *btcutil.Tx, cfg.MaxPeers), modifyRebroadcastInv: make(chan interface{}), peerHeightsUpdate: make(chan updatePeerHeightsMsg), nat: nat, @@ -2539,7 +2520,6 @@ func newServer(listenAddrs []string, db database.DB, chainParams *chaincfg.Param }, FetchUtxoView: s.blockManager.chain.FetchUtxoView, Chain: s.blockManager.chain, - RelayNtfnChan: s.relayNtfnChan, SigCache: s.sigCache, TimeSource: s.timeSource, AddrIndex: s.addrIndex,