Merge branch 'metrics'

* metrics:
  use labels for different error types
  switch to prometheus for metrics
This commit is contained in:
Alex Grintsvayg 2020-01-02 13:13:29 -05:00
commit 5465527faf
No known key found for this signature in database
GPG key ID: AEB3F089F86A22B5
7 changed files with 186 additions and 80 deletions

View file

@ -7,11 +7,11 @@ import (
"syscall"
"time"
"github.com/lbryio/reflector.go/reflector"
"github.com/lbryio/reflector.go/db"
"github.com/lbryio/reflector.go/internal/metrics"
"github.com/lbryio/reflector.go/meta"
"github.com/lbryio/reflector.go/peer"
"github.com/lbryio/reflector.go/reflector"
"github.com/lbryio/reflector.go/store"
log "github.com/sirupsen/logrus"
@ -52,10 +52,6 @@ func reflectorCmd(cmd *cobra.Command, args []string) {
reflectorServer = reflector.NewServer(blobStore)
reflectorServer.Timeout = 3 * time.Minute
if globalConfig.SlackHookURL != "" {
reflectorServer.StatLogger = log.StandardLogger()
reflectorServer.StatReportFrequency = 1 * time.Hour
}
reflectorServer.EnableBlocklist = true
err = reflectorServer.Start(":" + strconv.Itoa(reflector.DefaultPort))
@ -65,18 +61,18 @@ func reflectorCmd(cmd *cobra.Command, args []string) {
}
peerServer := peer.NewServer(blobStore)
if globalConfig.SlackHookURL != "" {
peerServer.StatLogger = log.StandardLogger()
peerServer.StatReportFrequency = 1 * time.Hour
}
err = peerServer.Start(":5567")
if err != nil {
log.Fatal(err)
}
metricsServer := metrics.NewServer(":2112", "/metrics")
metricsServer.Start()
interruptChan := make(chan os.Signal, 1)
signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM)
<-interruptChan
metricsServer.Shutdown()
peerServer.Shutdown()
if reflectorServer != nil {
reflectorServer.Shutdown()

1
go.mod
View file

@ -21,6 +21,7 @@ require (
github.com/lbryio/lbryschema.go v0.0.0-20190602173230-6d2f69a36f46
github.com/lbryio/types v0.0.0-20191228214437-05a22073b4ec
github.com/phayes/freeport v0.0.0-20171002185219-e27662a4a9d6
github.com/prometheus/client_golang v0.9.2
github.com/sirupsen/logrus v1.4.2
github.com/spf13/cast v1.3.0
github.com/spf13/cobra v0.0.3

6
go.sum
View file

@ -12,6 +12,7 @@ github.com/asaskevich/govalidator v0.0.0-20180720115003-f9ffefc3facf/go.mod h1:l
github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY=
github.com/aws/aws-sdk-go v1.16.11 h1:g/c7gJeVyHoXCxM2fddS85bPGVkBF8s2q8t3fyElegc=
github.com/aws/aws-sdk-go v1.16.11/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLMYoU8P317H5OQ+Via4RmuPwCS0=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
github.com/btcsuite/btcd v0.0.0-20190213025234-306aecffea32/go.mod h1:DrZx5ec/dmnfpw9KyYoQyYo7d0KEvTkk/5M/vbZjAr8=
@ -191,6 +192,7 @@ github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaO
github.com/mattn/go-isatty v0.0.3 h1:ns/ykhmWi7G9O+8a448SecJU3nSMBXJfqQkl0upE1jI=
github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
github.com/mattn/go-sqlite3 v1.10.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc=
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/miekg/dns v1.0.14 h1:9jZdLNd/P4+SfEJ0TNyxYpsK8N4GtfylBLqtbYN1sbA=
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
@ -226,9 +228,13 @@ github.com/pkg/profile v1.3.0/go.mod h1:hJw3o1OdXxsrSjjVksARp5W95eeEaEfptyVZyv6J
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI=
github.com/prometheus/client_golang v0.9.2 h1:awm861/B8OKDd2I/6o1dy3ra4BamzKhYOiGItCeZ740=
github.com/prometheus/client_golang v0.9.2/go.mod h1:OsXs2jCmiKlQ1lTBmv21f2mNfw4xf/QclQDMrYNZzcM=
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910 h1:idejC8f05m9MGOsuEi1ATq9shN03HrxNkD/luQvxCv8=
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/common v0.0.0-20181126121408-4724e9255275 h1:PnBWHBf+6L0jOqq0gIVUe6Yk0/QMZ640k6NvkxcBf+8=
github.com/prometheus/common v0.0.0-20181126121408-4724e9255275/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro=
github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a h1:9a8MnZMP0X2nLJdBg+pBmGgkJlSaKC2KaQmTCk1XDtE=
github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/rubenv/sql-migrate v0.0.0-20170330050058-38004e7a77f2 h1:pZcZXvSLkiJfwtodlUyIbG0wSdFprYDwd4lIWJRYV+k=
github.com/rubenv/sql-migrate v0.0.0-20170330050058-38004e7a77f2/go.mod h1:WS0rl9eEliYI8DPnr3TOwz4439pay+qNgzJoVya/DmY=

159
internal/metrics/metrics.go Normal file
View file

@ -0,0 +1,159 @@
package metrics
import (
"context"
"encoding/json"
"errors"
"io"
"net/http"
"strings"
"syscall"
"time"
"github.com/lbryio/reflector.go/reflector"
ee "github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/extras/stop"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
log "github.com/sirupsen/logrus"
)
type Server struct {
srv *http.Server
stop *stop.Stopper
}
func NewServer(address string, path string) *Server {
h := http.NewServeMux()
h.Handle(path, promhttp.Handler())
return &Server{
srv: &http.Server{
Addr: address,
Handler: h,
//https://blog.cloudflare.com/the-complete-guide-to-golang-net-http-timeouts/
//https://blog.cloudflare.com/exposing-go-on-the-internet/
ReadTimeout: 5 * time.Second,
WriteTimeout: 10 * time.Second,
IdleTimeout: 120 * time.Second,
},
stop: stop.New(),
}
}
func (s *Server) Start() {
s.stop.Add(1)
go func() {
defer s.stop.Done()
err := s.srv.ListenAndServe()
if err != nil && !errors.Is(err, http.ErrServerClosed) {
log.Error(err)
}
}()
}
func (s *Server) Shutdown() {
s.srv.Shutdown(context.Background())
s.stop.StopAndWait()
}
const (
ns = "reflector"
labelDirection = "direction"
labelErrorType = "error_type"
DirectionUpload = "upload" // to reflector
DirectionDownload = "download" // from reflector
errConnReset = "conn_reset"
errReadConnReset = "read_conn_reset"
errWriteConnReset = "write_conn_reset"
errReadConnTimedOut = "read_conn_timed_out"
errWriteBrokenPipe = "write_broken_pipe"
errIOTimeout = "io_timeout"
errUnexpectedEOF = "unexpected_eof"
errJSONSyntax = "json_syntax"
errBlobTooBig = "blob_too_big"
errOther = "other"
)
var (
BlobDownloadCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: ns,
Name: "blob_download_total",
Help: "Total number of blobs downloaded from reflector",
})
BlobUploadCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: ns,
Name: "blob_upload_total",
Help: "Total number of blobs uploaded to reflector",
})
SDBlobUploadCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: ns,
Name: "sdblob_upload_total",
Help: "Total number of SD blobs (and therefore streams) uploaded to reflector",
})
ErrorCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: ns,
Name: "error_total",
Help: "Total number of errors",
}, []string{labelDirection, labelErrorType})
)
func TrackError(direction string, e error) (shouldLog bool) { // shouldLog is a hack, but whatever
if e == nil {
return
}
err := ee.Wrap(e, 0)
errType := errOther
//name := err.TypeName()
if errors.Is(e, context.DeadlineExceeded) {
errType = errIOTimeout
} else if strings.Contains(err.Error(), "i/o timeout") { // hit a read or write deadline
log.Warnln("i/o timeout is not the same as context.DeadlineExceeded")
errType = errIOTimeout
} else if errors.Is(e, syscall.ECONNRESET) {
errType = errConnReset
} else if strings.Contains(err.Error(), "read: connection reset by peer") { // the other side closed the connection using TCP reset
log.Warnln("read conn reset by peer is not the same as ECONNRESET")
errType = errReadConnReset
} else if strings.Contains(err.Error(), "write: connection reset by peer") { // the other side closed the connection using TCP reset
log.Warnln("write conn reset by peer is not the same as ECONNRESET")
errType = errWriteConnReset
} else if errors.Is(e, syscall.ETIMEDOUT) {
errType = errReadConnTimedOut
} else if strings.Contains(err.Error(), "read: connection timed out") { // the other side closed the connection using TCP reset
log.Warnln("read conn timed out is not the same as ETIMEDOUT")
errType = errReadConnTimedOut
} else if errors.Is(e, io.ErrUnexpectedEOF) {
errType = errUnexpectedEOF
} else if strings.Contains(err.Error(), "unexpected EOF") { // tried to read from closed pipe or socket
log.Warnln("unexpected eof is not the same as io.ErrUnexpectedEOF")
errType = errUnexpectedEOF
} else if errors.Is(e, syscall.EPIPE) {
errType = errWriteBrokenPipe
} else if strings.Contains(err.Error(), "write: broken pipe") { // tried to write to a pipe or socket that was closed by the peer
log.Warnln("broken pipe is not the same as EPIPE")
errType = errWriteBrokenPipe
} else if errors.Is(e, reflector.ErrBlobTooBig) {
errType = errBlobTooBig
} else if strings.Contains(err.Error(), "blob must be at most") {
log.Warnln("blob must be at most X bytes is not the same as ErrBlobTooBig")
errType = errBlobTooBig
} else if _, ok := e.(*json.SyntaxError); ok {
errType = errJSONSyntax
} else {
shouldLog = true
}
ErrorCount.With(map[string]string{
labelDirection: direction,
labelErrorType: errType,
}).Inc()
return
}

View file

@ -8,6 +8,7 @@ import (
"strings"
"time"
"github.com/lbryio/reflector.go/internal/metrics"
"github.com/lbryio/reflector.go/reflector"
"github.com/lbryio/reflector.go/store"
@ -27,9 +28,6 @@ const (
// Server is an instance of a peer server that houses the listener and store.
type Server struct {
StatLogger *log.Logger // logger to log stats
StatReportFrequency time.Duration // how often to log stats
store store.BlobStore
closed bool
@ -47,8 +45,7 @@ func NewServer(store store.BlobStore) *Server {
// Shutdown gracefully shuts down the peer server.
func (s *Server) Shutdown() {
log.Debug("shutting down peer server...")
s.stats.Shutdown()
log.Debug("shutting down peer server")
s.grp.StopAndWait()
log.Debug("peer server stopped")
}
@ -68,11 +65,6 @@ func (s *Server) Start(address string) error {
s.grp.Done()
}()
s.stats = reflector.NewStatLogger("DOWNLOAD", s.StatLogger, s.StatReportFrequency, s.grp.Child())
if s.StatLogger != nil && s.StatReportFrequency > 0 {
s.stats.Start()
}
return nil
}
@ -275,7 +267,7 @@ func (s *Server) handleCompositeRequest(data []byte) ([]byte, error) {
BlobHash: reflector.BlobHash(blob),
Length: len(blob),
}
s.stats.AddBlob()
metrics.BlobDownloadCount.Inc()
}
}
@ -291,27 +283,10 @@ func (s *Server) logError(e error) {
if e == nil {
return
}
shouldLog := s.stats.AddError(e)
shouldLog := metrics.TrackError(metrics.DirectionDownload, e)
if shouldLog {
log.Errorln(errors.FullTrace(e))
}
return
// old stuff below. its here for posterity, because we're gonna have to deal with these errors someday for real
//err := errors.Wrap(e, 0)
// these happen because the peer protocol does not have a way to cancel blob downloads
// so the client will just close the connection if its in the middle of downloading a blob
// but receives the blob from a different peer first or simply goes offline (timeout)
//if strings.Contains(err.Error(), "connection reset by peer") ||
// strings.Contains(err.Error(), "i/o timeout") ||
// strings.Contains(err.Error(), "broken pipe") {
// return
//}
//
//log.Error(errors.FullTrace(e))
}
func readNextMessage(buf *bufio.Reader) ([]byte, error) {

View file

@ -8,9 +8,9 @@ import (
"io"
"io/ioutil"
"net"
"strconv"
"time"
"github.com/lbryio/reflector.go/internal/metrics"
"github.com/lbryio/reflector.go/store"
"github.com/lbryio/lbry.go/v2/extras/errors"
@ -32,18 +32,16 @@ const (
maxBlobSize = stream.MaxBlobSize
)
var ErrBlobTooBig = errors.Base("blob must be at most %d bytes", maxBlobSize)
// Server is and instance of the reflector server. It houses the blob store and listener.
type Server struct {
Timeout time.Duration // timeout to read or write next message
StatLogger *log.Logger // logger to log stats
StatReportFrequency time.Duration // how often to log stats
EnableBlocklist bool // if true, blocklist checking and blob deletion will be enabled
store store.BlobStore
grp *stop.Group
stats *Stats
}
// NewServer returns an initialized reflector server pointer.
@ -58,7 +56,6 @@ func NewServer(store store.BlobStore) *Server {
// Shutdown shuts down the reflector server gracefully.
func (s *Server) Shutdown() {
log.Println("shutting down reflector server...")
s.stats.Shutdown()
s.grp.StopAndWait()
log.Println("reflector server stopped")
}
@ -87,11 +84,6 @@ func (s *Server) Start(address string) error {
s.grp.Done()
}()
s.stats = NewStatLogger("UPLOAD", s.StatLogger, s.StatReportFrequency, s.grp.Child())
if s.StatLogger != nil && s.StatReportFrequency > 0 {
s.stats.Start()
}
if s.EnableBlocklist {
if b, ok := s.store.(store.Blocklister); ok {
s.grp.Add(1)
@ -173,7 +165,10 @@ func (s *Server) handleConn(conn net.Conn) {
}
func (s *Server) doError(conn net.Conn, err error) error {
shouldLog := s.stats.AddError(err)
if err == nil {
return nil
}
shouldLog := metrics.TrackError(metrics.DirectionUpload, err)
if shouldLog {
log.Errorln(errors.FullTrace(err))
}
@ -262,9 +257,9 @@ func (s *Server) receiveBlob(conn net.Conn) error {
return err
}
s.stats.AddBlob()
metrics.BlobUploadCount.Inc()
if isSdBlob {
s.stats.AddStream()
metrics.SDBlobUploadCount.Inc()
}
return s.sendTransferResponse(conn, true, isSdBlob)
}
@ -311,7 +306,7 @@ func (s *Server) readBlobRequest(conn net.Conn) (int, string, bool, error) {
return blobSize, blobHash, isSdBlob, errors.Err("blob hash is empty")
}
if blobSize > maxBlobSize {
return blobSize, blobHash, isSdBlob, errors.Err("blob must be at most " + strconv.Itoa(maxBlobSize) + " bytes")
return blobSize, blobHash, isSdBlob, errors.Err(ErrBlobTooBig)
}
if blobSize == 0 {
return blobSize, blobHash, isSdBlob, errors.Err("0-byte blob received")

View file

@ -2,11 +2,9 @@ package reflector
import (
"fmt"
"strings"
"sync"
"time"
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/extras/stop"
log "github.com/sirupsen/logrus"
@ -67,30 +65,6 @@ func (s *Stats) AddStream() {
s.streams++
}
func (s *Stats) AddError(e error) (shouldLog bool) { // shouldLog is a hack, but whatever
if e == nil {
return
}
err := errors.Wrap(e, 0)
name := err.TypeName()
if strings.Contains(err.Error(), "i/o timeout") { // hit a read or write deadline
name = "i/o timeout"
} else if strings.Contains(err.Error(), "read: connection reset by peer") { // the other side closed the connection using TCP reset
name = "read conn reset"
} else if strings.Contains(err.Error(), "unexpected EOF") { // tried to read from closed pipe or socket
name = "unexpected EOF"
} else if strings.Contains(err.Error(), "write: broken pipe") { // tried to write to a pipe or socket that was closed by the peer
name = "write broken pipe"
} else {
shouldLog = true
}
s.mu.Lock()
defer s.mu.Unlock()
s.errors[name]++
return
}
func (s *Stats) runSlackLogger() {
t := time.NewTicker(s.logFreq)
for {