net: add a new message queue for the message processor
This separates the storage of messages from the net and queued messages for processing, allowing the locks to be split.
This commit is contained in:
parent
c5a8b1b946
commit
4d712e366c
3 changed files with 24 additions and 16 deletions
12
src/net.cpp
12
src/net.cpp
|
@ -1239,9 +1239,19 @@ void CConnman::ThreadSocketHandler()
|
|||
if (!pnode->ReceiveMsgBytes(pchBuf, nBytes, notify))
|
||||
pnode->CloseSocketDisconnect();
|
||||
RecordBytesRecv(nBytes);
|
||||
if (notify)
|
||||
if (notify) {
|
||||
auto it(pnode->vRecvMsg.begin());
|
||||
for (; it != pnode->vRecvMsg.end(); ++it) {
|
||||
if (!it->complete())
|
||||
break;
|
||||
}
|
||||
{
|
||||
LOCK(pnode->cs_vProcessMsg);
|
||||
pnode->vProcessMsg.splice(pnode->vProcessMsg.end(), pnode->vRecvMsg, pnode->vRecvMsg.begin(), it);
|
||||
}
|
||||
WakeMessageHandler();
|
||||
}
|
||||
}
|
||||
else if (nBytes == 0)
|
||||
{
|
||||
// socket closed gracefully
|
||||
|
|
|
@ -608,6 +608,9 @@ public:
|
|||
std::deque<std::vector<unsigned char>> vSendMsg;
|
||||
CCriticalSection cs_vSend;
|
||||
|
||||
CCriticalSection cs_vProcessMsg;
|
||||
std::list<CNetMessage> vProcessMsg;
|
||||
|
||||
std::deque<CInv> vRecvGetData;
|
||||
std::list<CNetMessage> vRecvMsg;
|
||||
CCriticalSection cs_vRecvMsg;
|
||||
|
|
|
@ -2468,21 +2468,16 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interru
|
|||
if (pfrom->nSendSize >= nMaxSendBufferSize)
|
||||
return false;
|
||||
|
||||
auto it = pfrom->vRecvMsg.begin();
|
||||
if (it == pfrom->vRecvMsg.end())
|
||||
std::list<CNetMessage> msgs;
|
||||
{
|
||||
LOCK(pfrom->cs_vProcessMsg);
|
||||
if (pfrom->vProcessMsg.empty())
|
||||
return false;
|
||||
|
||||
// end, if an incomplete message is found
|
||||
if (!it->complete())
|
||||
return false;
|
||||
|
||||
// get next message
|
||||
CNetMessage msg = std::move(*it);
|
||||
|
||||
// at this point, any failure means we can delete the current message
|
||||
pfrom->vRecvMsg.erase(pfrom->vRecvMsg.begin());
|
||||
|
||||
fMoreWork = !pfrom->vRecvMsg.empty() && pfrom->vRecvMsg.front().complete();
|
||||
// Just take one message
|
||||
msgs.splice(msgs.begin(), pfrom->vProcessMsg, pfrom->vProcessMsg.begin());
|
||||
fMoreWork = !pfrom->vProcessMsg.empty();
|
||||
}
|
||||
CNetMessage& msg(msgs.front());
|
||||
|
||||
msg.SetVersion(pfrom->GetRecvVersion());
|
||||
// Scan for message start
|
||||
|
|
Loading…
Reference in a new issue