Merge #9441: Net: Massive speedup. Net locks overhaul

e60360e net: remove cs_vRecvMsg (Cory Fields)
991955e net: add a flag to indicate when a node's send buffer is full (Cory Fields)
c6e8a9b net: add a flag to indicate when a node's process queue is full (Cory Fields)
4d712e3 net: add a new message queue for the message processor (Cory Fields)
c5a8b1b net: rework the way that the messagehandler sleeps (Cory Fields)
c72cc88 net: remove useless comments (Cory Fields)
ef7b5ec net: Add a simple function for waking the message handler (Cory Fields)
f5c36d1 net: record bytes written before notifying the message processor (Cory Fields)
60befa3 net: handle message accounting in ReceiveMsgBytes (Cory Fields)
56212e2 net: set message deserialization version when it's actually time to deserialize (Cory Fields)
0e973d9 net: remove redundant max sendbuffer size check (Cory Fields)
6042587 net: wait until the node is destroyed to delete its recv buffer (Cory Fields)
f6315e0 net: only disconnect if fDisconnect has been set (Cory Fields)
5b4a8ac net: make GetReceiveFloodSize public (Cory Fields)
e5bcd9c net: make vRecvMsg a list so that we can use splice() (Cory Fields)
53ad9a1 net: fix typo causing the wrong receive buffer size (Cory Fields)
This commit is contained in:
Pieter Wuille 2017-01-13 09:51:04 -08:00
commit 8b66bf74e2
No known key found for this signature in database
GPG key ID: DBA1A67379A1A931
4 changed files with 106 additions and 124 deletions

View file

@ -437,11 +437,6 @@ void CNode::CloseSocketDisconnect()
LogPrint("net", "disconnecting peer=%d\n", id); LogPrint("net", "disconnecting peer=%d\n", id);
CloseSocket(hSocket); CloseSocket(hSocket);
} }
// in case this fails, we'll empty the recv buffer when the CNode is deleted
TRY_LOCK(cs_vRecvMsg, lockRecv);
if (lockRecv)
vRecvMsg.clear();
} }
void CConnman::ClearBanned() void CConnman::ClearBanned()
@ -650,16 +645,18 @@ void CNode::copyStats(CNodeStats &stats)
} }
#undef X #undef X
// requires LOCK(cs_vRecvMsg)
bool CNode::ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool& complete) bool CNode::ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool& complete)
{ {
complete = false; complete = false;
int64_t nTimeMicros = GetTimeMicros();
nLastRecv = nTimeMicros / 1000000;
nRecvBytes += nBytes;
while (nBytes > 0) { while (nBytes > 0) {
// get current incomplete message, or create a new one // get current incomplete message, or create a new one
if (vRecvMsg.empty() || if (vRecvMsg.empty() ||
vRecvMsg.back().complete()) vRecvMsg.back().complete())
vRecvMsg.push_back(CNetMessage(Params().MessageStart(), SER_NETWORK, nRecvVersion)); vRecvMsg.push_back(CNetMessage(Params().MessageStart(), SER_NETWORK, INIT_PROTO_VERSION));
CNetMessage& msg = vRecvMsg.back(); CNetMessage& msg = vRecvMsg.back();
@ -691,7 +688,7 @@ bool CNode::ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool& complete
assert(i != mapRecvBytesPerMsgCmd.end()); assert(i != mapRecvBytesPerMsgCmd.end());
i->second += msg.hdr.nMessageSize + CMessageHeader::HEADER_SIZE; i->second += msg.hdr.nMessageSize + CMessageHeader::HEADER_SIZE;
msg.nTime = GetTimeMicros(); msg.nTime = nTimeMicros;
complete = true; complete = true;
} }
} }
@ -764,7 +761,7 @@ const uint256& CNetMessage::GetMessageHash() const
// requires LOCK(cs_vSend) // requires LOCK(cs_vSend)
size_t SocketSendData(CNode *pnode) size_t CConnman::SocketSendData(CNode *pnode)
{ {
auto it = pnode->vSendMsg.begin(); auto it = pnode->vSendMsg.begin();
size_t nSentSize = 0; size_t nSentSize = 0;
@ -781,6 +778,7 @@ size_t SocketSendData(CNode *pnode)
if (pnode->nSendOffset == data.size()) { if (pnode->nSendOffset == data.size()) {
pnode->nSendOffset = 0; pnode->nSendOffset = 0;
pnode->nSendSize -= data.size(); pnode->nSendSize -= data.size();
pnode->fPauseSend = pnode->nSendSize > nSendBufferMaxSize;
it++; it++;
} else { } else {
// could not send full message; stop sending more // could not send full message; stop sending more
@ -1052,8 +1050,7 @@ void CConnman::ThreadSocketHandler()
std::vector<CNode*> vNodesCopy = vNodes; std::vector<CNode*> vNodesCopy = vNodes;
BOOST_FOREACH(CNode* pnode, vNodesCopy) BOOST_FOREACH(CNode* pnode, vNodesCopy)
{ {
if (pnode->fDisconnect || if (pnode->fDisconnect)
(pnode->GetRefCount() <= 0 && pnode->vRecvMsg.empty() && pnode->nSendSize == 0))
{ {
// remove from vNodes // remove from vNodes
vNodes.erase(remove(vNodes.begin(), vNodes.end(), pnode), vNodes.end()); vNodes.erase(remove(vNodes.begin(), vNodes.end(), pnode), vNodes.end());
@ -1083,13 +1080,9 @@ void CConnman::ThreadSocketHandler()
TRY_LOCK(pnode->cs_vSend, lockSend); TRY_LOCK(pnode->cs_vSend, lockSend);
if (lockSend) if (lockSend)
{ {
TRY_LOCK(pnode->cs_vRecvMsg, lockRecv);
if (lockRecv)
{
TRY_LOCK(pnode->cs_inventory, lockInv); TRY_LOCK(pnode->cs_inventory, lockInv);
if (lockInv) if (lockInv)
fDelete = true; fDelete = true;
}
} }
} }
if (fDelete) if (fDelete)
@ -1149,15 +1142,10 @@ void CConnman::ThreadSocketHandler()
// write buffer in this case before receiving more. This avoids // write buffer in this case before receiving more. This avoids
// needlessly queueing received data, if the remote peer is not themselves // needlessly queueing received data, if the remote peer is not themselves
// receiving data. This means properly utilizing TCP flow control signalling. // receiving data. This means properly utilizing TCP flow control signalling.
// * Otherwise, if there is no (complete) message in the receive buffer, // * Otherwise, if there is space left in the receive buffer, select() for
// or there is space left in the buffer, select() for receiving data. // receiving data.
// * (if neither of the above applies, there is certainly one message // * Hand off all complete messages to the processor, to be handled without
// in the receiver buffer ready to be processed). // blocking here.
// Together, that means that at least one of the following is always possible,
// so we don't deadlock:
// * We send some data.
// * We wait for data to be received (and disconnect after timeout).
// * We process a message in the buffer (message handler thread).
{ {
TRY_LOCK(pnode->cs_vSend, lockSend); TRY_LOCK(pnode->cs_vSend, lockSend);
if (lockSend) { if (lockSend) {
@ -1168,10 +1156,7 @@ void CConnman::ThreadSocketHandler()
} }
} }
{ {
TRY_LOCK(pnode->cs_vRecvMsg, lockRecv); if (!pnode->fPauseRecv)
if (lockRecv && (
pnode->vRecvMsg.empty() || !pnode->vRecvMsg.front().complete() ||
pnode->GetTotalRecvSize() <= GetReceiveFloodSize()))
FD_SET(pnode->hSocket, &fdsetRecv); FD_SET(pnode->hSocket, &fdsetRecv);
} }
} }
@ -1230,8 +1215,6 @@ void CConnman::ThreadSocketHandler()
continue; continue;
if (FD_ISSET(pnode->hSocket, &fdsetRecv) || FD_ISSET(pnode->hSocket, &fdsetError)) if (FD_ISSET(pnode->hSocket, &fdsetRecv) || FD_ISSET(pnode->hSocket, &fdsetError))
{ {
TRY_LOCK(pnode->cs_vRecvMsg, lockRecv);
if (lockRecv)
{ {
{ {
// typical socket buffer is 8K-64K // typical socket buffer is 8K-64K
@ -1242,11 +1225,23 @@ void CConnman::ThreadSocketHandler()
bool notify = false; bool notify = false;
if (!pnode->ReceiveMsgBytes(pchBuf, nBytes, notify)) if (!pnode->ReceiveMsgBytes(pchBuf, nBytes, notify))
pnode->CloseSocketDisconnect(); pnode->CloseSocketDisconnect();
if(notify)
condMsgProc.notify_one();
pnode->nLastRecv = GetTime();
pnode->nRecvBytes += nBytes;
RecordBytesRecv(nBytes); RecordBytesRecv(nBytes);
if (notify) {
size_t nSizeAdded = 0;
auto it(pnode->vRecvMsg.begin());
for (; it != pnode->vRecvMsg.end(); ++it) {
if (!it->complete())
break;
nSizeAdded += it->vRecv.size() + CMessageHeader::HEADER_SIZE;
}
{
LOCK(pnode->cs_vProcessMsg);
pnode->vProcessMsg.splice(pnode->vProcessMsg.end(), pnode->vRecvMsg, pnode->vRecvMsg.begin(), it);
pnode->nProcessQueueSize += nSizeAdded;
pnode->fPauseRecv = pnode->nProcessQueueSize > nReceiveFloodSize;
}
WakeMessageHandler();
}
} }
else if (nBytes == 0) else if (nBytes == 0)
{ {
@ -1280,8 +1275,9 @@ void CConnman::ThreadSocketHandler()
TRY_LOCK(pnode->cs_vSend, lockSend); TRY_LOCK(pnode->cs_vSend, lockSend);
if (lockSend) { if (lockSend) {
size_t nBytes = SocketSendData(pnode); size_t nBytes = SocketSendData(pnode);
if (nBytes) if (nBytes) {
RecordBytesSent(nBytes); RecordBytesSent(nBytes);
}
} }
} }
@ -1321,8 +1317,14 @@ void CConnman::ThreadSocketHandler()
} }
} }
void CConnman::WakeMessageHandler()
{
{
std::lock_guard<std::mutex> lock(mutexMsgProc);
fMsgProcWake = true;
}
condMsgProc.notify_one();
}
@ -1858,7 +1860,7 @@ void CConnman::ThreadMessageHandler()
} }
} }
bool fSleep = true; bool fMoreWork = false;
BOOST_FOREACH(CNode* pnode, vNodesCopy) BOOST_FOREACH(CNode* pnode, vNodesCopy)
{ {
@ -1866,22 +1868,8 @@ void CConnman::ThreadMessageHandler()
continue; continue;
// Receive messages // Receive messages
{ bool fMoreNodeWork = GetNodeSignals().ProcessMessages(pnode, *this, flagInterruptMsgProc);
TRY_LOCK(pnode->cs_vRecvMsg, lockRecv); fMoreWork |= (fMoreNodeWork && !pnode->fPauseSend);
if (lockRecv)
{
if (!GetNodeSignals().ProcessMessages(pnode, *this, flagInterruptMsgProc))
pnode->CloseSocketDisconnect();
if (pnode->nSendSize < GetSendBufferSize())
{
if (!pnode->vRecvGetData.empty() || (!pnode->vRecvMsg.empty() && pnode->vRecvMsg[0].complete()))
{
fSleep = false;
}
}
}
}
if (flagInterruptMsgProc) if (flagInterruptMsgProc)
return; return;
@ -1901,10 +1889,11 @@ void CConnman::ThreadMessageHandler()
pnode->Release(); pnode->Release();
} }
if (fSleep) { std::unique_lock<std::mutex> lock(mutexMsgProc);
std::unique_lock<std::mutex> lock(mutexMsgProc); if (!fMoreWork) {
condMsgProc.wait_until(lock, std::chrono::steady_clock::now() + std::chrono::milliseconds(100)); condMsgProc.wait_until(lock, std::chrono::steady_clock::now() + std::chrono::milliseconds(100), [this] { return fMsgProcWake; });
} }
fMsgProcWake = false;
} }
} }
@ -2121,7 +2110,7 @@ bool CConnman::Start(CScheduler& scheduler, std::string& strNodeError, Options c
nMaxFeeler = connOptions.nMaxFeeler; nMaxFeeler = connOptions.nMaxFeeler;
nSendBufferMaxSize = connOptions.nSendBufferMaxSize; nSendBufferMaxSize = connOptions.nSendBufferMaxSize;
nReceiveFloodSize = connOptions.nSendBufferMaxSize; nReceiveFloodSize = connOptions.nReceiveFloodSize;
nMaxOutboundLimit = connOptions.nMaxOutboundLimit; nMaxOutboundLimit = connOptions.nMaxOutboundLimit;
nMaxOutboundTimeframe = connOptions.nMaxOutboundTimeframe; nMaxOutboundTimeframe = connOptions.nMaxOutboundTimeframe;
@ -2182,6 +2171,11 @@ bool CConnman::Start(CScheduler& scheduler, std::string& strNodeError, Options c
interruptNet.reset(); interruptNet.reset();
flagInterruptMsgProc = false; flagInterruptMsgProc = false;
{
std::unique_lock<std::mutex> lock(mutexMsgProc);
fMsgProcWake = false;
}
// Send and receive from sockets, accept connections // Send and receive from sockets, accept connections
threadSocketHandler = std::thread(&TraceThread<std::function<void()> >, "net", std::function<void()>(std::bind(&CConnman::ThreadSocketHandler, this))); threadSocketHandler = std::thread(&TraceThread<std::function<void()> >, "net", std::function<void()>(std::bind(&CConnman::ThreadSocketHandler, this)));
@ -2613,6 +2607,9 @@ CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn
minFeeFilter = 0; minFeeFilter = 0;
lastSentFeeFilter = 0; lastSentFeeFilter = 0;
nextSendTimeFeeFilter = 0; nextSendTimeFeeFilter = 0;
fPauseRecv = false;
fPauseSend = false;
nProcessQueueSize = 0;
BOOST_FOREACH(const std::string &msg, getAllNetMessageTypes()) BOOST_FOREACH(const std::string &msg, getAllNetMessageTypes())
mapRecvBytesPerMsgCmd[msg] = 0; mapRecvBytesPerMsgCmd[msg] = 0;
@ -2692,6 +2689,8 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
pnode->mapSendBytesPerMsgCmd[msg.command] += nTotalSize; pnode->mapSendBytesPerMsgCmd[msg.command] += nTotalSize;
pnode->nSendSize += nTotalSize; pnode->nSendSize += nTotalSize;
if (pnode->nSendSize > nSendBufferMaxSize)
pnode->fPauseSend = true;
pnode->vSendMsg.push_back(std::move(serializedHeader)); pnode->vSendMsg.push_back(std::move(serializedHeader));
if (nMessageSize) if (nMessageSize)
pnode->vSendMsg.push_back(std::move(msg.data)); pnode->vSendMsg.push_back(std::move(msg.data));

View file

@ -327,6 +327,7 @@ public:
/** Get a unique deterministic randomizer. */ /** Get a unique deterministic randomizer. */
CSipHasher GetDeterministicRandomizer(uint64_t id); CSipHasher GetDeterministicRandomizer(uint64_t id);
unsigned int GetReceiveFloodSize() const;
private: private:
struct ListenSocket { struct ListenSocket {
SOCKET socket; SOCKET socket;
@ -343,6 +344,8 @@ private:
void ThreadSocketHandler(); void ThreadSocketHandler();
void ThreadDNSAddressSeed(); void ThreadDNSAddressSeed();
void WakeMessageHandler();
uint64_t CalculateKeyedNetGroup(const CAddress& ad); uint64_t CalculateKeyedNetGroup(const CAddress& ad);
CNode* FindNode(const CNetAddr& ip); CNode* FindNode(const CNetAddr& ip);
@ -358,6 +361,7 @@ private:
NodeId GetNewNodeId(); NodeId GetNewNodeId();
size_t SocketSendData(CNode *pnode);
//!check is the banlist has unwritten changes //!check is the banlist has unwritten changes
bool BannedSetIsDirty(); bool BannedSetIsDirty();
//!set the "dirty" flag for the banlist //!set the "dirty" flag for the banlist
@ -368,8 +372,6 @@ private:
void DumpData(); void DumpData();
void DumpBanlist(); void DumpBanlist();
unsigned int GetReceiveFloodSize() const;
// Network stats // Network stats
void RecordBytesRecv(uint64_t bytes); void RecordBytesRecv(uint64_t bytes);
void RecordBytesSent(uint64_t bytes); void RecordBytesSent(uint64_t bytes);
@ -428,6 +430,9 @@ private:
/** SipHasher seeds for deterministic randomness */ /** SipHasher seeds for deterministic randomness */
const uint64_t nSeed0, nSeed1; const uint64_t nSeed0, nSeed1;
/** flag for waking the message processor. */
bool fMsgProcWake;
std::condition_variable condMsgProc; std::condition_variable condMsgProc;
std::mutex mutexMsgProc; std::mutex mutexMsgProc;
std::atomic<bool> flagInterruptMsgProc; std::atomic<bool> flagInterruptMsgProc;
@ -445,7 +450,6 @@ void Discover(boost::thread_group& threadGroup);
void MapPort(bool fUseUPnP); void MapPort(bool fUseUPnP);
unsigned short GetListenPort(); unsigned short GetListenPort();
bool BindListenPort(const CService &bindAddr, std::string& strError, bool fWhitelisted = false); bool BindListenPort(const CService &bindAddr, std::string& strError, bool fWhitelisted = false);
size_t SocketSendData(CNode *pnode);
struct CombinerAll struct CombinerAll
{ {
@ -610,11 +614,13 @@ public:
std::deque<std::vector<unsigned char>> vSendMsg; std::deque<std::vector<unsigned char>> vSendMsg;
CCriticalSection cs_vSend; CCriticalSection cs_vSend;
CCriticalSection cs_vProcessMsg;
std::list<CNetMessage> vProcessMsg;
size_t nProcessQueueSize;
std::deque<CInv> vRecvGetData; std::deque<CInv> vRecvGetData;
std::deque<CNetMessage> vRecvMsg;
CCriticalSection cs_vRecvMsg;
uint64_t nRecvBytes; uint64_t nRecvBytes;
int nRecvVersion; std::atomic<int> nRecvVersion;
int64_t nLastSend; int64_t nLastSend;
int64_t nLastRecv; int64_t nLastRecv;
@ -650,6 +656,8 @@ public:
const NodeId id; const NodeId id;
const uint64_t nKeyedNetGroup; const uint64_t nKeyedNetGroup;
std::atomic_bool fPauseRecv;
std::atomic_bool fPauseSend;
protected: protected:
mapMsgCmdSize mapSendBytesPerMsgCmd; mapMsgCmdSize mapSendBytesPerMsgCmd;
@ -723,6 +731,7 @@ private:
const ServiceFlags nLocalServices; const ServiceFlags nLocalServices;
const int nMyStartingHeight; const int nMyStartingHeight;
int nSendVersion; int nSendVersion;
std::list<CNetMessage> vRecvMsg; // Used only by SocketHandler thread
public: public:
NodeId GetId() const { NodeId GetId() const {
@ -743,24 +752,15 @@ public:
return nRefCount; return nRefCount;
} }
// requires LOCK(cs_vRecvMsg)
unsigned int GetTotalRecvSize()
{
unsigned int total = 0;
BOOST_FOREACH(const CNetMessage &msg, vRecvMsg)
total += msg.vRecv.size() + 24;
return total;
}
// requires LOCK(cs_vRecvMsg)
bool ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool& complete); bool ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool& complete);
// requires LOCK(cs_vRecvMsg)
void SetRecvVersion(int nVersionIn) void SetRecvVersion(int nVersionIn)
{ {
nRecvVersion = nVersionIn; nRecvVersion = nVersionIn;
BOOST_FOREACH(CNetMessage &msg, vRecvMsg) }
msg.SetVersion(nVersionIn); int GetRecvVersion()
{
return nRecvVersion;
} }
void SetSendVersion(int nVersionIn) void SetSendVersion(int nVersionIn)
{ {

View file

@ -889,14 +889,13 @@ static void RelayAddress(const CAddress& addr, bool fReachable, CConnman& connma
void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParams, CConnman& connman, std::atomic<bool>& interruptMsgProc) void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParams, CConnman& connman, std::atomic<bool>& interruptMsgProc)
{ {
std::deque<CInv>::iterator it = pfrom->vRecvGetData.begin(); std::deque<CInv>::iterator it = pfrom->vRecvGetData.begin();
unsigned int nMaxSendBufferSize = connman.GetSendBufferSize();
vector<CInv> vNotFound; vector<CInv> vNotFound;
CNetMsgMaker msgMaker(pfrom->GetSendVersion()); CNetMsgMaker msgMaker(pfrom->GetSendVersion());
LOCK(cs_main); LOCK(cs_main);
while (it != pfrom->vRecvGetData.end()) { while (it != pfrom->vRecvGetData.end()) {
// Don't bother if send buffer is too full to respond anyway // Don't bother if send buffer is too full to respond anyway
if (pfrom->nSendSize >= nMaxSendBufferSize) if (pfrom->fPauseSend)
break; break;
const CInv &inv = *it; const CInv &inv = *it;
@ -1059,8 +1058,6 @@ uint32_t GetFetchFlags(CNode* pfrom, CBlockIndex* pprev, const Consensus::Params
bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, int64_t nTimeReceived, const CChainParams& chainparams, CConnman& connman, std::atomic<bool>& interruptMsgProc) bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv, int64_t nTimeReceived, const CChainParams& chainparams, CConnman& connman, std::atomic<bool>& interruptMsgProc)
{ {
unsigned int nMaxSendBufferSize = connman.GetSendBufferSize();
LogPrint("net", "received: %s (%u bytes) peer=%d\n", SanitizeString(strCommand), vRecv.size(), pfrom->id); LogPrint("net", "received: %s (%u bytes) peer=%d\n", SanitizeString(strCommand), vRecv.size(), pfrom->id);
if (IsArgSet("-dropmessagestest") && GetRand(GetArg("-dropmessagestest", 0)) == 0) if (IsArgSet("-dropmessagestest") && GetRand(GetArg("-dropmessagestest", 0)) == 0)
{ {
@ -1413,11 +1410,6 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
// Track requests for our stuff // Track requests for our stuff
GetMainSignals().Inventory(inv.hash); GetMainSignals().Inventory(inv.hash);
if (pfrom->nSendSize > (nMaxSendBufferSize * 2)) {
Misbehaving(pfrom->GetId(), 50);
return error("send buffer size() = %u", pfrom->nSendSize);
}
} }
if (!vToFetch.empty()) if (!vToFetch.empty())
@ -2450,14 +2442,9 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
return true; return true;
} }
// requires LOCK(cs_vRecvMsg)
bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interruptMsgProc) bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interruptMsgProc)
{ {
const CChainParams& chainparams = Params(); const CChainParams& chainparams = Params();
unsigned int nMaxSendBufferSize = connman.GetSendBufferSize();
//if (fDebug)
// LogPrintf("%s(%u messages)\n", __func__, pfrom->vRecvMsg.size());
// //
// Message format // Message format
// (4) message start // (4) message start
@ -2466,40 +2453,40 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interru
// (4) checksum // (4) checksum
// (x) data // (x) data
// //
bool fOk = true; bool fMoreWork = false;
if (!pfrom->vRecvGetData.empty()) if (!pfrom->vRecvGetData.empty())
ProcessGetData(pfrom, chainparams.GetConsensus(), connman, interruptMsgProc); ProcessGetData(pfrom, chainparams.GetConsensus(), connman, interruptMsgProc);
if (pfrom->fDisconnect)
return false;
// this maintains the order of responses // this maintains the order of responses
if (!pfrom->vRecvGetData.empty()) return fOk; if (!pfrom->vRecvGetData.empty()) return true;
std::deque<CNetMessage>::iterator it = pfrom->vRecvMsg.begin();
while (!pfrom->fDisconnect && it != pfrom->vRecvMsg.end()) {
// Don't bother if send buffer is too full to respond anyway // Don't bother if send buffer is too full to respond anyway
if (pfrom->nSendSize >= nMaxSendBufferSize) if (pfrom->fPauseSend)
break; return false;
// get next message std::list<CNetMessage> msgs;
CNetMessage& msg = *it; {
LOCK(pfrom->cs_vProcessMsg);
//if (fDebug) if (pfrom->vProcessMsg.empty())
// LogPrintf("%s(message %u msgsz, %u bytes, complete:%s)\n", __func__, return false;
// msg.hdr.nMessageSize, msg.vRecv.size(), // Just take one message
// msg.complete() ? "Y" : "N"); msgs.splice(msgs.begin(), pfrom->vProcessMsg, pfrom->vProcessMsg.begin());
pfrom->nProcessQueueSize -= msgs.front().vRecv.size() + CMessageHeader::HEADER_SIZE;
// end, if an incomplete message is found pfrom->fPauseRecv = pfrom->nProcessQueueSize > connman.GetReceiveFloodSize();
if (!msg.complete()) fMoreWork = !pfrom->vProcessMsg.empty();
break; }
CNetMessage& msg(msgs.front());
// at this point, any failure means we can delete the current message
it++;
msg.SetVersion(pfrom->GetRecvVersion());
// Scan for message start // Scan for message start
if (memcmp(msg.hdr.pchMessageStart, chainparams.MessageStart(), CMessageHeader::MESSAGE_START_SIZE) != 0) { if (memcmp(msg.hdr.pchMessageStart, chainparams.MessageStart(), CMessageHeader::MESSAGE_START_SIZE) != 0) {
LogPrintf("PROCESSMESSAGE: INVALID MESSAGESTART %s peer=%d\n", SanitizeString(msg.hdr.GetCommand()), pfrom->id); LogPrintf("PROCESSMESSAGE: INVALID MESSAGESTART %s peer=%d\n", SanitizeString(msg.hdr.GetCommand()), pfrom->id);
fOk = false; pfrom->fDisconnect = true;
break; return false;
} }
// Read header // Read header
@ -2507,7 +2494,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interru
if (!hdr.IsValid(chainparams.MessageStart())) if (!hdr.IsValid(chainparams.MessageStart()))
{ {
LogPrintf("PROCESSMESSAGE: ERRORS IN HEADER %s peer=%d\n", SanitizeString(hdr.GetCommand()), pfrom->id); LogPrintf("PROCESSMESSAGE: ERRORS IN HEADER %s peer=%d\n", SanitizeString(hdr.GetCommand()), pfrom->id);
continue; return fMoreWork;
} }
string strCommand = hdr.GetCommand(); string strCommand = hdr.GetCommand();
@ -2523,7 +2510,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interru
SanitizeString(strCommand), nMessageSize, SanitizeString(strCommand), nMessageSize,
HexStr(hash.begin(), hash.begin()+CMessageHeader::CHECKSUM_SIZE), HexStr(hash.begin(), hash.begin()+CMessageHeader::CHECKSUM_SIZE),
HexStr(hdr.pchChecksum, hdr.pchChecksum+CMessageHeader::CHECKSUM_SIZE)); HexStr(hdr.pchChecksum, hdr.pchChecksum+CMessageHeader::CHECKSUM_SIZE));
continue; return fMoreWork;
} }
// Process message // Process message
@ -2532,7 +2519,9 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interru
{ {
fRet = ProcessMessage(pfrom, strCommand, vRecv, msg.nTime, chainparams, connman, interruptMsgProc); fRet = ProcessMessage(pfrom, strCommand, vRecv, msg.nTime, chainparams, connman, interruptMsgProc);
if (interruptMsgProc) if (interruptMsgProc)
return true; return false;
if (!pfrom->vRecvGetData.empty())
fMoreWork = true;
} }
catch (const std::ios_base::failure& e) catch (const std::ios_base::failure& e)
{ {
@ -2566,14 +2555,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interru
if (!fRet) if (!fRet)
LogPrintf("%s(%s, %u bytes) FAILED peer=%d\n", __func__, SanitizeString(strCommand), nMessageSize, pfrom->id); LogPrintf("%s(%s, %u bytes) FAILED peer=%d\n", __func__, SanitizeString(strCommand), nMessageSize, pfrom->id);
break; return fMoreWork;
}
// In case the connection got shut down, its receive buffer was wiped
if (!pfrom->fDisconnect)
pfrom->vRecvMsg.erase(pfrom->vRecvMsg.begin(), it);
return fOk;
} }
class CompareInvMempoolOrder class CompareInvMempoolOrder

View file

@ -46,6 +46,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interru
* @param[in] pto The node which we are sending messages to. * @param[in] pto The node which we are sending messages to.
* @param[in] connman The connection manager for that node. * @param[in] connman The connection manager for that node.
* @param[in] interrupt Interrupt condition for processing threads * @param[in] interrupt Interrupt condition for processing threads
* @return True if there is more work to be done
*/ */
bool SendMessages(CNode* pto, CConnman& connman, std::atomic<bool>& interrupt); bool SendMessages(CNode* pto, CConnman& connman, std::atomic<bool>& interrupt);