Stop wallet and close wallet DB on interrupt.

This corrects and simplifies the shutdown logic for interrupts, the
walletrpc.WalletLoaderService/CloseWallet RPC, and the legacy stop RPC
by both stopping all wallet processes and closing the wallet database.
It appears that this behavior broke as part of the wallet package
refactor, causing occasional nil pointer panics and memory faults when
closing the wallet database with active transactions.

Fixes #282.

Fixes #283.
This commit is contained in:
Josh Rickmar 2016-03-10 19:35:35 -05:00
parent 24fc8bb6c5
commit 2b79aad79c
4 changed files with 54 additions and 29 deletions

View file

@ -16,7 +16,6 @@ import (
"github.com/btcsuite/btcwallet/chain" "github.com/btcsuite/btcwallet/chain"
"github.com/btcsuite/btcwallet/rpc/legacyrpc" "github.com/btcsuite/btcwallet/rpc/legacyrpc"
"github.com/btcsuite/btcwallet/wallet" "github.com/btcsuite/btcwallet/wallet"
"github.com/btcsuite/btcwallet/walletdb"
) )
var ( var (
@ -77,18 +76,8 @@ func walletMain() error {
go rpcClientConnectLoop(legacyRPCServer, loader) go rpcClientConnectLoop(legacyRPCServer, loader)
} }
var closeDB func() error loader.RunAfterLoad(func(w *wallet.Wallet) {
defer func() {
if closeDB != nil {
err := closeDB()
if err != nil {
log.Errorf("Unable to close wallet database: %v", err)
}
}
}()
loader.RunAfterLoad(func(w *wallet.Wallet, db walletdb.DB) {
startWalletRPCServices(w, rpcs, legacyRPCServer) startWalletRPCServices(w, rpcs, legacyRPCServer)
closeDB = db.Close
}) })
if !cfg.NoInitialLoad { if !cfg.NoInitialLoad {
@ -101,7 +90,15 @@ func walletMain() error {
} }
} }
// Shutdown the server(s) when interrupt signal is received. // Add interrupt handlers to shutdown the various process components
// before exiting. Interrupt handlers run in LIFO order, so the wallet
// (which should be closed last) is added first.
addInterruptHandler(func() {
err := loader.UnloadWallet()
if err != nil && err != wallet.ErrNotLoaded {
log.Errorf("Failed to close wallet: %v", err)
}
})
if rpcs != nil { if rpcs != nil {
addInterruptHandler(func() { addInterruptHandler(func() {
// TODO: Does this need to wait for the grpc server to // TODO: Does this need to wait for the grpc server to
@ -112,15 +109,15 @@ func walletMain() error {
}) })
} }
if legacyRPCServer != nil { if legacyRPCServer != nil {
go func() {
<-legacyRPCServer.RequestProcessShutdown()
simulateInterrupt()
}()
addInterruptHandler(func() { addInterruptHandler(func() {
log.Warn("Stopping legacy RPC server...") log.Warn("Stopping legacy RPC server...")
legacyRPCServer.Stop() legacyRPCServer.Stop()
log.Info("Legacy RPC server shutdown") log.Info("Legacy RPC server shutdown")
}) })
go func() {
<-legacyRPCServer.RequestProcessShutdown()
simulateInterrupt()
}()
} }
<-interruptHandlersDone <-interruptHandlersDone
@ -158,7 +155,7 @@ func rpcClientConnectLoop(legacyRPCServer *legacyrpc.Server, loader *wallet.Load
} }
} }
mu := new(sync.Mutex) mu := new(sync.Mutex)
loader.RunAfterLoad(func(w *wallet.Wallet, db walletdb.DB) { loader.RunAfterLoad(func(w *wallet.Wallet) {
mu.Lock() mu.Lock()
associate := associateRPCClient associate := associateRPCClient
mu.Unlock() mu.Unlock()

View file

@ -806,13 +806,13 @@ func (s *loaderServer) WalletExists(ctx context.Context, req *pb.WalletExistsReq
func (s *loaderServer) CloseWallet(ctx context.Context, req *pb.CloseWalletRequest) ( func (s *loaderServer) CloseWallet(ctx context.Context, req *pb.CloseWalletRequest) (
*pb.CloseWalletResponse, error) { *pb.CloseWalletResponse, error) {
loadedWallet, ok := s.loader.LoadedWallet() err := s.loader.UnloadWallet()
if !ok { if err == wallet.ErrNotLoaded {
return nil, grpc.Errorf(codes.FailedPrecondition, "wallet is not loaded") return nil, grpc.Errorf(codes.FailedPrecondition, "wallet is not loaded")
} }
if err != nil {
loadedWallet.Stop() return nil, translateError(err)
loadedWallet.WaitForShutdown() }
return &pb.CloseWalletResponse{}, nil return &pb.CloseWalletResponse{}, nil
} }

View file

@ -27,6 +27,10 @@ var (
// create a wallet when the loader has already done so. // create a wallet when the loader has already done so.
ErrLoaded = errors.New("wallet already loaded") ErrLoaded = errors.New("wallet already loaded")
// ErrNotLoaded describes the error condition of attempting to close a
// loaded wallet when a wallet has not been loaded.
ErrNotLoaded = errors.New("wallet is not loaded")
// ErrExists describes the error condition of attempting to create a new // ErrExists describes the error condition of attempting to create a new
// wallet when one exists already. // wallet when one exists already.
ErrExists = errors.New("wallet already exists") ErrExists = errors.New("wallet already exists")
@ -40,7 +44,7 @@ var (
// //
// Loader is safe for concurrent access. // Loader is safe for concurrent access.
type Loader struct { type Loader struct {
callbacks []func(*Wallet, walletdb.DB) callbacks []func(*Wallet)
chainParams *chaincfg.Params chainParams *chaincfg.Params
dbDirPath string dbDirPath string
wallet *Wallet wallet *Wallet
@ -60,7 +64,7 @@ func NewLoader(chainParams *chaincfg.Params, dbDirPath string) *Loader {
// additional wallets. Requires mutex to be locked. // additional wallets. Requires mutex to be locked.
func (l *Loader) onLoaded(w *Wallet, db walletdb.DB) { func (l *Loader) onLoaded(w *Wallet, db walletdb.DB) {
for _, fn := range l.callbacks { for _, fn := range l.callbacks {
fn(w, db) fn(w)
} }
l.wallet = w l.wallet = w
@ -71,13 +75,12 @@ func (l *Loader) onLoaded(w *Wallet, db walletdb.DB) {
// RunAfterLoad adds a function to be executed when the loader creates or opens // RunAfterLoad adds a function to be executed when the loader creates or opens
// a wallet. Functions are executed in a single goroutine in the order they are // a wallet. Functions are executed in a single goroutine in the order they are
// added. // added.
func (l *Loader) RunAfterLoad(fn func(*Wallet, walletdb.DB)) { func (l *Loader) RunAfterLoad(fn func(*Wallet)) {
l.mu.Lock() l.mu.Lock()
if l.wallet != nil { if l.wallet != nil {
w := l.wallet w := l.wallet
db := l.db
l.mu.Unlock() l.mu.Unlock()
fn(w, db) fn(w)
} else { } else {
l.callbacks = append(l.callbacks, fn) l.callbacks = append(l.callbacks, fn)
l.mu.Unlock() l.mu.Unlock()
@ -157,6 +160,7 @@ func (l *Loader) CreateNewWallet(pubPassphrase, privPassphrase, seed []byte) (*W
if err != nil { if err != nil {
return nil, err return nil, err
} }
w.Start()
l.onLoaded(w, db) l.onLoaded(w, db)
return w, nil return w, nil
@ -240,6 +244,30 @@ func (l *Loader) LoadedWallet() (*Wallet, bool) {
return w, w != nil return w, w != nil
} }
// UnloadWallet stops the loaded wallet, if any, and closes the wallet database.
// This returns ErrNotLoaded if the wallet has not been loaded with
// CreateNewWallet or LoadExistingWallet. The Loader may be reused if this
// function returns without error.
func (l *Loader) UnloadWallet() error {
defer l.mu.Unlock()
l.mu.Lock()
if l.wallet == nil {
return ErrNotLoaded
}
l.wallet.Stop()
l.wallet.WaitForShutdown()
err := l.db.Close()
if err != nil {
return err
}
l.wallet = nil
l.db = nil
return nil
}
func fileExists(filePath string) (bool, error) { func fileExists(filePath string) (bool, error) {
_, err := os.Stat(filePath) _, err := os.Stat(filePath)
if err != nil { if err != nil {

View file

@ -148,7 +148,7 @@ func createWallet(cfg *config) error {
// Import the addresses in the legacy keystore to the new wallet if // Import the addresses in the legacy keystore to the new wallet if
// any exist, locking each wallet again when finished. // any exist, locking each wallet again when finished.
loader.RunAfterLoad(func(w *wallet.Wallet, db walletdb.DB) { loader.RunAfterLoad(func(w *wallet.Wallet) {
defer legacyKeyStore.Lock() defer legacyKeyStore.Lock()
fmt.Println("Importing addresses from existing wallet...") fmt.Println("Importing addresses from existing wallet...")