mempool/mining: Introduce TxSource interface.

This introduces the concept of a new interface named TxSource which aims
to generically provide a concurrent safe source of transactions to be
considered for inclusion in a new block.  This is a step towards
decoupling the mining code from the internals of btcd.  Ultimately the
intent is to create a separate mining package.

The new TxSource interface relies on a new struct named miningTxDesc,
which describes each entry in the transaction source.  Once this code is
refactored into a separate mining package, the mining prefix can simply
be dropped leaving the type exported as mining.TxDesc.

To go along with this, the existing TxDesc type in the mempool has been
renamed to mempoolTxDesc and changed to embed the new miningTxDesc type.
This allows the mempool to efficiently implement the MiningTxDescs
method needed to satisfy the TxSource interface.

This approach effectively separates the direct reliance of the mining
code on the mempool and its data structures.  Even though the memory
pool will still be the default concrete implementation of the interface,
making it an interface offers much more flexibility in terms of testing
and even provides the potential to allow more than one source (perhaps
multiple independent relay networks, for example).

Finally, the memory pool and all of the mining code has been updated to
implement and use the new interface.
This commit is contained in:
Dave Collins 2015-11-25 13:30:44 -06:00
parent 8ab565ce21
commit 2b6a9a56e5
4 changed files with 111 additions and 49 deletions

View file

@ -53,6 +53,7 @@ var (
type CPUMiner struct {
sync.Mutex
policy *miningPolicy
txSource TxSource
server *server
numWorkers uint32
started bool
@ -184,7 +185,7 @@ func (m *CPUMiner) solveBlock(msgBlock *wire.MsgBlock, blockHeight int32,
// Initial state.
lastGenerated := time.Now()
lastTxUpdate := m.server.txMemPool.LastUpdated()
lastTxUpdate := m.txSource.LastUpdated()
hashesCompleted := uint64(0)
// Note that the entire extra nonce range is iterated and the offset is
@ -219,7 +220,7 @@ func (m *CPUMiner) solveBlock(msgBlock *wire.MsgBlock, blockHeight int32,
// has been updated since the block template was
// generated and it has been at least one
// minute.
if lastTxUpdate != m.server.txMemPool.LastUpdated() &&
if lastTxUpdate != m.txSource.LastUpdated() &&
time.Now().After(lastGenerated.Add(time.Minute)) {
return false
@ -603,6 +604,7 @@ func (m *CPUMiner) GenerateNBlocks(n uint32) ([]*wire.ShaHash, error) {
func newCPUMiner(policy *miningPolicy, s *server) *CPUMiner {
return &CPUMiner{
policy: policy,
txSource: s.txMemPool,
server: s,
numWorkers: defaultNumWorkers,
updateNumWorkers: make(chan struct{}),

View file

@ -40,14 +40,14 @@ const (
maxSigOpsPerTx = blockchain.MaxSigOpsPerBlock / 5
)
// TxDesc is a descriptor containing a transaction in the mempool and the
// metadata we store about it.
type TxDesc struct {
Tx *btcutil.Tx // Transaction.
Added time.Time // Time when added to pool.
Height int32 // Blockheight when added to pool.
Fee int64 // Transaction fees.
StartingPriority float64 // Priority when added to the pool.
// mempoolTxDesc is a descriptor containing a transaction in the mempool along
// with additional metadata.
type mempoolTxDesc struct {
miningTxDesc
// StartingPriority is the priority of the transaction when it was added
// to the pool.
StartingPriority float64
}
// txMemPool is used as a source of transactions that need to be mined into
@ -56,7 +56,7 @@ type TxDesc struct {
type txMemPool struct {
sync.RWMutex
server *server
pool map[wire.ShaHash]*TxDesc
pool map[wire.ShaHash]*mempoolTxDesc
orphans map[wire.ShaHash]*btcutil.Tx
orphansByPrev map[wire.ShaHash]map[wire.ShaHash]*btcutil.Tx
addrindex map[string]map[wire.ShaHash]struct{} // maps address to txs
@ -66,6 +66,9 @@ type txMemPool struct {
lastPennyUnix int64 // unix time of last ``penny spend''
}
// Ensure the txMemPool type implements the mining.TxSource interface.
var _ TxSource = (*txMemPool)(nil)
// removeOrphan is the internal function which implements the public
// RemoveOrphan. See the comment for RemoveOrphan for more details.
//
@ -377,11 +380,13 @@ func (mp *txMemPool) RemoveDoubleSpends(tx *btcutil.Tx) {
func (mp *txMemPool) addTransaction(txStore blockchain.TxStore, tx *btcutil.Tx, height int32, fee int64) {
// Add the transaction to the pool and mark the referenced outpoints
// as spent by the pool.
mp.pool[*tx.Sha()] = &TxDesc{
mp.pool[*tx.Sha()] = &mempoolTxDesc{
miningTxDesc: miningTxDesc{
Tx: tx,
Added: time.Now(),
Height: height,
Fee: fee,
},
StartingPriority: calcPriority(tx.MsgTx(), txStore, height),
}
for _, txIn := range tx.MsgTx().TxIn {
@ -537,8 +542,8 @@ func (mp *txMemPool) FilterTransactionsByAddress(addr btcutil.Address) ([]*btcut
if txs, exists := mp.addrindex[addr.EncodeAddress()]; exists {
addressTxs := make([]*btcutil.Tx, 0, len(txs))
for txHash := range txs {
if tx, exists := mp.pool[txHash]; exists {
addressTxs = append(addressTxs, tx.Tx)
if txD, exists := mp.pool[txHash]; exists {
addressTxs = append(addressTxs, txD.Tx)
}
}
return addressTxs, nil
@ -1020,11 +1025,11 @@ func (mp *txMemPool) TxShas() []*wire.ShaHash {
// The descriptors are to be treated as read only.
//
// This function is safe for concurrent access.
func (mp *txMemPool) TxDescs() []*TxDesc {
func (mp *txMemPool) TxDescs() []*mempoolTxDesc {
mp.RLock()
defer mp.RUnlock()
descs := make([]*TxDesc, len(mp.pool))
descs := make([]*mempoolTxDesc, len(mp.pool))
i := 0
for _, desc := range mp.pool {
descs[i] = desc
@ -1034,6 +1039,25 @@ func (mp *txMemPool) TxDescs() []*TxDesc {
return descs
}
// MiningDescs returns a slice of mining descriptors for all the transactions
// in the pool.
//
// This is part of the TxSource interface implementation and is safe for
// concurrent access as required by the interface contract.
func (mp *txMemPool) MiningDescs() []*miningTxDesc {
mp.RLock()
defer mp.RUnlock()
descs := make([]*miningTxDesc, len(mp.pool))
i := 0
for _, desc := range mp.pool {
descs[i] = &desc.miningTxDesc
i++
}
return descs
}
// LastUpdated returns the last time a transaction was added to or removed from
// the main pool. It does not include the orphan pool.
//
@ -1050,7 +1074,7 @@ func (mp *txMemPool) LastUpdated() time.Time {
func newTxMemPool(server *server) *txMemPool {
memPool := &txMemPool{
server: server,
pool: make(map[wire.ShaHash]*TxDesc),
pool: make(map[wire.ShaHash]*mempoolTxDesc),
orphans: make(map[wire.ShaHash]*btcutil.Tx),
orphansByPrev: make(map[wire.ShaHash]map[wire.ShaHash]*btcutil.Tx),
outpoints: make(map[wire.OutPoint]*btcutil.Tx),

View file

@ -40,6 +40,42 @@ const (
coinbaseFlags = "/P2SH/btcd/"
)
// miningTxDesc is a descriptor about a transaction in a transaction source
// along with additional metadata.
type miningTxDesc struct {
// Tx is the transaction associated with the entry.
Tx *btcutil.Tx
// Added is the time when the entry was added to the source pool.
Added time.Time
// Height is the block height when the entry was added to the the source
// pool.
Height int32
// Fee is the total fee the transaction associated with the entry pays.
Fee int64
}
// TxSource represents a source of transactions to consider for inclusion in
// new blocks.
//
// The interface contract requires that all of these methods are safe for
// concurrent access with respect to the source.
type TxSource interface {
// LastUpdated returns the last time a transaction was added to or
// removed from the source pool.
LastUpdated() time.Time
// MiningDescs returns a slice of mining descriptors for all the
// transactions in the source pool.
MiningDescs() []*miningTxDesc
// HaveTransaction returns whether or not the passed transaction hash
// exists in the source pool.
HaveTransaction(hash *wire.ShaHash) bool
}
// miningPolicy houses the policy (configuration parameters) which is used to
// control the generation of block templates. See the documentation for
// NewBlockTemplate for more details on each of these parameters are used.
@ -73,7 +109,7 @@ type txPrioItem struct {
// dependsOn holds a map of transaction hashes which this one depends
// on. It will only be set when the transaction references other
// transactions in the memory pool and hence must come after them in
// transactions in the source pool and hence must come after them in
// a block.
dependsOn map[wire.ShaHash]struct{}
}
@ -328,7 +364,7 @@ func medianAdjustedTime(chainState *chainState, timeSource blockchain.MedianTime
}
// NewBlockTemplate returns a new block template that is ready to be solved
// using the transactions from the passed transaction memory pool and a coinbase
// using the transactions from the passed transaction source pool and a coinbase
// that either pays to the passed address if it is not nil, or a coinbase that
// is redeemable by anyone if the passed address is nil. The nil address
// functionality is useful since there are cases such as the getblocktemplate
@ -349,7 +385,7 @@ func medianAdjustedTime(chainState *chainState, timeSource blockchain.MedianTime
// prioritizes based on the priority (then fee per kilobyte) or the fee per
// kilobyte (then priority) depending on whether or not the BlockPrioritySize
// policy setting allots space for high-priority transactions. Transactions
// which spend outputs from other transactions in the memory pool are added to a
// which spend outputs from other transactions in the source pool are added to a
// dependency map so they can be added to the priority queue once the
// transactions they depend on have been included.
//
@ -390,6 +426,7 @@ func medianAdjustedTime(chainState *chainState, timeSource blockchain.MedianTime
// | <= policy.BlockMinSize) | |
// ----------------------------------- --
func NewBlockTemplate(policy *miningPolicy, server *server, payToAddress btcutil.Address) (*BlockTemplate, error) {
var txSource TxSource = server.txMemPool
blockManager := server.blockManager
timeSource := server.timeSource
chainState := &blockManager.chainState
@ -420,27 +457,26 @@ func NewBlockTemplate(policy *miningPolicy, server *server, payToAddress btcutil
}
numCoinbaseSigOps := int64(blockchain.CountSigOps(coinbaseTx))
// Get the current memory pool transactions and create a priority queue
// to hold the transactions which are ready for inclusion into a block
// Get the current source transactions and create a priority queue to
// hold the transactions which are ready for inclusion into a block
// along with some priority related and fee metadata. Reserve the same
// number of items that are in the memory pool for the priority queue.
// Also, choose the initial sort order for the priority queue based on
// whether or not there is an area allocated for high-priority
// transactions.
mempoolTxns := server.txMemPool.TxDescs()
// number of items that are available for the priority queue. Also,
// choose the initial sort order for the priority queue based on whether
// or not there is an area allocated for high-priority transactions.
sourceTxns := txSource.MiningDescs()
sortedByFee := policy.BlockPrioritySize == 0
priorityQueue := newTxPriorityQueue(len(mempoolTxns), sortedByFee)
priorityQueue := newTxPriorityQueue(len(sourceTxns), sortedByFee)
// Create a slice to hold the transactions to be included in the
// generated block with reserved space. Also create a transaction
// store to house all of the input transactions so multiple lookups
// can be avoided.
blockTxns := make([]*btcutil.Tx, 0, len(mempoolTxns))
blockTxns := make([]*btcutil.Tx, 0, len(sourceTxns))
blockTxns = append(blockTxns, coinbaseTx)
blockTxStore := make(blockchain.TxStore)
// dependers is used to track transactions which depend on another
// transaction in the memory pool. This, in conjunction with the
// transaction in the source pool. This, in conjunction with the
// dependsOn map kept with each dependent transaction helps quickly
// determine which dependent transactions are now eligible for inclusion
// in the block once each transaction has been included.
@ -452,16 +488,16 @@ func NewBlockTemplate(policy *miningPolicy, server *server, payToAddress btcutil
// a transaction as it is selected for inclusion in the final block.
// However, since the total fees aren't known yet, use a dummy value for
// the coinbase fee which will be updated later.
txFees := make([]int64, 0, len(mempoolTxns))
txSigOpCounts := make([]int64, 0, len(mempoolTxns))
txFees := make([]int64, 0, len(sourceTxns))
txSigOpCounts := make([]int64, 0, len(sourceTxns))
txFees = append(txFees, -1) // Updated once known
txSigOpCounts = append(txSigOpCounts, numCoinbaseSigOps)
minrLog.Debugf("Considering %d mempool transactions for inclusion to "+
"new block", len(mempoolTxns))
minrLog.Debugf("Considering %d transactions for inclusion to new block",
len(sourceTxns))
mempoolLoop:
for _, txDesc := range mempoolTxns {
for _, txDesc := range sourceTxns {
// A block can't have more than one coinbase or contain
// non-finalized transactions.
tx := txDesc.Tx
@ -491,13 +527,13 @@ mempoolLoop:
// Setup dependencies for any transactions which reference
// other transactions in the mempool so they can be properly
// ordered below.
prioItem := &txPrioItem{tx: txDesc.Tx}
prioItem := &txPrioItem{tx: tx}
for _, txIn := range tx.MsgTx().TxIn {
originHash := &txIn.PreviousOutPoint.Hash
originIndex := txIn.PreviousOutPoint.Index
txData, exists := txStore[*originHash]
if !exists || txData.Err != nil || txData.Tx == nil {
if !server.txMemPool.HaveTransaction(originHash) {
if !txSource.HaveTransaction(originHash) {
minrLog.Tracef("Skipping tx %s because "+
"it references tx %s which is "+
"not available", tx.Sha,
@ -506,7 +542,7 @@ mempoolLoop:
}
// The transaction is referencing another
// transaction in the memory pool, so setup an
// transaction in the source pool, so setup an
// ordering dependency.
depList, exists := dependers[*originHash]
if !exists {

View file

@ -2180,15 +2180,15 @@ func handleGetInfo(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (in
// handleGetMempoolInfo implements the getmempoolinfo command.
func handleGetMempoolInfo(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (interface{}, error) {
txD := s.server.txMemPool.TxDescs()
mempoolTxns := s.server.txMemPool.TxDescs()
var numBytes int64
for _, desc := range txD {
numBytes += int64(desc.Tx.MsgTx().SerializeSize())
for _, txD := range mempoolTxns {
numBytes += int64(txD.Tx.MsgTx().SerializeSize())
}
ret := &btcjson.GetMempoolInfoResult{
Size: int64(len(txD)),
Size: int64(len(mempoolTxns)),
Bytes: numBytes,
}
@ -2416,7 +2416,7 @@ func handleGetRawMempool(s *rpcServer, cmd interface{}, closeChan <-chan struct{
}
mpd := &btcjson.GetRawMempoolVerboseResult{
Size: int32(desc.Tx.MsgTx().SerializeSize()),
Size: int32(tx.MsgTx().SerializeSize()),
Fee: btcutil.Amount(desc.Fee).ToBTC(),
Time: desc.Added.Unix(),
Height: int64(desc.Height),
@ -2424,7 +2424,7 @@ func handleGetRawMempool(s *rpcServer, cmd interface{}, closeChan <-chan struct{
CurrentPriority: currentPriority,
Depends: make([]string, 0),
}
for _, txIn := range desc.Tx.MsgTx().TxIn {
for _, txIn := range tx.MsgTx().TxIn {
hash := &txIn.PreviousOutPoint.Hash
if s.server.txMemPool.haveTransaction(hash) {
mpd.Depends = append(mpd.Depends,
@ -2432,7 +2432,7 @@ func handleGetRawMempool(s *rpcServer, cmd interface{}, closeChan <-chan struct{
}
}
result[desc.Tx.Sha().String()] = mpd
result[tx.Sha().String()] = mpd
}
return result, nil