Merge #13151: net: Serve blocks directly from disk when possible

0bf431870e net: Serve blocks directly from disk when possible (Wladimir J. van der Laan)

Pull request description:

  In `ProcessGetBlockData`, send the block data directly from disk if type MSG_WITNESS_BLOCK is requested. This is a valid shortcut as the on-disk format matches the network format.

  This is expected to increase performance because a deserialization and subsequent serialization roundtrip is avoided.

Tree-SHA512: 9a9500b4c1354eaae1a6f1c6ef2416c1c1985029852589266f3a70e808f6c7482c135e9ab251a527566935378ab7c32dba4ed43ba5451e802d8e72b77d1ba472
This commit is contained in:
Wladimir J. van der Laan 2018-05-23 19:31:38 +02:00
commit 7f4db9a7c3
No known key found for this signature in database
GPG key ID: 1E4AED62986CD25D
3 changed files with 107 additions and 47 deletions

View file

@ -1070,12 +1070,13 @@ static void RelayAddress(const CAddress& addr, bool fReachable, CConnman* connma
connman->ForEachNodeThen(std::move(sortfunc), std::move(pushfunc)); connman->ForEachNodeThen(std::move(sortfunc), std::move(pushfunc));
} }
void static ProcessGetBlockData(CNode* pfrom, const Consensus::Params& consensusParams, const CInv& inv, CConnman* connman, const std::atomic<bool>& interruptMsgProc) void static ProcessGetBlockData(CNode* pfrom, const CChainParams& chainparams, const CInv& inv, CConnman* connman, const std::atomic<bool>& interruptMsgProc)
{ {
bool send = false; bool send = false;
std::shared_ptr<const CBlock> a_recent_block; std::shared_ptr<const CBlock> a_recent_block;
std::shared_ptr<const CBlockHeaderAndShortTxIDs> a_recent_compact_block; std::shared_ptr<const CBlockHeaderAndShortTxIDs> a_recent_compact_block;
bool fWitnessesPresentInARecentCompactBlock; bool fWitnessesPresentInARecentCompactBlock;
const Consensus::Params& consensusParams = chainparams.GetConsensus();
{ {
LOCK(cs_most_recent_block); LOCK(cs_most_recent_block);
a_recent_block = most_recent_block; a_recent_block = most_recent_block;
@ -1142,6 +1143,15 @@ void static ProcessGetBlockData(CNode* pfrom, const Consensus::Params& consensus
std::shared_ptr<const CBlock> pblock; std::shared_ptr<const CBlock> pblock;
if (a_recent_block && a_recent_block->GetHash() == pindex->GetBlockHash()) { if (a_recent_block && a_recent_block->GetHash() == pindex->GetBlockHash()) {
pblock = a_recent_block; pblock = a_recent_block;
} else if (inv.type == MSG_WITNESS_BLOCK) {
// Fast-path: in this case it is possible to serve the block directly from disk,
// as the network format matches the format on disk
std::vector<uint8_t> block_data;
if (!ReadRawBlockFromDisk(block_data, pindex, chainparams.MessageStart())) {
assert(!"cannot load block from disk");
}
connman->PushMessage(pfrom, msgMaker.Make(NetMsgType::BLOCK, MakeSpan(block_data)));
// Don't set pblock as we've sent the block
} else { } else {
// Send block from disk // Send block from disk
std::shared_ptr<CBlock> pblockRead = std::make_shared<CBlock>(); std::shared_ptr<CBlock> pblockRead = std::make_shared<CBlock>();
@ -1149,53 +1159,55 @@ void static ProcessGetBlockData(CNode* pfrom, const Consensus::Params& consensus
assert(!"cannot load block from disk"); assert(!"cannot load block from disk");
pblock = pblockRead; pblock = pblockRead;
} }
if (inv.type == MSG_BLOCK) if (pblock) {
connman->PushMessage(pfrom, msgMaker.Make(SERIALIZE_TRANSACTION_NO_WITNESS, NetMsgType::BLOCK, *pblock)); if (inv.type == MSG_BLOCK)
else if (inv.type == MSG_WITNESS_BLOCK) connman->PushMessage(pfrom, msgMaker.Make(SERIALIZE_TRANSACTION_NO_WITNESS, NetMsgType::BLOCK, *pblock));
connman->PushMessage(pfrom, msgMaker.Make(NetMsgType::BLOCK, *pblock)); else if (inv.type == MSG_WITNESS_BLOCK)
else if (inv.type == MSG_FILTERED_BLOCK) connman->PushMessage(pfrom, msgMaker.Make(NetMsgType::BLOCK, *pblock));
{ else if (inv.type == MSG_FILTERED_BLOCK)
bool sendMerkleBlock = false;
CMerkleBlock merkleBlock;
{ {
LOCK(pfrom->cs_filter); bool sendMerkleBlock = false;
if (pfrom->pfilter) { CMerkleBlock merkleBlock;
sendMerkleBlock = true; {
merkleBlock = CMerkleBlock(*pblock, *pfrom->pfilter); LOCK(pfrom->cs_filter);
if (pfrom->pfilter) {
sendMerkleBlock = true;
merkleBlock = CMerkleBlock(*pblock, *pfrom->pfilter);
}
} }
if (sendMerkleBlock) {
connman->PushMessage(pfrom, msgMaker.Make(NetMsgType::MERKLEBLOCK, merkleBlock));
// CMerkleBlock just contains hashes, so also push any transactions in the block the client did not see
// This avoids hurting performance by pointlessly requiring a round-trip
// Note that there is currently no way for a node to request any single transactions we didn't send here -
// they must either disconnect and retry or request the full block.
// Thus, the protocol spec specified allows for us to provide duplicate txn here,
// however we MUST always provide at least what the remote peer needs
typedef std::pair<unsigned int, uint256> PairType;
for (PairType& pair : merkleBlock.vMatchedTxn)
connman->PushMessage(pfrom, msgMaker.Make(SERIALIZE_TRANSACTION_NO_WITNESS, NetMsgType::TX, *pblock->vtx[pair.first]));
}
// else
// no response
} }
if (sendMerkleBlock) { else if (inv.type == MSG_CMPCT_BLOCK)
connman->PushMessage(pfrom, msgMaker.Make(NetMsgType::MERKLEBLOCK, merkleBlock)); {
// CMerkleBlock just contains hashes, so also push any transactions in the block the client did not see // If a peer is asking for old blocks, we're almost guaranteed
// This avoids hurting performance by pointlessly requiring a round-trip // they won't have a useful mempool to match against a compact block,
// Note that there is currently no way for a node to request any single transactions we didn't send here - // and we don't feel like constructing the object for them, so
// they must either disconnect and retry or request the full block. // instead we respond with the full, non-compact block.
// Thus, the protocol spec specified allows for us to provide duplicate txn here, bool fPeerWantsWitness = State(pfrom->GetId())->fWantsCmpctWitness;
// however we MUST always provide at least what the remote peer needs int nSendFlags = fPeerWantsWitness ? 0 : SERIALIZE_TRANSACTION_NO_WITNESS;
typedef std::pair<unsigned int, uint256> PairType; if (CanDirectFetch(consensusParams) && pindex->nHeight >= chainActive.Height() - MAX_CMPCTBLOCK_DEPTH) {
for (PairType& pair : merkleBlock.vMatchedTxn) if ((fPeerWantsWitness || !fWitnessesPresentInARecentCompactBlock) && a_recent_compact_block && a_recent_compact_block->header.GetHash() == pindex->GetBlockHash()) {
connman->PushMessage(pfrom, msgMaker.Make(SERIALIZE_TRANSACTION_NO_WITNESS, NetMsgType::TX, *pblock->vtx[pair.first])); connman->PushMessage(pfrom, msgMaker.Make(nSendFlags, NetMsgType::CMPCTBLOCK, *a_recent_compact_block));
} } else {
// else CBlockHeaderAndShortTxIDs cmpctblock(*pblock, fPeerWantsWitness);
// no response connman->PushMessage(pfrom, msgMaker.Make(nSendFlags, NetMsgType::CMPCTBLOCK, cmpctblock));
} }
else if (inv.type == MSG_CMPCT_BLOCK)
{
// If a peer is asking for old blocks, we're almost guaranteed
// they won't have a useful mempool to match against a compact block,
// and we don't feel like constructing the object for them, so
// instead we respond with the full, non-compact block.
bool fPeerWantsWitness = State(pfrom->GetId())->fWantsCmpctWitness;
int nSendFlags = fPeerWantsWitness ? 0 : SERIALIZE_TRANSACTION_NO_WITNESS;
if (CanDirectFetch(consensusParams) && pindex->nHeight >= chainActive.Height() - MAX_CMPCTBLOCK_DEPTH) {
if ((fPeerWantsWitness || !fWitnessesPresentInARecentCompactBlock) && a_recent_compact_block && a_recent_compact_block->header.GetHash() == pindex->GetBlockHash()) {
connman->PushMessage(pfrom, msgMaker.Make(nSendFlags, NetMsgType::CMPCTBLOCK, *a_recent_compact_block));
} else { } else {
CBlockHeaderAndShortTxIDs cmpctblock(*pblock, fPeerWantsWitness); connman->PushMessage(pfrom, msgMaker.Make(nSendFlags, NetMsgType::BLOCK, *pblock));
connman->PushMessage(pfrom, msgMaker.Make(nSendFlags, NetMsgType::CMPCTBLOCK, cmpctblock));
} }
} else {
connman->PushMessage(pfrom, msgMaker.Make(nSendFlags, NetMsgType::BLOCK, *pblock));
} }
} }
@ -1213,7 +1225,7 @@ void static ProcessGetBlockData(CNode* pfrom, const Consensus::Params& consensus
} }
} }
void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParams, CConnman* connman, const std::atomic<bool>& interruptMsgProc) void static ProcessGetData(CNode* pfrom, const CChainParams& chainparams, CConnman* connman, const std::atomic<bool>& interruptMsgProc)
{ {
AssertLockNotHeld(cs_main); AssertLockNotHeld(cs_main);
@ -1262,7 +1274,7 @@ void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParam
const CInv &inv = *it; const CInv &inv = *it;
if (inv.type == MSG_BLOCK || inv.type == MSG_FILTERED_BLOCK || inv.type == MSG_CMPCT_BLOCK || inv.type == MSG_WITNESS_BLOCK) { if (inv.type == MSG_BLOCK || inv.type == MSG_FILTERED_BLOCK || inv.type == MSG_CMPCT_BLOCK || inv.type == MSG_WITNESS_BLOCK) {
it++; it++;
ProcessGetBlockData(pfrom, consensusParams, inv, connman, interruptMsgProc); ProcessGetBlockData(pfrom, chainparams, inv, connman, interruptMsgProc);
} }
} }
@ -1972,7 +1984,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
} }
pfrom->vRecvGetData.insert(pfrom->vRecvGetData.end(), vInv.begin(), vInv.end()); pfrom->vRecvGetData.insert(pfrom->vRecvGetData.end(), vInv.begin(), vInv.end());
ProcessGetData(pfrom, chainparams.GetConsensus(), connman, interruptMsgProc); ProcessGetData(pfrom, chainparams, connman, interruptMsgProc);
} }
@ -2942,7 +2954,7 @@ bool PeerLogicValidation::ProcessMessages(CNode* pfrom, std::atomic<bool>& inter
bool fMoreWork = false; bool fMoreWork = false;
if (!pfrom->vRecvGetData.empty()) if (!pfrom->vRecvGetData.empty())
ProcessGetData(pfrom, chainparams.GetConsensus(), connman, interruptMsgProc); ProcessGetData(pfrom, chainparams, connman, interruptMsgProc);
if (pfrom->fDisconnect) if (pfrom->fDisconnect)
return false; return false;

View file

@ -1137,6 +1137,52 @@ bool ReadBlockFromDisk(CBlock& block, const CBlockIndex* pindex, const Consensus
return true; return true;
} }
bool ReadRawBlockFromDisk(std::vector<uint8_t>& block, const CDiskBlockPos& pos, const CMessageHeader::MessageStartChars& message_start)
{
CDiskBlockPos hpos = pos;
hpos.nPos -= 8; // Seek back 8 bytes for meta header
CAutoFile filein(OpenBlockFile(hpos, true), SER_DISK, CLIENT_VERSION);
if (filein.IsNull()) {
return error("%s: OpenBlockFile failed for %s", __func__, pos.ToString());
}
try {
CMessageHeader::MessageStartChars blk_start;
unsigned int blk_size;
filein >> blk_start >> blk_size;
if (memcmp(blk_start, message_start, CMessageHeader::MESSAGE_START_SIZE)) {
return error("%s: Block magic mismatch for %s: %s versus expected %s", __func__, pos.ToString(),
HexStr(blk_start, blk_start + CMessageHeader::MESSAGE_START_SIZE),
HexStr(message_start, message_start + CMessageHeader::MESSAGE_START_SIZE));
}
if (blk_size > MAX_SIZE) {
return error("%s: Block data is larger than maximum deserialization size for %s: %s versus %s", __func__, pos.ToString(),
blk_size, MAX_SIZE);
}
block.resize(blk_size); // Zeroing of memory is intentional here
filein.read((char*)block.data(), blk_size);
} catch(const std::exception& e) {
return error("%s: Read from block file failed: %s for %s", __func__, e.what(), pos.ToString());
}
return true;
}
bool ReadRawBlockFromDisk(std::vector<uint8_t>& block, const CBlockIndex* pindex, const CMessageHeader::MessageStartChars& message_start)
{
CDiskBlockPos block_pos;
{
LOCK(cs_main);
block_pos = pindex->GetBlockPos();
}
return ReadRawBlockFromDisk(block, block_pos, message_start);
}
CAmount GetBlockSubsidy(int nHeight, const Consensus::Params& consensusParams) CAmount GetBlockSubsidy(int nHeight, const Consensus::Params& consensusParams)
{ {
int halvings = nHeight / consensusParams.nSubsidyHalvingInterval; int halvings = nHeight / consensusParams.nSubsidyHalvingInterval;

View file

@ -398,6 +398,8 @@ void InitScriptExecutionCache();
/** Functions for disk access for blocks */ /** Functions for disk access for blocks */
bool ReadBlockFromDisk(CBlock& block, const CDiskBlockPos& pos, const Consensus::Params& consensusParams); bool ReadBlockFromDisk(CBlock& block, const CDiskBlockPos& pos, const Consensus::Params& consensusParams);
bool ReadBlockFromDisk(CBlock& block, const CBlockIndex* pindex, const Consensus::Params& consensusParams); bool ReadBlockFromDisk(CBlock& block, const CBlockIndex* pindex, const Consensus::Params& consensusParams);
bool ReadRawBlockFromDisk(std::vector<uint8_t>& block, const CDiskBlockPos& pos, const CMessageHeader::MessageStartChars& message_start);
bool ReadRawBlockFromDisk(std::vector<uint8_t>& block, const CBlockIndex* pindex, const CMessageHeader::MessageStartChars& message_start);
/** Functions for validating blocks and updating the block tree */ /** Functions for validating blocks and updating the block tree */