Some fixes to CNetMessage processing

* Change CNode::vRecvMsg to be a deque instead of a vector (less copying)
* Make sure to acquire cs_vRecvMsg in CNode::CloseSocketDisconnect (as it
  may be called without that lock).
This commit is contained in:
Pieter Wuille 2013-03-01 01:41:28 +01:00 committed by Pieter Wuille
parent b9ff2970b9
commit 967f24590b
3 changed files with 28 additions and 21 deletions

View file

@ -3708,8 +3708,6 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv)
// requires LOCK(cs_vRecvMsg) // requires LOCK(cs_vRecvMsg)
bool ProcessMessages(CNode* pfrom) bool ProcessMessages(CNode* pfrom)
{ {
if (pfrom->vRecvMsg.empty())
return true;
//if (fDebug) //if (fDebug)
// printf("ProcessMessages(%zu messages)\n", pfrom->vRecvMsg.size()); // printf("ProcessMessages(%zu messages)\n", pfrom->vRecvMsg.size());
@ -3721,29 +3719,34 @@ bool ProcessMessages(CNode* pfrom)
// (4) checksum // (4) checksum
// (x) data // (x) data
// //
bool fOk = true;
unsigned int nMsgPos = 0; std::deque<CNetMessage>::iterator it = pfrom->vRecvMsg.begin();
for (; nMsgPos < pfrom->vRecvMsg.size(); nMsgPos++) while (it != pfrom->vRecvMsg.end()) {
{
// Don't bother if send buffer is too full to respond anyway // Don't bother if send buffer is too full to respond anyway
if (pfrom->vSend.size() >= SendBufferSize()) if (pfrom->vSend.size() >= SendBufferSize())
break; break;
// get next message; end, if an incomplete message is found // get next message
CNetMessage& msg = pfrom->vRecvMsg[nMsgPos]; CNetMessage& msg = *it;
//if (fDebug) //if (fDebug)
// printf("ProcessMessages(message %u msgsz, %zu bytes, complete:%s)\n", // printf("ProcessMessages(message %u msgsz, %zu bytes, complete:%s)\n",
// msg.hdr.nMessageSize, msg.vRecv.size(), // msg.hdr.nMessageSize, msg.vRecv.size(),
// msg.complete() ? "Y" : "N"); // msg.complete() ? "Y" : "N");
// end, if an incomplete message is found
if (!msg.complete()) if (!msg.complete())
break; break;
// at this point, any failure means we can delete the current message
it++;
// Scan for message start // Scan for message start
if (memcmp(msg.hdr.pchMessageStart, pchMessageStart, sizeof(pchMessageStart)) != 0) { if (memcmp(msg.hdr.pchMessageStart, pchMessageStart, sizeof(pchMessageStart)) != 0) {
printf("\n\nPROCESSMESSAGE: INVALID MESSAGESTART\n\n"); printf("\n\nPROCESSMESSAGE: INVALID MESSAGESTART\n\n");
return false; fOk = false;
break;
} }
// Read header // Read header
@ -3779,7 +3782,7 @@ bool ProcessMessages(CNode* pfrom)
fRet = ProcessMessage(pfrom, strCommand, vRecv); fRet = ProcessMessage(pfrom, strCommand, vRecv);
} }
if (fShutdown) if (fShutdown)
return true; break;
} }
catch (std::ios_base::failure& e) catch (std::ios_base::failure& e)
{ {
@ -3808,11 +3811,8 @@ bool ProcessMessages(CNode* pfrom)
printf("ProcessMessage(%s, %u bytes) FAILED\n", strCommand.c_str(), nMessageSize); printf("ProcessMessage(%s, %u bytes) FAILED\n", strCommand.c_str(), nMessageSize);
} }
// remove processed messages; one incomplete message may remain pfrom->vRecvMsg.erase(pfrom->vRecvMsg.begin(), it);
if (nMsgPos > 0) return fOk;
pfrom->vRecvMsg.erase(pfrom->vRecvMsg.begin(),
pfrom->vRecvMsg.begin() + nMsgPos);
return true;
} }

View file

@ -536,7 +536,11 @@ void CNode::CloseSocketDisconnect()
printf("disconnecting node %s\n", addrName.c_str()); printf("disconnecting node %s\n", addrName.c_str());
closesocket(hSocket); closesocket(hSocket);
hSocket = INVALID_SOCKET; hSocket = INVALID_SOCKET;
vRecvMsg.clear();
// in case this fails, we'll empty the recv buffer when the CNode is deleted
TRY_LOCK(cs_vRecvMsg, lockRecv);
if (lockRecv)
vRecvMsg.clear();
} }
} }
@ -634,7 +638,7 @@ bool CNode::ReceiveMsgBytes(const char *pch, unsigned int nBytes)
while (nBytes > 0) { while (nBytes > 0) {
// get current incomplete message, or create a new one // get current incomplete message, or create a new one
if (vRecvMsg.size() == 0 || if (vRecvMsg.empty() ||
vRecvMsg.back().complete()) vRecvMsg.back().complete())
vRecvMsg.push_back(CNetMessage(SER_NETWORK, nRecvVersion)); vRecvMsg.push_back(CNetMessage(SER_NETWORK, nRecvVersion));
@ -1767,6 +1771,9 @@ void ThreadMessageHandler2(void* parg)
pnodeTrickle = vNodesCopy[GetRand(vNodesCopy.size())]; pnodeTrickle = vNodesCopy[GetRand(vNodesCopy.size())];
BOOST_FOREACH(CNode* pnode, vNodesCopy) BOOST_FOREACH(CNode* pnode, vNodesCopy)
{ {
if (pnode->fDisconnect)
continue;
// Receive messages // Receive messages
{ {
TRY_LOCK(pnode->cs_vRecvMsg, lockRecv); TRY_LOCK(pnode->cs_vRecvMsg, lockRecv);

View file

@ -176,7 +176,7 @@ public:
CDataStream vSend; CDataStream vSend;
CCriticalSection cs_vSend; CCriticalSection cs_vSend;
std::vector<CNetMessage> vRecvMsg; std::deque<CNetMessage> vRecvMsg;
CCriticalSection cs_vRecvMsg; CCriticalSection cs_vRecvMsg;
int nRecvVersion; int nRecvVersion;
@ -297,8 +297,8 @@ public:
unsigned int GetTotalRecvSize() unsigned int GetTotalRecvSize()
{ {
unsigned int total = 0; unsigned int total = 0;
for (unsigned int i = 0; i < vRecvMsg.size(); i++) BOOST_FOREACH(const CNetMessage &msg, vRecvMsg)
total += vRecvMsg[i].vRecv.size(); total += msg.vRecv.size() + 24;
return total; return total;
} }
@ -309,8 +309,8 @@ public:
void SetRecvVersion(int nVersionIn) void SetRecvVersion(int nVersionIn)
{ {
nRecvVersion = nVersionIn; nRecvVersion = nVersionIn;
for (unsigned int i = 0; i < vRecvMsg.size(); i++) BOOST_FOREACH(CNetMessage &msg, vRecvMsg)
vRecvMsg[i].SetVersion(nVersionIn); msg.SetVersion(nVersionIn);
} }
CNode* AddRef(int64 nTimeout=0) CNode* AddRef(int64 nTimeout=0)