From f02166e5de6ce944af9041c1339fa1ec09772845 Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Tue, 9 Jul 2019 00:41:16 -0700 Subject: [PATCH] chain: explicitly close connections in BitcoindConn's Stop method Since it's now possible for gozmq.Conn to block when calling Receive, BitcoindConn hangs upon being stopped because its goroutines are waiting for a message to be delivered. To address this, we modify it to close its ZMQ connections driving the goroutines once it's been stopped. This allows the goroutines to unblock by detecting the EOF error and exiting. --- chain/bitcoind_conn.go | 114 ++++++++++++++++++++++------------------- go.mod | 3 +- go.sum | 5 +- 3 files changed, 67 insertions(+), 55 deletions(-) diff --git a/chain/bitcoind_conn.go b/chain/bitcoind_conn.go index e89631c..c522a1c 100644 --- a/chain/bitcoind_conn.go +++ b/chain/bitcoind_conn.go @@ -3,6 +3,7 @@ package chain import ( "bytes" "fmt" + "io" "net" "sync" "sync/atomic" @@ -33,17 +34,13 @@ type BitcoindConn struct { // client is the RPC client to the bitcoind node. client *rpcclient.Client - // zmqBlockHost is the host listening for ZMQ connections that will be - // responsible for delivering raw transaction events. - zmqBlockHost string + // zmqBlockConn is the ZMQ connection we'll use to read raw block + // events. + zmqBlockConn *gozmq.Conn - // zmqTxHost is the host listening for ZMQ connections that will be - // responsible for delivering raw transaction events. - zmqTxHost string - - // zmqPollInterval is the interval at which we'll attempt to retrieve an - // event from the ZMQ connection. - zmqPollInterval time.Duration + // zmqTxConn is the ZMQ connection we'll use to read raw transaction + // events. + zmqTxConn *gozmq.Conn // rescanClients is the set of active bitcoind rescan clients to which // ZMQ event notfications will be sent to. @@ -55,10 +52,9 @@ type BitcoindConn struct { } // NewBitcoindConn creates a client connection to the node described by the host -// string. The connection is not established immediately, but must be done using -// the Start method. If the remote node does not operate on the same bitcoin -// network as described by the passed chain parameters, the connection will be -// disconnected. +// string. The ZMQ connections are established immediately to ensure liveness. +// If the remote node does not operate on the same bitcoin network as described +// by the passed chain parameters, the connection will be disconnected. func NewBitcoindConn(chainParams *chaincfg.Params, host, user, pass, zmqBlockHost, zmqTxHost string, zmqPollInterval time.Duration) (*BitcoindConn, error) { @@ -78,14 +74,36 @@ func NewBitcoindConn(chainParams *chaincfg.Params, return nil, err } + // Establish two different ZMQ connections to bitcoind to retrieve block + // and transaction event notifications. We'll use two as a separation of + // concern to ensure one type of event isn't dropped from the connection + // queue due to another type of event filling it up. + zmqBlockConn, err := gozmq.Subscribe( + zmqBlockHost, []string{"rawblock"}, zmqPollInterval, + ) + if err != nil { + client.Disconnect() + return nil, fmt.Errorf("unable to subscribe for zmq block "+ + "events: %v", err) + } + + zmqTxConn, err := gozmq.Subscribe( + zmqTxHost, []string{"rawtx"}, zmqPollInterval, + ) + if err != nil { + client.Disconnect() + zmqBlockConn.Close() + return nil, fmt.Errorf("unable to subscribe for zmq tx "+ + "events: %v", err) + } + conn := &BitcoindConn{ - chainParams: chainParams, - client: client, - zmqBlockHost: zmqBlockHost, - zmqTxHost: zmqTxHost, - zmqPollInterval: zmqPollInterval, - rescanClients: make(map[uint64]*BitcoindClient), - quit: make(chan struct{}), + chainParams: chainParams, + client: client, + zmqBlockConn: zmqBlockConn, + zmqTxConn: zmqTxConn, + rescanClients: make(map[uint64]*BitcoindClient), + quit: make(chan struct{}), } return conn, nil @@ -113,31 +131,9 @@ func (c *BitcoindConn) Start() error { c.chainParams.Net, net) } - // Establish two different ZMQ connections to bitcoind to retrieve block - // and transaction event notifications. We'll use two as a separation of - // concern to ensure one type of event isn't dropped from the connection - // queue due to another type of event filling it up. - zmqBlockConn, err := gozmq.Subscribe( - c.zmqBlockHost, []string{"rawblock"}, c.zmqPollInterval, - ) - if err != nil { - c.client.Disconnect() - return fmt.Errorf("unable to subscribe for zmq block events: "+ - "%v", err) - } - - zmqTxConn, err := gozmq.Subscribe( - c.zmqTxHost, []string{"rawtx"}, c.zmqPollInterval, - ) - if err != nil { - c.client.Disconnect() - return fmt.Errorf("unable to subscribe for zmq tx events: %v", - err) - } - c.wg.Add(2) - go c.blockEventHandler(zmqBlockConn) - go c.txEventHandler(zmqTxConn) + go c.blockEventHandler() + go c.txEventHandler() return nil } @@ -155,6 +151,8 @@ func (c *BitcoindConn) Stop() { close(c.quit) c.client.Shutdown() + c.zmqBlockConn.Close() + c.zmqTxConn.Close() c.client.WaitForShutdown() c.wg.Wait() @@ -164,12 +162,11 @@ func (c *BitcoindConn) Stop() { // forwards them along to the current rescan clients. // // NOTE: This must be run as a goroutine. -func (c *BitcoindConn) blockEventHandler(conn *gozmq.Conn) { +func (c *BitcoindConn) blockEventHandler() { defer c.wg.Done() - defer conn.Close() log.Info("Started listening for bitcoind block notifications via ZMQ "+ - "on", c.zmqBlockHost) + "on", c.zmqBlockConn.RemoteAddr()) for { // Before attempting to read from the ZMQ socket, we'll make @@ -181,8 +178,14 @@ func (c *BitcoindConn) blockEventHandler(conn *gozmq.Conn) { } // Poll an event from the ZMQ socket. - msgBytes, err := conn.Receive() + msgBytes, err := c.zmqBlockConn.Receive() if err != nil { + // EOF should only be returned if the connection was + // explicitly closed, so we can exit at this point. + if err == io.EOF { + return + } + // It's possible that the connection to the socket // continuously times out, so we'll prevent logging this // error to prevent spamming the logs. @@ -240,12 +243,11 @@ func (c *BitcoindConn) blockEventHandler(conn *gozmq.Conn) { // them along to the current rescan clients. // // NOTE: This must be run as a goroutine. -func (c *BitcoindConn) txEventHandler(conn *gozmq.Conn) { +func (c *BitcoindConn) txEventHandler() { defer c.wg.Done() - defer conn.Close() log.Info("Started listening for bitcoind transaction notifications "+ - "via ZMQ on", c.zmqTxHost) + "via ZMQ on", c.zmqTxConn.RemoteAddr()) for { // Before attempting to read from the ZMQ socket, we'll make @@ -257,8 +259,14 @@ func (c *BitcoindConn) txEventHandler(conn *gozmq.Conn) { } // Poll an event from the ZMQ socket. - msgBytes, err := conn.Receive() + msgBytes, err := c.zmqTxConn.Receive() if err != nil { + // EOF should only be returned if the connection was + // explicitly closed, so we can exit at this point. + if err == io.EOF { + return + } + // It's possible that the connection to the socket // continuously times out, so we'll prevent logging this // error to prevent spamming the logs. diff --git a/go.mod b/go.mod index 6f83b2c..1145245 100644 --- a/go.mod +++ b/go.mod @@ -13,8 +13,9 @@ require ( github.com/jrick/logrotate v1.0.0 github.com/kkdai/bstream v0.0.0-20181106074824-b3251f7901ec // indirect github.com/kr/pretty v0.1.0 // indirect - github.com/lightninglabs/gozmq v0.0.0-20180324010646-462a8a753885 + github.com/lightninglabs/gozmq v0.0.0-20190710231225-cea2a031735d github.com/lightninglabs/neutrino v0.0.0-20190313035638-e1ad4c33fb18 + go.etcd.io/bbolt v1.3.3 // indirect golang.org/x/crypto v0.0.0-20190211182817-74369b46fc67 golang.org/x/net v0.0.0-20190206173232-65e2d4e15006 golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4 // indirect diff --git a/go.sum b/go.sum index 664ee2e..2410dd9 100644 --- a/go.sum +++ b/go.sum @@ -66,8 +66,9 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= -github.com/lightninglabs/gozmq v0.0.0-20180324010646-462a8a753885 h1:fTLuPUkaKIIV0+gA1IxiBDvDxtF8tzpSF6N6NfFGmsU= github.com/lightninglabs/gozmq v0.0.0-20180324010646-462a8a753885/go.mod h1:KUh15naRlx/TmUMFS/p4JJrCrE6F7RGF7rsnvuu45E4= +github.com/lightninglabs/gozmq v0.0.0-20190710231225-cea2a031735d h1:tt8hwvxl6fksSfchjBGaWu+pnWJQfG1OWiCM20qOSAE= +github.com/lightninglabs/gozmq v0.0.0-20190710231225-cea2a031735d/go.mod h1:vxmQPeIQxPf6Jf9rM8R+B4rKBqLA2AjttNxkFBL2Plk= github.com/lightninglabs/neutrino v0.0.0-20181017011010-4d6069299130 h1:6sZc23+5VbEz2uiHxW12xvS4JYZ3hhgkE5qxzwxaXzg= github.com/lightninglabs/neutrino v0.0.0-20181017011010-4d6069299130/go.mod h1:KJq43Fu9ceitbJsSXMILcT4mGDNI/crKmPIkDOZXFyM= github.com/lightninglabs/neutrino v0.0.0-20190213031021-ae4583a89cfb h1:Bwqgn9JXHo7I19lb4zTH2Xb0bfHgNuAJugQE7s00xqA= @@ -90,6 +91,8 @@ go.etcd.io/bbolt v1.3.0 h1:oY10fI923Q5pVCVt1GBTZMn8LHo5M+RCInFpeMnV4QI= go.etcd.io/bbolt v1.3.0/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/bbolt v1.3.2 h1:Z/90sZLPOeCy2PwprqkFa25PdkusRzaj9P8zm/KNyvk= go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= +go.etcd.io/bbolt v1.3.3 h1:MUGmc65QhB3pIlaQ5bB4LwqSj6GIonVJXpZiaKNyaKk= +go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44 h1:9lP3x0pW80sDI6t1UMSLA4to18W7R7imwAI/sWS9S8Q= golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190211182817-74369b46fc67 h1:ng3VDlRp5/DHpSWl02R4rM9I+8M2rhmsuLwAMmkLQWE=