lbcd/database2/ffldb/dbcache.go

706 lines
24 KiB
Go
Raw Normal View History

// Copyright (c) 2015-2016 The btcsuite developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package ffldb
import (
"bytes"
"sync"
"time"
"github.com/btcsuite/btcd/database2/internal/treap"
"github.com/btcsuite/goleveldb/leveldb"
"github.com/btcsuite/goleveldb/leveldb/iterator"
"github.com/btcsuite/goleveldb/leveldb/util"
)
const (
// defaultCacheSize is the default size, in bytes, for the database
// cache.
defaultCacheSize = 50 * 1024 * 1024 // 50 MB
// defaultFlushSecs is the default number of seconds to use as a
// threshold in between database cache flushes when the cache size has
// not been exceeded.
defaultFlushSecs = 60 // 1 minute
// sliceOverheadSize is the size a slice takes for overhead. It assumes
// 64-bit pointers so technically it is smaller on 32-bit platforms, but
// overestimating the size in that case is acceptable since it avoids
// the need to import unsafe.
sliceOverheadSize = 24
// logEntryFieldsSize is the size the fields of each log entry take
// excluding the contents of the key and value. It assumes 64-bit
// pointers so technically it is smaller on 32-bit platforms, but
// overestimating the size in that case is acceptable since it avoids
// the need to import unsafe. It consists of 8 bytes for the log entry
// type (due to padding) + 24 bytes for the key slice header + 24 bytes
// for the value slice header. (8 + 24 + 24).
logEntryFieldsSize = 8 + sliceOverheadSize*2
// batchThreshold is the number of items used to trigger a write of a
// leveldb batch. It is used to keep the batch from growing too large
// and consequently consuming large amounts of memory. The threshold is
// checked after each whole transaction is added to the batch, so a
// batch may exceed it by the size of the last transaction added.
batchThreshold = 8000
)
// ldbCacheIter wraps a treap iterator to provide the additional functionality
// needed to satisfy the leveldb iterator.Iterator interface. The positioning
// and accessor methods come from the embedded treap iterator; only the methods
// the treap iterator lacks are defined on this type.
type ldbCacheIter struct {
*treap.Iterator
}
// Enforce ldbCacheIter implements the leveldb iterator.Iterator interface.
var _ iterator.Iterator = (*ldbCacheIter)(nil)
// Error is only provided to satisfy the iterator interface as there are no
// errors for this memory-only structure. It always returns nil.
//
// This is part of the leveldb iterator.Iterator interface implementation.
func (iter *ldbCacheIter) Error() error {
return nil
}
// SetReleaser is only provided to satisfy the iterator interface as there is no
// need to override it. The passed releaser is ignored.
//
// This is part of the leveldb iterator.Iterator interface implementation.
func (iter *ldbCacheIter) SetReleaser(releaser util.Releaser) {
}
// Release is only provided to satisfy the iterator interface. It is a no-op.
//
// This is part of the leveldb iterator.Iterator interface implementation.
func (iter *ldbCacheIter) Release() {
}
// newLdbCacheIter creates a new treap iterator for the given slice against the
// pending keys for the passed cache snapshot and returns it wrapped in an
// ldbCacheIter so it can be used as a leveldb iterator.
//
// A nil slice is treated as an unbounded range (nil start and limit keys)
// rather than being dereferenced, so that the same *util.Range handed to the
// database snapshot iterator can be handed to this function as well.
func newLdbCacheIter(snap *dbCacheSnapshot, slice *util.Range) *ldbCacheIter {
	// Default to no bounds and only narrow the range when one was supplied.
	var start, limit []byte
	if slice != nil {
		start, limit = slice.Start, slice.Limit
	}

	return &ldbCacheIter{Iterator: snap.pendingKeys.Iterator(start, limit)}
}
// dbCacheIterator defines an iterator over the key/value pairs in the database
// cache and underlying database. It merges two ordered iterators: dbIter over
// the database snapshot and cacheIter over the pending keys in the cache.
// currentIter refers to whichever of the two holds the current position and
// is nil when the iterator is not positioned at a valid entry. released
// records that Release has already run so repeated calls are harmless.
type dbCacheIterator struct {
cacheSnapshot *dbCacheSnapshot
dbIter iterator.Iterator
cacheIter iterator.Iterator
currentIter iterator.Iterator
released bool
}
// Enforce dbCacheIterator implements the leveldb iterator.Iterator interface.
var _ iterator.Iterator = (*dbCacheIterator)(nil)
// skipPendingUpdates steps the database iterator past every key that the cache
// snapshot overrides, i.e. keys that are pending removal or have a pending
// update. It stops at the first database key the cache does not override or
// when the database iterator is exhausted. The forwards flag selects the
// direction in which the database iterator is stepped.
func (iter *dbCacheIterator) skipPendingUpdates(forwards bool) {
	snap := iter.cacheSnapshot
	for iter.dbIter.Valid() {
		dbKey := iter.dbIter.Key()

		// The cached state takes precedence over whatever the database
		// holds for this key, so the database entry must not be surfaced.
		overridden := snap.pendingRemove.Has(dbKey) ||
			snap.pendingKeys.Has(dbKey)
		if !overridden {
			return
		}

		if forwards {
			iter.dbIter.Next()
			continue
		}
		iter.dbIter.Prev()
	}
}
// chooseIterator selects which of the two underlying iterators supplies the
// current entry and reports whether the merged iterator is positioned at a
// valid entry.
//
// Database entries overridden by the cache are skipped first. Then:
//   - if both iterators are exhausted, the current iterator is cleared and
//     false is returned;
//   - if only one is valid, that one is chosen;
//   - if both are valid, the one with the smaller key is chosen when moving
//     forwards and the one with the larger key when moving backwards. On
//     equal keys the database iterator is chosen.
func (iter *dbCacheIterator) chooseIterator(forwards bool) bool {
	// Skip any keys at the current database iterator position that are
	// being updated by the cache.
	iter.skipPendingUpdates(forwards)

	dbValid := iter.dbIter.Valid()
	cacheValid := iter.cacheIter.Valid()
	switch {
	case !dbValid && !cacheValid:
		// Nothing left on either side.
		iter.currentIter = nil
		return false

	case !cacheValid:
		iter.currentIter = iter.dbIter
		return true

	case !dbValid:
		iter.currentIter = iter.cacheIter
		return true
	}

	// Both sides have an entry, so the direction of travel decides which
	// key comes first.
	order := bytes.Compare(iter.dbIter.Key(), iter.cacheIter.Key())
	cacheFirst := order < 0
	if forwards {
		cacheFirst = order > 0
	}

	iter.currentIter = iter.dbIter
	if cacheFirst {
		iter.currentIter = iter.cacheIter
	}
	return true
}
// First positions the iterator at the first key/value pair and returns whether
// or not the pair exists.
//
// This is part of the leveldb iterator.Iterator interface implementation.
func (iter *dbCacheIterator) First() bool {
// Seek to the first key in both the database and cache iterators and
// choose the iterator that is both valid and has the smaller key. The
// forwards direction is used so overridden database keys are skipped by
// moving towards larger keys.
iter.dbIter.First()
iter.cacheIter.First()
return iter.chooseIterator(true)
}
// Last positions the iterator at the last key/value pair and returns whether or
// not the pair exists.
//
// This is part of the leveldb iterator.Iterator interface implementation.
func (iter *dbCacheIterator) Last() bool {
// Seek to the last key in both the database and cache iterators and
// choose the iterator that is both valid and has the larger key. The
// backwards direction is used so overridden database keys are skipped
// by moving towards smaller keys.
iter.dbIter.Last()
iter.cacheIter.Last()
return iter.chooseIterator(false)
}
// Next advances the iterator by one key/value pair and reports whether the
// iterator is still positioned at a valid pair afterwards. It returns false
// without moving anything when the iterator is not currently positioned.
//
// This is part of the leveldb iterator.Iterator interface implementation.
func (iter *dbCacheIterator) Next() bool {
	cur := iter.currentIter
	if cur == nil {
		// Not positioned (never positioned, exhausted or released).
		return false
	}

	// Only the iterator that supplied the current entry is advanced; the
	// other one is already sitting on its next candidate.
	cur.Next()
	return iter.chooseIterator(true)
}
// Prev moves the iterator back by one key/value pair and reports whether the
// iterator is still positioned at a valid pair afterwards. It returns false
// without moving anything when the iterator is not currently positioned.
//
// This is part of the leveldb iterator.Iterator interface implementation.
func (iter *dbCacheIterator) Prev() bool {
	cur := iter.currentIter
	if cur == nil {
		// Not positioned (never positioned, exhausted or released).
		return false
	}

	// Only the iterator that supplied the current entry is stepped back;
	// the merged position is then re-chosen for backwards movement.
	cur.Prev()
	return iter.chooseIterator(false)
}
// Seek positions the iterator at the first key/value pair that is greater than
// or equal to the passed seek key. Returns false if no suitable key was found.
//
// This is part of the leveldb iterator.Iterator interface implementation.
func (iter *dbCacheIterator) Seek(key []byte) bool {
// Seek to the provided key in both the database and cache iterators
// then choose the iterator that is both valid and has the smaller key,
// since the iterator is treated as moving forwards after a seek.
iter.dbIter.Seek(key)
iter.cacheIter.Seek(key)
return iter.chooseIterator(true)
}
// Valid indicates whether the iterator is positioned at a valid key/value pair.
// It will be considered invalid when the iterator is newly created, exhausted,
// or released, since the current iterator is nil in all of those states.
//
// This is part of the leveldb iterator.Iterator interface implementation.
func (iter *dbCacheIterator) Valid() bool {
return iter.currentIter != nil
}
// Key returns the key of the entry the iterator is currently positioned at,
// or nil when the iterator is not positioned at a valid entry.
//
// This is part of the leveldb iterator.Iterator interface implementation.
func (iter *dbCacheIterator) Key() []byte {
	if cur := iter.currentIter; cur != nil {
		return cur.Key()
	}

	// Not positioned, so there is no key to report.
	return nil
}
// Value returns the value of the entry the iterator is currently positioned
// at, or nil when the iterator is not positioned at a valid entry.
//
// This is part of the leveldb iterator.Iterator interface implementation.
func (iter *dbCacheIterator) Value() []byte {
	if cur := iter.currentIter; cur != nil {
		return cur.Value()
	}

	// Not positioned, so there is no value to report.
	return nil
}
// SetReleaser is only provided to satisfy the iterator interface as there is no
// need to override it. The passed releaser is ignored.
//
// This is part of the leveldb iterator.Iterator interface implementation.
func (iter *dbCacheIterator) SetReleaser(releaser util.Releaser) {
}
// Release releases both the underlying database iterator and the cache
// iterator and leaves the iterator unpositioned. Only the first call has any
// effect; later calls return immediately.
//
// This is part of the leveldb iterator.Iterator interface implementation.
func (iter *dbCacheIterator) Release() {
	if iter.released {
		return
	}

	iter.dbIter.Release()
	iter.cacheIter.Release()

	// Drop the position so Valid, Key, Value, Next and Prev all treat the
	// iterator as exhausted from here on.
	iter.currentIter = nil
	iter.released = true
}
// Error is only provided to satisfy the iterator interface. It always returns
// nil.
//
// NOTE(review): this iterator also wraps a database iterator (dbIter), and any
// error that iterator may report is not surfaced here - confirm this is
// intentional.
//
// This is part of the leveldb iterator.Iterator interface implementation.
func (iter *dbCacheIterator) Error() error {
return nil
}
// dbCacheSnapshot defines a snapshot of the database cache and underlying
// database at a particular point in time. It pairs a leveldb snapshot with the
// immutable treaps of keys pending update (pendingKeys) and keys pending
// removal (pendingRemove) that were current when the snapshot was taken.
type dbCacheSnapshot struct {
dbSnapshot *leveldb.Snapshot
pendingKeys *treap.Immutable
pendingRemove *treap.Immutable
}
// Has reports whether the passed key exists as of the snapshot. Cached state
// wins over the database: a key pending removal does not exist, a key pending
// update does, and only otherwise is the database snapshot consulted.
func (snap *dbCacheSnapshot) Has(key []byte) bool {
	switch {
	case snap.pendingRemove.Has(key):
		return false
	case snap.pendingKeys.Has(key):
		return true
	}

	// Fall back to the database. Any error from the lookup is discarded
	// and only the boolean result is reported.
	found, _ := snap.dbSnapshot.Has(key, nil)
	return found
}
// Get returns the value for the passed key as of the snapshot, or nil when the
// key does not exist. A key pending removal yields nil, a non-nil pending
// value is returned as is, and otherwise the database snapshot is consulted.
// A database lookup error also yields nil.
func (snap *dbCacheSnapshot) Get(key []byte) []byte {
	// A pending removal hides whatever the cache or database still hold.
	if snap.pendingRemove.Has(key) {
		return nil
	}

	cached := snap.pendingKeys.Get(key)
	if cached != nil {
		return cached
	}

	// Nothing usable in the cache, so consult the database.
	if stored, err := snap.dbSnapshot.Get(key, nil); err == nil {
		return stored
	}
	return nil
}
// Release releases the snapshot. The underlying database snapshot is released
// and the references to the pending key treaps are dropped, so the snapshot
// must not be used after this call.
func (snap *dbCacheSnapshot) Release() {
snap.dbSnapshot.Release()
snap.pendingKeys = nil
snap.pendingRemove = nil
}
// NewIterator returns a new iterator over the snapshot that merges the
// underlying database snapshot with the keys pending in the cache. The
// returned iterator is not pointing to a valid item until one of its
// positioning methods is called.
//
// The slice parameter allows the iterator to be limited to a range of keys.
// The start key is inclusive and the limit key is exclusive. Either or both
// can be nil if the functionality is not desired.
func (snap *dbCacheSnapshot) NewIterator(slice *util.Range) *dbCacheIterator {
	// Both iterators are built over the same range so they can be merged.
	dbIter := snap.dbSnapshot.NewIterator(slice, nil)
	cacheIter := newLdbCacheIter(snap, slice)

	return &dbCacheIterator{
		cacheSnapshot: snap,
		dbIter:        dbIter,
		cacheIter:     cacheIter,
	}
}
// txLogEntryType defines the type of a log entry.
type txLogEntryType uint8
// The following constants define the allowed log entry types.
const (
// entryTypeUpdate specifies a key is to be added or updated to a given
// value.
entryTypeUpdate txLogEntryType = iota
// entryTypeRemove specifies a key is to be removed.
entryTypeRemove
)
// txLogEntry defines an entry in the transaction log. It is used when
// replaying transactions during a cache flush: an update entry is replayed as
// a put of key/value and a remove entry as a delete of key.
type txLogEntry struct {
entryType txLogEntryType
key []byte
value []byte // Only set for entryTypeUpdate.
}
// dbCache provides a database cache layer backed by an underlying database. It
// allows a maximum cache size and flush interval to be specified such that the
// cache is flushed to the database when the cache size exceeds the maximum
// configured value or it has been longer than the configured interval since the
// last flush. This effectively provides transaction batching so that callers
// can commit transactions at will without incurring large performance hits due
// to frequent disk syncs.
type dbCache struct {
// ldb is the underlying leveldb DB for metadata.
ldb *leveldb.DB
// store is used to sync blocks to flat files.
store *blockStore
// The following fields are related to flushing the cache to persistent
// storage. Note that all flushing is performed in an opportunistic
// fashion. This means that it is only flushed during a transaction or
// when the database cache is closed.
//
// maxSize is the maximum size threshold the cache can grow to before
// it is flushed.
//
// flushInterval is the threshold interval of time that is allowed to
// pass before the cache is flushed.
//
// lastFlush is the time the cache was last flushed. It is used in
// conjunction with the current time and the flush interval.
//
// txLog maintains a log of all modifications made by each committed
// transaction since the last flush. When a flush happens, the data
// in the current flat file being written to is synced and then all
// transaction metadata is replayed into the database. The sync is
// necessary to ensure the metadata is only updated after the associated
// block data has been written to persistent storage so crash recovery
// can be handled properly.
//
// This log approach is used because leveldb consumes a massive amount
// of memory for batches that have large numbers of entries, so the
// final state of the cache can't simply be batched without causing
// memory usage to balloon unreasonably. It also isn't possible to
// batch smaller pieces of the final state since that would result in
// inconsistent metadata should an unexpected failure such as power
// loss occur in the middle of writing the pieces.
//
// NOTE: These flush related fields are protected by the database write
// lock.
maxSize uint64
flushInterval time.Duration
lastFlush time.Time
txLog [][]txLogEntry
// The following fields hold the keys that need to be stored or deleted
// from the underlying database once the cache is full, enough time has
// passed, or when the database is shutting down. Note that these are
// stored using immutable treaps to support O(1) MVCC snapshots against
// the cached data. The cacheLock is used to protect concurrent access
// for cache updates and snapshots: readers copy the two treap roots
// under the read lock and writers swap them under the write lock.
cacheLock sync.RWMutex
cachedKeys *treap.Immutable
cachedRemove *treap.Immutable
}
// Snapshot returns a snapshot of the database cache and underlying database at
// a particular point in time. An error is returned when the underlying
// database snapshot cannot be obtained.
//
// The snapshot must be released after use by calling Release.
func (c *dbCache) Snapshot() (*dbCacheSnapshot, error) {
	ldbSnap, err := c.ldb.GetSnapshot()
	if err != nil {
		return nil, convertErr("failed to open transaction", err)
	}

	// The cached keys live in immutable treaps, so capturing them is just a
	// matter of reading the two current roots while holding the lock that
	// guards the atomic swap of those roots.
	c.cacheLock.RLock()
	keys, removes := c.cachedKeys, c.cachedRemove
	c.cacheLock.RUnlock()

	return &dbCacheSnapshot{
		dbSnapshot:    ldbSnap,
		pendingKeys:   keys,
		pendingRemove: removes,
	}, nil
}
// flush flushes the database cache to persistent storage. This involves
// syncing the block store and replaying all transactions that have been applied
// to the cache to the underlying database.
//
// The last flush time is updated as soon as the function is entered, so a
// failed flush still resets the flush interval timer. An error is returned when
// syncing the block store or writing any batch fails; transactions whose batch
// was already written have been cleared from the log by then, while the
// remaining ones stay in the log.
//
// This function MUST be called with the database write lock held.
func (c *dbCache) flush() error {
	c.lastFlush = time.Now()

	// Sync the current write file associated with the block store. This is
	// necessary before writing the metadata to prevent the case where the
	// metadata contains information about a block which actually hasn't
	// been written yet in unexpected shutdown scenarios.
	if err := c.store.syncBlocks(); err != nil {
		return err
	}

	// Nothing to do if there are no transactions to flush.
	if len(c.txLog) == 0 {
		return nil
	}

	// Perform all leveldb updates using batches for atomicity.
	batchLen := 0
	batchTxns := 0
	batch := new(leveldb.Batch)
	for logTxNum, txLogEntries := range c.txLog {
		// Replay the transaction from the log into the current batch.
		for _, logEntry := range txLogEntries {
			switch logEntry.entryType {
			case entryTypeUpdate:
				batch.Put(logEntry.key, logEntry.value)
			case entryTypeRemove:
				batch.Delete(logEntry.key)
			}
		}
		batchTxns++

		// Write and reset the current batch when the number of items in
		// it exceeds the batch threshold or this is the last
		// transaction in the log. Transactions are never split across
		// batches.
		batchLen += len(txLogEntries)
		if batchLen > batchThreshold || logTxNum == len(c.txLog)-1 {
			if err := c.ldb.Write(batch, nil); err != nil {
				return convertErr("failed to write batch", err)
			}
			batch.Reset()
			batchLen = 0

			// Clear the transactions that were written from the
			// log so the memory can be reclaimed.
			for i := logTxNum - (batchTxns - 1); i <= logTxNum; i++ {
				c.txLog[i] = nil
			}
			batchTxns = 0
		}
	}

	// Truncate the log to zero length while keeping its backing array for
	// reuse. Every transaction has been written at this point, and leaving
	// the (now nil) entries in place would make the log grow without bound
	// and defeat the empty-log check above on later flushes.
	c.txLog = c.txLog[:0]

	// Clear the cache since it has been flushed.
	c.cacheLock.Lock()
	c.cachedKeys = treap.NewImmutable()
	c.cachedRemove = treap.NewImmutable()
	c.cacheLock.Unlock()
	return nil
}
// needsFlush returns whether or not the database cache needs to be flushed to
// persistent storage. A flush is needed when more time than the configured
// flush interval has elapsed since the last flush, or when the estimated size
// of the cache exceeds the configured maximum size.
//
// The size estimate is the overhead of the transaction log plus the sizes
// reported by the pending key and pending remove treaps of the snapshot
// associated with the passed transaction, scaled by 1.5.
//
// This function MUST be called with the database write lock held.
func (c *dbCache) needsFlush(tx *transaction) bool {
	// A flush is needed when more time has elapsed than the configured
	// flush interval.
	if time.Since(c.lastFlush) > c.flushInterval {
		return true
	}

	// Calculate the size of the transaction log. Only the per-entry field
	// overhead and the slice headers are counted here, based on capacity
	// rather than length so allocated-but-unused space is included.
	var txLogSize uint64
	for _, txLogEntries := range c.txLog {
		txLogSize += uint64(cap(txLogEntries)) * logEntryFieldsSize
	}
	txLogSize += uint64(cap(c.txLog)) * sliceOverheadSize

	// A flush is needed when the size of the database cache exceeds the
	// specified max cache size. The total calculated size is multiplied by
	// 1.5 here to account for additional memory consumption that will be
	// needed during the flush as well as old nodes in the cache that are
	// referenced by the snapshot used by the transaction.
	snap := tx.snapshot
	totalSize := txLogSize + snap.pendingKeys.Size() + snap.pendingRemove.Size()
	totalSize = uint64(float64(totalSize) * 1.5)
	return totalSize > c.maxSize
}
// commitTx applies the pending key updates and removals of the passed
// transaction.
//
// When a flush is due, the cache is flushed first and the transaction is then
// written straight to the underlying database in a single batch, bypassing the
// cache. Otherwise the transaction is recorded in the transaction log and
// merged into the cached treaps, which are swapped in atomically.
//
// This is an atomic operation with respect to the cache in that either all of
// the pending keys to add and remove in the transaction will be applied or none
// of them will.
//
// The database cache itself might be flushed to the underlying persistent
// database even if the transaction fails to apply, but it will only be the
// state of the cache without the transaction applied.
//
// This function MUST be called during a database write transaction which in
// turn implies the database write lock will be held.
func (c *dbCache) commitTx(tx *transaction) error {
	if c.needsFlush(tx) {
		if err := c.flush(); err != nil {
			return err
		}

		// The cache is empty now, so write the transaction directly to
		// the database using one batch for atomicity.
		directBatch := new(leveldb.Batch)
		tx.pendingKeys.ForEach(func(key, val []byte) bool {
			directBatch.Put(key, val)
			return true
		})
		tx.pendingKeys = nil
		tx.pendingRemove.ForEach(func(key, _ []byte) bool {
			directBatch.Delete(key)
			return true
		})
		tx.pendingRemove = nil

		err := c.ldb.Write(directBatch, nil)
		if err != nil {
			return convertErr("failed to commit transaction", err)
		}
		return nil
	}

	// No flush is due, so the transaction goes into the cache instead.

	// Reserve one log entry per pending update and removal and register
	// the entries with the transaction log so they are replayed on the next
	// flush.
	total := tx.pendingKeys.Len() + tx.pendingRemove.Len()
	entries := make([]txLogEntry, total)
	c.txLog = append(c.txLog, entries)

	// Start from the current treap roots. They are immutable, so reading
	// them under the lock is all that is needed to get a stable base.
	c.cacheLock.RLock()
	keys, removes := c.cachedKeys, c.cachedRemove
	c.cacheLock.RUnlock()

	// Merge the updates: a key being written is no longer pending removal.
	// Each update is logged as it is applied.
	next := 0
	tx.pendingKeys.ForEach(func(key, val []byte) bool {
		removes = removes.Delete(key)
		keys = keys.Put(key, val)
		entries[next] = txLogEntry{
			entryType: entryTypeUpdate,
			key:       key,
			value:     val,
		}
		next++
		return true
	})
	tx.pendingKeys = nil

	// Merge the removals: a key being removed is no longer pending update.
	// Each removal is logged as it is applied.
	tx.pendingRemove.ForEach(func(key, _ []byte) bool {
		keys = keys.Delete(key)
		removes = removes.Put(key, nil)
		entries[next] = txLogEntry{
			entryType: entryTypeRemove,
			key:       key,
		}
		next++
		return true
	})
	tx.pendingRemove = nil

	// Publish both new roots together so readers never observe a partially
	// applied transaction.
	c.cacheLock.Lock()
	c.cachedKeys = keys
	c.cachedRemove = removes
	c.cacheLock.Unlock()
	return nil
}
// Close shuts down the database cache by flushing all outstanding cached
// entries and then closing the underlying leveldb database. The database is
// closed even when the flush fails; in that case the flush error is returned
// and any close error is dropped so it cannot mask the flush error.
//
// This function MUST be called with the database write lock held.
func (c *dbCache) Close() error {
	// Always attempt both steps, in order, regardless of the flush result.
	flushErr := c.flush()
	closeErr := c.ldb.Close()

	if flushErr != nil {
		return flushErr
	}
	if closeErr != nil {
		return convertErr("failed to close underlying leveldb database",
			closeErr)
	}
	return nil
}
// newDbCache returns a new database cache instance backed by the provided
// leveldb instance and block store. The cache will be flushed to leveldb when
// its size exceeds maxSize or when more than flushIntervalSecs seconds have
// passed since the last flush. The cache starts out empty with the last flush
// time set to the time of creation.
func newDbCache(ldb *leveldb.DB, store *blockStore, maxSize uint64, flushIntervalSecs uint32) *dbCache {
	cache := new(dbCache)
	cache.ldb = ldb
	cache.store = store
	cache.maxSize = maxSize
	cache.flushInterval = time.Duration(flushIntervalSecs) * time.Second
	cache.lastFlush = time.Now()
	cache.cachedKeys = treap.NewImmutable()
	cache.cachedRemove = treap.NewImmutable()
	return cache
}