Merge pull request #31 from psaab/develop

Introduce TorrentSize stat and replace some locks in PeerMap with atomic increments.
This commit is contained in:
Jimmy Zelinskie 2014-09-24 19:33:30 -04:00
commit ec71e28fdc
3 changed files with 26 additions and 12 deletions

View file

@ -81,6 +81,7 @@ type Stats struct {
Announces uint64 `json:"Tracker.Announces"`
Scrapes uint64 `json:"Tracker.Scrapes"`
TorrentsSize uint64 `json:"Torrents.Size"`
TorrentsAdded uint64 `json:"Torrents.Added"`
TorrentsRemoved uint64 `json:"Torrents.Removed"`
TorrentsReaped uint64 `json:"Torrents.Reaped"`
@ -192,12 +193,15 @@ func (s *Stats) handleEvent(event int) {
case NewTorrent:
s.TorrentsAdded++
s.TorrentsSize++
case DeletedTorrent:
s.TorrentsRemoved++
s.TorrentsSize--
case ReapedTorrent:
s.TorrentsReaped++
s.TorrentsSize--
case AcceptedConnection:
s.ConnectionsAccepted++

View file

@ -7,6 +7,7 @@ package models
import (
"net"
"sync"
"sync/atomic"
"github.com/chihaya/chihaya/config"
"github.com/chihaya/chihaya/stats"
@ -18,6 +19,7 @@ type PeerMap struct {
Peers map[string]map[PeerKey]Peer `json:"peers"`
Seeders bool `json:"seeders"`
Config config.SubnetConfig `json:"config"`
Size int32
sync.RWMutex
}
@ -96,6 +98,10 @@ func (pm *PeerMap) Put(p Peer) {
if !exists {
pm.Peers[maskedIP] = make(map[PeerKey]Peer)
}
_, exists = pm.Peers[maskedIP][p.Key()]
if !exists {
atomic.AddInt32(&(pm.Size), 1)
}
pm.Peers[maskedIP][p.Key()] = p
}
@ -105,19 +111,16 @@ func (pm *PeerMap) Delete(pk PeerKey) {
defer pm.Unlock()
maskedIP := pm.mask(pk.IP())
delete(pm.Peers[maskedIP], pk)
_, exists := pm.Peers[maskedIP][pk]
if exists {
atomic.AddInt32(&(pm.Size), -1)
delete(pm.Peers[maskedIP], pk)
}
}
// Len returns the number of peers within a PeerMap.
func (pm *PeerMap) Len() int {
pm.RLock()
defer pm.RUnlock()
var count int
for _, subnetmap := range pm.Peers {
count += len(subnetmap)
}
return count
return int(atomic.LoadInt32(&pm.Size))
}
// Purge iterates over all of the peers within a PeerMap and deletes them if
@ -129,6 +132,7 @@ func (pm *PeerMap) Purge(unixtime int64) {
for _, subnetmap := range pm.Peers {
for key, peer := range subnetmap {
if peer.LastAnnounce <= unixtime {
atomic.AddInt32(&(pm.Size), -1)
delete(subnetmap, key)
if pm.Seeders {
stats.RecordPeerEvent(stats.ReapedSeed, peer.HasIPv6())

View file

@ -44,6 +44,10 @@ func NewStorage(cfg *config.Config) *Storage {
return s
}
func (s *Storage) Len() int {
return int(atomic.LoadInt32(&s.size))
}
func (s *Storage) GetShardIndex(infohash string) uint32 {
idx := fnv.New32()
idx.Write([]byte(infohash))
@ -101,8 +105,10 @@ func (s *Storage) DeleteTorrent(infohash string) {
shard := s.GetTorrentShard(infohash, false)
defer shard.Unlock()
atomic.AddInt32(&s.size, -1)
delete(shard.torrents, infohash)
if _, exists := shard.torrents[infohash]; exists {
atomic.AddInt32(&s.size, -1)
delete(shard.torrents, infohash)
}
}
func (s *Storage) IncrementTorrentSnatches(infohash string) error {
@ -196,7 +202,7 @@ func (s *Storage) PurgeInactivePeers(purgeEmptyTorrents bool, before time.Time)
// Build a list of keys to process.
index := 0
maxkeys := int(atomic.LoadInt32(&s.size))
maxkeys := s.Len()
keys := make([]string, maxkeys)
for i := range s.shards {
shard := &s.shards[i]