Smarter caches #46
30 changed files with 957 additions and 444 deletions
|
@ -28,9 +28,10 @@ func getStreamCmd(cmd *cobra.Command, args []string) {
|
|||
addr := args[0]
|
||||
sdHash := args[1]
|
||||
|
||||
s := store.NewCachingBlobStore(
|
||||
s := store.NewCachingStore(
|
||||
"getstream",
|
||||
peer.NewStore(peer.StoreOpts{Address: addr}),
|
||||
store.NewDiskBlobStore("/tmp/lbry_downloaded_blobs", 2),
|
||||
store.NewDiskStore("/tmp/lbry_downloaded_blobs", 2),
|
||||
)
|
||||
|
||||
wd, err := os.Getwd()
|
||||
|
|
|
@ -29,7 +29,7 @@ func init() {
|
|||
func peerCmd(cmd *cobra.Command, args []string) {
|
||||
var err error
|
||||
|
||||
s3 := store.NewS3BlobStore(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName)
|
||||
s3 := store.NewS3Store(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName)
|
||||
peerServer := peer.NewServer(s3)
|
||||
|
||||
if !peerNoDB {
|
||||
|
|
196
cmd/reflector.go
196
cmd/reflector.go
|
@ -4,6 +4,7 @@ import (
|
|||
"os"
|
||||
"os/signal"
|
||||
"strconv"
|
||||
"strings"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
|
@ -16,21 +17,25 @@ import (
|
|||
"github.com/lbryio/reflector.go/store"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/spf13/cast"
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
|
||||
var reflectorCmdCacheDir string
|
||||
var tcpPeerPort int
|
||||
var http3PeerPort int
|
||||
var receiverPort int
|
||||
var metricsPort int
|
||||
var disableUploads bool
|
||||
var disableBlocklist bool
|
||||
var proxyAddress string
|
||||
var proxyPort string
|
||||
var proxyProtocol string
|
||||
var useDB bool
|
||||
var cloudFrontEndpoint string
|
||||
var (
|
||||
tcpPeerPort int
|
||||
http3PeerPort int
|
||||
receiverPort int
|
||||
metricsPort int
|
||||
disableUploads bool
|
||||
disableBlocklist bool
|
||||
proxyAddress string
|
||||
proxyPort string
|
||||
proxyProtocol string
|
||||
useDB bool
|
||||
cloudFrontEndpoint string
|
||||
reflectorCmdDiskCache string
|
||||
reflectorCmdMemCache int
|
||||
)
|
||||
|
||||
func init() {
|
||||
var cmd = &cobra.Command{
|
||||
|
@ -38,7 +43,6 @@ func init() {
|
|||
Short: "Run reflector server",
|
||||
Run: reflectorCmd,
|
||||
}
|
||||
cmd.Flags().StringVar(&reflectorCmdCacheDir, "cache", "", "if specified, the path where blobs should be cached (disabled when left empty)")
|
||||
cmd.Flags().StringVar(&proxyAddress, "proxy-address", "", "address of another reflector server where blobs are fetched from")
|
||||
cmd.Flags().StringVar(&proxyPort, "proxy-port", "5567", "port of another reflector server where blobs are fetched from")
|
||||
cmd.Flags().StringVar(&proxyProtocol, "proxy-protocol", "http3", "protocol used to fetch blobs from another reflector server (tcp/http3)")
|
||||
|
@ -50,94 +54,142 @@ func init() {
|
|||
cmd.Flags().BoolVar(&disableUploads, "disable-uploads", false, "Disable uploads to this reflector server")
|
||||
cmd.Flags().BoolVar(&disableBlocklist, "disable-blocklist", false, "Disable blocklist watching/updating")
|
||||
cmd.Flags().BoolVar(&useDB, "use-db", true, "whether to connect to the reflector db or not")
|
||||
cmd.Flags().StringVar(&reflectorCmdDiskCache, "disk-cache", "",
|
||||
"enable disk cache, setting max size and path where to store blobs. format is 'MAX_BLOBS:CACHE_PATH'")
|
||||
cmd.Flags().IntVar(&reflectorCmdMemCache, "mem-cache", 0, "enable in-memory cache with a max size of this many blobs")
|
||||
rootCmd.AddCommand(cmd)
|
||||
}
|
||||
|
||||
func reflectorCmd(cmd *cobra.Command, args []string) {
|
||||
log.Printf("reflector %s", meta.VersionString())
|
||||
|
||||
var blobStore store.BlobStore
|
||||
if proxyAddress != "" {
|
||||
switch proxyProtocol {
|
||||
case "tcp":
|
||||
blobStore = peer.NewStore(peer.StoreOpts{
|
||||
Address: proxyAddress + ":" + proxyPort,
|
||||
Timeout: 30 * time.Second,
|
||||
})
|
||||
case "http3":
|
||||
blobStore = http3.NewStore(http3.StoreOpts{
|
||||
Address: proxyAddress + ":" + proxyPort,
|
||||
Timeout: 30 * time.Second,
|
||||
})
|
||||
default:
|
||||
log.Fatalf("specified protocol is not recognized: %s", proxyProtocol)
|
||||
}
|
||||
} else {
|
||||
s3Store := store.NewS3BlobStore(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName)
|
||||
if cloudFrontEndpoint != "" {
|
||||
blobStore = store.NewCloudFrontBlobStore(cloudFrontEndpoint, s3Store)
|
||||
} else {
|
||||
blobStore = s3Store
|
||||
}
|
||||
}
|
||||
// the blocklist logic requires the db backed store to be the outer-most store
|
||||
underlyingStore := setupStore()
|
||||
outerStore := wrapWithCache(underlyingStore)
|
||||
|
||||
var err error
|
||||
var reflectorServer *reflector.Server
|
||||
|
||||
if useDB {
|
||||
db := new(db.SQL)
|
||||
db.TrackAccessTime = true
|
||||
err = db.Connect(globalConfig.DBConn)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
blobStore = store.NewDBBackedStore(blobStore, db)
|
||||
|
||||
//this shouldn't go here but the blocklist logic requires the db backed store to be the outer-most store for it to work....
|
||||
//having this here prevents uploaded blobs from being stored in the disk cache
|
||||
if !disableUploads {
|
||||
reflectorServer = reflector.NewServer(blobStore)
|
||||
reflectorServer := reflector.NewServer(underlyingStore)
|
||||
reflectorServer.Timeout = 3 * time.Minute
|
||||
reflectorServer.EnableBlocklist = !disableBlocklist
|
||||
|
||||
err = reflectorServer.Start(":" + strconv.Itoa(receiverPort))
|
||||
err := reflectorServer.Start(":" + strconv.Itoa(receiverPort))
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
}
|
||||
defer reflectorServer.Shutdown()
|
||||
}
|
||||
|
||||
if reflectorCmdCacheDir != "" {
|
||||
err = os.MkdirAll(reflectorCmdCacheDir, os.ModePerm)
|
||||
peerServer := peer.NewServer(outerStore)
|
||||
err := peerServer.Start(":" + strconv.Itoa(tcpPeerPort))
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
blobStore = store.NewCachingBlobStore(blobStore, store.NewDiskBlobStore(reflectorCmdCacheDir, 2))
|
||||
}
|
||||
defer peerServer.Shutdown()
|
||||
|
||||
peerServer := peer.NewServer(blobStore)
|
||||
err = peerServer.Start(":" + strconv.Itoa(tcpPeerPort))
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
http3PeerServer := http3.NewServer(blobStore)
|
||||
http3PeerServer := http3.NewServer(outerStore)
|
||||
err = http3PeerServer.Start(":" + strconv.Itoa(http3PeerPort))
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
defer http3PeerServer.Shutdown()
|
||||
|
||||
metricsServer := metrics.NewServer(":"+strconv.Itoa(metricsPort), "/metrics")
|
||||
metricsServer.Start()
|
||||
defer metricsServer.Shutdown()
|
||||
|
||||
interruptChan := make(chan os.Signal, 1)
|
||||
signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM)
|
||||
<-interruptChan
|
||||
metricsServer.Shutdown()
|
||||
peerServer.Shutdown()
|
||||
http3PeerServer.Shutdown()
|
||||
if reflectorServer != nil {
|
||||
reflectorServer.Shutdown()
|
||||
}
|
||||
// deferred shutdowns happen now
|
||||
}
|
||||
|
||||
func setupStore() store.BlobStore {
|
||||
var s store.BlobStore
|
||||
|
||||
if proxyAddress != "" {
|
||||
switch proxyProtocol {
|
||||
case "tcp":
|
||||
s = peer.NewStore(peer.StoreOpts{
|
||||
Address: proxyAddress + ":" + proxyPort,
|
||||
Timeout: 30 * time.Second,
|
||||
})
|
||||
case "http3":
|
||||
s = http3.NewStore(http3.StoreOpts{
|
||||
Address: proxyAddress + ":" + proxyPort,
|
||||
Timeout: 30 * time.Second,
|
||||
})
|
||||
default:
|
||||
log.Fatalf("protocol is not recognized: %s", proxyProtocol)
|
||||
}
|
||||
} else {
|
||||
s3Store := store.NewS3Store(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName)
|
||||
if cloudFrontEndpoint != "" {
|
||||
s = store.NewCloudFrontRWStore(store.NewCloudFrontROStore(cloudFrontEndpoint), s3Store)
|
||||
} else {
|
||||
s = s3Store
|
||||
}
|
||||
}
|
||||
|
||||
if useDB {
|
||||
db := new(db.SQL)
|
||||
db.TrackAccessTime = true
|
||||
err := db.Connect(globalConfig.DBConn)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
s = store.NewDBBackedStore(s, db)
|
||||
}
|
||||
|
||||
return s
|
||||
}
|
||||
|
||||
func wrapWithCache(s store.BlobStore) store.BlobStore {
|
||||
wrapped := s
|
||||
|
||||
diskCacheMaxSize, diskCachePath := diskCacheParams()
|
||||
if diskCacheMaxSize > 0 {
|
||||
err := os.MkdirAll(diskCachePath, os.ModePerm)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
wrapped = store.NewCachingStore(
|
||||
"reflector",
|
||||
wrapped,
|
||||
store.NewLRUStore("peer_server", store.NewDiskStore(diskCachePath, 2), diskCacheMaxSize),
|
||||
)
|
||||
}
|
||||
|
||||
if reflectorCmdMemCache > 0 {
|
||||
wrapped = store.NewCachingStore(
|
||||
"reflector",
|
||||
wrapped,
|
||||
store.NewLRUStore("peer_server", store.NewMemStore(), reflectorCmdMemCache),
|
||||
)
|
||||
}
|
||||
|
||||
return wrapped
|
||||
}
|
||||
|
||||
func diskCacheParams() (int, string) {
|
||||
if reflectorCmdDiskCache == "" {
|
||||
return 0, ""
|
||||
}
|
||||
|
||||
parts := strings.Split(reflectorCmdDiskCache, ":")
|
||||
if len(parts) != 2 {
|
||||
log.Fatalf("--disk-cache must be a number, followed by ':', followed by a string")
|
||||
}
|
||||
|
||||
maxSize := cast.ToInt(parts[0])
|
||||
if maxSize <= 0 {
|
||||
log.Fatalf("--disk-cache max size must be more than 0")
|
||||
}
|
||||
|
||||
path := parts[1]
|
||||
if len(path) == 0 || path[0] != '/' {
|
||||
log.Fatalf("--disk-cache path must start with '/'")
|
||||
}
|
||||
|
||||
return maxSize, path
|
||||
}
|
||||
|
|
|
@ -55,7 +55,7 @@ func startCmd(cmd *cobra.Command, args []string) {
|
|||
db := new(db.SQL)
|
||||
err := db.Connect(globalConfig.DBConn)
|
||||
checkErr(err)
|
||||
s3 := store.NewS3BlobStore(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName)
|
||||
s3 := store.NewS3Store(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName)
|
||||
comboStore := store.NewDBBackedStore(s3, db)
|
||||
|
||||
conf := prism.DefaultConf()
|
||||
|
|
|
@ -29,7 +29,7 @@ func init() {
|
|||
func testCmd(cmd *cobra.Command, args []string) {
|
||||
log.Printf("reflector %s", meta.VersionString())
|
||||
|
||||
memStore := store.NewMemoryBlobStore()
|
||||
memStore := store.NewMemStore()
|
||||
|
||||
reflectorServer := reflector.NewServer(memStore)
|
||||
reflectorServer.Timeout = 3 * time.Minute
|
||||
|
|
|
@ -35,7 +35,7 @@ func uploadCmd(cmd *cobra.Command, args []string) {
|
|||
checkErr(err)
|
||||
|
||||
st := store.NewDBBackedStore(
|
||||
store.NewS3BlobStore(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName),
|
||||
store.NewS3Store(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName),
|
||||
db)
|
||||
|
||||
uploader := reflector.NewUploader(db, st, uploadWorkers, uploadSkipExistsCheck, uploadDeleteBlobsAfterUpload)
|
||||
|
|
6
go.mod
6
go.mod
|
@ -14,11 +14,12 @@ require (
|
|||
github.com/google/gops v0.3.7
|
||||
github.com/gorilla/mux v1.7.4
|
||||
github.com/hashicorp/go-msgpack v0.5.5 // indirect
|
||||
github.com/hashicorp/golang-lru v0.5.3 // indirect
|
||||
github.com/hashicorp/golang-lru v0.5.4
|
||||
github.com/hashicorp/memberlist v0.1.4 // indirect
|
||||
github.com/hashicorp/serf v0.8.2
|
||||
github.com/inconshreveable/go-update v0.0.0-20160112193335-8152e7eb6ccf
|
||||
github.com/johntdyer/slackrus v0.0.0-20180518184837-f7aae3243a07
|
||||
github.com/karrick/godirwalk v1.16.1
|
||||
github.com/lbryio/chainquery v1.9.0
|
||||
github.com/lbryio/lbry.go v1.1.2 // indirect
|
||||
github.com/lbryio/lbry.go/v2 v2.6.1-0.20200901175808-73382bb02128
|
||||
|
@ -27,9 +28,11 @@ require (
|
|||
github.com/phayes/freeport v0.0.0-20171002185219-e27662a4a9d6
|
||||
github.com/prometheus/client_golang v0.9.2
|
||||
github.com/sirupsen/logrus v1.4.2
|
||||
github.com/spf13/afero v1.4.1
|
||||
github.com/spf13/cast v1.3.0
|
||||
github.com/spf13/cobra v0.0.3
|
||||
github.com/spf13/pflag v1.0.3 // indirect
|
||||
github.com/stretchr/testify v1.4.0
|
||||
github.com/volatiletech/null v8.0.0+incompatible
|
||||
go.uber.org/atomic v1.5.1
|
||||
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9
|
||||
|
@ -37,6 +40,7 @@ require (
|
|||
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e
|
||||
golang.org/x/tools v0.0.0-20191227053925-7b8e75db28f4 // indirect
|
||||
google.golang.org/appengine v1.6.2 // indirect
|
||||
gotest.tools v2.2.0+incompatible
|
||||
)
|
||||
|
||||
go 1.15
|
||||
|
|
11
go.sum
11
go.sum
|
@ -152,6 +152,8 @@ github.com/hashicorp/go.net v0.0.1/go.mod h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA
|
|||
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
|
||||
github.com/hashicorp/golang-lru v0.5.3 h1:YPkqC67at8FYaadspW/6uE0COsBxS2656RLEr8Bppgk=
|
||||
github.com/hashicorp/golang-lru v0.5.3/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
|
||||
github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc=
|
||||
github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
|
||||
github.com/hashicorp/hcl v0.0.0-20180404174102-ef8a98b0bbce h1:xdsDDbiBDQTKASoGEZ+pEmF1OnWuu8AQ9I8iNbHNeno=
|
||||
github.com/hashicorp/hcl v0.0.0-20180404174102-ef8a98b0bbce/go.mod h1:oZtUIOe8dh44I2q6ScRibXws4Ajl+d+nod3AaR9vL5w=
|
||||
github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64=
|
||||
|
@ -186,12 +188,15 @@ github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7
|
|||
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
|
||||
github.com/kardianos/osext v0.0.0-20170510131534-ae77be60afb1 h1:PJPDf8OUfOK1bb/NeTKd4f1QXZItOX389VN3B6qC8ro=
|
||||
github.com/kardianos/osext v0.0.0-20170510131534-ae77be60afb1/go.mod h1:1NbS8ALrpOvjt0rHPNLyCIeMtbizbir8U//inJ+zuB8=
|
||||
github.com/karrick/godirwalk v1.16.1 h1:DynhcF+bztK8gooS0+NDJFrdNZjJ3gzVzC545UNA9iw=
|
||||
github.com/karrick/godirwalk v1.16.1/go.mod h1:j4mkqPuvaLI8mp1DroR3P6ad7cyYd4c1qeJ3RV7ULlk=
|
||||
github.com/keybase/go-ps v0.0.0-20161005175911-668c8856d999/go.mod h1:hY+WOq6m2FpbvyrI93sMaypsttvaIL5nhVR92dTMUcQ=
|
||||
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
|
||||
github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6CZQHDETBtE9HaSEkGmuNXF86RwHhHUvq4=
|
||||
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
|
||||
github.com/konsorten/go-windows-terminal-sequences v1.0.2 h1:DB17ag19krx9CFsz4o3enTrPXyIXCl+2iCXH/aMAp9s=
|
||||
github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
|
||||
github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg=
|
||||
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
|
||||
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
|
||||
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
|
||||
|
@ -280,6 +285,7 @@ github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
|
|||
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
|
||||
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
||||
github.com/pkg/profile v1.3.0/go.mod h1:hJw3o1OdXxsrSjjVksARp5W95eeEaEfptyVZyv6JUPA=
|
||||
github.com/pkg/sftp v1.10.1/go.mod h1:lYOWFsE0bwd1+KfKJaKeuokY15vzFx25BLbzYYoAxZI=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI=
|
||||
|
@ -346,6 +352,8 @@ github.com/sourcegraph/annotate v0.0.0-20160123013949-f4cad6c6324d/go.mod h1:Udh
|
|||
github.com/sourcegraph/syntaxhighlight v0.0.0-20170531221838-bd320f5d308e/go.mod h1:HuIsMU8RRBOtsCgI77wP899iHVBQpCmg4ErYMZB+2IA=
|
||||
github.com/spf13/afero v1.1.1 h1:Lt3ihYMlE+lreX1GS4Qw4ZsNpYQLxIXKBTEOXm3nt6I=
|
||||
github.com/spf13/afero v1.1.1/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ=
|
||||
github.com/spf13/afero v1.4.1 h1:asw9sl74539yqavKaglDM5hFpdJVK0Y5Dr/JOgQ89nQ=
|
||||
github.com/spf13/afero v1.4.1/go.mod h1:Ai8FlHk4v/PARR026UzYexafAt9roJ7LcLMAmO6Z93I=
|
||||
github.com/spf13/cast v1.2.0/go.mod h1:r2rcYCSwa1IExKTDiTfzaxqT2FNHs8hODu4LnUfgKEg=
|
||||
github.com/spf13/cast v1.3.0 h1:oget//CVOEoFewqQxwr0Ej5yjygnqGkvggSE/gB35Q8=
|
||||
github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
|
||||
|
@ -394,6 +402,7 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk
|
|||
golang.org/x/crypto v0.0.0-20190313024323-a1f597ede03a/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
||||
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
|
||||
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
|
||||
golang.org/x/crypto v0.0.0-20190820162420-60c769a6c586/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
|
||||
golang.org/x/crypto v0.0.0-20191002192127-34f69633bfdc/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
|
||||
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
|
||||
golang.org/x/crypto v0.0.0-20200221231518-2aa609cf4a9d/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
|
||||
|
@ -471,6 +480,8 @@ golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
|||
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
|
||||
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
|
||||
golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k=
|
||||
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/time v0.0.0-20190921001708-c4c64cad1fd0 h1:xQwXv67TxFo9nC1GJFyab5eq/5B590r6RlnL/G8Sz7w=
|
||||
|
|
|
@ -12,6 +12,7 @@ import (
|
|||
|
||||
ee "github.com/lbryio/lbry.go/v2/extras/errors"
|
||||
"github.com/lbryio/lbry.go/v2/extras/stop"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/promauto"
|
||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||
|
@ -58,6 +59,7 @@ func (s *Server) Shutdown() {
|
|||
|
||||
const (
|
||||
ns = "reflector"
|
||||
subsystemCache = "cache"
|
||||
|
||||
labelDirection = "direction"
|
||||
labelErrorType = "error_type"
|
||||
|
@ -65,7 +67,9 @@ const (
|
|||
DirectionUpload = "upload" // to reflector
|
||||
DirectionDownload = "download" // from reflector
|
||||
|
||||
MtrLabelSource = "source"
|
||||
LabelCacheType = "cache_type"
|
||||
LabelComponent = "component"
|
||||
LabelSource = "source"
|
||||
|
||||
errConnReset = "conn_reset"
|
||||
errReadConnReset = "read_conn_reset"
|
||||
|
@ -92,6 +96,12 @@ const (
|
|||
)
|
||||
|
||||
var (
|
||||
ErrorCount = promauto.NewCounterVec(prometheus.CounterOpts{
|
||||
Namespace: ns,
|
||||
Name: "error_total",
|
||||
Help: "Total number of errors",
|
||||
}, []string{labelDirection, labelErrorType})
|
||||
|
||||
BlobDownloadCount = promauto.NewCounter(prometheus.CounterOpts{
|
||||
Namespace: ns,
|
||||
Name: "blob_download_total",
|
||||
|
@ -107,27 +117,44 @@ var (
|
|||
Name: "http3_blob_download_total",
|
||||
Help: "Total number of blobs downloaded from reflector through QUIC protocol",
|
||||
})
|
||||
CacheHitCount = promauto.NewCounter(prometheus.CounterOpts{
|
||||
|
||||
CacheHitCount = promauto.NewCounterVec(prometheus.CounterOpts{
|
||||
Namespace: ns,
|
||||
Name: "cache_hit_total",
|
||||
Subsystem: subsystemCache,
|
||||
Name: "hit_total",
|
||||
Help: "Total number of blobs retrieved from the cache storage",
|
||||
})
|
||||
CacheMissCount = promauto.NewCounter(prometheus.CounterOpts{
|
||||
}, []string{LabelCacheType, LabelComponent})
|
||||
CacheMissCount = promauto.NewCounterVec(prometheus.CounterOpts{
|
||||
Namespace: ns,
|
||||
Name: "cache_miss_total",
|
||||
Subsystem: subsystemCache,
|
||||
Name: "miss_total",
|
||||
Help: "Total number of blobs retrieved from origin rather than cache storage",
|
||||
})
|
||||
CacheOriginRequestsCount = promauto.NewGauge(prometheus.GaugeOpts{
|
||||
}, []string{LabelCacheType, LabelComponent})
|
||||
CacheOriginRequestsCount = promauto.NewGaugeVec(prometheus.GaugeOpts{
|
||||
Namespace: ns,
|
||||
Name: "cache_origin_requests_total",
|
||||
Subsystem: subsystemCache,
|
||||
Name: "origin_requests_total",
|
||||
Help: "How many Get requests are in flight from the cache to the origin",
|
||||
})
|
||||
}, []string{LabelCacheType, LabelComponent})
|
||||
// during thundering-herd situations, the metric below should be a lot smaller than the metric above
|
||||
CacheWaitingRequestsCount = promauto.NewGauge(prometheus.GaugeOpts{
|
||||
CacheWaitingRequestsCount = promauto.NewGaugeVec(prometheus.GaugeOpts{
|
||||
Namespace: ns,
|
||||
Name: "cache_waiting_requests_total",
|
||||
Subsystem: subsystemCache,
|
||||
Name: "waiting_requests_total",
|
||||
Help: "How many cache requests are waiting for an in-flight origin request",
|
||||
})
|
||||
}, []string{LabelCacheType, LabelComponent})
|
||||
CacheLRUEvictCount = promauto.NewCounterVec(prometheus.CounterOpts{
|
||||
Namespace: ns,
|
||||
Subsystem: subsystemCache,
|
||||
Name: "evict_total",
|
||||
Help: "Count of blobs evicted from cache",
|
||||
}, []string{LabelCacheType, LabelComponent})
|
||||
CacheRetrievalSpeed = promauto.NewGaugeVec(prometheus.GaugeOpts{
|
||||
Namespace: ns,
|
||||
Name: "speed_mbps",
|
||||
Help: "Speed of blob retrieval from cache or from origin",
|
||||
}, []string{LabelCacheType, LabelComponent, LabelSource})
|
||||
|
||||
BlobUploadCount = promauto.NewCounter(prometheus.CounterOpts{
|
||||
Namespace: ns,
|
||||
Name: "blob_upload_total",
|
||||
|
@ -138,16 +165,7 @@ var (
|
|||
Name: "sdblob_upload_total",
|
||||
Help: "Total number of SD blobs (and therefore streams) uploaded to reflector",
|
||||
})
|
||||
RetrieverSpeed = promauto.NewGaugeVec(prometheus.GaugeOpts{
|
||||
Namespace: ns,
|
||||
Name: "speed_mbps",
|
||||
Help: "Speed of blob retrieval",
|
||||
}, []string{MtrLabelSource})
|
||||
ErrorCount = promauto.NewCounterVec(prometheus.CounterOpts{
|
||||
Namespace: ns,
|
||||
Name: "error_total",
|
||||
Help: "Total number of errors",
|
||||
}, []string{labelDirection, labelErrorType})
|
||||
|
||||
MtrInBytesTcp = promauto.NewCounter(prometheus.CounterOpts{
|
||||
Namespace: ns,
|
||||
Name: "tcp_in_bytes",
|
||||
|
@ -185,6 +203,13 @@ var (
|
|||
})
|
||||
)
|
||||
|
||||
func CacheLabels(name, component string) prometheus.Labels {
|
||||
return prometheus.Labels{
|
||||
LabelCacheType: name,
|
||||
LabelComponent: component,
|
||||
}
|
||||
}
|
||||
|
||||
func TrackError(direction string, e error) (shouldLog bool) { // shouldLog is a hack, but whatever
|
||||
if e == nil {
|
||||
return
|
||||
|
|
|
@ -164,7 +164,7 @@ func generateTLSConfig() *tls.Config {
|
|||
|
||||
func (s *Server) listenAndServe(server *http3.Server) {
|
||||
err := server.ListenAndServe()
|
||||
if err != nil && !errors.Is(err, http.ErrServerClosed) {
|
||||
if err != nil && err.Error() != "server closed" {
|
||||
log.Errorln(errors.FullTrace(err))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -55,6 +55,8 @@ func (p *Store) getClient() (*Client, error) {
|
|||
return c, errors.Prefix("connection error", err)
|
||||
}
|
||||
|
||||
func (p *Store) Name() string { return "http3" }
|
||||
|
||||
// Has asks the peer if they have a hash
|
||||
func (p *Store) Has(hash string) (bool, error) {
|
||||
c, err := p.getClient()
|
||||
|
|
|
@ -34,7 +34,7 @@ var availabilityRequests = []pair{
|
|||
}
|
||||
|
||||
func getServer(t *testing.T, withBlobs bool) *Server {
|
||||
st := store.NewMemoryBlobStore()
|
||||
st := store.NewMemStore()
|
||||
if withBlobs {
|
||||
for k, v := range blobs {
|
||||
err := st.Put(k, v)
|
||||
|
|
|
@ -30,6 +30,8 @@ func (p *Store) getClient() (*Client, error) {
|
|||
return c, errors.Prefix("connection error", err)
|
||||
}
|
||||
|
||||
func (p *Store) Name() string { return "peer" }
|
||||
|
||||
// Has asks the peer if they have a hash
|
||||
func (p *Store) Has(hash string) (bool, error) {
|
||||
c, err := p.getClient()
|
||||
|
|
|
@ -22,7 +22,7 @@ func startServerOnRandomPort(t *testing.T) (*Server, int) {
|
|||
t.Fatal(err)
|
||||
}
|
||||
|
||||
srv := NewServer(store.NewMemoryBlobStore())
|
||||
srv := NewServer(store.NewMemStore())
|
||||
err = srv.Start("127.0.0.1:" + strconv.Itoa(port))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
|
@ -119,7 +119,7 @@ func TestServer_Timeout(t *testing.T) {
|
|||
t.Fatal(err)
|
||||
}
|
||||
|
||||
srv := NewServer(store.NewMemoryBlobStore())
|
||||
srv := NewServer(store.NewMemStore())
|
||||
srv.Timeout = testTimeout
|
||||
err = srv.Start("127.0.0.1:" + strconv.Itoa(port))
|
||||
if err != nil {
|
||||
|
@ -161,7 +161,7 @@ func TestServer_Timeout(t *testing.T) {
|
|||
//}
|
||||
|
||||
type mockPartialStore struct {
|
||||
*store.MemoryBlobStore
|
||||
*store.MemStore
|
||||
missing []string
|
||||
}
|
||||
|
||||
|
@ -181,7 +181,7 @@ func TestServer_PartialUpload(t *testing.T) {
|
|||
missing[i] = bits.Rand().String()
|
||||
}
|
||||
|
||||
st := store.BlobStore(&mockPartialStore{MemoryBlobStore: store.NewMemoryBlobStore(), missing: missing})
|
||||
st := store.BlobStore(&mockPartialStore{MemStore: store.NewMemStore(), missing: missing})
|
||||
if _, ok := st.(neededBlobChecker); !ok {
|
||||
t.Fatal("mock does not implement the relevant interface")
|
||||
}
|
||||
|
|
|
@ -7,26 +7,32 @@ import (
|
|||
"github.com/lbryio/lbry.go/v2/stream"
|
||||
|
||||
"github.com/lbryio/reflector.go/internal/metrics"
|
||||
|
||||
"golang.org/x/sync/singleflight"
|
||||
)
|
||||
|
||||
// CachingBlobStore combines two stores, typically a local and a remote store, to improve performance.
|
||||
// CachingStore combines two stores, typically a local and a remote store, to improve performance.
|
||||
// Accessed blobs are stored in and retrieved from the cache. If they are not in the cache, they
|
||||
// are retrieved from the origin and cached. Puts are cached and also forwarded to the origin.
|
||||
type CachingBlobStore struct {
|
||||
type CachingStore struct {
|
||||
origin, cache BlobStore
|
||||
|
||||
sf *singleflight.Group
|
||||
component string
|
||||
}
|
||||
|
||||
// NewCachingBlobStore makes a new caching disk store and returns a pointer to it.
|
||||
func NewCachingBlobStore(origin, cache BlobStore) *CachingBlobStore {
|
||||
return &CachingBlobStore{origin: origin, cache: cache, sf: new(singleflight.Group)}
|
||||
// NewCachingStore makes a new caching disk store and returns a pointer to it.
|
||||
func NewCachingStore(component string, origin, cache BlobStore) *CachingStore {
|
||||
return &CachingStore{
|
||||
component: component,
|
||||
origin: WithSingleFlight(component, origin),
|
||||
cache: cache,
|
||||
}
|
||||
}
|
||||
|
||||
const nameCaching = "caching"
|
||||
|
||||
// Name is the cache type name
|
||||
func (c *CachingStore) Name() string { return nameCaching }
|
||||
|
||||
// Has checks the cache and then the origin for a hash. It returns true if either store has it.
|
||||
func (c *CachingBlobStore) Has(hash string) (bool, error) {
|
||||
func (c *CachingStore) Has(hash string) (bool, error) {
|
||||
has, err := c.cache.Has(hash)
|
||||
if has || err != nil {
|
||||
return has, err
|
||||
|
@ -36,49 +42,33 @@ func (c *CachingBlobStore) Has(hash string) (bool, error) {
|
|||
|
||||
// Get tries to get the blob from the cache first, falling back to the origin. If the blob comes
|
||||
// from the origin, it is also stored in the cache.
|
||||
func (c *CachingBlobStore) Get(hash string) (stream.Blob, error) {
|
||||
func (c *CachingStore) Get(hash string) (stream.Blob, error) {
|
||||
start := time.Now()
|
||||
blob, err := c.cache.Get(hash)
|
||||
if err == nil || !errors.Is(err, ErrBlobNotFound) {
|
||||
metrics.CacheHitCount.Inc()
|
||||
metrics.CacheHitCount.With(metrics.CacheLabels(c.cache.Name(), c.component)).Inc()
|
||||
rate := float64(len(blob)) / 1024 / 1024 / time.Since(start).Seconds()
|
||||
metrics.RetrieverSpeed.With(map[string]string{metrics.MtrLabelSource: "cache"}).Set(rate)
|
||||
metrics.CacheRetrievalSpeed.With(map[string]string{
|
||||
metrics.LabelCacheType: c.cache.Name(),
|
||||
metrics.LabelComponent: c.component,
|
||||
metrics.LabelSource: "cache",
|
||||
}).Set(rate)
|
||||
return blob, err
|
||||
}
|
||||
|
||||
metrics.CacheMissCount.Inc()
|
||||
return c.getFromOrigin(hash)
|
||||
}
|
||||
metrics.CacheMissCount.With(metrics.CacheLabels(c.cache.Name(), c.component)).Inc()
|
||||
|
||||
// getFromOrigin ensures that only one Get per hash is sent to the origin at a time,
|
||||
// thereby protecting against https://en.wikipedia.org/wiki/Thundering_herd_problem
|
||||
func (c *CachingBlobStore) getFromOrigin(hash string) (stream.Blob, error) {
|
||||
metrics.CacheWaitingRequestsCount.Inc()
|
||||
defer metrics.CacheWaitingRequestsCount.Dec()
|
||||
originBlob, err, _ := c.sf.Do(hash, func() (interface{}, error) {
|
||||
metrics.CacheOriginRequestsCount.Inc()
|
||||
defer metrics.CacheOriginRequestsCount.Dec()
|
||||
|
||||
start := time.Now()
|
||||
blob, err := c.origin.Get(hash)
|
||||
blob, err = c.origin.Get(hash)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
rate := float64(len(blob)) / 1024 / 1024 / time.Since(start).Seconds()
|
||||
metrics.RetrieverSpeed.With(map[string]string{metrics.MtrLabelSource: "origin"}).Set(rate)
|
||||
|
||||
err = c.cache.Put(hash, blob)
|
||||
return blob, err
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return originBlob.(stream.Blob), nil
|
||||
}
|
||||
|
||||
// Put stores the blob in the origin and the cache
|
||||
func (c *CachingBlobStore) Put(hash string, blob stream.Blob) error {
|
||||
func (c *CachingStore) Put(hash string, blob stream.Blob) error {
|
||||
err := c.origin.Put(hash, blob)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -87,7 +77,7 @@ func (c *CachingBlobStore) Put(hash string, blob stream.Blob) error {
|
|||
}
|
||||
|
||||
// PutSD stores the sd blob in the origin and the cache
|
||||
func (c *CachingBlobStore) PutSD(hash string, blob stream.Blob) error {
|
||||
func (c *CachingStore) PutSD(hash string, blob stream.Blob) error {
|
||||
err := c.origin.PutSD(hash, blob)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -96,7 +86,7 @@ func (c *CachingBlobStore) PutSD(hash string, blob stream.Blob) error {
|
|||
}
|
||||
|
||||
// Delete deletes the blob from the origin and the cache
|
||||
func (c *CachingBlobStore) Delete(hash string) error {
|
||||
func (c *CachingStore) Delete(hash string) error {
|
||||
err := c.origin.Delete(hash)
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
|
@ -9,10 +9,10 @@ import (
|
|||
"github.com/lbryio/lbry.go/v2/stream"
|
||||
)
|
||||
|
||||
func TestCachingBlobStore_Put(t *testing.T) {
|
||||
origin := NewMemoryBlobStore()
|
||||
cache := NewMemoryBlobStore()
|
||||
s := NewCachingBlobStore(origin, cache)
|
||||
func TestCachingStore_Put(t *testing.T) {
|
||||
origin := NewMemStore()
|
||||
cache := NewMemStore()
|
||||
s := NewCachingStore("test", origin, cache)
|
||||
|
||||
b := []byte("this is a blob of stuff")
|
||||
hash := "hash"
|
||||
|
@ -39,10 +39,10 @@ func TestCachingBlobStore_Put(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestCachingBlobStore_CacheMiss(t *testing.T) {
|
||||
origin := NewMemoryBlobStore()
|
||||
cache := NewMemoryBlobStore()
|
||||
s := NewCachingBlobStore(origin, cache)
|
||||
func TestCachingStore_CacheMiss(t *testing.T) {
|
||||
origin := NewMemStore()
|
||||
cache := NewMemStore()
|
||||
s := NewCachingStore("test", origin, cache)
|
||||
|
||||
b := []byte("this is a blob of stuff")
|
||||
hash := "hash"
|
||||
|
@ -76,11 +76,11 @@ func TestCachingBlobStore_CacheMiss(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestCachingBlobStore_ThunderingHerd(t *testing.T) {
|
||||
func TestCachingStore_ThunderingHerd(t *testing.T) {
|
||||
storeDelay := 100 * time.Millisecond
|
||||
origin := NewSlowBlobStore(storeDelay)
|
||||
cache := NewMemoryBlobStore()
|
||||
s := NewCachingBlobStore(origin, cache)
|
||||
cache := NewMemStore()
|
||||
s := NewCachingStore("test", origin, cache)
|
||||
|
||||
b := []byte("this is a blob of stuff")
|
||||
hash := "hash"
|
||||
|
@ -129,16 +129,19 @@ func TestCachingBlobStore_ThunderingHerd(t *testing.T) {
|
|||
|
||||
// SlowBlobStore adds a delay to each request
|
||||
type SlowBlobStore struct {
|
||||
mem *MemoryBlobStore
|
||||
mem *MemStore
|
||||
delay time.Duration
|
||||
}
|
||||
|
||||
func NewSlowBlobStore(delay time.Duration) *SlowBlobStore {
|
||||
return &SlowBlobStore{
|
||||
mem: NewMemoryBlobStore(),
|
||||
mem: NewMemStore(),
|
||||
delay: delay,
|
||||
}
|
||||
}
|
||||
func (s *SlowBlobStore) Name() string {
|
||||
return "slow"
|
||||
}
|
||||
|
||||
func (s *SlowBlobStore) Has(hash string) (bool, error) {
|
||||
time.Sleep(s.delay)
|
||||
|
|
|
@ -1,109 +0,0 @@
|
|||
package store
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/lbryio/lbry.go/v2/extras/errors"
|
||||
"github.com/lbryio/lbry.go/v2/stream"
|
||||
"github.com/lbryio/reflector.go/internal/metrics"
|
||||
"github.com/lbryio/reflector.go/meta"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// CloudFrontBlobStore is an CloudFront backed store (retrieval only)
|
||||
type CloudFrontBlobStore struct {
|
||||
cfEndpoint string
|
||||
s3Store *S3BlobStore
|
||||
}
|
||||
|
||||
// NewS3BlobStore returns an initialized S3 store pointer.
|
||||
func NewCloudFrontBlobStore(cloudFrontEndpoint string, S3Store *S3BlobStore) *CloudFrontBlobStore {
|
||||
return &CloudFrontBlobStore{
|
||||
cfEndpoint: cloudFrontEndpoint,
|
||||
s3Store: S3Store,
|
||||
}
|
||||
}
|
||||
|
||||
// Has returns T/F or Error if the store contains the blob.
|
||||
func (s *CloudFrontBlobStore) Has(hash string) (bool, error) {
|
||||
url := s.cfEndpoint + hash
|
||||
|
||||
req, err := http.NewRequest("HEAD", url, nil)
|
||||
if err != nil {
|
||||
return false, errors.Err(err)
|
||||
}
|
||||
req.Header.Add("User-Agent", "reflector.go/"+meta.Version)
|
||||
res, err := http.DefaultClient.Do(req)
|
||||
if err != nil {
|
||||
return false, errors.Err(err)
|
||||
}
|
||||
defer res.Body.Close()
|
||||
|
||||
switch res.StatusCode {
|
||||
case http.StatusNotFound, http.StatusForbidden:
|
||||
return false, nil
|
||||
case http.StatusOK:
|
||||
return true, nil
|
||||
default:
|
||||
return false, errors.Err(res.Status)
|
||||
}
|
||||
}
|
||||
|
||||
// Get returns the blob slice if present or errors.
|
||||
func (s *CloudFrontBlobStore) Get(hash string) (stream.Blob, error) {
|
||||
url := s.cfEndpoint + hash
|
||||
log.Debugf("Getting %s from S3", hash[:8])
|
||||
defer func(t time.Time) {
|
||||
log.Debugf("Getting %s from S3 took %s", hash[:8], time.Since(t).String())
|
||||
}(time.Now())
|
||||
req, err := http.NewRequest("GET", url, nil)
|
||||
if err != nil {
|
||||
return nil, errors.Err(err)
|
||||
}
|
||||
req.Header.Add("User-Agent", "reflector.go/"+meta.Version)
|
||||
res, err := http.DefaultClient.Do(req)
|
||||
if err != nil {
|
||||
return nil, errors.Err(err)
|
||||
}
|
||||
defer res.Body.Close()
|
||||
|
||||
switch res.StatusCode {
|
||||
case http.StatusNotFound, http.StatusForbidden:
|
||||
return nil, errors.Err(ErrBlobNotFound)
|
||||
case http.StatusOK:
|
||||
b, err := ioutil.ReadAll(res.Body)
|
||||
if err != nil {
|
||||
return nil, errors.Err(err)
|
||||
}
|
||||
metrics.MtrInBytesS3.Add(float64(len(b)))
|
||||
return b, nil
|
||||
default:
|
||||
return nil, errors.Err(res.Status)
|
||||
}
|
||||
}
|
||||
|
||||
// Put stores the blob on S3 or errors if S3 store is not present.
|
||||
func (s *CloudFrontBlobStore) Put(hash string, blob stream.Blob) error {
|
||||
if s.s3Store != nil {
|
||||
return s.s3Store.Put(hash, blob)
|
||||
}
|
||||
return errors.Err("not implemented in cloudfront store")
|
||||
}
|
||||
|
||||
// PutSD stores the sd blob on S3 or errors if S3 store is not present.
|
||||
func (s *CloudFrontBlobStore) PutSD(hash string, blob stream.Blob) error {
|
||||
if s.s3Store != nil {
|
||||
return s.s3Store.PutSD(hash, blob)
|
||||
}
|
||||
return errors.Err("not implemented in cloudfront store")
|
||||
}
|
||||
|
||||
func (s *CloudFrontBlobStore) Delete(hash string) error {
|
||||
if s.s3Store != nil {
|
||||
return s.s3Store.Delete(hash)
|
||||
}
|
||||
return errors.Err("not implemented in cloudfront store")
|
||||
}
|
105
store/cloudfront_ro.go
Normal file
105
store/cloudfront_ro.go
Normal file
|
@ -0,0 +1,105 @@
|
|||
package store
|
||||
|
||||
import (
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/lbryio/reflector.go/internal/metrics"
|
||||
"github.com/lbryio/reflector.go/meta"
|
||||
|
||||
"github.com/lbryio/lbry.go/v2/extras/errors"
|
||||
"github.com/lbryio/lbry.go/v2/stream"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// CloudFrontROStore reads from cloudfront. All writes panic.
|
||||
type CloudFrontROStore struct {
|
||||
endpoint string // cloudflare endpoint
|
||||
}
|
||||
|
||||
// NewCloudFrontROStore returns an initialized CloudFrontROStore store pointer.
|
||||
func NewCloudFrontROStore(endpoint string) *CloudFrontROStore {
|
||||
return &CloudFrontROStore{endpoint: endpoint}
|
||||
}
|
||||
|
||||
const nameCloudFrontRO = "cloudfront_ro"
|
||||
|
||||
// Name is the cache type name
|
||||
func (c *CloudFrontROStore) Name() string { return nameCloudFrontRO }
|
||||
|
||||
// Has checks if the hash is in the store.
|
||||
func (c *CloudFrontROStore) Has(hash string) (bool, error) {
|
||||
status, body, err := c.cfRequest(http.MethodHead, hash)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
defer body.Close()
|
||||
|
||||
switch status {
|
||||
case http.StatusNotFound, http.StatusForbidden:
|
||||
return false, nil
|
||||
case http.StatusOK:
|
||||
return true, nil
|
||||
default:
|
||||
return false, errors.Err("unexpected status %d", status)
|
||||
}
|
||||
}
|
||||
|
||||
// Get gets the blob from Cloudfront.
|
||||
func (c *CloudFrontROStore) Get(hash string) (stream.Blob, error) {
|
||||
log.Debugf("Getting %s from S3", hash[:8])
|
||||
defer func(t time.Time) {
|
||||
log.Debugf("Getting %s from S3 took %s", hash[:8], time.Since(t).String())
|
||||
}(time.Now())
|
||||
|
||||
status, body, err := c.cfRequest(http.MethodGet, hash)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer body.Close()
|
||||
|
||||
switch status {
|
||||
case http.StatusNotFound, http.StatusForbidden:
|
||||
return nil, errors.Err(ErrBlobNotFound)
|
||||
case http.StatusOK:
|
||||
b, err := ioutil.ReadAll(body)
|
||||
if err != nil {
|
||||
return nil, errors.Err(err)
|
||||
}
|
||||
metrics.MtrInBytesS3.Add(float64(len(b)))
|
||||
return b, nil
|
||||
default:
|
||||
return nil, errors.Err("unexpected status %d", status)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *CloudFrontROStore) cfRequest(method, hash string) (int, io.ReadCloser, error) {
|
||||
url := c.endpoint + hash
|
||||
req, err := http.NewRequest(method, url, nil)
|
||||
if err != nil {
|
||||
return 0, nil, errors.Err(err)
|
||||
}
|
||||
req.Header.Add("User-Agent", "reflector.go/"+meta.Version)
|
||||
|
||||
res, err := http.DefaultClient.Do(req)
|
||||
if err != nil {
|
||||
return 0, nil, errors.Err(err)
|
||||
}
|
||||
|
||||
return res.StatusCode, res.Body, nil
|
||||
}
|
||||
|
||||
func (c *CloudFrontROStore) Put(_ string, _ stream.Blob) error {
|
||||
panic("CloudFrontROStore cannot do writes. Use CloudFrontRWStore")
|
||||
}
|
||||
|
||||
func (c *CloudFrontROStore) PutSD(_ string, _ stream.Blob) error {
|
||||
panic("CloudFrontROStore cannot do writes. Use CloudFrontRWStore")
|
||||
}
|
||||
|
||||
func (c *CloudFrontROStore) Delete(_ string) error {
|
||||
panic("CloudFrontROStore cannot do writes. Use CloudFrontRWStore")
|
||||
}
|
50
store/cloudfront_rw.go
Normal file
50
store/cloudfront_rw.go
Normal file
|
@ -0,0 +1,50 @@
|
|||
package store
|
||||
|
||||
import (
|
||||
"github.com/lbryio/lbry.go/v2/stream"
|
||||
)
|
||||
|
||||
// CloudFrontRWStore combines a Cloudfront and an S3 store. Reads go to Cloudfront, writes go to S3.
|
||||
type CloudFrontRWStore struct {
|
||||
cf *CloudFrontROStore
|
||||
s3 *S3Store
|
||||
}
|
||||
|
||||
// NewCloudFrontRWStore returns an initialized CloudFrontRWStore store pointer.
|
||||
// NOTE: It panics if either argument is nil.
|
||||
func NewCloudFrontRWStore(cf *CloudFrontROStore, s3 *S3Store) *CloudFrontRWStore {
|
||||
if cf == nil || s3 == nil {
|
||||
panic("both stores must be set")
|
||||
}
|
||||
return &CloudFrontRWStore{cf: cf, s3: s3}
|
||||
}
|
||||
|
||||
const nameCloudFrontRW = "cloudfront_rw"
|
||||
|
||||
// Name is the cache type name
|
||||
func (c *CloudFrontRWStore) Name() string { return nameCloudFrontRW }
|
||||
|
||||
// Has checks if the hash is in the store.
|
||||
func (c *CloudFrontRWStore) Has(hash string) (bool, error) {
|
||||
return c.cf.Has(hash)
|
||||
}
|
||||
|
||||
// Get gets the blob from Cloudfront.
|
||||
func (c *CloudFrontRWStore) Get(hash string) (stream.Blob, error) {
|
||||
return c.cf.Get(hash)
|
||||
}
|
||||
|
||||
// Put stores the blob on S3
|
||||
func (c *CloudFrontRWStore) Put(hash string, blob stream.Blob) error {
|
||||
return c.s3.Put(hash, blob)
|
||||
}
|
||||
|
||||
// PutSD stores the sd blob on S3
|
||||
func (c *CloudFrontRWStore) PutSD(hash string, blob stream.Blob) error {
|
||||
return c.s3.PutSD(hash, blob)
|
||||
}
|
||||
|
||||
// Delete deletes the blob from S3
|
||||
func (c *CloudFrontRWStore) Delete(hash string) error {
|
||||
return c.s3.Delete(hash)
|
||||
}
|
|
@ -25,6 +25,11 @@ func NewDBBackedStore(blobs BlobStore, db *db.SQL) *DBBackedStore {
|
|||
return &DBBackedStore{blobs: blobs, db: db}
|
||||
}
|
||||
|
||||
const nameDBBacked = "db-backed"
|
||||
|
||||
// Name is the cache type name
|
||||
func (d *DBBackedStore) Name() string { return nameDBBacked }
|
||||
|
||||
// Has returns true if the blob is in the store
|
||||
func (d *DBBackedStore) Has(hash string) (bool, error) {
|
||||
return d.db.HasBlob(hash)
|
||||
|
|
174
store/disk.go
174
store/disk.go
|
@ -4,82 +4,38 @@ import (
|
|||
"io/ioutil"
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/lbryio/lbry.go/v2/extras/errors"
|
||||
"github.com/lbryio/lbry.go/v2/stream"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/lbryio/reflector.go/store/speedwalk"
|
||||
)
|
||||
|
||||
// DiskBlobStore stores blobs on a local disk
|
||||
type DiskBlobStore struct {
|
||||
// DiskStore stores blobs on a local disk
|
||||
type DiskStore struct {
|
||||
// the location of blobs on disk
|
||||
blobDir string
|
||||
// store files in subdirectories based on the first N chars in the filename. 0 = don't create subdirectories.
|
||||
prefixLength int
|
||||
|
||||
// true if initOnce ran, false otherwise
|
||||
initialized bool
|
||||
lastChecked time.Time
|
||||
diskCleanupBusy chan bool
|
||||
}
|
||||
|
||||
// NewDiskBlobStore returns an initialized file disk store pointer.
|
||||
func NewDiskBlobStore(dir string, prefixLength int) *DiskBlobStore {
|
||||
dbs := DiskBlobStore{blobDir: dir, prefixLength: prefixLength, diskCleanupBusy: make(chan bool, 1)}
|
||||
dbs.diskCleanupBusy <- true
|
||||
return &dbs
|
||||
}
|
||||
|
||||
func (d *DiskBlobStore) dir(hash string) string {
|
||||
if d.prefixLength <= 0 || len(hash) < d.prefixLength {
|
||||
return d.blobDir
|
||||
// NewDiskStore returns an initialized file disk store pointer.
|
||||
func NewDiskStore(dir string, prefixLength int) *DiskStore {
|
||||
return &DiskStore{
|
||||
blobDir: dir,
|
||||
prefixLength: prefixLength,
|
||||
}
|
||||
return path.Join(d.blobDir, hash[:d.prefixLength])
|
||||
}
|
||||
|
||||
// GetUsedSpace returns a value between 0 and 1, with 0 being completely empty and 1 being full, for the disk that holds the provided path
|
||||
func (d *DiskBlobStore) getUsedSpace() (float32, error) {
|
||||
var stat syscall.Statfs_t
|
||||
err := syscall.Statfs(d.blobDir, &stat)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
// Available blocks * size per block = available space in bytes
|
||||
all := stat.Blocks * uint64(stat.Bsize)
|
||||
free := stat.Bfree * uint64(stat.Bsize)
|
||||
used := all - free
|
||||
const nameDisk = "disk"
|
||||
|
||||
return float32(used) / float32(all), nil
|
||||
}
|
||||
|
||||
func (d *DiskBlobStore) path(hash string) string {
|
||||
return path.Join(d.dir(hash), hash)
|
||||
}
|
||||
|
||||
func (d *DiskBlobStore) ensureDirExists(dir string) error {
|
||||
return errors.Err(os.MkdirAll(dir, 0755))
|
||||
}
|
||||
|
||||
func (d *DiskBlobStore) initOnce() error {
|
||||
if d.initialized {
|
||||
return nil
|
||||
}
|
||||
|
||||
err := d.ensureDirExists(d.blobDir)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
d.initialized = true
|
||||
return nil
|
||||
}
|
||||
// Name is the cache type name
|
||||
func (d *DiskStore) Name() string { return nameDisk }
|
||||
|
||||
// Has returns T/F or Error if it the blob stored already. It will error with any IO disk error.
|
||||
func (d *DiskBlobStore) Has(hash string) (bool, error) {
|
||||
func (d *DiskStore) Has(hash string) (bool, error) {
|
||||
err := d.initOnce()
|
||||
if err != nil {
|
||||
return false, err
|
||||
|
@ -90,13 +46,13 @@ func (d *DiskBlobStore) Has(hash string) (bool, error) {
|
|||
if os.IsNotExist(err) {
|
||||
return false, nil
|
||||
}
|
||||
return false, err
|
||||
return false, errors.Err(err)
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// Get returns the blob or an error if the blob doesn't exist.
|
||||
func (d *DiskBlobStore) Get(hash string) (stream.Blob, error) {
|
||||
func (d *DiskStore) Get(hash string) (stream.Blob, error) {
|
||||
err := d.initOnce()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -111,11 +67,12 @@ func (d *DiskBlobStore) Get(hash string) (stream.Blob, error) {
|
|||
}
|
||||
defer file.Close()
|
||||
|
||||
return ioutil.ReadAll(file)
|
||||
blob, err := ioutil.ReadAll(file)
|
||||
return blob, errors.Err(err)
|
||||
}
|
||||
|
||||
// Put stores the blob on disk
|
||||
func (d *DiskBlobStore) Put(hash string, blob stream.Blob) error {
|
||||
func (d *DiskStore) Put(hash string, blob stream.Blob) error {
|
||||
err := d.initOnce()
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -126,16 +83,17 @@ func (d *DiskBlobStore) Put(hash string, blob stream.Blob) error {
|
|||
return err
|
||||
}
|
||||
|
||||
return ioutil.WriteFile(d.path(hash), blob, 0644)
|
||||
err = ioutil.WriteFile(d.path(hash), blob, 0644)
|
||||
return errors.Err(err)
|
||||
}
|
||||
|
||||
// PutSD stores the sd blob on the disk
|
||||
func (d *DiskBlobStore) PutSD(hash string, blob stream.Blob) error {
|
||||
func (d *DiskStore) PutSD(hash string, blob stream.Blob) error {
|
||||
return d.Put(hash, blob)
|
||||
}
|
||||
|
||||
// Delete deletes the blob from the store
|
||||
func (d *DiskBlobStore) Delete(hash string) error {
|
||||
func (d *DiskStore) Delete(hash string) error {
|
||||
err := d.initOnce()
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -149,75 +107,45 @@ func (d *DiskBlobStore) Delete(hash string) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
return os.Remove(d.path(hash))
|
||||
err = os.Remove(d.path(hash))
|
||||
return errors.Err(err)
|
||||
}
|
||||
|
||||
func (d *DiskBlobStore) ensureDiskSpace() {
|
||||
defer func() {
|
||||
d.lastChecked = time.Now()
|
||||
d.diskCleanupBusy <- true
|
||||
}()
|
||||
// list returns the hashes of blobs that already exist in the blobDir
|
||||
func (d *DiskStore) list() ([]string, error) {
|
||||
err := d.initOnce()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
used, err := d.getUsedSpace()
|
||||
if err != nil {
|
||||
log.Errorln(err.Error())
|
||||
return
|
||||
}
|
||||
log.Infof("disk usage: %.2f%%\n", used*100)
|
||||
if used > 0.90 {
|
||||
log.Infoln("over 0.90, cleaning up")
|
||||
err = d.WipeOldestBlobs()
|
||||
if err != nil {
|
||||
log.Errorln(err.Error())
|
||||
return
|
||||
}
|
||||
log.Infoln("Done cleaning up")
|
||||
}
|
||||
return speedwalk.AllFiles(d.blobDir, true)
|
||||
}
|
||||
|
||||
func (d *DiskBlobStore) WipeOldestBlobs() (err error) {
|
||||
dirs, err := ioutil.ReadDir(d.blobDir)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
type datedFile struct {
|
||||
Atime time.Time
|
||||
File *os.FileInfo
|
||||
FullPath string
|
||||
}
|
||||
datedFiles := make([]datedFile, 0, 5000)
|
||||
for _, dir := range dirs {
|
||||
if dir.IsDir() {
|
||||
files, err := ioutil.ReadDir(filepath.Join(d.blobDir, dir.Name()))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, file := range files {
|
||||
if file.Mode().IsRegular() && !file.IsDir() {
|
||||
datedFiles = append(datedFiles, datedFile{
|
||||
Atime: atime(file),
|
||||
File: &file,
|
||||
FullPath: filepath.Join(d.blobDir, dir.Name(), file.Name()),
|
||||
})
|
||||
}
|
||||
}
|
||||
func (d *DiskStore) dir(hash string) string {
|
||||
if d.prefixLength <= 0 || len(hash) < d.prefixLength {
|
||||
return d.blobDir
|
||||
}
|
||||
return path.Join(d.blobDir, hash[:d.prefixLength])
|
||||
}
|
||||
|
||||
func (d *DiskStore) path(hash string) string {
|
||||
return path.Join(d.dir(hash), hash)
|
||||
}
|
||||
|
||||
func (d *DiskStore) ensureDirExists(dir string) error {
|
||||
return errors.Err(os.MkdirAll(dir, 0755))
|
||||
}
|
||||
|
||||
func (d *DiskStore) initOnce() error {
|
||||
if d.initialized {
|
||||
return nil
|
||||
}
|
||||
|
||||
sort.Slice(datedFiles, func(i, j int) bool {
|
||||
return datedFiles[i].Atime.Before(datedFiles[j].Atime)
|
||||
})
|
||||
//delete the first 50000 blobs
|
||||
for i, df := range datedFiles {
|
||||
if i >= 50000 {
|
||||
break
|
||||
}
|
||||
log.Infoln(df.FullPath)
|
||||
log.Infoln(df.Atime.String())
|
||||
err = os.Remove(df.FullPath)
|
||||
err := d.ensureDirExists(d.blobDir)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
d.initialized = true
|
||||
return nil
|
||||
}
|
||||
|
|
120
store/lru.go
Normal file
120
store/lru.go
Normal file
|
@ -0,0 +1,120 @@
|
|||
package store
|
||||
|
||||
import (
|
||||
"github.com/lbryio/lbry.go/v2/extras/errors"
|
||||
"github.com/lbryio/lbry.go/v2/stream"
|
||||
"github.com/lbryio/reflector.go/internal/metrics"
|
||||
|
||||
golru "github.com/hashicorp/golang-lru"
|
||||
Yep. We're actually planning to merge reflector and player into a single program/process so they don't have to send blobs back and forth via TCP. Yep. We're actually planning to merge reflector and player into a single program/process so they don't have to send blobs back and forth via TCP.
|
||||
)
|
||||
|
||||
// LRUStore adds a max cache size and LRU eviction to a BlobStore
|
||||
type LRUStore struct {
|
||||
// underlying store
|
||||
store BlobStore
|
||||
// lru implementation
|
||||
lru *golru.Cache
|
||||
}
|
||||
|
||||
// NewLRUStore initialize a new LRUStore
|
||||
func NewLRUStore(component string, store BlobStore, maxItems int) *LRUStore {
|
||||
lru, err := golru.NewWithEvict(maxItems, func(key interface{}, value interface{}) {
|
||||
metrics.CacheLRUEvictCount.With(metrics.CacheLabels(store.Name(), component)).Inc()
|
||||
_ = store.Delete(key.(string)) // TODO: log this error. may happen if underlying entry is gone but cache entry still there
|
||||
})
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
l := &LRUStore{
|
||||
store: store,
|
||||
lru: lru,
|
||||
}
|
||||
|
||||
if lstr, ok := store.(lister); ok {
|
||||
err = l.loadExisting(lstr, maxItems)
|
||||
if err != nil {
|
||||
panic(err) // TODO: what should happen here? panic? return nil? just keep going?
|
||||
}
|
||||
}
|
||||
|
||||
return l
|
||||
}
|
||||
|
||||
const nameLRU = "lru"
|
||||
|
||||
// Name is the cache type name
|
||||
func (l *LRUStore) Name() string { return nameLRU }
|
||||
|
||||
// Has returns whether the blob is in the store, without updating the recent-ness.
|
||||
func (l *LRUStore) Has(hash string) (bool, error) {
|
||||
return l.lru.Contains(hash), nil
|
||||
}
|
||||
|
||||
// Get returns the blob or an error if the blob doesn't exist.
|
||||
func (l *LRUStore) Get(hash string) (stream.Blob, error) {
|
||||
_, has := l.lru.Get(hash)
|
||||
if !has {
|
||||
return nil, errors.Err(ErrBlobNotFound)
|
||||
}
|
||||
blob, err := l.store.Get(hash)
|
||||
if errors.Is(err, ErrBlobNotFound) {
|
||||
// Blob disappeared from underlying store
|
||||
l.lru.Remove(hash)
|
||||
}
|
||||
return blob, err
|
||||
}
|
||||
|
||||
// Put stores the blob
|
||||
func (l *LRUStore) Put(hash string, blob stream.Blob) error {
|
||||
err := l.store.Put(hash, blob)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
l.lru.Add(hash, true)
|
||||
return nil
|
||||
}
|
||||
|
||||
// PutSD stores the sd blob
|
||||
func (l *LRUStore) PutSD(hash string, blob stream.Blob) error {
|
||||
err := l.store.PutSD(hash, blob)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
l.lru.Add(hash, true)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Delete deletes the blob from the store
|
||||
func (l *LRUStore) Delete(hash string) error {
|
||||
err := l.store.Delete(hash)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// This must come after store.Delete()
|
||||
// Remove triggers onEvict function, which also tries to delete blob from store
|
||||
// We need to delete it manually first so any errors can be propagated up
|
||||
l.lru.Remove(hash)
|
||||
return nil
|
||||
}
|
||||
|
||||
// loadExisting imports existing blobs from the underlying store into the LRU cache
|
||||
func (l *LRUStore) loadExisting(store lister, maxItems int) error {
|
||||
existing, err := store.list()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
added := 0
|
||||
for _, h := range existing {
|
||||
l.lru.Add(h, true)
|
||||
added++
|
||||
if maxItems > 0 && added >= maxItems { // underlying cache is bigger than LRU cache
|
||||
break
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
121
store/lru_test.go
Normal file
121
store/lru_test.go
Normal file
|
@ -0,0 +1,121 @@
|
|||
package store
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"github.com/lbryio/lbry.go/v2/extras/errors"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
Given that test functions have to be named Given that test functions have to be named `Test*`, this looks like a typo. `createTestLRUstore`?
agreed agreed
|
||||
|
||||
const cacheMaxBlobs = 3
|
||||
|
||||
func getTestLRUStore() (*LRUStore, *MemStore) {
|
||||
m := NewMemStore()
|
||||
return NewLRUStore("test", m, 3), m
|
||||
}
|
||||
|
||||
func TestLRUStore_Eviction(t *testing.T) {
|
||||
lru, mem := getTestLRUStore()
|
||||
b := []byte("x")
|
||||
err := lru.Put("one", b)
|
||||
require.NoError(t, err)
|
||||
err = lru.Put("two", b)
|
||||
require.NoError(t, err)
|
||||
err = lru.Put("three", b)
|
||||
require.NoError(t, err)
|
||||
err = lru.Put("four", b)
|
||||
require.NoError(t, err)
|
||||
err = lru.Put("five", b)
|
||||
require.NoError(t, err)
|
||||
|
||||
assert.Equal(t, cacheMaxBlobs, len(mem.Debug()))
|
||||
|
||||
for k, v := range map[string]bool{
|
||||
"one": false,
|
||||
"two": false,
|
||||
"three": true,
|
||||
"four": true,
|
||||
"five": true,
|
||||
"six": false,
|
||||
} {
|
||||
has, err := lru.Has(k)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, v, has)
|
||||
}
|
||||
|
||||
lru.Get("three") // touch so it stays in cache
|
||||
lru.Put("six", b)
|
||||
|
||||
assert.Equal(t, cacheMaxBlobs, len(mem.Debug()))
|
||||
|
||||
for k, v := range map[string]bool{
|
||||
"one": false,
|
||||
"two": false,
|
||||
"three": true,
|
||||
"four": false,
|
||||
"five": true,
|
||||
"six": true,
|
||||
} {
|
||||
has, err := lru.Has(k)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, v, has)
|
||||
}
|
||||
|
||||
err = lru.Delete("three")
|
||||
assert.NoError(t, err)
|
||||
err = lru.Delete("five")
|
||||
assert.NoError(t, err)
|
||||
err = lru.Delete("six")
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 0, len(mem.Debug()))
|
||||
}
|
||||
|
||||
func TestLRUStore_UnderlyingBlobMissing(t *testing.T) {
|
||||
lru, mem := getTestLRUStore()
|
||||
hash := "hash"
|
||||
b := []byte("this is a blob of stuff")
|
||||
err := lru.Put(hash, b)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = mem.Delete(hash)
|
||||
require.NoError(t, err)
|
||||
|
||||
// hash still exists in lru
|
||||
assert.True(t, lru.lru.Contains(hash))
|
||||
|
||||
blob, err := lru.Get(hash)
|
||||
assert.Nil(t, blob)
|
||||
assert.True(t, errors.Is(err, ErrBlobNotFound), "expected (%s) %s, got (%s) %s",
|
||||
reflect.TypeOf(ErrBlobNotFound).String(), ErrBlobNotFound.Error(),
|
||||
reflect.TypeOf(err).String(), err.Error())
|
||||
|
||||
// lru.Get() removes hash if underlying store doesn't have it
|
||||
assert.False(t, lru.lru.Contains(hash))
|
||||
}
|
||||
|
||||
func TestLRUStore_loadExisting(t *testing.T) {
|
||||
tmpDir, err := ioutil.TempDir("", "reflector_test_*")
|
||||
require.NoError(t, err)
|
||||
defer os.RemoveAll(tmpDir)
|
||||
d := NewDiskStore(tmpDir, 2)
|
||||
|
||||
hash := "hash"
|
||||
b := []byte("this is a blob of stuff")
|
||||
err = d.Put(hash, b)
|
||||
require.NoError(t, err)
|
||||
|
||||
existing, err := d.list()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 1, len(existing), "blob should exist in cache")
|
||||
assert.Equal(t, hash, existing[0])
|
||||
|
||||
lru := NewLRUStore("test", d, 3) // lru should load existing blobs when it's created
|
||||
has, err := lru.Has(hash)
|
||||
require.NoError(t, err)
|
||||
assert.True(t, has, "hash should be loaded from disk store but it's not")
|
||||
}
|
|
@ -1,29 +1,42 @@
|
|||
package store
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/lbryio/lbry.go/v2/extras/errors"
|
||||
"github.com/lbryio/lbry.go/v2/stream"
|
||||
)
|
||||
|
||||
// MemoryBlobStore is an in memory only blob store with no persistence.
|
||||
type MemoryBlobStore struct {
|
||||
// MemStore is an in memory only blob store with no persistence.
|
||||
type MemStore struct {
|
||||
blobs map[string]stream.Blob
|
||||
mu *sync.RWMutex
|
||||
}
|
||||
|
||||
func NewMemoryBlobStore() *MemoryBlobStore {
|
||||
return &MemoryBlobStore{
|
||||
func NewMemStore() *MemStore {
|
||||
return &MemStore{
|
||||
blobs: make(map[string]stream.Blob),
|
||||
mu: &sync.RWMutex{},
|
||||
}
|
||||
}
|
||||
|
||||
const nameMem = "mem"
|
||||
|
||||
// Name is the cache type name
|
||||
func (m *MemStore) Name() string { return nameMem }
|
||||
|
||||
// Has returns T/F if the blob is currently stored. It will never error.
|
||||
func (m *MemoryBlobStore) Has(hash string) (bool, error) {
|
||||
func (m *MemStore) Has(hash string) (bool, error) {
|
||||
m.mu.RLock()
|
||||
defer m.mu.RUnlock()
|
||||
_, ok := m.blobs[hash]
|
||||
return ok, nil
|
||||
}
|
||||
|
||||
// Get returns the blob byte slice if present and errors if the blob is not found.
|
||||
func (m *MemoryBlobStore) Get(hash string) (stream.Blob, error) {
|
||||
func (m *MemStore) Get(hash string) (stream.Blob, error) {
|
||||
m.mu.RLock()
|
||||
defer m.mu.RUnlock()
|
||||
blob, ok := m.blobs[hash]
|
||||
if !ok {
|
||||
return nil, errors.Err(ErrBlobNotFound)
|
||||
|
@ -32,23 +45,29 @@ func (m *MemoryBlobStore) Get(hash string) (stream.Blob, error) {
|
|||
}
|
||||
|
||||
// Put stores the blob in memory
|
||||
func (m *MemoryBlobStore) Put(hash string, blob stream.Blob) error {
|
||||
func (m *MemStore) Put(hash string, blob stream.Blob) error {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
m.blobs[hash] = blob
|
||||
return nil
|
||||
}
|
||||
|
||||
// PutSD stores the sd blob in memory
|
||||
func (m *MemoryBlobStore) PutSD(hash string, blob stream.Blob) error {
|
||||
func (m *MemStore) PutSD(hash string, blob stream.Blob) error {
|
||||
return m.Put(hash, blob)
|
||||
}
|
||||
|
||||
// Delete deletes the blob from the store
|
||||
func (m *MemoryBlobStore) Delete(hash string) error {
|
||||
func (m *MemStore) Delete(hash string) error {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
delete(m.blobs, hash)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Debug returns the blobs in memory. It's useful for testing and debugging.
|
||||
func (m *MemoryBlobStore) Debug() map[string]stream.Blob {
|
||||
func (m *MemStore) Debug() map[string]stream.Blob {
|
||||
m.mu.RLock()
|
||||
defer m.mu.RUnlock()
|
||||
return m.blobs
|
||||
}
|
||||
|
|
|
@ -7,8 +7,8 @@ import (
|
|||
"github.com/lbryio/lbry.go/v2/extras/errors"
|
||||
)
|
||||
|
||||
func TestMemoryBlobStore_Put(t *testing.T) {
|
||||
s := NewMemoryBlobStore()
|
||||
func TestMemStore_Put(t *testing.T) {
|
||||
s := NewMemStore()
|
||||
blob := []byte("abcdefg")
|
||||
err := s.Put("abc", blob)
|
||||
if err != nil {
|
||||
|
@ -16,8 +16,8 @@ func TestMemoryBlobStore_Put(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestMemoryBlobStore_Get(t *testing.T) {
|
||||
s := NewMemoryBlobStore()
|
||||
func TestMemStore_Get(t *testing.T) {
|
||||
s := NewMemStore()
|
||||
hash := "abc"
|
||||
blob := []byte("abcdefg")
|
||||
err := s.Put(hash, blob)
|
||||
|
|
15
store/noop.go
Normal file
15
store/noop.go
Normal file
|
@ -0,0 +1,15 @@
|
|||
package store
|
||||
|
||||
import "github.com/lbryio/lbry.go/v2/stream"
|
||||
|
||||
// NoopStore is a store that does nothing
|
||||
type NoopStore struct{}
|
||||
|
||||
const nameNoop = "noop"
|
||||
|
||||
func (n *NoopStore) Name() string { return nameNoop }
|
||||
func (n *NoopStore) Has(_ string) (bool, error) { return false, nil }
|
||||
func (n *NoopStore) Get(_ string) (stream.Blob, error) { return nil, nil }
|
||||
func (n *NoopStore) Put(_ string, _ stream.Blob) error { return nil }
|
||||
func (n *NoopStore) PutSD(_ string, _ stream.Blob) error { return nil }
|
||||
func (n *NoopStore) Delete(_ string) error { return nil }
|
55
store/s3.go
55
store/s3.go
|
@ -18,8 +18,8 @@ import (
|
|||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// S3BlobStore is an S3 store
|
||||
type S3BlobStore struct {
|
||||
// S3Store is an S3 store
|
||||
type S3Store struct {
|
||||
awsID string
|
||||
awsSecret string
|
||||
region string
|
||||
|
@ -28,9 +28,9 @@ type S3BlobStore struct {
|
|||
session *session.Session
|
||||
}
|
||||
|
||||
// NewS3BlobStore returns an initialized S3 store pointer.
|
||||
func NewS3BlobStore(awsID, awsSecret, region, bucket string) *S3BlobStore {
|
||||
return &S3BlobStore{
|
||||
// NewS3Store returns an initialized S3 store pointer.
|
||||
func NewS3Store(awsID, awsSecret, region, bucket string) *S3Store {
|
||||
return &S3Store{
|
||||
awsID: awsID,
|
||||
awsSecret: awsSecret,
|
||||
region: region,
|
||||
|
@ -38,25 +38,13 @@ func NewS3BlobStore(awsID, awsSecret, region, bucket string) *S3BlobStore {
|
|||
}
|
||||
}
|
||||
|
||||
func (s *S3BlobStore) initOnce() error {
|
||||
if s.session != nil {
|
||||
return nil
|
||||
}
|
||||
const nameS3 = "s3"
|
||||
|
||||
sess, err := session.NewSession(&aws.Config{
|
||||
Credentials: credentials.NewStaticCredentials(s.awsID, s.awsSecret, ""),
|
||||
Region: aws.String(s.region),
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
s.session = sess
|
||||
return nil
|
||||
}
|
||||
// Name is the cache type name
|
||||
func (s *S3Store) Name() string { return nameS3 }
|
||||
|
||||
// Has returns T/F or Error ( from S3 ) if the store contains the blob.
|
||||
func (s *S3BlobStore) Has(hash string) (bool, error) {
|
||||
func (s *S3Store) Has(hash string) (bool, error) {
|
||||
err := s.initOnce()
|
||||
if err != nil {
|
||||
return false, err
|
||||
|
@ -77,7 +65,7 @@ func (s *S3BlobStore) Has(hash string) (bool, error) {
|
|||
}
|
||||
|
||||
// Get returns the blob slice if present or errors on S3.
|
||||
func (s *S3BlobStore) Get(hash string) (stream.Blob, error) {
|
||||
func (s *S3Store) Get(hash string) (stream.Blob, error) {
|
||||
//Todo-Need to handle error for blob doesn't exist for consistency.
|
||||
err := s.initOnce()
|
||||
if err != nil {
|
||||
|
@ -110,7 +98,7 @@ func (s *S3BlobStore) Get(hash string) (stream.Blob, error) {
|
|||
}
|
||||
|
||||
// Put stores the blob on S3 or errors if S3 connection errors.
|
||||
func (s *S3BlobStore) Put(hash string, blob stream.Blob) error {
|
||||
func (s *S3Store) Put(hash string, blob stream.Blob) error {
|
||||
err := s.initOnce()
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -133,12 +121,12 @@ func (s *S3BlobStore) Put(hash string, blob stream.Blob) error {
|
|||
}
|
||||
|
||||
// PutSD stores the sd blob on S3 or errors if S3 connection errors.
|
||||
func (s *S3BlobStore) PutSD(hash string, blob stream.Blob) error {
|
||||
func (s *S3Store) PutSD(hash string, blob stream.Blob) error {
|
||||
//Todo - handle missing stream for consistency
|
||||
return s.Put(hash, blob)
|
||||
}
|
||||
|
||||
func (s *S3BlobStore) Delete(hash string) error {
|
||||
func (s *S3Store) Delete(hash string) error {
|
||||
err := s.initOnce()
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -153,3 +141,20 @@ func (s *S3BlobStore) Delete(hash string) error {
|
|||
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *S3Store) initOnce() error {
|
||||
if s.session != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
sess, err := session.NewSession(&aws.Config{
|
||||
Credentials: credentials.NewStaticCredentials(s.awsID, s.awsSecret, ""),
|
||||
Region: aws.String(s.region),
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
s.session = sess
|
||||
return nil
|
||||
}
|
||||
|
|
67
store/singleflight.go
Normal file
67
store/singleflight.go
Normal file
|
@ -0,0 +1,67 @@
|
|||
package store
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/lbryio/reflector.go/internal/metrics"
|
||||
|
||||
"github.com/lbryio/lbry.go/v2/stream"
|
||||
|
||||
"golang.org/x/sync/singleflight"
|
||||
)
|
||||
|
||||
func WithSingleFlight(component string, origin BlobStore) BlobStore {
|
||||
return &singleflightStore{
|
||||
BlobStore: origin,
|
||||
component: component,
|
||||
sf: new(singleflight.Group),
|
||||
}
|
||||
}
|
||||
|
||||
type singleflightStore struct {
|
||||
BlobStore
|
||||
|
||||
component string
|
||||
sf *singleflight.Group
|
||||
}
|
||||
|
||||
func (s *singleflightStore) Name() string {
|
||||
return "sf_" + s.BlobStore.Name()
|
||||
}
|
||||
|
||||
// Get ensures that only one request per hash is sent to the origin at a time,
|
||||
// thereby protecting against https://en.wikipedia.org/wiki/Thundering_herd_problem
|
||||
func (s *singleflightStore) Get(hash string) (stream.Blob, error) {
|
||||
metrics.CacheWaitingRequestsCount.With(metrics.CacheLabels(s.BlobStore.Name(), s.component)).Inc()
|
||||
defer metrics.CacheWaitingRequestsCount.With(metrics.CacheLabels(s.BlobStore.Name(), s.component)).Dec()
|
||||
|
||||
blob, err, _ := s.sf.Do(hash, s.getter(hash))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return blob.(stream.Blob), nil
|
||||
}
|
||||
|
||||
// getter returns a function that gets a blob from the origin
|
||||
// only one getter per hash will be executing at a time
|
||||
func (s *singleflightStore) getter(hash string) func() (interface{}, error) {
|
||||
return func() (interface{}, error) {
|
||||
metrics.CacheOriginRequestsCount.With(metrics.CacheLabels(s.BlobStore.Name(), s.component)).Inc()
|
||||
defer metrics.CacheOriginRequestsCount.With(metrics.CacheLabels(s.BlobStore.Name(), s.component)).Dec()
|
||||
|
||||
start := time.Now()
|
||||
blob, err := s.BlobStore.Get(hash)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
rate := float64(len(blob)) / 1024 / 1024 / time.Since(start).Seconds()
|
||||
metrics.CacheRetrievalSpeed.With(map[string]string{
|
||||
metrics.LabelCacheType: s.BlobStore.Name(),
|
||||
metrics.LabelComponent: s.component,
|
||||
metrics.LabelSource: "origin",
|
||||
}).Set(rate)
|
||||
|
||||
return blob, nil
|
||||
}
|
||||
}
|
89
store/speedwalk/speedwalk.go
Normal file
89
store/speedwalk/speedwalk.go
Normal file
|
@ -0,0 +1,89 @@
|
|||
package speedwalk
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"sync"
|
||||
|
||||
"github.com/lbryio/lbry.go/v2/extras/errors"
|
||||
|
||||
"github.com/karrick/godirwalk"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// AllFiles recursively lists every file in every subdirectory of a given directory
|
||||
// If basename is true, return the basename of each file. Otherwise return the full path starting at startDir.
|
||||
func AllFiles(startDir string, basename bool) ([]string, error) {
|
||||
items, err := ioutil.ReadDir(startDir)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
pathChan := make(chan string)
|
||||
paths := make([]string, 0, 1000)
|
||||
pathWG := &sync.WaitGroup{}
|
||||
pathWG.Add(1)
|
||||
go func() {
|
||||
defer pathWG.Done()
|
||||
for {
|
||||
path, ok := <-pathChan
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
paths = append(paths, path)
|
||||
}
|
||||
}()
|
||||
|
||||
maxThreads := runtime.NumCPU() - 1
|
||||
goroutineLimiter := make(chan struct{}, maxThreads)
|
||||
for i := 0; i < maxThreads; i++ {
|
||||
goroutineLimiter <- struct{}{}
|
||||
}
|
||||
|
||||
walkerWG := &sync.WaitGroup{}
|
||||
for _, item := range items {
|
||||
if !item.IsDir() {
|
||||
if basename {
|
||||
pathChan <- item.Name()
|
||||
} else {
|
||||
pathChan <- filepath.Join(startDir, item.Name())
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
<-goroutineLimiter
|
||||
walkerWG.Add(1)
|
||||
|
||||
go func(dir string) {
|
||||
defer func() {
|
||||
walkerWG.Done()
|
||||
goroutineLimiter <- struct{}{}
|
||||
}()
|
||||
|
||||
err = godirwalk.Walk(filepath.Join(startDir, dir), &godirwalk.Options{
|
||||
Unsorted: true, // faster this way
|
||||
Callback: func(osPathname string, de *godirwalk.Dirent) error {
|
||||
if de.IsRegular() {
|
||||
if basename {
|
||||
pathChan <- de.Name()
|
||||
} else {
|
||||
pathChan <- osPathname
|
||||
}
|
||||
}
|
||||
return nil
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
logrus.Errorf(errors.FullTrace(err))
|
||||
}
|
||||
}(item.Name())
|
||||
}
|
||||
|
||||
walkerWG.Wait()
|
||||
|
||||
close(pathChan)
|
||||
pathWG.Wait()
|
||||
|
||||
return paths, nil
|
||||
}
|
|
@ -7,15 +7,17 @@ import (
|
|||
|
||||
// BlobStore is an interface for handling blob storage.
|
||||
type BlobStore interface {
|
||||
// Does blob exist in the store
|
||||
// Name of blob store (useful for metrics)
|
||||
Name() string
|
||||
// Does blob exist in the store.
|
||||
Has(hash string) (bool, error)
|
||||
// Get the blob from the store
|
||||
// Get the blob from the store. Must return ErrBlobNotFound if blob is not in store.
|
||||
Get(hash string) (stream.Blob, error)
|
||||
// Put the blob into the store
|
||||
// Put the blob into the store.
|
||||
Put(hash string, blob stream.Blob) error
|
||||
// Put an SD blob into the store
|
||||
// Put an SD blob into the store.
|
||||
PutSD(hash string, blob stream.Blob) error
|
||||
// Delete the blob from the store
|
||||
// Delete the blob from the store.
|
||||
Delete(hash string) error
|
||||
}
|
||||
|
||||
|
@ -27,5 +29,11 @@ type Blocklister interface {
|
|||
Wants(hash string) (bool, error)
|
||||
}
|
||||
|
||||
// lister is a store that can list cached blobs. This is helpful when an overlay
|
||||
// cache needs to track blob existence.
|
||||
type lister interface {
|
||||
list() ([]string, error)
|
||||
}
|
||||
|
||||
//ErrBlobNotFound is a standard error when a blob is not found in the store.
|
||||
var ErrBlobNotFound = errors.Base("blob not found")
|
||||
|
|
Loading…
Reference in a new issue
We're using a different cache lib in @nikooo777 's proposed cache in lbrytv-player. Is either of them better than the other? Should probably keep it simple and stick to one.