Insert in tx #51

Closed
lyoshenka wants to merge 52 commits from insert_in_tx into master
3 changed files with 124 additions and 17 deletions
Showing only changes of commit dc95351cf3 - Show all commits

93
cmd/integrity.go Normal file
View file

@ -0,0 +1,93 @@
package cmd
import (
"crypto/sha512"
"encoding/hex"
"io/ioutil"
"os"
"path"
"runtime"
"sync/atomic"
"time"
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/reflector.go/meta"
"github.com/lbryio/reflector.go/store/speedwalk"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)
var threads int
func init() {
var cmd = &cobra.Command{
Use: "check-integrity",
Short: "check blobs integrity for a given path",
Run: integrityCheckCmd,
}
cmd.Flags().StringVar(&diskStorePath, "store-path", "", "path of the store where all blobs are cached")
cmd.Flags().IntVar(&threads, "threads", runtime.NumCPU()-1, "number of concurrent threads to process blobs")
rootCmd.AddCommand(cmd)
}
func integrityCheckCmd(cmd *cobra.Command, args []string) {
log.Printf("reflector %s", meta.VersionString())
if diskStorePath == "" {
log.Fatal("store-path must be defined")
}
blobs, err := speedwalk.AllFiles(diskStorePath, true)
if err != nil {
log.Errorf("error while reading blobs from disk %s", errors.FullTrace(err))
}
tasks := make(chan string, len(blobs))
done := make(chan bool)
processed := new(int32)
go produce(tasks, blobs)
cpus := runtime.NumCPU()
for i := 0; i < cpus-1; i++ {
go consume(i, tasks, done, len(blobs), processed)
}
<-done
}
func produce(tasks chan<- string, blobs []string) {
for _, b := range blobs {
tasks <- b
}
close(tasks)
}
func consume(worker int, tasks <-chan string, done chan<- bool, totalTasks int, processed *int32) {
start := time.Now()
for b := range tasks {
checked := atomic.AddInt32(processed, 1)
if worker == 0 {
remaining := int32(totalTasks) - checked
timePerBlob := time.Since(start).Microseconds() / int64(checked)
remainingTime := time.Duration(int64(remaining)*timePerBlob) * time.Microsecond
log.Infof("[T%d] %d/%d blobs checked. ETA: %s", worker, checked, totalTasks, remainingTime.String())
}
blobPath := path.Join(diskStorePath, b[:2], b)
blob, err := ioutil.ReadFile(blobPath)
if err != nil {
if os.IsNotExist(err) {
continue
}
log.Errorf("[Worker %d] Error looking up blob %s: %s", worker, b, err.Error())
continue
}
hashBytes := sha512.Sum384(blob)
readHash := hex.EncodeToString(hashBytes[:])
if readHash != b {
log.Infof("[%s] found a broken blob while reading from disk. Actual hash: %s", b, readHash)
err := os.Remove(blobPath)
if err != nil {
log.Errorf("Error while deleting broken blob %s: %s", b, err.Error())
}
}
}
done <- true
}

View file

@ -15,21 +15,23 @@ type blobRequest struct {
finished *sync.WaitGroup finished *sync.WaitGroup
} }
var getReqCh = make(chan *blobRequest) var getReqCh = make(chan *blobRequest, 20000)
func InitWorkers(server *Server, workers int) error { func InitWorkers(server *Server, workers int) {
stopper := stop.New(server.grp) stopper := stop.New(server.grp)
for i := 0; i < workers; i++ { for i := 0; i < workers; i++ {
go func(worker int) { go func(worker int) {
for {
select { select {
case <-stopper.Ch(): case <-stopper.Ch():
case r := <-getReqCh: case r := <-getReqCh:
metrics.Http3BlobReqQueue.Dec() metrics.Http3BlobReqQueue.Dec()
process(server, r) process(server, r)
} }
}
}(i) }(i)
} }
return nil return
} }
func enqueue(b *blobRequest) { func enqueue(b *blobRequest) {

View file

@ -14,6 +14,7 @@ import (
"github.com/lbryio/reflector.go/shared" "github.com/lbryio/reflector.go/shared"
"github.com/lbryio/reflector.go/store/speedwalk" "github.com/lbryio/reflector.go/store/speedwalk"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"go.uber.org/atomic"
) )
// DiskStore stores blobs on a local disk // DiskStore stores blobs on a local disk
@ -25,8 +26,12 @@ type DiskStore struct {
// true if initOnce ran, false otherwise // true if initOnce ran, false otherwise
initialized bool initialized bool
concurrentChecks atomic.Int32
} }
const maxConcurrentChecks = 3
// NewDiskStore returns an initialized file disk store pointer. // NewDiskStore returns an initialized file disk store pointer.
func NewDiskStore(dir string, prefixLength int) *DiskStore { func NewDiskStore(dir string, prefixLength int) *DiskStore {
return &DiskStore{ return &DiskStore{
@ -72,6 +77,12 @@ func (d *DiskStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) {
} }
return nil, shared.NewBlobTrace(time.Since(start), d.Name()), errors.Err(err) return nil, shared.NewBlobTrace(time.Since(start), d.Name()), errors.Err(err)
} }
// this is a rather poor yet effective way of throttling how many blobs can be checked concurrently
// poor because there is a possible race condition between the check and the actual +1
if d.concurrentChecks.Load() < maxConcurrentChecks {
d.concurrentChecks.Add(1)
defer d.concurrentChecks.Sub(1)
hashBytes := sha512.Sum384(blob) hashBytes := sha512.Sum384(blob)
readHash := hex.EncodeToString(hashBytes[:]) readHash := hex.EncodeToString(hashBytes[:])
if hash != readHash { if hash != readHash {
@ -83,6 +94,7 @@ func (d *DiskStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) {
} }
return nil, shared.NewBlobTrace(time.Since(start), d.Name()), errors.Err(message) return nil, shared.NewBlobTrace(time.Since(start), d.Name()), errors.Err(message)
} }
}
return blob, shared.NewBlobTrace(time.Since(start), d.Name()), nil return blob, shared.NewBlobTrace(time.Since(start), d.Name()), nil
} }