// Package ccache implements an LRU cache aimed at high concurrency.
package ccache
import (
"container/list"
"hash/fnv"
"sync/atomic"
"time"
)
// LayeredCache is an LRU cache whose entries are addressed by a primary
// key plus a secondary key (see Layered for the full description).
type LayeredCache struct {
	*Configuration
	// list orders items from most- to least-recently promoted; it is
	// mutated only by the worker goroutine.
	list *list.List
	// buckets shard the key space by primary key to reduce contention;
	// bucketMask selects a shard from the key's 32-bit hash.
	buckets []*layeredBucket
	bucketMask uint32
	// size is the cumulative size of cached items, maintained by the
	// worker goroutine and compared against the configured maxSize.
	size int64
	// deletables and promotables feed items to the worker goroutine,
	// which serializes all list and size mutations.
	deletables chan *Item
	promotables chan *Item
}
// Create a new layered cache with the specified configuration.
// A layered cache used a two keys to identify a value: a primary key
// and a secondary key. Get, Set and Delete require both a primary and
// secondary key. However, DeleteAll requires only a primary key, deleting
// all values that share the same primary key.
// Layered Cache is useful as an HTTP cache, where an HTTP purge might
// delete multiple variants of the same resource:
// primary key = "user/44"
// secondary key 1 = ".json"
// secondary key 2 = ".xml"
// See ccache.Configure() for creating a configuration
2014-10-25 07:19:14 +02:00
func Layered(config *Configuration) *LayeredCache {
c := &LayeredCache{
list: list.New(),
Configuration: config,
bucketMask: uint32(config.buckets) - 1,
buckets: make([]*layeredBucket, config.buckets),
2014-10-25 07:19:14 +02:00
deletables: make(chan *Item, config.deleteBuffer),
promotables: make(chan *Item, config.promoteBuffer),
}
for i := 0; i < int(config.buckets); i++ {
c.buckets[i] = &layeredBucket{
buckets: make(map[string]*bucket),
2014-10-25 07:19:14 +02:00
}
}
go c.worker()
return c
}
// Get an item from the cache. Returns nil if the item wasn't found.
// This can return an expired item. Use item.Expired() to see if the item
// is expired and item.TTL() to see how long until the item expires (which
// will be negative for an already expired item).
func (c *LayeredCache) Get(primary, secondary string) *Item {
item := c.bucket(primary).get(primary, secondary)
if item == nil {
return nil
2014-10-25 07:19:14 +02:00
}
if item.expires > time.Now().Unix() {
c.promote(item)
}
return item
2014-10-25 07:19:14 +02:00
}
// Used when the cache was created with the Track() configuration option.
// Avoid otherwise
2014-10-25 07:19:14 +02:00
func (c *LayeredCache) TrackingGet(primary, secondary string) TrackedItem {
item := c.Get(primary, secondary)
2014-10-25 07:19:14 +02:00
if item == nil {
return NilTracked
}
item.track()
return item
}
// Set the value in the cache for the specified duration
2014-10-25 07:19:14 +02:00
func (c *LayeredCache) Set(primary, secondary string, value interface{}, duration time.Duration) {
item, existing := c.bucket(primary).set(primary, secondary, value, duration)
if existing != nil {
c.deletables <- existing
}
c.promote(item)
2014-10-25 07:19:14 +02:00
}
// Replace the value if it exists, does not set if it doesn't.
// Returns true if the item existed an was replaced, false otherwise.
// Replace does not reset item's TTL nor does it alter its position in the LRU
2014-11-13 16:23:52 +01:00
func (c *LayeredCache) Replace(primary, secondary string, value interface{}) bool {
item := c.bucket(primary).get(primary, secondary)
if item == nil {
return false
2014-11-21 09:45:11 +01:00
}
c.Set(primary, secondary, value, item.TTL())
return true
2014-11-13 16:23:52 +01:00
}
// Attempts to get the value from the cache and calls fetch on a miss.
// If fetch returns an error, no value is cached and the error is returned
// back to the caller.
// NOTE(review): the return value is asymmetric — a cache hit returns the
// cached *Item wrapper, while a miss returns the raw value produced by
// fetch. Callers must be prepared for either type; consider unifying
// before depending on the dynamic type of the result.
func (c *LayeredCache) Fetch(primary, secondary string, duration time.Duration, fetch func() (interface{}, error)) (interface{}, error) {
	item := c.Get(primary, secondary)
	if item != nil {
		return item, nil
	}
	value, err := fetch()
	if err == nil {
		// Cache only successful fetches.
		c.Set(primary, secondary, value, duration)
	}
	return value, err
}
// Remove the item from the cache, return true if the item was present, false otherwise.
func (c *LayeredCache) Delete(primary, secondary string) bool {
2014-10-25 07:19:14 +02:00
item := c.bucket(primary).delete(primary, secondary)
if item != nil {
c.deletables <- item
return true
2014-10-25 07:19:14 +02:00
}
return false
2014-10-25 07:19:14 +02:00
}
// Deletes all items that share the same primary key.
// The deletables channel is passed down so the bucket can queue each
// removed item for unlinking by the worker goroutine.
// NOTE(review): the boolean comes straight from layeredBucket.deleteAll
// (not visible in this file) — presumably whether anything was deleted.
func (c *LayeredCache) DeleteAll(primary string) bool {
	return c.bucket(primary).deleteAll(primary, c.deletables)
}
// Clear empties the cache. This isn't thread safe: it resets the size
// counter and LRU list without coordinating with the worker goroutine.
// It's meant to be called from non-concurrent tests only.
func (c *LayeredCache) Clear() {
	for _, bucket := range c.buckets {
		bucket.clear()
	}
	c.size = 0
	c.list = list.New()
}
func (c *LayeredCache) bucket(key string) *layeredBucket {
2014-10-25 07:19:14 +02:00
h := fnv.New32a()
h.Write([]byte(key))
return c.buckets[h.Sum32()&c.bucketMask]
2014-10-25 07:19:14 +02:00
}
// promote queues the item for the worker goroutine, which performs the
// actual LRU bookkeeping (see doPromote).
func (c *LayeredCache) promote(item *Item) {
	c.promotables <- item
}
// worker is the single goroutine that owns the LRU list and the size
// counter; funnelling all mutations through it avoids locking the list.
// It runs for the lifetime of the cache.
func (c *LayeredCache) worker() {
	for {
		select {
		case item := <-c.promotables:
			// doPromote returns true only when a new item was linked
			// in; prune when that growth pushes us over budget.
			if c.doPromote(item) && c.size > c.maxSize {
				c.gc()
			}
		case item := <-c.deletables:
			if item.element == nil {
				// Deleted before it was ever linked into the list:
				// flag it so a pending promotion is ignored.
				item.promotions = -2
			} else {
				c.size -= item.size
				c.list.Remove(item.element)
			}
		}
	}
}
// doPromote applies a queued promotion on the worker goroutine. It
// returns true only when the item was newly inserted into the LRU list
// (i.e. the cache grew).
func (c *LayeredCache) doPromote(item *Item) bool {
	// deleted before it ever got promoted
	if item.promotions == -2 {
		return false
	}
	if item.element != nil { // not a new item
		// Existing items only move to the front once they accumulate
		// getsPerPromote accesses, limiting list churn.
		if item.shouldPromote(c.getsPerPromote) {
			c.list.MoveToFront(item.element)
			item.promotions = 0
		}
		return false
	}
	// New item: account for its size and link it at the hot end.
	c.size += item.size
	item.element = c.list.PushFront(item)
	return true
}
// gc evicts up to itemsToPrune items from the cold (back) end of the
// LRU list. Runs on the worker goroutine.
func (c *LayeredCache) gc() {
	element := c.list.Back()
	for i := 0; i < c.itemsToPrune; i++ {
		if element == nil {
			return
		}
		// Capture the neighbor before removal invalidates element.
		prev := element.Prev()
		item := element.Value.(*Item)
		// With tracking enabled, skip items a caller still holds a
		// reference to (refCount > 0); they remain in the list.
		if c.tracking == false || atomic.LoadInt32(&item.refCount) == 0 {
			c.bucket(item.group).delete(item.group, item.key)
			c.size -= item.size
			c.list.Remove(element)
			// Flag so any pending promotion of this item is ignored.
			item.promotions = -2
		}
		element = prev
	}
}