reflector.go/store/caching.go

111 lines
3.2 KiB
Go
Raw Normal View History

2019-10-03 19:36:35 +02:00
package store
import (
2020-07-09 04:28:34 +02:00
"time"
2019-11-14 01:11:35 +01:00
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/stream"
2020-07-09 04:28:34 +02:00
"github.com/lbryio/reflector.go/internal/metrics"
"golang.org/x/sync/singleflight"
2019-10-03 19:36:35 +02:00
)
// CachingStore combines two stores, typically a local and a remote store, to improve performance.
2019-10-03 19:36:35 +02:00
// Accessed blobs are stored in and retrieved from the cache. If they are not in the cache, they
// are retrieved from the origin and cached. Puts are cached and also forwarded to the origin.
type CachingStore struct {
2019-10-03 19:36:35 +02:00
origin, cache BlobStore
sf *singleflight.Group
2019-10-03 19:36:35 +02:00
}
// NewCachingStore makes a new caching disk store and returns a pointer to it.
func NewCachingStore(origin, cache BlobStore) *CachingStore {
return &CachingStore{origin: origin, cache: cache, sf: new(singleflight.Group)}
2019-10-03 19:36:35 +02:00
}
2020-10-22 19:49:02 +02:00
// nameCaching is the type name reported by CachingStore.Name.
const nameCaching = "caching"

// Name returns the name of this store type.
func (c *CachingStore) Name() string {
	return nameCaching
}
2019-10-03 19:36:35 +02:00
// Has checks the cache and then the origin for a hash. It returns true if either store has it.
func (c *CachingStore) Has(hash string) (bool, error) {
2019-10-03 19:36:35 +02:00
has, err := c.cache.Has(hash)
if has || err != nil {
return has, err
}
return c.origin.Has(hash)
}
// Get tries to get the blob from the cache first, falling back to the origin. If the blob comes
// from the origin, it is also stored in the cache.
func (c *CachingStore) Get(hash string) (stream.Blob, error) {
2020-07-09 04:28:34 +02:00
start := time.Now()
2019-10-03 19:36:35 +02:00
blob, err := c.cache.Get(hash)
if err == nil || !errors.Is(err, ErrBlobNotFound) {
2020-07-09 04:28:34 +02:00
metrics.CacheHitCount.Inc()
rate := float64(len(blob)) / 1024 / 1024 / time.Since(start).Seconds()
2020-10-22 19:49:02 +02:00
metrics.RetrieverSpeed.With(map[string]string{metrics.LabelSource: "cache"}).Set(rate)
2019-10-03 19:36:35 +02:00
return blob, err
}
metrics.CacheMissCount.Inc()
return c.getFromOrigin(hash)
}
// getFromOrigin ensures that only one Get per hash is sent to the origin at a time,
// thereby protecting against https://en.wikipedia.org/wiki/Thundering_herd_problem
func (c *CachingStore) getFromOrigin(hash string) (stream.Blob, error) {
metrics.CacheWaitingRequestsCount.Inc()
defer metrics.CacheWaitingRequestsCount.Dec()
originBlob, err, _ := c.sf.Do(hash, func() (interface{}, error) {
metrics.CacheOriginRequestsCount.Inc()
defer metrics.CacheOriginRequestsCount.Dec()
start := time.Now()
blob, err := c.origin.Get(hash)
if err != nil {
return nil, err
}
rate := float64(len(blob)) / 1024 / 1024 / time.Since(start).Seconds()
2020-10-22 19:49:02 +02:00
metrics.RetrieverSpeed.With(map[string]string{metrics.LabelSource: "origin"}).Set(rate)
err = c.cache.Put(hash, blob)
return blob, err
})
2019-10-03 19:36:35 +02:00
if err != nil {
return nil, err
}
return originBlob.(stream.Blob), nil
2019-10-03 19:36:35 +02:00
}
// Put stores the blob in the origin and the cache
func (c *CachingStore) Put(hash string, blob stream.Blob) error {
2019-10-03 19:36:35 +02:00
err := c.origin.Put(hash, blob)
if err != nil {
return err
}
return c.cache.Put(hash, blob)
}
// PutSD stores the sd blob in the origin and the cache
func (c *CachingStore) PutSD(hash string, blob stream.Blob) error {
2019-10-03 19:36:35 +02:00
err := c.origin.PutSD(hash, blob)
if err != nil {
return err
}
return c.cache.PutSD(hash, blob)
}
// Delete deletes the blob from the origin and the cache
func (c *CachingStore) Delete(hash string) error {
2019-10-03 19:36:35 +02:00
err := c.origin.Delete(hash)
if err != nil {
return err
}
return c.cache.Delete(hash)
}