From ff8727e847a28175e94f0b65eadaaef02da8abcd Mon Sep 17 00:00:00 2001 From: Karl Seguin Date: Fri, 21 Nov 2014 14:39:25 +0700 Subject: [PATCH] initial work on tracking cache by item size --- bucket.go | 16 +++++++++++++--- bucket_test.go | 8 +++++--- cache.go | 14 ++++++++------ configuration.go | 4 ++-- item.go | 10 ++++++++++ layeredbucket.go | 6 +++--- layeredcache.go | 15 +++++++++------ 7 files changed, 50 insertions(+), 23 deletions(-) diff --git a/bucket.go b/bucket.go index 8687672..b0fd8bb 100644 --- a/bucket.go +++ b/bucket.go @@ -2,6 +2,7 @@ package ccache import ( "sync" + "sync/atomic" "time" ) @@ -16,18 +17,27 @@ func (b *bucket) get(key string) *Item { return b.lookup[key] } -func (b *bucket) set(key string, value interface{}, duration time.Duration) (*Item, bool) { +func (b *bucket) set(key string, value interface{}, duration time.Duration) (*Item, bool, int64) { expires := time.Now().Add(duration).Unix() b.Lock() defer b.Unlock() if existing, exists := b.lookup[key]; exists { + s := existing.size existing.value = value existing.expires = expires - return existing, false + d := int64(0) + if sized, ok := value.(Sized); ok { + newSize := sized.Size() + d = newSize - s + if d != 0 { + atomic.StoreInt64(&existing.size, newSize) + } + } + return existing, false, int64(d) } item := newItem(key, value, expires) b.lookup[key] = item - return item, true + return item, true, int64(item.size) } func (b *bucket) replace(key string, value interface{}) bool { diff --git a/bucket_test.go b/bucket_test.go index 379852a..25717fb 100644 --- a/bucket_test.go +++ b/bucket_test.go @@ -32,20 +32,22 @@ func (_ *BucketTests) DeleteItemFromBucket() { func (_ *BucketTests) SetsANewBucketItem() { bucket := testBucket() - item, new := bucket.set("spice", TestValue("flow"), time.Minute) + item, new, d := bucket.set("spice", TestValue("flow"), time.Minute) assertValue(item, "flow") item = bucket.get("spice") assertValue(item, "flow") Expect(new).To.Equal(true) + Expect(d).To.Equal(1) } func (_ *BucketTests) SetsAnExistingItem() { bucket := testBucket() - item, new := bucket.set("power", TestValue("9002"), time.Minute) + item, new, d := bucket.set("power", TestValue("9002"), time.Minute) assertValue(item, "9002") item = bucket.get("power") assertValue(item, "9002") Expect(new).To.Equal(false) + Expect(d).To.Equal(0) } func (_ *BucketTests) ReplaceDoesNothingIfKeyDoesNotExist() { @@ -56,7 +58,7 @@ func (_ *BucketTests) ReplaceDoesNothingIfKeyDoesNotExist() { func (_ *BucketTests) ReplaceReplacesThevalue() { bucket := testBucket() - item, _ := bucket.set("power", TestValue("9002"), time.Minute) + item, _, _ := bucket.set("power", TestValue("9002"), time.Minute) Expect(bucket.replace("power", TestValue("9004"))).To.Equal(true) Expect(item.Value().(string)).To.Equal("9004") Expect(bucket.get("power").Value().(string)).To.Equal("9004") diff --git a/cache.go b/cache.go index e50e904..cb20984 100644 --- a/cache.go +++ b/cache.go @@ -11,7 +11,7 @@ import ( type Cache struct { *Configuration list *list.List - size uint64 + size int64 buckets []*bucket bucketMask uint32 deletables chan *Item @@ -67,12 +67,15 @@ func (c *Cache) TrackingGet(key string) TrackedItem { // Set the value in the cache for the specified duration func (c *Cache) Set(key string, value interface{}, duration time.Duration) { - item, new := c.bucket(key).set(key, value, duration) + item, new, d := c.bucket(key).set(key, value, duration) if new { c.promote(item) } else { c.conditionalPromote(item) } + if d != 0 { + atomic.AddInt64(&c.size, d) + } } // Replace the value if it exists, does not set if it doesn't. @@ -142,14 +145,14 @@ func (c *Cache) worker() { for { select { case item := <-c.promotables: - if c.doPromote(item) && c.size > c.maxItems { + if c.doPromote(item) && atomic.LoadInt64(&c.size) > c.maxItems { c.gc() } case item := <-c.deletables: if item.element == nil { item.promotions = -2 } else { - c.size -= 1 + atomic.AddInt64(&c.size, -item.size) c.list.Remove(item.element) } } @@ -166,7 +169,6 @@ func (c *Cache) doPromote(item *Item) bool { c.list.MoveToFront(item.element) return false } - c.size += 1 item.element = c.list.PushFront(item) return true } @@ -181,7 +183,7 @@ func (c *Cache) gc() { item := element.Value.(*Item) if c.tracking == false || atomic.LoadInt32(&item.refCount) == 0 { c.bucket(item.key).delete(item.key) - c.size -= 1 + atomic.AddInt64(&c.size, -item.size) c.list.Remove(element) } element = prev diff --git a/configuration.go b/configuration.go index f06067c..736c362 100644 --- a/configuration.go +++ b/configuration.go @@ -1,7 +1,7 @@ package ccache type Configuration struct { - maxItems uint64 + maxItems int64 buckets int itemsToPrune int deleteBuffer int @@ -27,7 +27,7 @@ func Configure() *Configuration { // The max number of items to store in the cache // [5000] -func (c *Configuration) MaxItems(max uint64) *Configuration { +func (c *Configuration) MaxItems(max int64) *Configuration { c.maxItems = max return c } diff --git a/item.go b/item.go index e7812a4..35e42c6 100644 --- a/item.go +++ b/item.go @@ -6,6 +6,10 @@ import ( "time" ) +type Sized interface { + Size() int64 +} + type TrackedItem interface { Value() interface{} Release() @@ -43,15 +47,21 @@ type Item struct { promotions int32 refCount int32 expires int64 + size int64 value interface{} element *list.Element } func newItem(key string, value interface{}, expires int64) *Item { + size := int64(1) + if sized, ok := value.(Sized); ok { + size = sized.Size() + } return &Item{ key: key, value: value, promotions: -1, + size: size, expires: expires, } } diff --git a/layeredbucket.go b/layeredbucket.go index 370b763..73a0ddc 100644 --- a/layeredbucket.go +++ b/layeredbucket.go @@ -20,7 +20,7 @@ func (b *layeredBucket) get(primary, secondary string) *Item { return bucket.get(secondary) } -func (b *layeredBucket) set(primary, secondary string, value interface{}, duration time.Duration) (*Item, bool) { +func (b *layeredBucket) set(primary, secondary string, value interface{}, duration time.Duration) (*Item, bool, int64) { b.Lock() bkt, exists := b.buckets[primary] if exists == false { @@ -28,11 +28,11 @@ func (b *layeredBucket) set(primary, secondary string, value interface{}, durati b.buckets[primary] = bkt } b.Unlock() - item, new := bkt.set(secondary, value, duration) + item, new, d := bkt.set(secondary, value, duration) if new { item.group = primary } - return item, new + return item, new, d } func (b *layeredBucket) replace(primary, secondary string, value interface{}) bool { diff --git a/layeredcache.go b/layeredcache.go index ed0b719..869090d 100644 --- a/layeredcache.go +++ b/layeredcache.go @@ -13,7 +13,7 @@ type LayeredCache struct { list *list.List buckets []*layeredBucket bucketMask uint32 - size uint64 + size int64 deletables chan *Item promotables chan *Item } @@ -78,12 +78,15 @@ func (c *LayeredCache) TrackingGet(primary, secondary string) TrackedItem { // Set the value in the cache for the specified duration func (c *LayeredCache) Set(primary, secondary string, value interface{}, duration time.Duration) { - item, new := c.bucket(primary).set(primary, secondary, value, duration) + item, new, d := c.bucket(primary).set(primary, secondary, value, duration) if new { c.promote(item) } else { c.conditionalPromote(item) } + if d != 0 { + atomic.AddInt64(&c.size, d) + } } // Replace the value if it exists, does not set if it doesn't. @@ -128,6 +131,7 @@ func (c *LayeredCache) Clear() { for _, bucket := range c.buckets { bucket.clear() } + c.size = 0 c.list = list.New() } @@ -152,14 +156,14 @@ func (c *LayeredCache) worker() { for { select { case item := <-c.promotables: - if c.doPromote(item) && c.size > c.maxItems { + if c.doPromote(item) && atomic.LoadInt64(&c.size) > c.maxItems { c.gc() } case item := <-c.deletables: if item.element == nil { item.promotions = -2 } else { - c.size -= 1 + atomic.AddInt64(&c.size, -item.size) c.list.Remove(item.element) } } @@ -177,7 +181,6 @@ func (c *LayeredCache) doPromote(item *Item) bool { c.list.MoveToFront(item.element) return false } - c.size += 1 item.element = c.list.PushFront(item) return true } @@ -191,7 +194,7 @@ func (c *LayeredCache) gc() { prev := element.Prev() item := element.Value.(*Item) if c.tracking == false || atomic.LoadInt32(&item.refCount) == 0 { - c.size -= 1 + atomic.AddInt64(&c.size, -item.size) c.bucket(item.group).delete(item.group, item.key) c.list.Remove(element) }