replaced leveldb with sqlite for claimtrie db

This commit is contained in:
Brannon King 2019-08-07 16:42:06 -06:00
parent 83319b7f31
commit cb48bc3f2a
8 changed files with 351 additions and 27 deletions

View file

@ -131,6 +131,7 @@ BITCOIN_CORE_H = \
keystore.h \
lbry.h \
dbwrapper.h \
sqlwrapper.h \
limitedmap.h \
logging.h \
memusage.h \
@ -479,7 +480,7 @@ lbrycrdd_LDADD = \
$(LIBMEMENV) \
$(LIBSECP256K1)
lbrycrdd_LDADD += $(BOOST_LIBS) $(BDB_LIBS) $(CRYPTO_LIBS) $(ICU_LIBS) $(MINIUPNPC_LIBS) $(EVENT_PTHREADS_LIBS) $(EVENT_LIBS) $(ZMQ_LIBS)
lbrycrdd_LDADD += -lsqlite3 $(BOOST_LIBS) $(BDB_LIBS) $(CRYPTO_LIBS) $(ICU_LIBS) $(MINIUPNPC_LIBS) $(EVENT_PTHREADS_LIBS) $(EVENT_LIBS) $(ZMQ_LIBS) -lsqlite3
# lbrycrd-cli binary #
lbrycrd_cli_SOURCES = bitcoin-cli.cpp

View file

@ -34,7 +34,7 @@ nodist_bench_bench_bitcoin_SOURCES = $(GENERATED_BENCH_FILES)
bench_bench_bitcoin_CPPFLAGS = $(AM_CPPFLAGS) $(BITCOIN_INCLUDES) $(EVENT_CLFAGS) $(EVENT_PTHREADS_CFLAGS) $(BOOST_CPPFLAGS) -I$(builddir)/bench/
bench_bench_bitcoin_CXXFLAGS = $(AM_CXXFLAGS) $(PIE_FLAGS)
bench_bench_bitcoin_LDADD = \
bench_bench_bitcoin_LDADD = -lsqlite3 \
$(LIBBITCOIN_WALLET) \
$(LIBBITCOIN_SERVER) \
$(LIBBITCOIN_COMMON) \
@ -55,7 +55,7 @@ if ENABLE_WALLET
bench_bench_bitcoin_SOURCES += bench/coin_selection.cpp
endif
bench_bench_bitcoin_LDADD += $(BOOST_LIBS) $(BDB_LIBS) $(CRYPTO_LIBS) $(ICU_LIBS) $(MINIUPNPC_LIBS) $(EVENT_PTHREADS_LIBS) $(EVENT_LIBS)
bench_bench_bitcoin_LDADD += $(BOOST_LIBS) $(BDB_LIBS) $(CRYPTO_LIBS) $(ICU_LIBS) $(MINIUPNPC_LIBS) $(EVENT_PTHREADS_LIBS) $(EVENT_LIBS) -lsqlite3
bench_bench_bitcoin_LDFLAGS = $(RELDFLAGS) $(AM_LDFLAGS) $(LIBTOOL_APP_LDFLAGS)
CLEAN_BITCOIN_BENCH = bench/*.gcda bench/*.gcno $(GENERATED_BENCH_FILES)

View file

@ -121,11 +121,11 @@ if ENABLE_WALLET
test_test_lbrycrd_LDADD += $(LIBBITCOIN_WALLET)
endif
test_test_lbrycrd_LDADD += $(LIBBITCOIN_SERVER) $(LIBBITCOIN_CLI) $(LIBBITCOIN_COMMON) $(LIBBITCOIN_UTIL) $(LIBBITCOIN_CONSENSUS) $(LIBBITCOIN_CRYPTO) $(LIBUNIVALUE) \
test_test_lbrycrd_LDADD += -lsqlite3 $(LIBBITCOIN_SERVER) $(LIBBITCOIN_CLI) $(LIBBITCOIN_COMMON) $(LIBBITCOIN_UTIL) $(LIBBITCOIN_CONSENSUS) $(LIBBITCOIN_CRYPTO) $(LIBUNIVALUE) \
$(LIBLEVELDB) $(LIBLEVELDB_SSE42) $(LIBMEMENV) $(BOOST_LIBS) $(BOOST_UNIT_TEST_FRAMEWORK_LIB) $(LIBSECP256K1) $(EVENT_LIBS) $(EVENT_PTHREADS_LIBS)
test_test_lbrycrd_CXXFLAGS = $(AM_CXXFLAGS) $(PIE_FLAGS)
test_test_lbrycrd_LDADD += $(LIBBITCOIN_CONSENSUS) $(BDB_LIBS) $(CRYPTO_LIBS) $(ICU_LIBS) $(MINIUPNPC_LIBS)
test_test_lbrycrd_LDADD += $(LIBBITCOIN_CONSENSUS) $(BDB_LIBS) $(CRYPTO_LIBS) $(ICU_LIBS) $(MINIUPNPC_LIBS) -lsqlite3
test_test_lbrycrd_LDFLAGS = $(RELDFLAGS) $(AM_LDFLAGS) $(LIBTOOL_APP_LDFLAGS) -static
if ENABLE_ZMQ

View file

@ -127,7 +127,7 @@ void CClaimTrieData::reorderClaims(const supportEntryType& supports)
CClaimTrie::CClaimTrie(bool fMemory, bool fWipe, int proportionalDelayFactor)
{
nProportionalDelayFactor = proportionalDelayFactor;
db.reset(new CDBWrapper(GetDataDir() / "claimtrie", 200 * 1024 * 1024, fMemory, fWipe, false));
db.reset(new CSqlWrapper(GetDataDir() / "claimtrie.sqlite3.db", 200 * 1024 * 1024, fMemory, fWipe));
}
bool CClaimTrie::SyncToDisk()
@ -136,7 +136,7 @@ bool CClaimTrie::SyncToDisk()
}
template <typename Key, typename Container>
typename Container::value_type* getQueue(CDBWrapper& db, uint8_t dbkey, const Key& key, Container& queue, bool create)
typename Container::value_type* getQueue(CSqlWrapper& db, uint8_t dbkey, const Key& key, Container& queue, bool create)
{
auto itQueue = queue.find(key);
if (itQueue != queue.end())
@ -522,7 +522,7 @@ bool CClaimTrieCacheBase::getClaimById(const uint160& claimId, std::string& name
}
template <typename K, typename T>
void BatchWrite(CDBBatch& batch, uint8_t dbkey, const K& key, const std::vector<T>& value)
void BatchWrite(CSqlBatch& batch, uint8_t dbkey, const K& key, const std::vector<T>& value)
{
if (value.empty()) {
batch.Erase(std::make_pair(dbkey, key));
@ -532,7 +532,7 @@ void BatchWrite(CDBBatch& batch, uint8_t dbkey, const K& key, const std::vector<
}
template <typename Container>
void BatchWriteQueue(CDBBatch& batch, uint8_t dbkey, const Container& queue)
void BatchWriteQueue(CSqlBatch& batch, uint8_t dbkey, const Container& queue)
{
for (auto& itQueue : queue)
BatchWrite(batch, dbkey, itQueue.first, itQueue.second);
@ -540,7 +540,7 @@ void BatchWriteQueue(CDBBatch& batch, uint8_t dbkey, const Container& queue)
bool CClaimTrieCacheBase::flush()
{
CDBBatch batch(*(base->db));
CSqlBatch batch(*(base->db));
for (const auto& claim : claimsToDeleteFromByIdIndex) {
auto it = std::find_if(claimsToAddToByIdIndex.begin(), claimsToAddToByIdIndex.end(),
@ -596,8 +596,8 @@ bool CClaimTrieCacheBase::flush()
base->nNextHeight = nNextHeight;
if (!nodesToAddOrUpdate.empty() && (LogAcceptCategory(BCLog::CLAIMS) || LogAcceptCategory(BCLog::BENCH))) {
LogPrintf("TrieCache size: %zu nodes on block %d, batch writes %zu bytes.\n",
nodesToAddOrUpdate.height(), nNextHeight, batch.SizeEstimate());
LogPrintf("TrieCache size: %zu nodes on block %d.\n",
nodesToAddOrUpdate.height(), nNextHeight);
}
auto ret = base->db->WriteBatch(batch);
clear();

View file

@ -4,7 +4,7 @@
#include <amount.h>
#include <chain.h>
#include <chainparams.h>
#include <dbwrapper.h>
#include <sqlwrapper.h>
#include <prefixtrie.h>
#include <primitives/transaction.h>
#include <serialize.h>
@ -321,7 +321,7 @@ class CClaimTrie
{
int nNextHeight = 0;
int nProportionalDelayFactor = 0;
std::unique_ptr<CDBWrapper> db;
std::unique_ptr<CSqlWrapper> db;
public:
CClaimTrie() = default;

View file

@ -61,22 +61,22 @@ bool CClaimTrieCacheExpirationFork::forkForExpirationChange(bool increment)
*/
//look through db for expiration queues, if we haven't already found it in dirty expiration queue
boost::scoped_ptr<CDBIterator> pcursor(base->db->NewIterator());
for (pcursor->SeekToFirst(); pcursor->Valid(); pcursor->Next()) {
CSqlIterator pcursor(*(base->db));
for (pcursor.SeekToFirst(); pcursor.Valid(); pcursor.Next()) {
std::pair<uint8_t, int> key;
if (!pcursor->GetKey(key))
if (!pcursor.GetKey(key))
continue;
int height = key.second;
if (key.first == CLAIM_EXP_QUEUE_ROW) {
expirationQueueRowType row;
if (pcursor->GetValue(row)) {
if (pcursor.GetValue(row)) {
reactivateClaim(row, height, increment);
} else {
return error("%s(): error reading expiration queue rows from disk", __func__);
}
} else if (key.first == SUPPORT_EXP_QUEUE_ROW) {
expirationQueueRowType row;
if (pcursor->GetValue(row)) {
if (pcursor.GetValue(row)) {
reactivateSupport(row, height, increment);
} else {
return error("%s(): error reading support expiration queue rows from disk", __func__);
@ -161,10 +161,10 @@ bool CClaimTrieCacheNormalizationFork::normalizeAllNamesInTrieIfNecessary(insert
// run the one-time upgrade of all names that need to change
// it modifies the (cache) trie as it goes, so we need to grab everything to be modified first
boost::scoped_ptr<CDBIterator> pcursor(base->db->NewIterator());
for (pcursor->SeekToFirst(); pcursor->Valid(); pcursor->Next()) {
CSqlIterator pcursor(*(base->db));
for (pcursor.SeekToFirst(); pcursor.Valid(); pcursor.Next()) {
std::pair<uint8_t, std::string> key;
if (!pcursor->GetKey(key) || key.first != TRIE_NODE_BY_NAME)
if (!pcursor.GetKey(key) || key.first != TRIE_NODE_BY_NAME)
continue;
const auto& name = key.second;

323
src/sqlwrapper.h Normal file
View file

@ -0,0 +1,323 @@
// Copyright (c) 2012-2018 The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#pragma once
#include <clientversion.h>
#include <fs.h>
#include <serialize.h>
#include <streams.h>
#include <util.h>
#include <utilstrencodings.h>
#include <version.h>
#include <sqlite3.h>
class CSqlBatch;
class CSqlIterator;
class CSqlWrapper
{
friend CSqlBatch;
friend CSqlIterator;
sqlite3 *pdb;
mutable CDataStream ssKey;
mutable CDataStream ssValue;
void VerifyRC(int rc, int target, int line) const {
if (rc != target) {
auto error = sqlite3_errmsg(pdb);
LogPrintf("Query error %d on line %d: %s\n", rc, line, error);
assert(rc == target);
}
}
public:
/**
* @param[in] path Location in the filesystem where leveldb data will be stored.
* @param[in] nCacheSize Configures various leveldb cache settings.
* @param[in] fMemory If true, use leveldb's memory environment.
* @param[in] fWipe If true, remove all existing data.
* @param[in] obfuscate If true, store data obfuscated via simple XOR. If false, XOR
* with a zero'd byte array.
*/
CSqlWrapper(const fs::path& path, size_t nCacheSize, bool fMemory = false, bool fWipe = false):
ssKey(SER_DISK, CLIENT_VERSION), ssValue(SER_DISK, CLIENT_VERSION) {
int rc = sqlite3_open_v2(fMemory ? ":memory:" : path.c_str(), &pdb, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, nullptr);
if (rc != SQLITE_OK)
{
LogPrintf("ERROR: Unable to open %s. Message: %s\n", path, sqlite3_errmsg(pdb));
assert(rc == SQLITE_OK);
}
char* error = nullptr;
rc = sqlite3_exec(pdb, "CREATE TABLE IF NOT EXISTS kv (key BLOB PRIMARY KEY, value BLOB);", nullptr, nullptr, &error);
if (rc != SQLITE_OK) {
LogPrintf("ERROR: Unable to create kv table. Message: %s\n", error);
sqlite3_free(error);
sqlite3_close(pdb);
assert(rc == SQLITE_OK);
}
if (fWipe) {
rc = sqlite3_exec(pdb, "DELETE FROM kv;", nullptr, nullptr, &error);
if (rc != SQLITE_OK) {
LogPrintf("ERROR: Unable to drop kv table. Message: %s\n", error);
sqlite3_free(error);
sqlite3_close(pdb);
assert(rc == SQLITE_OK);
}
}
std::string pragmas = "PRAGMA cache_size=-" + std::to_string(nCacheSize >> 10)
+ "; PRAGMA journal_mode=WAL; PRAGMA temp_store=MEMORY; PRAGMA synchronous=NORMAL; PRAGMA wal_autocheckpoint=10000;";
rc = sqlite3_exec(pdb, pragmas.c_str(), nullptr, nullptr, &error);
if (rc != SQLITE_OK) {
LogPrintf("ERROR: Unable to set cache size. Message: %s\n", error);
sqlite3_free(error);
sqlite3_close(pdb);
assert(rc == SQLITE_OK);
}
}
~CSqlWrapper() {
sqlite3_close(pdb);
}
CSqlWrapper(const CSqlWrapper&) = delete;
template <typename K, typename V>
bool Read(const K& key, V& value) const
{
ssKey << key;
sqlite3_stmt *stmt = nullptr;
auto rc = sqlite3_prepare_v2(pdb, "SELECT value FROM kv WHERE key = ?", -1, &stmt, nullptr);
VerifyRC(rc, SQLITE_OK, __LINE__);
rc = sqlite3_bind_blob(stmt, 1, ssKey.data(), ssKey.size(), SQLITE_STATIC);
VerifyRC(rc, SQLITE_OK, __LINE__);
rc = sqlite3_step(stmt);
bool ret = false;
if (rc == SQLITE_ROW) {
auto blob_size = sqlite3_column_bytes(stmt, 0);
auto blob = reinterpret_cast<const char*>(sqlite3_column_blob(stmt, 0)); // released on step/finalize
if (blob_size && blob) {
try {
CDataStream ssValue(blob, blob + blob_size, SER_DISK, CLIENT_VERSION);
ssValue >> value;
ret = true;
} catch (const std::exception &) {}
}
}
sqlite3_finalize(stmt);
ssKey.clear();
return ret;
}
template <typename K, typename V>
bool Write(const K& key, const V& value, bool fSync = false)
{
ssKey << key;
ssValue << value;
sqlite3_stmt *stmt = nullptr;
auto rc = sqlite3_prepare_v2(pdb, "REPLACE INTO kv VALUES(?, ?)", -1, &stmt, nullptr);
VerifyRC(rc, SQLITE_OK, __LINE__);
rc = sqlite3_bind_blob(stmt, 1, ssKey.data(), ssKey.size(), SQLITE_STATIC);
VerifyRC(rc, SQLITE_OK, __LINE__);
rc = sqlite3_bind_blob(stmt, 2, ssValue.data(), ssValue.size(), SQLITE_STATIC);
VerifyRC(rc, SQLITE_OK, __LINE__);
rc = sqlite3_step(stmt);
VerifyRC(rc, SQLITE_DONE, __LINE__);
auto ret = sqlite3_changes(pdb) > 0;
sqlite3_finalize(stmt);
ssKey.clear();
ssValue.clear();
return ret;
}
template <typename K>
bool Exists(const K& key) const
{
ssKey << key;
sqlite3_stmt *stmt = nullptr;
auto rc = sqlite3_prepare_v2(pdb, "SELECT 1 FROM kv WHERE key = ?", -1, &stmt, nullptr);
VerifyRC(rc, SQLITE_OK, __LINE__);
rc = sqlite3_bind_blob(stmt, 1, ssKey.data(), ssKey.size(), SQLITE_STATIC);
VerifyRC(rc, SQLITE_OK, __LINE__);
rc = sqlite3_step(stmt);
bool ret = false;
if (rc == SQLITE_ROW)
ret = 1 == sqlite3_column_int(stmt, 0);
sqlite3_finalize(stmt);
ssKey.clear();
return ret;
}
template <typename K>
bool Erase(const K& key, bool fSync = false)
{
ssKey << key;
sqlite3_stmt *stmt = nullptr;
auto rc = sqlite3_prepare_v2(pdb, "DELETE FROM kv WHERE key = ?", -1, &stmt, nullptr);
VerifyRC(rc, SQLITE_OK, __LINE__);
rc = sqlite3_bind_blob(stmt, 1, ssKey.data(), ssKey.size(), SQLITE_STATIC);
VerifyRC(rc, SQLITE_OK, __LINE__);
rc = sqlite3_step(stmt);
VerifyRC(rc, SQLITE_DONE, __LINE__);
auto ret = sqlite3_changes(pdb) > 0;
sqlite3_finalize(stmt);
ssKey.clear();
if (fSync) Sync();
return ret;
}
bool WriteBatch(CSqlBatch& batch, bool fSync = false) {
int rc = sqlite3_exec(pdb, "COMMIT TRANSACTION;", nullptr, nullptr, nullptr);
if (rc == SQLITE_OK) {
if (fSync)
Sync();
return true;
}
return false;
}
// not available for LevelDB; provide for compatibility with BDB
bool Flush()
{
return true;
}
bool Sync()
{
auto rc = sqlite3_wal_checkpoint_v2(pdb, nullptr, SQLITE_CHECKPOINT_FULL, nullptr, nullptr);
return rc == SQLITE_OK;
}
/**
* Return true if the database managed by this class contains no entries.
*/
bool IsEmpty() {
int64_t count = -1;
static auto cb = [](void* state, int argc, char** argv, char** cols) {
auto data = reinterpret_cast<int64_t*>(state);
if (argc)
*data = std::atoll(argv[0]);
return SQLITE_OK;
};
int rc = sqlite3_exec(pdb, "SELECT COUNT(*) FROM kv", cb, &count, nullptr);
assert(rc == SQLITE_OK);
return count == 0;
}
};
/** Batch of changes queued to be written to a CDBWrapper */
class CSqlBatch
{
friend class CSqlWrapper;
private:
CSqlWrapper &parent;
public:
/**
* @param[in] _parent CDBWrapper that this batch is to be submitted to
*/
explicit CSqlBatch(CSqlWrapper &_parent) : parent(_parent) {
int rc = sqlite3_exec(parent.pdb, "BEGIN TRANSACTION;", nullptr, nullptr, nullptr);
parent.VerifyRC(rc, SQLITE_OK, __LINE__);
};
void Clear()
{
int rc = sqlite3_exec(parent.pdb, "ROLLBACK TRANSACTION;", nullptr, nullptr, nullptr);
parent.VerifyRC(rc, SQLITE_OK, __LINE__);
}
template <typename K, typename V>
void Write(const K& key, const V& value)
{
auto ret = parent.Write(key, value, false);
assert(ret);
}
template <typename K>
void Erase(const K& key)
{
parent.Erase(key, false);
}
};
class CSqlIterator
{
private:
const CSqlWrapper &parent;
sqlite3_stmt *stmt;
int last_step;
public:
/**
* @param[in] _parent Parent CDBWrapper instance.
* @param[in] _piter The original leveldb iterator.
*/
CSqlIterator(const CSqlWrapper &_parent) :
parent(_parent) {
stmt = nullptr;
auto rc = sqlite3_prepare_v2(parent.pdb, "SELECT * FROM kv;", -1, &stmt, nullptr);
parent.VerifyRC(rc, SQLITE_OK, __LINE__);
};
~CSqlIterator() {
last_step = SQLITE_DONE;
sqlite3_finalize(stmt);
}
bool Valid() const { return last_step == SQLITE_ROW; }
void SeekToFirst() { auto rc = sqlite3_reset(stmt); assert(rc == SQLITE_OK); Next(); }
void Next() { last_step = sqlite3_step(stmt); }
template<typename K> bool GetKey(K& key) {
if (last_step != SQLITE_ROW)
return false;
auto blob_size = sqlite3_column_bytes(stmt, 0);
auto blob = reinterpret_cast<const char*>(sqlite3_column_blob(stmt, 0)); // released on step/finalize
if (blob_size && blob) {
try {
CDataStream ssKey(blob, blob + blob_size, SER_DISK, CLIENT_VERSION);
ssKey >> key;
return true;
} catch (const std::exception &) {}
}
return false;
}
template<typename V> bool GetValue(V& value) {
if (last_step != SQLITE_ROW)
return false;
auto blob_size = sqlite3_column_bytes(stmt, 1);
auto blob = reinterpret_cast<const char*>(sqlite3_column_blob(stmt, 1)); // released on step/finalize
if (blob_size && blob) {
try {
CDataStream ssValue(blob, blob + blob_size, SER_DISK, CLIENT_VERSION);
ssValue >> value;
return true;
} catch (const std::exception &) {}
}
return false;
}
int GetValueSize() {
if (last_step != SQLITE_ROW)
return -1;
return sqlite3_column_bytes(stmt, 1);
}
};

View file

@ -322,16 +322,16 @@ struct ClaimTrieChainFixture: public CClaimTrieCacheExpirationFork
template <typename K>
bool keyTypeEmpty(uint8_t keyType)
{
boost::scoped_ptr<CDBIterator> pcursor(base->db->NewIterator());
pcursor->SeekToFirst();
CSqlIterator pcursor(*(base->db));
pcursor.SeekToFirst();
while (pcursor->Valid()) {
while (pcursor.Valid()) {
std::pair<uint8_t, K> key;
if (pcursor->GetKey(key)) {
if (pcursor.GetKey(key)) {
if (key.first == keyType)
return false;
}
pcursor->Next();
pcursor.Next();
}
return true;
}