rpc: extract primary rescan logic into scanBlockChunks helper func

This commit is contained in:
Olaoluwa Osuntokun 2019-06-28 15:43:47 -07:00
parent 962a206e94
commit 594e2ab48a

View file

@ -2321,6 +2321,198 @@ func descendantBlock(prevHash *chainhash.Hash, curBlock *btcutil.Block) error {
return nil
}
// scanBlockChunks executes a rescan in chunked stages. We do this to limit the
// amount of memory that we'll allocate to a given rescan. Every so often,
// we'll send back a rescan progress notification to the websockets client. The
// final block and block hash that we've scanned will be returned.
func scanBlockChunks(wsc *wsClient, cmd *btcjson.RescanCmd, lookups *rescanKeys, minBlock,
maxBlock int32, chain *blockchain.BlockChain) (
*btcutil.Block, *chainhash.Hash, error) {
// lastBlock and lastBlockHash track the previously-rescanned block.
// They equal nil when no previous blocks have been rescanned.
var (
lastBlock *btcutil.Block
lastBlockHash *chainhash.Hash
)
// A ticker is created to wait at least 10 seconds before notifying the
// websocket client of the current progress completed by the rescan.
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
// Instead of fetching all block shas at once, fetch in smaller chunks
// to ensure large rescans consume a limited amount of memory.
fetchRange:
for minBlock < maxBlock {
// Limit the max number of hashes to fetch at once to the
// maximum number of items allowed in a single inventory.
// This value could be higher since it's not creating inventory
// messages, but this mirrors the limiting logic used in the
// peer-to-peer protocol.
maxLoopBlock := maxBlock
if maxLoopBlock-minBlock > wire.MaxInvPerMsg {
maxLoopBlock = minBlock + wire.MaxInvPerMsg
}
hashList, err := chain.HeightRange(minBlock, maxLoopBlock)
if err != nil {
rpcsLog.Errorf("Error looking up block range: %v", err)
return nil, nil, &btcjson.RPCError{
Code: btcjson.ErrRPCDatabase,
Message: "Database error: " + err.Error(),
}
}
if len(hashList) == 0 {
// The rescan is finished if no blocks hashes for this
// range were successfully fetched and a stop block
// was provided.
if maxBlock != math.MaxInt32 {
break
}
// If the rescan is through the current block, set up
// the client to continue to receive notifications
// regarding all rescanned addresses and the current set
// of unspent outputs.
//
// This is done safely by temporarily grabbing exclusive
// access of the block manager. If no more blocks have
// been attached between this pause and the fetch above,
// then it is safe to register the websocket client for
// continuous notifications if necessary. Otherwise,
// continue the fetch loop again to rescan the new
// blocks (or error due to an irrecoverable reorganize).
pauseGuard := wsc.server.cfg.SyncMgr.Pause()
best := wsc.server.cfg.Chain.BestSnapshot()
curHash := &best.Hash
again := true
if lastBlockHash == nil || *lastBlockHash == *curHash {
again = false
n := wsc.server.ntfnMgr
n.RegisterSpentRequests(wsc, lookups.unspentSlice())
n.RegisterTxOutAddressRequests(wsc, cmd.Addresses)
}
close(pauseGuard)
if err != nil {
rpcsLog.Errorf("Error fetching best block "+
"hash: %v", err)
return nil, nil, &btcjson.RPCError{
Code: btcjson.ErrRPCDatabase,
Message: "Database error: " +
err.Error(),
}
}
if again {
continue
}
break
}
loopHashList:
for i := range hashList {
blk, err := chain.BlockByHash(&hashList[i])
if err != nil {
// Only handle reorgs if a block could not be
// found for the hash.
if dbErr, ok := err.(database.Error); !ok ||
dbErr.ErrorCode != database.ErrBlockNotFound {
rpcsLog.Errorf("Error looking up "+
"block: %v", err)
return nil, nil, &btcjson.RPCError{
Code: btcjson.ErrRPCDatabase,
Message: "Database error: " +
err.Error(),
}
}
// If an absolute max block was specified, don't
// attempt to handle the reorg.
if maxBlock != math.MaxInt32 {
rpcsLog.Errorf("Stopping rescan for "+
"reorged block %v",
cmd.EndBlock)
return nil, nil, &ErrRescanReorg
}
// If the lookup for the previously valid block
// hash failed, there may have been a reorg.
// Fetch a new range of block hashes and verify
// that the previously processed block (if there
// was any) still exists in the database. If it
// doesn't, we error.
//
// A goto is used to branch executation back to
// before the range was evaluated, as it must be
// reevaluated for the new hashList.
minBlock += int32(i)
hashList, err = recoverFromReorg(
chain, minBlock, maxBlock, lastBlockHash,
)
if err != nil {
return nil, nil, err
}
if len(hashList) == 0 {
break fetchRange
}
goto loopHashList
}
if i == 0 && lastBlockHash != nil {
// Ensure the new hashList is on the same fork
// as the last block from the old hashList.
jsonErr := descendantBlock(lastBlockHash, blk)
if jsonErr != nil {
return nil, nil, jsonErr
}
}
// A select statement is used to stop rescans if the
// client requesting the rescan has disconnected.
select {
case <-wsc.quit:
rpcsLog.Debugf("Stopped rescan at height %v "+
"for disconnected client", blk.Height())
return nil, nil, nil
default:
rescanBlock(wsc, lookups, blk)
lastBlock = blk
lastBlockHash = blk.Hash()
}
// Periodically notify the client of the progress
// completed. Continue with next block if no progress
// notification is needed yet.
select {
case <-ticker.C: // fallthrough
default:
continue
}
n := btcjson.NewRescanProgressNtfn(
hashList[i].String(), blk.Height(),
blk.MsgBlock().Header.Timestamp.Unix(),
)
mn, err := btcjson.MarshalCmd(nil, n)
if err != nil {
rpcsLog.Errorf("Failed to marshal rescan "+
"progress notification: %v", err)
continue
}
if err = wsc.QueueNotification(mn); err == ErrClientQuit {
// Finished if the client disconnected.
rpcsLog.Debugf("Stopped rescan at height %v "+
"for disconnected client", blk.Height())
return nil, nil, nil
}
}
minBlock += int32(len(hashList))
}
return lastBlock, lastBlockHash, nil
}
// handleRescan implements the rescan command extension for websocket
// connections.
//
@ -2396,181 +2588,15 @@ func handleRescan(wsc *wsClient, icmd interface{}) (interface{}, error) {
}
}
// lastBlock and lastBlockHash track the previously-rescanned block.
// They equal nil when no previous blocks have been rescanned.
var lastBlock *btcutil.Block
var lastBlockHash *chainhash.Hash
// A ticker is created to wait at least 10 seconds before notifying the
// websocket client of the current progress completed by the rescan.
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
// Instead of fetching all block shas at once, fetch in smaller chunks
// to ensure large rescans consume a limited amount of memory.
fetchRange:
for minBlock < maxBlock {
// Limit the max number of hashes to fetch at once to the
// maximum number of items allowed in a single inventory.
// This value could be higher since it's not creating inventory
// messages, but this mirrors the limiting logic used in the
// peer-to-peer protocol.
maxLoopBlock := maxBlock
if maxLoopBlock-minBlock > wire.MaxInvPerMsg {
maxLoopBlock = minBlock + wire.MaxInvPerMsg
}
hashList, err := chain.HeightRange(minBlock, maxLoopBlock)
// With all the arguments parsed, we'll execute our chunked rescan
// which will notify the clients of any address deposits or output
// spends.
lastBlock, lastBlockHash, err = scanBlockChunks(
wsc, cmd, &lookups, minBlock, maxBlock, chain,
)
if err != nil {
rpcsLog.Errorf("Error looking up block range: %v", err)
return nil, &btcjson.RPCError{
Code: btcjson.ErrRPCDatabase,
Message: "Database error: " + err.Error(),
}
return nil, err
}
if len(hashList) == 0 {
// The rescan is finished if no blocks hashes for this
// range were successfully fetched and a stop block
// was provided.
if maxBlock != math.MaxInt32 {
break
}
// If the rescan is through the current block, set up
// the client to continue to receive notifications
// regarding all rescanned addresses and the current set
// of unspent outputs.
//
// This is done safely by temporarily grabbing exclusive
// access of the block manager. If no more blocks have
// been attached between this pause and the fetch above,
// then it is safe to register the websocket client for
// continuous notifications if necessary. Otherwise,
// continue the fetch loop again to rescan the new
// blocks (or error due to an irrecoverable reorganize).
pauseGuard := wsc.server.cfg.SyncMgr.Pause()
best := wsc.server.cfg.Chain.BestSnapshot()
curHash := &best.Hash
again := true
if lastBlockHash == nil || *lastBlockHash == *curHash {
again = false
n := wsc.server.ntfnMgr
n.RegisterSpentRequests(wsc, lookups.unspentSlice())
n.RegisterTxOutAddressRequests(wsc, cmd.Addresses)
}
close(pauseGuard)
if err != nil {
rpcsLog.Errorf("Error fetching best block "+
"hash: %v", err)
return nil, &btcjson.RPCError{
Code: btcjson.ErrRPCDatabase,
Message: "Database error: " +
err.Error(),
}
}
if again {
continue
}
break
}
loopHashList:
for i := range hashList {
blk, err := chain.BlockByHash(&hashList[i])
if err != nil {
// Only handle reorgs if a block could not be
// found for the hash.
if dbErr, ok := err.(database.Error); !ok ||
dbErr.ErrorCode != database.ErrBlockNotFound {
rpcsLog.Errorf("Error looking up "+
"block: %v", err)
return nil, &btcjson.RPCError{
Code: btcjson.ErrRPCDatabase,
Message: "Database error: " +
err.Error(),
}
}
// If an absolute max block was specified, don't
// attempt to handle the reorg.
if maxBlock != math.MaxInt32 {
rpcsLog.Errorf("Stopping rescan for "+
"reorged block %v",
cmd.EndBlock)
return nil, &ErrRescanReorg
}
// If the lookup for the previously valid block
// hash failed, there may have been a reorg.
// Fetch a new range of block hashes and verify
// that the previously processed block (if there
// was any) still exists in the database. If it
// doesn't, we error.
//
// A goto is used to branch executation back to
// before the range was evaluated, as it must be
// reevaluated for the new hashList.
minBlock += int32(i)
hashList, err = recoverFromReorg(chain,
minBlock, maxBlock, lastBlockHash)
if err != nil {
return nil, err
}
if len(hashList) == 0 {
break fetchRange
}
goto loopHashList
}
if i == 0 && lastBlockHash != nil {
// Ensure the new hashList is on the same fork
// as the last block from the old hashList.
jsonErr := descendantBlock(lastBlockHash, blk)
if jsonErr != nil {
return nil, jsonErr
}
}
// A select statement is used to stop rescans if the
// client requesting the rescan has disconnected.
select {
case <-wsc.quit:
rpcsLog.Debugf("Stopped rescan at height %v "+
"for disconnected client", blk.Height())
return nil, nil
default:
rescanBlock(wsc, &lookups, blk)
lastBlock = blk
lastBlockHash = blk.Hash()
}
// Periodically notify the client of the progress
// completed. Continue with next block if no progress
// notification is needed yet.
select {
case <-ticker.C: // fallthrough
default:
continue
}
n := btcjson.NewRescanProgressNtfn(hashList[i].String(),
blk.Height(), blk.MsgBlock().Header.Timestamp.Unix())
mn, err := btcjson.MarshalCmd(nil, n)
if err != nil {
rpcsLog.Errorf("Failed to marshal rescan "+
"progress notification: %v", err)
continue
}
if err = wsc.QueueNotification(mn); err == ErrClientQuit {
// Finished if the client disconnected.
rpcsLog.Debugf("Stopped rescan at height %v "+
"for disconnected client", blk.Height())
return nil, nil
}
}
minBlock += int32(len(hashList))
}
// Notify websocket client of the finished rescan. Due to how btcd
// asynchronously queues notifications to not block calling code,
@ -2579,9 +2605,10 @@ fetchRange:
// received before the rescan RPC returns. Therefore, another method
// is needed to safely inform clients that all rescan notifications have
// been sent.
n := btcjson.NewRescanFinishedNtfn(lastBlockHash.String(),
lastBlock.Height(),
lastBlock.MsgBlock().Header.Timestamp.Unix())
n := btcjson.NewRescanFinishedNtfn(
lastBlockHash.String(), lastBlock.Height(),
lastBlock.MsgBlock().Header.Timestamp.Unix(),
)
if mn, err := btcjson.MarshalCmd(nil, n); err != nil {
rpcsLog.Errorf("Failed to marshal rescan finished "+
"notification: %v", err)