Merge pull request #577 from wpaulino/initial-sync-birthday-block

wallet/wallet: redefine initial sync to birthday block not being set
This commit is contained in:
Olaoluwa Osuntokun 2019-01-22 19:32:36 -08:00 committed by GitHub
commit ba03278a64
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -322,291 +322,45 @@ func (w *Wallet) activeData(dbtx walletdb.ReadTx) ([]btcutil.Address, []wtxmgr.C
// finished. The birthday block can be passed in, if set, to ensure we can
// properly detect if it gets rolled back.
func (w *Wallet) syncWithChain(birthdayStamp *waddrmgr.BlockStamp) error {
// To start, if we've yet to find our birthday stamp, we'll do so now.
if birthdayStamp == nil {
var err error
birthdayStamp, err = w.syncToBirthday()
if err != nil {
return err
}
}
// If the wallet requested an on-chain recovery of its funds, we'll do
// so now.
if w.recoveryWindow > 0 {
// We'll start the recovery from our birthday unless we were
// in the middle of a previous recovery attempt. If that's the
// case, we'll resume from that point.
startHeight := birthdayStamp.Height
walletHeight := w.Manager.SyncedTo().Height
if walletHeight > startHeight {
startHeight = walletHeight
}
if err := w.recovery(startHeight); err != nil {
return err
}
}
// Compare previously-seen blocks against the current chain. If any of
// these blocks no longer exist, rollback all of the missing blocks
// before catching up with the rescan.
rollback := false
rollbackStamp := w.Manager.SyncedTo()
chainClient, err := w.requireChainClient()
if err != nil {
return err
}
// Request notifications for transactions sending to all wallet
// addresses.
var (
addrs []btcutil.Address
unspent []wtxmgr.Credit
)
err = walletdb.View(w.db, func(dbtx walletdb.ReadTx) error {
var err error
addrs, unspent, err = w.activeData(dbtx)
return err
})
if err != nil {
return err
}
startHeight := w.Manager.SyncedTo().Height
// We'll mark this as our first sync if we don't have any unspent
// outputs as known by the wallet. This'll allow us to skip a full
// rescan at this height, and instead wait for the backend to catch up.
isInitialSync := len(unspent) == 0
isRecovery := w.recoveryWindow > 0
birthday := w.Manager.Birthday()
// TODO(jrick): How should this handle a synced height earlier than
// the chain server best block?
// When no addresses have been generated for the wallet, the rescan can
// be skipped.
//
// TODO: This is only correct because activeData above returns all
// addresses ever created, including those that don't need to be watched
// anymore. This code should be updated when this assumption is no
// longer true, but worst case would result in an unnecessary rescan.
if isInitialSync || isRecovery {
// Find the latest checkpoint's height. This lets us catch up to
// at least that checkpoint, since we're synchronizing from
// scratch, and lets us avoid a bunch of costly DB transactions
// in the case when we're using BDB for the walletdb backend and
// Neutrino for the chain.Interface backend, and the chain
// backend starts synchronizing at the same time as the wallet.
_, bestHeight, err := chainClient.GetBestBlock()
if err != nil {
return err
}
checkHeight := bestHeight
if len(w.chainParams.Checkpoints) > 0 {
checkHeight = w.chainParams.Checkpoints[len(
w.chainParams.Checkpoints)-1].Height
}
logHeight := checkHeight
if bestHeight > logHeight {
logHeight = bestHeight
}
log.Infof("Catching up block hashes to height %d, this will "+
"take a while...", logHeight)
// Initialize the first database transaction.
tx, err := w.db.BeginReadWriteTx()
if err != nil {
return err
}
ns := tx.ReadWriteBucket(waddrmgrNamespaceKey)
// Only allocate the recoveryMgr if we are actually in recovery
// mode.
var recoveryMgr *RecoveryManager
if isRecovery {
log.Infof("RECOVERY MODE ENABLED -- rescanning for "+
"used addresses with recovery_window=%d",
w.recoveryWindow)
// Initialize the recovery manager with a default batch
// size of 2000.
recoveryMgr = NewRecoveryManager(
w.recoveryWindow, recoveryBatchSize,
w.chainParams,
)
// In the event that this recovery is being resumed, we
// will need to repopulate all found addresses from the
// database. For basic recovery, we will only do so for
// the default scopes.
scopedMgrs, err := w.defaultScopeManagers()
if err != nil {
return err
}
txmgrNs := tx.ReadBucket(wtxmgrNamespaceKey)
credits, err := w.TxStore.UnspentOutputs(txmgrNs)
if err != nil {
return err
}
err = recoveryMgr.Resurrect(ns, scopedMgrs, credits)
if err != nil {
return err
}
}
for height := startHeight; height <= bestHeight; height++ {
hash, err := chainClient.GetBlockHash(int64(height))
if err != nil {
tx.Rollback()
return err
}
// If we're using the Neutrino backend, we can check if
// it's current or not. For other backends we'll assume
// it is current if the best height has reached the
// last checkpoint.
isCurrent := func(bestHeight int32) bool {
switch c := chainClient.(type) {
case *chain.NeutrinoClient:
return c.CS.IsCurrent()
}
return bestHeight >= checkHeight
}
// If we've found the best height the backend knows
// about, and the backend is still synchronizing, we'll
// wait. We can give it a little bit of time to
// synchronize further before updating the best height
// based on the backend. Once we see that the backend
// has advanced, we can catch up to it.
for height == bestHeight && !isCurrent(bestHeight) {
time.Sleep(100 * time.Millisecond)
_, bestHeight, err = chainClient.GetBestBlock()
if err != nil {
tx.Rollback()
return err
}
}
header, err := chainClient.GetBlockHeader(hash)
if err != nil {
return err
}
// Check to see if this header's timestamp has surpassed
// our birthday or if we've surpassed one previously.
timestamp := header.Timestamp
if timestamp.After(birthday) || birthdayStamp != nil {
// If this is the first block past our birthday,
// record the block stamp so that we can use
// this as the starting point for the rescan.
// This will ensure we don't miss transactions
// that are sent to the wallet during an initial
// sync.
//
// NOTE: The birthday persisted by the wallet is
// two days before the actual wallet birthday,
// to deal with potentially inaccurate header
// timestamps.
if birthdayStamp == nil {
birthdayStamp = &waddrmgr.BlockStamp{
Height: height,
Hash: *hash,
Timestamp: timestamp,
}
log.Debugf("Found birthday block: "+
"height=%d, hash=%v",
birthdayStamp.Height,
birthdayStamp.Hash)
err := w.Manager.SetBirthdayBlock(
ns, *birthdayStamp, true,
)
if err != nil {
tx.Rollback()
return err
}
}
// If we are in recovery mode and the check
// passes, we will add this block to our list of
// blocks to scan for recovered addresses.
if isRecovery {
recoveryMgr.AddToBlockBatch(
hash, height, timestamp,
)
}
}
err = w.Manager.SetSyncedTo(ns, &waddrmgr.BlockStamp{
Hash: *hash,
Height: height,
Timestamp: timestamp,
})
if err != nil {
tx.Rollback()
return err
}
// If we are in recovery mode, attempt a recovery on
// blocks that have been added to the recovery manager's
// block batch thus far. If block batch is empty, this
// will be a NOP.
if isRecovery && height%recoveryBatchSize == 0 {
err := w.recoverDefaultScopes(
chainClient, tx, ns,
recoveryMgr.BlockBatch(),
recoveryMgr.State(),
)
if err != nil {
tx.Rollback()
return err
}
// Clear the batch of all processed blocks.
recoveryMgr.ResetBlockBatch()
}
// Every 10K blocks, commit and start a new database TX.
if height%10000 == 0 {
err = tx.Commit()
if err != nil {
tx.Rollback()
return err
}
log.Infof("Caught up to height %d", height)
tx, err = w.db.BeginReadWriteTx()
if err != nil {
return err
}
ns = tx.ReadWriteBucket(waddrmgrNamespaceKey)
}
}
// Perform one last recovery attempt for all blocks that were
// not batched at the default granularity of 2000 blocks.
if isRecovery {
err := w.recoverDefaultScopes(
chainClient, tx, ns, recoveryMgr.BlockBatch(),
recoveryMgr.State(),
)
if err != nil {
tx.Rollback()
return err
}
}
// Commit (or roll back) the final database transaction.
err = tx.Commit()
if err != nil {
tx.Rollback()
return err
}
log.Info("Done catching up block hashes")
// Since we've spent some time catching up block hashes, we
// might have new addresses waiting for us that were requested
// during initial sync. Make sure we have those before we
// request a rescan later on.
err = walletdb.View(w.db, func(dbtx walletdb.ReadTx) error {
var err error
addrs, unspent, err = w.activeData(dbtx)
return err
})
if err != nil {
return err
}
}
// Compare previously-seen blocks against the chain server. If any of
// these blocks no longer exist, rollback all of the missing blocks
// before catching up with the rescan.
rollback := false
rollbackStamp := w.Manager.SyncedTo()
err = walletdb.Update(w.db, func(tx walletdb.ReadWriteTx) error {
addrmgrNs := tx.ReadWriteBucket(waddrmgrNamespaceKey)
txmgrNs := tx.ReadWriteBucket(wtxmgrNamespaceKey)
for height := rollbackStamp.Height; true; height-- {
hash, err := w.Manager.BlockHash(addrmgrNs, height)
if err != nil {
@ -631,48 +385,41 @@ func (w *Wallet) syncWithChain(birthdayStamp *waddrmgr.BlockStamp) error {
rollback = true
}
if rollback {
err := w.Manager.SetSyncedTo(addrmgrNs, &rollbackStamp)
if err != nil {
return err
}
// Rollback unconfirms transactions at and beyond the
// passed height, so add one to the new synced-to height
// to prevent unconfirming txs from the synced-to block.
err = w.TxStore.Rollback(txmgrNs, rollbackStamp.Height+1)
// If a rollback did not happen, we can proceed safely.
if !rollback {
return nil
}
// Otherwise, we'll mark this as our new synced height.
err := w.Manager.SetSyncedTo(addrmgrNs, &rollbackStamp)
if err != nil {
return err
}
// If the rollback happened to go beyond our birthday stamp,
// we'll need to find a new one by syncing with the chain again
// until finding one.
if rollbackStamp.Height <= birthdayStamp.Height &&
rollbackStamp.Hash != birthdayStamp.Hash {
err := w.Manager.SetBirthdayBlock(
addrmgrNs, rollbackStamp, true,
)
if err != nil {
return err
}
}
return nil
// Finally, we'll roll back our transaction store to reflect the
// stale state. `Rollback` unconfirms transactions at and beyond
// the passed height, so add one to the new synced-to height to
// prevent unconfirming transactions in the synced-to block.
return w.TxStore.Rollback(txmgrNs, rollbackStamp.Height+1)
})
if err != nil {
return err
}
// If a birthday stamp was found during the initial sync and the
// rollback causes us to revert it, update the birthday stamp so that it
// points at the new tip.
birthdayRollback := false
if birthdayStamp != nil && rollbackStamp.Height <= birthdayStamp.Height {
birthdayStamp = &rollbackStamp
birthdayRollback = true
log.Debugf("Found new birthday block after rollback: "+
"height=%d, hash=%v", birthdayStamp.Height,
birthdayStamp.Hash)
err := walletdb.Update(w.db, func(tx walletdb.ReadWriteTx) error {
ns := tx.ReadWriteBucket(waddrmgrNamespaceKey)
return w.Manager.SetBirthdayBlock(
ns, *birthdayStamp, true,
)
})
if err != nil {
return nil
}
}
// Request notifications for connected and disconnected blocks.
//
// TODO(jrick): Either request this notification only once, or when
@ -685,18 +432,306 @@ func (w *Wallet) syncWithChain(birthdayStamp *waddrmgr.BlockStamp) error {
return err
}
// If this was our initial sync, we're recovering from our seed, or our
// birthday was rolled back due to a chain reorg, we'll dispatch a
// rescan from our birthday block to ensure we detect all relevant
// on-chain events from this point.
if isInitialSync || isRecovery || birthdayRollback {
return w.rescanWithTarget(addrs, unspent, birthdayStamp)
// Finally, we'll trigger a wallet rescan from the currently synced tip
// and request notifications for transactions sending to all wallet
// addresses and spending all wallet UTXOs.
var (
addrs []btcutil.Address
unspent []wtxmgr.Credit
)
err = walletdb.View(w.db, func(dbtx walletdb.ReadTx) error {
addrs, unspent, err = w.activeData(dbtx)
return err
})
if err != nil {
return err
}
// Otherwise, we'll rescan from tip.
return w.rescanWithTarget(addrs, unspent, nil)
}
// scanChain is a helper method that scans the chain from the starting height
// until the tip of the chain. The onBlock callback can be used to perform
// certain operations for every block that we process as we scan the chain.
func (w *Wallet) scanChain(startHeight int32,
onBlock func(int32, *chainhash.Hash, *wire.BlockHeader) error) error {
chainClient, err := w.requireChainClient()
if err != nil {
return err
}
// isCurrent is a helper function that we'll use to determine if the
// chain backend is currently synced. When running with a btcd or
// bitcoind backend, It will use the height of the latest checkpoint as
// its lower bound.
var latestCheckptHeight int32
if len(w.chainParams.Checkpoints) > 0 {
latestCheckptHeight = w.chainParams.
Checkpoints[len(w.chainParams.Checkpoints)-1].Height
}
isCurrent := func(bestHeight int32) bool {
switch c := chainClient.(type) {
case *chain.NeutrinoClient:
return c.CS.IsCurrent()
}
return bestHeight >= latestCheckptHeight
}
// Determine the latest height known to the chain backend and begin
// scanning the chain from the start height up until this point.
_, bestHeight, err := chainClient.GetBestBlock()
if err != nil {
return err
}
for height := startHeight; height <= bestHeight; height++ {
hash, err := chainClient.GetBlockHash(int64(height))
if err != nil {
return err
}
header, err := chainClient.GetBlockHeader(hash)
if err != nil {
return err
}
if err := onBlock(height, hash, header); err != nil {
return err
}
// If we've reached our best height and we're not current, we'll
// wait for blocks at tip to ensure we go through all existent
// blocks.
for height == bestHeight && !isCurrent(bestHeight) {
time.Sleep(100 * time.Millisecond)
_, bestHeight, err = chainClient.GetBestBlock()
if err != nil {
return err
}
}
}
return nil
}
// syncToBirthday attempts to sync the wallet's point of view of the chain until
// it finds the first block whose timestamp is above the wallet's birthday. The
// wallet's birthday is already two days in the past of its actual birthday, so
// this is relatively safe to do.
func (w *Wallet) syncToBirthday() (*waddrmgr.BlockStamp, error) {
var birthdayStamp *waddrmgr.BlockStamp
birthday := w.Manager.Birthday()
tx, err := w.db.BeginReadWriteTx()
if err != nil {
return nil, err
}
ns := tx.ReadWriteBucket(waddrmgrNamespaceKey)
// We'll begin scanning the chain from our last sync point until finding
// the first block with a timestamp greater than our birthday. We'll use
// this block to represent our birthday stamp. errDone is an error we'll
// use to signal that we've found it and no longer need to keep scanning
// the chain.
errDone := errors.New("done")
err = w.scanChain(w.Manager.SyncedTo().Height, func(height int32,
hash *chainhash.Hash, header *wire.BlockHeader) error {
if header.Timestamp.After(birthday) {
log.Debugf("Found birthday block: height=%d, hash=%v",
height, hash)
birthdayStamp = &waddrmgr.BlockStamp{
Hash: *hash,
Height: height,
Timestamp: header.Timestamp,
}
err := w.Manager.SetBirthdayBlock(
ns, *birthdayStamp, true,
)
if err != nil {
return err
}
}
err = w.Manager.SetSyncedTo(ns, &waddrmgr.BlockStamp{
Hash: *hash,
Height: height,
Timestamp: header.Timestamp,
})
if err != nil {
return err
}
// Checkpoint our state every 10K blocks.
if height%10000 == 0 {
if err := tx.Commit(); err != nil {
return err
}
log.Infof("Caught up to height %d", height)
tx, err = w.db.BeginReadWriteTx()
if err != nil {
return err
}
ns = tx.ReadWriteBucket(waddrmgrNamespaceKey)
}
// If we've found our birthday, we can return errDone to signal
// that we should stop scanning the chain and persist our state.
if birthdayStamp != nil {
return errDone
}
return nil
})
if err != nil && err != errDone {
tx.Rollback()
return nil, err
}
// If a birthday stamp has yet to be found, we'll return an error
// indicating so.
if birthdayStamp == nil {
tx.Rollback()
return nil, fmt.Errorf("did not find a suitable birthday "+
"block with a timestamp greater than %v", birthday)
}
if err := tx.Commit(); err != nil {
tx.Rollback()
return nil, err
}
return birthdayStamp, nil
}
// recovery attempts to recover any unspent outputs that pay to any of our
// addresses starting from the specified height.
//
// NOTE: The starting height must be at least the height of the wallet's
// birthday or later.
func (w *Wallet) recovery(startHeight int32) error {
log.Infof("RECOVERY MODE ENABLED -- rescanning for used addresses "+
"with recovery_window=%d", w.recoveryWindow)
// We'll initialize the recovery manager with a default batch size of
// 2000.
recoveryMgr := NewRecoveryManager(
w.recoveryWindow, recoveryBatchSize, w.chainParams,
)
// In the event that this recovery is being resumed, we will need to
// repopulate all found addresses from the database. For basic recovery,
// we will only do so for the default scopes.
scopedMgrs, err := w.defaultScopeManagers()
if err != nil {
return err
}
tx, err := w.db.BeginReadWriteTx()
if err != nil {
return err
}
txMgrNS := tx.ReadBucket(wtxmgrNamespaceKey)
credits, err := w.TxStore.UnspentOutputs(txMgrNS)
if err != nil {
tx.Rollback()
return err
}
addrMgrNS := tx.ReadWriteBucket(waddrmgrNamespaceKey)
err = recoveryMgr.Resurrect(addrMgrNS, scopedMgrs, credits)
if err != nil {
tx.Rollback()
return err
}
// We'll also retrieve our chain backend client in order to filter the
// blocks as we go.
chainClient, err := w.requireChainClient()
if err != nil {
tx.Rollback()
return err
}
// We'll begin scanning the chain from the specified starting height.
// Since we assume that the lowest height we start with will at least be
// that of our birthday, we can just add every block we process from
// this point forward to the recovery batch.
err = w.scanChain(startHeight, func(height int32,
hash *chainhash.Hash, header *wire.BlockHeader) error {
recoveryMgr.AddToBlockBatch(hash, height, header.Timestamp)
// We'll checkpoint our current batch every 2K blocks, so we'll
// need to start a new database transaction. If our current
// batch is empty, then this will act as a NOP.
if height%recoveryBatchSize == 0 {
blockBatch := recoveryMgr.BlockBatch()
err := w.recoverDefaultScopes(
chainClient, tx, addrMgrNS, blockBatch,
recoveryMgr.State(),
)
if err != nil {
return err
}
// Clear the batch of all processed blocks.
recoveryMgr.ResetBlockBatch()
if err := tx.Commit(); err != nil {
return err
}
log.Infof("Recovered addresses from blocks %d-%d",
blockBatch[0].Height,
blockBatch[len(blockBatch)-1].Height)
tx, err = w.db.BeginReadWriteTx()
if err != nil {
return err
}
addrMgrNS = tx.ReadWriteBucket(waddrmgrNamespaceKey)
}
// Since the recovery in a way acts as a rescan, we'll update
// the wallet's tip to point to the current block so that we
// don't unnecessarily rescan the same block again later on.
return w.Manager.SetSyncedTo(addrMgrNS, &waddrmgr.BlockStamp{
Hash: *hash,
Height: height,
Timestamp: header.Timestamp,
})
})
if err != nil {
tx.Rollback()
return err
}
// Now that we've reached the chain tip, we can process our final batch
// with the remaining blocks if it did not reach its maximum size.
blockBatch := recoveryMgr.BlockBatch()
err = w.recoverDefaultScopes(
chainClient, tx, addrMgrNS, blockBatch, recoveryMgr.State(),
)
if err != nil {
tx.Rollback()
return err
}
// With the recovery complete, we can persist our new state and exit.
if err := tx.Commit(); err != nil {
tx.Rollback()
return err
}
log.Infof("Recovered addresses from blocks %d-%d", blockBatch[0].Height,
blockBatch[len(blockBatch)-1].Height)
return nil
}
// defaultScopeManagers fetches the ScopedKeyManagers from the wallet using the
// default set of key scopes.
func (w *Wallet) defaultScopeManagers() (