remove afero fs abstraction in prep for using speedwalk
This commit is contained in:
parent
131fed28d2
commit
aaae3ffa5b
3 changed files with 23 additions and 48 deletions
|
@ -8,8 +8,6 @@ import (
|
||||||
|
|
||||||
"github.com/lbryio/lbry.go/v2/extras/errors"
|
"github.com/lbryio/lbry.go/v2/extras/errors"
|
||||||
"github.com/lbryio/lbry.go/v2/stream"
|
"github.com/lbryio/lbry.go/v2/stream"
|
||||||
|
|
||||||
"github.com/spf13/afero"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// DiskStore stores blobs on a local disk
|
// DiskStore stores blobs on a local disk
|
||||||
|
@ -19,9 +17,6 @@ type DiskStore struct {
|
||||||
// store files in subdirectories based on the first N chars in the filename. 0 = don't create subdirectories.
|
// store files in subdirectories based on the first N chars in the filename. 0 = don't create subdirectories.
|
||||||
prefixLength int
|
prefixLength int
|
||||||
|
|
||||||
// filesystem abstraction
|
|
||||||
fs afero.Fs
|
|
||||||
|
|
||||||
// true if initOnce ran, false otherwise
|
// true if initOnce ran, false otherwise
|
||||||
initialized bool
|
initialized bool
|
||||||
}
|
}
|
||||||
|
@ -31,7 +26,6 @@ func NewDiskStore(dir string, prefixLength int) *DiskStore {
|
||||||
return &DiskStore{
|
return &DiskStore{
|
||||||
blobDir: dir,
|
blobDir: dir,
|
||||||
prefixLength: prefixLength,
|
prefixLength: prefixLength,
|
||||||
fs: afero.NewOsFs(),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -47,7 +41,7 @@ func (d *DiskStore) Has(hash string) (bool, error) {
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = d.fs.Stat(d.path(hash))
|
_, err = os.Stat(d.path(hash))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if os.IsNotExist(err) {
|
if os.IsNotExist(err) {
|
||||||
return false, nil
|
return false, nil
|
||||||
|
@ -64,7 +58,7 @@ func (d *DiskStore) Get(hash string) (stream.Blob, error) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
file, err := d.fs.Open(d.path(hash))
|
file, err := os.Open(d.path(hash))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if os.IsNotExist(err) {
|
if os.IsNotExist(err) {
|
||||||
return nil, errors.Err(ErrBlobNotFound)
|
return nil, errors.Err(ErrBlobNotFound)
|
||||||
|
@ -89,7 +83,7 @@ func (d *DiskStore) Put(hash string, blob stream.Blob) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = afero.WriteFile(d.fs, d.path(hash), blob, 0644)
|
err = ioutil.WriteFile(d.path(hash), blob, 0644)
|
||||||
return errors.Err(err)
|
return errors.Err(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -113,7 +107,7 @@ func (d *DiskStore) Delete(hash string) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
err = d.fs.Remove(d.path(hash))
|
err = os.Remove(d.path(hash))
|
||||||
return errors.Err(err)
|
return errors.Err(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -124,7 +118,7 @@ func (d *DiskStore) list() ([]string, error) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
dirs, err := afero.ReadDir(d.fs, d.blobDir)
|
dirs, err := ioutil.ReadDir(d.blobDir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -133,7 +127,7 @@ func (d *DiskStore) list() ([]string, error) {
|
||||||
|
|
||||||
for _, dir := range dirs {
|
for _, dir := range dirs {
|
||||||
if dir.IsDir() {
|
if dir.IsDir() {
|
||||||
files, err := afero.ReadDir(d.fs, filepath.Join(d.blobDir, dir.Name()))
|
files, err := ioutil.ReadDir(filepath.Join(d.blobDir, dir.Name()))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -160,7 +154,7 @@ func (d *DiskStore) path(hash string) string {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *DiskStore) ensureDirExists(dir string) error {
|
func (d *DiskStore) ensureDirExists(dir string) error {
|
||||||
return errors.Err(d.fs.MkdirAll(dir, 0755))
|
return errors.Err(os.MkdirAll(dir, 0755))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *DiskStore) initOnce() error {
|
func (d *DiskStore) initOnce() error {
|
||||||
|
|
|
@ -1,48 +1,26 @@
|
||||||
package store
|
package store
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
"reflect"
|
"reflect"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/lbryio/lbry.go/v2/extras/errors"
|
"github.com/lbryio/lbry.go/v2/extras/errors"
|
||||||
|
|
||||||
"github.com/spf13/afero"
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
const cacheMaxBlobs = 3
|
const cacheMaxBlobs = 3
|
||||||
|
|
||||||
func getTestLRUStore() (*LRUStore, *DiskStore) {
|
func getTestLRUStore() (*LRUStore, *MemStore) {
|
||||||
d := NewDiskStore("/", 2)
|
m := NewMemStore()
|
||||||
d.fs = afero.NewMemMapFs()
|
return NewLRUStore("test", m, 3), m
|
||||||
return NewLRUStore("test", d, 3), d
|
|
||||||
}
|
|
||||||
|
|
||||||
func countOnDisk(t *testing.T, disk *DiskStore) int {
|
|
||||||
t.Helper()
|
|
||||||
|
|
||||||
count := 0
|
|
||||||
afero.Walk(disk.fs, "/", func(path string, info os.FileInfo, err error) error {
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
if !info.IsDir() {
|
|
||||||
count++
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
|
|
||||||
list, err := disk.list()
|
|
||||||
require.NoError(t, err)
|
|
||||||
require.Equal(t, count, len(list))
|
|
||||||
|
|
||||||
return count
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestLRUStore_Eviction(t *testing.T) {
|
func TestLRUStore_Eviction(t *testing.T) {
|
||||||
lru, disk := getTestLRUStore()
|
lru, mem := getTestLRUStore()
|
||||||
b := []byte("x")
|
b := []byte("x")
|
||||||
err := lru.Put("one", b)
|
err := lru.Put("one", b)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -55,7 +33,7 @@ func TestLRUStore_Eviction(t *testing.T) {
|
||||||
err = lru.Put("five", b)
|
err = lru.Put("five", b)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
assert.Equal(t, cacheMaxBlobs, countOnDisk(t, disk))
|
assert.Equal(t, cacheMaxBlobs, len(mem.Debug()))
|
||||||
|
|
||||||
for k, v := range map[string]bool{
|
for k, v := range map[string]bool{
|
||||||
"one": false,
|
"one": false,
|
||||||
|
@ -73,7 +51,7 @@ func TestLRUStore_Eviction(t *testing.T) {
|
||||||
lru.Get("three") // touch so it stays in cache
|
lru.Get("three") // touch so it stays in cache
|
||||||
lru.Put("six", b)
|
lru.Put("six", b)
|
||||||
|
|
||||||
assert.Equal(t, cacheMaxBlobs, countOnDisk(t, disk))
|
assert.Equal(t, cacheMaxBlobs, len(mem.Debug()))
|
||||||
|
|
||||||
for k, v := range map[string]bool{
|
for k, v := range map[string]bool{
|
||||||
"one": false,
|
"one": false,
|
||||||
|
@ -94,17 +72,17 @@ func TestLRUStore_Eviction(t *testing.T) {
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
err = lru.Delete("six")
|
err = lru.Delete("six")
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
assert.Equal(t, 0, countOnDisk(t, disk))
|
assert.Equal(t, 0, len(mem.Debug()))
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestLRUStore_UnderlyingBlobMissing(t *testing.T) {
|
func TestLRUStore_UnderlyingBlobMissing(t *testing.T) {
|
||||||
lru, disk := getTestLRUStore()
|
lru, mem := getTestLRUStore()
|
||||||
hash := "hash"
|
hash := "hash"
|
||||||
b := []byte("this is a blob of stuff")
|
b := []byte("this is a blob of stuff")
|
||||||
err := lru.Put(hash, b)
|
err := lru.Put(hash, b)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
err = disk.fs.Remove("/ha/hash")
|
err = mem.Delete(hash)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
// hash still exists in lru
|
// hash still exists in lru
|
||||||
|
@ -121,12 +99,14 @@ func TestLRUStore_UnderlyingBlobMissing(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestLRUStore_loadExisting(t *testing.T) {
|
func TestLRUStore_loadExisting(t *testing.T) {
|
||||||
d := NewDiskStore("/", 2)
|
tmpDir, err := ioutil.TempDir("", "reflector_test_*")
|
||||||
d.fs = afero.NewMemMapFs()
|
require.NoError(t, err)
|
||||||
|
defer os.RemoveAll(tmpDir)
|
||||||
|
d := NewDiskStore(tmpDir, 2)
|
||||||
|
|
||||||
hash := "hash"
|
hash := "hash"
|
||||||
b := []byte("this is a blob of stuff")
|
b := []byte("this is a blob of stuff")
|
||||||
err := d.Put(hash, b)
|
err = d.Put(hash, b)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
existing, err := d.list()
|
existing, err := d.list()
|
||||||
|
|
|
@ -6,6 +6,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
// MemStore is an in memory only blob store with no persistence.
|
// MemStore is an in memory only blob store with no persistence.
|
||||||
|
// MemStore is NOT THREAD-SAFE
|
||||||
type MemStore struct {
|
type MemStore struct {
|
||||||
blobs map[string]stream.Blob
|
blobs map[string]stream.Blob
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Reference in a new issue