Correct total byte counters for server.

Previously the getnettotals was just looping through all of the currently
connected peers to sum the byte counts and returning that.  However, the
intention of the getnettotals RPC is to get all bytes since the server was
started, so this logic was not correct.

This commit modifies the code to keep an atomic counter on the server for
bytes read/written and has each peer update the server counters as well as
the per-peer counters.
This commit is contained in:
Dave Collins 2014-02-05 11:09:45 -06:00
parent 5d70935b04
commit a39f4a0698
3 changed files with 19 additions and 35 deletions

View file

@ -957,9 +957,13 @@ func (p *peer) readMessage() (btcwire.Message, []byte, error) {
n, msg, buf, err := btcwire.ReadMessageN(p.conn, p.protocolVersion, p.btcnet) n, msg, buf, err := btcwire.ReadMessageN(p.conn, p.protocolVersion, p.btcnet)
if err != nil { if err != nil {
p.bytesReceived += uint64(n) p.bytesReceived += uint64(n)
p.server.bytesReceived = atomic.AddUint64(&p.server.bytesReceived,
uint64(n))
return nil, nil, err return nil, nil, err
} }
p.bytesReceived += uint64(n) p.bytesReceived += uint64(n)
p.server.bytesReceived = atomic.AddUint64(&p.server.bytesReceived,
uint64(n))
// Use closures to log expensive operations so they are only run when // Use closures to log expensive operations so they are only run when
// the logging level requires it. // the logging level requires it.
@ -1026,11 +1030,14 @@ func (p *peer) writeMessage(msg btcwire.Message) {
n, err := btcwire.WriteMessageN(p.conn, msg, p.protocolVersion, p.btcnet) n, err := btcwire.WriteMessageN(p.conn, msg, p.protocolVersion, p.btcnet)
if err != nil { if err != nil {
p.bytesSent += uint64(n) p.bytesSent += uint64(n)
p.server.bytesSent = atomic.AddUint64(&p.server.bytesSent,
uint64(n))
p.Disconnect() p.Disconnect()
p.logError("Can't send message: %v", err) p.logError("Can't send message: %v", err)
return return
} }
p.bytesSent += uint64(n) p.bytesSent += uint64(n)
p.server.bytesSent = atomic.AddUint64(&p.server.bytesSent, uint64(n))
} }
// isAllowedByRegression returns whether or not the passed error is allowed by // isAllowedByRegression returns whether or not the passed error is allowed by

View file

@ -980,10 +980,10 @@ func handleGetInfo(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) {
// handleGetNetTotals implements the getnettotals command. // handleGetNetTotals implements the getnettotals command.
func handleGetNetTotals(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) { func handleGetNetTotals(s *rpcServer, cmd btcjson.Cmd) (interface{}, error) {
netTotals := s.server.NetTotals() totalBytesRecv, totalBytesSent := s.server.NetTotals()
reply := &btcjson.GetNetTotalsResult{ reply := &btcjson.GetNetTotalsResult{
TotalBytesRecv: netTotals.TotalBytesRecv, TotalBytesRecv: totalBytesRecv,
TotalBytesSent: netTotals.TotalBytesSent, TotalBytesSent: totalBytesSent,
TimeMillis: time.Now().UTC().UnixNano() / 1000, TimeMillis: time.Now().UTC().UnixNano() / 1000,
} }
return reply, nil return reply, nil

View file

@ -51,9 +51,11 @@ type server struct {
nonce uint64 nonce uint64
listeners []net.Listener listeners []net.Listener
btcnet btcwire.BitcoinNet btcnet btcwire.BitcoinNet
started int32 // atomic started int32 // atomic
shutdown int32 // atomic shutdown int32 // atomic
shutdownSched int32 // atomic shutdownSched int32 // atomic
bytesReceived uint64 // Total bytes received from all peers since start.
bytesSent uint64 // Total bytes sent by all peers since start.
addrManager *AddrManager addrManager *AddrManager
rpcServer *rpcServer rpcServer *rpcServer
blockManager *blockManager blockManager *blockManager
@ -300,19 +302,6 @@ type getAddedNodesMsg struct {
reply chan []*peer reply chan []*peer
} }
// NetTotals contains information about the total bytes received and sent across
// the network.
type NetTotals struct {
TotalBytesRecv uint64
TotalBytesSent uint64
}
// getNetTotals is a message type to be sent across the query channel for
// retrieving the current total bytes sent and received from all peers.
type getNetTotals struct {
reply chan *NetTotals
}
// handleQuery is the central handler for all queries and commands from other // handleQuery is the central handler for all queries and commands from other
// goroutines related to peer state. // goroutines related to peer state.
func (s *server) handleQuery(querymsg interface{}, state *peerState) { func (s *server) handleQuery(querymsg interface{}, state *peerState) {
@ -416,18 +405,6 @@ func (s *server) handleQuery(querymsg interface{}, state *peerState) {
peers = append(peers, peer) peers = append(peers, peer)
} }
msg.reply <- peers msg.reply <- peers
// Request the total bytes sent and received.
case getNetTotals:
// Respond with a ....
netTotals := NetTotals{}
state.forAllPeers(func(p *peer) {
if p.Connected() {
netTotals.TotalBytesRecv += p.bytesReceived
netTotals.TotalBytesSent += p.bytesSent
}
})
msg.reply <- &netTotals
} }
} }
@ -721,10 +698,10 @@ func (s *server) RemoveAddr(addr string) error {
// NetTotals returns the sum of all bytes received and sent across the network // NetTotals returns the sum of all bytes received and sent across the network
// for all peers. // for all peers.
func (s *server) NetTotals() *NetTotals { func (s *server) NetTotals() (uint64, uint64) {
reply := make(chan *NetTotals) totalBytesReceived := atomic.LoadUint64(&s.bytesReceived)
s.query <- getNetTotals{reply: reply} totalBytesSent := atomic.LoadUint64(&s.bytesSent)
return <-reply return totalBytesReceived, totalBytesSent
} }
// Start begins accepting connections from peers. // Start begins accepting connections from peers.