diff --git a/cmd/getstream.go b/cmd/getstream.go index c02ad0b..1c25df5 100644 --- a/cmd/getstream.go +++ b/cmd/getstream.go @@ -41,7 +41,7 @@ func getStreamCmd(cmd *cobra.Command, args []string) { var sd stream.SDBlob - sdb, err := s.Get(sdHash) + sdb, _, err := s.Get(sdHash) if err != nil { log.Fatal(err) } @@ -62,7 +62,7 @@ func getStreamCmd(cmd *cobra.Command, args []string) { } for i := 0; i < len(sd.BlobInfos)-1; i++ { - b, err := s.Get(hex.EncodeToString(sd.BlobInfos[i].BlobHash)) + b, _, err := s.Get(hex.EncodeToString(sd.BlobInfos[i].BlobHash)) if err != nil { log.Fatal(err) } diff --git a/peer/client.go b/peer/client.go index cdafd1c..9eae088 100644 --- a/peer/client.go +++ b/peer/client.go @@ -9,6 +9,7 @@ import ( "time" "github.com/lbryio/reflector.go/internal/metrics" + "github.com/lbryio/reflector.go/shared" "github.com/lbryio/reflector.go/store" "github.com/lbryio/lbry.go/v2/extras/errors" @@ -57,10 +58,11 @@ func (c *Client) GetStream(sdHash string, blobCache store.BlobStore) (stream.Str var sd stream.SDBlob - b, err := c.GetBlob(sdHash) + b, trace, err := c.GetBlob(sdHash) if err != nil { return nil, err } + log.Debug(trace.String()) err = sd.FromBlob(b) if err != nil { @@ -71,10 +73,11 @@ func (c *Client) GetStream(sdHash string, blobCache store.BlobStore) (stream.Str s[0] = b for i := 0; i < len(sd.BlobInfos)-1; i++ { - s[i+1], err = c.GetBlob(hex.EncodeToString(sd.BlobInfos[i].BlobHash)) + s[i+1], trace, err = c.GetBlob(hex.EncodeToString(sd.BlobInfos[i].BlobHash)) if err != nil { return nil, err } + log.Debug(trace.String()) } return s, nil @@ -114,47 +117,52 @@ func (c *Client) HasBlob(hash string) (bool, error) { } // GetBlob gets a blob -func (c *Client) GetBlob(hash string) (stream.Blob, error) { +func (c *Client) GetBlob(hash string) (stream.Blob, shared.BlobTrace, error) { + start := time.Now() if !c.connected { - return nil, errors.Err("not connected") + return nil, shared.NewBlobTrace(time.Since(start), "tcp"), errors.Err("not connected") } sendRequest, err := json.Marshal(blobRequest{ RequestedBlob: hash, }) if err != nil { - return nil, err + return nil, shared.NewBlobTrace(time.Since(start), "tcp"), err } err = c.write(sendRequest) if err != nil { - return nil, err + return nil, shared.NewBlobTrace(time.Since(start), "tcp"), err } var resp blobResponse err = c.read(&resp) if err != nil { - return nil, err + return nil, shared.NewBlobTrace(time.Since(start), "tcp"), err } + trace := shared.NewBlobTrace(time.Since(start), "tcp") + if resp.RequestTrace != nil { + trace = *resp.RequestTrace + } if resp.IncomingBlob.Error != "" { - return nil, errors.Prefix(hash[:8], resp.IncomingBlob.Error) + return nil, trace, errors.Prefix(hash[:8], resp.IncomingBlob.Error) } if resp.IncomingBlob.BlobHash != hash { - return nil, errors.Prefix(hash[:8], "blob hash in response does not match requested hash") + return nil, trace.Stack(time.Since(start), "tcp"), errors.Prefix(hash[:8], "blob hash in response does not match requested hash") } if resp.IncomingBlob.Length <= 0 { - return nil, errors.Prefix(hash[:8], "length reported as <= 0") + return nil, trace, errors.Prefix(hash[:8], "length reported as <= 0") } log.Debugf("receiving blob %s from %s", hash[:8], c.conn.RemoteAddr()) blob, err := c.readRawBlob(resp.IncomingBlob.Length) if err != nil { - return nil, err + return nil, (*resp.RequestTrace).Stack(time.Since(start), "tcp"), err } metrics.MtrInBytesTcp.Add(float64(len(blob))) - return blob, nil + return blob, trace.Stack(time.Since(start), "tcp"), nil } func (c *Client) read(v interface{}) error { diff --git a/peer/http3/client.go b/peer/http3/client.go index ad5c741..d9b1495 100644 --- a/peer/http3/client.go +++ b/peer/http3/client.go @@ -9,12 +9,14 @@ import ( "sync" "time" - "github.com/lbryio/lbry.go/v2/extras/errors" - "github.com/lbryio/lbry.go/v2/stream" "github.com/lbryio/reflector.go/internal/metrics" + "github.com/lbryio/reflector.go/shared" "github.com/lbryio/reflector.go/store" + "github.com/lbryio/lbry.go/v2/extras/errors" + "github.com/lbryio/lbry.go/v2/stream" "github.com/lucas-clemente/quic-go/http3" + log "github.com/sirupsen/logrus" ) // Client is an instance of a client connected to a server. @@ -35,7 +37,7 @@ func (c *Client) Close() error { func (c *Client) GetStream(sdHash string, blobCache store.BlobStore) (stream.Stream, error) { var sd stream.SDBlob - b, err := c.GetBlob(sdHash) + b, _, err := c.GetBlob(sdHash) if err != nil { return nil, err } @@ -49,10 +51,12 @@ func (c *Client) GetStream(sdHash string, blobCache store.BlobStore) (stream.Str s[0] = b for i := 0; i < len(sd.BlobInfos)-1; i++ { - s[i+1], err = c.GetBlob(hex.EncodeToString(sd.BlobInfos[i].BlobHash)) + var trace shared.BlobTrace + s[i+1], trace, err = c.GetBlob(hex.EncodeToString(sd.BlobInfos[i].BlobHash)) if err != nil { return nil, err } + log.Debug(trace.String()) } return s, nil @@ -75,26 +79,35 @@ func (c *Client) HasBlob(hash string) (bool, error) { } // GetBlob gets a blob -func (c *Client) GetBlob(hash string) (stream.Blob, error) { - resp, err := c.conn.Get(fmt.Sprintf("https://%s/get/%s", c.ServerAddr, hash)) +func (c *Client) GetBlob(hash string) (stream.Blob, shared.BlobTrace, error) { + start := time.Now() + resp, err := c.conn.Get(fmt.Sprintf("https://%s/get/%s?trace=true", c.ServerAddr, hash)) if err != nil { - return nil, errors.Err(err) + return nil, shared.NewBlobTrace(time.Since(start), "http3"), errors.Err(err) } defer resp.Body.Close() if resp.StatusCode == http.StatusNotFound { fmt.Printf("%s blob not found %d\n", hash, resp.StatusCode) - return nil, errors.Err(store.ErrBlobNotFound) + return nil, shared.NewBlobTrace(time.Since(start), "http3"), errors.Err(store.ErrBlobNotFound) } else if resp.StatusCode != http.StatusOK { - return nil, errors.Err("non 200 status code returned: %d", resp.StatusCode) + return nil, shared.NewBlobTrace(time.Since(start), "http3"), errors.Err("non 200 status code returned: %d", resp.StatusCode) } tmp := getBuffer() defer putBuffer(tmp) - + serialized := resp.Header.Get("Via") + trace := shared.NewBlobTrace(time.Since(start), "http3") + if serialized != "" { + parsedTrace, err := shared.Deserialize(serialized) + if err != nil { + return nil, shared.NewBlobTrace(time.Since(start), "http3"), err + } + trace = *parsedTrace + } written, err := io.Copy(tmp, resp.Body) if err != nil { - return nil, errors.Err(err) + return nil, trace.Stack(time.Since(start), "http3"), errors.Err(err) } blob := make([]byte, written) @@ -102,7 +115,7 @@ func (c *Client) GetBlob(hash string) (stream.Blob, error) { metrics.MtrInBytesUdp.Add(float64(len(blob))) - return blob, nil + return blob, trace.Stack(time.Since(start), "http3"), nil } // buffer pool to reduce GC diff --git a/peer/http3/server.go b/peer/http3/server.go index aaadac0..c33a676 100644 --- a/peer/http3/server.go +++ b/peer/http3/server.go @@ -10,6 +10,7 @@ import ( "fmt" "math/big" "net/http" + "strconv" "time" "github.com/lbryio/reflector.go/internal/metrics" @@ -71,7 +72,26 @@ func (s *Server) Start(address string) error { r.HandleFunc("/get/{hash}", func(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) requestedBlob := vars["hash"] - blob, err := s.store.Get(requestedBlob) + traceParam := r.URL.Query().Get("trace") + var err error + wantsTrace := false + if traceParam != "" { + wantsTrace, err = strconv.ParseBool(traceParam) + if err != nil { + wantsTrace = false + } + } + blob, trace, err := s.store.Get(requestedBlob) + + if wantsTrace { + serialized, err := trace.Serialize() + if err != nil { + http.Error(w, err.Error(), http.StatusNotFound) + return + } + w.Header().Add("Via", serialized) + log.Debug(trace.String()) + } if err != nil { if errors.Is(err, store.ErrBlobNotFound) { http.Error(w, err.Error(), http.StatusNotFound) diff --git a/peer/http3/store.go b/peer/http3/store.go index b681443..2d9a23c 100644 --- a/peer/http3/store.go +++ b/peer/http3/store.go @@ -8,6 +8,7 @@ import ( "github.com/lbryio/lbry.go/v2/extras/errors" "github.com/lbryio/lbry.go/v2/stream" + "github.com/lbryio/reflector.go/shared" "github.com/lucas-clemente/quic-go" "github.com/lucas-clemente/quic-go/http3" ) @@ -68,10 +69,11 @@ func (p *Store) Has(hash string) (bool, error) { } // Get downloads the blob from the peer -func (p *Store) Get(hash string) (stream.Blob, error) { +func (p *Store) Get(hash string) (stream.Blob, shared.BlobTrace, error) { + start := time.Now() c, err := p.getClient() if err != nil { - return nil, err + return nil, shared.NewBlobTrace(time.Since(start), p.Name()), err } defer c.Close() return c.GetBlob(hash) diff --git a/peer/server.go b/peer/server.go index 44916cb..708fe53 100644 --- a/peer/server.go +++ b/peer/server.go @@ -12,6 +12,7 @@ import ( "github.com/lbryio/reflector.go/internal/metrics" "github.com/lbryio/reflector.go/reflector" + "github.com/lbryio/reflector.go/shared" "github.com/lbryio/reflector.go/store" "github.com/lbryio/lbry.go/v2/extras/errors" @@ -253,6 +254,7 @@ func (s *Server) handleCompositeRequest(data []byte) ([]byte, error) { } var blob []byte + var trace shared.BlobTrace if request.RequestedBlob != "" { if len(request.RequestedBlob) != stream.BlobHashHexLength { return nil, errors.Err("Invalid blob hash length") @@ -260,7 +262,8 @@ func (s *Server) handleCompositeRequest(data []byte) ([]byte, error) { log.Debugln("Sending blob " + request.RequestedBlob[:8]) - blob, err = s.store.Get(request.RequestedBlob) + blob, trace, err = s.store.Get(request.RequestedBlob) + log.Debug(trace.String()) if errors.Is(err, store.ErrBlobNotFound) { response.IncomingBlob = incomingBlob{ Error: err.Error(), @@ -382,6 +385,7 @@ type incomingBlob struct { } type blobResponse struct { IncomingBlob incomingBlob `json:"incoming_blob"` + RequestTrace *shared.BlobTrace } type compositeRequest struct { diff --git a/peer/store.go b/peer/store.go index 3ef9622..c227884 100644 --- a/peer/store.go +++ b/peer/store.go @@ -5,6 +5,7 @@ import ( "github.com/lbryio/lbry.go/v2/extras/errors" "github.com/lbryio/lbry.go/v2/stream" + "github.com/lbryio/reflector.go/shared" ) // Store is a blob store that gets blobs from a peer. @@ -43,10 +44,11 @@ func (p *Store) Has(hash string) (bool, error) { } // Get downloads the blob from the peer -func (p *Store) Get(hash string) (stream.Blob, error) { +func (p *Store) Get(hash string) (stream.Blob, shared.BlobTrace, error) { + start := time.Now() c, err := p.getClient() if err != nil { - return nil, err + return nil, shared.NewBlobTrace(time.Since(start), p.Name()), err } defer c.Close() return c.GetBlob(hash) diff --git a/shared/shared.go b/shared/shared.go new file mode 100644 index 0000000..c232cf6 --- /dev/null +++ b/shared/shared.go @@ -0,0 +1,82 @@ +package shared + +import ( + "encoding/json" + "fmt" + "os" + "time" + + "github.com/lbryio/lbry.go/v2/extras/errors" +) + +type BlobStack struct { + Timing time.Duration `json:"timing"` + OriginName string `json:"origin_name"` + HostName string `json:"host_name"` +} +type BlobTrace struct { + Stacks []BlobStack `json:"stacks"` +} + +var hostName *string + +func getHostName() string { + if hostName == nil { + hn, err := os.Hostname() + if err != nil { + hn = "unknown" + } + hostName = &hn + } + return *hostName +} +func (b *BlobTrace) Stack(timing time.Duration, originName string) BlobTrace { + b.Stacks = append(b.Stacks, BlobStack{ + Timing: timing, + OriginName: originName, + HostName: getHostName(), + }) + return *b +} +func (b *BlobTrace) Merge(otherTrance BlobTrace) BlobTrace { + b.Stacks = append(b.Stacks, otherTrance.Stacks...) + return *b +} +func NewBlobTrace(timing time.Duration, originName string) BlobTrace { + b := BlobTrace{} + b.Stacks = append(b.Stacks, BlobStack{ + Timing: timing, + OriginName: originName, + HostName: getHostName(), + }) + return b +} + +func (b BlobTrace) String() string { + var fullTrace string + for i, stack := range b.Stacks { + delta := time.Duration(0) + if i > 0 { + delta = stack.Timing - b.Stacks[i-1].Timing + } + fullTrace += fmt.Sprintf("[%d](%s) origin: %s - timing: %s - delta: %s\n", i, stack.HostName, stack.OriginName, stack.Timing.String(), delta.String()) + } + return fullTrace +} + +func (b BlobTrace) Serialize() (string, error) { + t, err := json.Marshal(b) + if err != nil { + return "", errors.Err(err) + } + return string(t), nil +} + +func Deserialize(serializedData string) (*BlobTrace, error) { + var trace BlobTrace + err := json.Unmarshal([]byte(serializedData), &trace) + if err != nil { + return nil, errors.Err(err) + } + return &trace, nil +} diff --git a/shared/shared_test.go b/shared/shared_test.go new file mode 100644 index 0000000..47c8eef --- /dev/null +++ b/shared/shared_test.go @@ -0,0 +1,32 @@ +package shared + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestBlobTrace_Serialize(t *testing.T) { + stack := NewBlobTrace(10*time.Second, "test") + stack.Stack(20*time.Second, "test2") + stack.Stack(30*time.Second, "test3") + serialized, err := stack.Serialize() + assert.NoError(t, err) + t.Log(serialized) + expected := "{\"stacks\":[{\"timing\":10000000000,\"origin_name\":\"test\"},{\"timing\":20000000000,\"origin_name\":\"test2\"},{\"timing\":30000000000,\"origin_name\":\"test3\"}]}" + assert.Equal(t, expected, serialized) +} + +func TestBlobTrace_Deserialize(t *testing.T) { + serialized := "{\"stacks\":[{\"timing\":10000000000,\"origin_name\":\"test\"},{\"timing\":20000000000,\"origin_name\":\"test2\"},{\"timing\":30000000000,\"origin_name\":\"test3\"}]}" + stack, err := Deserialize(serialized) + assert.NoError(t, err) + assert.Len(t, stack.Stacks, 3) + assert.Equal(t, stack.Stacks[0].Timing, 10*time.Second) + assert.Equal(t, stack.Stacks[1].Timing, 20*time.Second) + assert.Equal(t, stack.Stacks[2].Timing, 30*time.Second) + assert.Equal(t, stack.Stacks[0].OriginName, "test") + assert.Equal(t, stack.Stacks[1].OriginName, "test2") + assert.Equal(t, stack.Stacks[2].OriginName, "test3") +} diff --git a/store/caching.go b/store/caching.go index 53b692c..6f1d4d4 100644 --- a/store/caching.go +++ b/store/caching.go @@ -5,6 +5,7 @@ import ( "github.com/lbryio/lbry.go/v2/extras/errors" "github.com/lbryio/lbry.go/v2/stream" + "github.com/lbryio/reflector.go/shared" log "github.com/sirupsen/logrus" "github.com/lbryio/reflector.go/internal/metrics" @@ -43,9 +44,9 @@ func (c *CachingStore) Has(hash string) (bool, error) { // Get tries to get the blob from the cache first, falling back to the origin. If the blob comes // from the origin, it is also stored in the cache. -func (c *CachingStore) Get(hash string) (stream.Blob, error) { +func (c *CachingStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) { start := time.Now() - blob, err := c.cache.Get(hash) + blob, trace, err := c.cache.Get(hash) if err == nil || !errors.Is(err, ErrBlobNotFound) { metrics.CacheHitCount.With(metrics.CacheLabels(c.cache.Name(), c.component)).Inc() rate := float64(len(blob)) / 1024 / 1024 / time.Since(start).Seconds() @@ -54,14 +55,14 @@ func (c *CachingStore) Get(hash string) (stream.Blob, error) { metrics.LabelComponent: c.component, metrics.LabelSource: "cache", }).Set(rate) - return blob, err + return blob, trace.Stack(time.Since(start), c.Name()), err } metrics.CacheMissCount.With(metrics.CacheLabels(c.cache.Name(), c.component)).Inc() - blob, err = c.origin.Get(hash) + blob, trace, err = c.origin.Get(hash) if err != nil { - return nil, err + return nil, trace.Stack(time.Since(start), c.Name()), err } // there is no need to wait for the blob to be stored before we return it // TODO: however this should be refactored to limit the amount of routines that the process can spawn to avoid a possible DoS @@ -71,7 +72,7 @@ func (c *CachingStore) Get(hash string) (stream.Blob, error) { log.Errorf("error saving blob to underlying cache: %s", errors.FullTrace(err)) } }() - return blob, nil + return blob, trace.Stack(time.Since(start), c.Name()), nil } // Put stores the blob in the origin and the cache diff --git a/store/caching_test.go b/store/caching_test.go index 0636583..66a42fd 100644 --- a/store/caching_test.go +++ b/store/caching_test.go @@ -7,6 +7,7 @@ import ( "time" "github.com/lbryio/lbry.go/v2/stream" + "github.com/lbryio/reflector.go/shared" ) func TestCachingStore_Put(t *testing.T) { @@ -51,7 +52,7 @@ func TestCachingStore_CacheMiss(t *testing.T) { t.Fatal(err) } - res, err := s.Get(hash) + res, stack, err := s.Get(hash) if err != nil { t.Fatal(err) } @@ -67,14 +68,16 @@ func TestCachingStore_CacheMiss(t *testing.T) { if !has { t.Errorf("Get() did not copy blob to cache") } + t.Logf("stack: %s", stack.String()) - res, err = cache.Get(hash) + res, stack, err = cache.Get(hash) if err != nil { t.Fatal(err) } if !bytes.Equal(b, res) { t.Errorf("expected cached Get() to return %s, got %s", string(b), string(res)) } + t.Logf("stack: %s", stack.String()) } func TestCachingStore_ThunderingHerd(t *testing.T) { @@ -93,7 +96,7 @@ func TestCachingStore_ThunderingHerd(t *testing.T) { wg := &sync.WaitGroup{} getNoErr := func() { - res, err := s.Get(hash) + res, _, err := s.Get(hash) if err != nil { t.Fatal(err) } @@ -149,7 +152,7 @@ func (s *SlowBlobStore) Has(hash string) (bool, error) { return s.mem.Has(hash) } -func (s *SlowBlobStore) Get(hash string) (stream.Blob, error) { +func (s *SlowBlobStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) { time.Sleep(s.delay) return s.mem.Get(hash) } diff --git a/store/cloudfront_ro.go b/store/cloudfront_ro.go index 3dcffc0..51fe4b8 100644 --- a/store/cloudfront_ro.go +++ b/store/cloudfront_ro.go @@ -6,11 +6,11 @@ import ( "net/http" "time" - "github.com/lbryio/reflector.go/internal/metrics" - "github.com/lbryio/reflector.go/meta" - "github.com/lbryio/lbry.go/v2/extras/errors" "github.com/lbryio/lbry.go/v2/stream" + "github.com/lbryio/reflector.go/internal/metrics" + "github.com/lbryio/reflector.go/meta" + "github.com/lbryio/reflector.go/shared" log "github.com/sirupsen/logrus" ) @@ -49,30 +49,30 @@ func (c *CloudFrontROStore) Has(hash string) (bool, error) { } // Get gets the blob from Cloudfront. -func (c *CloudFrontROStore) Get(hash string) (stream.Blob, error) { +func (c *CloudFrontROStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) { log.Debugf("Getting %s from S3", hash[:8]) + start := time.Now() defer func(t time.Time) { log.Debugf("Getting %s from S3 took %s", hash[:8], time.Since(t).String()) - }(time.Now()) + }(start) status, body, err := c.cfRequest(http.MethodGet, hash) if err != nil { - return nil, err + return nil, shared.NewBlobTrace(time.Since(start), c.Name()), err } defer body.Close() - switch status { case http.StatusNotFound, http.StatusForbidden: - return nil, errors.Err(ErrBlobNotFound) + return nil, shared.NewBlobTrace(time.Since(start), c.Name()), errors.Err(ErrBlobNotFound) case http.StatusOK: b, err := ioutil.ReadAll(body) if err != nil { - return nil, errors.Err(err) + return nil, shared.NewBlobTrace(time.Since(start), c.Name()), errors.Err(err) } metrics.MtrInBytesS3.Add(float64(len(b))) - return b, nil + return b, shared.NewBlobTrace(time.Since(start), c.Name()), nil default: - return nil, errors.Err("unexpected status %d", status) + return nil, shared.NewBlobTrace(time.Since(start), c.Name()), errors.Err("unexpected status %d", status) } } diff --git a/store/cloudfront_rw.go b/store/cloudfront_rw.go index 6de64af..32ee418 100644 --- a/store/cloudfront_rw.go +++ b/store/cloudfront_rw.go @@ -1,7 +1,10 @@ package store import ( + "time" + "github.com/lbryio/lbry.go/v2/stream" + "github.com/lbryio/reflector.go/shared" ) // CloudFrontRWStore combines a Cloudfront and an S3 store. Reads go to Cloudfront, writes go to S3. @@ -30,8 +33,10 @@ func (c *CloudFrontRWStore) Has(hash string) (bool, error) { } // Get gets the blob from Cloudfront. -func (c *CloudFrontRWStore) Get(hash string) (stream.Blob, error) { - return c.cf.Get(hash) +func (c *CloudFrontRWStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) { + start := time.Now() + blob, trace, err := c.cf.Get(hash) + return blob, trace.Stack(time.Since(start), c.Name()), err } // Put stores the blob on S3 diff --git a/store/dbbacked.go b/store/dbbacked.go index 1484c49..90c9fed 100644 --- a/store/dbbacked.go +++ b/store/dbbacked.go @@ -3,10 +3,12 @@ package store import ( "encoding/json" "sync" + "time" "github.com/lbryio/lbry.go/v2/extras/errors" "github.com/lbryio/lbry.go/v2/stream" "github.com/lbryio/reflector.go/db" + "github.com/lbryio/reflector.go/shared" log "github.com/sirupsen/logrus" ) @@ -36,16 +38,17 @@ func (d *DBBackedStore) Has(hash string) (bool, error) { } // Get gets the blob -func (d *DBBackedStore) Get(hash string) (stream.Blob, error) { +func (d *DBBackedStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) { + start := time.Now() has, err := d.db.HasBlob(hash, true) if err != nil { - return nil, err + return nil, shared.NewBlobTrace(time.Since(start), d.Name()), err } if !has { - return nil, ErrBlobNotFound + return nil, shared.NewBlobTrace(time.Since(start), d.Name()), ErrBlobNotFound } - b, err := d.blobs.Get(hash) + b, stack, err := d.blobs.Get(hash) if d.deleteOnMiss && errors.Is(err, ErrBlobNotFound) { e2 := d.Delete(hash) if e2 != nil { @@ -53,7 +56,7 @@ func (d *DBBackedStore) Get(hash string) (stream.Blob, error) { } } - return b, err + return b, stack.Stack(time.Since(start), d.Name()), err } // Put stores the blob in the S3 store and stores the blob information in the DB. diff --git a/store/disk.go b/store/disk.go index 03cc393..412d326 100644 --- a/store/disk.go +++ b/store/disk.go @@ -4,9 +4,11 @@ import ( "io/ioutil" "os" "path" + "time" "github.com/lbryio/lbry.go/v2/extras/errors" "github.com/lbryio/lbry.go/v2/stream" + "github.com/lbryio/reflector.go/shared" "github.com/lbryio/reflector.go/store/speedwalk" ) @@ -52,21 +54,22 @@ func (d *DiskStore) Has(hash string) (bool, error) { } // Get returns the blob or an error if the blob doesn't exist. -func (d *DiskStore) Get(hash string) (stream.Blob, error) { +func (d *DiskStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) { + start := time.Now() err := d.initOnce() if err != nil { - return nil, err + return nil, shared.NewBlobTrace(time.Since(start), d.Name()), err } blob, err := ioutil.ReadFile(d.path(hash)) if err != nil { if os.IsNotExist(err) { - return nil, errors.Err(ErrBlobNotFound) + return nil, shared.NewBlobTrace(time.Since(start), d.Name()), errors.Err(ErrBlobNotFound) } - return nil, errors.Err(err) + return nil, shared.NewBlobTrace(time.Since(start), d.Name()), errors.Err(err) } - return blob, nil + return blob, shared.NewBlobTrace(time.Since(start), d.Name()), nil } // Put stores the blob on disk diff --git a/store/disk_test.go b/store/disk_test.go index 3bc088a..72c7755 100644 --- a/store/disk_test.go +++ b/store/disk_test.go @@ -28,7 +28,7 @@ func TestDiskStore_Get(t *testing.T) { err = ioutil.WriteFile(expectedPath, data, os.ModePerm) require.NoError(t, err) - blob, err := d.Get(hash) + blob, _, err := d.Get(hash) assert.NoError(t, err) assert.EqualValues(t, data, blob) } @@ -39,7 +39,7 @@ func TestDiskStore_GetNonexistentBlob(t *testing.T) { defer os.RemoveAll(tmpDir) d := NewDiskStore(tmpDir, 2) - blob, err := d.Get("nonexistent") + blob, _, err := d.Get("nonexistent") assert.Nil(t, blob) assert.True(t, errors.Is(err, ErrBlobNotFound)) } diff --git a/store/lfuda.go b/store/lfuda.go index 0eb7296..b0437b0 100644 --- a/store/lfuda.go +++ b/store/lfuda.go @@ -1,10 +1,13 @@ package store import ( + "time" + "github.com/bparli/lfuda-go" "github.com/lbryio/lbry.go/v2/extras/errors" "github.com/lbryio/lbry.go/v2/stream" "github.com/lbryio/reflector.go/internal/metrics" + "github.com/lbryio/reflector.go/shared" "github.com/sirupsen/logrus" ) @@ -49,17 +52,18 @@ func (l *LFUDAStore) Has(hash string) (bool, error) { } // Get returns the blob or an error if the blob doesn't exist. -func (l *LFUDAStore) Get(hash string) (stream.Blob, error) { +func (l *LFUDAStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) { + start := time.Now() _, has := l.lfuda.Get(hash) if !has { - return nil, errors.Err(ErrBlobNotFound) + return nil, shared.NewBlobTrace(time.Since(start), l.Name()), errors.Err(ErrBlobNotFound) } - blob, err := l.store.Get(hash) + blob, stack, err := l.store.Get(hash) if errors.Is(err, ErrBlobNotFound) { // Blob disappeared from underlying store l.lfuda.Remove(hash) } - return blob, err + return blob, stack.Stack(time.Since(start), l.Name()), err } // Put stores the blob. Following LFUDA rules it's not guaranteed that a SET will store the value!!! diff --git a/store/lfuda_test.go b/store/lfuda_test.go index d5eae47..d8657ac 100644 --- a/store/lfuda_test.go +++ b/store/lfuda_test.go @@ -40,11 +40,11 @@ func TestFUDAStore_Eviction(t *testing.T) { err = lfuda.Put("two", b) require.NoError(t, err) - _, err = lfuda.Get("five") + _, _, err = lfuda.Get("five") require.NoError(t, err) - _, err = lfuda.Get("four") + _, _, err = lfuda.Get("four") require.NoError(t, err) - _, err = lfuda.Get("two") + _, _, err = lfuda.Get("two") require.NoError(t, err) assert.Equal(t, cacheMaxBlobs, len(mem.Debug())) @@ -102,7 +102,7 @@ func TestFUDAStore_UnderlyingBlobMissing(t *testing.T) { // hash still exists in lru assert.True(t, lfuda.lfuda.Contains(hash)) - blob, err := lfuda.Get(hash) + blob, _, err := lfuda.Get(hash) assert.Nil(t, blob) assert.True(t, errors.Is(err, ErrBlobNotFound), "expected (%s) %s, got (%s) %s", reflect.TypeOf(ErrBlobNotFound).String(), ErrBlobNotFound.Error(), diff --git a/store/lru.go b/store/lru.go index ce0d83d..e96931b 100644 --- a/store/lru.go +++ b/store/lru.go @@ -1,9 +1,12 @@ package store import ( + "time" + "github.com/lbryio/lbry.go/v2/extras/errors" "github.com/lbryio/lbry.go/v2/stream" "github.com/lbryio/reflector.go/internal/metrics" + "github.com/lbryio/reflector.go/shared" golru "github.com/hashicorp/golang-lru" "github.com/sirupsen/logrus" @@ -56,17 +59,18 @@ func (l *LRUStore) Has(hash string) (bool, error) { } // Get returns the blob or an error if the blob doesn't exist. -func (l *LRUStore) Get(hash string) (stream.Blob, error) { +func (l *LRUStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) { + start := time.Now() _, has := l.lru.Get(hash) if !has { - return nil, errors.Err(ErrBlobNotFound) + return nil, shared.NewBlobTrace(time.Since(start), l.Name()), errors.Err(ErrBlobNotFound) } - blob, err := l.store.Get(hash) + blob, stack, err := l.store.Get(hash) if errors.Is(err, ErrBlobNotFound) { // Blob disappeared from underlying store l.lru.Remove(hash) } - return blob, err + return blob, stack.Stack(time.Since(start), l.Name()), err } // Put stores the blob diff --git a/store/lru_test.go b/store/lru_test.go index 2bd934c..6aec768 100644 --- a/store/lru_test.go +++ b/store/lru_test.go @@ -89,7 +89,7 @@ func TestLRUStore_UnderlyingBlobMissing(t *testing.T) { // hash still exists in lru assert.True(t, lru.lru.Contains(hash)) - blob, err := lru.Get(hash) + blob, _, err := lru.Get(hash) assert.Nil(t, blob) assert.True(t, errors.Is(err, ErrBlobNotFound), "expected (%s) %s, got (%s) %s", reflect.TypeOf(ErrBlobNotFound).String(), ErrBlobNotFound.Error(), diff --git a/store/memory.go b/store/memory.go index 62b93b6..30d8dea 100644 --- a/store/memory.go +++ b/store/memory.go @@ -2,9 +2,11 @@ package store import ( "sync" + "time" "github.com/lbryio/lbry.go/v2/extras/errors" "github.com/lbryio/lbry.go/v2/stream" + "github.com/lbryio/reflector.go/shared" ) // MemStore is an in memory only blob store with no persistence. @@ -34,14 +36,15 @@ func (m *MemStore) Has(hash string) (bool, error) { } // Get returns the blob byte slice if present and errors if the blob is not found. -func (m *MemStore) Get(hash string) (stream.Blob, error) { +func (m *MemStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) { + start := time.Now() m.mu.RLock() defer m.mu.RUnlock() blob, ok := m.blobs[hash] if !ok { - return nil, errors.Err(ErrBlobNotFound) + return nil, shared.NewBlobTrace(time.Since(start), m.Name()), errors.Err(ErrBlobNotFound) } - return blob, nil + return blob, shared.NewBlobTrace(time.Since(start), m.Name()), nil } // Put stores the blob in memory diff --git a/store/memory_test.go b/store/memory_test.go index 8d85114..775850b 100644 --- a/store/memory_test.go +++ b/store/memory_test.go @@ -25,7 +25,7 @@ func TestMemStore_Get(t *testing.T) { t.Error("error getting memory blob - ", err) } - gotBlob, err := s.Get(hash) + gotBlob, _, err := s.Get(hash) if err != nil { t.Errorf("Expected no error, got %v", err) } @@ -33,7 +33,7 @@ func TestMemStore_Get(t *testing.T) { t.Error("Got blob that is different from expected blob") } - missingBlob, err := s.Get("nonexistent hash") + missingBlob, _, err := s.Get("nonexistent hash") if err == nil { t.Errorf("Expected ErrBlobNotFound, got nil") } diff --git a/store/noop.go b/store/noop.go index 2792a0d..9cadf5c 100644 --- a/store/noop.go +++ b/store/noop.go @@ -1,15 +1,22 @@ package store -import "github.com/lbryio/lbry.go/v2/stream" +import ( + "time" + + "github.com/lbryio/lbry.go/v2/stream" + "github.com/lbryio/reflector.go/shared" +) // NoopStore is a store that does nothing type NoopStore struct{} const nameNoop = "noop" -func (n *NoopStore) Name() string { return nameNoop } -func (n *NoopStore) Has(_ string) (bool, error) { return false, nil } -func (n *NoopStore) Get(_ string) (stream.Blob, error) { return nil, nil } +func (n *NoopStore) Name() string { return nameNoop } +func (n *NoopStore) Has(_ string) (bool, error) { return false, nil } +func (n *NoopStore) Get(_ string) (stream.Blob, shared.BlobTrace, error) { + return nil, shared.NewBlobTrace(time.Since(time.Now()), n.Name()), nil +} func (n *NoopStore) Put(_ string, _ stream.Blob) error { return nil } func (n *NoopStore) PutSD(_ string, _ stream.Blob) error { return nil } func (n *NoopStore) Delete(_ string) error { return nil } diff --git a/store/s3.go b/store/s3.go index 02a23c5..75d66cc 100644 --- a/store/s3.go +++ b/store/s3.go @@ -8,6 +8,7 @@ import ( "github.com/lbryio/lbry.go/v2/extras/errors" "github.com/lbryio/lbry.go/v2/stream" "github.com/lbryio/reflector.go/internal/metrics" + "github.com/lbryio/reflector.go/shared" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" @@ -65,17 +66,18 @@ func (s *S3Store) Has(hash string) (bool, error) { } // Get returns the blob slice if present or errors on S3. -func (s *S3Store) Get(hash string) (stream.Blob, error) { +func (s *S3Store) Get(hash string) (stream.Blob, shared.BlobTrace, error) { + start := time.Now() //Todo-Need to handle error for blob doesn't exist for consistency. err := s.initOnce() if err != nil { - return nil, err + return nil, shared.NewBlobTrace(time.Since(start), s.Name()), err } log.Debugf("Getting %s from S3", hash[:8]) defer func(t time.Time) { log.Debugf("Getting %s from S3 took %s", hash[:8], time.Since(t).String()) - }(time.Now()) + }(start) buf := &aws.WriteAtBuffer{} _, err = s3manager.NewDownloader(s.session).Download(buf, &s3.GetObjectInput{ @@ -86,15 +88,15 @@ func (s *S3Store) Get(hash string) (stream.Blob, error) { if aerr, ok := err.(awserr.Error); ok { switch aerr.Code() { case s3.ErrCodeNoSuchBucket: - return nil, errors.Err("bucket %s does not exist", s.bucket) + return nil, shared.NewBlobTrace(time.Since(start), s.Name()), errors.Err("bucket %s does not exist", s.bucket) case s3.ErrCodeNoSuchKey: - return nil, errors.Err(ErrBlobNotFound) + return nil, shared.NewBlobTrace(time.Since(start), s.Name()), errors.Err(ErrBlobNotFound) } } - return buf.Bytes(), err + return buf.Bytes(), shared.NewBlobTrace(time.Since(start), s.Name()), err } - return buf.Bytes(), nil + return buf.Bytes(), shared.NewBlobTrace(time.Since(start), s.Name()), nil } // Put stores the blob on S3 or errors if S3 connection errors. diff --git a/store/singleflight.go b/store/singleflight.go index 7d0139d..362182d 100644 --- a/store/singleflight.go +++ b/store/singleflight.go @@ -4,6 +4,7 @@ import ( "time" "github.com/lbryio/reflector.go/internal/metrics" + "github.com/lbryio/reflector.go/shared" "github.com/lbryio/lbry.go/v2/stream" @@ -29,17 +30,24 @@ func (s *singleflightStore) Name() string { return "sf_" + s.BlobStore.Name() } +type getterResponse struct { + blob stream.Blob + stack shared.BlobTrace +} + // Get ensures that only one request per hash is sent to the origin at a time, // thereby protecting against https://en.wikipedia.org/wiki/Thundering_herd_problem -func (s *singleflightStore) Get(hash string) (stream.Blob, error) { +func (s *singleflightStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) { + start := time.Now() metrics.CacheWaitingRequestsCount.With(metrics.CacheLabels(s.Name(), s.component)).Inc() defer metrics.CacheWaitingRequestsCount.With(metrics.CacheLabels(s.Name(), s.component)).Dec() - blob, err, _ := s.sf.Do(hash, s.getter(hash)) + gr, err, _ := s.sf.Do(hash, s.getter(hash)) if err != nil { - return nil, err + return nil, shared.NewBlobTrace(time.Since(start), s.Name()), err } - return blob.(stream.Blob), nil + rsp := gr.(getterResponse) + return rsp.blob, rsp.stack, nil } // getter returns a function that gets a blob from the origin @@ -50,9 +58,12 @@ func (s *singleflightStore) getter(hash string) func() (interface{}, error) { defer metrics.CacheOriginRequestsCount.With(metrics.CacheLabels(s.Name(), s.component)).Dec() start := time.Now() - blob, err := s.BlobStore.Get(hash) + blob, stack, err := s.BlobStore.Get(hash) if err != nil { - return nil, err + return getterResponse{ + blob: nil, + stack: stack.Stack(time.Since(start), s.Name()), + }, err } rate := float64(len(blob)) / 1024 / 1024 / time.Since(start).Seconds() @@ -62,7 +73,10 @@ func (s *singleflightStore) getter(hash string) func() (interface{}, error) { metrics.LabelSource: "origin", }).Set(rate) - return blob, nil + return getterResponse{ + blob: blob, + stack: stack.Stack(time.Since(start), s.Name()), + }, nil } } diff --git a/store/store.go b/store/store.go index 36d388d..bd9223b 100644 --- a/store/store.go +++ b/store/store.go @@ -3,6 +3,7 @@ package store import ( "github.com/lbryio/lbry.go/v2/extras/errors" "github.com/lbryio/lbry.go/v2/stream" + "github.com/lbryio/reflector.go/shared" ) // BlobStore is an interface for handling blob storage. @@ -12,7 +13,7 @@ type BlobStore interface { // Does blob exist in the store. Has(hash string) (bool, error) // Get the blob from the store. Must return ErrBlobNotFound if blob is not in store. - Get(hash string) (stream.Blob, error) + Get(hash string) (stream.Blob, shared.BlobTrace, error) // Put the blob into the store. Put(hash string, blob stream.Blob) error // Put an SD blob into the store.