peer: Minor function definition order cleanup.

This rearranges some of the function definitions that pertain to initial
peer version negotiation and bringup so they are more consistent with
the preferred order used throughout the codebase.  In particular, the
functions are defined before they're first used and generally as close
as possible to the first use when they're defined in the same file.

There are no functional changes.

Backported from Decred.
This commit is contained in:
Dave Collins 2018-08-10 20:03:20 -05:00
parent f899737d7f
commit fd78330fd3
No known key found for this signature in database
GPG key ID: B8904D9D9C93D1F2

View file

@ -1,4 +1,5 @@
// Copyright (c) 2013-2016 The btcsuite developers
// Copyright (c) 2013-2018 The btcsuite developers
// Copyright (c) 2016-2018 The Decred developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
@ -801,85 +802,6 @@ func (p *Peer) IsWitnessEnabled() bool {
return witnessEnabled
}
// localVersionMsg creates a version message that can be used to send to the
// remote peer.
func (p *Peer) localVersionMsg() (*wire.MsgVersion, error) {
var blockNum int32
if p.cfg.NewestBlock != nil {
var err error
_, blockNum, err = p.cfg.NewestBlock()
if err != nil {
return nil, err
}
}
theirNA := p.na
// If we are behind a proxy and the connection comes from the proxy then
// we return an unroutable address as their address. This is to prevent
// leaking the tor proxy address.
if p.cfg.Proxy != "" {
proxyaddress, _, err := net.SplitHostPort(p.cfg.Proxy)
// invalid proxy means poorly configured, be on the safe side.
if err != nil || p.na.IP.String() == proxyaddress {
theirNA = wire.NewNetAddressIPPort(net.IP([]byte{0, 0, 0, 0}), 0, 0)
}
}
// Create a wire.NetAddress with only the services set to use as the
// "addrme" in the version message.
//
// Older nodes previously added the IP and port information to the
// address manager which proved to be unreliable as an inbound
// connection from a peer didn't necessarily mean the peer itself
// accepted inbound connections.
//
// Also, the timestamp is unused in the version message.
ourNA := &wire.NetAddress{
Services: p.cfg.Services,
}
// Generate a unique nonce for this peer so self connections can be
// detected. This is accomplished by adding it to a size-limited map of
// recently seen nonces.
nonce := uint64(rand.Int63())
sentNonces.Add(nonce)
// Version message.
msg := wire.NewMsgVersion(ourNA, theirNA, nonce, blockNum)
msg.AddUserAgent(p.cfg.UserAgentName, p.cfg.UserAgentVersion,
p.cfg.UserAgentComments...)
// XXX: bitcoind appears to always enable the full node services flag
// of the remote peer netaddress field in the version message regardless
// of whether it knows it supports it or not. Also, bitcoind sets
// the services field of the local peer to 0 regardless of support.
//
// Realistically, this should be set as follows:
// - For outgoing connections:
// - Set the local netaddress services to what the local peer
// actually supports
// - Set the remote netaddress services to 0 to indicate no services
// as they are still unknown
// - For incoming connections:
// - Set the local netaddress services to what the local peer
// actually supports
// - Set the remote netaddress services to the what was advertised by
// by the remote peer in its version message
msg.AddrYou.Services = wire.SFNodeNetwork
// Advertise the services flag
msg.Services = p.cfg.Services
// Advertise our max supported protocol version.
msg.ProtocolVersion = int32(p.cfg.ProtocolVersion)
// Advertise if inv messages for transactions are desired.
msg.DisableRelayTx = p.cfg.DisableRelayTx
return msg, nil
}
// PushAddrMsg sends an addr message to the connected peer using the provided
// addresses. This function is useful over manually sending the message via
// QueueMessage since it automatically limits the addresses to the maximum
@ -1041,72 +963,6 @@ func (p *Peer) PushRejectMsg(command string, code wire.RejectCode, reason string
<-doneChan
}
// handleRemoteVersionMsg is invoked when a version bitcoin message is received
// from the remote peer. It will return an error if the remote peer's version
// is not compatible with ours.
func (p *Peer) handleRemoteVersionMsg(msg *wire.MsgVersion) error {
// Detect self connections.
if !allowSelfConns && sentNonces.Exists(msg.Nonce) {
return errors.New("disconnecting peer connected to self")
}
// Notify and disconnect clients that have a protocol version that is
// too old.
//
// NOTE: If minAcceptableProtocolVersion is raised to be higher than
// wire.RejectVersion, this should send a reject packet before
// disconnecting.
if uint32(msg.ProtocolVersion) < minAcceptableProtocolVersion {
reason := fmt.Sprintf("protocol version must be %d or greater",
minAcceptableProtocolVersion)
return errors.New(reason)
}
// Updating a bunch of stats including block based stats, and the
// peer's time offset.
p.statsMtx.Lock()
p.lastBlock = msg.LastBlock
p.startingHeight = msg.LastBlock
p.timeOffset = msg.Timestamp.Unix() - time.Now().Unix()
p.statsMtx.Unlock()
// Negotiate the protocol version.
p.flagsMtx.Lock()
p.advertisedProtoVer = uint32(msg.ProtocolVersion)
p.protocolVersion = minUint32(p.protocolVersion, p.advertisedProtoVer)
p.versionKnown = true
log.Debugf("Negotiated protocol version %d for peer %s",
p.protocolVersion, p)
// Set the peer's ID.
p.id = atomic.AddInt32(&nodeCount, 1)
// Set the supported services for the peer to what the remote peer
// advertised.
p.services = msg.Services
// Set the remote peer's user agent.
p.userAgent = msg.UserAgent
// Determine if the peer would like to receive witness data with
// transactions, or not.
if p.services&wire.SFNodeWitness == wire.SFNodeWitness {
p.witnessEnabled = true
}
p.flagsMtx.Unlock()
// Once the version message has been exchanged, we're able to determine
// if this peer knows how to encode witness data over the wire
// protocol. If so, then we'll switch to a decoding mode which is
// prepared for the new transaction format introduced as part of
// BIP0144.
if p.services&wire.SFNodeWitness == wire.SFNodeWitness {
p.wireEncoding = wire.WitnessEncoding
}
return nil
}
// handlePingMsg is invoked when a peer receives a ping bitcoin message. For
// recent clients (protocol version > BIP0031Version), it replies with a pong
// message. For older clients, it does nothing and anything other than failure
@ -1992,40 +1848,6 @@ func (p *Peer) QueueInventory(invVect *wire.InvVect) {
p.outputInvChan <- invVect
}
// AssociateConnection associates the given conn to the peer. Calling this
// function when the peer is already connected will have no effect.
func (p *Peer) AssociateConnection(conn net.Conn) {
// Already connected?
if !atomic.CompareAndSwapInt32(&p.connected, 0, 1) {
return
}
p.conn = conn
p.timeConnected = time.Now()
if p.inbound {
p.addr = p.conn.RemoteAddr().String()
// Set up a NetAddress for the peer to be used with AddrManager. We
// only do this inbound because outbound set this up at connection time
// and no point recomputing.
na, err := newNetAddress(p.conn.RemoteAddr(), p.services)
if err != nil {
log.Errorf("Cannot create remote net address: %v", err)
p.Disconnect()
return
}
p.na = na
}
go func() {
if err := p.start(); err != nil {
log.Debugf("Cannot start peer %v: %v", p, err)
p.Disconnect()
}
}()
}
// Connected returns whether or not the peer is currently connected.
//
// This function is safe for concurrent access.
@ -2049,6 +1871,213 @@ func (p *Peer) Disconnect() {
close(p.quit)
}
// handleRemoteVersionMsg is invoked when a version bitcoin message is received
// from the remote peer. It will return an error if the remote peer's version
// is not compatible with ours.
func (p *Peer) handleRemoteVersionMsg(msg *wire.MsgVersion) error {
// Detect self connections.
if !allowSelfConns && sentNonces.Exists(msg.Nonce) {
return errors.New("disconnecting peer connected to self")
}
// Notify and disconnect clients that have a protocol version that is
// too old.
//
// NOTE: If minAcceptableProtocolVersion is raised to be higher than
// wire.RejectVersion, this should send a reject packet before
// disconnecting.
if uint32(msg.ProtocolVersion) < minAcceptableProtocolVersion {
reason := fmt.Sprintf("protocol version must be %d or greater",
minAcceptableProtocolVersion)
return errors.New(reason)
}
// Updating a bunch of stats including block based stats, and the
// peer's time offset.
p.statsMtx.Lock()
p.lastBlock = msg.LastBlock
p.startingHeight = msg.LastBlock
p.timeOffset = msg.Timestamp.Unix() - time.Now().Unix()
p.statsMtx.Unlock()
// Negotiate the protocol version.
p.flagsMtx.Lock()
p.advertisedProtoVer = uint32(msg.ProtocolVersion)
p.protocolVersion = minUint32(p.protocolVersion, p.advertisedProtoVer)
p.versionKnown = true
log.Debugf("Negotiated protocol version %d for peer %s",
p.protocolVersion, p)
// Set the peer's ID.
p.id = atomic.AddInt32(&nodeCount, 1)
// Set the supported services for the peer to what the remote peer
// advertised.
p.services = msg.Services
// Set the remote peer's user agent.
p.userAgent = msg.UserAgent
// Determine if the peer would like to receive witness data with
// transactions, or not.
if p.services&wire.SFNodeWitness == wire.SFNodeWitness {
p.witnessEnabled = true
}
p.flagsMtx.Unlock()
// Once the version message has been exchanged, we're able to determine
// if this peer knows how to encode witness data over the wire
// protocol. If so, then we'll switch to a decoding mode which is
// prepared for the new transaction format introduced as part of
// BIP0144.
if p.services&wire.SFNodeWitness == wire.SFNodeWitness {
p.wireEncoding = wire.WitnessEncoding
}
return nil
}
// readRemoteVersionMsg waits for the next message to arrive from the remote
// peer. If the next message is not a version message or the version is not
// acceptable then return an error.
func (p *Peer) readRemoteVersionMsg() error {
// Read their version message.
msg, _, err := p.readMessage(wire.LatestEncoding)
if err != nil {
return err
}
remoteVerMsg, ok := msg.(*wire.MsgVersion)
if !ok {
errStr := "A version message must precede all others"
log.Errorf(errStr)
rejectMsg := wire.NewMsgReject(msg.Command(), wire.RejectMalformed,
errStr)
return p.writeMessage(rejectMsg, wire.LatestEncoding)
}
if err := p.handleRemoteVersionMsg(remoteVerMsg); err != nil {
return err
}
if p.cfg.Listeners.OnVersion != nil {
p.cfg.Listeners.OnVersion(p, remoteVerMsg)
}
return nil
}
// localVersionMsg creates a version message that can be used to send to the
// remote peer.
func (p *Peer) localVersionMsg() (*wire.MsgVersion, error) {
var blockNum int32
if p.cfg.NewestBlock != nil {
var err error
_, blockNum, err = p.cfg.NewestBlock()
if err != nil {
return nil, err
}
}
theirNA := p.na
// If we are behind a proxy and the connection comes from the proxy then
// we return an unroutable address as their address. This is to prevent
// leaking the tor proxy address.
if p.cfg.Proxy != "" {
proxyaddress, _, err := net.SplitHostPort(p.cfg.Proxy)
// invalid proxy means poorly configured, be on the safe side.
if err != nil || p.na.IP.String() == proxyaddress {
theirNA = wire.NewNetAddressIPPort(net.IP([]byte{0, 0, 0, 0}), 0, 0)
}
}
// Create a wire.NetAddress with only the services set to use as the
// "addrme" in the version message.
//
// Older nodes previously added the IP and port information to the
// address manager which proved to be unreliable as an inbound
// connection from a peer didn't necessarily mean the peer itself
// accepted inbound connections.
//
// Also, the timestamp is unused in the version message.
ourNA := &wire.NetAddress{
Services: p.cfg.Services,
}
// Generate a unique nonce for this peer so self connections can be
// detected. This is accomplished by adding it to a size-limited map of
// recently seen nonces.
nonce := uint64(rand.Int63())
sentNonces.Add(nonce)
// Version message.
msg := wire.NewMsgVersion(ourNA, theirNA, nonce, blockNum)
msg.AddUserAgent(p.cfg.UserAgentName, p.cfg.UserAgentVersion,
p.cfg.UserAgentComments...)
// XXX: bitcoind appears to always enable the full node services flag
// of the remote peer netaddress field in the version message regardless
// of whether it knows it supports it or not. Also, bitcoind sets
// the services field of the local peer to 0 regardless of support.
//
// Realistically, this should be set as follows:
// - For outgoing connections:
// - Set the local netaddress services to what the local peer
// actually supports
// - Set the remote netaddress services to 0 to indicate no services
// as they are still unknown
// - For incoming connections:
// - Set the local netaddress services to what the local peer
// actually supports
// - Set the remote netaddress services to the what was advertised by
// by the remote peer in its version message
msg.AddrYou.Services = wire.SFNodeNetwork
// Advertise the services flag
msg.Services = p.cfg.Services
// Advertise our max supported protocol version.
msg.ProtocolVersion = int32(p.cfg.ProtocolVersion)
// Advertise if inv messages for transactions are desired.
msg.DisableRelayTx = p.cfg.DisableRelayTx
return msg, nil
}
// writeLocalVersionMsg writes our version message to the remote peer.
func (p *Peer) writeLocalVersionMsg() error {
localVerMsg, err := p.localVersionMsg()
if err != nil {
return err
}
return p.writeMessage(localVerMsg, wire.LatestEncoding)
}
// negotiateInboundProtocol waits to receive a version message from the peer
// then sends our version message. If the events do not occur in that order then
// it returns an error.
func (p *Peer) negotiateInboundProtocol() error {
if err := p.readRemoteVersionMsg(); err != nil {
return err
}
return p.writeLocalVersionMsg()
}
// negotiateOutboundProtocol sends our version message then waits to receive a
// version message from the peer. If the events do not occur in that order then
// it returns an error.
func (p *Peer) negotiateOutboundProtocol() error {
if err := p.writeLocalVersionMsg(); err != nil {
return err
}
return p.readRemoteVersionMsg()
}
// start begins processing input and output messages.
func (p *Peer) start() error {
log.Tracef("Starting peer %s", p)
@ -2086,6 +2115,40 @@ func (p *Peer) start() error {
return nil
}
// AssociateConnection associates the given conn to the peer. Calling this
// function when the peer is already connected will have no effect.
func (p *Peer) AssociateConnection(conn net.Conn) {
// Already connected?
if !atomic.CompareAndSwapInt32(&p.connected, 0, 1) {
return
}
p.conn = conn
p.timeConnected = time.Now()
if p.inbound {
p.addr = p.conn.RemoteAddr().String()
// Set up a NetAddress for the peer to be used with AddrManager. We
// only do this inbound because outbound set this up at connection time
// and no point recomputing.
na, err := newNetAddress(p.conn.RemoteAddr(), p.services)
if err != nil {
log.Errorf("Cannot create remote net address: %v", err)
p.Disconnect()
return
}
p.na = na
}
go func() {
if err := p.start(); err != nil {
log.Debugf("Cannot start peer %v: %v", p, err)
p.Disconnect()
}
}()
}
// WaitForDisconnect waits until the peer has completely disconnected and all
// resources are cleaned up. This will happen if either the local or remote
// side has been disconnected or the peer is forcibly disconnected via
@ -2094,68 +2157,6 @@ func (p *Peer) WaitForDisconnect() {
<-p.quit
}
// readRemoteVersionMsg waits for the next message to arrive from the remote
// peer. If the next message is not a version message or the version is not
// acceptable then return an error.
func (p *Peer) readRemoteVersionMsg() error {
// Read their version message.
msg, _, err := p.readMessage(wire.LatestEncoding)
if err != nil {
return err
}
remoteVerMsg, ok := msg.(*wire.MsgVersion)
if !ok {
errStr := "A version message must precede all others"
log.Errorf(errStr)
rejectMsg := wire.NewMsgReject(msg.Command(), wire.RejectMalformed,
errStr)
return p.writeMessage(rejectMsg, wire.LatestEncoding)
}
if err := p.handleRemoteVersionMsg(remoteVerMsg); err != nil {
return err
}
if p.cfg.Listeners.OnVersion != nil {
p.cfg.Listeners.OnVersion(p, remoteVerMsg)
}
return nil
}
// writeLocalVersionMsg writes our version message to the remote peer.
func (p *Peer) writeLocalVersionMsg() error {
localVerMsg, err := p.localVersionMsg()
if err != nil {
return err
}
return p.writeMessage(localVerMsg, wire.LatestEncoding)
}
// negotiateInboundProtocol waits to receive a version message from the peer
// then sends our version message. If the events do not occur in that order then
// it returns an error.
func (p *Peer) negotiateInboundProtocol() error {
if err := p.readRemoteVersionMsg(); err != nil {
return err
}
return p.writeLocalVersionMsg()
}
// negotiateOutboundProtocol sends our version message then waits to receive a
// version message from the peer. If the events do not occur in that order then
// it returns an error.
func (p *Peer) negotiateOutboundProtocol() error {
if err := p.writeLocalVersionMsg(); err != nil {
return err
}
return p.readRemoteVersionMsg()
}
// newPeerBase returns a new base bitcoin peer based on the inbound flag. This
// is used by the NewInboundPeer and NewOutboundPeer functions to perform base
// setup needed by both types of peers.