Ittt #52
26 changed files with 317 additions and 104 deletions
|
@ -41,7 +41,7 @@ func getStreamCmd(cmd *cobra.Command, args []string) {
|
||||||
|
|
||||||
var sd stream.SDBlob
|
var sd stream.SDBlob
|
||||||
|
|
||||||
sdb, err := s.Get(sdHash)
|
sdb, _, err := s.Get(sdHash)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@ -62,7 +62,7 @@ func getStreamCmd(cmd *cobra.Command, args []string) {
|
||||||
}
|
}
|
||||||
|
|
||||||
for i := 0; i < len(sd.BlobInfos)-1; i++ {
|
for i := 0; i < len(sd.BlobInfos)-1; i++ {
|
||||||
b, err := s.Get(hex.EncodeToString(sd.BlobInfos[i].BlobHash))
|
b, _, err := s.Get(hex.EncodeToString(sd.BlobInfos[i].BlobHash))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,6 +9,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/lbryio/reflector.go/internal/metrics"
|
"github.com/lbryio/reflector.go/internal/metrics"
|
||||||
|
"github.com/lbryio/reflector.go/shared"
|
||||||
"github.com/lbryio/reflector.go/store"
|
"github.com/lbryio/reflector.go/store"
|
||||||
|
|
||||||
"github.com/lbryio/lbry.go/v2/extras/errors"
|
"github.com/lbryio/lbry.go/v2/extras/errors"
|
||||||
|
@ -57,10 +58,11 @@ func (c *Client) GetStream(sdHash string, blobCache store.BlobStore) (stream.Str
|
||||||
|
|
||||||
var sd stream.SDBlob
|
var sd stream.SDBlob
|
||||||
|
|
||||||
b, err := c.GetBlob(sdHash)
|
b, trace, err := c.GetBlob(sdHash)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
log.Debug(trace.String())
|
||||||
|
|
||||||
err = sd.FromBlob(b)
|
err = sd.FromBlob(b)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -71,10 +73,11 @@ func (c *Client) GetStream(sdHash string, blobCache store.BlobStore) (stream.Str
|
||||||
s[0] = b
|
s[0] = b
|
||||||
|
|
||||||
for i := 0; i < len(sd.BlobInfos)-1; i++ {
|
for i := 0; i < len(sd.BlobInfos)-1; i++ {
|
||||||
s[i+1], err = c.GetBlob(hex.EncodeToString(sd.BlobInfos[i].BlobHash))
|
s[i+1], trace, err = c.GetBlob(hex.EncodeToString(sd.BlobInfos[i].BlobHash))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
log.Debug(trace.String())
|
||||||
}
|
}
|
||||||
|
|
||||||
return s, nil
|
return s, nil
|
||||||
|
@ -114,47 +117,52 @@ func (c *Client) HasBlob(hash string) (bool, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetBlob gets a blob
|
// GetBlob gets a blob
|
||||||
func (c *Client) GetBlob(hash string) (stream.Blob, error) {
|
func (c *Client) GetBlob(hash string) (stream.Blob, shared.BlobTrace, error) {
|
||||||
|
start := time.Now()
|
||||||
if !c.connected {
|
if !c.connected {
|
||||||
return nil, errors.Err("not connected")
|
return nil, shared.NewBlobTrace(time.Since(start), "tcp"), errors.Err("not connected")
|
||||||
}
|
}
|
||||||
|
|
||||||
sendRequest, err := json.Marshal(blobRequest{
|
sendRequest, err := json.Marshal(blobRequest{
|
||||||
RequestedBlob: hash,
|
RequestedBlob: hash,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, shared.NewBlobTrace(time.Since(start), "tcp"), err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = c.write(sendRequest)
|
err = c.write(sendRequest)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, shared.NewBlobTrace(time.Since(start), "tcp"), err
|
||||||
}
|
}
|
||||||
|
|
||||||
var resp blobResponse
|
var resp blobResponse
|
||||||
err = c.read(&resp)
|
err = c.read(&resp)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, shared.NewBlobTrace(time.Since(start), "tcp"), err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
trace := shared.NewBlobTrace(time.Since(start), "tcp")
|
||||||
|
if resp.RequestTrace != nil {
|
||||||
|
trace = *resp.RequestTrace
|
||||||
|
}
|
||||||
if resp.IncomingBlob.Error != "" {
|
if resp.IncomingBlob.Error != "" {
|
||||||
return nil, errors.Prefix(hash[:8], resp.IncomingBlob.Error)
|
return nil, trace, errors.Prefix(hash[:8], resp.IncomingBlob.Error)
|
||||||
}
|
}
|
||||||
if resp.IncomingBlob.BlobHash != hash {
|
if resp.IncomingBlob.BlobHash != hash {
|
||||||
return nil, errors.Prefix(hash[:8], "blob hash in response does not match requested hash")
|
return nil, trace.Stack(time.Since(start), "tcp"), errors.Prefix(hash[:8], "blob hash in response does not match requested hash")
|
||||||
}
|
}
|
||||||
if resp.IncomingBlob.Length <= 0 {
|
if resp.IncomingBlob.Length <= 0 {
|
||||||
return nil, errors.Prefix(hash[:8], "length reported as <= 0")
|
return nil, trace, errors.Prefix(hash[:8], "length reported as <= 0")
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Debugf("receiving blob %s from %s", hash[:8], c.conn.RemoteAddr())
|
log.Debugf("receiving blob %s from %s", hash[:8], c.conn.RemoteAddr())
|
||||||
|
|
||||||
blob, err := c.readRawBlob(resp.IncomingBlob.Length)
|
blob, err := c.readRawBlob(resp.IncomingBlob.Length)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, (*resp.RequestTrace).Stack(time.Since(start), "tcp"), err
|
||||||
}
|
}
|
||||||
metrics.MtrInBytesTcp.Add(float64(len(blob)))
|
metrics.MtrInBytesTcp.Add(float64(len(blob)))
|
||||||
return blob, nil
|
return blob, trace.Stack(time.Since(start), "tcp"), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) read(v interface{}) error {
|
func (c *Client) read(v interface{}) error {
|
||||||
|
|
|
@ -9,12 +9,14 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/lbryio/lbry.go/v2/extras/errors"
|
|
||||||
"github.com/lbryio/lbry.go/v2/stream"
|
|
||||||
"github.com/lbryio/reflector.go/internal/metrics"
|
"github.com/lbryio/reflector.go/internal/metrics"
|
||||||
|
"github.com/lbryio/reflector.go/shared"
|
||||||
"github.com/lbryio/reflector.go/store"
|
"github.com/lbryio/reflector.go/store"
|
||||||
|
|
||||||
|
"github.com/lbryio/lbry.go/v2/extras/errors"
|
||||||
|
"github.com/lbryio/lbry.go/v2/stream"
|
||||||
"github.com/lucas-clemente/quic-go/http3"
|
"github.com/lucas-clemente/quic-go/http3"
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Client is an instance of a client connected to a server.
|
// Client is an instance of a client connected to a server.
|
||||||
|
@ -35,7 +37,7 @@ func (c *Client) Close() error {
|
||||||
func (c *Client) GetStream(sdHash string, blobCache store.BlobStore) (stream.Stream, error) {
|
func (c *Client) GetStream(sdHash string, blobCache store.BlobStore) (stream.Stream, error) {
|
||||||
var sd stream.SDBlob
|
var sd stream.SDBlob
|
||||||
|
|
||||||
b, err := c.GetBlob(sdHash)
|
b, _, err := c.GetBlob(sdHash)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -49,10 +51,12 @@ func (c *Client) GetStream(sdHash string, blobCache store.BlobStore) (stream.Str
|
||||||
s[0] = b
|
s[0] = b
|
||||||
|
|
||||||
for i := 0; i < len(sd.BlobInfos)-1; i++ {
|
for i := 0; i < len(sd.BlobInfos)-1; i++ {
|
||||||
s[i+1], err = c.GetBlob(hex.EncodeToString(sd.BlobInfos[i].BlobHash))
|
var trace shared.BlobTrace
|
||||||
|
s[i+1], trace, err = c.GetBlob(hex.EncodeToString(sd.BlobInfos[i].BlobHash))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
log.Debug(trace.String())
|
||||||
}
|
}
|
||||||
|
|
||||||
return s, nil
|
return s, nil
|
||||||
|
@ -75,26 +79,35 @@ func (c *Client) HasBlob(hash string) (bool, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetBlob gets a blob
|
// GetBlob gets a blob
|
||||||
func (c *Client) GetBlob(hash string) (stream.Blob, error) {
|
func (c *Client) GetBlob(hash string) (stream.Blob, shared.BlobTrace, error) {
|
||||||
resp, err := c.conn.Get(fmt.Sprintf("https://%s/get/%s", c.ServerAddr, hash))
|
start := time.Now()
|
||||||
|
resp, err := c.conn.Get(fmt.Sprintf("https://%s/get/%s?trace=true", c.ServerAddr, hash))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.Err(err)
|
return nil, shared.NewBlobTrace(time.Since(start), "http3"), errors.Err(err)
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
|
|
||||||
if resp.StatusCode == http.StatusNotFound {
|
if resp.StatusCode == http.StatusNotFound {
|
||||||
fmt.Printf("%s blob not found %d\n", hash, resp.StatusCode)
|
fmt.Printf("%s blob not found %d\n", hash, resp.StatusCode)
|
||||||
return nil, errors.Err(store.ErrBlobNotFound)
|
return nil, shared.NewBlobTrace(time.Since(start), "http3"), errors.Err(store.ErrBlobNotFound)
|
||||||
} else if resp.StatusCode != http.StatusOK {
|
} else if resp.StatusCode != http.StatusOK {
|
||||||
return nil, errors.Err("non 200 status code returned: %d", resp.StatusCode)
|
return nil, shared.NewBlobTrace(time.Since(start), "http3"), errors.Err("non 200 status code returned: %d", resp.StatusCode)
|
||||||
}
|
}
|
||||||
|
|
||||||
tmp := getBuffer()
|
tmp := getBuffer()
|
||||||
defer putBuffer(tmp)
|
defer putBuffer(tmp)
|
||||||
|
serialized := resp.Header.Get("Via")
|
||||||
|
trace := shared.NewBlobTrace(time.Since(start), "http3")
|
||||||
|
if serialized != "" {
|
||||||
|
parsedTrace, err := shared.Deserialize(serialized)
|
||||||
|
if err != nil {
|
||||||
|
return nil, shared.NewBlobTrace(time.Since(start), "http3"), err
|
||||||
|
}
|
||||||
|
trace = *parsedTrace
|
||||||
|
}
|
||||||
written, err := io.Copy(tmp, resp.Body)
|
written, err := io.Copy(tmp, resp.Body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.Err(err)
|
return nil, trace.Stack(time.Since(start), "http3"), errors.Err(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
blob := make([]byte, written)
|
blob := make([]byte, written)
|
||||||
|
@ -102,7 +115,7 @@ func (c *Client) GetBlob(hash string) (stream.Blob, error) {
|
||||||
|
|
||||||
metrics.MtrInBytesUdp.Add(float64(len(blob)))
|
metrics.MtrInBytesUdp.Add(float64(len(blob)))
|
||||||
|
|
||||||
return blob, nil
|
return blob, trace.Stack(time.Since(start), "http3"), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// buffer pool to reduce GC
|
// buffer pool to reduce GC
|
||||||
|
|
|
@ -10,6 +10,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"math/big"
|
"math/big"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/lbryio/reflector.go/internal/metrics"
|
"github.com/lbryio/reflector.go/internal/metrics"
|
||||||
|
@ -71,7 +72,26 @@ func (s *Server) Start(address string) error {
|
||||||
r.HandleFunc("/get/{hash}", func(w http.ResponseWriter, r *http.Request) {
|
r.HandleFunc("/get/{hash}", func(w http.ResponseWriter, r *http.Request) {
|
||||||
vars := mux.Vars(r)
|
vars := mux.Vars(r)
|
||||||
requestedBlob := vars["hash"]
|
requestedBlob := vars["hash"]
|
||||||
blob, err := s.store.Get(requestedBlob)
|
traceParam := r.URL.Query().Get("trace")
|
||||||
|
var err error
|
||||||
|
wantsTrace := false
|
||||||
|
if traceParam != "" {
|
||||||
|
wantsTrace, err = strconv.ParseBool(traceParam)
|
||||||
|
if err != nil {
|
||||||
|
wantsTrace = false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
blob, trace, err := s.store.Get(requestedBlob)
|
||||||
|
|
||||||
|
if wantsTrace {
|
||||||
|
serialized, err := trace.Serialize()
|
||||||
|
if err != nil {
|
||||||
|
http.Error(w, err.Error(), http.StatusNotFound)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
w.Header().Add("Via", serialized)
|
||||||
|
log.Debug(trace.String())
|
||||||
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.Is(err, store.ErrBlobNotFound) {
|
if errors.Is(err, store.ErrBlobNotFound) {
|
||||||
http.Error(w, err.Error(), http.StatusNotFound)
|
http.Error(w, err.Error(), http.StatusNotFound)
|
||||||
|
|
|
@ -8,6 +8,7 @@ import (
|
||||||
|
|
||||||
"github.com/lbryio/lbry.go/v2/extras/errors"
|
"github.com/lbryio/lbry.go/v2/extras/errors"
|
||||||
"github.com/lbryio/lbry.go/v2/stream"
|
"github.com/lbryio/lbry.go/v2/stream"
|
||||||
|
"github.com/lbryio/reflector.go/shared"
|
||||||
"github.com/lucas-clemente/quic-go"
|
"github.com/lucas-clemente/quic-go"
|
||||||
"github.com/lucas-clemente/quic-go/http3"
|
"github.com/lucas-clemente/quic-go/http3"
|
||||||
)
|
)
|
||||||
|
@ -68,10 +69,11 @@ func (p *Store) Has(hash string) (bool, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get downloads the blob from the peer
|
// Get downloads the blob from the peer
|
||||||
func (p *Store) Get(hash string) (stream.Blob, error) {
|
func (p *Store) Get(hash string) (stream.Blob, shared.BlobTrace, error) {
|
||||||
|
start := time.Now()
|
||||||
c, err := p.getClient()
|
c, err := p.getClient()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, shared.NewBlobTrace(time.Since(start), p.Name()), err
|
||||||
}
|
}
|
||||||
defer c.Close()
|
defer c.Close()
|
||||||
return c.GetBlob(hash)
|
return c.GetBlob(hash)
|
||||||
|
|
|
@ -12,6 +12,7 @@ import (
|
||||||
|
|
||||||
"github.com/lbryio/reflector.go/internal/metrics"
|
"github.com/lbryio/reflector.go/internal/metrics"
|
||||||
"github.com/lbryio/reflector.go/reflector"
|
"github.com/lbryio/reflector.go/reflector"
|
||||||
|
"github.com/lbryio/reflector.go/shared"
|
||||||
"github.com/lbryio/reflector.go/store"
|
"github.com/lbryio/reflector.go/store"
|
||||||
|
|
||||||
"github.com/lbryio/lbry.go/v2/extras/errors"
|
"github.com/lbryio/lbry.go/v2/extras/errors"
|
||||||
|
@ -253,6 +254,7 @@ func (s *Server) handleCompositeRequest(data []byte) ([]byte, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
var blob []byte
|
var blob []byte
|
||||||
|
var trace shared.BlobTrace
|
||||||
if request.RequestedBlob != "" {
|
if request.RequestedBlob != "" {
|
||||||
if len(request.RequestedBlob) != stream.BlobHashHexLength {
|
if len(request.RequestedBlob) != stream.BlobHashHexLength {
|
||||||
return nil, errors.Err("Invalid blob hash length")
|
return nil, errors.Err("Invalid blob hash length")
|
||||||
|
@ -260,7 +262,8 @@ func (s *Server) handleCompositeRequest(data []byte) ([]byte, error) {
|
||||||
|
|
||||||
log.Debugln("Sending blob " + request.RequestedBlob[:8])
|
log.Debugln("Sending blob " + request.RequestedBlob[:8])
|
||||||
|
|
||||||
blob, err = s.store.Get(request.RequestedBlob)
|
blob, trace, err = s.store.Get(request.RequestedBlob)
|
||||||
|
log.Debug(trace.String())
|
||||||
if errors.Is(err, store.ErrBlobNotFound) {
|
if errors.Is(err, store.ErrBlobNotFound) {
|
||||||
response.IncomingBlob = incomingBlob{
|
response.IncomingBlob = incomingBlob{
|
||||||
Error: err.Error(),
|
Error: err.Error(),
|
||||||
|
@ -382,6 +385,7 @@ type incomingBlob struct {
|
||||||
}
|
}
|
||||||
type blobResponse struct {
|
type blobResponse struct {
|
||||||
IncomingBlob incomingBlob `json:"incoming_blob"`
|
IncomingBlob incomingBlob `json:"incoming_blob"`
|
||||||
|
RequestTrace *shared.BlobTrace
|
||||||
}
|
}
|
||||||
|
|
||||||
type compositeRequest struct {
|
type compositeRequest struct {
|
||||||
|
|
|
@ -5,6 +5,7 @@ import (
|
||||||
|
|
||||||
"github.com/lbryio/lbry.go/v2/extras/errors"
|
"github.com/lbryio/lbry.go/v2/extras/errors"
|
||||||
"github.com/lbryio/lbry.go/v2/stream"
|
"github.com/lbryio/lbry.go/v2/stream"
|
||||||
|
"github.com/lbryio/reflector.go/shared"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Store is a blob store that gets blobs from a peer.
|
// Store is a blob store that gets blobs from a peer.
|
||||||
|
@ -43,10 +44,11 @@ func (p *Store) Has(hash string) (bool, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get downloads the blob from the peer
|
// Get downloads the blob from the peer
|
||||||
func (p *Store) Get(hash string) (stream.Blob, error) {
|
func (p *Store) Get(hash string) (stream.Blob, shared.BlobTrace, error) {
|
||||||
|
start := time.Now()
|
||||||
c, err := p.getClient()
|
c, err := p.getClient()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, shared.NewBlobTrace(time.Since(start), p.Name()), err
|
||||||
}
|
}
|
||||||
defer c.Close()
|
defer c.Close()
|
||||||
return c.GetBlob(hash)
|
return c.GetBlob(hash)
|
||||||
|
|
82
shared/shared.go
Normal file
82
shared/shared.go
Normal file
|
@ -0,0 +1,82 @@
|
||||||
|
package shared
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/lbryio/lbry.go/v2/extras/errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
type BlobStack struct {
|
||||||
|
Timing time.Duration `json:"timing"`
|
||||||
|
OriginName string `json:"origin_name"`
|
||||||
|
HostName string `json:"host_name"`
|
||||||
|
}
|
||||||
|
type BlobTrace struct {
|
||||||
|
Stacks []BlobStack `json:"stacks"`
|
||||||
|
}
|
||||||
|
|
||||||
|
var hostName *string
|
||||||
|
|
||||||
|
func getHostName() string {
|
||||||
|
if hostName == nil {
|
||||||
|
hn, err := os.Hostname()
|
||||||
|
if err != nil {
|
||||||
|
hn = "unknown"
|
||||||
|
}
|
||||||
|
hostName = &hn
|
||||||
|
}
|
||||||
|
return *hostName
|
||||||
|
}
|
||||||
|
func (b *BlobTrace) Stack(timing time.Duration, originName string) BlobTrace {
|
||||||
|
b.Stacks = append(b.Stacks, BlobStack{
|
||||||
|
Timing: timing,
|
||||||
|
OriginName: originName,
|
||||||
|
HostName: getHostName(),
|
||||||
|
})
|
||||||
|
return *b
|
||||||
|
}
|
||||||
|
func (b *BlobTrace) Merge(otherTrance BlobTrace) BlobTrace {
|
||||||
|
b.Stacks = append(b.Stacks, otherTrance.Stacks...)
|
||||||
|
return *b
|
||||||
|
}
|
||||||
|
func NewBlobTrace(timing time.Duration, originName string) BlobTrace {
|
||||||
|
b := BlobTrace{}
|
||||||
|
b.Stacks = append(b.Stacks, BlobStack{
|
||||||
|
Timing: timing,
|
||||||
|
OriginName: originName,
|
||||||
|
HostName: getHostName(),
|
||||||
|
})
|
||||||
|
return b
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b BlobTrace) String() string {
|
||||||
|
var fullTrace string
|
||||||
|
for i, stack := range b.Stacks {
|
||||||
|
delta := time.Duration(0)
|
||||||
|
if i > 0 {
|
||||||
|
delta = stack.Timing - b.Stacks[i-1].Timing
|
||||||
|
}
|
||||||
|
fullTrace += fmt.Sprintf("[%d](%s) origin: %s - timing: %s - delta: %s\n", i, stack.HostName, stack.OriginName, stack.Timing.String(), delta.String())
|
||||||
|
}
|
||||||
|
return fullTrace
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b BlobTrace) Serialize() (string, error) {
|
||||||
|
t, err := json.Marshal(b)
|
||||||
|
if err != nil {
|
||||||
|
return "", errors.Err(err)
|
||||||
|
}
|
||||||
|
return string(t), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func Deserialize(serializedData string) (*BlobTrace, error) {
|
||||||
|
var trace BlobTrace
|
||||||
|
err := json.Unmarshal([]byte(serializedData), &trace)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Err(err)
|
||||||
|
}
|
||||||
|
return &trace, nil
|
||||||
|
}
|
32
shared/shared_test.go
Normal file
32
shared/shared_test.go
Normal file
|
@ -0,0 +1,32 @@
|
||||||
|
package shared
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestBlobTrace_Serialize(t *testing.T) {
|
||||||
|
stack := NewBlobTrace(10*time.Second, "test")
|
||||||
|
stack.Stack(20*time.Second, "test2")
|
||||||
|
stack.Stack(30*time.Second, "test3")
|
||||||
|
serialized, err := stack.Serialize()
|
||||||
|
assert.NoError(t, err)
|
||||||
|
t.Log(serialized)
|
||||||
|
expected := "{\"stacks\":[{\"timing\":10000000000,\"origin_name\":\"test\"},{\"timing\":20000000000,\"origin_name\":\"test2\"},{\"timing\":30000000000,\"origin_name\":\"test3\"}]}"
|
||||||
|
assert.Equal(t, expected, serialized)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBlobTrace_Deserialize(t *testing.T) {
|
||||||
|
serialized := "{\"stacks\":[{\"timing\":10000000000,\"origin_name\":\"test\"},{\"timing\":20000000000,\"origin_name\":\"test2\"},{\"timing\":30000000000,\"origin_name\":\"test3\"}]}"
|
||||||
|
stack, err := Deserialize(serialized)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Len(t, stack.Stacks, 3)
|
||||||
|
assert.Equal(t, stack.Stacks[0].Timing, 10*time.Second)
|
||||||
|
assert.Equal(t, stack.Stacks[1].Timing, 20*time.Second)
|
||||||
|
assert.Equal(t, stack.Stacks[2].Timing, 30*time.Second)
|
||||||
|
assert.Equal(t, stack.Stacks[0].OriginName, "test")
|
||||||
|
assert.Equal(t, stack.Stacks[1].OriginName, "test2")
|
||||||
|
assert.Equal(t, stack.Stacks[2].OriginName, "test3")
|
||||||
|
}
|
|
@ -5,6 +5,7 @@ import (
|
||||||
|
|
||||||
"github.com/lbryio/lbry.go/v2/extras/errors"
|
"github.com/lbryio/lbry.go/v2/extras/errors"
|
||||||
"github.com/lbryio/lbry.go/v2/stream"
|
"github.com/lbryio/lbry.go/v2/stream"
|
||||||
|
"github.com/lbryio/reflector.go/shared"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
|
|
||||||
"github.com/lbryio/reflector.go/internal/metrics"
|
"github.com/lbryio/reflector.go/internal/metrics"
|
||||||
|
@ -43,9 +44,9 @@ func (c *CachingStore) Has(hash string) (bool, error) {
|
||||||
|
|
||||||
// Get tries to get the blob from the cache first, falling back to the origin. If the blob comes
|
// Get tries to get the blob from the cache first, falling back to the origin. If the blob comes
|
||||||
// from the origin, it is also stored in the cache.
|
// from the origin, it is also stored in the cache.
|
||||||
func (c *CachingStore) Get(hash string) (stream.Blob, error) {
|
func (c *CachingStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
blob, err := c.cache.Get(hash)
|
blob, trace, err := c.cache.Get(hash)
|
||||||
if err == nil || !errors.Is(err, ErrBlobNotFound) {
|
if err == nil || !errors.Is(err, ErrBlobNotFound) {
|
||||||
metrics.CacheHitCount.With(metrics.CacheLabels(c.cache.Name(), c.component)).Inc()
|
metrics.CacheHitCount.With(metrics.CacheLabels(c.cache.Name(), c.component)).Inc()
|
||||||
rate := float64(len(blob)) / 1024 / 1024 / time.Since(start).Seconds()
|
rate := float64(len(blob)) / 1024 / 1024 / time.Since(start).Seconds()
|
||||||
|
@ -54,14 +55,14 @@ func (c *CachingStore) Get(hash string) (stream.Blob, error) {
|
||||||
metrics.LabelComponent: c.component,
|
metrics.LabelComponent: c.component,
|
||||||
metrics.LabelSource: "cache",
|
metrics.LabelSource: "cache",
|
||||||
}).Set(rate)
|
}).Set(rate)
|
||||||
return blob, err
|
return blob, trace.Stack(time.Since(start), c.Name()), err
|
||||||
}
|
}
|
||||||
|
|
||||||
metrics.CacheMissCount.With(metrics.CacheLabels(c.cache.Name(), c.component)).Inc()
|
metrics.CacheMissCount.With(metrics.CacheLabels(c.cache.Name(), c.component)).Inc()
|
||||||
|
|
||||||
blob, err = c.origin.Get(hash)
|
blob, trace, err = c.origin.Get(hash)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, trace.Stack(time.Since(start), c.Name()), err
|
||||||
}
|
}
|
||||||
// there is no need to wait for the blob to be stored before we return it
|
// there is no need to wait for the blob to be stored before we return it
|
||||||
// TODO: however this should be refactored to limit the amount of routines that the process can spawn to avoid a possible DoS
|
// TODO: however this should be refactored to limit the amount of routines that the process can spawn to avoid a possible DoS
|
||||||
|
@ -71,7 +72,7 @@ func (c *CachingStore) Get(hash string) (stream.Blob, error) {
|
||||||
log.Errorf("error saving blob to underlying cache: %s", errors.FullTrace(err))
|
log.Errorf("error saving blob to underlying cache: %s", errors.FullTrace(err))
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
return blob, nil
|
return blob, trace.Stack(time.Since(start), c.Name()), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Put stores the blob in the origin and the cache
|
// Put stores the blob in the origin and the cache
|
||||||
|
|
|
@ -7,6 +7,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/lbryio/lbry.go/v2/stream"
|
"github.com/lbryio/lbry.go/v2/stream"
|
||||||
|
"github.com/lbryio/reflector.go/shared"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestCachingStore_Put(t *testing.T) {
|
func TestCachingStore_Put(t *testing.T) {
|
||||||
|
@ -51,7 +52,7 @@ func TestCachingStore_CacheMiss(t *testing.T) {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
res, err := s.Get(hash)
|
res, stack, err := s.Get(hash)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@ -67,14 +68,16 @@ func TestCachingStore_CacheMiss(t *testing.T) {
|
||||||
if !has {
|
if !has {
|
||||||
t.Errorf("Get() did not copy blob to cache")
|
t.Errorf("Get() did not copy blob to cache")
|
||||||
}
|
}
|
||||||
|
t.Logf("stack: %s", stack.String())
|
||||||
|
|
||||||
res, err = cache.Get(hash)
|
res, stack, err = cache.Get(hash)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
if !bytes.Equal(b, res) {
|
if !bytes.Equal(b, res) {
|
||||||
t.Errorf("expected cached Get() to return %s, got %s", string(b), string(res))
|
t.Errorf("expected cached Get() to return %s, got %s", string(b), string(res))
|
||||||
}
|
}
|
||||||
|
t.Logf("stack: %s", stack.String())
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestCachingStore_ThunderingHerd(t *testing.T) {
|
func TestCachingStore_ThunderingHerd(t *testing.T) {
|
||||||
|
@ -93,7 +96,7 @@ func TestCachingStore_ThunderingHerd(t *testing.T) {
|
||||||
wg := &sync.WaitGroup{}
|
wg := &sync.WaitGroup{}
|
||||||
|
|
||||||
getNoErr := func() {
|
getNoErr := func() {
|
||||||
res, err := s.Get(hash)
|
res, _, err := s.Get(hash)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@ -149,7 +152,7 @@ func (s *SlowBlobStore) Has(hash string) (bool, error) {
|
||||||
return s.mem.Has(hash)
|
return s.mem.Has(hash)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SlowBlobStore) Get(hash string) (stream.Blob, error) {
|
func (s *SlowBlobStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) {
|
||||||
time.Sleep(s.delay)
|
time.Sleep(s.delay)
|
||||||
return s.mem.Get(hash)
|
return s.mem.Get(hash)
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,11 +6,11 @@ import (
|
||||||
"net/http"
|
"net/http"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/lbryio/reflector.go/internal/metrics"
|
|
||||||
"github.com/lbryio/reflector.go/meta"
|
|
||||||
|
|
||||||
"github.com/lbryio/lbry.go/v2/extras/errors"
|
"github.com/lbryio/lbry.go/v2/extras/errors"
|
||||||
"github.com/lbryio/lbry.go/v2/stream"
|
"github.com/lbryio/lbry.go/v2/stream"
|
||||||
|
"github.com/lbryio/reflector.go/internal/metrics"
|
||||||
|
"github.com/lbryio/reflector.go/meta"
|
||||||
|
"github.com/lbryio/reflector.go/shared"
|
||||||
|
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
@ -49,30 +49,30 @@ func (c *CloudFrontROStore) Has(hash string) (bool, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get gets the blob from Cloudfront.
|
// Get gets the blob from Cloudfront.
|
||||||
func (c *CloudFrontROStore) Get(hash string) (stream.Blob, error) {
|
func (c *CloudFrontROStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) {
|
||||||
log.Debugf("Getting %s from S3", hash[:8])
|
log.Debugf("Getting %s from S3", hash[:8])
|
||||||
|
start := time.Now()
|
||||||
defer func(t time.Time) {
|
defer func(t time.Time) {
|
||||||
log.Debugf("Getting %s from S3 took %s", hash[:8], time.Since(t).String())
|
log.Debugf("Getting %s from S3 took %s", hash[:8], time.Since(t).String())
|
||||||
}(time.Now())
|
}(start)
|
||||||
|
|
||||||
status, body, err := c.cfRequest(http.MethodGet, hash)
|
status, body, err := c.cfRequest(http.MethodGet, hash)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, shared.NewBlobTrace(time.Since(start), c.Name()), err
|
||||||
}
|
}
|
||||||
defer body.Close()
|
defer body.Close()
|
||||||
|
|
||||||
switch status {
|
switch status {
|
||||||
case http.StatusNotFound, http.StatusForbidden:
|
case http.StatusNotFound, http.StatusForbidden:
|
||||||
return nil, errors.Err(ErrBlobNotFound)
|
return nil, shared.NewBlobTrace(time.Since(start), c.Name()), errors.Err(ErrBlobNotFound)
|
||||||
case http.StatusOK:
|
case http.StatusOK:
|
||||||
b, err := ioutil.ReadAll(body)
|
b, err := ioutil.ReadAll(body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.Err(err)
|
return nil, shared.NewBlobTrace(time.Since(start), c.Name()), errors.Err(err)
|
||||||
}
|
}
|
||||||
metrics.MtrInBytesS3.Add(float64(len(b)))
|
metrics.MtrInBytesS3.Add(float64(len(b)))
|
||||||
return b, nil
|
return b, shared.NewBlobTrace(time.Since(start), c.Name()), nil
|
||||||
default:
|
default:
|
||||||
return nil, errors.Err("unexpected status %d", status)
|
return nil, shared.NewBlobTrace(time.Since(start), c.Name()), errors.Err("unexpected status %d", status)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,7 +1,10 @@
|
||||||
package store
|
package store
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/lbryio/lbry.go/v2/stream"
|
"github.com/lbryio/lbry.go/v2/stream"
|
||||||
|
"github.com/lbryio/reflector.go/shared"
|
||||||
)
|
)
|
||||||
|
|
||||||
// CloudFrontRWStore combines a Cloudfront and an S3 store. Reads go to Cloudfront, writes go to S3.
|
// CloudFrontRWStore combines a Cloudfront and an S3 store. Reads go to Cloudfront, writes go to S3.
|
||||||
|
@ -30,8 +33,10 @@ func (c *CloudFrontRWStore) Has(hash string) (bool, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get gets the blob from Cloudfront.
|
// Get gets the blob from Cloudfront.
|
||||||
func (c *CloudFrontRWStore) Get(hash string) (stream.Blob, error) {
|
func (c *CloudFrontRWStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) {
|
||||||
return c.cf.Get(hash)
|
start := time.Now()
|
||||||
|
blob, trace, err := c.cf.Get(hash)
|
||||||
|
return blob, trace.Stack(time.Since(start), c.Name()), err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Put stores the blob on S3
|
// Put stores the blob on S3
|
||||||
|
|
|
@ -3,10 +3,12 @@ package store
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/lbryio/lbry.go/v2/extras/errors"
|
"github.com/lbryio/lbry.go/v2/extras/errors"
|
||||||
"github.com/lbryio/lbry.go/v2/stream"
|
"github.com/lbryio/lbry.go/v2/stream"
|
||||||
"github.com/lbryio/reflector.go/db"
|
"github.com/lbryio/reflector.go/db"
|
||||||
|
"github.com/lbryio/reflector.go/shared"
|
||||||
|
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
@ -36,16 +38,17 @@ func (d *DBBackedStore) Has(hash string) (bool, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get gets the blob
|
// Get gets the blob
|
||||||
func (d *DBBackedStore) Get(hash string) (stream.Blob, error) {
|
func (d *DBBackedStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) {
|
||||||
|
start := time.Now()
|
||||||
has, err := d.db.HasBlob(hash, true)
|
has, err := d.db.HasBlob(hash, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, shared.NewBlobTrace(time.Since(start), d.Name()), err
|
||||||
}
|
}
|
||||||
if !has {
|
if !has {
|
||||||
return nil, ErrBlobNotFound
|
return nil, shared.NewBlobTrace(time.Since(start), d.Name()), ErrBlobNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
b, err := d.blobs.Get(hash)
|
b, stack, err := d.blobs.Get(hash)
|
||||||
if d.deleteOnMiss && errors.Is(err, ErrBlobNotFound) {
|
if d.deleteOnMiss && errors.Is(err, ErrBlobNotFound) {
|
||||||
e2 := d.Delete(hash)
|
e2 := d.Delete(hash)
|
||||||
if e2 != nil {
|
if e2 != nil {
|
||||||
|
@ -53,7 +56,7 @@ func (d *DBBackedStore) Get(hash string) (stream.Blob, error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return b, err
|
return b, stack.Stack(time.Since(start), d.Name()), err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Put stores the blob in the S3 store and stores the blob information in the DB.
|
// Put stores the blob in the S3 store and stores the blob information in the DB.
|
||||||
|
|
|
@ -4,9 +4,11 @@ import (
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
"path"
|
"path"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/lbryio/lbry.go/v2/extras/errors"
|
"github.com/lbryio/lbry.go/v2/extras/errors"
|
||||||
"github.com/lbryio/lbry.go/v2/stream"
|
"github.com/lbryio/lbry.go/v2/stream"
|
||||||
|
"github.com/lbryio/reflector.go/shared"
|
||||||
"github.com/lbryio/reflector.go/store/speedwalk"
|
"github.com/lbryio/reflector.go/store/speedwalk"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -52,21 +54,22 @@ func (d *DiskStore) Has(hash string) (bool, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get returns the blob or an error if the blob doesn't exist.
|
// Get returns the blob or an error if the blob doesn't exist.
|
||||||
func (d *DiskStore) Get(hash string) (stream.Blob, error) {
|
func (d *DiskStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) {
|
||||||
|
start := time.Now()
|
||||||
err := d.initOnce()
|
err := d.initOnce()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, shared.NewBlobTrace(time.Since(start), d.Name()), err
|
||||||
}
|
}
|
||||||
|
|
||||||
blob, err := ioutil.ReadFile(d.path(hash))
|
blob, err := ioutil.ReadFile(d.path(hash))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if os.IsNotExist(err) {
|
if os.IsNotExist(err) {
|
||||||
return nil, errors.Err(ErrBlobNotFound)
|
return nil, shared.NewBlobTrace(time.Since(start), d.Name()), errors.Err(ErrBlobNotFound)
|
||||||
}
|
}
|
||||||
return nil, errors.Err(err)
|
return nil, shared.NewBlobTrace(time.Since(start), d.Name()), errors.Err(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return blob, nil
|
return blob, shared.NewBlobTrace(time.Since(start), d.Name()), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Put stores the blob on disk
|
// Put stores the blob on disk
|
||||||
|
|
|
@ -28,7 +28,7 @@ func TestDiskStore_Get(t *testing.T) {
|
||||||
err = ioutil.WriteFile(expectedPath, data, os.ModePerm)
|
err = ioutil.WriteFile(expectedPath, data, os.ModePerm)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
blob, err := d.Get(hash)
|
blob, _, err := d.Get(hash)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
assert.EqualValues(t, data, blob)
|
assert.EqualValues(t, data, blob)
|
||||||
}
|
}
|
||||||
|
@ -39,7 +39,7 @@ func TestDiskStore_GetNonexistentBlob(t *testing.T) {
|
||||||
defer os.RemoveAll(tmpDir)
|
defer os.RemoveAll(tmpDir)
|
||||||
d := NewDiskStore(tmpDir, 2)
|
d := NewDiskStore(tmpDir, 2)
|
||||||
|
|
||||||
blob, err := d.Get("nonexistent")
|
blob, _, err := d.Get("nonexistent")
|
||||||
assert.Nil(t, blob)
|
assert.Nil(t, blob)
|
||||||
assert.True(t, errors.Is(err, ErrBlobNotFound))
|
assert.True(t, errors.Is(err, ErrBlobNotFound))
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,10 +1,13 @@
|
||||||
package store
|
package store
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/bparli/lfuda-go"
|
"github.com/bparli/lfuda-go"
|
||||||
"github.com/lbryio/lbry.go/v2/extras/errors"
|
"github.com/lbryio/lbry.go/v2/extras/errors"
|
||||||
"github.com/lbryio/lbry.go/v2/stream"
|
"github.com/lbryio/lbry.go/v2/stream"
|
||||||
"github.com/lbryio/reflector.go/internal/metrics"
|
"github.com/lbryio/reflector.go/internal/metrics"
|
||||||
|
"github.com/lbryio/reflector.go/shared"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -49,17 +52,18 @@ func (l *LFUDAStore) Has(hash string) (bool, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get returns the blob or an error if the blob doesn't exist.
|
// Get returns the blob or an error if the blob doesn't exist.
|
||||||
func (l *LFUDAStore) Get(hash string) (stream.Blob, error) {
|
func (l *LFUDAStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) {
|
||||||
|
start := time.Now()
|
||||||
_, has := l.lfuda.Get(hash)
|
_, has := l.lfuda.Get(hash)
|
||||||
if !has {
|
if !has {
|
||||||
return nil, errors.Err(ErrBlobNotFound)
|
return nil, shared.NewBlobTrace(time.Since(start), l.Name()), errors.Err(ErrBlobNotFound)
|
||||||
}
|
}
|
||||||
blob, err := l.store.Get(hash)
|
blob, stack, err := l.store.Get(hash)
|
||||||
if errors.Is(err, ErrBlobNotFound) {
|
if errors.Is(err, ErrBlobNotFound) {
|
||||||
// Blob disappeared from underlying store
|
// Blob disappeared from underlying store
|
||||||
l.lfuda.Remove(hash)
|
l.lfuda.Remove(hash)
|
||||||
}
|
}
|
||||||
return blob, err
|
return blob, stack.Stack(time.Since(start), l.Name()), err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Put stores the blob. Following LFUDA rules it's not guaranteed that a SET will store the value!!!
|
// Put stores the blob. Following LFUDA rules it's not guaranteed that a SET will store the value!!!
|
||||||
|
|
|
@ -40,11 +40,11 @@ func TestFUDAStore_Eviction(t *testing.T) {
|
||||||
err = lfuda.Put("two", b)
|
err = lfuda.Put("two", b)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
_, err = lfuda.Get("five")
|
_, _, err = lfuda.Get("five")
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
_, err = lfuda.Get("four")
|
_, _, err = lfuda.Get("four")
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
_, err = lfuda.Get("two")
|
_, _, err = lfuda.Get("two")
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
assert.Equal(t, cacheMaxBlobs, len(mem.Debug()))
|
assert.Equal(t, cacheMaxBlobs, len(mem.Debug()))
|
||||||
|
|
||||||
|
@ -102,7 +102,7 @@ func TestFUDAStore_UnderlyingBlobMissing(t *testing.T) {
|
||||||
// hash still exists in lru
|
// hash still exists in lru
|
||||||
assert.True(t, lfuda.lfuda.Contains(hash))
|
assert.True(t, lfuda.lfuda.Contains(hash))
|
||||||
|
|
||||||
blob, err := lfuda.Get(hash)
|
blob, _, err := lfuda.Get(hash)
|
||||||
assert.Nil(t, blob)
|
assert.Nil(t, blob)
|
||||||
assert.True(t, errors.Is(err, ErrBlobNotFound), "expected (%s) %s, got (%s) %s",
|
assert.True(t, errors.Is(err, ErrBlobNotFound), "expected (%s) %s, got (%s) %s",
|
||||||
reflect.TypeOf(ErrBlobNotFound).String(), ErrBlobNotFound.Error(),
|
reflect.TypeOf(ErrBlobNotFound).String(), ErrBlobNotFound.Error(),
|
||||||
|
|
12
store/lru.go
12
store/lru.go
|
@ -1,9 +1,12 @@
|
||||||
package store
|
package store
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/lbryio/lbry.go/v2/extras/errors"
|
"github.com/lbryio/lbry.go/v2/extras/errors"
|
||||||
"github.com/lbryio/lbry.go/v2/stream"
|
"github.com/lbryio/lbry.go/v2/stream"
|
||||||
"github.com/lbryio/reflector.go/internal/metrics"
|
"github.com/lbryio/reflector.go/internal/metrics"
|
||||||
|
"github.com/lbryio/reflector.go/shared"
|
||||||
|
|
||||||
golru "github.com/hashicorp/golang-lru"
|
golru "github.com/hashicorp/golang-lru"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
|
@ -56,17 +59,18 @@ func (l *LRUStore) Has(hash string) (bool, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get returns the blob or an error if the blob doesn't exist.
|
// Get returns the blob or an error if the blob doesn't exist.
|
||||||
func (l *LRUStore) Get(hash string) (stream.Blob, error) {
|
func (l *LRUStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) {
|
||||||
|
start := time.Now()
|
||||||
_, has := l.lru.Get(hash)
|
_, has := l.lru.Get(hash)
|
||||||
if !has {
|
if !has {
|
||||||
return nil, errors.Err(ErrBlobNotFound)
|
return nil, shared.NewBlobTrace(time.Since(start), l.Name()), errors.Err(ErrBlobNotFound)
|
||||||
}
|
}
|
||||||
blob, err := l.store.Get(hash)
|
blob, stack, err := l.store.Get(hash)
|
||||||
if errors.Is(err, ErrBlobNotFound) {
|
if errors.Is(err, ErrBlobNotFound) {
|
||||||
// Blob disappeared from underlying store
|
// Blob disappeared from underlying store
|
||||||
l.lru.Remove(hash)
|
l.lru.Remove(hash)
|
||||||
}
|
}
|
||||||
return blob, err
|
return blob, stack.Stack(time.Since(start), l.Name()), err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Put stores the blob
|
// Put stores the blob
|
||||||
|
|
|
@ -89,7 +89,7 @@ func TestLRUStore_UnderlyingBlobMissing(t *testing.T) {
|
||||||
// hash still exists in lru
|
// hash still exists in lru
|
||||||
assert.True(t, lru.lru.Contains(hash))
|
assert.True(t, lru.lru.Contains(hash))
|
||||||
|
|
||||||
blob, err := lru.Get(hash)
|
blob, _, err := lru.Get(hash)
|
||||||
assert.Nil(t, blob)
|
assert.Nil(t, blob)
|
||||||
assert.True(t, errors.Is(err, ErrBlobNotFound), "expected (%s) %s, got (%s) %s",
|
assert.True(t, errors.Is(err, ErrBlobNotFound), "expected (%s) %s, got (%s) %s",
|
||||||
reflect.TypeOf(ErrBlobNotFound).String(), ErrBlobNotFound.Error(),
|
reflect.TypeOf(ErrBlobNotFound).String(), ErrBlobNotFound.Error(),
|
||||||
|
|
|
@ -2,9 +2,11 @@ package store
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/lbryio/lbry.go/v2/extras/errors"
|
"github.com/lbryio/lbry.go/v2/extras/errors"
|
||||||
"github.com/lbryio/lbry.go/v2/stream"
|
"github.com/lbryio/lbry.go/v2/stream"
|
||||||
|
"github.com/lbryio/reflector.go/shared"
|
||||||
)
|
)
|
||||||
|
|
||||||
// MemStore is an in memory only blob store with no persistence.
|
// MemStore is an in memory only blob store with no persistence.
|
||||||
|
@ -34,14 +36,15 @@ func (m *MemStore) Has(hash string) (bool, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get returns the blob byte slice if present and errors if the blob is not found.
|
// Get returns the blob byte slice if present and errors if the blob is not found.
|
||||||
func (m *MemStore) Get(hash string) (stream.Blob, error) {
|
func (m *MemStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) {
|
||||||
|
start := time.Now()
|
||||||
m.mu.RLock()
|
m.mu.RLock()
|
||||||
defer m.mu.RUnlock()
|
defer m.mu.RUnlock()
|
||||||
blob, ok := m.blobs[hash]
|
blob, ok := m.blobs[hash]
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, errors.Err(ErrBlobNotFound)
|
return nil, shared.NewBlobTrace(time.Since(start), m.Name()), errors.Err(ErrBlobNotFound)
|
||||||
}
|
}
|
||||||
return blob, nil
|
return blob, shared.NewBlobTrace(time.Since(start), m.Name()), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Put stores the blob in memory
|
// Put stores the blob in memory
|
||||||
|
|
|
@ -25,7 +25,7 @@ func TestMemStore_Get(t *testing.T) {
|
||||||
t.Error("error getting memory blob - ", err)
|
t.Error("error getting memory blob - ", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
gotBlob, err := s.Get(hash)
|
gotBlob, _, err := s.Get(hash)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("Expected no error, got %v", err)
|
t.Errorf("Expected no error, got %v", err)
|
||||||
}
|
}
|
||||||
|
@ -33,7 +33,7 @@ func TestMemStore_Get(t *testing.T) {
|
||||||
t.Error("Got blob that is different from expected blob")
|
t.Error("Got blob that is different from expected blob")
|
||||||
}
|
}
|
||||||
|
|
||||||
missingBlob, err := s.Get("nonexistent hash")
|
missingBlob, _, err := s.Get("nonexistent hash")
|
||||||
if err == nil {
|
if err == nil {
|
||||||
t.Errorf("Expected ErrBlobNotFound, got nil")
|
t.Errorf("Expected ErrBlobNotFound, got nil")
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,11 @@
|
||||||
package store
|
package store
|
||||||
|
|
||||||
import "github.com/lbryio/lbry.go/v2/stream"
|
import (
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/lbryio/lbry.go/v2/stream"
|
||||||
|
"github.com/lbryio/reflector.go/shared"
|
||||||
|
)
|
||||||
|
|
||||||
// NoopStore is a store that does nothing
|
// NoopStore is a store that does nothing
|
||||||
type NoopStore struct{}
|
type NoopStore struct{}
|
||||||
|
@ -9,7 +14,9 @@ const nameNoop = "noop"
|
||||||
|
|
||||||
func (n *NoopStore) Name() string { return nameNoop }
|
func (n *NoopStore) Name() string { return nameNoop }
|
||||||
func (n *NoopStore) Has(_ string) (bool, error) { return false, nil }
|
func (n *NoopStore) Has(_ string) (bool, error) { return false, nil }
|
||||||
func (n *NoopStore) Get(_ string) (stream.Blob, error) { return nil, nil }
|
func (n *NoopStore) Get(_ string) (stream.Blob, shared.BlobTrace, error) {
|
||||||
|
return nil, shared.NewBlobTrace(time.Since(time.Now()), n.Name()), nil
|
||||||
|
}
|
||||||
func (n *NoopStore) Put(_ string, _ stream.Blob) error { return nil }
|
func (n *NoopStore) Put(_ string, _ stream.Blob) error { return nil }
|
||||||
func (n *NoopStore) PutSD(_ string, _ stream.Blob) error { return nil }
|
func (n *NoopStore) PutSD(_ string, _ stream.Blob) error { return nil }
|
||||||
func (n *NoopStore) Delete(_ string) error { return nil }
|
func (n *NoopStore) Delete(_ string) error { return nil }
|
||||||
|
|
16
store/s3.go
16
store/s3.go
|
@ -8,6 +8,7 @@ import (
|
||||||
"github.com/lbryio/lbry.go/v2/extras/errors"
|
"github.com/lbryio/lbry.go/v2/extras/errors"
|
||||||
"github.com/lbryio/lbry.go/v2/stream"
|
"github.com/lbryio/lbry.go/v2/stream"
|
||||||
"github.com/lbryio/reflector.go/internal/metrics"
|
"github.com/lbryio/reflector.go/internal/metrics"
|
||||||
|
"github.com/lbryio/reflector.go/shared"
|
||||||
|
|
||||||
"github.com/aws/aws-sdk-go/aws"
|
"github.com/aws/aws-sdk-go/aws"
|
||||||
"github.com/aws/aws-sdk-go/aws/awserr"
|
"github.com/aws/aws-sdk-go/aws/awserr"
|
||||||
|
@ -65,17 +66,18 @@ func (s *S3Store) Has(hash string) (bool, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get returns the blob slice if present or errors on S3.
|
// Get returns the blob slice if present or errors on S3.
|
||||||
func (s *S3Store) Get(hash string) (stream.Blob, error) {
|
func (s *S3Store) Get(hash string) (stream.Blob, shared.BlobTrace, error) {
|
||||||
|
start := time.Now()
|
||||||
//Todo-Need to handle error for blob doesn't exist for consistency.
|
//Todo-Need to handle error for blob doesn't exist for consistency.
|
||||||
err := s.initOnce()
|
err := s.initOnce()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, shared.NewBlobTrace(time.Since(start), s.Name()), err
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Debugf("Getting %s from S3", hash[:8])
|
log.Debugf("Getting %s from S3", hash[:8])
|
||||||
defer func(t time.Time) {
|
defer func(t time.Time) {
|
||||||
log.Debugf("Getting %s from S3 took %s", hash[:8], time.Since(t).String())
|
log.Debugf("Getting %s from S3 took %s", hash[:8], time.Since(t).String())
|
||||||
}(time.Now())
|
}(start)
|
||||||
|
|
||||||
buf := &aws.WriteAtBuffer{}
|
buf := &aws.WriteAtBuffer{}
|
||||||
_, err = s3manager.NewDownloader(s.session).Download(buf, &s3.GetObjectInput{
|
_, err = s3manager.NewDownloader(s.session).Download(buf, &s3.GetObjectInput{
|
||||||
|
@ -86,15 +88,15 @@ func (s *S3Store) Get(hash string) (stream.Blob, error) {
|
||||||
if aerr, ok := err.(awserr.Error); ok {
|
if aerr, ok := err.(awserr.Error); ok {
|
||||||
switch aerr.Code() {
|
switch aerr.Code() {
|
||||||
case s3.ErrCodeNoSuchBucket:
|
case s3.ErrCodeNoSuchBucket:
|
||||||
return nil, errors.Err("bucket %s does not exist", s.bucket)
|
return nil, shared.NewBlobTrace(time.Since(start), s.Name()), errors.Err("bucket %s does not exist", s.bucket)
|
||||||
case s3.ErrCodeNoSuchKey:
|
case s3.ErrCodeNoSuchKey:
|
||||||
return nil, errors.Err(ErrBlobNotFound)
|
return nil, shared.NewBlobTrace(time.Since(start), s.Name()), errors.Err(ErrBlobNotFound)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return buf.Bytes(), err
|
return buf.Bytes(), shared.NewBlobTrace(time.Since(start), s.Name()), err
|
||||||
}
|
}
|
||||||
|
|
||||||
return buf.Bytes(), nil
|
return buf.Bytes(), shared.NewBlobTrace(time.Since(start), s.Name()), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Put stores the blob on S3 or errors if S3 connection errors.
|
// Put stores the blob on S3 or errors if S3 connection errors.
|
||||||
|
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/lbryio/reflector.go/internal/metrics"
|
"github.com/lbryio/reflector.go/internal/metrics"
|
||||||
|
"github.com/lbryio/reflector.go/shared"
|
||||||
|
|
||||||
"github.com/lbryio/lbry.go/v2/stream"
|
"github.com/lbryio/lbry.go/v2/stream"
|
||||||
|
|
||||||
|
@ -29,17 +30,24 @@ func (s *singleflightStore) Name() string {
|
||||||
return "sf_" + s.BlobStore.Name()
|
return "sf_" + s.BlobStore.Name()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type getterResponse struct {
|
||||||
|
blob stream.Blob
|
||||||
|
stack shared.BlobTrace
|
||||||
|
}
|
||||||
|
|
||||||
// Get ensures that only one request per hash is sent to the origin at a time,
|
// Get ensures that only one request per hash is sent to the origin at a time,
|
||||||
// thereby protecting against https://en.wikipedia.org/wiki/Thundering_herd_problem
|
// thereby protecting against https://en.wikipedia.org/wiki/Thundering_herd_problem
|
||||||
func (s *singleflightStore) Get(hash string) (stream.Blob, error) {
|
func (s *singleflightStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) {
|
||||||
|
start := time.Now()
|
||||||
metrics.CacheWaitingRequestsCount.With(metrics.CacheLabels(s.Name(), s.component)).Inc()
|
metrics.CacheWaitingRequestsCount.With(metrics.CacheLabels(s.Name(), s.component)).Inc()
|
||||||
defer metrics.CacheWaitingRequestsCount.With(metrics.CacheLabels(s.Name(), s.component)).Dec()
|
defer metrics.CacheWaitingRequestsCount.With(metrics.CacheLabels(s.Name(), s.component)).Dec()
|
||||||
|
|
||||||
blob, err, _ := s.sf.Do(hash, s.getter(hash))
|
gr, err, _ := s.sf.Do(hash, s.getter(hash))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, shared.NewBlobTrace(time.Since(start), s.Name()), err
|
||||||
}
|
}
|
||||||
return blob.(stream.Blob), nil
|
rsp := gr.(getterResponse)
|
||||||
|
return rsp.blob, rsp.stack, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// getter returns a function that gets a blob from the origin
|
// getter returns a function that gets a blob from the origin
|
||||||
|
@ -50,9 +58,12 @@ func (s *singleflightStore) getter(hash string) func() (interface{}, error) {
|
||||||
defer metrics.CacheOriginRequestsCount.With(metrics.CacheLabels(s.Name(), s.component)).Dec()
|
defer metrics.CacheOriginRequestsCount.With(metrics.CacheLabels(s.Name(), s.component)).Dec()
|
||||||
|
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
blob, err := s.BlobStore.Get(hash)
|
blob, stack, err := s.BlobStore.Get(hash)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return getterResponse{
|
||||||
|
blob: nil,
|
||||||
|
stack: stack.Stack(time.Since(start), s.Name()),
|
||||||
|
}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
rate := float64(len(blob)) / 1024 / 1024 / time.Since(start).Seconds()
|
rate := float64(len(blob)) / 1024 / 1024 / time.Since(start).Seconds()
|
||||||
|
@ -62,7 +73,10 @@ func (s *singleflightStore) getter(hash string) func() (interface{}, error) {
|
||||||
metrics.LabelSource: "origin",
|
metrics.LabelSource: "origin",
|
||||||
}).Set(rate)
|
}).Set(rate)
|
||||||
|
|
||||||
return blob, nil
|
return getterResponse{
|
||||||
|
blob: blob,
|
||||||
|
stack: stack.Stack(time.Since(start), s.Name()),
|
||||||
|
}, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -3,6 +3,7 @@ package store
|
||||||
import (
|
import (
|
||||||
"github.com/lbryio/lbry.go/v2/extras/errors"
|
"github.com/lbryio/lbry.go/v2/extras/errors"
|
||||||
"github.com/lbryio/lbry.go/v2/stream"
|
"github.com/lbryio/lbry.go/v2/stream"
|
||||||
|
"github.com/lbryio/reflector.go/shared"
|
||||||
)
|
)
|
||||||
|
|
||||||
// BlobStore is an interface for handling blob storage.
|
// BlobStore is an interface for handling blob storage.
|
||||||
|
@ -12,7 +13,7 @@ type BlobStore interface {
|
||||||
// Does blob exist in the store.
|
// Does blob exist in the store.
|
||||||
Has(hash string) (bool, error)
|
Has(hash string) (bool, error)
|
||||||
// Get the blob from the store. Must return ErrBlobNotFound if blob is not in store.
|
// Get the blob from the store. Must return ErrBlobNotFound if blob is not in store.
|
||||||
Get(hash string) (stream.Blob, error)
|
Get(hash string) (stream.Blob, shared.BlobTrace, error)
|
||||||
// Put the blob into the store.
|
// Put the blob into the store.
|
||||||
Put(hash string, blob stream.Blob) error
|
Put(hash string, blob stream.Blob) error
|
||||||
// Put an SD blob into the store.
|
// Put an SD blob into the store.
|
||||||
|
|
Loading…
Reference in a new issue