when reflecting an sdblob, insert all the stream and intermediate blobs using a transaction #50

Closed
shyba wants to merge 39 commits from insert_under_tx into master
7 changed files with 59 additions and 36 deletions
Showing only changes of commit c4084eeb68 - Show all commits

View file

@ -254,6 +254,11 @@ func diskCacheParams(diskParams string) (int, string) {
} }
func cleanOldestBlobs(maxItems int, db *db.SQL, store store.BlobStore, stopper *stop.Group) { func cleanOldestBlobs(maxItems int, db *db.SQL, store store.BlobStore, stopper *stop.Group) {
// this is so that it runs on startup without having to wait for 10 minutes
err := doClean(maxItems, db, store, stopper)
if err != nil {
log.Error(errors.FullTrace(err))
}
const cleanupInterval = 10 * time.Minute const cleanupInterval = 10 * time.Minute
for { for {
select { select {
@ -281,19 +286,38 @@ func doClean(maxItems int, db *db.SQL, store store.BlobStore, stopper *stop.Grou
if err != nil { if err != nil {
return err return err
} }
blobsChan := make(chan string, len(blobs))
for _, hash := range blobs { wg := &stop.Group{}
select { go func() {
case <-stopper.Ch(): for _, hash := range blobs {
return nil select {
default: case <-stopper.Ch():
} return
default:
err = store.Delete(hash) }
if err != nil { blobsChan <- hash
return err
} }
close(blobsChan)
}()
for i := 0; i < 3; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for h := range blobsChan {
select {
case <-stopper.Ch():
return
default:
}
err = store.Delete(h)
if err != nil {
log.Errorf("error pruning %s: %s", h, errors.FullTrace(err))
continue
}
}
}()
} }
wg.Wait()
} }
return nil return nil
} }

View file

@ -781,7 +781,8 @@ CREATE TABLE blob_ (
last_accessed_at TIMESTAMP NULL DEFAULT NULL, last_accessed_at TIMESTAMP NULL DEFAULT NULL,
PRIMARY KEY (id), PRIMARY KEY (id),
UNIQUE KEY blob_hash_idx (hash), UNIQUE KEY blob_hash_idx (hash),
KEY `blob_last_accessed_idx` (`last_accessed_at`) KEY `blob_last_accessed_idx` (`last_accessed_at`),
KEY `is_stored_idx` (`is_stored`)
); );
CREATE TABLE stream ( CREATE TABLE stream (

3
go.mod
View file

@ -7,6 +7,7 @@ replace github.com/btcsuite/btcd => github.com/lbryio/lbrycrd.go v0.0.0-20200203
require ( require (
github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878 // indirect github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878 // indirect
github.com/aws/aws-sdk-go v1.16.11 github.com/aws/aws-sdk-go v1.16.11
github.com/bluele/gcache v0.0.2
github.com/bparli/lfuda-go v0.3.1 github.com/bparli/lfuda-go v0.3.1
github.com/btcsuite/btcd v0.0.0-20190824003749-130ea5bddde3 github.com/btcsuite/btcd v0.0.0-20190824003749-130ea5bddde3
github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d
@ -17,7 +18,7 @@ require (
github.com/google/gops v0.3.7 github.com/google/gops v0.3.7
github.com/gorilla/mux v1.7.4 github.com/gorilla/mux v1.7.4
github.com/hashicorp/go-msgpack v0.5.5 // indirect github.com/hashicorp/go-msgpack v0.5.5 // indirect
github.com/hashicorp/golang-lru v0.5.4 github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/hashicorp/memberlist v0.1.4 // indirect github.com/hashicorp/memberlist v0.1.4 // indirect
github.com/hashicorp/serf v0.8.2 github.com/hashicorp/serf v0.8.2
github.com/inconshreveable/go-update v0.0.0-20160112193335-8152e7eb6ccf github.com/inconshreveable/go-update v0.0.0-20160112193335-8152e7eb6ccf

2
go.sum
View file

@ -42,6 +42,8 @@ github.com/beorn7/perks v1.0.0 h1:HWo1m869IqiPhD389kmkxeTalrjNbbJTC8LXupb+sl0=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84= github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84=
github.com/bluele/gcache v0.0.2 h1:WcbfdXICg7G/DGBh1PFfcirkWOQV+v077yF1pSy3DGw=
github.com/bluele/gcache v0.0.2/go.mod h1:m15KV+ECjptwSPxKhOhQoAFQVtUFjTVkc3H8o0t/fp0=
github.com/bparli/lfuda-go v0.3.1 h1:nO9Szo627RC8/z+R+MMPBItNwHCOonchmpjQuQi8jVY= github.com/bparli/lfuda-go v0.3.1 h1:nO9Szo627RC8/z+R+MMPBItNwHCOonchmpjQuQi8jVY=
github.com/bparli/lfuda-go v0.3.1/go.mod h1:BR5a9lwlqRqnPhU3F5ojFK3VhTKg8iFVtJJKgZBQhAo= github.com/bparli/lfuda-go v0.3.1/go.mod h1:BR5a9lwlqRqnPhU3F5ojFK3VhTKg8iFVtJJKgZBQhAo=
github.com/bradfitz/go-smtpd v0.0.0-20170404230938-deb6d6237625/go.mod h1:HYsPBTaaSFSlLx/70C2HPIMNZpVV8+vt/A+FMnYP11g= github.com/bradfitz/go-smtpd v0.0.0-20170404230938-deb6d6237625/go.mod h1:HYsPBTaaSFSlLx/70C2HPIMNZpVV8+vt/A+FMnYP11g=

View file

@ -19,7 +19,7 @@ func TestDiskStore_Get(t *testing.T) {
defer os.RemoveAll(tmpDir) defer os.RemoveAll(tmpDir)
d := NewDiskStore(tmpDir, 2) d := NewDiskStore(tmpDir, 2)
hash := "1234567890" hash := "f428b8265d65dad7f8ffa52922bba836404cbd62f3ecfe10adba6b444f8f658938e54f5981ac4de39644d5b93d89a94b"
data := []byte("oyuntyausntoyaunpdoyruoyduanrstjwfjyuwf") data := []byte("oyuntyausntoyaunpdoyruoyduanrstjwfjyuwf")
expectedPath := path.Join(tmpDir, hash[:2], hash) expectedPath := path.Join(tmpDir, hash[:2], hash)

View file

@ -3,12 +3,13 @@ package store
import ( import (
"time" "time"
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/stream"
"github.com/lbryio/reflector.go/internal/metrics" "github.com/lbryio/reflector.go/internal/metrics"
"github.com/lbryio/reflector.go/shared" "github.com/lbryio/reflector.go/shared"
golru "github.com/hashicorp/golang-lru" "github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/stream"
"github.com/bluele/gcache"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
@ -17,7 +18,7 @@ type LRUStore struct {
// underlying store // underlying store
store BlobStore store BlobStore
// lru implementation // lru implementation
lru *golru.Cache lru gcache.Cache
} }
// NewLRUStore initialize a new LRUStore // NewLRUStore initialize a new LRUStore
@ -25,20 +26,14 @@ func NewLRUStore(component string, store BlobStore, maxItems int) *LRUStore {
l := &LRUStore{ l := &LRUStore{
store: store, store: store,
} }
l.lru = gcache.New(maxItems).ARC().EvictedFunc(func(key, value interface{}) {
lru, err := golru.NewWithEvict(maxItems, func(key interface{}, value interface{}) {
metrics.CacheLRUEvictCount.With(metrics.CacheLabels(l.Name(), component)).Inc() metrics.CacheLRUEvictCount.With(metrics.CacheLabels(l.Name(), component)).Inc()
_ = store.Delete(key.(string)) // TODO: log this error. may happen if underlying entry is gone but cache entry still there _ = store.Delete(key.(string))
}) }).Build()
if err != nil {
panic(err)
}
l.lru = lru
go func() { go func() {
if lstr, ok := store.(lister); ok { if lstr, ok := store.(lister); ok {
err = l.loadExisting(lstr, maxItems) err := l.loadExisting(lstr, maxItems)
if err != nil { if err != nil {
panic(err) // TODO: what should happen here? panic? return nil? just keep going? panic(err) // TODO: what should happen here? panic? return nil? just keep going?
} }
@ -55,14 +50,14 @@ func (l *LRUStore) Name() string {
// Has returns whether the blob is in the store, without updating the recent-ness. // Has returns whether the blob is in the store, without updating the recent-ness.
func (l *LRUStore) Has(hash string) (bool, error) { func (l *LRUStore) Has(hash string) (bool, error) {
return l.lru.Contains(hash), nil return l.lru.Has(hash), nil
} }
// Get returns the blob or an error if the blob doesn't exist. // Get returns the blob or an error if the blob doesn't exist.
func (l *LRUStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) { func (l *LRUStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) {
start := time.Now() start := time.Now()
_, has := l.lru.Get(hash) _, err := l.lru.Get(hash)
if !has { if err != nil {
return nil, shared.NewBlobTrace(time.Since(start), l.Name()), errors.Err(ErrBlobNotFound) return nil, shared.NewBlobTrace(time.Since(start), l.Name()), errors.Err(ErrBlobNotFound)
} }
blob, stack, err := l.store.Get(hash) blob, stack, err := l.store.Get(hash)
@ -80,7 +75,7 @@ func (l *LRUStore) Put(hash string, blob stream.Blob) error {
return err return err
} }
l.lru.Add(hash, true) l.lru.Set(hash, true)
return nil return nil
} }
@ -91,7 +86,7 @@ func (l *LRUStore) PutSD(hash string, blob stream.Blob) error {
return err return err
} }
l.lru.Add(hash, true) _ = l.lru.Set(hash, true)
return nil return nil
} }
@ -119,7 +114,7 @@ func (l *LRUStore) loadExisting(store lister, maxItems int) error {
logrus.Infof("read %d files from disk", len(existing)) logrus.Infof("read %d files from disk", len(existing))
added := 0 added := 0
for _, h := range existing { for _, h := range existing {
l.lru.Add(h, true) l.lru.Set(h, true)
added++ added++
if maxItems > 0 && added >= maxItems { // underlying cache is bigger than LRU cache if maxItems > 0 && added >= maxItems { // underlying cache is bigger than LRU cache
break break

View file

@ -87,7 +87,7 @@ func TestLRUStore_UnderlyingBlobMissing(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
// hash still exists in lru // hash still exists in lru
assert.True(t, lru.lru.Contains(hash)) assert.True(t, lru.lru.Has(hash))
blob, _, err := lru.Get(hash) blob, _, err := lru.Get(hash)
assert.Nil(t, blob) assert.Nil(t, blob)
@ -96,7 +96,7 @@ func TestLRUStore_UnderlyingBlobMissing(t *testing.T) {
reflect.TypeOf(err).String(), err.Error()) reflect.TypeOf(err).String(), err.Error())
// lru.Get() removes hash if underlying store doesn't have it // lru.Get() removes hash if underlying store doesn't have it
assert.False(t, lru.lru.Contains(hash)) assert.False(t, lru.lru.Has(hash))
} }
func TestLRUStore_loadExisting(t *testing.T) { func TestLRUStore_loadExisting(t *testing.T) {