database: Remove tx log in favor of new ldb txns.

This removes the intermediate transaction log that was introduced as a
part of the database cache as a workaround for leveldb batches causing
massive memory usage spikes in favor of the recently introduced leveldb
transaction interface which no longer has the memory usage issues.

This approach is preferred because it can avoid the extra memory needed
for the transaction log and therefore all of the intermediate states as
well.  As a result, the default cache size has been doubled since it
equals roughly the same amount of overall memory usage and the flush
interval has been raised as well.
This commit is contained in:
Dave Collins 2016-02-11 18:08:53 -06:00
parent c75fea9c94
commit f45db028db

View file

@ -6,6 +6,7 @@ package ffldb
import (
"bytes"
"fmt"
"sync"
"time"
@ -17,32 +18,25 @@ import (
const (
// defaultCacheSize is the default size for the database cache.
defaultCacheSize = 50 * 1024 * 1024 // 50 MB
defaultCacheSize = 100 * 1024 * 1024 // 100 MB
// defaultFlushSecs is the default number of seconds to use as a
// threshold in between database cache flushes when the cache size has
// not been exceeded.
defaultFlushSecs = 60 // 1 minute
defaultFlushSecs = 300 // 5 minutes
// sliceOverheadSize is the size a slice takes for overhead. It assumes
// 64-bit pointers so technically it is smaller on 32-bit platforms, but
// overestimating the size in that case is acceptable since it avoids
// the need to import unsafe.
sliceOverheadSize = 24
// logEntryFieldsSize is the size the fields of each log entry takes
// excluding the contents of the key and value. It assumes 64-bit
// pointers so technically it is smaller on 32-bit platforms, but
// overestimating the size in that case is acceptable since it avoids
// the need to import unsafe. It consists of 8 bytes for the log entry
// type (due to padding) + 24 bytes for the key + 24 bytes for the
// value. (8 + 24 + 24).
logEntryFieldsSize = 8 + sliceOverheadSize*2
// batchThreshold is the number of items used to trigger a write of a
// leveldb batch. It is used to keep the batch from growing too large
// and consequently consuming large amounts of memory.
batchThreshold = 8000
// ldbBatchHeaderSize is the size of a leveldb batch header which
// includes the sequence header and record counter.
//
// ldbRecordIKeySize is the size of the ikey used internally by leveldb
// when appending a record to a batch.
//
// These are used to help preallocate space needed for a batch in one
// allocation instead of letting leveldb itself constantly grow it.
// This results in far less pressure on the GC and consequently helps
// prevent the GC from allocating a lot of extra unneeded space.
ldbBatchHeaderSize = 12
ldbRecordIKeySize = 8
)
// ldbCacheIter wraps a treap iterator to provide the additional functionality
@ -352,27 +346,6 @@ func (snap *dbCacheSnapshot) NewIterator(slice *util.Range) *dbCacheIterator {
}
}
// txLogEntryType defines the type of a log entry.
type txLogEntryType uint8
// The following constants define the allowed log entry types.
const (
// entryTypeUpdate specifies a key is to be added or updated to a given
// value.
entryTypeUpdate txLogEntryType = iota
// entryTypeRemove species a key is to be removed.
entryTypeRemove
)
// txLogEntry defines an entry in the transaction log. It is used when
// replaying transactions during a cache flush.
type txLogEntry struct {
entryType txLogEntryType
key []byte
value []byte // Only set for entryTypUpdate.
}
// dbCache provides a database cache layer backed by an underlying database. It
// allows a maximum cache size and flush interval to be specified such that the
// cache is flushed to the database when the cache size exceeds the maximum
@ -401,28 +374,11 @@ type dbCache struct {
// lastFlush is the time the cache was last flushed. It is used in
// conjunction with the current time and the flush interval.
//
// txLog maintains a log of all modifications made by each committed
// transaction since the last flush. When a flush happens, the data
// in the current flat file being written to is synced and then all
// transaction metadata is replayed into the database. The sync is
// necessary to ensure the metadata is only updated after the associated
// block data has been written to persistent storage so crash recovery
// can be handled properly.
//
// This log approach is used because leveldb consumes a massive amount
// of memory for batches that have large numbers of entries, so the
// final state of the cache can't simply be batched without causing
// memory usage to balloon unreasonably. It also isn't possible to
// batch smaller pieces of the final state since that would result in
// inconsistent metadata should an unexpected failure such as power
// loss occur in the middle of writing the pieces.
//
// NOTE: These flush related fields are protected by the database write
// lock.
maxSize uint64
flushInterval time.Duration
lastFlush time.Time
txLog [][]txLogEntry
// The following fields hold the keys that need to be stored or deleted
// from the underlying database once the cache is full, enough time has
@ -459,6 +415,71 @@ func (c *dbCache) Snapshot() (*dbCacheSnapshot, error) {
return cacheSnapshot, nil
}
// updateDB invokes the passed function in the context of a managed leveldb
// transaction. Any errors returned from the user-supplied function will cause
// the transaction to be rolled back and are returned from this function.
// Otherwise, the transaction is committed when the user-supplied function
// returns a nil error.
func (c *dbCache) updateDB(fn func(ldbTx *leveldb.Transaction) error) error {
// Start a leveldb transaction.
ldbTx, err := c.ldb.OpenTransaction()
if err != nil {
return convertErr("failed to open ldb transaction", err)
}
if err := fn(ldbTx); err != nil {
ldbTx.Discard()
return err
}
// Commit the leveldb transaction and convert any errors as needed.
if err := ldbTx.Commit(); err != nil {
return convertErr("failed to commit leveldb transaction", err)
}
return nil
}
// TreapForEacher is an interface which allows iteration of a treap in ascending
// order using a user-supplied callback for each key/value pair. It mainly
// exists so both mutable and immutable treaps can be atomically committed to
// the database with the same function.
type TreapForEacher interface {
ForEach(func(k, v []byte) bool)
}
// commitTreaps atomically commits all of the passed pending add/update/remove
// updates to the underlying database.
func (c *dbCache) commitTreaps(pendingKeys, pendingRemove TreapForEacher) error {
// Perform all leveldb updates using an atomic transaction.
return c.updateDB(func(ldbTx *leveldb.Transaction) error {
var innerErr error
pendingKeys.ForEach(func(k, v []byte) bool {
if dbErr := ldbTx.Put(k, v, nil); dbErr != nil {
str := fmt.Sprintf("failed to put key %q to "+
"ldb transaction", k)
innerErr = convertErr(str, dbErr)
return false
}
return true
})
if innerErr != nil {
return innerErr
}
pendingRemove.ForEach(func(k, v []byte) bool {
if dbErr := ldbTx.Delete(k, nil); dbErr != nil {
str := fmt.Sprintf("failed to delete "+
"key %q from ldb transaction",
k)
innerErr = convertErr(str, dbErr)
return false
}
return true
})
return innerErr
})
}
// flush flushes the database cache to persistent storage. This involes syncing
// the block store and replaying all transactions that have been applied to the
// cache to the underlying database.
@ -475,47 +496,23 @@ func (c *dbCache) flush() error {
return err
}
// Nothing to do if there are no transactions to flush.
if len(c.txLog) == 0 {
// Since the cached keys to be added and removed use an immutable treap,
// a snapshot is simply obtaining the root of the tree under the lock
// which is used to atomically swap the root.
c.cacheLock.RLock()
cachedKeys := c.cachedKeys
cachedRemove := c.cachedRemove
c.cacheLock.RUnlock()
// Nothing to do if there is no data to flush.
if cachedKeys.Len() == 0 && cachedRemove.Len() == 0 {
return nil
}
// Perform all leveldb updates using batches for atomicity.
batchLen := 0
batchTxns := 0
batch := new(leveldb.Batch)
for logTxNum, txLogEntries := range c.txLog {
// Replay the transaction from the log into the current batch.
for _, logEntry := range txLogEntries {
switch logEntry.entryType {
case entryTypeUpdate:
batch.Put(logEntry.key, logEntry.value)
case entryTypeRemove:
batch.Delete(logEntry.key)
}
}
batchTxns++
// Write and reset the current batch when the number of items in
// it exceeds the the batch threshold or this is the last
// transaction in the log.
batchLen += len(txLogEntries)
if batchLen > batchThreshold || logTxNum == len(c.txLog)-1 {
if err := c.ldb.Write(batch, nil); err != nil {
return convertErr("failed to write batch", err)
}
batch.Reset()
batchLen = 0
// Clear the transactions that were written from the
// log so the memory can be reclaimed.
for i := logTxNum - (batchTxns - 1); i <= logTxNum; i++ {
c.txLog[i] = nil
}
batchTxns = 0
}
// Perform all leveldb updates using an atomic transaction.
if err := c.commitTreaps(cachedKeys, cachedRemove); err != nil {
return err
}
c.txLog = c.txLog[:]
// Clear the cache since it has been flushed.
c.cacheLock.Lock()
@ -540,20 +537,13 @@ func (c *dbCache) needsFlush(tx *transaction) bool {
return true
}
// Calculate the size of the transaction log.
var txLogSize uint64
for _, txLogEntries := range c.txLog {
txLogSize += uint64(cap(txLogEntries)) * logEntryFieldsSize
}
txLogSize += uint64(cap(c.txLog)) * sliceOverheadSize
// A flush is needed when the size of the database cache exceeds the
// specified max cache size. The total calculated size is multiplied by
// 1.5 here to account for additional memory consumption that will be
// needed during the flush as well as old nodes in the cache that are
// referenced by the snapshot used by the transaction.
snap := tx.snapshot
totalSize := txLogSize + snap.pendingKeys.Size() + snap.pendingRemove.Size()
totalSize := snap.pendingKeys.Size() + snap.pendingRemove.Size()
totalSize = uint64(float64(totalSize) * 1.5)
if totalSize > c.maxSize {
return true
@ -579,43 +569,28 @@ func (c *dbCache) needsFlush(tx *transaction) bool {
// This function MUST be called during a database write transaction which in
// turn implies the database write lock will be held.
func (c *dbCache) commitTx(tx *transaction) error {
// Flush the cache and write directly to the database if a flush is
// needed.
// Flush the cache and write the current transaction directly to the
// database if a flush is needed.
if c.needsFlush(tx) {
if err := c.flush(); err != nil {
return err
}
// Perform all leveldb update operations using a batch for
// atomicity.
batch := new(leveldb.Batch)
tx.pendingKeys.ForEach(func(k, v []byte) bool {
batch.Put(k, v)
return true
})
tx.pendingKeys = nil
tx.pendingRemove.ForEach(func(k, v []byte) bool {
batch.Delete(k)
return true
})
tx.pendingRemove = nil
if err := c.ldb.Write(batch, nil); err != nil {
return convertErr("failed to commit transaction", err)
// Perform all leveldb updates using an atomic transaction.
err := c.commitTreaps(tx.pendingKeys, tx.pendingRemove)
if err != nil {
return err
}
// Clear the transaction entries since they have been committed.
tx.pendingKeys = nil
tx.pendingRemove = nil
return nil
}
// At this point a database flush is not needed, so atomically commit
// the transaction to the cache.
// Create a slice of transaction log entries large enough to house all
// of the updates and add it to the list of logged transactions to
// replay on flush.
numEntries := tx.pendingKeys.Len() + tx.pendingRemove.Len()
txLogEntries := make([]txLogEntry, numEntries)
c.txLog = append(c.txLog, txLogEntries)
// Since the cached keys to be added and removed use an immutable treap,
// a snapshot is simply obtaining the root of the tree under the lock
// which is used to atomically swap the root.
@ -625,33 +600,17 @@ func (c *dbCache) commitTx(tx *transaction) error {
c.cacheLock.RUnlock()
// Apply every key to add in the database transaction to the cache.
// Also create a transaction log entry for each one at the same time so
// the database transaction can be replayed during flush.
logEntryNum := 0
tx.pendingKeys.ForEach(func(k, v []byte) bool {
newCachedRemove = newCachedRemove.Delete(k)
newCachedKeys = newCachedKeys.Put(k, v)
logEntry := &txLogEntries[logEntryNum]
logEntry.entryType = entryTypeUpdate
logEntry.key = k
logEntry.value = v
logEntryNum++
return true
})
tx.pendingKeys = nil
// Apply every key to remove in the database transaction to the cache.
// Also create a transaction log entry for each one at the same time so
// the database transaction can be replayed during flush.
tx.pendingRemove.ForEach(func(k, v []byte) bool {
newCachedKeys = newCachedKeys.Delete(k)
newCachedRemove = newCachedRemove.Put(k, nil)
logEntry := &txLogEntries[logEntryNum]
logEntry.entryType = entryTypeRemove
logEntry.key = k
logEntryNum++
return true
})
tx.pendingRemove = nil