ZMQ: add options to configure outbound message high water mark, aka SNDHWM

This commit is contained in:
mruddy 2018-08-24 20:42:03 -04:00
parent d387507aec
commit a4edb168b6
11 changed files with 59 additions and 14 deletions

View file

@ -42,6 +42,7 @@ class ZMQHandler():
self.zmqContext = zmq.asyncio.Context() self.zmqContext = zmq.asyncio.Context()
self.zmqSubSocket = self.zmqContext.socket(zmq.SUB) self.zmqSubSocket = self.zmqContext.socket(zmq.SUB)
self.zmqSubSocket.setsockopt(zmq.RCVHWM, 0)
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashblock") self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashblock")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashtx") self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashtx")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawblock") self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawblock")

View file

@ -46,6 +46,7 @@ class ZMQHandler():
self.zmqContext = zmq.asyncio.Context() self.zmqContext = zmq.asyncio.Context()
self.zmqSubSocket = self.zmqContext.socket(zmq.SUB) self.zmqSubSocket = self.zmqContext.socket(zmq.SUB)
self.zmqSubSocket.setsockopt(zmq.RCVHWM, 0)
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashblock") self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashblock")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashtx") self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashtx")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawblock") self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawblock")

View file

@ -66,10 +66,21 @@ Currently, the following notifications are supported:
The socket type is PUB and the address must be a valid ZeroMQ socket The socket type is PUB and the address must be a valid ZeroMQ socket
address. The same address can be used in more than one notification. address. The same address can be used in more than one notification.
The option to set the PUB socket's outbound message high water mark
(SNDHWM) may be set individually for each notification:
-zmqpubhashtxhwm=n
-zmqpubhashblockhwm=n
-zmqpubrawblockhwm=n
-zmqpubrawtxhwm=n
The high water mark value must be an integer greater than or equal to 0.
For instance: For instance:
$ bitcoind -zmqpubhashtx=tcp://127.0.0.1:28332 \ $ bitcoind -zmqpubhashtx=tcp://127.0.0.1:28332 \
-zmqpubrawtx=ipc:///tmp/bitcoind.tx.raw -zmqpubrawtx=ipc:///tmp/bitcoind.tx.raw \
-zmqpubhashtxhwm=10000
Each PUB notification has a topic and body, where the header Each PUB notification has a topic and body, where the header
corresponds to the notification type. For instance, for the corresponds to the notification type. For instance, for the

View file

@ -62,6 +62,7 @@
#include <openssl/crypto.h> #include <openssl/crypto.h>
#if ENABLE_ZMQ #if ENABLE_ZMQ
#include <zmq/zmqabstractnotifier.h>
#include <zmq/zmqnotificationinterface.h> #include <zmq/zmqnotificationinterface.h>
#include <zmq/zmqrpc.h> #include <zmq/zmqrpc.h>
#endif #endif
@ -418,11 +419,19 @@ void SetupServerArgs()
gArgs.AddArg("-zmqpubhashtx=<address>", "Enable publish hash transaction in <address>", false, OptionsCategory::ZMQ); gArgs.AddArg("-zmqpubhashtx=<address>", "Enable publish hash transaction in <address>", false, OptionsCategory::ZMQ);
gArgs.AddArg("-zmqpubrawblock=<address>", "Enable publish raw block in <address>", false, OptionsCategory::ZMQ); gArgs.AddArg("-zmqpubrawblock=<address>", "Enable publish raw block in <address>", false, OptionsCategory::ZMQ);
gArgs.AddArg("-zmqpubrawtx=<address>", "Enable publish raw transaction in <address>", false, OptionsCategory::ZMQ); gArgs.AddArg("-zmqpubrawtx=<address>", "Enable publish raw transaction in <address>", false, OptionsCategory::ZMQ);
gArgs.AddArg("-zmqpubhashblockhwm=<n>", strprintf("Set publish hash block outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), false, OptionsCategory::ZMQ);
gArgs.AddArg("-zmqpubhashtxhwm=<n>", strprintf("Set publish hash transaction outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), false, OptionsCategory::ZMQ);
gArgs.AddArg("-zmqpubrawblockhwm=<n>", strprintf("Set publish raw block outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), false, OptionsCategory::ZMQ);
gArgs.AddArg("-zmqpubrawtxhwm=<n>", strprintf("Set publish raw transaction outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), false, OptionsCategory::ZMQ);
#else #else
hidden_args.emplace_back("-zmqpubhashblock=<address>"); hidden_args.emplace_back("-zmqpubhashblock=<address>");
hidden_args.emplace_back("-zmqpubhashtx=<address>"); hidden_args.emplace_back("-zmqpubhashtx=<address>");
hidden_args.emplace_back("-zmqpubrawblock=<address>"); hidden_args.emplace_back("-zmqpubrawblock=<address>");
hidden_args.emplace_back("-zmqpubrawtx=<address>"); hidden_args.emplace_back("-zmqpubrawtx=<address>");
hidden_args.emplace_back("-zmqpubhashblockhwm=<n>");
hidden_args.emplace_back("-zmqpubhashtxhwm=<n>");
hidden_args.emplace_back("-zmqpubrawblockhwm=<n>");
hidden_args.emplace_back("-zmqpubrawtxhwm=<n>");
#endif #endif
gArgs.AddArg("-checkblocks=<n>", strprintf("How many blocks to check at startup (default: %u, 0 = all)", DEFAULT_CHECKBLOCKS), true, OptionsCategory::DEBUG_TEST); gArgs.AddArg("-checkblocks=<n>", strprintf("How many blocks to check at startup (default: %u, 0 = all)", DEFAULT_CHECKBLOCKS), true, OptionsCategory::DEBUG_TEST);

View file

@ -5,6 +5,7 @@
#include <zmq/zmqabstractnotifier.h> #include <zmq/zmqabstractnotifier.h>
#include <util.h> #include <util.h>
const int CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM;
CZMQAbstractNotifier::~CZMQAbstractNotifier() CZMQAbstractNotifier::~CZMQAbstractNotifier()
{ {

View file

@ -15,7 +15,9 @@ typedef CZMQAbstractNotifier* (*CZMQNotifierFactory)();
class CZMQAbstractNotifier class CZMQAbstractNotifier
{ {
public: public:
CZMQAbstractNotifier() : psocket(nullptr) { } static const int DEFAULT_ZMQ_SNDHWM {1000};
CZMQAbstractNotifier() : psocket(nullptr), outbound_message_high_water_mark(DEFAULT_ZMQ_SNDHWM) { }
virtual ~CZMQAbstractNotifier(); virtual ~CZMQAbstractNotifier();
template <typename T> template <typename T>
@ -28,6 +30,12 @@ public:
void SetType(const std::string &t) { type = t; } void SetType(const std::string &t) { type = t; }
std::string GetAddress() const { return address; } std::string GetAddress() const { return address; }
void SetAddress(const std::string &a) { address = a; } void SetAddress(const std::string &a) { address = a; }
int GetOutboundMessageHighWaterMark() const { return outbound_message_high_water_mark; }
void SetOutboundMessageHighWaterMark(const int sndhwm) {
if (sndhwm >= 0) {
outbound_message_high_water_mark = sndhwm;
}
}
virtual bool Initialize(void *pcontext) = 0; virtual bool Initialize(void *pcontext) = 0;
virtual void Shutdown() = 0; virtual void Shutdown() = 0;
@ -39,6 +47,7 @@ protected:
void *psocket; void *psocket;
std::string type; std::string type;
std::string address; std::string address;
int outbound_message_high_water_mark; // aka SNDHWM
}; };
#endif // BITCOIN_ZMQ_ZMQABSTRACTNOTIFIER_H #endif // BITCOIN_ZMQ_ZMQABSTRACTNOTIFIER_H

View file

@ -59,6 +59,7 @@ CZMQNotificationInterface* CZMQNotificationInterface::Create()
CZMQAbstractNotifier *notifier = factory(); CZMQAbstractNotifier *notifier = factory();
notifier->SetType(entry.first); notifier->SetType(entry.first);
notifier->SetAddress(address); notifier->SetAddress(address);
notifier->SetOutboundMessageHighWaterMark(static_cast<int>(gArgs.GetArg(arg + "hwm", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM)));
notifiers.push_back(notifier); notifiers.push_back(notifier);
} }
} }
@ -102,11 +103,11 @@ bool CZMQNotificationInterface::Initialize()
CZMQAbstractNotifier *notifier = *i; CZMQAbstractNotifier *notifier = *i;
if (notifier->Initialize(pcontext)) if (notifier->Initialize(pcontext))
{ {
LogPrint(BCLog::ZMQ, " Notifier %s ready (address = %s)\n", notifier->GetType(), notifier->GetAddress()); LogPrint(BCLog::ZMQ, "zmq: Notifier %s ready (address = %s)\n", notifier->GetType(), notifier->GetAddress());
} }
else else
{ {
LogPrint(BCLog::ZMQ, " Notifier %s failed (address = %s)\n", notifier->GetType(), notifier->GetAddress()); LogPrint(BCLog::ZMQ, "zmq: Notifier %s failed (address = %s)\n", notifier->GetType(), notifier->GetAddress());
break; break;
} }
} }
@ -128,7 +129,7 @@ void CZMQNotificationInterface::Shutdown()
for (std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin(); i!=notifiers.end(); ++i) for (std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin(); i!=notifiers.end(); ++i)
{ {
CZMQAbstractNotifier *notifier = *i; CZMQAbstractNotifier *notifier = *i;
LogPrint(BCLog::ZMQ, " Shutdown notifier %s at %s\n", notifier->GetType(), notifier->GetAddress()); LogPrint(BCLog::ZMQ, "zmq: Shutdown notifier %s at %s\n", notifier->GetType(), notifier->GetAddress());
notifier->Shutdown(); notifier->Shutdown();
} }
zmq_ctx_term(pcontext); zmq_ctx_term(pcontext);

View file

@ -76,8 +76,18 @@ bool CZMQAbstractPublishNotifier::Initialize(void *pcontext)
return false; return false;
} }
int rc = zmq_bind(psocket, address.c_str()); LogPrint(BCLog::ZMQ, "zmq: Outbound message high water mark for %s at %s is %d\n", type, address, outbound_message_high_water_mark);
if (rc!=0)
int rc = zmq_setsockopt(psocket, ZMQ_SNDHWM, &outbound_message_high_water_mark, sizeof(outbound_message_high_water_mark));
if (rc != 0)
{
zmqError("Failed to set outbound message high water mark");
zmq_close(psocket);
return false;
}
rc = zmq_bind(psocket, address.c_str());
if (rc != 0)
{ {
zmqError("Failed to bind address"); zmqError("Failed to bind address");
zmq_close(psocket); zmq_close(psocket);
@ -120,7 +130,7 @@ void CZMQAbstractPublishNotifier::Shutdown()
if (count == 1) if (count == 1)
{ {
LogPrint(BCLog::ZMQ, "Close socket at address %s\n", address); LogPrint(BCLog::ZMQ, "zmq: Close socket at address %s\n", address);
int linger = 0; int linger = 0;
zmq_setsockopt(psocket, ZMQ_LINGER, &linger, sizeof(linger)); zmq_setsockopt(psocket, ZMQ_LINGER, &linger, sizeof(linger));
zmq_close(psocket); zmq_close(psocket);

View file

@ -12,7 +12,7 @@ class CBlockIndex;
class CZMQAbstractPublishNotifier : public CZMQAbstractNotifier class CZMQAbstractPublishNotifier : public CZMQAbstractNotifier
{ {
private: private:
uint32_t nSequence; //!< upcounting per message sequence number uint32_t nSequence {0U}; //!< upcounting per message sequence number
public: public:

View file

@ -22,7 +22,8 @@ UniValue getzmqnotifications(const JSONRPCRequest& request)
"[\n" "[\n"
" { (json object)\n" " { (json object)\n"
" \"type\": \"pubhashtx\", (string) Type of notification\n" " \"type\": \"pubhashtx\", (string) Type of notification\n"
" \"address\": \"...\" (string) Address of the publisher\n" " \"address\": \"...\", (string) Address of the publisher\n"
" \"hwm\": n (numeric) Outbound message high water mark\n"
" },\n" " },\n"
" ...\n" " ...\n"
"]\n" "]\n"
@ -38,6 +39,7 @@ UniValue getzmqnotifications(const JSONRPCRequest& request)
UniValue obj(UniValue::VOBJ); UniValue obj(UniValue::VOBJ);
obj.pushKV("type", n->GetType()); obj.pushKV("type", n->GetType());
obj.pushKV("address", n->GetAddress()); obj.pushKV("address", n->GetAddress());
obj.pushKV("hwm", n->GetOutboundMessageHighWaterMark());
result.push_back(obj); result.push_back(obj);
} }
} }

View file

@ -121,10 +121,10 @@ class ZMQTest (BitcoinTestFramework):
self.log.info("Test the getzmqnotifications RPC") self.log.info("Test the getzmqnotifications RPC")
assert_equal(self.nodes[0].getzmqnotifications(), [ assert_equal(self.nodes[0].getzmqnotifications(), [
{"type": "pubhashblock", "address": ADDRESS}, {"type": "pubhashblock", "address": ADDRESS, "hwm": 1000},
{"type": "pubhashtx", "address": ADDRESS}, {"type": "pubhashtx", "address": ADDRESS, "hwm": 1000},
{"type": "pubrawblock", "address": ADDRESS}, {"type": "pubrawblock", "address": ADDRESS, "hwm": 1000},
{"type": "pubrawtx", "address": ADDRESS}, {"type": "pubrawtx", "address": ADDRESS, "hwm": 1000},
]) ])
assert_equal(self.nodes[1].getzmqnotifications(), []) assert_equal(self.nodes[1].getzmqnotifications(), [])