Merge pull request #744 from wpaulino/pruned-node-getnodeaddresses

chain: implement GetNodeAddresses fallback for PrunedBlockDispatcher
Merged by Olaoluwa Osuntokun on 2021-04-27 16:43:14 -07:00, committed via GitHub.
commit cf6b7830cb
3 changed files with 197 additions and 127 deletions
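
In short: when the backend's `getpeerinfo` peers don't yield enough eligible connections, the dispatcher now falls back to the `getnodeaddresses` RPC for random reachable addresses and keeps dialing until it reaches its target. A rough sketch of that flow, assuming a hypothetical `connect` callback standing in for the dispatcher's `connectToPeer` shown further down:

```go
// Package sketch illustrates the peer-sourcing flow this PR adds; the
// connect callback is a stand-in for the dispatcher's connectToPeer.
package sketch

import (
	"log"

	"github.com/btcsuite/btcd/btcjson"
)

// fillPeerSlots tries the backend's existing peers first and only then falls
// back to getnodeaddresses, mirroring the new connectToPeers behaviour.
func fillPeerSlots(
	getPeers func() ([]btcjson.GetPeerInfoResult, error),
	getNodeAddresses func(*int32) ([]btcjson.GetNodeAddressesResult, error),
	connect func(addr string) (needMore bool, err error)) error {

	// Candidates the backend node is already connected to.
	peers, err := getPeers()
	if err != nil {
		return err
	}
	for _, p := range peers {
		needMore, err := connect(p.Addr)
		if err != nil {
			log.Printf("failed connecting to %v: %v", p.Addr, err)
			continue
		}
		if !needMore {
			// Target number of query peers reached.
			return nil
		}
	}

	// Still short of the target: ask bitcoind for random reachable
	// addresses (nil requests the default of 8) and keep trying.
	nodeAddrs, err := getNodeAddresses(nil)
	if err != nil {
		return err
	}
	for _, na := range nodeAddrs {
		if _, err := connect(na.Address); err != nil {
			log.Printf("failed connecting to %v: %v", na.Address, err)
		}
	}
	return nil
}
```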


@@ -192,15 +192,14 @@ func NewBitcoindConn(cfg *BitcoindConfig) (*BitcoindConn, error) {
 	if chainInfo.Pruned {
 		prunedBlockDispatcher, err = NewPrunedBlockDispatcher(
 			&PrunedBlockDispatcherConfig{
 				ChainParams:    cfg.ChainParams,
 				NumTargetPeers: cfg.PrunedModeMaxPeers,
 				Dial:           cfg.Dialer,
 				GetPeers:       client.GetPeerInfo,
-				PeerReadyTimeout: defaultPeerReadyTimeout,
-				RefreshPeersTicker: ticker.New(
-					defaultRefreshPeersInterval,
-				),
-				MaxRequestInvs: wire.MaxInvPerMsg,
+				GetNodeAddresses:   client.GetNodeAddresses,
+				PeerReadyTimeout:   defaultPeerReadyTimeout,
+				RefreshPeersTicker: ticker.New(defaultRefreshPeersInterval),
+				MaxRequestInvs:     wire.MaxInvPerMsg,
 			},
 		)
 		if err != nil {
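
Here `client` is the backend RPC client, so `GetNodeAddresses` is simply its method value. To see what the new fallback has to work with, the same RPC can be called directly; a hypothetical standalone sketch (host and credentials are placeholders, and the bitcoind backend must support `getnodeaddresses`):

```go
package main

import (
	"fmt"
	"log"

	"github.com/btcsuite/btcd/rpcclient"
)

func main() {
	// Placeholder connection details for a local bitcoind RPC interface;
	// adjust to your setup.
	client, err := rpcclient.New(&rpcclient.ConnConfig{
		Host:         "127.0.0.1:8332",
		User:         "rpcuser",
		Pass:         "rpcpass",
		HTTPPostMode: true,
		DisableTLS:   true,
	}, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Shutdown()

	// nil asks for bitcoind's default of 8 addresses; pass a count to
	// override, e.g. two addresses here.
	count := int32(2)
	addrs, err := client.GetNodeAddresses(&count)
	if err != nil {
		log.Fatal(err)
	}
	for _, addr := range addrs {
		fmt.Printf("%s (services %x)\n", addr.Address, addr.Services)
	}
}
```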


@@ -37,8 +37,10 @@ const (
 	requiredServices = wire.SFNodeNetwork | wire.SFNodeWitness

 	// prunedNodeService is the service bit signaled by pruned nodes on the
-	// network.
-	prunedNodeService wire.ServiceFlag = 1 << 11
+	// network. Note that this service bit can also be signaled by full
+	// nodes, except that they also signal wire.SFNodeNetwork, where as
+	// pruned nodes don't.
+	prunedNodeService wire.ServiceFlag = 1 << 10
 )

 // queryPeer represents a Bitcoin network peer that we'll query for blocks.
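
The constant fix is the subtle part: bit 10 is NODE_NETWORK_LIMITED (BIP 159), the bit pruned nodes advertise, while `wire.SFNodeNetwork` means the peer can serve the full historical chain. A self-contained sketch of the check this file performs later on; the local constants mirror the unexported ones above:

```go
package sketch

import "github.com/btcsuite/btcd/wire"

// Local mirrors of the unexported constants in the dispatcher.
const (
	requiredServices = wire.SFNodeNetwork | wire.SFNodeWitness

	// NODE_NETWORK_LIMITED (BIP 159): advertised by pruned nodes.
	prunedNodeService wire.ServiceFlag = 1 << 10
)

// eligible reports whether a peer advertising the given services can serve us
// arbitrary witness blocks, i.e. it is an unpruned segwit node.
func eligible(services wire.ServiceFlag) bool {
	return services&requiredServices == requiredServices &&
		services&prunedNodeService != prunedNodeService
}

var (
	// An archival segwit node that does not signal the limited bit:
	// NODE_NETWORK | NODE_WITNESS.
	fullNode = wire.SFNodeNetwork | wire.SFNodeWitness

	// A pruned segwit node: NODE_NETWORK_LIMITED | NODE_WITNESS, with no
	// NODE_NETWORK, so it can only serve recent blocks.
	prunedNode = prunedNodeService | wire.SFNodeWitness

	_ = eligible(fullNode)   // true
	_ = eligible(prunedNode) // false (missing NODE_NETWORK)
)
```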
@@ -97,6 +99,11 @@ type PrunedBlockDispatcherConfig struct {
 	// GetPeers retrieves the active set of peers known to the backend node.
 	GetPeers func() ([]btcjson.GetPeerInfoResult, error)

+	// GetNodeAddresses returns random reachable addresses known to the
+	// backend node. An optional number of addresses to return can be
+	// provided, otherwise 8 are returned by default.
+	GetNodeAddresses func(*int32) ([]btcjson.GetNodeAddressesResult, error)
+
 	// PeerReadyTimeout is the amount of time we'll wait for a query peer to
 	// be ready to receive incoming block requests. Peers cannot respond to
 	// requests until the version exchange is completed upon connection
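
The new field can be satisfied either by the backend client's method value, as in the first hunk, or by any closure with the same signature; the test harness later in this diff does exactly that. A small sketch, assuming this lives in btcwallet's `chain` package (the import path below is an assumption):

```go
package sketch

import (
	"github.com/btcsuite/btcd/btcjson"
	"github.com/btcsuite/btcwallet/chain"
)

// fakeGetNodeAddresses returns a canned response, handy for tests or offline
// experiments. The address uses a documentation-range IP.
func fakeGetNodeAddresses(*int32) ([]btcjson.GetNodeAddressesResult, error) {
	return []btcjson.GetNodeAddressesResult{
		// NODE_NETWORK | NODE_WITNESS.
		{Address: "203.0.113.7:8333", Services: 0x9},
	}, nil
}

// cfg shows only the new field; the remaining fields (ChainParams,
// NumTargetPeers, Dial, GetPeers, ...) would be wired as in the hunk above.
var cfg = &chain.PrunedBlockDispatcherConfig{
	GetNodeAddresses: fakeGetNodeAddresses,
}
```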
@@ -254,7 +261,7 @@ func (d *PrunedBlockDispatcher) pollPeers() {
 			// If we do, attempt to establish connections until
 			// we've reached our target number.
 			if err := d.connectToPeers(); err != nil {
-				log.Warnf("Unable to establish peer "+
+				log.Warnf("Failed to establish peer "+
 					"connections: %v", err)
 				continue
 			}
@@ -275,90 +282,150 @@ func (d *PrunedBlockDispatcher) connectToPeers() error {
 	if err != nil {
 		return err
 	}
-	peers, err = filterPeers(peers)
+	addrs, err := filterPeers(peers)
 	if err != nil {
 		return err
 	}

-	rand.Shuffle(len(peers), func(i, j int) {
-		peers[i], peers[j] = peers[j], peers[i]
+	rand.Shuffle(len(addrs), func(i, j int) {
+		addrs[i], addrs[j] = addrs[j], addrs[i]
 	})

-	// For each unbanned peer we don't already have a connection to, try to
-	// establish one, and if successful, notify the peer.
-	for _, peer := range peers {
-		d.peerMtx.Lock()
-		_, isBanned := d.bannedPeers[peer.Addr]
-		_, isConnected := d.currentPeers[peer.Addr]
-		d.peerMtx.Unlock()
-		if isBanned || isConnected {
-			continue
-		}
-
-		queryPeer, err := d.newQueryPeer(peer)
-		if err != nil {
-			return fmt.Errorf("unable to configure query peer %v: "+
-				"%v", peer.Addr, err)
-		}
-		if err := d.connectToPeer(queryPeer); err != nil {
-			log.Debugf("Failed connecting to peer %v: %v",
-				peer.Addr, err)
-			continue
-		}
-
-		select {
-		case d.peersConnected <- queryPeer:
-		case <-d.quit:
-			return errors.New("shutting down")
-		}
-
-		// If the new peer helped us reach our target number, we're done
-		// and can exit.
-		d.peerMtx.Lock()
-		d.currentPeers[queryPeer.Addr()] = queryPeer.Peer
-		numPeers := len(d.currentPeers)
-		d.peerMtx.Unlock()
-		if numPeers == d.cfg.NumTargetPeers {
-			break
-		}
-	}
+	for _, addr := range addrs {
+		needMore, err := d.connectToPeer(addr)
+		if err != nil {
+			log.Debugf("Failed connecting to peer %v: %v", addr, err)
+			continue
+		}
+		if !needMore {
+			return nil
+		}
+	}
+
+	// We still need more addresses so we'll also invoke the
+	// `getnodeaddresses` RPC to receive random reachable addresses. We'll
+	// also filter out any that do not meet our requirements. The nil
+	// argument will return a default number of addresses, which is
+	// currently 8. We don't care how many addresses are returned as long as
+	// 1 is returned, since this will be polled regularly if needed.
+	nodeAddrs, err := d.cfg.GetNodeAddresses(nil)
+	if err != nil {
+		return err
+	}
+	addrs = filterNodeAddrs(nodeAddrs)
+	for _, addr := range addrs {
+		if _, err := d.connectToPeer(addr); err != nil {
+			log.Debugf("Failed connecting to peer %v: %v", addr, err)
+		}
+	}

 	return nil
 }

+// connectToPeer attempts to establish a connection to the given peer and waits
+// up to PeerReadyTimeout for the version exchange to complete so that we can
+// begin sending it our queries.
+func (d *PrunedBlockDispatcher) connectToPeer(addr string) (bool, error) {
+	// Prevent connections to peers we've already connected to or we've
+	// banned.
+	d.peerMtx.Lock()
+	_, isBanned := d.bannedPeers[addr]
+	_, isConnected := d.currentPeers[addr]
+	d.peerMtx.Unlock()
+	if isBanned || isConnected {
+		return true, nil
+	}
+
+	peer, err := d.newQueryPeer(addr)
+	if err != nil {
+		return true, fmt.Errorf("unable to configure query peer %v: "+
+			"%v", addr, err)
+	}
+
+	// Establish the connection and wait for the protocol negotiation to
+	// complete.
+	conn, err := d.cfg.Dial(addr)
+	if err != nil {
+		return true, err
+	}
+	peer.AssociateConnection(conn)
+
+	select {
+	case <-peer.ready:
+	case <-time.After(d.cfg.PeerReadyTimeout):
+		peer.Disconnect()
+		return true, errors.New("timed out waiting for protocol negotiation")
+	case <-d.quit:
+		return false, errors.New("shutting down")
+	}
+
+	// Remove the peer once it has disconnected.
+	peer.signalUponDisconnect(func() {
+		d.peerMtx.Lock()
+		delete(d.currentPeers, peer.Addr())
+		d.peerMtx.Unlock()
+	})
+
+	d.peerMtx.Lock()
+	d.currentPeers[addr] = peer.Peer
+	numPeers := len(d.currentPeers)
+	d.peerMtx.Unlock()
+
+	// Notify the new peer connection to our workManager.
+	select {
+	case d.peersConnected <- peer:
+	case <-d.quit:
+		return false, errors.New("shutting down")
+	}
+
+	// Request more peer connections if we haven't reached our target number
+	// with the new peer.
+	return numPeers < d.cfg.NumTargetPeers, nil
+}
+
 // filterPeers filters out any peers which cannot handle arbitrary witness block
 // requests, i.e., any peer which is not considered a segwit-enabled
 // "full-node".
-func filterPeers(peers []btcjson.GetPeerInfoResult) (
-	[]btcjson.GetPeerInfoResult, error) {
-
-	var eligible []btcjson.GetPeerInfoResult
+func filterPeers(peers []btcjson.GetPeerInfoResult) ([]string, error) {
+	var eligible []string
 	for _, peer := range peers {
 		rawServices, err := hex.DecodeString(peer.Services)
 		if err != nil {
 			return nil, err
 		}

 		services := wire.ServiceFlag(binary.BigEndian.Uint64(rawServices))
-
-		// Skip nodes that cannot serve full block witness data.
-		if services&requiredServices != requiredServices {
-			continue
-		}
-		// Skip pruned nodes.
-		if services&prunedNodeService == prunedNodeService {
+		if !satisfiesRequiredServices(services) {
 			continue
 		}
-
-		eligible = append(eligible, peer)
+		eligible = append(eligible, peer.Addr)
 	}

 	return eligible, nil
 }

+// filterNodeAddrs filters out any peers which cannot handle arbitrary witness
+// block requests, i.e., any peer which is not considered a segwit-enabled
+// "full-node".
+func filterNodeAddrs(nodeAddrs []btcjson.GetNodeAddressesResult) []string {
+	var eligible []string
+	for _, nodeAddr := range nodeAddrs {
+		services := wire.ServiceFlag(nodeAddr.Services)
+		if !satisfiesRequiredServices(services) {
+			continue
+		}
+		eligible = append(eligible, nodeAddr.Address)
+	}
+	return eligible
+}
+
+// satisfiesRequiredServices determines whether the services signaled by a peer
+// satisfy our requirements for retrieving pruned blocks from them.
+func satisfiesRequiredServices(services wire.ServiceFlag) bool {
+	return services&requiredServices == requiredServices &&
+		services&prunedNodeService != prunedNodeService
+}
+
 // newQueryPeer creates a new peer instance configured to relay any received
 // messages to the internal workManager.
-func (d *PrunedBlockDispatcher) newQueryPeer(
-	peerInfo btcjson.GetPeerInfoResult) (*queryPeer, error) {
-
+func (d *PrunedBlockDispatcher) newQueryPeer(addr string) (*queryPeer, error) {
 	ready := make(chan struct{})
 	msgsRecvd := make(chan wire.Message)
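
Note that both entry points feed the same service check but decode the services field differently: `getpeerinfo` reports it as a hex string, while `getnodeaddresses` already returns a uint64. A small self-contained example of the two paths (sample values are made up):

```go
package sketch

import (
	"encoding/binary"
	"encoding/hex"
	"fmt"
	"log"

	"github.com/btcsuite/btcd/btcjson"
	"github.com/btcsuite/btcd/wire"
)

// demoServiceDecoding shows the two decoding paths that feed the same
// satisfiesRequiredServices-style check.
func demoServiceDecoding() {
	// getpeerinfo encodes services as a zero-padded hex string:
	// "0000000000000009" = NODE_NETWORK | NODE_WITNESS.
	peerInfo := btcjson.GetPeerInfoResult{
		Addr:     "203.0.113.7:8333",
		Services: "0000000000000009",
	}
	raw, err := hex.DecodeString(peerInfo.Services)
	if err != nil {
		log.Fatal(err)
	}
	peerServices := wire.ServiceFlag(binary.BigEndian.Uint64(raw))

	// getnodeaddresses returns the same bitfield directly as a uint64.
	nodeAddr := btcjson.GetNodeAddressesResult{
		Address:  "198.51.100.23",
		Services: 0x9,
	}
	nodeServices := wire.ServiceFlag(nodeAddr.Services)

	// Both decode to the same flag set.
	fmt.Println(peerServices == nodeServices, peerServices)
}
```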
@@ -390,7 +457,8 @@ func (d *PrunedBlockDispatcher) newQueryPeer(
 			switch msg := msg.(type) {
 			case *wire.MsgBlock:
 				block = msg
-			case *wire.MsgVersion, *wire.MsgVerAck:
+			case *wire.MsgVersion, *wire.MsgVerAck,
+				*wire.MsgPing, *wire.MsgPong:
 				return
 			default:
 				log.Debugf("Received unexpected message "+
@@ -406,7 +474,7 @@ func (d *PrunedBlockDispatcher) newQueryPeer(
 		},
 		AllowSelfConns: true,
 	}

-	p, err := peer.NewOutboundPeer(cfg, peerInfo.Addr)
+	p, err := peer.NewOutboundPeer(cfg, addr)
 	if err != nil {
 		return nil, err
 	}
@@ -419,35 +487,6 @@ func (d *PrunedBlockDispatcher) newQueryPeer(
 	}, nil
 }

-// connectToPeer attempts to establish a connection to the given peer and waits
-// up to PeerReadyTimeout for the version exchange to complete so that we can
-// begin sending it our queries.
-func (d *PrunedBlockDispatcher) connectToPeer(peer *queryPeer) error {
-	conn, err := d.cfg.Dial(peer.Addr())
-	if err != nil {
-		return err
-	}
-	peer.AssociateConnection(conn)
-
-	select {
-	case <-peer.ready:
-	case <-time.After(d.cfg.PeerReadyTimeout):
-		peer.Disconnect()
-		return errors.New("timed out waiting for protocol negotiation")
-	case <-d.quit:
-		return errors.New("shutting down")
-	}
-
-	// Remove the peer once it has disconnected.
-	peer.signalUponDisconnect(func() {
-		d.peerMtx.Lock()
-		delete(d.currentPeers, peer.Addr())
-		d.peerMtx.Unlock()
-	})
-
-	return nil
-}
-
 // banPeer bans a peer by disconnecting them and ensuring we don't reconnect.
 func (d *PrunedBlockDispatcher) banPeer(peer string) {
 	d.peerMtx.Lock()


@@ -5,7 +5,6 @@ import (
 	"encoding/hex"
 	"fmt"
 	"net"
-	"os"
 	"sync"
 	"sync/atomic"
 	"testing"
@@ -16,18 +15,10 @@ import (
 	"github.com/btcsuite/btcd/chaincfg/chainhash"
 	"github.com/btcsuite/btcd/peer"
 	"github.com/btcsuite/btcd/wire"
-	"github.com/btcsuite/btclog"
 	"github.com/lightningnetwork/lnd/ticker"
 	"github.com/stretchr/testify/require"
 )

-func init() {
-	b := btclog.NewBackend(os.Stdout)
-	l := b.Logger("")
-	l.SetLevel(btclog.LevelTrace)
-	UseLogger(l)
-}
-
 var (
 	addrCounter int32 // Increased atomically.
@@ -49,12 +40,13 @@ type prunedBlockDispatcherHarness struct {
 	hashes []*chainhash.Hash
 	blocks map[chainhash.Hash]*wire.MsgBlock

 	peerMtx       sync.Mutex
 	peers         map[string]*peer.Peer
+	fallbackAddrs map[string]*peer.Peer
 	localConns    map[string]net.Conn // Connections to peers.
 	remoteConns   map[string]net.Conn // Connections from peers.

-	dialedPeer    chan struct{}
+	dialedPeer    chan string
 	queriedPeer   chan struct{}
 	blocksQueried map[chainhash.Hash]int
@@ -68,22 +60,25 @@ func newNetworkBlockTestHarness(t *testing.T, numBlocks,
 	h := &prunedBlockDispatcherHarness{
 		t:             t,
+		dispatcher:    &PrunedBlockDispatcher{},
 		peers:         make(map[string]*peer.Peer, numPeers),
+		fallbackAddrs: make(map[string]*peer.Peer, numPeers),
 		localConns:    make(map[string]net.Conn, numPeers),
 		remoteConns:   make(map[string]net.Conn, numPeers),
-		dialedPeer:    make(chan struct{}),
+		dialedPeer:    make(chan string),
 		queriedPeer:   make(chan struct{}),
 		blocksQueried: make(map[chainhash.Hash]int),
+		shouldReply:   0,
 	}

 	h.hashes, h.blocks = genBlockChain(numBlocks)

 	for i := uint32(0); i < numPeers; i++ {
-		h.addPeer()
+		h.addPeer(false)
 	}

 	dial := func(addr string) (net.Conn, error) {
 		go func() {
-			h.dialedPeer <- struct{}{}
+			h.dialedPeer <- addr
 		}()

 		h.peerMtx.Lock()
@@ -98,7 +93,12 @@ func newNetworkBlockTestHarness(t *testing.T, numBlocks,
 			return nil, fmt.Errorf("remote conn %v not found", addr)
 		}

-		h.peers[addr].AssociateConnection(remoteConn)
+		if p, ok := h.peers[addr]; ok {
+			p.AssociateConnection(remoteConn)
+		}
+		if p, ok := h.fallbackAddrs[addr]; ok {
+			p.AssociateConnection(remoteConn)
+		}

 		return localConn, nil
 	}
@@ -126,6 +126,22 @@ func newNetworkBlockTestHarness(t *testing.T, numBlocks,
 			return res, nil
 		},
+		GetNodeAddresses: func(*int32) ([]btcjson.GetNodeAddressesResult, error) {
+			h.peerMtx.Lock()
+			defer h.peerMtx.Unlock()
+
+			res := make(
+				[]btcjson.GetNodeAddressesResult, 0,
+				len(h.fallbackAddrs),
+			)
+			for addr, peer := range h.fallbackAddrs {
+				res = append(res, btcjson.GetNodeAddressesResult{
+					Services: uint64(peer.Services()),
+					Address:  addr,
+				})
+			}
+			return res, nil
+		},
 		PeerReadyTimeout:   time.Hour,
 		RefreshPeersTicker: ticker.NewForce(time.Hour),
 		AllowSelfPeerConns: true,
@@ -175,20 +191,24 @@ func (h *prunedBlockDispatcherHarness) stop() {
 // addPeer adds a new random peer available for use by the
 // PrunedBlockDispatcher.
-func (h *prunedBlockDispatcherHarness) addPeer() string {
+func (h *prunedBlockDispatcherHarness) addPeer(fallback bool) string {
 	addr := nextAddr()

 	h.peerMtx.Lock()
 	defer h.peerMtx.Unlock()

-	h.resetPeer(addr)
+	h.resetPeer(addr, fallback)

 	return addr
 }

 // resetPeer resets the internal peer connection state allowing the
 // PrunedBlockDispatcher to establish a mock connection to it.
-func (h *prunedBlockDispatcherHarness) resetPeer(addr string) {
-	h.peers[addr] = h.newPeer()
+func (h *prunedBlockDispatcherHarness) resetPeer(addr string, fallback bool) {
+	if fallback {
+		h.fallbackAddrs[addr] = h.newPeer()
+	} else {
+		h.peers[addr] = h.newPeer()
+	}

 	// Establish a mock connection between us and each peer.
 	inConn, outConn := pipe(
@@ -280,7 +300,7 @@ func (h *prunedBlockDispatcherHarness) refreshPeers() {
 }

 // disconnectPeer simulates a peer disconnecting from the PrunedBlockDispatcher.
-func (h *prunedBlockDispatcherHarness) disconnectPeer(addr string) {
+func (h *prunedBlockDispatcherHarness) disconnectPeer(addr string, fallback bool) {
 	h.t.Helper()

 	h.peerMtx.Lock()
@@ -303,7 +323,7 @@ func (h *prunedBlockDispatcherHarness) disconnectPeer(addr string) {
 	}, time.Second, 200*time.Millisecond)

 	// Reset the peer connection state to allow connections to them again.
-	h.resetPeer(addr)
+	h.resetPeer(addr, fallback)
 }

 // assertPeerDialed asserts that a connection was made to the given peer.
@@ -317,6 +337,18 @@ func (h *prunedBlockDispatcherHarness) assertPeerDialed() {
 	}
 }

+// assertPeerDialedWithAddr asserts that a connection was made to the given peer.
+func (h *prunedBlockDispatcherHarness) assertPeerDialedWithAddr(addr string) {
+	h.t.Helper()
+
+	select {
+	case dialedAddr := <-h.dialedPeer:
+		require.Equal(h.t, addr, dialedAddr)
+	case <-time.After(5 * time.Second):
+		h.t.Fatalf("expected peer to be dialed")
+	}
+}
+
 // assertPeerQueried asserts that query was sent to the given peer.
 func (h *prunedBlockDispatcherHarness) assertPeerQueried() {
 	h.t.Helper()
@@ -494,7 +526,7 @@ func TestPrunedBlockDispatcherMultipleQueryPeers(t *testing.T) {
 	// We should see one query per block.
 	for i := 0; i < numBlocks; i++ {
 		h.assertPeerQueried()
-		h.assertPeerReplied(blockChans[i], errChans[i], i == numBlocks-1)
+		h.assertPeerReplied(blockChans[i], errChans[i], true)
 	}
 }
@@ -523,13 +555,13 @@ func TestPrunedBlockDispatcherPeerPoller(t *testing.T) {
 	// We'll disable replies for now, as we'll want to test the disconnect
 	// case.
 	h.disablePeerReplies()

-	peer := h.addPeer()
+	peer := h.addPeer(false)
 	h.refreshPeers()
-	h.assertPeerDialed()
+	h.assertPeerDialedWithAddr(peer)
 	h.assertPeerQueried()

 	// Disconnect our peer and re-enable replies.
-	h.disconnectPeer(peer)
+	h.disconnectPeer(peer, false)
 	h.enablePeerReplies()
 	h.assertNoReply(blockChan, errChan)
@@ -539,11 +571,11 @@ func TestPrunedBlockDispatcherPeerPoller(t *testing.T) {
 	h.assertPeerDialed()
 	h.assertPeerQueried()

-	// Refresh our peers again. We can afford to have one more query peer,
-	// but there isn't another one available. We also shouldn't dial the one
-	// we're currently connected to again.
+	// Add a fallback addresses and force refresh our peers again. We can
+	// afford to have one more query peer, so a connection should be made.
+	fallbackPeer := h.addPeer(true)
 	h.refreshPeers()
-	h.assertNoPeerDialed()
+	h.assertPeerDialedWithAddr(fallbackPeer)

 	// Now that we know we've connected to the peer, we should be able to
 	// receive their response.
@@ -578,7 +610,7 @@ func TestPrunedBlockDispatcherInvalidBlock(t *testing.T) {
 	// Signal to our peers to send valid replies and add a new peer.
 	h.enablePeerReplies()
-	_ = h.addPeer()
+	_ = h.addPeer(false)

 	// Force a refresh, which should cause our new peer to be dialed and
 	// queried. We expect them to send a valid block and fulfill our