Add TXIDs to rescan. Fix/finish EndBlock. Disable testRandomBlocks.

This commit is contained in:
Alex 2017-05-04 19:42:40 -06:00 committed by Olaoluwa Osuntokun
parent e7bae84662
commit ddc841924b
2 changed files with 138 additions and 66 deletions

View file

@ -28,6 +28,7 @@ type rescanOptions struct {
endBlock *waddrmgr.BlockStamp
watchAddrs []btcutil.Address
watchOutPoints []wire.OutPoint
watchTXIDs []chainhash.Hash
quit <-chan struct{}
}
@ -68,12 +69,13 @@ func StartBlock(startBlock *waddrmgr.BlockStamp) RescanOption {
// EndBlock specifies the end block. The hash is checked first; if there's no
// such hash (zero hash avoids lookup), the height is checked next. If the
// height is 0 or the end block isn't specified, the quit channel MUST be
// specified as Rescan will sync to the tip of the blockchain and continue to
// stay in sync and pass notifications. This is enforced at runtime.
func EndBlock(startBlock *waddrmgr.BlockStamp) RescanOption {
// height is 0 or in the future or the end block isn't specified, the quit
// channel MUST be specified as Rescan will sync to the tip of the blockchain
// and continue to stay in sync and pass notifications. This is enforced at
// runtime.
func EndBlock(endBlock *waddrmgr.BlockStamp) RescanOption {
return func(ro *rescanOptions) {
ro.startBlock = startBlock
ro.endBlock = endBlock
}
}
@ -96,6 +98,15 @@ func WatchOutPoints(watchOutPoints ...wire.OutPoint) RescanOption {
}
}
// WatchTXIDs specifies the outpoints to watch for on-chain spends. Each
// call to this function adds to the list of outpoints being watched rather
// than replacing the list.
func WatchTXIDs(watchTXIDs ...chainhash.Hash) RescanOption {
return func(ro *rescanOptions) {
ro.watchTXIDs = append(ro.watchTXIDs, watchTXIDs...)
}
}
// QuitChan specifies the quit channel. This can be used by the caller to let
// an indefinite rescan (one with no EndBlock set) know it should gracefully
// shut down. If this isn't specified, an end block MUST be specified as Rescan
@ -120,41 +131,39 @@ func (s *ChainService) Rescan(options ...RescanOption) error {
var watchList [][]byte
// If we have something to watch, create a watch list.
if len(ro.watchAddrs) != 0 || len(ro.watchOutPoints) != 0 {
for _, addr := range ro.watchAddrs {
watchList = append(watchList, addr.ScriptAddress())
}
for _, op := range ro.watchOutPoints {
watchList = append(watchList,
builder.OutPointToFilterEntry(op))
}
} else {
for _, addr := range ro.watchAddrs {
watchList = append(watchList, addr.ScriptAddress())
}
for _, op := range ro.watchOutPoints {
watchList = append(watchList,
builder.OutPointToFilterEntry(op))
}
for _, txid := range ro.watchTXIDs {
watchList = append(watchList, txid[:])
}
if len(watchList) == 0 {
return fmt.Errorf("Rescan must specify addresses and/or " +
"outpoints to watch")
"outpoints and/or TXIDs to watch")
}
// Check that we have either an end block or a quit channel.
if ro.endBlock != nil {
if (ro.endBlock.Hash == chainhash.Hash{}) {
ro.endBlock.Height = 0
} else {
_, height, err := s.GetBlockByHash(
ro.endBlock.Hash)
if (ro.endBlock.Hash != chainhash.Hash{}) {
_, height, err := s.GetBlockByHash(ro.endBlock.Hash)
if err != nil {
ro.endBlock.Height = int32(height)
ro.endBlock.Hash = chainhash.Hash{}
} else {
if height == 0 {
ro.endBlock.Hash = chainhash.Hash{}
ro.endBlock.Height = int32(height)
}
}
if (ro.endBlock.Hash == chainhash.Hash{}) {
if ro.endBlock.Height != 0 {
header, err := s.GetBlockByHeight(
uint32(ro.endBlock.Height))
if err == nil {
ro.endBlock.Hash = header.BlockHash()
} else {
header, err :=
s.GetBlockByHeight(height)
if err == nil {
ro.endBlock.Hash =
header.BlockHash()
} else {
ro.endBlock =
&waddrmgr.BlockStamp{}
}
ro.endBlock = &waddrmgr.BlockStamp{}
}
}
}
@ -298,48 +307,66 @@ rescanLoop:
// get the basic filter from the DB or network.
var block *btcutil.Block
var relevantTxs []*btcutil.Tx
filter := s.GetCFilter(curStamp.Hash, false)
// If we have no transactions, we send a notification
if filter != nil && filter.N() != 0 {
var bFilter, eFilter *gcs.Filter
var err error
key := builder.DeriveKey(&curStamp.Hash)
matched := false
bFilter = s.GetCFilter(curStamp.Hash, false)
if bFilter != nil && bFilter.N() != 0 {
// We see if any relevant transactions match.
key := builder.DeriveKey(&curStamp.Hash)
matched, err := filter.MatchAny(key, watchList)
matched, err = bFilter.MatchAny(key, watchList)
if err != nil {
return err
}
if matched {
// We've matched. Now we actually get the block
// and cycle through the transactions to see
// which ones are relevant.
block = s.GetBlockFromNetwork(
curStamp.Hash, ro.queryOptions...)
if block == nil {
return fmt.Errorf("Couldn't get block "+
"%d (%s)", curStamp.Height,
curStamp.Hash)
}
relevantTxs, err = notifyBlock(block, filter,
&ro.watchOutPoints, ro.watchAddrs,
&watchList, ro.ntfn)
if err != nil {
return err
}
}
if len(ro.watchTXIDs) > 0 {
eFilter = s.GetCFilter(curStamp.Hash, true)
}
if eFilter != nil && eFilter.N() != 0 {
// We see if any relevant transactions match.
matched, err = eFilter.MatchAny(key, watchList)
if err != nil {
return err
}
}
// If we have no transactions, we just send an
// OnFilteredBlockConnected notification with no relevant
// transactions.
if matched {
// We've matched. Now we actually get the block
// and cycle through the transactions to see
// which ones are relevant.
block = s.GetBlockFromNetwork(
curStamp.Hash, ro.queryOptions...)
if block == nil {
return fmt.Errorf("Couldn't get block "+
"%d (%s)", curStamp.Height,
curStamp.Hash)
}
relevantTxs, err = notifyBlock(block,
&ro.watchOutPoints, ro.watchAddrs,
ro.watchTXIDs, &watchList, ro.ntfn)
if err != nil {
return err
}
}
if ro.ntfn.OnFilteredBlockConnected != nil {
ro.ntfn.OnFilteredBlockConnected(curStamp.Height,
&curHeader, relevantTxs)
}
if curStamp.Hash == ro.endBlock.Hash || curStamp.Height ==
ro.endBlock.Height {
return nil
}
}
}
// notifyBlock notifies listeners based on the block filter. It writes back to
// the outPoints argument the updated list of outpoints to monitor based on
// matched addresses.
func notifyBlock(block *btcutil.Block, filter *gcs.Filter,
outPoints *[]wire.OutPoint, addrs []btcutil.Address,
watchList *[][]byte, ntfn btcrpcclient.NotificationHandlers) (
[]*btcutil.Tx, error) {
func notifyBlock(block *btcutil.Block, outPoints *[]wire.OutPoint,
addrs []btcutil.Address, txids []chainhash.Hash, watchList *[][]byte,
ntfn btcrpcclient.NotificationHandlers) ([]*btcutil.Tx, error) {
var relevantTxs []*btcutil.Tx
blockHeader := block.MsgBlock().Header
details := btcjson.BlockDetails{
@ -351,6 +378,12 @@ func notifyBlock(block *btcutil.Block, filter *gcs.Filter,
relevant := false
txDetails := details
txDetails.Index = txIdx
for _, hash := range txids {
if hash == *(tx.Hash()) {
relevant = true
break
}
}
for _, in := range tx.MsgTx().TxIn {
if relevant {
break

View file

@ -343,10 +343,12 @@ func TestSetup(t *testing.T) {
// Test that we can get blocks and cfilters via P2P and decide which are
// valid and which aren't.
err = testRandomBlocks(t, svc, h1)
// TODO: This test is disabled until I factor it out into a benchmark.
// Otherwise, it takes too long.
/*err = testRandomBlocks(t, svc, h1)
if err != nil {
t.Fatalf("Testing blocks and cfilters failed: %s", err)
}
}*/
// Generate an address and send it some coins on the h1 chain. We use
// this to test rescans and notifications.
@ -389,13 +391,54 @@ func TestSetup(t *testing.T) {
if err != nil {
t.Fatalf("Couldn't generate/submit block: %s", err)
}
err = waitForSync(t, svc, h1)
if err != nil {
t.Fatalf("Couldn't sync ChainService: %s", err)
}
// Do a rescan that searches only for a specific TXID
startBlock := waddrmgr.BlockStamp{Height: 795}
endBlock := waddrmgr.BlockStamp{Height: 801}
var foundTx *btcutil.Tx
err = svc.Rescan(
spvchain.StartBlock(&startBlock),
spvchain.EndBlock(&endBlock),
spvchain.WatchTXIDs(tx1.TxHash()),
spvchain.NotificationHandlers(btcrpcclient.NotificationHandlers{
OnFilteredBlockConnected: func(height int32,
header *wire.BlockHeader,
relevantTxs []*btcutil.Tx) {
if height == 801 {
if len(relevantTxs) != 1 {
t.Fatalf("Didn't get expected "+
"number of relevant "+
"transactions from "+
"rescan: want 1, got "+
"%d", len(relevantTxs))
}
if *(relevantTxs[0].Hash()) !=
tx1.TxHash() {
t.Fatalf("Didn't get expected "+
"relevant transaction:"+
" want %s, got %s",
tx1.TxHash(),
relevantTxs[0].Hash())
}
foundTx = relevantTxs[0]
}
},
}),
)
if err != nil || foundTx == nil || *(foundTx.Hash()) != tx1.TxHash() {
t.Fatalf("Couldn't rescan chain for transaction %s: %s",
tx1.TxHash(), err)
}
// Start a rescan with notifications in another goroutine. We'll kill
// it with a quit channel at the end and make sure we got the expected
// results.
quitRescan := make(chan struct{})
startBlock := &waddrmgr.BlockStamp{Height: 795}
err = startRescan(t, svc, addr1, startBlock, quitRescan)
startBlock = waddrmgr.BlockStamp{Height: 795}
err = startRescan(t, svc, addr1, &startBlock, quitRescan)
if err != nil {
t.Fatalf("Couldn't start a rescan for %s: %s", addr1, err)
}
@ -657,10 +700,6 @@ func waitForSync(t *testing.T, svc *spvchain.ChainService,
return fmt.Errorf("Couldn't get best snapshot from "+
"ChainService: %s", err)
}
if logLevel != btclog.Off {
t.Logf("Synced to %d (%s)", haveBest.Height,
haveBest.Hash)
}
}
// Check if we're current.
if !svc.IsCurrent() {