add lru cache eviction metric
This commit is contained in:
parent
c9fa04043c
commit
3608971f0b
18 changed files with 143 additions and 85 deletions
|
@ -159,7 +159,7 @@ func wrapWithCache(s store.BlobStore) store.BlobStore {
|
|||
|
||||
if reflectorCmdMemCache > 0 {
|
||||
wrapped = store.NewCachingStore(wrapped,
|
||||
store.NewLRUStore(store.NewMemoryStore(), reflectorCmdMemCache))
|
||||
store.NewLRUStore(store.NewMemStore(), reflectorCmdMemCache))
|
||||
}
|
||||
|
||||
return wrapped
|
||||
|
|
|
@ -29,7 +29,7 @@ func init() {
|
|||
func testCmd(cmd *cobra.Command, args []string) {
|
||||
log.Printf("reflector %s", meta.VersionString())
|
||||
|
||||
memStore := store.NewMemoryStore()
|
||||
memStore := store.NewMemStore()
|
||||
|
||||
reflectorServer := reflector.NewServer(memStore)
|
||||
reflectorServer.Timeout = 3 * time.Minute
|
||||
|
|
|
@ -57,7 +57,8 @@ func (s *Server) Shutdown() {
|
|||
}
|
||||
|
||||
const (
|
||||
ns = "reflector"
|
||||
ns = "reflector"
|
||||
subsystemCache = "cache"
|
||||
|
||||
labelDirection = "direction"
|
||||
labelErrorType = "error_type"
|
||||
|
@ -65,7 +66,8 @@ const (
|
|||
DirectionUpload = "upload" // to reflector
|
||||
DirectionDownload = "download" // from reflector
|
||||
|
||||
MtrLabelSource = "source"
|
||||
LabelCacheType = "cache_type"
|
||||
LabelSource = "source"
|
||||
|
||||
errConnReset = "conn_reset"
|
||||
errReadConnReset = "read_conn_reset"
|
||||
|
@ -116,25 +118,35 @@ var (
|
|||
|
||||
CacheHitCount = promauto.NewCounter(prometheus.CounterOpts{
|
||||
Namespace: ns,
|
||||
Name: "cache_hit_total",
|
||||
Subsystem: subsystemCache,
|
||||
Name: "hit_total",
|
||||
Help: "Total number of blobs retrieved from the cache storage",
|
||||
})
|
||||
CacheMissCount = promauto.NewCounter(prometheus.CounterOpts{
|
||||
Namespace: ns,
|
||||
Name: "cache_miss_total",
|
||||
Subsystem: subsystemCache,
|
||||
Name: "miss_total",
|
||||
Help: "Total number of blobs retrieved from origin rather than cache storage",
|
||||
})
|
||||
CacheOriginRequestsCount = promauto.NewGauge(prometheus.GaugeOpts{
|
||||
Namespace: ns,
|
||||
Name: "cache_origin_requests_total",
|
||||
Subsystem: subsystemCache,
|
||||
Name: "origin_requests_total",
|
||||
Help: "How many Get requests are in flight from the cache to the origin",
|
||||
})
|
||||
// during thundering-herd situations, the metric below should be a lot smaller than the metric above
|
||||
CacheWaitingRequestsCount = promauto.NewGauge(prometheus.GaugeOpts{
|
||||
Namespace: ns,
|
||||
Name: "cache_waiting_requests_total",
|
||||
Subsystem: subsystemCache,
|
||||
Name: "waiting_requests_total",
|
||||
Help: "How many cache requests are waiting for an in-flight origin request",
|
||||
})
|
||||
CacheLRUEvictCount = promauto.NewCounterVec(prometheus.CounterOpts{
|
||||
Namespace: ns,
|
||||
Subsystem: subsystemCache,
|
||||
Name: "evict_total",
|
||||
Help: "Count of blobs evicted from cache",
|
||||
}, []string{LabelCacheType})
|
||||
|
||||
BlobUploadCount = promauto.NewCounter(prometheus.CounterOpts{
|
||||
Namespace: ns,
|
||||
|
@ -151,7 +163,7 @@ var (
|
|||
Namespace: ns,
|
||||
Name: "speed_mbps",
|
||||
Help: "Speed of blob retrieval",
|
||||
}, []string{MtrLabelSource})
|
||||
}, []string{LabelSource})
|
||||
|
||||
MtrInBytesTcp = promauto.NewCounter(prometheus.CounterOpts{
|
||||
Namespace: ns,
|
||||
|
|
|
@ -55,6 +55,8 @@ func (p *Store) getClient() (*Client, error) {
|
|||
return c, errors.Prefix("connection error", err)
|
||||
}
|
||||
|
||||
func (p *Store) Name() string { return "http3" }
|
||||
|
||||
// Has asks the peer if they have a hash
|
||||
func (p *Store) Has(hash string) (bool, error) {
|
||||
c, err := p.getClient()
|
||||
|
|
|
@ -34,7 +34,7 @@ var availabilityRequests = []pair{
|
|||
}
|
||||
|
||||
func getServer(t *testing.T, withBlobs bool) *Server {
|
||||
st := store.NewMemoryStore()
|
||||
st := store.NewMemStore()
|
||||
if withBlobs {
|
||||
for k, v := range blobs {
|
||||
err := st.Put(k, v)
|
||||
|
|
|
@ -30,6 +30,8 @@ func (p *Store) getClient() (*Client, error) {
|
|||
return c, errors.Prefix("connection error", err)
|
||||
}
|
||||
|
||||
func (p *Store) Name() string { return "peer" }
|
||||
|
||||
// Has asks the peer if they have a hash
|
||||
func (p *Store) Has(hash string) (bool, error) {
|
||||
c, err := p.getClient()
|
||||
|
|
|
@ -22,7 +22,7 @@ func startServerOnRandomPort(t *testing.T) (*Server, int) {
|
|||
t.Fatal(err)
|
||||
}
|
||||
|
||||
srv := NewServer(store.NewMemoryStore())
|
||||
srv := NewServer(store.NewMemStore())
|
||||
err = srv.Start("127.0.0.1:" + strconv.Itoa(port))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
|
@ -119,7 +119,7 @@ func TestServer_Timeout(t *testing.T) {
|
|||
t.Fatal(err)
|
||||
}
|
||||
|
||||
srv := NewServer(store.NewMemoryStore())
|
||||
srv := NewServer(store.NewMemStore())
|
||||
srv.Timeout = testTimeout
|
||||
err = srv.Start("127.0.0.1:" + strconv.Itoa(port))
|
||||
if err != nil {
|
||||
|
@ -161,7 +161,7 @@ func TestServer_Timeout(t *testing.T) {
|
|||
//}
|
||||
|
||||
type mockPartialStore struct {
|
||||
*store.MemoryStore
|
||||
*store.MemStore
|
||||
missing []string
|
||||
}
|
||||
|
||||
|
@ -181,7 +181,7 @@ func TestServer_PartialUpload(t *testing.T) {
|
|||
missing[i] = bits.Rand().String()
|
||||
}
|
||||
|
||||
st := store.BlobStore(&mockPartialStore{MemoryStore: store.NewMemoryStore(), missing: missing})
|
||||
st := store.BlobStore(&mockPartialStore{MemStore: store.NewMemStore(), missing: missing})
|
||||
if _, ok := st.(neededBlobChecker); !ok {
|
||||
t.Fatal("mock does not implement the relevant interface")
|
||||
}
|
||||
|
|
|
@ -25,6 +25,11 @@ func NewCachingStore(origin, cache BlobStore) *CachingStore {
|
|||
return &CachingStore{origin: origin, cache: cache, sf: new(singleflight.Group)}
|
||||
}
|
||||
|
||||
const nameCaching = "caching"
|
||||
|
||||
// Name is the cache type name
|
||||
func (c *CachingStore) Name() string { return nameCaching }
|
||||
|
||||
// Has checks the cache and then the origin for a hash. It returns true if either store has it.
|
||||
func (c *CachingStore) Has(hash string) (bool, error) {
|
||||
has, err := c.cache.Has(hash)
|
||||
|
@ -42,7 +47,7 @@ func (c *CachingStore) Get(hash string) (stream.Blob, error) {
|
|||
if err == nil || !errors.Is(err, ErrBlobNotFound) {
|
||||
metrics.CacheHitCount.Inc()
|
||||
rate := float64(len(blob)) / 1024 / 1024 / time.Since(start).Seconds()
|
||||
metrics.RetrieverSpeed.With(map[string]string{metrics.MtrLabelSource: "cache"}).Set(rate)
|
||||
metrics.RetrieverSpeed.With(map[string]string{metrics.LabelSource: "cache"}).Set(rate)
|
||||
return blob, err
|
||||
}
|
||||
|
||||
|
@ -66,7 +71,7 @@ func (c *CachingStore) getFromOrigin(hash string) (stream.Blob, error) {
|
|||
}
|
||||
|
||||
rate := float64(len(blob)) / 1024 / 1024 / time.Since(start).Seconds()
|
||||
metrics.RetrieverSpeed.With(map[string]string{metrics.MtrLabelSource: "origin"}).Set(rate)
|
||||
metrics.RetrieverSpeed.With(map[string]string{metrics.LabelSource: "origin"}).Set(rate)
|
||||
|
||||
err = c.cache.Put(hash, blob)
|
||||
return blob, err
|
||||
|
|
|
@ -9,9 +9,9 @@ import (
|
|||
"github.com/lbryio/lbry.go/v2/stream"
|
||||
)
|
||||
|
||||
func TestCachingBlobStore_Put(t *testing.T) {
|
||||
origin := NewMemoryStore()
|
||||
cache := NewMemoryStore()
|
||||
func TestCachingStore_Put(t *testing.T) {
|
||||
origin := NewMemStore()
|
||||
cache := NewMemStore()
|
||||
s := NewCachingStore(origin, cache)
|
||||
|
||||
b := []byte("this is a blob of stuff")
|
||||
|
@ -39,9 +39,9 @@ func TestCachingBlobStore_Put(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestCachingBlobStore_CacheMiss(t *testing.T) {
|
||||
origin := NewMemoryStore()
|
||||
cache := NewMemoryStore()
|
||||
func TestCachingStore_CacheMiss(t *testing.T) {
|
||||
origin := NewMemStore()
|
||||
cache := NewMemStore()
|
||||
s := NewCachingStore(origin, cache)
|
||||
|
||||
b := []byte("this is a blob of stuff")
|
||||
|
@ -76,10 +76,10 @@ func TestCachingBlobStore_CacheMiss(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestCachingBlobStore_ThunderingHerd(t *testing.T) {
|
||||
func TestCachingStore_ThunderingHerd(t *testing.T) {
|
||||
storeDelay := 100 * time.Millisecond
|
||||
origin := NewSlowBlobStore(storeDelay)
|
||||
cache := NewMemoryStore()
|
||||
cache := NewMemStore()
|
||||
s := NewCachingStore(origin, cache)
|
||||
|
||||
b := []byte("this is a blob of stuff")
|
||||
|
@ -129,16 +129,19 @@ func TestCachingBlobStore_ThunderingHerd(t *testing.T) {
|
|||
|
||||
// SlowBlobStore adds a delay to each request
|
||||
type SlowBlobStore struct {
|
||||
mem *MemoryStore
|
||||
mem *MemStore
|
||||
delay time.Duration
|
||||
}
|
||||
|
||||
func NewSlowBlobStore(delay time.Duration) *SlowBlobStore {
|
||||
return &SlowBlobStore{
|
||||
mem: NewMemoryStore(),
|
||||
mem: NewMemStore(),
|
||||
delay: delay,
|
||||
}
|
||||
}
|
||||
func (s *SlowBlobStore) Name() string {
|
||||
return "slow"
|
||||
}
|
||||
|
||||
func (s *SlowBlobStore) Has(hash string) (bool, error) {
|
||||
time.Sleep(s.delay)
|
||||
|
|
|
@ -34,6 +34,11 @@ func NewCloudFrontStore(s3 *S3Store, cfEndpoint string) *CloudFrontStore {
|
|||
}
|
||||
}
|
||||
|
||||
const nameCloudFront = "cloudfront"
|
||||
|
||||
// Name is the cache type name
|
||||
func (c *CloudFrontStore) Name() string { return nameCloudFront }
|
||||
|
||||
// Has checks if the hash is in the store.
|
||||
func (c *CloudFrontStore) Has(hash string) (bool, error) {
|
||||
status, body, err := c.cfRequest(http.MethodHead, hash)
|
||||
|
|
|
@ -25,6 +25,11 @@ func NewDBBackedStore(blobs BlobStore, db *db.SQL) *DBBackedStore {
|
|||
return &DBBackedStore{blobs: blobs, db: db}
|
||||
}
|
||||
|
||||
const nameDBBacked = "db-backed"
|
||||
|
||||
// Name is the cache type name
|
||||
func (d *DBBackedStore) Name() string { return nameDBBacked }
|
||||
|
||||
// Has returns true if the blob is in the store
|
||||
func (d *DBBackedStore) Has(hash string) (bool, error) {
|
||||
return d.db.HasBlob(hash)
|
||||
|
|
|
@ -35,34 +35,10 @@ func NewDiskStore(dir string, prefixLength int) *DiskStore {
|
|||
}
|
||||
}
|
||||
|
||||
func (d *DiskStore) dir(hash string) string {
|
||||
if d.prefixLength <= 0 || len(hash) < d.prefixLength {
|
||||
return d.blobDir
|
||||
}
|
||||
return path.Join(d.blobDir, hash[:d.prefixLength])
|
||||
}
|
||||
const nameDisk = "disk"
|
||||
|
||||
func (d *DiskStore) path(hash string) string {
|
||||
return path.Join(d.dir(hash), hash)
|
||||
}
|
||||
|
||||
func (d *DiskStore) ensureDirExists(dir string) error {
|
||||
return errors.Err(d.fs.MkdirAll(dir, 0755))
|
||||
}
|
||||
|
||||
func (d *DiskStore) initOnce() error {
|
||||
if d.initialized {
|
||||
return nil
|
||||
}
|
||||
|
||||
err := d.ensureDirExists(d.blobDir)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
d.initialized = true
|
||||
return nil
|
||||
}
|
||||
// Name is the cache type name
|
||||
func (d *DiskStore) Name() string { return nameDisk }
|
||||
|
||||
// Has returns T/F or Error if it the blob stored already. It will error with any IO disk error.
|
||||
func (d *DiskStore) Has(hash string) (bool, error) {
|
||||
|
@ -166,3 +142,32 @@ func (d *DiskStore) list() ([]string, error) {
|
|||
|
||||
return existing, nil
|
||||
}
|
||||
|
||||
func (d *DiskStore) dir(hash string) string {
|
||||
if d.prefixLength <= 0 || len(hash) < d.prefixLength {
|
||||
return d.blobDir
|
||||
}
|
||||
return path.Join(d.blobDir, hash[:d.prefixLength])
|
||||
}
|
||||
|
||||
func (d *DiskStore) path(hash string) string {
|
||||
return path.Join(d.dir(hash), hash)
|
||||
}
|
||||
|
||||
func (d *DiskStore) ensureDirExists(dir string) error {
|
||||
return errors.Err(d.fs.MkdirAll(dir, 0755))
|
||||
}
|
||||
|
||||
func (d *DiskStore) initOnce() error {
|
||||
if d.initialized {
|
||||
return nil
|
||||
}
|
||||
|
||||
err := d.ensureDirExists(d.blobDir)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
d.initialized = true
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -3,6 +3,7 @@ package store
|
|||
import (
|
||||
"github.com/lbryio/lbry.go/v2/extras/errors"
|
||||
"github.com/lbryio/lbry.go/v2/stream"
|
||||
"github.com/lbryio/reflector.go/internal/metrics"
|
||||
|
||||
golru "github.com/hashicorp/golang-lru"
|
||||
)
|
||||
|
@ -18,6 +19,7 @@ type LRUStore struct {
|
|||
// NewLRUStore initialize a new LRUStore
|
||||
func NewLRUStore(store BlobStore, maxItems int) *LRUStore {
|
||||
lru, err := golru.NewWithEvict(maxItems, func(key interface{}, value interface{}) {
|
||||
metrics.CacheLRUEvictCount.WithLabelValues(store.Name()).Inc()
|
||||
_ = store.Delete(key.(string)) // TODO: log this error. may happen if underlying entry is gone but cache entry still there
|
||||
})
|
||||
if err != nil {
|
||||
|
@ -39,6 +41,11 @@ func NewLRUStore(store BlobStore, maxItems int) *LRUStore {
|
|||
return l
|
||||
}
|
||||
|
||||
const nameLRU = "lru"
|
||||
|
||||
// Name is the cache type name
|
||||
func (l *LRUStore) Name() string { return nameLRU }
|
||||
|
||||
// Has returns whether the blob is in the store, without updating the recent-ness.
|
||||
func (l *LRUStore) Has(hash string) (bool, error) {
|
||||
return l.lru.Contains(hash), nil
|
||||
|
|
|
@ -14,7 +14,7 @@ import (
|
|||
|
||||
const cacheMaxBlobs = 3
|
||||
|
||||
func testLRUStore() (*LRUStore, *DiskStore) {
|
||||
func getTestLRUStore() (*LRUStore, *DiskStore) {
|
||||
d := NewDiskStore("/", 2)
|
||||
d.fs = afero.NewMemMapFs()
|
||||
return NewLRUStore(d, 3), d
|
||||
|
@ -42,7 +42,7 @@ func countOnDisk(t *testing.T, disk *DiskStore) int {
|
|||
}
|
||||
|
||||
func TestLRUStore_Eviction(t *testing.T) {
|
||||
lru, disk := testLRUStore()
|
||||
lru, disk := getTestLRUStore()
|
||||
b := []byte("x")
|
||||
err := lru.Put("one", b)
|
||||
require.NoError(t, err)
|
||||
|
@ -98,7 +98,7 @@ func TestLRUStore_Eviction(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestLRUStore_UnderlyingBlobMissing(t *testing.T) {
|
||||
lru, disk := testLRUStore()
|
||||
lru, disk := getTestLRUStore()
|
||||
hash := "hash"
|
||||
b := []byte("this is a blob of stuff")
|
||||
err := lru.Put(hash, b)
|
||||
|
|
|
@ -5,25 +5,30 @@ import (
|
|||
"github.com/lbryio/lbry.go/v2/stream"
|
||||
)
|
||||
|
||||
// MemoryStore is an in memory only blob store with no persistence.
|
||||
type MemoryStore struct {
|
||||
// MemStore is an in memory only blob store with no persistence.
|
||||
type MemStore struct {
|
||||
blobs map[string]stream.Blob
|
||||
}
|
||||
|
||||
func NewMemoryStore() *MemoryStore {
|
||||
return &MemoryStore{
|
||||
func NewMemStore() *MemStore {
|
||||
return &MemStore{
|
||||
blobs: make(map[string]stream.Blob),
|
||||
}
|
||||
}
|
||||
|
||||
const nameMem = "mem"
|
||||
|
||||
// Name is the cache type name
|
||||
func (m *MemStore) Name() string { return nameMem }
|
||||
|
||||
// Has returns T/F if the blob is currently stored. It will never error.
|
||||
func (m *MemoryStore) Has(hash string) (bool, error) {
|
||||
func (m *MemStore) Has(hash string) (bool, error) {
|
||||
_, ok := m.blobs[hash]
|
||||
return ok, nil
|
||||
}
|
||||
|
||||
// Get returns the blob byte slice if present and errors if the blob is not found.
|
||||
func (m *MemoryStore) Get(hash string) (stream.Blob, error) {
|
||||
func (m *MemStore) Get(hash string) (stream.Blob, error) {
|
||||
blob, ok := m.blobs[hash]
|
||||
if !ok {
|
||||
return nil, errors.Err(ErrBlobNotFound)
|
||||
|
@ -32,23 +37,23 @@ func (m *MemoryStore) Get(hash string) (stream.Blob, error) {
|
|||
}
|
||||
|
||||
// Put stores the blob in memory
|
||||
func (m *MemoryStore) Put(hash string, blob stream.Blob) error {
|
||||
func (m *MemStore) Put(hash string, blob stream.Blob) error {
|
||||
m.blobs[hash] = blob
|
||||
return nil
|
||||
}
|
||||
|
||||
// PutSD stores the sd blob in memory
|
||||
func (m *MemoryStore) PutSD(hash string, blob stream.Blob) error {
|
||||
func (m *MemStore) PutSD(hash string, blob stream.Blob) error {
|
||||
return m.Put(hash, blob)
|
||||
}
|
||||
|
||||
// Delete deletes the blob from the store
|
||||
func (m *MemoryStore) Delete(hash string) error {
|
||||
func (m *MemStore) Delete(hash string) error {
|
||||
delete(m.blobs, hash)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Debug returns the blobs in memory. It's useful for testing and debugging.
|
||||
func (m *MemoryStore) Debug() map[string]stream.Blob {
|
||||
func (m *MemStore) Debug() map[string]stream.Blob {
|
||||
return m.blobs
|
||||
}
|
||||
|
|
|
@ -7,8 +7,8 @@ import (
|
|||
"github.com/lbryio/lbry.go/v2/extras/errors"
|
||||
)
|
||||
|
||||
func TestMemoryBlobStore_Put(t *testing.T) {
|
||||
s := NewMemoryStore()
|
||||
func TestMemStore_Put(t *testing.T) {
|
||||
s := NewMemStore()
|
||||
blob := []byte("abcdefg")
|
||||
err := s.Put("abc", blob)
|
||||
if err != nil {
|
||||
|
@ -16,8 +16,8 @@ func TestMemoryBlobStore_Put(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestMemoryBlobStore_Get(t *testing.T) {
|
||||
s := NewMemoryStore()
|
||||
func TestMemStore_Get(t *testing.T) {
|
||||
s := NewMemStore()
|
||||
hash := "abc"
|
||||
blob := []byte("abcdefg")
|
||||
err := s.Put(hash, blob)
|
||||
|
|
35
store/s3.go
35
store/s3.go
|
@ -38,22 +38,10 @@ func NewS3Store(awsID, awsSecret, region, bucket string) *S3Store {
|
|||
}
|
||||
}
|
||||
|
||||
func (s *S3Store) initOnce() error {
|
||||
if s.session != nil {
|
||||
return nil
|
||||
}
|
||||
const nameS3 = "s3"
|
||||
|
||||
sess, err := session.NewSession(&aws.Config{
|
||||
Credentials: credentials.NewStaticCredentials(s.awsID, s.awsSecret, ""),
|
||||
Region: aws.String(s.region),
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
s.session = sess
|
||||
return nil
|
||||
}
|
||||
// Name is the cache type name
|
||||
func (s *S3Store) Name() string { return nameS3 }
|
||||
|
||||
// Has returns T/F or Error ( from S3 ) if the store contains the blob.
|
||||
func (s *S3Store) Has(hash string) (bool, error) {
|
||||
|
@ -153,3 +141,20 @@ func (s *S3Store) Delete(hash string) error {
|
|||
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *S3Store) initOnce() error {
|
||||
if s.session != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
sess, err := session.NewSession(&aws.Config{
|
||||
Credentials: credentials.NewStaticCredentials(s.awsID, s.awsSecret, ""),
|
||||
Region: aws.String(s.region),
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
s.session = sess
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -7,6 +7,8 @@ import (
|
|||
|
||||
// BlobStore is an interface for handling blob storage.
|
||||
type BlobStore interface {
|
||||
// Name of blob store (useful for metrics)
|
||||
Name() string
|
||||
// Does blob exist in the store
|
||||
Has(hash string) (bool, error)
|
||||
// Get the blob from the store
|
||||
|
|
Loading…
Reference in a new issue