From 4a5a148843c81065c49ed9e4959cce5d7e6414dd Mon Sep 17 00:00:00 2001 From: Niko Storni Date: Wed, 13 Nov 2019 18:50:49 -0500 Subject: [PATCH 1/2] implement disk cleanup --- cmd/reflector.go | 4 +- store/disk.go | 114 ++++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 115 insertions(+), 3 deletions(-) diff --git a/cmd/reflector.go b/cmd/reflector.go index 3e8564d..e922c51 100644 --- a/cmd/reflector.go +++ b/cmd/reflector.go @@ -35,6 +35,7 @@ func reflectorCmd(cmd *cobra.Command, args []string) { useDB := true s3 := store.NewS3BlobStore(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName) + diskStore := store.NewDiskBlobStore("/home/lbry/lbry_downloaded_blobs", 2) var err error @@ -59,8 +60,9 @@ func reflectorCmd(cmd *cobra.Command, args []string) { log.Fatal(err) } } + cacheStore := store.NewCachingBlobStore(blobStore, diskStore) - peerServer := peer.NewServer(blobStore) + peerServer := peer.NewServer(cacheStore) err = peerServer.Start(":5567") if err != nil { log.Fatal(err) diff --git a/store/disk.go b/store/disk.go index 0d9342c..0dbc30d 100644 --- a/store/disk.go +++ b/store/disk.go @@ -4,9 +4,14 @@ import ( "io/ioutil" "os" "path" + "sort" + "syscall" + "time" "github.com/lbryio/lbry.go/v2/extras/errors" "github.com/lbryio/lbry.go/v2/stream" + + log "github.com/sirupsen/logrus" ) // DiskBlobStore stores blobs on a local disk @@ -16,12 +21,16 @@ type DiskBlobStore struct { // store files in subdirectories based on the first N chars in the filename. 0 = don't create subdirectories. prefixLength int - initialized bool + initialized bool + lastChecked time.Time + diskCleanupBusy chan bool } // NewDiskBlobStore returns an initialized file disk store pointer. func NewDiskBlobStore(dir string, prefixLength int) *DiskBlobStore { - return &DiskBlobStore{blobDir: dir, prefixLength: prefixLength} + dbs := DiskBlobStore{blobDir: dir, prefixLength: prefixLength, diskCleanupBusy: make(chan bool, 1)} + dbs.diskCleanupBusy <- true + return &dbs } func (d *DiskBlobStore) dir(hash string) string { @@ -31,6 +40,21 @@ func (d *DiskBlobStore) dir(hash string) string { return path.Join(d.blobDir, hash[:d.prefixLength]) } +// GetUsedSpace returns a value between 0 and 1, with 0 being completely empty and 1 being full, for the disk that holds the provided path +func (d *DiskBlobStore) getUsedSpace() (float32, error) { + var stat syscall.Statfs_t + err := syscall.Statfs(d.blobDir, &stat) + if err != nil { + return 0, err + } + // Available blocks * size per block = available space in bytes + all := stat.Blocks * uint64(stat.Bsize) + free := stat.Bfree * uint64(stat.Bsize) + used := all - free + + return float32(used) / float32(all), nil +} + func (d *DiskBlobStore) path(hash string) string { return path.Join(d.dir(hash), hash) } @@ -99,6 +123,14 @@ func (d *DiskBlobStore) Put(hash string, blob stream.Blob) error { if err != nil { return err } + select { + case <-d.diskCleanupBusy: + if time.Since(d.lastChecked) > 5*time.Minute { + go d.ensureDiskSpace() + } + default: + break + } return ioutil.WriteFile(d.path(hash), blob, 0644) } @@ -125,3 +157,81 @@ func (d *DiskBlobStore) Delete(hash string) error { return os.Remove(d.path(hash)) } + +func (d *DiskBlobStore) ensureDiskSpace() { + defer func() { + d.lastChecked = time.Now() + d.diskCleanupBusy <- true + }() + + used, err := d.getUsedSpace() + if err != nil { + log.Errorln(err.Error()) + return + } + log.Infof("disk usage: %.2f%%\n", used*100) + if used > 0.90 { + log.Infoln("over 0.90, cleaning up") + err = d.WipeOldestBlobs() + if err != nil { + log.Errorln(err.Error()) + return + } + log.Infoln("Done cleaning up") + } +} + +func (d *DiskBlobStore) WipeOldestBlobs() (err error) { + dirs, err := ioutil.ReadDir(d.blobDir) + if err != nil { + return err + } + type datedFile struct { + Atime time.Time + File *os.FileInfo + FullPath string + } + datedFiles := make([]datedFile, 0, 500) + for _, dir := range dirs { + if dir.IsDir() { + files, err := ioutil.ReadDir(d.blobDir + "/" + dir.Name()) + if err != nil { + return err + } + for _, file := range files { + if file.Mode().IsRegular() && !file.IsDir() { + datedFiles = append(datedFiles, datedFile{ + Atime: atime(file), + File: &file, + FullPath: d.blobDir + "/" + dir.Name() + "/" + file.Name(), + }) + } + } + } + } + + sort.Slice(datedFiles, func(i, j int) bool { + return datedFiles[i].Atime.Before(datedFiles[j].Atime) + }) + //delete the first 50000 blobs + for i, df := range datedFiles { + if i >= 50000 { + break + } + log.Infoln(df.FullPath) + log.Infoln(df.Atime.String()) + err = os.Remove(df.FullPath) + if err != nil { + return err + } + } + return nil +} + +func timespecToTime(ts syscall.Timespec) time.Time { + return time.Unix(int64(ts.Sec), int64(ts.Nsec)) +} + +func atime(fi os.FileInfo) time.Time { + return timespecToTime(fi.Sys().(*syscall.Stat_t).Atim) +} From dde93a1fe6a50dd2b0610cd45f2856995adb48d2 Mon Sep 17 00:00:00 2001 From: Alex Grintsvayg Date: Thu, 27 Feb 2020 14:52:48 -0500 Subject: [PATCH 2/2] make cache dir a cli flag --- cmd/reflector.go | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/cmd/reflector.go b/cmd/reflector.go index e922c51..8177d1e 100644 --- a/cmd/reflector.go +++ b/cmd/reflector.go @@ -18,12 +18,15 @@ import ( "github.com/spf13/cobra" ) +var reflectorCmdCacheDir string + func init() { var cmd = &cobra.Command{ Use: "reflector", Short: "Run reflector server", Run: reflectorCmd, } + cmd.Flags().StringVar(&reflectorCmdCacheDir, "cache", "", "Enable disk cache for blobs. Store them in this directory") rootCmd.AddCommand(cmd) } @@ -35,7 +38,6 @@ func reflectorCmd(cmd *cobra.Command, args []string) { useDB := true s3 := store.NewS3BlobStore(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName) - diskStore := store.NewDiskBlobStore("/home/lbry/lbry_downloaded_blobs", 2) var err error @@ -60,9 +62,16 @@ func reflectorCmd(cmd *cobra.Command, args []string) { log.Fatal(err) } } - cacheStore := store.NewCachingBlobStore(blobStore, diskStore) - peerServer := peer.NewServer(cacheStore) + if reflectorCmdCacheDir != "" { + err = os.MkdirAll(reflectorCmdCacheDir, os.ModePerm) + if err != nil { + log.Fatal(err) + } + blobStore = store.NewCachingBlobStore(blobStore, store.NewDiskBlobStore(reflectorCmdCacheDir, 2)) + } + + peerServer := peer.NewServer(blobStore) err = peerServer.Start(":5567") if err != nil { log.Fatal(err)