diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index 8d46266..2793fc0 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -201,6 +201,11 @@ var ( Name: "s3_in_bytes", Help: "Total number of incoming bytes (from S3-CF)", }) + Http3BlobReqQueue = promauto.NewGauge(prometheus.GaugeOpts{ + Namespace: ns, + Name: "http3_blob_request_queue_size", + Help: "Blob requests of https queue size", + }) ) func CacheLabels(name, component string) prometheus.Labels { diff --git a/peer/http3/server.go b/peer/http3/server.go index 16fa156..da8f485 100644 --- a/peer/http3/server.go +++ b/peer/http3/server.go @@ -70,46 +70,7 @@ func (s *Server) Start(address string) error { } r := mux.NewRouter() r.HandleFunc("/get/{hash}", func(w http.ResponseWriter, r *http.Request) { - vars := mux.Vars(r) - requestedBlob := vars["hash"] - traceParam := r.URL.Query().Get("trace") - var err error - wantsTrace := false - if traceParam != "" { - wantsTrace, err = strconv.ParseBool(traceParam) - if err != nil { - wantsTrace = false - } - } - blob, trace, err := s.store.Get(requestedBlob) - - if wantsTrace { - serialized, err := trace.Serialize() - if err != nil { - http.Error(w, err.Error(), http.StatusNotFound) - return - } - w.Header().Add("Via", serialized) - log.Debug(trace.String()) - } - if err != nil { - if errors.Is(err, store.ErrBlobNotFound) { - http.Error(w, err.Error(), http.StatusNotFound) - return - } - fmt.Printf("%s: %s", requestedBlob, errors.FullTrace(err)) - s.logError(err) - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - - _, err = w.Write(blob) - if err != nil { - s.logError(err) - } - metrics.MtrOutBytesUdp.Add(float64(len(blob))) - metrics.BlobDownloadCount.Inc() - metrics.Http3DownloadCount.Inc() + enqueue(&blobRequest{request: r, reply: w}) }) r.HandleFunc("/has/{hash}", func(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) @@ -147,7 +108,7 @@ func (s *Server) Start(address string) error { }, QuicConfig: quicConf, } - + go InitWorkers(s, 100) go s.listenForShutdown(&server) s.grp.Add(1) go func() { @@ -196,3 +157,47 @@ func (s *Server) listenForShutdown(listener *http3.Server) { log.Error("error closing listener for peer server - ", err) } } + +func (s *Server) HandleGetBlob(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + requestedBlob := vars["hash"] + traceParam := r.URL.Query().Get("trace") + var err error + wantsTrace := false + if traceParam != "" { + wantsTrace, err = strconv.ParseBool(traceParam) + if err != nil { + wantsTrace = false + } + } + + blob, trace, err := s.store.Get(requestedBlob) + + if wantsTrace { + serialized, err := trace.Serialize() + if err != nil { + http.Error(w, err.Error(), http.StatusNotFound) + return + } + w.Header().Add("Via", serialized) + log.Debug(trace.String()) + } + if err != nil { + if errors.Is(err, store.ErrBlobNotFound) { + http.Error(w, err.Error(), http.StatusNotFound) + return + } + fmt.Printf("%s: %s", requestedBlob, errors.FullTrace(err)) + s.logError(err) + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + _, err = w.Write(blob) + if err != nil { + s.logError(err) + } + metrics.MtrOutBytesUdp.Add(float64(len(blob))) + metrics.BlobDownloadCount.Inc() + metrics.Http3DownloadCount.Inc() +} diff --git a/peer/http3/worker.go b/peer/http3/worker.go new file mode 100644 index 0000000..182ca08 --- /dev/null +++ b/peer/http3/worker.go @@ -0,0 +1,40 @@ +package http3 + +import ( + "net/http" + + "github.com/lbryio/reflector.go/internal/metrics" + + "github.com/lbryio/lbry.go/v2/extras/stop" +) + +type blobRequest struct { + request *http.Request + reply http.ResponseWriter +} + +var getReqCh = make(chan *blobRequest) + +func InitWorkers(server *Server, workers int) error { + stopper := stop.New(server.grp) + for i := 0; i < workers; i++ { + go func(worker int) { + select { + case <-stopper.Ch(): + case r := <-getReqCh: + metrics.Http3BlobReqQueue.Dec() + process(server, r) + } + }(i) + } + return nil +} + +func enqueue(b *blobRequest) { + metrics.Http3BlobReqQueue.Inc() + getReqCh <- b +} + +func process(server *Server, r *blobRequest) { + server.HandleGetBlob(r.reply, r.request) +}