Merge branch 'smarter_caches'
* smarter_caches:
  use speedwalk for faster file listing
  remove afero fs abstraction in prep for using speedwalk
  add faster file.Walk fn. meant for DiskStore.list()
  clarify Get() error requirement
  separate singleflight cache wrapper, component names for cache metrics
  apparently the normal check doesn't work
  split cloudfront into RO and RW stores
  add noop store that does nothing
  add test for DiskStore.list()
  add lru cache eviction metric
  rename the stores, add caching to reflector cmd
  separate disk and lru behavior
  LRU cache for disk store, abstract fs in disk store for testing
commit 7bddcf01b8
30 changed files with 957 additions and 444 deletions
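For orientation: this merge renames the stores (MemoryBlobStore → MemStore, DiskBlobStore → DiskStore, S3BlobStore → S3Store, CachingBlobStore → CachingStore) and splits CloudFrontBlobStore into read-only and read-write halves. A minimal sketch (not taken from the diff itself; credentials and sizes are placeholders) of the composition the new API encourages:

```go
// Sketch only: origin store wrapped by an LRU-bounded disk cache.
origin := store.NewS3Store(awsID, awsSecret, region, bucket) // hypothetical credentials
cache := store.NewLRUStore("example", store.NewDiskStore("/tmp/blobs", 2), 1000)
s := store.NewCachingStore("example", origin, cache) // component name feeds the cache metrics
```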
cmd/getstream.go
@@ -28,9 +28,10 @@ func getStreamCmd(cmd *cobra.Command, args []string) {
 	addr := args[0]
 	sdHash := args[1]
 
-	s := store.NewCachingBlobStore(
+	s := store.NewCachingStore(
+		"getstream",
 		peer.NewStore(peer.StoreOpts{Address: addr}),
-		store.NewDiskBlobStore("/tmp/lbry_downloaded_blobs", 2),
+		store.NewDiskStore("/tmp/lbry_downloaded_blobs", 2),
 	)
 
 	wd, err := os.Getwd()
cmd/peer.go
@@ -29,7 +29,7 @@ func init() {
 func peerCmd(cmd *cobra.Command, args []string) {
 	var err error
 
-	s3 := store.NewS3BlobStore(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName)
+	s3 := store.NewS3Store(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName)
 	peerServer := peer.NewServer(s3)
 
 	if !peerNoDB {
cmd/reflector.go (202 lines changed)
@@ -4,6 +4,7 @@ import (
 	"os"
 	"os/signal"
 	"strconv"
+	"strings"
 	"syscall"
 	"time"
 
@@ -16,21 +17,25 @@ import (
 	"github.com/lbryio/reflector.go/store"
 
 	log "github.com/sirupsen/logrus"
+	"github.com/spf13/cast"
 	"github.com/spf13/cobra"
 )
 
-var reflectorCmdCacheDir string
-var tcpPeerPort int
-var http3PeerPort int
-var receiverPort int
-var metricsPort int
-var disableUploads bool
-var disableBlocklist bool
-var proxyAddress string
-var proxyPort string
-var proxyProtocol string
-var useDB bool
-var cloudFrontEndpoint string
+var (
+	tcpPeerPort           int
+	http3PeerPort         int
+	receiverPort          int
+	metricsPort           int
+	disableUploads        bool
+	disableBlocklist      bool
+	proxyAddress          string
+	proxyPort             string
+	proxyProtocol         string
+	useDB                 bool
+	cloudFrontEndpoint    string
+	reflectorCmdDiskCache string
+	reflectorCmdMemCache  int
+)
 
 func init() {
 	var cmd = &cobra.Command{
@@ -38,7 +43,6 @@ func init() {
 		Short: "Run reflector server",
 		Run:   reflectorCmd,
 	}
-	cmd.Flags().StringVar(&reflectorCmdCacheDir, "cache", "", "if specified, the path where blobs should be cached (disabled when left empty)")
 	cmd.Flags().StringVar(&proxyAddress, "proxy-address", "", "address of another reflector server where blobs are fetched from")
 	cmd.Flags().StringVar(&proxyPort, "proxy-port", "5567", "port of another reflector server where blobs are fetched from")
 	cmd.Flags().StringVar(&proxyProtocol, "proxy-protocol", "http3", "protocol used to fetch blobs from another reflector server (tcp/http3)")
@@ -50,94 +54,142 @@ func init() {
 	cmd.Flags().BoolVar(&disableUploads, "disable-uploads", false, "Disable uploads to this reflector server")
 	cmd.Flags().BoolVar(&disableBlocklist, "disable-blocklist", false, "Disable blocklist watching/updating")
 	cmd.Flags().BoolVar(&useDB, "use-db", true, "whether to connect to the reflector db or not")
+	cmd.Flags().StringVar(&reflectorCmdDiskCache, "disk-cache", "",
+		"enable disk cache, setting max size and path where to store blobs. format is 'MAX_BLOBS:CACHE_PATH'")
+	cmd.Flags().IntVar(&reflectorCmdMemCache, "mem-cache", 0, "enable in-memory cache with a max size of this many blobs")
 	rootCmd.AddCommand(cmd)
 }
 
 func reflectorCmd(cmd *cobra.Command, args []string) {
 	log.Printf("reflector %s", meta.VersionString())
 
-	var blobStore store.BlobStore
-	if proxyAddress != "" {
-		switch proxyProtocol {
-		case "tcp":
-			blobStore = peer.NewStore(peer.StoreOpts{
-				Address: proxyAddress + ":" + proxyPort,
-				Timeout: 30 * time.Second,
-			})
-		case "http3":
-			blobStore = http3.NewStore(http3.StoreOpts{
-				Address: proxyAddress + ":" + proxyPort,
-				Timeout: 30 * time.Second,
-			})
-		default:
-			log.Fatalf("specified protocol is not recognized: %s", proxyProtocol)
-		}
-	} else {
-		s3Store := store.NewS3BlobStore(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName)
-		if cloudFrontEndpoint != "" {
-			blobStore = store.NewCloudFrontBlobStore(cloudFrontEndpoint, s3Store)
-		} else {
-			blobStore = s3Store
-		}
-	}
+	// the blocklist logic requires the db backed store to be the outer-most store
+	underlyingStore := setupStore()
+	outerStore := wrapWithCache(underlyingStore)
 
-	var err error
-	var reflectorServer *reflector.Server
-
-	if useDB {
-		db := new(db.SQL)
-		db.TrackAccessTime = true
-		err = db.Connect(globalConfig.DBConn)
-		if err != nil {
-			log.Fatal(err)
-		}
-
-		blobStore = store.NewDBBackedStore(blobStore, db)
-
-		//this shouldn't go here but the blocklist logic requires the db backed store to be the outer-most store for it to work....
-		//having this here prevents uploaded blobs from being stored in the disk cache
-		if !disableUploads {
-			reflectorServer = reflector.NewServer(blobStore)
-			reflectorServer.Timeout = 3 * time.Minute
-			reflectorServer.EnableBlocklist = !disableBlocklist
-
-			err = reflectorServer.Start(":" + strconv.Itoa(receiverPort))
-			if err != nil {
-				log.Fatal(err)
-			}
-		}
-	}
+	if !disableUploads {
+		reflectorServer := reflector.NewServer(underlyingStore)
+		reflectorServer.Timeout = 3 * time.Minute
+		reflectorServer.EnableBlocklist = !disableBlocklist
 
-	if reflectorCmdCacheDir != "" {
-		err = os.MkdirAll(reflectorCmdCacheDir, os.ModePerm)
+		err := reflectorServer.Start(":" + strconv.Itoa(receiverPort))
 		if err != nil {
 			log.Fatal(err)
 		}
-		blobStore = store.NewCachingBlobStore(blobStore, store.NewDiskBlobStore(reflectorCmdCacheDir, 2))
+		defer reflectorServer.Shutdown()
 	}
 
-	peerServer := peer.NewServer(blobStore)
-	err = peerServer.Start(":" + strconv.Itoa(tcpPeerPort))
+	peerServer := peer.NewServer(outerStore)
+	err := peerServer.Start(":" + strconv.Itoa(tcpPeerPort))
 	if err != nil {
 		log.Fatal(err)
 	}
+	defer peerServer.Shutdown()
 
-	http3PeerServer := http3.NewServer(blobStore)
+	http3PeerServer := http3.NewServer(outerStore)
 	err = http3PeerServer.Start(":" + strconv.Itoa(http3PeerPort))
 	if err != nil {
 		log.Fatal(err)
 	}
+	defer http3PeerServer.Shutdown()
 
 	metricsServer := metrics.NewServer(":"+strconv.Itoa(metricsPort), "/metrics")
 	metricsServer.Start()
+	defer metricsServer.Shutdown()
 
 	interruptChan := make(chan os.Signal, 1)
 	signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM)
 	<-interruptChan
-	metricsServer.Shutdown()
-	peerServer.Shutdown()
-	http3PeerServer.Shutdown()
-	if reflectorServer != nil {
-		reflectorServer.Shutdown()
-	}
+	// deferred shutdowns happen now
 }
+
+func setupStore() store.BlobStore {
+	var s store.BlobStore
+
+	if proxyAddress != "" {
+		switch proxyProtocol {
+		case "tcp":
+			s = peer.NewStore(peer.StoreOpts{
+				Address: proxyAddress + ":" + proxyPort,
+				Timeout: 30 * time.Second,
+			})
+		case "http3":
+			s = http3.NewStore(http3.StoreOpts{
+				Address: proxyAddress + ":" + proxyPort,
+				Timeout: 30 * time.Second,
+			})
+		default:
+			log.Fatalf("protocol is not recognized: %s", proxyProtocol)
+		}
+	} else {
+		s3Store := store.NewS3Store(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName)
+		if cloudFrontEndpoint != "" {
+			s = store.NewCloudFrontRWStore(store.NewCloudFrontROStore(cloudFrontEndpoint), s3Store)
+		} else {
+			s = s3Store
+		}
+	}
+
+	if useDB {
+		db := new(db.SQL)
+		db.TrackAccessTime = true
+		err := db.Connect(globalConfig.DBConn)
+		if err != nil {
+			log.Fatal(err)
+		}
+
+		s = store.NewDBBackedStore(s, db)
+	}
+
+	return s
+}
+
+func wrapWithCache(s store.BlobStore) store.BlobStore {
+	wrapped := s
+
+	diskCacheMaxSize, diskCachePath := diskCacheParams()
+	if diskCacheMaxSize > 0 {
+		err := os.MkdirAll(diskCachePath, os.ModePerm)
+		if err != nil {
+			log.Fatal(err)
+		}
+		wrapped = store.NewCachingStore(
+			"reflector",
+			wrapped,
+			store.NewLRUStore("peer_server", store.NewDiskStore(diskCachePath, 2), diskCacheMaxSize),
+		)
+	}
+
+	if reflectorCmdMemCache > 0 {
+		wrapped = store.NewCachingStore(
+			"reflector",
+			wrapped,
+			store.NewLRUStore("peer_server", store.NewMemStore(), reflectorCmdMemCache),
+		)
+	}
+
+	return wrapped
+}
+
+func diskCacheParams() (int, string) {
+	if reflectorCmdDiskCache == "" {
+		return 0, ""
+	}
+
+	parts := strings.Split(reflectorCmdDiskCache, ":")
+	if len(parts) != 2 {
+		log.Fatalf("--disk-cache must be a number, followed by ':', followed by a string")
+	}
+
+	maxSize := cast.ToInt(parts[0])
+	if maxSize <= 0 {
+		log.Fatalf("--disk-cache max size must be more than 0")
+	}
+
+	path := parts[1]
+	if len(path) == 0 || path[0] != '/' {
+		log.Fatalf("--disk-cache path must start with '/'")
+	}
+
+	return maxSize, path
+}
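Taken together, wrapWithCache nests one CachingStore per enabled cache, memory cache outermost. With both flags set, the result is equivalent to the following sketch (size and path values are illustrative, not from this commit):

```go
// e.g. --disk-cache 500000:/var/cache/blobs --mem-cache 10000 yields roughly:
s := store.NewCachingStore(
	"reflector",
	store.NewCachingStore(
		"reflector",
		underlyingStore, // from setupStore()
		store.NewLRUStore("peer_server", store.NewDiskStore("/var/cache/blobs", 2), 500000),
	),
	store.NewLRUStore("peer_server", store.NewMemStore(), 10000),
)
```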
cmd/start.go
@@ -55,7 +55,7 @@ func startCmd(cmd *cobra.Command, args []string) {
 	db := new(db.SQL)
 	err := db.Connect(globalConfig.DBConn)
 	checkErr(err)
-	s3 := store.NewS3BlobStore(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName)
+	s3 := store.NewS3Store(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName)
 	comboStore := store.NewDBBackedStore(s3, db)
 
 	conf := prism.DefaultConf()
cmd/test.go
@@ -29,7 +29,7 @@ func init() {
 func testCmd(cmd *cobra.Command, args []string) {
 	log.Printf("reflector %s", meta.VersionString())
 
-	memStore := store.NewMemoryBlobStore()
+	memStore := store.NewMemStore()
 
 	reflectorServer := reflector.NewServer(memStore)
 	reflectorServer.Timeout = 3 * time.Minute
cmd/upload.go
@@ -35,7 +35,7 @@ func uploadCmd(cmd *cobra.Command, args []string) {
 	checkErr(err)
 
 	st := store.NewDBBackedStore(
-		store.NewS3BlobStore(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName),
+		store.NewS3Store(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName),
 		db)
 
 	uploader := reflector.NewUploader(db, st, uploadWorkers, uploadSkipExistsCheck, uploadDeleteBlobsAfterUpload)
go.mod (6 lines changed)
@@ -14,11 +14,12 @@ require (
 	github.com/google/gops v0.3.7
 	github.com/gorilla/mux v1.7.4
 	github.com/hashicorp/go-msgpack v0.5.5 // indirect
-	github.com/hashicorp/golang-lru v0.5.3 // indirect
+	github.com/hashicorp/golang-lru v0.5.4
 	github.com/hashicorp/memberlist v0.1.4 // indirect
 	github.com/hashicorp/serf v0.8.2
 	github.com/inconshreveable/go-update v0.0.0-20160112193335-8152e7eb6ccf
 	github.com/johntdyer/slackrus v0.0.0-20180518184837-f7aae3243a07
+	github.com/karrick/godirwalk v1.16.1
 	github.com/lbryio/chainquery v1.9.0
 	github.com/lbryio/lbry.go v1.1.2 // indirect
 	github.com/lbryio/lbry.go/v2 v2.6.1-0.20200901175808-73382bb02128
@@ -27,9 +28,11 @@ require (
 	github.com/phayes/freeport v0.0.0-20171002185219-e27662a4a9d6
 	github.com/prometheus/client_golang v0.9.2
 	github.com/sirupsen/logrus v1.4.2
+	github.com/spf13/afero v1.4.1
 	github.com/spf13/cast v1.3.0
 	github.com/spf13/cobra v0.0.3
 	github.com/spf13/pflag v1.0.3 // indirect
 	github.com/stretchr/testify v1.4.0
+	github.com/volatiletech/null v8.0.0+incompatible
 	go.uber.org/atomic v1.5.1
 	golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9
@@ -37,6 +40,7 @@ require (
 	golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e
 	golang.org/x/tools v0.0.0-20191227053925-7b8e75db28f4 // indirect
 	google.golang.org/appengine v1.6.2 // indirect
+	gotest.tools v2.2.0+incompatible
 )
 
 go 1.15
go.sum (11 lines changed)
@@ -152,6 +152,8 @@ github.com/hashicorp/go.net v0.0.1/go.mod h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA
 github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
 github.com/hashicorp/golang-lru v0.5.3 h1:YPkqC67at8FYaadspW/6uE0COsBxS2656RLEr8Bppgk=
 github.com/hashicorp/golang-lru v0.5.3/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
+github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc=
+github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
 github.com/hashicorp/hcl v0.0.0-20180404174102-ef8a98b0bbce h1:xdsDDbiBDQTKASoGEZ+pEmF1OnWuu8AQ9I8iNbHNeno=
 github.com/hashicorp/hcl v0.0.0-20180404174102-ef8a98b0bbce/go.mod h1:oZtUIOe8dh44I2q6ScRibXws4Ajl+d+nod3AaR9vL5w=
 github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64=
@@ -186,12 +188,15 @@ github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7
 github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
 github.com/kardianos/osext v0.0.0-20170510131534-ae77be60afb1 h1:PJPDf8OUfOK1bb/NeTKd4f1QXZItOX389VN3B6qC8ro=
 github.com/kardianos/osext v0.0.0-20170510131534-ae77be60afb1/go.mod h1:1NbS8ALrpOvjt0rHPNLyCIeMtbizbir8U//inJ+zuB8=
+github.com/karrick/godirwalk v1.16.1 h1:DynhcF+bztK8gooS0+NDJFrdNZjJ3gzVzC545UNA9iw=
+github.com/karrick/godirwalk v1.16.1/go.mod h1:j4mkqPuvaLI8mp1DroR3P6ad7cyYd4c1qeJ3RV7ULlk=
 github.com/keybase/go-ps v0.0.0-20161005175911-668c8856d999/go.mod h1:hY+WOq6m2FpbvyrI93sMaypsttvaIL5nhVR92dTMUcQ=
 github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
 github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6CZQHDETBtE9HaSEkGmuNXF86RwHhHUvq4=
 github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
+github.com/konsorten/go-windows-terminal-sequences v1.0.2 h1:DB17ag19krx9CFsz4o3enTrPXyIXCl+2iCXH/aMAp9s=
 github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
 github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg=
 github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
 github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
 github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
@@ -280,6 +285,7 @@ github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
 github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
 github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
 github.com/pkg/profile v1.3.0/go.mod h1:hJw3o1OdXxsrSjjVksARp5W95eeEaEfptyVZyv6JUPA=
+github.com/pkg/sftp v1.10.1/go.mod h1:lYOWFsE0bwd1+KfKJaKeuokY15vzFx25BLbzYYoAxZI=
 github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI=
@@ -346,6 +352,8 @@ github.com/sourcegraph/annotate v0.0.0-20160123013949-f4cad6c6324d/go.mod h1:Udh
 github.com/sourcegraph/syntaxhighlight v0.0.0-20170531221838-bd320f5d308e/go.mod h1:HuIsMU8RRBOtsCgI77wP899iHVBQpCmg4ErYMZB+2IA=
 github.com/spf13/afero v1.1.1 h1:Lt3ihYMlE+lreX1GS4Qw4ZsNpYQLxIXKBTEOXm3nt6I=
 github.com/spf13/afero v1.1.1/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ=
+github.com/spf13/afero v1.4.1 h1:asw9sl74539yqavKaglDM5hFpdJVK0Y5Dr/JOgQ89nQ=
+github.com/spf13/afero v1.4.1/go.mod h1:Ai8FlHk4v/PARR026UzYexafAt9roJ7LcLMAmO6Z93I=
 github.com/spf13/cast v1.2.0/go.mod h1:r2rcYCSwa1IExKTDiTfzaxqT2FNHs8hODu4LnUfgKEg=
 github.com/spf13/cast v1.3.0 h1:oget//CVOEoFewqQxwr0Ej5yjygnqGkvggSE/gB35Q8=
 github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
@@ -394,6 +402,7 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk
 golang.org/x/crypto v0.0.0-20190313024323-a1f597ede03a/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
 golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
 golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
+golang.org/x/crypto v0.0.0-20190820162420-60c769a6c586/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
 golang.org/x/crypto v0.0.0-20191002192127-34f69633bfdc/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
 golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
 golang.org/x/crypto v0.0.0-20200221231518-2aa609cf4a9d/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
@@ -471,6 +480,8 @@ golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
 golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
 golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
 golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
+golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k=
+golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
 golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
 golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
 golang.org/x/time v0.0.0-20190921001708-c4c64cad1fd0 h1:xQwXv67TxFo9nC1GJFyab5eq/5B590r6RlnL/G8Sz7w=
internal/metrics/metrics.go
@@ -12,6 +12,7 @@ import (
 
 	ee "github.com/lbryio/lbry.go/v2/extras/errors"
 	"github.com/lbryio/lbry.go/v2/extras/stop"
+
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
 	"github.com/prometheus/client_golang/prometheus/promhttp"
@@ -57,7 +58,8 @@ func (s *Server) Shutdown() {
 }
 
 const (
-	ns = "reflector"
+	ns             = "reflector"
+	subsystemCache = "cache"
 
 	labelDirection = "direction"
 	labelErrorType = "error_type"
@@ -65,7 +67,9 @@ const (
 	DirectionUpload   = "upload"   // to reflector
 	DirectionDownload = "download" // from reflector
 
-	MtrLabelSource = "source"
+	LabelCacheType = "cache_type"
+	LabelComponent = "component"
+	LabelSource    = "source"
 
 	errConnReset     = "conn_reset"
 	errReadConnReset = "read_conn_reset"
@@ -92,6 +96,12 @@ const (
 )
 
 var (
+	ErrorCount = promauto.NewCounterVec(prometheus.CounterOpts{
+		Namespace: ns,
+		Name:      "error_total",
+		Help:      "Total number of errors",
+	}, []string{labelDirection, labelErrorType})
+
 	BlobDownloadCount = promauto.NewCounter(prometheus.CounterOpts{
 		Namespace: ns,
 		Name:      "blob_download_total",
@@ -107,27 +117,44 @@ var (
 		Name: "http3_blob_download_total",
 		Help: "Total number of blobs downloaded from reflector through QUIC protocol",
 	})
-	CacheHitCount = promauto.NewCounter(prometheus.CounterOpts{
+
+	CacheHitCount = promauto.NewCounterVec(prometheus.CounterOpts{
 		Namespace: ns,
-		Name:      "cache_hit_total",
+		Subsystem: subsystemCache,
+		Name:      "hit_total",
 		Help:      "Total number of blobs retrieved from the cache storage",
-	})
-	CacheMissCount = promauto.NewCounter(prometheus.CounterOpts{
+	}, []string{LabelCacheType, LabelComponent})
+	CacheMissCount = promauto.NewCounterVec(prometheus.CounterOpts{
 		Namespace: ns,
-		Name:      "cache_miss_total",
+		Subsystem: subsystemCache,
+		Name:      "miss_total",
 		Help:      "Total number of blobs retrieved from origin rather than cache storage",
-	})
-	CacheOriginRequestsCount = promauto.NewGauge(prometheus.GaugeOpts{
+	}, []string{LabelCacheType, LabelComponent})
+	CacheOriginRequestsCount = promauto.NewGaugeVec(prometheus.GaugeOpts{
 		Namespace: ns,
-		Name:      "cache_origin_requests_total",
+		Subsystem: subsystemCache,
+		Name:      "origin_requests_total",
 		Help:      "How many Get requests are in flight from the cache to the origin",
-	})
+	}, []string{LabelCacheType, LabelComponent})
 	// during thundering-herd situations, the metric below should be a lot smaller than the metric above
-	CacheWaitingRequestsCount = promauto.NewGauge(prometheus.GaugeOpts{
+	CacheWaitingRequestsCount = promauto.NewGaugeVec(prometheus.GaugeOpts{
 		Namespace: ns,
-		Name:      "cache_waiting_requests_total",
+		Subsystem: subsystemCache,
+		Name:      "waiting_requests_total",
 		Help:      "How many cache requests are waiting for an in-flight origin request",
-	})
+	}, []string{LabelCacheType, LabelComponent})
+	CacheLRUEvictCount = promauto.NewCounterVec(prometheus.CounterOpts{
+		Namespace: ns,
+		Subsystem: subsystemCache,
+		Name:      "evict_total",
+		Help:      "Count of blobs evicted from cache",
+	}, []string{LabelCacheType, LabelComponent})
+	CacheRetrievalSpeed = promauto.NewGaugeVec(prometheus.GaugeOpts{
+		Namespace: ns,
+		Name:      "speed_mbps",
+		Help:      "Speed of blob retrieval from cache or from origin",
+	}, []string{LabelCacheType, LabelComponent, LabelSource})
 
 	BlobUploadCount = promauto.NewCounter(prometheus.CounterOpts{
 		Namespace: ns,
 		Name:      "blob_upload_total",
@@ -138,16 +165,7 @@ var (
 		Name: "sdblob_upload_total",
 		Help: "Total number of SD blobs (and therefore streams) uploaded to reflector",
 	})
-	RetrieverSpeed = promauto.NewGaugeVec(prometheus.GaugeOpts{
-		Namespace: ns,
-		Name:      "speed_mbps",
-		Help:      "Speed of blob retrieval",
-	}, []string{MtrLabelSource})
-	ErrorCount = promauto.NewCounterVec(prometheus.CounterOpts{
-		Namespace: ns,
-		Name:      "error_total",
-		Help:      "Total number of errors",
-	}, []string{labelDirection, labelErrorType})
 
 	MtrInBytesTcp = promauto.NewCounter(prometheus.CounterOpts{
 		Namespace: ns,
 		Name:      "tcp_in_bytes",
@@ -185,6 +203,13 @@ var (
 	})
 )
 
+func CacheLabels(name, component string) prometheus.Labels {
+	return prometheus.Labels{
+		LabelCacheType: name,
+		LabelComponent: component,
+	}
+}
+
 func TrackError(direction string, e error) (shouldLog bool) { // shouldLog is a hack, but whatever
 	if e == nil {
 		return
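The cache metrics are now vectors keyed by cache type and component, so each wrapper reports under its own name. A usage sketch (the label values here are illustrative, not fixed by the code):

```go
// Record a hit for a disk cache used by the "reflector" component.
metrics.CacheHitCount.With(metrics.CacheLabels("disk", "reflector")).Inc()
```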
peer/http3/server.go
@@ -164,7 +164,7 @@ func generateTLSConfig() *tls.Config {
 
 func (s *Server) listenAndServe(server *http3.Server) {
 	err := server.ListenAndServe()
-	if err != nil && !errors.Is(err, http.ErrServerClosed) {
+	if err != nil && err.Error() != "server closed" {
 		log.Errorln(errors.FullTrace(err))
 	}
 }
peer/http3/store.go
@@ -55,6 +55,8 @@ func (p *Store) getClient() (*Client, error) {
 	return c, errors.Prefix("connection error", err)
 }
 
+func (p *Store) Name() string { return "http3" }
+
 // Has asks the peer if they have a hash
 func (p *Store) Has(hash string) (bool, error) {
 	c, err := p.getClient()
peer/server_test.go
@@ -34,7 +34,7 @@ var availabilityRequests = []pair{
 }
 
 func getServer(t *testing.T, withBlobs bool) *Server {
-	st := store.NewMemoryBlobStore()
+	st := store.NewMemStore()
 	if withBlobs {
 		for k, v := range blobs {
 			err := st.Put(k, v)
peer/store.go
@@ -30,6 +30,8 @@ func (p *Store) getClient() (*Client, error) {
 	return c, errors.Prefix("connection error", err)
 }
 
+func (p *Store) Name() string { return "peer" }
+
 // Has asks the peer if they have a hash
 func (p *Store) Has(hash string) (bool, error) {
 	c, err := p.getClient()
reflector/server_test.go
@@ -22,7 +22,7 @@ func startServerOnRandomPort(t *testing.T) (*Server, int) {
 		t.Fatal(err)
 	}
 
-	srv := NewServer(store.NewMemoryBlobStore())
+	srv := NewServer(store.NewMemStore())
 	err = srv.Start("127.0.0.1:" + strconv.Itoa(port))
 	if err != nil {
 		t.Fatal(err)
@@ -119,7 +119,7 @@ func TestServer_Timeout(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	srv := NewServer(store.NewMemoryBlobStore())
+	srv := NewServer(store.NewMemStore())
 	srv.Timeout = testTimeout
 	err = srv.Start("127.0.0.1:" + strconv.Itoa(port))
 	if err != nil {
@@ -161,7 +161,7 @@ func TestServer_Timeout(t *testing.T) {
 //}
 
 type mockPartialStore struct {
-	*store.MemoryBlobStore
+	*store.MemStore
 	missing []string
 }
 
@@ -181,7 +181,7 @@ func TestServer_PartialUpload(t *testing.T) {
 		missing[i] = bits.Rand().String()
 	}
 
-	st := store.BlobStore(&mockPartialStore{MemoryBlobStore: store.NewMemoryBlobStore(), missing: missing})
+	st := store.BlobStore(&mockPartialStore{MemStore: store.NewMemStore(), missing: missing})
 	if _, ok := st.(neededBlobChecker); !ok {
 		t.Fatal("mock does not implement the relevant interface")
 	}
store/caching.go
@@ -7,26 +7,32 @@ import (
 	"github.com/lbryio/lbry.go/v2/stream"
 
 	"github.com/lbryio/reflector.go/internal/metrics"
-
-	"golang.org/x/sync/singleflight"
 )
 
-// CachingBlobStore combines two stores, typically a local and a remote store, to improve performance.
+// CachingStore combines two stores, typically a local and a remote store, to improve performance.
 // Accessed blobs are stored in and retrieved from the cache. If they are not in the cache, they
 // are retrieved from the origin and cached. Puts are cached and also forwarded to the origin.
-type CachingBlobStore struct {
+type CachingStore struct {
 	origin, cache BlobStore
-
-	sf *singleflight.Group
+	component     string
 }
 
-// NewCachingBlobStore makes a new caching disk store and returns a pointer to it.
-func NewCachingBlobStore(origin, cache BlobStore) *CachingBlobStore {
-	return &CachingBlobStore{origin: origin, cache: cache, sf: new(singleflight.Group)}
+// NewCachingStore makes a new caching disk store and returns a pointer to it.
+func NewCachingStore(component string, origin, cache BlobStore) *CachingStore {
+	return &CachingStore{
+		component: component,
+		origin:    WithSingleFlight(component, origin),
+		cache:     cache,
+	}
 }
 
+const nameCaching = "caching"
+
+// Name is the cache type name
+func (c *CachingStore) Name() string { return nameCaching }
+
 // Has checks the cache and then the origin for a hash. It returns true if either store has it.
-func (c *CachingBlobStore) Has(hash string) (bool, error) {
+func (c *CachingStore) Has(hash string) (bool, error) {
 	has, err := c.cache.Has(hash)
 	if has || err != nil {
 		return has, err
@@ -36,49 +42,33 @@ func (c *CachingBlobStore) Has(hash string) (bool, error) {
 
 // Get tries to get the blob from the cache first, falling back to the origin. If the blob comes
 // from the origin, it is also stored in the cache.
-func (c *CachingBlobStore) Get(hash string) (stream.Blob, error) {
+func (c *CachingStore) Get(hash string) (stream.Blob, error) {
 	start := time.Now()
 	blob, err := c.cache.Get(hash)
 	if err == nil || !errors.Is(err, ErrBlobNotFound) {
-		metrics.CacheHitCount.Inc()
+		metrics.CacheHitCount.With(metrics.CacheLabels(c.cache.Name(), c.component)).Inc()
 		rate := float64(len(blob)) / 1024 / 1024 / time.Since(start).Seconds()
-		metrics.RetrieverSpeed.With(map[string]string{metrics.MtrLabelSource: "cache"}).Set(rate)
+		metrics.CacheRetrievalSpeed.With(map[string]string{
+			metrics.LabelCacheType: c.cache.Name(),
+			metrics.LabelComponent: c.component,
+			metrics.LabelSource:    "cache",
+		}).Set(rate)
 		return blob, err
 	}
 
-	metrics.CacheMissCount.Inc()
-	return c.getFromOrigin(hash)
-}
+	metrics.CacheMissCount.With(metrics.CacheLabels(c.cache.Name(), c.component)).Inc()
 
-// getFromOrigin ensures that only one Get per hash is sent to the origin at a time,
-// thereby protecting against https://en.wikipedia.org/wiki/Thundering_herd_problem
-func (c *CachingBlobStore) getFromOrigin(hash string) (stream.Blob, error) {
-	metrics.CacheWaitingRequestsCount.Inc()
-	defer metrics.CacheWaitingRequestsCount.Dec()
-	originBlob, err, _ := c.sf.Do(hash, func() (interface{}, error) {
-		metrics.CacheOriginRequestsCount.Inc()
-		defer metrics.CacheOriginRequestsCount.Dec()
-
-		start := time.Now()
-		blob, err := c.origin.Get(hash)
-		if err != nil {
-			return nil, err
-		}
-
-		rate := float64(len(blob)) / 1024 / 1024 / time.Since(start).Seconds()
-		metrics.RetrieverSpeed.With(map[string]string{metrics.MtrLabelSource: "origin"}).Set(rate)
-
-		err = c.cache.Put(hash, blob)
-		return blob, err
-	})
+	blob, err = c.origin.Get(hash)
 	if err != nil {
 		return nil, err
 	}
-	return originBlob.(stream.Blob), nil
+
+	err = c.cache.Put(hash, blob)
+	return blob, err
 }
 
 // Put stores the blob in the origin and the cache
-func (c *CachingBlobStore) Put(hash string, blob stream.Blob) error {
+func (c *CachingStore) Put(hash string, blob stream.Blob) error {
 	err := c.origin.Put(hash, blob)
 	if err != nil {
 		return err
@@ -87,7 +77,7 @@ func (c *CachingStore) Put(hash string, blob stream.Blob) error {
 }
 
 // PutSD stores the sd blob in the origin and the cache
-func (c *CachingBlobStore) PutSD(hash string, blob stream.Blob) error {
+func (c *CachingStore) PutSD(hash string, blob stream.Blob) error {
 	err := c.origin.PutSD(hash, blob)
 	if err != nil {
 		return err
@@ -96,7 +86,7 @@ func (c *CachingStore) PutSD(hash string, blob stream.Blob) error {
 }
 
 // Delete deletes the blob from the origin and the cache
-func (c *CachingBlobStore) Delete(hash string) error {
+func (c *CachingStore) Delete(hash string) error {
 	err := c.origin.Delete(hash)
 	if err != nil {
 		return err
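The thundering-herd protection moved out of CachingStore into a WithSingleFlight wrapper (per the "separate singleflight cache wrapper" commit), whose file is not part of this excerpt. A hedged sketch of what such a wrapper could look like, reusing only names that appear elsewhere in the diff; the struct and its field names here are assumptions:

```go
package store

import (
	"github.com/lbryio/lbry.go/v2/stream"
	"github.com/lbryio/reflector.go/internal/metrics"

	"golang.org/x/sync/singleflight"
)

// WithSingleFlight is a sketch, not the actual implementation: it wraps a
// store so concurrent Gets for the same hash collapse into one origin request.
func WithSingleFlight(component string, origin BlobStore) BlobStore {
	return &singleflightStore{BlobStore: origin, component: component, sf: new(singleflight.Group)}
}

type singleflightStore struct {
	BlobStore // embedded: Has/Put/PutSD/Delete/Name pass through

	component string
	sf        *singleflight.Group
}

// Get ensures only one Get per hash reaches the origin at a time
// (https://en.wikipedia.org/wiki/Thundering_herd_problem).
func (s *singleflightStore) Get(hash string) (stream.Blob, error) {
	labels := metrics.CacheLabels(s.BlobStore.Name(), s.component)
	metrics.CacheWaitingRequestsCount.With(labels).Inc()
	defer metrics.CacheWaitingRequestsCount.With(labels).Dec()

	blob, err, _ := s.sf.Do(hash, func() (interface{}, error) {
		metrics.CacheOriginRequestsCount.With(labels).Inc()
		defer metrics.CacheOriginRequestsCount.With(labels).Dec()
		return s.BlobStore.Get(hash)
	})
	if err != nil {
		return nil, err
	}
	return blob.(stream.Blob), nil
}
```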
store/caching_test.go
@@ -9,10 +9,10 @@ import (
 	"github.com/lbryio/lbry.go/v2/stream"
 )
 
-func TestCachingBlobStore_Put(t *testing.T) {
-	origin := NewMemoryBlobStore()
-	cache := NewMemoryBlobStore()
-	s := NewCachingBlobStore(origin, cache)
+func TestCachingStore_Put(t *testing.T) {
+	origin := NewMemStore()
+	cache := NewMemStore()
+	s := NewCachingStore("test", origin, cache)
 
 	b := []byte("this is a blob of stuff")
 	hash := "hash"
@@ -39,10 +39,10 @@ func TestCachingBlobStore_Put(t *testing.T) {
 	}
 }
 
-func TestCachingBlobStore_CacheMiss(t *testing.T) {
-	origin := NewMemoryBlobStore()
-	cache := NewMemoryBlobStore()
-	s := NewCachingBlobStore(origin, cache)
+func TestCachingStore_CacheMiss(t *testing.T) {
+	origin := NewMemStore()
+	cache := NewMemStore()
+	s := NewCachingStore("test", origin, cache)
 
 	b := []byte("this is a blob of stuff")
 	hash := "hash"
@@ -76,11 +76,11 @@ func TestCachingBlobStore_CacheMiss(t *testing.T) {
 	}
 }
 
-func TestCachingBlobStore_ThunderingHerd(t *testing.T) {
+func TestCachingStore_ThunderingHerd(t *testing.T) {
 	storeDelay := 100 * time.Millisecond
 	origin := NewSlowBlobStore(storeDelay)
-	cache := NewMemoryBlobStore()
-	s := NewCachingBlobStore(origin, cache)
+	cache := NewMemStore()
+	s := NewCachingStore("test", origin, cache)
 
 	b := []byte("this is a blob of stuff")
 	hash := "hash"
@@ -129,16 +129,19 @@ func TestCachingBlobStore_ThunderingHerd(t *testing.T) {
 
 // SlowBlobStore adds a delay to each request
 type SlowBlobStore struct {
-	mem   *MemoryBlobStore
+	mem   *MemStore
 	delay time.Duration
 }
 
 func NewSlowBlobStore(delay time.Duration) *SlowBlobStore {
 	return &SlowBlobStore{
-		mem:   NewMemoryBlobStore(),
+		mem:   NewMemStore(),
 		delay: delay,
 	}
 }
+
+func (s *SlowBlobStore) Name() string {
+	return "slow"
+}
 
 func (s *SlowBlobStore) Has(hash string) (bool, error) {
 	time.Sleep(s.delay)
store/cloudfront.go (deleted, 109 lines)
@@ -1,109 +0,0 @@
-package store
-
-import (
-	"io/ioutil"
-	"net/http"
-	"time"
-
-	"github.com/lbryio/lbry.go/v2/extras/errors"
-	"github.com/lbryio/lbry.go/v2/stream"
-	"github.com/lbryio/reflector.go/internal/metrics"
-	"github.com/lbryio/reflector.go/meta"
-
-	log "github.com/sirupsen/logrus"
-)
-
-// CloudFrontBlobStore is an CloudFront backed store (retrieval only)
-type CloudFrontBlobStore struct {
-	cfEndpoint string
-	s3Store    *S3BlobStore
-}
-
-// NewS3BlobStore returns an initialized S3 store pointer.
-func NewCloudFrontBlobStore(cloudFrontEndpoint string, S3Store *S3BlobStore) *CloudFrontBlobStore {
-	return &CloudFrontBlobStore{
-		cfEndpoint: cloudFrontEndpoint,
-		s3Store:    S3Store,
-	}
-}
-
-// Has returns T/F or Error if the store contains the blob.
-func (s *CloudFrontBlobStore) Has(hash string) (bool, error) {
-	url := s.cfEndpoint + hash
-
-	req, err := http.NewRequest("HEAD", url, nil)
-	if err != nil {
-		return false, errors.Err(err)
-	}
-	req.Header.Add("User-Agent", "reflector.go/"+meta.Version)
-	res, err := http.DefaultClient.Do(req)
-	if err != nil {
-		return false, errors.Err(err)
-	}
-	defer res.Body.Close()
-
-	switch res.StatusCode {
-	case http.StatusNotFound, http.StatusForbidden:
-		return false, nil
-	case http.StatusOK:
-		return true, nil
-	default:
-		return false, errors.Err(res.Status)
-	}
-}
-
-// Get returns the blob slice if present or errors.
-func (s *CloudFrontBlobStore) Get(hash string) (stream.Blob, error) {
-	url := s.cfEndpoint + hash
-	log.Debugf("Getting %s from S3", hash[:8])
-	defer func(t time.Time) {
-		log.Debugf("Getting %s from S3 took %s", hash[:8], time.Since(t).String())
-	}(time.Now())
-	req, err := http.NewRequest("GET", url, nil)
-	if err != nil {
-		return nil, errors.Err(err)
-	}
-	req.Header.Add("User-Agent", "reflector.go/"+meta.Version)
-	res, err := http.DefaultClient.Do(req)
-	if err != nil {
-		return nil, errors.Err(err)
-	}
-	defer res.Body.Close()
-
-	switch res.StatusCode {
-	case http.StatusNotFound, http.StatusForbidden:
-		return nil, errors.Err(ErrBlobNotFound)
-	case http.StatusOK:
-		b, err := ioutil.ReadAll(res.Body)
-		if err != nil {
-			return nil, errors.Err(err)
-		}
-		metrics.MtrInBytesS3.Add(float64(len(b)))
-		return b, nil
-	default:
-		return nil, errors.Err(res.Status)
-	}
-}
-
-// Put stores the blob on S3 or errors if S3 store is not present.
-func (s *CloudFrontBlobStore) Put(hash string, blob stream.Blob) error {
-	if s.s3Store != nil {
-		return s.s3Store.Put(hash, blob)
-	}
-	return errors.Err("not implemented in cloudfront store")
-}
-
-// PutSD stores the sd blob on S3 or errors if S3 store is not present.
-func (s *CloudFrontBlobStore) PutSD(hash string, blob stream.Blob) error {
-	if s.s3Store != nil {
-		return s.s3Store.PutSD(hash, blob)
-	}
-	return errors.Err("not implemented in cloudfront store")
-}
-
-func (s *CloudFrontBlobStore) Delete(hash string) error {
-	if s.s3Store != nil {
-		return s.s3Store.Delete(hash)
-	}
-	return errors.Err("not implemented in cloudfront store")
-}
store/cloudfront_ro.go (new file, 105 lines)
@@ -0,0 +1,105 @@
+package store
+
+import (
+	"io"
+	"io/ioutil"
+	"net/http"
+	"time"
+
+	"github.com/lbryio/reflector.go/internal/metrics"
+	"github.com/lbryio/reflector.go/meta"
+
+	"github.com/lbryio/lbry.go/v2/extras/errors"
+	"github.com/lbryio/lbry.go/v2/stream"
+
+	log "github.com/sirupsen/logrus"
+)
+
+// CloudFrontROStore reads from cloudfront. All writes panic.
+type CloudFrontROStore struct {
+	endpoint string // cloudflare endpoint
+}
+
+// NewCloudFrontROStore returns an initialized CloudFrontROStore store pointer.
+func NewCloudFrontROStore(endpoint string) *CloudFrontROStore {
+	return &CloudFrontROStore{endpoint: endpoint}
+}
+
+const nameCloudFrontRO = "cloudfront_ro"
+
+// Name is the cache type name
+func (c *CloudFrontROStore) Name() string { return nameCloudFrontRO }
+
+// Has checks if the hash is in the store.
+func (c *CloudFrontROStore) Has(hash string) (bool, error) {
+	status, body, err := c.cfRequest(http.MethodHead, hash)
+	if err != nil {
+		return false, err
+	}
+	defer body.Close()
+
+	switch status {
+	case http.StatusNotFound, http.StatusForbidden:
+		return false, nil
+	case http.StatusOK:
+		return true, nil
+	default:
+		return false, errors.Err("unexpected status %d", status)
+	}
+}
+
+// Get gets the blob from Cloudfront.
+func (c *CloudFrontROStore) Get(hash string) (stream.Blob, error) {
+	log.Debugf("Getting %s from S3", hash[:8])
+	defer func(t time.Time) {
+		log.Debugf("Getting %s from S3 took %s", hash[:8], time.Since(t).String())
+	}(time.Now())
+
+	status, body, err := c.cfRequest(http.MethodGet, hash)
+	if err != nil {
+		return nil, err
+	}
+	defer body.Close()
+
+	switch status {
+	case http.StatusNotFound, http.StatusForbidden:
+		return nil, errors.Err(ErrBlobNotFound)
+	case http.StatusOK:
+		b, err := ioutil.ReadAll(body)
+		if err != nil {
+			return nil, errors.Err(err)
+		}
+		metrics.MtrInBytesS3.Add(float64(len(b)))
+		return b, nil
+	default:
+		return nil, errors.Err("unexpected status %d", status)
+	}
+}
+
+func (c *CloudFrontROStore) cfRequest(method, hash string) (int, io.ReadCloser, error) {
+	url := c.endpoint + hash
+	req, err := http.NewRequest(method, url, nil)
+	if err != nil {
+		return 0, nil, errors.Err(err)
+	}
+	req.Header.Add("User-Agent", "reflector.go/"+meta.Version)
+
+	res, err := http.DefaultClient.Do(req)
+	if err != nil {
+		return 0, nil, errors.Err(err)
+	}
+
+	return res.StatusCode, res.Body, nil
+}
+
+func (c *CloudFrontROStore) Put(_ string, _ stream.Blob) error {
+	panic("CloudFrontROStore cannot do writes. Use CloudFrontRWStore")
+}
+
+func (c *CloudFrontROStore) PutSD(_ string, _ stream.Blob) error {
+	panic("CloudFrontROStore cannot do writes. Use CloudFrontRWStore")
+}
+
+func (c *CloudFrontROStore) Delete(_ string) error {
+	panic("CloudFrontROStore cannot do writes. Use CloudFrontRWStore")
+}
store/cloudfront_rw.go (new file, 50 lines)
@@ -0,0 +1,50 @@
+package store
+
+import (
+	"github.com/lbryio/lbry.go/v2/stream"
+)
+
+// CloudFrontRWStore combines a Cloudfront and an S3 store. Reads go to Cloudfront, writes go to S3.
+type CloudFrontRWStore struct {
+	cf *CloudFrontROStore
+	s3 *S3Store
+}
+
+// NewCloudFrontRWStore returns an initialized CloudFrontRWStore store pointer.
+// NOTE: It panics if either argument is nil.
+func NewCloudFrontRWStore(cf *CloudFrontROStore, s3 *S3Store) *CloudFrontRWStore {
+	if cf == nil || s3 == nil {
+		panic("both stores must be set")
+	}
+	return &CloudFrontRWStore{cf: cf, s3: s3}
+}
+
+const nameCloudFrontRW = "cloudfront_rw"
+
+// Name is the cache type name
+func (c *CloudFrontRWStore) Name() string { return nameCloudFrontRW }
+
+// Has checks if the hash is in the store.
+func (c *CloudFrontRWStore) Has(hash string) (bool, error) {
+	return c.cf.Has(hash)
+}
+
+// Get gets the blob from Cloudfront.
+func (c *CloudFrontRWStore) Get(hash string) (stream.Blob, error) {
+	return c.cf.Get(hash)
+}
+
+// Put stores the blob on S3
+func (c *CloudFrontRWStore) Put(hash string, blob stream.Blob) error {
+	return c.s3.Put(hash, blob)
+}
+
+// PutSD stores the sd blob on S3
+func (c *CloudFrontRWStore) PutSD(hash string, blob stream.Blob) error {
+	return c.s3.PutSD(hash, blob)
+}
+
+// Delete deletes the blob from S3
+func (c *CloudFrontRWStore) Delete(hash string) error {
+	return c.s3.Delete(hash)
+}
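Since CloudFrontROStore panics on any write, the RW wrapper is the only safe way to compose it with S3 when writes are possible. A hedged wiring sketch mirroring setupStore() above (endpoint and credential values are placeholders):

```go
// Reads hit the CDN; writes go straight to S3.
s3 := store.NewS3Store(awsID, awsSecret, region, bucket)
cf := store.NewCloudFrontRWStore(store.NewCloudFrontROStore("https://cdn.example.com/"), s3)
```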
store/dbbacked.go
@@ -25,6 +25,11 @@ func NewDBBackedStore(blobs BlobStore, db *db.SQL) *DBBackedStore {
 	return &DBBackedStore{blobs: blobs, db: db}
 }
 
+const nameDBBacked = "db-backed"
+
+// Name is the cache type name
+func (d *DBBackedStore) Name() string { return nameDBBacked }
+
 // Has returns true if the blob is in the store
 func (d *DBBackedStore) Has(hash string) (bool, error) {
 	return d.db.HasBlob(hash)
store/disk.go (178 lines changed)
@@ -4,82 +4,38 @@ import (
 	"io/ioutil"
 	"os"
 	"path"
-	"path/filepath"
-	"sort"
-	"syscall"
-	"time"
 
 	"github.com/lbryio/lbry.go/v2/extras/errors"
 	"github.com/lbryio/lbry.go/v2/stream"
 
-	log "github.com/sirupsen/logrus"
+	"github.com/lbryio/reflector.go/store/speedwalk"
 )
 
-// DiskBlobStore stores blobs on a local disk
-type DiskBlobStore struct {
+// DiskStore stores blobs on a local disk
+type DiskStore struct {
 	// the location of blobs on disk
 	blobDir string
 	// store files in subdirectories based on the first N chars in the filename. 0 = don't create subdirectories.
 	prefixLength int
 
-	initialized     bool
-	lastChecked     time.Time
-	diskCleanupBusy chan bool
+	// true if initOnce ran, false otherwise
+	initialized bool
 }
 
-// NewDiskBlobStore returns an initialized file disk store pointer.
-func NewDiskBlobStore(dir string, prefixLength int) *DiskBlobStore {
-	dbs := DiskBlobStore{blobDir: dir, prefixLength: prefixLength, diskCleanupBusy: make(chan bool, 1)}
-	dbs.diskCleanupBusy <- true
-	return &dbs
-}
-
-func (d *DiskBlobStore) dir(hash string) string {
-	if d.prefixLength <= 0 || len(hash) < d.prefixLength {
-		return d.blobDir
-	}
-	return path.Join(d.blobDir, hash[:d.prefixLength])
-}
-
-// GetUsedSpace returns a value between 0 and 1, with 0 being completely empty and 1 being full, for the disk that holds the provided path
-func (d *DiskBlobStore) getUsedSpace() (float32, error) {
-	var stat syscall.Statfs_t
-	err := syscall.Statfs(d.blobDir, &stat)
-	if err != nil {
-		return 0, err
-	}
-	// Available blocks * size per block = available space in bytes
-	all := stat.Blocks * uint64(stat.Bsize)
-	free := stat.Bfree * uint64(stat.Bsize)
-	used := all - free
-
-	return float32(used) / float32(all), nil
-}
-
-func (d *DiskBlobStore) path(hash string) string {
-	return path.Join(d.dir(hash), hash)
-}
-
-func (d *DiskBlobStore) ensureDirExists(dir string) error {
-	return errors.Err(os.MkdirAll(dir, 0755))
-}
-
-func (d *DiskBlobStore) initOnce() error {
-	if d.initialized {
-		return nil
-	}
-
-	err := d.ensureDirExists(d.blobDir)
-	if err != nil {
-		return err
-	}
-
-	d.initialized = true
-	return nil
-}
+// NewDiskStore returns an initialized file disk store pointer.
+func NewDiskStore(dir string, prefixLength int) *DiskStore {
+	return &DiskStore{
+		blobDir:      dir,
+		prefixLength: prefixLength,
+	}
+}
+
+const nameDisk = "disk"
+
+// Name is the cache type name
+func (d *DiskStore) Name() string { return nameDisk }
 
 // Has returns T/F or Error if it the blob stored already. It will error with any IO disk error.
-func (d *DiskBlobStore) Has(hash string) (bool, error) {
+func (d *DiskStore) Has(hash string) (bool, error) {
 	err := d.initOnce()
 	if err != nil {
 		return false, err
@@ -90,13 +46,13 @@ func (d *DiskStore) Has(hash string) (bool, error) {
 		if os.IsNotExist(err) {
 			return false, nil
 		}
-		return false, err
+		return false, errors.Err(err)
 	}
 	return true, nil
 }
 
 // Get returns the blob or an error if the blob doesn't exist.
-func (d *DiskBlobStore) Get(hash string) (stream.Blob, error) {
+func (d *DiskStore) Get(hash string) (stream.Blob, error) {
 	err := d.initOnce()
 	if err != nil {
 		return nil, err
@@ -111,11 +67,12 @@ func (d *DiskStore) Get(hash string) (stream.Blob, error) {
 	}
 	defer file.Close()
 
-	return ioutil.ReadAll(file)
+	blob, err := ioutil.ReadAll(file)
+	return blob, errors.Err(err)
 }
 
 // Put stores the blob on disk
-func (d *DiskBlobStore) Put(hash string, blob stream.Blob) error {
+func (d *DiskStore) Put(hash string, blob stream.Blob) error {
 	err := d.initOnce()
 	if err != nil {
 		return err
@@ -126,16 +83,17 @@ func (d *DiskStore) Put(hash string, blob stream.Blob) error {
 		return err
 	}
 
-	return ioutil.WriteFile(d.path(hash), blob, 0644)
+	err = ioutil.WriteFile(d.path(hash), blob, 0644)
+	return errors.Err(err)
 }
 
 // PutSD stores the sd blob on the disk
-func (d *DiskBlobStore) PutSD(hash string, blob stream.Blob) error {
+func (d *DiskStore) PutSD(hash string, blob stream.Blob) error {
 	return d.Put(hash, blob)
 }
 
 // Delete deletes the blob from the store
-func (d *DiskBlobStore) Delete(hash string) error {
+func (d *DiskStore) Delete(hash string) error {
 	err := d.initOnce()
 	if err != nil {
 		return err
@@ -149,75 +107,45 @@ func (d *DiskStore) Delete(hash string) error {
 		return nil
 	}
 
-	return os.Remove(d.path(hash))
+	err = os.Remove(d.path(hash))
+	return errors.Err(err)
 }
 
-func (d *DiskBlobStore) ensureDiskSpace() {
-	defer func() {
-		d.lastChecked = time.Now()
-		d.diskCleanupBusy <- true
-	}()
-
-	used, err := d.getUsedSpace()
+// list returns the hashes of blobs that already exist in the blobDir
+func (d *DiskStore) list() ([]string, error) {
+	err := d.initOnce()
 	if err != nil {
-		log.Errorln(err.Error())
-		return
-	}
-	log.Infof("disk usage: %.2f%%\n", used*100)
-	if used > 0.90 {
-		log.Infoln("over 0.90, cleaning up")
-		err = d.WipeOldestBlobs()
-		if err != nil {
-			log.Errorln(err.Error())
-			return
-		}
-		log.Infoln("Done cleaning up")
+		return nil, err
 	}
+
+	return speedwalk.AllFiles(d.blobDir, true)
 }
 
-func (d *DiskBlobStore) WipeOldestBlobs() (err error) {
-	dirs, err := ioutil.ReadDir(d.blobDir)
+func (d *DiskStore) dir(hash string) string {
+	if d.prefixLength <= 0 || len(hash) < d.prefixLength {
+		return d.blobDir
+	}
+	return path.Join(d.blobDir, hash[:d.prefixLength])
+}
+
+func (d *DiskStore) path(hash string) string {
+	return path.Join(d.dir(hash), hash)
+}
+
+func (d *DiskStore) ensureDirExists(dir string) error {
+	return errors.Err(os.MkdirAll(dir, 0755))
+}
+
+func (d *DiskStore) initOnce() error {
+	if d.initialized {
+		return nil
+	}
+
+	err := d.ensureDirExists(d.blobDir)
 	if err != nil {
 		return err
 	}
-	type datedFile struct {
-		Atime    time.Time
-		File     *os.FileInfo
-		FullPath string
-	}
-	datedFiles := make([]datedFile, 0, 5000)
-	for _, dir := range dirs {
-		if dir.IsDir() {
-			files, err := ioutil.ReadDir(filepath.Join(d.blobDir, dir.Name()))
-			if err != nil {
-				return err
-			}
-			for _, file := range files {
-				if file.Mode().IsRegular() && !file.IsDir() {
-					datedFiles = append(datedFiles, datedFile{
-						Atime:    atime(file),
-						File:     &file,
-						FullPath: filepath.Join(d.blobDir, dir.Name(), file.Name()),
-					})
-				}
-			}
-		}
-	}
-
-	sort.Slice(datedFiles, func(i, j int) bool {
-		return datedFiles[i].Atime.Before(datedFiles[j].Atime)
-	})
-	//delete the first 50000 blobs
-	for i, df := range datedFiles {
-		if i >= 50000 {
-			break
-		}
-		log.Infoln(df.FullPath)
-		log.Infoln(df.Atime.String())
-		err = os.Remove(df.FullPath)
-		if err != nil {
-			return err
-		}
-	}
 
+	d.initialized = true
 	return nil
 }
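DiskStore.list() depends on speedwalk.AllFiles, whose file is not included in this excerpt; go.mod adds github.com/karrick/godirwalk for it (the "add faster file.Walk fn." commit). A rough sketch of what such a helper could look like under those assumptions; the actual implementation may differ (e.g., walking subdirectories in parallel):

```go
// Package speedwalk: an assumed, simplified shape of AllFiles, not the
// committed code. It walks startDir with godirwalk (which avoids per-entry
// stat calls) and returns the names of all regular files.
package speedwalk

import (
	"github.com/karrick/godirwalk"
	"github.com/lbryio/lbry.go/v2/extras/errors"
)

// AllFiles returns the file names (not full paths) of regular files under startDir.
// The verbose flag is assumed to control progress logging; it is unused in this sketch.
func AllFiles(startDir string, verbose bool) ([]string, error) {
	var files []string
	err := godirwalk.Walk(startDir, &godirwalk.Options{
		Unsorted: true, // faster; callers can sort if they need order
		Callback: func(_ string, de *godirwalk.Dirent) error {
			if de.IsRegular() {
				files = append(files, de.Name())
			}
			return nil
		},
	})
	return files, errors.Err(err)
}
```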
store/lru.go (new file, 120 lines)
@@ -0,0 +1,120 @@
+package store
+
+import (
+	"github.com/lbryio/lbry.go/v2/extras/errors"
+	"github.com/lbryio/lbry.go/v2/stream"
+	"github.com/lbryio/reflector.go/internal/metrics"
+
+	golru "github.com/hashicorp/golang-lru"
+)
+
+// LRUStore adds a max cache size and LRU eviction to a BlobStore
+type LRUStore struct {
+	// underlying store
+	store BlobStore
+	// lru implementation
+	lru *golru.Cache
+}
+
+// NewLRUStore initialize a new LRUStore
+func NewLRUStore(component string, store BlobStore, maxItems int) *LRUStore {
+	lru, err := golru.NewWithEvict(maxItems, func(key interface{}, value interface{}) {
+		metrics.CacheLRUEvictCount.With(metrics.CacheLabels(store.Name(), component)).Inc()
+		_ = store.Delete(key.(string)) // TODO: log this error. may happen if underlying entry is gone but cache entry still there
+	})
+	if err != nil {
+		panic(err)
+	}
+
+	l := &LRUStore{
+		store: store,
+		lru:   lru,
+	}
+
+	if lstr, ok := store.(lister); ok {
+		err = l.loadExisting(lstr, maxItems)
+		if err != nil {
+			panic(err) // TODO: what should happen here? panic? return nil? just keep going?
+		}
+	}
+
+	return l
+}
+
+const nameLRU = "lru"
+
+// Name is the cache type name
+func (l *LRUStore) Name() string { return nameLRU }
+
+// Has returns whether the blob is in the store, without updating the recent-ness.
+func (l *LRUStore) Has(hash string) (bool, error) {
+	return l.lru.Contains(hash), nil
+}
+
+// Get returns the blob or an error if the blob doesn't exist.
+func (l *LRUStore) Get(hash string) (stream.Blob, error) {
+	_, has := l.lru.Get(hash)
+	if !has {
+		return nil, errors.Err(ErrBlobNotFound)
+	}
+	blob, err := l.store.Get(hash)
+	if errors.Is(err, ErrBlobNotFound) {
+		// Blob disappeared from underlying store
+		l.lru.Remove(hash)
+	}
+	return blob, err
+}
+
+// Put stores the blob
+func (l *LRUStore) Put(hash string, blob stream.Blob) error {
+	err := l.store.Put(hash, blob)
+	if err != nil {
+		return err
+	}
+
+	l.lru.Add(hash, true)
+	return nil
+}
+
+// PutSD stores the sd blob
+func (l *LRUStore) PutSD(hash string, blob stream.Blob) error {
+	err := l.store.PutSD(hash, blob)
+	if err != nil {
+		return err
+	}
+
+	l.lru.Add(hash, true)
+	return nil
+}
+
+// Delete deletes the blob from the store
+func (l *LRUStore) Delete(hash string) error {
+	err := l.store.Delete(hash)
+	if err != nil {
+		return err
+	}
+
+	// This must come after store.Delete()
+	// Remove triggers onEvict function, which also tries to delete blob from store
+	// We need to delete it manually first so any errors can be propagated up
+	l.lru.Remove(hash)
+	return nil
+}
+
+// loadExisting imports existing blobs from the underlying store into the LRU cache
+func (l *LRUStore) loadExisting(store lister, maxItems int) error {
+	existing, err := store.list()
+	if err != nil {
+		return err
+	}
+
+	added := 0
+	for _, h := range existing {
+		l.lru.Add(h, true)
+		added++
+		if maxItems > 0 && added >= maxItems { // underlying cache is bigger than LRU cache
+			break
+		}
+	}
+	return nil
+}
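LRUStore type-asserts its underlying store against an unexported lister interface that is not shown in this diff; presumably it is declared alongside BlobStore. An assumed declaration, satisfied by DiskStore.list() above:

```go
// Assumed (not part of this excerpt): a store that can enumerate
// the blob hashes it already holds.
type lister interface {
	list() ([]string, error)
}
```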
121
store/lru_test.go
Normal file
121
store/lru_test.go
Normal file
|
@ -0,0 +1,121 @@
|
|||
package store

import (
	"io/ioutil"
	"os"
	"reflect"
	"testing"

	"github.com/lbryio/lbry.go/v2/extras/errors"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

const cacheMaxBlobs = 3

func getTestLRUStore() (*LRUStore, *MemStore) {
	m := NewMemStore()
	return NewLRUStore("test", m, 3), m
}

func TestLRUStore_Eviction(t *testing.T) {
	lru, mem := getTestLRUStore()
	b := []byte("x")
	err := lru.Put("one", b)
	require.NoError(t, err)
	err = lru.Put("two", b)
	require.NoError(t, err)
	err = lru.Put("three", b)
	require.NoError(t, err)
	err = lru.Put("four", b)
	require.NoError(t, err)
	err = lru.Put("five", b)
	require.NoError(t, err)

	assert.Equal(t, cacheMaxBlobs, len(mem.Debug()))

	for k, v := range map[string]bool{
		"one":   false,
		"two":   false,
		"three": true,
		"four":  true,
		"five":  true,
		"six":   false,
	} {
		has, err := lru.Has(k)
		assert.NoError(t, err)
		assert.Equal(t, v, has)
	}

	lru.Get("three") // touch so it stays in cache
	lru.Put("six", b)

	assert.Equal(t, cacheMaxBlobs, len(mem.Debug()))

	for k, v := range map[string]bool{
		"one":   false,
		"two":   false,
		"three": true,
		"four":  false,
		"five":  true,
		"six":   true,
	} {
		has, err := lru.Has(k)
		assert.NoError(t, err)
		assert.Equal(t, v, has)
	}

	err = lru.Delete("three")
	assert.NoError(t, err)
	err = lru.Delete("five")
	assert.NoError(t, err)
	err = lru.Delete("six")
	assert.NoError(t, err)
	assert.Equal(t, 0, len(mem.Debug()))
}

func TestLRUStore_UnderlyingBlobMissing(t *testing.T) {
	lru, mem := getTestLRUStore()
	hash := "hash"
	b := []byte("this is a blob of stuff")
	err := lru.Put(hash, b)
	require.NoError(t, err)

	err = mem.Delete(hash)
	require.NoError(t, err)

	// hash still exists in lru
	assert.True(t, lru.lru.Contains(hash))

	blob, err := lru.Get(hash)
	assert.Nil(t, blob)
	assert.True(t, errors.Is(err, ErrBlobNotFound), "expected (%s) %s, got (%s) %s",
		reflect.TypeOf(ErrBlobNotFound).String(), ErrBlobNotFound.Error(),
		reflect.TypeOf(err).String(), err.Error())

	// lru.Get() removes hash if underlying store doesn't have it
	assert.False(t, lru.lru.Contains(hash))
}

func TestLRUStore_loadExisting(t *testing.T) {
	tmpDir, err := ioutil.TempDir("", "reflector_test_*")
	require.NoError(t, err)
	defer os.RemoveAll(tmpDir)
	d := NewDiskStore(tmpDir, 2)

	hash := "hash"
	b := []byte("this is a blob of stuff")
	err = d.Put(hash, b)
	require.NoError(t, err)

	existing, err := d.list()
	require.NoError(t, err)
	require.Equal(t, 1, len(existing), "blob should exist in cache")
	assert.Equal(t, hash, existing[0])

	lru := NewLRUStore("test", d, 3) // lru should load existing blobs when it's created
	has, err := lru.Has(hash)
	require.NoError(t, err)
	assert.True(t, has, "hash should be loaded from disk store but it's not")
}
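
The eviction and recency behavior exercised above lives in the lru field itself. The diff doesn't show that field's type, but the Contains/Get/Add/Remove calls match hashicorp's golang-lru, so here is a standalone sketch of the same recency rule under that assumption:

package main

import (
	"fmt"

	lru "github.com/hashicorp/golang-lru"
)

func main() {
	// NewWithEvict is where LRUStore would hang its delete-from-store hook.
	cache, err := lru.NewWithEvict(3, func(key, _ interface{}) {
		fmt.Println("evicted:", key)
	})
	if err != nil {
		panic(err)
	}
	cache.Add("one", true)
	cache.Add("two", true)
	cache.Add("three", true)
	cache.Get("one")        // refresh "one"
	cache.Add("four", true) // evicts "two", now the least recently used
}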
store/memory.go
@@ -1,29 +1,42 @@
 package store

 import (
 	"sync"

 	"github.com/lbryio/lbry.go/v2/extras/errors"
 	"github.com/lbryio/lbry.go/v2/stream"
 )

-// MemoryBlobStore is an in memory only blob store with no persistence.
-type MemoryBlobStore struct {
+// MemStore is an in memory only blob store with no persistence.
+type MemStore struct {
 	blobs map[string]stream.Blob
 	mu    *sync.RWMutex
 }

-func NewMemoryBlobStore() *MemoryBlobStore {
-	return &MemoryBlobStore{
+func NewMemStore() *MemStore {
+	return &MemStore{
 		blobs: make(map[string]stream.Blob),
 		mu:    &sync.RWMutex{},
 	}
 }

+const nameMem = "mem"
+
+// Name is the cache type name
+func (m *MemStore) Name() string { return nameMem }
+
 // Has returns T/F if the blob is currently stored. It will never error.
-func (m *MemoryBlobStore) Has(hash string) (bool, error) {
+func (m *MemStore) Has(hash string) (bool, error) {
 	m.mu.RLock()
 	defer m.mu.RUnlock()
 	_, ok := m.blobs[hash]
 	return ok, nil
 }

 // Get returns the blob byte slice if present and errors if the blob is not found.
-func (m *MemoryBlobStore) Get(hash string) (stream.Blob, error) {
+func (m *MemStore) Get(hash string) (stream.Blob, error) {
 	m.mu.RLock()
 	defer m.mu.RUnlock()
 	blob, ok := m.blobs[hash]
 	if !ok {
 		return nil, errors.Err(ErrBlobNotFound)
@@ -32,23 +45,29 @@ func (m *MemoryBlobStore) Get(hash string) (stream.Blob, error) {
 }

 // Put stores the blob in memory
-func (m *MemoryBlobStore) Put(hash string, blob stream.Blob) error {
+func (m *MemStore) Put(hash string, blob stream.Blob) error {
 	m.mu.Lock()
 	defer m.mu.Unlock()
 	m.blobs[hash] = blob
 	return nil
 }

 // PutSD stores the sd blob in memory
-func (m *MemoryBlobStore) PutSD(hash string, blob stream.Blob) error {
+func (m *MemStore) PutSD(hash string, blob stream.Blob) error {
 	return m.Put(hash, blob)
 }

 // Delete deletes the blob from the store
-func (m *MemoryBlobStore) Delete(hash string) error {
+func (m *MemStore) Delete(hash string) error {
 	m.mu.Lock()
 	defer m.mu.Unlock()
 	delete(m.blobs, hash)
 	return nil
 }

 // Debug returns the blobs in memory. It's useful for testing and debugging.
-func (m *MemoryBlobStore) Debug() map[string]stream.Blob {
+func (m *MemStore) Debug() map[string]stream.Blob {
 	m.mu.RLock()
 	defer m.mu.RUnlock()
 	return m.blobs
 }
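
One subtlety worth flagging (an observation, not a change in this diff): Debug() returns the internal map itself rather than a copy, so holding onto it while another goroutine calls Put or Delete would race despite the RWMutex. The single-threaded test usage below is fine.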
store/memory_test.go
@@ -7,8 +7,8 @@ import (
 	"github.com/lbryio/lbry.go/v2/extras/errors"
 )

-func TestMemoryBlobStore_Put(t *testing.T) {
-	s := NewMemoryBlobStore()
+func TestMemStore_Put(t *testing.T) {
+	s := NewMemStore()
 	blob := []byte("abcdefg")
 	err := s.Put("abc", blob)
 	if err != nil {
@@ -16,8 +16,8 @@ func TestMemoryBlobStore_Put(t *testing.T) {
 	}
 }

-func TestMemoryBlobStore_Get(t *testing.T) {
-	s := NewMemoryBlobStore()
+func TestMemStore_Get(t *testing.T) {
+	s := NewMemStore()
 	hash := "abc"
 	blob := []byte("abcdefg")
 	err := s.Put(hash, blob)
15 store/noop.go Normal file
@@ -0,0 +1,15 @@
package store

import "github.com/lbryio/lbry.go/v2/stream"

// NoopStore is a store that does nothing
type NoopStore struct{}

const nameNoop = "noop"

func (n *NoopStore) Name() string                        { return nameNoop }
func (n *NoopStore) Has(_ string) (bool, error)          { return false, nil }
func (n *NoopStore) Get(_ string) (stream.Blob, error)   { return nil, nil }
func (n *NoopStore) Put(_ string, _ stream.Blob) error   { return nil }
func (n *NoopStore) PutSD(_ string, _ stream.Blob) error { return nil }
func (n *NoopStore) Delete(_ string) error               { return nil }
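
NoopStore is useful wherever a BlobStore is required but no storage is wanted, e.g. standing in for a disabled cache layer. Note that its Get returns (nil, nil) rather than ErrBlobNotFound, so it doesn't honor the Get error contract added to the interface later in this diff. A compile-time interface check, as a sketch:

// Hypothetical assertion, not part of this diff: *NoopStore satisfies BlobStore.
var _ BlobStore = (*NoopStore)(nil)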
55 store/s3.go
@@ -18,8 +18,8 @@ import (
 	log "github.com/sirupsen/logrus"
 )

-// S3BlobStore is an S3 store
-type S3BlobStore struct {
+// S3Store is an S3 store
+type S3Store struct {
 	awsID     string
 	awsSecret string
 	region    string
@@ -28,9 +28,9 @@ type S3BlobStore struct {
 	session *session.Session
 }

-// NewS3BlobStore returns an initialized S3 store pointer.
-func NewS3BlobStore(awsID, awsSecret, region, bucket string) *S3BlobStore {
-	return &S3BlobStore{
+// NewS3Store returns an initialized S3 store pointer.
+func NewS3Store(awsID, awsSecret, region, bucket string) *S3Store {
+	return &S3Store{
 		awsID:     awsID,
 		awsSecret: awsSecret,
 		region:    region,
@@ -38,25 +38,13 @@ func NewS3BlobStore(awsID, awsSecret, region, bucket string) *S3BlobStore {
 	}
 }

-func (s *S3BlobStore) initOnce() error {
-	if s.session != nil {
-		return nil
-	}
+const nameS3 = "s3"

-	sess, err := session.NewSession(&aws.Config{
-		Credentials: credentials.NewStaticCredentials(s.awsID, s.awsSecret, ""),
-		Region:      aws.String(s.region),
-	})
-	if err != nil {
-		return err
-	}
-
-	s.session = sess
-	return nil
-}
+// Name is the cache type name
+func (s *S3Store) Name() string { return nameS3 }

 // Has returns T/F or Error ( from S3 ) if the store contains the blob.
-func (s *S3BlobStore) Has(hash string) (bool, error) {
+func (s *S3Store) Has(hash string) (bool, error) {
 	err := s.initOnce()
 	if err != nil {
 		return false, err
@@ -77,7 +65,7 @@ func (s *S3BlobStore) Has(hash string) (bool, error) {
 }

 // Get returns the blob slice if present or errors on S3.
-func (s *S3BlobStore) Get(hash string) (stream.Blob, error) {
+func (s *S3Store) Get(hash string) (stream.Blob, error) {
 	//Todo-Need to handle error for blob doesn't exist for consistency.
 	err := s.initOnce()
 	if err != nil {
@@ -110,7 +98,7 @@ func (s *S3BlobStore) Get(hash string) (stream.Blob, error) {
 }

 // Put stores the blob on S3 or errors if S3 connection errors.
-func (s *S3BlobStore) Put(hash string, blob stream.Blob) error {
+func (s *S3Store) Put(hash string, blob stream.Blob) error {
 	err := s.initOnce()
 	if err != nil {
 		return err
@@ -133,12 +121,12 @@ func (s *S3BlobStore) Put(hash string, blob stream.Blob) error {
 }

 // PutSD stores the sd blob on S3 or errors if S3 connection errors.
-func (s *S3BlobStore) PutSD(hash string, blob stream.Blob) error {
+func (s *S3Store) PutSD(hash string, blob stream.Blob) error {
 	//Todo - handle missing stream for consistency
 	return s.Put(hash, blob)
 }

-func (s *S3BlobStore) Delete(hash string) error {
+func (s *S3Store) Delete(hash string) error {
 	err := s.initOnce()
 	if err != nil {
 		return err
@@ -153,3 +141,20 @@ func (s *S3BlobStore) Delete(hash string) error {

 	return err
 }
+
+func (s *S3Store) initOnce() error {
+	if s.session != nil {
+		return nil
+	}
+
+	sess, err := session.NewSession(&aws.Config{
+		Credentials: credentials.NewStaticCredentials(s.awsID, s.awsSecret, ""),
+		Region:      aws.String(s.region),
+	})
+	if err != nil {
+		return err
+	}
+
+	s.session = sess
+	return nil
+}
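
The relocated initOnce guards lazy session creation with a plain nil check, which is fine for single-goroutine use but racy if two goroutines hit an uninitialized S3Store at once. If that ever matters, sync.Once is the usual guard; a sketch under that assumption (type and field names invented for illustration):

package store

import (
	"sync"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/credentials"
	"github.com/aws/aws-sdk-go/aws/session"
)

// onceS3 is hypothetical: it shows the sync.Once version of lazy init.
type onceS3 struct {
	awsID, awsSecret, region string

	once    sync.Once
	session *session.Session
	initErr error
}

func (s *onceS3) initOnce() error {
	s.once.Do(func() {
		// Runs exactly once, even under concurrent callers.
		s.session, s.initErr = session.NewSession(&aws.Config{
			Credentials: credentials.NewStaticCredentials(s.awsID, s.awsSecret, ""),
			Region:      aws.String(s.region),
		})
	})
	return s.initErr
}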
67 store/singleflight.go Normal file
@@ -0,0 +1,67 @@
package store

import (
	"time"

	"github.com/lbryio/reflector.go/internal/metrics"

	"github.com/lbryio/lbry.go/v2/stream"

	"golang.org/x/sync/singleflight"
)

func WithSingleFlight(component string, origin BlobStore) BlobStore {
	return &singleflightStore{
		BlobStore: origin,
		component: component,
		sf:        new(singleflight.Group),
	}
}

type singleflightStore struct {
	BlobStore

	component string
	sf        *singleflight.Group
}

func (s *singleflightStore) Name() string {
	return "sf_" + s.BlobStore.Name()
}

// Get ensures that only one request per hash is sent to the origin at a time,
// thereby protecting against https://en.wikipedia.org/wiki/Thundering_herd_problem
func (s *singleflightStore) Get(hash string) (stream.Blob, error) {
	metrics.CacheWaitingRequestsCount.With(metrics.CacheLabels(s.BlobStore.Name(), s.component)).Inc()
	defer metrics.CacheWaitingRequestsCount.With(metrics.CacheLabels(s.BlobStore.Name(), s.component)).Dec()

	blob, err, _ := s.sf.Do(hash, s.getter(hash))
	if err != nil {
		return nil, err
	}
	return blob.(stream.Blob), nil
}

// getter returns a function that gets a blob from the origin
// only one getter per hash will be executing at a time
func (s *singleflightStore) getter(hash string) func() (interface{}, error) {
	return func() (interface{}, error) {
		metrics.CacheOriginRequestsCount.With(metrics.CacheLabels(s.BlobStore.Name(), s.component)).Inc()
		defer metrics.CacheOriginRequestsCount.With(metrics.CacheLabels(s.BlobStore.Name(), s.component)).Dec()

		start := time.Now()
		blob, err := s.BlobStore.Get(hash)
		if err != nil {
			return nil, err
		}

		rate := float64(len(blob)) / 1024 / 1024 / time.Since(start).Seconds()
		metrics.CacheRetrievalSpeed.With(map[string]string{
			metrics.LabelCacheType: s.BlobStore.Name(),
			metrics.LabelComponent: s.component,
			metrics.LabelSource:    "origin",
		}).Set(rate)

		return blob, nil
	}
}
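
A hypothetical demonstration of what the wrapper buys (setup and names are assumptions, not from this diff): concurrent Gets for the same hash collapse into a single origin request whose result is shared by all callers.

package main

import (
	"sync"

	"github.com/lbryio/reflector.go/store"
)

func main() {
	origin := store.WithSingleFlight("example", store.NewMemStore())

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// All ten calls collapse into one MemStore.Get for "somehash".
			_, _ = origin.Get("somehash") // a miss returns ErrBlobNotFound
		}()
	}
	wg.Wait()
}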
89 store/speedwalk/speedwalk.go Normal file
@@ -0,0 +1,89 @@
package speedwalk

import (
	"io/ioutil"
	"path/filepath"
	"runtime"
	"sync"

	"github.com/lbryio/lbry.go/v2/extras/errors"

	"github.com/karrick/godirwalk"
	"github.com/sirupsen/logrus"
)

// AllFiles recursively lists every file in every subdirectory of a given directory
// If basename is true, return the basename of each file. Otherwise return the full path starting at startDir.
func AllFiles(startDir string, basename bool) ([]string, error) {
	items, err := ioutil.ReadDir(startDir)
	if err != nil {
		return nil, err
	}

	pathChan := make(chan string)
	paths := make([]string, 0, 1000)
	pathWG := &sync.WaitGroup{}
	pathWG.Add(1)
	go func() {
		defer pathWG.Done()
		for {
			path, ok := <-pathChan
			if !ok {
				return
			}
			paths = append(paths, path)
		}
	}()

	maxThreads := runtime.NumCPU() - 1
	goroutineLimiter := make(chan struct{}, maxThreads)
	for i := 0; i < maxThreads; i++ {
		goroutineLimiter <- struct{}{}
	}

	walkerWG := &sync.WaitGroup{}
	for _, item := range items {
		if !item.IsDir() {
			if basename {
				pathChan <- item.Name()
			} else {
				pathChan <- filepath.Join(startDir, item.Name())
			}
			continue
		}

		<-goroutineLimiter
		walkerWG.Add(1)

		go func(dir string) {
			defer func() {
				walkerWG.Done()
				goroutineLimiter <- struct{}{}
			}()

			err = godirwalk.Walk(filepath.Join(startDir, dir), &godirwalk.Options{
				Unsorted: true, // faster this way
				Callback: func(osPathname string, de *godirwalk.Dirent) error {
					if de.IsRegular() {
						if basename {
							pathChan <- de.Name()
						} else {
							pathChan <- osPathname
						}
					}
					return nil
				},
			})
			if err != nil {
				logrus.Errorf(errors.FullTrace(err))
			}
		}(item.Name())
	}

	walkerWG.Wait()

	close(pathChan)
	pathWG.Wait()

	return paths, nil
}
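
Two caveats before reusing AllFiles elsewhere: maxThreads is runtime.NumCPU()-1, which is zero on a single-core machine (the limiter channel would then block every directory walk), and the walker goroutines all assign to the shared err variable, so a failure may be logged against the wrong walk. A minimal sketch of calling it directly (the path is an example, not from this diff):

package main

import (
	"fmt"

	"github.com/lbryio/reflector.go/store/speedwalk"
)

func main() {
	// List blob basenames under a sharded blob directory, e.g. ab/abcd1234...
	hashes, err := speedwalk.AllFiles("/tmp/lbry_downloaded_blobs", true)
	if err != nil {
		panic(err)
	}
	fmt.Printf("found %d blobs\n", len(hashes))
}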
store/store.go
@@ -7,15 +7,17 @@ import (
 // BlobStore is an interface for handling blob storage.
 type BlobStore interface {
-	// Does blob exist in the store
+	// Name of blob store (useful for metrics)
+	Name() string
+	// Does blob exist in the store.
 	Has(hash string) (bool, error)
-	// Get the blob from the store
+	// Get the blob from the store. Must return ErrBlobNotFound if blob is not in store.
 	Get(hash string) (stream.Blob, error)
-	// Put the blob into the store
+	// Put the blob into the store.
 	Put(hash string, blob stream.Blob) error
-	// Put an SD blob into the store
+	// Put an SD blob into the store.
 	PutSD(hash string, blob stream.Blob) error
-	// Delete the blob from the store
+	// Delete the blob from the store.
 	Delete(hash string) error
 }
@@ -27,5 +29,11 @@ type Blocklister interface {
 	Wants(hash string) (bool, error)
 }

+// lister is a store that can list cached blobs. This is helpful when an overlay
+// cache needs to track blob existence.
+type lister interface {
+	list() ([]string, error)
+}
+
 //ErrBlobNotFound is a standard error when a blob is not found in the store.
 var ErrBlobNotFound = errors.Base("blob not found")
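
To make the interface contract concrete, a hypothetical store (invented for illustration, not in this commit) that satisfies both BlobStore and the unexported lister, so an LRUStore overlay could preload from it:

package store

import "github.com/lbryio/lbry.go/v2/stream"

// countingStore wraps a MemStore and counts Gets. Sketch only; the counter
// is not goroutine-safe.
type countingStore struct {
	*MemStore // provides Has, Put, PutSD, Delete, Debug
	gets      int
}

func (c *countingStore) Name() string { return "counting_" + c.MemStore.Name() }

func (c *countingStore) Get(hash string) (stream.Blob, error) {
	c.gets++ // still returns ErrBlobNotFound on a miss, per the Get contract above
	return c.MemStore.Get(hash)
}

// list satisfies lister, so NewLRUStore can import existing hashes on startup.
func (c *countingStore) list() ([]string, error) {
	hashes := make([]string, 0, len(c.Debug()))
	for hash := range c.Debug() {
		hashes = append(hashes, hash)
	}
	return hashes, nil
}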