lbrycrd/src/zmq/zmqnotificationinterface.cpp
Wladimir J. van der Laan dac2caa371
Merge #14060: ZMQ: add options to configure outbound message high water mark, aka SNDHWM
a4edb168b6 ZMQ: add options to configure outbound message high water mark, aka SNDHWM (mruddy)

Pull request description:

  ZMQ: add options to configure outbound message high water mark, aka SNDHWM

  This is my attempt at https://github.com/bitcoin/bitcoin/pull/13315

Tree-SHA512: a4cc3bcf179776899261a97c8c4f31f35d1d8950fd71a09a79c5c064879b38e600b26824c89c4091d941502ed5b0255390882f7d44baf9e6dc49d685a86e8edb
2018-11-05 13:45:41 +01:00

199 lines
6 KiB
C++

// Copyright (c) 2015-2018 The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#include <zmq/zmqnotificationinterface.h>
#include <zmq/zmqpublishnotifier.h>
#include <version.h>
#include <validation.h>
#include <streams.h>
#include <util/system.h>
void zmqError(const char *str)
{
LogPrint(BCLog::ZMQ, "zmq: Error: %s, errno=%s\n", str, zmq_strerror(errno));
}
CZMQNotificationInterface::CZMQNotificationInterface() : pcontext(nullptr)
{
}
CZMQNotificationInterface::~CZMQNotificationInterface()
{
Shutdown();
for (std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin(); i!=notifiers.end(); ++i)
{
delete *i;
}
}
std::list<const CZMQAbstractNotifier*> CZMQNotificationInterface::GetActiveNotifiers() const
{
std::list<const CZMQAbstractNotifier*> result;
for (const auto* n : notifiers) {
result.push_back(n);
}
return result;
}
CZMQNotificationInterface* CZMQNotificationInterface::Create()
{
CZMQNotificationInterface* notificationInterface = nullptr;
std::map<std::string, CZMQNotifierFactory> factories;
std::list<CZMQAbstractNotifier*> notifiers;
factories["pubhashblock"] = CZMQAbstractNotifier::Create<CZMQPublishHashBlockNotifier>;
factories["pubhashtx"] = CZMQAbstractNotifier::Create<CZMQPublishHashTransactionNotifier>;
factories["pubrawblock"] = CZMQAbstractNotifier::Create<CZMQPublishRawBlockNotifier>;
factories["pubrawtx"] = CZMQAbstractNotifier::Create<CZMQPublishRawTransactionNotifier>;
for (const auto& entry : factories)
{
std::string arg("-zmq" + entry.first);
if (gArgs.IsArgSet(arg))
{
CZMQNotifierFactory factory = entry.second;
std::string address = gArgs.GetArg(arg, "");
CZMQAbstractNotifier *notifier = factory();
notifier->SetType(entry.first);
notifier->SetAddress(address);
notifier->SetOutboundMessageHighWaterMark(static_cast<int>(gArgs.GetArg(arg + "hwm", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM)));
notifiers.push_back(notifier);
}
}
if (!notifiers.empty())
{
notificationInterface = new CZMQNotificationInterface();
notificationInterface->notifiers = notifiers;
if (!notificationInterface->Initialize())
{
delete notificationInterface;
notificationInterface = nullptr;
}
}
return notificationInterface;
}
// Called at startup to conditionally set up ZMQ socket(s)
bool CZMQNotificationInterface::Initialize()
{
int major = 0, minor = 0, patch = 0;
zmq_version(&major, &minor, &patch);
LogPrint(BCLog::ZMQ, "zmq: version %d.%d.%d\n", major, minor, patch);
LogPrint(BCLog::ZMQ, "zmq: Initialize notification interface\n");
assert(!pcontext);
pcontext = zmq_ctx_new();
if (!pcontext)
{
zmqError("Unable to initialize context");
return false;
}
std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin();
for (; i!=notifiers.end(); ++i)
{
CZMQAbstractNotifier *notifier = *i;
if (notifier->Initialize(pcontext))
{
LogPrint(BCLog::ZMQ, "zmq: Notifier %s ready (address = %s)\n", notifier->GetType(), notifier->GetAddress());
}
else
{
LogPrint(BCLog::ZMQ, "zmq: Notifier %s failed (address = %s)\n", notifier->GetType(), notifier->GetAddress());
break;
}
}
if (i!=notifiers.end())
{
return false;
}
return true;
}
// Called during shutdown sequence
void CZMQNotificationInterface::Shutdown()
{
LogPrint(BCLog::ZMQ, "zmq: Shutdown notification interface\n");
if (pcontext)
{
for (std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin(); i!=notifiers.end(); ++i)
{
CZMQAbstractNotifier *notifier = *i;
LogPrint(BCLog::ZMQ, "zmq: Shutdown notifier %s at %s\n", notifier->GetType(), notifier->GetAddress());
notifier->Shutdown();
}
zmq_ctx_term(pcontext);
pcontext = nullptr;
}
}
void CZMQNotificationInterface::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload)
{
if (fInitialDownload || pindexNew == pindexFork) // In IBD or blocks were disconnected without any new ones
return;
for (std::list<CZMQAbstractNotifier*>::iterator i = notifiers.begin(); i!=notifiers.end(); )
{
CZMQAbstractNotifier *notifier = *i;
if (notifier->NotifyBlock(pindexNew))
{
i++;
}
else
{
notifier->Shutdown();
i = notifiers.erase(i);
}
}
}
void CZMQNotificationInterface::TransactionAddedToMempool(const CTransactionRef& ptx)
{
// Used by BlockConnected and BlockDisconnected as well, because they're
// all the same external callback.
const CTransaction& tx = *ptx;
for (std::list<CZMQAbstractNotifier*>::iterator i = notifiers.begin(); i!=notifiers.end(); )
{
CZMQAbstractNotifier *notifier = *i;
if (notifier->NotifyTransaction(tx))
{
i++;
}
else
{
notifier->Shutdown();
i = notifiers.erase(i);
}
}
}
void CZMQNotificationInterface::BlockConnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexConnected, const std::vector<CTransactionRef>& vtxConflicted)
{
for (const CTransactionRef& ptx : pblock->vtx) {
// Do a normal notify for each transaction added in the block
TransactionAddedToMempool(ptx);
}
}
void CZMQNotificationInterface::BlockDisconnected(const std::shared_ptr<const CBlock>& pblock)
{
for (const CTransactionRef& ptx : pblock->vtx) {
// Do a normal notify for each transaction removed in block disconnection
TransactionAddedToMempool(ptx);
}
}
CZMQNotificationInterface* g_zmq_notification_interface = nullptr;