From c69270ce087ce59dd7d73c013506e288b19d444e Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Fri, 10 Feb 2017 10:26:41 -0800 Subject: [PATCH] layeredcache: add Stop() and fix races in tests worker goroutine running concurrently with tests would cause data race errors when running tests with -race enabled. --- layeredcache.go | 21 ++++++++++++++++++--- layeredcache_test.go | 38 +++++++++++++++++++++++++------------- secondarycache_test.go | 4 ++-- 3 files changed, 45 insertions(+), 18 deletions(-) diff --git a/layeredcache.go b/layeredcache.go index e064eed..20b13f9 100644 --- a/layeredcache.go +++ b/layeredcache.go @@ -16,6 +16,7 @@ type LayeredCache struct { size int64 deletables chan *Item promotables chan *Item + donec chan struct{} } // Create a new layered cache with the specified configuration. @@ -38,14 +39,13 @@ func Layered(config *Configuration) *LayeredCache { bucketMask: uint32(config.buckets) - 1, buckets: make([]*layeredBucket, config.buckets), deletables: make(chan *Item, config.deleteBuffer), - promotables: make(chan *Item, config.promoteBuffer), } for i := 0; i < int(config.buckets); i++ { c.buckets[i] = &layeredBucket{ buckets: make(map[string]*bucket), } } - go c.worker() + c.restart() return c } @@ -149,6 +149,17 @@ func (c *LayeredCache) Clear() { c.list = list.New() } +func (c *LayeredCache) Stop() { + close(c.promotables) + <-c.donec +} + +func (c *LayeredCache) restart() { + c.promotables = make(chan *Item, c.promoteBuffer) + c.donec = make(chan struct{}) + go c.worker() +} + func (c *LayeredCache) set(primary, secondary string, value interface{}, duration time.Duration) *Item { item, existing := c.bucket(primary).set(primary, secondary, value, duration) if existing != nil { @@ -169,9 +180,13 @@ func (c *LayeredCache) promote(item *Item) { } func (c *LayeredCache) worker() { + defer close(c.donec) for { select { - case item := <-c.promotables: + case item, ok := <-c.promotables: + if ok == false { + return + } if c.doPromote(item) && c.size > c.maxSize { c.gc() } diff --git a/layeredcache_test.go b/layeredcache_test.go index e233a47..94b61a6 100644 --- a/layeredcache_test.go +++ b/layeredcache_test.go @@ -86,7 +86,7 @@ func (_ LayeredCacheTests) GCsTheOldestItems() { cache.Set("xx", "b", 9001, time.Minute) //let the items get promoted (and added to our list) time.Sleep(time.Millisecond * 10) - cache.gc() + gcLayeredCache(cache) Expect(cache.Get("xx", "a")).To.Equal(nil) Expect(cache.Get("xx", "b").Value()).To.Equal(9001) Expect(cache.Get("8", "a")).To.Equal(nil) @@ -102,7 +102,7 @@ func (_ LayeredCacheTests) PromotedItemsDontGetPruned() { time.Sleep(time.Millisecond * 10) //run the worker once to init the list cache.Get("9", "a") time.Sleep(time.Millisecond * 10) - cache.gc() + gcLayeredCache(cache) Expect(cache.Get("9", "a").Value()).To.Equal(9) Expect(cache.Get("10", "a")).To.Equal(nil) Expect(cache.Get("11", "a").Value()).To.Equal(11) @@ -115,11 +115,11 @@ func (_ LayeredCacheTests) TrackerDoesNotCleanupHeldInstance() { } item := cache.TrackingGet("0", "a") time.Sleep(time.Millisecond * 10) - cache.gc() + gcLayeredCache(cache) Expect(cache.Get("0", "a").Value()).To.Equal(0) Expect(cache.Get("1", "a")).To.Equal(nil) item.Release() - cache.gc() + gcLayeredCache(cache) Expect(cache.Get("0", "a")).To.Equal(nil) } @@ -161,20 +161,20 @@ func (_ LayeredCacheTests) SetUpdatesSizeOnDelta() { cache.Set("pri", "a", &SizedItem{0, 2}, time.Minute) cache.Set("pri", "b", &SizedItem{0, 3}, time.Minute) time.Sleep(time.Millisecond * 5) - Expect(cache.size).To.Equal(int64(5)) + checkLayeredSize(cache, 5) cache.Set("pri", "b", &SizedItem{0, 3}, time.Minute) time.Sleep(time.Millisecond * 5) - Expect(cache.size).To.Equal(int64(5)) + checkLayeredSize(cache, 5) cache.Set("pri", "b", &SizedItem{0, 4}, time.Minute) time.Sleep(time.Millisecond * 5) - Expect(cache.size).To.Equal(int64(6)) + checkLayeredSize(cache, 6) cache.Set("pri", "b", &SizedItem{0, 2}, time.Minute) cache.Set("sec", "b", &SizedItem{0, 3}, time.Minute) time.Sleep(time.Millisecond * 5) - Expect(cache.size).To.Equal(int64(7)) + checkLayeredSize(cache, 7) cache.Delete("pri", "b") time.Sleep(time.Millisecond * 10) - Expect(cache.size).To.Equal(int64(5)) + checkLayeredSize(cache, 5) } func (_ LayeredCacheTests) ReplaceDoesNotchangeSizeIfNotSet() { @@ -184,7 +184,7 @@ func (_ LayeredCacheTests) ReplaceDoesNotchangeSizeIfNotSet() { cache.Set("pri", "3", &SizedItem{1, 2}, time.Minute) cache.Replace("sec", "3", &SizedItem{1, 2}) time.Sleep(time.Millisecond * 5) - Expect(cache.size).To.Equal(int64(6)) + checkLayeredSize(cache, 6) } func (_ LayeredCacheTests) ReplaceChangesSize() { @@ -194,13 +194,25 @@ func (_ LayeredCacheTests) ReplaceChangesSize() { cache.Replace("pri", "2", &SizedItem{1, 2}) time.Sleep(time.Millisecond * 5) - Expect(cache.size).To.Equal(int64(4)) + checkLayeredSize(cache, 4) cache.Replace("pri", "2", &SizedItem{1, 1}) time.Sleep(time.Millisecond * 5) - Expect(cache.size).To.Equal(int64(3)) + checkLayeredSize(cache, 3) cache.Replace("pri", "2", &SizedItem{1, 3}) time.Sleep(time.Millisecond * 5) - Expect(cache.size).To.Equal(int64(5)) + checkLayeredSize(cache, 5) +} + +func checkLayeredSize(cache *LayeredCache, sz int64) { + cache.Stop() + Expect(cache.size).To.Equal(sz) + cache.restart() +} + +func gcLayeredCache(cache *LayeredCache) { + cache.Stop() + cache.gc() + cache.restart() } diff --git a/secondarycache_test.go b/secondarycache_test.go index 8b733c1..7065c23 100644 --- a/secondarycache_test.go +++ b/secondarycache_test.go @@ -96,10 +96,10 @@ func (_ SecondaryCacheTests) TrackerDoesNotCleanupHeldInstance() { sCache := cache.GetOrCreateSecondaryCache("0") item := sCache.TrackingGet("a") time.Sleep(time.Millisecond * 10) - cache.gc() + gcLayeredCache(cache) Expect(cache.Get("0", "a").Value()).To.Equal(0) Expect(cache.Get("1", "a")).To.Equal(nil) item.Release() - cache.gc() + gcLayeredCache(cache) Expect(cache.Get("0", "a")).To.Equal(nil) }