wallet/wallet: adds recovery for default scopes

This commit is contained in:
Conner Fromknecht 2018-04-26 01:49:03 -07:00 committed by Olaoluwa Osuntokun
parent 71ce1d5474
commit 4c6b9053b8

View file

@ -47,6 +47,11 @@ const (
InsecurePubPassphrase = "public"
walletDbWatchingOnlyName = "wowallet.db"
// recoveryBatchSize is the default number of blocks that will be
// scanned successively by the recovery manager, in the event that the
// wallet is started in recovery mode.
recoveryBatchSize = 2000
)
// ErrNotSynced describes an error where an operation cannot complete
@ -78,6 +83,8 @@ type Wallet struct {
lockedOutpoints map[wire.OutPoint]struct{}
recoveryWindow uint32
// Channels for rescan processing. Requests are added and merged with
// any waiting requests, before being sent to another goroutine to
// call the rescan RPC.
@ -335,6 +342,12 @@ func (w *Wallet) syncWithChain() error {
return err
}
startHeight := w.Manager.SyncedTo().Height
isRecovery := w.recoveryWindow > 0
isInitialSync := len(addrs) == 0 && len(unspent) == 0 &&
startHeight == 0
// TODO(jrick): How should this handle a synced height earlier than
// the chain server best block?
@ -345,7 +358,7 @@ func (w *Wallet) syncWithChain() error {
// addresses ever created, including those that don't need to be watched
// anymore. This code should be updated when this assumption is no
// longer true, but worst case would result in an unnecessary rescan.
if len(addrs) == 0 && len(unspent) == 0 && w.Manager.SyncedTo().Height == 0 {
if isInitialSync || isRecovery {
// Find the latest checkpoint's height. This lets us catch up to
// at least that checkpoint, since we're synchronizing from
// scratch, and lets us avoid a bunch of costly DB transactions
@ -378,7 +391,42 @@ func (w *Wallet) syncWithChain() error {
}
ns := tx.ReadWriteBucket(waddrmgrNamespaceKey)
for height := int32(1); height <= bestHeight; height++ {
// Only allocate the recoveryMgr if we are actually in recovery
// mode.
var recoveryMgr *RecoveryManager
if isRecovery {
log.Infof("RECOVERY MODE ENABLED -- rescanning for "+
"used addresses with recovery_window=%d",
w.recoveryWindow)
// Initialize the recovery manager with a default batch
// size of 2000.
recoveryMgr = NewRecoveryManager(
w.recoveryWindow, recoveryBatchSize,
)
// In the event that this recovery is being resumed, we
// will need to repopulate all found addresses from the
// database. For basic recovery, we will only do so for
// the default scopes.
scopedMgrs, err := w.defaultScopeManagers()
if err != nil {
return err
}
txmgrNs := tx.ReadBucket(wtxmgrNamespaceKey)
credits, err := w.TxStore.UnspentOutputs(txmgrNs)
if err != nil {
return err
}
err = recoveryMgr.Resurrect(ns, scopedMgrs, credits)
if err != nil {
return err
}
}
for height := startHeight; height <= bestHeight; height++ {
hash, err := chainClient.GetBlockHash(int64(height))
if err != nil {
tx.Rollback()
@ -422,16 +470,46 @@ func (w *Wallet) syncWithChain() error {
return err
}
// Check to see if this header's timestamp has surpassed
// our birthday. If we are in recovery mode and the
// check passes, we will add this block to our list of
// blocks to scan for recovered addresses.
timestamp := header.Timestamp
if isRecovery && timestamp.After(w.Manager.Birthday()) {
recoveryMgr.AddToBlockBatch(
hash, height, timestamp,
)
}
err = w.Manager.SetSyncedTo(ns, &waddrmgr.BlockStamp{
Hash: *hash,
Height: height,
Timestamp: header.Timestamp,
Timestamp: timestamp,
})
if err != nil {
tx.Rollback()
return err
}
// If we are in recovery mode, attempt a recovery on
// blocks that have been added to the recovery manager's
// block batch thus far. If block batch is empty, this
// will be a NOP.
if isRecovery && height%recoveryBatchSize == 0 {
err := w.recoverDefaultScopes(
chainClient, tx, ns,
recoveryMgr.BlockBatch(),
recoveryMgr.State(),
)
if err != nil {
tx.Rollback()
return err
}
// Clear the batch of all processed blocks.
recoveryMgr.ResetBlockBatch()
}
// Every 10K blocks, commit and start a new database TX.
if height%10000 == 0 {
err = tx.Commit()
@ -451,6 +529,19 @@ func (w *Wallet) syncWithChain() error {
}
}
// Perform one last recovery attempt for all blocks that were
// not batched at the default granularity of 2000 blocks.
if isRecovery {
err := w.recoverDefaultScopes(
chainClient, tx, ns, recoveryMgr.BlockBatch(),
recoveryMgr.State(),
)
if err != nil {
tx.Rollback()
return err
}
}
// Commit (or roll back) the final database transaction.
err = tx.Commit()
if err != nil {
@ -539,6 +630,410 @@ func (w *Wallet) syncWithChain() error {
return w.Rescan(addrs, unspent)
}
// defaultScopeManagers fetches the ScopedKeyManagers from the wallet using the
// default set of key scopes.
func (w *Wallet) defaultScopeManagers() (
map[waddrmgr.KeyScope]*waddrmgr.ScopedKeyManager, error) {
scopedMgrs := make(map[waddrmgr.KeyScope]*waddrmgr.ScopedKeyManager)
for _, scope := range waddrmgr.DefaultKeyScopes {
scopedMgr, err := w.Manager.FetchScopedKeyManager(scope)
if err != nil {
return nil, err
}
scopedMgrs[scope] = scopedMgr
}
return scopedMgrs, nil
}
// recoverDefaultScopes attempts to recover any addresses belonging to any
// active scoped key managers known to the wallet. Recovery of each scope's
// default account will be done iteratively against the same batch of blocks.
// TODO(conner): parallelize/pipeline/cache intermediate network requests
func (w *Wallet) recoverDefaultScopes(
chainClient chain.Interface,
tx walletdb.ReadWriteTx,
ns walletdb.ReadWriteBucket,
batch []wtxmgr.BlockMeta,
recoveryState *RecoveryState) error {
scopedMgrs, err := w.defaultScopeManagers()
if err != nil {
return err
}
return w.recoverScopedAddresses(
chainClient, tx, ns, batch, recoveryState, scopedMgrs,
)
}
// recoverAccountAddresses scans a range of blocks in attempts to recover any
// previously used addresses for a particular account derivation path. At a high
// level, the algorithm works as follows:
// 1) Ensure internal and external branch horizons are fully expanded.
// 2) Filter the entire range of blocks, stopping if a non-zero number of
// address are contained in a particular block.
// 3) Record all internal and external addresses found in the block.
// 4) Record any outpoints found in the block that should be watched for spends
// 5) Trim the range of blocks up to and including the one reporting the addrs.
// 6) Repeat from (1) if there are still more blocks in the range.
func (w *Wallet) recoverScopedAddresses(
chainClient chain.Interface,
tx walletdb.ReadWriteTx,
ns walletdb.ReadWriteBucket,
batch []wtxmgr.BlockMeta,
recoveryState *RecoveryState,
scopedMgrs map[waddrmgr.KeyScope]*waddrmgr.ScopedKeyManager) error {
// If there are no blocks in the batch, we are done.
if len(batch) == 0 {
return nil
}
log.Infof("Scanning %d blocks for recoverable addresses", len(batch))
expandHorizons:
for scope, scopedMgr := range scopedMgrs {
scopeState := recoveryState.StateForScope(scope)
err := expandScopeHorizons(ns, scopedMgr, scopeState)
if err != nil {
return err
}
}
// With the internal and external horizons properly expanded, we now
// construct the filter blocks request. The request includes the range
// of blocks we intend to scan, in addition to the scope-index -> addr
// map for all internal and external branches.
filterReq := newFilterBlocksRequest(batch, scopedMgrs, recoveryState)
// Initiate the filter blocks request using our chain backend. If an
// error occurs, we are unable to proceed with the recovery.
filterResp, err := chainClient.FilterBlocks(filterReq)
if err != nil {
return err
}
// If the filter response is empty, this signals that the rest of the
// batch was completed, and no other addresses were discovered. As a
// result, no further modifications to our recovery state are required
// and we can proceed to the next batch.
if filterResp == nil {
return nil
}
// Otherwise, retrieve the block info for the block that detected a
// non-zero number of address matches.
block := batch[filterResp.BatchIndex]
// Log any non-trivial findings of addresses or outpoints.
logFilterBlocksResp(block, filterResp)
// Report any external or internal addresses found as a result of the
// appropriate branch recovery state. Adding indexes above the
// last-found index of either will result in the horizons being expanded
// upon the next iteration. Any found addresses are also marked used
// using the scoped key manager.
err = extendFoundAddresses(ns, filterResp, scopedMgrs, recoveryState)
if err != nil {
return err
}
// Update the global set of watched outpoints with any that were found
// in the block.
for outPoint := range filterResp.FoundOutPoints {
recoveryState.AddWatchedOutPoint(&outPoint)
}
// Finally, record all of the relevant transactions that were returned
// in the filter blocks response. This ensures that these transactions
// and their outputs are tracked when the final rescan is performed.
for _, txn := range filterResp.RelevantTxns {
txRecord, err := wtxmgr.NewTxRecordFromMsgTx(
txn, filterResp.BlockMeta.Time,
)
if err != nil {
return err
}
err = w.addRelevantTx(tx, txRecord, &filterResp.BlockMeta)
if err != nil {
return err
}
}
// Update the batch to indicate that we've processed all block through
// the one that returned found addresses.
batch = batch[filterResp.BatchIndex+1:]
// If this was not the last block in the batch, we will repeat the
// filtering process again after expanding our horizons.
if len(batch) > 0 {
goto expandHorizons
}
return nil
}
// expandScopeHorizons ensures that the ScopeRecoveryState has an adequately
// sized look ahead for both its internal and external branches. The keys
// derived here are added to the scope's recovery state, but do not affect the
// persistent state of the wallet. If any invalid child keys are detected, the
// horizon will be properly extended such that our lookahead always includes the
// proper number of valid child keys.
func expandScopeHorizons(ns walletdb.ReadWriteBucket,
scopedMgr *waddrmgr.ScopedKeyManager,
scopeState *ScopeRecoveryState) error {
// Compute the current external horizon and the number of addresses we
// must derive to ensure we maintain a sufficient recovery window for
// the external branch.
exHorizon, exWindow := scopeState.ExternalBranch.ExtendHorizon()
count, childIndex := uint32(0), exHorizon
for count < exWindow {
keyPath := externalKeyPath(childIndex)
addr, err := scopedMgr.DeriveFromKeyPath(ns, keyPath)
switch {
case err == hdkeychain.ErrInvalidChild:
// Record the existence of an invalid child with the
// external branch's recovery state. This also
// increments the branch's horizon so that it accounts
// for this skipped child index.
scopeState.ExternalBranch.MarkInvalidChild(childIndex)
childIndex++
continue
case err != nil:
return err
}
// Register the newly generated external address and child index
// with the external branch recovery state.
scopeState.ExternalBranch.AddAddr(childIndex, addr.Address())
childIndex++
count++
}
// Compute the current internal horizon and the number of addresses we
// must derive to ensure we maintain a sufficient recovery window for
// the internal branch.
inHorizon, inWindow := scopeState.InternalBranch.ExtendHorizon()
count, childIndex = 0, inHorizon
for count < inWindow {
keyPath := internalKeyPath(childIndex)
addr, err := scopedMgr.DeriveFromKeyPath(ns, keyPath)
switch {
case err == hdkeychain.ErrInvalidChild:
// Record the existence of an invalid child with the
// internal branch's recovery state. This also
// increments the branch's horizon so that it accounts
// for this skipped child index.
scopeState.InternalBranch.MarkInvalidChild(childIndex)
childIndex++
continue
case err != nil:
return err
}
// Register the newly generated internal address and child index
// with the internal branch recovery state.
scopeState.InternalBranch.AddAddr(childIndex, addr.Address())
childIndex++
count++
}
return nil
}
// externalKeyPath returns the relative external derivation path /0/0/index.
func externalKeyPath(index uint32) waddrmgr.DerivationPath {
return waddrmgr.DerivationPath{
Account: waddrmgr.DefaultAccountNum,
Branch: waddrmgr.ExternalBranch,
Index: index,
}
}
// internalKeyPath returns the relative internal derivation path /0/1/index.
func internalKeyPath(index uint32) waddrmgr.DerivationPath {
return waddrmgr.DerivationPath{
Account: waddrmgr.DefaultAccountNum,
Branch: waddrmgr.InternalBranch,
Index: index,
}
}
// newFilterBlocksRequest constructs FilterBlocksRequests using our current
// block range, scoped managers, and recovery state.
func newFilterBlocksRequest(batch []wtxmgr.BlockMeta,
scopedMgrs map[waddrmgr.KeyScope]*waddrmgr.ScopedKeyManager,
recoveryState *RecoveryState) *chain.FilterBlocksRequest {
filterReq := &chain.FilterBlocksRequest{
Blocks: batch,
ExternalAddrs: make(map[waddrmgr.ScopedIndex]btcutil.Address),
InternalAddrs: make(map[waddrmgr.ScopedIndex]btcutil.Address),
WatchedOutPoints: recoveryState.WatchedOutPoints(),
}
// Populate the external and internal addresses by merging the addresses
// sets belong to all currently tracked scopes.
for scope := range scopedMgrs {
scopeState := recoveryState.StateForScope(scope)
for index, addr := range scopeState.ExternalBranch.Addrs() {
scopedIndex := waddrmgr.ScopedIndex{
Scope: scope,
Index: index,
}
filterReq.ExternalAddrs[scopedIndex] = addr
}
for index, addr := range scopeState.InternalBranch.Addrs() {
scopedIndex := waddrmgr.ScopedIndex{
Scope: scope,
Index: index,
}
filterReq.InternalAddrs[scopedIndex] = addr
}
}
return filterReq
}
// extendFoundAddresses accepts a filter blocks response that contains addresses
// found on chain, and advances the state of all relevant derivation paths to
// match the highest found child index for each branch.
func extendFoundAddresses(ns walletdb.ReadWriteBucket,
filterResp *chain.FilterBlocksResponse,
scopedMgrs map[waddrmgr.KeyScope]*waddrmgr.ScopedKeyManager,
recoveryState *RecoveryState) error {
// Mark all recovered external addresses as used. This will be done only
// for scopes that reported a non-zero number of external addresses in
// this block.
for scope, indexes := range filterResp.FoundExternalAddrs {
// First, report all external child indexes found for this
// scope. This ensures that the external last-found index will
// be updated to include the maximum child index seen thus far.
scopeState := recoveryState.StateForScope(scope)
for index := range indexes {
scopeState.ExternalBranch.ReportFound(index)
}
scopedMgr := scopedMgrs[scope]
// Now, with all found addresses reported, derive and extend all
// external addresses up to and including the current last found
// index for this scope.
exNextUnfound := scopeState.ExternalBranch.NextUnfound()
exLastFound := exNextUnfound
if exLastFound > 0 {
exLastFound--
}
err := scopedMgr.ExtendExternalAddresses(
ns, waddrmgr.DefaultAccountNum, exLastFound,
)
if err != nil {
return err
}
// Finally, with the scope's addresses extended, we mark used
// the external addresses that were found in the block and
// belong to this scope.
for index := range indexes {
addr := scopeState.ExternalBranch.GetAddr(index)
err := scopedMgr.MarkUsed(ns, addr)
if err != nil {
return err
}
}
}
// Mark all recovered internal addresses as used. This will be done only
// for scopes that reported a non-zero number of internal addresses in
// this block.
for scope, indexes := range filterResp.FoundInternalAddrs {
// First, report all internal child indexes found for this
// scope. This ensures that the internal last-found index will
// be updated to include the maximum child index seen thus far.
scopeState := recoveryState.StateForScope(scope)
for index := range indexes {
scopeState.InternalBranch.ReportFound(index)
}
scopedMgr := scopedMgrs[scope]
// Now, with all found addresses reported, derive and extend all
// internal addresses up to and including the current last found
// index for this scope.
inNextUnfound := scopeState.InternalBranch.NextUnfound()
inLastFound := inNextUnfound
if inLastFound > 0 {
inLastFound--
}
err := scopedMgr.ExtendInternalAddresses(
ns, waddrmgr.DefaultAccountNum, inLastFound,
)
if err != nil {
return err
}
// Finally, with the scope's addresses extended, we mark used
// the internal addresses that were found in the blockand belong
// to this scope.
for index := range indexes {
addr := scopeState.InternalBranch.GetAddr(index)
err := scopedMgr.MarkUsed(ns, addr)
if err != nil {
return err
}
}
}
return nil
}
// logFilterBlocksResp provides useful logging information when filtering
// succeeded in finding relevant transactions.
func logFilterBlocksResp(block wtxmgr.BlockMeta,
resp *chain.FilterBlocksResponse) {
// Log the number of external addresses found in this block.
var nFoundExternal int
for _, indexes := range resp.FoundExternalAddrs {
nFoundExternal += len(indexes)
}
if nFoundExternal > 0 {
log.Infof("Recovered %d external addrs at height=%d hash=%v",
nFoundExternal, block.Height, block.Hash)
}
// Log the number of internal addresses found in this block.
var nFoundInternal int
for _, indexes := range resp.FoundInternalAddrs {
nFoundInternal += len(indexes)
}
if nFoundInternal > 0 {
log.Infof("Recovered %d internal addrs at height=%d hash=%v",
nFoundInternal, block.Height, block.Hash)
}
// Log the number of outpoints found in this block.
nFoundOutPoints := len(resp.FoundOutPoints)
if nFoundOutPoints > 0 {
log.Infof("Found %d spends from watched outpoints at "+
"height=%d hash=%v",
nFoundOutPoints, block.Height, block.Hash)
}
}
type (
createTxRequest struct {
account uint32
@ -2783,7 +3278,9 @@ func (w *Wallet) Database() walletdb.DB {
// Create creates an new wallet, writing it to an empty database. If the passed
// seed is non-nil, it is used. Otherwise, a secure random seed of the
// recommended length is generated.
func Create(db walletdb.DB, pubPass, privPass, seed []byte, params *chaincfg.Params) error {
func Create(db walletdb.DB, pubPass, privPass, seed []byte, params *chaincfg.Params,
birthday time.Time) error {
// If a seed was provided, ensure that it is of valid length. Otherwise,
// we generate a random seed for the wallet with the recommended seed
// length.
@ -2810,8 +3307,10 @@ func Create(db walletdb.DB, pubPass, privPass, seed []byte, params *chaincfg.Par
return err
}
err = waddrmgr.Create(addrmgrNs, seed, pubPass, privPass,
params, nil)
err = waddrmgr.Create(
addrmgrNs, seed, pubPass, privPass, params, nil,
birthday,
)
if err != nil {
return err
}
@ -2820,7 +3319,9 @@ func Create(db walletdb.DB, pubPass, privPass, seed []byte, params *chaincfg.Par
}
// Open loads an already-created wallet from the passed database and namespaces.
func Open(db walletdb.DB, pubPass []byte, cbs *waddrmgr.OpenCallbacks, params *chaincfg.Params) (*Wallet, error) {
func Open(db walletdb.DB, pubPass []byte, cbs *waddrmgr.OpenCallbacks,
params *chaincfg.Params, recoveryWindow uint32) (*Wallet, error) {
err := walletdb.View(db, func(tx walletdb.ReadTx) error {
waddrmgrBucket := tx.ReadBucket(waddrmgrNamespaceKey)
if waddrmgrBucket == nil {
@ -2879,6 +3380,7 @@ func Open(db walletdb.DB, pubPass []byte, cbs *waddrmgr.OpenCallbacks, params *c
Manager: addrMgr,
TxStore: txMgr,
lockedOutpoints: map[wire.OutPoint]struct{}{},
recoveryWindow: recoveryWindow,
rescanAddJob: make(chan *RescanJob),
rescanBatch: make(chan *rescanBatch),
rescanNotifications: make(chan interface{}),