when reflecting a sdblob, insert all the stream and intermediate blobs using a transaction #50
6 changed files with 267 additions and 2 deletions
3
go.mod
3
go.mod
|
@ -5,6 +5,7 @@ replace github.com/btcsuite/btcd => github.com/lbryio/lbrycrd.go v0.0.0-20200203
|
||||||
require (
|
require (
|
||||||
github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878 // indirect
|
github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878 // indirect
|
||||||
github.com/aws/aws-sdk-go v1.16.11
|
github.com/aws/aws-sdk-go v1.16.11
|
||||||
|
github.com/bparli/lfuda-go v0.3.0
|
||||||
github.com/btcsuite/btcd v0.0.0-20190824003749-130ea5bddde3
|
github.com/btcsuite/btcd v0.0.0-20190824003749-130ea5bddde3
|
||||||
github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d
|
github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d
|
||||||
github.com/davecgh/go-spew v1.1.1
|
github.com/davecgh/go-spew v1.1.1
|
||||||
|
@ -25,7 +26,7 @@ require (
|
||||||
github.com/lbryio/lbry.go v1.1.2 // indirect
|
github.com/lbryio/lbry.go v1.1.2 // indirect
|
||||||
github.com/lbryio/lbry.go/v2 v2.6.1-0.20200901175808-73382bb02128
|
github.com/lbryio/lbry.go/v2 v2.6.1-0.20200901175808-73382bb02128
|
||||||
github.com/lbryio/types v0.0.0-20191228214437-05a22073b4ec
|
github.com/lbryio/types v0.0.0-20191228214437-05a22073b4ec
|
||||||
github.com/lucas-clemente/quic-go v0.19.1
|
github.com/lucas-clemente/quic-go v0.19.2
|
||||||
github.com/phayes/freeport v0.0.0-20171002185219-e27662a4a9d6
|
github.com/phayes/freeport v0.0.0-20171002185219-e27662a4a9d6
|
||||||
github.com/prometheus/client_golang v0.9.2
|
github.com/prometheus/client_golang v0.9.2
|
||||||
github.com/sirupsen/logrus v1.4.2
|
github.com/sirupsen/logrus v1.4.2
|
||||||
|
|
4
go.sum
4
go.sum
|
@ -25,6 +25,8 @@ github.com/aws/aws-sdk-go v1.16.11/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpi
|
||||||
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLMYoU8P317H5OQ+Via4RmuPwCS0=
|
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLMYoU8P317H5OQ+Via4RmuPwCS0=
|
||||||
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
|
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
|
||||||
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
|
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
|
||||||
|
github.com/bparli/lfuda-go v0.3.0 h1:b6qPjEb0BN006oQnj2nuGfz94yY3iYo0bmuFM079tQg=
|
||||||
|
github.com/bparli/lfuda-go v0.3.0/go.mod h1:BR5a9lwlqRqnPhU3F5ojFK3VhTKg8iFVtJJKgZBQhAo=
|
||||||
github.com/bradfitz/go-smtpd v0.0.0-20170404230938-deb6d6237625/go.mod h1:HYsPBTaaSFSlLx/70C2HPIMNZpVV8+vt/A+FMnYP11g=
|
github.com/bradfitz/go-smtpd v0.0.0-20170404230938-deb6d6237625/go.mod h1:HYsPBTaaSFSlLx/70C2HPIMNZpVV8+vt/A+FMnYP11g=
|
||||||
github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f h1:bAs4lUbRJpnnkd9VhRV3jjAVU7DJVjMaK+IsvSeZvFo=
|
github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f h1:bAs4lUbRJpnnkd9VhRV3jjAVU7DJVjMaK+IsvSeZvFo=
|
||||||
github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f/go.mod h1:TdznJufoqS23FtqVCzL0ZqgP5MqXbb4fg/WgDys70nA=
|
github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f/go.mod h1:TdznJufoqS23FtqVCzL0ZqgP5MqXbb4fg/WgDys70nA=
|
||||||
|
@ -223,6 +225,8 @@ github.com/lbryio/types v0.0.0-20191228214437-05a22073b4ec/go.mod h1:CG3wsDv5BiV
|
||||||
github.com/lib/pq v1.1.1/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
|
github.com/lib/pq v1.1.1/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
|
||||||
github.com/lucas-clemente/quic-go v0.19.1 h1:J9TkQJGJVOR3UmGhd4zdVYwKSA0EoXbLRf15uQJ6gT4=
|
github.com/lucas-clemente/quic-go v0.19.1 h1:J9TkQJGJVOR3UmGhd4zdVYwKSA0EoXbLRf15uQJ6gT4=
|
||||||
github.com/lucas-clemente/quic-go v0.19.1/go.mod h1:ZUygOqIoai0ASXXLJ92LTnKdbqh9MHCLTX6Nr1jUrK0=
|
github.com/lucas-clemente/quic-go v0.19.1/go.mod h1:ZUygOqIoai0ASXXLJ92LTnKdbqh9MHCLTX6Nr1jUrK0=
|
||||||
|
github.com/lucas-clemente/quic-go v0.19.2 h1:w8BBYUx5Z+kNpeaOeQW/KzcNsKWhh4O6PeQhb0nURPg=
|
||||||
|
github.com/lucas-clemente/quic-go v0.19.2/go.mod h1:ZUygOqIoai0ASXXLJ92LTnKdbqh9MHCLTX6Nr1jUrK0=
|
||||||
github.com/lunixbochs/vtclean v1.0.0/go.mod h1:pHhQNgMf3btfWnGBVipUOjRYhoOsdGqdm/+2c2E2WMI=
|
github.com/lunixbochs/vtclean v1.0.0/go.mod h1:pHhQNgMf3btfWnGBVipUOjRYhoOsdGqdm/+2c2E2WMI=
|
||||||
github.com/lusis/go-slackbot v0.0.0-20180109053408-401027ccfef5/go.mod h1:c2mYKRyMb1BPkO5St0c/ps62L4S0W2NAkaTXj9qEI+0=
|
github.com/lusis/go-slackbot v0.0.0-20180109053408-401027ccfef5/go.mod h1:c2mYKRyMb1BPkO5St0c/ps62L4S0W2NAkaTXj9qEI+0=
|
||||||
github.com/lusis/slack-test v0.0.0-20180109053238-3c758769bfa6/go.mod h1:sFlOUpQL1YcjhFVXhg1CG8ZASEs/Mf1oVb6H75JL/zg=
|
github.com/lusis/slack-test v0.0.0-20180109053238-3c758769bfa6/go.mod h1:sFlOUpQL1YcjhFVXhg1CG8ZASEs/Mf1oVb6H75JL/zg=
|
||||||
|
|
|
@ -58,6 +58,7 @@ func TestCachingStore_CacheMiss(t *testing.T) {
|
||||||
if !bytes.Equal(b, res) {
|
if !bytes.Equal(b, res) {
|
||||||
t.Errorf("expected Get() to return %s, got %s", string(b), string(res))
|
t.Errorf("expected Get() to return %s, got %s", string(b), string(res))
|
||||||
}
|
}
|
||||||
|
time.Sleep(10 * time.Millisecond) //storing to cache is done async so let's give it some time
|
||||||
|
|
||||||
has, err := cache.Has(hash)
|
has, err := cache.Has(hash)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
118
store/lfuda.go
Normal file
118
store/lfuda.go
Normal file
|
@ -0,0 +1,118 @@
|
||||||
|
package store
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/bparli/lfuda-go"
|
||||||
|
"github.com/lbryio/lbry.go/v2/extras/errors"
|
||||||
|
"github.com/lbryio/lbry.go/v2/stream"
|
||||||
|
"github.com/lbryio/reflector.go/internal/metrics"
|
||||||
|
)
|
||||||
|
|
||||||
|
// LRUStore adds a max cache size and LRU eviction to a BlobStore
|
||||||
|
type LFUDAStore struct {
|
||||||
|
// underlying store
|
||||||
|
store BlobStore
|
||||||
|
// lfuda implementation
|
||||||
|
lfuda *lfuda.Cache
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewLRUStore initialize a new LRUStore
|
||||||
|
func NewLFUDAStore(component string, store BlobStore, maxSize float64) *LFUDAStore {
|
||||||
|
lfuda := lfuda.NewGDSFWithEvict(maxSize, func(key interface{}, value interface{}) {
|
||||||
|
metrics.CacheLRUEvictCount.With(metrics.CacheLabels(store.Name(), component)).Inc()
|
||||||
|
_ = store.Delete(key.(string)) // TODO: log this error. may happen if underlying entry is gone but cache entry still there
|
||||||
|
})
|
||||||
|
lfuda.Age()
|
||||||
|
l := &LFUDAStore{
|
||||||
|
store: store,
|
||||||
|
lfuda: lfuda,
|
||||||
|
}
|
||||||
|
go func() {
|
||||||
|
if lstr, ok := store.(lister); ok {
|
||||||
|
err := l.loadExisting(lstr, int(maxSize/2000000.0))
|
||||||
|
if err != nil {
|
||||||
|
panic(err) // TODO: what should happen here? panic? return nil? just keep going?
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
return l
|
||||||
|
}
|
||||||
|
|
||||||
|
const nameLFUDA = "lfuda"
|
||||||
|
|
||||||
|
var fakeTrue = []byte{'t'}
|
||||||
|
|
||||||
|
// Name is the cache type name
|
||||||
|
func (l *LFUDAStore) Name() string { return nameLFUDA }
|
||||||
|
|
||||||
|
// Has returns whether the blob is in the store, without updating the recent-ness.
|
||||||
|
func (l *LFUDAStore) Has(hash string) (bool, error) {
|
||||||
|
return l.lfuda.Contains(hash), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get returns the blob or an error if the blob doesn't exist.
|
||||||
|
func (l *LFUDAStore) Get(hash string) (stream.Blob, error) {
|
||||||
|
_, has := l.lfuda.Get(hash)
|
||||||
|
if !has {
|
||||||
|
return nil, errors.Err(ErrBlobNotFound)
|
||||||
|
}
|
||||||
|
blob, err := l.store.Get(hash)
|
||||||
|
if errors.Is(err, ErrBlobNotFound) {
|
||||||
|
// Blob disappeared from underlying store
|
||||||
|
l.lfuda.Remove(hash)
|
||||||
|
}
|
||||||
|
return blob, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Put stores the blob. Following LFUDA rules it's not guaranteed that a SET will store the value!!!
|
||||||
|
func (l *LFUDAStore) Put(hash string, blob stream.Blob) error {
|
||||||
|
err := l.store.Put(hash, blob)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
l.lfuda.Set(hash, fakeTrue)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// PutSD stores the sd blob
|
||||||
|
func (l *LFUDAStore) PutSD(hash string, blob stream.Blob) error {
|
||||||
|
err := l.store.PutSD(hash, blob)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
l.lfuda.Set(hash, fakeTrue)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete deletes the blob from the store
|
||||||
|
func (l *LFUDAStore) Delete(hash string) error {
|
||||||
|
err := l.store.Delete(hash)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// This must come after store.Delete()
|
||||||
|
// Remove triggers onEvict function, which also tries to delete blob from store
|
||||||
|
// We need to delete it manually first so any errors can be propagated up
|
||||||
|
l.lfuda.Remove(hash)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// loadExisting imports existing blobs from the underlying store into the LRU cache
|
||||||
|
func (l *LFUDAStore) loadExisting(store lister, maxItems int) error {
|
||||||
|
existing, err := store.list()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
added := 0
|
||||||
|
for _, h := range existing {
|
||||||
|
l.lfuda.Set(h, fakeTrue)
|
||||||
|
added++
|
||||||
|
if maxItems > 0 && added >= maxItems { // underlying cache is bigger than LRU cache
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
139
store/lfuda_test.go
Normal file
139
store/lfuda_test.go
Normal file
|
@ -0,0 +1,139 @@
|
||||||
|
package store
|
||||||
|
|
||||||
|
import (
|
||||||
|
"io/ioutil"
|
||||||
|
"os"
|
||||||
|
"reflect"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/lbryio/lbry.go/v2/extras/errors"
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
const cacheMaxSize = 3
|
||||||
|
|
||||||
|
func getTestLFUDAStore() (*LFUDAStore, *MemStore) {
|
||||||
|
m := NewMemStore()
|
||||||
|
return NewLFUDAStore("test", m, cacheMaxSize), m
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestFUDAStore_Eviction(t *testing.T) {
|
||||||
|
lfuda, mem := getTestLFUDAStore()
|
||||||
|
b := []byte("x")
|
||||||
|
err := lfuda.Put("one", b)
|
||||||
|
require.NoError(t, err)
|
||||||
|
err = lfuda.Put("two", b)
|
||||||
|
require.NoError(t, err)
|
||||||
|
err = lfuda.Put("three", b)
|
||||||
|
require.NoError(t, err)
|
||||||
|
err = lfuda.Put("four", b)
|
||||||
|
require.NoError(t, err)
|
||||||
|
err = lfuda.Put("five", b)
|
||||||
|
require.NoError(t, err)
|
||||||
|
err = lfuda.Put("five", b)
|
||||||
|
require.NoError(t, err)
|
||||||
|
err = lfuda.Put("four", b)
|
||||||
|
require.NoError(t, err)
|
||||||
|
err = lfuda.Put("two", b)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
_, err = lfuda.Get("five")
|
||||||
|
require.NoError(t, err)
|
||||||
|
_, err = lfuda.Get("four")
|
||||||
|
require.NoError(t, err)
|
||||||
|
_, err = lfuda.Get("two")
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Equal(t, cacheMaxBlobs, len(mem.Debug()))
|
||||||
|
|
||||||
|
for k, v := range map[string]bool{
|
||||||
|
"one": false,
|
||||||
|
"two": true,
|
||||||
|
"three": false,
|
||||||
|
"four": true,
|
||||||
|
"five": true,
|
||||||
|
"six": false,
|
||||||
|
} {
|
||||||
|
has, err := lfuda.Has(k)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Equal(t, v, has)
|
||||||
|
}
|
||||||
|
|
||||||
|
lfuda.Get("two") // touch so it stays in cache
|
||||||
|
lfuda.Get("five") // touch so it stays in cache
|
||||||
|
lfuda.Put("six", b)
|
||||||
|
|
||||||
|
assert.Equal(t, cacheMaxBlobs, len(mem.Debug()))
|
||||||
|
|
||||||
|
keys := lfuda.lfuda.Keys()
|
||||||
|
log.Infof("%+v", keys)
|
||||||
|
for k, v := range map[string]bool{
|
||||||
|
"one": false,
|
||||||
|
"two": true,
|
||||||
|
"three": false,
|
||||||
|
"four": false,
|
||||||
|
"five": true,
|
||||||
|
"six": true,
|
||||||
|
} {
|
||||||
|
has, err := lfuda.Has(k)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Equal(t, v, has)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = lfuda.Delete("six")
|
||||||
|
assert.NoError(t, err)
|
||||||
|
err = lfuda.Delete("five")
|
||||||
|
assert.NoError(t, err)
|
||||||
|
err = lfuda.Delete("two")
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Equal(t, 0, len(mem.Debug()))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestFUDAStore_UnderlyingBlobMissing(t *testing.T) {
|
||||||
|
lfuda, mem := getTestLFUDAStore()
|
||||||
|
hash := "hash"
|
||||||
|
b := []byte("this is a blob of stuff")
|
||||||
|
err := lfuda.Put(hash, b)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
err = mem.Delete(hash)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// hash still exists in lru
|
||||||
|
assert.True(t, lfuda.lfuda.Contains(hash))
|
||||||
|
|
||||||
|
blob, err := lfuda.Get(hash)
|
||||||
|
assert.Nil(t, blob)
|
||||||
|
assert.True(t, errors.Is(err, ErrBlobNotFound), "expected (%s) %s, got (%s) %s",
|
||||||
|
reflect.TypeOf(ErrBlobNotFound).String(), ErrBlobNotFound.Error(),
|
||||||
|
reflect.TypeOf(err).String(), err.Error())
|
||||||
|
|
||||||
|
// lru.Get() removes hash if underlying store doesn't have it
|
||||||
|
assert.False(t, lfuda.lfuda.Contains(hash))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestFUDAStore_loadExisting(t *testing.T) {
|
||||||
|
tmpDir, err := ioutil.TempDir("", "reflector_test_*")
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer os.RemoveAll(tmpDir)
|
||||||
|
d := NewDiskStore(tmpDir, 2)
|
||||||
|
|
||||||
|
hash := "hash"
|
||||||
|
b := []byte("this is a blob of stuff")
|
||||||
|
err = d.Put(hash, b)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
existing, err := d.list()
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, 1, len(existing), "blob should exist in cache")
|
||||||
|
assert.Equal(t, hash, existing[0])
|
||||||
|
|
||||||
|
lfuda := NewLFUDAStore("test", d, 3) // lru should load existing blobs when it's created
|
||||||
|
time.Sleep(100 * time.Millisecond) // async load so let's wait...
|
||||||
|
has, err := lfuda.Has(hash)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.True(t, has, "hash should be loaded from disk store but it's not")
|
||||||
|
}
|
|
@ -5,6 +5,7 @@ import (
|
||||||
"os"
|
"os"
|
||||||
"reflect"
|
"reflect"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/lbryio/lbry.go/v2/extras/errors"
|
"github.com/lbryio/lbry.go/v2/extras/errors"
|
||||||
|
|
||||||
|
@ -115,6 +116,7 @@ func TestLRUStore_loadExisting(t *testing.T) {
|
||||||
assert.Equal(t, hash, existing[0])
|
assert.Equal(t, hash, existing[0])
|
||||||
|
|
||||||
lru := NewLRUStore("test", d, 3) // lru should load existing blobs when it's created
|
lru := NewLRUStore("test", d, 3) // lru should load existing blobs when it's created
|
||||||
|
time.Sleep(100 * time.Millisecond) // async load so let's wait...
|
||||||
has, err := lru.Has(hash)
|
has, err := lru.Has(hash)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
assert.True(t, has, "hash should be loaded from disk store but it's not")
|
assert.True(t, has, "hash should be loaded from disk store but it's not")
|
||||||
|
|
Loading…
Reference in a new issue