From 124d4065c249f0425cfbdbacd7d5536eebe0ba4a Mon Sep 17 00:00:00 2001 From: Alex Grintsvayg Date: Wed, 28 Oct 2020 13:59:02 -0400 Subject: [PATCH] split cloudfront into RO and RW stores --- cmd/reflector.go | 2 +- store/{cloudfront.go => cloudfront_ro.go} | 44 ++++++++------------ store/cloudfront_rw.go | 50 +++++++++++++++++++++++ 3 files changed, 67 insertions(+), 29 deletions(-) rename store/{cloudfront.go => cloudfront_ro.go} (60%) create mode 100644 store/cloudfront_rw.go diff --git a/cmd/reflector.go b/cmd/reflector.go index d46fd09..ff379ad 100644 --- a/cmd/reflector.go +++ b/cmd/reflector.go @@ -124,7 +124,7 @@ func setupStore() store.BlobStore { } else { s3Store := store.NewS3Store(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName) if cloudFrontEndpoint != "" { - s = store.NewCloudFrontStore(s3Store, cloudFrontEndpoint) + s = store.NewCloudFrontRWStore(store.NewCloudFrontROStore(cloudFrontEndpoint), s3Store) } else { s = s3Store } diff --git a/store/cloudfront.go b/store/cloudfront_ro.go similarity index 60% rename from store/cloudfront.go rename to store/cloudfront_ro.go index 32f1246..a914285 100644 --- a/store/cloudfront.go +++ b/store/cloudfront_ro.go @@ -15,32 +15,23 @@ import ( log "github.com/sirupsen/logrus" ) -// CloudFrontStore wraps an S3 store. Reads go to Cloudfront, writes go to S3. -type CloudFrontStore struct { - s3 *S3Store +// CloudFrontROStore reads from cloudfront. All writes panic. +type CloudFrontROStore struct { endpoint string // cloudflare endpoint } -// NewCloudFrontStore returns an initialized CloudFrontStore store pointer. -// NOTE: It panics if S3Store is nil. -func NewCloudFrontStore(s3 *S3Store, cfEndpoint string) *CloudFrontStore { - if s3 == nil { - panic("S3Store must not be nil") - } - - return &CloudFrontStore{ - endpoint: cfEndpoint, - s3: s3, - } +// NewCloudFrontROStore returns an initialized CloudFrontROStore store pointer. +func NewCloudFrontROStore(endpoint string) *CloudFrontROStore { + return &CloudFrontROStore{endpoint: endpoint} } -const nameCloudFront = "cloudfront" +const nameCloudFrontRO = "cloudfront_ro" // Name is the cache type name -func (c *CloudFrontStore) Name() string { return nameCloudFront } +func (c *CloudFrontROStore) Name() string { return nameCloudFrontRO } // Has checks if the hash is in the store. -func (c *CloudFrontStore) Has(hash string) (bool, error) { +func (c *CloudFrontROStore) Has(hash string) (bool, error) { status, body, err := c.cfRequest(http.MethodHead, hash) if err != nil { return false, err @@ -58,7 +49,7 @@ func (c *CloudFrontStore) Has(hash string) (bool, error) { } // Get gets the blob from Cloudfront. -func (c *CloudFrontStore) Get(hash string) (stream.Blob, error) { +func (c *CloudFrontROStore) Get(hash string) (stream.Blob, error) { log.Debugf("Getting %s from S3", hash[:8]) defer func(t time.Time) { log.Debugf("Getting %s from S3 took %s", hash[:8], time.Since(t).String()) @@ -85,7 +76,7 @@ func (c *CloudFrontStore) Get(hash string) (stream.Blob, error) { } } -func (c *CloudFrontStore) cfRequest(method, hash string) (int, io.ReadCloser, error) { +func (c *CloudFrontROStore) cfRequest(method, hash string) (int, io.ReadCloser, error) { url := c.endpoint + hash req, err := http.NewRequest(method, url, nil) if err != nil { @@ -101,17 +92,14 @@ func (c *CloudFrontStore) cfRequest(method, hash string) (int, io.ReadCloser, er return res.StatusCode, res.Body, nil } -// Put stores the blob on S3 -func (c *CloudFrontStore) Put(hash string, blob stream.Blob) error { - return c.s3.Put(hash, blob) +func (c *CloudFrontROStore) Put(_ string, _ stream.Blob) error { + panic("CloudFrontROStore cannot do writes. Use CloudFrontRWStore") } -// PutSD stores the sd blob on S3 -func (c *CloudFrontStore) PutSD(hash string, blob stream.Blob) error { - return c.s3.PutSD(hash, blob) +func (c *CloudFrontROStore) PutSD(_ string, _ stream.Blob) error { + panic("CloudFrontROStore cannot do writes. Use CloudFrontRWStore") } -// Delete deletes the blob from S3 -func (c *CloudFrontStore) Delete(hash string) error { - return c.s3.Delete(hash) +func (c *CloudFrontROStore) Delete(_ string) error { + panic("CloudFrontROStore cannot do writes. Use CloudFrontRWStore") } diff --git a/store/cloudfront_rw.go b/store/cloudfront_rw.go new file mode 100644 index 0000000..ee771da --- /dev/null +++ b/store/cloudfront_rw.go @@ -0,0 +1,50 @@ +package store + +import ( + "github.com/lbryio/lbry.go/v2/stream" +) + +// CloudFrontRWStore combines a Cloudfront and an S3 store. Reads go to Cloudfront, writes go to S3. +type CloudFrontRWStore struct { + cf *CloudFrontROStore + s3 *S3Store +} + +// NewCloudFrontRWStore returns an initialized CloudFrontRWStore store pointer. +// NOTE: It panics if either argument is nil. +func NewCloudFrontRWStore(cf *CloudFrontROStore, s3 *S3Store) *CloudFrontRWStore { + if cf == nil || s3 == nil { + panic("both stores must be set") + } + return &CloudFrontRWStore{cf: cf, s3: s3} +} + +const nameCloudFrontRW = "cloudfront_rw" + +// Name is the cache type name +func (c *CloudFrontRWStore) Name() string { return nameCloudFrontRW } + +// Has checks if the hash is in the store. +func (c *CloudFrontRWStore) Has(hash string) (bool, error) { + return c.cf.Has(hash) +} + +// Get gets the blob from Cloudfront. +func (c *CloudFrontRWStore) Get(hash string) (stream.Blob, error) { + return c.cf.Get(hash) +} + +// Put stores the blob on S3 +func (c *CloudFrontRWStore) Put(hash string, blob stream.Blob) error { + return c.s3.Put(hash, blob) +} + +// PutSD stores the sd blob on S3 +func (c *CloudFrontRWStore) PutSD(hash string, blob stream.Blob) error { + return c.s3.PutSD(hash, blob) +} + +// Delete deletes the blob from S3 +func (c *CloudFrontRWStore) Delete(hash string) error { + return c.s3.Delete(hash) +}