ccache/cache.go
Alex Grintsvayg 818c60532e
add Size() fn
2020-11-03 15:35:11 -05:00

316 lines
7 KiB
Go

// An LRU cached aimed at high concurrency
package ccache
import (
"container/list"
"hash/fnv"
"sync/atomic"
"time"
)
// The cache has a generic 'control' channel that is used to send
// messages to the worker. These are the messages that can be sent to it
type getDropped struct {
res chan int
}
type setMaxSize struct {
size int64
}
type clear struct {
done chan struct{}
}
type Cache struct {
*Configuration
list *list.List
size int64
buckets []*bucket
bucketMask uint32
deletables chan *Item
promotables chan *Item
control chan interface{}
}
// Create a new cache with the specified configuration
// See ccache.Configure() for creating a configuration
func New(config *Configuration) *Cache {
c := &Cache{
list: list.New(),
Configuration: config,
bucketMask: uint32(config.buckets) - 1,
buckets: make([]*bucket, config.buckets),
control: make(chan interface{}),
}
for i := 0; i < int(config.buckets); i++ {
c.buckets[i] = &bucket{
lookup: make(map[string]*Item),
}
}
c.restart()
return c
}
func (c *Cache) ItemCount() int {
count := 0
for _, b := range c.buckets {
count += b.itemCount()
}
return count
}
func (c *Cache) Size() int64 {
return c.size
}
func (c *Cache) DeletePrefix(prefix string) int {
count := 0
for _, b := range c.buckets {
count += b.deletePrefix(prefix, c.deletables)
}
return count
}
// Deletes all items that the matches func evaluates to true.
func (c *Cache) DeleteFunc(matches func(key string, item *Item) bool) int {
count := 0
for _, b := range c.buckets {
count += b.deleteFunc(matches, c.deletables)
}
return count
}
// Get an item from the cache. Returns nil if the item wasn't found.
// This can return an expired item. Use item.Expired() to see if the item
// is expired and item.TTL() to see how long until the item expires (which
// will be negative for an already expired item).
func (c *Cache) Get(key string) *Item {
item := c.bucket(key).get(key)
if item == nil {
return nil
}
if item.expires > time.Now().UnixNano() {
c.promote(item)
}
return item
}
// Used when the cache was created with the Track() configuration option.
// Avoid otherwise
func (c *Cache) TrackingGet(key string) TrackedItem {
item := c.Get(key)
if item == nil {
return NilTracked
}
item.track()
return item
}
// Used when the cache was created with the Track() configuration option.
// Sets the item, and returns a tracked reference to it.
func (c *Cache) TrackingSet(key string, value interface{}, duration time.Duration) TrackedItem {
return c.set(key, value, duration, true)
}
// Set the value in the cache for the specified duration
func (c *Cache) Set(key string, value interface{}, duration time.Duration) {
c.set(key, value, duration, false)
}
// Replace the value if it exists, does not set if it doesn't.
// Returns true if the item existed an was replaced, false otherwise.
// Replace does not reset item's TTL
func (c *Cache) Replace(key string, value interface{}) bool {
item := c.bucket(key).get(key)
if item == nil {
return false
}
c.Set(key, value, item.TTL())
return true
}
// Attempts to get the value from the cache and calles fetch on a miss (missing
// or stale item). If fetch returns an error, no value is cached and the error
// is returned back to the caller.
func (c *Cache) Fetch(key string, duration time.Duration, fetch func() (interface{}, error)) (*Item, error) {
item := c.Get(key)
if item != nil && !item.Expired() {
return item, nil
}
value, err := fetch()
if err != nil {
return nil, err
}
return c.set(key, value, duration, false), nil
}
// Remove the item from the cache, return true if the item was present, false otherwise.
func (c *Cache) Delete(key string) bool {
item := c.bucket(key).delete(key)
if item != nil {
c.deletables <- item
return true
}
return false
}
// Clears the cache
func (c *Cache) Clear() {
done := make(chan struct{})
c.control <- clear{done: done}
<-done
}
// Stops the background worker. Operations performed on the cache after Stop
// is called are likely to panic
func (c *Cache) Stop() {
close(c.promotables)
<-c.control
}
// Gets the number of items removed from the cache due to memory pressure since
// the last time GetDropped was called
func (c *Cache) GetDropped() int {
res := make(chan int)
c.control <- getDropped{res: res}
return <-res
}
// Sets a new max size. That can result in a GC being run if the new maxium size
// is smaller than the cached size
func (c *Cache) SetMaxSize(size int64) {
c.control <- setMaxSize{size}
}
func (c *Cache) restart() {
c.deletables = make(chan *Item, c.deleteBuffer)
c.promotables = make(chan *Item, c.promoteBuffer)
c.control = make(chan interface{})
go c.worker()
}
func (c *Cache) deleteItem(bucket *bucket, item *Item) {
bucket.delete(item.key) //stop other GETs from getting it
c.deletables <- item
}
func (c *Cache) set(key string, value interface{}, duration time.Duration, track bool) *Item {
item, existing := c.bucket(key).set(key, value, duration, track)
if existing != nil {
c.deletables <- existing
}
c.promote(item)
return item
}
func (c *Cache) bucket(key string) *bucket {
h := fnv.New32a()
h.Write([]byte(key))
return c.buckets[h.Sum32()&c.bucketMask]
}
func (c *Cache) promote(item *Item) {
c.promotables <- item
}
func (c *Cache) worker() {
defer close(c.control)
dropped := 0
for {
select {
case item, ok := <-c.promotables:
if ok == false {
goto drain
}
if c.doPromote(item) && c.size > c.maxSize {
dropped += c.gc()
}
case item := <-c.deletables:
c.doDelete(item)
case control := <-c.control:
switch msg := control.(type) {
case getDropped:
msg.res <- dropped
dropped = 0
case setMaxSize:
c.maxSize = msg.size
if c.size > c.maxSize {
dropped += c.gc()
}
case clear:
for _, bucket := range c.buckets {
bucket.clear()
}
c.size = 0
c.list = list.New()
msg.done <- struct{}{}
}
}
}
drain:
for {
select {
case item := <-c.deletables:
c.doDelete(item)
default:
close(c.deletables)
return
}
}
}
func (c *Cache) doDelete(item *Item) {
if item.element == nil {
item.promotions = -2
} else {
c.size -= item.size
if c.onDelete != nil {
c.onDelete(item)
}
c.list.Remove(item.element)
}
}
func (c *Cache) doPromote(item *Item) bool {
//already deleted
if item.promotions == -2 {
return false
}
if item.element != nil { //not a new item
if item.shouldPromote(c.getsPerPromote) {
c.list.MoveToFront(item.element)
item.promotions = 0
}
return false
}
c.size += item.size
item.element = c.list.PushFront(item)
return true
}
func (c *Cache) gc() int {
dropped := 0
element := c.list.Back()
for i := 0; i < c.itemsToPrune; i++ {
if element == nil {
return dropped
}
prev := element.Prev()
item := element.Value.(*Item)
if c.tracking == false || atomic.LoadInt32(&item.refCount) == 0 {
c.bucket(item.key).delete(item.key)
c.size -= item.size
c.list.Remove(element)
if c.onDelete != nil {
c.onDelete(item)
}
dropped += 1
item.promotions = -2
}
element = prev
}
return dropped
}