reflector.go/store/cloudfront_ro.go

109 lines
3 KiB
Go
Raw Permalink Normal View History

2020-09-09 00:18:07 +02:00
package store
import (
"io"
2020-09-09 00:18:07 +02:00
"net/http"
"time"
2021-01-09 05:08:20 +01:00
"github.com/lbryio/reflector.go/internal/metrics"
"github.com/lbryio/reflector.go/meta"
"github.com/lbryio/reflector.go/shared"
2021-07-20 02:09:14 +02:00
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/stream"
2020-09-09 00:18:07 +02:00
log "github.com/sirupsen/logrus"
)
2020-10-28 18:59:02 +01:00
// CloudFrontROStore is a read-only blob store backed by a CloudFront
// endpoint. Has and Get issue HTTP requests against the endpoint; the write
// operations (Put, PutSD, Delete) are not supported and return
// shared.ErrNotImplemented.
type CloudFrontROStore struct {
	// endpoint is the CloudFront base URL. The blob hash is appended to it
	// verbatim when building a request URL, so it must already end with
	// whatever separator the URL scheme needs.
	endpoint string
}
2020-10-28 18:59:02 +01:00
// NewCloudFrontROStore builds a CloudFrontROStore that sends its requests to
// the given endpoint and returns a pointer to it.
func NewCloudFrontROStore(endpoint string) *CloudFrontROStore {
	store := new(CloudFrontROStore)
	store.endpoint = endpoint
	return store
}
2020-10-28 18:59:02 +01:00
// nameCloudFrontRO is the store type name reported by CloudFrontROStore.Name.
const nameCloudFrontRO = "cloudfront_ro"
2020-10-22 19:49:02 +02:00
// Name is the cache type name
2020-10-28 18:59:02 +01:00
func (c *CloudFrontROStore) Name() string { return nameCloudFrontRO }
2020-10-22 19:49:02 +02:00
// Has checks if the hash is in the store.
2020-10-28 18:59:02 +01:00
func (c *CloudFrontROStore) Has(hash string) (bool, error) {
status, body, err := c.cfRequest(http.MethodHead, hash)
2020-09-09 00:18:07 +02:00
if err != nil {
return false, err
2020-09-09 00:18:07 +02:00
}
defer func() { _ = body.Close() }()
switch status {
2020-09-09 00:18:07 +02:00
case http.StatusNotFound, http.StatusForbidden:
return false, nil
case http.StatusOK:
return true, nil
default:
return false, errors.Err("unexpected status %d", status)
2020-09-09 00:18:07 +02:00
}
}
// Get gets the blob from Cloudfront.
2021-01-09 05:08:20 +01:00
func (c *CloudFrontROStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) {
2020-09-09 00:18:07 +02:00
log.Debugf("Getting %s from S3", hash[:8])
2021-01-09 05:08:20 +01:00
start := time.Now()
2020-09-09 00:18:07 +02:00
defer func(t time.Time) {
log.Debugf("Getting %s from S3 took %s", hash[:8], time.Since(t).String())
2021-01-09 05:08:20 +01:00
}(start)
status, body, err := c.cfRequest(http.MethodGet, hash)
2020-09-09 00:18:07 +02:00
if err != nil {
2021-01-09 05:08:20 +01:00
return nil, shared.NewBlobTrace(time.Since(start), c.Name()), err
2020-09-09 00:18:07 +02:00
}
defer func() { _ = body.Close() }()
switch status {
2020-09-09 00:18:07 +02:00
case http.StatusNotFound, http.StatusForbidden:
2021-01-09 05:08:20 +01:00
return nil, shared.NewBlobTrace(time.Since(start), c.Name()), errors.Err(ErrBlobNotFound)
2020-09-09 00:18:07 +02:00
case http.StatusOK:
b, err := io.ReadAll(body)
2020-09-09 00:18:07 +02:00
if err != nil {
2021-01-09 05:08:20 +01:00
return nil, shared.NewBlobTrace(time.Since(start), c.Name()), errors.Err(err)
2020-09-09 00:18:07 +02:00
}
metrics.MtrInBytesS3.Add(float64(len(b)))
2021-01-09 05:08:20 +01:00
return b, shared.NewBlobTrace(time.Since(start), c.Name()), nil
2020-09-09 00:18:07 +02:00
default:
2021-01-09 05:08:20 +01:00
return nil, shared.NewBlobTrace(time.Since(start), c.Name()), errors.Err("unexpected status %d", status)
2020-09-09 00:18:07 +02:00
}
}
2020-10-28 18:59:02 +01:00
func (c *CloudFrontROStore) cfRequest(method, hash string) (int, io.ReadCloser, error) {
url := c.endpoint + hash
req, err := http.NewRequest(method, url, nil)
if err != nil {
return 0, nil, errors.Err(err)
2020-09-09 00:18:07 +02:00
}
req.Header.Add("User-Agent", "reflector.go/"+meta.Version())
2020-09-09 00:18:07 +02:00
res, err := http.DefaultClient.Do(req)
if err != nil {
return 0, nil, errors.Err(err)
2020-09-09 00:18:07 +02:00
}
return res.StatusCode, res.Body, nil
2020-09-09 00:18:07 +02:00
}
2020-10-28 18:59:02 +01:00
func (c *CloudFrontROStore) Put(_ string, _ stream.Blob) error {
2021-02-23 15:23:46 +01:00
return errors.Err(shared.ErrNotImplemented)
}
2020-10-28 18:59:02 +01:00
func (c *CloudFrontROStore) PutSD(_ string, _ stream.Blob) error {
2021-02-23 15:23:46 +01:00
return errors.Err(shared.ErrNotImplemented)
}
2020-10-28 18:59:02 +01:00
func (c *CloudFrontROStore) Delete(_ string) error {
2021-02-23 15:23:46 +01:00
return errors.Err(shared.ErrNotImplemented)
2020-09-09 00:18:07 +02:00
}
// Shutdown is a no-op; it exists so the store offers a graceful-shutdown
// hook.
func (c *CloudFrontROStore) Shutdown() {}