From 594e2ab48a8e309c910c4fee7ce73a529e26aa52 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Fri, 28 Jun 2019 15:43:47 -0700 Subject: [PATCH] rpc: extract primary rescan logic into scanBlockChunks helper func --- rpcwebsocket.go | 379 ++++++++++++++++++++++++++---------------------- 1 file changed, 203 insertions(+), 176 deletions(-) diff --git a/rpcwebsocket.go b/rpcwebsocket.go index eddac5ab..85e1eaa3 100644 --- a/rpcwebsocket.go +++ b/rpcwebsocket.go @@ -2321,6 +2321,198 @@ func descendantBlock(prevHash *chainhash.Hash, curBlock *btcutil.Block) error { return nil } +// scanBlockChunks executes a rescan in chunked stages. We do this to limit the +// amount of memory that we'll allocate to a given rescan. Every so often, +// we'll send back a rescan progress notification to the websockets client. The +// final block and block hash that we've scanned will be returned. +func scanBlockChunks(wsc *wsClient, cmd *btcjson.RescanCmd, lookups *rescanKeys, minBlock, + maxBlock int32, chain *blockchain.BlockChain) ( + *btcutil.Block, *chainhash.Hash, error) { + + // lastBlock and lastBlockHash track the previously-rescanned block. + // They equal nil when no previous blocks have been rescanned. + var ( + lastBlock *btcutil.Block + lastBlockHash *chainhash.Hash + ) + + // A ticker is created to wait at least 10 seconds before notifying the + // websocket client of the current progress completed by the rescan. + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + + // Instead of fetching all block shas at once, fetch in smaller chunks + // to ensure large rescans consume a limited amount of memory. +fetchRange: + for minBlock < maxBlock { + // Limit the max number of hashes to fetch at once to the + // maximum number of items allowed in a single inventory. + // This value could be higher since it's not creating inventory + // messages, but this mirrors the limiting logic used in the + // peer-to-peer protocol. + maxLoopBlock := maxBlock + if maxLoopBlock-minBlock > wire.MaxInvPerMsg { + maxLoopBlock = minBlock + wire.MaxInvPerMsg + } + hashList, err := chain.HeightRange(minBlock, maxLoopBlock) + if err != nil { + rpcsLog.Errorf("Error looking up block range: %v", err) + return nil, nil, &btcjson.RPCError{ + Code: btcjson.ErrRPCDatabase, + Message: "Database error: " + err.Error(), + } + } + if len(hashList) == 0 { + // The rescan is finished if no blocks hashes for this + // range were successfully fetched and a stop block + // was provided. + if maxBlock != math.MaxInt32 { + break + } + + // If the rescan is through the current block, set up + // the client to continue to receive notifications + // regarding all rescanned addresses and the current set + // of unspent outputs. + // + // This is done safely by temporarily grabbing exclusive + // access of the block manager. If no more blocks have + // been attached between this pause and the fetch above, + // then it is safe to register the websocket client for + // continuous notifications if necessary. Otherwise, + // continue the fetch loop again to rescan the new + // blocks (or error due to an irrecoverable reorganize). + pauseGuard := wsc.server.cfg.SyncMgr.Pause() + best := wsc.server.cfg.Chain.BestSnapshot() + curHash := &best.Hash + again := true + if lastBlockHash == nil || *lastBlockHash == *curHash { + again = false + n := wsc.server.ntfnMgr + n.RegisterSpentRequests(wsc, lookups.unspentSlice()) + n.RegisterTxOutAddressRequests(wsc, cmd.Addresses) + } + close(pauseGuard) + if err != nil { + rpcsLog.Errorf("Error fetching best block "+ + "hash: %v", err) + return nil, nil, &btcjson.RPCError{ + Code: btcjson.ErrRPCDatabase, + Message: "Database error: " + + err.Error(), + } + } + if again { + continue + } + break + } + + loopHashList: + for i := range hashList { + blk, err := chain.BlockByHash(&hashList[i]) + if err != nil { + // Only handle reorgs if a block could not be + // found for the hash. + if dbErr, ok := err.(database.Error); !ok || + dbErr.ErrorCode != database.ErrBlockNotFound { + + rpcsLog.Errorf("Error looking up "+ + "block: %v", err) + return nil, nil, &btcjson.RPCError{ + Code: btcjson.ErrRPCDatabase, + Message: "Database error: " + + err.Error(), + } + } + + // If an absolute max block was specified, don't + // attempt to handle the reorg. + if maxBlock != math.MaxInt32 { + rpcsLog.Errorf("Stopping rescan for "+ + "reorged block %v", + cmd.EndBlock) + return nil, nil, &ErrRescanReorg + } + + // If the lookup for the previously valid block + // hash failed, there may have been a reorg. + // Fetch a new range of block hashes and verify + // that the previously processed block (if there + // was any) still exists in the database. If it + // doesn't, we error. + // + // A goto is used to branch executation back to + // before the range was evaluated, as it must be + // reevaluated for the new hashList. + minBlock += int32(i) + hashList, err = recoverFromReorg( + chain, minBlock, maxBlock, lastBlockHash, + ) + if err != nil { + return nil, nil, err + } + if len(hashList) == 0 { + break fetchRange + } + goto loopHashList + } + if i == 0 && lastBlockHash != nil { + // Ensure the new hashList is on the same fork + // as the last block from the old hashList. + jsonErr := descendantBlock(lastBlockHash, blk) + if jsonErr != nil { + return nil, nil, jsonErr + } + } + + // A select statement is used to stop rescans if the + // client requesting the rescan has disconnected. + select { + case <-wsc.quit: + rpcsLog.Debugf("Stopped rescan at height %v "+ + "for disconnected client", blk.Height()) + return nil, nil, nil + default: + rescanBlock(wsc, lookups, blk) + lastBlock = blk + lastBlockHash = blk.Hash() + } + + // Periodically notify the client of the progress + // completed. Continue with next block if no progress + // notification is needed yet. + select { + case <-ticker.C: // fallthrough + default: + continue + } + + n := btcjson.NewRescanProgressNtfn( + hashList[i].String(), blk.Height(), + blk.MsgBlock().Header.Timestamp.Unix(), + ) + mn, err := btcjson.MarshalCmd(nil, n) + if err != nil { + rpcsLog.Errorf("Failed to marshal rescan "+ + "progress notification: %v", err) + continue + } + + if err = wsc.QueueNotification(mn); err == ErrClientQuit { + // Finished if the client disconnected. + rpcsLog.Debugf("Stopped rescan at height %v "+ + "for disconnected client", blk.Height()) + return nil, nil, nil + } + } + + minBlock += int32(len(hashList)) + } + + return lastBlock, lastBlockHash, nil +} + // handleRescan implements the rescan command extension for websocket // connections. // @@ -2396,181 +2588,15 @@ func handleRescan(wsc *wsClient, icmd interface{}) (interface{}, error) { } } - // lastBlock and lastBlockHash track the previously-rescanned block. - // They equal nil when no previous blocks have been rescanned. - var lastBlock *btcutil.Block - var lastBlockHash *chainhash.Hash - - // A ticker is created to wait at least 10 seconds before notifying the - // websocket client of the current progress completed by the rescan. - ticker := time.NewTicker(10 * time.Second) - defer ticker.Stop() - - // Instead of fetching all block shas at once, fetch in smaller chunks - // to ensure large rescans consume a limited amount of memory. -fetchRange: - for minBlock < maxBlock { - // Limit the max number of hashes to fetch at once to the - // maximum number of items allowed in a single inventory. - // This value could be higher since it's not creating inventory - // messages, but this mirrors the limiting logic used in the - // peer-to-peer protocol. - maxLoopBlock := maxBlock - if maxLoopBlock-minBlock > wire.MaxInvPerMsg { - maxLoopBlock = minBlock + wire.MaxInvPerMsg - } - hashList, err := chain.HeightRange(minBlock, maxLoopBlock) + // With all the arguments parsed, we'll execute our chunked rescan + // which will notify the clients of any address deposits or output + // spends. + lastBlock, lastBlockHash, err = scanBlockChunks( + wsc, cmd, &lookups, minBlock, maxBlock, chain, + ) if err != nil { - rpcsLog.Errorf("Error looking up block range: %v", err) - return nil, &btcjson.RPCError{ - Code: btcjson.ErrRPCDatabase, - Message: "Database error: " + err.Error(), - } + return nil, err } - if len(hashList) == 0 { - // The rescan is finished if no blocks hashes for this - // range were successfully fetched and a stop block - // was provided. - if maxBlock != math.MaxInt32 { - break - } - - // If the rescan is through the current block, set up - // the client to continue to receive notifications - // regarding all rescanned addresses and the current set - // of unspent outputs. - // - // This is done safely by temporarily grabbing exclusive - // access of the block manager. If no more blocks have - // been attached between this pause and the fetch above, - // then it is safe to register the websocket client for - // continuous notifications if necessary. Otherwise, - // continue the fetch loop again to rescan the new - // blocks (or error due to an irrecoverable reorganize). - pauseGuard := wsc.server.cfg.SyncMgr.Pause() - best := wsc.server.cfg.Chain.BestSnapshot() - curHash := &best.Hash - again := true - if lastBlockHash == nil || *lastBlockHash == *curHash { - again = false - n := wsc.server.ntfnMgr - n.RegisterSpentRequests(wsc, lookups.unspentSlice()) - n.RegisterTxOutAddressRequests(wsc, cmd.Addresses) - } - close(pauseGuard) - if err != nil { - rpcsLog.Errorf("Error fetching best block "+ - "hash: %v", err) - return nil, &btcjson.RPCError{ - Code: btcjson.ErrRPCDatabase, - Message: "Database error: " + - err.Error(), - } - } - if again { - continue - } - break - } - - loopHashList: - for i := range hashList { - blk, err := chain.BlockByHash(&hashList[i]) - if err != nil { - // Only handle reorgs if a block could not be - // found for the hash. - if dbErr, ok := err.(database.Error); !ok || - dbErr.ErrorCode != database.ErrBlockNotFound { - - rpcsLog.Errorf("Error looking up "+ - "block: %v", err) - return nil, &btcjson.RPCError{ - Code: btcjson.ErrRPCDatabase, - Message: "Database error: " + - err.Error(), - } - } - - // If an absolute max block was specified, don't - // attempt to handle the reorg. - if maxBlock != math.MaxInt32 { - rpcsLog.Errorf("Stopping rescan for "+ - "reorged block %v", - cmd.EndBlock) - return nil, &ErrRescanReorg - } - - // If the lookup for the previously valid block - // hash failed, there may have been a reorg. - // Fetch a new range of block hashes and verify - // that the previously processed block (if there - // was any) still exists in the database. If it - // doesn't, we error. - // - // A goto is used to branch executation back to - // before the range was evaluated, as it must be - // reevaluated for the new hashList. - minBlock += int32(i) - hashList, err = recoverFromReorg(chain, - minBlock, maxBlock, lastBlockHash) - if err != nil { - return nil, err - } - if len(hashList) == 0 { - break fetchRange - } - goto loopHashList - } - if i == 0 && lastBlockHash != nil { - // Ensure the new hashList is on the same fork - // as the last block from the old hashList. - jsonErr := descendantBlock(lastBlockHash, blk) - if jsonErr != nil { - return nil, jsonErr - } - } - - // A select statement is used to stop rescans if the - // client requesting the rescan has disconnected. - select { - case <-wsc.quit: - rpcsLog.Debugf("Stopped rescan at height %v "+ - "for disconnected client", blk.Height()) - return nil, nil - default: - rescanBlock(wsc, &lookups, blk) - lastBlock = blk - lastBlockHash = blk.Hash() - } - - // Periodically notify the client of the progress - // completed. Continue with next block if no progress - // notification is needed yet. - select { - case <-ticker.C: // fallthrough - default: - continue - } - - n := btcjson.NewRescanProgressNtfn(hashList[i].String(), - blk.Height(), blk.MsgBlock().Header.Timestamp.Unix()) - mn, err := btcjson.MarshalCmd(nil, n) - if err != nil { - rpcsLog.Errorf("Failed to marshal rescan "+ - "progress notification: %v", err) - continue - } - - if err = wsc.QueueNotification(mn); err == ErrClientQuit { - // Finished if the client disconnected. - rpcsLog.Debugf("Stopped rescan at height %v "+ - "for disconnected client", blk.Height()) - return nil, nil - } - } - - minBlock += int32(len(hashList)) - } // Notify websocket client of the finished rescan. Due to how btcd // asynchronously queues notifications to not block calling code, @@ -2579,9 +2605,10 @@ fetchRange: // received before the rescan RPC returns. Therefore, another method // is needed to safely inform clients that all rescan notifications have // been sent. - n := btcjson.NewRescanFinishedNtfn(lastBlockHash.String(), - lastBlock.Height(), - lastBlock.MsgBlock().Header.Timestamp.Unix()) + n := btcjson.NewRescanFinishedNtfn( + lastBlockHash.String(), lastBlock.Height(), + lastBlock.MsgBlock().Header.Timestamp.Unix(), + ) if mn, err := btcjson.MarshalCmd(nil, n); err != nil { rpcsLog.Errorf("Failed to marshal rescan finished "+ "notification: %v", err)