P2P: improve RX/TX flow control
1) "optimistic write": Push each message to kernel socket buffer immediately. 2) If there is write data at select time, that implies send() blocked during optimistic write. Drain write queue, before receiving any more messages. This avoids needlessly queueing received data, if the remote peer is not themselves receiving data. Result: write buffer (and thus memory usage) is kept small, DoS potential is slightly lower, and TCP flow control signalling is properly utilized. The kernel will queue data into the socket buffer, then signal the remote peer to stop sending data, until we resume reading again.
This commit is contained in:
parent
bc2f5aa72c
commit
b9ff2970b9
2 changed files with 15 additions and 6 deletions
12
src/net.cpp
12
src/net.cpp
|
@ -855,14 +855,18 @@ void ThreadSocketHandler2(void* parg)
|
||||||
{
|
{
|
||||||
if (pnode->hSocket == INVALID_SOCKET)
|
if (pnode->hSocket == INVALID_SOCKET)
|
||||||
continue;
|
continue;
|
||||||
|
{
|
||||||
|
TRY_LOCK(pnode->cs_vSend, lockSend);
|
||||||
|
if (lockSend) {
|
||||||
|
// do not read, if draining write queue
|
||||||
|
if (!pnode->vSend.empty())
|
||||||
|
FD_SET(pnode->hSocket, &fdsetSend);
|
||||||
|
else
|
||||||
FD_SET(pnode->hSocket, &fdsetRecv);
|
FD_SET(pnode->hSocket, &fdsetRecv);
|
||||||
FD_SET(pnode->hSocket, &fdsetError);
|
FD_SET(pnode->hSocket, &fdsetError);
|
||||||
hSocketMax = max(hSocketMax, pnode->hSocket);
|
hSocketMax = max(hSocketMax, pnode->hSocket);
|
||||||
have_fds = true;
|
have_fds = true;
|
||||||
{
|
}
|
||||||
TRY_LOCK(pnode->cs_vSend, lockSend);
|
|
||||||
if (lockSend && !pnode->vSend.empty())
|
|
||||||
FD_SET(pnode->hSocket, &fdsetSend);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -42,6 +42,7 @@ unsigned short GetListenPort();
|
||||||
bool BindListenPort(const CService &bindAddr, std::string& strError=REF(std::string()));
|
bool BindListenPort(const CService &bindAddr, std::string& strError=REF(std::string()));
|
||||||
void StartNode(void* parg);
|
void StartNode(void* parg);
|
||||||
bool StopNode();
|
bool StopNode();
|
||||||
|
void SocketSendData(CNode *pnode);
|
||||||
|
|
||||||
enum
|
enum
|
||||||
{
|
{
|
||||||
|
@ -437,6 +438,10 @@ public:
|
||||||
printf("(%d bytes)\n", nSize);
|
printf("(%d bytes)\n", nSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// If write queue empty, attempt "optimistic write"
|
||||||
|
if (nHeaderStart == 0)
|
||||||
|
SocketSendData(this);
|
||||||
|
|
||||||
nHeaderStart = -1;
|
nHeaderStart = -1;
|
||||||
nMessageStart = -1;
|
nMessageStart = -1;
|
||||||
LEAVE_CRITICAL_SECTION(cs_vSend);
|
LEAVE_CRITICAL_SECTION(cs_vSend);
|
||||||
|
|
Loading…
Reference in a new issue