Replace trickle nodes with per-node/message Poisson delays

We used to have a trickle node, a node which was chosen in each iteration of
the send loop that was privileged and allowed to send out queued up non-time
critical messages. Since the removal of the fixed sleeps in the network code,
this resulted in fast and attackable treatment of such broadcasts.

This pull request changes the 3 remaining trickle use cases by random delays:
* Local address broadcast (while also removing the the wiping of the seen filter)
* Address relay
* Inv relay (for transactions; blocks are always relayed immediately)

The code is based on older commits by Patrick Strateman.
This commit is contained in:
Pieter Wuille 2015-04-08 11:20:00 -07:00
parent 9ee02cf564
commit 5400ef6bcb
5 changed files with 47 additions and 36 deletions

View file

@ -5326,7 +5326,7 @@ bool ProcessMessages(CNode* pfrom)
} }
bool SendMessages(CNode* pto, bool fSendTrickle) bool SendMessages(CNode* pto)
{ {
const Consensus::Params& consensusParams = Params().GetConsensus(); const Consensus::Params& consensusParams = Params().GetConsensus();
{ {
@ -5368,28 +5368,17 @@ bool SendMessages(CNode* pto, bool fSendTrickle)
return true; return true;
// Address refresh broadcast // Address refresh broadcast
static int64_t nLastRebroadcast; int64_t nNow = GetTimeMicros();
if (!IsInitialBlockDownload() && (GetTime() - nLastRebroadcast > 24 * 60 * 60)) if (!IsInitialBlockDownload() && pto->nNextLocalAddrSend < nNow) {
{ AdvertizeLocal(pto);
LOCK(cs_vNodes); pto->nNextLocalAddrSend = PoissonNextSend(nNow, AVG_LOCAL_ADDRESS_BROADCAST_INTERVAL);
BOOST_FOREACH(CNode* pnode, vNodes)
{
// Periodically clear addrKnown to allow refresh broadcasts
if (nLastRebroadcast)
pnode->addrKnown.reset();
// Rebroadcast our address
AdvertizeLocal(pnode);
}
if (!vNodes.empty())
nLastRebroadcast = GetTime();
} }
// //
// Message: addr // Message: addr
// //
if (fSendTrickle) if (pto->nNextAddrSend < nNow) {
{ pto->nNextAddrSend = PoissonNextSend(nNow, AVG_ADDRESS_BROADCAST_INTERVAL);
vector<CAddress> vAddr; vector<CAddress> vAddr;
vAddr.reserve(pto->vAddrToSend.size()); vAddr.reserve(pto->vAddrToSend.size());
BOOST_FOREACH(const CAddress& addr, pto->vAddrToSend) BOOST_FOREACH(const CAddress& addr, pto->vAddrToSend)
@ -5563,8 +5552,13 @@ bool SendMessages(CNode* pto, bool fSendTrickle)
vector<CInv> vInv; vector<CInv> vInv;
vector<CInv> vInvWait; vector<CInv> vInvWait;
{ {
bool fSendTrickle = pto->fWhitelisted;
if (pto->nNextInvSend < nNow) {
fSendTrickle = true;
pto->nNextInvSend = PoissonNextSend(nNow, AVG_INVENTORY_BROADCAST_INTERVAL);
}
LOCK(pto->cs_inventory); LOCK(pto->cs_inventory);
vInv.reserve(pto->vInventoryToSend.size()); vInv.reserve(std::min<size_t>(1000, pto->vInventoryToSend.size()));
vInvWait.reserve(pto->vInventoryToSend.size()); vInvWait.reserve(pto->vInventoryToSend.size());
BOOST_FOREACH(const CInv& inv, pto->vInventoryToSend) BOOST_FOREACH(const CInv& inv, pto->vInventoryToSend)
{ {
@ -5604,7 +5598,7 @@ bool SendMessages(CNode* pto, bool fSendTrickle)
pto->PushMessage(NetMsgType::INV, vInv); pto->PushMessage(NetMsgType::INV, vInv);
// Detect whether we're stalling // Detect whether we're stalling
int64_t nNow = GetTimeMicros(); nNow = GetTimeMicros();
if (!pto->fDisconnect && state.nStallingSince && state.nStallingSince < nNow - 1000000 * BLOCK_STALLING_TIMEOUT) { if (!pto->fDisconnect && state.nStallingSince && state.nStallingSince < nNow - 1000000 * BLOCK_STALLING_TIMEOUT) {
// Stalling only triggers when the block download window cannot move. During normal steady state, // Stalling only triggers when the block download window cannot move. During normal steady state,
// the download window should be much larger than the to-be-downloaded set of blocks, so disconnection // the download window should be much larger than the to-be-downloaded set of blocks, so disconnection

View file

@ -87,6 +87,14 @@ static const unsigned int DATABASE_WRITE_INTERVAL = 60 * 60;
static const unsigned int DATABASE_FLUSH_INTERVAL = 24 * 60 * 60; static const unsigned int DATABASE_FLUSH_INTERVAL = 24 * 60 * 60;
/** Maximum length of reject messages. */ /** Maximum length of reject messages. */
static const unsigned int MAX_REJECT_MESSAGE_LENGTH = 111; static const unsigned int MAX_REJECT_MESSAGE_LENGTH = 111;
/** Average delay between local address broadcasts in seconds. */
static const unsigned int AVG_LOCAL_ADDRESS_BROADCAST_INTERVAL = 24 * 24 * 60;
/** Average delay between peer address broadcasts in seconds. */
static const unsigned int AVG_ADDRESS_BROADCAST_INTERVAL = 30;
/** Average delay between trickled inventory broadcasts in seconds.
* Blocks, whitelisted receivers, and a random 25% of transactions bypass this. */
static const unsigned int AVG_INVENTORY_BROADCAST_INTERVAL = 5;
static const unsigned int DEFAULT_LIMITFREERELAY = 15; static const unsigned int DEFAULT_LIMITFREERELAY = 15;
static const bool DEFAULT_RELAYPRIORITY = true; static const bool DEFAULT_RELAYPRIORITY = true;
@ -197,9 +205,8 @@ bool ProcessMessages(CNode* pfrom);
* Send queued protocol messages to be sent to a give node. * Send queued protocol messages to be sent to a give node.
* *
* @param[in] pto The node which we are sending messages to. * @param[in] pto The node which we are sending messages to.
* @param[in] fSendTrickle When true send the trickled data, otherwise trickle the data until true.
*/ */
bool SendMessages(CNode* pto, bool fSendTrickle); bool SendMessages(CNode* pto);
/** Run an instance of the script checking thread */ /** Run an instance of the script checking thread */
void ThreadScriptCheck(); void ThreadScriptCheck();
/** Try to detect Partition (network isolation) attacks against us */ /** Try to detect Partition (network isolation) attacks against us */

View file

@ -36,6 +36,8 @@
#include <boost/filesystem.hpp> #include <boost/filesystem.hpp>
#include <boost/thread.hpp> #include <boost/thread.hpp>
#include <math.h>
// Dump addresses to peers.dat every 15 minutes (900s) // Dump addresses to peers.dat every 15 minutes (900s)
#define DUMP_ADDRESSES_INTERVAL 900 #define DUMP_ADDRESSES_INTERVAL 900
@ -1733,11 +1735,6 @@ void ThreadMessageHandler()
} }
} }
// Poll the connected nodes for messages
CNode* pnodeTrickle = NULL;
if (!vNodesCopy.empty())
pnodeTrickle = vNodesCopy[GetRand(vNodesCopy.size())];
bool fSleep = true; bool fSleep = true;
BOOST_FOREACH(CNode* pnode, vNodesCopy) BOOST_FOREACH(CNode* pnode, vNodesCopy)
@ -1768,7 +1765,7 @@ void ThreadMessageHandler()
{ {
TRY_LOCK(pnode->cs_vSend, lockSend); TRY_LOCK(pnode->cs_vSend, lockSend);
if (lockSend) if (lockSend)
g_signals.SendMessages(pnode, pnode == pnodeTrickle || pnode->fWhitelisted); g_signals.SendMessages(pnode);
} }
boost::this_thread::interruption_point(); boost::this_thread::interruption_point();
} }
@ -2384,6 +2381,9 @@ CNode::CNode(SOCKET hSocketIn, const CAddress& addrIn, const std::string& addrNa
nStartingHeight = -1; nStartingHeight = -1;
filterInventoryKnown.reset(); filterInventoryKnown.reset();
fGetAddr = false; fGetAddr = false;
nNextLocalAddrSend = 0;
nNextAddrSend = 0;
nNextInvSend = 0;
fRelayTxes = false; fRelayTxes = false;
pfilter = new CBloomFilter(); pfilter = new CBloomFilter();
nPingNonceSent = 0; nPingNonceSent = 0;
@ -2634,3 +2634,7 @@ void DumpBanlist()
LogPrint("net", "Flushed %d banned node ips/subnets to banlist.dat %dms\n", LogPrint("net", "Flushed %d banned node ips/subnets to banlist.dat %dms\n",
banmap.size(), GetTimeMillis() - nStart); banmap.size(), GetTimeMillis() - nStart);
} }
int64_t PoissonNextSend(int64_t nNow, int average_interval_seconds) {
return nNow + (int64_t)(log1p(GetRand(1ULL << 48) * -0.0000000000000035527136788 /* -1/2^48 */) * average_interval_seconds * -1000000.0 + 0.5);
}

View file

@ -113,7 +113,7 @@ struct CNodeSignals
{ {
boost::signals2::signal<int ()> GetHeight; boost::signals2::signal<int ()> GetHeight;
boost::signals2::signal<bool (CNode*), CombinerAll> ProcessMessages; boost::signals2::signal<bool (CNode*), CombinerAll> ProcessMessages;
boost::signals2::signal<bool (CNode*, bool), CombinerAll> SendMessages; boost::signals2::signal<bool (CNode*), CombinerAll> SendMessages;
boost::signals2::signal<void (NodeId, const CNode*)> InitializeNode; boost::signals2::signal<void (NodeId, const CNode*)> InitializeNode;
boost::signals2::signal<void (NodeId)> FinalizeNode; boost::signals2::signal<void (NodeId)> FinalizeNode;
}; };
@ -391,6 +391,8 @@ public:
CRollingBloomFilter addrKnown; CRollingBloomFilter addrKnown;
bool fGetAddr; bool fGetAddr;
std::set<uint256> setKnown; std::set<uint256> setKnown;
int64_t nNextAddrSend;
int64_t nNextLocalAddrSend;
// inventory based relay // inventory based relay
CRollingBloomFilter filterInventoryKnown; CRollingBloomFilter filterInventoryKnown;
@ -398,6 +400,7 @@ public:
CCriticalSection cs_inventory; CCriticalSection cs_inventory;
std::set<uint256> setAskFor; std::set<uint256> setAskFor;
std::multimap<int64_t, CInv> mapAskFor; std::multimap<int64_t, CInv> mapAskFor;
int64_t nNextInvSend;
// Used for headers announcements - unfiltered blocks to relay // Used for headers announcements - unfiltered blocks to relay
// Also protected by cs_inventory // Also protected by cs_inventory
std::vector<uint256> vBlockHashesToAnnounce; std::vector<uint256> vBlockHashesToAnnounce;
@ -791,4 +794,7 @@ public:
void DumpBanlist(); void DumpBanlist();
/** Return a timestamp in the future (in microseconds) for exponentially distributed events. */
int64_t PoissonNextSend(int64_t nNow, int average_interval_seconds);
#endif // BITCOIN_NET_H #endif // BITCOIN_NET_H

View file

@ -49,7 +49,7 @@ BOOST_AUTO_TEST_CASE(DoS_banning)
CNode dummyNode1(INVALID_SOCKET, addr1, "", true); CNode dummyNode1(INVALID_SOCKET, addr1, "", true);
dummyNode1.nVersion = 1; dummyNode1.nVersion = 1;
Misbehaving(dummyNode1.GetId(), 100); // Should get banned Misbehaving(dummyNode1.GetId(), 100); // Should get banned
SendMessages(&dummyNode1, false); SendMessages(&dummyNode1);
BOOST_CHECK(CNode::IsBanned(addr1)); BOOST_CHECK(CNode::IsBanned(addr1));
BOOST_CHECK(!CNode::IsBanned(ip(0xa0b0c001|0x0000ff00))); // Different IP, not banned BOOST_CHECK(!CNode::IsBanned(ip(0xa0b0c001|0x0000ff00))); // Different IP, not banned
@ -57,11 +57,11 @@ BOOST_AUTO_TEST_CASE(DoS_banning)
CNode dummyNode2(INVALID_SOCKET, addr2, "", true); CNode dummyNode2(INVALID_SOCKET, addr2, "", true);
dummyNode2.nVersion = 1; dummyNode2.nVersion = 1;
Misbehaving(dummyNode2.GetId(), 50); Misbehaving(dummyNode2.GetId(), 50);
SendMessages(&dummyNode2, false); SendMessages(&dummyNode2);
BOOST_CHECK(!CNode::IsBanned(addr2)); // 2 not banned yet... BOOST_CHECK(!CNode::IsBanned(addr2)); // 2 not banned yet...
BOOST_CHECK(CNode::IsBanned(addr1)); // ... but 1 still should be BOOST_CHECK(CNode::IsBanned(addr1)); // ... but 1 still should be
Misbehaving(dummyNode2.GetId(), 50); Misbehaving(dummyNode2.GetId(), 50);
SendMessages(&dummyNode2, false); SendMessages(&dummyNode2);
BOOST_CHECK(CNode::IsBanned(addr2)); BOOST_CHECK(CNode::IsBanned(addr2));
} }
@ -73,13 +73,13 @@ BOOST_AUTO_TEST_CASE(DoS_banscore)
CNode dummyNode1(INVALID_SOCKET, addr1, "", true); CNode dummyNode1(INVALID_SOCKET, addr1, "", true);
dummyNode1.nVersion = 1; dummyNode1.nVersion = 1;
Misbehaving(dummyNode1.GetId(), 100); Misbehaving(dummyNode1.GetId(), 100);
SendMessages(&dummyNode1, false); SendMessages(&dummyNode1);
BOOST_CHECK(!CNode::IsBanned(addr1)); BOOST_CHECK(!CNode::IsBanned(addr1));
Misbehaving(dummyNode1.GetId(), 10); Misbehaving(dummyNode1.GetId(), 10);
SendMessages(&dummyNode1, false); SendMessages(&dummyNode1);
BOOST_CHECK(!CNode::IsBanned(addr1)); BOOST_CHECK(!CNode::IsBanned(addr1));
Misbehaving(dummyNode1.GetId(), 1); Misbehaving(dummyNode1.GetId(), 1);
SendMessages(&dummyNode1, false); SendMessages(&dummyNode1);
BOOST_CHECK(CNode::IsBanned(addr1)); BOOST_CHECK(CNode::IsBanned(addr1));
mapArgs.erase("-banscore"); mapArgs.erase("-banscore");
} }
@ -95,7 +95,7 @@ BOOST_AUTO_TEST_CASE(DoS_bantime)
dummyNode.nVersion = 1; dummyNode.nVersion = 1;
Misbehaving(dummyNode.GetId(), 100); Misbehaving(dummyNode.GetId(), 100);
SendMessages(&dummyNode, false); SendMessages(&dummyNode);
BOOST_CHECK(CNode::IsBanned(addr)); BOOST_CHECK(CNode::IsBanned(addr));
SetMockTime(nStartTime+60*60); SetMockTime(nStartTime+60*60);