From 97bc65dc6a3e2ec632229c14245d65ef9f4ab8ef Mon Sep 17 00:00:00 2001 From: Karl Seguin Date: Sat, 19 Oct 2013 08:56:28 +0800 Subject: [PATCH] first commit --- app/ccache.go | 30 +++++++++++++++ bucket.go | 37 +++++++++++++++++++ cache.go | 95 ++++++++++++++++++++++++++++++++++++++++++++++++ configuration.go | 48 ++++++++++++++++++++++++ item.go | 22 +++++++++++ readme.md | 9 +++++ 6 files changed, 241 insertions(+) create mode 100755 app/ccache.go create mode 100755 bucket.go create mode 100755 cache.go create mode 100755 configuration.go create mode 100755 item.go create mode 100644 readme.md diff --git a/app/ccache.go b/app/ccache.go new file mode 100755 index 0000000..2a5ee26 --- /dev/null +++ b/app/ccache.go @@ -0,0 +1,30 @@ +package main + +import ( + "fmt" + "time" + "ccache" +) + +func main() { + c := ccache.New(ccache.Configure().PromoteDelay(time.Second * 0)) + fmt.Println(c.Get("abc")) + c.Set("abc", new("xxx")) + fmt.Println(c.Get("abc")) + time.Sleep(time.Second) + fmt.Println(c.Get("abc")) + time.Sleep(time.Second) +} + +type Item struct { + value string + expires time.Time +} + +func (i *Item) Expires() time.Time { + return i.expires +} + +func new(value string) *Item { + return &Item{value, time.Now().Add(time.Minute)} +} diff --git a/bucket.go b/bucket.go new file mode 100755 index 0000000..6cf9562 --- /dev/null +++ b/bucket.go @@ -0,0 +1,37 @@ +package ccache + +import ( + "sync" +) + +type Bucket struct { + sync.RWMutex + lookup map[string]*Item +} + +func (b *Bucket) Get(key string) *Item { + b.RLock() + defer b.RUnlock() + return b.lookup[key] +} + +func (b *Bucket) Set(key string, value Value) *Item { + b.Lock() + defer b.Unlock() + if existing, exists := b.lookup[key]; exists { + existing.Lock() + existing.value = value + existing.Unlock() + return existing + } + item := &Item{key: key, value: value} + b.lookup[key] = item + return item +} + + +func (b *Bucket) Remove(key string) { + b.Lock() + defer b.Unlock() + delete(b.lookup, key) +} diff --git a/cache.go b/cache.go new file mode 100755 index 0000000..019a6f9 --- /dev/null +++ b/cache.go @@ -0,0 +1,95 @@ +package ccache + +import ( + "time" + "runtime" + "hash/fnv" + "container/list" +) + +type Value interface { + Expires() time.Time +} + +type Cache struct { + *Configuration + list *list.List + buckets []*Bucket + bucketCount uint32 + promotables chan *Item +} + +func New(config *Configuration) *Cache { + c := &Cache{ + list: new(list.List), + Configuration: config, + bucketCount: uint32(config.buckets), + buckets: make([]*Bucket, config.buckets), + promotables: make(chan *Item, config.promoteBuffer), + } + for i := 0; i < config.buckets; i++ { + c.buckets[i] = &Bucket{ + lookup: make(map[string]*Item), + } + } + go c.worker() + return c +} + +func (c *Cache) Get(key string) Value { + item := c.bucket(key).Get(key) + if item == nil { return nil } + c.promote(item) + return item.value +} + +func (c *Cache) Set(key string, value Value) { + item := c.bucket(key).Set(key, value) + c.promote(item) +} + +func (c *Cache) bucket(key string) *Bucket { + h := fnv.New32a() + h.Write([]byte(key)) + index := h.Sum32() % c.bucketCount + return c.buckets[index] +} + +func (c *Cache) promote(item *Item) { + if item.shouldPromote(c.promoteDelay) == false { return } + c.promotables <- item +} + +func (c *Cache) worker() { + ms := new(runtime.MemStats) + for { + wasNew := c.doPromote(<- c.promotables) + if wasNew == false { continue } + runtime.ReadMemStats(ms) + if ms.HeapAlloc > c.size{ + c.gc() + } + } +} + +func (c *Cache) doPromote(item *Item) bool { + item.Lock() + defer item.Unlock() + item.promoted = time.Now() + if item.element != nil { //not a new item + c.list.MoveToFront(item.element) + return false + } + item.element = c.list.PushFront(item) + return true +} + +func (c *Cache) gc() { + for i := 0; i < c.itemsToPrune; i++ { + element := c.list.Back() + if element == nil { return } + item := element.Value.(*Item) + c.bucket(item.key).Remove(item.key) + c.list.Remove(element) + } +} diff --git a/configuration.go b/configuration.go new file mode 100755 index 0000000..ad32b98 --- /dev/null +++ b/configuration.go @@ -0,0 +1,48 @@ +package ccache + +import ( + "time" +) + +type Configuration struct { + size uint64 + buckets int + itemsToPrune int + promoteBuffer int + promoteDelay time.Duration +} + +func Configure() *Configuration { + return &Configuration{ + buckets: 64, + itemsToPrune: 500, + promoteBuffer: 1024, + size: 500 * 1024 * 1024, //500MB + promoteDelay: time.Minute * -5, + } +} + +func (c *Configuration) Buckets(count int) *Configuration { + c.buckets = count + return c +} + +func (c *Configuration) PromoteBuffer(size int) *Configuration { + c.promoteBuffer = size + return c +} + +func (c *Configuration) PromoteDelay(delay time.Duration) *Configuration { + c.promoteDelay = -delay + return c +} + +func (c *Configuration) Size(bytes uint64) *Configuration { + c.size = bytes + return c +} + +func (c *Configuration) ItemsToPrune(count int) *Configuration { + c.itemsToPrune = count + return c +} diff --git a/item.go b/item.go new file mode 100755 index 0000000..0a2f954 --- /dev/null +++ b/item.go @@ -0,0 +1,22 @@ +package ccache + +import ( + "sync" + "time" + "container/list" +) + +type Item struct { + key string + value Value + sync.RWMutex + promoted time.Time + element *list.Element +} + +func (i *Item) shouldPromote(staleness time.Duration) bool { + stale := time.Now().Add(staleness) + i.RLock() + defer i.RUnlock() + return i.promoted.Before(stale) +} diff --git a/readme.md b/readme.md new file mode 100644 index 0000000..e40f7da --- /dev/null +++ b/readme.md @@ -0,0 +1,9 @@ +# CCache +CCache is an LRU Cache, written in Go, focused on supporting high concurrency. + +Lock contention on the list is reduced by: + +1 - Introducing a window which limits the frequency that an item can get promoted +2 - Using a buffered channel to queue promotions for a single worker +3 - Garbage collecting within the same thread as the worker +