From 83bcfea271da8b0c890ede7ef374c5a74ec5aad7 Mon Sep 17 00:00:00 2001 From: David Hill Date: Fri, 20 Nov 2015 22:12:17 -0500 Subject: [PATCH] mempool: Introduce mempoolConfig. This is in preparation of moving mempool to its own subpackage. No functional change. --- mempool.go | 95 ++++++++++++++++++++++++++++++++++++------------------ server.go | 31 +++++++++++++++++- 2 files changed, 93 insertions(+), 33 deletions(-) diff --git a/mempool.go b/mempool.go index a059674d..a3360028 100644 --- a/mempool.go +++ b/mempool.go @@ -50,12 +50,51 @@ type mempoolTxDesc struct { StartingPriority float64 } +// mempoolConfig is a descriptor containing the memory pool configuration. +type mempoolConfig struct { + // DisableRelayPriority defines whether to relay free or low-fee + // transactions that do not have enough priority to be relayed. + DisableRelayPriority bool + + // EnableAddrIndex defines whether the address index should be enabled. + EnableAddrIndex bool + + // FetchTransactionStore defines the function to use to fetch + // transacation information. + FetchTransactionStore func(*btcutil.Tx, bool) (blockchain.TxStore, error) + + // FreeTxRelayLimit defines the given amount in thousands of bytes + // per minute that transactions with no fee are rate limited to. + FreeTxRelayLimit float64 + + // MaxOrphanTxs defines the maximum number of orphan transactions to + // keep in memory. + MaxOrphanTxs int + + // MinRelayTxFee defines the minimum transaction fee in BTC/kB to be + // considered a non-zero fee. + MinRelayTxFee btcutil.Amount + + // NewestSha defines the function to retrieve the newest sha + NewestSha func() (*wire.ShaHash, int32, error) + + // RelayNtfnChan defines the channel to send newly accepted transactions + // to. If unset or set to nil, notifications will not be sent. + RelayNtfnChan chan *btcutil.Tx + + // SigCache defines a signature cache to use. + SigCache *txscript.SigCache + + // TimeSource defines the timesource to use. + TimeSource blockchain.MedianTimeSource +} + // txMemPool is used as a source of transactions that need to be mined into // blocks and relayed to other peers. It is safe for concurrent access from // multiple peers. type txMemPool struct { sync.RWMutex - server *server + cfg mempoolConfig pool map[wire.ShaHash]*mempoolTxDesc orphans map[wire.ShaHash]*btcutil.Tx orphansByPrev map[wire.ShaHash]map[wire.ShaHash]*btcutil.Tx @@ -113,7 +152,7 @@ func (mp *txMemPool) RemoveOrphan(txHash *wire.ShaHash) { // // This function MUST be called with the mempool lock held (for writes). func (mp *txMemPool) limitNumOrphans() error { - if len(mp.orphans)+1 > cfg.MaxOrphanTxs && cfg.MaxOrphanTxs > 0 { + if len(mp.orphans)+1 > mp.cfg.MaxOrphanTxs && mp.cfg.MaxOrphanTxs > 0 { // Generate a cryptographically random hash. randHashBytes := make([]byte, wire.HashSize) _, err := rand.Read(randHashBytes) @@ -179,7 +218,7 @@ func (mp *txMemPool) maybeAddOrphan(tx *btcutil.Tx) error { // // Note that the number of orphan transactions in the orphan pool is // also limited, so this equates to a maximum memory used of - // maxOrphanTxSize * cfg.MaxOrphanTxs (which is ~5MB using the default + // maxOrphanTxSize * mp.cfg.MaxOrphanTxs (which is ~5MB using the default // values at the time this comment was written). serializedLen := tx.MsgTx().SerializeSize() if serializedLen > maxOrphanTxSize { @@ -282,7 +321,7 @@ func (mp *txMemPool) removeTransaction(tx *btcutil.Tx, removeRedeemers bool) { // Remove the transaction and mark the referenced outpoints as unspent // by the pool. if txDesc, exists := mp.pool[*txHash]; exists { - if cfg.AddrIndex { + if mp.cfg.EnableAddrIndex { mp.removeTransactionFromAddrIndex(tx) } @@ -394,7 +433,7 @@ func (mp *txMemPool) addTransaction(txStore blockchain.TxStore, tx *btcutil.Tx, } mp.lastUpdated = time.Now() - if cfg.AddrIndex { + if mp.cfg.EnableAddrIndex { mp.addTransactionToAddrIndex(tx) } } @@ -493,7 +532,7 @@ func (mp *txMemPool) checkPoolDoubleSpend(tx *btcutil.Tx) error { // // This function MUST be called with the mempool lock held (for reads). func (mp *txMemPool) fetchInputTransactions(tx *btcutil.Tx, includeSpent bool) (blockchain.TxStore, error) { - txStore, err := mp.server.blockManager.blockChain.FetchTransactionStore(tx, includeSpent) + txStore, err := mp.cfg.FetchTransactionStore(tx, includeSpent) if err != nil { return nil, err } @@ -599,7 +638,7 @@ func (mp *txMemPool) maybeAcceptTransaction(tx *btcutil.Tx, isNew, rateLimit boo // Get the current height of the main chain. A standalone transaction // will be mined into the next block at best, so it's height is at least // one more than the current height. - _, curHeight, err := mp.server.db.NewestSha() + _, curHeight, err := mp.cfg.NewestSha() if err != nil { // This is an unexpected error so don't turn it into a rule // error. @@ -611,7 +650,7 @@ func (mp *txMemPool) maybeAcceptTransaction(tx *btcutil.Tx, isNew, rateLimit boo // forbid their relaying. if !activeNetParams.RelayNonStdTxs { err := checkTransactionStandard(tx, nextBlockHeight, - mp.server.timeSource, cfg.minRelayTxFee) + mp.cfg.TimeSource, mp.cfg.MinRelayTxFee) if err != nil { // Attempt to extract a reject code from the error so // it can be retained. When not possible, fall back to @@ -742,7 +781,7 @@ func (mp *txMemPool) maybeAcceptTransaction(tx *btcutil.Tx, isNew, rateLimit boo // transaction does not exceeed 1000 less than the reserved space for // high-priority transactions, don't require a fee for it. serializedSize := int64(tx.MsgTx().SerializeSize()) - minFee := calcMinRequiredTxRelayFee(serializedSize, cfg.minRelayTxFee) + minFee := calcMinRequiredTxRelayFee(serializedSize, mp.cfg.MinRelayTxFee) if serializedSize >= (defaultBlockPrioritySize-1000) && txFee < minFee { str := fmt.Sprintf("transaction %v has %d fees which is under "+ "the required amount of %d", txHash, txFee, @@ -754,7 +793,7 @@ func (mp *txMemPool) maybeAcceptTransaction(tx *btcutil.Tx, isNew, rateLimit boo // in the next block. Transactions which are being added back to the // memory pool from blocks that have been disconnected during a reorg // are exempted. - if isNew && !cfg.NoRelayPriority && txFee < minFee { + if isNew && !mp.cfg.DisableRelayPriority && txFee < minFee { currentPriority := calcPriority(tx.MsgTx(), txStore, nextBlockHeight) if currentPriority <= minHighPriority { @@ -776,7 +815,7 @@ func (mp *txMemPool) maybeAcceptTransaction(tx *btcutil.Tx, isNew, rateLimit boo mp.lastPennyUnix = nowUnix // Are we still over the limit? - if mp.pennyTotal >= cfg.FreeTxRelayLimit*10*1000 { + if mp.pennyTotal >= mp.cfg.FreeTxRelayLimit*10*1000 { str := fmt.Sprintf("transaction %v has been rejected "+ "by the rate limiter due to low fees", txHash) return nil, txRuleError(wire.RejectInsufficientFee, str) @@ -786,13 +825,13 @@ func (mp *txMemPool) maybeAcceptTransaction(tx *btcutil.Tx, isNew, rateLimit boo mp.pennyTotal += float64(serializedSize) txmpLog.Tracef("rate limit: curTotal %v, nextTotal: %v, "+ "limit %v", oldTotal, mp.pennyTotal, - cfg.FreeTxRelayLimit*10*1000) + mp.cfg.FreeTxRelayLimit*10*1000) } // Verify crypto signatures for each input and reject the transaction if // any don't verify. err = blockchain.ValidateTransactionScripts(tx, txStore, - txscript.StandardVerifyFlags, mp.server.sigCache) + txscript.StandardVerifyFlags, mp.cfg.SigCache) if err != nil { if cerr, ok := err.(blockchain.RuleError); ok { return nil, chainRuleError(cerr) @@ -806,15 +845,6 @@ func (mp *txMemPool) maybeAcceptTransaction(tx *btcutil.Tx, isNew, rateLimit boo txmpLog.Debugf("Accepted transaction %v (pool size: %v)", txHash, len(mp.pool)) - if mp.server.rpcServer != nil { - // Notify websocket clients about mempool transactions. - mp.server.rpcServer.ntfnMgr.NotifyMempoolTx(tx, isNew) - - // Potentially notify any getblocktemplate long poll clients - // about stale block templates due to the new transaction. - mp.server.rpcServer.gbtWorkState.NotifyMempoolTx(mp.lastUpdated) - } - return nil, nil } @@ -897,10 +927,10 @@ func (mp *txMemPool) processOrphans(hash *wire.ShaHash) { continue } - // Generate and relay the inventory vector for the - // newly accepted transaction. - iv := wire.NewInvVect(wire.InvTypeTx, tx.Sha()) - mp.server.RelayInventory(iv, tx) + // Notify the caller of the new tx added to mempool. + if mp.cfg.RelayNtfnChan != nil { + mp.cfg.RelayNtfnChan <- tx + } // Add this transaction to the list of transactions to // process so any orphans that depend on this one are @@ -953,9 +983,10 @@ func (mp *txMemPool) ProcessTransaction(tx *btcutil.Tx, allowOrphan, rateLimit b } if len(missingParents) == 0 { - // Generate the inventory vector and relay it. - iv := wire.NewInvVect(wire.InvTypeTx, tx.Sha()) - mp.server.RelayInventory(iv, tx) + // Notify the caller that the tx was added to the mempool. + if mp.cfg.RelayNtfnChan != nil { + mp.cfg.RelayNtfnChan <- tx + } // Accept any orphan transactions that depend on this // transaction (they may no longer be orphans if all inputs @@ -1071,15 +1102,15 @@ func (mp *txMemPool) LastUpdated() time.Time { // newTxMemPool returns a new memory pool for validating and storing standalone // transactions until they are mined into a block. -func newTxMemPool(server *server) *txMemPool { +func newTxMemPool(cfg *mempoolConfig) *txMemPool { memPool := &txMemPool{ - server: server, + cfg: *cfg, pool: make(map[wire.ShaHash]*mempoolTxDesc), orphans: make(map[wire.ShaHash]*btcutil.Tx), orphansByPrev: make(map[wire.ShaHash]map[wire.ShaHash]*btcutil.Tx), outpoints: make(map[wire.OutPoint]*btcutil.Tx), } - if cfg.AddrIndex { + if cfg.EnableAddrIndex { memPool.addrindex = make(map[string]map[wire.ShaHash]struct{}) } return memPool diff --git a/server.go b/server.go index 585a8f69..ff1fab27 100644 --- a/server.go +++ b/server.go @@ -180,6 +180,7 @@ type server struct { addrIndexer *addrIndexer txMemPool *txMemPool cpuMiner *CPUMiner + relayNtfnChan chan *btcutil.Tx modifyRebroadcastInv chan interface{} pendingPeers chan *serverPeer newPeers chan *serverPeer @@ -1899,6 +1900,20 @@ func (s *server) rebroadcastHandler() { out: for { select { + case tx := <-s.relayNtfnChan: + // Generate an inv and relay it. + inv := wire.NewInvVect(wire.InvTypeTx, tx.Sha()) + s.RelayInventory(inv, tx) + + if s.rpcServer != nil { + // Notify websocket clients about mempool transactions. + s.rpcServer.ntfnMgr.NotifyMempoolTx(tx, true) + + // Potentially notify any getblocktemplate long poll clients + // about stale block templates due to the new transaction. + s.rpcServer.gbtWorkState.NotifyMempoolTx(s.txMemPool.LastUpdated()) + } + case riv := <-s.modifyRebroadcastInv: switch msg := riv.(type) { // Incoming InvVects are added to our map of RPC txs. @@ -2322,6 +2337,7 @@ func newServer(listenAddrs []string, db database.Db, chainParams *chaincfg.Param relayInv: make(chan relayMsg, cfg.MaxPeers), broadcast: make(chan broadcastMsg, cfg.MaxPeers), quit: make(chan struct{}), + relayNtfnChan: make(chan *btcutil.Tx, cfg.MaxPeers), modifyRebroadcastInv: make(chan interface{}), peerHeightsUpdate: make(chan updatePeerHeightsMsg), nat: nat, @@ -2335,7 +2351,6 @@ func newServer(listenAddrs []string, db database.Db, chainParams *chaincfg.Param return nil, err } s.blockManager = bm - s.txMemPool = newTxMemPool(&s) // Create the mining policy based on the configuration options. policy := miningPolicy{ @@ -2346,6 +2361,20 @@ func newServer(listenAddrs []string, db database.Db, chainParams *chaincfg.Param } s.cpuMiner = newCPUMiner(&policy, &s) + txC := mempoolConfig{ + DisableRelayPriority: cfg.NoRelayPriority, + EnableAddrIndex: cfg.AddrIndex, + FetchTransactionStore: s.blockManager.blockChain.FetchTransactionStore, + FreeTxRelayLimit: cfg.FreeTxRelayLimit, + MaxOrphanTxs: cfg.MaxOrphanTxs, + MinRelayTxFee: cfg.minRelayTxFee, + NewestSha: s.db.NewestSha, + RelayNtfnChan: s.relayNtfnChan, + SigCache: s.sigCache, + TimeSource: s.timeSource, + } + s.txMemPool = newTxMemPool(&txC) + if cfg.AddrIndex { ai, err := newAddrIndexer(&s) if err != nil {