build+chain: reuse buffers when reading ZMQ messages from bitcoind
`bitcoind` notifies transactions once they're accepted into the mempool and once they're confirmed in a block. Previously, reading a message from ZMQ would allocate a buffer with the size of the message. This can cause nodes to perform a large number of allocations within a small amount periodically (3000 300B allocations every 10 mins on average), which can cause a lot of GC pressure on lower resourced nodes. To remedy this, we introduce two static buffers, one for blocks and another for transactions, that will be reused for every message read. Each is constrained by its maximum expected size.
This commit is contained in:
parent
58ac163d48
commit
d7d2f14d61
3 changed files with 57 additions and 9 deletions
|
@ -24,6 +24,18 @@ const (
|
|||
// rawTxZMQCommand is the command used to receive raw transaction
|
||||
// notifications from bitcoind through ZMQ.
|
||||
rawTxZMQCommand = "rawtx"
|
||||
|
||||
// maxRawBlockSize is the maximum size in bytes for a raw block received
|
||||
// from bitcoind through ZMQ.
|
||||
maxRawBlockSize = 4e6
|
||||
|
||||
// maxRawTxSize is the maximum size in bytes for a raw transaction
|
||||
// received from bitcoind through ZMQ.
|
||||
maxRawTxSize = maxRawBlockSize
|
||||
|
||||
// seqNumLen is the length of the sequence number of a message sent from
|
||||
// bitcoind through ZMQ.
|
||||
seqNumLen = 4
|
||||
)
|
||||
|
||||
// BitcoindConn represents a persistent client connection to a bitcoind node
|
||||
|
@ -174,6 +186,20 @@ func (c *BitcoindConn) blockEventHandler() {
|
|||
log.Info("Started listening for bitcoind block notifications via ZMQ "+
|
||||
"on", c.zmqBlockConn.RemoteAddr())
|
||||
|
||||
// Set up the buffers we expect our messages to consume. ZMQ
|
||||
// messages from bitcoind include three parts: the command, the
|
||||
// data, and the sequence number.
|
||||
//
|
||||
// We'll allocate a fixed data slice that we'll reuse when reading
|
||||
// blocks from bitcoind through ZMQ. There's no need to recycle this
|
||||
// slice (zero out) after using it, as further reads will overwrite the
|
||||
// slice and we'll only be deserializing the bytes needed.
|
||||
var (
|
||||
command [len(rawBlockZMQCommand)]byte
|
||||
seqNum [seqNumLen]byte
|
||||
data = make([]byte, maxRawBlockSize)
|
||||
)
|
||||
|
||||
for {
|
||||
// Before attempting to read from the ZMQ socket, we'll make
|
||||
// sure to check if we've been requested to shut down.
|
||||
|
@ -184,7 +210,11 @@ func (c *BitcoindConn) blockEventHandler() {
|
|||
}
|
||||
|
||||
// Poll an event from the ZMQ socket.
|
||||
msgBytes, err := c.zmqBlockConn.Receive()
|
||||
var (
|
||||
bufs = [][]byte{command[:], data, seqNum[:]}
|
||||
err error
|
||||
)
|
||||
bufs, err = c.zmqBlockConn.Receive(bufs)
|
||||
if err != nil {
|
||||
// EOF should only be returned if the connection was
|
||||
// explicitly closed, so we can exit at this point.
|
||||
|
@ -210,11 +240,11 @@ func (c *BitcoindConn) blockEventHandler() {
|
|||
// We have an event! We'll now ensure it is a block event,
|
||||
// deserialize it, and report it to the different rescan
|
||||
// clients.
|
||||
eventType := string(msgBytes[0])
|
||||
eventType := string(bufs[0])
|
||||
switch eventType {
|
||||
case rawBlockZMQCommand:
|
||||
block := &wire.MsgBlock{}
|
||||
r := bytes.NewReader(msgBytes[1])
|
||||
r := bytes.NewReader(bufs[1])
|
||||
if err := block.Deserialize(r); err != nil {
|
||||
log.Errorf("Unable to deserialize block: %v",
|
||||
err)
|
||||
|
@ -258,6 +288,20 @@ func (c *BitcoindConn) txEventHandler() {
|
|||
log.Info("Started listening for bitcoind transaction notifications "+
|
||||
"via ZMQ on", c.zmqTxConn.RemoteAddr())
|
||||
|
||||
// Set up the buffers we expect our messages to consume. ZMQ
|
||||
// messages from bitcoind include three parts: the command, the
|
||||
// data, and the sequence number.
|
||||
//
|
||||
// We'll allocate a fixed data slice that we'll reuse when reading
|
||||
// transactions from bitcoind through ZMQ. There's no need to recycle
|
||||
// this slice (zero out) after using it, as further reads will overwrite
|
||||
// the slice and we'll only be deserializing the bytes needed.
|
||||
var (
|
||||
command [len(rawTxZMQCommand)]byte
|
||||
seqNum [seqNumLen]byte
|
||||
data = make([]byte, maxRawTxSize)
|
||||
)
|
||||
|
||||
for {
|
||||
// Before attempting to read from the ZMQ socket, we'll make
|
||||
// sure to check if we've been requested to shut down.
|
||||
|
@ -268,7 +312,11 @@ func (c *BitcoindConn) txEventHandler() {
|
|||
}
|
||||
|
||||
// Poll an event from the ZMQ socket.
|
||||
msgBytes, err := c.zmqTxConn.Receive()
|
||||
var (
|
||||
bufs = [][]byte{command[:], data, seqNum[:]}
|
||||
err error
|
||||
)
|
||||
bufs, err = c.zmqTxConn.Receive(bufs)
|
||||
if err != nil {
|
||||
// EOF should only be returned if the connection was
|
||||
// explicitly closed, so we can exit at this point.
|
||||
|
@ -294,11 +342,11 @@ func (c *BitcoindConn) txEventHandler() {
|
|||
// We have an event! We'll now ensure it is a transaction event,
|
||||
// deserialize it, and report it to the different rescan
|
||||
// clients.
|
||||
eventType := string(msgBytes[0])
|
||||
eventType := string(bufs[0])
|
||||
switch eventType {
|
||||
case rawTxZMQCommand:
|
||||
tx := &wire.MsgTx{}
|
||||
r := bytes.NewReader(msgBytes[1])
|
||||
r := bytes.NewReader(bufs[1])
|
||||
if err := tx.Deserialize(r); err != nil {
|
||||
log.Errorf("Unable to deserialize "+
|
||||
"transaction: %v", err)
|
||||
|
|
2
go.mod
2
go.mod
|
@ -14,7 +14,7 @@ require (
|
|||
github.com/jessevdk/go-flags v1.4.0
|
||||
github.com/jrick/logrotate v1.0.0
|
||||
github.com/kkdai/bstream v0.0.0-20181106074824-b3251f7901ec // indirect
|
||||
github.com/lightninglabs/gozmq v0.0.0-20190710231225-cea2a031735d
|
||||
github.com/lightninglabs/gozmq v0.0.0-20191113021534-d20a764486bf
|
||||
github.com/lightninglabs/neutrino v0.10.0
|
||||
golang.org/x/crypto v0.0.0-20190211182817-74369b46fc67
|
||||
golang.org/x/net v0.0.0-20190206173232-65e2d4e15006
|
||||
|
|
4
go.sum
4
go.sum
|
@ -53,8 +53,8 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN
|
|||
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
|
||||
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
|
||||
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
|
||||
github.com/lightninglabs/gozmq v0.0.0-20190710231225-cea2a031735d h1:tt8hwvxl6fksSfchjBGaWu+pnWJQfG1OWiCM20qOSAE=
|
||||
github.com/lightninglabs/gozmq v0.0.0-20190710231225-cea2a031735d/go.mod h1:vxmQPeIQxPf6Jf9rM8R+B4rKBqLA2AjttNxkFBL2Plk=
|
||||
github.com/lightninglabs/gozmq v0.0.0-20191113021534-d20a764486bf h1:HZKvJUHlcXI/f/O0Avg7t8sqkPo78HFzjmeYFl6DPnc=
|
||||
github.com/lightninglabs/gozmq v0.0.0-20191113021534-d20a764486bf/go.mod h1:vxmQPeIQxPf6Jf9rM8R+B4rKBqLA2AjttNxkFBL2Plk=
|
||||
github.com/lightninglabs/neutrino v0.10.0 h1:yWVy2cOCCXbKFdpYCE9vD1fWRJDd9FtGXhUws4l9RkU=
|
||||
github.com/lightninglabs/neutrino v0.10.0/go.mod h1:C3KhCMk1Mcx3j8v0qRVWM1Ow6rIJSvSPnUAq00ZNAfk=
|
||||
github.com/lightningnetwork/lnd/queue v1.0.1 h1:jzJKcTy3Nj5lQrooJ3aaw9Lau3I0IwvQR5sqtjdv2R0=
|
||||
|
|
Loading…
Add table
Reference in a new issue