Merge pull request #660 from wpaulino/zmq-reuse-msg-buffers

build+chain: reuse buffers when reading ZMQ messages from bitcoind
This commit is contained in:
Olaoluwa Osuntokun 2019-11-12 18:45:29 -08:00 committed by GitHub
commit d14cee90e5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 84 additions and 21 deletions

View file

@ -16,6 +16,28 @@ import (
"github.com/lightninglabs/gozmq" "github.com/lightninglabs/gozmq"
) )
const (
// rawBlockZMQCommand is the command used to receive raw block
// notifications from bitcoind through ZMQ.
rawBlockZMQCommand = "rawblock"
// rawTxZMQCommand is the command used to receive raw transaction
// notifications from bitcoind through ZMQ.
rawTxZMQCommand = "rawtx"
// maxRawBlockSize is the maximum size in bytes for a raw block received
// from bitcoind through ZMQ.
maxRawBlockSize = 4e6
// maxRawTxSize is the maximum size in bytes for a raw transaction
// received from bitcoind through ZMQ.
maxRawTxSize = maxRawBlockSize
// seqNumLen is the length of the sequence number of a message sent from
// bitcoind through ZMQ.
seqNumLen = 4
)
// BitcoindConn represents a persistent client connection to a bitcoind node // BitcoindConn represents a persistent client connection to a bitcoind node
// that listens for events read from a ZMQ connection. // that listens for events read from a ZMQ connection.
type BitcoindConn struct { type BitcoindConn struct {
@ -79,7 +101,7 @@ func NewBitcoindConn(chainParams *chaincfg.Params,
// concern to ensure one type of event isn't dropped from the connection // concern to ensure one type of event isn't dropped from the connection
// queue due to another type of event filling it up. // queue due to another type of event filling it up.
zmqBlockConn, err := gozmq.Subscribe( zmqBlockConn, err := gozmq.Subscribe(
zmqBlockHost, []string{"rawblock"}, zmqPollInterval, zmqBlockHost, []string{rawBlockZMQCommand}, zmqPollInterval,
) )
if err != nil { if err != nil {
return nil, fmt.Errorf("unable to subscribe for zmq block "+ return nil, fmt.Errorf("unable to subscribe for zmq block "+
@ -87,7 +109,7 @@ func NewBitcoindConn(chainParams *chaincfg.Params,
} }
zmqTxConn, err := gozmq.Subscribe( zmqTxConn, err := gozmq.Subscribe(
zmqTxHost, []string{"rawtx"}, zmqPollInterval, zmqTxHost, []string{rawTxZMQCommand}, zmqPollInterval,
) )
if err != nil { if err != nil {
zmqBlockConn.Close() zmqBlockConn.Close()
@ -164,6 +186,20 @@ func (c *BitcoindConn) blockEventHandler() {
log.Info("Started listening for bitcoind block notifications via ZMQ "+ log.Info("Started listening for bitcoind block notifications via ZMQ "+
"on", c.zmqBlockConn.RemoteAddr()) "on", c.zmqBlockConn.RemoteAddr())
// Set up the buffers we expect our messages to consume. ZMQ
// messages from bitcoind include three parts: the command, the
// data, and the sequence number.
//
// We'll allocate a fixed data slice that we'll reuse when reading
// blocks from bitcoind through ZMQ. There's no need to recycle this
// slice (zero out) after using it, as further reads will overwrite the
// slice and we'll only be deserializing the bytes needed.
var (
command [len(rawBlockZMQCommand)]byte
seqNum [seqNumLen]byte
data = make([]byte, maxRawBlockSize)
)
for { for {
// Before attempting to read from the ZMQ socket, we'll make // Before attempting to read from the ZMQ socket, we'll make
// sure to check if we've been requested to shut down. // sure to check if we've been requested to shut down.
@ -174,7 +210,11 @@ func (c *BitcoindConn) blockEventHandler() {
} }
// Poll an event from the ZMQ socket. // Poll an event from the ZMQ socket.
msgBytes, err := c.zmqBlockConn.Receive() var (
bufs = [][]byte{command[:], data, seqNum[:]}
err error
)
bufs, err = c.zmqBlockConn.Receive(bufs)
if err != nil { if err != nil {
// EOF should only be returned if the connection was // EOF should only be returned if the connection was
// explicitly closed, so we can exit at this point. // explicitly closed, so we can exit at this point.
@ -187,22 +227,24 @@ func (c *BitcoindConn) blockEventHandler() {
// error to prevent spamming the logs. // error to prevent spamming the logs.
netErr, ok := err.(net.Error) netErr, ok := err.(net.Error)
if ok && netErr.Timeout() { if ok && netErr.Timeout() {
log.Trace("Re-establishing timed out ZMQ " +
"block connection")
continue continue
} }
log.Errorf("Unable to receive ZMQ rawblock message: %v", log.Errorf("Unable to receive ZMQ %v message: %v",
err) rawBlockZMQCommand, err)
continue continue
} }
// We have an event! We'll now ensure it is a block event, // We have an event! We'll now ensure it is a block event,
// deserialize it, and report it to the different rescan // deserialize it, and report it to the different rescan
// clients. // clients.
eventType := string(msgBytes[0]) eventType := string(bufs[0])
switch eventType { switch eventType {
case "rawblock": case rawBlockZMQCommand:
block := &wire.MsgBlock{} block := &wire.MsgBlock{}
r := bytes.NewReader(msgBytes[1]) r := bytes.NewReader(bufs[1])
if err := block.Deserialize(r); err != nil { if err := block.Deserialize(r); err != nil {
log.Errorf("Unable to deserialize block: %v", log.Errorf("Unable to deserialize block: %v",
err) err)
@ -229,8 +271,9 @@ func (c *BitcoindConn) blockEventHandler() {
continue continue
} }
log.Warnf("Received unexpected event type from "+ log.Warnf("Received unexpected event type from %v "+
"rawblock subscription: %v", eventType) "subscription: %v", rawBlockZMQCommand,
eventType)
} }
} }
} }
@ -245,6 +288,20 @@ func (c *BitcoindConn) txEventHandler() {
log.Info("Started listening for bitcoind transaction notifications "+ log.Info("Started listening for bitcoind transaction notifications "+
"via ZMQ on", c.zmqTxConn.RemoteAddr()) "via ZMQ on", c.zmqTxConn.RemoteAddr())
// Set up the buffers we expect our messages to consume. ZMQ
// messages from bitcoind include three parts: the command, the
// data, and the sequence number.
//
// We'll allocate a fixed data slice that we'll reuse when reading
// transactions from bitcoind through ZMQ. There's no need to recycle
// this slice (zero out) after using it, as further reads will overwrite
// the slice and we'll only be deserializing the bytes needed.
var (
command [len(rawTxZMQCommand)]byte
seqNum [seqNumLen]byte
data = make([]byte, maxRawTxSize)
)
for { for {
// Before attempting to read from the ZMQ socket, we'll make // Before attempting to read from the ZMQ socket, we'll make
// sure to check if we've been requested to shut down. // sure to check if we've been requested to shut down.
@ -255,7 +312,11 @@ func (c *BitcoindConn) txEventHandler() {
} }
// Poll an event from the ZMQ socket. // Poll an event from the ZMQ socket.
msgBytes, err := c.zmqTxConn.Receive() var (
bufs = [][]byte{command[:], data, seqNum[:]}
err error
)
bufs, err = c.zmqTxConn.Receive(bufs)
if err != nil { if err != nil {
// EOF should only be returned if the connection was // EOF should only be returned if the connection was
// explicitly closed, so we can exit at this point. // explicitly closed, so we can exit at this point.
@ -268,22 +329,24 @@ func (c *BitcoindConn) txEventHandler() {
// error to prevent spamming the logs. // error to prevent spamming the logs.
netErr, ok := err.(net.Error) netErr, ok := err.(net.Error)
if ok && netErr.Timeout() { if ok && netErr.Timeout() {
log.Trace("Re-establishing timed out ZMQ " +
"transaction connection")
continue continue
} }
log.Errorf("Unable to receive ZMQ rawtx message: %v", log.Errorf("Unable to receive ZMQ %v message: %v",
err) rawTxZMQCommand, err)
continue continue
} }
// We have an event! We'll now ensure it is a transaction event, // We have an event! We'll now ensure it is a transaction event,
// deserialize it, and report it to the different rescan // deserialize it, and report it to the different rescan
// clients. // clients.
eventType := string(msgBytes[0]) eventType := string(bufs[0])
switch eventType { switch eventType {
case "rawtx": case rawTxZMQCommand:
tx := &wire.MsgTx{} tx := &wire.MsgTx{}
r := bytes.NewReader(msgBytes[1]) r := bytes.NewReader(bufs[1])
if err := tx.Deserialize(r); err != nil { if err := tx.Deserialize(r); err != nil {
log.Errorf("Unable to deserialize "+ log.Errorf("Unable to deserialize "+
"transaction: %v", err) "transaction: %v", err)
@ -310,8 +373,8 @@ func (c *BitcoindConn) txEventHandler() {
continue continue
} }
log.Warnf("Received unexpected event type from rawtx "+ log.Warnf("Received unexpected event type from %v "+
"subscription: %v", eventType) "subscription: %v", rawTxZMQCommand, eventType)
} }
} }
} }

2
go.mod
View file

@ -14,7 +14,7 @@ require (
github.com/jessevdk/go-flags v1.4.0 github.com/jessevdk/go-flags v1.4.0
github.com/jrick/logrotate v1.0.0 github.com/jrick/logrotate v1.0.0
github.com/kkdai/bstream v0.0.0-20181106074824-b3251f7901ec // indirect github.com/kkdai/bstream v0.0.0-20181106074824-b3251f7901ec // indirect
github.com/lightninglabs/gozmq v0.0.0-20190710231225-cea2a031735d github.com/lightninglabs/gozmq v0.0.0-20191113021534-d20a764486bf
github.com/lightninglabs/neutrino v0.10.0 github.com/lightninglabs/neutrino v0.10.0
golang.org/x/crypto v0.0.0-20190211182817-74369b46fc67 golang.org/x/crypto v0.0.0-20190211182817-74369b46fc67
golang.org/x/net v0.0.0-20190206173232-65e2d4e15006 golang.org/x/net v0.0.0-20190206173232-65e2d4e15006

4
go.sum
View file

@ -53,8 +53,8 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/lightninglabs/gozmq v0.0.0-20190710231225-cea2a031735d h1:tt8hwvxl6fksSfchjBGaWu+pnWJQfG1OWiCM20qOSAE= github.com/lightninglabs/gozmq v0.0.0-20191113021534-d20a764486bf h1:HZKvJUHlcXI/f/O0Avg7t8sqkPo78HFzjmeYFl6DPnc=
github.com/lightninglabs/gozmq v0.0.0-20190710231225-cea2a031735d/go.mod h1:vxmQPeIQxPf6Jf9rM8R+B4rKBqLA2AjttNxkFBL2Plk= github.com/lightninglabs/gozmq v0.0.0-20191113021534-d20a764486bf/go.mod h1:vxmQPeIQxPf6Jf9rM8R+B4rKBqLA2AjttNxkFBL2Plk=
github.com/lightninglabs/neutrino v0.10.0 h1:yWVy2cOCCXbKFdpYCE9vD1fWRJDd9FtGXhUws4l9RkU= github.com/lightninglabs/neutrino v0.10.0 h1:yWVy2cOCCXbKFdpYCE9vD1fWRJDd9FtGXhUws4l9RkU=
github.com/lightninglabs/neutrino v0.10.0/go.mod h1:C3KhCMk1Mcx3j8v0qRVWM1Ow6rIJSvSPnUAq00ZNAfk= github.com/lightninglabs/neutrino v0.10.0/go.mod h1:C3KhCMk1Mcx3j8v0qRVWM1Ow6rIJSvSPnUAq00ZNAfk=
github.com/lightningnetwork/lnd/queue v1.0.1 h1:jzJKcTy3Nj5lQrooJ3aaw9Lau3I0IwvQR5sqtjdv2R0= github.com/lightningnetwork/lnd/queue v1.0.1 h1:jzJKcTy3Nj5lQrooJ3aaw9Lau3I0IwvQR5sqtjdv2R0=