From 32adc3c43f11d57e15af8e867b841b87036ca47c Mon Sep 17 00:00:00 2001 From: Alex Date: Mon, 15 May 2017 21:20:44 -0600 Subject: [PATCH] Make rescan a struct, add spending tx to GetUtxo, start integration. --- chain/interface.go | 25 +++ config.go | 16 ++ log.go | 49 ++++++ rpc/legacyrpc/methods.go | 36 +++- rpc/legacyrpc/server.go | 2 +- spvsvc/spvchain/driver.go | 70 ++++++++ spvsvc/spvchain/rescan.go | 271 +++++++++++++++++++++++----- spvsvc/spvchain/sync_test.go | 331 +++++++++++++++++++++-------------- wallet/wallet.go | 4 +- 9 files changed, 617 insertions(+), 187 deletions(-) create mode 100644 chain/interface.go create mode 100644 spvsvc/spvchain/driver.go diff --git a/chain/interface.go b/chain/interface.go new file mode 100644 index 0000000..32e0bc4 --- /dev/null +++ b/chain/interface.go @@ -0,0 +1,25 @@ +package chain + +import ( + "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btcd/wire" + "github.com/btcsuite/btcutil" + "github.com/btcsuite/btcwallet/waddrmgr" +) + +// Interface allows more than one backing blockchain source, such as a +// btcd RPC chain server, or an SPV library, as long as we write a driver for +// it. +type Interface interface { + Start() error + Stop() + WaitForShutdown() + GetBestBlock() (*chainhash.Hash, int32, error) + GetBlock(*chainhash.Hash) (*wire.MsgBlock, error) + BlockStamp() (*waddrmgr.BlockStamp, error) + SendRawTransaction(*wire.MsgTx, bool) (*chainhash.Hash, error) + Rescan(*chainhash.Hash, []btcutil.Address, []*wire.OutPoint) error + NotifyReceived([]btcutil.Address) error + NotifyBlocks() error + Notifications() <-chan interface{} +} diff --git a/config.go b/config.go index 7219294..5f47eba 100644 --- a/config.go +++ b/config.go @@ -13,11 +13,13 @@ import ( "runtime" "sort" "strings" + "time" "github.com/btcsuite/btcutil" "github.com/btcsuite/btcwallet/internal/cfgutil" "github.com/btcsuite/btcwallet/internal/legacy/keystore" "github.com/btcsuite/btcwallet/netparams" + "github.com/btcsuite/btcwallet/spvsvc/spvchain" "github.com/btcsuite/btcwallet/wallet" flags "github.com/jessevdk/go-flags" ) @@ -70,6 +72,14 @@ type config struct { ProxyUser string `long:"proxyuser" description:"Username for proxy server"` ProxyPass string `long:"proxypass" default-mask:"-" description:"Password for proxy server"` + // SPV client options + UseSPV bool `long:"usespv" description:"Enables the experimental use of SPV rather than RPC for chain synchronization"` + AddPeers []string `short:"a" long:"addpeer" description:"Add a peer to connect with at startup"` + ConnectPeers []string `long:"connect" description:"Connect only to the specified peers at startup"` + MaxPeers int `long:"maxpeers" description:"Max number of inbound and outbound peers"` + BanDuration time.Duration `long:"banduration" description:"How long to ban misbehaving peers. Valid time units are {s, m, h}. Minimum 1 second"` + BanThreshold uint32 `long:"banthreshold" description:"Maximum allowed ban score before disconnecting and banning misbehaving peers."` + // RPC server options // // The legacy server is still enabled by default (and eventually will be @@ -257,6 +267,12 @@ func loadConfig() (*config, []string, error) { LegacyRPCMaxClients: defaultRPCMaxClients, LegacyRPCMaxWebsockets: defaultRPCMaxWebsockets, DataDir: cfgutil.NewExplicitString(defaultAppDataDir), + UseSPV: false, + AddPeers: []string{}, + ConnectPeers: []string{}, + MaxPeers: spvchain.MaxPeers, + BanDuration: spvchain.BanDuration, + BanThreshold: spvchain.BanThreshold, } // Pre-parse the command line options to see if an alternative config diff --git a/log.go b/log.go index 9d6670e..8f55218 100644 --- a/log.go +++ b/log.go @@ -13,6 +13,7 @@ import ( "github.com/btcsuite/btcwallet/chain" "github.com/btcsuite/btcwallet/rpc/legacyrpc" "github.com/btcsuite/btcwallet/rpc/rpcserver" + "github.com/btcsuite/btcwallet/spvsvc/spvchain" "github.com/btcsuite/btcwallet/wallet" "github.com/btcsuite/btcwallet/wtxmgr" "github.com/jrick/logrotate/rotator" @@ -72,6 +73,7 @@ var subsystemLoggers = map[string]btclog.Logger{ "CHNS": chainLog, "GRPC": grpcLog, "RPCS": legacyRPCLog, + "SPVC": spvchainLog, } // initLogRotator initializes the logging rotater to write logs to logFile and @@ -83,6 +85,53 @@ func initLogRotator(logFile string) { if err != nil { fmt.Fprintf(os.Stderr, "failed to create log directory: %v\n", err) os.Exit(1) +======= +// logClosure is used to provide a closure over expensive logging operations +// so don't have to be performed when the logging level doesn't warrant it. +type logClosure func() string + +// String invokes the underlying function and returns the result. +func (c logClosure) String() string { + return c() +} + +// newLogClosure returns a new closure over a function that returns a string +// which itself provides a Stringer interface so that it can be used with the +// logging system. +func newLogClosure(c func() string) logClosure { + return logClosure(c) +} + +// useLogger updates the logger references for subsystemID to logger. Invalid +// subsystems are ignored. +func useLogger(subsystemID string, logger btclog.Logger) { + if _, ok := subsystemLoggers[subsystemID]; !ok { + return + } + subsystemLoggers[subsystemID] = logger + + switch subsystemID { + case "BTCW": + log = logger + case "WLLT": + walletLog = logger + wallet.UseLogger(logger) + case "TXST": + txmgrLog = logger + wtxmgr.UseLogger(logger) + case "CHNS": + chainLog = logger + chain.UseLogger(logger) + btcrpcclient.UseLogger(logger) + case "GRPC": + grpcLog = logger + rpcserver.UseLogger(logger) + case "RPCS": + legacyRPCLog = logger + legacyrpc.UseLogger(logger) + case "SPVC": + spvchainLog = logger + spvchain.UseLogger(logger) } r, err := rotator.New(logFile, 10*1024, false, 3) if err != nil { diff --git a/rpc/legacyrpc/methods.go b/rpc/legacyrpc/methods.go index bc24a35..5b3d92a 100644 --- a/rpc/legacyrpc/methods.go +++ b/rpc/legacyrpc/methods.go @@ -165,7 +165,7 @@ type lazyHandler func() (interface{}, *btcjson.RPCError) // returning a closure that will execute it with the (required) wallet and // (optional) consensus RPC server. If no handlers are found and the // chainClient is not nil, the returned handler performs RPC passthrough. -func lazyApplyHandler(request *btcjson.Request, w *wallet.Wallet, chainClient *chain.RPCClient) lazyHandler { +func lazyApplyHandler(request *btcjson.Request, w *wallet.Wallet, chainClient chain.Interface) lazyHandler { handlerData, ok := rpcHandlers[request.Method] if ok && handlerData.handlerWithChain != nil && w != nil && chainClient != nil { return func() (interface{}, *btcjson.RPCError) { @@ -173,11 +173,20 @@ func lazyApplyHandler(request *btcjson.Request, w *wallet.Wallet, chainClient *c if err != nil { return nil, btcjson.ErrRPCInvalidRequest } - resp, err := handlerData.handlerWithChain(cmd, w, chainClient) - if err != nil { - return nil, jsonError(err) + switch client := chainClient.(type) { + case *chain.RPCClient: + resp, err := handlerData.handlerWithChain(cmd, + w, client) + if err != nil { + return nil, jsonError(err) + } + return resp, nil + default: + return nil, &btcjson.RPCError{ + Code: -1, + Message: "Chain RPC is inactive", + } } - return resp, nil } } if ok && handlerData.handler != nil && w != nil { @@ -202,11 +211,20 @@ func lazyApplyHandler(request *btcjson.Request, w *wallet.Wallet, chainClient *c Message: "Chain RPC is inactive", } } - resp, err := chainClient.RawRequest(request.Method, request.Params) - if err != nil { - return nil, jsonError(err) + switch client := chainClient.(type) { + case *chain.RPCClient: + resp, err := client.RawRequest(request.Method, + request.Params) + if err != nil { + return nil, jsonError(err) + } + return &resp, nil + default: + return nil, &btcjson.RPCError{ + Code: -1, + Message: "Chain RPC is inactive", + } } - return &resp, nil } } diff --git a/rpc/legacyrpc/server.go b/rpc/legacyrpc/server.go index 7e159ea..b43d65d 100644 --- a/rpc/legacyrpc/server.go +++ b/rpc/legacyrpc/server.go @@ -61,7 +61,7 @@ type Server struct { httpServer http.Server wallet *wallet.Wallet walletLoader *wallet.Loader - chainClient *chain.RPCClient + chainClient chain.Interface handlerLookup func(string) (requestHandler, bool) handlerMu sync.Mutex diff --git a/spvsvc/spvchain/driver.go b/spvsvc/spvchain/driver.go new file mode 100644 index 0000000..b5f9d67 --- /dev/null +++ b/spvsvc/spvchain/driver.go @@ -0,0 +1,70 @@ +package spvchain + +import ( + "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btcd/wire" + "github.com/btcsuite/btcwallet/waddrmgr" +) + +// SPVChain is an implementation of the btcwalet chain.Interface interface. +type SPVChain struct { + cs *ChainService +} + +// NewSPVChain creates a new SPVChain struct with a backing ChainService +func NewSPVChain(chainService *ChainService) *SPVChain { + return &SPVChain{ + cs: chainService, + } +} + +// Start replicates the RPC client's Start method. +func (s *SPVChain) Start() error { + s.cs.Start() + return nil +} + +// Stop replicates the RPC client's Stop method. +func (s *SPVChain) Stop() { + s.cs.Stop() +} + +// WaitForShutdown replicates the RPC client's WaitForShutdown method. +func (s *SPVChain) WaitForShutdown() { + s.cs.Stop() +} + +// SendRawTransaction replicates the RPC client's SendRawTransaction command. +func (s *SPVChain) SendRawTransaction(tx *wire.MsgTx, allowHighFees bool) ( + *chainhash.Hash, error) { + err := s.cs.SendTransaction(tx) + if err != nil { + return nil, err + } + hash := tx.TxHash() + return &hash, nil +} + +// GetBlock replicates the RPC client's GetBlock command. +func (s *SPVChain) GetBlock(hash *chainhash.Hash) (*wire.MsgBlock, error) { + block, err := s.cs.GetBlockFromNetwork(*hash) + if err != nil { + return nil, err + } + return block.MsgBlock(), nil +} + +// GetBestBlock replicates the RPC client's GetBestBlock command. +func (s *SPVChain) GetBestBlock() (*chainhash.Hash, int32, error) { + header, height, err := s.cs.LatestBlock() + if err != nil { + return nil, 0, err + } + hash := header.BlockHash() + return &hash, int32(height), nil +} + +// BlockStamp replicates the RPC client's BlockStamp command. +func (s *SPVChain) BlockStamp() (*waddrmgr.BlockStamp, error) { + return s.cs.SyncedTo() +} diff --git a/spvsvc/spvchain/rescan.go b/spvsvc/spvchain/rescan.go index 00f16e4..5b6cb03 100644 --- a/spvsvc/spvchain/rescan.go +++ b/spvsvc/spvchain/rescan.go @@ -5,6 +5,7 @@ package spvchain import ( "bytes" "fmt" + "sync/atomic" "github.com/btcsuite/btcd/btcjson" "github.com/btcsuite/btcd/chaincfg/chainhash" @@ -22,14 +23,17 @@ var () // Functional parameters for Rescan type rescanOptions struct { + chain *ChainService queryOptions []QueryOption ntfn btcrpcclient.NotificationHandlers startBlock *waddrmgr.BlockStamp endBlock *waddrmgr.BlockStamp watchAddrs []btcutil.Address watchOutPoints []wire.OutPoint - watchTXIDs []chainhash.Hash + watchTxIDs []chainhash.Hash + watchList [][]byte txIdx uint32 + update <-chan *updateOptions quit <-chan struct{} } @@ -99,12 +103,12 @@ func WatchOutPoints(watchOutPoints ...wire.OutPoint) RescanOption { } } -// WatchTXIDs specifies the outpoints to watch for on-chain spends. Each +// WatchTxIDs specifies the outpoints to watch for on-chain spends. Each // call to this function adds to the list of outpoints being watched rather // than replacing the list. -func WatchTXIDs(watchTXIDs ...chainhash.Hash) RescanOption { +func WatchTxIDs(watchTxIDs ...chainhash.Hash) RescanOption { return func(ro *rescanOptions) { - ro.watchTXIDs = append(ro.watchTXIDs, watchTXIDs...) + ro.watchTxIDs = append(ro.watchTxIDs, watchTxIDs...) } } @@ -126,6 +130,14 @@ func QuitChan(quit <-chan struct{}) RescanOption { } } +// updateChan specifies an update channel. This is for internal use by the +// Rescan.Update functionality. +func updateChan(update <-chan *updateOptions) RescanOption { + return func(ro *rescanOptions) { + ro.update = update + } +} + // Rescan is a single-threaded function that uses headers from the database and // functional options as arguments. func (s *ChainService) Rescan(options ...RescanOption) error { @@ -137,20 +149,20 @@ func (s *ChainService) Rescan(options ...RescanOption) error { for _, option := range options { option(ro) } + ro.chain = s - var watchList [][]byte // If we have something to watch, create a watch list. for _, addr := range ro.watchAddrs { - watchList = append(watchList, addr.ScriptAddress()) + ro.watchList = append(ro.watchList, addr.ScriptAddress()) } for _, op := range ro.watchOutPoints { - watchList = append(watchList, + ro.watchList = append(ro.watchList, builder.OutPointToFilterEntry(op)) } - for _, txid := range ro.watchTXIDs { - watchList = append(watchList, txid[:]) + for _, txid := range ro.watchTxIDs { + ro.watchList = append(ro.watchList, txid[:]) } - if len(watchList) == 0 { + if len(ro.watchList) == 0 { return fmt.Errorf("Rescan must specify addresses and/or " + "outpoints and/or TXIDs to watch") } @@ -244,6 +256,15 @@ rescanLoop: case <-ro.quit: s.unsubscribeBlockMsgs(subscription) return nil + case update := <-ro.update: + rewound, err := ro.updateFilter(update, + &curStamp, &curHeader) + if err != nil { + return err + } + if rewound { + current = false + } case header := <-blockConnected: // Only deal with the next block from what we // know about. Otherwise, it's in the future. @@ -326,12 +347,12 @@ rescanLoop: } if bFilter != nil && bFilter.N() != 0 { // We see if any relevant transactions match. - matched, err = bFilter.MatchAny(key, watchList) + matched, err = bFilter.MatchAny(key, ro.watchList) if err != nil { return err } } - if len(ro.watchTXIDs) > 0 { + if len(ro.watchTxIDs) > 0 { eFilter, err = s.GetCFilter(curStamp.Hash, true) if err != nil { return err @@ -339,7 +360,7 @@ rescanLoop: } if eFilter != nil && eFilter.N() != 0 { // We see if any relevant transactions match. - matched, err = eFilter.MatchAny(key, watchList) + matched, err = eFilter.MatchAny(key, ro.watchList) if err != nil { return err } @@ -361,9 +382,7 @@ rescanLoop: "(%s) from network", curStamp.Height, curStamp.Hash) } - relevantTxs, err = notifyBlock(block, - &ro.watchOutPoints, ro.watchAddrs, - ro.watchTXIDs, &watchList, ro.ntfn) + relevantTxs, err = ro.notifyBlock(block) if err != nil { return err } @@ -376,15 +395,84 @@ rescanLoop: ro.endBlock.Height { return nil } + select { + case update := <-ro.update: + rewound, err := ro.updateFilter(update, &curStamp, + &curHeader) + if err != nil { + return err + } + if rewound { + current = false + } + default: + } } } +// updateFilter atomically updates the filter and rewinds to the specified +// height if not 0. +func (ro *rescanOptions) updateFilter(update *updateOptions, + curStamp *waddrmgr.BlockStamp, curHeader *wire.BlockHeader) (bool, + error) { + ro.watchAddrs = append(ro.watchAddrs, + update.addrs...) + ro.watchOutPoints = append(ro.watchOutPoints, + update.outPoints...) + ro.watchTxIDs = append(ro.watchTxIDs, + update.txIDs...) + for _, addr := range update.addrs { + ro.watchList = append(ro.watchList, addr.ScriptAddress()) + } + for _, op := range update.outPoints { + ro.watchList = append(ro.watchList, + builder.OutPointToFilterEntry(op)) + } + for _, txid := range update.txIDs { + ro.watchList = append(ro.watchList, txid[:]) + } + // Rewind if requested + if update.rewind == 0 { + return false, nil + } + var header wire.BlockHeader + var height uint32 + var rewound bool + var err error + for curStamp.Height > int32(update.rewind) { + if ro.ntfn.OnBlockDisconnected != nil { + ro.ntfn.OnBlockDisconnected(&curStamp.Hash, + curStamp.Height, curHeader.Timestamp) + } + if ro.ntfn.OnFilteredBlockDisconnected != nil { + ro.ntfn.OnFilteredBlockDisconnected(curStamp.Height, + curHeader) + } + // Don't rewind past the last block we need to disconnect, + // because otherwise we connect the last known good block + // without ever disconnecting it. + if curStamp.Height == int32(update.rewind+1) { + break + } + // Rewind and continue. + header, height, err = + ro.chain.GetBlockByHash(curHeader.PrevBlock) + if err != nil { + return rewound, err + } + *curHeader = header + curStamp.Height = int32(height) + curStamp.Hash = curHeader.BlockHash() + rewound = true + } + return rewound, nil +} + // notifyBlock notifies listeners based on the block filter. It writes back to // the outPoints argument the updated list of outpoints to monitor based on // matched addresses. -func notifyBlock(block *btcutil.Block, outPoints *[]wire.OutPoint, - addrs []btcutil.Address, txids []chainhash.Hash, watchList *[][]byte, - ntfn btcrpcclient.NotificationHandlers) ([]*btcutil.Tx, error) { +func (ro *rescanOptions) notifyBlock(block *btcutil.Block) ([]*btcutil.Tx, + error) { var relevantTxs []*btcutil.Tx blockHeader := block.MsgBlock().Header details := btcjson.BlockDetails{ @@ -396,7 +484,7 @@ func notifyBlock(block *btcutil.Block, outPoints *[]wire.OutPoint, relevant := false txDetails := details txDetails.Index = txIdx - for _, hash := range txids { + for _, hash := range ro.watchTxIDs { if hash == *(tx.Hash()) { relevant = true break @@ -406,11 +494,11 @@ func notifyBlock(block *btcutil.Block, outPoints *[]wire.OutPoint, if relevant { break } - for _, op := range *outPoints { + for _, op := range ro.watchOutPoints { if in.PreviousOutPoint == op { relevant = true - if ntfn.OnRedeemingTx != nil { - ntfn.OnRedeemingTx(tx, + if ro.ntfn.OnRedeemingTx != nil { + ro.ntfn.OnRedeemingTx(tx, &txDetails) } break @@ -422,7 +510,7 @@ func notifyBlock(block *btcutil.Block, outPoints *[]wire.OutPoint, if err != nil { continue } - for _, addr := range addrs { + for _, addr := range ro.watchAddrs { if relevant { break } @@ -435,13 +523,15 @@ func notifyBlock(block *btcutil.Block, outPoints *[]wire.OutPoint, Hash: *hash, Index: uint32(outIdx), } - *outPoints = append(*outPoints, + ro.watchOutPoints = append( + ro.watchOutPoints, outPoint) - *watchList = append(*watchList, + ro.watchList = append( + ro.watchList, builder.OutPointToFilterEntry( outPoint)) - if ntfn.OnRecvTx != nil { - ntfn.OnRecvTx(tx, + if ro.ntfn.OnRecvTx != nil { + ro.ntfn.OnRecvTx(tx, &txDetails) } } @@ -455,12 +545,102 @@ func notifyBlock(block *btcutil.Block, outPoints *[]wire.OutPoint, return relevantTxs, nil } +// Rescan is an object that represents a long-running rescan/notification +// client with updateable filters. It's meant to be close to a drop-in +// replacement for the btcd rescan and notification functionality used in +// wallets. It only contains information about whether a goroutine is running. +type Rescan struct { + running uint32 + updateChan chan<- *updateOptions +} + +// NewRescan returns a rescan object that runs in another goroutine and has an +// updateable filter. It returns the long-running rescan object, and a channel +// which returns any error on termination of the rescan process. +func (s *ChainService) NewRescan(options ...RescanOption) (Rescan, + <-chan error) { + updChan := make(chan *updateOptions) + errChan := make(chan error) + rescan := Rescan{ + running: 1, + updateChan: updChan, + } + go func() { + err := s.Rescan(append(options, updateChan(updChan))...) + atomic.StoreUint32(&rescan.running, 0) + errChan <- err + }() + return rescan, errChan +} + +// Functional parameters for Update. +type updateOptions struct { + addrs []btcutil.Address + outPoints []wire.OutPoint + txIDs []chainhash.Hash + rewind uint32 +} + +// UpdateOption is a functional option argument for the Rescan.Update method. +type UpdateOption func(uo *updateOptions) + +func defaultUpdateOptions() *updateOptions { + return &updateOptions{} +} + +// AddAddrs adds addresses to the filter. +func AddAddrs(addrs ...btcutil.Address) UpdateOption { + return func(uo *updateOptions) { + uo.addrs = append(uo.addrs, addrs...) + } +} + +// AddOutPoints adds outpoints to the filter. +func AddOutPoints(outPoints ...wire.OutPoint) UpdateOption { + return func(uo *updateOptions) { + uo.outPoints = append(uo.outPoints, outPoints...) + } +} + +// AddTxIDs adds TxIDs to the filter. +func AddTxIDs(txIDs ...chainhash.Hash) UpdateOption { + return func(uo *updateOptions) { + uo.txIDs = append(uo.txIDs, txIDs...) + } +} + +// Rewind rewinds the rescan to the specified height (meaning, disconnects down +// to the block immediately after the specified height) and restarts it from +// that point with the (possibly) newly expanded filter. Especially useful when +// called in the same Update() as one of the previous three options. +func Rewind(height uint32) UpdateOption { + return func(uo *updateOptions) { + uo.rewind = height + } +} + +// Update sends an update to a long-running rescan/notification goroutine. +func (r *Rescan) Update(options ...UpdateOption) error { + running := atomic.LoadUint32(&r.running) + if running != 1 { + return fmt.Errorf("Rescan is already done and cannot be " + + "updated.") + } + uo := defaultUpdateOptions() + for _, option := range options { + option(uo) + } + r.updateChan <- uo + return nil +} + // GetUtxo gets the appropriate TxOut or errors if it's spent. The option // WatchOutPoints (with a single outpoint) is required. StartBlock can be used // to give a hint about which block the transaction is in, and TxIdx can be used // to give a hint of which transaction in the block matches it (coinbase is 0, // first normal transaction is 1, etc.). -func (s *ChainService) GetUtxo(options ...RescanOption) (*wire.TxOut, error) { +func (s *ChainService) GetUtxo(options ...RescanOption) (*wire.TxOut, + *wire.MsgTx, error) { ro := defaultRescanOptions() ro.startBlock = &waddrmgr.BlockStamp{ Hash: *s.chainParams.GenesisHash, @@ -470,7 +650,7 @@ func (s *ChainService) GetUtxo(options ...RescanOption) (*wire.TxOut, error) { option(ro) } if len(ro.watchOutPoints) != 1 { - return nil, fmt.Errorf("Must pass exactly one OutPoint.") + return nil, nil, fmt.Errorf("Must pass exactly one OutPoint.") } watchList := [][]byte{ builder.OutPointToFilterEntry(ro.watchOutPoints[0]), @@ -483,7 +663,7 @@ func (s *ChainService) GetUtxo(options ...RescanOption) (*wire.TxOut, error) { Height: int32(curHeight), } if err != nil { - return nil, err + return nil, nil, err } // Find our earliest possible block. @@ -519,8 +699,9 @@ func (s *ChainService) GetUtxo(options ...RescanOption) (*wire.TxOut, error) { filter, err := s.GetCFilter(curStamp.Hash, false, ro.queryOptions...) if err != nil { - return nil, fmt.Errorf("Couldn't get basic filter for "+ - "block %d (%s)", curStamp.Height, curStamp.Hash) + return nil, nil, fmt.Errorf("Couldn't get basic "+ + "filter for block %d (%s)", curStamp.Height, + curStamp.Hash) } matched := false if filter != nil { @@ -528,14 +709,14 @@ func (s *ChainService) GetUtxo(options ...RescanOption) (*wire.TxOut, error) { &curStamp.Hash), watchList) } if err != nil { - return nil, err + return nil, nil, err } if !matched { filter, err = s.GetCFilter(curStamp.Hash, true, ro.queryOptions...) if err != nil { - return nil, fmt.Errorf("Couldn't get extended "+ - "filter for block %d (%s)", + return nil, nil, fmt.Errorf("Couldn't get "+ + "extended filter for block %d (%s)", curStamp.Height, curStamp.Hash) } if filter != nil { @@ -550,11 +731,12 @@ func (s *ChainService) GetUtxo(options ...RescanOption) (*wire.TxOut, error) { block, err := s.GetBlockFromNetwork(curStamp.Hash, ro.queryOptions...) if err != nil { - return nil, err + return nil, nil, err } if block == nil { - return nil, fmt.Errorf("Couldn't get block %d "+ - "(%s)", curStamp.Height, curStamp.Hash) + return nil, nil, fmt.Errorf("Couldn't get "+ + "block %d (%s)", curStamp.Height, + curStamp.Hash) } // If we've spent the output in this block, return an // error stating that the output is spent. @@ -562,10 +744,7 @@ func (s *ChainService) GetUtxo(options ...RescanOption) (*wire.TxOut, error) { for _, ti := range tx.MsgTx().TxIn { if ti.PreviousOutPoint == ro.watchOutPoints[0] { - return nil, fmt.Errorf( - "OutPoint %s has been "+ - "spent", - ro.watchOutPoints[0]) + return nil, tx.MsgTx(), nil } } } @@ -576,7 +755,7 @@ func (s *ChainService) GetUtxo(options ...RescanOption) (*wire.TxOut, error) { ro.watchOutPoints[0].Hash { return tx.MsgTx(). TxOut[ro.watchOutPoints[0]. - Index], nil + Index], nil, nil } } } @@ -584,14 +763,14 @@ func (s *ChainService) GetUtxo(options ...RescanOption) (*wire.TxOut, error) { // far. curStamp.Height-- if curStamp.Height < ro.startBlock.Height { - return nil, fmt.Errorf("Couldn't find "+ + return nil, nil, fmt.Errorf("Couldn't find "+ "transaction %s", ro.watchOutPoints[0].Hash) } header, err := s.GetBlockByHeight( uint32(curStamp.Height)) if err != nil { - return nil, err + return nil, nil, err } curStamp.Hash = header.BlockHash() } diff --git a/spvsvc/spvchain/sync_test.go b/spvsvc/spvchain/sync_test.go index 8c8623a..6c2578d 100644 --- a/spvsvc/spvchain/sync_test.go +++ b/spvsvc/spvchain/sync_test.go @@ -60,6 +60,33 @@ var ( // "bd": OnBlockDisconnected // "fd": OnFilteredBlockDisconnected wantLog = func() (log []byte) { + for i := 796; i <= 800; i++ { + // BlockConnected and FilteredBlockConnected + log = append(log, []byte("bcfc")...) + // 0 relevant TXs + log = append(log, 0x00) + } + // Block with one relevant (receive) transaction + log = append(log, []byte("bcrvfc")...) + log = append(log, 0x01) + // 124 blocks with nothing + for i := 802; i <= 925; i++ { + log = append(log, []byte("bcfc")...) + log = append(log, 0x00) + } + // Block with 1 redeeming transaction + log = append(log, []byte("bcrdfc")...) + log = append(log, 0x01) + // Block with nothing + log = append(log, []byte("bcfc")...) + log = append(log, 0x00) + // Update with rewind - rewind back to 795, add another address, + // and see more interesting transactions. + for i := 927; i >= 796; i-- { + // BlockDisconnected and FilteredBlockDisconnected + log = append(log, []byte("bdfd")...) + } + // Forward to 800 for i := 796; i <= 800; i++ { // BlockConnected and FilteredBlockConnected log = append(log, []byte("bcfc")...) @@ -368,8 +395,24 @@ func TestSetup(t *testing.T) { if err != nil { t.Fatalf("Unable to send raw transaction to node: %s", err) } + privKey2, err := btcec.NewPrivateKey(btcec.S256()) + if err != nil { + t.Fatalf("Couldn't generate private key: %s", err) + } + addr2, err := secSrc.add(privKey2) + if err != nil { + t.Fatalf("Couldn't create address from key: %s", err) + } + script2, err := secSrc.GetScript(addr2) + if err != nil { + t.Fatalf("Couldn't create script from address: %s", err) + } + out2 := wire.TxOut{ + PkScript: script2, + Value: 1000000000, + } // Fee rate is satoshis per byte - tx2, err := h1.CreateTransaction([]*wire.TxOut{&out1}, 1000) + tx2, err := h1.CreateTransaction([]*wire.TxOut{&out2}, 1000) if err != nil { t.Fatalf("Couldn't create transaction from script: %s", err) } @@ -393,7 +436,7 @@ func TestSetup(t *testing.T) { err = svc.Rescan( spvchain.StartBlock(&startBlock), spvchain.EndBlock(&endBlock), - spvchain.WatchTXIDs(tx1.TxHash()), + spvchain.WatchTxIDs(tx1.TxHash()), spvchain.NotificationHandlers(btcrpcclient.NotificationHandlers{ OnFilteredBlockConnected: func(height int32, header *wire.BlockHeader, @@ -441,7 +484,7 @@ func TestSetup(t *testing.T) { t.Fatalf("Couldn't find the index of our output in transaction"+ " %s", tx1.TxHash()) } - txo, err := svc.GetUtxo( + txo, redeemingTx, err := svc.GetUtxo( spvchain.WatchOutPoints(ourOutPoint), spvchain.StartBlock(&waddrmgr.BlockStamp{Height: 801}), ) @@ -458,7 +501,7 @@ func TestSetup(t *testing.T) { // results. quitRescan := make(chan struct{}) startBlock = waddrmgr.BlockStamp{Height: 795} - err = startRescan(t, svc, addr1, &startBlock, quitRescan) + rescan, errChan := startRescan(t, svc, addr1, &startBlock, quitRescan) if err != nil { t.Fatalf("Couldn't start a rescan for %s: %s", addr1, err) } @@ -466,10 +509,9 @@ func TestSetup(t *testing.T) { if err != nil { t.Fatalf("Couldn't sync ChainService: %s", err) } - numTXs, _, err := checkRescanStatus() - if numTXs != 2 { - t.Fatalf("Wrong number of relevant transactions. Want: 2, got:"+ + if numTXs != 1 { + t.Fatalf("Wrong number of relevant transactions. Want: 1, got:"+ " %d", numTXs) } @@ -493,7 +535,8 @@ func TestSetup(t *testing.T) { inputValues []btcutil.Amount, scripts [][]byte, err error) { ourIndex := 1 << 30 // Should work on 32-bit systems for i, txo := range tx.TxOut { - if bytes.Equal(txo.PkScript, script1) { + if bytes.Equal(txo.PkScript, script1) || + bytes.Equal(txo.PkScript, script2) { ourIndex = i } } @@ -524,32 +567,32 @@ func TestSetup(t *testing.T) { // Create another address to send to so we don't trip the rescan with // the old address and we can test monitoring both OutPoint usage and // receipt by addresses. - privKey2, err := btcec.NewPrivateKey(btcec.S256()) + privKey3, err := btcec.NewPrivateKey(btcec.S256()) if err != nil { t.Fatalf("Couldn't generate private key: %s", err) } - addr2, err := secSrc.add(privKey2) + addr3, err := secSrc.add(privKey3) if err != nil { t.Fatalf("Couldn't create address from key: %s", err) } - script2, err := secSrc.GetScript(addr2) + script3, err := secSrc.GetScript(addr3) if err != nil { t.Fatalf("Couldn't create script from address: %s", err) } - out2 := wire.TxOut{ - PkScript: script2, + out3 := wire.TxOut{ + PkScript: script3, Value: 500000000, } // Spend the first transaction and mine a block. authTx1, err := txauthor.NewUnsignedTransaction( []*wire.TxOut{ - &out2, + &out3, }, // Fee rate is satoshis per kilobyte 1024000, inSrc(*tx1), func() ([]byte, error) { - return script2, nil + return script3, nil }, ) if err != nil { @@ -573,20 +616,20 @@ func TestSetup(t *testing.T) { t.Fatalf("Couldn't sync ChainService: %s", err) } numTXs, _, err = checkRescanStatus() - if numTXs != 3 { - t.Fatalf("Wrong number of relevant transactions. Want: 3, got:"+ + if numTXs != 2 { + t.Fatalf("Wrong number of relevant transactions. Want: 2, got:"+ " %d", numTXs) } // Spend the second transaction and mine a block. authTx2, err := txauthor.NewUnsignedTransaction( []*wire.TxOut{ - &out2, + &out3, }, // Fee rate is satoshis per kilobyte 1024000, inSrc(*tx2), func() ([]byte, error) { - return script2, nil + return script3, nil }, ) if err != nil { @@ -610,10 +653,27 @@ func TestSetup(t *testing.T) { t.Fatalf("Couldn't sync ChainService: %s", err) } numTXs, _, err = checkRescanStatus() + if numTXs != 2 { + t.Fatalf("Wrong number of relevant transactions. Want: 2, got:"+ + " %d", numTXs) + } + + // Update the filter with the second address, and we should have 2 more + // relevant transactions. + err = rescan.Update(spvchain.AddAddrs(addr2), spvchain.Rewind(795)) + if err != nil { + t.Fatalf("Couldn't update the rescan filter: %s", err) + } + err = waitForSync(t, svc, h1) + if err != nil { + t.Fatalf("Couldn't sync ChainService: %s", err) + } + numTXs, _, err = checkRescanStatus() if numTXs != 4 { t.Fatalf("Wrong number of relevant transactions. Want: 4, got:"+ " %d", numTXs) } + // Generate a block with a nonstandard coinbase to generate a basic // filter with 0 entries. _, err = h1.GenerateAndSubmitBlockWithCustomCoinbaseOutputs( @@ -631,13 +691,17 @@ func TestSetup(t *testing.T) { } // Check and make sure the previous UTXO is now spent. - _, err = svc.GetUtxo( + txo, redeemingTx, err = svc.GetUtxo( spvchain.WatchOutPoints(ourOutPoint), spvchain.StartBlock(&waddrmgr.BlockStamp{Height: 801}), ) - if err.Error() != fmt.Sprintf("OutPoint %s has been spent", - ourOutPoint) { - t.Fatalf("UTXO %s not seen as spent: %s", ourOutPoint, err) + if err != nil { + t.Fatalf("Couldn't get UTXO %s: %s", ourOutPoint, err) + } + if redeemingTx.TxHash() != authTx1.Tx.TxHash() { + t.Fatalf("Redeeming transaction doesn't match expected "+ + "transaction: want %s, got %s", authTx1.Tx.TxHash(), + redeemingTx.TxHash()) } // Test that we can get blocks and cfilters via P2P and decide which are @@ -682,9 +746,26 @@ func TestSetup(t *testing.T) { } close(quitRescan) + err = <-errChan + if err != nil { + t.Fatalf("Rescan ended with error: %s", err) + } if !bytes.Equal(wantLog, gotLog) { - t.Fatalf("Rescan event logs incorrect.\nWant: %s\nGot: %s\n", - wantLog, gotLog) + leastBytes := len(wantLog) + if len(gotLog) < leastBytes { + leastBytes = len(gotLog) + } + diffIndex := 0 + for i := 0; i < leastBytes; i++ { + if wantLog[i] != gotLog[i] { + diffIndex = i + break + } + } + t.Fatalf("Rescan event logs differ starting at %d.\nWant: %s\n"+ + "Got: %s\nDifference - want: %s\nDifference -- got: "+ + "%s", diffIndex, wantLog, gotLog, wantLog[diffIndex:], + gotLog[diffIndex:]) } } @@ -1117,110 +1198,102 @@ func testRandomBlocks(t *testing.T, svc *spvchain.ChainService, // on the flow of the test. The rescan starts at the genesis block and the // notifications continue until the `quit` channel is closed. func startRescan(t *testing.T, svc *spvchain.ChainService, addr btcutil.Address, - startBlock *waddrmgr.BlockStamp, quit <-chan struct{}) error { - go func() { - err := svc.Rescan( - spvchain.QuitChan(quit), - spvchain.WatchAddrs(addr), - spvchain.StartBlock(startBlock), - spvchain.NotificationHandlers( - btcrpcclient.NotificationHandlers{ - OnBlockConnected: func( - hash *chainhash.Hash, - height int32, time time.Time) { - rescanMtx.Lock() - gotLog = append(gotLog, - []byte("bc")...) - curBlockHeight = height - rescanMtx.Unlock() - }, - OnBlockDisconnected: func( - hash *chainhash.Hash, - height int32, time time.Time) { - rescanMtx.Lock() - delete(ourKnownTxsByBlock, *hash) - gotLog = append(gotLog, - []byte("bd")...) - curBlockHeight = height - 1 - rescanMtx.Unlock() - }, - OnRecvTx: func(tx *btcutil.Tx, - details *btcjson.BlockDetails) { - rescanMtx.Lock() - hash, err := chainhash. - NewHashFromStr( - details.Hash) - if err != nil { - t.Errorf("Couldn't "+ - "decode hash "+ - "%s: %s", - details.Hash, - err) - } - ourKnownTxsByBlock[*hash] = append( - ourKnownTxsByBlock[*hash], - tx) - gotLog = append(gotLog, - []byte("rv")...) - rescanMtx.Unlock() - }, - OnRedeemingTx: func(tx *btcutil.Tx, - details *btcjson.BlockDetails) { - rescanMtx.Lock() - hash, err := chainhash. - NewHashFromStr( - details.Hash) - if err != nil { - t.Errorf("Couldn't "+ - "decode hash "+ - "%s: %s", - details.Hash, - err) - } - ourKnownTxsByBlock[*hash] = append( - ourKnownTxsByBlock[*hash], - tx) - gotLog = append(gotLog, - []byte("rd")...) - rescanMtx.Unlock() - }, - OnFilteredBlockConnected: func( - height int32, - header *wire.BlockHeader, - relevantTxs []*btcutil.Tx) { - rescanMtx.Lock() - ourKnownTxsByFilteredBlock[header.BlockHash()] = - relevantTxs - gotLog = append(gotLog, - []byte("fc")...) - gotLog = append(gotLog, - uint8(len(relevantTxs))) - curFilteredBlockHeight = height - rescanMtx.Unlock() - }, - OnFilteredBlockDisconnected: func( - height int32, - header *wire.BlockHeader) { - rescanMtx.Lock() - delete(ourKnownTxsByFilteredBlock, - header.BlockHash()) - gotLog = append(gotLog, - []byte("fd")...) - curFilteredBlockHeight = - height - 1 - rescanMtx.Unlock() - }, - }), - ) - if logLevel != btclog.Off { - if err != nil { - t.Logf("Rescan ended: %s", err) - } else { - t.Logf("Rescan ended successfully") - } - } - }() - return nil + startBlock *waddrmgr.BlockStamp, quit <-chan struct{}) (spvchain.Rescan, + <-chan error) { + rescan, errChan := svc.NewRescan( + spvchain.QuitChan(quit), + spvchain.WatchAddrs(addr), + spvchain.StartBlock(startBlock), + spvchain.NotificationHandlers( + btcrpcclient.NotificationHandlers{ + OnBlockConnected: func( + hash *chainhash.Hash, + height int32, time time.Time) { + rescanMtx.Lock() + gotLog = append(gotLog, + []byte("bc")...) + curBlockHeight = height + rescanMtx.Unlock() + }, + OnBlockDisconnected: func( + hash *chainhash.Hash, + height int32, time time.Time) { + rescanMtx.Lock() + delete(ourKnownTxsByBlock, *hash) + gotLog = append(gotLog, + []byte("bd")...) + curBlockHeight = height - 1 + rescanMtx.Unlock() + }, + OnRecvTx: func(tx *btcutil.Tx, + details *btcjson.BlockDetails) { + rescanMtx.Lock() + hash, err := chainhash. + NewHashFromStr( + details.Hash) + if err != nil { + t.Errorf("Couldn't "+ + "decode hash "+ + "%s: %s", + details.Hash, + err) + } + ourKnownTxsByBlock[*hash] = append( + ourKnownTxsByBlock[*hash], + tx) + gotLog = append(gotLog, + []byte("rv")...) + rescanMtx.Unlock() + }, + OnRedeemingTx: func(tx *btcutil.Tx, + details *btcjson.BlockDetails) { + rescanMtx.Lock() + hash, err := chainhash. + NewHashFromStr( + details.Hash) + if err != nil { + t.Errorf("Couldn't "+ + "decode hash "+ + "%s: %s", + details.Hash, + err) + } + ourKnownTxsByBlock[*hash] = append( + ourKnownTxsByBlock[*hash], + tx) + gotLog = append(gotLog, + []byte("rd")...) + rescanMtx.Unlock() + }, + OnFilteredBlockConnected: func( + height int32, + header *wire.BlockHeader, + relevantTxs []*btcutil.Tx) { + rescanMtx.Lock() + ourKnownTxsByFilteredBlock[header.BlockHash()] = + relevantTxs + gotLog = append(gotLog, + []byte("fc")...) + gotLog = append(gotLog, + uint8(len(relevantTxs))) + curFilteredBlockHeight = height + rescanMtx.Unlock() + }, + OnFilteredBlockDisconnected: func( + height int32, + header *wire.BlockHeader) { + rescanMtx.Lock() + delete(ourKnownTxsByFilteredBlock, + header.BlockHash()) + gotLog = append(gotLog, + []byte("fd")...) + curFilteredBlockHeight = + height - 1 + rescanMtx.Unlock() + }, + }), + ) + return rescan, errChan } // checkRescanStatus returns the number of relevant transactions we currently diff --git a/wallet/wallet.go b/wallet/wallet.go index 61ffc50..4aaa740 100644 --- a/wallet/wallet.go +++ b/wallet/wallet.go @@ -174,7 +174,7 @@ func (w *Wallet) SynchronizeRPC(chainClient *chain.RPCClient) { // consensus RPC server is set. This function and all functions that call it // are unstable and will need to be moved when the syncing code is moved out of // the wallet. -func (w *Wallet) requireChainClient() (*chain.RPCClient, error) { +func (w *Wallet) requireChainClient() (chain.Interface, error) { w.chainClientLock.Lock() chainClient := w.chainClient w.chainClientLock.Unlock() @@ -189,7 +189,7 @@ func (w *Wallet) requireChainClient() (*chain.RPCClient, error) { // // This function is unstable and will be removed once sync logic is moved out of // the wallet. -func (w *Wallet) ChainClient() *chain.RPCClient { +func (w *Wallet) ChainClient() chain.Interface { w.chainClientLock.Lock() chainClient := w.chainClient w.chainClientLock.Unlock()