Rework block fetching code.

This commit significantly reworks the fetching code to interop better with
bitcoind.  In particular, when an inventory message is sent, and the
remote peer requests the final block, the remote peer sends the current
end of the main chain to signal that there are more blocks to get.

Previously this code was automatically requesting more blocks when the
number of in-flight blocks was under a certain threshold.  The original
approach does help alleviate delays in the "request final, wait for
orphan, request more" round trip, but due to the aforementioned mechanism,
it leads to double requests and other subtle issues.
This commit is contained in:
Dave Collins 2013-08-29 14:44:43 -05:00
parent f7387f217a
commit 83407ade61
2 changed files with 164 additions and 209 deletions

View file

@ -5,7 +5,6 @@
package main
import (
"container/list"
"github.com/conformal/btcchain"
"github.com/conformal/btcdb"
_ "github.com/conformal/btcdb/sqlite3"
@ -21,12 +20,6 @@ const (
chanBufferSize = 50
)
// inventoryItem is used to track known and requested inventory items.
type inventoryItem struct {
invVect *btcwire.InvVect
peers []*peer
}
// blockMsg packages a bitcoin block message and the peer it came from together
// so the block handler has access to that information.
type blockMsg struct {
@ -34,13 +27,6 @@ type blockMsg struct {
peer *peer
}
// invMsg packages a bitcoin inv message and the peer it came from together
// so the block handler has access to that information.
type invMsg struct {
msg *btcwire.MsgInv
peer *peer
}
// txMsg packages a bitcoin tx message and the peer it came from together
// so the block handler has access to that information.
type txMsg struct {
@ -49,26 +35,19 @@ type txMsg struct {
}
// blockManager provides a concurrency safe block manager for handling all
// incoming block inventory advertisement as well as issuing requests to
// download needed blocks of the block chain from other peers. It works by
// forcing all incoming block inventory advertisements through a single
// goroutine which then determines whether the block is needed and how the
// requests should be made amongst multiple peers.
// incoming blocks.
type blockManager struct {
server *server
started bool
shutdown bool
blockChain *btcchain.BlockChain
requestQueue *list.List
requestMap map[string]*inventoryItem
outstandingBlocks int
blockPeer map[btcwire.ShaHash]*peer
receivedLogBlocks int64
receivedLogTx int64
lastBlockLogTime time.Time
processingReqs bool
newBlocks chan bool
blockQueue chan *blockMsg
invQueue chan *invMsg
chainNotify chan *btcchain.Notification
wg sync.WaitGroup
quit chan bool
@ -83,7 +62,7 @@ func (b *blockManager) logBlockHeight(numTx, height int64) {
now := time.Now()
duration := now.Sub(b.lastBlockLogTime)
if b.outstandingBlocks != 0 && duration < time.Second*10 {
if duration < time.Second*10 {
return
}
@ -105,103 +84,46 @@ func (b *blockManager) logBlockHeight(numTx, height int64) {
b.lastBlockLogTime = now
}
// handleInvMsg handles inventory messages for all peers. It adds blocks that
// we need along with which peers know about each block to a request queue
// based upon the advertised inventory. It also attempts to strike a balance
// between the number of in-flight blocks and keeping the request queue full
// by issuing more getblocks (MsgGetBlocks) requests as needed.
func (b *blockManager) handleInvMsg(msg *btcwire.MsgInv, p *peer) {
// Find the last block in the inventory list.
invVects := msg.InvList
var lastHash *btcwire.ShaHash
for i := len(invVects) - 1; i >= 0; i-- {
if invVects[i].Type == btcwire.InvVect_Block {
lastHash = &invVects[i].Hash
break
}
}
for _, iv := range invVects {
switch iv.Type {
case btcwire.InvVect_Block:
// Ignore this block if we already have it.
// TODO(davec): Need to check orphans too.
if b.server.db.ExistsSha(&iv.Hash) {
log.Tracef("[BMGR] Ignoring known block %v.", &iv.Hash)
continue
}
// Add the peer to the list of peers which can serve the block if
// it's already queued to be fetched.
if item, ok := b.requestMap[iv.Hash.String()]; ok {
item.peers = append(item.peers, p)
continue
}
// Add the item to the end of the request queue.
item := &inventoryItem{
invVect: iv,
peers: []*peer{p},
}
b.requestMap[item.invVect.Hash.String()] = item
b.requestQueue.PushBack(item)
b.outstandingBlocks++
case btcwire.InvVect_Tx:
// XXX: Handle transactions here.
}
}
// Request more blocks if there aren't enough in-flight blocks.
if lastHash != nil && b.outstandingBlocks < btcwire.MaxBlocksPerMsg*5 {
stopHash := btcwire.ShaHash{}
gbmsg := btcwire.NewMsgGetBlocks(&stopHash)
gbmsg.AddBlockLocatorHash(lastHash)
p.QueueMessage(gbmsg)
}
}
// handleBlockMsg handles block messages from all peers. It is currently
// very simple. It doesn't validate the block or handle orphans and side
// chains. It simply inserts the block into the database after ensuring the
// previous block is already inserted.
func (b *blockManager) handleBlockMsg(block *btcutil.Block) {
b.outstandingBlocks--
msg := block.MsgBlock()
// handleBlockMsg handles block messages from all peers.
func (b *blockManager) handleBlockMsg(bmsg *blockMsg) {
// Keep track of which peer the block was sent from so the notification
// handler can request the parent blocks from the appropriate peer.
blockSha, _ := bmsg.block.Sha()
b.blockPeer[*blockSha] = bmsg.peer
// Process the block to include validation, best chain selection, orphan
// handling, etc.
err := b.blockChain.ProcessBlock(block)
err := b.blockChain.ProcessBlock(bmsg.block)
if err != nil {
blockSha, err2 := block.Sha()
if err2 != nil {
log.Errorf("[BMGR] %v", err2)
}
delete(b.blockPeer, *blockSha)
log.Warnf("[BMGR] Failed to process block %v: %v", blockSha, err)
return
}
// Don't keep track of the peer that sent the block any longer if it's
// not an orphan.
if !b.blockChain.IsKnownOrphan(blockSha) {
delete(b.blockPeer, *blockSha)
}
// Log info about the new block height.
_, height, err := b.server.db.NewestSha()
if err != nil {
log.Warnf("[BMGR] Failed to obtain latest sha - %v", err)
return
}
b.logBlockHeight(int64(len(msg.Transactions)), height)
b.logBlockHeight(int64(len(bmsg.block.MsgBlock().Transactions)), height)
// Sync the db to disk when there are no more outstanding blocks.
// NOTE: Periodic syncs happen as new data is requested as well.
if b.outstandingBlocks <= 0 {
// Sync the db to disk.
b.server.db.Sync()
}
}
// blockHandler is the main handler for the block manager. It must be run as a
// goroutine. It processes block and inv messages in a separate goroutine from
// the peer handlers so the block (MsgBlock) and tx (MsgTx) messages are handled
// by a single thread without needing to lock memory data structures. This is
// important because the block manager controls which blocks are needed and how
// the fetching should proceed.
// blockHandler is the main handler for the block manager. It must be run
// as a goroutine. It processes block and inv messages in a separate goroutine
// from the peer handlers so the block (MsgBlock) and tx (MsgTx) messages are
// handled by a single thread without needing to lock memory data structures.
// This is important because the block manager controls which blocks are needed
// and how the fetching should proceed.
//
// NOTE: Tx messages need to be handled here too.
// (either that or block and tx need to be handled in separate threads)
@ -211,40 +133,9 @@ out:
select {
// Handle new block messages.
case bmsg := <-b.blockQueue:
b.handleBlockMsg(bmsg.block)
b.handleBlockMsg(bmsg)
bmsg.peer.blockProcessed <- true
// Handle new inventory messages.
case msg := <-b.invQueue:
b.handleInvMsg(msg.msg, msg.peer)
// Request the blocks.
if b.requestQueue.Len() > 0 && !b.processingReqs {
b.processingReqs = true
b.newBlocks <- true
}
case <-b.newBlocks:
numRequested := 0
gdmsg := btcwire.NewMsgGetData()
var p *peer
for e := b.requestQueue.Front(); e != nil; e = b.requestQueue.Front() {
item := e.Value.(*inventoryItem)
p = item.peers[0]
gdmsg.AddInvVect(item.invVect)
delete(b.requestMap, item.invVect.Hash.String())
b.requestQueue.Remove(e)
numRequested++
if numRequested >= btcwire.MaxInvPerMsg {
break
}
}
b.server.db.Sync()
if len(gdmsg.InvList) > 0 && p != nil {
p.QueueMessage(gdmsg)
}
b.processingReqs = false
case <-b.quit:
break out
}
@ -256,14 +147,22 @@ out:
// handleNotifyMsg handles notifications from btcchain. Currently it doesn't
// respond to any notifications, but the idea is that it requests missing blocks
// in response to orphan notifications and updates the wallet for blocks
// connected and disconnected to the main chain.
// connected to and disconnected from the main chain.
func (b *blockManager) handleNotifyMsg(notification *btcchain.Notification) {
switch notification.Type {
case btcchain.NTOrphanBlock:
// TODO(davec): Ask the peer to fill in the missing blocks for the
// orphan root if it's not nil.
orphanRoot := notification.Data.(*btcwire.ShaHash)
_ = orphanRoot
if peer, exists := b.blockPeer[*orphanRoot]; exists {
locator, err := b.blockChain.LatestBlockLocator()
if err != nil {
log.Error("[BMGR] Failed to get block locator "+
"for the latest block: %v", err)
break
}
peer.pushGetBlocksMsg(locator, orphanRoot)
delete(b.blockPeer, *orphanRoot)
break
}
case btcchain.NTBlockAccepted:
// TODO(davec): Relay inventory, but don't relay old inventory
@ -300,18 +199,6 @@ func (b *blockManager) QueueBlock(block *btcutil.Block, p *peer) {
b.blockQueue <- &bmsg
}
// QueueInv adds the passed inventory message and peer to the inventory handling
// queue.
func (b *blockManager) QueueInv(msg *btcwire.MsgInv, p *peer) {
// Don't accept more inventory if we're shutting down.
if b.shutdown {
return
}
imsg := invMsg{msg: msg, peer: p}
b.invQueue <- &imsg
}
// Start begins the core block handler which processes block and inv messages.
func (b *blockManager) Start() {
// Already started?
@ -342,51 +229,6 @@ func (b *blockManager) Stop() error {
return nil
}
// AddBlockLocators adds block locators to a getblocks message starting with
// the passed hash back to the genesis block hash. In order to keep the list
// of locator hashes to a reasonable number of entries, first it adds the
// most recent 10 block hashes (starting with the passed hash), then doubles the
// step each loop iteration to exponentially decrease the number of hashes the
// further away from head and closer to the genesis block it gets.
func (b *blockManager) AddBlockLocators(hash *btcwire.ShaHash, msg *btcwire.MsgGetBlocks) error {
// XXX(davec): This is fetching the block data too.
block, err := b.server.db.FetchBlockBySha(hash)
if err != nil {
log.Warnf("[BMGR] Lookup of known valid index failed %v", hash)
return err
}
blockIndex := block.Height()
// We want inventory after the passed hash.
msg.AddBlockLocatorHash(hash)
// Generate the block locators according to the algorithm described in
// in the function comment and make sure to leave room for the already
// added hash and final genesis hash.
increment := int64(1)
for i := 1; i < btcwire.MaxBlockLocatorsPerMsg-2; i++ {
if i > 10 {
increment *= 2
}
blockIndex -= increment
if blockIndex <= 1 {
break
}
h, err := b.server.db.FetchBlockShaByHeight(blockIndex)
if err != nil {
// This shouldn't happen and it's ok to ignore, so just
// continue to the next.
log.Warnf("[BMGR] Lookup of known valid index failed %v",
blockIndex)
continue
}
msg.AddBlockLocatorHash(h)
}
msg.AddBlockLocatorHash(&btcwire.GenesisHash)
return nil
}
// newBlockManager returns a new bitcoin block manager.
// Use Start to begin processing asynchronous block and inv updates.
func newBlockManager(s *server) *blockManager {
@ -394,12 +236,10 @@ func newBlockManager(s *server) *blockManager {
bm := blockManager{
server: s,
blockChain: btcchain.New(s.db, s.btcnet, chainNotify),
requestQueue: list.New(),
requestMap: make(map[string]*inventoryItem),
blockPeer: make(map[btcwire.ShaHash]*peer),
lastBlockLogTime: time.Now(),
newBlocks: make(chan bool, 1),
blockQueue: make(chan *blockMsg, chanBufferSize),
invQueue: make(chan *invMsg, chanBufferSize),
chainNotify: chainNotify,
quit: make(chan bool),
}

133
peer.go
View file

@ -6,8 +6,10 @@ package main
import (
"bytes"
"container/list"
"errors"
"fmt"
"github.com/conformal/btcchain"
"github.com/conformal/btcdb"
"github.com/conformal/btcutil"
"github.com/conformal/btcwire"
@ -91,6 +93,7 @@ type peer struct {
versionKnown bool
knownAddresses map[string]bool
lastBlock int32
requestQueue *list.List
wg sync.WaitGroup
outputQueue chan btcwire.Message
blockProcessed chan bool
@ -230,18 +233,20 @@ func (p *peer) handleVersionMsg(msg *btcwire.MsgVersion) {
}
// Request latest blocks if the peer has blocks we're interested in.
// XXX: Ask block manager for latest so we get in-flight too...
sha, lastBlock, err := p.server.db.NewestSha()
_, lastBlock, err := p.server.db.NewestSha()
if err != nil {
log.Errorf("[PEER] %v", err)
p.Disconnect()
}
// If the peer has blocks we're interested in.
if p.lastBlock > int32(lastBlock) {
stopHash := btcwire.ShaHash{}
gbmsg := btcwire.NewMsgGetBlocks(&stopHash)
p.server.blockManager.AddBlockLocators(sha, gbmsg)
p.outputQueue <- gbmsg
locator, err := p.server.blockManager.blockChain.LatestBlockLocator()
if err != nil {
log.Error("[PEER] Failed to get block locator for the "+
"latest block: %v", err)
p.Disconnect()
}
p.pushGetBlocksMsg(locator, &zeroHash)
}
// TODO: Relay alerts.
@ -281,6 +286,115 @@ func (p *peer) pushBlockMsg(sha btcwire.ShaHash) error {
return nil
}
// pushGetBlocksMsg send a getblocks message for the provided block locator
// and stop hash.
func (p *peer) pushGetBlocksMsg(locator btcchain.BlockLocator, stopHash *btcwire.ShaHash) error {
msg := btcwire.NewMsgGetBlocks(stopHash)
for _, hash := range locator {
err := msg.AddBlockLocatorHash(hash)
if err != nil {
return err
}
}
p.QueueMessage(msg)
return nil
}
// handleInvMsg is invoked when a peer receives an inv bitcoin message and is
// used to examine the inventory being advertised by the remote peer and react
// accordingly.
//
// NOTE: This will need to have tx handling added as well when they are
// supported.
func (p *peer) handleInvMsg(msg *btcwire.MsgInv) {
// Attempt to find the final block in the inventory list. There may
// not be one.
lastBlock := -1
invVects := msg.InvList
for i := len(invVects) - 1; i >= 0; i-- {
if invVects[i].Type == btcwire.InvVect_Block {
lastBlock = i
break
}
}
// Request the advertised inventory if we don't already have it. Also,
// request parent blocks of orphans if we receive one we already have.
// Finally, attempt to detect potential stalls due to long side chains
// we already have and request more blocks to prevent them.
chain := p.server.blockManager.blockChain
for i, iv := range invVects {
switch iv.Type {
case btcwire.InvVect_Block:
if !chain.HaveInventory(iv) {
// Add it to the request queue.
p.requestQueue.PushBack(iv)
continue
}
// The block is an orphan block that we already have.
// When the existing orphan was processed, it requested
// the missing parent blocks. When this scenario
// happens, it means there were more blocks missing
// than are allowed into a single inventory message. As
// a result, once this peer requested the final
// advertised block, the remote peer noticed and is now
// resending the orphan block as an available block
// to signal there are more missing blocks that need to
// be requested.
if chain.IsKnownOrphan(&iv.Hash) {
// Request blocks starting at the latest known
// up to the root of the orphan that just came
// in.
orphanRoot := chain.GetOrphanRoot(&iv.Hash)
locator, err := chain.LatestBlockLocator()
if err != nil {
log.Error("[PEER] Failed to get block "+
"locator for the latest block: "+
"%v", err)
continue
}
p.pushGetBlocksMsg(locator, orphanRoot)
continue
}
// We already have the final block advertised by this
// inventory message, so force a request for more. This
// should only really happen if we're on a really long
// side chain.
if i == lastBlock {
// Request blocks after this one up to the
// final one the remote peer knows about (zero
// stop hash).
locator := chain.BlockLocatorFromHash(&iv.Hash)
p.pushGetBlocksMsg(locator, &zeroHash)
}
// Ignore unsupported inventory types.
default:
continue
}
}
// Request as much as possible at once. Anything that won't fit into
// the request will be requested on the next inv message.
numRequested := 0
gdmsg := btcwire.NewMsgGetData()
for e := p.requestQueue.Front(); e != nil; e = p.requestQueue.Front() {
iv := e.Value.(*btcwire.InvVect)
gdmsg.AddInvVect(iv)
p.requestQueue.Remove(e)
numRequested++
if numRequested >= btcwire.MaxInvPerMsg {
break
}
}
if len(gdmsg.InvList) > 0 {
p.QueueMessage(gdmsg)
}
}
// handleGetData is invoked when a peer receives a getdata bitcoin message and
// is used to deliver block and transaction information.
func (p *peer) handleGetDataMsg(msg *btcwire.MsgGetData) {
@ -340,8 +454,8 @@ func (p *peer) handleGetBlocksMsg(msg *btcwire.MsgGetBlocks) {
}
// Don't attempt to fetch more than we can put into a single message.
if endIdx-startIdx > btcwire.MaxInvPerMsg {
endIdx = startIdx + btcwire.MaxInvPerMsg
if endIdx-startIdx > btcwire.MaxBlocksPerMsg {
endIdx = startIdx + btcwire.MaxBlocksPerMsg
}
// Fetch the inventory from the block database.
@ -677,7 +791,7 @@ out:
<-p.blockProcessed
case *btcwire.MsgInv:
p.server.blockManager.QueueInv(msg, p)
p.handleInvMsg(msg)
case *btcwire.MsgGetData:
p.handleGetDataMsg(msg)
@ -793,6 +907,7 @@ func newPeer(s *server, conn net.Conn, inbound bool, persistent bool) *peer {
inbound: inbound,
persistent: persistent,
knownAddresses: make(map[string]bool),
requestQueue: list.New(),
outputQueue: make(chan btcwire.Message, outputBufferSize),
blockProcessed: make(chan bool, 1),
quit: make(chan bool),