From febfc51cb0929effd49eb7986bc7df6b2ba80079 Mon Sep 17 00:00:00 2001 From: Niko Storni Date: Tue, 20 Jul 2021 02:09:14 +0200 Subject: [PATCH] refactor refactor refactor --- cmd/cluster.go | 1 + cmd/integrity.go | 13 +- cmd/populatedb.go | 3 +- cmd/reflector.go | 293 +++++++++++++++++++++--------------- cmd/root.go | 3 +- cmd/start.go | 5 +- cmd/upload.go | 2 +- cmd/version.go | 1 + db/db.go | 88 +---------- internal/metrics/metrics.go | 2 +- peer/client.go | 3 - peer/http3/client.go | 1 + peer/http3/store.go | 6 +- peer/store.go | 5 +- prism/prism.go | 4 +- prism/prism_test.go | 3 +- publish/publish.go | 33 ++-- reflector/blocklist.go | 1 - reflector/server_test.go | 3 +- reflector/uploader.go | 3 +- server/http/routes.go | 9 +- server/http/server.go | 6 +- shared/shared_test.go | 1 + store/caching.go | 7 +- store/caching_test.go | 3 +- store/cloudfront_ro.go | 5 +- store/cloudfront_rw.go | 3 +- store/dbbacked.go | 5 +- store/http.go | 2 +- store/ittt.go | 12 +- store/lfuda.go | 14 +- store/memory.go | 3 +- store/noop.go | 3 +- store/s3.go | 5 +- store/singleflight.go | 2 +- store/store.go | 7 +- 36 files changed, 274 insertions(+), 286 deletions(-) diff --git a/cmd/cluster.go b/cmd/cluster.go index 2c9dcff..12581d3 100644 --- a/cmd/cluster.go +++ b/cmd/cluster.go @@ -7,6 +7,7 @@ import ( "syscall" "github.com/lbryio/lbry.go/v2/extras/crypto" + "github.com/lbryio/reflector.go/cluster" log "github.com/sirupsen/logrus" diff --git a/cmd/integrity.go b/cmd/integrity.go index 8be9e2b..e95338d 100644 --- a/cmd/integrity.go +++ b/cmd/integrity.go @@ -10,10 +10,11 @@ import ( "sync/atomic" "time" - "github.com/lbryio/lbry.go/v2/extras/errors" "github.com/lbryio/reflector.go/meta" "github.com/lbryio/reflector.go/store/speedwalk" + "github.com/lbryio/lbry.go/v2/extras/errors" + log "github.com/sirupsen/logrus" "github.com/spf13/cobra" ) @@ -39,7 +40,7 @@ func integrityCheckCmd(cmd *cobra.Command, args []string) { blobs, err := speedwalk.AllFiles(diskStorePath, true) if err != nil { - log.Errorf("error while reading blobs from disk %s", errors.FullTrace(err)) + log.Fatalf("error while reading blobs from disk %s", errors.FullTrace(err)) } tasks := make(chan string, len(blobs)) done := make(chan bool) @@ -63,12 +64,12 @@ func consume(worker int, tasks <-chan string, done chan<- bool, totalTasks int, start := time.Now() for b := range tasks { - checked := atomic.AddInt32(processed, 1) + processedSoFar := atomic.AddInt32(processed, 1) if worker == 0 { - remaining := int32(totalTasks) - checked - timePerBlob := time.Since(start).Microseconds() / int64(checked) + remaining := int32(totalTasks) - processedSoFar + timePerBlob := time.Since(start).Microseconds() / int64(processedSoFar) remainingTime := time.Duration(int64(remaining)*timePerBlob) * time.Microsecond - log.Infof("[T%d] %d/%d blobs checked. ETA: %s", worker, checked, totalTasks, remainingTime.String()) + log.Infof("[T%d] %d/%d blobs processed so far. ETA: %s", worker, processedSoFar, totalTasks, remainingTime.String()) } blobPath := path.Join(diskStorePath, b[:2], b) blob, err := ioutil.ReadFile(blobPath) diff --git a/cmd/populatedb.go b/cmd/populatedb.go index 7199e42..4dad52a 100644 --- a/cmd/populatedb.go +++ b/cmd/populatedb.go @@ -1,11 +1,12 @@ package cmd import ( - "github.com/lbryio/lbry.go/v2/extras/errors" "github.com/lbryio/reflector.go/db" "github.com/lbryio/reflector.go/meta" "github.com/lbryio/reflector.go/store/speedwalk" + "github.com/lbryio/lbry.go/v2/extras/errors" + log "github.com/sirupsen/logrus" "github.com/spf13/cobra" ) diff --git a/cmd/reflector.go b/cmd/reflector.go index 9cae1f0..3ee853d 100644 --- a/cmd/reflector.go +++ b/cmd/reflector.go @@ -8,8 +8,7 @@ import ( "syscall" "time" - "github.com/lbryio/lbry.go/v2/extras/errors" - "github.com/lbryio/lbry.go/v2/extras/stop" + "github.com/lbryio/lbry.go/v2/extras/util" "github.com/lbryio/reflector.go/db" "github.com/lbryio/reflector.go/internal/metrics" "github.com/lbryio/reflector.go/meta" @@ -19,6 +18,8 @@ import ( "github.com/lbryio/reflector.go/server/http" "github.com/lbryio/reflector.go/store" + "github.com/lbryio/lbry.go/v2/extras/errors" + "github.com/lbryio/lbry.go/v2/extras/stop" "github.com/lbryio/lbry.go/v2/stream" "github.com/c2h5oh/datasize" @@ -27,23 +28,40 @@ import ( ) var ( - tcpPeerPort int - http3PeerPort int - httpPort int - receiverPort int - metricsPort int - disableUploads bool - disableBlocklist bool - proxyAddress string - proxyPort string - proxyProtocol string - useDB bool - cloudFrontEndpoint string - WasabiEndpoint string - reflectorCmdDiskCache string - bufferReflectorCmdDiskCache string - reflectorCmdMemCache int - requestQueueSize int + //port configuration + tcpPeerPort int + http3PeerPort int + httpPort int + receiverPort int + metricsPort int + + //flags configuration + disableUploads bool + disableBlocklist bool + useDB bool + + //upstream configuration + upstreamReflector string + upstreamProtocol string + + //downstream configuration + requestQueueSize int + + //upstream edge configuration (to "cold" storage) + originEndpoint string + originEndpointFallback string + + //cache configuration + diskCache string + secondaryDiskCache string + memCache int +) +var cacheManagers = []string{"localdb", "lfuda", "lru"} + +const ( + LOCALDB int = iota + LFUDA + LRU ) func init() { @@ -52,38 +70,41 @@ func init() { Short: "Run reflector server", Run: reflectorCmd, } - cmd.Flags().StringVar(&proxyAddress, "proxy-address", "", "address of another reflector server where blobs are fetched from") - cmd.Flags().StringVar(&proxyPort, "proxy-port", "5567", "port of another reflector server where blobs are fetched from") - cmd.Flags().StringVar(&proxyProtocol, "proxy-protocol", "http3", "protocol used to fetch blobs from another reflector server (tcp/http3)") - cmd.Flags().StringVar(&cloudFrontEndpoint, "cloudfront-endpoint", "", "CloudFront edge endpoint for standard HTTP retrieval") - cmd.Flags().StringVar(&WasabiEndpoint, "wasabi-endpoint", "", "Wasabi edge endpoint for standard HTTP retrieval") - cmd.Flags().IntVar(&tcpPeerPort, "tcp-peer-port", 5567, "The port reflector will distribute content from") + + cmd.Flags().IntVar(&tcpPeerPort, "tcp-peer-port", 5567, "The port reflector will distribute content from for the TCP (LBRY) protocol") cmd.Flags().IntVar(&http3PeerPort, "http3-peer-port", 5568, "The port reflector will distribute content from over HTTP3 protocol") cmd.Flags().IntVar(&httpPort, "http-port", 5569, "The port reflector will distribute content from over HTTP protocol") cmd.Flags().IntVar(&receiverPort, "receiver-port", 5566, "The port reflector will receive content from") - cmd.Flags().IntVar(&metricsPort, "metrics-port", 2112, "The port reflector will use for metrics") - cmd.Flags().IntVar(&requestQueueSize, "request-queue-size", 200, "How many concurrent requests should be submitted to upstream") + cmd.Flags().IntVar(&metricsPort, "metrics-port", 2112, "The port reflector will use for prometheus metrics") + cmd.Flags().BoolVar(&disableUploads, "disable-uploads", false, "Disable uploads to this reflector server") cmd.Flags().BoolVar(&disableBlocklist, "disable-blocklist", false, "Disable blocklist watching/updating") - cmd.Flags().BoolVar(&useDB, "use-db", true, "whether to connect to the reflector db or not") - cmd.Flags().StringVar(&reflectorCmdDiskCache, "disk-cache", "", - "enable disk cache, setting max size and path where to store blobs. format is 'sizeGB:CACHE_PATH'") - cmd.Flags().StringVar(&bufferReflectorCmdDiskCache, "buffer-disk-cache", "", - "enable buffer disk cache, setting max size and path where to store blobs. format is 'sizeGB:CACHE_PATH'") - cmd.Flags().IntVar(&reflectorCmdMemCache, "mem-cache", 0, "enable in-memory cache with a max size of this many blobs") + cmd.Flags().BoolVar(&useDB, "use-db", true, "Whether to connect to the reflector db or not") + + cmd.Flags().StringVar(&upstreamReflector, "upstream-reflector", "", "host:port of a reflector server where blobs are fetched from") + cmd.Flags().StringVar(&upstreamProtocol, "proxy-protocol", "http", "protocol used to fetch blobs from another reflector server (tcp/http3/http)") + + cmd.Flags().IntVar(&requestQueueSize, "request-queue-size", 200, "How many concurrent requests from downstream should be handled at once (the rest will wait)") + + cmd.Flags().StringVar(&originEndpoint, "origin-endpoint", "", "HTTP edge endpoint for standard HTTP retrieval") + cmd.Flags().StringVar(&originEndpointFallback, "origin-endpoint-fallback", "", "HTTP edge endpoint for standard HTTP retrieval if first origin fails") + + cmd.Flags().StringVar(&diskCache, "disk-cache", "100GB:/tmp/downloaded_blobs:localdb", "Where to cache blobs on the file system. format is 'sizeGB:CACHE_PATH:cachemanager' (cachemanagers: localdb/lfuda/lru)") + cmd.Flags().StringVar(&secondaryDiskCache, "optional-disk-cache", "", "Optional secondary file system cache for blobs. format is 'sizeGB:CACHE_PATH:cachemanager' (cachemanagers: localdb/lfuda/lru) (this would get hit before the one specified in disk-cache)") + cmd.Flags().IntVar(&memCache, "mem-cache", 0, "enable in-memory cache with a max size of this many blobs") + rootCmd.AddCommand(cmd) } func reflectorCmd(cmd *cobra.Command, args []string) { log.Printf("reflector %s", meta.VersionString()) - cleanerStopper := stop.New() // the blocklist logic requires the db backed store to be the outer-most store - underlyingStore := setupStore() - outerStore := wrapWithCache(underlyingStore, cleanerStopper) + underlyingStore := initStores() + underlyingStoreWithCaches, cleanerStopper := initCaches(underlyingStore) if !disableUploads { - reflectorServer := reflector.NewServer(underlyingStore, outerStore) + reflectorServer := reflector.NewServer(underlyingStore, underlyingStoreWithCaches) reflectorServer.Timeout = 3 * time.Minute reflectorServer.EnableBlocklist = !disableBlocklist @@ -94,21 +115,21 @@ func reflectorCmd(cmd *cobra.Command, args []string) { defer reflectorServer.Shutdown() } - peerServer := peer.NewServer(outerStore) + peerServer := peer.NewServer(underlyingStoreWithCaches) err := peerServer.Start(":" + strconv.Itoa(tcpPeerPort)) if err != nil { log.Fatal(err) } defer peerServer.Shutdown() - http3PeerServer := http3.NewServer(outerStore, requestQueueSize) + http3PeerServer := http3.NewServer(underlyingStoreWithCaches, requestQueueSize) err = http3PeerServer.Start(":" + strconv.Itoa(http3PeerPort)) if err != nil { log.Fatal(err) } defer http3PeerServer.Shutdown() - httpServer := http.NewServer(outerStore, requestQueueSize) + httpServer := http.NewServer(underlyingStoreWithCaches, requestQueueSize) err = httpServer.Start(":" + strconv.Itoa(httpPort)) if err != nil { log.Fatal(err) @@ -118,8 +139,8 @@ func reflectorCmd(cmd *cobra.Command, args []string) { metricsServer := metrics.NewServer(":"+strconv.Itoa(metricsPort), "/metrics") metricsServer.Start() defer metricsServer.Shutdown() - defer outerStore.Shutdown() - defer underlyingStore.Shutdown() + defer underlyingStoreWithCaches.Shutdown() + defer underlyingStore.Shutdown() //do we actually need this? Oo interruptChan := make(chan os.Signal, 1) signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM) @@ -128,45 +149,52 @@ func reflectorCmd(cmd *cobra.Command, args []string) { cleanerStopper.StopAndWait() } -func setupStore() store.BlobStore { +func initUpstreamStore() store.BlobStore { + var s store.BlobStore + if upstreamReflector == "" { + return nil + } + switch upstreamProtocol { + case "tcp": + s = peer.NewStore(peer.StoreOpts{ + Address: upstreamReflector, + Timeout: 30 * time.Second, + }) + case "http3": + s = http3.NewStore(http3.StoreOpts{ + Address: upstreamReflector, + Timeout: 30 * time.Second, + }) + case "http": + s = store.NewHttpStore(upstreamReflector) + default: + log.Fatalf("protocol is not recognized: %s", upstreamProtocol) + } + return s +} +func initEdgeStore() store.BlobStore { + var s3Store *store.S3Store var s store.BlobStore - if proxyAddress != "" { - switch proxyProtocol { - case "tcp": - s = peer.NewStore(peer.StoreOpts{ - Address: proxyAddress + ":" + proxyPort, - Timeout: 30 * time.Second, - }) - case "http3": - s = http3.NewStore(http3.StoreOpts{ - Address: proxyAddress + ":" + proxyPort, - Timeout: 30 * time.Second, - }) - case "http": - s = store.NewHttpStore(proxyAddress + ":" + proxyPort) - default: - log.Fatalf("protocol is not recognized: %s", proxyProtocol) - } - } else { - var s3Store *store.S3Store - if conf != "none" { - s3Store = store.NewS3Store(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName) - } - if cloudFrontEndpoint != "" && WasabiEndpoint != "" { - ittt := store.NewITTTStore(store.NewCloudFrontROStore(WasabiEndpoint), store.NewCloudFrontROStore(cloudFrontEndpoint)) - if s3Store != nil { - s = store.NewCloudFrontRWStore(ittt, s3Store) - } else { - s = ittt - } - } else if s3Store != nil { - s = s3Store - } else { - log.Fatalf("this configuration does not include a valid upstream source") - } + if conf != "none" { + s3Store = store.NewS3Store(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName) } + if originEndpointFallback != "" && originEndpoint != "" { + ittt := store.NewITTTStore(store.NewCloudFrontROStore(originEndpoint), store.NewCloudFrontROStore(originEndpointFallback)) + if s3Store != nil { + s = store.NewCloudFrontRWStore(ittt, s3Store) + } else { + s = ittt + } + } else if s3Store != nil { + s = s3Store + } else { + log.Fatalf("this configuration does not include a valid upstream source") + } + return s +} +func initDBStore(s store.BlobStore) store.BlobStore { if useDB { dbInst := &db.SQL{ TrackAccess: db.TrackAccessStreams, @@ -176,26 +204,55 @@ func setupStore() store.BlobStore { if err != nil { log.Fatal(err) } - s = store.NewDBBackedStore(s, dbInst, false) } - return s } -func wrapWithCache(s store.BlobStore, cleanerStopper *stop.Group) store.BlobStore { - wrapped := s +func initStores() store.BlobStore { + s := initUpstreamStore() + if s == nil { + s = initEdgeStore() + } + s = initDBStore(s) + return s +} - diskCacheMaxSize, diskCachePath := diskCacheParams(reflectorCmdDiskCache) +// initCaches returns a store wrapped with caches and a stop group to execute a clean shutdown +func initCaches(s store.BlobStore) (store.BlobStore, *stop.Group) { + stopper := stop.New() + diskStore := initDiskStore(s, diskCache, stopper) + finalStore := initDiskStore(diskStore, secondaryDiskCache, stopper) + stop.New() + if memCache > 0 { + finalStore = store.NewCachingStore( + "reflector", + finalStore, + store.NewLRUStore("mem", store.NewMemStore(), memCache), + ) + } + return finalStore, stopper +} + +func initDiskStore(upstreamStore store.BlobStore, diskParams string, stopper *stop.Group) store.BlobStore { + diskCacheMaxSize, diskCachePath, cacheManager := diskCacheParams(diskParams) //we are tracking blobs in memory with a 1 byte long boolean, which means that for each 2MB (a blob) we need 1Byte // so if the underlying cache holds 10MB, 10MB/2MB=5Bytes which is also the exact count of objects to restore on startup realCacheSize := float64(diskCacheMaxSize) / float64(stream.MaxBlobSize) - if diskCacheMaxSize > 0 { - err := os.MkdirAll(diskCachePath, os.ModePerm) - if err != nil { - log.Fatal(err) - } + if diskCacheMaxSize == 0 { + return upstreamStore + } + err := os.MkdirAll(diskCachePath, os.ModePerm) + if err != nil { + log.Fatal(err) + } + diskStore := store.NewDiskStore(diskCachePath, 2) + var unwrappedStore store.BlobStore + cleanerStopper := stop.New(stopper) + + switch cacheManager { + case cacheManagers[LOCALDB]: localDb := &db.SQL{ SoftDelete: true, TrackAccess: db.TrackAccessBlobs, @@ -205,55 +262,41 @@ func wrapWithCache(s store.BlobStore, cleanerStopper *stop.Group) store.BlobStor if err != nil { log.Fatal(err) } - dbBackedDiskStore := store.NewDBBackedStore(store.NewDiskStore(diskCachePath, 2), localDb, true) - wrapped = store.NewCachingStore( - "reflector", - wrapped, - dbBackedDiskStore, - ) - - go cleanOldestBlobs(int(realCacheSize), localDb, dbBackedDiskStore, cleanerStopper) + unwrappedStore = store.NewDBBackedStore(diskStore, localDb, true) + go cleanOldestBlobs(int(realCacheSize), localDb, unwrappedStore, cleanerStopper) + case cacheManagers[LFUDA]: + unwrappedStore = store.NewLFUDAStore("nvme", store.NewDiskStore(diskCachePath, 2), realCacheSize) + case cacheManagers[LRU]: + unwrappedStore = store.NewLRUStore("nvme", store.NewDiskStore(diskCachePath, 2), int(realCacheSize)) } - - diskCacheMaxSize, diskCachePath = diskCacheParams(bufferReflectorCmdDiskCache) - realCacheSize = float64(diskCacheMaxSize) / float64(stream.MaxBlobSize) - if diskCacheMaxSize > 0 { - err := os.MkdirAll(diskCachePath, os.ModePerm) - if err != nil { - log.Fatal(err) - } - wrapped = store.NewCachingStore( - "reflector", - wrapped, - store.NewLFUDAStore("nvme", store.NewDiskStore(diskCachePath, 2), realCacheSize), - ) - } - - if reflectorCmdMemCache > 0 { - wrapped = store.NewCachingStore( - "reflector", - wrapped, - store.NewLRUStore("mem", store.NewMemStore(), reflectorCmdMemCache), - ) - } - + wrapped := store.NewCachingStore( + "reflector", + upstreamStore, + unwrappedStore, + ) return wrapped } -func diskCacheParams(diskParams string) (int, string) { +func diskCacheParams(diskParams string) (int, string, string) { if diskParams == "" { - return 0, "" + return 0, "", "" } parts := strings.Split(diskParams, ":") - if len(parts) != 2 { - log.Fatalf("--disk-cache must be a number, followed by ':', followed by a string") + if len(parts) != 3 { + log.Fatalf("%s does is formatted incorrectly. Expected format: 'sizeGB:CACHE_PATH:cachemanager' for example: '100GB:/tmp/downloaded_blobs:localdb'", diskParams) } diskCacheSize := parts[0] path := parts[1] + cacheManager := parts[2] + if len(path) == 0 || path[0] != '/' { - log.Fatalf("--disk-cache path must start with '/'") + log.Fatalf("disk cache paths must start with '/'") + } + + if !util.InSlice(cacheManager, cacheManagers) { + log.Fatalf("specified cache manager '%s' is not supported. Use one of the following: %v", cacheManager, cacheManagers) } var maxSize datasize.ByteSize @@ -262,9 +305,9 @@ func diskCacheParams(diskParams string) (int, string) { log.Fatal(err) } if maxSize <= 0 { - log.Fatal("--disk-cache size must be more than 0") + log.Fatal("disk cache size must be more than 0") } - return int(maxSize), path + return int(maxSize), path, cacheManager } func cleanOldestBlobs(maxItems int, db *db.SQL, store store.BlobStore, stopper *stop.Group) { diff --git a/cmd/root.go b/cmd/root.go index 1e422a7..d5e3bd7 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -6,10 +6,11 @@ import ( "os" "strings" + "github.com/lbryio/reflector.go/updater" + "github.com/lbryio/lbry.go/v2/dht" "github.com/lbryio/lbry.go/v2/extras/errors" "github.com/lbryio/lbry.go/v2/extras/util" - "github.com/lbryio/reflector.go/updater" "github.com/johntdyer/slackrus" "github.com/sirupsen/logrus" diff --git a/cmd/start.go b/cmd/start.go index 7b5facd..f7a91de 100644 --- a/cmd/start.go +++ b/cmd/start.go @@ -7,8 +7,6 @@ import ( "strings" "syscall" - "github.com/lbryio/lbry.go/v2/dht" - "github.com/lbryio/lbry.go/v2/dht/bits" "github.com/lbryio/reflector.go/cluster" "github.com/lbryio/reflector.go/db" "github.com/lbryio/reflector.go/peer" @@ -16,6 +14,9 @@ import ( "github.com/lbryio/reflector.go/reflector" "github.com/lbryio/reflector.go/store" + "github.com/lbryio/lbry.go/v2/dht" + "github.com/lbryio/lbry.go/v2/dht/bits" + log "github.com/sirupsen/logrus" "github.com/spf13/cobra" ) diff --git a/cmd/upload.go b/cmd/upload.go index e2a49cd..afd9063 100644 --- a/cmd/upload.go +++ b/cmd/upload.go @@ -8,8 +8,8 @@ import ( "github.com/lbryio/reflector.go/db" "github.com/lbryio/reflector.go/reflector" "github.com/lbryio/reflector.go/store" - log "github.com/sirupsen/logrus" + log "github.com/sirupsen/logrus" "github.com/spf13/cobra" ) diff --git a/cmd/version.go b/cmd/version.go index b3ad97c..4ddee5b 100644 --- a/cmd/version.go +++ b/cmd/version.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/lbryio/reflector.go/meta" + "github.com/spf13/cobra" ) diff --git a/db/db.go b/db/db.go index babe641..eca573c 100644 --- a/db/db.go +++ b/db/db.go @@ -39,11 +39,11 @@ type SdBlob struct { type trackAccess int const ( - // Don't track accesses + //TrackAccessNone Don't track accesses TrackAccessNone trackAccess = iota - // Track accesses at the stream level + //TrackAccessStreams Track accesses at the stream level TrackAccessStreams - // Track accesses at the blob level + //TrackAccessBlobs Track accesses at the blob level TrackAccessBlobs ) @@ -101,7 +101,7 @@ func (s *SQL) AddBlob(hash string, length int, isStored bool) error { return err } -// AddBlob adds a blob to the database. +//AddBlobs adds blobs to the database. func (s *SQL) AddBlobs(hash []string) error { if s.conn == nil { return errors.Err("not connected") @@ -419,7 +419,7 @@ func (s *SQL) Delete(hash string) error { return errors.Err(err) } -// GetHashRange gets the smallest and biggest hashes in the db +//LeastRecentlyAccessedHashes gets the least recently accessed blobs func (s *SQL) LeastRecentlyAccessedHashes(maxBlobs int) ([]string, error) { if s.conn == nil { return nil, errors.Err("not connected") @@ -451,40 +451,6 @@ func (s *SQL) LeastRecentlyAccessedHashes(maxBlobs int) ([]string, error) { return blobs, nil } -// AllHashes writes all hashes from the db into the channel. -// It does not close the channel when it finishes. -//func (s *SQL) AllHashes(ch chan<- string) error { -// if s.conn == nil { -// return errors.Err("not connected") -// } -// -// query := "SELECT hash from blob_" -// if s.SoftDelete { -// query += " where is_stored = 1" -// } -// s.logQuery(query) -// -// rows, err := s.conn.Query(query) -// if err != nil { -// return errors.Err(err) -// } -// defer closeRows(rows) -// -// for rows.Next() { -// var hash string -// err := rows.Scan(&hash) -// if err != nil { -// return errors.Err(err) -// } -// ch <- hash -// // TODO: this needs testing -// // TODO: need a way to cancel this early (e.g. in case of shutdown) -// } -// -// close(ch) -// return nil -//} - func (s *SQL) Count() (int, error) { if s.conn == nil { return 0, errors.Err("not connected") @@ -813,47 +779,3 @@ CREATE TABLE blocked ( ); */ - -//func (d *LiteDBBackedStore) selfClean() { -// d.stopper.Add(1) -// defer d.stopper.Done() -// lastCleanup := time.Now() -// const cleanupInterval = 10 * time.Second -// for { -// select { -// case <-d.stopper.Ch(): -// log.Infoln("stopping self cleanup") -// return -// default: -// time.Sleep(1 * time.Second) -// } -// if time.Since(lastCleanup) < cleanupInterval { -// continue -// -// blobsCount, err := d.db.BlobsCount() -// if err != nil { -// log.Errorf(errors.FullTrace(err)) -// } -// if blobsCount >= d.maxItems { -// itemsToDelete := blobsCount / 100 * 10 -// blobs, err := d.db.GetLRUBlobs(itemsToDelete) -// if err != nil { -// log.Errorf(errors.FullTrace(err)) -// } -// for _, hash := range blobs { -// select { -// case <-d.stopper.Ch(): -// return -// default: -// -// } -// err = d.Delete(hash) -// if err != nil { -// log.Errorf(errors.FullTrace(err)) -// } -// metrics.CacheLRUEvictCount.With(metrics.CacheLabels(d.Name(), d.component)).Inc() -// } -// } -// lastCleanup = time.Now() -// } -//} diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index 24bb7b4..d6a93fd 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -154,7 +154,7 @@ var ( Name: "origin_requests_total", Help: "How many Get requests are in flight from the cache to the origin", }, []string{LabelCacheType, LabelComponent}) - // during thundering-herd situations, the metric below should be a lot smaller than the metric above + //during thundering-herd situations, the metric below should be a lot smaller than the metric above CacheWaitingRequestsCount = promauto.NewGaugeVec(prometheus.GaugeOpts{ Namespace: ns, Subsystem: subsystemCache, diff --git a/peer/client.go b/peer/client.go index 9eae088..fc4192a 100644 --- a/peer/client.go +++ b/peer/client.go @@ -18,9 +18,6 @@ import ( log "github.com/sirupsen/logrus" ) -// ErrBlobExists is a default error for when a blob already exists on the reflector server. -var ErrBlobExists = errors.Base("blob exists on server") - // Client is an instance of a client connected to a server. type Client struct { Timeout time.Duration diff --git a/peer/http3/client.go b/peer/http3/client.go index d9b1495..a92d53d 100644 --- a/peer/http3/client.go +++ b/peer/http3/client.go @@ -15,6 +15,7 @@ import ( "github.com/lbryio/lbry.go/v2/extras/errors" "github.com/lbryio/lbry.go/v2/stream" + "github.com/lucas-clemente/quic-go/http3" log "github.com/sirupsen/logrus" ) diff --git a/peer/http3/store.go b/peer/http3/store.go index 7d8fc34..e25c6f1 100644 --- a/peer/http3/store.go +++ b/peer/http3/store.go @@ -8,10 +8,12 @@ import ( "sync" "time" - "github.com/lbryio/lbry.go/v2/extras/errors" - "github.com/lbryio/lbry.go/v2/stream" "github.com/lbryio/reflector.go/shared" "github.com/lbryio/reflector.go/store" + + "github.com/lbryio/lbry.go/v2/extras/errors" + "github.com/lbryio/lbry.go/v2/stream" + "github.com/lucas-clemente/quic-go" "github.com/lucas-clemente/quic-go/http3" ) diff --git a/peer/store.go b/peer/store.go index 689d1c0..c1682e8 100644 --- a/peer/store.go +++ b/peer/store.go @@ -4,10 +4,11 @@ import ( "strings" "time" - "github.com/lbryio/lbry.go/v2/extras/errors" - "github.com/lbryio/lbry.go/v2/stream" "github.com/lbryio/reflector.go/shared" "github.com/lbryio/reflector.go/store" + + "github.com/lbryio/lbry.go/v2/extras/errors" + "github.com/lbryio/lbry.go/v2/stream" ) // Store is a blob store that gets blobs from a peer. diff --git a/prism/prism.go b/prism/prism.go index ee18ba1..3484775 100644 --- a/prism/prism.go +++ b/prism/prism.go @@ -5,14 +5,14 @@ import ( "strconv" "sync" - "github.com/lbryio/lbry.go/v2/dht" - "github.com/lbryio/lbry.go/v2/dht/bits" "github.com/lbryio/reflector.go/cluster" "github.com/lbryio/reflector.go/db" "github.com/lbryio/reflector.go/peer" "github.com/lbryio/reflector.go/reflector" "github.com/lbryio/reflector.go/store" + "github.com/lbryio/lbry.go/v2/dht" + "github.com/lbryio/lbry.go/v2/dht/bits" "github.com/lbryio/lbry.go/v2/extras/errors" "github.com/lbryio/lbry.go/v2/extras/stop" diff --git a/prism/prism_test.go b/prism/prism_test.go index 1297f7a..c71df4f 100644 --- a/prism/prism_test.go +++ b/prism/prism_test.go @@ -4,8 +4,9 @@ import ( "math/big" "testing" - "github.com/davecgh/go-spew/spew" "github.com/lbryio/lbry.go/v2/dht/bits" + + "github.com/davecgh/go-spew/spew" ) func TestAnnounceRange(t *testing.T) { diff --git a/publish/publish.go b/publish/publish.go index 9e585e8..efb6478 100644 --- a/publish/publish.go +++ b/publish/publish.go @@ -22,24 +22,23 @@ import ( "github.com/golang/protobuf/proto" ) -var TODO = ` - import cert from wallet - get all utxos from chainquery - create transaction - sign it with the channel - track state of utxos across publishes from this channel so that we can just do one query to get utxos - prioritize only confirmed utxos +/* TODO: +import cert from wallet +get all utxos from chainquery +create transaction +sign it with the channel +track state of utxos across publishes from this channel so that we can just do one query to get utxos +prioritize only confirmed utxos - Handling all the issues we handle currently with lbrynet: - "Couldn't find private key for id", - "You already have a stream claim published under the name", - "Cannot publish using channel", - "txn-mempool-conflict", - "too-long-mempool-chain", - "Missing inputs", - "Not enough funds to cover this transaction", -} -` +Handling all the issues we handle currently with lbrynet: + "Couldn't find private key for id", + "You already have a stream claim published under the name", + "Cannot publish using channel", + "txn-mempool-conflict", + "too-long-mempool-chain", + "Missing inputs", + "Not enough funds to cover this transaction", +*/ type Details struct { Title string diff --git a/reflector/blocklist.go b/reflector/blocklist.go index 26edff6..262671f 100644 --- a/reflector/blocklist.go +++ b/reflector/blocklist.go @@ -9,7 +9,6 @@ import ( "time" "github.com/lbryio/reflector.go/internal/metrics" - "github.com/lbryio/reflector.go/store" "github.com/lbryio/reflector.go/wallet" diff --git a/reflector/server_test.go b/reflector/server_test.go index 9004d84..1630afd 100644 --- a/reflector/server_test.go +++ b/reflector/server_test.go @@ -9,9 +9,10 @@ import ( "testing" "time" - "github.com/lbryio/lbry.go/v2/dht/bits" "github.com/lbryio/reflector.go/store" + "github.com/lbryio/lbry.go/v2/dht/bits" + "github.com/davecgh/go-spew/spew" "github.com/phayes/freeport" ) diff --git a/reflector/uploader.go b/reflector/uploader.go index b421272..6804ae7 100644 --- a/reflector/uploader.go +++ b/reflector/uploader.go @@ -7,9 +7,8 @@ import ( "sync" "time" - "github.com/lbryio/reflector.go/internal/metrics" - "github.com/lbryio/reflector.go/db" + "github.com/lbryio/reflector.go/internal/metrics" "github.com/lbryio/reflector.go/store" "github.com/lbryio/lbry.go/v2/extras/errors" diff --git a/server/http/routes.go b/server/http/routes.go index 0984c47..cea3e0a 100644 --- a/server/http/routes.go +++ b/server/http/routes.go @@ -5,12 +5,13 @@ import ( "sync" "time" - "github.com/gin-gonic/gin" - "github.com/lbryio/lbry.go/v2/extras/errors" - "github.com/lbryio/reflector.go/shared" - "github.com/lbryio/reflector.go/internal/metrics" + "github.com/lbryio/reflector.go/shared" "github.com/lbryio/reflector.go/store" + + "github.com/lbryio/lbry.go/v2/extras/errors" + + "github.com/gin-gonic/gin" ) func (s *Server) getBlob(c *gin.Context) { diff --git a/server/http/server.go b/server/http/server.go index f0561cf..11c2c16 100644 --- a/server/http/server.go +++ b/server/http/server.go @@ -5,11 +5,13 @@ import ( "net/http" "time" + "github.com/lbryio/reflector.go/store" + + "github.com/lbryio/lbry.go/v2/extras/stop" + "github.com/bluele/gcache" nice "github.com/ekyoung/gin-nice-recovery" "github.com/gin-gonic/gin" - "github.com/lbryio/lbry.go/v2/extras/stop" - "github.com/lbryio/reflector.go/store" log "github.com/sirupsen/logrus" ) diff --git a/shared/shared_test.go b/shared/shared_test.go index a85ea98..3b67742 100644 --- a/shared/shared_test.go +++ b/shared/shared_test.go @@ -5,6 +5,7 @@ import ( "time" "github.com/lbryio/lbry.go/v2/extras/util" + "github.com/stretchr/testify/assert" ) diff --git a/store/caching.go b/store/caching.go index 2127119..f4f4d28 100644 --- a/store/caching.go +++ b/store/caching.go @@ -3,12 +3,13 @@ package store import ( "time" + "github.com/lbryio/reflector.go/internal/metrics" + "github.com/lbryio/reflector.go/shared" + "github.com/lbryio/lbry.go/v2/extras/errors" "github.com/lbryio/lbry.go/v2/stream" - "github.com/lbryio/reflector.go/shared" - log "github.com/sirupsen/logrus" - "github.com/lbryio/reflector.go/internal/metrics" + log "github.com/sirupsen/logrus" ) // CachingStore combines two stores, typically a local and a remote store, to improve performance. diff --git a/store/caching_test.go b/store/caching_test.go index 66a42fd..6fd5544 100644 --- a/store/caching_test.go +++ b/store/caching_test.go @@ -6,8 +6,9 @@ import ( "testing" "time" - "github.com/lbryio/lbry.go/v2/stream" "github.com/lbryio/reflector.go/shared" + + "github.com/lbryio/lbry.go/v2/stream" ) func TestCachingStore_Put(t *testing.T) { diff --git a/store/cloudfront_ro.go b/store/cloudfront_ro.go index 757174c..56b1cfb 100644 --- a/store/cloudfront_ro.go +++ b/store/cloudfront_ro.go @@ -6,12 +6,13 @@ import ( "net/http" "time" - "github.com/lbryio/lbry.go/v2/extras/errors" - "github.com/lbryio/lbry.go/v2/stream" "github.com/lbryio/reflector.go/internal/metrics" "github.com/lbryio/reflector.go/meta" "github.com/lbryio/reflector.go/shared" + "github.com/lbryio/lbry.go/v2/extras/errors" + "github.com/lbryio/lbry.go/v2/stream" + log "github.com/sirupsen/logrus" ) diff --git a/store/cloudfront_rw.go b/store/cloudfront_rw.go index 6b293a8..0947e32 100644 --- a/store/cloudfront_rw.go +++ b/store/cloudfront_rw.go @@ -3,8 +3,9 @@ package store import ( "time" - "github.com/lbryio/lbry.go/v2/stream" "github.com/lbryio/reflector.go/shared" + + "github.com/lbryio/lbry.go/v2/stream" ) // CloudFrontRWStore combines a Cloudfront and an S3 store. Reads go to Cloudfront/Wasabi, writes go to S3. diff --git a/store/dbbacked.go b/store/dbbacked.go index 90c9fed..970cd54 100644 --- a/store/dbbacked.go +++ b/store/dbbacked.go @@ -5,11 +5,12 @@ import ( "sync" "time" - "github.com/lbryio/lbry.go/v2/extras/errors" - "github.com/lbryio/lbry.go/v2/stream" "github.com/lbryio/reflector.go/db" "github.com/lbryio/reflector.go/shared" + "github.com/lbryio/lbry.go/v2/extras/errors" + "github.com/lbryio/lbry.go/v2/stream" + log "github.com/sirupsen/logrus" ) diff --git a/store/http.go b/store/http.go index c24169a..63dedce 100644 --- a/store/http.go +++ b/store/http.go @@ -16,7 +16,7 @@ import ( "github.com/lbryio/lbry.go/v2/stream" ) -// NoopStore is a store that does nothing +// HttpStore is a store that works on top of the HTTP protocol type HttpStore struct { upstream string httpClient *http.Client diff --git a/store/ittt.go b/store/ittt.go index 5edc67d..3abac7c 100644 --- a/store/ittt.go +++ b/store/ittt.go @@ -3,19 +3,19 @@ package store import ( "time" - "github.com/lbryio/lbry.go/v2/extras/errors" - "github.com/lbryio/lbry.go/v2/stream" - "github.com/lbryio/reflector.go/internal/metrics" "github.com/lbryio/reflector.go/shared" + + "github.com/lbryio/lbry.go/v2/extras/errors" + "github.com/lbryio/lbry.go/v2/stream" ) -// ITTT store performs an operation on this storage, if this fails, it attempts to run it on that +// ITTTStore performs an operation on this storage, if this fails, it attempts to run it on that type ITTTStore struct { this, that BlobStore } -// NewCachingStore makes a new caching disk store and returns a pointer to it. +// NewITTTStore returns a new instance of the IF THIS THAN THAT store func NewITTTStore(this, that BlobStore) *ITTTStore { return &ITTTStore{ this: this, @@ -28,7 +28,7 @@ const nameIttt = "ittt" // Name is the cache type name func (c *ITTTStore) Name() string { return nameIttt } -// Has checks the cache and then the origin for a hash. It returns true if either store has it. +// Has checks in this for a hash, if it fails it checks in that. It returns true if either store has it. func (c *ITTTStore) Has(hash string) (bool, error) { has, err := c.this.Has(hash) if err != nil || !has { diff --git a/store/lfuda.go b/store/lfuda.go index b0437b0..7454686 100644 --- a/store/lfuda.go +++ b/store/lfuda.go @@ -1,17 +1,21 @@ package store +//TODO: the caching strategy is actually not LFUDA, it should become a parameter and the name of the struct should be changed + import ( "time" - "github.com/bparli/lfuda-go" - "github.com/lbryio/lbry.go/v2/extras/errors" - "github.com/lbryio/lbry.go/v2/stream" "github.com/lbryio/reflector.go/internal/metrics" "github.com/lbryio/reflector.go/shared" + + "github.com/lbryio/lbry.go/v2/extras/errors" + "github.com/lbryio/lbry.go/v2/stream" + + "github.com/bparli/lfuda-go" "github.com/sirupsen/logrus" ) -// LRUStore adds a max cache size and LRU eviction to a BlobStore +// LFUDAStore adds a max cache size and Greedy-Dual-Size-Frequency cache eviction strategy to a BlobStore type LFUDAStore struct { // underlying store store BlobStore @@ -19,7 +23,7 @@ type LFUDAStore struct { lfuda *lfuda.Cache } -// NewLRUStore initialize a new LRUStore +// NewLFUDAStore initialize a new LRUStore func NewLFUDAStore(component string, store BlobStore, maxSize float64) *LFUDAStore { lfuda := lfuda.NewGDSFWithEvict(maxSize, func(key interface{}, value interface{}) { metrics.CacheLRUEvictCount.With(metrics.CacheLabels(store.Name(), component)).Inc() diff --git a/store/memory.go b/store/memory.go index 30d8dea..62d1047 100644 --- a/store/memory.go +++ b/store/memory.go @@ -4,9 +4,10 @@ import ( "sync" "time" + "github.com/lbryio/reflector.go/shared" + "github.com/lbryio/lbry.go/v2/extras/errors" "github.com/lbryio/lbry.go/v2/stream" - "github.com/lbryio/reflector.go/shared" ) // MemStore is an in memory only blob store with no persistence. diff --git a/store/noop.go b/store/noop.go index 9cadf5c..fd7b35e 100644 --- a/store/noop.go +++ b/store/noop.go @@ -3,8 +3,9 @@ package store import ( "time" - "github.com/lbryio/lbry.go/v2/stream" "github.com/lbryio/reflector.go/shared" + + "github.com/lbryio/lbry.go/v2/stream" ) // NoopStore is a store that does nothing diff --git a/store/s3.go b/store/s3.go index 75ba459..00b3f95 100644 --- a/store/s3.go +++ b/store/s3.go @@ -5,11 +5,12 @@ import ( "net/http" "time" - "github.com/lbryio/lbry.go/v2/extras/errors" - "github.com/lbryio/lbry.go/v2/stream" "github.com/lbryio/reflector.go/internal/metrics" "github.com/lbryio/reflector.go/shared" + "github.com/lbryio/lbry.go/v2/extras/errors" + "github.com/lbryio/lbry.go/v2/stream" + "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/aws/credentials" diff --git a/store/singleflight.go b/store/singleflight.go index 02337a2..fa8ae99 100644 --- a/store/singleflight.go +++ b/store/singleflight.go @@ -3,10 +3,10 @@ package store import ( "time" - "github.com/lbryio/lbry.go/v2/extras/errors" "github.com/lbryio/reflector.go/internal/metrics" "github.com/lbryio/reflector.go/shared" + "github.com/lbryio/lbry.go/v2/extras/errors" "github.com/lbryio/lbry.go/v2/stream" "golang.org/x/sync/singleflight" diff --git a/store/store.go b/store/store.go index bd9223b..387a9ec 100644 --- a/store/store.go +++ b/store/store.go @@ -1,22 +1,23 @@ package store import ( + "github.com/lbryio/reflector.go/shared" + "github.com/lbryio/lbry.go/v2/extras/errors" "github.com/lbryio/lbry.go/v2/stream" - "github.com/lbryio/reflector.go/shared" ) // BlobStore is an interface for handling blob storage. type BlobStore interface { // Name of blob store (useful for metrics) Name() string - // Does blob exist in the store. + // Has Does blob exist in the store. Has(hash string) (bool, error) // Get the blob from the store. Must return ErrBlobNotFound if blob is not in store. Get(hash string) (stream.Blob, shared.BlobTrace, error) // Put the blob into the store. Put(hash string, blob stream.Blob) error - // Put an SD blob into the store. + // PutSD an SD blob into the store. PutSD(hash string, blob stream.Blob) error // Delete the blob from the store. Delete(hash string) error