From 4d44eeb877b4811b4952ccaf2bb5a2b4f654a3c3 Mon Sep 17 00:00:00 2001 From: "Owain G. Ainsworth" Date: Wed, 23 Apr 2014 17:36:33 +0100 Subject: [PATCH] Move to protecting all peer stats by the same mutex. In practise the races caused by not protecting these quite simply didn't matter, they couldn't actually cause any damage whatsoever. However, I am sick of hearing about these essentially false positivies whenever someone runs the race detector (yes, i know that race detector has no false positives but this was effectively harmess). verified to shut the detector up by dhill. --- peer.go | 79 +++++++++++++++++++++++++++++++++++-------------------- server.go | 4 +-- 2 files changed, 52 insertions(+), 31 deletions(-) diff --git a/peer.go b/peer.go index 29dc0ae3..a4ae8f0e 100644 --- a/peer.go +++ b/peer.go @@ -136,30 +136,20 @@ type outMsg struct { // to push messages to the peer. Internally they use QueueMessage. type peer struct { server *server - protocolVersion uint32 btcnet btcwire.BitcoinNet - services btcwire.ServiceFlag started int32 conn net.Conn addr string na *btcwire.NetAddress - timeConnected time.Time - lastSend time.Time - lastRecv time.Time - bytesReceived uint64 - bytesSent uint64 inbound bool connected int32 disconnect int32 // only to be used atomically persistent bool - versionKnown bool - versionMutex sync.Mutex knownAddresses map[string]bool knownInventory *MruInventoryMap knownInvMutex sync.Mutex requestedTxns map[btcwire.ShaHash]bool // owned by blockmanager requestedBlocks map[btcwire.ShaHash]bool // owned by blockmanager - lastBlock int32 retryCount int64 prevGetBlocksBegin *btcwire.ShaHash // owned by blockmanager prevGetBlocksStop *btcwire.ShaHash // owned by blockmanager @@ -175,11 +165,20 @@ type peer struct { txProcessed chan bool blockProcessed chan bool quit chan bool + StatsMtx sync.Mutex // protects all statistics below here. + versionKnown bool + protocolVersion uint32 + services btcwire.ServiceFlag + timeConnected time.Time + lastSend time.Time + lastRecv time.Time + bytesReceived uint64 + bytesSent uint64 userAgent string - pingStatsMtx sync.Mutex // protects lastPing* - lastPingNonce uint64 // Set to nonce if we have a pending ping. - lastPingTime time.Time // Time we sent last ping. - lastPingMicros int64 // Time for last ping to return. + lastBlock int32 + lastPingNonce uint64 // Set to nonce if we have a pending ping. + lastPingTime time.Time // Time we sent last ping. + lastPingMicros int64 // Time for last ping to return. } // String returns the peer's address and directionality as a human-readable @@ -212,12 +211,21 @@ func (p *peer) AddKnownInventory(invVect *btcwire.InvVect) { // VersionKnown returns the whether or not the version of a peer is known locally. // It is safe for concurrent access. func (p *peer) VersionKnown() bool { - p.versionMutex.Lock() - defer p.versionMutex.Unlock() + p.StatsMtx.Lock() + defer p.StatsMtx.Unlock() return p.versionKnown } +// ProtocolVersion returns the peer protocol version in a manner that is safe +// for concurrent access. +func (p *peer) ProtocolVersion() uint32 { + p.StatsMtx.Lock() + defer p.StatsMtx.Unlock() + + return p.protocolVersion +} + // pushVersionMsg sends a version message to the connected peer using the // current state. func (p *peer) pushVersionMsg() error { @@ -287,12 +295,12 @@ func (p *peer) handleVersionMsg(msg *btcwire.MsgVersion) { return } + p.StatsMtx.Lock() // Updating a bunch of stats. // Limit to one version message per peer. - p.versionMutex.Lock() if p.versionKnown { p.logError("Only one version message per peer is allowed %s.", p) - p.versionMutex.Unlock() + p.StatsMtx.Unlock() p.Disconnect() return } @@ -300,7 +308,6 @@ func (p *peer) handleVersionMsg(msg *btcwire.MsgVersion) { // Negotiate the protocol version. p.protocolVersion = minUint32(p.protocolVersion, uint32(msg.ProtocolVersion)) p.versionKnown = true - p.versionMutex.Unlock() peerLog.Debugf("Negotiated protocol version %d for peer %s", p.protocolVersion, p) p.lastBlock = msg.LastBlock @@ -312,6 +319,8 @@ func (p *peer) handleVersionMsg(msg *btcwire.MsgVersion) { // Set the remote peer's user agent. p.userAgent = msg.UserAgent + p.StatsMtx.Unlock() + // Inbound connections. if p.inbound { // Set up a NetAddress for the peer to be used with AddrManager. @@ -354,7 +363,8 @@ func (p *peer) handleVersionMsg(msg *btcwire.MsgVersion) { // Request known addresses if the server address manager needs // more and the peer has a protocol version new enough to // include a timestamp with addresses. - hasTimestamp := p.protocolVersion >= btcwire.NetAddressTimeVersion + hasTimestamp := p.ProtocolVersion() >= + btcwire.NetAddressTimeVersion if p.server.addrManager.NeedMoreAddresses() && hasTimestamp { p.QueueMessage(btcwire.NewMsgGetAddr(), nil) } @@ -908,7 +918,7 @@ func (p *peer) pushAddrMsg(addresses []*btcwire.NetAddress) error { // is used to notify the server about advertised addresses. func (p *peer) handleAddrMsg(msg *btcwire.MsgAddr) { // Ignore old style addresses which don't include a timestamp. - if p.protocolVersion < btcwire.NetAddressTimeVersion { + if p.ProtocolVersion() < btcwire.NetAddressTimeVersion { return } @@ -952,7 +962,7 @@ func (p *peer) handleAddrMsg(msg *btcwire.MsgAddr) { // is considered a successful ping. func (p *peer) handlePingMsg(msg *btcwire.MsgPing) { // Only Reply with pong is message comes from a new enough client. - if p.protocolVersion > btcwire.BIP0031Version { + if p.ProtocolVersion() > btcwire.BIP0031Version { // Include nonce from ping so pong can be identified. p.QueueMessage(btcwire.NewMsgPong(msg.Nonce), nil) } @@ -963,8 +973,8 @@ func (p *peer) handlePingMsg(msg *btcwire.MsgPing) { // previosuly we update our ping time statistics. If the client is too old or // we had not send a ping we ignore it. func (p *peer) handlePongMsg(msg *btcwire.MsgPong) { - p.pingStatsMtx.Lock() - defer p.pingStatsMtx.Unlock() + p.StatsMtx.Lock() + defer p.StatsMtx.Unlock() // Arguably we could use a buffered channel here sending data // in a fifo manner whenever we send a ping, or a list keeping track of @@ -984,8 +994,11 @@ func (p *peer) handlePongMsg(msg *btcwire.MsgPong) { // readMessage reads the next bitcoin message from the peer with logging. func (p *peer) readMessage() (btcwire.Message, []byte, error) { - n, msg, buf, err := btcwire.ReadMessageN(p.conn, p.protocolVersion, p.btcnet) + n, msg, buf, err := btcwire.ReadMessageN(p.conn, p.ProtocolVersion(), + p.btcnet) + p.StatsMtx.Lock() p.bytesReceived += uint64(n) + p.StatsMtx.Unlock() p.server.AddBytesReceived(uint64(n)) if err != nil { return nil, nil, err @@ -1045,7 +1058,8 @@ func (p *peer) writeMessage(msg btcwire.Message) { })) peerLog.Tracef("%v", newLogClosure(func() string { var buf bytes.Buffer - err := btcwire.WriteMessage(&buf, msg, p.protocolVersion, p.btcnet) + err := btcwire.WriteMessage(&buf, msg, p.ProtocolVersion(), + p.btcnet) if err != nil { return err.Error() } @@ -1053,8 +1067,11 @@ func (p *peer) writeMessage(msg btcwire.Message) { })) // Write the message to the peer. - n, err := btcwire.WriteMessageN(p.conn, msg, p.protocolVersion, p.btcnet) + n, err := btcwire.WriteMessageN(p.conn, msg, p.ProtocolVersion(), + p.btcnet) + p.StatsMtx.Lock() p.bytesSent += uint64(n) + p.StatsMtx.Unlock() p.server.AddBytesSent(uint64(n)) if err != nil { p.Disconnect() @@ -1126,7 +1143,9 @@ out: } break out } + p.StatsMtx.Lock() p.lastRecv = time.Now() + p.StatsMtx.Unlock() // Ensure version message comes first. if _, ok := rmsg.(*btcwire.MsgVersion); !ok && !p.VersionKnown() { @@ -1395,12 +1414,12 @@ out: case *btcwire.MsgPing: // expects pong // Also set up statistics. - p.pingStatsMtx.Lock() + p.StatsMtx.Lock() if p.protocolVersion > btcwire.BIP0031Version { p.lastPingNonce = m.Nonce p.lastPingTime = time.Now() } - p.pingStatsMtx.Unlock() + p.StatsMtx.Unlock() case *btcwire.MsgMemPool: // Should return an inv. case *btcwire.MsgGetData: @@ -1418,7 +1437,9 @@ out: pingTimer.Reset(pingTimeoutMinutes * time.Minute) } p.writeMessage(msg.msg) + p.StatsMtx.Lock() p.lastSend = time.Now() + p.StatsMtx.Unlock() if msg.doneChan != nil { msg.doneChan <- true } diff --git a/server.go b/server.go index a4591f72..5592912c 100644 --- a/server.go +++ b/server.go @@ -352,6 +352,7 @@ func (s *server) handleQuery(querymsg interface{}, state *peerState) { // however it is statistics for purely informational purposes // and we don't really care if they are raced to get the new // version. + p.StatsMtx.Lock() info := &btcjson.GetPeerInfoResult{ Addr: p.addr, Services: fmt.Sprintf("%08d", p.services), @@ -367,14 +368,13 @@ func (s *server) handleQuery(querymsg interface{}, state *peerState) { BanScore: 0, SyncNode: p == syncPeer, } - p.pingStatsMtx.Lock() info.PingTime = p.lastPingMicros if p.lastPingNonce != 0 { wait := time.Now().Sub(p.lastPingTime).Nanoseconds() // We actually want microseconds. info.PingWait = wait / 1000 } - p.pingStatsMtx.Unlock() + p.StatsMtx.Unlock() infos = append(infos, info) }) msg.reply <- infos