diff --git a/database/ffldb/dbcache.go b/database/ffldb/dbcache.go index 52c6737f..9a690c22 100644 --- a/database/ffldb/dbcache.go +++ b/database/ffldb/dbcache.go @@ -290,6 +290,7 @@ type dbCacheSnapshot struct { pendingRemove *treap.Immutable pendingKeysSnap *treap.SnapRecord pendingRemoveSnap *treap.SnapRecord + cacheFlushed int } // Has returns whether or not the passed key exists. @@ -329,8 +330,18 @@ func (snap *dbCacheSnapshot) Get(key []byte) []byte { // Release releases the snapshot. func (snap *dbCacheSnapshot) Release() { snap.dbSnapshot.Release() - snap.pendingKeysSnap.Release() - snap.pendingRemoveSnap.Release() + if snap.cacheFlushed > 0 && snap.pendingKeys != nil { + snap.pendingKeys.Recycle(snap.pendingKeysSnap) + } + if snap.cacheFlushed > 0 && snap.pendingRemove != nil { + snap.pendingRemove.Recycle(snap.pendingRemoveSnap) + } + if snap.pendingKeysSnap != nil { + snap.pendingKeysSnap.Release() + } + if snap.pendingRemoveSnap != nil { + snap.pendingRemoveSnap.Release() + } snap.pendingKeys = nil snap.pendingRemove = nil } @@ -411,11 +422,15 @@ func (c *dbCache) Snapshot() (*dbCacheSnapshot, error) { // which is used to atomically swap the root. c.cacheLock.RLock() cacheSnapshot := &dbCacheSnapshot{ - dbSnapshot: dbSnapshot, - pendingKeys: c.cachedKeys, - pendingRemove: c.cachedRemove, - pendingKeysSnap: c.cachedKeys.Snapshot(), - pendingRemoveSnap: c.cachedRemove.Snapshot(), + dbSnapshot: dbSnapshot, + pendingKeys: c.cachedKeys, + pendingRemove: c.cachedRemove, + } + if cacheSnapshot.pendingKeys != nil { + cacheSnapshot.pendingKeysSnap = cacheSnapshot.pendingKeys.Snapshot() + } + if cacheSnapshot.pendingRemove != nil { + cacheSnapshot.pendingRemoveSnap = cacheSnapshot.pendingRemove.Snapshot() } c.cacheLock.RUnlock() return cacheSnapshot, nil @@ -491,7 +506,7 @@ func (c *dbCache) commitTreaps(pendingKeys, pendingRemove TreapForEacher) error // cache to the underlying database. // // This function MUST be called with the database write lock held. 
-func (c *dbCache) flush() error { +func (c *dbCache) flush(tx *transaction) error { c.lastFlush = time.Now() // Sync the current write file associated with the block store. This is @@ -525,8 +540,14 @@ func (c *dbCache) flush() error { c.cachedRemove = treap.NewImmutable() c.cacheLock.Unlock() - cachedKeys.Recycle() - cachedRemove.Recycle() + cachedKeys.Recycle(nil) + cachedRemove.Recycle(nil) + + // Make a note that cache was flushed so tx.snapshot.Release() + // can also call Recycle() to free more nodes. + if tx != nil && tx.snapshot != nil { + tx.snapshot.cacheFlushed++ + } return nil } @@ -576,7 +597,7 @@ func (c *dbCache) commitTx(tx *transaction) error { // Flush the cache and write the current transaction directly to the // database if a flush is needed. if c.needsFlush(tx) { - if err := c.flush(); err != nil { + if err := c.flush(tx); err != nil { return err } @@ -646,7 +667,7 @@ func (c *dbCache) commitTx(tx *transaction) error { // This function MUST be called with the database write lock held. func (c *dbCache) Close() error { // Flush any outstanding cached entries to disk. - if err := c.flush(); err != nil { + if err := c.flush(nil); err != nil { // Even if there is an error while flushing, attempt to close // the underlying database. The error is ignored since it would // mask the flush error. 
diff --git a/database/ffldb/whitebox_test.go b/database/ffldb/whitebox_test.go index 31a8ab34..47ac6b74 100644 --- a/database/ffldb/whitebox_test.go +++ b/database/ffldb/whitebox_test.go @@ -319,7 +319,7 @@ func testWriteFailures(tc *testContext) bool { file: &mockFile{forceSyncErr: true, maxSize: -1}, } store.writeCursor.Unlock() - err := tc.db.(*db).cache.flush() + err := tc.db.(*db).cache.flush(nil) if !checkDbError(tc.t, testName, err, database.ErrDriverSpecific) { return false } diff --git a/database/internal/treap/common.go b/database/internal/treap/common.go index 3ab25045..8df86872 100644 --- a/database/internal/treap/common.go +++ b/database/internal/treap/common.go @@ -24,7 +24,7 @@ const ( // size in that case is acceptable since it avoids the need to import // unsafe. It consists of 24-bytes for each key and value + 8 bytes for // each of the priority, left, and right fields (24*2 + 8*3). - nodeFieldsSize = 80 + nodeFieldsSize = 96 ) var ( @@ -35,10 +35,12 @@ var ( ) const ( - // Generation number for nodes in a Mutable treap. - MutableGeneration int = -1 - // Generation number for nodes in the free Pool. - PoolGeneration int = -2 + // mutableGeneration is the generation number for nodes in a Mutable treap. + mutableGeneration int = -1 + // recycleGeneration indicates node is scheduled for recycling back to nodePool. + recycleGeneration int = -2 + // poolGeneration is the generation number for free nodes in the nodePool. + poolGeneration int = -3 ) // treapNode represents a node in the treap. @@ -49,6 +51,7 @@ type treapNode struct { left *treapNode right *treapNode generation int + next *treapNode } // nodeSize returns the number of bytes the specified node occupies including @@ -57,15 +60,15 @@ func nodeSize(node *treapNode) uint64 { return nodeFieldsSize + uint64(len(node.key)+len(node.value)) } -// Pool of treapNode available for reuse. 
-var nodePool = &sync.Pool{ - New: func() interface{} { - return &treapNode{key: nil, value: nil, priority: 0, generation: PoolGeneration} - }, +func poolNewTreapNode() interface{} { + return &treapNode{key: nil, value: nil, priority: 0, generation: poolGeneration} } -// getTreapNode returns a new node from the given key, value, and priority. The -// node is not initially linked to any others. +// Pool of treapNode available for reuse. +var nodePool sync.Pool = sync.Pool{New: poolNewTreapNode} + +// getTreapNode returns a node from nodePool with the given key, value, priority, +// and generation. The node is not initially linked to any others. func getTreapNode(key, value []byte, priority int, generation int) *treapNode { n := nodePool.Get().(*treapNode) n.key = key @@ -74,17 +77,22 @@ func getTreapNode(key, value []byte, priority int, generation int) *treapNode { n.left = nil n.right = nil n.generation = generation + n.next = nil return n } -// Put treapNode back in the nodePool for reuse. +// putTreapNode returns a node back to nodePool for reuse. 
func putTreapNode(n *treapNode) { + if n.generation <= poolGeneration { + panic("double free of treapNode detected") + } n.key = nil n.value = nil n.priority = 0 n.left = nil n.right = nil - n.generation = PoolGeneration + n.generation = poolGeneration + n.next = nil nodePool.Put(n) } diff --git a/database/internal/treap/common_test.go b/database/internal/treap/common_test.go index c43e678d..6cd1363b 100644 --- a/database/internal/treap/common_test.go +++ b/database/internal/treap/common_test.go @@ -49,7 +49,7 @@ testLoop: for j := 0; j < test.numNodes; j++ { var key [4]byte binary.BigEndian.PutUint32(key[:], uint32(j)) - node := newTreapNode(key[:], key[:], 0) + node := getTreapNode(key[:], key[:], 0, 0) nodes = append(nodes, node) } diff --git a/database/internal/treap/immutable.go b/database/internal/treap/immutable.go index a9d59fb3..be58a8af 100644 --- a/database/internal/treap/immutable.go +++ b/database/internal/treap/immutable.go @@ -48,11 +48,11 @@ type Immutable struct { // snap is a pointer to a node in snapshot history linked list. // A value nil means no snapshots are outstanding. - snap *SnapRecord + snap **SnapRecord } // newImmutable returns a new immutable treap given the passed parameters. -func newImmutable(root *treapNode, count int, totalSize uint64, generation int, snap *SnapRecord) *Immutable { +func newImmutable(root *treapNode, count int, totalSize uint64, generation int, snap **SnapRecord) *Immutable { return &Immutable{root: root, count: count, totalSize: totalSize, generation: generation, snap: snap} } @@ -111,7 +111,7 @@ func (t *Immutable) Get(key []byte) []byte { } // put inserts the passed key/value pair. -func (t *Immutable) put(key, value []byte, bumpGen int) (tp *Immutable, old parentStack) { +func (t *Immutable) put(key, value []byte) (tp *Immutable, old parentStack) { // Use an empty byte slice for the value when none was provided. 
This // ultimately allows key existence to be determined from the value since // an empty byte slice is distinguishable from nil. @@ -121,8 +121,8 @@ func (t *Immutable) put(key, value []byte, bumpGen int) (tp *Immutable, old pare // The node is the root of the tree if there isn't already one. if t.root == nil { - root := getTreapNode(key, value, rand.Int(), t.generation+bumpGen) - return newImmutable(root, 1, nodeSize(root), t.generation+bumpGen, t.snap), parentStack{} + root := getTreapNode(key, value, rand.Int(), t.generation+1) + return newImmutable(root, 1, nodeSize(root), t.generation+1, t.snap), parentStack{} } // Find the binary tree insertion point and construct a replaced list of @@ -169,11 +169,11 @@ func (t *Immutable) put(key, value []byte, bumpGen int) (tp *Immutable, old pare newRoot := parents.At(parents.Len() - 1) newTotalSize := t.totalSize - uint64(len(node.value)) + uint64(len(value)) - return newImmutable(newRoot, t.count, newTotalSize, t.generation+bumpGen, t.snap), oldParents + return newImmutable(newRoot, t.count, newTotalSize, t.generation+1, t.snap), oldParents } // Link the new node into the binary tree in the correct position. - node := getTreapNode(key, value, rand.Int(), t.generation+bumpGen) + node := getTreapNode(key, value, rand.Int(), t.generation+1) parent := parents.At(0) if compareResult < 0 { parent.left = node @@ -213,20 +213,21 @@ func (t *Immutable) put(key, value []byte, bumpGen int) (tp *Immutable, old pare } } - return newImmutable(newRoot, t.count+1, t.totalSize+nodeSize(node), t.generation+bumpGen, t.snap), oldParents + return newImmutable(newRoot, t.count+1, t.totalSize+nodeSize(node), t.generation+1, t.snap), oldParents } -// Put is the immutable variant of put. Generation number is bumped, and old -// nodes become garbage unless referenced elswhere. +// Put is the immutable variant of put. Old nodes become garbage unless referenced elsewhere. 
func (t *Immutable) Put(key, value []byte) *Immutable { - tp, _ := t.put(key, value, 1) + tp, _ := t.put(key, value) return tp } -// PutM is the mutable variant of put. Generation number is NOT bumped, and old -// nodes are recycled if possible. This is only safe/useful in scenarios where -// multiple Put/Delete() ops are applied to a unique treap and no snapshots/aliases -// of the intermediate treap states are created or desired. For example: +// PutM is the mutable variant of put. Old nodes are recycled if possible. This is +// only safe in structured scenarios using SnapRecord to track treap instances. +// The outstanding SnapRecords serve to protect nodes from recycling when they might +// be present in one or more snapshots. This is useful in scenarios where multiple +// Put/Delete() ops are applied to a treap and intermediate treap states are not +// created or desired. For example: // // for i := range keys { // t = t.Put(keys[i]) @@ -242,15 +243,20 @@ func (t *Immutable) Put(key, value []byte) *Immutable { // snapshot records. // func PutM(dest **Immutable, key, value []byte, excluded *SnapRecord) { - tp, old := (*dest).put(key, value, 0) + tp, old := (*dest).put(key, value) // Examine old nodes and recycle if possible. snapRecordMutex.Lock() defer snapRecordMutex.Unlock() - snapCount := (*dest).snapCount(excluded) + snapCount, maxSnap, minSnap := (*dest).snapCount(nil) for old.Len() > 0 { node := old.Pop() - if node.generation == tp.generation && snapCount == 0 { + if snapCount == 0 || node.generation > maxSnap.generation { putTreapNode(node) + } else { + // Defer recycle until Release() on oldest snap (minSnap). + node.generation = recycleGeneration + node.next = minSnap.recycle + minSnap.recycle = node } } *dest = tp @@ -259,7 +265,7 @@ func PutM(dest **Immutable, key, value []byte, excluded *SnapRecord) { // del removes the passed key from the treap and returns the resulting treap // if it exists. 
The original immutable treap is returned if the key does not // exist. -func (t *Immutable) del(key []byte, bumpGen int) (d *Immutable, old parentStack) { +func (t *Immutable) del(key []byte) (d *Immutable, old parentStack) { // Find the node for the key while constructing a list of parents while // doing so. var oldParents parentStack @@ -293,7 +299,7 @@ func (t *Immutable) del(key []byte, bumpGen int) (d *Immutable, old parentStack) // being deleted, there is nothing else to do besides removing it. parent := oldParents.At(1) if parent == nil && delNode.left == nil && delNode.right == nil { - return newImmutable(nil, 0, 0, t.generation+bumpGen, t.snap), oldParents + return newImmutable(nil, 0, 0, t.generation+1, t.snap), oldParents } // Construct a replaced list of parents and the node to delete itself. @@ -374,20 +380,21 @@ func (t *Immutable) del(key []byte, bumpGen int) (d *Immutable, old parentStack) parent.left = nil } - return newImmutable(newRoot, t.count-1, t.totalSize-nodeSize(delNode), t.generation+bumpGen, t.snap), oldParents + return newImmutable(newRoot, t.count-1, t.totalSize-nodeSize(delNode), t.generation+1, t.snap), oldParents } -// Delete is the immutable variant of del. Generation number is bumped, and old -// nodes become garbage unless referenced elswhere. +// Delete is the immutable variant of del. Old nodes become garbage unless referenced elsewhere. func (t *Immutable) Delete(key []byte) *Immutable { - tp, _ := t.del(key, 1) + tp, _ := t.del(key) return tp } -// DeleteM is the mutable variant of del. Generation number is NOT bumped, and old -// nodes are recycled if possible. This is only safe/useful in scenarios where -// multiple Put/Delete() ops are applied to a unique treap and no snapshots/aliases -// of the intermediate treap states are created or desired. For example: +// DeleteM is the mutable variant of del. Old nodes are recycled if possible. This is +// only safe in structured scenarios using SnapRecord to track treap instances. 
+// The outstanding SnapRecords serve to protect nodes from recycling when they might +// be present in one or more snapshots. This is useful in scenarios where multiple +// Put/Delete() ops are applied to a treap and intermediate treap states are not +// created or desired. For example: // // for i := range keys { // t = t.Delete(keys[i]) @@ -403,15 +410,20 @@ func (t *Immutable) Delete(key []byte) *Immutable { // snapshot records. // func DeleteM(dest **Immutable, key []byte, excluded *SnapRecord) { - tp, old := (*dest).del(key, 0) + tp, old := (*dest).del(key) // Examine old nodes and recycle if possible. snapRecordMutex.Lock() defer snapRecordMutex.Unlock() - snapCount := (*dest).snapCount(excluded) + snapCount, maxSnap, minSnap := (*dest).snapCount(nil) for old.Len() > 0 { node := old.Pop() - if node.generation == tp.generation && snapCount == 0 { + if snapCount == 0 || node.generation > maxSnap.generation { putTreapNode(node) + } else { + // Defer recycle until Release() on oldest snap (minSnap). + node.generation = recycleGeneration + node.next = minSnap.recycle + minSnap.recycle = node } } *dest = tp @@ -447,31 +459,45 @@ func NewImmutable() *Immutable { return &Immutable{} } -// SnapRecord assists in tracking/releasing outstanding snapshots. +// SnapRecord assists in tracking outstanding snapshots. While a SnapRecord +// is present and has not been Released(), treap nodes at or below this +// generation are protected from Recycle(). type SnapRecord struct { - prev *SnapRecord - next *SnapRecord + generation int + rp **SnapRecord + prev *SnapRecord + next *SnapRecord + recycle *treapNode } var snapRecordMutex sync.Mutex -// Snapshot makes a SnapRecord and linkis it into the snapshot history of a treap. +// Snapshot makes a SnapRecord and links it into the snapshot history of a treap. func (t *Immutable) Snapshot() *SnapRecord { snapRecordMutex.Lock() defer snapRecordMutex.Unlock() - // Link this record so it follows the existing t.snap record, if any. 
- prev := t.snap + rp := t.snap var next *SnapRecord = nil - if prev != nil { - next = prev.next - } - t.snap = &SnapRecord{prev: prev, next: next} - if prev != nil { - prev.next = t.snap + var prev *SnapRecord = nil + if rp != nil { + prev = *rp + if *rp != nil { + next = (*rp).next + } } - return t.snap + // Create a new record stamped with the current generation. Link it + // following the existing snapshot record, if any. + p := new(*SnapRecord) + *p = &SnapRecord{generation: t.generation, rp: p, prev: prev, next: next} + t.snap = p + + if rp != nil && *rp != nil { + (*rp).next = *(t.snap) + } + + return *(t.snap) } // Release of SnapRecord unlinks that record from the snapshot history of a treap. @@ -480,45 +506,73 @@ func (r *SnapRecord) Release() { defer snapRecordMutex.Unlock() // Unlink this record. - if r.prev != nil { - r.prev.next = r.next - } + *(r.rp) = nil if r.next != nil { r.next.prev = r.prev + *(r.rp) = r.next + } + if r.prev != nil { + r.prev.next = r.next + *(r.rp) = r.prev + } + + // Handle deferred recycle list. + for node := r.recycle; node != nil; { + next := node.next + putTreapNode(node) + node = next } } // snapCount returns the number of snapshots outstanding which were created // but not released. When snapshots are absent, mutable PutM()/DeleteM() can -// recycle nodes more aggressively. The record "exclude" is not counted. -func (t *Immutable) snapCount(exclude *SnapRecord) int { +// recycle nodes more aggressively. The record "excluded" is not counted. +func (t *Immutable) snapCount(excluded *SnapRecord) (count int, maxSnap, minSnap *SnapRecord) { // snapRecordMutex should be locked already - sum := 0 - if t.snap == nil { + count, maxSnap, minSnap = 0, nil, nil + if t.snap == nil || *(t.snap) == nil { // No snapshots. - return sum + return count, maxSnap, minSnap } // Count snapshots taken BEFORE creation of this instance. 
- for h := t.snap; h != nil; h = h.prev { - if h != exclude { - sum++ + for h := *(t.snap); h != nil; h = h.prev { + if h != excluded { + count++ + if maxSnap == nil || maxSnap.generation < h.generation { + maxSnap = h + } + if minSnap == nil || minSnap.generation > h.generation { + minSnap = h + } } } // Count snapshots taken AFTER creation of this instance. - for h := t.snap.next; h != nil; h = h.next { - if h != exclude { - sum++ + for h := (*(t.snap)).next; h != nil; h = h.next { + if h != excluded { + count++ + if maxSnap == nil || maxSnap.generation < h.generation { + maxSnap = h + } + if minSnap == nil || minSnap.generation > h.generation { + minSnap = h + } } } - return sum + return count, maxSnap, minSnap } -func (t *Immutable) Recycle() { - snapCount := t.snapCount(nil) - 1 +func (t *Immutable) Recycle(excluded *SnapRecord) { + snapRecordMutex.Lock() + _, maxSnap, _ := t.snapCount(excluded) + snapGen := 0 + if maxSnap != nil { + snapGen = maxSnap.generation + } + snapRecordMutex.Unlock() var parents parentStack for node := t.root; node != nil; node = node.left { @@ -534,7 +588,10 @@ func (t *Immutable) Recycle() { parents.Push(n) } - if node.generation == t.generation && snapCount == 0 { + // Recycle node if it cannot be in a snapshot. Note that nodes + // scheduled for deferred recycling will have negative generation + // (recycleGeneration) and will not qualify. + if node.generation > snapGen { putTreapNode(node) } } diff --git a/database/internal/treap/mutable.go b/database/internal/treap/mutable.go index 1ded6808..3289d318 100644 --- a/database/internal/treap/mutable.go +++ b/database/internal/treap/mutable.go @@ -21,6 +21,10 @@ type Mutable struct { // totalSize is the best estimate of the total size of of all data in // the treap including the keys, values, and node sizes. totalSize uint64 + + // generation number is the constant mutableGeneration, unless + // creation of a treap.Iterator bumps it. 
+ generation int } // Len returns the number of items stored in the treap. @@ -113,7 +117,7 @@ func (t *Mutable) Put(key, value []byte) { // The node is the root of the tree if there isn't already one. if t.root == nil { - node := getTreapNode(key, value, rand.Int(), MutableGeneration) + node := getTreapNode(key, value, rand.Int(), t.generation) t.count = 1 t.totalSize = nodeSize(node) t.root = node @@ -145,7 +149,7 @@ func (t *Mutable) Put(key, value []byte) { } // Link the new node into the binary tree in the correct position. - node := getTreapNode(key, value, rand.Int(), MutableGeneration) + node := getTreapNode(key, value, rand.Int(), t.generation) t.count++ t.totalSize += nodeSize(node) parent := parents.At(0) @@ -190,7 +194,9 @@ func (t *Mutable) Delete(key []byte) { t.root = nil t.count = 0 t.totalSize = 0 - putTreapNode(node) + if node.generation == t.generation && node.generation == mutableGeneration { + putTreapNode(node) + } return } @@ -239,7 +245,9 @@ func (t *Mutable) Delete(key []byte) { } t.count-- t.totalSize -= nodeSize(node) - putTreapNode(node) + if node.generation == t.generation && node.generation == mutableGeneration { + putTreapNode(node) + } } // ForEach invokes the passed function with every key/value pair in the treap @@ -276,7 +284,7 @@ func (t *Mutable) Reset() { // NewMutable returns a new empty mutable treap ready for use. See the // documentation for the Mutable structure for more details. 
func NewMutable() *Mutable { - return &Mutable{} + return &Mutable{generation: mutableGeneration} } func (t *Mutable) Recycle() { @@ -294,7 +302,7 @@ func (t *Mutable) Recycle() { parents.Push(n) } - if node.generation == MutableGeneration { + if node.generation == t.generation && node.generation == mutableGeneration { putTreapNode(node) } } diff --git a/database/internal/treap/treapiter.go b/database/internal/treap/treapiter.go index d6981aaf..81028ed7 100644 --- a/database/internal/treap/treapiter.go +++ b/database/internal/treap/treapiter.go @@ -326,6 +326,7 @@ func (iter *Iterator) ForceReseek() { // } // } func (t *Mutable) Iterator(startKey, limitKey []byte) *Iterator { + t.generation++ iter := &Iterator{ t: t, root: t.root,