diff --git a/store/caching.go b/store/caching.go index cd05b41..65b6397 100644 --- a/store/caching.go +++ b/store/caching.go @@ -24,7 +24,7 @@ func NewCachingStore(component string, origin, cache BlobStore) *CachingStore { return &CachingStore{ component: component, origin: WithSingleFlight(component, origin), - cache: cache, + cache: WithSingleFlight(component, cache), } } diff --git a/store/singleflight.go b/store/singleflight.go index 362182d..6de91f4 100644 --- a/store/singleflight.go +++ b/store/singleflight.go @@ -80,6 +80,43 @@ func (s *singleflightStore) getter(hash string) func() (interface{}, error) { } } +// Put ensures that only one request per hash is sent to the origin at a time, +// thereby protecting against https://en.wikipedia.org/wiki/Thundering_herd_problem +func (s *singleflightStore) Put(hash string, blob stream.Blob) error { + metrics.CacheWaitingRequestsCount.With(metrics.CacheLabels(s.Name(), s.component)).Inc() + defer metrics.CacheWaitingRequestsCount.With(metrics.CacheLabels(s.Name(), s.component)).Dec() + + _, err, _ := s.sf.Do(hash, s.putter(hash, blob)) + if err != nil { + return err + } + return nil +} + +// putter returns a function that puts a blob from the origin +// only one putter per hash will be executing at a time +func (s *singleflightStore) putter(hash string, blob stream.Blob) func() (interface{}, error) { + return func() (interface{}, error) { + metrics.CacheOriginRequestsCount.With(metrics.CacheLabels(s.Name(), s.component)).Inc() + defer metrics.CacheOriginRequestsCount.With(metrics.CacheLabels(s.Name(), s.component)).Dec() + + start := time.Now() + err := s.BlobStore.Put(hash, blob) + if err != nil { + return nil, err + } + + rate := float64(len(blob)) / 1024 / 1024 / time.Since(start).Seconds() + metrics.CacheRetrievalSpeed.With(map[string]string{ + metrics.LabelCacheType: s.Name(), + metrics.LabelComponent: s.component, + metrics.LabelSource: "origin", + }).Set(rate) + + return nil, nil + } +} + // Shutdown shuts down the store gracefully func (s *singleflightStore) Shutdown() { s.BlobStore.Shutdown()