Merge pull request #1256 from davecgh/server_onversion_use_local_for_net_addr

server: Use local addr var in version handler.
This commit is contained in:
Olaoluwa Osuntokun 2018-10-12 17:27:28 -07:00 committed by GitHub
commit d2046eeb1b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 249 additions and 144 deletions

View file

@ -1,4 +1,5 @@
// Copyright (c) 2013-2016 The btcsuite developers
// Copyright (c) 2015-2018 The Decred developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
@ -937,6 +938,25 @@ func (a *AddrManager) Good(addr *wire.NetAddress) {
a.addrNew[newBucket][rmkey] = rmka
}
// SetServices sets the services for the giiven address to the provided value.
func (a *AddrManager) SetServices(addr *wire.NetAddress, services wire.ServiceFlag) {
a.mtx.Lock()
defer a.mtx.Unlock()
ka := a.find(addr)
if ka == nil {
return
}
// Update the services if needed.
if ka.na.Services != services {
// ka.na is immutable, so replace it.
naCopy := *ka.na
naCopy.Services = services
ka.na = &naCopy
}
}
// AddLocalAddress adds na to the list of known local addresses to advertise
// with the given priority.
func (a *AddrManager) AddLocalAddress(na *wire.NetAddress, priority AddressPriority) error {

View file

@ -1,4 +1,5 @@
// Copyright (c) 2015-2016 The btcsuite developers
// Copyright (c) 2015-2018 The btcsuite developers
// Copyright (c) 2016-2018 The Decred developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
@ -72,8 +73,9 @@ func Example_newOutboundPeer() {
Services: 0,
TrickleInterval: time.Second * 10,
Listeners: peer.MessageListeners{
OnVersion: func(p *peer.Peer, msg *wire.MsgVersion) {
OnVersion: func(p *peer.Peer, msg *wire.MsgVersion) *wire.MsgReject {
fmt.Println("outbound: received version")
return nil
},
OnVerAck: func(p *peer.Peer, msg *wire.MsgVerAck) {
verack <- struct{}{}

View file

@ -34,9 +34,9 @@ const (
// inv message to a peer.
DefaultTrickleInterval = 10 * time.Second
// minAcceptableProtocolVersion is the lowest protocol version that a
// MinAcceptableProtocolVersion is the lowest protocol version that a
// connected peer may support.
minAcceptableProtocolVersion = wire.MultipleAddressVersion
MinAcceptableProtocolVersion = wire.MultipleAddressVersion
// outputBufferSize is the number of elements the output channels use.
outputBufferSize = 50
@ -187,7 +187,9 @@ type MessageListeners struct {
OnMerkleBlock func(p *Peer, msg *wire.MsgMerkleBlock)
// OnVersion is invoked when a peer receives a version bitcoin message.
OnVersion func(p *Peer, msg *wire.MsgVersion)
// The caller may return a reject message in which case the message will
// be sent to the peer and the peer will be disconnected.
OnVersion func(p *Peer, msg *wire.MsgVersion) *wire.MsgReject
// OnVerAck is invoked when a peer receives a verack bitcoin message.
OnVerAck func(p *Peer, msg *wire.MsgVerAck)
@ -1369,7 +1371,7 @@ out:
p.stallControl <- stallControlMsg{sccHandlerStart, rmsg}
switch msg := rmsg.(type) {
case *wire.MsgVersion:
// Limit to one version message per peer.
p.PushRejectMsg(msg.Command(), wire.RejectDuplicate,
"duplicate version message", nil, true)
break out
@ -1875,26 +1877,42 @@ func (p *Peer) Disconnect() {
close(p.quit)
}
// handleRemoteVersionMsg is invoked when a version bitcoin message is received
// from the remote peer. It will return an error if the remote peer's version
// is not compatible with ours.
func (p *Peer) handleRemoteVersionMsg(msg *wire.MsgVersion) error {
// readRemoteVersionMsg waits for the next message to arrive from the remote
// peer. If the next message is not a version message or the version is not
// acceptable then return an error.
func (p *Peer) readRemoteVersionMsg() error {
// Read their version message.
remoteMsg, _, err := p.readMessage(wire.LatestEncoding)
if err != nil {
return err
}
// Notify and disconnect clients if the first message is not a version
// message.
msg, ok := remoteMsg.(*wire.MsgVersion)
if !ok {
reason := "a version message must precede all others"
rejectMsg := wire.NewMsgReject(msg.Command(), wire.RejectMalformed,
reason)
_ = p.writeMessage(rejectMsg, wire.LatestEncoding)
return errors.New(reason)
}
// Detect self connections.
if !allowSelfConns && sentNonces.Exists(msg.Nonce) {
return errors.New("disconnecting peer connected to self")
}
// Notify and disconnect clients that have a protocol version that is
// too old.
//
// NOTE: If minAcceptableProtocolVersion is raised to be higher than
// wire.RejectVersion, this should send a reject packet before
// disconnecting.
if uint32(msg.ProtocolVersion) < minAcceptableProtocolVersion {
reason := fmt.Sprintf("protocol version must be %d or greater",
minAcceptableProtocolVersion)
return errors.New(reason)
}
// Negotiate the protocol version and set the services to what the remote
// peer advertised.
p.flagsMtx.Lock()
p.advertisedProtoVer = uint32(msg.ProtocolVersion)
p.protocolVersion = minUint32(p.protocolVersion, p.advertisedProtoVer)
p.versionKnown = true
p.services = msg.Services
p.flagsMtx.Unlock()
log.Debugf("Negotiated protocol version %d for peer %s",
p.protocolVersion, p)
// Updating a bunch of stats including block based stats, and the
// peer's time offset.
@ -1904,22 +1922,10 @@ func (p *Peer) handleRemoteVersionMsg(msg *wire.MsgVersion) error {
p.timeOffset = msg.Timestamp.Unix() - time.Now().Unix()
p.statsMtx.Unlock()
// Negotiate the protocol version.
// Set the peer's ID, user agent, and potentially the flag which
// specifies the witness support is enabled.
p.flagsMtx.Lock()
p.advertisedProtoVer = uint32(msg.ProtocolVersion)
p.protocolVersion = minUint32(p.protocolVersion, p.advertisedProtoVer)
p.versionKnown = true
log.Debugf("Negotiated protocol version %d for peer %s",
p.protocolVersion, p)
// Set the peer's ID.
p.id = atomic.AddInt32(&nodeCount, 1)
// Set the supported services for the peer to what the remote peer
// advertised.
p.services = msg.Services
// Set the remote peer's user agent.
p.userAgent = msg.UserAgent
// Determine if the peer would like to receive witness data with
@ -1938,36 +1944,33 @@ func (p *Peer) handleRemoteVersionMsg(msg *wire.MsgVersion) error {
p.wireEncoding = wire.WitnessEncoding
}
return nil
}
// readRemoteVersionMsg waits for the next message to arrive from the remote
// peer. If the next message is not a version message or the version is not
// acceptable then return an error.
func (p *Peer) readRemoteVersionMsg() error {
// Read their version message.
msg, _, err := p.readMessage(wire.LatestEncoding)
if err != nil {
return err
}
remoteVerMsg, ok := msg.(*wire.MsgVersion)
if !ok {
errStr := "A version message must precede all others"
log.Errorf(errStr)
rejectMsg := wire.NewMsgReject(msg.Command(), wire.RejectMalformed,
errStr)
return p.writeMessage(rejectMsg, wire.LatestEncoding)
}
if err := p.handleRemoteVersionMsg(remoteVerMsg); err != nil {
return err
}
// Invoke the callback if specified.
if p.cfg.Listeners.OnVersion != nil {
p.cfg.Listeners.OnVersion(p, remoteVerMsg)
rejectMsg := p.cfg.Listeners.OnVersion(p, msg)
if rejectMsg != nil {
_ = p.writeMessage(rejectMsg, wire.LatestEncoding)
return errors.New(rejectMsg.Reason)
}
}
// Notify and disconnect clients that have a protocol version that is
// too old.
//
// NOTE: If minAcceptableProtocolVersion is raised to be higher than
// wire.RejectVersion, this should send a reject packet before
// disconnecting.
if uint32(msg.ProtocolVersion) < MinAcceptableProtocolVersion {
// Send a reject message indicating the protocol version is
// obsolete and wait for the message to be sent before
// disconnecting.
reason := fmt.Sprintf("protocol version must be %d or greater",
MinAcceptableProtocolVersion)
rejectMsg := wire.NewMsgReject(msg.Command(), wire.RejectObsolete,
reason)
_ = p.writeMessage(rejectMsg, wire.LatestEncoding)
return errors.New(reason)
}
return nil
}
@ -1992,7 +1995,8 @@ func (p *Peer) localVersionMsg() (*wire.MsgVersion, error) {
proxyaddress, _, err := net.SplitHostPort(p.cfg.Proxy)
// invalid proxy means poorly configured, be on the safe side.
if err != nil || p.na.IP.String() == proxyaddress {
theirNA = wire.NewNetAddressIPPort(net.IP([]byte{0, 0, 0, 0}), 0, 0)
theirNA = wire.NewNetAddressIPPort(net.IP([]byte{0, 0, 0, 0}), 0,
theirNA.Services)
}
}
@ -2020,25 +2024,7 @@ func (p *Peer) localVersionMsg() (*wire.MsgVersion, error) {
msg.AddUserAgent(p.cfg.UserAgentName, p.cfg.UserAgentVersion,
p.cfg.UserAgentComments...)
// XXX: bitcoind appears to always enable the full node services flag
// of the remote peer netaddress field in the version message regardless
// of whether it knows it supports it or not. Also, bitcoind sets
// the services field of the local peer to 0 regardless of support.
//
// Realistically, this should be set as follows:
// - For outgoing connections:
// - Set the local netaddress services to what the local peer
// actually supports
// - Set the remote netaddress services to 0 to indicate no services
// as they are still unknown
// - For incoming connections:
// - Set the local netaddress services to what the local peer
// actually supports
// - Set the remote netaddress services to the what was advertised by
// by the remote peer in its version message
msg.AddrYou.Services = wire.SFNodeNetwork
// Advertise the services flag
// Advertise local services.
msg.Services = p.cfg.Services
// Advertise our max supported protocol version.
@ -2099,9 +2085,11 @@ func (p *Peer) start() error {
select {
case err := <-negotiateErr:
if err != nil {
p.Disconnect()
return err
}
case <-time.After(negotiateTimeout):
p.Disconnect()
return errors.New("protocol negotiation timeout")
}
log.Debugf("Connected to %s", p.Addr())
@ -2224,14 +2212,13 @@ func NewOutboundPeer(cfg *Config, addr string) (*Peer, error) {
}
if cfg.HostToNetAddress != nil {
na, err := cfg.HostToNetAddress(host, uint16(port), cfg.Services)
na, err := cfg.HostToNetAddress(host, uint16(port), 0)
if err != nil {
return nil, err
}
p.na = na
} else {
p.na = wire.NewNetAddressIPPort(net.ParseIP(host), uint16(port),
cfg.Services)
p.na = wire.NewNetAddressIPPort(net.ParseIP(host), uint16(port), 0)
}
return p, nil

View file

@ -1,4 +1,5 @@
// Copyright (c) 2015-2016 The btcsuite developers
// Copyright (c) 2016-2018 The Decred developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
@ -431,8 +432,9 @@ func TestPeerListeners(t *testing.T) {
OnMerkleBlock: func(p *peer.Peer, msg *wire.MsgMerkleBlock) {
ok <- msg
},
OnVersion: func(p *peer.Peer, msg *wire.MsgVersion) {
OnVersion: func(p *peer.Peer, msg *wire.MsgVersion) *wire.MsgReject {
ok <- msg
return nil
},
OnVerAck: func(p *peer.Peer, msg *wire.MsgVerAck) {
verack <- struct{}{}
@ -856,6 +858,65 @@ func TestUnsupportedVersionPeer(t *testing.T) {
}
}
// TestDuplicateVersionMsg ensures that receiving a version message after one
// has already been received results in the peer being disconnected.
func TestDuplicateVersionMsg(t *testing.T) {
// Create a pair of peers that are connected to each other using a fake
// connection.
verack := make(chan struct{})
peerCfg := &peer.Config{
Listeners: peer.MessageListeners{
OnVerAck: func(p *peer.Peer, msg *wire.MsgVerAck) {
verack <- struct{}{}
},
},
UserAgentName: "peer",
UserAgentVersion: "1.0",
ChainParams: &chaincfg.MainNetParams,
Services: 0,
}
inConn, outConn := pipe(
&conn{laddr: "10.0.0.1:9108", raddr: "10.0.0.2:9108"},
&conn{laddr: "10.0.0.2:9108", raddr: "10.0.0.1:9108"},
)
outPeer, err := peer.NewOutboundPeer(peerCfg, inConn.laddr)
if err != nil {
t.Fatalf("NewOutboundPeer: unexpected err: %v\n", err)
}
outPeer.AssociateConnection(outConn)
inPeer := peer.NewInboundPeer(peerCfg)
inPeer.AssociateConnection(inConn)
// Wait for the veracks from the initial protocol version negotiation.
for i := 0; i < 2; i++ {
select {
case <-verack:
case <-time.After(time.Second):
t.Fatal("verack timeout")
}
}
// Queue a duplicate version message from the outbound peer and wait until
// it is sent.
done := make(chan struct{})
outPeer.QueueMessage(&wire.MsgVersion{}, done)
select {
case <-done:
case <-time.After(time.Second):
t.Fatal("send duplicate version timeout")
}
// Ensure the peer that is the recipient of the duplicate version closes the
// connection.
disconnected := make(chan struct{}, 1)
go func() {
inPeer.WaitForDisconnect()
disconnected <- struct{}{}
}()
select {
case <-disconnected:
case <-time.After(time.Second):
t.Fatal("peer did not disconnect")
}
}
func init() {
// Allow self connection when running the tests.
peer.TstAllowSelfConns()

149
server.go
View file

@ -1,5 +1,5 @@
// Copyright (c) 2013-2017 The btcsuite developers
// Copyright (c) 2015-2017 The Decred developers
// Copyright (c) 2015-2018 The Decred developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
@ -385,10 +385,99 @@ func (sp *serverPeer) addBanScore(persistent, transient uint32, reason string) {
}
}
// hasServices returns whether or not the provided advertised service flags have
// all of the provided desired service flags set.
func hasServices(advertised, desired wire.ServiceFlag) bool {
return advertised&desired == desired
}
// OnVersion is invoked when a peer receives a version bitcoin message
// and is used to negotiate the protocol version details as well as kick start
// the communications.
func (sp *serverPeer) OnVersion(_ *peer.Peer, msg *wire.MsgVersion) {
func (sp *serverPeer) OnVersion(_ *peer.Peer, msg *wire.MsgVersion) *wire.MsgReject {
// Update the address manager with the advertised services for outbound
// connections in case they have changed. This is not done for inbound
// connections to help prevent malicious behavior and is skipped when
// running on the simulation test network since it is only intended to
// connect to specified peers and actively avoids advertising and
// connecting to discovered peers.
//
// NOTE: This is done before rejecting peers that are too old to ensure
// it is updated regardless in the case a new minimum protocol version is
// enforced and the remote node has not upgraded yet.
isInbound := sp.Inbound()
remoteAddr := sp.NA()
addrManager := sp.server.addrManager
if !cfg.SimNet && !isInbound {
addrManager.SetServices(remoteAddr, msg.Services)
}
// Ignore peers that have a protcol version that is too old. The peer
// negotiation logic will disconnect it after this callback returns.
if msg.ProtocolVersion < int32(peer.MinAcceptableProtocolVersion) {
return nil
}
// Reject outbound peers that are not full nodes.
wantServices := wire.SFNodeNetwork
if !isInbound && !hasServices(msg.Services, wantServices) {
missingServices := wantServices & ^msg.Services
srvrLog.Debugf("Rejecting peer %s with services %v due to not "+
"providing desired services %v", sp.Peer, msg.Services,
missingServices)
reason := fmt.Sprintf("required services %#x not offered",
uint64(missingServices))
return wire.NewMsgReject(msg.Command(), wire.RejectNonstandard, reason)
}
// Update the address manager and request known addresses from the
// remote peer for outbound connections. This is skipped when running
// on the simulation test network since it is only intended to connect
// to specified peers and actively avoids advertising and connecting to
// discovered peers.
if !cfg.SimNet && !isInbound {
// After soft-fork activation, only make outbound
// connection to peers if they flag that they're segwit
// enabled.
chain := sp.server.chain
segwitActive, err := chain.IsDeploymentActive(chaincfg.DeploymentSegwit)
if err != nil {
peerLog.Errorf("Unable to query for segwit soft-fork state: %v",
err)
return nil
}
if segwitActive && !sp.IsWitnessEnabled() {
peerLog.Infof("Disconnecting non-segwit peer %v, isn't segwit "+
"enabled and we need more segwit enabled peers", sp)
sp.Disconnect()
return nil
}
// Advertise the local address when the server accepts incoming
// connections and it believes itself to be close to the best known tip.
if !cfg.DisableListen && sp.server.syncManager.IsCurrent() {
// Get address that best matches.
lna := addrManager.GetBestLocalAddress(remoteAddr)
if addrmgr.IsRoutable(lna) {
// Filter addresses the peer already knows about.
addresses := []*wire.NetAddress{lna}
sp.pushAddrMsg(addresses)
}
}
// Request known addresses if the server address manager needs
// more and the peer has a protocol version new enough to
// include a timestamp with addresses.
hasTimestamp := sp.ProtocolVersion() >= wire.NetAddressTimeVersion
if addrManager.NeedMoreAddresses() && hasTimestamp {
sp.QueueMessage(wire.NewMsgGetAddr(), nil)
}
// Mark the address as a known good address.
addrManager.Good(remoteAddr)
}
// Add the remote peer time as a sample for creating an offset against
// the local clock to keep the network time in sync.
sp.server.timeSource.AddTimeSample(sp.Addr(), msg.Timestamp)
@ -400,63 +489,9 @@ func (sp *serverPeer) OnVersion(_ *peer.Peer, msg *wire.MsgVersion) {
// is received.
sp.setDisableRelayTx(msg.DisableRelayTx)
// Update the address manager and request known addresses from the
// remote peer for outbound connections. This is skipped when running
// on the simulation test network since it is only intended to connect
// to specified peers and actively avoids advertising and connecting to
// discovered peers.
if !cfg.SimNet {
addrManager := sp.server.addrManager
// Outbound connections.
if !sp.Inbound() {
// After soft-fork activation, only make outbound
// connection to peers if they flag that they're segwit
// enabled.
chain := sp.server.chain
segwitActive, err := chain.IsDeploymentActive(chaincfg.DeploymentSegwit)
if err != nil {
peerLog.Errorf("Unable to query for segwit "+
"soft-fork state: %v", err)
return
}
if segwitActive && !sp.IsWitnessEnabled() {
peerLog.Infof("Disconnecting non-segwit "+
"peer %v, isn't segwit enabled and "+
"we need more segwit enabled peers", sp)
sp.Disconnect()
return
}
// TODO(davec): Only do this if not doing the initial block
// download and the local address is routable.
if !cfg.DisableListen /* && isCurrent? */ {
// Get address that best matches.
lna := addrManager.GetBestLocalAddress(sp.NA())
if addrmgr.IsRoutable(lna) {
// Filter addresses the peer already knows about.
addresses := []*wire.NetAddress{lna}
sp.pushAddrMsg(addresses)
}
}
// Request known addresses if the server address manager needs
// more and the peer has a protocol version new enough to
// include a timestamp with addresses.
hasTimestamp := sp.ProtocolVersion() >=
wire.NetAddressTimeVersion
if addrManager.NeedMoreAddresses() && hasTimestamp {
sp.QueueMessage(wire.NewMsgGetAddr(), nil)
}
// Mark the address as a known good address.
addrManager.Good(sp.NA())
}
}
// Add valid peer to the server.
sp.server.AddPeer(sp)
return nil
}
// OnMemPool is invoked when a peer receives a mempool bitcoin message.