when reflecting a sdblob, insert all the stream and intermediate blobs using a transaction #50
4 changed files with 27 additions and 18 deletions
|
@ -16,8 +16,10 @@ import (
|
|||
"github.com/lbryio/reflector.go/reflector"
|
||||
"github.com/lbryio/reflector.go/store"
|
||||
|
||||
"github.com/lbryio/lbry.go/v2/stream"
|
||||
|
||||
"github.com/c2h5oh/datasize"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/spf13/cast"
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
|
||||
|
@ -56,9 +58,9 @@ func init() {
|
|||
cmd.Flags().BoolVar(&disableBlocklist, "disable-blocklist", false, "Disable blocklist watching/updating")
|
||||
cmd.Flags().BoolVar(&useDB, "use-db", true, "whether to connect to the reflector db or not")
|
||||
cmd.Flags().StringVar(&reflectorCmdDiskCache, "disk-cache", "",
|
||||
"enable disk cache, setting max size and path where to store blobs. format is 'MAX_BLOBS:CACHE_PATH'")
|
||||
"enable disk cache, setting max size and path where to store blobs. format is 'sizeGB:CACHE_PATH'")
|
||||
cmd.Flags().StringVar(&bufferReflectorCmdDiskCache, "buffer-disk-cache", "",
|
||||
"enable buffer disk cache, setting max size and path where to store blobs. format is 'MAX_BLOBS:CACHE_PATH'")
|
||||
"enable buffer disk cache, setting max size and path where to store blobs. format is 'sizeGB:CACHE_PATH'")
|
||||
cmd.Flags().IntVar(&reflectorCmdMemCache, "mem-cache", 0, "enable in-memory cache with a max size of this many blobs")
|
||||
rootCmd.AddCommand(cmd)
|
||||
}
|
||||
|
@ -151,7 +153,9 @@ func wrapWithCache(s store.BlobStore) store.BlobStore {
|
|||
wrapped := s
|
||||
|
||||
diskCacheMaxSize, diskCachePath := diskCacheParams(reflectorCmdDiskCache)
|
||||
cacheMaxSizeInBytes := float64(diskCacheMaxSize * 2 * 1000 * 1000)
|
||||
//we are tracking blobs in memory with a 1 byte long boolean, which means that for each 2MB (a blob) we need 1Byte
|
||||
// so if the underlying cache holds 10MB, 10MB/2MB=5Bytes which is also the exact count of objects to restore on startup
|
||||
realCacheSize := float64(diskCacheMaxSize) / float64(stream.MaxBlobSize)
|
||||
if diskCacheMaxSize > 0 {
|
||||
err := os.MkdirAll(diskCachePath, os.ModePerm)
|
||||
if err != nil {
|
||||
|
@ -160,12 +164,12 @@ func wrapWithCache(s store.BlobStore) store.BlobStore {
|
|||
wrapped = store.NewCachingStore(
|
||||
"reflector",
|
||||
wrapped,
|
||||
store.NewLFUDAStore("hdd", store.NewDiskStore(diskCachePath, 2), cacheMaxSizeInBytes),
|
||||
store.NewLFUDAStore("hdd", store.NewDiskStore(diskCachePath, 2), realCacheSize),
|
||||
)
|
||||
}
|
||||
|
||||
diskCacheMaxSize, diskCachePath = diskCacheParams(bufferReflectorCmdDiskCache)
|
||||
cacheMaxSizeInBytes = float64(diskCacheMaxSize * 2 * 1000 * 1000)
|
||||
realCacheSize = float64(diskCacheMaxSize) / float64(stream.MaxBlobSize)
|
||||
if diskCacheMaxSize > 0 {
|
||||
err := os.MkdirAll(diskCachePath, os.ModePerm)
|
||||
if err != nil {
|
||||
|
@ -174,7 +178,7 @@ func wrapWithCache(s store.BlobStore) store.BlobStore {
|
|||
wrapped = store.NewCachingStore(
|
||||
"reflector",
|
||||
wrapped,
|
||||
store.NewLFUDAStore("nvme", store.NewDiskStore(diskCachePath, 2), cacheMaxSizeInBytes),
|
||||
store.NewLFUDAStore("nvme", store.NewDiskStore(diskCachePath, 2), realCacheSize),
|
||||
)
|
||||
}
|
||||
|
||||
|
@ -199,15 +203,19 @@ func diskCacheParams(diskParams string) (int, string) {
|
|||
log.Fatalf("--disk-cache must be a number, followed by ':', followed by a string")
|
||||
}
|
||||
|
||||
maxSize := cast.ToInt(parts[0])
|
||||
if maxSize <= 0 {
|
||||
log.Fatalf("--disk-cache max size must be more than 0")
|
||||
}
|
||||
|
||||
diskCacheSize := parts[0]
|
||||
path := parts[1]
|
||||
if len(path) == 0 || path[0] != '/' {
|
||||
log.Fatalf("--disk-cache path must start with '/'")
|
||||
}
|
||||
|
||||
return maxSize, path
|
||||
var maxSize datasize.ByteSize
|
||||
err := maxSize.UnmarshalText([]byte(diskCacheSize))
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
if maxSize <= 0 {
|
||||
log.Fatal("--disk-cache size must be more than 0")
|
||||
}
|
||||
return int(maxSize), path
|
||||
}
|
||||
|
|
2
go.mod
2
go.mod
|
@ -8,12 +8,12 @@ require (
|
|||
github.com/bparli/lfuda-go v0.3.0
|
||||
github.com/btcsuite/btcd v0.0.0-20190824003749-130ea5bddde3
|
||||
github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d
|
||||
github.com/c2h5oh/datasize v0.0.0-20200825124411-48ed595a09d2
|
||||
github.com/davecgh/go-spew v1.1.1
|
||||
github.com/go-sql-driver/mysql v1.4.1
|
||||
github.com/golang/protobuf v1.4.2
|
||||
github.com/google/btree v1.0.0 // indirect
|
||||
github.com/google/gops v0.3.7
|
||||
github.com/google/martian v2.1.0+incompatible
|
||||
github.com/gorilla/mux v1.7.4
|
||||
github.com/hashicorp/go-msgpack v0.5.5 // indirect
|
||||
github.com/hashicorp/golang-lru v0.5.4
|
||||
|
|
4
go.sum
4
go.sum
|
@ -41,6 +41,8 @@ github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792 h1:R8vQdOQdZ9Y3
|
|||
github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792/go.mod h1:ghJtEyQwv5/p4Mg4C0fgbePVuGr935/5ddU9Z3TmDRY=
|
||||
github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46fmI40EZs=
|
||||
github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23/go.mod h1:bbYlZJ7hK1yFx9hf58LP0zeX7UjIGs20ufpu3evjr+s=
|
||||
github.com/c2h5oh/datasize v0.0.0-20200825124411-48ed595a09d2 h1:t8KYCwSKsOEZBFELI4Pn/phbp38iJ1RRAkDFNin1aak=
|
||||
github.com/c2h5oh/datasize v0.0.0-20200825124411-48ed595a09d2/go.mod h1:S/7n9copUssQ56c7aAgHqftWO4LTf4xY6CGWt8Bc+3M=
|
||||
github.com/cheekybits/genny v1.0.0 h1:uGGa4nei+j20rOSeDeP5Of12XVm7TGUd4dJA9RDitfE=
|
||||
github.com/cheekybits/genny v1.0.0/go.mod h1:+tQajlRqAUrPI7DOSpB0XAqZYtQakVtB7wXkRAgjxjQ=
|
||||
github.com/circonus-labs/circonus-gometrics v2.3.1+incompatible/go.mod h1:nmEj6Dob7S7YxXgwXpfOuvO54S+tGdZdw9fuRZt25Ag=
|
||||
|
@ -223,8 +225,6 @@ github.com/lbryio/types v0.0.0-20191009145016-1bb8107e04f8/go.mod h1:CG3wsDv5BiV
|
|||
github.com/lbryio/types v0.0.0-20191228214437-05a22073b4ec h1:2xk/qg4VTOCJ8RzV/ED5AKqDcJ00zVb08ltf9V+sr3c=
|
||||
github.com/lbryio/types v0.0.0-20191228214437-05a22073b4ec/go.mod h1:CG3wsDv5BiVYQd5i1Jp7wGsaVyjZTJshqXeWMVKsISE=
|
||||
github.com/lib/pq v1.1.1/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
|
||||
github.com/lucas-clemente/quic-go v0.19.1 h1:J9TkQJGJVOR3UmGhd4zdVYwKSA0EoXbLRf15uQJ6gT4=
|
||||
github.com/lucas-clemente/quic-go v0.19.1/go.mod h1:ZUygOqIoai0ASXXLJ92LTnKdbqh9MHCLTX6Nr1jUrK0=
|
||||
github.com/lucas-clemente/quic-go v0.19.2 h1:w8BBYUx5Z+kNpeaOeQW/KzcNsKWhh4O6PeQhb0nURPg=
|
||||
github.com/lucas-clemente/quic-go v0.19.2/go.mod h1:ZUygOqIoai0ASXXLJ92LTnKdbqh9MHCLTX6Nr1jUrK0=
|
||||
github.com/lunixbochs/vtclean v1.0.0/go.mod h1:pHhQNgMf3btfWnGBVipUOjRYhoOsdGqdm/+2c2E2WMI=
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"github.com/lbryio/lbry.go/v2/extras/errors"
|
||||
"github.com/lbryio/lbry.go/v2/stream"
|
||||
"github.com/lbryio/reflector.go/internal/metrics"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// LRUStore adds a max cache size and LRU eviction to a BlobStore
|
||||
|
@ -21,14 +22,13 @@ func NewLFUDAStore(component string, store BlobStore, maxSize float64) *LFUDASto
|
|||
metrics.CacheLRUEvictCount.With(metrics.CacheLabels(store.Name(), component)).Inc()
|
||||
_ = store.Delete(key.(string)) // TODO: log this error. may happen if underlying entry is gone but cache entry still there
|
||||
})
|
||||
lfuda.Age()
|
||||
l := &LFUDAStore{
|
||||
store: store,
|
||||
lfuda: lfuda,
|
||||
}
|
||||
go func() {
|
||||
if lstr, ok := store.(lister); ok {
|
||||
err := l.loadExisting(lstr, int(maxSize/2000000.0))
|
||||
err := l.loadExisting(lstr, int(maxSize))
|
||||
if err != nil {
|
||||
panic(err) // TODO: what should happen here? panic? return nil? just keep going?
|
||||
}
|
||||
|
@ -101,6 +101,7 @@ func (l *LFUDAStore) Delete(hash string) error {
|
|||
|
||||
// loadExisting imports existing blobs from the underlying store into the LRU cache
|
||||
func (l *LFUDAStore) loadExisting(store lister, maxItems int) error {
|
||||
logrus.Infof("loading at most %d items", maxItems)
|
||||
existing, err := store.list()
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
Loading…
Reference in a new issue