Remove Value interface, cache now works against interface{} with the
expiry specified on a Set. Get no longer returns expired items Items can now be deleted
This commit is contained in:
parent
36e5fae491
commit
751266c34a
7 changed files with 80 additions and 37 deletions
18
bucket.go
18
bucket.go
|
@ -2,6 +2,7 @@ package ccache
|
|||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Bucket struct {
|
||||
|
@ -15,23 +16,32 @@ func (b *Bucket) get(key string) *Item {
|
|||
return b.lookup[key]
|
||||
}
|
||||
|
||||
func (b *Bucket) set(key string, value Value) *Item {
|
||||
func (b *Bucket) set(key string, value interface{}, duration time.Duration) *Item {
|
||||
expires := time.Now().Add(duration)
|
||||
b.Lock()
|
||||
defer b.Unlock()
|
||||
if existing, exists := b.lookup[key]; exists {
|
||||
existing.Lock()
|
||||
existing.value = value
|
||||
existing.expires = expires
|
||||
existing.Unlock()
|
||||
return existing
|
||||
}
|
||||
item := newItem(key, value)
|
||||
item := newItem(key, value, expires)
|
||||
b.lookup[key] = item
|
||||
return item
|
||||
}
|
||||
|
||||
|
||||
func (b *Bucket) remove(key string) {
|
||||
func (b *Bucket) delete(key string) {
|
||||
b.Lock()
|
||||
defer b.Unlock()
|
||||
delete(b.lookup, key)
|
||||
}
|
||||
|
||||
func (b *Bucket) getAndDelete(key string) *Item{
|
||||
b.Lock()
|
||||
defer b.Unlock()
|
||||
item := b.lookup[key]
|
||||
delete(b.lookup, key)
|
||||
return item
|
||||
}
|
||||
|
|
|
@ -17,15 +17,15 @@ func TestGetHitFromBucket(t *testing.T) {
|
|||
assertValue(t, item, "9000")
|
||||
}
|
||||
|
||||
func TestRemovesItemFromBucket(t *testing.T) {
|
||||
func TestDeleteItemFromBucket(t *testing.T) {
|
||||
bucket := testBucket()
|
||||
bucket.remove("power")
|
||||
bucket.delete("power")
|
||||
gspec.New(t).Expect(bucket.get("power")).ToBeNil()
|
||||
}
|
||||
|
||||
func TestSetsANewBucketItem(t *testing.T) {
|
||||
bucket := testBucket()
|
||||
item := bucket.set("spice", TestValue("flow"))
|
||||
item := bucket.set("spice", TestValue("flow"), time.Minute)
|
||||
assertValue(t, item, "flow")
|
||||
item = bucket.get("spice")
|
||||
assertValue(t, item, "flow")
|
||||
|
@ -33,7 +33,7 @@ func TestSetsANewBucketItem(t *testing.T) {
|
|||
|
||||
func TestSetsAnExistingItem(t *testing.T) {
|
||||
bucket := testBucket()
|
||||
item := bucket.set("power", TestValue("9002"))
|
||||
item := bucket.set("power", TestValue("9002"), time.Minute)
|
||||
assertValue(t, item, "9002")
|
||||
item = bucket.get("power")
|
||||
assertValue(t, item, "9002")
|
||||
|
|
48
cache.go
48
cache.go
|
@ -7,15 +7,12 @@ import (
|
|||
"container/list"
|
||||
)
|
||||
|
||||
type Value interface {
|
||||
Expires() time.Time
|
||||
}
|
||||
|
||||
type Cache struct {
|
||||
*Configuration
|
||||
list *list.List
|
||||
buckets []*Bucket
|
||||
bucketCount uint32
|
||||
deletables chan *Item
|
||||
promotables chan *Item
|
||||
}
|
||||
|
||||
|
@ -25,6 +22,7 @@ func New(config *Configuration) *Cache {
|
|||
Configuration: config,
|
||||
bucketCount: uint32(config.buckets),
|
||||
buckets: make([]*Bucket, config.buckets),
|
||||
deletables: make(chan *Item, config.deleteBuffer),
|
||||
promotables: make(chan *Item, config.promoteBuffer),
|
||||
}
|
||||
for i := 0; i < config.buckets; i++ {
|
||||
|
@ -36,18 +34,35 @@ func New(config *Configuration) *Cache {
|
|||
return c
|
||||
}
|
||||
|
||||
func (c *Cache) Get(key string) Value {
|
||||
item := c.bucket(key).get(key)
|
||||
func (c *Cache) Get(key string) interface{} {
|
||||
bucket := c.bucket(key)
|
||||
item := bucket.get(key)
|
||||
if item == nil { return nil }
|
||||
if item.expires.Before(time.Now()) {
|
||||
c.deleteItem(bucket, item)
|
||||
return nil
|
||||
}
|
||||
c.promote(item)
|
||||
return item.value
|
||||
}
|
||||
|
||||
func (c *Cache) Set(key string, value Value) {
|
||||
item := c.bucket(key).set(key, value)
|
||||
func (c *Cache) Set(key string, value interface{}, duration time.Duration) {
|
||||
item := c.bucket(key).set(key, value, duration)
|
||||
c.promote(item)
|
||||
}
|
||||
|
||||
func (c *Cache) Delete(key string) {
|
||||
item := c.bucket(key).getAndDelete(key)
|
||||
if item != nil {
|
||||
c.deletables <- item
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Cache) deleteItem(bucket *Bucket, item *Item) {
|
||||
bucket.delete(item.key) //stop othe GETs from getting it
|
||||
c.deletables <- item
|
||||
}
|
||||
|
||||
func (c *Cache) bucket(key string) *Bucket {
|
||||
h := fnv.New32a()
|
||||
h.Write([]byte(key))
|
||||
|
@ -56,18 +71,21 @@ func (c *Cache) bucket(key string) *Bucket {
|
|||
}
|
||||
|
||||
func (c *Cache) promote(item *Item) {
|
||||
if item.shouldPromote() == false { return }
|
||||
if item.shouldPromote(c.getsPerPromote) == false { return }
|
||||
c.promotables <- item
|
||||
}
|
||||
|
||||
func (c *Cache) worker() {
|
||||
ms := new(runtime.MemStats)
|
||||
for {
|
||||
wasNew := c.doPromote(<- c.promotables)
|
||||
if wasNew == false { continue }
|
||||
runtime.ReadMemStats(ms)
|
||||
if ms.HeapAlloc > c.size{
|
||||
c.gc()
|
||||
select {
|
||||
case item := <- c.promotables:
|
||||
wasNew := c.doPromote(item)
|
||||
if wasNew == false { continue }
|
||||
runtime.ReadMemStats(ms)
|
||||
if ms.HeapAlloc > c.size { c.gc() }
|
||||
case item := <- c.deletables:
|
||||
c.list.Remove(item.element)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -89,7 +107,7 @@ func (c *Cache) gc() {
|
|||
element := c.list.Back()
|
||||
if element == nil { return }
|
||||
item := element.Value.(*Item)
|
||||
c.bucket(item.key).remove(item.key)
|
||||
c.bucket(item.key).delete(item.key)
|
||||
c.list.Remove(element)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -4,13 +4,17 @@ type Configuration struct {
|
|||
size uint64
|
||||
buckets int
|
||||
itemsToPrune int
|
||||
deleteBuffer int
|
||||
promoteBuffer int
|
||||
getsPerPromote int32
|
||||
}
|
||||
|
||||
func Configure() *Configuration {
|
||||
return &Configuration{
|
||||
return &Configuration {
|
||||
buckets: 64,
|
||||
itemsToPrune: 500,
|
||||
deleteBuffer: 1024,
|
||||
getsPerPromote: 10,
|
||||
promoteBuffer: 1024,
|
||||
size: 500 * 1024 * 1024,
|
||||
}
|
||||
|
@ -35,3 +39,13 @@ func (c *Configuration) PromoteBuffer(size int) *Configuration {
|
|||
c.promoteBuffer = size
|
||||
return c
|
||||
}
|
||||
|
||||
func (c *Configuration) DeleteBuffer(size int) *Configuration {
|
||||
c.deleteBuffer = size
|
||||
return c
|
||||
}
|
||||
|
||||
func (c *Configuration) GetsPerPromote(count int) *Configuration {
|
||||
c.getsPerPromote = int32(count)
|
||||
return c
|
||||
}
|
||||
|
|
13
item.go
13
item.go
|
@ -2,31 +2,32 @@ package ccache
|
|||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
"sync/atomic"
|
||||
"container/list"
|
||||
)
|
||||
|
||||
const promoteCap = 5
|
||||
|
||||
type Item struct {
|
||||
key string
|
||||
value Value
|
||||
sync.RWMutex
|
||||
promotions int32
|
||||
expires time.Time
|
||||
value interface{}
|
||||
element *list.Element
|
||||
}
|
||||
|
||||
func newItem(key string, value Value) *Item {
|
||||
func newItem(key string, value interface{}, expires time.Time) *Item {
|
||||
return &Item{
|
||||
key: key,
|
||||
value: value,
|
||||
promotions: -1,
|
||||
expires: expires,
|
||||
}
|
||||
}
|
||||
|
||||
func (i *Item) shouldPromote() bool {
|
||||
func (i *Item) shouldPromote(getsPerPromote int32) bool {
|
||||
promotions := atomic.AddInt32(&i.promotions, 1)
|
||||
if promotions == promoteCap || promotions == 0 {
|
||||
if promotions == getsPerPromote || promotions == 0 {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
|
|
|
@ -8,10 +8,10 @@ import (
|
|||
func TestItemPromotability(t *testing.T) {
|
||||
spec := gspec.New(t)
|
||||
item := &Item{promotions: -1}
|
||||
spec.Expect(item.shouldPromote()).ToEqual(true)
|
||||
spec.Expect(item.shouldPromote()).ToEqual(false)
|
||||
spec.Expect(item.shouldPromote(5)).ToEqual(true)
|
||||
spec.Expect(item.shouldPromote(5)).ToEqual(false)
|
||||
|
||||
item.promotions = 4
|
||||
spec.Expect(item.shouldPromote()).ToEqual(true)
|
||||
spec.Expect(item.shouldPromote()).ToEqual(false)
|
||||
spec.Expect(item.shouldPromote(5)).ToEqual(true)
|
||||
spec.Expect(item.shouldPromote(5)).ToEqual(false)
|
||||
}
|
||||
|
|
|
@ -3,7 +3,7 @@ CCache is an LRU Cache, written in Go, focused on supporting high concurrency.
|
|||
|
||||
Lock contention on the list is reduced by:
|
||||
|
||||
1 - Introducing a window which limits the frequency that an item can get promoted
|
||||
2 - Using a buffered channel to queue promotions for a single worker
|
||||
3 - Garbage collecting within the same thread as the worker
|
||||
* Introducing a window which limits the frequency that an item can get promoted
|
||||
* Using a buffered channel to queue promotions for a single worker
|
||||
* Garbage collecting within the same thread as the worker
|
||||
|
||||
|
|
Loading…
Reference in a new issue