Merge #8969: Decouple peer-processing-logic from block-connection-logic (#2)

f5b960b Move nTimeBestReceived updating into net processing code (Matt Corallo)
d8670fb Move all calls to CheckBlockIndex out of net-processing logic (Matt Corallo)
d6ea737 Remove network state wipe from UnloadBlockIndex. (Matt Corallo)
fc0c24f Move MarkBlockAsReceived out of ProcessNewMessage (Matt Corallo)
65f35eb Move FlushStateToDisk call out of ProcessMessages::TX into ATMP (Matt Corallo)
This commit is contained in:
Wladimir J. van der Laan 2016-11-03 15:45:11 +01:00
commit 3665483be7
No known key found for this signature in database
GPG key ID: 74810B012346C9A6
3 changed files with 49 additions and 29 deletions

View file

@ -1103,6 +1103,10 @@ bool AppInit2(boost::thread_group& threadGroup, CScheduler& scheduler)
return false; return false;
#endif #endif
// ********************************************************* Step 6: network initialization // ********************************************************* Step 6: network initialization
// Note that we absolutely cannot open any actual connections
// until the very end ("start node") as the UTXO/block state
// is not yet setup and may end up being set up twice if we
// need to reindex later.
assert(!g_connman); assert(!g_connman);
g_connman = std::unique_ptr<CConnman>(new CConnman(GetRand(std::numeric_limits<uint64_t>::max()), GetRand(std::numeric_limits<uint64_t>::max()))); g_connman = std::unique_ptr<CConnman>(new CConnman(GetRand(std::numeric_limits<uint64_t>::max()), GetRand(std::numeric_limits<uint64_t>::max())));

View file

@ -63,7 +63,7 @@ CCriticalSection cs_main;
BlockMap mapBlockIndex; BlockMap mapBlockIndex;
CChain chainActive; CChain chainActive;
CBlockIndex *pindexBestHeader = NULL; CBlockIndex *pindexBestHeader = NULL;
int64_t nTimeBestReceived = 0; int64_t nTimeBestReceived = 0; // Used only to inform the wallet of when we last received a block
CWaitableCriticalSection csBestBlock; CWaitableCriticalSection csBestBlock;
CConditionVariable cvBlockChange; CConditionVariable cvBlockChange;
int nScriptCheckThreads = 0; int nScriptCheckThreads = 0;
@ -691,6 +691,16 @@ CBlockIndex* FindForkInGlobalIndex(const CChain& chain, const CBlockLocator& loc
CCoinsViewCache *pcoinsTip = NULL; CCoinsViewCache *pcoinsTip = NULL;
CBlockTreeDB *pblocktree = NULL; CBlockTreeDB *pblocktree = NULL;
enum FlushStateMode {
FLUSH_STATE_NONE,
FLUSH_STATE_IF_NEEDED,
FLUSH_STATE_PERIODIC,
FLUSH_STATE_ALWAYS
};
// See definition for documentation
bool static FlushStateToDisk(CValidationState &state, FlushStateMode mode);
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
// //
// mapOrphanTransactions // mapOrphanTransactions
@ -1581,6 +1591,9 @@ bool AcceptToMemoryPoolWithTime(CTxMemPool& pool, CValidationState &state, const
BOOST_FOREACH(const uint256& hashTx, vHashTxToUncache) BOOST_FOREACH(const uint256& hashTx, vHashTxToUncache)
pcoinsTip->Uncache(hashTx); pcoinsTip->Uncache(hashTx);
} }
// After we've (potentially) uncached entries, ensure our coins cache is still within its size limits
CValidationState stateDummy;
FlushStateToDisk(stateDummy, FLUSH_STATE_PERIODIC);
return res; return res;
} }
@ -2565,13 +2578,6 @@ bool ConnectBlock(const CBlock& block, CValidationState& state, CBlockIndex* pin
return true; return true;
} }
enum FlushStateMode {
FLUSH_STATE_NONE,
FLUSH_STATE_IF_NEEDED,
FLUSH_STATE_PERIODIC,
FLUSH_STATE_ALWAYS
};
/** /**
* Update the on-disk chain state. * Update the on-disk chain state.
* The caches and indexes are flushed depending on the mode we're called with * The caches and indexes are flushed depending on the mode we're called with
@ -2691,7 +2697,6 @@ void static UpdateTip(CBlockIndex *pindexNew, const CChainParams& chainParams) {
chainActive.SetTip(pindexNew); chainActive.SetTip(pindexNew);
// New best block // New best block
nTimeBestReceived = GetTime();
mempool.AddTransactionsUpdated(1); mempool.AddTransactionsUpdated(1);
cvBlockChange.notify_all(); cvBlockChange.notify_all();
@ -3676,6 +3681,8 @@ static bool AcceptBlockHeader(const CBlockHeader& block, CValidationState& state
if (ppindex) if (ppindex)
*ppindex = pindex; *ppindex = pindex;
CheckBlockIndex(chainparams.GetConsensus());
return true; return true;
} }
@ -3703,6 +3710,11 @@ static bool AcceptBlock(const CBlock& block, CValidationState& state, const CCha
// not process unrequested blocks. // not process unrequested blocks.
bool fTooFarAhead = (pindex->nHeight > int(chainActive.Height() + MIN_BLOCKS_TO_KEEP)); bool fTooFarAhead = (pindex->nHeight > int(chainActive.Height() + MIN_BLOCKS_TO_KEEP));
// TODO: Decouple this function from the block download logic by removing fRequested
// This requires some new chain datastructure to efficiently look up if a
// block is in a chain leading to a candidate for best tip, despite not
// being such a candidate itself.
// TODO: deal better with return value and error conditions for duplicate // TODO: deal better with return value and error conditions for duplicate
// and unrequested blocks. // and unrequested blocks.
if (fAlreadyHave) return true; if (fAlreadyHave) return true;
@ -3751,13 +3763,11 @@ bool ProcessNewBlock(CValidationState& state, const CChainParams& chainparams, C
{ {
{ {
LOCK(cs_main); LOCK(cs_main);
bool fRequested = MarkBlockAsReceived(pblock->GetHash());
fRequested |= fForceProcessing;
// Store to disk // Store to disk
CBlockIndex *pindex = NULL; CBlockIndex *pindex = NULL;
bool fNewBlock = false; bool fNewBlock = false;
bool ret = AcceptBlock(*pblock, state, chainparams, &pindex, fRequested, dbp, &fNewBlock); bool ret = AcceptBlock(*pblock, state, chainparams, &pindex, fForceProcessing, dbp, &fNewBlock);
if (pindex && pfrom) { if (pindex && pfrom) {
mapBlockSource[pindex->GetBlockHash()] = pfrom->GetId(); mapBlockSource[pindex->GetBlockHash()] = pfrom->GetId();
if (fNewBlock) pfrom->nLastBlockTime = GetTime(); if (fNewBlock) pfrom->nLastBlockTime = GetTime();
@ -4269,6 +4279,9 @@ bool RewindBlockIndex(const CChainParams& params)
return true; return true;
} }
// May NOT be used after any connections are up as much
// of the peer-processing logic assumes a consistent
// block index state
void UnloadBlockIndex() void UnloadBlockIndex()
{ {
LOCK(cs_main); LOCK(cs_main);
@ -4279,18 +4292,12 @@ void UnloadBlockIndex()
mempool.clear(); mempool.clear();
mapOrphanTransactions.clear(); mapOrphanTransactions.clear();
mapOrphanTransactionsByPrev.clear(); mapOrphanTransactionsByPrev.clear();
nSyncStarted = 0;
mapBlocksUnlinked.clear(); mapBlocksUnlinked.clear();
vinfoBlockFile.clear(); vinfoBlockFile.clear();
nLastBlockFile = 0; nLastBlockFile = 0;
nBlockSequenceId = 1; nBlockSequenceId = 1;
mapBlockSource.clear();
mapBlocksInFlight.clear();
nPreferredDownload = 0;
setDirtyBlockIndex.clear(); setDirtyBlockIndex.clear();
setDirtyFileInfo.clear(); setDirtyFileInfo.clear();
mapNodeState.clear();
recentRejects.reset(NULL);
versionbitscache.Clear(); versionbitscache.Clear();
for (int b = 0; b < VERSIONBITS_NUM_BITS; b++) { for (int b = 0; b < VERSIONBITS_NUM_BITS; b++) {
warningcache[b].clear(); warningcache[b].clear();
@ -4315,9 +4322,6 @@ bool InitBlockIndex(const CChainParams& chainparams)
{ {
LOCK(cs_main); LOCK(cs_main);
// Initialize global variables that cannot be constructed at startup.
recentRejects.reset(new CRollingBloomFilter(120000, 0.000001));
// Check whether we're already initialized // Check whether we're already initialized
if (chainActive.Genesis() != NULL) if (chainActive.Genesis() != NULL)
return true; return true;
@ -4706,6 +4710,11 @@ std::string GetWarnings(const std::string& strFor)
// blockchain -> download logic notification // blockchain -> download logic notification
// //
PeerLogicValidation::PeerLogicValidation(CConnman* connmanIn) : connman(connmanIn) {
// Initialize global variables that cannot be constructed at startup.
recentRejects.reset(new CRollingBloomFilter(120000, 0.000001));
}
void PeerLogicValidation::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) { void PeerLogicValidation::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) {
const int nNewHeight = pindexNew->nHeight; const int nNewHeight = pindexNew->nHeight;
connman->SetBestHeight(nNewHeight); connman->SetBestHeight(nNewHeight);
@ -4732,6 +4741,8 @@ void PeerLogicValidation::UpdatedBlockTip(const CBlockIndex *pindexNew, const CB
} }
}); });
} }
nTimeBestReceived = GetTime();
} }
void PeerLogicValidation::BlockChecked(const CBlock& block, const CValidationState& state) { void PeerLogicValidation::BlockChecked(const CBlock& block, const CValidationState& state) {
@ -5690,7 +5701,6 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
Misbehaving(pfrom->GetId(), nDoS); Misbehaving(pfrom->GetId(), nDoS);
} }
} }
FlushStateToDisk(state, FLUSH_STATE_PERIODIC);
} }
@ -5826,8 +5836,6 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
return ProcessMessage(pfrom, NetMsgType::HEADERS, vHeadersMsg, nTimeReceived, chainparams, connman); return ProcessMessage(pfrom, NetMsgType::HEADERS, vHeadersMsg, nTimeReceived, chainparams, connman);
} }
} }
CheckBlockIndex(chainparams.GetConsensus());
} }
else if (strCommand == NetMsgType::BLOCKTXN && !fImporting && !fReindex) // Ignore blocks received while importing else if (strCommand == NetMsgType::BLOCKTXN && !fImporting && !fReindex) // Ignore blocks received while importing
@ -5859,12 +5867,16 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
std::vector<CInv> invs; std::vector<CInv> invs;
invs.push_back(CInv(MSG_BLOCK | GetFetchFlags(pfrom, chainActive.Tip(), chainparams.GetConsensus()), resp.blockhash)); invs.push_back(CInv(MSG_BLOCK | GetFetchFlags(pfrom, chainActive.Tip(), chainparams.GetConsensus()), resp.blockhash));
pfrom->PushMessage(NetMsgType::GETDATA, invs); pfrom->PushMessage(NetMsgType::GETDATA, invs);
} else } else {
MarkBlockAsReceived(resp.blockhash); // it is now an empty pointer
fBlockRead = true; fBlockRead = true;
}
} // Don't hold cs_main when we call into ProcessNewBlock } // Don't hold cs_main when we call into ProcessNewBlock
if (fBlockRead) { if (fBlockRead) {
CValidationState state; CValidationState state;
ProcessNewBlock(state, chainparams, pfrom, &block, false, NULL); // Since we requested this block (it was in mapBlocksInFlight), force it to be processed,
// even if it would not be a candidate for new tip (missing previous block, chain not long enough, etc)
ProcessNewBlock(state, chainparams, pfrom, &block, true, NULL);
int nDoS; int nDoS;
if (state.IsInvalid(nDoS)) { if (state.IsInvalid(nDoS)) {
assert (state.GetRejectCode() < REJECT_INTERNAL); // Blocks are never rejected with internal reject codes assert (state.GetRejectCode() < REJECT_INTERNAL); // Blocks are never rejected with internal reject codes
@ -6020,8 +6032,6 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
} }
} }
} }
CheckBlockIndex(chainparams.GetConsensus());
} }
NotifyHeaderTip(); NotifyHeaderTip();
@ -6040,6 +6050,12 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
// Such an unrequested block may still be processed, subject to the // Such an unrequested block may still be processed, subject to the
// conditions in AcceptBlock(). // conditions in AcceptBlock().
bool forceProcessing = pfrom->fWhitelisted && !IsInitialBlockDownload(); bool forceProcessing = pfrom->fWhitelisted && !IsInitialBlockDownload();
{
LOCK(cs_main);
// Also always process if we requested the block explicitly, as we may
// need it even though it is not a candidate for a new best tip.
forceProcessing |= MarkBlockAsReceived(block.GetHash());
}
ProcessNewBlock(state, chainparams, pfrom, &block, forceProcessing, NULL); ProcessNewBlock(state, chainparams, pfrom, &block, forceProcessing, NULL);
int nDoS; int nDoS;
if (state.IsInvalid(nDoS)) { if (state.IsInvalid(nDoS)) {

View file

@ -552,7 +552,7 @@ private:
CConnman* connman; CConnman* connman;
public: public:
PeerLogicValidation(CConnman* connmanIn) : connman(connmanIn) {} PeerLogicValidation(CConnman* connmanIn);
virtual void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload); virtual void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload);
virtual void BlockChecked(const CBlock& block, const CValidationState& state); virtual void BlockChecked(const CBlock& block, const CValidationState& state);