move block filter specifics out of base.cpp

This commit is contained in:
Brannon King 2020-03-10 10:05:21 -06:00 committed by Anthony Fieroni
parent 05128d3ee9
commit 10a898607f
6 changed files with 63 additions and 86 deletions

View file

@ -4,9 +4,9 @@
#include <limits> #include <limits>
#include <sstream> #include <sstream>
COutPoint::COutPoint() noexcept COutPoint::COutPoint() noexcept : n(std::numeric_limits<uint32_t>::max())
{ {
SetNull();
} }
COutPoint::COutPoint(uint256 hashIn, uint32_t nIn) : hash(std::move(hashIn)), n(nIn) COutPoint::COutPoint(uint256 hashIn, uint32_t nIn) : hash(std::move(hashIn)), n(nIn)

View file

@ -33,24 +33,15 @@ static const sqlite::sqlite_config sharedConfig {
}; };
BaseIndex::DB::DB(const fs::path& path, size_t n_cache_size, bool fMemory, bool fWipe) BaseIndex::DB::DB(const fs::path& path, size_t n_cache_size, bool fMemory, bool fWipe)
: sqlite::database(fMemory ? ":memory:" : (path / "db.sqlite").string(), sharedConfig) : sqlite::database(fMemory ? ":memory:" : path.string() + ".sqlite", sharedConfig)
{ {
applyPragmas(*this, n_cache_size >> 10); // in -KB applyPragmas(*this, n_cache_size >> 10); // in -KB
(*this) << "CREATE TABLE IF NOT EXISTS locator (branch BLOB NOT NULL COLLATE BINARY);"; (*this) << "CREATE TABLE IF NOT EXISTS locator (branch BLOB NOT NULL COLLATE BINARY);";
(*this) << "CREATE TABLE IF NOT EXISTS file_pos (file INTEGER NOT NULL, pos INTEGER NOT NULL);";
(*this) << "CREATE TABLE IF NOT EXISTS block (height INTEGER, hash BLOB NOT NULL COLLATE BINARY, "
"filter_hash BLOB NOT NULL COLLATE BINARY, header BLOB NOT NULL COLLATE BINARY, "
"file INTEGER NOT NULL, pos INTEGER NOT NULL, "
"PRIMARY KEY(height, hash), UNIQUE(filter_hash, header, file, pos));";
if (fWipe) { if (fWipe) {
(*this) << "DELETE FROM locator"; (*this) << "DELETE FROM locator";
(*this) << "DELETE FROM file_pos";
(*this) << "DELETE FROM block";
} }
(*this) << "BEGIN";
} }
bool BaseIndex::DB::ReadBestBlock(CBlockLocator& locator) const bool BaseIndex::DB::ReadBestBlock(CBlockLocator& locator) const
@ -73,24 +64,6 @@ bool BaseIndex::DB::WriteBestBlock(const CBlockLocator& locator)
return (*this).rows_modified() > 0; return (*this).rows_modified() > 0;
} }
bool BaseIndex::DB::ReadFilePos(FlatFilePos& file) const
{
file.SetNull();
bool success = false;
for (auto&& row : (*this) << "SELECT file, pos FROM file_pos") {
row >> file.nFile >> file.nPos;
success = true;
}
return success;
}
bool BaseIndex::DB::WriteFilePos(const FlatFilePos& file)
{
(*this) << "DELETE FROM file_pos";
(*this) << "INSERT INTO file_pos VALUES(?, ?)" << file.nFile << file.nPos;
return rows_modified() > 0;
}
BaseIndex::~BaseIndex() BaseIndex::~BaseIndex()
{ {
Interrupt(); Interrupt();
@ -135,6 +108,7 @@ void BaseIndex::ThreadSync()
const CBlockIndex* pindex = m_best_block_index.load(); const CBlockIndex* pindex = m_best_block_index.load();
if (!m_synced) { if (!m_synced) {
auto& consensus_params = Params().GetConsensus(); auto& consensus_params = Params().GetConsensus();
GetDB() << "BEGIN";
int64_t last_log_time = 0; int64_t last_log_time = 0;
int64_t last_locator_write_time = 0; int64_t last_locator_write_time = 0;
@ -144,7 +118,7 @@ void BaseIndex::ThreadSync()
// No need to handle errors in Commit. If it fails, the error will be already be // No need to handle errors in Commit. If it fails, the error will be already be
// logged. The best way to recover is to continue, as index cannot be corrupted by // logged. The best way to recover is to continue, as index cannot be corrupted by
// a missed commit to disk for an advanced index state. // a missed commit to disk for an advanced index state.
Commit(); Commit(true);
return; return;
} }
@ -155,10 +129,11 @@ void BaseIndex::ThreadSync()
m_best_block_index = pindex; m_best_block_index = pindex;
m_synced = true; m_synced = true;
// No need to handle errors in Commit. See rationale above. // No need to handle errors in Commit. See rationale above.
Commit(); Commit(true);
break; break;
} }
if (pindex_next->pprev != pindex && !Rewind(pindex, pindex_next->pprev)) { if (pindex_next->pprev != pindex && !Rewind(pindex, pindex_next->pprev)) {
GetDB() << "ROLLBACK";
FatalError("%s: Failed to rewind index %s to a previous chain tip", FatalError("%s: Failed to rewind index %s to a previous chain tip",
__func__, GetName()); __func__, GetName());
return; return;
@ -178,15 +153,18 @@ void BaseIndex::ThreadSync()
last_locator_write_time = current_time; last_locator_write_time = current_time;
// No need to handle errors in Commit. See rationale above. // No need to handle errors in Commit. See rationale above.
Commit(); Commit();
GetDB() << "BEGIN";
} }
CBlock block; CBlock block;
if (!ReadBlockFromDisk(block, pindex, consensus_params)) { if (!ReadBlockFromDisk(block, pindex, consensus_params)) {
GetDB() << "ROLLBACK";
FatalError("%s: Failed to read block %s from disk", FatalError("%s: Failed to read block %s from disk",
__func__, pindex->GetBlockHash().ToString()); __func__, pindex->GetBlockHash().ToString());
return; return;
} }
if (!WriteBlock(block, pindex)) { if (!WriteBlock(block, pindex)) {
GetDB() << "ROLLBACK";
FatalError("%s: Failed to write block %s to index database", FatalError("%s: Failed to write block %s to index database",
__func__, pindex->GetBlockHash().ToString()); __func__, pindex->GetBlockHash().ToString());
return; return;
@ -201,21 +179,27 @@ void BaseIndex::ThreadSync()
} }
} }
bool BaseIndex::Commit() bool BaseIndex::Commit(bool syncToDisk)
{ {
if (!CommitInternal() || sqlite::commit(GetDB()) != SQLITE_OK) { if (!CommitInternal() || sqlite::commit(GetDB()) != SQLITE_OK) {
GetDB() << "ROLLBACK"; GetDB() << "ROLLBACK";
GetDB() << "BEGIN";
return error("%s: Failed to commit latest %s state", __func__, GetName()); return error("%s: Failed to commit latest %s state", __func__, GetName());
} }
GetDB() << "BEGIN"; if (syncToDisk) {
if (sqlite::sync(GetDB()) != SQLITE_OK)
return error("%s: Unable to sync to disk", __func__);
}
return true; return true;
} }
bool BaseIndex::CommitInternal() bool BaseIndex::CommitInternal()
{ {
LOCK(cs_main); CBlockLocator locator;
return GetDB().WriteBestBlock(::ChainActive().GetLocator(m_best_block_index)); {
LOCK(cs_main);
locator = ::ChainActive().GetLocator(m_best_block_index);
}
return GetDB().WriteBestBlock(locator);
} }
bool BaseIndex::Rewind(const CBlockIndex* current_tip, const CBlockIndex* new_tip) bool BaseIndex::Rewind(const CBlockIndex* current_tip, const CBlockIndex* new_tip)
@ -268,54 +252,18 @@ void BaseIndex::BlockConnected(const std::shared_ptr<const CBlock>& block, const
} }
} }
GetDB() << "BEGIN";
if (WriteBlock(*block, pindex)) { if (WriteBlock(*block, pindex)) {
Commit(true);
m_best_block_index = pindex; m_best_block_index = pindex;
} else { } else {
GetDB() << "ROLLBACK";
FatalError("%s: Failed to write block %s to index", FatalError("%s: Failed to write block %s to index",
__func__, pindex->GetBlockHash().ToString()); __func__, pindex->GetBlockHash().ToString());
return; return;
} }
} }
void BaseIndex::ChainStateFlushed(const CBlockLocator& locator)
{
if (!m_synced) {
return;
}
const uint256& locator_tip_hash = locator.vHave.front();
const CBlockIndex* locator_tip_index;
{
LOCK(cs_main);
locator_tip_index = LookupBlockIndex(locator_tip_hash);
}
if (!locator_tip_index) {
FatalError("%s: First block (hash=%s) in locator was not found",
__func__, locator_tip_hash.ToString());
return;
}
// This checks that ChainStateFlushed callbacks are received after BlockConnected. The check may fail
// immediately after the sync thread catches up and sets m_synced. Consider the case where
// there is a reorg and the blocks on the stale branch are in the ValidationInterface queue
// backlog even after the sync thread has caught up to the new chain tip. In this unlikely
// event, log a warning and let the queue clear.
const CBlockIndex* best_block_index = m_best_block_index.load();
if (best_block_index->GetAncestor(locator_tip_index->nHeight) != locator_tip_index) {
LogPrintf("%s: WARNING: Locator contains block (hash=%s) not on known best " /* Continued */
"chain (tip=%s); not writing index locator\n",
__func__, locator_tip_hash.ToString(),
best_block_index->GetBlockHash().ToString());
return;
}
// No need to handle errors in Commit. If it fails, the error will be already be logged. The
// best way to recover is to continue, as index cannot be corrupted by a missed commit to disk
// for an advanced index state.
Commit();
}
bool BaseIndex::BlockUntilSyncedToCurrentChain() bool BaseIndex::BlockUntilSyncedToCurrentChain()
{ {
AssertLockNotHeld(cs_main); AssertLockNotHeld(cs_main);

View file

@ -33,10 +33,6 @@ protected:
/// Write block locator of the chain that the txindex is in sync with. /// Write block locator of the chain that the txindex is in sync with.
bool WriteBestBlock(const CBlockLocator& locator); bool WriteBestBlock(const CBlockLocator& locator);
bool ReadFilePos(FlatFilePos& file) const;
bool WriteFilePos(const FlatFilePos& file);
}; };
private: private:
@ -66,14 +62,12 @@ private:
/// from further behind on reboot. If the new state is not a successor of the previous state (due /// from further behind on reboot. If the new state is not a successor of the previous state (due
/// to a chain reorganization), the index must halt until Commit succeeds or else it could end up /// to a chain reorganization), the index must halt until Commit succeeds or else it could end up
/// getting corrupted. /// getting corrupted.
bool Commit(); bool Commit(bool syncToDisk=false);
protected: protected:
void BlockConnected(const std::shared_ptr<const CBlock>& block, const CBlockIndex* pindex, void BlockConnected(const std::shared_ptr<const CBlock>& block, const CBlockIndex* pindex,
const std::vector<CTransactionRef>& txn_conflicted) override; const std::vector<CTransactionRef>& txn_conflicted) override;
void ChainStateFlushed(const CBlockLocator& locator) override;
/// Initialize internal state from the database and block index. /// Initialize internal state from the database and block index.
virtual bool Init(); virtual bool Init();

View file

@ -56,17 +56,48 @@ BlockFilterIndex::BlockFilterIndex(BlockFilterType filter_type,
m_name = filter_name + " block filter index"; m_name = filter_name + " block filter index";
m_db = MakeUnique<BaseIndex::DB>(path, n_cache_size, f_memory, f_wipe); m_db = MakeUnique<BaseIndex::DB>(path, n_cache_size, f_memory, f_wipe);
m_filter_fileseq = MakeUnique<FlatFileSeq>(std::move(path), "fltr", FLTR_FILE_CHUNK_SIZE); m_filter_fileseq = MakeUnique<FlatFileSeq>(std::move(path), "fltr", FLTR_FILE_CHUNK_SIZE);
(*m_db) << "CREATE TABLE IF NOT EXISTS block (height INTEGER, hash BLOB NOT NULL COLLATE BINARY, "
"filter_hash BLOB NOT NULL COLLATE BINARY, header BLOB NOT NULL COLLATE BINARY, "
"file INTEGER NOT NULL, pos INTEGER NOT NULL, "
"PRIMARY KEY(height, hash), UNIQUE(filter_hash, header, file, pos));";
(*m_db) << "CREATE TABLE IF NOT EXISTS file_pos (file INTEGER NOT NULL, pos INTEGER NOT NULL);";
if (f_wipe) {
(*m_db) << "DELETE FROM file_pos";
(*m_db) << "DELETE FROM block";
}
} }
bool BlockFilterIndex::Init() bool BlockFilterIndex::Init()
{ {
if (!m_db->ReadFilePos(m_next_filter_pos)) { if (!ReadFilePos(m_next_filter_pos)) {
m_next_filter_pos.nFile = 0; m_next_filter_pos.nFile = 0;
m_next_filter_pos.nPos = 0; m_next_filter_pos.nPos = 0;
} }
return BaseIndex::Init(); return BaseIndex::Init();
} }
bool BlockFilterIndex::ReadFilePos(FlatFilePos& file) const
{
file.SetNull();
bool success = false;
for (auto&& row : (*m_db) << "SELECT file, pos FROM file_pos") {
row >> file.nFile >> file.nPos;
success = true;
}
return success;
}
bool BlockFilterIndex::WriteFilePos(const FlatFilePos& file)
{
(*m_db) << "DELETE FROM file_pos";
(*m_db) << "INSERT INTO file_pos VALUES(?, ?)" << file.nFile << file.nPos;
return m_db->rows_modified() > 0;
}
bool BlockFilterIndex::CommitInternal() bool BlockFilterIndex::CommitInternal()
{ {
const FlatFilePos& pos = m_next_filter_pos; const FlatFilePos& pos = m_next_filter_pos;
@ -222,7 +253,7 @@ bool BlockFilterIndex::Rewind(const CBlockIndex* current_tip, const CBlockIndex*
// The latest filter position gets written in Commit by the call to the BaseIndex::Rewind. // The latest filter position gets written in Commit by the call to the BaseIndex::Rewind.
// But since this creates new references to the filter, the position should get updated here // But since this creates new references to the filter, the position should get updated here
// atomically as well in case Commit fails. // atomically as well in case Commit fails.
if (!m_db->WriteFilePos(m_next_filter_pos)) return false; if (!WriteFilePos(m_next_filter_pos)) return false;
return BaseIndex::Rewind(current_tip, new_tip); return BaseIndex::Rewind(current_tip, new_tip);
} }

View file

@ -43,6 +43,10 @@ protected:
const char* GetName() const override { return m_name.c_str(); } const char* GetName() const override { return m_name.c_str(); }
bool ReadFilePos(FlatFilePos& file) const;
bool WriteFilePos(const FlatFilePos& file);
public: public:
/** Constructs the index, which becomes available to be queried. */ /** Constructs the index, which becomes available to be queried. */
explicit BlockFilterIndex(BlockFilterType filter_type, explicit BlockFilterIndex(BlockFilterType filter_type,

View file

@ -20,7 +20,7 @@
#include <vector> #include <vector>
#include <boost/test/unit_test.hpp> #include <boost/test/unit_test.hpp>
#include <boost/test/output_test_stream.hpp> #include <boost/test/tools/output_test_stream.hpp>
BOOST_FIXTURE_TEST_SUITE(bloom_tests, BasicTestingSetup) BOOST_FIXTURE_TEST_SUITE(bloom_tests, BasicTestingSetup)