diff --git a/blockmanager.go b/blockmanager.go index b5180972..8c7b23d9 100644 --- a/blockmanager.go +++ b/blockmanager.go @@ -21,6 +21,11 @@ import ( const ( chanBufferSize = 50 + // minInFlightBlocks is the minimum number of blocks that should be + // in the request queue for headers-first mode before requesting + // more. + minInFlightBlocks = 10 + // blockDbNamePrefix is the prefix for the block database name. The // database type is appended to this value to form the full block // database name. @@ -65,6 +70,13 @@ type txMsg struct { peer *peer } +// headerNode is used as a node in a list of headers that are linked together +// between checkpoints. +type headerNode struct { + height int64 + sha *btcwire.ShaHash +} + // blockManager provides a concurrency safe block manager for handling all // incoming blocks. type blockManager struct { @@ -84,20 +96,55 @@ type blockManager struct { wg sync.WaitGroup quit chan bool - headerPool map[btcwire.ShaHash]*headerstr - headerOrphan map[btcwire.ShaHash]*headerstr - fetchingHeaders bool - startBlock *btcwire.ShaHash - fetchBlock *btcwire.ShaHash - lastBlock *btcwire.ShaHash - latestCheckpoint *btcchain.Checkpoint + // The following fields are used for headers-first mode. + headersFirstMode bool + headerList *list.List + startHeader *list.Element + nextCheckpoint *btcchain.Checkpoint } -type headerstr struct { - header *btcwire.BlockHeader - next *headerstr - height int - sha btcwire.ShaHash +// resetHeaderState sets the headers-first mode state to values appropriate for +// syncing from a new peer. +func (b *blockManager) resetHeaderState(newestHash *btcwire.ShaHash, newestHeight int64) { + b.headersFirstMode = false + b.headerList.Init() + b.startHeader = nil + + // When there is a next checkpoint, add an entry for the latest known + // block into the header pool. This allows the next downloaded header + // to prove it links to the chain properly. + if b.nextCheckpoint != nil { + node := headerNode{height: newestHeight, sha: newestHash} + b.headerList.PushBack(&node) + } +} + +// findNextHeaderCheckpoint returns the next checkpoint after the passed height. +// It returns nil when there is not one either because the height is already +// later than the final checkpoint or some other reason such as disabled +// checkpoints. +func (b *blockManager) findNextHeaderCheckpoint(height int64) *btcchain.Checkpoint { + checkpoints := b.blockChain.Checkpoints() + if checkpoints == nil { + return nil + } + + // There is no next checkpoint if the height is already after the final + // checkpoint. + finalCheckpoint := &checkpoints[len(checkpoints)-1] + if height >= finalCheckpoint.Height { + return nil + } + + // Find the next checkpoint. + nextCheckpoint := finalCheckpoint + for i := len(checkpoints) - 2; i >= 0; i-- { + if height >= checkpoints[i].Height { + break + } + nextCheckpoint = &checkpoints[i] + } + return nextCheckpoint } // startSync will choose the best peer among the available candidate peers to @@ -151,14 +198,31 @@ func (b *blockManager) startSync(peers *list.List) { bmgrLog.Infof("Syncing to block height %d from peer %v", bestPeer.lastBlock, bestPeer.addr) - // if starting from the beginning fetch headers and download - // blocks based on that, otherwise compute the block download - // via inv messages. Regression test mode does not support the - // headers-first approach so do normal block downloads when in - // regression test mode. - if height == 0 && !cfg.RegressionTest && !cfg.DisableCheckpoints { - bestPeer.PushGetHeadersMsg(locator) - b.fetchingHeaders = true + // When the current height is less than a known checkpoint we + // can use block headers to learn about which blocks comprise + // the chain up to the checkpoint and perform less validation + // for them. This is possible since each header contains the + // hash of the previous header and a merkle root. Therefore if + // we validate all of the received headers link together + // properly and the checkpoint hashes match, we can be sure the + // hashes for the blocks in between are accurate. Further, once + // the full blocks are downloaded, the merkle root is computed + // and compared against the value in the header which proves the + // full block hasn't been tampered with. + // + // Once we have passed the final checkpoint, or checkpoints are + // disabled, use standard inv messages learn about the blocks + // and fully validate them. Finally, regression test mode does + // not support the headers-first approach so do normal block + // downloads when in regression test mode. + if b.nextCheckpoint != nil && height < b.nextCheckpoint.Height && + !cfg.RegressionTest && !cfg.DisableCheckpoints { + + bestPeer.PushGetHeadersMsg(locator, b.nextCheckpoint.Hash) + b.headersFirstMode = true + bmgrLog.Infof("Downloading headers for blocks %d to "+ + "%d from peer %s", height+1, + b.nextCheckpoint.Height, bestPeer.addr) } else { bestPeer.PushGetBlocksMsg(locator, &zeroHash) } @@ -249,17 +313,22 @@ func (b *blockManager) handleDonePeerMsg(peers *list.List, p *peer) { } // Attempt to find a new peer to sync from if the quitting peer is the - // sync peer. + // sync peer. Also, reset the headers-first state if in headers-first + // mode so if b.syncPeer != nil && b.syncPeer == p { - if b.fetchingHeaders { - b.fetchingHeaders = false - b.startBlock = nil - b.fetchBlock = nil - b.lastBlock = nil - b.headerPool = make(map[btcwire.ShaHash]*headerstr) - b.headerOrphan = make(map[btcwire.ShaHash]*headerstr) - } b.syncPeer = nil + if b.headersFirstMode { + // This really shouldn't fail. We have a fairly + // unrecoverable database issue if it does. + newestHash, height, err := b.server.db.NewestSha() + if err != nil { + bmgrLog.Warnf("Unable to obtain latest "+ + "block information from the database: "+ + "%v", err) + return + } + b.resetHeaderState(newestHash, height) + } b.startSync(peers) } } @@ -277,7 +346,7 @@ func (b *blockManager) logBlockHeight(block *btcutil.Block) { return } - // Truncated the duration to 10s of milliseconds. + // Truncate the duration to 10s of milliseconds. durationMillis := int64(duration / time.Millisecond) tDuration := 10 * time.Millisecond * time.Duration(durationMillis/10) @@ -364,16 +433,6 @@ func (b *blockManager) current() bool { // handleBlockMsg handles block messages from all peers. func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { - defer func() { - if b.startBlock != nil && - len(bmsg.peer.requestedBlocks) < 10 { - - // block queue getting short, ask for more. - b.fetchHeaderBlocks() - } - - }() - // If we didn't ask for this block then the peer is misbehaving. blockSha, _ := bmsg.block.Sha() if _, ok := bmsg.peer.requestedBlocks[*blockSha]; !ok { @@ -394,21 +453,26 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { // handler can request the parent blocks from the appropriate peer. b.blockPeer[*blockSha] = bmsg.peer + // When in headers-first mode, if the block matches the hash of the + // first header in the list of headers that are being fetched, it's + // eligible for less validation since the headers have already been + // verified to link together and are valid up to the next checkpoint. + // Also, remove the list entry for all blocks except the checkpoint + // since it is needed to verify the next round of headers links + // properly. + isCheckpointBlock := false fastAdd := false - if b.fetchBlock != nil && blockSha.IsEqual(b.fetchBlock) { - firstblock, ok := b.headerPool[*blockSha] - if ok { - if b.latestCheckpoint == nil { - b.latestCheckpoint = - b.blockChain.LatestCheckpoint() - } - if int64(firstblock.height) <= - b.latestCheckpoint.Height { - + if b.headersFirstMode { + firstNodeEl := b.headerList.Front() + if firstNodeEl != nil { + firstNode := firstNodeEl.Value.(*headerNode) + if blockSha.IsEqual(firstNode.sha) { fastAdd = true - } - if firstblock.next != nil { - b.fetchBlock = &firstblock.next.sha + if firstNode.sha.IsEqual(b.nextCheckpoint.Hash) { + isCheckpointBlock = true + } else { + b.headerList.Remove(firstNodeEl) + } } } } @@ -419,16 +483,6 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { delete(bmsg.peer.requestedBlocks, *blockSha) delete(b.requestedBlocks, *blockSha) - if fastAdd && blockSha.IsEqual(b.lastBlock) { - // have processed all blocks, switch to normal handling - b.fetchingHeaders = false - b.startBlock = nil - b.fetchBlock = nil - b.lastBlock = nil - b.headerPool = make(map[btcwire.ShaHash]*headerstr) - b.headerOrphan = make(map[btcwire.ShaHash]*headerstr) - } - // Process the block to include validation, best chain selection, orphan // handling, etc. err := b.blockChain.ProcessBlock(bmsg.block, fastAdd) @@ -456,116 +510,202 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { // Sync the db to disk. b.server.db.Sync() + + // Nothing more to do if we aren't in headers-first mode. + if !b.headersFirstMode { + return + } + + // This is headers-first mode, so if the block is not a checkpoint + // request more blocks using the header list when the request queue is + // getting short. + if !isCheckpointBlock { + if b.startHeader != nil && + len(bmsg.peer.requestedBlocks) < minInFlightBlocks { + b.fetchHeaderBlocks() + } + return + } + + // This is headers-first mode and the block is a checkpoint. When + // there is a next checkpoint, get the next round of headers by asking + // for headers starting from the block after this one up to the next + // checkpoint. + prevHeight := b.nextCheckpoint.Height + prevHash := b.nextCheckpoint.Hash + b.nextCheckpoint = b.findNextHeaderCheckpoint(prevHeight) + if b.nextCheckpoint != nil { + locator := btcchain.BlockLocator{prevHash} + err := bmsg.peer.PushGetHeadersMsg(locator, b.nextCheckpoint.Hash) + if err != nil { + bmgrLog.Warnf("Failed to send getheaders message to "+ + "peer %s: %v", bmsg.peer.addr, err) + return + } + bmgrLog.Infof("Downloading headers for blocks %d to %d from "+ + "peer %s", prevHeight+1, b.nextCheckpoint.Height, + b.syncPeer.addr) + return + } + + // This is headers-first mode, the block is a checkpoint, and there are + // no more checkpoints, so switch to normal mode by requesting blocks + // from the block after this one up to the end of the chain (zero hash). + b.headersFirstMode = false + b.headerList.Init() + bmgrLog.Infof("Reached the final checkpoint -- switching to normal mode") + locator := btcchain.BlockLocator{blockSha} + err = bmsg.peer.PushGetBlocksMsg(locator, &zeroHash) + if err != nil { + bmgrLog.Warnf("Failed to send getblocks message to peer %s: %v", + bmsg.peer.addr, err) + return + } } -// fetchHeaderBlocks is creates and sends a request to the syncPeer for -// the next list of blocks to be downloaded. +// fetchHeaderBlocks creates and sends a request to the syncPeer for the next +// list of blocks to be downloaded based on the current list of headers. func (b *blockManager) fetchHeaderBlocks() { - gdmsg := btcwire.NewMsgGetDataSizeHint(btcwire.MaxInvPerMsg) + // Nothing to do if there is not start header. + if b.startHeader == nil { + bmgrLog.Warnf("fetchHeaderBlocks called with no start header") + return + } + + // Build up a getdata request for the list of blocks the headers + // describe. The size hint will be limited to btcwire.MaxInvPerMsg by + // the function, so no need to double check it here. + gdmsg := btcwire.NewMsgGetDataSizeHint(uint(b.headerList.Len())) numRequested := 0 - startBlock := b.startBlock - for { - if b.startBlock == nil { - break - } - blockHash := b.startBlock - firstblock, ok := b.headerPool[*blockHash] + for e := b.startHeader; e != nil; e = e.Next() { + node, ok := e.Value.(*headerNode) if !ok { - bmgrLog.Warnf("current fetch block %v missing from headerPool", blockHash) - break + bmgrLog.Warn("Header list node type is not a headerNode") + continue } - iv := btcwire.NewInvVect(btcwire.InvTypeBlock, blockHash) + + iv := btcwire.NewInvVect(btcwire.InvTypeBlock, node.sha) if !b.haveInventory(iv) { - b.requestedBlocks[*blockHash] = true - b.syncPeer.requestedBlocks[*blockHash] = true + b.requestedBlocks[*node.sha] = true + b.syncPeer.requestedBlocks[*node.sha] = true gdmsg.AddInvVect(iv) numRequested++ } - - if b.fetchBlock == nil { - b.fetchBlock = b.startBlock - } - if firstblock.next == nil { - b.startBlock = nil - break - } else { - b.startBlock = &firstblock.next.sha - } - + b.startHeader = e.Next() if numRequested >= btcwire.MaxInvPerMsg { break } } if len(gdmsg.InvList) > 0 { - bmgrLog.Debugf("requesting block %v len %v\n", startBlock, len(gdmsg.InvList)) - b.syncPeer.QueueMessage(gdmsg, nil) } } // handleHeadersMsghandles headers messages from all peers. -func (b *blockManager) handleHeadersMsg(bmsg *headersMsg) { - msg := bmsg.headers +func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) { + // The remote peer is misbehaving if we didn't request headers. + msg := hmsg.headers + numHeaders := len(msg.Headers) + if !b.headersFirstMode { + bmgrLog.Warnf("Got %d unrequested headers from %s -- "+ + "disconnecting", numHeaders, hmsg.peer.addr) + hmsg.peer.Disconnect() + return + } - nheaders := len(msg.Headers) - if nheaders == 0 { + // Nothing to do for an empty headers message. + if numHeaders == 0 { + return + } + + // Process all of the received headers ensuring each one connects to the + // previous and that checkpoints match. + receivedCheckpoint := false + var finalHash *btcwire.ShaHash + for _, blockHeader := range msg.Headers { + blockHash, err := blockHeader.BlockSha() + if err != nil { + bmgrLog.Warnf("Failed to compute hash of header "+ + "received from peer %s -- disconnecting", + hmsg.peer.addr) + hmsg.peer.Disconnect() + return + } + finalHash = &blockHash + + // Ensure there is a previous header to compare against. + prevNodeEl := b.headerList.Back() + if prevNodeEl == nil { + bmgrLog.Warnf("Header list does not contain a previous" + + "element as expected -- disconnecting peer") + hmsg.peer.Disconnect() + return + } + + // Ensure the header properly connects to the previous one and + // add it to the list of headers. + node := headerNode{sha: &blockHash} + prevNode := prevNodeEl.Value.(*headerNode) + if prevNode.sha.IsEqual(&blockHeader.PrevBlock) { + node.height = prevNode.height + 1 + e := b.headerList.PushBack(&node) + if b.startHeader == nil { + b.startHeader = e + } + } else { + bmgrLog.Warnf("Received block header that does not"+ + "properly connect to the chain from peer %s "+ + "-- disconnecting", hmsg.peer.addr) + hmsg.peer.Disconnect() + return + } + + // Verify the header at the next checkpoint height matches. + if node.height == b.nextCheckpoint.Height { + if node.sha.IsEqual(b.nextCheckpoint.Hash) { + receivedCheckpoint = true + bmgrLog.Infof("Verified downloaded block "+ + "header against checkpoint at height "+ + "%d/hash %s", node.height, node.sha) + } else { + bmgrLog.Warnf("Block header at height %d/hash "+ + "%s from peer %s does NOT match "+ + "expected checkpoint hash of %s -- "+ + "disconnecting", node.height, + node.sha, hmsg.peer.addr, + b.nextCheckpoint.Hash) + hmsg.peer.Disconnect() + return + } + break + } + } + + // When this header is a checkpoint, switch to fetching the blocks for + // all of the headers since the last checkpoint. + if receivedCheckpoint { + // Since the first entry of the list is always the final block + // that is already in the database and is only used to ensure + // the next header links properly, it must be removed before + // fetching the blocks. + b.headerList.Remove(b.headerList.Front()) bmgrLog.Infof("Received %v block headers: Fetching blocks", - len(b.headerPool)) + b.headerList.Len()) + b.lastBlockLogTime = time.Now() b.fetchHeaderBlocks() return } - var blockhash btcwire.ShaHash - if b.latestCheckpoint == nil { - b.latestCheckpoint = b.blockChain.LatestCheckpoint() - } - - for hdridx := range msg.Headers { - blockhash, _ = msg.Headers[hdridx].BlockSha() - var headerst headerstr - headerst.header = msg.Headers[hdridx] - headerst.sha = blockhash - prev, ok := b.headerPool[headerst.header.PrevBlock] - if ok { - if prev.next == nil { - prev.next = &headerst - } else { - bmgrLog.Infof("two children of the same block ??? %v %v %v", prev.sha, prev.next.sha, blockhash) - } - headerst.height = prev.height + 1 - } else if headerst.header.PrevBlock.IsEqual(activeNetParams.genesisHash) { - ok = true - headerst.height = 1 - b.startBlock = &headerst.sha - } - if int64(headerst.height) == b.latestCheckpoint.Height { - if headerst.sha.IsEqual(b.latestCheckpoint.Hash) { - // we can trust this header first download - // TODO flag this? - } else { - // XXX marker does not match, must throw - // away headers !?!?! - // XXX dont trust peer? - } - } - if ok { - b.headerPool[blockhash] = &headerst - b.lastBlock = &blockhash - } else { - bmgrLog.Infof("found orphan block %v", blockhash) - b.headerOrphan[headerst.header.PrevBlock] = &headerst - } - } - - // Construct the getheaders request and queue it to be sent. - ghmsg := btcwire.NewMsgGetHeaders() - err := ghmsg.AddBlockLocatorHash(&blockhash) + // This header is not a checkpoint, so request the next batch of + // headers starting from the latest known header and ending with the + // next checkpoint. + locator := btcchain.BlockLocator{finalHash} + err := hmsg.peer.PushGetHeadersMsg(locator, b.nextCheckpoint.Hash) if err != nil { - bmgrLog.Infof("msgheaders bad addheaders", blockhash) + bmgrLog.Warnf("Failed to send getheaders message to "+ + "peer %s: %v", hmsg.peer.addr, err) return } - - b.syncPeer.QueueMessage(ghmsg, nil) } // haveInventory returns whether or not the inventory represented by the passed @@ -632,12 +772,9 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) { // for the peer. imsg.peer.AddKnownInventory(iv) - if b.fetchingHeaders { - // if we are fetching headers and already know - // about a block, do not add process it. - if _, ok := b.headerPool[iv.Hash]; ok { - continue - } + // Ignore inventory when we're in headers-first mode. + if b.headersFirstMode { + continue } // Request the inventory if we don't already have it. @@ -921,8 +1058,8 @@ func (b *blockManager) QueueInv(inv *btcwire.MsgInv, p *peer) { // QueueHeaders adds the passed headers message and peer to the block handling // queue. func (b *blockManager) QueueHeaders(headers *btcwire.MsgHeaders, p *peer) { - // No channel handling here because peers do not need to block on inv - // messages. + // No channel handling here because peers do not need to block on + // headers messages. if atomic.LoadInt32(&b.shutdown) != 0 { return } @@ -970,6 +1107,11 @@ func (b *blockManager) Stop() error { // newBlockManager returns a new bitcoin block manager. // Use Start to begin processing asynchronous block and inv updates. func newBlockManager(s *server) (*blockManager, error) { + newestHash, height, err := s.db.NewestSha() + if err != nil { + return nil, err + } + bm := blockManager{ server: s, blockPeer: make(map[btcwire.ShaHash]*peer), @@ -977,19 +1119,24 @@ func newBlockManager(s *server) (*blockManager, error) { requestedBlocks: make(map[btcwire.ShaHash]bool), lastBlockLogTime: time.Now(), msgChan: make(chan interface{}, cfg.MaxPeers*3), - headerPool: make(map[btcwire.ShaHash]*headerstr), - headerOrphan: make(map[btcwire.ShaHash]*headerstr), + headerList: list.New(), quit: make(chan bool), } bm.blockChain = btcchain.New(s.db, s.btcnet, bm.handleNotifyMsg) bm.blockChain.DisableCheckpoints(cfg.DisableCheckpoints) - if cfg.DisableCheckpoints { + if !cfg.DisableCheckpoints { + // Initialize the next checkpoint based on the current height. + bm.nextCheckpoint = bm.findNextHeaderCheckpoint(height) + if bm.nextCheckpoint != nil { + bm.resetHeaderState(newestHash, height) + } + } else { bmgrLog.Info("Checkpoints are disabled") } bmgrLog.Infof("Generating initial block node index. This may " + "take a while...") - err := bm.blockChain.GenerateInitialIndex() + err = bm.blockChain.GenerateInitialIndex() if err != nil { return nil, err } diff --git a/peer.go b/peer.go index 7221770c..f7e3bf05 100644 --- a/peer.go +++ b/peer.go @@ -151,6 +151,8 @@ type peer struct { retryCount int64 prevGetBlocksBegin *btcwire.ShaHash // owned by blockmanager prevGetBlocksStop *btcwire.ShaHash // owned by blockmanager + prevGetHdrsBegin *btcwire.ShaHash // owned by blockmanager + prevGetHdrsStop *btcwire.ShaHash // owned by blockmanager requestQueue *list.List continueHash *btcwire.ShaHash outputQueue chan outMsg @@ -458,7 +460,7 @@ func (p *peer) PushGetBlocksMsg(locator btcchain.BlockLocator, stopHash *btcwire // PushGetHeadersMsg sends a getblocks message for the provided block locator // and stop hash. It will ignore back-to-back duplicate requests. -func (p *peer) PushGetHeadersMsg(locator btcchain.BlockLocator) error { +func (p *peer) PushGetHeadersMsg(locator btcchain.BlockLocator, stopHash *btcwire.ShaHash) error { // Extract the begin hash from the block locator, if one was specified, // to use for filtering duplicate getheaders requests. var beginHash *btcwire.ShaHash @@ -467,9 +469,9 @@ func (p *peer) PushGetHeadersMsg(locator btcchain.BlockLocator) error { } // Filter duplicate getheaders requests. - if p.prevGetBlocksBegin != nil && - beginHash != nil && - beginHash.IsEqual(p.prevGetBlocksBegin) { + if p.prevGetHdrsStop != nil && p.prevGetHdrsBegin != nil && + beginHash != nil && stopHash.IsEqual(p.prevGetHdrsStop) && + beginHash.IsEqual(p.prevGetHdrsBegin) { peerLog.Tracef("PEER: Filtering duplicate [getheaders] with "+ "begin hash %v", beginHash) @@ -478,6 +480,7 @@ func (p *peer) PushGetHeadersMsg(locator btcchain.BlockLocator) error { // Construct the getheaders request and queue it to be sent. msg := btcwire.NewMsgGetHeaders() + msg.HashStop = *stopHash for _, hash := range locator { err := msg.AddBlockLocatorHash(hash) if err != nil { @@ -488,7 +491,8 @@ func (p *peer) PushGetHeadersMsg(locator btcchain.BlockLocator) error { // Update the previous getheaders request information for filtering // duplicates. - p.prevGetBlocksBegin = beginHash + p.prevGetHdrsBegin = beginHash + p.prevGetHdrsStop = stopHash return nil }