reflector.go/store/http.go

171 lines
4.5 KiB
Go
Raw Permalink Normal View History

2021-05-21 05:49:02 +02:00
package store
import (
	"bytes"
	"context"
	"io"
	"net"
	"net/http"
	"net/url"
	"strings"
	"sync"
	"time"

	"github.com/lbryio/reflector.go/internal/metrics"
	"github.com/lbryio/reflector.go/shared"

	"github.com/lbryio/lbry.go/v2/extras/errors"
	"github.com/lbryio/lbry.go/v2/stream"
)
2021-07-20 02:09:14 +02:00
// HttpStore is a blob store that talks to an upstream server over HTTP.
type HttpStore struct {
	// upstream is the base URL (scheme plus host) that every request is sent to.
	upstream string
	// httpClient performs all requests issued by this store.
	httpClient *http.Client
	// edgeToken, when non-empty, is sent as the edge_token query parameter on
	// blob downloads.
	edgeToken string
}
2022-07-29 04:59:15 +02:00
func NewHttpStore(upstream, edgeToken string) *HttpStore {
2021-05-21 05:49:02 +02:00
return &HttpStore{
2021-05-21 17:58:33 +02:00
upstream: "http://" + upstream,
2021-05-21 05:49:02 +02:00
httpClient: getClient(),
2022-07-29 04:59:15 +02:00
edgeToken: edgeToken,
2021-05-21 05:49:02 +02:00
}
}
// nameHttp is the identifier reported by HttpStore.Name.
const nameHttp = "http"
2021-07-22 04:09:16 +02:00
// Name returns the identifier of this store type.
func (n *HttpStore) Name() string {
	return nameHttp
}
2021-05-21 05:49:02 +02:00
func (n *HttpStore) Has(hash string) (bool, error) {
url := n.upstream + "/blob?hash=" + hash
req, err := http.NewRequest("HEAD", url, nil)
if err != nil {
return false, errors.Err(err)
}
res, err := n.httpClient.Do(req)
if err != nil {
return false, errors.Err(err)
}
defer func() { _ = res.Body.Close() }()
2021-05-21 05:49:02 +02:00
if res.StatusCode == http.StatusNotFound {
return false, nil
}
if res.StatusCode == http.StatusNoContent {
return true, nil
}
var body []byte
if res.Body != nil {
body, _ = io.ReadAll(res.Body)
2021-05-21 05:49:02 +02:00
}
return false, errors.Err("upstream error. Status code: %d (%s)", res.StatusCode, string(body))
}
func (n *HttpStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) {
start := time.Now()
url := n.upstream + "/blob?hash=" + hash
2022-07-29 04:59:15 +02:00
if n.edgeToken != "" {
2022-07-29 05:40:50 +02:00
url += "&edge_token=" + n.edgeToken
2022-07-29 04:59:15 +02:00
}
2021-05-21 05:49:02 +02:00
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return nil, shared.NewBlobTrace(time.Since(start), n.Name()), errors.Err(err)
}
res, err := n.httpClient.Do(req)
if err != nil {
return nil, shared.NewBlobTrace(time.Since(start), n.Name()), errors.Err(err)
}
defer func() { _ = res.Body.Close() }()
2021-05-21 05:49:02 +02:00
tmp := getBuffer()
defer putBuffer(tmp)
serialized := res.Header.Get("Via")
trace := shared.NewBlobTrace(time.Since(start), n.Name())
if serialized != "" {
parsedTrace, err := shared.Deserialize(serialized)
if err != nil {
return nil, shared.NewBlobTrace(time.Since(start), n.Name()), err
}
trace = *parsedTrace
}
if res.StatusCode == http.StatusNotFound {
return nil, trace.Stack(time.Since(start), n.Name()), ErrBlobNotFound
}
if res.StatusCode == http.StatusOK {
written, err := io.Copy(tmp, res.Body)
if err != nil {
return nil, trace.Stack(time.Since(start), n.Name()), errors.Err(err)
}
blob := make([]byte, written)
copy(blob, tmp.Bytes())
metrics.MtrInBytesHttp.Add(float64(len(blob)))
2021-05-21 05:53:13 +02:00
return blob, trace.Stack(time.Since(start), n.Name()), nil
2021-05-21 05:49:02 +02:00
}
var body []byte
if res.Body != nil {
body, _ = io.ReadAll(res.Body)
2021-05-21 05:49:02 +02:00
}
return nil, trace.Stack(time.Since(start), n.Name()), errors.Err("upstream error. Status code: %d (%s)", res.StatusCode, string(body))
}
// Put is not supported by HttpStore; it ignores its arguments and always
// returns shared.ErrNotImplemented.
func (n *HttpStore) Put(_ string, _ stream.Blob) error {
	return shared.ErrNotImplemented
}
// PutSD is not supported by HttpStore; it ignores its arguments and always
// returns shared.ErrNotImplemented.
func (n *HttpStore) PutSD(_ string, _ stream.Blob) error {
	return shared.ErrNotImplemented
}
// Delete is not supported by HttpStore; it ignores its argument and always
// returns shared.ErrNotImplemented.
func (n *HttpStore) Delete(_ string) error {
	return shared.ErrNotImplemented
}
2021-07-24 00:08:13 +02:00
// Shutdown is a no-op for HttpStore.
func (n *HttpStore) Shutdown() {
}
2021-05-21 05:49:02 +02:00
// buffers is a pool of reusable byte buffers, kept to reduce GC work.
// Background on the pitfalls of pooling buffers:
// https://www.captaincodeman.com/2017/06/02/golang-buffer-pool-gotcha
var buffers = sync.Pool{
	// New runs when the pool has nothing to hand out; it builds an empty
	// buffer whose initial capacity is stream.MaxBlobSize.
	New: func() interface{} {
		return bytes.NewBuffer(make([]byte, 0, stream.MaxBlobSize))
	},
}
// getBuffer borrows a buffer from the pool. Hand it back with putBuffer when
// done.
func getBuffer() *bytes.Buffer {
	pooled := buffers.Get()
	return pooled.(*bytes.Buffer)
}
// putBuffer hands buf back to the pool for reuse.
func putBuffer(buf *bytes.Buffer) {
	// Empty the buffer first so the next borrower starts with no leftover data.
	buf.Reset()
	buffers.Put(buf)
}
// dialContext connects to address on the named network using a dialer with a
// 30 second connect timeout and a 30 second keep-alive period. ctx bounds the
// dial attempt.
func dialContext(ctx context.Context, network, address string) (net.Conn, error) {
	var d net.Dialer
	d.Timeout = 30 * time.Second
	d.KeepAlive = 30 * time.Second
	return d.DialContext(ctx, network, address)
}
2021-05-21 05:49:02 +02:00
// getClient gets an http client that's customized to be more performant when dealing with blobs of 2MB in size (most of our blobs)
func getClient() *http.Client {
// Customize the Transport to have larger connection pool
2021-05-21 21:06:59 +02:00
defaultTransport := &http.Transport{
DialContext: dialContext,
2021-05-21 21:06:59 +02:00
ForceAttemptHTTP2: true,
MaxIdleConns: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
DisableCompression: true,
MaxIdleConnsPerHost: 100,
ReadBufferSize: stream.MaxBlobSize + 1024*10, //add an extra few KBs to make sure it fits the extra information
}
2021-05-21 05:49:02 +02:00
2021-05-21 21:06:59 +02:00
return &http.Client{Transport: defaultTransport}
2021-05-21 05:49:02 +02:00
}