From a7a30339cb850191f8fc2f4ab4b58e9ccaf3b3b1 Mon Sep 17 00:00:00 2001 From: Brannon King Date: Thu, 22 Jul 2021 11:24:05 -0400 Subject: [PATCH 1/3] recycle treap nodes --- database/ffldb/dbcache.go | 10 +++++++ database/internal/treap/common.go | 7 +++++ database/internal/treap/immutable.go | 42 ++++++++++++++++++++++------ database/internal/treap/mutable.go | 25 ++++++++++++++++- 4 files changed, 75 insertions(+), 9 deletions(-) diff --git a/database/ffldb/dbcache.go b/database/ffldb/dbcache.go index eff239c6..3f06d0e7 100644 --- a/database/ffldb/dbcache.go +++ b/database/ffldb/dbcache.go @@ -516,6 +516,9 @@ func (c *dbCache) flush() error { return err } + cachedKeys.Recycle() + cachedRemove.Recycle() + return nil } @@ -574,9 +577,16 @@ func (c *dbCache) commitTx(tx *transaction) error { return err } + pk := tx.pendingKeys + pr := tx.pendingRemove + // Clear the transaction entries since they have been committed. tx.pendingKeys = nil tx.pendingRemove = nil + + pk.Recycle() + pr.Recycle() + return nil } diff --git a/database/internal/treap/common.go b/database/internal/treap/common.go index 090a7bd5..011aaffa 100644 --- a/database/internal/treap/common.go +++ b/database/internal/treap/common.go @@ -42,6 +42,13 @@ type treapNode struct { right *treapNode } +func (n *treapNode) Reset() { + n.key = nil + n.value = nil + n.left = nil + n.right = nil +} + // nodeSize returns the number of bytes the specified node occupies including // the struct fields and the contents of the key and value. func nodeSize(node *treapNode) uint64 { diff --git a/database/internal/treap/immutable.go b/database/internal/treap/immutable.go index a6e13ff4..a23c6f70 100644 --- a/database/internal/treap/immutable.go +++ b/database/internal/treap/immutable.go @@ -7,17 +7,20 @@ package treap import ( "bytes" "math/rand" + "sync" ) +var nodePool = &sync.Pool{New: func() interface{} { return newTreapNode(nil, nil, 0) }} + // cloneTreapNode returns a shallow copy of the passed node. func cloneTreapNode(node *treapNode) *treapNode { - return &treapNode{ - key: node.key, - value: node.value, - priority: node.priority, - left: node.left, - right: node.right, - } + clone := nodePool.Get().(*treapNode) + clone.key = node.key + clone.value = node.value + clone.priority = node.priority + clone.left = node.left + clone.right = node.right + return clone } // Immutable represents a treap data structure which is used to hold ordered @@ -165,7 +168,10 @@ func (t *Immutable) Put(key, value []byte) *Immutable { } // Link the new node into the binary tree in the correct position. - node := newTreapNode(key, value, rand.Int()) + node := nodePool.Get().(*treapNode) + node.key = key + node.value = value + node.priority = rand.Int() parent := parents.At(0) if compareResult < 0 { parent.left = node @@ -358,3 +364,23 @@ func (t *Immutable) ForEach(fn func(k, v []byte) bool) { func NewImmutable() *Immutable { return &Immutable{} } + +func (t *Immutable) Recycle() { + var parents parentStack + for node := t.root; node != nil; node = node.left { + parents.Push(node) + } + + for parents.Len() > 0 { + node := parents.Pop() + + // Extend the nodes to traverse by all children to the left of + // the current node's right child. + for n := node.right; n != nil; n = n.left { + parents.Push(n) + } + + node.Reset() + nodePool.Put(node) + } +} diff --git a/database/internal/treap/mutable.go b/database/internal/treap/mutable.go index 84ebe671..c68f326e 100644 --- a/database/internal/treap/mutable.go +++ b/database/internal/treap/mutable.go @@ -145,7 +145,10 @@ func (t *Mutable) Put(key, value []byte) { } // Link the new node into the binary tree in the correct position. - node := newTreapNode(key, value, rand.Int()) + node := nodePool.Get().(*treapNode) + node.key = key + node.value = value + node.priority = rand.Int() t.count++ t.totalSize += nodeSize(node) parent := parents.At(0) @@ -276,3 +279,23 @@ func (t *Mutable) Reset() { func NewMutable() *Mutable { return &Mutable{} } + +func (t *Mutable) Recycle() { + var parents parentStack + for node := t.root; node != nil; node = node.left { + parents.Push(node) + } + + for parents.Len() > 0 { + node := parents.Pop() + + // Extend the nodes to traverse by all children to the left of + // the current node's right child. + for n := node.right; n != nil; n = n.left { + parents.Push(n) + } + + node.Reset() + nodePool.Put(node) + } +} -- 2.45.2 From 0ab37e154175597007b636691cde908771403c3f Mon Sep 17 00:00:00 2001 From: Jonathan Moody <103143855+moodyjon@users.noreply.github.com> Date: Sat, 28 May 2022 07:37:53 -0400 Subject: [PATCH 2/3] Distinguish between Mutable/Immutable treap node recycling, and make Mutable treap recycle nodes consistently. Rework Immutable treap node recycling attempting to make it safer in the presence of code that takes snapshots (dbCacheSnapshot) of the treap. Add special mutable PutM and DeleteM methods which DB transaction can use to apply changes more efficiently without creating lots of garbage memory. --- database/ffldb/dbcache.go | 42 ++++-- database/internal/treap/common.go | 59 ++++++-- database/internal/treap/immutable.go | 215 +++++++++++++++++++++++---- database/internal/treap/mutable.go | 14 +- 4 files changed, 263 insertions(+), 67 deletions(-) diff --git a/database/ffldb/dbcache.go b/database/ffldb/dbcache.go index 3f06d0e7..52c6737f 100644 --- a/database/ffldb/dbcache.go +++ b/database/ffldb/dbcache.go @@ -285,9 +285,11 @@ func (iter *dbCacheIterator) Error() error { // dbCacheSnapshot defines a snapshot of the database cache and underlying // database at a particular point in time. type dbCacheSnapshot struct { - dbSnapshot *leveldb.Snapshot - pendingKeys *treap.Immutable - pendingRemove *treap.Immutable + dbSnapshot *leveldb.Snapshot + pendingKeys *treap.Immutable + pendingRemove *treap.Immutable + pendingKeysSnap *treap.SnapRecord + pendingRemoveSnap *treap.SnapRecord } // Has returns whether or not the passed key exists. @@ -327,6 +329,8 @@ func (snap *dbCacheSnapshot) Get(key []byte) []byte { // Release releases the snapshot. func (snap *dbCacheSnapshot) Release() { snap.dbSnapshot.Release() + snap.pendingKeysSnap.Release() + snap.pendingRemoveSnap.Release() snap.pendingKeys = nil snap.pendingRemove = nil } @@ -407,9 +411,11 @@ func (c *dbCache) Snapshot() (*dbCacheSnapshot, error) { // which is used to atomically swap the root. c.cacheLock.RLock() cacheSnapshot := &dbCacheSnapshot{ - dbSnapshot: dbSnapshot, - pendingKeys: c.cachedKeys, - pendingRemove: c.cachedRemove, + dbSnapshot: dbSnapshot, + pendingKeys: c.cachedKeys, + pendingRemove: c.cachedRemove, + pendingKeysSnap: c.cachedKeys.Snapshot(), + pendingRemoveSnap: c.cachedRemove.Snapshot(), } c.cacheLock.RUnlock() return cacheSnapshot, nil @@ -499,12 +505,10 @@ func (c *dbCache) flush() error { // Since the cached keys to be added and removed use an immutable treap, // a snapshot is simply obtaining the root of the tree under the lock // which is used to atomically swap the root. - c.cacheLock.Lock() + c.cacheLock.RLock() cachedKeys := c.cachedKeys cachedRemove := c.cachedRemove - c.cachedKeys = treap.NewImmutable() - c.cachedRemove = treap.NewImmutable() - c.cacheLock.Unlock() + c.cacheLock.RUnlock() // Nothing to do if there is no data to flush. if cachedKeys.Len() == 0 && cachedRemove.Len() == 0 { @@ -516,6 +520,11 @@ func (c *dbCache) flush() error { return err } + c.cacheLock.Lock() + c.cachedKeys = treap.NewImmutable() + c.cachedRemove = treap.NewImmutable() + c.cacheLock.Unlock() + cachedKeys.Recycle() cachedRemove.Recycle() @@ -603,19 +612,23 @@ func (c *dbCache) commitTx(tx *transaction) error { // Apply every key to add in the database transaction to the cache. tx.pendingKeys.ForEach(func(k, v []byte) bool { - newCachedRemove = newCachedRemove.Delete(k) - newCachedKeys = newCachedKeys.Put(k, v) + treap.DeleteM(&newCachedRemove, k, tx.snapshot.pendingRemoveSnap) + treap.PutM(&newCachedKeys, k, v, tx.snapshot.pendingKeysSnap) return true }) + pk := tx.pendingKeys tx.pendingKeys = nil + pk.Recycle() // Apply every key to remove in the database transaction to the cache. tx.pendingRemove.ForEach(func(k, v []byte) bool { - newCachedKeys = newCachedKeys.Delete(k) - newCachedRemove = newCachedRemove.Put(k, nil) + treap.DeleteM(&newCachedKeys, k, tx.snapshot.pendingKeysSnap) + treap.PutM(&newCachedRemove, k, nil, tx.snapshot.pendingRemoveSnap) return true }) + pr := tx.pendingRemove tx.pendingRemove = nil + pr.Recycle() // Atomically replace the immutable treaps which hold the cached keys to // add and delete. @@ -623,6 +636,7 @@ func (c *dbCache) commitTx(tx *transaction) error { c.cachedKeys = newCachedKeys c.cachedRemove = newCachedRemove c.cacheLock.Unlock() + return nil } diff --git a/database/internal/treap/common.go b/database/internal/treap/common.go index 011aaffa..3ab25045 100644 --- a/database/internal/treap/common.go +++ b/database/internal/treap/common.go @@ -6,6 +6,7 @@ package treap import ( "math/rand" + "sync" "time" ) @@ -23,7 +24,7 @@ const ( // size in that case is acceptable since it avoids the need to import // unsafe. It consists of 24-bytes for each key and value + 8 bytes for // each of the priority, left, and right fields (24*2 + 8*3). - nodeFieldsSize = 72 + nodeFieldsSize = 80 ) var ( @@ -33,20 +34,21 @@ var ( emptySlice = make([]byte, 0) ) +const ( + // Generation number for nodes in a Mutable treap. + MutableGeneration int = -1 + // Generation number for nodes in the free Pool. + PoolGeneration int = -2 +) + // treapNode represents a node in the treap. type treapNode struct { - key []byte - value []byte - priority int - left *treapNode - right *treapNode -} - -func (n *treapNode) Reset() { - n.key = nil - n.value = nil - n.left = nil - n.right = nil + key []byte + value []byte + priority int + left *treapNode + right *treapNode + generation int } // nodeSize returns the number of bytes the specified node occupies including @@ -55,10 +57,35 @@ func nodeSize(node *treapNode) uint64 { return nodeFieldsSize + uint64(len(node.key)+len(node.value)) } -// newTreapNode returns a new node from the given key, value, and priority. The +// Pool of treapNode available for reuse. +var nodePool = &sync.Pool{ + New: func() interface{} { + return &treapNode{key: nil, value: nil, priority: 0, generation: PoolGeneration} + }, +} + +// getTreapNode returns a new node from the given key, value, and priority. The // node is not initially linked to any others. -func newTreapNode(key, value []byte, priority int) *treapNode { - return &treapNode{key: key, value: value, priority: priority} +func getTreapNode(key, value []byte, priority int, generation int) *treapNode { + n := nodePool.Get().(*treapNode) + n.key = key + n.value = value + n.priority = priority + n.left = nil + n.right = nil + n.generation = generation + return n +} + +// Put treapNode back in the nodePool for reuse. +func putTreapNode(n *treapNode) { + n.key = nil + n.value = nil + n.priority = 0 + n.left = nil + n.right = nil + n.generation = PoolGeneration + nodePool.Put(n) } // parentStack represents a stack of parent treap nodes that are used during diff --git a/database/internal/treap/immutable.go b/database/internal/treap/immutable.go index a23c6f70..a9d59fb3 100644 --- a/database/internal/treap/immutable.go +++ b/database/internal/treap/immutable.go @@ -10,14 +10,9 @@ import ( "sync" ) -var nodePool = &sync.Pool{New: func() interface{} { return newTreapNode(nil, nil, 0) }} - // cloneTreapNode returns a shallow copy of the passed node. func cloneTreapNode(node *treapNode) *treapNode { - clone := nodePool.Get().(*treapNode) - clone.key = node.key - clone.value = node.value - clone.priority = node.priority + clone := getTreapNode(node.key, node.value, node.priority, node.generation+1) clone.left = node.left clone.right = node.right return clone @@ -46,11 +41,19 @@ type Immutable struct { // totalSize is the best estimate of the total size of of all data in // the treap including the keys, values, and node sizes. totalSize uint64 + + // generation number starts at 0 after NewImmutable(), and + // is incremented with every Put()/Delete(). + generation int + + // snap is a pointer to a node in snapshot history linked list. + // A value nil means no snapshots are outstanding. + snap *SnapRecord } // newImmutable returns a new immutable treap given the passed parameters. -func newImmutable(root *treapNode, count int, totalSize uint64) *Immutable { - return &Immutable{root: root, count: count, totalSize: totalSize} +func newImmutable(root *treapNode, count int, totalSize uint64, generation int, snap *SnapRecord) *Immutable { + return &Immutable{root: root, count: count, totalSize: totalSize, generation: generation, snap: snap} } // Len returns the number of items stored in the treap. @@ -107,8 +110,8 @@ func (t *Immutable) Get(key []byte) []byte { return nil } -// Put inserts the passed key/value pair. -func (t *Immutable) Put(key, value []byte) *Immutable { +// put inserts the passed key/value pair. +func (t *Immutable) put(key, value []byte, bumpGen int) (tp *Immutable, old parentStack) { // Use an empty byte slice for the value when none was provided. This // ultimately allows key existence to be determined from the value since // an empty byte slice is distinguishable from nil. @@ -118,8 +121,8 @@ func (t *Immutable) Put(key, value []byte) *Immutable { // The node is the root of the tree if there isn't already one. if t.root == nil { - root := newTreapNode(key, value, rand.Int()) - return newImmutable(root, 1, nodeSize(root)) + root := getTreapNode(key, value, rand.Int(), t.generation+bumpGen) + return newImmutable(root, 1, nodeSize(root), t.generation+bumpGen, t.snap), parentStack{} } // Find the binary tree insertion point and construct a replaced list of @@ -131,9 +134,11 @@ func (t *Immutable) Put(key, value []byte) *Immutable { // When the key matches an entry already in the treap, replace the node // with a new one that has the new value set and return. var parents parentStack + var oldParents parentStack var compareResult int for node := t.root; node != nil; { // Clone the node and link its parent to it if needed. + oldParents.Push(node) nodeCopy := cloneTreapNode(node) if oldParent := parents.At(0); oldParent != nil { if oldParent.left == node { @@ -164,14 +169,11 @@ func (t *Immutable) Put(key, value []byte) *Immutable { newRoot := parents.At(parents.Len() - 1) newTotalSize := t.totalSize - uint64(len(node.value)) + uint64(len(value)) - return newImmutable(newRoot, t.count, newTotalSize) + return newImmutable(newRoot, t.count, newTotalSize, t.generation+bumpGen, t.snap), oldParents } // Link the new node into the binary tree in the correct position. - node := nodePool.Get().(*treapNode) - node.key = key - node.value = value - node.priority = rand.Int() + node := getTreapNode(key, value, rand.Int(), t.generation+bumpGen) parent := parents.At(0) if compareResult < 0 { parent.left = node @@ -211,19 +213,59 @@ func (t *Immutable) Put(key, value []byte) *Immutable { } } - return newImmutable(newRoot, t.count+1, t.totalSize+nodeSize(node)) + return newImmutable(newRoot, t.count+1, t.totalSize+nodeSize(node), t.generation+bumpGen, t.snap), oldParents } -// Delete removes the passed key from the treap and returns the resulting treap +// Put is the immutable variant of put. Generation number is bumped, and old +// nodes become garbage unless referenced elswhere. +func (t *Immutable) Put(key, value []byte) *Immutable { + tp, _ := t.put(key, value, 1) + return tp +} + +// PutM is the mutable variant of put. Generation number is NOT bumped, and old +// nodes are recycled if possible. This is only safe/useful in scenarios where +// multiple Put/Delete() ops are applied to a unique treap and no snapshots/aliases +// of the intermediate treap states are created or desired. For example: +// +// for i := range keys { +// t = t.Put(keys[i]) +// } +// +// ...may be replaced with: +// +// for i := range keys { +// PutM(t, keys[i], nil) +// } +// +// If "excluded" is provided, that snapshot is ignored when counting +// snapshot records. +// +func PutM(dest **Immutable, key, value []byte, excluded *SnapRecord) { + tp, old := (*dest).put(key, value, 0) + // Examine old nodes and recycle if possible. + snapRecordMutex.Lock() + defer snapRecordMutex.Unlock() + snapCount := (*dest).snapCount(excluded) + for old.Len() > 0 { + node := old.Pop() + if node.generation == tp.generation && snapCount == 0 { + putTreapNode(node) + } + } + *dest = tp +} + +// del removes the passed key from the treap and returns the resulting treap // if it exists. The original immutable treap is returned if the key does not // exist. -func (t *Immutable) Delete(key []byte) *Immutable { +func (t *Immutable) del(key []byte, bumpGen int) (d *Immutable, old parentStack) { // Find the node for the key while constructing a list of parents while // doing so. - var parents parentStack + var oldParents parentStack var delNode *treapNode for node := t.root; node != nil; { - parents.Push(node) + oldParents.Push(node) // Traverse left or right depending on the result of the // comparison. @@ -244,14 +286,14 @@ func (t *Immutable) Delete(key []byte) *Immutable { // There is nothing to do if the key does not exist. if delNode == nil { - return t + return t, parentStack{} } // When the only node in the tree is the root node and it is the one // being deleted, there is nothing else to do besides removing it. - parent := parents.At(1) + parent := oldParents.At(1) if parent == nil && delNode.left == nil && delNode.right == nil { - return newImmutable(nil, 0, 0) + return newImmutable(nil, 0, 0, t.generation+bumpGen, t.snap), oldParents } // Construct a replaced list of parents and the node to delete itself. @@ -259,8 +301,8 @@ func (t *Immutable) Delete(key []byte) *Immutable { // therefore all ancestors of the node that will be deleted, up to and // including the root, need to be replaced. var newParents parentStack - for i := parents.Len(); i > 0; i-- { - node := parents.At(i - 1) + for i := oldParents.Len(); i > 0; i-- { + node := oldParents.At(i - 1) nodeCopy := cloneTreapNode(node) if oldParent := newParents.At(0); oldParent != nil { if oldParent.left == node { @@ -332,7 +374,47 @@ func (t *Immutable) Delete(key []byte) *Immutable { parent.left = nil } - return newImmutable(newRoot, t.count-1, t.totalSize-nodeSize(delNode)) + return newImmutable(newRoot, t.count-1, t.totalSize-nodeSize(delNode), t.generation+bumpGen, t.snap), oldParents +} + +// Delete is the immutable variant of del. Generation number is bumped, and old +// nodes become garbage unless referenced elswhere. +func (t *Immutable) Delete(key []byte) *Immutable { + tp, _ := t.del(key, 1) + return tp +} + +// DeleteM is the mutable variant of del. Generation number is NOT bumped, and old +// nodes are recycled if possible. This is only safe/useful in scenarios where +// multiple Put/Delete() ops are applied to a unique treap and no snapshots/aliases +// of the intermediate treap states are created or desired. For example: +// +// for i := range keys { +// t = t.Delete(keys[i]) +// } +// +// ...may be replaced with: +// +// for i := range keys { +// DeleteM(t, keys[i], nil) +// } +// +// If "excluded" is provided, that snapshot is ignored when counting +// snapshot records. +// +func DeleteM(dest **Immutable, key []byte, excluded *SnapRecord) { + tp, old := (*dest).del(key, 0) + // Examine old nodes and recycle if possible. + snapRecordMutex.Lock() + defer snapRecordMutex.Unlock() + snapCount := (*dest).snapCount(excluded) + for old.Len() > 0 { + node := old.Pop() + if node.generation == tp.generation && snapCount == 0 { + putTreapNode(node) + } + } + *dest = tp } // ForEach invokes the passed function with every key/value pair in the treap @@ -365,7 +447,79 @@ func NewImmutable() *Immutable { return &Immutable{} } +// SnapRecord assists in tracking/releasing outstanding snapshots. +type SnapRecord struct { + prev *SnapRecord + next *SnapRecord +} + +var snapRecordMutex sync.Mutex + +// Snapshot makes a SnapRecord and linkis it into the snapshot history of a treap. +func (t *Immutable) Snapshot() *SnapRecord { + snapRecordMutex.Lock() + defer snapRecordMutex.Unlock() + + // Link this record so it follows the existing t.snap record, if any. + prev := t.snap + var next *SnapRecord = nil + if prev != nil { + next = prev.next + } + t.snap = &SnapRecord{prev: prev, next: next} + if prev != nil { + prev.next = t.snap + } + + return t.snap +} + +// Release of SnapRecord unlinks that record from the snapshot history of a treap. +func (r *SnapRecord) Release() { + snapRecordMutex.Lock() + defer snapRecordMutex.Unlock() + + // Unlink this record. + if r.prev != nil { + r.prev.next = r.next + } + if r.next != nil { + r.next.prev = r.prev + } +} + +// snapCount returns the number of snapshots outstanding which were created +// but not released. When snapshots are absent, mutable PutM()/DeleteM() can +// recycle nodes more aggressively. The record "exclude" is not counted. +func (t *Immutable) snapCount(exclude *SnapRecord) int { + // snapRecordMutex should be locked already + + sum := 0 + if t.snap == nil { + // No snapshots. + return sum + } + + // Count snapshots taken BEFORE creation of this instance. + for h := t.snap; h != nil; h = h.prev { + if h != exclude { + sum++ + } + } + + // Count snapshots taken AFTER creation of this instance. + for h := t.snap.next; h != nil; h = h.next { + if h != exclude { + sum++ + } + } + + return sum +} + func (t *Immutable) Recycle() { + snapCount := t.snapCount(nil) - 1 + var parents parentStack for node := t.root; node != nil; node = node.left { parents.Push(node) @@ -380,7 +534,8 @@ func (t *Immutable) Recycle() { parents.Push(n) } - node.Reset() - nodePool.Put(node) + if node.generation == t.generation && snapCount == 0 { + putTreapNode(node) + } } } diff --git a/database/internal/treap/mutable.go b/database/internal/treap/mutable.go index c68f326e..1ded6808 100644 --- a/database/internal/treap/mutable.go +++ b/database/internal/treap/mutable.go @@ -113,7 +113,7 @@ func (t *Mutable) Put(key, value []byte) { // The node is the root of the tree if there isn't already one. if t.root == nil { - node := newTreapNode(key, value, rand.Int()) + node := getTreapNode(key, value, rand.Int(), MutableGeneration) t.count = 1 t.totalSize = nodeSize(node) t.root = node @@ -145,10 +145,7 @@ func (t *Mutable) Put(key, value []byte) { } // Link the new node into the binary tree in the correct position. - node := nodePool.Get().(*treapNode) - node.key = key - node.value = value - node.priority = rand.Int() + node := getTreapNode(key, value, rand.Int(), MutableGeneration) t.count++ t.totalSize += nodeSize(node) parent := parents.At(0) @@ -193,6 +190,7 @@ func (t *Mutable) Delete(key []byte) { t.root = nil t.count = 0 t.totalSize = 0 + putTreapNode(node) return } @@ -241,6 +239,7 @@ func (t *Mutable) Delete(key []byte) { } t.count-- t.totalSize -= nodeSize(node) + putTreapNode(node) } // ForEach invokes the passed function with every key/value pair in the treap @@ -295,7 +294,8 @@ func (t *Mutable) Recycle() { parents.Push(n) } - node.Reset() - nodePool.Put(node) + if node.generation == MutableGeneration { + putTreapNode(node) + } } } -- 2.45.2 From d2bc36bc8c065167a39b4bc0aa2b598a51a1686f Mon Sep 17 00:00:00 2001 From: Jonathan Moody <103143855+moodyjon@users.noreply.github.com> Date: Thu, 2 Jun 2022 14:03:26 -0400 Subject: [PATCH 3/3] Fix lots of flaws in treap snapshot tracking. Update tests to make them work. --- database/ffldb/dbcache.go | 45 ++++-- database/ffldb/whitebox_test.go | 2 +- database/internal/treap/common.go | 36 +++-- database/internal/treap/common_test.go | 2 +- database/internal/treap/immutable.go | 181 ++++++++++++++++--------- database/internal/treap/mutable.go | 20 ++- database/internal/treap/treapiter.go | 1 + 7 files changed, 191 insertions(+), 96 deletions(-) diff --git a/database/ffldb/dbcache.go b/database/ffldb/dbcache.go index 52c6737f..9a690c22 100644 --- a/database/ffldb/dbcache.go +++ b/database/ffldb/dbcache.go @@ -290,6 +290,7 @@ type dbCacheSnapshot struct { pendingRemove *treap.Immutable pendingKeysSnap *treap.SnapRecord pendingRemoveSnap *treap.SnapRecord + cacheFlushed int } // Has returns whether or not the passed key exists. @@ -329,8 +330,18 @@ func (snap *dbCacheSnapshot) Get(key []byte) []byte { // Release releases the snapshot. func (snap *dbCacheSnapshot) Release() { snap.dbSnapshot.Release() - snap.pendingKeysSnap.Release() - snap.pendingRemoveSnap.Release() + if snap.cacheFlushed > 0 && snap.pendingKeys != nil { + snap.pendingKeys.Recycle(snap.pendingKeysSnap) + } + if snap.cacheFlushed > 0 && snap.pendingRemove != nil { + snap.pendingRemove.Recycle(snap.pendingRemoveSnap) + } + if snap.pendingKeysSnap != nil { + snap.pendingKeysSnap.Release() + } + if snap.pendingRemoveSnap != nil { + snap.pendingRemoveSnap.Release() + } snap.pendingKeys = nil snap.pendingRemove = nil } @@ -411,11 +422,15 @@ func (c *dbCache) Snapshot() (*dbCacheSnapshot, error) { // which is used to atomically swap the root. c.cacheLock.RLock() cacheSnapshot := &dbCacheSnapshot{ - dbSnapshot: dbSnapshot, - pendingKeys: c.cachedKeys, - pendingRemove: c.cachedRemove, - pendingKeysSnap: c.cachedKeys.Snapshot(), - pendingRemoveSnap: c.cachedRemove.Snapshot(), + dbSnapshot: dbSnapshot, + pendingKeys: c.cachedKeys, + pendingRemove: c.cachedRemove, + } + if cacheSnapshot.pendingKeys != nil { + cacheSnapshot.pendingKeysSnap = cacheSnapshot.pendingKeys.Snapshot() + } + if cacheSnapshot.pendingRemove != nil { + cacheSnapshot.pendingRemoveSnap = cacheSnapshot.pendingRemove.Snapshot() } c.cacheLock.RUnlock() return cacheSnapshot, nil @@ -491,7 +506,7 @@ func (c *dbCache) commitTreaps(pendingKeys, pendingRemove TreapForEacher) error // cache to the underlying database. // // This function MUST be called with the database write lock held. -func (c *dbCache) flush() error { +func (c *dbCache) flush(tx *transaction) error { c.lastFlush = time.Now() // Sync the current write file associated with the block store. This is @@ -525,8 +540,14 @@ func (c *dbCache) flush() error { c.cachedRemove = treap.NewImmutable() c.cacheLock.Unlock() - cachedKeys.Recycle() - cachedRemove.Recycle() + cachedKeys.Recycle(nil) + cachedRemove.Recycle(nil) + + // Make a note that cache was flushed so tx.snapshot.Release() + // can also call Recycle() to free more nodes. + if tx != nil && tx.snapshot != nil { + tx.snapshot.cacheFlushed++ + } return nil } @@ -576,7 +597,7 @@ func (c *dbCache) commitTx(tx *transaction) error { // Flush the cache and write the current transaction directly to the // database if a flush is needed. if c.needsFlush(tx) { - if err := c.flush(); err != nil { + if err := c.flush(tx); err != nil { return err } @@ -646,7 +667,7 @@ func (c *dbCache) commitTx(tx *transaction) error { // This function MUST be called with the database write lock held. func (c *dbCache) Close() error { // Flush any outstanding cached entries to disk. - if err := c.flush(); err != nil { + if err := c.flush(nil); err != nil { // Even if there is an error while flushing, attempt to close // the underlying database. The error is ignored since it would // mask the flush error. diff --git a/database/ffldb/whitebox_test.go b/database/ffldb/whitebox_test.go index 31a8ab34..47ac6b74 100644 --- a/database/ffldb/whitebox_test.go +++ b/database/ffldb/whitebox_test.go @@ -319,7 +319,7 @@ func testWriteFailures(tc *testContext) bool { file: &mockFile{forceSyncErr: true, maxSize: -1}, } store.writeCursor.Unlock() - err := tc.db.(*db).cache.flush() + err := tc.db.(*db).cache.flush(nil) if !checkDbError(tc.t, testName, err, database.ErrDriverSpecific) { return false } diff --git a/database/internal/treap/common.go b/database/internal/treap/common.go index 3ab25045..8df86872 100644 --- a/database/internal/treap/common.go +++ b/database/internal/treap/common.go @@ -24,7 +24,7 @@ const ( // size in that case is acceptable since it avoids the need to import // unsafe. It consists of 24-bytes for each key and value + 8 bytes for // each of the priority, left, and right fields (24*2 + 8*3). - nodeFieldsSize = 80 + nodeFieldsSize = 96 ) var ( @@ -35,10 +35,12 @@ var ( ) const ( - // Generation number for nodes in a Mutable treap. - MutableGeneration int = -1 - // Generation number for nodes in the free Pool. - PoolGeneration int = -2 + // mutableGeneration is the generation number for nodes in a Mutable treap. + mutableGeneration int = -1 + // recycleGeneration indicates node is scheduled for recycling back to nodePool. + recycleGeneration int = -2 + // poolGeneration is the generation number for free nodes in the nodePool. + poolGeneration int = -3 ) // treapNode represents a node in the treap. @@ -49,6 +51,7 @@ type treapNode struct { left *treapNode right *treapNode generation int + next *treapNode } // nodeSize returns the number of bytes the specified node occupies including @@ -57,15 +60,15 @@ func nodeSize(node *treapNode) uint64 { return nodeFieldsSize + uint64(len(node.key)+len(node.value)) } -// Pool of treapNode available for reuse. -var nodePool = &sync.Pool{ - New: func() interface{} { - return &treapNode{key: nil, value: nil, priority: 0, generation: PoolGeneration} - }, +func poolNewTreapNode() interface{} { + return &treapNode{key: nil, value: nil, priority: 0, generation: poolGeneration} } -// getTreapNode returns a new node from the given key, value, and priority. The -// node is not initially linked to any others. +// Pool of treapNode available for reuse. +var nodePool sync.Pool = sync.Pool{New: poolNewTreapNode} + +// getTreapNode returns a node from nodePool with the given key, value, priority, +// and generation. The node is not initially linked to any others. func getTreapNode(key, value []byte, priority int, generation int) *treapNode { n := nodePool.Get().(*treapNode) n.key = key @@ -74,17 +77,22 @@ func getTreapNode(key, value []byte, priority int, generation int) *treapNode { n.left = nil n.right = nil n.generation = generation + n.next = nil return n } -// Put treapNode back in the nodePool for reuse. +// putTreapNode returns a node back to nodePool for reuse. func putTreapNode(n *treapNode) { + if n.generation <= poolGeneration { + panic("double free of treapNode detected") + } n.key = nil n.value = nil n.priority = 0 n.left = nil n.right = nil - n.generation = PoolGeneration + n.generation = poolGeneration + n.next = nil nodePool.Put(n) } diff --git a/database/internal/treap/common_test.go b/database/internal/treap/common_test.go index c43e678d..6cd1363b 100644 --- a/database/internal/treap/common_test.go +++ b/database/internal/treap/common_test.go @@ -49,7 +49,7 @@ testLoop: for j := 0; j < test.numNodes; j++ { var key [4]byte binary.BigEndian.PutUint32(key[:], uint32(j)) - node := newTreapNode(key[:], key[:], 0) + node := getTreapNode(key[:], key[:], 0, 0) nodes = append(nodes, node) } diff --git a/database/internal/treap/immutable.go b/database/internal/treap/immutable.go index a9d59fb3..be58a8af 100644 --- a/database/internal/treap/immutable.go +++ b/database/internal/treap/immutable.go @@ -48,11 +48,11 @@ type Immutable struct { // snap is a pointer to a node in snapshot history linked list. // A value nil means no snapshots are outstanding. - snap *SnapRecord + snap **SnapRecord } // newImmutable returns a new immutable treap given the passed parameters. -func newImmutable(root *treapNode, count int, totalSize uint64, generation int, snap *SnapRecord) *Immutable { +func newImmutable(root *treapNode, count int, totalSize uint64, generation int, snap **SnapRecord) *Immutable { return &Immutable{root: root, count: count, totalSize: totalSize, generation: generation, snap: snap} } @@ -111,7 +111,7 @@ func (t *Immutable) Get(key []byte) []byte { } // put inserts the passed key/value pair. -func (t *Immutable) put(key, value []byte, bumpGen int) (tp *Immutable, old parentStack) { +func (t *Immutable) put(key, value []byte) (tp *Immutable, old parentStack) { // Use an empty byte slice for the value when none was provided. This // ultimately allows key existence to be determined from the value since // an empty byte slice is distinguishable from nil. @@ -121,8 +121,8 @@ func (t *Immutable) put(key, value []byte, bumpGen int) (tp *Immutable, old pare // The node is the root of the tree if there isn't already one. if t.root == nil { - root := getTreapNode(key, value, rand.Int(), t.generation+bumpGen) - return newImmutable(root, 1, nodeSize(root), t.generation+bumpGen, t.snap), parentStack{} + root := getTreapNode(key, value, rand.Int(), t.generation+1) + return newImmutable(root, 1, nodeSize(root), t.generation+1, t.snap), parentStack{} } // Find the binary tree insertion point and construct a replaced list of @@ -169,11 +169,11 @@ func (t *Immutable) put(key, value []byte, bumpGen int) (tp *Immutable, old pare newRoot := parents.At(parents.Len() - 1) newTotalSize := t.totalSize - uint64(len(node.value)) + uint64(len(value)) - return newImmutable(newRoot, t.count, newTotalSize, t.generation+bumpGen, t.snap), oldParents + return newImmutable(newRoot, t.count, newTotalSize, t.generation+1, t.snap), oldParents } // Link the new node into the binary tree in the correct position. - node := getTreapNode(key, value, rand.Int(), t.generation+bumpGen) + node := getTreapNode(key, value, rand.Int(), t.generation+1) parent := parents.At(0) if compareResult < 0 { parent.left = node @@ -213,20 +213,21 @@ func (t *Immutable) put(key, value []byte, bumpGen int) (tp *Immutable, old pare } } - return newImmutable(newRoot, t.count+1, t.totalSize+nodeSize(node), t.generation+bumpGen, t.snap), oldParents + return newImmutable(newRoot, t.count+1, t.totalSize+nodeSize(node), t.generation+1, t.snap), oldParents } -// Put is the immutable variant of put. Generation number is bumped, and old -// nodes become garbage unless referenced elswhere. +// Put is the immutable variant of put. Old nodes become garbage unless referenced elswhere. func (t *Immutable) Put(key, value []byte) *Immutable { - tp, _ := t.put(key, value, 1) + tp, _ := t.put(key, value) return tp } -// PutM is the mutable variant of put. Generation number is NOT bumped, and old -// nodes are recycled if possible. This is only safe/useful in scenarios where -// multiple Put/Delete() ops are applied to a unique treap and no snapshots/aliases -// of the intermediate treap states are created or desired. For example: +// PutM is the mutable variant of put. Old nodes are recycled if possible. This is +// only safe in structured scenarios using SnapRecord to track treap instances. +// The outstanding SnapRecords serve to protect nodes from recycling when they might +// be present in one or more snapshots. This is useful in scenarios where multiple +// Put/Delete() ops are applied to a treap and intermediate treap states are not +// created or desired. For example: // // for i := range keys { // t = t.Put(keys[i]) @@ -242,15 +243,20 @@ func (t *Immutable) Put(key, value []byte) *Immutable { // snapshot records. // func PutM(dest **Immutable, key, value []byte, excluded *SnapRecord) { - tp, old := (*dest).put(key, value, 0) + tp, old := (*dest).put(key, value) // Examine old nodes and recycle if possible. snapRecordMutex.Lock() defer snapRecordMutex.Unlock() - snapCount := (*dest).snapCount(excluded) + snapCount, maxSnap, minSnap := (*dest).snapCount(nil) for old.Len() > 0 { node := old.Pop() - if node.generation == tp.generation && snapCount == 0 { + if snapCount == 0 || node.generation > maxSnap.generation { putTreapNode(node) + } else { + // Defer recycle until Release() on oldest snap (minSnap). + node.generation = recycleGeneration + node.next = minSnap.recycle + minSnap.recycle = node } } *dest = tp @@ -259,7 +265,7 @@ func PutM(dest **Immutable, key, value []byte, excluded *SnapRecord) { // del removes the passed key from the treap and returns the resulting treap // if it exists. The original immutable treap is returned if the key does not // exist. -func (t *Immutable) del(key []byte, bumpGen int) (d *Immutable, old parentStack) { +func (t *Immutable) del(key []byte) (d *Immutable, old parentStack) { // Find the node for the key while constructing a list of parents while // doing so. var oldParents parentStack @@ -293,7 +299,7 @@ func (t *Immutable) del(key []byte, bumpGen int) (d *Immutable, old parentStack) // being deleted, there is nothing else to do besides removing it. parent := oldParents.At(1) if parent == nil && delNode.left == nil && delNode.right == nil { - return newImmutable(nil, 0, 0, t.generation+bumpGen, t.snap), oldParents + return newImmutable(nil, 0, 0, t.generation+1, t.snap), oldParents } // Construct a replaced list of parents and the node to delete itself. @@ -374,20 +380,21 @@ func (t *Immutable) del(key []byte, bumpGen int) (d *Immutable, old parentStack) parent.left = nil } - return newImmutable(newRoot, t.count-1, t.totalSize-nodeSize(delNode), t.generation+bumpGen, t.snap), oldParents + return newImmutable(newRoot, t.count-1, t.totalSize-nodeSize(delNode), t.generation+1, t.snap), oldParents } -// Delete is the immutable variant of del. Generation number is bumped, and old -// nodes become garbage unless referenced elswhere. +// Delete is the immutable variant of del. Old nodes become garbage unless referenced elswhere. func (t *Immutable) Delete(key []byte) *Immutable { - tp, _ := t.del(key, 1) + tp, _ := t.del(key) return tp } -// DeleteM is the mutable variant of del. Generation number is NOT bumped, and old -// nodes are recycled if possible. This is only safe/useful in scenarios where -// multiple Put/Delete() ops are applied to a unique treap and no snapshots/aliases -// of the intermediate treap states are created or desired. For example: +// DeleteM is the mutable variant of del. Old nodes are recycled if possible. This is +// only safe in structured scenarios using SnapRecord to track treap instances. +// The outstanding SnapRecords serve to protect nodes from recycling when they might +// be present in one or more snapshots. This is useful in scenarios where multiple +// Put/Delete() ops are applied to a treap and intermediate treap states are not +// created or desired. For example: // // for i := range keys { // t = t.Delete(keys[i]) @@ -403,15 +410,20 @@ func (t *Immutable) Delete(key []byte) *Immutable { // snapshot records. // func DeleteM(dest **Immutable, key []byte, excluded *SnapRecord) { - tp, old := (*dest).del(key, 0) + tp, old := (*dest).del(key) // Examine old nodes and recycle if possible. snapRecordMutex.Lock() defer snapRecordMutex.Unlock() - snapCount := (*dest).snapCount(excluded) + snapCount, maxSnap, minSnap := (*dest).snapCount(nil) for old.Len() > 0 { node := old.Pop() - if node.generation == tp.generation && snapCount == 0 { + if snapCount == 0 || node.generation > maxSnap.generation { putTreapNode(node) + } else { + // Defer recycle until Release() on oldest snap (minSnap). + node.generation = recycleGeneration + node.next = minSnap.recycle + minSnap.recycle = node } } *dest = tp @@ -447,31 +459,45 @@ func NewImmutable() *Immutable { return &Immutable{} } -// SnapRecord assists in tracking/releasing outstanding snapshots. +// SnapRecord assists in tracking outstanding snapshots. While a SnapRecord +// is present and has not been Released(), treap nodes at or below this +// generation are protected from Recycle(). type SnapRecord struct { - prev *SnapRecord - next *SnapRecord + generation int + rp **SnapRecord + prev *SnapRecord + next *SnapRecord + recycle *treapNode } var snapRecordMutex sync.Mutex -// Snapshot makes a SnapRecord and linkis it into the snapshot history of a treap. +// Snapshot makes a SnapRecord and links it into the snapshot history of a treap. func (t *Immutable) Snapshot() *SnapRecord { snapRecordMutex.Lock() defer snapRecordMutex.Unlock() - // Link this record so it follows the existing t.snap record, if any. - prev := t.snap + rp := t.snap var next *SnapRecord = nil - if prev != nil { - next = prev.next - } - t.snap = &SnapRecord{prev: prev, next: next} - if prev != nil { - prev.next = t.snap + var prev *SnapRecord = nil + if rp != nil { + prev = *rp + if *rp != nil { + next = (*rp).next + } } - return t.snap + // Create a new record stamped with the current generation. Link it + // following the existing snapshot record, if any. + p := new(*SnapRecord) + *p = &SnapRecord{generation: t.generation, rp: p, prev: prev, next: next} + t.snap = p + + if rp != nil && *rp != nil { + (*rp).next = *(t.snap) + } + + return *(t.snap) } // Release of SnapRecord unlinks that record from the snapshot history of a treap. @@ -480,45 +506,73 @@ func (r *SnapRecord) Release() { defer snapRecordMutex.Unlock() // Unlink this record. - if r.prev != nil { - r.prev.next = r.next - } + *(r.rp) = nil if r.next != nil { r.next.prev = r.prev + *(r.rp) = r.next + } + if r.prev != nil { + r.prev.next = r.next + *(r.rp) = r.prev + } + + // Handle deferred recycle list. + for node := r.recycle; node != nil; { + next := node.next + putTreapNode(node) + node = next } } // snapCount returns the number of snapshots outstanding which were created // but not released. When snapshots are absent, mutable PutM()/DeleteM() can -// recycle nodes more aggressively. The record "exclude" is not counted. -func (t *Immutable) snapCount(exclude *SnapRecord) int { +// recycle nodes more aggressively. The record "excluded" is not counted. +func (t *Immutable) snapCount(excluded *SnapRecord) (count int, maxSnap, minSnap *SnapRecord) { // snapRecordMutex should be locked already - sum := 0 - if t.snap == nil { + count, maxSnap, minSnap = 0, nil, nil + if t.snap == nil || *(t.snap) == nil { // No snapshots. - return sum + return count, maxSnap, minSnap } // Count snapshots taken BEFORE creation of this instance. - for h := t.snap; h != nil; h = h.prev { - if h != exclude { - sum++ + for h := *(t.snap); h != nil; h = h.prev { + if h != excluded { + count++ + if maxSnap == nil || maxSnap.generation < h.generation { + maxSnap = h + } + if minSnap == nil || minSnap.generation > h.generation { + minSnap = h + } } } // Count snapshots taken AFTER creation of this instance. - for h := t.snap.next; h != nil; h = h.next { - if h != exclude { - sum++ + for h := (*(t.snap)).next; h != nil; h = h.next { + if h != excluded { + count++ + if maxSnap == nil || maxSnap.generation < h.generation { + maxSnap = h + } + if minSnap == nil || minSnap.generation > h.generation { + minSnap = h + } } } - return sum + return count, maxSnap, minSnap } -func (t *Immutable) Recycle() { - snapCount := t.snapCount(nil) - 1 +func (t *Immutable) Recycle(excluded *SnapRecord) { + snapRecordMutex.Lock() + _, maxSnap, _ := t.snapCount(excluded) + snapGen := 0 + if maxSnap != nil { + snapGen = maxSnap.generation + } + snapRecordMutex.Unlock() var parents parentStack for node := t.root; node != nil; node = node.left { @@ -534,7 +588,10 @@ func (t *Immutable) Recycle() { parents.Push(n) } - if node.generation == t.generation && snapCount == 0 { + // Recycle node if it cannot be in a snapshot. Note that nodes + // scheduled for deferred recycling will have negative generation + // (recycleGeneration) and will not qualify. + if node.generation > snapGen { putTreapNode(node) } } diff --git a/database/internal/treap/mutable.go b/database/internal/treap/mutable.go index 1ded6808..3289d318 100644 --- a/database/internal/treap/mutable.go +++ b/database/internal/treap/mutable.go @@ -21,6 +21,10 @@ type Mutable struct { // totalSize is the best estimate of the total size of of all data in // the treap including the keys, values, and node sizes. totalSize uint64 + + // generation number is the constant mutableGeneration, unless + // creation of a treap.Iterator bumps it. + generation int } // Len returns the number of items stored in the treap. @@ -113,7 +117,7 @@ func (t *Mutable) Put(key, value []byte) { // The node is the root of the tree if there isn't already one. if t.root == nil { - node := getTreapNode(key, value, rand.Int(), MutableGeneration) + node := getTreapNode(key, value, rand.Int(), t.generation) t.count = 1 t.totalSize = nodeSize(node) t.root = node @@ -145,7 +149,7 @@ func (t *Mutable) Put(key, value []byte) { } // Link the new node into the binary tree in the correct position. - node := getTreapNode(key, value, rand.Int(), MutableGeneration) + node := getTreapNode(key, value, rand.Int(), t.generation) t.count++ t.totalSize += nodeSize(node) parent := parents.At(0) @@ -190,7 +194,9 @@ func (t *Mutable) Delete(key []byte) { t.root = nil t.count = 0 t.totalSize = 0 - putTreapNode(node) + if node.generation == t.generation && node.generation == mutableGeneration { + putTreapNode(node) + } return } @@ -239,7 +245,9 @@ func (t *Mutable) Delete(key []byte) { } t.count-- t.totalSize -= nodeSize(node) - putTreapNode(node) + if node.generation == t.generation && node.generation == mutableGeneration { + putTreapNode(node) + } } // ForEach invokes the passed function with every key/value pair in the treap @@ -276,7 +284,7 @@ func (t *Mutable) Reset() { // NewMutable returns a new empty mutable treap ready for use. See the // documentation for the Mutable structure for more details. func NewMutable() *Mutable { - return &Mutable{} + return &Mutable{generation: mutableGeneration} } func (t *Mutable) Recycle() { @@ -294,7 +302,7 @@ func (t *Mutable) Recycle() { parents.Push(n) } - if node.generation == MutableGeneration { + if node.generation == t.generation && node.generation == mutableGeneration { putTreapNode(node) } } diff --git a/database/internal/treap/treapiter.go b/database/internal/treap/treapiter.go index d6981aaf..81028ed7 100644 --- a/database/internal/treap/treapiter.go +++ b/database/internal/treap/treapiter.go @@ -326,6 +326,7 @@ func (iter *Iterator) ForceReseek() { // } // } func (t *Mutable) Iterator(startKey, limitKey []byte) *Iterator { + t.generation++ iter := &Iterator{ t: t, root: t.root, -- 2.45.2