chain: explicitly close connections in BitcoindConn's Stop method

Since it's now possible for gozmq.Conn to block when calling Receive,
BitcoindConn hangs upon being stopped because its goroutines are waiting
for a message to be delivered. To address this, we modify it to close
its ZMQ connections driving the goroutines once it's been stopped. This
allows the goroutines to unblock by detecting the EOF error and exiting.
This commit is contained in:
Wilmer Paulino 2019-07-09 00:41:16 -07:00
parent cd66e82bc8
commit f02166e5de
No known key found for this signature in database
GPG key ID: 6DF57B9F9514972F
3 changed files with 67 additions and 55 deletions

View file

@ -3,6 +3,7 @@ package chain
import (
"bytes"
"fmt"
"io"
"net"
"sync"
"sync/atomic"
@ -33,17 +34,13 @@ type BitcoindConn struct {
// client is the RPC client to the bitcoind node.
client *rpcclient.Client
// zmqBlockHost is the host listening for ZMQ connections that will be
// responsible for delivering raw transaction events.
zmqBlockHost string
// zmqBlockConn is the ZMQ connection we'll use to read raw block
// events.
zmqBlockConn *gozmq.Conn
// zmqTxHost is the host listening for ZMQ connections that will be
// responsible for delivering raw transaction events.
zmqTxHost string
// zmqPollInterval is the interval at which we'll attempt to retrieve an
// event from the ZMQ connection.
zmqPollInterval time.Duration
// zmqTxConn is the ZMQ connection we'll use to read raw transaction
// events.
zmqTxConn *gozmq.Conn
// rescanClients is the set of active bitcoind rescan clients to which
// ZMQ event notfications will be sent to.
@ -55,10 +52,9 @@ type BitcoindConn struct {
}
// NewBitcoindConn creates a client connection to the node described by the host
// string. The connection is not established immediately, but must be done using
// the Start method. If the remote node does not operate on the same bitcoin
// network as described by the passed chain parameters, the connection will be
// disconnected.
// string. The ZMQ connections are established immediately to ensure liveness.
// If the remote node does not operate on the same bitcoin network as described
// by the passed chain parameters, the connection will be disconnected.
func NewBitcoindConn(chainParams *chaincfg.Params,
host, user, pass, zmqBlockHost, zmqTxHost string,
zmqPollInterval time.Duration) (*BitcoindConn, error) {
@ -78,12 +74,34 @@ func NewBitcoindConn(chainParams *chaincfg.Params,
return nil, err
}
// Establish two different ZMQ connections to bitcoind to retrieve block
// and transaction event notifications. We'll use two as a separation of
// concern to ensure one type of event isn't dropped from the connection
// queue due to another type of event filling it up.
zmqBlockConn, err := gozmq.Subscribe(
zmqBlockHost, []string{"rawblock"}, zmqPollInterval,
)
if err != nil {
client.Disconnect()
return nil, fmt.Errorf("unable to subscribe for zmq block "+
"events: %v", err)
}
zmqTxConn, err := gozmq.Subscribe(
zmqTxHost, []string{"rawtx"}, zmqPollInterval,
)
if err != nil {
client.Disconnect()
zmqBlockConn.Close()
return nil, fmt.Errorf("unable to subscribe for zmq tx "+
"events: %v", err)
}
conn := &BitcoindConn{
chainParams: chainParams,
client: client,
zmqBlockHost: zmqBlockHost,
zmqTxHost: zmqTxHost,
zmqPollInterval: zmqPollInterval,
zmqBlockConn: zmqBlockConn,
zmqTxConn: zmqTxConn,
rescanClients: make(map[uint64]*BitcoindClient),
quit: make(chan struct{}),
}
@ -113,31 +131,9 @@ func (c *BitcoindConn) Start() error {
c.chainParams.Net, net)
}
// Establish two different ZMQ connections to bitcoind to retrieve block
// and transaction event notifications. We'll use two as a separation of
// concern to ensure one type of event isn't dropped from the connection
// queue due to another type of event filling it up.
zmqBlockConn, err := gozmq.Subscribe(
c.zmqBlockHost, []string{"rawblock"}, c.zmqPollInterval,
)
if err != nil {
c.client.Disconnect()
return fmt.Errorf("unable to subscribe for zmq block events: "+
"%v", err)
}
zmqTxConn, err := gozmq.Subscribe(
c.zmqTxHost, []string{"rawtx"}, c.zmqPollInterval,
)
if err != nil {
c.client.Disconnect()
return fmt.Errorf("unable to subscribe for zmq tx events: %v",
err)
}
c.wg.Add(2)
go c.blockEventHandler(zmqBlockConn)
go c.txEventHandler(zmqTxConn)
go c.blockEventHandler()
go c.txEventHandler()
return nil
}
@ -155,6 +151,8 @@ func (c *BitcoindConn) Stop() {
close(c.quit)
c.client.Shutdown()
c.zmqBlockConn.Close()
c.zmqTxConn.Close()
c.client.WaitForShutdown()
c.wg.Wait()
@ -164,12 +162,11 @@ func (c *BitcoindConn) Stop() {
// forwards them along to the current rescan clients.
//
// NOTE: This must be run as a goroutine.
func (c *BitcoindConn) blockEventHandler(conn *gozmq.Conn) {
func (c *BitcoindConn) blockEventHandler() {
defer c.wg.Done()
defer conn.Close()
log.Info("Started listening for bitcoind block notifications via ZMQ "+
"on", c.zmqBlockHost)
"on", c.zmqBlockConn.RemoteAddr())
for {
// Before attempting to read from the ZMQ socket, we'll make
@ -181,8 +178,14 @@ func (c *BitcoindConn) blockEventHandler(conn *gozmq.Conn) {
}
// Poll an event from the ZMQ socket.
msgBytes, err := conn.Receive()
msgBytes, err := c.zmqBlockConn.Receive()
if err != nil {
// EOF should only be returned if the connection was
// explicitly closed, so we can exit at this point.
if err == io.EOF {
return
}
// It's possible that the connection to the socket
// continuously times out, so we'll prevent logging this
// error to prevent spamming the logs.
@ -240,12 +243,11 @@ func (c *BitcoindConn) blockEventHandler(conn *gozmq.Conn) {
// them along to the current rescan clients.
//
// NOTE: This must be run as a goroutine.
func (c *BitcoindConn) txEventHandler(conn *gozmq.Conn) {
func (c *BitcoindConn) txEventHandler() {
defer c.wg.Done()
defer conn.Close()
log.Info("Started listening for bitcoind transaction notifications "+
"via ZMQ on", c.zmqTxHost)
"via ZMQ on", c.zmqTxConn.RemoteAddr())
for {
// Before attempting to read from the ZMQ socket, we'll make
@ -257,8 +259,14 @@ func (c *BitcoindConn) txEventHandler(conn *gozmq.Conn) {
}
// Poll an event from the ZMQ socket.
msgBytes, err := conn.Receive()
msgBytes, err := c.zmqTxConn.Receive()
if err != nil {
// EOF should only be returned if the connection was
// explicitly closed, so we can exit at this point.
if err == io.EOF {
return
}
// It's possible that the connection to the socket
// continuously times out, so we'll prevent logging this
// error to prevent spamming the logs.

3
go.mod
View file

@ -13,8 +13,9 @@ require (
github.com/jrick/logrotate v1.0.0
github.com/kkdai/bstream v0.0.0-20181106074824-b3251f7901ec // indirect
github.com/kr/pretty v0.1.0 // indirect
github.com/lightninglabs/gozmq v0.0.0-20180324010646-462a8a753885
github.com/lightninglabs/gozmq v0.0.0-20190710231225-cea2a031735d
github.com/lightninglabs/neutrino v0.0.0-20190313035638-e1ad4c33fb18
go.etcd.io/bbolt v1.3.3 // indirect
golang.org/x/crypto v0.0.0-20190211182817-74369b46fc67
golang.org/x/net v0.0.0-20190206173232-65e2d4e15006
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4 // indirect

5
go.sum
View file

@ -66,8 +66,9 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/lightninglabs/gozmq v0.0.0-20180324010646-462a8a753885 h1:fTLuPUkaKIIV0+gA1IxiBDvDxtF8tzpSF6N6NfFGmsU=
github.com/lightninglabs/gozmq v0.0.0-20180324010646-462a8a753885/go.mod h1:KUh15naRlx/TmUMFS/p4JJrCrE6F7RGF7rsnvuu45E4=
github.com/lightninglabs/gozmq v0.0.0-20190710231225-cea2a031735d h1:tt8hwvxl6fksSfchjBGaWu+pnWJQfG1OWiCM20qOSAE=
github.com/lightninglabs/gozmq v0.0.0-20190710231225-cea2a031735d/go.mod h1:vxmQPeIQxPf6Jf9rM8R+B4rKBqLA2AjttNxkFBL2Plk=
github.com/lightninglabs/neutrino v0.0.0-20181017011010-4d6069299130 h1:6sZc23+5VbEz2uiHxW12xvS4JYZ3hhgkE5qxzwxaXzg=
github.com/lightninglabs/neutrino v0.0.0-20181017011010-4d6069299130/go.mod h1:KJq43Fu9ceitbJsSXMILcT4mGDNI/crKmPIkDOZXFyM=
github.com/lightninglabs/neutrino v0.0.0-20190213031021-ae4583a89cfb h1:Bwqgn9JXHo7I19lb4zTH2Xb0bfHgNuAJugQE7s00xqA=
@ -90,6 +91,8 @@ go.etcd.io/bbolt v1.3.0 h1:oY10fI923Q5pVCVt1GBTZMn8LHo5M+RCInFpeMnV4QI=
go.etcd.io/bbolt v1.3.0/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.etcd.io/bbolt v1.3.2 h1:Z/90sZLPOeCy2PwprqkFa25PdkusRzaj9P8zm/KNyvk=
go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.etcd.io/bbolt v1.3.3 h1:MUGmc65QhB3pIlaQ5bB4LwqSj6GIonVJXpZiaKNyaKk=
go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44 h1:9lP3x0pW80sDI6t1UMSLA4to18W7R7imwAI/sWS9S8Q=
golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190211182817-74369b46fc67 h1:ng3VDlRp5/DHpSWl02R4rM9I+8M2rhmsuLwAMmkLQWE=