diff --git a/chain/bitcoind_conn.go b/chain/bitcoind_conn.go index e89631c..c522a1c 100644 --- a/chain/bitcoind_conn.go +++ b/chain/bitcoind_conn.go @@ -3,6 +3,7 @@ package chain import ( "bytes" "fmt" + "io" "net" "sync" "sync/atomic" @@ -33,17 +34,13 @@ type BitcoindConn struct { // client is the RPC client to the bitcoind node. client *rpcclient.Client - // zmqBlockHost is the host listening for ZMQ connections that will be - // responsible for delivering raw transaction events. - zmqBlockHost string + // zmqBlockConn is the ZMQ connection we'll use to read raw block + // events. + zmqBlockConn *gozmq.Conn - // zmqTxHost is the host listening for ZMQ connections that will be - // responsible for delivering raw transaction events. - zmqTxHost string - - // zmqPollInterval is the interval at which we'll attempt to retrieve an - // event from the ZMQ connection. - zmqPollInterval time.Duration + // zmqTxConn is the ZMQ connection we'll use to read raw transaction + // events. + zmqTxConn *gozmq.Conn // rescanClients is the set of active bitcoind rescan clients to which // ZMQ event notfications will be sent to. @@ -55,10 +52,9 @@ type BitcoindConn struct { } // NewBitcoindConn creates a client connection to the node described by the host -// string. The connection is not established immediately, but must be done using -// the Start method. If the remote node does not operate on the same bitcoin -// network as described by the passed chain parameters, the connection will be -// disconnected. +// string. The ZMQ connections are established immediately to ensure liveness. +// If the remote node does not operate on the same bitcoin network as described +// by the passed chain parameters, the connection will be disconnected. func NewBitcoindConn(chainParams *chaincfg.Params, host, user, pass, zmqBlockHost, zmqTxHost string, zmqPollInterval time.Duration) (*BitcoindConn, error) { @@ -78,14 +74,36 @@ func NewBitcoindConn(chainParams *chaincfg.Params, return nil, err } + // Establish two different ZMQ connections to bitcoind to retrieve block + // and transaction event notifications. We'll use two as a separation of + // concern to ensure one type of event isn't dropped from the connection + // queue due to another type of event filling it up. + zmqBlockConn, err := gozmq.Subscribe( + zmqBlockHost, []string{"rawblock"}, zmqPollInterval, + ) + if err != nil { + client.Disconnect() + return nil, fmt.Errorf("unable to subscribe for zmq block "+ + "events: %v", err) + } + + zmqTxConn, err := gozmq.Subscribe( + zmqTxHost, []string{"rawtx"}, zmqPollInterval, + ) + if err != nil { + client.Disconnect() + zmqBlockConn.Close() + return nil, fmt.Errorf("unable to subscribe for zmq tx "+ + "events: %v", err) + } + conn := &BitcoindConn{ - chainParams: chainParams, - client: client, - zmqBlockHost: zmqBlockHost, - zmqTxHost: zmqTxHost, - zmqPollInterval: zmqPollInterval, - rescanClients: make(map[uint64]*BitcoindClient), - quit: make(chan struct{}), + chainParams: chainParams, + client: client, + zmqBlockConn: zmqBlockConn, + zmqTxConn: zmqTxConn, + rescanClients: make(map[uint64]*BitcoindClient), + quit: make(chan struct{}), } return conn, nil @@ -113,31 +131,9 @@ func (c *BitcoindConn) Start() error { c.chainParams.Net, net) } - // Establish two different ZMQ connections to bitcoind to retrieve block - // and transaction event notifications. We'll use two as a separation of - // concern to ensure one type of event isn't dropped from the connection - // queue due to another type of event filling it up. - zmqBlockConn, err := gozmq.Subscribe( - c.zmqBlockHost, []string{"rawblock"}, c.zmqPollInterval, - ) - if err != nil { - c.client.Disconnect() - return fmt.Errorf("unable to subscribe for zmq block events: "+ - "%v", err) - } - - zmqTxConn, err := gozmq.Subscribe( - c.zmqTxHost, []string{"rawtx"}, c.zmqPollInterval, - ) - if err != nil { - c.client.Disconnect() - return fmt.Errorf("unable to subscribe for zmq tx events: %v", - err) - } - c.wg.Add(2) - go c.blockEventHandler(zmqBlockConn) - go c.txEventHandler(zmqTxConn) + go c.blockEventHandler() + go c.txEventHandler() return nil } @@ -155,6 +151,8 @@ func (c *BitcoindConn) Stop() { close(c.quit) c.client.Shutdown() + c.zmqBlockConn.Close() + c.zmqTxConn.Close() c.client.WaitForShutdown() c.wg.Wait() @@ -164,12 +162,11 @@ func (c *BitcoindConn) Stop() { // forwards them along to the current rescan clients. // // NOTE: This must be run as a goroutine. -func (c *BitcoindConn) blockEventHandler(conn *gozmq.Conn) { +func (c *BitcoindConn) blockEventHandler() { defer c.wg.Done() - defer conn.Close() log.Info("Started listening for bitcoind block notifications via ZMQ "+ - "on", c.zmqBlockHost) + "on", c.zmqBlockConn.RemoteAddr()) for { // Before attempting to read from the ZMQ socket, we'll make @@ -181,8 +178,14 @@ func (c *BitcoindConn) blockEventHandler(conn *gozmq.Conn) { } // Poll an event from the ZMQ socket. - msgBytes, err := conn.Receive() + msgBytes, err := c.zmqBlockConn.Receive() if err != nil { + // EOF should only be returned if the connection was + // explicitly closed, so we can exit at this point. + if err == io.EOF { + return + } + // It's possible that the connection to the socket // continuously times out, so we'll prevent logging this // error to prevent spamming the logs. @@ -240,12 +243,11 @@ func (c *BitcoindConn) blockEventHandler(conn *gozmq.Conn) { // them along to the current rescan clients. // // NOTE: This must be run as a goroutine. -func (c *BitcoindConn) txEventHandler(conn *gozmq.Conn) { +func (c *BitcoindConn) txEventHandler() { defer c.wg.Done() - defer conn.Close() log.Info("Started listening for bitcoind transaction notifications "+ - "via ZMQ on", c.zmqTxHost) + "via ZMQ on", c.zmqTxConn.RemoteAddr()) for { // Before attempting to read from the ZMQ socket, we'll make @@ -257,8 +259,14 @@ func (c *BitcoindConn) txEventHandler(conn *gozmq.Conn) { } // Poll an event from the ZMQ socket. - msgBytes, err := conn.Receive() + msgBytes, err := c.zmqTxConn.Receive() if err != nil { + // EOF should only be returned if the connection was + // explicitly closed, so we can exit at this point. + if err == io.EOF { + return + } + // It's possible that the connection to the socket // continuously times out, so we'll prevent logging this // error to prevent spamming the logs. diff --git a/go.mod b/go.mod index 6f83b2c..1145245 100644 --- a/go.mod +++ b/go.mod @@ -13,8 +13,9 @@ require ( github.com/jrick/logrotate v1.0.0 github.com/kkdai/bstream v0.0.0-20181106074824-b3251f7901ec // indirect github.com/kr/pretty v0.1.0 // indirect - github.com/lightninglabs/gozmq v0.0.0-20180324010646-462a8a753885 + github.com/lightninglabs/gozmq v0.0.0-20190710231225-cea2a031735d github.com/lightninglabs/neutrino v0.0.0-20190313035638-e1ad4c33fb18 + go.etcd.io/bbolt v1.3.3 // indirect golang.org/x/crypto v0.0.0-20190211182817-74369b46fc67 golang.org/x/net v0.0.0-20190206173232-65e2d4e15006 golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4 // indirect diff --git a/go.sum b/go.sum index 664ee2e..2410dd9 100644 --- a/go.sum +++ b/go.sum @@ -66,8 +66,9 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= -github.com/lightninglabs/gozmq v0.0.0-20180324010646-462a8a753885 h1:fTLuPUkaKIIV0+gA1IxiBDvDxtF8tzpSF6N6NfFGmsU= github.com/lightninglabs/gozmq v0.0.0-20180324010646-462a8a753885/go.mod h1:KUh15naRlx/TmUMFS/p4JJrCrE6F7RGF7rsnvuu45E4= +github.com/lightninglabs/gozmq v0.0.0-20190710231225-cea2a031735d h1:tt8hwvxl6fksSfchjBGaWu+pnWJQfG1OWiCM20qOSAE= +github.com/lightninglabs/gozmq v0.0.0-20190710231225-cea2a031735d/go.mod h1:vxmQPeIQxPf6Jf9rM8R+B4rKBqLA2AjttNxkFBL2Plk= github.com/lightninglabs/neutrino v0.0.0-20181017011010-4d6069299130 h1:6sZc23+5VbEz2uiHxW12xvS4JYZ3hhgkE5qxzwxaXzg= github.com/lightninglabs/neutrino v0.0.0-20181017011010-4d6069299130/go.mod h1:KJq43Fu9ceitbJsSXMILcT4mGDNI/crKmPIkDOZXFyM= github.com/lightninglabs/neutrino v0.0.0-20190213031021-ae4583a89cfb h1:Bwqgn9JXHo7I19lb4zTH2Xb0bfHgNuAJugQE7s00xqA= @@ -90,6 +91,8 @@ go.etcd.io/bbolt v1.3.0 h1:oY10fI923Q5pVCVt1GBTZMn8LHo5M+RCInFpeMnV4QI= go.etcd.io/bbolt v1.3.0/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/bbolt v1.3.2 h1:Z/90sZLPOeCy2PwprqkFa25PdkusRzaj9P8zm/KNyvk= go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= +go.etcd.io/bbolt v1.3.3 h1:MUGmc65QhB3pIlaQ5bB4LwqSj6GIonVJXpZiaKNyaKk= +go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44 h1:9lP3x0pW80sDI6t1UMSLA4to18W7R7imwAI/sWS9S8Q= golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190211182817-74369b46fc67 h1:ng3VDlRp5/DHpSWl02R4rM9I+8M2rhmsuLwAMmkLQWE=