From 9e5250e6d7c071aae1783a9762f61afca2e677d5 Mon Sep 17 00:00:00 2001 From: Alex Date: Sat, 20 May 2017 20:36:40 -0600 Subject: [PATCH] More neutrino integration into btcwallet. --- chain/interface.go | 8 ++++ chain/neutrino.go | 96 +++++++++++++++++++++++++++++++------------- glide.lock | 2 +- wallet/chainntfns.go | 15 +++++++ 4 files changed, 91 insertions(+), 30 deletions(-) diff --git a/chain/interface.go b/chain/interface.go index 67e864e..a84487f 100644 --- a/chain/interface.go +++ b/chain/interface.go @@ -40,6 +40,14 @@ type ( // best chain. BlockConnected wtxmgr.BlockMeta + // FilteredBlockConnected is an alternate notification that contains + // both block and relevant transaction information in one struct, which + // allows atomic updates. + FilteredBlockConnected struct { + Block *wtxmgr.BlockMeta + RelevantTxs []*wtxmgr.TxRecord + } + // BlockDisconnected is a notifcation that the block described by the // BlockStamp was reorganized out of the best chain. BlockDisconnected wtxmgr.BlockMeta diff --git a/chain/neutrino.go b/chain/neutrino.go index 4900a8c..79397a8 100644 --- a/chain/neutrino.go +++ b/chain/neutrino.go @@ -20,7 +20,7 @@ type SPVChain struct { cs *neutrino.ChainService // We currently support one rescan/notifiction goroutine per client - rescan *neutrino.Rescan + rescan neutrino.Rescan enqueueNotification chan interface{} dequeueNotification chan interface{} @@ -31,6 +31,7 @@ type SPVChain struct { wg sync.WaitGroup started bool scanning bool + finished bool clientMtx sync.Mutex } @@ -131,23 +132,59 @@ func (s *SPVChain) Rescan(startHash *chainhash.Hash, addrs []btcutil.Address, } s.rescanQuit = make(chan struct{}) s.scanning = true + s.finished = false s.clientMtx.Unlock() - return s.cs.Rescan( + watchOutPoints := make([]wire.OutPoint, 0, len(outPoints)) + for _, op := range outPoints { + watchOutPoints = append(watchOutPoints, *op) + } + s.rescan = s.cs.NewRescan( neutrino.NotificationHandlers(btcrpcclient.NotificationHandlers{ OnFilteredBlockConnected: s.onFilteredBlockConnected, OnBlockDisconnected: s.onBlockDisconnected, }), neutrino.QuitChan(s.rescanQuit), + neutrino.WatchAddrs(addrs...), + neutrino.WatchOutPoints(watchOutPoints...), ) + return nil } // NotifyBlocks replicates the RPC client's NotifyBlocks command. func (s *SPVChain) NotifyBlocks() error { + s.clientMtx.Lock() + defer s.clientMtx.Unlock() + // If we're scanning, we're already notifying on blocks. Otherwise, + // start a rescan without watching any addresses. + if !s.scanning { + return s.NotifyReceived([]btcutil.Address{}) + } return nil } // NotifyReceived replicates the RPC client's NotifyReceived command. -func (s *SPVChain) NotifyReceived() error { +func (s *SPVChain) NotifyReceived(addrs []btcutil.Address) error { + // If we have a rescan running, we just need to add the appropriate + // addresses to the watch list. + s.clientMtx.Lock() + if s.scanning { + s.clientMtx.Unlock() + return s.rescan.Update(neutrino.AddAddrs(addrs...)) + } + s.rescanQuit = make(chan struct{}) + s.scanning = true + // Don't need RescanFinished notifications. + s.finished = true + s.clientMtx.Unlock() + // Rescan with just the specified addresses. + s.rescan = s.cs.NewRescan( + neutrino.NotificationHandlers(btcrpcclient.NotificationHandlers{ + OnFilteredBlockConnected: s.onFilteredBlockConnected, + OnBlockDisconnected: s.onBlockDisconnected, + }), + neutrino.QuitChan(s.rescanQuit), + neutrino.WatchAddrs(addrs...), + ) return nil } @@ -160,46 +197,47 @@ func (s *SPVChain) Notifications() <-chan interface{} { // channel. func (s *SPVChain) onFilteredBlockConnected(height int32, header *wire.BlockHeader, relevantTxs []*btcutil.Tx) { - blockMeta := wtxmgr.BlockMeta{ - Block: wtxmgr.Block{ - Hash: header.BlockHash(), - Height: height, + ntfn := FilteredBlockConnected{ + Block: &wtxmgr.BlockMeta{ + Block: wtxmgr.Block{ + Hash: header.BlockHash(), + Height: height, + }, + Time: header.Timestamp, }, - Time: header.Timestamp, + } + for _, tx := range relevantTxs { + rec, err := wtxmgr.NewTxRecordFromMsgTx(tx.MsgTx(), + header.Timestamp) + if err != nil { + log.Errorf("Cannot create transaction record for "+ + "relevant tx: %s", err) + // TODO(aakselrod): Return? + continue + } + ntfn.RelevantTxs = append(ntfn.RelevantTxs, rec) } select { - case s.enqueueNotification <- BlockConnected(blockMeta): + case s.enqueueNotification <- ntfn: case <-s.quit: return case <-s.rescanQuit: return } - for _, tx := range relevantTxs { - rec, err := wtxmgr.NewTxRecordFromMsgTx(tx.MsgTx(), - blockMeta.Time) - if err != nil { - log.Errorf("Cannot create transaction record for "+ - "relevant tx: %s", err) - // TODO(aakselrod): Continue? - return - } - select { - case s.enqueueNotification <- RelevantTx{ - TxRecord: rec, - Block: &blockMeta, - }: - case <-s.quit: - return - case <-s.rescanQuit: - return - } - } bs, err := s.cs.SyncedTo() if err != nil { log.Errorf("Can't get chain service's best block: %s", err) return } if bs.Hash == header.BlockHash() { + // Only send the RescanFinished notification once. + s.clientMtx.Lock() + if s.finished { + s.clientMtx.Unlock() + return + } + s.finished = true + s.clientMtx.Unlock() select { case s.enqueueNotification <- RescanFinished{ Hash: &bs.Hash, diff --git a/glide.lock b/glide.lock index 3a46534..2a09c6d 100644 --- a/glide.lock +++ b/glide.lock @@ -97,7 +97,7 @@ imports: - name: github.com/kkdai/bstream version: f391b8402d23024e7c0f624b31267a89998fca95 - name: github.com/lightninglabs/neutrino - version: 3a3b87d375c492f22de128f4f5df889e0340bd35 + version: 7306107b67bb4eea6f70bc598d28049ea00ac442 repo: git@github.com:lightninglabs/neutrino - name: golang.org/x/crypto version: 0fe963104e9d1877082f8fb38f816fcd97eb1d10 diff --git a/wallet/chainntfns.go b/wallet/chainntfns.go index 2a18eb6..bc0ab30 100644 --- a/wallet/chainntfns.go +++ b/wallet/chainntfns.go @@ -52,6 +52,21 @@ func (w *Wallet) handleChainNotifications() { return w.addRelevantTx(tx, n.TxRecord, n.Block) }) notificationName = "recvtx/redeemingtx" + case chain.FilteredBlockConnected: + // Atomically update for the whole block. + err = walletdb.Update(w.db, func(tx walletdb.ReadWriteTx) error { + err := w.connectBlock(tx, *n.Block) + if err != nil { + return err + } + for _, rec := range n.RelevantTxs { + err := w.addRelevantTx(tx, rec, n.Block) + if err != nil { + return err + } + } + return nil + }) // The following are handled by the wallet's rescan // goroutines, so just pass them there.