From a261436e1b18ed537294dffeec1190097897f304 Mon Sep 17 00:00:00 2001 From: Dave Collins Date: Thu, 10 Jul 2014 22:40:53 -0500 Subject: [PATCH] Use chan struct{} for throttling related chans. The done and wait channels used to throttle outgoing data are being used as semaphores. As mentioned in the previous commit, it's more efficient to use a 0-byte type and allow compiler optimizations for the specific use case. --- peer.go | 48 +++++++++++++++++++++++++----------------------- 1 file changed, 25 insertions(+), 23 deletions(-) diff --git a/peer.go b/peer.go index 895e887e..ea37f9e7 100644 --- a/peer.go +++ b/peer.go @@ -114,10 +114,12 @@ func newNetAddress(addr net.Addr, services btcwire.ServiceFlag) (*btcwire.NetAdd return na, nil } -// TODO(davec): Rename and comment this +// outMsg is used to house a message to be sent along with a channel to signal +// when the message has been sent (or won't be sent due to tings such as +// shutdown) type outMsg struct { msg btcwire.Message - doneChan chan bool + doneChan chan struct{} } // peer provides a bitcoin peer for handling bitcoin communications. The @@ -426,7 +428,7 @@ func (p *peer) handleVersionMsg(msg *btcwire.MsgVersion) { // pushTxMsg sends a tx message for the provided transaction hash to the // connected peer. An error is returned if the transaction hash is not known. -func (p *peer) pushTxMsg(sha *btcwire.ShaHash, doneChan, waitChan chan bool) error { +func (p *peer) pushTxMsg(sha *btcwire.ShaHash, doneChan, waitChan chan struct{}) error { // Attempt to fetch the requested transaction from the pool. A // call could be made to check for existence first, but simply trying // to fetch a missing transaction results in the same behavior. @@ -436,7 +438,7 @@ func (p *peer) pushTxMsg(sha *btcwire.ShaHash, doneChan, waitChan chan bool) err "pool: %v", sha, err) if doneChan != nil { - doneChan <- false + doneChan <- struct{}{} } return err } @@ -453,14 +455,14 @@ func (p *peer) pushTxMsg(sha *btcwire.ShaHash, doneChan, waitChan chan bool) err // pushBlockMsg sends a block message for the provided block hash to the // connected peer. An error is returned if the block hash is not known. -func (p *peer) pushBlockMsg(sha *btcwire.ShaHash, doneChan, waitChan chan bool) error { +func (p *peer) pushBlockMsg(sha *btcwire.ShaHash, doneChan, waitChan chan struct{}) error { blk, err := p.server.db.FetchBlockBySha(sha) if err != nil { peerLog.Tracef("Unable to fetch requested block sha %v: %v", sha, err) if doneChan != nil { - doneChan <- false + doneChan <- struct{}{} } return err } @@ -472,7 +474,7 @@ func (p *peer) pushBlockMsg(sha *btcwire.ShaHash, doneChan, waitChan chan bool) // We only send the channel for this message if we aren't sending // an inv straight after. - var dc chan bool + var dc chan struct{} sendInv := p.continueHash != nil && p.continueHash.IsEqual(sha) if !sendInv { dc = doneChan @@ -493,7 +495,7 @@ func (p *peer) pushBlockMsg(sha *btcwire.ShaHash, doneChan, waitChan chan bool) p.QueueMessage(invMsg, doneChan) p.continueHash = nil } else if doneChan != nil { - doneChan <- false + doneChan <- struct{}{} } } return nil @@ -503,11 +505,11 @@ func (p *peer) pushBlockMsg(sha *btcwire.ShaHash, doneChan, waitChan chan bool) // the connected peer. Since a merkle block requires the peer to have a filter // loaded, this call will simply be ignored if there is no filter laoded. An // error is returned if the block hash is not known. -func (p *peer) pushMerkleBlockMsg(sha *btcwire.ShaHash, doneChan, waitChan chan bool) error { +func (p *peer) pushMerkleBlockMsg(sha *btcwire.ShaHash, doneChan, waitChan chan struct{}) error { // Do not send a response if the peer doesn't have a filter loaded. if !p.filter.IsLoaded() { if doneChan != nil { - doneChan <- false + doneChan <- struct{}{} } return nil } @@ -518,7 +520,7 @@ func (p *peer) pushMerkleBlockMsg(sha *btcwire.ShaHash, doneChan, waitChan chan sha, err) if doneChan != nil { - doneChan <- false + doneChan <- struct{}{} } return err } @@ -554,7 +556,7 @@ func (p *peer) pushMerkleBlockMsg(sha *btcwire.ShaHash, doneChan, waitChan chan // Send the merkleblock. Only send the done channel with this message // if no transactions will be sent afterwards. - var dc chan bool + var dc chan struct{} if finalValidTxIndex == -1 { dc = doneChan } @@ -563,7 +565,7 @@ func (p *peer) pushMerkleBlockMsg(sha *btcwire.ShaHash, doneChan, waitChan chan // Finally, send any matched transactions. for i, txR := range txList { // Only send the done channel on the final transaction. - var dc chan bool + var dc chan struct{} if i == finalValidTxIndex { dc = doneChan } @@ -766,17 +768,17 @@ func (p *peer) handleGetDataMsg(msg *btcwire.MsgGetData) { // far more data than we can send in a reasonable time, wasting memory. // The waiting occurs after the database fetch for the next one to // provide a little pipelining. - var waitChan chan bool - doneChan := make(chan bool, 1) + var waitChan chan struct{} + doneChan := make(chan struct{}, 1) for i, iv := range msg.InvList { - var c chan bool + var c chan struct{} // If this will be the last message we send. if i == len(msg.InvList)-1 && len(notFound.InvList) == 0 { c = doneChan } else if (i+1)%3 == 0 { // Buffered so as to not make the send goroutine block. - c = make(chan bool, 1) + c = make(chan struct{}, 1) } var err error switch iv.Type { @@ -1561,7 +1563,7 @@ out: val := pendingMsgs.Remove(e) msg := val.(outMsg) if msg.doneChan != nil { - msg.doneChan <- false + msg.doneChan <- struct{}{} } } cleanup: @@ -1569,7 +1571,7 @@ cleanup: select { case msg := <-p.outputQueue: if msg.doneChan != nil { - msg.doneChan <- false + msg.doneChan <- struct{}{} } case <-p.outputInvChan: // Just drain channel @@ -1644,7 +1646,7 @@ out: p.lastSend = time.Now() p.StatsMtx.Unlock() if msg.doneChan != nil { - msg.doneChan <- true + msg.doneChan <- struct{}{} } peerLog.Tracef("%s: acking queuehandler", p) p.sendDoneQueue <- struct{}{} @@ -1667,7 +1669,7 @@ cleanup: select { case msg := <-p.sendQueue: if msg.doneChan != nil { - msg.doneChan <- false + msg.doneChan <- struct{}{} } // no need to send on sendDoneQueue since queueHandler // has been waited on and already exited. @@ -1681,7 +1683,7 @@ cleanup: // QueueMessage adds the passed bitcoin message to the peer send queue. It // uses a buffered channel to communicate with the output handler goroutine so // it is automatically rate limited and safe for concurrent access. -func (p *peer) QueueMessage(msg btcwire.Message, doneChan chan bool) { +func (p *peer) QueueMessage(msg btcwire.Message, doneChan chan struct{}) { // Avoid risk of deadlock if goroutine already exited. The goroutine // we will be sending to hangs around until it knows for a fact that // it is marked as disconnected. *then* it drains the channels. @@ -1689,7 +1691,7 @@ func (p *peer) QueueMessage(msg btcwire.Message, doneChan chan bool) { // avoid deadlock... if doneChan != nil { go func() { - doneChan <- false + doneChan <- struct{}{} }() } return