database: Implement cache layer.
This commit adds a database cache layer to the ffldb database backend so that callers can commit multiple transactions without having to incur the overhead of a disk sync on every new block.
parent af3ed803f5
commit 0b32febe5c
7 changed files with 815 additions and 106 deletions
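For a sense of the caller-facing effect, here is a rough usage sketch: each committed Update below lands in the new in-memory cache, and the disk sync cost is paid only when the cache fills or the flush interval elapses. The driver name and the Create/Update calls follow the database2 package conventions used elsewhere in this commit; treat the exact invocation as illustrative rather than part of the change.

package main

import (
	"fmt"

	database "github.com/btcsuite/btcd/database2"
	"github.com/btcsuite/btcd/wire"
)

func main() {
	// Create an ffldb-backed database (database.Open for an existing one).
	db, err := database.Create("ffldb", "testdb", wire.MainNet)
	if err != nil {
		panic(err)
	}
	defer db.Close()

	// Many small committed transactions no longer cost one disk sync each;
	// the cache layer batches them and flushes opportunistically.
	for i := 0; i < 1000; i++ {
		err := db.Update(func(tx database.Tx) error {
			key := []byte(fmt.Sprintf("key-%04d", i))
			return tx.Metadata().Put(key, []byte("value"))
		})
		if err != nil {
			panic(err)
		}
	}
}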
@@ -417,7 +417,7 @@ func (s *blockStore) writeBlock(rawBlock []byte) (blockLocation, error) {
 	wc := s.writeCursor
 	finalOffset := wc.curOffset + fullLen
 	if finalOffset < wc.curOffset || finalOffset > s.maxBlockFileSize {
-		// This is done under the write cursor lock since the fileNum
+		// This is done under the write cursor lock since the curFileNum
 		// field is accessed elsewhere by readers.
 		//
 		// Close the current write file to force a read-only reopen
@@ -483,14 +483,6 @@ func (s *blockStore) writeBlock(rawBlock []byte) (blockLocation, error) {
 		return blockLocation{}, err
 	}
 
-	// Sync the file to disk.
-	if err := wc.curFile.file.Sync(); err != nil {
-		str := fmt.Sprintf("failed to sync file %d: %v", wc.curFileNum,
-			err)
-		return blockLocation{}, makeDbErr(database.ErrDriverSpecific,
-			str, err)
-	}
-
 	loc := blockLocation{
 		blockFileNum: wc.curFileNum,
 		fileOffset:   origOffset,
@@ -594,6 +586,36 @@ func (s *blockStore) readBlockRegion(loc blockLocation, offset, numBytes uint32)
 	return serializedData, nil
 }
 
+// syncBlocks performs a file system sync on the flat file associated with the
+// store's current write cursor. It is safe to call even when there is not a
+// current write file in which case it will have no effect.
+//
+// This is used when flushing cached metadata updates to disk to ensure all the
+// block data is fully written before updating the metadata. This ensures the
+// metadata and block data can be properly reconciled in failure scenarios.
+func (s *blockStore) syncBlocks() error {
+	wc := s.writeCursor
+	wc.RLock()
+	defer wc.RUnlock()
+
+	// Nothing to do if there is no current file associated with the write
+	// cursor.
+	wc.curFile.RLock()
+	defer wc.curFile.RUnlock()
+	if wc.curFile.file == nil {
+		return nil
+	}
+
+	// Sync the file to disk.
+	if err := wc.curFile.file.Sync(); err != nil {
+		str := fmt.Sprintf("failed to sync file %d: %v", wc.curFileNum,
+			err)
+		return makeDbErr(database.ErrDriverSpecific, str, err)
+	}
+
+	return nil
+}
+
 // handleRollback rolls the block files on disk back to the provided file number
 // and offset. This involves potentially deleting and truncating the files that
 // were partially written.
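The ordering syncBlocks enables is the heart of the crash-safety story: flat-file block data must be durable before the metadata that references it. A minimal sketch of that invariant (the helper below and its function arguments are hypothetical, not part of the commit):

// flushOrder illustrates the recovery invariant described above: sync the
// flat block files first, then persist metadata. A crash between the two
// steps leaves metadata that is merely behind the block data, which
// reconciliation can repair; the reverse order could leave metadata
// referencing blocks that were never written.
func flushOrder(syncBlocks, writeMetadata func() error) error {
	if err := syncBlocks(); err != nil {
		return err
	}
	return writeMetadata()
}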
@@ -225,12 +225,12 @@ func (c *cursor) Delete() error {
 
 // skipPendingUpdates skips any keys at the current database iterator position
 // that are being updated by the transaction. The forwards flag indicates the
-// direction the cursor is moving moved.
+// direction the cursor is moving.
 func (c *cursor) skipPendingUpdates(forwards bool) {
 	for c.dbIter.Valid() {
 		var skip bool
 		key := c.dbIter.Key()
-		if _, ok := c.bucket.tx.pendingRemove[string(key)]; ok {
+		if c.bucket.tx.pendingRemove.Has(key) {
 			skip = true
 		} else if c.bucket.tx.pendingKeys.Has(key) {
 			skip = true
@@ -249,7 +249,7 @@ func (c *cursor) skipPendingUpdates(forwards bool) {
 
 // chooseIterator first skips any entries in the database iterator that are
 // being updated by the transaction and sets the current iterator to the
-// appropriate iterator depending on their validatidy and the order they compare
+// appropriate iterator depending on their validity and the order they compare
 // in while taking into account the direction flag. When the cursor is being
 // moved forwards and both iterators are valid, the iterator with the smaller
 // key is chosen and vice versa when the cursor is being moved backwards.
@@ -258,7 +258,7 @@ func (c *cursor) chooseIterator(forwards bool) bool {
 	// being updated by the transaction.
 	c.skipPendingUpdates(forwards)
 
-	// When bother iterators are exhausted, the cursor is exhausted too.
+	// When both iterators are exhausted, the cursor is exhausted too.
 	if !c.dbIter.Valid() && !c.pendingIter.Valid() {
 		c.currentIter = nil
 		return false
@@ -494,7 +494,7 @@ func newCursor(b *bucket, bucketID []byte, cursorTyp cursorType) *cursor {
 	switch cursorTyp {
 	case ctKeys:
 		keyRange := util.BytesPrefix(bucketID)
-		dbIter = b.tx.snapshot.NewIterator(keyRange, nil)
+		dbIter = b.tx.snapshot.NewIterator(keyRange)
 		pendingKeyIter := newLdbTreapIter(b.tx, keyRange)
 		pendingIter = pendingKeyIter
 
@@ -510,7 +510,7 @@ func newCursor(b *bucket, bucketID []byte, cursorTyp cursorType) *cursor {
 		copy(prefix[len(bucketIndexPrefix):], bucketID)
 		bucketRange := util.BytesPrefix(prefix)
 
-		dbIter = b.tx.snapshot.NewIterator(bucketRange, nil)
+		dbIter = b.tx.snapshot.NewIterator(bucketRange)
 		pendingBucketIter := newLdbTreapIter(b.tx, bucketRange)
 		pendingIter = pendingBucketIter
 
@@ -528,8 +528,8 @@ func newCursor(b *bucket, bucketID []byte, cursorTyp cursorType) *cursor {
 		// Since both keys and buckets are needed from the database,
 		// create an individual iterator for each prefix and then create
 		// a merged iterator from them.
-		dbKeyIter := b.tx.snapshot.NewIterator(keyRange, nil)
-		dbBucketIter := b.tx.snapshot.NewIterator(bucketRange, nil)
+		dbKeyIter := b.tx.snapshot.NewIterator(keyRange)
+		dbBucketIter := b.tx.snapshot.NewIterator(bucketRange)
 		iters := []iterator.Iterator{dbKeyIter, dbBucketIter}
 		dbIter = iterator.NewMergedIterator(iters,
 			comparer.DefaultComparer, true)
@@ -952,13 +952,13 @@ type pendingBlock struct {
 // read-write and implements the database.Bucket interface. The transaction
 // provides a root bucket against which all reads and writes occur.
 type transaction struct {
-	managed        bool              // Is the transaction managed?
-	closed         bool              // Is the transaction closed?
-	writable       bool              // Is the transaction writable?
-	db             *db               // DB instance the tx was created from.
-	snapshot       *leveldb.Snapshot // Underlying snapshot for txns.
-	metaBucket     *bucket           // The root metadata bucket.
-	blockIdxBucket *bucket           // The block index bucket.
+	managed        bool             // Is the transaction managed?
+	closed         bool             // Is the transaction closed?
+	writable       bool             // Is the transaction writable?
+	db             *db              // DB instance the tx was created from.
+	snapshot       *dbCacheSnapshot // Underlying snapshot for txns.
+	metaBucket     *bucket          // The root metadata bucket.
+	blockIdxBucket *bucket          // The block index bucket.
 
 	// Blocks that need to be stored on commit. The pendingBlocks map is
 	// kept to allow quick lookups of pending data by block hash.
@@ -967,7 +967,7 @@ type transaction struct {
 
 	// Keys that need to be stored or deleted on commit.
 	pendingKeys   *treap.Mutable
-	pendingRemove map[string]struct{}
+	pendingRemove *treap.Mutable
 
 	// Active iterators that need to be notified when the pending keys have
 	// been updated so the cursors can properly handle updates to the
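The switch from a map[string]struct{} to a *treap.Mutable for pendingRemove keeps deletions in a key-ordered structure, which is what lets cursors merge pending removals with the leveldb iterator. A small runnable sketch of the equivalent operations, assuming the internal treap package were importable (it lives under internal/, so outside this repository the import is illustrative only):

package main

import (
	"fmt"

	"github.com/btcsuite/btcd/database2/internal/treap"
)

func main() {
	pendingRemove := treap.NewMutable()
	key := []byte("some-key")

	pendingRemove.Put(key, nil)         // was: pendingRemove[string(key)] = struct{}{}
	fmt.Println(pendingRemove.Has(key)) // was: _, ok := pendingRemove[string(key)]; true
	pendingRemove.Delete(key)           // was: delete(pendingRemove, string(key))
	fmt.Println(pendingRemove.Has(key)) // false
}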
@@ -1030,7 +1030,7 @@ func (tx *transaction) hasKey(key []byte) bool {
 	// When the transaction is writable, check the pending transaction
 	// state first.
 	if tx.writable {
-		if _, ok := tx.pendingRemove[string(key)]; ok {
+		if tx.pendingRemove.Has(key) {
 			return false
 		}
 		if tx.pendingKeys.Has(key) {
@@ -1038,9 +1038,8 @@ func (tx *transaction) hasKey(key []byte) bool {
 		}
 	}
 
-	// Consult the database.
-	hasKey, _ := tx.snapshot.Has(key, nil)
-	return hasKey
+	// Consult the database cache and underlying database.
+	return tx.snapshot.Has(key)
 }
 
 // putKey adds the provided key to the list of keys to be updated in the
@@ -1051,7 +1050,7 @@ func (tx *transaction) hasKey(key []byte) bool {
 func (tx *transaction) putKey(key, value []byte) error {
 	// Prevent the key from being deleted if it was previously scheduled
 	// to be deleted on transaction commit.
-	delete(tx.pendingRemove, string(key))
+	tx.pendingRemove.Delete(key)
 
 	// Add the key/value pair to the list to be written on transaction
 	// commit.
@@ -1060,14 +1059,14 @@ func (tx *transaction) putKey(key, value []byte) error {
 	return nil
 }
 
-// fetchKey attempts to fetch the provided key from the database while taking
-// into account the current transaction state. Returns nil if the key does not
-// exist.
+// fetchKey attempts to fetch the provided key from the database cache (and
+// hence underlying database) while taking into account the current transaction
+// state. Returns nil if the key does not exist.
 func (tx *transaction) fetchKey(key []byte) []byte {
 	// When the transaction is writable, check the pending transaction
 	// state first.
 	if tx.writable {
-		if _, ok := tx.pendingRemove[string(key)]; ok {
+		if tx.pendingRemove.Has(key) {
 			return nil
 		}
 		if value := tx.pendingKeys.Get(key); value != nil {
@@ -1075,11 +1074,8 @@ func (tx *transaction) fetchKey(key []byte) []byte {
 		}
 	}
 
-	value, err := tx.snapshot.Get(key, nil)
-	if err != nil {
-		return nil
-	}
-	return value
+	// Consult the database cache and underlying database.
+	return tx.snapshot.Get(key)
 }
 
 // deleteKey adds the provided key to the list of keys to be deleted from the
@@ -1094,10 +1090,7 @@ func (tx *transaction) deleteKey(key []byte, notifyIterators bool) {
 	tx.pendingKeys.Delete(key)
 
 	// Add the key to the list to be deleted on transaction commit.
-	if tx.pendingRemove == nil {
-		tx.pendingRemove = make(map[string]struct{})
-	}
-	tx.pendingRemove[string(key)] = struct{}{}
+	tx.pendingRemove.Put(key, nil)
 
 	// Notify the active iterators about the change if the flag is set.
 	if notifyIterators {
@@ -1644,7 +1637,7 @@ func (tx *transaction) close() {
 	tx.pendingBlockData = nil
 
 	// Clear pending keys that would have been written or deleted on commit.
-	tx.pendingKeys.Reset()
+	tx.pendingKeys = nil
 	tx.pendingRemove = nil
 
 	// Release the snapshot.
@@ -1677,7 +1670,7 @@ func serializeBlockRow(blockLoc blockLocation, blockHdr []byte) []byte {
 
 // writePendingAndCommit writes pending block data to the flat block files,
 // updates the metadata with their locations as well as the new current write
-// location, and commits the metadata to the underlying database. It also
+// location, and commits the metadata to the memory database cache. It also
 // properly handles rollback in the case of failures.
 //
 // This function MUST only be called when there is pending data to be written.
@@ -1728,25 +1721,17 @@ func (tx *transaction) writePendingAndCommit() error {
 		return convertErr("failed to store write cursor", err)
 	}
 
-	// Perform all leveldb update operations using a batch for atomicity.
-	batch := new(leveldb.Batch)
-	tx.pendingKeys.ForEach(func(k, v []byte) bool {
-		batch.Put(k, v)
-		return true
-	})
-	for k := range tx.pendingRemove {
-		batch.Delete([]byte(k))
-	}
-	if err := tx.db.ldb.Write(batch, nil); err != nil {
-		rollback()
-		return convertErr("failed to commit transaction", err)
-	}
-
-	return nil
+	// Atomically update the database cache. The cache automatically
+	// handles flushing to the underlying persistent storage database.
+	return tx.db.cache.commitTx(tx)
 }
 
-// Commit commits all changes that have been made through the root bucket and
-// all of its sub-buckets to persistent storage.
+// Commit commits all changes that have been made to the root metadata bucket
+// and all of its sub-buckets to the database cache which is periodically synced
+// to persistent storage. In addition, it commits all new blocks directly to
+// persistent storage bypassing the db cache. Blocks can be rather large, so
+// this helps increase the amount of cache available for the metadata updates and
+// is safe since blocks are immutable.
 //
 // This function is part of the database.Tx interface implementation.
 func (tx *transaction) Commit() error {
@@ -1802,8 +1787,8 @@ type db struct {
 	writeLock sync.Mutex   // Limit to one write transaction at a time.
 	closeLock sync.RWMutex // Make database close block while txns active.
 	closed    bool         // Is the database closed?
-	ldb       *leveldb.DB  // The underlying leveldb DB for metadata.
 	store     *blockStore  // Handles read/writing blocks to flat files.
+	cache     *dbCache     // Cache layer which wraps underlying leveldb DB.
 }
 
 // Enforce db implements the database.DB interface.
@@ -1846,24 +1831,26 @@ func (db *db) begin(writable bool) (*transaction, error) {
 			nil)
 	}
 
-	snapshot, err := db.ldb.GetSnapshot()
+	// Grab a snapshot of the database cache (which in turn also handles the
+	// underlying database).
+	snapshot, err := db.cache.Snapshot()
 	if err != nil {
 		db.closeLock.RUnlock()
 		if writable {
 			db.writeLock.Unlock()
 		}
 
-		str := "failed to open transaction"
-		return nil, convertErr(str, err)
+		return nil, err
 	}
 
 	// The metadata and block index buckets are internal-only buckets, so
 	// they have defined IDs.
 	tx := &transaction{
-		writable:    writable,
-		db:          db,
-		snapshot:    snapshot,
-		pendingKeys: treap.NewMutable(),
+		writable:      writable,
+		db:            db,
+		snapshot:      snapshot,
+		pendingKeys:   treap.NewMutable(),
+		pendingRemove: treap.NewMutable(),
 	}
 	tx.metaBucket = &bucket{tx: tx, id: metadataBucketID}
 	tx.blockIdxBucket = &bucket{tx: tx, id: blockIdxBucketID}
@@ -1968,10 +1955,9 @@ func (db *db) Update(fn func(database.Tx) error) error {
 	return tx.Commit()
 }
 
-// Close cleanly shuts down the database and syncs all data. Any data in
-// database transactions which have not been committed will be lost, so it is
-// important to ensure all transactions are finalized prior to calling this
-// function if that data is intended to be stored.
+// Close cleanly shuts down the database and syncs all data. It will block
+// until all database transactions have been finalized (rolled back or
+// committed).
 //
 // This function is part of the database.DB interface implementation.
 func (db *db) Close() error {
@@ -1985,6 +1971,15 @@ func (db *db) Close() error {
 	}
 	db.closed = true
 
+	// Close the database cache which will flush any existing entries to
+	// disk and close the underlying leveldb database. Any error is saved
+	// and returned at the end after the remaining cleanup since the
+	// database will be marked closed even if this fails given there is no
+	// good way for the caller to recover from a failure here anyways.
+	db.writeLock.Lock()
+	closeErr := db.cache.Close()
+	db.writeLock.Unlock()
+
 	// NOTE: Since the above lock waits for all transactions to finish and
 	// prevents any new ones from being started, it is safe to clear all
 	// state without the individual locks.
@@ -2002,12 +1997,7 @@ func (db *db) Close() error {
 	db.store.openBlocksLRU.Init()
 	db.store.fileNumToLRUElem = nil
 
-	if err := db.ldb.Close(); err != nil {
-		str := "failed to close underlying leveldb database"
-		return convertErr(str, err)
-	}
-
-	return nil
+	return closeErr
 }
 
 // filesExists reports whether the named file or directory exists.
@@ -2082,9 +2072,12 @@ func openDB(dbPath string, network wire.BitcoinNet, create bool) (database.DB, error) {
 
 	// Create the block store which includes scanning the existing flat
 	// block files to find what the current write cursor position is
-	// according to the data that is actually on disk.
+	// according to the data that is actually on disk. Also create the
+	// database cache which wraps the underlying leveldb database to provide
+	// write caching.
 	store := newBlockStore(dbPath, network)
-	pdb := &db{ldb: ldb, store: store}
+	cache := newDbCache(ldb, store, defaultCacheSize, defaultFlushSecs)
+	pdb := &db{store: store, cache: cache}
 
 	// Perform any reconciliation needed between the block and metadata as
 	// well as database initialization, if needed.
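The new dbcache.go below carries the bulk of the change. One detail worth verifying by hand: the sizing constants it declares assume 64-bit slice headers (pointer + len + cap = 24 bytes), so each logged entry is accounted as

	logEntryFieldsSize = 8 + 24 + 24 // = 56 bytes, excluding key/value contents

where the 8 bytes cover the one-byte entry type after struct padding.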
database2/ffldb/dbcache.go (new file, 705 lines)

@@ -0,0 +1,705 @@
// Copyright (c) 2015-2016 The btcsuite developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.

package ffldb

import (
	"bytes"
	"sync"
	"time"

	"github.com/btcsuite/btcd/database2/internal/treap"
	"github.com/btcsuite/goleveldb/leveldb"
	"github.com/btcsuite/goleveldb/leveldb/iterator"
	"github.com/btcsuite/goleveldb/leveldb/util"
)

const (
	// defaultCacheSize is the default size for the database cache.
	defaultCacheSize = 50 * 1024 * 1024 // 50 MB

	// defaultFlushSecs is the default number of seconds to use as a
	// threshold in between database cache flushes when the cache size has
	// not been exceeded.
	defaultFlushSecs = 60 // 1 minute

	// sliceOverheadSize is the size a slice takes for overhead. It assumes
	// 64-bit pointers so technically it is smaller on 32-bit platforms, but
	// overestimating the size in that case is acceptable since it avoids
	// the need to import unsafe.
	sliceOverheadSize = 24

	// logEntryFieldsSize is the size the fields of each log entry takes
	// excluding the contents of the key and value. It assumes 64-bit
	// pointers so technically it is smaller on 32-bit platforms, but
	// overestimating the size in that case is acceptable since it avoids
	// the need to import unsafe. It consists of 8 bytes for the log entry
	// type (due to padding) + 24 bytes for the key + 24 bytes for the
	// value (8 + 24 + 24).
	logEntryFieldsSize = 8 + sliceOverheadSize*2

	// batchThreshold is the number of items used to trigger a write of a
	// leveldb batch. It is used to keep the batch from growing too large
	// and consequently consuming large amounts of memory.
	batchThreshold = 8000
)
// ldbCacheIter wraps a treap iterator to provide the additional functionality
// needed to satisfy the leveldb iterator.Iterator interface.
type ldbCacheIter struct {
	*treap.Iterator
}

// Enforce ldbCacheIter implements the leveldb iterator.Iterator interface.
var _ iterator.Iterator = (*ldbCacheIter)(nil)

// Error is only provided to satisfy the iterator interface as there are no
// errors for this memory-only structure.
//
// This is part of the leveldb iterator.Iterator interface implementation.
func (iter *ldbCacheIter) Error() error {
	return nil
}

// SetReleaser is only provided to satisfy the iterator interface as there is no
// need to override it.
//
// This is part of the leveldb iterator.Iterator interface implementation.
func (iter *ldbCacheIter) SetReleaser(releaser util.Releaser) {
}

// Release is only provided to satisfy the iterator interface.
//
// This is part of the leveldb iterator.Iterator interface implementation.
func (iter *ldbCacheIter) Release() {
}

// newLdbCacheIter creates a new treap iterator for the given slice against the
// pending keys for the passed cache snapshot and returns it wrapped in an
// ldbCacheIter so it can be used as a leveldb iterator.
func newLdbCacheIter(snap *dbCacheSnapshot, slice *util.Range) *ldbCacheIter {
	iter := snap.pendingKeys.Iterator(slice.Start, slice.Limit)
	return &ldbCacheIter{Iterator: iter}
}

// dbCacheIterator defines an iterator over the key/value pairs in the database
// cache and underlying database.
type dbCacheIterator struct {
	cacheSnapshot *dbCacheSnapshot
	dbIter        iterator.Iterator
	cacheIter     iterator.Iterator
	currentIter   iterator.Iterator
	released      bool
}

// Enforce dbCacheIterator implements the leveldb iterator.Iterator interface.
var _ iterator.Iterator = (*dbCacheIterator)(nil)

// skipPendingUpdates skips any keys at the current database iterator position
// that are being updated by the cache. The forwards flag indicates the
// direction the iterator is moving.
func (iter *dbCacheIterator) skipPendingUpdates(forwards bool) {
	for iter.dbIter.Valid() {
		var skip bool
		key := iter.dbIter.Key()
		if iter.cacheSnapshot.pendingRemove.Has(key) {
			skip = true
		} else if iter.cacheSnapshot.pendingKeys.Has(key) {
			skip = true
		}
		if !skip {
			break
		}

		if forwards {
			iter.dbIter.Next()
		} else {
			iter.dbIter.Prev()
		}
	}
}
// chooseIterator first skips any entries in the database iterator that are
// being updated by the cache and sets the current iterator to the appropriate
// iterator depending on their validity and the order they compare in while
// taking into account the direction flag. When the iterator is being moved
// forwards and both iterators are valid, the iterator with the smaller key is
// chosen and vice versa when the iterator is being moved backwards.
func (iter *dbCacheIterator) chooseIterator(forwards bool) bool {
	// Skip any keys at the current database iterator position that are
	// being updated by the cache.
	iter.skipPendingUpdates(forwards)

	// When both iterators are exhausted, the iterator is exhausted too.
	if !iter.dbIter.Valid() && !iter.cacheIter.Valid() {
		iter.currentIter = nil
		return false
	}

	// Choose the database iterator when the cache iterator is exhausted.
	if !iter.cacheIter.Valid() {
		iter.currentIter = iter.dbIter
		return true
	}

	// Choose the cache iterator when the database iterator is exhausted.
	if !iter.dbIter.Valid() {
		iter.currentIter = iter.cacheIter
		return true
	}

	// Both iterators are valid, so choose the iterator with either the
	// smaller or larger key depending on the forwards flag.
	compare := bytes.Compare(iter.dbIter.Key(), iter.cacheIter.Key())
	if (forwards && compare > 0) || (!forwards && compare < 0) {
		iter.currentIter = iter.cacheIter
	} else {
		iter.currentIter = iter.dbIter
	}
	return true
}

// First positions the iterator at the first key/value pair and returns whether
// or not the pair exists.
//
// This is part of the leveldb iterator.Iterator interface implementation.
func (iter *dbCacheIterator) First() bool {
	// Seek to the first key in both the database and cache iterators and
	// choose the iterator that is both valid and has the smaller key.
	iter.dbIter.First()
	iter.cacheIter.First()
	return iter.chooseIterator(true)
}

// Last positions the iterator at the last key/value pair and returns whether or
// not the pair exists.
//
// This is part of the leveldb iterator.Iterator interface implementation.
func (iter *dbCacheIterator) Last() bool {
	// Seek to the last key in both the database and cache iterators and
	// choose the iterator that is both valid and has the larger key.
	iter.dbIter.Last()
	iter.cacheIter.Last()
	return iter.chooseIterator(false)
}

// Next moves the iterator one key/value pair forward and returns whether or not
// the pair exists.
//
// This is part of the leveldb iterator.Iterator interface implementation.
func (iter *dbCacheIterator) Next() bool {
	// Nothing to return if cursor is exhausted.
	if iter.currentIter == nil {
		return false
	}

	// Move the current iterator to the next entry and choose the iterator
	// that is both valid and has the smaller key.
	iter.currentIter.Next()
	return iter.chooseIterator(true)
}

// Prev moves the iterator one key/value pair backward and returns whether or
// not the pair exists.
//
// This is part of the leveldb iterator.Iterator interface implementation.
func (iter *dbCacheIterator) Prev() bool {
	// Nothing to return if cursor is exhausted.
	if iter.currentIter == nil {
		return false
	}

	// Move the current iterator to the previous entry and choose the
	// iterator that is both valid and has the larger key.
	iter.currentIter.Prev()
	return iter.chooseIterator(false)
}

// Seek positions the iterator at the first key/value pair that is greater than
// or equal to the passed seek key. Returns false if no suitable key was found.
//
// This is part of the leveldb iterator.Iterator interface implementation.
func (iter *dbCacheIterator) Seek(key []byte) bool {
	// Seek to the provided key in both the database and cache iterators
	// then choose the iterator that is both valid and has the larger key.
	iter.dbIter.Seek(key)
	iter.cacheIter.Seek(key)
	return iter.chooseIterator(true)
}
// Valid indicates whether the iterator is positioned at a valid key/value pair.
// It will be considered invalid when the iterator is newly created or exhausted.
//
// This is part of the leveldb iterator.Iterator interface implementation.
func (iter *dbCacheIterator) Valid() bool {
	return iter.currentIter != nil
}

// Key returns the current key the iterator is pointing to.
//
// This is part of the leveldb iterator.Iterator interface implementation.
func (iter *dbCacheIterator) Key() []byte {
	// Nothing to return if iterator is exhausted.
	if iter.currentIter == nil {
		return nil
	}

	return iter.currentIter.Key()
}

// Value returns the current value the iterator is pointing to.
//
// This is part of the leveldb iterator.Iterator interface implementation.
func (iter *dbCacheIterator) Value() []byte {
	// Nothing to return if iterator is exhausted.
	if iter.currentIter == nil {
		return nil
	}

	return iter.currentIter.Value()
}

// SetReleaser is only provided to satisfy the iterator interface as there is no
// need to override it.
//
// This is part of the leveldb iterator.Iterator interface implementation.
func (iter *dbCacheIterator) SetReleaser(releaser util.Releaser) {
}

// Release releases the iterator by removing the underlying treap iterator from
// the list of active iterators against the pending keys treap.
//
// This is part of the leveldb iterator.Iterator interface implementation.
func (iter *dbCacheIterator) Release() {
	if !iter.released {
		iter.dbIter.Release()
		iter.cacheIter.Release()
		iter.currentIter = nil
		iter.released = true
	}
}

// Error is only provided to satisfy the iterator interface as there are no
// errors for this memory-only structure.
//
// This is part of the leveldb iterator.Iterator interface implementation.
func (iter *dbCacheIterator) Error() error {
	return nil
}

// dbCacheSnapshot defines a snapshot of the database cache and underlying
// database at a particular point in time.
type dbCacheSnapshot struct {
	dbSnapshot    *leveldb.Snapshot
	pendingKeys   *treap.Immutable
	pendingRemove *treap.Immutable
}
// Has returns whether or not the passed key exists.
func (snap *dbCacheSnapshot) Has(key []byte) bool {
	// Check the cached entries first.
	if snap.pendingRemove.Has(key) {
		return false
	}
	if snap.pendingKeys.Has(key) {
		return true
	}

	// Consult the database.
	hasKey, _ := snap.dbSnapshot.Has(key, nil)
	return hasKey
}

// Get returns the value for the passed key. The function will return nil when
// the key does not exist.
func (snap *dbCacheSnapshot) Get(key []byte) []byte {
	// Check the cached entries first.
	if snap.pendingRemove.Has(key) {
		return nil
	}
	if value := snap.pendingKeys.Get(key); value != nil {
		return value
	}

	// Consult the database.
	value, err := snap.dbSnapshot.Get(key, nil)
	if err != nil {
		return nil
	}
	return value
}

// Release releases the snapshot.
func (snap *dbCacheSnapshot) Release() {
	snap.dbSnapshot.Release()
	snap.pendingKeys = nil
	snap.pendingRemove = nil
}

// NewIterator returns a new iterator for the snapshot. The newly returned
// iterator is not pointing to a valid item until a call to one of the methods
// to position it is made.
//
// The slice parameter allows the iterator to be limited to a range of keys.
// The start key is inclusive and the limit key is exclusive. Either or both
// can be nil if the functionality is not desired.
func (snap *dbCacheSnapshot) NewIterator(slice *util.Range) *dbCacheIterator {
	return &dbCacheIterator{
		dbIter:        snap.dbSnapshot.NewIterator(slice, nil),
		cacheIter:     newLdbCacheIter(snap, slice),
		cacheSnapshot: snap,
	}
}
// txLogEntryType defines the type of a log entry.
type txLogEntryType uint8

// The following constants define the allowed log entry types.
const (
	// entryTypeUpdate specifies a key is to be added or updated to a given
	// value.
	entryTypeUpdate txLogEntryType = iota

	// entryTypeRemove specifies a key is to be removed.
	entryTypeRemove
)

// txLogEntry defines an entry in the transaction log. It is used when
// replaying transactions during a cache flush.
type txLogEntry struct {
	entryType txLogEntryType
	key       []byte
	value     []byte // Only set for entryTypeUpdate.
}

// dbCache provides a database cache layer backed by an underlying database. It
// allows a maximum cache size and flush interval to be specified such that the
// cache is flushed to the database when the cache size exceeds the maximum
// configured value or it has been longer than the configured interval since the
// last flush. This effectively provides transaction batching so that callers
// can commit transactions at will without incurring large performance hits due
// to frequent disk syncs.
type dbCache struct {
	// ldb is the underlying leveldb DB for metadata.
	ldb *leveldb.DB

	// store is used to sync blocks to flat files.
	store *blockStore

	// The following fields are related to flushing the cache to persistent
	// storage. Note that all flushing is performed in an opportunistic
	// fashion. This means that it is only flushed during a transaction or
	// when the database cache is closed.
	//
	// maxSize is the maximum size threshold the cache can grow to before
	// it is flushed.
	//
	// flushInterval is the threshold interval of time that is allowed to
	// pass before the cache is flushed.
	//
	// lastFlush is the time the cache was last flushed. It is used in
	// conjunction with the current time and the flush interval.
	//
	// txLog maintains a log of all modifications made by each committed
	// transaction since the last flush. When a flush happens, the data
	// in the current flat file being written to is synced and then all
	// transaction metadata is replayed into the database. The sync is
	// necessary to ensure the metadata is only updated after the associated
	// block data has been written to persistent storage so crash recovery
	// can be handled properly.
	//
	// This log approach is used because leveldb consumes a massive amount
	// of memory for batches that have large numbers of entries, so the
	// final state of the cache can't simply be batched without causing
	// memory usage to balloon unreasonably. It also isn't possible to
	// batch smaller pieces of the final state since that would result in
	// inconsistent metadata should an unexpected failure such as power
	// loss occur in the middle of writing the pieces.
	//
	// NOTE: These flush related fields are protected by the database write
	// lock.
	maxSize       uint64
	flushInterval time.Duration
	lastFlush     time.Time
	txLog         [][]txLogEntry

	// The following fields hold the keys that need to be stored or deleted
	// from the underlying database once the cache is full, enough time has
	// passed, or when the database is shutting down. Note that these are
	// stored using immutable treaps to support O(1) MVCC snapshots against
	// the cached data. The cacheLock is used to protect concurrent access
	// for cache updates and snapshots.
	cacheLock    sync.RWMutex
	cachedKeys   *treap.Immutable
	cachedRemove *treap.Immutable
}
// Snapshot returns a snapshot of the database cache and underlying database at
// a particular point in time.
//
// The snapshot must be released after use by calling Release.
func (c *dbCache) Snapshot() (*dbCacheSnapshot, error) {
	dbSnapshot, err := c.ldb.GetSnapshot()
	if err != nil {
		str := "failed to open transaction"
		return nil, convertErr(str, err)
	}

	// Since the cached keys to be added and removed use an immutable treap,
	// a snapshot is simply obtaining the root of the tree under the lock
	// which is used to atomically swap the root.
	c.cacheLock.RLock()
	cacheSnapshot := &dbCacheSnapshot{
		dbSnapshot:    dbSnapshot,
		pendingKeys:   c.cachedKeys,
		pendingRemove: c.cachedRemove,
	}
	c.cacheLock.RUnlock()
	return cacheSnapshot, nil
}

// flush flushes the database cache to persistent storage. This involves
// syncing the block store and replaying all transactions that have been
// applied to the cache to the underlying database.
//
// This function MUST be called with the database write lock held.
func (c *dbCache) flush() error {
	c.lastFlush = time.Now()

	// Sync the current write file associated with the block store. This is
	// necessary before writing the metadata to prevent the case where the
	// metadata contains information about a block which actually hasn't
	// been written yet in unexpected shutdown scenarios.
	if err := c.store.syncBlocks(); err != nil {
		return err
	}

	// Nothing to do if there are no transactions to flush.
	if len(c.txLog) == 0 {
		return nil
	}

	// Perform all leveldb updates using batches for atomicity.
	batchLen := 0
	batchTxns := 0
	batch := new(leveldb.Batch)
	for logTxNum, txLogEntries := range c.txLog {
		// Replay the transaction from the log into the current batch.
		for _, logEntry := range txLogEntries {
			switch logEntry.entryType {
			case entryTypeUpdate:
				batch.Put(logEntry.key, logEntry.value)
			case entryTypeRemove:
				batch.Delete(logEntry.key)
			}
		}
		batchTxns++

		// Write and reset the current batch when the number of items in
		// it exceeds the batch threshold or this is the last
		// transaction in the log.
		batchLen += len(txLogEntries)
		if batchLen > batchThreshold || logTxNum == len(c.txLog)-1 {
			if err := c.ldb.Write(batch, nil); err != nil {
				return convertErr("failed to write batch", err)
			}
			batch.Reset()
			batchLen = 0

			// Clear the transactions that were written from the
			// log so the memory can be reclaimed.
			for i := logTxNum - (batchTxns - 1); i <= logTxNum; i++ {
				c.txLog[i] = nil
			}
			batchTxns = 0
		}
	}
	c.txLog = c.txLog[:0]

	// Clear the cache since it has been flushed.
	c.cacheLock.Lock()
	c.cachedKeys = treap.NewImmutable()
	c.cachedRemove = treap.NewImmutable()
	c.cacheLock.Unlock()

	return nil
}
// needsFlush returns whether or not the database cache needs to be flushed to
// persistent storage based on its current size, whether or not adding all of
// the entries in the passed database transaction would cause it to exceed the
// configured limit, and how much time has elapsed since the last time the cache
// was flushed.
//
// This function MUST be called with the database write lock held.
func (c *dbCache) needsFlush(tx *transaction) bool {
	// A flush is needed when more time has elapsed than the configured
	// flush interval.
	if time.Now().Sub(c.lastFlush) > c.flushInterval {
		return true
	}

	// Calculate the size of the transaction log.
	var txLogSize uint64
	for _, txLogEntries := range c.txLog {
		txLogSize += uint64(cap(txLogEntries)) * logEntryFieldsSize
	}
	txLogSize += uint64(cap(c.txLog)) * sliceOverheadSize

	// A flush is needed when the size of the database cache exceeds the
	// specified max cache size. The total calculated size is multiplied by
	// 1.5 here to account for additional memory consumption that will be
	// needed during the flush as well as old nodes in the cache that are
	// referenced by the snapshot used by the transaction.
	snap := tx.snapshot
	totalSize := txLogSize + snap.pendingKeys.Size() + snap.pendingRemove.Size()
	totalSize = uint64(float64(totalSize) * 1.5)
	if totalSize > c.maxSize {
		return true
	}

	return false
}
// commitTx atomically adds all of the pending keys to add and remove into the
// database cache. When adding the pending keys would cause the size of the
// cache to exceed the max cache size, or the time since the last flush exceeds
// the configured flush interval, the cache will be flushed to the underlying
// persistent database.
//
// This is an atomic operation with respect to the cache in that either all of
// the pending keys to add and remove in the transaction will be applied or none
// of them will.
//
// The database cache itself might be flushed to the underlying persistent
// database even if the transaction fails to apply, but it will only be the
// state of the cache without the transaction applied.
//
// This function MUST be called during a database write transaction which in
// turn implies the database write lock will be held.
func (c *dbCache) commitTx(tx *transaction) error {
	// Flush the cache and write directly to the database if a flush is
	// needed.
	if c.needsFlush(tx) {
		if err := c.flush(); err != nil {
			return err
		}

		// Perform all leveldb update operations using a batch for
		// atomicity.
		batch := new(leveldb.Batch)
		tx.pendingKeys.ForEach(func(k, v []byte) bool {
			batch.Put(k, v)
			return true
		})
		tx.pendingKeys = nil
		tx.pendingRemove.ForEach(func(k, v []byte) bool {
			batch.Delete(k)
			return true
		})
		tx.pendingRemove = nil
		if err := c.ldb.Write(batch, nil); err != nil {
			return convertErr("failed to commit transaction", err)
		}

		return nil
	}

	// At this point a database flush is not needed, so atomically commit
	// the transaction to the cache.

	// Create a slice of transaction log entries large enough to house all
	// of the updates and add it to the list of logged transactions to
	// replay on flush.
	numEntries := tx.pendingKeys.Len() + tx.pendingRemove.Len()
	txLogEntries := make([]txLogEntry, numEntries)
	c.txLog = append(c.txLog, txLogEntries)

	// Since the cached keys to be added and removed use an immutable treap,
	// a snapshot is simply obtaining the root of the tree under the lock
	// which is used to atomically swap the root.
	c.cacheLock.RLock()
	newCachedKeys := c.cachedKeys
	newCachedRemove := c.cachedRemove
	c.cacheLock.RUnlock()

	// Apply every key to add in the database transaction to the cache.
	// Also create a transaction log entry for each one at the same time so
	// the database transaction can be replayed during flush.
	logEntryNum := 0
	tx.pendingKeys.ForEach(func(k, v []byte) bool {
		newCachedRemove = newCachedRemove.Delete(k)
		newCachedKeys = newCachedKeys.Put(k, v)

		logEntry := &txLogEntries[logEntryNum]
		logEntry.entryType = entryTypeUpdate
		logEntry.key = k
		logEntry.value = v
		logEntryNum++
		return true
	})
	tx.pendingKeys = nil

	// Apply every key to remove in the database transaction to the cache.
	// Also create a transaction log entry for each one at the same time so
	// the database transaction can be replayed during flush.
	tx.pendingRemove.ForEach(func(k, v []byte) bool {
		newCachedKeys = newCachedKeys.Delete(k)
		newCachedRemove = newCachedRemove.Put(k, nil)

		logEntry := &txLogEntries[logEntryNum]
		logEntry.entryType = entryTypeRemove
		logEntry.key = k
		logEntryNum++
		return true
	})
	tx.pendingRemove = nil

	// Atomically replace the immutable treaps which hold the cached keys to
	// add and delete.
	c.cacheLock.Lock()
	c.cachedKeys = newCachedKeys
	c.cachedRemove = newCachedRemove
	c.cacheLock.Unlock()
	return nil
}
// Close cleanly shuts down the database cache by syncing all data and closing
// the underlying leveldb database.
//
// This function MUST be called with the database write lock held.
func (c *dbCache) Close() error {
	// Flush any outstanding cached entries to disk.
	if err := c.flush(); err != nil {
		// Even if there is an error while flushing, attempt to close
		// the underlying database. The error is ignored since it would
		// mask the flush error.
		_ = c.ldb.Close()
		return err
	}

	// Close the underlying leveldb database.
	if err := c.ldb.Close(); err != nil {
		str := "failed to close underlying leveldb database"
		return convertErr(str, err)
	}

	return nil
}

// newDbCache returns a new database cache instance backed by the provided
// leveldb instance. The cache will be flushed to leveldb when the max size
// exceeds the provided value or it has been longer than the provided interval
// since the last flush.
func newDbCache(ldb *leveldb.DB, store *blockStore, maxSize uint64, flushIntervalSecs uint32) *dbCache {
	return &dbCache{
		ldb:           ldb,
		store:         store,
		maxSize:       maxSize,
		flushInterval: time.Second * time.Duration(flushIntervalSecs),
		lastFlush:     time.Now(),
		cachedKeys:    treap.NewImmutable(),
		cachedRemove:  treap.NewImmutable(),
	}
}
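The O(1) MVCC snapshots mentioned in the dbCache comments fall directly out of the immutable treap: taking a snapshot is just copying the current root pointer under the lock, and later Puts build new versions without touching old ones. A small illustration, again assuming the internal treap package were importable outside this repository:

package main

import (
	"fmt"

	"github.com/btcsuite/btcd/database2/internal/treap"
)

func main() {
	t1 := treap.NewImmutable().Put([]byte("a"), []byte("1"))
	snapshot := t1                         // O(1) "snapshot": share the current root
	t2 := t1.Put([]byte("b"), []byte("2")) // new version; the snapshot is unaffected
	fmt.Println(snapshot.Has([]byte("b"))) // false
	fmt.Println(t2.Has([]byte("b")))       // true
}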
@@ -47,7 +47,7 @@ func (iter *ldbTreapIter) Release() {
 	}
 }
 
-// newLdbTreapIter create a new treap iterator for the given slice against the
+// newLdbTreapIter creates a new treap iterator for the given slice against the
 // pending keys for the passed transaction and returns it wrapped in an
 // ldbTreapIter so it can be used as a leveldb iterator. It also adds the new
 // iterator to the list of active iterators for the transaction.
@@ -53,7 +53,7 @@ func reconcileDB(pdb *db, create bool) (database.DB, error) {
 	// Perform initial internal bucket and value creation during database
 	// creation.
 	if create {
-		if err := initDB(pdb.ldb); err != nil {
+		if err := initDB(pdb.cache.ldb); err != nil {
 			return nil, err
 		}
 	}
@@ -214,14 +214,8 @@ func TestCornerCases(t *testing.T) {
 	}
 	_ = os.RemoveAll(filePath)
 
-	// Start a transaction and close the underlying leveldb database out
-	// from under it.
-	dbTx, err := idb.Begin(true)
-	if err != nil {
-		t.Errorf("Begin: unexpected error: %v", err)
-		return
-	}
-	ldb := idb.(*db).ldb
+	// Close the underlying leveldb database out from under the database.
+	ldb := idb.(*db).cache.ldb
 	ldb.Close()
 
 	// Ensure initialization errors in the underlying database work as
@@ -233,15 +227,6 @@ func TestCornerCases(t *testing.T) {
 		return
 	}
 
-	// Ensure errors in the underlying database during a transaction commit
-	// are handled properly.
-	testName = "Commit: underlying leveldb error"
-	wantErrCode = database.ErrDbNotOpen
-	err = dbTx.Commit()
-	if !checkDbError(t, testName, err, wantErrCode) {
-		return
-	}
-
 	// Ensure the View handles errors in the underlying leveldb database
 	// properly.
 	testName = "View: underlying leveldb error"
@@ -325,16 +310,16 @@ func testWriteFailures(tc *testContext) bool {
 		return false
 	}
 
-	// Ensure file sync errors during writeBlock return the expected error.
+	// Ensure file sync errors during flush return the expected error.
 	store := tc.db.(*db).store
-	testName := "writeBlock: file sync failure"
+	testName := "flush: file sync failure"
 	store.writeCursor.Lock()
 	oldFile := store.writeCursor.curFile
 	store.writeCursor.curFile = &lockableFile{
 		file: &mockFile{forceSyncErr: true, maxSize: -1},
 	}
 	store.writeCursor.Unlock()
-	_, err := store.writeBlock([]byte{0x00})
+	err := tc.db.(*db).cache.flush()
 	if !checkDbError(tc.t, testName, err, database.ErrDriverSpecific) {
 		return false
 	}
@@ -392,8 +392,12 @@ type Tx interface {
 	// ******************************************************************
 
 	// Commit commits all changes that have been made to the metadata or
-	// block storage to persistent storage. Calling this function on a
-	// managed transaction will result in a panic.
+	// block storage. Depending on the backend implementation this could be
+	// to a cache that is periodically synced to persistent storage or
+	// directly to persistent storage. In any case, all transactions which
+	// are started after the commit finishes will include all changes made
+	// by this transaction. Calling this function on a managed transaction
+	// will result in a panic.
 	Commit() error
 
 	// Rollback undoes all changes that have been made to the metadata or
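The revised Commit contract is observable from the public API: once Commit returns, later transactions see the changes even if the disk flush is still pending. A sketch using the documented Update/View helpers (imports as in the earlier sketch; the helper name is hypothetical):

func commitThenRead(db database.DB) error {
	if err := db.Update(func(tx database.Tx) error {
		return tx.Metadata().Put([]byte("k"), []byte("v"))
	}); err != nil {
		return err
	}
	return db.View(func(tx database.Tx) error {
		// Served from the cache; no disk sync is needed to observe it.
		fmt.Printf("%s\n", tx.Metadata().Get([]byte("k")))
		return nil
	})
}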