Smarter caches #46

Merged
lyoshenka merged 13 commits from smarter_caches into master 2020-11-04 22:04:22 +01:00
7 changed files with 208 additions and 119 deletions
Showing only changes of commit 69fa06420b - Show all commits

View file

@ -30,7 +30,7 @@ func getStreamCmd(cmd *cobra.Command, args []string) {
s := store.NewCachingBlobStore(
peer.NewStore(peer.StoreOpts{Address: addr}),
store.NewDiskBlobStore("/tmp/lbry_downloaded_blobs", 2),
store.NewDiskBlobStore("/tmp/lbry_downloaded_blobs", 1000, 2),
)
wd, err := os.Getwd()

View file

@ -19,18 +19,21 @@ import (
"github.com/spf13/cobra"
)
var reflectorCmdCacheDir string
var tcpPeerPort int
var http3PeerPort int
var receiverPort int
var metricsPort int
var disableUploads bool
var disableBlocklist bool
var proxyAddress string
var proxyPort string
var proxyProtocol string
var useDB bool
var cloudFrontEndpoint string
var (
tcpPeerPort int
http3PeerPort int
receiverPort int
metricsPort int
disableUploads bool
disableBlocklist bool
proxyAddress string
proxyPort string
proxyProtocol string
useDB bool
cloudFrontEndpoint string
reflectorCmdCacheDir string
reflectorCmdCacheMaxBlobs int
)
func init() {
var cmd = &cobra.Command{
@ -38,7 +41,6 @@ func init() {
Short: "Run reflector server",
Run: reflectorCmd,
}
cmd.Flags().StringVar(&reflectorCmdCacheDir, "cache", "", "if specified, the path where blobs should be cached (disabled when left empty)")
cmd.Flags().StringVar(&proxyAddress, "proxy-address", "", "address of another reflector server where blobs are fetched from")
cmd.Flags().StringVar(&proxyPort, "proxy-port", "5567", "port of another reflector server where blobs are fetched from")
cmd.Flags().StringVar(&proxyProtocol, "proxy-protocol", "http3", "protocol used to fetch blobs from another reflector server (tcp/http3)")
@ -50,6 +52,8 @@ func init() {
cmd.Flags().BoolVar(&disableUploads, "disable-uploads", false, "Disable uploads to this reflector server")
cmd.Flags().BoolVar(&disableBlocklist, "disable-blocklist", false, "Disable blocklist watching/updating")
cmd.Flags().BoolVar(&useDB, "use-db", true, "whether to connect to the reflector db or not")
cmd.Flags().StringVar(&reflectorCmdCacheDir, "cache", "", "if specified, the path where blobs should be cached (disabled when left empty)")
cmd.Flags().IntVar(&reflectorCmdCacheMaxBlobs, "cache-max-blobs", 0, "if cache is enabled, this option sets the max blobs the cache will hold")
rootCmd.AddCommand(cmd)
}
@ -113,7 +117,7 @@ func reflectorCmd(cmd *cobra.Command, args []string) {
if err != nil {
log.Fatal(err)
}
blobStore = store.NewCachingBlobStore(blobStore, store.NewDiskBlobStore(reflectorCmdCacheDir, 2))
blobStore = store.NewCachingBlobStore(blobStore, store.NewDiskBlobStore(reflectorCmdCacheDir, reflectorCmdCacheMaxBlobs, 2))
}
peerServer := peer.NewServer(blobStore)

5
go.mod
View file

@ -14,7 +14,7 @@ require (
github.com/google/gops v0.3.7
github.com/gorilla/mux v1.7.4
github.com/hashicorp/go-msgpack v0.5.5 // indirect
github.com/hashicorp/golang-lru v0.5.3 // indirect
github.com/hashicorp/golang-lru v0.5.4
github.com/hashicorp/memberlist v0.1.4 // indirect
github.com/hashicorp/serf v0.8.2
github.com/inconshreveable/go-update v0.0.0-20160112193335-8152e7eb6ccf
@ -27,9 +27,11 @@ require (
github.com/phayes/freeport v0.0.0-20171002185219-e27662a4a9d6
github.com/prometheus/client_golang v0.9.2
github.com/sirupsen/logrus v1.4.2
github.com/spf13/afero v1.4.1
github.com/spf13/cast v1.3.0
github.com/spf13/cobra v0.0.3
github.com/spf13/pflag v1.0.3 // indirect
github.com/stretchr/testify v1.4.0
github.com/volatiletech/null v8.0.0+incompatible
go.uber.org/atomic v1.5.1
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9
@ -37,6 +39,7 @@ require (
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e
golang.org/x/tools v0.0.0-20191227053925-7b8e75db28f4 // indirect
google.golang.org/appengine v1.6.2 // indirect
gotest.tools v2.2.0+incompatible
)
go 1.15

9
go.sum
View file

@ -152,6 +152,8 @@ github.com/hashicorp/go.net v0.0.1/go.mod h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.3 h1:YPkqC67at8FYaadspW/6uE0COsBxS2656RLEr8Bppgk=
github.com/hashicorp/golang-lru v0.5.3/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc=
github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
github.com/hashicorp/hcl v0.0.0-20180404174102-ef8a98b0bbce h1:xdsDDbiBDQTKASoGEZ+pEmF1OnWuu8AQ9I8iNbHNeno=
github.com/hashicorp/hcl v0.0.0-20180404174102-ef8a98b0bbce/go.mod h1:oZtUIOe8dh44I2q6ScRibXws4Ajl+d+nod3AaR9vL5w=
github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64=
@ -192,6 +194,7 @@ github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.2 h1:DB17ag19krx9CFsz4o3enTrPXyIXCl+2iCXH/aMAp9s=
github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
@ -280,6 +283,7 @@ github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/profile v1.3.0/go.mod h1:hJw3o1OdXxsrSjjVksARp5W95eeEaEfptyVZyv6JUPA=
github.com/pkg/sftp v1.10.1/go.mod h1:lYOWFsE0bwd1+KfKJaKeuokY15vzFx25BLbzYYoAxZI=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI=
@ -346,6 +350,8 @@ github.com/sourcegraph/annotate v0.0.0-20160123013949-f4cad6c6324d/go.mod h1:Udh
github.com/sourcegraph/syntaxhighlight v0.0.0-20170531221838-bd320f5d308e/go.mod h1:HuIsMU8RRBOtsCgI77wP899iHVBQpCmg4ErYMZB+2IA=
github.com/spf13/afero v1.1.1 h1:Lt3ihYMlE+lreX1GS4Qw4ZsNpYQLxIXKBTEOXm3nt6I=
github.com/spf13/afero v1.1.1/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ=
github.com/spf13/afero v1.4.1 h1:asw9sl74539yqavKaglDM5hFpdJVK0Y5Dr/JOgQ89nQ=
github.com/spf13/afero v1.4.1/go.mod h1:Ai8FlHk4v/PARR026UzYexafAt9roJ7LcLMAmO6Z93I=
github.com/spf13/cast v1.2.0/go.mod h1:r2rcYCSwa1IExKTDiTfzaxqT2FNHs8hODu4LnUfgKEg=
github.com/spf13/cast v1.3.0 h1:oget//CVOEoFewqQxwr0Ej5yjygnqGkvggSE/gB35Q8=
github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
@ -394,6 +400,7 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk
golang.org/x/crypto v0.0.0-20190313024323-a1f597ede03a/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190820162420-60c769a6c586/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20191002192127-34f69633bfdc/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200221231518-2aa609cf4a9d/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
@ -471,6 +478,8 @@ golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190921001708-c4c64cad1fd0 h1:xQwXv67TxFo9nC1GJFyab5eq/5B590r6RlnL/G8Sz7w=

View file

@ -92,6 +92,12 @@ const (
)
var (
ErrorCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: ns,
Name: "error_total",
Help: "Total number of errors",
}, []string{labelDirection, labelErrorType})
BlobDownloadCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: ns,
Name: "blob_download_total",
@ -107,6 +113,7 @@ var (
Name: "http3_blob_download_total",
Help: "Total number of blobs downloaded from reflector through QUIC protocol",
})
CacheHitCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: ns,
Name: "cache_hit_total",
@ -128,6 +135,7 @@ var (
Name: "cache_waiting_requests_total",
Help: "How many cache requests are waiting for an in-flight origin request",
})
BlobUploadCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: ns,
Name: "blob_upload_total",
@ -138,16 +146,13 @@ var (
Name: "sdblob_upload_total",
Help: "Total number of SD blobs (and therefore streams) uploaded to reflector",
})
RetrieverSpeed = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns,
Name: "speed_mbps",
Help: "Speed of blob retrieval",
}, []string{MtrLabelSource})
ErrorCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: ns,
Name: "error_total",
Help: "Total number of errors",
}, []string{labelDirection, labelErrorType})
MtrInBytesTcp = promauto.NewCounter(prometheus.CounterOpts{
Namespace: ns,
Name: "tcp_in_bytes",

View file

@ -5,32 +5,40 @@ import (
"os"
"path"
"path/filepath"
"sort"
"syscall"
"time"
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/stream"
"github.com/spf13/afero"
log "github.com/sirupsen/logrus"
lru "github.com/hashicorp/golang-lru"
)
// DiskBlobStore stores blobs on a local disk
type DiskBlobStore struct {
// the location of blobs on disk
blobDir string
// max number of blobs to store
maxBlobs int
// store files in subdirectories based on the first N chars in the filename. 0 = don't create subdirectories.
prefixLength int
initialized bool
lastChecked time.Time
diskCleanupBusy chan bool
// lru cache
lru *lru.Cache
// filesystem abstraction
fs afero.Fs
// true if initOnce ran, false otherwise
initialized bool
}
// NewDiskBlobStore returns an initialized file disk store pointer.
func NewDiskBlobStore(dir string, prefixLength int) *DiskBlobStore {
dbs := DiskBlobStore{blobDir: dir, prefixLength: prefixLength, diskCleanupBusy: make(chan bool, 1)}
dbs.diskCleanupBusy <- true
func NewDiskBlobStore(dir string, maxBlobs, prefixLength int) *DiskBlobStore {
dbs := DiskBlobStore{
blobDir: dir,
maxBlobs: maxBlobs,
prefixLength: prefixLength,
fs: afero.NewOsFs(),
}
return &dbs
}
@ -41,27 +49,12 @@ func (d *DiskBlobStore) dir(hash string) string {
return path.Join(d.blobDir, hash[:d.prefixLength])
}
// GetUsedSpace returns a value between 0 and 1, with 0 being completely empty and 1 being full, for the disk that holds the provided path
func (d *DiskBlobStore) getUsedSpace() (float32, error) {
var stat syscall.Statfs_t
err := syscall.Statfs(d.blobDir, &stat)
if err != nil {
return 0, err
}
// Available blocks * size per block = available space in bytes
all := stat.Blocks * uint64(stat.Bsize)
free := stat.Bfree * uint64(stat.Bsize)
used := all - free
return float32(used) / float32(all), nil
}
func (d *DiskBlobStore) path(hash string) string {
return path.Join(d.dir(hash), hash)
}
func (d *DiskBlobStore) ensureDirExists(dir string) error {
return errors.Err(os.MkdirAll(dir, 0755))
return errors.Err(d.fs.MkdirAll(dir, 0755))
}
func (d *DiskBlobStore) initOnce() error {
@ -74,6 +67,19 @@ func (d *DiskBlobStore) initOnce() error {
return err
}
l, err := lru.NewWithEvict(d.maxBlobs, func(key interface{}, value interface{}) {
_ = d.fs.Remove(d.path(key.(string))) // TODO: log this error. may happen if file is gone but cache entry still there?
})
if err != nil {
return errors.Err(err)
}
d.lru = l
err = d.loadExisting()
if err != nil {
return err
}
d.initialized = true
return nil
}
@ -85,14 +91,7 @@ func (d *DiskBlobStore) Has(hash string) (bool, error) {
return false, err
}
_, err = os.Stat(d.path(hash))
if err != nil {
if os.IsNotExist(err) {
return false, nil
}
return false, err
}
return true, nil
return d.lru.Contains(hash), nil
}
// Get returns the blob or an error if the blob doesn't exist.
@ -102,9 +101,15 @@ func (d *DiskBlobStore) Get(hash string) (stream.Blob, error) {
return nil, err
}
file, err := os.Open(d.path(hash))
_, has := d.lru.Get(hash)
if !has {
return nil, errors.Err(ErrBlobNotFound)
}
file, err := d.fs.Open(d.path(hash))
if err != nil {
if os.IsNotExist(err) {
d.lru.Remove(hash)
return nil, errors.Err(ErrBlobNotFound)
}
return nil, err
@ -126,7 +131,14 @@ func (d *DiskBlobStore) Put(hash string, blob stream.Blob) error {
return err
}
return ioutil.WriteFile(d.path(hash), blob, 0644)
err = afero.WriteFile(d.fs, d.path(hash), blob, 0644)
if err != nil {
return errors.Err(err)
}
d.lru.Add(hash, true)
return nil
}
// PutSD stores the sd blob on the disk
@ -141,83 +153,30 @@ func (d *DiskBlobStore) Delete(hash string) error {
return err
}
has, err := d.Has(hash)
d.lru.Remove(hash)
return nil
}
// loadExisting scans the blobDir and imports existing blobs into lru cache
func (d *DiskBlobStore) loadExisting() error {
dirs, err := afero.ReadDir(d.fs, d.blobDir)
if err != nil {
return err
}
if !has {
return nil
}
return os.Remove(d.path(hash))
}
func (d *DiskBlobStore) ensureDiskSpace() {
defer func() {
d.lastChecked = time.Now()
d.diskCleanupBusy <- true
}()
used, err := d.getUsedSpace()
if err != nil {
log.Errorln(err.Error())
return
}
log.Infof("disk usage: %.2f%%\n", used*100)
if used > 0.90 {
log.Infoln("over 0.90, cleaning up")
err = d.WipeOldestBlobs()
if err != nil {
log.Errorln(err.Error())
return
}
log.Infoln("Done cleaning up")
}
}
func (d *DiskBlobStore) WipeOldestBlobs() (err error) {
dirs, err := ioutil.ReadDir(d.blobDir)
if err != nil {
return err
}
type datedFile struct {
Atime time.Time
File *os.FileInfo
FullPath string
}
datedFiles := make([]datedFile, 0, 5000)
for _, dir := range dirs {
if dir.IsDir() {
files, err := ioutil.ReadDir(filepath.Join(d.blobDir, dir.Name()))
files, err := afero.ReadDir(d.fs, filepath.Join(d.blobDir, dir.Name()))
if err != nil {
return err
}
for _, file := range files {
if file.Mode().IsRegular() && !file.IsDir() {
datedFiles = append(datedFiles, datedFile{
Atime: atime(file),
File: &file,
FullPath: filepath.Join(d.blobDir, dir.Name(), file.Name()),
})
d.lru.Add(file.Name(), true)
}
}
}
}
sort.Slice(datedFiles, func(i, j int) bool {
return datedFiles[i].Atime.Before(datedFiles[j].Atime)
})
//delete the first 50000 blobs
for i, df := range datedFiles {
if i >= 50000 {
break
}
log.Infoln(df.FullPath)
log.Infoln(df.Atime.String())
err = os.Remove(df.FullPath)
if err != nil {
return err
}
}
return nil
}

109
store/disk_test.go Normal file
View file

@ -0,0 +1,109 @@
package store
import (
"os"
"reflect"
"testing"
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/spf13/afero"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
const cacheMaxBlobs = 3
func memDiskStore() *DiskBlobStore {
d := NewDiskBlobStore("/", cacheMaxBlobs, 2)
d.fs = afero.NewMemMapFs()
return d
}
func countOnDisk(t *testing.T, fs afero.Fs) int {
t.Helper()
count := 0
afero.Walk(fs, "/", func(path string, info os.FileInfo, err error) error {
if err != nil {
t.Fatal(err)
}
if !info.IsDir() {
count++
}
return nil
})
return count
}
func TestDiskBlobStore_LRU(t *testing.T) {
d := memDiskStore()
b := []byte("x")
err := d.Put("one", b)
require.NoError(t, err)
err = d.Put("two", b)
require.NoError(t, err)
err = d.Put("three", b)
require.NoError(t, err)
err = d.Put("four", b)
require.NoError(t, err)
err = d.Put("five", b)
require.NoError(t, err)
assert.Equal(t, cacheMaxBlobs, countOnDisk(t, d.fs))
for k, v := range map[string]bool{
"one": false,
"two": false,
"three": true,
"four": true,
"five": true,
"six": false,
} {
has, err := d.Has(k)
assert.NoError(t, err)
assert.Equal(t, v, has)
}
d.Get("three") // touch so it stays in cache
d.Put("six", b)
assert.Equal(t, cacheMaxBlobs, countOnDisk(t, d.fs))
for k, v := range map[string]bool{
"one": false,
"two": false,
"three": true,
"four": false,
"five": true,
"six": true,
} {
has, err := d.Has(k)
assert.NoError(t, err)
assert.Equal(t, v, has)
}
err = d.Delete("three")
assert.NoError(t, err)
err = d.Delete("five")
assert.NoError(t, err)
err = d.Delete("six")
assert.NoError(t, err)
assert.Equal(t, 0, countOnDisk(t, d.fs))
}
func TestDiskBlobStore_FileMissingOnDisk(t *testing.T) {
d := memDiskStore()
hash := "hash"
b := []byte("this is a blob of stuff")
err := d.Put(hash, b)
require.NoError(t, err)
err = d.fs.Remove("/ha/hash")
require.NoError(t, err)
blob, err := d.Get(hash)
assert.Nil(t, blob)
assert.True(t, errors.Is(err, ErrBlobNotFound), "expected (%s) %s, got (%s) %s",
reflect.TypeOf(ErrBlobNotFound).String(), ErrBlobNotFound.Error(),
reflect.TypeOf(err).String(), err.Error())
}