This commit is contained in:
Ivan 2021-09-08 15:44:21 +03:00
parent b2272fef3a
commit 43c78ed7c6
20 changed files with 121 additions and 98 deletions

.golangci.yml Normal file
View file

@@ -0,0 +1,67 @@
# options for analysis running
run:
  # default concurrency is the number of available CPUs
  concurrency: 4
  # timeout for analysis, e.g. 30s, 5m, default is 1m
  timeout: 5m
  # exit code when at least one issue was found, default is 1
  issues-exit-code: 1
  # include test files or not, default is true
  tests: true
  # which dirs to skip: issues from them won't be reported;
  # can use regexp here: generated.*, regexp is applied on full path;
  # default value is empty list, but default dirs are skipped independently
  # from this option's value (see skip-dirs-use-default).
  # "/" will be replaced by current OS file path separator to properly work
  # on Windows.
  skip-dirs:
    - app/model
    - app/migration
  # default is true. Enables skipping of directories:
  # vendor$, third_party$, testdata$, examples$, Godeps$, builtin$
  skip-dirs-use-default: true
  # which files to skip: they will be analyzed, but issues from them
  # won't be reported. Default value is empty list, but there is
  # no need to include all autogenerated files; we confidently recognize
  # autogenerated files. If that is not the case, please let us know.
  # "/" will be replaced by current OS file path separator to properly work
  # on Windows.
  skip-files:
    - ".*\\.my\\.go$"
  # Allow multiple parallel golangci-lint instances running.
  # If false (default) - golangci-lint acquires file lock on start.
  allow-parallel-runners: false

linters:
  enable:
    - govet
    - unused
    - errcheck
    - unconvert
    - gocyclo
    - goimports
    - varcheck
    - golint
    - nilerr

linters-settings:
  gocyclo:
    # minimal code complexity to report, 30 by default (but we recommend 10-20)
    min-complexity: 35
  revive:
    # see https://github.com/mgechev/revive#available-rules for details.
    ignore-generated-header: true
    severity: warning
    rules:
      - name: indent-error-flow
        severity: warning
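For context, errcheck (enabled above) flags silently dropped error return values, which is the class of issue this commit fixes in HasBlobs further down. A minimal, hypothetical illustration of what it reports (not code from this repository):

package example

import "os"

func cleanup(path string) {
	os.Remove(path)     // errcheck reports this unchecked error return
	_ = os.Remove(path) // explicitly discarded; not reported with default settings
}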

View file

@@ -167,7 +167,7 @@ func initUpstreamStore() store.BlobStore {
 			Timeout: 30 * time.Second,
 		})
 	case "http":
-		s = store.NewHttpStore(upstreamReflector)
+		s = store.NewHTTPStore(upstreamReflector)
 	default:
 		log.Fatalf("protocol is not recognized: %s", upstreamProtocol)
 	}

View file

@@ -659,43 +659,6 @@ func (s *SQL) GetStoredHashesInRange(ctx context.Context, start, end bits.Bitmap
 	return
 }
-
-// txFunc is a function that can be wrapped in a transaction
-type txFunc func(tx *sql.Tx) error
-
-// withTx wraps a function in an sql transaction. the transaction is committed if there's no error, or rolled back if there is one.
-// if dbOrTx is an sql.DB, a new transaction is started
-func withTx(dbOrTx interface{}, f txFunc) (err error) {
-	var tx *sql.Tx
-	switch t := dbOrTx.(type) {
-	case *sql.Tx:
-		tx = t
-	case *sql.DB:
-		tx, err = t.Begin()
-		if err != nil {
-			return err
-		}
-		defer func() {
-			if p := recover(); p != nil {
-				if rollBackError := tx.Rollback(); rollBackError != nil {
-					log.Error("failed to rollback tx on panic - ", rollBackError)
-				}
-				panic(p)
-			} else if err != nil {
-				if rollBackError := tx.Rollback(); rollBackError != nil {
-					log.Error("failed to rollback tx on panic - ", rollBackError)
-				}
-			} else {
-				err = errors.Err(tx.Commit())
-			}
-		}()
-	default:
-		return errors.Err("db or tx required")
-	}
-
-	return f(tx)
-}
-
 func closeRows(rows *sql.Rows) {
 	if rows != nil {
 		err := rows.Close()
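For readers of this hunk: the removed withTx helper wrapped a callback in a transaction that was committed on success and rolled back on error or panic. A typical call site would have looked roughly like this (hypothetical sketch, including the function and table names, not code from this repository):

func deleteBlob(db *sql.DB, hash string) error {
	return withTx(db, func(tx *sql.Tx) error {
		// any error returned here triggers a rollback; returning nil commits
		_, err := tx.Exec("DELETE FROM blobs WHERE hash = ?", hash)
		return err
	})
}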

View file

@@ -113,12 +113,12 @@ var (
 		Name: "peer_download_total",
 		Help: "Total number of blobs downloaded from reflector through tcp protocol",
 	})
-	Http3DownloadCount = promauto.NewCounter(prometheus.CounterOpts{
+	HTTP3DownloadCount = promauto.NewCounter(prometheus.CounterOpts{
 		Namespace: ns,
 		Name: "http3_blob_download_total",
 		Help: "Total number of blobs downloaded from reflector through QUIC protocol",
 	})
-	HttpDownloadCount = promauto.NewCounter(prometheus.CounterOpts{
+	HTTPDownloadCount = promauto.NewCounter(prometheus.CounterOpts{
 		Namespace: ns,
 		Name: "http_blob_download_total",
 		Help: "Total number of blobs downloaded from reflector through HTTP protocol",
@@ -154,7 +154,7 @@ var (
 		Name: "origin_requests_total",
 		Help: "How many Get requests are in flight from the cache to the origin",
 	}, []string{LabelCacheType, LabelComponent})
-	//during thundering-herd situations, the metric below should be a lot smaller than the metric above
+	//nolint //during thundering-herd situations, the metric below should be a lot smaller than the metric above
 	CacheWaitingRequestsCount = promauto.NewGaugeVec(prometheus.GaugeOpts{
 		Namespace: ns,
 		Subsystem: subsystemCache,
@@ -184,32 +184,32 @@ var (
 		Help: "Total number of SD blobs (and therefore streams) uploaded to reflector",
 	})
-	MtrInBytesTcp = promauto.NewCounter(prometheus.CounterOpts{
+	MtrInBytesTCP = promauto.NewCounter(prometheus.CounterOpts{
 		Namespace: ns,
 		Name: "tcp_in_bytes",
 		Help: "Total number of bytes downloaded through TCP",
 	})
-	MtrOutBytesTcp = promauto.NewCounter(prometheus.CounterOpts{
+	MtrOutBytesTCP = promauto.NewCounter(prometheus.CounterOpts{
 		Namespace: ns,
 		Name: "tcp_out_bytes",
 		Help: "Total number of bytes streamed out through TCP",
 	})
-	MtrInBytesUdp = promauto.NewCounter(prometheus.CounterOpts{
+	MtrInBytesUDP = promauto.NewCounter(prometheus.CounterOpts{
 		Namespace: ns,
 		Name: "udp_in_bytes",
 		Help: "Total number of bytes downloaded through UDP",
 	})
-	MtrInBytesHttp = promauto.NewCounter(prometheus.CounterOpts{
+	MtrInBytesHTTP = promauto.NewCounter(prometheus.CounterOpts{
 		Namespace: ns,
 		Name: "http_in_bytes",
 		Help: "Total number of bytes downloaded through HTTP",
 	})
-	MtrOutBytesUdp = promauto.NewCounter(prometheus.CounterOpts{
+	MtrOutBytesUDP = promauto.NewCounter(prometheus.CounterOpts{
 		Namespace: ns,
 		Name: "udp_out_bytes",
 		Help: "Total number of bytes streamed out through UDP",
 	})
-	MtrOutBytesHttp = promauto.NewCounter(prometheus.CounterOpts{
+	MtrOutBytesHTTP = promauto.NewCounter(prometheus.CounterOpts{
 		Namespace: ns,
 		Name: "http_out_bytes",
 		Help: "Total number of bytes streamed out through UDP",
@@ -229,12 +229,12 @@ var (
 		Name: "s3_in_bytes",
 		Help: "Total number of incoming bytes (from S3-CF)",
 	})
-	Http3BlobReqQueue = promauto.NewGauge(prometheus.GaugeOpts{
+	HTTP3BlobReqQueue = promauto.NewGauge(prometheus.GaugeOpts{
 		Namespace: ns,
 		Name: "http3_blob_request_queue_size",
 		Help: "Blob requests of https queue size",
 	})
-	HttpBlobReqQueue = promauto.NewGauge(prometheus.GaugeOpts{
+	HTTPBlobReqQueue = promauto.NewGauge(prometheus.GaugeOpts{
 		Namespace: ns,
 		Name: "http_blob_request_queue_size",
 		Help: "Blob requests queue size of the HTTP protocol",

View file

@@ -1,4 +1,4 @@
-package lite_db
+package lite_db //nolint
 
 import (
 	"database/sql"
@@ -111,7 +111,10 @@ func (s *SQL) HasBlob(hash string) (bool, error) {
 // HasBlobs checks if the database contains the set of blobs and returns a bool map.
 func (s *SQL) HasBlobs(hashes []string) (map[string]bool, error) {
 	exists, streamsNeedingTouch, err := s.hasBlobs(hashes)
-	s.touch(streamsNeedingTouch)
+	if err != nil {
+		return nil, err
+	}
+	err = s.touch(streamsNeedingTouch)
 	return exists, err
 }

View file

@@ -67,9 +67,9 @@ func (s *Server) HandleGetBlob(c *gin.Context) {
 		c.String(http.StatusInternalServerError, err.Error())
 		return
 	}
-	metrics.MtrOutBytesHttp.Add(float64(len(blob)))
+	metrics.MtrOutBytesHTTP.Add(float64(len(blob)))
 	metrics.BlobDownloadCount.Inc()
-	metrics.HttpDownloadCount.Inc()
+	metrics.HTTPDownloadCount.Inc()
 	c.Header("Via", serialized)
 	c.Header("Content-Disposition", "filename="+hash)
 	c.Data(http.StatusOK, "application/octet-stream", blob)

View file

@@ -28,7 +28,7 @@ func InitWorkers(server *Server, workers int) {
 			case <-stopper.Ch():
 			case r := <-getReqCh:
 				process(server, r)
-				metrics.HttpBlobReqQueue.Dec()
+				metrics.HTTPBlobReqQueue.Dec()
 			}
 		}
 	}(i)
@@ -36,7 +36,7 @@ func InitWorkers(server *Server, workers int) {
 }
 
 func enqueue(b *blobRequest) {
-	metrics.HttpBlobReqQueue.Inc()
+	metrics.HTTPBlobReqQueue.Inc()
 	getReqCh <- b
 }

View file

@@ -114,7 +114,7 @@ func (c *Client) GetBlob(hash string) (stream.Blob, shared.BlobTrace, error) {
 		blob := make([]byte, written)
 		copy(blob, tmp.Bytes())
-		metrics.MtrInBytesUdp.Add(float64(len(blob)))
+		metrics.MtrInBytesUDP.Add(float64(len(blob)))
 		return blob, trace.Stack(time.Since(start), "http3"), nil
 	}

View file

@@ -208,7 +208,7 @@ func (s *Server) HandleGetBlob(w http.ResponseWriter, r *http.Request) {
 	if err != nil {
 		s.logError(err)
 	}
-	metrics.MtrOutBytesUdp.Add(float64(len(blob)))
+	metrics.MtrOutBytesUDP.Add(float64(len(blob)))
 	metrics.BlobDownloadCount.Inc()
-	metrics.Http3DownloadCount.Inc()
+	metrics.HTTP3DownloadCount.Inc()
 }

View file

@@ -27,7 +27,7 @@ func InitWorkers(server *Server, workers int) {
 			select {
 			case <-stopper.Ch():
 			case r := <-getReqCh:
-				metrics.Http3BlobReqQueue.Dec()
+				metrics.HTTP3BlobReqQueue.Dec()
 				process(server, r)
 			}
 		}
@@ -36,7 +36,7 @@ func InitWorkers(server *Server, workers int) {
 }
 
 func enqueue(b *blobRequest) {
-	metrics.Http3BlobReqQueue.Inc()
+	metrics.HTTP3BlobReqQueue.Inc()
 	getReqCh <- b
 }

View file

@@ -158,7 +158,7 @@ func (c *Client) GetBlob(hash string) (stream.Blob, shared.BlobTrace, error) {
 	if err != nil {
 		return nil, (*resp.RequestTrace).Stack(time.Since(start), "tcp"), err
 	}
-	metrics.MtrInBytesTcp.Add(float64(len(blob)))
+	metrics.MtrInBytesTCP.Add(float64(len(blob)))
 	return blob, trace.Stack(time.Since(start), "tcp"), nil
 }

View file

@@ -277,7 +277,7 @@ func (s *Server) handleCompositeRequest(data []byte) ([]byte, error) {
 			BlobHash: reflector.BlobHash(blob),
 			Length: len(blob),
 		}
-		metrics.MtrOutBytesTcp.Add(float64(len(blob)))
+		metrics.MtrOutBytesTCP.Add(float64(len(blob)))
 		metrics.BlobDownloadCount.Inc()
 		metrics.PeerDownloadCount.Inc()
 	}
@@ -368,14 +368,6 @@ type availabilityResponse struct {
 	AvailableBlobs []string `json:"available_blobs"`
 }
 
-type paymentRateRequest struct {
-	BlobDataPaymentRate float64 `json:"blob_data_payment_rate"`
-}
-
-type paymentRateResponse struct {
-	BlobDataPaymentRate string `json:"blob_data_payment_rate"`
-}
-
 type blobRequest struct {
 	RequestedBlob string `json:"requested_blob"`
 }

View file

@@ -174,5 +174,4 @@ func (s *SlowBlobStore) Delete(hash string) error {
 }
 
 func (s *SlowBlobStore) Shutdown() {
-	return
 }

View file

@@ -59,7 +59,7 @@ func NewGcacheStore(component string, store BlobStore, maxSize int, strategy Evi
 	}
 	go func() {
 		if lstr, ok := store.(lister); ok {
-			err := l.loadExisting(lstr, int(maxSize))
+			err := l.loadExisting(lstr, maxSize)
 			if err != nil {
 				panic(err) // TODO: what should happen here? panic? return nil? just keep going?
 			}

View file

@@ -16,23 +16,23 @@ import (
 	"github.com/lbryio/lbry.go/v2/stream"
 )
 
-// HttpStore is a store that works on top of the HTTP protocol
-type HttpStore struct {
+// HTTPStore is a store that works on top of the HTTP protocol
+type HTTPStore struct {
 	upstream   string
 	httpClient *http.Client
 }
 
-func NewHttpStore(upstream string) *HttpStore {
-	return &HttpStore{
+func NewHTTPStore(upstream string) *HTTPStore {
+	return &HTTPStore{
 		upstream:   "http://" + upstream,
 		httpClient: getClient(),
 	}
 }
 
-const nameHttp = "http"
+const nameHTTP = "http"
 
-func (n *HttpStore) Name() string { return nameHttp }
-func (n *HttpStore) Has(hash string) (bool, error) {
+func (n *HTTPStore) Name() string { return nameHTTP }
+func (n *HTTPStore) Has(hash string) (bool, error) {
 	url := n.upstream + "/blob?hash=" + hash
 	req, err := http.NewRequest("HEAD", url, nil)
@@ -58,7 +58,7 @@ func (n *HttpStore) Has(hash string) (bool, error) {
 	return false, errors.Err("upstream error. Status code: %d (%s)", res.StatusCode, string(body))
 }
 
-func (n *HttpStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) {
+func (n *HTTPStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) {
 	start := time.Now()
 	url := n.upstream + "/blob?hash=" + hash
@@ -95,7 +95,7 @@ func (n *HttpStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) {
 		blob := make([]byte, written)
 		copy(blob, tmp.Bytes())
-		metrics.MtrInBytesHttp.Add(float64(len(blob)))
+		metrics.MtrInBytesHTTP.Add(float64(len(blob)))
 		return blob, trace.Stack(time.Since(start), n.Name()), nil
 	}
 	var body []byte
@@ -106,16 +106,16 @@ func (n *HttpStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) {
 	return nil, trace.Stack(time.Since(start), n.Name()), errors.Err("upstream error. Status code: %d (%s)", res.StatusCode, string(body))
 }
 
-func (n *HttpStore) Put(string, stream.Blob) error {
+func (n *HTTPStore) Put(string, stream.Blob) error {
 	return shared.ErrNotImplemented
 }
-func (n *HttpStore) PutSD(string, stream.Blob) error {
+func (n *HTTPStore) PutSD(string, stream.Blob) error {
 	return shared.ErrNotImplemented
 }
-func (n *HttpStore) Delete(string) error {
+func (n *HTTPStore) Delete(string) error {
 	return shared.ErrNotImplemented
 }
-func (n *HttpStore) Shutdown() {}
+func (n *HTTPStore) Shutdown() {}
 
 // buffer pool to reduce GC
 // https://www.captaincodeman.com/2017/06/02/golang-buffer-pool-gotcha
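The "buffer pool to reduce GC" comment above refers to the standard sync.Pool pattern described in the linked article. A minimal sketch of that pattern for orientation (illustrative names and package, not the repository's exact code):

package example

import (
	"bytes"
	"sync"
)

// bufferPool reuses byte buffers across blob downloads to reduce GC pressure.
var bufferPool = sync.Pool{
	New: func() interface{} { return new(bytes.Buffer) },
}

func getBuffer() *bytes.Buffer {
	return bufferPool.Get().(*bytes.Buffer)
}

func putBuffer(buf *bytes.Buffer) {
	buf.Reset() // reset before returning so the next caller starts empty
	bufferPool.Put(buf)
}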

View file

@@ -37,6 +37,7 @@ func (c *ITTTStore) Has(hash string) (bool, error) {
 	return has, err
 }
 
+// TODO: refactor error check, why return error? should we check if `err == nil` ?
 // Get tries to get the blob from this first, falling back to that.
 func (c *ITTTStore) Get(hash string) (stream.Blob, shared.BlobTrace, error) {
 	start := time.Now()
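For orientation, the ITTT ("if this, then that") store's Get is a two-tier fallback: try the first store, and only on failure ask the second. A stripped-down sketch of that shape, with hypothetical names and an unconditional fallback (the TODO above asks whether the error should be inspected first); this is not the repository's actual implementation:

package example

// getter is the minimal interface needed to illustrate the fallback.
type getter interface {
	Get(hash string) ([]byte, error)
}

// fallbackGet mirrors the "this first, then that" behaviour described in the comment.
func fallbackGet(primary, secondary getter, hash string) ([]byte, error) {
	blob, err := primary.Get(hash)
	if err == nil {
		return blob, nil
	}
	// falls back on any error; a stricter version would only fall back on "not found"
	return secondary.Get(hash)
}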

View file

@@ -21,4 +21,4 @@ func (n *NoopStore) Get(_ string) (stream.Blob, shared.BlobTrace, error) {
 func (n *NoopStore) Put(_ string, _ stream.Blob) error { return nil }
 func (n *NoopStore) PutSD(_ string, _ stream.Blob) error { return nil }
 func (n *NoopStore) Delete(_ string) error { return nil }
-func (n *NoopStore) Shutdown() { return }
+func (n *NoopStore) Shutdown() {}

View file

@@ -166,5 +166,4 @@ func (s *S3Store) initOnce() error {
 
 // Shutdown shuts down the store gracefully
 func (s *S3Store) Shutdown() {
-	return
 }

View file

@@ -124,5 +124,4 @@ func (s *singleflightStore) putter(hash string, blob stream.Blob) func() (interf
 // Shutdown shuts down the store gracefully
 func (s *singleflightStore) Shutdown() {
 	s.BlobStore.Shutdown()
-	return
 }

View file

@@ -36,7 +36,7 @@ type response struct {
 type Node struct {
 	transport *TCPTransport
-	nextId    atomic.Uint32
+	nextID    atomic.Uint32
 	grp       *stop.Group
 
 	handlersMu *sync.RWMutex
@@ -155,7 +155,7 @@ func (n *Node) listen() {
 			return
 		case bytes := <-n.transport.Responses():
 			msg := &struct {
-				Id     uint32 `json:"id"`
+				ID     uint32 `json:"id"`
 				Method string `json:"method"`
 				Error  struct {
 					Code    int    `json:"code"`
@@ -163,7 +163,7 @@ func (n *Node) listen() {
 				} `json:"error"`
 			}{}
 			msg2 := &struct {
-				Id     uint32 `json:"id"`
+				ID     uint32 `json:"id"`
 				Method string `json:"method"`
 				Error  struct {
 					Code    int    `json:"code"`
@@ -181,7 +181,7 @@ func (n *Node) listen() {
 			// maybe that happens because the wallet server passes a lbrycrd error through to us?
 			if err2 := json.Unmarshal(bytes, msg2); err2 == nil {
 				err = nil
-				msg.Id = msg2.Id
+				msg.ID = msg2.ID
 				msg.Method = msg2.Method
 				msg.Error = msg2.Error.Message
 			}
@@ -210,7 +210,7 @@ func (n *Node) listen() {
 			}
 
 			n.handlersMu.RLock()
-			c, ok := n.handlers[msg.Id]
+			c, ok := n.handlers[msg.ID]
 			n.handlersMu.RUnlock()
 			if ok {
 				c <- r
@@ -231,15 +231,15 @@ func (n *Node) listen() {
 // request makes a request to the server and unmarshals the response into v.
 func (n *Node) request(method string, params []string, v interface{}) error {
 	msg := struct {
-		Id     uint32   `json:"id"`
+		ID     uint32   `json:"id"`
 		Method string   `json:"method"`
 		Params []string `json:"params"`
 	}{
-		Id:     n.nextId.Load(),
+		ID:     n.nextID.Load(),
 		Method: method,
 		Params: params,
 	}
-	n.nextId.Inc()
+	n.nextID.Inc()
 
 	bytes, err := json.Marshal(msg)
 	if err != nil {
@@ -250,7 +250,7 @@ func (n *Node) request(method string, params []string, v interface{}) error {
 	c := make(chan response, 1)
 	n.handlersMu.Lock()
-	n.handlers[msg.Id] = c
+	n.handlers[msg.ID] = c
 	n.handlersMu.Unlock()
 
 	err = n.transport.Send(bytes)
@@ -268,7 +268,7 @@ func (n *Node) request(method string, params []string, v interface{}) error {
 	}
 
 	n.handlersMu.Lock()
-	delete(n.handlers, msg.Id)
+	delete(n.handlers, msg.ID)
 	n.handlersMu.Unlock()
 
 	if r.err != nil {
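The Id-to-ID rename above runs through the node's request/response correlation: request() registers a buffered channel under the outgoing message ID, listen() looks that ID up when a response arrives and delivers to the channel, and the handler is removed afterwards. A self-contained sketch of that correlation pattern (illustrative types and names, not the repository's code):

package example

import "sync"

type response struct {
	data []byte
	err  error
}

// correlator routes each response to the waiter that sent the matching request ID.
type correlator struct {
	mu       sync.RWMutex
	handlers map[uint32]chan response
}

func newCorrelator() *correlator {
	return &correlator{handlers: make(map[uint32]chan response)}
}

// register creates a buffered channel for the given request ID.
func (c *correlator) register(id uint32) chan response {
	ch := make(chan response, 1)
	c.mu.Lock()
	c.handlers[id] = ch
	c.mu.Unlock()
	return ch
}

// dispatch delivers a response to the registered waiter, if any.
func (c *correlator) dispatch(id uint32, r response) {
	c.mu.RLock()
	ch, ok := c.handlers[id]
	c.mu.RUnlock()
	if ok {
		ch <- r
	}
}

// remove drops the handler once the request has completed or timed out.
func (c *correlator) remove(id uint32) {
	c.mu.Lock()
	delete(c.handlers, id)
	c.mu.Unlock()
}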