diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index 2570677..4c30606 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -219,6 +219,11 @@ var ( Name: "http3_blob_request_queue_size", Help: "Blob requests of https queue size", }) + RoutinesQueue = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: ns, + Name: "routines", + Help: "routines running by type", + }, []string{"package", "kind"}) ) func CacheLabels(name, component string) prometheus.Labels { diff --git a/reflector/blocklist.go b/reflector/blocklist.go index 9abfc4a..26edff6 100644 --- a/reflector/blocklist.go +++ b/reflector/blocklist.go @@ -8,6 +8,8 @@ import ( "strings" "time" + "github.com/lbryio/reflector.go/internal/metrics" + "github.com/lbryio/reflector.go/store" "github.com/lbryio/reflector.go/wallet" @@ -109,8 +111,9 @@ func sdHashesForOutpoints(walletServers, outpoints []string, stopper stop.Chan) } done := make(chan bool) - + metrics.RoutinesQueue.WithLabelValues("reflector", "sdhashesforoutput").Inc() go func() { + defer metrics.RoutinesQueue.WithLabelValues("reflector", "sdhashesforoutput").Dec() select { case <-done: case <-stopper: diff --git a/reflector/server.go b/reflector/server.go index 8e78e22..3171ced 100644 --- a/reflector/server.go +++ b/reflector/server.go @@ -74,7 +74,9 @@ func (s *Server) Start(address string) error { log.Fatal(err) } s.grp.Add(1) + metrics.RoutinesQueue.WithLabelValues("reflector", "listener").Inc() go func() { + defer metrics.RoutinesQueue.WithLabelValues("reflector", "listener").Dec() <-s.grp.Ch() err := l.Close() if err != nil { @@ -84,7 +86,9 @@ func (s *Server) Start(address string) error { }() s.grp.Add(1) + metrics.RoutinesQueue.WithLabelValues("reflector", "start").Inc() go func() { + defer metrics.RoutinesQueue.WithLabelValues("reflector", "start").Dec() s.listenAndServe(l) s.grp.Done() }() @@ -92,7 +96,9 @@ func (s *Server) Start(address string) error { if s.EnableBlocklist { if b, ok := s.underlyingStore.(store.Blocklister); ok { s.grp.Add(1) + metrics.RoutinesQueue.WithLabelValues("reflector", "enableblocklist").Inc() go func() { + defer metrics.RoutinesQueue.WithLabelValues("reflector", "enableblocklist").Dec() s.enableBlocklist(b) s.grp.Done() }() @@ -115,7 +121,9 @@ func (s *Server) listenAndServe(listener net.Listener) { log.Error(err) } else { s.grp.Add(1) + metrics.RoutinesQueue.WithLabelValues("reflector", "server-listenandserve").Inc() go func() { + defer metrics.RoutinesQueue.WithLabelValues("reflector", "server-listenandserve").Inc() s.handleConn(conn) s.grp.Done() }() @@ -130,7 +138,9 @@ func (s *Server) handleConn(conn net.Conn) { close(connNeedsClosing) }() s.grp.Add(1) + metrics.RoutinesQueue.WithLabelValues("reflector", "server-handleconn").Inc() go func() { + defer metrics.RoutinesQueue.WithLabelValues("reflector", "server-handleconn").Dec() defer s.grp.Done() select { case <-connNeedsClosing: diff --git a/reflector/uploader.go b/reflector/uploader.go index e5b2688..b421272 100644 --- a/reflector/uploader.go +++ b/reflector/uploader.go @@ -7,6 +7,8 @@ import ( "sync" "time" + "github.com/lbryio/reflector.go/internal/metrics" + "github.com/lbryio/reflector.go/db" "github.com/lbryio/reflector.go/store" @@ -88,7 +90,9 @@ func (u *Uploader) Upload(dirOrFilePath string) error { for i := 0; i < u.workers; i++ { workerWG.Add(1) + metrics.RoutinesQueue.WithLabelValues("reflector", "upload").Inc() go func(i int) { + defer metrics.RoutinesQueue.WithLabelValues("reflector", "upload").Dec() defer workerWG.Done() defer func(i int) { log.Debugf("worker %d quitting", i) }(i) u.worker(pathChan) @@ -97,7 +101,9 @@ func (u *Uploader) Upload(dirOrFilePath string) error { countWG := sync.WaitGroup{} countWG.Add(1) + metrics.RoutinesQueue.WithLabelValues("reflector", "uploader").Inc() go func() { + defer metrics.RoutinesQueue.WithLabelValues("reflector", "uploader").Dec() defer countWG.Done() u.counter() }()