remove all hashing on the download path #54
2 changed files with 1 addition and 29 deletions
@@ -274,7 +274,7 @@ func (s *Server) handleCompositeRequest(data []byte) ([]byte, error) {
 			return nil, err
 		} else {
 			response.IncomingBlob = incomingBlob{
-				BlobHash: reflector.BlobHash(blob),
+				BlobHash: request.RequestedBlob,
 				Length:   len(blob),
 			}
 			metrics.MtrOutBytesTcp.Add(float64(len(blob)))
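On the send path, the server now echoes the hash the client asked for instead of re-hashing the blob it just read from the store. Judging by the verification code removed from the disk store below (sha512.Sum384 plus hex.EncodeToString), the digest being skipped is a hex-encoded SHA-384 over the whole blob. A minimal sketch of that per-download cost, assuming reflector.BlobHash matches the removed check:

package main

import (
	"crypto/sha512"
	"encoding/hex"
	"fmt"
)

// blobHash is a hypothetical stand-in for reflector.BlobHash, mirroring
// the sha512.Sum384 + hex.EncodeToString verification removed from the
// disk store below. Computing this on every download costs a full pass
// over the blob's bytes — the work this PR removes.
func blobHash(blob []byte) string {
	sum := sha512.Sum384(blob)
	return hex.EncodeToString(sum[:])
}

func main() {
	fmt.Println(blobHash([]byte("example blob")))
}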
@@ -2,9 +2,6 @@ package store
 
 import (
 	"bytes"
-	"crypto/sha512"
-	"encoding/hex"
-	"fmt"
 	"io"
 	"io/ioutil"
 	"os"
@@ -18,8 +15,6 @@ import (
 	"github.com/lbryio/lbry.go/v2/stream"
 
 	"github.com/brk0v/directio"
-	log "github.com/sirupsen/logrus"
-	"go.uber.org/atomic"
 )
 
 // DiskStore stores blobs on a local disk
@@ -31,12 +26,8 @@ type DiskStore struct {
 
 	// true if initOnce ran, false otherwise
 	initialized bool
-
-	concurrentChecks atomic.Int32
 }
 
-const maxConcurrentChecks = 30
-
 // NewDiskStore returns an initialized file disk store pointer.
 func NewDiskStore(dir string, prefixLength int) *DiskStore {
 	return &DiskStore{
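The concurrentChecks counter and the maxConcurrentChecks cap existed only to throttle the hash verification removed from Get below, so they go too. As the removed comment there concedes, the throttle was racy: the Load and the Add were two separate atomic operations, so two goroutines could both observe a count below the limit and both increment past it. For reference only (this is not code from this repository), a race-free variant would reserve a slot with compare-and-swap — a sketch using the standard library's sync/atomic rather than the go.uber.org/atomic the original imported:

package main

import (
	"fmt"
	"sync/atomic"
)

const maxConcurrentChecks = 30

var concurrentChecks int32

// tryAcquire reserves a verification slot atomically: the load and the
// increment are fused by CompareAndSwap, so the count can never exceed
// maxConcurrentChecks, unlike the removed Load-then-Add pattern.
func tryAcquire() bool {
	for {
		n := atomic.LoadInt32(&concurrentChecks)
		if n >= maxConcurrentChecks {
			return false // all slots busy: skip the check, as the old code did
		}
		if atomic.CompareAndSwapInt32(&concurrentChecks, n, n+1) {
			return true
		}
		// the counter moved under us; re-read and retry
	}
}

// release frees a slot previously reserved by tryAcquire.
func release() { atomic.AddInt32(&concurrentChecks, -1) }

func main() {
	if tryAcquire() {
		defer release()
		fmt.Println("slot acquired; hash verification would run here")
	}
}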
@@ -82,25 +73,6 @@ func (d *DiskStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) {
 		}
 		return nil, shared.NewBlobTrace(time.Since(start), d.Name()), errors.Err(err)
 	}
 
-	// this is a rather poor yet effective way of throttling how many blobs can be checked concurrently
-	// poor because there is a possible race condition between the check and the actual +1
-	if d.concurrentChecks.Load() < maxConcurrentChecks {
-		d.concurrentChecks.Add(1)
-		defer d.concurrentChecks.Sub(1)
-		hashBytes := sha512.Sum384(blob)
-		readHash := hex.EncodeToString(hashBytes[:])
-		if hash != readHash {
-			message := fmt.Sprintf("[%s] found a broken blob while reading from disk. Actual hash: %s", hash, readHash)
-			log.Errorf("%s", message)
-			err := d.Delete(hash)
-			if err != nil {
-				return nil, shared.NewBlobTrace(time.Since(start), d.Name()), err
-			}
-			return nil, shared.NewBlobTrace(time.Since(start), d.Name()), errors.Err(message)
-		}
-	}
-
 	return blob, shared.NewBlobTrace(time.Since(start), d.Name()), nil
 }
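With the verification gone, a read from the disk store is just a file read: a blob that rotted on disk is no longer detected, logged, and deleted at download time, so integrity presumably has to be enforced when blobs are written. Pieced together from the unchanged context lines above, the post-change Get plausibly reduces to the fragment below; it slots into the store package shown in the diff rather than standing alone, and d.path and ErrBlobNotFound are hypothetical names for the store's path helper and not-found sentinel, not confirmed by this diff:

// A sketch of Get after this change, assembled from the surviving
// context lines of the hunk above.
func (d *DiskStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) {
	start := time.Now()

	// d.path is a hypothetical helper mapping a blob hash to its file path.
	blob, err := ioutil.ReadFile(d.path(hash))
	if err != nil {
		if os.IsNotExist(err) {
			// ErrBlobNotFound is assumed; the diff only shows the generic error path.
			return nil, shared.NewBlobTrace(time.Since(start), d.Name()), errors.Err(ErrBlobNotFound)
		}
		return nil, shared.NewBlobTrace(time.Since(start), d.Name()), errors.Err(err)
	}

	// No hashing: the bytes are returned exactly as read from disk.
	return blob, shared.NewBlobTrace(time.Since(start), d.Name()), nil
}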