when reflecting a sdblob, insert all the stream and intermediate blobs using a transaction #50

Closed
shyba wants to merge 39 commits from insert_under_tx into master
26 changed files with 317 additions and 104 deletions
Showing only changes of commit 6291e33ee1 - Show all commits

View file

@ -41,7 +41,7 @@ func getStreamCmd(cmd *cobra.Command, args []string) {
var sd stream.SDBlob
sdb, err := s.Get(sdHash)
sdb, _, err := s.Get(sdHash)
if err != nil {
log.Fatal(err)
}
@ -62,7 +62,7 @@ func getStreamCmd(cmd *cobra.Command, args []string) {
}
for i := 0; i < len(sd.BlobInfos)-1; i++ {
b, err := s.Get(hex.EncodeToString(sd.BlobInfos[i].BlobHash))
b, _, err := s.Get(hex.EncodeToString(sd.BlobInfos[i].BlobHash))
if err != nil {
log.Fatal(err)
}

View file

@ -9,6 +9,7 @@ import (
"time"
"github.com/lbryio/reflector.go/internal/metrics"
"github.com/lbryio/reflector.go/shared"
"github.com/lbryio/reflector.go/store"
"github.com/lbryio/lbry.go/v2/extras/errors"
@ -57,10 +58,11 @@ func (c *Client) GetStream(sdHash string, blobCache store.BlobStore) (stream.Str
var sd stream.SDBlob
b, err := c.GetBlob(sdHash)
b, trace, err := c.GetBlob(sdHash)
if err != nil {
return nil, err
}
log.Debug(trace.String())
err = sd.FromBlob(b)
if err != nil {
@ -71,10 +73,11 @@ func (c *Client) GetStream(sdHash string, blobCache store.BlobStore) (stream.Str
s[0] = b
for i := 0; i < len(sd.BlobInfos)-1; i++ {
s[i+1], err = c.GetBlob(hex.EncodeToString(sd.BlobInfos[i].BlobHash))
s[i+1], trace, err = c.GetBlob(hex.EncodeToString(sd.BlobInfos[i].BlobHash))
if err != nil {
return nil, err
}
log.Debug(trace.String())
}
return s, nil
@ -114,47 +117,52 @@ func (c *Client) HasBlob(hash string) (bool, error) {
}
// GetBlob gets a blob
func (c *Client) GetBlob(hash string) (stream.Blob, error) {
func (c *Client) GetBlob(hash string) (stream.Blob, shared.BlobTrace, error) {
start := time.Now()
if !c.connected {
return nil, errors.Err("not connected")
return nil, shared.NewBlobTrace(time.Since(start), "tcp"), errors.Err("not connected")
}
sendRequest, err := json.Marshal(blobRequest{
RequestedBlob: hash,
})
if err != nil {
return nil, err
return nil, shared.NewBlobTrace(time.Since(start), "tcp"), err
}
err = c.write(sendRequest)
if err != nil {
return nil, err
return nil, shared.NewBlobTrace(time.Since(start), "tcp"), err
}
var resp blobResponse
err = c.read(&resp)
if err != nil {
return nil, err
return nil, shared.NewBlobTrace(time.Since(start), "tcp"), err
}
trace := shared.NewBlobTrace(time.Since(start), "tcp")
if resp.RequestTrace != nil {
trace = *resp.RequestTrace
}
if resp.IncomingBlob.Error != "" {
return nil, errors.Prefix(hash[:8], resp.IncomingBlob.Error)
return nil, trace, errors.Prefix(hash[:8], resp.IncomingBlob.Error)
}
if resp.IncomingBlob.BlobHash != hash {
return nil, errors.Prefix(hash[:8], "blob hash in response does not match requested hash")
return nil, trace.Stack(time.Since(start), "tcp"), errors.Prefix(hash[:8], "blob hash in response does not match requested hash")
}
if resp.IncomingBlob.Length <= 0 {
return nil, errors.Prefix(hash[:8], "length reported as <= 0")
return nil, trace, errors.Prefix(hash[:8], "length reported as <= 0")
}
log.Debugf("receiving blob %s from %s", hash[:8], c.conn.RemoteAddr())
blob, err := c.readRawBlob(resp.IncomingBlob.Length)
if err != nil {
return nil, err
return nil, (*resp.RequestTrace).Stack(time.Since(start), "tcp"), err
}
metrics.MtrInBytesTcp.Add(float64(len(blob)))
return blob, nil
return blob, trace.Stack(time.Since(start), "tcp"), nil
}
func (c *Client) read(v interface{}) error {

View file

@ -9,12 +9,14 @@ import (
"sync"
"time"
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/stream"
"github.com/lbryio/reflector.go/internal/metrics"
"github.com/lbryio/reflector.go/shared"
"github.com/lbryio/reflector.go/store"
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/stream"
"github.com/lucas-clemente/quic-go/http3"
log "github.com/sirupsen/logrus"
)
// Client is an instance of a client connected to a server.
@ -35,7 +37,7 @@ func (c *Client) Close() error {
func (c *Client) GetStream(sdHash string, blobCache store.BlobStore) (stream.Stream, error) {
var sd stream.SDBlob
b, err := c.GetBlob(sdHash)
b, _, err := c.GetBlob(sdHash)
if err != nil {
return nil, err
}
@ -49,10 +51,12 @@ func (c *Client) GetStream(sdHash string, blobCache store.BlobStore) (stream.Str
s[0] = b
for i := 0; i < len(sd.BlobInfos)-1; i++ {
s[i+1], err = c.GetBlob(hex.EncodeToString(sd.BlobInfos[i].BlobHash))
var trace shared.BlobTrace
s[i+1], trace, err = c.GetBlob(hex.EncodeToString(sd.BlobInfos[i].BlobHash))
if err != nil {
return nil, err
}
log.Debug(trace.String())
}
return s, nil
@ -75,26 +79,35 @@ func (c *Client) HasBlob(hash string) (bool, error) {
}
// GetBlob gets a blob
func (c *Client) GetBlob(hash string) (stream.Blob, error) {
resp, err := c.conn.Get(fmt.Sprintf("https://%s/get/%s", c.ServerAddr, hash))
func (c *Client) GetBlob(hash string) (stream.Blob, shared.BlobTrace, error) {
start := time.Now()
resp, err := c.conn.Get(fmt.Sprintf("https://%s/get/%s?trace=true", c.ServerAddr, hash))
if err != nil {
return nil, errors.Err(err)
return nil, shared.NewBlobTrace(time.Since(start), "http3"), errors.Err(err)
}
defer resp.Body.Close()
if resp.StatusCode == http.StatusNotFound {
fmt.Printf("%s blob not found %d\n", hash, resp.StatusCode)
return nil, errors.Err(store.ErrBlobNotFound)
return nil, shared.NewBlobTrace(time.Since(start), "http3"), errors.Err(store.ErrBlobNotFound)
} else if resp.StatusCode != http.StatusOK {
return nil, errors.Err("non 200 status code returned: %d", resp.StatusCode)
return nil, shared.NewBlobTrace(time.Since(start), "http3"), errors.Err("non 200 status code returned: %d", resp.StatusCode)
}
tmp := getBuffer()
defer putBuffer(tmp)
serialized := resp.Header.Get("Via")
trace := shared.NewBlobTrace(time.Since(start), "http3")
if serialized != "" {
parsedTrace, err := shared.Deserialize(serialized)
if err != nil {
return nil, shared.NewBlobTrace(time.Since(start), "http3"), err
}
trace = *parsedTrace
}
written, err := io.Copy(tmp, resp.Body)
if err != nil {
return nil, errors.Err(err)
return nil, trace.Stack(time.Since(start), "http3"), errors.Err(err)
}
blob := make([]byte, written)
@ -102,7 +115,7 @@ func (c *Client) GetBlob(hash string) (stream.Blob, error) {
metrics.MtrInBytesUdp.Add(float64(len(blob)))
return blob, nil
return blob, trace.Stack(time.Since(start), "http3"), nil
}
// buffer pool to reduce GC

View file

@ -10,6 +10,7 @@ import (
"fmt"
"math/big"
"net/http"
"strconv"
"time"
"github.com/lbryio/reflector.go/internal/metrics"
@ -71,7 +72,26 @@ func (s *Server) Start(address string) error {
r.HandleFunc("/get/{hash}", func(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
requestedBlob := vars["hash"]
blob, err := s.store.Get(requestedBlob)
traceParam := r.URL.Query().Get("trace")
var err error
wantsTrace := false
if traceParam != "" {
wantsTrace, err = strconv.ParseBool(traceParam)
if err != nil {
wantsTrace = false
}
}
blob, trace, err := s.store.Get(requestedBlob)
if wantsTrace {
serialized, err := trace.Serialize()
if err != nil {
http.Error(w, err.Error(), http.StatusNotFound)
return
}
w.Header().Add("Via", serialized)
log.Debug(trace.String())
}
if err != nil {
if errors.Is(err, store.ErrBlobNotFound) {
http.Error(w, err.Error(), http.StatusNotFound)

View file

@ -8,6 +8,7 @@ import (
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/stream"
"github.com/lbryio/reflector.go/shared"
"github.com/lucas-clemente/quic-go"
"github.com/lucas-clemente/quic-go/http3"
)
@ -68,10 +69,11 @@ func (p *Store) Has(hash string) (bool, error) {
}
// Get downloads the blob from the peer
func (p *Store) Get(hash string) (stream.Blob, error) {
func (p *Store) Get(hash string) (stream.Blob, shared.BlobTrace, error) {
start := time.Now()
c, err := p.getClient()
if err != nil {
return nil, err
return nil, shared.NewBlobTrace(time.Since(start), p.Name()), err
}
defer c.Close()
return c.GetBlob(hash)

View file

@ -12,6 +12,7 @@ import (
"github.com/lbryio/reflector.go/internal/metrics"
"github.com/lbryio/reflector.go/reflector"
"github.com/lbryio/reflector.go/shared"
"github.com/lbryio/reflector.go/store"
"github.com/lbryio/lbry.go/v2/extras/errors"
@ -253,6 +254,7 @@ func (s *Server) handleCompositeRequest(data []byte) ([]byte, error) {
}
var blob []byte
var trace shared.BlobTrace
if request.RequestedBlob != "" {
if len(request.RequestedBlob) != stream.BlobHashHexLength {
return nil, errors.Err("Invalid blob hash length")
@ -260,7 +262,8 @@ func (s *Server) handleCompositeRequest(data []byte) ([]byte, error) {
log.Debugln("Sending blob " + request.RequestedBlob[:8])
blob, err = s.store.Get(request.RequestedBlob)
blob, trace, err = s.store.Get(request.RequestedBlob)
log.Debug(trace.String())
if errors.Is(err, store.ErrBlobNotFound) {
response.IncomingBlob = incomingBlob{
Error: err.Error(),
@ -382,6 +385,7 @@ type incomingBlob struct {
}
type blobResponse struct {
IncomingBlob incomingBlob `json:"incoming_blob"`
RequestTrace *shared.BlobTrace
}
type compositeRequest struct {

View file

@ -5,6 +5,7 @@ import (
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/stream"
"github.com/lbryio/reflector.go/shared"
)
// Store is a blob store that gets blobs from a peer.
@ -43,10 +44,11 @@ func (p *Store) Has(hash string) (bool, error) {
}
// Get downloads the blob from the peer
func (p *Store) Get(hash string) (stream.Blob, error) {
func (p *Store) Get(hash string) (stream.Blob, shared.BlobTrace, error) {
start := time.Now()
c, err := p.getClient()
if err != nil {
return nil, err
return nil, shared.NewBlobTrace(time.Since(start), p.Name()), err
}
defer c.Close()
return c.GetBlob(hash)

82
shared/shared.go Normal file
View file

@ -0,0 +1,82 @@
package shared
import (
"encoding/json"
"fmt"
"os"
"time"
"github.com/lbryio/lbry.go/v2/extras/errors"
)
type BlobStack struct {
Timing time.Duration `json:"timing"`
OriginName string `json:"origin_name"`
HostName string `json:"host_name"`
}
type BlobTrace struct {
Stacks []BlobStack `json:"stacks"`
}
var hostName *string
func getHostName() string {
if hostName == nil {
hn, err := os.Hostname()
if err != nil {
hn = "unknown"
}
hostName = &hn
}
return *hostName
}
func (b *BlobTrace) Stack(timing time.Duration, originName string) BlobTrace {
b.Stacks = append(b.Stacks, BlobStack{
Timing: timing,
OriginName: originName,
HostName: getHostName(),
})
return *b
}
func (b *BlobTrace) Merge(otherTrance BlobTrace) BlobTrace {
b.Stacks = append(b.Stacks, otherTrance.Stacks...)
return *b
}
func NewBlobTrace(timing time.Duration, originName string) BlobTrace {
b := BlobTrace{}
b.Stacks = append(b.Stacks, BlobStack{
Timing: timing,
OriginName: originName,
HostName: getHostName(),
})
return b
}
func (b BlobTrace) String() string {
var fullTrace string
for i, stack := range b.Stacks {
delta := time.Duration(0)
if i > 0 {
delta = stack.Timing - b.Stacks[i-1].Timing
}
fullTrace += fmt.Sprintf("[%d](%s) origin: %s - timing: %s - delta: %s\n", i, stack.HostName, stack.OriginName, stack.Timing.String(), delta.String())
}
return fullTrace
}
func (b BlobTrace) Serialize() (string, error) {
t, err := json.Marshal(b)
if err != nil {
return "", errors.Err(err)
}
return string(t), nil
}
func Deserialize(serializedData string) (*BlobTrace, error) {
var trace BlobTrace
err := json.Unmarshal([]byte(serializedData), &trace)
if err != nil {
return nil, errors.Err(err)
}
return &trace, nil
}

32
shared/shared_test.go Normal file
View file

@ -0,0 +1,32 @@
package shared
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func TestBlobTrace_Serialize(t *testing.T) {
stack := NewBlobTrace(10*time.Second, "test")
stack.Stack(20*time.Second, "test2")
stack.Stack(30*time.Second, "test3")
serialized, err := stack.Serialize()
assert.NoError(t, err)
t.Log(serialized)
expected := "{\"stacks\":[{\"timing\":10000000000,\"origin_name\":\"test\"},{\"timing\":20000000000,\"origin_name\":\"test2\"},{\"timing\":30000000000,\"origin_name\":\"test3\"}]}"
assert.Equal(t, expected, serialized)
}
func TestBlobTrace_Deserialize(t *testing.T) {
serialized := "{\"stacks\":[{\"timing\":10000000000,\"origin_name\":\"test\"},{\"timing\":20000000000,\"origin_name\":\"test2\"},{\"timing\":30000000000,\"origin_name\":\"test3\"}]}"
stack, err := Deserialize(serialized)
assert.NoError(t, err)
assert.Len(t, stack.Stacks, 3)
assert.Equal(t, stack.Stacks[0].Timing, 10*time.Second)
assert.Equal(t, stack.Stacks[1].Timing, 20*time.Second)
assert.Equal(t, stack.Stacks[2].Timing, 30*time.Second)
assert.Equal(t, stack.Stacks[0].OriginName, "test")
assert.Equal(t, stack.Stacks[1].OriginName, "test2")
assert.Equal(t, stack.Stacks[2].OriginName, "test3")
}

View file

@ -5,6 +5,7 @@ import (
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/stream"
"github.com/lbryio/reflector.go/shared"
log "github.com/sirupsen/logrus"
"github.com/lbryio/reflector.go/internal/metrics"
@ -43,9 +44,9 @@ func (c *CachingStore) Has(hash string) (bool, error) {
// Get tries to get the blob from the cache first, falling back to the origin. If the blob comes
// from the origin, it is also stored in the cache.
func (c *CachingStore) Get(hash string) (stream.Blob, error) {
func (c *CachingStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) {
start := time.Now()
blob, err := c.cache.Get(hash)
blob, trace, err := c.cache.Get(hash)
if err == nil || !errors.Is(err, ErrBlobNotFound) {
metrics.CacheHitCount.With(metrics.CacheLabels(c.cache.Name(), c.component)).Inc()
rate := float64(len(blob)) / 1024 / 1024 / time.Since(start).Seconds()
@ -54,14 +55,14 @@ func (c *CachingStore) Get(hash string) (stream.Blob, error) {
metrics.LabelComponent: c.component,
metrics.LabelSource: "cache",
}).Set(rate)
return blob, err
return blob, trace.Stack(time.Since(start), c.Name()), err
}
metrics.CacheMissCount.With(metrics.CacheLabels(c.cache.Name(), c.component)).Inc()
blob, err = c.origin.Get(hash)
blob, trace, err = c.origin.Get(hash)
if err != nil {
return nil, err
return nil, trace.Stack(time.Since(start), c.Name()), err
}
// there is no need to wait for the blob to be stored before we return it
// TODO: however this should be refactored to limit the amount of routines that the process can spawn to avoid a possible DoS
@ -71,7 +72,7 @@ func (c *CachingStore) Get(hash string) (stream.Blob, error) {
log.Errorf("error saving blob to underlying cache: %s", errors.FullTrace(err))
}
}()
return blob, nil
return blob, trace.Stack(time.Since(start), c.Name()), nil
}
// Put stores the blob in the origin and the cache

View file

@ -7,6 +7,7 @@ import (
"time"
"github.com/lbryio/lbry.go/v2/stream"
"github.com/lbryio/reflector.go/shared"
)
func TestCachingStore_Put(t *testing.T) {
@ -51,7 +52,7 @@ func TestCachingStore_CacheMiss(t *testing.T) {
t.Fatal(err)
}
res, err := s.Get(hash)
res, stack, err := s.Get(hash)
if err != nil {
t.Fatal(err)
}
@ -67,14 +68,16 @@ func TestCachingStore_CacheMiss(t *testing.T) {
if !has {
t.Errorf("Get() did not copy blob to cache")
}
t.Logf("stack: %s", stack.String())
res, err = cache.Get(hash)
res, stack, err = cache.Get(hash)
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(b, res) {
t.Errorf("expected cached Get() to return %s, got %s", string(b), string(res))
}
t.Logf("stack: %s", stack.String())
}
func TestCachingStore_ThunderingHerd(t *testing.T) {
@ -93,7 +96,7 @@ func TestCachingStore_ThunderingHerd(t *testing.T) {
wg := &sync.WaitGroup{}
getNoErr := func() {
res, err := s.Get(hash)
res, _, err := s.Get(hash)
if err != nil {
t.Fatal(err)
}
@ -149,7 +152,7 @@ func (s *SlowBlobStore) Has(hash string) (bool, error) {
return s.mem.Has(hash)
}
func (s *SlowBlobStore) Get(hash string) (stream.Blob, error) {
func (s *SlowBlobStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) {
time.Sleep(s.delay)
return s.mem.Get(hash)
}

View file

@ -6,11 +6,11 @@ import (
"net/http"
"time"
"github.com/lbryio/reflector.go/internal/metrics"
"github.com/lbryio/reflector.go/meta"
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/stream"
"github.com/lbryio/reflector.go/internal/metrics"
"github.com/lbryio/reflector.go/meta"
"github.com/lbryio/reflector.go/shared"
log "github.com/sirupsen/logrus"
)
@ -49,30 +49,30 @@ func (c *CloudFrontROStore) Has(hash string) (bool, error) {
}
// Get gets the blob from Cloudfront.
func (c *CloudFrontROStore) Get(hash string) (stream.Blob, error) {
func (c *CloudFrontROStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) {
log.Debugf("Getting %s from S3", hash[:8])
start := time.Now()
defer func(t time.Time) {
log.Debugf("Getting %s from S3 took %s", hash[:8], time.Since(t).String())
}(time.Now())
}(start)
status, body, err := c.cfRequest(http.MethodGet, hash)
if err != nil {
return nil, err
return nil, shared.NewBlobTrace(time.Since(start), c.Name()), err
}
defer body.Close()
switch status {
case http.StatusNotFound, http.StatusForbidden:
return nil, errors.Err(ErrBlobNotFound)
return nil, shared.NewBlobTrace(time.Since(start), c.Name()), errors.Err(ErrBlobNotFound)
case http.StatusOK:
b, err := ioutil.ReadAll(body)
if err != nil {
return nil, errors.Err(err)
return nil, shared.NewBlobTrace(time.Since(start), c.Name()), errors.Err(err)
}
metrics.MtrInBytesS3.Add(float64(len(b)))
return b, nil
return b, shared.NewBlobTrace(time.Since(start), c.Name()), nil
default:
return nil, errors.Err("unexpected status %d", status)
return nil, shared.NewBlobTrace(time.Since(start), c.Name()), errors.Err("unexpected status %d", status)
}
}

View file

@ -1,7 +1,10 @@
package store
import (
"time"
"github.com/lbryio/lbry.go/v2/stream"
"github.com/lbryio/reflector.go/shared"
)
// CloudFrontRWStore combines a Cloudfront and an S3 store. Reads go to Cloudfront, writes go to S3.
@ -30,8 +33,10 @@ func (c *CloudFrontRWStore) Has(hash string) (bool, error) {
}
// Get gets the blob from Cloudfront.
func (c *CloudFrontRWStore) Get(hash string) (stream.Blob, error) {
return c.cf.Get(hash)
func (c *CloudFrontRWStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) {
start := time.Now()
blob, trace, err := c.cf.Get(hash)
return blob, trace.Stack(time.Since(start), c.Name()), err
}
// Put stores the blob on S3

View file

@ -3,10 +3,12 @@ package store
import (
"encoding/json"
"sync"
"time"
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/stream"
"github.com/lbryio/reflector.go/db"
"github.com/lbryio/reflector.go/shared"
log "github.com/sirupsen/logrus"
)
@ -36,16 +38,17 @@ func (d *DBBackedStore) Has(hash string) (bool, error) {
}
// Get gets the blob
func (d *DBBackedStore) Get(hash string) (stream.Blob, error) {
func (d *DBBackedStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) {
start := time.Now()
has, err := d.db.HasBlob(hash, true)
if err != nil {
return nil, err
return nil, shared.NewBlobTrace(time.Since(start), d.Name()), err
}
if !has {
return nil, ErrBlobNotFound
return nil, shared.NewBlobTrace(time.Since(start), d.Name()), ErrBlobNotFound
}
b, err := d.blobs.Get(hash)
b, stack, err := d.blobs.Get(hash)
if d.deleteOnMiss && errors.Is(err, ErrBlobNotFound) {
e2 := d.Delete(hash)
if e2 != nil {
@ -53,7 +56,7 @@ func (d *DBBackedStore) Get(hash string) (stream.Blob, error) {
}
}
return b, err
return b, stack.Stack(time.Since(start), d.Name()), err
}
// Put stores the blob in the S3 store and stores the blob information in the DB.

View file

@ -4,9 +4,11 @@ import (
"io/ioutil"
"os"
"path"
"time"
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/stream"
"github.com/lbryio/reflector.go/shared"
"github.com/lbryio/reflector.go/store/speedwalk"
)
@ -52,21 +54,22 @@ func (d *DiskStore) Has(hash string) (bool, error) {
}
// Get returns the blob or an error if the blob doesn't exist.
func (d *DiskStore) Get(hash string) (stream.Blob, error) {
func (d *DiskStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) {
start := time.Now()
err := d.initOnce()
if err != nil {
return nil, err
return nil, shared.NewBlobTrace(time.Since(start), d.Name()), err
}
blob, err := ioutil.ReadFile(d.path(hash))
if err != nil {
if os.IsNotExist(err) {
return nil, errors.Err(ErrBlobNotFound)
return nil, shared.NewBlobTrace(time.Since(start), d.Name()), errors.Err(ErrBlobNotFound)
}
return nil, errors.Err(err)
return nil, shared.NewBlobTrace(time.Since(start), d.Name()), errors.Err(err)
}
return blob, nil
return blob, shared.NewBlobTrace(time.Since(start), d.Name()), nil
}
// Put stores the blob on disk

View file

@ -28,7 +28,7 @@ func TestDiskStore_Get(t *testing.T) {
err = ioutil.WriteFile(expectedPath, data, os.ModePerm)
require.NoError(t, err)
blob, err := d.Get(hash)
blob, _, err := d.Get(hash)
assert.NoError(t, err)
assert.EqualValues(t, data, blob)
}
@ -39,7 +39,7 @@ func TestDiskStore_GetNonexistentBlob(t *testing.T) {
defer os.RemoveAll(tmpDir)
d := NewDiskStore(tmpDir, 2)
blob, err := d.Get("nonexistent")
blob, _, err := d.Get("nonexistent")
assert.Nil(t, blob)
assert.True(t, errors.Is(err, ErrBlobNotFound))
}

View file

@ -1,10 +1,13 @@
package store
import (
"time"
"github.com/bparli/lfuda-go"
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/stream"
"github.com/lbryio/reflector.go/internal/metrics"
"github.com/lbryio/reflector.go/shared"
"github.com/sirupsen/logrus"
)
@ -49,17 +52,18 @@ func (l *LFUDAStore) Has(hash string) (bool, error) {
}
// Get returns the blob or an error if the blob doesn't exist.
func (l *LFUDAStore) Get(hash string) (stream.Blob, error) {
func (l *LFUDAStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) {
start := time.Now()
_, has := l.lfuda.Get(hash)
if !has {
return nil, errors.Err(ErrBlobNotFound)
return nil, shared.NewBlobTrace(time.Since(start), l.Name()), errors.Err(ErrBlobNotFound)
}
blob, err := l.store.Get(hash)
blob, stack, err := l.store.Get(hash)
if errors.Is(err, ErrBlobNotFound) {
// Blob disappeared from underlying store
l.lfuda.Remove(hash)
}
return blob, err
return blob, stack.Stack(time.Since(start), l.Name()), err
}
// Put stores the blob. Following LFUDA rules it's not guaranteed that a SET will store the value!!!

View file

@ -40,11 +40,11 @@ func TestFUDAStore_Eviction(t *testing.T) {
err = lfuda.Put("two", b)
require.NoError(t, err)
_, err = lfuda.Get("five")
_, _, err = lfuda.Get("five")
require.NoError(t, err)
_, err = lfuda.Get("four")
_, _, err = lfuda.Get("four")
require.NoError(t, err)
_, err = lfuda.Get("two")
_, _, err = lfuda.Get("two")
require.NoError(t, err)
assert.Equal(t, cacheMaxBlobs, len(mem.Debug()))
@ -102,7 +102,7 @@ func TestFUDAStore_UnderlyingBlobMissing(t *testing.T) {
// hash still exists in lru
assert.True(t, lfuda.lfuda.Contains(hash))
blob, err := lfuda.Get(hash)
blob, _, err := lfuda.Get(hash)
assert.Nil(t, blob)
assert.True(t, errors.Is(err, ErrBlobNotFound), "expected (%s) %s, got (%s) %s",
reflect.TypeOf(ErrBlobNotFound).String(), ErrBlobNotFound.Error(),

View file

@ -1,9 +1,12 @@
package store
import (
"time"
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/stream"
"github.com/lbryio/reflector.go/internal/metrics"
"github.com/lbryio/reflector.go/shared"
golru "github.com/hashicorp/golang-lru"
"github.com/sirupsen/logrus"
@ -56,17 +59,18 @@ func (l *LRUStore) Has(hash string) (bool, error) {
}
// Get returns the blob or an error if the blob doesn't exist.
func (l *LRUStore) Get(hash string) (stream.Blob, error) {
func (l *LRUStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) {
start := time.Now()
_, has := l.lru.Get(hash)
if !has {
return nil, errors.Err(ErrBlobNotFound)
return nil, shared.NewBlobTrace(time.Since(start), l.Name()), errors.Err(ErrBlobNotFound)
}
blob, err := l.store.Get(hash)
blob, stack, err := l.store.Get(hash)
if errors.Is(err, ErrBlobNotFound) {
// Blob disappeared from underlying store
l.lru.Remove(hash)
}
return blob, err
return blob, stack.Stack(time.Since(start), l.Name()), err
}
// Put stores the blob

View file

@ -89,7 +89,7 @@ func TestLRUStore_UnderlyingBlobMissing(t *testing.T) {
// hash still exists in lru
assert.True(t, lru.lru.Contains(hash))
blob, err := lru.Get(hash)
blob, _, err := lru.Get(hash)
assert.Nil(t, blob)
assert.True(t, errors.Is(err, ErrBlobNotFound), "expected (%s) %s, got (%s) %s",
reflect.TypeOf(ErrBlobNotFound).String(), ErrBlobNotFound.Error(),

View file

@ -2,9 +2,11 @@ package store
import (
"sync"
"time"
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/stream"
"github.com/lbryio/reflector.go/shared"
)
// MemStore is an in memory only blob store with no persistence.
@ -34,14 +36,15 @@ func (m *MemStore) Has(hash string) (bool, error) {
}
// Get returns the blob byte slice if present and errors if the blob is not found.
func (m *MemStore) Get(hash string) (stream.Blob, error) {
func (m *MemStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) {
start := time.Now()
m.mu.RLock()
defer m.mu.RUnlock()
blob, ok := m.blobs[hash]
if !ok {
return nil, errors.Err(ErrBlobNotFound)
return nil, shared.NewBlobTrace(time.Since(start), m.Name()), errors.Err(ErrBlobNotFound)
}
return blob, nil
return blob, shared.NewBlobTrace(time.Since(start), m.Name()), nil
}
// Put stores the blob in memory

View file

@ -25,7 +25,7 @@ func TestMemStore_Get(t *testing.T) {
t.Error("error getting memory blob - ", err)
}
gotBlob, err := s.Get(hash)
gotBlob, _, err := s.Get(hash)
if err != nil {
t.Errorf("Expected no error, got %v", err)
}
@ -33,7 +33,7 @@ func TestMemStore_Get(t *testing.T) {
t.Error("Got blob that is different from expected blob")
}
missingBlob, err := s.Get("nonexistent hash")
missingBlob, _, err := s.Get("nonexistent hash")
if err == nil {
t.Errorf("Expected ErrBlobNotFound, got nil")
}

View file

@ -1,15 +1,22 @@
package store
import "github.com/lbryio/lbry.go/v2/stream"
import (
"time"
"github.com/lbryio/lbry.go/v2/stream"
"github.com/lbryio/reflector.go/shared"
)
// NoopStore is a store that does nothing
type NoopStore struct{}
const nameNoop = "noop"
func (n *NoopStore) Name() string { return nameNoop }
func (n *NoopStore) Has(_ string) (bool, error) { return false, nil }
func (n *NoopStore) Get(_ string) (stream.Blob, error) { return nil, nil }
func (n *NoopStore) Name() string { return nameNoop }
func (n *NoopStore) Has(_ string) (bool, error) { return false, nil }
func (n *NoopStore) Get(_ string) (stream.Blob, shared.BlobTrace, error) {
return nil, shared.NewBlobTrace(time.Since(time.Now()), n.Name()), nil
}
func (n *NoopStore) Put(_ string, _ stream.Blob) error { return nil }
func (n *NoopStore) PutSD(_ string, _ stream.Blob) error { return nil }
func (n *NoopStore) Delete(_ string) error { return nil }

View file

@ -8,6 +8,7 @@ import (
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/stream"
"github.com/lbryio/reflector.go/internal/metrics"
"github.com/lbryio/reflector.go/shared"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
@ -65,17 +66,18 @@ func (s *S3Store) Has(hash string) (bool, error) {
}
// Get returns the blob slice if present or errors on S3.
func (s *S3Store) Get(hash string) (stream.Blob, error) {
func (s *S3Store) Get(hash string) (stream.Blob, shared.BlobTrace, error) {
start := time.Now()
//Todo-Need to handle error for blob doesn't exist for consistency.
err := s.initOnce()
if err != nil {
return nil, err
return nil, shared.NewBlobTrace(time.Since(start), s.Name()), err
}
log.Debugf("Getting %s from S3", hash[:8])
defer func(t time.Time) {
log.Debugf("Getting %s from S3 took %s", hash[:8], time.Since(t).String())
}(time.Now())
}(start)
buf := &aws.WriteAtBuffer{}
_, err = s3manager.NewDownloader(s.session).Download(buf, &s3.GetObjectInput{
@ -86,15 +88,15 @@ func (s *S3Store) Get(hash string) (stream.Blob, error) {
if aerr, ok := err.(awserr.Error); ok {
switch aerr.Code() {
case s3.ErrCodeNoSuchBucket:
return nil, errors.Err("bucket %s does not exist", s.bucket)
return nil, shared.NewBlobTrace(time.Since(start), s.Name()), errors.Err("bucket %s does not exist", s.bucket)
case s3.ErrCodeNoSuchKey:
return nil, errors.Err(ErrBlobNotFound)
return nil, shared.NewBlobTrace(time.Since(start), s.Name()), errors.Err(ErrBlobNotFound)
}
}
return buf.Bytes(), err
return buf.Bytes(), shared.NewBlobTrace(time.Since(start), s.Name()), err
}
return buf.Bytes(), nil
return buf.Bytes(), shared.NewBlobTrace(time.Since(start), s.Name()), nil
}
// Put stores the blob on S3 or errors if S3 connection errors.

View file

@ -4,6 +4,7 @@ import (
"time"
"github.com/lbryio/reflector.go/internal/metrics"
"github.com/lbryio/reflector.go/shared"
"github.com/lbryio/lbry.go/v2/stream"
@ -29,17 +30,24 @@ func (s *singleflightStore) Name() string {
return "sf_" + s.BlobStore.Name()
}
type getterResponse struct {
blob stream.Blob
stack shared.BlobTrace
}
// Get ensures that only one request per hash is sent to the origin at a time,
// thereby protecting against https://en.wikipedia.org/wiki/Thundering_herd_problem
func (s *singleflightStore) Get(hash string) (stream.Blob, error) {
func (s *singleflightStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) {
start := time.Now()
metrics.CacheWaitingRequestsCount.With(metrics.CacheLabels(s.Name(), s.component)).Inc()
defer metrics.CacheWaitingRequestsCount.With(metrics.CacheLabels(s.Name(), s.component)).Dec()
blob, err, _ := s.sf.Do(hash, s.getter(hash))
gr, err, _ := s.sf.Do(hash, s.getter(hash))
if err != nil {
return nil, err
return nil, shared.NewBlobTrace(time.Since(start), s.Name()), err
}
return blob.(stream.Blob), nil
rsp := gr.(getterResponse)
return rsp.blob, rsp.stack, nil
}
// getter returns a function that gets a blob from the origin
@ -50,9 +58,12 @@ func (s *singleflightStore) getter(hash string) func() (interface{}, error) {
defer metrics.CacheOriginRequestsCount.With(metrics.CacheLabels(s.Name(), s.component)).Dec()
start := time.Now()
blob, err := s.BlobStore.Get(hash)
blob, stack, err := s.BlobStore.Get(hash)
if err != nil {
return nil, err
return getterResponse{
blob: nil,
stack: stack.Stack(time.Since(start), s.Name()),
}, err
}
rate := float64(len(blob)) / 1024 / 1024 / time.Since(start).Seconds()
@ -62,7 +73,10 @@ func (s *singleflightStore) getter(hash string) func() (interface{}, error) {
metrics.LabelSource: "origin",
}).Set(rate)
return blob, nil
return getterResponse{
blob: blob,
stack: stack.Stack(time.Since(start), s.Name()),
}, nil
}
}

View file

@ -3,6 +3,7 @@ package store
import (
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/stream"
"github.com/lbryio/reflector.go/shared"
)
// BlobStore is an interface for handling blob storage.
@ -12,7 +13,7 @@ type BlobStore interface {
// Does blob exist in the store.
Has(hash string) (bool, error)
// Get the blob from the store. Must return ErrBlobNotFound if blob is not in store.
Get(hash string) (stream.Blob, error)
Get(hash string) (stream.Blob, shared.BlobTrace, error)
// Put the blob into the store.
Put(hash string, blob stream.Blob) error
// Put an SD blob into the store.