Keep track of currently requested blocks per peer.

Use this information so that we do not request a block per peer we got
an inv for it, makes multi peer much quieter and rather more bandwidth
efficient.

In order to remove a number of possible races we combine blockhandling
an synchandler and use one channel for all messages. This ensures that
all messages from a single peer will be recieved in order.  It also
removes the need for a lot of locking between the peer removal code and
the block/inv handlers.
This commit is contained in:
Owain G. Ainsworth 2013-09-30 23:53:21 +01:00
parent ca7cb8c875
commit 65725189db
2 changed files with 79 additions and 51 deletions

View file

@ -25,6 +25,11 @@ const (
blockDbNamePrefix = "blocks" blockDbNamePrefix = "blocks"
) )
// newPeerMsg signifies a newly connected peer to the block handler.
type newPeerMsg struct {
peer *peer
}
// blockMsg packages a bitcoin block message and the peer it came from together // blockMsg packages a bitcoin block message and the peer it came from together
// so the block handler has access to that information. // so the block handler has access to that information.
type blockMsg struct { type blockMsg struct {
@ -39,6 +44,11 @@ type invMsg struct {
peer *peer peer *peer
} }
// donePeerMsg signifies a newly disconnected peer to the block handler.
type donePeerMsg struct {
peer *peer
}
// txMsg packages a bitcoin tx message and the peer it came from together // txMsg packages a bitcoin tx message and the peer it came from together
// so the block handler has access to that information. // so the block handler has access to that information.
type txMsg struct { type txMsg struct {
@ -54,17 +64,14 @@ type blockManager struct {
shutdown bool shutdown bool
blockChain *btcchain.BlockChain blockChain *btcchain.BlockChain
blockPeer map[btcwire.ShaHash]*peer blockPeer map[btcwire.ShaHash]*peer
requestedBlocks map[btcwire.ShaHash]bool
blockPeerMutex sync.Mutex blockPeerMutex sync.Mutex
receivedLogBlocks int64 receivedLogBlocks int64
receivedLogTx int64 receivedLogTx int64
lastBlockLogTime time.Time lastBlockLogTime time.Time
processingReqs bool processingReqs bool
syncPeer *peer syncPeer *peer
newBlocks chan bool msgChan chan interface{}
newCandidates chan *peer
donePeers chan *peer
blockQueue chan *blockMsg
invQueue chan *invMsg
chainNotify chan *btcchain.Notification chainNotify chan *btcchain.Notification
chainNotifySink chan *btcchain.Notification chainNotifySink chan *btcchain.Notification
wg sync.WaitGroup wg sync.WaitGroup
@ -120,10 +127,10 @@ func (b *blockManager) startSync(peers *list.List) {
} }
} }
// handleNewCandidateMsg deals with new peers that have signalled they may // handleNewPeerMsg deals with new peers that have signalled they may
// be considered as a sync peer (they have already successfully negotiated). It // be considered as a sync peer (they have already successfully negotiated). It
// also starts syncing if needed. It is invoked from the syncHandler goroutine. // also starts syncing if needed. It is invoked from the syncHandler goroutine.
func (b *blockManager) handleNewCandidateMsg(peers *list.List, p *peer) { func (b *blockManager) handleNewPeerMsg(peers *list.List, p *peer) {
// Ignore if in the process of shutting down. // Ignore if in the process of shutting down.
if b.shutdown { if b.shutdown {
return return
@ -160,29 +167,14 @@ func (b *blockManager) handleDonePeerMsg(peers *list.List, p *peer) {
b.syncPeer = nil b.syncPeer = nil
b.startSync(peers) b.startSync(peers)
} }
}
// syncHandler deals with handling downloading (syncing) the block chain from // remove requested blocks from the global map so that they will be
// other peers as they connect and disconnect. It must be run as a goroutine. // fetched from elsewher next time we get an inv.
func (b *blockManager) syncHandler() { // TODO(oga) we could possibly here check which peers have these blocks
log.Tracef("[BMGR] Starting sync handler") // and request them now to speed things up a little.
candidatePeers := list.New() for k := range p.requestedBlocks {
out: delete(b.requestedBlocks, k)
// Live while we're not shutting down.
for !b.shutdown {
select {
case peer := <-b.newCandidates:
b.handleNewCandidateMsg(candidatePeers, peer)
case peer := <-b.donePeers:
b.handleDonePeerMsg(candidatePeers, peer)
case <-b.quit:
break out
}
} }
b.wg.Done()
log.Trace("[BMGR] Sync handler done")
} }
// logBlockHeight logs a new block height as an information message to show // logBlockHeight logs a new block height as an information message to show
@ -228,6 +220,13 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) {
// Process the block to include validation, best chain selection, orphan // Process the block to include validation, best chain selection, orphan
// handling, etc. // handling, etc.
err := b.blockChain.ProcessBlock(bmsg.block) err := b.blockChain.ProcessBlock(bmsg.block)
// Remove block from request maps. Either chain knows about it and such
// we shouldn't have any more instances of trying to fetch it, or we
// failed to insert and thus we'll retry next time we get an inv.
delete(bmsg.peer.requestedBlocks, *blockSha)
delete(b.requestedBlocks, *blockSha)
if err != nil { if err != nil {
b.blockPeerMutex.Lock() b.blockPeerMutex.Lock()
delete(b.blockPeer, *blockSha) delete(b.blockPeer, *blockSha)
@ -343,13 +342,16 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) {
gdmsg := btcwire.NewMsgGetData() gdmsg := btcwire.NewMsgGetData()
for e := imsg.peer.requestQueue.Front(); e != nil; e = imsg.peer.requestQueue.Front() { for e := imsg.peer.requestQueue.Front(); e != nil; e = imsg.peer.requestQueue.Front() {
iv := e.Value.(*btcwire.InvVect) iv := e.Value.(*btcwire.InvVect)
gdmsg.AddInvVect(iv)
imsg.peer.requestQueue.Remove(e) imsg.peer.requestQueue.Remove(e)
// check that no one else has asked for this // check that no one else has asked for this. if so we don't
// put on global ``requested'' map // need to ask.
// put on local ``requested'' map if _, exists := b.requestedBlocks[iv.Hash]; !exists {
b.requestedBlocks[iv.Hash] = true
imsg.peer.requestedBlocks[iv.Hash] = true
gdmsg.AddInvVect(iv)
numRequested++
}
numRequested++
if numRequested >= btcwire.MaxInvPerMsg { if numRequested >= btcwire.MaxInvPerMsg {
break break
} }
@ -369,18 +371,28 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) {
// NOTE: Tx messages need to be handled here too. // NOTE: Tx messages need to be handled here too.
// (either that or block and tx need to be handled in separate threads) // (either that or block and tx need to be handled in separate threads)
func (b *blockManager) blockHandler() { func (b *blockManager) blockHandler() {
candidatePeers := list.New()
out: out:
for !b.shutdown { for !b.shutdown {
select { select {
// Handle new block messages. case m := <-b.msgChan:
case bmsg := <-b.blockQueue: switch msg := m.(type) {
b.handleBlockMsg(bmsg) case *newPeerMsg:
bmsg.peer.blockProcessed <- true b.handleNewPeerMsg(candidatePeers, msg.peer)
// Handle new inventory messages. case *blockMsg:
case imsg := <-b.invQueue: b.handleBlockMsg(msg)
b.handleInvMsg(imsg) msg.peer.blockProcessed <- true
case *invMsg:
b.handleInvMsg(msg)
case *donePeerMsg:
b.handleDonePeerMsg(candidatePeers, msg.peer)
default:
// bitch and whine.
}
case <-b.quit: case <-b.quit:
break out break out
} }
@ -487,6 +499,15 @@ out:
log.Trace("[BMGR] Chain notification handler done") log.Trace("[BMGR] Chain notification handler done")
} }
// NewPeer informs the blockmanager of a newly active peer.
func (b *blockManager) NewPeer(p *peer) {
// Ignore if we are shutting down.
if b.shutdown {
return
}
b.msgChan <- &newPeerMsg{peer: p}
}
// QueueBlock adds the passed block message and peer to the block handling queue. // QueueBlock adds the passed block message and peer to the block handling queue.
func (b *blockManager) QueueBlock(block *btcutil.Block, p *peer) { func (b *blockManager) QueueBlock(block *btcutil.Block, p *peer) {
// Don't accept more blocks if we're shutting down. // Don't accept more blocks if we're shutting down.
@ -496,7 +517,7 @@ func (b *blockManager) QueueBlock(block *btcutil.Block, p *peer) {
} }
bmsg := blockMsg{block: block, peer: p} bmsg := blockMsg{block: block, peer: p}
b.blockQueue <- &bmsg b.msgChan <- &bmsg
} }
// QueueInv adds the passed inv message and peer to the block handling queue. // QueueInv adds the passed inv message and peer to the block handling queue.
@ -508,7 +529,16 @@ func (b *blockManager) QueueInv(inv *btcwire.MsgInv, p *peer) {
} }
imsg := invMsg{inv: inv, peer: p} imsg := invMsg{inv: inv, peer: p}
b.invQueue <- &imsg b.msgChan <- &imsg
}
// DonePeer informs the blockmanager that a peer has disconnected.
func (b *blockManager) DonePeer(p *peer) {
// Ignore if we are shutting down.
if b.shutdown {
return
}
b.msgChan <- &donePeerMsg{peer: p}
} }
// Start begins the core block handler which processes block and inv messages. // Start begins the core block handler which processes block and inv messages.
@ -519,8 +549,7 @@ func (b *blockManager) Start() {
} }
log.Trace("[BMGR] Starting block manager") log.Trace("[BMGR] Starting block manager")
b.wg.Add(4) b.wg.Add(3)
go b.syncHandler()
go b.blockHandler() go b.blockHandler()
go b.chainNotificationSinkHandler() go b.chainNotificationSinkHandler()
go b.chainNotificationHandler() go b.chainNotificationHandler()
@ -551,12 +580,9 @@ func newBlockManager(s *server) (*blockManager, error) {
server: s, server: s,
blockChain: btcchain.New(s.db, s.btcnet, chainNotify), blockChain: btcchain.New(s.db, s.btcnet, chainNotify),
blockPeer: make(map[btcwire.ShaHash]*peer), blockPeer: make(map[btcwire.ShaHash]*peer),
requestedBlocks: make(map[btcwire.ShaHash]bool),
lastBlockLogTime: time.Now(), lastBlockLogTime: time.Now(),
newBlocks: make(chan bool, 1), msgChan: make(chan interface{}, cfg.MaxPeers*3),
newCandidates: make(chan *peer, cfg.MaxPeers),
donePeers: make(chan *peer, cfg.MaxPeers),
blockQueue: make(chan *blockMsg, chanBufferSize),
invQueue: make(chan *invMsg, chanBufferSize),
chainNotify: chainNotify, chainNotify: chainNotify,
chainNotifySink: make(chan *btcchain.Notification), chainNotifySink: make(chan *btcchain.Notification),
quit: make(chan bool), quit: make(chan bool),

View file

@ -107,6 +107,7 @@ type peer struct {
knownAddresses map[string]bool knownAddresses map[string]bool
knownInventory *MruInventoryMap knownInventory *MruInventoryMap
knownInvMutex sync.Mutex knownInvMutex sync.Mutex
requestedBlocks map[btcwire.ShaHash]bool // owned by blockmanager.
lastBlock int32 lastBlock int32
retrycount int64 retrycount int64
prevGetBlocksBegin *btcwire.ShaHash prevGetBlocksBegin *btcwire.ShaHash
@ -287,7 +288,7 @@ func (p *peer) handleVersionMsg(msg *btcwire.MsgVersion) {
} }
// Signal the block manager this peer is a new sync candidate. // Signal the block manager this peer is a new sync candidate.
p.server.blockManager.newCandidates <- p p.server.blockManager.NewPeer(p)
// TODO: Relay alerts. // TODO: Relay alerts.
} }
@ -903,7 +904,7 @@ out:
// the peer is done. // the peer is done.
p.Disconnect() p.Disconnect()
p.server.donePeers <- p p.server.donePeers <- p
p.server.blockManager.donePeers <- p p.server.blockManager.DonePeer(p)
p.quit <- true p.quit <- true
p.wg.Done() p.wg.Done()
@ -1046,6 +1047,7 @@ func newPeerBase(s *server, inbound bool) *peer {
inbound: inbound, inbound: inbound,
knownAddresses: make(map[string]bool), knownAddresses: make(map[string]bool),
knownInventory: NewMruInventoryMap(maxKnownInventory), knownInventory: NewMruInventoryMap(maxKnownInventory),
requestedBlocks: make(map[btcwire.ShaHash]bool),
requestQueue: list.New(), requestQueue: list.New(),
invSendQueue: list.New(), invSendQueue: list.New(),
outputQueue: make(chan btcwire.Message, outputBufferSize), outputQueue: make(chan btcwire.Message, outputBufferSize),