diff --git a/cmd/reflector.go b/cmd/reflector.go index 8aee692..d806daf 100644 --- a/cmd/reflector.go +++ b/cmd/reflector.go @@ -35,8 +35,14 @@ func reflectorCmd(cmd *cobra.Command, args []string) { s3 := store.NewS3BlobStore(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName) combo := store.NewDBBackedS3Store(s3, db) + reflectorServer := reflector.NewServer(combo) reflectorServer.Timeout = 30 * time.Second + if globalConfig.SlackHookURL != "" { + reflectorServer.StatLogger = log.StandardLogger() + reflectorServer.StatReportFrequency = 10 * time.Minute + } + err = reflectorServer.Start(":" + strconv.Itoa(reflector.DefaultPort)) if err != nil { log.Fatal(err) diff --git a/reflector/server.go b/reflector/server.go index 7510146..80931f0 100644 --- a/reflector/server.go +++ b/reflector/server.go @@ -34,8 +34,12 @@ const ( type Server struct { Timeout time.Duration // timeout to read or write next message + StatLogger *log.Logger // logger to log stats + StatReportFrequency time.Duration // how often to log stats + store store.BlobStore grp *stop.Group + stats *stats } // NewServer returns an initialized reflector server pointer. @@ -78,6 +82,11 @@ func (s *Server) Start(address string) error { s.grp.Done() }() + s.stats = newStatLogger(s.StatLogger, s.StatReportFrequency, s.grp.Child()) + if s.StatLogger != nil && s.StatReportFrequency > 0 { + s.stats.Start() + } + return nil } @@ -147,6 +156,7 @@ func (s *Server) handleConn(conn net.Conn) { func (s *Server) doError(conn net.Conn, err error) error { log.Errorln(errors.FullTrace(err)) + s.stats.AddError(err) if e2, ok := err.(*json.SyntaxError); ok { log.Errorf("syntax error at byte offset %d", e2.Offset) } @@ -225,6 +235,10 @@ func (s *Server) receiveBlob(conn net.Conn) error { return err } + s.stats.AddBlob() + if isSdBlob { + s.stats.AddStream() + } return s.sendTransferResponse(conn, true, isSdBlob) } diff --git a/reflector/stats.go b/reflector/stats.go new file mode 100644 index 0000000..b3bbe96 --- /dev/null +++ b/reflector/stats.go @@ -0,0 +1,93 @@ +package reflector + +import ( + "fmt" + "sync" + "time" + + "github.com/lbryio/lbry.go/errors" + "github.com/lbryio/lbry.go/stop" + + log "github.com/sirupsen/logrus" +) + +type stats struct { + mu *sync.Mutex + blobs int + streams int + errors map[string]int + + logger *log.Logger + logFreq time.Duration + grp *stop.Group +} + +func newStatLogger(logger *log.Logger, logFreq time.Duration, parentGrp *stop.Group) *stats { + return &stats{ + mu: &sync.Mutex{}, + grp: stop.New(parentGrp), + logger: logger, + logFreq: logFreq, + errors: make(map[string]int), + } +} + +func (s *stats) Start() { + s.grp.Add(1) + go func() { + defer s.grp.Done() + s.runSlackLogger() + }() +} + +func (s *stats) Shutdown() { + s.grp.StopAndWait() +} + +func (s *stats) AddBlob() { + s.mu.Lock() + defer s.mu.Unlock() + s.blobs += 1 +} +func (s *stats) AddStream() { + s.mu.Lock() + defer s.mu.Unlock() + s.streams += 1 +} +func (s *stats) AddError(e error) { + if e == nil { + return + } + err := errors.Wrap(e, 0) + s.mu.Lock() + defer s.mu.Unlock() + s.errors[err.TypeName()] += 1 +} + +func (s *stats) runSlackLogger() { + t := time.NewTicker(s.logFreq) + for { + select { + case <-s.grp.Ch(): + return + case <-t.C: + s.log() + } + } +} + +func (s *stats) log() { + s.mu.Lock() + blobs, streams := s.blobs, s.streams + s.blobs, s.streams = 0, 0 + errStr := "" + for name, count := range s.errors { + errStr += fmt.Sprintf("%d %s, ", count, name) + delete(s.errors, name) + } + s.mu.Unlock() + s.logger.Printf( + "Stats: %d blobs, %d streams, errors: %s", + blobs, streams, errStr[:len(errStr)-2], // trim last comma and space + ) +}