add cloudfront support
This commit is contained in:
parent
e8b98bc862
commit
fc5f5ff7d3
2 changed files with 115 additions and 1 deletions
|
@ -29,6 +29,7 @@ var proxyAddress string
|
|||
var proxyPort string
|
||||
var proxyProtocol string
|
||||
var useDB bool
|
||||
var cloudFrontEndpoint string
|
||||
|
||||
func init() {
|
||||
var cmd = &cobra.Command{
|
||||
|
@ -40,6 +41,7 @@ func init() {
|
|||
cmd.Flags().StringVar(&proxyAddress, "proxy-address", "", "address of another reflector server where blobs are fetched from")
|
||||
cmd.Flags().StringVar(&proxyPort, "proxy-port", "5567", "port of another reflector server where blobs are fetched from")
|
||||
cmd.Flags().StringVar(&proxyProtocol, "proxy-protocol", "http3", "protocol used to fetch blobs from another reflector server (tcp/http3)")
|
||||
cmd.Flags().StringVar(&cloudFrontEndpoint, "cloudfront-endpoint", "", "CloudFront edge endpoint for standard HTTP retrieval")
|
||||
cmd.Flags().IntVar(&tcpPeerPort, "tcp-peer-port", 5567, "The port reflector will distribute content from")
|
||||
cmd.Flags().IntVar(&http3PeerPort, "http3-peer-port", 5568, "The port reflector will distribute content from over HTTP3 protocol")
|
||||
cmd.Flags().IntVar(&receiverPort, "receiver-port", 5566, "The port reflector will receive content from")
|
||||
|
@ -69,7 +71,12 @@ func reflectorCmd(cmd *cobra.Command, args []string) {
|
|||
log.Fatalf("specified protocol is not recognized: %s", proxyProtocol)
|
||||
}
|
||||
} else {
|
||||
blobStore = store.NewS3BlobStore(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName)
|
||||
s3Store := store.NewS3BlobStore(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName)
|
||||
if cloudFrontEndpoint != "" {
|
||||
blobStore = store.NewCloudFrontBlobStore(cloudFrontEndpoint, s3Store)
|
||||
} else {
|
||||
blobStore = s3Store
|
||||
}
|
||||
}
|
||||
|
||||
var err error
|
||||
|
|
107
store/cloudfront.go
Normal file
107
store/cloudfront.go
Normal file
|
@ -0,0 +1,107 @@
|
|||
package store
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/lbryio/lbry.go/v2/extras/errors"
|
||||
"github.com/lbryio/lbry.go/v2/stream"
|
||||
"github.com/lbryio/reflector.go/meta"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// CloudFrontBlobStore is an CloudFront backed store (retrieval only)
|
||||
type CloudFrontBlobStore struct {
|
||||
cfEndpoint string
|
||||
s3Store *S3BlobStore
|
||||
}
|
||||
|
||||
// NewS3BlobStore returns an initialized S3 store pointer.
|
||||
func NewCloudFrontBlobStore(cloudFrontEndpoint string, S3Store *S3BlobStore) *CloudFrontBlobStore {
|
||||
return &CloudFrontBlobStore{
|
||||
cfEndpoint: cloudFrontEndpoint,
|
||||
s3Store: S3Store,
|
||||
}
|
||||
}
|
||||
|
||||
// Has returns T/F or Error if the store contains the blob.
|
||||
func (s *CloudFrontBlobStore) Has(hash string) (bool, error) {
|
||||
url := s.cfEndpoint + hash
|
||||
|
||||
req, err := http.NewRequest("HEAD", url, nil)
|
||||
if err != nil {
|
||||
return false, errors.Err(err)
|
||||
}
|
||||
req.Header.Add("User-Agent", "reflector.go/"+meta.Version)
|
||||
res, err := http.DefaultClient.Do(req)
|
||||
if err != nil {
|
||||
return false, errors.Err(err)
|
||||
}
|
||||
defer res.Body.Close()
|
||||
|
||||
switch res.StatusCode {
|
||||
case http.StatusNotFound, http.StatusForbidden:
|
||||
return false, nil
|
||||
case http.StatusOK:
|
||||
return true, nil
|
||||
default:
|
||||
return false, errors.Err(res.Status)
|
||||
}
|
||||
}
|
||||
|
||||
// Get returns the blob slice if present or errors.
|
||||
func (s *CloudFrontBlobStore) Get(hash string) (stream.Blob, error) {
|
||||
url := s.cfEndpoint + hash
|
||||
log.Debugf("Getting %s from S3", hash[:8])
|
||||
defer func(t time.Time) {
|
||||
log.Debugf("Getting %s from S3 took %s", hash[:8], time.Since(t).String())
|
||||
}(time.Now())
|
||||
req, err := http.NewRequest("GET", url, nil)
|
||||
if err != nil {
|
||||
return nil, errors.Err(err)
|
||||
}
|
||||
req.Header.Add("User-Agent", "reflector.go/"+meta.Version)
|
||||
res, err := http.DefaultClient.Do(req)
|
||||
if err != nil {
|
||||
return nil, errors.Err(err)
|
||||
}
|
||||
defer res.Body.Close()
|
||||
|
||||
switch res.StatusCode {
|
||||
case http.StatusNotFound, http.StatusForbidden:
|
||||
return nil, errors.Err(ErrBlobNotFound)
|
||||
case http.StatusOK:
|
||||
b, err := ioutil.ReadAll(res.Body)
|
||||
if err != nil {
|
||||
return nil, errors.Err(err)
|
||||
}
|
||||
return b, nil
|
||||
default:
|
||||
return nil, errors.Err(res.Status)
|
||||
}
|
||||
}
|
||||
|
||||
// Put stores the blob on S3 or errors if S3 store is not present.
|
||||
func (s *CloudFrontBlobStore) Put(hash string, blob stream.Blob) error {
|
||||
if s.s3Store != nil {
|
||||
return s.s3Store.Put(hash, blob)
|
||||
}
|
||||
return errors.Err("not implemented in cloudfront store")
|
||||
}
|
||||
|
||||
// PutSD stores the sd blob on S3 or errors if S3 store is not present.
|
||||
func (s *CloudFrontBlobStore) PutSD(hash string, blob stream.Blob) error {
|
||||
if s.s3Store != nil {
|
||||
return s.s3Store.PutSD(hash, blob)
|
||||
}
|
||||
return errors.Err("not implemented in cloudfront store")
|
||||
}
|
||||
|
||||
func (s *CloudFrontBlobStore) Delete(hash string) error {
|
||||
if s.s3Store != nil {
|
||||
return s.s3Store.Delete(hash)
|
||||
}
|
||||
return errors.Err("not implemented in cloudfront store")
|
||||
}
|
Loading…
Add table
Reference in a new issue