From 69fa06420bd9fb5be8e25a6ee7f3d147c7caece1 Mon Sep 17 00:00:00 2001 From: Alex Grintsvayg Date: Wed, 21 Oct 2020 17:31:15 -0400 Subject: [PATCH 01/13] LRU cache for disk store, abstract fs in disk store for testing --- cmd/getstream.go | 2 +- cmd/reflector.go | 32 ++++---- go.mod | 5 +- go.sum | 9 +++ internal/metrics/metrics.go | 15 ++-- store/disk.go | 155 +++++++++++++----------------------- store/disk_test.go | 109 +++++++++++++++++++++++++ 7 files changed, 208 insertions(+), 119 deletions(-) create mode 100644 store/disk_test.go diff --git a/cmd/getstream.go b/cmd/getstream.go index f3f6d8f..355824c 100644 --- a/cmd/getstream.go +++ b/cmd/getstream.go @@ -30,7 +30,7 @@ func getStreamCmd(cmd *cobra.Command, args []string) { s := store.NewCachingBlobStore( peer.NewStore(peer.StoreOpts{Address: addr}), - store.NewDiskBlobStore("/tmp/lbry_downloaded_blobs", 2), + store.NewDiskBlobStore("/tmp/lbry_downloaded_blobs", 1000, 2), ) wd, err := os.Getwd() diff --git a/cmd/reflector.go b/cmd/reflector.go index f071f60..7a97e72 100644 --- a/cmd/reflector.go +++ b/cmd/reflector.go @@ -19,18 +19,21 @@ import ( "github.com/spf13/cobra" ) -var reflectorCmdCacheDir string -var tcpPeerPort int -var http3PeerPort int -var receiverPort int -var metricsPort int -var disableUploads bool -var disableBlocklist bool -var proxyAddress string -var proxyPort string -var proxyProtocol string -var useDB bool -var cloudFrontEndpoint string +var ( + tcpPeerPort int + http3PeerPort int + receiverPort int + metricsPort int + disableUploads bool + disableBlocklist bool + proxyAddress string + proxyPort string + proxyProtocol string + useDB bool + cloudFrontEndpoint string + reflectorCmdCacheDir string + reflectorCmdCacheMaxBlobs int +) func init() { var cmd = &cobra.Command{ @@ -38,7 +41,6 @@ func init() { Short: "Run reflector server", Run: reflectorCmd, } - cmd.Flags().StringVar(&reflectorCmdCacheDir, "cache", "", "if specified, the path where blobs should be cached (disabled when left empty)") cmd.Flags().StringVar(&proxyAddress, "proxy-address", "", "address of another reflector server where blobs are fetched from") cmd.Flags().StringVar(&proxyPort, "proxy-port", "5567", "port of another reflector server where blobs are fetched from") cmd.Flags().StringVar(&proxyProtocol, "proxy-protocol", "http3", "protocol used to fetch blobs from another reflector server (tcp/http3)") @@ -50,6 +52,8 @@ func init() { cmd.Flags().BoolVar(&disableUploads, "disable-uploads", false, "Disable uploads to this reflector server") cmd.Flags().BoolVar(&disableBlocklist, "disable-blocklist", false, "Disable blocklist watching/updating") cmd.Flags().BoolVar(&useDB, "use-db", true, "whether to connect to the reflector db or not") + cmd.Flags().StringVar(&reflectorCmdCacheDir, "cache", "", "if specified, the path where blobs should be cached (disabled when left empty)") + cmd.Flags().IntVar(&reflectorCmdCacheMaxBlobs, "cache-max-blobs", 0, "if cache is enabled, this option sets the max blobs the cache will hold") rootCmd.AddCommand(cmd) } @@ -113,7 +117,7 @@ func reflectorCmd(cmd *cobra.Command, args []string) { if err != nil { log.Fatal(err) } - blobStore = store.NewCachingBlobStore(blobStore, store.NewDiskBlobStore(reflectorCmdCacheDir, 2)) + blobStore = store.NewCachingBlobStore(blobStore, store.NewDiskBlobStore(reflectorCmdCacheDir, reflectorCmdCacheMaxBlobs, 2)) } peerServer := peer.NewServer(blobStore) diff --git a/go.mod b/go.mod index 4002591..b7de5f7 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/google/gops 
v0.3.7 github.com/gorilla/mux v1.7.4 github.com/hashicorp/go-msgpack v0.5.5 // indirect - github.com/hashicorp/golang-lru v0.5.3 // indirect + github.com/hashicorp/golang-lru v0.5.4 github.com/hashicorp/memberlist v0.1.4 // indirect github.com/hashicorp/serf v0.8.2 github.com/inconshreveable/go-update v0.0.0-20160112193335-8152e7eb6ccf @@ -27,9 +27,11 @@ require ( github.com/phayes/freeport v0.0.0-20171002185219-e27662a4a9d6 github.com/prometheus/client_golang v0.9.2 github.com/sirupsen/logrus v1.4.2 + github.com/spf13/afero v1.4.1 github.com/spf13/cast v1.3.0 github.com/spf13/cobra v0.0.3 github.com/spf13/pflag v1.0.3 // indirect + github.com/stretchr/testify v1.4.0 github.com/volatiletech/null v8.0.0+incompatible go.uber.org/atomic v1.5.1 golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 @@ -37,6 +39,7 @@ require ( golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e golang.org/x/tools v0.0.0-20191227053925-7b8e75db28f4 // indirect google.golang.org/appengine v1.6.2 // indirect + gotest.tools v2.2.0+incompatible ) go 1.15 diff --git a/go.sum b/go.sum index 4e9282a..01d1580 100644 --- a/go.sum +++ b/go.sum @@ -152,6 +152,8 @@ github.com/hashicorp/go.net v0.0.1/go.mod h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.3 h1:YPkqC67at8FYaadspW/6uE0COsBxS2656RLEr8Bppgk= github.com/hashicorp/golang-lru v0.5.3/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= +github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc= +github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= github.com/hashicorp/hcl v0.0.0-20180404174102-ef8a98b0bbce h1:xdsDDbiBDQTKASoGEZ+pEmF1OnWuu8AQ9I8iNbHNeno= github.com/hashicorp/hcl v0.0.0-20180404174102-ef8a98b0bbce/go.mod h1:oZtUIOe8dh44I2q6ScRibXws4Ajl+d+nod3AaR9vL5w= github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64= @@ -192,6 +194,7 @@ github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6 github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.2 h1:DB17ag19krx9CFsz4o3enTrPXyIXCl+2iCXH/aMAp9s= github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= @@ -280,6 +283,7 @@ github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/profile v1.3.0/go.mod h1:hJw3o1OdXxsrSjjVksARp5W95eeEaEfptyVZyv6JUPA= +github.com/pkg/sftp v1.10.1/go.mod h1:lYOWFsE0bwd1+KfKJaKeuokY15vzFx25BLbzYYoAxZI= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI= @@ -346,6 +350,8 @@ github.com/sourcegraph/annotate v0.0.0-20160123013949-f4cad6c6324d/go.mod h1:Udh 
github.com/sourcegraph/syntaxhighlight v0.0.0-20170531221838-bd320f5d308e/go.mod h1:HuIsMU8RRBOtsCgI77wP899iHVBQpCmg4ErYMZB+2IA= github.com/spf13/afero v1.1.1 h1:Lt3ihYMlE+lreX1GS4Qw4ZsNpYQLxIXKBTEOXm3nt6I= github.com/spf13/afero v1.1.1/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ= +github.com/spf13/afero v1.4.1 h1:asw9sl74539yqavKaglDM5hFpdJVK0Y5Dr/JOgQ89nQ= +github.com/spf13/afero v1.4.1/go.mod h1:Ai8FlHk4v/PARR026UzYexafAt9roJ7LcLMAmO6Z93I= github.com/spf13/cast v1.2.0/go.mod h1:r2rcYCSwa1IExKTDiTfzaxqT2FNHs8hODu4LnUfgKEg= github.com/spf13/cast v1.3.0 h1:oget//CVOEoFewqQxwr0Ej5yjygnqGkvggSE/gB35Q8= github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= @@ -394,6 +400,7 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk golang.org/x/crypto v0.0.0-20190313024323-a1f597ede03a/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20190820162420-60c769a6c586/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191002192127-34f69633bfdc/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200221231518-2aa609cf4a9d/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= @@ -471,6 +478,8 @@ golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= +golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190921001708-c4c64cad1fd0 h1:xQwXv67TxFo9nC1GJFyab5eq/5B590r6RlnL/G8Sz7w= diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index 4d70cd1..17ce4e0 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -92,6 +92,12 @@ const ( ) var ( + ErrorCount = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: ns, + Name: "error_total", + Help: "Total number of errors", + }, []string{labelDirection, labelErrorType}) + BlobDownloadCount = promauto.NewCounter(prometheus.CounterOpts{ Namespace: ns, Name: "blob_download_total", @@ -107,6 +113,7 @@ var ( Name: "http3_blob_download_total", Help: "Total number of blobs downloaded from reflector through QUIC protocol", }) + CacheHitCount = promauto.NewCounter(prometheus.CounterOpts{ Namespace: ns, Name: "cache_hit_total", @@ -128,6 +135,7 @@ var ( Name: "cache_waiting_requests_total", Help: "How many cache requests are waiting for an in-flight origin request", }) + BlobUploadCount = promauto.NewCounter(prometheus.CounterOpts{ Namespace: ns, Name: "blob_upload_total", @@ -138,16 +146,13 @@ var ( Name: "sdblob_upload_total", Help: "Total number of SD blobs (and therefore streams) uploaded to reflector", }) + 
RetrieverSpeed = promauto.NewGaugeVec(prometheus.GaugeOpts{ Namespace: ns, Name: "speed_mbps", Help: "Speed of blob retrieval", }, []string{MtrLabelSource}) - ErrorCount = promauto.NewCounterVec(prometheus.CounterOpts{ - Namespace: ns, - Name: "error_total", - Help: "Total number of errors", - }, []string{labelDirection, labelErrorType}) + MtrInBytesTcp = promauto.NewCounter(prometheus.CounterOpts{ Namespace: ns, Name: "tcp_in_bytes", diff --git a/store/disk.go b/store/disk.go index 82e40ac..5c4d2ff 100644 --- a/store/disk.go +++ b/store/disk.go @@ -5,32 +5,40 @@ import ( "os" "path" "path/filepath" - "sort" - "syscall" - "time" "github.com/lbryio/lbry.go/v2/extras/errors" "github.com/lbryio/lbry.go/v2/stream" + "github.com/spf13/afero" - log "github.com/sirupsen/logrus" + lru "github.com/hashicorp/golang-lru" ) // DiskBlobStore stores blobs on a local disk type DiskBlobStore struct { // the location of blobs on disk blobDir string + // max number of blobs to store + maxBlobs int // store files in subdirectories based on the first N chars in the filename. 0 = don't create subdirectories. prefixLength int - initialized bool - lastChecked time.Time - diskCleanupBusy chan bool + // lru cache + lru *lru.Cache + // filesystem abstraction + fs afero.Fs + + // true if initOnce ran, false otherwise + initialized bool } // NewDiskBlobStore returns an initialized file disk store pointer. -func NewDiskBlobStore(dir string, prefixLength int) *DiskBlobStore { - dbs := DiskBlobStore{blobDir: dir, prefixLength: prefixLength, diskCleanupBusy: make(chan bool, 1)} - dbs.diskCleanupBusy <- true +func NewDiskBlobStore(dir string, maxBlobs, prefixLength int) *DiskBlobStore { + dbs := DiskBlobStore{ + blobDir: dir, + maxBlobs: maxBlobs, + prefixLength: prefixLength, + fs: afero.NewOsFs(), + } return &dbs } @@ -41,27 +49,12 @@ func (d *DiskBlobStore) dir(hash string) string { return path.Join(d.blobDir, hash[:d.prefixLength]) } -// GetUsedSpace returns a value between 0 and 1, with 0 being completely empty and 1 being full, for the disk that holds the provided path -func (d *DiskBlobStore) getUsedSpace() (float32, error) { - var stat syscall.Statfs_t - err := syscall.Statfs(d.blobDir, &stat) - if err != nil { - return 0, err - } - // Available blocks * size per block = available space in bytes - all := stat.Blocks * uint64(stat.Bsize) - free := stat.Bfree * uint64(stat.Bsize) - used := all - free - - return float32(used) / float32(all), nil -} - func (d *DiskBlobStore) path(hash string) string { return path.Join(d.dir(hash), hash) } func (d *DiskBlobStore) ensureDirExists(dir string) error { - return errors.Err(os.MkdirAll(dir, 0755)) + return errors.Err(d.fs.MkdirAll(dir, 0755)) } func (d *DiskBlobStore) initOnce() error { @@ -74,6 +67,19 @@ func (d *DiskBlobStore) initOnce() error { return err } + l, err := lru.NewWithEvict(d.maxBlobs, func(key interface{}, value interface{}) { + _ = d.fs.Remove(d.path(key.(string))) // TODO: log this error. may happen if file is gone but cache entry still there? + }) + if err != nil { + return errors.Err(err) + } + d.lru = l + + err = d.loadExisting() + if err != nil { + return err + } + d.initialized = true return nil } @@ -85,14 +91,7 @@ func (d *DiskBlobStore) Has(hash string) (bool, error) { return false, err } - _, err = os.Stat(d.path(hash)) - if err != nil { - if os.IsNotExist(err) { - return false, nil - } - return false, err - } - return true, nil + return d.lru.Contains(hash), nil } // Get returns the blob or an error if the blob doesn't exist. 
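The hunk above delegates cleanup to hashicorp/golang-lru: the cache decides which key falls out, and the NewWithEvict callback is the one place a blob file gets deleted. A minimal standalone sketch of that callback pattern (the keys and capacity here are illustrative, not from the patch):

package main

import (
	"fmt"

	lru "github.com/hashicorp/golang-lru"
)

func main() {
	// The callback fires whenever an entry is evicted for capacity or
	// removed explicitly; the disk store's version calls fs.Remove() on
	// the blob path instead of printing.
	cache, err := lru.NewWithEvict(2, func(key interface{}, value interface{}) {
		fmt.Println("evicted:", key)
	})
	if err != nil {
		panic(err)
	}

	cache.Add("blob-a", true)
	cache.Add("blob-b", true)
	cache.Add("blob-c", true)             // over capacity: "blob-a" goes
	fmt.Println(cache.Contains("blob-a")) // false
}

This replaces the old Statfs-based cleanup elsewhere in this diff: instead of waking up when the disk passes 90% usage and deleting the 50000 oldest blobs by atime, the capacity limit is enforced on every Put.
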
@@ -102,9 +101,15 @@ func (d *DiskBlobStore) Get(hash string) (stream.Blob, error) { return nil, err } - file, err := os.Open(d.path(hash)) + _, has := d.lru.Get(hash) + if !has { + return nil, errors.Err(ErrBlobNotFound) + } + + file, err := d.fs.Open(d.path(hash)) if err != nil { if os.IsNotExist(err) { + d.lru.Remove(hash) return nil, errors.Err(ErrBlobNotFound) } return nil, err @@ -126,7 +131,14 @@ func (d *DiskBlobStore) Put(hash string, blob stream.Blob) error { return err } - return ioutil.WriteFile(d.path(hash), blob, 0644) + err = afero.WriteFile(d.fs, d.path(hash), blob, 0644) + if err != nil { + return errors.Err(err) + } + + d.lru.Add(hash, true) + + return nil } // PutSD stores the sd blob on the disk @@ -141,83 +153,30 @@ func (d *DiskBlobStore) Delete(hash string) error { return err } - has, err := d.Has(hash) + d.lru.Remove(hash) + return nil +} + +// loadExisting scans the blobDir and imports existing blobs into lru cache +func (d *DiskBlobStore) loadExisting() error { + dirs, err := afero.ReadDir(d.fs, d.blobDir) if err != nil { return err } - if !has { - return nil - } - return os.Remove(d.path(hash)) -} - -func (d *DiskBlobStore) ensureDiskSpace() { - defer func() { - d.lastChecked = time.Now() - d.diskCleanupBusy <- true - }() - - used, err := d.getUsedSpace() - if err != nil { - log.Errorln(err.Error()) - return - } - log.Infof("disk usage: %.2f%%\n", used*100) - if used > 0.90 { - log.Infoln("over 0.90, cleaning up") - err = d.WipeOldestBlobs() - if err != nil { - log.Errorln(err.Error()) - return - } - log.Infoln("Done cleaning up") - } -} - -func (d *DiskBlobStore) WipeOldestBlobs() (err error) { - dirs, err := ioutil.ReadDir(d.blobDir) - if err != nil { - return err - } - type datedFile struct { - Atime time.Time - File *os.FileInfo - FullPath string - } - datedFiles := make([]datedFile, 0, 5000) for _, dir := range dirs { if dir.IsDir() { - files, err := ioutil.ReadDir(filepath.Join(d.blobDir, dir.Name())) + files, err := afero.ReadDir(d.fs, filepath.Join(d.blobDir, dir.Name())) if err != nil { return err } for _, file := range files { if file.Mode().IsRegular() && !file.IsDir() { - datedFiles = append(datedFiles, datedFile{ - Atime: atime(file), - File: &file, - FullPath: filepath.Join(d.blobDir, dir.Name(), file.Name()), - }) + d.lru.Add(file.Name(), true) } } } } - sort.Slice(datedFiles, func(i, j int) bool { - return datedFiles[i].Atime.Before(datedFiles[j].Atime) - }) - //delete the first 50000 blobs - for i, df := range datedFiles { - if i >= 50000 { - break - } - log.Infoln(df.FullPath) - log.Infoln(df.Atime.String()) - err = os.Remove(df.FullPath) - if err != nil { - return err - } - } return nil } diff --git a/store/disk_test.go b/store/disk_test.go new file mode 100644 index 0000000..d4f6a80 --- /dev/null +++ b/store/disk_test.go @@ -0,0 +1,109 @@ +package store + +import ( + "os" + "reflect" + "testing" + + "github.com/lbryio/lbry.go/v2/extras/errors" + + "github.com/spf13/afero" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +const cacheMaxBlobs = 3 + +func memDiskStore() *DiskBlobStore { + d := NewDiskBlobStore("/", cacheMaxBlobs, 2) + d.fs = afero.NewMemMapFs() + return d +} + +func countOnDisk(t *testing.T, fs afero.Fs) int { + t.Helper() + count := 0 + afero.Walk(fs, "/", func(path string, info os.FileInfo, err error) error { + if err != nil { + t.Fatal(err) + } + if !info.IsDir() { + count++ + } + return nil + }) + return count +} + +func TestDiskBlobStore_LRU(t *testing.T) { + d := memDiskStore() + b := 
[]byte("x") + err := d.Put("one", b) + require.NoError(t, err) + err = d.Put("two", b) + require.NoError(t, err) + err = d.Put("three", b) + require.NoError(t, err) + err = d.Put("four", b) + require.NoError(t, err) + err = d.Put("five", b) + require.NoError(t, err) + + assert.Equal(t, cacheMaxBlobs, countOnDisk(t, d.fs)) + + for k, v := range map[string]bool{ + "one": false, + "two": false, + "three": true, + "four": true, + "five": true, + "six": false, + } { + has, err := d.Has(k) + assert.NoError(t, err) + assert.Equal(t, v, has) + } + + d.Get("three") // touch so it stays in cache + d.Put("six", b) + + assert.Equal(t, cacheMaxBlobs, countOnDisk(t, d.fs)) + + for k, v := range map[string]bool{ + "one": false, + "two": false, + "three": true, + "four": false, + "five": true, + "six": true, + } { + has, err := d.Has(k) + assert.NoError(t, err) + assert.Equal(t, v, has) + } + + err = d.Delete("three") + assert.NoError(t, err) + err = d.Delete("five") + assert.NoError(t, err) + err = d.Delete("six") + assert.NoError(t, err) + assert.Equal(t, 0, countOnDisk(t, d.fs)) +} + +func TestDiskBlobStore_FileMissingOnDisk(t *testing.T) { + d := memDiskStore() + hash := "hash" + b := []byte("this is a blob of stuff") + err := d.Put(hash, b) + require.NoError(t, err) + + err = d.fs.Remove("/ha/hash") + require.NoError(t, err) + + blob, err := d.Get(hash) + assert.Nil(t, blob) + assert.True(t, errors.Is(err, ErrBlobNotFound), "expected (%s) %s, got (%s) %s", + reflect.TypeOf(ErrBlobNotFound).String(), ErrBlobNotFound.Error(), + reflect.TypeOf(err).String(), err.Error()) +} -- 2.45.3 From c6b53792c8cf3c327532e1d6831200330622b256 Mon Sep 17 00:00:00 2001 From: Alex Grintsvayg Date: Thu, 22 Oct 2020 12:18:31 -0400 Subject: [PATCH 02/13] separate disk and lru behavior --- store/disk.go | 82 +++++++++----------- store/lru.go | 113 ++++++++++++++++++++++++++++ store/{disk_test.go => lru_test.go} | 66 +++++++++------- store/store.go | 6 ++ 4 files changed, 192 insertions(+), 75 deletions(-) create mode 100644 store/lru.go rename store/{disk_test.go => lru_test.go} (52%) diff --git a/store/disk.go b/store/disk.go index 5c4d2ff..dfbc0f2 100644 --- a/store/disk.go +++ b/store/disk.go @@ -8,22 +8,17 @@ import ( "github.com/lbryio/lbry.go/v2/extras/errors" "github.com/lbryio/lbry.go/v2/stream" - "github.com/spf13/afero" - lru "github.com/hashicorp/golang-lru" + "github.com/spf13/afero" ) // DiskBlobStore stores blobs on a local disk type DiskBlobStore struct { // the location of blobs on disk blobDir string - // max number of blobs to store - maxBlobs int // store files in subdirectories based on the first N chars in the filename. 0 = don't create subdirectories. prefixLength int - // lru cache - lru *lru.Cache // filesystem abstraction fs afero.Fs @@ -32,14 +27,12 @@ type DiskBlobStore struct { } // NewDiskBlobStore returns an initialized file disk store pointer. -func NewDiskBlobStore(dir string, maxBlobs, prefixLength int) *DiskBlobStore { - dbs := DiskBlobStore{ +func NewDiskBlobStore(dir string, prefixLength int) *DiskBlobStore { + return &DiskBlobStore{ blobDir: dir, - maxBlobs: maxBlobs, prefixLength: prefixLength, fs: afero.NewOsFs(), } - return &dbs } func (d *DiskBlobStore) dir(hash string) string { @@ -67,19 +60,6 @@ func (d *DiskBlobStore) initOnce() error { return err } - l, err := lru.NewWithEvict(d.maxBlobs, func(key interface{}, value interface{}) { - _ = d.fs.Remove(d.path(key.(string))) // TODO: log this error. may happen if file is gone but cache entry still there? 
-	})
-	if err != nil {
-		return errors.Err(err)
-	}
-	d.lru = l
-
-	err = d.loadExisting()
-	if err != nil {
-		return err
-	}
-
 	d.initialized = true
 	return nil
 }
@@ -91,7 +71,14 @@ func (d *DiskBlobStore) Has(hash string) (bool, error) {
 		return false, err
 	}
 
-	return d.lru.Contains(hash), nil
+	_, err = d.fs.Stat(d.path(hash))
+	if err != nil {
+		if os.IsNotExist(err) {
+			return false, nil
+		}
+		return false, errors.Err(err)
+	}
+	return true, nil
 }
 
 // Get returns the blob or an error if the blob doesn't exist.
@@ -101,22 +88,17 @@ func (d *DiskBlobStore) Get(hash string) (stream.Blob, error) {
 		return nil, err
 	}
 
-	_, has := d.lru.Get(hash)
-	if !has {
-		return nil, errors.Err(ErrBlobNotFound)
-	}
-
 	file, err := d.fs.Open(d.path(hash))
 	if err != nil {
 		if os.IsNotExist(err) {
-			d.lru.Remove(hash)
 			return nil, errors.Err(ErrBlobNotFound)
 		}
 		return nil, err
 	}
 	defer file.Close()
 
-	return ioutil.ReadAll(file)
+	blob, err := ioutil.ReadAll(file)
+	return blob, errors.Err(err)
 }
 
 // Put stores the blob on disk
@@ -132,13 +114,7 @@ func (d *DiskBlobStore) Put(hash string, blob stream.Blob) error {
 	}
 
 	err = afero.WriteFile(d.fs, d.path(hash), blob, 0644)
-	if err != nil {
-		return errors.Err(err)
-	}
-
-	d.lru.Add(hash, true)
-
-	return nil
+	return errors.Err(err)
 }
 
 // PutSD stores the sd blob on the disk
@@ -153,30 +129,40 @@ func (d *DiskBlobStore) Delete(hash string) error {
 		return err
 	}
 
-	d.lru.Remove(hash)
-	return nil
-}
-
-// loadExisting scans the blobDir and imports existing blobs into lru cache
-func (d *DiskBlobStore) loadExisting() error {
-	dirs, err := afero.ReadDir(d.fs, d.blobDir)
+	has, err := d.Has(hash)
 	if err != nil {
 		return err
 	}
+	if !has {
+		return nil
+	}
+
+	err = d.fs.Remove(d.path(hash))
+	return errors.Err(err)
+}
+
+// list returns a slice of blobs that already exist in the blobDir
+func (d *DiskBlobStore) list() ([]string, error) {
+	dirs, err := afero.ReadDir(d.fs, d.blobDir)
+	if err != nil {
+		return nil, err
+	}
+
+	var existing []string
 	for _, dir := range dirs {
 		if dir.IsDir() {
 			files, err := afero.ReadDir(d.fs, filepath.Join(d.blobDir, dir.Name()))
 			if err != nil {
-				return err
+				return nil, err
 			}
 			for _, file := range files {
 				if file.Mode().IsRegular() && !file.IsDir() {
-					d.lru.Add(file.Name(), true)
+					existing = append(existing, file.Name())
 				}
 			}
 		}
 	}
 
-	return nil
+	return existing, nil
 }
diff --git a/store/lru.go b/store/lru.go
new file mode 100644
index 0000000..22386fa
--- /dev/null
+++ b/store/lru.go
@@ -0,0 +1,113 @@
+package store
+
+import (
+	"github.com/lbryio/lbry.go/v2/extras/errors"
+	"github.com/lbryio/lbry.go/v2/stream"
+
+	golru "github.com/hashicorp/golang-lru"
+)
+
+// LRUStore adds a max cache size and LRU eviction to a BlobStore
+type LRUStore struct {
+	// underlying store
+	store BlobStore
+	// lru implementation
+	lru *golru.Cache
+}
+
+// NewLRUStore initializes a new LRUStore
+func NewLRUStore(store BlobStore, maxItems int) *LRUStore {
+	lru, err := golru.NewWithEvict(maxItems, func(key interface{}, value interface{}) {
+		_ = store.Delete(key.(string)) // TODO: log this error. may happen if underlying entry is gone but cache entry still there
+	})
+	if err != nil {
+		panic(err)
+	}
+
+	l := &LRUStore{
+		store: store,
+		lru:   lru,
+	}
+
+	if lstr, ok := store.(lister); ok {
+		err = l.loadExisting(lstr, maxItems)
+		if err != nil {
+			panic(err) // TODO: what should happen here? panic? return nil? just keep going?
+		}
+	}
+
+	return l
+}
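With eviction pulled out of the disk store, composition does the work: the disk store is plain I/O again and the LRU wrapper owns the size limit. Roughly how the two snap together using this patch's constructors (the path and sizes below are made up for illustration):

package main

import "github.com/lbryio/reflector.go/store"

func main() {
	// Plain disk I/O underneath; the LRU wrapper enforces the blob count.
	// Evicted entries are deleted from disk via the onEvict callback, and
	// blobs already under /blobs are primed into the LRU because
	// DiskBlobStore implements the lister interface.
	disk := store.NewDiskBlobStore("/blobs", 2)
	cache := store.NewLRUStore(disk, 5000)

	_ = cache.Put("somehash", []byte("blob data")) // written to disk, tracked in LRU
}

+
+// Has returns whether the blob is in the store, without updating the recent-ness.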
+func (l *LRUStore) Has(hash string) (bool, error) { + return l.lru.Contains(hash), nil +} + +// Get returns the blob or an error if the blob doesn't exist. +func (l *LRUStore) Get(hash string) (stream.Blob, error) { + _, has := l.lru.Get(hash) + if !has { + return nil, errors.Err(ErrBlobNotFound) + } + blob, err := l.store.Get(hash) + if errors.Is(err, ErrBlobNotFound) { + // Blob disappeared from underlying store + l.lru.Remove(hash) + } + return blob, err +} + +// Put stores the blob +func (l *LRUStore) Put(hash string, blob stream.Blob) error { + err := l.store.Put(hash, blob) + if err != nil { + return err + } + + l.lru.Add(hash, true) + return nil +} + +// PutSD stores the sd blob +func (l *LRUStore) PutSD(hash string, blob stream.Blob) error { + err := l.store.PutSD(hash, blob) + if err != nil { + return err + } + + l.lru.Add(hash, true) + return nil +} + +// Delete deletes the blob from the store +func (l *LRUStore) Delete(hash string) error { + err := l.store.Delete(hash) + if err != nil { + return err + } + + // This must come after store.Delete() + // Remove triggers onEvict function, which also tries to delete blob from store + // We need to delete it manually first so any errors can be propagated up + l.lru.Remove(hash) + return nil +} + +// loadExisting imports existing blobs from the underlying store into the LRU cache +func (l *LRUStore) loadExisting(store lister, maxItems int) error { + existing, err := store.list() + if err != nil { + return err + } + + added := 0 + for _, h := range existing { + l.lru.Add(h, true) + added++ + if maxItems > 0 && added >= maxItems { // underlying cache is bigger than LRU cache + break + } + } + return nil +} diff --git a/store/disk_test.go b/store/lru_test.go similarity index 52% rename from store/disk_test.go rename to store/lru_test.go index d4f6a80..92ddb7d 100644 --- a/store/disk_test.go +++ b/store/lru_test.go @@ -14,16 +14,17 @@ import ( const cacheMaxBlobs = 3 -func memDiskStore() *DiskBlobStore { - d := NewDiskBlobStore("/", cacheMaxBlobs, 2) +func testLRUStore() (*LRUStore, *DiskBlobStore) { + d := NewDiskBlobStore("/", 2) d.fs = afero.NewMemMapFs() - return d + return NewLRUStore(d, 3), d } -func countOnDisk(t *testing.T, fs afero.Fs) int { +func countOnDisk(t *testing.T, disk *DiskBlobStore) int { t.Helper() + count := 0 - afero.Walk(fs, "/", func(path string, info os.FileInfo, err error) error { + afero.Walk(disk.fs, "/", func(path string, info os.FileInfo, err error) error { if err != nil { t.Fatal(err) } @@ -32,24 +33,29 @@ func countOnDisk(t *testing.T, fs afero.Fs) int { } return nil }) + + list, err := disk.list() + require.NoError(t, err) + require.Equal(t, count, len(list)) + return count } -func TestDiskBlobStore_LRU(t *testing.T) { - d := memDiskStore() +func TestLRUStore_Eviction(t *testing.T) { + lru, disk := testLRUStore() b := []byte("x") - err := d.Put("one", b) + err := lru.Put("one", b) require.NoError(t, err) - err = d.Put("two", b) + err = lru.Put("two", b) require.NoError(t, err) - err = d.Put("three", b) + err = lru.Put("three", b) require.NoError(t, err) - err = d.Put("four", b) + err = lru.Put("four", b) require.NoError(t, err) - err = d.Put("five", b) + err = lru.Put("five", b) require.NoError(t, err) - assert.Equal(t, cacheMaxBlobs, countOnDisk(t, d.fs)) + assert.Equal(t, cacheMaxBlobs, countOnDisk(t, disk)) for k, v := range map[string]bool{ "one": false, @@ -59,15 +65,15 @@ func TestDiskBlobStore_LRU(t *testing.T) { "five": true, "six": false, } { - has, err := d.Has(k) + has, err := lru.Has(k) 
assert.NoError(t, err) assert.Equal(t, v, has) } - d.Get("three") // touch so it stays in cache - d.Put("six", b) + lru.Get("three") // touch so it stays in cache + lru.Put("six", b) - assert.Equal(t, cacheMaxBlobs, countOnDisk(t, d.fs)) + assert.Equal(t, cacheMaxBlobs, countOnDisk(t, disk)) for k, v := range map[string]bool{ "one": false, @@ -77,33 +83,39 @@ func TestDiskBlobStore_LRU(t *testing.T) { "five": true, "six": true, } { - has, err := d.Has(k) + has, err := lru.Has(k) assert.NoError(t, err) assert.Equal(t, v, has) } - err = d.Delete("three") + err = lru.Delete("three") assert.NoError(t, err) - err = d.Delete("five") + err = lru.Delete("five") assert.NoError(t, err) - err = d.Delete("six") + err = lru.Delete("six") assert.NoError(t, err) - assert.Equal(t, 0, countOnDisk(t, d.fs)) + assert.Equal(t, 0, countOnDisk(t, disk)) } -func TestDiskBlobStore_FileMissingOnDisk(t *testing.T) { - d := memDiskStore() +func TestLRUStore_UnderlyingBlobMissing(t *testing.T) { + lru, disk := testLRUStore() hash := "hash" b := []byte("this is a blob of stuff") - err := d.Put(hash, b) + err := lru.Put(hash, b) require.NoError(t, err) - err = d.fs.Remove("/ha/hash") + err = disk.fs.Remove("/ha/hash") require.NoError(t, err) - blob, err := d.Get(hash) + // hash still exists in lru + assert.True(t, lru.lru.Contains(hash)) + + blob, err := lru.Get(hash) assert.Nil(t, blob) assert.True(t, errors.Is(err, ErrBlobNotFound), "expected (%s) %s, got (%s) %s", reflect.TypeOf(ErrBlobNotFound).String(), ErrBlobNotFound.Error(), reflect.TypeOf(err).String(), err.Error()) + + // lru.Get() removes hash if underlying store doesn't have it + assert.False(t, lru.lru.Contains(hash)) } diff --git a/store/store.go b/store/store.go index e2614e5..f8f6dd3 100644 --- a/store/store.go +++ b/store/store.go @@ -27,5 +27,11 @@ type Blocklister interface { Wants(hash string) (bool, error) } +// lister is a store that can list cached blobs. This is helpful when an overlay +// cache needs to track blob existence. +type lister interface { + list() ([]string, error) +} + //ErrBlobNotFound is a standard error when a blob is not found in the store. 
var ErrBlobNotFound = errors.Base("blob not found") -- 2.45.3 From c9fa04043c071ec04aadd6cc2af7ebda6afcde8f Mon Sep 17 00:00:00 2001 From: Alex Grintsvayg Date: Thu, 22 Oct 2020 13:12:31 -0400 Subject: [PATCH 03/13] rename the stores, add caching to reflector cmd --- cmd/getstream.go | 4 +- cmd/peer.go | 2 +- cmd/reflector.go | 196 ++++++++++++++++++++++++--------------- cmd/start.go | 2 +- cmd/test.go | 2 +- cmd/upload.go | 2 +- peer/server_test.go | 2 +- reflector/server_test.go | 8 +- store/caching.go | 22 ++--- store/caching_test.go | 20 ++-- store/cloudfront.go | 117 +++++++++++------------ store/disk.go | 30 +++--- store/lru_test.go | 6 +- store/memory.go | 20 ++-- store/memory_test.go | 4 +- store/s3.go | 22 ++--- 16 files changed, 252 insertions(+), 207 deletions(-) diff --git a/cmd/getstream.go b/cmd/getstream.go index 355824c..ae586d7 100644 --- a/cmd/getstream.go +++ b/cmd/getstream.go @@ -28,9 +28,9 @@ func getStreamCmd(cmd *cobra.Command, args []string) { addr := args[0] sdHash := args[1] - s := store.NewCachingBlobStore( + s := store.NewCachingStore( peer.NewStore(peer.StoreOpts{Address: addr}), - store.NewDiskBlobStore("/tmp/lbry_downloaded_blobs", 1000, 2), + store.NewDiskStore("/tmp/lbry_downloaded_blobs", 2), ) wd, err := os.Getwd() diff --git a/cmd/peer.go b/cmd/peer.go index 88c49e0..3cb2a19 100644 --- a/cmd/peer.go +++ b/cmd/peer.go @@ -29,7 +29,7 @@ func init() { func peerCmd(cmd *cobra.Command, args []string) { var err error - s3 := store.NewS3BlobStore(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName) + s3 := store.NewS3Store(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName) peerServer := peer.NewServer(s3) if !peerNoDB { diff --git a/cmd/reflector.go b/cmd/reflector.go index 7a97e72..ddac74b 100644 --- a/cmd/reflector.go +++ b/cmd/reflector.go @@ -4,6 +4,7 @@ import ( "os" "os/signal" "strconv" + "strings" "syscall" "time" @@ -16,23 +17,24 @@ import ( "github.com/lbryio/reflector.go/store" log "github.com/sirupsen/logrus" + "github.com/spf13/cast" "github.com/spf13/cobra" ) var ( - tcpPeerPort int - http3PeerPort int - receiverPort int - metricsPort int - disableUploads bool - disableBlocklist bool - proxyAddress string - proxyPort string - proxyProtocol string - useDB bool - cloudFrontEndpoint string - reflectorCmdCacheDir string - reflectorCmdCacheMaxBlobs int + tcpPeerPort int + http3PeerPort int + receiverPort int + metricsPort int + disableUploads bool + disableBlocklist bool + proxyAddress string + proxyPort string + proxyProtocol string + useDB bool + cloudFrontEndpoint string + reflectorCmdDiskCache string + reflectorCmdMemCache int ) func init() { @@ -52,96 +54,136 @@ func init() { cmd.Flags().BoolVar(&disableUploads, "disable-uploads", false, "Disable uploads to this reflector server") cmd.Flags().BoolVar(&disableBlocklist, "disable-blocklist", false, "Disable blocklist watching/updating") cmd.Flags().BoolVar(&useDB, "use-db", true, "whether to connect to the reflector db or not") - cmd.Flags().StringVar(&reflectorCmdCacheDir, "cache", "", "if specified, the path where blobs should be cached (disabled when left empty)") - cmd.Flags().IntVar(&reflectorCmdCacheMaxBlobs, "cache-max-blobs", 0, "if cache is enabled, this option sets the max blobs the cache will hold") + cmd.Flags().StringVar(&reflectorCmdDiskCache, "disk-cache", "", + "enable disk cache, setting max size and path where to store blobs. 
format is 'MAX_BLOBS:CACHE_PATH'") + cmd.Flags().IntVar(&reflectorCmdMemCache, "mem-cache", 0, "enable in-memory cache with a max size of this many blobs") rootCmd.AddCommand(cmd) } func reflectorCmd(cmd *cobra.Command, args []string) { log.Printf("reflector %s", meta.VersionString()) - var blobStore store.BlobStore - if proxyAddress != "" { - switch proxyProtocol { - case "tcp": - blobStore = peer.NewStore(peer.StoreOpts{ - Address: proxyAddress + ":" + proxyPort, - Timeout: 30 * time.Second, - }) - case "http3": - blobStore = http3.NewStore(http3.StoreOpts{ - Address: proxyAddress + ":" + proxyPort, - Timeout: 30 * time.Second, - }) - default: - log.Fatalf("specified protocol is not recognized: %s", proxyProtocol) - } - } else { - s3Store := store.NewS3BlobStore(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName) - if cloudFrontEndpoint != "" { - blobStore = store.NewCloudFrontBlobStore(cloudFrontEndpoint, s3Store) - } else { - blobStore = s3Store - } - } + // the blocklist logic requires the db backed store to be the outer-most store + underlyingStore := setupStore() + outerStore := wrapWithCache(underlyingStore) - var err error - var reflectorServer *reflector.Server + if !disableUploads { + reflectorServer := reflector.NewServer(underlyingStore) + reflectorServer.Timeout = 3 * time.Minute + reflectorServer.EnableBlocklist = !disableBlocklist - if useDB { - db := new(db.SQL) - db.TrackAccessTime = true - err = db.Connect(globalConfig.DBConn) + err := reflectorServer.Start(":" + strconv.Itoa(receiverPort)) if err != nil { log.Fatal(err) } - - blobStore = store.NewDBBackedStore(blobStore, db) - - //this shouldn't go here but the blocklist logic requires the db backed store to be the outer-most store for it to work.... 
- //having this here prevents uploaded blobs from being stored in the disk cache - if !disableUploads { - reflectorServer = reflector.NewServer(blobStore) - reflectorServer.Timeout = 3 * time.Minute - reflectorServer.EnableBlocklist = !disableBlocklist - - err = reflectorServer.Start(":" + strconv.Itoa(receiverPort)) - if err != nil { - log.Fatal(err) - } - } + defer reflectorServer.Shutdown() } - if reflectorCmdCacheDir != "" { - err = os.MkdirAll(reflectorCmdCacheDir, os.ModePerm) - if err != nil { - log.Fatal(err) - } - blobStore = store.NewCachingBlobStore(blobStore, store.NewDiskBlobStore(reflectorCmdCacheDir, reflectorCmdCacheMaxBlobs, 2)) - } - - peerServer := peer.NewServer(blobStore) - err = peerServer.Start(":" + strconv.Itoa(tcpPeerPort)) + peerServer := peer.NewServer(outerStore) + err := peerServer.Start(":" + strconv.Itoa(tcpPeerPort)) if err != nil { log.Fatal(err) } + defer peerServer.Shutdown() - http3PeerServer := http3.NewServer(blobStore) + http3PeerServer := http3.NewServer(outerStore) err = http3PeerServer.Start(":" + strconv.Itoa(http3PeerPort)) if err != nil { log.Fatal(err) } + defer http3PeerServer.Shutdown() metricsServer := metrics.NewServer(":"+strconv.Itoa(metricsPort), "/metrics") metricsServer.Start() + defer metricsServer.Shutdown() interruptChan := make(chan os.Signal, 1) signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM) <-interruptChan - metricsServer.Shutdown() - peerServer.Shutdown() - http3PeerServer.Shutdown() - if reflectorServer != nil { - reflectorServer.Shutdown() - } + // deferred shutdowns happen now +} + +func setupStore() store.BlobStore { + var s store.BlobStore + + if proxyAddress != "" { + switch proxyProtocol { + case "tcp": + s = peer.NewStore(peer.StoreOpts{ + Address: proxyAddress + ":" + proxyPort, + Timeout: 30 * time.Second, + }) + case "http3": + s = http3.NewStore(http3.StoreOpts{ + Address: proxyAddress + ":" + proxyPort, + Timeout: 30 * time.Second, + }) + default: + log.Fatalf("protocol is not recognized: %s", proxyProtocol) + } + } else { + s3Store := store.NewS3Store(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName) + if cloudFrontEndpoint != "" { + s = store.NewCloudFrontStore(s3Store, cloudFrontEndpoint) + } else { + s = s3Store + } + } + + if useDB { + db := new(db.SQL) + db.TrackAccessTime = true + err := db.Connect(globalConfig.DBConn) + if err != nil { + log.Fatal(err) + } + + s = store.NewDBBackedStore(s, db) + } + + return s +} + +func wrapWithCache(s store.BlobStore) store.BlobStore { + wrapped := s + + diskCacheMaxSize, diskCachePath := diskCacheParams() + if diskCacheMaxSize > 0 { + err := os.MkdirAll(diskCachePath, os.ModePerm) + if err != nil { + log.Fatal(err) + } + wrapped = store.NewCachingStore(wrapped, + store.NewLRUStore(store.NewDiskStore(diskCachePath, 2), diskCacheMaxSize)) + } + + if reflectorCmdMemCache > 0 { + wrapped = store.NewCachingStore(wrapped, + store.NewLRUStore(store.NewMemoryStore(), reflectorCmdMemCache)) + } + + return wrapped +} + +func diskCacheParams() (int, string) { + if reflectorCmdDiskCache == "" { + return 0, "" + } + + parts := strings.Split(reflectorCmdDiskCache, ":") + if len(parts) != 2 { + log.Fatalf("--disk-cache must be a number, followed by ':', followed by a string") + } + + maxSize := cast.ToInt(parts[0]) + if maxSize <= 0 { + log.Fatalf("--disk-cache max size must be more than 0") + } + + path := parts[1] + if len(path) == 0 || path[0] != '/' { + log.Fatalf("--disk-cache path must start with '/'") + } + + return 
maxSize, path } diff --git a/cmd/start.go b/cmd/start.go index a82d597..4d4a56b 100644 --- a/cmd/start.go +++ b/cmd/start.go @@ -55,7 +55,7 @@ func startCmd(cmd *cobra.Command, args []string) { db := new(db.SQL) err := db.Connect(globalConfig.DBConn) checkErr(err) - s3 := store.NewS3BlobStore(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName) + s3 := store.NewS3Store(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName) comboStore := store.NewDBBackedStore(s3, db) conf := prism.DefaultConf() diff --git a/cmd/test.go b/cmd/test.go index 051ddf7..e611b22 100644 --- a/cmd/test.go +++ b/cmd/test.go @@ -29,7 +29,7 @@ func init() { func testCmd(cmd *cobra.Command, args []string) { log.Printf("reflector %s", meta.VersionString()) - memStore := store.NewMemoryBlobStore() + memStore := store.NewMemoryStore() reflectorServer := reflector.NewServer(memStore) reflectorServer.Timeout = 3 * time.Minute diff --git a/cmd/upload.go b/cmd/upload.go index 006f007..636e824 100644 --- a/cmd/upload.go +++ b/cmd/upload.go @@ -35,7 +35,7 @@ func uploadCmd(cmd *cobra.Command, args []string) { checkErr(err) st := store.NewDBBackedStore( - store.NewS3BlobStore(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName), + store.NewS3Store(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName), db) uploader := reflector.NewUploader(db, st, uploadWorkers, uploadSkipExistsCheck, uploadDeleteBlobsAfterUpload) diff --git a/peer/server_test.go b/peer/server_test.go index 2e1d9bc..7c9b5f2 100644 --- a/peer/server_test.go +++ b/peer/server_test.go @@ -34,7 +34,7 @@ var availabilityRequests = []pair{ } func getServer(t *testing.T, withBlobs bool) *Server { - st := store.NewMemoryBlobStore() + st := store.NewMemoryStore() if withBlobs { for k, v := range blobs { err := st.Put(k, v) diff --git a/reflector/server_test.go b/reflector/server_test.go index 1dd7e83..1039417 100644 --- a/reflector/server_test.go +++ b/reflector/server_test.go @@ -22,7 +22,7 @@ func startServerOnRandomPort(t *testing.T) (*Server, int) { t.Fatal(err) } - srv := NewServer(store.NewMemoryBlobStore()) + srv := NewServer(store.NewMemoryStore()) err = srv.Start("127.0.0.1:" + strconv.Itoa(port)) if err != nil { t.Fatal(err) @@ -119,7 +119,7 @@ func TestServer_Timeout(t *testing.T) { t.Fatal(err) } - srv := NewServer(store.NewMemoryBlobStore()) + srv := NewServer(store.NewMemoryStore()) srv.Timeout = testTimeout err = srv.Start("127.0.0.1:" + strconv.Itoa(port)) if err != nil { @@ -161,7 +161,7 @@ func TestServer_Timeout(t *testing.T) { //} type mockPartialStore struct { - *store.MemoryBlobStore + *store.MemoryStore missing []string } @@ -181,7 +181,7 @@ func TestServer_PartialUpload(t *testing.T) { missing[i] = bits.Rand().String() } - st := store.BlobStore(&mockPartialStore{MemoryBlobStore: store.NewMemoryBlobStore(), missing: missing}) + st := store.BlobStore(&mockPartialStore{MemoryStore: store.NewMemoryStore(), missing: missing}) if _, ok := st.(neededBlobChecker); !ok { t.Fatal("mock does not implement the relevant interface") } diff --git a/store/caching.go b/store/caching.go index 4dd5b6b..e4ff6b3 100644 --- a/store/caching.go +++ b/store/caching.go @@ -11,22 +11,22 @@ import ( "golang.org/x/sync/singleflight" ) -// CachingBlobStore combines two stores, typically a local and a remote store, to improve performance. 
+// CachingStore combines two stores, typically a local and a remote store, to improve performance. // Accessed blobs are stored in and retrieved from the cache. If they are not in the cache, they // are retrieved from the origin and cached. Puts are cached and also forwarded to the origin. -type CachingBlobStore struct { +type CachingStore struct { origin, cache BlobStore sf *singleflight.Group } -// NewCachingBlobStore makes a new caching disk store and returns a pointer to it. -func NewCachingBlobStore(origin, cache BlobStore) *CachingBlobStore { - return &CachingBlobStore{origin: origin, cache: cache, sf: new(singleflight.Group)} +// NewCachingStore makes a new caching disk store and returns a pointer to it. +func NewCachingStore(origin, cache BlobStore) *CachingStore { + return &CachingStore{origin: origin, cache: cache, sf: new(singleflight.Group)} } // Has checks the cache and then the origin for a hash. It returns true if either store has it. -func (c *CachingBlobStore) Has(hash string) (bool, error) { +func (c *CachingStore) Has(hash string) (bool, error) { has, err := c.cache.Has(hash) if has || err != nil { return has, err @@ -36,7 +36,7 @@ func (c *CachingBlobStore) Has(hash string) (bool, error) { // Get tries to get the blob from the cache first, falling back to the origin. If the blob comes // from the origin, it is also stored in the cache. -func (c *CachingBlobStore) Get(hash string) (stream.Blob, error) { +func (c *CachingStore) Get(hash string) (stream.Blob, error) { start := time.Now() blob, err := c.cache.Get(hash) if err == nil || !errors.Is(err, ErrBlobNotFound) { @@ -52,7 +52,7 @@ func (c *CachingBlobStore) Get(hash string) (stream.Blob, error) { // getFromOrigin ensures that only one Get per hash is sent to the origin at a time, // thereby protecting against https://en.wikipedia.org/wiki/Thundering_herd_problem -func (c *CachingBlobStore) getFromOrigin(hash string) (stream.Blob, error) { +func (c *CachingStore) getFromOrigin(hash string) (stream.Blob, error) { metrics.CacheWaitingRequestsCount.Inc() defer metrics.CacheWaitingRequestsCount.Dec() originBlob, err, _ := c.sf.Do(hash, func() (interface{}, error) { @@ -78,7 +78,7 @@ func (c *CachingBlobStore) getFromOrigin(hash string) (stream.Blob, error) { } // Put stores the blob in the origin and the cache -func (c *CachingBlobStore) Put(hash string, blob stream.Blob) error { +func (c *CachingStore) Put(hash string, blob stream.Blob) error { err := c.origin.Put(hash, blob) if err != nil { return err @@ -87,7 +87,7 @@ func (c *CachingBlobStore) Put(hash string, blob stream.Blob) error { } // PutSD stores the sd blob in the origin and the cache -func (c *CachingBlobStore) PutSD(hash string, blob stream.Blob) error { +func (c *CachingStore) PutSD(hash string, blob stream.Blob) error { err := c.origin.PutSD(hash, blob) if err != nil { return err @@ -96,7 +96,7 @@ func (c *CachingBlobStore) PutSD(hash string, blob stream.Blob) error { } // Delete deletes the blob from the origin and the cache -func (c *CachingBlobStore) Delete(hash string) error { +func (c *CachingStore) Delete(hash string) error { err := c.origin.Delete(hash) if err != nil { return err diff --git a/store/caching_test.go b/store/caching_test.go index 13f44a2..f56c833 100644 --- a/store/caching_test.go +++ b/store/caching_test.go @@ -10,9 +10,9 @@ import ( ) func TestCachingBlobStore_Put(t *testing.T) { - origin := NewMemoryBlobStore() - cache := NewMemoryBlobStore() - s := NewCachingBlobStore(origin, cache) + origin := NewMemoryStore() + cache := 
NewMemoryStore()
+	s := NewCachingStore(origin, cache)
 
 	b := []byte("this is a blob of stuff")
 	hash := "hash"
@@ -40,9 +40,9 @@ func TestCachingBlobStore_Put(t *testing.T) {
 }
 
 func TestCachingBlobStore_CacheMiss(t *testing.T) {
-	origin := NewMemoryBlobStore()
-	cache := NewMemoryBlobStore()
-	s := NewCachingBlobStore(origin, cache)
+	origin := NewMemoryStore()
+	cache := NewMemoryStore()
+	s := NewCachingStore(origin, cache)
 
 	b := []byte("this is a blob of stuff")
 	hash := "hash"
@@ -79,8 +79,8 @@ func TestCachingBlobStore_CacheMiss(t *testing.T) {
 func TestCachingBlobStore_ThunderingHerd(t *testing.T) {
 	storeDelay := 100 * time.Millisecond
 	origin := NewSlowBlobStore(storeDelay)
-	cache := NewMemoryBlobStore()
-	s := NewCachingBlobStore(origin, cache)
+	cache := NewMemoryStore()
+	s := NewCachingStore(origin, cache)
 
 	b := []byte("this is a blob of stuff")
 	hash := "hash"
@@ -129,13 +129,13 @@ func TestCachingBlobStore_ThunderingHerd(t *testing.T) {
 
 // SlowBlobStore adds a delay to each request
 type SlowBlobStore struct {
-	mem   *MemoryBlobStore
+	mem   *MemoryStore
 	delay time.Duration
 }
 
 func NewSlowBlobStore(delay time.Duration) *SlowBlobStore {
 	return &SlowBlobStore{
-		mem:   NewMemoryBlobStore(),
+		mem:   NewMemoryStore(),
 		delay: delay,
 	}
 }
diff --git a/store/cloudfront.go b/store/cloudfront.go
index fbfafec..c4adb3f 100644
--- a/store/cloudfront.go
+++ b/store/cloudfront.go
@@ -1,109 +1,112 @@
 package store
 
 import (
+	"io"
 	"io/ioutil"
 	"net/http"
 	"time"
 
-	"github.com/lbryio/lbry.go/v2/extras/errors"
-	"github.com/lbryio/lbry.go/v2/stream"
 	"github.com/lbryio/reflector.go/internal/metrics"
 	"github.com/lbryio/reflector.go/meta"
 
+	"github.com/lbryio/lbry.go/v2/extras/errors"
+	"github.com/lbryio/lbry.go/v2/stream"
+
 	log "github.com/sirupsen/logrus"
 )
 
-// CloudFrontBlobStore is an CloudFront backed store (retrieval only)
-type CloudFrontBlobStore struct {
-	cfEndpoint string
-	s3Store    *S3BlobStore
+// CloudFrontStore wraps an S3 store. Reads go to CloudFront, writes go to S3.
+type CloudFrontStore struct {
+	s3       *S3Store
+	endpoint string // cloudfront endpoint
 }
 
-// NewS3BlobStore returns an initialized S3 store pointer.
-func NewCloudFrontBlobStore(cloudFrontEndpoint string, S3Store *S3BlobStore) *CloudFrontBlobStore {
-	return &CloudFrontBlobStore{
-		cfEndpoint: cloudFrontEndpoint,
-		s3Store:    S3Store,
+// NewCloudFrontStore returns an initialized CloudFrontStore pointer.
+// NOTE: It panics if S3Store is nil.
+func NewCloudFrontStore(s3 *S3Store, cfEndpoint string) *CloudFrontStore {
+	if s3 == nil {
+		panic("S3Store must not be nil")
+	}
+
+	return &CloudFrontStore{
+		endpoint: cfEndpoint,
+		s3:       s3,
+	}
 }
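The split of responsibilities is now explicit: Has and Get go out over HTTP to the CDN endpoint, Put, PutSD, and Delete pass through to the wrapped S3 store, and the request plumbing collapses into cfRequest. A sketch of the wiring (credentials, region, bucket, and endpoint are placeholders):

package main

import "github.com/lbryio/reflector.go/store"

func main() {
	// Reads are served by the CDN; writes and deletes are forwarded to S3.
	s3 := store.NewS3Store("AWS_ID", "AWS_SECRET", "us-east-1", "blobs-bucket")
	cf := store.NewCloudFrontStore(s3, "https://cdn.example.com/")

	_, _ = cf.Has("somehash") // issues a HEAD request against the endpoint
}

-// Has returns T/F or Error if the store contains the blob.
-func (s *CloudFrontBlobStore) Has(hash string) (bool, error) {
-	url := s.cfEndpoint + hash
-
-	req, err := http.NewRequest("HEAD", url, nil)
+// Has checks if the hash is in the store.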
+func (c *CloudFrontStore) Has(hash string) (bool, error) { + status, body, err := c.cfRequest(http.MethodHead, hash) if err != nil { - return false, errors.Err(err) + return false, err } - req.Header.Add("User-Agent", "reflector.go/"+meta.Version) - res, err := http.DefaultClient.Do(req) - if err != nil { - return false, errors.Err(err) - } - defer res.Body.Close() + defer body.Close() - switch res.StatusCode { + switch status { case http.StatusNotFound, http.StatusForbidden: return false, nil case http.StatusOK: return true, nil default: - return false, errors.Err(res.Status) + return false, errors.Err("unexpected status %d", status) } } -// Get returns the blob slice if present or errors. -func (s *CloudFrontBlobStore) Get(hash string) (stream.Blob, error) { - url := s.cfEndpoint + hash +// Get gets the blob from Cloudfront. +func (c *CloudFrontStore) Get(hash string) (stream.Blob, error) { log.Debugf("Getting %s from S3", hash[:8]) defer func(t time.Time) { log.Debugf("Getting %s from S3 took %s", hash[:8], time.Since(t).String()) }(time.Now()) - req, err := http.NewRequest("GET", url, nil) - if err != nil { - return nil, errors.Err(err) - } - req.Header.Add("User-Agent", "reflector.go/"+meta.Version) - res, err := http.DefaultClient.Do(req) - if err != nil { - return nil, errors.Err(err) - } - defer res.Body.Close() - switch res.StatusCode { + status, body, err := c.cfRequest(http.MethodGet, hash) + if err != nil { + return nil, err + } + defer body.Close() + + switch status { case http.StatusNotFound, http.StatusForbidden: return nil, errors.Err(ErrBlobNotFound) case http.StatusOK: - b, err := ioutil.ReadAll(res.Body) + b, err := ioutil.ReadAll(body) if err != nil { return nil, errors.Err(err) } metrics.MtrInBytesS3.Add(float64(len(b))) return b, nil default: - return nil, errors.Err(res.Status) + return nil, errors.Err("unexpected status %d", status) } } -// Put stores the blob on S3 or errors if S3 store is not present. -func (s *CloudFrontBlobStore) Put(hash string, blob stream.Blob) error { - if s.s3Store != nil { - return s.s3Store.Put(hash, blob) +func (c *CloudFrontStore) cfRequest(method, hash string) (int, io.ReadCloser, error) { + url := c.endpoint + hash + req, err := http.NewRequest(method, url, nil) + if err != nil { + return 0, nil, errors.Err(err) } - return errors.Err("not implemented in cloudfront store") + req.Header.Add("User-Agent", "reflector.go/"+meta.Version) + + res, err := http.DefaultClient.Do(req) + if err != nil { + return 0, nil, errors.Err(err) + } + + return res.StatusCode, res.Body, nil } -// PutSD stores the sd blob on S3 or errors if S3 store is not present. 
-func (s *CloudFrontBlobStore) PutSD(hash string, blob stream.Blob) error { - if s.s3Store != nil { - return s.s3Store.PutSD(hash, blob) - } - return errors.Err("not implemented in cloudfront store") +// Put stores the blob on S3 +func (c *CloudFrontStore) Put(hash string, blob stream.Blob) error { + return c.s3.Put(hash, blob) } -func (s *CloudFrontBlobStore) Delete(hash string) error { - if s.s3Store != nil { - return s.s3Store.Delete(hash) - } - return errors.Err("not implemented in cloudfront store") +// PutSD stores the sd blob on S3 +func (c *CloudFrontStore) PutSD(hash string, blob stream.Blob) error { + return c.s3.PutSD(hash, blob) +} + +// Delete deletes the blob from S3 +func (c *CloudFrontStore) Delete(hash string) error { + return c.s3.Delete(hash) } diff --git a/store/disk.go b/store/disk.go index dfbc0f2..71c8058 100644 --- a/store/disk.go +++ b/store/disk.go @@ -12,8 +12,8 @@ import ( "github.com/spf13/afero" ) -// DiskBlobStore stores blobs on a local disk -type DiskBlobStore struct { +// DiskStore stores blobs on a local disk +type DiskStore struct { // the location of blobs on disk blobDir string // store files in subdirectories based on the first N chars in the filename. 0 = don't create subdirectories. @@ -26,31 +26,31 @@ type DiskBlobStore struct { initialized bool } -// NewDiskBlobStore returns an initialized file disk store pointer. -func NewDiskBlobStore(dir string, prefixLength int) *DiskBlobStore { - return &DiskBlobStore{ +// NewDiskStore returns an initialized file disk store pointer. +func NewDiskStore(dir string, prefixLength int) *DiskStore { + return &DiskStore{ blobDir: dir, prefixLength: prefixLength, fs: afero.NewOsFs(), } } -func (d *DiskBlobStore) dir(hash string) string { +func (d *DiskStore) dir(hash string) string { if d.prefixLength <= 0 || len(hash) < d.prefixLength { return d.blobDir } return path.Join(d.blobDir, hash[:d.prefixLength]) } -func (d *DiskBlobStore) path(hash string) string { +func (d *DiskStore) path(hash string) string { return path.Join(d.dir(hash), hash) } -func (d *DiskBlobStore) ensureDirExists(dir string) error { +func (d *DiskStore) ensureDirExists(dir string) error { return errors.Err(d.fs.MkdirAll(dir, 0755)) } -func (d *DiskBlobStore) initOnce() error { +func (d *DiskStore) initOnce() error { if d.initialized { return nil } @@ -65,7 +65,7 @@ func (d *DiskBlobStore) initOnce() error { } // Has returns T/F or Error if it the blob stored already. It will error with any IO disk error. -func (d *DiskBlobStore) Has(hash string) (bool, error) { +func (d *DiskStore) Has(hash string) (bool, error) { err := d.initOnce() if err != nil { return false, err @@ -82,7 +82,7 @@ func (d *DiskBlobStore) Has(hash string) (bool, error) { } // Get returns the blob or an error if the blob doesn't exist. 
-func (d *DiskBlobStore) Get(hash string) (stream.Blob, error) { +func (d *DiskStore) Get(hash string) (stream.Blob, error) { err := d.initOnce() if err != nil { return nil, err @@ -102,7 +102,7 @@ func (d *DiskBlobStore) Get(hash string) (stream.Blob, error) { } // Put stores the blob on disk -func (d *DiskBlobStore) Put(hash string, blob stream.Blob) error { +func (d *DiskStore) Put(hash string, blob stream.Blob) error { err := d.initOnce() if err != nil { return err @@ -118,12 +118,12 @@ func (d *DiskBlobStore) Put(hash string, blob stream.Blob) error { } // PutSD stores the sd blob on the disk -func (d *DiskBlobStore) PutSD(hash string, blob stream.Blob) error { +func (d *DiskStore) PutSD(hash string, blob stream.Blob) error { return d.Put(hash, blob) } // Delete deletes the blob from the store -func (d *DiskBlobStore) Delete(hash string) error { +func (d *DiskStore) Delete(hash string) error { err := d.initOnce() if err != nil { return err @@ -142,7 +142,7 @@ func (d *DiskBlobStore) Delete(hash string) error { } // list returns a slice of blobs that already exist in the blobDir -func (d *DiskBlobStore) list() ([]string, error) { +func (d *DiskStore) list() ([]string, error) { dirs, err := afero.ReadDir(d.fs, d.blobDir) if err != nil { return nil, err diff --git a/store/lru_test.go b/store/lru_test.go index 92ddb7d..c95ca6b 100644 --- a/store/lru_test.go +++ b/store/lru_test.go @@ -14,13 +14,13 @@ import ( const cacheMaxBlobs = 3 -func testLRUStore() (*LRUStore, *DiskBlobStore) { - d := NewDiskBlobStore("/", 2) +func testLRUStore() (*LRUStore, *DiskStore) { + d := NewDiskStore("/", 2) d.fs = afero.NewMemMapFs() return NewLRUStore(d, 3), d } -func countOnDisk(t *testing.T, disk *DiskBlobStore) int { +func countOnDisk(t *testing.T, disk *DiskStore) int { t.Helper() count := 0 diff --git a/store/memory.go b/store/memory.go index 27dbc0f..c50f282 100644 --- a/store/memory.go +++ b/store/memory.go @@ -5,25 +5,25 @@ import ( "github.com/lbryio/lbry.go/v2/stream" ) -// MemoryBlobStore is an in memory only blob store with no persistence. -type MemoryBlobStore struct { +// MemoryStore is an in memory only blob store with no persistence. +type MemoryStore struct { blobs map[string]stream.Blob } -func NewMemoryBlobStore() *MemoryBlobStore { - return &MemoryBlobStore{ +func NewMemoryStore() *MemoryStore { + return &MemoryStore{ blobs: make(map[string]stream.Blob), } } // Has returns T/F if the blob is currently stored. It will never error. -func (m *MemoryBlobStore) Has(hash string) (bool, error) { +func (m *MemoryStore) Has(hash string) (bool, error) { _, ok := m.blobs[hash] return ok, nil } // Get returns the blob byte slice if present and errors if the blob is not found. 
diff --git a/store/lru_test.go b/store/lru_test.go
index 92ddb7d..c95ca6b 100644
--- a/store/lru_test.go
+++ b/store/lru_test.go
@@ -14,13 +14,13 @@ import (
 
 const cacheMaxBlobs = 3
 
-func testLRUStore() (*LRUStore, *DiskBlobStore) {
-	d := NewDiskBlobStore("/", 2)
+func testLRUStore() (*LRUStore, *DiskStore) {
+	d := NewDiskStore("/", 2)
 	d.fs = afero.NewMemMapFs()
 	return NewLRUStore(d, 3), d
 }
 
-func countOnDisk(t *testing.T, disk *DiskBlobStore) int {
+func countOnDisk(t *testing.T, disk *DiskStore) int {
 	t.Helper()
 
 	count := 0
diff --git a/store/memory.go b/store/memory.go
index 27dbc0f..c50f282 100644
--- a/store/memory.go
+++ b/store/memory.go
@@ -5,25 +5,25 @@ import (
 	"github.com/lbryio/lbry.go/v2/stream"
 )
 
-// MemoryBlobStore is an in memory only blob store with no persistence.
-type MemoryBlobStore struct {
+// MemoryStore is an in memory only blob store with no persistence.
+type MemoryStore struct {
 	blobs map[string]stream.Blob
 }
 
-func NewMemoryBlobStore() *MemoryBlobStore {
-	return &MemoryBlobStore{
+func NewMemoryStore() *MemoryStore {
+	return &MemoryStore{
 		blobs: make(map[string]stream.Blob),
 	}
 }
 
 // Has returns T/F if the blob is currently stored. It will never error.
-func (m *MemoryBlobStore) Has(hash string) (bool, error) {
+func (m *MemoryStore) Has(hash string) (bool, error) {
 	_, ok := m.blobs[hash]
 	return ok, nil
 }
 
 // Get returns the blob byte slice if present and errors if the blob is not found.
-func (m *MemoryBlobStore) Get(hash string) (stream.Blob, error) {
+func (m *MemoryStore) Get(hash string) (stream.Blob, error) {
 	blob, ok := m.blobs[hash]
 	if !ok {
 		return nil, errors.Err(ErrBlobNotFound)
@@ -32,23 +32,23 @@ func (m *MemoryBlobStore) Get(hash string) (stream.Blob, error) {
 }
 
 // Put stores the blob in memory
-func (m *MemoryBlobStore) Put(hash string, blob stream.Blob) error {
+func (m *MemoryStore) Put(hash string, blob stream.Blob) error {
 	m.blobs[hash] = blob
 	return nil
 }
 
 // PutSD stores the sd blob in memory
-func (m *MemoryBlobStore) PutSD(hash string, blob stream.Blob) error {
+func (m *MemoryStore) PutSD(hash string, blob stream.Blob) error {
 	return m.Put(hash, blob)
 }
 
 // Delete deletes the blob from the store
-func (m *MemoryBlobStore) Delete(hash string) error {
+func (m *MemoryStore) Delete(hash string) error {
 	delete(m.blobs, hash)
 	return nil
 }
 
 // Debug returns the blobs in memory. It's useful for testing and debugging.
-func (m *MemoryBlobStore) Debug() map[string]stream.Blob {
+func (m *MemoryStore) Debug() map[string]stream.Blob {
 	return m.blobs
 }
diff --git a/store/memory_test.go b/store/memory_test.go
index e359803..892c735 100644
--- a/store/memory_test.go
+++ b/store/memory_test.go
@@ -8,7 +8,7 @@ import (
 )
 
 func TestMemoryBlobStore_Put(t *testing.T) {
-	s := NewMemoryBlobStore()
+	s := NewMemoryStore()
 	blob := []byte("abcdefg")
 	err := s.Put("abc", blob)
 	if err != nil {
@@ -17,7 +17,7 @@ func TestMemoryBlobStore_Put(t *testing.T) {
 }
 
 func TestMemoryBlobStore_Get(t *testing.T) {
-	s := NewMemoryBlobStore()
+	s := NewMemoryStore()
 	hash := "abc"
 	blob := []byte("abcdefg")
 	err := s.Put(hash, blob)
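The lru_test helper above is where the afero filesystem abstraction pays off: swapping the store's fs field for afero.NewMemMapFs() lets disk-store tests run entirely in memory. A sketch of a test in that style, assuming the store package as patched (NewDiskStore and the exported-within-package fs field); the test name and blob values are made up:

package store

import (
	"testing"

	"github.com/spf13/afero"
)

// A disk store backed by an in-memory filesystem: no real files are
// created, so the test is fast and leaves nothing behind.
func TestDiskStoreInMemory(t *testing.T) {
	d := NewDiskStore("/", 2)
	d.fs = afero.NewMemMapFs() // swap the OS fs for an in-memory one

	if err := d.Put("abcdef", []byte("blob data")); err != nil {
		t.Fatal(err)
	}

	blob, err := d.Get("abcdef")
	if err != nil || string(blob) != "blob data" {
		t.Fatalf("got %q, err %v", blob, err)
	}
}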
diff --git a/store/s3.go b/store/s3.go
index e08b910..2189ef1 100644
--- a/store/s3.go
+++ b/store/s3.go
@@ -18,8 +18,8 @@ import (
 	log "github.com/sirupsen/logrus"
 )
 
-// S3BlobStore is an S3 store
-type S3BlobStore struct {
+// S3Store is an S3 store
+type S3Store struct {
 	awsID     string
 	awsSecret string
 	region    string
@@ -28,9 +28,9 @@ type S3BlobStore struct {
 	session *session.Session
 }
 
-// NewS3BlobStore returns an initialized S3 store pointer.
-func NewS3BlobStore(awsID, awsSecret, region, bucket string) *S3BlobStore {
-	return &S3BlobStore{
+// NewS3Store returns an initialized S3 store pointer.
+func NewS3Store(awsID, awsSecret, region, bucket string) *S3Store {
+	return &S3Store{
 		awsID:     awsID,
 		awsSecret: awsSecret,
 		region:    region,
@@ -38,7 +38,7 @@ func NewS3BlobStore(awsID, awsSecret, region, bucket string) *S3BlobStore {
 	}
 }
 
-func (s *S3BlobStore) initOnce() error {
+func (s *S3Store) initOnce() error {
 	if s.session != nil {
 		return nil
 	}
@@ -56,7 +56,7 @@ func (s *S3BlobStore) initOnce() error {
 }
 
 // Has returns T/F or Error ( from S3 ) if the store contains the blob.
-func (s *S3BlobStore) Has(hash string) (bool, error) {
+func (s *S3Store) Has(hash string) (bool, error) {
 	err := s.initOnce()
 	if err != nil {
 		return false, err
@@ -77,7 +77,7 @@ func (s *S3BlobStore) Has(hash string) (bool, error) {
 }
 
 // Get returns the blob slice if present or errors on S3.
-func (s *S3BlobStore) Get(hash string) (stream.Blob, error) {
+func (s *S3Store) Get(hash string) (stream.Blob, error) {
 	//Todo-Need to handle error for blob doesn't exist for consistency.
 	err := s.initOnce()
 	if err != nil {
@@ -110,7 +110,7 @@ func (s *S3BlobStore) Get(hash string) (stream.Blob, error) {
 }
 
 // Put stores the blob on S3 or errors if S3 connection errors.
-func (s *S3BlobStore) Put(hash string, blob stream.Blob) error {
+func (s *S3Store) Put(hash string, blob stream.Blob) error {
 	err := s.initOnce()
 	if err != nil {
 		return err
@@ -133,12 +133,12 @@ func (s *S3BlobStore) Put(hash string, blob stream.Blob) error {
 }
 
 // PutSD stores the sd blob on S3 or errors if S3 connection errors.
-func (s *S3BlobStore) PutSD(hash string, blob stream.Blob) error {
+func (s *S3Store) PutSD(hash string, blob stream.Blob) error {
 	//Todo - handle missing stream for consistency
 	return s.Put(hash, blob)
 }
 
-func (s *S3BlobStore) Delete(hash string) error {
+func (s *S3Store) Delete(hash string) error {
 	err := s.initOnce()
 	if err != nil {
 		return err
-- 
2.45.3

From 3608971f0bff7db5d84dbbf202e43255b2862a29 Mon Sep 17 00:00:00 2001
From: Alex Grintsvayg
Date: Thu, 22 Oct 2020 13:49:02 -0400
Subject: [PATCH 04/13] add lru cache eviction metric

---
 cmd/reflector.go            |  2 +-
 cmd/test.go                 |  2 +-
 internal/metrics/metrics.go | 26 +++++++++++-----
 peer/http3/store.go         |  2 ++
 peer/server_test.go         |  2 +-
 peer/store.go               |  2 ++
 reflector/server_test.go    |  8 ++---
 store/caching.go            |  9 ++++--
 store/caching_test.go       | 23 ++++++++-------
 store/cloudfront.go         |  5 ++++
 store/dbbacked.go           |  5 ++++
 store/disk.go               | 59 ++++++++++++++++++++-----------------
 store/lru.go                |  7 +++++
 store/lru_test.go           |  6 ++--
 store/memory.go             | 25 +++++++++-------
 store/memory_test.go        |  8 ++---
 store/s3.go                 | 35 ++++++++++++----------
 store/store.go              |  2 ++
 18 files changed, 143 insertions(+), 85 deletions(-)

diff --git a/cmd/reflector.go b/cmd/reflector.go
index ddac74b..d46fd09 100644
--- a/cmd/reflector.go
+++ b/cmd/reflector.go
@@ -159,7 +159,7 @@ func wrapWithCache(s store.BlobStore) store.BlobStore {
 
 	if reflectorCmdMemCache > 0 {
 		wrapped = store.NewCachingStore(wrapped,
-			store.NewLRUStore(store.NewMemoryStore(), reflectorCmdMemCache))
+			store.NewLRUStore(store.NewMemStore(), reflectorCmdMemCache))
 	}
 
 	return wrapped
diff --git a/cmd/test.go b/cmd/test.go
index e611b22..a330856 100644
--- a/cmd/test.go
+++ b/cmd/test.go
@@ -29,7 +29,7 @@ func init() {
 func testCmd(cmd *cobra.Command, args []string) {
 	log.Printf("reflector %s", meta.VersionString())
 
-	memStore := store.NewMemoryStore()
+	memStore := store.NewMemStore()
 
 	reflectorServer := reflector.NewServer(memStore)
 	reflectorServer.Timeout = 3 * time.Minute
diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go
index 17ce4e0..59cf050 100644
--- a/internal/metrics/metrics.go
+++ b/internal/metrics/metrics.go
@@ -57,7 +57,8 @@ func (s *Server) Shutdown() {
 }
 
 const (
-	ns = "reflector"
+	ns             = "reflector"
+	subsystemCache = "cache"
 
 	labelDirection = "direction"
 	labelErrorType = "error_type"
@@ -65,7 +66,8 @@ const (
 	DirectionUpload   = "upload"   // to reflector
 	DirectionDownload = "download" // from reflector
 
-	MtrLabelSource = "source"
+	LabelCacheType = "cache_type"
+	LabelSource    = "source"
 
 	errConnReset     = "conn_reset"
 	errReadConnReset = "read_conn_reset"
@@ -116,25 +118,35 @@ var (
 
 	CacheHitCount = promauto.NewCounter(prometheus.CounterOpts{
 		Namespace: ns,
-		Name:      "cache_hit_total",
+		Subsystem: subsystemCache,
+		Name:      "hit_total",
 		Help:      "Total number of blobs retrieved from the cache storage",
 	})
 	CacheMissCount = promauto.NewCounter(prometheus.CounterOpts{
 		Namespace: ns,
-		Name:      "cache_miss_total",
+		Subsystem: subsystemCache,
+		Name:      "miss_total",
 		Help:      "Total number of blobs retrieved from origin rather than cache storage",
 	})
 	CacheOriginRequestsCount = promauto.NewGauge(prometheus.GaugeOpts{
 		Namespace: ns,
-		Name:      "cache_origin_requests_total",
+		Subsystem: subsystemCache,
+		Name:      "origin_requests_total",
 		Help:      "How many Get requests are in flight from the cache to the origin",
 	})
 	// during thundering-herd situations, the metric below should be a lot smaller than the metric above
 	CacheWaitingRequestsCount = promauto.NewGauge(prometheus.GaugeOpts{
 		Namespace: ns,
-		Name:      "cache_waiting_requests_total",
+		Subsystem: subsystemCache,
+		Name:      "waiting_requests_total",
 		Help:      "How many cache requests are waiting for an in-flight origin request",
 	})
+	CacheLRUEvictCount = promauto.NewCounterVec(prometheus.CounterOpts{
+		Namespace: ns,
+		Subsystem: subsystemCache,
+		Name:      "evict_total",
+		Help:      "Count of blobs evicted from cache",
+	}, []string{LabelCacheType})
 
 	BlobUploadCount = promauto.NewCounter(prometheus.CounterOpts{
 		Namespace: ns,
@@ -151,7 +163,7 @@ var (
 		Namespace: ns,
 		Name:      "speed_mbps",
 		Help:      "Speed of blob retrieval",
-	}, []string{MtrLabelSource})
+	}, []string{LabelSource})
 
 	MtrInBytesTcp = promauto.NewCounter(prometheus.CounterOpts{
 		Namespace: ns,
diff --git a/peer/http3/store.go b/peer/http3/store.go
index dea0290..4749ab0 100644
--- a/peer/http3/store.go
+++ b/peer/http3/store.go
@@ -55,6 +55,8 @@ func (p *Store) getClient() (*Client, error) {
 	return c, errors.Prefix("connection error", err)
 }
 
+func (p *Store) Name() string { return "http3" }
+
 // Has asks the peer if they have a hash
 func (p *Store) Has(hash string) (bool, error) {
 	c, err := p.getClient()
diff --git a/peer/server_test.go b/peer/server_test.go
index 7c9b5f2..7d21921 100644
--- a/peer/server_test.go
+++ b/peer/server_test.go
@@ -34,7 +34,7 @@ var availabilityRequests = []pair{
 }
 
 func getServer(t *testing.T, withBlobs bool) *Server {
-	st := store.NewMemoryStore()
+	st := store.NewMemStore()
 	if withBlobs {
 		for k, v := range blobs {
 			err := st.Put(k, v)
diff --git a/peer/store.go b/peer/store.go
index 3857426..b8abedd 100644
--- a/peer/store.go
+++ b/peer/store.go
@@ -30,6 +30,8 @@ func (p *Store) getClient() (*Client, error) {
 	return c, errors.Prefix("connection error", err)
 }
 
+func (p *Store) Name() string { return "peer" }
+
 // Has asks the peer if they have a hash
 func (p *Store) Has(hash string) (bool, error) {
 	c, err := p.getClient()
diff --git a/reflector/server_test.go b/reflector/server_test.go
index 1039417..0de200e 100644
--- a/reflector/server_test.go
+++ b/reflector/server_test.go
@@ -22,7 +22,7 @@ func startServerOnRandomPort(t *testing.T) (*Server, int) {
 		t.Fatal(err)
 	}
 
-	srv := NewServer(store.NewMemoryStore())
+	srv := NewServer(store.NewMemStore())
 	err = srv.Start("127.0.0.1:" + strconv.Itoa(port))
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -119,7 +119,7 @@ func TestServer_Timeout(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	srv := NewServer(store.NewMemoryStore())
+	srv := NewServer(store.NewMemStore())
 	srv.Timeout = testTimeout
 	err = srv.Start("127.0.0.1:" + strconv.Itoa(port))
 	if err != nil {
@@ -161,7 +161,7 @@ func TestServer_Timeout(t *testing.T) {
 //}
 
 type mockPartialStore struct {
-	*store.MemoryStore
+	*store.MemStore
 	missing []string
 }
@@ -181,7 +181,7 @@ func TestServer_PartialUpload(t *testing.T) {
 		missing[i] = bits.Rand().String()
 	}
 
-	st := store.BlobStore(&mockPartialStore{MemoryStore: store.NewMemoryStore(), missing: missing})
+	st := store.BlobStore(&mockPartialStore{MemStore: store.NewMemStore(), missing: missing})
 	if _, ok := st.(neededBlobChecker); !ok {
 		t.Fatal("mock does not implement the relevant interface")
 	}
diff --git a/store/caching.go b/store/caching.go
index e4ff6b3..c5ee404 100644
--- a/store/caching.go
+++ b/store/caching.go
@@ -25,6 +25,11 @@ func NewCachingStore(origin, cache BlobStore) *CachingStore {
 	return &CachingStore{origin: origin, cache: cache, sf: new(singleflight.Group)}
 }
 
+const nameCaching = "caching"
+
+// Name is the cache type name
+func (c *CachingStore) Name() string { return nameCaching }
+
 // Has checks the cache and then the origin for a hash. It returns true if either store has it.
 func (c *CachingStore) Has(hash string) (bool, error) {
 	has, err := c.cache.Has(hash)
@@ -42,7 +47,7 @@ func (c *CachingStore) Get(hash string) (stream.Blob, error) {
 	if err == nil || !errors.Is(err, ErrBlobNotFound) {
 		metrics.CacheHitCount.Inc()
 		rate := float64(len(blob)) / 1024 / 1024 / time.Since(start).Seconds()
-		metrics.RetrieverSpeed.With(map[string]string{metrics.MtrLabelSource: "cache"}).Set(rate)
+		metrics.RetrieverSpeed.With(map[string]string{metrics.LabelSource: "cache"}).Set(rate)
 		return blob, err
 	}
 
@@ -66,7 +71,7 @@ func (c *CachingStore) getFromOrigin(hash string) (stream.Blob, error) {
 	}
 
 	rate := float64(len(blob)) / 1024 / 1024 / time.Since(start).Seconds()
-	metrics.RetrieverSpeed.With(map[string]string{metrics.MtrLabelSource: "origin"}).Set(rate)
+	metrics.RetrieverSpeed.With(map[string]string{metrics.LabelSource: "origin"}).Set(rate)
 
 	err = c.cache.Put(hash, blob)
 	return blob, err
diff --git a/store/caching_test.go b/store/caching_test.go
index f56c833..5ea61d5 100644
--- a/store/caching_test.go
+++ b/store/caching_test.go
@@ -9,9 +9,9 @@ import (
 	"github.com/lbryio/lbry.go/v2/stream"
 )
 
-func TestCachingBlobStore_Put(t *testing.T) {
-	origin := NewMemoryStore()
-	cache := NewMemoryStore()
+func TestCachingStore_Put(t *testing.T) {
+	origin := NewMemStore()
+	cache := NewMemStore()
 
 	s := NewCachingStore(origin, cache)
 
 	b := []byte("this is a blob of stuff")
@@ -39,9 +39,9 @@ func TestCachingBlobStore_Put(t *testing.T) {
 	}
 }
 
-func TestCachingBlobStore_CacheMiss(t *testing.T) {
-	origin := NewMemoryStore()
-	cache := NewMemoryStore()
+func TestCachingStore_CacheMiss(t *testing.T) {
+	origin := NewMemStore()
+	cache := NewMemStore()
 
 	s := NewCachingStore(origin, cache)
 
 	b := []byte("this is a blob of stuff")
@@ -76,10 +76,10 @@ func TestCachingBlobStore_CacheMiss(t *testing.T) {
 	}
 }
 
-func TestCachingBlobStore_ThunderingHerd(t *testing.T) {
+func TestCachingStore_ThunderingHerd(t *testing.T) {
 	storeDelay := 100 * time.Millisecond
 	origin := NewSlowBlobStore(storeDelay)
-	cache := NewMemoryStore()
+	cache := NewMemStore()
 
 	s := NewCachingStore(origin, cache)
 
 	b := []byte("this is a blob of stuff")
@@ -129,16 +129,19 @@ func TestCachingBlobStore_ThunderingHerd(t *testing.T) {
 
 // SlowBlobStore adds a delay to each request
 type SlowBlobStore struct {
-	mem   *MemoryStore
+	mem   *MemStore
 	delay time.Duration
 }
 
 func NewSlowBlobStore(delay time.Duration) *SlowBlobStore {
 	return &SlowBlobStore{
-		mem:   NewMemoryStore(),
+		mem:   NewMemStore(),
 		delay: delay,
 	}
 }
+func (s *SlowBlobStore) Name() string {
+	return "slow"
+}
 
 func (s *SlowBlobStore) Has(hash string) (bool, error) {
 	time.Sleep(s.delay)
diff --git a/store/cloudfront.go b/store/cloudfront.go
index c4adb3f..32f1246 100644
--- a/store/cloudfront.go
+++ b/store/cloudfront.go
@@ -34,6 +34,11 @@ func NewCloudFrontStore(s3 *S3Store, cfEndpoint string) *CloudFrontStore {
 	}
 }
 
+const nameCloudFront = "cloudfront"
+
+// Name is the cache type name
+func (c *CloudFrontStore) Name() string { return nameCloudFront }
+
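Each store now reports a Name, and the new eviction counter consumes it as the cache_type label value, so one metric vector yields a separate time series per store. A freestanding sketch of that labeled-counter pattern with the Prometheus client; the metric names mirror the patch, but the registration style and label values here are illustrative:

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// One counter vector, partitioned by the cache_type label.
	evictions := prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "reflector",
		Subsystem: "cache",
		Name:      "evict_total",
		Help:      "Count of blobs evicted from cache",
	}, []string{"cache_type"})

	prometheus.MustRegister(evictions)

	// Each store name gets its own series within the vector.
	evictions.WithLabelValues("disk").Inc()
	evictions.WithLabelValues("mem").Inc()
	evictions.WithLabelValues("mem").Inc()

	fmt.Println("incremented per-store eviction counters")
}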
 // Has checks if the hash is in the store.
 func (c *CloudFrontStore) Has(hash string) (bool, error) {
 	status, body, err := c.cfRequest(http.MethodHead, hash)
diff --git a/store/dbbacked.go b/store/dbbacked.go
index 1b554ca..59c47b5 100644
--- a/store/dbbacked.go
+++ b/store/dbbacked.go
@@ -25,6 +25,11 @@ func NewDBBackedStore(blobs BlobStore, db *db.SQL) *DBBackedStore {
 	return &DBBackedStore{blobs: blobs, db: db}
 }
 
+const nameDBBacked = "db-backed"
+
+// Name is the cache type name
+func (d *DBBackedStore) Name() string { return nameDBBacked }
+
 // Has returns true if the blob is in the store
 func (d *DBBackedStore) Has(hash string) (bool, error) {
 	return d.db.HasBlob(hash)
diff --git a/store/disk.go b/store/disk.go
index 71c8058..2a11e8e 100644
--- a/store/disk.go
+++ b/store/disk.go
@@ -35,34 +35,10 @@ func NewDiskStore(dir string, prefixLength int) *DiskStore {
 	}
 }
 
-func (d *DiskStore) dir(hash string) string {
-	if d.prefixLength <= 0 || len(hash) < d.prefixLength {
-		return d.blobDir
-	}
-	return path.Join(d.blobDir, hash[:d.prefixLength])
-}
+const nameDisk = "disk"
 
-func (d *DiskStore) path(hash string) string {
-	return path.Join(d.dir(hash), hash)
-}
-
-func (d *DiskStore) ensureDirExists(dir string) error {
-	return errors.Err(d.fs.MkdirAll(dir, 0755))
-}
-
-func (d *DiskStore) initOnce() error {
-	if d.initialized {
-		return nil
-	}
-
-	err := d.ensureDirExists(d.blobDir)
-	if err != nil {
-		return err
-	}
-
-	d.initialized = true
-	return nil
-}
+// Name is the cache type name
+func (d *DiskStore) Name() string { return nameDisk }
 
 // Has returns T/F or Error if it the blob stored already. It will error with any IO disk error.
 func (d *DiskStore) Has(hash string) (bool, error) {
@@ -166,3 +142,32 @@ func (d *DiskStore) list() ([]string, error) {
 
 	return existing, nil
 }
+
+func (d *DiskStore) dir(hash string) string {
+	if d.prefixLength <= 0 || len(hash) < d.prefixLength {
+		return d.blobDir
+	}
+	return path.Join(d.blobDir, hash[:d.prefixLength])
+}
+
+func (d *DiskStore) path(hash string) string {
+	return path.Join(d.dir(hash), hash)
+}
+
+func (d *DiskStore) ensureDirExists(dir string) error {
+	return errors.Err(d.fs.MkdirAll(dir, 0755))
+}
+
+func (d *DiskStore) initOnce() error {
+	if d.initialized {
+		return nil
+	}
+
+	err := d.ensureDirExists(d.blobDir)
+	if err != nil {
+		return err
+	}
+
+	d.initialized = true
+	return nil
+}
diff --git a/store/lru.go b/store/lru.go
index 22386fa..a9199c4 100644
--- a/store/lru.go
+++ b/store/lru.go
@@ -3,6 +3,7 @@ package store
 import (
 	"github.com/lbryio/lbry.go/v2/extras/errors"
 	"github.com/lbryio/lbry.go/v2/stream"
+	"github.com/lbryio/reflector.go/internal/metrics"
 
 	golru "github.com/hashicorp/golang-lru"
 )
@@ -18,6 +19,7 @@ type LRUStore struct {
 // NewLRUStore initialize a new LRUStore
 func NewLRUStore(store BlobStore, maxItems int) *LRUStore {
 	lru, err := golru.NewWithEvict(maxItems, func(key interface{}, value interface{}) {
+		metrics.CacheLRUEvictCount.WithLabelValues(store.Name()).Inc()
 		_ = store.Delete(key.(string)) // TODO: log this error. may happen if underlying entry is gone but cache entry still there
 	})
 	if err != nil {
@@ -39,6 +41,11 @@ func NewLRUStore(store BlobStore, maxItems int) *LRUStore {
 	return l
 }
 
+const nameLRU = "lru"
+
+// Name is the cache type name
+func (l *LRUStore) Name() string { return nameLRU }
+
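The eviction hook above is the core of the LRU wrapper: hashicorp's golang-lru invokes the callback as entries fall out of the cache, and that is where LRUStore deletes the underlying blob and bumps the new metric. A minimal sketch of just the library mechanics, with arbitrary keys and cache size:

package main

import (
	"fmt"

	lru "github.com/hashicorp/golang-lru"
)

func main() {
	// NewWithEvict wires a callback that fires on every eviction.
	// In LRUStore this callback deletes the blob from the wrapped
	// store and increments the eviction counter.
	cache, err := lru.NewWithEvict(2, func(key interface{}, value interface{}) {
		fmt.Println("evicted:", key)
	})
	if err != nil {
		panic(err)
	}

	cache.Add("one", true)
	cache.Add("two", true)
	cache.Add("three", true) // evicts "one", the least recently used
}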
 // Has returns whether the blob is in the store, without updating the recent-ness.
 func (l *LRUStore) Has(hash string) (bool, error) {
 	return l.lru.Contains(hash), nil
diff --git a/store/lru_test.go b/store/lru_test.go
index c95ca6b..5b41220 100644
--- a/store/lru_test.go
+++ b/store/lru_test.go
@@ -14,7 +14,7 @@ import (
 
 const cacheMaxBlobs = 3
 
-func testLRUStore() (*LRUStore, *DiskStore) {
+func getTestLRUStore() (*LRUStore, *DiskStore) {
 	d := NewDiskStore("/", 2)
 	d.fs = afero.NewMemMapFs()
 	return NewLRUStore(d, 3), d
@@ -42,7 +42,7 @@ func countOnDisk(t *testing.T, disk *DiskStore) int {
 }
 
 func TestLRUStore_Eviction(t *testing.T) {
-	lru, disk := testLRUStore()
+	lru, disk := getTestLRUStore()
 	b := []byte("x")
 	err := lru.Put("one", b)
 	require.NoError(t, err)
@@ -98,7 +98,7 @@ func TestLRUStore_Eviction(t *testing.T) {
 }
 
 func TestLRUStore_UnderlyingBlobMissing(t *testing.T) {
-	lru, disk := testLRUStore()
+	lru, disk := getTestLRUStore()
 	hash := "hash"
 	b := []byte("this is a blob of stuff")
 	err := lru.Put(hash, b)
diff --git a/store/memory.go b/store/memory.go
index c50f282..47969fc 100644
--- a/store/memory.go
+++ b/store/memory.go
@@ -5,25 +5,30 @@ import (
 	"github.com/lbryio/lbry.go/v2/stream"
 )
 
-// MemoryStore is an in memory only blob store with no persistence.
-type MemoryStore struct {
+// MemStore is an in memory only blob store with no persistence.
+type MemStore struct {
 	blobs map[string]stream.Blob
 }
 
-func NewMemoryStore() *MemoryStore {
-	return &MemoryStore{
+func NewMemStore() *MemStore {
+	return &MemStore{
 		blobs: make(map[string]stream.Blob),
 	}
 }
 
+const nameMem = "mem"
+
+// Name is the cache type name
+func (m *MemStore) Name() string { return nameMem }
+
 // Has returns T/F if the blob is currently stored. It will never error.
-func (m *MemoryStore) Has(hash string) (bool, error) {
+func (m *MemStore) Has(hash string) (bool, error) {
 	_, ok := m.blobs[hash]
 	return ok, nil
 }
 
 // Get returns the blob byte slice if present and errors if the blob is not found.
-func (m *MemoryStore) Get(hash string) (stream.Blob, error) {
+func (m *MemStore) Get(hash string) (stream.Blob, error) {
 	blob, ok := m.blobs[hash]
 	if !ok {
 		return nil, errors.Err(ErrBlobNotFound)
@@ -32,23 +37,23 @@ func (m *MemStore) Get(hash string) (stream.Blob, error) {
 }
 
 // Put stores the blob in memory
-func (m *MemoryStore) Put(hash string, blob stream.Blob) error {
+func (m *MemStore) Put(hash string, blob stream.Blob) error {
 	m.blobs[hash] = blob
 	return nil
 }
 
 // PutSD stores the sd blob in memory
-func (m *MemoryStore) PutSD(hash string, blob stream.Blob) error {
+func (m *MemStore) PutSD(hash string, blob stream.Blob) error {
 	return m.Put(hash, blob)
 }
 
 // Delete deletes the blob from the store
-func (m *MemoryStore) Delete(hash string) error {
+func (m *MemStore) Delete(hash string) error {
 	delete(m.blobs, hash)
 	return nil
 }
 
 // Debug returns the blobs in memory. It's useful for testing and debugging.
-func (m *MemoryStore) Debug() map[string]stream.Blob {
+func (m *MemStore) Debug() map[string]stream.Blob {
 	return m.blobs
 }
diff --git a/store/memory_test.go b/store/memory_test.go
index 892c735..8d85114 100644
--- a/store/memory_test.go
+++ b/store/memory_test.go
@@ -7,8 +7,8 @@ import (
 	"github.com/lbryio/lbry.go/v2/extras/errors"
 )
 
-func TestMemoryBlobStore_Put(t *testing.T) {
-	s := NewMemoryStore()
+func TestMemStore_Put(t *testing.T) {
+	s := NewMemStore()
 	blob := []byte("abcdefg")
 	err := s.Put("abc", blob)
 	if err != nil {
@@ -16,8 +16,8 @@ func TestMemoryBlobStore_Put(t *testing.T) {
 }
 
-func TestMemoryBlobStore_Get(t *testing.T) {
-	s := NewMemoryStore()
+func TestMemStore_Get(t *testing.T) {
+	s := NewMemStore()
 	hash := "abc"
 	blob := []byte("abcdefg")
 	err := s.Put(hash, blob)
diff --git a/store/s3.go b/store/s3.go
index 2189ef1..53451be 100644
--- a/store/s3.go
+++ b/store/s3.go
@@ -38,22 +38,10 @@ func NewS3Store(awsID, awsSecret, region, bucket string) *S3Store {
 	}
 }
 
-func (s *S3Store) initOnce() error {
-	if s.session != nil {
-		return nil
-	}
+const nameS3 = "s3"
 
-	sess, err := session.NewSession(&aws.Config{
-		Credentials: credentials.NewStaticCredentials(s.awsID, s.awsSecret, ""),
-		Region:      aws.String(s.region),
-	})
-	if err != nil {
-		return err
-	}
-
-	s.session = sess
-	return nil
-}
+// Name is the cache type name
+func (s *S3Store) Name() string { return nameS3 }
 
 // Has returns T/F or Error ( from S3 ) if the store contains the blob.
 func (s *S3Store) Has(hash string) (bool, error) {
@@ -153,3 +141,20 @@ func (s *S3Store) Delete(hash string) error {
 
 	return err
 }
+
+func (s *S3Store) initOnce() error {
+	if s.session != nil {
+		return nil
+	}
+
+	sess, err := session.NewSession(&aws.Config{
+		Credentials: credentials.NewStaticCredentials(s.awsID, s.awsSecret, ""),
+		Region:      aws.String(s.region),
+	})
+	if err != nil {
+		return err
+	}
+
+	s.session = sess
+	return nil
+}
diff --git a/store/store.go b/store/store.go
index f8f6dd3..80d56c1 100644
--- a/store/store.go
+++ b/store/store.go
@@ -7,6 +7,8 @@ import (
 
 // BlobStore is an interface for handling blob storage.
 type BlobStore interface {
+	// Name of blob store (useful for metrics)
+	Name() string
 	// Does blob exist in the store
 	Has(hash string) (bool, error)
 	// Get the blob from the store
-- 
2.45.3

From 7a3225434e8720a9a91866cd52cb4abab986c65e Mon Sep 17 00:00:00 2001
From: Alex Grintsvayg
Date: Mon, 26 Oct 2020 12:27:27 -0400
Subject: [PATCH 05/13] add test for DiskStore.list()

---
 store/disk.go     |  7 ++++++-
 store/lru_test.go | 20 ++++++++++++++++++++
 2 files changed, 26 insertions(+), 1 deletion(-)

diff --git a/store/disk.go b/store/disk.go
index 2a11e8e..2d8f435 100644
--- a/store/disk.go
+++ b/store/disk.go
@@ -117,8 +117,13 @@ func (d *DiskStore) Delete(hash string) error {
 	return errors.Err(err)
 }
 
-// list returns a slice of blobs that already exist in the blobDir
+// list returns the hashes of blobs that already exist in the blobDir
 func (d *DiskStore) list() ([]string, error) {
+	err := d.initOnce()
+	if err != nil {
+		return nil, err
+	}
+
 	dirs, err := afero.ReadDir(d.fs, d.blobDir)
 	if err != nil {
 		return nil, err
diff --git a/store/lru_test.go b/store/lru_test.go
index 5b41220..9a2a94a 100644
--- a/store/lru_test.go
+++ b/store/lru_test.go
@@ -119,3 +119,23 @@ func TestLRUStore_UnderlyingBlobMissing(t *testing.T) {
 	// lru.Get() removes hash if underlying store doesn't have it
 	assert.False(t, lru.lru.Contains(hash))
 }
+
+func TestLRUStore_loadExisting(t *testing.T) {
+	d := NewDiskStore("/", 2)
+	d.fs = afero.NewMemMapFs()
+
+	hash := "hash"
+	b := []byte("this is a blob of stuff")
+	err := d.Put(hash, b)
+	require.NoError(t, err)
+
+	existing, err := d.list()
+	require.NoError(t, err)
+	require.Equal(t, 1, len(existing), "blob should exist in cache")
+	assert.Equal(t, hash, existing[0])
+
+	lru := NewLRUStore(d, 3) // lru should load existing blobs when it's created
+	has, err := lru.Has(hash)
+	require.NoError(t, err)
+	assert.True(t, has, "hash should be loaded from disk store but it's not")
+}
-- 
2.45.3

From f131c1f35b3763411c0e83abf313b72206b83251 Mon Sep 17 00:00:00 2001
From: Alex Grintsvayg
Date: Mon, 26 Oct 2020 13:12:01 -0400
Subject: [PATCH 06/13] add noop store that does nothing

---
 store/noop.go | 15 +++++++++++++++
 1 file changed, 15 insertions(+)
 create mode 100644 store/noop.go

diff --git a/store/noop.go b/store/noop.go
new file mode 100644
index 0000000..9e5d815
--- /dev/null
+++ b/store/noop.go
@@ -0,0 +1,15 @@
+package store
+
+import "github.com/lbryio/lbry.go/v2/stream"
+
+// NoopStore is a store that does nothing
+type NoopStore struct{}
+
+const nameNoop = "noop"
+
+func (n *NoopStore) Name() string { return nameNoop }
+func (n *NoopStore) Has(_ string) (bool, error) { return false, nil }
+func (n *NoopStore) Get(_ string) (stream.Blob, error) { return nil, nil }
+func (n *NoopStore) Put(_ string, _ stream.Blob) error { return nil }
+func (n *NoopStore) PutSD(_ string, _ stream.Blob) error { return nil }
+func (n *NoopStore) Delete(_ string) error { return nil }
-- 
2.45.3

From 124d4065c249f0425cfbdbacd7d5536eebe0ba4a Mon Sep 17 00:00:00 2001
From: Alex Grintsvayg
Date: Wed, 28 Oct 2020 13:59:02 -0400
Subject: [PATCH 07/13] split cloudfront into RO and RW stores

---
 cmd/reflector.go                          |  2 +-
 store/{cloudfront.go => cloudfront_ro.go} | 44 ++++++++-------
 store/cloudfront_rw.go                    | 50 +++++++++++++++++++++++
 3 files changed, 67 insertions(+), 29 deletions(-)
 rename store/{cloudfront.go => cloudfront_ro.go} (60%)
 create mode 100644 store/cloudfront_rw.go

diff --git a/cmd/reflector.go b/cmd/reflector.go
index d46fd09..ff379ad 100644
--- a/cmd/reflector.go
+++ b/cmd/reflector.go
@@ -124,7 +124,7 @@ func setupStore() store.BlobStore {
 	} else {
 		s3Store := store.NewS3Store(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName)
 		if cloudFrontEndpoint != "" {
-			s = store.NewCloudFrontStore(s3Store, cloudFrontEndpoint)
+			s = store.NewCloudFrontRWStore(store.NewCloudFrontROStore(cloudFrontEndpoint), s3Store)
 		} else {
 			s = s3Store
 		}
diff --git a/store/cloudfront.go b/store/cloudfront_ro.go
similarity index 60%
rename from store/cloudfront.go
rename to store/cloudfront_ro.go
index 32f1246..a914285 100644
--- a/store/cloudfront.go
+++ b/store/cloudfront_ro.go
@@ -15,32 +15,23 @@ import (
 	log "github.com/sirupsen/logrus"
 )
 
-// CloudFrontStore wraps an S3 store. Reads go to Cloudfront, writes go to S3.
-type CloudFrontStore struct {
-	s3       *S3Store
+// CloudFrontROStore reads from cloudfront. All writes panic.
+type CloudFrontROStore struct {
 	endpoint string // cloudflare endpoint
 }
 
-// NewCloudFrontStore returns an initialized CloudFrontStore store pointer.
-// NOTE: It panics if S3Store is nil.
-func NewCloudFrontStore(s3 *S3Store, cfEndpoint string) *CloudFrontStore {
-	if s3 == nil {
-		panic("S3Store must not be nil")
-	}
-
-	return &CloudFrontStore{
-		endpoint: cfEndpoint,
-		s3:       s3,
-	}
+// NewCloudFrontROStore returns an initialized CloudFrontROStore store pointer.
+func NewCloudFrontROStore(endpoint string) *CloudFrontROStore {
+	return &CloudFrontROStore{endpoint: endpoint}
 }
 
-const nameCloudFront = "cloudfront"
+const nameCloudFrontRO = "cloudfront_ro"
 
 // Name is the cache type name
-func (c *CloudFrontStore) Name() string { return nameCloudFront }
+func (c *CloudFrontROStore) Name() string { return nameCloudFrontRO }
 
 // Has checks if the hash is in the store.
-func (c *CloudFrontStore) Has(hash string) (bool, error) {
+func (c *CloudFrontROStore) Has(hash string) (bool, error) {
 	status, body, err := c.cfRequest(http.MethodHead, hash)
 	if err != nil {
 		return false, err
@@ -58,7 +49,7 @@ func (c *CloudFrontStore) Has(hash string) (bool, error) {
 }
 
 // Get gets the blob from Cloudfront.
-func (c *CloudFrontStore) Get(hash string) (stream.Blob, error) {
+func (c *CloudFrontROStore) Get(hash string) (stream.Blob, error) {
 	log.Debugf("Getting %s from S3", hash[:8])
 	defer func(t time.Time) {
 		log.Debugf("Getting %s from S3 took %s", hash[:8], time.Since(t).String())
@@ -85,7 +76,7 @@ func (c *CloudFrontStore) Get(hash string) (stream.Blob, error) {
 	}
 }
 
-func (c *CloudFrontStore) cfRequest(method, hash string) (int, io.ReadCloser, error) {
+func (c *CloudFrontROStore) cfRequest(method, hash string) (int, io.ReadCloser, error) {
 	url := c.endpoint + hash
 	req, err := http.NewRequest(method, url, nil)
 	if err != nil {
@@ -101,17 +92,14 @@ func (c *CloudFrontStore) cfRequest(method, hash string) (int, io.ReadCloser, error) {
 	return res.StatusCode, res.Body, nil
 }
 
-// Put stores the blob on S3
-func (c *CloudFrontStore) Put(hash string, blob stream.Blob) error {
-	return c.s3.Put(hash, blob)
+func (c *CloudFrontROStore) Put(_ string, _ stream.Blob) error {
+	panic("CloudFrontROStore cannot do writes. Use CloudFrontRWStore")
 }
 
-// PutSD stores the sd blob on S3
-func (c *CloudFrontStore) PutSD(hash string, blob stream.Blob) error {
-	return c.s3.PutSD(hash, blob)
+func (c *CloudFrontROStore) PutSD(_ string, _ stream.Blob) error {
+	panic("CloudFrontROStore cannot do writes. Use CloudFrontRWStore")
 }
 
-// Delete deletes the blob from S3
-func (c *CloudFrontStore) Delete(hash string) error {
-	return c.s3.Delete(hash)
+func (c *CloudFrontROStore) Delete(_ string) error {
+	panic("CloudFrontROStore cannot do writes. Use CloudFrontRWStore")
 }
diff --git a/store/cloudfront_rw.go b/store/cloudfront_rw.go
new file mode 100644
index 0000000..ee771da
--- /dev/null
+++ b/store/cloudfront_rw.go
@@ -0,0 +1,50 @@
+package store
+
+import (
+	"github.com/lbryio/lbry.go/v2/stream"
+)
+
+// CloudFrontRWStore combines a Cloudfront and an S3 store. Reads go to Cloudfront, writes go to S3.
+type CloudFrontRWStore struct {
+	cf *CloudFrontROStore
+	s3 *S3Store
+}
+
+// NewCloudFrontRWStore returns an initialized CloudFrontRWStore store pointer.
+// NOTE: It panics if either argument is nil.
+func NewCloudFrontRWStore(cf *CloudFrontROStore, s3 *S3Store) *CloudFrontRWStore {
+	if cf == nil || s3 == nil {
+		panic("both stores must be set")
+	}
+	return &CloudFrontRWStore{cf: cf, s3: s3}
+}
+
+const nameCloudFrontRW = "cloudfront_rw"
+
+// Name is the cache type name
+func (c *CloudFrontRWStore) Name() string { return nameCloudFrontRW }
+
+// Has checks if the hash is in the store.
+func (c *CloudFrontRWStore) Has(hash string) (bool, error) {
+	return c.cf.Has(hash)
+}
+
+// Get gets the blob from Cloudfront.
+func (c *CloudFrontRWStore) Get(hash string) (stream.Blob, error) {
+	return c.cf.Get(hash)
+}
+
+// Put stores the blob on S3
+func (c *CloudFrontRWStore) Put(hash string, blob stream.Blob) error {
+	return c.s3.Put(hash, blob)
+}
+
+// PutSD stores the sd blob on S3
+func (c *CloudFrontRWStore) PutSD(hash string, blob stream.Blob) error {
+	return c.s3.PutSD(hash, blob)
+}
+
+// Delete deletes the blob from S3
+func (c *CloudFrontRWStore) Delete(hash string) error {
+	return c.s3.Delete(hash)
+}
-- 
2.45.3

From 070c378dfd930a178b530acde07642c3933c68a9 Mon Sep 17 00:00:00 2001
From: Alex Grintsvayg
Date: Wed, 28 Oct 2020 15:15:35 -0400
Subject: [PATCH 08/13] apparently the normal check doesn't work

---
 peer/http3/server.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/peer/http3/server.go b/peer/http3/server.go
index d0a39e1..55ab5fc 100644
--- a/peer/http3/server.go
+++ b/peer/http3/server.go
@@ -164,7 +164,7 @@ func generateTLSConfig() *tls.Config {
 
 func (s *Server) listenAndServe(server *http3.Server) {
 	err := server.ListenAndServe()
-	if err != nil && !errors.Is(err, http.ErrServerClosed) {
+	if err != nil && err.Error() != "server closed" {
 		log.Errorln(errors.FullTrace(err))
 	}
 }
-- 
2.45.3

From 560e180e368ca18dd5dc3c23d31e0c3454ae7256 Mon Sep 17 00:00:00 2001
From: Alex Grintsvayg
Date: Thu, 29 Oct 2020 12:39:53 -0400
Subject: [PATCH 09/13] separate singleflight cache wrapper, component names for cache metrics

---
 cmd/getstream.go            |  1 +
 cmd/reflector.go            | 14 +++++---
 internal/metrics/metrics.go | 38 ++++++++++++---------
 store/caching.go            | 51 ++++++++++------------------
 store/caching_test.go       |  6 ++--
 store/lru.go                |  4 +--
 store/lru_test.go           |  4 +--
 store/singleflight.go       | 67 +++++++++++++++++++++++++++++++++++++
 8 files changed, 126 insertions(+), 59 deletions(-)
 create mode 100644 store/singleflight.go

diff --git a/cmd/getstream.go b/cmd/getstream.go
index ae586d7..c02ad0b 100644
--- a/cmd/getstream.go
+++ b/cmd/getstream.go
@@ -29,6 +29,7 @@ func getStreamCmd(cmd *cobra.Command, args []string) {
 	sdHash := args[1]
 
 	s := store.NewCachingStore(
+		"getstream",
 		peer.NewStore(peer.StoreOpts{Address: addr}),
 		store.NewDiskStore("/tmp/lbry_downloaded_blobs", 2),
 	)
diff --git a/cmd/reflector.go b/cmd/reflector.go
index ff379ad..2f687d0 100644
--- a/cmd/reflector.go
+++ b/cmd/reflector.go
@@ -153,13 +153,19 @@ func wrapWithCache(s store.BlobStore) store.BlobStore {
 		if err != nil {
 			log.Fatal(err)
 		}
-		wrapped = store.NewCachingStore(wrapped,
-			store.NewLRUStore(store.NewDiskStore(diskCachePath, 2), diskCacheMaxSize))
+		wrapped = store.NewCachingStore(
+			"reflector",
+			wrapped,
+			store.NewLRUStore("peer_server", store.NewDiskStore(diskCachePath, 2), diskCacheMaxSize),
+		)
 	}
 
 	if reflectorCmdMemCache > 0 {
-		wrapped = store.NewCachingStore(wrapped,
-			store.NewLRUStore(store.NewMemStore(), reflectorCmdMemCache))
+		wrapped = store.NewCachingStore(
+			"reflector",
+			wrapped,
+			store.NewLRUStore("peer_server", store.NewMemStore(), reflectorCmdMemCache),
+		)
 	}
 
 	return wrapped
diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go
index 59cf050..8d46266 100644
--- a/internal/metrics/metrics.go
+++ b/internal/metrics/metrics.go
@@ -12,6 +12,7 @@ import (
 	ee "github.com/lbryio/lbry.go/v2/extras/errors"
 	"github.com/lbryio/lbry.go/v2/extras/stop"
 
+	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
 	"github.com/prometheus/client_golang/prometheus/promhttp"
@@ -67,6 +68,7 @@ const (
 	DirectionDownload = "download" // from reflector
 
 	LabelCacheType = "cache_type"
+	LabelComponent = "component"
 	LabelSource    = "source"
 
 	errConnReset     = "conn_reset"
@@ -116,37 +118,42 @@ var (
 		Help: "Total number of blobs downloaded from reflector through QUIC protocol",
 	})
 
-	CacheHitCount = promauto.NewCounter(prometheus.CounterOpts{
+	CacheHitCount = promauto.NewCounterVec(prometheus.CounterOpts{
 		Namespace: ns,
 		Subsystem: subsystemCache,
 		Name:      "hit_total",
 		Help:      "Total number of blobs retrieved from the cache storage",
-	})
-	CacheMissCount = promauto.NewCounter(prometheus.CounterOpts{
+	}, []string{LabelCacheType, LabelComponent})
+	CacheMissCount = promauto.NewCounterVec(prometheus.CounterOpts{
 		Namespace: ns,
 		Subsystem: subsystemCache,
 		Name:      "miss_total",
 		Help:      "Total number of blobs retrieved from origin rather than cache storage",
-	})
-	CacheOriginRequestsCount = promauto.NewGauge(prometheus.GaugeOpts{
+	}, []string{LabelCacheType, LabelComponent})
+	CacheOriginRequestsCount = promauto.NewGaugeVec(prometheus.GaugeOpts{
 		Namespace: ns,
 		Subsystem: subsystemCache,
 		Name:      "origin_requests_total",
 		Help:      "How many Get requests are in flight from the cache to the origin",
-	})
+	}, []string{LabelCacheType, LabelComponent})
 	// during thundering-herd situations, the metric below should be a lot smaller than the metric above
-	CacheWaitingRequestsCount = promauto.NewGauge(prometheus.GaugeOpts{
+	CacheWaitingRequestsCount = promauto.NewGaugeVec(prometheus.GaugeOpts{
 		Namespace: ns,
 		Subsystem: subsystemCache,
 		Name:      "waiting_requests_total",
 		Help:      "How many cache requests are waiting for an in-flight origin request",
-	})
+	}, []string{LabelCacheType, LabelComponent})
 	CacheLRUEvictCount = promauto.NewCounterVec(prometheus.CounterOpts{
 		Namespace: ns,
 		Subsystem: subsystemCache,
 		Name:      "evict_total",
 		Help:      "Count of blobs evicted from cache",
-	}, []string{LabelCacheType})
+	}, []string{LabelCacheType, LabelComponent})
+	CacheRetrievalSpeed = promauto.NewGaugeVec(prometheus.GaugeOpts{
+		Namespace: ns,
+		Name:      "speed_mbps",
+		Help:      "Speed of blob retrieval from cache or from origin",
+	}, []string{LabelCacheType, LabelComponent, LabelSource})
 
 	BlobUploadCount = promauto.NewCounter(prometheus.CounterOpts{
 		Namespace: ns,
@@ -159,12 +166,6 @@ var (
 		Help: "Total number of SD blobs (and therefore streams) uploaded to reflector",
 	})
 
-	RetrieverSpeed = promauto.NewGaugeVec(prometheus.GaugeOpts{
-		Namespace: ns,
-		Name:      "speed_mbps",
-		Help:      "Speed of blob retrieval",
-	}, []string{LabelSource})
-
 	MtrInBytesTcp = promauto.NewCounter(prometheus.CounterOpts{
 		Namespace: ns,
 		Name:      "tcp_in_bytes",
@@ -202,6 +203,13 @@ var (
 	})
 )
 
+func CacheLabels(name, component string) prometheus.Labels {
+	return prometheus.Labels{
+		LabelCacheType: name,
+		LabelComponent: component,
+	}
+}
+
 func TrackError(direction string, e error) (shouldLog bool) { // shouldLog is a hack, but whatever
 	if e == nil {
 		return
diff --git a/store/caching.go b/store/caching.go
index c5ee404..0a06395 100644
--- a/store/caching.go
+++ b/store/caching.go
@@ -7,8 +7,6 @@ import (
 	"github.com/lbryio/lbry.go/v2/stream"
 
 	"github.com/lbryio/reflector.go/internal/metrics"
-
-	"golang.org/x/sync/singleflight"
 )
 
 // CachingStore combines two stores, typically a local and a remote store, to improve performance.
@@ -16,13 +14,16 @@ import (
 // are retrieved from the origin and cached. Puts are cached and also forwarded to the origin.
 type CachingStore struct {
 	origin, cache BlobStore
-
-	sf *singleflight.Group
+	component     string
 }
 
 // NewCachingStore makes a new caching disk store and returns a pointer to it.
-func NewCachingStore(origin, cache BlobStore) *CachingStore {
-	return &CachingStore{origin: origin, cache: cache, sf: new(singleflight.Group)}
+func NewCachingStore(component string, origin, cache BlobStore) *CachingStore {
+	return &CachingStore{
+		component: component,
+		origin:    WithSingleFlight(component, origin),
+		cache:     cache,
+	}
 }
 
 const nameCaching = "caching"
@@ -45,41 +46,25 @@ func (c *CachingStore) Get(hash string) (stream.Blob, error) {
 	start := time.Now()
 	blob, err := c.cache.Get(hash)
 	if err == nil || !errors.Is(err, ErrBlobNotFound) {
-		metrics.CacheHitCount.Inc()
+		metrics.CacheHitCount.With(metrics.CacheLabels(c.cache.Name(), c.component)).Inc()
 		rate := float64(len(blob)) / 1024 / 1024 / time.Since(start).Seconds()
-		metrics.RetrieverSpeed.With(map[string]string{metrics.LabelSource: "cache"}).Set(rate)
+		metrics.CacheRetrievalSpeed.With(map[string]string{
+			metrics.LabelCacheType: c.cache.Name(),
+			metrics.LabelComponent: c.component,
+			metrics.LabelSource:    "cache",
+		}).Set(rate)
 		return blob, err
 	}
 
-	metrics.CacheMissCount.Inc()
-	return c.getFromOrigin(hash)
-}
+	metrics.CacheMissCount.With(metrics.CacheLabels(c.cache.Name(), c.component)).Inc()
 
-// getFromOrigin ensures that only one Get per hash is sent to the origin at a time,
-// thereby protecting against https://en.wikipedia.org/wiki/Thundering_herd_problem
-func (c *CachingStore) getFromOrigin(hash string) (stream.Blob, error) {
-	metrics.CacheWaitingRequestsCount.Inc()
-	defer metrics.CacheWaitingRequestsCount.Dec()
-	originBlob, err, _ := c.sf.Do(hash, func() (interface{}, error) {
-		metrics.CacheOriginRequestsCount.Inc()
-		defer metrics.CacheOriginRequestsCount.Dec()
-
-		start := time.Now()
-		blob, err := c.origin.Get(hash)
-		if err != nil {
-			return nil, err
-		}
-
-		rate := float64(len(blob)) / 1024 / 1024 / time.Since(start).Seconds()
-		metrics.RetrieverSpeed.With(map[string]string{metrics.LabelSource: "origin"}).Set(rate)
-
-		err = c.cache.Put(hash, blob)
-		return blob, err
-	})
+	blob, err = c.origin.Get(hash)
 	if err != nil {
 		return nil, err
 	}
-	return originBlob.(stream.Blob), nil
+
+	err = c.cache.Put(hash, blob)
+	return blob, err
 }
 
 // Put stores the blob in the origin and the cache
diff --git a/store/caching_test.go b/store/caching_test.go
index 5ea61d5..34f928c 100644
--- a/store/caching_test.go
+++ b/store/caching_test.go
@@ -12,7 +12,7 @@ import (
 func TestCachingStore_Put(t *testing.T) {
 	origin := NewMemStore()
 	cache := NewMemStore()
-	s := NewCachingStore(origin, cache)
+	s := NewCachingStore("test", origin, cache)
 
 	b := []byte("this is a blob of stuff")
 	hash := "hash"
@@ -42,7 +42,7 @@ func TestCachingStore_Put(t *testing.T) {
 func TestCachingStore_CacheMiss(t *testing.T) {
 	origin := NewMemStore()
 	cache := NewMemStore()
-	s := NewCachingStore(origin, cache)
+	s := NewCachingStore("test", origin, cache)
 
 	b := []byte("this is a blob of stuff")
 	hash := "hash"
@@ -80,7 +80,7 @@ func TestCachingStore_ThunderingHerd(t *testing.T) {
 	storeDelay := 100 * time.Millisecond
 	origin := NewSlowBlobStore(storeDelay)
 	cache := NewMemStore()
-	s := NewCachingStore(origin, cache)
+	s := NewCachingStore("test", origin, cache)
 
 	b := []byte("this is a blob of stuff")
 	hash := "hash"
diff --git a/store/lru.go b/store/lru.go
index a9199c4..4ef4908 100644
--- a/store/lru.go
+++ b/store/lru.go
@@ -17,9 +17,9 @@ type LRUStore struct {
 }
 
 // NewLRUStore initialize a new LRUStore
-func NewLRUStore(store BlobStore, maxItems int) *LRUStore {
+func NewLRUStore(component string, store BlobStore, maxItems int) *LRUStore {
 	lru, err := golru.NewWithEvict(maxItems, func(key interface{}, value interface{}) {
-		metrics.CacheLRUEvictCount.WithLabelValues(store.Name()).Inc()
+		metrics.CacheLRUEvictCount.With(metrics.CacheLabels(store.Name(), component)).Inc()
 		_ = store.Delete(key.(string)) // TODO: log this error. may happen if underlying entry is gone but cache entry still there
 	})
 	if err != nil {
diff --git a/store/lru_test.go b/store/lru_test.go
index 9a2a94a..0576377 100644
--- a/store/lru_test.go
+++ b/store/lru_test.go
@@ -17,7 +17,7 @@ const cacheMaxBlobs = 3
 func getTestLRUStore() (*LRUStore, *DiskStore) {
 	d := NewDiskStore("/", 2)
 	d.fs = afero.NewMemMapFs()
-	return NewLRUStore(d, 3), d
+	return NewLRUStore("test", d, 3), d
 }
 
 func countOnDisk(t *testing.T, disk *DiskStore) int {
@@ -134,7 +134,7 @@ func TestLRUStore_loadExisting(t *testing.T) {
 	require.Equal(t, 1, len(existing), "blob should exist in cache")
 	assert.Equal(t, hash, existing[0])
 
-	lru := NewLRUStore(d, 3) // lru should load existing blobs when it's created
+	lru := NewLRUStore("test", d, 3) // lru should load existing blobs when it's created
 	has, err := lru.Has(hash)
 	require.NoError(t, err)
 	assert.True(t, has, "hash should be loaded from disk store but it's not")
diff --git a/store/singleflight.go b/store/singleflight.go
new file mode 100644
index 0000000..fbe314f
--- /dev/null
+++ b/store/singleflight.go
@@ -0,0 +1,67 @@
+package store
+
+import (
+	"time"
+
+	"github.com/lbryio/reflector.go/internal/metrics"
+
+	"github.com/lbryio/lbry.go/v2/stream"
+
+	"golang.org/x/sync/singleflight"
+)
+
+func WithSingleFlight(component string, origin BlobStore) BlobStore {
+	return &singleflightStore{
+		BlobStore: origin,
+		component: component,
+		sf:        new(singleflight.Group),
+	}
+}
+
+type singleflightStore struct {
+	BlobStore
+
+	component string
+	sf        *singleflight.Group
+}
+
+func (s *singleflightStore) Name() string {
+	return "sf_" + s.BlobStore.Name()
+}
+
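WithSingleFlight wraps the origin so that concurrent Gets for one hash collapse into a single origin request, which is the thundering-herd protection that previously lived inside CachingStore. A freestanding sketch of the underlying golang.org/x/sync/singleflight behavior; the hash, blob value, and timing are made up:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"golang.org/x/sync/singleflight"
)

func main() {
	var g singleflight.Group
	var originCalls int32

	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// All five goroutines ask for the same hash; singleflight
			// runs the fetch once and shares the result with the rest.
			blob, _, shared := g.Do("somehash", func() (interface{}, error) {
				atomic.AddInt32(&originCalls, 1)
				time.Sleep(50 * time.Millisecond) // simulate a slow origin
				return "blob data", nil
			})
			fmt.Println(blob, "shared:", shared)
		}()
	}
	wg.Wait()

	fmt.Println("origin calls:", atomic.LoadInt32(&originCalls)) // typically 1
}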
+// Get ensures that only one request per hash is sent to the origin at a time,
+// thereby protecting against https://en.wikipedia.org/wiki/Thundering_herd_problem
+func (s *singleflightStore) Get(hash string) (stream.Blob, error) {
+	metrics.CacheWaitingRequestsCount.With(metrics.CacheLabels(s.BlobStore.Name(), s.component)).Inc()
+	defer metrics.CacheWaitingRequestsCount.With(metrics.CacheLabels(s.BlobStore.Name(), s.component)).Dec()
+
+	blob, err, _ := s.sf.Do(hash, s.getter(hash))
+	if err != nil {
+		return nil, err
+	}
+	return blob.(stream.Blob), nil
+}
+
+// getter returns a function that gets a blob from the origin
+// only one getter per hash will be executing at a time
+func (s *singleflightStore) getter(hash string) func() (interface{}, error) {
+	return func() (interface{}, error) {
+		metrics.CacheOriginRequestsCount.With(metrics.CacheLabels(s.BlobStore.Name(), s.component)).Inc()
+		defer metrics.CacheOriginRequestsCount.With(metrics.CacheLabels(s.BlobStore.Name(), s.component)).Dec()
+
+		start := time.Now()
+		blob, err := s.BlobStore.Get(hash)
+		if err != nil {
+			return nil, err
+		}
+
+		rate := float64(len(blob)) / 1024 / 1024 / time.Since(start).Seconds()
+		metrics.CacheRetrievalSpeed.With(map[string]string{
+			metrics.LabelCacheType: s.BlobStore.Name(),
+			metrics.LabelComponent: s.component,
+			metrics.LabelSource:    "origin",
+		}).Set(rate)
+
+		return blob, nil
+	}
+}
-- 
2.45.3

From 72571236ab1b119ba2eb9c8ff557c30d7b173429 Mon Sep 17 00:00:00 2001
From: Alex Grintsvayg
Date: Thu, 29 Oct 2020 14:22:58 -0400
Subject: [PATCH 10/13] clarify Get() error requirement

---
 store/store.go | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/store/store.go b/store/store.go
index 80d56c1..200ff61 100644
--- a/store/store.go
+++ b/store/store.go
@@ -9,15 +9,15 @@ import (
 type BlobStore interface {
 	// Name of blob store (useful for metrics)
 	Name() string
-	// Does blob exist in the store
+	// Does blob exist in the store.
 	Has(hash string) (bool, error)
-	// Get the blob from the store
+	// Get the blob from the store. Must return ErrBlobNotFound if blob is not in store.
 	Get(hash string) (stream.Blob, error)
-	// Put the blob into the store
+	// Put the blob into the store.
 	Put(hash string, blob stream.Blob) error
-	// Put an SD blob into the store
+	// Put an SD blob into the store.
 	PutSD(hash string, blob stream.Blob) error
-	// Delete the blob from the store
+	// Delete the blob from the store.
 	Delete(hash string) error
 }
-- 
2.45.3

From 131fed28d27729b0eade6beb54cb7b0e16e64fdb Mon Sep 17 00:00:00 2001
From: Alex Grintsvayg
Date: Fri, 30 Oct 2020 15:01:56 -0400
Subject: [PATCH 11/13] add faster file.Walk fn. meant for DiskStore.list()

---
 go.mod                       |  1 +
 go.sum                       |  2 +
 store/speedwalk/speedwalk.go | 84 ++++++++++++++++++++++++++++++++++++
 3 files changed, 87 insertions(+)
 create mode 100644 store/speedwalk/speedwalk.go

diff --git a/go.mod b/go.mod
index b7de5f7..9df120d 100644
--- a/go.mod
+++ b/go.mod
@@ -19,6 +19,7 @@ require (
 	github.com/hashicorp/serf v0.8.2
 	github.com/inconshreveable/go-update v0.0.0-20160112193335-8152e7eb6ccf
 	github.com/johntdyer/slackrus v0.0.0-20180518184837-f7aae3243a07
+	github.com/karrick/godirwalk v1.16.1
 	github.com/lbryio/chainquery v1.9.0
 	github.com/lbryio/lbry.go v1.1.2 // indirect
 	github.com/lbryio/lbry.go/v2 v2.6.1-0.20200901175808-73382bb02128
diff --git a/go.sum b/go.sum
index 01d1580..84b207a 100644
--- a/go.sum
+++ b/go.sum
@@ -188,6 +188,8 @@ github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7
 github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
 github.com/kardianos/osext v0.0.0-20170510131534-ae77be60afb1 h1:PJPDf8OUfOK1bb/NeTKd4f1QXZItOX389VN3B6qC8ro=
 github.com/kardianos/osext v0.0.0-20170510131534-ae77be60afb1/go.mod h1:1NbS8ALrpOvjt0rHPNLyCIeMtbizbir8U//inJ+zuB8=
+github.com/karrick/godirwalk v1.16.1 h1:DynhcF+bztK8gooS0+NDJFrdNZjJ3gzVzC545UNA9iw=
+github.com/karrick/godirwalk v1.16.1/go.mod h1:j4mkqPuvaLI8mp1DroR3P6ad7cyYd4c1qeJ3RV7ULlk=
 github.com/keybase/go-ps v0.0.0-20161005175911-668c8856d999/go.mod h1:hY+WOq6m2FpbvyrI93sMaypsttvaIL5nhVR92dTMUcQ=
 github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
 github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6CZQHDETBtE9HaSEkGmuNXF86RwHhHUvq4=
diff --git a/store/speedwalk/speedwalk.go b/store/speedwalk/speedwalk.go
new file mode 100644
index 0000000..1c94ed1
--- /dev/null
+++ b/store/speedwalk/speedwalk.go
@@ -0,0 +1,84 @@
+package speedwalk
+
+import (
+	"io/ioutil"
+	"path/filepath"
+	"runtime"
+	"sync"
+
+	"github.com/lbryio/lbry.go/v2/extras/errors"
+
+	"github.com/karrick/godirwalk"
+	"github.com/sirupsen/logrus"
+)
+
+// AllFiles recursively lists every file in every subdirectory of a given directory
+// If basename is true, retun the basename of each file. Otherwise return the full path starting at startDir.
+func AllFiles(startDir string, basename bool) ([]string, error) {
+	items, err := ioutil.ReadDir(startDir)
+	if err != nil {
+		return nil, err
+	}
+
+	pathChan := make(chan string)
+	paths := make([]string, 0, 1000)
+	go func() {
+		for {
+			path, ok := <-pathChan
+			if !ok {
+				return
+			}
+			paths = append(paths, path)
+		}
+	}()
+
+	wg := &sync.WaitGroup{}
+	maxThreads := runtime.NumCPU() - 1
+	goroutineLimiter := make(chan struct{}, maxThreads)
+	for i := 0; i < maxThreads; i++ {
+		goroutineLimiter <- struct{}{}
+	}
+
+	for _, item := range items {
+		if !item.IsDir() {
+			if basename {
+				pathChan <- item.Name()
+			} else {
+				pathChan <- filepath.Join(startDir, item.Name())
+			}
+			continue
+		}
+
+		<-goroutineLimiter
+		wg.Add(1)
+
+		go func(dir string) {
+			defer func() {
+				wg.Done()
+				goroutineLimiter <- struct{}{}
+			}()
+
+			err = godirwalk.Walk(filepath.Join(startDir, dir), &godirwalk.Options{
+				Unsorted: true, // faster this way
+				Callback: func(osPathname string, de *godirwalk.Dirent) error {
+					if de.IsRegular() {
+						if basename {
+							pathChan <- de.Name()
+						} else {
+							pathChan <- filepath.Join(startDir, osPathname)
+						}
+					}
+					return nil
+				},
+			})
+			if err != nil {
+				logrus.Errorf(errors.FullTrace(err))
+			}
+		}(item.Name())
+	}
+
+	wg.Wait()
+
+	close(pathChan)
+	return paths, nil
+}
-- 
2.45.3

From aaae3ffa5bbc76e8dcedcba35f4d9a603e8da842 Mon Sep 17 00:00:00 2001
From: Alex Grintsvayg
Date: Mon, 2 Nov 2020 14:35:04 -0500
Subject: [PATCH 12/13] remove afero fs abstraction in prep for using speedwalk

---
 store/disk.go     | 20 +++++++-------------
 store/lru_test.go | 50 ++++++++++++++---------------------
 store/memory.go   |  1 +
 3 files changed, 23 insertions(+), 48 deletions(-)

diff --git a/store/disk.go b/store/disk.go
index 2d8f435..008897b 100644
--- a/store/disk.go
+++ b/store/disk.go
@@ -8,8 +8,6 @@ import (
 
 	"github.com/lbryio/lbry.go/v2/extras/errors"
 	"github.com/lbryio/lbry.go/v2/stream"
-
-	"github.com/spf13/afero"
 )
 
 // DiskStore stores blobs on a local disk
@@ -19,9 +17,6 @@ type DiskStore struct {
 	// store files in subdirectories based on the first N chars in the filename. 0 = don't create subdirectories.
 	prefixLength int
 
-	// filesystem abstraction
-	fs afero.Fs
-
 	// true if initOnce ran, false otherwise
 	initialized bool
 }
@@ -31,7 +26,6 @@ func NewDiskStore(dir string, prefixLength int) *DiskStore {
 	return &DiskStore{
 		blobDir:      dir,
 		prefixLength: prefixLength,
-		fs:           afero.NewOsFs(),
 	}
 }
 
@@ -47,7 +41,7 @@ func (d *DiskStore) Has(hash string) (bool, error) {
 		return false, err
 	}
 
-	_, err = d.fs.Stat(d.path(hash))
+	_, err = os.Stat(d.path(hash))
 	if err != nil {
 		if os.IsNotExist(err) {
 			return false, nil
@@ -64,7 +58,7 @@ func (d *DiskStore) Get(hash string) (stream.Blob, error) {
 		return nil, err
 	}
 
-	file, err := d.fs.Open(d.path(hash))
+	file, err := os.Open(d.path(hash))
 	if err != nil {
 		if os.IsNotExist(err) {
 			return nil, errors.Err(ErrBlobNotFound)
@@ -89,7 +83,7 @@ func (d *DiskStore) Put(hash string, blob stream.Blob) error {
 		return err
 	}
 
-	err = afero.WriteFile(d.fs, d.path(hash), blob, 0644)
+	err = ioutil.WriteFile(d.path(hash), blob, 0644)
 	return errors.Err(err)
 }
 
@@ -113,7 +107,7 @@ func (d *DiskStore) Delete(hash string) error {
 		return nil
 	}
 
-	err = d.fs.Remove(d.path(hash))
+	err = os.Remove(d.path(hash))
 	return errors.Err(err)
 }
 
@@ -124,7 +118,7 @@ func (d *DiskStore) list() ([]string, error) {
 		return nil, err
 	}
 
-	dirs, err := afero.ReadDir(d.fs, d.blobDir)
+	dirs, err := ioutil.ReadDir(d.blobDir)
 	if err != nil {
 		return nil, err
 	}
@@ -133,7 +127,7 @@ func (d *DiskStore) list() ([]string, error) {
 
 	for _, dir := range dirs {
 		if dir.IsDir() {
-			files, err := afero.ReadDir(d.fs, filepath.Join(d.blobDir, dir.Name()))
+			files, err := ioutil.ReadDir(filepath.Join(d.blobDir, dir.Name()))
 			if err != nil {
 				return nil, err
 			}
@@ -160,7 +154,7 @@ func (d *DiskStore) path(hash string) string {
 }
 
 func (d *DiskStore) ensureDirExists(dir string) error {
-	return errors.Err(d.fs.MkdirAll(dir, 0755))
+	return errors.Err(os.MkdirAll(dir, 0755))
 }
 
 func (d *DiskStore) initOnce() error {
diff --git a/store/lru_test.go b/store/lru_test.go
index 0576377..968956c 100644
--- a/store/lru_test.go
+++ b/store/lru_test.go
@@ -1,48 +1,26 @@
 package store
 
 import (
+	"io/ioutil"
 	"os"
 	"reflect"
 	"testing"
 
 	"github.com/lbryio/lbry.go/v2/extras/errors"
 
-	"github.com/spf13/afero"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 )
 
 const cacheMaxBlobs = 3
 
-func getTestLRUStore() (*LRUStore, *DiskStore) {
-	d := NewDiskStore("/", 2)
-	d.fs = afero.NewMemMapFs()
-	return NewLRUStore("test", d, 3), d
-}
-
-func countOnDisk(t *testing.T, disk *DiskStore) int {
-	t.Helper()
-
-	count := 0
-	afero.Walk(disk.fs, "/", func(path string, info os.FileInfo, err error) error {
-		if err != nil {
-			t.Fatal(err)
-		}
-		if !info.IsDir() {
-			count++
-		}
-		return nil
-	})
-
-	list, err := disk.list()
-	require.NoError(t, err)
-	require.Equal(t, count, len(list))
-
-	return count
+func getTestLRUStore() (*LRUStore, *MemStore) {
+	m := NewMemStore()
+	return NewLRUStore("test", m, 3), m
 }
 
 func TestLRUStore_Eviction(t *testing.T) {
-	lru, disk := getTestLRUStore()
+	lru, mem := getTestLRUStore()
 	b := []byte("x")
 	err := lru.Put("one", b)
 	require.NoError(t, err)
@@ -55,7 +33,7 @@ func TestLRUStore_Eviction(t *testing.T) {
 	err = lru.Put("five", b)
 	require.NoError(t, err)
 
-	assert.Equal(t, cacheMaxBlobs, countOnDisk(t, disk))
+	assert.Equal(t, cacheMaxBlobs, len(mem.Debug()))
 
 	for k, v := range map[string]bool{
 		"one":   false,
@@ -73,7 +51,7 @@ func TestLRUStore_Eviction(t *testing.T) {
 	lru.Get("three") // touch so it stays in cache
 	lru.Put("six", b)
 
-	assert.Equal(t, cacheMaxBlobs, countOnDisk(t, disk))
+	assert.Equal(t, cacheMaxBlobs, len(mem.Debug()))
 
 	for k, v := range map[string]bool{
 		"one":   false,
@@ -94,17 +72,17 @@ func TestLRUStore_Eviction(t *testing.T) {
 	assert.NoError(t, err)
 	err = lru.Delete("six")
 	assert.NoError(t, err)
-	assert.Equal(t, 0, countOnDisk(t, disk))
+	assert.Equal(t, 0, len(mem.Debug()))
 }
 
 func TestLRUStore_UnderlyingBlobMissing(t *testing.T) {
-	lru, disk := getTestLRUStore()
+	lru, mem := getTestLRUStore()
 	hash := "hash"
 	b := []byte("this is a blob of stuff")
 	err := lru.Put(hash, b)
 	require.NoError(t, err)
 
-	err = disk.fs.Remove("/ha/hash")
+	err = mem.Delete(hash)
 	require.NoError(t, err)
 
 	// hash still exists in lru
@@ -121,12 +99,14 @@ func TestLRUStore_UnderlyingBlobMissing(t *testing.T) {
 }
 
 func TestLRUStore_loadExisting(t *testing.T) {
-	d := NewDiskStore("/", 2)
-	d.fs = afero.NewMemMapFs()
+	tmpDir, err := ioutil.TempDir("", "reflector_test_*")
+	require.NoError(t, err)
+	defer os.RemoveAll(tmpDir)
+	d := NewDiskStore(tmpDir, 2)
 
 	hash := "hash"
 	b := []byte("this is a blob of stuff")
-	err := d.Put(hash, b)
+	err = d.Put(hash, b)
 	require.NoError(t, err)
 
 	existing, err := d.list()
diff --git a/store/memory.go b/store/memory.go
index 47969fc..f46a0f2 100644
--- a/store/memory.go
+++ b/store/memory.go
@@ -6,6 +6,7 @@ import (
 )
 
 // MemStore is an in memory only blob store with no persistence.
+// MemStore is NOT THREAD-SAFE
 type MemStore struct {
 	blobs map[string]stream.Blob
 }
-- 
2.45.3

From 659a6e73cc6cfea12294515bb9f1518a8e4b143a Mon Sep 17 00:00:00 2001
From: Alex Grintsvayg
Date: Mon, 2 Nov 2020 14:48:56 -0500
Subject: [PATCH 13/13] use speedwalk for faster file listing

---
 store/disk.go                | 25 ++-----------------------
 store/memory.go              | 15 ++++++++++++++-
 store/speedwalk/speedwalk.go | 17 +++++++++++------
 3 files changed, 27 insertions(+), 30 deletions(-)

diff --git a/store/disk.go b/store/disk.go
index 008897b..4ac2a0f 100644
--- a/store/disk.go
+++ b/store/disk.go
@@ -4,10 +4,10 @@ import (
 	"io/ioutil"
 	"os"
 	"path"
-	"path/filepath"
 
 	"github.com/lbryio/lbry.go/v2/extras/errors"
 	"github.com/lbryio/lbry.go/v2/stream"
+	"github.com/lbryio/reflector.go/store/speedwalk"
 )
 
 // DiskStore stores blobs on a local disk
@@ -118,28 +118,7 @@ func (d *DiskStore) list() ([]string, error) {
 		return nil, err
 	}
 
-	dirs, err := ioutil.ReadDir(d.blobDir)
-	if err != nil {
-		return nil, err
-	}
-
-	var existing []string
-
-	for _, dir := range dirs {
-		if dir.IsDir() {
-			files, err := ioutil.ReadDir(filepath.Join(d.blobDir, dir.Name()))
-			if err != nil {
-				return nil, err
-			}
-			for _, file := range files {
-				if file.Mode().IsRegular() && !file.IsDir() {
-					existing = append(existing, file.Name())
-				}
-			}
-		}
-	}
-
-	return existing, nil
+	return speedwalk.AllFiles(d.blobDir, true)
 }
 
 func (d *DiskStore) dir(hash string) string {
diff --git a/store/memory.go b/store/memory.go
index f46a0f2..f462a8d 100644
--- a/store/memory.go
+++ b/store/memory.go
@@ -1,19 +1,22 @@
 package store
 
 import (
+	"sync"
+
 	"github.com/lbryio/lbry.go/v2/extras/errors"
 	"github.com/lbryio/lbry.go/v2/stream"
 )
 
 // MemStore is an in memory only blob store with no persistence.
 type MemStore struct {
 	blobs map[string]stream.Blob
+	mu    *sync.RWMutex
 }
 
 func NewMemStore() *MemStore {
 	return &MemStore{
 		blobs: make(map[string]stream.Blob),
+		mu:    &sync.RWMutex{},
 	}
 }
 
@@ -24,12 +27,16 @@ func (m *MemStore) Name() string { return nameMem }
 
 // Has returns T/F if the blob is currently stored. It will never error.
 func (m *MemStore) Has(hash string) (bool, error) {
+	m.mu.RLock()
+	defer m.mu.RUnlock()
 	_, ok := m.blobs[hash]
 	return ok, nil
 }
 
 // Get returns the blob byte slice if present and errors if the blob is not found.
 func (m *MemStore) Get(hash string) (stream.Blob, error) {
+	m.mu.RLock()
+	defer m.mu.RUnlock()
 	blob, ok := m.blobs[hash]
 	if !ok {
 		return nil, errors.Err(ErrBlobNotFound)
@@ -39,6 +46,8 @@ func (m *MemStore) Get(hash string) (stream.Blob, error) {
 
 // Put stores the blob in memory
 func (m *MemStore) Put(hash string, blob stream.Blob) error {
+	m.mu.Lock()
+	defer m.mu.Unlock()
 	m.blobs[hash] = blob
 	return nil
 }
@@ -50,11 +59,15 @@ func (m *MemStore) PutSD(hash string, blob stream.Blob) error {
 
 // Delete deletes the blob from the store
 func (m *MemStore) Delete(hash string) error {
+	m.mu.Lock()
+	defer m.mu.Unlock()
 	delete(m.blobs, hash)
 	return nil
 }
 
 // Debug returns the blobs in memory. It's useful for testing and debugging.
 func (m *MemStore) Debug() map[string]stream.Blob {
+	m.mu.RLock()
+	defer m.mu.RUnlock()
 	return m.blobs
 }
diff --git a/store/speedwalk/speedwalk.go b/store/speedwalk/speedwalk.go
index 1c94ed1..e2563ba 100644
--- a/store/speedwalk/speedwalk.go
+++ b/store/speedwalk/speedwalk.go
@@ -13,7 +13,7 @@ import (
 )
 
 // AllFiles recursively lists every file in every subdirectory of a given directory
-// If basename is true, retun the basename of each file. Otherwise return the full path starting at startDir.
+// If basename is true, return the basename of each file. Otherwise return the full path starting at startDir.
 func AllFiles(startDir string, basename bool) ([]string, error) {
 	items, err := ioutil.ReadDir(startDir)
 	if err != nil {
@@ -22,7 +22,10 @@ func AllFiles(startDir string, basename bool) ([]string, error) {
 
 	pathChan := make(chan string)
 	paths := make([]string, 0, 1000)
+	pathWG := &sync.WaitGroup{}
+	pathWG.Add(1)
 	go func() {
+		defer pathWG.Done()
 		for {
 			path, ok := <-pathChan
 			if !ok {
@@ -32,13 +35,13 @@ func AllFiles(startDir string, basename bool) ([]string, error) {
 		}
 	}()
 
-	wg := &sync.WaitGroup{}
 	maxThreads := runtime.NumCPU() - 1
 	goroutineLimiter := make(chan struct{}, maxThreads)
 	for i := 0; i < maxThreads; i++ {
 		goroutineLimiter <- struct{}{}
 	}
 
+	walkerWG := &sync.WaitGroup{}
 	for _, item := range items {
 		if !item.IsDir() {
 			if basename {
@@ -50,11 +53,11 @@ func AllFiles(startDir string, basename bool) ([]string, error) {
 		}
 
 		<-goroutineLimiter
-		wg.Add(1)
+		walkerWG.Add(1)
 
 		go func(dir string) {
 			defer func() {
-				wg.Done()
+				walkerWG.Done()
 				goroutineLimiter <- struct{}{}
 			}()
 
@@ -65,7 +68,7 @@ func AllFiles(startDir string, basename bool) ([]string, error) {
 					if basename {
 						pathChan <- de.Name()
 					} else {
-						pathChan <- filepath.Join(startDir, osPathname)
+						pathChan <- osPathname
 					}
 				}
 				return nil
@@ -77,8 +80,10 @@ func AllFiles(startDir string, basename bool) ([]string, error) {
 		}(item.Name())
 	}
 
-	wg.Wait()
+	walkerWG.Wait()
 
 	close(pathChan)
+	pathWG.Wait()
+
 	return paths, nil
 }
-- 
2.45.3
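The last speedwalk change is subtle: the walkers get their own WaitGroup, and a second WaitGroup makes AllFiles wait for the collector goroutine to finish draining the channel before returning; without it, the tail of the results could be lost. A reduced sketch of that collector pattern under illustrative names:

package main

import (
	"fmt"
	"sync"
)

// collect drains a results channel into a slice and only returns once
// the collector goroutine has seen the channel close, mirroring the
// pathWG fix in the patch above.
func collect(produce func(chan<- string)) []string {
	results := make(chan string)
	var out []string

	collectorWG := &sync.WaitGroup{}
	collectorWG.Add(1)
	go func() {
		defer collectorWG.Done()
		for r := range results {
			out = append(out, r)
		}
	}()

	produce(results)   // run all producers to completion
	close(results)     // signal the collector that no more results are coming
	collectorWG.Wait() // without this, out could be returned incomplete

	return out
}

func main() {
	paths := collect(func(ch chan<- string) {
		workerWG := &sync.WaitGroup{}
		for i := 0; i < 3; i++ {
			workerWG.Add(1)
			go func(n int) {
				defer workerWG.Done()
				ch <- fmt.Sprintf("file-%d", n)
			}(i)
		}
		workerWG.Wait()
	})
	fmt.Println(paths)
}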