diff --git a/database2/ffldb/dbcache.go b/database2/ffldb/dbcache.go index d3bc1992..4d0a9588 100644 --- a/database2/ffldb/dbcache.go +++ b/database2/ffldb/dbcache.go @@ -6,6 +6,7 @@ package ffldb import ( "bytes" + "fmt" "sync" "time" @@ -17,32 +18,25 @@ import ( const ( // defaultCacheSize is the default size for the database cache. - defaultCacheSize = 50 * 1024 * 1024 // 50 MB + defaultCacheSize = 100 * 1024 * 1024 // 100 MB // defaultFlushSecs is the default number of seconds to use as a // threshold in between database cache flushes when the cache size has // not been exceeded. - defaultFlushSecs = 60 // 1 minute + defaultFlushSecs = 300 // 5 minutes - // sliceOverheadSize is the size a slice takes for overhead. It assumes - // 64-bit pointers so technically it is smaller on 32-bit platforms, but - // overestimating the size in that case is acceptable since it avoids - // the need to import unsafe. - sliceOverheadSize = 24 - - // logEntryFieldsSize is the size the fields of each log entry takes - // excluding the contents of the key and value. It assumes 64-bit - // pointers so technically it is smaller on 32-bit platforms, but - // overestimating the size in that case is acceptable since it avoids - // the need to import unsafe. It consists of 8 bytes for the log entry - // type (due to padding) + 24 bytes for the key + 24 bytes for the - // value. (8 + 24 + 24). - logEntryFieldsSize = 8 + sliceOverheadSize*2 - - // batchThreshold is the number of items used to trigger a write of a - // leveldb batch. It is used to keep the batch from growing too large - // and consequently consuming large amounts of memory. - batchThreshold = 8000 + // ldbBatchHeaderSize is the size of a leveldb batch header which + // includes the sequence header and record counter. + // + // ldbRecordIKeySize is the size of the ikey used internally by leveldb + // when appending a record to a batch. + // + // These are used to help preallocate space needed for a batch in one + // allocation instead of letting leveldb itself constantly grow it. + // This results in far less pressure on the GC and consequently helps + // prevent the GC from allocating a lot of extra unneeded space. + ldbBatchHeaderSize = 12 + ldbRecordIKeySize = 8 ) // ldbCacheIter wraps a treap iterator to provide the additional functionality @@ -352,27 +346,6 @@ func (snap *dbCacheSnapshot) NewIterator(slice *util.Range) *dbCacheIterator { } } -// txLogEntryType defines the type of a log entry. -type txLogEntryType uint8 - -// The following constants define the allowed log entry types. -const ( - // entryTypeUpdate specifies a key is to be added or updated to a given - // value. - entryTypeUpdate txLogEntryType = iota - - // entryTypeRemove species a key is to be removed. - entryTypeRemove -) - -// txLogEntry defines an entry in the transaction log. It is used when -// replaying transactions during a cache flush. -type txLogEntry struct { - entryType txLogEntryType - key []byte - value []byte // Only set for entryTypUpdate. -} - // dbCache provides a database cache layer backed by an underlying database. It // allows a maximum cache size and flush interval to be specified such that the // cache is flushed to the database when the cache size exceeds the maximum @@ -401,28 +374,11 @@ type dbCache struct { // lastFlush is the time the cache was last flushed. It is used in // conjunction with the current time and the flush interval. // - // txLog maintains a log of all modifications made by each committed - // transaction since the last flush. When a flush happens, the data - // in the current flat file being written to is synced and then all - // transaction metadata is replayed into the database. The sync is - // necessary to ensure the metadata is only updated after the associated - // block data has been written to persistent storage so crash recovery - // can be handled properly. - // - // This log approach is used because leveldb consumes a massive amount - // of memory for batches that have large numbers of entries, so the - // final state of the cache can't simply be batched without causing - // memory usage to balloon unreasonably. It also isn't possible to - // batch smaller pieces of the final state since that would result in - // inconsistent metadata should an unexpected failure such as power - // loss occur in the middle of writing the pieces. - // // NOTE: These flush related fields are protected by the database write // lock. maxSize uint64 flushInterval time.Duration lastFlush time.Time - txLog [][]txLogEntry // The following fields hold the keys that need to be stored or deleted // from the underlying database once the cache is full, enough time has @@ -459,6 +415,71 @@ func (c *dbCache) Snapshot() (*dbCacheSnapshot, error) { return cacheSnapshot, nil } +// updateDB invokes the passed function in the context of a managed leveldb +// transaction. Any errors returned from the user-supplied function will cause +// the transaction to be rolled back and are returned from this function. +// Otherwise, the transaction is committed when the user-supplied function +// returns a nil error. +func (c *dbCache) updateDB(fn func(ldbTx *leveldb.Transaction) error) error { + // Start a leveldb transaction. + ldbTx, err := c.ldb.OpenTransaction() + if err != nil { + return convertErr("failed to open ldb transaction", err) + } + + if err := fn(ldbTx); err != nil { + ldbTx.Discard() + return err + } + + // Commit the leveldb transaction and convert any errors as needed. + if err := ldbTx.Commit(); err != nil { + return convertErr("failed to commit leveldb transaction", err) + } + return nil +} + +// TreapForEacher is an interface which allows iteration of a treap in ascending +// order using a user-supplied callback for each key/value pair. It mainly +// exists so both mutable and immutable treaps can be atomically committed to +// the database with the same function. +type TreapForEacher interface { + ForEach(func(k, v []byte) bool) +} + +// commitTreaps atomically commits all of the passed pending add/update/remove +// updates to the underlying database. +func (c *dbCache) commitTreaps(pendingKeys, pendingRemove TreapForEacher) error { + // Perform all leveldb updates using an atomic transaction. + return c.updateDB(func(ldbTx *leveldb.Transaction) error { + var innerErr error + pendingKeys.ForEach(func(k, v []byte) bool { + if dbErr := ldbTx.Put(k, v, nil); dbErr != nil { + str := fmt.Sprintf("failed to put key %q to "+ + "ldb transaction", k) + innerErr = convertErr(str, dbErr) + return false + } + return true + }) + if innerErr != nil { + return innerErr + } + + pendingRemove.ForEach(func(k, v []byte) bool { + if dbErr := ldbTx.Delete(k, nil); dbErr != nil { + str := fmt.Sprintf("failed to delete "+ + "key %q from ldb transaction", + k) + innerErr = convertErr(str, dbErr) + return false + } + return true + }) + return innerErr + }) +} + // flush flushes the database cache to persistent storage. This involes syncing // the block store and replaying all transactions that have been applied to the // cache to the underlying database. @@ -475,47 +496,23 @@ func (c *dbCache) flush() error { return err } - // Nothing to do if there are no transactions to flush. - if len(c.txLog) == 0 { + // Since the cached keys to be added and removed use an immutable treap, + // a snapshot is simply obtaining the root of the tree under the lock + // which is used to atomically swap the root. + c.cacheLock.RLock() + cachedKeys := c.cachedKeys + cachedRemove := c.cachedRemove + c.cacheLock.RUnlock() + + // Nothing to do if there is no data to flush. + if cachedKeys.Len() == 0 && cachedRemove.Len() == 0 { return nil } - // Perform all leveldb updates using batches for atomicity. - batchLen := 0 - batchTxns := 0 - batch := new(leveldb.Batch) - for logTxNum, txLogEntries := range c.txLog { - // Replay the transaction from the log into the current batch. - for _, logEntry := range txLogEntries { - switch logEntry.entryType { - case entryTypeUpdate: - batch.Put(logEntry.key, logEntry.value) - case entryTypeRemove: - batch.Delete(logEntry.key) - } - } - batchTxns++ - - // Write and reset the current batch when the number of items in - // it exceeds the the batch threshold or this is the last - // transaction in the log. - batchLen += len(txLogEntries) - if batchLen > batchThreshold || logTxNum == len(c.txLog)-1 { - if err := c.ldb.Write(batch, nil); err != nil { - return convertErr("failed to write batch", err) - } - batch.Reset() - batchLen = 0 - - // Clear the transactions that were written from the - // log so the memory can be reclaimed. - for i := logTxNum - (batchTxns - 1); i <= logTxNum; i++ { - c.txLog[i] = nil - } - batchTxns = 0 - } + // Perform all leveldb updates using an atomic transaction. + if err := c.commitTreaps(cachedKeys, cachedRemove); err != nil { + return err } - c.txLog = c.txLog[:] // Clear the cache since it has been flushed. c.cacheLock.Lock() @@ -540,20 +537,13 @@ func (c *dbCache) needsFlush(tx *transaction) bool { return true } - // Calculate the size of the transaction log. - var txLogSize uint64 - for _, txLogEntries := range c.txLog { - txLogSize += uint64(cap(txLogEntries)) * logEntryFieldsSize - } - txLogSize += uint64(cap(c.txLog)) * sliceOverheadSize - // A flush is needed when the size of the database cache exceeds the // specified max cache size. The total calculated size is multiplied by // 1.5 here to account for additional memory consumption that will be // needed during the flush as well as old nodes in the cache that are // referenced by the snapshot used by the transaction. snap := tx.snapshot - totalSize := txLogSize + snap.pendingKeys.Size() + snap.pendingRemove.Size() + totalSize := snap.pendingKeys.Size() + snap.pendingRemove.Size() totalSize = uint64(float64(totalSize) * 1.5) if totalSize > c.maxSize { return true @@ -579,43 +569,28 @@ func (c *dbCache) needsFlush(tx *transaction) bool { // This function MUST be called during a database write transaction which in // turn implies the database write lock will be held. func (c *dbCache) commitTx(tx *transaction) error { - // Flush the cache and write directly to the database if a flush is - // needed. + // Flush the cache and write the current transaction directly to the + // database if a flush is needed. if c.needsFlush(tx) { if err := c.flush(); err != nil { return err } - // Perform all leveldb update operations using a batch for - // atomicity. - batch := new(leveldb.Batch) - tx.pendingKeys.ForEach(func(k, v []byte) bool { - batch.Put(k, v) - return true - }) - tx.pendingKeys = nil - tx.pendingRemove.ForEach(func(k, v []byte) bool { - batch.Delete(k) - return true - }) - tx.pendingRemove = nil - if err := c.ldb.Write(batch, nil); err != nil { - return convertErr("failed to commit transaction", err) + // Perform all leveldb updates using an atomic transaction. + err := c.commitTreaps(tx.pendingKeys, tx.pendingRemove) + if err != nil { + return err } + // Clear the transaction entries since they have been committed. + tx.pendingKeys = nil + tx.pendingRemove = nil return nil } // At this point a database flush is not needed, so atomically commit // the transaction to the cache. - // Create a slice of transaction log entries large enough to house all - // of the updates and add it to the list of logged transactions to - // replay on flush. - numEntries := tx.pendingKeys.Len() + tx.pendingRemove.Len() - txLogEntries := make([]txLogEntry, numEntries) - c.txLog = append(c.txLog, txLogEntries) - // Since the cached keys to be added and removed use an immutable treap, // a snapshot is simply obtaining the root of the tree under the lock // which is used to atomically swap the root. @@ -625,33 +600,17 @@ func (c *dbCache) commitTx(tx *transaction) error { c.cacheLock.RUnlock() // Apply every key to add in the database transaction to the cache. - // Also create a transaction log entry for each one at the same time so - // the database transaction can be replayed during flush. - logEntryNum := 0 tx.pendingKeys.ForEach(func(k, v []byte) bool { newCachedRemove = newCachedRemove.Delete(k) newCachedKeys = newCachedKeys.Put(k, v) - - logEntry := &txLogEntries[logEntryNum] - logEntry.entryType = entryTypeUpdate - logEntry.key = k - logEntry.value = v - logEntryNum++ return true }) tx.pendingKeys = nil // Apply every key to remove in the database transaction to the cache. - // Also create a transaction log entry for each one at the same time so - // the database transaction can be replayed during flush. tx.pendingRemove.ForEach(func(k, v []byte) bool { newCachedKeys = newCachedKeys.Delete(k) newCachedRemove = newCachedRemove.Put(k, nil) - - logEntry := &txLogEntries[logEntryNum] - logEntry.entryType = entryTypeRemove - logEntry.key = k - logEntryNum++ return true }) tx.pendingRemove = nil