Detect chain reorgs that happen during a rescan.

While here, switch rescan method to use hashes, not heights, to refer
to starting and ending blocks.

Closes #151.

ok @davecgh
This commit is contained in:
Josh Rickmar 2014-07-17 00:41:43 -05:00
parent da993eb034
commit 8e74343747

View file

@ -18,6 +18,7 @@ import (
"time" "time"
"code.google.com/p/go.crypto/ripemd160" "code.google.com/p/go.crypto/ripemd160"
"github.com/conformal/btcdb"
"github.com/conformal/btcjson" "github.com/conformal/btcjson"
"github.com/conformal/btcscript" "github.com/conformal/btcscript"
"github.com/conformal/btcutil" "github.com/conformal/btcutil"
@ -1460,6 +1461,13 @@ type rescanKeys struct {
unspent map[btcwire.OutPoint]struct{} unspent map[btcwire.OutPoint]struct{}
} }
// ErrRescanReorg defines the error that is returned when an unrecoverable
// reorganize is detected during a rescan.
var ErrRescanReorg = btcjson.Error{
Code: btcjson.ErrDatabase.Code,
Message: "Reorganize",
}
// rescanBlock rescans all transactions in a single block. This is a helper // rescanBlock rescans all transactions in a single block. This is a helper
// function for handleRescan. // function for handleRescan.
func rescanBlock(wsc *wsClient, lookups *rescanKeys, blk *btcutil.Block) { func rescanBlock(wsc *wsClient, lookups *rescanKeys, blk *btcutil.Block) {
@ -1593,8 +1601,63 @@ func rescanBlock(wsc *wsClient, lookups *rescanKeys, blk *btcutil.Block) {
} }
} }
// recoverFromReorg attempts to recover from a detected reorganize during a
// rescan. It fetches a new range of block shas from the database and
// verifies that the new range of blocks is on the same fork as a previous
// range of blocks. If this condition does not hold true, the JSON-RPC error
// for an unrecoverable reorganize is returned.
func recoverFromReorg(db btcdb.Db, minBlock, maxBlock int64,
lastBlock *btcutil.Block) ([]btcwire.ShaHash, *btcjson.Error) {
hashList, err := db.FetchHeightRange(minBlock, maxBlock)
if err != nil {
rpcsLog.Errorf("Error looking up block range: %v", err)
return nil, &btcjson.ErrDatabase
}
if lastBlock == nil || len(hashList) == 0 {
return hashList, nil
}
blk, err := db.FetchBlockBySha(&hashList[0])
if err != nil {
rpcsLog.Errorf("Error looking up possibly reorged block: %v",
err)
return nil, &btcjson.ErrDatabase
}
jsonErr := descendantBlock(lastBlock, blk)
if jsonErr != nil {
return nil, jsonErr
}
return hashList, nil
}
// descendantBlock returns the appropiate JSON-RPC error if a current block
// 'cur' fetched during a reorganize is not a direct child of the parent block
// 'prev'.
func descendantBlock(prev, cur *btcutil.Block) *btcjson.Error {
curSha := &cur.MsgBlock().Header.PrevBlock
prevSha, err := prev.Sha()
if err != nil {
rpcsLog.Errorf("Unknown problem creating block sha: %v", err)
return &btcjson.ErrInternal
}
if !prevSha.IsEqual(curSha) {
rpcsLog.Errorf("Stopping rescan for reorged block %v "+
"(replaced by block %v)", prevSha, curSha)
return &ErrRescanReorg
}
return nil
}
// handleRescan implements the rescan command extension for websocket // handleRescan implements the rescan command extension for websocket
// connections. // connections.
//
// NOTE: This does not smartly handle reorgs, and fixing requires database
// changes (for safe, concurrent access to full block ranges, and support
// for other chains than the best chain). It will, however, detect whether
// a reorg removed a block that was previously processed, and result in the
// handler erroring. Clients must handle this by finding a block still in
// the chain (perhaps from a rescanprogress notification) to resume their
// rescan.
func handleRescan(wsc *wsClient, icmd btcjson.Cmd) (interface{}, *btcjson.Error) { func handleRescan(wsc *wsClient, icmd btcjson.Cmd) (interface{}, *btcjson.Error) {
cmd, ok := icmd.(*btcws.RescanCmd) cmd, ok := icmd.(*btcws.RescanCmd)
if !ok { if !ok {
@ -1678,9 +1741,30 @@ func handleRescan(wsc *wsClient, icmd btcjson.Cmd) (interface{}, *btcjson.Error)
lookups.unspent[*outpoint] = struct{}{} lookups.unspent[*outpoint] = struct{}{}
} }
minBlock := int64(cmd.BeginBlock) db := wsc.server.server.db
maxBlock := int64(cmd.EndBlock)
lastBlock := int64(-1) // -1 indicates no blocks scanned minBlockSha, err := btcwire.NewShaHashFromStr(cmd.BeginBlock)
if err != nil {
return nil, &btcjson.ErrDecodeHexString
}
minBlock, err := db.FetchBlockHeightBySha(minBlockSha)
if err != nil {
return nil, &btcjson.ErrBlockNotFound
}
maxBlock := btcdb.AllShas
if cmd.EndBlock != "" {
maxBlockSha, err := btcwire.NewShaHashFromStr(cmd.EndBlock)
if err != nil {
return nil, &btcjson.ErrDecodeHexString
}
maxBlock, err = db.FetchBlockHeightBySha(maxBlockSha)
if err != nil {
return nil, &btcjson.ErrBlockNotFound
}
}
var lastBlock *btcutil.Block
// A ticker is created to wait at least 10 seconds before notifying the // A ticker is created to wait at least 10 seconds before notifying the
// websocket client of the current progress completed by the rescan. // websocket client of the current progress completed by the rescan.
@ -1689,7 +1773,7 @@ func handleRescan(wsc *wsClient, icmd btcjson.Cmd) (interface{}, *btcjson.Error)
// FetchHeightRange may not return a complete list of block shas for // FetchHeightRange may not return a complete list of block shas for
// the given range, so fetch range as many times as necessary. // the given range, so fetch range as many times as necessary.
db := wsc.server.server.db fetchRange:
for minBlock < maxBlock { for minBlock < maxBlock {
hashList, err := db.FetchHeightRange(minBlock, maxBlock) hashList, err := db.FetchHeightRange(minBlock, maxBlock)
if err != nil { if err != nil {
@ -1700,11 +1784,56 @@ func handleRescan(wsc *wsClient, icmd btcjson.Cmd) (interface{}, *btcjson.Error)
break break
} }
loopHashList:
for i := range hashList { for i := range hashList {
blk, err := db.FetchBlockBySha(&hashList[i]) blk, err := db.FetchBlockBySha(&hashList[i])
if err != nil { if err != nil {
rpcsLog.Errorf("Error looking up block sha: %v", err) // Only handle reorgs if a block could not be
return nil, &btcjson.ErrDatabase // found for the hash.
if err != btcdb.BlockShaMissing {
rpcsLog.Errorf("Error looking up "+
"block: %v", err)
return nil, &btcjson.ErrDatabase
}
// If an absolute max block was specified, don't
// attempt to handle the reorg.
if maxBlock != btcdb.AllShas {
rpcsLog.Errorf("Stopping rescan for "+
"reorged block %v",
cmd.EndBlock)
return nil, &ErrRescanReorg
}
// If the lookup for the previously valid block
// hash failed, there may have been a reorg.
// Fetch a new range of block hashes and verify
// that the previously processed block (if there
// was any) still exists in the database. If it
// doesn't, we error.
//
// A goto is used to branch executation back to
// before the range was evaluated, as it must be
// reevaluated for the new hashList.
minBlock += int64(i)
var jsonErr *btcjson.Error
hashList, jsonErr = recoverFromReorg(db, minBlock,
maxBlock, lastBlock)
if jsonErr != nil {
return nil, jsonErr
}
if len(hashList) == 0 {
break fetchRange
}
goto loopHashList
}
if i == 0 && lastBlock != nil {
// Ensure the new hashList is on the same fork
// as the last block from the old hashList.
jsonErr := descendantBlock(lastBlock, blk)
if jsonErr != nil {
return nil, jsonErr
}
} }
// A select statement is used to stop rescans if the // A select statement is used to stop rescans if the
@ -1716,6 +1845,7 @@ func handleRescan(wsc *wsClient, icmd btcjson.Cmd) (interface{}, *btcjson.Error)
return nil, nil return nil, nil
default: default:
rescanBlock(wsc, &lookups, blk) rescanBlock(wsc, &lookups, blk)
lastBlock = blk
} }
// Periodically notify the client of the progress // Periodically notify the client of the progress
@ -1727,7 +1857,9 @@ func handleRescan(wsc *wsClient, icmd btcjson.Cmd) (interface{}, *btcjson.Error)
continue continue
} }
n := btcws.NewRescanProgressNtfn(int32(blk.Height())) n := btcws.NewRescanProgressNtfn(hashList[i].String(),
int32(blk.Height()),
blk.MsgBlock().Header.Timestamp.Unix())
mn, err := n.MarshalJSON() mn, err := n.MarshalJSON()
if err != nil { if err != nil {
rpcsLog.Errorf("Failed to marshal rescan "+ rpcsLog.Errorf("Failed to marshal rescan "+
@ -1744,7 +1876,6 @@ func handleRescan(wsc *wsClient, icmd btcjson.Cmd) (interface{}, *btcjson.Error)
} }
minBlock += int64(len(hashList)) minBlock += int64(len(hashList))
lastBlock = minBlock - 1
} }
// Notify websocket client of the finished rescan. Due to how btcd // Notify websocket client of the finished rescan. Due to how btcd
@ -1754,7 +1885,14 @@ func handleRescan(wsc *wsClient, icmd btcjson.Cmd) (interface{}, *btcjson.Error)
// received before the rescan RPC returns. Therefore, another method // received before the rescan RPC returns. Therefore, another method
// is needed to safely inform clients that all rescan notifiations have // is needed to safely inform clients that all rescan notifiations have
// been sent. // been sent.
n := btcws.NewRescanFinishedNtfn(int32(lastBlock)) blkSha, err := lastBlock.Sha()
if err != nil {
rpcsLog.Errorf("Unknown problem creating block sha: %v", err)
return nil, &btcjson.ErrInternal
}
n := btcws.NewRescanFinishedNtfn(blkSha.String(),
int32(lastBlock.Height()),
lastBlock.MsgBlock().Header.Timestamp.Unix())
if mn, err := n.MarshalJSON(); err != nil { if mn, err := n.MarshalJSON(); err != nil {
rpcsLog.Errorf("Failed to marshal rescan finished "+ rpcsLog.Errorf("Failed to marshal rescan finished "+
"notification: %v", err) "notification: %v", err)