net: remove cs_vRecvMsg
vRecvMsg is now only touched by the socket handler thread. The accounting vars (nRecvBytes/nLastRecv/mapRecvBytesPerMsgCmd) are also only used by the socket handler thread, with the exception of queries from rpc/gui. These accesses are not threadsafe, but they never were. This needs to be addressed separately. Also, update comment describing data flow
This commit is contained in:
parent
991955ee81
commit
e60360e139
3 changed files with 8 additions and 30 deletions
33
src/net.cpp
33
src/net.cpp
|
@ -644,7 +644,6 @@ void CNode::copyStats(CNodeStats &stats)
|
||||||
}
|
}
|
||||||
#undef X
|
#undef X
|
||||||
|
|
||||||
// requires LOCK(cs_vRecvMsg)
|
|
||||||
bool CNode::ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool& complete)
|
bool CNode::ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool& complete)
|
||||||
{
|
{
|
||||||
complete = false;
|
complete = false;
|
||||||
|
@ -1080,13 +1079,9 @@ void CConnman::ThreadSocketHandler()
|
||||||
TRY_LOCK(pnode->cs_vSend, lockSend);
|
TRY_LOCK(pnode->cs_vSend, lockSend);
|
||||||
if (lockSend)
|
if (lockSend)
|
||||||
{
|
{
|
||||||
TRY_LOCK(pnode->cs_vRecvMsg, lockRecv);
|
|
||||||
if (lockRecv)
|
|
||||||
{
|
|
||||||
TRY_LOCK(pnode->cs_inventory, lockInv);
|
TRY_LOCK(pnode->cs_inventory, lockInv);
|
||||||
if (lockInv)
|
if (lockInv)
|
||||||
fDelete = true;
|
fDelete = true;
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (fDelete)
|
if (fDelete)
|
||||||
|
@ -1146,15 +1141,10 @@ void CConnman::ThreadSocketHandler()
|
||||||
// write buffer in this case before receiving more. This avoids
|
// write buffer in this case before receiving more. This avoids
|
||||||
// needlessly queueing received data, if the remote peer is not themselves
|
// needlessly queueing received data, if the remote peer is not themselves
|
||||||
// receiving data. This means properly utilizing TCP flow control signalling.
|
// receiving data. This means properly utilizing TCP flow control signalling.
|
||||||
// * Otherwise, if there is no (complete) message in the receive buffer,
|
// * Otherwise, if there is space left in the receive buffer, select() for
|
||||||
// or there is space left in the buffer, select() for receiving data.
|
// receiving data.
|
||||||
// * (if neither of the above applies, there is certainly one message
|
// * Hand off all complete messages to the processor, to be handled without
|
||||||
// in the receiver buffer ready to be processed).
|
// blocking here.
|
||||||
// Together, that means that at least one of the following is always possible,
|
|
||||||
// so we don't deadlock:
|
|
||||||
// * We send some data.
|
|
||||||
// * We wait for data to be received (and disconnect after timeout).
|
|
||||||
// * We process a message in the buffer (message handler thread).
|
|
||||||
{
|
{
|
||||||
TRY_LOCK(pnode->cs_vSend, lockSend);
|
TRY_LOCK(pnode->cs_vSend, lockSend);
|
||||||
if (lockSend) {
|
if (lockSend) {
|
||||||
|
@ -1165,8 +1155,7 @@ void CConnman::ThreadSocketHandler()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
TRY_LOCK(pnode->cs_vRecvMsg, lockRecv);
|
if (!pnode->fPauseRecv)
|
||||||
if (lockRecv && !pnode->fPauseRecv)
|
|
||||||
FD_SET(pnode->hSocket, &fdsetRecv);
|
FD_SET(pnode->hSocket, &fdsetRecv);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1225,8 +1214,6 @@ void CConnman::ThreadSocketHandler()
|
||||||
continue;
|
continue;
|
||||||
if (FD_ISSET(pnode->hSocket, &fdsetRecv) || FD_ISSET(pnode->hSocket, &fdsetError))
|
if (FD_ISSET(pnode->hSocket, &fdsetRecv) || FD_ISSET(pnode->hSocket, &fdsetError))
|
||||||
{
|
{
|
||||||
TRY_LOCK(pnode->cs_vRecvMsg, lockRecv);
|
|
||||||
if (lockRecv)
|
|
||||||
{
|
{
|
||||||
{
|
{
|
||||||
// typical socket buffer is 8K-64K
|
// typical socket buffer is 8K-64K
|
||||||
|
@ -1865,14 +1852,8 @@ void CConnman::ThreadMessageHandler()
|
||||||
continue;
|
continue;
|
||||||
|
|
||||||
// Receive messages
|
// Receive messages
|
||||||
{
|
bool fMoreNodeWork = GetNodeSignals().ProcessMessages(pnode, *this, flagInterruptMsgProc);
|
||||||
TRY_LOCK(pnode->cs_vRecvMsg, lockRecv);
|
fMoreWork |= (fMoreNodeWork && !pnode->fPauseSend);
|
||||||
if (lockRecv)
|
|
||||||
{
|
|
||||||
bool fMoreNodeWork = GetNodeSignals().ProcessMessages(pnode, *this, flagInterruptMsgProc);
|
|
||||||
fMoreWork |= (fMoreNodeWork && !pnode->fPauseSend);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (flagInterruptMsgProc)
|
if (flagInterruptMsgProc)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
|
|
|
@ -613,8 +613,6 @@ public:
|
||||||
size_t nProcessQueueSize;
|
size_t nProcessQueueSize;
|
||||||
|
|
||||||
std::deque<CInv> vRecvGetData;
|
std::deque<CInv> vRecvGetData;
|
||||||
std::list<CNetMessage> vRecvMsg;
|
|
||||||
CCriticalSection cs_vRecvMsg;
|
|
||||||
uint64_t nRecvBytes;
|
uint64_t nRecvBytes;
|
||||||
std::atomic<int> nRecvVersion;
|
std::atomic<int> nRecvVersion;
|
||||||
|
|
||||||
|
@ -726,6 +724,7 @@ private:
|
||||||
const ServiceFlags nLocalServices;
|
const ServiceFlags nLocalServices;
|
||||||
const int nMyStartingHeight;
|
const int nMyStartingHeight;
|
||||||
int nSendVersion;
|
int nSendVersion;
|
||||||
|
std::list<CNetMessage> vRecvMsg; // Used only by SocketHandler thread
|
||||||
public:
|
public:
|
||||||
|
|
||||||
NodeId GetId() const {
|
NodeId GetId() const {
|
||||||
|
@ -746,7 +745,6 @@ public:
|
||||||
return nRefCount;
|
return nRefCount;
|
||||||
}
|
}
|
||||||
|
|
||||||
// requires LOCK(cs_vRecvMsg)
|
|
||||||
bool ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool& complete);
|
bool ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool& complete);
|
||||||
|
|
||||||
void SetRecvVersion(int nVersionIn)
|
void SetRecvVersion(int nVersionIn)
|
||||||
|
|
|
@ -2439,7 +2439,6 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
// requires LOCK(cs_vRecvMsg)
|
|
||||||
bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interruptMsgProc)
|
bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interruptMsgProc)
|
||||||
{
|
{
|
||||||
const CChainParams& chainparams = Params();
|
const CChainParams& chainparams = Params();
|
||||||
|
|
Loading…
Add table
Reference in a new issue