From 71fb06fbb85fcd1cfc3be08cee00a5f58cedfdd0 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 2 Aug 2021 03:23:54 -0300 Subject: [PATCH] sendfile proof of concept --- server/peer/server.go | 41 ++++++++++++++++++++--------------------- store/disk.go | 21 +++++++++++++++++++++ 2 files changed, 41 insertions(+), 21 deletions(-) diff --git a/server/peer/server.go b/server/peer/server.go index 4061ea5..5c8b7a1 100644 --- a/server/peer/server.go +++ b/server/peer/server.go @@ -32,15 +32,17 @@ const ( // Server is an instance of a peer server that houses the listener and store. type Server struct { store store.BlobStore + disk *store.DiskStore closed bool grp *stop.Group } // NewServer returns an initialized Server pointer. -func NewServer(store store.BlobStore) *Server { +func NewServer(blobstore store.BlobStore) *Server { return &Server{ - store: store, + store: blobstore, + disk: store.NewDiskStore("/external/blobs", 2), grp: stop.New(), } } @@ -131,7 +133,7 @@ func (s *Server) handleConnection(conn net.Conn) { log.Error(errors.FullTrace(err)) } - response, err = s.handleCompositeRequest(request) + response, blobHash, err := s.handleCompositeRequest(request) if err != nil { log.Error(errors.FullTrace(err)) return @@ -143,6 +145,7 @@ func (s *Server) handleConnection(conn net.Conn) { } n, err := conn.Write(response) + s.disk.Sendfile(blobHash, conn) if err != nil { if !strings.Contains(err.Error(), "connection reset by peer") { // means the other side closed the connection using TCP reset s.logError(err) @@ -221,15 +224,15 @@ func (s *Server) handleAvailabilityRequest(data []byte) ([]byte, error) { // return append(response, blob...), nil //} -func (s *Server) handleCompositeRequest(data []byte) ([]byte, error) { +func (s *Server) handleCompositeRequest(data []byte) ([]byte, string, error) { var request compositeRequest err := json.Unmarshal(data, &request) if err != nil { var je *json.SyntaxError if ee.As(err, &je) { - return nil, errors.Err("invalid json at offset %d in data %s", je.Offset, hex.EncodeToString(data)) + return nil, "", errors.Err("invalid json at offset %d in data %s", je.Offset, hex.EncodeToString(data)) } - return nil, errors.Err(err) + return nil, "", errors.Err(err) } response := compositeResponse{ @@ -241,7 +244,7 @@ func (s *Server) handleCompositeRequest(data []byte) ([]byte, error) { for _, blobHash := range request.RequestedBlobs { exists, err := s.store.Has(blobHash) if err != nil { - return nil, err + return nil, "", err } if exists { availableBlobs = append(availableBlobs, blobHash) @@ -256,39 +259,35 @@ func (s *Server) handleCompositeRequest(data []byte) ([]byte, error) { } var blob []byte - var trace shared.BlobTrace if request.RequestedBlob != "" { if len(request.RequestedBlob) != stream.BlobHashHexLength { - return nil, errors.Err("Invalid blob hash length") + return nil, "", errors.Err("Invalid blob hash length") } log.Debugln("Sending blob " + request.RequestedBlob[:8]) - blob, trace, err = s.store.Get(request.RequestedBlob) - log.Debug(trace.String()) - if errors.Is(err, store.ErrBlobNotFound) { + size, err := s.disk.Size(request.RequestedBlob) + if err != nil { response.IncomingBlob = incomingBlob{ Error: err.Error(), } - } else if err != nil { - return nil, err } else { response.IncomingBlob = incomingBlob{ - BlobHash: reflector.BlobHash(blob), - Length: len(blob), + BlobHash: request.RequestedBlob, + Length: int(size), } - metrics.MtrOutBytesTcp.Add(float64(len(blob))) - metrics.BlobDownloadCount.Inc() - metrics.PeerDownloadCount.Inc() } + metrics.MtrOutBytesTcp.Add(float64(len(blob))) + metrics.BlobDownloadCount.Inc() + metrics.PeerDownloadCount.Inc() } respData, err := json.Marshal(response) if err != nil { - return nil, err + return nil, "", err } - return append(respData, blob...), nil + return respData, request.RequestedBlob, nil } func (s *Server) logError(e error) { diff --git a/store/disk.go b/store/disk.go index 233123a..3876204 100644 --- a/store/disk.go +++ b/store/disk.go @@ -7,6 +7,7 @@ import ( "fmt" "io" "io/ioutil" + "net" "os" "path" "time" @@ -67,6 +68,14 @@ func (d *DiskStore) Has(hash string) (bool, error) { return true, nil } +func (d *DiskStore) Size(hash string) (int64, error) { + stat, err := os.Stat(d.path(hash)) + if err != nil { + return 0, err + } + return stat.Size(), nil +} + // Get returns the blob or an error if the blob doesn't exist. func (d *DiskStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) { start := time.Now() @@ -211,3 +220,15 @@ func (d *DiskStore) initOnce() error { // Shutdown shuts down the store gracefully func (d *DiskStore) Shutdown() { } + +func (d *DiskStore) Sendfile(hash string, conn net.Conn) error { + f, err := os.Open(d.path(hash)) + if err != nil { + return err + } + _, err = io.Copy(conn, f) + if err != nil { + return err + } + return nil +} -- 2.45.2