separate singleflight cache wrapper, component names for cache metrics
parent 070c378dfd
commit 560e180e36
8 changed files with 126 additions and 59 deletions
@@ -29,6 +29,7 @@ func getStreamCmd(cmd *cobra.Command, args []string) {
 	sdHash := args[1]
 
 	s := store.NewCachingStore(
+		"getstream",
 		peer.NewStore(peer.StoreOpts{Address: addr}),
 		store.NewDiskStore("/tmp/lbry_downloaded_blobs", 2),
 	)
@@ -153,13 +153,19 @@ func wrapWithCache(s store.BlobStore) store.BlobStore {
 		if err != nil {
 			log.Fatal(err)
 		}
-		wrapped = store.NewCachingStore(wrapped,
-			store.NewLRUStore(store.NewDiskStore(diskCachePath, 2), diskCacheMaxSize))
+		wrapped = store.NewCachingStore(
+			"reflector",
+			wrapped,
+			store.NewLRUStore("peer_server", store.NewDiskStore(diskCachePath, 2), diskCacheMaxSize),
+		)
 	}
 
 	if reflectorCmdMemCache > 0 {
-		wrapped = store.NewCachingStore(wrapped,
-			store.NewLRUStore(store.NewMemStore(), reflectorCmdMemCache))
+		wrapped = store.NewCachingStore(
+			"reflector",
+			wrapped,
+			store.NewLRUStore("peer_server", store.NewMemStore(), reflectorCmdMemCache),
+		)
 	}
 
 	return wrapped
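These call sites now name the component and each cache layer explicitly. A rough sketch of the layering this produces (the diskCache variable is illustrative; diskCachePath and diskCacheMaxSize are the existing flags):

	// The outer CachingStore reports its cache metrics under component "reflector";
	// the LRU-wrapped disk store reports its evictions under component "peer_server".
	diskCache := store.NewLRUStore("peer_server", store.NewDiskStore(diskCachePath, 2), diskCacheMaxSize)
	wrapped = store.NewCachingStore("reflector", wrapped, diskCache)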
@@ -12,6 +12,7 @@ import (
 
 	ee "github.com/lbryio/lbry.go/v2/extras/errors"
 	"github.com/lbryio/lbry.go/v2/extras/stop"
 
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
 	"github.com/prometheus/client_golang/prometheus/promhttp"
@@ -67,6 +68,7 @@ const (
 	DirectionDownload = "download" // from reflector
 
 	LabelCacheType = "cache_type"
+	LabelComponent = "component"
 	LabelSource    = "source"
 
 	errConnReset = "conn_reset"
@@ -116,37 +118,42 @@ var (
 		Help: "Total number of blobs downloaded from reflector through QUIC protocol",
 	})
 
-	CacheHitCount = promauto.NewCounter(prometheus.CounterOpts{
+	CacheHitCount = promauto.NewCounterVec(prometheus.CounterOpts{
 		Namespace: ns,
 		Subsystem: subsystemCache,
 		Name:      "hit_total",
 		Help:      "Total number of blobs retrieved from the cache storage",
-	})
-	CacheMissCount = promauto.NewCounter(prometheus.CounterOpts{
+	}, []string{LabelCacheType, LabelComponent})
+	CacheMissCount = promauto.NewCounterVec(prometheus.CounterOpts{
 		Namespace: ns,
 		Subsystem: subsystemCache,
 		Name:      "miss_total",
 		Help:      "Total number of blobs retrieved from origin rather than cache storage",
-	})
-	CacheOriginRequestsCount = promauto.NewGauge(prometheus.GaugeOpts{
+	}, []string{LabelCacheType, LabelComponent})
+	CacheOriginRequestsCount = promauto.NewGaugeVec(prometheus.GaugeOpts{
 		Namespace: ns,
 		Subsystem: subsystemCache,
 		Name:      "origin_requests_total",
 		Help:      "How many Get requests are in flight from the cache to the origin",
-	})
+	}, []string{LabelCacheType, LabelComponent})
 	// during thundering-herd situations, the metric below should be a lot smaller than the metric above
-	CacheWaitingRequestsCount = promauto.NewGauge(prometheus.GaugeOpts{
+	CacheWaitingRequestsCount = promauto.NewGaugeVec(prometheus.GaugeOpts{
 		Namespace: ns,
 		Subsystem: subsystemCache,
 		Name:      "waiting_requests_total",
 		Help:      "How many cache requests are waiting for an in-flight origin request",
-	})
+	}, []string{LabelCacheType, LabelComponent})
 	CacheLRUEvictCount = promauto.NewCounterVec(prometheus.CounterOpts{
 		Namespace: ns,
 		Subsystem: subsystemCache,
 		Name:      "evict_total",
 		Help:      "Count of blobs evicted from cache",
-	}, []string{LabelCacheType})
+	}, []string{LabelCacheType, LabelComponent})
+	CacheRetrievalSpeed = promauto.NewGaugeVec(prometheus.GaugeOpts{
+		Namespace: ns,
+		Name:      "speed_mbps",
+		Help:      "Speed of blob retrieval from cache or from origin",
+	}, []string{LabelCacheType, LabelComponent, LabelSource})
 
 	BlobUploadCount = promauto.NewCounter(prometheus.CounterOpts{
 		Namespace: ns,
@@ -159,12 +166,6 @@ var (
 		Help: "Total number of SD blobs (and therefore streams) uploaded to reflector",
 	})
 
-	RetrieverSpeed = promauto.NewGaugeVec(prometheus.GaugeOpts{
-		Namespace: ns,
-		Name:      "speed_mbps",
-		Help:      "Speed of blob retrieval",
-	}, []string{LabelSource})
-
 	MtrInBytesTcp = promauto.NewCounter(prometheus.CounterOpts{
 		Namespace: ns,
 		Name:      "tcp_in_bytes",
@@ -202,6 +203,13 @@ var (
 	})
 )
 
+func CacheLabels(name, component string) prometheus.Labels {
+	return prometheus.Labels{
+		LabelCacheType: name,
+		LabelComponent: component,
+	}
+}
+
 func TrackError(direction string, e error) (shouldLog bool) { // shouldLog is a hack, but whatever
 	if e == nil {
 		return
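With the cache counters and gauges now label vectors, every call site attaches a cache type and a component; CacheLabels above is the helper for that. A hedged usage sketch (the "disk" and "reflector" label values and the speed figure are illustrative, not mandated by the code):

	metrics.CacheHitCount.With(metrics.CacheLabels("disk", "reflector")).Inc()
	metrics.CacheRetrievalSpeed.With(map[string]string{
		metrics.LabelCacheType: "disk",
		metrics.LabelComponent: "reflector",
		metrics.LabelSource:    "cache",
	}).Set(12.3) // MB/s, illustrative value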
@@ -7,8 +7,6 @@ import (
 	"github.com/lbryio/lbry.go/v2/stream"
 
 	"github.com/lbryio/reflector.go/internal/metrics"
-
-	"golang.org/x/sync/singleflight"
 )
 
 // CachingStore combines two stores, typically a local and a remote store, to improve performance.
@@ -16,13 +14,16 @@ import (
 // are retrieved from the origin and cached. Puts are cached and also forwarded to the origin.
 type CachingStore struct {
 	origin, cache BlobStore
-	sf            *singleflight.Group
+	component     string
 }
 
 // NewCachingStore makes a new caching disk store and returns a pointer to it.
-func NewCachingStore(origin, cache BlobStore) *CachingStore {
-	return &CachingStore{origin: origin, cache: cache, sf: new(singleflight.Group)}
+func NewCachingStore(component string, origin, cache BlobStore) *CachingStore {
+	return &CachingStore{
+		component: component,
+		origin:    WithSingleFlight(component, origin),
+		cache:     cache,
+	}
 }
 
 const nameCaching = "caching"
@@ -45,41 +46,25 @@ func (c *CachingStore) Get(hash string) (stream.Blob, error) {
 	start := time.Now()
 	blob, err := c.cache.Get(hash)
 	if err == nil || !errors.Is(err, ErrBlobNotFound) {
-		metrics.CacheHitCount.Inc()
+		metrics.CacheHitCount.With(metrics.CacheLabels(c.cache.Name(), c.component)).Inc()
 		rate := float64(len(blob)) / 1024 / 1024 / time.Since(start).Seconds()
-		metrics.RetrieverSpeed.With(map[string]string{metrics.LabelSource: "cache"}).Set(rate)
+		metrics.CacheRetrievalSpeed.With(map[string]string{
+			metrics.LabelCacheType: c.cache.Name(),
+			metrics.LabelComponent: c.component,
+			metrics.LabelSource:    "cache",
+		}).Set(rate)
 		return blob, err
 	}
 
-	metrics.CacheMissCount.Inc()
-	return c.getFromOrigin(hash)
-}
-
-// getFromOrigin ensures that only one Get per hash is sent to the origin at a time,
-// thereby protecting against https://en.wikipedia.org/wiki/Thundering_herd_problem
-func (c *CachingStore) getFromOrigin(hash string) (stream.Blob, error) {
-	metrics.CacheWaitingRequestsCount.Inc()
-	defer metrics.CacheWaitingRequestsCount.Dec()
-	originBlob, err, _ := c.sf.Do(hash, func() (interface{}, error) {
-		metrics.CacheOriginRequestsCount.Inc()
-		defer metrics.CacheOriginRequestsCount.Dec()
-
-		start := time.Now()
-		blob, err := c.origin.Get(hash)
-		if err != nil {
-			return nil, err
-		}
-
-		rate := float64(len(blob)) / 1024 / 1024 / time.Since(start).Seconds()
-		metrics.RetrieverSpeed.With(map[string]string{metrics.LabelSource: "origin"}).Set(rate)
-
-		err = c.cache.Put(hash, blob)
-		return blob, err
-	})
+	metrics.CacheMissCount.With(metrics.CacheLabels(c.cache.Name(), c.component)).Inc()
+
+	blob, err = c.origin.Get(hash)
 	if err != nil {
 		return nil, err
 	}
-	return originBlob.(stream.Blob), nil
+	err = c.cache.Put(hash, blob)
+	return blob, err
 }
 
 // Put stores the blob in the origin and the cache
@@ -12,7 +12,7 @@ import (
 func TestCachingStore_Put(t *testing.T) {
 	origin := NewMemStore()
 	cache := NewMemStore()
-	s := NewCachingStore(origin, cache)
+	s := NewCachingStore("test", origin, cache)
 
 	b := []byte("this is a blob of stuff")
 	hash := "hash"
@@ -42,7 +42,7 @@ func TestCachingStore_Put(t *testing.T) {
 func TestCachingStore_CacheMiss(t *testing.T) {
 	origin := NewMemStore()
 	cache := NewMemStore()
-	s := NewCachingStore(origin, cache)
+	s := NewCachingStore("test", origin, cache)
 
 	b := []byte("this is a blob of stuff")
 	hash := "hash"
@@ -80,7 +80,7 @@ func TestCachingStore_ThunderingHerd(t *testing.T) {
 	storeDelay := 100 * time.Millisecond
 	origin := NewSlowBlobStore(storeDelay)
 	cache := NewMemStore()
-	s := NewCachingStore(origin, cache)
+	s := NewCachingStore("test", origin, cache)
 
 	b := []byte("this is a blob of stuff")
 	hash := "hash"
@@ -17,9 +17,9 @@ type LRUStore struct {
 }
 
 // NewLRUStore initialize a new LRUStore
-func NewLRUStore(store BlobStore, maxItems int) *LRUStore {
+func NewLRUStore(component string, store BlobStore, maxItems int) *LRUStore {
 	lru, err := golru.NewWithEvict(maxItems, func(key interface{}, value interface{}) {
-		metrics.CacheLRUEvictCount.WithLabelValues(store.Name()).Inc()
+		metrics.CacheLRUEvictCount.With(metrics.CacheLabels(store.Name(), component)).Inc()
 		_ = store.Delete(key.(string)) // TODO: log this error. may happen if underlying entry is gone but cache entry still there
 	})
 	if err != nil {
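NewLRUStore also takes the component name now, so evictions are attributed to both the underlying store (cache_type) and the component. A hedged sketch of the new signature (the path and sizes are placeholder values):

	lru := store.NewLRUStore("peer_server", store.NewDiskStore("/path/to/cache", 2), 1000)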
@@ -17,7 +17,7 @@ const cacheMaxBlobs = 3
 func getTestLRUStore() (*LRUStore, *DiskStore) {
 	d := NewDiskStore("/", 2)
 	d.fs = afero.NewMemMapFs()
-	return NewLRUStore(d, 3), d
+	return NewLRUStore("test", d, 3), d
 }
 
 func countOnDisk(t *testing.T, disk *DiskStore) int {
@@ -134,7 +134,7 @@ func TestLRUStore_loadExisting(t *testing.T) {
 	require.Equal(t, 1, len(existing), "blob should exist in cache")
 	assert.Equal(t, hash, existing[0])
 
-	lru := NewLRUStore(d, 3) // lru should load existing blobs when it's created
+	lru := NewLRUStore("test", d, 3) // lru should load existing blobs when it's created
 	has, err := lru.Has(hash)
 	require.NoError(t, err)
 	assert.True(t, has, "hash should be loaded from disk store but it's not")
store/singleflight.go (new file, 67 lines)
@@ -0,0 +1,67 @@
+package store
+
+import (
+	"time"
+
+	"github.com/lbryio/reflector.go/internal/metrics"
+
+	"github.com/lbryio/lbry.go/v2/stream"
+
+	"golang.org/x/sync/singleflight"
+)
+
+func WithSingleFlight(component string, origin BlobStore) BlobStore {
+	return &singleflightStore{
+		BlobStore: origin,
+		component: component,
+		sf:        new(singleflight.Group),
+	}
+}
+
+type singleflightStore struct {
+	BlobStore
+
+	component string
+	sf        *singleflight.Group
+}
+
+func (s *singleflightStore) Name() string {
+	return "sf_" + s.BlobStore.Name()
+}
+
+// Get ensures that only one request per hash is sent to the origin at a time,
+// thereby protecting against https://en.wikipedia.org/wiki/Thundering_herd_problem
+func (s *singleflightStore) Get(hash string) (stream.Blob, error) {
+	metrics.CacheWaitingRequestsCount.With(metrics.CacheLabels(s.BlobStore.Name(), s.component)).Inc()
+	defer metrics.CacheWaitingRequestsCount.With(metrics.CacheLabels(s.BlobStore.Name(), s.component)).Dec()
+
+	blob, err, _ := s.sf.Do(hash, s.getter(hash))
+	if err != nil {
+		return nil, err
+	}
+	return blob.(stream.Blob), nil
+}
+
+// getter returns a function that gets a blob from the origin
+// only one getter per hash will be executing at a time
+func (s *singleflightStore) getter(hash string) func() (interface{}, error) {
+	return func() (interface{}, error) {
+		metrics.CacheOriginRequestsCount.With(metrics.CacheLabels(s.BlobStore.Name(), s.component)).Inc()
+		defer metrics.CacheOriginRequestsCount.With(metrics.CacheLabels(s.BlobStore.Name(), s.component)).Dec()
+
+		start := time.Now()
+		blob, err := s.BlobStore.Get(hash)
+		if err != nil {
+			return nil, err
+		}
+
+		rate := float64(len(blob)) / 1024 / 1024 / time.Since(start).Seconds()
+		metrics.CacheRetrievalSpeed.With(map[string]string{
+			metrics.LabelCacheType: s.BlobStore.Name(),
+			metrics.LabelComponent: s.component,
+			metrics.LabelSource:    "origin",
+		}).Set(rate)
+
+		return blob, nil
+	}
+}
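Because singleflightStore embeds the wrapped BlobStore and only overrides Name and Get, any origin can be wrapped directly, not just the one inside CachingStore. A hedged sketch of the deduplication it provides (mirroring the ThunderingHerd test; addr, hash and the goroutine count are placeholders):

	origin := store.WithSingleFlight("getstream", peer.NewStore(peer.StoreOpts{Address: addr}))

	// Ten concurrent Gets for the same hash trigger a single origin request;
	// the other nine wait on the in-flight call and share its result (or its error).
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_, _ = origin.Get(hash)
		}()
	}
	wg.Wait()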