net: add a new message queue for the message processor

This separates the storage of messages from the net and queued messages for
processing, allowing the locks to be split.
This commit is contained in:
Cory Fields 2016-12-31 02:05:28 -05:00
parent c5a8b1b946
commit 4d712e366c
3 changed files with 24 additions and 16 deletions

View file

@ -1239,8 +1239,18 @@ void CConnman::ThreadSocketHandler()
if (!pnode->ReceiveMsgBytes(pchBuf, nBytes, notify))
pnode->CloseSocketDisconnect();
RecordBytesRecv(nBytes);
if (notify)
if (notify) {
auto it(pnode->vRecvMsg.begin());
for (; it != pnode->vRecvMsg.end(); ++it) {
if (!it->complete())
break;
}
{
LOCK(pnode->cs_vProcessMsg);
pnode->vProcessMsg.splice(pnode->vProcessMsg.end(), pnode->vRecvMsg, pnode->vRecvMsg.begin(), it);
}
WakeMessageHandler();
}
}
else if (nBytes == 0)
{

View file

@ -608,6 +608,9 @@ public:
std::deque<std::vector<unsigned char>> vSendMsg;
CCriticalSection cs_vSend;
CCriticalSection cs_vProcessMsg;
std::list<CNetMessage> vProcessMsg;
std::deque<CInv> vRecvGetData;
std::list<CNetMessage> vRecvMsg;
CCriticalSection cs_vRecvMsg;

View file

@ -2468,21 +2468,16 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interru
if (pfrom->nSendSize >= nMaxSendBufferSize)
return false;
auto it = pfrom->vRecvMsg.begin();
if (it == pfrom->vRecvMsg.end())
return false;
// end, if an incomplete message is found
if (!it->complete())
return false;
// get next message
CNetMessage msg = std::move(*it);
// at this point, any failure means we can delete the current message
pfrom->vRecvMsg.erase(pfrom->vRecvMsg.begin());
fMoreWork = !pfrom->vRecvMsg.empty() && pfrom->vRecvMsg.front().complete();
std::list<CNetMessage> msgs;
{
LOCK(pfrom->cs_vProcessMsg);
if (pfrom->vProcessMsg.empty())
return false;
// Just take one message
msgs.splice(msgs.begin(), pfrom->vProcessMsg, pfrom->vProcessMsg.begin());
fMoreWork = !pfrom->vProcessMsg.empty();
}
CNetMessage& msg(msgs.front());
msg.SetVersion(pfrom->GetRecvVersion());
// Scan for message start