Use atomic operations instead of mutexes.

This commit is contained in:
David Hill 2016-01-12 16:54:10 -05:00
parent 16582789c3
commit 383ed041ec
2 changed files with 25 additions and 40 deletions

View file

@ -387,10 +387,14 @@ type HostToNetAddrFunc func(host string, port uint16,
// of specific types that typically require common special handling are
// provided as a convenience.
type Peer struct {
started int32
connected int32
disconnect int32 // only to be used atomically
conn net.Conn
// The following variables must only be used atomically
started int32
connected int32
disconnect int32
bytesReceived uint64
bytesSent uint64
conn net.Conn
// These fields are set at creation time and never modified, so they are
// safe to read from concurrently without a mutex.
@ -423,8 +427,6 @@ type Peer struct {
timeConnected time.Time
lastSend time.Time
lastRecv time.Time
bytesReceived uint64
bytesSent uint64
startingHeight int32
lastBlock int32
lastAnnouncedBlock *wire.ShaHash
@ -505,8 +507,8 @@ func (p *Peer) StatsSnapshot() *StatsSnap {
Services: services,
LastSend: p.lastSend,
LastRecv: p.lastRecv,
BytesSent: p.bytesSent,
BytesRecv: p.bytesReceived,
BytesSent: atomic.LoadUint64(&p.bytesSent),
BytesRecv: atomic.LoadUint64(&p.bytesReceived),
ConnTime: p.timeConnected,
TimeOffset: p.timeOffset,
Version: protocolVersion,
@ -681,20 +683,14 @@ func (p *Peer) LastRecv() time.Time {
//
// This function is safe for concurrent access.
func (p *Peer) BytesSent() uint64 {
p.statsMtx.RLock()
defer p.statsMtx.RUnlock()
return p.bytesSent
return atomic.LoadUint64(&p.bytesSent)
}
// BytesReceived returns the total number of bytes received by the peer.
//
// This function is safe for concurrent access.
func (p *Peer) BytesReceived() uint64 {
p.statsMtx.RLock()
defer p.statsMtx.RUnlock()
return p.bytesReceived
return atomic.LoadUint64(&p.bytesReceived)
}
// TimeConnected returns the time at which the peer connected.
@ -1108,9 +1104,7 @@ func (p *Peer) handlePongMsg(msg *wire.MsgPong) {
func (p *Peer) readMessage() (wire.Message, []byte, error) {
n, msg, buf, err := wire.ReadMessageN(p.conn, p.ProtocolVersion(),
p.cfg.ChainParams.Net)
p.statsMtx.Lock()
p.bytesReceived += uint64(n)
p.statsMtx.Unlock()
atomic.AddUint64(&p.bytesReceived, uint64(n))
if p.cfg.Listeners.OnRead != nil {
p.cfg.Listeners.OnRead(p, n, msg, err)
}
@ -1185,9 +1179,7 @@ func (p *Peer) writeMessage(msg wire.Message) error {
// Write the message to the peer.
n, err := wire.WriteMessageN(p.conn, msg, p.ProtocolVersion(),
p.cfg.ChainParams.Net)
p.statsMtx.Lock()
p.bytesSent += uint64(n)
p.statsMtx.Unlock()
atomic.AddUint64(&p.bytesSent, uint64(n))
if p.cfg.Listeners.OnWrite != nil {
p.cfg.Listeners.OnWrite(p, n, msg, err)
}

View file

@ -166,14 +166,15 @@ func (ps *peerState) forAllPeers(closure func(sp *serverPeer)) {
// server provides a bitcoin server for handling communications to and from
// bitcoin peers.
type server struct {
// The following variables must only be used atomically.
started int32
shutdown int32
shutdownSched int32
bytesReceived uint64 // Total bytes received from all peers since start.
bytesSent uint64 // Total bytes sent by all peers since start.
listeners []net.Listener
chainParams *chaincfg.Params
started int32 // atomic
shutdown int32 // atomic
shutdownSched int32 // atomic
bytesMutex sync.Mutex // For the following two fields.
bytesReceived uint64 // Total bytes received from all peers since start.
bytesSent uint64 // Total bytes sent by all peers since start.
addrManager *addrmgr.AddrManager
sigCache *txscript.SigCache
rpcServer *rpcServer
@ -1870,28 +1871,20 @@ func (s *server) ConnectNode(addr string, permanent bool) error {
// AddBytesSent adds the passed number of bytes to the total bytes sent counter
// for the server. It is safe for concurrent access.
func (s *server) AddBytesSent(bytesSent uint64) {
s.bytesMutex.Lock()
defer s.bytesMutex.Unlock()
s.bytesSent += bytesSent
atomic.AddUint64(&s.bytesSent, bytesSent)
}
// AddBytesReceived adds the passed number of bytes to the total bytes received
// counter for the server. It is safe for concurrent access.
func (s *server) AddBytesReceived(bytesReceived uint64) {
s.bytesMutex.Lock()
defer s.bytesMutex.Unlock()
s.bytesReceived += bytesReceived
atomic.AddUint64(&s.bytesReceived, bytesReceived)
}
// NetTotals returns the sum of all bytes received and sent across the network
// for all peers. It is safe for concurrent access.
func (s *server) NetTotals() (uint64, uint64) {
s.bytesMutex.Lock()
defer s.bytesMutex.Unlock()
return s.bytesReceived, s.bytesSent
return atomic.LoadUint64(&s.bytesReceived),
atomic.LoadUint64(&s.bytesSent)
}
// UpdatePeerHeights updates the heights of all peers who have have announced