rename the stores, add caching to reflector cmd

This commit is contained in:
Alex Grintsvayg 2020-10-22 13:12:31 -04:00
parent c6b53792c8
commit c9fa04043c
No known key found for this signature in database
GPG key ID: AEB3F089F86A22B5
16 changed files with 252 additions and 207 deletions

View file

@ -28,9 +28,9 @@ func getStreamCmd(cmd *cobra.Command, args []string) {
addr := args[0] addr := args[0]
sdHash := args[1] sdHash := args[1]
s := store.NewCachingBlobStore( s := store.NewCachingStore(
peer.NewStore(peer.StoreOpts{Address: addr}), peer.NewStore(peer.StoreOpts{Address: addr}),
store.NewDiskBlobStore("/tmp/lbry_downloaded_blobs", 1000, 2), store.NewDiskStore("/tmp/lbry_downloaded_blobs", 2),
) )
wd, err := os.Getwd() wd, err := os.Getwd()

View file

@ -29,7 +29,7 @@ func init() {
func peerCmd(cmd *cobra.Command, args []string) { func peerCmd(cmd *cobra.Command, args []string) {
var err error var err error
s3 := store.NewS3BlobStore(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName) s3 := store.NewS3Store(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName)
peerServer := peer.NewServer(s3) peerServer := peer.NewServer(s3)
if !peerNoDB { if !peerNoDB {

View file

@ -4,6 +4,7 @@ import (
"os" "os"
"os/signal" "os/signal"
"strconv" "strconv"
"strings"
"syscall" "syscall"
"time" "time"
@ -16,23 +17,24 @@ import (
"github.com/lbryio/reflector.go/store" "github.com/lbryio/reflector.go/store"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/spf13/cast"
"github.com/spf13/cobra" "github.com/spf13/cobra"
) )
var ( var (
tcpPeerPort int tcpPeerPort int
http3PeerPort int http3PeerPort int
receiverPort int receiverPort int
metricsPort int metricsPort int
disableUploads bool disableUploads bool
disableBlocklist bool disableBlocklist bool
proxyAddress string proxyAddress string
proxyPort string proxyPort string
proxyProtocol string proxyProtocol string
useDB bool useDB bool
cloudFrontEndpoint string cloudFrontEndpoint string
reflectorCmdCacheDir string reflectorCmdDiskCache string
reflectorCmdCacheMaxBlobs int reflectorCmdMemCache int
) )
func init() { func init() {
@ -52,96 +54,136 @@ func init() {
cmd.Flags().BoolVar(&disableUploads, "disable-uploads", false, "Disable uploads to this reflector server") cmd.Flags().BoolVar(&disableUploads, "disable-uploads", false, "Disable uploads to this reflector server")
cmd.Flags().BoolVar(&disableBlocklist, "disable-blocklist", false, "Disable blocklist watching/updating") cmd.Flags().BoolVar(&disableBlocklist, "disable-blocklist", false, "Disable blocklist watching/updating")
cmd.Flags().BoolVar(&useDB, "use-db", true, "whether to connect to the reflector db or not") cmd.Flags().BoolVar(&useDB, "use-db", true, "whether to connect to the reflector db or not")
cmd.Flags().StringVar(&reflectorCmdCacheDir, "cache", "", "if specified, the path where blobs should be cached (disabled when left empty)") cmd.Flags().StringVar(&reflectorCmdDiskCache, "disk-cache", "",
cmd.Flags().IntVar(&reflectorCmdCacheMaxBlobs, "cache-max-blobs", 0, "if cache is enabled, this option sets the max blobs the cache will hold") "enable disk cache, setting max size and path where to store blobs. format is 'MAX_BLOBS:CACHE_PATH'")
cmd.Flags().IntVar(&reflectorCmdMemCache, "mem-cache", 0, "enable in-memory cache with a max size of this many blobs")
rootCmd.AddCommand(cmd) rootCmd.AddCommand(cmd)
} }
func reflectorCmd(cmd *cobra.Command, args []string) { func reflectorCmd(cmd *cobra.Command, args []string) {
log.Printf("reflector %s", meta.VersionString()) log.Printf("reflector %s", meta.VersionString())
var blobStore store.BlobStore // the blocklist logic requires the db backed store to be the outer-most store
if proxyAddress != "" { underlyingStore := setupStore()
switch proxyProtocol { outerStore := wrapWithCache(underlyingStore)
case "tcp":
blobStore = peer.NewStore(peer.StoreOpts{
Address: proxyAddress + ":" + proxyPort,
Timeout: 30 * time.Second,
})
case "http3":
blobStore = http3.NewStore(http3.StoreOpts{
Address: proxyAddress + ":" + proxyPort,
Timeout: 30 * time.Second,
})
default:
log.Fatalf("specified protocol is not recognized: %s", proxyProtocol)
}
} else {
s3Store := store.NewS3BlobStore(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName)
if cloudFrontEndpoint != "" {
blobStore = store.NewCloudFrontBlobStore(cloudFrontEndpoint, s3Store)
} else {
blobStore = s3Store
}
}
var err error if !disableUploads {
var reflectorServer *reflector.Server reflectorServer := reflector.NewServer(underlyingStore)
reflectorServer.Timeout = 3 * time.Minute
reflectorServer.EnableBlocklist = !disableBlocklist
if useDB { err := reflectorServer.Start(":" + strconv.Itoa(receiverPort))
db := new(db.SQL)
db.TrackAccessTime = true
err = db.Connect(globalConfig.DBConn)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
defer reflectorServer.Shutdown()
blobStore = store.NewDBBackedStore(blobStore, db)
//this shouldn't go here but the blocklist logic requires the db backed store to be the outer-most store for it to work....
//having this here prevents uploaded blobs from being stored in the disk cache
if !disableUploads {
reflectorServer = reflector.NewServer(blobStore)
reflectorServer.Timeout = 3 * time.Minute
reflectorServer.EnableBlocklist = !disableBlocklist
err = reflectorServer.Start(":" + strconv.Itoa(receiverPort))
if err != nil {
log.Fatal(err)
}
}
} }
if reflectorCmdCacheDir != "" { peerServer := peer.NewServer(outerStore)
err = os.MkdirAll(reflectorCmdCacheDir, os.ModePerm) err := peerServer.Start(":" + strconv.Itoa(tcpPeerPort))
if err != nil {
log.Fatal(err)
}
blobStore = store.NewCachingBlobStore(blobStore, store.NewDiskBlobStore(reflectorCmdCacheDir, reflectorCmdCacheMaxBlobs, 2))
}
peerServer := peer.NewServer(blobStore)
err = peerServer.Start(":" + strconv.Itoa(tcpPeerPort))
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
defer peerServer.Shutdown()
http3PeerServer := http3.NewServer(blobStore) http3PeerServer := http3.NewServer(outerStore)
err = http3PeerServer.Start(":" + strconv.Itoa(http3PeerPort)) err = http3PeerServer.Start(":" + strconv.Itoa(http3PeerPort))
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
defer http3PeerServer.Shutdown()
metricsServer := metrics.NewServer(":"+strconv.Itoa(metricsPort), "/metrics") metricsServer := metrics.NewServer(":"+strconv.Itoa(metricsPort), "/metrics")
metricsServer.Start() metricsServer.Start()
defer metricsServer.Shutdown()
interruptChan := make(chan os.Signal, 1) interruptChan := make(chan os.Signal, 1)
signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM) signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM)
<-interruptChan <-interruptChan
metricsServer.Shutdown() // deferred shutdowns happen now
peerServer.Shutdown() }
http3PeerServer.Shutdown()
if reflectorServer != nil { func setupStore() store.BlobStore {
reflectorServer.Shutdown() var s store.BlobStore
}
if proxyAddress != "" {
switch proxyProtocol {
case "tcp":
s = peer.NewStore(peer.StoreOpts{
Address: proxyAddress + ":" + proxyPort,
Timeout: 30 * time.Second,
})
case "http3":
s = http3.NewStore(http3.StoreOpts{
Address: proxyAddress + ":" + proxyPort,
Timeout: 30 * time.Second,
})
default:
log.Fatalf("protocol is not recognized: %s", proxyProtocol)
}
} else {
s3Store := store.NewS3Store(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName)
if cloudFrontEndpoint != "" {
s = store.NewCloudFrontStore(s3Store, cloudFrontEndpoint)
} else {
s = s3Store
}
}
if useDB {
db := new(db.SQL)
db.TrackAccessTime = true
err := db.Connect(globalConfig.DBConn)
if err != nil {
log.Fatal(err)
}
s = store.NewDBBackedStore(s, db)
}
return s
}
func wrapWithCache(s store.BlobStore) store.BlobStore {
wrapped := s
diskCacheMaxSize, diskCachePath := diskCacheParams()
if diskCacheMaxSize > 0 {
err := os.MkdirAll(diskCachePath, os.ModePerm)
if err != nil {
log.Fatal(err)
}
wrapped = store.NewCachingStore(wrapped,
store.NewLRUStore(store.NewDiskStore(diskCachePath, 2), diskCacheMaxSize))
}
if reflectorCmdMemCache > 0 {
wrapped = store.NewCachingStore(wrapped,
store.NewLRUStore(store.NewMemoryStore(), reflectorCmdMemCache))
}
return wrapped
}
func diskCacheParams() (int, string) {
if reflectorCmdDiskCache == "" {
return 0, ""
}
parts := strings.Split(reflectorCmdDiskCache, ":")
if len(parts) != 2 {
log.Fatalf("--disk-cache must be a number, followed by ':', followed by a string")
}
maxSize := cast.ToInt(parts[0])
if maxSize <= 0 {
log.Fatalf("--disk-cache max size must be more than 0")
}
path := parts[1]
if len(path) == 0 || path[0] != '/' {
log.Fatalf("--disk-cache path must start with '/'")
}
return maxSize, path
} }

View file

@ -55,7 +55,7 @@ func startCmd(cmd *cobra.Command, args []string) {
db := new(db.SQL) db := new(db.SQL)
err := db.Connect(globalConfig.DBConn) err := db.Connect(globalConfig.DBConn)
checkErr(err) checkErr(err)
s3 := store.NewS3BlobStore(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName) s3 := store.NewS3Store(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName)
comboStore := store.NewDBBackedStore(s3, db) comboStore := store.NewDBBackedStore(s3, db)
conf := prism.DefaultConf() conf := prism.DefaultConf()

View file

@ -29,7 +29,7 @@ func init() {
func testCmd(cmd *cobra.Command, args []string) { func testCmd(cmd *cobra.Command, args []string) {
log.Printf("reflector %s", meta.VersionString()) log.Printf("reflector %s", meta.VersionString())
memStore := store.NewMemoryBlobStore() memStore := store.NewMemoryStore()
reflectorServer := reflector.NewServer(memStore) reflectorServer := reflector.NewServer(memStore)
reflectorServer.Timeout = 3 * time.Minute reflectorServer.Timeout = 3 * time.Minute

View file

@ -35,7 +35,7 @@ func uploadCmd(cmd *cobra.Command, args []string) {
checkErr(err) checkErr(err)
st := store.NewDBBackedStore( st := store.NewDBBackedStore(
store.NewS3BlobStore(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName), store.NewS3Store(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName),
db) db)
uploader := reflector.NewUploader(db, st, uploadWorkers, uploadSkipExistsCheck, uploadDeleteBlobsAfterUpload) uploader := reflector.NewUploader(db, st, uploadWorkers, uploadSkipExistsCheck, uploadDeleteBlobsAfterUpload)

View file

@ -34,7 +34,7 @@ var availabilityRequests = []pair{
} }
func getServer(t *testing.T, withBlobs bool) *Server { func getServer(t *testing.T, withBlobs bool) *Server {
st := store.NewMemoryBlobStore() st := store.NewMemoryStore()
if withBlobs { if withBlobs {
for k, v := range blobs { for k, v := range blobs {
err := st.Put(k, v) err := st.Put(k, v)

View file

@ -22,7 +22,7 @@ func startServerOnRandomPort(t *testing.T) (*Server, int) {
t.Fatal(err) t.Fatal(err)
} }
srv := NewServer(store.NewMemoryBlobStore()) srv := NewServer(store.NewMemoryStore())
err = srv.Start("127.0.0.1:" + strconv.Itoa(port)) err = srv.Start("127.0.0.1:" + strconv.Itoa(port))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@ -119,7 +119,7 @@ func TestServer_Timeout(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
srv := NewServer(store.NewMemoryBlobStore()) srv := NewServer(store.NewMemoryStore())
srv.Timeout = testTimeout srv.Timeout = testTimeout
err = srv.Start("127.0.0.1:" + strconv.Itoa(port)) err = srv.Start("127.0.0.1:" + strconv.Itoa(port))
if err != nil { if err != nil {
@ -161,7 +161,7 @@ func TestServer_Timeout(t *testing.T) {
//} //}
type mockPartialStore struct { type mockPartialStore struct {
*store.MemoryBlobStore *store.MemoryStore
missing []string missing []string
} }
@ -181,7 +181,7 @@ func TestServer_PartialUpload(t *testing.T) {
missing[i] = bits.Rand().String() missing[i] = bits.Rand().String()
} }
st := store.BlobStore(&mockPartialStore{MemoryBlobStore: store.NewMemoryBlobStore(), missing: missing}) st := store.BlobStore(&mockPartialStore{MemoryStore: store.NewMemoryStore(), missing: missing})
if _, ok := st.(neededBlobChecker); !ok { if _, ok := st.(neededBlobChecker); !ok {
t.Fatal("mock does not implement the relevant interface") t.Fatal("mock does not implement the relevant interface")
} }

View file

@ -11,22 +11,22 @@ import (
"golang.org/x/sync/singleflight" "golang.org/x/sync/singleflight"
) )
// CachingBlobStore combines two stores, typically a local and a remote store, to improve performance. // CachingStore combines two stores, typically a local and a remote store, to improve performance.
// Accessed blobs are stored in and retrieved from the cache. If they are not in the cache, they // Accessed blobs are stored in and retrieved from the cache. If they are not in the cache, they
// are retrieved from the origin and cached. Puts are cached and also forwarded to the origin. // are retrieved from the origin and cached. Puts are cached and also forwarded to the origin.
type CachingBlobStore struct { type CachingStore struct {
origin, cache BlobStore origin, cache BlobStore
sf *singleflight.Group sf *singleflight.Group
} }
// NewCachingBlobStore makes a new caching disk store and returns a pointer to it. // NewCachingStore makes a new caching disk store and returns a pointer to it.
func NewCachingBlobStore(origin, cache BlobStore) *CachingBlobStore { func NewCachingStore(origin, cache BlobStore) *CachingStore {
return &CachingBlobStore{origin: origin, cache: cache, sf: new(singleflight.Group)} return &CachingStore{origin: origin, cache: cache, sf: new(singleflight.Group)}
} }
// Has checks the cache and then the origin for a hash. It returns true if either store has it. // Has checks the cache and then the origin for a hash. It returns true if either store has it.
func (c *CachingBlobStore) Has(hash string) (bool, error) { func (c *CachingStore) Has(hash string) (bool, error) {
has, err := c.cache.Has(hash) has, err := c.cache.Has(hash)
if has || err != nil { if has || err != nil {
return has, err return has, err
@ -36,7 +36,7 @@ func (c *CachingBlobStore) Has(hash string) (bool, error) {
// Get tries to get the blob from the cache first, falling back to the origin. If the blob comes // Get tries to get the blob from the cache first, falling back to the origin. If the blob comes
// from the origin, it is also stored in the cache. // from the origin, it is also stored in the cache.
func (c *CachingBlobStore) Get(hash string) (stream.Blob, error) { func (c *CachingStore) Get(hash string) (stream.Blob, error) {
start := time.Now() start := time.Now()
blob, err := c.cache.Get(hash) blob, err := c.cache.Get(hash)
if err == nil || !errors.Is(err, ErrBlobNotFound) { if err == nil || !errors.Is(err, ErrBlobNotFound) {
@ -52,7 +52,7 @@ func (c *CachingBlobStore) Get(hash string) (stream.Blob, error) {
// getFromOrigin ensures that only one Get per hash is sent to the origin at a time, // getFromOrigin ensures that only one Get per hash is sent to the origin at a time,
// thereby protecting against https://en.wikipedia.org/wiki/Thundering_herd_problem // thereby protecting against https://en.wikipedia.org/wiki/Thundering_herd_problem
func (c *CachingBlobStore) getFromOrigin(hash string) (stream.Blob, error) { func (c *CachingStore) getFromOrigin(hash string) (stream.Blob, error) {
metrics.CacheWaitingRequestsCount.Inc() metrics.CacheWaitingRequestsCount.Inc()
defer metrics.CacheWaitingRequestsCount.Dec() defer metrics.CacheWaitingRequestsCount.Dec()
originBlob, err, _ := c.sf.Do(hash, func() (interface{}, error) { originBlob, err, _ := c.sf.Do(hash, func() (interface{}, error) {
@ -78,7 +78,7 @@ func (c *CachingBlobStore) getFromOrigin(hash string) (stream.Blob, error) {
} }
// Put stores the blob in the origin and the cache // Put stores the blob in the origin and the cache
func (c *CachingBlobStore) Put(hash string, blob stream.Blob) error { func (c *CachingStore) Put(hash string, blob stream.Blob) error {
err := c.origin.Put(hash, blob) err := c.origin.Put(hash, blob)
if err != nil { if err != nil {
return err return err
@ -87,7 +87,7 @@ func (c *CachingBlobStore) Put(hash string, blob stream.Blob) error {
} }
// PutSD stores the sd blob in the origin and the cache // PutSD stores the sd blob in the origin and the cache
func (c *CachingBlobStore) PutSD(hash string, blob stream.Blob) error { func (c *CachingStore) PutSD(hash string, blob stream.Blob) error {
err := c.origin.PutSD(hash, blob) err := c.origin.PutSD(hash, blob)
if err != nil { if err != nil {
return err return err
@ -96,7 +96,7 @@ func (c *CachingBlobStore) PutSD(hash string, blob stream.Blob) error {
} }
// Delete deletes the blob from the origin and the cache // Delete deletes the blob from the origin and the cache
func (c *CachingBlobStore) Delete(hash string) error { func (c *CachingStore) Delete(hash string) error {
err := c.origin.Delete(hash) err := c.origin.Delete(hash)
if err != nil { if err != nil {
return err return err

View file

@ -10,9 +10,9 @@ import (
) )
func TestCachingBlobStore_Put(t *testing.T) { func TestCachingBlobStore_Put(t *testing.T) {
origin := NewMemoryBlobStore() origin := NewMemoryStore()
cache := NewMemoryBlobStore() cache := NewMemoryStore()
s := NewCachingBlobStore(origin, cache) s := NewCachingStore(origin, cache)
b := []byte("this is a blob of stuff") b := []byte("this is a blob of stuff")
hash := "hash" hash := "hash"
@ -40,9 +40,9 @@ func TestCachingBlobStore_Put(t *testing.T) {
} }
func TestCachingBlobStore_CacheMiss(t *testing.T) { func TestCachingBlobStore_CacheMiss(t *testing.T) {
origin := NewMemoryBlobStore() origin := NewMemoryStore()
cache := NewMemoryBlobStore() cache := NewMemoryStore()
s := NewCachingBlobStore(origin, cache) s := NewCachingStore(origin, cache)
b := []byte("this is a blob of stuff") b := []byte("this is a blob of stuff")
hash := "hash" hash := "hash"
@ -79,8 +79,8 @@ func TestCachingBlobStore_CacheMiss(t *testing.T) {
func TestCachingBlobStore_ThunderingHerd(t *testing.T) { func TestCachingBlobStore_ThunderingHerd(t *testing.T) {
storeDelay := 100 * time.Millisecond storeDelay := 100 * time.Millisecond
origin := NewSlowBlobStore(storeDelay) origin := NewSlowBlobStore(storeDelay)
cache := NewMemoryBlobStore() cache := NewMemoryStore()
s := NewCachingBlobStore(origin, cache) s := NewCachingStore(origin, cache)
b := []byte("this is a blob of stuff") b := []byte("this is a blob of stuff")
hash := "hash" hash := "hash"
@ -129,13 +129,13 @@ func TestCachingBlobStore_ThunderingHerd(t *testing.T) {
// SlowBlobStore adds a delay to each request // SlowBlobStore adds a delay to each request
type SlowBlobStore struct { type SlowBlobStore struct {
mem *MemoryBlobStore mem *MemoryStore
delay time.Duration delay time.Duration
} }
func NewSlowBlobStore(delay time.Duration) *SlowBlobStore { func NewSlowBlobStore(delay time.Duration) *SlowBlobStore {
return &SlowBlobStore{ return &SlowBlobStore{
mem: NewMemoryBlobStore(), mem: NewMemoryStore(),
delay: delay, delay: delay,
} }
} }

View file

@ -1,109 +1,112 @@
package store package store
import ( import (
"io"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"time" "time"
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/stream"
"github.com/lbryio/reflector.go/internal/metrics" "github.com/lbryio/reflector.go/internal/metrics"
"github.com/lbryio/reflector.go/meta" "github.com/lbryio/reflector.go/meta"
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/stream"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
// CloudFrontBlobStore is an CloudFront backed store (retrieval only) // CloudFrontStore wraps an S3 store. Reads go to Cloudfront, writes go to S3.
type CloudFrontBlobStore struct { type CloudFrontStore struct {
cfEndpoint string s3 *S3Store
s3Store *S3BlobStore endpoint string // cloudflare endpoint
} }
// NewS3BlobStore returns an initialized S3 store pointer. // NewCloudFrontStore returns an initialized CloudFrontStore store pointer.
func NewCloudFrontBlobStore(cloudFrontEndpoint string, S3Store *S3BlobStore) *CloudFrontBlobStore { // NOTE: It panics if S3Store is nil.
return &CloudFrontBlobStore{ func NewCloudFrontStore(s3 *S3Store, cfEndpoint string) *CloudFrontStore {
cfEndpoint: cloudFrontEndpoint, if s3 == nil {
s3Store: S3Store, panic("S3Store must not be nil")
}
return &CloudFrontStore{
endpoint: cfEndpoint,
s3: s3,
} }
} }
// Has returns T/F or Error if the store contains the blob. // Has checks if the hash is in the store.
func (s *CloudFrontBlobStore) Has(hash string) (bool, error) { func (c *CloudFrontStore) Has(hash string) (bool, error) {
url := s.cfEndpoint + hash status, body, err := c.cfRequest(http.MethodHead, hash)
req, err := http.NewRequest("HEAD", url, nil)
if err != nil { if err != nil {
return false, errors.Err(err) return false, err
} }
req.Header.Add("User-Agent", "reflector.go/"+meta.Version) defer body.Close()
res, err := http.DefaultClient.Do(req)
if err != nil {
return false, errors.Err(err)
}
defer res.Body.Close()
switch res.StatusCode { switch status {
case http.StatusNotFound, http.StatusForbidden: case http.StatusNotFound, http.StatusForbidden:
return false, nil return false, nil
case http.StatusOK: case http.StatusOK:
return true, nil return true, nil
default: default:
return false, errors.Err(res.Status) return false, errors.Err("unexpected status %d", status)
} }
} }
// Get returns the blob slice if present or errors. // Get gets the blob from Cloudfront.
func (s *CloudFrontBlobStore) Get(hash string) (stream.Blob, error) { func (c *CloudFrontStore) Get(hash string) (stream.Blob, error) {
url := s.cfEndpoint + hash
log.Debugf("Getting %s from S3", hash[:8]) log.Debugf("Getting %s from S3", hash[:8])
defer func(t time.Time) { defer func(t time.Time) {
log.Debugf("Getting %s from S3 took %s", hash[:8], time.Since(t).String()) log.Debugf("Getting %s from S3 took %s", hash[:8], time.Since(t).String())
}(time.Now()) }(time.Now())
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return nil, errors.Err(err)
}
req.Header.Add("User-Agent", "reflector.go/"+meta.Version)
res, err := http.DefaultClient.Do(req)
if err != nil {
return nil, errors.Err(err)
}
defer res.Body.Close()
switch res.StatusCode { status, body, err := c.cfRequest(http.MethodGet, hash)
if err != nil {
return nil, err
}
defer body.Close()
switch status {
case http.StatusNotFound, http.StatusForbidden: case http.StatusNotFound, http.StatusForbidden:
return nil, errors.Err(ErrBlobNotFound) return nil, errors.Err(ErrBlobNotFound)
case http.StatusOK: case http.StatusOK:
b, err := ioutil.ReadAll(res.Body) b, err := ioutil.ReadAll(body)
if err != nil { if err != nil {
return nil, errors.Err(err) return nil, errors.Err(err)
} }
metrics.MtrInBytesS3.Add(float64(len(b))) metrics.MtrInBytesS3.Add(float64(len(b)))
return b, nil return b, nil
default: default:
return nil, errors.Err(res.Status) return nil, errors.Err("unexpected status %d", status)
} }
} }
// Put stores the blob on S3 or errors if S3 store is not present. func (c *CloudFrontStore) cfRequest(method, hash string) (int, io.ReadCloser, error) {
func (s *CloudFrontBlobStore) Put(hash string, blob stream.Blob) error { url := c.endpoint + hash
if s.s3Store != nil { req, err := http.NewRequest(method, url, nil)
return s.s3Store.Put(hash, blob) if err != nil {
return 0, nil, errors.Err(err)
} }
return errors.Err("not implemented in cloudfront store") req.Header.Add("User-Agent", "reflector.go/"+meta.Version)
res, err := http.DefaultClient.Do(req)
if err != nil {
return 0, nil, errors.Err(err)
}
return res.StatusCode, res.Body, nil
} }
// PutSD stores the sd blob on S3 or errors if S3 store is not present. // Put stores the blob on S3
func (s *CloudFrontBlobStore) PutSD(hash string, blob stream.Blob) error { func (c *CloudFrontStore) Put(hash string, blob stream.Blob) error {
if s.s3Store != nil { return c.s3.Put(hash, blob)
return s.s3Store.PutSD(hash, blob)
}
return errors.Err("not implemented in cloudfront store")
} }
func (s *CloudFrontBlobStore) Delete(hash string) error { // PutSD stores the sd blob on S3
if s.s3Store != nil { func (c *CloudFrontStore) PutSD(hash string, blob stream.Blob) error {
return s.s3Store.Delete(hash) return c.s3.PutSD(hash, blob)
} }
return errors.Err("not implemented in cloudfront store")
// Delete deletes the blob from S3
func (c *CloudFrontStore) Delete(hash string) error {
return c.s3.Delete(hash)
} }

View file

@ -12,8 +12,8 @@ import (
"github.com/spf13/afero" "github.com/spf13/afero"
) )
// DiskBlobStore stores blobs on a local disk // DiskStore stores blobs on a local disk
type DiskBlobStore struct { type DiskStore struct {
// the location of blobs on disk // the location of blobs on disk
blobDir string blobDir string
// store files in subdirectories based on the first N chars in the filename. 0 = don't create subdirectories. // store files in subdirectories based on the first N chars in the filename. 0 = don't create subdirectories.
@ -26,31 +26,31 @@ type DiskBlobStore struct {
initialized bool initialized bool
} }
// NewDiskBlobStore returns an initialized file disk store pointer. // NewDiskStore returns an initialized file disk store pointer.
func NewDiskBlobStore(dir string, prefixLength int) *DiskBlobStore { func NewDiskStore(dir string, prefixLength int) *DiskStore {
return &DiskBlobStore{ return &DiskStore{
blobDir: dir, blobDir: dir,
prefixLength: prefixLength, prefixLength: prefixLength,
fs: afero.NewOsFs(), fs: afero.NewOsFs(),
} }
} }
func (d *DiskBlobStore) dir(hash string) string { func (d *DiskStore) dir(hash string) string {
if d.prefixLength <= 0 || len(hash) < d.prefixLength { if d.prefixLength <= 0 || len(hash) < d.prefixLength {
return d.blobDir return d.blobDir
} }
return path.Join(d.blobDir, hash[:d.prefixLength]) return path.Join(d.blobDir, hash[:d.prefixLength])
} }
func (d *DiskBlobStore) path(hash string) string { func (d *DiskStore) path(hash string) string {
return path.Join(d.dir(hash), hash) return path.Join(d.dir(hash), hash)
} }
func (d *DiskBlobStore) ensureDirExists(dir string) error { func (d *DiskStore) ensureDirExists(dir string) error {
return errors.Err(d.fs.MkdirAll(dir, 0755)) return errors.Err(d.fs.MkdirAll(dir, 0755))
} }
func (d *DiskBlobStore) initOnce() error { func (d *DiskStore) initOnce() error {
if d.initialized { if d.initialized {
return nil return nil
} }
@ -65,7 +65,7 @@ func (d *DiskBlobStore) initOnce() error {
} }
// Has returns T/F or Error if it the blob stored already. It will error with any IO disk error. // Has returns T/F or Error if it the blob stored already. It will error with any IO disk error.
func (d *DiskBlobStore) Has(hash string) (bool, error) { func (d *DiskStore) Has(hash string) (bool, error) {
err := d.initOnce() err := d.initOnce()
if err != nil { if err != nil {
return false, err return false, err
@ -82,7 +82,7 @@ func (d *DiskBlobStore) Has(hash string) (bool, error) {
} }
// Get returns the blob or an error if the blob doesn't exist. // Get returns the blob or an error if the blob doesn't exist.
func (d *DiskBlobStore) Get(hash string) (stream.Blob, error) { func (d *DiskStore) Get(hash string) (stream.Blob, error) {
err := d.initOnce() err := d.initOnce()
if err != nil { if err != nil {
return nil, err return nil, err
@ -102,7 +102,7 @@ func (d *DiskBlobStore) Get(hash string) (stream.Blob, error) {
} }
// Put stores the blob on disk // Put stores the blob on disk
func (d *DiskBlobStore) Put(hash string, blob stream.Blob) error { func (d *DiskStore) Put(hash string, blob stream.Blob) error {
err := d.initOnce() err := d.initOnce()
if err != nil { if err != nil {
return err return err
@ -118,12 +118,12 @@ func (d *DiskBlobStore) Put(hash string, blob stream.Blob) error {
} }
// PutSD stores the sd blob on the disk // PutSD stores the sd blob on the disk
func (d *DiskBlobStore) PutSD(hash string, blob stream.Blob) error { func (d *DiskStore) PutSD(hash string, blob stream.Blob) error {
return d.Put(hash, blob) return d.Put(hash, blob)
} }
// Delete deletes the blob from the store // Delete deletes the blob from the store
func (d *DiskBlobStore) Delete(hash string) error { func (d *DiskStore) Delete(hash string) error {
err := d.initOnce() err := d.initOnce()
if err != nil { if err != nil {
return err return err
@ -142,7 +142,7 @@ func (d *DiskBlobStore) Delete(hash string) error {
} }
// list returns a slice of blobs that already exist in the blobDir // list returns a slice of blobs that already exist in the blobDir
func (d *DiskBlobStore) list() ([]string, error) { func (d *DiskStore) list() ([]string, error) {
dirs, err := afero.ReadDir(d.fs, d.blobDir) dirs, err := afero.ReadDir(d.fs, d.blobDir)
if err != nil { if err != nil {
return nil, err return nil, err

View file

@ -14,13 +14,13 @@ import (
const cacheMaxBlobs = 3 const cacheMaxBlobs = 3
func testLRUStore() (*LRUStore, *DiskBlobStore) { func testLRUStore() (*LRUStore, *DiskStore) {
d := NewDiskBlobStore("/", 2) d := NewDiskStore("/", 2)
d.fs = afero.NewMemMapFs() d.fs = afero.NewMemMapFs()
return NewLRUStore(d, 3), d return NewLRUStore(d, 3), d
} }
func countOnDisk(t *testing.T, disk *DiskBlobStore) int { func countOnDisk(t *testing.T, disk *DiskStore) int {
t.Helper() t.Helper()
count := 0 count := 0

View file

@ -5,25 +5,25 @@ import (
"github.com/lbryio/lbry.go/v2/stream" "github.com/lbryio/lbry.go/v2/stream"
) )
// MemoryBlobStore is an in memory only blob store with no persistence. // MemoryStore is an in memory only blob store with no persistence.
type MemoryBlobStore struct { type MemoryStore struct {
blobs map[string]stream.Blob blobs map[string]stream.Blob
} }
func NewMemoryBlobStore() *MemoryBlobStore { func NewMemoryStore() *MemoryStore {
return &MemoryBlobStore{ return &MemoryStore{
blobs: make(map[string]stream.Blob), blobs: make(map[string]stream.Blob),
} }
} }
// Has returns T/F if the blob is currently stored. It will never error. // Has returns T/F if the blob is currently stored. It will never error.
func (m *MemoryBlobStore) Has(hash string) (bool, error) { func (m *MemoryStore) Has(hash string) (bool, error) {
_, ok := m.blobs[hash] _, ok := m.blobs[hash]
return ok, nil return ok, nil
} }
// Get returns the blob byte slice if present and errors if the blob is not found. // Get returns the blob byte slice if present and errors if the blob is not found.
func (m *MemoryBlobStore) Get(hash string) (stream.Blob, error) { func (m *MemoryStore) Get(hash string) (stream.Blob, error) {
blob, ok := m.blobs[hash] blob, ok := m.blobs[hash]
if !ok { if !ok {
return nil, errors.Err(ErrBlobNotFound) return nil, errors.Err(ErrBlobNotFound)
@ -32,23 +32,23 @@ func (m *MemoryBlobStore) Get(hash string) (stream.Blob, error) {
} }
// Put stores the blob in memory // Put stores the blob in memory
func (m *MemoryBlobStore) Put(hash string, blob stream.Blob) error { func (m *MemoryStore) Put(hash string, blob stream.Blob) error {
m.blobs[hash] = blob m.blobs[hash] = blob
return nil return nil
} }
// PutSD stores the sd blob in memory // PutSD stores the sd blob in memory
func (m *MemoryBlobStore) PutSD(hash string, blob stream.Blob) error { func (m *MemoryStore) PutSD(hash string, blob stream.Blob) error {
return m.Put(hash, blob) return m.Put(hash, blob)
} }
// Delete deletes the blob from the store // Delete deletes the blob from the store
func (m *MemoryBlobStore) Delete(hash string) error { func (m *MemoryStore) Delete(hash string) error {
delete(m.blobs, hash) delete(m.blobs, hash)
return nil return nil
} }
// Debug returns the blobs in memory. It's useful for testing and debugging. // Debug returns the blobs in memory. It's useful for testing and debugging.
func (m *MemoryBlobStore) Debug() map[string]stream.Blob { func (m *MemoryStore) Debug() map[string]stream.Blob {
return m.blobs return m.blobs
} }

View file

@ -8,7 +8,7 @@ import (
) )
func TestMemoryBlobStore_Put(t *testing.T) { func TestMemoryBlobStore_Put(t *testing.T) {
s := NewMemoryBlobStore() s := NewMemoryStore()
blob := []byte("abcdefg") blob := []byte("abcdefg")
err := s.Put("abc", blob) err := s.Put("abc", blob)
if err != nil { if err != nil {
@ -17,7 +17,7 @@ func TestMemoryBlobStore_Put(t *testing.T) {
} }
func TestMemoryBlobStore_Get(t *testing.T) { func TestMemoryBlobStore_Get(t *testing.T) {
s := NewMemoryBlobStore() s := NewMemoryStore()
hash := "abc" hash := "abc"
blob := []byte("abcdefg") blob := []byte("abcdefg")
err := s.Put(hash, blob) err := s.Put(hash, blob)

View file

@ -18,8 +18,8 @@ import (
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
// S3BlobStore is an S3 store // S3Store is an S3 store
type S3BlobStore struct { type S3Store struct {
awsID string awsID string
awsSecret string awsSecret string
region string region string
@ -28,9 +28,9 @@ type S3BlobStore struct {
session *session.Session session *session.Session
} }
// NewS3BlobStore returns an initialized S3 store pointer. // NewS3Store returns an initialized S3 store pointer.
func NewS3BlobStore(awsID, awsSecret, region, bucket string) *S3BlobStore { func NewS3Store(awsID, awsSecret, region, bucket string) *S3Store {
return &S3BlobStore{ return &S3Store{
awsID: awsID, awsID: awsID,
awsSecret: awsSecret, awsSecret: awsSecret,
region: region, region: region,
@ -38,7 +38,7 @@ func NewS3BlobStore(awsID, awsSecret, region, bucket string) *S3BlobStore {
} }
} }
func (s *S3BlobStore) initOnce() error { func (s *S3Store) initOnce() error {
if s.session != nil { if s.session != nil {
return nil return nil
} }
@ -56,7 +56,7 @@ func (s *S3BlobStore) initOnce() error {
} }
// Has returns T/F or Error ( from S3 ) if the store contains the blob. // Has returns T/F or Error ( from S3 ) if the store contains the blob.
func (s *S3BlobStore) Has(hash string) (bool, error) { func (s *S3Store) Has(hash string) (bool, error) {
err := s.initOnce() err := s.initOnce()
if err != nil { if err != nil {
return false, err return false, err
@ -77,7 +77,7 @@ func (s *S3BlobStore) Has(hash string) (bool, error) {
} }
// Get returns the blob slice if present or errors on S3. // Get returns the blob slice if present or errors on S3.
func (s *S3BlobStore) Get(hash string) (stream.Blob, error) { func (s *S3Store) Get(hash string) (stream.Blob, error) {
//Todo-Need to handle error for blob doesn't exist for consistency. //Todo-Need to handle error for blob doesn't exist for consistency.
err := s.initOnce() err := s.initOnce()
if err != nil { if err != nil {
@ -110,7 +110,7 @@ func (s *S3BlobStore) Get(hash string) (stream.Blob, error) {
} }
// Put stores the blob on S3 or errors if S3 connection errors. // Put stores the blob on S3 or errors if S3 connection errors.
func (s *S3BlobStore) Put(hash string, blob stream.Blob) error { func (s *S3Store) Put(hash string, blob stream.Blob) error {
err := s.initOnce() err := s.initOnce()
if err != nil { if err != nil {
return err return err
@ -133,12 +133,12 @@ func (s *S3BlobStore) Put(hash string, blob stream.Blob) error {
} }
// PutSD stores the sd blob on S3 or errors if S3 connection errors. // PutSD stores the sd blob on S3 or errors if S3 connection errors.
func (s *S3BlobStore) PutSD(hash string, blob stream.Blob) error { func (s *S3Store) PutSD(hash string, blob stream.Blob) error {
//Todo - handle missing stream for consistency //Todo - handle missing stream for consistency
return s.Put(hash, blob) return s.Put(hash, blob)
} }
func (s *S3BlobStore) Delete(hash string) error { func (s *S3Store) Delete(hash string) error {
err := s.initOnce() err := s.initOnce()
if err != nil { if err != nil {
return err return err