From 8701ecb329a390fb3b68243e0dc8a55e750b51b1 Mon Sep 17 00:00:00 2001 From: Roy Lee Date: Wed, 25 Aug 2021 14:03:05 -0700 Subject: [PATCH] [lbry] build: remove neutrino and bitcoind support --- btcwallet.go | 45 +- chain/bitcoind_client.go | 1339 ------------------------- chain/bitcoind_conn.go | 602 ----------- chain/interface.go | 1 - chain/neutrino.go | 708 ------------- chain/pruned_block_dispatcher.go | 666 ------------ chain/pruned_block_dispatcher_test.go | 659 ------------ config.go | 98 +- go.sum | 1 - log.go | 2 - wallet/wallet.go | 43 +- 11 files changed, 49 insertions(+), 4115 deletions(-) delete mode 100644 chain/bitcoind_client.go delete mode 100644 chain/bitcoind_conn.go delete mode 100644 chain/pruned_block_dispatcher.go delete mode 100644 chain/pruned_block_dispatcher_test.go diff --git a/btcwallet.go b/btcwallet.go index 6974fa7..7f2d780 100644 --- a/btcwallet.go +++ b/btcwallet.go @@ -10,15 +10,12 @@ import ( "net/http" _ "net/http/pprof" "os" - "path/filepath" "runtime" "sync" "github.com/btcsuite/btcwallet/chain" "github.com/btcsuite/btcwallet/rpc/legacyrpc" "github.com/btcsuite/btcwallet/wallet" - "github.com/btcsuite/btcwallet/walletdb" - "github.com/lightninglabs/neutrino" ) var ( @@ -156,44 +153,10 @@ func rpcClientConnectLoop(legacyRPCServer *legacyrpc.Server, loader *wallet.Load err error ) - if cfg.UseSPV { - var ( - chainService *neutrino.ChainService - spvdb walletdb.DB - ) - netDir := networkDir(cfg.AppDataDir.Value, activeNet.Params) - spvdb, err = walletdb.Create( - "bdb", filepath.Join(netDir, "neutrino.db"), - true, cfg.DBTimeout, - ) - if err != nil { - log.Errorf("Unable to create Neutrino DB: %s", err) - continue - } - defer spvdb.Close() - chainService, err = neutrino.NewChainService( - neutrino.Config{ - DataDir: netDir, - Database: spvdb, - ChainParams: *activeNet.Params, - ConnectPeers: cfg.ConnectPeers, - AddPeers: cfg.AddPeers, - }) - if err != nil { - log.Errorf("Couldn't create Neutrino ChainService: %s", err) - continue - } - chainClient = chain.NewNeutrinoClient(activeNet.Params, chainService) - err = chainClient.Start() - if err != nil { - log.Errorf("Couldn't start Neutrino client: %s", err) - } - } else { - chainClient, err = startChainRPC(certs) - if err != nil { - log.Errorf("Unable to open connection to consensus RPC server: %v", err) - continue - } + chainClient, err = startChainRPC(certs) + if err != nil { + log.Errorf("Unable to open connection to consensus RPC server: %v", err) + continue } // Rather than inlining this logic directly into the loader diff --git a/chain/bitcoind_client.go b/chain/bitcoind_client.go deleted file mode 100644 index caaa3a7..0000000 --- a/chain/bitcoind_client.go +++ /dev/null @@ -1,1339 +0,0 @@ -package chain - -import ( - "container/list" - "encoding/hex" - "errors" - "fmt" - "sync" - "sync/atomic" - "time" - - "github.com/btcsuite/btcd/btcjson" - "github.com/btcsuite/btcd/chaincfg/chainhash" - "github.com/btcsuite/btcd/txscript" - "github.com/btcsuite/btcd/wire" - "github.com/btcsuite/btcutil" - "github.com/btcsuite/btcwallet/waddrmgr" - "github.com/btcsuite/btcwallet/wtxmgr" -) - -var ( - // ErrBitcoindClientShuttingDown is an error returned when we attempt - // to receive a notification for a specific item and the bitcoind client - // is in the middle of shutting down. - ErrBitcoindClientShuttingDown = errors.New("client is shutting down") -) - -// BitcoindClient represents a persistent client connection to a bitcoind server -// for information regarding the current best block chain. -type BitcoindClient struct { - // notifyBlocks signals whether the client is sending block - // notifications to the caller. This must be used atomically. - notifyBlocks uint32 - - started int32 // To be used atomically. - stopped int32 // To be used atomically. - - // birthday is the earliest time for which we should begin scanning the - // chain. - birthday time.Time - - // id is the unique ID of this client assigned by the backing bitcoind - // connection. - id uint64 - - // chainConn is the backing client to our rescan client that contains - // the RPC and ZMQ connections to a bitcoind node. - chainConn *BitcoindConn - - // bestBlock keeps track of the tip of the current best chain. - bestBlockMtx sync.RWMutex - bestBlock waddrmgr.BlockStamp - - // rescanUpdate is a channel will be sent items that we should match - // transactions against while processing a chain rescan to determine if - // they are relevant to the client. - rescanUpdate chan interface{} - - // watchedAddresses, watchedOutPoints, and watchedTxs are the set of - // items we should match transactions against while processing a chain - // rescan to determine if they are relevant to the client. - watchMtx sync.RWMutex - watchedAddresses map[string]struct{} - watchedOutPoints map[wire.OutPoint]struct{} - watchedTxs map[chainhash.Hash]struct{} - - // mempool keeps track of all relevant transactions that have yet to be - // confirmed. This is used to shortcut the filtering process of a - // transaction when a new confirmed transaction notification is - // received. - // - // NOTE: This requires the watchMtx to be held. - mempool map[chainhash.Hash]struct{} - - // expiredMempool keeps track of a set of confirmed transactions along - // with the height at which they were included in a block. These - // transactions will then be removed from the mempool after a period of - // 288 blocks. This is done to ensure the transactions are safe from a - // reorg in the chain. - // - // NOTE: This requires the watchMtx to be held. - expiredMempool map[int32]map[chainhash.Hash]struct{} - - // notificationQueue is a concurrent unbounded queue that handles - // dispatching notifications to the subscriber of this client. - // - // TODO: Rather than leaving this as an unbounded queue for all types of - // notifications, try dropping ones where a later enqueued notification - // can fully invalidate one waiting to be processed. For example, - // BlockConnected notifications for greater block heights can remove the - // need to process earlier notifications still waiting to be processed. - notificationQueue *ConcurrentQueue - - // zmqTxNtfns is a channel through which ZMQ transaction events will be - // retrieved from the backing bitcoind connection. - zmqTxNtfns chan *wire.MsgTx - - // zmqBlockNtfns is a channel through which ZMQ block events will be - // retrieved from the backing bitcoind connection. - zmqBlockNtfns chan *wire.MsgBlock - - quit chan struct{} - wg sync.WaitGroup -} - -// A compile-time check to ensure that BitcoindClient satisfies the -// chain.Interface interface. -var _ Interface = (*BitcoindClient)(nil) - -// BackEnd returns the name of the driver. -func (c *BitcoindClient) BackEnd() string { - return "bitcoind" -} - -// GetBestBlock returns the highest block known to bitcoind. -func (c *BitcoindClient) GetBestBlock() (*chainhash.Hash, int32, error) { - bcinfo, err := c.chainConn.client.GetBlockChainInfo() - if err != nil { - return nil, 0, err - } - - hash, err := chainhash.NewHashFromStr(bcinfo.BestBlockHash) - if err != nil { - return nil, 0, err - } - - return hash, bcinfo.Blocks, nil -} - -// GetBlockHeight returns the height for the hash, if known, or returns an -// error. -func (c *BitcoindClient) GetBlockHeight(hash *chainhash.Hash) (int32, error) { - header, err := c.chainConn.client.GetBlockHeaderVerbose(hash) - if err != nil { - return 0, err - } - - return header.Height, nil -} - -// GetBlock returns a block from the hash. -func (c *BitcoindClient) GetBlock(hash *chainhash.Hash) (*wire.MsgBlock, error) { - return c.chainConn.GetBlock(hash) -} - -// GetBlockVerbose returns a verbose block from the hash. -func (c *BitcoindClient) GetBlockVerbose( - hash *chainhash.Hash) (*btcjson.GetBlockVerboseResult, error) { - - return c.chainConn.client.GetBlockVerbose(hash) -} - -// GetBlockHash returns a block hash from the height. -func (c *BitcoindClient) GetBlockHash(height int64) (*chainhash.Hash, error) { - return c.chainConn.client.GetBlockHash(height) -} - -// GetBlockHeader returns a block header from the hash. -func (c *BitcoindClient) GetBlockHeader( - hash *chainhash.Hash) (*wire.BlockHeader, error) { - - return c.chainConn.client.GetBlockHeader(hash) -} - -// GetBlockHeaderVerbose returns a block header from the hash. -func (c *BitcoindClient) GetBlockHeaderVerbose( - hash *chainhash.Hash) (*btcjson.GetBlockHeaderVerboseResult, error) { - - return c.chainConn.client.GetBlockHeaderVerbose(hash) -} - -// IsCurrent returns whether the chain backend considers its view of the network -// as "current". -func (c *BitcoindClient) IsCurrent() bool { - bestHash, _, err := c.GetBestBlock() - if err != nil { - return false - } - bestHeader, err := c.GetBlockHeader(bestHash) - if err != nil { - return false - } - return bestHeader.Timestamp.After(time.Now().Add(-isCurrentDelta)) -} - -// GetRawTransactionVerbose returns a transaction from the tx hash. -func (c *BitcoindClient) GetRawTransactionVerbose( - hash *chainhash.Hash) (*btcjson.TxRawResult, error) { - - return c.chainConn.client.GetRawTransactionVerbose(hash) -} - -// GetTxOut returns a txout from the outpoint info provided. -func (c *BitcoindClient) GetTxOut(txHash *chainhash.Hash, index uint32, - mempool bool) (*btcjson.GetTxOutResult, error) { - - return c.chainConn.client.GetTxOut(txHash, index, mempool) -} - -// SendRawTransaction sends a raw transaction via bitcoind. -func (c *BitcoindClient) SendRawTransaction(tx *wire.MsgTx, - allowHighFees bool) (*chainhash.Hash, error) { - - return c.chainConn.client.SendRawTransaction(tx, allowHighFees) -} - -// Notifications returns a channel to retrieve notifications from. -// -// NOTE: This is part of the chain.Interface interface. -func (c *BitcoindClient) Notifications() <-chan interface{} { - return c.notificationQueue.ChanOut() -} - -// NotifyReceived allows the chain backend to notify the caller whenever a -// transaction pays to any of the given addresses. -// -// NOTE: This is part of the chain.Interface interface. -func (c *BitcoindClient) NotifyReceived(addrs []btcutil.Address) error { - _ = c.NotifyBlocks() - - select { - case c.rescanUpdate <- addrs: - case <-c.quit: - return ErrBitcoindClientShuttingDown - } - - return nil -} - -// NotifySpent allows the chain backend to notify the caller whenever a -// transaction spends any of the given outpoints. -func (c *BitcoindClient) NotifySpent(outPoints []*wire.OutPoint) error { - _ = c.NotifyBlocks() - - select { - case c.rescanUpdate <- outPoints: - case <-c.quit: - return ErrBitcoindClientShuttingDown - } - - return nil -} - -// NotifyTx allows the chain backend to notify the caller whenever any of the -// given transactions confirm within the chain. -func (c *BitcoindClient) NotifyTx(txids []chainhash.Hash) error { - _ = c.NotifyBlocks() - - select { - case c.rescanUpdate <- txids: - case <-c.quit: - return ErrBitcoindClientShuttingDown - } - - return nil -} - -// NotifyBlocks allows the chain backend to notify the caller whenever a block -// is connected or disconnected. -// -// NOTE: This is part of the chain.Interface interface. -func (c *BitcoindClient) NotifyBlocks() error { - // We'll guard the goroutine being spawned below by the notifyBlocks - // variable we'll use atomically. We'll make sure to reset it in case of - // a failure before spawning the goroutine so that it can be retried. - if !atomic.CompareAndSwapUint32(&c.notifyBlocks, 0, 1) { - return nil - } - - // Re-evaluate our known best block since it's possible that blocks have - // occurred between now and when the client was created. This ensures we - // don't detect a new notified block as a potential reorg. - bestHash, bestHeight, err := c.GetBestBlock() - if err != nil { - atomic.StoreUint32(&c.notifyBlocks, 0) - return fmt.Errorf("unable to retrieve best block: %v", err) - } - bestHeader, err := c.GetBlockHeaderVerbose(bestHash) - if err != nil { - atomic.StoreUint32(&c.notifyBlocks, 0) - return fmt.Errorf("unable to retrieve header for best block: "+ - "%v", err) - } - - c.bestBlockMtx.Lock() - c.bestBlock.Hash = *bestHash - c.bestBlock.Height = bestHeight - c.bestBlock.Timestamp = time.Unix(bestHeader.Time, 0) - c.bestBlockMtx.Unlock() - - // Include the client in the set of rescan clients of the backing - // bitcoind connection in order to receive ZMQ event notifications for - // new blocks and transactions. - c.chainConn.AddClient(c) - - c.wg.Add(1) - go c.ntfnHandler() - - return nil -} - -// shouldNotifyBlocks determines whether the client should send block -// notifications to the caller. -func (c *BitcoindClient) shouldNotifyBlocks() bool { - return atomic.LoadUint32(&c.notifyBlocks) == 1 -} - -// LoadTxFilter uses the given filters to what we should match transactions -// against to determine if they are relevant to the client. The reset argument -// is used to reset the current filters. -// -// The current filters supported are of the following types: -// []btcutil.Address -// []wire.OutPoint -// []*wire.OutPoint -// map[wire.OutPoint]btcutil.Address -// []chainhash.Hash -// []*chainhash.Hash -func (c *BitcoindClient) LoadTxFilter(reset bool, filters ...interface{}) error { - if reset { - select { - case c.rescanUpdate <- struct{}{}: - case <-c.quit: - return ErrBitcoindClientShuttingDown - } - } - - updateFilter := func(filter interface{}) error { - select { - case c.rescanUpdate <- filter: - case <-c.quit: - return ErrBitcoindClientShuttingDown - } - - return nil - } - - // In order to make this operation atomic, we'll iterate through the - // filters twice: the first to ensure there aren't any unsupported - // filter types, and the second to actually update our filters. - for _, filter := range filters { - switch filter := filter.(type) { - case []btcutil.Address, []wire.OutPoint, []*wire.OutPoint, - map[wire.OutPoint]btcutil.Address, []chainhash.Hash, - []*chainhash.Hash: - - // Proceed to check the next filter type. - default: - return fmt.Errorf("unsupported filter type %T", filter) - } - } - - for _, filter := range filters { - if err := updateFilter(filter); err != nil { - return err - } - } - - return nil -} - -// RescanBlocks rescans any blocks passed, returning only the blocks that -// matched as []btcjson.BlockDetails. -func (c *BitcoindClient) RescanBlocks( - blockHashes []chainhash.Hash) ([]btcjson.RescannedBlock, error) { - - rescannedBlocks := make([]btcjson.RescannedBlock, 0, len(blockHashes)) - for _, hash := range blockHashes { - hash := hash - - header, err := c.GetBlockHeaderVerbose(&hash) - if err != nil { - log.Warnf("Unable to get header %s from bitcoind: %s", - hash, err) - continue - } - - // Prevent fetching the block completely if we know we shouldn't - // filter it. - if !c.shouldFilterBlock(time.Unix(header.Time, 0)) { - continue - } - - block, err := c.GetBlock(&hash) - if err != nil { - log.Warnf("Unable to get block %s from bitcoind: %s", - hash, err) - continue - } - - relevantTxs := c.filterBlock(block, header.Height, false) - if len(relevantTxs) > 0 { - rescannedBlock := btcjson.RescannedBlock{ - Hash: hash.String(), - } - for _, tx := range relevantTxs { - rescannedBlock.Transactions = append( - rescannedBlock.Transactions, - hex.EncodeToString(tx.SerializedTx), - ) - } - - rescannedBlocks = append(rescannedBlocks, rescannedBlock) - } - } - - return rescannedBlocks, nil -} - -// Rescan rescans from the block with the given hash until the current block, -// after adding the passed addresses and outpoints to the client's watch list. -func (c *BitcoindClient) Rescan(blockHash *chainhash.Hash, - addresses []btcutil.Address, outPoints map[wire.OutPoint]btcutil.Address) error { - - // A block hash is required to use as the starting point of the rescan. - if blockHash == nil { - return errors.New("rescan requires a starting block hash") - } - - // We'll then update our filters with the given outpoints and addresses. - select { - case c.rescanUpdate <- addresses: - case <-c.quit: - return ErrBitcoindClientShuttingDown - } - - select { - case c.rescanUpdate <- outPoints: - case <-c.quit: - return ErrBitcoindClientShuttingDown - } - - // Once the filters have been updated, we can begin the rescan. - select { - case c.rescanUpdate <- *blockHash: - case <-c.quit: - return ErrBitcoindClientShuttingDown - } - - return nil -} - -// Start initializes the bitcoind rescan client using the backing bitcoind -// connection and starts all goroutines necessary in order to process rescans -// and ZMQ notifications. -// -// NOTE: This is part of the chain.Interface interface. -func (c *BitcoindClient) Start() error { - if !atomic.CompareAndSwapInt32(&c.started, 0, 1) { - return nil - } - - // Start the notification queue and immediately dispatch a - // ClientConnected notification to the caller. This is needed as some of - // the callers will require this notification before proceeding. - c.notificationQueue.Start() - c.notificationQueue.ChanIn() <- ClientConnected{} - - // Retrieve the best block of the chain. - bestHash, bestHeight, err := c.GetBestBlock() - if err != nil { - return fmt.Errorf("unable to retrieve best block: %v", err) - } - bestHeader, err := c.GetBlockHeaderVerbose(bestHash) - if err != nil { - return fmt.Errorf("unable to retrieve header for best block: "+ - "%v", err) - } - - c.bestBlockMtx.Lock() - c.bestBlock = waddrmgr.BlockStamp{ - Hash: *bestHash, - Height: bestHeight, - Timestamp: time.Unix(bestHeader.Time, 0), - } - c.bestBlockMtx.Unlock() - - c.wg.Add(1) - go c.rescanHandler() - - return nil -} - -// Stop stops the bitcoind rescan client from processing rescans and ZMQ -// notifications. -// -// NOTE: This is part of the chain.Interface interface. -func (c *BitcoindClient) Stop() { - if !atomic.CompareAndSwapInt32(&c.stopped, 0, 1) { - return - } - - close(c.quit) - - // Remove this client's reference from the bitcoind connection to - // prevent sending notifications to it after it's been stopped. - c.chainConn.RemoveClient(c.id) - - c.notificationQueue.Stop() -} - -// WaitForShutdown blocks until the client has finished disconnecting and all -// handlers have exited. -// -// NOTE: This is part of the chain.Interface interface. -func (c *BitcoindClient) WaitForShutdown() { - c.wg.Wait() -} - -// rescanHandler handles the logic needed for the caller to trigger a chain -// rescan. -// -// NOTE: This must be called as a goroutine. -func (c *BitcoindClient) rescanHandler() { - defer c.wg.Done() - - for { - select { - case update := <-c.rescanUpdate: - switch update := update.(type) { - - // We're clearing the filters. - case struct{}: - c.watchMtx.Lock() - c.watchedOutPoints = make(map[wire.OutPoint]struct{}) - c.watchedAddresses = make(map[string]struct{}) - c.watchedTxs = make(map[chainhash.Hash]struct{}) - c.watchMtx.Unlock() - - // We're adding the addresses to our filter. - case []btcutil.Address: - c.watchMtx.Lock() - for _, addr := range update { - c.watchedAddresses[addr.String()] = struct{}{} - } - c.watchMtx.Unlock() - - // We're adding the outpoints to our filter. - case []wire.OutPoint: - c.watchMtx.Lock() - for _, op := range update { - c.watchedOutPoints[op] = struct{}{} - } - c.watchMtx.Unlock() - case []*wire.OutPoint: - c.watchMtx.Lock() - for _, op := range update { - c.watchedOutPoints[*op] = struct{}{} - } - c.watchMtx.Unlock() - - // We're adding the outpoints that map to the scripts - // that we should scan for to our filter. - case map[wire.OutPoint]btcutil.Address: - c.watchMtx.Lock() - for op := range update { - c.watchedOutPoints[op] = struct{}{} - } - c.watchMtx.Unlock() - - // We're adding the transactions to our filter. - case []chainhash.Hash: - c.watchMtx.Lock() - for _, txid := range update { - c.watchedTxs[txid] = struct{}{} - } - c.watchMtx.Unlock() - case []*chainhash.Hash: - c.watchMtx.Lock() - for _, txid := range update { - c.watchedTxs[*txid] = struct{}{} - } - c.watchMtx.Unlock() - - // We're starting a rescan from the hash. - case chainhash.Hash: - if err := c.rescan(update); err != nil { - log.Errorf("Unable to complete chain "+ - "rescan: %v", err) - } - default: - log.Warnf("Received unexpected filter type %T", - update) - } - case <-c.quit: - return - } - } -} - -// ntfnHandler handles the logic to retrieve ZMQ notifications from the backing -// bitcoind connection. -// -// NOTE: This must be called as a goroutine. -func (c *BitcoindClient) ntfnHandler() { - defer c.wg.Done() - - for { - select { - case tx := <-c.zmqTxNtfns: - if _, _, err := c.filterTx(tx, nil, true); err != nil { - log.Errorf("Unable to filter transaction %v: %v", - tx.TxHash(), err) - } - case newBlock := <-c.zmqBlockNtfns: - // If the new block's previous hash matches the best - // hash known to us, then the new block is the next - // successor, so we'll update our best block to reflect - // this and determine if this new block matches any of - // our existing filters. - c.bestBlockMtx.RLock() - bestBlock := c.bestBlock - c.bestBlockMtx.RUnlock() - if newBlock.Header.PrevBlock == bestBlock.Hash { - newBlockHeight := bestBlock.Height + 1 - _ = c.filterBlock(newBlock, newBlockHeight, true) - - // With the block successfully filtered, we'll - // make it our new best block. - bestBlock.Hash = newBlock.BlockHash() - bestBlock.Height = newBlockHeight - bestBlock.Timestamp = newBlock.Header.Timestamp - - c.bestBlockMtx.Lock() - c.bestBlock = bestBlock - c.bestBlockMtx.Unlock() - - continue - } - - // Otherwise, we've encountered a reorg. - if err := c.reorg(bestBlock, newBlock); err != nil { - log.Errorf("Unable to process chain reorg: %v", - err) - } - case <-c.quit: - return - } - } -} - -// SetBirthday sets the birthday of the bitcoind rescan client. -// -// NOTE: This should be done before the client has been started in order for it -// to properly carry its duties. -func (c *BitcoindClient) SetBirthday(t time.Time) { - c.birthday = t -} - -// BlockStamp returns the latest block notified by the client, or an error -// if the client has been shut down. -func (c *BitcoindClient) BlockStamp() (*waddrmgr.BlockStamp, error) { - c.bestBlockMtx.RLock() - bestBlock := c.bestBlock - c.bestBlockMtx.RUnlock() - - return &bestBlock, nil -} - -// onBlockConnected is a callback that's executed whenever a new block has been -// detected. This will queue a BlockConnected notification to the caller. -func (c *BitcoindClient) onBlockConnected(hash *chainhash.Hash, height int32, - timestamp time.Time) { - - if c.shouldNotifyBlocks() { - select { - case c.notificationQueue.ChanIn() <- BlockConnected{ - Block: wtxmgr.Block{ - Hash: *hash, - Height: height, - }, - Time: timestamp, - }: - case <-c.quit: - } - } -} - -// onFilteredBlockConnected is an alternative callback that's executed whenever -// a new block has been detected. It serves the same purpose as -// onBlockConnected, but it also includes a list of the relevant transactions -// found within the block being connected. This will queue a -// FilteredBlockConnected notification to the caller. -func (c *BitcoindClient) onFilteredBlockConnected(height int32, - header *wire.BlockHeader, relevantTxs []*wtxmgr.TxRecord) { - - if c.shouldNotifyBlocks() { - select { - case c.notificationQueue.ChanIn() <- FilteredBlockConnected{ - Block: &wtxmgr.BlockMeta{ - Block: wtxmgr.Block{ - Hash: header.BlockHash(), - Height: height, - }, - Time: header.Timestamp, - }, - RelevantTxs: relevantTxs, - }: - case <-c.quit: - } - } -} - -// onBlockDisconnected is a callback that's executed whenever a block has been -// disconnected. This will queue a BlockDisconnected notification to the caller -// with the details of the block being disconnected. -func (c *BitcoindClient) onBlockDisconnected(hash *chainhash.Hash, height int32, - timestamp time.Time) { - - if c.shouldNotifyBlocks() { - select { - case c.notificationQueue.ChanIn() <- BlockDisconnected{ - Block: wtxmgr.Block{ - Hash: *hash, - Height: height, - }, - Time: timestamp, - }: - case <-c.quit: - } - } -} - -// onRelevantTx is a callback that's executed whenever a transaction is relevant -// to the caller. This means that the transaction matched a specific item in the -// client's different filters. This will queue a RelevantTx notification to the -// caller. -func (c *BitcoindClient) onRelevantTx(tx *wtxmgr.TxRecord, - blockDetails *btcjson.BlockDetails) { - - block, err := parseBlock(blockDetails) - if err != nil { - log.Errorf("Unable to send onRelevantTx notification, failed "+ - "parse block: %v", err) - return - } - - select { - case c.notificationQueue.ChanIn() <- RelevantTx{ - TxRecord: tx, - Block: block, - }: - case <-c.quit: - } -} - -// onRescanProgress is a callback that's executed whenever a rescan is in -// progress. This will queue a RescanProgress notification to the caller with -// the current rescan progress details. -func (c *BitcoindClient) onRescanProgress(hash *chainhash.Hash, height int32, - timestamp time.Time) { - - select { - case c.notificationQueue.ChanIn() <- &RescanProgress{ - Hash: hash, - Height: height, - Time: timestamp, - }: - case <-c.quit: - } -} - -// onRescanFinished is a callback that's executed whenever a rescan has -// finished. This will queue a RescanFinished notification to the caller with -// the details of the last block in the range of the rescan. -func (c *BitcoindClient) onRescanFinished(hash *chainhash.Hash, height int32, - timestamp time.Time) { - - select { - case c.notificationQueue.ChanIn() <- &RescanFinished{ - Hash: hash, - Height: height, - Time: timestamp, - }: - case <-c.quit: - } -} - -// reorg processes a reorganization during chain synchronization. This is -// separate from a rescan's handling of a reorg. This will rewind back until it -// finds a common ancestor and notify all the new blocks since then. -func (c *BitcoindClient) reorg(currentBlock waddrmgr.BlockStamp, - reorgBlock *wire.MsgBlock) error { - - // Retrieve the best known height based on the block which caused the - // reorg. This way, we can preserve the chain of blocks we need to - // retrieve. - bestHash := reorgBlock.BlockHash() - bestHeight, err := c.GetBlockHeight(&bestHash) - if err != nil { - return fmt.Errorf("unable to get block height for %v: %v", - bestHash, err) - } - - log.Debugf("Possible reorg at block: height=%v, hash=%v", bestHeight, - bestHash) - - if bestHeight < currentBlock.Height { - log.Debugf("Detected multiple reorgs: best_height=%v below "+ - "current_height=%v", bestHeight, currentBlock.Height) - return nil - } - - // We'll now keep track of all the blocks known to the *chain*, starting - // from the best block known to us until the best block in the chain. - // This will let us fast-forward despite any future reorgs. - blocksToNotify := list.New() - blocksToNotify.PushFront(reorgBlock) - previousBlock := reorgBlock.Header.PrevBlock - for i := bestHeight - 1; i >= currentBlock.Height; i-- { - block, err := c.GetBlock(&previousBlock) - if err != nil { - return fmt.Errorf("unable to get block %v: %v", - previousBlock, err) - } - blocksToNotify.PushFront(block) - previousBlock = block.Header.PrevBlock - } - - // Rewind back to the last common ancestor block using the previous - // block hash from each header to avoid any race conditions. If we - // encounter more reorgs, they'll be queued and we'll repeat the cycle. - // - // We'll start by retrieving the header to the best block known to us. - currentHeader, err := c.GetBlockHeader(¤tBlock.Hash) - if err != nil { - return fmt.Errorf("unable to get block header for %v: %v", - currentBlock.Hash, err) - } - - // Then, we'll walk backwards in the chain until we find our common - // ancestor. - for previousBlock != currentHeader.PrevBlock { - // Since the previous hashes don't match, the current block has - // been reorged out of the chain, so we should send a - // BlockDisconnected notification for it. - log.Debugf("Disconnecting block: height=%v, hash=%v", - currentBlock.Height, currentBlock.Hash) - - c.onBlockDisconnected( - ¤tBlock.Hash, currentBlock.Height, - currentBlock.Timestamp, - ) - - // Our current block should now reflect the previous one to - // continue the common ancestor search. - prevBlock := ¤tHeader.PrevBlock - currentHeader, err = c.GetBlockHeader(prevBlock) - if err != nil { - return fmt.Errorf("unable to get block header for %v: %v", - prevBlock, err) - } - - currentBlock.Height-- - currentBlock.Hash = currentHeader.PrevBlock - currentBlock.Timestamp = currentHeader.Timestamp - - // Store the correct block in our list in order to notify it - // once we've found our common ancestor. - block, err := c.GetBlock(&previousBlock) - if err != nil { - return fmt.Errorf("unable to get block %v: %v", - previousBlock, err) - } - blocksToNotify.PushFront(block) - previousBlock = block.Header.PrevBlock - } - - // Disconnect the last block from the old chain. Since the previous - // block remains the same between the old and new chains, the tip will - // now be the last common ancestor. - log.Debugf("Disconnecting block: height=%v, hash=%v", - currentBlock.Height, currentBlock.Hash) - - c.onBlockDisconnected( - ¤tBlock.Hash, currentBlock.Height, currentHeader.Timestamp, - ) - - currentBlock.Height-- - - // Now we fast-forward to the new block, notifying along the way. - for blocksToNotify.Front() != nil { - nextBlock := blocksToNotify.Front().Value.(*wire.MsgBlock) - nextHeight := currentBlock.Height + 1 - nextHash := nextBlock.BlockHash() - nextHeader, err := c.GetBlockHeader(&nextHash) - if err != nil { - return fmt.Errorf("unable to get block header for %v: %v", - nextHash, err) - } - - _ = c.filterBlock(nextBlock, nextHeight, true) - - currentBlock.Height = nextHeight - currentBlock.Hash = nextHash - currentBlock.Timestamp = nextHeader.Timestamp - - blocksToNotify.Remove(blocksToNotify.Front()) - } - - c.bestBlockMtx.Lock() - c.bestBlock = currentBlock - c.bestBlockMtx.Unlock() - - return nil -} - -// FilterBlocks scans the blocks contained in the FilterBlocksRequest for any -// addresses of interest. Each block will be fetched and filtered sequentially, -// returning a FilterBlocksReponse for the first block containing a matching -// address. If no matches are found in the range of blocks requested, the -// returned response will be nil. -// -// NOTE: This is part of the chain.Interface interface. -func (c *BitcoindClient) FilterBlocks( - req *FilterBlocksRequest) (*FilterBlocksResponse, error) { - - blockFilterer := NewBlockFilterer(c.chainConn.cfg.ChainParams, req) - - // Iterate over the requested blocks, fetching each from the rpc client. - // Each block will scanned using the reverse addresses indexes generated - // above, breaking out early if any addresses are found. - for i, block := range req.Blocks { - // TODO(conner): add prefetching, since we already know we'll be - // fetching *every* block - rawBlock, err := c.GetBlock(&block.Hash) - if err != nil { - return nil, err - } - - if !blockFilterer.FilterBlock(rawBlock) { - continue - } - - // If any external or internal addresses were detected in this - // block, we return them to the caller so that the rescan - // windows can widened with subsequent addresses. The - // `BatchIndex` is returned so that the caller can compute the - // *next* block from which to begin again. - resp := &FilterBlocksResponse{ - BatchIndex: uint32(i), - BlockMeta: block, - FoundExternalAddrs: blockFilterer.FoundExternal, - FoundInternalAddrs: blockFilterer.FoundInternal, - FoundOutPoints: blockFilterer.FoundOutPoints, - RelevantTxns: blockFilterer.RelevantTxns, - } - - return resp, nil - } - - // No addresses were found for this range. - return nil, nil -} - -// rescan performs a rescan of the chain using a bitcoind backend, from the -// specified hash to the best known hash, while watching out for reorgs that -// happen during the rescan. It uses the addresses and outputs being tracked by -// the client in the watch list. This is called only within a queue processing -// loop. -func (c *BitcoindClient) rescan(start chainhash.Hash) error { - // We start by getting the best already processed block. We only use - // the height, as the hash can change during a reorganization, which we - // catch by testing connectivity from known blocks to the previous - // block. - bestHash, bestHeight, err := c.GetBestBlock() - if err != nil { - return err - } - bestHeader, err := c.GetBlockHeaderVerbose(bestHash) - if err != nil { - return err - } - bestBlock := waddrmgr.BlockStamp{ - Hash: *bestHash, - Height: bestHeight, - Timestamp: time.Unix(bestHeader.Time, 0), - } - - // Create a list of headers sorted in forward order. We'll use this in - // the event that we need to backtrack due to a chain reorg. - headers := list.New() - previousHeader, err := c.GetBlockHeaderVerbose(&start) - if err != nil { - return err - } - previousHash, err := chainhash.NewHashFromStr(previousHeader.Hash) - if err != nil { - return err - } - headers.PushBack(previousHeader) - - // Cycle through all of the blocks known to bitcoind, being mindful of - // reorgs. - for i := previousHeader.Height + 1; i <= bestBlock.Height; i++ { - hash, err := c.GetBlockHash(int64(i)) - if err != nil { - return err - } - - // If the previous header is before the wallet birthday, fetch - // the current header and construct a dummy block, rather than - // fetching the whole block itself. This speeds things up as we - // no longer have to fetch the whole block when we know it won't - // match any of our filters. - var block *wire.MsgBlock - afterBirthday := previousHeader.Time >= c.birthday.Unix() - if !afterBirthday { - header, err := c.GetBlockHeader(hash) - if err != nil { - return err - } - block = &wire.MsgBlock{ - Header: *header, - } - - afterBirthday = c.birthday.Before(header.Timestamp) - if afterBirthday { - c.onRescanProgress( - previousHash, i, - block.Header.Timestamp, - ) - } - } - - if afterBirthday { - block, err = c.GetBlock(hash) - if err != nil { - return err - } - } - - for block.Header.PrevBlock.String() != previousHeader.Hash { - // If we're in this for loop, it looks like we've been - // reorganized. We now walk backwards to the common - // ancestor between the best chain and the known chain. - // - // First, we signal a disconnected block to rewind the - // rescan state. - c.onBlockDisconnected( - previousHash, previousHeader.Height, - time.Unix(previousHeader.Time, 0), - ) - - // Get the previous block of the best chain. - hash, err := c.GetBlockHash(int64(i - 1)) - if err != nil { - return err - } - block, err = c.GetBlock(hash) - if err != nil { - return err - } - - // Then, we'll the get the header of this previous - // block. - if headers.Back() != nil { - // If it's already in the headers list, we can - // just get it from there and remove the - // current hash. - headers.Remove(headers.Back()) - if headers.Back() != nil { - previousHeader = headers.Back(). - Value.(*btcjson.GetBlockHeaderVerboseResult) - previousHash, err = chainhash.NewHashFromStr( - previousHeader.Hash, - ) - if err != nil { - return err - } - } - } else { - // Otherwise, we get it from bitcoind. - previousHash, err = chainhash.NewHashFromStr( - previousHeader.PreviousHash, - ) - if err != nil { - return err - } - previousHeader, err = c.GetBlockHeaderVerbose( - previousHash, - ) - if err != nil { - return err - } - } - } - - // Now that we've ensured we haven't come across a reorg, we'll - // add the current block header to our list of headers. - blockHash := block.BlockHash() - previousHash = &blockHash - previousHeader = &btcjson.GetBlockHeaderVerboseResult{ - Hash: blockHash.String(), - Height: i, - PreviousHash: block.Header.PrevBlock.String(), - Time: block.Header.Timestamp.Unix(), - } - headers.PushBack(previousHeader) - - // Notify the block and any of its relevant transacations. - _ = c.filterBlock(block, i, true) - - if i%10000 == 0 { - c.onRescanProgress( - previousHash, i, block.Header.Timestamp, - ) - } - - // If we've reached the previously best known block, check to - // make sure the underlying node hasn't synchronized additional - // blocks. If it has, update the best known block and continue - // to rescan to that point. - if i == bestBlock.Height { - bestHash, bestHeight, err = c.GetBestBlock() - if err != nil { - return err - } - bestHeader, err = c.GetBlockHeaderVerbose(bestHash) - if err != nil { - return err - } - - bestBlock.Hash = *bestHash - bestBlock.Height = bestHeight - bestBlock.Timestamp = time.Unix(bestHeader.Time, 0) - } - } - - c.onRescanFinished(bestHash, bestHeight, time.Unix(bestHeader.Time, 0)) - - return nil -} - -// shouldFilterBlock determines whether we should filter a block based on its -// timestamp or our watch list. -func (c *BitcoindClient) shouldFilterBlock(blockTimestamp time.Time) bool { - c.watchMtx.RLock() - hasEmptyFilter := len(c.watchedAddresses) == 0 && - len(c.watchedOutPoints) == 0 && len(c.watchedTxs) == 0 - c.watchMtx.RUnlock() - - return !(blockTimestamp.Before(c.birthday) || hasEmptyFilter) -} - -// filterBlock filters a block for watched outpoints and addresses, and returns -// any matching transactions, sending notifications along the way. -func (c *BitcoindClient) filterBlock(block *wire.MsgBlock, height int32, - notify bool) []*wtxmgr.TxRecord { - - // If this block happened before the client's birthday or we have - // nothing to filter for, then we'll skip it entirely. - blockHash := block.BlockHash() - if !c.shouldFilterBlock(block.Header.Timestamp) { - if notify { - c.onFilteredBlockConnected(height, &block.Header, nil) - c.onBlockConnected( - &blockHash, height, block.Header.Timestamp, - ) - } - return nil - } - - if c.shouldNotifyBlocks() { - log.Debugf("Filtering block %d (%s) with %d transactions", - height, block.BlockHash(), len(block.Transactions)) - } - - // Create a block details template to use for all of the confirmed - // transactions found within this block. - blockDetails := &btcjson.BlockDetails{ - Hash: blockHash.String(), - Height: height, - Time: block.Header.Timestamp.Unix(), - } - - // Now, we'll through all of the transactions in the block keeping track - // of any relevant to the caller. - var relevantTxs []*wtxmgr.TxRecord - confirmedTxs := make(map[chainhash.Hash]struct{}) - for i, tx := range block.Transactions { - // Update the index in the block details with the index of this - // transaction. - blockDetails.Index = i - isRelevant, rec, err := c.filterTx(tx, blockDetails, notify) - if err != nil { - log.Warnf("Unable to filter transaction %v: %v", - tx.TxHash(), err) - continue - } - - if isRelevant { - relevantTxs = append(relevantTxs, rec) - confirmedTxs[tx.TxHash()] = struct{}{} - } - } - - // Update the expiration map by setting the block's confirmed - // transactions and deleting any in the mempool that were confirmed - // over 288 blocks ago. - c.watchMtx.Lock() - c.expiredMempool[height] = confirmedTxs - if oldBlock, ok := c.expiredMempool[height-288]; ok { - for txHash := range oldBlock { - delete(c.mempool, txHash) - } - delete(c.expiredMempool, height-288) - } - c.watchMtx.Unlock() - - if notify { - c.onFilteredBlockConnected(height, &block.Header, relevantTxs) - c.onBlockConnected(&blockHash, height, block.Header.Timestamp) - } - - return relevantTxs -} - -// filterTx determines whether a transaction is relevant to the client by -// inspecting the client's different filters. -func (c *BitcoindClient) filterTx(tx *wire.MsgTx, - blockDetails *btcjson.BlockDetails, - notify bool) (bool, *wtxmgr.TxRecord, error) { - - txDetails := btcutil.NewTx(tx) - if blockDetails != nil { - txDetails.SetIndex(blockDetails.Index) - } - - rec, err := wtxmgr.NewTxRecordFromMsgTx(txDetails.MsgTx(), time.Now()) - if err != nil { - log.Errorf("Cannot create transaction record for relevant "+ - "tx: %v", err) - return false, nil, err - } - if blockDetails != nil { - rec.Received = time.Unix(blockDetails.Time, 0) - } - - // We'll begin the filtering process by holding the lock to ensure we - // match exactly against what's currently in the filters. - c.watchMtx.Lock() - defer c.watchMtx.Unlock() - - // If we've already seen this transaction and it's now been confirmed, - // then we'll shortcut the filter process by immediately sending a - // notification to the caller that the filter matches. - if _, ok := c.mempool[tx.TxHash()]; ok { - if notify && blockDetails != nil { - c.onRelevantTx(rec, blockDetails) - } - return true, rec, nil - } - - // Otherwise, this is a new transaction we have yet to see. We'll need - // to determine if this transaction is somehow relevant to the caller. - var isRelevant bool - - // We'll start by checking all inputs and determining whether it spends - // an existing outpoint or a pkScript encoded as an address in our watch - // list. - for _, txIn := range tx.TxIn { - // If it matches an outpoint in our watch list, we can exit our - // loop early. - if _, ok := c.watchedOutPoints[txIn.PreviousOutPoint]; ok { - isRelevant = true - break - } - - // Otherwise, we'll check whether it matches a pkScript in our - // watch list encoded as an address. To do so, we'll re-derive - // the pkScript of the output the input is attempting to spend. - pkScript, err := txscript.ComputePkScript( - txIn.SignatureScript, txIn.Witness, - ) - if err != nil { - // Non-standard outputs can be safely skipped. - continue - } - addr, err := pkScript.Address(c.chainConn.cfg.ChainParams) - if err != nil { - // Non-standard outputs can be safely skipped. - continue - } - if _, ok := c.watchedAddresses[addr.String()]; ok { - isRelevant = true - break - } - } - - // We'll also cycle through its outputs to determine if it pays to - // any of the currently watched addresses. If an output matches, we'll - // add it to our watch list. - for i, txOut := range tx.TxOut { - _, addrs, _, err := txscript.ExtractPkScriptAddrs( - txOut.PkScript, c.chainConn.cfg.ChainParams, - ) - if err != nil { - // Non-standard outputs can be safely skipped. - continue - } - - for _, addr := range addrs { - if _, ok := c.watchedAddresses[addr.String()]; ok { - isRelevant = true - op := wire.OutPoint{ - Hash: tx.TxHash(), - Index: uint32(i), - } - c.watchedOutPoints[op] = struct{}{} - } - } - } - - // If the transaction didn't pay to any of our watched addresses, we'll - // check if we're currently watching for the hash of this transaction. - if !isRelevant { - if _, ok := c.watchedTxs[tx.TxHash()]; ok { - isRelevant = true - } - } - - // If the transaction is not relevant to us, we can simply exit. - if !isRelevant { - return false, rec, nil - } - - // Otherwise, the transaction matched our filters, so we should dispatch - // a notification for it. If it's still unconfirmed, we'll include it in - // our mempool so that it can also be notified as part of - // FilteredBlockConnected once it confirms. - if blockDetails == nil { - c.mempool[tx.TxHash()] = struct{}{} - } - - c.onRelevantTx(rec, blockDetails) - - return true, rec, nil -} diff --git a/chain/bitcoind_conn.go b/chain/bitcoind_conn.go deleted file mode 100644 index 3b1fdd2..0000000 --- a/chain/bitcoind_conn.go +++ /dev/null @@ -1,602 +0,0 @@ -package chain - -import ( - "bytes" - "fmt" - "io" - "net" - "sync" - "sync/atomic" - "time" - - "github.com/btcsuite/btcd/btcjson" - "github.com/btcsuite/btcd/chaincfg" - "github.com/btcsuite/btcd/chaincfg/chainhash" - "github.com/btcsuite/btcd/rpcclient" - "github.com/btcsuite/btcd/wire" - "github.com/lightninglabs/gozmq" - "github.com/lightningnetwork/lnd/ticker" -) - -const ( - // rawBlockZMQCommand is the command used to receive raw block - // notifications from bitcoind through ZMQ. - rawBlockZMQCommand = "rawblock" - - // rawTxZMQCommand is the command used to receive raw transaction - // notifications from bitcoind through ZMQ. - rawTxZMQCommand = "rawtx" - - // maxRawBlockSize is the maximum size in bytes for a raw block received - // from bitcoind through ZMQ. - maxRawBlockSize = 4e6 - - // maxRawTxSize is the maximum size in bytes for a raw transaction - // received from bitcoind through ZMQ. - maxRawTxSize = maxRawBlockSize - - // seqNumLen is the length of the sequence number of a message sent from - // bitcoind through ZMQ. - seqNumLen = 4 - - // errBlockPrunedStr is the error message returned by bitcoind upon - // calling GetBlock on a pruned block. - errBlockPrunedStr = "Block not available (pruned data)" -) - -// BitcoindConfig contains all of the parameters required to establish a -// connection to a bitcoind's RPC. -type BitcoindConfig struct { - // ChainParams are the chain parameters the bitcoind server is running - // on. - ChainParams *chaincfg.Params - - // Host is the IP address and port of the bitcoind's RPC server. - Host string - - // User is the username to use to authenticate to bitcoind's RPC server. - User string - - // Pass is the passphrase to use to authenticate to bitcoind's RPC - // server. - Pass string - - // ZMQBlockHost is the IP address and port of the bitcoind's rawblock - // listener. - ZMQBlockHost string - - // ZMQTxHost is the IP address and port of the bitcoind's rawtx - // listener. - ZMQTxHost string - - // ZMQReadDeadline represents the read deadline we'll apply when reading - // ZMQ messages from either subscription. - ZMQReadDeadline time.Duration - - // Dialer is a closure we'll use to dial Bitcoin peers. If the chain - // backend is running over Tor, this must support dialing peers over Tor - // as well. - Dialer Dialer - - // PrunedModeMaxPeers is the maximum number of peers we'll attempt to - // retrieve pruned blocks from. - // - // NOTE: This only applies for pruned bitcoind nodes. - PrunedModeMaxPeers int -} - -// BitcoindConn represents a persistent client connection to a bitcoind node -// that listens for events read from a ZMQ connection. -type BitcoindConn struct { - started int32 // To be used atomically. - stopped int32 // To be used atomically. - - // rescanClientCounter is an atomic counter that assigns a unique ID to - // each new bitcoind rescan client using the current bitcoind - // connection. - rescanClientCounter uint64 - - cfg BitcoindConfig - - // client is the RPC client to the bitcoind node. - client *rpcclient.Client - - // prunedBlockDispatcher handles all of the pruned block requests. - // - // NOTE: This is nil when the bitcoind node is not pruned. - prunedBlockDispatcher *PrunedBlockDispatcher - - // zmqBlockConn is the ZMQ connection we'll use to read raw block - // events. - zmqBlockConn *gozmq.Conn - - // zmqTxConn is the ZMQ connection we'll use to read raw transaction - // events. - zmqTxConn *gozmq.Conn - - // rescanClients is the set of active bitcoind rescan clients to which - // ZMQ event notfications will be sent to. - rescanClientsMtx sync.Mutex - rescanClients map[uint64]*BitcoindClient - - quit chan struct{} - wg sync.WaitGroup -} - -// Dialer represents a way to dial Bitcoin peers. If the chain backend is -// running over Tor, this must support dialing peers over Tor as well. -type Dialer = func(string) (net.Conn, error) - -// NewBitcoindConn creates a client connection to the node described by the host -// string. The ZMQ connections are established immediately to ensure liveness. -// If the remote node does not operate on the same bitcoin network as described -// by the passed chain parameters, the connection will be disconnected. -func NewBitcoindConn(cfg *BitcoindConfig) (*BitcoindConn, error) { - clientCfg := &rpcclient.ConnConfig{ - Host: cfg.Host, - User: cfg.User, - Pass: cfg.Pass, - DisableAutoReconnect: false, - DisableConnectOnNew: true, - DisableTLS: true, - HTTPPostMode: true, - } - client, err := rpcclient.New(clientCfg, nil) - if err != nil { - return nil, err - } - - // Verify that the node is running on the expected network. - net, err := getCurrentNet(client) - if err != nil { - return nil, err - } - if net != cfg.ChainParams.Net { - return nil, fmt.Errorf("expected network %v, got %v", - cfg.ChainParams.Net, net) - } - - // Check if the node is pruned, as we'll need to perform additional - // operations if so. - chainInfo, err := client.GetBlockChainInfo() - if err != nil { - return nil, fmt.Errorf("unable to determine if bitcoind is "+ - "pruned: %v", err) - } - - // Establish two different ZMQ connections to bitcoind to retrieve block - // and transaction event notifications. We'll use two as a separation of - // concern to ensure one type of event isn't dropped from the connection - // queue due to another type of event filling it up. - zmqBlockConn, err := gozmq.Subscribe( - cfg.ZMQBlockHost, []string{rawBlockZMQCommand}, - cfg.ZMQReadDeadline, - ) - if err != nil { - return nil, fmt.Errorf("unable to subscribe for zmq block "+ - "events: %v", err) - } - - zmqTxConn, err := gozmq.Subscribe( - cfg.ZMQTxHost, []string{rawTxZMQCommand}, cfg.ZMQReadDeadline, - ) - if err != nil { - zmqBlockConn.Close() - return nil, fmt.Errorf("unable to subscribe for zmq tx "+ - "events: %v", err) - } - - // Only initialize the PrunedBlockDispatcher when the connected bitcoind - // node is pruned. - var prunedBlockDispatcher *PrunedBlockDispatcher - if chainInfo.Pruned { - prunedBlockDispatcher, err = NewPrunedBlockDispatcher( - &PrunedBlockDispatcherConfig{ - ChainParams: cfg.ChainParams, - NumTargetPeers: cfg.PrunedModeMaxPeers, - Dial: cfg.Dialer, - GetPeers: client.GetPeerInfo, - GetNodeAddresses: client.GetNodeAddresses, - PeerReadyTimeout: defaultPeerReadyTimeout, - RefreshPeersTicker: ticker.New(defaultRefreshPeersInterval), - MaxRequestInvs: wire.MaxInvPerMsg, - }, - ) - if err != nil { - return nil, err - } - } - - return &BitcoindConn{ - cfg: *cfg, - client: client, - prunedBlockDispatcher: prunedBlockDispatcher, - zmqBlockConn: zmqBlockConn, - zmqTxConn: zmqTxConn, - rescanClients: make(map[uint64]*BitcoindClient), - quit: make(chan struct{}), - }, nil -} - -// Start attempts to establish a RPC and ZMQ connection to a bitcoind node. If -// successful, a goroutine is spawned to read events from the ZMQ connection. -// It's possible for this function to fail due to a limited number of connection -// attempts. This is done to prevent waiting forever on the connection to be -// established in the case that the node is down. -func (c *BitcoindConn) Start() error { - if !atomic.CompareAndSwapInt32(&c.started, 0, 1) { - return nil - } - - // If we're connected to a pruned backend, we'll need to also start our - // pruned block dispatcher to handle pruned block requests. - if c.prunedBlockDispatcher != nil { - log.Debug("Detected pruned bitcoind backend") - if err := c.prunedBlockDispatcher.Start(); err != nil { - return err - } - } - - c.wg.Add(2) - go c.blockEventHandler() - go c.txEventHandler() - - return nil -} - -// Stop terminates the RPC and ZMQ connection to a bitcoind node and removes any -// active rescan clients. -func (c *BitcoindConn) Stop() { - if !atomic.CompareAndSwapInt32(&c.stopped, 0, 1) { - return - } - - for _, client := range c.rescanClients { - client.Stop() - } - - close(c.quit) - c.client.Shutdown() - c.zmqBlockConn.Close() - c.zmqTxConn.Close() - - if c.prunedBlockDispatcher != nil { - c.prunedBlockDispatcher.Stop() - } - - c.client.WaitForShutdown() - c.wg.Wait() -} - -// blockEventHandler reads raw blocks events from the ZMQ block socket and -// forwards them along to the current rescan clients. -// -// NOTE: This must be run as a goroutine. -func (c *BitcoindConn) blockEventHandler() { - defer c.wg.Done() - - log.Info("Started listening for bitcoind block notifications via ZMQ "+ - "on", c.zmqBlockConn.RemoteAddr()) - - // Set up the buffers we expect our messages to consume. ZMQ - // messages from bitcoind include three parts: the command, the - // data, and the sequence number. - // - // We'll allocate a fixed data slice that we'll reuse when reading - // blocks from bitcoind through ZMQ. There's no need to recycle this - // slice (zero out) after using it, as further reads will overwrite the - // slice and we'll only be deserializing the bytes needed. - var ( - command [len(rawBlockZMQCommand)]byte - seqNum [seqNumLen]byte - data = make([]byte, maxRawBlockSize) - ) - - for { - // Before attempting to read from the ZMQ socket, we'll make - // sure to check if we've been requested to shut down. - select { - case <-c.quit: - return - default: - } - - // Poll an event from the ZMQ socket. - var ( - bufs = [][]byte{command[:], data, seqNum[:]} - err error - ) - bufs, err = c.zmqBlockConn.Receive(bufs) - if err != nil { - // EOF should only be returned if the connection was - // explicitly closed, so we can exit at this point. - if err == io.EOF { - return - } - - // It's possible that the connection to the socket - // continuously times out, so we'll prevent logging this - // error to prevent spamming the logs. - netErr, ok := err.(net.Error) - if ok && netErr.Timeout() { - log.Trace("Re-establishing timed out ZMQ " + - "block connection") - continue - } - - log.Errorf("Unable to receive ZMQ %v message: %v", - rawBlockZMQCommand, err) - continue - } - - // We have an event! We'll now ensure it is a block event, - // deserialize it, and report it to the different rescan - // clients. - eventType := string(bufs[0]) - switch eventType { - case rawBlockZMQCommand: - block := &wire.MsgBlock{} - r := bytes.NewReader(bufs[1]) - if err := block.Deserialize(r); err != nil { - log.Errorf("Unable to deserialize block: %v", - err) - continue - } - - c.rescanClientsMtx.Lock() - for _, client := range c.rescanClients { - select { - case client.zmqBlockNtfns <- block: - case <-client.quit: - case <-c.quit: - c.rescanClientsMtx.Unlock() - return - } - } - c.rescanClientsMtx.Unlock() - default: - // It's possible that the message wasn't fully read if - // bitcoind shuts down, which will produce an unreadable - // event type. To prevent from logging it, we'll make - // sure it conforms to the ASCII standard. - if eventType == "" || !isASCII(eventType) { - continue - } - - log.Warnf("Received unexpected event type from %v "+ - "subscription: %v", rawBlockZMQCommand, - eventType) - } - } -} - -// txEventHandler reads raw blocks events from the ZMQ block socket and forwards -// them along to the current rescan clients. -// -// NOTE: This must be run as a goroutine. -func (c *BitcoindConn) txEventHandler() { - defer c.wg.Done() - - log.Info("Started listening for bitcoind transaction notifications "+ - "via ZMQ on", c.zmqTxConn.RemoteAddr()) - - // Set up the buffers we expect our messages to consume. ZMQ - // messages from bitcoind include three parts: the command, the - // data, and the sequence number. - // - // We'll allocate a fixed data slice that we'll reuse when reading - // transactions from bitcoind through ZMQ. There's no need to recycle - // this slice (zero out) after using it, as further reads will overwrite - // the slice and we'll only be deserializing the bytes needed. - var ( - command [len(rawTxZMQCommand)]byte - seqNum [seqNumLen]byte - data = make([]byte, maxRawTxSize) - ) - - for { - // Before attempting to read from the ZMQ socket, we'll make - // sure to check if we've been requested to shut down. - select { - case <-c.quit: - return - default: - } - - // Poll an event from the ZMQ socket. - var ( - bufs = [][]byte{command[:], data, seqNum[:]} - err error - ) - bufs, err = c.zmqTxConn.Receive(bufs) - if err != nil { - // EOF should only be returned if the connection was - // explicitly closed, so we can exit at this point. - if err == io.EOF { - return - } - - // It's possible that the connection to the socket - // continuously times out, so we'll prevent logging this - // error to prevent spamming the logs. - netErr, ok := err.(net.Error) - if ok && netErr.Timeout() { - log.Trace("Re-establishing timed out ZMQ " + - "transaction connection") - continue - } - - log.Errorf("Unable to receive ZMQ %v message: %v", - rawTxZMQCommand, err) - continue - } - - // We have an event! We'll now ensure it is a transaction event, - // deserialize it, and report it to the different rescan - // clients. - eventType := string(bufs[0]) - switch eventType { - case rawTxZMQCommand: - tx := &wire.MsgTx{} - r := bytes.NewReader(bufs[1]) - if err := tx.Deserialize(r); err != nil { - log.Errorf("Unable to deserialize "+ - "transaction: %v", err) - continue - } - - c.rescanClientsMtx.Lock() - for _, client := range c.rescanClients { - select { - case client.zmqTxNtfns <- tx: - case <-client.quit: - case <-c.quit: - c.rescanClientsMtx.Unlock() - return - } - } - c.rescanClientsMtx.Unlock() - default: - // It's possible that the message wasn't fully read if - // bitcoind shuts down, which will produce an unreadable - // event type. To prevent from logging it, we'll make - // sure it conforms to the ASCII standard. - if eventType == "" || !isASCII(eventType) { - continue - } - - log.Warnf("Received unexpected event type from %v "+ - "subscription: %v", rawTxZMQCommand, eventType) - } - } -} - -// getCurrentNet returns the network on which the bitcoind node is running. -func getCurrentNet(client *rpcclient.Client) (wire.BitcoinNet, error) { - hash, err := client.GetBlockHash(0) - if err != nil { - return 0, err - } - - switch *hash { - case *chaincfg.TestNet3Params.GenesisHash: - return chaincfg.TestNet3Params.Net, nil - case *chaincfg.RegressionNetParams.GenesisHash: - return chaincfg.RegressionNetParams.Net, nil - case *chaincfg.SigNetParams.GenesisHash: - return chaincfg.SigNetParams.Net, nil - case *chaincfg.MainNetParams.GenesisHash: - return chaincfg.MainNetParams.Net, nil - default: - return 0, fmt.Errorf("unknown network with genesis hash %v", hash) - } -} - -// NewBitcoindClient returns a bitcoind client using the current bitcoind -// connection. This allows us to share the same connection using multiple -// clients. -func (c *BitcoindConn) NewBitcoindClient() *BitcoindClient { - return &BitcoindClient{ - quit: make(chan struct{}), - - id: atomic.AddUint64(&c.rescanClientCounter, 1), - - chainConn: c, - - rescanUpdate: make(chan interface{}), - watchedAddresses: make(map[string]struct{}), - watchedOutPoints: make(map[wire.OutPoint]struct{}), - watchedTxs: make(map[chainhash.Hash]struct{}), - - notificationQueue: NewConcurrentQueue(20), - zmqTxNtfns: make(chan *wire.MsgTx), - zmqBlockNtfns: make(chan *wire.MsgBlock), - - mempool: make(map[chainhash.Hash]struct{}), - expiredMempool: make(map[int32]map[chainhash.Hash]struct{}), - } -} - -// AddClient adds a client to the set of active rescan clients of the current -// chain connection. This allows the connection to include the specified client -// in its notification delivery. -// -// NOTE: This function is safe for concurrent access. -func (c *BitcoindConn) AddClient(client *BitcoindClient) { - c.rescanClientsMtx.Lock() - defer c.rescanClientsMtx.Unlock() - - c.rescanClients[client.id] = client -} - -// RemoveClient removes the client with the given ID from the set of active -// rescan clients. Once removed, the client will no longer receive block and -// transaction notifications from the chain connection. -// -// NOTE: This function is safe for concurrent access. -func (c *BitcoindConn) RemoveClient(id uint64) { - c.rescanClientsMtx.Lock() - defer c.rescanClientsMtx.Unlock() - - delete(c.rescanClients, id) -} - -// isBlockPrunedErr determines if the error returned by the GetBlock RPC -// corresponds to the requested block being pruned. -func isBlockPrunedErr(err error) bool { - rpcErr, ok := err.(*btcjson.RPCError) - return ok && rpcErr.Code == btcjson.ErrRPCMisc && - rpcErr.Message == errBlockPrunedStr -} - -// GetBlock returns a raw block from the server given its hash. If the server -// has already pruned the block, it will be retrieved from one of its peers. -func (c *BitcoindConn) GetBlock(hash *chainhash.Hash) (*wire.MsgBlock, error) { - block, err := c.client.GetBlock(hash) - // Got the block from the backend successfully, return it. - if err == nil { - return block, nil - } - - // We failed getting the block from the backend for whatever reason. If - // it wasn't due to the block being pruned, return the error - // immediately. - if !isBlockPrunedErr(err) || c.prunedBlockDispatcher == nil { - return nil, err - } - - // Now that we know the block has been pruned for sure, request it from - // our backend peers. - blockChan, errChan := c.prunedBlockDispatcher.Query( - []*chainhash.Hash{hash}, - ) - - for { - select { - case block := <-blockChan: - return block, nil - - case err := <-errChan: - if err != nil { - return nil, err - } - - // errChan fired before blockChan with a nil error, wait - // for the block now. - - case <-c.quit: - return nil, ErrBitcoindClientShuttingDown - } - } -} - -// isASCII is a helper method that checks whether all bytes in `data` would be -// printable ASCII characters if interpreted as a string. -func isASCII(s string) bool { - for _, c := range s { - if c < 32 || c > 126 { - return false - } - } - return true -} diff --git a/chain/interface.go b/chain/interface.go index da23e14..3e8c8a7 100644 --- a/chain/interface.go +++ b/chain/interface.go @@ -21,7 +21,6 @@ func BackEnds() []string { return []string{ "bitcoind", "btcd", - "neutrino", } } diff --git a/chain/neutrino.go b/chain/neutrino.go index cc5dde1..0080559 100644 --- a/chain/neutrino.go +++ b/chain/neutrino.go @@ -1,265 +1,9 @@ package chain import ( - "errors" - "fmt" - "sync" - "time" - - "github.com/btcsuite/btcd/chaincfg" - "github.com/btcsuite/btcd/chaincfg/chainhash" - "github.com/btcsuite/btcd/rpcclient" "github.com/btcsuite/btcd/txscript" - "github.com/btcsuite/btcd/wire" - "github.com/btcsuite/btcutil" - "github.com/btcsuite/btcutil/gcs" - "github.com/btcsuite/btcutil/gcs/builder" - "github.com/btcsuite/btcwallet/waddrmgr" - "github.com/btcsuite/btcwallet/wtxmgr" - "github.com/lightninglabs/neutrino" - "github.com/lightninglabs/neutrino/headerfs" ) -// NeutrinoClient is an implementation of the btcwalet chain.Interface interface. -type NeutrinoClient struct { - CS *neutrino.ChainService - - chainParams *chaincfg.Params - - // We currently support one rescan/notifiction goroutine per client - rescan *neutrino.Rescan - - enqueueNotification chan interface{} - dequeueNotification chan interface{} - startTime time.Time - lastProgressSent bool - lastFilteredBlockHeader *wire.BlockHeader - currentBlock chan *waddrmgr.BlockStamp - - quit chan struct{} - rescanQuit chan struct{} - rescanErr <-chan error - wg sync.WaitGroup - started bool - scanning bool - finished bool - isRescan bool - - clientMtx sync.Mutex -} - -// NewNeutrinoClient creates a new NeutrinoClient struct with a backing -// ChainService. -func NewNeutrinoClient(chainParams *chaincfg.Params, - chainService *neutrino.ChainService) *NeutrinoClient { - - return &NeutrinoClient{ - CS: chainService, - chainParams: chainParams, - } -} - -// BackEnd returns the name of the driver. -func (s *NeutrinoClient) BackEnd() string { - return "neutrino" -} - -// Start replicates the RPC client's Start method. -func (s *NeutrinoClient) Start() error { - if err := s.CS.Start(); err != nil { - return fmt.Errorf("error starting chain service: %v", err) - } - - s.clientMtx.Lock() - defer s.clientMtx.Unlock() - if !s.started { - s.enqueueNotification = make(chan interface{}) - s.dequeueNotification = make(chan interface{}) - s.currentBlock = make(chan *waddrmgr.BlockStamp) - s.quit = make(chan struct{}) - s.started = true - s.wg.Add(1) - go func() { - select { - case s.enqueueNotification <- ClientConnected{}: - case <-s.quit: - } - }() - go s.notificationHandler() - } - return nil -} - -// Stop replicates the RPC client's Stop method. -func (s *NeutrinoClient) Stop() { - s.clientMtx.Lock() - defer s.clientMtx.Unlock() - if !s.started { - return - } - close(s.quit) - s.started = false -} - -// WaitForShutdown replicates the RPC client's WaitForShutdown method. -func (s *NeutrinoClient) WaitForShutdown() { - s.wg.Wait() -} - -// GetBlock replicates the RPC client's GetBlock command. -func (s *NeutrinoClient) GetBlock(hash *chainhash.Hash) (*wire.MsgBlock, error) { - // TODO(roasbeef): add a block cache? - // * which evication strategy? depends on use case - // Should the block cache be INSIDE neutrino instead of in btcwallet? - block, err := s.CS.GetBlock(*hash) - if err != nil { - return nil, err - } - return block.MsgBlock(), nil -} - -// GetBlockHeight gets the height of a block by its hash. It serves as a -// replacement for the use of GetBlockVerboseTxAsync for the wallet package -// since we can't actually return a FutureGetBlockVerboseResult because the -// underlying type is private to rpcclient. -func (s *NeutrinoClient) GetBlockHeight(hash *chainhash.Hash) (int32, error) { - return s.CS.GetBlockHeight(hash) -} - -// GetBestBlock replicates the RPC client's GetBestBlock command. -func (s *NeutrinoClient) GetBestBlock() (*chainhash.Hash, int32, error) { - chainTip, err := s.CS.BestBlock() - if err != nil { - return nil, 0, err - } - - return &chainTip.Hash, chainTip.Height, nil -} - -// BlockStamp returns the latest block notified by the client, or an error -// if the client has been shut down. -func (s *NeutrinoClient) BlockStamp() (*waddrmgr.BlockStamp, error) { - select { - case bs := <-s.currentBlock: - return bs, nil - case <-s.quit: - return nil, errors.New("disconnected") - } -} - -// GetBlockHash returns the block hash for the given height, or an error if the -// client has been shut down or the hash at the block height doesn't exist or -// is unknown. -func (s *NeutrinoClient) GetBlockHash(height int64) (*chainhash.Hash, error) { - return s.CS.GetBlockHash(height) -} - -// GetBlockHeader returns the block header for the given block hash, or an error -// if the client has been shut down or the hash doesn't exist or is unknown. -func (s *NeutrinoClient) GetBlockHeader( - blockHash *chainhash.Hash) (*wire.BlockHeader, error) { - return s.CS.GetBlockHeader(blockHash) -} - -// IsCurrent returns whether the chain backend considers its view of the network -// as "current". -func (s *NeutrinoClient) IsCurrent() bool { - return s.CS.IsCurrent() -} - -// SendRawTransaction replicates the RPC client's SendRawTransaction command. -func (s *NeutrinoClient) SendRawTransaction(tx *wire.MsgTx, allowHighFees bool) ( - *chainhash.Hash, error) { - err := s.CS.SendTransaction(tx) - if err != nil { - return nil, err - } - hash := tx.TxHash() - return &hash, nil -} - -// FilterBlocks scans the blocks contained in the FilterBlocksRequest for any -// addresses of interest. For each requested block, the corresponding compact -// filter will first be checked for matches, skipping those that do not report -// anything. If the filter returns a positive match, the full block will be -// fetched and filtered. This method returns a FilterBlocksResponse for the first -// block containing a matching address. If no matches are found in the range of -// blocks requested, the returned response will be nil. -func (s *NeutrinoClient) FilterBlocks( - req *FilterBlocksRequest) (*FilterBlocksResponse, error) { - - blockFilterer := NewBlockFilterer(s.chainParams, req) - - // Construct the watchlist using the addresses and outpoints contained - // in the filter blocks request. - watchList, err := buildFilterBlocksWatchList(req) - if err != nil { - return nil, err - } - - // Iterate over the requested blocks, fetching the compact filter for - // each one, and matching it against the watchlist generated above. If - // the filter returns a positive match, the full block is then requested - // and scanned for addresses using the block filterer. - for i, blk := range req.Blocks { - // TODO(wilmer): Investigate why polling it still necessary - // here. While testing, I ran into a few instances where the - // filter was not retrieved, leading to a panic. This should not - // happen in most cases thanks to the query logic revamp within - // Neutrino, but it seems there's still an uncovered edge case. - filter, err := s.pollCFilter(&blk.Hash) - if err != nil { - return nil, err - } - - // Skip any empty filters. - if filter == nil || filter.N() == 0 { - continue - } - - key := builder.DeriveKey(&blk.Hash) - matched, err := filter.MatchAny(key, watchList) - if err != nil { - return nil, err - } else if !matched { - continue - } - - log.Infof("Fetching block height=%d hash=%v", - blk.Height, blk.Hash) - - // TODO(conner): can optimize bandwidth by only fetching - // stripped blocks - rawBlock, err := s.GetBlock(&blk.Hash) - if err != nil { - return nil, err - } - - if !blockFilterer.FilterBlock(rawBlock) { - continue - } - - // If any external or internal addresses were detected in this - // block, we return them to the caller so that the rescan - // windows can widened with subsequent addresses. The - // `BatchIndex` is returned so that the caller can compute the - // *next* block from which to begin again. - resp := &FilterBlocksResponse{ - BatchIndex: uint32(i), - BlockMeta: blk, - FoundExternalAddrs: blockFilterer.FoundExternal, - FoundInternalAddrs: blockFilterer.FoundInternal, - FoundOutPoints: blockFilterer.FoundOutPoints, - RelevantTxns: blockFilterer.RelevantTxns, - } - - return resp, nil - } - - // No addresses were found for this range. - return nil, nil -} - // buildFilterBlocksWatchList constructs a watchlist used for matching against a // cfilter from a FilterBlocksRequest. The watchlist will be populated with all // external addresses, internal addresses, and outpoints contained in the @@ -303,455 +47,3 @@ func buildFilterBlocksWatchList(req *FilterBlocksRequest) ([][]byte, error) { return watchList, nil } - -// pollCFilter attempts to fetch a CFilter from the neutrino client. This is -// used to get around the fact that the filter headers may lag behind the -// highest known block header. -func (s *NeutrinoClient) pollCFilter(hash *chainhash.Hash) (*gcs.Filter, error) { - var ( - filter *gcs.Filter - err error - count int - ) - - const maxFilterRetries = 50 - for count < maxFilterRetries { - if count > 0 { - time.Sleep(100 * time.Millisecond) - } - - filter, err = s.CS.GetCFilter( - *hash, wire.GCSFilterRegular, neutrino.OptimisticBatch(), - ) - if err != nil { - count++ - continue - } - - return filter, nil - } - - return nil, err -} - -// Rescan replicates the RPC client's Rescan command. -func (s *NeutrinoClient) Rescan(startHash *chainhash.Hash, addrs []btcutil.Address, - outPoints map[wire.OutPoint]btcutil.Address) error { - - s.clientMtx.Lock() - if !s.started { - s.clientMtx.Unlock() - return fmt.Errorf("can't do a rescan when the chain client " + - "is not started") - } - if s.scanning { - // Restart the rescan by killing the existing rescan. - close(s.rescanQuit) - rescan := s.rescan - s.clientMtx.Unlock() - rescan.WaitForShutdown() - s.clientMtx.Lock() - s.rescan = nil - s.rescanErr = nil - } - s.rescanQuit = make(chan struct{}) - s.scanning = true - s.finished = false - s.lastProgressSent = false - s.lastFilteredBlockHeader = nil - s.isRescan = true - s.clientMtx.Unlock() - - bestBlock, err := s.CS.BestBlock() - if err != nil { - return fmt.Errorf("can't get chain service's best block: %s", err) - } - header, err := s.CS.GetBlockHeader(&bestBlock.Hash) - if err != nil { - return fmt.Errorf("can't get block header for hash %v: %s", - bestBlock.Hash, err) - } - - // If the wallet is already fully caught up, or the rescan has started - // with state that indicates a "fresh" wallet, we'll send a - // notification indicating the rescan has "finished". - if header.BlockHash() == *startHash { - s.clientMtx.Lock() - s.finished = true - rescanQuit := s.rescanQuit - s.clientMtx.Unlock() - - // Release the lock while dispatching the notification since - // it's possible for the notificationHandler to be waiting to - // acquire it before receiving the notification. - select { - case s.enqueueNotification <- &RescanFinished{ - Hash: startHash, - Height: bestBlock.Height, - Time: header.Timestamp, - }: - case <-s.quit: - return nil - case <-rescanQuit: - return nil - } - } - - var inputsToWatch []neutrino.InputWithScript - for op, addr := range outPoints { - addrScript, err := txscript.PayToAddrScript(addr) - if err != nil { - return err - } - - inputsToWatch = append(inputsToWatch, neutrino.InputWithScript{ - OutPoint: op, - PkScript: addrScript, - }) - } - - s.clientMtx.Lock() - newRescan := neutrino.NewRescan( - &neutrino.RescanChainSource{ - ChainService: s.CS, - }, - neutrino.NotificationHandlers(rpcclient.NotificationHandlers{ - OnBlockConnected: s.onBlockConnected, - OnFilteredBlockConnected: s.onFilteredBlockConnected, - OnBlockDisconnected: s.onBlockDisconnected, - }), - neutrino.StartBlock(&headerfs.BlockStamp{Hash: *startHash}), - neutrino.StartTime(s.startTime), - neutrino.QuitChan(s.rescanQuit), - neutrino.WatchAddrs(addrs...), - neutrino.WatchInputs(inputsToWatch...), - ) - s.rescan = newRescan - s.rescanErr = s.rescan.Start() - s.clientMtx.Unlock() - - return nil -} - -// NotifyBlocks replicates the RPC client's NotifyBlocks command. -func (s *NeutrinoClient) NotifyBlocks() error { - s.clientMtx.Lock() - // If we're scanning, we're already notifying on blocks. Otherwise, - // start a rescan without watching any addresses. - if !s.scanning { - s.clientMtx.Unlock() - return s.NotifyReceived([]btcutil.Address{}) - } - s.clientMtx.Unlock() - return nil -} - -// NotifyReceived replicates the RPC client's NotifyReceived command. -func (s *NeutrinoClient) NotifyReceived(addrs []btcutil.Address) error { - s.clientMtx.Lock() - - // If we have a rescan running, we just need to add the appropriate - // addresses to the watch list. - if s.scanning { - s.clientMtx.Unlock() - return s.rescan.Update(neutrino.AddAddrs(addrs...)) - } - - s.rescanQuit = make(chan struct{}) - s.scanning = true - - // Don't need RescanFinished or RescanProgress notifications. - s.finished = true - s.lastProgressSent = true - s.lastFilteredBlockHeader = nil - - // Rescan with just the specified addresses. - newRescan := neutrino.NewRescan( - &neutrino.RescanChainSource{ - ChainService: s.CS, - }, - neutrino.NotificationHandlers(rpcclient.NotificationHandlers{ - OnBlockConnected: s.onBlockConnected, - OnFilteredBlockConnected: s.onFilteredBlockConnected, - OnBlockDisconnected: s.onBlockDisconnected, - }), - neutrino.StartTime(s.startTime), - neutrino.QuitChan(s.rescanQuit), - neutrino.WatchAddrs(addrs...), - ) - s.rescan = newRescan - s.rescanErr = s.rescan.Start() - s.clientMtx.Unlock() - return nil -} - -// Notifications replicates the RPC client's Notifications method. -func (s *NeutrinoClient) Notifications() <-chan interface{} { - return s.dequeueNotification -} - -// SetStartTime is a non-interface method to set the birthday of the wallet -// using this object. Since only a single rescan at a time is currently -// supported, only one birthday needs to be set. This does not fully restart a -// running rescan, so should not be used to update a rescan while it is running. -// TODO: When factoring out to multiple rescans per Neutrino client, add a -// birthday per client. -func (s *NeutrinoClient) SetStartTime(startTime time.Time) { - s.clientMtx.Lock() - defer s.clientMtx.Unlock() - - s.startTime = startTime -} - -// onFilteredBlockConnected sends appropriate notifications to the notification -// channel. -func (s *NeutrinoClient) onFilteredBlockConnected(height int32, - header *wire.BlockHeader, relevantTxs []*btcutil.Tx) { - ntfn := FilteredBlockConnected{ - Block: &wtxmgr.BlockMeta{ - Block: wtxmgr.Block{ - Hash: header.BlockHash(), - Height: height, - }, - Time: header.Timestamp, - }, - } - for _, tx := range relevantTxs { - rec, err := wtxmgr.NewTxRecordFromMsgTx(tx.MsgTx(), - header.Timestamp) - if err != nil { - log.Errorf("Cannot create transaction record for "+ - "relevant tx: %s", err) - // TODO(aakselrod): Return? - continue - } - ntfn.RelevantTxs = append(ntfn.RelevantTxs, rec) - } - - select { - case s.enqueueNotification <- ntfn: - case <-s.quit: - return - case <-s.rescanQuit: - return - } - - s.clientMtx.Lock() - s.lastFilteredBlockHeader = header - s.clientMtx.Unlock() - - // Handle RescanFinished notification if required. - s.dispatchRescanFinished() -} - -// onBlockDisconnected sends appropriate notifications to the notification -// channel. -func (s *NeutrinoClient) onBlockDisconnected(hash *chainhash.Hash, height int32, - t time.Time) { - select { - case s.enqueueNotification <- BlockDisconnected{ - Block: wtxmgr.Block{ - Hash: *hash, - Height: height, - }, - Time: t, - }: - case <-s.quit: - case <-s.rescanQuit: - } -} - -func (s *NeutrinoClient) onBlockConnected(hash *chainhash.Hash, height int32, - time time.Time) { - // TODO: Move this closure out and parameterize it? Is it useful - // outside here? - sendRescanProgress := func() { - select { - case s.enqueueNotification <- &RescanProgress{ - Hash: hash, - Height: height, - Time: time, - }: - case <-s.quit: - case <-s.rescanQuit: - } - } - // Only send BlockConnected notification if we're processing blocks - // before the birthday. Otherwise, we can just update using - // RescanProgress notifications. - if time.Before(s.startTime) { - // Send a RescanProgress notification every 10K blocks. - if height%10000 == 0 { - s.clientMtx.Lock() - shouldSend := s.isRescan && !s.finished - s.clientMtx.Unlock() - if shouldSend { - sendRescanProgress() - } - } - } else { - // Send a RescanProgress notification if we're just going over - // the boundary between pre-birthday and post-birthday blocks, - // and note that we've sent it. - s.clientMtx.Lock() - if !s.lastProgressSent { - shouldSend := s.isRescan && !s.finished - if shouldSend { - s.clientMtx.Unlock() - sendRescanProgress() - s.clientMtx.Lock() - s.lastProgressSent = true - } - } - s.clientMtx.Unlock() - select { - case s.enqueueNotification <- BlockConnected{ - Block: wtxmgr.Block{ - Hash: *hash, - Height: height, - }, - Time: time, - }: - case <-s.quit: - case <-s.rescanQuit: - } - } - - // Check if we're able to dispatch our final RescanFinished notification - // after processing this block. - s.dispatchRescanFinished() -} - -// dispatchRescanFinished determines whether we're able to dispatch our final -// RescanFinished notification in order to mark the wallet as synced with the -// chain. If the notification has already been dispatched, then it won't be done -// again. -func (s *NeutrinoClient) dispatchRescanFinished() { - bs, err := s.CS.BestBlock() - if err != nil { - log.Errorf("Can't get chain service's best block: %s", err) - return - } - - s.clientMtx.Lock() - // Only send the RescanFinished notification once. - if s.lastFilteredBlockHeader == nil || s.finished { - s.clientMtx.Unlock() - return - } - - // Only send the RescanFinished notification once the underlying chain - // service sees itself as current. - if bs.Hash != s.lastFilteredBlockHeader.BlockHash() { - s.clientMtx.Unlock() - return - } - - s.finished = s.CS.IsCurrent() && s.lastProgressSent - if !s.finished { - s.clientMtx.Unlock() - return - } - - header := s.lastFilteredBlockHeader - s.clientMtx.Unlock() - - select { - case s.enqueueNotification <- &RescanFinished{ - Hash: &bs.Hash, - Height: bs.Height, - Time: header.Timestamp, - }: - case <-s.quit: - return - case <-s.rescanQuit: - return - } -} - -// notificationHandler queues and dequeues notifications. There are currently -// no bounds on the queue, so the dequeue channel should be read continually to -// avoid running out of memory. -func (s *NeutrinoClient) notificationHandler() { - hash, height, err := s.GetBestBlock() - if err != nil { - log.Errorf("Failed to get best block from chain service: %s", - err) - s.Stop() - s.wg.Done() - return - } - - bs := &waddrmgr.BlockStamp{Hash: *hash, Height: height} - - // TODO: Rather than leaving this as an unbounded queue for all types of - // notifications, try dropping ones where a later enqueued notification - // can fully invalidate one waiting to be processed. For example, - // blockconnected notifications for greater block heights can remove the - // need to process earlier blockconnected notifications still waiting - // here. - - var notifications []interface{} - enqueue := s.enqueueNotification - var dequeue chan interface{} - var next interface{} -out: - for { - s.clientMtx.Lock() - rescanErr := s.rescanErr - s.clientMtx.Unlock() - select { - case n, ok := <-enqueue: - if !ok { - // If no notifications are queued for handling, - // the queue is finished. - if len(notifications) == 0 { - break out - } - // nil channel so no more reads can occur. - enqueue = nil - continue - } - if len(notifications) == 0 { - next = n - dequeue = s.dequeueNotification - } - notifications = append(notifications, n) - - case dequeue <- next: - if n, ok := next.(BlockConnected); ok { - bs = &waddrmgr.BlockStamp{ - Height: n.Height, - Hash: n.Hash, - } - } - - notifications[0] = nil - notifications = notifications[1:] - if len(notifications) != 0 { - next = notifications[0] - } else { - // If no more notifications can be enqueued, the - // queue is finished. - if enqueue == nil { - break out - } - dequeue = nil - } - - case err := <-rescanErr: - if err != nil { - log.Errorf("Neutrino rescan ended with error: %s", err) - } - - case s.currentBlock <- bs: - - case <-s.quit: - break out - } - } - - s.Stop() - close(s.dequeueNotification) - s.wg.Done() -} diff --git a/chain/pruned_block_dispatcher.go b/chain/pruned_block_dispatcher.go deleted file mode 100644 index 4a80a5f..0000000 --- a/chain/pruned_block_dispatcher.go +++ /dev/null @@ -1,666 +0,0 @@ -package chain - -import ( - "encoding/binary" - "encoding/hex" - "errors" - "fmt" - "math/rand" - "net" - "sync" - "time" - - "github.com/btcsuite/btcd/blockchain" - "github.com/btcsuite/btcd/btcjson" - "github.com/btcsuite/btcd/chaincfg" - "github.com/btcsuite/btcd/chaincfg/chainhash" - "github.com/btcsuite/btcd/peer" - "github.com/btcsuite/btcd/wire" - "github.com/btcsuite/btcutil" - "github.com/lightninglabs/neutrino/query" - "github.com/lightningnetwork/lnd/ticker" -) - -const ( - // defaultRefreshPeersInterval represents the default polling interval - // at which we attempt to refresh the set of known peers. - defaultRefreshPeersInterval = 30 * time.Second - - // defaultPeerReadyTimeout is the default amount of time we'll wait for - // a query peer to be ready to receive incoming block requests. Peers - // cannot respond to requests until the version exchange is completed - // upon connection establishment. - defaultPeerReadyTimeout = 15 * time.Second - - // requiredServices are the requires services we require any candidate - // peers to signal such that we can retrieve pruned blocks from them. - requiredServices = wire.SFNodeNetwork | wire.SFNodeWitness - - // prunedNodeService is the service bit signaled by pruned nodes on the - // network. Note that this service bit can also be signaled by full - // nodes, except that they also signal wire.SFNodeNetwork, where as - // pruned nodes don't. - prunedNodeService wire.ServiceFlag = 1 << 10 -) - -// queryPeer represents a Bitcoin network peer that we'll query for blocks. -// The ready channel serves as a signal for us to know when we can be sending -// queries to the peer. Any messages received from the peer are sent through the -// msgsRecvd channel. -type queryPeer struct { - *peer.Peer - ready chan struct{} - msgsRecvd chan wire.Message - quit chan struct{} -} - -// signalUponDisconnect closes the peer's quit chan to signal it has -// disconnected. -func (p *queryPeer) signalUponDisconnect(f func()) { - go func() { - p.WaitForDisconnect() - close(p.quit) - f() - }() -} - -// SubscribeRecvMsg adds a OnRead subscription to the peer. All bitcoin messages -// received from this peer will be sent on the returned channel. A closure is -// also returned, that should be called to cancel the subscription. -// -// NOTE: This method exists to satisfy the query.Peer interface. -func (p *queryPeer) SubscribeRecvMsg() (<-chan wire.Message, func()) { - return p.msgsRecvd, func() {} -} - -// OnDisconnect returns a channel that will be closed once the peer disconnects. -// -// NOTE: This method exists to satisfy the query.Peer interface. -func (p *queryPeer) OnDisconnect() <-chan struct{} { - return p.quit -} - -// PrunedBlockDispatcherConfig encompasses all of the dependencies required by -// the PrunedBlockDispatcher to carry out its duties. -type PrunedBlockDispatcherConfig struct { - // ChainParams represents the parameters of the current active chain. - ChainParams *chaincfg.Params - - // NumTargetPeer represents the target number of peers we should - // maintain connections with. This exists to prevent establishing - // connections to all of the bitcoind's peers, which would be - // unnecessary and ineffecient. - NumTargetPeers int - - // Dial establishes connections to Bitcoin peers. This must support - // dialing peers running over Tor if the backend also supports it. - Dial func(string) (net.Conn, error) - - // GetPeers retrieves the active set of peers known to the backend node. - GetPeers func() ([]btcjson.GetPeerInfoResult, error) - - // GetNodeAddresses returns random reachable addresses known to the - // backend node. An optional number of addresses to return can be - // provided, otherwise 8 are returned by default. - GetNodeAddresses func(*int32) ([]btcjson.GetNodeAddressesResult, error) - - // PeerReadyTimeout is the amount of time we'll wait for a query peer to - // be ready to receive incoming block requests. Peers cannot respond to - // requests until the version exchange is completed upon connection - // establishment. - PeerReadyTimeout time.Duration - - // RefreshPeersTicker is the polling ticker that signals us when we - // should attempt to refresh the set of known peers. - RefreshPeersTicker ticker.Ticker - - // AllowSelfPeerConns is only used to allow the tests to bypass the peer - // self connection detecting and disconnect logic since they - // intentionally do so for testing purposes. - AllowSelfPeerConns bool - - // MaxRequestInvs dictates how many invs we should fit in a single - // getdata request to a peer. This only exists to facilitate the testing - // of a request spanning multiple getdata messages. - MaxRequestInvs int -} - -// PrunedBlockDispatcher enables a chain client to request blocks that the -// server has already pruned. This is done by connecting to the server's full -// node peers and querying them directly. Ideally, this is a capability -// supported by the server, though this is not yet possible with bitcoind. -type PrunedBlockDispatcher struct { - cfg PrunedBlockDispatcherConfig - - // workManager handles satisfying all of our incoming pruned block - // requests. - workManager *query.WorkManager - - // blocksQueried represents the set of pruned blocks we've been - // requested to query. Each block maps to a list of clients waiting to - // be notified once the block is received. - // - // NOTE: The blockMtx lock must always be held when accessing this - // field. - blocksQueried map[chainhash.Hash][]chan *wire.MsgBlock - blockMtx sync.Mutex - - // currentPeers represents the set of peers we're currently connected - // to. Each peer found here will have a worker spawned within the - // workManager to handle our queries. - // - // NOTE: The peerMtx lock must always be held when accessing this - // field. - currentPeers map[string]*peer.Peer - - // bannedPeers represents the set of peers who have sent us an invalid - // reply corresponding to a query. Peers within this set should not be - // dialed. - // - // NOTE: The peerMtx lock must always be held when accessing this - // field. - bannedPeers map[string]struct{} - peerMtx sync.Mutex - - // peersConnected is the channel through which we'll send new peers - // we've established connections to. - peersConnected chan query.Peer - - // timeSource provides a mechanism to add several time samples which are - // used to determine a median time which is then used as an offset to - // the local clock when validating blocks received from peers. - timeSource blockchain.MedianTimeSource - - quit chan struct{} - wg sync.WaitGroup -} - -// NewPrunedBlockDispatcher initializes a new PrunedBlockDispatcher instance -// backed by the given config. -func NewPrunedBlockDispatcher(cfg *PrunedBlockDispatcherConfig) ( - *PrunedBlockDispatcher, error) { - - if cfg.NumTargetPeers < 1 { - return nil, errors.New("config option NumTargetPeer must be >= 1") - } - if cfg.MaxRequestInvs > wire.MaxInvPerMsg { - return nil, fmt.Errorf("config option MaxRequestInvs must be "+ - "<= %v", wire.MaxInvPerMsg) - } - - peersConnected := make(chan query.Peer) - return &PrunedBlockDispatcher{ - cfg: *cfg, - workManager: query.New(&query.Config{ - ConnectedPeers: func() (<-chan query.Peer, func(), error) { - return peersConnected, func() {}, nil - }, - NewWorker: query.NewWorker, - Ranking: query.NewPeerRanking(), - }), - blocksQueried: make(map[chainhash.Hash][]chan *wire.MsgBlock), - currentPeers: make(map[string]*peer.Peer), - bannedPeers: make(map[string]struct{}), - peersConnected: peersConnected, - timeSource: blockchain.NewMedianTime(), - quit: make(chan struct{}), - }, nil -} - -// Start allows the PrunedBlockDispatcher to begin handling incoming block -// requests. -func (d *PrunedBlockDispatcher) Start() error { - log.Tracef("Starting pruned block dispatcher") - - if err := d.workManager.Start(); err != nil { - return err - } - - d.wg.Add(1) - go d.pollPeers() - - return nil -} - -// Stop stops the PrunedBlockDispatcher from accepting any more incoming block -// requests. -func (d *PrunedBlockDispatcher) Stop() { - log.Tracef("Stopping pruned block dispatcher") - - close(d.quit) - d.wg.Wait() - - _ = d.workManager.Stop() -} - -// pollPeers continuously polls the backend node for new peers to establish -// connections to. -func (d *PrunedBlockDispatcher) pollPeers() { - defer d.wg.Done() - - if err := d.connectToPeers(); err != nil { - log.Warnf("Unable to establish peer connections: %v", err) - } - - d.cfg.RefreshPeersTicker.Resume() - defer d.cfg.RefreshPeersTicker.Stop() - - for { - select { - case <-d.cfg.RefreshPeersTicker.Ticks(): - // Quickly determine if we need any more peer - // connections. If we don't, we'll wait for our next - // tick. - d.peerMtx.Lock() - peersNeeded := d.cfg.NumTargetPeers - len(d.currentPeers) - d.peerMtx.Unlock() - if peersNeeded <= 0 { - continue - } - - // If we do, attempt to establish connections until - // we've reached our target number. - if err := d.connectToPeers(); err != nil { - log.Warnf("Failed to establish peer "+ - "connections: %v", err) - continue - } - - case <-d.quit: - return - } - } -} - -// connectToPeers attempts to establish new peer connections until the target -// number is reached. Once a connection is successfully established, the peer is -// sent through the peersConnected channel to notify the internal workManager. -func (d *PrunedBlockDispatcher) connectToPeers() error { - // Refresh the list of peers our backend is currently connected to, and - // filter out any that do not meet our requirements. - peers, err := d.cfg.GetPeers() - if err != nil { - return err - } - addrs, err := filterPeers(peers) - if err != nil { - return err - } - rand.Shuffle(len(addrs), func(i, j int) { - addrs[i], addrs[j] = addrs[j], addrs[i] - }) - - for _, addr := range addrs { - needMore, err := d.connectToPeer(addr) - if err != nil { - log.Debugf("Failed connecting to peer %v: %v", addr, err) - continue - } - if !needMore { - return nil - } - } - - // We still need more addresses so we'll also invoke the - // `getnodeaddresses` RPC to receive random reachable addresses. We'll - // also filter out any that do not meet our requirements. The nil - // argument will return a default number of addresses, which is - // currently 8. We don't care how many addresses are returned as long as - // 1 is returned, since this will be polled regularly if needed. - nodeAddrs, err := d.cfg.GetNodeAddresses(nil) - if err != nil { - return err - } - addrs = filterNodeAddrs(nodeAddrs) - for _, addr := range addrs { - if _, err := d.connectToPeer(addr); err != nil { - log.Debugf("Failed connecting to peer %v: %v", addr, err) - } - } - - return nil -} - -// connectToPeer attempts to establish a connection to the given peer and waits -// up to PeerReadyTimeout for the version exchange to complete so that we can -// begin sending it our queries. -func (d *PrunedBlockDispatcher) connectToPeer(addr string) (bool, error) { - // Prevent connections to peers we've already connected to or we've - // banned. - d.peerMtx.Lock() - _, isBanned := d.bannedPeers[addr] - _, isConnected := d.currentPeers[addr] - d.peerMtx.Unlock() - if isBanned || isConnected { - return true, nil - } - - peer, err := d.newQueryPeer(addr) - if err != nil { - return true, fmt.Errorf("unable to configure query peer %v: "+ - "%v", addr, err) - } - - // Establish the connection and wait for the protocol negotiation to - // complete. - conn, err := d.cfg.Dial(addr) - if err != nil { - return true, err - } - peer.AssociateConnection(conn) - - select { - case <-peer.ready: - case <-time.After(d.cfg.PeerReadyTimeout): - peer.Disconnect() - return true, errors.New("timed out waiting for protocol negotiation") - case <-d.quit: - return false, errors.New("shutting down") - } - - // Remove the peer once it has disconnected. - peer.signalUponDisconnect(func() { - d.peerMtx.Lock() - delete(d.currentPeers, peer.Addr()) - d.peerMtx.Unlock() - }) - - d.peerMtx.Lock() - d.currentPeers[addr] = peer.Peer - numPeers := len(d.currentPeers) - d.peerMtx.Unlock() - - // Notify the new peer connection to our workManager. - select { - case d.peersConnected <- peer: - case <-d.quit: - return false, errors.New("shutting down") - } - - // Request more peer connections if we haven't reached our target number - // with the new peer. - return numPeers < d.cfg.NumTargetPeers, nil -} - -// filterPeers filters out any peers which cannot handle arbitrary witness block -// requests, i.e., any peer which is not considered a segwit-enabled -// "full-node". -func filterPeers(peers []btcjson.GetPeerInfoResult) ([]string, error) { - var eligible []string - for _, peer := range peers { - rawServices, err := hex.DecodeString(peer.Services) - if err != nil { - return nil, err - } - services := wire.ServiceFlag(binary.BigEndian.Uint64(rawServices)) - if !satisfiesRequiredServices(services) { - continue - } - eligible = append(eligible, peer.Addr) - } - return eligible, nil -} - -// filterNodeAddrs filters out any peers which cannot handle arbitrary witness -// block requests, i.e., any peer which is not considered a segwit-enabled -// "full-node". -func filterNodeAddrs(nodeAddrs []btcjson.GetNodeAddressesResult) []string { - var eligible []string - for _, nodeAddr := range nodeAddrs { - services := wire.ServiceFlag(nodeAddr.Services) - if !satisfiesRequiredServices(services) { - continue - } - eligible = append(eligible, nodeAddr.Address) - } - return eligible -} - -// satisfiesRequiredServices determines whether the services signaled by a peer -// satisfy our requirements for retrieving pruned blocks from them. We need the -// full chain, and witness data as well. Note that we ignore the limited -// (pruned bit) as nodes can have the full data and set that as well. Pure -// pruned nodes won't set the network bit. -func satisfiesRequiredServices(services wire.ServiceFlag) bool { - return services&requiredServices == requiredServices -} - -// newQueryPeer creates a new peer instance configured to relay any received -// messages to the internal workManager. -func (d *PrunedBlockDispatcher) newQueryPeer(addr string) (*queryPeer, error) { - ready := make(chan struct{}) - msgsRecvd := make(chan wire.Message) - - cfg := &peer.Config{ - ChainParams: d.cfg.ChainParams, - // We're not interested in transactions, so disable their relay. - DisableRelayTx: true, - Listeners: peer.MessageListeners{ - // Add the remote peer time as a sample for creating an - // offset against the local clock to keep the network - // time in sync. - OnVersion: func(p *peer.Peer, msg *wire.MsgVersion) *wire.MsgReject { - d.timeSource.AddTimeSample(p.Addr(), msg.Timestamp) - return nil - }, - // Register a callback to signal us when we can start - // querying the peer for blocks. - OnVerAck: func(*peer.Peer, *wire.MsgVerAck) { - close(ready) - }, - // Register a callback to signal us whenever the peer - // has sent us a block message. - OnRead: func(p *peer.Peer, _ int, msg wire.Message, err error) { - if err != nil { - return - } - - var block *wire.MsgBlock - switch msg := msg.(type) { - case *wire.MsgBlock: - block = msg - case *wire.MsgVersion, *wire.MsgVerAck, - *wire.MsgPing, *wire.MsgPong: - return - default: - log.Debugf("Received unexpected message "+ - "%T from peer %v", msg, p.Addr()) - return - } - - select { - case msgsRecvd <- block: - case <-d.quit: - } - }, - }, - AllowSelfConns: true, - } - p, err := peer.NewOutboundPeer(cfg, addr) - if err != nil { - return nil, err - } - - return &queryPeer{ - Peer: p, - ready: ready, - msgsRecvd: msgsRecvd, - quit: make(chan struct{}), - }, nil -} - -// banPeer bans a peer by disconnecting them and ensuring we don't reconnect. -func (d *PrunedBlockDispatcher) banPeer(peer string) { - d.peerMtx.Lock() - defer d.peerMtx.Unlock() - - d.bannedPeers[peer] = struct{}{} - if p, ok := d.currentPeers[peer]; ok { - p.Disconnect() - } -} - -// Query submits a request to query the information of the given blocks. -func (d *PrunedBlockDispatcher) Query(blocks []*chainhash.Hash, - opts ...query.QueryOption) (<-chan *wire.MsgBlock, <-chan error) { - - reqs, blockChan, err := d.newRequest(blocks) - if err != nil { - errChan := make(chan error, 1) - errChan <- err - return nil, errChan - } - - var errChan chan error - if len(reqs) > 0 { - errChan = d.workManager.Query(reqs, opts...) - } - return blockChan, errChan -} - -// newRequest construct a new query request for the given blocks to submit to -// the internal workManager. A channel is also returned through which the -// requested blocks are sent through. -func (d *PrunedBlockDispatcher) newRequest(blocks []*chainhash.Hash) ( - []*query.Request, <-chan *wire.MsgBlock, error) { - - // Make sure the channel is buffered enough to handle all blocks. - blockChan := make(chan *wire.MsgBlock, len(blocks)) - - d.blockMtx.Lock() - defer d.blockMtx.Unlock() - - // Each GetData message can only include up to MaxRequestInvs invs, - // and each block consumes a single inv. - var ( - reqs []*query.Request - getData *wire.MsgGetData - ) - for i, block := range blocks { - if getData == nil { - getData = wire.NewMsgGetData() - } - - if _, ok := d.blocksQueried[*block]; !ok { - log.Debugf("Queuing new block %v for request", *block) - inv := wire.NewInvVect(wire.InvTypeBlock, block) - if err := getData.AddInvVect(inv); err != nil { - return nil, nil, err - } - } else { - log.Debugf("Received new request for pending query of "+ - "block %v", *block) - } - - d.blocksQueried[*block] = append( - d.blocksQueried[*block], blockChan, - ) - - // If we have any invs to request, or we've reached the maximum - // allowed, queue the getdata message as is, and proceed to the - // next if any. - if (len(getData.InvList) > 0 && i == len(blocks)-1) || - len(getData.InvList) == d.cfg.MaxRequestInvs { - - reqs = append(reqs, &query.Request{ - Req: getData, - HandleResp: d.handleResp, - }) - getData = nil - } - } - - return reqs, blockChan, nil -} - -// handleResp is a response handler that will be called for every message -// received from the peer that the request was made to. It should validate the -// response against the request made, and return a Progress indicating whether -// the request was answered by this particular response. -// -// NOTE: Since the worker's job queue will be stalled while this method is -// running, it should not be doing any expensive operations. It should validate -// the response and immediately return the progress. The response should be -// handed off to another goroutine for processing. -func (d *PrunedBlockDispatcher) handleResp(req, resp wire.Message, - peer string) query.Progress { - - // We only expect MsgBlock as replies. - block, ok := resp.(*wire.MsgBlock) - if !ok { - return query.Progress{ - Progressed: false, - Finished: false, - } - } - - // We only serve MsgGetData requests. - getData, ok := req.(*wire.MsgGetData) - if !ok { - return query.Progress{ - Progressed: false, - Finished: false, - } - } - - // Check that we've actually queried for this block and validate it. - blockHash := block.BlockHash() - d.blockMtx.Lock() - blockChans, ok := d.blocksQueried[blockHash] - if !ok { - d.blockMtx.Unlock() - return query.Progress{ - Progressed: false, - Finished: false, - } - } - - err := blockchain.CheckBlockSanity( - btcutil.NewBlock(block), d.cfg.ChainParams.PowLimit, - d.timeSource, - ) - if err != nil { - d.blockMtx.Unlock() - - log.Warnf("Received invalid block %v from peer %v: %v", - blockHash, peer, err) - d.banPeer(peer) - - return query.Progress{ - Progressed: false, - Finished: false, - } - } - - // Once validated, we can safely remove it. - delete(d.blocksQueried, blockHash) - - // Check whether we have any other pending blocks we've yet to receive. - // If we do, we'll mark the response as progressing our query, but not - // completing it yet. - progress := query.Progress{Progressed: true, Finished: true} - for _, inv := range getData.InvList { - if _, ok := d.blocksQueried[inv.Hash]; ok { - progress.Finished = false - break - } - } - d.blockMtx.Unlock() - - // Launch a goroutine to notify all clients of the block as we don't - // want to potentially block our workManager. - d.wg.Add(1) - go func() { - defer d.wg.Done() - - for _, blockChan := range blockChans { - select { - case blockChan <- block: - case <-d.quit: - return - } - } - }() - - return progress -} diff --git a/chain/pruned_block_dispatcher_test.go b/chain/pruned_block_dispatcher_test.go deleted file mode 100644 index 50af406..0000000 --- a/chain/pruned_block_dispatcher_test.go +++ /dev/null @@ -1,659 +0,0 @@ -package chain - -import ( - "encoding/binary" - "encoding/hex" - "fmt" - "net" - "sync" - "sync/atomic" - "testing" - "time" - - "github.com/btcsuite/btcd/btcjson" - "github.com/btcsuite/btcd/chaincfg" - "github.com/btcsuite/btcd/chaincfg/chainhash" - "github.com/btcsuite/btcd/peer" - "github.com/btcsuite/btcd/wire" - "github.com/lightningnetwork/lnd/ticker" - "github.com/stretchr/testify/require" -) - -var ( - addrCounter int32 // Increased atomically. - - chainParams = chaincfg.RegressionNetParams -) - -func nextAddr() string { - port := atomic.AddInt32(&addrCounter, 1) - return fmt.Sprintf("10.0.0.1:%d", port) -} - -// prunedBlockDispatcherHarness is a harness used to facilitate the testing of the -// PrunedBlockDispatcher. -type prunedBlockDispatcherHarness struct { - t *testing.T - - dispatcher *PrunedBlockDispatcher - - hashes []*chainhash.Hash - blocks map[chainhash.Hash]*wire.MsgBlock - - peerMtx sync.Mutex - peers map[string]*peer.Peer - fallbackAddrs map[string]*peer.Peer - localConns map[string]net.Conn // Connections to peers. - remoteConns map[string]net.Conn // Connections from peers. - - dialedPeer chan string - queriedPeer chan struct{} - blocksQueried map[chainhash.Hash]int - - shouldReply uint32 // 0 == true, 1 == false, 2 == invalid reply -} - -// newNetworkBlockTestHarness initializes a new PrunedBlockDispatcher test harness -// backed by a custom chain and peers. -func newNetworkBlockTestHarness(t *testing.T, numBlocks, - numPeers, numWorkers uint32) *prunedBlockDispatcherHarness { - - h := &prunedBlockDispatcherHarness{ - t: t, - dispatcher: &PrunedBlockDispatcher{}, - peers: make(map[string]*peer.Peer, numPeers), - fallbackAddrs: make(map[string]*peer.Peer, numPeers), - localConns: make(map[string]net.Conn, numPeers), - remoteConns: make(map[string]net.Conn, numPeers), - dialedPeer: make(chan string), - queriedPeer: make(chan struct{}), - blocksQueried: make(map[chainhash.Hash]int), - shouldReply: 0, - } - - h.hashes, h.blocks = genBlockChain(numBlocks) - for i := uint32(0); i < numPeers; i++ { - h.addPeer(false) - } - - dial := func(addr string) (net.Conn, error) { - go func() { - h.dialedPeer <- addr - }() - - h.peerMtx.Lock() - defer h.peerMtx.Unlock() - - localConn, ok := h.localConns[addr] - if !ok { - return nil, fmt.Errorf("local conn %v not found", addr) - } - remoteConn, ok := h.remoteConns[addr] - if !ok { - return nil, fmt.Errorf("remote conn %v not found", addr) - } - - if p, ok := h.peers[addr]; ok { - p.AssociateConnection(remoteConn) - } - if p, ok := h.fallbackAddrs[addr]; ok { - p.AssociateConnection(remoteConn) - } - return localConn, nil - } - - var err error - h.dispatcher, err = NewPrunedBlockDispatcher(&PrunedBlockDispatcherConfig{ - ChainParams: &chainParams, - NumTargetPeers: int(numWorkers), - Dial: dial, - GetPeers: func() ([]btcjson.GetPeerInfoResult, error) { - h.peerMtx.Lock() - defer h.peerMtx.Unlock() - - res := make([]btcjson.GetPeerInfoResult, 0, len(h.peers)) - for addr, peer := range h.peers { - var rawServices [8]byte - binary.BigEndian.PutUint64( - rawServices[:], uint64(peer.Services()), - ) - - res = append(res, btcjson.GetPeerInfoResult{ - Addr: addr, - Services: hex.EncodeToString(rawServices[:]), - }) - } - - return res, nil - }, - GetNodeAddresses: func(*int32) ([]btcjson.GetNodeAddressesResult, error) { - h.peerMtx.Lock() - defer h.peerMtx.Unlock() - - res := make( - []btcjson.GetNodeAddressesResult, 0, - len(h.fallbackAddrs), - ) - for addr, peer := range h.fallbackAddrs { - res = append(res, btcjson.GetNodeAddressesResult{ - Services: uint64(peer.Services()), - Address: addr, - }) - } - return res, nil - }, - PeerReadyTimeout: time.Hour, - RefreshPeersTicker: ticker.NewForce(time.Hour), - AllowSelfPeerConns: true, - MaxRequestInvs: wire.MaxInvPerMsg, - }) - require.NoError(t, err) - - return h -} - -// start starts the PrunedBlockDispatcher and asserts that connections are made -// to all available peers. -func (h *prunedBlockDispatcherHarness) start() { - h.t.Helper() - - err := h.dispatcher.Start() - require.NoError(h.t, err) - - h.peerMtx.Lock() - numPeers := len(h.peers) - h.peerMtx.Unlock() - - for i := 0; i < numPeers; i++ { - h.assertPeerDialed() - } -} - -// stop stops the PrunedBlockDispatcher and asserts that all internal fields of -// the harness have been properly consumed. -func (h *prunedBlockDispatcherHarness) stop() { - h.dispatcher.Stop() - - select { - case <-h.dialedPeer: - h.t.Fatal("did not consume all dialedPeer signals") - default: - } - - select { - case <-h.queriedPeer: - h.t.Fatal("did not consume all queriedPeer signals") - default: - } - - require.Empty(h.t, h.blocksQueried) -} - -// addPeer adds a new random peer available for use by the -// PrunedBlockDispatcher. -func (h *prunedBlockDispatcherHarness) addPeer(fallback bool) string { - addr := nextAddr() - - h.peerMtx.Lock() - defer h.peerMtx.Unlock() - - h.resetPeer(addr, fallback) - return addr -} - -// resetPeer resets the internal peer connection state allowing the -// PrunedBlockDispatcher to establish a mock connection to it. -func (h *prunedBlockDispatcherHarness) resetPeer(addr string, fallback bool) { - if fallback { - h.fallbackAddrs[addr] = h.newPeer() - } else { - h.peers[addr] = h.newPeer() - } - - // Establish a mock connection between us and each peer. - inConn, outConn := pipe( - &conn{localAddr: addr, remoteAddr: "10.0.0.1:8333"}, - &conn{localAddr: "10.0.0.1:8333", remoteAddr: addr}, - ) - h.localConns[addr] = outConn - h.remoteConns[addr] = inConn -} - -// newPeer returns a new properly configured peer.Peer instance that will be -// used by the PrunedBlockDispatcher. -func (h *prunedBlockDispatcherHarness) newPeer() *peer.Peer { - return peer.NewInboundPeer(&peer.Config{ - ChainParams: &chainParams, - DisableRelayTx: true, - Listeners: peer.MessageListeners{ - OnGetData: func(p *peer.Peer, msg *wire.MsgGetData) { - go func() { - h.queriedPeer <- struct{}{} - }() - - for _, inv := range msg.InvList { - // Invs should always be for blocks. - require.Equal(h.t, wire.InvTypeBlock, inv.Type) - - // Invs should always be for known blocks. - block, ok := h.blocks[inv.Hash] - require.True(h.t, ok) - - switch atomic.LoadUint32(&h.shouldReply) { - // Don't reply if requested. - case 1: - continue - // Make the block invalid and send it. - case 2: - block = produceInvalidBlock(block) - } - - go p.QueueMessage(block, nil) - } - }, - }, - Services: wire.SFNodeNetwork | wire.SFNodeWitness, - AllowSelfConns: true, - }) -} - -// query requests the given blocks from the PrunedBlockDispatcher. -func (h *prunedBlockDispatcherHarness) query(blocks []*chainhash.Hash) ( - <-chan *wire.MsgBlock, <-chan error) { - - h.t.Helper() - - blockChan, errChan := h.dispatcher.Query(blocks) - select { - case err := <-errChan: - require.NoError(h.t, err) - default: - } - - for _, block := range blocks { - h.blocksQueried[*block]++ - } - - return blockChan, errChan -} - -// disablePeerReplies prevents the query peer from replying. -func (h *prunedBlockDispatcherHarness) disablePeerReplies() { - atomic.StoreUint32(&h.shouldReply, 1) -} - -// enablePeerReplies allows the query peer to reply. -func (h *prunedBlockDispatcherHarness) enablePeerReplies() { - atomic.StoreUint32(&h.shouldReply, 0) -} - -// enableInvalidPeerReplies -func (h *prunedBlockDispatcherHarness) enableInvalidPeerReplies() { - atomic.StoreUint32(&h.shouldReply, 2) -} - -// refreshPeers forces the RefreshPeersTicker to fire. -func (h *prunedBlockDispatcherHarness) refreshPeers() { - h.t.Helper() - - h.dispatcher.cfg.RefreshPeersTicker.(*ticker.Force).Force <- time.Now() -} - -// disconnectPeer simulates a peer disconnecting from the PrunedBlockDispatcher. -func (h *prunedBlockDispatcherHarness) disconnectPeer(addr string, fallback bool) { - h.t.Helper() - - h.peerMtx.Lock() - defer h.peerMtx.Unlock() - - require.Contains(h.t, h.peers, addr) - - // Obtain the current number of peers before disconnecting such that we - // can block until the peer has been fully disconnected. - h.dispatcher.peerMtx.Lock() - numPeers := len(h.dispatcher.currentPeers) - h.dispatcher.peerMtx.Unlock() - - h.peers[addr].Disconnect() - - require.Eventually(h.t, func() bool { - h.dispatcher.peerMtx.Lock() - defer h.dispatcher.peerMtx.Unlock() - return len(h.dispatcher.currentPeers) == numPeers-1 - }, time.Second, 200*time.Millisecond) - - // Reset the peer connection state to allow connections to them again. - h.resetPeer(addr, fallback) -} - -// assertPeerDialed asserts that a connection was made to the given peer. -func (h *prunedBlockDispatcherHarness) assertPeerDialed() { - h.t.Helper() - - select { - case <-h.dialedPeer: - case <-time.After(5 * time.Second): - h.t.Fatalf("expected peer to be dialed") - } -} - -// assertPeerDialedWithAddr asserts that a connection was made to the given peer. -func (h *prunedBlockDispatcherHarness) assertPeerDialedWithAddr(addr string) { - h.t.Helper() - - select { - case dialedAddr := <-h.dialedPeer: - require.Equal(h.t, addr, dialedAddr) - case <-time.After(5 * time.Second): - h.t.Fatalf("expected peer to be dialed") - } -} - -// assertPeerQueried asserts that query was sent to the given peer. -func (h *prunedBlockDispatcherHarness) assertPeerQueried() { - h.t.Helper() - - select { - case <-h.queriedPeer: - case <-time.After(5 * time.Second): - h.t.Fatalf("expected a peer to be queried") - } -} - -// assertPeerReplied asserts that the query peer replies with a block the -// PrunedBlockDispatcher queried for. -func (h *prunedBlockDispatcherHarness) assertPeerReplied( - blockChan <-chan *wire.MsgBlock, errChan <-chan error, - expectCompletionSignal bool) { - - h.t.Helper() - - select { - case block := <-blockChan: - blockHash := block.BlockHash() - _, ok := h.blocksQueried[blockHash] - require.True(h.t, ok) - - expBlock, ok := h.blocks[blockHash] - require.True(h.t, ok) - require.Equal(h.t, expBlock, block) - - // Decrement how many clients queried the same block. Once we - // have none left, remove it from the map. - h.blocksQueried[blockHash]-- - if h.blocksQueried[blockHash] == 0 { - delete(h.blocksQueried, blockHash) - } - - case <-time.After(5 * time.Second): - select { - case err := <-errChan: - h.t.Fatalf("received unexpected error send: %v", err) - default: - } - h.t.Fatal("expected reply from peer") - } - - // If we should expect a nil error to be sent by the internal - // workManager to signal completion of the request, wait for it now. - if expectCompletionSignal { - select { - case err := <-errChan: - require.NoError(h.t, err) - case <-time.After(5 * time.Second): - h.t.Fatal("expected nil err to signal completion") - } - } -} - -// assertNoPeerDialed asserts that the PrunedBlockDispatcher hasn't established -// a new peer connection. -func (h *prunedBlockDispatcherHarness) assertNoPeerDialed() { - h.t.Helper() - - select { - case peer := <-h.dialedPeer: - h.t.Fatalf("unexpected connection established with peer %v", peer) - case <-time.After(2 * time.Second): - } -} - -// assertNoReply asserts that the peer hasn't replied to a query. -func (h *prunedBlockDispatcherHarness) assertNoReply( - blockChan <-chan *wire.MsgBlock, errChan <-chan error) { - - h.t.Helper() - - select { - case block := <-blockChan: - h.t.Fatalf("received unexpected block %v", block.BlockHash()) - case err := <-errChan: - h.t.Fatalf("received unexpected error send: %v", err) - case <-time.After(2 * time.Second): - } -} - -// TestPrunedBlockDispatcherQuerySameBlock tests that client requests for the -// same block result in only fetching the block once while pending. -func TestPrunedBlockDispatcherQuerySameBlock(t *testing.T) { - t.Parallel() - - const numBlocks = 1 - const numPeers = 5 - const numRequests = numBlocks * numPeers - - h := newNetworkBlockTestHarness(t, numBlocks, numPeers, numPeers) - h.start() - defer h.stop() - - // Queue all the block requests one by one. - blockChans := make([]<-chan *wire.MsgBlock, 0, numRequests) - errChans := make([]<-chan error, 0, numRequests) - for i := 0; i < numRequests; i++ { - blockChan, errChan := h.query(h.hashes) - blockChans = append(blockChans, blockChan) - errChans = append(errChans, errChan) - } - - // We should only see one query. - h.assertPeerQueried() - for i := 0; i < numRequests; i++ { - h.assertPeerReplied(blockChans[i], errChans[i], i == 0) - } -} - -// TestPrunedBlockDispatcherMultipleGetData tests that a client requesting blocks -// that span across multiple queries works as intended. -func TestPrunedBlockDispatcherMultipleGetData(t *testing.T) { - t.Parallel() - - const maxRequestInvs = 5 - const numBlocks = (maxRequestInvs * 5) + 1 - - h := newNetworkBlockTestHarness(t, numBlocks, 1, 1) - h.dispatcher.cfg.MaxRequestInvs = maxRequestInvs - h.start() - defer h.stop() - - // Request all blocks. - blockChan, errChan := h.query(h.hashes) - - // Since we have more blocks than can fit in a single GetData message, - // we should expect multiple queries. For each query, we should expect - // wire.MaxInvPerMsg replies until we've received all of them. - blocksRecvd := 0 - numMsgs := (numBlocks / maxRequestInvs) - if numBlocks%wire.MaxInvPerMsg > 0 { - numMsgs++ - } - for i := 0; i < numMsgs; i++ { - h.assertPeerQueried() - for j := 0; j < maxRequestInvs; j++ { - expectCompletionSignal := blocksRecvd == numBlocks-1 - h.assertPeerReplied( - blockChan, errChan, expectCompletionSignal, - ) - - blocksRecvd++ - if blocksRecvd == numBlocks { - break - } - } - } -} - -// TestPrunedBlockDispatcherMultipleQueryPeers tests that client requests are -// distributed across multiple query peers. -func TestPrunedBlockDispatcherMultipleQueryPeers(t *testing.T) { - t.Parallel() - - const numBlocks = 10 - const numPeers = numBlocks / 2 - - h := newNetworkBlockTestHarness(t, numBlocks, numPeers, numPeers) - h.start() - defer h.stop() - - // Queue all the block requests one by one. - blockChans := make([]<-chan *wire.MsgBlock, 0, numBlocks) - errChans := make([]<-chan error, 0, numBlocks) - for i := 0; i < numBlocks; i++ { - blockChan, errChan := h.query(h.hashes[i : i+1]) - blockChans = append(blockChans, blockChan) - errChans = append(errChans, errChan) - } - - // We should see one query per block. - for i := 0; i < numBlocks; i++ { - h.assertPeerQueried() - h.assertPeerReplied(blockChans[i], errChans[i], true) - } -} - -// TestPrunedBlockDispatcherPeerPoller ensures that the peer poller can detect -// when more connections are required to satisfy a request. -func TestPrunedBlockDispatcherPeerPoller(t *testing.T) { - t.Parallel() - - // Initialize our harness as usual, but don't create any peers yet. - h := newNetworkBlockTestHarness(t, 1, 0, 2) - h.start() - defer h.stop() - - // We shouldn't see any peers dialed since we don't have any. - h.assertNoPeerDialed() - - // We'll then query for a block. - blockChan, errChan := h.query(h.hashes) - - // Refresh our peers. This would dial some peers, but we don't have any - // yet. - h.refreshPeers() - h.assertNoPeerDialed() - - // Add a new peer and force a refresh. We should see the peer be dialed. - // We'll disable replies for now, as we'll want to test the disconnect - // case. - h.disablePeerReplies() - peer := h.addPeer(false) - h.refreshPeers() - h.assertPeerDialedWithAddr(peer) - h.assertPeerQueried() - - // Disconnect our peer and re-enable replies. - h.disconnectPeer(peer, false) - h.enablePeerReplies() - h.assertNoReply(blockChan, errChan) - - // Force a refresh once again. Since the peer has disconnected, a new - // connection should be made and the peer should be queried again. - h.refreshPeers() - h.assertPeerDialed() - h.assertPeerQueried() - - // Add a fallback addresses and force refresh our peers again. We can - // afford to have one more query peer, so a connection should be made. - fallbackPeer := h.addPeer(true) - h.refreshPeers() - h.assertPeerDialedWithAddr(fallbackPeer) - - // Now that we know we've connected to the peer, we should be able to - // receive their response. - h.assertPeerReplied(blockChan, errChan, true) -} - -// TestPrunedBlockDispatcherInvalidBlock ensures that validation is performed on -// blocks received from peers, and that any peers which have sent an invalid -// block are banned and not connected to. -func TestPrunedBlockDispatcherInvalidBlock(t *testing.T) { - t.Parallel() - - h := newNetworkBlockTestHarness(t, 1, 1, 1) - h.start() - defer h.stop() - - // We'll start the test by signaling our peer to send an invalid block. - h.enableInvalidPeerReplies() - - // We'll then query for a block. We shouldn't see a response as the - // block should have failed validation. - blockChan, errChan := h.query(h.hashes) - h.assertPeerQueried() - h.assertNoReply(blockChan, errChan) - - // Since the peer sent us an invalid block, they should have been - // disconnected and banned. Refreshing our peers shouldn't result in a - // new connection attempt because we don't have any other peers - // available. - h.refreshPeers() - h.assertNoPeerDialed() - - // Signal to our peers to send valid replies and add a new peer. - h.enablePeerReplies() - _ = h.addPeer(false) - - // Force a refresh, which should cause our new peer to be dialed and - // queried. We expect them to send a valid block and fulfill our - // request. - h.refreshPeers() - h.assertPeerDialed() - h.assertPeerQueried() - h.assertPeerReplied(blockChan, errChan, true) -} - -func TestSatisfiesRequiredServices(t *testing.T) { - t.Parallel() - - testCases := []struct { - name string - services wire.ServiceFlag - ok bool - }{ - { - name: "full node, segwit", - services: wire.SFNodeWitness | wire.SFNodeNetwork, - ok: true, - }, - { - name: "full node segwit, signals limited", - services: wire.SFNodeWitness | wire.SFNodeNetwork | prunedNodeService, - ok: true, - }, - { - name: "full node, no segwit", - services: wire.SFNodeNetwork, - ok: false, - }, - { - name: "segwit, pure pruned", - services: wire.SFNodeWitness | prunedNodeService, - ok: false, - }, - } - for _, testCase := range testCases { - ok := satisfiesRequiredServices(testCase.services) - require.Equal( - t, testCase.ok, ok, fmt.Sprintf("test case: %v", testCase.name), - ) - } -} diff --git a/config.go b/config.go index 6a8263f..22875bb 100644 --- a/config.go +++ b/config.go @@ -23,7 +23,6 @@ import ( "github.com/btcsuite/btcwallet/netparams" "github.com/btcsuite/btcwallet/wallet" flags "github.com/jessevdk/go-flags" - "github.com/lightninglabs/neutrino" ) const ( @@ -274,9 +273,6 @@ func loadConfig() (*config, []string, error) { UseSPV: false, AddPeers: []string{}, ConnectPeers: []string{}, - MaxPeers: neutrino.MaxPeers, - BanDuration: neutrino.BanDuration, - BanThreshold: neutrino.BanThreshold, DBTimeout: wallet.DefaultDBTimeout, } @@ -545,61 +541,55 @@ func loadConfig() (*config, []string, error) { "::1": {}, } - if cfg.UseSPV { - neutrino.MaxPeers = cfg.MaxPeers - neutrino.BanDuration = cfg.BanDuration - neutrino.BanThreshold = cfg.BanThreshold + if cfg.RPCConnect == "" { + cfg.RPCConnect = net.JoinHostPort("localhost", activeNet.RPCClientPort) + } + + // Add default port to connect flag if missing. + cfg.RPCConnect, err = cfgutil.NormalizeAddress(cfg.RPCConnect, + activeNet.RPCClientPort) + if err != nil { + fmt.Fprintf(os.Stderr, + "Invalid rpcconnect network address: %v\n", err) + return nil, nil, err + } + + RPCHost, _, err := net.SplitHostPort(cfg.RPCConnect) + if err != nil { + return nil, nil, err + } + if cfg.DisableClientTLS { + if _, ok := localhostListeners[RPCHost]; !ok { + str := "%s: the --noclienttls option may not be used " + + "when connecting RPC to non localhost " + + "addresses: %s" + err := fmt.Errorf(str, funcName, cfg.RPCConnect) + fmt.Fprintln(os.Stderr, err) + fmt.Fprintln(os.Stderr, usageMessage) + return nil, nil, err + } } else { - if cfg.RPCConnect == "" { - cfg.RPCConnect = net.JoinHostPort("localhost", activeNet.RPCClientPort) - } + // If CAFile is unset, choose either the copy or local btcd cert. + if !cfg.CAFile.ExplicitlySet() { + cfg.CAFile.Value = filepath.Join(cfg.AppDataDir.Value, defaultCAFilename) - // Add default port to connect flag if missing. - cfg.RPCConnect, err = cfgutil.NormalizeAddress(cfg.RPCConnect, - activeNet.RPCClientPort) - if err != nil { - fmt.Fprintf(os.Stderr, - "Invalid rpcconnect network address: %v\n", err) - return nil, nil, err - } - - RPCHost, _, err := net.SplitHostPort(cfg.RPCConnect) - if err != nil { - return nil, nil, err - } - if cfg.DisableClientTLS { - if _, ok := localhostListeners[RPCHost]; !ok { - str := "%s: the --noclienttls option may not be used " + - "when connecting RPC to non localhost " + - "addresses: %s" - err := fmt.Errorf(str, funcName, cfg.RPCConnect) + // If the CA copy does not exist, check if we're connecting to + // a local btcd and switch to its RPC cert if it exists. + certExists, err := cfgutil.FileExists(cfg.CAFile.Value) + if err != nil { fmt.Fprintln(os.Stderr, err) - fmt.Fprintln(os.Stderr, usageMessage) return nil, nil, err } - } else { - // If CAFile is unset, choose either the copy or local btcd cert. - if !cfg.CAFile.ExplicitlySet() { - cfg.CAFile.Value = filepath.Join(cfg.AppDataDir.Value, defaultCAFilename) - - // If the CA copy does not exist, check if we're connecting to - // a local btcd and switch to its RPC cert if it exists. - certExists, err := cfgutil.FileExists(cfg.CAFile.Value) - if err != nil { - fmt.Fprintln(os.Stderr, err) - return nil, nil, err - } - if !certExists { - if _, ok := localhostListeners[RPCHost]; ok { - btcdCertExists, err := cfgutil.FileExists( - btcdDefaultCAFile) - if err != nil { - fmt.Fprintln(os.Stderr, err) - return nil, nil, err - } - if btcdCertExists { - cfg.CAFile.Value = btcdDefaultCAFile - } + if !certExists { + if _, ok := localhostListeners[RPCHost]; ok { + btcdCertExists, err := cfgutil.FileExists( + btcdDefaultCAFile) + if err != nil { + fmt.Fprintln(os.Stderr, err) + return nil, nil, err + } + if btcdCertExists { + cfg.CAFile.Value = btcdDefaultCAFile } } } diff --git a/go.sum b/go.sum index f789217..7d1ccc1 100644 --- a/go.sum +++ b/go.sum @@ -74,7 +74,6 @@ github.com/lightninglabs/neutrino v0.12.1 h1:9umzk5kKNc/l3bAyak8ClSRP1qSulnjc6kp github.com/lightninglabs/neutrino v0.12.1/go.mod h1:GlKninWpRBbL7b8G0oQ36/8downfnFwKsr0hbRA6E/E= github.com/lightningnetwork/lnd/clock v1.0.1 h1:QQod8+m3KgqHdvVMV+2DRNNZS1GRFir8mHZYA+Z2hFo= github.com/lightningnetwork/lnd/clock v1.0.1/go.mod h1:KnQudQ6w0IAMZi1SgvecLZQZ43ra2vpDNj7H/aasemg= -github.com/lightningnetwork/lnd/queue v1.0.1 h1:jzJKcTy3Nj5lQrooJ3aaw9Lau3I0IwvQR5sqtjdv2R0= github.com/lightningnetwork/lnd/queue v1.0.1/go.mod h1:vaQwexir73flPW43Mrm7JOgJHmcEFBWWSl9HlyASoms= github.com/lightningnetwork/lnd/ticker v1.0.0 h1:S1b60TEGoTtCe2A0yeB+ecoj/kkS4qpwh6l+AkQEZwU= github.com/lightningnetwork/lnd/ticker v1.0.0/go.mod h1:iaLXJiVgI1sPANIF2qYYUJXjoksPNvGNYowB8aRbpX0= diff --git a/log.go b/log.go index a441889..19b4184 100644 --- a/log.go +++ b/log.go @@ -18,7 +18,6 @@ import ( "github.com/btcsuite/btcwallet/wallet" "github.com/btcsuite/btcwallet/wtxmgr" "github.com/jrick/logrotate/rotator" - "github.com/lightninglabs/neutrino" ) // logWriter implements an io.Writer that outputs to both standard output and @@ -70,7 +69,6 @@ func init() { rpcclient.UseLogger(chainLog) rpcserver.UseLogger(grpcLog) legacyrpc.UseLogger(legacyRPCLog) - neutrino.UseLogger(btcnLog) } // subsystemLoggers maps each subsystem identifier to its associated logger. diff --git a/wallet/wallet.go b/wallet/wallet.go index 482e57c..bcfd2c5 100644 --- a/wallet/wallet.go +++ b/wallet/wallet.go @@ -193,14 +193,6 @@ func (w *Wallet) SynchronizeRPC(chainClient chain.Interface) { } w.chainClient = chainClient - // If the chain client is a NeutrinoClient instance, set a birthday so - // we don't download all the filters as we go. - switch cc := chainClient.(type) { - case *chain.NeutrinoClient: - cc.SetStartTime(w.Manager.Birthday()) - case *chain.BitcoindClient: - cc.SetBirthday(w.Manager.Birthday()) - } w.chainClientLock.Unlock() // TODO: It would be preferable to either run these goroutines @@ -364,21 +356,12 @@ func (w *Wallet) syncWithChain(birthdayStamp *waddrmgr.BlockStamp) error { return err } - // Neutrino relies on the information given to it by the cfheader server - // so it knows exactly whether it's synced up to the server's state or - // not, even on dev chains. To recover a Neutrino wallet, we need to - // make sure it's synced before we start scanning for addresses, - // otherwise we might miss some if we only scan up to its current sync - // point. - neutrinoRecovery := chainClient.BackEnd() == "neutrino" && - w.recoveryWindow > 0 - // We'll wait until the backend is synced to ensure we get the latest // MaxReorgDepth blocks to store. We don't do this for development // environments as we can't guarantee a lively chain, except for // Neutrino, where the cfheader server tells us what it believes the // chain tip is. - if !w.isDevEnv() || neutrinoRecovery { + if !w.isDevEnv() { log.Debug("Waiting for chain backend to sync to tip") if err := w.waitUntilBackendSynced(chainClient); err != nil { return err @@ -2285,18 +2268,6 @@ func (w *Wallet) GetTransactions(startBlock, endBlock *BlockIdentifier, return nil, err } start = startHeader.Height - case *chain.BitcoindClient: - var err error - start, err = client.GetBlockHeight(startBlock.hash) - if err != nil { - return nil, err - } - case *chain.NeutrinoClient: - var err error - start, err = client.GetBlockHeight(startBlock.hash) - if err != nil { - return nil, err - } } } } @@ -2316,18 +2287,6 @@ func (w *Wallet) GetTransactions(startBlock, endBlock *BlockIdentifier, return nil, err } end = endHeader.Height - case *chain.BitcoindClient: - var err error - start, err = client.GetBlockHeight(endBlock.hash) - if err != nil { - return nil, err - } - case *chain.NeutrinoClient: - var err error - end, err = client.GetBlockHeight(endBlock.hash) - if err != nil { - return nil, err - } } } }