2019-10-03 19:36:35 +02:00
|
|
|
package store
|
|
|
|
|
|
|
|
import (
|
2020-07-09 04:28:34 +02:00
|
|
|
"time"
|
|
|
|
|
2021-07-20 02:09:14 +02:00
|
|
|
"github.com/lbryio/reflector.go/internal/metrics"
|
|
|
|
"github.com/lbryio/reflector.go/shared"
|
|
|
|
|
2019-11-14 01:11:35 +01:00
|
|
|
"github.com/lbryio/lbry.go/v2/extras/errors"
|
|
|
|
"github.com/lbryio/lbry.go/v2/stream"
|
2020-07-09 04:28:34 +02:00
|
|
|
|
2021-07-20 02:09:14 +02:00
|
|
|
log "github.com/sirupsen/logrus"
|
2019-10-03 19:36:35 +02:00
|
|
|
)
|
|
|
|
|
2020-10-22 19:12:31 +02:00
|
|
|
// CachingStore combines two stores, typically a local and a remote store, to improve performance.
|
2019-10-03 19:36:35 +02:00
|
|
|
// Accessed blobs are stored in and retrieved from the cache. If they are not in the cache, they
|
|
|
|
// are retrieved from the origin and cached. Puts are cached and also forwarded to the origin.
|
2020-10-22 19:12:31 +02:00
|
|
|
type CachingStore struct {
|
2019-10-03 19:36:35 +02:00
|
|
|
origin, cache BlobStore
|
2020-10-29 17:39:53 +01:00
|
|
|
component string
|
2019-10-03 19:36:35 +02:00
|
|
|
}
|
|
|
|
|
2020-10-22 19:12:31 +02:00
|
|
|
// NewCachingStore makes a new caching disk store and returns a pointer to it.
|
2020-10-29 17:39:53 +01:00
|
|
|
func NewCachingStore(component string, origin, cache BlobStore) *CachingStore {
|
|
|
|
return &CachingStore{
|
|
|
|
component: component,
|
|
|
|
origin: WithSingleFlight(component, origin),
|
2021-05-21 01:05:48 +02:00
|
|
|
cache: WithSingleFlight(component, cache),
|
2020-10-29 17:39:53 +01:00
|
|
|
}
|
2019-10-03 19:36:35 +02:00
|
|
|
}
|
|
|
|
|
2020-10-22 19:49:02 +02:00
|
|
|
const nameCaching = "caching"
|
|
|
|
|
|
|
|
// Name is the cache type name
|
|
|
|
func (c *CachingStore) Name() string { return nameCaching }
|
|
|
|
|
2019-10-03 19:36:35 +02:00
|
|
|
// Has checks the cache and then the origin for a hash. It returns true if either store has it.
|
2020-10-22 19:12:31 +02:00
|
|
|
func (c *CachingStore) Has(hash string) (bool, error) {
|
2019-10-03 19:36:35 +02:00
|
|
|
has, err := c.cache.Has(hash)
|
|
|
|
if has || err != nil {
|
|
|
|
return has, err
|
|
|
|
}
|
|
|
|
return c.origin.Has(hash)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Get tries to get the blob from the cache first, falling back to the origin. If the blob comes
|
|
|
|
// from the origin, it is also stored in the cache.
|
2021-01-09 05:08:20 +01:00
|
|
|
func (c *CachingStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) {
|
2020-07-09 04:28:34 +02:00
|
|
|
start := time.Now()
|
2021-01-09 05:08:20 +01:00
|
|
|
blob, trace, err := c.cache.Get(hash)
|
2019-10-03 19:36:35 +02:00
|
|
|
if err == nil || !errors.Is(err, ErrBlobNotFound) {
|
2020-10-29 17:39:53 +01:00
|
|
|
metrics.CacheHitCount.With(metrics.CacheLabels(c.cache.Name(), c.component)).Inc()
|
2020-10-14 22:08:48 +02:00
|
|
|
rate := float64(len(blob)) / 1024 / 1024 / time.Since(start).Seconds()
|
2020-10-29 17:39:53 +01:00
|
|
|
metrics.CacheRetrievalSpeed.With(map[string]string{
|
|
|
|
metrics.LabelCacheType: c.cache.Name(),
|
|
|
|
metrics.LabelComponent: c.component,
|
|
|
|
metrics.LabelSource: "cache",
|
|
|
|
}).Set(rate)
|
2021-01-09 05:08:20 +01:00
|
|
|
return blob, trace.Stack(time.Since(start), c.Name()), err
|
2019-10-03 19:36:35 +02:00
|
|
|
}
|
|
|
|
|
2020-10-29 17:39:53 +01:00
|
|
|
metrics.CacheMissCount.With(metrics.CacheLabels(c.cache.Name(), c.component)).Inc()
|
2020-10-14 22:08:48 +02:00
|
|
|
|
2021-01-09 05:08:20 +01:00
|
|
|
blob, trace, err = c.origin.Get(hash)
|
2019-10-03 19:36:35 +02:00
|
|
|
if err != nil {
|
2021-01-09 05:08:20 +01:00
|
|
|
return nil, trace.Stack(time.Since(start), c.Name()), err
|
2019-10-03 19:36:35 +02:00
|
|
|
}
|
2021-05-28 01:19:57 +02:00
|
|
|
// do not do this async unless you're prepared to deal with mayhem
|
|
|
|
err = c.cache.Put(hash, blob)
|
|
|
|
if err != nil {
|
|
|
|
log.Errorf("error saving blob to underlying cache: %s", errors.FullTrace(err))
|
|
|
|
}
|
2021-01-09 05:08:20 +01:00
|
|
|
return blob, trace.Stack(time.Since(start), c.Name()), nil
|
2019-10-03 19:36:35 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
// Put stores the blob in the origin and the cache
|
2020-10-22 19:12:31 +02:00
|
|
|
func (c *CachingStore) Put(hash string, blob stream.Blob) error {
|
2019-10-03 19:36:35 +02:00
|
|
|
err := c.origin.Put(hash, blob)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
return c.cache.Put(hash, blob)
|
|
|
|
}
|
|
|
|
|
|
|
|
// PutSD stores the sd blob in the origin and the cache
|
2020-10-22 19:12:31 +02:00
|
|
|
func (c *CachingStore) PutSD(hash string, blob stream.Blob) error {
|
2019-10-03 19:36:35 +02:00
|
|
|
err := c.origin.PutSD(hash, blob)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
return c.cache.PutSD(hash, blob)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Delete deletes the blob from the origin and the cache
|
2020-10-22 19:12:31 +02:00
|
|
|
func (c *CachingStore) Delete(hash string) error {
|
2019-10-03 19:36:35 +02:00
|
|
|
err := c.origin.Delete(hash)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
return c.cache.Delete(hash)
|
|
|
|
}
|
2020-12-23 06:04:42 +01:00
|
|
|
|
|
|
|
// Shutdown shuts down the store gracefully
|
|
|
|
func (c *CachingStore) Shutdown() {
|
|
|
|
c.origin.Shutdown()
|
|
|
|
c.cache.Shutdown()
|
|
|
|
}
|