diff --git a/example_config.yaml b/example_config.yaml index 90b7fc8..df80fc9 100644 --- a/example_config.yaml +++ b/example_config.yaml @@ -78,6 +78,10 @@ chihaya: # higher degree of parallelism. shard_count: 1024 + # The interval at which metrics about the number of infohashes and peers + # are collected and posted to Prometheus. + prometheus_reporting_interval: 1s + # This block defines configuration used for middleware executed before a # response has been returned to a BitTorrent client. prehooks: diff --git a/storage/memory/peer_store.go b/storage/memory/peer_store.go index 05ef9c7..00f9e18 100644 --- a/storage/memory/peer_store.go +++ b/storage/memory/peer_store.go @@ -19,6 +19,7 @@ import ( func init() { prometheus.MustRegister(promGCDurationMilliseconds) prometheus.MustRegister(promInfohashesCount) + prometheus.MustRegister(promSeedersCount, promLeechersCount) } var promGCDurationMilliseconds = prometheus.NewHistogram(prometheus.HistogramOpts{ @@ -32,25 +33,31 @@ var promInfohashesCount = prometheus.NewGauge(prometheus.GaugeOpts{ Help: "The number of infohashes tracked", }) +var promSeedersCount = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "chihaya_storage_seeders_count", + Help: "The number of seeders tracked", +}) + +var promLeechersCount = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "chihaya_storage_leechers_count", + Help: "The number of leechers tracked", +}) + // recordGCDuration records the duration of a GC sweep. func recordGCDuration(duration time.Duration) { promGCDurationMilliseconds.Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond)) } -// recordInfohashesDelta records a change in the number of Infohashes tracked. -func recordInfohashesDelta(delta float64) { - promInfohashesCount.Add(delta) -} - // ErrInvalidGCInterval is returned for a GarbageCollectionInterval that is // less than or equal to zero. 
var ErrInvalidGCInterval = errors.New("invalid garbage collection interval") // Config holds the configuration of a memory PeerStore. type Config struct { - GarbageCollectionInterval time.Duration `yaml:"gc_interval"` - PeerLifetime time.Duration `yaml:"peer_lifetime"` - ShardCount int `yaml:"shard_count"` + GarbageCollectionInterval time.Duration `yaml:"gc_interval"` + PrometheusReportingInterval time.Duration `yaml:"prometheus_reporting_interval"` + PeerLifetime time.Duration `yaml:"peer_lifetime"` + ShardCount int `yaml:"shard_count"` } // LogFields renders the current config as a set of Logrus fields. @@ -108,10 +115,6 @@ func New(cfg Config) (storage.PeerStore, error) { select { case <-ps.closing: t.Stop() - select { - case <-t.C: - default: - } return case now := <-t.C: ps.setClock(now.UnixNano()) @@ -119,13 +122,32 @@ func New(cfg Config) (storage.PeerStore, error) { } }() + ps.wg.Add(1) + go func() { + defer ps.wg.Done() + t := time.NewTicker(cfg.PrometheusReportingInterval) + for { + select { + case <-ps.closing: + t.Stop() + return + case <-t.C: + before := time.Now() + ps.populateProm() + log.Debugf("memory: populateProm() took %s", time.Since(before)) + } + } + }() + return ps, nil } type serializedPeer string type peerShard struct { - swarms map[bittorrent.InfoHash]swarm + swarms map[bittorrent.InfoHash]swarm + numSeeders uint64 + numLeechers uint64 sync.RWMutex } @@ -144,6 +166,24 @@ type peerStore struct { wg sync.WaitGroup } +// populateProm aggregates metrics over all shards and then posts them to +// Prometheus. 
+func (ps *peerStore) populateProm() { + var numInfohashes, numSeeders, numLeechers uint64 + + for _, s := range ps.shards { + s.RLock() + numInfohashes += uint64(len(s.swarms)) + numSeeders += s.numSeeders + numLeechers += s.numLeechers + s.RUnlock() + } + + promInfohashesCount.Set(float64(numInfohashes)) + promSeedersCount.Set(float64(numSeeders)) + promLeechersCount.Set(float64(numLeechers)) +} + var _ storage.PeerStore = &peerStore{} func (ps *peerStore) getClock() int64 { @@ -209,9 +249,12 @@ func (ps *peerStore) PutSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error seeders: make(map[serializedPeer]int64), leechers: make(map[serializedPeer]int64), } - recordInfohashesDelta(1) } + if _, ok := shard.swarms[ih].seeders[pk]; !ok { + // new peer + shard.numSeeders++ + } shard.swarms[ih].seeders[pk] = ps.getClock() shard.Unlock() @@ -240,11 +283,14 @@ func (ps *peerStore) DeleteSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) err return storage.ErrResourceDoesNotExist } - delete(shard.swarms[ih].seeders, pk) + if _, ok := shard.swarms[ih].seeders[pk]; ok { + // seeder actually removed + shard.numSeeders-- + delete(shard.swarms[ih].seeders, pk) + } if len(shard.swarms[ih].seeders)|len(shard.swarms[ih].leechers) == 0 { delete(shard.swarms, ih) - recordInfohashesDelta(-1) } shard.Unlock() @@ -268,9 +314,12 @@ func (ps *peerStore) PutLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error seeders: make(map[serializedPeer]int64), leechers: make(map[serializedPeer]int64), } - recordInfohashesDelta(1) } + if _, ok := shard.swarms[ih].leechers[pk]; !ok { + // new leecher + shard.numLeechers++ + } shard.swarms[ih].leechers[pk] = ps.getClock() shard.Unlock() @@ -299,11 +348,14 @@ func (ps *peerStore) DeleteLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) er return storage.ErrResourceDoesNotExist } - delete(shard.swarms[ih].leechers, pk) + if _, ok := shard.swarms[ih].leechers[pk]; ok { + // leecher actually removed + shard.numLeechers-- + 
delete(shard.swarms[ih].leechers, pk) + } if len(shard.swarms[ih].seeders)|len(shard.swarms[ih].leechers) == 0 { delete(shard.swarms, ih) - recordInfohashesDelta(-1) } shard.Unlock() @@ -327,11 +379,18 @@ func (ps *peerStore) GraduateLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) seeders: make(map[serializedPeer]int64), leechers: make(map[serializedPeer]int64), } - recordInfohashesDelta(1) } - delete(shard.swarms[ih].leechers, pk) + if _, ok := shard.swarms[ih].leechers[pk]; ok { + // leecher actually removed + shard.numLeechers-- + delete(shard.swarms[ih].leechers, pk) + } + if _, ok := shard.swarms[ih].seeders[pk]; !ok { + // new seeder + shard.numSeeders++ + } shard.swarms[ih].seeders[pk] = ps.getClock() shard.Unlock() @@ -435,7 +494,6 @@ func (ps *peerStore) collectGarbage(cutoff time.Time) error { default: } - var ihDelta float64 cutoffUnix := cutoff.UnixNano() start := time.Now() for _, shard := range ps.shards { @@ -459,18 +517,19 @@ func (ps *peerStore) collectGarbage(cutoff time.Time) error { for pk, mtime := range shard.swarms[ih].leechers { if mtime <= cutoffUnix { delete(shard.swarms[ih].leechers, pk) + shard.numLeechers-- } } for pk, mtime := range shard.swarms[ih].seeders { if mtime <= cutoffUnix { delete(shard.swarms[ih].seeders, pk) + shard.numSeeders-- } } if len(shard.swarms[ih].seeders)|len(shard.swarms[ih].leechers) == 0 { delete(shard.swarms, ih) - ihDelta-- } shard.Unlock() @@ -481,7 +540,6 @@ func (ps *peerStore) collectGarbage(cutoff time.Time) error { } recordGCDuration(time.Since(start)) - recordInfohashesDelta(ihDelta) return nil } diff --git a/storage/memory/peer_store_test.go b/storage/memory/peer_store_test.go index f2478b1..c7b9354 100644 --- a/storage/memory/peer_store_test.go +++ b/storage/memory/peer_store_test.go @@ -9,7 +9,7 @@ import ( ) func createNew() s.PeerStore { - ps, err := New(Config{ShardCount: 1024, GarbageCollectionInterval: 10 * time.Minute}) + ps, err := New(Config{ShardCount: 1024, GarbageCollectionInterval: 10 
* time.Minute, PrometheusReportingInterval: 10 * time.Minute}) if err != nil { panic(err) }