initial work on tracking cache by item size

This commit is contained in:
Karl Seguin 2014-11-21 14:39:25 +07:00
parent 44cdb043d1
commit ff8727e847
7 changed files with 50 additions and 23 deletions

View file

@ -2,6 +2,7 @@ package ccache
import (
"sync"
"sync/atomic"
"time"
)
@ -16,18 +17,27 @@ func (b *bucket) get(key string) *Item {
return b.lookup[key]
}
func (b *bucket) set(key string, value interface{}, duration time.Duration) (*Item, bool) {
func (b *bucket) set(key string, value interface{}, duration time.Duration) (*Item, bool, int64) {
expires := time.Now().Add(duration).Unix()
b.Lock()
defer b.Unlock()
if existing, exists := b.lookup[key]; exists {
s := existing.size
existing.value = value
existing.expires = expires
return existing, false
d := int64(0)
if sized, ok := value.(Sized); ok {
newSize := sized.Size()
d = newSize - s
if d != 0 {
atomic.StoreInt64(&existing.size, newSize)
}
}
return existing, false, int64(d)
}
item := newItem(key, value, expires)
b.lookup[key] = item
return item, true
return item, true, int64(item.size)
}
func (b *bucket) replace(key string, value interface{}) bool {

View file

@ -32,20 +32,22 @@ func (_ *BucketTests) DeleteItemFromBucket() {
func (_ *BucketTests) SetsANewBucketItem() {
bucket := testBucket()
item, new := bucket.set("spice", TestValue("flow"), time.Minute)
item, new, d := bucket.set("spice", TestValue("flow"), time.Minute)
assertValue(item, "flow")
item = bucket.get("spice")
assertValue(item, "flow")
Expect(new).To.Equal(true)
Expect(d).To.Equal(1)
}
func (_ *BucketTests) SetsAnExistingItem() {
bucket := testBucket()
item, new := bucket.set("power", TestValue("9002"), time.Minute)
item, new, d := bucket.set("power", TestValue("9002"), time.Minute)
assertValue(item, "9002")
item = bucket.get("power")
assertValue(item, "9002")
Expect(new).To.Equal(false)
Expect(d).To.Equal(0)
}
func (_ *BucketTests) ReplaceDoesNothingIfKeyDoesNotExist() {
@ -56,7 +58,7 @@ func (_ *BucketTests) ReplaceDoesNothingIfKeyDoesNotExist() {
func (_ *BucketTests) ReplaceReplacesThevalue() {
bucket := testBucket()
item, _ := bucket.set("power", TestValue("9002"), time.Minute)
item, _, _ := bucket.set("power", TestValue("9002"), time.Minute)
Expect(bucket.replace("power", TestValue("9004"))).To.Equal(true)
Expect(item.Value().(string)).To.Equal("9004")
Expect(bucket.get("power").Value().(string)).To.Equal("9004")

View file

@ -11,7 +11,7 @@ import (
type Cache struct {
*Configuration
list *list.List
size uint64
size int64
buckets []*bucket
bucketMask uint32
deletables chan *Item
@ -67,12 +67,15 @@ func (c *Cache) TrackingGet(key string) TrackedItem {
// Set the value in the cache for the specified duration
func (c *Cache) Set(key string, value interface{}, duration time.Duration) {
item, new := c.bucket(key).set(key, value, duration)
item, new, d := c.bucket(key).set(key, value, duration)
if new {
c.promote(item)
} else {
c.conditionalPromote(item)
}
if d != 0 {
atomic.AddInt64(&c.size, d)
}
}
// Replace the value if it exists, does not set if it doesn't.
@ -142,14 +145,14 @@ func (c *Cache) worker() {
for {
select {
case item := <-c.promotables:
if c.doPromote(item) && c.size > c.maxItems {
if c.doPromote(item) && atomic.LoadInt64(&c.size) > c.maxItems {
c.gc()
}
case item := <-c.deletables:
if item.element == nil {
item.promotions = -2
} else {
c.size -= 1
atomic.AddInt64(&c.size, -item.size)
c.list.Remove(item.element)
}
}
@ -166,7 +169,6 @@ func (c *Cache) doPromote(item *Item) bool {
c.list.MoveToFront(item.element)
return false
}
c.size += 1
item.element = c.list.PushFront(item)
return true
}
@ -181,7 +183,7 @@ func (c *Cache) gc() {
item := element.Value.(*Item)
if c.tracking == false || atomic.LoadInt32(&item.refCount) == 0 {
c.bucket(item.key).delete(item.key)
c.size -= 1
atomic.AddInt64(&c.size, -item.size)
c.list.Remove(element)
}
element = prev

View file

@ -1,7 +1,7 @@
package ccache
type Configuration struct {
maxItems uint64
maxItems int64
buckets int
itemsToPrune int
deleteBuffer int
@ -27,7 +27,7 @@ func Configure() *Configuration {
// The max number of items to store in the cache
// [5000]
func (c *Configuration) MaxItems(max uint64) *Configuration {
func (c *Configuration) MaxItems(max int64) *Configuration {
c.maxItems = max
return c
}

10
item.go
View file

@ -6,6 +6,10 @@ import (
"time"
)
type Sized interface {
Size() int64
}
type TrackedItem interface {
Value() interface{}
Release()
@ -43,15 +47,21 @@ type Item struct {
promotions int32
refCount int32
expires int64
size int64
value interface{}
element *list.Element
}
func newItem(key string, value interface{}, expires int64) *Item {
size := int64(1)
if sized, ok := value.(Sized); ok {
size = sized.Size()
}
return &Item{
key: key,
value: value,
promotions: -1,
size: size,
expires: expires,
}
}

View file

@ -20,7 +20,7 @@ func (b *layeredBucket) get(primary, secondary string) *Item {
return bucket.get(secondary)
}
func (b *layeredBucket) set(primary, secondary string, value interface{}, duration time.Duration) (*Item, bool) {
func (b *layeredBucket) set(primary, secondary string, value interface{}, duration time.Duration) (*Item, bool, int64) {
b.Lock()
bkt, exists := b.buckets[primary]
if exists == false {
@ -28,11 +28,11 @@ func (b *layeredBucket) set(primary, secondary string, value interface{}, durati
b.buckets[primary] = bkt
}
b.Unlock()
item, new := bkt.set(secondary, value, duration)
item, new, d := bkt.set(secondary, value, duration)
if new {
item.group = primary
}
return item, new
return item, new, d
}
func (b *layeredBucket) replace(primary, secondary string, value interface{}) bool {

View file

@ -13,7 +13,7 @@ type LayeredCache struct {
list *list.List
buckets []*layeredBucket
bucketMask uint32
size uint64
size int64
deletables chan *Item
promotables chan *Item
}
@ -78,12 +78,15 @@ func (c *LayeredCache) TrackingGet(primary, secondary string) TrackedItem {
// Set the value in the cache for the specified duration
func (c *LayeredCache) Set(primary, secondary string, value interface{}, duration time.Duration) {
item, new := c.bucket(primary).set(primary, secondary, value, duration)
item, new, d := c.bucket(primary).set(primary, secondary, value, duration)
if new {
c.promote(item)
} else {
c.conditionalPromote(item)
}
if d != 0 {
atomic.AddInt64(&c.size, d)
}
}
// Replace the value if it exists, does not set if it doesn't.
@ -128,6 +131,7 @@ func (c *LayeredCache) Clear() {
for _, bucket := range c.buckets {
bucket.clear()
}
c.size = 0
c.list = list.New()
}
@ -152,14 +156,14 @@ func (c *LayeredCache) worker() {
for {
select {
case item := <-c.promotables:
if c.doPromote(item) && c.size > c.maxItems {
if c.doPromote(item) && atomic.LoadInt64(&c.size) > c.maxItems {
c.gc()
}
case item := <-c.deletables:
if item.element == nil {
item.promotions = -2
} else {
c.size -= 1
atomic.AddInt64(&c.size, -item.size)
c.list.Remove(item.element)
}
}
@ -177,7 +181,6 @@ func (c *LayeredCache) doPromote(item *Item) bool {
c.list.MoveToFront(item.element)
return false
}
c.size += 1
item.element = c.list.PushFront(item)
return true
}
@ -191,7 +194,7 @@ func (c *LayeredCache) gc() {
prev := element.Prev()
item := element.Value.(*Item)
if c.tracking == false || atomic.LoadInt32(&item.refCount) == 0 {
c.size -= 1
atomic.AddInt64(&c.size, -item.size)
c.bucket(item.group).delete(item.group, item.key)
c.list.Remove(element)
}