add GetDropped function

This commit is contained in:
Karl Seguin 2020-02-05 22:05:05 +08:00
parent 78289f8f0b
commit 1a257a89d6
4 changed files with 60 additions and 21 deletions

View file

@ -10,13 +10,15 @@ import (
type Cache struct { type Cache struct {
*Configuration *Configuration
list *list.List list *list.List
size int64 size int64
buckets []*bucket buckets []*bucket
bucketMask uint32 bucketMask uint32
deletables chan *Item deletables chan *Item
promotables chan *Item promotables chan *Item
donec chan struct{} donec chan struct{}
getDroppedReq chan struct{}
getDroppedRes chan int
} }
// Create a new cache with the specified configuration // Create a new cache with the specified configuration
@ -27,6 +29,8 @@ func New(config *Configuration) *Cache {
Configuration: config, Configuration: config,
bucketMask: uint32(config.buckets) - 1, bucketMask: uint32(config.buckets) - 1,
buckets: make([]*bucket, config.buckets), buckets: make([]*bucket, config.buckets),
getDroppedReq: make(chan struct{}),
getDroppedRes: make(chan int),
} }
for i := 0; i < int(config.buckets); i++ { for i := 0; i < int(config.buckets); i++ {
c.buckets[i] = &bucket{ c.buckets[i] = &bucket{
@ -137,6 +141,13 @@ func (c *Cache) Stop() {
<-c.donec <-c.donec
} }
// Gets the number of items removed from the cache due to memory pressure since
// the last time GetDropped was called
func (c *Cache) GetDropped() int {
c.getDroppedReq <- struct{}{}
return <-c.getDroppedRes
}
func (c *Cache) restart() { func (c *Cache) restart() {
c.deletables = make(chan *Item, c.deleteBuffer) c.deletables = make(chan *Item, c.deleteBuffer)
c.promotables = make(chan *Item, c.promoteBuffer) c.promotables = make(chan *Item, c.promoteBuffer)
@ -170,7 +181,7 @@ func (c *Cache) promote(item *Item) {
func (c *Cache) worker() { func (c *Cache) worker() {
defer close(c.donec) defer close(c.donec)
dropped := 0
for { for {
select { select {
case item, ok := <-c.promotables: case item, ok := <-c.promotables:
@ -178,10 +189,13 @@ func (c *Cache) worker() {
goto drain goto drain
} }
if c.doPromote(item) && c.size > c.maxSize { if c.doPromote(item) && c.size > c.maxSize {
c.gc() dropped += c.gc()
} }
case item := <-c.deletables: case item := <-c.deletables:
c.doDelete(item) c.doDelete(item)
case _ = <-c.getDroppedReq:
c.getDroppedRes <- dropped
dropped = 0
} }
} }
@ -227,11 +241,12 @@ func (c *Cache) doPromote(item *Item) bool {
return true return true
} }
func (c *Cache) gc() { func (c *Cache) gc() int {
dropped := 0
element := c.list.Back() element := c.list.Back()
for i := 0; i < c.itemsToPrune; i++ { for i := 0; i < c.itemsToPrune; i++ {
if element == nil { if element == nil {
return return dropped
} }
prev := element.Prev() prev := element.Prev()
item := element.Value.(*Item) item := element.Value.(*Item)
@ -242,8 +257,10 @@ func (c *Cache) gc() {
if c.onDelete != nil { if c.onDelete != nil {
c.onDelete(item) c.onDelete(item)
} }
dropped += 1
item.promotions = -2 item.promotions = -2
} }
element = prev element = prev
} }
return dropped
} }

View file

@ -156,6 +156,8 @@ func (_ CacheTests) RemovesOldestItemWhenFullBySizer() {
Expect(cache.Get("2")).To.Equal(nil) Expect(cache.Get("2")).To.Equal(nil)
Expect(cache.Get("3")).To.Equal(nil) Expect(cache.Get("3")).To.Equal(nil)
Expect(cache.Get("4").Value().(*SizedItem).id).To.Equal(4) Expect(cache.Get("4").Value().(*SizedItem).id).To.Equal(4)
Expect(cache.GetDropped()).To.Equal(4)
Expect(cache.GetDropped()).To.Equal(0)
} }
func (_ CacheTests) SetUpdatesSizeOnDelta() { func (_ CacheTests) SetUpdatesSizeOnDelta() {

View file

@ -10,13 +10,15 @@ import (
type LayeredCache struct { type LayeredCache struct {
*Configuration *Configuration
list *list.List list *list.List
buckets []*layeredBucket buckets []*layeredBucket
bucketMask uint32 bucketMask uint32
size int64 size int64
deletables chan *Item deletables chan *Item
promotables chan *Item promotables chan *Item
donec chan struct{} donec chan struct{}
getDroppedReq chan struct{}
getDroppedRes chan int
} }
// Create a new layered cache with the specified configuration. // Create a new layered cache with the specified configuration.
@ -39,6 +41,8 @@ func Layered(config *Configuration) *LayeredCache {
bucketMask: uint32(config.buckets) - 1, bucketMask: uint32(config.buckets) - 1,
buckets: make([]*layeredBucket, config.buckets), buckets: make([]*layeredBucket, config.buckets),
deletables: make(chan *Item, config.deleteBuffer), deletables: make(chan *Item, config.deleteBuffer),
getDroppedReq: make(chan struct{}),
getDroppedRes: make(chan int),
} }
for i := 0; i < int(config.buckets); i++ { for i := 0; i < int(config.buckets); i++ {
c.buckets[i] = &layeredBucket{ c.buckets[i] = &layeredBucket{
@ -162,6 +166,13 @@ func (c *LayeredCache) Stop() {
<-c.donec <-c.donec
} }
// Gets the number of items removed from the cache due to memory pressure since
// the last time GetDropped was called
func (c *LayeredCache) GetDropped() int {
c.getDroppedReq <- struct{}{}
return <-c.getDroppedRes
}
func (c *LayeredCache) restart() { func (c *LayeredCache) restart() {
c.promotables = make(chan *Item, c.promoteBuffer) c.promotables = make(chan *Item, c.promoteBuffer)
c.donec = make(chan struct{}) c.donec = make(chan struct{})
@ -189,6 +200,7 @@ func (c *LayeredCache) promote(item *Item) {
func (c *LayeredCache) worker() { func (c *LayeredCache) worker() {
defer close(c.donec) defer close(c.donec)
dropped := 0
for { for {
select { select {
case item, ok := <-c.promotables: case item, ok := <-c.promotables:
@ -196,7 +208,7 @@ func (c *LayeredCache) worker() {
return return
} }
if c.doPromote(item) && c.size > c.maxSize { if c.doPromote(item) && c.size > c.maxSize {
c.gc() dropped += c.gc()
} }
case item := <-c.deletables: case item := <-c.deletables:
if item.element == nil { if item.element == nil {
@ -208,6 +220,9 @@ func (c *LayeredCache) worker() {
} }
c.list.Remove(item.element) c.list.Remove(item.element)
} }
case _ = <-c.getDroppedReq:
c.getDroppedRes <- dropped
dropped = 0
} }
} }
} }
@ -229,11 +244,12 @@ func (c *LayeredCache) doPromote(item *Item) bool {
return true return true
} }
func (c *LayeredCache) gc() { func (c *LayeredCache) gc() int {
element := c.list.Back() element := c.list.Back()
dropped := 0
for i := 0; i < c.itemsToPrune; i++ { for i := 0; i < c.itemsToPrune; i++ {
if element == nil { if element == nil {
return return dropped
} }
prev := element.Prev() prev := element.Prev()
item := element.Value.(*Item) item := element.Value.(*Item)
@ -242,7 +258,9 @@ func (c *LayeredCache) gc() {
c.size -= item.size c.size -= item.size
c.list.Remove(element) c.list.Remove(element)
item.promotions = -2 item.promotions = -2
dropped += 1
} }
element = prev element = prev
} }
return dropped
} }

View file

@ -170,6 +170,8 @@ func (_ LayeredCacheTests) RemovesOldestItemWhenFull() {
Expect(cache.Get("2", "a")).To.Equal(nil) Expect(cache.Get("2", "a")).To.Equal(nil)
Expect(cache.Get("3", "a").Value()).To.Equal(3) Expect(cache.Get("3", "a").Value()).To.Equal(3)
Expect(cache.Get("xx", "b").Value()).To.Equal(9001) Expect(cache.Get("xx", "b").Value()).To.Equal(9001)
Expect(cache.GetDropped()).To.Equal(4)
Expect(cache.GetDropped()).To.Equal(0)
} }
func newLayered() *LayeredCache { func newLayered() *LayeredCache {