net: add a flag to indicate when a node's process queue is full
Messages are dumped very quickly from the socket handler to the processor, so it's the depth of the processing queue that's interesting. The socket handler checks the process queue's size during the brief message hand-off and pauses if necessary, and the processor possibly unpauses each time a message is popped off of its queue.
This commit is contained in:
parent
4d712e366c
commit
c6e8a9bcff
3 changed files with 11 additions and 12 deletions
10
src/net.cpp
10
src/net.cpp
|
@ -1165,9 +1165,7 @@ void CConnman::ThreadSocketHandler()
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
TRY_LOCK(pnode->cs_vRecvMsg, lockRecv);
|
TRY_LOCK(pnode->cs_vRecvMsg, lockRecv);
|
||||||
if (lockRecv && (
|
if (lockRecv && !pnode->fPauseRecv)
|
||||||
pnode->vRecvMsg.empty() || !pnode->vRecvMsg.front().complete() ||
|
|
||||||
pnode->GetTotalRecvSize() <= GetReceiveFloodSize()))
|
|
||||||
FD_SET(pnode->hSocket, &fdsetRecv);
|
FD_SET(pnode->hSocket, &fdsetRecv);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1240,14 +1238,18 @@ void CConnman::ThreadSocketHandler()
|
||||||
pnode->CloseSocketDisconnect();
|
pnode->CloseSocketDisconnect();
|
||||||
RecordBytesRecv(nBytes);
|
RecordBytesRecv(nBytes);
|
||||||
if (notify) {
|
if (notify) {
|
||||||
|
size_t nSizeAdded = 0;
|
||||||
auto it(pnode->vRecvMsg.begin());
|
auto it(pnode->vRecvMsg.begin());
|
||||||
for (; it != pnode->vRecvMsg.end(); ++it) {
|
for (; it != pnode->vRecvMsg.end(); ++it) {
|
||||||
if (!it->complete())
|
if (!it->complete())
|
||||||
break;
|
break;
|
||||||
|
nSizeAdded += it->vRecv.size() + CMessageHeader::HEADER_SIZE;
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
LOCK(pnode->cs_vProcessMsg);
|
LOCK(pnode->cs_vProcessMsg);
|
||||||
pnode->vProcessMsg.splice(pnode->vProcessMsg.end(), pnode->vRecvMsg, pnode->vRecvMsg.begin(), it);
|
pnode->vProcessMsg.splice(pnode->vProcessMsg.end(), pnode->vRecvMsg, pnode->vRecvMsg.begin(), it);
|
||||||
|
pnode->nProcessQueueSize += nSizeAdded;
|
||||||
|
pnode->fPauseRecv = pnode->nProcessQueueSize > nReceiveFloodSize;
|
||||||
}
|
}
|
||||||
WakeMessageHandler();
|
WakeMessageHandler();
|
||||||
}
|
}
|
||||||
|
@ -2592,6 +2594,8 @@ CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn
|
||||||
minFeeFilter = 0;
|
minFeeFilter = 0;
|
||||||
lastSentFeeFilter = 0;
|
lastSentFeeFilter = 0;
|
||||||
nextSendTimeFeeFilter = 0;
|
nextSendTimeFeeFilter = 0;
|
||||||
|
fPauseRecv = false;
|
||||||
|
nProcessQueueSize = 0;
|
||||||
|
|
||||||
BOOST_FOREACH(const std::string &msg, getAllNetMessageTypes())
|
BOOST_FOREACH(const std::string &msg, getAllNetMessageTypes())
|
||||||
mapRecvBytesPerMsgCmd[msg] = 0;
|
mapRecvBytesPerMsgCmd[msg] = 0;
|
||||||
|
|
11
src/net.h
11
src/net.h
|
@ -610,6 +610,7 @@ public:
|
||||||
|
|
||||||
CCriticalSection cs_vProcessMsg;
|
CCriticalSection cs_vProcessMsg;
|
||||||
std::list<CNetMessage> vProcessMsg;
|
std::list<CNetMessage> vProcessMsg;
|
||||||
|
size_t nProcessQueueSize;
|
||||||
|
|
||||||
std::deque<CInv> vRecvGetData;
|
std::deque<CInv> vRecvGetData;
|
||||||
std::list<CNetMessage> vRecvMsg;
|
std::list<CNetMessage> vRecvMsg;
|
||||||
|
@ -650,6 +651,7 @@ public:
|
||||||
const NodeId id;
|
const NodeId id;
|
||||||
|
|
||||||
const uint64_t nKeyedNetGroup;
|
const uint64_t nKeyedNetGroup;
|
||||||
|
std::atomic_bool fPauseRecv;
|
||||||
protected:
|
protected:
|
||||||
|
|
||||||
mapMsgCmdSize mapSendBytesPerMsgCmd;
|
mapMsgCmdSize mapSendBytesPerMsgCmd;
|
||||||
|
@ -743,15 +745,6 @@ public:
|
||||||
return nRefCount;
|
return nRefCount;
|
||||||
}
|
}
|
||||||
|
|
||||||
// requires LOCK(cs_vRecvMsg)
|
|
||||||
unsigned int GetTotalRecvSize()
|
|
||||||
{
|
|
||||||
unsigned int total = 0;
|
|
||||||
BOOST_FOREACH(const CNetMessage &msg, vRecvMsg)
|
|
||||||
total += msg.vRecv.size() + 24;
|
|
||||||
return total;
|
|
||||||
}
|
|
||||||
|
|
||||||
// requires LOCK(cs_vRecvMsg)
|
// requires LOCK(cs_vRecvMsg)
|
||||||
bool ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool& complete);
|
bool ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool& complete);
|
||||||
|
|
||||||
|
|
|
@ -2475,6 +2475,8 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interru
|
||||||
return false;
|
return false;
|
||||||
// Just take one message
|
// Just take one message
|
||||||
msgs.splice(msgs.begin(), pfrom->vProcessMsg, pfrom->vProcessMsg.begin());
|
msgs.splice(msgs.begin(), pfrom->vProcessMsg, pfrom->vProcessMsg.begin());
|
||||||
|
pfrom->nProcessQueueSize -= msgs.front().vRecv.size() + CMessageHeader::HEADER_SIZE;
|
||||||
|
pfrom->fPauseRecv = pfrom->nProcessQueueSize > connman.GetReceiveFloodSize();
|
||||||
fMoreWork = !pfrom->vProcessMsg.empty();
|
fMoreWork = !pfrom->vProcessMsg.empty();
|
||||||
}
|
}
|
||||||
CNetMessage& msg(msgs.front());
|
CNetMessage& msg(msgs.front());
|
||||||
|
|
Loading…
Reference in a new issue