diff --git a/go.mod b/go.mod index b98b2ad..5ab0eb5 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ replace github.com/btcsuite/btcd => github.com/lbryio/lbrycrd.go v0.0.0-20200203 require ( github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878 // indirect github.com/aws/aws-sdk-go v1.16.11 + github.com/bparli/lfuda-go v0.3.0 github.com/btcsuite/btcd v0.0.0-20190824003749-130ea5bddde3 github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d github.com/davecgh/go-spew v1.1.1 @@ -25,7 +26,7 @@ require ( github.com/lbryio/lbry.go v1.1.2 // indirect github.com/lbryio/lbry.go/v2 v2.6.1-0.20200901175808-73382bb02128 github.com/lbryio/types v0.0.0-20191228214437-05a22073b4ec - github.com/lucas-clemente/quic-go v0.19.1 + github.com/lucas-clemente/quic-go v0.19.2 github.com/phayes/freeport v0.0.0-20171002185219-e27662a4a9d6 github.com/prometheus/client_golang v0.9.2 github.com/sirupsen/logrus v1.4.2 diff --git a/go.sum b/go.sum index 9140bdb..6967521 100644 --- a/go.sum +++ b/go.sum @@ -25,6 +25,8 @@ github.com/aws/aws-sdk-go v1.16.11/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpi github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLMYoU8P317H5OQ+Via4RmuPwCS0= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= +github.com/bparli/lfuda-go v0.3.0 h1:b6qPjEb0BN006oQnj2nuGfz94yY3iYo0bmuFM079tQg= +github.com/bparli/lfuda-go v0.3.0/go.mod h1:BR5a9lwlqRqnPhU3F5ojFK3VhTKg8iFVtJJKgZBQhAo= github.com/bradfitz/go-smtpd v0.0.0-20170404230938-deb6d6237625/go.mod h1:HYsPBTaaSFSlLx/70C2HPIMNZpVV8+vt/A+FMnYP11g= github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f h1:bAs4lUbRJpnnkd9VhRV3jjAVU7DJVjMaK+IsvSeZvFo= github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f/go.mod h1:TdznJufoqS23FtqVCzL0ZqgP5MqXbb4fg/WgDys70nA= @@ -219,6 +221,8 @@ github.com/lbryio/types v0.0.0-20201019032447-f0b4476ef386/go.mod h1:CG3wsDv5BiV github.com/lib/pq v1.1.1/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lucas-clemente/quic-go v0.19.1 h1:J9TkQJGJVOR3UmGhd4zdVYwKSA0EoXbLRf15uQJ6gT4= github.com/lucas-clemente/quic-go v0.19.1/go.mod h1:ZUygOqIoai0ASXXLJ92LTnKdbqh9MHCLTX6Nr1jUrK0= +github.com/lucas-clemente/quic-go v0.19.2 h1:w8BBYUx5Z+kNpeaOeQW/KzcNsKWhh4O6PeQhb0nURPg= +github.com/lucas-clemente/quic-go v0.19.2/go.mod h1:ZUygOqIoai0ASXXLJ92LTnKdbqh9MHCLTX6Nr1jUrK0= github.com/lunixbochs/vtclean v1.0.0/go.mod h1:pHhQNgMf3btfWnGBVipUOjRYhoOsdGqdm/+2c2E2WMI= github.com/lusis/go-slackbot v0.0.0-20180109053408-401027ccfef5/go.mod h1:c2mYKRyMb1BPkO5St0c/ps62L4S0W2NAkaTXj9qEI+0= github.com/lusis/slack-test v0.0.0-20180109053238-3c758769bfa6/go.mod h1:sFlOUpQL1YcjhFVXhg1CG8ZASEs/Mf1oVb6H75JL/zg= diff --git a/store/caching_test.go b/store/caching_test.go index 34f928c..1168a92 100644 --- a/store/caching_test.go +++ b/store/caching_test.go @@ -58,6 +58,7 @@ func TestCachingStore_CacheMiss(t *testing.T) { if !bytes.Equal(b, res) { t.Errorf("expected Get() to return %s, got %s", string(b), string(res)) } + time.Sleep(10 * time.Millisecond) //storing to cache is done async so let's give it some time has, err := cache.Has(hash) if err != nil { diff --git a/store/lfuda.go b/store/lfuda.go new file mode 100644 index 0000000..9715954 --- /dev/null +++ b/store/lfuda.go @@ -0,0 +1,118 @@ +package store + +import ( + "github.com/bparli/lfuda-go" + "github.com/lbryio/lbry.go/v2/extras/errors" + "github.com/lbryio/lbry.go/v2/stream" + "github.com/lbryio/reflector.go/internal/metrics" +) + +// LRUStore adds a max cache size and LRU eviction to a BlobStore +type LFUDAStore struct { + // underlying store + store BlobStore + // lfuda implementation + lfuda *lfuda.Cache +} + +// NewLRUStore initialize a new LRUStore +func NewLFUDAStore(component string, store BlobStore, maxSize float64) *LFUDAStore { + lfuda := lfuda.NewGDSFWithEvict(maxSize, func(key interface{}, value interface{}) { + metrics.CacheLRUEvictCount.With(metrics.CacheLabels(store.Name(), component)).Inc() + _ = store.Delete(key.(string)) // TODO: log this error. may happen if underlying entry is gone but cache entry still there + }) + lfuda.Age() + l := &LFUDAStore{ + store: store, + lfuda: lfuda, + } + go func() { + if lstr, ok := store.(lister); ok { + err := l.loadExisting(lstr, int(maxSize/2000000.0)) + if err != nil { + panic(err) // TODO: what should happen here? panic? return nil? just keep going? + } + } + }() + + return l +} + +const nameLFUDA = "lfuda" + +var fakeTrue = []byte{'t'} + +// Name is the cache type name +func (l *LFUDAStore) Name() string { return nameLFUDA } + +// Has returns whether the blob is in the store, without updating the recent-ness. +func (l *LFUDAStore) Has(hash string) (bool, error) { + return l.lfuda.Contains(hash), nil +} + +// Get returns the blob or an error if the blob doesn't exist. +func (l *LFUDAStore) Get(hash string) (stream.Blob, error) { + _, has := l.lfuda.Get(hash) + if !has { + return nil, errors.Err(ErrBlobNotFound) + } + blob, err := l.store.Get(hash) + if errors.Is(err, ErrBlobNotFound) { + // Blob disappeared from underlying store + l.lfuda.Remove(hash) + } + return blob, err +} + +// Put stores the blob. Following LFUDA rules it's not guaranteed that a SET will store the value!!! +func (l *LFUDAStore) Put(hash string, blob stream.Blob) error { + err := l.store.Put(hash, blob) + if err != nil { + return err + } + l.lfuda.Set(hash, fakeTrue) + return nil +} + +// PutSD stores the sd blob +func (l *LFUDAStore) PutSD(hash string, blob stream.Blob) error { + err := l.store.PutSD(hash, blob) + if err != nil { + return err + } + + l.lfuda.Set(hash, fakeTrue) + return nil +} + +// Delete deletes the blob from the store +func (l *LFUDAStore) Delete(hash string) error { + err := l.store.Delete(hash) + if err != nil { + return err + } + + // This must come after store.Delete() + // Remove triggers onEvict function, which also tries to delete blob from store + // We need to delete it manually first so any errors can be propagated up + l.lfuda.Remove(hash) + return nil +} + +// loadExisting imports existing blobs from the underlying store into the LRU cache +func (l *LFUDAStore) loadExisting(store lister, maxItems int) error { + existing, err := store.list() + if err != nil { + return err + } + + added := 0 + for _, h := range existing { + l.lfuda.Set(h, fakeTrue) + added++ + if maxItems > 0 && added >= maxItems { // underlying cache is bigger than LRU cache + break + } + } + return nil +} diff --git a/store/lfuda_test.go b/store/lfuda_test.go new file mode 100644 index 0000000..9b1eb58 --- /dev/null +++ b/store/lfuda_test.go @@ -0,0 +1,139 @@ +package store + +import ( + "io/ioutil" + "os" + "reflect" + "testing" + "time" + + "github.com/lbryio/lbry.go/v2/extras/errors" + log "github.com/sirupsen/logrus" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +const cacheMaxSize = 3 + +func getTestLFUDAStore() (*LFUDAStore, *MemStore) { + m := NewMemStore() + return NewLFUDAStore("test", m, cacheMaxSize), m +} + +func TestFUDAStore_Eviction(t *testing.T) { + lfuda, mem := getTestLFUDAStore() + b := []byte("x") + err := lfuda.Put("one", b) + require.NoError(t, err) + err = lfuda.Put("two", b) + require.NoError(t, err) + err = lfuda.Put("three", b) + require.NoError(t, err) + err = lfuda.Put("four", b) + require.NoError(t, err) + err = lfuda.Put("five", b) + require.NoError(t, err) + err = lfuda.Put("five", b) + require.NoError(t, err) + err = lfuda.Put("four", b) + require.NoError(t, err) + err = lfuda.Put("two", b) + require.NoError(t, err) + + _, err = lfuda.Get("five") + require.NoError(t, err) + _, err = lfuda.Get("four") + require.NoError(t, err) + _, err = lfuda.Get("two") + require.NoError(t, err) + assert.Equal(t, cacheMaxBlobs, len(mem.Debug())) + + for k, v := range map[string]bool{ + "one": false, + "two": true, + "three": false, + "four": true, + "five": true, + "six": false, + } { + has, err := lfuda.Has(k) + assert.NoError(t, err) + assert.Equal(t, v, has) + } + + lfuda.Get("two") // touch so it stays in cache + lfuda.Get("five") // touch so it stays in cache + lfuda.Put("six", b) + + assert.Equal(t, cacheMaxBlobs, len(mem.Debug())) + + keys := lfuda.lfuda.Keys() + log.Infof("%+v", keys) + for k, v := range map[string]bool{ + "one": false, + "two": true, + "three": false, + "four": false, + "five": true, + "six": true, + } { + has, err := lfuda.Has(k) + assert.NoError(t, err) + assert.Equal(t, v, has) + } + + err = lfuda.Delete("six") + assert.NoError(t, err) + err = lfuda.Delete("five") + assert.NoError(t, err) + err = lfuda.Delete("two") + assert.NoError(t, err) + assert.Equal(t, 0, len(mem.Debug())) +} + +func TestFUDAStore_UnderlyingBlobMissing(t *testing.T) { + lfuda, mem := getTestLFUDAStore() + hash := "hash" + b := []byte("this is a blob of stuff") + err := lfuda.Put(hash, b) + require.NoError(t, err) + + err = mem.Delete(hash) + require.NoError(t, err) + + // hash still exists in lru + assert.True(t, lfuda.lfuda.Contains(hash)) + + blob, err := lfuda.Get(hash) + assert.Nil(t, blob) + assert.True(t, errors.Is(err, ErrBlobNotFound), "expected (%s) %s, got (%s) %s", + reflect.TypeOf(ErrBlobNotFound).String(), ErrBlobNotFound.Error(), + reflect.TypeOf(err).String(), err.Error()) + + // lru.Get() removes hash if underlying store doesn't have it + assert.False(t, lfuda.lfuda.Contains(hash)) +} + +func TestFUDAStore_loadExisting(t *testing.T) { + tmpDir, err := ioutil.TempDir("", "reflector_test_*") + require.NoError(t, err) + defer os.RemoveAll(tmpDir) + d := NewDiskStore(tmpDir, 2) + + hash := "hash" + b := []byte("this is a blob of stuff") + err = d.Put(hash, b) + require.NoError(t, err) + + existing, err := d.list() + require.NoError(t, err) + require.Equal(t, 1, len(existing), "blob should exist in cache") + assert.Equal(t, hash, existing[0]) + + lfuda := NewLFUDAStore("test", d, 3) // lru should load existing blobs when it's created + time.Sleep(100 * time.Millisecond) // async load so let's wait... + has, err := lfuda.Has(hash) + require.NoError(t, err) + assert.True(t, has, "hash should be loaded from disk store but it's not") +} diff --git a/store/lru_test.go b/store/lru_test.go index 968956c..2bd934c 100644 --- a/store/lru_test.go +++ b/store/lru_test.go @@ -5,6 +5,7 @@ import ( "os" "reflect" "testing" + "time" "github.com/lbryio/lbry.go/v2/extras/errors" @@ -114,7 +115,8 @@ func TestLRUStore_loadExisting(t *testing.T) { require.Equal(t, 1, len(existing), "blob should exist in cache") assert.Equal(t, hash, existing[0]) - lru := NewLRUStore("test", d, 3) // lru should load existing blobs when it's created + lru := NewLRUStore("test", d, 3) // lru should load existing blobs when it's created + time.Sleep(100 * time.Millisecond) // async load so let's wait... has, err := lru.Has(hash) require.NoError(t, err) assert.True(t, has, "hash should be loaded from disk store but it's not")