Make it possible to get a notification when a message has been sent.

If a channel is passed in then true will be sent on the provided channel
after the message has successfully sent.
This commit is contained in:
Owain G. Ainsworth 2013-10-11 18:30:14 +01:00
parent 178d9dc7b0
commit 2231456c23

30
peer.go
View file

@ -89,6 +89,11 @@ func newNetAddress(addr net.Addr, services btcwire.ServiceFlag) (*btcwire.NetAdd
return na, nil return na, nil
} }
type outMsg struct {
msg btcwire.Message
doneChan chan bool
}
// peer provides a bitcoin peer for handling bitcoin communications. // peer provides a bitcoin peer for handling bitcoin communications.
type peer struct { type peer struct {
server *server server *server
@ -118,7 +123,7 @@ type peer struct {
requestQueue *list.List requestQueue *list.List
invSendQueue *list.List invSendQueue *list.List
continueHash *btcwire.ShaHash continueHash *btcwire.ShaHash
outputQueue chan btcwire.Message outputQueue chan outMsg
outputInvChan chan *btcwire.InvVect outputInvChan chan *btcwire.InvVect
txProcessed chan bool txProcessed chan bool
blockProcessed chan bool blockProcessed chan bool
@ -1020,7 +1025,10 @@ out:
for { for {
select { select {
case msg := <-p.outputQueue: case msg := <-p.outputQueue:
p.writeMessage(msg) p.writeMessage(msg.msg)
if msg.doneChan != nil {
msg.doneChan <- true
}
case iv := <-p.outputInvChan: case iv := <-p.outputInvChan:
// No handshake? They'll find out soon enough. // No handshake? They'll find out soon enough.
@ -1067,6 +1075,20 @@ out:
break out break out
} }
} }
// Drain any wait channels before we go away so we don't leave something
// waiting for us.
cleanup:
for {
select {
case msg := <-p.outputQueue:
if msg.doneChan != nil {
msg.doneChan <- false
}
default:
break cleanup
}
}
log.Tracef("PEER: Peer output handler done for %s", p.addr) log.Tracef("PEER: Peer output handler done for %s", p.addr)
} }
@ -1074,7 +1096,7 @@ out:
// uses a buffered channel to communicate with the output handler goroutine so // uses a buffered channel to communicate with the output handler goroutine so
// it is automatically rate limited and safe for concurrent access. // it is automatically rate limited and safe for concurrent access.
func (p *peer) QueueMessage(msg btcwire.Message) { func (p *peer) QueueMessage(msg btcwire.Message) {
p.outputQueue <- msg p.outputQueue <- outMsg{msg: msg}
} }
// QueueInventory adds the passed inventory to the inventory send queue which // QueueInventory adds the passed inventory to the inventory send queue which
@ -1161,7 +1183,7 @@ func newPeerBase(s *server, inbound bool) *peer {
requestedBlocks: make(map[btcwire.ShaHash]bool), requestedBlocks: make(map[btcwire.ShaHash]bool),
requestQueue: list.New(), requestQueue: list.New(),
invSendQueue: list.New(), invSendQueue: list.New(),
outputQueue: make(chan btcwire.Message, outputBufferSize), outputQueue: make(chan outMsg, outputBufferSize),
outputInvChan: make(chan *btcwire.InvVect, outputBufferSize), outputInvChan: make(chan *btcwire.InvVect, outputBufferSize),
txProcessed: make(chan bool, 1), txProcessed: make(chan bool, 1),
blockProcessed: make(chan bool, 1), blockProcessed: make(chan bool, 1),