sendfile proof of concept #53

Closed
shyba wants to merge 1 commit from sendfile into master
2 changed files with 41 additions and 21 deletions

View file

@@ -32,15 +32,17 @@ const (
 // Server is an instance of a peer server that houses the listener and store.
 type Server struct {
     store  store.BlobStore
+    disk   *store.DiskStore
     closed bool

     grp *stop.Group
 }

 // NewServer returns an initialized Server pointer.
-func NewServer(store store.BlobStore) *Server {
+func NewServer(blobstore store.BlobStore) *Server {
     return &Server{
-        store: store,
+        store: blobstore,
+        disk:  store.NewDiskStore("/external/blobs", 2),
         grp:   stop.New(),
     }
 }
@@ -131,7 +133,7 @@ func (s *Server) handleConnection(conn net.Conn) {
             log.Error(errors.FullTrace(err))
         }

-        response, err = s.handleCompositeRequest(request)
+        response, blobHash, err := s.handleCompositeRequest(request)
         if err != nil {
             log.Error(errors.FullTrace(err))
             return
@@ -143,6 +145,7 @@ func (s *Server) handleConnection(conn net.Conn) {
         }

         n, err := conn.Write(response)
+        s.disk.Sendfile(blobHash, conn)
         if err != nil {
             if !strings.Contains(err.Error(), "connection reset by peer") { // means the other side closed the connection using TCP reset
                 s.logError(err)
@@ -221,15 +224,15 @@ func (s *Server) handleAvailabilityRequest(data []byte) ([]byte, error) {
 //	return append(response, blob...), nil
 //}

-func (s *Server) handleCompositeRequest(data []byte) ([]byte, error) {
+func (s *Server) handleCompositeRequest(data []byte) ([]byte, string, error) {
     var request compositeRequest
     err := json.Unmarshal(data, &request)
     if err != nil {
         var je *json.SyntaxError
         if ee.As(err, &je) {
-            return nil, errors.Err("invalid json at offset %d in data %s", je.Offset, hex.EncodeToString(data))
+            return nil, "", errors.Err("invalid json at offset %d in data %s", je.Offset, hex.EncodeToString(data))
         }
-        return nil, errors.Err(err)
+        return nil, "", errors.Err(err)
     }

     response := compositeResponse{
@@ -241,7 +244,7 @@ func (s *Server) handleCompositeRequest(data []byte) ([]byte, error) {
     for _, blobHash := range request.RequestedBlobs {
         exists, err := s.store.Has(blobHash)
         if err != nil {
-            return nil, err
+            return nil, "", err
         }
         if exists {
             availableBlobs = append(availableBlobs, blobHash)
@@ -256,39 +259,35 @@ func (s *Server) handleCompositeRequest(data []byte) ([]byte, error) {
     }

     var blob []byte
-    var trace shared.BlobTrace
     if request.RequestedBlob != "" {
         if len(request.RequestedBlob) != stream.BlobHashHexLength {
-            return nil, errors.Err("Invalid blob hash length")
+            return nil, "", errors.Err("Invalid blob hash length")
         }

         log.Debugln("Sending blob " + request.RequestedBlob[:8])

-        blob, trace, err = s.store.Get(request.RequestedBlob)
-        log.Debug(trace.String())
-        if errors.Is(err, store.ErrBlobNotFound) {
+        size, err := s.disk.Size(request.RequestedBlob)
+        if err != nil {
             response.IncomingBlob = incomingBlob{
                 Error: err.Error(),
             }
-        } else if err != nil {
-            return nil, err
         } else {
             response.IncomingBlob = incomingBlob{
-                BlobHash: reflector.BlobHash(blob),
-                Length:   len(blob),
+                BlobHash: request.RequestedBlob,
+                Length:   int(size),
             }
+        }
         metrics.MtrOutBytesTcp.Add(float64(len(blob)))
         metrics.BlobDownloadCount.Inc()
         metrics.PeerDownloadCount.Inc()
-        }
     }

     respData, err := json.Marshal(response)
     if err != nil {
-        return nil, err
+        return nil, "", err
     }

-    return append(respData, blob...), nil
+    return respData, request.RequestedBlob, nil
 }

 func (s *Server) logError(e error) {
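
Read together, the hunks above split the old single write into a header write followed by a disk-to-socket stream: handleCompositeRequest now reports the requested hash and its on-disk size in the JSON header instead of appending the blob bytes, and handleConnection streams the blob afterwards from the DiskStore. A rough sketch of the resulting flow, not code from this PR; the helper name and the empty-hash guard are illustrative additions:

// sendBlobResponse is a hypothetical consolidation of the changed call sites
// above, shown only to make the new write order explicit.
func sendBlobResponse(s *Server, conn net.Conn, request []byte) error {
    // 1. Build the JSON header; the blob bytes are no longer appended to it.
    response, blobHash, err := s.handleCompositeRequest(request)
    if err != nil {
        return err
    }
    // 2. Write the header.
    if _, err := conn.Write(response); err != nil {
        return err
    }
    // 3. Skip the stream when no single blob was requested (guard not in the diff above).
    if blobHash == "" {
        return nil
    }
    // 4. Stream the blob straight from disk to the connection.
    return s.disk.Sendfile(blobHash, conn)
}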

View file

@@ -7,6 +7,7 @@ import (
     "fmt"
     "io"
     "io/ioutil"
+    "net"
     "os"
     "path"
     "time"
@@ -67,6 +68,14 @@ func (d *DiskStore) Has(hash string) (bool, error) {
     return true, nil
 }

+func (d *DiskStore) Size(hash string) (int64, error) {
+    stat, err := os.Stat(d.path(hash))
+    if err != nil {
+        return 0, err
+    }
+    return stat.Size(), nil
+}
+
 // Get returns the blob or an error if the blob doesn't exist.
 func (d *DiskStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) {
     start := time.Now()
@@ -211,3 +220,15 @@ func (d *DiskStore) initOnce() error {
 // Shutdown shuts down the store gracefully
 func (d *DiskStore) Shutdown() {
 }
+
+func (d *DiskStore) Sendfile(hash string, conn net.Conn) error {
+    f, err := os.Open(d.path(hash))
+    if err != nil {
+        return err
+    }
+    _, err = io.Copy(conn, f)
+    if err != nil {
+        return err
+    }
+    return nil
+}
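
DiskStore.Sendfile leans on io.Copy rather than calling sendfile(2) directly: when the destination is a *net.TCPConn and the source is an *os.File, io.Copy dispatches to (*net.TCPConn).ReadFrom, which on Linux and several other platforms performs the copy with sendfile, so blob bytes move kernel-side instead of through a user-space buffer. A self-contained sketch of that fast path follows; the address, blob path, and helper name are placeholders, not part of this PR:

package main

import (
    "io"
    "log"
    "net"
    "os"
)

// streamFile copies a file to a TCP connection. Because io.Copy checks for
// io.ReaderFrom, the copy is handed to (*net.TCPConn).ReadFrom, which uses
// the sendfile(2) syscall on Linux when the source is an *os.File.
func streamFile(conn net.Conn, path string) error {
    f, err := os.Open(path)
    if err != nil {
        return err
    }
    defer f.Close() // release the descriptor once the copy finishes

    _, err = io.Copy(conn, f)
    return err
}

func main() {
    // Placeholder address and blob path, for illustration only.
    conn, err := net.Dial("tcp", "127.0.0.1:5567")
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()

    if err := streamFile(conn, "/external/blobs/example-blob"); err != nil {
        log.Fatal(err)
    }
}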