Move the inventory handling from peer into blockmanager.

This removes a horrible case of reach-around from per into the guts of
the blockmaanger to frob the chain. Soon, when we try to deduplicate the
fetching of blocks from multiple peers this will need decisions made in
a central point.

Discussed at length with davec.
This commit is contained in:
Owain G. Ainsworth 2013-09-27 01:41:02 +01:00
parent 790ba87979
commit b97db056c1
2 changed files with 132 additions and 98 deletions

View file

@ -32,6 +32,13 @@ type blockMsg struct {
peer *peer
}
// invMsg packages a bitcoin inv message and the peer it came from together
// so the block handler has access to that information.
type invMsg struct {
inv *btcwire.MsgInv
peer *peer
}
// txMsg packages a bitcoin tx message and the peer it came from together
// so the block handler has access to that information.
type txMsg struct {
@ -57,6 +64,7 @@ type blockManager struct {
newCandidates chan *peer
donePeers chan *peer
blockQueue chan *blockMsg
invQueue chan *invMsg
chainNotify chan *btcchain.Notification
wg sync.WaitGroup
quit chan bool
@ -106,7 +114,7 @@ func (b *blockManager) startSync(peers *list.List) {
log.Infof("[BMGR] Syncing to block height %d from peer %v",
bestPeer.lastBlock, bestPeer.conn.RemoteAddr())
bestPeer.pushGetBlocksMsg(locator, &zeroHash)
bestPeer.PushGetBlocksMsg(locator, &zeroHash)
b.syncPeer = bestPeer
}
@ -248,6 +256,109 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) {
b.server.db.Sync()
}
// handleInvMsg handles inv messages from all peers.
// We examine the inventory advertised by the remote peer and act accordingly.
//
// NOTE: This will need to have tx handling added as well when they are
// supported.
func (b *blockManager) handleInvMsg(imsg *invMsg) {
// Attempt to find the final block in the inventory list. There may
// not be one.
lastBlock := -1
invVects := imsg.inv.InvList
for i := len(invVects) - 1; i >= 0; i-- {
if invVects[i].Type == btcwire.InvVect_Block {
lastBlock = i
break
}
}
// Request the advertised inventory if we don't already have it. Also,
// request parent blocks of orphans if we receive one we already have.
// Finally, attempt to detect potential stalls due to long side chains
// we already have and request more blocks to prevent them.
for i, iv := range invVects {
switch iv.Type {
case btcwire.InvVect_Block:
// Add the inventory to the cache of known inventory
// for the peer.
imsg.peer.addKnownInventory(iv)
// Request the inventory if we don't already have it.
if !b.blockChain.HaveInventory(iv) {
// Add it to the request queue.
imsg.peer.requestQueue.PushBack(iv)
continue
}
// The block is an orphan block that we already have.
// When the existing orphan was processed, it requested
// the missing parent blocks. When this scenario
// happens, it means there were more blocks missing
// than are allowed into a single inventory message. As
// a result, once this peer requested the final
// advertised block, the remote peer noticed and is now
// resending the orphan block as an available block
// to signal there are more missing blocks that need to
// be requested.
if b.blockChain.IsKnownOrphan(&iv.Hash) {
// Request blocks starting at the latest known
// up to the root of the orphan that just came
// in.
orphanRoot := b.blockChain.GetOrphanRoot(
&iv.Hash)
locator, err := b.blockChain.LatestBlockLocator()
if err != nil {
log.Errorf("[PEER] Failed to get block "+
"locator for the latest block: "+
"%v", err)
continue
}
imsg.peer.PushGetBlocksMsg(locator, orphanRoot)
continue
}
// We already have the final block advertised by this
// inventory message, so force a request for more. This
// should only really happen if we're on a really long
// side chain.
if i == lastBlock {
// Request blocks after this one up to the
// final one the remote peer knows about (zero
// stop hash).
locator := b.blockChain.BlockLocatorFromHash(
&iv.Hash)
imsg.peer.PushGetBlocksMsg(locator, &zeroHash)
}
// Ignore unsupported inventory types.
default:
continue
}
}
// Request as much as possible at once. Anything that won't fit into
// the request will be requested on the next inv message.
numRequested := 0
gdmsg := btcwire.NewMsgGetData()
for e := imsg.peer.requestQueue.Front(); e != nil; e = imsg.peer.requestQueue.Front() {
iv := e.Value.(*btcwire.InvVect)
gdmsg.AddInvVect(iv)
imsg.peer.requestQueue.Remove(e)
// check that no one else has asked for this
// put on global ``requested'' map
// put on local ``requested'' map
numRequested++
if numRequested >= btcwire.MaxInvPerMsg {
break
}
}
if len(gdmsg.InvList) > 0 {
imsg.peer.QueueMessage(gdmsg)
}
}
// blockHandler is the main handler for the block manager. It must be run
// as a goroutine. It processes block and inv messages in a separate goroutine
// from the peer handlers so the block (MsgBlock) and tx (MsgTx) messages are
@ -265,6 +376,8 @@ out:
case bmsg := <-b.blockQueue:
b.handleBlockMsg(bmsg)
bmsg.peer.blockProcessed <- true
case imsg := <-b.invQueue:
b.handleInvMsg(imsg)
case <-b.quit:
break out
@ -294,7 +407,7 @@ func (b *blockManager) handleNotifyMsg(notification *btcchain.Notification) {
"for the latest block: %v", err)
break
}
peer.pushGetBlocksMsg(locator, orphanRoot)
peer.PushGetBlocksMsg(locator, orphanRoot)
delete(b.blockPeer, *orphanRoot)
break
} else {
@ -349,6 +462,18 @@ func (b *blockManager) QueueBlock(block *btcutil.Block, p *peer) {
b.blockQueue <- &bmsg
}
// QueueInv adds the passed inv message and peer to the block handling queue.
func (b *blockManager) QueueInv(inv *btcwire.MsgInv, p *peer) {
// No channel handlign ehre because peers do not need to block on inv
// messages.
if b.shutdown {
return
}
imsg := invMsg{inv: inv, peer: p}
b.invQueue <- &imsg
}
// Start begins the core block handler which processes block and inv messages.
func (b *blockManager) Start() {
// Already started?
@ -393,6 +518,7 @@ func newBlockManager(s *server) *blockManager {
newCandidates: make(chan *peer, cfg.MaxPeers),
donePeers: make(chan *peer, cfg.MaxPeers),
blockQueue: make(chan *blockMsg, chanBufferSize),
invQueue: make(chan *invMsg, chanBufferSize),
chainNotify: chainNotify,
quit: make(chan bool),
}

100
peer.go
View file

@ -345,7 +345,7 @@ func (p *peer) pushBlockMsg(sha btcwire.ShaHash) error {
// pushGetBlocksMsg sends a getblocks message for the provided block locator
// and stop hash. It will ignore back-to-back duplicate requests.
func (p *peer) pushGetBlocksMsg(locator btcchain.BlockLocator, stopHash *btcwire.ShaHash) error {
func (p *peer) PushGetBlocksMsg(locator btcchain.BlockLocator, stopHash *btcwire.ShaHash) error {
p.prevGetBlockMutex.Lock()
defer p.prevGetBlockMutex.Unlock()
@ -418,102 +418,10 @@ func (p *peer) handleBlockMsg(msg *btcwire.MsgBlock, buf []byte) {
// handleInvMsg is invoked when a peer receives an inv bitcoin message and is
// used to examine the inventory being advertised by the remote peer and react
// accordingly.
//
// NOTE: This will need to have tx handling added as well when they are
// supported.
// accordingly. We pass the message down to blockmanager which will call
// PushMessage with any appropraite responses.
func (p *peer) handleInvMsg(msg *btcwire.MsgInv) {
// Attempt to find the final block in the inventory list. There may
// not be one.
lastBlock := -1
invVects := msg.InvList
for i := len(invVects) - 1; i >= 0; i-- {
if invVects[i].Type == btcwire.InvVect_Block {
lastBlock = i
break
}
}
// Request the advertised inventory if we don't already have it. Also,
// request parent blocks of orphans if we receive one we already have.
// Finally, attempt to detect potential stalls due to long side chains
// we already have and request more blocks to prevent them.
chain := p.server.blockManager.blockChain
for i, iv := range invVects {
switch iv.Type {
case btcwire.InvVect_Block:
// Add the inventory to the cache of known inventory
// for the peer.
p.addKnownInventory(iv)
// Request the inventory if we don't already have it.
if !chain.HaveInventory(iv) {
// Add it to the request queue.
p.requestQueue.PushBack(iv)
continue
}
// The block is an orphan block that we already have.
// When the existing orphan was processed, it requested
// the missing parent blocks. When this scenario
// happens, it means there were more blocks missing
// than are allowed into a single inventory message. As
// a result, once this peer requested the final
// advertised block, the remote peer noticed and is now
// resending the orphan block as an available block
// to signal there are more missing blocks that need to
// be requested.
if chain.IsKnownOrphan(&iv.Hash) {
// Request blocks starting at the latest known
// up to the root of the orphan that just came
// in.
orphanRoot := chain.GetOrphanRoot(&iv.Hash)
locator, err := chain.LatestBlockLocator()
if err != nil {
log.Errorf("[PEER] Failed to get block "+
"locator for the latest block: "+
"%v", err)
continue
}
p.pushGetBlocksMsg(locator, orphanRoot)
continue
}
// We already have the final block advertised by this
// inventory message, so force a request for more. This
// should only really happen if we're on a really long
// side chain.
if i == lastBlock {
// Request blocks after this one up to the
// final one the remote peer knows about (zero
// stop hash).
locator := chain.BlockLocatorFromHash(&iv.Hash)
p.pushGetBlocksMsg(locator, &zeroHash)
}
// Ignore unsupported inventory types.
default:
continue
}
}
// Request as much as possible at once. Anything that won't fit into
// the request will be requested on the next inv message.
numRequested := 0
gdmsg := btcwire.NewMsgGetData()
for e := p.requestQueue.Front(); e != nil; e = p.requestQueue.Front() {
iv := e.Value.(*btcwire.InvVect)
gdmsg.AddInvVect(iv)
p.requestQueue.Remove(e)
numRequested++
if numRequested >= btcwire.MaxInvPerMsg {
break
}
}
if len(gdmsg.InvList) > 0 {
p.QueueMessage(gdmsg)
}
p.server.blockManager.QueueInv(msg, p)
}
// handleGetData is invoked when a peer receives a getdata bitcoin message and