From 65725189db703515e82b80e40b9ce4642230faff Mon Sep 17 00:00:00 2001 From: "Owain G. Ainsworth" Date: Mon, 30 Sep 2013 23:53:21 +0100 Subject: [PATCH] Keep track of currently requested blocks per peer. Use this information so that we do not request a block per peer we got an inv for it, makes multi peer much quieter and rather more bandwidth efficient. In order to remove a number of possible races we combine blockhandling an synchandler and use one channel for all messages. This ensures that all messages from a single peer will be recieved in order. It also removes the need for a lot of locking between the peer removal code and the block/inv handlers. --- blockmanager.go | 124 +++++++++++++++++++++++++++++------------------- peer.go | 6 ++- 2 files changed, 79 insertions(+), 51 deletions(-) diff --git a/blockmanager.go b/blockmanager.go index 5e211ad2..cbc0cb38 100644 --- a/blockmanager.go +++ b/blockmanager.go @@ -25,6 +25,11 @@ const ( blockDbNamePrefix = "blocks" ) +// newPeerMsg signifies a newly connected peer to the block handler. +type newPeerMsg struct { + peer *peer +} + // blockMsg packages a bitcoin block message and the peer it came from together // so the block handler has access to that information. type blockMsg struct { @@ -39,6 +44,11 @@ type invMsg struct { peer *peer } +// donePeerMsg signifies a newly disconnected peer to the block handler. +type donePeerMsg struct { + peer *peer +} + // txMsg packages a bitcoin tx message and the peer it came from together // so the block handler has access to that information. type txMsg struct { @@ -54,17 +64,14 @@ type blockManager struct { shutdown bool blockChain *btcchain.BlockChain blockPeer map[btcwire.ShaHash]*peer + requestedBlocks map[btcwire.ShaHash]bool blockPeerMutex sync.Mutex receivedLogBlocks int64 receivedLogTx int64 lastBlockLogTime time.Time processingReqs bool syncPeer *peer - newBlocks chan bool - newCandidates chan *peer - donePeers chan *peer - blockQueue chan *blockMsg - invQueue chan *invMsg + msgChan chan interface{} chainNotify chan *btcchain.Notification chainNotifySink chan *btcchain.Notification wg sync.WaitGroup @@ -120,10 +127,10 @@ func (b *blockManager) startSync(peers *list.List) { } } -// handleNewCandidateMsg deals with new peers that have signalled they may +// handleNewPeerMsg deals with new peers that have signalled they may // be considered as a sync peer (they have already successfully negotiated). It // also starts syncing if needed. It is invoked from the syncHandler goroutine. -func (b *blockManager) handleNewCandidateMsg(peers *list.List, p *peer) { +func (b *blockManager) handleNewPeerMsg(peers *list.List, p *peer) { // Ignore if in the process of shutting down. if b.shutdown { return @@ -160,29 +167,14 @@ func (b *blockManager) handleDonePeerMsg(peers *list.List, p *peer) { b.syncPeer = nil b.startSync(peers) } -} -// syncHandler deals with handling downloading (syncing) the block chain from -// other peers as they connect and disconnect. It must be run as a goroutine. -func (b *blockManager) syncHandler() { - log.Tracef("[BMGR] Starting sync handler") - candidatePeers := list.New() -out: - // Live while we're not shutting down. - for !b.shutdown { - select { - case peer := <-b.newCandidates: - b.handleNewCandidateMsg(candidatePeers, peer) - - case peer := <-b.donePeers: - b.handleDonePeerMsg(candidatePeers, peer) - - case <-b.quit: - break out - } + // remove requested blocks from the global map so that they will be + // fetched from elsewher next time we get an inv. + // TODO(oga) we could possibly here check which peers have these blocks + // and request them now to speed things up a little. + for k := range p.requestedBlocks { + delete(b.requestedBlocks, k) } - b.wg.Done() - log.Trace("[BMGR] Sync handler done") } // logBlockHeight logs a new block height as an information message to show @@ -228,6 +220,13 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { // Process the block to include validation, best chain selection, orphan // handling, etc. err := b.blockChain.ProcessBlock(bmsg.block) + + // Remove block from request maps. Either chain knows about it and such + // we shouldn't have any more instances of trying to fetch it, or we + // failed to insert and thus we'll retry next time we get an inv. + delete(bmsg.peer.requestedBlocks, *blockSha) + delete(b.requestedBlocks, *blockSha) + if err != nil { b.blockPeerMutex.Lock() delete(b.blockPeer, *blockSha) @@ -343,13 +342,16 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) { gdmsg := btcwire.NewMsgGetData() for e := imsg.peer.requestQueue.Front(); e != nil; e = imsg.peer.requestQueue.Front() { iv := e.Value.(*btcwire.InvVect) - gdmsg.AddInvVect(iv) imsg.peer.requestQueue.Remove(e) - // check that no one else has asked for this - // put on global ``requested'' map - // put on local ``requested'' map + // check that no one else has asked for this. if so we don't + // need to ask. + if _, exists := b.requestedBlocks[iv.Hash]; !exists { + b.requestedBlocks[iv.Hash] = true + imsg.peer.requestedBlocks[iv.Hash] = true + gdmsg.AddInvVect(iv) + numRequested++ + } - numRequested++ if numRequested >= btcwire.MaxInvPerMsg { break } @@ -369,18 +371,28 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) { // NOTE: Tx messages need to be handled here too. // (either that or block and tx need to be handled in separate threads) func (b *blockManager) blockHandler() { + candidatePeers := list.New() out: for !b.shutdown { select { - // Handle new block messages. - case bmsg := <-b.blockQueue: - b.handleBlockMsg(bmsg) - bmsg.peer.blockProcessed <- true + case m := <-b.msgChan: + switch msg := m.(type) { + case *newPeerMsg: + b.handleNewPeerMsg(candidatePeers, msg.peer) - // Handle new inventory messages. - case imsg := <-b.invQueue: - b.handleInvMsg(imsg) + case *blockMsg: + b.handleBlockMsg(msg) + msg.peer.blockProcessed <- true + case *invMsg: + b.handleInvMsg(msg) + + case *donePeerMsg: + b.handleDonePeerMsg(candidatePeers, msg.peer) + + default: + // bitch and whine. + } case <-b.quit: break out } @@ -487,6 +499,15 @@ out: log.Trace("[BMGR] Chain notification handler done") } +// NewPeer informs the blockmanager of a newly active peer. +func (b *blockManager) NewPeer(p *peer) { + // Ignore if we are shutting down. + if b.shutdown { + return + } + b.msgChan <- &newPeerMsg{peer: p} +} + // QueueBlock adds the passed block message and peer to the block handling queue. func (b *blockManager) QueueBlock(block *btcutil.Block, p *peer) { // Don't accept more blocks if we're shutting down. @@ -496,7 +517,7 @@ func (b *blockManager) QueueBlock(block *btcutil.Block, p *peer) { } bmsg := blockMsg{block: block, peer: p} - b.blockQueue <- &bmsg + b.msgChan <- &bmsg } // QueueInv adds the passed inv message and peer to the block handling queue. @@ -508,7 +529,16 @@ func (b *blockManager) QueueInv(inv *btcwire.MsgInv, p *peer) { } imsg := invMsg{inv: inv, peer: p} - b.invQueue <- &imsg + b.msgChan <- &imsg +} + +// DonePeer informs the blockmanager that a peer has disconnected. +func (b *blockManager) DonePeer(p *peer) { + // Ignore if we are shutting down. + if b.shutdown { + return + } + b.msgChan <- &donePeerMsg{peer: p} } // Start begins the core block handler which processes block and inv messages. @@ -519,8 +549,7 @@ func (b *blockManager) Start() { } log.Trace("[BMGR] Starting block manager") - b.wg.Add(4) - go b.syncHandler() + b.wg.Add(3) go b.blockHandler() go b.chainNotificationSinkHandler() go b.chainNotificationHandler() @@ -551,12 +580,9 @@ func newBlockManager(s *server) (*blockManager, error) { server: s, blockChain: btcchain.New(s.db, s.btcnet, chainNotify), blockPeer: make(map[btcwire.ShaHash]*peer), + requestedBlocks: make(map[btcwire.ShaHash]bool), lastBlockLogTime: time.Now(), - newBlocks: make(chan bool, 1), - newCandidates: make(chan *peer, cfg.MaxPeers), - donePeers: make(chan *peer, cfg.MaxPeers), - blockQueue: make(chan *blockMsg, chanBufferSize), - invQueue: make(chan *invMsg, chanBufferSize), + msgChan: make(chan interface{}, cfg.MaxPeers*3), chainNotify: chainNotify, chainNotifySink: make(chan *btcchain.Notification), quit: make(chan bool), diff --git a/peer.go b/peer.go index 4d5c9d68..f6aa3a44 100644 --- a/peer.go +++ b/peer.go @@ -107,6 +107,7 @@ type peer struct { knownAddresses map[string]bool knownInventory *MruInventoryMap knownInvMutex sync.Mutex + requestedBlocks map[btcwire.ShaHash]bool // owned by blockmanager. lastBlock int32 retrycount int64 prevGetBlocksBegin *btcwire.ShaHash @@ -287,7 +288,7 @@ func (p *peer) handleVersionMsg(msg *btcwire.MsgVersion) { } // Signal the block manager this peer is a new sync candidate. - p.server.blockManager.newCandidates <- p + p.server.blockManager.NewPeer(p) // TODO: Relay alerts. } @@ -903,7 +904,7 @@ out: // the peer is done. p.Disconnect() p.server.donePeers <- p - p.server.blockManager.donePeers <- p + p.server.blockManager.DonePeer(p) p.quit <- true p.wg.Done() @@ -1046,6 +1047,7 @@ func newPeerBase(s *server, inbound bool) *peer { inbound: inbound, knownAddresses: make(map[string]bool), knownInventory: NewMruInventoryMap(maxKnownInventory), + requestedBlocks: make(map[btcwire.ShaHash]bool), requestQueue: list.New(), invSendQueue: list.New(), outputQueue: make(chan btcwire.Message, outputBufferSize),