171 lines
3.7 KiB
Go
171 lines
3.7 KiB
Go
|
// An LRU cached aimed at high concurrency
|
||
|
package ccache
|
||
|
|
||
|
import (
|
||
|
"container/list"
|
||
|
"hash/fnv"
|
||
|
"sync/atomic"
|
||
|
"time"
|
||
|
)
|
||
|
|
||
|
type LayeredCache struct {
|
||
|
*Configuration
|
||
|
list *list.List
|
||
|
buckets []*LayeredBucket
|
||
|
bucketCount uint32
|
||
|
deletables chan *Item
|
||
|
promotables chan *Item
|
||
|
}
|
||
|
|
||
|
func Layered(config *Configuration) *LayeredCache {
|
||
|
c := &LayeredCache{
|
||
|
list: list.New(),
|
||
|
Configuration: config,
|
||
|
bucketCount: uint32(config.buckets),
|
||
|
buckets: make([]*LayeredBucket, config.buckets),
|
||
|
deletables: make(chan *Item, config.deleteBuffer),
|
||
|
promotables: make(chan *Item, config.promoteBuffer),
|
||
|
}
|
||
|
for i := 0; i < int(config.buckets); i++ {
|
||
|
c.buckets[i] = &LayeredBucket{
|
||
|
buckets: make(map[string]*Bucket),
|
||
|
}
|
||
|
}
|
||
|
go c.worker()
|
||
|
return c
|
||
|
}
|
||
|
|
||
|
func (c *LayeredCache) Get(primary, secondary string) interface{} {
|
||
|
if item := c.get(primary, secondary); item != nil {
|
||
|
return item.value
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (c *LayeredCache) TrackingGet(primary, secondary string) TrackedItem {
|
||
|
item := c.get(primary, secondary)
|
||
|
if item == nil {
|
||
|
return NilTracked
|
||
|
}
|
||
|
item.track()
|
||
|
return item
|
||
|
}
|
||
|
|
||
|
func (c *LayeredCache) get(primary, secondary string) *Item {
|
||
|
bucket := c.bucket(primary)
|
||
|
item := bucket.get(primary, secondary)
|
||
|
if item == nil {
|
||
|
return nil
|
||
|
}
|
||
|
if item.expires < time.Now().Unix() {
|
||
|
return nil
|
||
|
}
|
||
|
c.conditionalPromote(item)
|
||
|
return item
|
||
|
}
|
||
|
|
||
|
func (c *LayeredCache) Set(primary, secondary string, value interface{}, duration time.Duration) {
|
||
|
item, new := c.bucket(primary).set(primary, secondary, value, duration)
|
||
|
if new {
|
||
|
c.promote(item)
|
||
|
} else {
|
||
|
c.conditionalPromote(item)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (c *LayeredCache) Fetch(primary, secondary string, duration time.Duration, fetch func() (interface{}, error)) (interface{}, error) {
|
||
|
item := c.Get(primary, secondary)
|
||
|
if item != nil {
|
||
|
return item, nil
|
||
|
}
|
||
|
value, err := fetch()
|
||
|
if err == nil {
|
||
|
c.Set(primary, secondary, value, duration)
|
||
|
}
|
||
|
return value, err
|
||
|
}
|
||
|
|
||
|
func (c *LayeredCache) Delete(primary, secondary string) {
|
||
|
item := c.bucket(primary).delete(primary, secondary)
|
||
|
if item != nil {
|
||
|
c.deletables <- item
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (c *LayeredCache) DeleteAll(primary string) {
|
||
|
c.bucket(primary).deleteAll(primary, c.deletables)
|
||
|
}
|
||
|
|
||
|
//this isn't thread safe. It's meant to be called from non-concurrent tests
|
||
|
func (c *LayeredCache) Clear() {
|
||
|
for _, bucket := range c.buckets {
|
||
|
bucket.clear()
|
||
|
}
|
||
|
c.list = list.New()
|
||
|
}
|
||
|
|
||
|
func (c *LayeredCache) bucket(key string) *LayeredBucket {
|
||
|
h := fnv.New32a()
|
||
|
h.Write([]byte(key))
|
||
|
return c.buckets[h.Sum32()%c.bucketCount]
|
||
|
}
|
||
|
|
||
|
func (c *LayeredCache) conditionalPromote(item *Item) {
|
||
|
if item.shouldPromote(c.getsPerPromote) == false {
|
||
|
return
|
||
|
}
|
||
|
c.promote(item)
|
||
|
}
|
||
|
|
||
|
func (c *LayeredCache) promote(item *Item) {
|
||
|
c.promotables <- item
|
||
|
}
|
||
|
|
||
|
func (c *LayeredCache) worker() {
|
||
|
for {
|
||
|
select {
|
||
|
case item := <-c.promotables:
|
||
|
if c.doPromote(item) && uint64(c.list.Len()) > c.maxItems {
|
||
|
c.gc()
|
||
|
}
|
||
|
case item := <-c.deletables:
|
||
|
if item.element == nil {
|
||
|
item.promotions = -2
|
||
|
} else {
|
||
|
c.list.Remove(item.element)
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (c *LayeredCache) doPromote(item *Item) bool {
|
||
|
// deleted before it ever got promoted
|
||
|
if item.promotions == -2 {
|
||
|
return false
|
||
|
}
|
||
|
|
||
|
item.promotions = 0
|
||
|
if item.element != nil { //not a new item
|
||
|
c.list.MoveToFront(item.element)
|
||
|
return false
|
||
|
}
|
||
|
item.element = c.list.PushFront(item)
|
||
|
return true
|
||
|
}
|
||
|
|
||
|
func (c *LayeredCache) gc() {
|
||
|
element := c.list.Back()
|
||
|
for i := 0; i < c.itemsToPrune; i++ {
|
||
|
if element == nil {
|
||
|
return
|
||
|
}
|
||
|
prev := element.Prev()
|
||
|
item := element.Value.(*Item)
|
||
|
if c.tracking == false || atomic.LoadInt32(&item.refCount) == 0 {
|
||
|
c.bucket(item.group).delete(item.group, item.key)
|
||
|
c.list.Remove(element)
|
||
|
}
|
||
|
element = prev
|
||
|
}
|
||
|
}
|