Remove references from blockManager to rpcServer.

Instead of having the block manager notify the RPC server about
accepted, connected, and disconnected blocks, the RPC server will
directly listen for notifications from the blockchain.
This commit is contained in:
Jim Posen 2017-08-10 17:07:06 -07:00
parent 22de1f6d08
commit 49949d4c96
3 changed files with 83 additions and 59 deletions

View file

@ -9,7 +9,6 @@ import (
"net" "net"
"os" "os"
"path/filepath" "path/filepath"
"sort"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
@ -623,14 +622,6 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) {
// Clear the rejected transactions. // Clear the rejected transactions.
b.rejectedTxns = make(map[chainhash.Hash]struct{}) b.rejectedTxns = make(map[chainhash.Hash]struct{})
// Allow any clients performing long polling via the
// getblocktemplate RPC to be notified when the new block causes
// their old block template to become stale.
rpcServer := b.server.rpcServer
if rpcServer != nil {
rpcServer.gbtWorkState.NotifyBlockConnected(blockHash)
}
} }
// Update the block height for this peer. But only send a message to // Update the block height for this peer. But only send a message to
@ -1143,14 +1134,6 @@ out:
} }
} }
// Allow any clients performing long polling via the
// getblocktemplate RPC to be notified when the new block causes
// their old block template to become stale.
rpcServer := b.server.rpcServer
if rpcServer != nil {
rpcServer.gbtWorkState.NotifyBlockConnected(msg.block.Hash())
}
msg.reply <- processBlockResponse{ msg.reply <- processBlockResponse{
isOrphan: isOrphan, isOrphan: isOrphan,
err: nil, err: nil,
@ -1177,9 +1160,10 @@ out:
bmgrLog.Trace("Block handler done") bmgrLog.Trace("Block handler done")
} }
// handleNotifyMsg handles notifications from blockchain. It does things such // handleBlockchainNotification handles notifications from blockchain. It does
// as request orphan block parents and relay accepted blocks to connected peers. // things such as request orphan block parents and relay accepted blocks to
func (b *blockManager) handleNotifyMsg(notification *blockchain.Notification) { // connected peers.
func (b *blockManager) handleBlockchainNotification(notification *blockchain.Notification) {
switch notification.Type { switch notification.Type {
// A block has been accepted into the block chain. Relay it to other // A block has been accepted into the block chain. Relay it to other
// peers. // peers.
@ -1219,23 +1203,11 @@ func (b *blockManager) handleNotifyMsg(notification *blockchain.Notification) {
b.txMemPool.RemoveTransaction(tx, false) b.txMemPool.RemoveTransaction(tx, false)
b.txMemPool.RemoveDoubleSpends(tx) b.txMemPool.RemoveDoubleSpends(tx)
b.txMemPool.RemoveOrphan(tx) b.txMemPool.RemoveOrphan(tx)
b.server.TransactionConfirmed(tx)
acceptedTxs := b.txMemPool.ProcessOrphans(tx) acceptedTxs := b.txMemPool.ProcessOrphans(tx)
b.server.AnnounceNewTransactions(acceptedTxs) b.server.AnnounceNewTransactions(acceptedTxs)
} }
if r := b.server.rpcServer; r != nil {
// Now that this block is in the blockchain we can mark
// all the transactions (except the coinbase) as no
// longer needing rebroadcasting.
for _, tx := range block.Transactions()[1:] {
iv := wire.NewInvVect(wire.InvTypeTx, tx.Hash())
b.server.RemoveRebroadcastInventory(iv)
}
// Notify registered websocket clients of incoming block.
r.ntfnMgr.NotifyBlockConnected(block)
}
// A block has been disconnected from the main block chain. // A block has been disconnected from the main block chain.
case blockchain.NTBlockDisconnected: case blockchain.NTBlockDisconnected:
block, ok := notification.Data.(*btcutil.Block) block, ok := notification.Data.(*btcutil.Block)
@ -1256,11 +1228,6 @@ func (b *blockManager) handleNotifyMsg(notification *blockchain.Notification) {
b.txMemPool.RemoveTransaction(tx, true) b.txMemPool.RemoveTransaction(tx, true)
} }
} }
// Notify registered websocket clients.
if r := b.server.rpcServer; r != nil {
r.ntfnMgr.NotifyBlockDisconnected(block)
}
} }
} }

View file

@ -4098,6 +4098,42 @@ func newRPCServer(listenAddrs []string, generator *mining.BlkTmplGenerator, s *s
return &rpc, nil return &rpc, nil
} }
func (s *rpcServer) handleBlockchainNotification(notification *blockchain.Notification) {
switch notification.Type {
case blockchain.NTBlockAccepted:
block, ok := notification.Data.(*btcutil.Block)
if !ok {
rpcsLog.Warnf("Chain accepted notification is not a block.")
break
}
// Allow any clients performing long polling via the
// getblocktemplate RPC to be notified when the new block causes
// their old block template to become stale.
s.gbtWorkState.NotifyBlockConnected(block.Hash())
case blockchain.NTBlockConnected:
block, ok := notification.Data.(*btcutil.Block)
if !ok {
rpcsLog.Warnf("Chain connected notification is not a block.")
break
}
// Notify registered websocket clients of incoming block.
s.ntfnMgr.NotifyBlockConnected(block)
case blockchain.NTBlockDisconnected:
block, ok := notification.Data.(*btcutil.Block)
if !ok {
rpcsLog.Warnf("Chain disconnected notification is not a block.")
break
}
// Notify registered websocket clients.
s.ntfnMgr.NotifyBlockDisconnected(block)
}
}
func init() { func init() {
rpcHandlers = rpcHandlersBeforeInit rpcHandlers = rpcHandlersBeforeInit
rand.Seed(time.Now().UnixNano()) rand.Seed(time.Now().UnixNano())

View file

@ -14,6 +14,7 @@ import (
"math" "math"
"net" "net"
"runtime" "runtime"
"sort"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
@ -176,6 +177,7 @@ type server struct {
hashCache *txscript.HashCache hashCache *txscript.HashCache
rpcServer *rpcServer rpcServer *rpcServer
blockManager *blockManager blockManager *blockManager
chain *blockchain.BlockChain
txMemPool *mempool.TxPool txMemPool *mempool.TxPool
cpuMiner *cpuminer.CPUMiner cpuMiner *cpuminer.CPUMiner
modifyRebroadcastInv chan interface{} modifyRebroadcastInv chan interface{}
@ -728,9 +730,8 @@ func (sp *serverPeer) OnGetBlocks(_ *peer.Peer, msg *wire.MsgGetBlocks) {
// algorithm used by getheaders. // algorithm used by getheaders.
func (s *server) locateBlocks(locators []*chainhash.Hash, hashStop *chainhash.Hash) ([]chainhash.Hash, error) { func (s *server) locateBlocks(locators []*chainhash.Hash, hashStop *chainhash.Hash) ([]chainhash.Hash, error) {
// Attempt to look up the height of the provided stop hash. // Attempt to look up the height of the provided stop hash.
chain := s.blockManager.chain
endIdx := int32(math.MaxInt32) endIdx := int32(math.MaxInt32)
height, err := chain.BlockHeightByHash(hashStop) height, err := s.chain.BlockHeightByHash(hashStop)
if err == nil { if err == nil {
endIdx = height + 1 endIdx = height + 1
} }
@ -755,7 +756,7 @@ func (s *server) locateBlocks(locators []*chainhash.Hash, hashStop *chainhash.Ha
// This mirrors the behavior in the reference implementation. // This mirrors the behavior in the reference implementation.
startIdx := int32(1) startIdx := int32(1)
for _, loc := range locators { for _, loc := range locators {
height, err := chain.BlockHeightByHash(loc) height, err := s.chain.BlockHeightByHash(loc)
if err == nil { if err == nil {
// Start with the next hash since we know this one. // Start with the next hash since we know this one.
startIdx = height + 1 startIdx = height + 1
@ -770,7 +771,7 @@ func (s *server) locateBlocks(locators []*chainhash.Hash, hashStop *chainhash.Ha
} }
// Fetch the inventory from the block database. // Fetch the inventory from the block database.
return chain.HeightRange(startIdx, endIdx) return s.chain.HeightRange(startIdx, endIdx)
} }
// fetchHeaders fetches and decodes headers from the db for each hash in // fetchHeaders fetches and decodes headers from the db for each hash in
@ -1096,6 +1097,18 @@ func (s *server) AnnounceNewTransactions(newTxs []*mempool.TxDesc) {
} }
} }
// Transaction has one confirmation on the main chain. Now we can mark it as no
// longer needing rebroadcasting.
func (s *server) TransactionConfirmed(tx *btcutil.Tx) {
// Rebroadcasting is only necessary when the RPC server is active.
if s.rpcServer == nil {
return
}
iv := wire.NewInvVect(wire.InvTypeTx, tx.Hash())
s.RemoveRebroadcastInventory(iv)
}
// pushTxMsg sends a tx message for the provided transaction hash to the // pushTxMsg sends a tx message for the provided transaction hash to the
// connected peer. An error is returned if the transaction hash is not known. // connected peer. An error is returned if the transaction hash is not known.
func (s *server) pushTxMsg(sp *serverPeer, hash *chainhash.Hash, doneChan chan<- struct{}, func (s *server) pushTxMsg(sp *serverPeer, hash *chainhash.Hash, doneChan chan<- struct{},
@ -2456,15 +2469,23 @@ func newServer(listenAddrs []string, db database.DB, chainParams *chaincfg.Param
} }
// Create a new block chain instance with the appropriate configuration. // Create a new block chain instance with the appropriate configuration.
chain, err := blockchain.New(&blockchain.Config{ var err error
DB: s.db, s.chain, err = blockchain.New(&blockchain.Config{
ChainParams: s.chainParams, DB: s.db,
Checkpoints: checkpoints, ChainParams: s.chainParams,
TimeSource: s.timeSource, Checkpoints: checkpoints,
Notifications: bm.handleNotifyMsg, TimeSource: s.timeSource,
SigCache: s.sigCache, // TODO: Modify blockchain to be able to register multiple listeners and
IndexManager: indexManager, // have the block manager and RPC server subscribe directly.
HashCache: s.hashCache, Notifications: func(notification *blockchain.Notification) {
s.blockManager.handleBlockchainNotification(notification)
if s.rpcServer != nil {
s.rpcServer.handleBlockchainNotification(notification)
}
},
SigCache: s.sigCache,
IndexManager: indexManager,
HashCache: s.hashCache,
}) })
if err != nil { if err != nil {
return nil, err return nil, err
@ -2482,11 +2503,11 @@ func newServer(listenAddrs []string, db database.DB, chainParams *chaincfg.Param
MaxTxVersion: 2, MaxTxVersion: 2,
}, },
ChainParams: chainParams, ChainParams: chainParams,
FetchUtxoView: chain.FetchUtxoView, FetchUtxoView: s.chain.FetchUtxoView,
BestHeight: func() int32 { return chain.BestSnapshot().Height }, BestHeight: func() int32 { return s.chain.BestSnapshot().Height },
MedianTimePast: func() time.Time { return chain.BestSnapshot().MedianTime }, MedianTimePast: func() time.Time { return s.chain.BestSnapshot().MedianTime },
CalcSequenceLock: func(tx *btcutil.Tx, view *blockchain.UtxoViewpoint) (*blockchain.SequenceLock, error) { CalcSequenceLock: func(tx *btcutil.Tx, view *blockchain.UtxoViewpoint) (*blockchain.SequenceLock, error) {
return chain.CalcSequenceLock(tx, view, true) return s.chain.CalcSequenceLock(tx, view, true)
}, },
IsDeploymentActive: bm.chain.IsDeploymentActive, IsDeploymentActive: bm.chain.IsDeploymentActive,
SigCache: s.sigCache, SigCache: s.sigCache,
@ -2495,7 +2516,7 @@ func newServer(listenAddrs []string, db database.DB, chainParams *chaincfg.Param
} }
s.txMemPool = mempool.New(&txC) s.txMemPool = mempool.New(&txC)
s.bm, err = newBlockManager(&s, indexManager, &chain, s.txMemPool) s.blockManager, err = newBlockManager(&s, indexManager, s.chain, s.txMemPool)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -2514,15 +2535,15 @@ func newServer(listenAddrs []string, db database.DB, chainParams *chaincfg.Param
TxMinFreeFee: cfg.minRelayTxFee, TxMinFreeFee: cfg.minRelayTxFee,
} }
blockTemplateGenerator := mining.NewBlkTmplGenerator(&policy, blockTemplateGenerator := mining.NewBlkTmplGenerator(&policy,
s.chainParams, s.txMemPool, s.blockManager.chain, s.timeSource, s.chainParams, s.txMemPool, s.chain, s.timeSource,
s.sigCache, s.hashCache) s.sigCache, s.hashCache)
s.cpuMiner = cpuminer.New(&cpuminer.Config{ s.cpuMiner = cpuminer.New(&cpuminer.Config{
ChainParams: chainParams, ChainParams: chainParams,
BlockTemplateGenerator: blockTemplateGenerator, BlockTemplateGenerator: blockTemplateGenerator,
MiningAddrs: cfg.miningAddrs, MiningAddrs: cfg.miningAddrs,
ProcessBlock: bm.ProcessBlock, ProcessBlock: s.blockManager.ProcessBlock,
ConnectedCount: s.ConnectedCount, ConnectedCount: s.ConnectedCount,
IsCurrent: bm.IsCurrent, IsCurrent: s.blockManager.IsCurrent,
}) })
// Only setup a function to return new addresses to connect to when // Only setup a function to return new addresses to connect to when