From dc95351cf30c3332185c929b18fc0eba29f54afa Mon Sep 17 00:00:00 2001
From: Niko Storni
Date: Wed, 7 Apr 2021 04:46:18 +0200
Subject: [PATCH] add integrity check cmd

throttle live integrity checks
bug fixes
---
 cmd/integrity.go     | 93 ++++++++++++++++++++++++++++++++++++++++++++
 peer/http3/worker.go | 18 +++++----
 store/disk.go        | 30 +++++++++-----
 3 files changed, 124 insertions(+), 17 deletions(-)
 create mode 100644 cmd/integrity.go

diff --git a/cmd/integrity.go b/cmd/integrity.go
new file mode 100644
index 0000000..8be9e2b
--- /dev/null
+++ b/cmd/integrity.go
@@ -0,0 +1,93 @@
+package cmd
+
+import (
+	"crypto/sha512"
+	"encoding/hex"
+	"io/ioutil"
+	"os"
+	"path"
+	"runtime"
+	"sync/atomic"
+	"time"
+
+	"github.com/lbryio/lbry.go/v2/extras/errors"
+	"github.com/lbryio/reflector.go/meta"
+	"github.com/lbryio/reflector.go/store/speedwalk"
+
+	log "github.com/sirupsen/logrus"
+	"github.com/spf13/cobra"
+)
+
+var threads int
+
+func init() {
+	var cmd = &cobra.Command{
+		Use:   "check-integrity",
+		Short: "check blobs integrity for a given path",
+		Run:   integrityCheckCmd,
+	}
+	cmd.Flags().StringVar(&diskStorePath, "store-path", "", "path of the store where all blobs are cached")
+	cmd.Flags().IntVar(&threads, "threads", runtime.NumCPU()-1, "number of concurrent threads to process blobs")
+	rootCmd.AddCommand(cmd)
+}
+
+func integrityCheckCmd(cmd *cobra.Command, args []string) {
+	log.Printf("reflector %s", meta.VersionString())
+	if diskStorePath == "" {
+		log.Fatal("store-path must be defined")
+	}
+
+	blobs, err := speedwalk.AllFiles(diskStorePath, true)
+	if err != nil {
+		log.Errorf("error while reading blobs from disk %s", errors.FullTrace(err))
+	}
+	tasks := make(chan string, len(blobs))
+	done := make(chan bool)
+	processed := new(int32)
+	go produce(tasks, blobs)
+	cpus := runtime.NumCPU()
+	for i := 0; i < cpus-1; i++ {
+		go consume(i, tasks, done, len(blobs), processed)
+	}
+	<-done
+}
+
+func produce(tasks chan<- string, blobs []string) {
+	for _, b := range blobs {
+		tasks <- b
+	}
+	close(tasks)
+}
+
+func consume(worker int, tasks <-chan string, done chan<- bool, totalTasks int, processed *int32) {
+	start := time.Now()
+
+	for b := range tasks {
+		checked := atomic.AddInt32(processed, 1)
+		if worker == 0 {
+			remaining := int32(totalTasks) - checked
+			timePerBlob := time.Since(start).Microseconds() / int64(checked)
+			remainingTime := time.Duration(int64(remaining)*timePerBlob) * time.Microsecond
+			log.Infof("[T%d] %d/%d blobs checked. ETA: %s", worker, checked, totalTasks, remainingTime.String())
+		}
+		blobPath := path.Join(diskStorePath, b[:2], b)
+		blob, err := ioutil.ReadFile(blobPath)
+		if err != nil {
+			if os.IsNotExist(err) {
+				continue
+			}
+			log.Errorf("[Worker %d] Error looking up blob %s: %s", worker, b, err.Error())
+			continue
+		}
+		hashBytes := sha512.Sum384(blob)
+		readHash := hex.EncodeToString(hashBytes[:])
+		if readHash != b {
+			log.Infof("[%s] found a broken blob while reading from disk. Actual hash: %s", b, readHash)
+			err := os.Remove(blobPath)
+			if err != nil {
+				log.Errorf("Error while deleting broken blob %s: %s", b, err.Error())
+			}
+		}
+	}
+	done <- true
+}
diff --git a/peer/http3/worker.go b/peer/http3/worker.go
index dac2893..d2b625b 100644
--- a/peer/http3/worker.go
+++ b/peer/http3/worker.go
@@ -15,21 +15,23 @@ type blobRequest struct {
 	finished *sync.WaitGroup
 }
 
-var getReqCh = make(chan *blobRequest)
+var getReqCh = make(chan *blobRequest, 20000)
 
-func InitWorkers(server *Server, workers int) error {
+func InitWorkers(server *Server, workers int) {
 	stopper := stop.New(server.grp)
 	for i := 0; i < workers; i++ {
 		go func(worker int) {
-			select {
-			case <-stopper.Ch():
-			case r := <-getReqCh:
-				metrics.Http3BlobReqQueue.Dec()
-				process(server, r)
+			for {
+				select {
+				case <-stopper.Ch():
+				case r := <-getReqCh:
+					metrics.Http3BlobReqQueue.Dec()
+					process(server, r)
+				}
 			}
 		}(i)
 	}
-	return nil
+	return
 }
 
 func enqueue(b *blobRequest) {
diff --git a/store/disk.go b/store/disk.go
index 08cdbd1..eaecfbb 100644
--- a/store/disk.go
+++ b/store/disk.go
@@ -14,6 +14,7 @@ import (
 	"github.com/lbryio/reflector.go/shared"
 	"github.com/lbryio/reflector.go/store/speedwalk"
 	log "github.com/sirupsen/logrus"
+	"go.uber.org/atomic"
 )
 
 // DiskStore stores blobs on a local disk
@@ -25,8 +26,12 @@ type DiskStore struct {
 
 	// true if initOnce ran, false otherwise
 	initialized bool
+
+	concurrentChecks atomic.Int32
 }
 
+const maxConcurrentChecks = 3
+
 // NewDiskStore returns an initialized file disk store pointer.
 func NewDiskStore(dir string, prefixLength int) *DiskStore {
 	return &DiskStore{
@@ -72,16 +77,23 @@ func (d *DiskStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) {
 		}
 		return nil, shared.NewBlobTrace(time.Since(start), d.Name()), errors.Err(err)
 	}
-	hashBytes := sha512.Sum384(blob)
-	readHash := hex.EncodeToString(hashBytes[:])
-	if hash != readHash {
-		message := fmt.Sprintf("[%s] found a broken blob while reading from disk. Actual hash: %s", hash, readHash)
-		log.Errorf("%s", message)
-		err := d.Delete(hash)
-		if err != nil {
-			return nil, shared.NewBlobTrace(time.Since(start), d.Name()), err
+
+	// this is a rather poor yet effective way of throttling how many blobs can be checked concurrently
+	// poor because there is a possible race condition between the check and the actual +1
+	if d.concurrentChecks.Load() < maxConcurrentChecks {
+		d.concurrentChecks.Add(1)
+		defer d.concurrentChecks.Sub(1)
+		hashBytes := sha512.Sum384(blob)
+		readHash := hex.EncodeToString(hashBytes[:])
+		if hash != readHash {
+			message := fmt.Sprintf("[%s] found a broken blob while reading from disk. Actual hash: %s", hash, readHash)
+			log.Errorf("%s", message)
+			err := d.Delete(hash)
+			if err != nil {
+				return nil, shared.NewBlobTrace(time.Since(start), d.Name()), err
+			}
+			return nil, shared.NewBlobTrace(time.Since(start), d.Name()), errors.Err(message)
 		}
-		return nil, shared.NewBlobTrace(time.Since(start), d.Name()), errors.Err(message)
 	}
 	return blob, shared.NewBlobTrace(time.Since(start), d.Name()), nil
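
For reference, the subcommand registered in cmd/integrity.go would be run roughly as follows; the binary name here is an assumption, while the subcommand and flags are the ones defined in the patch (note that the worker loop above currently sizes itself from runtime.NumCPU()-1 rather than the --threads value):

    reflector check-integrity --store-path /mnt/blobs --threads 7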
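
The throttle added to DiskStore.Get admits, in its own comment, a race between the Load check and the Add(1). Below is a self-contained sketch of one way to close that window using the same go.uber.org/atomic counter type; it is not part of the patch and all names are illustrative: the slot is reserved first with Inc, then released if the limit was exceeded.

package main

import (
	"fmt"
	"sync"

	"go.uber.org/atomic"
)

const maxConcurrentChecks = 3

// concurrentChecks plays the role of the counter the patch adds to DiskStore.
var concurrentChecks atomic.Int32

// tryIntegrityCheck increments first and backs out if the increment pushed it
// over the limit, so there is no gap between reading the counter and bumping it.
func tryIntegrityCheck(hash string) bool {
	if concurrentChecks.Inc() > maxConcurrentChecks {
		concurrentChecks.Dec() // over the limit: skip the integrity check this time
		return false
	}
	defer concurrentChecks.Dec()
	// the sha512.Sum384 comparison from DiskStore.Get would go here
	fmt.Printf("checked %s\n", hash)
	return true
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			tryIntegrityCheck(fmt.Sprintf("blob-%d", i))
		}(i)
	}
	wg.Wait()
}

Whether the simpler Load-then-Add in the patch is good enough is a judgment call; at worst it briefly allows a few extra concurrent checks, and the sketch only shows that the window can be closed without a mutex.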