diff --git a/blockmanager.go b/blockmanager.go index 757ae71f..ae683c45 100644 --- a/blockmanager.go +++ b/blockmanager.go @@ -9,7 +9,6 @@ import ( "net" "os" "path/filepath" - "sort" "sync" "sync/atomic" "time" @@ -623,14 +622,6 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { // Clear the rejected transactions. b.rejectedTxns = make(map[chainhash.Hash]struct{}) - - // Allow any clients performing long polling via the - // getblocktemplate RPC to be notified when the new block causes - // their old block template to become stale. - rpcServer := b.server.rpcServer - if rpcServer != nil { - rpcServer.gbtWorkState.NotifyBlockConnected(blockHash) - } } // Update the block height for this peer. But only send a message to @@ -1143,14 +1134,6 @@ out: } } - // Allow any clients performing long polling via the - // getblocktemplate RPC to be notified when the new block causes - // their old block template to become stale. - rpcServer := b.server.rpcServer - if rpcServer != nil { - rpcServer.gbtWorkState.NotifyBlockConnected(msg.block.Hash()) - } - msg.reply <- processBlockResponse{ isOrphan: isOrphan, err: nil, @@ -1177,9 +1160,10 @@ out: bmgrLog.Trace("Block handler done") } -// handleNotifyMsg handles notifications from blockchain. It does things such -// as request orphan block parents and relay accepted blocks to connected peers. -func (b *blockManager) handleNotifyMsg(notification *blockchain.Notification) { +// handleBlockchainNotification handles notifications from blockchain. It does +// things such as request orphan block parents and relay accepted blocks to +// connected peers. +func (b *blockManager) handleBlockchainNotification(notification *blockchain.Notification) { switch notification.Type { // A block has been accepted into the block chain. Relay it to other // peers. @@ -1219,23 +1203,11 @@ func (b *blockManager) handleNotifyMsg(notification *blockchain.Notification) { b.txMemPool.RemoveTransaction(tx, false) b.txMemPool.RemoveDoubleSpends(tx) b.txMemPool.RemoveOrphan(tx) + b.server.TransactionConfirmed(tx) acceptedTxs := b.txMemPool.ProcessOrphans(tx) b.server.AnnounceNewTransactions(acceptedTxs) } - if r := b.server.rpcServer; r != nil { - // Now that this block is in the blockchain we can mark - // all the transactions (except the coinbase) as no - // longer needing rebroadcasting. - for _, tx := range block.Transactions()[1:] { - iv := wire.NewInvVect(wire.InvTypeTx, tx.Hash()) - b.server.RemoveRebroadcastInventory(iv) - } - - // Notify registered websocket clients of incoming block. - r.ntfnMgr.NotifyBlockConnected(block) - } - // A block has been disconnected from the main block chain. case blockchain.NTBlockDisconnected: block, ok := notification.Data.(*btcutil.Block) @@ -1256,11 +1228,6 @@ func (b *blockManager) handleNotifyMsg(notification *blockchain.Notification) { b.txMemPool.RemoveTransaction(tx, true) } } - - // Notify registered websocket clients. - if r := b.server.rpcServer; r != nil { - r.ntfnMgr.NotifyBlockDisconnected(block) - } } } diff --git a/rpcserver.go b/rpcserver.go index 01e28f38..ef0dc703 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -4098,6 +4098,42 @@ func newRPCServer(listenAddrs []string, generator *mining.BlkTmplGenerator, s *s return &rpc, nil } +func (s *rpcServer) handleBlockchainNotification(notification *blockchain.Notification) { + switch notification.Type { + case blockchain.NTBlockAccepted: + block, ok := notification.Data.(*btcutil.Block) + if !ok { + rpcsLog.Warnf("Chain accepted notification is not a block.") + break + } + + // Allow any clients performing long polling via the + // getblocktemplate RPC to be notified when the new block causes + // their old block template to become stale. + s.gbtWorkState.NotifyBlockConnected(block.Hash()) + + case blockchain.NTBlockConnected: + block, ok := notification.Data.(*btcutil.Block) + if !ok { + rpcsLog.Warnf("Chain connected notification is not a block.") + break + } + + // Notify registered websocket clients of incoming block. + s.ntfnMgr.NotifyBlockConnected(block) + + case blockchain.NTBlockDisconnected: + block, ok := notification.Data.(*btcutil.Block) + if !ok { + rpcsLog.Warnf("Chain disconnected notification is not a block.") + break + } + + // Notify registered websocket clients. + s.ntfnMgr.NotifyBlockDisconnected(block) + } +} + func init() { rpcHandlers = rpcHandlersBeforeInit rand.Seed(time.Now().UnixNano()) diff --git a/server.go b/server.go index ecf1a80d..b2837929 100644 --- a/server.go +++ b/server.go @@ -14,6 +14,7 @@ import ( "math" "net" "runtime" + "sort" "strconv" "strings" "sync" @@ -176,6 +177,7 @@ type server struct { hashCache *txscript.HashCache rpcServer *rpcServer blockManager *blockManager + chain *blockchain.BlockChain txMemPool *mempool.TxPool cpuMiner *cpuminer.CPUMiner modifyRebroadcastInv chan interface{} @@ -728,9 +730,8 @@ func (sp *serverPeer) OnGetBlocks(_ *peer.Peer, msg *wire.MsgGetBlocks) { // algorithm used by getheaders. func (s *server) locateBlocks(locators []*chainhash.Hash, hashStop *chainhash.Hash) ([]chainhash.Hash, error) { // Attempt to look up the height of the provided stop hash. - chain := s.blockManager.chain endIdx := int32(math.MaxInt32) - height, err := chain.BlockHeightByHash(hashStop) + height, err := s.chain.BlockHeightByHash(hashStop) if err == nil { endIdx = height + 1 } @@ -755,7 +756,7 @@ func (s *server) locateBlocks(locators []*chainhash.Hash, hashStop *chainhash.Ha // This mirrors the behavior in the reference implementation. startIdx := int32(1) for _, loc := range locators { - height, err := chain.BlockHeightByHash(loc) + height, err := s.chain.BlockHeightByHash(loc) if err == nil { // Start with the next hash since we know this one. startIdx = height + 1 @@ -770,7 +771,7 @@ func (s *server) locateBlocks(locators []*chainhash.Hash, hashStop *chainhash.Ha } // Fetch the inventory from the block database. - return chain.HeightRange(startIdx, endIdx) + return s.chain.HeightRange(startIdx, endIdx) } // fetchHeaders fetches and decodes headers from the db for each hash in @@ -1096,6 +1097,18 @@ func (s *server) AnnounceNewTransactions(newTxs []*mempool.TxDesc) { } } +// Transaction has one confirmation on the main chain. Now we can mark it as no +// longer needing rebroadcasting. +func (s *server) TransactionConfirmed(tx *btcutil.Tx) { + // Rebroadcasting is only necessary when the RPC server is active. + if s.rpcServer == nil { + return + } + + iv := wire.NewInvVect(wire.InvTypeTx, tx.Hash()) + s.RemoveRebroadcastInventory(iv) +} + // pushTxMsg sends a tx message for the provided transaction hash to the // connected peer. An error is returned if the transaction hash is not known. func (s *server) pushTxMsg(sp *serverPeer, hash *chainhash.Hash, doneChan chan<- struct{}, @@ -2456,15 +2469,23 @@ func newServer(listenAddrs []string, db database.DB, chainParams *chaincfg.Param } // Create a new block chain instance with the appropriate configuration. - chain, err := blockchain.New(&blockchain.Config{ - DB: s.db, - ChainParams: s.chainParams, - Checkpoints: checkpoints, - TimeSource: s.timeSource, - Notifications: bm.handleNotifyMsg, - SigCache: s.sigCache, - IndexManager: indexManager, - HashCache: s.hashCache, + var err error + s.chain, err = blockchain.New(&blockchain.Config{ + DB: s.db, + ChainParams: s.chainParams, + Checkpoints: checkpoints, + TimeSource: s.timeSource, + // TODO: Modify blockchain to be able to register multiple listeners and + // have the block manager and RPC server subscribe directly. + Notifications: func(notification *blockchain.Notification) { + s.blockManager.handleBlockchainNotification(notification) + if s.rpcServer != nil { + s.rpcServer.handleBlockchainNotification(notification) + } + }, + SigCache: s.sigCache, + IndexManager: indexManager, + HashCache: s.hashCache, }) if err != nil { return nil, err @@ -2482,11 +2503,11 @@ func newServer(listenAddrs []string, db database.DB, chainParams *chaincfg.Param MaxTxVersion: 2, }, ChainParams: chainParams, - FetchUtxoView: chain.FetchUtxoView, - BestHeight: func() int32 { return chain.BestSnapshot().Height }, - MedianTimePast: func() time.Time { return chain.BestSnapshot().MedianTime }, + FetchUtxoView: s.chain.FetchUtxoView, + BestHeight: func() int32 { return s.chain.BestSnapshot().Height }, + MedianTimePast: func() time.Time { return s.chain.BestSnapshot().MedianTime }, CalcSequenceLock: func(tx *btcutil.Tx, view *blockchain.UtxoViewpoint) (*blockchain.SequenceLock, error) { - return chain.CalcSequenceLock(tx, view, true) + return s.chain.CalcSequenceLock(tx, view, true) }, IsDeploymentActive: bm.chain.IsDeploymentActive, SigCache: s.sigCache, @@ -2495,7 +2516,7 @@ func newServer(listenAddrs []string, db database.DB, chainParams *chaincfg.Param } s.txMemPool = mempool.New(&txC) - s.bm, err = newBlockManager(&s, indexManager, &chain, s.txMemPool) + s.blockManager, err = newBlockManager(&s, indexManager, s.chain, s.txMemPool) if err != nil { return nil, err } @@ -2514,15 +2535,15 @@ func newServer(listenAddrs []string, db database.DB, chainParams *chaincfg.Param TxMinFreeFee: cfg.minRelayTxFee, } blockTemplateGenerator := mining.NewBlkTmplGenerator(&policy, - s.chainParams, s.txMemPool, s.blockManager.chain, s.timeSource, + s.chainParams, s.txMemPool, s.chain, s.timeSource, s.sigCache, s.hashCache) s.cpuMiner = cpuminer.New(&cpuminer.Config{ ChainParams: chainParams, BlockTemplateGenerator: blockTemplateGenerator, MiningAddrs: cfg.miningAddrs, - ProcessBlock: bm.ProcessBlock, + ProcessBlock: s.blockManager.ProcessBlock, ConnectedCount: s.ConnectedCount, - IsCurrent: bm.IsCurrent, + IsCurrent: s.blockManager.IsCurrent, }) // Only setup a function to return new addresses to connect to when