track peer errors as well

This commit is contained in:
Alex Grintsvayg 2019-02-08 14:56:41 -05:00
parent 5788f5aa55
commit a98990f573
No known key found for this signature in database
GPG key ID: AEB3F089F86A22B5
4 changed files with 49 additions and 23 deletions

View file

@ -51,6 +51,10 @@ func reflectorCmd(cmd *cobra.Command, args []string) {
} }
peerServer := peer.NewServer(combo) peerServer := peer.NewServer(combo)
if globalConfig.SlackHookURL != "" {
peerServer.StatLogger = log.StandardLogger()
peerServer.StatReportFrequency = 1 * time.Hour
}
err = peerServer.Start(":5567") err = peerServer.Start(":5567")
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)

View file

@ -10,9 +10,11 @@ import (
"strings" "strings"
"time" "time"
"github.com/lbryio/reflector.go/reflector"
"github.com/lbryio/reflector.go/store"
"github.com/lbryio/lbry.go/extras/errors" "github.com/lbryio/lbry.go/extras/errors"
"github.com/lbryio/lbry.go/extras/stop" "github.com/lbryio/lbry.go/extras/stop"
"github.com/lbryio/reflector.go/store"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
@ -26,10 +28,14 @@ const (
// Server is an instance of a peer server that houses the listener and store. // Server is an instance of a peer server that houses the listener and store.
type Server struct { type Server struct {
StatLogger *log.Logger // logger to log stats
StatReportFrequency time.Duration // how often to log stats
store store.BlobStore store store.BlobStore
closed bool closed bool
grp *stop.Group grp *stop.Group
stats *reflector.Stats
} }
// NewServer returns an initialized Server pointer. // NewServer returns an initialized Server pointer.
@ -43,6 +49,7 @@ func NewServer(store store.BlobStore) *Server {
// Shutdown gracefully shuts down the peer server. // Shutdown gracefully shuts down the peer server.
func (s *Server) Shutdown() { func (s *Server) Shutdown() {
log.Debug("shutting down peer server...") log.Debug("shutting down peer server...")
s.stats.Shutdown()
s.grp.StopAndWait() s.grp.StopAndWait()
log.Debug("peer server stopped") log.Debug("peer server stopped")
} }
@ -62,6 +69,11 @@ func (s *Server) Start(address string) error {
s.grp.Done() s.grp.Done()
}() }()
s.stats = reflector.NewStatLogger("DOWNLOAD", s.StatLogger, s.StatReportFrequency, s.grp.Child())
if s.StatLogger != nil && s.StatReportFrequency > 0 {
s.stats.Start()
}
return nil return nil
} }
@ -286,6 +298,14 @@ func (s *Server) logError(e error) {
if e == nil { if e == nil {
return return
} }
shouldLog := s.stats.AddError(e)
if shouldLog {
log.Errorln(errors.FullTrace(e))
}
return
// old stuff below. its here for posterity, because we're gonna have to deal with these errors someday for real
err := errors.Wrap(e, 0) err := errors.Wrap(e, 0)

View file

@ -42,7 +42,7 @@ type Server struct {
store store.BlobStore store store.BlobStore
grp *stop.Group grp *stop.Group
stats *stats stats *Stats
} }
// NewServer returns an initialized reflector server pointer. // NewServer returns an initialized reflector server pointer.
@ -57,9 +57,7 @@ func NewServer(store store.BlobStore) *Server {
// Shutdown shuts down the reflector server gracefully. // Shutdown shuts down the reflector server gracefully.
func (s *Server) Shutdown() { func (s *Server) Shutdown() {
log.Println("shutting down reflector server...") log.Println("shutting down reflector server...")
if s.isReportStats() {
s.stats.Shutdown() s.stats.Shutdown()
}
s.grp.StopAndWait() s.grp.StopAndWait()
log.Println("reflector server stopped") log.Println("reflector server stopped")
} }
@ -88,8 +86,8 @@ func (s *Server) Start(address string) error {
s.grp.Done() s.grp.Done()
}() }()
s.stats = newStatLogger(s.StatLogger, s.StatReportFrequency, s.grp.Child()) s.stats = NewStatLogger("UPLOAD", s.StatLogger, s.StatReportFrequency, s.grp.Child())
if s.isReportStats() { if s.StatLogger != nil && s.StatReportFrequency > 0 {
s.stats.Start() s.stats.Start()
} }
@ -393,10 +391,6 @@ func (s *Server) quitting() bool {
} }
} }
func (s *Server) isReportStats() bool {
return s.StatLogger != nil && s.StatReportFrequency > 0
}
func BlobHash(blob []byte) string { func BlobHash(blob []byte) string {
hashBytes := sha512.Sum384(blob) hashBytes := sha512.Sum384(blob)
return hex.EncodeToString(hashBytes[:]) return hex.EncodeToString(hashBytes[:])

View file

@ -14,28 +14,32 @@ import (
// TODO: store daily stats too. and maybe other intervals // TODO: store daily stats too. and maybe other intervals
type stats struct { type Stats struct {
mu *sync.Mutex mu *sync.Mutex
blobs int blobs int
streams int streams int
errors map[string]int errors map[string]int
started bool
name string
logger *log.Logger logger *log.Logger
logFreq time.Duration logFreq time.Duration
grp *stop.Group grp *stop.Group
} }
func newStatLogger(logger *log.Logger, logFreq time.Duration, parentGrp *stop.Group) *stats { func NewStatLogger(name string, logger *log.Logger, logFreq time.Duration, parentGrp *stop.Group) *Stats {
return &stats{ return &Stats{
mu: &sync.Mutex{}, mu: &sync.Mutex{},
grp: stop.New(parentGrp), grp: stop.New(parentGrp),
logger: logger, logger: logger,
logFreq: logFreq, logFreq: logFreq,
errors: make(map[string]int), errors: make(map[string]int),
name: name,
} }
} }
func (s *stats) Start() { func (s *Stats) Start() {
s.started = true
s.grp.Add(1) s.grp.Add(1)
go func() { go func() {
defer s.grp.Done() defer s.grp.Done()
@ -43,23 +47,27 @@ func (s *stats) Start() {
}() }()
} }
func (s *stats) Shutdown() { func (s *Stats) Shutdown() {
if !s.started {
return
}
s.log() s.log()
s.grp.StopAndWait() s.grp.StopAndWait()
s.started = false
} }
func (s *stats) AddBlob() { func (s *Stats) AddBlob() {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
s.blobs++ s.blobs++
} }
func (s *stats) AddStream() { func (s *Stats) AddStream() {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
s.streams++ s.streams++
} }
func (s *stats) AddError(e error) (shouldLog bool) { // shouldLog is a hack, but whatever func (s *Stats) AddError(e error) (shouldLog bool) { // shouldLog is a hack, but whatever
if e == nil { if e == nil {
return return
} }
@ -83,7 +91,7 @@ func (s *stats) AddError(e error) (shouldLog bool) { // shouldLog is a hack, but
return return
} }
func (s *stats) runSlackLogger() { func (s *Stats) runSlackLogger() {
t := time.NewTicker(s.logFreq) t := time.NewTicker(s.logFreq)
for { for {
select { select {
@ -95,7 +103,7 @@ func (s *stats) runSlackLogger() {
} }
} }
func (s *stats) log() { func (s *Stats) log() {
s.mu.Lock() s.mu.Lock()
blobs, streams := s.blobs, s.streams blobs, streams := s.blobs, s.streams
s.blobs, s.streams = 0, 0 s.blobs, s.streams = 0, 0
@ -110,5 +118,5 @@ func (s *stats) log() {
errStr = errStr[:len(errStr)-2] // trim last comma and space errStr = errStr[:len(errStr)-2] // trim last comma and space
} }
s.logger.Printf("Stats: %d blobs, %d streams, errors: %s", blobs, streams, errStr) s.logger.Printf("%s stats: %d blobs, %d streams, errors: %s", s.name, blobs, streams, errStr)
} }