mempool: Introduce mempoolConfig.

This is in preparation of moving mempool to its own subpackage.  No
functional change.
This commit is contained in:
David Hill 2015-11-20 22:12:17 -05:00
parent 2b6a9a56e5
commit 83bcfea271
2 changed files with 93 additions and 33 deletions

View file

@ -50,12 +50,51 @@ type mempoolTxDesc struct {
StartingPriority float64 StartingPriority float64
} }
// mempoolConfig is a descriptor containing the memory pool configuration.
type mempoolConfig struct {
// DisableRelayPriority defines whether to relay free or low-fee
// transactions that do not have enough priority to be relayed.
DisableRelayPriority bool
// EnableAddrIndex defines whether the address index should be enabled.
EnableAddrIndex bool
// FetchTransactionStore defines the function to use to fetch
// transacation information.
FetchTransactionStore func(*btcutil.Tx, bool) (blockchain.TxStore, error)
// FreeTxRelayLimit defines the given amount in thousands of bytes
// per minute that transactions with no fee are rate limited to.
FreeTxRelayLimit float64
// MaxOrphanTxs defines the maximum number of orphan transactions to
// keep in memory.
MaxOrphanTxs int
// MinRelayTxFee defines the minimum transaction fee in BTC/kB to be
// considered a non-zero fee.
MinRelayTxFee btcutil.Amount
// NewestSha defines the function to retrieve the newest sha
NewestSha func() (*wire.ShaHash, int32, error)
// RelayNtfnChan defines the channel to send newly accepted transactions
// to. If unset or set to nil, notifications will not be sent.
RelayNtfnChan chan *btcutil.Tx
// SigCache defines a signature cache to use.
SigCache *txscript.SigCache
// TimeSource defines the timesource to use.
TimeSource blockchain.MedianTimeSource
}
// txMemPool is used as a source of transactions that need to be mined into // txMemPool is used as a source of transactions that need to be mined into
// blocks and relayed to other peers. It is safe for concurrent access from // blocks and relayed to other peers. It is safe for concurrent access from
// multiple peers. // multiple peers.
type txMemPool struct { type txMemPool struct {
sync.RWMutex sync.RWMutex
server *server cfg mempoolConfig
pool map[wire.ShaHash]*mempoolTxDesc pool map[wire.ShaHash]*mempoolTxDesc
orphans map[wire.ShaHash]*btcutil.Tx orphans map[wire.ShaHash]*btcutil.Tx
orphansByPrev map[wire.ShaHash]map[wire.ShaHash]*btcutil.Tx orphansByPrev map[wire.ShaHash]map[wire.ShaHash]*btcutil.Tx
@ -113,7 +152,7 @@ func (mp *txMemPool) RemoveOrphan(txHash *wire.ShaHash) {
// //
// This function MUST be called with the mempool lock held (for writes). // This function MUST be called with the mempool lock held (for writes).
func (mp *txMemPool) limitNumOrphans() error { func (mp *txMemPool) limitNumOrphans() error {
if len(mp.orphans)+1 > cfg.MaxOrphanTxs && cfg.MaxOrphanTxs > 0 { if len(mp.orphans)+1 > mp.cfg.MaxOrphanTxs && mp.cfg.MaxOrphanTxs > 0 {
// Generate a cryptographically random hash. // Generate a cryptographically random hash.
randHashBytes := make([]byte, wire.HashSize) randHashBytes := make([]byte, wire.HashSize)
_, err := rand.Read(randHashBytes) _, err := rand.Read(randHashBytes)
@ -179,7 +218,7 @@ func (mp *txMemPool) maybeAddOrphan(tx *btcutil.Tx) error {
// //
// Note that the number of orphan transactions in the orphan pool is // Note that the number of orphan transactions in the orphan pool is
// also limited, so this equates to a maximum memory used of // also limited, so this equates to a maximum memory used of
// maxOrphanTxSize * cfg.MaxOrphanTxs (which is ~5MB using the default // maxOrphanTxSize * mp.cfg.MaxOrphanTxs (which is ~5MB using the default
// values at the time this comment was written). // values at the time this comment was written).
serializedLen := tx.MsgTx().SerializeSize() serializedLen := tx.MsgTx().SerializeSize()
if serializedLen > maxOrphanTxSize { if serializedLen > maxOrphanTxSize {
@ -282,7 +321,7 @@ func (mp *txMemPool) removeTransaction(tx *btcutil.Tx, removeRedeemers bool) {
// Remove the transaction and mark the referenced outpoints as unspent // Remove the transaction and mark the referenced outpoints as unspent
// by the pool. // by the pool.
if txDesc, exists := mp.pool[*txHash]; exists { if txDesc, exists := mp.pool[*txHash]; exists {
if cfg.AddrIndex { if mp.cfg.EnableAddrIndex {
mp.removeTransactionFromAddrIndex(tx) mp.removeTransactionFromAddrIndex(tx)
} }
@ -394,7 +433,7 @@ func (mp *txMemPool) addTransaction(txStore blockchain.TxStore, tx *btcutil.Tx,
} }
mp.lastUpdated = time.Now() mp.lastUpdated = time.Now()
if cfg.AddrIndex { if mp.cfg.EnableAddrIndex {
mp.addTransactionToAddrIndex(tx) mp.addTransactionToAddrIndex(tx)
} }
} }
@ -493,7 +532,7 @@ func (mp *txMemPool) checkPoolDoubleSpend(tx *btcutil.Tx) error {
// //
// This function MUST be called with the mempool lock held (for reads). // This function MUST be called with the mempool lock held (for reads).
func (mp *txMemPool) fetchInputTransactions(tx *btcutil.Tx, includeSpent bool) (blockchain.TxStore, error) { func (mp *txMemPool) fetchInputTransactions(tx *btcutil.Tx, includeSpent bool) (blockchain.TxStore, error) {
txStore, err := mp.server.blockManager.blockChain.FetchTransactionStore(tx, includeSpent) txStore, err := mp.cfg.FetchTransactionStore(tx, includeSpent)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -599,7 +638,7 @@ func (mp *txMemPool) maybeAcceptTransaction(tx *btcutil.Tx, isNew, rateLimit boo
// Get the current height of the main chain. A standalone transaction // Get the current height of the main chain. A standalone transaction
// will be mined into the next block at best, so it's height is at least // will be mined into the next block at best, so it's height is at least
// one more than the current height. // one more than the current height.
_, curHeight, err := mp.server.db.NewestSha() _, curHeight, err := mp.cfg.NewestSha()
if err != nil { if err != nil {
// This is an unexpected error so don't turn it into a rule // This is an unexpected error so don't turn it into a rule
// error. // error.
@ -611,7 +650,7 @@ func (mp *txMemPool) maybeAcceptTransaction(tx *btcutil.Tx, isNew, rateLimit boo
// forbid their relaying. // forbid their relaying.
if !activeNetParams.RelayNonStdTxs { if !activeNetParams.RelayNonStdTxs {
err := checkTransactionStandard(tx, nextBlockHeight, err := checkTransactionStandard(tx, nextBlockHeight,
mp.server.timeSource, cfg.minRelayTxFee) mp.cfg.TimeSource, mp.cfg.MinRelayTxFee)
if err != nil { if err != nil {
// Attempt to extract a reject code from the error so // Attempt to extract a reject code from the error so
// it can be retained. When not possible, fall back to // it can be retained. When not possible, fall back to
@ -742,7 +781,7 @@ func (mp *txMemPool) maybeAcceptTransaction(tx *btcutil.Tx, isNew, rateLimit boo
// transaction does not exceeed 1000 less than the reserved space for // transaction does not exceeed 1000 less than the reserved space for
// high-priority transactions, don't require a fee for it. // high-priority transactions, don't require a fee for it.
serializedSize := int64(tx.MsgTx().SerializeSize()) serializedSize := int64(tx.MsgTx().SerializeSize())
minFee := calcMinRequiredTxRelayFee(serializedSize, cfg.minRelayTxFee) minFee := calcMinRequiredTxRelayFee(serializedSize, mp.cfg.MinRelayTxFee)
if serializedSize >= (defaultBlockPrioritySize-1000) && txFee < minFee { if serializedSize >= (defaultBlockPrioritySize-1000) && txFee < minFee {
str := fmt.Sprintf("transaction %v has %d fees which is under "+ str := fmt.Sprintf("transaction %v has %d fees which is under "+
"the required amount of %d", txHash, txFee, "the required amount of %d", txHash, txFee,
@ -754,7 +793,7 @@ func (mp *txMemPool) maybeAcceptTransaction(tx *btcutil.Tx, isNew, rateLimit boo
// in the next block. Transactions which are being added back to the // in the next block. Transactions which are being added back to the
// memory pool from blocks that have been disconnected during a reorg // memory pool from blocks that have been disconnected during a reorg
// are exempted. // are exempted.
if isNew && !cfg.NoRelayPriority && txFee < minFee { if isNew && !mp.cfg.DisableRelayPriority && txFee < minFee {
currentPriority := calcPriority(tx.MsgTx(), txStore, currentPriority := calcPriority(tx.MsgTx(), txStore,
nextBlockHeight) nextBlockHeight)
if currentPriority <= minHighPriority { if currentPriority <= minHighPriority {
@ -776,7 +815,7 @@ func (mp *txMemPool) maybeAcceptTransaction(tx *btcutil.Tx, isNew, rateLimit boo
mp.lastPennyUnix = nowUnix mp.lastPennyUnix = nowUnix
// Are we still over the limit? // Are we still over the limit?
if mp.pennyTotal >= cfg.FreeTxRelayLimit*10*1000 { if mp.pennyTotal >= mp.cfg.FreeTxRelayLimit*10*1000 {
str := fmt.Sprintf("transaction %v has been rejected "+ str := fmt.Sprintf("transaction %v has been rejected "+
"by the rate limiter due to low fees", txHash) "by the rate limiter due to low fees", txHash)
return nil, txRuleError(wire.RejectInsufficientFee, str) return nil, txRuleError(wire.RejectInsufficientFee, str)
@ -786,13 +825,13 @@ func (mp *txMemPool) maybeAcceptTransaction(tx *btcutil.Tx, isNew, rateLimit boo
mp.pennyTotal += float64(serializedSize) mp.pennyTotal += float64(serializedSize)
txmpLog.Tracef("rate limit: curTotal %v, nextTotal: %v, "+ txmpLog.Tracef("rate limit: curTotal %v, nextTotal: %v, "+
"limit %v", oldTotal, mp.pennyTotal, "limit %v", oldTotal, mp.pennyTotal,
cfg.FreeTxRelayLimit*10*1000) mp.cfg.FreeTxRelayLimit*10*1000)
} }
// Verify crypto signatures for each input and reject the transaction if // Verify crypto signatures for each input and reject the transaction if
// any don't verify. // any don't verify.
err = blockchain.ValidateTransactionScripts(tx, txStore, err = blockchain.ValidateTransactionScripts(tx, txStore,
txscript.StandardVerifyFlags, mp.server.sigCache) txscript.StandardVerifyFlags, mp.cfg.SigCache)
if err != nil { if err != nil {
if cerr, ok := err.(blockchain.RuleError); ok { if cerr, ok := err.(blockchain.RuleError); ok {
return nil, chainRuleError(cerr) return nil, chainRuleError(cerr)
@ -806,15 +845,6 @@ func (mp *txMemPool) maybeAcceptTransaction(tx *btcutil.Tx, isNew, rateLimit boo
txmpLog.Debugf("Accepted transaction %v (pool size: %v)", txHash, txmpLog.Debugf("Accepted transaction %v (pool size: %v)", txHash,
len(mp.pool)) len(mp.pool))
if mp.server.rpcServer != nil {
// Notify websocket clients about mempool transactions.
mp.server.rpcServer.ntfnMgr.NotifyMempoolTx(tx, isNew)
// Potentially notify any getblocktemplate long poll clients
// about stale block templates due to the new transaction.
mp.server.rpcServer.gbtWorkState.NotifyMempoolTx(mp.lastUpdated)
}
return nil, nil return nil, nil
} }
@ -897,10 +927,10 @@ func (mp *txMemPool) processOrphans(hash *wire.ShaHash) {
continue continue
} }
// Generate and relay the inventory vector for the // Notify the caller of the new tx added to mempool.
// newly accepted transaction. if mp.cfg.RelayNtfnChan != nil {
iv := wire.NewInvVect(wire.InvTypeTx, tx.Sha()) mp.cfg.RelayNtfnChan <- tx
mp.server.RelayInventory(iv, tx) }
// Add this transaction to the list of transactions to // Add this transaction to the list of transactions to
// process so any orphans that depend on this one are // process so any orphans that depend on this one are
@ -953,9 +983,10 @@ func (mp *txMemPool) ProcessTransaction(tx *btcutil.Tx, allowOrphan, rateLimit b
} }
if len(missingParents) == 0 { if len(missingParents) == 0 {
// Generate the inventory vector and relay it. // Notify the caller that the tx was added to the mempool.
iv := wire.NewInvVect(wire.InvTypeTx, tx.Sha()) if mp.cfg.RelayNtfnChan != nil {
mp.server.RelayInventory(iv, tx) mp.cfg.RelayNtfnChan <- tx
}
// Accept any orphan transactions that depend on this // Accept any orphan transactions that depend on this
// transaction (they may no longer be orphans if all inputs // transaction (they may no longer be orphans if all inputs
@ -1071,15 +1102,15 @@ func (mp *txMemPool) LastUpdated() time.Time {
// newTxMemPool returns a new memory pool for validating and storing standalone // newTxMemPool returns a new memory pool for validating and storing standalone
// transactions until they are mined into a block. // transactions until they are mined into a block.
func newTxMemPool(server *server) *txMemPool { func newTxMemPool(cfg *mempoolConfig) *txMemPool {
memPool := &txMemPool{ memPool := &txMemPool{
server: server, cfg: *cfg,
pool: make(map[wire.ShaHash]*mempoolTxDesc), pool: make(map[wire.ShaHash]*mempoolTxDesc),
orphans: make(map[wire.ShaHash]*btcutil.Tx), orphans: make(map[wire.ShaHash]*btcutil.Tx),
orphansByPrev: make(map[wire.ShaHash]map[wire.ShaHash]*btcutil.Tx), orphansByPrev: make(map[wire.ShaHash]map[wire.ShaHash]*btcutil.Tx),
outpoints: make(map[wire.OutPoint]*btcutil.Tx), outpoints: make(map[wire.OutPoint]*btcutil.Tx),
} }
if cfg.AddrIndex { if cfg.EnableAddrIndex {
memPool.addrindex = make(map[string]map[wire.ShaHash]struct{}) memPool.addrindex = make(map[string]map[wire.ShaHash]struct{})
} }
return memPool return memPool

View file

@ -180,6 +180,7 @@ type server struct {
addrIndexer *addrIndexer addrIndexer *addrIndexer
txMemPool *txMemPool txMemPool *txMemPool
cpuMiner *CPUMiner cpuMiner *CPUMiner
relayNtfnChan chan *btcutil.Tx
modifyRebroadcastInv chan interface{} modifyRebroadcastInv chan interface{}
pendingPeers chan *serverPeer pendingPeers chan *serverPeer
newPeers chan *serverPeer newPeers chan *serverPeer
@ -1899,6 +1900,20 @@ func (s *server) rebroadcastHandler() {
out: out:
for { for {
select { select {
case tx := <-s.relayNtfnChan:
// Generate an inv and relay it.
inv := wire.NewInvVect(wire.InvTypeTx, tx.Sha())
s.RelayInventory(inv, tx)
if s.rpcServer != nil {
// Notify websocket clients about mempool transactions.
s.rpcServer.ntfnMgr.NotifyMempoolTx(tx, true)
// Potentially notify any getblocktemplate long poll clients
// about stale block templates due to the new transaction.
s.rpcServer.gbtWorkState.NotifyMempoolTx(s.txMemPool.LastUpdated())
}
case riv := <-s.modifyRebroadcastInv: case riv := <-s.modifyRebroadcastInv:
switch msg := riv.(type) { switch msg := riv.(type) {
// Incoming InvVects are added to our map of RPC txs. // Incoming InvVects are added to our map of RPC txs.
@ -2322,6 +2337,7 @@ func newServer(listenAddrs []string, db database.Db, chainParams *chaincfg.Param
relayInv: make(chan relayMsg, cfg.MaxPeers), relayInv: make(chan relayMsg, cfg.MaxPeers),
broadcast: make(chan broadcastMsg, cfg.MaxPeers), broadcast: make(chan broadcastMsg, cfg.MaxPeers),
quit: make(chan struct{}), quit: make(chan struct{}),
relayNtfnChan: make(chan *btcutil.Tx, cfg.MaxPeers),
modifyRebroadcastInv: make(chan interface{}), modifyRebroadcastInv: make(chan interface{}),
peerHeightsUpdate: make(chan updatePeerHeightsMsg), peerHeightsUpdate: make(chan updatePeerHeightsMsg),
nat: nat, nat: nat,
@ -2335,7 +2351,6 @@ func newServer(listenAddrs []string, db database.Db, chainParams *chaincfg.Param
return nil, err return nil, err
} }
s.blockManager = bm s.blockManager = bm
s.txMemPool = newTxMemPool(&s)
// Create the mining policy based on the configuration options. // Create the mining policy based on the configuration options.
policy := miningPolicy{ policy := miningPolicy{
@ -2346,6 +2361,20 @@ func newServer(listenAddrs []string, db database.Db, chainParams *chaincfg.Param
} }
s.cpuMiner = newCPUMiner(&policy, &s) s.cpuMiner = newCPUMiner(&policy, &s)
txC := mempoolConfig{
DisableRelayPriority: cfg.NoRelayPriority,
EnableAddrIndex: cfg.AddrIndex,
FetchTransactionStore: s.blockManager.blockChain.FetchTransactionStore,
FreeTxRelayLimit: cfg.FreeTxRelayLimit,
MaxOrphanTxs: cfg.MaxOrphanTxs,
MinRelayTxFee: cfg.minRelayTxFee,
NewestSha: s.db.NewestSha,
RelayNtfnChan: s.relayNtfnChan,
SigCache: s.sigCache,
TimeSource: s.timeSource,
}
s.txMemPool = newTxMemPool(&txC)
if cfg.AddrIndex { if cfg.AddrIndex {
ai, err := newAddrIndexer(&s) ai, err := newAddrIndexer(&s)
if err != nil { if err != nil {