diff --git a/peer/peer.go b/peer/peer.go index d6226efb..64ab4457 100644 --- a/peer/peer.go +++ b/peer/peer.go @@ -387,10 +387,14 @@ type HostToNetAddrFunc func(host string, port uint16, // of specific types that typically require common special handling are // provided as a convenience. type Peer struct { - started int32 - connected int32 - disconnect int32 // only to be used atomically - conn net.Conn + // The following variables must only be used atomically + started int32 + connected int32 + disconnect int32 + bytesReceived uint64 + bytesSent uint64 + + conn net.Conn // These fields are set at creation time and never modified, so they are // safe to read from concurrently without a mutex. @@ -423,8 +427,6 @@ type Peer struct { timeConnected time.Time lastSend time.Time lastRecv time.Time - bytesReceived uint64 - bytesSent uint64 startingHeight int32 lastBlock int32 lastAnnouncedBlock *wire.ShaHash @@ -505,8 +507,8 @@ func (p *Peer) StatsSnapshot() *StatsSnap { Services: services, LastSend: p.lastSend, LastRecv: p.lastRecv, - BytesSent: p.bytesSent, - BytesRecv: p.bytesReceived, + BytesSent: atomic.LoadUint64(&p.bytesSent), + BytesRecv: atomic.LoadUint64(&p.bytesReceived), ConnTime: p.timeConnected, TimeOffset: p.timeOffset, Version: protocolVersion, @@ -681,20 +683,14 @@ func (p *Peer) LastRecv() time.Time { // // This function is safe for concurrent access. func (p *Peer) BytesSent() uint64 { - p.statsMtx.RLock() - defer p.statsMtx.RUnlock() - - return p.bytesSent + return atomic.LoadUint64(&p.bytesSent) } // BytesReceived returns the total number of bytes received by the peer. // // This function is safe for concurrent access. func (p *Peer) BytesReceived() uint64 { - p.statsMtx.RLock() - defer p.statsMtx.RUnlock() - - return p.bytesReceived + return atomic.LoadUint64(&p.bytesReceived) } // TimeConnected returns the time at which the peer connected. @@ -1108,9 +1104,7 @@ func (p *Peer) handlePongMsg(msg *wire.MsgPong) { func (p *Peer) readMessage() (wire.Message, []byte, error) { n, msg, buf, err := wire.ReadMessageN(p.conn, p.ProtocolVersion(), p.cfg.ChainParams.Net) - p.statsMtx.Lock() - p.bytesReceived += uint64(n) - p.statsMtx.Unlock() + atomic.AddUint64(&p.bytesReceived, uint64(n)) if p.cfg.Listeners.OnRead != nil { p.cfg.Listeners.OnRead(p, n, msg, err) } @@ -1185,9 +1179,7 @@ func (p *Peer) writeMessage(msg wire.Message) error { // Write the message to the peer. n, err := wire.WriteMessageN(p.conn, msg, p.ProtocolVersion(), p.cfg.ChainParams.Net) - p.statsMtx.Lock() - p.bytesSent += uint64(n) - p.statsMtx.Unlock() + atomic.AddUint64(&p.bytesSent, uint64(n)) if p.cfg.Listeners.OnWrite != nil { p.cfg.Listeners.OnWrite(p, n, msg, err) } diff --git a/server.go b/server.go index dff3b23c..0bb6cfa5 100644 --- a/server.go +++ b/server.go @@ -166,14 +166,15 @@ func (ps *peerState) forAllPeers(closure func(sp *serverPeer)) { // server provides a bitcoin server for handling communications to and from // bitcoin peers. type server struct { + // The following variables must only be used atomically. + started int32 + shutdown int32 + shutdownSched int32 + bytesReceived uint64 // Total bytes received from all peers since start. + bytesSent uint64 // Total bytes sent by all peers since start. + listeners []net.Listener chainParams *chaincfg.Params - started int32 // atomic - shutdown int32 // atomic - shutdownSched int32 // atomic - bytesMutex sync.Mutex // For the following two fields. - bytesReceived uint64 // Total bytes received from all peers since start. - bytesSent uint64 // Total bytes sent by all peers since start. addrManager *addrmgr.AddrManager sigCache *txscript.SigCache rpcServer *rpcServer @@ -1870,28 +1871,20 @@ func (s *server) ConnectNode(addr string, permanent bool) error { // AddBytesSent adds the passed number of bytes to the total bytes sent counter // for the server. It is safe for concurrent access. func (s *server) AddBytesSent(bytesSent uint64) { - s.bytesMutex.Lock() - defer s.bytesMutex.Unlock() - - s.bytesSent += bytesSent + atomic.AddUint64(&s.bytesSent, bytesSent) } // AddBytesReceived adds the passed number of bytes to the total bytes received // counter for the server. It is safe for concurrent access. func (s *server) AddBytesReceived(bytesReceived uint64) { - s.bytesMutex.Lock() - defer s.bytesMutex.Unlock() - - s.bytesReceived += bytesReceived + atomic.AddUint64(&s.bytesReceived, bytesReceived) } // NetTotals returns the sum of all bytes received and sent across the network // for all peers. It is safe for concurrent access. func (s *server) NetTotals() (uint64, uint64) { - s.bytesMutex.Lock() - defer s.bytesMutex.Unlock() - - return s.bytesReceived, s.bytesSent + return atomic.LoadUint64(&s.bytesReceived), + atomic.LoadUint64(&s.bytesSent) } // UpdatePeerHeights updates the heights of all peers who have have announced