Implement a fast path for the Initial Block Download.

It is not necessary to do all of the transaction validation on
blocks if they have been confirmed to be in the block chain leading
up to the final checkpoint in a given blockschain.

This algorithm fetches block headers from the peer, then once it has
established the full blockchain connection, it requests blocks.
Any blocks before the final checkpoint pass true for fastAdd on
btcchain operation, which causes it to do less valiation on the block.
This commit is contained in:
Dale Rahn 2013-11-15 18:16:51 -05:00
parent c34ab6a95e
commit 7b406dcb0f
2 changed files with 257 additions and 2 deletions

View file

@ -47,6 +47,13 @@ type invMsg struct {
peer *peer
}
// blockMsg packages a bitcoin block message and the peer it came from together
// so the block handler has access to that information.
type headersMsg struct {
headers *btcwire.MsgHeaders
peer *peer
}
// donePeerMsg signifies a newly disconnected peer to the block handler.
type donePeerMsg struct {
peer *peer
@ -77,6 +84,21 @@ type blockManager struct {
msgChan chan interface{}
wg sync.WaitGroup
quit chan bool
headerPool map[btcwire.ShaHash]*headerstr
headerOrphan map[btcwire.ShaHash]*headerstr
fetchingHeaders bool
startBlock *btcwire.ShaHash
fetchBlock *btcwire.ShaHash
lastBlock *btcwire.ShaHash
latestCheckpoint *btcchain.Checkpoint
}
type headerstr struct {
header *btcwire.BlockHeader
next *headerstr
height int
sha btcwire.ShaHash
}
// startSync will choose the best peer among the available candidate peers to
@ -129,7 +151,17 @@ func (b *blockManager) startSync(peers *list.List) {
bmgrLog.Infof("Syncing to block height %d from peer %v",
bestPeer.lastBlock, bestPeer.addr)
// if starting from the beginning fetch headers and
// download blocks based on that, otherwise compute
// the block download via inv messages.
if height == 0 {
bestPeer.PushGetHeadersMsg(locator)
b.fetchingHeaders = true
} else {
bestPeer.PushGetBlocksMsg(locator, &zeroHash)
}
b.syncPeer = bestPeer
} else {
bmgrLog.Warnf("No sync peer candidates available")
@ -331,6 +363,16 @@ func (b *blockManager) current() bool {
// handleBlockMsg handles block messages from all peers.
func (b *blockManager) handleBlockMsg(bmsg *blockMsg) {
defer func() {
if b.startBlock != nil &&
len(bmsg.peer.requestedBlocks) < 10 {
// block queue getting short, ask for more.
b.fetchHeaderBlocks()
}
}()
// Keep track of which peer the block was sent from so the notification
// handler can request the parent blocks from the appropriate peer.
blockSha, _ := bmsg.block.Sha()
@ -351,9 +393,37 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) {
}
b.blockPeer[*blockSha] = bmsg.peer
fastAdd := false
if b.fetchBlock != nil && blockSha.IsEqual(b.fetchBlock) {
firstblock, ok := b.headerPool[*blockSha]
if ok {
if b.latestCheckpoint == nil {
b.latestCheckpoint =
b.blockChain.LatestCheckpoint()
}
if int64(firstblock.height) <=
b.latestCheckpoint.Height {
fastAdd = true
}
if firstblock.next != nil {
b.fetchBlock = &firstblock.next.sha
}
}
}
// Process the block to include validation, best chain selection, orphan
// handling, etc.
err := b.blockChain.ProcessBlock(bmsg.block)
err := b.blockChain.ProcessBlock(bmsg.block, fastAdd)
if fastAdd && blockSha.IsEqual(b.lastBlock) {
// have processed all blocks, switch to normal handling
b.fetchingHeaders = false
b.startBlock = nil
b.fetchBlock = nil
b.lastBlock = nil
b.headerPool = make(map[btcwire.ShaHash]*headerstr)
b.headerOrphan = make(map[btcwire.ShaHash]*headerstr)
}
// Remove block from request maps. Either chain knows about it and such
// we shouldn't have any more instances of trying to fetch it, or we
@ -459,6 +529,14 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) {
// for the peer.
imsg.peer.addKnownInventory(iv)
if b.fetchingHeaders {
// if we are fetching headers and already know
// about a block, do not add process it.
if _, ok := b.headerPool[iv.Hash]; ok {
continue
}
}
// Request the inventory if we don't already have it.
if !b.haveInventory(iv) {
// Add it to the request queue.
@ -574,6 +652,9 @@ out:
case *invMsg:
b.handleInvMsg(msg)
case *headersMsg:
b.handleHeadersMsg(msg)
case *donePeerMsg:
b.handleDonePeerMsg(candidatePeers, msg.peer)
@ -734,6 +815,17 @@ func (b *blockManager) QueueInv(inv *btcwire.MsgInv, p *peer) {
b.msgChan <- &invMsg{inv: inv, peer: p}
}
// QueueInv adds the passed headers message and peer to the block handling queue.
func (b *blockManager) QueueHeaders(headers *btcwire.MsgHeaders, p *peer) {
// No channel handling here because peers do not need to block on inv
// messages.
if atomic.LoadInt32(&b.shutdown) != 0 {
return
}
b.msgChan <- &headersMsg{headers: headers, peer: p}
}
// DonePeer informs the blockmanager that a peer has disconnected.
func (b *blockManager) DonePeer(p *peer) {
// Ignore if we are shutting down.
@ -781,6 +873,8 @@ func newBlockManager(s *server) (*blockManager, error) {
requestedBlocks: make(map[btcwire.ShaHash]bool),
lastBlockLogTime: time.Now(),
msgChan: make(chan interface{}, cfg.MaxPeers*3),
headerPool: make(map[btcwire.ShaHash]*headerstr),
headerOrphan: make(map[btcwire.ShaHash]*headerstr),
quit: make(chan bool),
}
bm.blockChain = btcchain.New(s.db, s.btcnet, bm.handleNotifyMsg)
@ -927,3 +1021,116 @@ func loadBlockDB() (btcdb.Db, error) {
btcdLog.Infof("Block database loaded with block height %d", height)
return db, nil
}
// handleHeadersMsg is invoked when a peer receives a headers bitcoin
// message.
func (b *blockManager) handleHeadersMsg(bmsg *headersMsg) {
msg := bmsg.headers
nheaders := len(msg.Headers)
if nheaders == 0 {
bmgrLog.Infof("Received %v0 block headers: Fetching blocks",
len(b.headerPool))
b.fetchHeaderBlocks()
return
}
var blockhash btcwire.ShaHash
if b.latestCheckpoint == nil {
b.latestCheckpoint = b.blockChain.LatestCheckpoint()
}
for hdridx := range msg.Headers {
blockhash, _ = msg.Headers[hdridx].BlockSha()
var headerst headerstr
headerst.header = msg.Headers[hdridx]
headerst.sha = blockhash
prev, ok := b.headerPool[headerst.header.PrevBlock]
if ok {
if prev.next == nil {
prev.next = &headerst
} else {
bmgrLog.Infof("two children of the same block ??? %v %v %v", prev.sha, prev.next.sha, blockhash)
}
headerst.height = prev.height + 1
} else if headerst.header.PrevBlock.IsEqual(activeNetParams.genesisHash) {
ok = true
headerst.height = 1
b.startBlock = &headerst.sha
}
if int64(headerst.height) == b.latestCheckpoint.Height {
if headerst.sha.IsEqual(b.latestCheckpoint.Hash) {
// we can trust this header first download
// TODO flag this?
} else {
// XXX marker does not match, must throw
// away headers !?!?!
// XXX dont trust peer?
}
}
if ok {
b.headerPool[blockhash] = &headerst
b.lastBlock = &blockhash
} else {
bmgrLog.Infof("found orphan block %v", blockhash)
b.headerOrphan[headerst.header.PrevBlock] = &headerst
}
}
// Construct the getheaders request and queue it to be sent.
ghmsg := btcwire.NewMsgGetHeaders()
err := ghmsg.AddBlockLocatorHash(&blockhash)
if err != nil {
bmgrLog.Infof("msgheaders bad addheaders", blockhash)
return
}
b.syncPeer.QueueMessage(ghmsg, nil)
}
// fetchHeaderBlocks is creates and sends a request to the syncPeer for
// the next list of blocks to downloaded.
func (b *blockManager) fetchHeaderBlocks() {
gdmsg := btcwire.NewMsgGetData()
numRequested := 0
startBlock := b.startBlock
for {
if b.startBlock == nil {
break
}
blockhash := b.startBlock
firstblock, ok := b.headerPool[*blockhash]
if !ok {
bmgrLog.Warnf("current fetch block %v missing from headerPool", blockhash)
break
}
var iv btcwire.InvVect
iv.Hash = *blockhash
iv.Type = btcwire.InvTypeBlock
if !b.haveInventory(&iv) {
b.requestedBlocks[*blockhash] = true
b.syncPeer.requestedBlocks[*blockhash] = true
gdmsg.AddInvVect(&iv)
numRequested++
}
if b.fetchBlock == nil {
b.fetchBlock = b.startBlock
}
if firstblock.next == nil {
b.startBlock = nil
break
} else {
b.startBlock = &firstblock.next.sha
}
if numRequested >= btcwire.MaxInvPerMsg {
break
}
}
if len(gdmsg.InvList) > 0 {
bmgrLog.Debugf("requesting block %v len %v\n", startBlock, len(gdmsg.InvList))
b.syncPeer.QueueMessage(gdmsg, nil)
}
}

48
peer.go
View file

@ -431,6 +431,43 @@ func (p *peer) PushGetBlocksMsg(locator btcchain.BlockLocator, stopHash *btcwire
return nil
}
// PushGetHeadersMsg sends a getblocks message for the provided block locator
// and stop hash. It will ignore back-to-back duplicate requests.
func (p *peer) PushGetHeadersMsg(locator btcchain.BlockLocator) error {
// Extract the begin hash from the block locator, if one was specified,
// to use for filtering duplicate getblocks requests.
// request.
var beginHash *btcwire.ShaHash
if len(locator) > 0 {
beginHash = locator[0]
}
// Filter duplicate getblocks requests.
if p.prevGetBlocksBegin != nil &&
beginHash != nil &&
beginHash.IsEqual(p.prevGetBlocksBegin) {
peerLog.Tracef("PEER: Filtering duplicate [getblocks] with begin "+
"hash %v", beginHash)
return nil
}
// Construct the getheaders request and queue it to be sent.
msg := btcwire.NewMsgGetHeaders()
for _, hash := range locator {
err := msg.AddBlockLocatorHash(hash)
if err != nil {
return err
}
}
p.QueueMessage(msg, nil)
// Update the previous getblocks request information for filtering
// duplicates.
p.prevGetBlocksBegin = beginHash
return nil
}
// handleMemPoolMsg is invoked when a peer receives a mempool bitcoin message.
// It creates and sends an inventory message with the contents of the memory
// pool up to the maximum inventory allowed per message.
@ -520,6 +557,14 @@ func (p *peer) handleInvMsg(msg *btcwire.MsgInv) {
p.server.blockManager.QueueInv(msg, p)
}
// handleHeadersMsg is invoked when a peer receives an inv bitcoin message and
// is used to examine the inventory being advertised by the remote peer and
// react accordingly. We pass the message down to blockmanager which will call
// QueueMessage with any appropriate responses.
func (p *peer) handleHeadersMsg(msg *btcwire.MsgHeaders) {
p.server.blockManager.QueueHeaders(msg, p)
}
// handleGetData is invoked when a peer receives a getdata bitcoin message and
// is used to deliver block and transaction information.
func (p *peer) handleGetDataMsg(msg *btcwire.MsgGetData) {
@ -1044,6 +1089,9 @@ out:
case *btcwire.MsgGetHeaders:
p.handleGetHeadersMsg(msg)
case *btcwire.MsgHeaders:
p.handleHeadersMsg(msg)
default:
peerLog.Debugf("Received unhandled message of type %v: Fix Me",
rmsg.Command())