diff --git a/wallet/wallet.go b/wallet/wallet.go index 0357254..e393ebb 100644 --- a/wallet/wallet.go +++ b/wallet/wallet.go @@ -47,6 +47,11 @@ const ( InsecurePubPassphrase = "public" walletDbWatchingOnlyName = "wowallet.db" + + // recoveryBatchSize is the default number of blocks that will be + // scanned successively by the recovery manager, in the event that the + // wallet is started in recovery mode. + recoveryBatchSize = 2000 ) // ErrNotSynced describes an error where an operation cannot complete @@ -78,6 +83,8 @@ type Wallet struct { lockedOutpoints map[wire.OutPoint]struct{} + recoveryWindow uint32 + // Channels for rescan processing. Requests are added and merged with // any waiting requests, before being sent to another goroutine to // call the rescan RPC. @@ -335,6 +342,12 @@ func (w *Wallet) syncWithChain() error { return err } + startHeight := w.Manager.SyncedTo().Height + + isRecovery := w.recoveryWindow > 0 + isInitialSync := len(addrs) == 0 && len(unspent) == 0 && + startHeight == 0 + // TODO(jrick): How should this handle a synced height earlier than // the chain server best block? @@ -345,7 +358,7 @@ func (w *Wallet) syncWithChain() error { // addresses ever created, including those that don't need to be watched // anymore. This code should be updated when this assumption is no // longer true, but worst case would result in an unnecessary rescan. - if len(addrs) == 0 && len(unspent) == 0 && w.Manager.SyncedTo().Height == 0 { + if isInitialSync || isRecovery { // Find the latest checkpoint's height. This lets us catch up to // at least that checkpoint, since we're synchronizing from // scratch, and lets us avoid a bunch of costly DB transactions @@ -378,7 +391,42 @@ func (w *Wallet) syncWithChain() error { } ns := tx.ReadWriteBucket(waddrmgrNamespaceKey) - for height := int32(1); height <= bestHeight; height++ { + // Only allocate the recoveryMgr if we are actually in recovery + // mode. + var recoveryMgr *RecoveryManager + if isRecovery { + log.Infof("RECOVERY MODE ENABLED -- rescanning for "+ + "used addresses with recovery_window=%d", + w.recoveryWindow) + + // Initialize the recovery manager with a default batch + // size of 2000. + recoveryMgr = NewRecoveryManager( + w.recoveryWindow, recoveryBatchSize, + ) + + // In the event that this recovery is being resumed, we + // will need to repopulate all found addresses from the + // database. For basic recovery, we will only do so for + // the default scopes. + scopedMgrs, err := w.defaultScopeManagers() + if err != nil { + return err + } + + txmgrNs := tx.ReadBucket(wtxmgrNamespaceKey) + credits, err := w.TxStore.UnspentOutputs(txmgrNs) + if err != nil { + return err + } + + err = recoveryMgr.Resurrect(ns, scopedMgrs, credits) + if err != nil { + return err + } + } + + for height := startHeight; height <= bestHeight; height++ { hash, err := chainClient.GetBlockHash(int64(height)) if err != nil { tx.Rollback() @@ -422,16 +470,46 @@ func (w *Wallet) syncWithChain() error { return err } + // Check to see if this header's timestamp has surpassed + // our birthday. If we are in recovery mode and the + // check passes, we will add this block to our list of + // blocks to scan for recovered addresses. + timestamp := header.Timestamp + if isRecovery && timestamp.After(w.Manager.Birthday()) { + recoveryMgr.AddToBlockBatch( + hash, height, timestamp, + ) + } + err = w.Manager.SetSyncedTo(ns, &waddrmgr.BlockStamp{ Hash: *hash, Height: height, - Timestamp: header.Timestamp, + Timestamp: timestamp, }) if err != nil { tx.Rollback() return err } + // If we are in recovery mode, attempt a recovery on + // blocks that have been added to the recovery manager's + // block batch thus far. If block batch is empty, this + // will be a NOP. + if isRecovery && height%recoveryBatchSize == 0 { + err := w.recoverDefaultScopes( + chainClient, tx, ns, + recoveryMgr.BlockBatch(), + recoveryMgr.State(), + ) + if err != nil { + tx.Rollback() + return err + } + + // Clear the batch of all processed blocks. + recoveryMgr.ResetBlockBatch() + } + // Every 10K blocks, commit and start a new database TX. if height%10000 == 0 { err = tx.Commit() @@ -451,6 +529,19 @@ func (w *Wallet) syncWithChain() error { } } + // Perform one last recovery attempt for all blocks that were + // not batched at the default granularity of 2000 blocks. + if isRecovery { + err := w.recoverDefaultScopes( + chainClient, tx, ns, recoveryMgr.BlockBatch(), + recoveryMgr.State(), + ) + if err != nil { + tx.Rollback() + return err + } + } + // Commit (or roll back) the final database transaction. err = tx.Commit() if err != nil { @@ -539,6 +630,410 @@ func (w *Wallet) syncWithChain() error { return w.Rescan(addrs, unspent) } +// defaultScopeManagers fetches the ScopedKeyManagers from the wallet using the +// default set of key scopes. +func (w *Wallet) defaultScopeManagers() ( + map[waddrmgr.KeyScope]*waddrmgr.ScopedKeyManager, error) { + + scopedMgrs := make(map[waddrmgr.KeyScope]*waddrmgr.ScopedKeyManager) + for _, scope := range waddrmgr.DefaultKeyScopes { + scopedMgr, err := w.Manager.FetchScopedKeyManager(scope) + if err != nil { + return nil, err + } + + scopedMgrs[scope] = scopedMgr + } + + return scopedMgrs, nil +} + +// recoverDefaultScopes attempts to recover any addresses belonging to any +// active scoped key managers known to the wallet. Recovery of each scope's +// default account will be done iteratively against the same batch of blocks. +// TODO(conner): parallelize/pipeline/cache intermediate network requests +func (w *Wallet) recoverDefaultScopes( + chainClient chain.Interface, + tx walletdb.ReadWriteTx, + ns walletdb.ReadWriteBucket, + batch []wtxmgr.BlockMeta, + recoveryState *RecoveryState) error { + + scopedMgrs, err := w.defaultScopeManagers() + if err != nil { + return err + } + + return w.recoverScopedAddresses( + chainClient, tx, ns, batch, recoveryState, scopedMgrs, + ) +} + +// recoverAccountAddresses scans a range of blocks in attempts to recover any +// previously used addresses for a particular account derivation path. At a high +// level, the algorithm works as follows: +// 1) Ensure internal and external branch horizons are fully expanded. +// 2) Filter the entire range of blocks, stopping if a non-zero number of +// address are contained in a particular block. +// 3) Record all internal and external addresses found in the block. +// 4) Record any outpoints found in the block that should be watched for spends +// 5) Trim the range of blocks up to and including the one reporting the addrs. +// 6) Repeat from (1) if there are still more blocks in the range. +func (w *Wallet) recoverScopedAddresses( + chainClient chain.Interface, + tx walletdb.ReadWriteTx, + ns walletdb.ReadWriteBucket, + batch []wtxmgr.BlockMeta, + recoveryState *RecoveryState, + scopedMgrs map[waddrmgr.KeyScope]*waddrmgr.ScopedKeyManager) error { + + // If there are no blocks in the batch, we are done. + if len(batch) == 0 { + return nil + } + + log.Infof("Scanning %d blocks for recoverable addresses", len(batch)) + +expandHorizons: + for scope, scopedMgr := range scopedMgrs { + scopeState := recoveryState.StateForScope(scope) + err := expandScopeHorizons(ns, scopedMgr, scopeState) + if err != nil { + return err + } + } + + // With the internal and external horizons properly expanded, we now + // construct the filter blocks request. The request includes the range + // of blocks we intend to scan, in addition to the scope-index -> addr + // map for all internal and external branches. + filterReq := newFilterBlocksRequest(batch, scopedMgrs, recoveryState) + + // Initiate the filter blocks request using our chain backend. If an + // error occurs, we are unable to proceed with the recovery. + filterResp, err := chainClient.FilterBlocks(filterReq) + if err != nil { + return err + } + + // If the filter response is empty, this signals that the rest of the + // batch was completed, and no other addresses were discovered. As a + // result, no further modifications to our recovery state are required + // and we can proceed to the next batch. + if filterResp == nil { + return nil + } + + // Otherwise, retrieve the block info for the block that detected a + // non-zero number of address matches. + block := batch[filterResp.BatchIndex] + + // Log any non-trivial findings of addresses or outpoints. + logFilterBlocksResp(block, filterResp) + + // Report any external or internal addresses found as a result of the + // appropriate branch recovery state. Adding indexes above the + // last-found index of either will result in the horizons being expanded + // upon the next iteration. Any found addresses are also marked used + // using the scoped key manager. + err = extendFoundAddresses(ns, filterResp, scopedMgrs, recoveryState) + if err != nil { + return err + } + + // Update the global set of watched outpoints with any that were found + // in the block. + for outPoint := range filterResp.FoundOutPoints { + recoveryState.AddWatchedOutPoint(&outPoint) + } + + // Finally, record all of the relevant transactions that were returned + // in the filter blocks response. This ensures that these transactions + // and their outputs are tracked when the final rescan is performed. + for _, txn := range filterResp.RelevantTxns { + txRecord, err := wtxmgr.NewTxRecordFromMsgTx( + txn, filterResp.BlockMeta.Time, + ) + if err != nil { + return err + } + + err = w.addRelevantTx(tx, txRecord, &filterResp.BlockMeta) + if err != nil { + return err + } + } + + // Update the batch to indicate that we've processed all block through + // the one that returned found addresses. + batch = batch[filterResp.BatchIndex+1:] + + // If this was not the last block in the batch, we will repeat the + // filtering process again after expanding our horizons. + if len(batch) > 0 { + goto expandHorizons + } + + return nil +} + +// expandScopeHorizons ensures that the ScopeRecoveryState has an adequately +// sized look ahead for both its internal and external branches. The keys +// derived here are added to the scope's recovery state, but do not affect the +// persistent state of the wallet. If any invalid child keys are detected, the +// horizon will be properly extended such that our lookahead always includes the +// proper number of valid child keys. +func expandScopeHorizons(ns walletdb.ReadWriteBucket, + scopedMgr *waddrmgr.ScopedKeyManager, + scopeState *ScopeRecoveryState) error { + + // Compute the current external horizon and the number of addresses we + // must derive to ensure we maintain a sufficient recovery window for + // the external branch. + exHorizon, exWindow := scopeState.ExternalBranch.ExtendHorizon() + count, childIndex := uint32(0), exHorizon + for count < exWindow { + keyPath := externalKeyPath(childIndex) + addr, err := scopedMgr.DeriveFromKeyPath(ns, keyPath) + switch { + case err == hdkeychain.ErrInvalidChild: + // Record the existence of an invalid child with the + // external branch's recovery state. This also + // increments the branch's horizon so that it accounts + // for this skipped child index. + scopeState.ExternalBranch.MarkInvalidChild(childIndex) + childIndex++ + continue + + case err != nil: + return err + } + + // Register the newly generated external address and child index + // with the external branch recovery state. + scopeState.ExternalBranch.AddAddr(childIndex, addr.Address()) + + childIndex++ + count++ + } + + // Compute the current internal horizon and the number of addresses we + // must derive to ensure we maintain a sufficient recovery window for + // the internal branch. + inHorizon, inWindow := scopeState.InternalBranch.ExtendHorizon() + count, childIndex = 0, inHorizon + for count < inWindow { + keyPath := internalKeyPath(childIndex) + addr, err := scopedMgr.DeriveFromKeyPath(ns, keyPath) + switch { + case err == hdkeychain.ErrInvalidChild: + // Record the existence of an invalid child with the + // internal branch's recovery state. This also + // increments the branch's horizon so that it accounts + // for this skipped child index. + scopeState.InternalBranch.MarkInvalidChild(childIndex) + childIndex++ + continue + + case err != nil: + return err + } + + // Register the newly generated internal address and child index + // with the internal branch recovery state. + scopeState.InternalBranch.AddAddr(childIndex, addr.Address()) + + childIndex++ + count++ + } + + return nil +} + +// externalKeyPath returns the relative external derivation path /0/0/index. +func externalKeyPath(index uint32) waddrmgr.DerivationPath { + return waddrmgr.DerivationPath{ + Account: waddrmgr.DefaultAccountNum, + Branch: waddrmgr.ExternalBranch, + Index: index, + } +} + +// internalKeyPath returns the relative internal derivation path /0/1/index. +func internalKeyPath(index uint32) waddrmgr.DerivationPath { + return waddrmgr.DerivationPath{ + Account: waddrmgr.DefaultAccountNum, + Branch: waddrmgr.InternalBranch, + Index: index, + } +} + +// newFilterBlocksRequest constructs FilterBlocksRequests using our current +// block range, scoped managers, and recovery state. +func newFilterBlocksRequest(batch []wtxmgr.BlockMeta, + scopedMgrs map[waddrmgr.KeyScope]*waddrmgr.ScopedKeyManager, + recoveryState *RecoveryState) *chain.FilterBlocksRequest { + + filterReq := &chain.FilterBlocksRequest{ + Blocks: batch, + ExternalAddrs: make(map[waddrmgr.ScopedIndex]btcutil.Address), + InternalAddrs: make(map[waddrmgr.ScopedIndex]btcutil.Address), + WatchedOutPoints: recoveryState.WatchedOutPoints(), + } + + // Populate the external and internal addresses by merging the addresses + // sets belong to all currently tracked scopes. + for scope := range scopedMgrs { + scopeState := recoveryState.StateForScope(scope) + for index, addr := range scopeState.ExternalBranch.Addrs() { + scopedIndex := waddrmgr.ScopedIndex{ + Scope: scope, + Index: index, + } + filterReq.ExternalAddrs[scopedIndex] = addr + } + for index, addr := range scopeState.InternalBranch.Addrs() { + scopedIndex := waddrmgr.ScopedIndex{ + Scope: scope, + Index: index, + } + filterReq.InternalAddrs[scopedIndex] = addr + } + } + + return filterReq +} + +// extendFoundAddresses accepts a filter blocks response that contains addresses +// found on chain, and advances the state of all relevant derivation paths to +// match the highest found child index for each branch. +func extendFoundAddresses(ns walletdb.ReadWriteBucket, + filterResp *chain.FilterBlocksResponse, + scopedMgrs map[waddrmgr.KeyScope]*waddrmgr.ScopedKeyManager, + recoveryState *RecoveryState) error { + + // Mark all recovered external addresses as used. This will be done only + // for scopes that reported a non-zero number of external addresses in + // this block. + for scope, indexes := range filterResp.FoundExternalAddrs { + // First, report all external child indexes found for this + // scope. This ensures that the external last-found index will + // be updated to include the maximum child index seen thus far. + scopeState := recoveryState.StateForScope(scope) + for index := range indexes { + scopeState.ExternalBranch.ReportFound(index) + } + + scopedMgr := scopedMgrs[scope] + + // Now, with all found addresses reported, derive and extend all + // external addresses up to and including the current last found + // index for this scope. + exNextUnfound := scopeState.ExternalBranch.NextUnfound() + + exLastFound := exNextUnfound + if exLastFound > 0 { + exLastFound-- + } + + err := scopedMgr.ExtendExternalAddresses( + ns, waddrmgr.DefaultAccountNum, exLastFound, + ) + if err != nil { + return err + } + + // Finally, with the scope's addresses extended, we mark used + // the external addresses that were found in the block and + // belong to this scope. + for index := range indexes { + addr := scopeState.ExternalBranch.GetAddr(index) + err := scopedMgr.MarkUsed(ns, addr) + if err != nil { + return err + } + } + } + + // Mark all recovered internal addresses as used. This will be done only + // for scopes that reported a non-zero number of internal addresses in + // this block. + for scope, indexes := range filterResp.FoundInternalAddrs { + // First, report all internal child indexes found for this + // scope. This ensures that the internal last-found index will + // be updated to include the maximum child index seen thus far. + scopeState := recoveryState.StateForScope(scope) + for index := range indexes { + scopeState.InternalBranch.ReportFound(index) + } + + scopedMgr := scopedMgrs[scope] + + // Now, with all found addresses reported, derive and extend all + // internal addresses up to and including the current last found + // index for this scope. + inNextUnfound := scopeState.InternalBranch.NextUnfound() + + inLastFound := inNextUnfound + if inLastFound > 0 { + inLastFound-- + } + err := scopedMgr.ExtendInternalAddresses( + ns, waddrmgr.DefaultAccountNum, inLastFound, + ) + if err != nil { + return err + } + + // Finally, with the scope's addresses extended, we mark used + // the internal addresses that were found in the blockand belong + // to this scope. + for index := range indexes { + addr := scopeState.InternalBranch.GetAddr(index) + err := scopedMgr.MarkUsed(ns, addr) + if err != nil { + return err + } + } + } + + return nil +} + +// logFilterBlocksResp provides useful logging information when filtering +// succeeded in finding relevant transactions. +func logFilterBlocksResp(block wtxmgr.BlockMeta, + resp *chain.FilterBlocksResponse) { + + // Log the number of external addresses found in this block. + var nFoundExternal int + for _, indexes := range resp.FoundExternalAddrs { + nFoundExternal += len(indexes) + } + if nFoundExternal > 0 { + log.Infof("Recovered %d external addrs at height=%d hash=%v", + nFoundExternal, block.Height, block.Hash) + } + + // Log the number of internal addresses found in this block. + var nFoundInternal int + for _, indexes := range resp.FoundInternalAddrs { + nFoundInternal += len(indexes) + } + if nFoundInternal > 0 { + log.Infof("Recovered %d internal addrs at height=%d hash=%v", + nFoundInternal, block.Height, block.Hash) + } + + // Log the number of outpoints found in this block. + nFoundOutPoints := len(resp.FoundOutPoints) + if nFoundOutPoints > 0 { + log.Infof("Found %d spends from watched outpoints at "+ + "height=%d hash=%v", + nFoundOutPoints, block.Height, block.Hash) + } +} + type ( createTxRequest struct { account uint32 @@ -2783,7 +3278,9 @@ func (w *Wallet) Database() walletdb.DB { // Create creates an new wallet, writing it to an empty database. If the passed // seed is non-nil, it is used. Otherwise, a secure random seed of the // recommended length is generated. -func Create(db walletdb.DB, pubPass, privPass, seed []byte, params *chaincfg.Params) error { +func Create(db walletdb.DB, pubPass, privPass, seed []byte, params *chaincfg.Params, + birthday time.Time) error { + // If a seed was provided, ensure that it is of valid length. Otherwise, // we generate a random seed for the wallet with the recommended seed // length. @@ -2810,8 +3307,10 @@ func Create(db walletdb.DB, pubPass, privPass, seed []byte, params *chaincfg.Par return err } - err = waddrmgr.Create(addrmgrNs, seed, pubPass, privPass, - params, nil) + err = waddrmgr.Create( + addrmgrNs, seed, pubPass, privPass, params, nil, + birthday, + ) if err != nil { return err } @@ -2820,7 +3319,9 @@ func Create(db walletdb.DB, pubPass, privPass, seed []byte, params *chaincfg.Par } // Open loads an already-created wallet from the passed database and namespaces. -func Open(db walletdb.DB, pubPass []byte, cbs *waddrmgr.OpenCallbacks, params *chaincfg.Params) (*Wallet, error) { +func Open(db walletdb.DB, pubPass []byte, cbs *waddrmgr.OpenCallbacks, + params *chaincfg.Params, recoveryWindow uint32) (*Wallet, error) { + err := walletdb.View(db, func(tx walletdb.ReadTx) error { waddrmgrBucket := tx.ReadBucket(waddrmgrNamespaceKey) if waddrmgrBucket == nil { @@ -2879,6 +3380,7 @@ func Open(db walletdb.DB, pubPass []byte, cbs *waddrmgr.OpenCallbacks, params *c Manager: addrMgr, TxStore: txMgr, lockedOutpoints: map[wire.OutPoint]struct{}{}, + recoveryWindow: recoveryWindow, rescanAddJob: make(chan *RescanJob), rescanBatch: make(chan *rescanBatch), rescanNotifications: make(chan interface{}),