Rework and improve headers-first mode.
This commit improves how the headers-first mode works in several ways. The previous headers-first code was an initial implementation that did not have all of the bells and whistles and a few less than ideal characteristics. This commit improves the heaers-first code to resolve the issues discussed next. - The previous code only used headers-first mode when starting out from block height 0 rather than allowing it to work starting at any height before the final checkpoint. This means if you stopped the chain download at any point before the final checkpoint and restarted, it would not resume and you therefore would not have the benefit of the faster processing offered by headers-first mode. - Previously all headers (even those after the final checkpoint) were downloaded and only the final checkpoint was verified. This resulted in the following issues: - As the block chain grew, increasingly larger numbers of headers were downloaded and kept in memory - If the node the node serving up the headers was serving an invalid chain, it wouldn't be detected until downloading a large number of headers - When an invalid checkpoint was detected, no action was taken to recover which meant the chain download would essentially be stalled - The headers were kept in memory even though they didn't need to be as merely keeping track of the hashes and heights is enough to provde they properly link together and checkpoints match - There was no logging when headers were being downloaded so it could appear like nothing was happening - Duplicate requests for the same headers weren't being filtered which meant is was possible to inadvertently download the same headers twice only to throw them away. This commit resolves these issues with the following changes: - The current height is now examined at startup and prior each sync peer selection to allow it to resume headers-first mode starting from the known height to the next checkpoint - All checkpoints are now verified and the headers are only downloaded from the current known block height up to the next checkpoint. This has several desirable properties: - The amount of memory required is bounded by the maximum distance between to checkpoints rather than the entire length of the chain - A node serving up an invalid chain is detected very quickly and with little work - When an invalid checkpoint is detected, the headers are simply discarded and the peer is disconnected for serving an invalid chain - When the sync peer disconnets, all current headers are thrown away and, due to the new aforementioned resume code, when a new sync peer is selected, headers-first mode will continue from the last known good block - In addition to reduced memory usage from only keeping information about headers between two checkpoints, the only information now kept in memory about the headers is the hash and height rather than the entire header - There is now logging information about what is happening with headers - Duplicate header requests are now filtered
This commit is contained in:
parent
c6d865f3b5
commit
5ec951f6a7
2 changed files with 312 additions and 161 deletions
459
blockmanager.go
459
blockmanager.go
|
@ -21,6 +21,11 @@ import (
|
|||
const (
|
||||
chanBufferSize = 50
|
||||
|
||||
// minInFlightBlocks is the minimum number of blocks that should be
|
||||
// in the request queue for headers-first mode before requesting
|
||||
// more.
|
||||
minInFlightBlocks = 10
|
||||
|
||||
// blockDbNamePrefix is the prefix for the block database name. The
|
||||
// database type is appended to this value to form the full block
|
||||
// database name.
|
||||
|
@ -65,6 +70,13 @@ type txMsg struct {
|
|||
peer *peer
|
||||
}
|
||||
|
||||
// headerNode is used as a node in a list of headers that are linked together
|
||||
// between checkpoints.
|
||||
type headerNode struct {
|
||||
height int64
|
||||
sha *btcwire.ShaHash
|
||||
}
|
||||
|
||||
// blockManager provides a concurrency safe block manager for handling all
|
||||
// incoming blocks.
|
||||
type blockManager struct {
|
||||
|
@ -84,20 +96,55 @@ type blockManager struct {
|
|||
wg sync.WaitGroup
|
||||
quit chan bool
|
||||
|
||||
headerPool map[btcwire.ShaHash]*headerstr
|
||||
headerOrphan map[btcwire.ShaHash]*headerstr
|
||||
fetchingHeaders bool
|
||||
startBlock *btcwire.ShaHash
|
||||
fetchBlock *btcwire.ShaHash
|
||||
lastBlock *btcwire.ShaHash
|
||||
latestCheckpoint *btcchain.Checkpoint
|
||||
// The following fields are used for headers-first mode.
|
||||
headersFirstMode bool
|
||||
headerList *list.List
|
||||
startHeader *list.Element
|
||||
nextCheckpoint *btcchain.Checkpoint
|
||||
}
|
||||
|
||||
type headerstr struct {
|
||||
header *btcwire.BlockHeader
|
||||
next *headerstr
|
||||
height int
|
||||
sha btcwire.ShaHash
|
||||
// resetHeaderState sets the headers-first mode state to values appropriate for
|
||||
// syncing from a new peer.
|
||||
func (b *blockManager) resetHeaderState(newestHash *btcwire.ShaHash, newestHeight int64) {
|
||||
b.headersFirstMode = false
|
||||
b.headerList.Init()
|
||||
b.startHeader = nil
|
||||
|
||||
// When there is a next checkpoint, add an entry for the latest known
|
||||
// block into the header pool. This allows the next downloaded header
|
||||
// to prove it links to the chain properly.
|
||||
if b.nextCheckpoint != nil {
|
||||
node := headerNode{height: newestHeight, sha: newestHash}
|
||||
b.headerList.PushBack(&node)
|
||||
}
|
||||
}
|
||||
|
||||
// findNextHeaderCheckpoint returns the next checkpoint after the passed height.
|
||||
// It returns nil when there is not one either because the height is already
|
||||
// later than the final checkpoint or some other reason such as disabled
|
||||
// checkpoints.
|
||||
func (b *blockManager) findNextHeaderCheckpoint(height int64) *btcchain.Checkpoint {
|
||||
checkpoints := b.blockChain.Checkpoints()
|
||||
if checkpoints == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
// There is no next checkpoint if the height is already after the final
|
||||
// checkpoint.
|
||||
finalCheckpoint := &checkpoints[len(checkpoints)-1]
|
||||
if height >= finalCheckpoint.Height {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Find the next checkpoint.
|
||||
nextCheckpoint := finalCheckpoint
|
||||
for i := len(checkpoints) - 2; i >= 0; i-- {
|
||||
if height >= checkpoints[i].Height {
|
||||
break
|
||||
}
|
||||
nextCheckpoint = &checkpoints[i]
|
||||
}
|
||||
return nextCheckpoint
|
||||
}
|
||||
|
||||
// startSync will choose the best peer among the available candidate peers to
|
||||
|
@ -151,14 +198,31 @@ func (b *blockManager) startSync(peers *list.List) {
|
|||
bmgrLog.Infof("Syncing to block height %d from peer %v",
|
||||
bestPeer.lastBlock, bestPeer.addr)
|
||||
|
||||
// if starting from the beginning fetch headers and download
|
||||
// blocks based on that, otherwise compute the block download
|
||||
// via inv messages. Regression test mode does not support the
|
||||
// headers-first approach so do normal block downloads when in
|
||||
// regression test mode.
|
||||
if height == 0 && !cfg.RegressionTest && !cfg.DisableCheckpoints {
|
||||
bestPeer.PushGetHeadersMsg(locator)
|
||||
b.fetchingHeaders = true
|
||||
// When the current height is less than a known checkpoint we
|
||||
// can use block headers to learn about which blocks comprise
|
||||
// the chain up to the checkpoint and perform less validation
|
||||
// for them. This is possible since each header contains the
|
||||
// hash of the previous header and a merkle root. Therefore if
|
||||
// we validate all of the received headers link together
|
||||
// properly and the checkpoint hashes match, we can be sure the
|
||||
// hashes for the blocks in between are accurate. Further, once
|
||||
// the full blocks are downloaded, the merkle root is computed
|
||||
// and compared against the value in the header which proves the
|
||||
// full block hasn't been tampered with.
|
||||
//
|
||||
// Once we have passed the final checkpoint, or checkpoints are
|
||||
// disabled, use standard inv messages learn about the blocks
|
||||
// and fully validate them. Finally, regression test mode does
|
||||
// not support the headers-first approach so do normal block
|
||||
// downloads when in regression test mode.
|
||||
if b.nextCheckpoint != nil && height < b.nextCheckpoint.Height &&
|
||||
!cfg.RegressionTest && !cfg.DisableCheckpoints {
|
||||
|
||||
bestPeer.PushGetHeadersMsg(locator, b.nextCheckpoint.Hash)
|
||||
b.headersFirstMode = true
|
||||
bmgrLog.Infof("Downloading headers for blocks %d to "+
|
||||
"%d from peer %s", height+1,
|
||||
b.nextCheckpoint.Height, bestPeer.addr)
|
||||
} else {
|
||||
bestPeer.PushGetBlocksMsg(locator, &zeroHash)
|
||||
}
|
||||
|
@ -249,17 +313,22 @@ func (b *blockManager) handleDonePeerMsg(peers *list.List, p *peer) {
|
|||
}
|
||||
|
||||
// Attempt to find a new peer to sync from if the quitting peer is the
|
||||
// sync peer.
|
||||
// sync peer. Also, reset the headers-first state if in headers-first
|
||||
// mode so
|
||||
if b.syncPeer != nil && b.syncPeer == p {
|
||||
if b.fetchingHeaders {
|
||||
b.fetchingHeaders = false
|
||||
b.startBlock = nil
|
||||
b.fetchBlock = nil
|
||||
b.lastBlock = nil
|
||||
b.headerPool = make(map[btcwire.ShaHash]*headerstr)
|
||||
b.headerOrphan = make(map[btcwire.ShaHash]*headerstr)
|
||||
}
|
||||
b.syncPeer = nil
|
||||
if b.headersFirstMode {
|
||||
// This really shouldn't fail. We have a fairly
|
||||
// unrecoverable database issue if it does.
|
||||
newestHash, height, err := b.server.db.NewestSha()
|
||||
if err != nil {
|
||||
bmgrLog.Warnf("Unable to obtain latest "+
|
||||
"block information from the database: "+
|
||||
"%v", err)
|
||||
return
|
||||
}
|
||||
b.resetHeaderState(newestHash, height)
|
||||
}
|
||||
b.startSync(peers)
|
||||
}
|
||||
}
|
||||
|
@ -277,7 +346,7 @@ func (b *blockManager) logBlockHeight(block *btcutil.Block) {
|
|||
return
|
||||
}
|
||||
|
||||
// Truncated the duration to 10s of milliseconds.
|
||||
// Truncate the duration to 10s of milliseconds.
|
||||
durationMillis := int64(duration / time.Millisecond)
|
||||
tDuration := 10 * time.Millisecond * time.Duration(durationMillis/10)
|
||||
|
||||
|
@ -364,16 +433,6 @@ func (b *blockManager) current() bool {
|
|||
|
||||
// handleBlockMsg handles block messages from all peers.
|
||||
func (b *blockManager) handleBlockMsg(bmsg *blockMsg) {
|
||||
defer func() {
|
||||
if b.startBlock != nil &&
|
||||
len(bmsg.peer.requestedBlocks) < 10 {
|
||||
|
||||
// block queue getting short, ask for more.
|
||||
b.fetchHeaderBlocks()
|
||||
}
|
||||
|
||||
}()
|
||||
|
||||
// If we didn't ask for this block then the peer is misbehaving.
|
||||
blockSha, _ := bmsg.block.Sha()
|
||||
if _, ok := bmsg.peer.requestedBlocks[*blockSha]; !ok {
|
||||
|
@ -394,21 +453,26 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) {
|
|||
// handler can request the parent blocks from the appropriate peer.
|
||||
b.blockPeer[*blockSha] = bmsg.peer
|
||||
|
||||
// When in headers-first mode, if the block matches the hash of the
|
||||
// first header in the list of headers that are being fetched, it's
|
||||
// eligible for less validation since the headers have already been
|
||||
// verified to link together and are valid up to the next checkpoint.
|
||||
// Also, remove the list entry for all blocks except the checkpoint
|
||||
// since it is needed to verify the next round of headers links
|
||||
// properly.
|
||||
isCheckpointBlock := false
|
||||
fastAdd := false
|
||||
if b.fetchBlock != nil && blockSha.IsEqual(b.fetchBlock) {
|
||||
firstblock, ok := b.headerPool[*blockSha]
|
||||
if ok {
|
||||
if b.latestCheckpoint == nil {
|
||||
b.latestCheckpoint =
|
||||
b.blockChain.LatestCheckpoint()
|
||||
}
|
||||
if int64(firstblock.height) <=
|
||||
b.latestCheckpoint.Height {
|
||||
|
||||
if b.headersFirstMode {
|
||||
firstNodeEl := b.headerList.Front()
|
||||
if firstNodeEl != nil {
|
||||
firstNode := firstNodeEl.Value.(*headerNode)
|
||||
if blockSha.IsEqual(firstNode.sha) {
|
||||
fastAdd = true
|
||||
}
|
||||
if firstblock.next != nil {
|
||||
b.fetchBlock = &firstblock.next.sha
|
||||
if firstNode.sha.IsEqual(b.nextCheckpoint.Hash) {
|
||||
isCheckpointBlock = true
|
||||
} else {
|
||||
b.headerList.Remove(firstNodeEl)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -419,16 +483,6 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) {
|
|||
delete(bmsg.peer.requestedBlocks, *blockSha)
|
||||
delete(b.requestedBlocks, *blockSha)
|
||||
|
||||
if fastAdd && blockSha.IsEqual(b.lastBlock) {
|
||||
// have processed all blocks, switch to normal handling
|
||||
b.fetchingHeaders = false
|
||||
b.startBlock = nil
|
||||
b.fetchBlock = nil
|
||||
b.lastBlock = nil
|
||||
b.headerPool = make(map[btcwire.ShaHash]*headerstr)
|
||||
b.headerOrphan = make(map[btcwire.ShaHash]*headerstr)
|
||||
}
|
||||
|
||||
// Process the block to include validation, best chain selection, orphan
|
||||
// handling, etc.
|
||||
err := b.blockChain.ProcessBlock(bmsg.block, fastAdd)
|
||||
|
@ -456,116 +510,202 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) {
|
|||
|
||||
// Sync the db to disk.
|
||||
b.server.db.Sync()
|
||||
|
||||
// Nothing more to do if we aren't in headers-first mode.
|
||||
if !b.headersFirstMode {
|
||||
return
|
||||
}
|
||||
|
||||
// This is headers-first mode, so if the block is not a checkpoint
|
||||
// request more blocks using the header list when the request queue is
|
||||
// getting short.
|
||||
if !isCheckpointBlock {
|
||||
if b.startHeader != nil &&
|
||||
len(bmsg.peer.requestedBlocks) < minInFlightBlocks {
|
||||
b.fetchHeaderBlocks()
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// This is headers-first mode and the block is a checkpoint. When
|
||||
// there is a next checkpoint, get the next round of headers by asking
|
||||
// for headers starting from the block after this one up to the next
|
||||
// checkpoint.
|
||||
prevHeight := b.nextCheckpoint.Height
|
||||
prevHash := b.nextCheckpoint.Hash
|
||||
b.nextCheckpoint = b.findNextHeaderCheckpoint(prevHeight)
|
||||
if b.nextCheckpoint != nil {
|
||||
locator := btcchain.BlockLocator{prevHash}
|
||||
err := bmsg.peer.PushGetHeadersMsg(locator, b.nextCheckpoint.Hash)
|
||||
if err != nil {
|
||||
bmgrLog.Warnf("Failed to send getheaders message to "+
|
||||
"peer %s: %v", bmsg.peer.addr, err)
|
||||
return
|
||||
}
|
||||
bmgrLog.Infof("Downloading headers for blocks %d to %d from "+
|
||||
"peer %s", prevHeight+1, b.nextCheckpoint.Height,
|
||||
b.syncPeer.addr)
|
||||
return
|
||||
}
|
||||
|
||||
// This is headers-first mode, the block is a checkpoint, and there are
|
||||
// no more checkpoints, so switch to normal mode by requesting blocks
|
||||
// from the block after this one up to the end of the chain (zero hash).
|
||||
b.headersFirstMode = false
|
||||
b.headerList.Init()
|
||||
bmgrLog.Infof("Reached the final checkpoint -- switching to normal mode")
|
||||
locator := btcchain.BlockLocator{blockSha}
|
||||
err = bmsg.peer.PushGetBlocksMsg(locator, &zeroHash)
|
||||
if err != nil {
|
||||
bmgrLog.Warnf("Failed to send getblocks message to peer %s: %v",
|
||||
bmsg.peer.addr, err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// fetchHeaderBlocks is creates and sends a request to the syncPeer for
|
||||
// the next list of blocks to be downloaded.
|
||||
// fetchHeaderBlocks creates and sends a request to the syncPeer for the next
|
||||
// list of blocks to be downloaded based on the current list of headers.
|
||||
func (b *blockManager) fetchHeaderBlocks() {
|
||||
gdmsg := btcwire.NewMsgGetDataSizeHint(btcwire.MaxInvPerMsg)
|
||||
// Nothing to do if there is not start header.
|
||||
if b.startHeader == nil {
|
||||
bmgrLog.Warnf("fetchHeaderBlocks called with no start header")
|
||||
return
|
||||
}
|
||||
|
||||
// Build up a getdata request for the list of blocks the headers
|
||||
// describe. The size hint will be limited to btcwire.MaxInvPerMsg by
|
||||
// the function, so no need to double check it here.
|
||||
gdmsg := btcwire.NewMsgGetDataSizeHint(uint(b.headerList.Len()))
|
||||
numRequested := 0
|
||||
startBlock := b.startBlock
|
||||
for {
|
||||
if b.startBlock == nil {
|
||||
break
|
||||
}
|
||||
blockHash := b.startBlock
|
||||
firstblock, ok := b.headerPool[*blockHash]
|
||||
for e := b.startHeader; e != nil; e = e.Next() {
|
||||
node, ok := e.Value.(*headerNode)
|
||||
if !ok {
|
||||
bmgrLog.Warnf("current fetch block %v missing from headerPool", blockHash)
|
||||
break
|
||||
bmgrLog.Warn("Header list node type is not a headerNode")
|
||||
continue
|
||||
}
|
||||
iv := btcwire.NewInvVect(btcwire.InvTypeBlock, blockHash)
|
||||
|
||||
iv := btcwire.NewInvVect(btcwire.InvTypeBlock, node.sha)
|
||||
if !b.haveInventory(iv) {
|
||||
b.requestedBlocks[*blockHash] = true
|
||||
b.syncPeer.requestedBlocks[*blockHash] = true
|
||||
b.requestedBlocks[*node.sha] = true
|
||||
b.syncPeer.requestedBlocks[*node.sha] = true
|
||||
gdmsg.AddInvVect(iv)
|
||||
numRequested++
|
||||
}
|
||||
|
||||
if b.fetchBlock == nil {
|
||||
b.fetchBlock = b.startBlock
|
||||
}
|
||||
if firstblock.next == nil {
|
||||
b.startBlock = nil
|
||||
break
|
||||
} else {
|
||||
b.startBlock = &firstblock.next.sha
|
||||
}
|
||||
|
||||
b.startHeader = e.Next()
|
||||
if numRequested >= btcwire.MaxInvPerMsg {
|
||||
break
|
||||
}
|
||||
}
|
||||
if len(gdmsg.InvList) > 0 {
|
||||
bmgrLog.Debugf("requesting block %v len %v\n", startBlock, len(gdmsg.InvList))
|
||||
|
||||
b.syncPeer.QueueMessage(gdmsg, nil)
|
||||
}
|
||||
}
|
||||
|
||||
// handleHeadersMsghandles headers messages from all peers.
|
||||
func (b *blockManager) handleHeadersMsg(bmsg *headersMsg) {
|
||||
msg := bmsg.headers
|
||||
func (b *blockManager) handleHeadersMsg(hmsg *headersMsg) {
|
||||
// The remote peer is misbehaving if we didn't request headers.
|
||||
msg := hmsg.headers
|
||||
numHeaders := len(msg.Headers)
|
||||
if !b.headersFirstMode {
|
||||
bmgrLog.Warnf("Got %d unrequested headers from %s -- "+
|
||||
"disconnecting", numHeaders, hmsg.peer.addr)
|
||||
hmsg.peer.Disconnect()
|
||||
return
|
||||
}
|
||||
|
||||
nheaders := len(msg.Headers)
|
||||
if nheaders == 0 {
|
||||
// Nothing to do for an empty headers message.
|
||||
if numHeaders == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// Process all of the received headers ensuring each one connects to the
|
||||
// previous and that checkpoints match.
|
||||
receivedCheckpoint := false
|
||||
var finalHash *btcwire.ShaHash
|
||||
for _, blockHeader := range msg.Headers {
|
||||
blockHash, err := blockHeader.BlockSha()
|
||||
if err != nil {
|
||||
bmgrLog.Warnf("Failed to compute hash of header "+
|
||||
"received from peer %s -- disconnecting",
|
||||
hmsg.peer.addr)
|
||||
hmsg.peer.Disconnect()
|
||||
return
|
||||
}
|
||||
finalHash = &blockHash
|
||||
|
||||
// Ensure there is a previous header to compare against.
|
||||
prevNodeEl := b.headerList.Back()
|
||||
if prevNodeEl == nil {
|
||||
bmgrLog.Warnf("Header list does not contain a previous" +
|
||||
"element as expected -- disconnecting peer")
|
||||
hmsg.peer.Disconnect()
|
||||
return
|
||||
}
|
||||
|
||||
// Ensure the header properly connects to the previous one and
|
||||
// add it to the list of headers.
|
||||
node := headerNode{sha: &blockHash}
|
||||
prevNode := prevNodeEl.Value.(*headerNode)
|
||||
if prevNode.sha.IsEqual(&blockHeader.PrevBlock) {
|
||||
node.height = prevNode.height + 1
|
||||
e := b.headerList.PushBack(&node)
|
||||
if b.startHeader == nil {
|
||||
b.startHeader = e
|
||||
}
|
||||
} else {
|
||||
bmgrLog.Warnf("Received block header that does not"+
|
||||
"properly connect to the chain from peer %s "+
|
||||
"-- disconnecting", hmsg.peer.addr)
|
||||
hmsg.peer.Disconnect()
|
||||
return
|
||||
}
|
||||
|
||||
// Verify the header at the next checkpoint height matches.
|
||||
if node.height == b.nextCheckpoint.Height {
|
||||
if node.sha.IsEqual(b.nextCheckpoint.Hash) {
|
||||
receivedCheckpoint = true
|
||||
bmgrLog.Infof("Verified downloaded block "+
|
||||
"header against checkpoint at height "+
|
||||
"%d/hash %s", node.height, node.sha)
|
||||
} else {
|
||||
bmgrLog.Warnf("Block header at height %d/hash "+
|
||||
"%s from peer %s does NOT match "+
|
||||
"expected checkpoint hash of %s -- "+
|
||||
"disconnecting", node.height,
|
||||
node.sha, hmsg.peer.addr,
|
||||
b.nextCheckpoint.Hash)
|
||||
hmsg.peer.Disconnect()
|
||||
return
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// When this header is a checkpoint, switch to fetching the blocks for
|
||||
// all of the headers since the last checkpoint.
|
||||
if receivedCheckpoint {
|
||||
// Since the first entry of the list is always the final block
|
||||
// that is already in the database and is only used to ensure
|
||||
// the next header links properly, it must be removed before
|
||||
// fetching the blocks.
|
||||
b.headerList.Remove(b.headerList.Front())
|
||||
bmgrLog.Infof("Received %v block headers: Fetching blocks",
|
||||
len(b.headerPool))
|
||||
b.headerList.Len())
|
||||
b.lastBlockLogTime = time.Now()
|
||||
b.fetchHeaderBlocks()
|
||||
return
|
||||
}
|
||||
var blockhash btcwire.ShaHash
|
||||
|
||||
if b.latestCheckpoint == nil {
|
||||
b.latestCheckpoint = b.blockChain.LatestCheckpoint()
|
||||
}
|
||||
|
||||
for hdridx := range msg.Headers {
|
||||
blockhash, _ = msg.Headers[hdridx].BlockSha()
|
||||
var headerst headerstr
|
||||
headerst.header = msg.Headers[hdridx]
|
||||
headerst.sha = blockhash
|
||||
prev, ok := b.headerPool[headerst.header.PrevBlock]
|
||||
if ok {
|
||||
if prev.next == nil {
|
||||
prev.next = &headerst
|
||||
} else {
|
||||
bmgrLog.Infof("two children of the same block ??? %v %v %v", prev.sha, prev.next.sha, blockhash)
|
||||
}
|
||||
headerst.height = prev.height + 1
|
||||
} else if headerst.header.PrevBlock.IsEqual(activeNetParams.genesisHash) {
|
||||
ok = true
|
||||
headerst.height = 1
|
||||
b.startBlock = &headerst.sha
|
||||
}
|
||||
if int64(headerst.height) == b.latestCheckpoint.Height {
|
||||
if headerst.sha.IsEqual(b.latestCheckpoint.Hash) {
|
||||
// we can trust this header first download
|
||||
// TODO flag this?
|
||||
} else {
|
||||
// XXX marker does not match, must throw
|
||||
// away headers !?!?!
|
||||
// XXX dont trust peer?
|
||||
}
|
||||
}
|
||||
if ok {
|
||||
b.headerPool[blockhash] = &headerst
|
||||
b.lastBlock = &blockhash
|
||||
} else {
|
||||
bmgrLog.Infof("found orphan block %v", blockhash)
|
||||
b.headerOrphan[headerst.header.PrevBlock] = &headerst
|
||||
}
|
||||
}
|
||||
|
||||
// Construct the getheaders request and queue it to be sent.
|
||||
ghmsg := btcwire.NewMsgGetHeaders()
|
||||
err := ghmsg.AddBlockLocatorHash(&blockhash)
|
||||
// This header is not a checkpoint, so request the next batch of
|
||||
// headers starting from the latest known header and ending with the
|
||||
// next checkpoint.
|
||||
locator := btcchain.BlockLocator{finalHash}
|
||||
err := hmsg.peer.PushGetHeadersMsg(locator, b.nextCheckpoint.Hash)
|
||||
if err != nil {
|
||||
bmgrLog.Infof("msgheaders bad addheaders", blockhash)
|
||||
bmgrLog.Warnf("Failed to send getheaders message to "+
|
||||
"peer %s: %v", hmsg.peer.addr, err)
|
||||
return
|
||||
}
|
||||
|
||||
b.syncPeer.QueueMessage(ghmsg, nil)
|
||||
}
|
||||
|
||||
// haveInventory returns whether or not the inventory represented by the passed
|
||||
|
@ -632,12 +772,9 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) {
|
|||
// for the peer.
|
||||
imsg.peer.AddKnownInventory(iv)
|
||||
|
||||
if b.fetchingHeaders {
|
||||
// if we are fetching headers and already know
|
||||
// about a block, do not add process it.
|
||||
if _, ok := b.headerPool[iv.Hash]; ok {
|
||||
continue
|
||||
}
|
||||
// Ignore inventory when we're in headers-first mode.
|
||||
if b.headersFirstMode {
|
||||
continue
|
||||
}
|
||||
|
||||
// Request the inventory if we don't already have it.
|
||||
|
@ -921,8 +1058,8 @@ func (b *blockManager) QueueInv(inv *btcwire.MsgInv, p *peer) {
|
|||
// QueueHeaders adds the passed headers message and peer to the block handling
|
||||
// queue.
|
||||
func (b *blockManager) QueueHeaders(headers *btcwire.MsgHeaders, p *peer) {
|
||||
// No channel handling here because peers do not need to block on inv
|
||||
// messages.
|
||||
// No channel handling here because peers do not need to block on
|
||||
// headers messages.
|
||||
if atomic.LoadInt32(&b.shutdown) != 0 {
|
||||
return
|
||||
}
|
||||
|
@ -970,6 +1107,11 @@ func (b *blockManager) Stop() error {
|
|||
// newBlockManager returns a new bitcoin block manager.
|
||||
// Use Start to begin processing asynchronous block and inv updates.
|
||||
func newBlockManager(s *server) (*blockManager, error) {
|
||||
newestHash, height, err := s.db.NewestSha()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
bm := blockManager{
|
||||
server: s,
|
||||
blockPeer: make(map[btcwire.ShaHash]*peer),
|
||||
|
@ -977,19 +1119,24 @@ func newBlockManager(s *server) (*blockManager, error) {
|
|||
requestedBlocks: make(map[btcwire.ShaHash]bool),
|
||||
lastBlockLogTime: time.Now(),
|
||||
msgChan: make(chan interface{}, cfg.MaxPeers*3),
|
||||
headerPool: make(map[btcwire.ShaHash]*headerstr),
|
||||
headerOrphan: make(map[btcwire.ShaHash]*headerstr),
|
||||
headerList: list.New(),
|
||||
quit: make(chan bool),
|
||||
}
|
||||
bm.blockChain = btcchain.New(s.db, s.btcnet, bm.handleNotifyMsg)
|
||||
bm.blockChain.DisableCheckpoints(cfg.DisableCheckpoints)
|
||||
if cfg.DisableCheckpoints {
|
||||
if !cfg.DisableCheckpoints {
|
||||
// Initialize the next checkpoint based on the current height.
|
||||
bm.nextCheckpoint = bm.findNextHeaderCheckpoint(height)
|
||||
if bm.nextCheckpoint != nil {
|
||||
bm.resetHeaderState(newestHash, height)
|
||||
}
|
||||
} else {
|
||||
bmgrLog.Info("Checkpoints are disabled")
|
||||
}
|
||||
|
||||
bmgrLog.Infof("Generating initial block node index. This may " +
|
||||
"take a while...")
|
||||
err := bm.blockChain.GenerateInitialIndex()
|
||||
err = bm.blockChain.GenerateInitialIndex()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
14
peer.go
14
peer.go
|
@ -151,6 +151,8 @@ type peer struct {
|
|||
retryCount int64
|
||||
prevGetBlocksBegin *btcwire.ShaHash // owned by blockmanager
|
||||
prevGetBlocksStop *btcwire.ShaHash // owned by blockmanager
|
||||
prevGetHdrsBegin *btcwire.ShaHash // owned by blockmanager
|
||||
prevGetHdrsStop *btcwire.ShaHash // owned by blockmanager
|
||||
requestQueue *list.List
|
||||
continueHash *btcwire.ShaHash
|
||||
outputQueue chan outMsg
|
||||
|
@ -458,7 +460,7 @@ func (p *peer) PushGetBlocksMsg(locator btcchain.BlockLocator, stopHash *btcwire
|
|||
|
||||
// PushGetHeadersMsg sends a getblocks message for the provided block locator
|
||||
// and stop hash. It will ignore back-to-back duplicate requests.
|
||||
func (p *peer) PushGetHeadersMsg(locator btcchain.BlockLocator) error {
|
||||
func (p *peer) PushGetHeadersMsg(locator btcchain.BlockLocator, stopHash *btcwire.ShaHash) error {
|
||||
// Extract the begin hash from the block locator, if one was specified,
|
||||
// to use for filtering duplicate getheaders requests.
|
||||
var beginHash *btcwire.ShaHash
|
||||
|
@ -467,9 +469,9 @@ func (p *peer) PushGetHeadersMsg(locator btcchain.BlockLocator) error {
|
|||
}
|
||||
|
||||
// Filter duplicate getheaders requests.
|
||||
if p.prevGetBlocksBegin != nil &&
|
||||
beginHash != nil &&
|
||||
beginHash.IsEqual(p.prevGetBlocksBegin) {
|
||||
if p.prevGetHdrsStop != nil && p.prevGetHdrsBegin != nil &&
|
||||
beginHash != nil && stopHash.IsEqual(p.prevGetHdrsStop) &&
|
||||
beginHash.IsEqual(p.prevGetHdrsBegin) {
|
||||
|
||||
peerLog.Tracef("PEER: Filtering duplicate [getheaders] with "+
|
||||
"begin hash %v", beginHash)
|
||||
|
@ -478,6 +480,7 @@ func (p *peer) PushGetHeadersMsg(locator btcchain.BlockLocator) error {
|
|||
|
||||
// Construct the getheaders request and queue it to be sent.
|
||||
msg := btcwire.NewMsgGetHeaders()
|
||||
msg.HashStop = *stopHash
|
||||
for _, hash := range locator {
|
||||
err := msg.AddBlockLocatorHash(hash)
|
||||
if err != nil {
|
||||
|
@ -488,7 +491,8 @@ func (p *peer) PushGetHeadersMsg(locator btcchain.BlockLocator) error {
|
|||
|
||||
// Update the previous getheaders request information for filtering
|
||||
// duplicates.
|
||||
p.prevGetBlocksBegin = beginHash
|
||||
p.prevGetHdrsBegin = beginHash
|
||||
p.prevGetHdrsStop = stopHash
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue