package chain import ( "bytes" "fmt" "io" "net" "sync" "sync/atomic" "time" "github.com/btcsuite/btcd/chaincfg" "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/rpcclient" "github.com/btcsuite/btcd/wire" "github.com/lightninglabs/gozmq" ) // BitcoindConn represents a persistent client connection to a bitcoind node // that listens for events read from a ZMQ connection. type BitcoindConn struct { started int32 // To be used atomically. stopped int32 // To be used atomically. // rescanClientCounter is an atomic counter that assigns a unique ID to // each new bitcoind rescan client using the current bitcoind // connection. rescanClientCounter uint64 // chainParams identifies the current network the bitcoind node is // running on. chainParams *chaincfg.Params // client is the RPC client to the bitcoind node. client *rpcclient.Client // zmqBlockConn is the ZMQ connection we'll use to read raw block // events. zmqBlockConn *gozmq.Conn // zmqTxConn is the ZMQ connection we'll use to read raw transaction // events. zmqTxConn *gozmq.Conn // rescanClients is the set of active bitcoind rescan clients to which // ZMQ event notfications will be sent to. rescanClientsMtx sync.Mutex rescanClients map[uint64]*BitcoindClient quit chan struct{} wg sync.WaitGroup } // NewBitcoindConn creates a client connection to the node described by the host // string. The ZMQ connections are established immediately to ensure liveness. // If the remote node does not operate on the same bitcoin network as described // by the passed chain parameters, the connection will be disconnected. func NewBitcoindConn(chainParams *chaincfg.Params, host, user, pass, zmqBlockHost, zmqTxHost string, zmqPollInterval time.Duration) (*BitcoindConn, error) { clientCfg := &rpcclient.ConnConfig{ Host: host, User: user, Pass: pass, DisableAutoReconnect: false, DisableConnectOnNew: true, DisableTLS: true, HTTPPostMode: true, } client, err := rpcclient.New(clientCfg, nil) if err != nil { return nil, err } // Establish two different ZMQ connections to bitcoind to retrieve block // and transaction event notifications. We'll use two as a separation of // concern to ensure one type of event isn't dropped from the connection // queue due to another type of event filling it up. zmqBlockConn, err := gozmq.Subscribe( zmqBlockHost, []string{"rawblock"}, zmqPollInterval, ) if err != nil { return nil, fmt.Errorf("unable to subscribe for zmq block "+ "events: %v", err) } zmqTxConn, err := gozmq.Subscribe( zmqTxHost, []string{"rawtx"}, zmqPollInterval, ) if err != nil { zmqBlockConn.Close() return nil, fmt.Errorf("unable to subscribe for zmq tx "+ "events: %v", err) } conn := &BitcoindConn{ chainParams: chainParams, client: client, zmqBlockConn: zmqBlockConn, zmqTxConn: zmqTxConn, rescanClients: make(map[uint64]*BitcoindClient), quit: make(chan struct{}), } return conn, nil } // Start attempts to establish a RPC and ZMQ connection to a bitcoind node. If // successful, a goroutine is spawned to read events from the ZMQ connection. // It's possible for this function to fail due to a limited number of connection // attempts. This is done to prevent waiting forever on the connection to be // established in the case that the node is down. func (c *BitcoindConn) Start() error { if !atomic.CompareAndSwapInt32(&c.started, 0, 1) { return nil } // Verify that the node is running on the expected network. net, err := c.getCurrentNet() if err != nil { return err } if net != c.chainParams.Net { return fmt.Errorf("expected network %v, got %v", c.chainParams.Net, net) } c.wg.Add(2) go c.blockEventHandler() go c.txEventHandler() return nil } // Stop terminates the RPC and ZMQ connection to a bitcoind node and removes any // active rescan clients. func (c *BitcoindConn) Stop() { if !atomic.CompareAndSwapInt32(&c.stopped, 0, 1) { return } for _, client := range c.rescanClients { client.Stop() } close(c.quit) c.client.Shutdown() c.zmqBlockConn.Close() c.zmqTxConn.Close() c.client.WaitForShutdown() c.wg.Wait() } // blockEventHandler reads raw blocks events from the ZMQ block socket and // forwards them along to the current rescan clients. // // NOTE: This must be run as a goroutine. func (c *BitcoindConn) blockEventHandler() { defer c.wg.Done() log.Info("Started listening for bitcoind block notifications via ZMQ "+ "on", c.zmqBlockConn.RemoteAddr()) for { // Before attempting to read from the ZMQ socket, we'll make // sure to check if we've been requested to shut down. select { case <-c.quit: return default: } // Poll an event from the ZMQ socket. msgBytes, err := c.zmqBlockConn.Receive() if err != nil { // EOF should only be returned if the connection was // explicitly closed, so we can exit at this point. if err == io.EOF { return } // It's possible that the connection to the socket // continuously times out, so we'll prevent logging this // error to prevent spamming the logs. netErr, ok := err.(net.Error) if ok && netErr.Timeout() { continue } log.Errorf("Unable to receive ZMQ rawblock message: %v", err) continue } // We have an event! We'll now ensure it is a block event, // deserialize it, and report it to the different rescan // clients. eventType := string(msgBytes[0]) switch eventType { case "rawblock": block := &wire.MsgBlock{} r := bytes.NewReader(msgBytes[1]) if err := block.Deserialize(r); err != nil { log.Errorf("Unable to deserialize block: %v", err) continue } c.rescanClientsMtx.Lock() for _, client := range c.rescanClients { select { case client.zmqBlockNtfns <- block: case <-client.quit: case <-c.quit: c.rescanClientsMtx.Unlock() return } } c.rescanClientsMtx.Unlock() default: // It's possible that the message wasn't fully read if // bitcoind shuts down, which will produce an unreadable // event type. To prevent from logging it, we'll make // sure it conforms to the ASCII standard. if eventType == "" || !isASCII(eventType) { continue } log.Warnf("Received unexpected event type from "+ "rawblock subscription: %v", eventType) } } } // txEventHandler reads raw blocks events from the ZMQ block socket and forwards // them along to the current rescan clients. // // NOTE: This must be run as a goroutine. func (c *BitcoindConn) txEventHandler() { defer c.wg.Done() log.Info("Started listening for bitcoind transaction notifications "+ "via ZMQ on", c.zmqTxConn.RemoteAddr()) for { // Before attempting to read from the ZMQ socket, we'll make // sure to check if we've been requested to shut down. select { case <-c.quit: return default: } // Poll an event from the ZMQ socket. msgBytes, err := c.zmqTxConn.Receive() if err != nil { // EOF should only be returned if the connection was // explicitly closed, so we can exit at this point. if err == io.EOF { return } // It's possible that the connection to the socket // continuously times out, so we'll prevent logging this // error to prevent spamming the logs. netErr, ok := err.(net.Error) if ok && netErr.Timeout() { continue } log.Errorf("Unable to receive ZMQ rawtx message: %v", err) continue } // We have an event! We'll now ensure it is a transaction event, // deserialize it, and report it to the different rescan // clients. eventType := string(msgBytes[0]) switch eventType { case "rawtx": tx := &wire.MsgTx{} r := bytes.NewReader(msgBytes[1]) if err := tx.Deserialize(r); err != nil { log.Errorf("Unable to deserialize "+ "transaction: %v", err) continue } c.rescanClientsMtx.Lock() for _, client := range c.rescanClients { select { case client.zmqTxNtfns <- tx: case <-client.quit: case <-c.quit: c.rescanClientsMtx.Unlock() return } } c.rescanClientsMtx.Unlock() default: // It's possible that the message wasn't fully read if // bitcoind shuts down, which will produce an unreadable // event type. To prevent from logging it, we'll make // sure it conforms to the ASCII standard. if eventType == "" || !isASCII(eventType) { continue } log.Warnf("Received unexpected event type from rawtx "+ "subscription: %v", eventType) } } } // getCurrentNet returns the network on which the bitcoind node is running. func (c *BitcoindConn) getCurrentNet() (wire.BitcoinNet, error) { hash, err := c.client.GetBlockHash(0) if err != nil { return 0, err } switch *hash { case *chaincfg.TestNet3Params.GenesisHash: return chaincfg.TestNet3Params.Net, nil case *chaincfg.RegressionNetParams.GenesisHash: return chaincfg.RegressionNetParams.Net, nil case *chaincfg.MainNetParams.GenesisHash: return chaincfg.MainNetParams.Net, nil default: return 0, fmt.Errorf("unknown network with genesis hash %v", hash) } } // NewBitcoindClient returns a bitcoind client using the current bitcoind // connection. This allows us to share the same connection using multiple // clients. func (c *BitcoindConn) NewBitcoindClient() *BitcoindClient { return &BitcoindClient{ quit: make(chan struct{}), id: atomic.AddUint64(&c.rescanClientCounter, 1), chainParams: c.chainParams, chainConn: c, rescanUpdate: make(chan interface{}), watchedAddresses: make(map[string]struct{}), watchedOutPoints: make(map[wire.OutPoint]struct{}), watchedTxs: make(map[chainhash.Hash]struct{}), notificationQueue: NewConcurrentQueue(20), zmqTxNtfns: make(chan *wire.MsgTx), zmqBlockNtfns: make(chan *wire.MsgBlock), mempool: make(map[chainhash.Hash]struct{}), expiredMempool: make(map[int32]map[chainhash.Hash]struct{}), } } // AddClient adds a client to the set of active rescan clients of the current // chain connection. This allows the connection to include the specified client // in its notification delivery. // // NOTE: This function is safe for concurrent access. func (c *BitcoindConn) AddClient(client *BitcoindClient) { c.rescanClientsMtx.Lock() defer c.rescanClientsMtx.Unlock() c.rescanClients[client.id] = client } // RemoveClient removes the client with the given ID from the set of active // rescan clients. Once removed, the client will no longer receive block and // transaction notifications from the chain connection. // // NOTE: This function is safe for concurrent access. func (c *BitcoindConn) RemoveClient(id uint64) { c.rescanClientsMtx.Lock() defer c.rescanClientsMtx.Unlock() delete(c.rescanClients, id) } // isASCII is a helper method that checks whether all bytes in `data` would be // printable ASCII characters if interpreted as a string. func isASCII(s string) bool { for _, c := range s { if c < 32 || c > 126 { return false } } return true }