300 lines
5.9 KiB
Go
300 lines
5.9 KiB
Go
// Copyright 2015 The Chihaya Authors. All rights reserved.
|
|
// Use of this source code is governed by the BSD 2-Clause license,
|
|
// which can be found in the LICENSE file.
|
|
|
|
package tracker
|
|
|
|
import (
|
|
"hash/fnv"
|
|
"runtime"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/chihaya/chihaya/config"
|
|
"github.com/chihaya/chihaya/stats"
|
|
"github.com/chihaya/chihaya/tracker/models"
|
|
)
|
|
|
|
type Torrents struct {
|
|
torrents map[string]*models.Torrent
|
|
sync.RWMutex
|
|
}
|
|
|
|
type Storage struct {
|
|
users map[string]*models.User
|
|
usersM sync.RWMutex
|
|
|
|
shards []Torrents
|
|
size int32
|
|
|
|
clients map[string]bool
|
|
clientsM sync.RWMutex
|
|
}
|
|
|
|
func NewStorage(cfg *config.Config) *Storage {
|
|
s := &Storage{
|
|
users: make(map[string]*models.User),
|
|
shards: make([]Torrents, cfg.TorrentMapShards),
|
|
clients: make(map[string]bool),
|
|
}
|
|
for i := range s.shards {
|
|
s.shards[i].torrents = make(map[string]*models.Torrent)
|
|
}
|
|
return s
|
|
}
|
|
|
|
func (s *Storage) Len() int {
|
|
return int(atomic.LoadInt32(&s.size))
|
|
}
|
|
|
|
func (s *Storage) getShardIndex(infohash string) uint32 {
|
|
idx := fnv.New32()
|
|
idx.Write([]byte(infohash))
|
|
return idx.Sum32() % uint32(len(s.shards))
|
|
}
|
|
|
|
func (s *Storage) getTorrentShard(infohash string, readonly bool) *Torrents {
|
|
shardindex := s.getShardIndex(infohash)
|
|
if readonly {
|
|
s.shards[shardindex].RLock()
|
|
} else {
|
|
s.shards[shardindex].Lock()
|
|
}
|
|
return &s.shards[shardindex]
|
|
}
|
|
|
|
func (s *Storage) TouchTorrent(infohash string) error {
|
|
shard := s.getTorrentShard(infohash, false)
|
|
defer shard.Unlock()
|
|
|
|
torrent, exists := shard.torrents[infohash]
|
|
if !exists {
|
|
return models.ErrTorrentDNE
|
|
}
|
|
|
|
torrent.LastAction = time.Now().Unix()
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *Storage) FindTorrent(infohash string) (*models.Torrent, error) {
|
|
shard := s.getTorrentShard(infohash, true)
|
|
defer shard.RUnlock()
|
|
|
|
torrent, exists := shard.torrents[infohash]
|
|
if !exists {
|
|
return nil, models.ErrTorrentDNE
|
|
}
|
|
|
|
return &*torrent, nil
|
|
}
|
|
|
|
func (s *Storage) PutTorrent(torrent *models.Torrent) {
|
|
shard := s.getTorrentShard(torrent.Infohash, false)
|
|
defer shard.Unlock()
|
|
|
|
_, exists := shard.torrents[torrent.Infohash]
|
|
if !exists {
|
|
atomic.AddInt32(&s.size, 1)
|
|
}
|
|
shard.torrents[torrent.Infohash] = &*torrent
|
|
}
|
|
|
|
func (s *Storage) DeleteTorrent(infohash string) {
|
|
shard := s.getTorrentShard(infohash, false)
|
|
defer shard.Unlock()
|
|
|
|
if _, exists := shard.torrents[infohash]; exists {
|
|
atomic.AddInt32(&s.size, -1)
|
|
delete(shard.torrents, infohash)
|
|
}
|
|
}
|
|
|
|
func (s *Storage) IncrementTorrentSnatches(infohash string) error {
|
|
shard := s.getTorrentShard(infohash, false)
|
|
defer shard.Unlock()
|
|
|
|
torrent, exists := shard.torrents[infohash]
|
|
if !exists {
|
|
return models.ErrTorrentDNE
|
|
}
|
|
|
|
torrent.Snatches++
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *Storage) PutLeecher(infohash string, p *models.Peer) error {
|
|
shard := s.getTorrentShard(infohash, false)
|
|
defer shard.Unlock()
|
|
|
|
torrent, exists := shard.torrents[infohash]
|
|
if !exists {
|
|
return models.ErrTorrentDNE
|
|
}
|
|
|
|
torrent.Leechers.Put(*p)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *Storage) DeleteLeecher(infohash string, p *models.Peer) error {
|
|
shard := s.getTorrentShard(infohash, false)
|
|
defer shard.Unlock()
|
|
|
|
torrent, exists := shard.torrents[infohash]
|
|
if !exists {
|
|
return models.ErrTorrentDNE
|
|
}
|
|
|
|
torrent.Leechers.Delete(p.Key())
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *Storage) PutSeeder(infohash string, p *models.Peer) error {
|
|
shard := s.getTorrentShard(infohash, false)
|
|
defer shard.Unlock()
|
|
|
|
torrent, exists := shard.torrents[infohash]
|
|
if !exists {
|
|
return models.ErrTorrentDNE
|
|
}
|
|
|
|
torrent.Seeders.Put(*p)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *Storage) DeleteSeeder(infohash string, p *models.Peer) error {
|
|
shard := s.getTorrentShard(infohash, false)
|
|
defer shard.Unlock()
|
|
|
|
torrent, exists := shard.torrents[infohash]
|
|
if !exists {
|
|
return models.ErrTorrentDNE
|
|
}
|
|
|
|
torrent.Seeders.Delete(p.Key())
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *Storage) PurgeInactiveTorrent(infohash string) error {
|
|
shard := s.getTorrentShard(infohash, false)
|
|
defer shard.Unlock()
|
|
|
|
torrent, exists := shard.torrents[infohash]
|
|
if !exists {
|
|
return models.ErrTorrentDNE
|
|
}
|
|
|
|
if torrent.PeerCount() == 0 {
|
|
delete(shard.torrents, infohash)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *Storage) PurgeInactivePeers(purgeEmptyTorrents bool, before time.Time) error {
|
|
unixtime := before.Unix()
|
|
|
|
// Build a list of keys to process.
|
|
index := 0
|
|
maxkeys := s.Len()
|
|
keys := make([]string, maxkeys)
|
|
for i := range s.shards {
|
|
shard := &s.shards[i]
|
|
shard.RLock()
|
|
for infohash := range shard.torrents {
|
|
keys[index] = infohash
|
|
index++
|
|
if index >= maxkeys {
|
|
break
|
|
}
|
|
}
|
|
shard.RUnlock()
|
|
if index >= maxkeys {
|
|
break
|
|
}
|
|
}
|
|
|
|
// Process the keys while allowing other goroutines to run.
|
|
for _, infohash := range keys {
|
|
runtime.Gosched()
|
|
shard := s.getTorrentShard(infohash, false)
|
|
torrent := shard.torrents[infohash]
|
|
|
|
if torrent == nil {
|
|
// The torrent has already been deleted since keys were computed.
|
|
shard.Unlock()
|
|
continue
|
|
}
|
|
|
|
torrent.Seeders.Purge(unixtime)
|
|
torrent.Leechers.Purge(unixtime)
|
|
|
|
peers := torrent.PeerCount()
|
|
shard.Unlock()
|
|
|
|
if purgeEmptyTorrents && peers == 0 {
|
|
s.PurgeInactiveTorrent(infohash)
|
|
stats.RecordEvent(stats.ReapedTorrent)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *Storage) FindUser(passkey string) (*models.User, error) {
|
|
s.usersM.RLock()
|
|
defer s.usersM.RUnlock()
|
|
|
|
user, exists := s.users[passkey]
|
|
if !exists {
|
|
return nil, models.ErrUserDNE
|
|
}
|
|
|
|
return &*user, nil
|
|
}
|
|
|
|
func (s *Storage) PutUser(user *models.User) {
|
|
s.usersM.Lock()
|
|
defer s.usersM.Unlock()
|
|
|
|
s.users[user.Passkey] = &*user
|
|
}
|
|
|
|
func (s *Storage) DeleteUser(passkey string) {
|
|
s.usersM.Lock()
|
|
defer s.usersM.Unlock()
|
|
|
|
delete(s.users, passkey)
|
|
}
|
|
|
|
func (s *Storage) ClientApproved(peerID string) error {
|
|
s.clientsM.RLock()
|
|
defer s.clientsM.RUnlock()
|
|
|
|
_, exists := s.clients[peerID]
|
|
if !exists {
|
|
return models.ErrClientUnapproved
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *Storage) PutClient(peerID string) {
|
|
s.clientsM.Lock()
|
|
defer s.clientsM.Unlock()
|
|
|
|
s.clients[peerID] = true
|
|
}
|
|
|
|
func (s *Storage) DeleteClient(peerID string) {
|
|
s.clientsM.Lock()
|
|
defer s.clientsM.Unlock()
|
|
|
|
delete(s.clients, peerID)
|
|
}
|