Ittt #52

Merged
anbsky merged 62 commits from ittt into master 2021-07-24 02:35:22 +02:00
7 changed files with 66 additions and 42 deletions
Showing only changes of commit 64acdc29c3 - Show all commits

View file

@ -254,6 +254,11 @@ func diskCacheParams(diskParams string) (int, string) {
}
func cleanOldestBlobs(maxItems int, db *db.SQL, store store.BlobStore, stopper *stop.Group) {
// this is so that it runs on startup without having to wait for 10 minutes
err := doClean(maxItems, db, store, stopper)
if err != nil {
log.Error(errors.FullTrace(err))
}
const cleanupInterval = 10 * time.Minute
for {
select {
@ -281,19 +286,38 @@ func doClean(maxItems int, db *db.SQL, store store.BlobStore, stopper *stop.Grou
if err != nil {
return err
}
for _, hash := range blobs {
select {
case <-stopper.Ch():
return nil
default:
}
err = store.Delete(hash)
if err != nil {
return err
blobsChan := make(chan string, len(blobs))
wg := &stop.Group{}
go func() {
for _, hash := range blobs {
select {
case <-stopper.Ch():
return
default:
}
blobsChan <- hash
}
close(blobsChan)
}()
for i := 0; i < 3; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for h := range blobsChan {
select {
case <-stopper.Ch():
return
default:
}
err = store.Delete(h)
if err != nil {
log.Errorf("error pruning %s: %s", h, errors.FullTrace(err))
continue
}
}
}()
}
wg.Wait()
}
return nil
}

View file

@ -317,9 +317,10 @@ func (s *SQL) hasBlobs(hashes []string) (map[string]bool, []uint64, error) {
}
var (
hash string
blobID, streamID uint64
lastAccessedAt null.Time
hash string
blobID uint64
streamID null.Uint64
lastAccessedAt null.Time
)
var needsTouch []uint64
@ -379,8 +380,8 @@ WHERE b.is_stored = 1 and b.hash IN (` + qt.Qs(len(batch)) + `)`
if !lastAccessedAt.Valid || lastAccessedAt.Time.Before(touchDeadline) {
if s.TrackAccess == TrackAccessBlobs {
needsTouch = append(needsTouch, blobID)
} else if s.TrackAccess == TrackAccessStreams {
needsTouch = append(needsTouch, streamID)
} else if s.TrackAccess == TrackAccessStreams && !streamID.IsZero() {
needsTouch = append(needsTouch, streamID.Uint64)
}
}
}
@ -779,8 +780,9 @@ CREATE TABLE blob_ (
length bigint(20) unsigned DEFAULT NULL,
last_accessed_at TIMESTAMP NULL DEFAULT NULL,
PRIMARY KEY (id),
UNIQUE KEY blob_hash_idx (hash)
KEY `blob_last_accessed_idx` (`last_accessed_at`)
UNIQUE KEY blob_hash_idx (hash),
KEY `blob_last_accessed_idx` (`last_accessed_at`),
KEY `is_stored_idx` (`is_stored`)
);
CREATE TABLE stream (

3
go.mod
View file

@ -7,6 +7,7 @@ replace github.com/btcsuite/btcd => github.com/lbryio/lbrycrd.go v0.0.0-20200203
require (
github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878 // indirect
github.com/aws/aws-sdk-go v1.16.11
github.com/bluele/gcache v0.0.2
github.com/bparli/lfuda-go v0.3.1
github.com/btcsuite/btcd v0.0.0-20190824003749-130ea5bddde3
github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d
@ -17,7 +18,7 @@ require (
github.com/google/gops v0.3.7
github.com/gorilla/mux v1.7.4
github.com/hashicorp/go-msgpack v0.5.5 // indirect
github.com/hashicorp/golang-lru v0.5.4
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/hashicorp/memberlist v0.1.4 // indirect
github.com/hashicorp/serf v0.8.2
github.com/inconshreveable/go-update v0.0.0-20160112193335-8152e7eb6ccf

2
go.sum
View file

@ -42,6 +42,8 @@ github.com/beorn7/perks v1.0.0 h1:HWo1m869IqiPhD389kmkxeTalrjNbbJTC8LXupb+sl0=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84=
github.com/bluele/gcache v0.0.2 h1:WcbfdXICg7G/DGBh1PFfcirkWOQV+v077yF1pSy3DGw=
github.com/bluele/gcache v0.0.2/go.mod h1:m15KV+ECjptwSPxKhOhQoAFQVtUFjTVkc3H8o0t/fp0=
github.com/bparli/lfuda-go v0.3.1 h1:nO9Szo627RC8/z+R+MMPBItNwHCOonchmpjQuQi8jVY=
github.com/bparli/lfuda-go v0.3.1/go.mod h1:BR5a9lwlqRqnPhU3F5ojFK3VhTKg8iFVtJJKgZBQhAo=
github.com/bradfitz/go-smtpd v0.0.0-20170404230938-deb6d6237625/go.mod h1:HYsPBTaaSFSlLx/70C2HPIMNZpVV8+vt/A+FMnYP11g=

View file

@ -19,7 +19,7 @@ func TestDiskStore_Get(t *testing.T) {
defer os.RemoveAll(tmpDir)
d := NewDiskStore(tmpDir, 2)
hash := "1234567890"
hash := "f428b8265d65dad7f8ffa52922bba836404cbd62f3ecfe10adba6b444f8f658938e54f5981ac4de39644d5b93d89a94b"
data := []byte("oyuntyausntoyaunpdoyruoyduanrstjwfjyuwf")
expectedPath := path.Join(tmpDir, hash[:2], hash)

View file

@ -3,12 +3,13 @@ package store
import (
"time"
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/stream"
"github.com/lbryio/reflector.go/internal/metrics"
"github.com/lbryio/reflector.go/shared"
golru "github.com/hashicorp/golang-lru"
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/stream"
"github.com/bluele/gcache"
"github.com/sirupsen/logrus"
)
@ -17,7 +18,7 @@ type LRUStore struct {
// underlying store
store BlobStore
// lru implementation
lru *golru.Cache
lru gcache.Cache
}
// NewLRUStore initialize a new LRUStore
@ -25,20 +26,14 @@ func NewLRUStore(component string, store BlobStore, maxItems int) *LRUStore {
l := &LRUStore{
store: store,
}
lru, err := golru.NewWithEvict(maxItems, func(key interface{}, value interface{}) {
l.lru = gcache.New(maxItems).ARC().EvictedFunc(func(key, value interface{}) {
metrics.CacheLRUEvictCount.With(metrics.CacheLabels(l.Name(), component)).Inc()
_ = store.Delete(key.(string)) // TODO: log this error. may happen if underlying entry is gone but cache entry still there
})
if err != nil {
panic(err)
}
l.lru = lru
_ = store.Delete(key.(string))
}).Build()
go func() {
if lstr, ok := store.(lister); ok {
err = l.loadExisting(lstr, maxItems)
err := l.loadExisting(lstr, maxItems)
if err != nil {
panic(err) // TODO: what should happen here? panic? return nil? just keep going?
}
@ -55,14 +50,14 @@ func (l *LRUStore) Name() string {
// Has returns whether the blob is in the store, without updating the recent-ness.
func (l *LRUStore) Has(hash string) (bool, error) {
return l.lru.Contains(hash), nil
return l.lru.Has(hash), nil
}
// Get returns the blob or an error if the blob doesn't exist.
func (l *LRUStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) {
start := time.Now()
_, has := l.lru.Get(hash)
if !has {
_, err := l.lru.Get(hash)
if err != nil {
return nil, shared.NewBlobTrace(time.Since(start), l.Name()), errors.Err(ErrBlobNotFound)
}
blob, stack, err := l.store.Get(hash)
@ -80,7 +75,7 @@ func (l *LRUStore) Put(hash string, blob stream.Blob) error {
return err
}
l.lru.Add(hash, true)
l.lru.Set(hash, true)
return nil
}
@ -91,7 +86,7 @@ func (l *LRUStore) PutSD(hash string, blob stream.Blob) error {
return err
}
l.lru.Add(hash, true)
_ = l.lru.Set(hash, true)
return nil
}
@ -119,7 +114,7 @@ func (l *LRUStore) loadExisting(store lister, maxItems int) error {
logrus.Infof("read %d files from disk", len(existing))
added := 0
for _, h := range existing {
l.lru.Add(h, true)
l.lru.Set(h, true)
added++
if maxItems > 0 && added >= maxItems { // underlying cache is bigger than LRU cache
break

View file

@ -87,7 +87,7 @@ func TestLRUStore_UnderlyingBlobMissing(t *testing.T) {
require.NoError(t, err)
// hash still exists in lru
assert.True(t, lru.lru.Contains(hash))
assert.True(t, lru.lru.Has(hash))
blob, _, err := lru.Get(hash)
assert.Nil(t, blob)
@ -96,7 +96,7 @@ func TestLRUStore_UnderlyingBlobMissing(t *testing.T) {
reflect.TypeOf(err).String(), err.Error())
// lru.Get() removes hash if underlying store doesn't have it
assert.False(t, lru.lru.Contains(hash))
assert.False(t, lru.lru.Has(hash))
}
func TestLRUStore_loadExisting(t *testing.T) {