Remove IPv4/IPv6 stats specializations and replace with RecordPeerEvent

This commit is contained in:
Justin Li 2014-07-22 23:41:20 -04:00
parent 11d06f7830
commit ce2e335562
3 changed files with 86 additions and 111 deletions

View file

@ -12,21 +12,13 @@ const (
Announce = iota Announce = iota
Scrape Scrape
CompletedIPv4 Completed
NewLeechIPv4 NewLeech
DeletedLeechIPv4 DeletedLeech
ReapedLeechIPv4 ReapedLeech
NewSeedIPv4 NewSeed
DeletedSeedIPv4 DeletedSeed
ReapedSeedIPv4 ReapedSeed
CompletedIPv6
NewLeechIPv6
DeletedLeechIPv6
ReapedLeechIPv6
NewSeedIPv6
DeletedSeedIPv6
ReapedSeedIPv6
NewTorrent NewTorrent
DeletedTorrent DeletedTorrent
@ -93,6 +85,8 @@ type Stats struct {
ResponseTime PercentileTimes `json:"response_time"` ResponseTime PercentileTimes `json:"response_time"`
events chan int events chan int
ipv4PeerEvents chan int
ipv6PeerEvents chan int
responseTimeEvents chan time.Duration responseTimeEvents chan time.Duration
} }
@ -101,7 +95,10 @@ func New(chanSize int) *Stats {
Start: time.Now(), Start: time.Now(),
events: make(chan int, chanSize), events: make(chan int, chanSize),
ipv4PeerEvents: make(chan int, chanSize),
ipv6PeerEvents: make(chan int, chanSize),
responseTimeEvents: make(chan time.Duration, chanSize), responseTimeEvents: make(chan time.Duration, chanSize),
ResponseTime: PercentileTimes{ ResponseTime: PercentileTimes{
P50: NewPercentile(0.5), P50: NewPercentile(0.5),
P90: NewPercentile(0.9), P90: NewPercentile(0.9),
@ -110,6 +107,7 @@ func New(chanSize int) *Stats {
} }
go s.handleEvents() go s.handleEvents()
go s.handlePeerEvents()
go s.handleTimings() go s.handleTimings()
return s return s
@ -127,6 +125,14 @@ func (s *Stats) RecordEvent(event int) {
s.events <- event s.events <- event
} }
func (s *Stats) RecordPeerEvent(event int, ipv6 bool) {
if ipv6 {
s.ipv6PeerEvents <- event
} else {
s.ipv4PeerEvents <- event
}
}
func (s *Stats) RecordTiming(event int, duration time.Duration) { func (s *Stats) RecordTiming(event int, duration time.Duration) {
switch event { switch event {
case ResponseTime: case ResponseTime:
@ -141,71 +147,16 @@ func (s *Stats) handleEvents() {
switch event { switch event {
case Announce: case Announce:
s.Announces++ s.Announces++
case Scrape: case Scrape:
s.Scrapes++ s.Scrapes++
case CompletedIPv4:
s.IPv4Peers.Completed++
s.IPv4Peers.SeedsCurrent++
case NewLeechIPv4:
s.IPv4Peers.Joined++
s.IPv4Peers.Current++
case DeletedLeechIPv4:
s.IPv4Peers.Left++
s.IPv4Peers.Current--
case ReapedLeechIPv4:
s.IPv4Peers.Reaped++
s.IPv4Peers.Current--
case NewSeedIPv4:
s.IPv4Peers.SeedsJoined++
s.IPv4Peers.SeedsCurrent++
s.IPv4Peers.Joined++
s.IPv4Peers.Current++
case DeletedSeedIPv4:
s.IPv4Peers.SeedsLeft++
s.IPv4Peers.SeedsCurrent--
s.IPv4Peers.Left++
s.IPv4Peers.Current--
case ReapedSeedIPv4:
s.IPv4Peers.SeedsReaped++
s.IPv4Peers.SeedsCurrent--
s.IPv4Peers.Reaped++
s.IPv4Peers.Current--
case CompletedIPv6:
s.IPv6Peers.Completed++
s.IPv6Peers.SeedsCurrent++
case NewLeechIPv6:
s.IPv6Peers.Joined++
s.IPv6Peers.Current++
case DeletedLeechIPv6:
s.IPv6Peers.Left++
s.IPv6Peers.Current--
case ReapedLeechIPv6:
s.IPv6Peers.Reaped++
s.IPv6Peers.Current--
case NewSeedIPv6:
s.IPv6Peers.SeedsJoined++
s.IPv6Peers.SeedsCurrent++
s.IPv6Peers.Joined++
s.IPv6Peers.Current++
case DeletedSeedIPv6:
s.IPv6Peers.SeedsLeft++
s.IPv6Peers.SeedsCurrent--
s.IPv6Peers.Left++
s.IPv6Peers.Current--
case ReapedSeedIPv6:
s.IPv6Peers.SeedsReaped++
s.IPv6Peers.SeedsCurrent--
s.IPv6Peers.Reaped++
s.IPv6Peers.Current--
case NewTorrent: case NewTorrent:
s.TorrentsAdded++ s.TorrentsAdded++
case DeletedTorrent: case DeletedTorrent:
s.TorrentsRemoved++ s.TorrentsRemoved++
case ReapedTorrent: case ReapedTorrent:
s.TorrentsReaped++ s.TorrentsReaped++
@ -228,6 +179,56 @@ func (s *Stats) handleEvents() {
} }
} }
func (s *Stats) handlePeerEvents() {
for {
select {
case event := <-s.ipv4PeerEvents:
s.handlePeerEvent(&s.IPv4Peers, event)
case event := <-s.ipv6PeerEvents:
s.handlePeerEvent(&s.IPv6Peers, event)
}
}
}
func (s *Stats) handlePeerEvent(ps *PeerStats, event int) {
switch event {
case Completed:
ps.Completed++
ps.SeedsCurrent++
case NewLeech:
ps.Joined++
ps.Current++
case DeletedLeech:
ps.Left++
ps.Current--
case ReapedLeech:
ps.Reaped++
ps.Current--
case NewSeed:
ps.SeedsJoined++
ps.SeedsCurrent++
ps.Joined++
ps.Current++
case DeletedSeed:
ps.SeedsLeft++
ps.SeedsCurrent--
ps.Left++
ps.Current--
case ReapedSeed:
ps.SeedsReaped++
ps.SeedsCurrent--
ps.Reaped++
ps.Current--
}
}
func (s *Stats) handleTimings() { func (s *Stats) handleTimings() {
for { for {
select { select {
@ -245,6 +246,11 @@ func RecordEvent(event int) {
DefaultStats.RecordEvent(event) DefaultStats.RecordEvent(event)
} }
// RecordPeerEvent broadcasts a peer event to the default stats queue.
func RecordPeerEvent(event int, ipv6 bool) {
DefaultStats.RecordPeerEvent(event, ipv6)
}
// RecordTiming broadcasts a timing event to the default stats queue. // RecordTiming broadcasts a timing event to the default stats queue.
func RecordTiming(event int, duration time.Duration) { func RecordTiming(event int, duration time.Duration) {
DefaultStats.RecordTiming(event, duration) DefaultStats.RecordTiming(event, duration)

View file

@ -120,11 +120,7 @@ func updateSwarm(c Conn, ann *models.Announce, p *models.Peer, t *models.Torrent
return return
} }
t.Seeders[p.ID] = *p t.Seeders[p.ID] = *p
if p.IPv4() { stats.RecordPeerEvent(stats.NewSeed, p.IPv6())
stats.RecordEvent(stats.NewSeedIPv4)
} else {
stats.RecordEvent(stats.NewSeedIPv6)
}
} else { } else {
err = c.PutLeecher(t.Infohash, p) err = c.PutLeecher(t.Infohash, p)
@ -132,11 +128,7 @@ func updateSwarm(c Conn, ann *models.Announce, p *models.Peer, t *models.Torrent
return return
} }
t.Leechers[p.ID] = *p t.Leechers[p.ID] = *p
if p.IPv4() { stats.RecordPeerEvent(stats.NewLeech, p.IPv6())
stats.RecordEvent(stats.NewLeechIPv4)
} else {
stats.RecordEvent(stats.NewLeechIPv6)
}
} }
created = true created = true
} }
@ -155,11 +147,7 @@ func handleEvent(c Conn, ann *models.Announce, p *models.Peer, u *models.User, t
return return
} }
delete(t.Seeders, p.ID) delete(t.Seeders, p.ID)
if p.IPv4() { stats.RecordPeerEvent(stats.DeletedSeed, p.IPv6())
stats.RecordEvent(stats.DeletedSeedIPv4)
} else {
stats.RecordEvent(stats.DeletedSeedIPv6)
}
} else if t.InLeecherPool(p) { } else if t.InLeecherPool(p) {
err = c.DeleteLeecher(t.Infohash, p.ID) err = c.DeleteLeecher(t.Infohash, p.ID)
@ -167,11 +155,7 @@ func handleEvent(c Conn, ann *models.Announce, p *models.Peer, u *models.User, t
return return
} }
delete(t.Leechers, p.ID) delete(t.Leechers, p.ID)
if p.IPv4() { stats.RecordPeerEvent(stats.DeletedLeech, p.IPv6())
stats.RecordEvent(stats.DeletedLeechIPv4)
} else {
stats.RecordEvent(stats.DeletedLeechIPv6)
}
} }
case ann.Event == "completed": case ann.Event == "completed":
@ -212,12 +196,7 @@ func leecherFinished(c Conn, infohash string, p *models.Peer) error {
return err return err
} }
if p.IPv4() { stats.RecordPeerEvent(stats.Completed, p.IPv6())
stats.RecordEvent(stats.CompletedIPv4)
} else {
stats.RecordEvent(stats.CompletedIPv6)
}
return nil return nil
} }

View file

@ -271,24 +271,14 @@ func (c *Conn) PurgeInactivePeers(purgeEmptyTorrents bool, before time.Time) err
for key, peer := range torrent.Seeders { for key, peer := range torrent.Seeders {
if peer.LastAnnounce <= unixtime { if peer.LastAnnounce <= unixtime {
delete(torrent.Seeders, key) delete(torrent.Seeders, key)
stats.RecordPeerEvent(stats.ReapedSeed, peer.IPv6())
if peer.IPv4() {
stats.RecordEvent(stats.ReapedSeedIPv4)
} else {
stats.RecordEvent(stats.ReapedSeedIPv6)
}
} }
} }
for key, peer := range torrent.Leechers { for key, peer := range torrent.Leechers {
if peer.LastAnnounce <= unixtime { if peer.LastAnnounce <= unixtime {
delete(torrent.Leechers, key) delete(torrent.Leechers, key)
stats.RecordPeerEvent(stats.ReapedLeech, peer.IPv6())
if peer.IPv4() {
stats.RecordEvent(stats.ReapedLeechIPv4)
} else {
stats.RecordEvent(stats.ReapedLeechIPv6)
}
} }
} }