Merge #14335: net: refactor: cleanup ThreadSocketHandler
032488e6e7
Move SocketHandler logic to private method. (Patrick Strateman)2af9cff11a
Move InactivityCheck logic to private method. (Patrick Strateman)7479b63d91
Move DisconnectNodes logic to private method. (Patrick Strateman)edb5350c32
Move NotifyNumConnectionsChanged logic to private method. (Patrick Strateman) Pull request description: Working towards using poll() on unix like systems. A number of small changes designed to separate the actual socket handling from the rest of the logic in ThreadSocketHandler. This is a simpler version of #14147 Tree-SHA512: 72f35c8ef7649019dcbfe19537d8c9f7e3d0fc5854dc691a70c5573352230fc31c3f55565820c632e9b8cb3c55b878bed19e0ad9423100762197ac35967d8067
This commit is contained in:
commit
23419e4c49
2 changed files with 278 additions and 260 deletions
533
src/net.cpp
533
src/net.cpp
|
@ -1153,310 +1153,322 @@ void CConnman::AcceptConnection(const ListenSocket& hListenSocket) {
|
|||
}
|
||||
}
|
||||
|
||||
void CConnman::ThreadSocketHandler()
|
||||
void CConnman::DisconnectNodes()
|
||||
{
|
||||
unsigned int nPrevNodeCount = 0;
|
||||
while (!interruptNet)
|
||||
{
|
||||
//
|
||||
// Disconnect nodes
|
||||
//
|
||||
{
|
||||
LOCK(cs_vNodes);
|
||||
LOCK(cs_vNodes);
|
||||
|
||||
if (!fNetworkActive) {
|
||||
// Disconnect any connected nodes
|
||||
for (CNode* pnode : vNodes) {
|
||||
if (!pnode->fDisconnect) {
|
||||
LogPrint(BCLog::NET, "Network not active, dropping peer=%d\n", pnode->GetId());
|
||||
pnode->fDisconnect = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Disconnect unused nodes
|
||||
std::vector<CNode*> vNodesCopy = vNodes;
|
||||
for (CNode* pnode : vNodesCopy)
|
||||
{
|
||||
if (pnode->fDisconnect)
|
||||
{
|
||||
// remove from vNodes
|
||||
vNodes.erase(remove(vNodes.begin(), vNodes.end(), pnode), vNodes.end());
|
||||
|
||||
// release outbound grant (if any)
|
||||
pnode->grantOutbound.Release();
|
||||
|
||||
// close socket and cleanup
|
||||
pnode->CloseSocketDisconnect();
|
||||
|
||||
// hold in disconnected pool until all refs are released
|
||||
pnode->Release();
|
||||
vNodesDisconnected.push_back(pnode);
|
||||
if (!fNetworkActive) {
|
||||
// Disconnect any connected nodes
|
||||
for (CNode* pnode : vNodes) {
|
||||
if (!pnode->fDisconnect) {
|
||||
LogPrint(BCLog::NET, "Network not active, dropping peer=%d\n", pnode->GetId());
|
||||
pnode->fDisconnect = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Disconnect unused nodes
|
||||
std::vector<CNode*> vNodesCopy = vNodes;
|
||||
for (CNode* pnode : vNodesCopy)
|
||||
{
|
||||
// Delete disconnected nodes
|
||||
std::list<CNode*> vNodesDisconnectedCopy = vNodesDisconnected;
|
||||
for (CNode* pnode : vNodesDisconnectedCopy)
|
||||
if (pnode->fDisconnect)
|
||||
{
|
||||
// wait until threads are done using it
|
||||
if (pnode->GetRefCount() <= 0) {
|
||||
bool fDelete = false;
|
||||
{
|
||||
TRY_LOCK(pnode->cs_inventory, lockInv);
|
||||
if (lockInv) {
|
||||
TRY_LOCK(pnode->cs_vSend, lockSend);
|
||||
if (lockSend) {
|
||||
fDelete = true;
|
||||
}
|
||||
// remove from vNodes
|
||||
vNodes.erase(remove(vNodes.begin(), vNodes.end(), pnode), vNodes.end());
|
||||
|
||||
// release outbound grant (if any)
|
||||
pnode->grantOutbound.Release();
|
||||
|
||||
// close socket and cleanup
|
||||
pnode->CloseSocketDisconnect();
|
||||
|
||||
// hold in disconnected pool until all refs are released
|
||||
pnode->Release();
|
||||
vNodesDisconnected.push_back(pnode);
|
||||
}
|
||||
}
|
||||
}
|
||||
{
|
||||
// Delete disconnected nodes
|
||||
std::list<CNode*> vNodesDisconnectedCopy = vNodesDisconnected;
|
||||
for (CNode* pnode : vNodesDisconnectedCopy)
|
||||
{
|
||||
// wait until threads are done using it
|
||||
if (pnode->GetRefCount() <= 0) {
|
||||
bool fDelete = false;
|
||||
{
|
||||
TRY_LOCK(pnode->cs_inventory, lockInv);
|
||||
if (lockInv) {
|
||||
TRY_LOCK(pnode->cs_vSend, lockSend);
|
||||
if (lockSend) {
|
||||
fDelete = true;
|
||||
}
|
||||
}
|
||||
if (fDelete) {
|
||||
vNodesDisconnected.remove(pnode);
|
||||
DeleteNode(pnode);
|
||||
}
|
||||
}
|
||||
if (fDelete) {
|
||||
vNodesDisconnected.remove(pnode);
|
||||
DeleteNode(pnode);
|
||||
}
|
||||
}
|
||||
}
|
||||
size_t vNodesSize;
|
||||
}
|
||||
}
|
||||
|
||||
void CConnman::NotifyNumConnectionsChanged()
|
||||
{
|
||||
size_t vNodesSize;
|
||||
{
|
||||
LOCK(cs_vNodes);
|
||||
vNodesSize = vNodes.size();
|
||||
}
|
||||
if(vNodesSize != nPrevNodeCount) {
|
||||
nPrevNodeCount = vNodesSize;
|
||||
if(clientInterface)
|
||||
clientInterface->NotifyNumConnectionsChanged(vNodesSize);
|
||||
}
|
||||
}
|
||||
|
||||
void CConnman::InactivityCheck(CNode *pnode)
|
||||
{
|
||||
int64_t nTime = GetSystemTimeInSeconds();
|
||||
if (nTime - pnode->nTimeConnected > 60)
|
||||
{
|
||||
if (pnode->nLastRecv == 0 || pnode->nLastSend == 0)
|
||||
{
|
||||
LOCK(cs_vNodes);
|
||||
vNodesSize = vNodes.size();
|
||||
LogPrint(BCLog::NET, "socket no message in first 60 seconds, %d %d from %d\n", pnode->nLastRecv != 0, pnode->nLastSend != 0, pnode->GetId());
|
||||
pnode->fDisconnect = true;
|
||||
}
|
||||
if(vNodesSize != nPrevNodeCount) {
|
||||
nPrevNodeCount = vNodesSize;
|
||||
if(clientInterface)
|
||||
clientInterface->NotifyNumConnectionsChanged(vNodesSize);
|
||||
else if (nTime - pnode->nLastSend > TIMEOUT_INTERVAL)
|
||||
{
|
||||
LogPrintf("socket sending timeout: %is\n", nTime - pnode->nLastSend);
|
||||
pnode->fDisconnect = true;
|
||||
}
|
||||
else if (nTime - pnode->nLastRecv > (pnode->nVersion > BIP0031_VERSION ? TIMEOUT_INTERVAL : 90*60))
|
||||
{
|
||||
LogPrintf("socket receive timeout: %is\n", nTime - pnode->nLastRecv);
|
||||
pnode->fDisconnect = true;
|
||||
}
|
||||
else if (pnode->nPingNonceSent && pnode->nPingUsecStart + TIMEOUT_INTERVAL * 1000000 < GetTimeMicros())
|
||||
{
|
||||
LogPrintf("ping timeout: %fs\n", 0.000001 * (GetTimeMicros() - pnode->nPingUsecStart));
|
||||
pnode->fDisconnect = true;
|
||||
}
|
||||
else if (!pnode->fSuccessfullyConnected)
|
||||
{
|
||||
LogPrint(BCLog::NET, "version handshake timeout from %d\n", pnode->GetId());
|
||||
pnode->fDisconnect = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// Find which sockets have data to receive
|
||||
//
|
||||
struct timeval timeout;
|
||||
timeout.tv_sec = 0;
|
||||
timeout.tv_usec = 50000; // frequency to poll pnode->vSend
|
||||
void CConnman::SocketHandler()
|
||||
{
|
||||
//
|
||||
// Find which sockets have data to receive
|
||||
//
|
||||
struct timeval timeout;
|
||||
timeout.tv_sec = 0;
|
||||
timeout.tv_usec = 50000; // frequency to poll pnode->vSend
|
||||
|
||||
fd_set fdsetRecv;
|
||||
fd_set fdsetSend;
|
||||
fd_set fdsetError;
|
||||
FD_ZERO(&fdsetRecv);
|
||||
fd_set fdsetRecv;
|
||||
fd_set fdsetSend;
|
||||
fd_set fdsetError;
|
||||
FD_ZERO(&fdsetRecv);
|
||||
FD_ZERO(&fdsetSend);
|
||||
FD_ZERO(&fdsetError);
|
||||
SOCKET hSocketMax = 0;
|
||||
bool have_fds = false;
|
||||
|
||||
for (const ListenSocket& hListenSocket : vhListenSocket) {
|
||||
FD_SET(hListenSocket.socket, &fdsetRecv);
|
||||
hSocketMax = std::max(hSocketMax, hListenSocket.socket);
|
||||
have_fds = true;
|
||||
}
|
||||
|
||||
{
|
||||
LOCK(cs_vNodes);
|
||||
for (CNode* pnode : vNodes)
|
||||
{
|
||||
// Implement the following logic:
|
||||
// * If there is data to send, select() for sending data. As this only
|
||||
// happens when optimistic write failed, we choose to first drain the
|
||||
// write buffer in this case before receiving more. This avoids
|
||||
// needlessly queueing received data, if the remote peer is not themselves
|
||||
// receiving data. This means properly utilizing TCP flow control signalling.
|
||||
// * Otherwise, if there is space left in the receive buffer, select() for
|
||||
// receiving data.
|
||||
// * Hand off all complete messages to the processor, to be handled without
|
||||
// blocking here.
|
||||
|
||||
bool select_recv = !pnode->fPauseRecv;
|
||||
bool select_send;
|
||||
{
|
||||
LOCK(pnode->cs_vSend);
|
||||
select_send = !pnode->vSendMsg.empty();
|
||||
}
|
||||
|
||||
LOCK(pnode->cs_hSocket);
|
||||
if (pnode->hSocket == INVALID_SOCKET)
|
||||
continue;
|
||||
|
||||
FD_SET(pnode->hSocket, &fdsetError);
|
||||
hSocketMax = std::max(hSocketMax, pnode->hSocket);
|
||||
have_fds = true;
|
||||
|
||||
if (select_send) {
|
||||
FD_SET(pnode->hSocket, &fdsetSend);
|
||||
continue;
|
||||
}
|
||||
if (select_recv) {
|
||||
FD_SET(pnode->hSocket, &fdsetRecv);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
int nSelect = select(have_fds ? hSocketMax + 1 : 0,
|
||||
&fdsetRecv, &fdsetSend, &fdsetError, &timeout);
|
||||
if (interruptNet)
|
||||
return;
|
||||
|
||||
if (nSelect == SOCKET_ERROR)
|
||||
{
|
||||
if (have_fds)
|
||||
{
|
||||
int nErr = WSAGetLastError();
|
||||
LogPrintf("socket select error %s\n", NetworkErrorString(nErr));
|
||||
for (unsigned int i = 0; i <= hSocketMax; i++)
|
||||
FD_SET(i, &fdsetRecv);
|
||||
}
|
||||
FD_ZERO(&fdsetSend);
|
||||
FD_ZERO(&fdsetError);
|
||||
SOCKET hSocketMax = 0;
|
||||
bool have_fds = false;
|
||||
|
||||
for (const ListenSocket& hListenSocket : vhListenSocket) {
|
||||
FD_SET(hListenSocket.socket, &fdsetRecv);
|
||||
hSocketMax = std::max(hSocketMax, hListenSocket.socket);
|
||||
have_fds = true;
|
||||
}
|
||||
if (!interruptNet.sleep_for(std::chrono::milliseconds(timeout.tv_usec/1000)))
|
||||
return;
|
||||
}
|
||||
|
||||
//
|
||||
// Accept new connections
|
||||
//
|
||||
for (const ListenSocket& hListenSocket : vhListenSocket)
|
||||
{
|
||||
if (hListenSocket.socket != INVALID_SOCKET && FD_ISSET(hListenSocket.socket, &fdsetRecv))
|
||||
{
|
||||
LOCK(cs_vNodes);
|
||||
for (CNode* pnode : vNodes)
|
||||
{
|
||||
// Implement the following logic:
|
||||
// * If there is data to send, select() for sending data. As this only
|
||||
// happens when optimistic write failed, we choose to first drain the
|
||||
// write buffer in this case before receiving more. This avoids
|
||||
// needlessly queueing received data, if the remote peer is not themselves
|
||||
// receiving data. This means properly utilizing TCP flow control signalling.
|
||||
// * Otherwise, if there is space left in the receive buffer, select() for
|
||||
// receiving data.
|
||||
// * Hand off all complete messages to the processor, to be handled without
|
||||
// blocking here.
|
||||
|
||||
bool select_recv = !pnode->fPauseRecv;
|
||||
bool select_send;
|
||||
{
|
||||
LOCK(pnode->cs_vSend);
|
||||
select_send = !pnode->vSendMsg.empty();
|
||||
}
|
||||
|
||||
LOCK(pnode->cs_hSocket);
|
||||
if (pnode->hSocket == INVALID_SOCKET)
|
||||
continue;
|
||||
|
||||
FD_SET(pnode->hSocket, &fdsetError);
|
||||
hSocketMax = std::max(hSocketMax, pnode->hSocket);
|
||||
have_fds = true;
|
||||
|
||||
if (select_send) {
|
||||
FD_SET(pnode->hSocket, &fdsetSend);
|
||||
continue;
|
||||
}
|
||||
if (select_recv) {
|
||||
FD_SET(pnode->hSocket, &fdsetRecv);
|
||||
}
|
||||
}
|
||||
AcceptConnection(hListenSocket);
|
||||
}
|
||||
}
|
||||
|
||||
int nSelect = select(have_fds ? hSocketMax + 1 : 0,
|
||||
&fdsetRecv, &fdsetSend, &fdsetError, &timeout);
|
||||
//
|
||||
// Service each socket
|
||||
//
|
||||
std::vector<CNode*> vNodesCopy;
|
||||
{
|
||||
LOCK(cs_vNodes);
|
||||
vNodesCopy = vNodes;
|
||||
for (CNode* pnode : vNodesCopy)
|
||||
pnode->AddRef();
|
||||
}
|
||||
for (CNode* pnode : vNodesCopy)
|
||||
{
|
||||
if (interruptNet)
|
||||
return;
|
||||
|
||||
if (nSelect == SOCKET_ERROR)
|
||||
//
|
||||
// Receive
|
||||
//
|
||||
bool recvSet = false;
|
||||
bool sendSet = false;
|
||||
bool errorSet = false;
|
||||
{
|
||||
if (have_fds)
|
||||
{
|
||||
int nErr = WSAGetLastError();
|
||||
LogPrintf("socket select error %s\n", NetworkErrorString(nErr));
|
||||
for (unsigned int i = 0; i <= hSocketMax; i++)
|
||||
FD_SET(i, &fdsetRecv);
|
||||
}
|
||||
FD_ZERO(&fdsetSend);
|
||||
FD_ZERO(&fdsetError);
|
||||
if (!interruptNet.sleep_for(std::chrono::milliseconds(timeout.tv_usec/1000)))
|
||||
return;
|
||||
LOCK(pnode->cs_hSocket);
|
||||
if (pnode->hSocket == INVALID_SOCKET)
|
||||
continue;
|
||||
recvSet = FD_ISSET(pnode->hSocket, &fdsetRecv);
|
||||
sendSet = FD_ISSET(pnode->hSocket, &fdsetSend);
|
||||
errorSet = FD_ISSET(pnode->hSocket, &fdsetError);
|
||||
}
|
||||
|
||||
//
|
||||
// Accept new connections
|
||||
//
|
||||
for (const ListenSocket& hListenSocket : vhListenSocket)
|
||||
if (recvSet || errorSet)
|
||||
{
|
||||
if (hListenSocket.socket != INVALID_SOCKET && FD_ISSET(hListenSocket.socket, &fdsetRecv))
|
||||
{
|
||||
AcceptConnection(hListenSocket);
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// Service each socket
|
||||
//
|
||||
std::vector<CNode*> vNodesCopy;
|
||||
{
|
||||
LOCK(cs_vNodes);
|
||||
vNodesCopy = vNodes;
|
||||
for (CNode* pnode : vNodesCopy)
|
||||
pnode->AddRef();
|
||||
}
|
||||
for (CNode* pnode : vNodesCopy)
|
||||
{
|
||||
if (interruptNet)
|
||||
return;
|
||||
|
||||
//
|
||||
// Receive
|
||||
//
|
||||
bool recvSet = false;
|
||||
bool sendSet = false;
|
||||
bool errorSet = false;
|
||||
// typical socket buffer is 8K-64K
|
||||
char pchBuf[0x10000];
|
||||
int nBytes = 0;
|
||||
{
|
||||
LOCK(pnode->cs_hSocket);
|
||||
if (pnode->hSocket == INVALID_SOCKET)
|
||||
continue;
|
||||
recvSet = FD_ISSET(pnode->hSocket, &fdsetRecv);
|
||||
sendSet = FD_ISSET(pnode->hSocket, &fdsetSend);
|
||||
errorSet = FD_ISSET(pnode->hSocket, &fdsetError);
|
||||
nBytes = recv(pnode->hSocket, pchBuf, sizeof(pchBuf), MSG_DONTWAIT);
|
||||
}
|
||||
if (recvSet || errorSet)
|
||||
if (nBytes > 0)
|
||||
{
|
||||
// typical socket buffer is 8K-64K
|
||||
char pchBuf[0x10000];
|
||||
int nBytes = 0;
|
||||
{
|
||||
LOCK(pnode->cs_hSocket);
|
||||
if (pnode->hSocket == INVALID_SOCKET)
|
||||
continue;
|
||||
nBytes = recv(pnode->hSocket, pchBuf, sizeof(pchBuf), MSG_DONTWAIT);
|
||||
}
|
||||
if (nBytes > 0)
|
||||
{
|
||||
bool notify = false;
|
||||
if (!pnode->ReceiveMsgBytes(pchBuf, nBytes, notify))
|
||||
pnode->CloseSocketDisconnect();
|
||||
RecordBytesRecv(nBytes);
|
||||
if (notify) {
|
||||
size_t nSizeAdded = 0;
|
||||
auto it(pnode->vRecvMsg.begin());
|
||||
for (; it != pnode->vRecvMsg.end(); ++it) {
|
||||
if (!it->complete())
|
||||
break;
|
||||
nSizeAdded += it->vRecv.size() + CMessageHeader::HEADER_SIZE;
|
||||
}
|
||||
{
|
||||
LOCK(pnode->cs_vProcessMsg);
|
||||
pnode->vProcessMsg.splice(pnode->vProcessMsg.end(), pnode->vRecvMsg, pnode->vRecvMsg.begin(), it);
|
||||
pnode->nProcessQueueSize += nSizeAdded;
|
||||
pnode->fPauseRecv = pnode->nProcessQueueSize > nReceiveFloodSize;
|
||||
}
|
||||
WakeMessageHandler();
|
||||
bool notify = false;
|
||||
if (!pnode->ReceiveMsgBytes(pchBuf, nBytes, notify))
|
||||
pnode->CloseSocketDisconnect();
|
||||
RecordBytesRecv(nBytes);
|
||||
if (notify) {
|
||||
size_t nSizeAdded = 0;
|
||||
auto it(pnode->vRecvMsg.begin());
|
||||
for (; it != pnode->vRecvMsg.end(); ++it) {
|
||||
if (!it->complete())
|
||||
break;
|
||||
nSizeAdded += it->vRecv.size() + CMessageHeader::HEADER_SIZE;
|
||||
}
|
||||
}
|
||||
else if (nBytes == 0)
|
||||
{
|
||||
// socket closed gracefully
|
||||
if (!pnode->fDisconnect) {
|
||||
LogPrint(BCLog::NET, "socket closed\n");
|
||||
{
|
||||
LOCK(pnode->cs_vProcessMsg);
|
||||
pnode->vProcessMsg.splice(pnode->vProcessMsg.end(), pnode->vRecvMsg, pnode->vRecvMsg.begin(), it);
|
||||
pnode->nProcessQueueSize += nSizeAdded;
|
||||
pnode->fPauseRecv = pnode->nProcessQueueSize > nReceiveFloodSize;
|
||||
}
|
||||
WakeMessageHandler();
|
||||
}
|
||||
}
|
||||
else if (nBytes == 0)
|
||||
{
|
||||
// socket closed gracefully
|
||||
if (!pnode->fDisconnect) {
|
||||
LogPrint(BCLog::NET, "socket closed\n");
|
||||
}
|
||||
pnode->CloseSocketDisconnect();
|
||||
}
|
||||
else if (nBytes < 0)
|
||||
{
|
||||
// error
|
||||
int nErr = WSAGetLastError();
|
||||
if (nErr != WSAEWOULDBLOCK && nErr != WSAEMSGSIZE && nErr != WSAEINTR && nErr != WSAEINPROGRESS)
|
||||
{
|
||||
if (!pnode->fDisconnect)
|
||||
LogPrintf("socket recv error %s\n", NetworkErrorString(nErr));
|
||||
pnode->CloseSocketDisconnect();
|
||||
}
|
||||
else if (nBytes < 0)
|
||||
{
|
||||
// error
|
||||
int nErr = WSAGetLastError();
|
||||
if (nErr != WSAEWOULDBLOCK && nErr != WSAEMSGSIZE && nErr != WSAEINTR && nErr != WSAEINPROGRESS)
|
||||
{
|
||||
if (!pnode->fDisconnect)
|
||||
LogPrintf("socket recv error %s\n", NetworkErrorString(nErr));
|
||||
pnode->CloseSocketDisconnect();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// Send
|
||||
//
|
||||
if (sendSet)
|
||||
{
|
||||
LOCK(pnode->cs_vSend);
|
||||
size_t nBytes = SocketSendData(pnode);
|
||||
if (nBytes) {
|
||||
RecordBytesSent(nBytes);
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// Inactivity checking
|
||||
//
|
||||
int64_t nTime = GetSystemTimeInSeconds();
|
||||
if (nTime - pnode->nTimeConnected > 60)
|
||||
{
|
||||
if (pnode->nLastRecv == 0 || pnode->nLastSend == 0)
|
||||
{
|
||||
LogPrint(BCLog::NET, "socket no message in first 60 seconds, %d %d from %d\n", pnode->nLastRecv != 0, pnode->nLastSend != 0, pnode->GetId());
|
||||
pnode->fDisconnect = true;
|
||||
}
|
||||
else if (nTime - pnode->nLastSend > TIMEOUT_INTERVAL)
|
||||
{
|
||||
LogPrintf("socket sending timeout: %is\n", nTime - pnode->nLastSend);
|
||||
pnode->fDisconnect = true;
|
||||
}
|
||||
else if (nTime - pnode->nLastRecv > (pnode->nVersion > BIP0031_VERSION ? TIMEOUT_INTERVAL : 90*60))
|
||||
{
|
||||
LogPrintf("socket receive timeout: %is\n", nTime - pnode->nLastRecv);
|
||||
pnode->fDisconnect = true;
|
||||
}
|
||||
else if (pnode->nPingNonceSent && pnode->nPingUsecStart + TIMEOUT_INTERVAL * 1000000 < GetTimeMicros())
|
||||
{
|
||||
LogPrintf("ping timeout: %fs\n", 0.000001 * (GetTimeMicros() - pnode->nPingUsecStart));
|
||||
pnode->fDisconnect = true;
|
||||
}
|
||||
else if (!pnode->fSuccessfullyConnected)
|
||||
{
|
||||
LogPrint(BCLog::NET, "version handshake timeout from %d\n", pnode->GetId());
|
||||
pnode->fDisconnect = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// Send
|
||||
//
|
||||
if (sendSet)
|
||||
{
|
||||
LOCK(cs_vNodes);
|
||||
for (CNode* pnode : vNodesCopy)
|
||||
pnode->Release();
|
||||
LOCK(pnode->cs_vSend);
|
||||
size_t nBytes = SocketSendData(pnode);
|
||||
if (nBytes) {
|
||||
RecordBytesSent(nBytes);
|
||||
}
|
||||
}
|
||||
|
||||
InactivityCheck(pnode);
|
||||
}
|
||||
{
|
||||
LOCK(cs_vNodes);
|
||||
for (CNode* pnode : vNodesCopy)
|
||||
pnode->Release();
|
||||
}
|
||||
}
|
||||
|
||||
void CConnman::ThreadSocketHandler()
|
||||
{
|
||||
while (!interruptNet)
|
||||
{
|
||||
DisconnectNodes();
|
||||
NotifyNumConnectionsChanged();
|
||||
SocketHandler();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2217,6 +2229,7 @@ CConnman::CConnman(uint64_t nSeed0In, uint64_t nSeed1In) : nSeed0(nSeed0In), nSe
|
|||
setBannedIsDirty = false;
|
||||
fAddressesInitialized = false;
|
||||
nLastNodeId = 0;
|
||||
nPrevNodeCount = 0;
|
||||
nSendBufferMaxSize = 0;
|
||||
nReceiveFloodSize = 0;
|
||||
flagInterruptMsgProc = false;
|
||||
|
|
|
@ -338,6 +338,10 @@ private:
|
|||
void ThreadOpenConnections(std::vector<std::string> connect);
|
||||
void ThreadMessageHandler();
|
||||
void AcceptConnection(const ListenSocket& hListenSocket);
|
||||
void DisconnectNodes();
|
||||
void NotifyNumConnectionsChanged();
|
||||
void InactivityCheck(CNode *pnode);
|
||||
void SocketHandler();
|
||||
void ThreadSocketHandler();
|
||||
void ThreadDNSAddressSeed();
|
||||
|
||||
|
@ -408,6 +412,7 @@ private:
|
|||
std::list<CNode*> vNodesDisconnected;
|
||||
mutable CCriticalSection cs_vNodes;
|
||||
std::atomic<NodeId> nLastNodeId;
|
||||
unsigned int nPrevNodeCount;
|
||||
|
||||
/** Services this instance offers */
|
||||
ServiceFlags nLocalServices;
|
||||
|
|
Loading…
Reference in a new issue