Smarter caches #46

Merged
lyoshenka merged 13 commits from smarter_caches into master 2020-11-04 22:04:22 +01:00
30 changed files with 957 additions and 444 deletions

View file

@ -28,9 +28,10 @@ func getStreamCmd(cmd *cobra.Command, args []string) {
addr := args[0] addr := args[0]
sdHash := args[1] sdHash := args[1]
s := store.NewCachingBlobStore( s := store.NewCachingStore(
"getstream",
peer.NewStore(peer.StoreOpts{Address: addr}), peer.NewStore(peer.StoreOpts{Address: addr}),
store.NewDiskBlobStore("/tmp/lbry_downloaded_blobs", 2), store.NewDiskStore("/tmp/lbry_downloaded_blobs", 2),
) )
wd, err := os.Getwd() wd, err := os.Getwd()

View file

@ -29,7 +29,7 @@ func init() {
func peerCmd(cmd *cobra.Command, args []string) { func peerCmd(cmd *cobra.Command, args []string) {
var err error var err error
s3 := store.NewS3BlobStore(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName) s3 := store.NewS3Store(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName)
peerServer := peer.NewServer(s3) peerServer := peer.NewServer(s3)
if !peerNoDB { if !peerNoDB {

View file

@ -4,6 +4,7 @@ import (
"os" "os"
"os/signal" "os/signal"
"strconv" "strconv"
"strings"
"syscall" "syscall"
"time" "time"
@ -16,21 +17,25 @@ import (
"github.com/lbryio/reflector.go/store" "github.com/lbryio/reflector.go/store"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/spf13/cast"
"github.com/spf13/cobra" "github.com/spf13/cobra"
) )
var reflectorCmdCacheDir string var (
var tcpPeerPort int tcpPeerPort int
var http3PeerPort int http3PeerPort int
var receiverPort int receiverPort int
var metricsPort int metricsPort int
var disableUploads bool disableUploads bool
var disableBlocklist bool disableBlocklist bool
var proxyAddress string proxyAddress string
var proxyPort string proxyPort string
var proxyProtocol string proxyProtocol string
var useDB bool useDB bool
var cloudFrontEndpoint string cloudFrontEndpoint string
reflectorCmdDiskCache string
reflectorCmdMemCache int
)
func init() { func init() {
var cmd = &cobra.Command{ var cmd = &cobra.Command{
@ -38,7 +43,6 @@ func init() {
Short: "Run reflector server", Short: "Run reflector server",
Run: reflectorCmd, Run: reflectorCmd,
} }
cmd.Flags().StringVar(&reflectorCmdCacheDir, "cache", "", "if specified, the path where blobs should be cached (disabled when left empty)")
cmd.Flags().StringVar(&proxyAddress, "proxy-address", "", "address of another reflector server where blobs are fetched from") cmd.Flags().StringVar(&proxyAddress, "proxy-address", "", "address of another reflector server where blobs are fetched from")
cmd.Flags().StringVar(&proxyPort, "proxy-port", "5567", "port of another reflector server where blobs are fetched from") cmd.Flags().StringVar(&proxyPort, "proxy-port", "5567", "port of another reflector server where blobs are fetched from")
cmd.Flags().StringVar(&proxyProtocol, "proxy-protocol", "http3", "protocol used to fetch blobs from another reflector server (tcp/http3)") cmd.Flags().StringVar(&proxyProtocol, "proxy-protocol", "http3", "protocol used to fetch blobs from another reflector server (tcp/http3)")
@ -50,94 +54,142 @@ func init() {
cmd.Flags().BoolVar(&disableUploads, "disable-uploads", false, "Disable uploads to this reflector server") cmd.Flags().BoolVar(&disableUploads, "disable-uploads", false, "Disable uploads to this reflector server")
cmd.Flags().BoolVar(&disableBlocklist, "disable-blocklist", false, "Disable blocklist watching/updating") cmd.Flags().BoolVar(&disableBlocklist, "disable-blocklist", false, "Disable blocklist watching/updating")
cmd.Flags().BoolVar(&useDB, "use-db", true, "whether to connect to the reflector db or not") cmd.Flags().BoolVar(&useDB, "use-db", true, "whether to connect to the reflector db or not")
cmd.Flags().StringVar(&reflectorCmdDiskCache, "disk-cache", "",
"enable disk cache, setting max size and path where to store blobs. format is 'MAX_BLOBS:CACHE_PATH'")
cmd.Flags().IntVar(&reflectorCmdMemCache, "mem-cache", 0, "enable in-memory cache with a max size of this many blobs")
rootCmd.AddCommand(cmd) rootCmd.AddCommand(cmd)
} }
func reflectorCmd(cmd *cobra.Command, args []string) { func reflectorCmd(cmd *cobra.Command, args []string) {
log.Printf("reflector %s", meta.VersionString()) log.Printf("reflector %s", meta.VersionString())
var blobStore store.BlobStore // the blocklist logic requires the db backed store to be the outer-most store
if proxyAddress != "" { underlyingStore := setupStore()
switch proxyProtocol { outerStore := wrapWithCache(underlyingStore)
case "tcp":
blobStore = peer.NewStore(peer.StoreOpts{
Address: proxyAddress + ":" + proxyPort,
Timeout: 30 * time.Second,
})
case "http3":
blobStore = http3.NewStore(http3.StoreOpts{
Address: proxyAddress + ":" + proxyPort,
Timeout: 30 * time.Second,
})
default:
log.Fatalf("specified protocol is not recognized: %s", proxyProtocol)
}
} else {
s3Store := store.NewS3BlobStore(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName)
if cloudFrontEndpoint != "" {
blobStore = store.NewCloudFrontBlobStore(cloudFrontEndpoint, s3Store)
} else {
blobStore = s3Store
}
}
var err error if !disableUploads {
var reflectorServer *reflector.Server reflectorServer := reflector.NewServer(underlyingStore)
reflectorServer.Timeout = 3 * time.Minute
reflectorServer.EnableBlocklist = !disableBlocklist
if useDB { err := reflectorServer.Start(":" + strconv.Itoa(receiverPort))
db := new(db.SQL)
db.TrackAccessTime = true
err = db.Connect(globalConfig.DBConn)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
defer reflectorServer.Shutdown()
blobStore = store.NewDBBackedStore(blobStore, db)
//this shouldn't go here but the blocklist logic requires the db backed store to be the outer-most store for it to work....
//having this here prevents uploaded blobs from being stored in the disk cache
if !disableUploads {
reflectorServer = reflector.NewServer(blobStore)
reflectorServer.Timeout = 3 * time.Minute
reflectorServer.EnableBlocklist = !disableBlocklist
err = reflectorServer.Start(":" + strconv.Itoa(receiverPort))
if err != nil {
log.Fatal(err)
}
}
} }
if reflectorCmdCacheDir != "" { peerServer := peer.NewServer(outerStore)
err = os.MkdirAll(reflectorCmdCacheDir, os.ModePerm) err := peerServer.Start(":" + strconv.Itoa(tcpPeerPort))
if err != nil {
log.Fatal(err)
}
blobStore = store.NewCachingBlobStore(blobStore, store.NewDiskBlobStore(reflectorCmdCacheDir, 2))
}
peerServer := peer.NewServer(blobStore)
err = peerServer.Start(":" + strconv.Itoa(tcpPeerPort))
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
defer peerServer.Shutdown()
http3PeerServer := http3.NewServer(blobStore) http3PeerServer := http3.NewServer(outerStore)
err = http3PeerServer.Start(":" + strconv.Itoa(http3PeerPort)) err = http3PeerServer.Start(":" + strconv.Itoa(http3PeerPort))
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
defer http3PeerServer.Shutdown()
metricsServer := metrics.NewServer(":"+strconv.Itoa(metricsPort), "/metrics") metricsServer := metrics.NewServer(":"+strconv.Itoa(metricsPort), "/metrics")
metricsServer.Start() metricsServer.Start()
defer metricsServer.Shutdown()
interruptChan := make(chan os.Signal, 1) interruptChan := make(chan os.Signal, 1)
signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM) signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM)
<-interruptChan <-interruptChan
metricsServer.Shutdown() // deferred shutdowns happen now
peerServer.Shutdown() }
http3PeerServer.Shutdown()
if reflectorServer != nil { func setupStore() store.BlobStore {
reflectorServer.Shutdown() var s store.BlobStore
}
if proxyAddress != "" {
switch proxyProtocol {
case "tcp":
s = peer.NewStore(peer.StoreOpts{
Address: proxyAddress + ":" + proxyPort,
Timeout: 30 * time.Second,
})
case "http3":
s = http3.NewStore(http3.StoreOpts{
Address: proxyAddress + ":" + proxyPort,
Timeout: 30 * time.Second,
})
default:
log.Fatalf("protocol is not recognized: %s", proxyProtocol)
}
} else {
s3Store := store.NewS3Store(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName)
if cloudFrontEndpoint != "" {
s = store.NewCloudFrontRWStore(store.NewCloudFrontROStore(cloudFrontEndpoint), s3Store)
} else {
s = s3Store
}
}
if useDB {
db := new(db.SQL)
db.TrackAccessTime = true
err := db.Connect(globalConfig.DBConn)
if err != nil {
log.Fatal(err)
}
s = store.NewDBBackedStore(s, db)
}
return s
}
func wrapWithCache(s store.BlobStore) store.BlobStore {
wrapped := s
diskCacheMaxSize, diskCachePath := diskCacheParams()
if diskCacheMaxSize > 0 {
err := os.MkdirAll(diskCachePath, os.ModePerm)
if err != nil {
log.Fatal(err)
}
wrapped = store.NewCachingStore(
"reflector",
wrapped,
store.NewLRUStore("peer_server", store.NewDiskStore(diskCachePath, 2), diskCacheMaxSize),
)
}
if reflectorCmdMemCache > 0 {
wrapped = store.NewCachingStore(
"reflector",
wrapped,
store.NewLRUStore("peer_server", store.NewMemStore(), reflectorCmdMemCache),
)
}
return wrapped
}
func diskCacheParams() (int, string) {
if reflectorCmdDiskCache == "" {
return 0, ""
}
parts := strings.Split(reflectorCmdDiskCache, ":")
if len(parts) != 2 {
log.Fatalf("--disk-cache must be a number, followed by ':', followed by a string")
}
maxSize := cast.ToInt(parts[0])
if maxSize <= 0 {
log.Fatalf("--disk-cache max size must be more than 0")
}
path := parts[1]
if len(path) == 0 || path[0] != '/' {
log.Fatalf("--disk-cache path must start with '/'")
}
return maxSize, path
} }

View file

@ -55,7 +55,7 @@ func startCmd(cmd *cobra.Command, args []string) {
db := new(db.SQL) db := new(db.SQL)
err := db.Connect(globalConfig.DBConn) err := db.Connect(globalConfig.DBConn)
checkErr(err) checkErr(err)
s3 := store.NewS3BlobStore(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName) s3 := store.NewS3Store(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName)
comboStore := store.NewDBBackedStore(s3, db) comboStore := store.NewDBBackedStore(s3, db)
conf := prism.DefaultConf() conf := prism.DefaultConf()

View file

@ -29,7 +29,7 @@ func init() {
func testCmd(cmd *cobra.Command, args []string) { func testCmd(cmd *cobra.Command, args []string) {
log.Printf("reflector %s", meta.VersionString()) log.Printf("reflector %s", meta.VersionString())
memStore := store.NewMemoryBlobStore() memStore := store.NewMemStore()
reflectorServer := reflector.NewServer(memStore) reflectorServer := reflector.NewServer(memStore)
reflectorServer.Timeout = 3 * time.Minute reflectorServer.Timeout = 3 * time.Minute

View file

@ -35,7 +35,7 @@ func uploadCmd(cmd *cobra.Command, args []string) {
checkErr(err) checkErr(err)
st := store.NewDBBackedStore( st := store.NewDBBackedStore(
store.NewS3BlobStore(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName), store.NewS3Store(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName),
db) db)
uploader := reflector.NewUploader(db, st, uploadWorkers, uploadSkipExistsCheck, uploadDeleteBlobsAfterUpload) uploader := reflector.NewUploader(db, st, uploadWorkers, uploadSkipExistsCheck, uploadDeleteBlobsAfterUpload)

6
go.mod
View file

@ -14,11 +14,12 @@ require (
github.com/google/gops v0.3.7 github.com/google/gops v0.3.7
github.com/gorilla/mux v1.7.4 github.com/gorilla/mux v1.7.4
github.com/hashicorp/go-msgpack v0.5.5 // indirect github.com/hashicorp/go-msgpack v0.5.5 // indirect
github.com/hashicorp/golang-lru v0.5.3 // indirect github.com/hashicorp/golang-lru v0.5.4
github.com/hashicorp/memberlist v0.1.4 // indirect github.com/hashicorp/memberlist v0.1.4 // indirect
github.com/hashicorp/serf v0.8.2 github.com/hashicorp/serf v0.8.2
github.com/inconshreveable/go-update v0.0.0-20160112193335-8152e7eb6ccf github.com/inconshreveable/go-update v0.0.0-20160112193335-8152e7eb6ccf
github.com/johntdyer/slackrus v0.0.0-20180518184837-f7aae3243a07 github.com/johntdyer/slackrus v0.0.0-20180518184837-f7aae3243a07
github.com/karrick/godirwalk v1.16.1
github.com/lbryio/chainquery v1.9.0 github.com/lbryio/chainquery v1.9.0
github.com/lbryio/lbry.go v1.1.2 // indirect github.com/lbryio/lbry.go v1.1.2 // indirect
github.com/lbryio/lbry.go/v2 v2.6.1-0.20200901175808-73382bb02128 github.com/lbryio/lbry.go/v2 v2.6.1-0.20200901175808-73382bb02128
@ -27,9 +28,11 @@ require (
github.com/phayes/freeport v0.0.0-20171002185219-e27662a4a9d6 github.com/phayes/freeport v0.0.0-20171002185219-e27662a4a9d6
github.com/prometheus/client_golang v0.9.2 github.com/prometheus/client_golang v0.9.2
github.com/sirupsen/logrus v1.4.2 github.com/sirupsen/logrus v1.4.2
github.com/spf13/afero v1.4.1
github.com/spf13/cast v1.3.0 github.com/spf13/cast v1.3.0
github.com/spf13/cobra v0.0.3 github.com/spf13/cobra v0.0.3
github.com/spf13/pflag v1.0.3 // indirect github.com/spf13/pflag v1.0.3 // indirect
github.com/stretchr/testify v1.4.0
github.com/volatiletech/null v8.0.0+incompatible github.com/volatiletech/null v8.0.0+incompatible
go.uber.org/atomic v1.5.1 go.uber.org/atomic v1.5.1
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9
@ -37,6 +40,7 @@ require (
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e
golang.org/x/tools v0.0.0-20191227053925-7b8e75db28f4 // indirect golang.org/x/tools v0.0.0-20191227053925-7b8e75db28f4 // indirect
google.golang.org/appengine v1.6.2 // indirect google.golang.org/appengine v1.6.2 // indirect
gotest.tools v2.2.0+incompatible
) )
go 1.15 go 1.15

11
go.sum
View file

@ -152,6 +152,8 @@ github.com/hashicorp/go.net v0.0.1/go.mod h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.3 h1:YPkqC67at8FYaadspW/6uE0COsBxS2656RLEr8Bppgk= github.com/hashicorp/golang-lru v0.5.3 h1:YPkqC67at8FYaadspW/6uE0COsBxS2656RLEr8Bppgk=
github.com/hashicorp/golang-lru v0.5.3/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= github.com/hashicorp/golang-lru v0.5.3/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc=
github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
github.com/hashicorp/hcl v0.0.0-20180404174102-ef8a98b0bbce h1:xdsDDbiBDQTKASoGEZ+pEmF1OnWuu8AQ9I8iNbHNeno= github.com/hashicorp/hcl v0.0.0-20180404174102-ef8a98b0bbce h1:xdsDDbiBDQTKASoGEZ+pEmF1OnWuu8AQ9I8iNbHNeno=
github.com/hashicorp/hcl v0.0.0-20180404174102-ef8a98b0bbce/go.mod h1:oZtUIOe8dh44I2q6ScRibXws4Ajl+d+nod3AaR9vL5w= github.com/hashicorp/hcl v0.0.0-20180404174102-ef8a98b0bbce/go.mod h1:oZtUIOe8dh44I2q6ScRibXws4Ajl+d+nod3AaR9vL5w=
github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64= github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64=
@ -186,12 +188,15 @@ github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/kardianos/osext v0.0.0-20170510131534-ae77be60afb1 h1:PJPDf8OUfOK1bb/NeTKd4f1QXZItOX389VN3B6qC8ro= github.com/kardianos/osext v0.0.0-20170510131534-ae77be60afb1 h1:PJPDf8OUfOK1bb/NeTKd4f1QXZItOX389VN3B6qC8ro=
github.com/kardianos/osext v0.0.0-20170510131534-ae77be60afb1/go.mod h1:1NbS8ALrpOvjt0rHPNLyCIeMtbizbir8U//inJ+zuB8= github.com/kardianos/osext v0.0.0-20170510131534-ae77be60afb1/go.mod h1:1NbS8ALrpOvjt0rHPNLyCIeMtbizbir8U//inJ+zuB8=
github.com/karrick/godirwalk v1.16.1 h1:DynhcF+bztK8gooS0+NDJFrdNZjJ3gzVzC545UNA9iw=
github.com/karrick/godirwalk v1.16.1/go.mod h1:j4mkqPuvaLI8mp1DroR3P6ad7cyYd4c1qeJ3RV7ULlk=
github.com/keybase/go-ps v0.0.0-20161005175911-668c8856d999/go.mod h1:hY+WOq6m2FpbvyrI93sMaypsttvaIL5nhVR92dTMUcQ= github.com/keybase/go-ps v0.0.0-20161005175911-668c8856d999/go.mod h1:hY+WOq6m2FpbvyrI93sMaypsttvaIL5nhVR92dTMUcQ=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6CZQHDETBtE9HaSEkGmuNXF86RwHhHUvq4= github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6CZQHDETBtE9HaSEkGmuNXF86RwHhHUvq4=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.2 h1:DB17ag19krx9CFsz4o3enTrPXyIXCl+2iCXH/aMAp9s= github.com/konsorten/go-windows-terminal-sequences v1.0.2 h1:DB17ag19krx9CFsz4o3enTrPXyIXCl+2iCXH/aMAp9s=
github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
@ -280,6 +285,7 @@ github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/profile v1.3.0/go.mod h1:hJw3o1OdXxsrSjjVksARp5W95eeEaEfptyVZyv6JUPA= github.com/pkg/profile v1.3.0/go.mod h1:hJw3o1OdXxsrSjjVksARp5W95eeEaEfptyVZyv6JUPA=
github.com/pkg/sftp v1.10.1/go.mod h1:lYOWFsE0bwd1+KfKJaKeuokY15vzFx25BLbzYYoAxZI=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI= github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI=
@ -346,6 +352,8 @@ github.com/sourcegraph/annotate v0.0.0-20160123013949-f4cad6c6324d/go.mod h1:Udh
github.com/sourcegraph/syntaxhighlight v0.0.0-20170531221838-bd320f5d308e/go.mod h1:HuIsMU8RRBOtsCgI77wP899iHVBQpCmg4ErYMZB+2IA= github.com/sourcegraph/syntaxhighlight v0.0.0-20170531221838-bd320f5d308e/go.mod h1:HuIsMU8RRBOtsCgI77wP899iHVBQpCmg4ErYMZB+2IA=
github.com/spf13/afero v1.1.1 h1:Lt3ihYMlE+lreX1GS4Qw4ZsNpYQLxIXKBTEOXm3nt6I= github.com/spf13/afero v1.1.1 h1:Lt3ihYMlE+lreX1GS4Qw4ZsNpYQLxIXKBTEOXm3nt6I=
github.com/spf13/afero v1.1.1/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ= github.com/spf13/afero v1.1.1/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ=
github.com/spf13/afero v1.4.1 h1:asw9sl74539yqavKaglDM5hFpdJVK0Y5Dr/JOgQ89nQ=
github.com/spf13/afero v1.4.1/go.mod h1:Ai8FlHk4v/PARR026UzYexafAt9roJ7LcLMAmO6Z93I=
github.com/spf13/cast v1.2.0/go.mod h1:r2rcYCSwa1IExKTDiTfzaxqT2FNHs8hODu4LnUfgKEg= github.com/spf13/cast v1.2.0/go.mod h1:r2rcYCSwa1IExKTDiTfzaxqT2FNHs8hODu4LnUfgKEg=
github.com/spf13/cast v1.3.0 h1:oget//CVOEoFewqQxwr0Ej5yjygnqGkvggSE/gB35Q8= github.com/spf13/cast v1.3.0 h1:oget//CVOEoFewqQxwr0Ej5yjygnqGkvggSE/gB35Q8=
github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
@ -394,6 +402,7 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk
golang.org/x/crypto v0.0.0-20190313024323-a1f597ede03a/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190313024323-a1f597ede03a/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190820162420-60c769a6c586/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20191002192127-34f69633bfdc/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191002192127-34f69633bfdc/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200221231518-2aa609cf4a9d/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200221231518-2aa609cf4a9d/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
@ -471,6 +480,8 @@ golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190921001708-c4c64cad1fd0 h1:xQwXv67TxFo9nC1GJFyab5eq/5B590r6RlnL/G8Sz7w= golang.org/x/time v0.0.0-20190921001708-c4c64cad1fd0 h1:xQwXv67TxFo9nC1GJFyab5eq/5B590r6RlnL/G8Sz7w=

View file

@ -12,6 +12,7 @@ import (
ee "github.com/lbryio/lbry.go/v2/extras/errors" ee "github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/extras/stop" "github.com/lbryio/lbry.go/v2/extras/stop"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp" "github.com/prometheus/client_golang/prometheus/promhttp"
@ -57,7 +58,8 @@ func (s *Server) Shutdown() {
} }
const ( const (
ns = "reflector" ns = "reflector"
subsystemCache = "cache"
labelDirection = "direction" labelDirection = "direction"
labelErrorType = "error_type" labelErrorType = "error_type"
@ -65,7 +67,9 @@ const (
DirectionUpload = "upload" // to reflector DirectionUpload = "upload" // to reflector
DirectionDownload = "download" // from reflector DirectionDownload = "download" // from reflector
MtrLabelSource = "source" LabelCacheType = "cache_type"
LabelComponent = "component"
LabelSource = "source"
errConnReset = "conn_reset" errConnReset = "conn_reset"
errReadConnReset = "read_conn_reset" errReadConnReset = "read_conn_reset"
@ -92,6 +96,12 @@ const (
) )
var ( var (
ErrorCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: ns,
Name: "error_total",
Help: "Total number of errors",
}, []string{labelDirection, labelErrorType})
BlobDownloadCount = promauto.NewCounter(prometheus.CounterOpts{ BlobDownloadCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: ns, Namespace: ns,
Name: "blob_download_total", Name: "blob_download_total",
@ -107,27 +117,44 @@ var (
Name: "http3_blob_download_total", Name: "http3_blob_download_total",
Help: "Total number of blobs downloaded from reflector through QUIC protocol", Help: "Total number of blobs downloaded from reflector through QUIC protocol",
}) })
CacheHitCount = promauto.NewCounter(prometheus.CounterOpts{
CacheHitCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: ns, Namespace: ns,
Name: "cache_hit_total", Subsystem: subsystemCache,
Name: "hit_total",
Help: "Total number of blobs retrieved from the cache storage", Help: "Total number of blobs retrieved from the cache storage",
}) }, []string{LabelCacheType, LabelComponent})
CacheMissCount = promauto.NewCounter(prometheus.CounterOpts{ CacheMissCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: ns, Namespace: ns,
Name: "cache_miss_total", Subsystem: subsystemCache,
Name: "miss_total",
Help: "Total number of blobs retrieved from origin rather than cache storage", Help: "Total number of blobs retrieved from origin rather than cache storage",
}) }, []string{LabelCacheType, LabelComponent})
CacheOriginRequestsCount = promauto.NewGauge(prometheus.GaugeOpts{ CacheOriginRequestsCount = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns, Namespace: ns,
Name: "cache_origin_requests_total", Subsystem: subsystemCache,
Name: "origin_requests_total",
Help: "How many Get requests are in flight from the cache to the origin", Help: "How many Get requests are in flight from the cache to the origin",
}) }, []string{LabelCacheType, LabelComponent})
// during thundering-herd situations, the metric below should be a lot smaller than the metric above // during thundering-herd situations, the metric below should be a lot smaller than the metric above
CacheWaitingRequestsCount = promauto.NewGauge(prometheus.GaugeOpts{ CacheWaitingRequestsCount = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns, Namespace: ns,
Name: "cache_waiting_requests_total", Subsystem: subsystemCache,
Name: "waiting_requests_total",
Help: "How many cache requests are waiting for an in-flight origin request", Help: "How many cache requests are waiting for an in-flight origin request",
}) }, []string{LabelCacheType, LabelComponent})
CacheLRUEvictCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: ns,
Subsystem: subsystemCache,
Name: "evict_total",
Help: "Count of blobs evicted from cache",
}, []string{LabelCacheType, LabelComponent})
CacheRetrievalSpeed = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns,
Name: "speed_mbps",
Help: "Speed of blob retrieval from cache or from origin",
}, []string{LabelCacheType, LabelComponent, LabelSource})
BlobUploadCount = promauto.NewCounter(prometheus.CounterOpts{ BlobUploadCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: ns, Namespace: ns,
Name: "blob_upload_total", Name: "blob_upload_total",
@ -138,16 +165,7 @@ var (
Name: "sdblob_upload_total", Name: "sdblob_upload_total",
Help: "Total number of SD blobs (and therefore streams) uploaded to reflector", Help: "Total number of SD blobs (and therefore streams) uploaded to reflector",
}) })
RetrieverSpeed = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns,
Name: "speed_mbps",
Help: "Speed of blob retrieval",
}, []string{MtrLabelSource})
ErrorCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: ns,
Name: "error_total",
Help: "Total number of errors",
}, []string{labelDirection, labelErrorType})
MtrInBytesTcp = promauto.NewCounter(prometheus.CounterOpts{ MtrInBytesTcp = promauto.NewCounter(prometheus.CounterOpts{
Namespace: ns, Namespace: ns,
Name: "tcp_in_bytes", Name: "tcp_in_bytes",
@ -185,6 +203,13 @@ var (
}) })
) )
func CacheLabels(name, component string) prometheus.Labels {
return prometheus.Labels{
LabelCacheType: name,
LabelComponent: component,
}
}
func TrackError(direction string, e error) (shouldLog bool) { // shouldLog is a hack, but whatever func TrackError(direction string, e error) (shouldLog bool) { // shouldLog is a hack, but whatever
if e == nil { if e == nil {
return return

View file

@ -164,7 +164,7 @@ func generateTLSConfig() *tls.Config {
func (s *Server) listenAndServe(server *http3.Server) { func (s *Server) listenAndServe(server *http3.Server) {
err := server.ListenAndServe() err := server.ListenAndServe()
if err != nil && !errors.Is(err, http.ErrServerClosed) { if err != nil && err.Error() != "server closed" {
log.Errorln(errors.FullTrace(err)) log.Errorln(errors.FullTrace(err))
} }
} }

View file

@ -55,6 +55,8 @@ func (p *Store) getClient() (*Client, error) {
return c, errors.Prefix("connection error", err) return c, errors.Prefix("connection error", err)
} }
func (p *Store) Name() string { return "http3" }
// Has asks the peer if they have a hash // Has asks the peer if they have a hash
func (p *Store) Has(hash string) (bool, error) { func (p *Store) Has(hash string) (bool, error) {
c, err := p.getClient() c, err := p.getClient()

View file

@ -34,7 +34,7 @@ var availabilityRequests = []pair{
} }
func getServer(t *testing.T, withBlobs bool) *Server { func getServer(t *testing.T, withBlobs bool) *Server {
st := store.NewMemoryBlobStore() st := store.NewMemStore()
if withBlobs { if withBlobs {
for k, v := range blobs { for k, v := range blobs {
err := st.Put(k, v) err := st.Put(k, v)

View file

@ -30,6 +30,8 @@ func (p *Store) getClient() (*Client, error) {
return c, errors.Prefix("connection error", err) return c, errors.Prefix("connection error", err)
} }
func (p *Store) Name() string { return "peer" }
// Has asks the peer if they have a hash // Has asks the peer if they have a hash
func (p *Store) Has(hash string) (bool, error) { func (p *Store) Has(hash string) (bool, error) {
c, err := p.getClient() c, err := p.getClient()

View file

@ -22,7 +22,7 @@ func startServerOnRandomPort(t *testing.T) (*Server, int) {
t.Fatal(err) t.Fatal(err)
} }
srv := NewServer(store.NewMemoryBlobStore()) srv := NewServer(store.NewMemStore())
err = srv.Start("127.0.0.1:" + strconv.Itoa(port)) err = srv.Start("127.0.0.1:" + strconv.Itoa(port))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@ -119,7 +119,7 @@ func TestServer_Timeout(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
srv := NewServer(store.NewMemoryBlobStore()) srv := NewServer(store.NewMemStore())
srv.Timeout = testTimeout srv.Timeout = testTimeout
err = srv.Start("127.0.0.1:" + strconv.Itoa(port)) err = srv.Start("127.0.0.1:" + strconv.Itoa(port))
if err != nil { if err != nil {
@ -161,7 +161,7 @@ func TestServer_Timeout(t *testing.T) {
//} //}
type mockPartialStore struct { type mockPartialStore struct {
*store.MemoryBlobStore *store.MemStore
missing []string missing []string
} }
@ -181,7 +181,7 @@ func TestServer_PartialUpload(t *testing.T) {
missing[i] = bits.Rand().String() missing[i] = bits.Rand().String()
} }
st := store.BlobStore(&mockPartialStore{MemoryBlobStore: store.NewMemoryBlobStore(), missing: missing}) st := store.BlobStore(&mockPartialStore{MemStore: store.NewMemStore(), missing: missing})
if _, ok := st.(neededBlobChecker); !ok { if _, ok := st.(neededBlobChecker); !ok {
t.Fatal("mock does not implement the relevant interface") t.Fatal("mock does not implement the relevant interface")
} }

View file

@ -7,26 +7,32 @@ import (
"github.com/lbryio/lbry.go/v2/stream" "github.com/lbryio/lbry.go/v2/stream"
"github.com/lbryio/reflector.go/internal/metrics" "github.com/lbryio/reflector.go/internal/metrics"
"golang.org/x/sync/singleflight"
) )
// CachingBlobStore combines two stores, typically a local and a remote store, to improve performance. // CachingStore combines two stores, typically a local and a remote store, to improve performance.
// Accessed blobs are stored in and retrieved from the cache. If they are not in the cache, they // Accessed blobs are stored in and retrieved from the cache. If they are not in the cache, they
// are retrieved from the origin and cached. Puts are cached and also forwarded to the origin. // are retrieved from the origin and cached. Puts are cached and also forwarded to the origin.
type CachingBlobStore struct { type CachingStore struct {
origin, cache BlobStore origin, cache BlobStore
component string
sf *singleflight.Group
} }
// NewCachingBlobStore makes a new caching disk store and returns a pointer to it. // NewCachingStore makes a new caching disk store and returns a pointer to it.
func NewCachingBlobStore(origin, cache BlobStore) *CachingBlobStore { func NewCachingStore(component string, origin, cache BlobStore) *CachingStore {
return &CachingBlobStore{origin: origin, cache: cache, sf: new(singleflight.Group)} return &CachingStore{
component: component,
origin: WithSingleFlight(component, origin),
cache: cache,
}
} }
const nameCaching = "caching"
// Name is the cache type name
func (c *CachingStore) Name() string { return nameCaching }
// Has checks the cache and then the origin for a hash. It returns true if either store has it. // Has checks the cache and then the origin for a hash. It returns true if either store has it.
func (c *CachingBlobStore) Has(hash string) (bool, error) { func (c *CachingStore) Has(hash string) (bool, error) {
has, err := c.cache.Has(hash) has, err := c.cache.Has(hash)
if has || err != nil { if has || err != nil {
return has, err return has, err
@ -36,49 +42,33 @@ func (c *CachingBlobStore) Has(hash string) (bool, error) {
// Get tries to get the blob from the cache first, falling back to the origin. If the blob comes // Get tries to get the blob from the cache first, falling back to the origin. If the blob comes
// from the origin, it is also stored in the cache. // from the origin, it is also stored in the cache.
func (c *CachingBlobStore) Get(hash string) (stream.Blob, error) { func (c *CachingStore) Get(hash string) (stream.Blob, error) {
start := time.Now() start := time.Now()
blob, err := c.cache.Get(hash) blob, err := c.cache.Get(hash)
if err == nil || !errors.Is(err, ErrBlobNotFound) { if err == nil || !errors.Is(err, ErrBlobNotFound) {
metrics.CacheHitCount.Inc() metrics.CacheHitCount.With(metrics.CacheLabels(c.cache.Name(), c.component)).Inc()
rate := float64(len(blob)) / 1024 / 1024 / time.Since(start).Seconds() rate := float64(len(blob)) / 1024 / 1024 / time.Since(start).Seconds()
metrics.RetrieverSpeed.With(map[string]string{metrics.MtrLabelSource: "cache"}).Set(rate) metrics.CacheRetrievalSpeed.With(map[string]string{
metrics.LabelCacheType: c.cache.Name(),
metrics.LabelComponent: c.component,
metrics.LabelSource: "cache",
}).Set(rate)
return blob, err return blob, err
} }
metrics.CacheMissCount.Inc() metrics.CacheMissCount.With(metrics.CacheLabels(c.cache.Name(), c.component)).Inc()
return c.getFromOrigin(hash)
}
// getFromOrigin ensures that only one Get per hash is sent to the origin at a time, blob, err = c.origin.Get(hash)
// thereby protecting against https://en.wikipedia.org/wiki/Thundering_herd_problem
func (c *CachingBlobStore) getFromOrigin(hash string) (stream.Blob, error) {
metrics.CacheWaitingRequestsCount.Inc()
defer metrics.CacheWaitingRequestsCount.Dec()
originBlob, err, _ := c.sf.Do(hash, func() (interface{}, error) {
metrics.CacheOriginRequestsCount.Inc()
defer metrics.CacheOriginRequestsCount.Dec()
start := time.Now()
blob, err := c.origin.Get(hash)
if err != nil {
return nil, err
}
rate := float64(len(blob)) / 1024 / 1024 / time.Since(start).Seconds()
metrics.RetrieverSpeed.With(map[string]string{metrics.MtrLabelSource: "origin"}).Set(rate)
err = c.cache.Put(hash, blob)
return blob, err
})
if err != nil { if err != nil {
return nil, err return nil, err
} }
return originBlob.(stream.Blob), nil
err = c.cache.Put(hash, blob)
return blob, err
} }
// Put stores the blob in the origin and the cache // Put stores the blob in the origin and the cache
func (c *CachingBlobStore) Put(hash string, blob stream.Blob) error { func (c *CachingStore) Put(hash string, blob stream.Blob) error {
err := c.origin.Put(hash, blob) err := c.origin.Put(hash, blob)
if err != nil { if err != nil {
return err return err
@ -87,7 +77,7 @@ func (c *CachingBlobStore) Put(hash string, blob stream.Blob) error {
} }
// PutSD stores the sd blob in the origin and the cache // PutSD stores the sd blob in the origin and the cache
func (c *CachingBlobStore) PutSD(hash string, blob stream.Blob) error { func (c *CachingStore) PutSD(hash string, blob stream.Blob) error {
err := c.origin.PutSD(hash, blob) err := c.origin.PutSD(hash, blob)
if err != nil { if err != nil {
return err return err
@ -96,7 +86,7 @@ func (c *CachingBlobStore) PutSD(hash string, blob stream.Blob) error {
} }
// Delete deletes the blob from the origin and the cache // Delete deletes the blob from the origin and the cache
func (c *CachingBlobStore) Delete(hash string) error { func (c *CachingStore) Delete(hash string) error {
err := c.origin.Delete(hash) err := c.origin.Delete(hash)
if err != nil { if err != nil {
return err return err

View file

@ -9,10 +9,10 @@ import (
"github.com/lbryio/lbry.go/v2/stream" "github.com/lbryio/lbry.go/v2/stream"
) )
func TestCachingBlobStore_Put(t *testing.T) { func TestCachingStore_Put(t *testing.T) {
origin := NewMemoryBlobStore() origin := NewMemStore()
cache := NewMemoryBlobStore() cache := NewMemStore()
s := NewCachingBlobStore(origin, cache) s := NewCachingStore("test", origin, cache)
b := []byte("this is a blob of stuff") b := []byte("this is a blob of stuff")
hash := "hash" hash := "hash"
@ -39,10 +39,10 @@ func TestCachingBlobStore_Put(t *testing.T) {
} }
} }
func TestCachingBlobStore_CacheMiss(t *testing.T) { func TestCachingStore_CacheMiss(t *testing.T) {
origin := NewMemoryBlobStore() origin := NewMemStore()
cache := NewMemoryBlobStore() cache := NewMemStore()
s := NewCachingBlobStore(origin, cache) s := NewCachingStore("test", origin, cache)
b := []byte("this is a blob of stuff") b := []byte("this is a blob of stuff")
hash := "hash" hash := "hash"
@ -76,11 +76,11 @@ func TestCachingBlobStore_CacheMiss(t *testing.T) {
} }
} }
func TestCachingBlobStore_ThunderingHerd(t *testing.T) { func TestCachingStore_ThunderingHerd(t *testing.T) {
storeDelay := 100 * time.Millisecond storeDelay := 100 * time.Millisecond
origin := NewSlowBlobStore(storeDelay) origin := NewSlowBlobStore(storeDelay)
cache := NewMemoryBlobStore() cache := NewMemStore()
s := NewCachingBlobStore(origin, cache) s := NewCachingStore("test", origin, cache)
b := []byte("this is a blob of stuff") b := []byte("this is a blob of stuff")
hash := "hash" hash := "hash"
@ -129,16 +129,19 @@ func TestCachingBlobStore_ThunderingHerd(t *testing.T) {
// SlowBlobStore adds a delay to each request // SlowBlobStore adds a delay to each request
type SlowBlobStore struct { type SlowBlobStore struct {
mem *MemoryBlobStore mem *MemStore
delay time.Duration delay time.Duration
} }
func NewSlowBlobStore(delay time.Duration) *SlowBlobStore { func NewSlowBlobStore(delay time.Duration) *SlowBlobStore {
return &SlowBlobStore{ return &SlowBlobStore{
mem: NewMemoryBlobStore(), mem: NewMemStore(),
delay: delay, delay: delay,
} }
} }
func (s *SlowBlobStore) Name() string {
return "slow"
}
func (s *SlowBlobStore) Has(hash string) (bool, error) { func (s *SlowBlobStore) Has(hash string) (bool, error) {
time.Sleep(s.delay) time.Sleep(s.delay)

View file

@ -1,109 +0,0 @@
package store
import (
"io/ioutil"
"net/http"
"time"
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/stream"
"github.com/lbryio/reflector.go/internal/metrics"
"github.com/lbryio/reflector.go/meta"
log "github.com/sirupsen/logrus"
)
// CloudFrontBlobStore is an CloudFront backed store (retrieval only)
type CloudFrontBlobStore struct {
cfEndpoint string
s3Store *S3BlobStore
}
// NewS3BlobStore returns an initialized S3 store pointer.
func NewCloudFrontBlobStore(cloudFrontEndpoint string, S3Store *S3BlobStore) *CloudFrontBlobStore {
return &CloudFrontBlobStore{
cfEndpoint: cloudFrontEndpoint,
s3Store: S3Store,
}
}
// Has returns T/F or Error if the store contains the blob.
func (s *CloudFrontBlobStore) Has(hash string) (bool, error) {
url := s.cfEndpoint + hash
req, err := http.NewRequest("HEAD", url, nil)
if err != nil {
return false, errors.Err(err)
}
req.Header.Add("User-Agent", "reflector.go/"+meta.Version)
res, err := http.DefaultClient.Do(req)
if err != nil {
return false, errors.Err(err)
}
defer res.Body.Close()
switch res.StatusCode {
case http.StatusNotFound, http.StatusForbidden:
return false, nil
case http.StatusOK:
return true, nil
default:
return false, errors.Err(res.Status)
}
}
// Get returns the blob slice if present or errors.
func (s *CloudFrontBlobStore) Get(hash string) (stream.Blob, error) {
url := s.cfEndpoint + hash
log.Debugf("Getting %s from S3", hash[:8])
defer func(t time.Time) {
log.Debugf("Getting %s from S3 took %s", hash[:8], time.Since(t).String())
}(time.Now())
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return nil, errors.Err(err)
}
req.Header.Add("User-Agent", "reflector.go/"+meta.Version)
res, err := http.DefaultClient.Do(req)
if err != nil {
return nil, errors.Err(err)
}
defer res.Body.Close()
switch res.StatusCode {
case http.StatusNotFound, http.StatusForbidden:
return nil, errors.Err(ErrBlobNotFound)
case http.StatusOK:
b, err := ioutil.ReadAll(res.Body)
if err != nil {
return nil, errors.Err(err)
}
metrics.MtrInBytesS3.Add(float64(len(b)))
return b, nil
default:
return nil, errors.Err(res.Status)
}
}
// Put stores the blob on S3 or errors if S3 store is not present.
func (s *CloudFrontBlobStore) Put(hash string, blob stream.Blob) error {
if s.s3Store != nil {
return s.s3Store.Put(hash, blob)
}
return errors.Err("not implemented in cloudfront store")
}
// PutSD stores the sd blob on S3 or errors if S3 store is not present.
func (s *CloudFrontBlobStore) PutSD(hash string, blob stream.Blob) error {
if s.s3Store != nil {
return s.s3Store.PutSD(hash, blob)
}
return errors.Err("not implemented in cloudfront store")
}
func (s *CloudFrontBlobStore) Delete(hash string) error {
if s.s3Store != nil {
return s.s3Store.Delete(hash)
}
return errors.Err("not implemented in cloudfront store")
}

105
store/cloudfront_ro.go Normal file
View file

@ -0,0 +1,105 @@
package store
import (
"io"
"io/ioutil"
"net/http"
"time"
"github.com/lbryio/reflector.go/internal/metrics"
"github.com/lbryio/reflector.go/meta"
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/stream"
log "github.com/sirupsen/logrus"
)
// CloudFrontROStore reads from cloudfront. All writes panic.
type CloudFrontROStore struct {
endpoint string // cloudflare endpoint
}
// NewCloudFrontROStore returns an initialized CloudFrontROStore store pointer.
func NewCloudFrontROStore(endpoint string) *CloudFrontROStore {
return &CloudFrontROStore{endpoint: endpoint}
}
const nameCloudFrontRO = "cloudfront_ro"
// Name is the cache type name
func (c *CloudFrontROStore) Name() string { return nameCloudFrontRO }
// Has checks if the hash is in the store.
func (c *CloudFrontROStore) Has(hash string) (bool, error) {
status, body, err := c.cfRequest(http.MethodHead, hash)
if err != nil {
return false, err
}
defer body.Close()
switch status {
case http.StatusNotFound, http.StatusForbidden:
return false, nil
case http.StatusOK:
return true, nil
default:
return false, errors.Err("unexpected status %d", status)
}
}
// Get gets the blob from Cloudfront.
func (c *CloudFrontROStore) Get(hash string) (stream.Blob, error) {
log.Debugf("Getting %s from S3", hash[:8])
defer func(t time.Time) {
log.Debugf("Getting %s from S3 took %s", hash[:8], time.Since(t).String())
}(time.Now())
status, body, err := c.cfRequest(http.MethodGet, hash)
if err != nil {
return nil, err
}
defer body.Close()
switch status {
case http.StatusNotFound, http.StatusForbidden:
return nil, errors.Err(ErrBlobNotFound)
case http.StatusOK:
b, err := ioutil.ReadAll(body)
if err != nil {
return nil, errors.Err(err)
}
metrics.MtrInBytesS3.Add(float64(len(b)))
return b, nil
default:
return nil, errors.Err("unexpected status %d", status)
}
}
func (c *CloudFrontROStore) cfRequest(method, hash string) (int, io.ReadCloser, error) {
url := c.endpoint + hash
req, err := http.NewRequest(method, url, nil)
if err != nil {
return 0, nil, errors.Err(err)
}
req.Header.Add("User-Agent", "reflector.go/"+meta.Version)
res, err := http.DefaultClient.Do(req)
if err != nil {
return 0, nil, errors.Err(err)
}
return res.StatusCode, res.Body, nil
}
func (c *CloudFrontROStore) Put(_ string, _ stream.Blob) error {
panic("CloudFrontROStore cannot do writes. Use CloudFrontRWStore")
}
func (c *CloudFrontROStore) PutSD(_ string, _ stream.Blob) error {
panic("CloudFrontROStore cannot do writes. Use CloudFrontRWStore")
}
func (c *CloudFrontROStore) Delete(_ string) error {
panic("CloudFrontROStore cannot do writes. Use CloudFrontRWStore")
}

50
store/cloudfront_rw.go Normal file
View file

@ -0,0 +1,50 @@
package store
import (
"github.com/lbryio/lbry.go/v2/stream"
)
// CloudFrontRWStore combines a Cloudfront and an S3 store. Reads go to Cloudfront, writes go to S3.
type CloudFrontRWStore struct {
cf *CloudFrontROStore
s3 *S3Store
}
// NewCloudFrontRWStore returns an initialized CloudFrontRWStore store pointer.
// NOTE: It panics if either argument is nil.
func NewCloudFrontRWStore(cf *CloudFrontROStore, s3 *S3Store) *CloudFrontRWStore {
if cf == nil || s3 == nil {
panic("both stores must be set")
}
return &CloudFrontRWStore{cf: cf, s3: s3}
}
const nameCloudFrontRW = "cloudfront_rw"
// Name is the cache type name
func (c *CloudFrontRWStore) Name() string { return nameCloudFrontRW }
// Has checks if the hash is in the store.
func (c *CloudFrontRWStore) Has(hash string) (bool, error) {
return c.cf.Has(hash)
}
// Get gets the blob from Cloudfront.
func (c *CloudFrontRWStore) Get(hash string) (stream.Blob, error) {
return c.cf.Get(hash)
}
// Put stores the blob on S3
func (c *CloudFrontRWStore) Put(hash string, blob stream.Blob) error {
return c.s3.Put(hash, blob)
}
// PutSD stores the sd blob on S3
func (c *CloudFrontRWStore) PutSD(hash string, blob stream.Blob) error {
return c.s3.PutSD(hash, blob)
}
// Delete deletes the blob from S3
func (c *CloudFrontRWStore) Delete(hash string) error {
return c.s3.Delete(hash)
}

View file

@ -25,6 +25,11 @@ func NewDBBackedStore(blobs BlobStore, db *db.SQL) *DBBackedStore {
return &DBBackedStore{blobs: blobs, db: db} return &DBBackedStore{blobs: blobs, db: db}
} }
const nameDBBacked = "db-backed"
// Name is the cache type name
func (d *DBBackedStore) Name() string { return nameDBBacked }
// Has returns true if the blob is in the store // Has returns true if the blob is in the store
func (d *DBBackedStore) Has(hash string) (bool, error) { func (d *DBBackedStore) Has(hash string) (bool, error) {
return d.db.HasBlob(hash) return d.db.HasBlob(hash)

View file

@ -4,82 +4,38 @@ import (
"io/ioutil" "io/ioutil"
"os" "os"
"path" "path"
"path/filepath"
"sort"
"syscall"
"time"
"github.com/lbryio/lbry.go/v2/extras/errors" "github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/stream" "github.com/lbryio/lbry.go/v2/stream"
"github.com/lbryio/reflector.go/store/speedwalk"
log "github.com/sirupsen/logrus"
) )
// DiskBlobStore stores blobs on a local disk // DiskStore stores blobs on a local disk
type DiskBlobStore struct { type DiskStore struct {
// the location of blobs on disk // the location of blobs on disk
blobDir string blobDir string
// store files in subdirectories based on the first N chars in the filename. 0 = don't create subdirectories. // store files in subdirectories based on the first N chars in the filename. 0 = don't create subdirectories.
prefixLength int prefixLength int
initialized bool // true if initOnce ran, false otherwise
lastChecked time.Time initialized bool
diskCleanupBusy chan bool
} }
// NewDiskBlobStore returns an initialized file disk store pointer. // NewDiskStore returns an initialized file disk store pointer.
func NewDiskBlobStore(dir string, prefixLength int) *DiskBlobStore { func NewDiskStore(dir string, prefixLength int) *DiskStore {
dbs := DiskBlobStore{blobDir: dir, prefixLength: prefixLength, diskCleanupBusy: make(chan bool, 1)} return &DiskStore{
dbs.diskCleanupBusy <- true blobDir: dir,
return &dbs prefixLength: prefixLength,
}
func (d *DiskBlobStore) dir(hash string) string {
if d.prefixLength <= 0 || len(hash) < d.prefixLength {
return d.blobDir
} }
return path.Join(d.blobDir, hash[:d.prefixLength])
} }
// GetUsedSpace returns a value between 0 and 1, with 0 being completely empty and 1 being full, for the disk that holds the provided path const nameDisk = "disk"
func (d *DiskBlobStore) getUsedSpace() (float32, error) {
var stat syscall.Statfs_t
err := syscall.Statfs(d.blobDir, &stat)
if err != nil {
return 0, err
}
// Available blocks * size per block = available space in bytes
all := stat.Blocks * uint64(stat.Bsize)
free := stat.Bfree * uint64(stat.Bsize)
used := all - free
return float32(used) / float32(all), nil // Name is the cache type name
} func (d *DiskStore) Name() string { return nameDisk }
func (d *DiskBlobStore) path(hash string) string {
return path.Join(d.dir(hash), hash)
}
func (d *DiskBlobStore) ensureDirExists(dir string) error {
return errors.Err(os.MkdirAll(dir, 0755))
}
func (d *DiskBlobStore) initOnce() error {
if d.initialized {
return nil
}
err := d.ensureDirExists(d.blobDir)
if err != nil {
return err
}
d.initialized = true
return nil
}
// Has returns T/F or Error if it the blob stored already. It will error with any IO disk error. // Has returns T/F or Error if it the blob stored already. It will error with any IO disk error.
func (d *DiskBlobStore) Has(hash string) (bool, error) { func (d *DiskStore) Has(hash string) (bool, error) {
err := d.initOnce() err := d.initOnce()
if err != nil { if err != nil {
return false, err return false, err
@ -90,13 +46,13 @@ func (d *DiskBlobStore) Has(hash string) (bool, error) {
if os.IsNotExist(err) { if os.IsNotExist(err) {
return false, nil return false, nil
} }
return false, err return false, errors.Err(err)
} }
return true, nil return true, nil
} }
// Get returns the blob or an error if the blob doesn't exist. // Get returns the blob or an error if the blob doesn't exist.
func (d *DiskBlobStore) Get(hash string) (stream.Blob, error) { func (d *DiskStore) Get(hash string) (stream.Blob, error) {
err := d.initOnce() err := d.initOnce()
if err != nil { if err != nil {
return nil, err return nil, err
@ -111,11 +67,12 @@ func (d *DiskBlobStore) Get(hash string) (stream.Blob, error) {
} }
defer file.Close() defer file.Close()
return ioutil.ReadAll(file) blob, err := ioutil.ReadAll(file)
return blob, errors.Err(err)
} }
// Put stores the blob on disk // Put stores the blob on disk
func (d *DiskBlobStore) Put(hash string, blob stream.Blob) error { func (d *DiskStore) Put(hash string, blob stream.Blob) error {
err := d.initOnce() err := d.initOnce()
if err != nil { if err != nil {
return err return err
@ -126,16 +83,17 @@ func (d *DiskBlobStore) Put(hash string, blob stream.Blob) error {
return err return err
} }
return ioutil.WriteFile(d.path(hash), blob, 0644) err = ioutil.WriteFile(d.path(hash), blob, 0644)
return errors.Err(err)
} }
// PutSD stores the sd blob on the disk // PutSD stores the sd blob on the disk
func (d *DiskBlobStore) PutSD(hash string, blob stream.Blob) error { func (d *DiskStore) PutSD(hash string, blob stream.Blob) error {
return d.Put(hash, blob) return d.Put(hash, blob)
} }
// Delete deletes the blob from the store // Delete deletes the blob from the store
func (d *DiskBlobStore) Delete(hash string) error { func (d *DiskStore) Delete(hash string) error {
err := d.initOnce() err := d.initOnce()
if err != nil { if err != nil {
return err return err
@ -149,75 +107,45 @@ func (d *DiskBlobStore) Delete(hash string) error {
return nil return nil
} }
return os.Remove(d.path(hash)) err = os.Remove(d.path(hash))
return errors.Err(err)
} }
func (d *DiskBlobStore) ensureDiskSpace() { // list returns the hashes of blobs that already exist in the blobDir
defer func() { func (d *DiskStore) list() ([]string, error) {
d.lastChecked = time.Now() err := d.initOnce()
d.diskCleanupBusy <- true
}()
used, err := d.getUsedSpace()
if err != nil { if err != nil {
log.Errorln(err.Error()) return nil, err
return
}
log.Infof("disk usage: %.2f%%\n", used*100)
if used > 0.90 {
log.Infoln("over 0.90, cleaning up")
err = d.WipeOldestBlobs()
if err != nil {
log.Errorln(err.Error())
return
}
log.Infoln("Done cleaning up")
} }
return speedwalk.AllFiles(d.blobDir, true)
} }
func (d *DiskBlobStore) WipeOldestBlobs() (err error) { func (d *DiskStore) dir(hash string) string {
dirs, err := ioutil.ReadDir(d.blobDir) if d.prefixLength <= 0 || len(hash) < d.prefixLength {
return d.blobDir
}
return path.Join(d.blobDir, hash[:d.prefixLength])
}
func (d *DiskStore) path(hash string) string {
return path.Join(d.dir(hash), hash)
}
func (d *DiskStore) ensureDirExists(dir string) error {
return errors.Err(os.MkdirAll(dir, 0755))
}
func (d *DiskStore) initOnce() error {
if d.initialized {
return nil
}
err := d.ensureDirExists(d.blobDir)
if err != nil { if err != nil {
return err return err
} }
type datedFile struct {
Atime time.Time
File *os.FileInfo
FullPath string
}
datedFiles := make([]datedFile, 0, 5000)
for _, dir := range dirs {
if dir.IsDir() {
files, err := ioutil.ReadDir(filepath.Join(d.blobDir, dir.Name()))
if err != nil {
return err
}
for _, file := range files {
if file.Mode().IsRegular() && !file.IsDir() {
datedFiles = append(datedFiles, datedFile{
Atime: atime(file),
File: &file,
FullPath: filepath.Join(d.blobDir, dir.Name(), file.Name()),
})
}
}
}
}
sort.Slice(datedFiles, func(i, j int) bool { d.initialized = true
return datedFiles[i].Atime.Before(datedFiles[j].Atime)
})
//delete the first 50000 blobs
for i, df := range datedFiles {
if i >= 50000 {
break
}
log.Infoln(df.FullPath)
log.Infoln(df.Atime.String())
err = os.Remove(df.FullPath)
if err != nil {
return err
}
}
return nil return nil
} }

120
store/lru.go Normal file
View file

@ -0,0 +1,120 @@
package store
import (
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/stream"
"github.com/lbryio/reflector.go/internal/metrics"
golru "github.com/hashicorp/golang-lru"
anbsky commented 2020-10-23 07:26:17 +02:00 (Migrated from github.com)
Review

We're using a different cache lib in @nikooo777 's proposed cache in lbrytv-player. Is either of them better than the other? Should probably keep it simple and stick to one.

We're using [a different cache lib](https://github.com/karlseguin/ccache) in @nikooo777 's [proposed cache](https://github.com/lbryio/lbrytv-player/pull/25/files) in lbrytv-player. Is either of them better than the other? Should probably keep it simple and stick to one.
lyoshenka commented 2020-10-23 17:34:30 +02:00 (Migrated from github.com)
Review

Yep. We're actually planning to merge reflector and player into a single program/process so they don't have to send blobs back and forth via TCP.

Yep. We're actually planning to merge reflector and player into a single program/process so they don't have to send blobs back and forth via TCP.
)
// LRUStore adds a max cache size and LRU eviction to a BlobStore
type LRUStore struct {
// underlying store
store BlobStore
// lru implementation
lru *golru.Cache
}
// NewLRUStore initialize a new LRUStore
func NewLRUStore(component string, store BlobStore, maxItems int) *LRUStore {
lru, err := golru.NewWithEvict(maxItems, func(key interface{}, value interface{}) {
metrics.CacheLRUEvictCount.With(metrics.CacheLabels(store.Name(), component)).Inc()
_ = store.Delete(key.(string)) // TODO: log this error. may happen if underlying entry is gone but cache entry still there
})
if err != nil {
panic(err)
}
l := &LRUStore{
store: store,
lru: lru,
}
if lstr, ok := store.(lister); ok {
err = l.loadExisting(lstr, maxItems)
if err != nil {
panic(err) // TODO: what should happen here? panic? return nil? just keep going?
}
}
return l
}
const nameLRU = "lru"
// Name is the cache type name
func (l *LRUStore) Name() string { return nameLRU }
// Has returns whether the blob is in the store, without updating the recent-ness.
func (l *LRUStore) Has(hash string) (bool, error) {
return l.lru.Contains(hash), nil
}
// Get returns the blob or an error if the blob doesn't exist.
func (l *LRUStore) Get(hash string) (stream.Blob, error) {
_, has := l.lru.Get(hash)
if !has {
return nil, errors.Err(ErrBlobNotFound)
}
blob, err := l.store.Get(hash)
if errors.Is(err, ErrBlobNotFound) {
// Blob disappeared from underlying store
l.lru.Remove(hash)
}
return blob, err
}
// Put stores the blob
func (l *LRUStore) Put(hash string, blob stream.Blob) error {
err := l.store.Put(hash, blob)
if err != nil {
return err
}
l.lru.Add(hash, true)
return nil
}
// PutSD stores the sd blob
func (l *LRUStore) PutSD(hash string, blob stream.Blob) error {
err := l.store.PutSD(hash, blob)
if err != nil {
return err
}
l.lru.Add(hash, true)
return nil
}
// Delete deletes the blob from the store
func (l *LRUStore) Delete(hash string) error {
err := l.store.Delete(hash)
if err != nil {
return err
}
// This must come after store.Delete()
// Remove triggers onEvict function, which also tries to delete blob from store
// We need to delete it manually first so any errors can be propagated up
l.lru.Remove(hash)
return nil
}
// loadExisting imports existing blobs from the underlying store into the LRU cache
func (l *LRUStore) loadExisting(store lister, maxItems int) error {
existing, err := store.list()
if err != nil {
return err
}
added := 0
for _, h := range existing {
l.lru.Add(h, true)
added++
if maxItems > 0 && added >= maxItems { // underlying cache is bigger than LRU cache
break
}
}
return nil
}

121
store/lru_test.go Normal file
View file

@ -0,0 +1,121 @@
package store
import (
"io/ioutil"
"os"
"reflect"
"testing"
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
anbsky commented 2020-10-23 07:33:13 +02:00 (Migrated from github.com)
Review

Given that test functions have to be named Test*, this looks like a typo. createTestLRUstore?

Given that test functions have to be named `Test*`, this looks like a typo. `createTestLRUstore`?
lyoshenka commented 2020-10-23 17:35:00 +02:00 (Migrated from github.com)
Review

agreed

agreed
const cacheMaxBlobs = 3
func getTestLRUStore() (*LRUStore, *MemStore) {
m := NewMemStore()
return NewLRUStore("test", m, 3), m
}
func TestLRUStore_Eviction(t *testing.T) {
lru, mem := getTestLRUStore()
b := []byte("x")
err := lru.Put("one", b)
require.NoError(t, err)
err = lru.Put("two", b)
require.NoError(t, err)
err = lru.Put("three", b)
require.NoError(t, err)
err = lru.Put("four", b)
require.NoError(t, err)
err = lru.Put("five", b)
require.NoError(t, err)
assert.Equal(t, cacheMaxBlobs, len(mem.Debug()))
for k, v := range map[string]bool{
"one": false,
"two": false,
"three": true,
"four": true,
"five": true,
"six": false,
} {
has, err := lru.Has(k)
assert.NoError(t, err)
assert.Equal(t, v, has)
}
lru.Get("three") // touch so it stays in cache
lru.Put("six", b)
assert.Equal(t, cacheMaxBlobs, len(mem.Debug()))
for k, v := range map[string]bool{
"one": false,
"two": false,
"three": true,
"four": false,
"five": true,
"six": true,
} {
has, err := lru.Has(k)
assert.NoError(t, err)
assert.Equal(t, v, has)
}
err = lru.Delete("three")
assert.NoError(t, err)
err = lru.Delete("five")
assert.NoError(t, err)
err = lru.Delete("six")
assert.NoError(t, err)
assert.Equal(t, 0, len(mem.Debug()))
}
func TestLRUStore_UnderlyingBlobMissing(t *testing.T) {
lru, mem := getTestLRUStore()
hash := "hash"
b := []byte("this is a blob of stuff")
err := lru.Put(hash, b)
require.NoError(t, err)
err = mem.Delete(hash)
require.NoError(t, err)
// hash still exists in lru
assert.True(t, lru.lru.Contains(hash))
blob, err := lru.Get(hash)
assert.Nil(t, blob)
assert.True(t, errors.Is(err, ErrBlobNotFound), "expected (%s) %s, got (%s) %s",
reflect.TypeOf(ErrBlobNotFound).String(), ErrBlobNotFound.Error(),
reflect.TypeOf(err).String(), err.Error())
// lru.Get() removes hash if underlying store doesn't have it
assert.False(t, lru.lru.Contains(hash))
}
func TestLRUStore_loadExisting(t *testing.T) {
tmpDir, err := ioutil.TempDir("", "reflector_test_*")
require.NoError(t, err)
defer os.RemoveAll(tmpDir)
d := NewDiskStore(tmpDir, 2)
hash := "hash"
b := []byte("this is a blob of stuff")
err = d.Put(hash, b)
require.NoError(t, err)
existing, err := d.list()
require.NoError(t, err)
require.Equal(t, 1, len(existing), "blob should exist in cache")
assert.Equal(t, hash, existing[0])
lru := NewLRUStore("test", d, 3) // lru should load existing blobs when it's created
has, err := lru.Has(hash)
require.NoError(t, err)
assert.True(t, has, "hash should be loaded from disk store but it's not")
}

View file

@ -1,29 +1,42 @@
package store package store
import ( import (
"sync"
"github.com/lbryio/lbry.go/v2/extras/errors" "github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/stream" "github.com/lbryio/lbry.go/v2/stream"
) )
// MemoryBlobStore is an in memory only blob store with no persistence. // MemStore is an in memory only blob store with no persistence.
type MemoryBlobStore struct { type MemStore struct {
blobs map[string]stream.Blob blobs map[string]stream.Blob
mu *sync.RWMutex
} }
func NewMemoryBlobStore() *MemoryBlobStore { func NewMemStore() *MemStore {
return &MemoryBlobStore{ return &MemStore{
blobs: make(map[string]stream.Blob), blobs: make(map[string]stream.Blob),
mu: &sync.RWMutex{},
} }
} }
const nameMem = "mem"
// Name is the cache type name
func (m *MemStore) Name() string { return nameMem }
// Has returns T/F if the blob is currently stored. It will never error. // Has returns T/F if the blob is currently stored. It will never error.
func (m *MemoryBlobStore) Has(hash string) (bool, error) { func (m *MemStore) Has(hash string) (bool, error) {
m.mu.RLock()
defer m.mu.RUnlock()
_, ok := m.blobs[hash] _, ok := m.blobs[hash]
return ok, nil return ok, nil
} }
// Get returns the blob byte slice if present and errors if the blob is not found. // Get returns the blob byte slice if present and errors if the blob is not found.
func (m *MemoryBlobStore) Get(hash string) (stream.Blob, error) { func (m *MemStore) Get(hash string) (stream.Blob, error) {
m.mu.RLock()
defer m.mu.RUnlock()
blob, ok := m.blobs[hash] blob, ok := m.blobs[hash]
if !ok { if !ok {
return nil, errors.Err(ErrBlobNotFound) return nil, errors.Err(ErrBlobNotFound)
@ -32,23 +45,29 @@ func (m *MemoryBlobStore) Get(hash string) (stream.Blob, error) {
} }
// Put stores the blob in memory // Put stores the blob in memory
func (m *MemoryBlobStore) Put(hash string, blob stream.Blob) error { func (m *MemStore) Put(hash string, blob stream.Blob) error {
m.mu.Lock()
defer m.mu.Unlock()
m.blobs[hash] = blob m.blobs[hash] = blob
return nil return nil
} }
// PutSD stores the sd blob in memory // PutSD stores the sd blob in memory
func (m *MemoryBlobStore) PutSD(hash string, blob stream.Blob) error { func (m *MemStore) PutSD(hash string, blob stream.Blob) error {
return m.Put(hash, blob) return m.Put(hash, blob)
} }
// Delete deletes the blob from the store // Delete deletes the blob from the store
func (m *MemoryBlobStore) Delete(hash string) error { func (m *MemStore) Delete(hash string) error {
m.mu.Lock()
defer m.mu.Unlock()
delete(m.blobs, hash) delete(m.blobs, hash)
return nil return nil
} }
// Debug returns the blobs in memory. It's useful for testing and debugging. // Debug returns the blobs in memory. It's useful for testing and debugging.
func (m *MemoryBlobStore) Debug() map[string]stream.Blob { func (m *MemStore) Debug() map[string]stream.Blob {
m.mu.RLock()
defer m.mu.RUnlock()
return m.blobs return m.blobs
} }

View file

@ -7,8 +7,8 @@ import (
"github.com/lbryio/lbry.go/v2/extras/errors" "github.com/lbryio/lbry.go/v2/extras/errors"
) )
func TestMemoryBlobStore_Put(t *testing.T) { func TestMemStore_Put(t *testing.T) {
s := NewMemoryBlobStore() s := NewMemStore()
blob := []byte("abcdefg") blob := []byte("abcdefg")
err := s.Put("abc", blob) err := s.Put("abc", blob)
if err != nil { if err != nil {
@ -16,8 +16,8 @@ func TestMemoryBlobStore_Put(t *testing.T) {
} }
} }
func TestMemoryBlobStore_Get(t *testing.T) { func TestMemStore_Get(t *testing.T) {
s := NewMemoryBlobStore() s := NewMemStore()
hash := "abc" hash := "abc"
blob := []byte("abcdefg") blob := []byte("abcdefg")
err := s.Put(hash, blob) err := s.Put(hash, blob)

15
store/noop.go Normal file
View file

@ -0,0 +1,15 @@
package store
import "github.com/lbryio/lbry.go/v2/stream"
// NoopStore is a store that does nothing
type NoopStore struct{}
const nameNoop = "noop"
func (n *NoopStore) Name() string { return nameNoop }
func (n *NoopStore) Has(_ string) (bool, error) { return false, nil }
func (n *NoopStore) Get(_ string) (stream.Blob, error) { return nil, nil }
func (n *NoopStore) Put(_ string, _ stream.Blob) error { return nil }
func (n *NoopStore) PutSD(_ string, _ stream.Blob) error { return nil }
func (n *NoopStore) Delete(_ string) error { return nil }

View file

@ -18,8 +18,8 @@ import (
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
// S3BlobStore is an S3 store // S3Store is an S3 store
type S3BlobStore struct { type S3Store struct {
awsID string awsID string
awsSecret string awsSecret string
region string region string
@ -28,9 +28,9 @@ type S3BlobStore struct {
session *session.Session session *session.Session
} }
// NewS3BlobStore returns an initialized S3 store pointer. // NewS3Store returns an initialized S3 store pointer.
func NewS3BlobStore(awsID, awsSecret, region, bucket string) *S3BlobStore { func NewS3Store(awsID, awsSecret, region, bucket string) *S3Store {
return &S3BlobStore{ return &S3Store{
awsID: awsID, awsID: awsID,
awsSecret: awsSecret, awsSecret: awsSecret,
region: region, region: region,
@ -38,25 +38,13 @@ func NewS3BlobStore(awsID, awsSecret, region, bucket string) *S3BlobStore {
} }
} }
func (s *S3BlobStore) initOnce() error { const nameS3 = "s3"
if s.session != nil {
return nil
}
sess, err := session.NewSession(&aws.Config{ // Name is the cache type name
Credentials: credentials.NewStaticCredentials(s.awsID, s.awsSecret, ""), func (s *S3Store) Name() string { return nameS3 }
Region: aws.String(s.region),
})
if err != nil {
return err
}
s.session = sess
return nil
}
// Has returns T/F or Error ( from S3 ) if the store contains the blob. // Has returns T/F or Error ( from S3 ) if the store contains the blob.
func (s *S3BlobStore) Has(hash string) (bool, error) { func (s *S3Store) Has(hash string) (bool, error) {
err := s.initOnce() err := s.initOnce()
if err != nil { if err != nil {
return false, err return false, err
@ -77,7 +65,7 @@ func (s *S3BlobStore) Has(hash string) (bool, error) {
} }
// Get returns the blob slice if present or errors on S3. // Get returns the blob slice if present or errors on S3.
func (s *S3BlobStore) Get(hash string) (stream.Blob, error) { func (s *S3Store) Get(hash string) (stream.Blob, error) {
//Todo-Need to handle error for blob doesn't exist for consistency. //Todo-Need to handle error for blob doesn't exist for consistency.
err := s.initOnce() err := s.initOnce()
if err != nil { if err != nil {
@ -110,7 +98,7 @@ func (s *S3BlobStore) Get(hash string) (stream.Blob, error) {
} }
// Put stores the blob on S3 or errors if S3 connection errors. // Put stores the blob on S3 or errors if S3 connection errors.
func (s *S3BlobStore) Put(hash string, blob stream.Blob) error { func (s *S3Store) Put(hash string, blob stream.Blob) error {
err := s.initOnce() err := s.initOnce()
if err != nil { if err != nil {
return err return err
@ -133,12 +121,12 @@ func (s *S3BlobStore) Put(hash string, blob stream.Blob) error {
} }
// PutSD stores the sd blob on S3 or errors if S3 connection errors. // PutSD stores the sd blob on S3 or errors if S3 connection errors.
func (s *S3BlobStore) PutSD(hash string, blob stream.Blob) error { func (s *S3Store) PutSD(hash string, blob stream.Blob) error {
//Todo - handle missing stream for consistency //Todo - handle missing stream for consistency
return s.Put(hash, blob) return s.Put(hash, blob)
} }
func (s *S3BlobStore) Delete(hash string) error { func (s *S3Store) Delete(hash string) error {
err := s.initOnce() err := s.initOnce()
if err != nil { if err != nil {
return err return err
@ -153,3 +141,20 @@ func (s *S3BlobStore) Delete(hash string) error {
return err return err
} }
func (s *S3Store) initOnce() error {
if s.session != nil {
return nil
}
sess, err := session.NewSession(&aws.Config{
Credentials: credentials.NewStaticCredentials(s.awsID, s.awsSecret, ""),
Region: aws.String(s.region),
})
if err != nil {
return err
}
s.session = sess
return nil
}

67
store/singleflight.go Normal file
View file

@ -0,0 +1,67 @@
package store
import (
"time"
"github.com/lbryio/reflector.go/internal/metrics"
"github.com/lbryio/lbry.go/v2/stream"
"golang.org/x/sync/singleflight"
)
func WithSingleFlight(component string, origin BlobStore) BlobStore {
return &singleflightStore{
BlobStore: origin,
component: component,
sf: new(singleflight.Group),
}
}
type singleflightStore struct {
BlobStore
component string
sf *singleflight.Group
}
func (s *singleflightStore) Name() string {
return "sf_" + s.BlobStore.Name()
}
// Get ensures that only one request per hash is sent to the origin at a time,
// thereby protecting against https://en.wikipedia.org/wiki/Thundering_herd_problem
func (s *singleflightStore) Get(hash string) (stream.Blob, error) {
metrics.CacheWaitingRequestsCount.With(metrics.CacheLabels(s.BlobStore.Name(), s.component)).Inc()
defer metrics.CacheWaitingRequestsCount.With(metrics.CacheLabels(s.BlobStore.Name(), s.component)).Dec()
blob, err, _ := s.sf.Do(hash, s.getter(hash))
if err != nil {
return nil, err
}
return blob.(stream.Blob), nil
}
// getter returns a function that gets a blob from the origin
// only one getter per hash will be executing at a time
func (s *singleflightStore) getter(hash string) func() (interface{}, error) {
return func() (interface{}, error) {
metrics.CacheOriginRequestsCount.With(metrics.CacheLabels(s.BlobStore.Name(), s.component)).Inc()
defer metrics.CacheOriginRequestsCount.With(metrics.CacheLabels(s.BlobStore.Name(), s.component)).Dec()
start := time.Now()
blob, err := s.BlobStore.Get(hash)
if err != nil {
return nil, err
}
rate := float64(len(blob)) / 1024 / 1024 / time.Since(start).Seconds()
metrics.CacheRetrievalSpeed.With(map[string]string{
metrics.LabelCacheType: s.BlobStore.Name(),
metrics.LabelComponent: s.component,
metrics.LabelSource: "origin",
}).Set(rate)
return blob, nil
}
}

View file

@ -0,0 +1,89 @@
package speedwalk
import (
"io/ioutil"
"path/filepath"
"runtime"
"sync"
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/karrick/godirwalk"
"github.com/sirupsen/logrus"
)
// AllFiles recursively lists every file in every subdirectory of a given directory
// If basename is true, return the basename of each file. Otherwise return the full path starting at startDir.
func AllFiles(startDir string, basename bool) ([]string, error) {
items, err := ioutil.ReadDir(startDir)
if err != nil {
return nil, err
}
pathChan := make(chan string)
paths := make([]string, 0, 1000)
pathWG := &sync.WaitGroup{}
pathWG.Add(1)
go func() {
defer pathWG.Done()
for {
path, ok := <-pathChan
if !ok {
return
}
paths = append(paths, path)
}
}()
maxThreads := runtime.NumCPU() - 1
goroutineLimiter := make(chan struct{}, maxThreads)
for i := 0; i < maxThreads; i++ {
goroutineLimiter <- struct{}{}
}
walkerWG := &sync.WaitGroup{}
for _, item := range items {
if !item.IsDir() {
if basename {
pathChan <- item.Name()
} else {
pathChan <- filepath.Join(startDir, item.Name())
}
continue
}
<-goroutineLimiter
walkerWG.Add(1)
go func(dir string) {
defer func() {
walkerWG.Done()
goroutineLimiter <- struct{}{}
}()
err = godirwalk.Walk(filepath.Join(startDir, dir), &godirwalk.Options{
Unsorted: true, // faster this way
Callback: func(osPathname string, de *godirwalk.Dirent) error {
if de.IsRegular() {
if basename {
pathChan <- de.Name()
} else {
pathChan <- osPathname
}
}
return nil
},
})
if err != nil {
logrus.Errorf(errors.FullTrace(err))
}
}(item.Name())
}
walkerWG.Wait()
close(pathChan)
pathWG.Wait()
return paths, nil
}

View file

@ -7,15 +7,17 @@ import (
// BlobStore is an interface for handling blob storage. // BlobStore is an interface for handling blob storage.
type BlobStore interface { type BlobStore interface {
// Does blob exist in the store // Name of blob store (useful for metrics)
Name() string
// Does blob exist in the store.
Has(hash string) (bool, error) Has(hash string) (bool, error)
// Get the blob from the store // Get the blob from the store. Must return ErrBlobNotFound if blob is not in store.
Get(hash string) (stream.Blob, error) Get(hash string) (stream.Blob, error)
// Put the blob into the store // Put the blob into the store.
Put(hash string, blob stream.Blob) error Put(hash string, blob stream.Blob) error
// Put an SD blob into the store // Put an SD blob into the store.
PutSD(hash string, blob stream.Blob) error PutSD(hash string, blob stream.Blob) error
// Delete the blob from the store // Delete the blob from the store.
Delete(hash string) error Delete(hash string) error
} }
@ -27,5 +29,11 @@ type Blocklister interface {
Wants(hash string) (bool, error) Wants(hash string) (bool, error)
} }
// lister is a store that can list cached blobs. This is helpful when an overlay
// cache needs to track blob existence.
type lister interface {
list() ([]string, error)
}
//ErrBlobNotFound is a standard error when a blob is not found in the store. //ErrBlobNotFound is a standard error when a blob is not found in the store.
var ErrBlobNotFound = errors.Base("blob not found") var ErrBlobNotFound = errors.Base("blob not found")