lbrycrd/src/zmq/zmqnotificationinterface.cpp
João Barbosa de0499d3b8 Fix ZMQ Notification initialization and shutdown
Moves the call Initialize() from init.cpp to CreateWithArguments() and handles the
return value. Moves the call Shutdown() from init.cpp to destructor.
Changes Initialize() and Shutdown() to protected members.
2015-11-04 10:36:00 +00:00

161 lines
4.5 KiB
C++

// Copyright (c) 2015 The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#include "zmqnotificationinterface.h"
#include "zmqpublishnotifier.h"
#include "version.h"
#include "main.h"
#include "streams.h"
#include "util.h"
void zmqError(const char *str)
{
LogPrint("zmq", "Error: %s, errno=%s\n", str, zmq_strerror(errno));
}
CZMQNotificationInterface::CZMQNotificationInterface() : pcontext(NULL)
{
}
CZMQNotificationInterface::~CZMQNotificationInterface()
{
Shutdown();
for (std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin(); i!=notifiers.end(); ++i)
{
delete *i;
}
}
CZMQNotificationInterface* CZMQNotificationInterface::CreateWithArguments(const std::map<std::string, std::string> &args)
{
CZMQNotificationInterface* notificationInterface = NULL;
std::map<std::string, CZMQNotifierFactory> factories;
std::list<CZMQAbstractNotifier*> notifiers;
factories["pubhashblock"] = CZMQAbstractNotifier::Create<CZMQPublishHashBlockNotifier>;
factories["pubhashtx"] = CZMQAbstractNotifier::Create<CZMQPublishHashTransactionNotifier>;
factories["pubrawblock"] = CZMQAbstractNotifier::Create<CZMQPublishRawBlockNotifier>;
factories["pubrawtx"] = CZMQAbstractNotifier::Create<CZMQPublishRawTransactionNotifier>;
for (std::map<std::string, CZMQNotifierFactory>::const_iterator i=factories.begin(); i!=factories.end(); ++i)
{
std::map<std::string, std::string>::const_iterator j = args.find("-zmq" + i->first);
if (j!=args.end())
{
CZMQNotifierFactory factory = i->second;
std::string address = j->second;
CZMQAbstractNotifier *notifier = factory();
notifier->SetType(i->first);
notifier->SetAddress(address);
notifiers.push_back(notifier);
}
}
if (!notifiers.empty())
{
notificationInterface = new CZMQNotificationInterface();
notificationInterface->notifiers = notifiers;
if (!notificationInterface->Initialize())
{
delete notificationInterface;
notificationInterface = NULL;
}
}
return notificationInterface;
}
// Called at startup to conditionally set up ZMQ socket(s)
bool CZMQNotificationInterface::Initialize()
{
LogPrint("zmq", "Initialize notification interface\n");
assert(!pcontext);
pcontext = zmq_init(1);
if (!pcontext)
{
zmqError("Unable to initialize context");
return false;
}
std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin();
for (; i!=notifiers.end(); ++i)
{
CZMQAbstractNotifier *notifier = *i;
if (notifier->Initialize(pcontext))
{
LogPrint("zmq", " Notifier %s ready (address = %s)\n", notifier->GetType(), notifier->GetAddress());
}
else
{
LogPrint("zmq", " Notifier %s failed (address = %s)\n", notifier->GetType(), notifier->GetAddress());
break;
}
}
if (i!=notifiers.end())
{
Shutdown();
return false;
}
return true;
}
// Called during shutdown sequence
void CZMQNotificationInterface::Shutdown()
{
LogPrint("zmq", "Shutdown notification interface\n");
if (pcontext)
{
for (std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin(); i!=notifiers.end(); ++i)
{
CZMQAbstractNotifier *notifier = *i;
LogPrint("zmq", " Shutdown notifier %s at %s\n", notifier->GetType(), notifier->GetAddress());
notifier->Shutdown();
}
zmq_ctx_destroy(pcontext);
pcontext = 0;
}
}
void CZMQNotificationInterface::UpdatedBlockTip(const CBlockIndex *pindex)
{
for (std::list<CZMQAbstractNotifier*>::iterator i = notifiers.begin(); i!=notifiers.end(); )
{
CZMQAbstractNotifier *notifier = *i;
if (notifier->NotifyBlock(pindex))
{
i++;
}
else
{
notifier->Shutdown();
i = notifiers.erase(i);
}
}
}
void CZMQNotificationInterface::SyncTransaction(const CTransaction &tx, const CBlock *pblock)
{
for (std::list<CZMQAbstractNotifier*>::iterator i = notifiers.begin(); i!=notifiers.end(); )
{
CZMQAbstractNotifier *notifier = *i;
if (notifier->NotifyTransaction(tx))
{
i++;
}
else
{
notifier->Shutdown();
i = notifiers.erase(i);
}
}
}