From c9fa04043c071ec04aadd6cc2af7ebda6afcde8f Mon Sep 17 00:00:00 2001 From: Alex Grintsvayg Date: Thu, 22 Oct 2020 13:12:31 -0400 Subject: [PATCH] rename the stores, add caching to reflector cmd --- cmd/getstream.go | 4 +- cmd/peer.go | 2 +- cmd/reflector.go | 196 ++++++++++++++++++++++++--------------- cmd/start.go | 2 +- cmd/test.go | 2 +- cmd/upload.go | 2 +- peer/server_test.go | 2 +- reflector/server_test.go | 8 +- store/caching.go | 22 ++--- store/caching_test.go | 20 ++-- store/cloudfront.go | 117 +++++++++++------------ store/disk.go | 30 +++--- store/lru_test.go | 6 +- store/memory.go | 20 ++-- store/memory_test.go | 4 +- store/s3.go | 22 ++--- 16 files changed, 252 insertions(+), 207 deletions(-) diff --git a/cmd/getstream.go b/cmd/getstream.go index 355824c..ae586d7 100644 --- a/cmd/getstream.go +++ b/cmd/getstream.go @@ -28,9 +28,9 @@ func getStreamCmd(cmd *cobra.Command, args []string) { addr := args[0] sdHash := args[1] - s := store.NewCachingBlobStore( + s := store.NewCachingStore( peer.NewStore(peer.StoreOpts{Address: addr}), - store.NewDiskBlobStore("/tmp/lbry_downloaded_blobs", 1000, 2), + store.NewDiskStore("/tmp/lbry_downloaded_blobs", 2), ) wd, err := os.Getwd() diff --git a/cmd/peer.go b/cmd/peer.go index 88c49e0..3cb2a19 100644 --- a/cmd/peer.go +++ b/cmd/peer.go @@ -29,7 +29,7 @@ func init() { func peerCmd(cmd *cobra.Command, args []string) { var err error - s3 := store.NewS3BlobStore(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName) + s3 := store.NewS3Store(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName) peerServer := peer.NewServer(s3) if !peerNoDB { diff --git a/cmd/reflector.go b/cmd/reflector.go index 7a97e72..ddac74b 100644 --- a/cmd/reflector.go +++ b/cmd/reflector.go @@ -4,6 +4,7 @@ import ( "os" "os/signal" "strconv" + "strings" "syscall" "time" @@ -16,23 +17,24 @@ import ( "github.com/lbryio/reflector.go/store" log "github.com/sirupsen/logrus" + "github.com/spf13/cast" "github.com/spf13/cobra" ) var ( - tcpPeerPort int - http3PeerPort int - receiverPort int - metricsPort int - disableUploads bool - disableBlocklist bool - proxyAddress string - proxyPort string - proxyProtocol string - useDB bool - cloudFrontEndpoint string - reflectorCmdCacheDir string - reflectorCmdCacheMaxBlobs int + tcpPeerPort int + http3PeerPort int + receiverPort int + metricsPort int + disableUploads bool + disableBlocklist bool + proxyAddress string + proxyPort string + proxyProtocol string + useDB bool + cloudFrontEndpoint string + reflectorCmdDiskCache string + reflectorCmdMemCache int ) func init() { @@ -52,96 +54,136 @@ func init() { cmd.Flags().BoolVar(&disableUploads, "disable-uploads", false, "Disable uploads to this reflector server") cmd.Flags().BoolVar(&disableBlocklist, "disable-blocklist", false, "Disable blocklist watching/updating") cmd.Flags().BoolVar(&useDB, "use-db", true, "whether to connect to the reflector db or not") - cmd.Flags().StringVar(&reflectorCmdCacheDir, "cache", "", "if specified, the path where blobs should be cached (disabled when left empty)") - cmd.Flags().IntVar(&reflectorCmdCacheMaxBlobs, "cache-max-blobs", 0, "if cache is enabled, this option sets the max blobs the cache will hold") + cmd.Flags().StringVar(&reflectorCmdDiskCache, "disk-cache", "", + "enable disk cache, setting max size and path where to store blobs. format is 'MAX_BLOBS:CACHE_PATH'") + cmd.Flags().IntVar(&reflectorCmdMemCache, "mem-cache", 0, "enable in-memory cache with a max size of this many blobs") rootCmd.AddCommand(cmd) } func reflectorCmd(cmd *cobra.Command, args []string) { log.Printf("reflector %s", meta.VersionString()) - var blobStore store.BlobStore - if proxyAddress != "" { - switch proxyProtocol { - case "tcp": - blobStore = peer.NewStore(peer.StoreOpts{ - Address: proxyAddress + ":" + proxyPort, - Timeout: 30 * time.Second, - }) - case "http3": - blobStore = http3.NewStore(http3.StoreOpts{ - Address: proxyAddress + ":" + proxyPort, - Timeout: 30 * time.Second, - }) - default: - log.Fatalf("specified protocol is not recognized: %s", proxyProtocol) - } - } else { - s3Store := store.NewS3BlobStore(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName) - if cloudFrontEndpoint != "" { - blobStore = store.NewCloudFrontBlobStore(cloudFrontEndpoint, s3Store) - } else { - blobStore = s3Store - } - } + // the blocklist logic requires the db backed store to be the outer-most store + underlyingStore := setupStore() + outerStore := wrapWithCache(underlyingStore) - var err error - var reflectorServer *reflector.Server + if !disableUploads { + reflectorServer := reflector.NewServer(underlyingStore) + reflectorServer.Timeout = 3 * time.Minute + reflectorServer.EnableBlocklist = !disableBlocklist - if useDB { - db := new(db.SQL) - db.TrackAccessTime = true - err = db.Connect(globalConfig.DBConn) + err := reflectorServer.Start(":" + strconv.Itoa(receiverPort)) if err != nil { log.Fatal(err) } - - blobStore = store.NewDBBackedStore(blobStore, db) - - //this shouldn't go here but the blocklist logic requires the db backed store to be the outer-most store for it to work.... - //having this here prevents uploaded blobs from being stored in the disk cache - if !disableUploads { - reflectorServer = reflector.NewServer(blobStore) - reflectorServer.Timeout = 3 * time.Minute - reflectorServer.EnableBlocklist = !disableBlocklist - - err = reflectorServer.Start(":" + strconv.Itoa(receiverPort)) - if err != nil { - log.Fatal(err) - } - } + defer reflectorServer.Shutdown() } - if reflectorCmdCacheDir != "" { - err = os.MkdirAll(reflectorCmdCacheDir, os.ModePerm) - if err != nil { - log.Fatal(err) - } - blobStore = store.NewCachingBlobStore(blobStore, store.NewDiskBlobStore(reflectorCmdCacheDir, reflectorCmdCacheMaxBlobs, 2)) - } - - peerServer := peer.NewServer(blobStore) - err = peerServer.Start(":" + strconv.Itoa(tcpPeerPort)) + peerServer := peer.NewServer(outerStore) + err := peerServer.Start(":" + strconv.Itoa(tcpPeerPort)) if err != nil { log.Fatal(err) } + defer peerServer.Shutdown() - http3PeerServer := http3.NewServer(blobStore) + http3PeerServer := http3.NewServer(outerStore) err = http3PeerServer.Start(":" + strconv.Itoa(http3PeerPort)) if err != nil { log.Fatal(err) } + defer http3PeerServer.Shutdown() metricsServer := metrics.NewServer(":"+strconv.Itoa(metricsPort), "/metrics") metricsServer.Start() + defer metricsServer.Shutdown() interruptChan := make(chan os.Signal, 1) signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM) <-interruptChan - metricsServer.Shutdown() - peerServer.Shutdown() - http3PeerServer.Shutdown() - if reflectorServer != nil { - reflectorServer.Shutdown() - } + // deferred shutdowns happen now +} + +func setupStore() store.BlobStore { + var s store.BlobStore + + if proxyAddress != "" { + switch proxyProtocol { + case "tcp": + s = peer.NewStore(peer.StoreOpts{ + Address: proxyAddress + ":" + proxyPort, + Timeout: 30 * time.Second, + }) + case "http3": + s = http3.NewStore(http3.StoreOpts{ + Address: proxyAddress + ":" + proxyPort, + Timeout: 30 * time.Second, + }) + default: + log.Fatalf("protocol is not recognized: %s", proxyProtocol) + } + } else { + s3Store := store.NewS3Store(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName) + if cloudFrontEndpoint != "" { + s = store.NewCloudFrontStore(s3Store, cloudFrontEndpoint) + } else { + s = s3Store + } + } + + if useDB { + db := new(db.SQL) + db.TrackAccessTime = true + err := db.Connect(globalConfig.DBConn) + if err != nil { + log.Fatal(err) + } + + s = store.NewDBBackedStore(s, db) + } + + return s +} + +func wrapWithCache(s store.BlobStore) store.BlobStore { + wrapped := s + + diskCacheMaxSize, diskCachePath := diskCacheParams() + if diskCacheMaxSize > 0 { + err := os.MkdirAll(diskCachePath, os.ModePerm) + if err != nil { + log.Fatal(err) + } + wrapped = store.NewCachingStore(wrapped, + store.NewLRUStore(store.NewDiskStore(diskCachePath, 2), diskCacheMaxSize)) + } + + if reflectorCmdMemCache > 0 { + wrapped = store.NewCachingStore(wrapped, + store.NewLRUStore(store.NewMemoryStore(), reflectorCmdMemCache)) + } + + return wrapped +} + +func diskCacheParams() (int, string) { + if reflectorCmdDiskCache == "" { + return 0, "" + } + + parts := strings.Split(reflectorCmdDiskCache, ":") + if len(parts) != 2 { + log.Fatalf("--disk-cache must be a number, followed by ':', followed by a string") + } + + maxSize := cast.ToInt(parts[0]) + if maxSize <= 0 { + log.Fatalf("--disk-cache max size must be more than 0") + } + + path := parts[1] + if len(path) == 0 || path[0] != '/' { + log.Fatalf("--disk-cache path must start with '/'") + } + + return maxSize, path } diff --git a/cmd/start.go b/cmd/start.go index a82d597..4d4a56b 100644 --- a/cmd/start.go +++ b/cmd/start.go @@ -55,7 +55,7 @@ func startCmd(cmd *cobra.Command, args []string) { db := new(db.SQL) err := db.Connect(globalConfig.DBConn) checkErr(err) - s3 := store.NewS3BlobStore(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName) + s3 := store.NewS3Store(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName) comboStore := store.NewDBBackedStore(s3, db) conf := prism.DefaultConf() diff --git a/cmd/test.go b/cmd/test.go index 051ddf7..e611b22 100644 --- a/cmd/test.go +++ b/cmd/test.go @@ -29,7 +29,7 @@ func init() { func testCmd(cmd *cobra.Command, args []string) { log.Printf("reflector %s", meta.VersionString()) - memStore := store.NewMemoryBlobStore() + memStore := store.NewMemoryStore() reflectorServer := reflector.NewServer(memStore) reflectorServer.Timeout = 3 * time.Minute diff --git a/cmd/upload.go b/cmd/upload.go index 006f007..636e824 100644 --- a/cmd/upload.go +++ b/cmd/upload.go @@ -35,7 +35,7 @@ func uploadCmd(cmd *cobra.Command, args []string) { checkErr(err) st := store.NewDBBackedStore( - store.NewS3BlobStore(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName), + store.NewS3Store(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName), db) uploader := reflector.NewUploader(db, st, uploadWorkers, uploadSkipExistsCheck, uploadDeleteBlobsAfterUpload) diff --git a/peer/server_test.go b/peer/server_test.go index 2e1d9bc..7c9b5f2 100644 --- a/peer/server_test.go +++ b/peer/server_test.go @@ -34,7 +34,7 @@ var availabilityRequests = []pair{ } func getServer(t *testing.T, withBlobs bool) *Server { - st := store.NewMemoryBlobStore() + st := store.NewMemoryStore() if withBlobs { for k, v := range blobs { err := st.Put(k, v) diff --git a/reflector/server_test.go b/reflector/server_test.go index 1dd7e83..1039417 100644 --- a/reflector/server_test.go +++ b/reflector/server_test.go @@ -22,7 +22,7 @@ func startServerOnRandomPort(t *testing.T) (*Server, int) { t.Fatal(err) } - srv := NewServer(store.NewMemoryBlobStore()) + srv := NewServer(store.NewMemoryStore()) err = srv.Start("127.0.0.1:" + strconv.Itoa(port)) if err != nil { t.Fatal(err) @@ -119,7 +119,7 @@ func TestServer_Timeout(t *testing.T) { t.Fatal(err) } - srv := NewServer(store.NewMemoryBlobStore()) + srv := NewServer(store.NewMemoryStore()) srv.Timeout = testTimeout err = srv.Start("127.0.0.1:" + strconv.Itoa(port)) if err != nil { @@ -161,7 +161,7 @@ func TestServer_Timeout(t *testing.T) { //} type mockPartialStore struct { - *store.MemoryBlobStore + *store.MemoryStore missing []string } @@ -181,7 +181,7 @@ func TestServer_PartialUpload(t *testing.T) { missing[i] = bits.Rand().String() } - st := store.BlobStore(&mockPartialStore{MemoryBlobStore: store.NewMemoryBlobStore(), missing: missing}) + st := store.BlobStore(&mockPartialStore{MemoryStore: store.NewMemoryStore(), missing: missing}) if _, ok := st.(neededBlobChecker); !ok { t.Fatal("mock does not implement the relevant interface") } diff --git a/store/caching.go b/store/caching.go index 4dd5b6b..e4ff6b3 100644 --- a/store/caching.go +++ b/store/caching.go @@ -11,22 +11,22 @@ import ( "golang.org/x/sync/singleflight" ) -// CachingBlobStore combines two stores, typically a local and a remote store, to improve performance. +// CachingStore combines two stores, typically a local and a remote store, to improve performance. // Accessed blobs are stored in and retrieved from the cache. If they are not in the cache, they // are retrieved from the origin and cached. Puts are cached and also forwarded to the origin. -type CachingBlobStore struct { +type CachingStore struct { origin, cache BlobStore sf *singleflight.Group } -// NewCachingBlobStore makes a new caching disk store and returns a pointer to it. -func NewCachingBlobStore(origin, cache BlobStore) *CachingBlobStore { - return &CachingBlobStore{origin: origin, cache: cache, sf: new(singleflight.Group)} +// NewCachingStore makes a new caching disk store and returns a pointer to it. +func NewCachingStore(origin, cache BlobStore) *CachingStore { + return &CachingStore{origin: origin, cache: cache, sf: new(singleflight.Group)} } // Has checks the cache and then the origin for a hash. It returns true if either store has it. -func (c *CachingBlobStore) Has(hash string) (bool, error) { +func (c *CachingStore) Has(hash string) (bool, error) { has, err := c.cache.Has(hash) if has || err != nil { return has, err @@ -36,7 +36,7 @@ func (c *CachingBlobStore) Has(hash string) (bool, error) { // Get tries to get the blob from the cache first, falling back to the origin. If the blob comes // from the origin, it is also stored in the cache. -func (c *CachingBlobStore) Get(hash string) (stream.Blob, error) { +func (c *CachingStore) Get(hash string) (stream.Blob, error) { start := time.Now() blob, err := c.cache.Get(hash) if err == nil || !errors.Is(err, ErrBlobNotFound) { @@ -52,7 +52,7 @@ func (c *CachingBlobStore) Get(hash string) (stream.Blob, error) { // getFromOrigin ensures that only one Get per hash is sent to the origin at a time, // thereby protecting against https://en.wikipedia.org/wiki/Thundering_herd_problem -func (c *CachingBlobStore) getFromOrigin(hash string) (stream.Blob, error) { +func (c *CachingStore) getFromOrigin(hash string) (stream.Blob, error) { metrics.CacheWaitingRequestsCount.Inc() defer metrics.CacheWaitingRequestsCount.Dec() originBlob, err, _ := c.sf.Do(hash, func() (interface{}, error) { @@ -78,7 +78,7 @@ func (c *CachingBlobStore) getFromOrigin(hash string) (stream.Blob, error) { } // Put stores the blob in the origin and the cache -func (c *CachingBlobStore) Put(hash string, blob stream.Blob) error { +func (c *CachingStore) Put(hash string, blob stream.Blob) error { err := c.origin.Put(hash, blob) if err != nil { return err @@ -87,7 +87,7 @@ func (c *CachingBlobStore) Put(hash string, blob stream.Blob) error { } // PutSD stores the sd blob in the origin and the cache -func (c *CachingBlobStore) PutSD(hash string, blob stream.Blob) error { +func (c *CachingStore) PutSD(hash string, blob stream.Blob) error { err := c.origin.PutSD(hash, blob) if err != nil { return err @@ -96,7 +96,7 @@ func (c *CachingBlobStore) PutSD(hash string, blob stream.Blob) error { } // Delete deletes the blob from the origin and the cache -func (c *CachingBlobStore) Delete(hash string) error { +func (c *CachingStore) Delete(hash string) error { err := c.origin.Delete(hash) if err != nil { return err diff --git a/store/caching_test.go b/store/caching_test.go index 13f44a2..f56c833 100644 --- a/store/caching_test.go +++ b/store/caching_test.go @@ -10,9 +10,9 @@ import ( ) func TestCachingBlobStore_Put(t *testing.T) { - origin := NewMemoryBlobStore() - cache := NewMemoryBlobStore() - s := NewCachingBlobStore(origin, cache) + origin := NewMemoryStore() + cache := NewMemoryStore() + s := NewCachingStore(origin, cache) b := []byte("this is a blob of stuff") hash := "hash" @@ -40,9 +40,9 @@ func TestCachingBlobStore_Put(t *testing.T) { } func TestCachingBlobStore_CacheMiss(t *testing.T) { - origin := NewMemoryBlobStore() - cache := NewMemoryBlobStore() - s := NewCachingBlobStore(origin, cache) + origin := NewMemoryStore() + cache := NewMemoryStore() + s := NewCachingStore(origin, cache) b := []byte("this is a blob of stuff") hash := "hash" @@ -79,8 +79,8 @@ func TestCachingBlobStore_CacheMiss(t *testing.T) { func TestCachingBlobStore_ThunderingHerd(t *testing.T) { storeDelay := 100 * time.Millisecond origin := NewSlowBlobStore(storeDelay) - cache := NewMemoryBlobStore() - s := NewCachingBlobStore(origin, cache) + cache := NewMemoryStore() + s := NewCachingStore(origin, cache) b := []byte("this is a blob of stuff") hash := "hash" @@ -129,13 +129,13 @@ func TestCachingBlobStore_ThunderingHerd(t *testing.T) { // SlowBlobStore adds a delay to each request type SlowBlobStore struct { - mem *MemoryBlobStore + mem *MemoryStore delay time.Duration } func NewSlowBlobStore(delay time.Duration) *SlowBlobStore { return &SlowBlobStore{ - mem: NewMemoryBlobStore(), + mem: NewMemoryStore(), delay: delay, } } diff --git a/store/cloudfront.go b/store/cloudfront.go index fbfafec..c4adb3f 100644 --- a/store/cloudfront.go +++ b/store/cloudfront.go @@ -1,109 +1,112 @@ package store import ( + "io" "io/ioutil" "net/http" "time" - "github.com/lbryio/lbry.go/v2/extras/errors" - "github.com/lbryio/lbry.go/v2/stream" "github.com/lbryio/reflector.go/internal/metrics" "github.com/lbryio/reflector.go/meta" + "github.com/lbryio/lbry.go/v2/extras/errors" + "github.com/lbryio/lbry.go/v2/stream" + log "github.com/sirupsen/logrus" ) -// CloudFrontBlobStore is an CloudFront backed store (retrieval only) -type CloudFrontBlobStore struct { - cfEndpoint string - s3Store *S3BlobStore +// CloudFrontStore wraps an S3 store. Reads go to Cloudfront, writes go to S3. +type CloudFrontStore struct { + s3 *S3Store + endpoint string // cloudflare endpoint } -// NewS3BlobStore returns an initialized S3 store pointer. -func NewCloudFrontBlobStore(cloudFrontEndpoint string, S3Store *S3BlobStore) *CloudFrontBlobStore { - return &CloudFrontBlobStore{ - cfEndpoint: cloudFrontEndpoint, - s3Store: S3Store, +// NewCloudFrontStore returns an initialized CloudFrontStore store pointer. +// NOTE: It panics if S3Store is nil. +func NewCloudFrontStore(s3 *S3Store, cfEndpoint string) *CloudFrontStore { + if s3 == nil { + panic("S3Store must not be nil") + } + + return &CloudFrontStore{ + endpoint: cfEndpoint, + s3: s3, } } -// Has returns T/F or Error if the store contains the blob. -func (s *CloudFrontBlobStore) Has(hash string) (bool, error) { - url := s.cfEndpoint + hash - - req, err := http.NewRequest("HEAD", url, nil) +// Has checks if the hash is in the store. +func (c *CloudFrontStore) Has(hash string) (bool, error) { + status, body, err := c.cfRequest(http.MethodHead, hash) if err != nil { - return false, errors.Err(err) + return false, err } - req.Header.Add("User-Agent", "reflector.go/"+meta.Version) - res, err := http.DefaultClient.Do(req) - if err != nil { - return false, errors.Err(err) - } - defer res.Body.Close() + defer body.Close() - switch res.StatusCode { + switch status { case http.StatusNotFound, http.StatusForbidden: return false, nil case http.StatusOK: return true, nil default: - return false, errors.Err(res.Status) + return false, errors.Err("unexpected status %d", status) } } -// Get returns the blob slice if present or errors. -func (s *CloudFrontBlobStore) Get(hash string) (stream.Blob, error) { - url := s.cfEndpoint + hash +// Get gets the blob from Cloudfront. +func (c *CloudFrontStore) Get(hash string) (stream.Blob, error) { log.Debugf("Getting %s from S3", hash[:8]) defer func(t time.Time) { log.Debugf("Getting %s from S3 took %s", hash[:8], time.Since(t).String()) }(time.Now()) - req, err := http.NewRequest("GET", url, nil) - if err != nil { - return nil, errors.Err(err) - } - req.Header.Add("User-Agent", "reflector.go/"+meta.Version) - res, err := http.DefaultClient.Do(req) - if err != nil { - return nil, errors.Err(err) - } - defer res.Body.Close() - switch res.StatusCode { + status, body, err := c.cfRequest(http.MethodGet, hash) + if err != nil { + return nil, err + } + defer body.Close() + + switch status { case http.StatusNotFound, http.StatusForbidden: return nil, errors.Err(ErrBlobNotFound) case http.StatusOK: - b, err := ioutil.ReadAll(res.Body) + b, err := ioutil.ReadAll(body) if err != nil { return nil, errors.Err(err) } metrics.MtrInBytesS3.Add(float64(len(b))) return b, nil default: - return nil, errors.Err(res.Status) + return nil, errors.Err("unexpected status %d", status) } } -// Put stores the blob on S3 or errors if S3 store is not present. -func (s *CloudFrontBlobStore) Put(hash string, blob stream.Blob) error { - if s.s3Store != nil { - return s.s3Store.Put(hash, blob) +func (c *CloudFrontStore) cfRequest(method, hash string) (int, io.ReadCloser, error) { + url := c.endpoint + hash + req, err := http.NewRequest(method, url, nil) + if err != nil { + return 0, nil, errors.Err(err) } - return errors.Err("not implemented in cloudfront store") + req.Header.Add("User-Agent", "reflector.go/"+meta.Version) + + res, err := http.DefaultClient.Do(req) + if err != nil { + return 0, nil, errors.Err(err) + } + + return res.StatusCode, res.Body, nil } -// PutSD stores the sd blob on S3 or errors if S3 store is not present. -func (s *CloudFrontBlobStore) PutSD(hash string, blob stream.Blob) error { - if s.s3Store != nil { - return s.s3Store.PutSD(hash, blob) - } - return errors.Err("not implemented in cloudfront store") +// Put stores the blob on S3 +func (c *CloudFrontStore) Put(hash string, blob stream.Blob) error { + return c.s3.Put(hash, blob) } -func (s *CloudFrontBlobStore) Delete(hash string) error { - if s.s3Store != nil { - return s.s3Store.Delete(hash) - } - return errors.Err("not implemented in cloudfront store") +// PutSD stores the sd blob on S3 +func (c *CloudFrontStore) PutSD(hash string, blob stream.Blob) error { + return c.s3.PutSD(hash, blob) +} + +// Delete deletes the blob from S3 +func (c *CloudFrontStore) Delete(hash string) error { + return c.s3.Delete(hash) } diff --git a/store/disk.go b/store/disk.go index dfbc0f2..71c8058 100644 --- a/store/disk.go +++ b/store/disk.go @@ -12,8 +12,8 @@ import ( "github.com/spf13/afero" ) -// DiskBlobStore stores blobs on a local disk -type DiskBlobStore struct { +// DiskStore stores blobs on a local disk +type DiskStore struct { // the location of blobs on disk blobDir string // store files in subdirectories based on the first N chars in the filename. 0 = don't create subdirectories. @@ -26,31 +26,31 @@ type DiskBlobStore struct { initialized bool } -// NewDiskBlobStore returns an initialized file disk store pointer. -func NewDiskBlobStore(dir string, prefixLength int) *DiskBlobStore { - return &DiskBlobStore{ +// NewDiskStore returns an initialized file disk store pointer. +func NewDiskStore(dir string, prefixLength int) *DiskStore { + return &DiskStore{ blobDir: dir, prefixLength: prefixLength, fs: afero.NewOsFs(), } } -func (d *DiskBlobStore) dir(hash string) string { +func (d *DiskStore) dir(hash string) string { if d.prefixLength <= 0 || len(hash) < d.prefixLength { return d.blobDir } return path.Join(d.blobDir, hash[:d.prefixLength]) } -func (d *DiskBlobStore) path(hash string) string { +func (d *DiskStore) path(hash string) string { return path.Join(d.dir(hash), hash) } -func (d *DiskBlobStore) ensureDirExists(dir string) error { +func (d *DiskStore) ensureDirExists(dir string) error { return errors.Err(d.fs.MkdirAll(dir, 0755)) } -func (d *DiskBlobStore) initOnce() error { +func (d *DiskStore) initOnce() error { if d.initialized { return nil } @@ -65,7 +65,7 @@ func (d *DiskBlobStore) initOnce() error { } // Has returns T/F or Error if it the blob stored already. It will error with any IO disk error. -func (d *DiskBlobStore) Has(hash string) (bool, error) { +func (d *DiskStore) Has(hash string) (bool, error) { err := d.initOnce() if err != nil { return false, err @@ -82,7 +82,7 @@ func (d *DiskBlobStore) Has(hash string) (bool, error) { } // Get returns the blob or an error if the blob doesn't exist. -func (d *DiskBlobStore) Get(hash string) (stream.Blob, error) { +func (d *DiskStore) Get(hash string) (stream.Blob, error) { err := d.initOnce() if err != nil { return nil, err @@ -102,7 +102,7 @@ func (d *DiskBlobStore) Get(hash string) (stream.Blob, error) { } // Put stores the blob on disk -func (d *DiskBlobStore) Put(hash string, blob stream.Blob) error { +func (d *DiskStore) Put(hash string, blob stream.Blob) error { err := d.initOnce() if err != nil { return err @@ -118,12 +118,12 @@ func (d *DiskBlobStore) Put(hash string, blob stream.Blob) error { } // PutSD stores the sd blob on the disk -func (d *DiskBlobStore) PutSD(hash string, blob stream.Blob) error { +func (d *DiskStore) PutSD(hash string, blob stream.Blob) error { return d.Put(hash, blob) } // Delete deletes the blob from the store -func (d *DiskBlobStore) Delete(hash string) error { +func (d *DiskStore) Delete(hash string) error { err := d.initOnce() if err != nil { return err @@ -142,7 +142,7 @@ func (d *DiskBlobStore) Delete(hash string) error { } // list returns a slice of blobs that already exist in the blobDir -func (d *DiskBlobStore) list() ([]string, error) { +func (d *DiskStore) list() ([]string, error) { dirs, err := afero.ReadDir(d.fs, d.blobDir) if err != nil { return nil, err diff --git a/store/lru_test.go b/store/lru_test.go index 92ddb7d..c95ca6b 100644 --- a/store/lru_test.go +++ b/store/lru_test.go @@ -14,13 +14,13 @@ import ( const cacheMaxBlobs = 3 -func testLRUStore() (*LRUStore, *DiskBlobStore) { - d := NewDiskBlobStore("/", 2) +func testLRUStore() (*LRUStore, *DiskStore) { + d := NewDiskStore("/", 2) d.fs = afero.NewMemMapFs() return NewLRUStore(d, 3), d } -func countOnDisk(t *testing.T, disk *DiskBlobStore) int { +func countOnDisk(t *testing.T, disk *DiskStore) int { t.Helper() count := 0 diff --git a/store/memory.go b/store/memory.go index 27dbc0f..c50f282 100644 --- a/store/memory.go +++ b/store/memory.go @@ -5,25 +5,25 @@ import ( "github.com/lbryio/lbry.go/v2/stream" ) -// MemoryBlobStore is an in memory only blob store with no persistence. -type MemoryBlobStore struct { +// MemoryStore is an in memory only blob store with no persistence. +type MemoryStore struct { blobs map[string]stream.Blob } -func NewMemoryBlobStore() *MemoryBlobStore { - return &MemoryBlobStore{ +func NewMemoryStore() *MemoryStore { + return &MemoryStore{ blobs: make(map[string]stream.Blob), } } // Has returns T/F if the blob is currently stored. It will never error. -func (m *MemoryBlobStore) Has(hash string) (bool, error) { +func (m *MemoryStore) Has(hash string) (bool, error) { _, ok := m.blobs[hash] return ok, nil } // Get returns the blob byte slice if present and errors if the blob is not found. -func (m *MemoryBlobStore) Get(hash string) (stream.Blob, error) { +func (m *MemoryStore) Get(hash string) (stream.Blob, error) { blob, ok := m.blobs[hash] if !ok { return nil, errors.Err(ErrBlobNotFound) @@ -32,23 +32,23 @@ func (m *MemoryBlobStore) Get(hash string) (stream.Blob, error) { } // Put stores the blob in memory -func (m *MemoryBlobStore) Put(hash string, blob stream.Blob) error { +func (m *MemoryStore) Put(hash string, blob stream.Blob) error { m.blobs[hash] = blob return nil } // PutSD stores the sd blob in memory -func (m *MemoryBlobStore) PutSD(hash string, blob stream.Blob) error { +func (m *MemoryStore) PutSD(hash string, blob stream.Blob) error { return m.Put(hash, blob) } // Delete deletes the blob from the store -func (m *MemoryBlobStore) Delete(hash string) error { +func (m *MemoryStore) Delete(hash string) error { delete(m.blobs, hash) return nil } // Debug returns the blobs in memory. It's useful for testing and debugging. -func (m *MemoryBlobStore) Debug() map[string]stream.Blob { +func (m *MemoryStore) Debug() map[string]stream.Blob { return m.blobs } diff --git a/store/memory_test.go b/store/memory_test.go index e359803..892c735 100644 --- a/store/memory_test.go +++ b/store/memory_test.go @@ -8,7 +8,7 @@ import ( ) func TestMemoryBlobStore_Put(t *testing.T) { - s := NewMemoryBlobStore() + s := NewMemoryStore() blob := []byte("abcdefg") err := s.Put("abc", blob) if err != nil { @@ -17,7 +17,7 @@ func TestMemoryBlobStore_Put(t *testing.T) { } func TestMemoryBlobStore_Get(t *testing.T) { - s := NewMemoryBlobStore() + s := NewMemoryStore() hash := "abc" blob := []byte("abcdefg") err := s.Put(hash, blob) diff --git a/store/s3.go b/store/s3.go index e08b910..2189ef1 100644 --- a/store/s3.go +++ b/store/s3.go @@ -18,8 +18,8 @@ import ( log "github.com/sirupsen/logrus" ) -// S3BlobStore is an S3 store -type S3BlobStore struct { +// S3Store is an S3 store +type S3Store struct { awsID string awsSecret string region string @@ -28,9 +28,9 @@ type S3BlobStore struct { session *session.Session } -// NewS3BlobStore returns an initialized S3 store pointer. -func NewS3BlobStore(awsID, awsSecret, region, bucket string) *S3BlobStore { - return &S3BlobStore{ +// NewS3Store returns an initialized S3 store pointer. +func NewS3Store(awsID, awsSecret, region, bucket string) *S3Store { + return &S3Store{ awsID: awsID, awsSecret: awsSecret, region: region, @@ -38,7 +38,7 @@ func NewS3BlobStore(awsID, awsSecret, region, bucket string) *S3BlobStore { } } -func (s *S3BlobStore) initOnce() error { +func (s *S3Store) initOnce() error { if s.session != nil { return nil } @@ -56,7 +56,7 @@ func (s *S3BlobStore) initOnce() error { } // Has returns T/F or Error ( from S3 ) if the store contains the blob. -func (s *S3BlobStore) Has(hash string) (bool, error) { +func (s *S3Store) Has(hash string) (bool, error) { err := s.initOnce() if err != nil { return false, err @@ -77,7 +77,7 @@ func (s *S3BlobStore) Has(hash string) (bool, error) { } // Get returns the blob slice if present or errors on S3. -func (s *S3BlobStore) Get(hash string) (stream.Blob, error) { +func (s *S3Store) Get(hash string) (stream.Blob, error) { //Todo-Need to handle error for blob doesn't exist for consistency. err := s.initOnce() if err != nil { @@ -110,7 +110,7 @@ func (s *S3BlobStore) Get(hash string) (stream.Blob, error) { } // Put stores the blob on S3 or errors if S3 connection errors. -func (s *S3BlobStore) Put(hash string, blob stream.Blob) error { +func (s *S3Store) Put(hash string, blob stream.Blob) error { err := s.initOnce() if err != nil { return err @@ -133,12 +133,12 @@ func (s *S3BlobStore) Put(hash string, blob stream.Blob) error { } // PutSD stores the sd blob on S3 or errors if S3 connection errors. -func (s *S3BlobStore) PutSD(hash string, blob stream.Blob) error { +func (s *S3Store) PutSD(hash string, blob stream.Blob) error { //Todo - handle missing stream for consistency return s.Put(hash, blob) } -func (s *S3BlobStore) Delete(hash string) error { +func (s *S3Store) Delete(hash string) error { err := s.initOnce() if err != nil { return err