Smarter caches #46

Merged
lyoshenka merged 13 commits from smarter_caches into master 2020-11-04 22:04:22 +01:00
30 changed files with 957 additions and 444 deletions

View file

@@ -28,9 +28,10 @@ func getStreamCmd(cmd *cobra.Command, args []string) {
addr := args[0]
sdHash := args[1]
s := store.NewCachingBlobStore(
s := store.NewCachingStore(
"getstream",
peer.NewStore(peer.StoreOpts{Address: addr}),
store.NewDiskBlobStore("/tmp/lbry_downloaded_blobs", 2),
store.NewDiskStore("/tmp/lbry_downloaded_blobs", 2),
)
wd, err := os.Getwd()

View file

@@ -29,7 +29,7 @@ func init() {
func peerCmd(cmd *cobra.Command, args []string) {
var err error
s3 := store.NewS3BlobStore(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName)
s3 := store.NewS3Store(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName)
peerServer := peer.NewServer(s3)
if !peerNoDB {

View file

@@ -4,6 +4,7 @@ import (
"os"
"os/signal"
"strconv"
"strings"
"syscall"
"time"
@@ -16,21 +17,25 @@ import (
"github.com/lbryio/reflector.go/store"
log "github.com/sirupsen/logrus"
"github.com/spf13/cast"
"github.com/spf13/cobra"
)
var reflectorCmdCacheDir string
var tcpPeerPort int
var http3PeerPort int
var receiverPort int
var metricsPort int
var disableUploads bool
var disableBlocklist bool
var proxyAddress string
var proxyPort string
var proxyProtocol string
var useDB bool
var cloudFrontEndpoint string
var (
tcpPeerPort int
http3PeerPort int
receiverPort int
metricsPort int
disableUploads bool
disableBlocklist bool
proxyAddress string
proxyPort string
proxyProtocol string
useDB bool
cloudFrontEndpoint string
reflectorCmdDiskCache string
reflectorCmdMemCache int
)
func init() {
var cmd = &cobra.Command{
@@ -38,7 +43,6 @@ func init() {
Short: "Run reflector server",
Run: reflectorCmd,
}
cmd.Flags().StringVar(&reflectorCmdCacheDir, "cache", "", "if specified, the path where blobs should be cached (disabled when left empty)")
cmd.Flags().StringVar(&proxyAddress, "proxy-address", "", "address of another reflector server where blobs are fetched from")
cmd.Flags().StringVar(&proxyPort, "proxy-port", "5567", "port of another reflector server where blobs are fetched from")
cmd.Flags().StringVar(&proxyProtocol, "proxy-protocol", "http3", "protocol used to fetch blobs from another reflector server (tcp/http3)")
@@ -50,94 +54,142 @@ func init() {
cmd.Flags().BoolVar(&disableUploads, "disable-uploads", false, "Disable uploads to this reflector server")
cmd.Flags().BoolVar(&disableBlocklist, "disable-blocklist", false, "Disable blocklist watching/updating")
cmd.Flags().BoolVar(&useDB, "use-db", true, "whether to connect to the reflector db or not")
cmd.Flags().StringVar(&reflectorCmdDiskCache, "disk-cache", "",
"enable disk cache, setting max size and path where to store blobs. format is 'MAX_BLOBS:CACHE_PATH'")
cmd.Flags().IntVar(&reflectorCmdMemCache, "mem-cache", 0, "enable in-memory cache with a max size of this many blobs")
rootCmd.AddCommand(cmd)
}
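For illustration, a hypothetical invocation that enables both of the new cache layers could look like the line below (the binary name and the values are placeholders, not taken from this diff; --disk-cache uses the 'MAX_BLOBS:CACHE_PATH' format described above):

prism reflector --disk-cache 500000:/mnt/blob_cache --mem-cache 100000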
func reflectorCmd(cmd *cobra.Command, args []string) {
log.Printf("reflector %s", meta.VersionString())
var blobStore store.BlobStore
if proxyAddress != "" {
switch proxyProtocol {
case "tcp":
blobStore = peer.NewStore(peer.StoreOpts{
Address: proxyAddress + ":" + proxyPort,
Timeout: 30 * time.Second,
})
case "http3":
blobStore = http3.NewStore(http3.StoreOpts{
Address: proxyAddress + ":" + proxyPort,
Timeout: 30 * time.Second,
})
default:
log.Fatalf("specified protocol is not recognized: %s", proxyProtocol)
}
} else {
s3Store := store.NewS3BlobStore(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName)
if cloudFrontEndpoint != "" {
blobStore = store.NewCloudFrontBlobStore(cloudFrontEndpoint, s3Store)
} else {
blobStore = s3Store
}
}
// the blocklist logic requires the db backed store to be the outer-most store
underlyingStore := setupStore()
outerStore := wrapWithCache(underlyingStore)
var err error
var reflectorServer *reflector.Server
if !disableUploads {
reflectorServer := reflector.NewServer(underlyingStore)
reflectorServer.Timeout = 3 * time.Minute
reflectorServer.EnableBlocklist = !disableBlocklist
if useDB {
db := new(db.SQL)
db.TrackAccessTime = true
err = db.Connect(globalConfig.DBConn)
err := reflectorServer.Start(":" + strconv.Itoa(receiverPort))
if err != nil {
log.Fatal(err)
}
blobStore = store.NewDBBackedStore(blobStore, db)
//this shouldn't go here but the blocklist logic requires the db backed store to be the outer-most store for it to work....
//having this here prevents uploaded blobs from being stored in the disk cache
if !disableUploads {
reflectorServer = reflector.NewServer(blobStore)
reflectorServer.Timeout = 3 * time.Minute
reflectorServer.EnableBlocklist = !disableBlocklist
err = reflectorServer.Start(":" + strconv.Itoa(receiverPort))
if err != nil {
log.Fatal(err)
}
}
defer reflectorServer.Shutdown()
}
if reflectorCmdCacheDir != "" {
err = os.MkdirAll(reflectorCmdCacheDir, os.ModePerm)
if err != nil {
log.Fatal(err)
}
blobStore = store.NewCachingBlobStore(blobStore, store.NewDiskBlobStore(reflectorCmdCacheDir, 2))
}
peerServer := peer.NewServer(blobStore)
err = peerServer.Start(":" + strconv.Itoa(tcpPeerPort))
peerServer := peer.NewServer(outerStore)
err := peerServer.Start(":" + strconv.Itoa(tcpPeerPort))
if err != nil {
log.Fatal(err)
}
defer peerServer.Shutdown()
http3PeerServer := http3.NewServer(blobStore)
http3PeerServer := http3.NewServer(outerStore)
err = http3PeerServer.Start(":" + strconv.Itoa(http3PeerPort))
if err != nil {
log.Fatal(err)
}
defer http3PeerServer.Shutdown()
metricsServer := metrics.NewServer(":"+strconv.Itoa(metricsPort), "/metrics")
metricsServer.Start()
defer metricsServer.Shutdown()
interruptChan := make(chan os.Signal, 1)
signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM)
<-interruptChan
metricsServer.Shutdown()
peerServer.Shutdown()
http3PeerServer.Shutdown()
if reflectorServer != nil {
reflectorServer.Shutdown()
}
// deferred shutdowns happen now
}
func setupStore() store.BlobStore {
var s store.BlobStore
if proxyAddress != "" {
switch proxyProtocol {
case "tcp":
s = peer.NewStore(peer.StoreOpts{
Address: proxyAddress + ":" + proxyPort,
Timeout: 30 * time.Second,
})
case "http3":
s = http3.NewStore(http3.StoreOpts{
Address: proxyAddress + ":" + proxyPort,
Timeout: 30 * time.Second,
})
default:
log.Fatalf("protocol is not recognized: %s", proxyProtocol)
}
} else {
s3Store := store.NewS3Store(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName)
if cloudFrontEndpoint != "" {
s = store.NewCloudFrontRWStore(store.NewCloudFrontROStore(cloudFrontEndpoint), s3Store)
} else {
s = s3Store
}
}
if useDB {
db := new(db.SQL)
db.TrackAccessTime = true
err := db.Connect(globalConfig.DBConn)
if err != nil {
log.Fatal(err)
}
s = store.NewDBBackedStore(s, db)
}
return s
}
func wrapWithCache(s store.BlobStore) store.BlobStore {
wrapped := s
diskCacheMaxSize, diskCachePath := diskCacheParams()
if diskCacheMaxSize > 0 {
err := os.MkdirAll(diskCachePath, os.ModePerm)
if err != nil {
log.Fatal(err)
}
wrapped = store.NewCachingStore(
"reflector",
wrapped,
store.NewLRUStore("peer_server", store.NewDiskStore(diskCachePath, 2), diskCacheMaxSize),
)
}
if reflectorCmdMemCache > 0 {
wrapped = store.NewCachingStore(
"reflector",
wrapped,
store.NewLRUStore("peer_server", store.NewMemStore(), reflectorCmdMemCache),
)
}
return wrapped
}
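To make the layering explicit: when both flags are set, the disk layer is wrapped first and the memory layer second, so a read checks memory, then disk, then the origin. A minimal sketch of the equivalent composition, where diskLRU and memLRU stand in for the NewLRUStore calls above:

inner := store.NewCachingStore("reflector", origin, diskLRU) // disk cache over the origin
outer := store.NewCachingStore("reflector", inner, memLRU)   // memory cache, checked first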
func diskCacheParams() (int, string) {
if reflectorCmdDiskCache == "" {
return 0, ""
}
parts := strings.Split(reflectorCmdDiskCache, ":")
if len(parts) != 2 {
log.Fatalf("--disk-cache must be a number, followed by ':', followed by a string")
}
maxSize := cast.ToInt(parts[0])
if maxSize <= 0 {
log.Fatalf("--disk-cache max size must be more than 0")
}
path := parts[1]
if len(path) == 0 || path[0] != '/' {
log.Fatalf("--disk-cache path must start with '/'")
}
return maxSize, path
}

View file

@@ -55,7 +55,7 @@ func startCmd(cmd *cobra.Command, args []string) {
db := new(db.SQL)
err := db.Connect(globalConfig.DBConn)
checkErr(err)
s3 := store.NewS3BlobStore(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName)
s3 := store.NewS3Store(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName)
comboStore := store.NewDBBackedStore(s3, db)
conf := prism.DefaultConf()

View file

@@ -29,7 +29,7 @@ func init() {
func testCmd(cmd *cobra.Command, args []string) {
log.Printf("reflector %s", meta.VersionString())
memStore := store.NewMemoryBlobStore()
memStore := store.NewMemStore()
reflectorServer := reflector.NewServer(memStore)
reflectorServer.Timeout = 3 * time.Minute

View file

@@ -35,7 +35,7 @@ func uploadCmd(cmd *cobra.Command, args []string) {
checkErr(err)
st := store.NewDBBackedStore(
store.NewS3BlobStore(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName),
store.NewS3Store(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName),
db)
uploader := reflector.NewUploader(db, st, uploadWorkers, uploadSkipExistsCheck, uploadDeleteBlobsAfterUpload)

6
go.mod
View file

@@ -14,11 +14,12 @@ require (
github.com/google/gops v0.3.7
github.com/gorilla/mux v1.7.4
github.com/hashicorp/go-msgpack v0.5.5 // indirect
github.com/hashicorp/golang-lru v0.5.3 // indirect
github.com/hashicorp/golang-lru v0.5.4
github.com/hashicorp/memberlist v0.1.4 // indirect
github.com/hashicorp/serf v0.8.2
github.com/inconshreveable/go-update v0.0.0-20160112193335-8152e7eb6ccf
github.com/johntdyer/slackrus v0.0.0-20180518184837-f7aae3243a07
github.com/karrick/godirwalk v1.16.1
github.com/lbryio/chainquery v1.9.0
github.com/lbryio/lbry.go v1.1.2 // indirect
github.com/lbryio/lbry.go/v2 v2.6.1-0.20200901175808-73382bb02128
@@ -27,9 +28,11 @@ require (
github.com/phayes/freeport v0.0.0-20171002185219-e27662a4a9d6
github.com/prometheus/client_golang v0.9.2
github.com/sirupsen/logrus v1.4.2
github.com/spf13/afero v1.4.1
github.com/spf13/cast v1.3.0
github.com/spf13/cobra v0.0.3
github.com/spf13/pflag v1.0.3 // indirect
github.com/stretchr/testify v1.4.0
github.com/volatiletech/null v8.0.0+incompatible
go.uber.org/atomic v1.5.1
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9
@@ -37,6 +40,7 @@ require (
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e
golang.org/x/tools v0.0.0-20191227053925-7b8e75db28f4 // indirect
google.golang.org/appengine v1.6.2 // indirect
gotest.tools v2.2.0+incompatible
)
go 1.15

11
go.sum
View file

@@ -152,6 +152,8 @@ github.com/hashicorp/go.net v0.0.1/go.mod h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.3 h1:YPkqC67at8FYaadspW/6uE0COsBxS2656RLEr8Bppgk=
github.com/hashicorp/golang-lru v0.5.3/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc=
github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
github.com/hashicorp/hcl v0.0.0-20180404174102-ef8a98b0bbce h1:xdsDDbiBDQTKASoGEZ+pEmF1OnWuu8AQ9I8iNbHNeno=
github.com/hashicorp/hcl v0.0.0-20180404174102-ef8a98b0bbce/go.mod h1:oZtUIOe8dh44I2q6ScRibXws4Ajl+d+nod3AaR9vL5w=
github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64=
@@ -186,12 +188,15 @@ github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/kardianos/osext v0.0.0-20170510131534-ae77be60afb1 h1:PJPDf8OUfOK1bb/NeTKd4f1QXZItOX389VN3B6qC8ro=
github.com/kardianos/osext v0.0.0-20170510131534-ae77be60afb1/go.mod h1:1NbS8ALrpOvjt0rHPNLyCIeMtbizbir8U//inJ+zuB8=
github.com/karrick/godirwalk v1.16.1 h1:DynhcF+bztK8gooS0+NDJFrdNZjJ3gzVzC545UNA9iw=
github.com/karrick/godirwalk v1.16.1/go.mod h1:j4mkqPuvaLI8mp1DroR3P6ad7cyYd4c1qeJ3RV7ULlk=
github.com/keybase/go-ps v0.0.0-20161005175911-668c8856d999/go.mod h1:hY+WOq6m2FpbvyrI93sMaypsttvaIL5nhVR92dTMUcQ=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6CZQHDETBtE9HaSEkGmuNXF86RwHhHUvq4=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.2 h1:DB17ag19krx9CFsz4o3enTrPXyIXCl+2iCXH/aMAp9s=
github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
@@ -280,6 +285,7 @@ github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/profile v1.3.0/go.mod h1:hJw3o1OdXxsrSjjVksARp5W95eeEaEfptyVZyv6JUPA=
github.com/pkg/sftp v1.10.1/go.mod h1:lYOWFsE0bwd1+KfKJaKeuokY15vzFx25BLbzYYoAxZI=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI=
@@ -346,6 +352,8 @@ github.com/sourcegraph/annotate v0.0.0-20160123013949-f4cad6c6324d/go.mod h1:Udh
github.com/sourcegraph/syntaxhighlight v0.0.0-20170531221838-bd320f5d308e/go.mod h1:HuIsMU8RRBOtsCgI77wP899iHVBQpCmg4ErYMZB+2IA=
github.com/spf13/afero v1.1.1 h1:Lt3ihYMlE+lreX1GS4Qw4ZsNpYQLxIXKBTEOXm3nt6I=
github.com/spf13/afero v1.1.1/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ=
github.com/spf13/afero v1.4.1 h1:asw9sl74539yqavKaglDM5hFpdJVK0Y5Dr/JOgQ89nQ=
github.com/spf13/afero v1.4.1/go.mod h1:Ai8FlHk4v/PARR026UzYexafAt9roJ7LcLMAmO6Z93I=
github.com/spf13/cast v1.2.0/go.mod h1:r2rcYCSwa1IExKTDiTfzaxqT2FNHs8hODu4LnUfgKEg=
github.com/spf13/cast v1.3.0 h1:oget//CVOEoFewqQxwr0Ej5yjygnqGkvggSE/gB35Q8=
github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
@@ -394,6 +402,7 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk
golang.org/x/crypto v0.0.0-20190313024323-a1f597ede03a/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190820162420-60c769a6c586/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20191002192127-34f69633bfdc/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200221231518-2aa609cf4a9d/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
@@ -471,6 +480,8 @@ golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190921001708-c4c64cad1fd0 h1:xQwXv67TxFo9nC1GJFyab5eq/5B590r6RlnL/G8Sz7w=

View file

@@ -12,6 +12,7 @@ import (
ee "github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/extras/stop"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
@@ -57,7 +58,8 @@ func (s *Server) Shutdown() {
}
const (
ns = "reflector"
ns = "reflector"
subsystemCache = "cache"
labelDirection = "direction"
labelErrorType = "error_type"
@@ -65,7 +67,9 @@ const (
DirectionUpload = "upload" // to reflector
DirectionDownload = "download" // from reflector
MtrLabelSource = "source"
LabelCacheType = "cache_type"
LabelComponent = "component"
LabelSource = "source"
errConnReset = "conn_reset"
errReadConnReset = "read_conn_reset"
@@ -92,6 +96,12 @@ const (
)
var (
ErrorCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: ns,
Name: "error_total",
Help: "Total number of errors",
}, []string{labelDirection, labelErrorType})
BlobDownloadCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: ns,
Name: "blob_download_total",
@@ -107,27 +117,44 @@ var (
Name: "http3_blob_download_total",
Help: "Total number of blobs downloaded from reflector through QUIC protocol",
})
CacheHitCount = promauto.NewCounter(prometheus.CounterOpts{
CacheHitCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: ns,
Name: "cache_hit_total",
Subsystem: subsystemCache,
Name: "hit_total",
Help: "Total number of blobs retrieved from the cache storage",
})
CacheMissCount = promauto.NewCounter(prometheus.CounterOpts{
}, []string{LabelCacheType, LabelComponent})
CacheMissCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: ns,
Name: "cache_miss_total",
Subsystem: subsystemCache,
Name: "miss_total",
Help: "Total number of blobs retrieved from origin rather than cache storage",
})
CacheOriginRequestsCount = promauto.NewGauge(prometheus.GaugeOpts{
}, []string{LabelCacheType, LabelComponent})
CacheOriginRequestsCount = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns,
Name: "cache_origin_requests_total",
Subsystem: subsystemCache,
Name: "origin_requests_total",
Help: "How many Get requests are in flight from the cache to the origin",
})
}, []string{LabelCacheType, LabelComponent})
// during thundering-herd situations, the metric below should be a lot smaller than the metric above
CacheWaitingRequestsCount = promauto.NewGauge(prometheus.GaugeOpts{
CacheWaitingRequestsCount = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns,
Name: "cache_waiting_requests_total",
Subsystem: subsystemCache,
Name: "waiting_requests_total",
Help: "How many cache requests are waiting for an in-flight origin request",
})
}, []string{LabelCacheType, LabelComponent})
CacheLRUEvictCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: ns,
Subsystem: subsystemCache,
Name: "evict_total",
Help: "Count of blobs evicted from cache",
}, []string{LabelCacheType, LabelComponent})
CacheRetrievalSpeed = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns,
Name: "speed_mbps",
Help: "Speed of blob retrieval from cache or from origin",
}, []string{LabelCacheType, LabelComponent, LabelSource})
BlobUploadCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: ns,
Name: "blob_upload_total",
@@ -138,16 +165,7 @@ var (
Name: "sdblob_upload_total",
Help: "Total number of SD blobs (and therefore streams) uploaded to reflector",
})
RetrieverSpeed = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns,
Name: "speed_mbps",
Help: "Speed of blob retrieval",
}, []string{MtrLabelSource})
ErrorCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: ns,
Name: "error_total",
Help: "Total number of errors",
}, []string{labelDirection, labelErrorType})
MtrInBytesTcp = promauto.NewCounter(prometheus.CounterOpts{
Namespace: ns,
Name: "tcp_in_bytes",
@@ -185,6 +203,13 @@ var (
})
)
func CacheLabels(name, component string) prometheus.Labels {
return prometheus.Labels{
LabelCacheType: name,
LabelComponent: component,
}
}
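As a hedged example of how these labeled metrics are used (this mirrors the calls in store/caching.go later in this diff; the label values here are illustrative):

metrics.CacheHitCount.With(metrics.CacheLabels("disk", "reflector")).Inc()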
func TrackError(direction string, e error) (shouldLog bool) { // shouldLog is a hack, but whatever
if e == nil {
return

View file

@@ -164,7 +164,7 @@ func generateTLSConfig() *tls.Config {
func (s *Server) listenAndServe(server *http3.Server) {
err := server.ListenAndServe()
if err != nil && !errors.Is(err, http.ErrServerClosed) {
if err != nil && err.Error() != "server closed" {
log.Errorln(errors.FullTrace(err))
}
}

View file

@@ -55,6 +55,8 @@ func (p *Store) getClient() (*Client, error) {
return c, errors.Prefix("connection error", err)
}
func (p *Store) Name() string { return "http3" }
// Has asks the peer if they have a hash
func (p *Store) Has(hash string) (bool, error) {
c, err := p.getClient()

View file

@@ -34,7 +34,7 @@ var availabilityRequests = []pair{
}
func getServer(t *testing.T, withBlobs bool) *Server {
st := store.NewMemoryBlobStore()
st := store.NewMemStore()
if withBlobs {
for k, v := range blobs {
err := st.Put(k, v)

View file

@@ -30,6 +30,8 @@ func (p *Store) getClient() (*Client, error) {
return c, errors.Prefix("connection error", err)
}
func (p *Store) Name() string { return "peer" }
// Has asks the peer if they have a hash
func (p *Store) Has(hash string) (bool, error) {
c, err := p.getClient()

View file

@@ -22,7 +22,7 @@ func startServerOnRandomPort(t *testing.T) (*Server, int) {
t.Fatal(err)
}
srv := NewServer(store.NewMemoryBlobStore())
srv := NewServer(store.NewMemStore())
err = srv.Start("127.0.0.1:" + strconv.Itoa(port))
if err != nil {
t.Fatal(err)
@@ -119,7 +119,7 @@ func TestServer_Timeout(t *testing.T) {
t.Fatal(err)
}
srv := NewServer(store.NewMemoryBlobStore())
srv := NewServer(store.NewMemStore())
srv.Timeout = testTimeout
err = srv.Start("127.0.0.1:" + strconv.Itoa(port))
if err != nil {
@@ -161,7 +161,7 @@ func TestServer_Timeout(t *testing.T) {
//}
type mockPartialStore struct {
*store.MemoryBlobStore
*store.MemStore
missing []string
}
@@ -181,7 +181,7 @@ func TestServer_PartialUpload(t *testing.T) {
missing[i] = bits.Rand().String()
}
st := store.BlobStore(&mockPartialStore{MemoryBlobStore: store.NewMemoryBlobStore(), missing: missing})
st := store.BlobStore(&mockPartialStore{MemStore: store.NewMemStore(), missing: missing})
if _, ok := st.(neededBlobChecker); !ok {
t.Fatal("mock does not implement the relevant interface")
}

View file

@@ -7,26 +7,32 @@ import (
"github.com/lbryio/lbry.go/v2/stream"
"github.com/lbryio/reflector.go/internal/metrics"
"golang.org/x/sync/singleflight"
)
// CachingBlobStore combines two stores, typically a local and a remote store, to improve performance.
// CachingStore combines two stores, typically a local and a remote store, to improve performance.
// Accessed blobs are stored in and retrieved from the cache. If they are not in the cache, they
// are retrieved from the origin and cached. Puts are cached and also forwarded to the origin.
type CachingBlobStore struct {
type CachingStore struct {
origin, cache BlobStore
sf *singleflight.Group
component string
}
// NewCachingBlobStore makes a new caching disk store and returns a pointer to it.
func NewCachingBlobStore(origin, cache BlobStore) *CachingBlobStore {
return &CachingBlobStore{origin: origin, cache: cache, sf: new(singleflight.Group)}
// NewCachingStore makes a new caching store and returns a pointer to it.
func NewCachingStore(component string, origin, cache BlobStore) *CachingStore {
return &CachingStore{
component: component,
origin: WithSingleFlight(component, origin),
cache: cache,
}
}
const nameCaching = "caching"
// Name is the cache type name
func (c *CachingStore) Name() string { return nameCaching }
// Has checks the cache and then the origin for a hash. It returns true if either store has it.
func (c *CachingBlobStore) Has(hash string) (bool, error) {
func (c *CachingStore) Has(hash string) (bool, error) {
has, err := c.cache.Has(hash)
if has || err != nil {
return has, err
@@ -36,49 +42,33 @@ func (c *CachingBlobStore) Has(hash string) (bool, error) {
// Get tries to get the blob from the cache first, falling back to the origin. If the blob comes
// from the origin, it is also stored in the cache.
func (c *CachingBlobStore) Get(hash string) (stream.Blob, error) {
func (c *CachingStore) Get(hash string) (stream.Blob, error) {
start := time.Now()
blob, err := c.cache.Get(hash)
if err == nil || !errors.Is(err, ErrBlobNotFound) {
metrics.CacheHitCount.Inc()
metrics.CacheHitCount.With(metrics.CacheLabels(c.cache.Name(), c.component)).Inc()
rate := float64(len(blob)) / 1024 / 1024 / time.Since(start).Seconds()
metrics.RetrieverSpeed.With(map[string]string{metrics.MtrLabelSource: "cache"}).Set(rate)
metrics.CacheRetrievalSpeed.With(map[string]string{
metrics.LabelCacheType: c.cache.Name(),
metrics.LabelComponent: c.component,
metrics.LabelSource: "cache",
}).Set(rate)
return blob, err
}
metrics.CacheMissCount.Inc()
return c.getFromOrigin(hash)
}
metrics.CacheMissCount.With(metrics.CacheLabels(c.cache.Name(), c.component)).Inc()
// getFromOrigin ensures that only one Get per hash is sent to the origin at a time,
// thereby protecting against https://en.wikipedia.org/wiki/Thundering_herd_problem
func (c *CachingBlobStore) getFromOrigin(hash string) (stream.Blob, error) {
metrics.CacheWaitingRequestsCount.Inc()
defer metrics.CacheWaitingRequestsCount.Dec()
originBlob, err, _ := c.sf.Do(hash, func() (interface{}, error) {
metrics.CacheOriginRequestsCount.Inc()
defer metrics.CacheOriginRequestsCount.Dec()
start := time.Now()
blob, err := c.origin.Get(hash)
if err != nil {
return nil, err
}
rate := float64(len(blob)) / 1024 / 1024 / time.Since(start).Seconds()
metrics.RetrieverSpeed.With(map[string]string{metrics.MtrLabelSource: "origin"}).Set(rate)
err = c.cache.Put(hash, blob)
return blob, err
})
blob, err = c.origin.Get(hash)
if err != nil {
return nil, err
}
return originBlob.(stream.Blob), nil
err = c.cache.Put(hash, blob)
return blob, err
}
// Put stores the blob in the origin and the cache
func (c *CachingBlobStore) Put(hash string, blob stream.Blob) error {
func (c *CachingStore) Put(hash string, blob stream.Blob) error {
err := c.origin.Put(hash, blob)
if err != nil {
return err
@@ -87,7 +77,7 @@ func (c *CachingBlobStore) Put(hash string, blob stream.Blob) error {
}
// PutSD stores the sd blob in the origin and the cache
func (c *CachingBlobStore) PutSD(hash string, blob stream.Blob) error {
func (c *CachingStore) PutSD(hash string, blob stream.Blob) error {
err := c.origin.PutSD(hash, blob)
if err != nil {
return err
@@ -96,7 +86,7 @@ func (c *CachingBlobStore) PutSD(hash string, blob stream.Blob) error {
}
// Delete deletes the blob from the origin and the cache
func (c *CachingBlobStore) Delete(hash string) error {
func (c *CachingStore) Delete(hash string) error {
err := c.origin.Delete(hash)
if err != nil {
return err
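As a usage sketch, the renamed constructor composes the same way getStreamCmd does at the top of this diff (the address and path here are placeholders):

s := store.NewCachingStore(
	"example", // component label reported in cache metrics
	peer.NewStore(peer.StoreOpts{Address: "reflector.example.com:5567"}), // origin
	store.NewDiskStore("/tmp/blob_cache", 2),                             // cache
)
blob, err := s.Get("somehash") // a miss reads from the origin, then populates the disk cache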

View file

@@ -9,10 +9,10 @@ import (
"github.com/lbryio/lbry.go/v2/stream"
)
func TestCachingBlobStore_Put(t *testing.T) {
origin := NewMemoryBlobStore()
cache := NewMemoryBlobStore()
s := NewCachingBlobStore(origin, cache)
func TestCachingStore_Put(t *testing.T) {
origin := NewMemStore()
cache := NewMemStore()
s := NewCachingStore("test", origin, cache)
b := []byte("this is a blob of stuff")
hash := "hash"
@@ -39,10 +39,10 @@ func TestCachingBlobStore_Put(t *testing.T) {
}
}
func TestCachingBlobStore_CacheMiss(t *testing.T) {
origin := NewMemoryBlobStore()
cache := NewMemoryBlobStore()
s := NewCachingBlobStore(origin, cache)
func TestCachingStore_CacheMiss(t *testing.T) {
origin := NewMemStore()
cache := NewMemStore()
s := NewCachingStore("test", origin, cache)
b := []byte("this is a blob of stuff")
hash := "hash"
@@ -76,11 +76,11 @@ func TestCachingBlobStore_CacheMiss(t *testing.T) {
}
}
func TestCachingBlobStore_ThunderingHerd(t *testing.T) {
func TestCachingStore_ThunderingHerd(t *testing.T) {
storeDelay := 100 * time.Millisecond
origin := NewSlowBlobStore(storeDelay)
cache := NewMemoryBlobStore()
s := NewCachingBlobStore(origin, cache)
cache := NewMemStore()
s := NewCachingStore("test", origin, cache)
b := []byte("this is a blob of stuff")
hash := "hash"
@@ -129,16 +129,19 @@ func TestCachingBlobStore_ThunderingHerd(t *testing.T) {
// SlowBlobStore adds a delay to each request
type SlowBlobStore struct {
mem *MemoryBlobStore
mem *MemStore
delay time.Duration
}
func NewSlowBlobStore(delay time.Duration) *SlowBlobStore {
return &SlowBlobStore{
mem: NewMemoryBlobStore(),
mem: NewMemStore(),
delay: delay,
}
}
func (s *SlowBlobStore) Name() string {
return "slow"
}
func (s *SlowBlobStore) Has(hash string) (bool, error) {
time.Sleep(s.delay)

View file

@@ -1,109 +0,0 @@
package store
import (
"io/ioutil"
"net/http"
"time"
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/stream"
"github.com/lbryio/reflector.go/internal/metrics"
"github.com/lbryio/reflector.go/meta"
log "github.com/sirupsen/logrus"
)
// CloudFrontBlobStore is an CloudFront backed store (retrieval only)
type CloudFrontBlobStore struct {
cfEndpoint string
s3Store *S3BlobStore
}
// NewS3BlobStore returns an initialized S3 store pointer.
func NewCloudFrontBlobStore(cloudFrontEndpoint string, S3Store *S3BlobStore) *CloudFrontBlobStore {
return &CloudFrontBlobStore{
cfEndpoint: cloudFrontEndpoint,
s3Store: S3Store,
}
}
// Has returns T/F or Error if the store contains the blob.
func (s *CloudFrontBlobStore) Has(hash string) (bool, error) {
url := s.cfEndpoint + hash
req, err := http.NewRequest("HEAD", url, nil)
if err != nil {
return false, errors.Err(err)
}
req.Header.Add("User-Agent", "reflector.go/"+meta.Version)
res, err := http.DefaultClient.Do(req)
if err != nil {
return false, errors.Err(err)
}
defer res.Body.Close()
switch res.StatusCode {
case http.StatusNotFound, http.StatusForbidden:
return false, nil
case http.StatusOK:
return true, nil
default:
return false, errors.Err(res.Status)
}
}
// Get returns the blob slice if present or errors.
func (s *CloudFrontBlobStore) Get(hash string) (stream.Blob, error) {
url := s.cfEndpoint + hash
log.Debugf("Getting %s from S3", hash[:8])
defer func(t time.Time) {
log.Debugf("Getting %s from S3 took %s", hash[:8], time.Since(t).String())
}(time.Now())
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return nil, errors.Err(err)
}
req.Header.Add("User-Agent", "reflector.go/"+meta.Version)
res, err := http.DefaultClient.Do(req)
if err != nil {
return nil, errors.Err(err)
}
defer res.Body.Close()
switch res.StatusCode {
case http.StatusNotFound, http.StatusForbidden:
return nil, errors.Err(ErrBlobNotFound)
case http.StatusOK:
b, err := ioutil.ReadAll(res.Body)
if err != nil {
return nil, errors.Err(err)
}
metrics.MtrInBytesS3.Add(float64(len(b)))
return b, nil
default:
return nil, errors.Err(res.Status)
}
}
// Put stores the blob on S3 or errors if S3 store is not present.
func (s *CloudFrontBlobStore) Put(hash string, blob stream.Blob) error {
if s.s3Store != nil {
return s.s3Store.Put(hash, blob)
}
return errors.Err("not implemented in cloudfront store")
}
// PutSD stores the sd blob on S3 or errors if S3 store is not present.
func (s *CloudFrontBlobStore) PutSD(hash string, blob stream.Blob) error {
if s.s3Store != nil {
return s.s3Store.PutSD(hash, blob)
}
return errors.Err("not implemented in cloudfront store")
}
func (s *CloudFrontBlobStore) Delete(hash string) error {
if s.s3Store != nil {
return s.s3Store.Delete(hash)
}
return errors.Err("not implemented in cloudfront store")
}

105
store/cloudfront_ro.go Normal file
View file

@@ -0,0 +1,105 @@
package store
import (
"io"
"io/ioutil"
"net/http"
"time"
"github.com/lbryio/reflector.go/internal/metrics"
"github.com/lbryio/reflector.go/meta"
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/stream"
log "github.com/sirupsen/logrus"
)
// CloudFrontROStore reads from cloudfront. All writes panic.
type CloudFrontROStore struct {
endpoint string // cloudfront endpoint
}
// NewCloudFrontROStore returns an initialized CloudFrontROStore store pointer.
func NewCloudFrontROStore(endpoint string) *CloudFrontROStore {
return &CloudFrontROStore{endpoint: endpoint}
}
const nameCloudFrontRO = "cloudfront_ro"
// Name is the cache type name
func (c *CloudFrontROStore) Name() string { return nameCloudFrontRO }
// Has checks if the hash is in the store.
func (c *CloudFrontROStore) Has(hash string) (bool, error) {
status, body, err := c.cfRequest(http.MethodHead, hash)
if err != nil {
return false, err
}
defer body.Close()
switch status {
case http.StatusNotFound, http.StatusForbidden:
return false, nil
case http.StatusOK:
return true, nil
default:
return false, errors.Err("unexpected status %d", status)
}
}
// Get gets the blob from Cloudfront.
func (c *CloudFrontROStore) Get(hash string) (stream.Blob, error) {
log.Debugf("Getting %s from S3", hash[:8])
defer func(t time.Time) {
log.Debugf("Getting %s from S3 took %s", hash[:8], time.Since(t).String())
}(time.Now())
status, body, err := c.cfRequest(http.MethodGet, hash)
if err != nil {
return nil, err
}
defer body.Close()
switch status {
case http.StatusNotFound, http.StatusForbidden:
return nil, errors.Err(ErrBlobNotFound)
case http.StatusOK:
b, err := ioutil.ReadAll(body)
if err != nil {
return nil, errors.Err(err)
}
metrics.MtrInBytesS3.Add(float64(len(b)))
return b, nil
default:
return nil, errors.Err("unexpected status %d", status)
}
}
func (c *CloudFrontROStore) cfRequest(method, hash string) (int, io.ReadCloser, error) {
url := c.endpoint + hash
req, err := http.NewRequest(method, url, nil)
if err != nil {
return 0, nil, errors.Err(err)
}
req.Header.Add("User-Agent", "reflector.go/"+meta.Version)
res, err := http.DefaultClient.Do(req)
if err != nil {
return 0, nil, errors.Err(err)
}
return res.StatusCode, res.Body, nil
}
func (c *CloudFrontROStore) Put(_ string, _ stream.Blob) error {
panic("CloudFrontROStore cannot do writes. Use CloudFrontRWStore")
}
func (c *CloudFrontROStore) PutSD(_ string, _ stream.Blob) error {
panic("CloudFrontROStore cannot do writes. Use CloudFrontRWStore")
}
func (c *CloudFrontROStore) Delete(_ string) error {
panic("CloudFrontROStore cannot do writes. Use CloudFrontRWStore")
}

50
store/cloudfront_rw.go Normal file
View file

@@ -0,0 +1,50 @@
package store
import (
"github.com/lbryio/lbry.go/v2/stream"
)
// CloudFrontRWStore combines a Cloudfront and an S3 store. Reads go to Cloudfront, writes go to S3.
type CloudFrontRWStore struct {
cf *CloudFrontROStore
s3 *S3Store
}
// NewCloudFrontRWStore returns an initialized CloudFrontRWStore store pointer.
// NOTE: It panics if either argument is nil.
func NewCloudFrontRWStore(cf *CloudFrontROStore, s3 *S3Store) *CloudFrontRWStore {
if cf == nil || s3 == nil {
panic("both stores must be set")
}
return &CloudFrontRWStore{cf: cf, s3: s3}
}
const nameCloudFrontRW = "cloudfront_rw"
// Name is the cache type name
func (c *CloudFrontRWStore) Name() string { return nameCloudFrontRW }
// Has checks if the hash is in the store.
func (c *CloudFrontRWStore) Has(hash string) (bool, error) {
return c.cf.Has(hash)
}
// Get gets the blob from Cloudfront.
func (c *CloudFrontRWStore) Get(hash string) (stream.Blob, error) {
return c.cf.Get(hash)
}
// Put stores the blob on S3
func (c *CloudFrontRWStore) Put(hash string, blob stream.Blob) error {
return c.s3.Put(hash, blob)
}
// PutSD stores the sd blob on S3
func (c *CloudFrontRWStore) PutSD(hash string, blob stream.Blob) error {
return c.s3.PutSD(hash, blob)
}
// Delete deletes the blob from S3
func (c *CloudFrontRWStore) Delete(hash string) error {
return c.s3.Delete(hash)
}
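A composition sketch matching how setupStore wires this up above (the endpoint and credentials are placeholders):

s := store.NewCloudFrontRWStore(
	store.NewCloudFrontROStore("https://dxxxxxxxx.cloudfront.net/"), // reads
	store.NewS3Store(awsID, awsSecret, region, bucket),              // writes
)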

View file

@@ -25,6 +25,11 @@ func NewDBBackedStore(blobs BlobStore, db *db.SQL) *DBBackedStore {
return &DBBackedStore{blobs: blobs, db: db}
}
const nameDBBacked = "db-backed"
// Name is the cache type name
func (d *DBBackedStore) Name() string { return nameDBBacked }
// Has returns true if the blob is in the store
func (d *DBBackedStore) Has(hash string) (bool, error) {
return d.db.HasBlob(hash)

View file

@@ -4,82 +4,38 @@ import (
"io/ioutil"
"os"
"path"
"path/filepath"
"sort"
"syscall"
"time"
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/stream"
log "github.com/sirupsen/logrus"
"github.com/lbryio/reflector.go/store/speedwalk"
)
// DiskBlobStore stores blobs on a local disk
type DiskBlobStore struct {
// DiskStore stores blobs on a local disk
type DiskStore struct {
// the location of blobs on disk
blobDir string
// store files in subdirectories based on the first N chars in the filename. 0 = don't create subdirectories.
prefixLength int
initialized bool
lastChecked time.Time
diskCleanupBusy chan bool
// true if initOnce ran, false otherwise
initialized bool
}
// NewDiskBlobStore returns an initialized file disk store pointer.
func NewDiskBlobStore(dir string, prefixLength int) *DiskBlobStore {
dbs := DiskBlobStore{blobDir: dir, prefixLength: prefixLength, diskCleanupBusy: make(chan bool, 1)}
dbs.diskCleanupBusy <- true
return &dbs
}
func (d *DiskBlobStore) dir(hash string) string {
if d.prefixLength <= 0 || len(hash) < d.prefixLength {
return d.blobDir
// NewDiskStore returns an initialized file disk store pointer.
func NewDiskStore(dir string, prefixLength int) *DiskStore {
return &DiskStore{
blobDir: dir,
prefixLength: prefixLength,
}
return path.Join(d.blobDir, hash[:d.prefixLength])
}
// GetUsedSpace returns a value between 0 and 1, with 0 being completely empty and 1 being full, for the disk that holds the provided path
func (d *DiskBlobStore) getUsedSpace() (float32, error) {
var stat syscall.Statfs_t
err := syscall.Statfs(d.blobDir, &stat)
if err != nil {
return 0, err
}
// Available blocks * size per block = available space in bytes
all := stat.Blocks * uint64(stat.Bsize)
free := stat.Bfree * uint64(stat.Bsize)
used := all - free
const nameDisk = "disk"
return float32(used) / float32(all), nil
}
func (d *DiskBlobStore) path(hash string) string {
return path.Join(d.dir(hash), hash)
}
func (d *DiskBlobStore) ensureDirExists(dir string) error {
return errors.Err(os.MkdirAll(dir, 0755))
}
func (d *DiskBlobStore) initOnce() error {
if d.initialized {
return nil
}
err := d.ensureDirExists(d.blobDir)
if err != nil {
return err
}
d.initialized = true
return nil
}
// Name is the cache type name
func (d *DiskStore) Name() string { return nameDisk }
// Has returns T/F or Error if the blob is stored already. It will error with any IO disk error.
func (d *DiskBlobStore) Has(hash string) (bool, error) {
func (d *DiskStore) Has(hash string) (bool, error) {
err := d.initOnce()
if err != nil {
return false, err
@@ -90,13 +46,13 @@ func (d *DiskBlobStore) Has(hash string) (bool, error) {
if os.IsNotExist(err) {
return false, nil
}
return false, err
return false, errors.Err(err)
}
return true, nil
}
// Get returns the blob or an error if the blob doesn't exist.
func (d *DiskBlobStore) Get(hash string) (stream.Blob, error) {
func (d *DiskStore) Get(hash string) (stream.Blob, error) {
err := d.initOnce()
if err != nil {
return nil, err
@@ -111,11 +67,12 @@ func (d *DiskBlobStore) Get(hash string) (stream.Blob, error) {
}
defer file.Close()
return ioutil.ReadAll(file)
blob, err := ioutil.ReadAll(file)
return blob, errors.Err(err)
}
// Put stores the blob on disk
func (d *DiskBlobStore) Put(hash string, blob stream.Blob) error {
func (d *DiskStore) Put(hash string, blob stream.Blob) error {
err := d.initOnce()
if err != nil {
return err
@@ -126,16 +83,17 @@ func (d *DiskBlobStore) Put(hash string, blob stream.Blob) error {
return err
}
return ioutil.WriteFile(d.path(hash), blob, 0644)
err = ioutil.WriteFile(d.path(hash), blob, 0644)
return errors.Err(err)
}
// PutSD stores the sd blob on the disk
func (d *DiskBlobStore) PutSD(hash string, blob stream.Blob) error {
func (d *DiskStore) PutSD(hash string, blob stream.Blob) error {
return d.Put(hash, blob)
}
// Delete deletes the blob from the store
func (d *DiskBlobStore) Delete(hash string) error {
func (d *DiskStore) Delete(hash string) error {
err := d.initOnce()
if err != nil {
return err
@@ -149,75 +107,45 @@ func (d *DiskBlobStore) Delete(hash string) error {
return nil
}
return os.Remove(d.path(hash))
err = os.Remove(d.path(hash))
return errors.Err(err)
}
func (d *DiskBlobStore) ensureDiskSpace() {
defer func() {
d.lastChecked = time.Now()
d.diskCleanupBusy <- true
}()
used, err := d.getUsedSpace()
// list returns the hashes of blobs that already exist in the blobDir
func (d *DiskStore) list() ([]string, error) {
err := d.initOnce()
if err != nil {
log.Errorln(err.Error())
return
}
log.Infof("disk usage: %.2f%%\n", used*100)
if used > 0.90 {
log.Infoln("over 0.90, cleaning up")
err = d.WipeOldestBlobs()
if err != nil {
log.Errorln(err.Error())
return
}
log.Infoln("Done cleaning up")
return nil, err
}
return speedwalk.AllFiles(d.blobDir, true)
}
func (d *DiskBlobStore) WipeOldestBlobs() (err error) {
dirs, err := ioutil.ReadDir(d.blobDir)
func (d *DiskStore) dir(hash string) string {
if d.prefixLength <= 0 || len(hash) < d.prefixLength {
return d.blobDir
}
return path.Join(d.blobDir, hash[:d.prefixLength])
}
func (d *DiskStore) path(hash string) string {
return path.Join(d.dir(hash), hash)
}
func (d *DiskStore) ensureDirExists(dir string) error {
return errors.Err(os.MkdirAll(dir, 0755))
}
func (d *DiskStore) initOnce() error {
if d.initialized {
return nil
}
err := d.ensureDirExists(d.blobDir)
if err != nil {
return err
}
type datedFile struct {
Atime time.Time
File *os.FileInfo
FullPath string
}
datedFiles := make([]datedFile, 0, 5000)
for _, dir := range dirs {
if dir.IsDir() {
files, err := ioutil.ReadDir(filepath.Join(d.blobDir, dir.Name()))
if err != nil {
return err
}
for _, file := range files {
if file.Mode().IsRegular() && !file.IsDir() {
datedFiles = append(datedFiles, datedFile{
Atime: atime(file),
File: &file,
FullPath: filepath.Join(d.blobDir, dir.Name(), file.Name()),
})
}
}
}
}
sort.Slice(datedFiles, func(i, j int) bool {
return datedFiles[i].Atime.Before(datedFiles[j].Atime)
})
//delete the first 50000 blobs
for i, df := range datedFiles {
if i >= 50000 {
break
}
log.Infoln(df.FullPath)
log.Infoln(df.Atime.String())
err = os.Remove(df.FullPath)
if err != nil {
return err
}
}
d.initialized = true
return nil
}

120
store/lru.go Normal file
View file

@@ -0,0 +1,120 @@
package store
import (
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/stream"
"github.com/lbryio/reflector.go/internal/metrics"
golru "github.com/hashicorp/golang-lru"
anbsky commented 2020-10-23 07:26:17 +02:00 (Migrated from github.com)
We're using [a different cache lib](https://github.com/karlseguin/ccache) in @nikooo777 's [proposed cache](https://github.com/lbryio/lbrytv-player/pull/25/files) in lbrytv-player. Is either of them better than the other? Should probably keep it simple and stick to one.
lyoshenka commented 2020-10-23 17:34:30 +02:00 (Migrated from github.com)
Yep. We're actually planning to merge reflector and player into a single program/process so they don't have to send blobs back and forth via TCP.
)
// LRUStore adds a max cache size and LRU eviction to a BlobStore
type LRUStore struct {
// underlying store
store BlobStore
// lru implementation
lru *golru.Cache
}
// NewLRUStore initialize a new LRUStore
func NewLRUStore(component string, store BlobStore, maxItems int) *LRUStore {
lru, err := golru.NewWithEvict(maxItems, func(key interface{}, value interface{}) {
metrics.CacheLRUEvictCount.With(metrics.CacheLabels(store.Name(), component)).Inc()
_ = store.Delete(key.(string)) // TODO: log this error. may happen if underlying entry is gone but cache entry still there
})
if err != nil {
panic(err)
}
l := &LRUStore{
store: store,
lru: lru,
}
if lstr, ok := store.(lister); ok {
err = l.loadExisting(lstr, maxItems)
if err != nil {
panic(err) // TODO: what should happen here? panic? return nil? just keep going?
}
}
return l
}
const nameLRU = "lru"
// Name is the cache type name
func (l *LRUStore) Name() string { return nameLRU }
// Has returns whether the blob is in the store, without updating the recent-ness.
func (l *LRUStore) Has(hash string) (bool, error) {
return l.lru.Contains(hash), nil
}
// Get returns the blob or an error if the blob doesn't exist.
func (l *LRUStore) Get(hash string) (stream.Blob, error) {
_, has := l.lru.Get(hash)
if !has {
return nil, errors.Err(ErrBlobNotFound)
}
blob, err := l.store.Get(hash)
if errors.Is(err, ErrBlobNotFound) {
// Blob disappeared from underlying store
l.lru.Remove(hash)
}
return blob, err
}
// Put stores the blob
func (l *LRUStore) Put(hash string, blob stream.Blob) error {
err := l.store.Put(hash, blob)
if err != nil {
return err
}
l.lru.Add(hash, true)
return nil
}
// PutSD stores the sd blob
func (l *LRUStore) PutSD(hash string, blob stream.Blob) error {
err := l.store.PutSD(hash, blob)
if err != nil {
return err
}
l.lru.Add(hash, true)
return nil
}
// Delete deletes the blob from the store
func (l *LRUStore) Delete(hash string) error {
err := l.store.Delete(hash)
if err != nil {
return err
}
// This must come after store.Delete()
// Remove triggers onEvict function, which also tries to delete blob from store
// We need to delete it manually first so any errors can be propagated up
l.lru.Remove(hash)
return nil
}
// loadExisting imports existing blobs from the underlying store into the LRU cache
func (l *LRUStore) loadExisting(store lister, maxItems int) error {
existing, err := store.list()
if err != nil {
return err
}
added := 0
for _, h := range existing {
l.lru.Add(h, true)
added++
if maxItems > 0 && added >= maxItems { // underlying cache is bigger than LRU cache
break
}
}
return nil
}
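A minimal sketch of how this store is wired up elsewhere in this PR (see wrapWithCache; the path and size here are illustrative):

// keep at most 1000 blobs on disk, evicting (and deleting) the least recently used first
cache := store.NewLRUStore("peer_server", store.NewDiskStore("/mnt/cache", 2), 1000)

Because DiskStore implements the lister interface, blobs already on disk are indexed into the LRU when the store is created.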

121
store/lru_test.go Normal file
View file

@@ -0,0 +1,121 @@
package store
import (
"io/ioutil"
"os"
"reflect"
"testing"
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
anbsky commented 2020-10-23 07:33:13 +02:00 (Migrated from github.com)
Given that test functions have to be named `Test*`, this looks like a typo. `createTestLRUstore`?
lyoshenka commented 2020-10-23 17:35:00 +02:00 (Migrated from github.com)
agreed
const cacheMaxBlobs = 3
func getTestLRUStore() (*LRUStore, *MemStore) {
m := NewMemStore()
return NewLRUStore("test", m, 3), m
}
func TestLRUStore_Eviction(t *testing.T) {
lru, mem := getTestLRUStore()
b := []byte("x")
err := lru.Put("one", b)
require.NoError(t, err)
err = lru.Put("two", b)
require.NoError(t, err)
err = lru.Put("three", b)
require.NoError(t, err)
err = lru.Put("four", b)
require.NoError(t, err)
err = lru.Put("five", b)
require.NoError(t, err)
assert.Equal(t, cacheMaxBlobs, len(mem.Debug()))
for k, v := range map[string]bool{
"one": false,
"two": false,
"three": true,
"four": true,
"five": true,
"six": false,
} {
has, err := lru.Has(k)
assert.NoError(t, err)
assert.Equal(t, v, has)
}
lru.Get("three") // touch so it stays in cache
lru.Put("six", b)
assert.Equal(t, cacheMaxBlobs, len(mem.Debug()))
for k, v := range map[string]bool{
"one": false,
"two": false,
"three": true,
"four": false,
"five": true,
"six": true,
} {
has, err := lru.Has(k)
assert.NoError(t, err)
assert.Equal(t, v, has)
}
err = lru.Delete("three")
assert.NoError(t, err)
err = lru.Delete("five")
assert.NoError(t, err)
err = lru.Delete("six")
assert.NoError(t, err)
assert.Equal(t, 0, len(mem.Debug()))
}
func TestLRUStore_UnderlyingBlobMissing(t *testing.T) {
lru, mem := getTestLRUStore()
hash := "hash"
b := []byte("this is a blob of stuff")
err := lru.Put(hash, b)
require.NoError(t, err)
err = mem.Delete(hash)
require.NoError(t, err)
// hash still exists in lru
assert.True(t, lru.lru.Contains(hash))
blob, err := lru.Get(hash)
assert.Nil(t, blob)
assert.True(t, errors.Is(err, ErrBlobNotFound), "expected (%s) %s, got (%s) %s",
reflect.TypeOf(ErrBlobNotFound).String(), ErrBlobNotFound.Error(),
reflect.TypeOf(err).String(), err.Error())
// lru.Get() removes hash if underlying store doesn't have it
assert.False(t, lru.lru.Contains(hash))
}
func TestLRUStore_loadExisting(t *testing.T) {
tmpDir, err := ioutil.TempDir("", "reflector_test_*")
require.NoError(t, err)
defer os.RemoveAll(tmpDir)
d := NewDiskStore(tmpDir, 2)
hash := "hash"
b := []byte("this is a blob of stuff")
err = d.Put(hash, b)
require.NoError(t, err)
existing, err := d.list()
require.NoError(t, err)
require.Equal(t, 1, len(existing), "blob should exist in cache")
assert.Equal(t, hash, existing[0])
lru := NewLRUStore("test", d, 3) // lru should load existing blobs when it's created
has, err := lru.Has(hash)
require.NoError(t, err)
assert.True(t, has, "hash should be loaded from disk store but it's not")
}

View file

@@ -1,29 +1,42 @@
package store
import (
"sync"
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/stream"
)
// MemoryBlobStore is an in memory only blob store with no persistence.
type MemoryBlobStore struct {
// MemStore is an in memory only blob store with no persistence.
type MemStore struct {
blobs map[string]stream.Blob
mu *sync.RWMutex
}
func NewMemoryBlobStore() *MemoryBlobStore {
return &MemoryBlobStore{
func NewMemStore() *MemStore {
return &MemStore{
blobs: make(map[string]stream.Blob),
mu: &sync.RWMutex{},
}
}
const nameMem = "mem"
// Name is the cache type name
func (m *MemStore) Name() string { return nameMem }
// Has returns T/F if the blob is currently stored. It will never error.
func (m *MemoryBlobStore) Has(hash string) (bool, error) {
func (m *MemStore) Has(hash string) (bool, error) {
m.mu.RLock()
defer m.mu.RUnlock()
_, ok := m.blobs[hash]
return ok, nil
}
// Get returns the blob byte slice if present and errors if the blob is not found.
func (m *MemoryBlobStore) Get(hash string) (stream.Blob, error) {
func (m *MemStore) Get(hash string) (stream.Blob, error) {
m.mu.RLock()
defer m.mu.RUnlock()
blob, ok := m.blobs[hash]
if !ok {
return nil, errors.Err(ErrBlobNotFound)
@@ -32,23 +45,29 @@ func (m *MemoryBlobStore) Get(hash string) (stream.Blob, error) {
}
// Put stores the blob in memory
func (m *MemoryBlobStore) Put(hash string, blob stream.Blob) error {
func (m *MemStore) Put(hash string, blob stream.Blob) error {
m.mu.Lock()
defer m.mu.Unlock()
m.blobs[hash] = blob
return nil
}
// PutSD stores the sd blob in memory
func (m *MemoryBlobStore) PutSD(hash string, blob stream.Blob) error {
func (m *MemStore) PutSD(hash string, blob stream.Blob) error {
return m.Put(hash, blob)
}
// Delete deletes the blob from the store
func (m *MemoryBlobStore) Delete(hash string) error {
func (m *MemStore) Delete(hash string) error {
m.mu.Lock()
defer m.mu.Unlock()
delete(m.blobs, hash)
return nil
}
// Debug returns the blobs in memory. It's useful for testing and debugging.
func (m *MemoryBlobStore) Debug() map[string]stream.Blob {
func (m *MemStore) Debug() map[string]stream.Blob {
m.mu.RLock()
defer m.mu.RUnlock()
return m.blobs
}

View file

@@ -7,8 +7,8 @@ import (
"github.com/lbryio/lbry.go/v2/extras/errors"
)
func TestMemoryBlobStore_Put(t *testing.T) {
s := NewMemoryBlobStore()
func TestMemStore_Put(t *testing.T) {
s := NewMemStore()
blob := []byte("abcdefg")
err := s.Put("abc", blob)
if err != nil {
@@ -16,8 +16,8 @@ func TestMemoryBlobStore_Put(t *testing.T) {
}
}
func TestMemoryBlobStore_Get(t *testing.T) {
s := NewMemoryBlobStore()
func TestMemStore_Get(t *testing.T) {
s := NewMemStore()
hash := "abc"
blob := []byte("abcdefg")
err := s.Put(hash, blob)

15
store/noop.go Normal file
View file

@@ -0,0 +1,15 @@
package store
import "github.com/lbryio/lbry.go/v2/stream"
// NoopStore is a store that does nothing
type NoopStore struct{}
const nameNoop = "noop"
func (n *NoopStore) Name() string { return nameNoop }
func (n *NoopStore) Has(_ string) (bool, error) { return false, nil }
func (n *NoopStore) Get(_ string) (stream.Blob, error) { return nil, nil }
func (n *NoopStore) Put(_ string, _ stream.Blob) error { return nil }
func (n *NoopStore) PutSD(_ string, _ stream.Blob) error { return nil }
func (n *NoopStore) Delete(_ string) error { return nil }

View file

@@ -18,8 +18,8 @@ import (
log "github.com/sirupsen/logrus"
)
// S3BlobStore is an S3 store
type S3BlobStore struct {
// S3Store is an S3 store
type S3Store struct {
awsID string
awsSecret string
region string
@@ -28,9 +28,9 @@ type S3BlobStore struct {
session *session.Session
}
// NewS3BlobStore returns an initialized S3 store pointer.
func NewS3BlobStore(awsID, awsSecret, region, bucket string) *S3BlobStore {
return &S3BlobStore{
// NewS3Store returns an initialized S3 store pointer.
func NewS3Store(awsID, awsSecret, region, bucket string) *S3Store {
return &S3Store{
awsID: awsID,
awsSecret: awsSecret,
region: region,
@@ -38,25 +38,13 @@ func NewS3BlobStore(awsID, awsSecret, region, bucket string) *S3BlobStore {
}
}
func (s *S3BlobStore) initOnce() error {
if s.session != nil {
return nil
}
const nameS3 = "s3"
sess, err := session.NewSession(&aws.Config{
Credentials: credentials.NewStaticCredentials(s.awsID, s.awsSecret, ""),
Region: aws.String(s.region),
})
if err != nil {
return err
}
s.session = sess
return nil
}
// Name is the cache type name
func (s *S3Store) Name() string { return nameS3 }
// Has returns T/F or Error ( from S3 ) if the store contains the blob.
func (s *S3BlobStore) Has(hash string) (bool, error) {
func (s *S3Store) Has(hash string) (bool, error) {
err := s.initOnce()
if err != nil {
return false, err
@@ -77,7 +65,7 @@ func (s *S3BlobStore) Has(hash string) (bool, error) {
}
// Get returns the blob slice if present or errors on S3.
func (s *S3BlobStore) Get(hash string) (stream.Blob, error) {
func (s *S3Store) Get(hash string) (stream.Blob, error) {
//Todo-Need to handle error for blob doesn't exist for consistency.
err := s.initOnce()
if err != nil {
@@ -110,7 +98,7 @@ func (s *S3BlobStore) Get(hash string) (stream.Blob, error) {
}
// Put stores the blob on S3 or errors if S3 connection errors.
func (s *S3BlobStore) Put(hash string, blob stream.Blob) error {
func (s *S3Store) Put(hash string, blob stream.Blob) error {
err := s.initOnce()
if err != nil {
return err
@@ -133,12 +121,12 @@ func (s *S3BlobStore) Put(hash string, blob stream.Blob) error {
}
// PutSD stores the sd blob on S3 or errors if S3 connection errors.
func (s *S3BlobStore) PutSD(hash string, blob stream.Blob) error {
func (s *S3Store) PutSD(hash string, blob stream.Blob) error {
//Todo - handle missing stream for consistency
return s.Put(hash, blob)
}
func (s *S3BlobStore) Delete(hash string) error {
func (s *S3Store) Delete(hash string) error {
err := s.initOnce()
if err != nil {
return err
@@ -153,3 +141,20 @@ func (s *S3BlobStore) Delete(hash string) error {
return err
}
func (s *S3Store) initOnce() error {
if s.session != nil {
return nil
}
sess, err := session.NewSession(&aws.Config{
Credentials: credentials.NewStaticCredentials(s.awsID, s.awsSecret, ""),
Region: aws.String(s.region),
})
if err != nil {
return err
}
s.session = sess
return nil
}

67
store/singleflight.go Normal file
View file

@@ -0,0 +1,67 @@
package store
import (
"time"
"github.com/lbryio/reflector.go/internal/metrics"
"github.com/lbryio/lbry.go/v2/stream"
"golang.org/x/sync/singleflight"
)
func WithSingleFlight(component string, origin BlobStore) BlobStore {
return &singleflightStore{
BlobStore: origin,
component: component,
sf: new(singleflight.Group),
}
}
type singleflightStore struct {
BlobStore
component string
sf *singleflight.Group
}
func (s *singleflightStore) Name() string {
return "sf_" + s.BlobStore.Name()
}
// Get ensures that only one request per hash is sent to the origin at a time,
// thereby protecting against https://en.wikipedia.org/wiki/Thundering_herd_problem
func (s *singleflightStore) Get(hash string) (stream.Blob, error) {
metrics.CacheWaitingRequestsCount.With(metrics.CacheLabels(s.BlobStore.Name(), s.component)).Inc()
defer metrics.CacheWaitingRequestsCount.With(metrics.CacheLabels(s.BlobStore.Name(), s.component)).Dec()
blob, err, _ := s.sf.Do(hash, s.getter(hash))
if err != nil {
return nil, err
}
return blob.(stream.Blob), nil
}
// getter returns a function that gets a blob from the origin
// only one getter per hash will be executing at a time
func (s *singleflightStore) getter(hash string) func() (interface{}, error) {
return func() (interface{}, error) {
metrics.CacheOriginRequestsCount.With(metrics.CacheLabels(s.BlobStore.Name(), s.component)).Inc()
defer metrics.CacheOriginRequestsCount.With(metrics.CacheLabels(s.BlobStore.Name(), s.component)).Dec()
start := time.Now()
blob, err := s.BlobStore.Get(hash)
if err != nil {
return nil, err
}
rate := float64(len(blob)) / 1024 / 1024 / time.Since(start).Seconds()
metrics.CacheRetrievalSpeed.With(map[string]string{
metrics.LabelCacheType: s.BlobStore.Name(),
metrics.LabelComponent: s.component,
metrics.LabelSource: "origin",
}).Set(rate)
return blob, nil
}
}
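A sketch of what the wrapper buys (origin is any BlobStore and the hash is a placeholder): concurrent Gets for the same hash collapse into one origin request, and all callers share its result.

s := store.WithSingleFlight("example", origin)
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
	wg.Add(1)
	go func() {
		defer wg.Done()
		_, _ = s.Get("somehash") // only one of these ten calls reaches the origin
	}()
}
wg.Wait()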

View file

@@ -0,0 +1,89 @@
package speedwalk
import (
"io/ioutil"
"path/filepath"
"runtime"
"sync"
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/karrick/godirwalk"
"github.com/sirupsen/logrus"
)
// AllFiles recursively lists every file in every subdirectory of a given directory
// If basename is true, return the basename of each file. Otherwise return the full path starting at startDir.
func AllFiles(startDir string, basename bool) ([]string, error) {
items, err := ioutil.ReadDir(startDir)
if err != nil {
return nil, err
}
pathChan := make(chan string)
paths := make([]string, 0, 1000)
pathWG := &sync.WaitGroup{}
pathWG.Add(1)
go func() {
defer pathWG.Done()
for {
path, ok := <-pathChan
if !ok {
return
}
paths = append(paths, path)
}
}()
maxThreads := runtime.NumCPU() - 1
goroutineLimiter := make(chan struct{}, maxThreads)
for i := 0; i < maxThreads; i++ {
goroutineLimiter <- struct{}{}
}
walkerWG := &sync.WaitGroup{}
for _, item := range items {
if !item.IsDir() {
if basename {
pathChan <- item.Name()
} else {
pathChan <- filepath.Join(startDir, item.Name())
}
continue
}
<-goroutineLimiter
walkerWG.Add(1)
go func(dir string) {
defer func() {
walkerWG.Done()
goroutineLimiter <- struct{}{}
}()
err = godirwalk.Walk(filepath.Join(startDir, dir), &godirwalk.Options{
Unsorted: true, // faster this way
Callback: func(osPathname string, de *godirwalk.Dirent) error {
if de.IsRegular() {
if basename {
pathChan <- de.Name()
} else {
pathChan <- osPathname
}
}
return nil
},
})
if err != nil {
logrus.Errorf(errors.FullTrace(err))
}
}(item.Name())
}
walkerWG.Wait()
close(pathChan)
pathWG.Wait()
return paths, nil
}
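Usage sketch mirroring DiskStore.list above (the path is a placeholder):

// with basename true, AllFiles returns bare blob hashes instead of full paths
hashes, err := speedwalk.AllFiles("/mnt/blob_cache", true)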

View file

@@ -7,15 +7,17 @@ import (
// BlobStore is an interface for handling blob storage.
type BlobStore interface {
// Does blob exist in the store
// Name of blob store (useful for metrics)
Name() string
// Does blob exist in the store.
Has(hash string) (bool, error)
// Get the blob from the store
// Get the blob from the store. Must return ErrBlobNotFound if blob is not in store.
Get(hash string) (stream.Blob, error)
// Put the blob into the store
// Put the blob into the store.
Put(hash string, blob stream.Blob) error
// Put an SD blob into the store
// Put an SD blob into the store.
PutSD(hash string, blob stream.Blob) error
// Delete the blob from the store
// Delete the blob from the store.
Delete(hash string) error
}
@@ -27,5 +29,11 @@ type Blocklister interface {
Wants(hash string) (bool, error)
}
// lister is a store that can list cached blobs. This is helpful when an overlay
// cache needs to track blob existence.
type lister interface {
list() ([]string, error)
}
//ErrBlobNotFound is a standard error when a blob is not found in the store.
var ErrBlobNotFound = errors.Base("blob not found")
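Since Get must now return ErrBlobNotFound for a missing blob, callers can distinguish absence from real failures, as CachingStore.Get does earlier in this diff; a short sketch:

blob, err := s.Get(hash)
if errors.Is(err, ErrBlobNotFound) {
	// blob is simply absent; fall back to the origin
} else if err != nil {
	// a real transport or disk error
}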