Add an idle timer for peers.

If we don't hear from a peer for 5 minutes, we disconnect them. To keep
traffic flowing we send a ping every 2 minutes if we have not send any
other message that should get a reply.
This commit is contained in:
Owain G. Ainsworth 2013-10-16 15:49:09 +01:00
parent bc89dedf9a
commit 5a9cc91e62
3 changed files with 120 additions and 26 deletions

View file

@ -543,7 +543,7 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) {
}
}
if len(gdmsg.InvList) > 0 {
imsg.peer.QueueMessage(gdmsg)
imsg.peer.QueueMessage(gdmsg, nil)
}
}

142
peer.go
View file

@ -32,6 +32,14 @@ const (
// maxKnownInventory is the maximum number of items to keep in the known
// inventory cache.
maxKnownInventory = 20000
// idleTimeoutMinutes is the number of minutes of inactivity before
// we time out a peer.
idleTimeoutMinutes = 5
// pingTimeoutMinutes is the number of minutes since we last sent a
// message requiring a reply before we will ping a host.
pingTimeoutMinutes = 2
)
// userAgent is the user agent string used to identify ourselves to other
@ -91,7 +99,7 @@ func newNetAddress(addr net.Addr, services btcwire.ServiceFlag) (*btcwire.NetAdd
type outMsg struct {
msg btcwire.Message
doneChan chan bool
doneChan chan bool
}
// peer provides a bitcoin peer for handling bitcoin communications.
@ -203,7 +211,7 @@ func (p *peer) pushVersionMsg() error {
// Advertise that we're a full node.
msg.Services = btcwire.SFNodeNetwork
p.QueueMessage(msg)
p.QueueMessage(msg, nil)
return nil
}
@ -259,7 +267,7 @@ func (p *peer) handleVersionMsg(msg *btcwire.MsgVersion) {
p.na = na
// Send verack.
p.QueueMessage(btcwire.NewMsgVerAck())
p.QueueMessage(btcwire.NewMsgVerAck(), nil)
// Outbound connections.
if !p.inbound {
@ -283,7 +291,7 @@ func (p *peer) handleVersionMsg(msg *btcwire.MsgVersion) {
// include a timestamp with addresses.
hasTimestamp := p.protocolVersion >= btcwire.NetAddressTimeVersion
if p.server.addrManager.NeedMoreAddresses() && hasTimestamp {
p.QueueMessage(btcwire.NewMsgGetAddr())
p.QueueMessage(btcwire.NewMsgGetAddr(), nil)
}
// Mark the address as a known good address.
@ -307,7 +315,7 @@ func (p *peer) handleVersionMsg(msg *btcwire.MsgVersion) {
// pushTxMsg sends a tx message for the provided transaction hash to the
// connected peer. An error is returned if the transaction hash is not known.
func (p *peer) pushTxMsg(sha *btcwire.ShaHash) error {
func (p *peer) pushTxMsg(sha *btcwire.ShaHash, doneChan chan bool) error {
// Attempt to fetch the requested transaction from the pool. A
// call could be made to check for existence first, but simply trying
// to fetch a missing transaction results in the same behavior.
@ -317,14 +325,14 @@ func (p *peer) pushTxMsg(sha *btcwire.ShaHash) error {
"pool: %v", sha, err)
return err
}
p.QueueMessage(tx)
p.QueueMessage(tx, doneChan)
return nil
}
// pushBlockMsg sends a block message for the provided block hash to the
// connected peer. An error is returned if the block hash is not known.
func (p *peer) pushBlockMsg(sha *btcwire.ShaHash) error {
func (p *peer) pushBlockMsg(sha *btcwire.ShaHash, doneChan chan bool) error {
// What should this function do about the rate limiting the
// number of blocks queued for this peer?
// Current thought is have a counting mutex in the peer
@ -345,7 +353,15 @@ func (p *peer) pushBlockMsg(sha *btcwire.ShaHash) error {
sha, err)
return err
}
p.QueueMessage(blk.MsgBlock())
// We only send the channel for this message if we aren't sending
// an inv straight after.
var dc chan bool
sendInv := p.continueHash != nil && p.continueHash.IsEqual(sha)
if !sendInv {
dc = doneChan
}
p.QueueMessage(blk.MsgBlock(), dc)
// When the peer requests the final block that was advertised in
// response to a getblocks message which requested more blocks than
@ -358,8 +374,13 @@ func (p *peer) pushBlockMsg(sha *btcwire.ShaHash) error {
invMsg := btcwire.NewMsgInv()
iv := btcwire.NewInvVect(btcwire.InvTypeBlock, hash)
invMsg.AddInvVect(iv)
p.QueueMessage(invMsg)
p.QueueMessage(invMsg, doneChan)
p.continueHash = nil
} else if doneChan != nil {
// Avoid deadlock when caller waits on channel.
go func() {
doneChan <- false
}()
}
}
return nil
@ -394,7 +415,7 @@ func (p *peer) PushGetBlocksMsg(locator btcchain.BlockLocator, stopHash *btcwire
return err
}
}
p.QueueMessage(msg)
p.QueueMessage(msg, nil)
// Update the previous getblocks request information for filtering
// duplicates.
@ -422,7 +443,7 @@ func (p *peer) handleMemPoolMsg(msg *btcwire.MsgMemPool) {
// Send the inventory message if there is anything to send.
if len(invMsg.InvList) > 0 {
p.QueueMessage(invMsg)
p.QueueMessage(invMsg, nil)
}
}
@ -494,14 +515,20 @@ func (p *peer) handleInvMsg(msg *btcwire.MsgInv) {
func (p *peer) handleGetDataMsg(msg *btcwire.MsgGetData) {
notFound := btcwire.NewMsgNotFound()
doneChan := make(chan bool)
out:
for _, iv := range msg.InvList {
for i, iv := range msg.InvList {
var c chan bool
// If this will be the last message we send.
if i == len(msg.InvList)-1 && len(notFound.InvList) == 0 {
c = doneChan
}
var err error
switch iv.Type {
case btcwire.InvTypeTx:
err = p.pushTxMsg(&iv.Hash)
err = p.pushTxMsg(&iv.Hash, c)
case btcwire.InvTypeBlock:
err = p.pushBlockMsg(&iv.Hash)
err = p.pushBlockMsg(&iv.Hash, c)
default:
log.Warnf("PEER: Unknown type in inventory request %d",
iv.Type)
@ -512,8 +539,15 @@ out:
}
}
if len(notFound.InvList) != 0 {
p.QueueMessage(notFound)
p.QueueMessage(notFound, doneChan)
}
// Wait for messages to be sent. We can send quite a lot of data at this
// point and this will keep the peer busy for a decent amount of time.
// We don't process anything else by them in this time so that we
// have an idea of when we should hear back from them - else the idle
// timeout could fire when we were only half done sending the blocks.
<-doneChan
}
// handleGetBlocksMsg is invoked when a peer receives a getdata bitcoin message.
@ -592,11 +626,11 @@ func (p *peer) handleGetBlocksMsg(msg *btcwire.MsgGetBlocks) {
continueHash := invMsg.InvList[invListLen-1].Hash
p.continueHash = &continueHash
}
p.QueueMessage(invMsg)
p.QueueMessage(invMsg, nil)
}
}
// handleGetBlocksMsg is invoked when a peer receives a getheaders bitcoin
// handleGetHeadersMsg is invoked when a peer receives a getheaders bitcoin
// message.
func (p *peer) handleGetHeadersMsg(msg *btcwire.MsgGetHeaders) {
// Attempt to look up the height of the provided stop hash.
@ -621,7 +655,7 @@ func (p *peer) handleGetHeadersMsg(msg *btcwire.MsgGetHeaders) {
hdr := block.MsgBlock().Header // copy
hdr.TxnCount = 0
headersMsg.AddBlockHeader(&hdr)
p.QueueMessage(headersMsg)
p.QueueMessage(headersMsg, nil)
return
}
@ -683,7 +717,7 @@ func (p *peer) handleGetHeadersMsg(msg *btcwire.MsgGetHeaders) {
// next loop iteration.
start += int64(len(hashList))
}
p.QueueMessage(headersMsg)
p.QueueMessage(headersMsg, nil)
}
// handleGetAddrMsg is invoked when a peer receives a getaddr bitcoin message
@ -727,14 +761,14 @@ func (p *peer) pushAddrMsg(addresses []*btcwire.NetAddress) error {
// Split into multiple messages as needed.
if numAdded > 0 && numAdded%btcwire.MaxAddrPerMsg == 0 {
p.QueueMessage(msg)
p.QueueMessage(msg, nil)
msg.ClearAddresses()
}
}
// Send message with remaining addresses if needed.
if numAdded%btcwire.MaxAddrPerMsg != 0 {
p.QueueMessage(msg)
p.QueueMessage(msg, nil)
}
return nil
}
@ -789,7 +823,7 @@ func (p *peer) handlePingMsg(msg *btcwire.MsgPing) {
// Only Reply with pong is message comes from a new enough client.
if p.protocolVersion > btcwire.BIP0031Version {
// Include nonce from ping so pong can be identified.
p.QueueMessage(btcwire.NewMsgPong(msg.Nonce))
p.QueueMessage(btcwire.NewMsgPong(msg.Nonce), nil)
}
}
@ -899,9 +933,20 @@ func (p *peer) isAllowedByRegression(err error) bool {
// inHandler handles all incoming messages for the peer. It must be run as a
// goroutine.
func (p *peer) inHandler() {
idleTimer := time.AfterFunc(idleTimeoutMinutes*time.Minute, func() {
// XXX technically very very very slightly racy, doesn't really
// matter.
if p.versionKnown {
log.Warnf("Peer %s no answer for %d minutes, "+
"disconnecting", idleTimeoutMinutes, p)
}
p.Disconnect()
})
out:
for atomic.LoadInt32(&p.disconnect) == 0 {
rmsg, buf, err := p.readMessage()
// Stop the timer now, if we go around again we will reset it.
idleTimer.Stop()
if err != nil {
// In order to allow regression tests with malformed
// messages, don't disconnect the peer when we're in
@ -910,6 +955,7 @@ out:
if cfg.RegressionTest && p.isAllowedByRegression(err) {
log.Errorf("PEER: Allowed regression test "+
"error: %v", err)
idleTimer.Reset(idleTimeoutMinutes * time.Minute)
continue
}
@ -998,8 +1044,13 @@ out:
}
p.server.addrManager.Connected(p.na)
}
// ok we got a message, reset the timer.
// timer just calls p.Disconnect() after logging.
idleTimer.Reset(idleTimeoutMinutes * time.Minute)
}
idleTimer.Stop()
// Ensure connection is closed and notify server and block manager that
// the peer is done.
p.Disconnect()
@ -1017,10 +1068,51 @@ out:
// allowing the sender to continue running asynchronously.
func (p *peer) outHandler() {
trickleTicker := time.NewTicker(time.Second * 10)
pingTimer := time.AfterFunc(pingTimeoutMinutes*time.Minute, func() {
nonce, err := btcwire.RandomUint64()
if err != nil {
log.Errorf("Not sending ping on timeout to %s: %v",
p, err)
return
}
p.QueueMessage(btcwire.NewMsgPing(nonce), nil)
})
out:
for {
select {
case msg := <-p.outputQueue:
// If the message is one we should get a reply for
// then reset the timer, we only want to send pings
// when otherwise we would not recieve a reply from
// the peer. We specifically do not count block or inv
// messages here since they are not sure of a reply if
// the inv is of no interest explicitly solicited invs
// should elicit a reply but we don't track them
// specially.
reset := true
switch msg.msg.(type) {
case *btcwire.MsgVersion:
// should get an ack
case *btcwire.MsgGetAddr:
// should get addresses
case *btcwire.MsgPing:
// expects pong
case *btcwire.MsgMemPool:
// Should return an inv.
case *btcwire.MsgGetData:
// Should get us block, tx, or not found.
case *btcwire.MsgGetHeaders:
// Should get us headers back.
default:
// Not one of the above, no sure reply.
// We want to ping if nothing else
// interesting happens.
reset = false
}
if reset {
pingTimer.Reset(pingTimeoutMinutes * time.Minute)
}
p.writeMessage(msg.msg)
if msg.doneChan != nil {
msg.doneChan <- true
@ -1072,6 +1164,8 @@ out:
}
}
pingTimer.Stop()
// Drain any wait channels before we go away so we don't leave something
// waiting for us.
cleanup:
@ -1091,8 +1185,8 @@ cleanup:
// QueueMessage adds the passed bitcoin message to the peer send queue. It
// uses a buffered channel to communicate with the output handler goroutine so
// it is automatically rate limited and safe for concurrent access.
func (p *peer) QueueMessage(msg btcwire.Message) {
p.outputQueue <- outMsg{msg: msg}
func (p *peer) QueueMessage(msg btcwire.Message, doneChan chan bool) {
p.outputQueue <- outMsg{msg: msg, doneChan: doneChan}
}
// QueueInventory adds the passed inventory to the inventory send queue which

View file

@ -193,7 +193,7 @@ func (s *server) handleBroadcastMsg(peers *list.List, bmsg *broadcastMsg) {
excluded = true
}
if !excluded {
p.QueueMessage(bmsg.message)
p.QueueMessage(bmsg.message, nil)
}
}
}