[lbry] build: remove neutrino and bitcoind support

This commit is contained in:
Roy Lee 2021-08-25 14:03:05 -07:00
parent 0e82225623
commit 8701ecb329
11 changed files with 49 additions and 4115 deletions

View file

@ -10,15 +10,12 @@ import (
"net/http" "net/http"
_ "net/http/pprof" _ "net/http/pprof"
"os" "os"
"path/filepath"
"runtime" "runtime"
"sync" "sync"
"github.com/btcsuite/btcwallet/chain" "github.com/btcsuite/btcwallet/chain"
"github.com/btcsuite/btcwallet/rpc/legacyrpc" "github.com/btcsuite/btcwallet/rpc/legacyrpc"
"github.com/btcsuite/btcwallet/wallet" "github.com/btcsuite/btcwallet/wallet"
"github.com/btcsuite/btcwallet/walletdb"
"github.com/lightninglabs/neutrino"
) )
var ( var (
@ -156,44 +153,10 @@ func rpcClientConnectLoop(legacyRPCServer *legacyrpc.Server, loader *wallet.Load
err error err error
) )
if cfg.UseSPV { chainClient, err = startChainRPC(certs)
var ( if err != nil {
chainService *neutrino.ChainService log.Errorf("Unable to open connection to consensus RPC server: %v", err)
spvdb walletdb.DB continue
)
netDir := networkDir(cfg.AppDataDir.Value, activeNet.Params)
spvdb, err = walletdb.Create(
"bdb", filepath.Join(netDir, "neutrino.db"),
true, cfg.DBTimeout,
)
if err != nil {
log.Errorf("Unable to create Neutrino DB: %s", err)
continue
}
defer spvdb.Close()
chainService, err = neutrino.NewChainService(
neutrino.Config{
DataDir: netDir,
Database: spvdb,
ChainParams: *activeNet.Params,
ConnectPeers: cfg.ConnectPeers,
AddPeers: cfg.AddPeers,
})
if err != nil {
log.Errorf("Couldn't create Neutrino ChainService: %s", err)
continue
}
chainClient = chain.NewNeutrinoClient(activeNet.Params, chainService)
err = chainClient.Start()
if err != nil {
log.Errorf("Couldn't start Neutrino client: %s", err)
}
} else {
chainClient, err = startChainRPC(certs)
if err != nil {
log.Errorf("Unable to open connection to consensus RPC server: %v", err)
continue
}
} }
// Rather than inlining this logic directly into the loader // Rather than inlining this logic directly into the loader

File diff suppressed because it is too large Load diff

View file

@ -1,602 +0,0 @@
package chain
import (
"bytes"
"fmt"
"io"
"net"
"sync"
"sync/atomic"
"time"
"github.com/btcsuite/btcd/btcjson"
"github.com/btcsuite/btcd/chaincfg"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/rpcclient"
"github.com/btcsuite/btcd/wire"
"github.com/lightninglabs/gozmq"
"github.com/lightningnetwork/lnd/ticker"
)
const (
// rawBlockZMQCommand is the command used to receive raw block
// notifications from bitcoind through ZMQ.
rawBlockZMQCommand = "rawblock"
// rawTxZMQCommand is the command used to receive raw transaction
// notifications from bitcoind through ZMQ.
rawTxZMQCommand = "rawtx"
// maxRawBlockSize is the maximum size in bytes for a raw block received
// from bitcoind through ZMQ.
maxRawBlockSize = 4e6
// maxRawTxSize is the maximum size in bytes for a raw transaction
// received from bitcoind through ZMQ.
maxRawTxSize = maxRawBlockSize
// seqNumLen is the length of the sequence number of a message sent from
// bitcoind through ZMQ.
seqNumLen = 4
// errBlockPrunedStr is the error message returned by bitcoind upon
// calling GetBlock on a pruned block.
errBlockPrunedStr = "Block not available (pruned data)"
)
// BitcoindConfig contains all of the parameters required to establish a
// connection to a bitcoind's RPC.
type BitcoindConfig struct {
// ChainParams are the chain parameters the bitcoind server is running
// on.
ChainParams *chaincfg.Params
// Host is the IP address and port of the bitcoind's RPC server.
Host string
// User is the username to use to authenticate to bitcoind's RPC server.
User string
// Pass is the passphrase to use to authenticate to bitcoind's RPC
// server.
Pass string
// ZMQBlockHost is the IP address and port of the bitcoind's rawblock
// listener.
ZMQBlockHost string
// ZMQTxHost is the IP address and port of the bitcoind's rawtx
// listener.
ZMQTxHost string
// ZMQReadDeadline represents the read deadline we'll apply when reading
// ZMQ messages from either subscription.
ZMQReadDeadline time.Duration
// Dialer is a closure we'll use to dial Bitcoin peers. If the chain
// backend is running over Tor, this must support dialing peers over Tor
// as well.
Dialer Dialer
// PrunedModeMaxPeers is the maximum number of peers we'll attempt to
// retrieve pruned blocks from.
//
// NOTE: This only applies for pruned bitcoind nodes.
PrunedModeMaxPeers int
}
// BitcoindConn represents a persistent client connection to a bitcoind node
// that listens for events read from a ZMQ connection.
type BitcoindConn struct {
started int32 // To be used atomically.
stopped int32 // To be used atomically.
// rescanClientCounter is an atomic counter that assigns a unique ID to
// each new bitcoind rescan client using the current bitcoind
// connection.
rescanClientCounter uint64
cfg BitcoindConfig
// client is the RPC client to the bitcoind node.
client *rpcclient.Client
// prunedBlockDispatcher handles all of the pruned block requests.
//
// NOTE: This is nil when the bitcoind node is not pruned.
prunedBlockDispatcher *PrunedBlockDispatcher
// zmqBlockConn is the ZMQ connection we'll use to read raw block
// events.
zmqBlockConn *gozmq.Conn
// zmqTxConn is the ZMQ connection we'll use to read raw transaction
// events.
zmqTxConn *gozmq.Conn
// rescanClients is the set of active bitcoind rescan clients to which
// ZMQ event notfications will be sent to.
rescanClientsMtx sync.Mutex
rescanClients map[uint64]*BitcoindClient
quit chan struct{}
wg sync.WaitGroup
}
// Dialer represents a way to dial Bitcoin peers. If the chain backend is
// running over Tor, this must support dialing peers over Tor as well.
type Dialer = func(string) (net.Conn, error)
// NewBitcoindConn creates a client connection to the node described by the host
// string. The ZMQ connections are established immediately to ensure liveness.
// If the remote node does not operate on the same bitcoin network as described
// by the passed chain parameters, the connection will be disconnected.
func NewBitcoindConn(cfg *BitcoindConfig) (*BitcoindConn, error) {
clientCfg := &rpcclient.ConnConfig{
Host: cfg.Host,
User: cfg.User,
Pass: cfg.Pass,
DisableAutoReconnect: false,
DisableConnectOnNew: true,
DisableTLS: true,
HTTPPostMode: true,
}
client, err := rpcclient.New(clientCfg, nil)
if err != nil {
return nil, err
}
// Verify that the node is running on the expected network.
net, err := getCurrentNet(client)
if err != nil {
return nil, err
}
if net != cfg.ChainParams.Net {
return nil, fmt.Errorf("expected network %v, got %v",
cfg.ChainParams.Net, net)
}
// Check if the node is pruned, as we'll need to perform additional
// operations if so.
chainInfo, err := client.GetBlockChainInfo()
if err != nil {
return nil, fmt.Errorf("unable to determine if bitcoind is "+
"pruned: %v", err)
}
// Establish two different ZMQ connections to bitcoind to retrieve block
// and transaction event notifications. We'll use two as a separation of
// concern to ensure one type of event isn't dropped from the connection
// queue due to another type of event filling it up.
zmqBlockConn, err := gozmq.Subscribe(
cfg.ZMQBlockHost, []string{rawBlockZMQCommand},
cfg.ZMQReadDeadline,
)
if err != nil {
return nil, fmt.Errorf("unable to subscribe for zmq block "+
"events: %v", err)
}
zmqTxConn, err := gozmq.Subscribe(
cfg.ZMQTxHost, []string{rawTxZMQCommand}, cfg.ZMQReadDeadline,
)
if err != nil {
zmqBlockConn.Close()
return nil, fmt.Errorf("unable to subscribe for zmq tx "+
"events: %v", err)
}
// Only initialize the PrunedBlockDispatcher when the connected bitcoind
// node is pruned.
var prunedBlockDispatcher *PrunedBlockDispatcher
if chainInfo.Pruned {
prunedBlockDispatcher, err = NewPrunedBlockDispatcher(
&PrunedBlockDispatcherConfig{
ChainParams: cfg.ChainParams,
NumTargetPeers: cfg.PrunedModeMaxPeers,
Dial: cfg.Dialer,
GetPeers: client.GetPeerInfo,
GetNodeAddresses: client.GetNodeAddresses,
PeerReadyTimeout: defaultPeerReadyTimeout,
RefreshPeersTicker: ticker.New(defaultRefreshPeersInterval),
MaxRequestInvs: wire.MaxInvPerMsg,
},
)
if err != nil {
return nil, err
}
}
return &BitcoindConn{
cfg: *cfg,
client: client,
prunedBlockDispatcher: prunedBlockDispatcher,
zmqBlockConn: zmqBlockConn,
zmqTxConn: zmqTxConn,
rescanClients: make(map[uint64]*BitcoindClient),
quit: make(chan struct{}),
}, nil
}
// Start attempts to establish a RPC and ZMQ connection to a bitcoind node. If
// successful, a goroutine is spawned to read events from the ZMQ connection.
// It's possible for this function to fail due to a limited number of connection
// attempts. This is done to prevent waiting forever on the connection to be
// established in the case that the node is down.
func (c *BitcoindConn) Start() error {
if !atomic.CompareAndSwapInt32(&c.started, 0, 1) {
return nil
}
// If we're connected to a pruned backend, we'll need to also start our
// pruned block dispatcher to handle pruned block requests.
if c.prunedBlockDispatcher != nil {
log.Debug("Detected pruned bitcoind backend")
if err := c.prunedBlockDispatcher.Start(); err != nil {
return err
}
}
c.wg.Add(2)
go c.blockEventHandler()
go c.txEventHandler()
return nil
}
// Stop terminates the RPC and ZMQ connection to a bitcoind node and removes any
// active rescan clients.
func (c *BitcoindConn) Stop() {
if !atomic.CompareAndSwapInt32(&c.stopped, 0, 1) {
return
}
for _, client := range c.rescanClients {
client.Stop()
}
close(c.quit)
c.client.Shutdown()
c.zmqBlockConn.Close()
c.zmqTxConn.Close()
if c.prunedBlockDispatcher != nil {
c.prunedBlockDispatcher.Stop()
}
c.client.WaitForShutdown()
c.wg.Wait()
}
// blockEventHandler reads raw blocks events from the ZMQ block socket and
// forwards them along to the current rescan clients.
//
// NOTE: This must be run as a goroutine.
func (c *BitcoindConn) blockEventHandler() {
defer c.wg.Done()
log.Info("Started listening for bitcoind block notifications via ZMQ "+
"on", c.zmqBlockConn.RemoteAddr())
// Set up the buffers we expect our messages to consume. ZMQ
// messages from bitcoind include three parts: the command, the
// data, and the sequence number.
//
// We'll allocate a fixed data slice that we'll reuse when reading
// blocks from bitcoind through ZMQ. There's no need to recycle this
// slice (zero out) after using it, as further reads will overwrite the
// slice and we'll only be deserializing the bytes needed.
var (
command [len(rawBlockZMQCommand)]byte
seqNum [seqNumLen]byte
data = make([]byte, maxRawBlockSize)
)
for {
// Before attempting to read from the ZMQ socket, we'll make
// sure to check if we've been requested to shut down.
select {
case <-c.quit:
return
default:
}
// Poll an event from the ZMQ socket.
var (
bufs = [][]byte{command[:], data, seqNum[:]}
err error
)
bufs, err = c.zmqBlockConn.Receive(bufs)
if err != nil {
// EOF should only be returned if the connection was
// explicitly closed, so we can exit at this point.
if err == io.EOF {
return
}
// It's possible that the connection to the socket
// continuously times out, so we'll prevent logging this
// error to prevent spamming the logs.
netErr, ok := err.(net.Error)
if ok && netErr.Timeout() {
log.Trace("Re-establishing timed out ZMQ " +
"block connection")
continue
}
log.Errorf("Unable to receive ZMQ %v message: %v",
rawBlockZMQCommand, err)
continue
}
// We have an event! We'll now ensure it is a block event,
// deserialize it, and report it to the different rescan
// clients.
eventType := string(bufs[0])
switch eventType {
case rawBlockZMQCommand:
block := &wire.MsgBlock{}
r := bytes.NewReader(bufs[1])
if err := block.Deserialize(r); err != nil {
log.Errorf("Unable to deserialize block: %v",
err)
continue
}
c.rescanClientsMtx.Lock()
for _, client := range c.rescanClients {
select {
case client.zmqBlockNtfns <- block:
case <-client.quit:
case <-c.quit:
c.rescanClientsMtx.Unlock()
return
}
}
c.rescanClientsMtx.Unlock()
default:
// It's possible that the message wasn't fully read if
// bitcoind shuts down, which will produce an unreadable
// event type. To prevent from logging it, we'll make
// sure it conforms to the ASCII standard.
if eventType == "" || !isASCII(eventType) {
continue
}
log.Warnf("Received unexpected event type from %v "+
"subscription: %v", rawBlockZMQCommand,
eventType)
}
}
}
// txEventHandler reads raw blocks events from the ZMQ block socket and forwards
// them along to the current rescan clients.
//
// NOTE: This must be run as a goroutine.
func (c *BitcoindConn) txEventHandler() {
defer c.wg.Done()
log.Info("Started listening for bitcoind transaction notifications "+
"via ZMQ on", c.zmqTxConn.RemoteAddr())
// Set up the buffers we expect our messages to consume. ZMQ
// messages from bitcoind include three parts: the command, the
// data, and the sequence number.
//
// We'll allocate a fixed data slice that we'll reuse when reading
// transactions from bitcoind through ZMQ. There's no need to recycle
// this slice (zero out) after using it, as further reads will overwrite
// the slice and we'll only be deserializing the bytes needed.
var (
command [len(rawTxZMQCommand)]byte
seqNum [seqNumLen]byte
data = make([]byte, maxRawTxSize)
)
for {
// Before attempting to read from the ZMQ socket, we'll make
// sure to check if we've been requested to shut down.
select {
case <-c.quit:
return
default:
}
// Poll an event from the ZMQ socket.
var (
bufs = [][]byte{command[:], data, seqNum[:]}
err error
)
bufs, err = c.zmqTxConn.Receive(bufs)
if err != nil {
// EOF should only be returned if the connection was
// explicitly closed, so we can exit at this point.
if err == io.EOF {
return
}
// It's possible that the connection to the socket
// continuously times out, so we'll prevent logging this
// error to prevent spamming the logs.
netErr, ok := err.(net.Error)
if ok && netErr.Timeout() {
log.Trace("Re-establishing timed out ZMQ " +
"transaction connection")
continue
}
log.Errorf("Unable to receive ZMQ %v message: %v",
rawTxZMQCommand, err)
continue
}
// We have an event! We'll now ensure it is a transaction event,
// deserialize it, and report it to the different rescan
// clients.
eventType := string(bufs[0])
switch eventType {
case rawTxZMQCommand:
tx := &wire.MsgTx{}
r := bytes.NewReader(bufs[1])
if err := tx.Deserialize(r); err != nil {
log.Errorf("Unable to deserialize "+
"transaction: %v", err)
continue
}
c.rescanClientsMtx.Lock()
for _, client := range c.rescanClients {
select {
case client.zmqTxNtfns <- tx:
case <-client.quit:
case <-c.quit:
c.rescanClientsMtx.Unlock()
return
}
}
c.rescanClientsMtx.Unlock()
default:
// It's possible that the message wasn't fully read if
// bitcoind shuts down, which will produce an unreadable
// event type. To prevent from logging it, we'll make
// sure it conforms to the ASCII standard.
if eventType == "" || !isASCII(eventType) {
continue
}
log.Warnf("Received unexpected event type from %v "+
"subscription: %v", rawTxZMQCommand, eventType)
}
}
}
// getCurrentNet returns the network on which the bitcoind node is running.
func getCurrentNet(client *rpcclient.Client) (wire.BitcoinNet, error) {
hash, err := client.GetBlockHash(0)
if err != nil {
return 0, err
}
switch *hash {
case *chaincfg.TestNet3Params.GenesisHash:
return chaincfg.TestNet3Params.Net, nil
case *chaincfg.RegressionNetParams.GenesisHash:
return chaincfg.RegressionNetParams.Net, nil
case *chaincfg.SigNetParams.GenesisHash:
return chaincfg.SigNetParams.Net, nil
case *chaincfg.MainNetParams.GenesisHash:
return chaincfg.MainNetParams.Net, nil
default:
return 0, fmt.Errorf("unknown network with genesis hash %v", hash)
}
}
// NewBitcoindClient returns a bitcoind client using the current bitcoind
// connection. This allows us to share the same connection using multiple
// clients.
func (c *BitcoindConn) NewBitcoindClient() *BitcoindClient {
return &BitcoindClient{
quit: make(chan struct{}),
id: atomic.AddUint64(&c.rescanClientCounter, 1),
chainConn: c,
rescanUpdate: make(chan interface{}),
watchedAddresses: make(map[string]struct{}),
watchedOutPoints: make(map[wire.OutPoint]struct{}),
watchedTxs: make(map[chainhash.Hash]struct{}),
notificationQueue: NewConcurrentQueue(20),
zmqTxNtfns: make(chan *wire.MsgTx),
zmqBlockNtfns: make(chan *wire.MsgBlock),
mempool: make(map[chainhash.Hash]struct{}),
expiredMempool: make(map[int32]map[chainhash.Hash]struct{}),
}
}
// AddClient adds a client to the set of active rescan clients of the current
// chain connection. This allows the connection to include the specified client
// in its notification delivery.
//
// NOTE: This function is safe for concurrent access.
func (c *BitcoindConn) AddClient(client *BitcoindClient) {
c.rescanClientsMtx.Lock()
defer c.rescanClientsMtx.Unlock()
c.rescanClients[client.id] = client
}
// RemoveClient removes the client with the given ID from the set of active
// rescan clients. Once removed, the client will no longer receive block and
// transaction notifications from the chain connection.
//
// NOTE: This function is safe for concurrent access.
func (c *BitcoindConn) RemoveClient(id uint64) {
c.rescanClientsMtx.Lock()
defer c.rescanClientsMtx.Unlock()
delete(c.rescanClients, id)
}
// isBlockPrunedErr determines if the error returned by the GetBlock RPC
// corresponds to the requested block being pruned.
func isBlockPrunedErr(err error) bool {
rpcErr, ok := err.(*btcjson.RPCError)
return ok && rpcErr.Code == btcjson.ErrRPCMisc &&
rpcErr.Message == errBlockPrunedStr
}
// GetBlock returns a raw block from the server given its hash. If the server
// has already pruned the block, it will be retrieved from one of its peers.
func (c *BitcoindConn) GetBlock(hash *chainhash.Hash) (*wire.MsgBlock, error) {
block, err := c.client.GetBlock(hash)
// Got the block from the backend successfully, return it.
if err == nil {
return block, nil
}
// We failed getting the block from the backend for whatever reason. If
// it wasn't due to the block being pruned, return the error
// immediately.
if !isBlockPrunedErr(err) || c.prunedBlockDispatcher == nil {
return nil, err
}
// Now that we know the block has been pruned for sure, request it from
// our backend peers.
blockChan, errChan := c.prunedBlockDispatcher.Query(
[]*chainhash.Hash{hash},
)
for {
select {
case block := <-blockChan:
return block, nil
case err := <-errChan:
if err != nil {
return nil, err
}
// errChan fired before blockChan with a nil error, wait
// for the block now.
case <-c.quit:
return nil, ErrBitcoindClientShuttingDown
}
}
}
// isASCII is a helper method that checks whether all bytes in `data` would be
// printable ASCII characters if interpreted as a string.
func isASCII(s string) bool {
for _, c := range s {
if c < 32 || c > 126 {
return false
}
}
return true
}

View file

@ -21,7 +21,6 @@ func BackEnds() []string {
return []string{ return []string{
"bitcoind", "bitcoind",
"btcd", "btcd",
"neutrino",
} }
} }

View file

@ -1,265 +1,9 @@
package chain package chain
import ( import (
"errors"
"fmt"
"sync"
"time"
"github.com/btcsuite/btcd/chaincfg"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/rpcclient"
"github.com/btcsuite/btcd/txscript" "github.com/btcsuite/btcd/txscript"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil"
"github.com/btcsuite/btcutil/gcs"
"github.com/btcsuite/btcutil/gcs/builder"
"github.com/btcsuite/btcwallet/waddrmgr"
"github.com/btcsuite/btcwallet/wtxmgr"
"github.com/lightninglabs/neutrino"
"github.com/lightninglabs/neutrino/headerfs"
) )
// NeutrinoClient is an implementation of the btcwalet chain.Interface interface.
type NeutrinoClient struct {
CS *neutrino.ChainService
chainParams *chaincfg.Params
// We currently support one rescan/notifiction goroutine per client
rescan *neutrino.Rescan
enqueueNotification chan interface{}
dequeueNotification chan interface{}
startTime time.Time
lastProgressSent bool
lastFilteredBlockHeader *wire.BlockHeader
currentBlock chan *waddrmgr.BlockStamp
quit chan struct{}
rescanQuit chan struct{}
rescanErr <-chan error
wg sync.WaitGroup
started bool
scanning bool
finished bool
isRescan bool
clientMtx sync.Mutex
}
// NewNeutrinoClient creates a new NeutrinoClient struct with a backing
// ChainService.
func NewNeutrinoClient(chainParams *chaincfg.Params,
chainService *neutrino.ChainService) *NeutrinoClient {
return &NeutrinoClient{
CS: chainService,
chainParams: chainParams,
}
}
// BackEnd returns the name of the driver.
func (s *NeutrinoClient) BackEnd() string {
return "neutrino"
}
// Start replicates the RPC client's Start method.
func (s *NeutrinoClient) Start() error {
if err := s.CS.Start(); err != nil {
return fmt.Errorf("error starting chain service: %v", err)
}
s.clientMtx.Lock()
defer s.clientMtx.Unlock()
if !s.started {
s.enqueueNotification = make(chan interface{})
s.dequeueNotification = make(chan interface{})
s.currentBlock = make(chan *waddrmgr.BlockStamp)
s.quit = make(chan struct{})
s.started = true
s.wg.Add(1)
go func() {
select {
case s.enqueueNotification <- ClientConnected{}:
case <-s.quit:
}
}()
go s.notificationHandler()
}
return nil
}
// Stop replicates the RPC client's Stop method.
func (s *NeutrinoClient) Stop() {
s.clientMtx.Lock()
defer s.clientMtx.Unlock()
if !s.started {
return
}
close(s.quit)
s.started = false
}
// WaitForShutdown replicates the RPC client's WaitForShutdown method.
func (s *NeutrinoClient) WaitForShutdown() {
s.wg.Wait()
}
// GetBlock replicates the RPC client's GetBlock command.
func (s *NeutrinoClient) GetBlock(hash *chainhash.Hash) (*wire.MsgBlock, error) {
// TODO(roasbeef): add a block cache?
// * which evication strategy? depends on use case
// Should the block cache be INSIDE neutrino instead of in btcwallet?
block, err := s.CS.GetBlock(*hash)
if err != nil {
return nil, err
}
return block.MsgBlock(), nil
}
// GetBlockHeight gets the height of a block by its hash. It serves as a
// replacement for the use of GetBlockVerboseTxAsync for the wallet package
// since we can't actually return a FutureGetBlockVerboseResult because the
// underlying type is private to rpcclient.
func (s *NeutrinoClient) GetBlockHeight(hash *chainhash.Hash) (int32, error) {
return s.CS.GetBlockHeight(hash)
}
// GetBestBlock replicates the RPC client's GetBestBlock command.
func (s *NeutrinoClient) GetBestBlock() (*chainhash.Hash, int32, error) {
chainTip, err := s.CS.BestBlock()
if err != nil {
return nil, 0, err
}
return &chainTip.Hash, chainTip.Height, nil
}
// BlockStamp returns the latest block notified by the client, or an error
// if the client has been shut down.
func (s *NeutrinoClient) BlockStamp() (*waddrmgr.BlockStamp, error) {
select {
case bs := <-s.currentBlock:
return bs, nil
case <-s.quit:
return nil, errors.New("disconnected")
}
}
// GetBlockHash returns the block hash for the given height, or an error if the
// client has been shut down or the hash at the block height doesn't exist or
// is unknown.
func (s *NeutrinoClient) GetBlockHash(height int64) (*chainhash.Hash, error) {
return s.CS.GetBlockHash(height)
}
// GetBlockHeader returns the block header for the given block hash, or an error
// if the client has been shut down or the hash doesn't exist or is unknown.
func (s *NeutrinoClient) GetBlockHeader(
blockHash *chainhash.Hash) (*wire.BlockHeader, error) {
return s.CS.GetBlockHeader(blockHash)
}
// IsCurrent returns whether the chain backend considers its view of the network
// as "current".
func (s *NeutrinoClient) IsCurrent() bool {
return s.CS.IsCurrent()
}
// SendRawTransaction replicates the RPC client's SendRawTransaction command.
func (s *NeutrinoClient) SendRawTransaction(tx *wire.MsgTx, allowHighFees bool) (
*chainhash.Hash, error) {
err := s.CS.SendTransaction(tx)
if err != nil {
return nil, err
}
hash := tx.TxHash()
return &hash, nil
}
// FilterBlocks scans the blocks contained in the FilterBlocksRequest for any
// addresses of interest. For each requested block, the corresponding compact
// filter will first be checked for matches, skipping those that do not report
// anything. If the filter returns a positive match, the full block will be
// fetched and filtered. This method returns a FilterBlocksResponse for the first
// block containing a matching address. If no matches are found in the range of
// blocks requested, the returned response will be nil.
func (s *NeutrinoClient) FilterBlocks(
req *FilterBlocksRequest) (*FilterBlocksResponse, error) {
blockFilterer := NewBlockFilterer(s.chainParams, req)
// Construct the watchlist using the addresses and outpoints contained
// in the filter blocks request.
watchList, err := buildFilterBlocksWatchList(req)
if err != nil {
return nil, err
}
// Iterate over the requested blocks, fetching the compact filter for
// each one, and matching it against the watchlist generated above. If
// the filter returns a positive match, the full block is then requested
// and scanned for addresses using the block filterer.
for i, blk := range req.Blocks {
// TODO(wilmer): Investigate why polling it still necessary
// here. While testing, I ran into a few instances where the
// filter was not retrieved, leading to a panic. This should not
// happen in most cases thanks to the query logic revamp within
// Neutrino, but it seems there's still an uncovered edge case.
filter, err := s.pollCFilter(&blk.Hash)
if err != nil {
return nil, err
}
// Skip any empty filters.
if filter == nil || filter.N() == 0 {
continue
}
key := builder.DeriveKey(&blk.Hash)
matched, err := filter.MatchAny(key, watchList)
if err != nil {
return nil, err
} else if !matched {
continue
}
log.Infof("Fetching block height=%d hash=%v",
blk.Height, blk.Hash)
// TODO(conner): can optimize bandwidth by only fetching
// stripped blocks
rawBlock, err := s.GetBlock(&blk.Hash)
if err != nil {
return nil, err
}
if !blockFilterer.FilterBlock(rawBlock) {
continue
}
// If any external or internal addresses were detected in this
// block, we return them to the caller so that the rescan
// windows can widened with subsequent addresses. The
// `BatchIndex` is returned so that the caller can compute the
// *next* block from which to begin again.
resp := &FilterBlocksResponse{
BatchIndex: uint32(i),
BlockMeta: blk,
FoundExternalAddrs: blockFilterer.FoundExternal,
FoundInternalAddrs: blockFilterer.FoundInternal,
FoundOutPoints: blockFilterer.FoundOutPoints,
RelevantTxns: blockFilterer.RelevantTxns,
}
return resp, nil
}
// No addresses were found for this range.
return nil, nil
}
// buildFilterBlocksWatchList constructs a watchlist used for matching against a // buildFilterBlocksWatchList constructs a watchlist used for matching against a
// cfilter from a FilterBlocksRequest. The watchlist will be populated with all // cfilter from a FilterBlocksRequest. The watchlist will be populated with all
// external addresses, internal addresses, and outpoints contained in the // external addresses, internal addresses, and outpoints contained in the
@ -303,455 +47,3 @@ func buildFilterBlocksWatchList(req *FilterBlocksRequest) ([][]byte, error) {
return watchList, nil return watchList, nil
} }
// pollCFilter attempts to fetch a CFilter from the neutrino client. This is
// used to get around the fact that the filter headers may lag behind the
// highest known block header.
func (s *NeutrinoClient) pollCFilter(hash *chainhash.Hash) (*gcs.Filter, error) {
var (
filter *gcs.Filter
err error
count int
)
const maxFilterRetries = 50
for count < maxFilterRetries {
if count > 0 {
time.Sleep(100 * time.Millisecond)
}
filter, err = s.CS.GetCFilter(
*hash, wire.GCSFilterRegular, neutrino.OptimisticBatch(),
)
if err != nil {
count++
continue
}
return filter, nil
}
return nil, err
}
// Rescan replicates the RPC client's Rescan command.
func (s *NeutrinoClient) Rescan(startHash *chainhash.Hash, addrs []btcutil.Address,
outPoints map[wire.OutPoint]btcutil.Address) error {
s.clientMtx.Lock()
if !s.started {
s.clientMtx.Unlock()
return fmt.Errorf("can't do a rescan when the chain client " +
"is not started")
}
if s.scanning {
// Restart the rescan by killing the existing rescan.
close(s.rescanQuit)
rescan := s.rescan
s.clientMtx.Unlock()
rescan.WaitForShutdown()
s.clientMtx.Lock()
s.rescan = nil
s.rescanErr = nil
}
s.rescanQuit = make(chan struct{})
s.scanning = true
s.finished = false
s.lastProgressSent = false
s.lastFilteredBlockHeader = nil
s.isRescan = true
s.clientMtx.Unlock()
bestBlock, err := s.CS.BestBlock()
if err != nil {
return fmt.Errorf("can't get chain service's best block: %s", err)
}
header, err := s.CS.GetBlockHeader(&bestBlock.Hash)
if err != nil {
return fmt.Errorf("can't get block header for hash %v: %s",
bestBlock.Hash, err)
}
// If the wallet is already fully caught up, or the rescan has started
// with state that indicates a "fresh" wallet, we'll send a
// notification indicating the rescan has "finished".
if header.BlockHash() == *startHash {
s.clientMtx.Lock()
s.finished = true
rescanQuit := s.rescanQuit
s.clientMtx.Unlock()
// Release the lock while dispatching the notification since
// it's possible for the notificationHandler to be waiting to
// acquire it before receiving the notification.
select {
case s.enqueueNotification <- &RescanFinished{
Hash: startHash,
Height: bestBlock.Height,
Time: header.Timestamp,
}:
case <-s.quit:
return nil
case <-rescanQuit:
return nil
}
}
var inputsToWatch []neutrino.InputWithScript
for op, addr := range outPoints {
addrScript, err := txscript.PayToAddrScript(addr)
if err != nil {
return err
}
inputsToWatch = append(inputsToWatch, neutrino.InputWithScript{
OutPoint: op,
PkScript: addrScript,
})
}
s.clientMtx.Lock()
newRescan := neutrino.NewRescan(
&neutrino.RescanChainSource{
ChainService: s.CS,
},
neutrino.NotificationHandlers(rpcclient.NotificationHandlers{
OnBlockConnected: s.onBlockConnected,
OnFilteredBlockConnected: s.onFilteredBlockConnected,
OnBlockDisconnected: s.onBlockDisconnected,
}),
neutrino.StartBlock(&headerfs.BlockStamp{Hash: *startHash}),
neutrino.StartTime(s.startTime),
neutrino.QuitChan(s.rescanQuit),
neutrino.WatchAddrs(addrs...),
neutrino.WatchInputs(inputsToWatch...),
)
s.rescan = newRescan
s.rescanErr = s.rescan.Start()
s.clientMtx.Unlock()
return nil
}
// NotifyBlocks replicates the RPC client's NotifyBlocks command.
func (s *NeutrinoClient) NotifyBlocks() error {
s.clientMtx.Lock()
// If we're scanning, we're already notifying on blocks. Otherwise,
// start a rescan without watching any addresses.
if !s.scanning {
s.clientMtx.Unlock()
return s.NotifyReceived([]btcutil.Address{})
}
s.clientMtx.Unlock()
return nil
}
// NotifyReceived replicates the RPC client's NotifyReceived command.
func (s *NeutrinoClient) NotifyReceived(addrs []btcutil.Address) error {
s.clientMtx.Lock()
// If we have a rescan running, we just need to add the appropriate
// addresses to the watch list.
if s.scanning {
s.clientMtx.Unlock()
return s.rescan.Update(neutrino.AddAddrs(addrs...))
}
s.rescanQuit = make(chan struct{})
s.scanning = true
// Don't need RescanFinished or RescanProgress notifications.
s.finished = true
s.lastProgressSent = true
s.lastFilteredBlockHeader = nil
// Rescan with just the specified addresses.
newRescan := neutrino.NewRescan(
&neutrino.RescanChainSource{
ChainService: s.CS,
},
neutrino.NotificationHandlers(rpcclient.NotificationHandlers{
OnBlockConnected: s.onBlockConnected,
OnFilteredBlockConnected: s.onFilteredBlockConnected,
OnBlockDisconnected: s.onBlockDisconnected,
}),
neutrino.StartTime(s.startTime),
neutrino.QuitChan(s.rescanQuit),
neutrino.WatchAddrs(addrs...),
)
s.rescan = newRescan
s.rescanErr = s.rescan.Start()
s.clientMtx.Unlock()
return nil
}
// Notifications replicates the RPC client's Notifications method.
func (s *NeutrinoClient) Notifications() <-chan interface{} {
return s.dequeueNotification
}
// SetStartTime is a non-interface method to set the birthday of the wallet
// using this object. Since only a single rescan at a time is currently
// supported, only one birthday needs to be set. This does not fully restart a
// running rescan, so should not be used to update a rescan while it is running.
// TODO: When factoring out to multiple rescans per Neutrino client, add a
// birthday per client.
func (s *NeutrinoClient) SetStartTime(startTime time.Time) {
s.clientMtx.Lock()
defer s.clientMtx.Unlock()
s.startTime = startTime
}
// onFilteredBlockConnected sends appropriate notifications to the notification
// channel.
func (s *NeutrinoClient) onFilteredBlockConnected(height int32,
header *wire.BlockHeader, relevantTxs []*btcutil.Tx) {
ntfn := FilteredBlockConnected{
Block: &wtxmgr.BlockMeta{
Block: wtxmgr.Block{
Hash: header.BlockHash(),
Height: height,
},
Time: header.Timestamp,
},
}
for _, tx := range relevantTxs {
rec, err := wtxmgr.NewTxRecordFromMsgTx(tx.MsgTx(),
header.Timestamp)
if err != nil {
log.Errorf("Cannot create transaction record for "+
"relevant tx: %s", err)
// TODO(aakselrod): Return?
continue
}
ntfn.RelevantTxs = append(ntfn.RelevantTxs, rec)
}
select {
case s.enqueueNotification <- ntfn:
case <-s.quit:
return
case <-s.rescanQuit:
return
}
s.clientMtx.Lock()
s.lastFilteredBlockHeader = header
s.clientMtx.Unlock()
// Handle RescanFinished notification if required.
s.dispatchRescanFinished()
}
// onBlockDisconnected sends appropriate notifications to the notification
// channel.
func (s *NeutrinoClient) onBlockDisconnected(hash *chainhash.Hash, height int32,
t time.Time) {
select {
case s.enqueueNotification <- BlockDisconnected{
Block: wtxmgr.Block{
Hash: *hash,
Height: height,
},
Time: t,
}:
case <-s.quit:
case <-s.rescanQuit:
}
}
func (s *NeutrinoClient) onBlockConnected(hash *chainhash.Hash, height int32,
time time.Time) {
// TODO: Move this closure out and parameterize it? Is it useful
// outside here?
sendRescanProgress := func() {
select {
case s.enqueueNotification <- &RescanProgress{
Hash: hash,
Height: height,
Time: time,
}:
case <-s.quit:
case <-s.rescanQuit:
}
}
// Only send BlockConnected notification if we're processing blocks
// before the birthday. Otherwise, we can just update using
// RescanProgress notifications.
if time.Before(s.startTime) {
// Send a RescanProgress notification every 10K blocks.
if height%10000 == 0 {
s.clientMtx.Lock()
shouldSend := s.isRescan && !s.finished
s.clientMtx.Unlock()
if shouldSend {
sendRescanProgress()
}
}
} else {
// Send a RescanProgress notification if we're just going over
// the boundary between pre-birthday and post-birthday blocks,
// and note that we've sent it.
s.clientMtx.Lock()
if !s.lastProgressSent {
shouldSend := s.isRescan && !s.finished
if shouldSend {
s.clientMtx.Unlock()
sendRescanProgress()
s.clientMtx.Lock()
s.lastProgressSent = true
}
}
s.clientMtx.Unlock()
select {
case s.enqueueNotification <- BlockConnected{
Block: wtxmgr.Block{
Hash: *hash,
Height: height,
},
Time: time,
}:
case <-s.quit:
case <-s.rescanQuit:
}
}
// Check if we're able to dispatch our final RescanFinished notification
// after processing this block.
s.dispatchRescanFinished()
}
// dispatchRescanFinished determines whether we're able to dispatch our final
// RescanFinished notification in order to mark the wallet as synced with the
// chain. If the notification has already been dispatched, then it won't be done
// again.
func (s *NeutrinoClient) dispatchRescanFinished() {
bs, err := s.CS.BestBlock()
if err != nil {
log.Errorf("Can't get chain service's best block: %s", err)
return
}
s.clientMtx.Lock()
// Only send the RescanFinished notification once.
if s.lastFilteredBlockHeader == nil || s.finished {
s.clientMtx.Unlock()
return
}
// Only send the RescanFinished notification once the underlying chain
// service sees itself as current.
if bs.Hash != s.lastFilteredBlockHeader.BlockHash() {
s.clientMtx.Unlock()
return
}
s.finished = s.CS.IsCurrent() && s.lastProgressSent
if !s.finished {
s.clientMtx.Unlock()
return
}
header := s.lastFilteredBlockHeader
s.clientMtx.Unlock()
select {
case s.enqueueNotification <- &RescanFinished{
Hash: &bs.Hash,
Height: bs.Height,
Time: header.Timestamp,
}:
case <-s.quit:
return
case <-s.rescanQuit:
return
}
}
// notificationHandler queues and dequeues notifications. There are currently
// no bounds on the queue, so the dequeue channel should be read continually to
// avoid running out of memory.
func (s *NeutrinoClient) notificationHandler() {
hash, height, err := s.GetBestBlock()
if err != nil {
log.Errorf("Failed to get best block from chain service: %s",
err)
s.Stop()
s.wg.Done()
return
}
bs := &waddrmgr.BlockStamp{Hash: *hash, Height: height}
// TODO: Rather than leaving this as an unbounded queue for all types of
// notifications, try dropping ones where a later enqueued notification
// can fully invalidate one waiting to be processed. For example,
// blockconnected notifications for greater block heights can remove the
// need to process earlier blockconnected notifications still waiting
// here.
var notifications []interface{}
enqueue := s.enqueueNotification
var dequeue chan interface{}
var next interface{}
out:
for {
s.clientMtx.Lock()
rescanErr := s.rescanErr
s.clientMtx.Unlock()
select {
case n, ok := <-enqueue:
if !ok {
// If no notifications are queued for handling,
// the queue is finished.
if len(notifications) == 0 {
break out
}
// nil channel so no more reads can occur.
enqueue = nil
continue
}
if len(notifications) == 0 {
next = n
dequeue = s.dequeueNotification
}
notifications = append(notifications, n)
case dequeue <- next:
if n, ok := next.(BlockConnected); ok {
bs = &waddrmgr.BlockStamp{
Height: n.Height,
Hash: n.Hash,
}
}
notifications[0] = nil
notifications = notifications[1:]
if len(notifications) != 0 {
next = notifications[0]
} else {
// If no more notifications can be enqueued, the
// queue is finished.
if enqueue == nil {
break out
}
dequeue = nil
}
case err := <-rescanErr:
if err != nil {
log.Errorf("Neutrino rescan ended with error: %s", err)
}
case s.currentBlock <- bs:
case <-s.quit:
break out
}
}
s.Stop()
close(s.dequeueNotification)
s.wg.Done()
}

View file

@ -1,666 +0,0 @@
package chain
import (
"encoding/binary"
"encoding/hex"
"errors"
"fmt"
"math/rand"
"net"
"sync"
"time"
"github.com/btcsuite/btcd/blockchain"
"github.com/btcsuite/btcd/btcjson"
"github.com/btcsuite/btcd/chaincfg"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/peer"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil"
"github.com/lightninglabs/neutrino/query"
"github.com/lightningnetwork/lnd/ticker"
)
const (
// defaultRefreshPeersInterval represents the default polling interval
// at which we attempt to refresh the set of known peers.
defaultRefreshPeersInterval = 30 * time.Second
// defaultPeerReadyTimeout is the default amount of time we'll wait for
// a query peer to be ready to receive incoming block requests. Peers
// cannot respond to requests until the version exchange is completed
// upon connection establishment.
defaultPeerReadyTimeout = 15 * time.Second
// requiredServices are the requires services we require any candidate
// peers to signal such that we can retrieve pruned blocks from them.
requiredServices = wire.SFNodeNetwork | wire.SFNodeWitness
// prunedNodeService is the service bit signaled by pruned nodes on the
// network. Note that this service bit can also be signaled by full
// nodes, except that they also signal wire.SFNodeNetwork, where as
// pruned nodes don't.
prunedNodeService wire.ServiceFlag = 1 << 10
)
// queryPeer represents a Bitcoin network peer that we'll query for blocks.
// The ready channel serves as a signal for us to know when we can be sending
// queries to the peer. Any messages received from the peer are sent through the
// msgsRecvd channel.
type queryPeer struct {
*peer.Peer
ready chan struct{}
msgsRecvd chan wire.Message
quit chan struct{}
}
// signalUponDisconnect closes the peer's quit chan to signal it has
// disconnected.
func (p *queryPeer) signalUponDisconnect(f func()) {
go func() {
p.WaitForDisconnect()
close(p.quit)
f()
}()
}
// SubscribeRecvMsg adds a OnRead subscription to the peer. All bitcoin messages
// received from this peer will be sent on the returned channel. A closure is
// also returned, that should be called to cancel the subscription.
//
// NOTE: This method exists to satisfy the query.Peer interface.
func (p *queryPeer) SubscribeRecvMsg() (<-chan wire.Message, func()) {
return p.msgsRecvd, func() {}
}
// OnDisconnect returns a channel that will be closed once the peer disconnects.
//
// NOTE: This method exists to satisfy the query.Peer interface.
func (p *queryPeer) OnDisconnect() <-chan struct{} {
return p.quit
}
// PrunedBlockDispatcherConfig encompasses all of the dependencies required by
// the PrunedBlockDispatcher to carry out its duties.
type PrunedBlockDispatcherConfig struct {
// ChainParams represents the parameters of the current active chain.
ChainParams *chaincfg.Params
// NumTargetPeer represents the target number of peers we should
// maintain connections with. This exists to prevent establishing
// connections to all of the bitcoind's peers, which would be
// unnecessary and ineffecient.
NumTargetPeers int
// Dial establishes connections to Bitcoin peers. This must support
// dialing peers running over Tor if the backend also supports it.
Dial func(string) (net.Conn, error)
// GetPeers retrieves the active set of peers known to the backend node.
GetPeers func() ([]btcjson.GetPeerInfoResult, error)
// GetNodeAddresses returns random reachable addresses known to the
// backend node. An optional number of addresses to return can be
// provided, otherwise 8 are returned by default.
GetNodeAddresses func(*int32) ([]btcjson.GetNodeAddressesResult, error)
// PeerReadyTimeout is the amount of time we'll wait for a query peer to
// be ready to receive incoming block requests. Peers cannot respond to
// requests until the version exchange is completed upon connection
// establishment.
PeerReadyTimeout time.Duration
// RefreshPeersTicker is the polling ticker that signals us when we
// should attempt to refresh the set of known peers.
RefreshPeersTicker ticker.Ticker
// AllowSelfPeerConns is only used to allow the tests to bypass the peer
// self connection detecting and disconnect logic since they
// intentionally do so for testing purposes.
AllowSelfPeerConns bool
// MaxRequestInvs dictates how many invs we should fit in a single
// getdata request to a peer. This only exists to facilitate the testing
// of a request spanning multiple getdata messages.
MaxRequestInvs int
}
// PrunedBlockDispatcher enables a chain client to request blocks that the
// server has already pruned. This is done by connecting to the server's full
// node peers and querying them directly. Ideally, this is a capability
// supported by the server, though this is not yet possible with bitcoind.
type PrunedBlockDispatcher struct {
cfg PrunedBlockDispatcherConfig
// workManager handles satisfying all of our incoming pruned block
// requests.
workManager *query.WorkManager
// blocksQueried represents the set of pruned blocks we've been
// requested to query. Each block maps to a list of clients waiting to
// be notified once the block is received.
//
// NOTE: The blockMtx lock must always be held when accessing this
// field.
blocksQueried map[chainhash.Hash][]chan *wire.MsgBlock
blockMtx sync.Mutex
// currentPeers represents the set of peers we're currently connected
// to. Each peer found here will have a worker spawned within the
// workManager to handle our queries.
//
// NOTE: The peerMtx lock must always be held when accessing this
// field.
currentPeers map[string]*peer.Peer
// bannedPeers represents the set of peers who have sent us an invalid
// reply corresponding to a query. Peers within this set should not be
// dialed.
//
// NOTE: The peerMtx lock must always be held when accessing this
// field.
bannedPeers map[string]struct{}
peerMtx sync.Mutex
// peersConnected is the channel through which we'll send new peers
// we've established connections to.
peersConnected chan query.Peer
// timeSource provides a mechanism to add several time samples which are
// used to determine a median time which is then used as an offset to
// the local clock when validating blocks received from peers.
timeSource blockchain.MedianTimeSource
quit chan struct{}
wg sync.WaitGroup
}
// NewPrunedBlockDispatcher initializes a new PrunedBlockDispatcher instance
// backed by the given config.
func NewPrunedBlockDispatcher(cfg *PrunedBlockDispatcherConfig) (
*PrunedBlockDispatcher, error) {
if cfg.NumTargetPeers < 1 {
return nil, errors.New("config option NumTargetPeer must be >= 1")
}
if cfg.MaxRequestInvs > wire.MaxInvPerMsg {
return nil, fmt.Errorf("config option MaxRequestInvs must be "+
"<= %v", wire.MaxInvPerMsg)
}
peersConnected := make(chan query.Peer)
return &PrunedBlockDispatcher{
cfg: *cfg,
workManager: query.New(&query.Config{
ConnectedPeers: func() (<-chan query.Peer, func(), error) {
return peersConnected, func() {}, nil
},
NewWorker: query.NewWorker,
Ranking: query.NewPeerRanking(),
}),
blocksQueried: make(map[chainhash.Hash][]chan *wire.MsgBlock),
currentPeers: make(map[string]*peer.Peer),
bannedPeers: make(map[string]struct{}),
peersConnected: peersConnected,
timeSource: blockchain.NewMedianTime(),
quit: make(chan struct{}),
}, nil
}
// Start allows the PrunedBlockDispatcher to begin handling incoming block
// requests.
func (d *PrunedBlockDispatcher) Start() error {
log.Tracef("Starting pruned block dispatcher")
if err := d.workManager.Start(); err != nil {
return err
}
d.wg.Add(1)
go d.pollPeers()
return nil
}
// Stop stops the PrunedBlockDispatcher from accepting any more incoming block
// requests.
func (d *PrunedBlockDispatcher) Stop() {
log.Tracef("Stopping pruned block dispatcher")
close(d.quit)
d.wg.Wait()
_ = d.workManager.Stop()
}
// pollPeers continuously polls the backend node for new peers to establish
// connections to.
func (d *PrunedBlockDispatcher) pollPeers() {
defer d.wg.Done()
if err := d.connectToPeers(); err != nil {
log.Warnf("Unable to establish peer connections: %v", err)
}
d.cfg.RefreshPeersTicker.Resume()
defer d.cfg.RefreshPeersTicker.Stop()
for {
select {
case <-d.cfg.RefreshPeersTicker.Ticks():
// Quickly determine if we need any more peer
// connections. If we don't, we'll wait for our next
// tick.
d.peerMtx.Lock()
peersNeeded := d.cfg.NumTargetPeers - len(d.currentPeers)
d.peerMtx.Unlock()
if peersNeeded <= 0 {
continue
}
// If we do, attempt to establish connections until
// we've reached our target number.
if err := d.connectToPeers(); err != nil {
log.Warnf("Failed to establish peer "+
"connections: %v", err)
continue
}
case <-d.quit:
return
}
}
}
// connectToPeers attempts to establish new peer connections until the target
// number is reached. Once a connection is successfully established, the peer is
// sent through the peersConnected channel to notify the internal workManager.
func (d *PrunedBlockDispatcher) connectToPeers() error {
// Refresh the list of peers our backend is currently connected to, and
// filter out any that do not meet our requirements.
peers, err := d.cfg.GetPeers()
if err != nil {
return err
}
addrs, err := filterPeers(peers)
if err != nil {
return err
}
rand.Shuffle(len(addrs), func(i, j int) {
addrs[i], addrs[j] = addrs[j], addrs[i]
})
for _, addr := range addrs {
needMore, err := d.connectToPeer(addr)
if err != nil {
log.Debugf("Failed connecting to peer %v: %v", addr, err)
continue
}
if !needMore {
return nil
}
}
// We still need more addresses so we'll also invoke the
// `getnodeaddresses` RPC to receive random reachable addresses. We'll
// also filter out any that do not meet our requirements. The nil
// argument will return a default number of addresses, which is
// currently 8. We don't care how many addresses are returned as long as
// 1 is returned, since this will be polled regularly if needed.
nodeAddrs, err := d.cfg.GetNodeAddresses(nil)
if err != nil {
return err
}
addrs = filterNodeAddrs(nodeAddrs)
for _, addr := range addrs {
if _, err := d.connectToPeer(addr); err != nil {
log.Debugf("Failed connecting to peer %v: %v", addr, err)
}
}
return nil
}
// connectToPeer attempts to establish a connection to the given peer and waits
// up to PeerReadyTimeout for the version exchange to complete so that we can
// begin sending it our queries.
func (d *PrunedBlockDispatcher) connectToPeer(addr string) (bool, error) {
// Prevent connections to peers we've already connected to or we've
// banned.
d.peerMtx.Lock()
_, isBanned := d.bannedPeers[addr]
_, isConnected := d.currentPeers[addr]
d.peerMtx.Unlock()
if isBanned || isConnected {
return true, nil
}
peer, err := d.newQueryPeer(addr)
if err != nil {
return true, fmt.Errorf("unable to configure query peer %v: "+
"%v", addr, err)
}
// Establish the connection and wait for the protocol negotiation to
// complete.
conn, err := d.cfg.Dial(addr)
if err != nil {
return true, err
}
peer.AssociateConnection(conn)
select {
case <-peer.ready:
case <-time.After(d.cfg.PeerReadyTimeout):
peer.Disconnect()
return true, errors.New("timed out waiting for protocol negotiation")
case <-d.quit:
return false, errors.New("shutting down")
}
// Remove the peer once it has disconnected.
peer.signalUponDisconnect(func() {
d.peerMtx.Lock()
delete(d.currentPeers, peer.Addr())
d.peerMtx.Unlock()
})
d.peerMtx.Lock()
d.currentPeers[addr] = peer.Peer
numPeers := len(d.currentPeers)
d.peerMtx.Unlock()
// Notify the new peer connection to our workManager.
select {
case d.peersConnected <- peer:
case <-d.quit:
return false, errors.New("shutting down")
}
// Request more peer connections if we haven't reached our target number
// with the new peer.
return numPeers < d.cfg.NumTargetPeers, nil
}
// filterPeers filters out any peers which cannot handle arbitrary witness block
// requests, i.e., any peer which is not considered a segwit-enabled
// "full-node".
func filterPeers(peers []btcjson.GetPeerInfoResult) ([]string, error) {
var eligible []string
for _, peer := range peers {
rawServices, err := hex.DecodeString(peer.Services)
if err != nil {
return nil, err
}
services := wire.ServiceFlag(binary.BigEndian.Uint64(rawServices))
if !satisfiesRequiredServices(services) {
continue
}
eligible = append(eligible, peer.Addr)
}
return eligible, nil
}
// filterNodeAddrs filters out any peers which cannot handle arbitrary witness
// block requests, i.e., any peer which is not considered a segwit-enabled
// "full-node".
func filterNodeAddrs(nodeAddrs []btcjson.GetNodeAddressesResult) []string {
var eligible []string
for _, nodeAddr := range nodeAddrs {
services := wire.ServiceFlag(nodeAddr.Services)
if !satisfiesRequiredServices(services) {
continue
}
eligible = append(eligible, nodeAddr.Address)
}
return eligible
}
// satisfiesRequiredServices determines whether the services signaled by a peer
// satisfy our requirements for retrieving pruned blocks from them. We need the
// full chain, and witness data as well. Note that we ignore the limited
// (pruned bit) as nodes can have the full data and set that as well. Pure
// pruned nodes won't set the network bit.
func satisfiesRequiredServices(services wire.ServiceFlag) bool {
return services&requiredServices == requiredServices
}
// newQueryPeer creates a new peer instance configured to relay any received
// messages to the internal workManager.
func (d *PrunedBlockDispatcher) newQueryPeer(addr string) (*queryPeer, error) {
ready := make(chan struct{})
msgsRecvd := make(chan wire.Message)
cfg := &peer.Config{
ChainParams: d.cfg.ChainParams,
// We're not interested in transactions, so disable their relay.
DisableRelayTx: true,
Listeners: peer.MessageListeners{
// Add the remote peer time as a sample for creating an
// offset against the local clock to keep the network
// time in sync.
OnVersion: func(p *peer.Peer, msg *wire.MsgVersion) *wire.MsgReject {
d.timeSource.AddTimeSample(p.Addr(), msg.Timestamp)
return nil
},
// Register a callback to signal us when we can start
// querying the peer for blocks.
OnVerAck: func(*peer.Peer, *wire.MsgVerAck) {
close(ready)
},
// Register a callback to signal us whenever the peer
// has sent us a block message.
OnRead: func(p *peer.Peer, _ int, msg wire.Message, err error) {
if err != nil {
return
}
var block *wire.MsgBlock
switch msg := msg.(type) {
case *wire.MsgBlock:
block = msg
case *wire.MsgVersion, *wire.MsgVerAck,
*wire.MsgPing, *wire.MsgPong:
return
default:
log.Debugf("Received unexpected message "+
"%T from peer %v", msg, p.Addr())
return
}
select {
case msgsRecvd <- block:
case <-d.quit:
}
},
},
AllowSelfConns: true,
}
p, err := peer.NewOutboundPeer(cfg, addr)
if err != nil {
return nil, err
}
return &queryPeer{
Peer: p,
ready: ready,
msgsRecvd: msgsRecvd,
quit: make(chan struct{}),
}, nil
}
// banPeer bans a peer by disconnecting them and ensuring we don't reconnect.
func (d *PrunedBlockDispatcher) banPeer(peer string) {
d.peerMtx.Lock()
defer d.peerMtx.Unlock()
d.bannedPeers[peer] = struct{}{}
if p, ok := d.currentPeers[peer]; ok {
p.Disconnect()
}
}
// Query submits a request to query the information of the given blocks.
func (d *PrunedBlockDispatcher) Query(blocks []*chainhash.Hash,
opts ...query.QueryOption) (<-chan *wire.MsgBlock, <-chan error) {
reqs, blockChan, err := d.newRequest(blocks)
if err != nil {
errChan := make(chan error, 1)
errChan <- err
return nil, errChan
}
var errChan chan error
if len(reqs) > 0 {
errChan = d.workManager.Query(reqs, opts...)
}
return blockChan, errChan
}
// newRequest construct a new query request for the given blocks to submit to
// the internal workManager. A channel is also returned through which the
// requested blocks are sent through.
func (d *PrunedBlockDispatcher) newRequest(blocks []*chainhash.Hash) (
[]*query.Request, <-chan *wire.MsgBlock, error) {
// Make sure the channel is buffered enough to handle all blocks.
blockChan := make(chan *wire.MsgBlock, len(blocks))
d.blockMtx.Lock()
defer d.blockMtx.Unlock()
// Each GetData message can only include up to MaxRequestInvs invs,
// and each block consumes a single inv.
var (
reqs []*query.Request
getData *wire.MsgGetData
)
for i, block := range blocks {
if getData == nil {
getData = wire.NewMsgGetData()
}
if _, ok := d.blocksQueried[*block]; !ok {
log.Debugf("Queuing new block %v for request", *block)
inv := wire.NewInvVect(wire.InvTypeBlock, block)
if err := getData.AddInvVect(inv); err != nil {
return nil, nil, err
}
} else {
log.Debugf("Received new request for pending query of "+
"block %v", *block)
}
d.blocksQueried[*block] = append(
d.blocksQueried[*block], blockChan,
)
// If we have any invs to request, or we've reached the maximum
// allowed, queue the getdata message as is, and proceed to the
// next if any.
if (len(getData.InvList) > 0 && i == len(blocks)-1) ||
len(getData.InvList) == d.cfg.MaxRequestInvs {
reqs = append(reqs, &query.Request{
Req: getData,
HandleResp: d.handleResp,
})
getData = nil
}
}
return reqs, blockChan, nil
}
// handleResp is a response handler that will be called for every message
// received from the peer that the request was made to. It should validate the
// response against the request made, and return a Progress indicating whether
// the request was answered by this particular response.
//
// NOTE: Since the worker's job queue will be stalled while this method is
// running, it should not be doing any expensive operations. It should validate
// the response and immediately return the progress. The response should be
// handed off to another goroutine for processing.
func (d *PrunedBlockDispatcher) handleResp(req, resp wire.Message,
peer string) query.Progress {
// We only expect MsgBlock as replies.
block, ok := resp.(*wire.MsgBlock)
if !ok {
return query.Progress{
Progressed: false,
Finished: false,
}
}
// We only serve MsgGetData requests.
getData, ok := req.(*wire.MsgGetData)
if !ok {
return query.Progress{
Progressed: false,
Finished: false,
}
}
// Check that we've actually queried for this block and validate it.
blockHash := block.BlockHash()
d.blockMtx.Lock()
blockChans, ok := d.blocksQueried[blockHash]
if !ok {
d.blockMtx.Unlock()
return query.Progress{
Progressed: false,
Finished: false,
}
}
err := blockchain.CheckBlockSanity(
btcutil.NewBlock(block), d.cfg.ChainParams.PowLimit,
d.timeSource,
)
if err != nil {
d.blockMtx.Unlock()
log.Warnf("Received invalid block %v from peer %v: %v",
blockHash, peer, err)
d.banPeer(peer)
return query.Progress{
Progressed: false,
Finished: false,
}
}
// Once validated, we can safely remove it.
delete(d.blocksQueried, blockHash)
// Check whether we have any other pending blocks we've yet to receive.
// If we do, we'll mark the response as progressing our query, but not
// completing it yet.
progress := query.Progress{Progressed: true, Finished: true}
for _, inv := range getData.InvList {
if _, ok := d.blocksQueried[inv.Hash]; ok {
progress.Finished = false
break
}
}
d.blockMtx.Unlock()
// Launch a goroutine to notify all clients of the block as we don't
// want to potentially block our workManager.
d.wg.Add(1)
go func() {
defer d.wg.Done()
for _, blockChan := range blockChans {
select {
case blockChan <- block:
case <-d.quit:
return
}
}
}()
return progress
}

View file

@ -1,659 +0,0 @@
package chain
import (
"encoding/binary"
"encoding/hex"
"fmt"
"net"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/btcsuite/btcd/btcjson"
"github.com/btcsuite/btcd/chaincfg"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/peer"
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/ticker"
"github.com/stretchr/testify/require"
)
var (
addrCounter int32 // Increased atomically.
chainParams = chaincfg.RegressionNetParams
)
func nextAddr() string {
port := atomic.AddInt32(&addrCounter, 1)
return fmt.Sprintf("10.0.0.1:%d", port)
}
// prunedBlockDispatcherHarness is a harness used to facilitate the testing of the
// PrunedBlockDispatcher.
type prunedBlockDispatcherHarness struct {
t *testing.T
dispatcher *PrunedBlockDispatcher
hashes []*chainhash.Hash
blocks map[chainhash.Hash]*wire.MsgBlock
peerMtx sync.Mutex
peers map[string]*peer.Peer
fallbackAddrs map[string]*peer.Peer
localConns map[string]net.Conn // Connections to peers.
remoteConns map[string]net.Conn // Connections from peers.
dialedPeer chan string
queriedPeer chan struct{}
blocksQueried map[chainhash.Hash]int
shouldReply uint32 // 0 == true, 1 == false, 2 == invalid reply
}
// newNetworkBlockTestHarness initializes a new PrunedBlockDispatcher test harness
// backed by a custom chain and peers.
func newNetworkBlockTestHarness(t *testing.T, numBlocks,
numPeers, numWorkers uint32) *prunedBlockDispatcherHarness {
h := &prunedBlockDispatcherHarness{
t: t,
dispatcher: &PrunedBlockDispatcher{},
peers: make(map[string]*peer.Peer, numPeers),
fallbackAddrs: make(map[string]*peer.Peer, numPeers),
localConns: make(map[string]net.Conn, numPeers),
remoteConns: make(map[string]net.Conn, numPeers),
dialedPeer: make(chan string),
queriedPeer: make(chan struct{}),
blocksQueried: make(map[chainhash.Hash]int),
shouldReply: 0,
}
h.hashes, h.blocks = genBlockChain(numBlocks)
for i := uint32(0); i < numPeers; i++ {
h.addPeer(false)
}
dial := func(addr string) (net.Conn, error) {
go func() {
h.dialedPeer <- addr
}()
h.peerMtx.Lock()
defer h.peerMtx.Unlock()
localConn, ok := h.localConns[addr]
if !ok {
return nil, fmt.Errorf("local conn %v not found", addr)
}
remoteConn, ok := h.remoteConns[addr]
if !ok {
return nil, fmt.Errorf("remote conn %v not found", addr)
}
if p, ok := h.peers[addr]; ok {
p.AssociateConnection(remoteConn)
}
if p, ok := h.fallbackAddrs[addr]; ok {
p.AssociateConnection(remoteConn)
}
return localConn, nil
}
var err error
h.dispatcher, err = NewPrunedBlockDispatcher(&PrunedBlockDispatcherConfig{
ChainParams: &chainParams,
NumTargetPeers: int(numWorkers),
Dial: dial,
GetPeers: func() ([]btcjson.GetPeerInfoResult, error) {
h.peerMtx.Lock()
defer h.peerMtx.Unlock()
res := make([]btcjson.GetPeerInfoResult, 0, len(h.peers))
for addr, peer := range h.peers {
var rawServices [8]byte
binary.BigEndian.PutUint64(
rawServices[:], uint64(peer.Services()),
)
res = append(res, btcjson.GetPeerInfoResult{
Addr: addr,
Services: hex.EncodeToString(rawServices[:]),
})
}
return res, nil
},
GetNodeAddresses: func(*int32) ([]btcjson.GetNodeAddressesResult, error) {
h.peerMtx.Lock()
defer h.peerMtx.Unlock()
res := make(
[]btcjson.GetNodeAddressesResult, 0,
len(h.fallbackAddrs),
)
for addr, peer := range h.fallbackAddrs {
res = append(res, btcjson.GetNodeAddressesResult{
Services: uint64(peer.Services()),
Address: addr,
})
}
return res, nil
},
PeerReadyTimeout: time.Hour,
RefreshPeersTicker: ticker.NewForce(time.Hour),
AllowSelfPeerConns: true,
MaxRequestInvs: wire.MaxInvPerMsg,
})
require.NoError(t, err)
return h
}
// start starts the PrunedBlockDispatcher and asserts that connections are made
// to all available peers.
func (h *prunedBlockDispatcherHarness) start() {
h.t.Helper()
err := h.dispatcher.Start()
require.NoError(h.t, err)
h.peerMtx.Lock()
numPeers := len(h.peers)
h.peerMtx.Unlock()
for i := 0; i < numPeers; i++ {
h.assertPeerDialed()
}
}
// stop stops the PrunedBlockDispatcher and asserts that all internal fields of
// the harness have been properly consumed.
func (h *prunedBlockDispatcherHarness) stop() {
h.dispatcher.Stop()
select {
case <-h.dialedPeer:
h.t.Fatal("did not consume all dialedPeer signals")
default:
}
select {
case <-h.queriedPeer:
h.t.Fatal("did not consume all queriedPeer signals")
default:
}
require.Empty(h.t, h.blocksQueried)
}
// addPeer adds a new random peer available for use by the
// PrunedBlockDispatcher.
func (h *prunedBlockDispatcherHarness) addPeer(fallback bool) string {
addr := nextAddr()
h.peerMtx.Lock()
defer h.peerMtx.Unlock()
h.resetPeer(addr, fallback)
return addr
}
// resetPeer resets the internal peer connection state allowing the
// PrunedBlockDispatcher to establish a mock connection to it.
func (h *prunedBlockDispatcherHarness) resetPeer(addr string, fallback bool) {
if fallback {
h.fallbackAddrs[addr] = h.newPeer()
} else {
h.peers[addr] = h.newPeer()
}
// Establish a mock connection between us and each peer.
inConn, outConn := pipe(
&conn{localAddr: addr, remoteAddr: "10.0.0.1:8333"},
&conn{localAddr: "10.0.0.1:8333", remoteAddr: addr},
)
h.localConns[addr] = outConn
h.remoteConns[addr] = inConn
}
// newPeer returns a new properly configured peer.Peer instance that will be
// used by the PrunedBlockDispatcher.
func (h *prunedBlockDispatcherHarness) newPeer() *peer.Peer {
return peer.NewInboundPeer(&peer.Config{
ChainParams: &chainParams,
DisableRelayTx: true,
Listeners: peer.MessageListeners{
OnGetData: func(p *peer.Peer, msg *wire.MsgGetData) {
go func() {
h.queriedPeer <- struct{}{}
}()
for _, inv := range msg.InvList {
// Invs should always be for blocks.
require.Equal(h.t, wire.InvTypeBlock, inv.Type)
// Invs should always be for known blocks.
block, ok := h.blocks[inv.Hash]
require.True(h.t, ok)
switch atomic.LoadUint32(&h.shouldReply) {
// Don't reply if requested.
case 1:
continue
// Make the block invalid and send it.
case 2:
block = produceInvalidBlock(block)
}
go p.QueueMessage(block, nil)
}
},
},
Services: wire.SFNodeNetwork | wire.SFNodeWitness,
AllowSelfConns: true,
})
}
// query requests the given blocks from the PrunedBlockDispatcher.
func (h *prunedBlockDispatcherHarness) query(blocks []*chainhash.Hash) (
<-chan *wire.MsgBlock, <-chan error) {
h.t.Helper()
blockChan, errChan := h.dispatcher.Query(blocks)
select {
case err := <-errChan:
require.NoError(h.t, err)
default:
}
for _, block := range blocks {
h.blocksQueried[*block]++
}
return blockChan, errChan
}
// disablePeerReplies prevents the query peer from replying.
func (h *prunedBlockDispatcherHarness) disablePeerReplies() {
atomic.StoreUint32(&h.shouldReply, 1)
}
// enablePeerReplies allows the query peer to reply.
func (h *prunedBlockDispatcherHarness) enablePeerReplies() {
atomic.StoreUint32(&h.shouldReply, 0)
}
// enableInvalidPeerReplies
func (h *prunedBlockDispatcherHarness) enableInvalidPeerReplies() {
atomic.StoreUint32(&h.shouldReply, 2)
}
// refreshPeers forces the RefreshPeersTicker to fire.
func (h *prunedBlockDispatcherHarness) refreshPeers() {
h.t.Helper()
h.dispatcher.cfg.RefreshPeersTicker.(*ticker.Force).Force <- time.Now()
}
// disconnectPeer simulates a peer disconnecting from the PrunedBlockDispatcher.
func (h *prunedBlockDispatcherHarness) disconnectPeer(addr string, fallback bool) {
h.t.Helper()
h.peerMtx.Lock()
defer h.peerMtx.Unlock()
require.Contains(h.t, h.peers, addr)
// Obtain the current number of peers before disconnecting such that we
// can block until the peer has been fully disconnected.
h.dispatcher.peerMtx.Lock()
numPeers := len(h.dispatcher.currentPeers)
h.dispatcher.peerMtx.Unlock()
h.peers[addr].Disconnect()
require.Eventually(h.t, func() bool {
h.dispatcher.peerMtx.Lock()
defer h.dispatcher.peerMtx.Unlock()
return len(h.dispatcher.currentPeers) == numPeers-1
}, time.Second, 200*time.Millisecond)
// Reset the peer connection state to allow connections to them again.
h.resetPeer(addr, fallback)
}
// assertPeerDialed asserts that a connection was made to the given peer.
func (h *prunedBlockDispatcherHarness) assertPeerDialed() {
h.t.Helper()
select {
case <-h.dialedPeer:
case <-time.After(5 * time.Second):
h.t.Fatalf("expected peer to be dialed")
}
}
// assertPeerDialedWithAddr asserts that a connection was made to the given peer.
func (h *prunedBlockDispatcherHarness) assertPeerDialedWithAddr(addr string) {
h.t.Helper()
select {
case dialedAddr := <-h.dialedPeer:
require.Equal(h.t, addr, dialedAddr)
case <-time.After(5 * time.Second):
h.t.Fatalf("expected peer to be dialed")
}
}
// assertPeerQueried asserts that query was sent to the given peer.
func (h *prunedBlockDispatcherHarness) assertPeerQueried() {
h.t.Helper()
select {
case <-h.queriedPeer:
case <-time.After(5 * time.Second):
h.t.Fatalf("expected a peer to be queried")
}
}
// assertPeerReplied asserts that the query peer replies with a block the
// PrunedBlockDispatcher queried for.
func (h *prunedBlockDispatcherHarness) assertPeerReplied(
blockChan <-chan *wire.MsgBlock, errChan <-chan error,
expectCompletionSignal bool) {
h.t.Helper()
select {
case block := <-blockChan:
blockHash := block.BlockHash()
_, ok := h.blocksQueried[blockHash]
require.True(h.t, ok)
expBlock, ok := h.blocks[blockHash]
require.True(h.t, ok)
require.Equal(h.t, expBlock, block)
// Decrement how many clients queried the same block. Once we
// have none left, remove it from the map.
h.blocksQueried[blockHash]--
if h.blocksQueried[blockHash] == 0 {
delete(h.blocksQueried, blockHash)
}
case <-time.After(5 * time.Second):
select {
case err := <-errChan:
h.t.Fatalf("received unexpected error send: %v", err)
default:
}
h.t.Fatal("expected reply from peer")
}
// If we should expect a nil error to be sent by the internal
// workManager to signal completion of the request, wait for it now.
if expectCompletionSignal {
select {
case err := <-errChan:
require.NoError(h.t, err)
case <-time.After(5 * time.Second):
h.t.Fatal("expected nil err to signal completion")
}
}
}
// assertNoPeerDialed asserts that the PrunedBlockDispatcher hasn't established
// a new peer connection.
func (h *prunedBlockDispatcherHarness) assertNoPeerDialed() {
h.t.Helper()
select {
case peer := <-h.dialedPeer:
h.t.Fatalf("unexpected connection established with peer %v", peer)
case <-time.After(2 * time.Second):
}
}
// assertNoReply asserts that the peer hasn't replied to a query.
func (h *prunedBlockDispatcherHarness) assertNoReply(
blockChan <-chan *wire.MsgBlock, errChan <-chan error) {
h.t.Helper()
select {
case block := <-blockChan:
h.t.Fatalf("received unexpected block %v", block.BlockHash())
case err := <-errChan:
h.t.Fatalf("received unexpected error send: %v", err)
case <-time.After(2 * time.Second):
}
}
// TestPrunedBlockDispatcherQuerySameBlock tests that client requests for the
// same block result in only fetching the block once while pending.
func TestPrunedBlockDispatcherQuerySameBlock(t *testing.T) {
t.Parallel()
const numBlocks = 1
const numPeers = 5
const numRequests = numBlocks * numPeers
h := newNetworkBlockTestHarness(t, numBlocks, numPeers, numPeers)
h.start()
defer h.stop()
// Queue all the block requests one by one.
blockChans := make([]<-chan *wire.MsgBlock, 0, numRequests)
errChans := make([]<-chan error, 0, numRequests)
for i := 0; i < numRequests; i++ {
blockChan, errChan := h.query(h.hashes)
blockChans = append(blockChans, blockChan)
errChans = append(errChans, errChan)
}
// We should only see one query.
h.assertPeerQueried()
for i := 0; i < numRequests; i++ {
h.assertPeerReplied(blockChans[i], errChans[i], i == 0)
}
}
// TestPrunedBlockDispatcherMultipleGetData tests that a client requesting blocks
// that span across multiple queries works as intended.
func TestPrunedBlockDispatcherMultipleGetData(t *testing.T) {
t.Parallel()
const maxRequestInvs = 5
const numBlocks = (maxRequestInvs * 5) + 1
h := newNetworkBlockTestHarness(t, numBlocks, 1, 1)
h.dispatcher.cfg.MaxRequestInvs = maxRequestInvs
h.start()
defer h.stop()
// Request all blocks.
blockChan, errChan := h.query(h.hashes)
// Since we have more blocks than can fit in a single GetData message,
// we should expect multiple queries. For each query, we should expect
// wire.MaxInvPerMsg replies until we've received all of them.
blocksRecvd := 0
numMsgs := (numBlocks / maxRequestInvs)
if numBlocks%wire.MaxInvPerMsg > 0 {
numMsgs++
}
for i := 0; i < numMsgs; i++ {
h.assertPeerQueried()
for j := 0; j < maxRequestInvs; j++ {
expectCompletionSignal := blocksRecvd == numBlocks-1
h.assertPeerReplied(
blockChan, errChan, expectCompletionSignal,
)
blocksRecvd++
if blocksRecvd == numBlocks {
break
}
}
}
}
// TestPrunedBlockDispatcherMultipleQueryPeers tests that client requests are
// distributed across multiple query peers.
func TestPrunedBlockDispatcherMultipleQueryPeers(t *testing.T) {
t.Parallel()
const numBlocks = 10
const numPeers = numBlocks / 2
h := newNetworkBlockTestHarness(t, numBlocks, numPeers, numPeers)
h.start()
defer h.stop()
// Queue all the block requests one by one.
blockChans := make([]<-chan *wire.MsgBlock, 0, numBlocks)
errChans := make([]<-chan error, 0, numBlocks)
for i := 0; i < numBlocks; i++ {
blockChan, errChan := h.query(h.hashes[i : i+1])
blockChans = append(blockChans, blockChan)
errChans = append(errChans, errChan)
}
// We should see one query per block.
for i := 0; i < numBlocks; i++ {
h.assertPeerQueried()
h.assertPeerReplied(blockChans[i], errChans[i], true)
}
}
// TestPrunedBlockDispatcherPeerPoller ensures that the peer poller can detect
// when more connections are required to satisfy a request.
func TestPrunedBlockDispatcherPeerPoller(t *testing.T) {
t.Parallel()
// Initialize our harness as usual, but don't create any peers yet.
h := newNetworkBlockTestHarness(t, 1, 0, 2)
h.start()
defer h.stop()
// We shouldn't see any peers dialed since we don't have any.
h.assertNoPeerDialed()
// We'll then query for a block.
blockChan, errChan := h.query(h.hashes)
// Refresh our peers. This would dial some peers, but we don't have any
// yet.
h.refreshPeers()
h.assertNoPeerDialed()
// Add a new peer and force a refresh. We should see the peer be dialed.
// We'll disable replies for now, as we'll want to test the disconnect
// case.
h.disablePeerReplies()
peer := h.addPeer(false)
h.refreshPeers()
h.assertPeerDialedWithAddr(peer)
h.assertPeerQueried()
// Disconnect our peer and re-enable replies.
h.disconnectPeer(peer, false)
h.enablePeerReplies()
h.assertNoReply(blockChan, errChan)
// Force a refresh once again. Since the peer has disconnected, a new
// connection should be made and the peer should be queried again.
h.refreshPeers()
h.assertPeerDialed()
h.assertPeerQueried()
// Add a fallback addresses and force refresh our peers again. We can
// afford to have one more query peer, so a connection should be made.
fallbackPeer := h.addPeer(true)
h.refreshPeers()
h.assertPeerDialedWithAddr(fallbackPeer)
// Now that we know we've connected to the peer, we should be able to
// receive their response.
h.assertPeerReplied(blockChan, errChan, true)
}
// TestPrunedBlockDispatcherInvalidBlock ensures that validation is performed on
// blocks received from peers, and that any peers which have sent an invalid
// block are banned and not connected to.
func TestPrunedBlockDispatcherInvalidBlock(t *testing.T) {
t.Parallel()
h := newNetworkBlockTestHarness(t, 1, 1, 1)
h.start()
defer h.stop()
// We'll start the test by signaling our peer to send an invalid block.
h.enableInvalidPeerReplies()
// We'll then query for a block. We shouldn't see a response as the
// block should have failed validation.
blockChan, errChan := h.query(h.hashes)
h.assertPeerQueried()
h.assertNoReply(blockChan, errChan)
// Since the peer sent us an invalid block, they should have been
// disconnected and banned. Refreshing our peers shouldn't result in a
// new connection attempt because we don't have any other peers
// available.
h.refreshPeers()
h.assertNoPeerDialed()
// Signal to our peers to send valid replies and add a new peer.
h.enablePeerReplies()
_ = h.addPeer(false)
// Force a refresh, which should cause our new peer to be dialed and
// queried. We expect them to send a valid block and fulfill our
// request.
h.refreshPeers()
h.assertPeerDialed()
h.assertPeerQueried()
h.assertPeerReplied(blockChan, errChan, true)
}
func TestSatisfiesRequiredServices(t *testing.T) {
t.Parallel()
testCases := []struct {
name string
services wire.ServiceFlag
ok bool
}{
{
name: "full node, segwit",
services: wire.SFNodeWitness | wire.SFNodeNetwork,
ok: true,
},
{
name: "full node segwit, signals limited",
services: wire.SFNodeWitness | wire.SFNodeNetwork | prunedNodeService,
ok: true,
},
{
name: "full node, no segwit",
services: wire.SFNodeNetwork,
ok: false,
},
{
name: "segwit, pure pruned",
services: wire.SFNodeWitness | prunedNodeService,
ok: false,
},
}
for _, testCase := range testCases {
ok := satisfiesRequiredServices(testCase.services)
require.Equal(
t, testCase.ok, ok, fmt.Sprintf("test case: %v", testCase.name),
)
}
}

View file

@ -23,7 +23,6 @@ import (
"github.com/btcsuite/btcwallet/netparams" "github.com/btcsuite/btcwallet/netparams"
"github.com/btcsuite/btcwallet/wallet" "github.com/btcsuite/btcwallet/wallet"
flags "github.com/jessevdk/go-flags" flags "github.com/jessevdk/go-flags"
"github.com/lightninglabs/neutrino"
) )
const ( const (
@ -274,9 +273,6 @@ func loadConfig() (*config, []string, error) {
UseSPV: false, UseSPV: false,
AddPeers: []string{}, AddPeers: []string{},
ConnectPeers: []string{}, ConnectPeers: []string{},
MaxPeers: neutrino.MaxPeers,
BanDuration: neutrino.BanDuration,
BanThreshold: neutrino.BanThreshold,
DBTimeout: wallet.DefaultDBTimeout, DBTimeout: wallet.DefaultDBTimeout,
} }
@ -545,61 +541,55 @@ func loadConfig() (*config, []string, error) {
"::1": {}, "::1": {},
} }
if cfg.UseSPV { if cfg.RPCConnect == "" {
neutrino.MaxPeers = cfg.MaxPeers cfg.RPCConnect = net.JoinHostPort("localhost", activeNet.RPCClientPort)
neutrino.BanDuration = cfg.BanDuration }
neutrino.BanThreshold = cfg.BanThreshold
// Add default port to connect flag if missing.
cfg.RPCConnect, err = cfgutil.NormalizeAddress(cfg.RPCConnect,
activeNet.RPCClientPort)
if err != nil {
fmt.Fprintf(os.Stderr,
"Invalid rpcconnect network address: %v\n", err)
return nil, nil, err
}
RPCHost, _, err := net.SplitHostPort(cfg.RPCConnect)
if err != nil {
return nil, nil, err
}
if cfg.DisableClientTLS {
if _, ok := localhostListeners[RPCHost]; !ok {
str := "%s: the --noclienttls option may not be used " +
"when connecting RPC to non localhost " +
"addresses: %s"
err := fmt.Errorf(str, funcName, cfg.RPCConnect)
fmt.Fprintln(os.Stderr, err)
fmt.Fprintln(os.Stderr, usageMessage)
return nil, nil, err
}
} else { } else {
if cfg.RPCConnect == "" { // If CAFile is unset, choose either the copy or local btcd cert.
cfg.RPCConnect = net.JoinHostPort("localhost", activeNet.RPCClientPort) if !cfg.CAFile.ExplicitlySet() {
} cfg.CAFile.Value = filepath.Join(cfg.AppDataDir.Value, defaultCAFilename)
// Add default port to connect flag if missing. // If the CA copy does not exist, check if we're connecting to
cfg.RPCConnect, err = cfgutil.NormalizeAddress(cfg.RPCConnect, // a local btcd and switch to its RPC cert if it exists.
activeNet.RPCClientPort) certExists, err := cfgutil.FileExists(cfg.CAFile.Value)
if err != nil { if err != nil {
fmt.Fprintf(os.Stderr,
"Invalid rpcconnect network address: %v\n", err)
return nil, nil, err
}
RPCHost, _, err := net.SplitHostPort(cfg.RPCConnect)
if err != nil {
return nil, nil, err
}
if cfg.DisableClientTLS {
if _, ok := localhostListeners[RPCHost]; !ok {
str := "%s: the --noclienttls option may not be used " +
"when connecting RPC to non localhost " +
"addresses: %s"
err := fmt.Errorf(str, funcName, cfg.RPCConnect)
fmt.Fprintln(os.Stderr, err) fmt.Fprintln(os.Stderr, err)
fmt.Fprintln(os.Stderr, usageMessage)
return nil, nil, err return nil, nil, err
} }
} else { if !certExists {
// If CAFile is unset, choose either the copy or local btcd cert. if _, ok := localhostListeners[RPCHost]; ok {
if !cfg.CAFile.ExplicitlySet() { btcdCertExists, err := cfgutil.FileExists(
cfg.CAFile.Value = filepath.Join(cfg.AppDataDir.Value, defaultCAFilename) btcdDefaultCAFile)
if err != nil {
// If the CA copy does not exist, check if we're connecting to fmt.Fprintln(os.Stderr, err)
// a local btcd and switch to its RPC cert if it exists. return nil, nil, err
certExists, err := cfgutil.FileExists(cfg.CAFile.Value) }
if err != nil { if btcdCertExists {
fmt.Fprintln(os.Stderr, err) cfg.CAFile.Value = btcdDefaultCAFile
return nil, nil, err
}
if !certExists {
if _, ok := localhostListeners[RPCHost]; ok {
btcdCertExists, err := cfgutil.FileExists(
btcdDefaultCAFile)
if err != nil {
fmt.Fprintln(os.Stderr, err)
return nil, nil, err
}
if btcdCertExists {
cfg.CAFile.Value = btcdDefaultCAFile
}
} }
} }
} }

1
go.sum
View file

@ -74,7 +74,6 @@ github.com/lightninglabs/neutrino v0.12.1 h1:9umzk5kKNc/l3bAyak8ClSRP1qSulnjc6kp
github.com/lightninglabs/neutrino v0.12.1/go.mod h1:GlKninWpRBbL7b8G0oQ36/8downfnFwKsr0hbRA6E/E= github.com/lightninglabs/neutrino v0.12.1/go.mod h1:GlKninWpRBbL7b8G0oQ36/8downfnFwKsr0hbRA6E/E=
github.com/lightningnetwork/lnd/clock v1.0.1 h1:QQod8+m3KgqHdvVMV+2DRNNZS1GRFir8mHZYA+Z2hFo= github.com/lightningnetwork/lnd/clock v1.0.1 h1:QQod8+m3KgqHdvVMV+2DRNNZS1GRFir8mHZYA+Z2hFo=
github.com/lightningnetwork/lnd/clock v1.0.1/go.mod h1:KnQudQ6w0IAMZi1SgvecLZQZ43ra2vpDNj7H/aasemg= github.com/lightningnetwork/lnd/clock v1.0.1/go.mod h1:KnQudQ6w0IAMZi1SgvecLZQZ43ra2vpDNj7H/aasemg=
github.com/lightningnetwork/lnd/queue v1.0.1 h1:jzJKcTy3Nj5lQrooJ3aaw9Lau3I0IwvQR5sqtjdv2R0=
github.com/lightningnetwork/lnd/queue v1.0.1/go.mod h1:vaQwexir73flPW43Mrm7JOgJHmcEFBWWSl9HlyASoms= github.com/lightningnetwork/lnd/queue v1.0.1/go.mod h1:vaQwexir73flPW43Mrm7JOgJHmcEFBWWSl9HlyASoms=
github.com/lightningnetwork/lnd/ticker v1.0.0 h1:S1b60TEGoTtCe2A0yeB+ecoj/kkS4qpwh6l+AkQEZwU= github.com/lightningnetwork/lnd/ticker v1.0.0 h1:S1b60TEGoTtCe2A0yeB+ecoj/kkS4qpwh6l+AkQEZwU=
github.com/lightningnetwork/lnd/ticker v1.0.0/go.mod h1:iaLXJiVgI1sPANIF2qYYUJXjoksPNvGNYowB8aRbpX0= github.com/lightningnetwork/lnd/ticker v1.0.0/go.mod h1:iaLXJiVgI1sPANIF2qYYUJXjoksPNvGNYowB8aRbpX0=

2
log.go
View file

@ -18,7 +18,6 @@ import (
"github.com/btcsuite/btcwallet/wallet" "github.com/btcsuite/btcwallet/wallet"
"github.com/btcsuite/btcwallet/wtxmgr" "github.com/btcsuite/btcwallet/wtxmgr"
"github.com/jrick/logrotate/rotator" "github.com/jrick/logrotate/rotator"
"github.com/lightninglabs/neutrino"
) )
// logWriter implements an io.Writer that outputs to both standard output and // logWriter implements an io.Writer that outputs to both standard output and
@ -70,7 +69,6 @@ func init() {
rpcclient.UseLogger(chainLog) rpcclient.UseLogger(chainLog)
rpcserver.UseLogger(grpcLog) rpcserver.UseLogger(grpcLog)
legacyrpc.UseLogger(legacyRPCLog) legacyrpc.UseLogger(legacyRPCLog)
neutrino.UseLogger(btcnLog)
} }
// subsystemLoggers maps each subsystem identifier to its associated logger. // subsystemLoggers maps each subsystem identifier to its associated logger.

View file

@ -193,14 +193,6 @@ func (w *Wallet) SynchronizeRPC(chainClient chain.Interface) {
} }
w.chainClient = chainClient w.chainClient = chainClient
// If the chain client is a NeutrinoClient instance, set a birthday so
// we don't download all the filters as we go.
switch cc := chainClient.(type) {
case *chain.NeutrinoClient:
cc.SetStartTime(w.Manager.Birthday())
case *chain.BitcoindClient:
cc.SetBirthday(w.Manager.Birthday())
}
w.chainClientLock.Unlock() w.chainClientLock.Unlock()
// TODO: It would be preferable to either run these goroutines // TODO: It would be preferable to either run these goroutines
@ -364,21 +356,12 @@ func (w *Wallet) syncWithChain(birthdayStamp *waddrmgr.BlockStamp) error {
return err return err
} }
// Neutrino relies on the information given to it by the cfheader server
// so it knows exactly whether it's synced up to the server's state or
// not, even on dev chains. To recover a Neutrino wallet, we need to
// make sure it's synced before we start scanning for addresses,
// otherwise we might miss some if we only scan up to its current sync
// point.
neutrinoRecovery := chainClient.BackEnd() == "neutrino" &&
w.recoveryWindow > 0
// We'll wait until the backend is synced to ensure we get the latest // We'll wait until the backend is synced to ensure we get the latest
// MaxReorgDepth blocks to store. We don't do this for development // MaxReorgDepth blocks to store. We don't do this for development
// environments as we can't guarantee a lively chain, except for // environments as we can't guarantee a lively chain, except for
// Neutrino, where the cfheader server tells us what it believes the // Neutrino, where the cfheader server tells us what it believes the
// chain tip is. // chain tip is.
if !w.isDevEnv() || neutrinoRecovery { if !w.isDevEnv() {
log.Debug("Waiting for chain backend to sync to tip") log.Debug("Waiting for chain backend to sync to tip")
if err := w.waitUntilBackendSynced(chainClient); err != nil { if err := w.waitUntilBackendSynced(chainClient); err != nil {
return err return err
@ -2285,18 +2268,6 @@ func (w *Wallet) GetTransactions(startBlock, endBlock *BlockIdentifier,
return nil, err return nil, err
} }
start = startHeader.Height start = startHeader.Height
case *chain.BitcoindClient:
var err error
start, err = client.GetBlockHeight(startBlock.hash)
if err != nil {
return nil, err
}
case *chain.NeutrinoClient:
var err error
start, err = client.GetBlockHeight(startBlock.hash)
if err != nil {
return nil, err
}
} }
} }
} }
@ -2316,18 +2287,6 @@ func (w *Wallet) GetTransactions(startBlock, endBlock *BlockIdentifier,
return nil, err return nil, err
} }
end = endHeader.Height end = endHeader.Height
case *chain.BitcoindClient:
var err error
start, err = client.GetBlockHeight(endBlock.hash)
if err != nil {
return nil, err
}
case *chain.NeutrinoClient:
var err error
end, err = client.GetBlockHeight(endBlock.hash)
if err != nil {
return nil, err
}
} }
} }
} }