Merge pull request #306 from mrd0ll4r/storage-stats

storage: add prometheus metrics for seeders/leechers
This commit is contained in:
mrd0ll4r 2017-05-29 19:11:07 +02:00 committed by GitHub
commit 634bc6b706
3 changed files with 87 additions and 25 deletions

View file

@ -78,6 +78,10 @@ chihaya:
# higher degree of parallelism. # higher degree of parallelism.
shard_count: 1024 shard_count: 1024
# The interval at which metrics about the number of infohashes and peers
# are collected and posted to Prometheus.
prometheus_reporting_interval: 1s
# This block defines configuration used for middleware executed before a # This block defines configuration used for middleware executed before a
# response has been returned to a BitTorrent client. # response has been returned to a BitTorrent client.
prehooks: prehooks:

View file

@ -19,6 +19,7 @@ import (
func init() { func init() {
prometheus.MustRegister(promGCDurationMilliseconds) prometheus.MustRegister(promGCDurationMilliseconds)
prometheus.MustRegister(promInfohashesCount) prometheus.MustRegister(promInfohashesCount)
prometheus.MustRegister(promSeedersCount, promLeechersCount)
} }
var promGCDurationMilliseconds = prometheus.NewHistogram(prometheus.HistogramOpts{ var promGCDurationMilliseconds = prometheus.NewHistogram(prometheus.HistogramOpts{
@ -32,25 +33,31 @@ var promInfohashesCount = prometheus.NewGauge(prometheus.GaugeOpts{
Help: "The number of infohashes tracked", Help: "The number of infohashes tracked",
}) })
var promSeedersCount = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "chihaya_storage_seeders_count",
Help: "The number of seeders tracked",
})
var promLeechersCount = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "chihaya_storage_leechers_count",
Help: "The number of leechers tracked",
})
// recordGCDuration records the duration of a GC sweep. // recordGCDuration records the duration of a GC sweep.
func recordGCDuration(duration time.Duration) { func recordGCDuration(duration time.Duration) {
promGCDurationMilliseconds.Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond)) promGCDurationMilliseconds.Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond))
} }
// recordInfohashesDelta records a change in the number of Infohashes tracked.
func recordInfohashesDelta(delta float64) {
promInfohashesCount.Add(delta)
}
// ErrInvalidGCInterval is returned for a GarbageCollectionInterval that is // ErrInvalidGCInterval is returned for a GarbageCollectionInterval that is
// less than or equal to zero. // less than or equal to zero.
var ErrInvalidGCInterval = errors.New("invalid garbage collection interval") var ErrInvalidGCInterval = errors.New("invalid garbage collection interval")
// Config holds the configuration of a memory PeerStore. // Config holds the configuration of a memory PeerStore.
type Config struct { type Config struct {
GarbageCollectionInterval time.Duration `yaml:"gc_interval"` GarbageCollectionInterval time.Duration `yaml:"gc_interval"`
PeerLifetime time.Duration `yaml:"peer_lifetime"` PrometheusReportingInterval time.Duration `yaml:"prometheus_reporting_interval"`
ShardCount int `yaml:"shard_count"` PeerLifetime time.Duration `yaml:"peer_lifetime"`
ShardCount int `yaml:"shard_count"`
} }
// LogFields renders the current config as a set of Logrus fields. // LogFields renders the current config as a set of Logrus fields.
@ -108,10 +115,6 @@ func New(cfg Config) (storage.PeerStore, error) {
select { select {
case <-ps.closing: case <-ps.closing:
t.Stop() t.Stop()
select {
case <-t.C:
default:
}
return return
case now := <-t.C: case now := <-t.C:
ps.setClock(now.UnixNano()) ps.setClock(now.UnixNano())
@ -119,13 +122,32 @@ func New(cfg Config) (storage.PeerStore, error) {
} }
}() }()
ps.wg.Add(1)
go func() {
defer ps.wg.Done()
t := time.NewTicker(cfg.PrometheusReportingInterval)
for {
select {
case <-ps.closing:
t.Stop()
return
case <-t.C:
before := time.Now()
ps.populateProm()
log.Debugf("memory: populateProm() took %s", time.Since(before))
}
}
}()
return ps, nil return ps, nil
} }
type serializedPeer string type serializedPeer string
type peerShard struct { type peerShard struct {
swarms map[bittorrent.InfoHash]swarm swarms map[bittorrent.InfoHash]swarm
numSeeders uint64
numLeechers uint64
sync.RWMutex sync.RWMutex
} }
@ -144,6 +166,24 @@ type peerStore struct {
wg sync.WaitGroup wg sync.WaitGroup
} }
// populateProm aggregates metrics over all shards and then posts them to
// prometheus.
func (ps *peerStore) populateProm() {
var numInfohashes, numSeeders, numLeechers uint64
for _, s := range ps.shards {
s.RLock()
numInfohashes += uint64(len(s.swarms))
numSeeders += s.numSeeders
numLeechers += s.numLeechers
s.RUnlock()
}
promInfohashesCount.Set(float64(numInfohashes))
promSeedersCount.Set(float64(numSeeders))
promLeechersCount.Set(float64(numLeechers))
}
var _ storage.PeerStore = &peerStore{} var _ storage.PeerStore = &peerStore{}
func (ps *peerStore) getClock() int64 { func (ps *peerStore) getClock() int64 {
@ -209,9 +249,12 @@ func (ps *peerStore) PutSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error
seeders: make(map[serializedPeer]int64), seeders: make(map[serializedPeer]int64),
leechers: make(map[serializedPeer]int64), leechers: make(map[serializedPeer]int64),
} }
recordInfohashesDelta(1)
} }
if _, ok := shard.swarms[ih].seeders[pk]; !ok {
// new peer
shard.numSeeders++
}
shard.swarms[ih].seeders[pk] = ps.getClock() shard.swarms[ih].seeders[pk] = ps.getClock()
shard.Unlock() shard.Unlock()
@ -240,11 +283,14 @@ func (ps *peerStore) DeleteSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) err
return storage.ErrResourceDoesNotExist return storage.ErrResourceDoesNotExist
} }
delete(shard.swarms[ih].seeders, pk) if _, ok := shard.swarms[ih].seeders[pk]; ok {
// seeder actually removed
shard.numSeeders--
delete(shard.swarms[ih].seeders, pk)
}
if len(shard.swarms[ih].seeders)|len(shard.swarms[ih].leechers) == 0 { if len(shard.swarms[ih].seeders)|len(shard.swarms[ih].leechers) == 0 {
delete(shard.swarms, ih) delete(shard.swarms, ih)
recordInfohashesDelta(-1)
} }
shard.Unlock() shard.Unlock()
@ -268,9 +314,12 @@ func (ps *peerStore) PutLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error
seeders: make(map[serializedPeer]int64), seeders: make(map[serializedPeer]int64),
leechers: make(map[serializedPeer]int64), leechers: make(map[serializedPeer]int64),
} }
recordInfohashesDelta(1)
} }
if _, ok := shard.swarms[ih].leechers[pk]; !ok {
// new leecher
shard.numLeechers++
}
shard.swarms[ih].leechers[pk] = ps.getClock() shard.swarms[ih].leechers[pk] = ps.getClock()
shard.Unlock() shard.Unlock()
@ -299,11 +348,14 @@ func (ps *peerStore) DeleteLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) er
return storage.ErrResourceDoesNotExist return storage.ErrResourceDoesNotExist
} }
delete(shard.swarms[ih].leechers, pk) if _, ok := shard.swarms[ih].leechers[pk]; ok {
// leecher actually removed
shard.numLeechers--
delete(shard.swarms[ih].leechers, pk)
}
if len(shard.swarms[ih].seeders)|len(shard.swarms[ih].leechers) == 0 { if len(shard.swarms[ih].seeders)|len(shard.swarms[ih].leechers) == 0 {
delete(shard.swarms, ih) delete(shard.swarms, ih)
recordInfohashesDelta(-1)
} }
shard.Unlock() shard.Unlock()
@ -327,11 +379,18 @@ func (ps *peerStore) GraduateLeecher(ih bittorrent.InfoHash, p bittorrent.Peer)
seeders: make(map[serializedPeer]int64), seeders: make(map[serializedPeer]int64),
leechers: make(map[serializedPeer]int64), leechers: make(map[serializedPeer]int64),
} }
recordInfohashesDelta(1)
} }
delete(shard.swarms[ih].leechers, pk) if _, ok := shard.swarms[ih].leechers[pk]; ok {
// leecher actually removed
shard.numLeechers--
delete(shard.swarms[ih].leechers, pk)
}
if _, ok := shard.swarms[ih].seeders[pk]; !ok {
// new seeder
shard.numSeeders++
}
shard.swarms[ih].seeders[pk] = ps.getClock() shard.swarms[ih].seeders[pk] = ps.getClock()
shard.Unlock() shard.Unlock()
@ -435,7 +494,6 @@ func (ps *peerStore) collectGarbage(cutoff time.Time) error {
default: default:
} }
var ihDelta float64
cutoffUnix := cutoff.UnixNano() cutoffUnix := cutoff.UnixNano()
start := time.Now() start := time.Now()
for _, shard := range ps.shards { for _, shard := range ps.shards {
@ -459,18 +517,19 @@ func (ps *peerStore) collectGarbage(cutoff time.Time) error {
for pk, mtime := range shard.swarms[ih].leechers { for pk, mtime := range shard.swarms[ih].leechers {
if mtime <= cutoffUnix { if mtime <= cutoffUnix {
delete(shard.swarms[ih].leechers, pk) delete(shard.swarms[ih].leechers, pk)
shard.numLeechers--
} }
} }
for pk, mtime := range shard.swarms[ih].seeders { for pk, mtime := range shard.swarms[ih].seeders {
if mtime <= cutoffUnix { if mtime <= cutoffUnix {
delete(shard.swarms[ih].seeders, pk) delete(shard.swarms[ih].seeders, pk)
shard.numSeeders--
} }
} }
if len(shard.swarms[ih].seeders)|len(shard.swarms[ih].leechers) == 0 { if len(shard.swarms[ih].seeders)|len(shard.swarms[ih].leechers) == 0 {
delete(shard.swarms, ih) delete(shard.swarms, ih)
ihDelta--
} }
shard.Unlock() shard.Unlock()
@ -481,7 +540,6 @@ func (ps *peerStore) collectGarbage(cutoff time.Time) error {
} }
recordGCDuration(time.Since(start)) recordGCDuration(time.Since(start))
recordInfohashesDelta(ihDelta)
return nil return nil
} }

View file

@ -9,7 +9,7 @@ import (
) )
func createNew() s.PeerStore { func createNew() s.PeerStore {
ps, err := New(Config{ShardCount: 1024, GarbageCollectionInterval: 10 * time.Minute}) ps, err := New(Config{ShardCount: 1024, GarbageCollectionInterval: 10 * time.Minute, PrometheusReportingInterval: 10 * time.Minute})
if err != nil { if err != nil {
panic(err) panic(err)
} }