diff --git a/cmd/reflector.go b/cmd/reflector.go index 6fe44ad..a642c7a 100644 --- a/cmd/reflector.go +++ b/cmd/reflector.go @@ -254,6 +254,11 @@ func diskCacheParams(diskParams string) (int, string) { } func cleanOldestBlobs(maxItems int, db *db.SQL, store store.BlobStore, stopper *stop.Group) { + // this is so that it runs on startup without having to wait for 10 minutes + err := doClean(maxItems, db, store, stopper) + if err != nil { + log.Error(errors.FullTrace(err)) + } const cleanupInterval = 10 * time.Minute for { select { @@ -281,19 +286,38 @@ func doClean(maxItems int, db *db.SQL, store store.BlobStore, stopper *stop.Grou if err != nil { return err } - - for _, hash := range blobs { - select { - case <-stopper.Ch(): - return nil - default: - } - - err = store.Delete(hash) - if err != nil { - return err + blobsChan := make(chan string, len(blobs)) + wg := &stop.Group{} + go func() { + for _, hash := range blobs { + select { + case <-stopper.Ch(): + return + default: + } + blobsChan <- hash } + close(blobsChan) + }() + for i := 0; i < 3; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for h := range blobsChan { + select { + case <-stopper.Ch(): + return + default: + } + err = store.Delete(h) + if err != nil { + log.Errorf("error pruning %s: %s", h, errors.FullTrace(err)) + continue + } + } + }() } + wg.Wait() } return nil } diff --git a/db/db.go b/db/db.go index 3e7d4ca..babe641 100644 --- a/db/db.go +++ b/db/db.go @@ -781,7 +781,8 @@ CREATE TABLE blob_ ( last_accessed_at TIMESTAMP NULL DEFAULT NULL, PRIMARY KEY (id), UNIQUE KEY blob_hash_idx (hash), - KEY `blob_last_accessed_idx` (`last_accessed_at`) + KEY `blob_last_accessed_idx` (`last_accessed_at`), + KEY `is_stored_idx` (`is_stored`) ); CREATE TABLE stream ( diff --git a/go.mod b/go.mod index 4ed4f23..eceb7ba 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ replace github.com/btcsuite/btcd => github.com/lbryio/lbrycrd.go v0.0.0-20200203 require ( github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878 // indirect github.com/aws/aws-sdk-go v1.16.11 + github.com/bluele/gcache v0.0.2 github.com/bparli/lfuda-go v0.3.1 github.com/btcsuite/btcd v0.0.0-20190824003749-130ea5bddde3 github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d @@ -17,7 +18,7 @@ require ( github.com/google/gops v0.3.7 github.com/gorilla/mux v1.7.4 github.com/hashicorp/go-msgpack v0.5.5 // indirect - github.com/hashicorp/golang-lru v0.5.4 + github.com/hashicorp/golang-lru v0.5.4 // indirect github.com/hashicorp/memberlist v0.1.4 // indirect github.com/hashicorp/serf v0.8.2 github.com/inconshreveable/go-update v0.0.0-20160112193335-8152e7eb6ccf diff --git a/go.sum b/go.sum index c51b403..48680c9 100644 --- a/go.sum +++ b/go.sum @@ -42,6 +42,8 @@ github.com/beorn7/perks v1.0.0 h1:HWo1m869IqiPhD389kmkxeTalrjNbbJTC8LXupb+sl0= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84= +github.com/bluele/gcache v0.0.2 h1:WcbfdXICg7G/DGBh1PFfcirkWOQV+v077yF1pSy3DGw= +github.com/bluele/gcache v0.0.2/go.mod h1:m15KV+ECjptwSPxKhOhQoAFQVtUFjTVkc3H8o0t/fp0= github.com/bparli/lfuda-go v0.3.1 h1:nO9Szo627RC8/z+R+MMPBItNwHCOonchmpjQuQi8jVY= github.com/bparli/lfuda-go v0.3.1/go.mod h1:BR5a9lwlqRqnPhU3F5ojFK3VhTKg8iFVtJJKgZBQhAo= github.com/bradfitz/go-smtpd v0.0.0-20170404230938-deb6d6237625/go.mod h1:HYsPBTaaSFSlLx/70C2HPIMNZpVV8+vt/A+FMnYP11g= diff --git a/store/disk_test.go b/store/disk_test.go index 72c7755..1ab3f05 100644 --- a/store/disk_test.go +++ b/store/disk_test.go @@ -19,7 +19,7 @@ func TestDiskStore_Get(t *testing.T) { defer os.RemoveAll(tmpDir) d := NewDiskStore(tmpDir, 2) - hash := "1234567890" + hash := "f428b8265d65dad7f8ffa52922bba836404cbd62f3ecfe10adba6b444f8f658938e54f5981ac4de39644d5b93d89a94b" data := []byte("oyuntyausntoyaunpdoyruoyduanrstjwfjyuwf") expectedPath := path.Join(tmpDir, hash[:2], hash) diff --git a/store/lru.go b/store/lru.go index e96931b..be6cd82 100644 --- a/store/lru.go +++ b/store/lru.go @@ -3,12 +3,13 @@ package store import ( "time" - "github.com/lbryio/lbry.go/v2/extras/errors" - "github.com/lbryio/lbry.go/v2/stream" "github.com/lbryio/reflector.go/internal/metrics" "github.com/lbryio/reflector.go/shared" - golru "github.com/hashicorp/golang-lru" + "github.com/lbryio/lbry.go/v2/extras/errors" + "github.com/lbryio/lbry.go/v2/stream" + + "github.com/bluele/gcache" "github.com/sirupsen/logrus" ) @@ -17,7 +18,7 @@ type LRUStore struct { // underlying store store BlobStore // lru implementation - lru *golru.Cache + lru gcache.Cache } // NewLRUStore initialize a new LRUStore @@ -25,20 +26,14 @@ func NewLRUStore(component string, store BlobStore, maxItems int) *LRUStore { l := &LRUStore{ store: store, } - - lru, err := golru.NewWithEvict(maxItems, func(key interface{}, value interface{}) { + l.lru = gcache.New(maxItems).ARC().EvictedFunc(func(key, value interface{}) { metrics.CacheLRUEvictCount.With(metrics.CacheLabels(l.Name(), component)).Inc() - _ = store.Delete(key.(string)) // TODO: log this error. may happen if underlying entry is gone but cache entry still there - }) - if err != nil { - panic(err) - } - - l.lru = lru + _ = store.Delete(key.(string)) + }).Build() go func() { if lstr, ok := store.(lister); ok { - err = l.loadExisting(lstr, maxItems) + err := l.loadExisting(lstr, maxItems) if err != nil { panic(err) // TODO: what should happen here? panic? return nil? just keep going? } @@ -55,14 +50,14 @@ func (l *LRUStore) Name() string { // Has returns whether the blob is in the store, without updating the recent-ness. func (l *LRUStore) Has(hash string) (bool, error) { - return l.lru.Contains(hash), nil + return l.lru.Has(hash), nil } // Get returns the blob or an error if the blob doesn't exist. func (l *LRUStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) { start := time.Now() - _, has := l.lru.Get(hash) - if !has { + _, err := l.lru.Get(hash) + if err != nil { return nil, shared.NewBlobTrace(time.Since(start), l.Name()), errors.Err(ErrBlobNotFound) } blob, stack, err := l.store.Get(hash) @@ -80,7 +75,7 @@ func (l *LRUStore) Put(hash string, blob stream.Blob) error { return err } - l.lru.Add(hash, true) + l.lru.Set(hash, true) return nil } @@ -91,7 +86,7 @@ func (l *LRUStore) PutSD(hash string, blob stream.Blob) error { return err } - l.lru.Add(hash, true) + _ = l.lru.Set(hash, true) return nil } @@ -119,7 +114,7 @@ func (l *LRUStore) loadExisting(store lister, maxItems int) error { logrus.Infof("read %d files from disk", len(existing)) added := 0 for _, h := range existing { - l.lru.Add(h, true) + l.lru.Set(h, true) added++ if maxItems > 0 && added >= maxItems { // underlying cache is bigger than LRU cache break diff --git a/store/lru_test.go b/store/lru_test.go index 6aec768..798d5c2 100644 --- a/store/lru_test.go +++ b/store/lru_test.go @@ -87,7 +87,7 @@ func TestLRUStore_UnderlyingBlobMissing(t *testing.T) { require.NoError(t, err) // hash still exists in lru - assert.True(t, lru.lru.Contains(hash)) + assert.True(t, lru.lru.Has(hash)) blob, _, err := lru.Get(hash) assert.Nil(t, blob) @@ -96,7 +96,7 @@ func TestLRUStore_UnderlyingBlobMissing(t *testing.T) { reflect.TypeOf(err).String(), err.Error()) // lru.Get() removes hash if underlying store doesn't have it - assert.False(t, lru.lru.Contains(hash)) + assert.False(t, lru.lru.Has(hash)) } func TestLRUStore_loadExisting(t *testing.T) {