Compare commits

..

12 commits

Author SHA1 Message Date
Alex Grintsvayg
4f264cc4f1
update module name 2020-11-03 15:37:56 -05:00
Alex Grintsvayg
818c60532e
add Size() fn 2020-11-03 15:35:11 -05:00
Karl Seguin
1189f7f993 make Clear thread-safe 2020-08-16 21:12:47 +08:00
Karl Seguin
839a17bedb Remove impossible race conditions from test
This makes the output of go test --race less noisy.
2020-08-16 19:07:52 +08:00
Karl Seguin
0dbf3f125f add TrackingSet to LayeredCache 2020-08-14 11:15:13 +08:00
Karl Seguin
f3b2b9fd88
Merge pull request #48 from sargun/master
Add TrackingSet method
2020-08-14 11:03:30 +08:00
Karl Seguin
aa0e37ad6f
Merge pull request #47 from bep/type-deletefunc
Use typed *Item in DeleteFunc
2020-08-14 10:56:02 +08:00
Sargun Dhillon
df91803297 Add TrackingSet method
This method reduces the likelihood of a race condition where
you can add a (tracked) item to the cache, and the item isn't
the item you thought it was.
2020-08-13 10:43:38 -07:00
Bjørn Erik Pedersen
a42bd4a9c8
Use typed *Item in DeleteFunc 2020-08-13 16:10:22 +02:00
Karl Seguin
e9b7be5016 remove race condition 2020-08-13 15:49:28 +08:00
Karl Seguin
fdd08e71c4
Merge pull request #46 from bep/document-deletefunc
Document DeleteFunc
2020-08-13 15:47:33 +08:00
Bjørn Erik Pedersen
992cd9564b
Document DeleteFunc 2020-08-13 09:05:27 +02:00
14 changed files with 115 additions and 75 deletions

View file

@ -23,9 +23,9 @@ func (b *bucket) get(key string) *Item {
return b.lookup[key]
}
func (b *bucket) set(key string, value interface{}, duration time.Duration) (*Item, *Item) {
func (b *bucket) set(key string, value interface{}, duration time.Duration, track bool) (*Item, *Item) {
expires := time.Now().Add(duration).UnixNano()
item := newItem(key, value, expires)
item := newItem(key, value, expires, track)
b.Lock()
existing := b.lookup[key]
b.lookup[key] = item
@ -54,14 +54,9 @@ func (b *bucket) delete(key string) *Item {
// the item from the map. I'm pretty sure this is 100% fine, but it is unique.
// (We do this so that the write to the channel is under the read lock and not the
// write lock)
func (b *bucket) deleteFunc(matches func(key string, item interface{}) bool, deletables chan *Item) int {
func (b *bucket) deleteFunc(matches func(key string, item *Item) bool, deletables chan *Item) int {
lookup := b.lookup
b.RLock()
l := len(lookup)
b.RUnlock()
items := make([]*Item, 0, l/10)
items := make([]*Item, 0)
b.RLock()
for key, item := range lookup {
@ -86,7 +81,7 @@ func (b *bucket) deleteFunc(matches func(key string, item interface{}) bool, del
}
func (b *bucket) deletePrefix(prefix string, deletables chan *Item) int {
return b.deleteFunc(func(key string, item interface{}) bool {
return b.deleteFunc(func(key string, item *Item) bool {
return strings.HasPrefix(key, prefix)
}, deletables)
}

View file

@ -1,9 +1,10 @@
package ccache
import (
. "github.com/karlseguin/expect"
"testing"
"time"
. "github.com/karlseguin/expect"
)
type BucketTests struct {
@ -32,7 +33,7 @@ func (_ *BucketTests) DeleteItemFromBucket() {
func (_ *BucketTests) SetsANewBucketItem() {
bucket := testBucket()
item, existing := bucket.set("spice", TestValue("flow"), time.Minute)
item, existing := bucket.set("spice", TestValue("flow"), time.Minute, false)
assertValue(item, "flow")
item = bucket.get("spice")
assertValue(item, "flow")
@ -41,7 +42,7 @@ func (_ *BucketTests) SetsANewBucketItem() {
func (_ *BucketTests) SetsAnExistingItem() {
bucket := testBucket()
item, existing := bucket.set("power", TestValue("9001"), time.Minute)
item, existing := bucket.set("power", TestValue("9001"), time.Minute, false)
assertValue(item, "9001")
item = bucket.get("power")
assertValue(item, "9001")

View file

@ -17,6 +17,10 @@ type setMaxSize struct {
size int64
}
type clear struct {
done chan struct{}
}
type Cache struct {
*Configuration
list *list.List
@ -55,6 +59,10 @@ func (c *Cache) ItemCount() int {
return count
}
func (c *Cache) Size() int64 {
return c.size
}
func (c *Cache) DeletePrefix(prefix string) int {
count := 0
for _, b := range c.buckets {
@ -64,7 +72,7 @@ func (c *Cache) DeletePrefix(prefix string) int {
}
// Deletes all items that the matches func evaluates to true.
func (c *Cache) DeleteFunc(matches func(key string, item interface{}) bool) int {
func (c *Cache) DeleteFunc(matches func(key string, item *Item) bool) int {
count := 0
for _, b := range c.buckets {
count += b.deleteFunc(matches, c.deletables)
@ -98,9 +106,15 @@ func (c *Cache) TrackingGet(key string) TrackedItem {
return item
}
// Used when the cache was created with the Track() configuration option.
// Sets the item, and returns a tracked reference to it.
func (c *Cache) TrackingSet(key string, value interface{}, duration time.Duration) TrackedItem {
return c.set(key, value, duration, true)
}
// Set the value in the cache for the specified duration
func (c *Cache) Set(key string, value interface{}, duration time.Duration) {
c.set(key, value, duration)
c.set(key, value, duration, false)
}
// Replace the value if it exists, does not set if it doesn't.
@ -127,7 +141,7 @@ func (c *Cache) Fetch(key string, duration time.Duration, fetch func() (interfac
if err != nil {
return nil, err
}
return c.set(key, value, duration), nil
return c.set(key, value, duration, false), nil
}
// Remove the item from the cache, return true if the item was present, false otherwise.
@ -140,13 +154,11 @@ func (c *Cache) Delete(key string) bool {
return false
}
//this isn't thread safe. It's meant to be called from non-concurrent tests
// Clears the cache
func (c *Cache) Clear() {
for _, bucket := range c.buckets {
bucket.clear()
}
c.size = 0
c.list = list.New()
done := make(chan struct{})
c.control <- clear{done: done}
<-done
}
// Stops the background worker. Operations performed on the cache after Stop
@ -182,8 +194,8 @@ func (c *Cache) deleteItem(bucket *bucket, item *Item) {
c.deletables <- item
}
func (c *Cache) set(key string, value interface{}, duration time.Duration) *Item {
item, existing := c.bucket(key).set(key, value, duration)
func (c *Cache) set(key string, value interface{}, duration time.Duration, track bool) *Item {
item, existing := c.bucket(key).set(key, value, duration, track)
if existing != nil {
c.deletables <- existing
}
@ -225,6 +237,13 @@ func (c *Cache) worker() {
if c.size > c.maxSize {
dropped += c.gc()
}
case clear:
for _, bucket := range c.buckets {
bucket.clear()
}
c.size = 0
c.list = list.New()
msg.done <- struct{}{}
}
}
}

View file

@ -2,6 +2,7 @@ package ccache
import (
"strconv"
"sync/atomic"
"testing"
"time"
@ -63,17 +64,17 @@ func (_ CacheTests) DeletesAFunc() {
cache.Set("f", 6, time.Minute)
Expect(cache.ItemCount()).To.Equal(6)
Expect(cache.DeleteFunc(func(key string, item interface{}) bool {
Expect(cache.DeleteFunc(func(key string, item *Item) bool {
return false
})).To.Equal(0)
Expect(cache.ItemCount()).To.Equal(6)
Expect(cache.DeleteFunc(func(key string, item interface{}) bool {
return item.(*Item).Value().(int) < 4
Expect(cache.DeleteFunc(func(key string, item *Item) bool {
return item.Value().(int) < 4
})).To.Equal(3)
Expect(cache.ItemCount()).To.Equal(3)
Expect(cache.DeleteFunc(func(key string, item interface{}) bool {
Expect(cache.DeleteFunc(func(key string, item *Item) bool {
return key == "d"
})).To.Equal(1)
Expect(cache.ItemCount()).To.Equal(2)
@ -81,10 +82,10 @@ func (_ CacheTests) DeletesAFunc() {
}
func (_ CacheTests) OnDeleteCallbackCalled() {
onDeleteFnCalled := false
onDeleteFnCalled := int32(0)
onDeleteFn := func(item *Item) {
if item.key == "spice" {
onDeleteFnCalled = true
atomic.AddInt32(&onDeleteFnCalled, 1)
}
}
@ -98,7 +99,7 @@ func (_ CacheTests) OnDeleteCallbackCalled() {
Expect(cache.Get("spice")).To.Equal(nil)
Expect(cache.Get("worm").Value()).To.Equal("sand")
Expect(onDeleteFnCalled).To.Equal(true)
Expect(atomic.LoadInt32(&onDeleteFnCalled)).To.Eql(1)
}
func (_ CacheTests) FetchesExpiredItems() {
@ -140,18 +141,21 @@ func (_ CacheTests) PromotedItemsDontGetPruned() {
}
func (_ CacheTests) TrackerDoesNotCleanupHeldInstance() {
cache := New(Configure().ItemsToPrune(10).Track())
for i := 0; i < 10; i++ {
cache := New(Configure().ItemsToPrune(11).Track())
item0 := cache.TrackingSet("0", 0, time.Minute)
for i := 1; i < 11; i++ {
cache.Set(strconv.Itoa(i), i, time.Minute)
}
item := cache.TrackingGet("0")
item1 := cache.TrackingGet("1")
time.Sleep(time.Millisecond * 10)
gcCache(cache)
Expect(cache.Get("0").Value()).To.Equal(0)
Expect(cache.Get("1")).To.Equal(nil)
item.Release()
Expect(cache.Get("1").Value()).To.Equal(1)
item0.Release()
item1.Release()
gcCache(cache)
Expect(cache.Get("0")).To.Equal(nil)
Expect(cache.Get("1")).To.Equal(nil)
}
func (_ CacheTests) RemovesOldestItemWhenFull() {

View file

@ -1,8 +1,9 @@
package ccache
import (
. "github.com/karlseguin/expect"
"testing"
. "github.com/karlseguin/expect"
)
type ConfigurationTests struct{}

4
go.mod
View file

@ -1,8 +1,8 @@
module github.com/karlseguin/ccache/v2
module github.com/lbryio/ccache/v2
go 1.13
require (
github.com/karlseguin/expect v1.0.2-0.20190806010014-778a5f0c6003
github.com/wsxiaoys/terminal v0.0.0-20160513160801-0940f3fc43a0
github.com/wsxiaoys/terminal v0.0.0-20160513160801-0940f3fc43a0 // indirect
)

2
go.sum
View file

@ -1,5 +1,3 @@
github.com/karlseguin/expect v1.0.1 h1:z4wy4npwwHSWKjGWH85WNJO42VQhovxTCZDSzhjo8hY=
github.com/karlseguin/expect v1.0.1/go.mod h1:zNBxMY8P21owkeogJELCLeHIt+voOSduHYTFUbwRAV8=
github.com/karlseguin/expect v1.0.2-0.20190806010014-778a5f0c6003 h1:vJ0Snvo+SLMY72r5J4sEfkuE7AFbixEP2qRbEcum/wA=
github.com/karlseguin/expect v1.0.2-0.20190806010014-778a5f0c6003/go.mod h1:zNBxMY8P21owkeogJELCLeHIt+voOSduHYTFUbwRAV8=
github.com/wsxiaoys/terminal v0.0.0-20160513160801-0940f3fc43a0 h1:3UeQBvD0TFrlVjOeLOBz+CPAI8dnbqNSVwUwRrkp7vQ=

View file

@ -52,18 +52,22 @@ type Item struct {
element *list.Element
}
func newItem(key string, value interface{}, expires int64) *Item {
func newItem(key string, value interface{}, expires int64, track bool) *Item {
size := int64(1)
if sized, ok := value.(Sized); ok {
size = sized.Size()
}
return &Item{
item := &Item{
key: key,
value: value,
promotions: 0,
size: size,
expires: expires,
}
if track {
item.refCount = 1
}
return item
}
func (i *Item) shouldPromote(getsPerPromote int32) bool {

View file

@ -38,7 +38,7 @@ func (b *layeredBucket) getSecondaryBucket(primary string) *bucket {
return bucket
}
func (b *layeredBucket) set(primary, secondary string, value interface{}, duration time.Duration) (*Item, *Item) {
func (b *layeredBucket) set(primary, secondary string, value interface{}, duration time.Duration, track bool) (*Item, *Item) {
b.Lock()
bkt, exists := b.buckets[primary]
if exists == false {
@ -46,7 +46,7 @@ func (b *layeredBucket) set(primary, secondary string, value interface{}, durati
b.buckets[primary] = bkt
}
b.Unlock()
item, existing := bkt.set(secondary, value, duration)
item, existing := bkt.set(secondary, value, duration, track)
item.group = primary
return item, existing
}
@ -71,7 +71,7 @@ func (b *layeredBucket) deletePrefix(primary, prefix string, deletables chan *It
return bucket.deletePrefix(prefix, deletables)
}
func (b *layeredBucket) deleteFunc(primary string, matches func(key string, item interface{}) bool, deletables chan *Item) int {
func (b *layeredBucket) deleteFunc(primary string, matches func(key string, item *Item) bool, deletables chan *Item) int {
b.RLock()
bucket, exists := b.buckets[primary]
b.RUnlock()

View file

@ -102,9 +102,14 @@ func (c *LayeredCache) TrackingGet(primary, secondary string) TrackedItem {
return item
}
// Set the value in the cache for the specified duration
func (c *LayeredCache) TrackingSet(primary, secondary string, value interface{}, duration time.Duration) TrackedItem {
return c.set(primary, secondary, value, duration, true)
}
// Set the value in the cache for the specified duration
func (c *LayeredCache) Set(primary, secondary string, value interface{}, duration time.Duration) {
c.set(primary, secondary, value, duration)
c.set(primary, secondary, value, duration, false)
}
// Replace the value if it exists, does not set if it doesn't.
@ -131,7 +136,7 @@ func (c *LayeredCache) Fetch(primary, secondary string, duration time.Duration,
if err != nil {
return nil, err
}
return c.set(primary, secondary, value, duration), nil
return c.set(primary, secondary, value, duration, false), nil
}
// Remove the item from the cache, return true if the item was present, false otherwise.
@ -155,17 +160,15 @@ func (c *LayeredCache) DeletePrefix(primary, prefix string) int {
}
// Deletes all items that share the same primary key and where the matches func evaluates to true.
func (c *LayeredCache) DeleteFunc(primary string, matches func(key string, item interface{}) bool) int {
func (c *LayeredCache) DeleteFunc(primary string, matches func(key string, item *Item) bool) int {
return c.bucket(primary).deleteFunc(primary, matches, c.deletables)
}
//this isn't thread safe. It's meant to be called from non-concurrent tests
// Clears the cache
func (c *LayeredCache) Clear() {
for _, bucket := range c.buckets {
bucket.clear()
}
c.size = 0
c.list = list.New()
done := make(chan struct{})
c.control <- clear{done: done}
<-done
}
func (c *LayeredCache) Stop() {
@ -193,8 +196,8 @@ func (c *LayeredCache) restart() {
go c.worker()
}
func (c *LayeredCache) set(primary, secondary string, value interface{}, duration time.Duration) *Item {
item, existing := c.bucket(primary).set(primary, secondary, value, duration)
func (c *LayeredCache) set(primary, secondary string, value interface{}, duration time.Duration, track bool) *Item {
item, existing := c.bucket(primary).set(primary, secondary, value, duration, track)
if existing != nil {
c.deletables <- existing
}
@ -244,6 +247,13 @@ func (c *LayeredCache) worker() {
if c.size > c.maxSize {
dropped += c.gc()
}
case clear:
for _, bucket := range c.buckets {
bucket.clear()
}
c.size = 0
c.list = list.New()
msg.done <- struct{}{}
}
}
}

View file

@ -2,6 +2,7 @@ package ccache
import (
"strconv"
"sync/atomic"
"testing"
"time"
@ -107,17 +108,17 @@ func (_ *LayeredCacheTests) DeletesAFunc() {
cache.Set("spice", "f", 6, time.Minute)
Expect(cache.ItemCount()).To.Equal(6)
Expect(cache.DeleteFunc("spice", func(key string, item interface{}) bool {
Expect(cache.DeleteFunc("spice", func(key string, item *Item) bool {
return false
})).To.Equal(0)
Expect(cache.ItemCount()).To.Equal(6)
Expect(cache.DeleteFunc("spice", func(key string, item interface{}) bool {
return item.(*Item).Value().(int) < 4
Expect(cache.DeleteFunc("spice", func(key string, item *Item) bool {
return item.Value().(int) < 4
})).To.Equal(2)
Expect(cache.ItemCount()).To.Equal(4)
Expect(cache.DeleteFunc("spice", func(key string, item interface{}) bool {
Expect(cache.DeleteFunc("spice", func(key string, item *Item) bool {
return key == "d"
})).To.Equal(1)
Expect(cache.ItemCount()).To.Equal(3)
@ -125,12 +126,10 @@ func (_ *LayeredCacheTests) DeletesAFunc() {
}
func (_ *LayeredCacheTests) OnDeleteCallbackCalled() {
onDeleteFnCalled := false
onDeleteFnCalled := int32(0)
onDeleteFn := func(item *Item) {
if item.group == "spice" && item.key == "flow" {
onDeleteFnCalled = true
atomic.AddInt32(&onDeleteFnCalled, 1)
}
}
@ -148,7 +147,7 @@ func (_ *LayeredCacheTests) OnDeleteCallbackCalled() {
Expect(cache.Get("spice", "worm")).To.Equal(nil)
Expect(cache.Get("leto", "sister").Value()).To.Equal("ghanima")
Expect(onDeleteFnCalled).To.Equal(true)
Expect(atomic.LoadInt32(&onDeleteFnCalled)).To.Eql(1)
}
func (_ *LayeredCacheTests) DeletesALayer() {
@ -196,17 +195,20 @@ func (_ LayeredCacheTests) PromotedItemsDontGetPruned() {
func (_ LayeredCacheTests) TrackerDoesNotCleanupHeldInstance() {
cache := Layered(Configure().ItemsToPrune(10).Track())
for i := 0; i < 10; i++ {
item0 := cache.TrackingSet("0", "a", 0, time.Minute)
for i := 1; i < 11; i++ {
cache.Set(strconv.Itoa(i), "a", i, time.Minute)
}
item := cache.TrackingGet("0", "a")
item1 := cache.TrackingGet("1", "a")
time.Sleep(time.Millisecond * 10)
gcLayeredCache(cache)
Expect(cache.Get("0", "a").Value()).To.Equal(0)
Expect(cache.Get("1", "a")).To.Equal(nil)
item.Release()
Expect(cache.Get("1", "a").Value()).To.Equal(1)
item0.Release()
item1.Release()
gcLayeredCache(cache)
Expect(cache.Get("0", "a")).To.Equal(nil)
Expect(cache.Get("1", "a")).To.Equal(nil)
}
func (_ LayeredCacheTests) RemovesOldestItemWhenFull() {
@ -260,7 +262,9 @@ func (_ LayeredCacheTests) ResizeOnTheFly() {
}
func newLayered() *LayeredCache {
return Layered(Configure())
c := Layered(Configure())
c.Clear()
return c
}
func (_ LayeredCacheTests) RemovesOldestItemWhenFullBySizer() {

View file

@ -96,8 +96,11 @@ cache.Delete("user:4")
### DeletePrefix
`DeletePrefix` deletes all keys matching the provided prefix. Returns the number of keys removed.
### DeleteFunc
`DeleteFunc` deletes all items that the provded matches func evaluates to true. Returns the number of keys removed.
### Clear
`Clear` clears the cache. This method is **not** thread safe. It is meant to be used from tests.
`Clear` clears the cache. If the cache's gc is running, `Clear` waits for it to finish.
### Extend
The life of an item can be changed via the `Extend` method. This will change the expiry of the item by the specified duration relative to the current time.
@ -140,7 +143,7 @@ user := item.Value() //will be nil if "user:4" didn't exist in the cache
item.Release() //can be called even if item.Value() returned nil
```
In practice, `Release` wouldn't be called until later, at some other place in your code.
In practice, `Release` wouldn't be called until later, at some other place in your code. `TrackingSet` can be used to set a value to be tracked.
There's a couple reason to use the tracking mode if other parts of your code also hold references to objects. First, if you're already going to hold a reference to these objects, there's really no reason not to have them in the cache - the memory is used up anyways.

View file

@ -16,7 +16,7 @@ func (s *SecondaryCache) Get(secondary string) *Item {
// Set the secondary key to a value.
// The semantics are the same as for LayeredCache.Set
func (s *SecondaryCache) Set(secondary string, value interface{}, duration time.Duration) *Item {
item, existing := s.bucket.set(secondary, value, duration)
item, existing := s.bucket.set(secondary, value, duration, false)
if existing != nil {
s.pCache.deletables <- existing
}

View file

@ -1,10 +1,11 @@
package ccache
import (
. "github.com/karlseguin/expect"
"strconv"
"testing"
"time"
. "github.com/karlseguin/expect"
)
type SecondaryCacheTests struct{}