ccache/cache.go
Karl Seguin 890bb18dbf The cache can now do reference counting so that the LRU algorithm is aware of
long-lived objects and won't clean them up. Oftentimes, the value returned
from a cache hit is short-lived. As a silly example:

	func GetUser(http.responseWrite) {
		user := cache.Get("user:1")
		response.Write(serialize(user))
	}

It's fine if the cache's GC cleans up "user:1" while the user variable has a reference to the
object..the cache's reference is removed and the real GC will clean it up
at some point after the user variable falls out of scope.

However, what if user is long-lived? Possibly stored as a reference to another
cached object? Normally (without this commit) the next time you call
cache.Get("user:1"), you'll get a miss and will need to refetch the object; even
though the original user object is still somewhere in memory - you just lost
your reference to it from the cache.

By enabling the Track() configuration flag, and calling TrackingGet() (instead
of Get), the cache will track that the object is in-use and won't GC it (even
if there's great memory pressure (what's the point? something else is holding on
to it anyways). Calling item.Release() will decrement the number of references.
When the count is 0, the item can be pruned from the cache.

The returned value is a TrackedItem which exposes:

- Value() interface{} (to get the actual cached value)
- Release() to release the item back in the cache
2014-02-28 20:10:42 +08:00

172 lines
3.4 KiB
Go

// An LRU cached aimed at high concurrency
package ccache
import (
"container/list"
"hash/fnv"
"runtime"
"sync/atomic"
"time"
)
type Cache struct {
*Configuration
list *list.List
buckets []*Bucket
bucketCount uint32
deletables chan *Item
promotables chan *Item
}
func New(config *Configuration) *Cache {
c := &Cache{
list: list.New(),
Configuration: config,
bucketCount: uint32(config.buckets),
buckets: make([]*Bucket, config.buckets),
deletables: make(chan *Item, config.deleteBuffer),
promotables: make(chan *Item, config.promoteBuffer),
}
for i := 0; i < config.buckets; i++ {
c.buckets[i] = &Bucket{
lookup: make(map[string]*Item),
}
}
go c.worker()
return c
}
func (c *Cache) Get(key string) interface{} {
if item := c.get(key); item != nil {
return item.value
}
return nil
}
func (c *Cache) TrackingGet(key string) TrackedItem {
item := c.get(key)
if item == nil {
return NilTracked
}
item.track()
return item
}
func (c *Cache) get(key string) *Item {
bucket := c.bucket(key)
item := bucket.get(key)
if item == nil {
return nil
}
if item.expires.Before(time.Now()) {
c.deleteItem(bucket, item)
return nil
}
c.conditionalPromote(item)
return item
}
func (c *Cache) Set(key string, value interface{}, duration time.Duration) {
item, new := c.bucket(key).set(key, value, duration)
if new {
c.promote(item)
} else {
c.conditionalPromote(item)
}
}
func (c *Cache) Fetch(key string, duration time.Duration, fetch func() (interface{}, error)) (interface{}, error) {
item := c.Get(key)
if item != nil {
return item, nil
}
value, err := fetch()
if err == nil {
c.Set(key, value, duration)
}
return value, err
}
func (c *Cache) Delete(key string) {
item := c.bucket(key).getAndDelete(key)
if item != nil {
c.deletables <- item
}
}
//this isn't thread safe. It's meant to be called from non-concurrent tests
func (c *Cache) Clear() {
for _, bucket := range c.buckets {
bucket.clear()
}
c.list = list.New()
}
func (c *Cache) deleteItem(bucket *Bucket, item *Item) {
bucket.delete(item.key) //stop othe GETs from getting it
c.deletables <- item
}
func (c *Cache) bucket(key string) *Bucket {
h := fnv.New32a()
h.Write([]byte(key))
index := h.Sum32() % c.bucketCount
return c.buckets[index]
}
func (c *Cache) conditionalPromote(item *Item) {
if item.shouldPromote(c.getsPerPromote) == false {
return
}
c.promote(item)
}
func (c *Cache) promote(item *Item) {
c.promotables <- item
}
func (c *Cache) worker() {
ms := new(runtime.MemStats)
for {
select {
case item := <-c.promotables:
if wasNew := c.doPromote(item); wasNew == false {
continue
}
runtime.ReadMemStats(ms)
if ms.HeapAlloc > c.size {
c.gc()
}
case item := <-c.deletables:
c.list.Remove(item.element)
}
}
}
func (c *Cache) doPromote(item *Item) bool {
item.Lock()
defer item.Unlock()
item.promotions = 0
if item.element != nil { //not a new item
c.list.MoveToFront(item.element)
return false
}
item.element = c.list.PushFront(item)
return true
}
func (c *Cache) gc() {
element := c.list.Back()
for i := 0; i < c.itemsToPrune; i++ {
if element == nil {
return
}
prev := element.Prev()
item := element.Value.(*Item)
if c.tracking == false || atomic.LoadInt32(&item.refCount) == 0 {
c.bucket(item.key).delete(item.key)
c.list.Remove(element)
}
element = prev
}
}