Fix lots of flaws in treap snapshot tracking. Update tests to make them work.
This commit is contained in:
parent
0ab37e1541
commit
d2bc36bc8c
7 changed files with 191 additions and 96 deletions
|
@ -290,6 +290,7 @@ type dbCacheSnapshot struct {
|
||||||
pendingRemove *treap.Immutable
|
pendingRemove *treap.Immutable
|
||||||
pendingKeysSnap *treap.SnapRecord
|
pendingKeysSnap *treap.SnapRecord
|
||||||
pendingRemoveSnap *treap.SnapRecord
|
pendingRemoveSnap *treap.SnapRecord
|
||||||
|
cacheFlushed int
|
||||||
}
|
}
|
||||||
|
|
||||||
// Has returns whether or not the passed key exists.
|
// Has returns whether or not the passed key exists.
|
||||||
|
@ -329,8 +330,18 @@ func (snap *dbCacheSnapshot) Get(key []byte) []byte {
|
||||||
// Release releases the snapshot.
|
// Release releases the snapshot.
|
||||||
func (snap *dbCacheSnapshot) Release() {
|
func (snap *dbCacheSnapshot) Release() {
|
||||||
snap.dbSnapshot.Release()
|
snap.dbSnapshot.Release()
|
||||||
|
if snap.cacheFlushed > 0 && snap.pendingKeys != nil {
|
||||||
|
snap.pendingKeys.Recycle(snap.pendingKeysSnap)
|
||||||
|
}
|
||||||
|
if snap.cacheFlushed > 0 && snap.pendingRemove != nil {
|
||||||
|
snap.pendingRemove.Recycle(snap.pendingRemoveSnap)
|
||||||
|
}
|
||||||
|
if snap.pendingKeysSnap != nil {
|
||||||
snap.pendingKeysSnap.Release()
|
snap.pendingKeysSnap.Release()
|
||||||
|
}
|
||||||
|
if snap.pendingRemoveSnap != nil {
|
||||||
snap.pendingRemoveSnap.Release()
|
snap.pendingRemoveSnap.Release()
|
||||||
|
}
|
||||||
snap.pendingKeys = nil
|
snap.pendingKeys = nil
|
||||||
snap.pendingRemove = nil
|
snap.pendingRemove = nil
|
||||||
}
|
}
|
||||||
|
@ -414,8 +425,12 @@ func (c *dbCache) Snapshot() (*dbCacheSnapshot, error) {
|
||||||
dbSnapshot: dbSnapshot,
|
dbSnapshot: dbSnapshot,
|
||||||
pendingKeys: c.cachedKeys,
|
pendingKeys: c.cachedKeys,
|
||||||
pendingRemove: c.cachedRemove,
|
pendingRemove: c.cachedRemove,
|
||||||
pendingKeysSnap: c.cachedKeys.Snapshot(),
|
}
|
||||||
pendingRemoveSnap: c.cachedRemove.Snapshot(),
|
if cacheSnapshot.pendingKeys != nil {
|
||||||
|
cacheSnapshot.pendingKeysSnap = cacheSnapshot.pendingKeys.Snapshot()
|
||||||
|
}
|
||||||
|
if cacheSnapshot.pendingRemove != nil {
|
||||||
|
cacheSnapshot.pendingRemoveSnap = cacheSnapshot.pendingRemove.Snapshot()
|
||||||
}
|
}
|
||||||
c.cacheLock.RUnlock()
|
c.cacheLock.RUnlock()
|
||||||
return cacheSnapshot, nil
|
return cacheSnapshot, nil
|
||||||
|
@ -491,7 +506,7 @@ func (c *dbCache) commitTreaps(pendingKeys, pendingRemove TreapForEacher) error
|
||||||
// cache to the underlying database.
|
// cache to the underlying database.
|
||||||
//
|
//
|
||||||
// This function MUST be called with the database write lock held.
|
// This function MUST be called with the database write lock held.
|
||||||
func (c *dbCache) flush() error {
|
func (c *dbCache) flush(tx *transaction) error {
|
||||||
c.lastFlush = time.Now()
|
c.lastFlush = time.Now()
|
||||||
|
|
||||||
// Sync the current write file associated with the block store. This is
|
// Sync the current write file associated with the block store. This is
|
||||||
|
@ -525,8 +540,14 @@ func (c *dbCache) flush() error {
|
||||||
c.cachedRemove = treap.NewImmutable()
|
c.cachedRemove = treap.NewImmutable()
|
||||||
c.cacheLock.Unlock()
|
c.cacheLock.Unlock()
|
||||||
|
|
||||||
cachedKeys.Recycle()
|
cachedKeys.Recycle(nil)
|
||||||
cachedRemove.Recycle()
|
cachedRemove.Recycle(nil)
|
||||||
|
|
||||||
|
// Make a note that cache was flushed so tx.snapshot.Release()
|
||||||
|
// can also call Recycle() to free more nodes.
|
||||||
|
if tx != nil && tx.snapshot != nil {
|
||||||
|
tx.snapshot.cacheFlushed++
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -576,7 +597,7 @@ func (c *dbCache) commitTx(tx *transaction) error {
|
||||||
// Flush the cache and write the current transaction directly to the
|
// Flush the cache and write the current transaction directly to the
|
||||||
// database if a flush is needed.
|
// database if a flush is needed.
|
||||||
if c.needsFlush(tx) {
|
if c.needsFlush(tx) {
|
||||||
if err := c.flush(); err != nil {
|
if err := c.flush(tx); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -646,7 +667,7 @@ func (c *dbCache) commitTx(tx *transaction) error {
|
||||||
// This function MUST be called with the database write lock held.
|
// This function MUST be called with the database write lock held.
|
||||||
func (c *dbCache) Close() error {
|
func (c *dbCache) Close() error {
|
||||||
// Flush any outstanding cached entries to disk.
|
// Flush any outstanding cached entries to disk.
|
||||||
if err := c.flush(); err != nil {
|
if err := c.flush(nil); err != nil {
|
||||||
// Even if there is an error while flushing, attempt to close
|
// Even if there is an error while flushing, attempt to close
|
||||||
// the underlying database. The error is ignored since it would
|
// the underlying database. The error is ignored since it would
|
||||||
// mask the flush error.
|
// mask the flush error.
|
||||||
|
|
|
@ -319,7 +319,7 @@ func testWriteFailures(tc *testContext) bool {
|
||||||
file: &mockFile{forceSyncErr: true, maxSize: -1},
|
file: &mockFile{forceSyncErr: true, maxSize: -1},
|
||||||
}
|
}
|
||||||
store.writeCursor.Unlock()
|
store.writeCursor.Unlock()
|
||||||
err := tc.db.(*db).cache.flush()
|
err := tc.db.(*db).cache.flush(nil)
|
||||||
if !checkDbError(tc.t, testName, err, database.ErrDriverSpecific) {
|
if !checkDbError(tc.t, testName, err, database.ErrDriverSpecific) {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,7 +24,7 @@ const (
|
||||||
// size in that case is acceptable since it avoids the need to import
|
// size in that case is acceptable since it avoids the need to import
|
||||||
// unsafe. It consists of 24-bytes for each key and value + 8 bytes for
|
// unsafe. It consists of 24-bytes for each key and value + 8 bytes for
|
||||||
// each of the priority, left, and right fields (24*2 + 8*3).
|
// each of the priority, left, and right fields (24*2 + 8*3).
|
||||||
nodeFieldsSize = 80
|
nodeFieldsSize = 96
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -35,10 +35,12 @@ var (
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
// Generation number for nodes in a Mutable treap.
|
// mutableGeneration is the generation number for nodes in a Mutable treap.
|
||||||
MutableGeneration int = -1
|
mutableGeneration int = -1
|
||||||
// Generation number for nodes in the free Pool.
|
// recycleGeneration indicates node is scheduled for recycling back to nodePool.
|
||||||
PoolGeneration int = -2
|
recycleGeneration int = -2
|
||||||
|
// poolGeneration is the generation number for free nodes in the nodePool.
|
||||||
|
poolGeneration int = -3
|
||||||
)
|
)
|
||||||
|
|
||||||
// treapNode represents a node in the treap.
|
// treapNode represents a node in the treap.
|
||||||
|
@ -49,6 +51,7 @@ type treapNode struct {
|
||||||
left *treapNode
|
left *treapNode
|
||||||
right *treapNode
|
right *treapNode
|
||||||
generation int
|
generation int
|
||||||
|
next *treapNode
|
||||||
}
|
}
|
||||||
|
|
||||||
// nodeSize returns the number of bytes the specified node occupies including
|
// nodeSize returns the number of bytes the specified node occupies including
|
||||||
|
@ -57,15 +60,15 @@ func nodeSize(node *treapNode) uint64 {
|
||||||
return nodeFieldsSize + uint64(len(node.key)+len(node.value))
|
return nodeFieldsSize + uint64(len(node.key)+len(node.value))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Pool of treapNode available for reuse.
|
func poolNewTreapNode() interface{} {
|
||||||
var nodePool = &sync.Pool{
|
return &treapNode{key: nil, value: nil, priority: 0, generation: poolGeneration}
|
||||||
New: func() interface{} {
|
|
||||||
return &treapNode{key: nil, value: nil, priority: 0, generation: PoolGeneration}
|
|
||||||
},
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// getTreapNode returns a new node from the given key, value, and priority. The
|
// Pool of treapNode available for reuse.
|
||||||
// node is not initially linked to any others.
|
var nodePool sync.Pool = sync.Pool{New: poolNewTreapNode}
|
||||||
|
|
||||||
|
// getTreapNode returns a node from nodePool with the given key, value, priority,
|
||||||
|
// and generation. The node is not initially linked to any others.
|
||||||
func getTreapNode(key, value []byte, priority int, generation int) *treapNode {
|
func getTreapNode(key, value []byte, priority int, generation int) *treapNode {
|
||||||
n := nodePool.Get().(*treapNode)
|
n := nodePool.Get().(*treapNode)
|
||||||
n.key = key
|
n.key = key
|
||||||
|
@ -74,17 +77,22 @@ func getTreapNode(key, value []byte, priority int, generation int) *treapNode {
|
||||||
n.left = nil
|
n.left = nil
|
||||||
n.right = nil
|
n.right = nil
|
||||||
n.generation = generation
|
n.generation = generation
|
||||||
|
n.next = nil
|
||||||
return n
|
return n
|
||||||
}
|
}
|
||||||
|
|
||||||
// Put treapNode back in the nodePool for reuse.
|
// putTreapNode returns a node back to nodePool for reuse.
|
||||||
func putTreapNode(n *treapNode) {
|
func putTreapNode(n *treapNode) {
|
||||||
|
if n.generation <= poolGeneration {
|
||||||
|
panic("double free of treapNode detected")
|
||||||
|
}
|
||||||
n.key = nil
|
n.key = nil
|
||||||
n.value = nil
|
n.value = nil
|
||||||
n.priority = 0
|
n.priority = 0
|
||||||
n.left = nil
|
n.left = nil
|
||||||
n.right = nil
|
n.right = nil
|
||||||
n.generation = PoolGeneration
|
n.generation = poolGeneration
|
||||||
|
n.next = nil
|
||||||
nodePool.Put(n)
|
nodePool.Put(n)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -49,7 +49,7 @@ testLoop:
|
||||||
for j := 0; j < test.numNodes; j++ {
|
for j := 0; j < test.numNodes; j++ {
|
||||||
var key [4]byte
|
var key [4]byte
|
||||||
binary.BigEndian.PutUint32(key[:], uint32(j))
|
binary.BigEndian.PutUint32(key[:], uint32(j))
|
||||||
node := newTreapNode(key[:], key[:], 0)
|
node := getTreapNode(key[:], key[:], 0, 0)
|
||||||
nodes = append(nodes, node)
|
nodes = append(nodes, node)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -48,11 +48,11 @@ type Immutable struct {
|
||||||
|
|
||||||
// snap is a pointer to a node in snapshot history linked list.
|
// snap is a pointer to a node in snapshot history linked list.
|
||||||
// A value nil means no snapshots are outstanding.
|
// A value nil means no snapshots are outstanding.
|
||||||
snap *SnapRecord
|
snap **SnapRecord
|
||||||
}
|
}
|
||||||
|
|
||||||
// newImmutable returns a new immutable treap given the passed parameters.
|
// newImmutable returns a new immutable treap given the passed parameters.
|
||||||
func newImmutable(root *treapNode, count int, totalSize uint64, generation int, snap *SnapRecord) *Immutable {
|
func newImmutable(root *treapNode, count int, totalSize uint64, generation int, snap **SnapRecord) *Immutable {
|
||||||
return &Immutable{root: root, count: count, totalSize: totalSize, generation: generation, snap: snap}
|
return &Immutable{root: root, count: count, totalSize: totalSize, generation: generation, snap: snap}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -111,7 +111,7 @@ func (t *Immutable) Get(key []byte) []byte {
|
||||||
}
|
}
|
||||||
|
|
||||||
// put inserts the passed key/value pair.
|
// put inserts the passed key/value pair.
|
||||||
func (t *Immutable) put(key, value []byte, bumpGen int) (tp *Immutable, old parentStack) {
|
func (t *Immutable) put(key, value []byte) (tp *Immutable, old parentStack) {
|
||||||
// Use an empty byte slice for the value when none was provided. This
|
// Use an empty byte slice for the value when none was provided. This
|
||||||
// ultimately allows key existence to be determined from the value since
|
// ultimately allows key existence to be determined from the value since
|
||||||
// an empty byte slice is distinguishable from nil.
|
// an empty byte slice is distinguishable from nil.
|
||||||
|
@ -121,8 +121,8 @@ func (t *Immutable) put(key, value []byte, bumpGen int) (tp *Immutable, old pare
|
||||||
|
|
||||||
// The node is the root of the tree if there isn't already one.
|
// The node is the root of the tree if there isn't already one.
|
||||||
if t.root == nil {
|
if t.root == nil {
|
||||||
root := getTreapNode(key, value, rand.Int(), t.generation+bumpGen)
|
root := getTreapNode(key, value, rand.Int(), t.generation+1)
|
||||||
return newImmutable(root, 1, nodeSize(root), t.generation+bumpGen, t.snap), parentStack{}
|
return newImmutable(root, 1, nodeSize(root), t.generation+1, t.snap), parentStack{}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Find the binary tree insertion point and construct a replaced list of
|
// Find the binary tree insertion point and construct a replaced list of
|
||||||
|
@ -169,11 +169,11 @@ func (t *Immutable) put(key, value []byte, bumpGen int) (tp *Immutable, old pare
|
||||||
newRoot := parents.At(parents.Len() - 1)
|
newRoot := parents.At(parents.Len() - 1)
|
||||||
newTotalSize := t.totalSize - uint64(len(node.value)) +
|
newTotalSize := t.totalSize - uint64(len(node.value)) +
|
||||||
uint64(len(value))
|
uint64(len(value))
|
||||||
return newImmutable(newRoot, t.count, newTotalSize, t.generation+bumpGen, t.snap), oldParents
|
return newImmutable(newRoot, t.count, newTotalSize, t.generation+1, t.snap), oldParents
|
||||||
}
|
}
|
||||||
|
|
||||||
// Link the new node into the binary tree in the correct position.
|
// Link the new node into the binary tree in the correct position.
|
||||||
node := getTreapNode(key, value, rand.Int(), t.generation+bumpGen)
|
node := getTreapNode(key, value, rand.Int(), t.generation+1)
|
||||||
parent := parents.At(0)
|
parent := parents.At(0)
|
||||||
if compareResult < 0 {
|
if compareResult < 0 {
|
||||||
parent.left = node
|
parent.left = node
|
||||||
|
@ -213,20 +213,21 @@ func (t *Immutable) put(key, value []byte, bumpGen int) (tp *Immutable, old pare
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return newImmutable(newRoot, t.count+1, t.totalSize+nodeSize(node), t.generation+bumpGen, t.snap), oldParents
|
return newImmutable(newRoot, t.count+1, t.totalSize+nodeSize(node), t.generation+1, t.snap), oldParents
|
||||||
}
|
}
|
||||||
|
|
||||||
// Put is the immutable variant of put. Generation number is bumped, and old
|
// Put is the immutable variant of put. Old nodes become garbage unless referenced elswhere.
|
||||||
// nodes become garbage unless referenced elswhere.
|
|
||||||
func (t *Immutable) Put(key, value []byte) *Immutable {
|
func (t *Immutable) Put(key, value []byte) *Immutable {
|
||||||
tp, _ := t.put(key, value, 1)
|
tp, _ := t.put(key, value)
|
||||||
return tp
|
return tp
|
||||||
}
|
}
|
||||||
|
|
||||||
// PutM is the mutable variant of put. Generation number is NOT bumped, and old
|
// PutM is the mutable variant of put. Old nodes are recycled if possible. This is
|
||||||
// nodes are recycled if possible. This is only safe/useful in scenarios where
|
// only safe in structured scenarios using SnapRecord to track treap instances.
|
||||||
// multiple Put/Delete() ops are applied to a unique treap and no snapshots/aliases
|
// The outstanding SnapRecords serve to protect nodes from recycling when they might
|
||||||
// of the intermediate treap states are created or desired. For example:
|
// be present in one or more snapshots. This is useful in scenarios where multiple
|
||||||
|
// Put/Delete() ops are applied to a treap and intermediate treap states are not
|
||||||
|
// created or desired. For example:
|
||||||
//
|
//
|
||||||
// for i := range keys {
|
// for i := range keys {
|
||||||
// t = t.Put(keys[i])
|
// t = t.Put(keys[i])
|
||||||
|
@ -242,15 +243,20 @@ func (t *Immutable) Put(key, value []byte) *Immutable {
|
||||||
// snapshot records.
|
// snapshot records.
|
||||||
//
|
//
|
||||||
func PutM(dest **Immutable, key, value []byte, excluded *SnapRecord) {
|
func PutM(dest **Immutable, key, value []byte, excluded *SnapRecord) {
|
||||||
tp, old := (*dest).put(key, value, 0)
|
tp, old := (*dest).put(key, value)
|
||||||
// Examine old nodes and recycle if possible.
|
// Examine old nodes and recycle if possible.
|
||||||
snapRecordMutex.Lock()
|
snapRecordMutex.Lock()
|
||||||
defer snapRecordMutex.Unlock()
|
defer snapRecordMutex.Unlock()
|
||||||
snapCount := (*dest).snapCount(excluded)
|
snapCount, maxSnap, minSnap := (*dest).snapCount(nil)
|
||||||
for old.Len() > 0 {
|
for old.Len() > 0 {
|
||||||
node := old.Pop()
|
node := old.Pop()
|
||||||
if node.generation == tp.generation && snapCount == 0 {
|
if snapCount == 0 || node.generation > maxSnap.generation {
|
||||||
putTreapNode(node)
|
putTreapNode(node)
|
||||||
|
} else {
|
||||||
|
// Defer recycle until Release() on oldest snap (minSnap).
|
||||||
|
node.generation = recycleGeneration
|
||||||
|
node.next = minSnap.recycle
|
||||||
|
minSnap.recycle = node
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
*dest = tp
|
*dest = tp
|
||||||
|
@ -259,7 +265,7 @@ func PutM(dest **Immutable, key, value []byte, excluded *SnapRecord) {
|
||||||
// del removes the passed key from the treap and returns the resulting treap
|
// del removes the passed key from the treap and returns the resulting treap
|
||||||
// if it exists. The original immutable treap is returned if the key does not
|
// if it exists. The original immutable treap is returned if the key does not
|
||||||
// exist.
|
// exist.
|
||||||
func (t *Immutable) del(key []byte, bumpGen int) (d *Immutable, old parentStack) {
|
func (t *Immutable) del(key []byte) (d *Immutable, old parentStack) {
|
||||||
// Find the node for the key while constructing a list of parents while
|
// Find the node for the key while constructing a list of parents while
|
||||||
// doing so.
|
// doing so.
|
||||||
var oldParents parentStack
|
var oldParents parentStack
|
||||||
|
@ -293,7 +299,7 @@ func (t *Immutable) del(key []byte, bumpGen int) (d *Immutable, old parentStack)
|
||||||
// being deleted, there is nothing else to do besides removing it.
|
// being deleted, there is nothing else to do besides removing it.
|
||||||
parent := oldParents.At(1)
|
parent := oldParents.At(1)
|
||||||
if parent == nil && delNode.left == nil && delNode.right == nil {
|
if parent == nil && delNode.left == nil && delNode.right == nil {
|
||||||
return newImmutable(nil, 0, 0, t.generation+bumpGen, t.snap), oldParents
|
return newImmutable(nil, 0, 0, t.generation+1, t.snap), oldParents
|
||||||
}
|
}
|
||||||
|
|
||||||
// Construct a replaced list of parents and the node to delete itself.
|
// Construct a replaced list of parents and the node to delete itself.
|
||||||
|
@ -374,20 +380,21 @@ func (t *Immutable) del(key []byte, bumpGen int) (d *Immutable, old parentStack)
|
||||||
parent.left = nil
|
parent.left = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return newImmutable(newRoot, t.count-1, t.totalSize-nodeSize(delNode), t.generation+bumpGen, t.snap), oldParents
|
return newImmutable(newRoot, t.count-1, t.totalSize-nodeSize(delNode), t.generation+1, t.snap), oldParents
|
||||||
}
|
}
|
||||||
|
|
||||||
// Delete is the immutable variant of del. Generation number is bumped, and old
|
// Delete is the immutable variant of del. Old nodes become garbage unless referenced elswhere.
|
||||||
// nodes become garbage unless referenced elswhere.
|
|
||||||
func (t *Immutable) Delete(key []byte) *Immutable {
|
func (t *Immutable) Delete(key []byte) *Immutable {
|
||||||
tp, _ := t.del(key, 1)
|
tp, _ := t.del(key)
|
||||||
return tp
|
return tp
|
||||||
}
|
}
|
||||||
|
|
||||||
// DeleteM is the mutable variant of del. Generation number is NOT bumped, and old
|
// DeleteM is the mutable variant of del. Old nodes are recycled if possible. This is
|
||||||
// nodes are recycled if possible. This is only safe/useful in scenarios where
|
// only safe in structured scenarios using SnapRecord to track treap instances.
|
||||||
// multiple Put/Delete() ops are applied to a unique treap and no snapshots/aliases
|
// The outstanding SnapRecords serve to protect nodes from recycling when they might
|
||||||
// of the intermediate treap states are created or desired. For example:
|
// be present in one or more snapshots. This is useful in scenarios where multiple
|
||||||
|
// Put/Delete() ops are applied to a treap and intermediate treap states are not
|
||||||
|
// created or desired. For example:
|
||||||
//
|
//
|
||||||
// for i := range keys {
|
// for i := range keys {
|
||||||
// t = t.Delete(keys[i])
|
// t = t.Delete(keys[i])
|
||||||
|
@ -403,15 +410,20 @@ func (t *Immutable) Delete(key []byte) *Immutable {
|
||||||
// snapshot records.
|
// snapshot records.
|
||||||
//
|
//
|
||||||
func DeleteM(dest **Immutable, key []byte, excluded *SnapRecord) {
|
func DeleteM(dest **Immutable, key []byte, excluded *SnapRecord) {
|
||||||
tp, old := (*dest).del(key, 0)
|
tp, old := (*dest).del(key)
|
||||||
// Examine old nodes and recycle if possible.
|
// Examine old nodes and recycle if possible.
|
||||||
snapRecordMutex.Lock()
|
snapRecordMutex.Lock()
|
||||||
defer snapRecordMutex.Unlock()
|
defer snapRecordMutex.Unlock()
|
||||||
snapCount := (*dest).snapCount(excluded)
|
snapCount, maxSnap, minSnap := (*dest).snapCount(nil)
|
||||||
for old.Len() > 0 {
|
for old.Len() > 0 {
|
||||||
node := old.Pop()
|
node := old.Pop()
|
||||||
if node.generation == tp.generation && snapCount == 0 {
|
if snapCount == 0 || node.generation > maxSnap.generation {
|
||||||
putTreapNode(node)
|
putTreapNode(node)
|
||||||
|
} else {
|
||||||
|
// Defer recycle until Release() on oldest snap (minSnap).
|
||||||
|
node.generation = recycleGeneration
|
||||||
|
node.next = minSnap.recycle
|
||||||
|
minSnap.recycle = node
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
*dest = tp
|
*dest = tp
|
||||||
|
@ -447,31 +459,45 @@ func NewImmutable() *Immutable {
|
||||||
return &Immutable{}
|
return &Immutable{}
|
||||||
}
|
}
|
||||||
|
|
||||||
// SnapRecord assists in tracking/releasing outstanding snapshots.
|
// SnapRecord assists in tracking outstanding snapshots. While a SnapRecord
|
||||||
|
// is present and has not been Released(), treap nodes at or below this
|
||||||
|
// generation are protected from Recycle().
|
||||||
type SnapRecord struct {
|
type SnapRecord struct {
|
||||||
|
generation int
|
||||||
|
rp **SnapRecord
|
||||||
prev *SnapRecord
|
prev *SnapRecord
|
||||||
next *SnapRecord
|
next *SnapRecord
|
||||||
|
recycle *treapNode
|
||||||
}
|
}
|
||||||
|
|
||||||
var snapRecordMutex sync.Mutex
|
var snapRecordMutex sync.Mutex
|
||||||
|
|
||||||
// Snapshot makes a SnapRecord and linkis it into the snapshot history of a treap.
|
// Snapshot makes a SnapRecord and links it into the snapshot history of a treap.
|
||||||
func (t *Immutable) Snapshot() *SnapRecord {
|
func (t *Immutable) Snapshot() *SnapRecord {
|
||||||
snapRecordMutex.Lock()
|
snapRecordMutex.Lock()
|
||||||
defer snapRecordMutex.Unlock()
|
defer snapRecordMutex.Unlock()
|
||||||
|
|
||||||
// Link this record so it follows the existing t.snap record, if any.
|
rp := t.snap
|
||||||
prev := t.snap
|
|
||||||
var next *SnapRecord = nil
|
var next *SnapRecord = nil
|
||||||
if prev != nil {
|
var prev *SnapRecord = nil
|
||||||
next = prev.next
|
if rp != nil {
|
||||||
|
prev = *rp
|
||||||
|
if *rp != nil {
|
||||||
|
next = (*rp).next
|
||||||
}
|
}
|
||||||
t.snap = &SnapRecord{prev: prev, next: next}
|
|
||||||
if prev != nil {
|
|
||||||
prev.next = t.snap
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return t.snap
|
// Create a new record stamped with the current generation. Link it
|
||||||
|
// following the existing snapshot record, if any.
|
||||||
|
p := new(*SnapRecord)
|
||||||
|
*p = &SnapRecord{generation: t.generation, rp: p, prev: prev, next: next}
|
||||||
|
t.snap = p
|
||||||
|
|
||||||
|
if rp != nil && *rp != nil {
|
||||||
|
(*rp).next = *(t.snap)
|
||||||
|
}
|
||||||
|
|
||||||
|
return *(t.snap)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Release of SnapRecord unlinks that record from the snapshot history of a treap.
|
// Release of SnapRecord unlinks that record from the snapshot history of a treap.
|
||||||
|
@ -480,45 +506,73 @@ func (r *SnapRecord) Release() {
|
||||||
defer snapRecordMutex.Unlock()
|
defer snapRecordMutex.Unlock()
|
||||||
|
|
||||||
// Unlink this record.
|
// Unlink this record.
|
||||||
if r.prev != nil {
|
*(r.rp) = nil
|
||||||
r.prev.next = r.next
|
|
||||||
}
|
|
||||||
if r.next != nil {
|
if r.next != nil {
|
||||||
r.next.prev = r.prev
|
r.next.prev = r.prev
|
||||||
|
*(r.rp) = r.next
|
||||||
|
}
|
||||||
|
if r.prev != nil {
|
||||||
|
r.prev.next = r.next
|
||||||
|
*(r.rp) = r.prev
|
||||||
|
}
|
||||||
|
|
||||||
|
// Handle deferred recycle list.
|
||||||
|
for node := r.recycle; node != nil; {
|
||||||
|
next := node.next
|
||||||
|
putTreapNode(node)
|
||||||
|
node = next
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// snapCount returns the number of snapshots outstanding which were created
|
// snapCount returns the number of snapshots outstanding which were created
|
||||||
// but not released. When snapshots are absent, mutable PutM()/DeleteM() can
|
// but not released. When snapshots are absent, mutable PutM()/DeleteM() can
|
||||||
// recycle nodes more aggressively. The record "exclude" is not counted.
|
// recycle nodes more aggressively. The record "excluded" is not counted.
|
||||||
func (t *Immutable) snapCount(exclude *SnapRecord) int {
|
func (t *Immutable) snapCount(excluded *SnapRecord) (count int, maxSnap, minSnap *SnapRecord) {
|
||||||
// snapRecordMutex should be locked already
|
// snapRecordMutex should be locked already
|
||||||
|
|
||||||
sum := 0
|
count, maxSnap, minSnap = 0, nil, nil
|
||||||
if t.snap == nil {
|
if t.snap == nil || *(t.snap) == nil {
|
||||||
// No snapshots.
|
// No snapshots.
|
||||||
return sum
|
return count, maxSnap, minSnap
|
||||||
}
|
}
|
||||||
|
|
||||||
// Count snapshots taken BEFORE creation of this instance.
|
// Count snapshots taken BEFORE creation of this instance.
|
||||||
for h := t.snap; h != nil; h = h.prev {
|
for h := *(t.snap); h != nil; h = h.prev {
|
||||||
if h != exclude {
|
if h != excluded {
|
||||||
sum++
|
count++
|
||||||
|
if maxSnap == nil || maxSnap.generation < h.generation {
|
||||||
|
maxSnap = h
|
||||||
|
}
|
||||||
|
if minSnap == nil || minSnap.generation > h.generation {
|
||||||
|
minSnap = h
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Count snapshots taken AFTER creation of this instance.
|
// Count snapshots taken AFTER creation of this instance.
|
||||||
for h := t.snap.next; h != nil; h = h.next {
|
for h := (*(t.snap)).next; h != nil; h = h.next {
|
||||||
if h != exclude {
|
if h != excluded {
|
||||||
sum++
|
count++
|
||||||
|
if maxSnap == nil || maxSnap.generation < h.generation {
|
||||||
|
maxSnap = h
|
||||||
|
}
|
||||||
|
if minSnap == nil || minSnap.generation > h.generation {
|
||||||
|
minSnap = h
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return sum
|
return count, maxSnap, minSnap
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *Immutable) Recycle() {
|
func (t *Immutable) Recycle(excluded *SnapRecord) {
|
||||||
snapCount := t.snapCount(nil) - 1
|
snapRecordMutex.Lock()
|
||||||
|
_, maxSnap, _ := t.snapCount(excluded)
|
||||||
|
snapGen := 0
|
||||||
|
if maxSnap != nil {
|
||||||
|
snapGen = maxSnap.generation
|
||||||
|
}
|
||||||
|
snapRecordMutex.Unlock()
|
||||||
|
|
||||||
var parents parentStack
|
var parents parentStack
|
||||||
for node := t.root; node != nil; node = node.left {
|
for node := t.root; node != nil; node = node.left {
|
||||||
|
@ -534,7 +588,10 @@ func (t *Immutable) Recycle() {
|
||||||
parents.Push(n)
|
parents.Push(n)
|
||||||
}
|
}
|
||||||
|
|
||||||
if node.generation == t.generation && snapCount == 0 {
|
// Recycle node if it cannot be in a snapshot. Note that nodes
|
||||||
|
// scheduled for deferred recycling will have negative generation
|
||||||
|
// (recycleGeneration) and will not qualify.
|
||||||
|
if node.generation > snapGen {
|
||||||
putTreapNode(node)
|
putTreapNode(node)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,6 +21,10 @@ type Mutable struct {
|
||||||
// totalSize is the best estimate of the total size of of all data in
|
// totalSize is the best estimate of the total size of of all data in
|
||||||
// the treap including the keys, values, and node sizes.
|
// the treap including the keys, values, and node sizes.
|
||||||
totalSize uint64
|
totalSize uint64
|
||||||
|
|
||||||
|
// generation number is the constant mutableGeneration, unless
|
||||||
|
// creation of a treap.Iterator bumps it.
|
||||||
|
generation int
|
||||||
}
|
}
|
||||||
|
|
||||||
// Len returns the number of items stored in the treap.
|
// Len returns the number of items stored in the treap.
|
||||||
|
@ -113,7 +117,7 @@ func (t *Mutable) Put(key, value []byte) {
|
||||||
|
|
||||||
// The node is the root of the tree if there isn't already one.
|
// The node is the root of the tree if there isn't already one.
|
||||||
if t.root == nil {
|
if t.root == nil {
|
||||||
node := getTreapNode(key, value, rand.Int(), MutableGeneration)
|
node := getTreapNode(key, value, rand.Int(), t.generation)
|
||||||
t.count = 1
|
t.count = 1
|
||||||
t.totalSize = nodeSize(node)
|
t.totalSize = nodeSize(node)
|
||||||
t.root = node
|
t.root = node
|
||||||
|
@ -145,7 +149,7 @@ func (t *Mutable) Put(key, value []byte) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Link the new node into the binary tree in the correct position.
|
// Link the new node into the binary tree in the correct position.
|
||||||
node := getTreapNode(key, value, rand.Int(), MutableGeneration)
|
node := getTreapNode(key, value, rand.Int(), t.generation)
|
||||||
t.count++
|
t.count++
|
||||||
t.totalSize += nodeSize(node)
|
t.totalSize += nodeSize(node)
|
||||||
parent := parents.At(0)
|
parent := parents.At(0)
|
||||||
|
@ -190,7 +194,9 @@ func (t *Mutable) Delete(key []byte) {
|
||||||
t.root = nil
|
t.root = nil
|
||||||
t.count = 0
|
t.count = 0
|
||||||
t.totalSize = 0
|
t.totalSize = 0
|
||||||
|
if node.generation == t.generation && node.generation == mutableGeneration {
|
||||||
putTreapNode(node)
|
putTreapNode(node)
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -239,7 +245,9 @@ func (t *Mutable) Delete(key []byte) {
|
||||||
}
|
}
|
||||||
t.count--
|
t.count--
|
||||||
t.totalSize -= nodeSize(node)
|
t.totalSize -= nodeSize(node)
|
||||||
|
if node.generation == t.generation && node.generation == mutableGeneration {
|
||||||
putTreapNode(node)
|
putTreapNode(node)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ForEach invokes the passed function with every key/value pair in the treap
|
// ForEach invokes the passed function with every key/value pair in the treap
|
||||||
|
@ -276,7 +284,7 @@ func (t *Mutable) Reset() {
|
||||||
// NewMutable returns a new empty mutable treap ready for use. See the
|
// NewMutable returns a new empty mutable treap ready for use. See the
|
||||||
// documentation for the Mutable structure for more details.
|
// documentation for the Mutable structure for more details.
|
||||||
func NewMutable() *Mutable {
|
func NewMutable() *Mutable {
|
||||||
return &Mutable{}
|
return &Mutable{generation: mutableGeneration}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *Mutable) Recycle() {
|
func (t *Mutable) Recycle() {
|
||||||
|
@ -294,7 +302,7 @@ func (t *Mutable) Recycle() {
|
||||||
parents.Push(n)
|
parents.Push(n)
|
||||||
}
|
}
|
||||||
|
|
||||||
if node.generation == MutableGeneration {
|
if node.generation == t.generation && node.generation == mutableGeneration {
|
||||||
putTreapNode(node)
|
putTreapNode(node)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -326,6 +326,7 @@ func (iter *Iterator) ForceReseek() {
|
||||||
// }
|
// }
|
||||||
// }
|
// }
|
||||||
func (t *Mutable) Iterator(startKey, limitKey []byte) *Iterator {
|
func (t *Mutable) Iterator(startKey, limitKey []byte) *Iterator {
|
||||||
|
t.generation++
|
||||||
iter := &Iterator{
|
iter := &Iterator{
|
||||||
t: t,
|
t: t,
|
||||||
root: t.root,
|
root: t.root,
|
||||||
|
|
Loading…
Reference in a new issue