From 36ca842905a2c0d97d99b13a9eecabcd7aac98a9 Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Fri, 8 Nov 2019 12:38:28 -0800 Subject: [PATCH 1/3] chain: extract ZMQ command strings into constants --- chain/bitcoind_conn.go | 35 +++++++++++++++++++++++------------ 1 file changed, 23 insertions(+), 12 deletions(-) diff --git a/chain/bitcoind_conn.go b/chain/bitcoind_conn.go index cbf4863..8dafe92 100644 --- a/chain/bitcoind_conn.go +++ b/chain/bitcoind_conn.go @@ -16,6 +16,16 @@ import ( "github.com/lightninglabs/gozmq" ) +const ( + // rawBlockZMQCommand is the command used to receive raw block + // notifications from bitcoind through ZMQ. + rawBlockZMQCommand = "rawblock" + + // rawTxZMQCommand is the command used to receive raw transaction + // notifications from bitcoind through ZMQ. + rawTxZMQCommand = "rawtx" +) + // BitcoindConn represents a persistent client connection to a bitcoind node // that listens for events read from a ZMQ connection. type BitcoindConn struct { @@ -79,7 +89,7 @@ func NewBitcoindConn(chainParams *chaincfg.Params, // concern to ensure one type of event isn't dropped from the connection // queue due to another type of event filling it up. zmqBlockConn, err := gozmq.Subscribe( - zmqBlockHost, []string{"rawblock"}, zmqPollInterval, + zmqBlockHost, []string{rawBlockZMQCommand}, zmqPollInterval, ) if err != nil { return nil, fmt.Errorf("unable to subscribe for zmq block "+ @@ -87,7 +97,7 @@ func NewBitcoindConn(chainParams *chaincfg.Params, } zmqTxConn, err := gozmq.Subscribe( - zmqTxHost, []string{"rawtx"}, zmqPollInterval, + zmqTxHost, []string{rawTxZMQCommand}, zmqPollInterval, ) if err != nil { zmqBlockConn.Close() @@ -190,8 +200,8 @@ func (c *BitcoindConn) blockEventHandler() { continue } - log.Errorf("Unable to receive ZMQ rawblock message: %v", - err) + log.Errorf("Unable to receive ZMQ %v message: %v", + rawBlockZMQCommand, err) continue } @@ -200,7 +210,7 @@ func (c *BitcoindConn) blockEventHandler() { // clients. eventType := string(msgBytes[0]) switch eventType { - case "rawblock": + case rawBlockZMQCommand: block := &wire.MsgBlock{} r := bytes.NewReader(msgBytes[1]) if err := block.Deserialize(r); err != nil { @@ -229,8 +239,9 @@ func (c *BitcoindConn) blockEventHandler() { continue } - log.Warnf("Received unexpected event type from "+ - "rawblock subscription: %v", eventType) + log.Warnf("Received unexpected event type from %v "+ + "subscription: %v", rawBlockZMQCommand, + eventType) } } } @@ -271,8 +282,8 @@ func (c *BitcoindConn) txEventHandler() { continue } - log.Errorf("Unable to receive ZMQ rawtx message: %v", - err) + log.Errorf("Unable to receive ZMQ %v message: %v", + rawTxZMQCommand, err) continue } @@ -281,7 +292,7 @@ func (c *BitcoindConn) txEventHandler() { // clients. eventType := string(msgBytes[0]) switch eventType { - case "rawtx": + case rawTxZMQCommand: tx := &wire.MsgTx{} r := bytes.NewReader(msgBytes[1]) if err := tx.Deserialize(r); err != nil { @@ -310,8 +321,8 @@ func (c *BitcoindConn) txEventHandler() { continue } - log.Warnf("Received unexpected event type from rawtx "+ - "subscription: %v", eventType) + log.Warnf("Received unexpected event type from %v "+ + "subscription: %v", rawTxZMQCommand, eventType) } } } From 58ac163d48b72095670321b789e948294467eb09 Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Fri, 8 Nov 2019 12:39:00 -0800 Subject: [PATCH 2/3] chain: trace log timed out ZMQ connections --- chain/bitcoind_conn.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/chain/bitcoind_conn.go b/chain/bitcoind_conn.go index 8dafe92..01f01f9 100644 --- a/chain/bitcoind_conn.go +++ b/chain/bitcoind_conn.go @@ -197,6 +197,8 @@ func (c *BitcoindConn) blockEventHandler() { // error to prevent spamming the logs. netErr, ok := err.(net.Error) if ok && netErr.Timeout() { + log.Trace("Re-establishing timed out ZMQ " + + "block connection") continue } @@ -279,6 +281,8 @@ func (c *BitcoindConn) txEventHandler() { // error to prevent spamming the logs. netErr, ok := err.(net.Error) if ok && netErr.Timeout() { + log.Trace("Re-establishing timed out ZMQ " + + "transaction connection") continue } From d7d2f14d61f3b9569a46243a18a6ed7ea2192d0e Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Fri, 8 Nov 2019 13:21:12 -0800 Subject: [PATCH 3/3] build+chain: reuse buffers when reading ZMQ messages from bitcoind `bitcoind` notifies transactions once they're accepted into the mempool and once they're confirmed in a block. Previously, reading a message from ZMQ would allocate a buffer with the size of the message. This can cause nodes to perform a large number of allocations within a small amount periodically (3000 300B allocations every 10 mins on average), which can cause a lot of GC pressure on lower resourced nodes. To remedy this, we introduce two static buffers, one for blocks and another for transactions, that will be reused for every message read. Each is constrained by its maximum expected size. --- chain/bitcoind_conn.go | 60 +++++++++++++++++++++++++++++++++++++----- go.mod | 2 +- go.sum | 4 +-- 3 files changed, 57 insertions(+), 9 deletions(-) diff --git a/chain/bitcoind_conn.go b/chain/bitcoind_conn.go index 01f01f9..9029527 100644 --- a/chain/bitcoind_conn.go +++ b/chain/bitcoind_conn.go @@ -24,6 +24,18 @@ const ( // rawTxZMQCommand is the command used to receive raw transaction // notifications from bitcoind through ZMQ. rawTxZMQCommand = "rawtx" + + // maxRawBlockSize is the maximum size in bytes for a raw block received + // from bitcoind through ZMQ. + maxRawBlockSize = 4e6 + + // maxRawTxSize is the maximum size in bytes for a raw transaction + // received from bitcoind through ZMQ. + maxRawTxSize = maxRawBlockSize + + // seqNumLen is the length of the sequence number of a message sent from + // bitcoind through ZMQ. + seqNumLen = 4 ) // BitcoindConn represents a persistent client connection to a bitcoind node @@ -174,6 +186,20 @@ func (c *BitcoindConn) blockEventHandler() { log.Info("Started listening for bitcoind block notifications via ZMQ "+ "on", c.zmqBlockConn.RemoteAddr()) + // Set up the buffers we expect our messages to consume. ZMQ + // messages from bitcoind include three parts: the command, the + // data, and the sequence number. + // + // We'll allocate a fixed data slice that we'll reuse when reading + // blocks from bitcoind through ZMQ. There's no need to recycle this + // slice (zero out) after using it, as further reads will overwrite the + // slice and we'll only be deserializing the bytes needed. + var ( + command [len(rawBlockZMQCommand)]byte + seqNum [seqNumLen]byte + data = make([]byte, maxRawBlockSize) + ) + for { // Before attempting to read from the ZMQ socket, we'll make // sure to check if we've been requested to shut down. @@ -184,7 +210,11 @@ func (c *BitcoindConn) blockEventHandler() { } // Poll an event from the ZMQ socket. - msgBytes, err := c.zmqBlockConn.Receive() + var ( + bufs = [][]byte{command[:], data, seqNum[:]} + err error + ) + bufs, err = c.zmqBlockConn.Receive(bufs) if err != nil { // EOF should only be returned if the connection was // explicitly closed, so we can exit at this point. @@ -210,11 +240,11 @@ func (c *BitcoindConn) blockEventHandler() { // We have an event! We'll now ensure it is a block event, // deserialize it, and report it to the different rescan // clients. - eventType := string(msgBytes[0]) + eventType := string(bufs[0]) switch eventType { case rawBlockZMQCommand: block := &wire.MsgBlock{} - r := bytes.NewReader(msgBytes[1]) + r := bytes.NewReader(bufs[1]) if err := block.Deserialize(r); err != nil { log.Errorf("Unable to deserialize block: %v", err) @@ -258,6 +288,20 @@ func (c *BitcoindConn) txEventHandler() { log.Info("Started listening for bitcoind transaction notifications "+ "via ZMQ on", c.zmqTxConn.RemoteAddr()) + // Set up the buffers we expect our messages to consume. ZMQ + // messages from bitcoind include three parts: the command, the + // data, and the sequence number. + // + // We'll allocate a fixed data slice that we'll reuse when reading + // transactions from bitcoind through ZMQ. There's no need to recycle + // this slice (zero out) after using it, as further reads will overwrite + // the slice and we'll only be deserializing the bytes needed. + var ( + command [len(rawTxZMQCommand)]byte + seqNum [seqNumLen]byte + data = make([]byte, maxRawTxSize) + ) + for { // Before attempting to read from the ZMQ socket, we'll make // sure to check if we've been requested to shut down. @@ -268,7 +312,11 @@ func (c *BitcoindConn) txEventHandler() { } // Poll an event from the ZMQ socket. - msgBytes, err := c.zmqTxConn.Receive() + var ( + bufs = [][]byte{command[:], data, seqNum[:]} + err error + ) + bufs, err = c.zmqTxConn.Receive(bufs) if err != nil { // EOF should only be returned if the connection was // explicitly closed, so we can exit at this point. @@ -294,11 +342,11 @@ func (c *BitcoindConn) txEventHandler() { // We have an event! We'll now ensure it is a transaction event, // deserialize it, and report it to the different rescan // clients. - eventType := string(msgBytes[0]) + eventType := string(bufs[0]) switch eventType { case rawTxZMQCommand: tx := &wire.MsgTx{} - r := bytes.NewReader(msgBytes[1]) + r := bytes.NewReader(bufs[1]) if err := tx.Deserialize(r); err != nil { log.Errorf("Unable to deserialize "+ "transaction: %v", err) diff --git a/go.mod b/go.mod index 4b87112..57fc23f 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/jessevdk/go-flags v1.4.0 github.com/jrick/logrotate v1.0.0 github.com/kkdai/bstream v0.0.0-20181106074824-b3251f7901ec // indirect - github.com/lightninglabs/gozmq v0.0.0-20190710231225-cea2a031735d + github.com/lightninglabs/gozmq v0.0.0-20191113021534-d20a764486bf github.com/lightninglabs/neutrino v0.10.0 golang.org/x/crypto v0.0.0-20190211182817-74369b46fc67 golang.org/x/net v0.0.0-20190206173232-65e2d4e15006 diff --git a/go.sum b/go.sum index fcc2ae1..5d112ac 100644 --- a/go.sum +++ b/go.sum @@ -53,8 +53,8 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= -github.com/lightninglabs/gozmq v0.0.0-20190710231225-cea2a031735d h1:tt8hwvxl6fksSfchjBGaWu+pnWJQfG1OWiCM20qOSAE= -github.com/lightninglabs/gozmq v0.0.0-20190710231225-cea2a031735d/go.mod h1:vxmQPeIQxPf6Jf9rM8R+B4rKBqLA2AjttNxkFBL2Plk= +github.com/lightninglabs/gozmq v0.0.0-20191113021534-d20a764486bf h1:HZKvJUHlcXI/f/O0Avg7t8sqkPo78HFzjmeYFl6DPnc= +github.com/lightninglabs/gozmq v0.0.0-20191113021534-d20a764486bf/go.mod h1:vxmQPeIQxPf6Jf9rM8R+B4rKBqLA2AjttNxkFBL2Plk= github.com/lightninglabs/neutrino v0.10.0 h1:yWVy2cOCCXbKFdpYCE9vD1fWRJDd9FtGXhUws4l9RkU= github.com/lightninglabs/neutrino v0.10.0/go.mod h1:C3KhCMk1Mcx3j8v0qRVWM1Ow6rIJSvSPnUAq00ZNAfk= github.com/lightningnetwork/lnd/queue v1.0.1 h1:jzJKcTy3Nj5lQrooJ3aaw9Lau3I0IwvQR5sqtjdv2R0=