sendfile proof of concept #53
2 changed files with 41 additions and 21 deletions
|
@ -32,15 +32,17 @@ const (
|
||||||
// Server is an instance of a peer server that houses the listener and store.
|
// Server is an instance of a peer server that houses the listener and store.
|
||||||
type Server struct {
|
type Server struct {
|
||||||
store store.BlobStore
|
store store.BlobStore
|
||||||
|
disk *store.DiskStore
|
||||||
closed bool
|
closed bool
|
||||||
|
|
||||||
grp *stop.Group
|
grp *stop.Group
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewServer returns an initialized Server pointer.
|
// NewServer returns an initialized Server pointer.
|
||||||
func NewServer(store store.BlobStore) *Server {
|
func NewServer(blobstore store.BlobStore) *Server {
|
||||||
return &Server{
|
return &Server{
|
||||||
store: store,
|
store: blobstore,
|
||||||
|
disk: store.NewDiskStore("/external/blobs", 2),
|
||||||
grp: stop.New(),
|
grp: stop.New(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -131,7 +133,7 @@ func (s *Server) handleConnection(conn net.Conn) {
|
||||||
log.Error(errors.FullTrace(err))
|
log.Error(errors.FullTrace(err))
|
||||||
}
|
}
|
||||||
|
|
||||||
response, err = s.handleCompositeRequest(request)
|
response, blobHash, err := s.handleCompositeRequest(request)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error(errors.FullTrace(err))
|
log.Error(errors.FullTrace(err))
|
||||||
return
|
return
|
||||||
|
@ -143,6 +145,7 @@ func (s *Server) handleConnection(conn net.Conn) {
|
||||||
}
|
}
|
||||||
|
|
||||||
n, err := conn.Write(response)
|
n, err := conn.Write(response)
|
||||||
|
s.disk.Sendfile(blobHash, conn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if !strings.Contains(err.Error(), "connection reset by peer") { // means the other side closed the connection using TCP reset
|
if !strings.Contains(err.Error(), "connection reset by peer") { // means the other side closed the connection using TCP reset
|
||||||
s.logError(err)
|
s.logError(err)
|
||||||
|
@ -221,15 +224,15 @@ func (s *Server) handleAvailabilityRequest(data []byte) ([]byte, error) {
|
||||||
// return append(response, blob...), nil
|
// return append(response, blob...), nil
|
||||||
//}
|
//}
|
||||||
|
|
||||||
func (s *Server) handleCompositeRequest(data []byte) ([]byte, error) {
|
func (s *Server) handleCompositeRequest(data []byte) ([]byte, string, error) {
|
||||||
var request compositeRequest
|
var request compositeRequest
|
||||||
err := json.Unmarshal(data, &request)
|
err := json.Unmarshal(data, &request)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
var je *json.SyntaxError
|
var je *json.SyntaxError
|
||||||
if ee.As(err, &je) {
|
if ee.As(err, &je) {
|
||||||
return nil, errors.Err("invalid json at offset %d in data %s", je.Offset, hex.EncodeToString(data))
|
return nil, "", errors.Err("invalid json at offset %d in data %s", je.Offset, hex.EncodeToString(data))
|
||||||
}
|
}
|
||||||
return nil, errors.Err(err)
|
return nil, "", errors.Err(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
response := compositeResponse{
|
response := compositeResponse{
|
||||||
|
@ -241,7 +244,7 @@ func (s *Server) handleCompositeRequest(data []byte) ([]byte, error) {
|
||||||
for _, blobHash := range request.RequestedBlobs {
|
for _, blobHash := range request.RequestedBlobs {
|
||||||
exists, err := s.store.Has(blobHash)
|
exists, err := s.store.Has(blobHash)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, "", err
|
||||||
}
|
}
|
||||||
if exists {
|
if exists {
|
||||||
availableBlobs = append(availableBlobs, blobHash)
|
availableBlobs = append(availableBlobs, blobHash)
|
||||||
|
@ -256,39 +259,35 @@ func (s *Server) handleCompositeRequest(data []byte) ([]byte, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
var blob []byte
|
var blob []byte
|
||||||
var trace shared.BlobTrace
|
|
||||||
if request.RequestedBlob != "" {
|
if request.RequestedBlob != "" {
|
||||||
if len(request.RequestedBlob) != stream.BlobHashHexLength {
|
if len(request.RequestedBlob) != stream.BlobHashHexLength {
|
||||||
return nil, errors.Err("Invalid blob hash length")
|
return nil, "", errors.Err("Invalid blob hash length")
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Debugln("Sending blob " + request.RequestedBlob[:8])
|
log.Debugln("Sending blob " + request.RequestedBlob[:8])
|
||||||
|
|
||||||
blob, trace, err = s.store.Get(request.RequestedBlob)
|
size, err := s.disk.Size(request.RequestedBlob)
|
||||||
log.Debug(trace.String())
|
if err != nil {
|
||||||
if errors.Is(err, store.ErrBlobNotFound) {
|
|
||||||
response.IncomingBlob = incomingBlob{
|
response.IncomingBlob = incomingBlob{
|
||||||
Error: err.Error(),
|
Error: err.Error(),
|
||||||
}
|
}
|
||||||
} else if err != nil {
|
|
||||||
return nil, err
|
|
||||||
} else {
|
} else {
|
||||||
response.IncomingBlob = incomingBlob{
|
response.IncomingBlob = incomingBlob{
|
||||||
BlobHash: reflector.BlobHash(blob),
|
BlobHash: request.RequestedBlob,
|
||||||
Length: len(blob),
|
Length: int(size),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
metrics.MtrOutBytesTcp.Add(float64(len(blob)))
|
metrics.MtrOutBytesTcp.Add(float64(len(blob)))
|
||||||
metrics.BlobDownloadCount.Inc()
|
metrics.BlobDownloadCount.Inc()
|
||||||
metrics.PeerDownloadCount.Inc()
|
metrics.PeerDownloadCount.Inc()
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
respData, err := json.Marshal(response)
|
respData, err := json.Marshal(response)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
return append(respData, blob...), nil
|
return respData, request.RequestedBlob, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) logError(e error) {
|
func (s *Server) logError(e error) {
|
||||||
|
|
|
@ -7,6 +7,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
|
"net"
|
||||||
"os"
|
"os"
|
||||||
"path"
|
"path"
|
||||||
"time"
|
"time"
|
||||||
|
@ -67,6 +68,14 @@ func (d *DiskStore) Has(hash string) (bool, error) {
|
||||||
return true, nil
|
return true, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (d *DiskStore) Size(hash string) (int64, error) {
|
||||||
|
stat, err := os.Stat(d.path(hash))
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
return stat.Size(), nil
|
||||||
|
}
|
||||||
|
|
||||||
// Get returns the blob or an error if the blob doesn't exist.
|
// Get returns the blob or an error if the blob doesn't exist.
|
||||||
func (d *DiskStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) {
|
func (d *DiskStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
|
@ -211,3 +220,15 @@ func (d *DiskStore) initOnce() error {
|
||||||
// Shutdown shuts down the store gracefully
|
// Shutdown shuts down the store gracefully
|
||||||
func (d *DiskStore) Shutdown() {
|
func (d *DiskStore) Shutdown() {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (d *DiskStore) Sendfile(hash string, conn net.Conn) error {
|
||||||
|
f, err := os.Open(d.path(hash))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
_, err = io.Copy(conn, f)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue