net: push only raw data into CConnman

This fixes one of the last major layer violations in the networking stack.

The network side is no longer in charge of message serialization, so it is now
decoupled from Bitcoin structures. Only the header is serialized and attached
to the payload.
This commit is contained in:
Cory Fields 2016-11-10 20:17:30 -05:00
parent 2ec935dcaa
commit c7be56dcef
3 changed files with 93 additions and 119 deletions

View file

@ -18,6 +18,7 @@
#include "init.h"
#include "merkleblock.h"
#include "net.h"
#include "netmessagemaker.h"
#include "netbase.h"
#include "policy/fees.h"
#include "policy/policy.h"
@ -368,8 +369,8 @@ void PushNodeVersion(CNode *pnode, CConnman& connman, int64_t nTime)
CAddress addrYou = (addr.IsRoutable() && !IsProxy(addr) ? addr : CAddress(CService(), addr.nServices));
CAddress addrMe = CAddress(CService(), nLocalNodeServices);
connman.PushMessageWithVersion(pnode, INIT_PROTO_VERSION, NetMsgType::VERSION, PROTOCOL_VERSION, (uint64_t)nLocalNodeServices, nTime, addrYou, addrMe,
nonce, strSubVersion, nNodeStartingHeight, ::fRelayTxes);
connman.PushMessage(pnode, CNetMsgMaker(INIT_PROTO_VERSION).Make(NetMsgType::VERSION, PROTOCOL_VERSION, (uint64_t)nLocalNodeServices, nTime, addrYou, addrMe,
nonce, strSubVersion, nNodeStartingHeight, ::fRelayTxes));
if (fLogIPs)
LogPrint("net", "send version message: version %d, blocks=%d, us=%s, them=%s, peer=%d\n", PROTOCOL_VERSION, nNodeStartingHeight, addrMe.ToString(), addrYou.ToString(), nodeid);
@ -530,13 +531,13 @@ void MaybeSetPeerAsAnnouncingHeaderAndIDs(const CNodeState* nodestate, CNode* pf
// As per BIP152, we only get 3 of our peers to announce
// blocks using compact encodings.
connman.ForNode(lNodesAnnouncingHeaderAndIDs.front(), [&connman, fAnnounceUsingCMPCTBLOCK, nCMPCTBLOCKVersion](CNode* pnodeStop){
connman.PushMessage(pnodeStop, NetMsgType::SENDCMPCT, fAnnounceUsingCMPCTBLOCK, nCMPCTBLOCKVersion);
connman.PushMessage(pnodeStop, CNetMsgMaker(pnodeStop->GetSendVersion()).Make(NetMsgType::SENDCMPCT, fAnnounceUsingCMPCTBLOCK, nCMPCTBLOCKVersion));
return true;
});
lNodesAnnouncingHeaderAndIDs.pop_front();
}
fAnnounceUsingCMPCTBLOCK = true;
connman.PushMessage(pfrom, NetMsgType::SENDCMPCT, fAnnounceUsingCMPCTBLOCK, nCMPCTBLOCKVersion);
connman.PushMessage(pfrom, CNetMsgMaker(pfrom->GetSendVersion()).Make(NetMsgType::SENDCMPCT, fAnnounceUsingCMPCTBLOCK, nCMPCTBLOCKVersion));
lNodesAnnouncingHeaderAndIDs.push_back(pfrom->GetId());
}
}
@ -4893,9 +4894,8 @@ void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParam
{
std::deque<CInv>::iterator it = pfrom->vRecvGetData.begin();
unsigned int nMaxSendBufferSize = connman.GetSendBufferSize();
vector<CInv> vNotFound;
CNetMsgMaker msgMaker(pfrom->GetSendVersion());
LOCK(cs_main);
while (it != pfrom->vRecvGetData.end()) {
@ -4949,9 +4949,9 @@ void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParam
if (!ReadBlockFromDisk(block, (*mi).second, consensusParams))
assert(!"cannot load block from disk");
if (inv.type == MSG_BLOCK)
connman.PushMessageWithFlag(pfrom, SERIALIZE_TRANSACTION_NO_WITNESS, NetMsgType::BLOCK, block);
connman.PushMessage(pfrom, msgMaker.Make(SERIALIZE_TRANSACTION_NO_WITNESS, NetMsgType::BLOCK, block));
else if (inv.type == MSG_WITNESS_BLOCK)
connman.PushMessage(pfrom, NetMsgType::BLOCK, block);
connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::BLOCK, block));
else if (inv.type == MSG_FILTERED_BLOCK)
{
bool sendMerkleBlock = false;
@ -4964,7 +4964,7 @@ void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParam
}
}
if (sendMerkleBlock) {
connman.PushMessage(pfrom, NetMsgType::MERKLEBLOCK, merkleBlock);
connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::MERKLEBLOCK, merkleBlock));
// CMerkleBlock just contains hashes, so also push any transactions in the block the client did not see
// This avoids hurting performance by pointlessly requiring a round-trip
// Note that there is currently no way for a node to request any single transactions we didn't send here -
@ -4973,7 +4973,7 @@ void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParam
// however we MUST always provide at least what the remote peer needs
typedef std::pair<unsigned int, uint256> PairType;
BOOST_FOREACH(PairType& pair, merkleBlock.vMatchedTxn)
connman.PushMessageWithFlag(pfrom, SERIALIZE_TRANSACTION_NO_WITNESS, NetMsgType::TX, *block.vtx[pair.first]);
connman.PushMessage(pfrom, msgMaker.Make(SERIALIZE_TRANSACTION_NO_WITNESS, NetMsgType::TX, *block.vtx[pair.first]));
}
// else
// no response
@ -4985,11 +4985,12 @@ void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParam
// and we don't feel like constructing the object for them, so
// instead we respond with the full, non-compact block.
bool fPeerWantsWitness = State(pfrom->GetId())->fWantsCmpctWitness;
int nSendFlags = fPeerWantsWitness ? 0 : SERIALIZE_TRANSACTION_NO_WITNESS;
if (CanDirectFetch(consensusParams) && mi->second->nHeight >= chainActive.Height() - MAX_CMPCTBLOCK_DEPTH) {
CBlockHeaderAndShortTxIDs cmpctblock(block, fPeerWantsWitness);
connman.PushMessageWithFlag(pfrom, fPeerWantsWitness ? 0 : SERIALIZE_TRANSACTION_NO_WITNESS, NetMsgType::CMPCTBLOCK, cmpctblock);
connman.PushMessage(pfrom, msgMaker.Make(nSendFlags, NetMsgType::CMPCTBLOCK, cmpctblock));
} else
connman.PushMessageWithFlag(pfrom, fPeerWantsWitness ? 0 : SERIALIZE_TRANSACTION_NO_WITNESS, NetMsgType::BLOCK, block);
connman.PushMessage(pfrom, msgMaker.Make(nSendFlags, NetMsgType::BLOCK, block));
}
// Trigger the peer node to send a getblocks request for the next batch of inventory
@ -5000,7 +5001,7 @@ void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParam
// wait for other stuff first.
vector<CInv> vInv;
vInv.push_back(CInv(MSG_BLOCK, chainActive.Tip()->GetBlockHash()));
connman.PushMessage(pfrom, NetMsgType::INV, vInv);
connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::INV, vInv));
pfrom->hashContinue.SetNull();
}
}
@ -5010,15 +5011,16 @@ void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParam
// Send stream from relay memory
bool push = false;
auto mi = mapRelay.find(inv.hash);
int nSendFlags = (inv.type == MSG_TX ? SERIALIZE_TRANSACTION_NO_WITNESS : 0);
if (mi != mapRelay.end()) {
connman.PushMessageWithFlag(pfrom, inv.type == MSG_TX ? SERIALIZE_TRANSACTION_NO_WITNESS : 0, NetMsgType::TX, *mi->second);
connman.PushMessage(pfrom, msgMaker.Make(nSendFlags, NetMsgType::TX, *mi->second));
push = true;
} else if (pfrom->timeLastMempoolReq) {
auto txinfo = mempool.info(inv.hash);
// To protect privacy, do not answer getdata using the mempool when
// that TX couldn't have been INVed in reply to a MEMPOOL request.
if (txinfo.tx && txinfo.nTime <= pfrom->timeLastMempoolReq) {
connman.PushMessageWithFlag(pfrom, inv.type == MSG_TX ? SERIALIZE_TRANSACTION_NO_WITNESS : 0, NetMsgType::TX, *txinfo.tx);
connman.PushMessage(pfrom, msgMaker.Make(nSendFlags, NetMsgType::TX, *txinfo.tx));
push = true;
}
}
@ -5045,7 +5047,7 @@ void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParam
// do that because they want to know about (and store and rebroadcast and
// risk analyze) the dependencies of transactions relevant to them, without
// having to download the entire memory pool.
connman.PushMessage(pfrom, NetMsgType::NOTFOUND, vNotFound);
connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::NOTFOUND, vNotFound));
}
}
@ -5089,7 +5091,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
// Each connection can only send one version message
if (pfrom->nVersion != 0)
{
connman.PushMessageWithVersion(pfrom, INIT_PROTO_VERSION, NetMsgType::REJECT, strCommand, REJECT_DUPLICATE, string("Duplicate version message"));
connman.PushMessage(pfrom, CNetMsgMaker(INIT_PROTO_VERSION).Make(NetMsgType::REJECT, strCommand, REJECT_DUPLICATE, string("Duplicate version message")));
LOCK(cs_main);
Misbehaving(pfrom->GetId(), 1);
return false;
@ -5109,8 +5111,8 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
if (pfrom->nServicesExpected & ~pfrom->nServices)
{
LogPrint("net", "peer=%d does not offer the expected services (%08x offered, %08x expected); disconnecting\n", pfrom->id, pfrom->nServices, pfrom->nServicesExpected);
connman.PushMessageWithVersion(pfrom, INIT_PROTO_VERSION, NetMsgType::REJECT, strCommand, REJECT_NONSTANDARD,
strprintf("Expected to offer services %08x", pfrom->nServicesExpected));
connman.PushMessage(pfrom, CNetMsgMaker(INIT_PROTO_VERSION).Make(NetMsgType::REJECT, strCommand, REJECT_NONSTANDARD,
strprintf("Expected to offer services %08x", pfrom->nServicesExpected)));
pfrom->fDisconnect = true;
return false;
}
@ -5119,8 +5121,8 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
{
// disconnect from peers older than this proto version
LogPrintf("peer=%d using obsolete version %i; disconnecting\n", pfrom->id, pfrom->nVersion);
connman.PushMessageWithVersion(pfrom, INIT_PROTO_VERSION, NetMsgType::REJECT, strCommand, REJECT_OBSOLETE,
strprintf("Version must be %d or greater", MIN_PEER_PROTO_VERSION));
connman.PushMessage(pfrom, CNetMsgMaker(INIT_PROTO_VERSION).Make(NetMsgType::REJECT, strCommand, REJECT_OBSOLETE,
strprintf("Version must be %d or greater", MIN_PEER_PROTO_VERSION)));
pfrom->fDisconnect = true;
return false;
}
@ -5177,8 +5179,9 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
}
// Change version
connman.PushMessageWithVersion(pfrom, INIT_PROTO_VERSION, NetMsgType::VERACK);
pfrom->SetSendVersion(min(pfrom->nVersion, PROTOCOL_VERSION));
connman.PushMessage(pfrom, CNetMsgMaker(INIT_PROTO_VERSION).Make(NetMsgType::VERACK));
int nSendVersion = std::min(pfrom->nVersion, PROTOCOL_VERSION);
pfrom->SetSendVersion(nSendVersion);
if (!pfrom->fInbound)
{
@ -5201,7 +5204,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
// Get recent addresses
if (pfrom->fOneShot || pfrom->nVersion >= CADDR_TIME_VERSION || connman.GetAddressCount() < 1000)
{
connman.PushMessage(pfrom, NetMsgType::GETADDR);
connman.PushMessage(pfrom, CNetMsgMaker(nSendVersion).Make(NetMsgType::GETADDR));
pfrom->fGetAddr = true;
}
connman.MarkAddressGood(pfrom->addr);
@ -5227,6 +5230,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
assert(pfrom->fInbound == false);
pfrom->fDisconnect = true;
}
return true;
}
@ -5238,8 +5242,10 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
return false;
}
// At this point, the outgoing message serialization version can't change.
CNetMsgMaker msgMaker(pfrom->GetSendVersion());
else if (strCommand == NetMsgType::VERACK)
if (strCommand == NetMsgType::VERACK)
{
pfrom->SetRecvVersion(min(pfrom->nVersion, PROTOCOL_VERSION));
@ -5254,7 +5260,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
// We send this to non-NODE NETWORK peers as well, because even
// non-NODE NETWORK peers can announce blocks (such as pruning
// nodes)
connman.PushMessage(pfrom, NetMsgType::SENDHEADERS);
connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::SENDHEADERS));
}
if (pfrom->nVersion >= SHORT_IDS_BLOCKS_VERSION) {
// Tell our peer we are willing to provide version 1 or 2 cmpctblocks
@ -5265,9 +5271,9 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
bool fAnnounceUsingCMPCTBLOCK = false;
uint64_t nCMPCTBLOCKVersion = 2;
if (pfrom->GetLocalServices() & NODE_WITNESS)
connman.PushMessage(pfrom, NetMsgType::SENDCMPCT, fAnnounceUsingCMPCTBLOCK, nCMPCTBLOCKVersion);
connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::SENDCMPCT, fAnnounceUsingCMPCTBLOCK, nCMPCTBLOCKVersion));
nCMPCTBLOCKVersion = 1;
connman.PushMessage(pfrom, NetMsgType::SENDCMPCT, fAnnounceUsingCMPCTBLOCK, nCMPCTBLOCKVersion);
connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::SENDCMPCT, fAnnounceUsingCMPCTBLOCK, nCMPCTBLOCKVersion));
}
}
@ -5392,7 +5398,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
// fell back to inv we probably have a reorg which we should get the headers for first,
// we now only provide a getheaders response here. When we receive the headers, we will
// then ask for the blocks we need.
connman.PushMessage(pfrom, NetMsgType::GETHEADERS, chainActive.GetLocator(pindexBestHeader), inv.hash);
connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::GETHEADERS, chainActive.GetLocator(pindexBestHeader), inv.hash));
LogPrint("net", "getheaders (%d) %s to peer=%d\n", pindexBestHeader->nHeight, inv.hash.ToString(), pfrom->id);
}
}
@ -5415,7 +5421,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
}
if (!vToFetch.empty())
connman.PushMessage(pfrom, NetMsgType::GETDATA, vToFetch);
connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::GETDATA, vToFetch));
}
@ -5527,7 +5533,8 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
}
resp.txn[i] = block.vtx[req.indexes[i]];
}
connman.PushMessageWithFlag(pfrom, State(pfrom->GetId())->fWantsCmpctWitness ? 0 : SERIALIZE_TRANSACTION_NO_WITNESS, NetMsgType::BLOCKTXN, resp);
int nSendFlags = State(pfrom->GetId())->fWantsCmpctWitness ? 0 : SERIALIZE_TRANSACTION_NO_WITNESS;
connman.PushMessage(pfrom, msgMaker.Make(nSendFlags, NetMsgType::BLOCKTXN, resp));
}
@ -5576,7 +5583,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
// headers message). In both cases it's safe to update
// pindexBestHeaderSent to be our tip.
nodestate->pindexBestHeaderSent = pindex ? pindex : chainActive.Tip();
connman.PushMessage(pfrom, NetMsgType::HEADERS, vHeaders);
connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::HEADERS, vHeaders));
}
@ -5739,8 +5746,8 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
pfrom->id,
FormatStateMessage(state));
if (state.GetRejectCode() < REJECT_INTERNAL) // Never send AcceptToMemoryPool's internal codes over P2P
connman.PushMessage(pfrom, NetMsgType::REJECT, strCommand, (unsigned char)state.GetRejectCode(),
state.GetRejectReason().substr(0, MAX_REJECT_MESSAGE_LENGTH), inv.hash);
connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::REJECT, strCommand, (unsigned char)state.GetRejectCode(),
state.GetRejectReason().substr(0, MAX_REJECT_MESSAGE_LENGTH), inv.hash));
if (nDoS > 0) {
Misbehaving(pfrom->GetId(), nDoS);
}
@ -5758,7 +5765,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
if (mapBlockIndex.find(cmpctblock.header.hashPrevBlock) == mapBlockIndex.end()) {
// Doesn't connect (or is genesis), instead of DoSing in AcceptBlockHeader, request deeper headers
if (!IsInitialBlockDownload())
connman.PushMessage(pfrom, NetMsgType::GETHEADERS, chainActive.GetLocator(pindexBestHeader), uint256());
connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::GETHEADERS, chainActive.GetLocator(pindexBestHeader), uint256()));
return true;
}
@ -5791,7 +5798,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
// so we just grab the block via normal getdata
std::vector<CInv> vInv(1);
vInv[0] = CInv(MSG_BLOCK | GetFetchFlags(pfrom, pindex->pprev, chainparams.GetConsensus()), cmpctblock.header.GetHash());
connman.PushMessage(pfrom, NetMsgType::GETDATA, vInv);
connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::GETDATA, vInv));
}
return true;
}
@ -5835,7 +5842,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
// Duplicate txindexes, the block is now in-flight, so just request it
std::vector<CInv> vInv(1);
vInv[0] = CInv(MSG_BLOCK | GetFetchFlags(pfrom, pindex->pprev, chainparams.GetConsensus()), cmpctblock.header.GetHash());
connman.PushMessage(pfrom, NetMsgType::GETDATA, vInv);
connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::GETDATA, vInv));
return true;
}
@ -5859,7 +5866,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
return ProcessMessage(pfrom, NetMsgType::BLOCKTXN, blockTxnMsg, nTimeReceived, chainparams, connman);
} else {
req.blockhash = pindex->GetBlockHash();
connman.PushMessage(pfrom, NetMsgType::GETBLOCKTXN, req);
connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::GETBLOCKTXN, req));
}
}
} else {
@ -5868,7 +5875,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
// mempool will probably be useless - request the block normally
std::vector<CInv> vInv(1);
vInv[0] = CInv(MSG_BLOCK | GetFetchFlags(pfrom, pindex->pprev, chainparams.GetConsensus()), cmpctblock.header.GetHash());
connman.PushMessage(pfrom, NetMsgType::GETDATA, vInv);
connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::GETDATA, vInv));
return true;
} else {
// If this was an announce-cmpctblock, we want the same treatment as a header message
@ -5910,7 +5917,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
// Might have collided, fall back to getdata now :(
std::vector<CInv> invs;
invs.push_back(CInv(MSG_BLOCK | GetFetchFlags(pfrom, chainActive.Tip(), chainparams.GetConsensus()), resp.blockhash));
connman.PushMessage(pfrom, NetMsgType::GETDATA, invs);
connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::GETDATA, invs));
} else {
// Block is either okay, or possibly we received
// READ_STATUS_CHECKBLOCK_FAILED.
@ -5987,7 +5994,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
// nUnconnectingHeaders gets reset back to 0.
if (mapBlockIndex.find(headers[0].hashPrevBlock) == mapBlockIndex.end() && nCount < MAX_BLOCKS_TO_ANNOUNCE) {
nodestate->nUnconnectingHeaders++;
connman.PushMessage(pfrom, NetMsgType::GETHEADERS, chainActive.GetLocator(pindexBestHeader), uint256());
connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::GETHEADERS, chainActive.GetLocator(pindexBestHeader), uint256()));
LogPrint("net", "received header %s: missing prev block %s, sending getheaders (%d) to end (peer=%d, nUnconnectingHeaders=%d)\n",
headers[0].GetHash().ToString(),
headers[0].hashPrevBlock.ToString(),
@ -6034,7 +6041,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
// TODO: optimize: if pindexLast is an ancestor of chainActive.Tip or pindexBestHeader, continue
// from there instead.
LogPrint("net", "more getheaders (%d) to end to peer=%d (startheight:%d)\n", pindexLast->nHeight, pfrom->id, pfrom->nStartingHeight);
connman.PushMessage(pfrom, NetMsgType::GETHEADERS, chainActive.GetLocator(pindexLast), uint256());
connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::GETHEADERS, chainActive.GetLocator(pindexLast), uint256()));
}
bool fCanDirectFetch = CanDirectFetch(chainparams.GetConsensus());
@ -6087,7 +6094,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
// In any case, we want to download using a compact block, not a regular one
vGetData[0] = CInv(MSG_CMPCT_BLOCK, vGetData[0].hash);
}
connman.PushMessage(pfrom, NetMsgType::GETDATA, vGetData);
connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::GETDATA, vGetData));
}
}
}
@ -6191,7 +6198,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
// it, if the remote node sends a ping once per second and this node takes 5
// seconds to respond to each, the 5th ping the remote sends would appear to
// return very quickly.
connman.PushMessage(pfrom, NetMsgType::PONG, nonce);
connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::PONG, nonce));
}
}
@ -6447,7 +6454,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman)
}
catch (const std::ios_base::failure& e)
{
connman.PushMessageWithVersion(pfrom, INIT_PROTO_VERSION, NetMsgType::REJECT, strCommand, REJECT_MALFORMED, string("error parsing message"));
connman.PushMessage(pfrom, CNetMsgMaker(INIT_PROTO_VERSION).Make(NetMsgType::REJECT, strCommand, REJECT_MALFORMED, string("error parsing message")));
if (strstr(e.what(), "end of data"))
{
// Allow exceptions from under-length message on vRecv
@ -6515,6 +6522,9 @@ bool SendMessages(CNode* pto, CConnman& connman)
if (pto->nVersion == 0 || pto->fDisconnect)
return true;
// If we get here, the outgoing message serialization version is set and can't change.
CNetMsgMaker msgMaker(pto->GetSendVersion());
//
// Message: ping
//
@ -6536,11 +6546,11 @@ bool SendMessages(CNode* pto, CConnman& connman)
pto->nPingUsecStart = GetTimeMicros();
if (pto->nVersion > BIP0031_VERSION) {
pto->nPingNonceSent = nonce;
connman.PushMessage(pto, NetMsgType::PING, nonce);
connman.PushMessage(pto, msgMaker.Make(NetMsgType::PING, nonce));
} else {
// Peer is too old to support ping command with nonce, pong will never arrive.
pto->nPingNonceSent = 0;
connman.PushMessage(pto, NetMsgType::PING);
connman.PushMessage(pto, msgMaker.Make(NetMsgType::PING));
}
}
@ -6551,7 +6561,7 @@ bool SendMessages(CNode* pto, CConnman& connman)
CNodeState &state = *State(pto->GetId());
BOOST_FOREACH(const CBlockReject& reject, state.rejects)
connman.PushMessage(pto, NetMsgType::REJECT, (string)NetMsgType::BLOCK, reject.chRejectCode, reject.strRejectReason, reject.hashBlock);
connman.PushMessage(pto, msgMaker.Make(NetMsgType::REJECT, (string)NetMsgType::BLOCK, reject.chRejectCode, reject.strRejectReason, reject.hashBlock));
state.rejects.clear();
if (state.fShouldBan) {
@ -6593,14 +6603,14 @@ bool SendMessages(CNode* pto, CConnman& connman)
// receiver rejects addr messages larger than 1000
if (vAddr.size() >= 1000)
{
connman.PushMessage(pto, NetMsgType::ADDR, vAddr);
connman.PushMessage(pto, msgMaker.Make(NetMsgType::ADDR, vAddr));
vAddr.clear();
}
}
}
pto->vAddrToSend.clear();
if (!vAddr.empty())
connman.PushMessage(pto, NetMsgType::ADDR, vAddr);
connman.PushMessage(pto, msgMaker.Make(NetMsgType::ADDR, vAddr));
// we only send the big addr message once
if (pto->vAddrToSend.capacity() > 40)
pto->vAddrToSend.shrink_to_fit();
@ -6626,7 +6636,7 @@ bool SendMessages(CNode* pto, CConnman& connman)
if (pindexStart->pprev)
pindexStart = pindexStart->pprev;
LogPrint("net", "initial getheaders (%d) to peer=%d (startheight:%d)\n", pindexStart->nHeight, pto->id, pto->nStartingHeight);
connman.PushMessage(pto, NetMsgType::GETHEADERS, chainActive.GetLocator(pindexStart), uint256());
connman.PushMessage(pto, msgMaker.Make(NetMsgType::GETHEADERS, chainActive.GetLocator(pindexStart), uint256()));
}
}
@ -6715,7 +6725,8 @@ bool SendMessages(CNode* pto, CConnman& connman)
CBlock block;
assert(ReadBlockFromDisk(block, pBestIndex, consensusParams));
CBlockHeaderAndShortTxIDs cmpctblock(block, state.fWantsCmpctWitness);
connman.PushMessageWithFlag(pto, state.fWantsCmpctWitness ? 0 : SERIALIZE_TRANSACTION_NO_WITNESS, NetMsgType::CMPCTBLOCK, cmpctblock);
int nSendFlags = state.fWantsCmpctWitness ? 0 : SERIALIZE_TRANSACTION_NO_WITNESS;
connman.PushMessage(pto, msgMaker.Make(nSendFlags, NetMsgType::CMPCTBLOCK, cmpctblock));
state.pindexBestHeaderSent = pBestIndex;
} else if (state.fPreferHeaders) {
if (vHeaders.size() > 1) {
@ -6727,7 +6738,7 @@ bool SendMessages(CNode* pto, CConnman& connman)
LogPrint("net", "%s: sending header %s to peer=%d\n", __func__,
vHeaders.front().GetHash().ToString(), pto->id);
}
connman.PushMessage(pto, NetMsgType::HEADERS, vHeaders);
connman.PushMessage(pto, msgMaker.Make(NetMsgType::HEADERS, vHeaders));
state.pindexBestHeaderSent = pBestIndex;
} else
fRevertToInv = true;
@ -6773,7 +6784,7 @@ bool SendMessages(CNode* pto, CConnman& connman)
BOOST_FOREACH(const uint256& hash, pto->vInventoryBlockToSend) {
vInv.push_back(CInv(MSG_BLOCK, hash));
if (vInv.size() == MAX_INV_SZ) {
connman.PushMessage(pto, NetMsgType::INV, vInv);
connman.PushMessage(pto, msgMaker.Make(NetMsgType::INV, vInv));
vInv.clear();
}
}
@ -6819,7 +6830,7 @@ bool SendMessages(CNode* pto, CConnman& connman)
pto->filterInventoryKnown.insert(hash);
vInv.push_back(inv);
if (vInv.size() == MAX_INV_SZ) {
connman.PushMessage(pto, NetMsgType::INV, vInv);
connman.PushMessage(pto, msgMaker.Make(NetMsgType::INV, vInv));
vInv.clear();
}
}
@ -6885,7 +6896,7 @@ bool SendMessages(CNode* pto, CConnman& connman)
}
}
if (vInv.size() == MAX_INV_SZ) {
connman.PushMessage(pto, NetMsgType::INV, vInv);
connman.PushMessage(pto, msgMaker.Make(NetMsgType::INV, vInv));
vInv.clear();
}
pto->filterInventoryKnown.insert(hash);
@ -6893,7 +6904,7 @@ bool SendMessages(CNode* pto, CConnman& connman)
}
}
if (!vInv.empty())
connman.PushMessage(pto, NetMsgType::INV, vInv);
connman.PushMessage(pto, msgMaker.Make(NetMsgType::INV, vInv));
// Detect whether we're stalling
nNow = GetTimeMicros();
@ -6956,7 +6967,7 @@ bool SendMessages(CNode* pto, CConnman& connman)
vGetData.push_back(inv);
if (vGetData.size() >= 1000)
{
connman.PushMessage(pto, NetMsgType::GETDATA, vGetData);
connman.PushMessage(pto, msgMaker.Make(NetMsgType::GETDATA, vGetData));
vGetData.clear();
}
} else {
@ -6966,7 +6977,7 @@ bool SendMessages(CNode* pto, CConnman& connman)
pto->mapAskFor.erase(pto->mapAskFor.begin());
}
if (!vGetData.empty())
connman.PushMessage(pto, NetMsgType::GETDATA, vGetData);
connman.PushMessage(pto, msgMaker.Make(NetMsgType::GETDATA, vGetData));
//
// Message: feefilter
@ -6979,7 +6990,7 @@ bool SendMessages(CNode* pto, CConnman& connman)
if (timeNow > pto->nextSendTimeFeeFilter) {
CAmount filterToSend = filterRounder.round(currentFilter);
if (filterToSend != pto->lastSentFeeFilter) {
connman.PushMessage(pto, NetMsgType::FEEFILTER, filterToSend);
connman.PushMessage(pto, msgMaker.Make(NetMsgType::FEEFILTER, filterToSend));
pto->lastSentFeeFilter = filterToSend;
}
pto->nextSendTimeFeeFilter = PoissonNextSend(timeNow, AVG_FEEFILTER_BROADCAST_INTERVAL);

View file

@ -768,13 +768,13 @@ const uint256& CNetMessage::GetMessageHash() const
// requires LOCK(cs_vSend)
size_t SocketSendData(CNode *pnode)
{
std::deque<CSerializeData>::iterator it = pnode->vSendMsg.begin();
auto it = pnode->vSendMsg.begin();
size_t nSentSize = 0;
while (it != pnode->vSendMsg.end()) {
const CSerializeData &data = *it;
const auto &data = *it;
assert(data.size() > pnode->nSendOffset);
int nBytes = send(pnode->hSocket, &data[pnode->nSendOffset], data.size() - pnode->nSendOffset, MSG_NOSIGNAL | MSG_DONTWAIT);
int nBytes = send(pnode->hSocket, reinterpret_cast<const char*>(data.data()) + pnode->nSendOffset, data.size() - pnode->nSendOffset, MSG_NOSIGNAL | MSG_DONTWAIT);
if (nBytes > 0) {
pnode->nLastSend = GetTime();
pnode->nSendBytes += nBytes;
@ -2612,30 +2612,19 @@ void CNode::AskFor(const CInv& inv)
mapAskFor.insert(std::make_pair(nRequestTime, inv));
}
CDataStream CConnman::BeginMessage(CNode* pnode, int nVersion, int flags, const std::string& sCommand)
void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
{
return {SER_NETWORK, (nVersion ? nVersion : pnode->GetSendVersion()) | flags, CMessageHeader(Params().MessageStart(), sCommand.c_str(), 0) };
}
size_t nMessageSize = msg.data.size();
size_t nTotalSize = nMessageSize + CMessageHeader::HEADER_SIZE;
LogPrint("net", "sending %s (%d bytes) peer=%d\n", SanitizeString(msg.command.c_str()), nMessageSize, pnode->id);
void CConnman::EndMessage(CDataStream& strm)
{
// Set the size
assert(strm.size () >= CMessageHeader::HEADER_SIZE);
unsigned int nSize = strm.size() - CMessageHeader::HEADER_SIZE;
WriteLE32((uint8_t*)&strm[CMessageHeader::MESSAGE_SIZE_OFFSET], nSize);
// Set the checksum
uint256 hash = Hash(strm.begin() + CMessageHeader::HEADER_SIZE, strm.end());
memcpy((char*)&strm[CMessageHeader::CHECKSUM_OFFSET], hash.begin(), CMessageHeader::CHECKSUM_SIZE);
std::vector<unsigned char> serializedHeader;
serializedHeader.reserve(CMessageHeader::HEADER_SIZE);
uint256 hash = Hash(msg.data.data(), msg.data.data() + nMessageSize);
CMessageHeader hdr(Params().MessageStart(), msg.command.c_str(), nMessageSize);
memcpy(hdr.pchChecksum, hash.begin(), CMessageHeader::CHECKSUM_SIZE);
}
void CConnman::PushMessage(CNode* pnode, CDataStream& strm, const std::string& sCommand)
{
if(strm.empty())
return;
unsigned int nSize = strm.size() - CMessageHeader::HEADER_SIZE;
LogPrint("net", "sending %s (%d bytes) peer=%d\n", SanitizeString(sCommand.c_str()), nSize, pnode->id);
CVectorWriter{SER_NETWORK, INIT_PROTO_VERSION, serializedHeader, 0, hdr};
size_t nBytesSent = 0;
{
@ -2644,11 +2633,14 @@ void CConnman::PushMessage(CNode* pnode, CDataStream& strm, const std::string& s
return;
}
bool optimisticSend(pnode->vSendMsg.empty());
pnode->vSendMsg.emplace_back(strm.begin(), strm.end());
//log total amount of bytes per command
pnode->mapSendBytesPerMsgCmd[sCommand] += strm.size();
pnode->nSendSize += strm.size();
pnode->mapSendBytesPerMsgCmd[msg.command] += nTotalSize;
pnode->nSendSize += nTotalSize;
pnode->vSendMsg.push_back(std::move(serializedHeader));
if (nMessageSize)
pnode->vSendMsg.push_back(std::move(msg.data));
// If write queue empty, attempt "optimistic write"
if (optimisticSend == true)

View file

@ -152,32 +152,7 @@ public:
bool ForNode(NodeId id, std::function<bool(CNode* pnode)> func);
template <typename... Args>
void PushMessageWithVersionAndFlag(CNode* pnode, int nVersion, int flag, const std::string& sCommand, Args&&... args)
{
auto msg(BeginMessage(pnode, nVersion, flag, sCommand));
::SerializeMany(msg, std::forward<Args>(args)...);
EndMessage(msg);
PushMessage(pnode, msg, sCommand);
}
template <typename... Args>
void PushMessageWithFlag(CNode* pnode, int flag, const std::string& sCommand, Args&&... args)
{
PushMessageWithVersionAndFlag(pnode, 0, flag, sCommand, std::forward<Args>(args)...);
}
template <typename... Args>
void PushMessageWithVersion(CNode* pnode, int nVersion, const std::string& sCommand, Args&&... args)
{
PushMessageWithVersionAndFlag(pnode, nVersion, 0, sCommand, std::forward<Args>(args)...);
}
template <typename... Args>
void PushMessage(CNode* pnode, const std::string& sCommand, Args&&... args)
{
PushMessageWithVersionAndFlag(pnode, 0, 0, sCommand, std::forward<Args>(args)...);
}
void PushMessage(CNode* pnode, CSerializedNetMsg&& msg);
template<typename Callable>
bool ForEachNodeContinueIf(Callable&& func)
@ -388,10 +363,6 @@ private:
unsigned int GetReceiveFloodSize() const;
CDataStream BeginMessage(CNode* node, int nVersion, int flags, const std::string& sCommand);
void PushMessage(CNode* pnode, CDataStream& strm, const std::string& sCommand);
void EndMessage(CDataStream& strm);
// Network stats
void RecordBytesRecv(uint64_t bytes);
void RecordBytesSent(uint64_t bytes);
@ -615,7 +586,7 @@ public:
size_t nSendSize; // total size of all vSendMsg entries
size_t nSendOffset; // offset inside the first vSendMsg already sent
uint64_t nSendBytes;
std::deque<CSerializeData> vSendMsg;
std::deque<std::vector<unsigned char>> vSendMsg;
CCriticalSection cs_vSend;
std::deque<CInv> vRecvGetData;
@ -785,7 +756,7 @@ public:
{
// The send version should always be explicitly set to
// INIT_PROTO_VERSION rather than using this value until the handshake
// is complete. See PushMessageWithVersion().
// is complete.
assert(nSendVersion != 0);
return nSendVersion;
}