Ittt #52

Merged
anbsky merged 62 commits from ittt into master 2021-07-24 02:35:22 +02:00
7 changed files with 107 additions and 366 deletions
Showing only changes of commit 63a574ec2f - Show all commits

View file

@ -56,13 +56,14 @@ var (
secondaryDiskCache string secondaryDiskCache string
memCache int memCache int
) )
var cacheManagers = []string{"localdb", "lfuda", "lru"} var cacheManagers = []string{"localdb", "lfu", "arc", "lru", "simple"}
const ( var cacheMangerToGcache = map[string]store.EvictionStrategy{
LOCALDB int = iota "lfu": store.LFU,
LFUDA "arc": store.ARC,
LRU "lru": store.LRU,
) "simple": store.SIMPLE,
}
func init() { func init() {
var cmd = &cobra.Command{ var cmd = &cobra.Command{
@ -89,8 +90,8 @@ func init() {
cmd.Flags().StringVar(&originEndpoint, "origin-endpoint", "", "HTTP edge endpoint for standard HTTP retrieval") cmd.Flags().StringVar(&originEndpoint, "origin-endpoint", "", "HTTP edge endpoint for standard HTTP retrieval")
cmd.Flags().StringVar(&originEndpointFallback, "origin-endpoint-fallback", "", "HTTP edge endpoint for standard HTTP retrieval if first origin fails") cmd.Flags().StringVar(&originEndpointFallback, "origin-endpoint-fallback", "", "HTTP edge endpoint for standard HTTP retrieval if first origin fails")
cmd.Flags().StringVar(&diskCache, "disk-cache", "100GB:/tmp/downloaded_blobs:localdb", "Where to cache blobs on the file system. format is 'sizeGB:CACHE_PATH:cachemanager' (cachemanagers: localdb/lfuda/lru)") cmd.Flags().StringVar(&diskCache, "disk-cache", "100GB:/tmp/downloaded_blobs:localdb", "Where to cache blobs on the file system. format is 'sizeGB:CACHE_PATH:cachemanager' (cachemanagers: localdb/lfu/arc/lru)")
cmd.Flags().StringVar(&secondaryDiskCache, "optional-disk-cache", "", "Optional secondary file system cache for blobs. format is 'sizeGB:CACHE_PATH:cachemanager' (cachemanagers: localdb/lfuda/lru) (this would get hit before the one specified in disk-cache)") cmd.Flags().StringVar(&secondaryDiskCache, "optional-disk-cache", "", "Optional secondary file system cache for blobs. format is 'sizeGB:CACHE_PATH:cachemanager' (cachemanagers: localdb/lfu/arc/lru) (this would get hit before the one specified in disk-cache)")
cmd.Flags().IntVar(&memCache, "mem-cache", 0, "enable in-memory cache with a max size of this many blobs") cmd.Flags().IntVar(&memCache, "mem-cache", 0, "enable in-memory cache with a max size of this many blobs")
rootCmd.AddCommand(cmd) rootCmd.AddCommand(cmd)
@ -228,7 +229,7 @@ func initCaches(s store.BlobStore) (store.BlobStore, *stop.Group) {
finalStore = store.NewCachingStore( finalStore = store.NewCachingStore(
"reflector", "reflector",
finalStore, finalStore,
store.NewLRUStore("mem", store.NewMemStore(), memCache), store.NewGcacheStore("mem", store.NewMemStore(), memCache, store.LRU),
) )
} }
return finalStore, stopper return finalStore, stopper
@ -251,8 +252,7 @@ func initDiskStore(upstreamStore store.BlobStore, diskParams string, stopper *st
var unwrappedStore store.BlobStore var unwrappedStore store.BlobStore
cleanerStopper := stop.New(stopper) cleanerStopper := stop.New(stopper)
switch cacheManager { if cacheManager == "localdb" {
case cacheManagers[LOCALDB]:
localDb := &db.SQL{ localDb := &db.SQL{
SoftDelete: true, SoftDelete: true,
TrackAccess: db.TrackAccessBlobs, TrackAccess: db.TrackAccessBlobs,
@ -264,11 +264,10 @@ func initDiskStore(upstreamStore store.BlobStore, diskParams string, stopper *st
} }
unwrappedStore = store.NewDBBackedStore(diskStore, localDb, true) unwrappedStore = store.NewDBBackedStore(diskStore, localDb, true)
go cleanOldestBlobs(int(realCacheSize), localDb, unwrappedStore, cleanerStopper) go cleanOldestBlobs(int(realCacheSize), localDb, unwrappedStore, cleanerStopper)
case cacheManagers[LFUDA]: } else {
unwrappedStore = store.NewLFUDAStore("nvme", store.NewDiskStore(diskCachePath, 2), realCacheSize) unwrappedStore = store.NewGcacheStore("nvme", store.NewDiskStore(diskCachePath, 2), int(realCacheSize), cacheMangerToGcache[cacheManager])
case cacheManagers[LRU]:
unwrappedStore = store.NewLRUStore("nvme", store.NewDiskStore(diskCachePath, 2), int(realCacheSize))
} }
wrapped := store.NewCachingStore( wrapped := store.NewCachingStore(
"reflector", "reflector",
upstreamStore, upstreamStore,

3
go.mod
View file

@ -2,13 +2,10 @@ module github.com/lbryio/reflector.go
replace github.com/btcsuite/btcd => github.com/lbryio/lbrycrd.go v0.0.0-20200203050410-e1076f12bf19 replace github.com/btcsuite/btcd => github.com/lbryio/lbrycrd.go v0.0.0-20200203050410-e1076f12bf19
//replace github.com/lbryio/lbry.go/v2 => ../lbry.go
require ( require (
github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878 // indirect github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878 // indirect
github.com/aws/aws-sdk-go v1.27.0 github.com/aws/aws-sdk-go v1.27.0
github.com/bluele/gcache v0.0.2 github.com/bluele/gcache v0.0.2
github.com/bparli/lfuda-go v0.3.1
github.com/brk0v/directio v0.0.0-20190225130936-69406e757cf7 github.com/brk0v/directio v0.0.0-20190225130936-69406e757cf7
github.com/btcsuite/btcd v0.0.0-20190824003749-130ea5bddde3 github.com/btcsuite/btcd v0.0.0-20190824003749-130ea5bddde3
github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d

2
go.sum
View file

@ -59,8 +59,6 @@ github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kB
github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84= github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84=
github.com/bluele/gcache v0.0.2 h1:WcbfdXICg7G/DGBh1PFfcirkWOQV+v077yF1pSy3DGw= github.com/bluele/gcache v0.0.2 h1:WcbfdXICg7G/DGBh1PFfcirkWOQV+v077yF1pSy3DGw=
github.com/bluele/gcache v0.0.2/go.mod h1:m15KV+ECjptwSPxKhOhQoAFQVtUFjTVkc3H8o0t/fp0= github.com/bluele/gcache v0.0.2/go.mod h1:m15KV+ECjptwSPxKhOhQoAFQVtUFjTVkc3H8o0t/fp0=
github.com/bparli/lfuda-go v0.3.1 h1:nO9Szo627RC8/z+R+MMPBItNwHCOonchmpjQuQi8jVY=
github.com/bparli/lfuda-go v0.3.1/go.mod h1:BR5a9lwlqRqnPhU3F5ojFK3VhTKg8iFVtJJKgZBQhAo=
github.com/bradfitz/go-smtpd v0.0.0-20170404230938-deb6d6237625/go.mod h1:HYsPBTaaSFSlLx/70C2HPIMNZpVV8+vt/A+FMnYP11g= github.com/bradfitz/go-smtpd v0.0.0-20170404230938-deb6d6237625/go.mod h1:HYsPBTaaSFSlLx/70C2HPIMNZpVV8+vt/A+FMnYP11g=
github.com/brk0v/directio v0.0.0-20190225130936-69406e757cf7 h1:7gNKWnX6OF+ERiXVw4I9RsHhZ52aumXdFE07nEx5v20= github.com/brk0v/directio v0.0.0-20190225130936-69406e757cf7 h1:7gNKWnX6OF+ERiXVw4I9RsHhZ52aumXdFE07nEx5v20=
github.com/brk0v/directio v0.0.0-20190225130936-69406e757cf7/go.mod h1:M/KA3XJG5PJaApPiv4gWNsgcSJquOQTqumZNLyYE0KM= github.com/brk0v/directio v0.0.0-20190225130936-69406e757cf7/go.mod h1:M/KA3XJG5PJaApPiv4gWNsgcSJquOQTqumZNLyYE0KM=

View file

@ -1,7 +1,5 @@
package store package store
//TODO: the caching strategy is actually not LFUDA, it should become a parameter and the name of the struct should be changed
import ( import (
"time" "time"
@ -11,27 +9,53 @@ import (
"github.com/lbryio/lbry.go/v2/extras/errors" "github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/stream" "github.com/lbryio/lbry.go/v2/stream"
"github.com/bparli/lfuda-go" "github.com/bluele/gcache"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
// LFUDAStore adds a max cache size and Greedy-Dual-Size-Frequency cache eviction strategy to a BlobStore // GcacheStore adds a max cache size and Greedy-Dual-Size-Frequency cache eviction strategy to a BlobStore
type LFUDAStore struct { type GcacheStore struct {
// underlying store // underlying store
store BlobStore store BlobStore
// lfuda implementation // cache implementation
lfuda *lfuda.Cache cache gcache.Cache
} }
type EvictionStrategy int
// NewLFUDAStore initialize a new LRUStore const (
func NewLFUDAStore(component string, store BlobStore, maxSize float64) *LFUDAStore { //LFU Discards the least frequently used items first.
lfuda := lfuda.NewGDSFWithEvict(maxSize, func(key interface{}, value interface{}) { LFU EvictionStrategy = iota
//ARC Constantly balances between LRU and LFU, to improve the combined result.
ARC
//LRU Discards the least recently used items first.
LRU
//SIMPLE has no clear priority for evict cache. It depends on key-value map order.
SIMPLE
)
// NewGcacheStore initialize a new LRUStore
func NewGcacheStore(component string, store BlobStore, maxSize int, strategy EvictionStrategy) *GcacheStore {
cacheBuilder := gcache.New(maxSize)
var cache gcache.Cache
evictFunc := func(key interface{}, value interface{}) {
logrus.Infof("evicting %s", key)
metrics.CacheLRUEvictCount.With(metrics.CacheLabels(store.Name(), component)).Inc() metrics.CacheLRUEvictCount.With(metrics.CacheLabels(store.Name(), component)).Inc()
_ = store.Delete(key.(string)) // TODO: log this error. may happen if underlying entry is gone but cache entry still there _ = store.Delete(key.(string)) // TODO: log this error. may happen if underlying entry is gone but cache entry still there
}) }
l := &LFUDAStore{ switch strategy {
case LFU:
cache = cacheBuilder.LFU().EvictedFunc(evictFunc).Build()
case ARC:
cache = cacheBuilder.ARC().EvictedFunc(evictFunc).Build()
case LRU:
cache = cacheBuilder.LRU().EvictedFunc(evictFunc).Build()
case SIMPLE:
cache = cacheBuilder.Simple().EvictedFunc(evictFunc).Build()
}
l := &GcacheStore{
store: store, store: store,
lfuda: lfuda, cache: cache,
} }
go func() { go func() {
if lstr, ok := store.(lister); ok { if lstr, ok := store.(lister); ok {
@ -45,34 +69,34 @@ func NewLFUDAStore(component string, store BlobStore, maxSize float64) *LFUDASto
return l return l
} }
const nameLFUDA = "lfuda" const nameGcache = "gcache"
// Name is the cache type name // Name is the cache type name
func (l *LFUDAStore) Name() string { return nameLFUDA } func (l *GcacheStore) Name() string { return nameGcache }
// Has returns whether the blob is in the store, without updating the recent-ness. // Has returns whether the blob is in the store, without updating the recent-ness.
func (l *LFUDAStore) Has(hash string) (bool, error) { func (l *GcacheStore) Has(hash string) (bool, error) {
return l.lfuda.Contains(hash), nil return l.cache.Has(hash), nil
} }
// Get returns the blob or an error if the blob doesn't exist. // Get returns the blob or an error if the blob doesn't exist.
func (l *LFUDAStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) { func (l *GcacheStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) {
start := time.Now() start := time.Now()
_, has := l.lfuda.Get(hash) _, err := l.cache.Get(hash)
if !has { if err != nil {
return nil, shared.NewBlobTrace(time.Since(start), l.Name()), errors.Err(ErrBlobNotFound) return nil, shared.NewBlobTrace(time.Since(start), l.Name()), errors.Err(ErrBlobNotFound)
} }
blob, stack, err := l.store.Get(hash) blob, stack, err := l.store.Get(hash)
if errors.Is(err, ErrBlobNotFound) { if errors.Is(err, ErrBlobNotFound) {
// Blob disappeared from underlying store // Blob disappeared from underlying store
l.lfuda.Remove(hash) l.cache.Remove(hash)
} }
return blob, stack.Stack(time.Since(start), l.Name()), err return blob, stack.Stack(time.Since(start), l.Name()), err
} }
// Put stores the blob. Following LFUDA rules it's not guaranteed that a SET will store the value!!! // Put stores the blob. Following LFUDA rules it's not guaranteed that a SET will store the value!!!
func (l *LFUDAStore) Put(hash string, blob stream.Blob) error { func (l *GcacheStore) Put(hash string, blob stream.Blob) error {
l.lfuda.Set(hash, true) l.cache.Set(hash, true)
has, _ := l.Has(hash) has, _ := l.Has(hash)
if has { if has {
err := l.store.Put(hash, blob) err := l.store.Put(hash, blob)
@ -84,8 +108,8 @@ func (l *LFUDAStore) Put(hash string, blob stream.Blob) error {
} }
// PutSD stores the sd blob. Following LFUDA rules it's not guaranteed that a SET will store the value!!! // PutSD stores the sd blob. Following LFUDA rules it's not guaranteed that a SET will store the value!!!
func (l *LFUDAStore) PutSD(hash string, blob stream.Blob) error { func (l *GcacheStore) PutSD(hash string, blob stream.Blob) error {
l.lfuda.Set(hash, true) l.cache.Set(hash, true)
has, _ := l.Has(hash) has, _ := l.Has(hash)
if has { if has {
err := l.store.PutSD(hash, blob) err := l.store.PutSD(hash, blob)
@ -97,7 +121,7 @@ func (l *LFUDAStore) PutSD(hash string, blob stream.Blob) error {
} }
// Delete deletes the blob from the store // Delete deletes the blob from the store
func (l *LFUDAStore) Delete(hash string) error { func (l *GcacheStore) Delete(hash string) error {
err := l.store.Delete(hash) err := l.store.Delete(hash)
if err != nil { if err != nil {
return err return err
@ -106,12 +130,12 @@ func (l *LFUDAStore) Delete(hash string) error {
// This must come after store.Delete() // This must come after store.Delete()
// Remove triggers onEvict function, which also tries to delete blob from store // Remove triggers onEvict function, which also tries to delete blob from store
// We need to delete it manually first so any errors can be propagated up // We need to delete it manually first so any errors can be propagated up
l.lfuda.Remove(hash) l.cache.Remove(hash)
return nil return nil
} }
// loadExisting imports existing blobs from the underlying store into the LRU cache // loadExisting imports existing blobs from the underlying store into the LRU cache
func (l *LFUDAStore) loadExisting(store lister, maxItems int) error { func (l *GcacheStore) loadExisting(store lister, maxItems int) error {
logrus.Infof("loading at most %d items", maxItems) logrus.Infof("loading at most %d items", maxItems)
existing, err := store.list() existing, err := store.list()
if err != nil { if err != nil {
@ -121,7 +145,7 @@ func (l *LFUDAStore) loadExisting(store lister, maxItems int) error {
added := 0 added := 0
for _, h := range existing { for _, h := range existing {
l.lfuda.Set(h, true) l.cache.Set(h, true)
added++ added++
if maxItems > 0 && added >= maxItems { // underlying cache is bigger than the cache if maxItems > 0 && added >= maxItems { // underlying cache is bigger than the cache
break break
@ -131,6 +155,6 @@ func (l *LFUDAStore) loadExisting(store lister, maxItems int) error {
} }
// Shutdown shuts down the store gracefully // Shutdown shuts down the store gracefully
func (l *LFUDAStore) Shutdown() { func (l *GcacheStore) Shutdown() {
return return
} }

View file

@ -1,6 +1,7 @@
package store package store
import ( import (
"fmt"
"io/ioutil" "io/ioutil"
"os" "os"
"reflect" "reflect"
@ -13,93 +14,80 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
const cacheMaxBlobs = 3 const cacheMaxSize = 3
func getTestLRUStore() (*LRUStore, *MemStore) { func getTestGcacheStore() (*GcacheStore, *MemStore) {
m := NewMemStore() m := NewMemStore()
return NewLRUStore("test", m, 3), m return NewGcacheStore("test", m, cacheMaxSize, LFU), m
} }
func TestLRUStore_Eviction(t *testing.T) { func TestGcacheStore_Eviction(t *testing.T) {
lru, mem := getTestLRUStore() lfu, mem := getTestGcacheStore()
b := []byte("x") b := []byte("x")
err := lru.Put("one", b) for i := 0; i < 3; i++ {
err := lfu.Put(fmt.Sprintf("%d", i), b)
require.NoError(t, err) require.NoError(t, err)
err = lru.Put("two", b) for j := 0; j < 3-i; j++ {
_, _, err = lfu.Get(fmt.Sprintf("%d", i))
require.NoError(t, err) require.NoError(t, err)
err = lru.Put("three", b) }
require.NoError(t, err) }
err = lru.Put("four", b)
require.NoError(t, err)
err = lru.Put("five", b)
require.NoError(t, err)
assert.Equal(t, cacheMaxBlobs, len(mem.Debug()))
for k, v := range map[string]bool{ for k, v := range map[string]bool{
"one": false, "0": true,
"two": false, "1": true,
"three": true, "2": true,
"four": true,
"five": true,
"six": false,
} { } {
has, err := lru.Has(k) has, err := lfu.Has(k)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, v, has) assert.Equal(t, v, has)
} }
err := lfu.Put("3", b)
lru.Get("three") // touch so it stays in cache require.NoError(t, err)
lru.Put("six", b)
assert.Equal(t, cacheMaxBlobs, len(mem.Debug()))
for k, v := range map[string]bool{ for k, v := range map[string]bool{
"one": false, "0": true,
"two": false, "1": true,
"three": true, "2": false,
"four": false, "3": true,
"five": true,
"six": true,
} { } {
has, err := lru.Has(k) has, err := lfu.Has(k)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, v, has) assert.Equal(t, v, has)
} }
assert.Equal(t, cacheMaxSize, len(mem.Debug()))
err = lru.Delete("three") err = lfu.Delete("0")
assert.NoError(t, err) assert.NoError(t, err)
err = lru.Delete("five") err = lfu.Delete("1")
assert.NoError(t, err) assert.NoError(t, err)
err = lru.Delete("six") err = lfu.Delete("3")
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, 0, len(mem.Debug())) assert.Equal(t, 0, len(mem.Debug()))
} }
func TestLRUStore_UnderlyingBlobMissing(t *testing.T) { func TestGcacheStore_UnderlyingBlobMissing(t *testing.T) {
lru, mem := getTestLRUStore() lfu, mem := getTestGcacheStore()
hash := "hash" hash := "hash"
b := []byte("this is a blob of stuff") b := []byte("this is a blob of stuff")
err := lru.Put(hash, b) err := lfu.Put(hash, b)
require.NoError(t, err) require.NoError(t, err)
err = mem.Delete(hash) err = mem.Delete(hash)
require.NoError(t, err) require.NoError(t, err)
// hash still exists in lru // hash still exists in lru
assert.True(t, lru.lru.Has(hash)) assert.True(t, lfu.cache.Has(hash))
blob, _, err := lru.Get(hash) blob, _, err := lfu.Get(hash)
assert.Nil(t, blob) assert.Nil(t, blob)
assert.True(t, errors.Is(err, ErrBlobNotFound), "expected (%s) %s, got (%s) %s", assert.True(t, errors.Is(err, ErrBlobNotFound), "expected (%s) %s, got (%s) %s",
reflect.TypeOf(ErrBlobNotFound).String(), ErrBlobNotFound.Error(), reflect.TypeOf(ErrBlobNotFound).String(), ErrBlobNotFound.Error(),
reflect.TypeOf(err).String(), err.Error()) reflect.TypeOf(err).String(), err.Error())
// lru.Get() removes hash if underlying store doesn't have it // lru.Get() removes hash if underlying store doesn't have it
assert.False(t, lru.lru.Has(hash)) assert.False(t, lfu.cache.Has(hash))
} }
func TestLRUStore_loadExisting(t *testing.T) { func TestGcacheStore_loadExisting(t *testing.T) {
tmpDir, err := ioutil.TempDir("", "reflector_test_*") tmpDir, err := ioutil.TempDir("", "reflector_test_*")
require.NoError(t, err) require.NoError(t, err)
defer os.RemoveAll(tmpDir) defer os.RemoveAll(tmpDir)
@ -115,9 +103,9 @@ func TestLRUStore_loadExisting(t *testing.T) {
require.Equal(t, 1, len(existing), "blob should exist in cache") require.Equal(t, 1, len(existing), "blob should exist in cache")
assert.Equal(t, hash, existing[0]) assert.Equal(t, hash, existing[0])
lru := NewLRUStore("test", d, 3) // lru should load existing blobs when it's created lfu := NewGcacheStore("test", d, 3, LFU) // lru should load existing blobs when it's created
time.Sleep(100 * time.Millisecond) // async load so let's wait... time.Sleep(100 * time.Millisecond) // async load so let's wait...
has, err := lru.Has(hash) has, err := lfu.Has(hash)
require.NoError(t, err) require.NoError(t, err)
assert.True(t, has, "hash should be loaded from disk store but it's not") assert.True(t, has, "hash should be loaded from disk store but it's not")
} }

View file

@ -1,136 +0,0 @@
package store
import (
"io/ioutil"
"os"
"reflect"
"testing"
"time"
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
const cacheMaxSize = 3
func getTestLFUDAStore() (*LFUDAStore, *MemStore) {
m := NewMemStore()
return NewLFUDAStore("test", m, cacheMaxSize), m
}
func TestFUDAStore_Eviction(t *testing.T) {
lfuda, mem := getTestLFUDAStore()
b := []byte("x")
err := lfuda.Put("one", b)
require.NoError(t, err)
err = lfuda.Put("two", b)
require.NoError(t, err)
err = lfuda.Put("three", b)
require.NoError(t, err)
err = lfuda.Put("four", b)
require.NoError(t, err)
err = lfuda.Put("five", b)
require.NoError(t, err)
err = lfuda.Put("five", b)
require.NoError(t, err)
err = lfuda.Put("four", b)
require.NoError(t, err)
err = lfuda.Put("two", b)
require.NoError(t, err)
_, _, err = lfuda.Get("five")
require.NoError(t, err)
_, _, err = lfuda.Get("four")
require.NoError(t, err)
_, _, err = lfuda.Get("two")
require.NoError(t, err)
assert.Equal(t, cacheMaxBlobs, len(mem.Debug()))
for k, v := range map[string]bool{
"one": false,
"two": true,
"three": false,
"four": true,
"five": true,
"six": false,
} {
has, err := lfuda.Has(k)
assert.NoError(t, err)
assert.Equal(t, v, has)
}
lfuda.Get("two") // touch so it stays in cache
lfuda.Get("five") // touch so it stays in cache
lfuda.Put("six", b)
assert.Equal(t, cacheMaxBlobs, len(mem.Debug()))
for k, v := range map[string]bool{
"one": false,
"two": true,
"three": false,
"four": false,
"five": true,
"six": true,
} {
has, err := lfuda.Has(k)
assert.NoError(t, err)
assert.Equal(t, v, has)
}
err = lfuda.Delete("six")
assert.NoError(t, err)
err = lfuda.Delete("five")
assert.NoError(t, err)
err = lfuda.Delete("two")
assert.NoError(t, err)
assert.Equal(t, 0, len(mem.Debug()))
}
func TestFUDAStore_UnderlyingBlobMissing(t *testing.T) {
lfuda, mem := getTestLFUDAStore()
hash := "hash"
b := []byte("this is a blob of stuff")
err := lfuda.Put(hash, b)
require.NoError(t, err)
err = mem.Delete(hash)
require.NoError(t, err)
// hash still exists in lru
assert.True(t, lfuda.lfuda.Contains(hash))
blob, _, err := lfuda.Get(hash)
assert.Nil(t, blob)
assert.True(t, errors.Is(err, ErrBlobNotFound), "expected (%s) %s, got (%s) %s",
reflect.TypeOf(ErrBlobNotFound).String(), ErrBlobNotFound.Error(),
reflect.TypeOf(err).String(), err.Error())
// lru.Get() removes hash if underlying store doesn't have it
assert.False(t, lfuda.lfuda.Contains(hash))
}
func TestFUDAStore_loadExisting(t *testing.T) {
tmpDir, err := ioutil.TempDir("", "reflector_test_*")
require.NoError(t, err)
defer os.RemoveAll(tmpDir)
d := NewDiskStore(tmpDir, 2)
hash := "hash"
b := []byte("this is a blob of stuff")
err = d.Put(hash, b)
require.NoError(t, err)
existing, err := d.list()
require.NoError(t, err)
require.Equal(t, 1, len(existing), "blob should exist in cache")
assert.Equal(t, hash, existing[0])
lfuda := NewLFUDAStore("test", d, 3) // lru should load existing blobs when it's created
time.Sleep(100 * time.Millisecond) // async load so let's wait...
has, err := lfuda.Has(hash)
require.NoError(t, err)
assert.True(t, has, "hash should be loaded from disk store but it's not")
}

View file

@ -1,129 +0,0 @@
package store
import (
"time"
"github.com/lbryio/reflector.go/internal/metrics"
"github.com/lbryio/reflector.go/shared"
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/stream"
"github.com/bluele/gcache"
"github.com/sirupsen/logrus"
)
// LRUStore adds a max cache size and LRU eviction to a BlobStore
type LRUStore struct {
// underlying store
store BlobStore
// lru implementation
lru gcache.Cache
}
// NewLRUStore initialize a new LRUStore
func NewLRUStore(component string, store BlobStore, maxItems int) *LRUStore {
l := &LRUStore{
store: store,
}
l.lru = gcache.New(maxItems).ARC().EvictedFunc(func(key, value interface{}) {
metrics.CacheLRUEvictCount.With(metrics.CacheLabels(l.Name(), component)).Inc()
_ = store.Delete(key.(string))
}).Build()
go func() {
if lstr, ok := store.(lister); ok {
err := l.loadExisting(lstr, maxItems)
if err != nil {
panic(err) // TODO: what should happen here? panic? return nil? just keep going?
}
}
}()
return l
}
// Name is the cache type name
func (l *LRUStore) Name() string {
return "lru_" + l.store.Name()
}
// Has returns whether the blob is in the store, without updating the recent-ness.
func (l *LRUStore) Has(hash string) (bool, error) {
return l.lru.Has(hash), nil
}
// Get returns the blob or an error if the blob doesn't exist.
func (l *LRUStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) {
start := time.Now()
_, err := l.lru.Get(hash)
if err != nil {
return nil, shared.NewBlobTrace(time.Since(start), l.Name()), errors.Err(ErrBlobNotFound)
}
blob, stack, err := l.store.Get(hash)
if errors.Is(err, ErrBlobNotFound) {
// Blob disappeared from underlying store
l.lru.Remove(hash)
}
return blob, stack.Stack(time.Since(start), l.Name()), err
}
// Put stores the blob
func (l *LRUStore) Put(hash string, blob stream.Blob) error {
err := l.store.Put(hash, blob)
if err != nil {
return err
}
l.lru.Set(hash, true)
return nil
}
// PutSD stores the sd blob
func (l *LRUStore) PutSD(hash string, blob stream.Blob) error {
err := l.store.PutSD(hash, blob)
if err != nil {
return err
}
_ = l.lru.Set(hash, true)
return nil
}
// Delete deletes the blob from the store
func (l *LRUStore) Delete(hash string) error {
err := l.store.Delete(hash)
if err != nil {
return err
}
// This must come after store.Delete()
// Remove triggers onEvict function, which also tries to delete blob from store
// We need to delete it manually first so any errors can be propagated up
l.lru.Remove(hash)
return nil
}
// loadExisting imports existing blobs from the underlying store into the LRU cache
func (l *LRUStore) loadExisting(store lister, maxItems int) error {
logrus.Infof("loading at most %d items", maxItems)
existing, err := store.list()
if err != nil {
return err
}
logrus.Infof("read %d files from disk", len(existing))
added := 0
for _, h := range existing {
l.lru.Set(h, true)
added++
if maxItems > 0 && added >= maxItems { // underlying cache is bigger than LRU cache
break
}
}
return nil
}
// Shutdown shuts down the store gracefully
func (l *LRUStore) Shutdown() {
return
}