remove all hashing on the download path #54

Merged
shyba merged 1 commit from hash_twice into master 2021-09-21 18:01:13 +02:00
2 changed files with 1 addition and 29 deletions

View file

@@ -274,7 +274,7 @@ func (s *Server) handleCompositeRequest(data []byte) ([]byte, error) {
 		return nil, err
 	} else {
 		response.IncomingBlob = incomingBlob{
-			BlobHash: reflector.BlobHash(blob),
+			BlobHash: request.RequestedBlob,
 			Length:   len(blob),
 		}
 		metrics.MtrOutBytesTcp.Add(float64(len(blob)))
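
The one-line change above stops the server from rehashing every blob it serves: the response now echoes the hash the client requested instead of recomputing it from the blob bytes. Judging by the verification logic deleted from the disk store below, `reflector.BlobHash` is presumably a hex-encoded SHA-384, along the lines of this sketch (a reconstruction, not necessarily the repo's actual source):

```go
package reflector

import (
	"crypto/sha512"
	"encoding/hex"
)

// BlobHash returns the hex-encoded SHA-384 digest of a blob.
// Reconstructed from the sha512.Sum384 + hex.EncodeToString pattern
// in the disk-store code removed below; the real function may differ.
func BlobHash(blob []byte) string {
	hashBytes := sha512.Sum384(blob)
	return hex.EncodeToString(hashBytes[:])
}
```

Skipping that digest on every download removes a full pass over the blob from the hot path; the hash was presumably already verified when the blob was first uploaded.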

View file

@@ -2,9 +2,6 @@ package store

 import (
 	"bytes"
-	"crypto/sha512"
-	"encoding/hex"
-	"fmt"
 	"io"
 	"io/ioutil"
 	"os"
@@ -18,8 +15,6 @@ import (
 	"github.com/lbryio/lbry.go/v2/stream"

 	"github.com/brk0v/directio"
-	log "github.com/sirupsen/logrus"
-	"go.uber.org/atomic"
 )

 // DiskStore stores blobs on a local disk
@@ -31,12 +26,8 @@ type DiskStore struct {
 	// true if initOnce ran, false otherwise
 	initialized bool
-
-	concurrentChecks atomic.Int32
 }

-const maxConcurrentChecks = 30
-
 // NewDiskStore returns an initialized file disk store pointer.
 func NewDiskStore(dir string, prefixLength int) *DiskStore {
 	return &DiskStore{
@@ -82,25 +73,6 @@ func (d *DiskStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) {
 		}
 		return nil, shared.NewBlobTrace(time.Since(start), d.Name()), errors.Err(err)
 	}
-
-	// this is a rather poor yet effective way of throttling how many blobs can be checked concurrently
-	// poor because there is a possible race condition between the check and the actual +1
-	if d.concurrentChecks.Load() < maxConcurrentChecks {
-		d.concurrentChecks.Add(1)
-		defer d.concurrentChecks.Sub(1)
-
-		hashBytes := sha512.Sum384(blob)
-		readHash := hex.EncodeToString(hashBytes[:])
-		if hash != readHash {
-			message := fmt.Sprintf("[%s] found a broken blob while reading from disk. Actual hash: %s", hash, readHash)
-			log.Errorf("%s", message)
-			err := d.Delete(hash)
-			if err != nil {
-				return nil, shared.NewBlobTrace(time.Since(start), d.Name()), err
-			}
-			return nil, shared.NewBlobTrace(time.Since(start), d.Name()), errors.Err(message)
-		}
-	}
 	return blob, shared.NewBlobTrace(time.Since(start), d.Name()), nil
 }
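
The comment deleted in the last hunk admits a race between the `concurrentChecks.Load()` check and the subsequent `Add(1)`: two goroutines can both pass the check at slot 29 and push the count past the limit. If throttled verification were ever reinstated, a buffered channel used as a counting semaphore avoids that window, because testing capacity and claiming a slot happen in a single channel operation. A minimal sketch, assuming the same hex SHA-384 hashes; `checkSem` and `verifyBlob` are illustrative names, not part of the repo:

```go
package store

import (
	"crypto/sha512"
	"encoding/hex"
)

const maxConcurrentChecks = 30

// checkSem is a counting semaphore: a non-blocking send both tests
// capacity and claims a slot atomically, so there is no gap between
// the "check" and the "+1" like the deleted atomic version had.
var checkSem = make(chan struct{}, maxConcurrentChecks)

// verifyBlob reports whether blob matches its expected hex SHA-384 hash.
// When all slots are busy it skips the check, mirroring the old behavior.
func verifyBlob(hash string, blob []byte) bool {
	select {
	case checkSem <- struct{}{}: // acquire a slot; fails instantly if full
		defer func() { <-checkSem }() // release the slot on return
		sum := sha512.Sum384(blob)
		return hash == hex.EncodeToString(sum[:])
	default:
		return true // at capacity: skip verification, as the old code did
	}
}
```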