diff --git a/chain/bitcoind_conn.go b/chain/bitcoind_conn.go index cbf4863..9029527 100644 --- a/chain/bitcoind_conn.go +++ b/chain/bitcoind_conn.go @@ -16,6 +16,28 @@ import ( "github.com/lightninglabs/gozmq" ) +const ( + // rawBlockZMQCommand is the command used to receive raw block + // notifications from bitcoind through ZMQ. + rawBlockZMQCommand = "rawblock" + + // rawTxZMQCommand is the command used to receive raw transaction + // notifications from bitcoind through ZMQ. + rawTxZMQCommand = "rawtx" + + // maxRawBlockSize is the maximum size in bytes for a raw block received + // from bitcoind through ZMQ. + maxRawBlockSize = 4e6 + + // maxRawTxSize is the maximum size in bytes for a raw transaction + // received from bitcoind through ZMQ. + maxRawTxSize = maxRawBlockSize + + // seqNumLen is the length of the sequence number of a message sent from + // bitcoind through ZMQ. + seqNumLen = 4 +) + // BitcoindConn represents a persistent client connection to a bitcoind node // that listens for events read from a ZMQ connection. type BitcoindConn struct { @@ -79,7 +101,7 @@ func NewBitcoindConn(chainParams *chaincfg.Params, // concern to ensure one type of event isn't dropped from the connection // queue due to another type of event filling it up. zmqBlockConn, err := gozmq.Subscribe( - zmqBlockHost, []string{"rawblock"}, zmqPollInterval, + zmqBlockHost, []string{rawBlockZMQCommand}, zmqPollInterval, ) if err != nil { return nil, fmt.Errorf("unable to subscribe for zmq block "+ @@ -87,7 +109,7 @@ func NewBitcoindConn(chainParams *chaincfg.Params, } zmqTxConn, err := gozmq.Subscribe( - zmqTxHost, []string{"rawtx"}, zmqPollInterval, + zmqTxHost, []string{rawTxZMQCommand}, zmqPollInterval, ) if err != nil { zmqBlockConn.Close() @@ -164,6 +186,20 @@ func (c *BitcoindConn) blockEventHandler() { log.Info("Started listening for bitcoind block notifications via ZMQ "+ "on", c.zmqBlockConn.RemoteAddr()) + // Set up the buffers we expect our messages to consume. ZMQ + // messages from bitcoind include three parts: the command, the + // data, and the sequence number. + // + // We'll allocate a fixed data slice that we'll reuse when reading + // blocks from bitcoind through ZMQ. There's no need to recycle this + // slice (zero out) after using it, as further reads will overwrite the + // slice and we'll only be deserializing the bytes needed. + var ( + command [len(rawBlockZMQCommand)]byte + seqNum [seqNumLen]byte + data = make([]byte, maxRawBlockSize) + ) + for { // Before attempting to read from the ZMQ socket, we'll make // sure to check if we've been requested to shut down. @@ -174,7 +210,11 @@ func (c *BitcoindConn) blockEventHandler() { } // Poll an event from the ZMQ socket. - msgBytes, err := c.zmqBlockConn.Receive() + var ( + bufs = [][]byte{command[:], data, seqNum[:]} + err error + ) + bufs, err = c.zmqBlockConn.Receive(bufs) if err != nil { // EOF should only be returned if the connection was // explicitly closed, so we can exit at this point. @@ -187,22 +227,24 @@ func (c *BitcoindConn) blockEventHandler() { // error to prevent spamming the logs. netErr, ok := err.(net.Error) if ok && netErr.Timeout() { + log.Trace("Re-establishing timed out ZMQ " + + "block connection") continue } - log.Errorf("Unable to receive ZMQ rawblock message: %v", - err) + log.Errorf("Unable to receive ZMQ %v message: %v", + rawBlockZMQCommand, err) continue } // We have an event! We'll now ensure it is a block event, // deserialize it, and report it to the different rescan // clients. - eventType := string(msgBytes[0]) + eventType := string(bufs[0]) switch eventType { - case "rawblock": + case rawBlockZMQCommand: block := &wire.MsgBlock{} - r := bytes.NewReader(msgBytes[1]) + r := bytes.NewReader(bufs[1]) if err := block.Deserialize(r); err != nil { log.Errorf("Unable to deserialize block: %v", err) @@ -229,8 +271,9 @@ func (c *BitcoindConn) blockEventHandler() { continue } - log.Warnf("Received unexpected event type from "+ - "rawblock subscription: %v", eventType) + log.Warnf("Received unexpected event type from %v "+ + "subscription: %v", rawBlockZMQCommand, + eventType) } } } @@ -245,6 +288,20 @@ func (c *BitcoindConn) txEventHandler() { log.Info("Started listening for bitcoind transaction notifications "+ "via ZMQ on", c.zmqTxConn.RemoteAddr()) + // Set up the buffers we expect our messages to consume. ZMQ + // messages from bitcoind include three parts: the command, the + // data, and the sequence number. + // + // We'll allocate a fixed data slice that we'll reuse when reading + // transactions from bitcoind through ZMQ. There's no need to recycle + // this slice (zero out) after using it, as further reads will overwrite + // the slice and we'll only be deserializing the bytes needed. + var ( + command [len(rawTxZMQCommand)]byte + seqNum [seqNumLen]byte + data = make([]byte, maxRawTxSize) + ) + for { // Before attempting to read from the ZMQ socket, we'll make // sure to check if we've been requested to shut down. @@ -255,7 +312,11 @@ func (c *BitcoindConn) txEventHandler() { } // Poll an event from the ZMQ socket. - msgBytes, err := c.zmqTxConn.Receive() + var ( + bufs = [][]byte{command[:], data, seqNum[:]} + err error + ) + bufs, err = c.zmqTxConn.Receive(bufs) if err != nil { // EOF should only be returned if the connection was // explicitly closed, so we can exit at this point. @@ -268,22 +329,24 @@ func (c *BitcoindConn) txEventHandler() { // error to prevent spamming the logs. netErr, ok := err.(net.Error) if ok && netErr.Timeout() { + log.Trace("Re-establishing timed out ZMQ " + + "transaction connection") continue } - log.Errorf("Unable to receive ZMQ rawtx message: %v", - err) + log.Errorf("Unable to receive ZMQ %v message: %v", + rawTxZMQCommand, err) continue } // We have an event! We'll now ensure it is a transaction event, // deserialize it, and report it to the different rescan // clients. - eventType := string(msgBytes[0]) + eventType := string(bufs[0]) switch eventType { - case "rawtx": + case rawTxZMQCommand: tx := &wire.MsgTx{} - r := bytes.NewReader(msgBytes[1]) + r := bytes.NewReader(bufs[1]) if err := tx.Deserialize(r); err != nil { log.Errorf("Unable to deserialize "+ "transaction: %v", err) @@ -310,8 +373,8 @@ func (c *BitcoindConn) txEventHandler() { continue } - log.Warnf("Received unexpected event type from rawtx "+ - "subscription: %v", eventType) + log.Warnf("Received unexpected event type from %v "+ + "subscription: %v", rawTxZMQCommand, eventType) } } } diff --git a/go.mod b/go.mod index 4b87112..57fc23f 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/jessevdk/go-flags v1.4.0 github.com/jrick/logrotate v1.0.0 github.com/kkdai/bstream v0.0.0-20181106074824-b3251f7901ec // indirect - github.com/lightninglabs/gozmq v0.0.0-20190710231225-cea2a031735d + github.com/lightninglabs/gozmq v0.0.0-20191113021534-d20a764486bf github.com/lightninglabs/neutrino v0.10.0 golang.org/x/crypto v0.0.0-20190211182817-74369b46fc67 golang.org/x/net v0.0.0-20190206173232-65e2d4e15006 diff --git a/go.sum b/go.sum index fcc2ae1..5d112ac 100644 --- a/go.sum +++ b/go.sum @@ -53,8 +53,8 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= -github.com/lightninglabs/gozmq v0.0.0-20190710231225-cea2a031735d h1:tt8hwvxl6fksSfchjBGaWu+pnWJQfG1OWiCM20qOSAE= -github.com/lightninglabs/gozmq v0.0.0-20190710231225-cea2a031735d/go.mod h1:vxmQPeIQxPf6Jf9rM8R+B4rKBqLA2AjttNxkFBL2Plk= +github.com/lightninglabs/gozmq v0.0.0-20191113021534-d20a764486bf h1:HZKvJUHlcXI/f/O0Avg7t8sqkPo78HFzjmeYFl6DPnc= +github.com/lightninglabs/gozmq v0.0.0-20191113021534-d20a764486bf/go.mod h1:vxmQPeIQxPf6Jf9rM8R+B4rKBqLA2AjttNxkFBL2Plk= github.com/lightninglabs/neutrino v0.10.0 h1:yWVy2cOCCXbKFdpYCE9vD1fWRJDd9FtGXhUws4l9RkU= github.com/lightninglabs/neutrino v0.10.0/go.mod h1:C3KhCMk1Mcx3j8v0qRVWM1Ow6rIJSvSPnUAq00ZNAfk= github.com/lightningnetwork/lnd/queue v1.0.1 h1:jzJKcTy3Nj5lQrooJ3aaw9Lau3I0IwvQR5sqtjdv2R0=