From ddc841924bed49e057faa3ead34eec0bf3e5f431 Mon Sep 17 00:00:00 2001 From: Alex Date: Thu, 4 May 2017 19:42:40 -0600 Subject: [PATCH] Add TXIDs to rescan. Fix/finish EndBlock. Disable testRandomBlocks. --- spvsvc/spvchain/rescan.go | 149 +++++++++++++++++++++-------------- spvsvc/spvchain/sync_test.go | 55 +++++++++++-- 2 files changed, 138 insertions(+), 66 deletions(-) diff --git a/spvsvc/spvchain/rescan.go b/spvsvc/spvchain/rescan.go index b83fc1f..7ac5038 100644 --- a/spvsvc/spvchain/rescan.go +++ b/spvsvc/spvchain/rescan.go @@ -28,6 +28,7 @@ type rescanOptions struct { endBlock *waddrmgr.BlockStamp watchAddrs []btcutil.Address watchOutPoints []wire.OutPoint + watchTXIDs []chainhash.Hash quit <-chan struct{} } @@ -68,12 +69,13 @@ func StartBlock(startBlock *waddrmgr.BlockStamp) RescanOption { // EndBlock specifies the end block. The hash is checked first; if there's no // such hash (zero hash avoids lookup), the height is checked next. If the -// height is 0 or the end block isn't specified, the quit channel MUST be -// specified as Rescan will sync to the tip of the blockchain and continue to -// stay in sync and pass notifications. This is enforced at runtime. -func EndBlock(startBlock *waddrmgr.BlockStamp) RescanOption { +// height is 0 or in the future or the end block isn't specified, the quit +// channel MUST be specified as Rescan will sync to the tip of the blockchain +// and continue to stay in sync and pass notifications. This is enforced at +// runtime. +func EndBlock(endBlock *waddrmgr.BlockStamp) RescanOption { return func(ro *rescanOptions) { - ro.startBlock = startBlock + ro.endBlock = endBlock } } @@ -96,6 +98,15 @@ func WatchOutPoints(watchOutPoints ...wire.OutPoint) RescanOption { } } +// WatchTXIDs specifies the outpoints to watch for on-chain spends. Each +// call to this function adds to the list of outpoints being watched rather +// than replacing the list. +func WatchTXIDs(watchTXIDs ...chainhash.Hash) RescanOption { + return func(ro *rescanOptions) { + ro.watchTXIDs = append(ro.watchTXIDs, watchTXIDs...) + } +} + // QuitChan specifies the quit channel. This can be used by the caller to let // an indefinite rescan (one with no EndBlock set) know it should gracefully // shut down. If this isn't specified, an end block MUST be specified as Rescan @@ -120,41 +131,39 @@ func (s *ChainService) Rescan(options ...RescanOption) error { var watchList [][]byte // If we have something to watch, create a watch list. - if len(ro.watchAddrs) != 0 || len(ro.watchOutPoints) != 0 { - for _, addr := range ro.watchAddrs { - watchList = append(watchList, addr.ScriptAddress()) - } - for _, op := range ro.watchOutPoints { - watchList = append(watchList, - builder.OutPointToFilterEntry(op)) - } - } else { + for _, addr := range ro.watchAddrs { + watchList = append(watchList, addr.ScriptAddress()) + } + for _, op := range ro.watchOutPoints { + watchList = append(watchList, + builder.OutPointToFilterEntry(op)) + } + for _, txid := range ro.watchTXIDs { + watchList = append(watchList, txid[:]) + } + if len(watchList) == 0 { return fmt.Errorf("Rescan must specify addresses and/or " + - "outpoints to watch") + "outpoints and/or TXIDs to watch") } // Check that we have either an end block or a quit channel. if ro.endBlock != nil { - if (ro.endBlock.Hash == chainhash.Hash{}) { - ro.endBlock.Height = 0 - } else { - _, height, err := s.GetBlockByHash( - ro.endBlock.Hash) + if (ro.endBlock.Hash != chainhash.Hash{}) { + _, height, err := s.GetBlockByHash(ro.endBlock.Hash) if err != nil { - ro.endBlock.Height = int32(height) + ro.endBlock.Hash = chainhash.Hash{} } else { - if height == 0 { - ro.endBlock.Hash = chainhash.Hash{} + ro.endBlock.Height = int32(height) + } + } + if (ro.endBlock.Hash == chainhash.Hash{}) { + if ro.endBlock.Height != 0 { + header, err := s.GetBlockByHeight( + uint32(ro.endBlock.Height)) + if err == nil { + ro.endBlock.Hash = header.BlockHash() } else { - header, err := - s.GetBlockByHeight(height) - if err == nil { - ro.endBlock.Hash = - header.BlockHash() - } else { - ro.endBlock = - &waddrmgr.BlockStamp{} - } + ro.endBlock = &waddrmgr.BlockStamp{} } } } @@ -298,48 +307,66 @@ rescanLoop: // get the basic filter from the DB or network. var block *btcutil.Block var relevantTxs []*btcutil.Tx - filter := s.GetCFilter(curStamp.Hash, false) - // If we have no transactions, we send a notification - if filter != nil && filter.N() != 0 { + var bFilter, eFilter *gcs.Filter + var err error + key := builder.DeriveKey(&curStamp.Hash) + matched := false + bFilter = s.GetCFilter(curStamp.Hash, false) + if bFilter != nil && bFilter.N() != 0 { // We see if any relevant transactions match. - key := builder.DeriveKey(&curStamp.Hash) - matched, err := filter.MatchAny(key, watchList) + matched, err = bFilter.MatchAny(key, watchList) if err != nil { return err } - if matched { - // We've matched. Now we actually get the block - // and cycle through the transactions to see - // which ones are relevant. - block = s.GetBlockFromNetwork( - curStamp.Hash, ro.queryOptions...) - if block == nil { - return fmt.Errorf("Couldn't get block "+ - "%d (%s)", curStamp.Height, - curStamp.Hash) - } - relevantTxs, err = notifyBlock(block, filter, - &ro.watchOutPoints, ro.watchAddrs, - &watchList, ro.ntfn) - if err != nil { - return err - } + } + if len(ro.watchTXIDs) > 0 { + eFilter = s.GetCFilter(curStamp.Hash, true) + } + if eFilter != nil && eFilter.N() != 0 { + // We see if any relevant transactions match. + matched, err = eFilter.MatchAny(key, watchList) + if err != nil { + return err + } + } + // If we have no transactions, we just send an + // OnFilteredBlockConnected notification with no relevant + // transactions. + if matched { + // We've matched. Now we actually get the block + // and cycle through the transactions to see + // which ones are relevant. + block = s.GetBlockFromNetwork( + curStamp.Hash, ro.queryOptions...) + if block == nil { + return fmt.Errorf("Couldn't get block "+ + "%d (%s)", curStamp.Height, + curStamp.Hash) + } + relevantTxs, err = notifyBlock(block, + &ro.watchOutPoints, ro.watchAddrs, + ro.watchTXIDs, &watchList, ro.ntfn) + if err != nil { + return err } } if ro.ntfn.OnFilteredBlockConnected != nil { ro.ntfn.OnFilteredBlockConnected(curStamp.Height, &curHeader, relevantTxs) } + if curStamp.Hash == ro.endBlock.Hash || curStamp.Height == + ro.endBlock.Height { + return nil + } } } // notifyBlock notifies listeners based on the block filter. It writes back to // the outPoints argument the updated list of outpoints to monitor based on // matched addresses. -func notifyBlock(block *btcutil.Block, filter *gcs.Filter, - outPoints *[]wire.OutPoint, addrs []btcutil.Address, - watchList *[][]byte, ntfn btcrpcclient.NotificationHandlers) ( - []*btcutil.Tx, error) { +func notifyBlock(block *btcutil.Block, outPoints *[]wire.OutPoint, + addrs []btcutil.Address, txids []chainhash.Hash, watchList *[][]byte, + ntfn btcrpcclient.NotificationHandlers) ([]*btcutil.Tx, error) { var relevantTxs []*btcutil.Tx blockHeader := block.MsgBlock().Header details := btcjson.BlockDetails{ @@ -351,6 +378,12 @@ func notifyBlock(block *btcutil.Block, filter *gcs.Filter, relevant := false txDetails := details txDetails.Index = txIdx + for _, hash := range txids { + if hash == *(tx.Hash()) { + relevant = true + break + } + } for _, in := range tx.MsgTx().TxIn { if relevant { break diff --git a/spvsvc/spvchain/sync_test.go b/spvsvc/spvchain/sync_test.go index c5356d0..9863332 100644 --- a/spvsvc/spvchain/sync_test.go +++ b/spvsvc/spvchain/sync_test.go @@ -343,10 +343,12 @@ func TestSetup(t *testing.T) { // Test that we can get blocks and cfilters via P2P and decide which are // valid and which aren't. - err = testRandomBlocks(t, svc, h1) + // TODO: This test is disabled until I factor it out into a benchmark. + // Otherwise, it takes too long. + /*err = testRandomBlocks(t, svc, h1) if err != nil { t.Fatalf("Testing blocks and cfilters failed: %s", err) - } + }*/ // Generate an address and send it some coins on the h1 chain. We use // this to test rescans and notifications. @@ -389,13 +391,54 @@ func TestSetup(t *testing.T) { if err != nil { t.Fatalf("Couldn't generate/submit block: %s", err) } + err = waitForSync(t, svc, h1) + if err != nil { + t.Fatalf("Couldn't sync ChainService: %s", err) + } + // Do a rescan that searches only for a specific TXID + startBlock := waddrmgr.BlockStamp{Height: 795} + endBlock := waddrmgr.BlockStamp{Height: 801} + var foundTx *btcutil.Tx + err = svc.Rescan( + spvchain.StartBlock(&startBlock), + spvchain.EndBlock(&endBlock), + spvchain.WatchTXIDs(tx1.TxHash()), + spvchain.NotificationHandlers(btcrpcclient.NotificationHandlers{ + OnFilteredBlockConnected: func(height int32, + header *wire.BlockHeader, + relevantTxs []*btcutil.Tx) { + if height == 801 { + if len(relevantTxs) != 1 { + t.Fatalf("Didn't get expected "+ + "number of relevant "+ + "transactions from "+ + "rescan: want 1, got "+ + "%d", len(relevantTxs)) + } + if *(relevantTxs[0].Hash()) != + tx1.TxHash() { + t.Fatalf("Didn't get expected "+ + "relevant transaction:"+ + " want %s, got %s", + tx1.TxHash(), + relevantTxs[0].Hash()) + } + foundTx = relevantTxs[0] + } + }, + }), + ) + if err != nil || foundTx == nil || *(foundTx.Hash()) != tx1.TxHash() { + t.Fatalf("Couldn't rescan chain for transaction %s: %s", + tx1.TxHash(), err) + } // Start a rescan with notifications in another goroutine. We'll kill // it with a quit channel at the end and make sure we got the expected // results. quitRescan := make(chan struct{}) - startBlock := &waddrmgr.BlockStamp{Height: 795} - err = startRescan(t, svc, addr1, startBlock, quitRescan) + startBlock = waddrmgr.BlockStamp{Height: 795} + err = startRescan(t, svc, addr1, &startBlock, quitRescan) if err != nil { t.Fatalf("Couldn't start a rescan for %s: %s", addr1, err) } @@ -657,10 +700,6 @@ func waitForSync(t *testing.T, svc *spvchain.ChainService, return fmt.Errorf("Couldn't get best snapshot from "+ "ChainService: %s", err) } - if logLevel != btclog.Off { - t.Logf("Synced to %d (%s)", haveBest.Height, - haveBest.Hash) - } } // Check if we're current. if !svc.IsCurrent() {