From c1caf1938c35082fe18920f59cbabb276297fba4 Mon Sep 17 00:00:00 2001 From: Mark Beamer Jr Date: Thu, 20 May 2021 20:41:47 -0400 Subject: [PATCH] Add queue to prevent writing too many files at once. --- store/disk.go | 37 +++++++++++++++++++++++++++++++++---- 1 file changed, 33 insertions(+), 4 deletions(-) diff --git a/store/disk.go b/store/disk.go index 1515135..e52731a 100644 --- a/store/disk.go +++ b/store/disk.go @@ -7,6 +7,7 @@ import ( "io/ioutil" "os" "path" + "runtime" "time" "github.com/lbryio/lbry.go/v2/extras/errors" @@ -18,6 +19,23 @@ import ( "go.uber.org/atomic" ) +func init() { + writeCh = make(chan writeRequest) + for i := 0; i < runtime.NumCPU(); i++ { + go func() { + select { + case r := <-writeCh: + err := ioutil.WriteFile(r.filename, r.data, r.perm) + if err != nil { + log.Errorf("could not write file %s to disk, failed with error: %s", r.filename, err.Error()) + } + } + }() + } +} + +var writeCh chan writeRequest + // DiskStore stores blobs on a local disk type DiskStore struct { // the location of blobs on disk @@ -128,10 +146,7 @@ func (d *DiskStore) Put(hash string, blob stream.Blob) error { hashBytes := sha512.Sum384(blob) readHash := hex.EncodeToString(hashBytes[:]) matchesBeforeWriting := readHash == hash - err = ioutil.WriteFile(d.path(hash), blob, 0644) - if err != nil { - log.Errorf("Error saving to disk: %s", err.Error()) - } + writeFile(d.path(hash), blob, 0644) readBlob, err := ioutil.ReadFile(d.path(hash)) matchesAfterReading := false if err != nil { @@ -215,7 +230,21 @@ func (d *DiskStore) initOnce() error { return nil } +type writeRequest struct { + filename string + data []byte + perm os.FileMode +} + // Shutdown shuts down the store gracefully func (d *DiskStore) Shutdown() { return } + +func writeFile(filename string, data []byte, perm os.FileMode) { + writeCh <- writeRequest{ + filename: filename, + data: data, + perm: perm, + } +}