separate disk and lru behavior

This commit is contained in:
Alex Grintsvayg 2020-10-22 12:18:31 -04:00
parent 69fa06420b
commit c6b53792c8
No known key found for this signature in database
GPG key ID: AEB3F089F86A22B5
4 changed files with 192 additions and 75 deletions

View file

@ -8,22 +8,17 @@ import (
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/stream"
"github.com/spf13/afero"
lru "github.com/hashicorp/golang-lru"
"github.com/spf13/afero"
)
// DiskBlobStore stores blobs on a local disk
type DiskBlobStore struct {
// the location of blobs on disk
blobDir string
// max number of blobs to store
maxBlobs int
// store files in subdirectories based on the first N chars in the filename. 0 = don't create subdirectories.
prefixLength int
// lru cache
lru *lru.Cache
// filesystem abstraction
fs afero.Fs
@ -32,14 +27,12 @@ type DiskBlobStore struct {
}
// NewDiskBlobStore returns an initialized file disk store pointer.
func NewDiskBlobStore(dir string, maxBlobs, prefixLength int) *DiskBlobStore {
dbs := DiskBlobStore{
func NewDiskBlobStore(dir string, prefixLength int) *DiskBlobStore {
return &DiskBlobStore{
blobDir: dir,
maxBlobs: maxBlobs,
prefixLength: prefixLength,
fs: afero.NewOsFs(),
}
return &dbs
}
func (d *DiskBlobStore) dir(hash string) string {
@ -67,19 +60,6 @@ func (d *DiskBlobStore) initOnce() error {
return err
}
l, err := lru.NewWithEvict(d.maxBlobs, func(key interface{}, value interface{}) {
_ = d.fs.Remove(d.path(key.(string))) // TODO: log this error. may happen if file is gone but cache entry still there?
})
if err != nil {
return errors.Err(err)
}
d.lru = l
err = d.loadExisting()
if err != nil {
return err
}
d.initialized = true
return nil
}
@ -91,7 +71,14 @@ func (d *DiskBlobStore) Has(hash string) (bool, error) {
return false, err
}
return d.lru.Contains(hash), nil
_, err = d.fs.Stat(d.path(hash))
if err != nil {
if os.IsNotExist(err) {
return false, nil
}
return false, errors.Err(err)
}
return true, nil
}
// Get returns the blob or an error if the blob doesn't exist.
@ -101,22 +88,17 @@ func (d *DiskBlobStore) Get(hash string) (stream.Blob, error) {
return nil, err
}
_, has := d.lru.Get(hash)
if !has {
return nil, errors.Err(ErrBlobNotFound)
}
file, err := d.fs.Open(d.path(hash))
if err != nil {
if os.IsNotExist(err) {
d.lru.Remove(hash)
return nil, errors.Err(ErrBlobNotFound)
}
return nil, err
}
defer file.Close()
return ioutil.ReadAll(file)
blob, err := ioutil.ReadAll(file)
return blob, errors.Err(err)
}
// Put stores the blob on disk
@ -132,13 +114,7 @@ func (d *DiskBlobStore) Put(hash string, blob stream.Blob) error {
}
err = afero.WriteFile(d.fs, d.path(hash), blob, 0644)
if err != nil {
return errors.Err(err)
}
d.lru.Add(hash, true)
return nil
return errors.Err(err)
}
// PutSD stores the sd blob on the disk
@ -153,30 +129,40 @@ func (d *DiskBlobStore) Delete(hash string) error {
return err
}
d.lru.Remove(hash)
return nil
}
// loadExisting scans the blobDir and imports existing blobs into lru cache
func (d *DiskBlobStore) loadExisting() error {
dirs, err := afero.ReadDir(d.fs, d.blobDir)
has, err := d.Has(hash)
if err != nil {
return err
}
if !has {
return nil
}
err = d.fs.Remove(d.path(hash))
return errors.Err(err)
}
// list returns a slice of blobs that already exist in the blobDir
func (d *DiskBlobStore) list() ([]string, error) {
dirs, err := afero.ReadDir(d.fs, d.blobDir)
if err != nil {
return nil, err
}
var existing []string
for _, dir := range dirs {
if dir.IsDir() {
files, err := afero.ReadDir(d.fs, filepath.Join(d.blobDir, dir.Name()))
if err != nil {
return err
return nil, err
}
for _, file := range files {
if file.Mode().IsRegular() && !file.IsDir() {
d.lru.Add(file.Name(), true)
existing = append(existing, file.Name())
}
}
}
}
return nil
return existing, nil
}

113
store/lru.go Normal file
View file

@ -0,0 +1,113 @@
package store
import (
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/stream"
golru "github.com/hashicorp/golang-lru"
)
// LRUStore adds a max cache size and LRU eviction to a BlobStore.
// Once the cache holds its maximum number of entries, adding another
// evicts the least-recently-used hash, and the eviction callback (see
// NewLRUStore) deletes that blob from the underlying store as well.
type LRUStore struct {
	// underlying store that actually holds the blob data
	store BlobStore
	// lru tracks which hashes exist and their recent-ness; values are
	// a dummy `true` — only the keys matter
	lru *golru.Cache
}
// NewLRUStore wraps store with an LRU cache holding at most maxItems
// blobs. Evicted entries are also deleted from the underlying store.
// If the store can enumerate its existing blobs, those are preloaded
// into the cache. Panics if maxItems is invalid or preloading fails.
func NewLRUStore(store BlobStore, maxItems int) *LRUStore {
	onEvict := func(key interface{}, value interface{}) {
		// TODO: log this error. may happen if underlying entry is gone but cache entry still there
		_ = store.Delete(key.(string))
	}

	cache, err := golru.NewWithEvict(maxItems, onEvict)
	if err != nil {
		panic(err)
	}

	ls := &LRUStore{
		store: store,
		lru:   cache,
	}

	if enumerable, ok := store.(lister); ok {
		if err := ls.loadExisting(enumerable, maxItems); err != nil {
			// TODO: what should happen here? panic? return nil? just keep going?
			panic(err)
		}
	}

	return ls
}
// Has reports whether the blob is in the store, without updating its
// recent-ness in the cache.
func (l *LRUStore) Has(hash string) (bool, error) {
	cached := l.lru.Contains(hash)
	return cached, nil
}
// Get returns the blob, marking it most-recently used. It returns
// ErrBlobNotFound when the hash is absent from the cache, or when the
// underlying store has lost the blob — in the latter case the stale
// cache entry is dropped so Has stops reporting it.
func (l *LRUStore) Get(hash string) (stream.Blob, error) {
	if _, ok := l.lru.Get(hash); !ok {
		return nil, errors.Err(ErrBlobNotFound)
	}

	blob, err := l.store.Get(hash)
	if errors.Is(err, ErrBlobNotFound) {
		// blob disappeared from the underlying store; evict the stale entry
		l.lru.Remove(hash)
	}
	return blob, err
}
// Put writes the blob to the underlying store and then records the hash
// as most-recently used, possibly evicting an older blob from the cache.
func (l *LRUStore) Put(hash string, blob stream.Blob) error {
	if err := l.store.Put(hash, blob); err != nil {
		return err
	}

	l.lru.Add(hash, true)
	return nil
}
// PutSD writes the sd blob to the underlying store and then records the
// hash as most-recently used, possibly evicting an older blob.
func (l *LRUStore) PutSD(hash string, blob stream.Blob) error {
	if err := l.store.PutSD(hash, blob); err != nil {
		return err
	}

	l.lru.Add(hash, true)
	return nil
}
// Delete removes the blob from both the underlying store and the cache.
func (l *LRUStore) Delete(hash string) error {
	if err := l.store.Delete(hash); err != nil {
		return err
	}

	// The store deletion must come before lru.Remove(): Remove fires the
	// eviction callback, which also tries to delete the blob from the
	// store. Deleting manually first lets any store error propagate up.
	l.lru.Remove(hash)
	return nil
}
// loadExisting seeds the LRU cache with blobs that already exist in the
// underlying store, stopping once maxItems entries have been added.
func (l *LRUStore) loadExisting(store lister, maxItems int) error {
	existing, err := store.list()
	if err != nil {
		return err
	}

	for i, hash := range existing {
		l.lru.Add(hash, true)
		// the underlying store may hold more blobs than the cache allows
		if maxItems > 0 && i+1 >= maxItems {
			break
		}
	}
	return nil
}

View file

@ -14,16 +14,17 @@ import (
const cacheMaxBlobs = 3
func memDiskStore() *DiskBlobStore {
d := NewDiskBlobStore("/", cacheMaxBlobs, 2)
func testLRUStore() (*LRUStore, *DiskBlobStore) {
d := NewDiskBlobStore("/", 2)
d.fs = afero.NewMemMapFs()
return d
return NewLRUStore(d, 3), d
}
func countOnDisk(t *testing.T, fs afero.Fs) int {
func countOnDisk(t *testing.T, disk *DiskBlobStore) int {
t.Helper()
count := 0
afero.Walk(fs, "/", func(path string, info os.FileInfo, err error) error {
afero.Walk(disk.fs, "/", func(path string, info os.FileInfo, err error) error {
if err != nil {
t.Fatal(err)
}
@ -32,24 +33,29 @@ func countOnDisk(t *testing.T, fs afero.Fs) int {
}
return nil
})
list, err := disk.list()
require.NoError(t, err)
require.Equal(t, count, len(list))
return count
}
func TestDiskBlobStore_LRU(t *testing.T) {
d := memDiskStore()
func TestLRUStore_Eviction(t *testing.T) {
lru, disk := testLRUStore()
b := []byte("x")
err := d.Put("one", b)
err := lru.Put("one", b)
require.NoError(t, err)
err = d.Put("two", b)
err = lru.Put("two", b)
require.NoError(t, err)
err = d.Put("three", b)
err = lru.Put("three", b)
require.NoError(t, err)
err = d.Put("four", b)
err = lru.Put("four", b)
require.NoError(t, err)
err = d.Put("five", b)
err = lru.Put("five", b)
require.NoError(t, err)
assert.Equal(t, cacheMaxBlobs, countOnDisk(t, d.fs))
assert.Equal(t, cacheMaxBlobs, countOnDisk(t, disk))
for k, v := range map[string]bool{
"one": false,
@ -59,15 +65,15 @@ func TestDiskBlobStore_LRU(t *testing.T) {
"five": true,
"six": false,
} {
has, err := d.Has(k)
has, err := lru.Has(k)
assert.NoError(t, err)
assert.Equal(t, v, has)
}
d.Get("three") // touch so it stays in cache
d.Put("six", b)
lru.Get("three") // touch so it stays in cache
lru.Put("six", b)
assert.Equal(t, cacheMaxBlobs, countOnDisk(t, d.fs))
assert.Equal(t, cacheMaxBlobs, countOnDisk(t, disk))
for k, v := range map[string]bool{
"one": false,
@ -77,33 +83,39 @@ func TestDiskBlobStore_LRU(t *testing.T) {
"five": true,
"six": true,
} {
has, err := d.Has(k)
has, err := lru.Has(k)
assert.NoError(t, err)
assert.Equal(t, v, has)
}
err = d.Delete("three")
err = lru.Delete("three")
assert.NoError(t, err)
err = d.Delete("five")
err = lru.Delete("five")
assert.NoError(t, err)
err = d.Delete("six")
err = lru.Delete("six")
assert.NoError(t, err)
assert.Equal(t, 0, countOnDisk(t, d.fs))
assert.Equal(t, 0, countOnDisk(t, disk))
}
func TestDiskBlobStore_FileMissingOnDisk(t *testing.T) {
d := memDiskStore()
func TestLRUStore_UnderlyingBlobMissing(t *testing.T) {
lru, disk := testLRUStore()
hash := "hash"
b := []byte("this is a blob of stuff")
err := d.Put(hash, b)
err := lru.Put(hash, b)
require.NoError(t, err)
err = d.fs.Remove("/ha/hash")
err = disk.fs.Remove("/ha/hash")
require.NoError(t, err)
blob, err := d.Get(hash)
// hash still exists in lru
assert.True(t, lru.lru.Contains(hash))
blob, err := lru.Get(hash)
assert.Nil(t, blob)
assert.True(t, errors.Is(err, ErrBlobNotFound), "expected (%s) %s, got (%s) %s",
reflect.TypeOf(ErrBlobNotFound).String(), ErrBlobNotFound.Error(),
reflect.TypeOf(err).String(), err.Error())
// lru.Get() removes hash if underlying store doesn't have it
assert.False(t, lru.lru.Contains(hash))
}

View file

@ -27,5 +27,11 @@ type Blocklister interface {
Wants(hash string) (bool, error)
}
// lister is a store that can enumerate the blob hashes it currently
// holds. This is helpful when an overlay cache (e.g. LRUStore) needs to
// preload its index with blobs that already exist in the underlying store.
type lister interface {
	list() ([]string, error)
}
//ErrBlobNotFound is a standard error when a blob is not found in the store.
var ErrBlobNotFound = errors.Base("blob not found")