From 5624e055b310284e8dfd68833eea7f7246316354 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jo=C3=A3o=20Barbosa?= <joao@bitreserve.org>
Date: Thu, 7 May 2015 15:49:00 +0100
Subject: [PATCH 1/4] Add UpdatedBlockTip signal to CMainSignals and
 CValidationInterface

---
 src/main.cpp                | 1 +
 src/validationinterface.cpp | 4 ++++
 src/validationinterface.h   | 3 +++
 3 files changed, 8 insertions(+)

diff --git a/src/main.cpp b/src/main.cpp
index a880533e6..27278b977 100644
--- a/src/main.cpp
+++ b/src/main.cpp
@@ -2303,6 +2303,7 @@ bool ActivateBestChain(CValidationState &state, const CBlock *pblock) {
                         pnode->PushInventory(CInv(MSG_BLOCK, hashNewTip));
             }
             // Notify external listeners about the new tip.
+            GetMainSignals().UpdatedBlockTip(hashNewTip);
             uiInterface.NotifyBlockTip(hashNewTip);
         }
     } while(pindexMostWork != chainActive.Tip());
diff --git a/src/validationinterface.cpp b/src/validationinterface.cpp
index d365f0300..bdb706907 100644
--- a/src/validationinterface.cpp
+++ b/src/validationinterface.cpp
@@ -13,6 +13,7 @@ CMainSignals& GetMainSignals()
 }
 
 void RegisterValidationInterface(CValidationInterface* pwalletIn) {
+    g_signals.UpdatedBlockTip.connect(boost::bind(&CValidationInterface::UpdatedBlockTip, pwalletIn, _1));
     g_signals.SyncTransaction.connect(boost::bind(&CValidationInterface::SyncTransaction, pwalletIn, _1, _2));
     g_signals.UpdatedTransaction.connect(boost::bind(&CValidationInterface::UpdatedTransaction, pwalletIn, _1));
     g_signals.SetBestChain.connect(boost::bind(&CValidationInterface::SetBestChain, pwalletIn, _1));
@@ -32,6 +33,7 @@ void UnregisterValidationInterface(CValidationInterface* pwalletIn) {
     g_signals.SetBestChain.disconnect(boost::bind(&CValidationInterface::SetBestChain, pwalletIn, _1));
     g_signals.UpdatedTransaction.disconnect(boost::bind(&CValidationInterface::UpdatedTransaction, pwalletIn, _1));
     g_signals.SyncTransaction.disconnect(boost::bind(&CValidationInterface::SyncTransaction, pwalletIn, _1, _2));
+    g_signals.UpdatedBlockTip.disconnect(boost::bind(&CValidationInterface::UpdatedBlockTip, pwalletIn, _1));
 }
 
 void UnregisterAllValidationInterfaces() {
@@ -43,6 +45,8 @@ void UnregisterAllValidationInterfaces() {
     g_signals.SetBestChain.disconnect_all_slots();
     g_signals.UpdatedTransaction.disconnect_all_slots();
     g_signals.SyncTransaction.disconnect_all_slots();
+    g_signals.UpdatedTransaction.disconnect_all_slots();
+    g_signals.UpdatedBlockTip.disconnect_all_slots();
 }
 
 void SyncWithWallets(const CTransaction &tx, const CBlock *pblock) {
diff --git a/src/validationinterface.h b/src/validationinterface.h
index fb0ce0bda..6f95ad74e 100644
--- a/src/validationinterface.h
+++ b/src/validationinterface.h
@@ -30,6 +30,7 @@ void SyncWithWallets(const CTransaction& tx, const CBlock* pblock = NULL);
 
 class CValidationInterface {
 protected:
+    virtual void UpdatedBlockTip(const uint256 &newHashTip) {}
     virtual void SyncTransaction(const CTransaction &tx, const CBlock *pblock) {}
     virtual void SetBestChain(const CBlockLocator &locator) {}
     virtual void UpdatedTransaction(const uint256 &hash) {}
@@ -44,6 +45,8 @@ protected:
 };
 
 struct CMainSignals {
+    /** Notifies listeners of updated block chain tip */
+    boost::signals2::signal<void (const uint256 &)> UpdatedBlockTip;
     /** Notifies listeners of updated transaction data (transaction, and optionally the block it is found in. */
     boost::signals2::signal<void (const CTransaction &, const CBlock *)> SyncTransaction;
     /** Notifies listeners of an updated transaction without new data (for now: a coinbase potentially becoming visible). */

From 1136879df8af2358efb706c5af886778fbd94989 Mon Sep 17 00:00:00 2001
From: Cory Fields <cory-nospam-@coryfields.com>
Date: Tue, 5 May 2015 20:30:20 -0400
Subject: [PATCH 2/4] Depends: Add ZeroMQ package

---
 depends/packages/packages.mk      |  2 ++
 depends/packages/zeromq.mk        | 26 ++++++++++++++++++++++++++
 qa/pull-tester/rpc-tests.sh       |  4 ++++
 qa/pull-tester/tests-config.sh.in |  1 +
 4 files changed, 33 insertions(+)
 create mode 100644 depends/packages/zeromq.mk

diff --git a/depends/packages/packages.mk b/depends/packages/packages.mk
index a0d377bb4..fc1e5f0b6 100644
--- a/depends/packages/packages.mk
+++ b/depends/packages/packages.mk
@@ -1,4 +1,6 @@
 packages:=boost openssl libevent
+packages_darwin:=zeromq
+packages_linux:=zeromq
 native_packages := native_ccache native_comparisontool
 
 qt_native_packages = native_protobuf
diff --git a/depends/packages/zeromq.mk b/depends/packages/zeromq.mk
new file mode 100644
index 000000000..24e8e5f1c
--- /dev/null
+++ b/depends/packages/zeromq.mk
@@ -0,0 +1,26 @@
+package=zeromq
+$(package)_version=4.0.4
+$(package)_download_path=http://download.zeromq.org
+$(package)_file_name=$(package)-$($(package)_version).tar.gz
+$(package)_sha256_hash=1ef71d46e94f33e27dd5a1661ed626cd39be4d2d6967792a275040e34457d399
+
+define $(package)_set_vars
+  $(package)_config_opts=--without-documentation --disable-shared
+  $(package)_config_opts_linux=--with-pic
+endef
+
+define $(package)_config_cmds
+  $($(package)_autoconf)
+endef
+
+define $(package)_build_cmds
+  $(MAKE) -C src
+endef
+
+define $(package)_stage_cmds
+  $(MAKE) -C src DESTDIR=$($(package)_staging_dir) install
+endef
+
+define $(package)_postprocess_cmds
+  rm -rf bin share
+endef
diff --git a/qa/pull-tester/rpc-tests.sh b/qa/pull-tester/rpc-tests.sh
index 514bdf564..c6181c48c 100755
--- a/qa/pull-tester/rpc-tests.sh
+++ b/qa/pull-tester/rpc-tests.sh
@@ -59,6 +59,10 @@ testScriptsExt=(
     'p2p-acceptblock.py'
 );
 
+if [ "x$ENABLE_ZMQ" = "x1" ]; then
+  testScripts=( ${testScripts[@]} 'zmq_test.py' )
+fi
+
 extArg="-extended"
 passOn=${@#$extArg}
 
diff --git a/qa/pull-tester/tests-config.sh.in b/qa/pull-tester/tests-config.sh.in
index 10f4d33e4..e881a9511 100755
--- a/qa/pull-tester/tests-config.sh.in
+++ b/qa/pull-tester/tests-config.sh.in
@@ -10,6 +10,7 @@ EXEEXT="@EXEEXT@"
 @ENABLE_WALLET_TRUE@ENABLE_WALLET=1
 @BUILD_BITCOIN_UTILS_TRUE@ENABLE_UTILS=1
 @BUILD_BITCOIND_TRUE@ENABLE_BITCOIND=1
+@ENABLE_ZMQ_TRUE@ENABLE_ZMQ=1
 
 REAL_BITCOIND="$BUILDDIR/src/bitcoind${EXEEXT}"
 REAL_BITCOINCLI="$BUILDDIR/src/bitcoin-cli${EXEEXT}"

From e6a14b64d665eb1fafd03a6bbc8d14597ce1c83c Mon Sep 17 00:00:00 2001
From: Jeff Garzik <jgarzik@bitpay.com>
Date: Tue, 18 Nov 2014 12:06:32 -0500
Subject: [PATCH 3/4] Add ZeroMQ support. Notify blocks and transactions via
 ZeroMQ

Continues Johnathan Corgan's work.
Publishing multipart messages

Bugfix: Add missing zmq header includes

Bugfix: Adjust build system to link ZeroMQ code for Qt binaries
---
 configure.ac                         |  22 ++++
 contrib/zmq/zmq_sub.py               |  37 ++++++
 doc/zmq.md                           |  98 +++++++++++++++
 src/Makefile.am                      |  26 +++-
 src/Makefile.qt.include              |   3 +
 src/Makefile.qttest.include          |   3 +
 src/Makefile.test.include            |   4 +
 src/init.cpp                         |  36 +++++-
 src/validationinterface.cpp          |   1 -
 src/zmq/zmqabstractnotifier.cpp      |  22 ++++
 src/zmq/zmqabstractnotifier.h        |  42 +++++++
 src/zmq/zmqconfig.h                  |  24 ++++
 src/zmq/zmqnotificationinterface.cpp | 155 ++++++++++++++++++++++++
 src/zmq/zmqnotificationinterface.h   |  35 ++++++
 src/zmq/zmqpublishnotifier.cpp       | 172 +++++++++++++++++++++++++++
 src/zmq/zmqpublishnotifier.h         |  41 +++++++
 16 files changed, 717 insertions(+), 4 deletions(-)
 create mode 100755 contrib/zmq/zmq_sub.py
 create mode 100644 doc/zmq.md
 create mode 100644 src/zmq/zmqabstractnotifier.cpp
 create mode 100644 src/zmq/zmqabstractnotifier.h
 create mode 100644 src/zmq/zmqconfig.h
 create mode 100644 src/zmq/zmqnotificationinterface.cpp
 create mode 100644 src/zmq/zmqnotificationinterface.h
 create mode 100644 src/zmq/zmqpublishnotifier.cpp
 create mode 100644 src/zmq/zmqpublishnotifier.h

diff --git a/configure.ac b/configure.ac
index 07ee28f84..9955ab9bf 100644
--- a/configure.ac
+++ b/configure.ac
@@ -137,6 +137,12 @@ AC_ARG_ENABLE([glibc-back-compat],
   [use_glibc_compat=$enableval],
   [use_glibc_compat=no])
 
+AC_ARG_ENABLE([zmq],
+  [AC_HELP_STRING([--disable-zmq],
+  [Disable ZMQ notifications])],
+  [use_zmq=$enableval],
+  [use_zmq=yes])
+
 AC_ARG_WITH([protoc-bindir],[AS_HELP_STRING([--with-protoc-bindir=BIN_DIR],[specify protoc bin path])], [protoc_bin_path=$withval], [])
 
 # Enable debug 
@@ -833,6 +839,22 @@ if test x$bitcoin_enable_qt != xno; then
   fi
 fi
 
+# conditional search for and use libzmq
+AC_MSG_CHECKING([whether to build ZMQ support])
+if test "x$use_zmq" = "xyes"; then
+  AC_MSG_RESULT([yes])
+  PKG_CHECK_MODULES([ZMQ],[libzmq],
+    [AC_DEFINE([ENABLE_ZMQ],[1],[Define to 1 to enable ZMQ functions])],
+    [AC_DEFINE([ENABLE_ZMQ],[0],[Define to 1 to enable ZMQ functions])
+     AC_MSG_WARN([libzmq not found, disabling])
+     use_zmq=no])
+else
+  AC_MSG_RESULT([no, --disable-zmq used])
+  AC_DEFINE_UNQUOTED([ENABLE_ZMQ],[0],[Define to 1 to enable ZMQ functions])
+fi
+
+AM_CONDITIONAL([ENABLE_ZMQ], [test "x$use_zmq" = "xyes"])
+
 AC_MSG_CHECKING([whether to build test_bitcoin])
 if test x$use_tests = xyes; then
   AC_MSG_RESULT([yes])
diff --git a/contrib/zmq/zmq_sub.py b/contrib/zmq/zmq_sub.py
new file mode 100755
index 000000000..decf29d42
--- /dev/null
+++ b/contrib/zmq/zmq_sub.py
@@ -0,0 +1,37 @@
+#!/usr/bin/env python2
+
+import array
+import binascii
+import zmq
+
+port = 28332
+
+zmqContext = zmq.Context()
+zmqSubSocket = zmqContext.socket(zmq.SUB)
+zmqSubSocket.setsockopt(zmq.SUBSCRIBE, "hashblock")
+zmqSubSocket.setsockopt(zmq.SUBSCRIBE, "hashtx")
+zmqSubSocket.setsockopt(zmq.SUBSCRIBE, "rawblock")
+zmqSubSocket.setsockopt(zmq.SUBSCRIBE, "rawtx")
+zmqSubSocket.connect("tcp://127.0.0.1:%i" % port)
+
+try:
+    while True:
+        msg = zmqSubSocket.recv_multipart()
+        topic = str(msg[0])
+        body = msg[1]
+
+        if topic == "hashblock":
+            print "- HASH BLOCK -"
+            print binascii.hexlify(body)
+        elif topic == "hashtx":
+            print '- HASH TX -'
+            print binascii.hexlify(body)
+        elif topic == "rawblock":
+            print "- RAW BLOCK HEADER -"
+            print binascii.hexlify(body[:80])
+        elif topic == "rawtx":
+            print '- RAW TX -'
+            print binascii.hexlify(body)
+
+except KeyboardInterrupt:
+    zmqContext.destroy()
diff --git a/doc/zmq.md b/doc/zmq.md
new file mode 100644
index 000000000..fd04f6d9f
--- /dev/null
+++ b/doc/zmq.md
@@ -0,0 +1,98 @@
+# Block and Transaction Broadcasting With ZeroMQ
+
+[ZeroMQ](http://zeromq.org/) is a lightweight wrapper around TCP
+connections, inter-process communications, and shared-memory,
+providing various message-oriented semantics such as publish/subcribe,
+request/reply, and push/pull.
+
+The Bitcoin Core daemon can be configured to act as a trusted "border
+router", implementing the bitcoin wire protocol and relay, making
+consensus decisions, maintaining the local blockchain database,
+broadcasting locally generated transactions into the network, and
+providing a queryable RPC interface to interact on a polled basis for
+requesting blockchain related data. However, there exists only a
+limited service to notify external software of events like the arrival
+of new blocks or transactions.
+
+The ZeroMQ facility implements a notification interface through a
+set of specific notifiers. Currently there are notifiers that publish
+blocks and transactions. This read-only facility requires only the
+connection of a corresponding ZeroMQ subscriber port in receiving 
+software; it is not authenticated nor is there any two-way protocol
+involvement. Therefore, subscribers should validate the received data
+since it may be out of date, incomplete or even invalid.
+
+ZeroMQ sockets are self-connecting and self-healing; that is, connects
+made between two endpoints will be automatically restored after an
+outage, and either end may be freely started or stopped in any order.
+
+Because ZeroMQ is message oriented, subscribers receive transactions
+and blocks all-at-once and do not need to implement any sort of
+buffering or reassembly.
+
+## Prerequisites
+
+The ZeroMQ feature in Bitcoin Core uses only a very small part of the
+ZeroMQ C API, and is thus compatible with any version of ZeroMQ
+from 2.1 onward, including all versions in the 3.x and 4.x release
+series. Typically, it is packaged by distributions as something like
+*libzmq-dev*.
+
+The C++ wrapper for ZeroMQ is *not* needed.
+
+## Enabling
+
+By default, the ZeroMQ port functionality is enabled. Two steps are
+required to enable--compiling in the ZeroMQ code, and configuring
+runtime operation on the command-line or configuration file.
+
+    $ ./configure --enable-zmq (other options)
+
+This will produce a binary that is capable of providing the ZeroMQ
+facility, but will not do so until also configured properly.
+
+## Usage
+
+Currently, the following notifications are supported:
+
+    -zmqpubhashtx=address
+    -zmqpubhashblock=address
+    -zmqpubrawblock=address
+    -zmqpubrawtx=address
+
+The socket type is PUB and the address must be a valid ZeroMQ
+socket address. The same address can be used in more than one notification.
+
+For instance:
+
+    $ bitcoind -zmqpubhashtx=tcp://127.0.0.1:28332 -zmqpubrawtx=ipc:///tmp/bitcoind.tx.raw
+
+Each PUB notification has a topic and body, where the header
+corresponds to the notification type. For instance, for the notification
+`-zmqpubhashtx` the topic is `hashtx` (no null terminator) and the body is the
+hexadecimal transaction hash (32 bytes).
+
+These options can also be provided in bitcoin.conf.
+
+ZeroMQ endpoint specifiers for TCP (and others) are documented in the
+[ZeroMQ API](http://api.zeromq.org).
+
+Client side, then, the ZeroMQ subscriber socket must have the
+ZMQ_SUBSCRIBE option set to one or either of these prefixes (for instance, just `hash`); without
+doing so will result in no messages arriving. Please see `contrib/zmq/zmq_sub.py`
+for a working example.
+
+## Remarks
+
+From the perspective of bitcoind, the ZeroMQ socket is write-only; PUB
+sockets don't even have a read function. Thus, there is no state
+introduced into bitcoind directly. Furthermore, no information is
+broadcast that wasn't already received from the public P2P network.
+
+No authentication or authorization is done on connecting clients; it
+is assumed that the ZeroMQ port is exposed only to trusted entities,
+using other means such as firewalling.
+
+Note that when the block chain tip changes, a reorganisation may occur and just
+the tip will be notified. It is up to the subscriber to retrieve the chain
+from the last known block to the new tip.
diff --git a/src/Makefile.am b/src/Makefile.am
index 390e9f143..67e848be3 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -48,6 +48,9 @@ if ENABLE_WALLET
 BITCOIN_INCLUDES += $(BDB_CPPFLAGS)
 EXTRA_LIBRARIES += libbitcoin_wallet.a
 endif
+if ENABLE_ZMQ
+EXTRA_LIBRARIES += libbitcoin_zmq.a
+endif
 
 if BUILD_BITCOIN_LIBS
 lib_LTLIBRARIES = libbitcoinconsensus.la
@@ -157,7 +160,12 @@ BITCOIN_CORE_H = \
   wallet/db.h \
   wallet/wallet.h \
   wallet/wallet_ismine.h \
-  wallet/walletdb.h
+  wallet/walletdb.h \
+  zmq/zmqabstractnotifier.h \
+  zmq/zmqconfig.h\
+  zmq/zmqnotificationinterface.h \
+  zmq/zmqpublishnotifier.h
+
 
 obj/build.h: FORCE
 	@$(MKDIR_P) $(builddir)/obj
@@ -199,6 +207,17 @@ libbitcoin_server_a_SOURCES = \
   validationinterface.cpp \
   $(BITCOIN_CORE_H)
 
+if ENABLE_ZMQ
+LIBBITCOIN_ZMQ=libbitcoin_zmq.a
+
+libbitcoin_zmq_a_CPPFLAGS = $(BITCOIN_INCLUDES)
+libbitcoin_zmq_a_SOURCES = \
+  zmq/zmqabstractnotifier.cpp \
+  zmq/zmqnotificationinterface.cpp \
+  zmq/zmqpublishnotifier.cpp
+endif
+
+
 # wallet: shared between bitcoind and bitcoin-qt, but only linked
 # when wallet enabled
 libbitcoin_wallet_a_CPPFLAGS = $(BITCOIN_INCLUDES)
@@ -320,12 +339,15 @@ bitcoind_LDADD = \
   $(LIBMEMENV) \
   $(LIBSECP256K1)
 
+if ENABLE_ZMQ
+bitcoind_LDADD += $(LIBBITCOIN_ZMQ) $(ZMQ_LIBS)
+endif
+
 if ENABLE_WALLET
 bitcoind_LDADD += libbitcoin_wallet.a
 endif
 
 bitcoind_LDADD += $(BOOST_LIBS) $(BDB_LIBS) $(SSL_LIBS) $(CRYPTO_LIBS) $(MINIUPNPC_LIBS) $(EVENT_PTHREADS_LIBS) $(EVENT_LIBS)
-#
 
 # bitcoin-cli binary #
 bitcoin_cli_SOURCES = bitcoin-cli.cpp
diff --git a/src/Makefile.qt.include b/src/Makefile.qt.include
index 8d60aca25..3e8eda178 100644
--- a/src/Makefile.qt.include
+++ b/src/Makefile.qt.include
@@ -361,6 +361,9 @@ qt_bitcoin_qt_LDADD = qt/libbitcoinqt.a $(LIBBITCOIN_SERVER)
 if ENABLE_WALLET
 qt_bitcoin_qt_LDADD += $(LIBBITCOIN_WALLET)
 endif
+if ENABLE_ZMQ
+qt_bitcoin_qt_LDADD += $(LIBBITCOIN_ZMQ) $(ZMQ_LIBS)
+endif
 qt_bitcoin_qt_LDADD += $(LIBBITCOIN_CLI) $(LIBBITCOIN_COMMON) $(LIBBITCOIN_UTIL) $(LIBBITCOIN_CRYPTO) $(LIBBITCOIN_UNIVALUE) $(LIBLEVELDB) $(LIBMEMENV) \
   $(BOOST_LIBS) $(QT_LIBS) $(QT_DBUS_LIBS) $(QR_LIBS) $(PROTOBUF_LIBS) $(BDB_LIBS) $(SSL_LIBS) $(CRYPTO_LIBS) $(MINIUPNPC_LIBS) $(LIBSECP256K1) \
   $(EVENT_PTHREADS_LIBS) $(EVENT_LIBS)
diff --git a/src/Makefile.qttest.include b/src/Makefile.qttest.include
index 4250bb8f3..6554580be 100644
--- a/src/Makefile.qttest.include
+++ b/src/Makefile.qttest.include
@@ -30,6 +30,9 @@ qt_test_test_bitcoin_qt_LDADD = $(LIBBITCOINQT) $(LIBBITCOIN_SERVER)
 if ENABLE_WALLET
 qt_test_test_bitcoin_qt_LDADD += $(LIBBITCOIN_WALLET)
 endif
+if ENABLE_ZMQ
+qt_test_test_bitcoin_qt_LDADD += $(LIBBITCOIN_ZMQ) $(ZMQ_LIBS)
+endif
 qt_test_test_bitcoin_qt_LDADD += $(LIBBITCOIN_CLI) $(LIBBITCOIN_COMMON) $(LIBBITCOIN_UTIL) $(LIBBITCOIN_CRYPTO) $(LIBBITCOIN_UNIVALUE) $(LIBLEVELDB) \
   $(LIBMEMENV) $(BOOST_LIBS) $(QT_DBUS_LIBS) $(QT_TEST_LIBS) $(QT_LIBS) \
   $(QR_LIBS) $(PROTOBUF_LIBS) $(BDB_LIBS) $(SSL_LIBS) $(CRYPTO_LIBS) $(MINIUPNPC_LIBS) $(LIBSECP256K1) \
diff --git a/src/Makefile.test.include b/src/Makefile.test.include
index cc60cd92b..cee35926a 100644
--- a/src/Makefile.test.include
+++ b/src/Makefile.test.include
@@ -100,6 +100,10 @@ endif
 test_test_bitcoin_LDADD += $(LIBBITCOIN_CONSENSUS) $(BDB_LIBS) $(SSL_LIBS) $(CRYPTO_LIBS) $(MINIUPNPC_LIBS)
 test_test_bitcoin_LDFLAGS = $(RELDFLAGS) $(AM_LDFLAGS) $(LIBTOOL_APP_LDFLAGS) -static
 
+if ENABLE_ZMQ
+test_test_bitcoin_LDADD += $(ZMQ_LIBS)
+endif
+
 nodist_test_test_bitcoin_SOURCES = $(GENERATED_TEST_FILES)
 
 $(BITCOIN_TESTS): $(GENERATED_TEST_FILES)
diff --git a/src/init.cpp b/src/init.cpp
index a12e38ff5..f03388120 100644
--- a/src/init.cpp
+++ b/src/init.cpp
@@ -38,7 +38,6 @@
 #include "wallet/wallet.h"
 #include "wallet/walletdb.h"
 #endif
-
 #include <stdint.h>
 #include <stdio.h>
 
@@ -55,6 +54,10 @@
 #include <boost/thread.hpp>
 #include <openssl/crypto.h>
 
+#if ENABLE_ZMQ
+#include "zmq/zmqnotificationinterface.h"
+#endif
+
 using namespace std;
 
 #ifdef ENABLE_WALLET
@@ -62,6 +65,10 @@ CWallet* pwalletMain = NULL;
 #endif
 bool fFeeEstimatesInitialized = false;
 
+#if ENABLE_ZMQ
+static CZMQNotificationInterface* pzmqNotificationInterface = NULL;
+#endif
+
 #ifdef WIN32
 // Win32 LevelDB doesn't use filedescriptors, and the ones used for
 // accessing block files don't count towards the fd_set size limit
@@ -211,6 +218,16 @@ void Shutdown()
     if (pwalletMain)
         pwalletMain->Flush(true);
 #endif
+
+#if ENABLE_ZMQ
+    if (pzmqNotificationInterface) {
+        UnregisterValidationInterface(pzmqNotificationInterface);
+        pzmqNotificationInterface->Shutdown();
+        delete pzmqNotificationInterface;
+        pzmqNotificationInterface = NULL;
+    }
+#endif
+
 #ifndef WIN32
     try {
         boost::filesystem::remove(GetPidFile());
@@ -375,6 +392,14 @@ std::string HelpMessage(HelpMessageMode mode)
         " " + _("(1 = keep tx meta data e.g. account owner and payment request information, 2 = drop tx meta data)"));
 #endif
 
+#if ENABLE_ZMQ
+    strUsage += HelpMessageGroup(_("ZeroMQ notification options:"));
+    strUsage += HelpMessageOpt("-zmqpubhashblock=<address>", _("Enable publish hash block in <address>"));
+    strUsage += HelpMessageOpt("-zmqpubhashtransaction=<address>", _("Enable publish hash transaction in <address>"));
+    strUsage += HelpMessageOpt("-zmqpubrawblock=<address>", _("Enable publish raw block in <address>"));
+    strUsage += HelpMessageOpt("-zmqpubrawtransaction=<address>", _("Enable publish raw transaction in <address>"));
+#endif
+
     strUsage += HelpMessageGroup(_("Debugging/Testing options:"));
     if (showDebug)
     {
@@ -1125,6 +1150,15 @@ bool AppInit2(boost::thread_group& threadGroup, CScheduler& scheduler)
     BOOST_FOREACH(const std::string& strDest, mapMultiArgs["-seednode"])
         AddOneShot(strDest);
 
+#if ENABLE_ZMQ
+    pzmqNotificationInterface = CZMQNotificationInterface::CreateWithArguments(mapArgs);
+
+    if (pzmqNotificationInterface) {
+        pzmqNotificationInterface->Initialize();
+        RegisterValidationInterface(pzmqNotificationInterface);
+    }
+#endif
+
     // ********************************************************* Step 7: load block chain
 
     fReindex = GetBoolArg("-reindex", false);
diff --git a/src/validationinterface.cpp b/src/validationinterface.cpp
index bdb706907..81f3b775f 100644
--- a/src/validationinterface.cpp
+++ b/src/validationinterface.cpp
@@ -45,7 +45,6 @@ void UnregisterAllValidationInterfaces() {
     g_signals.SetBestChain.disconnect_all_slots();
     g_signals.UpdatedTransaction.disconnect_all_slots();
     g_signals.SyncTransaction.disconnect_all_slots();
-    g_signals.UpdatedTransaction.disconnect_all_slots();
     g_signals.UpdatedBlockTip.disconnect_all_slots();
 }
 
diff --git a/src/zmq/zmqabstractnotifier.cpp b/src/zmq/zmqabstractnotifier.cpp
new file mode 100644
index 000000000..744ec5923
--- /dev/null
+++ b/src/zmq/zmqabstractnotifier.cpp
@@ -0,0 +1,22 @@
+// Copyright (c) 2015 The Bitcoin Core developers
+// Distributed under the MIT software license, see the accompanying
+// file COPYING or http://www.opensource.org/licenses/mit-license.php.
+
+#include "zmqabstractnotifier.h"
+#include "util.h"
+
+
+CZMQAbstractNotifier::~CZMQAbstractNotifier()
+{
+    assert(!psocket);
+}
+
+bool CZMQAbstractNotifier::NotifyBlock(const uint256 &/*hash*/)
+{
+    return true;
+}
+
+bool CZMQAbstractNotifier::NotifyTransaction(const CTransaction &/*transaction*/)
+{
+    return true;
+}
diff --git a/src/zmq/zmqabstractnotifier.h b/src/zmq/zmqabstractnotifier.h
new file mode 100644
index 000000000..626d1ddf9
--- /dev/null
+++ b/src/zmq/zmqabstractnotifier.h
@@ -0,0 +1,42 @@
+// Copyright (c) 2015 The Bitcoin Core developers
+// Distributed under the MIT software license, see the accompanying
+// file COPYING or http://www.opensource.org/licenses/mit-license.php.
+
+#ifndef BITCOIN_ZMQ_ZMQABSTRACTNOTIFIER_H
+#define BITCOIN_ZMQ_ZMQABSTRACTNOTIFIER_H
+
+#include "zmqconfig.h"
+
+class CZMQAbstractNotifier;
+typedef CZMQAbstractNotifier* (*CZMQNotifierFactory)();
+
+class CZMQAbstractNotifier
+{
+public:
+    CZMQAbstractNotifier() : psocket(0) { }
+    virtual ~CZMQAbstractNotifier();
+
+    template <typename T>
+    static CZMQAbstractNotifier* Create()
+    {
+        return new T();
+    }
+
+    std::string GetType() const { return type; }
+    void SetType(const std::string &t) { type = t; }
+    std::string GetAddress() const { return address; }
+    void SetAddress(const std::string &a) { address = a; }
+
+    virtual bool Initialize(void *pcontext) = 0;
+    virtual void Shutdown() = 0;
+
+    virtual bool NotifyBlock(const uint256 &hash);
+    virtual bool NotifyTransaction(const CTransaction &transaction);
+
+protected:
+    void *psocket;
+    std::string type;
+    std::string address;
+};
+
+#endif // BITCOIN_ZMQ_ZMQABSTRACTNOTIFIER_H
diff --git a/src/zmq/zmqconfig.h b/src/zmq/zmqconfig.h
new file mode 100644
index 000000000..6057f5d1a
--- /dev/null
+++ b/src/zmq/zmqconfig.h
@@ -0,0 +1,24 @@
+// Copyright (c) 2015 The Bitcoin Core developers
+// Distributed under the MIT software license, see the accompanying
+// file COPYING or http://www.opensource.org/licenses/mit-license.php.
+
+#ifndef BITCOIN_ZMQ_ZMQCONFIG_H
+#define BITCOIN_ZMQ_ZMQCONFIG_H
+
+#if defined(HAVE_CONFIG_H)
+#include "config/bitcoin-config.h"
+#endif
+
+#include <stdarg.h>
+#include <string>
+
+#if ENABLE_ZMQ
+#include <zmq.h>
+#endif
+
+#include "primitives/block.h"
+#include "primitives/transaction.h"
+
+void zmqError(const char *str);
+
+#endif // BITCOIN_ZMQ_ZMQCONFIG_H
diff --git a/src/zmq/zmqnotificationinterface.cpp b/src/zmq/zmqnotificationinterface.cpp
new file mode 100644
index 000000000..71ccb59a4
--- /dev/null
+++ b/src/zmq/zmqnotificationinterface.cpp
@@ -0,0 +1,155 @@
+// Copyright (c) 2015 The Bitcoin Core developers
+// Distributed under the MIT software license, see the accompanying
+// file COPYING or http://www.opensource.org/licenses/mit-license.php.
+
+#include "zmqnotificationinterface.h"
+#include "zmqpublishnotifier.h"
+
+#include "version.h"
+#include "main.h"
+#include "streams.h"
+#include "util.h"
+
+void zmqError(const char *str)
+{
+    LogPrint("zmq", "Error: %s, errno=%s\n", str, zmq_strerror(errno));
+}
+
+CZMQNotificationInterface::CZMQNotificationInterface() : pcontext(NULL)
+{
+}
+
+CZMQNotificationInterface::~CZMQNotificationInterface()
+{
+    // ensure Shutdown if Initialize is called
+    assert(!pcontext);
+
+    for (std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin(); i!=notifiers.end(); ++i)
+    {
+        delete *i;
+    }
+}
+
+CZMQNotificationInterface* CZMQNotificationInterface::CreateWithArguments(const std::map<std::string, std::string> &args)
+{
+    CZMQNotificationInterface* notificationInterface = NULL;
+    std::map<std::string, CZMQNotifierFactory> factories;
+    std::list<CZMQAbstractNotifier*> notifiers;
+
+    factories["pubhashblock"] = CZMQAbstractNotifier::Create<CZMQPublishHashBlockNotifier>;
+    factories["pubhashtx"] = CZMQAbstractNotifier::Create<CZMQPublishHashTransactionNotifier>;
+    factories["pubrawblock"] = CZMQAbstractNotifier::Create<CZMQPublishRawBlockNotifier>;
+    factories["pubrawtx"] = CZMQAbstractNotifier::Create<CZMQPublishRawTransactionNotifier>;
+
+    for (std::map<std::string, CZMQNotifierFactory>::const_iterator i=factories.begin(); i!=factories.end(); ++i)
+    {
+        std::map<std::string, std::string>::const_iterator j = args.find("-zmq" + i->first);
+        if (j!=args.end())
+        {
+            CZMQNotifierFactory factory = i->second;
+            std::string address = j->second;
+            CZMQAbstractNotifier *notifier = factory();
+            notifier->SetType(i->first);
+            notifier->SetAddress(address);
+            notifiers.push_back(notifier);
+        }
+    }
+
+    if (!notifiers.empty())
+    {
+        notificationInterface = new CZMQNotificationInterface();
+        notificationInterface->notifiers = notifiers;
+    }
+
+    return notificationInterface;
+}
+
+// Called at startup to conditionally set up ZMQ socket(s)
+bool CZMQNotificationInterface::Initialize()
+{
+    LogPrint("zmq", "Initialize notification interface\n");
+    assert(!pcontext);
+
+    pcontext = zmq_init(1);
+
+    if (!pcontext)
+    {
+        zmqError("Unable to initialize context");
+        return false;
+    }
+
+    std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin();
+    for (; i!=notifiers.end(); ++i)
+    {
+        CZMQAbstractNotifier *notifier = *i;
+        if (notifier->Initialize(pcontext))
+        {
+            LogPrint("zmq", "  Notifier %s ready (address = %s)\n", notifier->GetType(), notifier->GetAddress());
+        }
+        else
+        {
+            LogPrint("zmq", "  Notifier %s failed (address = %s)\n", notifier->GetType(), notifier->GetAddress());
+            break;
+        }
+    }
+
+    if (i!=notifiers.end())
+    {
+        Shutdown();
+        return false;
+    }
+
+    return false;
+}
+
+// Called during shutdown sequence
+void CZMQNotificationInterface::Shutdown()
+{
+    LogPrint("zmq", "Shutdown notification interface\n");
+    if (pcontext)
+    {
+        for (std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin(); i!=notifiers.end(); ++i)
+        {
+            CZMQAbstractNotifier *notifier = *i;
+            LogPrint("zmq", "   Shutdown notifier %s at %s\n", notifier->GetType(), notifier->GetAddress());
+            notifier->Shutdown();
+        }
+        zmq_ctx_destroy(pcontext);
+
+        pcontext = 0;
+    }
+}
+
+void CZMQNotificationInterface::UpdatedBlockTip(const uint256 &hash)
+{
+    for (std::list<CZMQAbstractNotifier*>::iterator i = notifiers.begin(); i!=notifiers.end(); )
+    {
+        CZMQAbstractNotifier *notifier = *i;
+        if (notifier->NotifyBlock(hash))
+        {
+            i++;
+        }
+        else
+        {
+            notifier->Shutdown();
+            i = notifiers.erase(i);
+        }
+    }
+}
+
+void CZMQNotificationInterface::SyncTransaction(const CTransaction &tx, const CBlock *pblock)
+{
+    for (std::list<CZMQAbstractNotifier*>::iterator i = notifiers.begin(); i!=notifiers.end(); )
+    {
+        CZMQAbstractNotifier *notifier = *i;
+        if (notifier->NotifyTransaction(tx))
+        {
+            i++;
+        }
+        else
+        {
+            notifier->Shutdown();
+            i = notifiers.erase(i);
+        }
+    }
+}
diff --git a/src/zmq/zmqnotificationinterface.h b/src/zmq/zmqnotificationinterface.h
new file mode 100644
index 000000000..afc0b8d24
--- /dev/null
+++ b/src/zmq/zmqnotificationinterface.h
@@ -0,0 +1,35 @@
+// Copyright (c) 2015 The Bitcoin Core developers
+// Distributed under the MIT software license, see the accompanying
+// file COPYING or http://www.opensource.org/licenses/mit-license.php.
+
+#ifndef BITCOIN_ZMQ_ZMQNOTIFICATIONINTERFACE_H
+#define BITCOIN_ZMQ_ZMQNOTIFICATIONINTERFACE_H
+
+#include "validationinterface.h"
+#include <string>
+#include <map>
+
+class CZMQAbstractNotifier;
+
+class CZMQNotificationInterface : public CValidationInterface
+{
+public:
+    virtual ~CZMQNotificationInterface();
+
+    static CZMQNotificationInterface* CreateWithArguments(const std::map<std::string, std::string> &args);
+
+    bool Initialize();
+    void Shutdown();
+
+protected: // CValidationInterface
+    void SyncTransaction(const CTransaction &tx, const CBlock *pblock);
+    void UpdatedBlockTip(const uint256 &newHashTip);
+
+private:
+    CZMQNotificationInterface();
+
+    void *pcontext;
+    std::list<CZMQAbstractNotifier*> notifiers;
+};
+
+#endif // BITCOIN_ZMQ_ZMQNOTIFICATIONINTERFACE_H
diff --git a/src/zmq/zmqpublishnotifier.cpp b/src/zmq/zmqpublishnotifier.cpp
new file mode 100644
index 000000000..0a6d7d0db
--- /dev/null
+++ b/src/zmq/zmqpublishnotifier.cpp
@@ -0,0 +1,172 @@
+// Copyright (c) 2015 The Bitcoin Core developers
+// Distributed under the MIT software license, see the accompanying
+// file COPYING or http://www.opensource.org/licenses/mit-license.php.
+
+#include "zmqpublishnotifier.h"
+#include "main.h"
+#include "util.h"
+
+static std::multimap<std::string, CZMQAbstractPublishNotifier*> mapPublishNotifiers;
+
+// Internal function to send multipart message
+static int zmq_send_multipart(void *sock, const void* data, size_t size, ...)
+{
+    va_list args;
+    va_start(args, size);
+
+    while (1)
+    {
+        zmq_msg_t msg;
+
+        int rc = zmq_msg_init_size(&msg, size);
+        if (rc != 0)
+        {
+            zmqError("Unable to initialize ZMQ msg");
+            return -1;
+        }
+
+        void *buf = zmq_msg_data(&msg);
+        memcpy(buf, data, size);
+
+        data = va_arg(args, const void*);
+
+        rc = zmq_msg_send(&msg, sock, data ? ZMQ_SNDMORE : 0);
+        if (rc == -1)
+        {
+            zmqError("Unable to send ZMQ msg");
+            zmq_msg_close(&msg);
+            return -1;
+        }
+
+        zmq_msg_close(&msg);
+
+        if (!data)
+            break;
+
+        size = va_arg(args, size_t);
+    }
+    return 0;
+}
+
+bool CZMQAbstractPublishNotifier::Initialize(void *pcontext)
+{
+    assert(!psocket);
+
+    // check if address is being used by other publish notifier
+    std::multimap<std::string, CZMQAbstractPublishNotifier*>::iterator i = mapPublishNotifiers.find(address);
+
+    if (i==mapPublishNotifiers.end())
+    {
+        psocket = zmq_socket(pcontext, ZMQ_PUB);
+        if (!psocket)
+        {
+            zmqError("Failed to create socket");
+            return false;
+        }
+
+        int rc = zmq_bind(psocket, address.c_str());
+        if (rc!=0)
+        {
+            zmqError("Failed to bind address");
+            return false;
+        }
+
+        // register this notifier for the address, so it can be reused for other publish notifier
+        mapPublishNotifiers.insert(std::make_pair(address, this));
+        return true;
+    }
+    else
+    {
+        LogPrint("zmq", "  Reuse socket for address %s\n", address);
+
+        psocket = i->second->psocket;
+        mapPublishNotifiers.insert(std::make_pair(address, this));
+
+        return true;
+    }
+}
+
+void CZMQAbstractPublishNotifier::Shutdown()
+{
+    assert(psocket);
+
+    int count = mapPublishNotifiers.count(address);
+
+    // remove this notifier from the list of publishers using this address
+    typedef std::multimap<std::string, CZMQAbstractPublishNotifier*>::iterator iterator;
+    std::pair<iterator, iterator> iterpair = mapPublishNotifiers.equal_range(address);
+
+    for (iterator it = iterpair.first; it != iterpair.second; ++it)
+    {
+        if (it->second==this)
+        {
+            mapPublishNotifiers.erase(it);
+            break;
+        }
+    }
+
+    if (count == 1)
+    {
+        LogPrint("zmq", "Close socket at address %s\n", address);
+        int linger = 0;
+        zmq_setsockopt(psocket, ZMQ_LINGER, &linger, sizeof(linger));
+        zmq_close(psocket);
+    }
+
+    psocket = 0;
+}
+
+bool CZMQPublishHashBlockNotifier::NotifyBlock(const uint256 &hash)
+{
+    LogPrint("zmq", "Publish hash block %s\n", hash.GetHex());
+    char data[32];
+    for (unsigned int i = 0; i < 32; i++)
+        data[31 - i] = hash.begin()[i];
+    int rc = zmq_send_multipart(psocket, "hashblock", 9, data, 32, 0);
+    return rc == 0;
+}
+
+bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &transaction)
+{
+    uint256 hash = transaction.GetHash();
+    LogPrint("zmq", "Publish hash transaction %s\n", hash.GetHex());
+    char data[32];
+    for (unsigned int i = 0; i < 32; i++)
+        data[31 - i] = hash.begin()[i];
+    int rc = zmq_send_multipart(psocket, "hashtx", 6, data, 32, 0);
+    return rc == 0;
+}
+
+bool CZMQPublishRawBlockNotifier::NotifyBlock(const uint256 &hash)
+{
+    LogPrint("zmq", "Publish raw block %s\n", hash.GetHex());
+
+    CDataStream ss(SER_NETWORK, PROTOCOL_VERSION);
+    {
+        LOCK(cs_main);
+
+        CBlock block;
+        CBlockIndex* pblockindex = mapBlockIndex[hash];
+
+        if(!ReadBlockFromDisk(block, pblockindex))
+        {
+            zmqError("Can't read block from disk");
+            return false;
+        }
+
+        ss << block;
+    }
+
+    int rc = zmq_send_multipart(psocket, "rawblock", 8, &(*ss.begin()), ss.size(), 0);
+    return rc == 0;
+}
+
+bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &transaction)
+{
+    uint256 hash = transaction.GetHash();
+    LogPrint("zmq", "Publish raw transaction %s\n", hash.GetHex());
+    CDataStream ss(SER_NETWORK, PROTOCOL_VERSION);
+    ss << transaction;
+    int rc = zmq_send_multipart(psocket, "rawtx", 5, &(*ss.begin()), ss.size(), 0);
+    return rc == 0;
+}
diff --git a/src/zmq/zmqpublishnotifier.h b/src/zmq/zmqpublishnotifier.h
new file mode 100644
index 000000000..a0eb26f5e
--- /dev/null
+++ b/src/zmq/zmqpublishnotifier.h
@@ -0,0 +1,41 @@
+// Copyright (c) 2015 The Bitcoin Core developers
+// Distributed under the MIT software license, see the accompanying
+// file COPYING or http://www.opensource.org/licenses/mit-license.php.
+
+#ifndef BITCOIN_ZMQ_ZMQPUBLISHNOTIFIER_H
+#define BITCOIN_ZMQ_ZMQPUBLISHNOTIFIER_H
+
+#include "zmqabstractnotifier.h"
+
+class CZMQAbstractPublishNotifier : public CZMQAbstractNotifier
+{
+public:
+    bool Initialize(void *pcontext);
+    void Shutdown();
+};
+
+class CZMQPublishHashBlockNotifier : public CZMQAbstractPublishNotifier
+{
+public:
+    bool NotifyBlock(const uint256 &hash);
+};
+
+class CZMQPublishHashTransactionNotifier : public CZMQAbstractPublishNotifier
+{
+public:
+    bool NotifyTransaction(const CTransaction &transaction);
+};
+
+class CZMQPublishRawBlockNotifier : public CZMQAbstractPublishNotifier
+{
+public:
+    bool NotifyBlock(const uint256 &hash);
+};
+
+class CZMQPublishRawTransactionNotifier : public CZMQAbstractPublishNotifier
+{
+public:
+    bool NotifyTransaction(const CTransaction &transaction);
+};
+
+#endif // BITCOIN_ZMQ_ZMQPUBLISHNOTIFIER_H

From 029e278286cb861901c9cb8e1b84855ec1640aac Mon Sep 17 00:00:00 2001
From: Jonas Schnelli <jonas.schnelli@include7.ch>
Date: Tue, 5 May 2015 13:19:19 +0200
Subject: [PATCH 4/4] QA: Add ZeroMQ RPC test

---
 qa/rpc-tests/zmq_test.py | 93 ++++++++++++++++++++++++++++++++++++++++
 1 file changed, 93 insertions(+)
 create mode 100755 qa/rpc-tests/zmq_test.py

diff --git a/qa/rpc-tests/zmq_test.py b/qa/rpc-tests/zmq_test.py
new file mode 100755
index 000000000..fffaf677d
--- /dev/null
+++ b/qa/rpc-tests/zmq_test.py
@@ -0,0 +1,93 @@
+#!/usr/bin/env python2
+# Copyright (c) 2015 The Bitcoin Core developers
+# Distributed under the MIT software license, see the accompanying
+# file COPYING or http://www.opensource.org/licenses/mit-license.php.
+
+#
+# Test ZMQ interface
+#
+
+from test_framework.test_framework import BitcoinTestFramework
+from test_framework.util import *
+import zmq
+import binascii
+from test_framework.mininode import hash256
+
+try:
+    import http.client as httplib
+except ImportError:
+    import httplib
+try:
+    import urllib.parse as urlparse
+except ImportError:
+    import urlparse
+
+class ZMQTest (BitcoinTestFramework):
+
+    port = 28332
+
+    def setup_nodes(self):
+        self.zmqContext = zmq.Context()
+        self.zmqSubSocket = self.zmqContext.socket(zmq.SUB)
+        self.zmqSubSocket.setsockopt(zmq.SUBSCRIBE, "hashblock")
+        self.zmqSubSocket.setsockopt(zmq.SUBSCRIBE, "hashtx")
+        self.zmqSubSocket.connect("tcp://127.0.0.1:%i" % self.port)
+        # Note: proxies are not used to connect to local nodes
+        # this is because the proxy to use is based on CService.GetNetwork(), which return NET_UNROUTABLE for localhost
+        return start_nodes(4, self.options.tmpdir, extra_args=[
+            ['-zmqpubhashtx=tcp://127.0.0.1:'+str(self.port), '-zmqpubhashblock=tcp://127.0.0.1:'+str(self.port)],
+            [],
+            [],
+            []
+            ])
+
+    def run_test(self):
+        self.sync_all()
+
+        genhashes = self.nodes[0].generate(1);
+        self.sync_all()
+
+        print "listen..."
+        msg = self.zmqSubSocket.recv_multipart()
+        topic = str(msg[0])
+        body = msg[1]
+
+        msg = self.zmqSubSocket.recv_multipart()
+        topic = str(msg[0])
+        body = msg[1]
+        blkhash = binascii.hexlify(body)
+
+        assert_equal(genhashes[0], blkhash) #blockhash from generate must be equal to the hash received over zmq
+
+        n = 10
+        genhashes = self.nodes[1].generate(n);
+        self.sync_all()
+
+        zmqHashes = []
+        for x in range(0,n*2):
+            msg = self.zmqSubSocket.recv_multipart()
+            topic = str(msg[0])
+            body = msg[1]
+            if topic == "hashblock":
+                zmqHashes.append(binascii.hexlify(body))
+
+        for x in range(0,n):
+            assert_equal(genhashes[x], zmqHashes[x]) #blockhash from generate must be equal to the hash received over zmq
+
+        #test tx from a second node
+        hashRPC = self.nodes[1].sendtoaddress(self.nodes[0].getnewaddress(), 1.0)
+        self.sync_all()
+
+        #now we should receive a zmq msg because the tx was broadcastet
+        msg = self.zmqSubSocket.recv_multipart()
+        topic = str(msg[0])
+        body = msg[1]
+        hashZMQ = ""
+        if topic == "hashtx":
+            hashZMQ = binascii.hexlify(body)
+
+        assert_equal(hashRPC, hashZMQ) #blockhash from generate must be equal to the hash received over zmq
+
+
+if __name__ == '__main__':
+    ZMQTest ().main ()