remove gc from storage, but implement it in memory

This commit is contained in:
Jimmy Zelinskie 2016-08-15 23:44:06 -04:00
parent d3f153c938
commit 6fddcb8eea
4 changed files with 95 additions and 79 deletions

View file

@ -22,10 +22,11 @@ import (
type ConfigFile struct {
MainConfigBlock struct {
middleware.Config
PrometheusAddr string `yaml:"prometheus_addr"`
HTTPConfig httpfrontend.Config `yaml:"http"`
UDPConfig udpfrontend.Config `yaml:"udp"`
middleware.Config
Storage memory.Config `yaml:"storage"`
} `yaml:"trakr"`
}
@ -95,15 +96,18 @@ func main() {
}
}()
// TODO create PeerStore
// TODO create Hooks
logic := middleware.NewLogic(cfg.Config, nil, nil, nil, nil, nil)
// Force the compiler to enforce memory against the storage interface.
peerStore, err := memory.New(cfg.Storage)
if err != nil {
return err
}
// Force the compiler to enforce memory against the storage interface.
_, _ = memory.New(memory.Config{1})
// TODO create PeerStore
// TODO create Hooks
logic := middleware.NewLogic(cfg.Config, peerStore, nil, nil, nil, nil)
if err != nil {
return err
}
errChan := make(chan error)
closedChan := make(chan struct{})

View file

@ -18,8 +18,8 @@ trakr:
name: memory
config:
shards: 1
gc_interval: 15m
gc_expiration: 15m
gc_interval: 14m
peer_lifetime: 15m
prehooks:
- name: jwt

View file

@ -16,27 +16,41 @@ import (
// Config holds the configuration of a memory PeerStore.
type Config struct {
ShardCount int `yaml:"shard_count"`
GarbageCollectionInterval time.Duration `yaml:"gc_interval"`
PeerLifetime time.Duration `yaml:"peer_lifetime"`
ShardCount int `yaml:"shard_count"`
}
// New creates a new memory PeerStore.
//
// The PeerStore will have at least one shard.
// New creates a new PeerStore backed by memory.
func New(cfg Config) (storage.PeerStore, error) {
shardCount := 1
if cfg.ShardCount > 0 {
shardCount = cfg.ShardCount
}
shards := make([]*peerShard, shardCount*2)
for i := 0; i < shardCount*2; i++ {
shards[i] = &peerShard{swarms: make(map[bittorrent.InfoHash]swarm)}
ps := &peerStore{
shards: make([]*peerShard, shardCount*2),
closed: make(chan struct{}),
}
return &peerStore{
shards: shards,
closed: make(chan struct{}),
}, nil
for i := 0; i < shardCount*2; i++ {
ps.shards[i] = &peerShard{swarms: make(map[bittorrent.InfoHash]swarm)}
}
go func() {
for {
select {
case <-ps.closed:
return
case <-time.After(cfg.GarbageCollectionInterval):
before := time.Now().Add(-cfg.GarbageCollectionInterval)
log.Println("memory: purging peers with no announces since ", before)
ps.collectGarbage(before)
}
}
}()
return ps, nil
}
type serializedPeer string
@ -225,59 +239,6 @@ func (s *peerStore) GraduateLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) e
return nil
}
// CollectGarbage deletes all Peers from the PeerStore which are older than
// the cutoff time.
//
// Each shard is handled in two phases so the write lock is held only briefly:
// the shard's infohashes are snapshotted under a read lock, then each swarm is
// cleaned under a short write lock, calling runtime.Gosched between swarms to
// let other goroutines run.
func (s *peerStore) CollectGarbage(cutoff time.Time) error {
	// Using the store after it has been stopped is a programmer error.
	select {
	case <-s.closed:
		panic("attempted to interact with stopped memory store")
	default:
	}

	log.Printf("memory: collecting garbage. Cutoff time: %s", cutoff.String())
	cutoffUnix := cutoff.UnixNano()
	for _, shard := range s.shards {
		// Phase 1: snapshot the infohashes under a read lock. Swarms may
		// still be added or removed before each one is revisited below.
		shard.RLock()
		var infohashes []bittorrent.InfoHash
		for ih := range shard.swarms {
			infohashes = append(infohashes, ih)
		}
		shard.RUnlock()
		runtime.Gosched()

		// Phase 2: clean each swarm under a write lock.
		for _, ih := range infohashes {
			shard.Lock()

			// The swarm may have been deleted since the snapshot was taken.
			if _, stillExists := shard.swarms[ih]; !stillExists {
				shard.Unlock()
				runtime.Gosched()
				continue
			}

			// Drop peers whose last announce is at or before the cutoff.
			for pk, mtime := range shard.swarms[ih].leechers {
				if mtime <= cutoffUnix {
					delete(shard.swarms[ih].leechers, pk)
				}
			}

			for pk, mtime := range shard.swarms[ih].seeders {
				if mtime <= cutoffUnix {
					delete(shard.swarms[ih].seeders, pk)
				}
			}

			// Remove the swarm entirely once both peer sets are empty
			// (bitwise OR of the two lengths is zero only when both are zero).
			if len(shard.swarms[ih].seeders)|len(shard.swarms[ih].leechers) == 0 {
				delete(shard.swarms, ih)
			}

			shard.Unlock()
			runtime.Gosched()
		}

		runtime.Gosched()
	}

	return nil
}
func (s *peerStore) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant int, announcer bittorrent.Peer) (peers []bittorrent.Peer, err error) {
select {
case <-s.closed:
@ -340,6 +301,64 @@ func (s *peerStore) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant i
return
}
// collectGarbage deletes all Peers from the PeerStore which are older than the
// cutoff time.
//
// This function must be able to execute while other methods on this interface
// are being executed in parallel.
//
// To keep write-lock hold times short, each shard is processed in two phases:
// its infohashes are snapshotted under a read lock, then every swarm is
// cleaned under a brief write lock, with runtime.Gosched called between swarms
// so other goroutines can make progress.
//
// NOTE(review): the goroutine started in New ignores this function's error
// return; the error result currently serves no caller — confirm intent.
func (s *peerStore) collectGarbage(cutoff time.Time) error {
	// Using the store after it has been stopped is a programmer error.
	select {
	case <-s.closed:
		panic("attempted to interact with stopped memory store")
	default:
	}

	log.Printf("memory: collecting garbage. Cutoff time: %s", cutoff.String())
	cutoffUnix := cutoff.UnixNano()
	for _, shard := range s.shards {
		// Phase 1: snapshot the infohashes under a read lock. Swarms may
		// still be added or removed before each one is revisited below.
		shard.RLock()
		var infohashes []bittorrent.InfoHash
		for ih := range shard.swarms {
			infohashes = append(infohashes, ih)
		}
		shard.RUnlock()
		runtime.Gosched()

		// Phase 2: clean each swarm under a write lock.
		for _, ih := range infohashes {
			shard.Lock()

			// The swarm may have been deleted since the snapshot was taken.
			if _, stillExists := shard.swarms[ih]; !stillExists {
				shard.Unlock()
				runtime.Gosched()
				continue
			}

			// Drop peers whose last announce is at or before the cutoff.
			for pk, mtime := range shard.swarms[ih].leechers {
				if mtime <= cutoffUnix {
					delete(shard.swarms[ih].leechers, pk)
				}
			}

			for pk, mtime := range shard.swarms[ih].seeders {
				if mtime <= cutoffUnix {
					delete(shard.swarms[ih].seeders, pk)
				}
			}

			// Remove the swarm entirely once both peer sets are empty
			// (bitwise OR of the two lengths is zero only when both are zero).
			if len(shard.swarms[ih].seeders)|len(shard.swarms[ih].leechers) == 0 {
				delete(shard.swarms, ih)
			}

			shard.Unlock()
			runtime.Gosched()
		}

		runtime.Gosched()
	}

	return nil
}
func (s *peerStore) Stop() <-chan error {
toReturn := make(chan error)
go func() {

View file

@ -1,8 +1,6 @@
package storage
import (
"time"
"github.com/jzelinskie/trakr/bittorrent"
"github.com/jzelinskie/trakr/stopper"
)
@ -53,11 +51,6 @@ type PeerStore interface {
// - if seeder is false, should ideally return more seeders than leechers
AnnouncePeers(infoHash bittorrent.InfoHash, seeder bool, numWant int, p bittorrent.Peer) (peers []bittorrent.Peer, err error)
// CollectGarbage deletes all Peers from the PeerStore which are older than
// the cutoff time. This function must be able to execute while other methods
// on this interface are being executed in parallel.
CollectGarbage(cutoff time.Time) error
// Stopper is an interface that expects a Stop method to stops the PeerStore.
// For more details see the documentation in the stopper package.
stopper.Stopper