From c69270ce087ce59dd7d73c013506e288b19d444e Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Fri, 10 Feb 2017 10:26:41 -0800 Subject: [PATCH 1/2] layeredcache: add Stop() and fix races in tests worker goroutine running concurrently with tests would cause data race errors when running tests with -race enabled. --- layeredcache.go | 21 ++++++++++++++++++--- layeredcache_test.go | 38 +++++++++++++++++++++++++------------- secondarycache_test.go | 4 ++-- 3 files changed, 45 insertions(+), 18 deletions(-) diff --git a/layeredcache.go b/layeredcache.go index e064eed..20b13f9 100644 --- a/layeredcache.go +++ b/layeredcache.go @@ -16,6 +16,7 @@ type LayeredCache struct { size int64 deletables chan *Item promotables chan *Item + donec chan struct{} } // Create a new layered cache with the specified configuration. @@ -38,14 +39,13 @@ func Layered(config *Configuration) *LayeredCache { bucketMask: uint32(config.buckets) - 1, buckets: make([]*layeredBucket, config.buckets), deletables: make(chan *Item, config.deleteBuffer), - promotables: make(chan *Item, config.promoteBuffer), } for i := 0; i < int(config.buckets); i++ { c.buckets[i] = &layeredBucket{ buckets: make(map[string]*bucket), } } - go c.worker() + c.restart() return c } @@ -149,6 +149,17 @@ func (c *LayeredCache) Clear() { c.list = list.New() } +func (c *LayeredCache) Stop() { + close(c.promotables) + <-c.donec +} + +func (c *LayeredCache) restart() { + c.promotables = make(chan *Item, c.promoteBuffer) + c.donec = make(chan struct{}) + go c.worker() +} + func (c *LayeredCache) set(primary, secondary string, value interface{}, duration time.Duration) *Item { item, existing := c.bucket(primary).set(primary, secondary, value, duration) if existing != nil { @@ -169,9 +180,13 @@ func (c *LayeredCache) promote(item *Item) { } func (c *LayeredCache) worker() { + defer close(c.donec) for { select { - case item := <-c.promotables: + case item, ok := <-c.promotables: + if ok == false { + return + } if c.doPromote(item) && c.size > c.maxSize { c.gc() } diff --git a/layeredcache_test.go b/layeredcache_test.go index e233a47..94b61a6 100644 --- a/layeredcache_test.go +++ b/layeredcache_test.go @@ -86,7 +86,7 @@ func (_ LayeredCacheTests) GCsTheOldestItems() { cache.Set("xx", "b", 9001, time.Minute) //let the items get promoted (and added to our list) time.Sleep(time.Millisecond * 10) - cache.gc() + gcLayeredCache(cache) Expect(cache.Get("xx", "a")).To.Equal(nil) Expect(cache.Get("xx", "b").Value()).To.Equal(9001) Expect(cache.Get("8", "a")).To.Equal(nil) @@ -102,7 +102,7 @@ func (_ LayeredCacheTests) PromotedItemsDontGetPruned() { time.Sleep(time.Millisecond * 10) //run the worker once to init the list cache.Get("9", "a") time.Sleep(time.Millisecond * 10) - cache.gc() + gcLayeredCache(cache) Expect(cache.Get("9", "a").Value()).To.Equal(9) Expect(cache.Get("10", "a")).To.Equal(nil) Expect(cache.Get("11", "a").Value()).To.Equal(11) @@ -115,11 +115,11 @@ func (_ LayeredCacheTests) TrackerDoesNotCleanupHeldInstance() { } item := cache.TrackingGet("0", "a") time.Sleep(time.Millisecond * 10) - cache.gc() + gcLayeredCache(cache) Expect(cache.Get("0", "a").Value()).To.Equal(0) Expect(cache.Get("1", "a")).To.Equal(nil) item.Release() - cache.gc() + gcLayeredCache(cache) Expect(cache.Get("0", "a")).To.Equal(nil) } @@ -161,20 +161,20 @@ func (_ LayeredCacheTests) SetUpdatesSizeOnDelta() { cache.Set("pri", "a", &SizedItem{0, 2}, time.Minute) cache.Set("pri", "b", &SizedItem{0, 3}, time.Minute) time.Sleep(time.Millisecond * 5) - Expect(cache.size).To.Equal(int64(5)) + checkLayeredSize(cache, 5) cache.Set("pri", "b", &SizedItem{0, 3}, time.Minute) time.Sleep(time.Millisecond * 5) - Expect(cache.size).To.Equal(int64(5)) + checkLayeredSize(cache, 5) cache.Set("pri", "b", &SizedItem{0, 4}, time.Minute) time.Sleep(time.Millisecond * 5) - Expect(cache.size).To.Equal(int64(6)) + checkLayeredSize(cache, 6) cache.Set("pri", "b", &SizedItem{0, 2}, time.Minute) cache.Set("sec", "b", &SizedItem{0, 3}, time.Minute) time.Sleep(time.Millisecond * 5) - Expect(cache.size).To.Equal(int64(7)) + checkLayeredSize(cache, 7) cache.Delete("pri", "b") time.Sleep(time.Millisecond * 10) - Expect(cache.size).To.Equal(int64(5)) + checkLayeredSize(cache, 5) } func (_ LayeredCacheTests) ReplaceDoesNotchangeSizeIfNotSet() { @@ -184,7 +184,7 @@ func (_ LayeredCacheTests) ReplaceDoesNotchangeSizeIfNotSet() { cache.Set("pri", "3", &SizedItem{1, 2}, time.Minute) cache.Replace("sec", "3", &SizedItem{1, 2}) time.Sleep(time.Millisecond * 5) - Expect(cache.size).To.Equal(int64(6)) + checkLayeredSize(cache, 6) } func (_ LayeredCacheTests) ReplaceChangesSize() { @@ -194,13 +194,25 @@ func (_ LayeredCacheTests) ReplaceChangesSize() { cache.Replace("pri", "2", &SizedItem{1, 2}) time.Sleep(time.Millisecond * 5) - Expect(cache.size).To.Equal(int64(4)) + checkLayeredSize(cache, 4) cache.Replace("pri", "2", &SizedItem{1, 1}) time.Sleep(time.Millisecond * 5) - Expect(cache.size).To.Equal(int64(3)) + checkLayeredSize(cache, 3) cache.Replace("pri", "2", &SizedItem{1, 3}) time.Sleep(time.Millisecond * 5) - Expect(cache.size).To.Equal(int64(5)) + checkLayeredSize(cache, 5) +} + +func checkLayeredSize(cache *LayeredCache, sz int64) { + cache.Stop() + Expect(cache.size).To.Equal(sz) + cache.restart() +} + +func gcLayeredCache(cache *LayeredCache) { + cache.Stop() + cache.gc() + cache.restart() } diff --git a/secondarycache_test.go b/secondarycache_test.go index 8b733c1..7065c23 100644 --- a/secondarycache_test.go +++ b/secondarycache_test.go @@ -96,10 +96,10 @@ func (_ SecondaryCacheTests) TrackerDoesNotCleanupHeldInstance() { sCache := cache.GetOrCreateSecondaryCache("0") item := sCache.TrackingGet("a") time.Sleep(time.Millisecond * 10) - cache.gc() + gcLayeredCache(cache) Expect(cache.Get("0", "a").Value()).To.Equal(0) Expect(cache.Get("1", "a")).To.Equal(nil) item.Release() - cache.gc() + gcLayeredCache(cache) Expect(cache.Get("0", "a")).To.Equal(nil) } From b3c864ded7d5eef6180f410abd88179a0267e982 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Fri, 10 Feb 2017 10:51:01 -0800 Subject: [PATCH 2/2] cache: make Stop() synchronous and races in tests worker goroutine running concurrently with tests would cause data race errors when running tests with -race enabled. --- cache.go | 15 ++++++++++++--- cache_test.go | 38 +++++++++++++++++++++++++------------- 2 files changed, 37 insertions(+), 16 deletions(-) diff --git a/cache.go b/cache.go index 998d1db..a9e94f4 100644 --- a/cache.go +++ b/cache.go @@ -16,6 +16,7 @@ type Cache struct { bucketMask uint32 deletables chan *Item promotables chan *Item + donec chan struct{} } // Create a new cache with the specified configuration @@ -26,15 +27,13 @@ func New(config *Configuration) *Cache { Configuration: config, bucketMask: uint32(config.buckets) - 1, buckets: make([]*bucket, config.buckets), - deletables: make(chan *Item, config.deleteBuffer), - promotables: make(chan *Item, config.promoteBuffer), } for i := 0; i < int(config.buckets); i++ { c.buckets[i] = &bucket{ lookup: make(map[string]*Item), } } - go c.worker() + c.restart() return c } @@ -119,6 +118,14 @@ func (c *Cache) Clear() { // is called are likely to panic func (c *Cache) Stop() { close(c.promotables) + <-c.donec +} + +func (c *Cache) restart() { + c.deletables = make(chan *Item, c.deleteBuffer) + c.promotables = make(chan *Item, c.promoteBuffer) + c.donec = make(chan struct{}) + go c.worker() } func (c *Cache) deleteItem(bucket *bucket, item *Item) { @@ -146,6 +153,8 @@ func (c *Cache) promote(item *Item) { } func (c *Cache) worker() { + defer close(c.donec) + for { select { case item, ok := <-c.promotables: diff --git a/cache_test.go b/cache_test.go index d0989fb..552f36c 100644 --- a/cache_test.go +++ b/cache_test.go @@ -41,7 +41,7 @@ func (_ CacheTests) GCsTheOldestItems() { } //let the items get promoted (and added to our list) time.Sleep(time.Millisecond * 10) - cache.gc() + gcCache(cache) Expect(cache.Get("9")).To.Equal(nil) Expect(cache.Get("10").Value()).To.Equal(10) } @@ -54,7 +54,7 @@ func (_ CacheTests) PromotedItemsDontGetPruned() { time.Sleep(time.Millisecond * 10) //run the worker once to init the list cache.Get("9") time.Sleep(time.Millisecond * 10) - cache.gc() + gcCache(cache) Expect(cache.Get("9").Value()).To.Equal(9) Expect(cache.Get("10")).To.Equal(nil) Expect(cache.Get("11").Value()).To.Equal(11) @@ -67,11 +67,11 @@ func (_ CacheTests) TrackerDoesNotCleanupHeldInstance() { } item := cache.TrackingGet("0") time.Sleep(time.Millisecond * 10) - cache.gc() + gcCache(cache) Expect(cache.Get("0").Value()).To.Equal(0) Expect(cache.Get("1")).To.Equal(nil) item.Release() - cache.gc() + gcCache(cache) Expect(cache.Get("0")).To.Equal(nil) } @@ -104,19 +104,19 @@ func (_ CacheTests) SetUpdatesSizeOnDelta() { cache.Set("a", &SizedItem{0, 2}, time.Minute) cache.Set("b", &SizedItem{0, 3}, time.Minute) time.Sleep(time.Millisecond * 5) - Expect(cache.size).To.Equal(int64(5)) + checkSize(cache, 5) cache.Set("b", &SizedItem{0, 3}, time.Minute) time.Sleep(time.Millisecond * 5) - Expect(cache.size).To.Equal(int64(5)) + checkSize(cache, 5) cache.Set("b", &SizedItem{0, 4}, time.Minute) time.Sleep(time.Millisecond * 5) - Expect(cache.size).To.Equal(int64(6)) + checkSize(cache, 6) cache.Set("b", &SizedItem{0, 2}, time.Minute) time.Sleep(time.Millisecond * 5) - Expect(cache.size).To.Equal(int64(4)) + checkSize(cache, 4) cache.Delete("b") time.Sleep(time.Millisecond * 100) - Expect(cache.size).To.Equal(int64(2)) + checkSize(cache, 2) } func (_ CacheTests) ReplaceDoesNotchangeSizeIfNotSet() { @@ -126,7 +126,7 @@ func (_ CacheTests) ReplaceDoesNotchangeSizeIfNotSet() { cache.Set("3", &SizedItem{1, 2}, time.Minute) cache.Replace("4", &SizedItem{1, 2}) time.Sleep(time.Millisecond * 5) - Expect(cache.size).To.Equal(int64(6)) + checkSize(cache, 6) } func (_ CacheTests) ReplaceChangesSize() { @@ -136,15 +136,15 @@ func (_ CacheTests) ReplaceChangesSize() { cache.Replace("2", &SizedItem{1, 2}) time.Sleep(time.Millisecond * 5) - Expect(cache.size).To.Equal(int64(4)) + checkSize(cache, 4) cache.Replace("2", &SizedItem{1, 1}) time.Sleep(time.Millisecond * 5) - Expect(cache.size).To.Equal(int64(3)) + checkSize(cache, 3) cache.Replace("2", &SizedItem{1, 3}) time.Sleep(time.Millisecond * 5) - Expect(cache.size).To.Equal(int64(5)) + checkSize(cache, 5) } type SizedItem struct { @@ -155,3 +155,15 @@ type SizedItem struct { func (s *SizedItem) Size() int64 { return s.s } + +func checkSize(cache *Cache, sz int64) { + cache.Stop() + Expect(cache.size).To.Equal(sz) + cache.restart() +} + +func gcCache(cache *Cache) { + cache.Stop() + cache.gc() + cache.restart() +}