Merge pull request #30 from psaab/develop
Reduce contention on the torrent map
This commit is contained in:
commit
9fea8195bb
3 changed files with 101 additions and 51 deletions
|
@ -63,6 +63,10 @@ type StatsConfig struct {
|
|||
MemUpdateInterval Duration `json:"mem_stats_interval"`
|
||||
}
|
||||
|
||||
type ShardConfig struct {
|
||||
TorrentMapShards int `json:"torrent_map_shards"`
|
||||
}
|
||||
|
||||
// Config is a configuration for a Server.
|
||||
type Config struct {
|
||||
Addr string `json:"addr"`
|
||||
|
@ -85,6 +89,7 @@ type Config struct {
|
|||
|
||||
StatsConfig
|
||||
NetConfig
|
||||
ShardConfig
|
||||
}
|
||||
|
||||
// DefaultConfig is a configuration that can be used as a fallback value.
|
||||
|
@ -124,6 +129,10 @@ var DefaultConfig = Config{
|
|||
RespectAF: false,
|
||||
},
|
||||
|
||||
ShardConfig: ShardConfig{
|
||||
TorrentMapShards: 1,
|
||||
},
|
||||
|
||||
ClientWhitelistEnabled: false,
|
||||
}
|
||||
|
||||
|
|
|
@ -5,38 +5,66 @@
|
|||
package tracker
|
||||
|
||||
import (
|
||||
"hash/fnv"
|
||||
"runtime"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/chihaya/chihaya/config"
|
||||
"github.com/chihaya/chihaya/stats"
|
||||
"github.com/chihaya/chihaya/tracker/models"
|
||||
)
|
||||
|
||||
type Torrents struct {
|
||||
torrents map[string]*models.Torrent
|
||||
sync.RWMutex
|
||||
}
|
||||
|
||||
type Storage struct {
|
||||
users map[string]*models.User
|
||||
usersM sync.RWMutex
|
||||
|
||||
torrents map[string]*models.Torrent
|
||||
torrentsM sync.RWMutex
|
||||
shards []Torrents
|
||||
size int32
|
||||
|
||||
clients map[string]bool
|
||||
clientsM sync.RWMutex
|
||||
}
|
||||
|
||||
func NewStorage() *Storage {
|
||||
return &Storage{
|
||||
users: make(map[string]*models.User),
|
||||
torrents: make(map[string]*models.Torrent),
|
||||
clients: make(map[string]bool),
|
||||
func NewStorage(cfg *config.Config) *Storage {
|
||||
s := &Storage{
|
||||
users: make(map[string]*models.User),
|
||||
shards: make([]Torrents, cfg.ShardConfig.TorrentMapShards),
|
||||
clients: make(map[string]bool),
|
||||
}
|
||||
for i := range s.shards {
|
||||
s.shards[i].torrents = make(map[string]*models.Torrent)
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
func (s *Storage) GetShardIndex(infohash string) uint32 {
|
||||
idx := fnv.New32()
|
||||
idx.Write([]byte(infohash))
|
||||
return idx.Sum32() % uint32(len(s.shards))
|
||||
}
|
||||
|
||||
func (s *Storage) GetTorrentShard(infohash string, readonly bool) *Torrents {
|
||||
shardindex := s.GetShardIndex(infohash)
|
||||
if readonly {
|
||||
s.shards[shardindex].RLock()
|
||||
} else {
|
||||
s.shards[shardindex].Lock()
|
||||
}
|
||||
return &s.shards[shardindex]
|
||||
}
|
||||
|
||||
func (s *Storage) TouchTorrent(infohash string) error {
|
||||
s.torrentsM.Lock()
|
||||
defer s.torrentsM.Unlock()
|
||||
shard := s.GetTorrentShard(infohash, false)
|
||||
defer shard.Unlock()
|
||||
|
||||
torrent, exists := s.torrents[infohash]
|
||||
torrent, exists := shard.torrents[infohash]
|
||||
if !exists {
|
||||
return models.ErrTorrentDNE
|
||||
}
|
||||
|
@ -47,10 +75,10 @@ func (s *Storage) TouchTorrent(infohash string) error {
|
|||
}
|
||||
|
||||
func (s *Storage) FindTorrent(infohash string) (*models.Torrent, error) {
|
||||
s.torrentsM.RLock()
|
||||
defer s.torrentsM.RUnlock()
|
||||
shard := s.GetTorrentShard(infohash, true)
|
||||
defer shard.RUnlock()
|
||||
|
||||
torrent, exists := s.torrents[infohash]
|
||||
torrent, exists := shard.torrents[infohash]
|
||||
if !exists {
|
||||
return nil, models.ErrTorrentDNE
|
||||
}
|
||||
|
@ -59,24 +87,29 @@ func (s *Storage) FindTorrent(infohash string) (*models.Torrent, error) {
|
|||
}
|
||||
|
||||
func (s *Storage) PutTorrent(torrent *models.Torrent) {
|
||||
s.torrentsM.Lock()
|
||||
defer s.torrentsM.Unlock()
|
||||
shard := s.GetTorrentShard(torrent.Infohash, false)
|
||||
defer shard.Unlock()
|
||||
|
||||
s.torrents[torrent.Infohash] = &*torrent
|
||||
_, exists := shard.torrents[torrent.Infohash]
|
||||
if !exists {
|
||||
atomic.AddInt32(&s.size, 1)
|
||||
}
|
||||
shard.torrents[torrent.Infohash] = &*torrent
|
||||
}
|
||||
|
||||
func (s *Storage) DeleteTorrent(infohash string) {
|
||||
s.torrentsM.Lock()
|
||||
defer s.torrentsM.Unlock()
|
||||
shard := s.GetTorrentShard(infohash, false)
|
||||
defer shard.Unlock()
|
||||
|
||||
delete(s.torrents, infohash)
|
||||
atomic.AddInt32(&s.size, -1)
|
||||
delete(shard.torrents, infohash)
|
||||
}
|
||||
|
||||
func (s *Storage) IncrementTorrentSnatches(infohash string) error {
|
||||
s.torrentsM.Lock()
|
||||
defer s.torrentsM.Unlock()
|
||||
shard := s.GetTorrentShard(infohash, false)
|
||||
defer shard.Unlock()
|
||||
|
||||
torrent, exists := s.torrents[infohash]
|
||||
torrent, exists := shard.torrents[infohash]
|
||||
if !exists {
|
||||
return models.ErrTorrentDNE
|
||||
}
|
||||
|
@ -87,10 +120,10 @@ func (s *Storage) IncrementTorrentSnatches(infohash string) error {
|
|||
}
|
||||
|
||||
func (s *Storage) PutLeecher(infohash string, p *models.Peer) error {
|
||||
s.torrentsM.Lock()
|
||||
defer s.torrentsM.Unlock()
|
||||
shard := s.GetTorrentShard(infohash, false)
|
||||
defer shard.Unlock()
|
||||
|
||||
torrent, exists := s.torrents[infohash]
|
||||
torrent, exists := shard.torrents[infohash]
|
||||
if !exists {
|
||||
return models.ErrTorrentDNE
|
||||
}
|
||||
|
@ -101,10 +134,10 @@ func (s *Storage) PutLeecher(infohash string, p *models.Peer) error {
|
|||
}
|
||||
|
||||
func (s *Storage) DeleteLeecher(infohash string, p *models.Peer) error {
|
||||
s.torrentsM.Lock()
|
||||
defer s.torrentsM.Unlock()
|
||||
shard := s.GetTorrentShard(infohash, false)
|
||||
defer shard.Unlock()
|
||||
|
||||
torrent, exists := s.torrents[infohash]
|
||||
torrent, exists := shard.torrents[infohash]
|
||||
if !exists {
|
||||
return models.ErrTorrentDNE
|
||||
}
|
||||
|
@ -115,10 +148,10 @@ func (s *Storage) DeleteLeecher(infohash string, p *models.Peer) error {
|
|||
}
|
||||
|
||||
func (s *Storage) PutSeeder(infohash string, p *models.Peer) error {
|
||||
s.torrentsM.Lock()
|
||||
defer s.torrentsM.Unlock()
|
||||
shard := s.GetTorrentShard(infohash, false)
|
||||
defer shard.Unlock()
|
||||
|
||||
torrent, exists := s.torrents[infohash]
|
||||
torrent, exists := shard.torrents[infohash]
|
||||
if !exists {
|
||||
return models.ErrTorrentDNE
|
||||
}
|
||||
|
@ -129,10 +162,10 @@ func (s *Storage) PutSeeder(infohash string, p *models.Peer) error {
|
|||
}
|
||||
|
||||
func (s *Storage) DeleteSeeder(infohash string, p *models.Peer) error {
|
||||
s.torrentsM.Lock()
|
||||
defer s.torrentsM.Unlock()
|
||||
shard := s.GetTorrentShard(infohash, false)
|
||||
defer shard.Unlock()
|
||||
|
||||
torrent, exists := s.torrents[infohash]
|
||||
torrent, exists := shard.torrents[infohash]
|
||||
if !exists {
|
||||
return models.ErrTorrentDNE
|
||||
}
|
||||
|
@ -143,16 +176,16 @@ func (s *Storage) DeleteSeeder(infohash string, p *models.Peer) error {
|
|||
}
|
||||
|
||||
func (s *Storage) PurgeInactiveTorrent(infohash string) error {
|
||||
s.torrentsM.Lock()
|
||||
defer s.torrentsM.Unlock()
|
||||
shard := s.GetTorrentShard(infohash, false)
|
||||
defer shard.Unlock()
|
||||
|
||||
torrent, exists := s.torrents[infohash]
|
||||
torrent, exists := shard.torrents[infohash]
|
||||
if !exists {
|
||||
return models.ErrTorrentDNE
|
||||
}
|
||||
|
||||
if torrent.PeerCount() == 0 {
|
||||
delete(s.torrents, infohash)
|
||||
delete(shard.torrents, infohash)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -162,26 +195,34 @@ func (s *Storage) PurgeInactivePeers(purgeEmptyTorrents bool, before time.Time)
|
|||
unixtime := before.Unix()
|
||||
|
||||
// Build a list of keys to process.
|
||||
s.torrentsM.RLock()
|
||||
index := 0
|
||||
keys := make([]string, len(s.torrents))
|
||||
|
||||
for infohash := range s.torrents {
|
||||
keys[index] = infohash
|
||||
index++
|
||||
maxkeys := int(atomic.LoadInt32(&s.size))
|
||||
keys := make([]string, maxkeys)
|
||||
for i := range s.shards {
|
||||
shard := &s.shards[i]
|
||||
shard.RLock()
|
||||
for infohash := range shard.torrents {
|
||||
keys[index] = infohash
|
||||
index++
|
||||
if index >= maxkeys {
|
||||
break
|
||||
}
|
||||
}
|
||||
shard.RUnlock()
|
||||
if index >= maxkeys {
|
||||
break
|
||||
}
|
||||
}
|
||||
s.torrentsM.RUnlock()
|
||||
|
||||
// Process the keys while allowing other goroutines to run.
|
||||
for _, infohash := range keys {
|
||||
runtime.Gosched()
|
||||
|
||||
s.torrentsM.Lock()
|
||||
torrent := s.torrents[infohash]
|
||||
shard := s.GetTorrentShard(infohash, false)
|
||||
torrent := shard.torrents[infohash]
|
||||
|
||||
if torrent == nil {
|
||||
// The torrent has already been deleted since keys were computed.
|
||||
s.torrentsM.Unlock()
|
||||
shard.Unlock()
|
||||
continue
|
||||
}
|
||||
|
||||
|
@ -189,7 +230,7 @@ func (s *Storage) PurgeInactivePeers(purgeEmptyTorrents bool, before time.Time)
|
|||
torrent.Leechers.Purge(unixtime)
|
||||
|
||||
peers := torrent.PeerCount()
|
||||
s.torrentsM.Unlock()
|
||||
shard.Unlock()
|
||||
|
||||
if purgeEmptyTorrents && peers == 0 {
|
||||
s.PurgeInactiveTorrent(infohash)
|
||||
|
|
|
@ -35,7 +35,7 @@ func New(cfg *config.Config) (*Tracker, error) {
|
|||
tkr := &Tracker{
|
||||
Config: cfg,
|
||||
Backend: bc,
|
||||
Storage: NewStorage(),
|
||||
Storage: NewStorage(cfg),
|
||||
}
|
||||
|
||||
go tkr.purgeInactivePeers(
|
||||
|
|
Loading…
Reference in a new issue