Finish integration of Neutrino; still untested.

This commit is contained in:
Alex 2017-05-23 19:55:57 -06:00 committed by Olaoluwa Osuntokun
parent 9e5250e6d7
commit b5873a5b2c
6 changed files with 132 additions and 54 deletions

View file

@ -10,12 +10,15 @@ import (
"net/http" "net/http"
_ "net/http/pprof" _ "net/http/pprof"
"os" "os"
"path/filepath"
"runtime" "runtime"
"sync" "sync"
"github.com/btcsuite/btcwallet/chain" "github.com/btcsuite/btcwallet/chain"
"github.com/btcsuite/btcwallet/rpc/legacyrpc" "github.com/btcsuite/btcwallet/rpc/legacyrpc"
"github.com/btcsuite/btcwallet/wallet" "github.com/btcsuite/btcwallet/wallet"
"github.com/btcsuite/btcwallet/walletdb"
"github.com/lightninglabs/neutrino"
) )
var ( var (
@ -140,14 +143,51 @@ func walletMain() error {
// associated with the server for RPC passthrough and to enable additional // associated with the server for RPC passthrough and to enable additional
// methods. // methods.
func rpcClientConnectLoop(legacyRPCServer *legacyrpc.Server, loader *wallet.Loader) { func rpcClientConnectLoop(legacyRPCServer *legacyrpc.Server, loader *wallet.Loader) {
certs := readCAFile() var certs []byte
if !cfg.UseSPV {
certs = readCAFile()
}
for { for {
chainClient, err := startChainRPC(certs) var (
chainClient chain.Interface
err error
)
if cfg.UseSPV {
var (
chainService *neutrino.ChainService
spvdb walletdb.DB
)
netDir := networkDir(cfg.AppDataDir.Value, activeNet.Params)
spvdb, err = walletdb.Create("bdb",
filepath.Join(netDir, "neutrino.db"))
defer spvdb.Close()
if err != nil {
log.Errorf("Unable to create Neutrino DB: %s", err)
continue
}
chainService, err = neutrino.NewChainService(
neutrino.Config{
DataDir: netDir,
Database: spvdb,
ChainParams: *activeNet.Params,
ConnectPeers: cfg.ConnectPeers,
AddPeers: cfg.AddPeers,
})
if err != nil {
log.Errorf("Couldn't create Neutrino ChainService: %s", err)
continue
}
chainService.Start()
chainClient = chain.NewSPVChain(chainService)
} else {
chainClient, err = startChainRPC(certs)
if err != nil { if err != nil {
log.Errorf("Unable to open connection to consensus RPC server: %v", err) log.Errorf("Unable to open connection to consensus RPC server: %v", err)
continue continue
} }
}
// Rather than inlining this logic directly into the loader // Rather than inlining this logic directly into the loader
// callback, a function variable is used to avoid running any of // callback, a function variable is used to avoid running any of

View file

@ -78,6 +78,7 @@ func (s *SPVChain) WaitForShutdown() {
func (s *SPVChain) GetBlock(hash *chainhash.Hash) (*wire.MsgBlock, error) { func (s *SPVChain) GetBlock(hash *chainhash.Hash) (*wire.MsgBlock, error) {
// TODO(roasbeef): add a block cache? // TODO(roasbeef): add a block cache?
// * which evication strategy? depends on use case // * which evication strategy? depends on use case
// Should the block cache be INSIDE neutrino instead of in btcwallet?
block, err := s.cs.GetBlockFromNetwork(*hash) block, err := s.cs.GetBlockFromNetwork(*hash)
if err != nil { if err != nil {
return nil, err return nil, err
@ -85,6 +86,18 @@ func (s *SPVChain) GetBlock(hash *chainhash.Hash) (*wire.MsgBlock, error) {
return block.MsgBlock(), nil return block.MsgBlock(), nil
} }
// GetBlockHeight gets the height of a block by its hash. It serves as a
// replacement for the use of GetBlockVerboseTxAsync for the wallet package
// since we can't actually return a FutureGetBlockVerboseResult because the
// underlying type is private to btcrpcclient.
func (s *SPVChain) GetBlockHeight(hash *chainhash.Hash) (int32, error) {
_, height, err := s.cs.GetBlockByHash(*hash)
if err != nil {
return 0, err
}
return int32(height), nil
}
// GetBestBlock replicates the RPC client's GetBestBlock command. // GetBestBlock replicates the RPC client's GetBestBlock command.
func (s *SPVChain) GetBestBlock() (*chainhash.Hash, int32, error) { func (s *SPVChain) GetBestBlock() (*chainhash.Hash, int32, error) {
header, height, err := s.cs.LatestBlock() header, height, err := s.cs.LatestBlock()

View file

@ -495,6 +495,17 @@ func loadConfig() (*config, []string, error) {
return nil, nil, err return nil, nil, err
} }
localhostListeners := map[string]struct{}{
"localhost": {},
"127.0.0.1": {},
"::1": {},
}
if cfg.UseSPV {
neutrino.MaxPeers = cfg.MaxPeers
neutrino.BanDuration = cfg.BanDuration
neutrino.BanThreshold = cfg.BanThreshold
} else {
if cfg.RPCConnect == "" { if cfg.RPCConnect == "" {
cfg.RPCConnect = net.JoinHostPort("localhost", activeNet.RPCClientPort) cfg.RPCConnect = net.JoinHostPort("localhost", activeNet.RPCClientPort)
} }
@ -508,11 +519,6 @@ func loadConfig() (*config, []string, error) {
return nil, nil, err return nil, nil, err
} }
localhostListeners := map[string]struct{}{
"localhost": {},
"127.0.0.1": {},
"::1": {},
}
RPCHost, _, err := net.SplitHostPort(cfg.RPCConnect) RPCHost, _, err := net.SplitHostPort(cfg.RPCConnect)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
@ -554,6 +560,7 @@ func loadConfig() (*config, []string, error) {
} }
} }
} }
}
// Only set default RPC listeners when there are no listeners set for // Only set default RPC listeners when there are no listeners set for
// the experimental RPC server. This is required to prevent the old RPC // the experimental RPC server. This is required to prevent the old RPC

2
glide.lock generated
View file

@ -97,7 +97,7 @@ imports:
- name: github.com/kkdai/bstream - name: github.com/kkdai/bstream
version: f391b8402d23024e7c0f624b31267a89998fca95 version: f391b8402d23024e7c0f624b31267a89998fca95
- name: github.com/lightninglabs/neutrino - name: github.com/lightninglabs/neutrino
version: 7306107b67bb4eea6f70bc598d28049ea00ac442 version: b96493137cf98477d84581f15c3c6ee60e9b9cec
repo: git@github.com:lightninglabs/neutrino repo: git@github.com:lightninglabs/neutrino
- name: golang.org/x/crypto - name: golang.org/x/crypto
version: 0fe963104e9d1877082f8fb38f816fcd97eb1d10 version: 0fe963104e9d1877082f8fb38f816fcd97eb1d10

View file

@ -258,7 +258,7 @@ func (s *Server) Stop() {
// functional bitcoin wallet RPC server. This can be called to enable RPC // functional bitcoin wallet RPC server. This can be called to enable RPC
// passthrough even before a loaded wallet is set, but the wallet's RPC client // passthrough even before a loaded wallet is set, but the wallet's RPC client
// is preferred. // is preferred.
func (s *Server) SetChainServer(chainClient *chain.RPCClient) { func (s *Server) SetChainServer(chainClient chain.Interface) {
s.handlerMu.Lock() s.handlerMu.Lock()
s.chainClient = chainClient s.chainClient = chainClient
s.handlerMu.Unlock() s.handlerMu.Unlock()

View file

@ -67,7 +67,7 @@ type Wallet struct {
Manager *waddrmgr.Manager Manager *waddrmgr.Manager
TxStore *wtxmgr.Store TxStore *wtxmgr.Store
chainClient *chain.RPCClient chainClient chain.Interface
chainClientLock sync.Mutex chainClientLock sync.Mutex
chainClientSynced bool chainClientSynced bool
chainClientSyncMtx sync.Mutex chainClientSyncMtx sync.Mutex
@ -139,7 +139,7 @@ func (w *Wallet) Start() {
// //
// This method is unstable and will be removed when all syncing logic is moved // This method is unstable and will be removed when all syncing logic is moved
// outside of the wallet package. // outside of the wallet package.
func (w *Wallet) SynchronizeRPC(chainClient *chain.RPCClient) { func (w *Wallet) SynchronizeRPC(chainClient chain.Interface) {
w.quitMu.Lock() w.quitMu.Lock()
select { select {
case <-w.quit: case <-w.quit:
@ -1349,7 +1349,16 @@ func (w *Wallet) GetTransactions(startBlock, endBlock *BlockIdentifier, cancel <
if chainClient == nil { if chainClient == nil {
return nil, errors.New("no chain server client") return nil, errors.New("no chain server client")
} }
startResp = chainClient.GetBlockVerboseTxAsync(startBlock.hash) switch client := chainClient.(type) {
case *chain.RPCClient:
startResp = client.GetBlockVerboseTxAsync(startBlock.hash)
case *chain.SPVChain:
var err error
start, err = client.GetBlockHeight(startBlock.hash)
if err != nil {
return nil, err
}
}
} }
} }
if endBlock != nil { if endBlock != nil {
@ -1359,7 +1368,16 @@ func (w *Wallet) GetTransactions(startBlock, endBlock *BlockIdentifier, cancel <
if chainClient == nil { if chainClient == nil {
return nil, errors.New("no chain server client") return nil, errors.New("no chain server client")
} }
endResp = chainClient.GetBlockVerboseTxAsync(endBlock.hash) switch client := chainClient.(type) {
case *chain.RPCClient:
endResp = client.GetBlockVerboseTxAsync(endBlock.hash)
case *chain.SPVChain:
var err error
end, err = client.GetBlockHeight(endBlock.hash)
if err != nil {
return nil, err
}
}
} }
} }
if startResp != nil { if startResp != nil {