Merge #9698: net: fix socket close race

9a0b784 net: add a lock around hSocket (Cory Fields)
45e2e08 net: rearrange so that socket accesses can be grouped together (Cory Fields)
This commit is contained in:
Wladimir J. van der Laan 2017-02-10 12:38:45 +01:00
commit 2447c1024e
No known key found for this signature in database
GPG key ID: 74810B012346C9A6
2 changed files with 49 additions and 26 deletions

View file

@ -425,6 +425,7 @@ void CConnman::DumpBanlist()
void CNode::CloseSocketDisconnect() void CNode::CloseSocketDisconnect()
{ {
fDisconnect = true; fDisconnect = true;
LOCK(cs_hSocket);
if (hSocket != INVALID_SOCKET) if (hSocket != INVALID_SOCKET)
{ {
LogPrint("net", "disconnecting peer=%d\n", id); LogPrint("net", "disconnecting peer=%d\n", id);
@ -789,7 +790,13 @@ size_t CConnman::SocketSendData(CNode *pnode) const
while (it != pnode->vSendMsg.end()) { while (it != pnode->vSendMsg.end()) {
const auto &data = *it; const auto &data = *it;
assert(data.size() > pnode->nSendOffset); assert(data.size() > pnode->nSendOffset);
int nBytes = send(pnode->hSocket, reinterpret_cast<const char*>(data.data()) + pnode->nSendOffset, data.size() - pnode->nSendOffset, MSG_NOSIGNAL | MSG_DONTWAIT); int nBytes = 0;
{
LOCK(pnode->cs_hSocket);
if (pnode->hSocket == INVALID_SOCKET)
break;
nBytes = send(pnode->hSocket, reinterpret_cast<const char*>(data.data()) + pnode->nSendOffset, data.size() - pnode->nSendOffset, MSG_NOSIGNAL | MSG_DONTWAIT);
}
if (nBytes > 0) { if (nBytes > 0) {
pnode->nLastSend = GetSystemTimeInSeconds(); pnode->nLastSend = GetSystemTimeInSeconds();
pnode->nSendBytes += nBytes; pnode->nSendBytes += nBytes;
@ -1148,12 +1155,6 @@ void CConnman::ThreadSocketHandler()
LOCK(cs_vNodes); LOCK(cs_vNodes);
BOOST_FOREACH(CNode* pnode, vNodes) BOOST_FOREACH(CNode* pnode, vNodes)
{ {
if (pnode->hSocket == INVALID_SOCKET)
continue;
FD_SET(pnode->hSocket, &fdsetError);
hSocketMax = std::max(hSocketMax, pnode->hSocket);
have_fds = true;
// Implement the following logic: // Implement the following logic:
// * If there is data to send, select() for sending data. As this only // * If there is data to send, select() for sending data. As this only
// happens when optimistic write failed, we choose to first drain the // happens when optimistic write failed, we choose to first drain the
@ -1164,16 +1165,28 @@ void CConnman::ThreadSocketHandler()
// receiving data. // receiving data.
// * Hand off all complete messages to the processor, to be handled without // * Hand off all complete messages to the processor, to be handled without
// blocking here. // blocking here.
bool select_recv = !pnode->fPauseRecv;
bool select_send;
{ {
LOCK(pnode->cs_vSend); LOCK(pnode->cs_vSend);
if (!pnode->vSendMsg.empty()) { select_send = !pnode->vSendMsg.empty();
FD_SET(pnode->hSocket, &fdsetSend);
continue;
}
} }
{
if (!pnode->fPauseRecv) LOCK(pnode->cs_hSocket);
FD_SET(pnode->hSocket, &fdsetRecv); if (pnode->hSocket == INVALID_SOCKET)
continue;
FD_SET(pnode->hSocket, &fdsetError);
hSocketMax = std::max(hSocketMax, pnode->hSocket);
have_fds = true;
if (select_send) {
FD_SET(pnode->hSocket, &fdsetSend);
continue;
}
if (select_recv) {
FD_SET(pnode->hSocket, &fdsetRecv);
} }
} }
} }
@ -1227,15 +1240,30 @@ void CConnman::ThreadSocketHandler()
// //
// Receive // Receive
// //
if (pnode->hSocket == INVALID_SOCKET) bool recvSet = false;
continue; bool sendSet = false;
if (FD_ISSET(pnode->hSocket, &fdsetRecv) || FD_ISSET(pnode->hSocket, &fdsetError)) bool errorSet = false;
{
LOCK(pnode->cs_hSocket);
if (pnode->hSocket == INVALID_SOCKET)
continue;
recvSet = FD_ISSET(pnode->hSocket, &fdsetRecv);
sendSet = FD_ISSET(pnode->hSocket, &fdsetSend);
errorSet = FD_ISSET(pnode->hSocket, &fdsetError);
}
if (recvSet || errorSet)
{ {
{ {
{ {
// typical socket buffer is 8K-64K // typical socket buffer is 8K-64K
char pchBuf[0x10000]; char pchBuf[0x10000];
int nBytes = recv(pnode->hSocket, pchBuf, sizeof(pchBuf), MSG_DONTWAIT); int nBytes = 0;
{
LOCK(pnode->cs_hSocket);
if (pnode->hSocket == INVALID_SOCKET)
continue;
nBytes = recv(pnode->hSocket, pchBuf, sizeof(pchBuf), MSG_DONTWAIT);
}
if (nBytes > 0) if (nBytes > 0)
{ {
bool notify = false; bool notify = false;
@ -1284,9 +1312,7 @@ void CConnman::ThreadSocketHandler()
// //
// Send // Send
// //
if (pnode->hSocket == INVALID_SOCKET) if (sendSet)
continue;
if (FD_ISSET(pnode->hSocket, &fdsetSend))
{ {
LOCK(pnode->cs_vSend); LOCK(pnode->cs_vSend);
size_t nBytes = SocketSendData(pnode); size_t nBytes = SocketSendData(pnode);
@ -2275,8 +2301,7 @@ void CConnman::Stop()
// Close sockets // Close sockets
BOOST_FOREACH(CNode* pnode, vNodes) BOOST_FOREACH(CNode* pnode, vNodes)
if (pnode->hSocket != INVALID_SOCKET) pnode->CloseSocketDisconnect();
CloseSocket(pnode->hSocket);
BOOST_FOREACH(ListenSocket& hListenSocket, vhListenSocket) BOOST_FOREACH(ListenSocket& hListenSocket, vhListenSocket)
if (hListenSocket.socket != INVALID_SOCKET) if (hListenSocket.socket != INVALID_SOCKET)
if (!CloseSocket(hListenSocket.socket)) if (!CloseSocket(hListenSocket.socket))
@ -2677,9 +2702,6 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
size_t nBytesSent = 0; size_t nBytesSent = 0;
{ {
LOCK(pnode->cs_vSend); LOCK(pnode->cs_vSend);
if(pnode->hSocket == INVALID_SOCKET) {
return;
}
bool optimisticSend(pnode->vSendMsg.empty()); bool optimisticSend(pnode->vSendMsg.empty());
//log total amount of bytes per command //log total amount of bytes per command

View file

@ -572,6 +572,7 @@ public:
uint64_t nSendBytes; uint64_t nSendBytes;
std::deque<std::vector<unsigned char>> vSendMsg; std::deque<std::vector<unsigned char>> vSendMsg;
CCriticalSection cs_vSend; CCriticalSection cs_vSend;
CCriticalSection cs_hSocket;
CCriticalSection cs_vProcessMsg; CCriticalSection cs_vProcessMsg;
std::list<CNetMessage> vProcessMsg; std::list<CNetMessage> vProcessMsg;