Replace QUIC with HTTP3/QUIC #43
5 changed files with 46 additions and 8 deletions
|
@ -12,8 +12,6 @@ import (
|
||||||
|
|
||||||
ee "github.com/lbryio/lbry.go/v2/extras/errors"
|
ee "github.com/lbryio/lbry.go/v2/extras/errors"
|
||||||
"github.com/lbryio/lbry.go/v2/extras/stop"
|
"github.com/lbryio/lbry.go/v2/extras/stop"
|
||||||
"github.com/lbryio/reflector.go/store"
|
|
||||||
|
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
"github.com/prometheus/client_golang/prometheus/promauto"
|
"github.com/prometheus/client_golang/prometheus/promauto"
|
||||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||||
|
@ -67,6 +65,8 @@ const (
|
||||||
DirectionUpload = "upload" // to reflector
|
DirectionUpload = "upload" // to reflector
|
||||||
DirectionDownload = "download" // from reflector
|
DirectionDownload = "download" // from reflector
|
||||||
|
|
||||||
|
MtrLabelSource = "source"
|
||||||
|
|
||||||
errConnReset = "conn_reset"
|
errConnReset = "conn_reset"
|
||||||
errReadConnReset = "read_conn_reset"
|
errReadConnReset = "read_conn_reset"
|
||||||
errWriteConnReset = "write_conn_reset"
|
errWriteConnReset = "write_conn_reset"
|
||||||
|
@ -96,6 +96,26 @@ var (
|
||||||
Name: "blob_download_total",
|
Name: "blob_download_total",
|
||||||
Help: "Total number of blobs downloaded from reflector",
|
Help: "Total number of blobs downloaded from reflector",
|
||||||
})
|
})
|
||||||
|
PeerDownloadCount = promauto.NewCounter(prometheus.CounterOpts{
|
||||||
|
Namespace: ns,
|
||||||
|
Name: "peer_download_total",
|
||||||
|
Help: "Total number of blobs downloaded from reflector through tcp protocol",
|
||||||
|
})
|
||||||
|
Http3DownloadCount = promauto.NewCounter(prometheus.CounterOpts{
|
||||||
|
Namespace: ns,
|
||||||
|
Name: "http3_blob_download_total",
|
||||||
|
Help: "Total number of blobs downloaded from reflector through QUIC protocol",
|
||||||
|
})
|
||||||
|
CacheHitCount = promauto.NewCounter(prometheus.CounterOpts{
|
||||||
|
Namespace: ns,
|
||||||
|
Name: "cache_hit_total",
|
||||||
|
Help: "Total number of blobs retrieved from the cache storage",
|
||||||
|
})
|
||||||
|
CacheMissCount = promauto.NewCounter(prometheus.CounterOpts{
|
||||||
|
Namespace: ns,
|
||||||
|
Name: "cache_miss_total",
|
||||||
|
Help: "Total number of blobs retrieved from origin rather than cache storage",
|
||||||
|
})
|
||||||
BlobUploadCount = promauto.NewCounter(prometheus.CounterOpts{
|
BlobUploadCount = promauto.NewCounter(prometheus.CounterOpts{
|
||||||
Namespace: ns,
|
Namespace: ns,
|
||||||
Name: "blob_upload_total",
|
Name: "blob_upload_total",
|
||||||
|
@ -106,6 +126,11 @@ var (
|
||||||
Name: "sdblob_upload_total",
|
Name: "sdblob_upload_total",
|
||||||
Help: "Total number of SD blobs (and therefore streams) uploaded to reflector",
|
Help: "Total number of SD blobs (and therefore streams) uploaded to reflector",
|
||||||
})
|
})
|
||||||
|
RetrieverSpeed = promauto.NewGaugeVec(prometheus.GaugeOpts{
|
||||||
|
Namespace: ns,
|
||||||
|
Name: "speed_mbps",
|
||||||
|
Help: "Speed of blob retrieval",
|
||||||
|
}, []string{MtrLabelSource})
|
||||||
ErrorCount = promauto.NewCounterVec(prometheus.CounterOpts{
|
ErrorCount = promauto.NewCounterVec(prometheus.CounterOpts{
|
||||||
Namespace: ns,
|
Namespace: ns,
|
||||||
Name: "error_total",
|
Name: "error_total",
|
||||||
|
@ -148,8 +173,6 @@ func TrackError(direction string, e error) (shouldLog bool) { // shouldLog is a
|
||||||
errType = errUnexpectedEOFStr
|
errType = errUnexpectedEOFStr
|
||||||
} else if errors.Is(e, syscall.EPIPE) {
|
} else if errors.Is(e, syscall.EPIPE) {
|
||||||
errType = errEPipe
|
errType = errEPipe
|
||||||
} else if errors.Is(e, store.ErrBlobNotFound) {
|
|
||||||
errType = errBlobNotFound
|
|
||||||
} else if strings.Contains(err.Error(), "write: broken pipe") { // tried to write to a pipe or socket that was closed by the peer
|
} else if strings.Contains(err.Error(), "write: broken pipe") { // tried to write to a pipe or socket that was closed by the peer
|
||||||
// I believe this is the same as EPipe when direction == "download", but not for upload
|
// I believe this is the same as EPipe when direction == "download", but not for upload
|
||||||
errType = errWriteBrokenPipe
|
errType = errWriteBrokenPipe
|
||||||
|
|
|
@ -8,10 +8,9 @@ import (
|
||||||
"net/http"
|
"net/http"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/lbryio/reflector.go/store"
|
|
||||||
|
|
||||||
"github.com/lbryio/lbry.go/v2/extras/errors"
|
"github.com/lbryio/lbry.go/v2/extras/errors"
|
||||||
"github.com/lbryio/lbry.go/v2/stream"
|
"github.com/lbryio/lbry.go/v2/stream"
|
||||||
|
"github.com/lbryio/reflector.go/store"
|
||||||
|
|
||||||
"github.com/lucas-clemente/quic-go/http3"
|
"github.com/lucas-clemente/quic-go/http3"
|
||||||
)
|
)
|
||||||
|
@ -79,6 +78,7 @@ func (c *Client) GetBlob(hash string) (stream.Blob, error) {
|
||||||
return nil, errors.Err(err)
|
return nil, errors.Err(err)
|
||||||
}
|
}
|
||||||
if resp.StatusCode == http.StatusNotFound {
|
if resp.StatusCode == http.StatusNotFound {
|
||||||
|
fmt.Printf("%s blob not found %d\n", hash, resp.StatusCode)
|
||||||
return nil, errors.Err(store.ErrBlobNotFound)
|
return nil, errors.Err(store.ErrBlobNotFound)
|
||||||
}
|
}
|
||||||
if resp.StatusCode != http.StatusOK {
|
if resp.StatusCode != http.StatusOK {
|
||||||
|
|
|
@ -84,6 +84,8 @@ func (s *Server) Start(address string) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.logError(err)
|
s.logError(err)
|
||||||
}
|
}
|
||||||
|
metrics.BlobDownloadCount.Inc()
|
||||||
|
metrics.Http3DownloadCount.Inc()
|
||||||
})
|
})
|
||||||
r.HandleFunc("/has/{hash}", func(w http.ResponseWriter, r *http.Request) {
|
r.HandleFunc("/has/{hash}", func(w http.ResponseWriter, r *http.Request) {
|
||||||
vars := mux.Vars(r)
|
vars := mux.Vars(r)
|
||||||
|
|
|
@ -273,6 +273,7 @@ func (s *Server) handleCompositeRequest(data []byte) ([]byte, error) {
|
||||||
Length: len(blob),
|
Length: len(blob),
|
||||||
}
|
}
|
||||||
metrics.BlobDownloadCount.Inc()
|
metrics.BlobDownloadCount.Inc()
|
||||||
|
metrics.PeerDownloadCount.Inc()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,8 +1,12 @@
|
||||||
package store
|
package store
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/lbryio/lbry.go/v2/extras/errors"
|
"github.com/lbryio/lbry.go/v2/extras/errors"
|
||||||
"github.com/lbryio/lbry.go/v2/stream"
|
"github.com/lbryio/lbry.go/v2/stream"
|
||||||
|
|
||||||
|
"github.com/lbryio/reflector.go/internal/metrics"
|
||||||
)
|
)
|
||||||
|
|
||||||
// CachingBlobStore combines two stores, typically a local and a remote store, to improve performance.
|
// CachingBlobStore combines two stores, typically a local and a remote store, to improve performance.
|
||||||
|
@ -29,18 +33,26 @@ func (c *CachingBlobStore) Has(hash string) (bool, error) {
|
||||||
// Get tries to get the blob from the cache first, falling back to the origin. If the blob comes
|
// Get tries to get the blob from the cache first, falling back to the origin. If the blob comes
|
||||||
// from the origin, it is also stored in the cache.
|
// from the origin, it is also stored in the cache.
|
||||||
func (c *CachingBlobStore) Get(hash string) (stream.Blob, error) {
|
func (c *CachingBlobStore) Get(hash string) (stream.Blob, error) {
|
||||||
|
start := time.Now()
|
||||||
blob, err := c.cache.Get(hash)
|
blob, err := c.cache.Get(hash)
|
||||||
|
retrievalTime := time.Since(start)
|
||||||
if err == nil || !errors.Is(err, ErrBlobNotFound) {
|
if err == nil || !errors.Is(err, ErrBlobNotFound) {
|
||||||
|
metrics.CacheHitCount.Inc()
|
||||||
|
rate := float64(len(blob)) / 1024 / 1024 / retrievalTime.Seconds()
|
||||||
|
metrics.RetrieverSpeed.With(map[string]string{metrics.MtrLabelSource: "cache"}).Set(rate)
|
||||||
return blob, err
|
return blob, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
start = time.Now()
|
||||||
blob, err = c.origin.Get(hash)
|
blob, err = c.origin.Get(hash)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
retrievalTime = time.Since(start)
|
||||||
err = c.cache.Put(hash, blob)
|
err = c.cache.Put(hash, blob)
|
||||||
|
rate := float64(len(blob)) / 1024 / 1024 / retrievalTime.Seconds()
|
||||||
|
metrics.RetrieverSpeed.With(map[string]string{metrics.MtrLabelSource: "origin"}).Set(rate)
|
||||||
|
metrics.CacheMissCount.Inc()
|
||||||
return blob, err
|
return blob, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue