diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index 7ed38e5..4d70cd1 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -148,6 +148,41 @@ var ( Name: "error_total", Help: "Total number of errors", }, []string{labelDirection, labelErrorType}) + MtrInBytesTcp = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: ns, + Name: "tcp_in_bytes", + Help: "Total number of bytes downloaded through TCP", + }) + MtrOutBytesTcp = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: ns, + Name: "tcp_out_bytes", + Help: "Total number of bytes streamed out through TCP", + }) + MtrInBytesUdp = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: ns, + Name: "udp_in_bytes", + Help: "Total number of bytes downloaded through UDP", + }) + MtrOutBytesUdp = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: ns, + Name: "udp_out_bytes", + Help: "Total number of bytes streamed out through UDP", + }) + MtrInBytesReflector = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: ns, + Name: "reflector_in_bytes", + Help: "Total number of incoming bytes (from users)", + }) + MtrOutBytesReflector = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: ns, + Name: "s3_out_bytes", + Help: "Total number of outgoing bytes (to S3)", + }) + MtrInBytesS3 = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: ns, + Name: "s3_in_bytes", + Help: "Total number of incoming bytes (from S3-CF)", + }) ) func TrackError(direction string, e error) (shouldLog bool) { // shouldLog is a hack, but whatever diff --git a/peer/client.go b/peer/client.go index faa0fa2..cdafd1c 100644 --- a/peer/client.go +++ b/peer/client.go @@ -8,6 +8,7 @@ import ( "net" "time" + "github.com/lbryio/reflector.go/internal/metrics" "github.com/lbryio/reflector.go/store" "github.com/lbryio/lbry.go/v2/extras/errors" @@ -152,7 +153,7 @@ func (c *Client) GetBlob(hash string) (stream.Blob, error) { if err != nil { return nil, err } - + metrics.MtrInBytesTcp.Add(float64(len(blob))) return blob, nil } diff --git a/peer/http3/client.go b/peer/http3/client.go index 4aa5381..41c40fb 100644 --- a/peer/http3/client.go +++ b/peer/http3/client.go @@ -10,6 +10,7 @@ import ( "github.com/lbryio/lbry.go/v2/extras/errors" "github.com/lbryio/lbry.go/v2/stream" + "github.com/lbryio/reflector.go/internal/metrics" "github.com/lbryio/reflector.go/store" "github.com/lucas-clemente/quic-go/http3" @@ -91,5 +92,6 @@ func (c *Client) GetBlob(hash string) (stream.Blob, error) { if err != nil { return nil, errors.Err(err) } + metrics.MtrInBytesUdp.Add(float64(len(body.Bytes()))) return body.Bytes(), nil } diff --git a/peer/http3/server.go b/peer/http3/server.go index 30b510c..415f640 100644 --- a/peer/http3/server.go +++ b/peer/http3/server.go @@ -64,7 +64,7 @@ type availabilityResponse struct { func (s *Server) Start(address string) error { log.Println("HTTP3 peer listening on " + address) quicConf := &quic.Config{ - HandshakeTimeout: 3 * time.Second, + HandshakeTimeout: 4 * time.Second, MaxIdleTimeout: 5 * time.Second, } r := mux.NewRouter() @@ -87,6 +87,7 @@ func (s *Server) Start(address string) error { if err != nil { s.logError(err) } + metrics.MtrOutBytesUdp.Add(float64(len(blob))) metrics.BlobDownloadCount.Inc() metrics.Http3DownloadCount.Inc() }) diff --git a/peer/http3/store.go b/peer/http3/store.go index 7b8e4fe..1b3c3bc 100644 --- a/peer/http3/store.go +++ b/peer/http3/store.go @@ -31,7 +31,7 @@ func NewStore(opts StoreOpts) *Store { func (p *Store) getClient() (*Client, error) { var qconf quic.Config - qconf.HandshakeTimeout = 3 * time.Second + qconf.HandshakeTimeout = 4 * time.Second qconf.MaxIdleTimeout = 5 * time.Second pool, err := x509.SystemCertPool() if err != nil { diff --git a/peer/server.go b/peer/server.go index 9a53ef5..44916cb 100644 --- a/peer/server.go +++ b/peer/server.go @@ -272,6 +272,7 @@ func (s *Server) handleCompositeRequest(data []byte) ([]byte, error) { BlobHash: reflector.BlobHash(blob), Length: len(blob), } + metrics.MtrOutBytesTcp.Add(float64(len(blob))) metrics.BlobDownloadCount.Inc() metrics.PeerDownloadCount.Inc() } diff --git a/reflector/server.go b/reflector/server.go index d305c5c..bbe3d33 100644 --- a/reflector/server.go +++ b/reflector/server.go @@ -256,7 +256,7 @@ func (s *Server) receiveBlob(conn net.Conn) error { if err != nil { return err } - + metrics.MtrInBytesReflector.Add(float64(len(blob))) metrics.BlobUploadCount.Inc() if isSdBlob { metrics.SDBlobUploadCount.Inc() diff --git a/store/cloudfront.go b/store/cloudfront.go index cd8e7a9..fbfafec 100644 --- a/store/cloudfront.go +++ b/store/cloudfront.go @@ -7,6 +7,7 @@ import ( "github.com/lbryio/lbry.go/v2/extras/errors" "github.com/lbryio/lbry.go/v2/stream" + "github.com/lbryio/reflector.go/internal/metrics" "github.com/lbryio/reflector.go/meta" log "github.com/sirupsen/logrus" @@ -77,6 +78,7 @@ func (s *CloudFrontBlobStore) Get(hash string) (stream.Blob, error) { if err != nil { return nil, errors.Err(err) } + metrics.MtrInBytesS3.Add(float64(len(b))) return b, nil default: return nil, errors.Err(res.Status) diff --git a/store/s3.go b/store/s3.go index aa53aee..e08b910 100644 --- a/store/s3.go +++ b/store/s3.go @@ -7,6 +7,7 @@ import ( "github.com/lbryio/lbry.go/v2/extras/errors" "github.com/lbryio/lbry.go/v2/stream" + "github.com/lbryio/reflector.go/internal/metrics" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" @@ -126,6 +127,7 @@ func (s *S3BlobStore) Put(hash string, blob stream.Blob) error { Body: bytes.NewBuffer(blob), StorageClass: aws.String(s3.StorageClassIntelligentTiering), }) + metrics.MtrOutBytesReflector.Add(float64(blob.Size())) return err }