diff --git a/blockmanager.go b/blockmanager.go index 388d590e..55239167 100644 --- a/blockmanager.go +++ b/blockmanager.go @@ -47,6 +47,13 @@ type invMsg struct { peer *peer } +// blockMsg packages a bitcoin block message and the peer it came from together +// so the block handler has access to that information. +type headersMsg struct { + headers *btcwire.MsgHeaders + peer *peer +} + // donePeerMsg signifies a newly disconnected peer to the block handler. type donePeerMsg struct { peer *peer @@ -77,6 +84,21 @@ type blockManager struct { msgChan chan interface{} wg sync.WaitGroup quit chan bool + + headerPool map[btcwire.ShaHash]*headerstr + headerOrphan map[btcwire.ShaHash]*headerstr + fetchingHeaders bool + startBlock *btcwire.ShaHash + fetchBlock *btcwire.ShaHash + lastBlock *btcwire.ShaHash + latestCheckpoint *btcchain.Checkpoint +} + +type headerstr struct { + header *btcwire.BlockHeader + next *headerstr + height int + sha btcwire.ShaHash } // startSync will choose the best peer among the available candidate peers to @@ -129,7 +151,17 @@ func (b *blockManager) startSync(peers *list.List) { bmgrLog.Infof("Syncing to block height %d from peer %v", bestPeer.lastBlock, bestPeer.addr) - bestPeer.PushGetBlocksMsg(locator, &zeroHash) + + // if starting from the beginning fetch headers and + // download blocks based on that, otherwise compute + // the block download via inv messages. + + if height == 0 { + bestPeer.PushGetHeadersMsg(locator) + b.fetchingHeaders = true + } else { + bestPeer.PushGetBlocksMsg(locator, &zeroHash) + } b.syncPeer = bestPeer } else { bmgrLog.Warnf("No sync peer candidates available") @@ -331,6 +363,16 @@ func (b *blockManager) current() bool { // handleBlockMsg handles block messages from all peers. func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { + + defer func() { + if b.startBlock != nil && + len(bmsg.peer.requestedBlocks) < 10 { + + // block queue getting short, ask for more. + b.fetchHeaderBlocks() + } + + }() // Keep track of which peer the block was sent from so the notification // handler can request the parent blocks from the appropriate peer. blockSha, _ := bmsg.block.Sha() @@ -351,9 +393,37 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { } b.blockPeer[*blockSha] = bmsg.peer + fastAdd := false + if b.fetchBlock != nil && blockSha.IsEqual(b.fetchBlock) { + firstblock, ok := b.headerPool[*blockSha] + if ok { + if b.latestCheckpoint == nil { + b.latestCheckpoint = + b.blockChain.LatestCheckpoint() + } + if int64(firstblock.height) <= + b.latestCheckpoint.Height { + + fastAdd = true + } + if firstblock.next != nil { + b.fetchBlock = &firstblock.next.sha + } + } + } // Process the block to include validation, best chain selection, orphan // handling, etc. - err := b.blockChain.ProcessBlock(bmsg.block) + err := b.blockChain.ProcessBlock(bmsg.block, fastAdd) + + if fastAdd && blockSha.IsEqual(b.lastBlock) { + // have processed all blocks, switch to normal handling + b.fetchingHeaders = false + b.startBlock = nil + b.fetchBlock = nil + b.lastBlock = nil + b.headerPool = make(map[btcwire.ShaHash]*headerstr) + b.headerOrphan = make(map[btcwire.ShaHash]*headerstr) + } // Remove block from request maps. Either chain knows about it and such // we shouldn't have any more instances of trying to fetch it, or we @@ -459,6 +529,14 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) { // for the peer. imsg.peer.addKnownInventory(iv) + if b.fetchingHeaders { + // if we are fetching headers and already know + // about a block, do not add process it. + if _, ok := b.headerPool[iv.Hash]; ok { + continue + } + } + // Request the inventory if we don't already have it. if !b.haveInventory(iv) { // Add it to the request queue. @@ -574,6 +652,9 @@ out: case *invMsg: b.handleInvMsg(msg) + case *headersMsg: + b.handleHeadersMsg(msg) + case *donePeerMsg: b.handleDonePeerMsg(candidatePeers, msg.peer) @@ -734,6 +815,17 @@ func (b *blockManager) QueueInv(inv *btcwire.MsgInv, p *peer) { b.msgChan <- &invMsg{inv: inv, peer: p} } +// QueueInv adds the passed headers message and peer to the block handling queue. +func (b *blockManager) QueueHeaders(headers *btcwire.MsgHeaders, p *peer) { + // No channel handling here because peers do not need to block on inv + // messages. + if atomic.LoadInt32(&b.shutdown) != 0 { + return + } + + b.msgChan <- &headersMsg{headers: headers, peer: p} +} + // DonePeer informs the blockmanager that a peer has disconnected. func (b *blockManager) DonePeer(p *peer) { // Ignore if we are shutting down. @@ -781,6 +873,8 @@ func newBlockManager(s *server) (*blockManager, error) { requestedBlocks: make(map[btcwire.ShaHash]bool), lastBlockLogTime: time.Now(), msgChan: make(chan interface{}, cfg.MaxPeers*3), + headerPool: make(map[btcwire.ShaHash]*headerstr), + headerOrphan: make(map[btcwire.ShaHash]*headerstr), quit: make(chan bool), } bm.blockChain = btcchain.New(s.db, s.btcnet, bm.handleNotifyMsg) @@ -927,3 +1021,116 @@ func loadBlockDB() (btcdb.Db, error) { btcdLog.Infof("Block database loaded with block height %d", height) return db, nil } + +// handleHeadersMsg is invoked when a peer receives a headers bitcoin +// message. +func (b *blockManager) handleHeadersMsg(bmsg *headersMsg) { + msg := bmsg.headers + + nheaders := len(msg.Headers) + if nheaders == 0 { + bmgrLog.Infof("Received %v0 block headers: Fetching blocks", + len(b.headerPool)) + b.fetchHeaderBlocks() + return + } + var blockhash btcwire.ShaHash + + if b.latestCheckpoint == nil { + b.latestCheckpoint = b.blockChain.LatestCheckpoint() + } + + for hdridx := range msg.Headers { + blockhash, _ = msg.Headers[hdridx].BlockSha() + var headerst headerstr + headerst.header = msg.Headers[hdridx] + headerst.sha = blockhash + prev, ok := b.headerPool[headerst.header.PrevBlock] + if ok { + if prev.next == nil { + prev.next = &headerst + } else { + bmgrLog.Infof("two children of the same block ??? %v %v %v", prev.sha, prev.next.sha, blockhash) + } + headerst.height = prev.height + 1 + } else if headerst.header.PrevBlock.IsEqual(activeNetParams.genesisHash) { + ok = true + headerst.height = 1 + b.startBlock = &headerst.sha + } + if int64(headerst.height) == b.latestCheckpoint.Height { + if headerst.sha.IsEqual(b.latestCheckpoint.Hash) { + // we can trust this header first download + // TODO flag this? + } else { + // XXX marker does not match, must throw + // away headers !?!?! + // XXX dont trust peer? + } + } + if ok { + b.headerPool[blockhash] = &headerst + b.lastBlock = &blockhash + } else { + bmgrLog.Infof("found orphan block %v", blockhash) + b.headerOrphan[headerst.header.PrevBlock] = &headerst + } + } + + // Construct the getheaders request and queue it to be sent. + ghmsg := btcwire.NewMsgGetHeaders() + err := ghmsg.AddBlockLocatorHash(&blockhash) + if err != nil { + bmgrLog.Infof("msgheaders bad addheaders", blockhash) + return + } + + b.syncPeer.QueueMessage(ghmsg, nil) +} + +// fetchHeaderBlocks is creates and sends a request to the syncPeer for +// the next list of blocks to downloaded. +func (b *blockManager) fetchHeaderBlocks() { + gdmsg := btcwire.NewMsgGetData() + numRequested := 0 + startBlock := b.startBlock + for { + if b.startBlock == nil { + break + } + blockhash := b.startBlock + firstblock, ok := b.headerPool[*blockhash] + if !ok { + bmgrLog.Warnf("current fetch block %v missing from headerPool", blockhash) + break + } + var iv btcwire.InvVect + iv.Hash = *blockhash + iv.Type = btcwire.InvTypeBlock + if !b.haveInventory(&iv) { + b.requestedBlocks[*blockhash] = true + b.syncPeer.requestedBlocks[*blockhash] = true + gdmsg.AddInvVect(&iv) + numRequested++ + } + + if b.fetchBlock == nil { + b.fetchBlock = b.startBlock + } + if firstblock.next == nil { + b.startBlock = nil + break + } else { + b.startBlock = &firstblock.next.sha + } + + if numRequested >= btcwire.MaxInvPerMsg { + break + } + } + if len(gdmsg.InvList) > 0 { + bmgrLog.Debugf("requesting block %v len %v\n", startBlock, len(gdmsg.InvList)) + + b.syncPeer.QueueMessage(gdmsg, nil) + } +} diff --git a/peer.go b/peer.go index 02312e9a..4defb51d 100644 --- a/peer.go +++ b/peer.go @@ -431,6 +431,43 @@ func (p *peer) PushGetBlocksMsg(locator btcchain.BlockLocator, stopHash *btcwire return nil } +// PushGetHeadersMsg sends a getblocks message for the provided block locator +// and stop hash. It will ignore back-to-back duplicate requests. +func (p *peer) PushGetHeadersMsg(locator btcchain.BlockLocator) error { + // Extract the begin hash from the block locator, if one was specified, + // to use for filtering duplicate getblocks requests. + // request. + var beginHash *btcwire.ShaHash + if len(locator) > 0 { + beginHash = locator[0] + } + + // Filter duplicate getblocks requests. + if p.prevGetBlocksBegin != nil && + beginHash != nil && + beginHash.IsEqual(p.prevGetBlocksBegin) { + + peerLog.Tracef("PEER: Filtering duplicate [getblocks] with begin "+ + "hash %v", beginHash) + return nil + } + + // Construct the getheaders request and queue it to be sent. + msg := btcwire.NewMsgGetHeaders() + for _, hash := range locator { + err := msg.AddBlockLocatorHash(hash) + if err != nil { + return err + } + } + p.QueueMessage(msg, nil) + + // Update the previous getblocks request information for filtering + // duplicates. + p.prevGetBlocksBegin = beginHash + return nil +} + // handleMemPoolMsg is invoked when a peer receives a mempool bitcoin message. // It creates and sends an inventory message with the contents of the memory // pool up to the maximum inventory allowed per message. @@ -520,6 +557,14 @@ func (p *peer) handleInvMsg(msg *btcwire.MsgInv) { p.server.blockManager.QueueInv(msg, p) } +// handleHeadersMsg is invoked when a peer receives an inv bitcoin message and +// is used to examine the inventory being advertised by the remote peer and +// react accordingly. We pass the message down to blockmanager which will call +// QueueMessage with any appropriate responses. +func (p *peer) handleHeadersMsg(msg *btcwire.MsgHeaders) { + p.server.blockManager.QueueHeaders(msg, p) +} + // handleGetData is invoked when a peer receives a getdata bitcoin message and // is used to deliver block and transaction information. func (p *peer) handleGetDataMsg(msg *btcwire.MsgGetData) { @@ -1044,6 +1089,9 @@ out: case *btcwire.MsgGetHeaders: p.handleGetHeadersMsg(msg) + case *btcwire.MsgHeaders: + p.handleHeadersMsg(msg) + default: peerLog.Debugf("Received unhandled message of type %v: Fix Me", rmsg.Command())