// Copyright (c) 2013-2014 The btcsuite developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.

package main

import (
	"container/list"
	"net"
	"os"
	"path/filepath"
	"sync"
	"sync/atomic"
	"time"

	"github.com/btcsuite/btcd/blockchain"
	"github.com/btcsuite/btcd/chaincfg"
	"github.com/btcsuite/btcd/database"
	"github.com/btcsuite/btcd/wire"
	"github.com/btcsuite/btcutil"
)

const (
	// chanBufferSize is presumably the buffer size used for channels
	// created elsewhere in this file -- TODO confirm against its users.
	chanBufferSize = 50

	// minInFlightBlocks is the minimum number of blocks that should be
	// in the request queue for headers-first mode before requesting
	// more.
	minInFlightBlocks = 10

	// blockDbNamePrefix is the prefix for the block database name.  The
	// database type is appended to this value to form the full block
	// database name.
	blockDbNamePrefix = "blocks"
)

// zeroHash is the zero value hash (all zeros).  It is defined as a
// convenience and is used as the "no stop hash" argument when requesting
// blocks.
var zeroHash wire.ShaHash

// newPeerMsg signifies a newly connected peer to the block handler.
type newPeerMsg struct {
	peer *serverPeer
}

// blockMsg packages a bitcoin block message and the peer it came from together
// so the block handler has access to that information.
type blockMsg struct {
	block *btcutil.Block
	peer  *serverPeer
}

// invMsg packages a bitcoin inv message and the peer it came from together
// so the block handler has access to that information.
type invMsg struct {
	inv  *wire.MsgInv
	peer *serverPeer
}

// headersMsg packages a bitcoin headers message and the peer it came from
// together so the block handler has access to that information.
type headersMsg struct {
	headers *wire.MsgHeaders
	peer    *serverPeer
}

// donePeerMsg signifies a newly disconnected peer to the block handler.
type donePeerMsg struct {
	peer *serverPeer
}

// txMsg packages a bitcoin tx message and the peer it came from together
// so the block handler has access to that information.
type txMsg struct {
	tx   *btcutil.Tx
	peer *serverPeer
}

// getSyncPeerMsg is a message type to be sent across the message channel for
// retrieving the current sync peer.  The block handler answers by sending its
// current sync peer (possibly nil) on reply.
type getSyncPeerMsg struct {
	reply chan *serverPeer
}

// checkConnectBlockMsg is a message type to be sent across the message channel
// for requesting chain to check if a block connects to the end of the current
// main chain.  The result of the check is sent on reply.
type checkConnectBlockMsg struct {
	block *btcutil.Block
	reply chan error
}

// calcNextReqDifficultyResponse is a response sent to the reply channel of a
// calcNextReqDifficultyMsg query.
type calcNextReqDifficultyResponse struct {
	difficulty uint32
	err        error
}

// calcNextReqDifficultyMsg is a message type to be sent across the message
// channel for requesting the required difficulty of the next block.
type calcNextReqDifficultyMsg struct {
	timestamp time.Time
	reply     chan calcNextReqDifficultyResponse
}

// processBlockResponse is a response sent to the reply channel of a
// processBlockMsg.
type processBlockResponse struct {
	isOrphan bool
	err      error
}

// fetchTransactionStoreResponse is a response sent to the reply channel of a
// fetchTransactionStoreMsg.
type fetchTransactionStoreResponse struct {
	TxStore blockchain.TxStore
	err     error
}

// fetchTransactionStoreMsg is a message type to be sent across the message
// channel for fetching the tx input store for some Tx.
type fetchTransactionStoreMsg struct {
	tx    *btcutil.Tx
	reply chan fetchTransactionStoreResponse
}

// processBlockMsg is a message type to be sent across the message channel
// for requesting that a block be processed.  Note this call differs from
// blockMsg above in that blockMsg is intended for blocks that came from peers
// and have extra handling whereas this message essentially is just a
// concurrent safe way to call ProcessBlock on the internal block chain
// instance.
type processBlockMsg struct {
	block *btcutil.Block
	flags blockchain.BehaviorFlags
	reply chan processBlockResponse
}

// isCurrentMsg is a message type to be sent across the message channel for
// requesting whether or not the block manager believes it is synced with
// the currently connected peers.
type isCurrentMsg struct {
	reply chan bool
}

// pauseMsg is a message type to be sent across the message channel for
// pausing the block manager.  This effectively provides the caller with
// exclusive access over the manager until a receive is performed on the
// unpause channel: the block handler blocks on that receive and processes no
// other message in the meantime.
type pauseMsg struct {
	unpause <-chan struct{}
}

// headerNode is used as a node in a list of headers that are linked together
// between checkpoints.
type headerNode struct {
	height int32
	sha    *wire.ShaHash
}

// chainState tracks the state of the best chain as blocks are inserted.  This
// is done because btcchain is currently not safe for concurrent access and the
// block manager is typically quite busy processing block and inventory.
// Therefore, requesting this information from chain through the block manager
// would not be anywhere near as efficient as simply updating it as each block
// is inserted and protecting it with a mutex.
//
// The embedded mutex guards every field below.
type chainState struct {
	sync.Mutex
	newestHash        *wire.ShaHash
	newestHeight      int32
	pastMedianTime    time.Time
	pastMedianTimeErr error
}

// Best returns the block hash and height known for the tip of the best known
// chain.
//
// This function is safe for concurrent access.
func (c *chainState) Best() (*wire.ShaHash, int32) {
	c.Lock()
	defer c.Unlock()

	return c.newestHash, c.newestHeight
}

// blockManager provides a concurrency safe block manager for handling all
// incoming blocks.
type blockManager struct { server *server started int32 shutdown int32 blockChain *blockchain.BlockChain requestedTxns map[wire.ShaHash]struct{} requestedBlocks map[wire.ShaHash]struct{} progressLogger *blockProgressLogger receivedLogBlocks int64 receivedLogTx int64 processingReqs bool syncPeer *serverPeer msgChan chan interface{} chainState chainState wg sync.WaitGroup quit chan struct{} // The following fields are used for headers-first mode. headersFirstMode bool headerList *list.List startHeader *list.Element nextCheckpoint *chaincfg.Checkpoint } // resetHeaderState sets the headers-first mode state to values appropriate for // syncing from a new peer. func (b *blockManager) resetHeaderState(newestHash *wire.ShaHash, newestHeight int32) { b.headersFirstMode = false b.headerList.Init() b.startHeader = nil // When there is a next checkpoint, add an entry for the latest known // block into the header pool. This allows the next downloaded header // to prove it links to the chain properly. if b.nextCheckpoint != nil { node := headerNode{height: newestHeight, sha: newestHash} b.headerList.PushBack(&node) } } // updateChainState updates the chain state associated with the block manager. // This allows fast access to chain information since btcchain is currently not // safe for concurrent access and the block manager is typically quite busy // processing block and inventory. func (b *blockManager) updateChainState(newestHash *wire.ShaHash, newestHeight int32) { b.chainState.Lock() defer b.chainState.Unlock() b.chainState.newestHash = newestHash b.chainState.newestHeight = newestHeight medianTime, err := b.blockChain.CalcPastMedianTime() if err != nil { b.chainState.pastMedianTimeErr = err } else { b.chainState.pastMedianTime = medianTime } } // findNextHeaderCheckpoint returns the next checkpoint after the passed height. 
// It returns nil when there is not one either because the height is already // later than the final checkpoint or some other reason such as disabled // checkpoints. func (b *blockManager) findNextHeaderCheckpoint(height int32) *chaincfg.Checkpoint { // There is no next checkpoint if checkpoints are disabled or there are // none for this current network. if cfg.DisableCheckpoints { return nil } checkpoints := b.server.chainParams.Checkpoints if len(checkpoints) == 0 { return nil } // There is no next checkpoint if the height is already after the final // checkpoint. finalCheckpoint := &checkpoints[len(checkpoints)-1] if height >= finalCheckpoint.Height { return nil } // Find the next checkpoint. nextCheckpoint := finalCheckpoint for i := len(checkpoints) - 2; i >= 0; i-- { if height >= checkpoints[i].Height { break } nextCheckpoint = &checkpoints[i] } return nextCheckpoint } // startSync will choose the best peer among the available candidate peers to // download/sync the blockchain from. When syncing is already running, it // simply returns. It also examines the candidates for any which are no longer // candidates and removes them as needed. func (b *blockManager) startSync(peers *list.List) { // Return now if we're already syncing. if b.syncPeer != nil { return } // Find the height of the current known best block. _, height, err := b.server.db.NewestSha() if err != nil { bmgrLog.Errorf("%v", err) return } var bestPeer *serverPeer var enext *list.Element for e := peers.Front(); e != nil; e = enext { enext = e.Next() sp := e.Value.(*serverPeer) // Remove sync candidate peers that are no longer candidates due // to passing their latest known block. NOTE: The < is // intentional as opposed to <=. While techcnically the peer // doesn't have a later block when it's equal, it will likely // have one soon so it is a reasonable choice. It also allows // the case where both are at 0 such as during regression test. 
if sp.LastBlock() < int32(height) { peers.Remove(e) continue } // TODO(davec): Use a better algorithm to choose the best peer. // For now, just pick the first available candidate. bestPeer = sp } // Start syncing from the best peer if one was selected. if bestPeer != nil { locator, err := b.blockChain.LatestBlockLocator() if err != nil { bmgrLog.Errorf("Failed to get block locator for the "+ "latest block: %v", err) return } bmgrLog.Infof("Syncing to block height %d from peer %v", bestPeer.LastBlock(), bestPeer.Addr()) // When the current height is less than a known checkpoint we // can use block headers to learn about which blocks comprise // the chain up to the checkpoint and perform less validation // for them. This is possible since each header contains the // hash of the previous header and a merkle root. Therefore if // we validate all of the received headers link together // properly and the checkpoint hashes match, we can be sure the // hashes for the blocks in between are accurate. Further, once // the full blocks are downloaded, the merkle root is computed // and compared against the value in the header which proves the // full block hasn't been tampered with. // // Once we have passed the final checkpoint, or checkpoints are // disabled, use standard inv messages learn about the blocks // and fully validate them. Finally, regression test mode does // not support the headers-first approach so do normal block // downloads when in regression test mode. 
if b.nextCheckpoint != nil && height < b.nextCheckpoint.Height && !cfg.RegressionTest && !cfg.DisableCheckpoints { bestPeer.PushGetHeadersMsg(locator, b.nextCheckpoint.Hash) b.headersFirstMode = true bmgrLog.Infof("Downloading headers for blocks %d to "+ "%d from peer %s", height+1, b.nextCheckpoint.Height, bestPeer.Addr()) } else { bestPeer.PushGetBlocksMsg(locator, &zeroHash) } b.syncPeer = bestPeer } else { bmgrLog.Warnf("No sync peer candidates available") } } // isSyncCandidate returns whether or not the peer is a candidate to consider // syncing from. func (b *blockManager) isSyncCandidate(sp *serverPeer) bool { // Typically a peer is not a candidate for sync if it's not a full node, // however regression test is special in that the regression tool is // not a full node and still needs to be considered a sync candidate. if cfg.RegressionTest { // The peer is not a candidate if it's not coming from localhost // or the hostname can't be determined for some reason. host, _, err := net.SplitHostPort(sp.Addr()) if err != nil { return false } if host != "127.0.0.1" && host != "localhost" { return false } } else { // The peer is not a candidate for sync if it's not a full node. if sp.Services()&wire.SFNodeNetwork != wire.SFNodeNetwork { return false } } // Candidate if all checks passed. return true } // handleNewPeerMsg deals with new peers that have signalled they may // be considered as a sync peer (they have already successfully negotiated). It // also starts syncing if needed. It is invoked from the syncHandler goroutine. func (b *blockManager) handleNewPeerMsg(peers *list.List, sp *serverPeer) { // Ignore if in the process of shutting down. if atomic.LoadInt32(&b.shutdown) != 0 { return } bmgrLog.Infof("New valid peer %s (%s)", sp, sp.UserAgent()) // Ignore the peer if it's not a sync candidate. if !b.isSyncCandidate(sp) { return } // Add the peer as a candidate to sync from. peers.PushBack(sp) // Start syncing by choosing the best candidate if needed. 
b.startSync(peers) } // handleDonePeerMsg deals with peers that have signalled they are done. It // removes the peer as a candidate for syncing and in the case where it was // the current sync peer, attempts to select a new best peer to sync from. It // is invoked from the syncHandler goroutine. func (b *blockManager) handleDonePeerMsg(peers *list.List, sp *serverPeer) { // Remove the peer from the list of candidate peers. for e := peers.Front(); e != nil; e = e.Next() { if e.Value == sp { peers.Remove(e) break } } bmgrLog.Infof("Lost peer %s", sp) // Remove requested transactions from the global map so that they will // be fetched from elsewhere next time we get an inv. for k := range sp.requestedTxns { delete(b.requestedTxns, k) } // Remove requested blocks from the global map so that they will be // fetched from elsewhere next time we get an inv. // TODO(oga) we could possibly here check which peers have these blocks // and request them now to speed things up a little. for k := range sp.requestedBlocks { delete(b.requestedBlocks, k) } // Attempt to find a new peer to sync from if the quitting peer is the // sync peer. Also, reset the headers-first state if in headers-first // mode so if b.syncPeer != nil && b.syncPeer == sp { b.syncPeer = nil if b.headersFirstMode { // This really shouldn't fail. We have a fairly // unrecoverable database issue if it does. newestHash, height, err := b.server.db.NewestSha() if err != nil { bmgrLog.Warnf("Unable to obtain latest "+ "block information from the database: "+ "%v", err) return } b.resetHeaderState(newestHash, height) } b.startSync(peers) } } // handleTxMsg handles transaction messages from all peers. func (b *blockManager) handleTxMsg(tmsg *txMsg) { // NOTE: BitcoinJ, and possibly other wallets, don't follow the spec of // sending an inventory message and allowing the remote peer to decide // whether or not they want to request the transaction via a getdata // message. 
Unfortunately, the reference implementation permits // unrequested data, so it has allowed wallets that don't follow the // spec to proliferate. While this is not ideal, there is no check here // to disconnect peers for sending unsolicited transactions to provide // interoperability. // Process the transaction to include validation, insertion in the // memory pool, orphan handling, etc. allowOrphans := cfg.MaxOrphanTxs > 0 err := b.server.txMemPool.ProcessTransaction(tmsg.tx, allowOrphans, true) // Remove transaction from request maps. Either the mempool/chain // already knows about it and as such we shouldn't have any more // instances of trying to fetch it, or we failed to insert and thus // we'll retry next time we get an inv. txHash := tmsg.tx.Sha() delete(tmsg.peer.requestedTxns, *txHash) delete(b.requestedTxns, *txHash) if err != nil { // When the error is a rule error, it means the transaction was // simply rejected as opposed to something actually going wrong, // so log it as such. Otherwise, something really did go wrong, // so log it as an actual error. if _, ok := err.(RuleError); ok { bmgrLog.Debugf("Rejected transaction %v from %s: %v", txHash, tmsg.peer, err) } else { bmgrLog.Errorf("Failed to process transaction %v: %v", txHash, err) } // Convert the error into an appropriate reject message and // send it. code, reason := errToRejectErr(err) tmsg.peer.PushRejectMsg(wire.CmdTx, code, reason, txHash, false) return } } // current returns true if we believe we are synced with our peers, false if we // still have blocks to check func (b *blockManager) current() bool { if !b.blockChain.IsCurrent(b.server.timeSource) { return false } // if blockChain thinks we are current and we have no syncPeer it // is probably right. if b.syncPeer == nil { return true } _, height, err := b.server.db.NewestSha() // No matter what chain thinks, if we are below the block we are // syncing to we are not current. 
// TODO(oga) we can get chain to return the height of each block when we // parse an orphan, which would allow us to update the height of peers // from what it was at initial handshake. if err != nil || height < b.syncPeer.LastBlock() { return false } return true } // handleBlockMsg handles block messages from all peers. func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { // If we didn't ask for this block then the peer is misbehaving. blockSha := bmsg.block.Sha() if _, exists := bmsg.peer.requestedBlocks[*blockSha]; !exists { // The regression test intentionally sends some blocks twice // to test duplicate block insertion fails. Don't disconnect // the peer or ignore the block when we're in regression test // mode in this case so the chain code is actually fed the // duplicate blocks. if !cfg.RegressionTest { bmgrLog.Warnf("Got unrequested block %v from %s -- "+ "disconnecting", blockSha, bmsg.peer.Addr()) bmsg.peer.Disconnect() return } } // When in headers-first mode, if the block matches the hash of the // first header in the list of headers that are being fetched, it's // eligible for less validation since the headers have already been // verified to link together and are valid up to the next checkpoint. // Also, remove the list entry for all blocks except the checkpoint // since it is needed to verify the next round of headers links // properly. isCheckpointBlock := false behaviorFlags := blockchain.BFNone if b.headersFirstMode { firstNodeEl := b.headerList.Front() if firstNodeEl != nil { firstNode := firstNodeEl.Value.(*headerNode) if blockSha.IsEqual(firstNode.sha) { behaviorFlags |= blockchain.BFFastAdd if firstNode.sha.IsEqual(b.nextCheckpoint.Hash) { isCheckpointBlock = true } else { b.headerList.Remove(firstNodeEl) } } } } // Remove block from request maps. Either chain will know about it and // so we shouldn't have any more instances of trying to fetch it, or we // will fail the insert and thus we'll retry next time we get an inv. 
delete(bmsg.peer.requestedBlocks, *blockSha) delete(b.requestedBlocks, *blockSha) // Process the block to include validation, best chain selection, orphan // handling, etc. isOrphan, err := b.blockChain.ProcessBlock(bmsg.block, b.server.timeSource, behaviorFlags) if err != nil { // When the error is a rule error, it means the block was simply // rejected as opposed to something actually going wrong, so log // it as such. Otherwise, something really did go wrong, so log // it as an actual error. if _, ok := err.(blockchain.RuleError); ok { bmgrLog.Infof("Rejected block %v from %s: %v", blockSha, bmsg.peer, err) } else { bmgrLog.Errorf("Failed to process block %v: %v", blockSha, err) } // Convert the error into an appropriate reject message and // send it. code, reason := errToRejectErr(err) bmsg.peer.PushRejectMsg(wire.CmdBlock, code, reason, blockSha, false) return } // Meta-data about the new block this peer is reporting. We use this // below to update this peer's lastest block height and the heights of // other peers based on their last announced block sha. This allows us // to dynamically update the block heights of peers, avoiding stale heights // when looking for a new sync peer. Upon acceptance of a block or // recognition of an orphan, we also use this information to update // the block heights over other peers who's invs may have been ignored // if we are actively syncing while the chain is not yet current or // who may have lost the lock announcment race. var heightUpdate int32 var blkShaUpdate *wire.ShaHash // Request the parents for the orphan block from the peer that sent it. if isOrphan { // We've just received an orphan block from a peer. In order // to update the height of the peer, we try to extract the // block height from the scriptSig of the coinbase transaction. // Extraction is only attempted if the block's version is // high enough (ver 2+). 
header := &bmsg.block.MsgBlock().Header if blockchain.ShouldHaveSerializedBlockHeight(header) { coinbaseTx := bmsg.block.Transactions()[0] cbHeight, err := blockchain.ExtractCoinbaseHeight(coinbaseTx) if err != nil { bmgrLog.Warnf("Unable to extract height from "+ "coinbase tx: %v", err) } else { bmgrLog.Debugf("Extracted height of %v from "+ "orphan block", cbHeight) heightUpdate = int32(cbHeight) blkShaUpdate = blockSha } } orphanRoot := b.blockChain.GetOrphanRoot(blockSha) locator, err := b.blockChain.LatestBlockLocator() if err != nil { bmgrLog.Warnf("Failed to get block locator for the "+ "latest block: %v", err) } else { bmsg.peer.PushGetBlocksMsg(locator, orphanRoot) } } else { // When the block is not an orphan, log information about it and // update the chain state. b.progressLogger.LogBlockHeight(bmsg.block) // Query the db for the latest best block since the block // that was processed could be on a side chain or have caused // a reorg. newestSha, newestHeight, _ := b.server.db.NewestSha() b.updateChainState(newestSha, newestHeight) // Update this peer's latest block height, for future // potential sync node candidacy. heightUpdate = int32(newestHeight) blkShaUpdate = newestSha // Allow any clients performing long polling via the // getblocktemplate RPC to be notified when the new block causes // their old block template to become stale. rpcServer := b.server.rpcServer if rpcServer != nil { rpcServer.gbtWorkState.NotifyBlockConnected(blockSha) } } // Update the block height for this peer. But only send a message to // the server for updating peer heights if this is an orphan or our // chain is "current". This avoids sending a spammy amount of messages // if we're syncing the chain from scratch. if blkShaUpdate != nil && heightUpdate != 0 { bmsg.peer.UpdateLastBlockHeight(heightUpdate) if isOrphan || b.current() { go b.server.UpdatePeerHeights(blkShaUpdate, int32(heightUpdate), bmsg.peer) } } // Sync the db to disk. 
b.server.db.Sync() // Nothing more to do if we aren't in headers-first mode. if !b.headersFirstMode { return } // This is headers-first mode, so if the block is not a checkpoint // request more blocks using the header list when the request queue is // getting short. if !isCheckpointBlock { if b.startHeader != nil && len(bmsg.peer.requestedBlocks) < minInFlightBlocks { b.fetchHeaderBlocks() } return } // This is headers-first mode and the block is a checkpoint. When // there is a next checkpoint, get the next round of headers by asking // for headers starting from the block after this one up to the next // checkpoint. prevHeight := b.nextCheckpoint.Height prevHash := b.nextCheckpoint.Hash b.nextCheckpoint = b.findNextHeaderCheckpoint(prevHeight) if b.nextCheckpoint != nil { locator := blockchain.BlockLocator([]*wire.ShaHash{prevHash}) err := bmsg.peer.PushGetHeadersMsg(locator, b.nextCheckpoint.Hash) if err != nil { bmgrLog.Warnf("Failed to send getheaders message to "+ "peer %s: %v", bmsg.peer.Addr(), err) return } bmgrLog.Infof("Downloading headers for blocks %d to %d from "+ "peer %s", prevHeight+1, b.nextCheckpoint.Height, b.syncPeer.Addr()) return } // This is headers-first mode, the block is a checkpoint, and there are // no more checkpoints, so switch to normal mode by requesting blocks // from the block after this one up to the end of the chain (zero hash). b.headersFirstMode = false b.headerList.Init() bmgrLog.Infof("Reached the final checkpoint -- switching to normal mode") locator := blockchain.BlockLocator([]*wire.ShaHash{blockSha}) err = bmsg.peer.PushGetBlocksMsg(locator, &zeroHash) if err != nil { bmgrLog.Warnf("Failed to send getblocks message to peer %s: %v", bmsg.peer.Addr(), err) return } } // fetchHeaderBlocks creates and sends a request to the syncPeer for the next // list of blocks to be downloaded based on the current list of headers. func (b *blockManager) fetchHeaderBlocks() { // Nothing to do if there is no start header. 
if b.startHeader == nil { bmgrLog.Warnf("fetchHeaderBlocks called with no start header") return } // Build up a getdata request for the list of blocks the headers // describe. The size hint will be limited to wire.MaxInvPerMsg by // the function, so no need to double check it here. gdmsg := wire.NewMsgGetDataSizeHint(uint(b.headerList.Len())) numRequested := 0 for e := b.startHeader; e != nil; e = e.Next() { node, ok := e.Value.(*headerNode) if !ok { bmgrLog.Warn("Header list node type is not a headerNode") continue } iv := wire.NewInvVect(wire.InvTypeBlock, node.sha) haveInv, err := b.haveInventory(iv) if err != nil { bmgrLog.Warnf("Unexpected failure when checking for "+ "existing inventory during header block "+ "fetch: %v", err) } if !haveInv { b.requestedBlocks[*node.sha] = struct{}{} b.syncPeer.requestedBlocks[*node.sha] = struct{}{} gdmsg.AddInvVect(iv) numRequested++ } b.startHeader = e.Next() if numRequested >= wire.MaxInvPerMsg { break } } if len(gdmsg.InvList) > 0 { b.syncPeer.QueueMessage(gdmsg, nil) } } // handleHeadersMsghandles headers messages from all peers. func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) { // The remote peer is misbehaving if we didn't request headers. msg := hmsg.headers numHeaders := len(msg.Headers) if !b.headersFirstMode { bmgrLog.Warnf("Got %d unrequested headers from %s -- "+ "disconnecting", numHeaders, hmsg.peer.Addr()) hmsg.peer.Disconnect() return } // Nothing to do for an empty headers message. if numHeaders == 0 { return } // Process all of the received headers ensuring each one connects to the // previous and that checkpoints match. receivedCheckpoint := false var finalHash *wire.ShaHash for _, blockHeader := range msg.Headers { blockHash := blockHeader.BlockSha() finalHash = &blockHash // Ensure there is a previous header to compare against. 
prevNodeEl := b.headerList.Back() if prevNodeEl == nil { bmgrLog.Warnf("Header list does not contain a previous" + "element as expected -- disconnecting peer") hmsg.peer.Disconnect() return } // Ensure the header properly connects to the previous one and // add it to the list of headers. node := headerNode{sha: &blockHash} prevNode := prevNodeEl.Value.(*headerNode) if prevNode.sha.IsEqual(&blockHeader.PrevBlock) { node.height = prevNode.height + 1 e := b.headerList.PushBack(&node) if b.startHeader == nil { b.startHeader = e } } else { bmgrLog.Warnf("Received block header that does not "+ "properly connect to the chain from peer %s "+ "-- disconnecting", hmsg.peer.Addr()) hmsg.peer.Disconnect() return } // Verify the header at the next checkpoint height matches. if node.height == b.nextCheckpoint.Height { if node.sha.IsEqual(b.nextCheckpoint.Hash) { receivedCheckpoint = true bmgrLog.Infof("Verified downloaded block "+ "header against checkpoint at height "+ "%d/hash %s", node.height, node.sha) } else { bmgrLog.Warnf("Block header at height %d/hash "+ "%s from peer %s does NOT match "+ "expected checkpoint hash of %s -- "+ "disconnecting", node.height, node.sha, hmsg.peer.Addr(), b.nextCheckpoint.Hash) hmsg.peer.Disconnect() return } break } } // When this header is a checkpoint, switch to fetching the blocks for // all of the headers since the last checkpoint. if receivedCheckpoint { // Since the first entry of the list is always the final block // that is already in the database and is only used to ensure // the next header links properly, it must be removed before // fetching the blocks. b.headerList.Remove(b.headerList.Front()) bmgrLog.Infof("Received %v block headers: Fetching blocks", b.headerList.Len()) b.progressLogger.SetLastLogTime(time.Now()) b.fetchHeaderBlocks() return } // This header is not a checkpoint, so request the next batch of // headers starting from the latest known header and ending with the // next checkpoint. 
locator := blockchain.BlockLocator([]*wire.ShaHash{finalHash}) err := hmsg.peer.PushGetHeadersMsg(locator, b.nextCheckpoint.Hash) if err != nil { bmgrLog.Warnf("Failed to send getheaders message to "+ "peer %s: %v", hmsg.peer.Addr(), err) return } } // haveInventory returns whether or not the inventory represented by the passed // inventory vector is known. This includes checking all of the various places // inventory can be when it is in different states such as blocks that are part // of the main chain, on a side chain, in the orphan pool, and transactions that // are in the memory pool (either the main pool or orphan pool). func (b *blockManager) haveInventory(invVect *wire.InvVect) (bool, error) { switch invVect.Type { case wire.InvTypeBlock: // Ask chain if the block is known to it in any form (main // chain, side chain, or orphan). return b.blockChain.HaveBlock(&invVect.Hash) case wire.InvTypeTx: // Ask the transaction memory pool if the transaction is known // to it in any form (main pool or orphan). if b.server.txMemPool.HaveTransaction(&invVect.Hash) { return true, nil } // Check if the transaction exists from the point of view of the // end of the main chain. return b.server.db.ExistsTxSha(&invVect.Hash) } // The requested inventory is is an unsupported type, so just claim // it is known to avoid requesting it. return true, nil } // handleInvMsg handles inv messages from all peers. // We examine the inventory advertised by the remote peer and act accordingly. func (b *blockManager) handleInvMsg(imsg *invMsg) { // Attempt to find the final block in the inventory list. There may // not be one. lastBlock := -1 invVects := imsg.inv.InvList for i := len(invVects) - 1; i >= 0; i-- { if invVects[i].Type == wire.InvTypeBlock { lastBlock = i break } } // If this inv contains a block announcement, and this isn't coming from // our current sync peer or we're current, then update the last // announced block for this peer. 
We'll use this information later to // update the heights of peers based on blocks we've accepted that they // previously announced. if lastBlock != -1 && (imsg.peer != b.syncPeer || b.current()) { imsg.peer.UpdateLastAnnouncedBlock(&invVects[lastBlock].Hash) } // Ignore invs from peers that aren't the sync if we are not current. // Helps prevent fetching a mass of orphans. if imsg.peer != b.syncPeer && !b.current() { return } // If our chain is current and a peer announces a block we already // know of, then update their current block height. if lastBlock != -1 && b.current() { exists, err := b.server.db.ExistsSha(&invVects[lastBlock].Hash) if err == nil && exists { blkHeight, err := b.server.db.FetchBlockHeightBySha(&invVects[lastBlock].Hash) if err != nil { bmgrLog.Warnf("Unable to fetch block height for block (sha: %v), %v", &invVects[lastBlock].Hash, err) } else { imsg.peer.UpdateLastBlockHeight(int32(blkHeight)) } } } // Request the advertised inventory if we don't already have it. Also, // request parent blocks of orphans if we receive one we already have. // Finally, attempt to detect potential stalls due to long side chains // we already have and request more blocks to prevent them. chain := b.blockChain for i, iv := range invVects { // Ignore unsupported inventory types. if iv.Type != wire.InvTypeBlock && iv.Type != wire.InvTypeTx { continue } // Add the inventory to the cache of known inventory // for the peer. imsg.peer.AddKnownInventory(iv) // Ignore inventory when we're in headers-first mode. if b.headersFirstMode { continue } // Request the inventory if we don't already have it. haveInv, err := b.haveInventory(iv) if err != nil { bmgrLog.Warnf("Unexpected failure when checking for "+ "existing inventory during inv message "+ "processing: %v", err) continue } if !haveInv { // Add it to the request queue. 
imsg.peer.requestQueue = append(imsg.peer.requestQueue, iv) continue } if iv.Type == wire.InvTypeBlock { // The block is an orphan block that we already have. // When the existing orphan was processed, it requested // the missing parent blocks. When this scenario // happens, it means there were more blocks missing // than are allowed into a single inventory message. As // a result, once this peer requested the final // advertised block, the remote peer noticed and is now // resending the orphan block as an available block // to signal there are more missing blocks that need to // be requested. if chain.IsKnownOrphan(&iv.Hash) { // Request blocks starting at the latest known // up to the root of the orphan that just came // in. orphanRoot := chain.GetOrphanRoot(&iv.Hash) locator, err := chain.LatestBlockLocator() if err != nil { bmgrLog.Errorf("PEER: Failed to get block "+ "locator for the latest block: "+ "%v", err) continue } imsg.peer.PushGetBlocksMsg(locator, orphanRoot) continue } // We already have the final block advertised by this // inventory message, so force a request for more. This // should only happen if we're on a really long side // chain. if i == lastBlock { // Request blocks after this one up to the // final one the remote peer knows about (zero // stop hash). locator := chain.BlockLocatorFromHash(&iv.Hash) imsg.peer.PushGetBlocksMsg(locator, &zeroHash) } } } // Request as much as possible at once. Anything that won't fit into // the request will be requested on the next inv message. numRequested := 0 gdmsg := wire.NewMsgGetData() requestQueue := imsg.peer.requestQueue for len(requestQueue) != 0 { iv := requestQueue[0] requestQueue[0] = nil requestQueue = requestQueue[1:] switch iv.Type { case wire.InvTypeBlock: // Request the block if there is not already a pending // request. 
if _, exists := b.requestedBlocks[iv.Hash]; !exists { b.requestedBlocks[iv.Hash] = struct{}{} imsg.peer.requestedBlocks[iv.Hash] = struct{}{} gdmsg.AddInvVect(iv) numRequested++ } case wire.InvTypeTx: // Request the transaction if there is not already a // pending request. if _, exists := b.requestedTxns[iv.Hash]; !exists { b.requestedTxns[iv.Hash] = struct{}{} imsg.peer.requestedTxns[iv.Hash] = struct{}{} gdmsg.AddInvVect(iv) numRequested++ } } if numRequested >= wire.MaxInvPerMsg { break } } imsg.peer.requestQueue = requestQueue if len(gdmsg.InvList) > 0 { imsg.peer.QueueMessage(gdmsg, nil) } } // blockHandler is the main handler for the block manager. It must be run // as a goroutine. It processes block and inv messages in a separate goroutine // from the peer handlers so the block (MsgBlock) messages are handled by a // single thread without needing to lock memory data structures. This is // important because the block manager controls which blocks are needed and how // the fetching should proceed. 
func (b *blockManager) blockHandler() { candidatePeers := list.New() out: for { select { case m := <-b.msgChan: switch msg := m.(type) { case *newPeerMsg: b.handleNewPeerMsg(candidatePeers, msg.peer) case *txMsg: b.handleTxMsg(msg) msg.peer.txProcessed <- struct{}{} case *blockMsg: b.handleBlockMsg(msg) msg.peer.blockProcessed <- struct{}{} case *invMsg: b.handleInvMsg(msg) case *headersMsg: b.handleHeadersMsg(msg) case *donePeerMsg: b.handleDonePeerMsg(candidatePeers, msg.peer) case getSyncPeerMsg: msg.reply <- b.syncPeer case checkConnectBlockMsg: err := b.blockChain.CheckConnectBlock(msg.block) msg.reply <- err case calcNextReqDifficultyMsg: difficulty, err := b.blockChain.CalcNextRequiredDifficulty( msg.timestamp) msg.reply <- calcNextReqDifficultyResponse{ difficulty: difficulty, err: err, } case fetchTransactionStoreMsg: txStore, err := b.blockChain.FetchTransactionStore(msg.tx, false) msg.reply <- fetchTransactionStoreResponse{ TxStore: txStore, err: err, } case processBlockMsg: isOrphan, err := b.blockChain.ProcessBlock( msg.block, b.server.timeSource, msg.flags) if err != nil { msg.reply <- processBlockResponse{ isOrphan: false, err: err, } } // Query the db for the latest best block since // the block that was processed could be on a // side chain or have caused a reorg. newestSha, newestHeight, _ := b.server.db.NewestSha() b.updateChainState(newestSha, newestHeight) // Allow any clients performing long polling via the // getblocktemplate RPC to be notified when the new block causes // their old block template to become stale. rpcServer := b.server.rpcServer if rpcServer != nil { rpcServer.gbtWorkState.NotifyBlockConnected(msg.block.Sha()) } msg.reply <- processBlockResponse{ isOrphan: isOrphan, err: nil, } case isCurrentMsg: msg.reply <- b.current() case pauseMsg: // Wait until the sender unpauses the manager. 
<-msg.unpause default: bmgrLog.Warnf("Invalid message type in block "+ "handler: %T", msg) } case <-b.quit: break out } } b.wg.Done() bmgrLog.Trace("Block handler done") } // handleNotifyMsg handles notifications from blockchain. It does things such // as request orphan block parents and relay accepted blocks to connected peers. func (b *blockManager) handleNotifyMsg(notification *blockchain.Notification) { switch notification.Type { // A block has been accepted into the block chain. Relay it to other // peers. case blockchain.NTBlockAccepted: // Don't relay if we are not current. Other peers that are // current should already know about it. if !b.current() { return } block, ok := notification.Data.(*btcutil.Block) if !ok { bmgrLog.Warnf("Chain accepted notification is not a block.") break } // Generate the inventory vector and relay it. iv := wire.NewInvVect(wire.InvTypeBlock, block.Sha()) b.server.RelayInventory(iv, nil) // A block has been connected to the main block chain. case blockchain.NTBlockConnected: block, ok := notification.Data.(*btcutil.Block) if !ok { bmgrLog.Warnf("Chain connected notification is not a block.") break } // Remove all of the transactions (except the coinbase) in the // connected block from the transaction pool. Secondly, remove any // transactions which are now double spends as a result of these // new transactions. Finally, remove any transaction that is // no longer an orphan. Note that removing a transaction from // pool also removes any transactions which depend on it, // recursively. for _, tx := range block.Transactions()[1:] { b.server.txMemPool.RemoveTransaction(tx) b.server.txMemPool.RemoveDoubleSpends(tx) b.server.txMemPool.RemoveOrphan(tx.Sha()) b.server.txMemPool.ProcessOrphans(tx.Sha()) } if r := b.server.rpcServer; r != nil { // Now that this block is in the blockchain we can mark // all the transactions (except the coinbase) as no // longer needing rebroadcasting. 
for _, tx := range block.Transactions()[1:] { iv := wire.NewInvVect(wire.InvTypeTx, tx.Sha()) b.server.RemoveRebroadcastInventory(iv) } // Notify registered websocket clients of incoming block. r.ntfnMgr.NotifyBlockConnected(block) } // If we're maintaining the address index, and it is up to date // then update it based off this new block. if cfg.AddrIndex && b.server.addrIndexer.IsCaughtUp() { b.server.addrIndexer.UpdateAddressIndex(block) } // A block has been disconnected from the main block chain. case blockchain.NTBlockDisconnected: block, ok := notification.Data.(*btcutil.Block) if !ok { bmgrLog.Warnf("Chain disconnected notification is not a block.") break } // Reinsert all of the transactions (except the coinbase) into // the transaction pool. for _, tx := range block.Transactions()[1:] { _, err := b.server.txMemPool.MaybeAcceptTransaction(tx, false, false) if err != nil { // Remove the transaction and all transactions // that depend on it if it wasn't accepted into // the transaction pool. b.server.txMemPool.RemoveTransaction(tx) } } // Notify registered websocket clients. if r := b.server.rpcServer; r != nil { r.ntfnMgr.NotifyBlockDisconnected(block) } } } // NewPeer informs the block manager of a newly active peer. func (b *blockManager) NewPeer(sp *serverPeer) { // Ignore if we are shutting down. if atomic.LoadInt32(&b.shutdown) != 0 { return } b.msgChan <- &newPeerMsg{peer: sp} } // QueueTx adds the passed transaction message and peer to the block handling // queue. func (b *blockManager) QueueTx(tx *btcutil.Tx, sp *serverPeer) { // Don't accept more transactions if we're shutting down. if atomic.LoadInt32(&b.shutdown) != 0 { sp.txProcessed <- struct{}{} return } b.msgChan <- &txMsg{tx: tx, peer: sp} } // QueueBlock adds the passed block message and peer to the block handling queue. func (b *blockManager) QueueBlock(block *btcutil.Block, sp *serverPeer) { // Don't accept more blocks if we're shutting down. 
if atomic.LoadInt32(&b.shutdown) != 0 { sp.blockProcessed <- struct{}{} return } b.msgChan <- &blockMsg{block: block, peer: sp} } // QueueInv adds the passed inv message and peer to the block handling queue. func (b *blockManager) QueueInv(inv *wire.MsgInv, sp *serverPeer) { // No channel handling here because peers do not need to block on inv // messages. if atomic.LoadInt32(&b.shutdown) != 0 { return } b.msgChan <- &invMsg{inv: inv, peer: sp} } // QueueHeaders adds the passed headers message and peer to the block handling // queue. func (b *blockManager) QueueHeaders(headers *wire.MsgHeaders, sp *serverPeer) { // No channel handling here because peers do not need to block on // headers messages. if atomic.LoadInt32(&b.shutdown) != 0 { return } b.msgChan <- &headersMsg{headers: headers, peer: sp} } // DonePeer informs the blockmanager that a peer has disconnected. func (b *blockManager) DonePeer(sp *serverPeer) { // Ignore if we are shutting down. if atomic.LoadInt32(&b.shutdown) != 0 { return } b.msgChan <- &donePeerMsg{peer: sp} } // Start begins the core block handler which processes block and inv messages. func (b *blockManager) Start() { // Already started? if atomic.AddInt32(&b.started, 1) != 1 { return } bmgrLog.Trace("Starting block manager") b.wg.Add(1) go b.blockHandler() } // Stop gracefully shuts down the block manager by stopping all asynchronous // handlers and waiting for them to finish. func (b *blockManager) Stop() error { if atomic.AddInt32(&b.shutdown, 1) != 1 { bmgrLog.Warnf("Block manager is already in the process of " + "shutting down") return nil } bmgrLog.Infof("Block manager shutting down") close(b.quit) b.wg.Wait() return nil } // SyncPeer returns the current sync peer. func (b *blockManager) SyncPeer() *serverPeer { reply := make(chan *serverPeer) b.msgChan <- getSyncPeerMsg{reply: reply} return <-reply } // CheckConnectBlock performs several checks to confirm connecting the passed // block to the main chain does not violate any rules. 
This function makes use // of CheckConnectBlock on an internal instance of a block chain. It is funneled // through the block manager since btcchain is not safe for concurrent access. func (b *blockManager) CheckConnectBlock(block *btcutil.Block) error { reply := make(chan error) b.msgChan <- checkConnectBlockMsg{block: block, reply: reply} return <-reply } // CalcNextRequiredDifficulty calculates the required difficulty for the next // block after the current main chain. This function makes use of // CalcNextRequiredDifficulty on an internal instance of a block chain. It is // funneled through the block manager since btcchain is not safe for concurrent // access. func (b *blockManager) CalcNextRequiredDifficulty(timestamp time.Time) (uint32, error) { reply := make(chan calcNextReqDifficultyResponse) b.msgChan <- calcNextReqDifficultyMsg{timestamp: timestamp, reply: reply} response := <-reply return response.difficulty, response.err } // FetchTransactionStore makes use of FetchTransactionStore on an internal // instance of a block chain. It is safe for concurrent access. func (b *blockManager) FetchTransactionStore(tx *btcutil.Tx) (blockchain.TxStore, error) { reply := make(chan fetchTransactionStoreResponse, 1) b.msgChan <- fetchTransactionStoreMsg{tx: tx, reply: reply} response := <-reply return response.TxStore, response.err } // ProcessBlock makes use of ProcessBlock on an internal instance of a block // chain. It is funneled through the block manager since btcchain is not safe // for concurrent access. func (b *blockManager) ProcessBlock(block *btcutil.Block, flags blockchain.BehaviorFlags) (bool, error) { reply := make(chan processBlockResponse, 1) b.msgChan <- processBlockMsg{block: block, flags: flags, reply: reply} response := <-reply return response.isOrphan, response.err } // IsCurrent returns whether or not the block manager believes it is synced with // the connected peers. 
func (b *blockManager) IsCurrent() bool { reply := make(chan bool) b.msgChan <- isCurrentMsg{reply: reply} return <-reply } // Pause pauses the block manager until the returned channel is closed. // // Note that while paused, all peer and block processing is halted. The // message sender should avoid pausing the block manager for long durations. func (b *blockManager) Pause() chan<- struct{} { c := make(chan struct{}) b.msgChan <- pauseMsg{c} return c } // newBlockManager returns a new bitcoin block manager. // Use Start to begin processing asynchronous block and inv updates. func newBlockManager(s *server) (*blockManager, error) { newestHash, height, err := s.db.NewestSha() if err != nil { return nil, err } bm := blockManager{ server: s, requestedTxns: make(map[wire.ShaHash]struct{}), requestedBlocks: make(map[wire.ShaHash]struct{}), progressLogger: newBlockProgressLogger("Processed", bmgrLog), msgChan: make(chan interface{}, cfg.MaxPeers*3), headerList: list.New(), quit: make(chan struct{}), } bm.progressLogger = newBlockProgressLogger("Processed", bmgrLog) bm.blockChain = blockchain.New(s.db, s.chainParams, bm.handleNotifyMsg, s.sigCache) bm.blockChain.DisableCheckpoints(cfg.DisableCheckpoints) if !cfg.DisableCheckpoints { // Initialize the next checkpoint based on the current height. bm.nextCheckpoint = bm.findNextHeaderCheckpoint(height) if bm.nextCheckpoint != nil { bm.resetHeaderState(newestHash, height) } } else { bmgrLog.Info("Checkpoints are disabled") } bmgrLog.Infof("Generating initial block node index. This may " + "take a while...") err = bm.blockChain.GenerateInitialIndex() if err != nil { return nil, err } bmgrLog.Infof("Block index generation complete") // Initialize the chain state now that the intial block node index has // been generated. bm.updateChainState(newestHash, height) return &bm, nil } // removeRegressionDB removes the existing regression test database if running // in regression test mode and it already exists. 
func removeRegressionDB(dbPath string) error { // Dont do anything if not in regression test mode. if !cfg.RegressionTest { return nil } // Remove the old regression test database if it already exists. fi, err := os.Stat(dbPath) if err == nil { btcdLog.Infof("Removing regression test database from '%s'", dbPath) if fi.IsDir() { err := os.RemoveAll(dbPath) if err != nil { return err } } else { err := os.Remove(dbPath) if err != nil { return err } } } return nil } // dbPath returns the path to the block database given a database type. func blockDbPath(dbType string) string { // The database name is based on the database type. dbName := blockDbNamePrefix + "_" + dbType if dbType == "sqlite" { dbName = dbName + ".db" } dbPath := filepath.Join(cfg.DataDir, dbName) return dbPath } // warnMultipeDBs shows a warning if multiple block database types are detected. // This is not a situation most users want. It is handy for development however // to support multiple side-by-side databases. func warnMultipeDBs() { // This is intentionally not using the known db types which depend // on the database types compiled into the binary since we want to // detect legacy db types as well. dbTypes := []string{"leveldb", "sqlite"} duplicateDbPaths := make([]string, 0, len(dbTypes)-1) for _, dbType := range dbTypes { if dbType == cfg.DbType { continue } // Store db path as a duplicate db if it exists. dbPath := blockDbPath(dbType) if fileExists(dbPath) { duplicateDbPaths = append(duplicateDbPaths, dbPath) } } // Warn if there are extra databases. 
if len(duplicateDbPaths) > 0 { selectedDbPath := blockDbPath(cfg.DbType) btcdLog.Warnf("WARNING: There are multiple block chain databases "+ "using different database types.\nYou probably don't "+ "want to waste disk space by having more than one.\n"+ "Your current database is located at [%v].\nThe "+ "additional database is located at %v", selectedDbPath, duplicateDbPaths) } } // setupBlockDB loads (or creates when needed) the block database taking into // account the selected database backend. It also contains additional logic // such warning the user if there are multiple databases which consume space on // the file system and ensuring the regression test database is clean when in // regression test mode. func setupBlockDB() (database.Db, error) { // The memdb backend does not have a file path associated with it, so // handle it uniquely. We also don't want to worry about the multiple // database type warnings when running with the memory database. if cfg.DbType == "memdb" { btcdLog.Infof("Creating block database in memory.") db, err := database.CreateDB(cfg.DbType) if err != nil { return nil, err } return db, nil } warnMultipeDBs() // The database name is based on the database type. dbPath := blockDbPath(cfg.DbType) // The regression test is special in that it needs a clean database for // each run, so remove it now if it already exists. removeRegressionDB(dbPath) btcdLog.Infof("Loading block database from '%s'", dbPath) db, err := database.OpenDB(cfg.DbType, dbPath) if err != nil { // Return the error if it's not because the database // doesn't exist. if err != database.ErrDbDoesNotExist { return nil, err } // Create the db if it does not exist. err = os.MkdirAll(cfg.DataDir, 0700) if err != nil { return nil, err } db, err = database.CreateDB(cfg.DbType, dbPath) if err != nil { return nil, err } } return db, nil } // loadBlockDB opens the block database and returns a handle to it. 
func loadBlockDB() (database.Db, error) { db, err := setupBlockDB() if err != nil { return nil, err } // Get the latest block height from the database. _, height, err := db.NewestSha() if err != nil { db.Close() return nil, err } // Insert the appropriate genesis block for the bitcoin network being // connected to if needed. if height == -1 { genesis := btcutil.NewBlock(activeNetParams.GenesisBlock) _, err := db.InsertBlock(genesis) if err != nil { db.Close() return nil, err } btcdLog.Infof("Inserted genesis block %v", activeNetParams.GenesisHash) height = 0 } btcdLog.Infof("Block database loaded with block height %d", height) return db, nil }