when reflecting a sdblob, insert all the stream and intermediate blobs using a transaction #50

Closed
shyba wants to merge 39 commits from insert_under_tx into master
3 changed files with 91 additions and 41 deletions
Showing only changes of commit bd13836897 - Show all commits

View file

@ -201,6 +201,11 @@ var (
Name: "s3_in_bytes",
Help: "Total number of incoming bytes (from S3-CF)",
})
Http3BlobReqQueue = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: ns,
Name: "http3_blob_request_queue_size",
Help: "Blob requests of https queue size",
})
)
func CacheLabels(name, component string) prometheus.Labels {

View file

@ -70,46 +70,7 @@ func (s *Server) Start(address string) error {
}
r := mux.NewRouter()
r.HandleFunc("/get/{hash}", func(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
requestedBlob := vars["hash"]
traceParam := r.URL.Query().Get("trace")
var err error
wantsTrace := false
if traceParam != "" {
wantsTrace, err = strconv.ParseBool(traceParam)
if err != nil {
wantsTrace = false
}
}
blob, trace, err := s.store.Get(requestedBlob)
if wantsTrace {
serialized, err := trace.Serialize()
if err != nil {
http.Error(w, err.Error(), http.StatusNotFound)
return
}
w.Header().Add("Via", serialized)
log.Debug(trace.String())
}
if err != nil {
if errors.Is(err, store.ErrBlobNotFound) {
http.Error(w, err.Error(), http.StatusNotFound)
return
}
fmt.Printf("%s: %s", requestedBlob, errors.FullTrace(err))
s.logError(err)
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
_, err = w.Write(blob)
if err != nil {
s.logError(err)
}
metrics.MtrOutBytesUdp.Add(float64(len(blob)))
metrics.BlobDownloadCount.Inc()
metrics.Http3DownloadCount.Inc()
enqueue(&blobRequest{request: r, reply: w})
})
r.HandleFunc("/has/{hash}", func(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
@ -147,7 +108,7 @@ func (s *Server) Start(address string) error {
},
QuicConfig: quicConf,
}
go InitWorkers(s, 100)
go s.listenForShutdown(&server)
s.grp.Add(1)
go func() {
@ -196,3 +157,47 @@ func (s *Server) listenForShutdown(listener *http3.Server) {
log.Error("error closing listener for peer server - ", err)
}
}
func (s *Server) HandleGetBlob(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
requestedBlob := vars["hash"]
traceParam := r.URL.Query().Get("trace")
var err error
wantsTrace := false
if traceParam != "" {
wantsTrace, err = strconv.ParseBool(traceParam)
if err != nil {
wantsTrace = false
}
}
blob, trace, err := s.store.Get(requestedBlob)
if wantsTrace {
serialized, err := trace.Serialize()
if err != nil {
http.Error(w, err.Error(), http.StatusNotFound)
return
}
w.Header().Add("Via", serialized)
log.Debug(trace.String())
}
if err != nil {
if errors.Is(err, store.ErrBlobNotFound) {
http.Error(w, err.Error(), http.StatusNotFound)
return
}
fmt.Printf("%s: %s", requestedBlob, errors.FullTrace(err))
s.logError(err)
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
_, err = w.Write(blob)
if err != nil {
s.logError(err)
}
metrics.MtrOutBytesUdp.Add(float64(len(blob)))
metrics.BlobDownloadCount.Inc()
metrics.Http3DownloadCount.Inc()
}

40
peer/http3/worker.go Normal file
View file

@ -0,0 +1,40 @@
package http3
import (
"net/http"
"github.com/lbryio/reflector.go/internal/metrics"
"github.com/lbryio/lbry.go/v2/extras/stop"
)
type blobRequest struct {
request *http.Request
reply http.ResponseWriter
}
var getReqCh = make(chan *blobRequest)
func InitWorkers(server *Server, workers int) error {
stopper := stop.New(server.grp)
for i := 0; i < workers; i++ {
go func(worker int) {
select {
case <-stopper.Ch():
case r := <-getReqCh:
metrics.Http3BlobReqQueue.Dec()
process(server, r)
}
}(i)
}
return nil
}
func enqueue(b *blobRequest) {
metrics.Http3BlobReqQueue.Inc()
getReqCh <- b
}
func process(server *Server, r *blobRequest) {
server.HandleGetBlob(r.reply, r.request)
}