Add ZeroMQ support. Notify blocks and transactions via ZeroMQ

Continues Johnathan Corgan's work.
Publishing multipart messages

Bugfix: Add missing zmq header includes

Bugfix: Adjust build system to link ZeroMQ code for Qt binaries
This commit is contained in:
Jeff Garzik
2014-11-18 12:06:32 -05:00
committed by João Barbosa
parent 1136879df8
commit e6a14b64d6
16 changed files with 717 additions and 4 deletions

View File

@@ -48,6 +48,9 @@ if ENABLE_WALLET
BITCOIN_INCLUDES += $(BDB_CPPFLAGS)
EXTRA_LIBRARIES += libbitcoin_wallet.a
endif
if ENABLE_ZMQ
EXTRA_LIBRARIES += libbitcoin_zmq.a
endif
if BUILD_BITCOIN_LIBS
lib_LTLIBRARIES = libbitcoinconsensus.la
@@ -157,7 +160,12 @@ BITCOIN_CORE_H = \
wallet/db.h \
wallet/wallet.h \
wallet/wallet_ismine.h \
wallet/walletdb.h
wallet/walletdb.h \
zmq/zmqabstractnotifier.h \
zmq/zmqconfig.h\
zmq/zmqnotificationinterface.h \
zmq/zmqpublishnotifier.h
obj/build.h: FORCE
@$(MKDIR_P) $(builddir)/obj
@@ -199,6 +207,17 @@ libbitcoin_server_a_SOURCES = \
validationinterface.cpp \
$(BITCOIN_CORE_H)
if ENABLE_ZMQ
LIBBITCOIN_ZMQ=libbitcoin_zmq.a
libbitcoin_zmq_a_CPPFLAGS = $(BITCOIN_INCLUDES)
libbitcoin_zmq_a_SOURCES = \
zmq/zmqabstractnotifier.cpp \
zmq/zmqnotificationinterface.cpp \
zmq/zmqpublishnotifier.cpp
endif
# wallet: shared between bitcoind and bitcoin-qt, but only linked
# when wallet enabled
libbitcoin_wallet_a_CPPFLAGS = $(BITCOIN_INCLUDES)
@@ -320,12 +339,15 @@ bitcoind_LDADD = \
$(LIBMEMENV) \
$(LIBSECP256K1)
if ENABLE_ZMQ
bitcoind_LDADD += $(LIBBITCOIN_ZMQ) $(ZMQ_LIBS)
endif
if ENABLE_WALLET
bitcoind_LDADD += libbitcoin_wallet.a
endif
bitcoind_LDADD += $(BOOST_LIBS) $(BDB_LIBS) $(SSL_LIBS) $(CRYPTO_LIBS) $(MINIUPNPC_LIBS) $(EVENT_PTHREADS_LIBS) $(EVENT_LIBS)
#
# bitcoin-cli binary #
bitcoin_cli_SOURCES = bitcoin-cli.cpp

View File

@@ -361,6 +361,9 @@ qt_bitcoin_qt_LDADD = qt/libbitcoinqt.a $(LIBBITCOIN_SERVER)
if ENABLE_WALLET
qt_bitcoin_qt_LDADD += $(LIBBITCOIN_WALLET)
endif
if ENABLE_ZMQ
qt_bitcoin_qt_LDADD += $(LIBBITCOIN_ZMQ) $(ZMQ_LIBS)
endif
qt_bitcoin_qt_LDADD += $(LIBBITCOIN_CLI) $(LIBBITCOIN_COMMON) $(LIBBITCOIN_UTIL) $(LIBBITCOIN_CRYPTO) $(LIBBITCOIN_UNIVALUE) $(LIBLEVELDB) $(LIBMEMENV) \
$(BOOST_LIBS) $(QT_LIBS) $(QT_DBUS_LIBS) $(QR_LIBS) $(PROTOBUF_LIBS) $(BDB_LIBS) $(SSL_LIBS) $(CRYPTO_LIBS) $(MINIUPNPC_LIBS) $(LIBSECP256K1) \
$(EVENT_PTHREADS_LIBS) $(EVENT_LIBS)

View File

@@ -30,6 +30,9 @@ qt_test_test_bitcoin_qt_LDADD = $(LIBBITCOINQT) $(LIBBITCOIN_SERVER)
if ENABLE_WALLET
qt_test_test_bitcoin_qt_LDADD += $(LIBBITCOIN_WALLET)
endif
if ENABLE_ZMQ
qt_test_test_bitcoin_qt_LDADD += $(LIBBITCOIN_ZMQ) $(ZMQ_LIBS)
endif
qt_test_test_bitcoin_qt_LDADD += $(LIBBITCOIN_CLI) $(LIBBITCOIN_COMMON) $(LIBBITCOIN_UTIL) $(LIBBITCOIN_CRYPTO) $(LIBBITCOIN_UNIVALUE) $(LIBLEVELDB) \
$(LIBMEMENV) $(BOOST_LIBS) $(QT_DBUS_LIBS) $(QT_TEST_LIBS) $(QT_LIBS) \
$(QR_LIBS) $(PROTOBUF_LIBS) $(BDB_LIBS) $(SSL_LIBS) $(CRYPTO_LIBS) $(MINIUPNPC_LIBS) $(LIBSECP256K1) \

View File

@@ -100,6 +100,10 @@ endif
test_test_bitcoin_LDADD += $(LIBBITCOIN_CONSENSUS) $(BDB_LIBS) $(SSL_LIBS) $(CRYPTO_LIBS) $(MINIUPNPC_LIBS)
test_test_bitcoin_LDFLAGS = $(RELDFLAGS) $(AM_LDFLAGS) $(LIBTOOL_APP_LDFLAGS) -static
if ENABLE_ZMQ
test_test_bitcoin_LDADD += $(ZMQ_LIBS)
endif
nodist_test_test_bitcoin_SOURCES = $(GENERATED_TEST_FILES)
$(BITCOIN_TESTS): $(GENERATED_TEST_FILES)

View File

@@ -38,7 +38,6 @@
#include "wallet/wallet.h"
#include "wallet/walletdb.h"
#endif
#include <stdint.h>
#include <stdio.h>
@@ -55,6 +54,10 @@
#include <boost/thread.hpp>
#include <openssl/crypto.h>
#if ENABLE_ZMQ
#include "zmq/zmqnotificationinterface.h"
#endif
using namespace std;
#ifdef ENABLE_WALLET
@@ -62,6 +65,10 @@ CWallet* pwalletMain = NULL;
#endif
bool fFeeEstimatesInitialized = false;
#if ENABLE_ZMQ
static CZMQNotificationInterface* pzmqNotificationInterface = NULL;
#endif
#ifdef WIN32
// Win32 LevelDB doesn't use filedescriptors, and the ones used for
// accessing block files don't count towards the fd_set size limit
@@ -211,6 +218,16 @@ void Shutdown()
if (pwalletMain)
pwalletMain->Flush(true);
#endif
#if ENABLE_ZMQ
if (pzmqNotificationInterface) {
UnregisterValidationInterface(pzmqNotificationInterface);
pzmqNotificationInterface->Shutdown();
delete pzmqNotificationInterface;
pzmqNotificationInterface = NULL;
}
#endif
#ifndef WIN32
try {
boost::filesystem::remove(GetPidFile());
@@ -375,6 +392,14 @@ std::string HelpMessage(HelpMessageMode mode)
" " + _("(1 = keep tx meta data e.g. account owner and payment request information, 2 = drop tx meta data)"));
#endif
#if ENABLE_ZMQ
strUsage += HelpMessageGroup(_("ZeroMQ notification options:"));
strUsage += HelpMessageOpt("-zmqpubhashblock=<address>", _("Enable publish hash block in <address>"));
strUsage += HelpMessageOpt("-zmqpubhashtransaction=<address>", _("Enable publish hash transaction in <address>"));
strUsage += HelpMessageOpt("-zmqpubrawblock=<address>", _("Enable publish raw block in <address>"));
strUsage += HelpMessageOpt("-zmqpubrawtransaction=<address>", _("Enable publish raw transaction in <address>"));
#endif
strUsage += HelpMessageGroup(_("Debugging/Testing options:"));
if (showDebug)
{
@@ -1125,6 +1150,15 @@ bool AppInit2(boost::thread_group& threadGroup, CScheduler& scheduler)
BOOST_FOREACH(const std::string& strDest, mapMultiArgs["-seednode"])
AddOneShot(strDest);
#if ENABLE_ZMQ
pzmqNotificationInterface = CZMQNotificationInterface::CreateWithArguments(mapArgs);
if (pzmqNotificationInterface) {
pzmqNotificationInterface->Initialize();
RegisterValidationInterface(pzmqNotificationInterface);
}
#endif
// ********************************************************* Step 7: load block chain
fReindex = GetBoolArg("-reindex", false);

View File

@@ -45,7 +45,6 @@ void UnregisterAllValidationInterfaces() {
g_signals.SetBestChain.disconnect_all_slots();
g_signals.UpdatedTransaction.disconnect_all_slots();
g_signals.SyncTransaction.disconnect_all_slots();
g_signals.UpdatedTransaction.disconnect_all_slots();
g_signals.UpdatedBlockTip.disconnect_all_slots();
}

View File

@@ -0,0 +1,22 @@
// Copyright (c) 2015 The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#include "zmqabstractnotifier.h"
#include "util.h"
CZMQAbstractNotifier::~CZMQAbstractNotifier()
{
assert(!psocket);
}
bool CZMQAbstractNotifier::NotifyBlock(const uint256 &/*hash*/)
{
return true;
}
bool CZMQAbstractNotifier::NotifyTransaction(const CTransaction &/*transaction*/)
{
return true;
}

View File

@@ -0,0 +1,42 @@
// Copyright (c) 2015 The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#ifndef BITCOIN_ZMQ_ZMQABSTRACTNOTIFIER_H
#define BITCOIN_ZMQ_ZMQABSTRACTNOTIFIER_H
#include "zmqconfig.h"
class CZMQAbstractNotifier;
typedef CZMQAbstractNotifier* (*CZMQNotifierFactory)();
class CZMQAbstractNotifier
{
public:
CZMQAbstractNotifier() : psocket(0) { }
virtual ~CZMQAbstractNotifier();
template <typename T>
static CZMQAbstractNotifier* Create()
{
return new T();
}
std::string GetType() const { return type; }
void SetType(const std::string &t) { type = t; }
std::string GetAddress() const { return address; }
void SetAddress(const std::string &a) { address = a; }
virtual bool Initialize(void *pcontext) = 0;
virtual void Shutdown() = 0;
virtual bool NotifyBlock(const uint256 &hash);
virtual bool NotifyTransaction(const CTransaction &transaction);
protected:
void *psocket;
std::string type;
std::string address;
};
#endif // BITCOIN_ZMQ_ZMQABSTRACTNOTIFIER_H

24
src/zmq/zmqconfig.h Normal file
View File

@@ -0,0 +1,24 @@
// Copyright (c) 2015 The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#ifndef BITCOIN_ZMQ_ZMQCONFIG_H
#define BITCOIN_ZMQ_ZMQCONFIG_H
#if defined(HAVE_CONFIG_H)
#include "config/bitcoin-config.h"
#endif
#include <stdarg.h>
#include <string>
#if ENABLE_ZMQ
#include <zmq.h>
#endif
#include "primitives/block.h"
#include "primitives/transaction.h"
void zmqError(const char *str);
#endif // BITCOIN_ZMQ_ZMQCONFIG_H

View File

@@ -0,0 +1,155 @@
// Copyright (c) 2015 The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#include "zmqnotificationinterface.h"
#include "zmqpublishnotifier.h"
#include "version.h"
#include "main.h"
#include "streams.h"
#include "util.h"
void zmqError(const char *str)
{
LogPrint("zmq", "Error: %s, errno=%s\n", str, zmq_strerror(errno));
}
CZMQNotificationInterface::CZMQNotificationInterface() : pcontext(NULL)
{
}
CZMQNotificationInterface::~CZMQNotificationInterface()
{
// ensure Shutdown if Initialize is called
assert(!pcontext);
for (std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin(); i!=notifiers.end(); ++i)
{
delete *i;
}
}
CZMQNotificationInterface* CZMQNotificationInterface::CreateWithArguments(const std::map<std::string, std::string> &args)
{
CZMQNotificationInterface* notificationInterface = NULL;
std::map<std::string, CZMQNotifierFactory> factories;
std::list<CZMQAbstractNotifier*> notifiers;
factories["pubhashblock"] = CZMQAbstractNotifier::Create<CZMQPublishHashBlockNotifier>;
factories["pubhashtx"] = CZMQAbstractNotifier::Create<CZMQPublishHashTransactionNotifier>;
factories["pubrawblock"] = CZMQAbstractNotifier::Create<CZMQPublishRawBlockNotifier>;
factories["pubrawtx"] = CZMQAbstractNotifier::Create<CZMQPublishRawTransactionNotifier>;
for (std::map<std::string, CZMQNotifierFactory>::const_iterator i=factories.begin(); i!=factories.end(); ++i)
{
std::map<std::string, std::string>::const_iterator j = args.find("-zmq" + i->first);
if (j!=args.end())
{
CZMQNotifierFactory factory = i->second;
std::string address = j->second;
CZMQAbstractNotifier *notifier = factory();
notifier->SetType(i->first);
notifier->SetAddress(address);
notifiers.push_back(notifier);
}
}
if (!notifiers.empty())
{
notificationInterface = new CZMQNotificationInterface();
notificationInterface->notifiers = notifiers;
}
return notificationInterface;
}
// Called at startup to conditionally set up ZMQ socket(s)
bool CZMQNotificationInterface::Initialize()
{
LogPrint("zmq", "Initialize notification interface\n");
assert(!pcontext);
pcontext = zmq_init(1);
if (!pcontext)
{
zmqError("Unable to initialize context");
return false;
}
std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin();
for (; i!=notifiers.end(); ++i)
{
CZMQAbstractNotifier *notifier = *i;
if (notifier->Initialize(pcontext))
{
LogPrint("zmq", " Notifier %s ready (address = %s)\n", notifier->GetType(), notifier->GetAddress());
}
else
{
LogPrint("zmq", " Notifier %s failed (address = %s)\n", notifier->GetType(), notifier->GetAddress());
break;
}
}
if (i!=notifiers.end())
{
Shutdown();
return false;
}
return false;
}
// Called during shutdown sequence
void CZMQNotificationInterface::Shutdown()
{
LogPrint("zmq", "Shutdown notification interface\n");
if (pcontext)
{
for (std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin(); i!=notifiers.end(); ++i)
{
CZMQAbstractNotifier *notifier = *i;
LogPrint("zmq", " Shutdown notifier %s at %s\n", notifier->GetType(), notifier->GetAddress());
notifier->Shutdown();
}
zmq_ctx_destroy(pcontext);
pcontext = 0;
}
}
void CZMQNotificationInterface::UpdatedBlockTip(const uint256 &hash)
{
for (std::list<CZMQAbstractNotifier*>::iterator i = notifiers.begin(); i!=notifiers.end(); )
{
CZMQAbstractNotifier *notifier = *i;
if (notifier->NotifyBlock(hash))
{
i++;
}
else
{
notifier->Shutdown();
i = notifiers.erase(i);
}
}
}
void CZMQNotificationInterface::SyncTransaction(const CTransaction &tx, const CBlock *pblock)
{
for (std::list<CZMQAbstractNotifier*>::iterator i = notifiers.begin(); i!=notifiers.end(); )
{
CZMQAbstractNotifier *notifier = *i;
if (notifier->NotifyTransaction(tx))
{
i++;
}
else
{
notifier->Shutdown();
i = notifiers.erase(i);
}
}
}

View File

@@ -0,0 +1,35 @@
// Copyright (c) 2015 The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#ifndef BITCOIN_ZMQ_ZMQNOTIFICATIONINTERFACE_H
#define BITCOIN_ZMQ_ZMQNOTIFICATIONINTERFACE_H
#include "validationinterface.h"
#include <string>
#include <map>
class CZMQAbstractNotifier;
class CZMQNotificationInterface : public CValidationInterface
{
public:
virtual ~CZMQNotificationInterface();
static CZMQNotificationInterface* CreateWithArguments(const std::map<std::string, std::string> &args);
bool Initialize();
void Shutdown();
protected: // CValidationInterface
void SyncTransaction(const CTransaction &tx, const CBlock *pblock);
void UpdatedBlockTip(const uint256 &newHashTip);
private:
CZMQNotificationInterface();
void *pcontext;
std::list<CZMQAbstractNotifier*> notifiers;
};
#endif // BITCOIN_ZMQ_ZMQNOTIFICATIONINTERFACE_H

View File

@@ -0,0 +1,172 @@
// Copyright (c) 2015 The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#include "zmqpublishnotifier.h"
#include "main.h"
#include "util.h"
static std::multimap<std::string, CZMQAbstractPublishNotifier*> mapPublishNotifiers;
// Internal function to send multipart message
static int zmq_send_multipart(void *sock, const void* data, size_t size, ...)
{
va_list args;
va_start(args, size);
while (1)
{
zmq_msg_t msg;
int rc = zmq_msg_init_size(&msg, size);
if (rc != 0)
{
zmqError("Unable to initialize ZMQ msg");
return -1;
}
void *buf = zmq_msg_data(&msg);
memcpy(buf, data, size);
data = va_arg(args, const void*);
rc = zmq_msg_send(&msg, sock, data ? ZMQ_SNDMORE : 0);
if (rc == -1)
{
zmqError("Unable to send ZMQ msg");
zmq_msg_close(&msg);
return -1;
}
zmq_msg_close(&msg);
if (!data)
break;
size = va_arg(args, size_t);
}
return 0;
}
bool CZMQAbstractPublishNotifier::Initialize(void *pcontext)
{
assert(!psocket);
// check if address is being used by other publish notifier
std::multimap<std::string, CZMQAbstractPublishNotifier*>::iterator i = mapPublishNotifiers.find(address);
if (i==mapPublishNotifiers.end())
{
psocket = zmq_socket(pcontext, ZMQ_PUB);
if (!psocket)
{
zmqError("Failed to create socket");
return false;
}
int rc = zmq_bind(psocket, address.c_str());
if (rc!=0)
{
zmqError("Failed to bind address");
return false;
}
// register this notifier for the address, so it can be reused for other publish notifier
mapPublishNotifiers.insert(std::make_pair(address, this));
return true;
}
else
{
LogPrint("zmq", " Reuse socket for address %s\n", address);
psocket = i->second->psocket;
mapPublishNotifiers.insert(std::make_pair(address, this));
return true;
}
}
void CZMQAbstractPublishNotifier::Shutdown()
{
assert(psocket);
int count = mapPublishNotifiers.count(address);
// remove this notifier from the list of publishers using this address
typedef std::multimap<std::string, CZMQAbstractPublishNotifier*>::iterator iterator;
std::pair<iterator, iterator> iterpair = mapPublishNotifiers.equal_range(address);
for (iterator it = iterpair.first; it != iterpair.second; ++it)
{
if (it->second==this)
{
mapPublishNotifiers.erase(it);
break;
}
}
if (count == 1)
{
LogPrint("zmq", "Close socket at address %s\n", address);
int linger = 0;
zmq_setsockopt(psocket, ZMQ_LINGER, &linger, sizeof(linger));
zmq_close(psocket);
}
psocket = 0;
}
bool CZMQPublishHashBlockNotifier::NotifyBlock(const uint256 &hash)
{
LogPrint("zmq", "Publish hash block %s\n", hash.GetHex());
char data[32];
for (unsigned int i = 0; i < 32; i++)
data[31 - i] = hash.begin()[i];
int rc = zmq_send_multipart(psocket, "hashblock", 9, data, 32, 0);
return rc == 0;
}
bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &transaction)
{
uint256 hash = transaction.GetHash();
LogPrint("zmq", "Publish hash transaction %s\n", hash.GetHex());
char data[32];
for (unsigned int i = 0; i < 32; i++)
data[31 - i] = hash.begin()[i];
int rc = zmq_send_multipart(psocket, "hashtx", 6, data, 32, 0);
return rc == 0;
}
bool CZMQPublishRawBlockNotifier::NotifyBlock(const uint256 &hash)
{
LogPrint("zmq", "Publish raw block %s\n", hash.GetHex());
CDataStream ss(SER_NETWORK, PROTOCOL_VERSION);
{
LOCK(cs_main);
CBlock block;
CBlockIndex* pblockindex = mapBlockIndex[hash];
if(!ReadBlockFromDisk(block, pblockindex))
{
zmqError("Can't read block from disk");
return false;
}
ss << block;
}
int rc = zmq_send_multipart(psocket, "rawblock", 8, &(*ss.begin()), ss.size(), 0);
return rc == 0;
}
bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &transaction)
{
uint256 hash = transaction.GetHash();
LogPrint("zmq", "Publish raw transaction %s\n", hash.GetHex());
CDataStream ss(SER_NETWORK, PROTOCOL_VERSION);
ss << transaction;
int rc = zmq_send_multipart(psocket, "rawtx", 5, &(*ss.begin()), ss.size(), 0);
return rc == 0;
}

View File

@@ -0,0 +1,41 @@
// Copyright (c) 2015 The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#ifndef BITCOIN_ZMQ_ZMQPUBLISHNOTIFIER_H
#define BITCOIN_ZMQ_ZMQPUBLISHNOTIFIER_H
#include "zmqabstractnotifier.h"
class CZMQAbstractPublishNotifier : public CZMQAbstractNotifier
{
public:
bool Initialize(void *pcontext);
void Shutdown();
};
class CZMQPublishHashBlockNotifier : public CZMQAbstractPublishNotifier
{
public:
bool NotifyBlock(const uint256 &hash);
};
class CZMQPublishHashTransactionNotifier : public CZMQAbstractPublishNotifier
{
public:
bool NotifyTransaction(const CTransaction &transaction);
};
class CZMQPublishRawBlockNotifier : public CZMQAbstractPublishNotifier
{
public:
bool NotifyBlock(const uint256 &hash);
};
class CZMQPublishRawTransactionNotifier : public CZMQAbstractPublishNotifier
{
public:
bool NotifyTransaction(const CTransaction &transaction);
};
#endif // BITCOIN_ZMQ_ZMQPUBLISHNOTIFIER_H