diff --git a/peer.go b/peer.go index bb25bc07..68d8adfd 100644 --- a/peer.go +++ b/peer.go @@ -130,14 +130,16 @@ type peer struct { prevGetBlocksBegin *btcwire.ShaHash // owned by blockmanager. prevGetBlocksStop *btcwire.ShaHash // owned by blockmaanger. requestQueue *list.List - invSendQueue *list.List continueHash *btcwire.ShaHash outputQueue chan outMsg - outputInvChan chan *btcwire.InvVect - txProcessed chan bool - blockProcessed chan bool - quit chan bool - userAgent string + sendQueue chan outMsg + sendDoneQueue chan bool + queueWg sync.WaitGroup // TODO(oga) wg -> single use channel? + outputInvChan chan *btcwire.InvVect + txProcessed chan bool + blockProcessed chan bool + quit chan bool + userAgent string } // String returns the peer's address and directionality as a human-readable @@ -322,7 +324,7 @@ func (p *peer) handleVersionMsg(msg *btcwire.MsgVersion) { // pushTxMsg sends a tx message for the provided transaction hash to the // connected peer. An error is returned if the transaction hash is not known. -func (p *peer) pushTxMsg(sha *btcwire.ShaHash, doneChan chan bool) error { +func (p *peer) pushTxMsg(sha *btcwire.ShaHash, doneChan, waitChan chan bool) error { // Attempt to fetch the requested transaction from the pool. A // call could be made to check for existence first, but simply trying // to fetch a missing transaction results in the same behavior. @@ -332,6 +334,12 @@ func (p *peer) pushTxMsg(sha *btcwire.ShaHash, doneChan chan bool) error { "pool: %v", sha, err) return err } + + // Once we have fetched data wait for any previous operation to finish. + if waitChan != nil { + <-waitChan + } + p.QueueMessage(tx.MsgTx(), doneChan) return nil @@ -339,21 +347,7 @@ func (p *peer) pushTxMsg(sha *btcwire.ShaHash, doneChan chan bool) error { // pushBlockMsg sends a block message for the provided block hash to the // connected peer. An error is returned if the block hash is not known. 
-func (p *peer) pushBlockMsg(sha *btcwire.ShaHash, doneChan chan bool) error { - // What should this function do about the rate limiting the - // number of blocks queued for this peer? - // Current thought is have a counting mutex in the peer - // such that if > N Tx/Block requests are currently in - // the tx queue, wait until the mutex clears allowing more to be - // sent. This prevents 500 1+MB blocks from being loaded into - // memory and sit around until the output queue drains. - // Actually the outputQueue has a limit of 50 in its queue - // but still 50MB to 1.6GB(50 32MB blocks) just setting - // in memory waiting to be sent is pointless. - // I would recommend a getdata request limit of about 5 - // outstanding objects. - // Should the tx complete api be a mutex or channel? - +func (p *peer) pushBlockMsg(sha *btcwire.ShaHash, doneChan, waitChan chan bool) error { blk, err := p.server.db.FetchBlockBySha(sha) if err != nil { peerLog.Tracef("Unable to fetch requested block sha %v: %v", @@ -361,6 +355,11 @@ func (p *peer) pushBlockMsg(sha *btcwire.ShaHash, doneChan chan bool) error { return err } + // Once we have fetched data wait for any previous operation to finish. + if waitChan != nil { + <-waitChan + } + // We only send the channel for this message if we aren't sending // an inv straight after. var dc chan bool @@ -570,6 +569,11 @@ func (p *peer) handleHeadersMsg(msg *btcwire.MsgHeaders) { func (p *peer) handleGetDataMsg(msg *btcwire.MsgGetData) { notFound := btcwire.NewMsgNotFound() + // We wait on the this wait channel periodically to prevent queueing + // far more data than we can send in a reasonable time, wasting memory. + // The waiting occurs after the database fetch for the next one to + // provide a little pipelining. + var waitChan chan bool doneChan := make(chan bool) out: for i, iv := range msg.InvList { @@ -577,13 +581,16 @@ out: // If this will be the last message we send. 
if i == len(msg.InvList)-1 && len(notFound.InvList) == 0 { c = doneChan + } else if i > 0 && (i+1)%3 == 0 { + // buffered so as to not make the send goroutine block. + c = make(chan bool, 1) } var err error switch iv.Type { case btcwire.InvTypeTx: - err = p.pushTxMsg(&iv.Hash, c) + err = p.pushTxMsg(&iv.Hash, c, waitChan) case btcwire.InvTypeBlock: - err = p.pushBlockMsg(&iv.Hash, c) + err = p.pushBlockMsg(&iv.Hash, c, waitChan) default: peerLog.Warnf("Unknown type in inventory request %d", iv.Type) @@ -592,6 +599,7 @@ out: if err != nil { notFound.AddInvVect(iv) } + waitChan = c } if len(notFound.InvList) != 0 { p.QueueMessage(notFound, doneChan) @@ -1117,6 +1125,7 @@ out: // Ensure connection is closed and notify server and block manager that // the peer is done. p.Disconnect() + p.server.donePeers <- p // Only tell blockmanager we are gone if we ever told it we existed. if p.versionKnown { @@ -1126,11 +1135,136 @@ out: peerLog.Tracef("Peer input handler done for %s", p.addr) } +// queueHandler handles the queueing of outgoing data for the peer. This runs +// as a muxer for various sources of input so we can ensure that blockmanager +// and the server goroutine both will not block on us sending a message. +// We then pass the data on to outHandler to be actually written. +func (p *peer) queueHandler() { + pendingMsgs := list.New() + invSendQueue := list.New() + trickleTicker := time.NewTicker(time.Second * 10) + + // We keep the waiting flag so that we know if we have a message queued + // to the outHandler or not. We could use the presence of a head + // of the list for this but then we have rather racy concerns about + // whether it has gotten it at cleanup time - and thus who sends on the + // message's done channel. To avoid such confusion we keep a different + // flag and pendingMsgs only contains messages that we have not yet + // passed to outHandler. + waiting := false + + // To avoid duplication below. 
+ queuePacket := func(msg outMsg, list *list.List, waiting bool) bool { + if !waiting { + peerLog.Tracef("%s: sending to outHandler", p) + p.sendQueue <- msg + peerLog.Tracef("%s: sent to outHandler", p) + } else { + list.PushBack(msg) + } + // we are always waiting now. + return true + } +out: + for { + select { + case msg := <-p.outputQueue: + waiting = queuePacket(msg, pendingMsgs, waiting) + + case <-p.sendDoneQueue: + peerLog.Tracef("%s: acked by outhandler", p) + // one message on sendChan means one message on + // txDoneChan when it has been sent. + next := pendingMsgs.Front() + if next != nil { + val := pendingMsgs.Remove(next) + peerLog.Tracef("%s: sending to outHandler", p) + p.sendQueue <- val.(outMsg) + peerLog.Tracef("%s: sent to outHandler", p) + } else { + waiting = false + } + + case iv := <-p.outputInvChan: + // No handshake? They'll find out soon enough. + if p.versionKnown { + invSendQueue.PushBack(iv) + } + + case <-trickleTicker.C: + // Don't send anything if we're disconnecting or there + // is no queued inventory. + // version is known if send queue has any entries. + if atomic.LoadInt32(&p.disconnect) != 0 || + invSendQueue.Len() == 0 { + continue + } + + // Create and send as many inv messages as needed to + // drain the inventory send queue. + invMsg := btcwire.NewMsgInv() + for e := invSendQueue.Front(); e != nil; e = invSendQueue.Front() { + iv := invSendQueue.Remove(e).(*btcwire.InvVect) + + // Don't send inventory that became known after + // the initial check. + if p.isKnownInventory(iv) { + continue + } + + invMsg.AddInvVect(iv) + if len(invMsg.InvList) >= maxInvTrickleSize { + waiting = queuePacket( + outMsg{msg: invMsg}, + pendingMsgs, waiting) + invMsg = btcwire.NewMsgInv() + } + + // Add the inventory that is being relayed to + // the known inventory for the peer. 
+ p.addKnownInventory(iv) + } + if len(invMsg.InvList) > 0 { + waiting = queuePacket(outMsg{msg: invMsg}, + pendingMsgs, waiting) + } + + case <-p.quit: + break out + } + } + + // Drain any wait channels before we go away so we don't leave something + // waiting for us. + for e := pendingMsgs.Front(); e != nil; e = pendingMsgs.Front() { + val := pendingMsgs.Remove(e) + msg := val.(outMsg) + if msg.doneChan != nil { + msg.doneChan <- false + } + } +cleanup: + for { + select { + case msg := <-p.outputQueue: + if msg.doneChan != nil { + msg.doneChan <- false + } + case <-p.outputInvChan: + // Just drain channel + // sendDoneQueue is buffered so doesn't need draining. + default: + break cleanup + } + } + p.queueWg.Done() + peerLog.Tracef("Peer queue handler done for %s", p.addr) +} + // outHandler handles all outgoing messages for the peer. It must be run as a // goroutine. It uses a buffered channel to serialize output messages while // allowing the sender to continue running asynchronously. func (p *peer) outHandler() { - trickleTicker := time.NewTicker(time.Second * 10) pingTimer := time.AfterFunc(pingTimeoutMinutes*time.Minute, func() { nonce, err := btcwire.RandomUint64() if err != nil { @@ -1143,7 +1277,7 @@ func (p *peer) outHandler() { out: for { select { - case msg := <-p.outputQueue: + case msg := <-p.sendQueue: // If the message is one we should get a reply for // then reset the timer, we only want to send pings // when otherwise we would not recieve a reply from @@ -1152,6 +1286,7 @@ out: // the inv is of no interest explicitly solicited invs // should elicit a reply but we don't track them // specially. + peerLog.Tracef("%s: recieved from queuehandler", p) reset := true switch msg.msg.(type) { case *btcwire.MsgVersion: @@ -1181,47 +1316,9 @@ out: if msg.doneChan != nil { msg.doneChan <- true } - - case iv := <-p.outputInvChan: - // No handshake? They'll find out soon enough. 
- if p.versionKnown { - p.invSendQueue.PushBack(iv) - } - - case <-trickleTicker.C: - // Don't send anything if we're disconnecting or there - // is no queued inventory. - if atomic.LoadInt32(&p.disconnect) != 0 || - p.invSendQueue.Len() == 0 || - !p.versionKnown { - continue - } - - // Create and send as many inv messages as needed to - // drain the inventory send queue. - invMsg := btcwire.NewMsgInv() - for e := p.invSendQueue.Front(); e != nil; e = p.invSendQueue.Front() { - iv := p.invSendQueue.Remove(e).(*btcwire.InvVect) - - // Don't send inventory that became known after - // the initial check. - if p.isKnownInventory(iv) { - continue - } - - invMsg.AddInvVect(iv) - if len(invMsg.InvList) >= maxInvTrickleSize { - p.writeMessage(invMsg) - invMsg = btcwire.NewMsgInv() - } - - // Add the inventory that is being relayed to - // the known inventory for the peer. - p.addKnownInventory(iv) - } - if len(invMsg.InvList) > 0 { - p.writeMessage(invMsg) - } + peerLog.Tracef("%s: acking queuehandler", p) + p.sendDoneQueue <- true + peerLog.Tracef("%s: acked queuehandler", p) case <-p.quit: break out @@ -1230,17 +1327,20 @@ out: pingTimer.Stop() + p.queueWg.Wait() + // Drain any wait channels before we go away so we don't leave something - // waiting for us. + // waiting for us. We have waited on queueWg and thus we can be sure + // that we will not miss anything sent on sendQueue. cleanup: for { select { - case msg := <-p.outputQueue: + case msg := <-p.sendQueue: if msg.doneChan != nil { msg.doneChan <- false } - case <-p.outputInvChan: - // Just drain channel + // no need to send on sendDoneQueue since queueHandler + // has been waited on and already exited. default: break cleanup } @@ -1252,6 +1352,18 @@ cleanup: // uses a buffered channel to communicate with the output handler goroutine so // it is automatically rate limited and safe for concurrent access. 
func (p *peer) QueueMessage(msg btcwire.Message, doneChan chan bool) { + // Avoid risk of deadlock if goroutine already exited. The goroutine + // we will be sending to hangs around until it knows for a fact that + // it is marked as disconnected. *then* it drains the channels. + if !p.Connected() { + // avoid deadlock... + if doneChan != nil { + go func() { + doneChan <- false + }() + } + return + } p.outputQueue <- outMsg{msg: msg, doneChan: doneChan} } @@ -1266,6 +1378,13 @@ func (p *peer) QueueInventory(invVect *btcwire.InvVect) { return } + // Avoid risk of deadlock if goroutine already exited. The goroutine + // we will be sending to hangs around until it knows for a fact that + // it is marked as disconnected. *then* it drains the channels. + if !p.Connected() { + return + } + p.outputInvChan <- invVect } @@ -1282,6 +1401,7 @@ func (p *peer) Disconnect() { if atomic.AddInt32(&p.disconnect, 1) != 1 { return } + peerLog.Tracef("disconnecting %s", p.addr) close(p.quit) if atomic.LoadInt32(&p.connected) != 0 { p.conn.Close() @@ -1311,6 +1431,10 @@ func (p *peer) Start() error { // Start processing input and output. go p.inHandler() + // queueWg is kept so that outHandler knows when the queue has exited so + // it can drain correctly. + p.queueWg.Add(1) + go p.queueHandler() go p.outHandler() return nil @@ -1337,8 +1461,9 @@ func newPeerBase(s *server, inbound bool) *peer { requestedTxns: make(map[btcwire.ShaHash]bool), requestedBlocks: make(map[btcwire.ShaHash]bool), requestQueue: list.New(), - invSendQueue: list.New(), outputQueue: make(chan outMsg, outputBufferSize), + sendQueue: make(chan outMsg, 1), // nonblocking sync. + sendDoneQueue: make(chan bool, 1), // nonblocking sync. outputInvChan: make(chan *btcwire.InvVect, outputBufferSize), txProcessed: make(chan bool, 1), blockProcessed: make(chan bool, 1),