diff --git a/cmd/getstream.go b/cmd/getstream.go index f3f6d8f..c02ad0b 100644 --- a/cmd/getstream.go +++ b/cmd/getstream.go @@ -28,9 +28,10 @@ func getStreamCmd(cmd *cobra.Command, args []string) { addr := args[0] sdHash := args[1] - s := store.NewCachingBlobStore( + s := store.NewCachingStore( + "getstream", peer.NewStore(peer.StoreOpts{Address: addr}), - store.NewDiskBlobStore("/tmp/lbry_downloaded_blobs", 2), + store.NewDiskStore("/tmp/lbry_downloaded_blobs", 2), ) wd, err := os.Getwd() diff --git a/cmd/peer.go b/cmd/peer.go index 88c49e0..3cb2a19 100644 --- a/cmd/peer.go +++ b/cmd/peer.go @@ -29,7 +29,7 @@ func init() { func peerCmd(cmd *cobra.Command, args []string) { var err error - s3 := store.NewS3BlobStore(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName) + s3 := store.NewS3Store(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName) peerServer := peer.NewServer(s3) if !peerNoDB { diff --git a/cmd/reflector.go b/cmd/reflector.go index f071f60..2f687d0 100644 --- a/cmd/reflector.go +++ b/cmd/reflector.go @@ -4,6 +4,7 @@ import ( "os" "os/signal" "strconv" + "strings" "syscall" "time" @@ -16,21 +17,25 @@ import ( "github.com/lbryio/reflector.go/store" log "github.com/sirupsen/logrus" + "github.com/spf13/cast" "github.com/spf13/cobra" ) -var reflectorCmdCacheDir string -var tcpPeerPort int -var http3PeerPort int -var receiverPort int -var metricsPort int -var disableUploads bool -var disableBlocklist bool -var proxyAddress string -var proxyPort string -var proxyProtocol string -var useDB bool -var cloudFrontEndpoint string +var ( + tcpPeerPort int + http3PeerPort int + receiverPort int + metricsPort int + disableUploads bool + disableBlocklist bool + proxyAddress string + proxyPort string + proxyProtocol string + useDB bool + cloudFrontEndpoint string + reflectorCmdDiskCache string + reflectorCmdMemCache int +) func init() { var cmd = &cobra.Command{ @@ -38,7 +43,6 @@ func init() { Short: "Run reflector server", Run: reflectorCmd, } - cmd.Flags().StringVar(&reflectorCmdCacheDir, "cache", "", "if specified, the path where blobs should be cached (disabled when left empty)") cmd.Flags().StringVar(&proxyAddress, "proxy-address", "", "address of another reflector server where blobs are fetched from") cmd.Flags().StringVar(&proxyPort, "proxy-port", "5567", "port of another reflector server where blobs are fetched from") cmd.Flags().StringVar(&proxyProtocol, "proxy-protocol", "http3", "protocol used to fetch blobs from another reflector server (tcp/http3)") @@ -50,94 +54,142 @@ func init() { cmd.Flags().BoolVar(&disableUploads, "disable-uploads", false, "Disable uploads to this reflector server") cmd.Flags().BoolVar(&disableBlocklist, "disable-blocklist", false, "Disable blocklist watching/updating") cmd.Flags().BoolVar(&useDB, "use-db", true, "whether to connect to the reflector db or not") + cmd.Flags().StringVar(&reflectorCmdDiskCache, "disk-cache", "", + "enable disk cache, setting max size and path where to store blobs. 
format is 'MAX_BLOBS:CACHE_PATH'") + cmd.Flags().IntVar(&reflectorCmdMemCache, "mem-cache", 0, "enable in-memory cache with a max size of this many blobs") rootCmd.AddCommand(cmd) } func reflectorCmd(cmd *cobra.Command, args []string) { log.Printf("reflector %s", meta.VersionString()) - var blobStore store.BlobStore - if proxyAddress != "" { - switch proxyProtocol { - case "tcp": - blobStore = peer.NewStore(peer.StoreOpts{ - Address: proxyAddress + ":" + proxyPort, - Timeout: 30 * time.Second, - }) - case "http3": - blobStore = http3.NewStore(http3.StoreOpts{ - Address: proxyAddress + ":" + proxyPort, - Timeout: 30 * time.Second, - }) - default: - log.Fatalf("specified protocol is not recognized: %s", proxyProtocol) - } - } else { - s3Store := store.NewS3BlobStore(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName) - if cloudFrontEndpoint != "" { - blobStore = store.NewCloudFrontBlobStore(cloudFrontEndpoint, s3Store) - } else { - blobStore = s3Store - } - } + // the blocklist logic requires the db backed store to be the outer-most store + underlyingStore := setupStore() + outerStore := wrapWithCache(underlyingStore) - var err error - var reflectorServer *reflector.Server + if !disableUploads { + reflectorServer := reflector.NewServer(underlyingStore) + reflectorServer.Timeout = 3 * time.Minute + reflectorServer.EnableBlocklist = !disableBlocklist - if useDB { - db := new(db.SQL) - db.TrackAccessTime = true - err = db.Connect(globalConfig.DBConn) + err := reflectorServer.Start(":" + strconv.Itoa(receiverPort)) if err != nil { log.Fatal(err) } - - blobStore = store.NewDBBackedStore(blobStore, db) - - //this shouldn't go here but the blocklist logic requires the db backed store to be the outer-most store for it to work.... 
- //having this here prevents uploaded blobs from being stored in the disk cache - if !disableUploads { - reflectorServer = reflector.NewServer(blobStore) - reflectorServer.Timeout = 3 * time.Minute - reflectorServer.EnableBlocklist = !disableBlocklist - - err = reflectorServer.Start(":" + strconv.Itoa(receiverPort)) - if err != nil { - log.Fatal(err) - } - } + defer reflectorServer.Shutdown() } - if reflectorCmdCacheDir != "" { - err = os.MkdirAll(reflectorCmdCacheDir, os.ModePerm) - if err != nil { - log.Fatal(err) - } - blobStore = store.NewCachingBlobStore(blobStore, store.NewDiskBlobStore(reflectorCmdCacheDir, 2)) - } - - peerServer := peer.NewServer(blobStore) - err = peerServer.Start(":" + strconv.Itoa(tcpPeerPort)) + peerServer := peer.NewServer(outerStore) + err := peerServer.Start(":" + strconv.Itoa(tcpPeerPort)) if err != nil { log.Fatal(err) } + defer peerServer.Shutdown() - http3PeerServer := http3.NewServer(blobStore) + http3PeerServer := http3.NewServer(outerStore) err = http3PeerServer.Start(":" + strconv.Itoa(http3PeerPort)) if err != nil { log.Fatal(err) } + defer http3PeerServer.Shutdown() metricsServer := metrics.NewServer(":"+strconv.Itoa(metricsPort), "/metrics") metricsServer.Start() + defer metricsServer.Shutdown() interruptChan := make(chan os.Signal, 1) signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM) <-interruptChan - metricsServer.Shutdown() - peerServer.Shutdown() - http3PeerServer.Shutdown() - if reflectorServer != nil { - reflectorServer.Shutdown() - } + // deferred shutdowns happen now +} + +func setupStore() store.BlobStore { + var s store.BlobStore + + if proxyAddress != "" { + switch proxyProtocol { + case "tcp": + s = peer.NewStore(peer.StoreOpts{ + Address: proxyAddress + ":" + proxyPort, + Timeout: 30 * time.Second, + }) + case "http3": + s = http3.NewStore(http3.StoreOpts{ + Address: proxyAddress + ":" + proxyPort, + Timeout: 30 * time.Second, + }) + default: + log.Fatalf("protocol is not recognized: %s", proxyProtocol) + } + } else { + s3Store := store.NewS3Store(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName) + if cloudFrontEndpoint != "" { + s = store.NewCloudFrontRWStore(store.NewCloudFrontROStore(cloudFrontEndpoint), s3Store) + } else { + s = s3Store + } + } + + if useDB { + db := new(db.SQL) + db.TrackAccessTime = true + err := db.Connect(globalConfig.DBConn) + if err != nil { + log.Fatal(err) + } + + s = store.NewDBBackedStore(s, db) + } + + return s +} + +func wrapWithCache(s store.BlobStore) store.BlobStore { + wrapped := s + + diskCacheMaxSize, diskCachePath := diskCacheParams() + if diskCacheMaxSize > 0 { + err := os.MkdirAll(diskCachePath, os.ModePerm) + if err != nil { + log.Fatal(err) + } + wrapped = store.NewCachingStore( + "reflector", + wrapped, + store.NewLRUStore("peer_server", store.NewDiskStore(diskCachePath, 2), diskCacheMaxSize), + ) + } + + if reflectorCmdMemCache > 0 { + wrapped = store.NewCachingStore( + "reflector", + wrapped, + store.NewLRUStore("peer_server", store.NewMemStore(), reflectorCmdMemCache), + ) + } + + return wrapped +} + +func diskCacheParams() (int, string) { + if reflectorCmdDiskCache == "" { + return 0, "" + } + + parts := strings.Split(reflectorCmdDiskCache, ":") + if len(parts) != 2 { + log.Fatalf("--disk-cache must be a number, followed by ':', followed by a string") + } + + maxSize := cast.ToInt(parts[0]) + if maxSize <= 0 { + log.Fatalf("--disk-cache max size must be more than 0") + } + + path := parts[1] + if len(path) == 0 || path[0] != '/' { 
+ log.Fatalf("--disk-cache path must start with '/'") + } + + return maxSize, path } diff --git a/cmd/start.go b/cmd/start.go index a82d597..4d4a56b 100644 --- a/cmd/start.go +++ b/cmd/start.go @@ -55,7 +55,7 @@ func startCmd(cmd *cobra.Command, args []string) { db := new(db.SQL) err := db.Connect(globalConfig.DBConn) checkErr(err) - s3 := store.NewS3BlobStore(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName) + s3 := store.NewS3Store(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName) comboStore := store.NewDBBackedStore(s3, db) conf := prism.DefaultConf() diff --git a/cmd/test.go b/cmd/test.go index 051ddf7..a330856 100644 --- a/cmd/test.go +++ b/cmd/test.go @@ -29,7 +29,7 @@ func init() { func testCmd(cmd *cobra.Command, args []string) { log.Printf("reflector %s", meta.VersionString()) - memStore := store.NewMemoryBlobStore() + memStore := store.NewMemStore() reflectorServer := reflector.NewServer(memStore) reflectorServer.Timeout = 3 * time.Minute diff --git a/cmd/upload.go b/cmd/upload.go index 006f007..636e824 100644 --- a/cmd/upload.go +++ b/cmd/upload.go @@ -35,7 +35,7 @@ func uploadCmd(cmd *cobra.Command, args []string) { checkErr(err) st := store.NewDBBackedStore( - store.NewS3BlobStore(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName), + store.NewS3Store(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName), db) uploader := reflector.NewUploader(db, st, uploadWorkers, uploadSkipExistsCheck, uploadDeleteBlobsAfterUpload) diff --git a/go.mod b/go.mod index 4002591..9df120d 100644 --- a/go.mod +++ b/go.mod @@ -14,11 +14,12 @@ require ( github.com/google/gops v0.3.7 github.com/gorilla/mux v1.7.4 github.com/hashicorp/go-msgpack v0.5.5 // indirect - github.com/hashicorp/golang-lru v0.5.3 // indirect + github.com/hashicorp/golang-lru v0.5.4 github.com/hashicorp/memberlist v0.1.4 // indirect github.com/hashicorp/serf v0.8.2 github.com/inconshreveable/go-update v0.0.0-20160112193335-8152e7eb6ccf github.com/johntdyer/slackrus v0.0.0-20180518184837-f7aae3243a07 + github.com/karrick/godirwalk v1.16.1 github.com/lbryio/chainquery v1.9.0 github.com/lbryio/lbry.go v1.1.2 // indirect github.com/lbryio/lbry.go/v2 v2.6.1-0.20200901175808-73382bb02128 @@ -27,9 +28,11 @@ require ( github.com/phayes/freeport v0.0.0-20171002185219-e27662a4a9d6 github.com/prometheus/client_golang v0.9.2 github.com/sirupsen/logrus v1.4.2 + github.com/spf13/afero v1.4.1 github.com/spf13/cast v1.3.0 github.com/spf13/cobra v0.0.3 github.com/spf13/pflag v1.0.3 // indirect + github.com/stretchr/testify v1.4.0 github.com/volatiletech/null v8.0.0+incompatible go.uber.org/atomic v1.5.1 golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 @@ -37,6 +40,7 @@ require ( golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e golang.org/x/tools v0.0.0-20191227053925-7b8e75db28f4 // indirect google.golang.org/appengine v1.6.2 // indirect + gotest.tools v2.2.0+incompatible ) go 1.15 diff --git a/go.sum b/go.sum index 4e9282a..84b207a 100644 --- a/go.sum +++ b/go.sum @@ -152,6 +152,8 @@ github.com/hashicorp/go.net v0.0.1/go.mod h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.3 h1:YPkqC67at8FYaadspW/6uE0COsBxS2656RLEr8Bppgk= github.com/hashicorp/golang-lru v0.5.3/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= 
+github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc= +github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= github.com/hashicorp/hcl v0.0.0-20180404174102-ef8a98b0bbce h1:xdsDDbiBDQTKASoGEZ+pEmF1OnWuu8AQ9I8iNbHNeno= github.com/hashicorp/hcl v0.0.0-20180404174102-ef8a98b0bbce/go.mod h1:oZtUIOe8dh44I2q6ScRibXws4Ajl+d+nod3AaR9vL5w= github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64= @@ -186,12 +188,15 @@ github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7 github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= github.com/kardianos/osext v0.0.0-20170510131534-ae77be60afb1 h1:PJPDf8OUfOK1bb/NeTKd4f1QXZItOX389VN3B6qC8ro= github.com/kardianos/osext v0.0.0-20170510131534-ae77be60afb1/go.mod h1:1NbS8ALrpOvjt0rHPNLyCIeMtbizbir8U//inJ+zuB8= +github.com/karrick/godirwalk v1.16.1 h1:DynhcF+bztK8gooS0+NDJFrdNZjJ3gzVzC545UNA9iw= +github.com/karrick/godirwalk v1.16.1/go.mod h1:j4mkqPuvaLI8mp1DroR3P6ad7cyYd4c1qeJ3RV7ULlk= github.com/keybase/go-ps v0.0.0-20161005175911-668c8856d999/go.mod h1:hY+WOq6m2FpbvyrI93sMaypsttvaIL5nhVR92dTMUcQ= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6CZQHDETBtE9HaSEkGmuNXF86RwHhHUvq4= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.2 h1:DB17ag19krx9CFsz4o3enTrPXyIXCl+2iCXH/aMAp9s= github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= @@ -280,6 +285,7 @@ github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/profile v1.3.0/go.mod h1:hJw3o1OdXxsrSjjVksARp5W95eeEaEfptyVZyv6JUPA= +github.com/pkg/sftp v1.10.1/go.mod h1:lYOWFsE0bwd1+KfKJaKeuokY15vzFx25BLbzYYoAxZI= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI= @@ -346,6 +352,8 @@ github.com/sourcegraph/annotate v0.0.0-20160123013949-f4cad6c6324d/go.mod h1:Udh github.com/sourcegraph/syntaxhighlight v0.0.0-20170531221838-bd320f5d308e/go.mod h1:HuIsMU8RRBOtsCgI77wP899iHVBQpCmg4ErYMZB+2IA= github.com/spf13/afero v1.1.1 h1:Lt3ihYMlE+lreX1GS4Qw4ZsNpYQLxIXKBTEOXm3nt6I= github.com/spf13/afero v1.1.1/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ= +github.com/spf13/afero v1.4.1 h1:asw9sl74539yqavKaglDM5hFpdJVK0Y5Dr/JOgQ89nQ= +github.com/spf13/afero v1.4.1/go.mod h1:Ai8FlHk4v/PARR026UzYexafAt9roJ7LcLMAmO6Z93I= github.com/spf13/cast v1.2.0/go.mod h1:r2rcYCSwa1IExKTDiTfzaxqT2FNHs8hODu4LnUfgKEg= github.com/spf13/cast v1.3.0 h1:oget//CVOEoFewqQxwr0Ej5yjygnqGkvggSE/gB35Q8= github.com/spf13/cast v1.3.0/go.mod 
h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= @@ -394,6 +402,7 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk golang.org/x/crypto v0.0.0-20190313024323-a1f597ede03a/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20190820162420-60c769a6c586/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191002192127-34f69633bfdc/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200221231518-2aa609cf4a9d/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= @@ -471,6 +480,8 @@ golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= +golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190921001708-c4c64cad1fd0 h1:xQwXv67TxFo9nC1GJFyab5eq/5B590r6RlnL/G8Sz7w= diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index 4d70cd1..8d46266 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -12,6 +12,7 @@ import ( ee "github.com/lbryio/lbry.go/v2/extras/errors" "github.com/lbryio/lbry.go/v2/extras/stop" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/client_golang/prometheus/promhttp" @@ -57,7 +58,8 @@ func (s *Server) Shutdown() { } const ( - ns = "reflector" + ns = "reflector" + subsystemCache = "cache" labelDirection = "direction" labelErrorType = "error_type" @@ -65,7 +67,9 @@ const ( DirectionUpload = "upload" // to reflector DirectionDownload = "download" // from reflector - MtrLabelSource = "source" + LabelCacheType = "cache_type" + LabelComponent = "component" + LabelSource = "source" errConnReset = "conn_reset" errReadConnReset = "read_conn_reset" @@ -92,6 +96,12 @@ const ( ) var ( + ErrorCount = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: ns, + Name: "error_total", + Help: "Total number of errors", + }, []string{labelDirection, labelErrorType}) + BlobDownloadCount = promauto.NewCounter(prometheus.CounterOpts{ Namespace: ns, Name: "blob_download_total", @@ -107,27 +117,44 @@ var ( Name: "http3_blob_download_total", Help: "Total number of blobs downloaded from reflector through QUIC protocol", }) - CacheHitCount = promauto.NewCounter(prometheus.CounterOpts{ + + CacheHitCount = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: ns, - Name: "cache_hit_total", + Subsystem: subsystemCache, + Name: "hit_total", Help: "Total number of blobs retrieved from the cache storage", - }) - CacheMissCount = promauto.NewCounter(prometheus.CounterOpts{ + }, []string{LabelCacheType, 
LabelComponent}) + CacheMissCount = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: ns, - Name: "cache_miss_total", + Subsystem: subsystemCache, + Name: "miss_total", Help: "Total number of blobs retrieved from origin rather than cache storage", - }) - CacheOriginRequestsCount = promauto.NewGauge(prometheus.GaugeOpts{ + }, []string{LabelCacheType, LabelComponent}) + CacheOriginRequestsCount = promauto.NewGaugeVec(prometheus.GaugeOpts{ Namespace: ns, - Name: "cache_origin_requests_total", + Subsystem: subsystemCache, + Name: "origin_requests_total", Help: "How many Get requests are in flight from the cache to the origin", - }) + }, []string{LabelCacheType, LabelComponent}) // during thundering-herd situations, the metric below should be a lot smaller than the metric above - CacheWaitingRequestsCount = promauto.NewGauge(prometheus.GaugeOpts{ + CacheWaitingRequestsCount = promauto.NewGaugeVec(prometheus.GaugeOpts{ Namespace: ns, - Name: "cache_waiting_requests_total", + Subsystem: subsystemCache, + Name: "waiting_requests_total", Help: "How many cache requests are waiting for an in-flight origin request", - }) + }, []string{LabelCacheType, LabelComponent}) + CacheLRUEvictCount = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: ns, + Subsystem: subsystemCache, + Name: "evict_total", + Help: "Count of blobs evicted from cache", + }, []string{LabelCacheType, LabelComponent}) + CacheRetrievalSpeed = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: ns, + Name: "speed_mbps", + Help: "Speed of blob retrieval from cache or from origin", + }, []string{LabelCacheType, LabelComponent, LabelSource}) + BlobUploadCount = promauto.NewCounter(prometheus.CounterOpts{ Namespace: ns, Name: "blob_upload_total", @@ -138,16 +165,7 @@ var ( Name: "sdblob_upload_total", Help: "Total number of SD blobs (and therefore streams) uploaded to reflector", }) - RetrieverSpeed = promauto.NewGaugeVec(prometheus.GaugeOpts{ - Namespace: ns, - Name: "speed_mbps", - Help: "Speed of blob retrieval", - }, []string{MtrLabelSource}) - ErrorCount = promauto.NewCounterVec(prometheus.CounterOpts{ - Namespace: ns, - Name: "error_total", - Help: "Total number of errors", - }, []string{labelDirection, labelErrorType}) + MtrInBytesTcp = promauto.NewCounter(prometheus.CounterOpts{ Namespace: ns, Name: "tcp_in_bytes", @@ -185,6 +203,13 @@ var ( }) ) +func CacheLabels(name, component string) prometheus.Labels { + return prometheus.Labels{ + LabelCacheType: name, + LabelComponent: component, + } +} + func TrackError(direction string, e error) (shouldLog bool) { // shouldLog is a hack, but whatever if e == nil { return diff --git a/peer/http3/server.go b/peer/http3/server.go index d0a39e1..55ab5fc 100644 --- a/peer/http3/server.go +++ b/peer/http3/server.go @@ -164,7 +164,7 @@ func generateTLSConfig() *tls.Config { func (s *Server) listenAndServe(server *http3.Server) { err := server.ListenAndServe() - if err != nil && !errors.Is(err, http.ErrServerClosed) { + if err != nil && err.Error() != "server closed" { log.Errorln(errors.FullTrace(err)) } } diff --git a/peer/http3/store.go b/peer/http3/store.go index dea0290..4749ab0 100644 --- a/peer/http3/store.go +++ b/peer/http3/store.go @@ -55,6 +55,8 @@ func (p *Store) getClient() (*Client, error) { return c, errors.Prefix("connection error", err) } +func (p *Store) Name() string { return "http3" } + // Has asks the peer if they have a hash func (p *Store) Has(hash string) (bool, error) { c, err := p.getClient() diff --git a/peer/server_test.go b/peer/server_test.go index 
2e1d9bc..7d21921 100644 --- a/peer/server_test.go +++ b/peer/server_test.go @@ -34,7 +34,7 @@ var availabilityRequests = []pair{ } func getServer(t *testing.T, withBlobs bool) *Server { - st := store.NewMemoryBlobStore() + st := store.NewMemStore() if withBlobs { for k, v := range blobs { err := st.Put(k, v) diff --git a/peer/store.go b/peer/store.go index 3857426..b8abedd 100644 --- a/peer/store.go +++ b/peer/store.go @@ -30,6 +30,8 @@ func (p *Store) getClient() (*Client, error) { return c, errors.Prefix("connection error", err) } +func (p *Store) Name() string { return "peer" } + // Has asks the peer if they have a hash func (p *Store) Has(hash string) (bool, error) { c, err := p.getClient() diff --git a/reflector/server_test.go b/reflector/server_test.go index 1dd7e83..0de200e 100644 --- a/reflector/server_test.go +++ b/reflector/server_test.go @@ -22,7 +22,7 @@ func startServerOnRandomPort(t *testing.T) (*Server, int) { t.Fatal(err) } - srv := NewServer(store.NewMemoryBlobStore()) + srv := NewServer(store.NewMemStore()) err = srv.Start("127.0.0.1:" + strconv.Itoa(port)) if err != nil { t.Fatal(err) @@ -119,7 +119,7 @@ func TestServer_Timeout(t *testing.T) { t.Fatal(err) } - srv := NewServer(store.NewMemoryBlobStore()) + srv := NewServer(store.NewMemStore()) srv.Timeout = testTimeout err = srv.Start("127.0.0.1:" + strconv.Itoa(port)) if err != nil { @@ -161,7 +161,7 @@ func TestServer_Timeout(t *testing.T) { //} type mockPartialStore struct { - *store.MemoryBlobStore + *store.MemStore missing []string } @@ -181,7 +181,7 @@ func TestServer_PartialUpload(t *testing.T) { missing[i] = bits.Rand().String() } - st := store.BlobStore(&mockPartialStore{MemoryBlobStore: store.NewMemoryBlobStore(), missing: missing}) + st := store.BlobStore(&mockPartialStore{MemStore: store.NewMemStore(), missing: missing}) if _, ok := st.(neededBlobChecker); !ok { t.Fatal("mock does not implement the relevant interface") } diff --git a/store/caching.go b/store/caching.go index 4dd5b6b..0a06395 100644 --- a/store/caching.go +++ b/store/caching.go @@ -7,26 +7,32 @@ import ( "github.com/lbryio/lbry.go/v2/stream" "github.com/lbryio/reflector.go/internal/metrics" - - "golang.org/x/sync/singleflight" ) -// CachingBlobStore combines two stores, typically a local and a remote store, to improve performance. +// CachingStore combines two stores, typically a local and a remote store, to improve performance. // Accessed blobs are stored in and retrieved from the cache. If they are not in the cache, they // are retrieved from the origin and cached. Puts are cached and also forwarded to the origin. -type CachingBlobStore struct { +type CachingStore struct { origin, cache BlobStore - - sf *singleflight.Group + component string } -// NewCachingBlobStore makes a new caching disk store and returns a pointer to it. -func NewCachingBlobStore(origin, cache BlobStore) *CachingBlobStore { - return &CachingBlobStore{origin: origin, cache: cache, sf: new(singleflight.Group)} +// NewCachingStore makes a new caching disk store and returns a pointer to it. +func NewCachingStore(component string, origin, cache BlobStore) *CachingStore { + return &CachingStore{ + component: component, + origin: WithSingleFlight(component, origin), + cache: cache, + } } +const nameCaching = "caching" + +// Name is the cache type name +func (c *CachingStore) Name() string { return nameCaching } + // Has checks the cache and then the origin for a hash. It returns true if either store has it. 
-func (c *CachingBlobStore) Has(hash string) (bool, error) { +func (c *CachingStore) Has(hash string) (bool, error) { has, err := c.cache.Has(hash) if has || err != nil { return has, err @@ -36,49 +42,33 @@ func (c *CachingBlobStore) Has(hash string) (bool, error) { // Get tries to get the blob from the cache first, falling back to the origin. If the blob comes // from the origin, it is also stored in the cache. -func (c *CachingBlobStore) Get(hash string) (stream.Blob, error) { +func (c *CachingStore) Get(hash string) (stream.Blob, error) { start := time.Now() blob, err := c.cache.Get(hash) if err == nil || !errors.Is(err, ErrBlobNotFound) { - metrics.CacheHitCount.Inc() + metrics.CacheHitCount.With(metrics.CacheLabels(c.cache.Name(), c.component)).Inc() rate := float64(len(blob)) / 1024 / 1024 / time.Since(start).Seconds() - metrics.RetrieverSpeed.With(map[string]string{metrics.MtrLabelSource: "cache"}).Set(rate) + metrics.CacheRetrievalSpeed.With(map[string]string{ + metrics.LabelCacheType: c.cache.Name(), + metrics.LabelComponent: c.component, + metrics.LabelSource: "cache", + }).Set(rate) return blob, err } - metrics.CacheMissCount.Inc() - return c.getFromOrigin(hash) -} + metrics.CacheMissCount.With(metrics.CacheLabels(c.cache.Name(), c.component)).Inc() -// getFromOrigin ensures that only one Get per hash is sent to the origin at a time, -// thereby protecting against https://en.wikipedia.org/wiki/Thundering_herd_problem -func (c *CachingBlobStore) getFromOrigin(hash string) (stream.Blob, error) { - metrics.CacheWaitingRequestsCount.Inc() - defer metrics.CacheWaitingRequestsCount.Dec() - originBlob, err, _ := c.sf.Do(hash, func() (interface{}, error) { - metrics.CacheOriginRequestsCount.Inc() - defer metrics.CacheOriginRequestsCount.Dec() - - start := time.Now() - blob, err := c.origin.Get(hash) - if err != nil { - return nil, err - } - - rate := float64(len(blob)) / 1024 / 1024 / time.Since(start).Seconds() - metrics.RetrieverSpeed.With(map[string]string{metrics.MtrLabelSource: "origin"}).Set(rate) - - err = c.cache.Put(hash, blob) - return blob, err - }) + blob, err = c.origin.Get(hash) if err != nil { return nil, err } - return originBlob.(stream.Blob), nil + + err = c.cache.Put(hash, blob) + return blob, err } // Put stores the blob in the origin and the cache -func (c *CachingBlobStore) Put(hash string, blob stream.Blob) error { +func (c *CachingStore) Put(hash string, blob stream.Blob) error { err := c.origin.Put(hash, blob) if err != nil { return err @@ -87,7 +77,7 @@ func (c *CachingBlobStore) Put(hash string, blob stream.Blob) error { } // PutSD stores the sd blob in the origin and the cache -func (c *CachingBlobStore) PutSD(hash string, blob stream.Blob) error { +func (c *CachingStore) PutSD(hash string, blob stream.Blob) error { err := c.origin.PutSD(hash, blob) if err != nil { return err @@ -96,7 +86,7 @@ func (c *CachingBlobStore) PutSD(hash string, blob stream.Blob) error { } // Delete deletes the blob from the origin and the cache -func (c *CachingBlobStore) Delete(hash string) error { +func (c *CachingStore) Delete(hash string) error { err := c.origin.Delete(hash) if err != nil { return err diff --git a/store/caching_test.go b/store/caching_test.go index 13f44a2..34f928c 100644 --- a/store/caching_test.go +++ b/store/caching_test.go @@ -9,10 +9,10 @@ import ( "github.com/lbryio/lbry.go/v2/stream" ) -func TestCachingBlobStore_Put(t *testing.T) { - origin := NewMemoryBlobStore() - cache := NewMemoryBlobStore() - s := NewCachingBlobStore(origin, cache) +func 
TestCachingStore_Put(t *testing.T) { + origin := NewMemStore() + cache := NewMemStore() + s := NewCachingStore("test", origin, cache) b := []byte("this is a blob of stuff") hash := "hash" @@ -39,10 +39,10 @@ func TestCachingBlobStore_Put(t *testing.T) { } } -func TestCachingBlobStore_CacheMiss(t *testing.T) { - origin := NewMemoryBlobStore() - cache := NewMemoryBlobStore() - s := NewCachingBlobStore(origin, cache) +func TestCachingStore_CacheMiss(t *testing.T) { + origin := NewMemStore() + cache := NewMemStore() + s := NewCachingStore("test", origin, cache) b := []byte("this is a blob of stuff") hash := "hash" @@ -76,11 +76,11 @@ func TestCachingBlobStore_CacheMiss(t *testing.T) { } } -func TestCachingBlobStore_ThunderingHerd(t *testing.T) { +func TestCachingStore_ThunderingHerd(t *testing.T) { storeDelay := 100 * time.Millisecond origin := NewSlowBlobStore(storeDelay) - cache := NewMemoryBlobStore() - s := NewCachingBlobStore(origin, cache) + cache := NewMemStore() + s := NewCachingStore("test", origin, cache) b := []byte("this is a blob of stuff") hash := "hash" @@ -129,16 +129,19 @@ func TestCachingBlobStore_ThunderingHerd(t *testing.T) { // SlowBlobStore adds a delay to each request type SlowBlobStore struct { - mem *MemoryBlobStore + mem *MemStore delay time.Duration } func NewSlowBlobStore(delay time.Duration) *SlowBlobStore { return &SlowBlobStore{ - mem: NewMemoryBlobStore(), + mem: NewMemStore(), delay: delay, } } +func (s *SlowBlobStore) Name() string { + return "slow" +} func (s *SlowBlobStore) Has(hash string) (bool, error) { time.Sleep(s.delay) diff --git a/store/cloudfront.go b/store/cloudfront.go deleted file mode 100644 index fbfafec..0000000 --- a/store/cloudfront.go +++ /dev/null @@ -1,109 +0,0 @@ -package store - -import ( - "io/ioutil" - "net/http" - "time" - - "github.com/lbryio/lbry.go/v2/extras/errors" - "github.com/lbryio/lbry.go/v2/stream" - "github.com/lbryio/reflector.go/internal/metrics" - "github.com/lbryio/reflector.go/meta" - - log "github.com/sirupsen/logrus" -) - -// CloudFrontBlobStore is an CloudFront backed store (retrieval only) -type CloudFrontBlobStore struct { - cfEndpoint string - s3Store *S3BlobStore -} - -// NewS3BlobStore returns an initialized S3 store pointer. -func NewCloudFrontBlobStore(cloudFrontEndpoint string, S3Store *S3BlobStore) *CloudFrontBlobStore { - return &CloudFrontBlobStore{ - cfEndpoint: cloudFrontEndpoint, - s3Store: S3Store, - } -} - -// Has returns T/F or Error if the store contains the blob. -func (s *CloudFrontBlobStore) Has(hash string) (bool, error) { - url := s.cfEndpoint + hash - - req, err := http.NewRequest("HEAD", url, nil) - if err != nil { - return false, errors.Err(err) - } - req.Header.Add("User-Agent", "reflector.go/"+meta.Version) - res, err := http.DefaultClient.Do(req) - if err != nil { - return false, errors.Err(err) - } - defer res.Body.Close() - - switch res.StatusCode { - case http.StatusNotFound, http.StatusForbidden: - return false, nil - case http.StatusOK: - return true, nil - default: - return false, errors.Err(res.Status) - } -} - -// Get returns the blob slice if present or errors. 
-func (s *CloudFrontBlobStore) Get(hash string) (stream.Blob, error) { - url := s.cfEndpoint + hash - log.Debugf("Getting %s from S3", hash[:8]) - defer func(t time.Time) { - log.Debugf("Getting %s from S3 took %s", hash[:8], time.Since(t).String()) - }(time.Now()) - req, err := http.NewRequest("GET", url, nil) - if err != nil { - return nil, errors.Err(err) - } - req.Header.Add("User-Agent", "reflector.go/"+meta.Version) - res, err := http.DefaultClient.Do(req) - if err != nil { - return nil, errors.Err(err) - } - defer res.Body.Close() - - switch res.StatusCode { - case http.StatusNotFound, http.StatusForbidden: - return nil, errors.Err(ErrBlobNotFound) - case http.StatusOK: - b, err := ioutil.ReadAll(res.Body) - if err != nil { - return nil, errors.Err(err) - } - metrics.MtrInBytesS3.Add(float64(len(b))) - return b, nil - default: - return nil, errors.Err(res.Status) - } -} - -// Put stores the blob on S3 or errors if S3 store is not present. -func (s *CloudFrontBlobStore) Put(hash string, blob stream.Blob) error { - if s.s3Store != nil { - return s.s3Store.Put(hash, blob) - } - return errors.Err("not implemented in cloudfront store") -} - -// PutSD stores the sd blob on S3 or errors if S3 store is not present. -func (s *CloudFrontBlobStore) PutSD(hash string, blob stream.Blob) error { - if s.s3Store != nil { - return s.s3Store.PutSD(hash, blob) - } - return errors.Err("not implemented in cloudfront store") -} - -func (s *CloudFrontBlobStore) Delete(hash string) error { - if s.s3Store != nil { - return s.s3Store.Delete(hash) - } - return errors.Err("not implemented in cloudfront store") -} diff --git a/store/cloudfront_ro.go b/store/cloudfront_ro.go new file mode 100644 index 0000000..a914285 --- /dev/null +++ b/store/cloudfront_ro.go @@ -0,0 +1,105 @@ +package store + +import ( + "io" + "io/ioutil" + "net/http" + "time" + + "github.com/lbryio/reflector.go/internal/metrics" + "github.com/lbryio/reflector.go/meta" + + "github.com/lbryio/lbry.go/v2/extras/errors" + "github.com/lbryio/lbry.go/v2/stream" + + log "github.com/sirupsen/logrus" +) + +// CloudFrontROStore reads from cloudfront. All writes panic. +type CloudFrontROStore struct { + endpoint string // cloudflare endpoint +} + +// NewCloudFrontROStore returns an initialized CloudFrontROStore store pointer. +func NewCloudFrontROStore(endpoint string) *CloudFrontROStore { + return &CloudFrontROStore{endpoint: endpoint} +} + +const nameCloudFrontRO = "cloudfront_ro" + +// Name is the cache type name +func (c *CloudFrontROStore) Name() string { return nameCloudFrontRO } + +// Has checks if the hash is in the store. +func (c *CloudFrontROStore) Has(hash string) (bool, error) { + status, body, err := c.cfRequest(http.MethodHead, hash) + if err != nil { + return false, err + } + defer body.Close() + + switch status { + case http.StatusNotFound, http.StatusForbidden: + return false, nil + case http.StatusOK: + return true, nil + default: + return false, errors.Err("unexpected status %d", status) + } +} + +// Get gets the blob from Cloudfront. 
+func (c *CloudFrontROStore) Get(hash string) (stream.Blob, error) { + log.Debugf("Getting %s from S3", hash[:8]) + defer func(t time.Time) { + log.Debugf("Getting %s from S3 took %s", hash[:8], time.Since(t).String()) + }(time.Now()) + + status, body, err := c.cfRequest(http.MethodGet, hash) + if err != nil { + return nil, err + } + defer body.Close() + + switch status { + case http.StatusNotFound, http.StatusForbidden: + return nil, errors.Err(ErrBlobNotFound) + case http.StatusOK: + b, err := ioutil.ReadAll(body) + if err != nil { + return nil, errors.Err(err) + } + metrics.MtrInBytesS3.Add(float64(len(b))) + return b, nil + default: + return nil, errors.Err("unexpected status %d", status) + } +} + +func (c *CloudFrontROStore) cfRequest(method, hash string) (int, io.ReadCloser, error) { + url := c.endpoint + hash + req, err := http.NewRequest(method, url, nil) + if err != nil { + return 0, nil, errors.Err(err) + } + req.Header.Add("User-Agent", "reflector.go/"+meta.Version) + + res, err := http.DefaultClient.Do(req) + if err != nil { + return 0, nil, errors.Err(err) + } + + return res.StatusCode, res.Body, nil +} + +func (c *CloudFrontROStore) Put(_ string, _ stream.Blob) error { + panic("CloudFrontROStore cannot do writes. Use CloudFrontRWStore") +} + +func (c *CloudFrontROStore) PutSD(_ string, _ stream.Blob) error { + panic("CloudFrontROStore cannot do writes. Use CloudFrontRWStore") +} + +func (c *CloudFrontROStore) Delete(_ string) error { + panic("CloudFrontROStore cannot do writes. Use CloudFrontRWStore") +} diff --git a/store/cloudfront_rw.go b/store/cloudfront_rw.go new file mode 100644 index 0000000..ee771da --- /dev/null +++ b/store/cloudfront_rw.go @@ -0,0 +1,50 @@ +package store + +import ( + "github.com/lbryio/lbry.go/v2/stream" +) + +// CloudFrontRWStore combines a Cloudfront and an S3 store. Reads go to Cloudfront, writes go to S3. +type CloudFrontRWStore struct { + cf *CloudFrontROStore + s3 *S3Store +} + +// NewCloudFrontRWStore returns an initialized CloudFrontRWStore store pointer. +// NOTE: It panics if either argument is nil. +func NewCloudFrontRWStore(cf *CloudFrontROStore, s3 *S3Store) *CloudFrontRWStore { + if cf == nil || s3 == nil { + panic("both stores must be set") + } + return &CloudFrontRWStore{cf: cf, s3: s3} +} + +const nameCloudFrontRW = "cloudfront_rw" + +// Name is the cache type name +func (c *CloudFrontRWStore) Name() string { return nameCloudFrontRW } + +// Has checks if the hash is in the store. +func (c *CloudFrontRWStore) Has(hash string) (bool, error) { + return c.cf.Has(hash) +} + +// Get gets the blob from Cloudfront. 
+func (c *CloudFrontRWStore) Get(hash string) (stream.Blob, error) { + return c.cf.Get(hash) +} + +// Put stores the blob on S3 +func (c *CloudFrontRWStore) Put(hash string, blob stream.Blob) error { + return c.s3.Put(hash, blob) +} + +// PutSD stores the sd blob on S3 +func (c *CloudFrontRWStore) PutSD(hash string, blob stream.Blob) error { + return c.s3.PutSD(hash, blob) +} + +// Delete deletes the blob from S3 +func (c *CloudFrontRWStore) Delete(hash string) error { + return c.s3.Delete(hash) +} diff --git a/store/dbbacked.go b/store/dbbacked.go index 1b554ca..59c47b5 100644 --- a/store/dbbacked.go +++ b/store/dbbacked.go @@ -25,6 +25,11 @@ func NewDBBackedStore(blobs BlobStore, db *db.SQL) *DBBackedStore { return &DBBackedStore{blobs: blobs, db: db} } +const nameDBBacked = "db-backed" + +// Name is the cache type name +func (d *DBBackedStore) Name() string { return nameDBBacked } + // Has returns true if the blob is in the store func (d *DBBackedStore) Has(hash string) (bool, error) { return d.db.HasBlob(hash) diff --git a/store/disk.go b/store/disk.go index 82e40ac..4ac2a0f 100644 --- a/store/disk.go +++ b/store/disk.go @@ -4,82 +4,38 @@ import ( "io/ioutil" "os" "path" - "path/filepath" - "sort" - "syscall" - "time" "github.com/lbryio/lbry.go/v2/extras/errors" "github.com/lbryio/lbry.go/v2/stream" - - log "github.com/sirupsen/logrus" + "github.com/lbryio/reflector.go/store/speedwalk" ) -// DiskBlobStore stores blobs on a local disk -type DiskBlobStore struct { +// DiskStore stores blobs on a local disk +type DiskStore struct { // the location of blobs on disk blobDir string // store files in subdirectories based on the first N chars in the filename. 0 = don't create subdirectories. prefixLength int - initialized bool - lastChecked time.Time - diskCleanupBusy chan bool + // true if initOnce ran, false otherwise + initialized bool } -// NewDiskBlobStore returns an initialized file disk store pointer. -func NewDiskBlobStore(dir string, prefixLength int) *DiskBlobStore { - dbs := DiskBlobStore{blobDir: dir, prefixLength: prefixLength, diskCleanupBusy: make(chan bool, 1)} - dbs.diskCleanupBusy <- true - return &dbs -} - -func (d *DiskBlobStore) dir(hash string) string { - if d.prefixLength <= 0 || len(hash) < d.prefixLength { - return d.blobDir +// NewDiskStore returns an initialized file disk store pointer. 
+func NewDiskStore(dir string, prefixLength int) *DiskStore { + return &DiskStore{ + blobDir: dir, + prefixLength: prefixLength, } - return path.Join(d.blobDir, hash[:d.prefixLength]) } -// GetUsedSpace returns a value between 0 and 1, with 0 being completely empty and 1 being full, for the disk that holds the provided path -func (d *DiskBlobStore) getUsedSpace() (float32, error) { - var stat syscall.Statfs_t - err := syscall.Statfs(d.blobDir, &stat) - if err != nil { - return 0, err - } - // Available blocks * size per block = available space in bytes - all := stat.Blocks * uint64(stat.Bsize) - free := stat.Bfree * uint64(stat.Bsize) - used := all - free +const nameDisk = "disk" - return float32(used) / float32(all), nil -} - -func (d *DiskBlobStore) path(hash string) string { - return path.Join(d.dir(hash), hash) -} - -func (d *DiskBlobStore) ensureDirExists(dir string) error { - return errors.Err(os.MkdirAll(dir, 0755)) -} - -func (d *DiskBlobStore) initOnce() error { - if d.initialized { - return nil - } - - err := d.ensureDirExists(d.blobDir) - if err != nil { - return err - } - - d.initialized = true - return nil -} +// Name is the cache type name +func (d *DiskStore) Name() string { return nameDisk } // Has returns T/F or Error if it the blob stored already. It will error with any IO disk error. -func (d *DiskBlobStore) Has(hash string) (bool, error) { +func (d *DiskStore) Has(hash string) (bool, error) { err := d.initOnce() if err != nil { return false, err @@ -90,13 +46,13 @@ func (d *DiskBlobStore) Has(hash string) (bool, error) { if os.IsNotExist(err) { return false, nil } - return false, err + return false, errors.Err(err) } return true, nil } // Get returns the blob or an error if the blob doesn't exist. -func (d *DiskBlobStore) Get(hash string) (stream.Blob, error) { +func (d *DiskStore) Get(hash string) (stream.Blob, error) { err := d.initOnce() if err != nil { return nil, err @@ -111,11 +67,12 @@ func (d *DiskBlobStore) Get(hash string) (stream.Blob, error) { } defer file.Close() - return ioutil.ReadAll(file) + blob, err := ioutil.ReadAll(file) + return blob, errors.Err(err) } // Put stores the blob on disk -func (d *DiskBlobStore) Put(hash string, blob stream.Blob) error { +func (d *DiskStore) Put(hash string, blob stream.Blob) error { err := d.initOnce() if err != nil { return err @@ -126,16 +83,17 @@ func (d *DiskBlobStore) Put(hash string, blob stream.Blob) error { return err } - return ioutil.WriteFile(d.path(hash), blob, 0644) + err = ioutil.WriteFile(d.path(hash), blob, 0644) + return errors.Err(err) } // PutSD stores the sd blob on the disk -func (d *DiskBlobStore) PutSD(hash string, blob stream.Blob) error { +func (d *DiskStore) PutSD(hash string, blob stream.Blob) error { return d.Put(hash, blob) } // Delete deletes the blob from the store -func (d *DiskBlobStore) Delete(hash string) error { +func (d *DiskStore) Delete(hash string) error { err := d.initOnce() if err != nil { return err @@ -149,75 +107,45 @@ func (d *DiskBlobStore) Delete(hash string) error { return nil } - return os.Remove(d.path(hash)) + err = os.Remove(d.path(hash)) + return errors.Err(err) } -func (d *DiskBlobStore) ensureDiskSpace() { - defer func() { - d.lastChecked = time.Now() - d.diskCleanupBusy <- true - }() - - used, err := d.getUsedSpace() +// list returns the hashes of blobs that already exist in the blobDir +func (d *DiskStore) list() ([]string, error) { + err := d.initOnce() if err != nil { - log.Errorln(err.Error()) - return - } - log.Infof("disk usage: %.2f%%\n", used*100) - if 
used > 0.90 { - log.Infoln("over 0.90, cleaning up") - err = d.WipeOldestBlobs() - if err != nil { - log.Errorln(err.Error()) - return - } - log.Infoln("Done cleaning up") + return nil, err } + + return speedwalk.AllFiles(d.blobDir, true) } -func (d *DiskBlobStore) WipeOldestBlobs() (err error) { - dirs, err := ioutil.ReadDir(d.blobDir) +func (d *DiskStore) dir(hash string) string { + if d.prefixLength <= 0 || len(hash) < d.prefixLength { + return d.blobDir + } + return path.Join(d.blobDir, hash[:d.prefixLength]) +} + +func (d *DiskStore) path(hash string) string { + return path.Join(d.dir(hash), hash) +} + +func (d *DiskStore) ensureDirExists(dir string) error { + return errors.Err(os.MkdirAll(dir, 0755)) +} + +func (d *DiskStore) initOnce() error { + if d.initialized { + return nil + } + + err := d.ensureDirExists(d.blobDir) if err != nil { return err } - type datedFile struct { - Atime time.Time - File *os.FileInfo - FullPath string - } - datedFiles := make([]datedFile, 0, 5000) - for _, dir := range dirs { - if dir.IsDir() { - files, err := ioutil.ReadDir(filepath.Join(d.blobDir, dir.Name())) - if err != nil { - return err - } - for _, file := range files { - if file.Mode().IsRegular() && !file.IsDir() { - datedFiles = append(datedFiles, datedFile{ - Atime: atime(file), - File: &file, - FullPath: filepath.Join(d.blobDir, dir.Name(), file.Name()), - }) - } - } - } - } - sort.Slice(datedFiles, func(i, j int) bool { - return datedFiles[i].Atime.Before(datedFiles[j].Atime) - }) - //delete the first 50000 blobs - for i, df := range datedFiles { - if i >= 50000 { - break - } - log.Infoln(df.FullPath) - log.Infoln(df.Atime.String()) - err = os.Remove(df.FullPath) - if err != nil { - return err - } - } + d.initialized = true return nil } diff --git a/store/lru.go b/store/lru.go new file mode 100644 index 0000000..4ef4908 --- /dev/null +++ b/store/lru.go @@ -0,0 +1,120 @@ +package store + +import ( + "github.com/lbryio/lbry.go/v2/extras/errors" + "github.com/lbryio/lbry.go/v2/stream" + "github.com/lbryio/reflector.go/internal/metrics" + + golru "github.com/hashicorp/golang-lru" +) + +// LRUStore adds a max cache size and LRU eviction to a BlobStore +type LRUStore struct { + // underlying store + store BlobStore + // lru implementation + lru *golru.Cache +} + +// NewLRUStore initialize a new LRUStore +func NewLRUStore(component string, store BlobStore, maxItems int) *LRUStore { + lru, err := golru.NewWithEvict(maxItems, func(key interface{}, value interface{}) { + metrics.CacheLRUEvictCount.With(metrics.CacheLabels(store.Name(), component)).Inc() + _ = store.Delete(key.(string)) // TODO: log this error. may happen if underlying entry is gone but cache entry still there + }) + if err != nil { + panic(err) + } + + l := &LRUStore{ + store: store, + lru: lru, + } + + if lstr, ok := store.(lister); ok { + err = l.loadExisting(lstr, maxItems) + if err != nil { + panic(err) // TODO: what should happen here? panic? return nil? just keep going? + } + } + + return l +} + +const nameLRU = "lru" + +// Name is the cache type name +func (l *LRUStore) Name() string { return nameLRU } + +// Has returns whether the blob is in the store, without updating the recent-ness. +func (l *LRUStore) Has(hash string) (bool, error) { + return l.lru.Contains(hash), nil +} + +// Get returns the blob or an error if the blob doesn't exist. 
+func (l *LRUStore) Get(hash string) (stream.Blob, error) { + _, has := l.lru.Get(hash) + if !has { + return nil, errors.Err(ErrBlobNotFound) + } + blob, err := l.store.Get(hash) + if errors.Is(err, ErrBlobNotFound) { + // Blob disappeared from underlying store + l.lru.Remove(hash) + } + return blob, err +} + +// Put stores the blob +func (l *LRUStore) Put(hash string, blob stream.Blob) error { + err := l.store.Put(hash, blob) + if err != nil { + return err + } + + l.lru.Add(hash, true) + return nil +} + +// PutSD stores the sd blob +func (l *LRUStore) PutSD(hash string, blob stream.Blob) error { + err := l.store.PutSD(hash, blob) + if err != nil { + return err + } + + l.lru.Add(hash, true) + return nil +} + +// Delete deletes the blob from the store +func (l *LRUStore) Delete(hash string) error { + err := l.store.Delete(hash) + if err != nil { + return err + } + + // This must come after store.Delete() + // Remove triggers onEvict function, which also tries to delete blob from store + // We need to delete it manually first so any errors can be propagated up + l.lru.Remove(hash) + return nil +} + +// loadExisting imports existing blobs from the underlying store into the LRU cache +func (l *LRUStore) loadExisting(store lister, maxItems int) error { + existing, err := store.list() + if err != nil { + return err + } + + added := 0 + for _, h := range existing { + l.lru.Add(h, true) + added++ + if maxItems > 0 && added >= maxItems { // underlying cache is bigger than LRU cache + break + } + } + return nil +} diff --git a/store/lru_test.go b/store/lru_test.go new file mode 100644 index 0000000..968956c --- /dev/null +++ b/store/lru_test.go @@ -0,0 +1,121 @@ +package store + +import ( + "io/ioutil" + "os" + "reflect" + "testing" + + "github.com/lbryio/lbry.go/v2/extras/errors" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +const cacheMaxBlobs = 3 + +func getTestLRUStore() (*LRUStore, *MemStore) { + m := NewMemStore() + return NewLRUStore("test", m, 3), m +} + +func TestLRUStore_Eviction(t *testing.T) { + lru, mem := getTestLRUStore() + b := []byte("x") + err := lru.Put("one", b) + require.NoError(t, err) + err = lru.Put("two", b) + require.NoError(t, err) + err = lru.Put("three", b) + require.NoError(t, err) + err = lru.Put("four", b) + require.NoError(t, err) + err = lru.Put("five", b) + require.NoError(t, err) + + assert.Equal(t, cacheMaxBlobs, len(mem.Debug())) + + for k, v := range map[string]bool{ + "one": false, + "two": false, + "three": true, + "four": true, + "five": true, + "six": false, + } { + has, err := lru.Has(k) + assert.NoError(t, err) + assert.Equal(t, v, has) + } + + lru.Get("three") // touch so it stays in cache + lru.Put("six", b) + + assert.Equal(t, cacheMaxBlobs, len(mem.Debug())) + + for k, v := range map[string]bool{ + "one": false, + "two": false, + "three": true, + "four": false, + "five": true, + "six": true, + } { + has, err := lru.Has(k) + assert.NoError(t, err) + assert.Equal(t, v, has) + } + + err = lru.Delete("three") + assert.NoError(t, err) + err = lru.Delete("five") + assert.NoError(t, err) + err = lru.Delete("six") + assert.NoError(t, err) + assert.Equal(t, 0, len(mem.Debug())) +} + +func TestLRUStore_UnderlyingBlobMissing(t *testing.T) { + lru, mem := getTestLRUStore() + hash := "hash" + b := []byte("this is a blob of stuff") + err := lru.Put(hash, b) + require.NoError(t, err) + + err = mem.Delete(hash) + require.NoError(t, err) + + // hash still exists in lru + assert.True(t, lru.lru.Contains(hash)) + + blob, err := 
lru.Get(hash) + assert.Nil(t, blob) + assert.True(t, errors.Is(err, ErrBlobNotFound), "expected (%s) %s, got (%s) %s", + reflect.TypeOf(ErrBlobNotFound).String(), ErrBlobNotFound.Error(), + reflect.TypeOf(err).String(), err.Error()) + + // lru.Get() removes hash if underlying store doesn't have it + assert.False(t, lru.lru.Contains(hash)) +} + +func TestLRUStore_loadExisting(t *testing.T) { + tmpDir, err := ioutil.TempDir("", "reflector_test_*") + require.NoError(t, err) + defer os.RemoveAll(tmpDir) + d := NewDiskStore(tmpDir, 2) + + hash := "hash" + b := []byte("this is a blob of stuff") + err = d.Put(hash, b) + require.NoError(t, err) + + existing, err := d.list() + require.NoError(t, err) + require.Equal(t, 1, len(existing), "blob should exist in cache") + assert.Equal(t, hash, existing[0]) + + lru := NewLRUStore("test", d, 3) // lru should load existing blobs when it's created + has, err := lru.Has(hash) + require.NoError(t, err) + assert.True(t, has, "hash should be loaded from disk store but it's not") +} diff --git a/store/memory.go b/store/memory.go index 27dbc0f..f462a8d 100644 --- a/store/memory.go +++ b/store/memory.go @@ -1,29 +1,42 @@ package store import ( + "sync" + "github.com/lbryio/lbry.go/v2/extras/errors" "github.com/lbryio/lbry.go/v2/stream" ) -// MemoryBlobStore is an in memory only blob store with no persistence. -type MemoryBlobStore struct { +// MemStore is an in memory only blob store with no persistence. +type MemStore struct { blobs map[string]stream.Blob + mu *sync.RWMutex } -func NewMemoryBlobStore() *MemoryBlobStore { - return &MemoryBlobStore{ +func NewMemStore() *MemStore { + return &MemStore{ blobs: make(map[string]stream.Blob), + mu: &sync.RWMutex{}, } } +const nameMem = "mem" + +// Name is the cache type name +func (m *MemStore) Name() string { return nameMem } + // Has returns T/F if the blob is currently stored. It will never error. -func (m *MemoryBlobStore) Has(hash string) (bool, error) { +func (m *MemStore) Has(hash string) (bool, error) { + m.mu.RLock() + defer m.mu.RUnlock() _, ok := m.blobs[hash] return ok, nil } // Get returns the blob byte slice if present and errors if the blob is not found. -func (m *MemoryBlobStore) Get(hash string) (stream.Blob, error) { +func (m *MemStore) Get(hash string) (stream.Blob, error) { + m.mu.RLock() + defer m.mu.RUnlock() blob, ok := m.blobs[hash] if !ok { return nil, errors.Err(ErrBlobNotFound) @@ -32,23 +45,29 @@ func (m *MemoryBlobStore) Get(hash string) (stream.Blob, error) { } // Put stores the blob in memory -func (m *MemoryBlobStore) Put(hash string, blob stream.Blob) error { +func (m *MemStore) Put(hash string, blob stream.Blob) error { + m.mu.Lock() + defer m.mu.Unlock() m.blobs[hash] = blob return nil } // PutSD stores the sd blob in memory -func (m *MemoryBlobStore) PutSD(hash string, blob stream.Blob) error { +func (m *MemStore) PutSD(hash string, blob stream.Blob) error { return m.Put(hash, blob) } // Delete deletes the blob from the store -func (m *MemoryBlobStore) Delete(hash string) error { +func (m *MemStore) Delete(hash string) error { + m.mu.Lock() + defer m.mu.Unlock() delete(m.blobs, hash) return nil } // Debug returns the blobs in memory. It's useful for testing and debugging. 
-func (m *MemoryBlobStore) Debug() map[string]stream.Blob { +func (m *MemStore) Debug() map[string]stream.Blob { + m.mu.RLock() + defer m.mu.RUnlock() return m.blobs } diff --git a/store/memory_test.go b/store/memory_test.go index e359803..8d85114 100644 --- a/store/memory_test.go +++ b/store/memory_test.go @@ -7,8 +7,8 @@ import ( "github.com/lbryio/lbry.go/v2/extras/errors" ) -func TestMemoryBlobStore_Put(t *testing.T) { - s := NewMemoryBlobStore() +func TestMemStore_Put(t *testing.T) { + s := NewMemStore() blob := []byte("abcdefg") err := s.Put("abc", blob) if err != nil { @@ -16,8 +16,8 @@ func TestMemoryBlobStore_Put(t *testing.T) { } } -func TestMemoryBlobStore_Get(t *testing.T) { - s := NewMemoryBlobStore() +func TestMemStore_Get(t *testing.T) { + s := NewMemStore() hash := "abc" blob := []byte("abcdefg") err := s.Put(hash, blob) diff --git a/store/noop.go b/store/noop.go new file mode 100644 index 0000000..9e5d815 --- /dev/null +++ b/store/noop.go @@ -0,0 +1,15 @@ +package store + +import "github.com/lbryio/lbry.go/v2/stream" + +// NoopStore is a store that does nothing +type NoopStore struct{} + +const nameNoop = "noop" + +func (n *NoopStore) Name() string { return nameNoop } +func (n *NoopStore) Has(_ string) (bool, error) { return false, nil } +func (n *NoopStore) Get(_ string) (stream.Blob, error) { return nil, nil } +func (n *NoopStore) Put(_ string, _ stream.Blob) error { return nil } +func (n *NoopStore) PutSD(_ string, _ stream.Blob) error { return nil } +func (n *NoopStore) Delete(_ string) error { return nil } diff --git a/store/s3.go b/store/s3.go index e08b910..53451be 100644 --- a/store/s3.go +++ b/store/s3.go @@ -18,8 +18,8 @@ import ( log "github.com/sirupsen/logrus" ) -// S3BlobStore is an S3 store -type S3BlobStore struct { +// S3Store is an S3 store +type S3Store struct { awsID string awsSecret string region string @@ -28,9 +28,9 @@ type S3BlobStore struct { session *session.Session } -// NewS3BlobStore returns an initialized S3 store pointer. -func NewS3BlobStore(awsID, awsSecret, region, bucket string) *S3BlobStore { - return &S3BlobStore{ +// NewS3Store returns an initialized S3 store pointer. +func NewS3Store(awsID, awsSecret, region, bucket string) *S3Store { + return &S3Store{ awsID: awsID, awsSecret: awsSecret, region: region, @@ -38,25 +38,13 @@ func NewS3BlobStore(awsID, awsSecret, region, bucket string) *S3BlobStore { } } -func (s *S3BlobStore) initOnce() error { - if s.session != nil { - return nil - } +const nameS3 = "s3" - sess, err := session.NewSession(&aws.Config{ - Credentials: credentials.NewStaticCredentials(s.awsID, s.awsSecret, ""), - Region: aws.String(s.region), - }) - if err != nil { - return err - } - - s.session = sess - return nil -} +// Name is the cache type name +func (s *S3Store) Name() string { return nameS3 } // Has returns T/F or Error ( from S3 ) if the store contains the blob. -func (s *S3BlobStore) Has(hash string) (bool, error) { +func (s *S3Store) Has(hash string) (bool, error) { err := s.initOnce() if err != nil { return false, err @@ -77,7 +65,7 @@ func (s *S3BlobStore) Has(hash string) (bool, error) { } // Get returns the blob slice if present or errors on S3. -func (s *S3BlobStore) Get(hash string) (stream.Blob, error) { +func (s *S3Store) Get(hash string) (stream.Blob, error) { //Todo-Need to handle error for blob doesn't exist for consistency. 
diff --git a/store/s3.go b/store/s3.go
index e08b910..53451be 100644
--- a/store/s3.go
+++ b/store/s3.go
@@ -18,8 +18,8 @@ import (
 	log "github.com/sirupsen/logrus"
 )
 
-// S3BlobStore is an S3 store
-type S3BlobStore struct {
+// S3Store is an S3 store
+type S3Store struct {
 	awsID     string
 	awsSecret string
 	region    string
@@ -28,9 +28,9 @@ type S3BlobStore struct {
 	session *session.Session
 }
 
-// NewS3BlobStore returns an initialized S3 store pointer.
-func NewS3BlobStore(awsID, awsSecret, region, bucket string) *S3BlobStore {
-	return &S3BlobStore{
+// NewS3Store returns an initialized S3 store pointer.
+func NewS3Store(awsID, awsSecret, region, bucket string) *S3Store {
+	return &S3Store{
 		awsID:     awsID,
 		awsSecret: awsSecret,
 		region:    region,
@@ -38,25 +38,13 @@ func NewS3BlobStore(awsID, awsSecret, region, bucket string) *S3BlobStore {
 	}
 }
 
-func (s *S3BlobStore) initOnce() error {
-	if s.session != nil {
-		return nil
-	}
+const nameS3 = "s3"
 
-	sess, err := session.NewSession(&aws.Config{
-		Credentials: credentials.NewStaticCredentials(s.awsID, s.awsSecret, ""),
-		Region:      aws.String(s.region),
-	})
-	if err != nil {
-		return err
-	}
-
-	s.session = sess
-	return nil
-}
+// Name is the cache type name
+func (s *S3Store) Name() string { return nameS3 }
 
 // Has returns T/F or Error ( from S3 ) if the store contains the blob.
-func (s *S3BlobStore) Has(hash string) (bool, error) {
+func (s *S3Store) Has(hash string) (bool, error) {
 	err := s.initOnce()
 	if err != nil {
 		return false, err
@@ -77,7 +65,7 @@ func (s *S3BlobStore) Has(hash string) (bool, error) {
 }
 
 // Get returns the blob slice if present or errors on S3.
-func (s *S3BlobStore) Get(hash string) (stream.Blob, error) {
+func (s *S3Store) Get(hash string) (stream.Blob, error) {
 	//Todo-Need to handle error for blob doesn't exist for consistency.
 	err := s.initOnce()
 	if err != nil {
@@ -110,7 +98,7 @@ func (s *S3BlobStore) Get(hash string) (stream.Blob, error) {
 }
 
 // Put stores the blob on S3 or errors if S3 connection errors.
-func (s *S3BlobStore) Put(hash string, blob stream.Blob) error {
+func (s *S3Store) Put(hash string, blob stream.Blob) error {
 	err := s.initOnce()
 	if err != nil {
 		return err
@@ -133,12 +121,12 @@ func (s *S3BlobStore) Put(hash string, blob stream.Blob) error {
 }
 
 // PutSD stores the sd blob on S3 or errors if S3 connection errors.
-func (s *S3BlobStore) PutSD(hash string, blob stream.Blob) error {
+func (s *S3Store) PutSD(hash string, blob stream.Blob) error {
 	//Todo - handle missing stream for consistency
 	return s.Put(hash, blob)
 }
 
-func (s *S3BlobStore) Delete(hash string) error {
+func (s *S3Store) Delete(hash string) error {
 	err := s.initOnce()
 	if err != nil {
 		return err
@@ -153,3 +141,20 @@ func (s *S3BlobStore) Delete(hash string) error {
 
 	return err
 }
+
+func (s *S3Store) initOnce() error {
+	if s.session != nil {
+		return nil
+	}
+
+	sess, err := session.NewSession(&aws.Config{
+		Credentials: credentials.NewStaticCredentials(s.awsID, s.awsSecret, ""),
+		Region:      aws.String(s.region),
+	})
+	if err != nil {
+		return err
+	}
+
+	s.session = sess
+	return nil
+}
diff --git a/store/singleflight.go b/store/singleflight.go
new file mode 100644
index 0000000..fbe314f
--- /dev/null
+++ b/store/singleflight.go
@@ -0,0 +1,67 @@
+package store
+
+import (
+	"time"
+
+	"github.com/lbryio/reflector.go/internal/metrics"
+
+	"github.com/lbryio/lbry.go/v2/stream"
+
+	"golang.org/x/sync/singleflight"
+)
+
+func WithSingleFlight(component string, origin BlobStore) BlobStore {
+	return &singleflightStore{
+		BlobStore: origin,
+		component: component,
+		sf:        new(singleflight.Group),
+	}
+}
+
+type singleflightStore struct {
+	BlobStore
+
+	component string
+	sf        *singleflight.Group
+}
+
+func (s *singleflightStore) Name() string {
+	return "sf_" + s.BlobStore.Name()
+}
+
+// Get ensures that only one request per hash is sent to the origin at a time,
+// thereby protecting against https://en.wikipedia.org/wiki/Thundering_herd_problem
+func (s *singleflightStore) Get(hash string) (stream.Blob, error) {
+	metrics.CacheWaitingRequestsCount.With(metrics.CacheLabels(s.BlobStore.Name(), s.component)).Inc()
+	defer metrics.CacheWaitingRequestsCount.With(metrics.CacheLabels(s.BlobStore.Name(), s.component)).Dec()
+
+	blob, err, _ := s.sf.Do(hash, s.getter(hash))
+	if err != nil {
+		return nil, err
+	}
+	return blob.(stream.Blob), nil
+}
+
+// getter returns a function that gets a blob from the origin.
+// Only one getter per hash will be executing at a time.
+func (s *singleflightStore) getter(hash string) func() (interface{}, error) {
+	return func() (interface{}, error) {
+		metrics.CacheOriginRequestsCount.With(metrics.CacheLabels(s.BlobStore.Name(), s.component)).Inc()
+		defer metrics.CacheOriginRequestsCount.With(metrics.CacheLabels(s.BlobStore.Name(), s.component)).Dec()
+
+		start := time.Now()
+		blob, err := s.BlobStore.Get(hash)
+		if err != nil {
+			return nil, err
+		}
+
+		rate := float64(len(blob)) / 1024 / 1024 / time.Since(start).Seconds()
+		metrics.CacheRetrievalSpeed.With(map[string]string{
+			metrics.LabelCacheType: s.BlobStore.Name(),
+			metrics.LabelComponent: s.component,
+			metrics.LabelSource:    "origin",
+		}).Set(rate)
+
+		return blob, nil
+	}
+}
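To see what WithSingleFlight buys, wrap any origin store and fire overlapping Gets at it: concurrent requests for one hash collapse into far fewer origin fetches. The sketch below is illustrative only; the "demo" component label and the countingStore wrapper are invented for the example:

	package main

	import (
		"fmt"
		"sync"
		"sync/atomic"

		"github.com/lbryio/lbry.go/v2/stream"
		"github.com/lbryio/reflector.go/store"
	)

	// countingStore wraps a MemStore and counts how often the origin's
	// Get is actually invoked.
	type countingStore struct {
		*store.MemStore
		calls *int64
	}

	func (c *countingStore) Get(hash string) (stream.Blob, error) {
		atomic.AddInt64(c.calls, 1)
		return c.MemStore.Get(hash)
	}

	func main() {
		var calls int64
		origin := &countingStore{MemStore: store.NewMemStore(), calls: &calls}
		_ = origin.Put("abc", []byte("some blob"))

		sf := store.WithSingleFlight("demo", origin)

		var wg sync.WaitGroup
		for i := 0; i < 50; i++ {
			wg.Add(1)
			go func() {
				defer wg.Done()
				_, _ = sf.Get("abc")
			}()
		}
		wg.Wait()

		// Requests that overlapped in time shared one origin fetch, so this
		// prints well under 50 on most runs.
		fmt.Println("origin calls:", atomic.LoadInt64(&calls))
	}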
diff --git a/store/speedwalk/speedwalk.go b/store/speedwalk/speedwalk.go
new file mode 100644
index 0000000..e2563ba
--- /dev/null
+++ b/store/speedwalk/speedwalk.go
@@ -0,0 +1,89 @@
+package speedwalk
+
+import (
+	"io/ioutil"
+	"path/filepath"
+	"runtime"
+	"sync"
+
+	"github.com/lbryio/lbry.go/v2/extras/errors"
+
+	"github.com/karrick/godirwalk"
+	"github.com/sirupsen/logrus"
+)
+
+// AllFiles recursively lists every file in every subdirectory of a given directory.
+// If basename is true, return the basename of each file. Otherwise return the full path starting at startDir.
+func AllFiles(startDir string, basename bool) ([]string, error) {
+	items, err := ioutil.ReadDir(startDir)
+	if err != nil {
+		return nil, err
+	}
+
+	pathChan := make(chan string)
+	paths := make([]string, 0, 1000)
+	pathWG := &sync.WaitGroup{}
+	pathWG.Add(1)
+	go func() {
+		defer pathWG.Done()
+		for {
+			path, ok := <-pathChan
+			if !ok {
+				return
+			}
+			paths = append(paths, path)
+		}
+	}()
+
+	maxThreads := runtime.NumCPU() - 1
+	goroutineLimiter := make(chan struct{}, maxThreads)
+	for i := 0; i < maxThreads; i++ {
+		goroutineLimiter <- struct{}{}
+	}
+
+	walkerWG := &sync.WaitGroup{}
+	for _, item := range items {
+		if !item.IsDir() {
+			if basename {
+				pathChan <- item.Name()
+			} else {
+				pathChan <- filepath.Join(startDir, item.Name())
+			}
+			continue
+		}
+
+		<-goroutineLimiter
+		walkerWG.Add(1)
+
+		go func(dir string) {
+			defer func() {
+				walkerWG.Done()
+				goroutineLimiter <- struct{}{}
+			}()
+
+			// declare err locally: assigning the enclosing function's err
+			// from many goroutines would be a data race
+			err := godirwalk.Walk(filepath.Join(startDir, dir), &godirwalk.Options{
+				Unsorted: true, // faster this way
+				Callback: func(osPathname string, de *godirwalk.Dirent) error {
+					if de.IsRegular() {
+						if basename {
+							pathChan <- de.Name()
+						} else {
+							pathChan <- osPathname
+						}
+					}
+					return nil
+				},
+			})
+			if err != nil {
+				logrus.Error(errors.FullTrace(err))
+			}
+		}(item.Name())
+	}
+
+	walkerWG.Wait()
+
+	close(pathChan)
+	pathWG.Wait()
+
+	return paths, nil
+}
diff --git a/store/store.go b/store/store.go
index e2614e5..200ff61 100644
--- a/store/store.go
+++ b/store/store.go
@@ -7,15 +7,17 @@ import (
 
 // BlobStore is an interface for handling blob storage.
 type BlobStore interface {
-	// Does blob exist in the store
+	// Name of blob store (useful for metrics)
+	Name() string
+	// Does blob exist in the store.
 	Has(hash string) (bool, error)
-	// Get the blob from the store
+	// Get the blob from the store. Must return ErrBlobNotFound if blob is not in store.
 	Get(hash string) (stream.Blob, error)
-	// Put the blob into the store
+	// Put the blob into the store.
 	Put(hash string, blob stream.Blob) error
-	// Put an SD blob into the store
+	// Put an SD blob into the store.
 	PutSD(hash string, blob stream.Blob) error
-	// Delete the blob from the store
+	// Delete the blob from the store.
 	Delete(hash string) error
 }
 
@@ -27,5 +29,11 @@ type Blocklister interface {
 	Wants(hash string) (bool, error)
 }
 
+// lister is a store that can list cached blobs. This is helpful when an overlay
+// cache needs to track blob existence.
+type lister interface {
+	list() ([]string, error)
+}
+
 //ErrBlobNotFound is a standard error when a blob is not found in the store.
 var ErrBlobNotFound = errors.Base("blob not found")
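The unexported lister interface is consumed inside the store package; DiskStore's list(), exercised by TestLRUStore_loadExisting above, is the in-tree implementation, and the new speedwalk package exists to make that kind of listing fast. As a standalone illustration of the contract (the myDiskStore type and the /tmp path are hypothetical, and the real DiskStore may differ in details):

	package main

	import (
		"fmt"

		"github.com/lbryio/reflector.go/store/speedwalk"
	)

	// myDiskStore is a hypothetical store that keeps one file per blob,
	// named by blob hash, somewhere under dir.
	type myDiskStore struct {
		dir string
	}

	// list mirrors the lister contract: return the hashes of every blob
	// currently on disk. basename=true yields file names only, which in
	// this layout are exactly the blob hashes.
	func (s *myDiskStore) list() ([]string, error) {
		return speedwalk.AllFiles(s.dir, true)
	}

	func main() {
		s := &myDiskStore{dir: "/tmp/blobs"} // hypothetical path
		hashes, err := s.list()
		if err != nil {
			fmt.Println("list failed:", err)
			return
		}
		fmt.Println("blobs on disk:", len(hashes))
	}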