Merge #9128: net: Decouple CConnman and message serialization
c7be56d
net: push only raw data into CConnman (Cory Fields)2ec935d
net: add CVectorWriter and CNetMsgMaker (Cory Fields)b7695c2
net: No need to check individually for disconnection anymore (Cory Fields)fedea8a
net: don't send any messages before handshake or after requested disconnect (Cory Fields)d74e352
net: Set feelers to disconnect at the end of the version message (Cory Fields)
This commit is contained in:
commit
76fec09d87
7 changed files with 308 additions and 152 deletions
|
@ -111,6 +111,7 @@ BITCOIN_CORE_H = \
|
|||
net.h \
|
||||
netaddress.h \
|
||||
netbase.h \
|
||||
netmessagemaker.h \
|
||||
noui.h \
|
||||
policy/fees.h \
|
||||
policy/policy.h \
|
||||
|
|
201
src/main.cpp
201
src/main.cpp
|
@ -18,6 +18,7 @@
|
|||
#include "init.h"
|
||||
#include "merkleblock.h"
|
||||
#include "net.h"
|
||||
#include "netmessagemaker.h"
|
||||
#include "netbase.h"
|
||||
#include "policy/fees.h"
|
||||
#include "policy/policy.h"
|
||||
|
@ -368,8 +369,8 @@ void PushNodeVersion(CNode *pnode, CConnman& connman, int64_t nTime)
|
|||
CAddress addrYou = (addr.IsRoutable() && !IsProxy(addr) ? addr : CAddress(CService(), addr.nServices));
|
||||
CAddress addrMe = CAddress(CService(), nLocalNodeServices);
|
||||
|
||||
connman.PushMessageWithVersion(pnode, INIT_PROTO_VERSION, NetMsgType::VERSION, PROTOCOL_VERSION, (uint64_t)nLocalNodeServices, nTime, addrYou, addrMe,
|
||||
nonce, strSubVersion, nNodeStartingHeight, ::fRelayTxes);
|
||||
connman.PushMessage(pnode, CNetMsgMaker(INIT_PROTO_VERSION).Make(NetMsgType::VERSION, PROTOCOL_VERSION, (uint64_t)nLocalNodeServices, nTime, addrYou, addrMe,
|
||||
nonce, strSubVersion, nNodeStartingHeight, ::fRelayTxes));
|
||||
|
||||
if (fLogIPs)
|
||||
LogPrint("net", "send version message: version %d, blocks=%d, us=%s, them=%s, peer=%d\n", PROTOCOL_VERSION, nNodeStartingHeight, addrMe.ToString(), addrYou.ToString(), nodeid);
|
||||
|
@ -530,13 +531,13 @@ void MaybeSetPeerAsAnnouncingHeaderAndIDs(const CNodeState* nodestate, CNode* pf
|
|||
// As per BIP152, we only get 3 of our peers to announce
|
||||
// blocks using compact encodings.
|
||||
connman.ForNode(lNodesAnnouncingHeaderAndIDs.front(), [&connman, fAnnounceUsingCMPCTBLOCK, nCMPCTBLOCKVersion](CNode* pnodeStop){
|
||||
connman.PushMessage(pnodeStop, NetMsgType::SENDCMPCT, fAnnounceUsingCMPCTBLOCK, nCMPCTBLOCKVersion);
|
||||
connman.PushMessage(pnodeStop, CNetMsgMaker(pnodeStop->GetSendVersion()).Make(NetMsgType::SENDCMPCT, fAnnounceUsingCMPCTBLOCK, nCMPCTBLOCKVersion));
|
||||
return true;
|
||||
});
|
||||
lNodesAnnouncingHeaderAndIDs.pop_front();
|
||||
}
|
||||
fAnnounceUsingCMPCTBLOCK = true;
|
||||
connman.PushMessage(pfrom, NetMsgType::SENDCMPCT, fAnnounceUsingCMPCTBLOCK, nCMPCTBLOCKVersion);
|
||||
connman.PushMessage(pfrom, CNetMsgMaker(pfrom->GetSendVersion()).Make(NetMsgType::SENDCMPCT, fAnnounceUsingCMPCTBLOCK, nCMPCTBLOCKVersion));
|
||||
lNodesAnnouncingHeaderAndIDs.push_back(pfrom->GetId());
|
||||
}
|
||||
}
|
||||
|
@ -4886,9 +4887,8 @@ void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParam
|
|||
{
|
||||
std::deque<CInv>::iterator it = pfrom->vRecvGetData.begin();
|
||||
unsigned int nMaxSendBufferSize = connman.GetSendBufferSize();
|
||||
|
||||
vector<CInv> vNotFound;
|
||||
|
||||
CNetMsgMaker msgMaker(pfrom->GetSendVersion());
|
||||
LOCK(cs_main);
|
||||
|
||||
while (it != pfrom->vRecvGetData.end()) {
|
||||
|
@ -4942,9 +4942,9 @@ void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParam
|
|||
if (!ReadBlockFromDisk(block, (*mi).second, consensusParams))
|
||||
assert(!"cannot load block from disk");
|
||||
if (inv.type == MSG_BLOCK)
|
||||
connman.PushMessageWithFlag(pfrom, SERIALIZE_TRANSACTION_NO_WITNESS, NetMsgType::BLOCK, block);
|
||||
connman.PushMessage(pfrom, msgMaker.Make(SERIALIZE_TRANSACTION_NO_WITNESS, NetMsgType::BLOCK, block));
|
||||
else if (inv.type == MSG_WITNESS_BLOCK)
|
||||
connman.PushMessage(pfrom, NetMsgType::BLOCK, block);
|
||||
connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::BLOCK, block));
|
||||
else if (inv.type == MSG_FILTERED_BLOCK)
|
||||
{
|
||||
bool sendMerkleBlock = false;
|
||||
|
@ -4957,7 +4957,7 @@ void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParam
|
|||
}
|
||||
}
|
||||
if (sendMerkleBlock) {
|
||||
connman.PushMessage(pfrom, NetMsgType::MERKLEBLOCK, merkleBlock);
|
||||
connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::MERKLEBLOCK, merkleBlock));
|
||||
// CMerkleBlock just contains hashes, so also push any transactions in the block the client did not see
|
||||
// This avoids hurting performance by pointlessly requiring a round-trip
|
||||
// Note that there is currently no way for a node to request any single transactions we didn't send here -
|
||||
|
@ -4966,7 +4966,7 @@ void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParam
|
|||
// however we MUST always provide at least what the remote peer needs
|
||||
typedef std::pair<unsigned int, uint256> PairType;
|
||||
BOOST_FOREACH(PairType& pair, merkleBlock.vMatchedTxn)
|
||||
connman.PushMessageWithFlag(pfrom, SERIALIZE_TRANSACTION_NO_WITNESS, NetMsgType::TX, *block.vtx[pair.first]);
|
||||
connman.PushMessage(pfrom, msgMaker.Make(SERIALIZE_TRANSACTION_NO_WITNESS, NetMsgType::TX, *block.vtx[pair.first]));
|
||||
}
|
||||
// else
|
||||
// no response
|
||||
|
@ -4978,11 +4978,12 @@ void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParam
|
|||
// and we don't feel like constructing the object for them, so
|
||||
// instead we respond with the full, non-compact block.
|
||||
bool fPeerWantsWitness = State(pfrom->GetId())->fWantsCmpctWitness;
|
||||
int nSendFlags = fPeerWantsWitness ? 0 : SERIALIZE_TRANSACTION_NO_WITNESS;
|
||||
if (CanDirectFetch(consensusParams) && mi->second->nHeight >= chainActive.Height() - MAX_CMPCTBLOCK_DEPTH) {
|
||||
CBlockHeaderAndShortTxIDs cmpctblock(block, fPeerWantsWitness);
|
||||
connman.PushMessageWithFlag(pfrom, fPeerWantsWitness ? 0 : SERIALIZE_TRANSACTION_NO_WITNESS, NetMsgType::CMPCTBLOCK, cmpctblock);
|
||||
connman.PushMessage(pfrom, msgMaker.Make(nSendFlags, NetMsgType::CMPCTBLOCK, cmpctblock));
|
||||
} else
|
||||
connman.PushMessageWithFlag(pfrom, fPeerWantsWitness ? 0 : SERIALIZE_TRANSACTION_NO_WITNESS, NetMsgType::BLOCK, block);
|
||||
connman.PushMessage(pfrom, msgMaker.Make(nSendFlags, NetMsgType::BLOCK, block));
|
||||
}
|
||||
|
||||
// Trigger the peer node to send a getblocks request for the next batch of inventory
|
||||
|
@ -4993,7 +4994,7 @@ void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParam
|
|||
// wait for other stuff first.
|
||||
vector<CInv> vInv;
|
||||
vInv.push_back(CInv(MSG_BLOCK, chainActive.Tip()->GetBlockHash()));
|
||||
connman.PushMessage(pfrom, NetMsgType::INV, vInv);
|
||||
connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::INV, vInv));
|
||||
pfrom->hashContinue.SetNull();
|
||||
}
|
||||
}
|
||||
|
@ -5003,15 +5004,16 @@ void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParam
|
|||
// Send stream from relay memory
|
||||
bool push = false;
|
||||
auto mi = mapRelay.find(inv.hash);
|
||||
int nSendFlags = (inv.type == MSG_TX ? SERIALIZE_TRANSACTION_NO_WITNESS : 0);
|
||||
if (mi != mapRelay.end()) {
|
||||
connman.PushMessageWithFlag(pfrom, inv.type == MSG_TX ? SERIALIZE_TRANSACTION_NO_WITNESS : 0, NetMsgType::TX, *mi->second);
|
||||
connman.PushMessage(pfrom, msgMaker.Make(nSendFlags, NetMsgType::TX, *mi->second));
|
||||
push = true;
|
||||
} else if (pfrom->timeLastMempoolReq) {
|
||||
auto txinfo = mempool.info(inv.hash);
|
||||
// To protect privacy, do not answer getdata using the mempool when
|
||||
// that TX couldn't have been INVed in reply to a MEMPOOL request.
|
||||
if (txinfo.tx && txinfo.nTime <= pfrom->timeLastMempoolReq) {
|
||||
connman.PushMessageWithFlag(pfrom, inv.type == MSG_TX ? SERIALIZE_TRANSACTION_NO_WITNESS : 0, NetMsgType::TX, *txinfo.tx);
|
||||
connman.PushMessage(pfrom, msgMaker.Make(nSendFlags, NetMsgType::TX, *txinfo.tx));
|
||||
push = true;
|
||||
}
|
||||
}
|
||||
|
@ -5038,7 +5040,7 @@ void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParam
|
|||
// do that because they want to know about (and store and rebroadcast and
|
||||
// risk analyze) the dependencies of transactions relevant to them, without
|
||||
// having to download the entire memory pool.
|
||||
connman.PushMessage(pfrom, NetMsgType::NOTFOUND, vNotFound);
|
||||
connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::NOTFOUND, vNotFound));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -5079,16 +5081,10 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
|
|||
|
||||
if (strCommand == NetMsgType::VERSION)
|
||||
{
|
||||
// Feeler connections exist only to verify if address is online.
|
||||
if (pfrom->fFeeler) {
|
||||
assert(pfrom->fInbound == false);
|
||||
pfrom->fDisconnect = true;
|
||||
}
|
||||
|
||||
// Each connection can only send one version message
|
||||
if (pfrom->nVersion != 0)
|
||||
{
|
||||
connman.PushMessageWithVersion(pfrom, INIT_PROTO_VERSION, NetMsgType::REJECT, strCommand, REJECT_DUPLICATE, string("Duplicate version message"));
|
||||
connman.PushMessage(pfrom, CNetMsgMaker(INIT_PROTO_VERSION).Make(NetMsgType::REJECT, strCommand, REJECT_DUPLICATE, string("Duplicate version message")));
|
||||
LOCK(cs_main);
|
||||
Misbehaving(pfrom->GetId(), 1);
|
||||
return false;
|
||||
|
@ -5108,8 +5104,8 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
|
|||
if (pfrom->nServicesExpected & ~pfrom->nServices)
|
||||
{
|
||||
LogPrint("net", "peer=%d does not offer the expected services (%08x offered, %08x expected); disconnecting\n", pfrom->id, pfrom->nServices, pfrom->nServicesExpected);
|
||||
connman.PushMessageWithVersion(pfrom, INIT_PROTO_VERSION, NetMsgType::REJECT, strCommand, REJECT_NONSTANDARD,
|
||||
strprintf("Expected to offer services %08x", pfrom->nServicesExpected));
|
||||
connman.PushMessage(pfrom, CNetMsgMaker(INIT_PROTO_VERSION).Make(NetMsgType::REJECT, strCommand, REJECT_NONSTANDARD,
|
||||
strprintf("Expected to offer services %08x", pfrom->nServicesExpected)));
|
||||
pfrom->fDisconnect = true;
|
||||
return false;
|
||||
}
|
||||
|
@ -5118,8 +5114,8 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
|
|||
{
|
||||
// disconnect from peers older than this proto version
|
||||
LogPrintf("peer=%d using obsolete version %i; disconnecting\n", pfrom->id, pfrom->nVersion);
|
||||
connman.PushMessageWithVersion(pfrom, INIT_PROTO_VERSION, NetMsgType::REJECT, strCommand, REJECT_OBSOLETE,
|
||||
strprintf("Version must be %d or greater", MIN_PEER_PROTO_VERSION));
|
||||
connman.PushMessage(pfrom, CNetMsgMaker(INIT_PROTO_VERSION).Make(NetMsgType::REJECT, strCommand, REJECT_OBSOLETE,
|
||||
strprintf("Version must be %d or greater", MIN_PEER_PROTO_VERSION)));
|
||||
pfrom->fDisconnect = true;
|
||||
return false;
|
||||
}
|
||||
|
@ -5176,8 +5172,9 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
|
|||
}
|
||||
|
||||
// Change version
|
||||
connman.PushMessageWithVersion(pfrom, INIT_PROTO_VERSION, NetMsgType::VERACK);
|
||||
pfrom->SetSendVersion(min(pfrom->nVersion, PROTOCOL_VERSION));
|
||||
connman.PushMessage(pfrom, CNetMsgMaker(INIT_PROTO_VERSION).Make(NetMsgType::VERACK));
|
||||
int nSendVersion = std::min(pfrom->nVersion, PROTOCOL_VERSION);
|
||||
pfrom->SetSendVersion(nSendVersion);
|
||||
|
||||
if (!pfrom->fInbound)
|
||||
{
|
||||
|
@ -5200,7 +5197,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
|
|||
// Get recent addresses
|
||||
if (pfrom->fOneShot || pfrom->nVersion >= CADDR_TIME_VERSION || connman.GetAddressCount() < 1000)
|
||||
{
|
||||
connman.PushMessage(pfrom, NetMsgType::GETADDR);
|
||||
connman.PushMessage(pfrom, CNetMsgMaker(nSendVersion).Make(NetMsgType::GETADDR));
|
||||
pfrom->fGetAddr = true;
|
||||
}
|
||||
connman.MarkAddressGood(pfrom->addr);
|
||||
|
@ -5220,6 +5217,13 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
|
|||
int64_t nTimeOffset = nTime - GetTime();
|
||||
pfrom->nTimeOffset = nTimeOffset;
|
||||
AddTimeData(pfrom->addr, nTimeOffset);
|
||||
|
||||
// Feeler connections exist only to verify if address is online.
|
||||
if (pfrom->fFeeler) {
|
||||
assert(pfrom->fInbound == false);
|
||||
pfrom->fDisconnect = true;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
|
@ -5231,8 +5235,10 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
|
|||
return false;
|
||||
}
|
||||
|
||||
// At this point, the outgoing message serialization version can't change.
|
||||
CNetMsgMaker msgMaker(pfrom->GetSendVersion());
|
||||
|
||||
else if (strCommand == NetMsgType::VERACK)
|
||||
if (strCommand == NetMsgType::VERACK)
|
||||
{
|
||||
pfrom->SetRecvVersion(min(pfrom->nVersion, PROTOCOL_VERSION));
|
||||
|
||||
|
@ -5247,7 +5253,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
|
|||
// We send this to non-NODE NETWORK peers as well, because even
|
||||
// non-NODE NETWORK peers can announce blocks (such as pruning
|
||||
// nodes)
|
||||
connman.PushMessage(pfrom, NetMsgType::SENDHEADERS);
|
||||
connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::SENDHEADERS));
|
||||
}
|
||||
if (pfrom->nVersion >= SHORT_IDS_BLOCKS_VERSION) {
|
||||
// Tell our peer we are willing to provide version 1 or 2 cmpctblocks
|
||||
|
@ -5258,9 +5264,9 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
|
|||
bool fAnnounceUsingCMPCTBLOCK = false;
|
||||
uint64_t nCMPCTBLOCKVersion = 2;
|
||||
if (pfrom->GetLocalServices() & NODE_WITNESS)
|
||||
connman.PushMessage(pfrom, NetMsgType::SENDCMPCT, fAnnounceUsingCMPCTBLOCK, nCMPCTBLOCKVersion);
|
||||
connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::SENDCMPCT, fAnnounceUsingCMPCTBLOCK, nCMPCTBLOCKVersion));
|
||||
nCMPCTBLOCKVersion = 1;
|
||||
connman.PushMessage(pfrom, NetMsgType::SENDCMPCT, fAnnounceUsingCMPCTBLOCK, nCMPCTBLOCKVersion);
|
||||
connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::SENDCMPCT, fAnnounceUsingCMPCTBLOCK, nCMPCTBLOCKVersion));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -5385,7 +5391,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
|
|||
// fell back to inv we probably have a reorg which we should get the headers for first,
|
||||
// we now only provide a getheaders response here. When we receive the headers, we will
|
||||
// then ask for the blocks we need.
|
||||
connman.PushMessage(pfrom, NetMsgType::GETHEADERS, chainActive.GetLocator(pindexBestHeader), inv.hash);
|
||||
connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::GETHEADERS, chainActive.GetLocator(pindexBestHeader), inv.hash));
|
||||
LogPrint("net", "getheaders (%d) %s to peer=%d\n", pindexBestHeader->nHeight, inv.hash.ToString(), pfrom->id);
|
||||
}
|
||||
}
|
||||
|
@ -5408,7 +5414,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
|
|||
}
|
||||
|
||||
if (!vToFetch.empty())
|
||||
connman.PushMessage(pfrom, NetMsgType::GETDATA, vToFetch);
|
||||
connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::GETDATA, vToFetch));
|
||||
}
|
||||
|
||||
|
||||
|
@ -5520,7 +5526,8 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
|
|||
}
|
||||
resp.txn[i] = block.vtx[req.indexes[i]];
|
||||
}
|
||||
connman.PushMessageWithFlag(pfrom, State(pfrom->GetId())->fWantsCmpctWitness ? 0 : SERIALIZE_TRANSACTION_NO_WITNESS, NetMsgType::BLOCKTXN, resp);
|
||||
int nSendFlags = State(pfrom->GetId())->fWantsCmpctWitness ? 0 : SERIALIZE_TRANSACTION_NO_WITNESS;
|
||||
connman.PushMessage(pfrom, msgMaker.Make(nSendFlags, NetMsgType::BLOCKTXN, resp));
|
||||
}
|
||||
|
||||
|
||||
|
@ -5569,7 +5576,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
|
|||
// headers message). In both cases it's safe to update
|
||||
// pindexBestHeaderSent to be our tip.
|
||||
nodestate->pindexBestHeaderSent = pindex ? pindex : chainActive.Tip();
|
||||
connman.PushMessage(pfrom, NetMsgType::HEADERS, vHeaders);
|
||||
connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::HEADERS, vHeaders));
|
||||
}
|
||||
|
||||
|
||||
|
@ -5732,8 +5739,8 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
|
|||
pfrom->id,
|
||||
FormatStateMessage(state));
|
||||
if (state.GetRejectCode() < REJECT_INTERNAL) // Never send AcceptToMemoryPool's internal codes over P2P
|
||||
connman.PushMessage(pfrom, NetMsgType::REJECT, strCommand, (unsigned char)state.GetRejectCode(),
|
||||
state.GetRejectReason().substr(0, MAX_REJECT_MESSAGE_LENGTH), inv.hash);
|
||||
connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::REJECT, strCommand, (unsigned char)state.GetRejectCode(),
|
||||
state.GetRejectReason().substr(0, MAX_REJECT_MESSAGE_LENGTH), inv.hash));
|
||||
if (nDoS > 0) {
|
||||
Misbehaving(pfrom->GetId(), nDoS);
|
||||
}
|
||||
|
@ -5751,7 +5758,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
|
|||
if (mapBlockIndex.find(cmpctblock.header.hashPrevBlock) == mapBlockIndex.end()) {
|
||||
// Doesn't connect (or is genesis), instead of DoSing in AcceptBlockHeader, request deeper headers
|
||||
if (!IsInitialBlockDownload())
|
||||
connman.PushMessage(pfrom, NetMsgType::GETHEADERS, chainActive.GetLocator(pindexBestHeader), uint256());
|
||||
connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::GETHEADERS, chainActive.GetLocator(pindexBestHeader), uint256()));
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -5784,7 +5791,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
|
|||
// so we just grab the block via normal getdata
|
||||
std::vector<CInv> vInv(1);
|
||||
vInv[0] = CInv(MSG_BLOCK | GetFetchFlags(pfrom, pindex->pprev, chainparams.GetConsensus()), cmpctblock.header.GetHash());
|
||||
connman.PushMessage(pfrom, NetMsgType::GETDATA, vInv);
|
||||
connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::GETDATA, vInv));
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
@ -5828,7 +5835,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
|
|||
// Duplicate txindexes, the block is now in-flight, so just request it
|
||||
std::vector<CInv> vInv(1);
|
||||
vInv[0] = CInv(MSG_BLOCK | GetFetchFlags(pfrom, pindex->pprev, chainparams.GetConsensus()), cmpctblock.header.GetHash());
|
||||
connman.PushMessage(pfrom, NetMsgType::GETDATA, vInv);
|
||||
connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::GETDATA, vInv));
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -5852,7 +5859,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
|
|||
return ProcessMessage(pfrom, NetMsgType::BLOCKTXN, blockTxnMsg, nTimeReceived, chainparams, connman);
|
||||
} else {
|
||||
req.blockhash = pindex->GetBlockHash();
|
||||
connman.PushMessage(pfrom, NetMsgType::GETBLOCKTXN, req);
|
||||
connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::GETBLOCKTXN, req));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
|
@ -5861,7 +5868,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
|
|||
// mempool will probably be useless - request the block normally
|
||||
std::vector<CInv> vInv(1);
|
||||
vInv[0] = CInv(MSG_BLOCK | GetFetchFlags(pfrom, pindex->pprev, chainparams.GetConsensus()), cmpctblock.header.GetHash());
|
||||
connman.PushMessage(pfrom, NetMsgType::GETDATA, vInv);
|
||||
connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::GETDATA, vInv));
|
||||
return true;
|
||||
} else {
|
||||
// If this was an announce-cmpctblock, we want the same treatment as a header message
|
||||
|
@ -5903,7 +5910,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
|
|||
// Might have collided, fall back to getdata now :(
|
||||
std::vector<CInv> invs;
|
||||
invs.push_back(CInv(MSG_BLOCK | GetFetchFlags(pfrom, chainActive.Tip(), chainparams.GetConsensus()), resp.blockhash));
|
||||
connman.PushMessage(pfrom, NetMsgType::GETDATA, invs);
|
||||
connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::GETDATA, invs));
|
||||
} else {
|
||||
// Block is either okay, or possibly we received
|
||||
// READ_STATUS_CHECKBLOCK_FAILED.
|
||||
|
@ -5980,7 +5987,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
|
|||
// nUnconnectingHeaders gets reset back to 0.
|
||||
if (mapBlockIndex.find(headers[0].hashPrevBlock) == mapBlockIndex.end() && nCount < MAX_BLOCKS_TO_ANNOUNCE) {
|
||||
nodestate->nUnconnectingHeaders++;
|
||||
connman.PushMessage(pfrom, NetMsgType::GETHEADERS, chainActive.GetLocator(pindexBestHeader), uint256());
|
||||
connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::GETHEADERS, chainActive.GetLocator(pindexBestHeader), uint256()));
|
||||
LogPrint("net", "received header %s: missing prev block %s, sending getheaders (%d) to end (peer=%d, nUnconnectingHeaders=%d)\n",
|
||||
headers[0].GetHash().ToString(),
|
||||
headers[0].hashPrevBlock.ToString(),
|
||||
|
@ -6027,7 +6034,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
|
|||
// TODO: optimize: if pindexLast is an ancestor of chainActive.Tip or pindexBestHeader, continue
|
||||
// from there instead.
|
||||
LogPrint("net", "more getheaders (%d) to end to peer=%d (startheight:%d)\n", pindexLast->nHeight, pfrom->id, pfrom->nStartingHeight);
|
||||
connman.PushMessage(pfrom, NetMsgType::GETHEADERS, chainActive.GetLocator(pindexLast), uint256());
|
||||
connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::GETHEADERS, chainActive.GetLocator(pindexLast), uint256()));
|
||||
}
|
||||
|
||||
bool fCanDirectFetch = CanDirectFetch(chainparams.GetConsensus());
|
||||
|
@ -6080,7 +6087,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
|
|||
// In any case, we want to download using a compact block, not a regular one
|
||||
vGetData[0] = CInv(MSG_CMPCT_BLOCK, vGetData[0].hash);
|
||||
}
|
||||
connman.PushMessage(pfrom, NetMsgType::GETDATA, vGetData);
|
||||
connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::GETDATA, vGetData));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -6184,7 +6191,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
|
|||
// it, if the remote node sends a ping once per second and this node takes 5
|
||||
// seconds to respond to each, the 5th ping the remote sends would appear to
|
||||
// return very quickly.
|
||||
connman.PushMessage(pfrom, NetMsgType::PONG, nonce);
|
||||
connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::PONG, nonce));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -6440,7 +6447,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman)
|
|||
}
|
||||
catch (const std::ios_base::failure& e)
|
||||
{
|
||||
connman.PushMessageWithVersion(pfrom, INIT_PROTO_VERSION, NetMsgType::REJECT, strCommand, REJECT_MALFORMED, string("error parsing message"));
|
||||
connman.PushMessage(pfrom, CNetMsgMaker(INIT_PROTO_VERSION).Make(NetMsgType::REJECT, strCommand, REJECT_MALFORMED, string("error parsing message")));
|
||||
if (strstr(e.what(), "end of data"))
|
||||
{
|
||||
// Allow exceptions from under-length message on vRecv
|
||||
|
@ -6505,9 +6512,12 @@ bool SendMessages(CNode* pto, CConnman& connman)
|
|||
const Consensus::Params& consensusParams = Params().GetConsensus();
|
||||
{
|
||||
// Don't send anything until we get its version message
|
||||
if (pto->nVersion == 0)
|
||||
if (pto->nVersion == 0 || pto->fDisconnect)
|
||||
return true;
|
||||
|
||||
// If we get here, the outgoing message serialization version is set and can't change.
|
||||
CNetMsgMaker msgMaker(pto->GetSendVersion());
|
||||
|
||||
//
|
||||
// Message: ping
|
||||
//
|
||||
|
@ -6520,7 +6530,7 @@ bool SendMessages(CNode* pto, CConnman& connman)
|
|||
// Ping automatically sent as a latency probe & keepalive.
|
||||
pingSend = true;
|
||||
}
|
||||
if (pingSend && !pto->fDisconnect) {
|
||||
if (pingSend) {
|
||||
uint64_t nonce = 0;
|
||||
while (nonce == 0) {
|
||||
GetRandBytes((unsigned char*)&nonce, sizeof(nonce));
|
||||
|
@ -6529,11 +6539,11 @@ bool SendMessages(CNode* pto, CConnman& connman)
|
|||
pto->nPingUsecStart = GetTimeMicros();
|
||||
if (pto->nVersion > BIP0031_VERSION) {
|
||||
pto->nPingNonceSent = nonce;
|
||||
connman.PushMessage(pto, NetMsgType::PING, nonce);
|
||||
connman.PushMessage(pto, msgMaker.Make(NetMsgType::PING, nonce));
|
||||
} else {
|
||||
// Peer is too old to support ping command with nonce, pong will never arrive.
|
||||
pto->nPingNonceSent = 0;
|
||||
connman.PushMessage(pto, NetMsgType::PING);
|
||||
connman.PushMessage(pto, msgMaker.Make(NetMsgType::PING));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -6541,6 +6551,28 @@ bool SendMessages(CNode* pto, CConnman& connman)
|
|||
if (!lockMain)
|
||||
return true;
|
||||
|
||||
CNodeState &state = *State(pto->GetId());
|
||||
|
||||
BOOST_FOREACH(const CBlockReject& reject, state.rejects)
|
||||
connman.PushMessage(pto, msgMaker.Make(NetMsgType::REJECT, (string)NetMsgType::BLOCK, reject.chRejectCode, reject.strRejectReason, reject.hashBlock));
|
||||
state.rejects.clear();
|
||||
|
||||
if (state.fShouldBan) {
|
||||
state.fShouldBan = false;
|
||||
if (pto->fWhitelisted)
|
||||
LogPrintf("Warning: not punishing whitelisted peer %s!\n", pto->addr.ToString());
|
||||
else {
|
||||
pto->fDisconnect = true;
|
||||
if (pto->addr.IsLocal())
|
||||
LogPrintf("Warning: not banning local peer %s!\n", pto->addr.ToString());
|
||||
else
|
||||
{
|
||||
connman.Ban(pto->addr, BanReasonNodeMisbehaving);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
// Address refresh broadcast
|
||||
int64_t nNow = GetTimeMicros();
|
||||
if (!IsInitialBlockDownload() && pto->nNextLocalAddrSend < nNow) {
|
||||
|
@ -6564,44 +6596,24 @@ bool SendMessages(CNode* pto, CConnman& connman)
|
|||
// receiver rejects addr messages larger than 1000
|
||||
if (vAddr.size() >= 1000)
|
||||
{
|
||||
connman.PushMessage(pto, NetMsgType::ADDR, vAddr);
|
||||
connman.PushMessage(pto, msgMaker.Make(NetMsgType::ADDR, vAddr));
|
||||
vAddr.clear();
|
||||
}
|
||||
}
|
||||
}
|
||||
pto->vAddrToSend.clear();
|
||||
if (!vAddr.empty())
|
||||
connman.PushMessage(pto, NetMsgType::ADDR, vAddr);
|
||||
connman.PushMessage(pto, msgMaker.Make(NetMsgType::ADDR, vAddr));
|
||||
// we only send the big addr message once
|
||||
if (pto->vAddrToSend.capacity() > 40)
|
||||
pto->vAddrToSend.shrink_to_fit();
|
||||
}
|
||||
|
||||
CNodeState &state = *State(pto->GetId());
|
||||
if (state.fShouldBan) {
|
||||
if (pto->fWhitelisted)
|
||||
LogPrintf("Warning: not punishing whitelisted peer %s!\n", pto->addr.ToString());
|
||||
else {
|
||||
pto->fDisconnect = true;
|
||||
if (pto->addr.IsLocal())
|
||||
LogPrintf("Warning: not banning local peer %s!\n", pto->addr.ToString());
|
||||
else
|
||||
{
|
||||
connman.Ban(pto->addr, BanReasonNodeMisbehaving);
|
||||
}
|
||||
}
|
||||
state.fShouldBan = false;
|
||||
}
|
||||
|
||||
BOOST_FOREACH(const CBlockReject& reject, state.rejects)
|
||||
connman.PushMessage(pto, NetMsgType::REJECT, (string)NetMsgType::BLOCK, reject.chRejectCode, reject.strRejectReason, reject.hashBlock);
|
||||
state.rejects.clear();
|
||||
|
||||
// Start block sync
|
||||
if (pindexBestHeader == NULL)
|
||||
pindexBestHeader = chainActive.Tip();
|
||||
bool fFetch = state.fPreferredDownload || (nPreferredDownload == 0 && !pto->fClient && !pto->fOneShot); // Download if this is a nice peer, or we have no nice peers and this one might do.
|
||||
if (!state.fSyncStarted && !pto->fClient && !pto->fDisconnect && !fImporting && !fReindex) {
|
||||
if (!state.fSyncStarted && !pto->fClient && !fImporting && !fReindex) {
|
||||
// Only actively request headers from a single peer, unless we're close to today.
|
||||
if ((nSyncStarted == 0 && fFetch) || pindexBestHeader->GetBlockTime() > GetAdjustedTime() - 24 * 60 * 60) {
|
||||
state.fSyncStarted = true;
|
||||
|
@ -6617,7 +6629,7 @@ bool SendMessages(CNode* pto, CConnman& connman)
|
|||
if (pindexStart->pprev)
|
||||
pindexStart = pindexStart->pprev;
|
||||
LogPrint("net", "initial getheaders (%d) to peer=%d (startheight:%d)\n", pindexStart->nHeight, pto->id, pto->nStartingHeight);
|
||||
connman.PushMessage(pto, NetMsgType::GETHEADERS, chainActive.GetLocator(pindexStart), uint256());
|
||||
connman.PushMessage(pto, msgMaker.Make(NetMsgType::GETHEADERS, chainActive.GetLocator(pindexStart), uint256()));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -6706,7 +6718,8 @@ bool SendMessages(CNode* pto, CConnman& connman)
|
|||
CBlock block;
|
||||
assert(ReadBlockFromDisk(block, pBestIndex, consensusParams));
|
||||
CBlockHeaderAndShortTxIDs cmpctblock(block, state.fWantsCmpctWitness);
|
||||
connman.PushMessageWithFlag(pto, state.fWantsCmpctWitness ? 0 : SERIALIZE_TRANSACTION_NO_WITNESS, NetMsgType::CMPCTBLOCK, cmpctblock);
|
||||
int nSendFlags = state.fWantsCmpctWitness ? 0 : SERIALIZE_TRANSACTION_NO_WITNESS;
|
||||
connman.PushMessage(pto, msgMaker.Make(nSendFlags, NetMsgType::CMPCTBLOCK, cmpctblock));
|
||||
state.pindexBestHeaderSent = pBestIndex;
|
||||
} else if (state.fPreferHeaders) {
|
||||
if (vHeaders.size() > 1) {
|
||||
|
@ -6718,7 +6731,7 @@ bool SendMessages(CNode* pto, CConnman& connman)
|
|||
LogPrint("net", "%s: sending header %s to peer=%d\n", __func__,
|
||||
vHeaders.front().GetHash().ToString(), pto->id);
|
||||
}
|
||||
connman.PushMessage(pto, NetMsgType::HEADERS, vHeaders);
|
||||
connman.PushMessage(pto, msgMaker.Make(NetMsgType::HEADERS, vHeaders));
|
||||
state.pindexBestHeaderSent = pBestIndex;
|
||||
} else
|
||||
fRevertToInv = true;
|
||||
|
@ -6764,7 +6777,7 @@ bool SendMessages(CNode* pto, CConnman& connman)
|
|||
BOOST_FOREACH(const uint256& hash, pto->vInventoryBlockToSend) {
|
||||
vInv.push_back(CInv(MSG_BLOCK, hash));
|
||||
if (vInv.size() == MAX_INV_SZ) {
|
||||
connman.PushMessage(pto, NetMsgType::INV, vInv);
|
||||
connman.PushMessage(pto, msgMaker.Make(NetMsgType::INV, vInv));
|
||||
vInv.clear();
|
||||
}
|
||||
}
|
||||
|
@ -6810,7 +6823,7 @@ bool SendMessages(CNode* pto, CConnman& connman)
|
|||
pto->filterInventoryKnown.insert(hash);
|
||||
vInv.push_back(inv);
|
||||
if (vInv.size() == MAX_INV_SZ) {
|
||||
connman.PushMessage(pto, NetMsgType::INV, vInv);
|
||||
connman.PushMessage(pto, msgMaker.Make(NetMsgType::INV, vInv));
|
||||
vInv.clear();
|
||||
}
|
||||
}
|
||||
|
@ -6876,7 +6889,7 @@ bool SendMessages(CNode* pto, CConnman& connman)
|
|||
}
|
||||
}
|
||||
if (vInv.size() == MAX_INV_SZ) {
|
||||
connman.PushMessage(pto, NetMsgType::INV, vInv);
|
||||
connman.PushMessage(pto, msgMaker.Make(NetMsgType::INV, vInv));
|
||||
vInv.clear();
|
||||
}
|
||||
pto->filterInventoryKnown.insert(hash);
|
||||
|
@ -6884,28 +6897,30 @@ bool SendMessages(CNode* pto, CConnman& connman)
|
|||
}
|
||||
}
|
||||
if (!vInv.empty())
|
||||
connman.PushMessage(pto, NetMsgType::INV, vInv);
|
||||
connman.PushMessage(pto, msgMaker.Make(NetMsgType::INV, vInv));
|
||||
|
||||
// Detect whether we're stalling
|
||||
nNow = GetTimeMicros();
|
||||
if (!pto->fDisconnect && state.nStallingSince && state.nStallingSince < nNow - 1000000 * BLOCK_STALLING_TIMEOUT) {
|
||||
if (state.nStallingSince && state.nStallingSince < nNow - 1000000 * BLOCK_STALLING_TIMEOUT) {
|
||||
// Stalling only triggers when the block download window cannot move. During normal steady state,
|
||||
// the download window should be much larger than the to-be-downloaded set of blocks, so disconnection
|
||||
// should only happen during initial block download.
|
||||
LogPrintf("Peer=%d is stalling block download, disconnecting\n", pto->id);
|
||||
pto->fDisconnect = true;
|
||||
return true;
|
||||
}
|
||||
// In case there is a block that has been in flight from this peer for 2 + 0.5 * N times the block interval
|
||||
// (with N the number of peers from which we're downloading validated blocks), disconnect due to timeout.
|
||||
// We compensate for other peers to prevent killing off peers due to our own downstream link
|
||||
// being saturated. We only count validated in-flight blocks so peers can't advertise non-existing block hashes
|
||||
// to unreasonably increase our timeout.
|
||||
if (!pto->fDisconnect && state.vBlocksInFlight.size() > 0) {
|
||||
if (state.vBlocksInFlight.size() > 0) {
|
||||
QueuedBlock &queuedBlock = state.vBlocksInFlight.front();
|
||||
int nOtherPeersWithValidatedDownloads = nPeersWithValidatedDownloads - (state.nBlocksInFlightValidHeaders > 0);
|
||||
if (nNow > state.nDownloadingSince + consensusParams.nPowTargetSpacing * (BLOCK_DOWNLOAD_TIMEOUT_BASE + BLOCK_DOWNLOAD_TIMEOUT_PER_PEER * nOtherPeersWithValidatedDownloads)) {
|
||||
LogPrintf("Timeout downloading block %s from peer=%d, disconnecting\n", queuedBlock.hash.ToString(), pto->id);
|
||||
pto->fDisconnect = true;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -6913,7 +6928,7 @@ bool SendMessages(CNode* pto, CConnman& connman)
|
|||
// Message: getdata (blocks)
|
||||
//
|
||||
vector<CInv> vGetData;
|
||||
if (!pto->fDisconnect && !pto->fClient && (fFetch || !IsInitialBlockDownload()) && state.nBlocksInFlight < MAX_BLOCKS_IN_TRANSIT_PER_PEER) {
|
||||
if (!pto->fClient && (fFetch || !IsInitialBlockDownload()) && state.nBlocksInFlight < MAX_BLOCKS_IN_TRANSIT_PER_PEER) {
|
||||
vector<CBlockIndex*> vToDownload;
|
||||
NodeId staller = -1;
|
||||
FindNextBlocksToDownload(pto->GetId(), MAX_BLOCKS_IN_TRANSIT_PER_PEER - state.nBlocksInFlight, vToDownload, staller, consensusParams);
|
||||
|
@ -6935,7 +6950,7 @@ bool SendMessages(CNode* pto, CConnman& connman)
|
|||
//
|
||||
// Message: getdata (non-blocks)
|
||||
//
|
||||
while (!pto->fDisconnect && !pto->mapAskFor.empty() && (*pto->mapAskFor.begin()).first <= nNow)
|
||||
while (!pto->mapAskFor.empty() && (*pto->mapAskFor.begin()).first <= nNow)
|
||||
{
|
||||
const CInv& inv = (*pto->mapAskFor.begin()).second;
|
||||
if (!AlreadyHave(inv))
|
||||
|
@ -6945,7 +6960,7 @@ bool SendMessages(CNode* pto, CConnman& connman)
|
|||
vGetData.push_back(inv);
|
||||
if (vGetData.size() >= 1000)
|
||||
{
|
||||
connman.PushMessage(pto, NetMsgType::GETDATA, vGetData);
|
||||
connman.PushMessage(pto, msgMaker.Make(NetMsgType::GETDATA, vGetData));
|
||||
vGetData.clear();
|
||||
}
|
||||
} else {
|
||||
|
@ -6955,20 +6970,20 @@ bool SendMessages(CNode* pto, CConnman& connman)
|
|||
pto->mapAskFor.erase(pto->mapAskFor.begin());
|
||||
}
|
||||
if (!vGetData.empty())
|
||||
connman.PushMessage(pto, NetMsgType::GETDATA, vGetData);
|
||||
connman.PushMessage(pto, msgMaker.Make(NetMsgType::GETDATA, vGetData));
|
||||
|
||||
//
|
||||
// Message: feefilter
|
||||
//
|
||||
// We don't want white listed peers to filter txs to us if we have -whitelistforcerelay
|
||||
if (!pto->fDisconnect && pto->nVersion >= FEEFILTER_VERSION && GetBoolArg("-feefilter", DEFAULT_FEEFILTER) &&
|
||||
if (pto->nVersion >= FEEFILTER_VERSION && GetBoolArg("-feefilter", DEFAULT_FEEFILTER) &&
|
||||
!(pto->fWhitelisted && GetBoolArg("-whitelistforcerelay", DEFAULT_WHITELISTFORCERELAY))) {
|
||||
CAmount currentFilter = mempool.GetMinFee(GetArg("-maxmempool", DEFAULT_MAX_MEMPOOL_SIZE) * 1000000).GetFeePerK();
|
||||
int64_t timeNow = GetTimeMicros();
|
||||
if (timeNow > pto->nextSendTimeFeeFilter) {
|
||||
CAmount filterToSend = filterRounder.round(currentFilter);
|
||||
if (filterToSend != pto->lastSentFeeFilter) {
|
||||
connman.PushMessage(pto, NetMsgType::FEEFILTER, filterToSend);
|
||||
connman.PushMessage(pto, msgMaker.Make(NetMsgType::FEEFILTER, filterToSend));
|
||||
pto->lastSentFeeFilter = filterToSend;
|
||||
}
|
||||
pto->nextSendTimeFeeFilter = PoissonNextSend(timeNow, AVG_FEEFILTER_BROADCAST_INTERVAL);
|
||||
|
|
46
src/net.cpp
46
src/net.cpp
|
@ -768,13 +768,13 @@ const uint256& CNetMessage::GetMessageHash() const
|
|||
// requires LOCK(cs_vSend)
|
||||
size_t SocketSendData(CNode *pnode)
|
||||
{
|
||||
std::deque<CSerializeData>::iterator it = pnode->vSendMsg.begin();
|
||||
auto it = pnode->vSendMsg.begin();
|
||||
size_t nSentSize = 0;
|
||||
|
||||
while (it != pnode->vSendMsg.end()) {
|
||||
const CSerializeData &data = *it;
|
||||
const auto &data = *it;
|
||||
assert(data.size() > pnode->nSendOffset);
|
||||
int nBytes = send(pnode->hSocket, &data[pnode->nSendOffset], data.size() - pnode->nSendOffset, MSG_NOSIGNAL | MSG_DONTWAIT);
|
||||
int nBytes = send(pnode->hSocket, reinterpret_cast<const char*>(data.data()) + pnode->nSendOffset, data.size() - pnode->nSendOffset, MSG_NOSIGNAL | MSG_DONTWAIT);
|
||||
if (nBytes > 0) {
|
||||
pnode->nLastSend = GetTime();
|
||||
pnode->nSendBytes += nBytes;
|
||||
|
@ -2612,30 +2612,19 @@ void CNode::AskFor(const CInv& inv)
|
|||
mapAskFor.insert(std::make_pair(nRequestTime, inv));
|
||||
}
|
||||
|
||||
CDataStream CConnman::BeginMessage(CNode* pnode, int nVersion, int flags, const std::string& sCommand)
|
||||
void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
|
||||
{
|
||||
return {SER_NETWORK, (nVersion ? nVersion : pnode->GetSendVersion()) | flags, CMessageHeader(Params().MessageStart(), sCommand.c_str(), 0) };
|
||||
}
|
||||
size_t nMessageSize = msg.data.size();
|
||||
size_t nTotalSize = nMessageSize + CMessageHeader::HEADER_SIZE;
|
||||
LogPrint("net", "sending %s (%d bytes) peer=%d\n", SanitizeString(msg.command.c_str()), nMessageSize, pnode->id);
|
||||
|
||||
void CConnman::EndMessage(CDataStream& strm)
|
||||
{
|
||||
// Set the size
|
||||
assert(strm.size () >= CMessageHeader::HEADER_SIZE);
|
||||
unsigned int nSize = strm.size() - CMessageHeader::HEADER_SIZE;
|
||||
WriteLE32((uint8_t*)&strm[CMessageHeader::MESSAGE_SIZE_OFFSET], nSize);
|
||||
// Set the checksum
|
||||
uint256 hash = Hash(strm.begin() + CMessageHeader::HEADER_SIZE, strm.end());
|
||||
memcpy((char*)&strm[CMessageHeader::CHECKSUM_OFFSET], hash.begin(), CMessageHeader::CHECKSUM_SIZE);
|
||||
std::vector<unsigned char> serializedHeader;
|
||||
serializedHeader.reserve(CMessageHeader::HEADER_SIZE);
|
||||
uint256 hash = Hash(msg.data.data(), msg.data.data() + nMessageSize);
|
||||
CMessageHeader hdr(Params().MessageStart(), msg.command.c_str(), nMessageSize);
|
||||
memcpy(hdr.pchChecksum, hash.begin(), CMessageHeader::CHECKSUM_SIZE);
|
||||
|
||||
}
|
||||
|
||||
void CConnman::PushMessage(CNode* pnode, CDataStream& strm, const std::string& sCommand)
|
||||
{
|
||||
if(strm.empty())
|
||||
return;
|
||||
|
||||
unsigned int nSize = strm.size() - CMessageHeader::HEADER_SIZE;
|
||||
LogPrint("net", "sending %s (%d bytes) peer=%d\n", SanitizeString(sCommand.c_str()), nSize, pnode->id);
|
||||
CVectorWriter{SER_NETWORK, INIT_PROTO_VERSION, serializedHeader, 0, hdr};
|
||||
|
||||
size_t nBytesSent = 0;
|
||||
{
|
||||
|
@ -2644,11 +2633,14 @@ void CConnman::PushMessage(CNode* pnode, CDataStream& strm, const std::string& s
|
|||
return;
|
||||
}
|
||||
bool optimisticSend(pnode->vSendMsg.empty());
|
||||
pnode->vSendMsg.emplace_back(strm.begin(), strm.end());
|
||||
|
||||
//log total amount of bytes per command
|
||||
pnode->mapSendBytesPerMsgCmd[sCommand] += strm.size();
|
||||
pnode->nSendSize += strm.size();
|
||||
pnode->mapSendBytesPerMsgCmd[msg.command] += nTotalSize;
|
||||
pnode->nSendSize += nTotalSize;
|
||||
|
||||
pnode->vSendMsg.push_back(std::move(serializedHeader));
|
||||
if (nMessageSize)
|
||||
pnode->vSendMsg.push_back(std::move(msg.data));
|
||||
|
||||
// If write queue empty, attempt "optimistic write"
|
||||
if (optimisticSend == true)
|
||||
|
|
49
src/net.h
49
src/net.h
|
@ -101,6 +101,20 @@ class CTransaction;
|
|||
class CNodeStats;
|
||||
class CClientUIInterface;
|
||||
|
||||
struct CSerializedNetMsg
|
||||
{
|
||||
CSerializedNetMsg() = default;
|
||||
CSerializedNetMsg(CSerializedNetMsg&&) = default;
|
||||
CSerializedNetMsg& operator=(CSerializedNetMsg&&) = default;
|
||||
// No copying, only moves.
|
||||
CSerializedNetMsg(const CSerializedNetMsg& msg) = delete;
|
||||
CSerializedNetMsg& operator=(const CSerializedNetMsg&) = delete;
|
||||
|
||||
std::vector<unsigned char> data;
|
||||
std::string command;
|
||||
};
|
||||
|
||||
|
||||
class CConnman
|
||||
{
|
||||
public:
|
||||
|
@ -138,32 +152,7 @@ public:
|
|||
|
||||
bool ForNode(NodeId id, std::function<bool(CNode* pnode)> func);
|
||||
|
||||
template <typename... Args>
|
||||
void PushMessageWithVersionAndFlag(CNode* pnode, int nVersion, int flag, const std::string& sCommand, Args&&... args)
|
||||
{
|
||||
auto msg(BeginMessage(pnode, nVersion, flag, sCommand));
|
||||
::SerializeMany(msg, std::forward<Args>(args)...);
|
||||
EndMessage(msg);
|
||||
PushMessage(pnode, msg, sCommand);
|
||||
}
|
||||
|
||||
template <typename... Args>
|
||||
void PushMessageWithFlag(CNode* pnode, int flag, const std::string& sCommand, Args&&... args)
|
||||
{
|
||||
PushMessageWithVersionAndFlag(pnode, 0, flag, sCommand, std::forward<Args>(args)...);
|
||||
}
|
||||
|
||||
template <typename... Args>
|
||||
void PushMessageWithVersion(CNode* pnode, int nVersion, const std::string& sCommand, Args&&... args)
|
||||
{
|
||||
PushMessageWithVersionAndFlag(pnode, nVersion, 0, sCommand, std::forward<Args>(args)...);
|
||||
}
|
||||
|
||||
template <typename... Args>
|
||||
void PushMessage(CNode* pnode, const std::string& sCommand, Args&&... args)
|
||||
{
|
||||
PushMessageWithVersionAndFlag(pnode, 0, 0, sCommand, std::forward<Args>(args)...);
|
||||
}
|
||||
void PushMessage(CNode* pnode, CSerializedNetMsg&& msg);
|
||||
|
||||
template<typename Callable>
|
||||
bool ForEachNodeContinueIf(Callable&& func)
|
||||
|
@ -374,10 +363,6 @@ private:
|
|||
|
||||
unsigned int GetReceiveFloodSize() const;
|
||||
|
||||
CDataStream BeginMessage(CNode* node, int nVersion, int flags, const std::string& sCommand);
|
||||
void PushMessage(CNode* pnode, CDataStream& strm, const std::string& sCommand);
|
||||
void EndMessage(CDataStream& strm);
|
||||
|
||||
// Network stats
|
||||
void RecordBytesRecv(uint64_t bytes);
|
||||
void RecordBytesSent(uint64_t bytes);
|
||||
|
@ -601,7 +586,7 @@ public:
|
|||
size_t nSendSize; // total size of all vSendMsg entries
|
||||
size_t nSendOffset; // offset inside the first vSendMsg already sent
|
||||
uint64_t nSendBytes;
|
||||
std::deque<CSerializeData> vSendMsg;
|
||||
std::deque<std::vector<unsigned char>> vSendMsg;
|
||||
CCriticalSection cs_vSend;
|
||||
|
||||
std::deque<CInv> vRecvGetData;
|
||||
|
@ -771,7 +756,7 @@ public:
|
|||
{
|
||||
// The send version should always be explicitly set to
|
||||
// INIT_PROTO_VERSION rather than using this value until the handshake
|
||||
// is complete. See PushMessageWithVersion().
|
||||
// is complete.
|
||||
assert(nSendVersion != 0);
|
||||
return nSendVersion;
|
||||
}
|
||||
|
|
36
src/netmessagemaker.h
Normal file
36
src/netmessagemaker.h
Normal file
|
@ -0,0 +1,36 @@
|
|||
// Copyright (c) 2009-2010 Satoshi Nakamoto
|
||||
// Copyright (c) 2009-2016 The Bitcoin Core developers
|
||||
// Distributed under the MIT software license, see the accompanying
|
||||
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
|
||||
|
||||
#ifndef BITCOIN_NETMESSAGEMAKER_H
|
||||
#define BITCOIN_NETMESSAGEMAKER_H
|
||||
|
||||
#include "net.h"
|
||||
#include "serialize.h"
|
||||
|
||||
class CNetMsgMaker
|
||||
{
|
||||
public:
|
||||
CNetMsgMaker(int nVersionIn) : nVersion(nVersionIn){}
|
||||
|
||||
template <typename... Args>
|
||||
CSerializedNetMsg Make(int nFlags, std::string sCommand, Args&&... args)
|
||||
{
|
||||
CSerializedNetMsg msg;
|
||||
msg.command = std::move(sCommand);
|
||||
CVectorWriter{ SER_NETWORK, nFlags | nVersion, msg.data, 0, std::forward<Args>(args)... };
|
||||
return msg;
|
||||
}
|
||||
|
||||
template <typename... Args>
|
||||
CSerializedNetMsg Make(std::string sCommand, Args&&... args)
|
||||
{
|
||||
return Make(0, std::move(sCommand), std::forward<Args>(args)...);
|
||||
}
|
||||
|
||||
private:
|
||||
const int nVersion;
|
||||
};
|
||||
|
||||
#endif // BITCOIN_NETMESSAGEMAKER_H
|
|
@ -69,6 +69,75 @@ OverrideStream<S> WithOrVersion(S* s, int nVersionFlag)
|
|||
return OverrideStream<S>(s, s->GetType(), s->GetVersion() | nVersionFlag);
|
||||
}
|
||||
|
||||
/* Minimal stream for overwriting and/or appending to an existing byte vector
|
||||
*
|
||||
* The referenced vector will grow as necessary
|
||||
*/
|
||||
class CVectorWriter
|
||||
{
|
||||
public:
|
||||
|
||||
/*
|
||||
* @param[in] nTypeIn Serialization Type
|
||||
* @param[in] nVersionIn Serialization Version (including any flags)
|
||||
* @param[in] vchDataIn Referenced byte vector to overwrite/append
|
||||
* @param[in] nPosIn Starting position. Vector index where writes should start. The vector will initially
|
||||
* grow as necessary to max(index, vec.size()). So to append, use vec.size().
|
||||
*/
|
||||
CVectorWriter(int nTypeIn, int nVersionIn, std::vector<unsigned char>& vchDataIn, size_t nPosIn) : nType(nTypeIn), nVersion(nVersionIn), vchData(vchDataIn), nPos(nPosIn)
|
||||
{
|
||||
if(nPos > vchData.size())
|
||||
vchData.resize(nPos);
|
||||
}
|
||||
/*
|
||||
* (other params same as above)
|
||||
* @param[in] args A list of items to serialize starting at nPos.
|
||||
*/
|
||||
template <typename... Args>
|
||||
CVectorWriter(int nTypeIn, int nVersionIn, std::vector<unsigned char>& vchDataIn, size_t nPosIn, Args&&... args) : CVectorWriter(nTypeIn, nVersionIn, vchDataIn, nPosIn)
|
||||
{
|
||||
::SerializeMany(*this, std::forward<Args>(args)...);
|
||||
}
|
||||
void write(const char* pch, size_t nSize)
|
||||
{
|
||||
assert(nPos <= vchData.size());
|
||||
size_t nOverwrite = std::min(nSize, vchData.size() - nPos);
|
||||
if (nOverwrite) {
|
||||
memcpy(vchData.data() + nPos, reinterpret_cast<const unsigned char*>(pch), nOverwrite);
|
||||
}
|
||||
if (nOverwrite < nSize) {
|
||||
vchData.insert(vchData.end(), reinterpret_cast<const unsigned char*>(pch) + nOverwrite, reinterpret_cast<const unsigned char*>(pch) + nSize);
|
||||
}
|
||||
nPos += nSize;
|
||||
}
|
||||
template<typename T>
|
||||
CVectorWriter& operator<<(const T& obj)
|
||||
{
|
||||
// Serialize to this stream
|
||||
::Serialize(*this, obj);
|
||||
return (*this);
|
||||
}
|
||||
int GetVersion() const
|
||||
{
|
||||
return nVersion;
|
||||
}
|
||||
int GetType() const
|
||||
{
|
||||
return nType;
|
||||
}
|
||||
void seek(size_t nSize)
|
||||
{
|
||||
nPos += nSize;
|
||||
if(nPos > vchData.size())
|
||||
vchData.resize(nPos);
|
||||
}
|
||||
private:
|
||||
const int nType;
|
||||
const int nVersion;
|
||||
std::vector<unsigned char>& vchData;
|
||||
size_t nPos;
|
||||
};
|
||||
|
||||
/** Double ended buffer combining vector and stream-like interfaces.
|
||||
*
|
||||
* >> and << read and write unformatted data using the above serialization templates.
|
||||
|
|
|
@ -15,6 +15,64 @@ using namespace boost::assign; // bring 'operator+=()' into scope
|
|||
|
||||
BOOST_FIXTURE_TEST_SUITE(streams_tests, BasicTestingSetup)
|
||||
|
||||
BOOST_AUTO_TEST_CASE(streams_vector_writer)
|
||||
{
|
||||
unsigned char a(1);
|
||||
unsigned char b(2);
|
||||
unsigned char bytes[] = { 3, 4, 5, 6 };
|
||||
std::vector<unsigned char> vch;
|
||||
|
||||
// Each test runs twice. Serializing a second time at the same starting
|
||||
// point should yield the same results, even if the first test grew the
|
||||
// vector.
|
||||
|
||||
CVectorWriter(SER_NETWORK, INIT_PROTO_VERSION, vch, 0, a, b);
|
||||
BOOST_CHECK((vch == std::vector<unsigned char>{{1, 2}}));
|
||||
CVectorWriter(SER_NETWORK, INIT_PROTO_VERSION, vch, 0, a, b);
|
||||
BOOST_CHECK((vch == std::vector<unsigned char>{{1, 2}}));
|
||||
vch.clear();
|
||||
|
||||
CVectorWriter(SER_NETWORK, INIT_PROTO_VERSION, vch, 2, a, b);
|
||||
BOOST_CHECK((vch == std::vector<unsigned char>{{0, 0, 1, 2}}));
|
||||
CVectorWriter(SER_NETWORK, INIT_PROTO_VERSION, vch, 2, a, b);
|
||||
BOOST_CHECK((vch == std::vector<unsigned char>{{0, 0, 1, 2}}));
|
||||
vch.clear();
|
||||
|
||||
vch.resize(5, 0);
|
||||
CVectorWriter(SER_NETWORK, INIT_PROTO_VERSION, vch, 2, a, b);
|
||||
BOOST_CHECK((vch == std::vector<unsigned char>{{0, 0, 1, 2, 0}}));
|
||||
CVectorWriter(SER_NETWORK, INIT_PROTO_VERSION, vch, 2, a, b);
|
||||
BOOST_CHECK((vch == std::vector<unsigned char>{{0, 0, 1, 2, 0}}));
|
||||
vch.clear();
|
||||
|
||||
vch.resize(4, 0);
|
||||
CVectorWriter(SER_NETWORK, INIT_PROTO_VERSION, vch, 3, a, b);
|
||||
BOOST_CHECK((vch == std::vector<unsigned char>{{0, 0, 0, 1, 2}}));
|
||||
CVectorWriter(SER_NETWORK, INIT_PROTO_VERSION, vch, 3, a, b);
|
||||
BOOST_CHECK((vch == std::vector<unsigned char>{{0, 0, 0, 1, 2}}));
|
||||
vch.clear();
|
||||
|
||||
vch.resize(4, 0);
|
||||
CVectorWriter(SER_NETWORK, INIT_PROTO_VERSION, vch, 4, a, b);
|
||||
BOOST_CHECK((vch == std::vector<unsigned char>{{0, 0, 0, 0, 1, 2}}));
|
||||
CVectorWriter(SER_NETWORK, INIT_PROTO_VERSION, vch, 4, a, b);
|
||||
BOOST_CHECK((vch == std::vector<unsigned char>{{0, 0, 0, 0, 1, 2}}));
|
||||
vch.clear();
|
||||
|
||||
CVectorWriter(SER_NETWORK, INIT_PROTO_VERSION, vch, 0, FLATDATA(bytes));
|
||||
BOOST_CHECK((vch == std::vector<unsigned char>{{3, 4, 5, 6}}));
|
||||
CVectorWriter(SER_NETWORK, INIT_PROTO_VERSION, vch, 0, FLATDATA(bytes));
|
||||
BOOST_CHECK((vch == std::vector<unsigned char>{{3, 4, 5, 6}}));
|
||||
vch.clear();
|
||||
|
||||
vch.resize(4, 8);
|
||||
CVectorWriter(SER_NETWORK, INIT_PROTO_VERSION, vch, 2, a, FLATDATA(bytes), b);
|
||||
BOOST_CHECK((vch == std::vector<unsigned char>{{8, 8, 1, 3, 4, 5, 6, 2}}));
|
||||
CVectorWriter(SER_NETWORK, INIT_PROTO_VERSION, vch, 2, a, FLATDATA(bytes), b);
|
||||
BOOST_CHECK((vch == std::vector<unsigned char>{{8, 8, 1, 3, 4, 5, 6, 2}}));
|
||||
vch.clear();
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(streams_serializedata_xor)
|
||||
{
|
||||
std::vector<char> in;
|
||||
|
|
Loading…
Reference in a new issue