Rework getblocks handling.

This commit reworks the getblocks handling a bit to clean it up and match
the reference implementation handling.  In particular, it adds monitoring
for when peers request the final block advertised from a previous
getblocks message and automatically avertises the latest known block
inventory to trigger the peer to send another getblocks message.
This commit is contained in:
Dave Collins 2013-09-05 01:20:48 -05:00
parent 83a9bbd4dd
commit cf7438a646

85
peer.go
View file

@ -94,6 +94,7 @@ type peer struct {
knownAddresses map[string]bool knownAddresses map[string]bool
lastBlock int32 lastBlock int32
requestQueue *list.List requestQueue *list.List
continueHash *btcwire.ShaHash
wg sync.WaitGroup wg sync.WaitGroup
outputQueue chan btcwire.Message outputQueue chan btcwire.Message
blockProcessed chan bool blockProcessed chan bool
@ -269,6 +270,22 @@ func (p *peer) pushBlockMsg(sha btcwire.ShaHash) error {
return err return err
} }
p.QueueMessage(blk.MsgBlock()) p.QueueMessage(blk.MsgBlock())
// When the peer requests the final block that was advertised in
// response to a getblocks message which requested more blocks than
// would fit into a single message, send it a new inventory message
// to trigger it to issue another getblocks message for the next
// batch of inventory.
if p.continueHash != nil && p.continueHash.IsEqual(&sha) {
hash, _, err := p.server.db.NewestSha()
if err == nil {
invMsg := btcwire.NewMsgInv()
iv := btcwire.NewInvVect(btcwire.InvVect_Block, hash)
invMsg.AddInvVect(iv)
p.QueueMessage(invMsg)
p.continueHash = nil
}
}
return nil return nil
} }
@ -410,27 +427,24 @@ out:
// handleGetBlocksMsg is invoked when a peer receives a getdata bitcoin message. // handleGetBlocksMsg is invoked when a peer receives a getdata bitcoin message.
func (p *peer) handleGetBlocksMsg(msg *btcwire.MsgGetBlocks) { func (p *peer) handleGetBlocksMsg(msg *btcwire.MsgGetBlocks) {
var err error
startIdx := int64(0)
endIdx := btcdb.AllShas
// Return all block hashes to the latest one (up to max per message) if // Return all block hashes to the latest one (up to max per message) if
// no stop hash was specified. // no stop hash was specified.
// Attempt to find the ending index of the stop hash if specified. // Attempt to find the ending index of the stop hash if specified.
endIdx := btcdb.AllShas
if !msg.HashStop.IsEqual(&zeroHash) { if !msg.HashStop.IsEqual(&zeroHash) {
block, err := p.server.db.FetchBlockBySha(&msg.HashStop) block, err := p.server.db.FetchBlockBySha(&msg.HashStop)
if err != nil { if err == nil {
// Fetch all if we dont recognize the stop hash. endIdx = block.Height() + 1
endIdx = btcdb.AllShas
} }
endIdx = block.Height()
} }
// TODO(davec): This should have some logic to utilize the additional // Find the most recent known block based on the block locator.
// locator hashes to ensure the proper chain. // Use the block after the genesis block if no other blocks in the
// provided locator are known. This does mean the client will start
// over with the genesis block if unknown block locators are provided.
// This mirrors the behavior in the reference implementation.
startIdx := int64(1)
for _, hash := range msg.BlockLocatorHashes { for _, hash := range msg.BlockLocatorHashes {
// TODO(drahn) does using the caching interface make sense
// on index lookups ?
block, err := p.server.db.FetchBlockBySha(hash) block, err := p.server.db.FetchBlockBySha(hash)
if err == nil { if err == nil {
// Start with the next hash since we know this one. // Start with the next hash since we know this one.
@ -440,29 +454,55 @@ func (p *peer) handleGetBlocksMsg(msg *btcwire.MsgGetBlocks) {
} }
// Don't attempt to fetch more than we can put into a single message. // Don't attempt to fetch more than we can put into a single message.
autoContinue := false
if endIdx-startIdx > btcwire.MaxBlocksPerMsg { if endIdx-startIdx > btcwire.MaxBlocksPerMsg {
endIdx = startIdx + btcwire.MaxBlocksPerMsg endIdx = startIdx + btcwire.MaxBlocksPerMsg
autoContinue = true
} }
// Generate inventory message.
//
// The FetchBlockBySha call is limited to a maximum number of hashes
// per invocation. Since the maximum number of inventory per message
// might be larger, call it multiple times with the appropriate indices
// as needed.
invMsg := btcwire.NewMsgInv()
for start := startIdx; start < endIdx; {
// Fetch the inventory from the block database. // Fetch the inventory from the block database.
hashList, err := p.server.db.FetchHeightRange(startIdx, endIdx) hashList, err := p.server.db.FetchHeightRange(start, endIdx)
if err != nil { if err != nil {
log.Warnf(" lookup returned %v ", err) log.Warnf("[PEER] Block lookup failed: %v", err)
return return
} }
// Nothing to send. // The database did not return any further hashes. Break out of
// the loop now.
if len(hashList) == 0 { if len(hashList) == 0 {
return break
} }
// Generate inventory vectors and push the inventory message. // Add block inventory to the message.
inv := btcwire.NewMsgInv()
for _, hash := range hashList { for _, hash := range hashList {
iv := btcwire.InvVect{Type: btcwire.InvVect_Block, Hash: hash} hashCopy := hash
inv.AddInvVect(&iv) iv := btcwire.NewInvVect(btcwire.InvVect_Block, &hashCopy)
invMsg.AddInvVect(iv)
}
start += int64(len(hashList))
}
// Send the inventory message if there is anything to send.
if len(invMsg.InvList) > 0 {
invListLen := len(invMsg.InvList)
if autoContinue && invListLen == btcwire.MaxBlocksPerMsg {
// Intentionally use a copy of the final hash so there
// is not a reference into the inventory slice which
// would prevent the entire slice from being eligible
// for GC as soon as it's sent.
continueHash := invMsg.InvList[invListLen-1].Hash
p.continueHash = &continueHash
}
p.QueueMessage(invMsg)
} }
p.QueueMessage(inv)
} }
// handleGetBlocksMsg is invoked when a peer receives a getheaders bitcoin // handleGetBlocksMsg is invoked when a peer receives a getheaders bitcoin
@ -495,9 +535,10 @@ func (p *peer) handleGetHeadersMsg(msg *btcwire.MsgGetHeaders) {
} }
// Find the most recent known block based on the block locator. // Find the most recent known block based on the block locator.
// It's the block after the genesis block if no other blocks in the // Use the block after the genesis block if no other blocks in the
// provided locator are known. This does mean the client will start // provided locator are known. This does mean the client will start
// over with the genesis block if unknown block locators are provided. // over with the genesis block if unknown block locators are provided.
// This mirrors the behavior in the reference implementation.
startIdx := int64(1) startIdx := int64(1)
for _, hash := range msg.BlockLocatorHashes { for _, hash := range msg.BlockLocatorHashes {
block, err := p.server.db.FetchBlockBySha(hash) block, err := p.server.db.FetchBlockBySha(hash)