diff --git a/blockmanager.go b/blockmanager.go index ae683c45..2557dc37 100644 --- a/blockmanager.go +++ b/blockmanager.go @@ -133,10 +133,30 @@ type headerNode struct { hash *chainhash.Hash } +// PeerNotifier exposes methods to notify peers of status changes to +// transactions, blocks, etc. Currently server implements this interface. +type PeerNotifier interface { + AnnounceNewTransactions(newTxs []*mempool.TxDesc) + + UpdatePeerHeights(latestBlkHash *chainhash.Hash, latestHeight int32, updateSource *serverPeer) + + RelayInventory(invVect *wire.InvVect, data interface{}) + + TransactionConfirmed(tx *btcutil.Tx) +} + +// blockManangerConfig is a configuration struct used to initialize a new +// blockManager. +type blockManagerConfig struct { + PeerNotifier PeerNotifier + Chain *blockchain.BlockChain + TxMemPool *mempool.TxPool +} + // blockManager provides a concurrency safe block manager for handling all // incoming blocks. type blockManager struct { - server *server + peerNotifier PeerNotifier started int32 shutdown int32 chain *blockchain.BlockChain @@ -467,7 +487,7 @@ func (b *blockManager) handleTxMsg(tmsg *txMsg) { return } - b.server.AnnounceNewTransactions(acceptedTxs) + b.peerNotifier.AnnounceNewTransactions(acceptedTxs) } // current returns true if we believe we are synced with our peers, false if we @@ -631,7 +651,7 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { if blkHashUpdate != nil && heightUpdate != 0 { bmsg.peer.UpdateLastBlockHeight(heightUpdate) if isOrphan || b.current() { - go b.server.UpdatePeerHeights(blkHashUpdate, heightUpdate, bmsg.peer) + go b.peerNotifier.UpdatePeerHeights(blkHashUpdate, heightUpdate, bmsg.peer) } } @@ -1182,7 +1202,7 @@ func (b *blockManager) handleBlockchainNotification(notification *blockchain.Not // Generate the inventory vector and relay it. iv := wire.NewInvVect(wire.InvTypeBlock, block.Hash()) - b.server.RelayInventory(iv, block.MsgBlock().Header) + b.peerNotifier.RelayInventory(iv, block.MsgBlock().Header) // A block has been connected to the main block chain. case blockchain.NTBlockConnected: @@ -1203,9 +1223,9 @@ func (b *blockManager) handleBlockchainNotification(notification *blockchain.Not b.txMemPool.RemoveTransaction(tx, false) b.txMemPool.RemoveDoubleSpends(tx) b.txMemPool.RemoveOrphan(tx) - b.server.TransactionConfirmed(tx) + b.peerNotifier.TransactionConfirmed(tx) acceptedTxs := b.txMemPool.ProcessOrphans(tx) - b.server.AnnounceNewTransactions(acceptedTxs) + b.peerNotifier.AnnounceNewTransactions(acceptedTxs) } // A block has been disconnected from the main block chain. @@ -1360,14 +1380,11 @@ func (b *blockManager) Pause() chan<- struct{} { // newBlockManager returns a new bitcoin block manager. // Use Start to begin processing asynchronous block and inv updates. -func newBlockManager( - s *server, indexManager blockchain.IndexManager, - chain *blockchain.BlockChain, txMemPool *mempool.TxPool, -) (*blockManager, error) { +func newBlockManager(config *blockManagerConfig) (*blockManager, error) { bm := blockManager{ - server: s, - chain: chain, - txMemPool: txMemPool, + peerNotifier: config.PeerNotifier, + chain: config.Chain, + txMemPool: config.TxMemPool, rejectedTxns: make(map[chainhash.Hash]struct{}), requestedTxns: make(map[chainhash.Hash]struct{}), requestedBlocks: make(map[chainhash.Hash]struct{}), @@ -1377,9 +1394,6 @@ func newBlockManager( quit: make(chan struct{}), } - // Register blockchain notification callbacks - bm.chain.Subscribe(bm.handleNotifyMsg) - best := bm.chain.BestSnapshot() if !cfg.DisableCheckpoints { // Initialize the next checkpoint based on the current height. @@ -1391,6 +1405,8 @@ func newBlockManager( bmgrLog.Info("Checkpoints are disabled") } + bm.chain.Subscribe(bm.handleBlockchainNotification) + return &bm, nil } diff --git a/rpcserver.go b/rpcserver.go index ef0dc703..f8b8ecf2 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -4095,9 +4095,13 @@ func newRPCServer(listenAddrs []string, generator *mining.BlkTmplGenerator, s *s rpc.listeners = listeners + s.chain.Subscribe(rpc.handleBlockchainNotification) + return &rpc, nil } +// Callback for notifications from blockchain. It notifies clients that are +// long polling for changes or subscribed to websockets notifications. func (s *rpcServer) handleBlockchainNotification(notification *blockchain.Notification) { switch notification.Type { case blockchain.NTBlockAccepted: diff --git a/server.go b/server.go index b2837929..a408348f 100644 --- a/server.go +++ b/server.go @@ -2471,18 +2471,10 @@ func newServer(listenAddrs []string, db database.DB, chainParams *chaincfg.Param // Create a new block chain instance with the appropriate configuration. var err error s.chain, err = blockchain.New(&blockchain.Config{ - DB: s.db, - ChainParams: s.chainParams, - Checkpoints: checkpoints, - TimeSource: s.timeSource, - // TODO: Modify blockchain to be able to register multiple listeners and - // have the block manager and RPC server subscribe directly. - Notifications: func(notification *blockchain.Notification) { - s.blockManager.handleBlockchainNotification(notification) - if s.rpcServer != nil { - s.rpcServer.handleBlockchainNotification(notification) - } - }, + DB: s.db, + ChainParams: s.chainParams, + Checkpoints: checkpoints, + TimeSource: s.timeSource, SigCache: s.sigCache, IndexManager: indexManager, HashCache: s.hashCache, @@ -2509,14 +2501,18 @@ func newServer(listenAddrs []string, db database.DB, chainParams *chaincfg.Param CalcSequenceLock: func(tx *btcutil.Tx, view *blockchain.UtxoViewpoint) (*blockchain.SequenceLock, error) { return s.chain.CalcSequenceLock(tx, view, true) }, - IsDeploymentActive: bm.chain.IsDeploymentActive, + IsDeploymentActive: s.chain.IsDeploymentActive, SigCache: s.sigCache, HashCache: s.hashCache, AddrIndex: s.addrIndex, } s.txMemPool = mempool.New(&txC) - s.blockManager, err = newBlockManager(&s, indexManager, s.chain, s.txMemPool) + s.blockManager, err = newBlockManager(&blockManagerConfig{ + PeerNotifier: &s, + Chain: s.chain, + TxMemPool: s.txMemPool, + }) if err != nil { return nil, err }