diff --git a/cmd/reflector.go b/cmd/reflector.go index c065f58..3e8564d 100644 --- a/cmd/reflector.go +++ b/cmd/reflector.go @@ -7,11 +7,11 @@ import ( "syscall" "time" - "github.com/lbryio/reflector.go/reflector" - "github.com/lbryio/reflector.go/db" + "github.com/lbryio/reflector.go/internal/metrics" "github.com/lbryio/reflector.go/meta" "github.com/lbryio/reflector.go/peer" + "github.com/lbryio/reflector.go/reflector" "github.com/lbryio/reflector.go/store" log "github.com/sirupsen/logrus" @@ -52,10 +52,6 @@ func reflectorCmd(cmd *cobra.Command, args []string) { reflectorServer = reflector.NewServer(blobStore) reflectorServer.Timeout = 3 * time.Minute - if globalConfig.SlackHookURL != "" { - reflectorServer.StatLogger = log.StandardLogger() - reflectorServer.StatReportFrequency = 1 * time.Hour - } reflectorServer.EnableBlocklist = true err = reflectorServer.Start(":" + strconv.Itoa(reflector.DefaultPort)) @@ -65,18 +61,18 @@ func reflectorCmd(cmd *cobra.Command, args []string) { } peerServer := peer.NewServer(blobStore) - if globalConfig.SlackHookURL != "" { - peerServer.StatLogger = log.StandardLogger() - peerServer.StatReportFrequency = 1 * time.Hour - } err = peerServer.Start(":5567") if err != nil { log.Fatal(err) } + metricsServer := metrics.NewServer(":2112", "/metrics") + metricsServer.Start() + interruptChan := make(chan os.Signal, 1) signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM) <-interruptChan + metricsServer.Shutdown() peerServer.Shutdown() if reflectorServer != nil { reflectorServer.Shutdown() diff --git a/go.mod b/go.mod index a7c83a7..a83d35d 100644 --- a/go.mod +++ b/go.mod @@ -21,6 +21,7 @@ require ( github.com/lbryio/lbryschema.go v0.0.0-20190602173230-6d2f69a36f46 github.com/lbryio/types v0.0.0-20191228214437-05a22073b4ec github.com/phayes/freeport v0.0.0-20171002185219-e27662a4a9d6 + github.com/prometheus/client_golang v0.9.2 github.com/sirupsen/logrus v1.4.2 github.com/spf13/cast v1.3.0 github.com/spf13/cobra v0.0.3 diff --git a/go.sum b/go.sum index aea0316..0ff2261 100644 --- a/go.sum +++ b/go.sum @@ -12,6 +12,7 @@ github.com/asaskevich/govalidator v0.0.0-20180720115003-f9ffefc3facf/go.mod h1:l github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY= github.com/aws/aws-sdk-go v1.16.11 h1:g/c7gJeVyHoXCxM2fddS85bPGVkBF8s2q8t3fyElegc= github.com/aws/aws-sdk-go v1.16.11/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= +github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLMYoU8P317H5OQ+Via4RmuPwCS0= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= github.com/btcsuite/btcd v0.0.0-20190213025234-306aecffea32/go.mod h1:DrZx5ec/dmnfpw9KyYoQyYo7d0KEvTkk/5M/vbZjAr8= @@ -191,6 +192,7 @@ github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaO github.com/mattn/go-isatty v0.0.3 h1:ns/ykhmWi7G9O+8a448SecJU3nSMBXJfqQkl0upE1jI= github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= github.com/mattn/go-sqlite3 v1.10.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc= +github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/miekg/dns v1.0.14 h1:9jZdLNd/P4+SfEJ0TNyxYpsK8N4GtfylBLqtbYN1sbA= github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= @@ -226,9 +228,13 @@ github.com/pkg/profile v1.3.0/go.mod h1:hJw3o1OdXxsrSjjVksARp5W95eeEaEfptyVZyv6J github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI= +github.com/prometheus/client_golang v0.9.2 h1:awm861/B8OKDd2I/6o1dy3ra4BamzKhYOiGItCeZ740= github.com/prometheus/client_golang v0.9.2/go.mod h1:OsXs2jCmiKlQ1lTBmv21f2mNfw4xf/QclQDMrYNZzcM= +github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910 h1:idejC8f05m9MGOsuEi1ATq9shN03HrxNkD/luQvxCv8= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= +github.com/prometheus/common v0.0.0-20181126121408-4724e9255275 h1:PnBWHBf+6L0jOqq0gIVUe6Yk0/QMZ640k6NvkxcBf+8= github.com/prometheus/common v0.0.0-20181126121408-4724e9255275/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro= +github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a h1:9a8MnZMP0X2nLJdBg+pBmGgkJlSaKC2KaQmTCk1XDtE= github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/rubenv/sql-migrate v0.0.0-20170330050058-38004e7a77f2 h1:pZcZXvSLkiJfwtodlUyIbG0wSdFprYDwd4lIWJRYV+k= github.com/rubenv/sql-migrate v0.0.0-20170330050058-38004e7a77f2/go.mod h1:WS0rl9eEliYI8DPnr3TOwz4439pay+qNgzJoVya/DmY= diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go new file mode 100644 index 0000000..89aed11 --- /dev/null +++ b/internal/metrics/metrics.go @@ -0,0 +1,159 @@ +package metrics + +import ( + "context" + "encoding/json" + "errors" + "io" + "net/http" + "strings" + "syscall" + "time" + + "github.com/lbryio/reflector.go/reflector" + + ee "github.com/lbryio/lbry.go/v2/extras/errors" + "github.com/lbryio/lbry.go/v2/extras/stop" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/client_golang/prometheus/promhttp" + log "github.com/sirupsen/logrus" +) + +type Server struct { + srv *http.Server + stop *stop.Stopper +} + +func NewServer(address string, path string) *Server { + h := http.NewServeMux() + h.Handle(path, promhttp.Handler()) + return &Server{ + srv: &http.Server{ + Addr: address, + Handler: h, + //https://blog.cloudflare.com/the-complete-guide-to-golang-net-http-timeouts/ + //https://blog.cloudflare.com/exposing-go-on-the-internet/ + ReadTimeout: 5 * time.Second, + WriteTimeout: 10 * time.Second, + IdleTimeout: 120 * time.Second, + }, + stop: stop.New(), + } +} + +func (s *Server) Start() { + s.stop.Add(1) + go func() { + defer s.stop.Done() + err := s.srv.ListenAndServe() + if err != nil && !errors.Is(err, http.ErrServerClosed) { + log.Error(err) + } + }() +} + +func (s *Server) Shutdown() { + s.srv.Shutdown(context.Background()) + s.stop.StopAndWait() +} + +const ( + ns = "reflector" + + labelDirection = "direction" + labelErrorType = "error_type" + + DirectionUpload = "upload" // to reflector + DirectionDownload = "download" // from reflector + + errConnReset = "conn_reset" + errReadConnReset = "read_conn_reset" + errWriteConnReset = "write_conn_reset" + errReadConnTimedOut = "read_conn_timed_out" + errWriteBrokenPipe = "write_broken_pipe" + errIOTimeout = "io_timeout" + errUnexpectedEOF = "unexpected_eof" + errJSONSyntax = "json_syntax" + errBlobTooBig = "blob_too_big" + errOther = "other" +) + +var ( + BlobDownloadCount = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: ns, + Name: "blob_download_total", + Help: "Total number of blobs downloaded from reflector", + }) + BlobUploadCount = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: ns, + Name: "blob_upload_total", + Help: "Total number of blobs uploaded to reflector", + }) + SDBlobUploadCount = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: ns, + Name: "sdblob_upload_total", + Help: "Total number of SD blobs (and therefore streams) uploaded to reflector", + }) + ErrorCount = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: ns, + Name: "error_total", + Help: "Total number of errors", + }, []string{labelDirection, labelErrorType}) +) + +func TrackError(direction string, e error) (shouldLog bool) { // shouldLog is a hack, but whatever + if e == nil { + return + } + + err := ee.Wrap(e, 0) + errType := errOther + //name := err.TypeName() + if errors.Is(e, context.DeadlineExceeded) { + errType = errIOTimeout + } else if strings.Contains(err.Error(), "i/o timeout") { // hit a read or write deadline + log.Warnln("i/o timeout is not the same as context.DeadlineExceeded") + errType = errIOTimeout + } else if errors.Is(e, syscall.ECONNRESET) { + errType = errConnReset + } else if strings.Contains(err.Error(), "read: connection reset by peer") { // the other side closed the connection using TCP reset + log.Warnln("read conn reset by peer is not the same as ECONNRESET") + errType = errReadConnReset + } else if strings.Contains(err.Error(), "write: connection reset by peer") { // the other side closed the connection using TCP reset + log.Warnln("write conn reset by peer is not the same as ECONNRESET") + errType = errWriteConnReset + } else if errors.Is(e, syscall.ETIMEDOUT) { + errType = errReadConnTimedOut + } else if strings.Contains(err.Error(), "read: connection timed out") { // the other side closed the connection using TCP reset + log.Warnln("read conn timed out is not the same as ETIMEDOUT") + errType = errReadConnTimedOut + } else if errors.Is(e, io.ErrUnexpectedEOF) { + errType = errUnexpectedEOF + } else if strings.Contains(err.Error(), "unexpected EOF") { // tried to read from closed pipe or socket + log.Warnln("unexpected eof is not the same as io.ErrUnexpectedEOF") + errType = errUnexpectedEOF + } else if errors.Is(e, syscall.EPIPE) { + errType = errWriteBrokenPipe + } else if strings.Contains(err.Error(), "write: broken pipe") { // tried to write to a pipe or socket that was closed by the peer + log.Warnln("broken pipe is not the same as EPIPE") + errType = errWriteBrokenPipe + } else if errors.Is(e, reflector.ErrBlobTooBig) { + errType = errBlobTooBig + } else if strings.Contains(err.Error(), "blob must be at most") { + log.Warnln("blob must be at most X bytes is not the same as ErrBlobTooBig") + errType = errBlobTooBig + } else if _, ok := e.(*json.SyntaxError); ok { + errType = errJSONSyntax + } else { + shouldLog = true + } + + ErrorCount.With(map[string]string{ + labelDirection: direction, + labelErrorType: errType, + }).Inc() + + return +} diff --git a/peer/server.go b/peer/server.go index 19bb87a..3c6556a 100644 --- a/peer/server.go +++ b/peer/server.go @@ -8,6 +8,7 @@ import ( "strings" "time" + "github.com/lbryio/reflector.go/internal/metrics" "github.com/lbryio/reflector.go/reflector" "github.com/lbryio/reflector.go/store" @@ -27,9 +28,6 @@ const ( // Server is an instance of a peer server that houses the listener and store. type Server struct { - StatLogger *log.Logger // logger to log stats - StatReportFrequency time.Duration // how often to log stats - store store.BlobStore closed bool @@ -47,8 +45,7 @@ func NewServer(store store.BlobStore) *Server { // Shutdown gracefully shuts down the peer server. func (s *Server) Shutdown() { - log.Debug("shutting down peer server...") - s.stats.Shutdown() + log.Debug("shutting down peer server") s.grp.StopAndWait() log.Debug("peer server stopped") } @@ -68,11 +65,6 @@ func (s *Server) Start(address string) error { s.grp.Done() }() - s.stats = reflector.NewStatLogger("DOWNLOAD", s.StatLogger, s.StatReportFrequency, s.grp.Child()) - if s.StatLogger != nil && s.StatReportFrequency > 0 { - s.stats.Start() - } - return nil } @@ -275,7 +267,7 @@ func (s *Server) handleCompositeRequest(data []byte) ([]byte, error) { BlobHash: reflector.BlobHash(blob), Length: len(blob), } - s.stats.AddBlob() + metrics.BlobDownloadCount.Inc() } } @@ -291,27 +283,10 @@ func (s *Server) logError(e error) { if e == nil { return } - shouldLog := s.stats.AddError(e) + shouldLog := metrics.TrackError(metrics.DirectionDownload, e) if shouldLog { log.Errorln(errors.FullTrace(e)) } - - return - - // old stuff below. its here for posterity, because we're gonna have to deal with these errors someday for real - - //err := errors.Wrap(e, 0) - - // these happen because the peer protocol does not have a way to cancel blob downloads - // so the client will just close the connection if its in the middle of downloading a blob - // but receives the blob from a different peer first or simply goes offline (timeout) - //if strings.Contains(err.Error(), "connection reset by peer") || - // strings.Contains(err.Error(), "i/o timeout") || - // strings.Contains(err.Error(), "broken pipe") { - // return - //} - // - //log.Error(errors.FullTrace(e)) } func readNextMessage(buf *bufio.Reader) ([]byte, error) { diff --git a/reflector/server.go b/reflector/server.go index 6fe11b9..d305c5c 100644 --- a/reflector/server.go +++ b/reflector/server.go @@ -8,9 +8,9 @@ import ( "io" "io/ioutil" "net" - "strconv" "time" + "github.com/lbryio/reflector.go/internal/metrics" "github.com/lbryio/reflector.go/store" "github.com/lbryio/lbry.go/v2/extras/errors" @@ -32,18 +32,16 @@ const ( maxBlobSize = stream.MaxBlobSize ) +var ErrBlobTooBig = errors.Base("blob must be at most %d bytes", maxBlobSize) + // Server is and instance of the reflector server. It houses the blob store and listener. type Server struct { Timeout time.Duration // timeout to read or write next message - StatLogger *log.Logger // logger to log stats - StatReportFrequency time.Duration // how often to log stats - EnableBlocklist bool // if true, blocklist checking and blob deletion will be enabled store store.BlobStore grp *stop.Group - stats *Stats } // NewServer returns an initialized reflector server pointer. @@ -58,7 +56,6 @@ func NewServer(store store.BlobStore) *Server { // Shutdown shuts down the reflector server gracefully. func (s *Server) Shutdown() { log.Println("shutting down reflector server...") - s.stats.Shutdown() s.grp.StopAndWait() log.Println("reflector server stopped") } @@ -87,11 +84,6 @@ func (s *Server) Start(address string) error { s.grp.Done() }() - s.stats = NewStatLogger("UPLOAD", s.StatLogger, s.StatReportFrequency, s.grp.Child()) - if s.StatLogger != nil && s.StatReportFrequency > 0 { - s.stats.Start() - } - if s.EnableBlocklist { if b, ok := s.store.(store.Blocklister); ok { s.grp.Add(1) @@ -173,7 +165,10 @@ func (s *Server) handleConn(conn net.Conn) { } func (s *Server) doError(conn net.Conn, err error) error { - shouldLog := s.stats.AddError(err) + if err == nil { + return nil + } + shouldLog := metrics.TrackError(metrics.DirectionUpload, err) if shouldLog { log.Errorln(errors.FullTrace(err)) } @@ -262,9 +257,9 @@ func (s *Server) receiveBlob(conn net.Conn) error { return err } - s.stats.AddBlob() + metrics.BlobUploadCount.Inc() if isSdBlob { - s.stats.AddStream() + metrics.SDBlobUploadCount.Inc() } return s.sendTransferResponse(conn, true, isSdBlob) } @@ -311,7 +306,7 @@ func (s *Server) readBlobRequest(conn net.Conn) (int, string, bool, error) { return blobSize, blobHash, isSdBlob, errors.Err("blob hash is empty") } if blobSize > maxBlobSize { - return blobSize, blobHash, isSdBlob, errors.Err("blob must be at most " + strconv.Itoa(maxBlobSize) + " bytes") + return blobSize, blobHash, isSdBlob, errors.Err(ErrBlobTooBig) } if blobSize == 0 { return blobSize, blobHash, isSdBlob, errors.Err("0-byte blob received") diff --git a/reflector/stats.go b/reflector/stats.go index 6256c51..015d8e6 100644 --- a/reflector/stats.go +++ b/reflector/stats.go @@ -2,11 +2,9 @@ package reflector import ( "fmt" - "strings" "sync" "time" - "github.com/lbryio/lbry.go/v2/extras/errors" "github.com/lbryio/lbry.go/v2/extras/stop" log "github.com/sirupsen/logrus" @@ -67,30 +65,6 @@ func (s *Stats) AddStream() { s.streams++ } -func (s *Stats) AddError(e error) (shouldLog bool) { // shouldLog is a hack, but whatever - if e == nil { - return - } - err := errors.Wrap(e, 0) - name := err.TypeName() - if strings.Contains(err.Error(), "i/o timeout") { // hit a read or write deadline - name = "i/o timeout" - } else if strings.Contains(err.Error(), "read: connection reset by peer") { // the other side closed the connection using TCP reset - name = "read conn reset" - } else if strings.Contains(err.Error(), "unexpected EOF") { // tried to read from closed pipe or socket - name = "unexpected EOF" - } else if strings.Contains(err.Error(), "write: broken pipe") { // tried to write to a pipe or socket that was closed by the peer - name = "write broken pipe" - } else { - shouldLog = true - } - - s.mu.Lock() - defer s.mu.Unlock() - s.errors[name]++ - return -} - func (s *Stats) runSlackLogger() { t := time.NewTicker(s.logFreq) for {