diff --git a/chain/chain.go b/chain/chain.go index 31a5560..7ad8863 100644 --- a/chain/chain.go +++ b/chain/chain.go @@ -40,13 +40,6 @@ type Client struct { dequeueNotification chan interface{} currentBlock chan *waddrmgr.BlockStamp - // Notification channels regarding the state of the client. These exist - // so other components can listen in on chain activity. These are - // initialized as nil, and must be created by calling one of the Listen* - // methods. - connected chan bool - notificationLock sync.Locker - quit chan struct{} wg sync.WaitGroup started bool @@ -65,7 +58,6 @@ func NewClient(chainParams *chaincfg.Params, connect, user, pass string, certs [ enqueueNotification: make(chan interface{}), dequeueNotification: make(chan interface{}), currentBlock: make(chan *waddrmgr.BlockStamp), - notificationLock: new(sync.Mutex), quit: make(chan struct{}), } ntfnCallbacks := btcrpcclient.NotificationHandlers{ @@ -155,6 +147,10 @@ func (c *Client) WaitForShutdown() { // btcrpcclient callbacks, which isn't very Go-like and doesn't allow // blocking client calls. type ( + // ClientConnected is a notification for when a client connection is + // opened or reestablished to the chain server. + ClientConnected struct{} + // BlockConnected is a notification for a newly-attached block to the // best chain. BlockConnected waddrmgr.BlockStamp @@ -234,7 +230,7 @@ func parseBlock(block *btcws.BlockDetails) (blk *txstore.Block, idx int, err err func (c *Client) onClientConnect() { log.Info("Established websocket RPC connection to btcd") - c.notifyConnected(true) + c.enqueueNotification <- ClientConnected{} } func (c *Client) onBlockConnected(hash *wire.ShaHash, height int32) { @@ -354,49 +350,3 @@ out: close(c.dequeueNotification) c.wg.Done() } - -// ErrDuplicateListen is returned for any attempts to listen for the same -// notification more than once. If callers must pass along a notifiation to -// multiple places, they must broadcast it themself. -var ErrDuplicateListen = errors.New("duplicate listen") - -type noopLocker struct{} - -func (noopLocker) Lock() {} -func (noopLocker) Unlock() {} - -// ListenConnected returns a channel that passes the current connection state -// of the client. This will be automatically sent to when the client is first -// connected, as well as the current state whenever NotifyConnected is -// forcibly called. -// -// If this is called twice, ErrDuplicateListen is returned. -func (c *Client) ListenConnected() (<-chan bool, error) { - c.notificationLock.Lock() - defer c.notificationLock.Unlock() - - if c.connected != nil { - return nil, ErrDuplicateListen - } - c.connected = make(chan bool) - c.notificationLock = noopLocker{} - return c.connected, nil -} - -func (c *Client) notifyConnected(connected bool) { - c.notificationLock.Lock() - if c.connected != nil { - c.connected <- connected - } - c.notificationLock.Unlock() -} - -// NotifyConnected sends the channel notification for a connected or -// disconnected client. This is exported so it can be called by other -// packages which require notifying the current connection state. -// -// TODO: This shouldn't exist, but the current notification API requires it. -func (c *Client) NotifyConnected() { - connected := !c.Client.Disconnected() - c.notifyConnected(connected) -} diff --git a/chainntfns.go b/chainntfns.go index cbe4006..db91956 100644 --- a/chainntfns.go +++ b/chainntfns.go @@ -25,9 +25,22 @@ import ( ) func (w *Wallet) handleChainNotifications() { + sync := func(w *Wallet) { + // At the moment there is no recourse if the rescan fails for + // some reason, however, the wallet will not be marked synced + // and many methods will error early since the wallet is known + // to be out of date. + err := w.syncWithChain() + if err != nil && !w.ShuttingDown() { + log.Warnf("Unable to synchronize wallet to chain: %v", err) + } + } + for n := range w.chainSvr.Notifications() { var err error switch n := n.(type) { + case chain.ClientConnected: + go sync(w) case chain.BlockConnected: w.connectBlock(waddrmgr.BlockStamp(n)) case chain.BlockDisconnected: @@ -59,7 +72,7 @@ func (w *Wallet) connectBlock(bs waddrmgr.BlockStamp) { } if err := w.Manager.SetSyncedTo(&bs); err != nil { - log.Errorf("failed to update address manager sync state in "+ + log.Errorf("Failed to update address manager sync state in "+ "connect block for hash %v (height %d): %v", bs.Hash, bs.Height, err) } diff --git a/rescan.go b/rescan.go index c3213f1..b9b4534 100644 --- a/rescan.go +++ b/rescan.go @@ -20,6 +20,7 @@ import ( "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcutil" "github.com/btcsuite/btcwallet/chain" + "github.com/btcsuite/btcwallet/txstore" "github.com/btcsuite/btcwallet/waddrmgr" ) @@ -33,9 +34,8 @@ type RescanProgressMsg struct { // RescanFinishedMsg reports the addresses that were rescanned when a // rescanfinished message was received rescanning a batch of addresses. type RescanFinishedMsg struct { - Addresses []btcutil.Address - Notification *chain.RescanFinished - WasInitialSync bool + Addresses []btcutil.Address + Notification *chain.RescanFinished } // RescanJob is a job to be processed by the RescanManager. The job includes @@ -147,9 +147,8 @@ out: continue } w.rescanFinished <- &RescanFinishedMsg{ - Addresses: curBatch.addrs, - Notification: n, - WasInitialSync: curBatch.initialSync, + Addresses: curBatch.addrs, + Notification: n, } curBatch, nextBatch = nextBatch, nil @@ -200,30 +199,25 @@ out: n := msg.Notification addrs := msg.Addresses noun := pickNoun(len(addrs), "address", "addresses") - if msg.WasInitialSync { - w.ResendUnminedTxs() - - bs := waddrmgr.BlockStamp{ - Hash: *n.Hash, - Height: n.Height, - } - err := w.Manager.SetSyncedTo(&bs) - if err != nil { - log.Errorf("Failed to update address "+ - "manager sync state for hash "+ - "%v (height %d): %v", n.Hash, - n.Height, err) - } - w.notifyConnectedBlock(bs) - - // Mark wallet as synced to chain so connected - // and disconnected block notifications are - // processed. - close(w.chainSynced) - } log.Infof("Finished rescan for %d %s (synced to block "+ "%s, height %d)", len(addrs), noun, n.Hash, n.Height) + bs := waddrmgr.BlockStamp{n.Height, *n.Hash} + if err := w.Manager.SetSyncedTo(&bs); err != nil { + log.Errorf("Failed to update address manager "+ + "sync state for hash %v (height %d): %v", + n.Hash, n.Height, err) + } + w.SetChainSynced(true) + + go w.ResendUnminedTxs() + + // TODO(jrick): The current websocket API requires + // notifying the block the rescan synced through to + // every connected client. This is code smell and + // should be removed or replaced with a more + // appropiate notification when the API is redone. + w.notifyConnectedBlock(bs) case <-w.quit: break out @@ -254,27 +248,18 @@ func (w *Wallet) rescanRPCHandler() { w.wg.Done() } -// RescanActiveAddresses begins a rescan for all active addresses of a wallet. -// This is intended to be used to sync a wallet back up to the current best -// block in the main chain, and is considered an initial sync rescan. -func (w *Wallet) RescanActiveAddresses() error { - addrs, err := w.Manager.AllActiveAddresses() - if err != nil { - return err - } - - // in case there are no addresses, we can skip queuing the rescan job - if len(addrs) == 0 { - close(w.chainSynced) +// Rescan begins a rescan for all active addresses and unspent outputs of +// a wallet. This is intended to be used to sync a wallet back up to the +// current best block in the main chain, and is considered an initial sync +// rescan. +func (w *Wallet) Rescan(addrs []btcutil.Address, unspent []txstore.Credit) error { + // Avoid rescan if there is no work to do. + if len(addrs) == 0 && len(unspent) == 0 { return nil } - unspents, err := w.TxStore.UnspentOutputs() - if err != nil { - return err - } - outpoints := make([]*wire.OutPoint, len(unspents)) - for i, output := range unspents { + outpoints := make([]*wire.OutPoint, len(unspent)) + for i, output := range unspent { outpoints[i] = output.OutPoint() } diff --git a/rpcserver.go b/rpcserver.go index 7365f56..a21d2ae 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -290,18 +290,17 @@ type rpcServer struct { // Channels read from other components from which notifications are // created. - connectedBlocks <-chan waddrmgr.BlockStamp - disconnectedBlocks <-chan waddrmgr.BlockStamp - newCredits <-chan txstore.Credit - newDebits <-chan txstore.Debits - minedCredits <-chan txstore.Credit - minedDebits <-chan txstore.Debits - managerLocked <-chan bool - confirmedBalance <-chan btcutil.Amount - unconfirmedBalance <-chan btcutil.Amount - chainServerConnected <-chan bool - registerWalletNtfns chan struct{} - registerChainSvrNtfns chan struct{} + connectedBlocks <-chan waddrmgr.BlockStamp + disconnectedBlocks <-chan waddrmgr.BlockStamp + newCredits <-chan txstore.Credit + newDebits <-chan txstore.Debits + minedCredits <-chan txstore.Credit + minedDebits <-chan txstore.Debits + managerLocked <-chan bool + confirmedBalance <-chan btcutil.Amount + unconfirmedBalance <-chan btcutil.Amount + //chainServerConnected <-chan bool + registerWalletNtfns chan struct{} // enqueueNotification and dequeueNotification handle both sides of an // infinitly growing queue for websocket client notifications. @@ -336,7 +335,6 @@ func newRPCServer(listenAddrs []string, maxPost, maxWebsockets int64) (*rpcServe registerWSC: make(chan *websocketClient), unregisterWSC: make(chan *websocketClient), registerWalletNtfns: make(chan struct{}), - registerChainSvrNtfns: make(chan struct{}), enqueueNotification: make(chan wsClientNotification), dequeueNotification: make(chan wsClientNotification), notificationHandlerQuit: make(chan struct{}), @@ -565,10 +563,6 @@ func (s *rpcServer) SetWallet(wallet *Wallet) { // With both the wallet and chain server set, all handlers are // ok to run. s.handlerLookup = lookupAnyHandler - - // Make sure already connected websocket clients get a notification - // if the chain RPC client connection is set and connected. - s.chainSvr.NotifyConnected() } } @@ -582,7 +576,6 @@ func (s *rpcServer) SetChainServer(chainSvr *chain.Client) { defer s.handlerLock.Unlock() s.chainSvr = chainSvr - s.registerChainSvrNtfns <- struct{}{} if s.wallet != nil { // If the wallet had already been set, there's no reason to keep @@ -941,13 +934,6 @@ func (s *rpcServer) WebsocketClientRPC(wsc *websocketClient) { return } - // TODO(jrick): this is crappy. kill it. - s.handlerLock.Lock() - if s.chainSvr != nil { - s.chainSvr.NotifyConnected() - } - s.handlerLock.Unlock() - // WebsocketClientRead is intentionally not run with the waitgroup // so it is ignored during shutdown. This is to prevent a hang during // shutdown where the goroutine is blocked on a read of the @@ -1127,8 +1113,6 @@ out: s.enqueueNotification <- confirmedBalance(n) case n := <-s.unconfirmedBalance: s.enqueueNotification <- unconfirmedBalance(n) - case n := <-s.chainServerConnected: - s.enqueueNotification <- btcdConnected(n) // Registration of all notifications is done by the handler so // it doesn't require another rpcServer mutex. @@ -1199,24 +1183,6 @@ out: s.confirmedBalance = confirmedBalance s.unconfirmedBalance = unconfirmedBalance - case <-s.registerChainSvrNtfns: - chainServerConnected, err := s.chainSvr.ListenConnected() - if err != nil { - log.Errorf("Could not register for chain server "+ - "connection changes: %v", err) - continue - } - s.chainServerConnected = chainServerConnected - - // Make sure already connected websocket clients get a - // notification for the current client connection state. - // - // TODO(jrick): I am appalled by doing this but trying - // not to change how notifications work for the moment. - // A revamped notification API without this horror will - // be implemented soon. - go s.chainSvr.NotifyConnected() - case <-s.quit: break out } @@ -1237,9 +1203,7 @@ func (s *rpcServer) drainNotifications() { case <-s.minedDebits: case <-s.confirmedBalance: case <-s.unconfirmedBalance: - case <-s.chainServerConnected: case <-s.registerWalletNtfns: - case <-s.registerChainSvrNtfns: } } } diff --git a/wallet.go b/wallet.go index d3fc0b6..23c4b8e 100644 --- a/wallet.go +++ b/wallet.go @@ -84,9 +84,10 @@ type Wallet struct { Manager *waddrmgr.Manager TxStore *txstore.Store - chainSvr *chain.Client - chainSvrLock sync.Locker - chainSynced chan struct{} // closed when synced + chainSvr *chain.Client + chainSvrLock sync.Locker + chainSvrSynced bool + chainSvrSyncMtx sync.Mutex lockedOutpoints map[wire.OutPoint]struct{} FeeIncrement btcutil.Amount @@ -131,7 +132,6 @@ func newWallet(mgr *waddrmgr.Manager, txs *txstore.Store) *Wallet { Manager: mgr, TxStore: txs, chainSvrLock: new(sync.Mutex), - chainSynced: make(chan struct{}), lockedOutpoints: map[wire.OutPoint]struct{}{}, FeeIncrement: defaultFeeIncrement, rescanAddJob: make(chan *RescanJob), @@ -319,13 +319,6 @@ func (w *Wallet) Start(chainServer *chain.Client) { go w.rescanBatchHandler() go w.rescanProgressHandler() go w.rescanRPCHandler() - - go func() { - err := w.syncWithChain() - if err != nil && !w.ShuttingDown() { - log.Warnf("Unable to synchronize wallet to chain: %v", err) - } - }() } // Stop signals all wallet goroutines to shutdown. @@ -366,40 +359,61 @@ func (w *Wallet) WaitForShutdown() { // ChainSynced returns whether the wallet has been attached to a chain server // and synced up to the best block on the main chain. func (w *Wallet) ChainSynced() bool { - select { - case <-w.chainSynced: - return true - default: - return false + w.chainSvrSyncMtx.Lock() + synced := w.chainSvrSynced + w.chainSvrSyncMtx.Unlock() + return synced +} + +// SetChainSynced marks whether the wallet is connected to and currently in sync +// with the latest block notified by the chain server. +// +// NOTE: Due to an API limitation with btcrpcclient, this may return true after +// the client disconnected (and is attempting a reconnect). This will be unknown +// until the reconnect notification is received, at which point the wallet can be +// marked out of sync again until after the next rescan completes. +func (w *Wallet) SetChainSynced(synced bool) { + w.chainSvrSyncMtx.Lock() + w.chainSvrSynced = synced + w.chainSvrSyncMtx.Unlock() +} + +// activeData returns the currently-active receiving addresses and all unspent +// outputs. This is primarely intended to provide the parameters for a +// rescan request. +func (w *Wallet) activeData() ([]btcutil.Address, []txstore.Credit, error) { + addrs, err := w.Manager.AllActiveAddresses() + if err != nil { + return nil, nil, err } + unspent, err := w.TxStore.UnspentOutputs() + return addrs, unspent, err } -// WaitForChainSync blocks until a wallet has been synced with the main chain -// of an attached chain server. -func (w *Wallet) WaitForChainSync() { - <-w.chainSynced -} - -// SyncedChainTip returns the hash and height of the block of the most -// recently seen block in the main chain. It returns errors if the -// wallet has not yet been marked as synched with the chain. -func (w *Wallet) SyncedChainTip() (*waddrmgr.BlockStamp, error) { - select { - case <-w.chainSynced: - return w.chainSvr.BlockStamp() - default: - return nil, ErrNotSynced +// syncWithChain brings the wallet up to date with the current chain server +// connection. It creates a rescan request and blocks until the rescan has +// finished. +// +func (w *Wallet) syncWithChain() error { + // Request notifications for connected and disconnected blocks. + // + // TODO(jrick): Either request this notification only once, or when + // btcrpcclient is modified to allow some notification request to not + // automatically resent on reconnect, include the notifyblocks request + // as well. I am leaning towards allowing off all btcrpcclient + // notification re-registrations, in which case the code here should be + // left as is. + err := w.chainSvr.NotifyBlocks() + if err != nil { + return err } -} -func (w *Wallet) syncWithChain() (err error) { - defer func() { - if err == nil { - // Request notifications for connected and disconnected - // blocks. - err = w.chainSvr.NotifyBlocks() - } - }() + // Request notifications for transactions sending to all wallet + // addresses. + addrs, unspent, err := w.activeData() + if err != nil { + return err + } // TODO(jrick): How should this handle a synced height earlier than // the chain server best block? @@ -411,8 +425,8 @@ func (w *Wallet) syncWithChain() (err error) { bs := iter.BlockStamp() log.Debugf("Checking for previous saved block with height %v hash %v", bs.Height, bs.Hash) - - if _, err := w.chainSvr.GetBlock(&bs.Hash); err != nil { + _, err = w.chainSvr.GetBlock(&bs.Hash) + if err != nil { continue } @@ -433,7 +447,7 @@ func (w *Wallet) syncWithChain() (err error) { break } - return w.RescanActiveAddresses() + return w.Rescan(addrs, unspent) } type (