layeredcache: add Stop() and fix races in tests

worker goroutine running concurrently with tests would cause data race errors
when running tests with -race enabled.
This commit is contained in:
Anthony Romano 2017-02-10 10:26:41 -08:00
parent 12c7ffdc19
commit c69270ce08
3 changed files with 45 additions and 18 deletions

View file

@ -16,6 +16,7 @@ type LayeredCache struct {
size int64 size int64
deletables chan *Item deletables chan *Item
promotables chan *Item promotables chan *Item
donec chan struct{}
} }
// Create a new layered cache with the specified configuration. // Create a new layered cache with the specified configuration.
@ -38,14 +39,13 @@ func Layered(config *Configuration) *LayeredCache {
bucketMask: uint32(config.buckets) - 1, bucketMask: uint32(config.buckets) - 1,
buckets: make([]*layeredBucket, config.buckets), buckets: make([]*layeredBucket, config.buckets),
deletables: make(chan *Item, config.deleteBuffer), deletables: make(chan *Item, config.deleteBuffer),
promotables: make(chan *Item, config.promoteBuffer),
} }
for i := 0; i < int(config.buckets); i++ { for i := 0; i < int(config.buckets); i++ {
c.buckets[i] = &layeredBucket{ c.buckets[i] = &layeredBucket{
buckets: make(map[string]*bucket), buckets: make(map[string]*bucket),
} }
} }
go c.worker() c.restart()
return c return c
} }
@ -149,6 +149,17 @@ func (c *LayeredCache) Clear() {
c.list = list.New() c.list = list.New()
} }
func (c *LayeredCache) Stop() {
close(c.promotables)
<-c.donec
}
func (c *LayeredCache) restart() {
c.promotables = make(chan *Item, c.promoteBuffer)
c.donec = make(chan struct{})
go c.worker()
}
func (c *LayeredCache) set(primary, secondary string, value interface{}, duration time.Duration) *Item { func (c *LayeredCache) set(primary, secondary string, value interface{}, duration time.Duration) *Item {
item, existing := c.bucket(primary).set(primary, secondary, value, duration) item, existing := c.bucket(primary).set(primary, secondary, value, duration)
if existing != nil { if existing != nil {
@ -169,9 +180,13 @@ func (c *LayeredCache) promote(item *Item) {
} }
func (c *LayeredCache) worker() { func (c *LayeredCache) worker() {
defer close(c.donec)
for { for {
select { select {
case item := <-c.promotables: case item, ok := <-c.promotables:
if ok == false {
return
}
if c.doPromote(item) && c.size > c.maxSize { if c.doPromote(item) && c.size > c.maxSize {
c.gc() c.gc()
} }

View file

@ -86,7 +86,7 @@ func (_ LayeredCacheTests) GCsTheOldestItems() {
cache.Set("xx", "b", 9001, time.Minute) cache.Set("xx", "b", 9001, time.Minute)
//let the items get promoted (and added to our list) //let the items get promoted (and added to our list)
time.Sleep(time.Millisecond * 10) time.Sleep(time.Millisecond * 10)
cache.gc() gcLayeredCache(cache)
Expect(cache.Get("xx", "a")).To.Equal(nil) Expect(cache.Get("xx", "a")).To.Equal(nil)
Expect(cache.Get("xx", "b").Value()).To.Equal(9001) Expect(cache.Get("xx", "b").Value()).To.Equal(9001)
Expect(cache.Get("8", "a")).To.Equal(nil) Expect(cache.Get("8", "a")).To.Equal(nil)
@ -102,7 +102,7 @@ func (_ LayeredCacheTests) PromotedItemsDontGetPruned() {
time.Sleep(time.Millisecond * 10) //run the worker once to init the list time.Sleep(time.Millisecond * 10) //run the worker once to init the list
cache.Get("9", "a") cache.Get("9", "a")
time.Sleep(time.Millisecond * 10) time.Sleep(time.Millisecond * 10)
cache.gc() gcLayeredCache(cache)
Expect(cache.Get("9", "a").Value()).To.Equal(9) Expect(cache.Get("9", "a").Value()).To.Equal(9)
Expect(cache.Get("10", "a")).To.Equal(nil) Expect(cache.Get("10", "a")).To.Equal(nil)
Expect(cache.Get("11", "a").Value()).To.Equal(11) Expect(cache.Get("11", "a").Value()).To.Equal(11)
@ -115,11 +115,11 @@ func (_ LayeredCacheTests) TrackerDoesNotCleanupHeldInstance() {
} }
item := cache.TrackingGet("0", "a") item := cache.TrackingGet("0", "a")
time.Sleep(time.Millisecond * 10) time.Sleep(time.Millisecond * 10)
cache.gc() gcLayeredCache(cache)
Expect(cache.Get("0", "a").Value()).To.Equal(0) Expect(cache.Get("0", "a").Value()).To.Equal(0)
Expect(cache.Get("1", "a")).To.Equal(nil) Expect(cache.Get("1", "a")).To.Equal(nil)
item.Release() item.Release()
cache.gc() gcLayeredCache(cache)
Expect(cache.Get("0", "a")).To.Equal(nil) Expect(cache.Get("0", "a")).To.Equal(nil)
} }
@ -161,20 +161,20 @@ func (_ LayeredCacheTests) SetUpdatesSizeOnDelta() {
cache.Set("pri", "a", &SizedItem{0, 2}, time.Minute) cache.Set("pri", "a", &SizedItem{0, 2}, time.Minute)
cache.Set("pri", "b", &SizedItem{0, 3}, time.Minute) cache.Set("pri", "b", &SizedItem{0, 3}, time.Minute)
time.Sleep(time.Millisecond * 5) time.Sleep(time.Millisecond * 5)
Expect(cache.size).To.Equal(int64(5)) checkLayeredSize(cache, 5)
cache.Set("pri", "b", &SizedItem{0, 3}, time.Minute) cache.Set("pri", "b", &SizedItem{0, 3}, time.Minute)
time.Sleep(time.Millisecond * 5) time.Sleep(time.Millisecond * 5)
Expect(cache.size).To.Equal(int64(5)) checkLayeredSize(cache, 5)
cache.Set("pri", "b", &SizedItem{0, 4}, time.Minute) cache.Set("pri", "b", &SizedItem{0, 4}, time.Minute)
time.Sleep(time.Millisecond * 5) time.Sleep(time.Millisecond * 5)
Expect(cache.size).To.Equal(int64(6)) checkLayeredSize(cache, 6)
cache.Set("pri", "b", &SizedItem{0, 2}, time.Minute) cache.Set("pri", "b", &SizedItem{0, 2}, time.Minute)
cache.Set("sec", "b", &SizedItem{0, 3}, time.Minute) cache.Set("sec", "b", &SizedItem{0, 3}, time.Minute)
time.Sleep(time.Millisecond * 5) time.Sleep(time.Millisecond * 5)
Expect(cache.size).To.Equal(int64(7)) checkLayeredSize(cache, 7)
cache.Delete("pri", "b") cache.Delete("pri", "b")
time.Sleep(time.Millisecond * 10) time.Sleep(time.Millisecond * 10)
Expect(cache.size).To.Equal(int64(5)) checkLayeredSize(cache, 5)
} }
func (_ LayeredCacheTests) ReplaceDoesNotchangeSizeIfNotSet() { func (_ LayeredCacheTests) ReplaceDoesNotchangeSizeIfNotSet() {
@ -184,7 +184,7 @@ func (_ LayeredCacheTests) ReplaceDoesNotchangeSizeIfNotSet() {
cache.Set("pri", "3", &SizedItem{1, 2}, time.Minute) cache.Set("pri", "3", &SizedItem{1, 2}, time.Minute)
cache.Replace("sec", "3", &SizedItem{1, 2}) cache.Replace("sec", "3", &SizedItem{1, 2})
time.Sleep(time.Millisecond * 5) time.Sleep(time.Millisecond * 5)
Expect(cache.size).To.Equal(int64(6)) checkLayeredSize(cache, 6)
} }
func (_ LayeredCacheTests) ReplaceChangesSize() { func (_ LayeredCacheTests) ReplaceChangesSize() {
@ -194,13 +194,25 @@ func (_ LayeredCacheTests) ReplaceChangesSize() {
cache.Replace("pri", "2", &SizedItem{1, 2}) cache.Replace("pri", "2", &SizedItem{1, 2})
time.Sleep(time.Millisecond * 5) time.Sleep(time.Millisecond * 5)
Expect(cache.size).To.Equal(int64(4)) checkLayeredSize(cache, 4)
cache.Replace("pri", "2", &SizedItem{1, 1}) cache.Replace("pri", "2", &SizedItem{1, 1})
time.Sleep(time.Millisecond * 5) time.Sleep(time.Millisecond * 5)
Expect(cache.size).To.Equal(int64(3)) checkLayeredSize(cache, 3)
cache.Replace("pri", "2", &SizedItem{1, 3}) cache.Replace("pri", "2", &SizedItem{1, 3})
time.Sleep(time.Millisecond * 5) time.Sleep(time.Millisecond * 5)
Expect(cache.size).To.Equal(int64(5)) checkLayeredSize(cache, 5)
}
func checkLayeredSize(cache *LayeredCache, sz int64) {
cache.Stop()
Expect(cache.size).To.Equal(sz)
cache.restart()
}
func gcLayeredCache(cache *LayeredCache) {
cache.Stop()
cache.gc()
cache.restart()
} }

View file

@ -96,10 +96,10 @@ func (_ SecondaryCacheTests) TrackerDoesNotCleanupHeldInstance() {
sCache := cache.GetOrCreateSecondaryCache("0") sCache := cache.GetOrCreateSecondaryCache("0")
item := sCache.TrackingGet("a") item := sCache.TrackingGet("a")
time.Sleep(time.Millisecond * 10) time.Sleep(time.Millisecond * 10)
cache.gc() gcLayeredCache(cache)
Expect(cache.Get("0", "a").Value()).To.Equal(0) Expect(cache.Get("0", "a").Value()).To.Equal(0)
Expect(cache.Get("1", "a")).To.Equal(nil) Expect(cache.Get("1", "a")).To.Equal(nil)
item.Release() item.Release()
cache.gc() gcLayeredCache(cache)
Expect(cache.Get("0", "a")).To.Equal(nil) Expect(cache.Get("0", "a")).To.Equal(nil)
} }