add prometheus metrics

This commit is contained in:
Niko Storni 2020-05-19 23:13:01 +02:00
parent 414ed1c130
commit 7907ee3579
10 changed files with 236 additions and 7 deletions

1
go.mod
View file

@ -26,6 +26,7 @@ require (
github.com/mitchellh/go-ps v0.0.0-20190716172923-621e5597135b
github.com/opencontainers/go-digest v1.0.0-rc1 // indirect
github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2 // indirect
github.com/prometheus/client_golang v0.9.2
github.com/shopspring/decimal v0.0.0-20191009025716-f1972eb1d1f5
github.com/sirupsen/logrus v1.4.2
github.com/spf13/cobra v0.0.5

6
go.sum
View file

@ -29,6 +29,7 @@ github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a/go.mod h1:l
github.com/aws/aws-sdk-go v1.16.11/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
github.com/aws/aws-sdk-go v1.25.9 h1:WtVzerf5wSgPwlTTwl+ktCq/0GCS5MI9ZlLIcjsTr+Q=
github.com/aws/aws-sdk-go v1.25.9/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLMYoU8P317H5OQ+Via4RmuPwCS0=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f h1:bAs4lUbRJpnnkd9VhRV3jjAVU7DJVjMaK+IsvSeZvFo=
@ -208,6 +209,7 @@ github.com/lyoshenka/bencode v0.0.0-20180323155644-b7abd7672df5/go.mod h1:H0aPCW
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
github.com/miekg/dns v1.1.22 h1:Jm64b3bO9kP43ddLjL2EY3Io6bmy1qGb9Xxz6TqS6rc=
@ -248,9 +250,13 @@ github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI=
github.com/prometheus/client_golang v0.9.2 h1:awm861/B8OKDd2I/6o1dy3ra4BamzKhYOiGItCeZ740=
github.com/prometheus/client_golang v0.9.2/go.mod h1:OsXs2jCmiKlQ1lTBmv21f2mNfw4xf/QclQDMrYNZzcM=
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910 h1:idejC8f05m9MGOsuEi1ATq9shN03HrxNkD/luQvxCv8=
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/common v0.0.0-20181126121408-4724e9255275 h1:PnBWHBf+6L0jOqq0gIVUe6Yk0/QMZ640k6NvkxcBf+8=
github.com/prometheus/common v0.0.0-20181126121408-4724e9255275/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro=
github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a h1:9a8MnZMP0X2nLJdBg+pBmGgkJlSaKC2KaQmTCk1XDtE=
github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g=

View file

@ -3,6 +3,7 @@ package main
import (
"fmt"
"math/rand"
"net/http"
"os"
"time"
@ -11,6 +12,7 @@ import (
"github.com/lbryio/ytsync/manager"
"github.com/lbryio/ytsync/sdk"
ytUtils "github.com/lbryio/ytsync/util"
"github.com/prometheus/client_golang/prometheus/promhttp"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
@ -38,7 +40,10 @@ var (
func main() {
rand.Seed(time.Now().UnixNano())
log.SetLevel(log.DebugLevel)
http.Handle("/metrics", promhttp.Handler())
go func() {
log.Error(http.ListenAndServe(":2112", nil))
}()
cmd := &cobra.Command{
Use: "ytsync",
Short: "Publish youtube channels into LBRY network automatically.",

View file

@ -10,6 +10,7 @@ import (
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/extras/jsonrpc"
"github.com/lbryio/lbry.go/v2/extras/util"
"github.com/lbryio/ytsync/timing"
logUtils "github.com/lbryio/ytsync/util"
"github.com/lbryio/ytsync/tags_manager"
@ -49,6 +50,10 @@ func (s *Sync) enableAddressReuse() error {
return nil
}
func (s *Sync) walletSetup() error {
start := time.Now()
defer func(start time.Time) {
timing.TimedComponent("walletSetup").Add(time.Since(start))
}(start)
//prevent unnecessary concurrent execution and publishing while refilling/reallocating UTXOs
s.walletMux.Lock()
defer s.walletMux.Unlock()
@ -151,6 +156,10 @@ func (s *Sync) walletSetup() error {
}
func (s *Sync) getDefaultAccount() (string, error) {
start := time.Now()
defer func(start time.Time) {
timing.TimedComponent("getDefaultAccount").Add(time.Since(start))
}(start)
if s.defaultAccountID == "" {
accountsResponse, err := s.daemon.AccountList(1, 50)
if err != nil {
@ -177,6 +186,10 @@ func (s *Sync) getDefaultAccount() (string, error) {
}
func (s *Sync) ensureEnoughUTXOs() error {
start := time.Now()
defer func(start time.Time) {
timing.TimedComponent("ensureEnoughUTXOs").Add(time.Since(start))
}(start)
defaultAccount, err := s.getDefaultAccount()
if err != nil {
return err
@ -253,6 +266,10 @@ func (s *Sync) ensureEnoughUTXOs() error {
}
func (s *Sync) waitForNewBlock() error {
start := time.Now()
defer func(start time.Time) {
timing.TimedComponent("waitForNewBlock").Add(time.Since(start))
}(start)
log.Printf("regtest: %t, docker: %t", logUtils.IsRegTest(), logUtils.IsUsingDocker())
status, err := s.daemon.Status()
if err != nil {
@ -302,6 +319,10 @@ func (s *Sync) GenerateRegtestBlock() error {
}
func (s *Sync) ensureChannelOwnership() error {
start := time.Now()
defer func(start time.Time) {
timing.TimedComponent("ensureChannelOwnership").Add(time.Since(start))
}(start)
if s.LbryChannelName == "" {
return errors.Err("no channel name set")
}
@ -451,6 +472,10 @@ func (s *Sync) ensureChannelOwnership() error {
}
func (s *Sync) addCredits(amountToAdd float64) error {
start := time.Now()
defer func(start time.Time) {
timing.TimedComponent("addCredits").Add(time.Since(start))
}(start)
log.Printf("Adding %f credits", amountToAdd)
lbrycrdd, err := logUtils.GetLbrycrdClient(s.LbrycrdString)
if err != nil {

View file

@ -10,13 +10,17 @@ import (
"github.com/lbryio/lbry.go/v2/extras/jsonrpc"
"github.com/lbryio/lbry.go/v2/extras/stop"
"github.com/lbryio/lbry.go/v2/extras/util"
"github.com/lbryio/ytsync/sdk"
"github.com/lbryio/ytsync/timing"
log "github.com/sirupsen/logrus"
)
func waitConfirmations(s *Sync) error {
start := time.Now()
defer func(start time.Time) {
timing.TimedComponent("waitConfirmations").Add(time.Since(start))
}(start)
defaultAccount, err := s.getDefaultAccount()
if err != nil {
return err
@ -54,6 +58,10 @@ type abandonResponse struct {
}
func abandonSupports(s *Sync) (float64, error) {
start := time.Now()
defer func(start time.Time) {
timing.TimedComponent("abandonSupports").Add(time.Since(start))
}(start)
totalPages := uint64(1)
var allSupports []jsonrpc.Claim
defaultAccount, err := s.getDefaultAccount()
@ -185,6 +193,10 @@ type updateInfo struct {
}
func transferVideos(s *Sync) error {
start := time.Now()
defer func(start time.Time) {
timing.TimedComponent("transferVideos").Add(time.Since(start))
}(start)
cleanTransfer := true
streamChan := make(chan updateInfo, s.ConcurrentVideos)
@ -273,7 +285,9 @@ func transferVideos(s *Sync) error {
}
func (s *Sync) streamUpdate(ui *updateInfo) error {
start := time.Now()
result, updateError := s.daemon.StreamUpdate(ui.ClaimID, *ui.streamUpdateOptions)
timing.TimedComponent("transferStreamUpdate").Add(time.Since(start))
if updateError != nil {
ui.videoStatus.FailureReason = updateError.Error()
ui.videoStatus.Status = VideoStatusTranferFailed
@ -290,6 +304,10 @@ func (s *Sync) streamUpdate(ui *updateInfo) error {
}
func transferChannel(s *Sync) error {
start := time.Now()
defer func(start time.Time) {
timing.TimedComponent("transferChannel").Add(time.Since(start))
}(start)
account, err := s.getDefaultAccount()
if err != nil {
return err

View file

@ -19,6 +19,7 @@ import (
"github.com/lbryio/ytsync/sdk"
"github.com/lbryio/ytsync/sources"
"github.com/lbryio/ytsync/thumbs"
"github.com/lbryio/ytsync/timing"
logUtils "github.com/lbryio/ytsync/util"
"github.com/lbryio/lbry.go/v2/extras/errors"
@ -258,7 +259,7 @@ func (s *Sync) FullCycle() (e error) {
}
s.setExceptions()
defer timing.ClearTimings()
s.syncedVideosMux = &sync.RWMutex{}
s.walletMux = &sync.RWMutex{}
s.grp = stopGroup
@ -277,7 +278,6 @@ func (s *Sync) FullCycle() (e error) {
}
defer s.setChannelTerminationStatus(&e)
err = s.downloadWallet()
if err != nil && err.Error() != "wallet not on S3" {
return errors.Prefix("failure in downloading wallet", err)
@ -320,10 +320,10 @@ func (s *Sync) FullCycle() (e error) {
}
if s.shouldTransfer() {
return s.processTransfers()
err = s.processTransfers()
}
return nil
timing.Report()
return err
}
func (s *Sync) processTransfers() (e error) {
@ -428,6 +428,9 @@ func (s *Sync) setChannelTerminationStatus(e *error) {
func (s *Sync) waitForDaemonStart() error {
beginTime := time.Now()
defer func(start time.Time) {
timing.TimedComponent("waitForDaemonStart").Add(time.Since(start))
}(beginTime)
for {
select {
case <-s.grp.Ch():
@ -505,6 +508,10 @@ func isYtsyncClaim(c jsonrpc.Claim, expectedChannelID string) bool {
// fixDupes abandons duplicate claims
func (s *Sync) fixDupes(claims []jsonrpc.Claim) (bool, error) {
start := time.Now()
defer func(start time.Time) {
timing.TimedComponent("fixDupes").Add(time.Since(start))
}(start)
abandonedClaims := false
videoIDs := make(map[string]jsonrpc.Claim)
for _, c := range claims {
@ -714,6 +721,10 @@ func (s *Sync) getClaims(defaultOnly bool) ([]jsonrpc.Claim, error) {
}
func (s *Sync) checkIntegrity() error {
start := time.Now()
defer func(start time.Time) {
timing.TimedComponent("checkIntegrity").Add(time.Since(start))
}(start)
allClaims, err := s.getClaims(false)
if err != nil {
return err
@ -984,6 +995,10 @@ func (s *Sync) startWorker(workerNum int) {
var mostRecentlyFailedChannel string
func (s *Sync) enqueueYoutubeVideos() error {
start := time.Now()
defer func(start time.Time) {
timing.TimedComponent("enqueueYoutubeVideos").Add(time.Since(start))
}(start)
client := &http.Client{
Transport: &transport.APIKey{Key: s.APIConfig.YoutubeAPIKey},
}

25
metrics/metrics.go Normal file
View file

@ -0,0 +1,25 @@
package metrics
import (
"os"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
var (
Durations = promauto.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "ytsync",
Subsystem: getHostname(),
Name: "duration",
Help: "The durations of the individual modules",
}, []string{"path"})
)
func getHostname() string {
hostname, err := os.Hostname()
if err != nil {
hostname = "ytsync-unknown"
}
return hostname
}

View file

@ -17,6 +17,7 @@ import (
"github.com/lbryio/lbry.go/v2/extras/jsonrpc"
"github.com/lbryio/lbry.go/v2/extras/stop"
"github.com/lbryio/lbry.go/v2/extras/util"
"github.com/lbryio/ytsync/timing"
logUtils "github.com/lbryio/ytsync/util"
"github.com/lbryio/ytsync/ip_manager"
@ -180,6 +181,10 @@ func (v *YoutubeVideo) getAbbrevDescription() string {
}
func (v *YoutubeVideo) download() error {
start := time.Now()
defer func(start time.Time) {
timing.TimedComponent("download").Add(time.Since(start))
}(start)
if v.youtubeInfo.Snippet.LiveBroadcastContent != "none" {
return errors.Err("video is a live stream and hasn't completed yet")
}
@ -371,6 +376,10 @@ func (v *YoutubeVideo) triggerThumbnailSave() (err error) {
}
func (v *YoutubeVideo) publish(daemon *jsonrpc.Client, params SyncParams) (*SyncSummary, error) {
start := time.Now()
defer func(start time.Time) {
timing.TimedComponent("publish").Add(time.Since(start))
}(start)
languages, locations, tags := v.getMetadata()
var fee *jsonrpc.Fee
if params.Fee != nil {
@ -580,10 +589,12 @@ func (v *YoutubeVideo) reprocess(daemon *jsonrpc.Client, params SyncParams, exis
v.walletLock.RLock()
defer v.walletLock.RUnlock()
if v.mocked {
start := time.Now()
pr, err := daemon.StreamUpdate(existingVideoData.ClaimID, jsonrpc.StreamUpdateOptions{
StreamCreateOptions: streamCreateOptions,
FileSize: &videoSize,
})
timing.TimedComponent("StreamUpdate").Add(time.Since(start))
if err != nil {
return nil, err
}
@ -603,6 +614,7 @@ func (v *YoutubeVideo) reprocess(daemon *jsonrpc.Client, params SyncParams, exis
streamCreateOptions.ClaimCreateOptions.Description = util.PtrToString(v.getAbbrevDescription())
streamCreateOptions.Duration = util.PtrToUint64(uint64(math.Ceil(videoDuration.ToDuration().Seconds())))
streamCreateOptions.ReleaseTime = util.PtrToInt64(v.publishedAt.Unix())
start := time.Now()
pr, err := daemon.StreamUpdate(existingVideoData.ClaimID, jsonrpc.StreamUpdateOptions{
ClearLanguages: util.PtrToBool(true),
ClearLocations: util.PtrToBool(true),
@ -610,6 +622,7 @@ func (v *YoutubeVideo) reprocess(daemon *jsonrpc.Client, params SyncParams, exis
StreamCreateOptions: streamCreateOptions,
FileSize: &videoSize,
})
timing.TimedComponent("StreamUpdate").Add(time.Since(start))
if err != nil {
return nil, err
}

111
timing/timing.go Normal file
View file

@ -0,0 +1,111 @@
package timing
import (
"sync"
"sync/atomic"
"time"
"github.com/lbryio/ytsync/metrics"
"github.com/sirupsen/logrus"
)
type Timing struct {
component string
milliseconds int64
min int64
max int64
invocations int32
}
var timings *sync.Map
func TimedComponent(component string) *Timing {
if timings == nil {
timings = &sync.Map{}
}
stored, _ := timings.LoadOrStore(component, &Timing{
component: component,
milliseconds: 0,
min: int64(99999999),
})
t, _ := stored.(*Timing)
return t
}
func ClearTimings() {
timings.Range(func(key interface{}, value interface{}) bool {
timings.Delete(key)
return true
})
}
func Report() {
var totalTime time.Duration
timings.Range(func(key interface{}, value interface{}) bool {
totalTime += value.(*Timing).Get()
return true
})
timings.Range(func(key interface{}, value interface{}) bool {
component := key
componentRuntime := value.(*Timing).Get().String()
percentTime := float64(value.(*Timing).Get()) / float64(totalTime) * 100
invocations := value.(*Timing).Invocations()
avgTime := (time.Duration(int64(float64(value.(*Timing).Get()) / float64(value.(*Timing).Invocations())))).String()
minRuntime := value.(*Timing).Min().String()
maxRuntime := value.(*Timing).Max().String()
logrus.Printf("component %s ran for %s (%.2f%% of the total time) - invoked %d times with an average of %s per call, a minimum of %s and a maximum of %s",
component,
componentRuntime,
percentTime,
invocations,
avgTime,
minRuntime,
maxRuntime,
)
return true
})
}
func (t *Timing) Add(d time.Duration) {
metrics.Durations.WithLabelValues(t.component).Observe(d.Seconds())
atomic.AddInt64(&t.milliseconds, d.Milliseconds())
for {
oldMin := atomic.LoadInt64(&t.min)
if d.Milliseconds() < oldMin {
if atomic.CompareAndSwapInt64(&t.min, oldMin, d.Milliseconds()) {
break
}
} else {
break
}
}
for {
oldMax := atomic.LoadInt64(&t.max)
if d.Milliseconds() > oldMax {
if atomic.CompareAndSwapInt64(&t.max, oldMax, d.Milliseconds()) {
break
}
} else {
break
}
}
atomic.AddInt32(&t.invocations, 1)
}
func (t *Timing) Get() time.Duration {
ms := atomic.LoadInt64(&t.milliseconds)
return time.Duration(ms) * time.Millisecond
}
func (t *Timing) Invocations() int32 {
return atomic.LoadInt32(&t.invocations)
}
func (t *Timing) Min() time.Duration {
ms := atomic.LoadInt64(&t.min)
return time.Duration(ms) * time.Millisecond
}
func (t *Timing) Max() time.Duration {
ms := atomic.LoadInt64(&t.max)
return time.Duration(ms) * time.Millisecond
}

View file

@ -7,9 +7,11 @@ import (
"os/user"
"path/filepath"
"strconv"
"time"
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/lbrycrd"
"github.com/lbryio/ytsync/timing"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/filters"
@ -270,6 +272,10 @@ func CleanupLbrynet() error {
}
func StartDaemon() error {
start := time.Now()
defer func(start time.Time) {
timing.TimedComponent("startDaemon").Add(time.Since(start))
}(start)
if IsUsingDocker() {
return startDaemonViaDocker()
}
@ -277,6 +283,10 @@ func StartDaemon() error {
}
func StopDaemon() error {
start := time.Now()
defer func(start time.Time) {
timing.TimedComponent("stopDaemon").Add(time.Since(start))
}(start)
if IsUsingDocker() {
return stopDaemonViaDocker()
}