From cb48bc3f2aef1c72c35820c7c04aa0bd5a98b739 Mon Sep 17 00:00:00 2001 From: Brannon King Date: Wed, 7 Aug 2019 16:42:06 -0600 Subject: [PATCH] replaced leveldb with sqlite for claimtrie db --- src/Makefile.am | 3 +- src/Makefile.bench.include | 4 +- src/Makefile.test.include | 4 +- src/claimtrie.cpp | 14 +- src/claimtrie.h | 4 +- src/claimtrieforks.cpp | 16 +- src/sqlwrapper.h | 323 ++++++++++++++++++++++++++ src/test/claimtriebranching_tests.cpp | 10 +- 8 files changed, 351 insertions(+), 27 deletions(-) create mode 100644 src/sqlwrapper.h diff --git a/src/Makefile.am b/src/Makefile.am index e88e21a60..a40e80b11 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -131,6 +131,7 @@ BITCOIN_CORE_H = \ keystore.h \ lbry.h \ dbwrapper.h \ + sqlwrapper.h \ limitedmap.h \ logging.h \ memusage.h \ @@ -479,7 +480,7 @@ lbrycrdd_LDADD = \ $(LIBMEMENV) \ $(LIBSECP256K1) -lbrycrdd_LDADD += $(BOOST_LIBS) $(BDB_LIBS) $(CRYPTO_LIBS) $(ICU_LIBS) $(MINIUPNPC_LIBS) $(EVENT_PTHREADS_LIBS) $(EVENT_LIBS) $(ZMQ_LIBS) +lbrycrdd_LDADD += -lsqlite3 $(BOOST_LIBS) $(BDB_LIBS) $(CRYPTO_LIBS) $(ICU_LIBS) $(MINIUPNPC_LIBS) $(EVENT_PTHREADS_LIBS) $(EVENT_LIBS) $(ZMQ_LIBS) -lsqlite3 # lbrycrd-cli binary # lbrycrd_cli_SOURCES = bitcoin-cli.cpp diff --git a/src/Makefile.bench.include b/src/Makefile.bench.include index 0462ce04f..c3f4f4b3d 100644 --- a/src/Makefile.bench.include +++ b/src/Makefile.bench.include @@ -34,7 +34,7 @@ nodist_bench_bench_bitcoin_SOURCES = $(GENERATED_BENCH_FILES) bench_bench_bitcoin_CPPFLAGS = $(AM_CPPFLAGS) $(BITCOIN_INCLUDES) $(EVENT_CLFAGS) $(EVENT_PTHREADS_CFLAGS) $(BOOST_CPPFLAGS) -I$(builddir)/bench/ bench_bench_bitcoin_CXXFLAGS = $(AM_CXXFLAGS) $(PIE_FLAGS) -bench_bench_bitcoin_LDADD = \ +bench_bench_bitcoin_LDADD = -lsqlite3 \ $(LIBBITCOIN_WALLET) \ $(LIBBITCOIN_SERVER) \ $(LIBBITCOIN_COMMON) \ @@ -55,7 +55,7 @@ if ENABLE_WALLET bench_bench_bitcoin_SOURCES += bench/coin_selection.cpp endif -bench_bench_bitcoin_LDADD += $(BOOST_LIBS) $(BDB_LIBS) $(CRYPTO_LIBS) $(ICU_LIBS) $(MINIUPNPC_LIBS) $(EVENT_PTHREADS_LIBS) $(EVENT_LIBS) +bench_bench_bitcoin_LDADD += $(BOOST_LIBS) $(BDB_LIBS) $(CRYPTO_LIBS) $(ICU_LIBS) $(MINIUPNPC_LIBS) $(EVENT_PTHREADS_LIBS) $(EVENT_LIBS) -lsqlite3 bench_bench_bitcoin_LDFLAGS = $(RELDFLAGS) $(AM_LDFLAGS) $(LIBTOOL_APP_LDFLAGS) CLEAN_BITCOIN_BENCH = bench/*.gcda bench/*.gcno $(GENERATED_BENCH_FILES) diff --git a/src/Makefile.test.include b/src/Makefile.test.include index 0e18a3ba8..5251eac28 100644 --- a/src/Makefile.test.include +++ b/src/Makefile.test.include @@ -121,11 +121,11 @@ if ENABLE_WALLET test_test_lbrycrd_LDADD += $(LIBBITCOIN_WALLET) endif -test_test_lbrycrd_LDADD += $(LIBBITCOIN_SERVER) $(LIBBITCOIN_CLI) $(LIBBITCOIN_COMMON) $(LIBBITCOIN_UTIL) $(LIBBITCOIN_CONSENSUS) $(LIBBITCOIN_CRYPTO) $(LIBUNIVALUE) \ +test_test_lbrycrd_LDADD += -lsqlite3 $(LIBBITCOIN_SERVER) $(LIBBITCOIN_CLI) $(LIBBITCOIN_COMMON) $(LIBBITCOIN_UTIL) $(LIBBITCOIN_CONSENSUS) $(LIBBITCOIN_CRYPTO) $(LIBUNIVALUE) \ $(LIBLEVELDB) $(LIBLEVELDB_SSE42) $(LIBMEMENV) $(BOOST_LIBS) $(BOOST_UNIT_TEST_FRAMEWORK_LIB) $(LIBSECP256K1) $(EVENT_LIBS) $(EVENT_PTHREADS_LIBS) test_test_lbrycrd_CXXFLAGS = $(AM_CXXFLAGS) $(PIE_FLAGS) -test_test_lbrycrd_LDADD += $(LIBBITCOIN_CONSENSUS) $(BDB_LIBS) $(CRYPTO_LIBS) $(ICU_LIBS) $(MINIUPNPC_LIBS) +test_test_lbrycrd_LDADD += $(LIBBITCOIN_CONSENSUS) $(BDB_LIBS) $(CRYPTO_LIBS) $(ICU_LIBS) $(MINIUPNPC_LIBS) -lsqlite3 test_test_lbrycrd_LDFLAGS = $(RELDFLAGS) $(AM_LDFLAGS) $(LIBTOOL_APP_LDFLAGS) -static if ENABLE_ZMQ diff --git a/src/claimtrie.cpp b/src/claimtrie.cpp index 0ddf7bad5..4c32354ee 100644 --- a/src/claimtrie.cpp +++ b/src/claimtrie.cpp @@ -127,7 +127,7 @@ void CClaimTrieData::reorderClaims(const supportEntryType& supports) CClaimTrie::CClaimTrie(bool fMemory, bool fWipe, int proportionalDelayFactor) { nProportionalDelayFactor = proportionalDelayFactor; - db.reset(new CDBWrapper(GetDataDir() / "claimtrie", 200 * 1024 * 1024, fMemory, fWipe, false)); + db.reset(new CSqlWrapper(GetDataDir() / "claimtrie.sqlite3.db", 200 * 1024 * 1024, fMemory, fWipe)); } bool CClaimTrie::SyncToDisk() @@ -136,7 +136,7 @@ bool CClaimTrie::SyncToDisk() } template -typename Container::value_type* getQueue(CDBWrapper& db, uint8_t dbkey, const Key& key, Container& queue, bool create) +typename Container::value_type* getQueue(CSqlWrapper& db, uint8_t dbkey, const Key& key, Container& queue, bool create) { auto itQueue = queue.find(key); if (itQueue != queue.end()) @@ -522,7 +522,7 @@ bool CClaimTrieCacheBase::getClaimById(const uint160& claimId, std::string& name } template -void BatchWrite(CDBBatch& batch, uint8_t dbkey, const K& key, const std::vector& value) +void BatchWrite(CSqlBatch& batch, uint8_t dbkey, const K& key, const std::vector& value) { if (value.empty()) { batch.Erase(std::make_pair(dbkey, key)); @@ -532,7 +532,7 @@ void BatchWrite(CDBBatch& batch, uint8_t dbkey, const K& key, const std::vector< } template -void BatchWriteQueue(CDBBatch& batch, uint8_t dbkey, const Container& queue) +void BatchWriteQueue(CSqlBatch& batch, uint8_t dbkey, const Container& queue) { for (auto& itQueue : queue) BatchWrite(batch, dbkey, itQueue.first, itQueue.second); @@ -540,7 +540,7 @@ void BatchWriteQueue(CDBBatch& batch, uint8_t dbkey, const Container& queue) bool CClaimTrieCacheBase::flush() { - CDBBatch batch(*(base->db)); + CSqlBatch batch(*(base->db)); for (const auto& claim : claimsToDeleteFromByIdIndex) { auto it = std::find_if(claimsToAddToByIdIndex.begin(), claimsToAddToByIdIndex.end(), @@ -596,8 +596,8 @@ bool CClaimTrieCacheBase::flush() base->nNextHeight = nNextHeight; if (!nodesToAddOrUpdate.empty() && (LogAcceptCategory(BCLog::CLAIMS) || LogAcceptCategory(BCLog::BENCH))) { - LogPrintf("TrieCache size: %zu nodes on block %d, batch writes %zu bytes.\n", - nodesToAddOrUpdate.height(), nNextHeight, batch.SizeEstimate()); + LogPrintf("TrieCache size: %zu nodes on block %d.\n", + nodesToAddOrUpdate.height(), nNextHeight); } auto ret = base->db->WriteBatch(batch); clear(); diff --git a/src/claimtrie.h b/src/claimtrie.h index c3f75cd94..c8b6169e1 100644 --- a/src/claimtrie.h +++ b/src/claimtrie.h @@ -4,7 +4,7 @@ #include #include #include -#include +#include #include #include #include @@ -321,7 +321,7 @@ class CClaimTrie { int nNextHeight = 0; int nProportionalDelayFactor = 0; - std::unique_ptr db; + std::unique_ptr db; public: CClaimTrie() = default; diff --git a/src/claimtrieforks.cpp b/src/claimtrieforks.cpp index ab823c546..9b86205d8 100644 --- a/src/claimtrieforks.cpp +++ b/src/claimtrieforks.cpp @@ -61,22 +61,22 @@ bool CClaimTrieCacheExpirationFork::forkForExpirationChange(bool increment) */ //look through db for expiration queues, if we haven't already found it in dirty expiration queue - boost::scoped_ptr pcursor(base->db->NewIterator()); - for (pcursor->SeekToFirst(); pcursor->Valid(); pcursor->Next()) { + CSqlIterator pcursor(*(base->db)); + for (pcursor.SeekToFirst(); pcursor.Valid(); pcursor.Next()) { std::pair key; - if (!pcursor->GetKey(key)) + if (!pcursor.GetKey(key)) continue; int height = key.second; if (key.first == CLAIM_EXP_QUEUE_ROW) { expirationQueueRowType row; - if (pcursor->GetValue(row)) { + if (pcursor.GetValue(row)) { reactivateClaim(row, height, increment); } else { return error("%s(): error reading expiration queue rows from disk", __func__); } } else if (key.first == SUPPORT_EXP_QUEUE_ROW) { expirationQueueRowType row; - if (pcursor->GetValue(row)) { + if (pcursor.GetValue(row)) { reactivateSupport(row, height, increment); } else { return error("%s(): error reading support expiration queue rows from disk", __func__); @@ -161,10 +161,10 @@ bool CClaimTrieCacheNormalizationFork::normalizeAllNamesInTrieIfNecessary(insert // run the one-time upgrade of all names that need to change // it modifies the (cache) trie as it goes, so we need to grab everything to be modified first - boost::scoped_ptr pcursor(base->db->NewIterator()); - for (pcursor->SeekToFirst(); pcursor->Valid(); pcursor->Next()) { + CSqlIterator pcursor(*(base->db)); + for (pcursor.SeekToFirst(); pcursor.Valid(); pcursor.Next()) { std::pair key; - if (!pcursor->GetKey(key) || key.first != TRIE_NODE_BY_NAME) + if (!pcursor.GetKey(key) || key.first != TRIE_NODE_BY_NAME) continue; const auto& name = key.second; diff --git a/src/sqlwrapper.h b/src/sqlwrapper.h new file mode 100644 index 000000000..198cbc374 --- /dev/null +++ b/src/sqlwrapper.h @@ -0,0 +1,323 @@ +// Copyright (c) 2012-2018 The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include + +class CSqlBatch; +class CSqlIterator; + +class CSqlWrapper +{ + friend CSqlBatch; + friend CSqlIterator; + sqlite3 *pdb; + mutable CDataStream ssKey; + mutable CDataStream ssValue; + + void VerifyRC(int rc, int target, int line) const { + if (rc != target) { + auto error = sqlite3_errmsg(pdb); + LogPrintf("Query error %d on line %d: %s\n", rc, line, error); + assert(rc == target); + } + } + +public: + /** + * @param[in] path Location in the filesystem where leveldb data will be stored. + * @param[in] nCacheSize Configures various leveldb cache settings. + * @param[in] fMemory If true, use leveldb's memory environment. + * @param[in] fWipe If true, remove all existing data. + * @param[in] obfuscate If true, store data obfuscated via simple XOR. If false, XOR + * with a zero'd byte array. + */ + CSqlWrapper(const fs::path& path, size_t nCacheSize, bool fMemory = false, bool fWipe = false): + ssKey(SER_DISK, CLIENT_VERSION), ssValue(SER_DISK, CLIENT_VERSION) { + int rc = sqlite3_open_v2(fMemory ? ":memory:" : path.c_str(), &pdb, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, nullptr); + if (rc != SQLITE_OK) + { + LogPrintf("ERROR: Unable to open %s. Message: %s\n", path, sqlite3_errmsg(pdb)); + assert(rc == SQLITE_OK); + } + char* error = nullptr; + rc = sqlite3_exec(pdb, "CREATE TABLE IF NOT EXISTS kv (key BLOB PRIMARY KEY, value BLOB);", nullptr, nullptr, &error); + if (rc != SQLITE_OK) { + LogPrintf("ERROR: Unable to create kv table. Message: %s\n", error); + sqlite3_free(error); + sqlite3_close(pdb); + assert(rc == SQLITE_OK); + } + + if (fWipe) { + rc = sqlite3_exec(pdb, "DELETE FROM kv;", nullptr, nullptr, &error); + if (rc != SQLITE_OK) { + LogPrintf("ERROR: Unable to drop kv table. Message: %s\n", error); + sqlite3_free(error); + sqlite3_close(pdb); + assert(rc == SQLITE_OK); + } + } + std::string pragmas = "PRAGMA cache_size=-" + std::to_string(nCacheSize >> 10) + + "; PRAGMA journal_mode=WAL; PRAGMA temp_store=MEMORY; PRAGMA synchronous=NORMAL; PRAGMA wal_autocheckpoint=10000;"; + rc = sqlite3_exec(pdb, pragmas.c_str(), nullptr, nullptr, &error); + if (rc != SQLITE_OK) { + LogPrintf("ERROR: Unable to set cache size. Message: %s\n", error); + sqlite3_free(error); + sqlite3_close(pdb); + assert(rc == SQLITE_OK); + } + } + ~CSqlWrapper() { + sqlite3_close(pdb); + } + + CSqlWrapper(const CSqlWrapper&) = delete; + + template + bool Read(const K& key, V& value) const + { + ssKey << key; + + sqlite3_stmt *stmt = nullptr; + auto rc = sqlite3_prepare_v2(pdb, "SELECT value FROM kv WHERE key = ?", -1, &stmt, nullptr); + VerifyRC(rc, SQLITE_OK, __LINE__); + rc = sqlite3_bind_blob(stmt, 1, ssKey.data(), ssKey.size(), SQLITE_STATIC); + VerifyRC(rc, SQLITE_OK, __LINE__); + rc = sqlite3_step(stmt); + bool ret = false; + if (rc == SQLITE_ROW) { + auto blob_size = sqlite3_column_bytes(stmt, 0); + auto blob = reinterpret_cast(sqlite3_column_blob(stmt, 0)); // released on step/finalize + if (blob_size && blob) { + try { + CDataStream ssValue(blob, blob + blob_size, SER_DISK, CLIENT_VERSION); + ssValue >> value; + ret = true; + } catch (const std::exception &) {} + } + } + sqlite3_finalize(stmt); + ssKey.clear(); + return ret; + } + + template + bool Write(const K& key, const V& value, bool fSync = false) + { + ssKey << key; + ssValue << value; + + sqlite3_stmt *stmt = nullptr; + auto rc = sqlite3_prepare_v2(pdb, "REPLACE INTO kv VALUES(?, ?)", -1, &stmt, nullptr); + VerifyRC(rc, SQLITE_OK, __LINE__); + rc = sqlite3_bind_blob(stmt, 1, ssKey.data(), ssKey.size(), SQLITE_STATIC); + VerifyRC(rc, SQLITE_OK, __LINE__); + rc = sqlite3_bind_blob(stmt, 2, ssValue.data(), ssValue.size(), SQLITE_STATIC); + VerifyRC(rc, SQLITE_OK, __LINE__); + rc = sqlite3_step(stmt); + VerifyRC(rc, SQLITE_DONE, __LINE__); + auto ret = sqlite3_changes(pdb) > 0; + sqlite3_finalize(stmt); + + ssKey.clear(); + ssValue.clear(); + + return ret; + } + + template + bool Exists(const K& key) const + { + ssKey << key; + + sqlite3_stmt *stmt = nullptr; + auto rc = sqlite3_prepare_v2(pdb, "SELECT 1 FROM kv WHERE key = ?", -1, &stmt, nullptr); + VerifyRC(rc, SQLITE_OK, __LINE__); + rc = sqlite3_bind_blob(stmt, 1, ssKey.data(), ssKey.size(), SQLITE_STATIC); + VerifyRC(rc, SQLITE_OK, __LINE__); + rc = sqlite3_step(stmt); + bool ret = false; + if (rc == SQLITE_ROW) + ret = 1 == sqlite3_column_int(stmt, 0); + sqlite3_finalize(stmt); + + ssKey.clear(); + return ret; + } + + template + bool Erase(const K& key, bool fSync = false) + { + ssKey << key; + + sqlite3_stmt *stmt = nullptr; + auto rc = sqlite3_prepare_v2(pdb, "DELETE FROM kv WHERE key = ?", -1, &stmt, nullptr); + VerifyRC(rc, SQLITE_OK, __LINE__); + rc = sqlite3_bind_blob(stmt, 1, ssKey.data(), ssKey.size(), SQLITE_STATIC); + VerifyRC(rc, SQLITE_OK, __LINE__); + rc = sqlite3_step(stmt); + VerifyRC(rc, SQLITE_DONE, __LINE__); + auto ret = sqlite3_changes(pdb) > 0; + sqlite3_finalize(stmt); + ssKey.clear(); + if (fSync) Sync(); + return ret; + } + + bool WriteBatch(CSqlBatch& batch, bool fSync = false) { + int rc = sqlite3_exec(pdb, "COMMIT TRANSACTION;", nullptr, nullptr, nullptr); + if (rc == SQLITE_OK) { + if (fSync) + Sync(); + return true; + } + return false; + } + + // not available for LevelDB; provide for compatibility with BDB + bool Flush() + { + return true; + } + + bool Sync() + { + auto rc = sqlite3_wal_checkpoint_v2(pdb, nullptr, SQLITE_CHECKPOINT_FULL, nullptr, nullptr); + return rc == SQLITE_OK; + } + + /** + * Return true if the database managed by this class contains no entries. + */ + bool IsEmpty() { + int64_t count = -1; + static auto cb = [](void* state, int argc, char** argv, char** cols) { + auto data = reinterpret_cast(state); + if (argc) + *data = std::atoll(argv[0]); + return SQLITE_OK; + }; + int rc = sqlite3_exec(pdb, "SELECT COUNT(*) FROM kv", cb, &count, nullptr); + assert(rc == SQLITE_OK); + return count == 0; + } +}; + +/** Batch of changes queued to be written to a CDBWrapper */ +class CSqlBatch +{ + friend class CSqlWrapper; + +private: + CSqlWrapper &parent; + +public: + /** + * @param[in] _parent CDBWrapper that this batch is to be submitted to + */ + explicit CSqlBatch(CSqlWrapper &_parent) : parent(_parent) { + int rc = sqlite3_exec(parent.pdb, "BEGIN TRANSACTION;", nullptr, nullptr, nullptr); + parent.VerifyRC(rc, SQLITE_OK, __LINE__); + }; + + void Clear() + { + int rc = sqlite3_exec(parent.pdb, "ROLLBACK TRANSACTION;", nullptr, nullptr, nullptr); + parent.VerifyRC(rc, SQLITE_OK, __LINE__); + } + + template + void Write(const K& key, const V& value) + { + auto ret = parent.Write(key, value, false); + assert(ret); + } + + template + void Erase(const K& key) + { + parent.Erase(key, false); + } +}; + +class CSqlIterator +{ +private: + const CSqlWrapper &parent; + sqlite3_stmt *stmt; + int last_step; + +public: + + /** + * @param[in] _parent Parent CDBWrapper instance. + * @param[in] _piter The original leveldb iterator. + */ + CSqlIterator(const CSqlWrapper &_parent) : + parent(_parent) { + stmt = nullptr; + auto rc = sqlite3_prepare_v2(parent.pdb, "SELECT * FROM kv;", -1, &stmt, nullptr); + parent.VerifyRC(rc, SQLITE_OK, __LINE__); + }; + + ~CSqlIterator() { + last_step = SQLITE_DONE; + sqlite3_finalize(stmt); + } + + bool Valid() const { return last_step == SQLITE_ROW; } + + void SeekToFirst() { auto rc = sqlite3_reset(stmt); assert(rc == SQLITE_OK); Next(); } + + void Next() { last_step = sqlite3_step(stmt); } + + template bool GetKey(K& key) { + if (last_step != SQLITE_ROW) + return false; + + auto blob_size = sqlite3_column_bytes(stmt, 0); + auto blob = reinterpret_cast(sqlite3_column_blob(stmt, 0)); // released on step/finalize + if (blob_size && blob) { + try { + CDataStream ssKey(blob, blob + blob_size, SER_DISK, CLIENT_VERSION); + ssKey >> key; + return true; + } catch (const std::exception &) {} + } + return false; + } + + template bool GetValue(V& value) { + if (last_step != SQLITE_ROW) + return false; + + auto blob_size = sqlite3_column_bytes(stmt, 1); + auto blob = reinterpret_cast(sqlite3_column_blob(stmt, 1)); // released on step/finalize + if (blob_size && blob) { + try { + CDataStream ssValue(blob, blob + blob_size, SER_DISK, CLIENT_VERSION); + ssValue >> value; + return true; + } catch (const std::exception &) {} + } + return false; + } + + int GetValueSize() { + if (last_step != SQLITE_ROW) + return -1; + return sqlite3_column_bytes(stmt, 1); + } +}; + diff --git a/src/test/claimtriebranching_tests.cpp b/src/test/claimtriebranching_tests.cpp index dd4709901..245bff455 100644 --- a/src/test/claimtriebranching_tests.cpp +++ b/src/test/claimtriebranching_tests.cpp @@ -322,16 +322,16 @@ struct ClaimTrieChainFixture: public CClaimTrieCacheExpirationFork template bool keyTypeEmpty(uint8_t keyType) { - boost::scoped_ptr pcursor(base->db->NewIterator()); - pcursor->SeekToFirst(); + CSqlIterator pcursor(*(base->db)); + pcursor.SeekToFirst(); - while (pcursor->Valid()) { + while (pcursor.Valid()) { std::pair key; - if (pcursor->GetKey(key)) { + if (pcursor.GetKey(key)) { if (key.first == keyType) return false; } - pcursor->Next(); + pcursor.Next(); } return true; }