Insert in tx #51

Closed
lyoshenka wants to merge 52 commits from insert_in_tx into master
4 changed files with 25 additions and 1 deletions
Showing only changes of commit 0152300d8d - Show all commits

View file

@ -219,6 +219,11 @@ var (
Name: "http3_blob_request_queue_size", Name: "http3_blob_request_queue_size",
Help: "Blob requests of https queue size", Help: "Blob requests of https queue size",
}) })
RoutinesQueue = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns,
Name: "routines",
Help: "routines running by type",
}, []string{"package", "kind"})
) )
func CacheLabels(name, component string) prometheus.Labels { func CacheLabels(name, component string) prometheus.Labels {

View file

@ -8,6 +8,8 @@ import (
"strings" "strings"
"time" "time"
"github.com/lbryio/reflector.go/internal/metrics"
"github.com/lbryio/reflector.go/store" "github.com/lbryio/reflector.go/store"
"github.com/lbryio/reflector.go/wallet" "github.com/lbryio/reflector.go/wallet"
@ -109,8 +111,9 @@ func sdHashesForOutpoints(walletServers, outpoints []string, stopper stop.Chan)
} }
done := make(chan bool) done := make(chan bool)
metrics.RoutinesQueue.WithLabelValues("reflector", "sdhashesforoutput").Inc()
go func() { go func() {
defer metrics.RoutinesQueue.WithLabelValues("reflector", "sdhashesforoutput").Dec()
select { select {
case <-done: case <-done:
case <-stopper: case <-stopper:

View file

@ -74,7 +74,9 @@ func (s *Server) Start(address string) error {
log.Fatal(err) log.Fatal(err)
} }
s.grp.Add(1) s.grp.Add(1)
metrics.RoutinesQueue.WithLabelValues("reflector", "listener").Inc()
go func() { go func() {
defer metrics.RoutinesQueue.WithLabelValues("reflector", "listener").Dec()
<-s.grp.Ch() <-s.grp.Ch()
err := l.Close() err := l.Close()
if err != nil { if err != nil {
@ -84,7 +86,9 @@ func (s *Server) Start(address string) error {
}() }()
s.grp.Add(1) s.grp.Add(1)
metrics.RoutinesQueue.WithLabelValues("reflector", "start").Inc()
go func() { go func() {
defer metrics.RoutinesQueue.WithLabelValues("reflector", "start").Dec()
s.listenAndServe(l) s.listenAndServe(l)
s.grp.Done() s.grp.Done()
}() }()
@ -92,7 +96,9 @@ func (s *Server) Start(address string) error {
if s.EnableBlocklist { if s.EnableBlocklist {
if b, ok := s.underlyingStore.(store.Blocklister); ok { if b, ok := s.underlyingStore.(store.Blocklister); ok {
s.grp.Add(1) s.grp.Add(1)
metrics.RoutinesQueue.WithLabelValues("reflector", "enableblocklist").Inc()
go func() { go func() {
defer metrics.RoutinesQueue.WithLabelValues("reflector", "enableblocklist").Dec()
s.enableBlocklist(b) s.enableBlocklist(b)
s.grp.Done() s.grp.Done()
}() }()
@ -115,7 +121,9 @@ func (s *Server) listenAndServe(listener net.Listener) {
log.Error(err) log.Error(err)
} else { } else {
s.grp.Add(1) s.grp.Add(1)
metrics.RoutinesQueue.WithLabelValues("reflector", "server-listenandserve").Inc()
go func() { go func() {
defer metrics.RoutinesQueue.WithLabelValues("reflector", "server-listenandserve").Inc()
s.handleConn(conn) s.handleConn(conn)
s.grp.Done() s.grp.Done()
}() }()
@ -130,7 +138,9 @@ func (s *Server) handleConn(conn net.Conn) {
close(connNeedsClosing) close(connNeedsClosing)
}() }()
s.grp.Add(1) s.grp.Add(1)
metrics.RoutinesQueue.WithLabelValues("reflector", "server-handleconn").Inc()
go func() { go func() {
defer metrics.RoutinesQueue.WithLabelValues("reflector", "server-handleconn").Dec()
defer s.grp.Done() defer s.grp.Done()
select { select {
case <-connNeedsClosing: case <-connNeedsClosing:

View file

@ -7,6 +7,8 @@ import (
"sync" "sync"
"time" "time"
"github.com/lbryio/reflector.go/internal/metrics"
"github.com/lbryio/reflector.go/db" "github.com/lbryio/reflector.go/db"
"github.com/lbryio/reflector.go/store" "github.com/lbryio/reflector.go/store"
@ -88,7 +90,9 @@ func (u *Uploader) Upload(dirOrFilePath string) error {
for i := 0; i < u.workers; i++ { for i := 0; i < u.workers; i++ {
workerWG.Add(1) workerWG.Add(1)
metrics.RoutinesQueue.WithLabelValues("reflector", "upload").Inc()
go func(i int) { go func(i int) {
defer metrics.RoutinesQueue.WithLabelValues("reflector", "upload").Dec()
defer workerWG.Done() defer workerWG.Done()
defer func(i int) { log.Debugf("worker %d quitting", i) }(i) defer func(i int) { log.Debugf("worker %d quitting", i) }(i)
u.worker(pathChan) u.worker(pathChan)
@ -97,7 +101,9 @@ func (u *Uploader) Upload(dirOrFilePath string) error {
countWG := sync.WaitGroup{} countWG := sync.WaitGroup{}
countWG.Add(1) countWG.Add(1)
metrics.RoutinesQueue.WithLabelValues("reflector", "uploader").Inc()
go func() { go func() {
defer metrics.RoutinesQueue.WithLabelValues("reflector", "uploader").Dec()
defer countWG.Done() defer countWG.Done()
u.counter() u.counter()
}() }()