Use chan struct{} for throttling related chans.

The done and wait channels used to throttle outgoing data are being used
as semaphores.  As mentioned in the previous commit, it's more efficient
to use a 0-byte type and allow compiler optimizations for the specific use
case.
This commit is contained in:
Dave Collins 2014-07-10 22:40:53 -05:00
parent 83cffc5d27
commit a261436e1b

48
peer.go
View file

@ -114,10 +114,12 @@ func newNetAddress(addr net.Addr, services btcwire.ServiceFlag) (*btcwire.NetAdd
return na, nil return na, nil
} }
// TODO(davec): Rename and comment this // outMsg is used to house a message to be sent along with a channel to signal
// when the message has been sent (or won't be sent due to tings such as
// shutdown)
type outMsg struct { type outMsg struct {
msg btcwire.Message msg btcwire.Message
doneChan chan bool doneChan chan struct{}
} }
// peer provides a bitcoin peer for handling bitcoin communications. The // peer provides a bitcoin peer for handling bitcoin communications. The
@ -426,7 +428,7 @@ func (p *peer) handleVersionMsg(msg *btcwire.MsgVersion) {
// pushTxMsg sends a tx message for the provided transaction hash to the // pushTxMsg sends a tx message for the provided transaction hash to the
// connected peer. An error is returned if the transaction hash is not known. // connected peer. An error is returned if the transaction hash is not known.
func (p *peer) pushTxMsg(sha *btcwire.ShaHash, doneChan, waitChan chan bool) error { func (p *peer) pushTxMsg(sha *btcwire.ShaHash, doneChan, waitChan chan struct{}) error {
// Attempt to fetch the requested transaction from the pool. A // Attempt to fetch the requested transaction from the pool. A
// call could be made to check for existence first, but simply trying // call could be made to check for existence first, but simply trying
// to fetch a missing transaction results in the same behavior. // to fetch a missing transaction results in the same behavior.
@ -436,7 +438,7 @@ func (p *peer) pushTxMsg(sha *btcwire.ShaHash, doneChan, waitChan chan bool) err
"pool: %v", sha, err) "pool: %v", sha, err)
if doneChan != nil { if doneChan != nil {
doneChan <- false doneChan <- struct{}{}
} }
return err return err
} }
@ -453,14 +455,14 @@ func (p *peer) pushTxMsg(sha *btcwire.ShaHash, doneChan, waitChan chan bool) err
// pushBlockMsg sends a block message for the provided block hash to the // pushBlockMsg sends a block message for the provided block hash to the
// connected peer. An error is returned if the block hash is not known. // connected peer. An error is returned if the block hash is not known.
func (p *peer) pushBlockMsg(sha *btcwire.ShaHash, doneChan, waitChan chan bool) error { func (p *peer) pushBlockMsg(sha *btcwire.ShaHash, doneChan, waitChan chan struct{}) error {
blk, err := p.server.db.FetchBlockBySha(sha) blk, err := p.server.db.FetchBlockBySha(sha)
if err != nil { if err != nil {
peerLog.Tracef("Unable to fetch requested block sha %v: %v", peerLog.Tracef("Unable to fetch requested block sha %v: %v",
sha, err) sha, err)
if doneChan != nil { if doneChan != nil {
doneChan <- false doneChan <- struct{}{}
} }
return err return err
} }
@ -472,7 +474,7 @@ func (p *peer) pushBlockMsg(sha *btcwire.ShaHash, doneChan, waitChan chan bool)
// We only send the channel for this message if we aren't sending // We only send the channel for this message if we aren't sending
// an inv straight after. // an inv straight after.
var dc chan bool var dc chan struct{}
sendInv := p.continueHash != nil && p.continueHash.IsEqual(sha) sendInv := p.continueHash != nil && p.continueHash.IsEqual(sha)
if !sendInv { if !sendInv {
dc = doneChan dc = doneChan
@ -493,7 +495,7 @@ func (p *peer) pushBlockMsg(sha *btcwire.ShaHash, doneChan, waitChan chan bool)
p.QueueMessage(invMsg, doneChan) p.QueueMessage(invMsg, doneChan)
p.continueHash = nil p.continueHash = nil
} else if doneChan != nil { } else if doneChan != nil {
doneChan <- false doneChan <- struct{}{}
} }
} }
return nil return nil
@ -503,11 +505,11 @@ func (p *peer) pushBlockMsg(sha *btcwire.ShaHash, doneChan, waitChan chan bool)
// the connected peer. Since a merkle block requires the peer to have a filter // the connected peer. Since a merkle block requires the peer to have a filter
// loaded, this call will simply be ignored if there is no filter laoded. An // loaded, this call will simply be ignored if there is no filter laoded. An
// error is returned if the block hash is not known. // error is returned if the block hash is not known.
func (p *peer) pushMerkleBlockMsg(sha *btcwire.ShaHash, doneChan, waitChan chan bool) error { func (p *peer) pushMerkleBlockMsg(sha *btcwire.ShaHash, doneChan, waitChan chan struct{}) error {
// Do not send a response if the peer doesn't have a filter loaded. // Do not send a response if the peer doesn't have a filter loaded.
if !p.filter.IsLoaded() { if !p.filter.IsLoaded() {
if doneChan != nil { if doneChan != nil {
doneChan <- false doneChan <- struct{}{}
} }
return nil return nil
} }
@ -518,7 +520,7 @@ func (p *peer) pushMerkleBlockMsg(sha *btcwire.ShaHash, doneChan, waitChan chan
sha, err) sha, err)
if doneChan != nil { if doneChan != nil {
doneChan <- false doneChan <- struct{}{}
} }
return err return err
} }
@ -554,7 +556,7 @@ func (p *peer) pushMerkleBlockMsg(sha *btcwire.ShaHash, doneChan, waitChan chan
// Send the merkleblock. Only send the done channel with this message // Send the merkleblock. Only send the done channel with this message
// if no transactions will be sent afterwards. // if no transactions will be sent afterwards.
var dc chan bool var dc chan struct{}
if finalValidTxIndex == -1 { if finalValidTxIndex == -1 {
dc = doneChan dc = doneChan
} }
@ -563,7 +565,7 @@ func (p *peer) pushMerkleBlockMsg(sha *btcwire.ShaHash, doneChan, waitChan chan
// Finally, send any matched transactions. // Finally, send any matched transactions.
for i, txR := range txList { for i, txR := range txList {
// Only send the done channel on the final transaction. // Only send the done channel on the final transaction.
var dc chan bool var dc chan struct{}
if i == finalValidTxIndex { if i == finalValidTxIndex {
dc = doneChan dc = doneChan
} }
@ -766,17 +768,17 @@ func (p *peer) handleGetDataMsg(msg *btcwire.MsgGetData) {
// far more data than we can send in a reasonable time, wasting memory. // far more data than we can send in a reasonable time, wasting memory.
// The waiting occurs after the database fetch for the next one to // The waiting occurs after the database fetch for the next one to
// provide a little pipelining. // provide a little pipelining.
var waitChan chan bool var waitChan chan struct{}
doneChan := make(chan bool, 1) doneChan := make(chan struct{}, 1)
for i, iv := range msg.InvList { for i, iv := range msg.InvList {
var c chan bool var c chan struct{}
// If this will be the last message we send. // If this will be the last message we send.
if i == len(msg.InvList)-1 && len(notFound.InvList) == 0 { if i == len(msg.InvList)-1 && len(notFound.InvList) == 0 {
c = doneChan c = doneChan
} else if (i+1)%3 == 0 { } else if (i+1)%3 == 0 {
// Buffered so as to not make the send goroutine block. // Buffered so as to not make the send goroutine block.
c = make(chan bool, 1) c = make(chan struct{}, 1)
} }
var err error var err error
switch iv.Type { switch iv.Type {
@ -1561,7 +1563,7 @@ out:
val := pendingMsgs.Remove(e) val := pendingMsgs.Remove(e)
msg := val.(outMsg) msg := val.(outMsg)
if msg.doneChan != nil { if msg.doneChan != nil {
msg.doneChan <- false msg.doneChan <- struct{}{}
} }
} }
cleanup: cleanup:
@ -1569,7 +1571,7 @@ cleanup:
select { select {
case msg := <-p.outputQueue: case msg := <-p.outputQueue:
if msg.doneChan != nil { if msg.doneChan != nil {
msg.doneChan <- false msg.doneChan <- struct{}{}
} }
case <-p.outputInvChan: case <-p.outputInvChan:
// Just drain channel // Just drain channel
@ -1644,7 +1646,7 @@ out:
p.lastSend = time.Now() p.lastSend = time.Now()
p.StatsMtx.Unlock() p.StatsMtx.Unlock()
if msg.doneChan != nil { if msg.doneChan != nil {
msg.doneChan <- true msg.doneChan <- struct{}{}
} }
peerLog.Tracef("%s: acking queuehandler", p) peerLog.Tracef("%s: acking queuehandler", p)
p.sendDoneQueue <- struct{}{} p.sendDoneQueue <- struct{}{}
@ -1667,7 +1669,7 @@ cleanup:
select { select {
case msg := <-p.sendQueue: case msg := <-p.sendQueue:
if msg.doneChan != nil { if msg.doneChan != nil {
msg.doneChan <- false msg.doneChan <- struct{}{}
} }
// no need to send on sendDoneQueue since queueHandler // no need to send on sendDoneQueue since queueHandler
// has been waited on and already exited. // has been waited on and already exited.
@ -1681,7 +1683,7 @@ cleanup:
// QueueMessage adds the passed bitcoin message to the peer send queue. It // QueueMessage adds the passed bitcoin message to the peer send queue. It
// uses a buffered channel to communicate with the output handler goroutine so // uses a buffered channel to communicate with the output handler goroutine so
// it is automatically rate limited and safe for concurrent access. // it is automatically rate limited and safe for concurrent access.
func (p *peer) QueueMessage(msg btcwire.Message, doneChan chan bool) { func (p *peer) QueueMessage(msg btcwire.Message, doneChan chan struct{}) {
// Avoid risk of deadlock if goroutine already exited. The goroutine // Avoid risk of deadlock if goroutine already exited. The goroutine
// we will be sending to hangs around until it knows for a fact that // we will be sending to hangs around until it knows for a fact that
// it is marked as disconnected. *then* it drains the channels. // it is marked as disconnected. *then* it drains the channels.
@ -1689,7 +1691,7 @@ func (p *peer) QueueMessage(msg btcwire.Message, doneChan chan bool) {
// avoid deadlock... // avoid deadlock...
if doneChan != nil { if doneChan != nil {
go func() { go func() {
doneChan <- false doneChan <- struct{}{}
}() }()
} }
return return