Misc cleanup.

This commit contains various code cleanup such as comment fixes and
function ordering consistency.
This commit is contained in:
Dave Collins 2014-01-28 18:53:25 -06:00
parent 112525bd7a
commit 970c0cdb30
3 changed files with 137 additions and 141 deletions

View file

@ -372,7 +372,6 @@ func (b *blockManager) current() bool {
// handleBlockMsg handles block messages from all peers. // handleBlockMsg handles block messages from all peers.
func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { func (b *blockManager) handleBlockMsg(bmsg *blockMsg) {
defer func() { defer func() {
if b.startBlock != nil && if b.startBlock != nil &&
len(bmsg.peer.requestedBlocks) < 10 { len(bmsg.peer.requestedBlocks) < 10 {
@ -382,11 +381,9 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) {
} }
}() }()
// Keep track of which peer the block was sent from so the notification
// handler can request the parent blocks from the appropriate peer.
blockSha, _ := bmsg.block.Sha()
// If we didn't ask for this block then the peer is misbehaving. // If we didn't ask for this block then the peer is misbehaving.
blockSha, _ := bmsg.block.Sha()
if _, ok := bmsg.peer.requestedBlocks[*blockSha]; !ok { if _, ok := bmsg.peer.requestedBlocks[*blockSha]; !ok {
// The regression test intentionally sends some blocks twice // The regression test intentionally sends some blocks twice
// to test duplicate block insertion fails. Don't disconnect // to test duplicate block insertion fails. Don't disconnect
@ -400,6 +397,9 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) {
return return
} }
} }
// Keep track of which peer the block was sent from so the notification
// handler can request the parent blocks from the appropriate peer.
b.blockPeer[*blockSha] = bmsg.peer b.blockPeer[*blockSha] = bmsg.peer
fastAdd := false fastAdd := false
@ -420,9 +420,12 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) {
} }
} }
} }
// Process the block to include validation, best chain selection, orphan
// handling, etc. // Remove block from request maps. Either chain will know about it and
err := b.blockChain.ProcessBlock(bmsg.block, fastAdd) // so we shouldn't have any more instances of trying to fetch it, or we
// will fail the insert and thus we'll retry next time we get an inv.
delete(bmsg.peer.requestedBlocks, *blockSha)
delete(b.requestedBlocks, *blockSha)
if fastAdd && blockSha.IsEqual(b.lastBlock) { if fastAdd && blockSha.IsEqual(b.lastBlock) {
// have processed all blocks, switch to normal handling // have processed all blocks, switch to normal handling
@ -434,12 +437,9 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) {
b.headerOrphan = make(map[btcwire.ShaHash]*headerstr) b.headerOrphan = make(map[btcwire.ShaHash]*headerstr)
} }
// Remove block from request maps. Either chain knows about it and such // Process the block to include validation, best chain selection, orphan
// we shouldn't have any more instances of trying to fetch it, or we // handling, etc.
// failed to insert and thus we'll retry next time we get an inv. err := b.blockChain.ProcessBlock(bmsg.block, fastAdd)
delete(bmsg.peer.requestedBlocks, *blockSha)
delete(b.requestedBlocks, *blockSha)
if err != nil { if err != nil {
delete(b.blockPeer, *blockSha) delete(b.blockPeer, *blockSha)
@ -474,6 +474,116 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) {
b.server.db.Sync() b.server.db.Sync()
} }
// fetchHeaderBlocks is creates and sends a request to the syncPeer for
// the next list of blocks to be downloaded.
func (b *blockManager) fetchHeaderBlocks() {
gdmsg := btcwire.NewMsgGetDataSizeHint(btcwire.MaxInvPerMsg)
numRequested := 0
startBlock := b.startBlock
for {
if b.startBlock == nil {
break
}
blockHash := b.startBlock
firstblock, ok := b.headerPool[*blockHash]
if !ok {
bmgrLog.Warnf("current fetch block %v missing from headerPool", blockHash)
break
}
iv := btcwire.NewInvVect(btcwire.InvTypeBlock, blockHash)
if !b.haveInventory(iv) {
b.requestedBlocks[*blockHash] = true
b.syncPeer.requestedBlocks[*blockHash] = true
gdmsg.AddInvVect(iv)
numRequested++
}
if b.fetchBlock == nil {
b.fetchBlock = b.startBlock
}
if firstblock.next == nil {
b.startBlock = nil
break
} else {
b.startBlock = &firstblock.next.sha
}
if numRequested >= btcwire.MaxInvPerMsg {
break
}
}
if len(gdmsg.InvList) > 0 {
bmgrLog.Debugf("requesting block %v len %v\n", startBlock, len(gdmsg.InvList))
b.syncPeer.QueueMessage(gdmsg, nil)
}
}
// handleHeadersMsghandles headers messages from all peers.
func (b *blockManager) handleHeadersMsg(bmsg *headersMsg) {
msg := bmsg.headers
nheaders := len(msg.Headers)
if nheaders == 0 {
bmgrLog.Infof("Received %v block headers: Fetching blocks",
len(b.headerPool))
b.fetchHeaderBlocks()
return
}
var blockhash btcwire.ShaHash
if b.latestCheckpoint == nil {
b.latestCheckpoint = b.blockChain.LatestCheckpoint()
}
for hdridx := range msg.Headers {
blockhash, _ = msg.Headers[hdridx].BlockSha()
var headerst headerstr
headerst.header = msg.Headers[hdridx]
headerst.sha = blockhash
prev, ok := b.headerPool[headerst.header.PrevBlock]
if ok {
if prev.next == nil {
prev.next = &headerst
} else {
bmgrLog.Infof("two children of the same block ??? %v %v %v", prev.sha, prev.next.sha, blockhash)
}
headerst.height = prev.height + 1
} else if headerst.header.PrevBlock.IsEqual(activeNetParams.genesisHash) {
ok = true
headerst.height = 1
b.startBlock = &headerst.sha
}
if int64(headerst.height) == b.latestCheckpoint.Height {
if headerst.sha.IsEqual(b.latestCheckpoint.Hash) {
// we can trust this header first download
// TODO flag this?
} else {
// XXX marker does not match, must throw
// away headers !?!?!
// XXX dont trust peer?
}
}
if ok {
b.headerPool[blockhash] = &headerst
b.lastBlock = &blockhash
} else {
bmgrLog.Infof("found orphan block %v", blockhash)
b.headerOrphan[headerst.header.PrevBlock] = &headerst
}
}
// Construct the getheaders request and queue it to be sent.
ghmsg := btcwire.NewMsgGetHeaders()
err := ghmsg.AddBlockLocatorHash(&blockhash)
if err != nil {
bmgrLog.Infof("msgheaders bad addheaders", blockhash)
return
}
b.syncPeer.QueueMessage(ghmsg, nil)
}
// haveInventory returns whether or not the inventory represented by the passed // haveInventory returns whether or not the inventory represented by the passed
// inventory vector is known. This includes checking all of the various places // inventory vector is known. This includes checking all of the various places
// inventory can be when it is in different states such as blocks that are part // inventory can be when it is in different states such as blocks that are part
@ -824,7 +934,8 @@ func (b *blockManager) QueueInv(inv *btcwire.MsgInv, p *peer) {
b.msgChan <- &invMsg{inv: inv, peer: p} b.msgChan <- &invMsg{inv: inv, peer: p}
} }
// QueueInv adds the passed headers message and peer to the block handling queue. // QueueHeaders adds the passed headers message and peer to the block handling
// queue.
func (b *blockManager) QueueHeaders(headers *btcwire.MsgHeaders, p *peer) { func (b *blockManager) QueueHeaders(headers *btcwire.MsgHeaders, p *peer) {
// No channel handling here because peers do not need to block on inv // No channel handling here because peers do not need to block on inv
// messages. // messages.
@ -1056,116 +1167,3 @@ func loadBlockDB() (btcdb.Db, error) {
btcdLog.Infof("Block database loaded with block height %d", height) btcdLog.Infof("Block database loaded with block height %d", height)
return db, nil return db, nil
} }
// handleHeadersMsg is invoked when a peer receives a headers bitcoin
// message.
func (b *blockManager) handleHeadersMsg(bmsg *headersMsg) {
msg := bmsg.headers
nheaders := len(msg.Headers)
if nheaders == 0 {
bmgrLog.Infof("Received %v block headers: Fetching blocks",
len(b.headerPool))
b.fetchHeaderBlocks()
return
}
var blockhash btcwire.ShaHash
if b.latestCheckpoint == nil {
b.latestCheckpoint = b.blockChain.LatestCheckpoint()
}
for hdridx := range msg.Headers {
blockhash, _ = msg.Headers[hdridx].BlockSha()
var headerst headerstr
headerst.header = msg.Headers[hdridx]
headerst.sha = blockhash
prev, ok := b.headerPool[headerst.header.PrevBlock]
if ok {
if prev.next == nil {
prev.next = &headerst
} else {
bmgrLog.Infof("two children of the same block ??? %v %v %v", prev.sha, prev.next.sha, blockhash)
}
headerst.height = prev.height + 1
} else if headerst.header.PrevBlock.IsEqual(activeNetParams.genesisHash) {
ok = true
headerst.height = 1
b.startBlock = &headerst.sha
}
if int64(headerst.height) == b.latestCheckpoint.Height {
if headerst.sha.IsEqual(b.latestCheckpoint.Hash) {
// we can trust this header first download
// TODO flag this?
} else {
// XXX marker does not match, must throw
// away headers !?!?!
// XXX dont trust peer?
}
}
if ok {
b.headerPool[blockhash] = &headerst
b.lastBlock = &blockhash
} else {
bmgrLog.Infof("found orphan block %v", blockhash)
b.headerOrphan[headerst.header.PrevBlock] = &headerst
}
}
// Construct the getheaders request and queue it to be sent.
ghmsg := btcwire.NewMsgGetHeaders()
err := ghmsg.AddBlockLocatorHash(&blockhash)
if err != nil {
bmgrLog.Infof("msgheaders bad addheaders", blockhash)
return
}
b.syncPeer.QueueMessage(ghmsg, nil)
}
// fetchHeaderBlocks is creates and sends a request to the syncPeer for
// the next list of blocks to downloaded.
func (b *blockManager) fetchHeaderBlocks() {
gdmsg := btcwire.NewMsgGetDataSizeHint(btcwire.MaxInvPerMsg)
numRequested := 0
startBlock := b.startBlock
for {
if b.startBlock == nil {
break
}
blockhash := b.startBlock
firstblock, ok := b.headerPool[*blockhash]
if !ok {
bmgrLog.Warnf("current fetch block %v missing from headerPool", blockhash)
break
}
var iv btcwire.InvVect
iv.Hash = *blockhash
iv.Type = btcwire.InvTypeBlock
if !b.haveInventory(&iv) {
b.requestedBlocks[*blockhash] = true
b.syncPeer.requestedBlocks[*blockhash] = true
gdmsg.AddInvVect(&iv)
numRequested++
}
if b.fetchBlock == nil {
b.fetchBlock = b.startBlock
}
if firstblock.next == nil {
b.startBlock = nil
break
} else {
b.startBlock = &firstblock.next.sha
}
if numRequested >= btcwire.MaxInvPerMsg {
break
}
}
if len(gdmsg.InvList) > 0 {
bmgrLog.Debugf("requesting block %v len %v\n", startBlock, len(gdmsg.InvList))
b.syncPeer.QueueMessage(gdmsg, nil)
}
}

View file

@ -64,6 +64,7 @@ func btcdMain(serverChan chan<- *server) error {
return err return err
} }
pprof.StartCPUProfile(f) pprof.StartCPUProfile(f)
defer f.Close()
defer pprof.StopCPUProfile() defer pprof.StopCPUProfile()
} }

23
peer.go
View file

@ -460,20 +460,19 @@ func (p *peer) PushGetBlocksMsg(locator btcchain.BlockLocator, stopHash *btcwire
// and stop hash. It will ignore back-to-back duplicate requests. // and stop hash. It will ignore back-to-back duplicate requests.
func (p *peer) PushGetHeadersMsg(locator btcchain.BlockLocator) error { func (p *peer) PushGetHeadersMsg(locator btcchain.BlockLocator) error {
// Extract the begin hash from the block locator, if one was specified, // Extract the begin hash from the block locator, if one was specified,
// to use for filtering duplicate getblocks requests. // to use for filtering duplicate getheaders requests.
// request.
var beginHash *btcwire.ShaHash var beginHash *btcwire.ShaHash
if len(locator) > 0 { if len(locator) > 0 {
beginHash = locator[0] beginHash = locator[0]
} }
// Filter duplicate getblocks requests. // Filter duplicate getheaders requests.
if p.prevGetBlocksBegin != nil && if p.prevGetBlocksBegin != nil &&
beginHash != nil && beginHash != nil &&
beginHash.IsEqual(p.prevGetBlocksBegin) { beginHash.IsEqual(p.prevGetBlocksBegin) {
peerLog.Tracef("PEER: Filtering duplicate [getblocks] with begin "+ peerLog.Tracef("PEER: Filtering duplicate [getheaders] with "+
"hash %v", beginHash) "begin hash %v", beginHash)
return nil return nil
} }
@ -487,7 +486,7 @@ func (p *peer) PushGetHeadersMsg(locator btcchain.BlockLocator) error {
} }
p.QueueMessage(msg, nil) p.QueueMessage(msg, nil)
// Update the previous getblocks request information for filtering // Update the previous getheaders request information for filtering
// duplicates. // duplicates.
p.prevGetBlocksBegin = beginHash p.prevGetBlocksBegin = beginHash
return nil return nil
@ -584,10 +583,8 @@ func (p *peer) handleInvMsg(msg *btcwire.MsgInv) {
p.server.blockManager.QueueInv(msg, p) p.server.blockManager.QueueInv(msg, p)
} }
// handleHeadersMsg is invoked when a peer receives an inv bitcoin message and // handleHeadersMsg is invoked when a peer receives a headers bitcoin message.
// is used to examine the inventory being advertised by the remote peer and // The message is passed down to the block manager.
// react accordingly. We pass the message down to blockmanager which will call
// QueueMessage with any appropriate responses.
func (p *peer) handleHeadersMsg(msg *btcwire.MsgHeaders) { func (p *peer) handleHeadersMsg(msg *btcwire.MsgHeaders) {
p.server.blockManager.QueueHeaders(msg, p) p.server.blockManager.QueueHeaders(msg, p)
} }
@ -1138,6 +1135,9 @@ out:
p.handleInvMsg(msg) p.handleInvMsg(msg)
markConnected = true markConnected = true
case *btcwire.MsgHeaders:
p.handleHeadersMsg(msg)
case *btcwire.MsgNotFound: case *btcwire.MsgNotFound:
// TODO(davec): Ignore this for now, but ultimately // TODO(davec): Ignore this for now, but ultimately
// it should probably be used to detect when something // it should probably be used to detect when something
@ -1154,9 +1154,6 @@ out:
case *btcwire.MsgGetHeaders: case *btcwire.MsgGetHeaders:
p.handleGetHeadersMsg(msg) p.handleGetHeadersMsg(msg)
case *btcwire.MsgHeaders:
p.handleHeadersMsg(msg)
default: default:
peerLog.Debugf("Received unhandled message of type %v: Fix Me", peerLog.Debugf("Received unhandled message of type %v: Fix Me",
rmsg.Command()) rmsg.Command())