Merge pull request #1247 from davecgh/peer_minor_cleanup

peer: Minor function definition order cleanup.
This commit is contained in:
Olaoluwa Osuntokun 2018-08-22 19:41:16 -07:00 committed by GitHub
commit 9f436585d7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -1,4 +1,5 @@
// Copyright (c) 2013-2016 The btcsuite developers // Copyright (c) 2013-2018 The btcsuite developers
// Copyright (c) 2016-2018 The Decred developers
// Use of this source code is governed by an ISC // Use of this source code is governed by an ISC
// license that can be found in the LICENSE file. // license that can be found in the LICENSE file.
@ -801,85 +802,6 @@ func (p *Peer) IsWitnessEnabled() bool {
return witnessEnabled return witnessEnabled
} }
// localVersionMsg creates a version message that can be used to send to the
// remote peer.
func (p *Peer) localVersionMsg() (*wire.MsgVersion, error) {
var blockNum int32
if p.cfg.NewestBlock != nil {
var err error
_, blockNum, err = p.cfg.NewestBlock()
if err != nil {
return nil, err
}
}
theirNA := p.na
// If we are behind a proxy and the connection comes from the proxy then
// we return an unroutable address as their address. This is to prevent
// leaking the tor proxy address.
if p.cfg.Proxy != "" {
proxyaddress, _, err := net.SplitHostPort(p.cfg.Proxy)
// invalid proxy means poorly configured, be on the safe side.
if err != nil || p.na.IP.String() == proxyaddress {
theirNA = wire.NewNetAddressIPPort(net.IP([]byte{0, 0, 0, 0}), 0, 0)
}
}
// Create a wire.NetAddress with only the services set to use as the
// "addrme" in the version message.
//
// Older nodes previously added the IP and port information to the
// address manager which proved to be unreliable as an inbound
// connection from a peer didn't necessarily mean the peer itself
// accepted inbound connections.
//
// Also, the timestamp is unused in the version message.
ourNA := &wire.NetAddress{
Services: p.cfg.Services,
}
// Generate a unique nonce for this peer so self connections can be
// detected. This is accomplished by adding it to a size-limited map of
// recently seen nonces.
nonce := uint64(rand.Int63())
sentNonces.Add(nonce)
// Version message.
msg := wire.NewMsgVersion(ourNA, theirNA, nonce, blockNum)
msg.AddUserAgent(p.cfg.UserAgentName, p.cfg.UserAgentVersion,
p.cfg.UserAgentComments...)
// XXX: bitcoind appears to always enable the full node services flag
// of the remote peer netaddress field in the version message regardless
// of whether it knows it supports it or not. Also, bitcoind sets
// the services field of the local peer to 0 regardless of support.
//
// Realistically, this should be set as follows:
// - For outgoing connections:
// - Set the local netaddress services to what the local peer
// actually supports
// - Set the remote netaddress services to 0 to indicate no services
// as they are still unknown
// - For incoming connections:
// - Set the local netaddress services to what the local peer
// actually supports
// - Set the remote netaddress services to the what was advertised by
// by the remote peer in its version message
msg.AddrYou.Services = wire.SFNodeNetwork
// Advertise the services flag
msg.Services = p.cfg.Services
// Advertise our max supported protocol version.
msg.ProtocolVersion = int32(p.cfg.ProtocolVersion)
// Advertise if inv messages for transactions are desired.
msg.DisableRelayTx = p.cfg.DisableRelayTx
return msg, nil
}
// PushAddrMsg sends an addr message to the connected peer using the provided // PushAddrMsg sends an addr message to the connected peer using the provided
// addresses. This function is useful over manually sending the message via // addresses. This function is useful over manually sending the message via
// QueueMessage since it automatically limits the addresses to the maximum // QueueMessage since it automatically limits the addresses to the maximum
@ -1041,72 +963,6 @@ func (p *Peer) PushRejectMsg(command string, code wire.RejectCode, reason string
<-doneChan <-doneChan
} }
// handleRemoteVersionMsg is invoked when a version bitcoin message is received
// from the remote peer. It will return an error if the remote peer's version
// is not compatible with ours.
func (p *Peer) handleRemoteVersionMsg(msg *wire.MsgVersion) error {
// Detect self connections.
if !allowSelfConns && sentNonces.Exists(msg.Nonce) {
return errors.New("disconnecting peer connected to self")
}
// Notify and disconnect clients that have a protocol version that is
// too old.
//
// NOTE: If minAcceptableProtocolVersion is raised to be higher than
// wire.RejectVersion, this should send a reject packet before
// disconnecting.
if uint32(msg.ProtocolVersion) < minAcceptableProtocolVersion {
reason := fmt.Sprintf("protocol version must be %d or greater",
minAcceptableProtocolVersion)
return errors.New(reason)
}
// Updating a bunch of stats including block based stats, and the
// peer's time offset.
p.statsMtx.Lock()
p.lastBlock = msg.LastBlock
p.startingHeight = msg.LastBlock
p.timeOffset = msg.Timestamp.Unix() - time.Now().Unix()
p.statsMtx.Unlock()
// Negotiate the protocol version.
p.flagsMtx.Lock()
p.advertisedProtoVer = uint32(msg.ProtocolVersion)
p.protocolVersion = minUint32(p.protocolVersion, p.advertisedProtoVer)
p.versionKnown = true
log.Debugf("Negotiated protocol version %d for peer %s",
p.protocolVersion, p)
// Set the peer's ID.
p.id = atomic.AddInt32(&nodeCount, 1)
// Set the supported services for the peer to what the remote peer
// advertised.
p.services = msg.Services
// Set the remote peer's user agent.
p.userAgent = msg.UserAgent
// Determine if the peer would like to receive witness data with
// transactions, or not.
if p.services&wire.SFNodeWitness == wire.SFNodeWitness {
p.witnessEnabled = true
}
p.flagsMtx.Unlock()
// Once the version message has been exchanged, we're able to determine
// if this peer knows how to encode witness data over the wire
// protocol. If so, then we'll switch to a decoding mode which is
// prepared for the new transaction format introduced as part of
// BIP0144.
if p.services&wire.SFNodeWitness == wire.SFNodeWitness {
p.wireEncoding = wire.WitnessEncoding
}
return nil
}
// handlePingMsg is invoked when a peer receives a ping bitcoin message. For // handlePingMsg is invoked when a peer receives a ping bitcoin message. For
// recent clients (protocol version > BIP0031Version), it replies with a pong // recent clients (protocol version > BIP0031Version), it replies with a pong
// message. For older clients, it does nothing and anything other than failure // message. For older clients, it does nothing and anything other than failure
@ -1992,40 +1848,6 @@ func (p *Peer) QueueInventory(invVect *wire.InvVect) {
p.outputInvChan <- invVect p.outputInvChan <- invVect
} }
// AssociateConnection associates the given conn to the peer. Calling this
// function when the peer is already connected will have no effect.
func (p *Peer) AssociateConnection(conn net.Conn) {
// Already connected?
if !atomic.CompareAndSwapInt32(&p.connected, 0, 1) {
return
}
p.conn = conn
p.timeConnected = time.Now()
if p.inbound {
p.addr = p.conn.RemoteAddr().String()
// Set up a NetAddress for the peer to be used with AddrManager. We
// only do this inbound because outbound set this up at connection time
// and no point recomputing.
na, err := newNetAddress(p.conn.RemoteAddr(), p.services)
if err != nil {
log.Errorf("Cannot create remote net address: %v", err)
p.Disconnect()
return
}
p.na = na
}
go func() {
if err := p.start(); err != nil {
log.Debugf("Cannot start peer %v: %v", p, err)
p.Disconnect()
}
}()
}
// Connected returns whether or not the peer is currently connected. // Connected returns whether or not the peer is currently connected.
// //
// This function is safe for concurrent access. // This function is safe for concurrent access.
@ -2049,6 +1871,213 @@ func (p *Peer) Disconnect() {
close(p.quit) close(p.quit)
} }
// handleRemoteVersionMsg is invoked when a version bitcoin message is received
// from the remote peer. It will return an error if the remote peer's version
// is not compatible with ours.
func (p *Peer) handleRemoteVersionMsg(msg *wire.MsgVersion) error {
// Detect self connections.
if !allowSelfConns && sentNonces.Exists(msg.Nonce) {
return errors.New("disconnecting peer connected to self")
}
// Notify and disconnect clients that have a protocol version that is
// too old.
//
// NOTE: If minAcceptableProtocolVersion is raised to be higher than
// wire.RejectVersion, this should send a reject packet before
// disconnecting.
if uint32(msg.ProtocolVersion) < minAcceptableProtocolVersion {
reason := fmt.Sprintf("protocol version must be %d or greater",
minAcceptableProtocolVersion)
return errors.New(reason)
}
// Updating a bunch of stats including block based stats, and the
// peer's time offset.
p.statsMtx.Lock()
p.lastBlock = msg.LastBlock
p.startingHeight = msg.LastBlock
p.timeOffset = msg.Timestamp.Unix() - time.Now().Unix()
p.statsMtx.Unlock()
// Negotiate the protocol version.
p.flagsMtx.Lock()
p.advertisedProtoVer = uint32(msg.ProtocolVersion)
p.protocolVersion = minUint32(p.protocolVersion, p.advertisedProtoVer)
p.versionKnown = true
log.Debugf("Negotiated protocol version %d for peer %s",
p.protocolVersion, p)
// Set the peer's ID.
p.id = atomic.AddInt32(&nodeCount, 1)
// Set the supported services for the peer to what the remote peer
// advertised.
p.services = msg.Services
// Set the remote peer's user agent.
p.userAgent = msg.UserAgent
// Determine if the peer would like to receive witness data with
// transactions, or not.
if p.services&wire.SFNodeWitness == wire.SFNodeWitness {
p.witnessEnabled = true
}
p.flagsMtx.Unlock()
// Once the version message has been exchanged, we're able to determine
// if this peer knows how to encode witness data over the wire
// protocol. If so, then we'll switch to a decoding mode which is
// prepared for the new transaction format introduced as part of
// BIP0144.
if p.services&wire.SFNodeWitness == wire.SFNodeWitness {
p.wireEncoding = wire.WitnessEncoding
}
return nil
}
// readRemoteVersionMsg waits for the next message to arrive from the remote
// peer. If the next message is not a version message or the version is not
// acceptable then return an error.
func (p *Peer) readRemoteVersionMsg() error {
// Read their version message.
msg, _, err := p.readMessage(wire.LatestEncoding)
if err != nil {
return err
}
remoteVerMsg, ok := msg.(*wire.MsgVersion)
if !ok {
errStr := "A version message must precede all others"
log.Errorf(errStr)
rejectMsg := wire.NewMsgReject(msg.Command(), wire.RejectMalformed,
errStr)
return p.writeMessage(rejectMsg, wire.LatestEncoding)
}
if err := p.handleRemoteVersionMsg(remoteVerMsg); err != nil {
return err
}
if p.cfg.Listeners.OnVersion != nil {
p.cfg.Listeners.OnVersion(p, remoteVerMsg)
}
return nil
}
// localVersionMsg creates a version message that can be used to send to the
// remote peer.
func (p *Peer) localVersionMsg() (*wire.MsgVersion, error) {
var blockNum int32
if p.cfg.NewestBlock != nil {
var err error
_, blockNum, err = p.cfg.NewestBlock()
if err != nil {
return nil, err
}
}
theirNA := p.na
// If we are behind a proxy and the connection comes from the proxy then
// we return an unroutable address as their address. This is to prevent
// leaking the tor proxy address.
if p.cfg.Proxy != "" {
proxyaddress, _, err := net.SplitHostPort(p.cfg.Proxy)
// invalid proxy means poorly configured, be on the safe side.
if err != nil || p.na.IP.String() == proxyaddress {
theirNA = wire.NewNetAddressIPPort(net.IP([]byte{0, 0, 0, 0}), 0, 0)
}
}
// Create a wire.NetAddress with only the services set to use as the
// "addrme" in the version message.
//
// Older nodes previously added the IP and port information to the
// address manager which proved to be unreliable as an inbound
// connection from a peer didn't necessarily mean the peer itself
// accepted inbound connections.
//
// Also, the timestamp is unused in the version message.
ourNA := &wire.NetAddress{
Services: p.cfg.Services,
}
// Generate a unique nonce for this peer so self connections can be
// detected. This is accomplished by adding it to a size-limited map of
// recently seen nonces.
nonce := uint64(rand.Int63())
sentNonces.Add(nonce)
// Version message.
msg := wire.NewMsgVersion(ourNA, theirNA, nonce, blockNum)
msg.AddUserAgent(p.cfg.UserAgentName, p.cfg.UserAgentVersion,
p.cfg.UserAgentComments...)
// XXX: bitcoind appears to always enable the full node services flag
// of the remote peer netaddress field in the version message regardless
// of whether it knows it supports it or not. Also, bitcoind sets
// the services field of the local peer to 0 regardless of support.
//
// Realistically, this should be set as follows:
// - For outgoing connections:
// - Set the local netaddress services to what the local peer
// actually supports
// - Set the remote netaddress services to 0 to indicate no services
// as they are still unknown
// - For incoming connections:
// - Set the local netaddress services to what the local peer
// actually supports
// - Set the remote netaddress services to the what was advertised by
// by the remote peer in its version message
msg.AddrYou.Services = wire.SFNodeNetwork
// Advertise the services flag
msg.Services = p.cfg.Services
// Advertise our max supported protocol version.
msg.ProtocolVersion = int32(p.cfg.ProtocolVersion)
// Advertise if inv messages for transactions are desired.
msg.DisableRelayTx = p.cfg.DisableRelayTx
return msg, nil
}
// writeLocalVersionMsg writes our version message to the remote peer.
func (p *Peer) writeLocalVersionMsg() error {
localVerMsg, err := p.localVersionMsg()
if err != nil {
return err
}
return p.writeMessage(localVerMsg, wire.LatestEncoding)
}
// negotiateInboundProtocol waits to receive a version message from the peer
// then sends our version message. If the events do not occur in that order then
// it returns an error.
func (p *Peer) negotiateInboundProtocol() error {
if err := p.readRemoteVersionMsg(); err != nil {
return err
}
return p.writeLocalVersionMsg()
}
// negotiateOutboundProtocol sends our version message then waits to receive a
// version message from the peer. If the events do not occur in that order then
// it returns an error.
func (p *Peer) negotiateOutboundProtocol() error {
if err := p.writeLocalVersionMsg(); err != nil {
return err
}
return p.readRemoteVersionMsg()
}
// start begins processing input and output messages. // start begins processing input and output messages.
func (p *Peer) start() error { func (p *Peer) start() error {
log.Tracef("Starting peer %s", p) log.Tracef("Starting peer %s", p)
@ -2086,6 +2115,40 @@ func (p *Peer) start() error {
return nil return nil
} }
// AssociateConnection associates the given conn to the peer. Calling this
// function when the peer is already connected will have no effect.
func (p *Peer) AssociateConnection(conn net.Conn) {
// Already connected?
if !atomic.CompareAndSwapInt32(&p.connected, 0, 1) {
return
}
p.conn = conn
p.timeConnected = time.Now()
if p.inbound {
p.addr = p.conn.RemoteAddr().String()
// Set up a NetAddress for the peer to be used with AddrManager. We
// only do this inbound because outbound set this up at connection time
// and no point recomputing.
na, err := newNetAddress(p.conn.RemoteAddr(), p.services)
if err != nil {
log.Errorf("Cannot create remote net address: %v", err)
p.Disconnect()
return
}
p.na = na
}
go func() {
if err := p.start(); err != nil {
log.Debugf("Cannot start peer %v: %v", p, err)
p.Disconnect()
}
}()
}
// WaitForDisconnect waits until the peer has completely disconnected and all // WaitForDisconnect waits until the peer has completely disconnected and all
// resources are cleaned up. This will happen if either the local or remote // resources are cleaned up. This will happen if either the local or remote
// side has been disconnected or the peer is forcibly disconnected via // side has been disconnected or the peer is forcibly disconnected via
@ -2094,68 +2157,6 @@ func (p *Peer) WaitForDisconnect() {
<-p.quit <-p.quit
} }
// readRemoteVersionMsg waits for the next message to arrive from the remote
// peer. If the next message is not a version message or the version is not
// acceptable then return an error.
func (p *Peer) readRemoteVersionMsg() error {
// Read their version message.
msg, _, err := p.readMessage(wire.LatestEncoding)
if err != nil {
return err
}
remoteVerMsg, ok := msg.(*wire.MsgVersion)
if !ok {
errStr := "A version message must precede all others"
log.Errorf(errStr)
rejectMsg := wire.NewMsgReject(msg.Command(), wire.RejectMalformed,
errStr)
return p.writeMessage(rejectMsg, wire.LatestEncoding)
}
if err := p.handleRemoteVersionMsg(remoteVerMsg); err != nil {
return err
}
if p.cfg.Listeners.OnVersion != nil {
p.cfg.Listeners.OnVersion(p, remoteVerMsg)
}
return nil
}
// writeLocalVersionMsg writes our version message to the remote peer.
func (p *Peer) writeLocalVersionMsg() error {
localVerMsg, err := p.localVersionMsg()
if err != nil {
return err
}
return p.writeMessage(localVerMsg, wire.LatestEncoding)
}
// negotiateInboundProtocol waits to receive a version message from the peer
// then sends our version message. If the events do not occur in that order then
// it returns an error.
func (p *Peer) negotiateInboundProtocol() error {
if err := p.readRemoteVersionMsg(); err != nil {
return err
}
return p.writeLocalVersionMsg()
}
// negotiateOutboundProtocol sends our version message then waits to receive a
// version message from the peer. If the events do not occur in that order then
// it returns an error.
func (p *Peer) negotiateOutboundProtocol() error {
if err := p.writeLocalVersionMsg(); err != nil {
return err
}
return p.readRemoteVersionMsg()
}
// newPeerBase returns a new base bitcoin peer based on the inbound flag. This // newPeerBase returns a new base bitcoin peer based on the inbound flag. This
// is used by the NewInboundPeer and NewOutboundPeer functions to perform base // is used by the NewInboundPeer and NewOutboundPeer functions to perform base
// setup needed by both types of peers. // setup needed by both types of peers.