reflector.go/reflector/stats.go

123 lines
2.4 KiB
Go
Raw Normal View History

2018-08-28 17:18:06 +02:00
package reflector
import (
"fmt"
"strings"
2018-08-28 17:18:06 +02:00
"sync"
"time"
2019-01-09 23:52:30 +01:00
"github.com/lbryio/lbry.go/extras/errors"
"github.com/lbryio/lbry.go/extras/stop"
2018-08-28 17:18:06 +02:00
log "github.com/sirupsen/logrus"
)
2018-08-31 03:08:59 +02:00
// TODO: store daily stats too, and maybe other intervals.
2019-02-08 20:56:41 +01:00
type Stats struct {
2018-08-28 17:18:06 +02:00
mu *sync.Mutex
blobs int
streams int
errors map[string]int
2019-02-08 20:56:41 +01:00
started bool
2018-08-28 17:18:06 +02:00
2019-02-08 20:56:41 +01:00
name string
2018-08-28 17:18:06 +02:00
logger *log.Logger
logFreq time.Duration
grp *stop.Group
}
2019-02-08 20:56:41 +01:00
func NewStatLogger(name string, logger *log.Logger, logFreq time.Duration, parentGrp *stop.Group) *Stats {
return &Stats{
2018-08-28 17:18:06 +02:00
mu: &sync.Mutex{},
grp: stop.New(parentGrp),
logger: logger,
logFreq: logFreq,
errors: make(map[string]int),
2019-02-08 20:56:41 +01:00
name: name,
2018-08-28 17:18:06 +02:00
}
}
2019-02-08 20:56:41 +01:00
func (s *Stats) Start() {
s.started = true
2018-08-28 17:18:06 +02:00
s.grp.Add(1)
go func() {
defer s.grp.Done()
s.runSlackLogger()
}()
}
2019-02-08 20:56:41 +01:00
func (s *Stats) Shutdown() {
if !s.started {
return
}
2018-09-04 17:08:34 +02:00
s.log()
2018-08-28 17:18:06 +02:00
s.grp.StopAndWait()
2019-02-08 20:56:41 +01:00
s.started = false
2018-08-28 17:18:06 +02:00
}
2019-02-08 20:56:41 +01:00
func (s *Stats) AddBlob() {
2018-08-28 17:18:06 +02:00
s.mu.Lock()
defer s.mu.Unlock()
2018-08-31 02:20:15 +02:00
s.blobs++
2018-08-28 17:18:06 +02:00
}
2019-02-08 20:56:41 +01:00
func (s *Stats) AddStream() {
2018-08-28 17:18:06 +02:00
s.mu.Lock()
defer s.mu.Unlock()
2018-08-31 02:20:15 +02:00
s.streams++
2018-08-28 17:18:06 +02:00
}
2019-02-08 20:56:41 +01:00
func (s *Stats) AddError(e error) (shouldLog bool) { // shouldLog is a hack, but whatever
2018-08-28 17:18:06 +02:00
if e == nil {
return
}
err := errors.Wrap(e, 0)
name := err.TypeName()
if strings.Contains(err.Error(), "i/o timeout") { // hit a read or write deadline
name = "i/o timeout"
} else if strings.Contains(err.Error(), "read: connection reset by peer") { // the other side closed the connection using TCP reset
name = "read conn reset"
} else if strings.Contains(err.Error(), "unexpected EOF") { // tried to read from closed pipe or socket
2018-09-26 22:00:19 +02:00
name = "unexpected EOF"
} else if strings.Contains(err.Error(), "write: broken pipe") { // tried to write to a pipe or socket that was closed by the peer
2018-09-26 22:00:19 +02:00
name = "write broken pipe"
2018-08-31 03:10:39 +02:00
} else {
shouldLog = true
}
2018-08-28 17:18:06 +02:00
s.mu.Lock()
defer s.mu.Unlock()
2018-08-31 02:20:15 +02:00
s.errors[name]++
return
2018-08-28 17:18:06 +02:00
}
2019-02-08 20:56:41 +01:00
func (s *Stats) runSlackLogger() {
2018-08-28 17:18:06 +02:00
t := time.NewTicker(s.logFreq)
for {
select {
case <-s.grp.Ch():
return
case <-t.C:
s.log()
}
}
}
2019-02-08 20:56:41 +01:00
func (s *Stats) log() {
2018-08-28 17:18:06 +02:00
s.mu.Lock()
blobs, streams := s.blobs, s.streams
s.blobs, s.streams = 0, 0
errStr := ""
for name, count := range s.errors {
errStr += fmt.Sprintf("%d %s, ", count, name)
delete(s.errors, name)
}
s.mu.Unlock()
if len(errStr) > 2 {
errStr = errStr[:len(errStr)-2] // trim last comma and space
}
2019-02-08 20:56:41 +01:00
s.logger.Printf("%s stats: %d blobs, %d streams, errors: %s", s.name, blobs, streams, errStr)
2018-08-28 17:18:06 +02:00
}