Alter the way peers queue outbound messages somewhat.

Instead of one thread that queues and writes, we move to a two queue
model. The queueHandler muxes all the sources of outgoung packets and
drips them to the actual sender. This is done so that a large send
doesnt' allow the channels to fillup and cause blockmanager and server
to block, which delays other peers.

Most messages we handle as is. However, for getdata we do some manual
limiting and pipelining, we queue up three and then we load the next
into memory, not sending it until the otherp ackets have been sent. We
may want to change this later to queue the packet *then* wait so that we
don't completely drain the pipe.

A few misc tweaks to avoid deadlocking by ensuring the all channels will
always drain. mostly this relates to ensuring that we know no more data
will be coming before we drain the channel, and not queueing after we
are marked to disconnect.

Discussed heavily with drahn@ and davec@.
This commit is contained in:
Owain G. Ainsworth 2013-12-20 13:06:37 +00:00
parent cc9aadf041
commit d3a7f15a87

269
peer.go
View file

@ -130,14 +130,16 @@ type peer struct {
prevGetBlocksBegin *btcwire.ShaHash // owned by blockmanager.
prevGetBlocksStop *btcwire.ShaHash // owned by blockmaanger.
requestQueue *list.List
invSendQueue *list.List
continueHash *btcwire.ShaHash
outputQueue chan outMsg
outputInvChan chan *btcwire.InvVect
txProcessed chan bool
blockProcessed chan bool
quit chan bool
userAgent string
sendQueue chan outMsg
sendDoneQueue chan bool
queueWg sync.WaitGroup // TODO(oga) wg -> single use channel?
outputInvChan chan *btcwire.InvVect
txProcessed chan bool
blockProcessed chan bool
quit chan bool
userAgent string
}
// String returns the peer's address and directionality as a human-readable
@ -322,7 +324,7 @@ func (p *peer) handleVersionMsg(msg *btcwire.MsgVersion) {
// pushTxMsg sends a tx message for the provided transaction hash to the
// connected peer. An error is returned if the transaction hash is not known.
func (p *peer) pushTxMsg(sha *btcwire.ShaHash, doneChan chan bool) error {
func (p *peer) pushTxMsg(sha *btcwire.ShaHash, doneChan, waitChan chan bool) error {
// Attempt to fetch the requested transaction from the pool. A
// call could be made to check for existence first, but simply trying
// to fetch a missing transaction results in the same behavior.
@ -332,6 +334,12 @@ func (p *peer) pushTxMsg(sha *btcwire.ShaHash, doneChan chan bool) error {
"pool: %v", sha, err)
return err
}
// Once we have fetched data wait for any previous operation to finish.
if waitChan != nil {
<-waitChan
}
p.QueueMessage(tx.MsgTx(), doneChan)
return nil
@ -339,21 +347,7 @@ func (p *peer) pushTxMsg(sha *btcwire.ShaHash, doneChan chan bool) error {
// pushBlockMsg sends a block message for the provided block hash to the
// connected peer. An error is returned if the block hash is not known.
func (p *peer) pushBlockMsg(sha *btcwire.ShaHash, doneChan chan bool) error {
// What should this function do about the rate limiting the
// number of blocks queued for this peer?
// Current thought is have a counting mutex in the peer
// such that if > N Tx/Block requests are currently in
// the tx queue, wait until the mutex clears allowing more to be
// sent. This prevents 500 1+MB blocks from being loaded into
// memory and sit around until the output queue drains.
// Actually the outputQueue has a limit of 50 in its queue
// but still 50MB to 1.6GB(50 32MB blocks) just setting
// in memory waiting to be sent is pointless.
// I would recommend a getdata request limit of about 5
// outstanding objects.
// Should the tx complete api be a mutex or channel?
func (p *peer) pushBlockMsg(sha *btcwire.ShaHash, doneChan, waitChan chan bool) error {
blk, err := p.server.db.FetchBlockBySha(sha)
if err != nil {
peerLog.Tracef("Unable to fetch requested block sha %v: %v",
@ -361,6 +355,11 @@ func (p *peer) pushBlockMsg(sha *btcwire.ShaHash, doneChan chan bool) error {
return err
}
// Once we have fetched data wait for any previous operation to finish.
if waitChan != nil {
<-waitChan
}
// We only send the channel for this message if we aren't sending
// an inv straight after.
var dc chan bool
@ -570,6 +569,11 @@ func (p *peer) handleHeadersMsg(msg *btcwire.MsgHeaders) {
func (p *peer) handleGetDataMsg(msg *btcwire.MsgGetData) {
notFound := btcwire.NewMsgNotFound()
// We wait on the this wait channel periodically to prevent queueing
// far more data than we can send in a reasonable time, wasting memory.
// The waiting occurs after the database fetch for the next one to
// provide a little pipelining.
var waitChan chan bool
doneChan := make(chan bool)
out:
for i, iv := range msg.InvList {
@ -577,13 +581,16 @@ out:
// If this will be the last message we send.
if i == len(msg.InvList)-1 && len(notFound.InvList) == 0 {
c = doneChan
} else if i > 0 && i+1%3 == 0 {
// buffered so as to not make the send goroutine block.
c = make(chan bool, 1)
}
var err error
switch iv.Type {
case btcwire.InvTypeTx:
err = p.pushTxMsg(&iv.Hash, c)
err = p.pushTxMsg(&iv.Hash, c, waitChan)
case btcwire.InvTypeBlock:
err = p.pushBlockMsg(&iv.Hash, c)
err = p.pushBlockMsg(&iv.Hash, c, waitChan)
default:
peerLog.Warnf("Unknown type in inventory request %d",
iv.Type)
@ -592,6 +599,7 @@ out:
if err != nil {
notFound.AddInvVect(iv)
}
waitChan = c
}
if len(notFound.InvList) != 0 {
p.QueueMessage(notFound, doneChan)
@ -1117,6 +1125,7 @@ out:
// Ensure connection is closed and notify server and block manager that
// the peer is done.
p.Disconnect()
p.server.donePeers <- p
// Only tell blockmanager we are gone if we ever told it we existed.
if p.versionKnown {
@ -1126,11 +1135,136 @@ out:
peerLog.Tracef("Peer input handler done for %s", p.addr)
}
// queueHandler handles the queueing of outgoing data for the peer. This runs
// as a muxer for various sources of input so we can ensure that blockmanager
// and the server goroutine both will not block on us sending a message.
// We then pass the data on to outHandler to be actually written.
func (p *peer) queueHandler() {
pendingMsgs := list.New()
invSendQueue := list.New()
trickleTicker := time.NewTicker(time.Second * 10)
// We keep the waiting flag so that we know if we have a message queued
// to the outHandler or not. We could use the presence of a head
// of the list for this but then we have rather racy concerns about
// whether it has gotten it at cleanup time - and thus who sends on the
// message's done channel. To avoid such confusion we keep a different
// flag and pendingMsgs only contains messages that we have not yet
// passed to outHandler.
waiting := false
// To avoid duplication below.
queuePacket := func(msg outMsg, list *list.List, waiting bool) bool {
if !waiting {
peerLog.Tracef("%s: sending to outHandler", p)
p.sendQueue <- msg
peerLog.Tracef("%s: sent to outHandler", p)
} else {
list.PushBack(msg)
}
// we are always waiting now.
return true
}
out:
for {
select {
case msg := <-p.outputQueue:
waiting = queuePacket(msg, pendingMsgs, waiting)
case <-p.sendDoneQueue:
peerLog.Tracef("%s: acked by outhandler", p)
// one message on sendChan means one message on
// txDoneChan when it has been sent.
next := pendingMsgs.Front()
if next != nil {
val := pendingMsgs.Remove(next)
peerLog.Tracef("%s: sending to outHandler", p)
p.sendQueue <- val.(outMsg)
peerLog.Tracef("%s: sent to outHandler", p)
} else {
waiting = false
}
case iv := <-p.outputInvChan:
// No handshake? They'll find out soon enough.
if p.versionKnown {
invSendQueue.PushBack(iv)
}
case <-trickleTicker.C:
// Don't send anything if we're disconnecting or there
// is no queued inventory.
// version is known if send queue has any entries.
if atomic.LoadInt32(&p.disconnect) != 0 ||
invSendQueue.Len() == 0 {
continue
}
// Create and send as many inv messages as needed to
// drain the inventory send queue.
invMsg := btcwire.NewMsgInv()
for e := invSendQueue.Front(); e != nil; e = invSendQueue.Front() {
iv := invSendQueue.Remove(e).(*btcwire.InvVect)
// Don't send inventory that became known after
// the initial check.
if p.isKnownInventory(iv) {
continue
}
invMsg.AddInvVect(iv)
if len(invMsg.InvList) >= maxInvTrickleSize {
waiting = queuePacket(
outMsg{msg: invMsg},
pendingMsgs, waiting)
invMsg = btcwire.NewMsgInv()
}
// Add the inventory that is being relayed to
// the known inventory for the peer.
p.addKnownInventory(iv)
}
if len(invMsg.InvList) > 0 {
waiting = queuePacket(outMsg{msg: invMsg},
pendingMsgs, waiting)
}
case <-p.quit:
break out
}
}
// Drain any wait channels before we go away so we don't leave something
// waiting for us.
for e := pendingMsgs.Front(); e != nil; e = pendingMsgs.Front() {
val := pendingMsgs.Remove(e)
msg := val.(outMsg)
if msg.doneChan != nil {
msg.doneChan <- false
}
}
cleanup:
for {
select {
case msg := <-p.outputQueue:
if msg.doneChan != nil {
msg.doneChan <- false
}
case <-p.outputInvChan:
// Just drain channel
// sendDoneQueue is buffered so doesn't need draining.
default:
break cleanup
}
}
p.queueWg.Done()
peerLog.Tracef("Peer queue handler done for %s", p.addr)
}
// outHandler handles all outgoing messages for the peer. It must be run as a
// goroutine. It uses a buffered channel to serialize output messages while
// allowing the sender to continue running asynchronously.
func (p *peer) outHandler() {
trickleTicker := time.NewTicker(time.Second * 10)
pingTimer := time.AfterFunc(pingTimeoutMinutes*time.Minute, func() {
nonce, err := btcwire.RandomUint64()
if err != nil {
@ -1143,7 +1277,7 @@ func (p *peer) outHandler() {
out:
for {
select {
case msg := <-p.outputQueue:
case msg := <-p.sendQueue:
// If the message is one we should get a reply for
// then reset the timer, we only want to send pings
// when otherwise we would not recieve a reply from
@ -1152,6 +1286,7 @@ out:
// the inv is of no interest explicitly solicited invs
// should elicit a reply but we don't track them
// specially.
peerLog.Tracef("%s: recieved from queuehandler", p)
reset := true
switch msg.msg.(type) {
case *btcwire.MsgVersion:
@ -1181,47 +1316,9 @@ out:
if msg.doneChan != nil {
msg.doneChan <- true
}
case iv := <-p.outputInvChan:
// No handshake? They'll find out soon enough.
if p.versionKnown {
p.invSendQueue.PushBack(iv)
}
case <-trickleTicker.C:
// Don't send anything if we're disconnecting or there
// is no queued inventory.
if atomic.LoadInt32(&p.disconnect) != 0 ||
p.invSendQueue.Len() == 0 ||
!p.versionKnown {
continue
}
// Create and send as many inv messages as needed to
// drain the inventory send queue.
invMsg := btcwire.NewMsgInv()
for e := p.invSendQueue.Front(); e != nil; e = p.invSendQueue.Front() {
iv := p.invSendQueue.Remove(e).(*btcwire.InvVect)
// Don't send inventory that became known after
// the initial check.
if p.isKnownInventory(iv) {
continue
}
invMsg.AddInvVect(iv)
if len(invMsg.InvList) >= maxInvTrickleSize {
p.writeMessage(invMsg)
invMsg = btcwire.NewMsgInv()
}
// Add the inventory that is being relayed to
// the known inventory for the peer.
p.addKnownInventory(iv)
}
if len(invMsg.InvList) > 0 {
p.writeMessage(invMsg)
}
peerLog.Tracef("%s: acking queuehandler", p)
p.sendDoneQueue <- true
peerLog.Tracef("%s: acked queuehandler", p)
case <-p.quit:
break out
@ -1230,17 +1327,20 @@ out:
pingTimer.Stop()
p.queueWg.Wait()
// Drain any wait channels before we go away so we don't leave something
// waiting for us.
// waiting for us. We have waited on queueWg and thus we can be sure
// that we will not miss anything sent on sendQueue.
cleanup:
for {
select {
case msg := <-p.outputQueue:
case msg := <-p.sendQueue:
if msg.doneChan != nil {
msg.doneChan <- false
}
case <-p.outputInvChan:
// Just drain channel
// no need to send on sendDoneQueue since queueHandler
// has been waited on and already exited.
default:
break cleanup
}
@ -1252,6 +1352,18 @@ cleanup:
// uses a buffered channel to communicate with the output handler goroutine so
// it is automatically rate limited and safe for concurrent access.
func (p *peer) QueueMessage(msg btcwire.Message, doneChan chan bool) {
// Avoid risk of deadlock if goroutine already exited. The goroutine
// we will be sending to hangs around until it knows for a fact that
// it is marked as disconnected. *then* it drains the channels.
if !p.Connected() {
// avoid deadlock...
if doneChan != nil {
go func() {
doneChan <- false
}()
}
return
}
p.outputQueue <- outMsg{msg: msg, doneChan: doneChan}
}
@ -1266,6 +1378,13 @@ func (p *peer) QueueInventory(invVect *btcwire.InvVect) {
return
}
// Avoid risk of deadlock if goroutine already exited. The goroutine
// we will be sending to hangs around until it knows for a fact that
// it is marked as disconnected. *then* it drains the channels.
if !p.Connected() {
return
}
p.outputInvChan <- invVect
}
@ -1282,6 +1401,7 @@ func (p *peer) Disconnect() {
if atomic.AddInt32(&p.disconnect, 1) != 1 {
return
}
peerLog.Tracef("disconnecting %s", p.addr)
close(p.quit)
if atomic.LoadInt32(&p.connected) != 0 {
p.conn.Close()
@ -1311,6 +1431,10 @@ func (p *peer) Start() error {
// Start processing input and output.
go p.inHandler()
// queueWg is kept so that outHandler knows when the queue has exited so
// it can drain correctly.
p.queueWg.Add(1)
go p.queueHandler()
go p.outHandler()
return nil
@ -1337,8 +1461,9 @@ func newPeerBase(s *server, inbound bool) *peer {
requestedTxns: make(map[btcwire.ShaHash]bool),
requestedBlocks: make(map[btcwire.ShaHash]bool),
requestQueue: list.New(),
invSendQueue: list.New(),
outputQueue: make(chan outMsg, outputBufferSize),
sendQueue: make(chan outMsg, 1), // nonblocking sync.
sendDoneQueue: make(chan bool, 1), // nonblocking sync.
outputInvChan: make(chan *btcwire.InvVect, outputBufferSize),
txProcessed: make(chan bool, 1),
blockProcessed: make(chan bool, 1),