diff --git a/peer.go b/peer.go index a4cab8d2..4f04c0d3 100644 --- a/peer.go +++ b/peer.go @@ -89,6 +89,11 @@ func newNetAddress(addr net.Addr, services btcwire.ServiceFlag) (*btcwire.NetAdd return na, nil } +type outMsg struct { + msg btcwire.Message + doneChan chan bool +} + // peer provides a bitcoin peer for handling bitcoin communications. type peer struct { server *server @@ -118,7 +123,7 @@ type peer struct { requestQueue *list.List invSendQueue *list.List continueHash *btcwire.ShaHash - outputQueue chan btcwire.Message + outputQueue chan outMsg outputInvChan chan *btcwire.InvVect txProcessed chan bool blockProcessed chan bool @@ -1020,7 +1025,10 @@ out: for { select { case msg := <-p.outputQueue: - p.writeMessage(msg) + p.writeMessage(msg.msg) + if msg.doneChan != nil { + msg.doneChan <- true + } case iv := <-p.outputInvChan: // No handshake? They'll find out soon enough. @@ -1067,6 +1075,20 @@ out: break out } } + + // Drain any wait channels before we go away so we don't leave something + // waiting for us. +cleanup: + for { + select { + case msg := <-p.outputQueue: + if msg.doneChan != nil { + msg.doneChan <- false + } + default: + break cleanup + } + } log.Tracef("PEER: Peer output handler done for %s", p.addr) } @@ -1074,7 +1096,7 @@ out: // uses a buffered channel to communicate with the output handler goroutine so // it is automatically rate limited and safe for concurrent access. func (p *peer) QueueMessage(msg btcwire.Message) { - p.outputQueue <- msg + p.outputQueue <- outMsg{msg: msg} } // QueueInventory adds the passed inventory to the inventory send queue which @@ -1161,7 +1183,7 @@ func newPeerBase(s *server, inbound bool) *peer { requestedBlocks: make(map[btcwire.ShaHash]bool), requestQueue: list.New(), invSendQueue: list.New(), - outputQueue: make(chan btcwire.Message, outputBufferSize), + outputQueue: make(chan outMsg, outputBufferSize), outputInvChan: make(chan *btcwire.InvVect, outputBufferSize), txProcessed: make(chan bool, 1), blockProcessed: make(chan bool, 1),