Distinguish between Mutable/Immutable treap node recycling, and make Mutable

treap recycle nodes consistently. Rework Immutable treap node recycling
attempting to make it safer in the presence of code that takes snapshots
(dbCacheSnapshot) of the treap. Add special mutable PutM and DeleteM
methods which DB transaction can use to apply changes more efficiently
without creating lots of garbage memory.
This commit is contained in:
Jonathan Moody 2022-05-28 07:37:53 -04:00
parent a7a30339cb
commit 0ab37e1541
4 changed files with 263 additions and 67 deletions

View file

@ -285,9 +285,11 @@ func (iter *dbCacheIterator) Error() error {
// dbCacheSnapshot defines a snapshot of the database cache and underlying // dbCacheSnapshot defines a snapshot of the database cache and underlying
// database at a particular point in time. // database at a particular point in time.
type dbCacheSnapshot struct { type dbCacheSnapshot struct {
dbSnapshot *leveldb.Snapshot dbSnapshot *leveldb.Snapshot
pendingKeys *treap.Immutable pendingKeys *treap.Immutable
pendingRemove *treap.Immutable pendingRemove *treap.Immutable
pendingKeysSnap *treap.SnapRecord
pendingRemoveSnap *treap.SnapRecord
} }
// Has returns whether or not the passed key exists. // Has returns whether or not the passed key exists.
@ -327,6 +329,8 @@ func (snap *dbCacheSnapshot) Get(key []byte) []byte {
// Release releases the snapshot. // Release releases the snapshot.
func (snap *dbCacheSnapshot) Release() { func (snap *dbCacheSnapshot) Release() {
snap.dbSnapshot.Release() snap.dbSnapshot.Release()
snap.pendingKeysSnap.Release()
snap.pendingRemoveSnap.Release()
snap.pendingKeys = nil snap.pendingKeys = nil
snap.pendingRemove = nil snap.pendingRemove = nil
} }
@ -407,9 +411,11 @@ func (c *dbCache) Snapshot() (*dbCacheSnapshot, error) {
// which is used to atomically swap the root. // which is used to atomically swap the root.
c.cacheLock.RLock() c.cacheLock.RLock()
cacheSnapshot := &dbCacheSnapshot{ cacheSnapshot := &dbCacheSnapshot{
dbSnapshot: dbSnapshot, dbSnapshot: dbSnapshot,
pendingKeys: c.cachedKeys, pendingKeys: c.cachedKeys,
pendingRemove: c.cachedRemove, pendingRemove: c.cachedRemove,
pendingKeysSnap: c.cachedKeys.Snapshot(),
pendingRemoveSnap: c.cachedRemove.Snapshot(),
} }
c.cacheLock.RUnlock() c.cacheLock.RUnlock()
return cacheSnapshot, nil return cacheSnapshot, nil
@ -499,12 +505,10 @@ func (c *dbCache) flush() error {
// Since the cached keys to be added and removed use an immutable treap, // Since the cached keys to be added and removed use an immutable treap,
// a snapshot is simply obtaining the root of the tree under the lock // a snapshot is simply obtaining the root of the tree under the lock
// which is used to atomically swap the root. // which is used to atomically swap the root.
c.cacheLock.Lock() c.cacheLock.RLock()
cachedKeys := c.cachedKeys cachedKeys := c.cachedKeys
cachedRemove := c.cachedRemove cachedRemove := c.cachedRemove
c.cachedKeys = treap.NewImmutable() c.cacheLock.RUnlock()
c.cachedRemove = treap.NewImmutable()
c.cacheLock.Unlock()
// Nothing to do if there is no data to flush. // Nothing to do if there is no data to flush.
if cachedKeys.Len() == 0 && cachedRemove.Len() == 0 { if cachedKeys.Len() == 0 && cachedRemove.Len() == 0 {
@ -516,6 +520,11 @@ func (c *dbCache) flush() error {
return err return err
} }
c.cacheLock.Lock()
c.cachedKeys = treap.NewImmutable()
c.cachedRemove = treap.NewImmutable()
c.cacheLock.Unlock()
cachedKeys.Recycle() cachedKeys.Recycle()
cachedRemove.Recycle() cachedRemove.Recycle()
@ -603,19 +612,23 @@ func (c *dbCache) commitTx(tx *transaction) error {
// Apply every key to add in the database transaction to the cache. // Apply every key to add in the database transaction to the cache.
tx.pendingKeys.ForEach(func(k, v []byte) bool { tx.pendingKeys.ForEach(func(k, v []byte) bool {
newCachedRemove = newCachedRemove.Delete(k) treap.DeleteM(&newCachedRemove, k, tx.snapshot.pendingRemoveSnap)
newCachedKeys = newCachedKeys.Put(k, v) treap.PutM(&newCachedKeys, k, v, tx.snapshot.pendingKeysSnap)
return true return true
}) })
pk := tx.pendingKeys
tx.pendingKeys = nil tx.pendingKeys = nil
pk.Recycle()
// Apply every key to remove in the database transaction to the cache. // Apply every key to remove in the database transaction to the cache.
tx.pendingRemove.ForEach(func(k, v []byte) bool { tx.pendingRemove.ForEach(func(k, v []byte) bool {
newCachedKeys = newCachedKeys.Delete(k) treap.DeleteM(&newCachedKeys, k, tx.snapshot.pendingKeysSnap)
newCachedRemove = newCachedRemove.Put(k, nil) treap.PutM(&newCachedRemove, k, nil, tx.snapshot.pendingRemoveSnap)
return true return true
}) })
pr := tx.pendingRemove
tx.pendingRemove = nil tx.pendingRemove = nil
pr.Recycle()
// Atomically replace the immutable treaps which hold the cached keys to // Atomically replace the immutable treaps which hold the cached keys to
// add and delete. // add and delete.
@ -623,6 +636,7 @@ func (c *dbCache) commitTx(tx *transaction) error {
c.cachedKeys = newCachedKeys c.cachedKeys = newCachedKeys
c.cachedRemove = newCachedRemove c.cachedRemove = newCachedRemove
c.cacheLock.Unlock() c.cacheLock.Unlock()
return nil return nil
} }

View file

@ -6,6 +6,7 @@ package treap
import ( import (
"math/rand" "math/rand"
"sync"
"time" "time"
) )
@ -23,7 +24,7 @@ const (
// size in that case is acceptable since it avoids the need to import // size in that case is acceptable since it avoids the need to import
// unsafe. It consists of 24-bytes for each key and value + 8 bytes for // unsafe. It consists of 24-bytes for each key and value + 8 bytes for
// each of the priority, left, and right fields (24*2 + 8*3). // each of the priority, left, and right fields (24*2 + 8*3).
nodeFieldsSize = 72 nodeFieldsSize = 80
) )
var ( var (
@ -33,20 +34,21 @@ var (
emptySlice = make([]byte, 0) emptySlice = make([]byte, 0)
) )
const (
// Generation number for nodes in a Mutable treap.
MutableGeneration int = -1
// Generation number for nodes in the free Pool.
PoolGeneration int = -2
)
// treapNode represents a node in the treap. // treapNode represents a node in the treap.
type treapNode struct { type treapNode struct {
key []byte key []byte
value []byte value []byte
priority int priority int
left *treapNode left *treapNode
right *treapNode right *treapNode
} generation int
func (n *treapNode) Reset() {
n.key = nil
n.value = nil
n.left = nil
n.right = nil
} }
// nodeSize returns the number of bytes the specified node occupies including // nodeSize returns the number of bytes the specified node occupies including
@ -55,10 +57,35 @@ func nodeSize(node *treapNode) uint64 {
return nodeFieldsSize + uint64(len(node.key)+len(node.value)) return nodeFieldsSize + uint64(len(node.key)+len(node.value))
} }
// newTreapNode returns a new node from the given key, value, and priority. The // Pool of treapNode available for reuse.
var nodePool = &sync.Pool{
New: func() interface{} {
return &treapNode{key: nil, value: nil, priority: 0, generation: PoolGeneration}
},
}
// getTreapNode returns a new node from the given key, value, and priority. The
// node is not initially linked to any others. // node is not initially linked to any others.
func newTreapNode(key, value []byte, priority int) *treapNode { func getTreapNode(key, value []byte, priority int, generation int) *treapNode {
return &treapNode{key: key, value: value, priority: priority} n := nodePool.Get().(*treapNode)
n.key = key
n.value = value
n.priority = priority
n.left = nil
n.right = nil
n.generation = generation
return n
}
// Put treapNode back in the nodePool for reuse.
func putTreapNode(n *treapNode) {
n.key = nil
n.value = nil
n.priority = 0
n.left = nil
n.right = nil
n.generation = PoolGeneration
nodePool.Put(n)
} }
// parentStack represents a stack of parent treap nodes that are used during // parentStack represents a stack of parent treap nodes that are used during

View file

@ -10,14 +10,9 @@ import (
"sync" "sync"
) )
var nodePool = &sync.Pool{New: func() interface{} { return newTreapNode(nil, nil, 0) }}
// cloneTreapNode returns a shallow copy of the passed node. // cloneTreapNode returns a shallow copy of the passed node.
func cloneTreapNode(node *treapNode) *treapNode { func cloneTreapNode(node *treapNode) *treapNode {
clone := nodePool.Get().(*treapNode) clone := getTreapNode(node.key, node.value, node.priority, node.generation+1)
clone.key = node.key
clone.value = node.value
clone.priority = node.priority
clone.left = node.left clone.left = node.left
clone.right = node.right clone.right = node.right
return clone return clone
@ -46,11 +41,19 @@ type Immutable struct {
// totalSize is the best estimate of the total size of of all data in // totalSize is the best estimate of the total size of of all data in
// the treap including the keys, values, and node sizes. // the treap including the keys, values, and node sizes.
totalSize uint64 totalSize uint64
// generation number starts at 0 after NewImmutable(), and
// is incremented with every Put()/Delete().
generation int
// snap is a pointer to a node in snapshot history linked list.
// A value nil means no snapshots are outstanding.
snap *SnapRecord
} }
// newImmutable returns a new immutable treap given the passed parameters. // newImmutable returns a new immutable treap given the passed parameters.
func newImmutable(root *treapNode, count int, totalSize uint64) *Immutable { func newImmutable(root *treapNode, count int, totalSize uint64, generation int, snap *SnapRecord) *Immutable {
return &Immutable{root: root, count: count, totalSize: totalSize} return &Immutable{root: root, count: count, totalSize: totalSize, generation: generation, snap: snap}
} }
// Len returns the number of items stored in the treap. // Len returns the number of items stored in the treap.
@ -107,8 +110,8 @@ func (t *Immutable) Get(key []byte) []byte {
return nil return nil
} }
// Put inserts the passed key/value pair. // put inserts the passed key/value pair.
func (t *Immutable) Put(key, value []byte) *Immutable { func (t *Immutable) put(key, value []byte, bumpGen int) (tp *Immutable, old parentStack) {
// Use an empty byte slice for the value when none was provided. This // Use an empty byte slice for the value when none was provided. This
// ultimately allows key existence to be determined from the value since // ultimately allows key existence to be determined from the value since
// an empty byte slice is distinguishable from nil. // an empty byte slice is distinguishable from nil.
@ -118,8 +121,8 @@ func (t *Immutable) Put(key, value []byte) *Immutable {
// The node is the root of the tree if there isn't already one. // The node is the root of the tree if there isn't already one.
if t.root == nil { if t.root == nil {
root := newTreapNode(key, value, rand.Int()) root := getTreapNode(key, value, rand.Int(), t.generation+bumpGen)
return newImmutable(root, 1, nodeSize(root)) return newImmutable(root, 1, nodeSize(root), t.generation+bumpGen, t.snap), parentStack{}
} }
// Find the binary tree insertion point and construct a replaced list of // Find the binary tree insertion point and construct a replaced list of
@ -131,9 +134,11 @@ func (t *Immutable) Put(key, value []byte) *Immutable {
// When the key matches an entry already in the treap, replace the node // When the key matches an entry already in the treap, replace the node
// with a new one that has the new value set and return. // with a new one that has the new value set and return.
var parents parentStack var parents parentStack
var oldParents parentStack
var compareResult int var compareResult int
for node := t.root; node != nil; { for node := t.root; node != nil; {
// Clone the node and link its parent to it if needed. // Clone the node and link its parent to it if needed.
oldParents.Push(node)
nodeCopy := cloneTreapNode(node) nodeCopy := cloneTreapNode(node)
if oldParent := parents.At(0); oldParent != nil { if oldParent := parents.At(0); oldParent != nil {
if oldParent.left == node { if oldParent.left == node {
@ -164,14 +169,11 @@ func (t *Immutable) Put(key, value []byte) *Immutable {
newRoot := parents.At(parents.Len() - 1) newRoot := parents.At(parents.Len() - 1)
newTotalSize := t.totalSize - uint64(len(node.value)) + newTotalSize := t.totalSize - uint64(len(node.value)) +
uint64(len(value)) uint64(len(value))
return newImmutable(newRoot, t.count, newTotalSize) return newImmutable(newRoot, t.count, newTotalSize, t.generation+bumpGen, t.snap), oldParents
} }
// Link the new node into the binary tree in the correct position. // Link the new node into the binary tree in the correct position.
node := nodePool.Get().(*treapNode) node := getTreapNode(key, value, rand.Int(), t.generation+bumpGen)
node.key = key
node.value = value
node.priority = rand.Int()
parent := parents.At(0) parent := parents.At(0)
if compareResult < 0 { if compareResult < 0 {
parent.left = node parent.left = node
@ -211,19 +213,59 @@ func (t *Immutable) Put(key, value []byte) *Immutable {
} }
} }
return newImmutable(newRoot, t.count+1, t.totalSize+nodeSize(node)) return newImmutable(newRoot, t.count+1, t.totalSize+nodeSize(node), t.generation+bumpGen, t.snap), oldParents
} }
// Delete removes the passed key from the treap and returns the resulting treap // Put is the immutable variant of put. Generation number is bumped, and old
// nodes become garbage unless referenced elswhere.
func (t *Immutable) Put(key, value []byte) *Immutable {
tp, _ := t.put(key, value, 1)
return tp
}
// PutM is the mutable variant of put. Generation number is NOT bumped, and old
// nodes are recycled if possible. This is only safe/useful in scenarios where
// multiple Put/Delete() ops are applied to a unique treap and no snapshots/aliases
// of the intermediate treap states are created or desired. For example:
//
// for i := range keys {
// t = t.Put(keys[i])
// }
//
// ...may be replaced with:
//
// for i := range keys {
// PutM(t, keys[i], nil)
// }
//
// If "excluded" is provided, that snapshot is ignored when counting
// snapshot records.
//
func PutM(dest **Immutable, key, value []byte, excluded *SnapRecord) {
tp, old := (*dest).put(key, value, 0)
// Examine old nodes and recycle if possible.
snapRecordMutex.Lock()
defer snapRecordMutex.Unlock()
snapCount := (*dest).snapCount(excluded)
for old.Len() > 0 {
node := old.Pop()
if node.generation == tp.generation && snapCount == 0 {
putTreapNode(node)
}
}
*dest = tp
}
// del removes the passed key from the treap and returns the resulting treap
// if it exists. The original immutable treap is returned if the key does not // if it exists. The original immutable treap is returned if the key does not
// exist. // exist.
func (t *Immutable) Delete(key []byte) *Immutable { func (t *Immutable) del(key []byte, bumpGen int) (d *Immutable, old parentStack) {
// Find the node for the key while constructing a list of parents while // Find the node for the key while constructing a list of parents while
// doing so. // doing so.
var parents parentStack var oldParents parentStack
var delNode *treapNode var delNode *treapNode
for node := t.root; node != nil; { for node := t.root; node != nil; {
parents.Push(node) oldParents.Push(node)
// Traverse left or right depending on the result of the // Traverse left or right depending on the result of the
// comparison. // comparison.
@ -244,14 +286,14 @@ func (t *Immutable) Delete(key []byte) *Immutable {
// There is nothing to do if the key does not exist. // There is nothing to do if the key does not exist.
if delNode == nil { if delNode == nil {
return t return t, parentStack{}
} }
// When the only node in the tree is the root node and it is the one // When the only node in the tree is the root node and it is the one
// being deleted, there is nothing else to do besides removing it. // being deleted, there is nothing else to do besides removing it.
parent := parents.At(1) parent := oldParents.At(1)
if parent == nil && delNode.left == nil && delNode.right == nil { if parent == nil && delNode.left == nil && delNode.right == nil {
return newImmutable(nil, 0, 0) return newImmutable(nil, 0, 0, t.generation+bumpGen, t.snap), oldParents
} }
// Construct a replaced list of parents and the node to delete itself. // Construct a replaced list of parents and the node to delete itself.
@ -259,8 +301,8 @@ func (t *Immutable) Delete(key []byte) *Immutable {
// therefore all ancestors of the node that will be deleted, up to and // therefore all ancestors of the node that will be deleted, up to and
// including the root, need to be replaced. // including the root, need to be replaced.
var newParents parentStack var newParents parentStack
for i := parents.Len(); i > 0; i-- { for i := oldParents.Len(); i > 0; i-- {
node := parents.At(i - 1) node := oldParents.At(i - 1)
nodeCopy := cloneTreapNode(node) nodeCopy := cloneTreapNode(node)
if oldParent := newParents.At(0); oldParent != nil { if oldParent := newParents.At(0); oldParent != nil {
if oldParent.left == node { if oldParent.left == node {
@ -332,7 +374,47 @@ func (t *Immutable) Delete(key []byte) *Immutable {
parent.left = nil parent.left = nil
} }
return newImmutable(newRoot, t.count-1, t.totalSize-nodeSize(delNode)) return newImmutable(newRoot, t.count-1, t.totalSize-nodeSize(delNode), t.generation+bumpGen, t.snap), oldParents
}
// Delete is the immutable variant of del. Generation number is bumped, and old
// nodes become garbage unless referenced elswhere.
func (t *Immutable) Delete(key []byte) *Immutable {
tp, _ := t.del(key, 1)
return tp
}
// DeleteM is the mutable variant of del. Generation number is NOT bumped, and old
// nodes are recycled if possible. This is only safe/useful in scenarios where
// multiple Put/Delete() ops are applied to a unique treap and no snapshots/aliases
// of the intermediate treap states are created or desired. For example:
//
// for i := range keys {
// t = t.Delete(keys[i])
// }
//
// ...may be replaced with:
//
// for i := range keys {
// DeleteM(t, keys[i], nil)
// }
//
// If "excluded" is provided, that snapshot is ignored when counting
// snapshot records.
//
func DeleteM(dest **Immutable, key []byte, excluded *SnapRecord) {
tp, old := (*dest).del(key, 0)
// Examine old nodes and recycle if possible.
snapRecordMutex.Lock()
defer snapRecordMutex.Unlock()
snapCount := (*dest).snapCount(excluded)
for old.Len() > 0 {
node := old.Pop()
if node.generation == tp.generation && snapCount == 0 {
putTreapNode(node)
}
}
*dest = tp
} }
// ForEach invokes the passed function with every key/value pair in the treap // ForEach invokes the passed function with every key/value pair in the treap
@ -365,7 +447,79 @@ func NewImmutable() *Immutable {
return &Immutable{} return &Immutable{}
} }
// SnapRecord assists in tracking/releasing outstanding snapshots.
type SnapRecord struct {
prev *SnapRecord
next *SnapRecord
}
var snapRecordMutex sync.Mutex
// Snapshot makes a SnapRecord and linkis it into the snapshot history of a treap.
func (t *Immutable) Snapshot() *SnapRecord {
snapRecordMutex.Lock()
defer snapRecordMutex.Unlock()
// Link this record so it follows the existing t.snap record, if any.
prev := t.snap
var next *SnapRecord = nil
if prev != nil {
next = prev.next
}
t.snap = &SnapRecord{prev: prev, next: next}
if prev != nil {
prev.next = t.snap
}
return t.snap
}
// Release of SnapRecord unlinks that record from the snapshot history of a treap.
func (r *SnapRecord) Release() {
snapRecordMutex.Lock()
defer snapRecordMutex.Unlock()
// Unlink this record.
if r.prev != nil {
r.prev.next = r.next
}
if r.next != nil {
r.next.prev = r.prev
}
}
// snapCount returns the number of snapshots outstanding which were created
// but not released. When snapshots are absent, mutable PutM()/DeleteM() can
// recycle nodes more aggressively. The record "exclude" is not counted.
func (t *Immutable) snapCount(exclude *SnapRecord) int {
// snapRecordMutex should be locked already
sum := 0
if t.snap == nil {
// No snapshots.
return sum
}
// Count snapshots taken BEFORE creation of this instance.
for h := t.snap; h != nil; h = h.prev {
if h != exclude {
sum++
}
}
// Count snapshots taken AFTER creation of this instance.
for h := t.snap.next; h != nil; h = h.next {
if h != exclude {
sum++
}
}
return sum
}
func (t *Immutable) Recycle() { func (t *Immutable) Recycle() {
snapCount := t.snapCount(nil) - 1
var parents parentStack var parents parentStack
for node := t.root; node != nil; node = node.left { for node := t.root; node != nil; node = node.left {
parents.Push(node) parents.Push(node)
@ -380,7 +534,8 @@ func (t *Immutable) Recycle() {
parents.Push(n) parents.Push(n)
} }
node.Reset() if node.generation == t.generation && snapCount == 0 {
nodePool.Put(node) putTreapNode(node)
}
} }
} }

View file

@ -113,7 +113,7 @@ func (t *Mutable) Put(key, value []byte) {
// The node is the root of the tree if there isn't already one. // The node is the root of the tree if there isn't already one.
if t.root == nil { if t.root == nil {
node := newTreapNode(key, value, rand.Int()) node := getTreapNode(key, value, rand.Int(), MutableGeneration)
t.count = 1 t.count = 1
t.totalSize = nodeSize(node) t.totalSize = nodeSize(node)
t.root = node t.root = node
@ -145,10 +145,7 @@ func (t *Mutable) Put(key, value []byte) {
} }
// Link the new node into the binary tree in the correct position. // Link the new node into the binary tree in the correct position.
node := nodePool.Get().(*treapNode) node := getTreapNode(key, value, rand.Int(), MutableGeneration)
node.key = key
node.value = value
node.priority = rand.Int()
t.count++ t.count++
t.totalSize += nodeSize(node) t.totalSize += nodeSize(node)
parent := parents.At(0) parent := parents.At(0)
@ -193,6 +190,7 @@ func (t *Mutable) Delete(key []byte) {
t.root = nil t.root = nil
t.count = 0 t.count = 0
t.totalSize = 0 t.totalSize = 0
putTreapNode(node)
return return
} }
@ -241,6 +239,7 @@ func (t *Mutable) Delete(key []byte) {
} }
t.count-- t.count--
t.totalSize -= nodeSize(node) t.totalSize -= nodeSize(node)
putTreapNode(node)
} }
// ForEach invokes the passed function with every key/value pair in the treap // ForEach invokes the passed function with every key/value pair in the treap
@ -295,7 +294,8 @@ func (t *Mutable) Recycle() {
parents.Push(n) parents.Push(n)
} }
node.Reset() if node.generation == MutableGeneration {
nodePool.Put(node) putTreapNode(node)
}
} }
} }