diff --git a/chain/bitcoind.go b/chain/bitcoind.go deleted file mode 100644 index 0e10ec1..0000000 --- a/chain/bitcoind.go +++ /dev/null @@ -1,1259 +0,0 @@ -package chain - -import ( - "bytes" - "container/list" - "encoding/hex" - "errors" - "net" - "sync" - "sync/atomic" - "time" - - "github.com/btcsuite/btcd/btcjson" - "github.com/btcsuite/btcd/chaincfg" - "github.com/btcsuite/btcd/chaincfg/chainhash" - "github.com/btcsuite/btcd/rpcclient" - "github.com/btcsuite/btcd/txscript" - "github.com/btcsuite/btcd/wire" - "github.com/btcsuite/btcutil" - "github.com/btcsuite/btcwallet/waddrmgr" - "github.com/btcsuite/btcwallet/wtxmgr" - "github.com/lightninglabs/gozmq" -) - -// BitcoindClient represents a persistent client connection to a bitcoind server -// for information regarding the current best block chain. -type BitcoindClient struct { - client *rpcclient.Client - connConfig *rpcclient.ConnConfig // Work around unexported field - chainParams *chaincfg.Params - - zmqConnect string - zmqPollInterval time.Duration - - enqueueNotification chan interface{} - dequeueNotification chan interface{} - currentBlock chan *waddrmgr.BlockStamp - - clientMtx sync.RWMutex - rescanUpdate chan interface{} - startTime time.Time - watchOutPoints map[wire.OutPoint]struct{} - watchAddrs map[string]struct{} - watchTxIDs map[chainhash.Hash]struct{} - notify uint32 - - quit chan struct{} - wg sync.WaitGroup - started bool - quitMtx sync.Mutex - - memPool map[chainhash.Hash]struct{} - memPoolExp map[int32]map[chainhash.Hash]struct{} -} - -// NewBitcoindClient creates a client connection to the server described by the -// connect string. If disableTLS is false, the remote RPC certificate must be -// provided in the certs slice. The connection is not established immediately, -// but must be done using the Start method. If the remote server does not -// operate on the same bitcoin network as described by the passed chain -// parameters, the connection will be disconnected. -func NewBitcoindClient(chainParams *chaincfg.Params, connect, user, pass, - zmqConnect string, zmqPollInterval time.Duration) (*BitcoindClient, - error) { - - client := &BitcoindClient{ - connConfig: &rpcclient.ConnConfig{ - Host: connect, - User: user, - Pass: pass, - DisableAutoReconnect: false, - DisableConnectOnNew: true, - DisableTLS: true, - HTTPPostMode: true, - }, - chainParams: chainParams, - zmqConnect: zmqConnect, - zmqPollInterval: zmqPollInterval, - enqueueNotification: make(chan interface{}), - dequeueNotification: make(chan interface{}), - currentBlock: make(chan *waddrmgr.BlockStamp), - rescanUpdate: make(chan interface{}), - watchOutPoints: make(map[wire.OutPoint]struct{}), - watchAddrs: make(map[string]struct{}), - watchTxIDs: make(map[chainhash.Hash]struct{}), - quit: make(chan struct{}), - memPool: make(map[chainhash.Hash]struct{}), - memPoolExp: make(map[int32]map[chainhash.Hash]struct{}), - } - rpcClient, err := rpcclient.New(client.connConfig, nil) - if err != nil { - return nil, err - } - client.client = rpcClient - return client, nil -} - -// BackEnd returns the name of the driver. -func (c *BitcoindClient) BackEnd() string { - return "bitcoind" -} - -// GetCurrentNet returns the network on which the bitcoind instance is running. -func (c *BitcoindClient) GetCurrentNet() (wire.BitcoinNet, error) { - hash, err := c.client.GetBlockHash(0) - if err != nil { - return 0, err - } - - switch *hash { - case *chaincfg.TestNet3Params.GenesisHash: - return chaincfg.TestNet3Params.Net, nil - case *chaincfg.RegressionNetParams.GenesisHash: - return chaincfg.RegressionNetParams.Net, nil - case *chaincfg.MainNetParams.GenesisHash: - return chaincfg.MainNetParams.Net, nil - default: - return 0, errors.New("unknown network") - } -} - -// GetBestBlock returns the highest block known to bitcoind. -func (c *BitcoindClient) GetBestBlock() (*chainhash.Hash, int32, error) { - bcinfo, err := c.client.GetBlockChainInfo() - if err != nil { - return nil, 0, err - } - - hash, err := chainhash.NewHashFromStr(bcinfo.BestBlockHash) - if err != nil { - return nil, 0, err - } - - return hash, bcinfo.Blocks, nil -} - -// GetBlockHeight returns the height for the hash, if known, or returns an -// error. -func (c *BitcoindClient) GetBlockHeight(hash *chainhash.Hash) (int32, error) { - header, err := c.GetBlockHeaderVerbose(hash) - if err != nil { - return 0, err - } - - return header.Height, nil -} - -// GetBlock returns a block from the hash. -func (c *BitcoindClient) GetBlock(hash *chainhash.Hash) (*wire.MsgBlock, - error) { - return c.client.GetBlock(hash) -} - -// GetBlockVerbose returns a verbose block from the hash. -func (c *BitcoindClient) GetBlockVerbose(hash *chainhash.Hash) ( - *btcjson.GetBlockVerboseResult, error) { - return c.client.GetBlockVerbose(hash) -} - -// GetBlockHash returns a block hash from the height. -func (c *BitcoindClient) GetBlockHash(height int64) (*chainhash.Hash, error) { - return c.client.GetBlockHash(height) -} - -// GetBlockHeader returns a block header from the hash. -func (c *BitcoindClient) GetBlockHeader( - hash *chainhash.Hash) (*wire.BlockHeader, error) { - return c.client.GetBlockHeader(hash) -} - -// GetBlockHeaderVerbose returns a block header from the hash. -func (c *BitcoindClient) GetBlockHeaderVerbose(hash *chainhash.Hash) ( - *btcjson.GetBlockHeaderVerboseResult, error) { - return c.client.GetBlockHeaderVerbose(hash) -} - -// GetRawTransactionVerbose returns a transaction from the tx hash. -func (c *BitcoindClient) GetRawTransactionVerbose(hash *chainhash.Hash) ( - *btcjson.TxRawResult, error) { - return c.client.GetRawTransactionVerbose(hash) -} - -// GetTxOut returns a txout from the outpoint info provided. -func (c *BitcoindClient) GetTxOut(txHash *chainhash.Hash, index uint32, - mempool bool) (*btcjson.GetTxOutResult, error) { - return c.client.GetTxOut(txHash, index, mempool) -} - -// NotifyReceived updates the watch list with the passed addresses. -func (c *BitcoindClient) NotifyReceived(addrs []btcutil.Address) error { - c.NotifyBlocks() - select { - case c.rescanUpdate <- addrs: - case <-c.quit: - } - return nil -} - -// NotifySpent updates the watch list with the passed outPoints. -func (c *BitcoindClient) NotifySpent(outPoints []*wire.OutPoint) error { - c.NotifyBlocks() - select { - case c.rescanUpdate <- outPoints: - case <-c.quit: - } - return nil -} - -// NotifyTxIDs updates the watch list with the passed TxIDs. -func (c *BitcoindClient) NotifyTxIDs(txids []chainhash.Hash) error { - c.NotifyBlocks() - select { - case c.rescanUpdate <- txids: - case <-c.quit: - } - return nil -} - -// NotifyBlocks enables notifications. -func (c *BitcoindClient) NotifyBlocks() error { - atomic.StoreUint32(&c.notify, 1) - return nil -} - -// notifying returns true if notifications have been turned on; false otherwise. -func (c *BitcoindClient) notifying() bool { - return (atomic.LoadUint32(&c.notify) == 1) -} - -// LoadTxFilter updates the transaction watchlists for the client. Acceptable -// arguments after `reset` are any combination of []btcutil.Address, -// []wire.OutPoint, []*wire.OutPoint, []chainhash.Hash, -// map[wire.OutPoint]btcutil.Address, and []*chainhash.Hash. -func (c *BitcoindClient) LoadTxFilter(reset bool, - watchLists ...interface{}) error { - - // If we reset, signal that. - if reset { - select { - case c.rescanUpdate <- reset: - case <-c.quit: - return nil - } - } - - // This helper function will send an update to the filter. If the quit - // channel is closed, it will allow the outer loop below to finish, - // but skip over any updates as the quit case is triggered each time. - sendList := func(list interface{}) { - select { - case c.rescanUpdate <- list: - case <-c.quit: - } - } - - for _, watchList := range watchLists { - switch list := watchList.(type) { - - case map[wire.OutPoint]btcutil.Address: - sendList(list) - - case []wire.OutPoint: - sendList(list) - - case []*wire.OutPoint: - sendList(list) - - case []btcutil.Address: - sendList(list) - - case []chainhash.Hash: - sendList(list) - - case []*chainhash.Hash: - sendList(list) - - default: - log.Warnf("Couldn't add item to filter: unknown type") - } - } - return nil -} - -// RescanBlocks rescans any blocks passed, returning only the blocks that -// matched as []btcjson.BlockDetails. -func (c *BitcoindClient) RescanBlocks(blockHashes []chainhash.Hash) ( - []btcjson.RescannedBlock, error) { - - rescannedBlocks := make([]btcjson.RescannedBlock, 0, len(blockHashes)) - for _, hash := range blockHashes { - header, err := c.GetBlockHeaderVerbose(&hash) - if err != nil { - log.Warnf("Unable to get header %s from bitcoind: %s", - hash, err) - continue - } - - block, err := c.GetBlock(&hash) - if err != nil { - log.Warnf("Unable to get block %s from bitcoind: %s", - hash, err) - continue - } - - relevantTxes, err := c.filterBlock(block, header.Height, false) - if len(relevantTxes) > 0 { - rescannedBlock := btcjson.RescannedBlock{ - Hash: hash.String(), - } - for _, tx := range relevantTxes { - rescannedBlock.Transactions = append( - rescannedBlock.Transactions, - hex.EncodeToString(tx.SerializedTx), - ) - } - rescannedBlocks = append(rescannedBlocks, - rescannedBlock) - } - } - return rescannedBlocks, nil -} - -// Rescan rescans from the block with the given hash until the current block, -// after adding the passed addresses and outpoints to the client's watch list. -func (c *BitcoindClient) Rescan(blockHash *chainhash.Hash, - addrs []btcutil.Address, outPoints map[wire.OutPoint]btcutil.Address) error { - - if blockHash == nil { - return errors.New("rescan requires a starting block hash") - } - - // Update addresses. - select { - case c.rescanUpdate <- addrs: - case <-c.quit: - return nil - } - - // Update outpoints. - select { - case c.rescanUpdate <- outPoints: - case <-c.quit: - return nil - } - - // Kick off the rescan with the starting block hash. - select { - case c.rescanUpdate <- blockHash: - case <-c.quit: - return nil - } - - return nil -} - -// SendRawTransaction sends a raw transaction via bitcoind. -func (c *BitcoindClient) SendRawTransaction(tx *wire.MsgTx, - allowHighFees bool) (*chainhash.Hash, error) { - - return c.client.SendRawTransaction(tx, allowHighFees) -} - -// Start attempts to establish a client connection with the remote server. -// If successful, handler goroutines are started to process notifications -// sent by the server. After a limited number of connection attempts, this -// function gives up, and therefore will not block forever waiting for the -// connection to be established to a server that may not exist. -func (c *BitcoindClient) Start() error { - // Verify that the server is running on the expected network. - net, err := c.GetCurrentNet() - if err != nil { - c.client.Disconnect() - return err - } - if net != c.chainParams.Net { - c.client.Disconnect() - return errors.New("mismatched networks") - } - - // Connect a ZMQ socket for block notifications - zmqClient, err := gozmq.Subscribe(c.zmqConnect, []string{"rawblock", - "rawtx"}, c.zmqPollInterval) - if err != nil { - return err - } - - c.quitMtx.Lock() - c.started = true - c.quitMtx.Unlock() - - c.wg.Add(2) - go c.handler() - go c.socketHandler(zmqClient) - return nil -} - -// Stop disconnects the client and signals the shutdown of all goroutines -// started by Start. -func (c *BitcoindClient) Stop() { - c.quitMtx.Lock() - select { - case <-c.quit: - default: - close(c.quit) - c.client.Shutdown() - - if !c.started { - close(c.dequeueNotification) - } - } - c.quitMtx.Unlock() -} - -// WaitForShutdown blocks until both the client has finished disconnecting -// and all handlers have exited. -func (c *BitcoindClient) WaitForShutdown() { - c.client.WaitForShutdown() - c.wg.Wait() -} - -// Notifications returns a channel of parsed notifications sent by the remote -// bitcoin RPC server. This channel must be continually read or the process -// may abort for running out memory, as unread notifications are queued for -// later reads. -func (c *BitcoindClient) Notifications() <-chan interface{} { - return c.dequeueNotification -} - -// SetStartTime is a non-interface method to set the birthday of the wallet -// using this object. Since only a single rescan at a time is currently -// supported, only one birthday needs to be set. This does not fully restart a -// running rescan, so should not be used to update a rescan while it is running. -// TODO: When factoring out to multiple rescans per bitcoind client, add a -// birthday per client. -func (c *BitcoindClient) SetStartTime(startTime time.Time) { - c.clientMtx.Lock() - defer c.clientMtx.Unlock() - - c.startTime = startTime -} - -// BlockStamp returns the latest block notified by the client, or an error -// if the client has been shut down. -func (c *BitcoindClient) BlockStamp() (*waddrmgr.BlockStamp, error) { - select { - case bs := <-c.currentBlock: - return bs, nil - case <-c.quit: - return nil, errors.New("disconnected") - } -} - -func (c *BitcoindClient) onClientConnect() { - select { - case c.enqueueNotification <- ClientConnected{}: - case <-c.quit: - } -} - -func (c *BitcoindClient) onBlockConnected(hash *chainhash.Hash, height int32, time time.Time) { - if c.notifying() { - select { - case c.enqueueNotification <- BlockConnected{ - Block: wtxmgr.Block{ - Hash: *hash, - Height: height, - }, - Time: time, - }: - case <-c.quit: - } - } -} - -func (c *BitcoindClient) onFilteredBlockConnected(height int32, - header *wire.BlockHeader, relevantTxs []*wtxmgr.TxRecord) { - if c.notifying() { - select { - case c.enqueueNotification <- FilteredBlockConnected{ - Block: &wtxmgr.BlockMeta{ - Block: wtxmgr.Block{ - Hash: header.BlockHash(), - Height: height, - }, - Time: header.Timestamp, - }, - RelevantTxs: relevantTxs, - }: - case <-c.quit: - } - } -} - -func (c *BitcoindClient) onBlockDisconnected(hash *chainhash.Hash, height int32, time time.Time) { - if c.notifying() { - select { - case c.enqueueNotification <- BlockDisconnected{ - Block: wtxmgr.Block{ - Hash: *hash, - Height: height, - }, - Time: time, - }: - case <-c.quit: - } - } -} - -func (c *BitcoindClient) onRelevantTx(rec *wtxmgr.TxRecord, - block *btcjson.BlockDetails) { - blk, err := parseBlock(block) - if err != nil { - // Log and drop improper notification. - log.Errorf("recvtx notification bad block: %v", err) - return - } - - select { - case c.enqueueNotification <- RelevantTx{rec, blk}: - case <-c.quit: - } -} - -func (c *BitcoindClient) onRescanProgress(hash *chainhash.Hash, height int32, blkTime time.Time) { - select { - case c.enqueueNotification <- &RescanProgress{hash, height, blkTime}: - case <-c.quit: - } -} - -func (c *BitcoindClient) onRescanFinished(hash *chainhash.Hash, height int32, blkTime time.Time) { - log.Infof("Rescan finished at %d (%s)", height, hash) - select { - case c.enqueueNotification <- &RescanFinished{hash, height, blkTime}: - case <-c.quit: - } - -} - -// socketHandler reads events from the ZMQ socket, processes them as -// appropriate, and queues them as btcd or neutrino would. -func (c *BitcoindClient) socketHandler(zmqClient *gozmq.Conn) { - defer c.wg.Done() - defer zmqClient.Close() - - log.Infof("Started listening for blocks via ZMQ on %s", c.zmqConnect) - c.onClientConnect() - - // Get initial conditions. - bestHash, bestHeight, err := c.GetBestBlock() - if err != nil { - log.Error(err) - return - } - bestHeader, err := c.GetBlockHeaderVerbose(bestHash) - if err != nil { - log.Error(err) - return - } - bs := &waddrmgr.BlockStamp{ - Height: bestHeight, - Hash: *bestHash, - Timestamp: time.Unix(bestHeader.Time, 0), - } - -mainLoop: - for { - selectLoop: - for { - // Check for any requests before we poll events from - // bitcoind. - select { - - // Quit if requested - case <-c.quit: - return - - // Update our monitored watchlists or do a rescan. - case event := <-c.rescanUpdate: - switch e := event.(type) { - - // We're clearing the watchlists. - case struct{}: - c.clientMtx.Lock() - c.watchAddrs = make(map[string]struct{}) - c.watchTxIDs = make(map[chainhash.Hash]struct{}) - c.watchOutPoints = - make(map[wire.OutPoint]struct{}) - c.clientMtx.Unlock() - - // We're updating monitored addresses. - case []btcutil.Address: - c.clientMtx.Lock() - for _, addr := range e { - c.watchAddrs[addr.EncodeAddress()] = - struct{}{} - } - c.clientMtx.Unlock() - - // We're updating monitored outpoints from - // pointers. - case []*wire.OutPoint: - c.clientMtx.Lock() - for _, op := range e { - c.watchOutPoints[*op] = struct{}{} - } - c.clientMtx.Unlock() - case []wire.OutPoint: - c.clientMtx.Lock() - for _, op := range e { - c.watchOutPoints[op] = struct{}{} - } - c.clientMtx.Unlock() - - // We're updating monitored outpoints that map - // to the scripts that we should scan for. - case map[wire.OutPoint]btcutil.Address: - c.clientMtx.Lock() - for op := range e { - c.watchOutPoints[op] = struct{}{} - } - c.clientMtx.Unlock() - - // We're adding monitored TXIDs. - case []*chainhash.Hash: - c.clientMtx.Lock() - for _, txid := range e { - c.watchTxIDs[*txid] = struct{}{} - } - c.clientMtx.Unlock() - case []chainhash.Hash: - c.clientMtx.Lock() - for _, txid := range e { - c.watchTxIDs[txid] = struct{}{} - } - c.clientMtx.Unlock() - - // We're rescanning from the passed hash. - case *chainhash.Hash: - err = c.rescan(e) - if err != nil { - log.Errorf("rescan failed: %s", - err) - } - } - default: - break selectLoop - } - } - - // Now, poll events from bitcoind. - msgBytes, err := zmqClient.Receive() - if err != nil { - switch e := err.(type) { - case net.Error: - if !e.Timeout() { - log.Error(err) - } - default: - log.Error(err) - } - continue mainLoop - } - - // We have an event! - switch string(msgBytes[0]) { - - // We have a transaction, so process it. - case "rawtx": - tx := &wire.MsgTx{} - err = tx.Deserialize(bytes.NewBuffer(msgBytes[1])) - if err != nil { - log.Error(err) - continue mainLoop - } - // filterTx automatically detects whether this tx has - // been mined and responds appropriately. - _, _, err := c.filterTx(tx, nil, true) - if err != nil { - log.Error(err) - } - - // We have a raw block, so we process it. - case "rawblock": - block := &wire.MsgBlock{} - err = block.Deserialize(bytes.NewBuffer(msgBytes[1])) - if err != nil { - log.Error(err) - continue mainLoop - } - - // Check if the block is logically next. If not, we - // have a reorg. - if block.Header.PrevBlock == bs.Hash { - // No reorg. Notify the subscriber of the block. - bs.Hash = block.BlockHash() - bs.Height++ - bs.Timestamp = block.Header.Timestamp - _, err = c.filterBlock(block, bs.Height, true) - if err != nil { - log.Error(err) - } - continue mainLoop - } - - // We have a reorg. - err = c.reorg(bs, block) - if err != nil { - log.Errorf("Error during reorg: %v", err) - } - - // Our event is not a block or other type we're - // watching, so we ignore it. - default: - } - } -} - -// reorg processes a reorganization during chain synchronization. This is -// separate from a rescan's handling of a reorg. -func (c *BitcoindClient) reorg(bs *waddrmgr.BlockStamp, block *wire.MsgBlock) error { - // We rewind until we find a common ancestor between the known chain - //and the current chain, and then fast forward again. This relies on - // being able to fetch both from bitcoind; to change that would require - // changes in downstream code. - // TODO: Make this more robust in order not to rely on this behavior. - log.Debugf("Possible reorg at block %s", block.BlockHash()) - knownHeader, err := c.GetBlockHeader(&bs.Hash) - if err != nil { - return err - } - - // We also get the best known height based on the block which was - // notified. This way, we can preserve the chain of blocks we need to - // retrieve. - bestHash := block.BlockHash() - bestHeight, err := c.GetBlockHeight(&bestHash) - if err != nil { - return err - } - if bestHeight < bs.Height { - log.Debug("multiple reorgs in a row") - return nil - } - - // We track the block headers from the notified block to the current - // block at the known block height. This will let us fast-forward - // despite any future reorgs. - var reorgBlocks list.List - reorgBlocks.PushFront(block) - for i := bestHeight - 1; i >= bs.Height; i-- { - block, err = c.GetBlock(&block.Header.PrevBlock) - if err != nil { - return err - } - reorgBlocks.PushFront(block) - } - - // Now we rewind back to the last common ancestor block, using the - // prevblock hash from each header to avoid any race conditions. If we - // get more reorgs, they'll be queued and we'll repeat the cycle. - for block.Header.PrevBlock != knownHeader.PrevBlock { - log.Debugf("Disconnecting block %d (%s)", bs.Height, bs.Hash) - c.onBlockDisconnected(&bs.Hash, bs.Height, - knownHeader.Timestamp) - bs.Height-- - bs.Hash = knownHeader.PrevBlock - block, err = c.GetBlock(&block.Header.PrevBlock) - if err != nil { - return err - } - reorgBlocks.PushFront(block) - knownHeader, err = c.GetBlockHeader(&knownHeader.PrevBlock) - if err != nil { - return err - } - bs.Timestamp = knownHeader.Timestamp - } - - // Disconnect the last block from the old chain. Since the PrevBlock is - // equal between the old and new chains, the tip will now be the last - // common ancestor. - log.Debugf("Disconnecting block %d (%s)", bs.Height, bs.Hash) - c.onBlockDisconnected(&bs.Hash, bs.Height, knownHeader.Timestamp) - bs.Height-- - - // Now we fast-forward to the notified block, notifying along the way. - for reorgBlocks.Front() != nil { - block = reorgBlocks.Front().Value.(*wire.MsgBlock) - bs.Height++ - bs.Hash = block.BlockHash() - c.filterBlock(block, bs.Height, true) - reorgBlocks.Remove(reorgBlocks.Front()) - } - - return nil -} - -// FilterBlocks scans the blocks contained in the FilterBlocksRequest for any -// addresses of interest. Each block will be fetched and filtered sequentially, -// returning a FilterBlocksReponse for the first block containing a matching -// address. If no matches are found in the range of blocks requested, the -// returned response will be nil. -func (c *BitcoindClient) FilterBlocks( - req *FilterBlocksRequest) (*FilterBlocksResponse, error) { - - blockFilterer := NewBlockFilterer(c.chainParams, req) - - // Iterate over the requested blocks, fetching each from the rpc client. - // Each block will scanned using the reverse addresses indexes generated - // above, breaking out early if any addresses are found. - for i, block := range req.Blocks { - // TODO(conner): add prefetching, since we already know we'll be - // fetching *every* block - rawBlock, err := c.client.GetBlock(&block.Hash) - if err != nil { - return nil, err - } - - if !blockFilterer.FilterBlock(rawBlock) { - continue - } - - // If any external or internal addresses were detected in this - // block, we return them to the caller so that the rescan - // windows can widened with subsequent addresses. The - // `BatchIndex` is returned so that the caller can compute the - // *next* block from which to begin again. - resp := &FilterBlocksResponse{ - BatchIndex: uint32(i), - BlockMeta: block, - FoundExternalAddrs: blockFilterer.FoundExternal, - FoundInternalAddrs: blockFilterer.FoundInternal, - FoundOutPoints: blockFilterer.FoundOutPoints, - RelevantTxns: blockFilterer.RelevantTxns, - } - - return resp, nil - } - - // No addresses were found for this range. - return nil, nil -} - -// rescan performs a rescan of the chain using a bitcoind back-end, from the -// specified hash to the best-known hash, while watching out for reorgs that -// happen during the rescan. It uses the addresses and outputs being tracked -// by the client in the watch list. This is called only within a queue -// processing loop. -func (c *BitcoindClient) rescan(hash *chainhash.Hash) error { - // We start by getting the best already-processed block. We only use - // the height, as the hash can change during a reorganization, which we - // catch by testing connectivity from known blocks to the previous - // block. - log.Infof("Starting rescan from block %s", hash) - bestHash, bestHeight, err := c.GetBestBlock() - if err != nil { - return err - } - bestHeader, err := c.GetBlockHeaderVerbose(bestHash) - if err != nil { - return err - } - bestBlock := &waddrmgr.BlockStamp{ - Hash: *bestHash, - Height: bestHeight, - Timestamp: time.Unix(bestHeader.Time, 0), - } - lastHeader, err := c.GetBlockHeaderVerbose(hash) - if err != nil { - return err - } - lastHash, err := chainhash.NewHashFromStr(lastHeader.Hash) - if err != nil { - return err - } - firstHeader := lastHeader - - headers := list.New() - headers.PushBack(lastHeader) - - // We always send a RescanFinished message when we're done. - defer func() { - c.onRescanFinished(lastHash, lastHeader.Height, time.Unix( - lastHeader.Time, 0)) - }() - - // Cycle through all of the blocks known to bitcoind, being mindful of - // reorgs. - for i := firstHeader.Height + 1; i <= bestBlock.Height; i++ { - // Get the block at the current height. - hash, err := c.GetBlockHash(int64(i)) - if err != nil { - return err - } - - // This relies on the fact that bitcoind returns blocks from - // non-best chains it knows about. - // TODO: Make this more robust in order to not rely on this - // behavior. - // - // If the last known header isn't after the wallet birthday, - // try only fetching the next header and constructing a dummy - // block. If, in this event, the next header's timestamp is - // after the wallet birthday, go ahead and fetch the full block. - var block *wire.MsgBlock - c.clientMtx.RLock() - afterBirthday := lastHeader.Time >= c.startTime.Unix() - c.clientMtx.RUnlock() - if !afterBirthday { - header, err := c.GetBlockHeader(hash) - if err != nil { - return err - } - block = &wire.MsgBlock{ - Header: *header, - } - c.clientMtx.RLock() - afterBirthday = c.startTime.Before(header.Timestamp) - if afterBirthday { - c.onRescanProgress(lastHash, i, - block.Header.Timestamp) - } - c.clientMtx.RUnlock() - } - - if afterBirthday { - block, err = c.GetBlock(hash) - if err != nil { - return err - } - } - - for block.Header.PrevBlock.String() != lastHeader.Hash { - // If we're in this for loop, it looks like we've been - // reorganized. We now walk backwards to the common - // ancestor between the best chain and the known chain. - // - // First, we signal a disconnected block to rewind the - // rescan state. - c.onBlockDisconnected(lastHash, lastHeader.Height, - time.Unix(lastHeader.Time, 0)) - - // Next, we get the previous block of the best chain. - hash, err = c.GetBlockHash(int64(i - 1)) - if err != nil { - return err - } - - block, err = c.GetBlock(hash) - if err != nil { - return err - } - - // Then, we get the previous header for the known chain. - if headers.Back() != nil { - // If it's already in the headers list, we can - // just get it from there and remove the - // current hash). - headers.Remove(headers.Back()) - if headers.Back() != nil { - lastHeader = headers.Back(). - Value.(*btcjson. - GetBlockHeaderVerboseResult) - lastHash, err = chainhash. - NewHashFromStr(lastHeader.Hash) - if err != nil { - return err - } - } - } else { - // Otherwise, we get it from bitcoind. - lastHash, err = chainhash.NewHashFromStr( - lastHeader.PreviousHash) - if err != nil { - return err - } - lastHeader, err = c.GetBlockHeaderVerbose( - lastHash) - if err != nil { - return err - } - } - } - - // We are at the latest known block, so we notify. - lastHeader = &btcjson.GetBlockHeaderVerboseResult{ - Hash: block.BlockHash().String(), - Height: i, - PreviousHash: block.Header.PrevBlock.String(), - Time: block.Header.Timestamp.Unix(), - } - blockHash := block.BlockHash() - lastHash = &blockHash - headers.PushBack(lastHeader) - - _, err = c.filterBlock(block, i, true) - if err != nil { - return err - } - - if i%10000 == 0 { - c.onRescanProgress(lastHash, i, block.Header.Timestamp) - } - - // If we've reached the previously best-known block, check to - // make sure the underlying node hasn't synchronized additional - // blocks. If it has, update the best-known block and continue - // to rescan to that point. - if i == bestBlock.Height { - bestHash, bestHeight, err = c.GetBestBlock() - if err != nil { - return err - } - bestHeader, err = c.GetBlockHeaderVerbose(bestHash) - if err != nil { - return err - } - bestBlock = &waddrmgr.BlockStamp{ - Hash: *bestHash, - Height: bestHeight, - Timestamp: time.Unix(bestHeader.Time, 0), - } - } - } - - return nil -} - -// filterBlock filters a block for watched outpoints and addresses, and returns -// any matching transactions, sending notifications along the way. -func (c *BitcoindClient) filterBlock(block *wire.MsgBlock, height int32, - notify bool) ([]*wtxmgr.TxRecord, error) { - // If we're earlier than wallet birthday, don't do any notifications. - c.clientMtx.RLock() - startTime := c.startTime - c.clientMtx.RUnlock() - if block.Header.Timestamp.Before(startTime) { - return nil, nil - } - - // Only mention that we're filtering a block if the client wallet has - // started monitoring the chain. - if !c.notifying() { - log.Debugf("Filtering block %d (%s) with %d transactions", - height, block.BlockHash(), len(block.Transactions)) - } - - // Create block details for notifications. - blockHash := block.BlockHash() - blockDetails := &btcjson.BlockDetails{ - Hash: blockHash.String(), - Height: height, - Time: block.Header.Timestamp.Unix(), - } - - // Cycle through all transactions in the block. - var relevantTxs []*wtxmgr.TxRecord - blockConfirmed := make(map[chainhash.Hash]struct{}) - for i, tx := range block.Transactions { - // Update block and tx details for notifications. - blockDetails.Index = i - found, rec, err := c.filterTx(tx, blockDetails, notify) - if err != nil { - log.Warnf("Unable to filter tx: %v", err) - continue - } - if found { - relevantTxs = append(relevantTxs, rec) - blockConfirmed[tx.TxHash()] = struct{}{} - } - } - - // Update the expiration map by setting the block's confirmed - // transactions and deleting any in the mempool that were confirmed - // over 288 blocks ago. - c.clientMtx.Lock() - c.memPoolExp[height] = blockConfirmed - if oldBlock, ok := c.memPoolExp[height-288]; ok { - for txHash := range oldBlock { - delete(c.memPool, txHash) - } - delete(c.memPoolExp, height-288) - } - c.clientMtx.Unlock() - - if notify { - c.onFilteredBlockConnected(height, &block.Header, relevantTxs) - c.onBlockConnected(&blockHash, height, block.Header.Timestamp) - } - - return relevantTxs, nil -} - -// filterTx filters a single transaction against the client's watch list. -func (c *BitcoindClient) filterTx(tx *wire.MsgTx, - blockDetails *btcjson.BlockDetails, notify bool) (bool, - *wtxmgr.TxRecord, error) { - - txDetails := btcutil.NewTx(tx) - if blockDetails != nil { - txDetails.SetIndex(blockDetails.Index) - } - - rec, err := wtxmgr.NewTxRecordFromMsgTx(txDetails.MsgTx(), time.Now()) - if err != nil { - log.Errorf("Cannot create transaction record for relevant "+ - "tx: %v", err) - return false, nil, err - } - if blockDetails != nil { - rec.Received = time.Unix(blockDetails.Time, 0) - } - - var notifyTx bool - - // If we already know this is a relevant tx from a previous ntfn, we - // can shortcut the filter process and let the caller know the filter - // matches. - c.clientMtx.RLock() - if _, ok := c.memPool[tx.TxHash()]; ok { - c.clientMtx.RUnlock() - if notify && blockDetails != nil { - c.onRelevantTx(rec, blockDetails) - } - return true, rec, nil - } - c.clientMtx.RUnlock() - - // Cycle through outputs and check if we've matched a known address. - // Add any matched outpoints to watchOutPoints. - for i, out := range tx.TxOut { - _, addrs, _, err := txscript.ExtractPkScriptAddrs( - out.PkScript, c.chainParams) - if err != nil { - log.Debugf("Couldn't parse output script in %s:%d: %v", - tx.TxHash(), i, err) - continue - } - for _, addr := range addrs { - c.clientMtx.RLock() - if _, ok := c.watchAddrs[addr.EncodeAddress()]; ok { - notifyTx = true - c.watchOutPoints[wire.OutPoint{ - Hash: tx.TxHash(), - Index: uint32(i), - }] = struct{}{} - } - c.clientMtx.RUnlock() - } - } - - // If an output hasn't already matched, see if an input will. - if !notifyTx { - for _, in := range tx.TxIn { - c.clientMtx.RLock() - if _, ok := c.watchOutPoints[in.PreviousOutPoint]; ok { - c.clientMtx.RUnlock() - notifyTx = true - break - } - c.clientMtx.RUnlock() - } - } - - // If we have a match and it's not mined, notify the TX. If the TX is - // mined, we notify as part of FilteredBlockConnected. The boolean map - // value will let us know if we last saw it as mined or unmined. - if notifyTx { - c.clientMtx.Lock() - if _, ok := c.memPool[tx.TxHash()]; blockDetails == nil || !ok { - c.onRelevantTx(rec, blockDetails) - } - c.memPool[tx.TxHash()] = struct{}{} - c.clientMtx.Unlock() - } - - return notifyTx, rec, nil -} - -// handler maintains a queue of notifications and the current state (best -// block) of the chain. -func (c *BitcoindClient) handler() { - hash, height, err := c.GetBestBlock() - if err != nil { - log.Errorf("Failed to receive best block from chain server: %v", err) - c.Stop() - c.wg.Done() - return - } - - bs := &waddrmgr.BlockStamp{Hash: *hash, Height: height} - - // TODO: Rather than leaving this as an unbounded queue for all types of - // notifications, try dropping ones where a later enqueued notification - // can fully invalidate one waiting to be processed. For example, - // blockconnected notifications for greater block heights can remove the - // need to process earlier blockconnected notifications still waiting - // here. - - // TODO(aakselrod): Factor this logic out so it can be reused for each - // chain back end, rather than copying it. - - var notifications []interface{} - enqueue := c.enqueueNotification - var dequeue chan interface{} - var next interface{} -out: - for { - select { - case n, ok := <-enqueue: - if !ok { - // If no notifications are queued for handling, - // the queue is finished. - if len(notifications) == 0 { - break out - } - // nil channel so no more reads can occur. - enqueue = nil - continue - } - if len(notifications) == 0 { - next = n - dequeue = c.dequeueNotification - } - notifications = append(notifications, n) - - case dequeue <- next: - if n, ok := next.(BlockConnected); ok { - bs = &waddrmgr.BlockStamp{ - Height: n.Height, - Hash: n.Hash, - } - } - - notifications[0] = nil - notifications = notifications[1:] - if len(notifications) != 0 { - next = notifications[0] - } else { - // If no more notifications can be enqueued, the - // queue is finished. - if enqueue == nil { - break out - } - dequeue = nil - } - - case c.currentBlock <- bs: - - case <-c.quit: - break out - } - } - - c.Stop() - close(c.dequeueNotification) - c.wg.Done() -} diff --git a/chain/bitcoind_client.go b/chain/bitcoind_client.go new file mode 100644 index 0000000..a136c00 --- /dev/null +++ b/chain/bitcoind_client.go @@ -0,0 +1,1254 @@ +package chain + +import ( + "container/list" + "encoding/hex" + "errors" + "fmt" + "sync" + "sync/atomic" + "time" + + "github.com/btcsuite/btcd/btcjson" + "github.com/btcsuite/btcd/chaincfg" + "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btcd/txscript" + "github.com/btcsuite/btcd/wire" + "github.com/btcsuite/btcutil" + "github.com/btcsuite/btcwallet/waddrmgr" + "github.com/btcsuite/btcwallet/wtxmgr" +) + +var ( + // ErrBitcoindClientShuttingDown is an error returned when we attempt + // to receive a notification for a specific item and the bitcoind client + // is in the middle of shutting down. + ErrBitcoindClientShuttingDown = errors.New("client is shutting down") +) + +// BitcoindClient represents a persistent client connection to a bitcoind server +// for information regarding the current best block chain. +type BitcoindClient struct { + started int32 // To be used atomically. + stopped int32 // To be used atomically. + + // birthday is the earliest time for which we should begin scanning the + // chain. + birthday time.Time + + // chainParams are the parameters of the current chain this client is + // active under. + chainParams *chaincfg.Params + + // id is the unique ID of this client assigned by the backing bitcoind + // connection. + id uint64 + + // chainConn is the backing client to our rescan client that contains + // the RPC and ZMQ connections to a bitcoind node. + chainConn *BitcoindConn + + // bestBlock keeps track of the tip of the current best chain. + bestBlockMtx sync.RWMutex + bestBlock waddrmgr.BlockStamp + + // notifyBlocks signals whether the client is sending block + // notifications to the caller. + notifyBlocks uint32 + + // rescanUpdate is a channel will be sent items that we should match + // transactions against while processing a chain rescan to determine if + // they are relevant to the client. + rescanUpdate chan interface{} + + // watchedAddresses, watchedOutPoints, and watchedTxs are the set of + // items we should match transactions against while processing a chain + // rescan to determine if they are relevant to the client. + watchMtx sync.RWMutex + watchedAddresses map[string]struct{} + watchedOutPoints map[wire.OutPoint]struct{} + watchedTxs map[chainhash.Hash]struct{} + + // mempool keeps track of all relevant transactions that have yet to be + // confirmed. This is used to shortcut the filtering process of a + // transaction when a new confirmed transaction notification is + // received. + // + // NOTE: This requires the watchMtx to be held. + mempool map[chainhash.Hash]struct{} + + // expiredMempool keeps track of a set of confirmed transactions along + // with the height at which they were included in a block. These + // transactions will then be removed from the mempool after a period of + // 288 blocks. This is done to ensure the transactions are safe from a + // reorg in the chain. + // + // NOTE: This requires the watchMtx to be held. + expiredMempool map[int32]map[chainhash.Hash]struct{} + + // notificationQueue is a concurrent unbounded queue that handles + // dispatching notifications to the subscriber of this client. + // + // TODO: Rather than leaving this as an unbounded queue for all types of + // notifications, try dropping ones where a later enqueued notification + // can fully invalidate one waiting to be processed. For example, + // BlockConnected notifications for greater block heights can remove the + // need to process earlier notifications still waiting to be processed. + notificationQueue *ConcurrentQueue + + // zmqTxNtfns is a channel through which ZMQ transaction events will be + // retrieved from the backing bitcoind connection. + zmqTxNtfns chan *wire.MsgTx + + // zmqBlockNtfns is a channel through which ZMQ block events will be + // retrieved from the backing bitcoind connection. + zmqBlockNtfns chan *wire.MsgBlock + + quit chan struct{} + wg sync.WaitGroup +} + +// BackEnd returns the name of the driver. +func (c *BitcoindClient) BackEnd() string { + return "bitcoind" +} + +// GetBestBlock returns the highest block known to bitcoind. +func (c *BitcoindClient) GetBestBlock() (*chainhash.Hash, int32, error) { + bcinfo, err := c.chainConn.client.GetBlockChainInfo() + if err != nil { + return nil, 0, err + } + + hash, err := chainhash.NewHashFromStr(bcinfo.BestBlockHash) + if err != nil { + return nil, 0, err + } + + return hash, bcinfo.Blocks, nil +} + +// GetBlockHeight returns the height for the hash, if known, or returns an +// error. +func (c *BitcoindClient) GetBlockHeight(hash *chainhash.Hash) (int32, error) { + header, err := c.chainConn.client.GetBlockHeaderVerbose(hash) + if err != nil { + return 0, err + } + + return header.Height, nil +} + +// GetBlock returns a block from the hash. +func (c *BitcoindClient) GetBlock(hash *chainhash.Hash) (*wire.MsgBlock, error) { + return c.chainConn.client.GetBlock(hash) +} + +// GetBlockVerbose returns a verbose block from the hash. +func (c *BitcoindClient) GetBlockVerbose( + hash *chainhash.Hash) (*btcjson.GetBlockVerboseResult, error) { + + return c.chainConn.client.GetBlockVerbose(hash) +} + +// GetBlockHash returns a block hash from the height. +func (c *BitcoindClient) GetBlockHash(height int64) (*chainhash.Hash, error) { + return c.chainConn.client.GetBlockHash(height) +} + +// GetBlockHeader returns a block header from the hash. +func (c *BitcoindClient) GetBlockHeader( + hash *chainhash.Hash) (*wire.BlockHeader, error) { + + return c.chainConn.client.GetBlockHeader(hash) +} + +// GetBlockHeaderVerbose returns a block header from the hash. +func (c *BitcoindClient) GetBlockHeaderVerbose( + hash *chainhash.Hash) (*btcjson.GetBlockHeaderVerboseResult, error) { + + return c.chainConn.client.GetBlockHeaderVerbose(hash) +} + +// GetRawTransactionVerbose returns a transaction from the tx hash. +func (c *BitcoindClient) GetRawTransactionVerbose( + hash *chainhash.Hash) (*btcjson.TxRawResult, error) { + + return c.chainConn.client.GetRawTransactionVerbose(hash) +} + +// GetTxOut returns a txout from the outpoint info provided. +func (c *BitcoindClient) GetTxOut(txHash *chainhash.Hash, index uint32, + mempool bool) (*btcjson.GetTxOutResult, error) { + + return c.chainConn.client.GetTxOut(txHash, index, mempool) +} + +// SendRawTransaction sends a raw transaction via bitcoind. +func (c *BitcoindClient) SendRawTransaction(tx *wire.MsgTx, + allowHighFees bool) (*chainhash.Hash, error) { + + return c.chainConn.client.SendRawTransaction(tx, allowHighFees) +} + +// Notifications returns a channel to retrieve notifications from. +// +// NOTE: This is part of the chain.Interface interface. +func (c *BitcoindClient) Notifications() <-chan interface{} { + return c.notificationQueue.ChanOut() +} + +// NotifyReceived allows the chain backend to notify the caller whenever a +// transaction pays to any of the given addresses. +// +// NOTE: This is part of the chain.Interface interface. +func (c *BitcoindClient) NotifyReceived(addrs []btcutil.Address) error { + c.NotifyBlocks() + + select { + case c.rescanUpdate <- addrs: + case <-c.quit: + return ErrBitcoindClientShuttingDown + } + + return nil +} + +// NotifySpent allows the chain backend to notify the caller whenever a +// transaction spends any of the given outpoints. +func (c *BitcoindClient) NotifySpent(outPoints []*wire.OutPoint) error { + c.NotifyBlocks() + + select { + case c.rescanUpdate <- outPoints: + case <-c.quit: + return ErrBitcoindClientShuttingDown + } + + return nil +} + +// NotifyTx allows the chain backend to notify the caller whenever any of the +// given transactions confirm within the chain. +func (c *BitcoindClient) NotifyTx(txids []chainhash.Hash) error { + c.NotifyBlocks() + + select { + case c.rescanUpdate <- txids: + case <-c.quit: + return ErrBitcoindClientShuttingDown + } + + return nil +} + +// NotifyBlocks allows the chain backend to notify the caller whenever a block +// is connected or disconnected. +// +// NOTE: This is part of the chain.Interface interface. +func (c *BitcoindClient) NotifyBlocks() error { + atomic.StoreUint32(&c.notifyBlocks, 1) + return nil +} + +// shouldNotifyBlocks determines whether the client should send block +// notifications to the caller. +func (c *BitcoindClient) shouldNotifyBlocks() bool { + return atomic.LoadUint32(&c.notifyBlocks) == 1 +} + +// LoadTxFilter uses the given filters to what we should match transactions +// against to determine if they are relevant to the client. The reset argument +// is used to reset the current filters. +// +// The current filters supported are of the following types: +// []btcutil.Address +// []wire.OutPoint +// []*wire.OutPoint +// map[wire.OutPoint]btcutil.Address +// []chainhash.Hash +// []*chainhash.Hash +func (c *BitcoindClient) LoadTxFilter(reset bool, filters ...interface{}) error { + if reset { + select { + case c.rescanUpdate <- struct{}{}: + case <-c.quit: + return ErrBitcoindClientShuttingDown + } + } + + updateFilter := func(filter interface{}) error { + select { + case c.rescanUpdate <- filter: + case <-c.quit: + return ErrBitcoindClientShuttingDown + } + + return nil + } + + // In order to make this operation atomic, we'll iterate through the + // filters twice: the first to ensure there aren't any unsupported + // filter types, and the second to actually update our filters. + for _, filter := range filters { + switch filter := filter.(type) { + case []btcutil.Address, []wire.OutPoint, []*wire.OutPoint, + map[wire.OutPoint]btcutil.Address, []chainhash.Hash, + []*chainhash.Hash: + + // Proceed to check the next filter type. + default: + return fmt.Errorf("unsupported filter type %T", filter) + } + } + + for _, filter := range filters { + if err := updateFilter(filter); err != nil { + return err + } + } + + return nil +} + +// RescanBlocks rescans any blocks passed, returning only the blocks that +// matched as []btcjson.BlockDetails. +func (c *BitcoindClient) RescanBlocks( + blockHashes []chainhash.Hash) ([]btcjson.RescannedBlock, error) { + + rescannedBlocks := make([]btcjson.RescannedBlock, 0, len(blockHashes)) + for _, hash := range blockHashes { + header, err := c.GetBlockHeaderVerbose(&hash) + if err != nil { + log.Warnf("Unable to get header %s from bitcoind: %s", + hash, err) + continue + } + + block, err := c.GetBlock(&hash) + if err != nil { + log.Warnf("Unable to get block %s from bitcoind: %s", + hash, err) + continue + } + + relevantTxs, err := c.filterBlock(block, header.Height, false) + if len(relevantTxs) > 0 { + rescannedBlock := btcjson.RescannedBlock{ + Hash: hash.String(), + } + for _, tx := range relevantTxs { + rescannedBlock.Transactions = append( + rescannedBlock.Transactions, + hex.EncodeToString(tx.SerializedTx), + ) + } + + rescannedBlocks = append(rescannedBlocks, rescannedBlock) + } + } + + return rescannedBlocks, nil +} + +// Rescan rescans from the block with the given hash until the current block, +// after adding the passed addresses and outpoints to the client's watch list. +func (c *BitcoindClient) Rescan(blockHash *chainhash.Hash, + addresses []btcutil.Address, outPoints map[wire.OutPoint]btcutil.Address) error { + + // A block hash is required to use as the starting point of the rescan. + if blockHash == nil { + return errors.New("rescan requires a starting block hash") + } + + // We'll then update our filters with the given outpoints and addresses. + select { + case c.rescanUpdate <- addresses: + case <-c.quit: + return ErrBitcoindClientShuttingDown + } + + select { + case c.rescanUpdate <- outPoints: + case <-c.quit: + return ErrBitcoindClientShuttingDown + } + + // Once the filters have been updated, we can begin the rescan. + select { + case c.rescanUpdate <- *blockHash: + case <-c.quit: + return ErrBitcoindClientShuttingDown + } + + return nil +} + +// Start initializes the bitcoind rescan client using the backing bitcoind +// connection and starts all goroutines necessary in order to process rescans +// and ZMQ notifications. +// +// NOTE: This is part of the chain.Interface interface. +func (c *BitcoindClient) Start() error { + if !atomic.CompareAndSwapInt32(&c.started, 0, 1) { + return nil + } + + // Start the notification queue and immediately dispatch a + // ClientConnected notification to the caller. This is needed as some of + // the callers will require this notification before proceeding. + c.notificationQueue.Start() + c.notificationQueue.ChanIn() <- ClientConnected{} + + // Retrieve the best block of the chain. + bestHash, bestHeight, err := c.GetBestBlock() + if err != nil { + return fmt.Errorf("unable to retrieve best block: %v", err) + } + bestHeader, err := c.GetBlockHeaderVerbose(bestHash) + if err != nil { + return fmt.Errorf("unable to retrieve header for best block: "+ + "%v", err) + } + + c.bestBlockMtx.Lock() + c.bestBlock = waddrmgr.BlockStamp{ + Hash: *bestHash, + Height: bestHeight, + Timestamp: time.Unix(bestHeader.Time, 0), + } + c.bestBlockMtx.Unlock() + + // Once the client has started successfully, we'll include it in the set + // of rescan clients of the backing bitcoind connection in order to + // received ZMQ event notifications. + c.chainConn.AddClient(c) + + c.wg.Add(2) + go c.rescanHandler() + go c.ntfnHandler() + + return nil +} + +// Stop stops the bitcoind rescan client from processing rescans and ZMQ +// notifications. +// +// NOTE: This is part of the chain.Interface interface. +func (c *BitcoindClient) Stop() { + if !atomic.CompareAndSwapInt32(&c.stopped, 0, 1) { + return + } + + close(c.quit) + + // Remove this client's reference from the bitcoind connection to + // prevent sending notifications to it after it's been stopped. + c.chainConn.RemoveClient(c.id) + + c.notificationQueue.Stop() +} + +// WaitForShutdown blocks until the client has finished disconnecting and all +// handlers have exited. +// +// NOTE: This is part of the chain.Interface interface. +func (c *BitcoindClient) WaitForShutdown() { + c.wg.Wait() +} + +// rescanHandler handles the logic needed for the caller to trigger a chain +// rescan. +// +// NOTE: This must be called as a goroutine. +func (c *BitcoindClient) rescanHandler() { + defer c.wg.Done() + + for { + select { + case update := <-c.rescanUpdate: + switch update := update.(type) { + + // We're clearing the filters. + case struct{}: + c.watchMtx.Lock() + c.watchedOutPoints = make(map[wire.OutPoint]struct{}) + c.watchedAddresses = make(map[string]struct{}) + c.watchedTxs = make(map[chainhash.Hash]struct{}) + c.watchMtx.Unlock() + + // We're adding the addresses to our filter. + case []btcutil.Address: + c.watchMtx.Lock() + for _, addr := range update { + c.watchedAddresses[addr.String()] = struct{}{} + } + c.watchMtx.Unlock() + + // We're adding the outpoints to our filter. + case []wire.OutPoint: + c.watchMtx.Lock() + for _, op := range update { + c.watchedOutPoints[op] = struct{}{} + } + c.watchMtx.Unlock() + case []*wire.OutPoint: + c.watchMtx.Lock() + for _, op := range update { + c.watchedOutPoints[*op] = struct{}{} + } + c.watchMtx.Unlock() + + // We're adding the outpoints that map to the scripts + // that we should scan for to our filter. + case map[wire.OutPoint]btcutil.Address: + c.watchMtx.Lock() + for op := range update { + c.watchedOutPoints[op] = struct{}{} + } + c.watchMtx.Unlock() + + // We're adding the transactions to our filter. + case []chainhash.Hash: + c.watchMtx.Lock() + for _, txid := range update { + c.watchedTxs[txid] = struct{}{} + } + c.watchMtx.Unlock() + case []*chainhash.Hash: + c.watchMtx.Lock() + for _, txid := range update { + c.watchedTxs[*txid] = struct{}{} + } + c.watchMtx.Unlock() + + // We're starting a rescan from the hash. + case chainhash.Hash: + if err := c.rescan(update); err != nil { + log.Errorf("Unable to complete chain "+ + "rescan: %v", err) + } + default: + log.Warnf("Received unexpected filter type %T", + update) + } + case <-c.quit: + return + } + } +} + +// ntfnHandler handles the logic to retrieve ZMQ notifications from the backing +// bitcoind connection. +// +// NOTE: This must be called as a goroutine. +func (c *BitcoindClient) ntfnHandler() { + defer c.wg.Done() + + for { + select { + case tx := <-c.zmqTxNtfns: + if _, _, err := c.filterTx(tx, nil, true); err != nil { + log.Error(err) + } + case newBlock := <-c.zmqBlockNtfns: + // If the new block's previous hash matches the best + // hash known to us, then the new block is the next + // successor, so we'll update our best block to reflect + // this and determine if this new block matches any of + // our existing filters. + c.bestBlockMtx.Lock() + bestBlock := c.bestBlock + c.bestBlockMtx.Unlock() + if newBlock.Header.PrevBlock == bestBlock.Hash { + newBlockHeight := bestBlock.Height + 1 + _, err := c.filterBlock( + newBlock, newBlockHeight, true, + ) + if err != nil { + log.Error(err) + continue + } + + // With the block succesfully filtered, we'll + // make it our new best block. + bestBlock.Hash = newBlock.BlockHash() + bestBlock.Height = newBlockHeight + bestBlock.Timestamp = newBlock.Header.Timestamp + + c.bestBlockMtx.Lock() + c.bestBlock = bestBlock + c.bestBlockMtx.Unlock() + + continue + } + + // Otherwise, we've encountered a reorg. + if err := c.reorg(bestBlock, newBlock); err != nil { + log.Errorf("Unable to process chain reorg: %v", + err) + } + case <-c.quit: + return + } + } +} + +// BlockStamp returns the latest block notified by the client, or an error +// if the client has been shut down. +func (c *BitcoindClient) BlockStamp() (*waddrmgr.BlockStamp, error) { + c.bestBlockMtx.RLock() + bestBlock := c.bestBlock + c.bestBlockMtx.RUnlock() + + return &bestBlock, nil +} + +// onBlockConnected is a callback that's executed whenever a new block has been +// detected. This will queue a BlockConnected notification to the caller. +func (c *BitcoindClient) onBlockConnected(hash *chainhash.Hash, height int32, + timestamp time.Time) { + + if c.shouldNotifyBlocks() { + select { + case c.notificationQueue.ChanIn() <- BlockConnected{ + Block: wtxmgr.Block{ + Hash: *hash, + Height: height, + }, + Time: timestamp, + }: + case <-c.quit: + } + } +} + +// onFilteredBlockConnected is an alternative callback that's executed whenever +// a new block has been detected. It serves the same purpose as +// onBlockConnected, but it also includes a list of the relevant transactions +// found within the block being connected. This will queue a +// FilteredBlockConnected notification to the caller. +func (c *BitcoindClient) onFilteredBlockConnected(height int32, + header *wire.BlockHeader, relevantTxs []*wtxmgr.TxRecord) { + + if c.shouldNotifyBlocks() { + select { + case c.notificationQueue.ChanIn() <- FilteredBlockConnected{ + Block: &wtxmgr.BlockMeta{ + Block: wtxmgr.Block{ + Hash: header.BlockHash(), + Height: height, + }, + Time: header.Timestamp, + }, + RelevantTxs: relevantTxs, + }: + case <-c.quit: + } + } +} + +// onBlockDisconnected is a callback that's executed whenever a block has been +// disconnected. This will queue a BlockDisconnected notification to the caller +// with the details of the block being disconnected. +func (c *BitcoindClient) onBlockDisconnected(hash *chainhash.Hash, height int32, + timestamp time.Time) { + + if c.shouldNotifyBlocks() { + select { + case c.notificationQueue.ChanIn() <- BlockDisconnected{ + Block: wtxmgr.Block{ + Hash: *hash, + Height: height, + }, + Time: timestamp, + }: + case <-c.quit: + } + } +} + +// onRelevantTx is a callback that's executed whenever a transaction is relevant +// to the caller. This means that the transaction matched a specific item in the +// client's different filters. This will queue a RelevantTx notification to the +// caller. +func (c *BitcoindClient) onRelevantTx(tx *wtxmgr.TxRecord, + blockDetails *btcjson.BlockDetails) { + + block, err := parseBlock(blockDetails) + if err != nil { + log.Errorf("Unable to send onRelevantTx notification, failed "+ + "parse block: %v", err) + return + } + + select { + case c.notificationQueue.ChanIn() <- RelevantTx{ + TxRecord: tx, + Block: block, + }: + case <-c.quit: + } +} + +// onRescanProgress is a callback that's executed whenever a rescan is in +// progress. This will queue a RescanProgress notification to the caller with +// the current rescan progress details. +func (c *BitcoindClient) onRescanProgress(hash *chainhash.Hash, height int32, + timestamp time.Time) { + + select { + case c.notificationQueue.ChanIn() <- &RescanProgress{ + Hash: hash, + Height: height, + Time: timestamp, + }: + case <-c.quit: + } +} + +// onRescanFinished is a callback that's executed whenever a rescan has +// finished. This will queue a RescanFinished notification to the caller with +// the details of the last block in the range of the rescan. +func (c *BitcoindClient) onRescanFinished(hash *chainhash.Hash, height int32, + timestamp time.Time) { + + log.Infof("Rescan finished at %d (%s)", height, hash) + + select { + case c.notificationQueue.ChanIn() <- &RescanFinished{ + Hash: hash, + Height: height, + Time: timestamp, + }: + case <-c.quit: + } +} + +// reorg processes a reorganization during chain synchronization. This is +// separate from a rescan's handling of a reorg. This will rewind back until it +// finds a common ancestor and notify all the new blocks since then. +func (c *BitcoindClient) reorg(currentBlock waddrmgr.BlockStamp, + reorgBlock *wire.MsgBlock) error { + + log.Debugf("Possible reorg at block %s", reorgBlock.BlockHash()) + + // Retrieve the best known height based on the block which caused the + // reorg. This way, we can preserve the chain of blocks we need to + // retrieve. + bestHash := reorgBlock.BlockHash() + bestHeight, err := c.GetBlockHeight(&bestHash) + if err != nil { + return err + } + + if bestHeight < currentBlock.Height { + log.Debug("Detected multiple reorgs") + return nil + } + + // We'll now keep track of all the blocks known to the *chain*, starting + // from the best block known to us until the best block in the chain. + // This will let us fast-forward despite any future reorgs. + blocksToNotify := list.New() + blocksToNotify.PushFront(reorgBlock) + previousBlock := reorgBlock.Header.PrevBlock + for i := bestHeight - 1; i >= currentBlock.Height; i-- { + block, err := c.GetBlock(&previousBlock) + if err != nil { + return err + } + blocksToNotify.PushFront(block) + previousBlock = block.Header.PrevBlock + } + + // Rewind back to the last common ancestor block using the previous + // block hash from each header to avoid any race conditions. If we + // encounter more reorgs, they'll be queued and we'll repeat the cycle. + // + // We'll start by retrieving the header to the best block known to us. + currentHeader, err := c.GetBlockHeader(¤tBlock.Hash) + if err != nil { + return err + } + + // Then, we'll walk backwards in the chain until we find our common + // ancestor. + for previousBlock != currentHeader.PrevBlock { + // Since the previous hashes don't match, the current block has + // been reorged out of the chain, so we should send a + // BlockDisconnected notification for it. + log.Debugf("Disconnecting block %d (%v)", currentBlock.Height, + currentBlock.Hash) + + c.onBlockDisconnected( + ¤tBlock.Hash, currentBlock.Height, + currentBlock.Timestamp, + ) + + // Our current block should now reflect the previous one to + // continue the common ancestor search. + currentHeader, err = c.GetBlockHeader(¤tHeader.PrevBlock) + if err != nil { + return err + } + + currentBlock.Height-- + currentBlock.Hash = currentHeader.PrevBlock + currentBlock.Timestamp = currentHeader.Timestamp + + // Store the correct block in our list in order to notify it + // once we've found our common ancestor. + block, err := c.GetBlock(&previousBlock) + if err != nil { + return err + } + blocksToNotify.PushFront(block) + previousBlock = block.Header.PrevBlock + } + + // Disconnect the last block from the old chain. Since the previous + // block remains the same between the old and new chains, the tip will + // now be the last common ancestor. + log.Debugf("Disconnecting block %d (%v)", currentBlock.Height, + currentBlock.Hash) + + c.onBlockDisconnected( + ¤tBlock.Hash, currentBlock.Height, currentHeader.Timestamp, + ) + + currentBlock.Height-- + + // Now we fast-forward to the new block, notifying along the way. + for blocksToNotify.Front() != nil { + nextBlock := blocksToNotify.Front().Value.(*wire.MsgBlock) + nextHeight := currentBlock.Height + 1 + nextHash := nextBlock.BlockHash() + nextHeader, err := c.GetBlockHeader(&nextHash) + if err != nil { + return err + } + + _, err = c.filterBlock(nextBlock, nextHeight, true) + if err != nil { + return err + } + + currentBlock.Height = nextHeight + currentBlock.Hash = nextHash + currentBlock.Timestamp = nextHeader.Timestamp + + blocksToNotify.Remove(blocksToNotify.Front()) + } + + c.bestBlockMtx.Lock() + c.bestBlock = currentBlock + c.bestBlockMtx.Unlock() + + return nil +} + +// FilterBlocks scans the blocks contained in the FilterBlocksRequest for any +// addresses of interest. Each block will be fetched and filtered sequentially, +// returning a FilterBlocksReponse for the first block containing a matching +// address. If no matches are found in the range of blocks requested, the +// returned response will be nil. +// +// NOTE: This is part of the chain.Interface interface. +func (c *BitcoindClient) FilterBlocks( + req *FilterBlocksRequest) (*FilterBlocksResponse, error) { + + blockFilterer := NewBlockFilterer(c.chainParams, req) + + // Iterate over the requested blocks, fetching each from the rpc client. + // Each block will scanned using the reverse addresses indexes generated + // above, breaking out early if any addresses are found. + for i, block := range req.Blocks { + // TODO(conner): add prefetching, since we already know we'll be + // fetching *every* block + rawBlock, err := c.GetBlock(&block.Hash) + if err != nil { + return nil, err + } + + if !blockFilterer.FilterBlock(rawBlock) { + continue + } + + // If any external or internal addresses were detected in this + // block, we return them to the caller so that the rescan + // windows can widened with subsequent addresses. The + // `BatchIndex` is returned so that the caller can compute the + // *next* block from which to begin again. + resp := &FilterBlocksResponse{ + BatchIndex: uint32(i), + BlockMeta: block, + FoundExternalAddrs: blockFilterer.FoundExternal, + FoundInternalAddrs: blockFilterer.FoundInternal, + FoundOutPoints: blockFilterer.FoundOutPoints, + RelevantTxns: blockFilterer.RelevantTxns, + } + + return resp, nil + } + + // No addresses were found for this range. + return nil, nil +} + +// rescan performs a rescan of the chain using a bitcoind backend, from the +// specified hash to the best known hash, while watching out for reorgs that +// happen during the rescan. It uses the addresses and outputs being tracked by +// the client in the watch list. This is called only within a queue processing +// loop. +func (c *BitcoindClient) rescan(start chainhash.Hash) error { + log.Infof("Starting rescan from block %s", start) + + // We start by getting the best already processed block. We only use + // the height, as the hash can change during a reorganization, which we + // catch by testing connectivity from known blocks to the previous + // block. + bestHash, bestHeight, err := c.GetBestBlock() + if err != nil { + return err + } + bestHeader, err := c.GetBlockHeaderVerbose(bestHash) + if err != nil { + return err + } + bestBlock := waddrmgr.BlockStamp{ + Hash: *bestHash, + Height: bestHeight, + Timestamp: time.Unix(bestHeader.Time, 0), + } + + // Create a list of headers sorted in forward order. We'll use this in + // the event that we need to backtrack due to a chain reorg. + headers := list.New() + previousHeader, err := c.GetBlockHeaderVerbose(&start) + if err != nil { + return err + } + previousHash, err := chainhash.NewHashFromStr(previousHeader.Hash) + if err != nil { + return err + } + headers.PushBack(previousHeader) + + // Queue a RescanFinished notification to the caller with the last block + // processed throughout the rescan once done. + defer c.onRescanFinished( + previousHash, previousHeader.Height, + time.Unix(previousHeader.Time, 0), + ) + + // Cycle through all of the blocks known to bitcoind, being mindful of + // reorgs. + for i := previousHeader.Height + 1; i <= bestBlock.Height; i++ { + hash, err := c.GetBlockHash(int64(i)) + if err != nil { + return err + } + + // If the previous header is before the wallet birthday, fetch + // the current header and construct a dummy block, rather than + // fetching the whole block itself. This speeds things up as we + // no longer have to fetch the whole block when we know it won't + // match any of our filters. + var block *wire.MsgBlock + afterBirthday := previousHeader.Time >= c.birthday.Unix() + if !afterBirthday { + header, err := c.GetBlockHeader(hash) + if err != nil { + return err + } + block = &wire.MsgBlock{ + Header: *header, + } + + afterBirthday = c.birthday.Before(header.Timestamp) + if afterBirthday { + c.onRescanProgress( + previousHash, i, + block.Header.Timestamp, + ) + } + } + + if afterBirthday { + block, err = c.GetBlock(hash) + if err != nil { + return err + } + } + + for block.Header.PrevBlock.String() != previousHeader.Hash { + // If we're in this for loop, it looks like we've been + // reorganized. We now walk backwards to the common + // ancestor between the best chain and the known chain. + // + // First, we signal a disconnected block to rewind the + // rescan state. + c.onBlockDisconnected( + previousHash, previousHeader.Height, + time.Unix(previousHeader.Time, 0), + ) + + // Get the previous block of the best chain. + hash, err := c.GetBlockHash(int64(i - 1)) + if err != nil { + return err + } + block, err = c.GetBlock(hash) + if err != nil { + return err + } + + // Then, we'll the get the header of this previous + // block. + if headers.Back() != nil { + // If it's already in the headers list, we can + // just get it from there and remove the + // current hash. + headers.Remove(headers.Back()) + if headers.Back() != nil { + previousHeader = headers.Back(). + Value.(*btcjson.GetBlockHeaderVerboseResult) + previousHash, err = chainhash.NewHashFromStr( + previousHeader.Hash, + ) + if err != nil { + return err + } + } + } else { + // Otherwise, we get it from bitcoind. + previousHash, err = chainhash.NewHashFromStr( + previousHeader.PreviousHash, + ) + if err != nil { + return err + } + previousHeader, err = c.GetBlockHeaderVerbose( + previousHash, + ) + if err != nil { + return err + } + } + } + + // Now that we've ensured we haven't come across a reorg, we'll + // add the current block header to our list of headers. + blockHash := block.BlockHash() + previousHash = &blockHash + previousHeader = &btcjson.GetBlockHeaderVerboseResult{ + Hash: blockHash.String(), + Height: i, + PreviousHash: block.Header.PrevBlock.String(), + Time: block.Header.Timestamp.Unix(), + } + headers.PushBack(previousHeader) + + // Notify the block and any of its relevant transacations. + if _, err = c.filterBlock(block, i, true); err != nil { + return err + } + + if i%10000 == 0 { + c.onRescanProgress( + previousHash, i, block.Header.Timestamp, + ) + } + + // If we've reached the previously best known block, check to + // make sure the underlying node hasn't synchronized additional + // blocks. If it has, update the best known block and continue + // to rescan to that point. + if i == bestBlock.Height { + bestHash, bestHeight, err = c.GetBestBlock() + if err != nil { + return err + } + bestHeader, err = c.GetBlockHeaderVerbose(bestHash) + if err != nil { + return err + } + + bestBlock.Hash = *bestHash + bestBlock.Height = bestHeight + bestBlock.Timestamp = time.Unix(bestHeader.Time, 0) + } + } + + return nil +} + +// filterBlock filters a block for watched outpoints and addresses, and returns +// any matching transactions, sending notifications along the way. +func (c *BitcoindClient) filterBlock(block *wire.MsgBlock, height int32, + notify bool) ([]*wtxmgr.TxRecord, error) { + + // If this block happened before the client's birthday, then we'll skip + // it entirely. + if block.Header.Timestamp.Before(c.birthday) { + return nil, nil + } + + if c.shouldNotifyBlocks() { + log.Debugf("Filtering block %d (%s) with %d transactions", + height, block.BlockHash(), len(block.Transactions)) + } + + // Create a block details template to use for all of the confirmed + // transactions found within this block. + blockHash := block.BlockHash() + blockDetails := &btcjson.BlockDetails{ + Hash: blockHash.String(), + Height: height, + Time: block.Header.Timestamp.Unix(), + } + + // Now, we'll through all of the transactions in the block keeping track + // of any relevant to the caller. + var relevantTxs []*wtxmgr.TxRecord + confirmedTxs := make(map[chainhash.Hash]struct{}) + for i, tx := range block.Transactions { + // Update the index in the block details with the index of this + // transaction. + blockDetails.Index = i + isRelevant, rec, err := c.filterTx(tx, blockDetails, notify) + if err != nil { + log.Warnf("Unable to filter transaction %v: %v", + tx.TxHash(), err) + continue + } + + if isRelevant { + relevantTxs = append(relevantTxs, rec) + confirmedTxs[tx.TxHash()] = struct{}{} + } + } + + // Update the expiration map by setting the block's confirmed + // transactions and deleting any in the mempool that were confirmed + // over 288 blocks ago. + c.watchMtx.Lock() + c.expiredMempool[height] = confirmedTxs + if oldBlock, ok := c.expiredMempool[height-288]; ok { + for txHash := range oldBlock { + delete(c.mempool, txHash) + } + delete(c.expiredMempool, height-288) + } + c.watchMtx.Unlock() + + if notify { + c.onFilteredBlockConnected(height, &block.Header, relevantTxs) + c.onBlockConnected(&blockHash, height, block.Header.Timestamp) + } + + return relevantTxs, nil +} + +// filterTx determines whether a transaction is relevant to the client by +// inspecting the client's different filters. +func (c *BitcoindClient) filterTx(tx *wire.MsgTx, + blockDetails *btcjson.BlockDetails, + notify bool) (bool, *wtxmgr.TxRecord, error) { + + txDetails := btcutil.NewTx(tx) + if blockDetails != nil { + txDetails.SetIndex(blockDetails.Index) + } + + rec, err := wtxmgr.NewTxRecordFromMsgTx(txDetails.MsgTx(), time.Now()) + if err != nil { + log.Errorf("Cannot create transaction record for relevant "+ + "tx: %v", err) + return false, nil, err + } + if blockDetails != nil { + rec.Received = time.Unix(blockDetails.Time, 0) + } + + // We'll begin the filtering process by holding the lock to ensure we + // match exactly against what's currently in the filters. + c.watchMtx.Lock() + defer c.watchMtx.Unlock() + + // If we've already seen this transaction and it's now been confirmed, + // then we'll shortcut the filter process by immediately sending a + // notification to the caller that the filter matches. + if _, ok := c.mempool[tx.TxHash()]; ok { + if notify && blockDetails != nil { + c.onRelevantTx(rec, blockDetails) + } + return true, rec, nil + } + + // Otherwise, this is a new transaction we have yet to see. We'll need + // to determine if this transaction is somehow relevant to the caller. + var isRelevant bool + + // We'll start by cycling through its outputs to determine if it pays to + // any of the currently watched addresses. If an output matches, we'll + // add it to our watch list. + for i, out := range tx.TxOut { + _, addrs, _, err := txscript.ExtractPkScriptAddrs( + out.PkScript, c.chainParams, + ) + if err != nil { + log.Debugf("Unable to parse output script in %s:%d: %v", + tx.TxHash(), i, err) + continue + } + + for _, addr := range addrs { + if _, ok := c.watchedAddresses[addr.String()]; ok { + isRelevant = true + op := wire.OutPoint{ + Hash: tx.TxHash(), + Index: uint32(i), + } + c.watchedOutPoints[op] = struct{}{} + } + } + } + + // If the transaction didn't pay to any of our watched addresses, we'll + // check if we're currently watching for the hash of this transaction. + if !isRelevant { + if _, ok := c.watchedTxs[tx.TxHash()]; ok { + isRelevant = true + } + } + + // If the transaction didn't pay to any of our watched hashes, we'll + // check if it spends any of our watched outpoints. + if !isRelevant { + for _, in := range tx.TxIn { + if _, ok := c.watchedOutPoints[in.PreviousOutPoint]; ok { + isRelevant = true + break + } + } + } + + // If the transaction is not relevant to us, we can simply exit. + if !isRelevant { + return false, rec, nil + } + + // Otherwise, the transaction matched our filters, so we should dispatch + // a notification for it. If it's still unconfirmed, we'll include it in + // our mempool so that it can also be notified as part of + // FilteredBlockConnected once it confirms. + if blockDetails == nil { + c.mempool[tx.TxHash()] = struct{}{} + } + + c.onRelevantTx(rec, blockDetails) + + return true, rec, nil +} diff --git a/chain/bitcoind_conn.go b/chain/bitcoind_conn.go new file mode 100644 index 0000000..963afe3 --- /dev/null +++ b/chain/bitcoind_conn.go @@ -0,0 +1,362 @@ +package chain + +import ( + "bytes" + "fmt" + "net" + "sync" + "sync/atomic" + "time" + + "github.com/btcsuite/btcd/chaincfg" + "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btcd/rpcclient" + "github.com/btcsuite/btcd/wire" + "github.com/lightninglabs/gozmq" +) + +// BitcoindConn represents a persistent client connection to a bitcoind node +// that listens for events read from a ZMQ connection. +type BitcoindConn struct { + started int32 // To be used atomically. + stopped int32 // To be used atomically. + + // rescanClientCounter is an atomic counter that assigns a unique ID to + // each new bitcoind rescan client using the current bitcoind + // connection. + rescanClientCounter uint64 + + // chainParams identifies the current network the bitcoind node is + // running on. + chainParams *chaincfg.Params + + // client is the RPC client to the bitcoind node. + client *rpcclient.Client + + // zmqBlockHost is the host listening for ZMQ connections that will be + // responsible for delivering raw transaction events. + zmqBlockHost string + + // zmqTxHost is the host listening for ZMQ connections that will be + // responsible for delivering raw transaction events. + zmqTxHost string + + // zmqPollInterval is the interval at which we'll attempt to retrieve an + // event from the ZMQ connection. + zmqPollInterval time.Duration + + // rescanClients is the set of active bitcoind rescan clients to which + // ZMQ event notfications will be sent to. + rescanClientsMtx sync.Mutex + rescanClients map[uint64]*BitcoindClient + + quit chan struct{} + wg sync.WaitGroup +} + +// NewBitcoindConn creates a client connection to the node described by the host +// string. The connection is not established immediately, but must be done using +// the Start method. If the remote node does not operate on the same bitcoin +// network as described by the passed chain parameters, the connection will be +// disconnected. +func NewBitcoindConn(chainParams *chaincfg.Params, + host, user, pass, zmqBlockHost, zmqTxHost string, + zmqPollInterval time.Duration) (*BitcoindConn, error) { + + clientCfg := &rpcclient.ConnConfig{ + Host: host, + User: user, + Pass: pass, + DisableAutoReconnect: false, + DisableConnectOnNew: true, + DisableTLS: true, + HTTPPostMode: true, + } + + client, err := rpcclient.New(clientCfg, nil) + if err != nil { + return nil, err + } + + conn := &BitcoindConn{ + chainParams: chainParams, + client: client, + zmqBlockHost: zmqBlockHost, + zmqTxHost: zmqTxHost, + zmqPollInterval: zmqPollInterval, + rescanClients: make(map[uint64]*BitcoindClient), + quit: make(chan struct{}), + } + + return conn, nil +} + +// Start attempts to establish a RPC and ZMQ connection to a bitcoind node. If +// successful, a goroutine is spawned to read events from the ZMQ connection. +// It's possible for this function to fail due to a limited number of connection +// attempts. This is done to prevent waiting forever on the connection to be +// established in the case that the node is down. +func (c *BitcoindConn) Start() error { + if !atomic.CompareAndSwapInt32(&c.started, 0, 1) { + return nil + } + + // Verify that the node is running on the expected network. + net, err := c.getCurrentNet() + if err != nil { + c.client.Disconnect() + return err + } + if net != c.chainParams.Net { + c.client.Disconnect() + return fmt.Errorf("expected network %v, got %v", + c.chainParams.Net, net) + } + + // Establish two different ZMQ connections to bitcoind to retrieve block + // and transaction event notifications. We'll use two as a separation of + // concern to ensure one type of event isn't dropped from the connection + // queue due to another type of event filling it up. + zmqBlockConn, err := gozmq.Subscribe( + c.zmqBlockHost, []string{"rawblock"}, c.zmqPollInterval, + ) + if err != nil { + c.client.Disconnect() + return fmt.Errorf("unable to subscribe for zmq block events: "+ + "%v", err) + } + + zmqTxConn, err := gozmq.Subscribe( + c.zmqTxHost, []string{"rawtx"}, c.zmqPollInterval, + ) + if err != nil { + c.client.Disconnect() + return fmt.Errorf("unable to subscribe for zmq tx events: %v", + err) + } + + c.wg.Add(2) + go c.blockEventHandler(zmqBlockConn) + go c.txEventHandler(zmqTxConn) + + return nil +} + +// Stop terminates the RPC and ZMQ connection to a bitcoind node and removes any +// active rescan clients. +func (c *BitcoindConn) Stop() { + if !atomic.CompareAndSwapInt32(&c.stopped, 0, 1) { + return + } + + for _, client := range c.rescanClients { + client.Stop() + } + + close(c.quit) + c.client.Shutdown() + + c.client.WaitForShutdown() + c.wg.Wait() +} + +// blockEventHandler reads raw blocks events from the ZMQ block socket and +// forwards them along to the current rescan clients. +// +// NOTE: This must be run as a goroutine. +func (c *BitcoindConn) blockEventHandler(conn *gozmq.Conn) { + defer c.wg.Done() + defer conn.Close() + + log.Info("Started listening for bitcoind block notifications via ZMQ ", + "on", c.zmqBlockHost) + + for { + // Before attempting to read from the ZMQ socket, we'll make + // sure to check if we've been requested to shut down. + select { + case <-c.quit: + return + default: + } + + // Poll an event from the ZMQ socket. It's possible that the + // connection to the socket continuously times out, so we'll + // prevent logging this error to prevent spamming the logs. + msgBytes, err := conn.Receive() + if err != nil { + err, ok := err.(net.Error) + if !ok || !err.Timeout() { + log.Error(err) + } + + continue + } + + // We have an event! We'll now ensure it is a block event, + // deserialize it, and report it to the different rescan + // clients. + eventType := string(msgBytes[0]) + switch eventType { + case "rawblock": + block := &wire.MsgBlock{} + r := bytes.NewReader(msgBytes[1]) + if err := block.Deserialize(r); err != nil { + log.Errorf("Unable to deserialize block: %v", + err) + continue + } + + c.rescanClientsMtx.Lock() + for _, client := range c.rescanClients { + select { + case client.zmqBlockNtfns <- block: + case <-client.quit: + case <-c.quit: + c.rescanClientsMtx.Unlock() + return + } + } + c.rescanClientsMtx.Unlock() + default: + log.Warnf("Received unexpected event type from "+ + "rawblock subscription: %v", eventType) + } + } +} + +// txEventHandler reads raw blocks events from the ZMQ block socket and forwards +// them along to the current rescan clients. +// +// NOTE: This must be run as a goroutine. +func (c *BitcoindConn) txEventHandler(conn *gozmq.Conn) { + defer c.wg.Done() + defer conn.Close() + + log.Info("Started listening for bitcoind transaction notifications "+ + "via ZMQ on ", c.zmqTxHost) + + for { + // Before attempting to read from the ZMQ socket, we'll make + // sure to check if we've been requested to shut down. + select { + case <-c.quit: + return + default: + } + + // Poll an event from the ZMQ socket. It's possible that the + // connection to the socket continuously times out, so we'll + // prevent logging this error to prevent spamming the logs. + msgBytes, err := conn.Receive() + if err != nil { + err, ok := err.(net.Error) + if !ok || !err.Timeout() { + log.Error(err) + } + + continue + } + + // We have an event! We'll now ensure it is a transaction event, + // deserialize it, and report it to the different rescan + // clients. + eventType := string(msgBytes[0]) + switch eventType { + case "rawtx": + tx := &wire.MsgTx{} + r := bytes.NewReader(msgBytes[1]) + if err := tx.Deserialize(r); err != nil { + log.Errorf("Unable to deserialize "+ + "transaction: %v", err) + continue + } + + c.rescanClientsMtx.Lock() + for _, client := range c.rescanClients { + select { + case client.zmqTxNtfns <- tx: + case <-client.quit: + case <-c.quit: + c.rescanClientsMtx.Unlock() + return + } + } + c.rescanClientsMtx.Unlock() + default: + log.Warnf("Received unexpected event type from rawtx "+ + "subscription: %v", eventType) + } + } +} + +// getCurrentNet returns the network on which the bitcoind node is running. +func (c *BitcoindConn) getCurrentNet() (wire.BitcoinNet, error) { + hash, err := c.client.GetBlockHash(0) + if err != nil { + return 0, err + } + + switch *hash { + case *chaincfg.TestNet3Params.GenesisHash: + return chaincfg.TestNet3Params.Net, nil + case *chaincfg.RegressionNetParams.GenesisHash: + return chaincfg.RegressionNetParams.Net, nil + case *chaincfg.MainNetParams.GenesisHash: + return chaincfg.MainNetParams.Net, nil + default: + return 0, fmt.Errorf("unknown network with genesis hash %v", hash) + } +} + +// NewBitcoindClient returns a bitcoind client using the current bitcoind +// connection. This allows us to share the same connection using multiple +// clients. The birthday signifies the earliest time for which we should begin +// scanning the chain. +func (c *BitcoindConn) NewBitcoindClient(birthday time.Time) *BitcoindClient { + return &BitcoindClient{ + quit: make(chan struct{}), + + id: atomic.AddUint64(&c.rescanClientCounter, 1), + + birthday: birthday, + chainParams: c.chainParams, + chainConn: c, + + rescanUpdate: make(chan interface{}), + watchedAddresses: make(map[string]struct{}), + watchedOutPoints: make(map[wire.OutPoint]struct{}), + watchedTxs: make(map[chainhash.Hash]struct{}), + + notificationQueue: NewConcurrentQueue(20), + zmqTxNtfns: make(chan *wire.MsgTx), + zmqBlockNtfns: make(chan *wire.MsgBlock), + + mempool: make(map[chainhash.Hash]struct{}), + expiredMempool: make(map[int32]map[chainhash.Hash]struct{}), + } +} + +// AddClient adds a client to the set of active rescan clients of the current +// chain connection. This allows the connection to include the specified client +// in its notification delivery. +// +// NOTE: This function is safe for concurrent access. +func (c *BitcoindConn) AddClient(client *BitcoindClient) { + c.rescanClientsMtx.Lock() + defer c.rescanClientsMtx.Unlock() + + c.rescanClients[client.id] = client +} + +// RemoveClient removes the client with the given ID from the set of active +// rescan clients. Once removed, the client will no longer receive block and +// transaction notifications from the chain connection. +// +// NOTE: This function is safe for concurrent access. +func (c *BitcoindConn) RemoveClient(id uint64) { + c.rescanClientsMtx.Lock() + defer c.rescanClientsMtx.Unlock() + + delete(c.rescanClients, id) +} diff --git a/chain/queue.go b/chain/queue.go new file mode 100644 index 0000000..b30515b --- /dev/null +++ b/chain/queue.go @@ -0,0 +1,88 @@ +package chain + +import ( + "container/list" +) + +// ConcurrentQueue is a concurrent-safe FIFO queue with unbounded capacity. +// Clients interact with the queue by pushing items into the in channel and +// popping items from the out channel. There is a goroutine that manages moving +// items from the in channel to the out channel in the correct order that must +// be started by calling Start(). +type ConcurrentQueue struct { + chanIn chan interface{} + chanOut chan interface{} + quit chan struct{} + overflow *list.List +} + +// NewConcurrentQueue constructs a ConcurrentQueue. The bufferSize parameter is +// the capacity of the output channel. When the size of the queue is below this +// threshold, pushes do not incur the overhead of the less efficient overflow +// structure. +func NewConcurrentQueue(bufferSize int) *ConcurrentQueue { + return &ConcurrentQueue{ + chanIn: make(chan interface{}), + chanOut: make(chan interface{}, bufferSize), + quit: make(chan struct{}), + overflow: list.New(), + } +} + +// ChanIn returns a channel that can be used to push new items into the queue. +func (cq *ConcurrentQueue) ChanIn() chan<- interface{} { + return cq.chanIn +} + +// ChanOut returns a channel that can be used to pop items from the queue. +func (cq *ConcurrentQueue) ChanOut() <-chan interface{} { + return cq.chanOut +} + +// Start begins a goroutine that manages moving items from the in channel to +// the out channel. The queue tries to move items directly to the out channel +// minimize overhead, but if the out channel is full it pushes items to an +// overflow queue. This must be called before using the queue. +func (cq *ConcurrentQueue) Start() { + go func() { + for { + nextElement := cq.overflow.Front() + if nextElement == nil { + // The overflow queue is empty, so incoming + // items can be pushed directly to the output + // channel. However, if output channel is full, + // we'll push to the overflow list instead. + select { + case item := <-cq.chanIn: + select { + case cq.chanOut <- item: + case <-cq.quit: + return + default: + cq.overflow.PushBack(item) + } + case <-cq.quit: + return + } + } else { + // The overflow queue is not empty, so any new + // items get pushed to the back to preserve + // order. + select { + case item := <-cq.chanIn: + cq.overflow.PushBack(item) + case cq.chanOut <- nextElement.Value: + cq.overflow.Remove(nextElement) + case <-cq.quit: + return + } + } + } + }() +} + +// Stop ends the goroutine that moves items from the in channel to the out +// channel. +func (cq *ConcurrentQueue) Stop() { + close(cq.quit) +} diff --git a/wallet/chainntfns.go b/wallet/chainntfns.go index e110cb2..670e136 100644 --- a/wallet/chainntfns.go +++ b/wallet/chainntfns.go @@ -16,10 +16,11 @@ import ( ) func (w *Wallet) handleChainNotifications() { + defer w.wg.Done() + chainClient, err := w.requireChainClient() if err != nil { log.Errorf("handleChainNotifications called without RPC client") - w.wg.Done() return } @@ -84,80 +85,88 @@ func (w *Wallet) handleChainNotifications() { return err } - for n := range chainClient.Notifications() { - var notificationName string - var err error - switch n := n.(type) { - case chain.ClientConnected: - go sync(w) - case chain.BlockConnected: - err = walletdb.Update(w.db, func(tx walletdb.ReadWriteTx) error { - return w.connectBlock(tx, wtxmgr.BlockMeta(n)) - }) - notificationName = "blockconnected" - case chain.BlockDisconnected: - err = walletdb.Update(w.db, func(tx walletdb.ReadWriteTx) error { - return w.disconnectBlock(tx, wtxmgr.BlockMeta(n)) - }) - notificationName = "blockdisconnected" - case chain.RelevantTx: - err = walletdb.Update(w.db, func(tx walletdb.ReadWriteTx) error { - return w.addRelevantTx(tx, n.TxRecord, n.Block) - }) - notificationName = "recvtx/redeemingtx" - case chain.FilteredBlockConnected: - // Atomically update for the whole block. - if len(n.RelevantTxs) > 0 { - err = walletdb.Update(w.db, func( - tx walletdb.ReadWriteTx) error { - var err error - for _, rec := range n.RelevantTxs { - err = w.addRelevantTx(tx, rec, - n.Block) - if err != nil { - return err - } - } - return nil - }) + for { + select { + case n, ok := <-chainClient.Notifications(): + if !ok { + return } - notificationName = "filteredblockconnected" - // The following require some database maintenance, but also - // need to be reported to the wallet's rescan goroutine. - case *chain.RescanProgress: - err = catchUpHashes(w, chainClient, n.Height) - notificationName = "rescanprogress" - select { - case w.rescanNotifications <- n: - case <-w.quitChan(): - return + var notificationName string + var err error + switch n := n.(type) { + case chain.ClientConnected: + go sync(w) + case chain.BlockConnected: + err = walletdb.Update(w.db, func(tx walletdb.ReadWriteTx) error { + return w.connectBlock(tx, wtxmgr.BlockMeta(n)) + }) + notificationName = "blockconnected" + case chain.BlockDisconnected: + err = walletdb.Update(w.db, func(tx walletdb.ReadWriteTx) error { + return w.disconnectBlock(tx, wtxmgr.BlockMeta(n)) + }) + notificationName = "blockdisconnected" + case chain.RelevantTx: + err = walletdb.Update(w.db, func(tx walletdb.ReadWriteTx) error { + return w.addRelevantTx(tx, n.TxRecord, n.Block) + }) + notificationName = "recvtx/redeemingtx" + case chain.FilteredBlockConnected: + // Atomically update for the whole block. + if len(n.RelevantTxs) > 0 { + err = walletdb.Update(w.db, func( + tx walletdb.ReadWriteTx) error { + var err error + for _, rec := range n.RelevantTxs { + err = w.addRelevantTx(tx, rec, + n.Block) + if err != nil { + return err + } + } + return nil + }) + } + notificationName = "filteredblockconnected" + + // The following require some database maintenance, but also + // need to be reported to the wallet's rescan goroutine. + case *chain.RescanProgress: + err = catchUpHashes(w, chainClient, n.Height) + notificationName = "rescanprogress" + select { + case w.rescanNotifications <- n: + case <-w.quitChan(): + return + } + case *chain.RescanFinished: + err = catchUpHashes(w, chainClient, n.Height) + notificationName = "rescanprogress" + w.SetChainSynced(true) + select { + case w.rescanNotifications <- n: + case <-w.quitChan(): + return + } } - case *chain.RescanFinished: - err = catchUpHashes(w, chainClient, n.Height) - notificationName = "rescanprogress" - w.SetChainSynced(true) - select { - case w.rescanNotifications <- n: - case <-w.quitChan(): - return - } - } - if err != nil { - // On out-of-sync blockconnected notifications, only - // send a debug message. - errStr := "Failed to process consensus server " + - "notification (name: `%s`, detail: `%v`)" - if notificationName == "blockconnected" && - strings.Contains(err.Error(), - "couldn't get hash from database") { - log.Debugf(errStr, notificationName, err) - } else { - log.Errorf(errStr, notificationName, err) + if err != nil { + // On out-of-sync blockconnected notifications, only + // send a debug message. + errStr := "Failed to process consensus server " + + "notification (name: `%s`, detail: `%v`)" + if notificationName == "blockconnected" && + strings.Contains(err.Error(), + "couldn't get hash from database") { + log.Debugf(errStr, notificationName, err) + } else { + log.Errorf(errStr, notificationName, err) + } } + case <-w.quit: + return } } - w.wg.Done() } // connectBlock handles a chain server notification by marking a wallet diff --git a/wallet/wallet.go b/wallet/wallet.go index a9948ec..8411494 100644 --- a/wallet/wallet.go +++ b/wallet/wallet.go @@ -173,8 +173,6 @@ func (w *Wallet) SynchronizeRPC(chainClient chain.Interface) { switch cc := chainClient.(type) { case *chain.NeutrinoClient: cc.SetStartTime(w.Manager.Birthday()) - case *chain.BitcoindClient: - cc.SetStartTime(w.Manager.Birthday()) } w.chainClientLock.Unlock()