memory: add metrics for peers

This commit is contained in:
Leo Balduf 2017-05-12 10:30:20 +02:00 committed by Leo Balduf
parent 1cc0738cbe
commit a70d6dc036
3 changed files with 87 additions and 25 deletions

View file

@ -78,6 +78,10 @@ chihaya:
# higher degree of parallelism.
shard_count: 1024
# The interval at which metrics about the number of infohashes and peers
# are collected and posted to Prometheus.
prometheus_reporting_interval: 1s
# This block defines configuration used for middleware executed before a
# response has been returned to a BitTorrent client.
prehooks:

View file

@ -19,6 +19,7 @@ import (
func init() {
prometheus.MustRegister(promGCDurationMilliseconds)
prometheus.MustRegister(promInfohashesCount)
prometheus.MustRegister(promSeedersCount, promLeechersCount)
}
var promGCDurationMilliseconds = prometheus.NewHistogram(prometheus.HistogramOpts{
@ -32,25 +33,31 @@ var promInfohashesCount = prometheus.NewGauge(prometheus.GaugeOpts{
Help: "The number of infohashes tracked",
})
var promSeedersCount = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "chihaya_storage_seeders_count",
Help: "The number of seeders tracked",
})
var promLeechersCount = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "chihaya_storage_leechers_count",
Help: "The number of leechers tracked",
})
// recordGCDuration records the duration of a GC sweep.
func recordGCDuration(duration time.Duration) {
promGCDurationMilliseconds.Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond))
}
// recordInfohashesDelta records a change in the number of Infohashes tracked.
func recordInfohashesDelta(delta float64) {
promInfohashesCount.Add(delta)
}
// ErrInvalidGCInterval is returned for a GarbageCollectionInterval that is
// less than or equal to zero.
var ErrInvalidGCInterval = errors.New("invalid garbage collection interval")
// Config holds the configuration of a memory PeerStore.
type Config struct {
GarbageCollectionInterval time.Duration `yaml:"gc_interval"`
PeerLifetime time.Duration `yaml:"peer_lifetime"`
ShardCount int `yaml:"shard_count"`
GarbageCollectionInterval time.Duration `yaml:"gc_interval"`
PrometheusReportingInterval time.Duration `yaml:"prometheus_reporting_interval"`
PeerLifetime time.Duration `yaml:"peer_lifetime"`
ShardCount int `yaml:"shard_count"`
}
// LogFields renders the current config as a set of Logrus fields.
@ -108,10 +115,6 @@ func New(cfg Config) (storage.PeerStore, error) {
select {
case <-ps.closing:
t.Stop()
select {
case <-t.C:
default:
}
return
case now := <-t.C:
ps.setClock(now.UnixNano())
@ -119,13 +122,32 @@ func New(cfg Config) (storage.PeerStore, error) {
}
}()
ps.wg.Add(1)
go func() {
defer ps.wg.Done()
t := time.NewTicker(cfg.PrometheusReportingInterval)
for {
select {
case <-ps.closing:
t.Stop()
return
case <-t.C:
before := time.Now()
ps.populateProm()
log.Debugf("memory: populateProm() took %s", time.Since(before))
}
}
}()
return ps, nil
}
type serializedPeer string
type peerShard struct {
swarms map[bittorrent.InfoHash]swarm
swarms map[bittorrent.InfoHash]swarm
numSeeders uint64
numLeechers uint64
sync.RWMutex
}
@ -144,6 +166,24 @@ type peerStore struct {
wg sync.WaitGroup
}
// populateProm aggregates metrics over all shards and then posts them to
// prometheus.
func (ps *peerStore) populateProm() {
var numInfohashes, numSeeders, numLeechers uint64
for _, s := range ps.shards {
s.RLock()
numInfohashes += uint64(len(s.swarms))
numSeeders += s.numSeeders
numLeechers += s.numLeechers
s.RUnlock()
}
promInfohashesCount.Set(float64(numInfohashes))
promSeedersCount.Set(float64(numSeeders))
promLeechersCount.Set(float64(numLeechers))
}
var _ storage.PeerStore = &peerStore{}
func (ps *peerStore) getClock() int64 {
@ -209,9 +249,12 @@ func (ps *peerStore) PutSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error
seeders: make(map[serializedPeer]int64),
leechers: make(map[serializedPeer]int64),
}
recordInfohashesDelta(1)
}
if _, ok := shard.swarms[ih].seeders[pk]; !ok {
// new peer
shard.numSeeders++
}
shard.swarms[ih].seeders[pk] = ps.getClock()
shard.Unlock()
@ -240,11 +283,14 @@ func (ps *peerStore) DeleteSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) err
return storage.ErrResourceDoesNotExist
}
delete(shard.swarms[ih].seeders, pk)
if _, ok := shard.swarms[ih].seeders[pk]; ok {
// seeder actually removed
shard.numSeeders--
delete(shard.swarms[ih].seeders, pk)
}
if len(shard.swarms[ih].seeders)|len(shard.swarms[ih].leechers) == 0 {
delete(shard.swarms, ih)
recordInfohashesDelta(-1)
}
shard.Unlock()
@ -268,9 +314,12 @@ func (ps *peerStore) PutLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error
seeders: make(map[serializedPeer]int64),
leechers: make(map[serializedPeer]int64),
}
recordInfohashesDelta(1)
}
if _, ok := shard.swarms[ih].leechers[pk]; !ok {
// new leecher
shard.numLeechers++
}
shard.swarms[ih].leechers[pk] = ps.getClock()
shard.Unlock()
@ -299,11 +348,14 @@ func (ps *peerStore) DeleteLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) er
return storage.ErrResourceDoesNotExist
}
delete(shard.swarms[ih].leechers, pk)
if _, ok := shard.swarms[ih].leechers[pk]; ok {
// leecher actually removed
shard.numLeechers--
delete(shard.swarms[ih].leechers, pk)
}
if len(shard.swarms[ih].seeders)|len(shard.swarms[ih].leechers) == 0 {
delete(shard.swarms, ih)
recordInfohashesDelta(-1)
}
shard.Unlock()
@ -327,11 +379,18 @@ func (ps *peerStore) GraduateLeecher(ih bittorrent.InfoHash, p bittorrent.Peer)
seeders: make(map[serializedPeer]int64),
leechers: make(map[serializedPeer]int64),
}
recordInfohashesDelta(1)
}
delete(shard.swarms[ih].leechers, pk)
if _, ok := shard.swarms[ih].leechers[pk]; ok {
// leecher actually removed
shard.numLeechers--
delete(shard.swarms[ih].leechers, pk)
}
if _, ok := shard.swarms[ih].seeders[pk]; !ok {
// new seeder
shard.numSeeders++
}
shard.swarms[ih].seeders[pk] = ps.getClock()
shard.Unlock()
@ -435,7 +494,6 @@ func (ps *peerStore) collectGarbage(cutoff time.Time) error {
default:
}
var ihDelta float64
cutoffUnix := cutoff.UnixNano()
start := time.Now()
for _, shard := range ps.shards {
@ -459,18 +517,19 @@ func (ps *peerStore) collectGarbage(cutoff time.Time) error {
for pk, mtime := range shard.swarms[ih].leechers {
if mtime <= cutoffUnix {
delete(shard.swarms[ih].leechers, pk)
shard.numLeechers--
}
}
for pk, mtime := range shard.swarms[ih].seeders {
if mtime <= cutoffUnix {
delete(shard.swarms[ih].seeders, pk)
shard.numSeeders--
}
}
if len(shard.swarms[ih].seeders)|len(shard.swarms[ih].leechers) == 0 {
delete(shard.swarms, ih)
ihDelta--
}
shard.Unlock()
@ -481,7 +540,6 @@ func (ps *peerStore) collectGarbage(cutoff time.Time) error {
}
recordGCDuration(time.Since(start))
recordInfohashesDelta(ihDelta)
return nil
}

View file

@ -9,7 +9,7 @@ import (
)
func createNew() s.PeerStore {
ps, err := New(Config{ShardCount: 1024, GarbageCollectionInterval: 10 * time.Minute})
ps, err := New(Config{ShardCount: 1024, GarbageCollectionInterval: 10 * time.Minute, PrometheusReportingInterval: 10 * time.Minute})
if err != nil {
panic(err)
}