Insert in tx #51
4 changed files with 9 additions and 0 deletions
|
@ -20,7 +20,9 @@ var getReqCh = make(chan *blobRequest, 20000)
|
||||||
func InitWorkers(server *Server, workers int) {
|
func InitWorkers(server *Server, workers int) {
|
||||||
stopper := stop.New(server.grp)
|
stopper := stop.New(server.grp)
|
||||||
for i := 0; i < workers; i++ {
|
for i := 0; i < workers; i++ {
|
||||||
|
metrics.RoutinesQueue.WithLabelValues("http3", "worker").Inc()
|
||||||
go func(worker int) {
|
go func(worker int) {
|
||||||
|
defer metrics.RoutinesQueue.WithLabelValues("http3", "worker").Dec()
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-stopper.Ch():
|
case <-stopper.Ch():
|
||||||
|
|
|
@ -89,7 +89,9 @@ func (s *Server) listenAndServe(listener net.Listener) {
|
||||||
log.Error(errors.Prefix("accepting conn", err))
|
log.Error(errors.Prefix("accepting conn", err))
|
||||||
} else {
|
} else {
|
||||||
s.grp.Add(1)
|
s.grp.Add(1)
|
||||||
|
metrics.RoutinesQueue.WithLabelValues("peer", "server-handleconn").Inc()
|
||||||
go func() {
|
go func() {
|
||||||
|
defer metrics.RoutinesQueue.WithLabelValues("peer", "server-handleconn").Dec()
|
||||||
s.handleConnection(conn)
|
s.handleConnection(conn)
|
||||||
s.grp.Done()
|
s.grp.Done()
|
||||||
}()
|
}()
|
||||||
|
|
|
@ -66,7 +66,9 @@ func (c *CachingStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) {
|
||||||
}
|
}
|
||||||
// there is no need to wait for the blob to be stored before we return it
|
// there is no need to wait for the blob to be stored before we return it
|
||||||
// TODO: however this should be refactored to limit the amount of routines that the process can spawn to avoid a possible DoS
|
// TODO: however this should be refactored to limit the amount of routines that the process can spawn to avoid a possible DoS
|
||||||
|
metrics.RoutinesQueue.WithLabelValues("store", "cache-put").Inc()
|
||||||
go func() {
|
go func() {
|
||||||
|
defer metrics.RoutinesQueue.WithLabelValues("store", "cache-put").Dec()
|
||||||
err = c.cache.Put(hash, blob)
|
err = c.cache.Put(hash, blob)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("error saving blob to underlying cache: %s", errors.FullTrace(err))
|
log.Errorf("error saving blob to underlying cache: %s", errors.FullTrace(err))
|
||||||
|
|
|
@ -6,6 +6,8 @@ import (
|
||||||
"runtime"
|
"runtime"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
|
"github.com/lbryio/reflector.go/internal/metrics"
|
||||||
|
|
||||||
"github.com/lbryio/lbry.go/v2/extras/errors"
|
"github.com/lbryio/lbry.go/v2/extras/errors"
|
||||||
|
|
||||||
"github.com/karrick/godirwalk"
|
"github.com/karrick/godirwalk"
|
||||||
|
@ -24,6 +26,7 @@ func AllFiles(startDir string, basename bool) ([]string, error) {
|
||||||
paths := make([]string, 0, 1000)
|
paths := make([]string, 0, 1000)
|
||||||
pathWG := &sync.WaitGroup{}
|
pathWG := &sync.WaitGroup{}
|
||||||
pathWG.Add(1)
|
pathWG.Add(1)
|
||||||
|
metrics.RoutinesQueue.WithLabelValues("speedwalk", "worker").Inc()
|
||||||
go func() {
|
go func() {
|
||||||
defer pathWG.Done()
|
defer pathWG.Done()
|
||||||
for {
|
for {
|
||||||
|
|
Loading…
Reference in a new issue