ensure only single origin Get request is in flight per hash. protects against thundering herd. #45

Merged
lyoshenka merged 5 commits from thundering_herd into master 2020-10-15 03:16:35 +02:00
9 changed files with 48 additions and 4 deletions
Showing only changes of commit c3db95a6c1 - Show all commits

View file

@ -148,6 +148,41 @@ var (
Name: "error_total", Name: "error_total",
Help: "Total number of errors", Help: "Total number of errors",
}, []string{labelDirection, labelErrorType}) }, []string{labelDirection, labelErrorType})
MtrInBytesTcp = promauto.NewCounter(prometheus.CounterOpts{
Namespace: ns,
Name: "tcp_in_bytes",
Help: "Total number of bytes downloaded through TCP",
})
MtrOutBytesTcp = promauto.NewCounter(prometheus.CounterOpts{
Namespace: ns,
Name: "tcp_out_bytes",
Help: "Total number of bytes streamed out through TCP",
})
MtrInBytesUdp = promauto.NewCounter(prometheus.CounterOpts{
Namespace: ns,
Name: "udp_in_bytes",
Help: "Total number of bytes downloaded through UDP",
})
MtrOutBytesUdp = promauto.NewCounter(prometheus.CounterOpts{
Namespace: ns,
Name: "udp_out_bytes",
Help: "Total number of bytes streamed out through UDP",
})
MtrInBytesReflector = promauto.NewCounter(prometheus.CounterOpts{
Namespace: ns,
Name: "reflector_in_bytes",
Help: "Total number of incoming bytes (from users)",
})
MtrOutBytesReflector = promauto.NewCounter(prometheus.CounterOpts{
Namespace: ns,
Name: "s3_out_bytes",
Help: "Total number of outgoing bytes (to S3)",
})
MtrInBytesS3 = promauto.NewCounter(prometheus.CounterOpts{
Namespace: ns,
Name: "s3_in_bytes",
Help: "Total number of incoming bytes (from S3-CF)",
})
) )
func TrackError(direction string, e error) (shouldLog bool) { // shouldLog is a hack, but whatever func TrackError(direction string, e error) (shouldLog bool) { // shouldLog is a hack, but whatever

View file

@ -8,6 +8,7 @@ import (
"net" "net"
"time" "time"
"github.com/lbryio/reflector.go/internal/metrics"
"github.com/lbryio/reflector.go/store" "github.com/lbryio/reflector.go/store"
"github.com/lbryio/lbry.go/v2/extras/errors" "github.com/lbryio/lbry.go/v2/extras/errors"
@ -152,7 +153,7 @@ func (c *Client) GetBlob(hash string) (stream.Blob, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
metrics.MtrInBytesTcp.Add(float64(len(blob)))
return blob, nil return blob, nil
} }

View file

@ -10,6 +10,7 @@ import (
"github.com/lbryio/lbry.go/v2/extras/errors" "github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/stream" "github.com/lbryio/lbry.go/v2/stream"
"github.com/lbryio/reflector.go/internal/metrics"
"github.com/lbryio/reflector.go/store" "github.com/lbryio/reflector.go/store"
"github.com/lucas-clemente/quic-go/http3" "github.com/lucas-clemente/quic-go/http3"
@ -91,5 +92,6 @@ func (c *Client) GetBlob(hash string) (stream.Blob, error) {
if err != nil { if err != nil {
return nil, errors.Err(err) return nil, errors.Err(err)
} }
metrics.MtrInBytesUdp.Add(float64(len(body.Bytes())))
return body.Bytes(), nil return body.Bytes(), nil
} }

View file

@ -64,7 +64,7 @@ type availabilityResponse struct {
func (s *Server) Start(address string) error { func (s *Server) Start(address string) error {
log.Println("HTTP3 peer listening on " + address) log.Println("HTTP3 peer listening on " + address)
quicConf := &quic.Config{ quicConf := &quic.Config{
HandshakeTimeout: 3 * time.Second, HandshakeTimeout: 4 * time.Second,
MaxIdleTimeout: 5 * time.Second, MaxIdleTimeout: 5 * time.Second,
} }
r := mux.NewRouter() r := mux.NewRouter()
@ -87,6 +87,7 @@ func (s *Server) Start(address string) error {
if err != nil { if err != nil {
s.logError(err) s.logError(err)
} }
metrics.MtrOutBytesUdp.Add(float64(len(blob)))
metrics.BlobDownloadCount.Inc() metrics.BlobDownloadCount.Inc()
metrics.Http3DownloadCount.Inc() metrics.Http3DownloadCount.Inc()
}) })

View file

@ -31,7 +31,7 @@ func NewStore(opts StoreOpts) *Store {
func (p *Store) getClient() (*Client, error) { func (p *Store) getClient() (*Client, error) {
var qconf quic.Config var qconf quic.Config
qconf.HandshakeTimeout = 3 * time.Second qconf.HandshakeTimeout = 4 * time.Second
qconf.MaxIdleTimeout = 5 * time.Second qconf.MaxIdleTimeout = 5 * time.Second
pool, err := x509.SystemCertPool() pool, err := x509.SystemCertPool()
if err != nil { if err != nil {

View file

@ -272,6 +272,7 @@ func (s *Server) handleCompositeRequest(data []byte) ([]byte, error) {
BlobHash: reflector.BlobHash(blob), BlobHash: reflector.BlobHash(blob),
Length: len(blob), Length: len(blob),
} }
metrics.MtrOutBytesTcp.Add(float64(len(blob)))
metrics.BlobDownloadCount.Inc() metrics.BlobDownloadCount.Inc()
metrics.PeerDownloadCount.Inc() metrics.PeerDownloadCount.Inc()
} }

View file

@ -256,7 +256,7 @@ func (s *Server) receiveBlob(conn net.Conn) error {
if err != nil { if err != nil {
return err return err
} }
metrics.MtrInBytesReflector.Add(float64(len(blob)))
metrics.BlobUploadCount.Inc() metrics.BlobUploadCount.Inc()
if isSdBlob { if isSdBlob {
metrics.SDBlobUploadCount.Inc() metrics.SDBlobUploadCount.Inc()

View file

@ -7,6 +7,7 @@ import (
"github.com/lbryio/lbry.go/v2/extras/errors" "github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/stream" "github.com/lbryio/lbry.go/v2/stream"
"github.com/lbryio/reflector.go/internal/metrics"
"github.com/lbryio/reflector.go/meta" "github.com/lbryio/reflector.go/meta"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
@ -77,6 +78,7 @@ func (s *CloudFrontBlobStore) Get(hash string) (stream.Blob, error) {
if err != nil { if err != nil {
return nil, errors.Err(err) return nil, errors.Err(err)
} }
metrics.MtrInBytesS3.Add(float64(len(b)))
return b, nil return b, nil
default: default:
return nil, errors.Err(res.Status) return nil, errors.Err(res.Status)

View file

@ -7,6 +7,7 @@ import (
"github.com/lbryio/lbry.go/v2/extras/errors" "github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/stream" "github.com/lbryio/lbry.go/v2/stream"
"github.com/lbryio/reflector.go/internal/metrics"
"github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/aws/awserr"
@ -126,6 +127,7 @@ func (s *S3BlobStore) Put(hash string, blob stream.Blob) error {
Body: bytes.NewBuffer(blob), Body: bytes.NewBuffer(blob),
StorageClass: aws.String(s3.StorageClassIntelligentTiering), StorageClass: aws.String(s3.StorageClassIntelligentTiering),
}) })
metrics.MtrOutBytesReflector.Add(float64(blob.Size()))
return err return err
} }