Ittt #52

Merged
anbsky merged 62 commits from ittt into master 2021-07-24 02:35:22 +02:00
4 changed files with 27 additions and 18 deletions
Showing only changes of commit ff13d7b2f7 - Show all commits

View file

@ -16,8 +16,10 @@ import (
"github.com/lbryio/reflector.go/reflector"
"github.com/lbryio/reflector.go/store"
"github.com/lbryio/lbry.go/v2/stream"
"github.com/c2h5oh/datasize"
log "github.com/sirupsen/logrus"
"github.com/spf13/cast"
"github.com/spf13/cobra"
)
@ -56,9 +58,9 @@ func init() {
cmd.Flags().BoolVar(&disableBlocklist, "disable-blocklist", false, "Disable blocklist watching/updating")
cmd.Flags().BoolVar(&useDB, "use-db", true, "whether to connect to the reflector db or not")
cmd.Flags().StringVar(&reflectorCmdDiskCache, "disk-cache", "",
"enable disk cache, setting max size and path where to store blobs. format is 'MAX_BLOBS:CACHE_PATH'")
"enable disk cache, setting max size and path where to store blobs. format is 'sizeGB:CACHE_PATH'")
cmd.Flags().StringVar(&bufferReflectorCmdDiskCache, "buffer-disk-cache", "",
"enable buffer disk cache, setting max size and path where to store blobs. format is 'MAX_BLOBS:CACHE_PATH'")
"enable buffer disk cache, setting max size and path where to store blobs. format is 'sizeGB:CACHE_PATH'")
cmd.Flags().IntVar(&reflectorCmdMemCache, "mem-cache", 0, "enable in-memory cache with a max size of this many blobs")
rootCmd.AddCommand(cmd)
}
@ -151,7 +153,9 @@ func wrapWithCache(s store.BlobStore) store.BlobStore {
wrapped := s
diskCacheMaxSize, diskCachePath := diskCacheParams(reflectorCmdDiskCache)
cacheMaxSizeInBytes := float64(diskCacheMaxSize * 2 * 1000 * 1000)
//we are tracking blobs in memory with a 1 byte long boolean, which means that for each 2MB (a blob) we need 1Byte
// so if the underlying cache holds 10MB, 10MB/2MB=5Bytes which is also the exact count of objects to restore on startup
realCacheSize := float64(diskCacheMaxSize) / float64(stream.MaxBlobSize)
if diskCacheMaxSize > 0 {
err := os.MkdirAll(diskCachePath, os.ModePerm)
if err != nil {
@ -160,12 +164,12 @@ func wrapWithCache(s store.BlobStore) store.BlobStore {
wrapped = store.NewCachingStore(
"reflector",
wrapped,
store.NewLFUDAStore("hdd", store.NewDiskStore(diskCachePath, 2), cacheMaxSizeInBytes),
store.NewLFUDAStore("hdd", store.NewDiskStore(diskCachePath, 2), realCacheSize),
)
}
diskCacheMaxSize, diskCachePath = diskCacheParams(bufferReflectorCmdDiskCache)
cacheMaxSizeInBytes = float64(diskCacheMaxSize * 2 * 1000 * 1000)
realCacheSize = float64(diskCacheMaxSize) / float64(stream.MaxBlobSize)
if diskCacheMaxSize > 0 {
err := os.MkdirAll(diskCachePath, os.ModePerm)
if err != nil {
@ -174,7 +178,7 @@ func wrapWithCache(s store.BlobStore) store.BlobStore {
wrapped = store.NewCachingStore(
"reflector",
wrapped,
store.NewLFUDAStore("nvme", store.NewDiskStore(diskCachePath, 2), cacheMaxSizeInBytes),
store.NewLFUDAStore("nvme", store.NewDiskStore(diskCachePath, 2), realCacheSize),
)
}
@ -199,15 +203,19 @@ func diskCacheParams(diskParams string) (int, string) {
log.Fatalf("--disk-cache must be a number, followed by ':', followed by a string")
}
maxSize := cast.ToInt(parts[0])
if maxSize <= 0 {
log.Fatalf("--disk-cache max size must be more than 0")
}
diskCacheSize := parts[0]
path := parts[1]
if len(path) == 0 || path[0] != '/' {
log.Fatalf("--disk-cache path must start with '/'")
}
return maxSize, path
var maxSize datasize.ByteSize
err := maxSize.UnmarshalText([]byte(diskCacheSize))
if err != nil {
log.Fatal(err)
}
if maxSize <= 0 {
log.Fatal("--disk-cache size must be more than 0")
}
return int(maxSize), path
}

2
go.mod
View file

@ -8,12 +8,12 @@ require (
github.com/bparli/lfuda-go v0.3.0
github.com/btcsuite/btcd v0.0.0-20190824003749-130ea5bddde3
github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d
github.com/c2h5oh/datasize v0.0.0-20200825124411-48ed595a09d2
github.com/davecgh/go-spew v1.1.1
github.com/go-sql-driver/mysql v1.4.1
github.com/golang/protobuf v1.4.2
github.com/google/btree v1.0.0 // indirect
github.com/google/gops v0.3.7
github.com/google/martian v2.1.0+incompatible
github.com/gorilla/mux v1.7.4
github.com/hashicorp/go-msgpack v0.5.5 // indirect
github.com/hashicorp/golang-lru v0.5.4

4
go.sum
View file

@ -41,6 +41,8 @@ github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792 h1:R8vQdOQdZ9Y3
github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792/go.mod h1:ghJtEyQwv5/p4Mg4C0fgbePVuGr935/5ddU9Z3TmDRY=
github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46fmI40EZs=
github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23/go.mod h1:bbYlZJ7hK1yFx9hf58LP0zeX7UjIGs20ufpu3evjr+s=
github.com/c2h5oh/datasize v0.0.0-20200825124411-48ed595a09d2 h1:t8KYCwSKsOEZBFELI4Pn/phbp38iJ1RRAkDFNin1aak=
github.com/c2h5oh/datasize v0.0.0-20200825124411-48ed595a09d2/go.mod h1:S/7n9copUssQ56c7aAgHqftWO4LTf4xY6CGWt8Bc+3M=
github.com/cheekybits/genny v1.0.0 h1:uGGa4nei+j20rOSeDeP5Of12XVm7TGUd4dJA9RDitfE=
github.com/cheekybits/genny v1.0.0/go.mod h1:+tQajlRqAUrPI7DOSpB0XAqZYtQakVtB7wXkRAgjxjQ=
github.com/circonus-labs/circonus-gometrics v2.3.1+incompatible/go.mod h1:nmEj6Dob7S7YxXgwXpfOuvO54S+tGdZdw9fuRZt25Ag=
@ -219,8 +221,6 @@ github.com/lbryio/types v0.0.0-20190422033210-321fb2abda9c/go.mod h1:CG3wsDv5BiV
github.com/lbryio/types v0.0.0-20201019032447-f0b4476ef386 h1:JOQkGpeCM9FWkEHRx+kRPqySPCXElNW1em1++7tVS4M=
github.com/lbryio/types v0.0.0-20201019032447-f0b4476ef386/go.mod h1:CG3wsDv5BiVYQd5i1Jp7wGsaVyjZTJshqXeWMVKsISE=
github.com/lib/pq v1.1.1/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lucas-clemente/quic-go v0.19.1 h1:J9TkQJGJVOR3UmGhd4zdVYwKSA0EoXbLRf15uQJ6gT4=
github.com/lucas-clemente/quic-go v0.19.1/go.mod h1:ZUygOqIoai0ASXXLJ92LTnKdbqh9MHCLTX6Nr1jUrK0=
github.com/lucas-clemente/quic-go v0.19.2 h1:w8BBYUx5Z+kNpeaOeQW/KzcNsKWhh4O6PeQhb0nURPg=
github.com/lucas-clemente/quic-go v0.19.2/go.mod h1:ZUygOqIoai0ASXXLJ92LTnKdbqh9MHCLTX6Nr1jUrK0=
github.com/lunixbochs/vtclean v1.0.0/go.mod h1:pHhQNgMf3btfWnGBVipUOjRYhoOsdGqdm/+2c2E2WMI=

View file

@ -5,6 +5,7 @@ import (
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/stream"
"github.com/lbryio/reflector.go/internal/metrics"
"github.com/sirupsen/logrus"
)
// LRUStore adds a max cache size and LRU eviction to a BlobStore
@ -21,14 +22,13 @@ func NewLFUDAStore(component string, store BlobStore, maxSize float64) *LFUDASto
metrics.CacheLRUEvictCount.With(metrics.CacheLabels(store.Name(), component)).Inc()
_ = store.Delete(key.(string)) // TODO: log this error. may happen if underlying entry is gone but cache entry still there
})
lfuda.Age()
l := &LFUDAStore{
store: store,
lfuda: lfuda,
}
go func() {
if lstr, ok := store.(lister); ok {
err := l.loadExisting(lstr, int(maxSize/2000000.0))
err := l.loadExisting(lstr, int(maxSize))
if err != nil {
panic(err) // TODO: what should happen here? panic? return nil? just keep going?
}
@ -101,6 +101,7 @@ func (l *LFUDAStore) Delete(hash string) error {
// loadExisting imports existing blobs from the underlying store into the LRU cache
func (l *LFUDAStore) loadExisting(store lister, maxItems int) error {
logrus.Infof("loading at most %d items", maxItems)
existing, err := store.list()
if err != nil {
return err