Rescan after btcd reconnect.

Otherwise, if the websocket connection to btcd is lost and
reestablished, there is no sync to the current best block, and btcd
will not notify wallet of new relevant transactions.
This commit is contained in:
Josh Rickmar 2015-02-19 15:01:22 -05:00
parent 5ad37374fb
commit 0a7b10d051
5 changed files with 117 additions and 191 deletions

View file

@ -40,13 +40,6 @@ type Client struct {
dequeueNotification chan interface{} dequeueNotification chan interface{}
currentBlock chan *waddrmgr.BlockStamp currentBlock chan *waddrmgr.BlockStamp
// Notification channels regarding the state of the client. These exist
// so other components can listen in on chain activity. These are
// initialized as nil, and must be created by calling one of the Listen*
// methods.
connected chan bool
notificationLock sync.Locker
quit chan struct{} quit chan struct{}
wg sync.WaitGroup wg sync.WaitGroup
started bool started bool
@ -65,7 +58,6 @@ func NewClient(chainParams *chaincfg.Params, connect, user, pass string, certs [
enqueueNotification: make(chan interface{}), enqueueNotification: make(chan interface{}),
dequeueNotification: make(chan interface{}), dequeueNotification: make(chan interface{}),
currentBlock: make(chan *waddrmgr.BlockStamp), currentBlock: make(chan *waddrmgr.BlockStamp),
notificationLock: new(sync.Mutex),
quit: make(chan struct{}), quit: make(chan struct{}),
} }
ntfnCallbacks := btcrpcclient.NotificationHandlers{ ntfnCallbacks := btcrpcclient.NotificationHandlers{
@ -155,6 +147,10 @@ func (c *Client) WaitForShutdown() {
// btcrpcclient callbacks, which isn't very Go-like and doesn't allow // btcrpcclient callbacks, which isn't very Go-like and doesn't allow
// blocking client calls. // blocking client calls.
type ( type (
// ClientConnected is a notification for when a client connection is
// opened or reestablished to the chain server.
ClientConnected struct{}
// BlockConnected is a notification for a newly-attached block to the // BlockConnected is a notification for a newly-attached block to the
// best chain. // best chain.
BlockConnected waddrmgr.BlockStamp BlockConnected waddrmgr.BlockStamp
@ -234,7 +230,7 @@ func parseBlock(block *btcws.BlockDetails) (blk *txstore.Block, idx int, err err
func (c *Client) onClientConnect() { func (c *Client) onClientConnect() {
log.Info("Established websocket RPC connection to btcd") log.Info("Established websocket RPC connection to btcd")
c.notifyConnected(true) c.enqueueNotification <- ClientConnected{}
} }
func (c *Client) onBlockConnected(hash *wire.ShaHash, height int32) { func (c *Client) onBlockConnected(hash *wire.ShaHash, height int32) {
@ -354,49 +350,3 @@ out:
close(c.dequeueNotification) close(c.dequeueNotification)
c.wg.Done() c.wg.Done()
} }
// ErrDuplicateListen is returned for any attempts to listen for the same
// notification more than once. If callers must pass along a notifiation to
// multiple places, they must broadcast it themself.
var ErrDuplicateListen = errors.New("duplicate listen")
type noopLocker struct{}
func (noopLocker) Lock() {}
func (noopLocker) Unlock() {}
// ListenConnected returns a channel that passes the current connection state
// of the client. This will be automatically sent to when the client is first
// connected, as well as the current state whenever NotifyConnected is
// forcibly called.
//
// If this is called twice, ErrDuplicateListen is returned.
func (c *Client) ListenConnected() (<-chan bool, error) {
c.notificationLock.Lock()
defer c.notificationLock.Unlock()
if c.connected != nil {
return nil, ErrDuplicateListen
}
c.connected = make(chan bool)
c.notificationLock = noopLocker{}
return c.connected, nil
}
func (c *Client) notifyConnected(connected bool) {
c.notificationLock.Lock()
if c.connected != nil {
c.connected <- connected
}
c.notificationLock.Unlock()
}
// NotifyConnected sends the channel notification for a connected or
// disconnected client. This is exported so it can be called by other
// packages which require notifying the current connection state.
//
// TODO: This shouldn't exist, but the current notification API requires it.
func (c *Client) NotifyConnected() {
connected := !c.Client.Disconnected()
c.notifyConnected(connected)
}

View file

@ -25,9 +25,22 @@ import (
) )
func (w *Wallet) handleChainNotifications() { func (w *Wallet) handleChainNotifications() {
sync := func(w *Wallet) {
// At the moment there is no recourse if the rescan fails for
// some reason, however, the wallet will not be marked synced
// and many methods will error early since the wallet is known
// to be out of date.
err := w.syncWithChain()
if err != nil && !w.ShuttingDown() {
log.Warnf("Unable to synchronize wallet to chain: %v", err)
}
}
for n := range w.chainSvr.Notifications() { for n := range w.chainSvr.Notifications() {
var err error var err error
switch n := n.(type) { switch n := n.(type) {
case chain.ClientConnected:
go sync(w)
case chain.BlockConnected: case chain.BlockConnected:
w.connectBlock(waddrmgr.BlockStamp(n)) w.connectBlock(waddrmgr.BlockStamp(n))
case chain.BlockDisconnected: case chain.BlockDisconnected:
@ -59,7 +72,7 @@ func (w *Wallet) connectBlock(bs waddrmgr.BlockStamp) {
} }
if err := w.Manager.SetSyncedTo(&bs); err != nil { if err := w.Manager.SetSyncedTo(&bs); err != nil {
log.Errorf("failed to update address manager sync state in "+ log.Errorf("Failed to update address manager sync state in "+
"connect block for hash %v (height %d): %v", bs.Hash, "connect block for hash %v (height %d): %v", bs.Hash,
bs.Height, err) bs.Height, err)
} }

View file

@ -20,6 +20,7 @@ import (
"github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil" "github.com/btcsuite/btcutil"
"github.com/btcsuite/btcwallet/chain" "github.com/btcsuite/btcwallet/chain"
"github.com/btcsuite/btcwallet/txstore"
"github.com/btcsuite/btcwallet/waddrmgr" "github.com/btcsuite/btcwallet/waddrmgr"
) )
@ -33,9 +34,8 @@ type RescanProgressMsg struct {
// RescanFinishedMsg reports the addresses that were rescanned when a // RescanFinishedMsg reports the addresses that were rescanned when a
// rescanfinished message was received rescanning a batch of addresses. // rescanfinished message was received rescanning a batch of addresses.
type RescanFinishedMsg struct { type RescanFinishedMsg struct {
Addresses []btcutil.Address Addresses []btcutil.Address
Notification *chain.RescanFinished Notification *chain.RescanFinished
WasInitialSync bool
} }
// RescanJob is a job to be processed by the RescanManager. The job includes // RescanJob is a job to be processed by the RescanManager. The job includes
@ -147,9 +147,8 @@ out:
continue continue
} }
w.rescanFinished <- &RescanFinishedMsg{ w.rescanFinished <- &RescanFinishedMsg{
Addresses: curBatch.addrs, Addresses: curBatch.addrs,
Notification: n, Notification: n,
WasInitialSync: curBatch.initialSync,
} }
curBatch, nextBatch = nextBatch, nil curBatch, nextBatch = nextBatch, nil
@ -200,30 +199,25 @@ out:
n := msg.Notification n := msg.Notification
addrs := msg.Addresses addrs := msg.Addresses
noun := pickNoun(len(addrs), "address", "addresses") noun := pickNoun(len(addrs), "address", "addresses")
if msg.WasInitialSync {
w.ResendUnminedTxs()
bs := waddrmgr.BlockStamp{
Hash: *n.Hash,
Height: n.Height,
}
err := w.Manager.SetSyncedTo(&bs)
if err != nil {
log.Errorf("Failed to update address "+
"manager sync state for hash "+
"%v (height %d): %v", n.Hash,
n.Height, err)
}
w.notifyConnectedBlock(bs)
// Mark wallet as synced to chain so connected
// and disconnected block notifications are
// processed.
close(w.chainSynced)
}
log.Infof("Finished rescan for %d %s (synced to block "+ log.Infof("Finished rescan for %d %s (synced to block "+
"%s, height %d)", len(addrs), noun, n.Hash, "%s, height %d)", len(addrs), noun, n.Hash,
n.Height) n.Height)
bs := waddrmgr.BlockStamp{n.Height, *n.Hash}
if err := w.Manager.SetSyncedTo(&bs); err != nil {
log.Errorf("Failed to update address manager "+
"sync state for hash %v (height %d): %v",
n.Hash, n.Height, err)
}
w.SetChainSynced(true)
go w.ResendUnminedTxs()
// TODO(jrick): The current websocket API requires
// notifying the block the rescan synced through to
// every connected client. This is code smell and
// should be removed or replaced with a more
// appropiate notification when the API is redone.
w.notifyConnectedBlock(bs)
case <-w.quit: case <-w.quit:
break out break out
@ -254,27 +248,18 @@ func (w *Wallet) rescanRPCHandler() {
w.wg.Done() w.wg.Done()
} }
// RescanActiveAddresses begins a rescan for all active addresses of a wallet. // Rescan begins a rescan for all active addresses and unspent outputs of
// This is intended to be used to sync a wallet back up to the current best // a wallet. This is intended to be used to sync a wallet back up to the
// block in the main chain, and is considered an initial sync rescan. // current best block in the main chain, and is considered an initial sync
func (w *Wallet) RescanActiveAddresses() error { // rescan.
addrs, err := w.Manager.AllActiveAddresses() func (w *Wallet) Rescan(addrs []btcutil.Address, unspent []txstore.Credit) error {
if err != nil { // Avoid rescan if there is no work to do.
return err if len(addrs) == 0 && len(unspent) == 0 {
}
// in case there are no addresses, we can skip queuing the rescan job
if len(addrs) == 0 {
close(w.chainSynced)
return nil return nil
} }
unspents, err := w.TxStore.UnspentOutputs() outpoints := make([]*wire.OutPoint, len(unspent))
if err != nil { for i, output := range unspent {
return err
}
outpoints := make([]*wire.OutPoint, len(unspents))
for i, output := range unspents {
outpoints[i] = output.OutPoint() outpoints[i] = output.OutPoint()
} }

View file

@ -290,18 +290,17 @@ type rpcServer struct {
// Channels read from other components from which notifications are // Channels read from other components from which notifications are
// created. // created.
connectedBlocks <-chan waddrmgr.BlockStamp connectedBlocks <-chan waddrmgr.BlockStamp
disconnectedBlocks <-chan waddrmgr.BlockStamp disconnectedBlocks <-chan waddrmgr.BlockStamp
newCredits <-chan txstore.Credit newCredits <-chan txstore.Credit
newDebits <-chan txstore.Debits newDebits <-chan txstore.Debits
minedCredits <-chan txstore.Credit minedCredits <-chan txstore.Credit
minedDebits <-chan txstore.Debits minedDebits <-chan txstore.Debits
managerLocked <-chan bool managerLocked <-chan bool
confirmedBalance <-chan btcutil.Amount confirmedBalance <-chan btcutil.Amount
unconfirmedBalance <-chan btcutil.Amount unconfirmedBalance <-chan btcutil.Amount
chainServerConnected <-chan bool //chainServerConnected <-chan bool
registerWalletNtfns chan struct{} registerWalletNtfns chan struct{}
registerChainSvrNtfns chan struct{}
// enqueueNotification and dequeueNotification handle both sides of an // enqueueNotification and dequeueNotification handle both sides of an
// infinitly growing queue for websocket client notifications. // infinitly growing queue for websocket client notifications.
@ -336,7 +335,6 @@ func newRPCServer(listenAddrs []string, maxPost, maxWebsockets int64) (*rpcServe
registerWSC: make(chan *websocketClient), registerWSC: make(chan *websocketClient),
unregisterWSC: make(chan *websocketClient), unregisterWSC: make(chan *websocketClient),
registerWalletNtfns: make(chan struct{}), registerWalletNtfns: make(chan struct{}),
registerChainSvrNtfns: make(chan struct{}),
enqueueNotification: make(chan wsClientNotification), enqueueNotification: make(chan wsClientNotification),
dequeueNotification: make(chan wsClientNotification), dequeueNotification: make(chan wsClientNotification),
notificationHandlerQuit: make(chan struct{}), notificationHandlerQuit: make(chan struct{}),
@ -565,10 +563,6 @@ func (s *rpcServer) SetWallet(wallet *Wallet) {
// With both the wallet and chain server set, all handlers are // With both the wallet and chain server set, all handlers are
// ok to run. // ok to run.
s.handlerLookup = lookupAnyHandler s.handlerLookup = lookupAnyHandler
// Make sure already connected websocket clients get a notification
// if the chain RPC client connection is set and connected.
s.chainSvr.NotifyConnected()
} }
} }
@ -582,7 +576,6 @@ func (s *rpcServer) SetChainServer(chainSvr *chain.Client) {
defer s.handlerLock.Unlock() defer s.handlerLock.Unlock()
s.chainSvr = chainSvr s.chainSvr = chainSvr
s.registerChainSvrNtfns <- struct{}{}
if s.wallet != nil { if s.wallet != nil {
// If the wallet had already been set, there's no reason to keep // If the wallet had already been set, there's no reason to keep
@ -941,13 +934,6 @@ func (s *rpcServer) WebsocketClientRPC(wsc *websocketClient) {
return return
} }
// TODO(jrick): this is crappy. kill it.
s.handlerLock.Lock()
if s.chainSvr != nil {
s.chainSvr.NotifyConnected()
}
s.handlerLock.Unlock()
// WebsocketClientRead is intentionally not run with the waitgroup // WebsocketClientRead is intentionally not run with the waitgroup
// so it is ignored during shutdown. This is to prevent a hang during // so it is ignored during shutdown. This is to prevent a hang during
// shutdown where the goroutine is blocked on a read of the // shutdown where the goroutine is blocked on a read of the
@ -1127,8 +1113,6 @@ out:
s.enqueueNotification <- confirmedBalance(n) s.enqueueNotification <- confirmedBalance(n)
case n := <-s.unconfirmedBalance: case n := <-s.unconfirmedBalance:
s.enqueueNotification <- unconfirmedBalance(n) s.enqueueNotification <- unconfirmedBalance(n)
case n := <-s.chainServerConnected:
s.enqueueNotification <- btcdConnected(n)
// Registration of all notifications is done by the handler so // Registration of all notifications is done by the handler so
// it doesn't require another rpcServer mutex. // it doesn't require another rpcServer mutex.
@ -1199,24 +1183,6 @@ out:
s.confirmedBalance = confirmedBalance s.confirmedBalance = confirmedBalance
s.unconfirmedBalance = unconfirmedBalance s.unconfirmedBalance = unconfirmedBalance
case <-s.registerChainSvrNtfns:
chainServerConnected, err := s.chainSvr.ListenConnected()
if err != nil {
log.Errorf("Could not register for chain server "+
"connection changes: %v", err)
continue
}
s.chainServerConnected = chainServerConnected
// Make sure already connected websocket clients get a
// notification for the current client connection state.
//
// TODO(jrick): I am appalled by doing this but trying
// not to change how notifications work for the moment.
// A revamped notification API without this horror will
// be implemented soon.
go s.chainSvr.NotifyConnected()
case <-s.quit: case <-s.quit:
break out break out
} }
@ -1237,9 +1203,7 @@ func (s *rpcServer) drainNotifications() {
case <-s.minedDebits: case <-s.minedDebits:
case <-s.confirmedBalance: case <-s.confirmedBalance:
case <-s.unconfirmedBalance: case <-s.unconfirmedBalance:
case <-s.chainServerConnected:
case <-s.registerWalletNtfns: case <-s.registerWalletNtfns:
case <-s.registerChainSvrNtfns:
} }
} }
} }

100
wallet.go
View file

@ -84,9 +84,10 @@ type Wallet struct {
Manager *waddrmgr.Manager Manager *waddrmgr.Manager
TxStore *txstore.Store TxStore *txstore.Store
chainSvr *chain.Client chainSvr *chain.Client
chainSvrLock sync.Locker chainSvrLock sync.Locker
chainSynced chan struct{} // closed when synced chainSvrSynced bool
chainSvrSyncMtx sync.Mutex
lockedOutpoints map[wire.OutPoint]struct{} lockedOutpoints map[wire.OutPoint]struct{}
FeeIncrement btcutil.Amount FeeIncrement btcutil.Amount
@ -131,7 +132,6 @@ func newWallet(mgr *waddrmgr.Manager, txs *txstore.Store) *Wallet {
Manager: mgr, Manager: mgr,
TxStore: txs, TxStore: txs,
chainSvrLock: new(sync.Mutex), chainSvrLock: new(sync.Mutex),
chainSynced: make(chan struct{}),
lockedOutpoints: map[wire.OutPoint]struct{}{}, lockedOutpoints: map[wire.OutPoint]struct{}{},
FeeIncrement: defaultFeeIncrement, FeeIncrement: defaultFeeIncrement,
rescanAddJob: make(chan *RescanJob), rescanAddJob: make(chan *RescanJob),
@ -319,13 +319,6 @@ func (w *Wallet) Start(chainServer *chain.Client) {
go w.rescanBatchHandler() go w.rescanBatchHandler()
go w.rescanProgressHandler() go w.rescanProgressHandler()
go w.rescanRPCHandler() go w.rescanRPCHandler()
go func() {
err := w.syncWithChain()
if err != nil && !w.ShuttingDown() {
log.Warnf("Unable to synchronize wallet to chain: %v", err)
}
}()
} }
// Stop signals all wallet goroutines to shutdown. // Stop signals all wallet goroutines to shutdown.
@ -366,40 +359,61 @@ func (w *Wallet) WaitForShutdown() {
// ChainSynced returns whether the wallet has been attached to a chain server // ChainSynced returns whether the wallet has been attached to a chain server
// and synced up to the best block on the main chain. // and synced up to the best block on the main chain.
func (w *Wallet) ChainSynced() bool { func (w *Wallet) ChainSynced() bool {
select { w.chainSvrSyncMtx.Lock()
case <-w.chainSynced: synced := w.chainSvrSynced
return true w.chainSvrSyncMtx.Unlock()
default: return synced
return false }
// SetChainSynced marks whether the wallet is connected to and currently in sync
// with the latest block notified by the chain server.
//
// NOTE: Due to an API limitation with btcrpcclient, this may return true after
// the client disconnected (and is attempting a reconnect). This will be unknown
// until the reconnect notification is received, at which point the wallet can be
// marked out of sync again until after the next rescan completes.
func (w *Wallet) SetChainSynced(synced bool) {
w.chainSvrSyncMtx.Lock()
w.chainSvrSynced = synced
w.chainSvrSyncMtx.Unlock()
}
// activeData returns the currently-active receiving addresses and all unspent
// outputs. This is primarely intended to provide the parameters for a
// rescan request.
func (w *Wallet) activeData() ([]btcutil.Address, []txstore.Credit, error) {
addrs, err := w.Manager.AllActiveAddresses()
if err != nil {
return nil, nil, err
} }
unspent, err := w.TxStore.UnspentOutputs()
return addrs, unspent, err
} }
// WaitForChainSync blocks until a wallet has been synced with the main chain // syncWithChain brings the wallet up to date with the current chain server
// of an attached chain server. // connection. It creates a rescan request and blocks until the rescan has
func (w *Wallet) WaitForChainSync() { // finished.
<-w.chainSynced //
} func (w *Wallet) syncWithChain() error {
// Request notifications for connected and disconnected blocks.
// SyncedChainTip returns the hash and height of the block of the most //
// recently seen block in the main chain. It returns errors if the // TODO(jrick): Either request this notification only once, or when
// wallet has not yet been marked as synched with the chain. // btcrpcclient is modified to allow some notification request to not
func (w *Wallet) SyncedChainTip() (*waddrmgr.BlockStamp, error) { // automatically resent on reconnect, include the notifyblocks request
select { // as well. I am leaning towards allowing off all btcrpcclient
case <-w.chainSynced: // notification re-registrations, in which case the code here should be
return w.chainSvr.BlockStamp() // left as is.
default: err := w.chainSvr.NotifyBlocks()
return nil, ErrNotSynced if err != nil {
return err
} }
}
func (w *Wallet) syncWithChain() (err error) { // Request notifications for transactions sending to all wallet
defer func() { // addresses.
if err == nil { addrs, unspent, err := w.activeData()
// Request notifications for connected and disconnected if err != nil {
// blocks. return err
err = w.chainSvr.NotifyBlocks() }
}
}()
// TODO(jrick): How should this handle a synced height earlier than // TODO(jrick): How should this handle a synced height earlier than
// the chain server best block? // the chain server best block?
@ -411,8 +425,8 @@ func (w *Wallet) syncWithChain() (err error) {
bs := iter.BlockStamp() bs := iter.BlockStamp()
log.Debugf("Checking for previous saved block with height %v hash %v", log.Debugf("Checking for previous saved block with height %v hash %v",
bs.Height, bs.Hash) bs.Height, bs.Hash)
_, err = w.chainSvr.GetBlock(&bs.Hash)
if _, err := w.chainSvr.GetBlock(&bs.Hash); err != nil { if err != nil {
continue continue
} }
@ -433,7 +447,7 @@ func (w *Wallet) syncWithChain() (err error) {
break break
} }
return w.RescanActiveAddresses() return w.Rescan(addrs, unspent)
} }
type ( type (