diff --git a/blockmanager.go b/blockmanager.go index 65e3e221..53e17ca1 100644 --- a/blockmanager.go +++ b/blockmanager.go @@ -543,7 +543,7 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) { } } if len(gdmsg.InvList) > 0 { - imsg.peer.QueueMessage(gdmsg) + imsg.peer.QueueMessage(gdmsg, nil) } } diff --git a/peer.go b/peer.go index 844b2e1f..f14a5506 100644 --- a/peer.go +++ b/peer.go @@ -32,6 +32,14 @@ const ( // maxKnownInventory is the maximum number of items to keep in the known // inventory cache. maxKnownInventory = 20000 + + // idleTimeoutMinutes is the number of minutes of inactivity before + // we time out a peer. + idleTimeoutMinutes = 5 + + // pingTimeoutMinutes is the number of minutes since we last sent a + // message requiring a reply before we will ping a host. + pingTimeoutMinutes = 2 ) // userAgent is the user agent string used to identify ourselves to other @@ -91,7 +99,7 @@ func newNetAddress(addr net.Addr, services btcwire.ServiceFlag) (*btcwire.NetAdd type outMsg struct { msg btcwire.Message - doneChan chan bool + doneChan chan bool } // peer provides a bitcoin peer for handling bitcoin communications. @@ -203,7 +211,7 @@ func (p *peer) pushVersionMsg() error { // Advertise that we're a full node. msg.Services = btcwire.SFNodeNetwork - p.QueueMessage(msg) + p.QueueMessage(msg, nil) return nil } @@ -259,7 +267,7 @@ func (p *peer) handleVersionMsg(msg *btcwire.MsgVersion) { p.na = na // Send verack. - p.QueueMessage(btcwire.NewMsgVerAck()) + p.QueueMessage(btcwire.NewMsgVerAck(), nil) // Outbound connections. if !p.inbound { @@ -283,7 +291,7 @@ func (p *peer) handleVersionMsg(msg *btcwire.MsgVersion) { // include a timestamp with addresses. hasTimestamp := p.protocolVersion >= btcwire.NetAddressTimeVersion if p.server.addrManager.NeedMoreAddresses() && hasTimestamp { - p.QueueMessage(btcwire.NewMsgGetAddr()) + p.QueueMessage(btcwire.NewMsgGetAddr(), nil) } // Mark the address as a known good address. @@ -307,7 +315,7 @@ func (p *peer) handleVersionMsg(msg *btcwire.MsgVersion) { // pushTxMsg sends a tx message for the provided transaction hash to the // connected peer. An error is returned if the transaction hash is not known. -func (p *peer) pushTxMsg(sha *btcwire.ShaHash) error { +func (p *peer) pushTxMsg(sha *btcwire.ShaHash, doneChan chan bool) error { // Attempt to fetch the requested transaction from the pool. A // call could be made to check for existence first, but simply trying // to fetch a missing transaction results in the same behavior. @@ -317,14 +325,14 @@ func (p *peer) pushTxMsg(sha *btcwire.ShaHash) error { "pool: %v", sha, err) return err } - p.QueueMessage(tx) + p.QueueMessage(tx, doneChan) return nil } // pushBlockMsg sends a block message for the provided block hash to the // connected peer. An error is returned if the block hash is not known. -func (p *peer) pushBlockMsg(sha *btcwire.ShaHash) error { +func (p *peer) pushBlockMsg(sha *btcwire.ShaHash, doneChan chan bool) error { // What should this function do about the rate limiting the // number of blocks queued for this peer? // Current thought is have a counting mutex in the peer @@ -345,7 +353,15 @@ func (p *peer) pushBlockMsg(sha *btcwire.ShaHash) error { sha, err) return err } - p.QueueMessage(blk.MsgBlock()) + + // We only send the channel for this message if we aren't sending + // an inv straight after. + var dc chan bool + sendInv := p.continueHash != nil && p.continueHash.IsEqual(sha) + if !sendInv { + dc = doneChan + } + p.QueueMessage(blk.MsgBlock(), dc) // When the peer requests the final block that was advertised in // response to a getblocks message which requested more blocks than @@ -358,8 +374,13 @@ func (p *peer) pushBlockMsg(sha *btcwire.ShaHash) error { invMsg := btcwire.NewMsgInv() iv := btcwire.NewInvVect(btcwire.InvTypeBlock, hash) invMsg.AddInvVect(iv) - p.QueueMessage(invMsg) + p.QueueMessage(invMsg, doneChan) p.continueHash = nil + } else if doneChan != nil { + // Avoid deadlock when caller waits on channel. + go func() { + doneChan <- false + }() } } return nil @@ -394,7 +415,7 @@ func (p *peer) PushGetBlocksMsg(locator btcchain.BlockLocator, stopHash *btcwire return err } } - p.QueueMessage(msg) + p.QueueMessage(msg, nil) // Update the previous getblocks request information for filtering // duplicates. @@ -422,7 +443,7 @@ func (p *peer) handleMemPoolMsg(msg *btcwire.MsgMemPool) { // Send the inventory message if there is anything to send. if len(invMsg.InvList) > 0 { - p.QueueMessage(invMsg) + p.QueueMessage(invMsg, nil) } } @@ -494,14 +515,20 @@ func (p *peer) handleInvMsg(msg *btcwire.MsgInv) { func (p *peer) handleGetDataMsg(msg *btcwire.MsgGetData) { notFound := btcwire.NewMsgNotFound() + doneChan := make(chan bool) out: - for _, iv := range msg.InvList { + for i, iv := range msg.InvList { + var c chan bool + // If this will be the last message we send. + if i == len(msg.InvList)-1 && len(notFound.InvList) == 0 { + c = doneChan + } var err error switch iv.Type { case btcwire.InvTypeTx: - err = p.pushTxMsg(&iv.Hash) + err = p.pushTxMsg(&iv.Hash, c) case btcwire.InvTypeBlock: - err = p.pushBlockMsg(&iv.Hash) + err = p.pushBlockMsg(&iv.Hash, c) default: log.Warnf("PEER: Unknown type in inventory request %d", iv.Type) @@ -512,8 +539,15 @@ out: } } if len(notFound.InvList) != 0 { - p.QueueMessage(notFound) + p.QueueMessage(notFound, doneChan) } + + // Wait for messages to be sent. We can send quite a lot of data at this + // point and this will keep the peer busy for a decent amount of time. + // We don't process anything else by them in this time so that we + // have an idea of when we should hear back from them - else the idle + // timeout could fire when we were only half done sending the blocks. + <-doneChan } // handleGetBlocksMsg is invoked when a peer receives a getdata bitcoin message. @@ -592,11 +626,11 @@ func (p *peer) handleGetBlocksMsg(msg *btcwire.MsgGetBlocks) { continueHash := invMsg.InvList[invListLen-1].Hash p.continueHash = &continueHash } - p.QueueMessage(invMsg) + p.QueueMessage(invMsg, nil) } } -// handleGetBlocksMsg is invoked when a peer receives a getheaders bitcoin +// handleGetHeadersMsg is invoked when a peer receives a getheaders bitcoin // message. func (p *peer) handleGetHeadersMsg(msg *btcwire.MsgGetHeaders) { // Attempt to look up the height of the provided stop hash. @@ -621,7 +655,7 @@ func (p *peer) handleGetHeadersMsg(msg *btcwire.MsgGetHeaders) { hdr := block.MsgBlock().Header // copy hdr.TxnCount = 0 headersMsg.AddBlockHeader(&hdr) - p.QueueMessage(headersMsg) + p.QueueMessage(headersMsg, nil) return } @@ -683,7 +717,7 @@ func (p *peer) handleGetHeadersMsg(msg *btcwire.MsgGetHeaders) { // next loop iteration. start += int64(len(hashList)) } - p.QueueMessage(headersMsg) + p.QueueMessage(headersMsg, nil) } // handleGetAddrMsg is invoked when a peer receives a getaddr bitcoin message @@ -727,14 +761,14 @@ func (p *peer) pushAddrMsg(addresses []*btcwire.NetAddress) error { // Split into multiple messages as needed. if numAdded > 0 && numAdded%btcwire.MaxAddrPerMsg == 0 { - p.QueueMessage(msg) + p.QueueMessage(msg, nil) msg.ClearAddresses() } } // Send message with remaining addresses if needed. if numAdded%btcwire.MaxAddrPerMsg != 0 { - p.QueueMessage(msg) + p.QueueMessage(msg, nil) } return nil } @@ -789,7 +823,7 @@ func (p *peer) handlePingMsg(msg *btcwire.MsgPing) { // Only Reply with pong is message comes from a new enough client. if p.protocolVersion > btcwire.BIP0031Version { // Include nonce from ping so pong can be identified. - p.QueueMessage(btcwire.NewMsgPong(msg.Nonce)) + p.QueueMessage(btcwire.NewMsgPong(msg.Nonce), nil) } } @@ -899,9 +933,20 @@ func (p *peer) isAllowedByRegression(err error) bool { // inHandler handles all incoming messages for the peer. It must be run as a // goroutine. func (p *peer) inHandler() { + idleTimer := time.AfterFunc(idleTimeoutMinutes*time.Minute, func() { + // XXX technically very very very slightly racy, doesn't really + // matter. + if p.versionKnown { + log.Warnf("Peer %s no answer for %d minutes, "+ + "disconnecting", idleTimeoutMinutes, p) + } + p.Disconnect() + }) out: for atomic.LoadInt32(&p.disconnect) == 0 { rmsg, buf, err := p.readMessage() + // Stop the timer now, if we go around again we will reset it. + idleTimer.Stop() if err != nil { // In order to allow regression tests with malformed // messages, don't disconnect the peer when we're in @@ -910,6 +955,7 @@ out: if cfg.RegressionTest && p.isAllowedByRegression(err) { log.Errorf("PEER: Allowed regression test "+ "error: %v", err) + idleTimer.Reset(idleTimeoutMinutes * time.Minute) continue } @@ -998,8 +1044,13 @@ out: } p.server.addrManager.Connected(p.na) } + // ok we got a message, reset the timer. + // timer just calls p.Disconnect() after logging. + idleTimer.Reset(idleTimeoutMinutes * time.Minute) } + idleTimer.Stop() + // Ensure connection is closed and notify server and block manager that // the peer is done. p.Disconnect() @@ -1017,10 +1068,51 @@ out: // allowing the sender to continue running asynchronously. func (p *peer) outHandler() { trickleTicker := time.NewTicker(time.Second * 10) + pingTimer := time.AfterFunc(pingTimeoutMinutes*time.Minute, func() { + nonce, err := btcwire.RandomUint64() + if err != nil { + log.Errorf("Not sending ping on timeout to %s: %v", + p, err) + return + } + p.QueueMessage(btcwire.NewMsgPing(nonce), nil) + }) out: for { select { case msg := <-p.outputQueue: + // If the message is one we should get a reply for + // then reset the timer, we only want to send pings + // when otherwise we would not recieve a reply from + // the peer. We specifically do not count block or inv + // messages here since they are not sure of a reply if + // the inv is of no interest explicitly solicited invs + // should elicit a reply but we don't track them + // specially. + reset := true + switch msg.msg.(type) { + case *btcwire.MsgVersion: + // should get an ack + case *btcwire.MsgGetAddr: + // should get addresses + case *btcwire.MsgPing: + // expects pong + case *btcwire.MsgMemPool: + // Should return an inv. + case *btcwire.MsgGetData: + // Should get us block, tx, or not found. + case *btcwire.MsgGetHeaders: + // Should get us headers back. + + default: + // Not one of the above, no sure reply. + // We want to ping if nothing else + // interesting happens. + reset = false + } + if reset { + pingTimer.Reset(pingTimeoutMinutes * time.Minute) + } p.writeMessage(msg.msg) if msg.doneChan != nil { msg.doneChan <- true @@ -1072,6 +1164,8 @@ out: } } + pingTimer.Stop() + // Drain any wait channels before we go away so we don't leave something // waiting for us. cleanup: @@ -1091,8 +1185,8 @@ cleanup: // QueueMessage adds the passed bitcoin message to the peer send queue. It // uses a buffered channel to communicate with the output handler goroutine so // it is automatically rate limited and safe for concurrent access. -func (p *peer) QueueMessage(msg btcwire.Message) { - p.outputQueue <- outMsg{msg: msg} +func (p *peer) QueueMessage(msg btcwire.Message, doneChan chan bool) { + p.outputQueue <- outMsg{msg: msg, doneChan: doneChan} } // QueueInventory adds the passed inventory to the inventory send queue which diff --git a/server.go b/server.go index dc3046f8..d7cdb361 100644 --- a/server.go +++ b/server.go @@ -193,7 +193,7 @@ func (s *server) handleBroadcastMsg(peers *list.List, bmsg *broadcastMsg) { excluded = true } if !excluded { - p.QueueMessage(bmsg.message) + p.QueueMessage(bmsg.message, nil) } } }