The cache can now do reference counting so that the LRU algorithm is aware of

long-lived objects and won't clean them up. Oftentimes, the value returned
from a cache hit is short-lived. As a silly example:

	func GetUser(http.responseWrite) {
		user := cache.Get("user:1")
		response.Write(serialize(user))
	}

It's fine if the cache's GC cleans up "user:1" while the user variable has a reference to the
object..the cache's reference is removed and the real GC will clean it up
at some point after the user variable falls out of scope.

However, what if user is long-lived? Possibly stored as a reference to another
cached object? Normally (without this commit) the next time you call
cache.Get("user:1"), you'll get a miss and will need to refetch the object; even
though the original user object is still somewhere in memory - you just lost
your reference to it from the cache.

By enabling the Track() configuration flag, and calling TrackingGet() (instead
of Get), the cache will track that the object is in-use and won't GC it (even
if there's great memory pressure (what's the point? something else is holding on
to it anyways). Calling item.Release() will decrement the number of references.
When the count is 0, the item can be pruned from the cache.

The returned value is a TrackedItem which exposes:

- Value() interface{} (to get the actual cached value)
- Release() to release the item back in the cache
This commit is contained in:
Karl Seguin 2014-02-28 20:10:42 +08:00
parent af884bb25f
commit 890bb18dbf
7 changed files with 321 additions and 207 deletions

View file

@ -1,53 +1,53 @@
package ccache
import (
"sync"
"time"
"sync"
"time"
)
type Bucket struct {
sync.RWMutex
lookup map[string]*Item
sync.RWMutex
lookup map[string]*Item
}
func (b *Bucket) get(key string) *Item {
b.RLock()
defer b.RUnlock()
return b.lookup[key]
b.RLock()
defer b.RUnlock()
return b.lookup[key]
}
func (b *Bucket) set(key string, value interface{}, duration time.Duration) (*Item, bool) {
expires := time.Now().Add(duration)
b.Lock()
defer b.Unlock()
if existing, exists := b.lookup[key]; exists {
existing.Lock()
existing.value = value
existing.expires = expires
existing.Unlock()
return existing, false
}
item := newItem(key, value, expires)
b.lookup[key] = item
return item, true
expires := time.Now().Add(duration)
b.Lock()
defer b.Unlock()
if existing, exists := b.lookup[key]; exists {
existing.Lock()
existing.value = value
existing.expires = expires
existing.Unlock()
return existing, false
}
item := newItem(key, value, expires)
b.lookup[key] = item
return item, true
}
func (b *Bucket) delete(key string) {
b.Lock()
defer b.Unlock()
delete(b.lookup, key)
b.Lock()
defer b.Unlock()
delete(b.lookup, key)
}
func (b *Bucket) getAndDelete(key string) *Item{
b.Lock()
defer b.Unlock()
item := b.lookup[key]
delete(b.lookup, key)
return item
func (b *Bucket) getAndDelete(key string) *Item {
b.Lock()
defer b.Unlock()
item := b.lookup[key]
delete(b.lookup, key)
return item
}
func (b *Bucket) clear() {
b.Lock()
defer b.Unlock()
b.lookup = make(map[string]*Item)
b.Lock()
defer b.Unlock()
b.lookup = make(map[string]*Item)
}

View file

@ -1,64 +1,64 @@
package ccache
import (
"time"
"testing"
"github.com/viki-org/gspec"
"github.com/karlseguin/gspec"
"testing"
"time"
)
func TestGetMissFromBucket(t *testing.T) {
bucket := testBucket()
gspec.New(t).Expect(bucket.get("invalid")).ToBeNil()
bucket := testBucket()
gspec.New(t).Expect(bucket.get("invalid")).ToBeNil()
}
func TestGetHitFromBucket(t *testing.T) {
bucket := testBucket()
item := bucket.get("power")
assertValue(t, item, "9000")
bucket := testBucket()
item := bucket.get("power")
assertValue(t, item, "9000")
}
func TestDeleteItemFromBucket(t *testing.T) {
bucket := testBucket()
bucket.delete("power")
gspec.New(t).Expect(bucket.get("power")).ToBeNil()
bucket := testBucket()
bucket.delete("power")
gspec.New(t).Expect(bucket.get("power")).ToBeNil()
}
func TestSetsANewBucketItem(t *testing.T) {
spec := gspec.New(t)
bucket := testBucket()
item, new := bucket.set("spice", TestValue("flow"), time.Minute)
assertValue(t, item, "flow")
item = bucket.get("spice")
assertValue(t, item, "flow")
spec.Expect(new).ToEqual(true)
spec := gspec.New(t)
bucket := testBucket()
item, new := bucket.set("spice", TestValue("flow"), time.Minute)
assertValue(t, item, "flow")
item = bucket.get("spice")
assertValue(t, item, "flow")
spec.Expect(new).ToEqual(true)
}
func TestSetsAnExistingItem(t *testing.T) {
spec := gspec.New(t)
bucket := testBucket()
item, new := bucket.set("power", TestValue("9002"), time.Minute)
assertValue(t, item, "9002")
item = bucket.get("power")
assertValue(t, item, "9002")
spec.Expect(new).ToEqual(false)
spec := gspec.New(t)
bucket := testBucket()
item, new := bucket.set("power", TestValue("9002"), time.Minute)
assertValue(t, item, "9002")
item = bucket.get("power")
assertValue(t, item, "9002")
spec.Expect(new).ToEqual(false)
}
func testBucket() *Bucket {
b := &Bucket{lookup: make(map[string]*Item),}
b.lookup["power"] = &Item{
key: "power",
value: TestValue("9000"),
}
return b
b := &Bucket{lookup: make(map[string]*Item)}
b.lookup["power"] = &Item{
key: "power",
value: TestValue("9000"),
}
return b
}
func assertValue(t *testing.T, item *Item, expected string) {
value := item.value.(TestValue)
gspec.New(t).Expect(value).ToEqual(TestValue(expected))
value := item.value.(TestValue)
gspec.New(t).Expect(value).ToEqual(TestValue(expected))
}
type TestValue string
func (v TestValue) Expires() time.Time {
return time.Now()
return time.Now()
}

217
cache.go
View file

@ -1,139 +1,172 @@
// An LRU cached aimed at high concurrency
package ccache
import (
"time"
"runtime"
"hash/fnv"
"container/list"
"container/list"
"hash/fnv"
"runtime"
"sync/atomic"
"time"
)
type Cache struct {
*Configuration
list *list.List
buckets []*Bucket
bucketCount uint32
deletables chan *Item
promotables chan *Item
*Configuration
list *list.List
buckets []*Bucket
bucketCount uint32
deletables chan *Item
promotables chan *Item
}
func New(config *Configuration) *Cache {
c := &Cache{
list: list.New(),
Configuration: config,
bucketCount: uint32(config.buckets),
buckets: make([]*Bucket, config.buckets),
deletables: make(chan *Item, config.deleteBuffer),
promotables: make(chan *Item, config.promoteBuffer),
}
for i := 0; i < config.buckets; i++ {
c.buckets[i] = &Bucket{
lookup: make(map[string]*Item),
}
}
go c.worker()
return c
c := &Cache{
list: list.New(),
Configuration: config,
bucketCount: uint32(config.buckets),
buckets: make([]*Bucket, config.buckets),
deletables: make(chan *Item, config.deleteBuffer),
promotables: make(chan *Item, config.promoteBuffer),
}
for i := 0; i < config.buckets; i++ {
c.buckets[i] = &Bucket{
lookup: make(map[string]*Item),
}
}
go c.worker()
return c
}
func (c *Cache) Get(key string) interface{} {
bucket := c.bucket(key)
item := bucket.get(key)
if item == nil { return nil }
if item.expires.Before(time.Now()) {
c.deleteItem(bucket, item)
return nil
}
c.conditionalPromote(item)
return item.value
if item := c.get(key); item != nil {
return item.value
}
return nil
}
func (c *Cache) TrackingGet(key string) TrackedItem {
item := c.get(key)
if item == nil {
return NilTracked
}
item.track()
return item
}
func (c *Cache) get(key string) *Item {
bucket := c.bucket(key)
item := bucket.get(key)
if item == nil {
return nil
}
if item.expires.Before(time.Now()) {
c.deleteItem(bucket, item)
return nil
}
c.conditionalPromote(item)
return item
}
func (c *Cache) Set(key string, value interface{}, duration time.Duration) {
item, new := c.bucket(key).set(key, value, duration)
if new {
c.promote(item)
} else {
c.conditionalPromote(item)
}
item, new := c.bucket(key).set(key, value, duration)
if new {
c.promote(item)
} else {
c.conditionalPromote(item)
}
}
func (c *Cache) Fetch(key string, duration time.Duration, fetch func() (interface{}, error)) (interface{}, error) {
item := c.Get(key)
if item != nil { return item, nil }
value, err := fetch()
if err == nil {
c.Set(key, value, duration)
}
return value, err
item := c.Get(key)
if item != nil {
return item, nil
}
value, err := fetch()
if err == nil {
c.Set(key, value, duration)
}
return value, err
}
func (c *Cache) Delete(key string) {
item := c.bucket(key).getAndDelete(key)
if item != nil {
c.deletables <- item
}
item := c.bucket(key).getAndDelete(key)
if item != nil {
c.deletables <- item
}
}
//this isn't thread safe. It's meant to be called from non-concurrent tests
func (c *Cache) Clear() {
for _, bucket := range c.buckets {
bucket.clear()
}
c.list = list.New()
for _, bucket := range c.buckets {
bucket.clear()
}
c.list = list.New()
}
func (c *Cache) deleteItem(bucket *Bucket, item *Item) {
bucket.delete(item.key) //stop othe GETs from getting it
c.deletables <- item
bucket.delete(item.key) //stop othe GETs from getting it
c.deletables <- item
}
func (c *Cache) bucket(key string) *Bucket {
h := fnv.New32a()
h.Write([]byte(key))
index := h.Sum32() % c.bucketCount
return c.buckets[index]
h := fnv.New32a()
h.Write([]byte(key))
index := h.Sum32() % c.bucketCount
return c.buckets[index]
}
func (c *Cache) conditionalPromote(item *Item) {
if item.shouldPromote(c.getsPerPromote) == false { return }
c.promote(item)
if item.shouldPromote(c.getsPerPromote) == false {
return
}
c.promote(item)
}
func (c *Cache) promote(item *Item) {
c.promotables <- item
c.promotables <- item
}
func (c *Cache) worker() {
ms := new(runtime.MemStats)
for {
select {
case item := <- c.promotables:
wasNew := c.doPromote(item)
if wasNew == false { continue }
runtime.ReadMemStats(ms)
if ms.HeapAlloc > c.size { c.gc() }
case item := <- c.deletables:
c.list.Remove(item.element)
}
}
ms := new(runtime.MemStats)
for {
select {
case item := <-c.promotables:
if wasNew := c.doPromote(item); wasNew == false {
continue
}
runtime.ReadMemStats(ms)
if ms.HeapAlloc > c.size {
c.gc()
}
case item := <-c.deletables:
c.list.Remove(item.element)
}
}
}
func (c *Cache) doPromote(item *Item) bool {
item.Lock()
defer item.Unlock()
item.promotions = 0
if item.element != nil { //not a new item
c.list.MoveToFront(item.element)
return false
}
item.element = c.list.PushFront(item)
return true
item.Lock()
defer item.Unlock()
item.promotions = 0
if item.element != nil { //not a new item
c.list.MoveToFront(item.element)
return false
}
item.element = c.list.PushFront(item)
return true
}
func (c *Cache) gc() {
for i := 0; i < c.itemsToPrune; i++ {
element := c.list.Back()
if element == nil { return }
item := element.Value.(*Item)
c.bucket(item.key).delete(item.key)
c.list.Remove(element)
}
element := c.list.Back()
for i := 0; i < c.itemsToPrune; i++ {
if element == nil {
return
}
prev := element.Prev()
item := element.Value.(*Item)
if c.tracking == false || atomic.LoadInt32(&item.refCount) == 0 {
c.bucket(item.key).delete(item.key)
c.list.Remove(element)
}
element = prev
}
}

49
cache_test.go Normal file
View file

@ -0,0 +1,49 @@
package ccache
import (
"github.com/karlseguin/gspec"
"testing"
"strconv"
"time"
)
func TestGCsTheOldestItems(t *testing.T) {
spec := gspec.New(t)
cache := New(Configure().ItemsToPrune(10))
for i := 0; i < 500; i++ {
cache.Set(strconv.Itoa(i), i, time.Minute)
}
cache.gc()
spec.Expect(cache.Get("9")).ToBeNil()
spec.Expect(cache.Get("10").(int)).ToEqual(10)
}
func TestPromotedItemsDontGetPruned(t *testing.T) {
spec := gspec.New(t)
cache := New(Configure().ItemsToPrune(10).GetsPerPromote(1))
for i := 0; i < 500; i++ {
cache.Set(strconv.Itoa(i), i, time.Minute)
}
cache.Get("9")
time.Sleep(time.Millisecond * 10)
cache.gc()
spec.Expect(cache.Get("9").(int)).ToEqual(9)
spec.Expect(cache.Get("10")).ToBeNil()
spec.Expect(cache.Get("11").(int)).ToEqual(11)
}
func TestTrackerDoesNotCleanupHeldInstance(t *testing.T) {
spec := gspec.New(t)
cache := New(Configure().ItemsToPrune(10).Track())
for i := 0; i < 10; i++ {
cache.Set(strconv.Itoa(i), i, time.Minute)
}
item := cache.TrackingGet("0")
time.Sleep(time.Millisecond * 10)
cache.gc()
spec.Expect(cache.Get("0").(int)).ToEqual(0)
spec.Expect(cache.Get("1")).ToBeNil()
item.Release()
cache.gc()
spec.Expect(cache.Get("0")).ToBeNil()
}

View file

@ -1,51 +1,58 @@
package ccache
type Configuration struct {
size uint64
buckets int
itemsToPrune int
deleteBuffer int
promoteBuffer int
getsPerPromote int32
size uint64
buckets int
itemsToPrune int
deleteBuffer int
promoteBuffer int
getsPerPromote int32
tracking bool
}
func Configure() *Configuration {
return &Configuration {
buckets: 64,
itemsToPrune: 500,
deleteBuffer: 1024,
getsPerPromote: 10,
promoteBuffer: 1024,
size: 500 * 1024 * 1024,
}
return &Configuration{
buckets: 64,
itemsToPrune: 500,
deleteBuffer: 1024,
getsPerPromote: 10,
promoteBuffer: 1024,
size: 500 * 1024 * 1024,
tracking: false,
}
}
func (c *Configuration) Size(bytes uint64) *Configuration {
c.size = bytes
return c
c.size = bytes
return c
}
func (c *Configuration) Buckets(count int) *Configuration {
c.buckets = count
return c
c.buckets = count
return c
}
func (c *Configuration) ItemsToPrune(count int) *Configuration {
c.itemsToPrune = count
return c
c.itemsToPrune = count
return c
}
func (c *Configuration) PromoteBuffer(size int) *Configuration {
c.promoteBuffer = size
return c
c.promoteBuffer = size
return c
}
func (c *Configuration) DeleteBuffer(size int) *Configuration {
c.deleteBuffer = size
return c
c.deleteBuffer = size
return c
}
func (c *Configuration) GetsPerPromote(count int) *Configuration {
c.getsPerPromote = int32(count)
return c
c.getsPerPromote = int32(count)
return c
}
func (c *Configuration) Track() *Configuration {
c.tracking = true
return c
}

59
item.go
View file

@ -1,30 +1,55 @@
package ccache
import (
"sync"
"time"
"sync/atomic"
"container/list"
"container/list"
"sync"
"sync/atomic"
"time"
)
type TrackedItem interface {
Value() interface{}
Release()
}
type nilItem struct{}
func (n *nilItem) Value() interface{} { return nil }
func (n *nilItem) Release() {}
var NilTracked = new(nilItem)
type Item struct {
key string
sync.RWMutex
promotions int32
expires time.Time
value interface{}
element *list.Element
key string
sync.RWMutex
promotions int32
refCount int32
expires time.Time
value interface{}
element *list.Element
}
func newItem(key string, value interface{}, expires time.Time) *Item {
return &Item{
key: key,
value: value,
promotions: -1,
expires: expires,
}
return &Item{
key: key,
value: value,
promotions: -1,
expires: expires,
}
}
func (i *Item) shouldPromote(getsPerPromote int32) bool {
return atomic.AddInt32(&i.promotions, 1) == getsPerPromote
return atomic.AddInt32(&i.promotions, 1) == getsPerPromote
}
func (i *Item) Value() interface{} {
return i.value
}
func (i *Item) track() {
atomic.AddInt32(&i.refCount, 1)
}
func (i *Item) Release() {
atomic.AddInt32(&i.refCount, -1)
}

View file

@ -1,13 +1,13 @@
package ccache
import (
"testing"
"github.com/viki-org/gspec"
"github.com/karlseguin/gspec"
"testing"
)
func TestItemPromotability(t *testing.T) {
spec := gspec.New(t)
item := &Item{promotions: 4}
spec.Expect(item.shouldPromote(5)).ToEqual(true)
spec.Expect(item.shouldPromote(5)).ToEqual(false)
spec := gspec.New(t)
item := &Item{promotions: 4}
spec.Expect(item.shouldPromote(5)).ToEqual(true)
spec.Expect(item.shouldPromote(5)).ToEqual(false)
}