lbcwallet/lbcwallet.go
2022-09-29 00:06:51 -07:00

224 lines
6 KiB
Go

// Copyright (c) 2013-2015 The btcsuite developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package main
import (
	"io/ioutil"
	"net"
	"net/http"
	_ "net/http/pprof"
	"os"
	"runtime"
	"sync"
	"time"

	"github.com/lbryio/lbcd/version"
	"github.com/lbryio/lbcwallet/chain"
	"github.com/lbryio/lbcwallet/rpc/legacyrpc"
	"github.com/lbryio/lbcwallet/wallet"
)
var (
// cfg is the package-level configuration. It is assigned once in walletMain
// from the value returned by loadConfig and read afterwards by the other
// functions in this file (for example readCAFile and startChainRPC).
cfg *config
)
// main is the process entry point. All real work happens in walletMain so
// that its deferred functions get a chance to run; os.Exit is only called
// here, after walletMain has returned.
func main() {
	// Let the scheduler use every available processor core.
	runtime.GOMAXPROCS(runtime.NumCPU())

	// A non-nil error from walletMain maps to exit status 1. os.Exit skips
	// deferred calls, which is why it must not be called any deeper.
	if walletMain() != nil {
		os.Exit(1)
	}
}
// walletMain is a work-around main function that is required since deferred
// functions (such as log flushing) are not called with calls to os.Exit.
// Instead, main runs this function and checks for a non-nil error, at which
// point any defers have already run, and if the error is non-nil, the program
// can be exited with an error exit status.
//
// It returns early with the error if the configuration cannot be loaded, the
// RPC servers cannot be started, or the existing wallet cannot be opened.
// Otherwise it blocks until a receive from interruptHandlersDone completes
// and then returns nil.
func walletMain() error {
// Load configuration and parse command line. This function also
// initializes logging and configures it accordingly.
// The second return value of loadConfig is not used here.
tcfg, _, err := loadConfig()
if err != nil {
return err
}
// Publish the loaded configuration through the package-level cfg.
cfg = tcfg
// Close the log rotator, if one was set up, when this function returns.
defer func() {
if logRotator != nil {
logRotator.Close()
}
}()
// Show version at startup.
log.Infof("Version %s", version.Full())
// Optionally serve profiling data over HTTP in a background goroutine.
// cfg.Profile is used as the port; the empty host passed to JoinHostPort
// yields an address of the form ":port".
if cfg.Profile != "" {
go func() {
listenAddr := net.JoinHostPort("", cfg.Profile)
log.Infof("Profile server listening on %s", listenAddr)
// Requests for "/" are redirected to the pprof index.
profileRedirect := http.RedirectHandler("/debug/pprof",
http.StatusSeeOther)
http.Handle("/", profileRedirect)
// A nil handler selects http.DefaultServeMux, which also carries the
// handlers registered by the blank net/http/pprof import.
// ListenAndServe only returns on failure, so its result is logged
// as an error.
log.Errorf("%v", http.ListenAndServe(listenAddr, nil))
}()
}
// Build the wallet loader for the database directory of the active network.
// NOTE(review): the meaning of the literal arguments true and 250 is defined
// by wallet.NewLoader and is not visible in this file — confirm there.
dbDir := networkDir(cfg.AppDataDir.Value, activeNet.Params)
loader := wallet.NewLoader(
activeNet.Params, dbDir, true, cfg.DBTimeout, 250,
)
// Create and start HTTP server to serve wallet client connections.
// This will be updated with the wallet and chain server RPC client
// created below after each is created.
legacyRPCServer, err := startRPCServers(loader)
if err != nil {
log.Errorf("Unable to create RPC servers: %v", err)
return err
}
// Connect to the consensus RPC server in the background.
go rpcClientConnectLoop(legacyRPCServer, loader)
// Register the callback before opening the wallet so that the wallet RPC
// services are started for the wallet opened below.
loader.RunAfterLoad(func(w *wallet.Wallet) {
startWalletRPCServices(w, legacyRPCServer)
})
// Only the error is needed here; the wallet itself reaches its consumers
// through the RunAfterLoad callbacks.
_, err = loader.OpenExistingWallet()
if err != nil {
log.Error(err)
return err
}
// Add interrupt handlers to shutdown the various process components
// before exiting. Interrupt handlers run in LIFO order, so the wallet
// (which should be closed last) is added first.
addInterruptHandler(func() {
err := loader.UnloadWallet()
// A wallet that was never loaded is not worth reporting.
if err != nil && err != wallet.ErrNotLoaded {
log.Errorf("Failed to close wallet: %v", err)
}
})
if legacyRPCServer != nil {
addInterruptHandler(func() {
log.Warn("Stopping legacy RPC server...")
legacyRPCServer.Stop()
log.Info("Legacy RPC server shutdown")
})
// Turn a shutdown request received from the legacy RPC server into a
// call to simulateInterrupt (presumably triggering the handlers
// registered above — confirm against its definition).
go func() {
<-legacyRPCServer.RequestProcessShutdown()
simulateInterrupt()
}()
}
// Block until the interrupt handlers have finished.
<-interruptHandlersDone
log.Info("Shutdown complete")
return nil
}
// rpcClientConnectLoop continuously attempts a connection to the consensus RPC
// server. When a connection is established, the client is used to sync the
// loaded wallet, either immediately or when loaded at a later time.
//
// The legacy RPC is optional. If set, the connected RPC client will be
// associated with the server for RPC passthrough and to enable additional
// methods.
//
// A failed connection attempt is retried after a fixed delay instead of
// immediately, so that a persistent failure does not degrade into a busy loop.
// The function only returns once a client has shut down while the loaded
// wallet reports that it is shutting down.
func rpcClientConnectLoop(legacyRPCServer *legacyrpc.Server, loader *wallet.Loader) {
	// retryDelay is the pause between two failed connection attempts.
	const retryDelay = 5 * time.Second

	// The CA certificates are read once and reused for every attempt.
	certs := readCAFile()

	for {
		var (
			chainClient chain.Interface
			err         error
		)
		chainClient, err = startChainRPC(certs)
		if err != nil {
			log.Errorf("Unable to open connection to consensus RPC server: %v", err)
			// Back off before retrying; without this a persistent
			// error would spin the loop and flood the log.
			time.Sleep(retryDelay)
			continue
		}

		// Rather than inlining this logic directly into the loader
		// callback, a function variable is used to avoid running any of
		// this after the client disconnects by setting it to nil. This
		// prevents the callback from associating a wallet loaded at a
		// later time with a client that has already disconnected. A
		// mutex is used to make this concurrent safe.
		associateRPCClient := func(w *wallet.Wallet) {
			w.SynchronizeRPC(chainClient)
			if legacyRPCServer != nil {
				legacyRPCServer.SetChainServer(chainClient)
			}
		}
		mu := new(sync.Mutex)
		loader.RunAfterLoad(func(w *wallet.Wallet) {
			// Take a snapshot under the lock, then call it outside
			// the lock so the association does not hold the mutex.
			mu.Lock()
			associate := associateRPCClient
			mu.Unlock()
			if associate != nil {
				associate(w)
			}
		})

		// Block for as long as this client is alive.
		chainClient.WaitForShutdown()

		// Disarm the callback registered above for this client.
		mu.Lock()
		associateRPCClient = nil
		mu.Unlock()

		loadedWallet, ok := loader.LoadedWallet()
		if ok {
			// Do not attempt a reconnect when the wallet was
			// explicitly stopped.
			if loadedWallet.ShuttingDown() {
				return
			}

			loadedWallet.SetChainSynced(false)

			// TODO: Rework the wallet so changing the RPC client
			// does not require stopping and restarting everything.
			loadedWallet.Stop()
			loadedWallet.WaitForShutdown()
			loadedWallet.Start()
		}
	}
}
// readCAFile returns the contents of the CA certificate file named by
// cfg.CAFile.Value for use with the chain server RPC client.
//
// It returns nil when client TLS is disabled in the configuration, and also
// when the file cannot be read; the read failure is logged as a warning and
// is not reported to the caller.
func readCAFile() []byte {
	// Nothing to read when TLS towards the chain server is switched off.
	if cfg.DisableClientTLS {
		log.Info("Chain server RPC TLS is disabled")
		return nil
	}

	pem, readErr := ioutil.ReadFile(cfg.CAFile.Value)
	if readErr != nil {
		// Carry on with nil certs; any partial data is discarded.
		log.Warnf("Cannot open CA file: %v", readErr)
		return nil
	}
	return pem
}
// startChainRPC creates an RPC client for blockchain services from the RPC
// options in the global config and starts it. certs is passed through to the
// client unchanged.
//
// There is no recovery if the server is unavailable or authentication fails;
// requests made through the client will simply error. If the client cannot be
// created, (nil, err) is returned. If starting it fails, the created client
// is returned together with the error from Start.
func startChainRPC(certs []byte) (*chain.RPCClient, error) {
	log.Infof("Attempting RPC client connection to %v", cfg.RPCConnect)

	// NOTE(review): the trailing 0 is interpreted by chain.NewRPCClient;
	// its meaning is not visible in this file — confirm against that
	// constructor.
	client, err := chain.NewRPCClient(
		activeNet.Params,
		cfg.RPCConnect,
		cfg.RPCUser,
		cfg.RPCPass,
		certs,
		cfg.DisableClientTLS,
		cfg.SkipVerify,
		0,
	)
	if err != nil {
		return nil, err
	}

	// The client is handed back even when Start reports an error.
	startErr := client.Start()
	return client, startErr
}