More neutrino integration into btcwallet.

This commit is contained in:
Alex 2017-05-20 20:36:40 -06:00 committed by Olaoluwa Osuntokun
parent 3d81f856fd
commit 9e5250e6d7
4 changed files with 91 additions and 30 deletions

View file

@ -40,6 +40,14 @@ type (
// best chain.
BlockConnected wtxmgr.BlockMeta
// FilteredBlockConnected is an alternate notification that contains
// both block and relevant transaction information in one struct, which
// allows atomic updates.
FilteredBlockConnected struct {
Block *wtxmgr.BlockMeta
RelevantTxs []*wtxmgr.TxRecord
}
// BlockDisconnected is a notifcation that the block described by the
// BlockStamp was reorganized out of the best chain.
BlockDisconnected wtxmgr.BlockMeta

View file

@ -20,7 +20,7 @@ type SPVChain struct {
cs *neutrino.ChainService
// We currently support one rescan/notifiction goroutine per client
rescan *neutrino.Rescan
rescan neutrino.Rescan
enqueueNotification chan interface{}
dequeueNotification chan interface{}
@ -31,6 +31,7 @@ type SPVChain struct {
wg sync.WaitGroup
started bool
scanning bool
finished bool
clientMtx sync.Mutex
}
@ -131,23 +132,59 @@ func (s *SPVChain) Rescan(startHash *chainhash.Hash, addrs []btcutil.Address,
}
s.rescanQuit = make(chan struct{})
s.scanning = true
s.finished = false
s.clientMtx.Unlock()
return s.cs.Rescan(
watchOutPoints := make([]wire.OutPoint, 0, len(outPoints))
for _, op := range outPoints {
watchOutPoints = append(watchOutPoints, *op)
}
s.rescan = s.cs.NewRescan(
neutrino.NotificationHandlers(btcrpcclient.NotificationHandlers{
OnFilteredBlockConnected: s.onFilteredBlockConnected,
OnBlockDisconnected: s.onBlockDisconnected,
}),
neutrino.QuitChan(s.rescanQuit),
neutrino.WatchAddrs(addrs...),
neutrino.WatchOutPoints(watchOutPoints...),
)
return nil
}
// NotifyBlocks replicates the RPC client's NotifyBlocks command.
func (s *SPVChain) NotifyBlocks() error {
s.clientMtx.Lock()
defer s.clientMtx.Unlock()
// If we're scanning, we're already notifying on blocks. Otherwise,
// start a rescan without watching any addresses.
if !s.scanning {
return s.NotifyReceived([]btcutil.Address{})
}
return nil
}
// NotifyReceived replicates the RPC client's NotifyReceived command.
func (s *SPVChain) NotifyReceived() error {
func (s *SPVChain) NotifyReceived(addrs []btcutil.Address) error {
// If we have a rescan running, we just need to add the appropriate
// addresses to the watch list.
s.clientMtx.Lock()
if s.scanning {
s.clientMtx.Unlock()
return s.rescan.Update(neutrino.AddAddrs(addrs...))
}
s.rescanQuit = make(chan struct{})
s.scanning = true
// Don't need RescanFinished notifications.
s.finished = true
s.clientMtx.Unlock()
// Rescan with just the specified addresses.
s.rescan = s.cs.NewRescan(
neutrino.NotificationHandlers(btcrpcclient.NotificationHandlers{
OnFilteredBlockConnected: s.onFilteredBlockConnected,
OnBlockDisconnected: s.onBlockDisconnected,
}),
neutrino.QuitChan(s.rescanQuit),
neutrino.WatchAddrs(addrs...),
)
return nil
}
@ -160,46 +197,47 @@ func (s *SPVChain) Notifications() <-chan interface{} {
// channel.
func (s *SPVChain) onFilteredBlockConnected(height int32,
header *wire.BlockHeader, relevantTxs []*btcutil.Tx) {
blockMeta := wtxmgr.BlockMeta{
Block: wtxmgr.Block{
Hash: header.BlockHash(),
Height: height,
ntfn := FilteredBlockConnected{
Block: &wtxmgr.BlockMeta{
Block: wtxmgr.Block{
Hash: header.BlockHash(),
Height: height,
},
Time: header.Timestamp,
},
Time: header.Timestamp,
}
for _, tx := range relevantTxs {
rec, err := wtxmgr.NewTxRecordFromMsgTx(tx.MsgTx(),
header.Timestamp)
if err != nil {
log.Errorf("Cannot create transaction record for "+
"relevant tx: %s", err)
// TODO(aakselrod): Return?
continue
}
ntfn.RelevantTxs = append(ntfn.RelevantTxs, rec)
}
select {
case s.enqueueNotification <- BlockConnected(blockMeta):
case s.enqueueNotification <- ntfn:
case <-s.quit:
return
case <-s.rescanQuit:
return
}
for _, tx := range relevantTxs {
rec, err := wtxmgr.NewTxRecordFromMsgTx(tx.MsgTx(),
blockMeta.Time)
if err != nil {
log.Errorf("Cannot create transaction record for "+
"relevant tx: %s", err)
// TODO(aakselrod): Continue?
return
}
select {
case s.enqueueNotification <- RelevantTx{
TxRecord: rec,
Block: &blockMeta,
}:
case <-s.quit:
return
case <-s.rescanQuit:
return
}
}
bs, err := s.cs.SyncedTo()
if err != nil {
log.Errorf("Can't get chain service's best block: %s", err)
return
}
if bs.Hash == header.BlockHash() {
// Only send the RescanFinished notification once.
s.clientMtx.Lock()
if s.finished {
s.clientMtx.Unlock()
return
}
s.finished = true
s.clientMtx.Unlock()
select {
case s.enqueueNotification <- RescanFinished{
Hash: &bs.Hash,

2
glide.lock generated
View file

@ -97,7 +97,7 @@ imports:
- name: github.com/kkdai/bstream
version: f391b8402d23024e7c0f624b31267a89998fca95
- name: github.com/lightninglabs/neutrino
version: 3a3b87d375c492f22de128f4f5df889e0340bd35
version: 7306107b67bb4eea6f70bc598d28049ea00ac442
repo: git@github.com:lightninglabs/neutrino
- name: golang.org/x/crypto
version: 0fe963104e9d1877082f8fb38f816fcd97eb1d10

View file

@ -52,6 +52,21 @@ func (w *Wallet) handleChainNotifications() {
return w.addRelevantTx(tx, n.TxRecord, n.Block)
})
notificationName = "recvtx/redeemingtx"
case chain.FilteredBlockConnected:
// Atomically update for the whole block.
err = walletdb.Update(w.db, func(tx walletdb.ReadWriteTx) error {
err := w.connectBlock(tx, *n.Block)
if err != nil {
return err
}
for _, rec := range n.RelevantTxs {
err := w.addRelevantTx(tx, rec, n.Block)
if err != nil {
return err
}
}
return nil
})
// The following are handled by the wallet's rescan
// goroutines, so just pass them there.