Added layered cache
This commit is contained in:
parent
13c50b1ff5
commit
0c7492b382
7 changed files with 442 additions and 29 deletions
|
@ -30,13 +30,7 @@ func (b *Bucket) set(key string, value interface{}, duration time.Duration) (*It
|
|||
return item, true
|
||||
}
|
||||
|
||||
func (b *Bucket) delete(key string) {
|
||||
b.Lock()
|
||||
defer b.Unlock()
|
||||
delete(b.lookup, key)
|
||||
}
|
||||
|
||||
func (b *Bucket) getAndDelete(key string) *Item {
|
||||
func (b *Bucket) delete(key string) *Item {
|
||||
b.Lock()
|
||||
defer b.Unlock()
|
||||
item := b.lookup[key]
|
||||
|
|
4
cache.go
4
cache.go
|
@ -87,7 +87,7 @@ func (c *Cache) Fetch(key string, duration time.Duration, fetch func() (interfac
|
|||
}
|
||||
|
||||
func (c *Cache) Delete(key string) {
|
||||
item := c.bucket(key).getAndDelete(key)
|
||||
item := c.bucket(key).delete(key)
|
||||
if item != nil {
|
||||
c.deletables <- item
|
||||
}
|
||||
|
@ -102,7 +102,7 @@ func (c *Cache) Clear() {
|
|||
}
|
||||
|
||||
func (c *Cache) deleteItem(bucket *Bucket, item *Item) {
|
||||
bucket.delete(item.key) //stop othe GETs from getting it
|
||||
bucket.delete(item.key) //stop other GETs from getting it
|
||||
c.deletables <- item
|
||||
}
|
||||
|
||||
|
|
1
item.go
1
item.go
|
@ -19,6 +19,7 @@ var NilTracked = new(nilItem)
|
|||
|
||||
type Item struct {
|
||||
key string
|
||||
group string
|
||||
promotions int32
|
||||
refCount int32
|
||||
expires int64
|
||||
|
|
76
layeredbucket.go
Normal file
76
layeredbucket.go
Normal file
|
@ -0,0 +1,76 @@
|
|||
package ccache
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type LayeredBucket struct {
|
||||
sync.RWMutex
|
||||
buckets map[string]*Bucket
|
||||
}
|
||||
|
||||
func (b *LayeredBucket) get(primary, secondary string) *Item {
|
||||
b.RLock()
|
||||
bucket, exists := b.buckets[primary]
|
||||
b.RUnlock()
|
||||
if exists == false {
|
||||
return nil
|
||||
}
|
||||
return bucket.get(secondary)
|
||||
}
|
||||
|
||||
|
||||
func (b *LayeredBucket) set(primary, secondary string, value interface{}, duration time.Duration) (*Item, bool) {
|
||||
b.Lock()
|
||||
bucket, exists := b.buckets[primary]
|
||||
if exists == false {
|
||||
bucket = &Bucket{lookup: make(map[string]*Item)}
|
||||
b.buckets[primary] = bucket
|
||||
}
|
||||
b.Unlock()
|
||||
item, new := bucket.set(secondary, value, duration)
|
||||
if new {
|
||||
item.group = primary
|
||||
}
|
||||
return item, new
|
||||
}
|
||||
|
||||
func (b *LayeredBucket) delete(primary, secondary string) *Item {
|
||||
b.RLock()
|
||||
bucket, exists := b.buckets[primary]
|
||||
b.RUnlock()
|
||||
if exists == false {
|
||||
return nil
|
||||
}
|
||||
return bucket.delete(secondary)
|
||||
}
|
||||
|
||||
func (b *LayeredBucket) deleteAll(primary string, deletables chan *Item) {
|
||||
b.RLock()
|
||||
bucket, exists := b.buckets[primary]
|
||||
b.RUnlock()
|
||||
if exists == false {
|
||||
return
|
||||
}
|
||||
|
||||
bucket.Lock()
|
||||
defer bucket.Unlock()
|
||||
|
||||
if l := len(bucket.lookup); l == 0 {
|
||||
return
|
||||
}
|
||||
for key, item := range bucket.lookup {
|
||||
delete(bucket.lookup, key)
|
||||
deletables <- item
|
||||
}
|
||||
}
|
||||
|
||||
func (b *LayeredBucket) clear() {
|
||||
b.Lock()
|
||||
defer b.Unlock()
|
||||
for _, bucket := range b.buckets {
|
||||
bucket.clear()
|
||||
}
|
||||
b.buckets = make(map[string]*Bucket)
|
||||
}
|
170
layeredcache.go
Normal file
170
layeredcache.go
Normal file
|
@ -0,0 +1,170 @@
|
|||
// An LRU cached aimed at high concurrency
|
||||
package ccache
|
||||
|
||||
import (
|
||||
"container/list"
|
||||
"hash/fnv"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
type LayeredCache struct {
|
||||
*Configuration
|
||||
list *list.List
|
||||
buckets []*LayeredBucket
|
||||
bucketCount uint32
|
||||
deletables chan *Item
|
||||
promotables chan *Item
|
||||
}
|
||||
|
||||
func Layered(config *Configuration) *LayeredCache {
|
||||
c := &LayeredCache{
|
||||
list: list.New(),
|
||||
Configuration: config,
|
||||
bucketCount: uint32(config.buckets),
|
||||
buckets: make([]*LayeredBucket, config.buckets),
|
||||
deletables: make(chan *Item, config.deleteBuffer),
|
||||
promotables: make(chan *Item, config.promoteBuffer),
|
||||
}
|
||||
for i := 0; i < int(config.buckets); i++ {
|
||||
c.buckets[i] = &LayeredBucket{
|
||||
buckets: make(map[string]*Bucket),
|
||||
}
|
||||
}
|
||||
go c.worker()
|
||||
return c
|
||||
}
|
||||
|
||||
func (c *LayeredCache) Get(primary, secondary string) interface{} {
|
||||
if item := c.get(primary, secondary); item != nil {
|
||||
return item.value
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *LayeredCache) TrackingGet(primary, secondary string) TrackedItem {
|
||||
item := c.get(primary, secondary)
|
||||
if item == nil {
|
||||
return NilTracked
|
||||
}
|
||||
item.track()
|
||||
return item
|
||||
}
|
||||
|
||||
func (c *LayeredCache) get(primary, secondary string) *Item {
|
||||
bucket := c.bucket(primary)
|
||||
item := bucket.get(primary, secondary)
|
||||
if item == nil {
|
||||
return nil
|
||||
}
|
||||
if item.expires < time.Now().Unix() {
|
||||
return nil
|
||||
}
|
||||
c.conditionalPromote(item)
|
||||
return item
|
||||
}
|
||||
|
||||
func (c *LayeredCache) Set(primary, secondary string, value interface{}, duration time.Duration) {
|
||||
item, new := c.bucket(primary).set(primary, secondary, value, duration)
|
||||
if new {
|
||||
c.promote(item)
|
||||
} else {
|
||||
c.conditionalPromote(item)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *LayeredCache) Fetch(primary, secondary string, duration time.Duration, fetch func() (interface{}, error)) (interface{}, error) {
|
||||
item := c.Get(primary, secondary)
|
||||
if item != nil {
|
||||
return item, nil
|
||||
}
|
||||
value, err := fetch()
|
||||
if err == nil {
|
||||
c.Set(primary, secondary, value, duration)
|
||||
}
|
||||
return value, err
|
||||
}
|
||||
|
||||
func (c *LayeredCache) Delete(primary, secondary string) {
|
||||
item := c.bucket(primary).delete(primary, secondary)
|
||||
if item != nil {
|
||||
c.deletables <- item
|
||||
}
|
||||
}
|
||||
|
||||
func (c *LayeredCache) DeleteAll(primary string) {
|
||||
c.bucket(primary).deleteAll(primary, c.deletables)
|
||||
}
|
||||
|
||||
//this isn't thread safe. It's meant to be called from non-concurrent tests
|
||||
func (c *LayeredCache) Clear() {
|
||||
for _, bucket := range c.buckets {
|
||||
bucket.clear()
|
||||
}
|
||||
c.list = list.New()
|
||||
}
|
||||
|
||||
func (c *LayeredCache) bucket(key string) *LayeredBucket {
|
||||
h := fnv.New32a()
|
||||
h.Write([]byte(key))
|
||||
return c.buckets[h.Sum32()%c.bucketCount]
|
||||
}
|
||||
|
||||
func (c *LayeredCache) conditionalPromote(item *Item) {
|
||||
if item.shouldPromote(c.getsPerPromote) == false {
|
||||
return
|
||||
}
|
||||
c.promote(item)
|
||||
}
|
||||
|
||||
func (c *LayeredCache) promote(item *Item) {
|
||||
c.promotables <- item
|
||||
}
|
||||
|
||||
func (c *LayeredCache) worker() {
|
||||
for {
|
||||
select {
|
||||
case item := <-c.promotables:
|
||||
if c.doPromote(item) && uint64(c.list.Len()) > c.maxItems {
|
||||
c.gc()
|
||||
}
|
||||
case item := <-c.deletables:
|
||||
if item.element == nil {
|
||||
item.promotions = -2
|
||||
} else {
|
||||
c.list.Remove(item.element)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *LayeredCache) doPromote(item *Item) bool {
|
||||
// deleted before it ever got promoted
|
||||
if item.promotions == -2 {
|
||||
return false
|
||||
}
|
||||
|
||||
item.promotions = 0
|
||||
if item.element != nil { //not a new item
|
||||
c.list.MoveToFront(item.element)
|
||||
return false
|
||||
}
|
||||
item.element = c.list.PushFront(item)
|
||||
return true
|
||||
}
|
||||
|
||||
func (c *LayeredCache) gc() {
|
||||
element := c.list.Back()
|
||||
for i := 0; i < c.itemsToPrune; i++ {
|
||||
if element == nil {
|
||||
return
|
||||
}
|
||||
prev := element.Prev()
|
||||
item := element.Value.(*Item)
|
||||
if c.tracking == false || atomic.LoadInt32(&item.refCount) == 0 {
|
||||
c.bucket(item.group).delete(item.group, item.key)
|
||||
c.list.Remove(element)
|
||||
}
|
||||
element = prev
|
||||
}
|
||||
}
|
135
layeredcache_test.go
Normal file
135
layeredcache_test.go
Normal file
|
@ -0,0 +1,135 @@
|
|||
package ccache
|
||||
|
||||
import (
|
||||
. "github.com/karlseguin/expect"
|
||||
"testing"
|
||||
"time"
|
||||
"strconv"
|
||||
)
|
||||
|
||||
type LayeredCacheTests struct{}
|
||||
|
||||
func Test_LayeredCache(t *testing.T) {
|
||||
Expectify(new(LayeredCacheTests), t)
|
||||
}
|
||||
|
||||
func (l *LayeredCacheTests) GetsANonExistantValue() {
|
||||
cache := newLayered()
|
||||
Expect(cache.Get("spice", "flow")).To.Equal(nil)
|
||||
}
|
||||
|
||||
func (l *LayeredCacheTests) SetANewValue() {
|
||||
cache := newLayered()
|
||||
cache.Set("spice", "flow", "a value", time.Minute)
|
||||
Expect(cache.Get("spice", "flow").(string)).To.Equal("a value")
|
||||
Expect(cache.Get("spice", "stop")).To.Equal(nil)
|
||||
}
|
||||
|
||||
func (l *LayeredCacheTests) SetsMultipleValueWithinTheSameLayer() {
|
||||
cache := newLayered()
|
||||
cache.Set("spice", "flow", "value-a", time.Minute)
|
||||
cache.Set("spice", "must", "value-b", time.Minute)
|
||||
cache.Set("leto", "sister", "ghanima", time.Minute)
|
||||
Expect(cache.Get("spice", "flow").(string)).To.Equal("value-a")
|
||||
Expect(cache.Get("spice", "must").(string)).To.Equal("value-b")
|
||||
Expect(cache.Get("spice", "worm")).To.Equal(nil)
|
||||
|
||||
Expect(cache.Get("leto", "sister").(string)).To.Equal("ghanima")
|
||||
Expect(cache.Get("leto", "brother")).To.Equal(nil)
|
||||
Expect(cache.Get("baron", "friend")).To.Equal(nil)
|
||||
}
|
||||
|
||||
func (l *LayeredCacheTests) DeletesAValue() {
|
||||
cache := newLayered()
|
||||
cache.Set("spice", "flow", "value-a", time.Minute)
|
||||
cache.Set("spice", "must", "value-b", time.Minute)
|
||||
cache.Set("leto", "sister", "ghanima", time.Minute)
|
||||
cache.Delete("spice", "flow")
|
||||
Expect(cache.Get("spice", "flow")).To.Equal(nil)
|
||||
Expect(cache.Get("spice", "must").(string)).To.Equal("value-b")
|
||||
Expect(cache.Get("spice", "worm")).To.Equal(nil)
|
||||
Expect(cache.Get("leto", "sister").(string)).To.Equal("ghanima")
|
||||
}
|
||||
|
||||
|
||||
func (l *LayeredCacheTests) DeletesALayer() {
|
||||
cache := newLayered()
|
||||
cache.Set("spice", "flow", "value-a", time.Minute)
|
||||
cache.Set("spice", "must", "value-b", time.Minute)
|
||||
cache.Set("leto", "sister", "ghanima", time.Minute)
|
||||
cache.DeleteAll("spice")
|
||||
Expect(cache.Get("spice", "flow")).To.Equal(nil)
|
||||
Expect(cache.Get("spice", "must")).To.Equal(nil)
|
||||
Expect(cache.Get("spice", "worm")).To.Equal(nil)
|
||||
Expect(cache.Get("leto", "sister").(string)).To.Equal("ghanima")
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
func (c *LayeredCacheTests) GCsTheOldestItems() {
|
||||
cache := Layered(Configure().ItemsToPrune(10))
|
||||
cache.Set("xx", "a", 23, time.Minute)
|
||||
for i := 0; i < 500; i++ {
|
||||
cache.Set(strconv.Itoa(i), "a", i, time.Minute)
|
||||
}
|
||||
cache.Set("xx", "b", 9001, time.Minute)
|
||||
//let the items get promoted (and added to our list)
|
||||
time.Sleep(time.Millisecond * 10)
|
||||
cache.gc()
|
||||
Expect(cache.Get("xx", "a")).To.Equal(nil)
|
||||
Expect(cache.Get("xx", "b").(int)).To.Equal(9001)
|
||||
Expect(cache.Get("8", "a")).To.Equal(nil)
|
||||
Expect(cache.Get("9", "a")).To.Equal(9)
|
||||
Expect(cache.Get("10", "a").(int)).To.Equal(10)
|
||||
}
|
||||
|
||||
func (c *LayeredCacheTests) PromotedItemsDontGetPruned() {
|
||||
cache := Layered(Configure().ItemsToPrune(10).GetsPerPromote(1))
|
||||
for i := 0; i < 500; i++ {
|
||||
cache.Set(strconv.Itoa(i), "a", i, time.Minute)
|
||||
}
|
||||
time.Sleep(time.Millisecond * 10) //run the worker once to init the list
|
||||
cache.Get("9", "a")
|
||||
time.Sleep(time.Millisecond * 10)
|
||||
cache.gc()
|
||||
Expect(cache.Get("9", "a").(int)).To.Equal(9)
|
||||
Expect(cache.Get("10", "a")).To.Equal(nil)
|
||||
Expect(cache.Get("11", "a").(int)).To.Equal(11)
|
||||
}
|
||||
|
||||
func (c *LayeredCacheTests) TrackerDoesNotCleanupHeldInstance() {
|
||||
cache := Layered(Configure().ItemsToPrune(10).Track())
|
||||
for i := 0; i < 10; i++ {
|
||||
cache.Set(strconv.Itoa(i), "a", i, time.Minute)
|
||||
}
|
||||
item := cache.TrackingGet("0", "a")
|
||||
time.Sleep(time.Millisecond * 10)
|
||||
cache.gc()
|
||||
Expect(cache.Get("0", "a").(int)).To.Equal(0)
|
||||
Expect(cache.Get("1", "a")).To.Equal(nil)
|
||||
item.Release()
|
||||
cache.gc()
|
||||
Expect(cache.Get("0", "a")).To.Equal(nil)
|
||||
}
|
||||
|
||||
func (c *LayeredCacheTests) RemovesOldestItemWhenFull() {
|
||||
cache := Layered(Configure().MaxItems(5).ItemsToPrune(1))
|
||||
cache.Set("xx", "a", 23, time.Minute)
|
||||
for i := 0; i < 7; i++ {
|
||||
cache.Set(strconv.Itoa(i), "a", i, time.Minute)
|
||||
}
|
||||
cache.Set("xx", "b", 9001, time.Minute)
|
||||
time.Sleep(time.Millisecond * 10)
|
||||
Expect(cache.Get("xx", "a")).To.Equal(nil)
|
||||
Expect(cache.Get("0", "a")).To.Equal(nil)
|
||||
Expect(cache.Get("1", "a")).To.Equal(nil)
|
||||
Expect(cache.Get("2", "a")).To.Equal(nil)
|
||||
Expect(cache.Get("3", "a")).To.Equal(3)
|
||||
Expect(cache.Get("xx", "b")).To.Equal(9001)
|
||||
}
|
||||
|
||||
func newLayered() *LayeredCache {
|
||||
return Layered(Configure())
|
||||
}
|
77
readme.md
77
readme.md
|
@ -17,15 +17,19 @@ First, download the project:
|
|||
Next, import and create a `ccache` instance:
|
||||
|
||||
|
||||
import (
|
||||
"github.com/karlseguin/ccache"
|
||||
)
|
||||
```go
|
||||
import (
|
||||
"github.com/karlseguin/ccache"
|
||||
)
|
||||
|
||||
var cache = ccache.New(ccache.Configure())
|
||||
var cache = ccache.New(ccache.Configure())
|
||||
```
|
||||
|
||||
`Configure` exposes a chainable API:
|
||||
|
||||
var cache = ccache.New(ccache.Configure().MaxItems(1000).itemsToPrune(100))
|
||||
```go
|
||||
var cache = ccache.New(ccache.Configure().MaxItems(1000).itemsToPrune(100))
|
||||
```
|
||||
|
||||
The most likely configuration options to tweak are:
|
||||
|
||||
|
@ -43,39 +47,72 @@ Configurations that change the internals of the cache, which aren't as likely to
|
|||
|
||||
Once the cache is setup, you can `Get`, `Set` and `Delete` items from it. A `Get` returns an `interface{}` which you'll want to cast back to the type of object you stored:
|
||||
|
||||
item := cache.Get("user:4")
|
||||
if item == nil {
|
||||
//handle
|
||||
} else {
|
||||
user := item.(*User)
|
||||
}
|
||||
```go
|
||||
item := cache.Get("user:4")
|
||||
if item == nil {
|
||||
//handle
|
||||
} else {
|
||||
user := item.(*User)
|
||||
}
|
||||
```
|
||||
|
||||
`Set` expects the key, value and ttl:
|
||||
|
||||
cache.Set("user:4", user, time.Minute * 10)
|
||||
```go
|
||||
cache.Set("user:4", user, time.Minute * 10)
|
||||
```
|
||||
|
||||
There's also a `Fetch` which mixes a `Get` and a `Set`:
|
||||
|
||||
item, err := cache.Fetch("user:4", time.Minute * 10, func() (interface{}, error) {
|
||||
//code to fetch the data incase of a miss
|
||||
//should return the data to cache and the error, if any
|
||||
})
|
||||
```go
|
||||
item, err := cache.Fetch("user:4", time.Minute * 10, func() (interface{}, error) {
|
||||
//code to fetch the data incase of a miss
|
||||
//should return the data to cache and the error, if any
|
||||
})
|
||||
```
|
||||
|
||||
## Tracking
|
||||
ccache supports a special tracking mode which is meant to be used in conjunction with other pieces of your code that maintains a long-lived reference to data.
|
||||
|
||||
When you configure your cache with `Track()`:
|
||||
|
||||
cache = ccache.New(ccache.Configure().Track())
|
||||
```go
|
||||
cache = ccache.New(ccache.Configure().Track())
|
||||
```
|
||||
|
||||
The items retrieved via `TrackingGet` will not be eligible for purge until `Release` is called on them:
|
||||
|
||||
item := cache.TrackingGet("user:4")
|
||||
user := item.Value() //will be nil if "user:4" didn't exist in the cache
|
||||
item.Release() //can be called even if item.Value() returned nil
|
||||
```go
|
||||
item := cache.TrackingGet("user:4")
|
||||
user := item.Value() //will be nil if "user:4" didn't exist in the cache
|
||||
item.Release() //can be called even if item.Value() returned nil
|
||||
```
|
||||
|
||||
In practive, `Release` wouldn't be called until later, at some other place in your code.
|
||||
|
||||
There's a couple reason to use the tracking mode if other parts of your code also hold references to objects. First, if you're already going to hold a reference to these objects, there's really no reason not to have them in the cache - the memory is used up anyways.
|
||||
|
||||
More important, it helps ensure that you're code returns consistent data. With tracking, "user:4" might be purged, and a subsequent `Fetch` would reload the data. This can result in different versions of "user:4" being returned by different parts of your system.
|
||||
|
||||
## LayeredCache
|
||||
|
||||
CCache's `LayeredCache` stores and retrieves values by both a primary and secondary key. Deletion can happen against either the primary and secondary key, or the primary key only (removing all values that share the same primary key).
|
||||
|
||||
`LayeredCache` is useful for HTTP caching, when you want to purge all variations of a request. Consider:
|
||||
|
||||
`LayeredCache` takes the same configuration object as the main cache, exposes the same optional tracking capabilities, but exposes a slightly different API:
|
||||
|
||||
```go
|
||||
cache := ccache.Layered(ccache.Configure())
|
||||
|
||||
cache.Set("/users/goku", "type:json", "{value_to_cache}", time.Minute * 5)
|
||||
cache.Set("/users/goku", "type:xml", "<value_to_cache>", time.Minute * 5)
|
||||
|
||||
json := cache.Get("/users/goku", "type:json")
|
||||
xml := cache.Get("/users/goku", "type:xml")
|
||||
|
||||
cache.Delete("/users/goku", "type:json")
|
||||
cache.Delete("/users/goku", "type:xml")
|
||||
// OR
|
||||
cache.DeleteAll("/users/goku")
|
||||
```
|
||||
|
|
Loading…
Reference in a new issue