Process getdata invs separately until send buffer overflows
There exists a per-message-processed send buffer overflow protection, where processing is halted when the send buffer is larger than the allowed maximum. This protection does not apply to individual items, however, and getdata has the potential for causing large amounts of data to be sent. In case several hundreds of blocks are requested in one getdata, the send buffer can easily grow 50 megabytes above the send buffer limit. This commit breaks up the processing of getdata requests, remembering them inside a CNode when too many are requested at once.
This commit is contained in:
parent
41b052ad87
commit
c7f039b674
2 changed files with 117 additions and 94 deletions
210
src/main.cpp
210
src/main.cpp
|
@ -3029,6 +3029,115 @@ bool static AlreadyHave(const CInv& inv)
|
||||||
unsigned char pchMessageStart[4] = { 0xf9, 0xbe, 0xb4, 0xd9 };
|
unsigned char pchMessageStart[4] = { 0xf9, 0xbe, 0xb4, 0xd9 };
|
||||||
|
|
||||||
|
|
||||||
|
void static ProcessGetData(CNode* pfrom)
|
||||||
|
{
|
||||||
|
std::deque<CInv>::iterator it = pfrom->vRecvGetData.begin();
|
||||||
|
|
||||||
|
vector<CInv> vNotFound;
|
||||||
|
|
||||||
|
while (it != pfrom->vRecvGetData.end()) {
|
||||||
|
// Don't bother if send buffer is too full to respond anyway
|
||||||
|
if (pfrom->nSendSize >= SendBufferSize())
|
||||||
|
break;
|
||||||
|
|
||||||
|
const CInv &inv = *it;
|
||||||
|
{
|
||||||
|
if (fShutdown)
|
||||||
|
break;
|
||||||
|
it++;
|
||||||
|
|
||||||
|
if (inv.type == MSG_BLOCK || inv.type == MSG_FILTERED_BLOCK)
|
||||||
|
{
|
||||||
|
// Send block from disk
|
||||||
|
map<uint256, CBlockIndex*>::iterator mi = mapBlockIndex.find(inv.hash);
|
||||||
|
if (mi != mapBlockIndex.end())
|
||||||
|
{
|
||||||
|
CBlock block;
|
||||||
|
block.ReadFromDisk((*mi).second);
|
||||||
|
if (inv.type == MSG_BLOCK)
|
||||||
|
pfrom->PushMessage("block", block);
|
||||||
|
else // MSG_FILTERED_BLOCK)
|
||||||
|
{
|
||||||
|
LOCK(pfrom->cs_filter);
|
||||||
|
if (pfrom->pfilter)
|
||||||
|
{
|
||||||
|
CMerkleBlock merkleBlock(block, *pfrom->pfilter);
|
||||||
|
pfrom->PushMessage("merkleblock", merkleBlock);
|
||||||
|
// CMerkleBlock just contains hashes, so also push any transactions in the block the client did not see
|
||||||
|
// This avoids hurting performance by pointlessly requiring a round-trip
|
||||||
|
// Note that there is currently no way for a node to request any single transactions we didnt send here -
|
||||||
|
// they must either disconnect and retry or request the full block.
|
||||||
|
// Thus, the protocol spec specified allows for us to provide duplicate txn here,
|
||||||
|
// however we MUST always provide at least what the remote peer needs
|
||||||
|
typedef std::pair<unsigned int, uint256> PairType;
|
||||||
|
BOOST_FOREACH(PairType& pair, merkleBlock.vMatchedTxn)
|
||||||
|
if (!pfrom->setInventoryKnown.count(CInv(MSG_TX, pair.second)))
|
||||||
|
pfrom->PushMessage("tx", block.vtx[pair.first]);
|
||||||
|
}
|
||||||
|
// else
|
||||||
|
// no response
|
||||||
|
}
|
||||||
|
|
||||||
|
// Trigger them to send a getblocks request for the next batch of inventory
|
||||||
|
if (inv.hash == pfrom->hashContinue)
|
||||||
|
{
|
||||||
|
// Bypass PushInventory, this must send even if redundant,
|
||||||
|
// and we want it right after the last block so they don't
|
||||||
|
// wait for other stuff first.
|
||||||
|
vector<CInv> vInv;
|
||||||
|
vInv.push_back(CInv(MSG_BLOCK, hashBestChain));
|
||||||
|
pfrom->PushMessage("inv", vInv);
|
||||||
|
pfrom->hashContinue = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if (inv.IsKnownType())
|
||||||
|
{
|
||||||
|
// Send stream from relay memory
|
||||||
|
bool pushed = false;
|
||||||
|
{
|
||||||
|
LOCK(cs_mapRelay);
|
||||||
|
map<CInv, CDataStream>::iterator mi = mapRelay.find(inv);
|
||||||
|
if (mi != mapRelay.end()) {
|
||||||
|
pfrom->PushMessage(inv.GetCommand(), (*mi).second);
|
||||||
|
pushed = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!pushed && inv.type == MSG_TX) {
|
||||||
|
LOCK(mempool.cs);
|
||||||
|
if (mempool.exists(inv.hash)) {
|
||||||
|
CTransaction tx = mempool.lookup(inv.hash);
|
||||||
|
CDataStream ss(SER_NETWORK, PROTOCOL_VERSION);
|
||||||
|
ss.reserve(1000);
|
||||||
|
ss << tx;
|
||||||
|
pfrom->PushMessage("tx", ss);
|
||||||
|
pushed = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!pushed) {
|
||||||
|
vNotFound.push_back(inv);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Track requests for our stuff.
|
||||||
|
Inventory(inv.hash);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pfrom->vRecvGetData.erase(pfrom->vRecvGetData.begin(), it);
|
||||||
|
|
||||||
|
if (!vNotFound.empty()) {
|
||||||
|
// Let the peer know that we didn't find what it asked for, so it doesn't
|
||||||
|
// have to wait around forever. Currently only SPV clients actually care
|
||||||
|
// about this message: it's needed when they are recursively walking the
|
||||||
|
// dependencies of relevant unconfirmed transactions. SPV clients want to
|
||||||
|
// do that because they want to know about (and store and rebroadcast and
|
||||||
|
// risk analyze) the dependencies of transactions relevant to them, without
|
||||||
|
// having to download the entire memory pool.
|
||||||
|
pfrom->PushMessage("notfound", vNotFound);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv)
|
bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv)
|
||||||
{
|
{
|
||||||
RandAddSeedPerfmon();
|
RandAddSeedPerfmon();
|
||||||
|
@ -3302,101 +3411,11 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv)
|
||||||
if (fDebugNet || (vInv.size() != 1))
|
if (fDebugNet || (vInv.size() != 1))
|
||||||
printf("received getdata (%"PRIszu" invsz)\n", vInv.size());
|
printf("received getdata (%"PRIszu" invsz)\n", vInv.size());
|
||||||
|
|
||||||
vector<CInv> vNotFound;
|
if ((fDebugNet && vInv.size() > 0) || (vInv.size() == 1))
|
||||||
BOOST_FOREACH(const CInv& inv, vInv)
|
printf("received getdata for: %s\n", vInv[0].ToString().c_str());
|
||||||
{
|
|
||||||
if (fShutdown)
|
|
||||||
return true;
|
|
||||||
if (fDebugNet || (vInv.size() == 1))
|
|
||||||
printf("received getdata for: %s\n", inv.ToString().c_str());
|
|
||||||
|
|
||||||
if (inv.type == MSG_BLOCK || inv.type == MSG_FILTERED_BLOCK)
|
pfrom->vRecvGetData.insert(pfrom->vRecvGetData.end(), vInv.begin(), vInv.end());
|
||||||
{
|
ProcessGetData(pfrom);
|
||||||
// Send block from disk
|
|
||||||
map<uint256, CBlockIndex*>::iterator mi = mapBlockIndex.find(inv.hash);
|
|
||||||
if (mi != mapBlockIndex.end())
|
|
||||||
{
|
|
||||||
CBlock block;
|
|
||||||
block.ReadFromDisk((*mi).second);
|
|
||||||
if (inv.type == MSG_BLOCK)
|
|
||||||
pfrom->PushMessage("block", block);
|
|
||||||
else // MSG_FILTERED_BLOCK)
|
|
||||||
{
|
|
||||||
LOCK(pfrom->cs_filter);
|
|
||||||
if (pfrom->pfilter)
|
|
||||||
{
|
|
||||||
CMerkleBlock merkleBlock(block, *pfrom->pfilter);
|
|
||||||
pfrom->PushMessage("merkleblock", merkleBlock);
|
|
||||||
// CMerkleBlock just contains hashes, so also push any transactions in the block the client did not see
|
|
||||||
// This avoids hurting performance by pointlessly requiring a round-trip
|
|
||||||
// Note that there is currently no way for a node to request any single transactions we didnt send here -
|
|
||||||
// they must either disconnect and retry or request the full block.
|
|
||||||
// Thus, the protocol spec specified allows for us to provide duplicate txn here,
|
|
||||||
// however we MUST always provide at least what the remote peer needs
|
|
||||||
typedef std::pair<unsigned int, uint256> PairType;
|
|
||||||
BOOST_FOREACH(PairType& pair, merkleBlock.vMatchedTxn)
|
|
||||||
if (!pfrom->setInventoryKnown.count(CInv(MSG_TX, pair.second)))
|
|
||||||
pfrom->PushMessage("tx", block.vtx[pair.first]);
|
|
||||||
}
|
|
||||||
// else
|
|
||||||
// no response
|
|
||||||
}
|
|
||||||
|
|
||||||
// Trigger them to send a getblocks request for the next batch of inventory
|
|
||||||
if (inv.hash == pfrom->hashContinue)
|
|
||||||
{
|
|
||||||
// Bypass PushInventory, this must send even if redundant,
|
|
||||||
// and we want it right after the last block so they don't
|
|
||||||
// wait for other stuff first.
|
|
||||||
vector<CInv> vInv;
|
|
||||||
vInv.push_back(CInv(MSG_BLOCK, hashBestChain));
|
|
||||||
pfrom->PushMessage("inv", vInv);
|
|
||||||
pfrom->hashContinue = 0;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else if (inv.IsKnownType())
|
|
||||||
{
|
|
||||||
// Send stream from relay memory
|
|
||||||
bool pushed = false;
|
|
||||||
{
|
|
||||||
LOCK(cs_mapRelay);
|
|
||||||
map<CInv, CDataStream>::iterator mi = mapRelay.find(inv);
|
|
||||||
if (mi != mapRelay.end()) {
|
|
||||||
pfrom->PushMessage(inv.GetCommand(), (*mi).second);
|
|
||||||
pushed = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (!pushed && inv.type == MSG_TX) {
|
|
||||||
LOCK(mempool.cs);
|
|
||||||
if (mempool.exists(inv.hash)) {
|
|
||||||
CTransaction tx = mempool.lookup(inv.hash);
|
|
||||||
CDataStream ss(SER_NETWORK, PROTOCOL_VERSION);
|
|
||||||
ss.reserve(1000);
|
|
||||||
ss << tx;
|
|
||||||
pfrom->PushMessage("tx", ss);
|
|
||||||
pushed = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (!pushed) {
|
|
||||||
vNotFound.push_back(inv);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Track requests for our stuff.
|
|
||||||
Inventory(inv.hash);
|
|
||||||
|
|
||||||
if (!vNotFound.empty()) {
|
|
||||||
// Let the peer know that we didn't find what it asked for, so it doesn't
|
|
||||||
// have to wait around forever. Currently only SPV clients actually care
|
|
||||||
// about this message: it's needed when they are recursively walking the
|
|
||||||
// dependencies of relevant unconfirmed transactions. SPV clients want to
|
|
||||||
// do that because they want to know about (and store and rebroadcast and
|
|
||||||
// risk analyze) the dependencies of transactions relevant to them, without
|
|
||||||
// having to download the entire memory pool.
|
|
||||||
pfrom->PushMessage("notfound", vNotFound);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -3721,6 +3740,9 @@ bool ProcessMessages(CNode* pfrom)
|
||||||
//
|
//
|
||||||
bool fOk = true;
|
bool fOk = true;
|
||||||
|
|
||||||
|
if (!pfrom->vRecvGetData.empty())
|
||||||
|
ProcessGetData(pfrom);
|
||||||
|
|
||||||
std::deque<CNetMessage>::iterator it = pfrom->vRecvMsg.begin();
|
std::deque<CNetMessage>::iterator it = pfrom->vRecvMsg.begin();
|
||||||
while (!pfrom->fDisconnect && it != pfrom->vRecvMsg.end()) {
|
while (!pfrom->fDisconnect && it != pfrom->vRecvMsg.end()) {
|
||||||
// Don't bother if send buffer is too full to respond anyway
|
// Don't bother if send buffer is too full to respond anyway
|
||||||
|
|
|
@ -179,6 +179,7 @@ public:
|
||||||
std::deque<CSerializeData> vSendMsg;
|
std::deque<CSerializeData> vSendMsg;
|
||||||
CCriticalSection cs_vSend;
|
CCriticalSection cs_vSend;
|
||||||
|
|
||||||
|
std::deque<CInv> vRecvGetData;
|
||||||
std::deque<CNetMessage> vRecvMsg;
|
std::deque<CNetMessage> vRecvMsg;
|
||||||
CCriticalSection cs_vRecvMsg;
|
CCriticalSection cs_vRecvMsg;
|
||||||
int nRecvVersion;
|
int nRecvVersion;
|
||||||
|
|
Loading…
Reference in a new issue