From 0b32febe5c83be9782671d5246f1a056255fc7ab Mon Sep 17 00:00:00 2001 From: Dave Collins Date: Wed, 3 Feb 2016 11:41:46 -0600 Subject: [PATCH] database: Implement cache layer. This commit adds a database cache layer to the ffldb database backend so that callers can commit multiple transactions without having to incur the overhead of a disk sync on every new block. --- database2/ffldb/blockio.go | 40 +- database2/ffldb/db.go | 139 +++--- database2/ffldb/dbcache.go | 705 +++++++++++++++++++++++++++++++ database2/ffldb/ldbtreapiter.go | 2 +- database2/ffldb/reconcile.go | 2 +- database2/ffldb/whitebox_test.go | 25 +- database2/interface.go | 8 +- 7 files changed, 815 insertions(+), 106 deletions(-) create mode 100644 database2/ffldb/dbcache.go diff --git a/database2/ffldb/blockio.go b/database2/ffldb/blockio.go index d9a30cff..36fa2cef 100644 --- a/database2/ffldb/blockio.go +++ b/database2/ffldb/blockio.go @@ -417,7 +417,7 @@ func (s *blockStore) writeBlock(rawBlock []byte) (blockLocation, error) { wc := s.writeCursor finalOffset := wc.curOffset + fullLen if finalOffset < wc.curOffset || finalOffset > s.maxBlockFileSize { - // This is done under the write cursor lock since the fileNum + // This is done under the write cursor lock since the curFileNum // field is accessed elsewhere by readers. // // Close the current write file to force a read-only reopen @@ -483,14 +483,6 @@ func (s *blockStore) writeBlock(rawBlock []byte) (blockLocation, error) { return blockLocation{}, err } - // Sync the file to disk. - if err := wc.curFile.file.Sync(); err != nil { - str := fmt.Sprintf("failed to sync file %d: %v", wc.curFileNum, - err) - return blockLocation{}, makeDbErr(database.ErrDriverSpecific, - str, err) - } - loc := blockLocation{ blockFileNum: wc.curFileNum, fileOffset: origOffset, @@ -594,6 +586,36 @@ func (s *blockStore) readBlockRegion(loc blockLocation, offset, numBytes uint32) return serializedData, nil } +// syncBlocks performs a file system sync on the flat file associated with the +// store's current write cursor. It is safe to call even when there is not a +// current write file in which case it will have no effect. +// +// This is used when flushing cached metadata updates to disk to ensure all the +// block data is fully written before updating the metadata. This ensures the +// metadata and block data can be properly reconciled in failure scenarios. +func (s *blockStore) syncBlocks() error { + wc := s.writeCursor + wc.RLock() + defer wc.RUnlock() + + // Nothing to do if there is no current file associated with the write + // cursor. + wc.curFile.RLock() + defer wc.curFile.RUnlock() + if wc.curFile.file == nil { + return nil + } + + // Sync the file to disk. + if err := wc.curFile.file.Sync(); err != nil { + str := fmt.Sprintf("failed to sync file %d: %v", wc.curFileNum, + err) + return makeDbErr(database.ErrDriverSpecific, str, err) + } + + return nil +} + // handleRollback rolls the block files on disk back to the provided file number // and offset. This involves potentially deleting and truncating the files that // were partially written. diff --git a/database2/ffldb/db.go b/database2/ffldb/db.go index 93bd03dd..1b3d59eb 100644 --- a/database2/ffldb/db.go +++ b/database2/ffldb/db.go @@ -225,12 +225,12 @@ func (c *cursor) Delete() error { // skipPendingUpdates skips any keys at the current database iterator position // that are being updated by the transaction. The forwards flag indicates the -// direction the cursor is moving moved. +// direction the cursor is moving. 
func (c *cursor) skipPendingUpdates(forwards bool) { for c.dbIter.Valid() { var skip bool key := c.dbIter.Key() - if _, ok := c.bucket.tx.pendingRemove[string(key)]; ok { + if c.bucket.tx.pendingRemove.Has(key) { skip = true } else if c.bucket.tx.pendingKeys.Has(key) { skip = true @@ -249,7 +249,7 @@ func (c *cursor) skipPendingUpdates(forwards bool) { // chooseIterator first skips any entries in the database iterator that are // being updated by the transaction and sets the current iterator to the -// appropriate iterator depending on their validatidy and the order they compare +// appropriate iterator depending on their validity and the order they compare // in while taking into account the direction flag. When the cursor is being // moved forwards and both iterators are valid, the iterator with the smaller // key is chosen and vice versa when the cursor is being moved backwards. @@ -258,7 +258,7 @@ func (c *cursor) chooseIterator(forwards bool) bool { // being updated by the transaction. c.skipPendingUpdates(forwards) - // When bother iterators are exhausted, the cursor is exhausted too. + // When both iterators are exhausted, the cursor is exhausted too. if !c.dbIter.Valid() && !c.pendingIter.Valid() { c.currentIter = nil return false @@ -494,7 +494,7 @@ func newCursor(b *bucket, bucketID []byte, cursorTyp cursorType) *cursor { switch cursorTyp { case ctKeys: keyRange := util.BytesPrefix(bucketID) - dbIter = b.tx.snapshot.NewIterator(keyRange, nil) + dbIter = b.tx.snapshot.NewIterator(keyRange) pendingKeyIter := newLdbTreapIter(b.tx, keyRange) pendingIter = pendingKeyIter @@ -510,7 +510,7 @@ func newCursor(b *bucket, bucketID []byte, cursorTyp cursorType) *cursor { copy(prefix[len(bucketIndexPrefix):], bucketID) bucketRange := util.BytesPrefix(prefix) - dbIter = b.tx.snapshot.NewIterator(bucketRange, nil) + dbIter = b.tx.snapshot.NewIterator(bucketRange) pendingBucketIter := newLdbTreapIter(b.tx, bucketRange) pendingIter = pendingBucketIter @@ -528,8 +528,8 @@ func newCursor(b *bucket, bucketID []byte, cursorTyp cursorType) *cursor { // Since both keys and buckets are needed from the database, // create an individual iterator for each prefix and then create // a merged iterator from them. - dbKeyIter := b.tx.snapshot.NewIterator(keyRange, nil) - dbBucketIter := b.tx.snapshot.NewIterator(bucketRange, nil) + dbKeyIter := b.tx.snapshot.NewIterator(keyRange) + dbBucketIter := b.tx.snapshot.NewIterator(bucketRange) iters := []iterator.Iterator{dbKeyIter, dbBucketIter} dbIter = iterator.NewMergedIterator(iters, comparer.DefaultComparer, true) @@ -952,13 +952,13 @@ type pendingBlock struct { // read-write and implements the database.Bucket interface. The transaction // provides a root bucket against which all read and writes occur. type transaction struct { - managed bool // Is the transaction managed? - closed bool // Is the transaction closed? - writable bool // Is the transaction writable? - db *db // DB instance the tx was created from. - snapshot *leveldb.Snapshot // Underlying snapshot for txns. - metaBucket *bucket // The root metadata bucket. - blockIdxBucket *bucket // The block index bucket. + managed bool // Is the transaction managed? + closed bool // Is the transaction closed? + writable bool // Is the transaction writable? + db *db // DB instance the tx was created from. + snapshot *dbCacheSnapshot // Underlying snapshot for txns. + metaBucket *bucket // The root metadata bucket. + blockIdxBucket *bucket // The block index bucket. // Blocks that need to be stored on commit. 
The pendingBlocks map is // kept to allow quick lookups of pending data by block hash. @@ -967,7 +967,7 @@ type transaction struct { // Keys that need to be stored or deleted on commit. pendingKeys *treap.Mutable - pendingRemove map[string]struct{} + pendingRemove *treap.Mutable // Active iterators that need to be notified when the pending keys have // been updated so the cursors can properly handle updates to the @@ -1030,7 +1030,7 @@ func (tx *transaction) hasKey(key []byte) bool { // When the transaction is writable, check the pending transaction // state first. if tx.writable { - if _, ok := tx.pendingRemove[string(key)]; ok { + if tx.pendingRemove.Has(key) { return false } if tx.pendingKeys.Has(key) { @@ -1038,9 +1038,8 @@ func (tx *transaction) hasKey(key []byte) bool { } } - // Consult the database. - hasKey, _ := tx.snapshot.Has(key, nil) - return hasKey + // Consult the database cache and underlying database. + return tx.snapshot.Has(key) } // putKey adds the provided key to the list of keys to be updated in the @@ -1051,7 +1050,7 @@ func (tx *transaction) hasKey(key []byte) bool { func (tx *transaction) putKey(key, value []byte) error { // Prevent the key from being deleted if it was previously scheduled // to be deleted on transaction commit. - delete(tx.pendingRemove, string(key)) + tx.pendingRemove.Delete(key) // Add the key/value pair to the list to be written on transaction // commit. @@ -1060,14 +1059,14 @@ func (tx *transaction) putKey(key, value []byte) error { return nil } -// fetchKey attempts to fetch the provided key from the database while taking -// into account the current transaction state. Returns nil if the key does not -// exist. +// fetchKey attempts to fetch the provided key from the database cache (and +// hence underlying database) while taking into account the current transaction +// state. Returns nil if the key does not exist. func (tx *transaction) fetchKey(key []byte) []byte { // When the transaction is writable, check the pending transaction // state first. if tx.writable { - if _, ok := tx.pendingRemove[string(key)]; ok { + if tx.pendingRemove.Has(key) { return nil } if value := tx.pendingKeys.Get(key); value != nil { @@ -1075,11 +1074,8 @@ func (tx *transaction) fetchKey(key []byte) []byte { } } - value, err := tx.snapshot.Get(key, nil) - if err != nil { - return nil - } - return value + // Consult the database cache and underlying database. + return tx.snapshot.Get(key) } // deleteKey adds the provided key to the list of keys to be deleted from the @@ -1094,10 +1090,7 @@ func (tx *transaction) deleteKey(key []byte, notifyIterators bool) { tx.pendingKeys.Delete(key) // Add the key to the list to be deleted on transaction commit. - if tx.pendingRemove == nil { - tx.pendingRemove = make(map[string]struct{}) - } - tx.pendingRemove[string(key)] = struct{}{} + tx.pendingRemove.Put(key, nil) // Notify the active iterators about the change if the flag is set. if notifyIterators { @@ -1644,7 +1637,7 @@ func (tx *transaction) close() { tx.pendingBlockData = nil // Clear pending keys that would have been written or deleted on commit. - tx.pendingKeys.Reset() + tx.pendingKeys = nil tx.pendingRemove = nil // Release the snapshot. @@ -1677,7 +1670,7 @@ func serializeBlockRow(blockLoc blockLocation, blockHdr []byte) []byte { // writePendingAndCommit writes pending block data to the flat block files, // updates the metadata with their locations as well as the new current write -// location, and commits the metadata to the underlying database. 
It also
+// properly handles rollback in the case of failures.
 //
 // This function MUST only be called when there is pending data to be written.
@@ -1728,25 +1721,17 @@ func (tx *transaction) writePendingAndCommit() error {
 		return convertErr("failed to store write cursor", err)
 	}
 
-	// Perform all leveldb update operations using a batch for atomicity.
-	batch := new(leveldb.Batch)
-	tx.pendingKeys.ForEach(func(k, v []byte) bool {
-		batch.Put(k, v)
-		return true
-	})
-	for k := range tx.pendingRemove {
-		batch.Delete([]byte(k))
-	}
-	if err := tx.db.ldb.Write(batch, nil); err != nil {
-		rollback()
-		return convertErr("failed to commit transaction", err)
-	}
-
-	return nil
+	// Atomically update the database cache. The cache automatically
+	// handles flushing to the underlying persistent storage database.
+	return tx.db.cache.commitTx(tx)
 }
 
-// Commit commits all changes that have been made through the root bucket and
-// all of its sub-buckets to persistent storage.
+// Commit commits all changes that have been made to the root metadata bucket
+// and all of its sub-buckets to the database cache which is periodically synced
+// to persistent storage. In addition, it commits all new blocks directly to
+// persistent storage bypassing the db cache. Blocks can be rather large, so
+// this helps increase the amount of cache available for the metadata updates
+// and is safe since blocks are immutable.
 //
 // This function is part of the database.Tx interface implementation.
 func (tx *transaction) Commit() error {
@@ -1802,8 +1787,8 @@ type db struct {
 	writeLock sync.Mutex   // Limit to one write transaction at a time.
 	closeLock sync.RWMutex // Make database close block while txns active.
 	closed    bool         // Is the database closed?
-	ldb       *leveldb.DB  // The underlying leveldb DB for metadata.
 	store     *blockStore  // Handles read/writing blocks to flat files.
+	cache     *dbCache     // Cache layer which wraps underlying leveldb DB.
 }
 
 // Enforce db implements the database.DB interface.
@@ -1846,24 +1831,26 @@ func (db *db) begin(writable bool) (*transaction, error) {
 			nil)
 	}
 
-	snapshot, err := db.ldb.GetSnapshot()
+	// Grab a snapshot of the database cache (which in turn also handles the
+	// underlying database).
+	snapshot, err := db.cache.Snapshot()
 	if err != nil {
 		db.closeLock.RUnlock()
 		if writable {
 			db.writeLock.Unlock()
 		}
 
-		str := "failed to open transaction"
-		return nil, convertErr(str, err)
+		return nil, err
 	}
 
 	// The metadata and block index buckets are internal-only buckets, so
 	// they have defined IDs.
 	tx := &transaction{
-		writable:    writable,
-		db:          db,
-		snapshot:    snapshot,
-		pendingKeys: treap.NewMutable(),
+		writable:      writable,
+		db:            db,
+		snapshot:      snapshot,
+		pendingKeys:   treap.NewMutable(),
+		pendingRemove: treap.NewMutable(),
	}
 	tx.metaBucket = &bucket{tx: tx, id: metadataBucketID}
 	tx.blockIdxBucket = &bucket{tx: tx, id: blockIdxBucketID}
@@ -1968,10 +1955,9 @@ func (db *db) Update(fn func(database.Tx) error) error {
 	return tx.Commit()
 }
 
-// Close cleanly shuts down the database and syncs all data. Any data in
-// database transactions which have not been committed will be lost, so it is
-// important to ensure all transactions are finalized prior to calling this
-// function if that data is intended to be stored.
+// Close cleanly shuts down the database and syncs all data. It will block
+// until all database transactions have been finalized (rolled back or
+// committed).
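An aside on usage: because Close flushes the cache, a caller's shutdown path is unchanged by this patch. A minimal lifecycle sketch follows (the withDB helper is illustrative only; it assumes the standard database.Open driver registration for "ffldb"):

package main

import (
	database "github.com/btcsuite/btcd/database2"
	"github.com/btcsuite/btcd/wire"
)

// withDB opens an ffldb-backed database, runs fn, and closes the database.
// Close blocks until open transactions finish and flushes cached metadata,
// so anything fn committed is durable once withDB returns nil.
func withDB(path string, net wire.BitcoinNet, fn func(database.DB) error) error {
	db, err := database.Open("ffldb", path, net)
	if err != nil {
		return err
	}
	if err := fn(db); err != nil {
		_ = db.Close()
		return err
	}
	// The Close error matters here since it includes the final cache flush.
	return db.Close()
}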
// // This function is part of the database.DB interface implementation. func (db *db) Close() error { @@ -1985,6 +1971,15 @@ func (db *db) Close() error { } db.closed = true + // Close the database cache which will flush any existing entries to + // disk and close the underlying leveldb database. Any error is saved + // and returned at the end after the remaining cleanup since the + // database will be marked closed even if this fails given there is no + // good way for the caller to recover from a failure here anyways. + db.writeLock.Lock() + closeErr := db.cache.Close() + db.writeLock.Unlock() + // NOTE: Since the above lock waits for all transactions to finish and // prevents any new ones from being started, it is safe to clear all // state without the individual locks. @@ -2002,12 +1997,7 @@ func (db *db) Close() error { db.store.openBlocksLRU.Init() db.store.fileNumToLRUElem = nil - if err := db.ldb.Close(); err != nil { - str := "failed to close underlying leveldb database" - return convertErr(str, err) - } - - return nil + return closeErr } // filesExists reports whether the named file or directory exists. @@ -2082,9 +2072,12 @@ func openDB(dbPath string, network wire.BitcoinNet, create bool) (database.DB, e // Create the block store which includes scanning the existing flat // block files to find what the current write cursor position is - // according to the data that is actually on disk. + // according to the data that is actually on disk. Also create the + // database cache which wraps the underlying leveldb database to provide + // write caching. store := newBlockStore(dbPath, network) - pdb := &db{ldb: ldb, store: store} + cache := newDbCache(ldb, store, defaultCacheSize, defaultFlushSecs) + pdb := &db{store: store, cache: cache} // Perform any reconciliation needed between the block and metadata as // well as database initialization, if needed. diff --git a/database2/ffldb/dbcache.go b/database2/ffldb/dbcache.go new file mode 100644 index 00000000..d3bc1992 --- /dev/null +++ b/database2/ffldb/dbcache.go @@ -0,0 +1,705 @@ +// Copyright (c) 2015-2016 The btcsuite developers +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +package ffldb + +import ( + "bytes" + "sync" + "time" + + "github.com/btcsuite/btcd/database2/internal/treap" + "github.com/btcsuite/goleveldb/leveldb" + "github.com/btcsuite/goleveldb/leveldb/iterator" + "github.com/btcsuite/goleveldb/leveldb/util" +) + +const ( + // defaultCacheSize is the default size for the database cache. + defaultCacheSize = 50 * 1024 * 1024 // 50 MB + + // defaultFlushSecs is the default number of seconds to use as a + // threshold in between database cache flushes when the cache size has + // not been exceeded. + defaultFlushSecs = 60 // 1 minute + + // sliceOverheadSize is the size a slice takes for overhead. It assumes + // 64-bit pointers so technically it is smaller on 32-bit platforms, but + // overestimating the size in that case is acceptable since it avoids + // the need to import unsafe. + sliceOverheadSize = 24 + + // logEntryFieldsSize is the size the fields of each log entry takes + // excluding the contents of the key and value. It assumes 64-bit + // pointers so technically it is smaller on 32-bit platforms, but + // overestimating the size in that case is acceptable since it avoids + // the need to import unsafe. It consists of 8 bytes for the log entry + // type (due to padding) + 24 bytes for the key + 24 bytes for the + // value. (8 + 24 + 24). 
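To make the size accounting above concrete, here is an illustrative helper (not part of the patch) that estimates the memory retained by a single logged key/value pair using the same constants:

// approxLogEntrySize mirrors the accounting above: fixed field overhead
// (8 bytes of padded entry type plus two 24-byte slice headers) plus the
// actual key and value payloads.
func approxLogEntrySize(key, value []byte) uint64 {
	const fieldsSize = 8 + 24*2 // matches logEntryFieldsSize below
	return fieldsSize + uint64(len(key)) + uint64(len(value))
}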
+ logEntryFieldsSize = 8 + sliceOverheadSize*2 + + // batchThreshold is the number of items used to trigger a write of a + // leveldb batch. It is used to keep the batch from growing too large + // and consequently consuming large amounts of memory. + batchThreshold = 8000 +) + +// ldbCacheIter wraps a treap iterator to provide the additional functionality +// needed to satisfy the leveldb iterator.Iterator interface. +type ldbCacheIter struct { + *treap.Iterator +} + +// Enforce ldbCacheIterator implements the leveldb iterator.Iterator interface. +var _ iterator.Iterator = (*ldbCacheIter)(nil) + +// Error is only provided to satisfy the iterator interface as there are no +// errors for this memory-only structure. +// +// This is part of the leveldb iterator.Iterator interface implementation. +func (iter *ldbCacheIter) Error() error { + return nil +} + +// SetReleaser is only provided to satisfy the iterator interface as there is no +// need to override it. +// +// This is part of the leveldb iterator.Iterator interface implementation. +func (iter *ldbCacheIter) SetReleaser(releaser util.Releaser) { +} + +// Release is only provided to satisfy the iterator interface. +// +// This is part of the leveldb iterator.Iterator interface implementation. +func (iter *ldbCacheIter) Release() { +} + +// newLdbCacheIter creates a new treap iterator for the given slice against the +// pending keys for the passed cache snapshot and returns it wrapped in an +// ldbCacheIter so it can be used as a leveldb iterator. +func newLdbCacheIter(snap *dbCacheSnapshot, slice *util.Range) *ldbCacheIter { + iter := snap.pendingKeys.Iterator(slice.Start, slice.Limit) + return &ldbCacheIter{Iterator: iter} +} + +// dbCacheIterator defines an iterator over the key/value pairs in the database +// cache and underlying database. +type dbCacheIterator struct { + cacheSnapshot *dbCacheSnapshot + dbIter iterator.Iterator + cacheIter iterator.Iterator + currentIter iterator.Iterator + released bool +} + +// Enforce dbCacheIterator implements the leveldb iterator.Iterator interface. +var _ iterator.Iterator = (*dbCacheIterator)(nil) + +// skipPendingUpdates skips any keys at the current database iterator position +// that are being updated by the cache. The forwards flag indicates the +// direction the iterator is moving. +func (iter *dbCacheIterator) skipPendingUpdates(forwards bool) { + for iter.dbIter.Valid() { + var skip bool + key := iter.dbIter.Key() + if iter.cacheSnapshot.pendingRemove.Has(key) { + skip = true + } else if iter.cacheSnapshot.pendingKeys.Has(key) { + skip = true + } + if !skip { + break + } + + if forwards { + iter.dbIter.Next() + } else { + iter.dbIter.Prev() + } + } +} + +// chooseIterator first skips any entries in the database iterator that are +// being updated by the cache and sets the current iterator to the appropriate +// iterator depending on their validity and the order they compare in while taking +// into account the direction flag. When the iterator is being moved forwards +// and both iterators are valid, the iterator with the smaller key is chosen and +// vice versa when the iterator is being moved backwards. +func (iter *dbCacheIterator) chooseIterator(forwards bool) bool { + // Skip any keys at the current database iterator position that are + // being updated by the cache. + iter.skipPendingUpdates(forwards) + + // When both iterators are exhausted, the iterator is exhausted too. 
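An aside: the selection rule chooseIterator implements can be stated compactly. A standalone sketch, not part of the patch (equal keys cannot occur here because skipPendingUpdates already skipped database keys shadowed by the cache):

import "bytes"

// pickSource returns which source a merged iterator should surface next:
// moving forwards, the source with the smaller current key wins; moving
// backwards, the one with the larger key wins.
func pickSource(forwards bool, dbKey, cacheKey []byte) string {
	compare := bytes.Compare(dbKey, cacheKey)
	if (forwards && compare > 0) || (!forwards && compare < 0) {
		return "cache"
	}
	return "db"
}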
+	if !iter.dbIter.Valid() && !iter.cacheIter.Valid() {
+		iter.currentIter = nil
+		return false
+	}
+
+	// Choose the database iterator when the cache iterator is exhausted.
+	if !iter.cacheIter.Valid() {
+		iter.currentIter = iter.dbIter
+		return true
+	}
+
+	// Choose the cache iterator when the database iterator is exhausted.
+	if !iter.dbIter.Valid() {
+		iter.currentIter = iter.cacheIter
+		return true
+	}
+
+	// Both iterators are valid, so choose the iterator with either the
+	// smaller or larger key depending on the forwards flag.
+	compare := bytes.Compare(iter.dbIter.Key(), iter.cacheIter.Key())
+	if (forwards && compare > 0) || (!forwards && compare < 0) {
+		iter.currentIter = iter.cacheIter
+	} else {
+		iter.currentIter = iter.dbIter
+	}
+	return true
+}
+
+// First positions the iterator at the first key/value pair and returns whether
+// or not the pair exists.
+//
+// This is part of the leveldb iterator.Iterator interface implementation.
+func (iter *dbCacheIterator) First() bool {
+	// Seek to the first key in both the database and cache iterators and
+	// choose the iterator that is both valid and has the smaller key.
+	iter.dbIter.First()
+	iter.cacheIter.First()
+	return iter.chooseIterator(true)
+}
+
+// Last positions the iterator at the last key/value pair and returns whether or
+// not the pair exists.
+//
+// This is part of the leveldb iterator.Iterator interface implementation.
+func (iter *dbCacheIterator) Last() bool {
+	// Seek to the last key in both the database and cache iterators and
+	// choose the iterator that is both valid and has the larger key.
+	iter.dbIter.Last()
+	iter.cacheIter.Last()
+	return iter.chooseIterator(false)
+}
+
+// Next moves the iterator one key/value pair forward and returns whether or not
+// the pair exists.
+//
+// This is part of the leveldb iterator.Iterator interface implementation.
+func (iter *dbCacheIterator) Next() bool {
+	// Nothing to return if the iterator is exhausted.
+	if iter.currentIter == nil {
+		return false
+	}
+
+	// Move the current iterator to the next entry and choose the iterator
+	// that is both valid and has the smaller key.
+	iter.currentIter.Next()
+	return iter.chooseIterator(true)
+}
+
+// Prev moves the iterator one key/value pair backward and returns whether or
+// not the pair exists.
+//
+// This is part of the leveldb iterator.Iterator interface implementation.
+func (iter *dbCacheIterator) Prev() bool {
+	// Nothing to return if the iterator is exhausted.
+	if iter.currentIter == nil {
+		return false
+	}
+
+	// Move the current iterator to the previous entry and choose the
+	// iterator that is both valid and has the larger key.
+	iter.currentIter.Prev()
+	return iter.chooseIterator(false)
+}
+
+// Seek positions the iterator at the first key/value pair that is greater than
+// or equal to the passed seek key. Returns false if no suitable key was found.
+//
+// This is part of the leveldb iterator.Iterator interface implementation.
+func (iter *dbCacheIterator) Seek(key []byte) bool {
+	// Seek to the provided key in both the database and cache iterators
+	// then choose the iterator that is both valid and has the smaller key.
+	iter.dbIter.Seek(key)
+	iter.cacheIter.Seek(key)
+	return iter.chooseIterator(true)
+}
+
+// Valid indicates whether the iterator is positioned at a valid key/value pair.
+// It will be considered invalid when the iterator is newly created or exhausted.
+//
+// This is part of the leveldb iterator.Iterator interface implementation.
+func (iter *dbCacheIterator) Valid() bool { + return iter.currentIter != nil +} + +// Key returns the current key the iterator is pointing to. +// +// This is part of the leveldb iterator.Iterator interface implementation. +func (iter *dbCacheIterator) Key() []byte { + // Nothing to return if iterator is exhausted. + if iter.currentIter == nil { + return nil + } + + return iter.currentIter.Key() +} + +// Value returns the current value the iterator is pointing to. +// +// This is part of the leveldb iterator.Iterator interface implementation. +func (iter *dbCacheIterator) Value() []byte { + // Nothing to return if iterator is exhausted. + if iter.currentIter == nil { + return nil + } + + return iter.currentIter.Value() +} + +// SetReleaser is only provided to satisfy the iterator interface as there is no +// need to override it. +// +// This is part of the leveldb iterator.Iterator interface implementation. +func (iter *dbCacheIterator) SetReleaser(releaser util.Releaser) { +} + +// Release releases the iterator by removing the underlying treap iterator from +// the list of active iterators against the pending keys treap. +// +// This is part of the leveldb iterator.Iterator interface implementation. +func (iter *dbCacheIterator) Release() { + if !iter.released { + iter.dbIter.Release() + iter.cacheIter.Release() + iter.currentIter = nil + iter.released = true + } +} + +// Error is only provided to satisfy the iterator interface as there are no +// errors for this memory-only structure. +// +// This is part of the leveldb iterator.Iterator interface implementation. +func (iter *dbCacheIterator) Error() error { + return nil +} + +// dbCacheSnapshot defines a snapshot of the database cache and underlying +// database at a particular point in time. +type dbCacheSnapshot struct { + dbSnapshot *leveldb.Snapshot + pendingKeys *treap.Immutable + pendingRemove *treap.Immutable +} + +// Has returns whether or not the passed key exists. +func (snap *dbCacheSnapshot) Has(key []byte) bool { + // Check the cached entries first. + if snap.pendingRemove.Has(key) { + return false + } + if snap.pendingKeys.Has(key) { + return true + } + + // Consult the database. + hasKey, _ := snap.dbSnapshot.Has(key, nil) + return hasKey +} + +// Get returns the value for the passed key. The function will return nil when +// the key does not exist. +func (snap *dbCacheSnapshot) Get(key []byte) []byte { + // Check the cached entries first. + if snap.pendingRemove.Has(key) { + return nil + } + if value := snap.pendingKeys.Get(key); value != nil { + return value + } + + // Consult the database. + value, err := snap.dbSnapshot.Get(key, nil) + if err != nil { + return nil + } + return value +} + +// Release releases the snapshot. +func (snap *dbCacheSnapshot) Release() { + snap.dbSnapshot.Release() + snap.pendingKeys = nil + snap.pendingRemove = nil +} + +// NewIterator returns a new iterator for the snapshot. The newly returned +// iterator is not pointing to a valid item until a call to one of the methods +// to position it is made. +// +// The slice parameter allows the iterator to be limited to a range of keys. +// The start key is inclusive and the limit key is exclusive. Either or both +// can be nil if the functionality is not desired. +func (snap *dbCacheSnapshot) NewIterator(slice *util.Range) *dbCacheIterator { + return &dbCacheIterator{ + dbIter: snap.dbSnapshot.NewIterator(slice, nil), + cacheIter: newLdbCacheIter(snap, slice), + cacheSnapshot: snap, + } +} + +// txLogEntryType defines the type of a log entry. 
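Before the transaction log types below, an illustrative usage sketch of the snapshot machinery above (assumes fmt and the goleveldb util package are imported; the prefix is hypothetical):

// dumpPrefix walks every key/value pair visible in a cache snapshot under
// the given prefix, transparently merging cached and persisted entries.
func dumpPrefix(snap *dbCacheSnapshot, prefix []byte) {
	iter := snap.NewIterator(util.BytesPrefix(prefix))
	defer iter.Release()
	for ok := iter.First(); ok; ok = iter.Next() {
		fmt.Printf("%x => %x\n", iter.Key(), iter.Value())
	}
}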
+type txLogEntryType uint8
+
+// The following constants define the allowed log entry types.
+const (
+	// entryTypeUpdate specifies a key is to be added or updated to a given
+	// value.
+	entryTypeUpdate txLogEntryType = iota
+
+	// entryTypeRemove specifies a key is to be removed.
+	entryTypeRemove
+)
+
+// txLogEntry defines an entry in the transaction log. It is used when
+// replaying transactions during a cache flush.
+type txLogEntry struct {
+	entryType txLogEntryType
+	key       []byte
+	value     []byte // Only set for entryTypeUpdate.
+}
+
+// dbCache provides a database cache layer backed by an underlying database. It
+// allows a maximum cache size and flush interval to be specified such that the
+// cache is flushed to the database when the cache size exceeds the maximum
+// configured value or it has been longer than the configured interval since the
+// last flush. This effectively provides transaction batching so that callers
+// can commit transactions at will without incurring large performance hits due
+// to frequent disk syncs.
+type dbCache struct {
+	// ldb is the underlying leveldb DB for metadata.
+	ldb *leveldb.DB
+
+	// store is used to sync blocks to flat files.
+	store *blockStore
+
+	// The following fields are related to flushing the cache to persistent
+	// storage. Note that all flushing is performed in an opportunistic
+	// fashion. This means that it is only flushed during a transaction or
+	// when the database cache is closed.
+	//
+	// maxSize is the maximum size threshold the cache can grow to before
+	// it is flushed.
+	//
+	// flushInterval is the threshold interval of time that is allowed to
+	// pass before the cache is flushed.
+	//
+	// lastFlush is the time the cache was last flushed. It is used in
+	// conjunction with the current time and the flush interval.
+	//
+	// txLog maintains a log of all modifications made by each committed
+	// transaction since the last flush. When a flush happens, the data
+	// in the current flat file being written to is synced and then all
+	// transaction metadata is replayed into the database. The sync is
+	// necessary to ensure the metadata is only updated after the associated
+	// block data has been written to persistent storage so crash recovery
+	// can be handled properly.
+	//
+	// This log approach is used because leveldb consumes a massive amount
+	// of memory for batches that have large numbers of entries, so the
+	// final state of the cache can't simply be batched without causing
+	// memory usage to balloon unreasonably. It also isn't possible to
+	// batch smaller pieces of the final state since that would result in
+	// inconsistent metadata should an unexpected failure such as power
+	// loss occur in the middle of writing the pieces.
+	//
+	// NOTE: These flush related fields are protected by the database write
+	// lock.
+	maxSize       uint64
+	flushInterval time.Duration
+	lastFlush     time.Time
+	txLog         [][]txLogEntry
+
+	// The following fields hold the keys that need to be stored or deleted
+	// from the underlying database once the cache is full, enough time has
+	// passed, or when the database is shutting down. Note that these are
+	// stored using immutable treaps to support O(1) MVCC snapshots against
+	// the cached data. The cacheLock is used to protect concurrent access
+	// for cache updates and snapshots.
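The copy-on-write property that makes those O(1) snapshots possible can be seen directly with the internal treap package; a sketch, not part of the patch (assumes fmt and the treap package are imported):

// Put on an immutable treap returns a new root and never mutates the old
// one, so taking a snapshot amounts to copying a root pointer under a
// short-lived lock.
func snapshotDemo() {
	before := treap.NewImmutable()
	after := before.Put([]byte("k"), []byte("v"))
	fmt.Println(before.Has([]byte("k"))) // false: old root is untouched
	fmt.Println(after.Has([]byte("k")))  // true: new root sees the update
}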
+	cacheLock    sync.RWMutex
+	cachedKeys   *treap.Immutable
+	cachedRemove *treap.Immutable
+}
+
+// Snapshot returns a snapshot of the database cache and underlying database at
+// a particular point in time.
+//
+// The snapshot must be released after use by calling Release.
+func (c *dbCache) Snapshot() (*dbCacheSnapshot, error) {
+	dbSnapshot, err := c.ldb.GetSnapshot()
+	if err != nil {
+		str := "failed to open transaction"
+		return nil, convertErr(str, err)
+	}
+
+	// Since the cached keys to be added and removed use an immutable treap,
+	// a snapshot is simply obtaining the root of the tree under the lock
+	// which is used to atomically swap the root.
+	c.cacheLock.RLock()
+	cacheSnapshot := &dbCacheSnapshot{
+		dbSnapshot:    dbSnapshot,
+		pendingKeys:   c.cachedKeys,
+		pendingRemove: c.cachedRemove,
+	}
+	c.cacheLock.RUnlock()
+	return cacheSnapshot, nil
+}
+
+// flush flushes the database cache to persistent storage. This involves syncing
+// the block store and replaying all transactions that have been applied to the
+// cache to the underlying database.
+//
+// This function MUST be called with the database write lock held.
+func (c *dbCache) flush() error {
+	c.lastFlush = time.Now()
+
+	// Sync the current write file associated with the block store. This is
+	// necessary before writing the metadata to prevent the case where the
+	// metadata contains information about a block which actually hasn't
+	// been written yet in unexpected shutdown scenarios.
+	if err := c.store.syncBlocks(); err != nil {
+		return err
+	}
+
+	// Nothing to do if there are no transactions to flush.
+	if len(c.txLog) == 0 {
+		return nil
+	}
+
+	// Perform all leveldb updates using batches for atomicity.
+	batchLen := 0
+	batchTxns := 0
+	batch := new(leveldb.Batch)
+	for logTxNum, txLogEntries := range c.txLog {
+		// Replay the transaction from the log into the current batch.
+		for _, logEntry := range txLogEntries {
+			switch logEntry.entryType {
+			case entryTypeUpdate:
+				batch.Put(logEntry.key, logEntry.value)
+			case entryTypeRemove:
+				batch.Delete(logEntry.key)
+			}
+		}
+		batchTxns++
+
+		// Write and reset the current batch when the number of items in
+		// it exceeds the batch threshold or this is the last
+		// transaction in the log.
+		batchLen += len(txLogEntries)
+		if batchLen > batchThreshold || logTxNum == len(c.txLog)-1 {
+			if err := c.ldb.Write(batch, nil); err != nil {
+				return convertErr("failed to write batch", err)
+			}
+			batch.Reset()
+			batchLen = 0
+
+			// Clear the transactions that were written from the
+			// log so the memory can be reclaimed.
+			for i := logTxNum - (batchTxns - 1); i <= logTxNum; i++ {
+				c.txLog[i] = nil
+			}
+			batchTxns = 0
+		}
+	}
+	c.txLog = c.txLog[:0]
+
+	// Clear the cache since it has been flushed.
+	c.cacheLock.Lock()
+	c.cachedKeys = treap.NewImmutable()
+	c.cachedRemove = treap.NewImmutable()
+	c.cacheLock.Unlock()
+
+	return nil
+}
+
+// needsFlush returns whether or not the database cache needs to be flushed to
+// persistent storage based on its current size, whether or not adding all of
+// the entries in the passed database transaction would cause it to exceed the
+// configured limit, and how much time has elapsed since the last time the cache
+// was flushed.
+//
+// This function MUST be called with the database write lock held.
+func (c *dbCache) needsFlush(tx *transaction) bool {
+	// A flush is needed when more time has elapsed than the configured
+	// flush interval.
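In miniature, the checks this function performs are a time trigger and a size trigger with 50% headroom; a standalone restatement, not part of the patch:

import "time"

// needsFlushPolicy restates the two flush triggers: too much time since the
// last flush, or the estimated in-memory size (padded by 1.5x for flush
// overhead and snapshot-referenced nodes) exceeding the configured maximum.
func needsFlushPolicy(lastFlush time.Time, interval time.Duration, estSize, maxSize uint64) bool {
	if time.Since(lastFlush) > interval {
		return true
	}
	return uint64(float64(estSize)*1.5) > maxSize
}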
+ if time.Now().Sub(c.lastFlush) > c.flushInterval { + return true + } + + // Calculate the size of the transaction log. + var txLogSize uint64 + for _, txLogEntries := range c.txLog { + txLogSize += uint64(cap(txLogEntries)) * logEntryFieldsSize + } + txLogSize += uint64(cap(c.txLog)) * sliceOverheadSize + + // A flush is needed when the size of the database cache exceeds the + // specified max cache size. The total calculated size is multiplied by + // 1.5 here to account for additional memory consumption that will be + // needed during the flush as well as old nodes in the cache that are + // referenced by the snapshot used by the transaction. + snap := tx.snapshot + totalSize := txLogSize + snap.pendingKeys.Size() + snap.pendingRemove.Size() + totalSize = uint64(float64(totalSize) * 1.5) + if totalSize > c.maxSize { + return true + } + + return false +} + +// commitTx atomically adds all of the pending keys to add and remove into the +// database cache. When adding the pending keys would cause the size of the +// cache to exceed the max cache size, or the time since the last flush exceeds +// the configured flush interval, the cache will be flushed to the underlying +// persistent database. +// +// This is an atomic operation with respect to the cache in that either all of +// the pending keys to add and remove in the transaction will be applied or none +// of them will. +// +// The database cache itself might be flushed to the underlying persistent +// database even if the transaction fails to apply, but it will only be the +// state of the cache without the transaction applied. +// +// This function MUST be called during a database write transaction which in +// turn implies the database write lock will be held. +func (c *dbCache) commitTx(tx *transaction) error { + // Flush the cache and write directly to the database if a flush is + // needed. + if c.needsFlush(tx) { + if err := c.flush(); err != nil { + return err + } + + // Perform all leveldb update operations using a batch for + // atomicity. + batch := new(leveldb.Batch) + tx.pendingKeys.ForEach(func(k, v []byte) bool { + batch.Put(k, v) + return true + }) + tx.pendingKeys = nil + tx.pendingRemove.ForEach(func(k, v []byte) bool { + batch.Delete(k) + return true + }) + tx.pendingRemove = nil + if err := c.ldb.Write(batch, nil); err != nil { + return convertErr("failed to commit transaction", err) + } + + return nil + } + + // At this point a database flush is not needed, so atomically commit + // the transaction to the cache. + + // Create a slice of transaction log entries large enough to house all + // of the updates and add it to the list of logged transactions to + // replay on flush. + numEntries := tx.pendingKeys.Len() + tx.pendingRemove.Len() + txLogEntries := make([]txLogEntry, numEntries) + c.txLog = append(c.txLog, txLogEntries) + + // Since the cached keys to be added and removed use an immutable treap, + // a snapshot is simply obtaining the root of the tree under the lock + // which is used to atomically swap the root. + c.cacheLock.RLock() + newCachedKeys := c.cachedKeys + newCachedRemove := c.cachedRemove + c.cacheLock.RUnlock() + + // Apply every key to add in the database transaction to the cache. + // Also create a transaction log entry for each one at the same time so + // the database transaction can be replayed during flush. 
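Reduced to its essentials, the application step that follows looks like this (a sketch, not part of the patch; it omits the log entry bookkeeping):

// applyPuts folds a transaction's pending puts into new cache roots without
// disturbing readers holding the old roots; a put also cancels any pending
// removal of the same key.
func applyPuts(pending *treap.Mutable, keys, removes *treap.Immutable) (*treap.Immutable, *treap.Immutable) {
	pending.ForEach(func(k, v []byte) bool {
		removes = removes.Delete(k)
		keys = keys.Put(k, v)
		return true
	})
	return keys, removes
}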
+ logEntryNum := 0 + tx.pendingKeys.ForEach(func(k, v []byte) bool { + newCachedRemove = newCachedRemove.Delete(k) + newCachedKeys = newCachedKeys.Put(k, v) + + logEntry := &txLogEntries[logEntryNum] + logEntry.entryType = entryTypeUpdate + logEntry.key = k + logEntry.value = v + logEntryNum++ + return true + }) + tx.pendingKeys = nil + + // Apply every key to remove in the database transaction to the cache. + // Also create a transaction log entry for each one at the same time so + // the database transaction can be replayed during flush. + tx.pendingRemove.ForEach(func(k, v []byte) bool { + newCachedKeys = newCachedKeys.Delete(k) + newCachedRemove = newCachedRemove.Put(k, nil) + + logEntry := &txLogEntries[logEntryNum] + logEntry.entryType = entryTypeRemove + logEntry.key = k + logEntryNum++ + return true + }) + tx.pendingRemove = nil + + // Atomically replace the immutable treaps which hold the cached keys to + // add and delete. + c.cacheLock.Lock() + c.cachedKeys = newCachedKeys + c.cachedRemove = newCachedRemove + c.cacheLock.Unlock() + return nil +} + +// Close cleanly shuts down the database cache by syncing all data and closing +// the underlying leveldb database. +// +// This function MUST be called with the database write lock held. +func (c *dbCache) Close() error { + // Flush any outstanding cached entries to disk. + if err := c.flush(); err != nil { + // Even if there is an error while flushing, attempt to close + // the underlying database. The error is ignored since it would + // mask the flush error. + _ = c.ldb.Close() + return err + } + + // Close the underlying leveldb database. + if err := c.ldb.Close(); err != nil { + str := "failed to close underlying leveldb database" + return convertErr(str, err) + } + + return nil +} + +// newDbCache returns a new database cache instance backed by the provided +// leveldb instance. The cache will be flushed to leveldb when the max size +// exceeds the provided value or it has been longer than the provided interval +// since the last flush. +func newDbCache(ldb *leveldb.DB, store *blockStore, maxSize uint64, flushIntervalSecs uint32) *dbCache { + return &dbCache{ + ldb: ldb, + store: store, + maxSize: maxSize, + flushInterval: time.Second * time.Duration(flushIntervalSecs), + lastFlush: time.Now(), + cachedKeys: treap.NewImmutable(), + cachedRemove: treap.NewImmutable(), + } +} diff --git a/database2/ffldb/ldbtreapiter.go b/database2/ffldb/ldbtreapiter.go index 7415e446..a48da344 100644 --- a/database2/ffldb/ldbtreapiter.go +++ b/database2/ffldb/ldbtreapiter.go @@ -47,7 +47,7 @@ func (iter *ldbTreapIter) Release() { } } -// newLdbTreapIter create a new treap iterator for the given slice against the +// newLdbTreapIter creates a new treap iterator for the given slice against the // pending keys for the passed transaction and returns it wrapped in an // ldbTreapIter so it can be used as a leveldb iterator. It also adds the new // iterator to the list of active iterators for the transaction. diff --git a/database2/ffldb/reconcile.go b/database2/ffldb/reconcile.go index d0382a27..34def648 100644 --- a/database2/ffldb/reconcile.go +++ b/database2/ffldb/reconcile.go @@ -53,7 +53,7 @@ func reconcileDB(pdb *db, create bool) (database.DB, error) { // Perform initial internal bucket and value creation during database // creation. 
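A closing sketch of the Commit contract spelled out in database2/interface.go below: values committed through the cache are visible to any later transaction whether or not they have reached disk yet. The function name is illustrative; the copy guards against the returned value only being valid for the life of the transaction:

// commitThenRead demonstrates read-after-commit visibility: a View started
// after Commit returns observes the committed value even if it still lives
// only in the cache.
func commitThenRead(db database.DB, key, value []byte) ([]byte, error) {
	if err := db.Update(func(tx database.Tx) error {
		return tx.Metadata().Put(key, value)
	}); err != nil {
		return nil, err
	}
	var got []byte
	err := db.View(func(tx database.Tx) error {
		got = append([]byte(nil), tx.Metadata().Get(key)...)
		return nil
	})
	return got, err
}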
if create { - if err := initDB(pdb.ldb); err != nil { + if err := initDB(pdb.cache.ldb); err != nil { return nil, err } } diff --git a/database2/ffldb/whitebox_test.go b/database2/ffldb/whitebox_test.go index eeeabd56..12936f39 100644 --- a/database2/ffldb/whitebox_test.go +++ b/database2/ffldb/whitebox_test.go @@ -214,14 +214,8 @@ func TestCornerCases(t *testing.T) { } _ = os.RemoveAll(filePath) - // Start a transaction and close the underlying leveldb database out - // from under it. - dbTx, err := idb.Begin(true) - if err != nil { - t.Errorf("Begin: unexpected error: %v", err) - return - } - ldb := idb.(*db).ldb + // Close the underlying leveldb database out from under the database. + ldb := idb.(*db).cache.ldb ldb.Close() // Ensure initilization errors in the underlying database work as @@ -233,15 +227,6 @@ func TestCornerCases(t *testing.T) { return } - // Ensure errors in the underlying database during a transaction commit - // are handled properly. - testName = "Commit: underlying leveldb error" - wantErrCode = database.ErrDbNotOpen - err = dbTx.Commit() - if !checkDbError(t, testName, err, wantErrCode) { - return - } - // Ensure the View handles errors in the underlying leveldb database // properly. testName = "View: underlying leveldb error" @@ -325,16 +310,16 @@ func testWriteFailures(tc *testContext) bool { return false } - // Ensure file sync errors during writeBlock return the expected error. + // Ensure file sync errors during flush return the expected error. store := tc.db.(*db).store - testName := "writeBlock: file sync failure" + testName := "flush: file sync failure" store.writeCursor.Lock() oldFile := store.writeCursor.curFile store.writeCursor.curFile = &lockableFile{ file: &mockFile{forceSyncErr: true, maxSize: -1}, } store.writeCursor.Unlock() - _, err := store.writeBlock([]byte{0x00}) + err := tc.db.(*db).cache.flush() if !checkDbError(tc.t, testName, err, database.ErrDriverSpecific) { return false } diff --git a/database2/interface.go b/database2/interface.go index eed1c2fd..417b9631 100644 --- a/database2/interface.go +++ b/database2/interface.go @@ -392,8 +392,12 @@ type Tx interface { // ****************************************************************** // Commit commits all changes that have been made to the metadata or - // block storage to persistent storage. Calling this function on a - // managed transaction will result in a panic. + // block storage. Depending on the backend implementation this could be + // to a cache that is periodically synced to persistent storage or + // directly to persistent storage. In any case, all transactions which + // are started after the commit finishes will include all changes made + // by this transaction. Calling this function on a managed transaction + // will result in a panic. Commit() error // Rollback undoes all changes that have been made to the metadata or