Move to protecting all peer stats by the same mutex.

In practise the races caused by not protecting these quite simply didn't
matter, they couldn't actually cause any damage whatsoever. However, I
am sick of hearing about these essentially false positivies whenever
someone runs the race detector (yes, i know that race detector has no
false positives but this was effectively harmess).

verified to shut the detector up by dhill.
This commit is contained in:
Owain G. Ainsworth 2014-04-23 17:36:33 +01:00
parent 88ea84cf12
commit 4d44eeb877
2 changed files with 52 additions and 31 deletions

79
peer.go
View file

@ -136,30 +136,20 @@ type outMsg struct {
// to push messages to the peer. Internally they use QueueMessage. // to push messages to the peer. Internally they use QueueMessage.
type peer struct { type peer struct {
server *server server *server
protocolVersion uint32
btcnet btcwire.BitcoinNet btcnet btcwire.BitcoinNet
services btcwire.ServiceFlag
started int32 started int32
conn net.Conn conn net.Conn
addr string addr string
na *btcwire.NetAddress na *btcwire.NetAddress
timeConnected time.Time
lastSend time.Time
lastRecv time.Time
bytesReceived uint64
bytesSent uint64
inbound bool inbound bool
connected int32 connected int32
disconnect int32 // only to be used atomically disconnect int32 // only to be used atomically
persistent bool persistent bool
versionKnown bool
versionMutex sync.Mutex
knownAddresses map[string]bool knownAddresses map[string]bool
knownInventory *MruInventoryMap knownInventory *MruInventoryMap
knownInvMutex sync.Mutex knownInvMutex sync.Mutex
requestedTxns map[btcwire.ShaHash]bool // owned by blockmanager requestedTxns map[btcwire.ShaHash]bool // owned by blockmanager
requestedBlocks map[btcwire.ShaHash]bool // owned by blockmanager requestedBlocks map[btcwire.ShaHash]bool // owned by blockmanager
lastBlock int32
retryCount int64 retryCount int64
prevGetBlocksBegin *btcwire.ShaHash // owned by blockmanager prevGetBlocksBegin *btcwire.ShaHash // owned by blockmanager
prevGetBlocksStop *btcwire.ShaHash // owned by blockmanager prevGetBlocksStop *btcwire.ShaHash // owned by blockmanager
@ -175,11 +165,20 @@ type peer struct {
txProcessed chan bool txProcessed chan bool
blockProcessed chan bool blockProcessed chan bool
quit chan bool quit chan bool
StatsMtx sync.Mutex // protects all statistics below here.
versionKnown bool
protocolVersion uint32
services btcwire.ServiceFlag
timeConnected time.Time
lastSend time.Time
lastRecv time.Time
bytesReceived uint64
bytesSent uint64
userAgent string userAgent string
pingStatsMtx sync.Mutex // protects lastPing* lastBlock int32
lastPingNonce uint64 // Set to nonce if we have a pending ping. lastPingNonce uint64 // Set to nonce if we have a pending ping.
lastPingTime time.Time // Time we sent last ping. lastPingTime time.Time // Time we sent last ping.
lastPingMicros int64 // Time for last ping to return. lastPingMicros int64 // Time for last ping to return.
} }
// String returns the peer's address and directionality as a human-readable // String returns the peer's address and directionality as a human-readable
@ -212,12 +211,21 @@ func (p *peer) AddKnownInventory(invVect *btcwire.InvVect) {
// VersionKnown returns the whether or not the version of a peer is known locally. // VersionKnown returns the whether or not the version of a peer is known locally.
// It is safe for concurrent access. // It is safe for concurrent access.
func (p *peer) VersionKnown() bool { func (p *peer) VersionKnown() bool {
p.versionMutex.Lock() p.StatsMtx.Lock()
defer p.versionMutex.Unlock() defer p.StatsMtx.Unlock()
return p.versionKnown return p.versionKnown
} }
// ProtocolVersion returns the peer protocol version in a manner that is safe
// for concurrent access.
func (p *peer) ProtocolVersion() uint32 {
p.StatsMtx.Lock()
defer p.StatsMtx.Unlock()
return p.protocolVersion
}
// pushVersionMsg sends a version message to the connected peer using the // pushVersionMsg sends a version message to the connected peer using the
// current state. // current state.
func (p *peer) pushVersionMsg() error { func (p *peer) pushVersionMsg() error {
@ -287,12 +295,12 @@ func (p *peer) handleVersionMsg(msg *btcwire.MsgVersion) {
return return
} }
p.StatsMtx.Lock() // Updating a bunch of stats.
// Limit to one version message per peer. // Limit to one version message per peer.
p.versionMutex.Lock()
if p.versionKnown { if p.versionKnown {
p.logError("Only one version message per peer is allowed %s.", p.logError("Only one version message per peer is allowed %s.",
p) p)
p.versionMutex.Unlock() p.StatsMtx.Unlock()
p.Disconnect() p.Disconnect()
return return
} }
@ -300,7 +308,6 @@ func (p *peer) handleVersionMsg(msg *btcwire.MsgVersion) {
// Negotiate the protocol version. // Negotiate the protocol version.
p.protocolVersion = minUint32(p.protocolVersion, uint32(msg.ProtocolVersion)) p.protocolVersion = minUint32(p.protocolVersion, uint32(msg.ProtocolVersion))
p.versionKnown = true p.versionKnown = true
p.versionMutex.Unlock()
peerLog.Debugf("Negotiated protocol version %d for peer %s", peerLog.Debugf("Negotiated protocol version %d for peer %s",
p.protocolVersion, p) p.protocolVersion, p)
p.lastBlock = msg.LastBlock p.lastBlock = msg.LastBlock
@ -312,6 +319,8 @@ func (p *peer) handleVersionMsg(msg *btcwire.MsgVersion) {
// Set the remote peer's user agent. // Set the remote peer's user agent.
p.userAgent = msg.UserAgent p.userAgent = msg.UserAgent
p.StatsMtx.Unlock()
// Inbound connections. // Inbound connections.
if p.inbound { if p.inbound {
// Set up a NetAddress for the peer to be used with AddrManager. // Set up a NetAddress for the peer to be used with AddrManager.
@ -354,7 +363,8 @@ func (p *peer) handleVersionMsg(msg *btcwire.MsgVersion) {
// Request known addresses if the server address manager needs // Request known addresses if the server address manager needs
// more and the peer has a protocol version new enough to // more and the peer has a protocol version new enough to
// include a timestamp with addresses. // include a timestamp with addresses.
hasTimestamp := p.protocolVersion >= btcwire.NetAddressTimeVersion hasTimestamp := p.ProtocolVersion() >=
btcwire.NetAddressTimeVersion
if p.server.addrManager.NeedMoreAddresses() && hasTimestamp { if p.server.addrManager.NeedMoreAddresses() && hasTimestamp {
p.QueueMessage(btcwire.NewMsgGetAddr(), nil) p.QueueMessage(btcwire.NewMsgGetAddr(), nil)
} }
@ -908,7 +918,7 @@ func (p *peer) pushAddrMsg(addresses []*btcwire.NetAddress) error {
// is used to notify the server about advertised addresses. // is used to notify the server about advertised addresses.
func (p *peer) handleAddrMsg(msg *btcwire.MsgAddr) { func (p *peer) handleAddrMsg(msg *btcwire.MsgAddr) {
// Ignore old style addresses which don't include a timestamp. // Ignore old style addresses which don't include a timestamp.
if p.protocolVersion < btcwire.NetAddressTimeVersion { if p.ProtocolVersion() < btcwire.NetAddressTimeVersion {
return return
} }
@ -952,7 +962,7 @@ func (p *peer) handleAddrMsg(msg *btcwire.MsgAddr) {
// is considered a successful ping. // is considered a successful ping.
func (p *peer) handlePingMsg(msg *btcwire.MsgPing) { func (p *peer) handlePingMsg(msg *btcwire.MsgPing) {
// Only Reply with pong is message comes from a new enough client. // Only Reply with pong is message comes from a new enough client.
if p.protocolVersion > btcwire.BIP0031Version { if p.ProtocolVersion() > btcwire.BIP0031Version {
// Include nonce from ping so pong can be identified. // Include nonce from ping so pong can be identified.
p.QueueMessage(btcwire.NewMsgPong(msg.Nonce), nil) p.QueueMessage(btcwire.NewMsgPong(msg.Nonce), nil)
} }
@ -963,8 +973,8 @@ func (p *peer) handlePingMsg(msg *btcwire.MsgPing) {
// previosuly we update our ping time statistics. If the client is too old or // previosuly we update our ping time statistics. If the client is too old or
// we had not send a ping we ignore it. // we had not send a ping we ignore it.
func (p *peer) handlePongMsg(msg *btcwire.MsgPong) { func (p *peer) handlePongMsg(msg *btcwire.MsgPong) {
p.pingStatsMtx.Lock() p.StatsMtx.Lock()
defer p.pingStatsMtx.Unlock() defer p.StatsMtx.Unlock()
// Arguably we could use a buffered channel here sending data // Arguably we could use a buffered channel here sending data
// in a fifo manner whenever we send a ping, or a list keeping track of // in a fifo manner whenever we send a ping, or a list keeping track of
@ -984,8 +994,11 @@ func (p *peer) handlePongMsg(msg *btcwire.MsgPong) {
// readMessage reads the next bitcoin message from the peer with logging. // readMessage reads the next bitcoin message from the peer with logging.
func (p *peer) readMessage() (btcwire.Message, []byte, error) { func (p *peer) readMessage() (btcwire.Message, []byte, error) {
n, msg, buf, err := btcwire.ReadMessageN(p.conn, p.protocolVersion, p.btcnet) n, msg, buf, err := btcwire.ReadMessageN(p.conn, p.ProtocolVersion(),
p.btcnet)
p.StatsMtx.Lock()
p.bytesReceived += uint64(n) p.bytesReceived += uint64(n)
p.StatsMtx.Unlock()
p.server.AddBytesReceived(uint64(n)) p.server.AddBytesReceived(uint64(n))
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
@ -1045,7 +1058,8 @@ func (p *peer) writeMessage(msg btcwire.Message) {
})) }))
peerLog.Tracef("%v", newLogClosure(func() string { peerLog.Tracef("%v", newLogClosure(func() string {
var buf bytes.Buffer var buf bytes.Buffer
err := btcwire.WriteMessage(&buf, msg, p.protocolVersion, p.btcnet) err := btcwire.WriteMessage(&buf, msg, p.ProtocolVersion(),
p.btcnet)
if err != nil { if err != nil {
return err.Error() return err.Error()
} }
@ -1053,8 +1067,11 @@ func (p *peer) writeMessage(msg btcwire.Message) {
})) }))
// Write the message to the peer. // Write the message to the peer.
n, err := btcwire.WriteMessageN(p.conn, msg, p.protocolVersion, p.btcnet) n, err := btcwire.WriteMessageN(p.conn, msg, p.ProtocolVersion(),
p.btcnet)
p.StatsMtx.Lock()
p.bytesSent += uint64(n) p.bytesSent += uint64(n)
p.StatsMtx.Unlock()
p.server.AddBytesSent(uint64(n)) p.server.AddBytesSent(uint64(n))
if err != nil { if err != nil {
p.Disconnect() p.Disconnect()
@ -1126,7 +1143,9 @@ out:
} }
break out break out
} }
p.StatsMtx.Lock()
p.lastRecv = time.Now() p.lastRecv = time.Now()
p.StatsMtx.Unlock()
// Ensure version message comes first. // Ensure version message comes first.
if _, ok := rmsg.(*btcwire.MsgVersion); !ok && !p.VersionKnown() { if _, ok := rmsg.(*btcwire.MsgVersion); !ok && !p.VersionKnown() {
@ -1395,12 +1414,12 @@ out:
case *btcwire.MsgPing: case *btcwire.MsgPing:
// expects pong // expects pong
// Also set up statistics. // Also set up statistics.
p.pingStatsMtx.Lock() p.StatsMtx.Lock()
if p.protocolVersion > btcwire.BIP0031Version { if p.protocolVersion > btcwire.BIP0031Version {
p.lastPingNonce = m.Nonce p.lastPingNonce = m.Nonce
p.lastPingTime = time.Now() p.lastPingTime = time.Now()
} }
p.pingStatsMtx.Unlock() p.StatsMtx.Unlock()
case *btcwire.MsgMemPool: case *btcwire.MsgMemPool:
// Should return an inv. // Should return an inv.
case *btcwire.MsgGetData: case *btcwire.MsgGetData:
@ -1418,7 +1437,9 @@ out:
pingTimer.Reset(pingTimeoutMinutes * time.Minute) pingTimer.Reset(pingTimeoutMinutes * time.Minute)
} }
p.writeMessage(msg.msg) p.writeMessage(msg.msg)
p.StatsMtx.Lock()
p.lastSend = time.Now() p.lastSend = time.Now()
p.StatsMtx.Unlock()
if msg.doneChan != nil { if msg.doneChan != nil {
msg.doneChan <- true msg.doneChan <- true
} }

View file

@ -352,6 +352,7 @@ func (s *server) handleQuery(querymsg interface{}, state *peerState) {
// however it is statistics for purely informational purposes // however it is statistics for purely informational purposes
// and we don't really care if they are raced to get the new // and we don't really care if they are raced to get the new
// version. // version.
p.StatsMtx.Lock()
info := &btcjson.GetPeerInfoResult{ info := &btcjson.GetPeerInfoResult{
Addr: p.addr, Addr: p.addr,
Services: fmt.Sprintf("%08d", p.services), Services: fmt.Sprintf("%08d", p.services),
@ -367,14 +368,13 @@ func (s *server) handleQuery(querymsg interface{}, state *peerState) {
BanScore: 0, BanScore: 0,
SyncNode: p == syncPeer, SyncNode: p == syncPeer,
} }
p.pingStatsMtx.Lock()
info.PingTime = p.lastPingMicros info.PingTime = p.lastPingMicros
if p.lastPingNonce != 0 { if p.lastPingNonce != 0 {
wait := time.Now().Sub(p.lastPingTime).Nanoseconds() wait := time.Now().Sub(p.lastPingTime).Nanoseconds()
// We actually want microseconds. // We actually want microseconds.
info.PingWait = wait / 1000 info.PingWait = wait / 1000
} }
p.pingStatsMtx.Unlock() p.StatsMtx.Unlock()
infos = append(infos, info) infos = append(infos, info)
}) })
msg.reply <- infos msg.reply <- infos