Add single flight for cache not just origin
This commit is contained in:
parent
724ee47c8b
commit
5aefaf061e
2 changed files with 38 additions and 1 deletions
|
@ -24,7 +24,7 @@ func NewCachingStore(component string, origin, cache BlobStore) *CachingStore {
|
||||||
return &CachingStore{
|
return &CachingStore{
|
||||||
component: component,
|
component: component,
|
||||||
origin: WithSingleFlight(component, origin),
|
origin: WithSingleFlight(component, origin),
|
||||||
cache: cache,
|
cache: WithSingleFlight(component, cache),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -80,6 +80,43 @@ func (s *singleflightStore) getter(hash string) func() (interface{}, error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Put ensures that only one request per hash is sent to the origin at a time,
|
||||||
|
// thereby protecting against https://en.wikipedia.org/wiki/Thundering_herd_problem
|
||||||
|
func (s *singleflightStore) Put(hash string, blob stream.Blob) error {
|
||||||
|
metrics.CacheWaitingRequestsCount.With(metrics.CacheLabels(s.Name(), s.component)).Inc()
|
||||||
|
defer metrics.CacheWaitingRequestsCount.With(metrics.CacheLabels(s.Name(), s.component)).Dec()
|
||||||
|
|
||||||
|
_, err, _ := s.sf.Do(hash, s.putter(hash, blob))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// putter returns a function that puts a blob from the origin
|
||||||
|
// only one putter per hash will be executing at a time
|
||||||
|
func (s *singleflightStore) putter(hash string, blob stream.Blob) func() (interface{}, error) {
|
||||||
|
return func() (interface{}, error) {
|
||||||
|
metrics.CacheOriginRequestsCount.With(metrics.CacheLabels(s.Name(), s.component)).Inc()
|
||||||
|
defer metrics.CacheOriginRequestsCount.With(metrics.CacheLabels(s.Name(), s.component)).Dec()
|
||||||
|
|
||||||
|
start := time.Now()
|
||||||
|
err := s.BlobStore.Put(hash, blob)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
rate := float64(len(blob)) / 1024 / 1024 / time.Since(start).Seconds()
|
||||||
|
metrics.CacheRetrievalSpeed.With(map[string]string{
|
||||||
|
metrics.LabelCacheType: s.Name(),
|
||||||
|
metrics.LabelComponent: s.component,
|
||||||
|
metrics.LabelSource: "origin",
|
||||||
|
}).Set(rate)
|
||||||
|
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Shutdown shuts down the store gracefully
|
// Shutdown shuts down the store gracefully
|
||||||
func (s *singleflightStore) Shutdown() {
|
func (s *singleflightStore) Shutdown() {
|
||||||
s.BlobStore.Shutdown()
|
s.BlobStore.Shutdown()
|
||||||
|
|
Loading…
Reference in a new issue