Fix bugs and add SendTransaction, which isn't that great.

Alex 2017-05-12 11:05:30 -06:00 committed by Olaoluwa Osuntokun
parent 59f3db7a1b
commit 8b734ce696
4 changed files with 271 additions and 261 deletions

View file

@@ -3,6 +3,8 @@
 package spvchain

 import (
+    "fmt"
+    "strings"
     "sync"
     "sync/atomic"
     "time"
@@ -207,7 +209,8 @@ func (s *ChainService) queryPeers(
             // to the channel if we quit before
             // reading the channel.
             sentChan := make(chan struct{}, 1)
-            sp.QueueMessage(queryMsg, sentChan)
+            sp.QueueMessageWithEncoding(queryMsg,
+                sentChan, wire.WitnessEncoding)
             select {
             case <-sentChan:
             case <-quit:
@@ -279,7 +282,7 @@ checkResponses:
 // GetCFilter gets a cfilter from the database. Failing that, it requests the
 // cfilter from the network and writes it to the database.
 func (s *ChainService) GetCFilter(blockHash chainhash.Hash,
-    extended bool, options ...QueryOption) *gcs.Filter {
+    extended bool, options ...QueryOption) (*gcs.Filter, error) {
     getFilter := s.GetBasicFilter
     getHeader := s.GetBasicHeader
     putFilter := s.putBasicFilter
@@ -290,22 +293,34 @@ func (s *ChainService) GetCFilter(blockHash chainhash.Hash,
     }
     filter, err := getFilter(blockHash)
     if err == nil && filter != nil {
-        return filter
+        return filter, nil
     }
     // We didn't get the filter from the DB, so we'll set it to nil and try
     // to get it from the network.
     filter = nil
     block, _, err := s.GetBlockByHash(blockHash)
-    if err != nil || block.BlockHash() != blockHash {
-        return nil
+    if err != nil {
+        return nil, err
+    }
+    if block.BlockHash() != blockHash {
+        return nil, fmt.Errorf("Couldn't get header for block %s "+
+            "from database", blockHash)
     }
     curHeader, err := getHeader(blockHash)
     if err != nil {
-        return nil
+        return nil, fmt.Errorf("Couldn't get cfheader for block %s "+
+            "from database", blockHash)
     }
     prevHeader, err := getHeader(block.PrevBlock)
     if err != nil {
-        return nil
+        return nil, fmt.Errorf("Couldn't get cfheader for block %s "+
+            "from database", blockHash)
+    }
+    // If we're expecting a zero filter, just return a nil filter and don't
+    // bother trying to get it from the network. The caller will know
+    // there's no error because we're also returning a nil error.
+    if builder.MakeHeaderForFilter(nil, *prevHeader) == *curHeader {
+        return nil, nil
     }
     s.queryPeers(
         // Send a wire.GetCFilterMsg
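
Note: the sketch below illustrates the new GetCFilter contract for callers — a non-nil error means the database lookup or network fetch failed, while a nil filter with a nil error means the committed filter for the block is empty. It assumes the spvchain, gcs/builder, and chainhash packages used elsewhere in this repository; matchBlockFilter is a hypothetical helper, not part of this commit.

func matchBlockFilter(svc *spvchain.ChainService, hash chainhash.Hash,
    watchList [][]byte) (bool, error) {

    // Fetch the basic (non-extended) filter from the DB or the network.
    filter, err := svc.GetCFilter(hash, false)
    if err != nil {
        return false, err
    }
    if filter == nil {
        // Nil filter with nil error: the committed filter is empty,
        // so nothing in this block can match the watch list.
        return false, nil
    }
    return filter.MatchAny(builder.DeriveKey(&hash), watchList)
}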
@@ -364,19 +379,21 @@ func (s *ChainService) GetCFilter(blockHash chainhash.Hash,
         log.Tracef("Wrote filter for block %s, extended: %t",
             blockHash, extended)
     }
-    return filter
+    return filter, nil
 }

 // GetBlockFromNetwork gets a block by requesting it from the network, one peer
 // at a time, until one answers.
 func (s *ChainService) GetBlockFromNetwork(
-    blockHash chainhash.Hash, options ...QueryOption) *btcutil.Block {
+    blockHash chainhash.Hash, options ...QueryOption) (*btcutil.Block,
+    error) {
     blockHeader, height, err := s.GetBlockByHash(blockHash)
     if err != nil || blockHeader.BlockHash() != blockHash {
-        return nil
+        return nil, fmt.Errorf("Couldn't get header for block %s "+
+            "from database", blockHash)
     }
     getData := wire.NewMsgGetData()
-    getData.AddInvVect(wire.NewInvVect(wire.InvTypeBlock,
+    getData.AddInvVect(wire.NewInvVect(wire.InvTypeWitnessBlock,
         &blockHash))
     // The block is only updated from the checkResponse function argument,
     // which is always called single-threadedly. We don't check the block
@@ -441,5 +458,37 @@ func (s *ChainService) GetBlockFromNetwork(
         },
         options...,
     )
-    return foundBlock
+    if foundBlock == nil {
+        return nil, fmt.Errorf("Couldn't retrieve block %s from "+
+            "network", blockHash)
+    }
+    return foundBlock, nil
+}
+
+// SendTransaction sends a transaction to each peer. It returns an error if any
+// peer rejects the transaction for a reason other than being already known.
+// TODO: Better privacy by sending to only one random peer and watching
+// propagation, requires better peer selection support in query API.
+func (s *ChainService) SendTransaction(tx *wire.MsgTx,
+    options ...QueryOption) error {
+    var err error
+    s.queryPeers(
+        tx,
+        func(sp *serverPeer, resp wire.Message, quit chan<- struct{}) {
+            switch response := resp.(type) {
+            case *wire.MsgReject:
+                if response.Hash == tx.TxHash() &&
+                    !strings.Contains(response.Reason,
+                        "already have transaction") {
+                    err = log.Errorf("Transaction %s "+
+                        "rejected by %s: %s",
+                        tx.TxHash(), sp.Addr(),
+                        response.Reason)
+                    close(quit)
+                }
+            }
+        },
+        options...,
+    )
+    return err
 }
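
Note: a minimal usage sketch for the new SendTransaction method, assuming a running ChainService named svc; broadcast is a hypothetical wrapper, not part of this commit. SendTransaction queries every connected peer and only surfaces rejects whose reason is something other than "already have transaction", so a nil return means no peer reported a meaningful rejection.

func broadcast(svc *spvchain.ChainService, tx *wire.MsgTx) error {
    // Any reject other than "already have transaction", from any peer,
    // comes back as a non-nil error.
    if err := svc.SendTransaction(tx); err != nil {
        return fmt.Errorf("broadcast of %s failed: %v",
            tx.TxHash(), err)
    }
    return nil
}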

View file

@@ -320,7 +320,10 @@ rescanLoop:
             var err error
             key := builder.DeriveKey(&curStamp.Hash)
             matched := false
-            bFilter = s.GetCFilter(curStamp.Hash, false)
+            bFilter, err = s.GetCFilter(curStamp.Hash, false)
+            if err != nil {
+                return err
+            }
             if bFilter != nil && bFilter.N() != 0 {
                 // We see if any relevant transactions match.
                 matched, err = bFilter.MatchAny(key, watchList)
@@ -329,7 +332,10 @@ rescanLoop:
                 }
             }
             if len(ro.watchTXIDs) > 0 {
-                eFilter = s.GetCFilter(curStamp.Hash, true)
+                eFilter, err = s.GetCFilter(curStamp.Hash, true)
+                if err != nil {
+                    return err
+                }
             }
             if eFilter != nil && eFilter.N() != 0 {
                 // We see if any relevant transactions match.
@@ -345,11 +351,14 @@ rescanLoop:
                 // We've matched. Now we actually get the block
                 // and cycle through the transactions to see
                 // which ones are relevant.
-                block = s.GetBlockFromNetwork(
+                block, err = s.GetBlockFromNetwork(
                     curStamp.Hash, ro.queryOptions...)
+                if err != nil {
+                    return err
+                }
                 if block == nil {
-                    return fmt.Errorf("Couldn't get block "+
-                        "%d (%s)", curStamp.Height,
+                    return fmt.Errorf("Couldn't get block %d "+
+                        "(%s) from network", curStamp.Height,
                         curStamp.Hash)
                 }
                 relevantTxs, err = notifyBlock(block,
@@ -409,9 +418,7 @@ func notifyBlock(block *btcutil.Block, outPoints *[]wire.OutPoint,
             }
         }
         for outIdx, out := range tx.MsgTx().TxOut {
-            pushedData, err :=
-                txscript.PushedData(
-                    out.PkScript)
+            pushedData, err := txscript.PushedData(out.PkScript)
             if err != nil {
                 continue
             }
@@ -503,42 +510,51 @@ func (s *ChainService) GetUtxo(options ...RescanOption) (*wire.TxOut, error) {
         }
     }

     log.Tracef("Starting scan for output spend from known block %d (%s) "+
-        "back to block %d (%s)", curStamp.Height, curStamp.Hash)
+        "back to block %d (%s)", curStamp.Height, curStamp.Hash,
+        ro.startBlock.Height, ro.startBlock.Hash)
     for {
         // Check the basic filter for the spend and the extended filter
         // for the transaction in which the outpout is funded.
-        filter := s.GetCFilter(curStamp.Hash, false,
+        filter, err := s.GetCFilter(curStamp.Hash, false,
             ro.queryOptions...)
-        if filter == nil {
+        if err != nil {
            return nil, fmt.Errorf("Couldn't get basic filter for "+
                "block %d (%s)", curStamp.Height, curStamp.Hash)
         }
-        matched, err := filter.MatchAny(builder.DeriveKey(
-            &curStamp.Hash), watchList)
+        matched := false
+        if filter != nil {
+            matched, err = filter.MatchAny(builder.DeriveKey(
+                &curStamp.Hash), watchList)
+        }
         if err != nil {
             return nil, err
         }
         if !matched {
-            filter = s.GetCFilter(curStamp.Hash, true,
+            filter, err = s.GetCFilter(curStamp.Hash, true,
                 ro.queryOptions...)
-            if filter == nil {
+            if err != nil {
                 return nil, fmt.Errorf("Couldn't get extended "+
                     "filter for block %d (%s)",
                     curStamp.Height, curStamp.Hash)
             }
-            matched, err = filter.MatchAny(builder.DeriveKey(
-                &curStamp.Hash), watchList)
+            if filter != nil {
+                matched, err = filter.MatchAny(
+                    builder.DeriveKey(&curStamp.Hash),
+                    watchList)
+            }
         }
         // If either is matched, download the block and check to see
         // what we have.
         if matched {
-            block := s.GetBlockFromNetwork(curStamp.Hash,
+            block, err := s.GetBlockFromNetwork(curStamp.Hash,
                 ro.queryOptions...)
+            if err != nil {
+                return nil, err
+            }
             if block == nil {
-                return nil, fmt.Errorf("Couldn't get "+
-                    "block %d (%s)",
-                    curStamp.Height, curStamp.Hash)
+                return nil, fmt.Errorf("Couldn't get block %d "+
+                    "(%s)", curStamp.Height, curStamp.Hash)
             }
             // If we've spent the output in this block, return an
             // error stating that the output is spent.
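
Note: a sketch of the match-then-fetch pattern that the rescan and GetUtxo loops above now follow, assuming the spvchain, btcutil, gcs/builder, and chainhash packages used in this repository; fetchIfMatched is a hypothetical helper, not part of this commit. An empty (nil) basic filter is treated as a non-match, and a nil block is still guarded against even though GetBlockFromNetwork now reports that case as an error.

func fetchIfMatched(svc *spvchain.ChainService, hash chainhash.Hash,
    watchList [][]byte) (*btcutil.Block, error) {

    // Check the basic filter; a nil filter means it's empty.
    filter, err := svc.GetCFilter(hash, false)
    if err != nil {
        return nil, err
    }
    if filter == nil || filter.N() == 0 {
        return nil, nil
    }
    matched, err := filter.MatchAny(builder.DeriveKey(&hash), watchList)
    if err != nil || !matched {
        return nil, err
    }
    // Something in the watch list may be in this block; fetch it.
    block, err := svc.GetBlockFromNetwork(hash)
    if err != nil {
        return nil, err
    }
    if block == nil {
        return nil, fmt.Errorf("couldn't get block %s from network",
            hash)
    }
    return block, nil
}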

View file

@@ -21,7 +21,6 @@ import (
     "github.com/btcsuite/btcd/wire"
     "github.com/btcsuite/btcutil"
     "github.com/btcsuite/btcwallet/waddrmgr"
-    "github.com/btcsuite/btcwallet/wallet"
     "github.com/btcsuite/btcwallet/walletdb"
 )

@@ -43,11 +42,11 @@ var (
     UserAgentVersion = "0.0.1-alpha"

     // Services describes the services that are supported by the server.
-    Services = wire.SFNodeCF
+    Services = wire.SFNodeWitness | wire.SFNodeCF

     // RequiredServices describes the services that are required to be
     // supported by outbound peers.
-    RequiredServices = wire.SFNodeNetwork | wire.SFNodeCF
+    RequiredServices = wire.SFNodeNetwork | wire.SFNodeWitness | wire.SFNodeCF

     // BanThreshold is the maximum ban score before a peer is banned.
     BanThreshold = uint32(100)
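
Note: the two flag changes above add wire.SFNodeWitness to both the advertised and the required service bits. A minimal sketch of how such a required-services mask is typically applied when vetting an outbound peer (hasRequiredServices is a hypothetical helper, not code from this commit):

func hasRequiredServices(advertised wire.ServiceFlag) bool {
    // Every bit in RequiredServices must be present in the peer's
    // advertised services.
    required := wire.SFNodeNetwork | wire.SFNodeWitness | wire.SFNodeCF
    return advertised&required == required
}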
@@ -1052,33 +1051,6 @@ func disconnectPeer(peerList map[int32]*serverPeer, compareFunc func(*serverPeer
     return false
 }

-// sendUnminedTxs iterates through all transactions that spend from wallet
-// credits that are not known to have been mined into a block, and attempts to
-// send each to the chain server for relay.
-//
-// TODO: This should return an error if any of these lookups or sends fail, but
-// since send errors due to double spends need to be handled gracefully and this
-// isn't done yet, all sending errors are simply logged.
-func (s *ChainService) sendUnminedTxs(w *wallet.Wallet) error {
-    /*txs, err := w.TxStore.UnminedTxs()
-    if err != nil {
-        return err
-    }
-    rpcClient := s.rpcClient
-    for _, tx := range txs {
-        resp, err := rpcClient.SendRawTransaction(tx, false)
-        if err != nil {
-            // TODO(jrick): Check error for if this tx is a double spend,
-            // remove it if so.
-            log.Debugf("Could not resend transaction %v: %v",
-                tx.TxHash(), err)
-            continue
-        }
-        log.Debugf("Resent unmined transaction %v", resp)
-    }*/
-    return nil
-}
-
 // PublishTransaction sends the transaction to the consensus RPC server so it
 // can be propigated to other nodes and eventually mined.
 func (s *ChainService) PublishTransaction(tx *wire.MsgTx) error {
@@ -1087,31 +1059,6 @@ func (s *ChainService) PublishTransaction(tx *wire.MsgTx) error {
     return nil
 }

-// AnnounceNewTransactions generates and relays inventory vectors and notifies
-// both websocket and getblocktemplate long poll clients of the passed
-// transactions. This function should be called whenever new transactions
-// are added to the mempool.
-func (s *ChainService) AnnounceNewTransactions( /*newTxs []*mempool.TxDesc*/ ) {
-    // Generate and relay inventory vectors for all newly accepted
-    // transactions into the memory pool due to the original being
-    // accepted.
-    /*for _, txD := range newTxs {
-        // Generate the inventory vector and relay it.
-        iv := wire.NewInvVect(wire.InvTypeTx, txD.Tx.Hash())
-        s.RelayInventory(iv, txD)
-
-        if s.rpcServer != nil {
-            // Notify websocket clients about mempool transactions.
-            s.rpcServer.ntfnMgr.NotifyMempoolTx(txD.Tx, true)
-
-            // Potentially notify any getblocktemplate long poll clients
-            // about stale block templates due to the new transaction.
-            s.rpcServer.gbtWorkState.NotifyMempoolTx(
-                s.txMemPool.LastUpdated())
-        }
-    }*/
-}
-
 // newPeerConfig returns the configuration for the given serverPeer.
 func newPeerConfig(sp *serverPeer) *peer.Config {
     return &peer.Config{
@@ -1189,64 +1136,6 @@ func (s *ChainService) UpdatePeerHeights(latestBlkHash *chainhash.Hash, latestHe
         }
     }

-// rebroadcastHandler keeps track of user submitted inventories that we have
-// sent out but have not yet made it into a block. We periodically rebroadcast
-// them in case our peers restarted or otherwise lost track of them.
-func (s *ChainService) rebroadcastHandler() {
-    // Wait 5 min before first tx rebroadcast.
-    timer := time.NewTimer(5 * time.Minute)
-
-    //pendingInvs := make(map[wire.InvVect]interface{})
-
-out:
-    for {
-        select {
-        /*case riv := <-s.modifyRebroadcastInv:
-            switch msg := riv.(type) {
-            // Incoming InvVects are added to our map of RPC txs.
-            case broadcastInventoryAdd:
-                pendingInvs[*msg.invVect] = msg.data
-
-            // When an InvVect has been added to a block, we can
-            // now remove it, if it was present.
-            case broadcastInventoryDel:
-                if _, ok := pendingInvs[*msg]; ok {
-                    delete(pendingInvs, *msg)
-                }
-            }*/
-        case <-timer.C: /*
-            // Any inventory we have has not made it into a block
-            // yet. We periodically resubmit them until they have.
-            for iv, data := range pendingInvs {
-                ivCopy := iv
-                s.RelayInventory(&ivCopy, data)
-            }
-            // Process at a random time up to 30mins (in seconds)
-            // in the future.
-            timer.Reset(time.Second *
-                time.Duration(randomUint16Number(1800))) */
-
-        case <-s.quit:
-            break out
-        }
-    }
-
-    timer.Stop()
-
-    // Drain channels before exiting so nothing is left waiting around
-    // to send.
-    /*cleanup:
-    for {
-        select {
-        //case <-s.modifyRebroadcastInv:
-        default:
-            break cleanup
-        }
-    }*/
-
-    s.wg.Done()
-}
-
 // ChainParams returns a copy of the ChainService's chaincfg.Params.
 func (s *ChainService) ChainParams() chaincfg.Params {
     return s.chainParams
@@ -1261,10 +1150,8 @@ func (s *ChainService) Start() {

     // Start the peer handler which in turn starts the address and block
     // managers.
-    s.wg.Add(2)
+    s.wg.Add(1)
     go s.peerHandler()
-
-    go s.rebroadcastHandler()
 }

 // Stop gracefully shuts down the server by stopping and disconnecting all

View file

@@ -23,6 +23,7 @@ import (
     "github.com/btcsuite/btclog"
     "github.com/btcsuite/btcrpcclient"
     "github.com/btcsuite/btcutil"
+    "github.com/btcsuite/btcutil/gcs"
     "github.com/btcsuite/btcutil/gcs/builder"
     "github.com/btcsuite/btcwallet/spvsvc/spvchain"
     "github.com/btcsuite/btcwallet/waddrmgr"
@@ -37,7 +38,7 @@ var (
     // log messages from the tests themselves as well. Keep in mind some
     // log messages may not appear in order due to use of multiple query
     // goroutines in the tests.
-    logLevel    = btclog.Off
+    logLevel    = btclog.TraceLvl
     syncTimeout = 30 * time.Second
     syncUpdate  = time.Second
     // Don't set this too high for your platform, or the tests will miss
@@ -323,10 +324,8 @@ func TestSetup(t *testing.T) {
         },
     }

-    spvchain.Services = 0
     spvchain.MaxPeers = 3
     spvchain.BanDuration = 5 * time.Second
-    spvchain.RequiredServices = wire.SFNodeNetwork
     spvchain.WaitForMoreCFHeaders = time.Second
     svc, err := spvchain.NewChainService(config)
     if err != nil {
@@ -341,14 +340,6 @@ func TestSetup(t *testing.T) {
         t.Fatalf("Couldn't sync ChainService: %s", err)
     }

-    // Test that we can get blocks and cfilters via P2P and decide which are
-    // valid and which aren't.
-    // TODO: Split this out into a benchmark.
-    err = testRandomBlocks(t, svc, h1)
-    if err != nil {
-        t.Fatalf("Testing blocks and cfilters failed: %s", err)
-    }
-
     // Generate an address and send it some coins on the h1 chain. We use
     // this to test rescans and notifications.
     secSrc := newSecSource(&modParams)
@@ -568,9 +559,10 @@ func TestSetup(t *testing.T) {
     if err != nil {
         t.Fatalf("Couldn't sign transaction: %s", err)
     }
-    _, err = h1.Node.SendRawTransaction(authTx1.Tx, true)
+    banPeer(svc, h2)
+    err = svc.SendTransaction(authTx1.Tx, queryOptions...)
     if err != nil {
-        t.Fatalf("Unable to send raw transaction to node: %s", err)
+        t.Fatalf("Unable to send transaction to network: %s", err)
     }
     _, err = h1.Node.Generate(1)
     if err != nil {
@@ -604,9 +596,10 @@ func TestSetup(t *testing.T) {
     if err != nil {
         t.Fatalf("Couldn't sign transaction: %s", err)
     }
-    _, err = h1.Node.SendRawTransaction(authTx2.Tx, true)
+    banPeer(svc, h2)
+    err = svc.SendTransaction(authTx2.Tx, queryOptions...)
     if err != nil {
-        t.Fatalf("Unable to send raw transaction to node: %s", err)
+        t.Fatalf("Unable to send transaction to network: %s", err)
     }
     _, err = h1.Node.Generate(1)
     if err != nil {
@@ -621,17 +614,20 @@ func TestSetup(t *testing.T) {
         t.Fatalf("Wrong number of relevant transactions. Want: 4, got:"+
             " %d", numTXs)
     }
-    // Generate 1 blocks on h1, one at a time, to make sure the
-    // ChainService instance stays caught up.
-    for i := 0; i < 1; i++ {
-        _, err = h1.Node.Generate(1)
-        if err != nil {
-            t.Fatalf("Couldn't generate/submit block: %s", err)
-        }
-        err = waitForSync(t, svc, h1)
-        if err != nil {
-            t.Fatalf("Couldn't sync ChainService: %s", err)
-        }
+    // Generate a block with a nonstandard coinbase to generate a basic
+    // filter with 0 entries.
+    _, err = h1.GenerateAndSubmitBlockWithCustomCoinbaseOutputs(
+        []*btcutil.Tx{}, rpctest.BlockVersion, time.Time{},
+        []wire.TxOut{{
+            Value:    0,
+            PkScript: []byte{},
+        }})
+    if err != nil {
+        t.Fatalf("Couldn't generate/submit block: %s", err)
+    }
+    err = waitForSync(t, svc, h1)
+    if err != nil {
+        t.Fatalf("Couldn't sync ChainService: %s", err)
     }

     // Check and make sure the previous UTXO is now spent.
@@ -644,8 +640,17 @@ func TestSetup(t *testing.T) {
         t.Fatalf("UTXO %s not seen as spent: %s", ourOutPoint, err)
     }

+    // Test that we can get blocks and cfilters via P2P and decide which are
+    // valid and which aren't.
+    // TODO: Split this out into a benchmark.
+    err = testRandomBlocks(t, svc, h1)
+    if err != nil {
+        t.Fatalf("Testing blocks and cfilters failed: %s", err)
+    }
+
     // Generate 5 blocks on h2 and wait for ChainService to sync to the
-    // newly-best chain on h2.
+    // newly-best chain on h2. This includes the transactions sent via
+    // svc.SendTransaction earlier, so we'll have to
     _, err = h2.Node.Generate(5)
     if err != nil {
         t.Fatalf("Couldn't generate/submit blocks: %s", err)
@@ -913,8 +918,12 @@ func testRandomBlocks(t *testing.T, svc *spvchain.ChainService,
                 return
             }
             // Get block from network.
-            haveBlock := svc.GetBlockFromNetwork(blockHash,
+            haveBlock, err := svc.GetBlockFromNetwork(blockHash,
                 queryOptions...)
+            if err != nil {
+                errChan <- err
+                return
+            }
             if haveBlock == nil {
                 errChan <- fmt.Errorf("Couldn't get block %d "+
                     "(%s) from network", height, blockHash)
@@ -939,11 +948,10 @@ func testRandomBlocks(t *testing.T, svc *spvchain.ChainService,
                 return
             }
             // Get basic cfilter from network.
-            haveFilter := svc.GetCFilter(blockHash, false,
+            haveFilter, err := svc.GetCFilter(blockHash, false,
                 queryOptions...)
-            if haveFilter == nil {
-                errChan <- fmt.Errorf("Couldn't get basic "+
-                    "filter for block %d", height)
+            if err != nil {
+                errChan <- err
                 return
             }
             // Get basic cfilter from RPC.
@@ -956,7 +964,11 @@ func testRandomBlocks(t *testing.T, svc *spvchain.ChainService,
                 return
             }
             // Check that network and RPC cfilters match.
-            if !bytes.Equal(haveFilter.NBytes(), wantFilter.Data) {
+            var haveBytes []byte
+            if haveFilter != nil {
+                haveBytes = haveFilter.NBytes()
+            }
+            if !bytes.Equal(haveBytes, wantFilter.Data) {
                 errChan <- fmt.Errorf("Basic filter from P2P "+
                     "network/DB doesn't match RPC value "+
                     "for block %d", height)
@@ -965,7 +977,7 @@ func testRandomBlocks(t *testing.T, svc *spvchain.ChainService,
             // Calculate basic filter from block.
             calcFilter, err := builder.BuildBasicFilter(
                 haveBlock.MsgBlock())
-            if err != nil {
+            if err != nil && err != gcs.ErrNoData {
                 errChan <- fmt.Errorf("Couldn't build basic "+
                     "filter for block %d (%s): %s", height,
                     blockHash, err)
@@ -973,7 +985,7 @@ func testRandomBlocks(t *testing.T, svc *spvchain.ChainService,
             }
             // Check that the network value matches the calculated
             // value from the block.
-            if !reflect.DeepEqual(*haveFilter, *calcFilter) {
+            if !reflect.DeepEqual(haveFilter, calcFilter) {
                 errChan <- fmt.Errorf("Basic filter from P2P "+
                     "network/DB doesn't match calculated "+
                     "value for block %d", height)
@@ -1007,11 +1019,10 @@ func testRandomBlocks(t *testing.T, svc *spvchain.ChainService,
                 return
             }
             // Get extended cfilter from network
-            haveFilter = svc.GetCFilter(blockHash, true,
+            haveFilter, err = svc.GetCFilter(blockHash, true,
                 queryOptions...)
-            if haveFilter == nil {
-                errChan <- fmt.Errorf("Couldn't get extended "+
-                    "filter for block %d", height)
+            if err != nil {
+                errChan <- err
                 return
             }
             // Get extended cfilter from RPC
@@ -1024,7 +1035,10 @@ func testRandomBlocks(t *testing.T, svc *spvchain.ChainService,
                 return
             }
             // Check that network and RPC cfilters match
-            if !bytes.Equal(haveFilter.NBytes(), wantFilter.Data) {
+            if haveFilter != nil {
+                haveBytes = haveFilter.NBytes()
+            }
+            if !bytes.Equal(haveBytes, wantFilter.Data) {
                 errChan <- fmt.Errorf("Extended filter from "+
                     "P2P network/DB doesn't match RPC "+
                     "value for block %d", height)
@@ -1033,7 +1047,7 @@ func testRandomBlocks(t *testing.T, svc *spvchain.ChainService,
             // Calculate extended filter from block
             calcFilter, err = builder.BuildExtFilter(
                 haveBlock.MsgBlock())
-            if err != nil {
+            if err != nil && err != gcs.ErrNoData {
                 errChan <- fmt.Errorf("Couldn't build extended"+
                     " filter for block %d (%s): %s", height,
                     blockHash, err)
@@ -1041,7 +1055,7 @@ func testRandomBlocks(t *testing.T, svc *spvchain.ChainService,
             }
             // Check that the network value matches the calculated
             // value from the block.
-            if !reflect.DeepEqual(*haveFilter, *calcFilter) {
+            if !reflect.DeepEqual(haveFilter, calcFilter) {
                 errChan <- fmt.Errorf("Extended filter from "+
                     "P2P network/DB doesn't match "+
                     "calculated value for block %d", height)
@@ -1104,76 +1118,108 @@ func testRandomBlocks(t *testing.T, svc *spvchain.ChainService,
 // notifications continue until the `quit` channel is closed.
 func startRescan(t *testing.T, svc *spvchain.ChainService, addr btcutil.Address,
     startBlock *waddrmgr.BlockStamp, quit <-chan struct{}) error {
-    go svc.Rescan(
-        spvchain.QuitChan(quit),
-        spvchain.WatchAddrs(addr),
-        spvchain.StartBlock(startBlock),
-        spvchain.NotificationHandlers(btcrpcclient.NotificationHandlers{
-            OnBlockConnected: func(hash *chainhash.Hash,
-                height int32, time time.Time) {
-                rescanMtx.Lock()
-                gotLog = append(gotLog, []byte("bc")...)
-                curBlockHeight = height
-                rescanMtx.Unlock()
-            },
-            OnBlockDisconnected: func(hash *chainhash.Hash,
-                height int32, time time.Time) {
-                rescanMtx.Lock()
-                delete(ourKnownTxsByBlock, *hash)
-                gotLog = append(gotLog, []byte("bd")...)
-                curBlockHeight = height - 1
-                rescanMtx.Unlock()
-            },
-            OnRecvTx: func(tx *btcutil.Tx,
-                details *btcjson.BlockDetails) {
-                rescanMtx.Lock()
-                hash, err := chainhash.NewHashFromStr(
-                    details.Hash)
-                if err != nil {
-                    t.Errorf("Couldn't decode hash %s: %s",
-                        details.Hash, err)
-                }
-                ourKnownTxsByBlock[*hash] = append(
-                    ourKnownTxsByBlock[*hash], tx)
-                gotLog = append(gotLog, []byte("rv")...)
-                rescanMtx.Unlock()
-            },
-            OnRedeemingTx: func(tx *btcutil.Tx,
-                details *btcjson.BlockDetails) {
-                rescanMtx.Lock()
-                hash, err := chainhash.NewHashFromStr(
-                    details.Hash)
-                if err != nil {
-                    t.Errorf("Couldn't decode hash %s: %s",
-                        details.Hash, err)
-                }
-                ourKnownTxsByBlock[*hash] = append(
-                    ourKnownTxsByBlock[*hash], tx)
-                gotLog = append(gotLog, []byte("rd")...)
-                rescanMtx.Unlock()
-            },
-            OnFilteredBlockConnected: func(height int32,
-                header *wire.BlockHeader,
-                relevantTxs []*btcutil.Tx) {
-                rescanMtx.Lock()
-                ourKnownTxsByFilteredBlock[header.BlockHash()] =
-                    relevantTxs
-                gotLog = append(gotLog, []byte("fc")...)
-                gotLog = append(gotLog, uint8(len(relevantTxs)))
-                curFilteredBlockHeight = height
-                rescanMtx.Unlock()
-            },
-            OnFilteredBlockDisconnected: func(height int32,
-                header *wire.BlockHeader) {
-                rescanMtx.Lock()
-                delete(ourKnownTxsByFilteredBlock,
-                    header.BlockHash())
-                gotLog = append(gotLog, []byte("fd")...)
-                curFilteredBlockHeight = height - 1
-                rescanMtx.Unlock()
-            },
-        }),
-    )
+    go func() {
+        err := svc.Rescan(
+            spvchain.QuitChan(quit),
+            spvchain.WatchAddrs(addr),
+            spvchain.StartBlock(startBlock),
+            spvchain.NotificationHandlers(
+                btcrpcclient.NotificationHandlers{
+                    OnBlockConnected: func(
+                        hash *chainhash.Hash,
+                        height int32, time time.Time) {
+                        rescanMtx.Lock()
+                        gotLog = append(gotLog,
+                            []byte("bc")...)
+                        curBlockHeight = height
+                        rescanMtx.Unlock()
+                    },
+                    OnBlockDisconnected: func(
+                        hash *chainhash.Hash,
+                        height int32, time time.Time) {
+                        rescanMtx.Lock()
+                        delete(ourKnownTxsByBlock, *hash)
+                        gotLog = append(gotLog,
+                            []byte("bd")...)
+                        curBlockHeight = height - 1
+                        rescanMtx.Unlock()
+                    },
+                    OnRecvTx: func(tx *btcutil.Tx,
+                        details *btcjson.BlockDetails) {
+                        rescanMtx.Lock()
+                        hash, err := chainhash.
+                            NewHashFromStr(
+                                details.Hash)
+                        if err != nil {
+                            t.Errorf("Couldn't "+
+                                "decode hash "+
+                                "%s: %s",
+                                details.Hash,
+                                err)
+                        }
+                        ourKnownTxsByBlock[*hash] = append(
+                            ourKnownTxsByBlock[*hash],
+                            tx)
+                        gotLog = append(gotLog,
+                            []byte("rv")...)
+                        rescanMtx.Unlock()
+                    },
+                    OnRedeemingTx: func(tx *btcutil.Tx,
+                        details *btcjson.BlockDetails) {
+                        rescanMtx.Lock()
+                        hash, err := chainhash.
+                            NewHashFromStr(
+                                details.Hash)
+                        if err != nil {
+                            t.Errorf("Couldn't "+
+                                "decode hash "+
+                                "%s: %s",
+                                details.Hash,
+                                err)
+                        }
+                        ourKnownTxsByBlock[*hash] = append(
+                            ourKnownTxsByBlock[*hash],
+                            tx)
+                        gotLog = append(gotLog,
+                            []byte("rd")...)
+                        rescanMtx.Unlock()
+                    },
+                    OnFilteredBlockConnected: func(
+                        height int32,
+                        header *wire.BlockHeader,
+                        relevantTxs []*btcutil.Tx) {
+                        rescanMtx.Lock()
+                        ourKnownTxsByFilteredBlock[header.BlockHash()] =
+                            relevantTxs
+                        gotLog = append(gotLog,
+                            []byte("fc")...)
+                        gotLog = append(gotLog,
+                            uint8(len(relevantTxs)))
+                        curFilteredBlockHeight = height
+                        rescanMtx.Unlock()
+                    },
+                    OnFilteredBlockDisconnected: func(
+                        height int32,
+                        header *wire.BlockHeader) {
+                        rescanMtx.Lock()
+                        delete(ourKnownTxsByFilteredBlock,
+                            header.BlockHash())
+                        gotLog = append(gotLog,
+                            []byte("fd")...)
+                        curFilteredBlockHeight =
+                            height - 1
+                        rescanMtx.Unlock()
+                    },
+                }),
+        )
+        if logLevel != btclog.Off {
+            if err != nil {
+                t.Logf("Rescan ended: %s", err)
+            } else {
+                t.Logf("Rescan ended successfully")
+            }
+        }
+    }()

     return nil
 }
@@ -1203,3 +1249,15 @@ func checkRescanStatus() (int, int32, error) {
     }
     return txCount[0], curBlockHeight, nil
 }
+
+// banPeer bans and disconnects the requested harness from the ChainService
+// instance for BanDuration seconds.
+func banPeer(svc *spvchain.ChainService, harness *rpctest.Harness) {
+    peers := svc.Peers()
+    for _, peer := range peers {
+        if peer.Addr() == harness.P2PAddress() {
+            svc.BanPeer(peer)
+            peer.Disconnect()
+        }
+    }
+}