diff --git a/blockmanager.go b/blockmanager.go index fea9ecff..c10edcc8 100644 --- a/blockmanager.go +++ b/blockmanager.go @@ -32,6 +32,13 @@ type blockMsg struct { peer *peer } +// invMsg packages a bitcoin inv message and the peer it came from together +// so the block handler has access to that information. +type invMsg struct { + inv *btcwire.MsgInv + peer *peer +} + // txMsg packages a bitcoin tx message and the peer it came from together // so the block handler has access to that information. type txMsg struct { @@ -57,6 +64,7 @@ type blockManager struct { newCandidates chan *peer donePeers chan *peer blockQueue chan *blockMsg + invQueue chan *invMsg chainNotify chan *btcchain.Notification wg sync.WaitGroup quit chan bool @@ -106,7 +114,7 @@ func (b *blockManager) startSync(peers *list.List) { log.Infof("[BMGR] Syncing to block height %d from peer %v", bestPeer.lastBlock, bestPeer.conn.RemoteAddr()) - bestPeer.pushGetBlocksMsg(locator, &zeroHash) + bestPeer.PushGetBlocksMsg(locator, &zeroHash) b.syncPeer = bestPeer } @@ -248,6 +256,109 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { b.server.db.Sync() } +// handleInvMsg handles inv messages from all peers. +// We examine the inventory advertised by the remote peer and act accordingly. +// +// NOTE: This will need to have tx handling added as well when they are +// supported. +func (b *blockManager) handleInvMsg(imsg *invMsg) { + // Attempt to find the final block in the inventory list. There may + // not be one. + lastBlock := -1 + invVects := imsg.inv.InvList + for i := len(invVects) - 1; i >= 0; i-- { + if invVects[i].Type == btcwire.InvVect_Block { + lastBlock = i + break + } + } + + // Request the advertised inventory if we don't already have it. Also, + // request parent blocks of orphans if we receive one we already have. + // Finally, attempt to detect potential stalls due to long side chains + // we already have and request more blocks to prevent them. + for i, iv := range invVects { + switch iv.Type { + case btcwire.InvVect_Block: + // Add the inventory to the cache of known inventory + // for the peer. + imsg.peer.addKnownInventory(iv) + + // Request the inventory if we don't already have it. + if !b.blockChain.HaveInventory(iv) { + // Add it to the request queue. + imsg.peer.requestQueue.PushBack(iv) + continue + } + + // The block is an orphan block that we already have. + // When the existing orphan was processed, it requested + // the missing parent blocks. When this scenario + // happens, it means there were more blocks missing + // than are allowed into a single inventory message. As + // a result, once this peer requested the final + // advertised block, the remote peer noticed and is now + // resending the orphan block as an available block + // to signal there are more missing blocks that need to + // be requested. + if b.blockChain.IsKnownOrphan(&iv.Hash) { + // Request blocks starting at the latest known + // up to the root of the orphan that just came + // in. + orphanRoot := b.blockChain.GetOrphanRoot( + &iv.Hash) + locator, err := b.blockChain.LatestBlockLocator() + if err != nil { + log.Errorf("[PEER] Failed to get block "+ + "locator for the latest block: "+ + "%v", err) + continue + } + imsg.peer.PushGetBlocksMsg(locator, orphanRoot) + continue + } + + // We already have the final block advertised by this + // inventory message, so force a request for more. This + // should only really happen if we're on a really long + // side chain. + if i == lastBlock { + // Request blocks after this one up to the + // final one the remote peer knows about (zero + // stop hash). + locator := b.blockChain.BlockLocatorFromHash( + &iv.Hash) + imsg.peer.PushGetBlocksMsg(locator, &zeroHash) + } + + // Ignore unsupported inventory types. + default: + continue + } + } + + // Request as much as possible at once. Anything that won't fit into + // the request will be requested on the next inv message. + numRequested := 0 + gdmsg := btcwire.NewMsgGetData() + for e := imsg.peer.requestQueue.Front(); e != nil; e = imsg.peer.requestQueue.Front() { + iv := e.Value.(*btcwire.InvVect) + gdmsg.AddInvVect(iv) + imsg.peer.requestQueue.Remove(e) + // check that no one else has asked for this + // put on global ``requested'' map + // put on local ``requested'' map + + numRequested++ + if numRequested >= btcwire.MaxInvPerMsg { + break + } + } + if len(gdmsg.InvList) > 0 { + imsg.peer.QueueMessage(gdmsg) + } +} + // blockHandler is the main handler for the block manager. It must be run // as a goroutine. It processes block and inv messages in a separate goroutine // from the peer handlers so the block (MsgBlock) and tx (MsgTx) messages are @@ -265,6 +376,8 @@ out: case bmsg := <-b.blockQueue: b.handleBlockMsg(bmsg) bmsg.peer.blockProcessed <- true + case imsg := <-b.invQueue: + b.handleInvMsg(imsg) case <-b.quit: break out @@ -294,7 +407,7 @@ func (b *blockManager) handleNotifyMsg(notification *btcchain.Notification) { "for the latest block: %v", err) break } - peer.pushGetBlocksMsg(locator, orphanRoot) + peer.PushGetBlocksMsg(locator, orphanRoot) delete(b.blockPeer, *orphanRoot) break } else { @@ -349,6 +462,18 @@ func (b *blockManager) QueueBlock(block *btcutil.Block, p *peer) { b.blockQueue <- &bmsg } +// QueueInv adds the passed inv message and peer to the block handling queue. +func (b *blockManager) QueueInv(inv *btcwire.MsgInv, p *peer) { + // No channel handlign ehre because peers do not need to block on inv + // messages. + if b.shutdown { + return + } + + imsg := invMsg{inv: inv, peer: p} + b.invQueue <- &imsg +} + // Start begins the core block handler which processes block and inv messages. func (b *blockManager) Start() { // Already started? @@ -393,6 +518,7 @@ func newBlockManager(s *server) *blockManager { newCandidates: make(chan *peer, cfg.MaxPeers), donePeers: make(chan *peer, cfg.MaxPeers), blockQueue: make(chan *blockMsg, chanBufferSize), + invQueue: make(chan *invMsg, chanBufferSize), chainNotify: chainNotify, quit: make(chan bool), } diff --git a/peer.go b/peer.go index a7d031cf..471230fe 100644 --- a/peer.go +++ b/peer.go @@ -345,7 +345,7 @@ func (p *peer) pushBlockMsg(sha btcwire.ShaHash) error { // pushGetBlocksMsg sends a getblocks message for the provided block locator // and stop hash. It will ignore back-to-back duplicate requests. -func (p *peer) pushGetBlocksMsg(locator btcchain.BlockLocator, stopHash *btcwire.ShaHash) error { +func (p *peer) PushGetBlocksMsg(locator btcchain.BlockLocator, stopHash *btcwire.ShaHash) error { p.prevGetBlockMutex.Lock() defer p.prevGetBlockMutex.Unlock() @@ -418,102 +418,10 @@ func (p *peer) handleBlockMsg(msg *btcwire.MsgBlock, buf []byte) { // handleInvMsg is invoked when a peer receives an inv bitcoin message and is // used to examine the inventory being advertised by the remote peer and react -// accordingly. -// -// NOTE: This will need to have tx handling added as well when they are -// supported. +// accordingly. We pass the message down to blockmanager which will call +// PushMessage with any appropraite responses. func (p *peer) handleInvMsg(msg *btcwire.MsgInv) { - // Attempt to find the final block in the inventory list. There may - // not be one. - lastBlock := -1 - invVects := msg.InvList - for i := len(invVects) - 1; i >= 0; i-- { - if invVects[i].Type == btcwire.InvVect_Block { - lastBlock = i - break - } - } - - // Request the advertised inventory if we don't already have it. Also, - // request parent blocks of orphans if we receive one we already have. - // Finally, attempt to detect potential stalls due to long side chains - // we already have and request more blocks to prevent them. - chain := p.server.blockManager.blockChain - for i, iv := range invVects { - switch iv.Type { - case btcwire.InvVect_Block: - // Add the inventory to the cache of known inventory - // for the peer. - p.addKnownInventory(iv) - - // Request the inventory if we don't already have it. - if !chain.HaveInventory(iv) { - // Add it to the request queue. - p.requestQueue.PushBack(iv) - continue - } - - // The block is an orphan block that we already have. - // When the existing orphan was processed, it requested - // the missing parent blocks. When this scenario - // happens, it means there were more blocks missing - // than are allowed into a single inventory message. As - // a result, once this peer requested the final - // advertised block, the remote peer noticed and is now - // resending the orphan block as an available block - // to signal there are more missing blocks that need to - // be requested. - if chain.IsKnownOrphan(&iv.Hash) { - // Request blocks starting at the latest known - // up to the root of the orphan that just came - // in. - orphanRoot := chain.GetOrphanRoot(&iv.Hash) - locator, err := chain.LatestBlockLocator() - if err != nil { - log.Errorf("[PEER] Failed to get block "+ - "locator for the latest block: "+ - "%v", err) - continue - } - p.pushGetBlocksMsg(locator, orphanRoot) - continue - } - - // We already have the final block advertised by this - // inventory message, so force a request for more. This - // should only really happen if we're on a really long - // side chain. - if i == lastBlock { - // Request blocks after this one up to the - // final one the remote peer knows about (zero - // stop hash). - locator := chain.BlockLocatorFromHash(&iv.Hash) - p.pushGetBlocksMsg(locator, &zeroHash) - } - - // Ignore unsupported inventory types. - default: - continue - } - } - - // Request as much as possible at once. Anything that won't fit into - // the request will be requested on the next inv message. - numRequested := 0 - gdmsg := btcwire.NewMsgGetData() - for e := p.requestQueue.Front(); e != nil; e = p.requestQueue.Front() { - iv := e.Value.(*btcwire.InvVect) - gdmsg.AddInvVect(iv) - p.requestQueue.Remove(e) - - numRequested++ - if numRequested >= btcwire.MaxInvPerMsg { - break - } - } - if len(gdmsg.InvList) > 0 { - p.QueueMessage(gdmsg) - } + p.server.blockManager.QueueInv(msg, p) } // handleGetData is invoked when a peer receives a getdata bitcoin message and