lbcwallet/chain/bitcoind.go
Olaoluwa Osuntokun 6a0e6da280 chain: continue to accept []wire.OutPoint for rescan updates for bitcoind
In this commit, we fix a recently introduced bug that would cause
callers that attempted to get a spend notifications for a particular
outpoint to no longer get that precise notification. In a prior commit,
in preparation for the new neutrino gcs filter format, we added a
mapping from script to outpoint that will be used by neutrino, and later
other backends as well. However, we still need to match on outpoint
slices for bitcoind today.
2018-07-17 19:05:17 -07:00

1249 lines
34 KiB
Go

package chain
import (
"bytes"
"container/list"
"encoding/hex"
"errors"
"net"
"sync"
"sync/atomic"
"time"
"github.com/btcsuite/btcd/btcjson"
"github.com/btcsuite/btcd/chaincfg"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/rpcclient"
"github.com/btcsuite/btcd/txscript"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil"
"github.com/btcsuite/btcwallet/waddrmgr"
"github.com/btcsuite/btcwallet/wtxmgr"
"github.com/lightninglabs/gozmq"
)
// BitcoindClient represents a persistent client connection to a bitcoind server
// for information regarding the current best block chain.
type BitcoindClient struct {
client *rpcclient.Client
connConfig *rpcclient.ConnConfig // Work around unexported field
chainParams *chaincfg.Params
zmqConnect string
zmqPollInterval time.Duration
enqueueNotification chan interface{}
dequeueNotification chan interface{}
currentBlock chan *waddrmgr.BlockStamp
clientMtx sync.RWMutex
rescanUpdate chan interface{}
startTime time.Time
watchOutPoints map[wire.OutPoint]struct{}
watchAddrs map[string]struct{}
watchTxIDs map[chainhash.Hash]struct{}
notify uint32
quit chan struct{}
wg sync.WaitGroup
started bool
quitMtx sync.Mutex
memPool map[chainhash.Hash]struct{}
memPoolExp map[int32]map[chainhash.Hash]struct{}
}
// NewBitcoindClient creates a client connection to the server described by the
// connect string. If disableTLS is false, the remote RPC certificate must be
// provided in the certs slice. The connection is not established immediately,
// but must be done using the Start method. If the remote server does not
// operate on the same bitcoin network as described by the passed chain
// parameters, the connection will be disconnected.
func NewBitcoindClient(chainParams *chaincfg.Params, connect, user, pass,
zmqConnect string, zmqPollInterval time.Duration) (*BitcoindClient,
error) {
client := &BitcoindClient{
connConfig: &rpcclient.ConnConfig{
Host: connect,
User: user,
Pass: pass,
DisableAutoReconnect: false,
DisableConnectOnNew: true,
DisableTLS: true,
HTTPPostMode: true,
},
chainParams: chainParams,
zmqConnect: zmqConnect,
zmqPollInterval: zmqPollInterval,
enqueueNotification: make(chan interface{}),
dequeueNotification: make(chan interface{}),
currentBlock: make(chan *waddrmgr.BlockStamp),
rescanUpdate: make(chan interface{}),
watchOutPoints: make(map[wire.OutPoint]struct{}),
watchAddrs: make(map[string]struct{}),
watchTxIDs: make(map[chainhash.Hash]struct{}),
quit: make(chan struct{}),
memPool: make(map[chainhash.Hash]struct{}),
memPoolExp: make(map[int32]map[chainhash.Hash]struct{}),
}
rpcClient, err := rpcclient.New(client.connConfig, nil)
if err != nil {
return nil, err
}
client.client = rpcClient
return client, nil
}
// BackEnd returns the name of the driver.
func (c *BitcoindClient) BackEnd() string {
return "bitcoind"
}
// GetCurrentNet returns the network on which the bitcoind instance is running.
func (c *BitcoindClient) GetCurrentNet() (wire.BitcoinNet, error) {
hash, err := c.client.GetBlockHash(0)
if err != nil {
return 0, err
}
switch *hash {
case *chaincfg.TestNet3Params.GenesisHash:
return chaincfg.TestNet3Params.Net, nil
case *chaincfg.RegressionNetParams.GenesisHash:
return chaincfg.RegressionNetParams.Net, nil
case *chaincfg.MainNetParams.GenesisHash:
return chaincfg.MainNetParams.Net, nil
default:
return 0, errors.New("unknown network")
}
}
// GetBestBlock returns the highest block known to bitcoind.
func (c *BitcoindClient) GetBestBlock() (*chainhash.Hash, int32, error) {
bcinfo, err := c.client.GetBlockChainInfo()
if err != nil {
return nil, 0, err
}
hash, err := chainhash.NewHashFromStr(bcinfo.BestBlockHash)
if err != nil {
return nil, 0, err
}
return hash, bcinfo.Blocks, nil
}
// GetBlockHeight returns the height for the hash, if known, or returns an
// error.
func (c *BitcoindClient) GetBlockHeight(hash *chainhash.Hash) (int32, error) {
header, err := c.GetBlockHeaderVerbose(hash)
if err != nil {
return 0, err
}
return header.Height, nil
}
// GetBlock returns a block from the hash.
func (c *BitcoindClient) GetBlock(hash *chainhash.Hash) (*wire.MsgBlock,
error) {
return c.client.GetBlock(hash)
}
// GetBlockVerbose returns a verbose block from the hash.
func (c *BitcoindClient) GetBlockVerbose(hash *chainhash.Hash) (
*btcjson.GetBlockVerboseResult, error) {
return c.client.GetBlockVerbose(hash)
}
// GetBlockHash returns a block hash from the height.
func (c *BitcoindClient) GetBlockHash(height int64) (*chainhash.Hash, error) {
return c.client.GetBlockHash(height)
}
// GetBlockHeader returns a block header from the hash.
func (c *BitcoindClient) GetBlockHeader(
hash *chainhash.Hash) (*wire.BlockHeader, error) {
return c.client.GetBlockHeader(hash)
}
// GetBlockHeaderVerbose returns a block header from the hash.
func (c *BitcoindClient) GetBlockHeaderVerbose(hash *chainhash.Hash) (
*btcjson.GetBlockHeaderVerboseResult, error) {
return c.client.GetBlockHeaderVerbose(hash)
}
// GetRawTransactionVerbose returns a transaction from the tx hash.
func (c *BitcoindClient) GetRawTransactionVerbose(hash *chainhash.Hash) (
*btcjson.TxRawResult, error) {
return c.client.GetRawTransactionVerbose(hash)
}
// GetTxOut returns a txout from the outpoint info provided.
func (c *BitcoindClient) GetTxOut(txHash *chainhash.Hash, index uint32,
mempool bool) (*btcjson.GetTxOutResult, error) {
return c.client.GetTxOut(txHash, index, mempool)
}
// NotifyReceived updates the watch list with the passed addresses.
func (c *BitcoindClient) NotifyReceived(addrs []btcutil.Address) error {
c.NotifyBlocks()
select {
case c.rescanUpdate <- addrs:
case <-c.quit:
}
return nil
}
// NotifySpent updates the watch list with the passed outPoints.
func (c *BitcoindClient) NotifySpent(outPoints []*wire.OutPoint) error {
c.NotifyBlocks()
select {
case c.rescanUpdate <- outPoints:
case <-c.quit:
}
return nil
}
// NotifyTxIDs updates the watch list with the passed TxIDs.
func (c *BitcoindClient) NotifyTxIDs(txids []chainhash.Hash) error {
c.NotifyBlocks()
select {
case c.rescanUpdate <- txids:
case <-c.quit:
}
return nil
}
// NotifyBlocks enables notifications.
func (c *BitcoindClient) NotifyBlocks() error {
atomic.StoreUint32(&c.notify, 1)
return nil
}
// notifying returns true if notifications have been turned on; false otherwise.
func (c *BitcoindClient) notifying() bool {
return (atomic.LoadUint32(&c.notify) == 1)
}
// LoadTxFilter updates the transaction watchlists for the client. Acceptable
// arguments after `reset` are any combination of []btcutil.Address,
// []wire.OutPoint, []*wire.OutPoint, []chainhash.Hash, and []*chainhash.Hash.
func (c *BitcoindClient) LoadTxFilter(reset bool,
watchLists ...interface{}) error {
// If we reset, signal that.
if reset {
select {
case c.rescanUpdate <- reset:
case <-c.quit:
return nil
}
}
// This helper function will send an update to the filter. If the quit
// channel is closed, it will allow the outer loop below to finish,
// but skip over any updates as the quit case is triggered each time.
sendList := func(list interface{}) {
select {
case c.rescanUpdate <- list:
case <-c.quit:
}
}
for _, watchList := range watchLists {
switch list := watchList.(type) {
case []wire.OutPoint:
sendList(list)
case []*wire.OutPoint:
sendList(list)
case []btcutil.Address:
sendList(list)
case []chainhash.Hash:
sendList(list)
case []*chainhash.Hash:
sendList(list)
default:
log.Warnf("Couldn't add item to filter: unknown type")
}
}
return nil
}
// RescanBlocks rescans any blocks passed, returning only the blocks that
// matched as []btcjson.BlockDetails.
func (c *BitcoindClient) RescanBlocks(blockHashes []chainhash.Hash) (
[]btcjson.RescannedBlock, error) {
rescannedBlocks := make([]btcjson.RescannedBlock, 0, len(blockHashes))
for _, hash := range blockHashes {
header, err := c.GetBlockHeaderVerbose(&hash)
if err != nil {
log.Warnf("Unable to get header %s from bitcoind: %s",
hash, err)
continue
}
block, err := c.GetBlock(&hash)
if err != nil {
log.Warnf("Unable to get block %s from bitcoind: %s",
hash, err)
continue
}
relevantTxes, err := c.filterBlock(block, header.Height, false)
if len(relevantTxes) > 0 {
rescannedBlock := btcjson.RescannedBlock{
Hash: hash.String(),
}
for _, tx := range relevantTxes {
rescannedBlock.Transactions = append(
rescannedBlock.Transactions,
hex.EncodeToString(tx.SerializedTx),
)
}
rescannedBlocks = append(rescannedBlocks,
rescannedBlock)
}
}
return rescannedBlocks, nil
}
// Rescan rescans from the block with the given hash until the current block,
// after adding the passed addresses and outpoints to the client's watch list.
func (c *BitcoindClient) Rescan(blockHash *chainhash.Hash,
addrs []btcutil.Address, outPoints map[wire.OutPoint]btcutil.Address) error {
if blockHash == nil {
return errors.New("rescan requires a starting block hash")
}
// Update addresses.
select {
case c.rescanUpdate <- addrs:
case <-c.quit:
return nil
}
// Update outpoints.
select {
case c.rescanUpdate <- outPoints:
case <-c.quit:
return nil
}
// Kick off the rescan with the starting block hash.
select {
case c.rescanUpdate <- blockHash:
case <-c.quit:
return nil
}
return nil
}
// SendRawTransaction sends a raw transaction via bitcoind.
func (c *BitcoindClient) SendRawTransaction(tx *wire.MsgTx,
allowHighFees bool) (*chainhash.Hash, error) {
return c.client.SendRawTransaction(tx, allowHighFees)
}
// Start attempts to establish a client connection with the remote server.
// If successful, handler goroutines are started to process notifications
// sent by the server. After a limited number of connection attempts, this
// function gives up, and therefore will not block forever waiting for the
// connection to be established to a server that may not exist.
func (c *BitcoindClient) Start() error {
// Verify that the server is running on the expected network.
net, err := c.GetCurrentNet()
if err != nil {
c.client.Disconnect()
return err
}
if net != c.chainParams.Net {
c.client.Disconnect()
return errors.New("mismatched networks")
}
// Connect a ZMQ socket for block notifications
zmqClient, err := gozmq.Subscribe(c.zmqConnect, []string{"rawblock",
"rawtx"}, c.zmqPollInterval)
if err != nil {
return err
}
c.quitMtx.Lock()
c.started = true
c.quitMtx.Unlock()
c.wg.Add(2)
go c.handler()
go c.socketHandler(zmqClient)
return nil
}
// Stop disconnects the client and signals the shutdown of all goroutines
// started by Start.
func (c *BitcoindClient) Stop() {
c.quitMtx.Lock()
select {
case <-c.quit:
default:
close(c.quit)
c.client.Shutdown()
if !c.started {
close(c.dequeueNotification)
}
}
c.quitMtx.Unlock()
}
// WaitForShutdown blocks until both the client has finished disconnecting
// and all handlers have exited.
func (c *BitcoindClient) WaitForShutdown() {
c.client.WaitForShutdown()
c.wg.Wait()
}
// Notifications returns a channel of parsed notifications sent by the remote
// bitcoin RPC server. This channel must be continually read or the process
// may abort for running out memory, as unread notifications are queued for
// later reads.
func (c *BitcoindClient) Notifications() <-chan interface{} {
return c.dequeueNotification
}
// SetStartTime is a non-interface method to set the birthday of the wallet
// using this object. Since only a single rescan at a time is currently
// supported, only one birthday needs to be set. This does not fully restart a
// running rescan, so should not be used to update a rescan while it is running.
// TODO: When factoring out to multiple rescans per bitcoind client, add a
// birthday per client.
func (c *BitcoindClient) SetStartTime(startTime time.Time) {
c.clientMtx.Lock()
defer c.clientMtx.Unlock()
c.startTime = startTime
}
// BlockStamp returns the latest block notified by the client, or an error
// if the client has been shut down.
func (c *BitcoindClient) BlockStamp() (*waddrmgr.BlockStamp, error) {
select {
case bs := <-c.currentBlock:
return bs, nil
case <-c.quit:
return nil, errors.New("disconnected")
}
}
func (c *BitcoindClient) onClientConnect() {
select {
case c.enqueueNotification <- ClientConnected{}:
case <-c.quit:
}
}
func (c *BitcoindClient) onBlockConnected(hash *chainhash.Hash, height int32, time time.Time) {
if c.notifying() {
select {
case c.enqueueNotification <- BlockConnected{
Block: wtxmgr.Block{
Hash: *hash,
Height: height,
},
Time: time,
}:
case <-c.quit:
}
}
}
func (c *BitcoindClient) onFilteredBlockConnected(height int32,
header *wire.BlockHeader, relevantTxs []*wtxmgr.TxRecord) {
if c.notifying() {
select {
case c.enqueueNotification <- FilteredBlockConnected{
Block: &wtxmgr.BlockMeta{
Block: wtxmgr.Block{
Hash: header.BlockHash(),
Height: height,
},
Time: header.Timestamp,
},
RelevantTxs: relevantTxs,
}:
case <-c.quit:
}
}
}
func (c *BitcoindClient) onBlockDisconnected(hash *chainhash.Hash, height int32, time time.Time) {
if c.notifying() {
select {
case c.enqueueNotification <- BlockDisconnected{
Block: wtxmgr.Block{
Hash: *hash,
Height: height,
},
Time: time,
}:
case <-c.quit:
}
}
}
func (c *BitcoindClient) onRelevantTx(rec *wtxmgr.TxRecord,
block *btcjson.BlockDetails) {
blk, err := parseBlock(block)
if err != nil {
// Log and drop improper notification.
log.Errorf("recvtx notification bad block: %v", err)
return
}
select {
case c.enqueueNotification <- RelevantTx{rec, blk}:
case <-c.quit:
}
}
func (c *BitcoindClient) onRescanProgress(hash *chainhash.Hash, height int32, blkTime time.Time) {
select {
case c.enqueueNotification <- &RescanProgress{hash, height, blkTime}:
case <-c.quit:
}
}
func (c *BitcoindClient) onRescanFinished(hash *chainhash.Hash, height int32, blkTime time.Time) {
log.Infof("Rescan finished at %d (%s)", height, hash)
select {
case c.enqueueNotification <- &RescanFinished{hash, height, blkTime}:
case <-c.quit:
}
}
// socketHandler reads events from the ZMQ socket, processes them as
// appropriate, and queues them as btcd or neutrino would.
func (c *BitcoindClient) socketHandler(zmqClient *gozmq.Conn) {
defer c.wg.Done()
defer zmqClient.Close()
log.Infof("Started listening for blocks via ZMQ on %s", c.zmqConnect)
c.onClientConnect()
// Get initial conditions.
bestHash, bestHeight, err := c.GetBestBlock()
if err != nil {
log.Error(err)
return
}
bestHeader, err := c.GetBlockHeaderVerbose(bestHash)
if err != nil {
log.Error(err)
return
}
bs := &waddrmgr.BlockStamp{
Height: bestHeight,
Hash: *bestHash,
Timestamp: time.Unix(bestHeader.Time, 0),
}
mainLoop:
for {
selectLoop:
for {
// Check for any requests before we poll events from
// bitcoind.
select {
// Quit if requested
case <-c.quit:
return
// Update our monitored watchlists or do a rescan.
case event := <-c.rescanUpdate:
switch e := event.(type) {
// We're clearing the watchlists.
case struct{}:
c.clientMtx.Lock()
c.watchAddrs = make(map[string]struct{})
c.watchTxIDs = make(map[chainhash.Hash]struct{})
c.watchOutPoints =
make(map[wire.OutPoint]struct{})
c.clientMtx.Unlock()
// We're updating monitored addresses.
case []btcutil.Address:
c.clientMtx.Lock()
for _, addr := range e {
c.watchAddrs[addr.EncodeAddress()] =
struct{}{}
}
c.clientMtx.Unlock()
// We're updating monitored outpoints from
// pointers.
case []*wire.OutPoint:
c.clientMtx.Lock()
for _, op := range e {
c.watchOutPoints[*op] = struct{}{}
}
c.clientMtx.Unlock()
case []wire.OutPoint:
c.clientMtx.Lock()
for _, op := range e {
c.watchOutPoints[op] = struct{}{}
}
c.clientMtx.Unlock()
// We're updating monitored outpoints that map
// to the scripts that we should scan for.
case map[wire.OutPoint]btcutil.Address:
c.clientMtx.Lock()
for op := range e {
c.watchOutPoints[op] = struct{}{}
}
c.clientMtx.Unlock()
// We're adding monitored TXIDs.
case []*chainhash.Hash:
c.clientMtx.Lock()
for _, txid := range e {
c.watchTxIDs[*txid] = struct{}{}
}
c.clientMtx.Unlock()
case []chainhash.Hash:
c.clientMtx.Lock()
for _, txid := range e {
c.watchTxIDs[txid] = struct{}{}
}
c.clientMtx.Unlock()
// We're rescanning from the passed hash.
case *chainhash.Hash:
err = c.rescan(e)
if err != nil {
log.Errorf("rescan failed: %s",
err)
}
}
default:
break selectLoop
}
}
// Now, poll events from bitcoind.
msgBytes, err := zmqClient.Receive()
if err != nil {
switch e := err.(type) {
case net.Error:
if !e.Timeout() {
log.Error(err)
}
default:
log.Error(err)
}
continue mainLoop
}
// We have an event!
switch string(msgBytes[0]) {
// We have a transaction, so process it.
case "rawtx":
tx := &wire.MsgTx{}
err = tx.Deserialize(bytes.NewBuffer(msgBytes[1]))
if err != nil {
log.Error(err)
continue mainLoop
}
// filterTx automatically detects whether this tx has
// been mined and responds appropriately.
_, _, err := c.filterTx(tx, nil, true)
if err != nil {
log.Error(err)
}
// We have a raw block, so we process it.
case "rawblock":
block := &wire.MsgBlock{}
err = block.Deserialize(bytes.NewBuffer(msgBytes[1]))
if err != nil {
log.Error(err)
continue mainLoop
}
// Check if the block is logically next. If not, we
// have a reorg.
if block.Header.PrevBlock == bs.Hash {
// No reorg. Notify the subscriber of the block.
bs.Hash = block.BlockHash()
bs.Height++
bs.Timestamp = block.Header.Timestamp
_, err = c.filterBlock(block, bs.Height, true)
if err != nil {
log.Error(err)
}
continue mainLoop
}
// We have a reorg.
err = c.reorg(bs, block)
if err != nil {
log.Errorf("Error during reorg: %v", err)
}
// Our event is not a block or other type we're
// watching, so we ignore it.
default:
}
}
}
// reorg processes a reorganization during chain synchronization. This is
// separate from a rescan's handling of a reorg.
func (c *BitcoindClient) reorg(bs *waddrmgr.BlockStamp, block *wire.MsgBlock) error {
// We rewind until we find a common ancestor between the known chain
//and the current chain, and then fast forward again. This relies on
// being able to fetch both from bitcoind; to change that would require
// changes in downstream code.
// TODO: Make this more robust in order not to rely on this behavior.
log.Debugf("Possible reorg at block %s", block.BlockHash())
knownHeader, err := c.GetBlockHeader(&bs.Hash)
if err != nil {
return err
}
// We also get the best known height based on the block which was
// notified. This way, we can preserve the chain of blocks we need to
// retrieve.
bestHash := block.BlockHash()
bestHeight, err := c.GetBlockHeight(&bestHash)
if err != nil {
return err
}
if bestHeight < bs.Height {
log.Debug("multiple reorgs in a row")
return nil
}
// We track the block headers from the notified block to the current
// block at the known block height. This will let us fast-forward
// despite any future reorgs.
var reorgBlocks list.List
reorgBlocks.PushFront(block)
for i := bestHeight - 1; i >= bs.Height; i-- {
block, err = c.GetBlock(&block.Header.PrevBlock)
if err != nil {
return err
}
reorgBlocks.PushFront(block)
}
// Now we rewind back to the last common ancestor block, using the
// prevblock hash from each header to avoid any race conditions. If we
// get more reorgs, they'll be queued and we'll repeat the cycle.
for block.Header.PrevBlock != knownHeader.PrevBlock {
log.Debugf("Disconnecting block %d (%s)", bs.Height, bs.Hash)
c.onBlockDisconnected(&bs.Hash, bs.Height,
knownHeader.Timestamp)
bs.Height--
bs.Hash = knownHeader.PrevBlock
block, err = c.GetBlock(&block.Header.PrevBlock)
if err != nil {
return err
}
reorgBlocks.PushFront(block)
knownHeader, err = c.GetBlockHeader(&knownHeader.PrevBlock)
if err != nil {
return err
}
bs.Timestamp = knownHeader.Timestamp
}
// Disconnect the last block from the old chain. Since the PrevBlock is
// equal between the old and new chains, the tip will now be the last
// common ancestor.
log.Debugf("Disconnecting block %d (%s)", bs.Height, bs.Hash)
c.onBlockDisconnected(&bs.Hash, bs.Height, knownHeader.Timestamp)
bs.Height--
// Now we fast-forward to the notified block, notifying along the way.
for reorgBlocks.Front() != nil {
block = reorgBlocks.Front().Value.(*wire.MsgBlock)
bs.Height++
bs.Hash = block.BlockHash()
c.filterBlock(block, bs.Height, true)
reorgBlocks.Remove(reorgBlocks.Front())
}
return nil
}
// FilterBlocks scans the blocks contained in the FilterBlocksRequest for any
// addresses of interest. Each block will be fetched and filtered sequentially,
// returning a FilterBlocksReponse for the first block containing a matching
// address. If no matches are found in the range of blocks requested, the
// returned response will be nil.
func (c *BitcoindClient) FilterBlocks(
req *FilterBlocksRequest) (*FilterBlocksResponse, error) {
blockFilterer := NewBlockFilterer(c.chainParams, req)
// Iterate over the requested blocks, fetching each from the rpc client.
// Each block will scanned using the reverse addresses indexes generated
// above, breaking out early if any addresses are found.
for i, block := range req.Blocks {
// TODO(conner): add prefetching, since we already know we'll be
// fetching *every* block
rawBlock, err := c.client.GetBlock(&block.Hash)
if err != nil {
return nil, err
}
if !blockFilterer.FilterBlock(rawBlock) {
continue
}
// If any external or internal addresses were detected in this
// block, we return them to the caller so that the rescan
// windows can widened with subsequent addresses. The
// `BatchIndex` is returned so that the caller can compute the
// *next* block from which to begin again.
resp := &FilterBlocksResponse{
BatchIndex: uint32(i),
BlockMeta: block,
FoundExternalAddrs: blockFilterer.FoundExternal,
FoundInternalAddrs: blockFilterer.FoundInternal,
FoundOutPoints: blockFilterer.FoundOutPoints,
RelevantTxns: blockFilterer.RelevantTxns,
}
return resp, nil
}
// No addresses were found for this range.
return nil, nil
}
// rescan performs a rescan of the chain using a bitcoind back-end, from the
// specified hash to the best-known hash, while watching out for reorgs that
// happen during the rescan. It uses the addresses and outputs being tracked
// by the client in the watch list. This is called only within a queue
// processing loop.
func (c *BitcoindClient) rescan(hash *chainhash.Hash) error {
// We start by getting the best already-processed block. We only use
// the height, as the hash can change during a reorganization, which we
// catch by testing connectivity from known blocks to the previous
// block.
log.Infof("Starting rescan from block %s", hash)
bestHash, bestHeight, err := c.GetBestBlock()
if err != nil {
return err
}
bestHeader, err := c.GetBlockHeaderVerbose(bestHash)
if err != nil {
return err
}
bestBlock := &waddrmgr.BlockStamp{
Hash: *bestHash,
Height: bestHeight,
Timestamp: time.Unix(bestHeader.Time, 0),
}
lastHeader, err := c.GetBlockHeaderVerbose(hash)
if err != nil {
return err
}
lastHash, err := chainhash.NewHashFromStr(lastHeader.Hash)
if err != nil {
return err
}
firstHeader := lastHeader
headers := list.New()
headers.PushBack(lastHeader)
// We always send a RescanFinished message when we're done.
defer func() {
c.onRescanFinished(lastHash, lastHeader.Height, time.Unix(
lastHeader.Time, 0))
}()
// Cycle through all of the blocks known to bitcoind, being mindful of
// reorgs.
for i := firstHeader.Height + 1; i <= bestBlock.Height; i++ {
// Get the block at the current height.
hash, err := c.GetBlockHash(int64(i))
if err != nil {
return err
}
// This relies on the fact that bitcoind returns blocks from
// non-best chains it knows about.
// TODO: Make this more robust in order to not rely on this
// behavior.
//
// If the last known header isn't after the wallet birthday,
// try only fetching the next header and constructing a dummy
// block. If, in this event, the next header's timestamp is
// after the wallet birthday, go ahead and fetch the full block.
var block *wire.MsgBlock
c.clientMtx.RLock()
afterBirthday := lastHeader.Time >= c.startTime.Unix()
c.clientMtx.RUnlock()
if !afterBirthday {
header, err := c.GetBlockHeader(hash)
if err != nil {
return err
}
block = &wire.MsgBlock{
Header: *header,
}
c.clientMtx.RLock()
afterBirthday = c.startTime.Before(header.Timestamp)
if afterBirthday {
c.onRescanProgress(lastHash, i,
block.Header.Timestamp)
}
c.clientMtx.RUnlock()
}
if afterBirthday {
block, err = c.GetBlock(hash)
if err != nil {
return err
}
}
for block.Header.PrevBlock.String() != lastHeader.Hash {
// If we're in this for loop, it looks like we've been
// reorganized. We now walk backwards to the common
// ancestor between the best chain and the known chain.
//
// First, we signal a disconnected block to rewind the
// rescan state.
c.onBlockDisconnected(lastHash, lastHeader.Height,
time.Unix(lastHeader.Time, 0))
// Next, we get the previous block of the best chain.
hash, err = c.GetBlockHash(int64(i - 1))
if err != nil {
return err
}
block, err = c.GetBlock(hash)
if err != nil {
return err
}
// Then, we get the previous header for the known chain.
if headers.Back() != nil {
// If it's already in the headers list, we can
// just get it from there and remove the
// current hash).
headers.Remove(headers.Back())
if headers.Back() != nil {
lastHeader = headers.Back().
Value.(*btcjson.
GetBlockHeaderVerboseResult)
lastHash, err = chainhash.
NewHashFromStr(lastHeader.Hash)
if err != nil {
return err
}
}
} else {
// Otherwise, we get it from bitcoind.
lastHash, err = chainhash.NewHashFromStr(
lastHeader.PreviousHash)
if err != nil {
return err
}
lastHeader, err = c.GetBlockHeaderVerbose(
lastHash)
if err != nil {
return err
}
}
}
// We are at the latest known block, so we notify.
lastHeader = &btcjson.GetBlockHeaderVerboseResult{
Hash: block.BlockHash().String(),
Height: i,
PreviousHash: block.Header.PrevBlock.String(),
Time: block.Header.Timestamp.Unix(),
}
blockHash := block.BlockHash()
lastHash = &blockHash
headers.PushBack(lastHeader)
_, err = c.filterBlock(block, i, true)
if err != nil {
return err
}
if i%10000 == 0 {
c.onRescanProgress(lastHash, i, block.Header.Timestamp)
}
// If we've reached the previously best-known block, check to
// make sure the underlying node hasn't synchronized additional
// blocks. If it has, update the best-known block and continue
// to rescan to that point.
if i == bestBlock.Height {
bestHash, bestHeight, err = c.GetBestBlock()
if err != nil {
return err
}
bestHeader, err = c.GetBlockHeaderVerbose(bestHash)
if err != nil {
return err
}
bestBlock = &waddrmgr.BlockStamp{
Hash: *bestHash,
Height: bestHeight,
Timestamp: time.Unix(bestHeader.Time, 0),
}
}
}
return nil
}
// filterBlock filters a block for watched outpoints and addresses, and returns
// any matching transactions, sending notifications along the way.
func (c *BitcoindClient) filterBlock(block *wire.MsgBlock, height int32,
notify bool) ([]*wtxmgr.TxRecord, error) {
// If we're earlier than wallet birthday, don't do any notifications.
c.clientMtx.RLock()
startTime := c.startTime
c.clientMtx.RUnlock()
if block.Header.Timestamp.Before(startTime) {
return nil, nil
}
// Only mention that we're filtering a block if the client wallet has
// started monitoring the chain.
if !c.notifying() {
log.Debugf("Filtering block %d (%s) with %d transactions",
height, block.BlockHash(), len(block.Transactions))
}
// Create block details for notifications.
blockHash := block.BlockHash()
blockDetails := &btcjson.BlockDetails{
Hash: blockHash.String(),
Height: height,
Time: block.Header.Timestamp.Unix(),
}
// Cycle through all transactions in the block.
var relevantTxs []*wtxmgr.TxRecord
blockConfirmed := make(map[chainhash.Hash]struct{})
for i, tx := range block.Transactions {
// Update block and tx details for notifications.
blockDetails.Index = i
found, rec, err := c.filterTx(tx, blockDetails, notify)
if err != nil {
log.Warnf("Unable to filter tx: %v", err)
continue
}
if found {
relevantTxs = append(relevantTxs, rec)
blockConfirmed[tx.TxHash()] = struct{}{}
}
}
// Update the expiration map by setting the block's confirmed
// transactions and deleting any in the mempool that were confirmed
// over 288 blocks ago.
c.clientMtx.Lock()
c.memPoolExp[height] = blockConfirmed
if oldBlock, ok := c.memPoolExp[height-288]; ok {
for txHash := range oldBlock {
delete(c.memPool, txHash)
}
delete(c.memPoolExp, height-288)
}
c.clientMtx.Unlock()
if notify {
c.onFilteredBlockConnected(height, &block.Header, relevantTxs)
c.onBlockConnected(&blockHash, height, block.Header.Timestamp)
}
return relevantTxs, nil
}
// filterTx filters a single transaction against the client's watch list.
func (c *BitcoindClient) filterTx(tx *wire.MsgTx,
blockDetails *btcjson.BlockDetails, notify bool) (bool,
*wtxmgr.TxRecord, error) {
txDetails := btcutil.NewTx(tx)
if blockDetails != nil {
txDetails.SetIndex(blockDetails.Index)
}
rec, err := wtxmgr.NewTxRecordFromMsgTx(txDetails.MsgTx(), time.Now())
if err != nil {
log.Errorf("Cannot create transaction record for relevant "+
"tx: %v", err)
return false, nil, err
}
if blockDetails != nil {
rec.Received = time.Unix(blockDetails.Time, 0)
}
var notifyTx bool
// If we already know this is a relevant tx from a previous ntfn, we
// can shortcut the filter process and let the caller know the filter
// matches.
c.clientMtx.RLock()
if _, ok := c.memPool[tx.TxHash()]; ok {
c.clientMtx.RUnlock()
if notify && blockDetails != nil {
c.onRelevantTx(rec, blockDetails)
}
return true, rec, nil
}
c.clientMtx.RUnlock()
// Cycle through outputs and check if we've matched a known address.
// Add any matched outpoints to watchOutPoints.
for i, out := range tx.TxOut {
_, addrs, _, err := txscript.ExtractPkScriptAddrs(
out.PkScript, c.chainParams)
if err != nil {
log.Debugf("Couldn't parse output script in %s:%d: %v",
tx.TxHash(), i, err)
continue
}
for _, addr := range addrs {
c.clientMtx.RLock()
if _, ok := c.watchAddrs[addr.EncodeAddress()]; ok {
notifyTx = true
c.watchOutPoints[wire.OutPoint{
Hash: tx.TxHash(),
Index: uint32(i),
}] = struct{}{}
}
c.clientMtx.RUnlock()
}
}
// If an output hasn't already matched, see if an input will.
if !notifyTx {
for _, in := range tx.TxIn {
c.clientMtx.RLock()
if _, ok := c.watchOutPoints[in.PreviousOutPoint]; ok {
c.clientMtx.RUnlock()
notifyTx = true
break
}
c.clientMtx.RUnlock()
}
}
// If we have a match and it's not mined, notify the TX. If the TX is
// mined, we notify as part of FilteredBlockConnected. The boolean map
// value will let us know if we last saw it as mined or unmined.
if notifyTx {
c.clientMtx.Lock()
if _, ok := c.memPool[tx.TxHash()]; blockDetails == nil || !ok {
c.onRelevantTx(rec, blockDetails)
}
c.memPool[tx.TxHash()] = struct{}{}
c.clientMtx.Unlock()
}
return notifyTx, rec, nil
}
// handler maintains a queue of notifications and the current state (best
// block) of the chain.
func (c *BitcoindClient) handler() {
hash, height, err := c.GetBestBlock()
if err != nil {
log.Errorf("Failed to receive best block from chain server: %v", err)
c.Stop()
c.wg.Done()
return
}
bs := &waddrmgr.BlockStamp{Hash: *hash, Height: height}
// TODO: Rather than leaving this as an unbounded queue for all types of
// notifications, try dropping ones where a later enqueued notification
// can fully invalidate one waiting to be processed. For example,
// blockconnected notifications for greater block heights can remove the
// need to process earlier blockconnected notifications still waiting
// here.
// TODO(aakselrod): Factor this logic out so it can be reused for each
// chain back end, rather than copying it.
var notifications []interface{}
enqueue := c.enqueueNotification
var dequeue chan interface{}
var next interface{}
out:
for {
select {
case n, ok := <-enqueue:
if !ok {
// If no notifications are queued for handling,
// the queue is finished.
if len(notifications) == 0 {
break out
}
// nil channel so no more reads can occur.
enqueue = nil
continue
}
if len(notifications) == 0 {
next = n
dequeue = c.dequeueNotification
}
notifications = append(notifications, n)
case dequeue <- next:
if n, ok := next.(BlockConnected); ok {
bs = &waddrmgr.BlockStamp{
Height: n.Height,
Hash: n.Hash,
}
}
notifications[0] = nil
notifications = notifications[1:]
if len(notifications) != 0 {
next = notifications[0]
} else {
// If no more notifications can be enqueued, the
// queue is finished.
if enqueue == nil {
break out
}
dequeue = nil
}
case c.currentBlock <- bs:
case <-c.quit:
break out
}
}
c.Stop()
close(c.dequeueNotification)
c.wg.Done()
}