diff --git a/cmd/trakr/main.go b/cmd/trakr/main.go index 1c7e2e8..3b34214 100644 --- a/cmd/trakr/main.go +++ b/cmd/trakr/main.go @@ -17,6 +17,7 @@ import ( httpfrontend "github.com/jzelinskie/trakr/frontend/http" udpfrontend "github.com/jzelinskie/trakr/frontend/udp" "github.com/jzelinskie/trakr/middleware" + "github.com/jzelinskie/trakr/storage/memory" ) type ConfigFile struct { @@ -101,6 +102,9 @@ func main() { return err } + // Force the compiler to enforce memory against the storage interface. + _, _ = memory.New(memory.Config{1}) + errChan := make(chan error) closedChan := make(chan struct{}) diff --git a/middleware/middleware.go b/middleware/middleware.go index 91d1cc5..b9b31d1 100644 --- a/middleware/middleware.go +++ b/middleware/middleware.go @@ -10,6 +10,7 @@ import ( "github.com/jzelinskie/trakr/bittorrent" "github.com/jzelinskie/trakr/frontend" + "github.com/jzelinskie/trakr/storage" ) type Config struct { @@ -18,7 +19,7 @@ type Config struct { var _ frontend.TrackerLogic = &Logic{} -func NewLogic(config Config, peerStore PeerStore, announcePreHooks, announcePostHooks, scrapePreHooks, scrapePostHooks []Hook) *Logic { +func NewLogic(config Config, peerStore storage.PeerStore, announcePreHooks, announcePostHooks, scrapePreHooks, scrapePostHooks []Hook) *Logic { l := &Logic{ announceInterval: config.AnnounceInterval, peerStore: peerStore, @@ -51,7 +52,7 @@ func NewLogic(config Config, peerStore PeerStore, announcePreHooks, announcePost // executing a series of middleware hooks. type Logic struct { announceInterval time.Duration - peerStore PeerStore + peerStore storage.PeerStore announcePreHooks []Hook announcePostHooks []Hook scrapePreHooks []Hook diff --git a/storage/memory/peer_store.go b/storage/memory/peer_store.go new file mode 100644 index 0000000..80aec6b --- /dev/null +++ b/storage/memory/peer_store.go @@ -0,0 +1,371 @@ +package memory + +import ( + "encoding/binary" + "log" + "net" + "runtime" + "sync" + "time" + + "github.com/jzelinskie/trakr/bittorrent" + "github.com/jzelinskie/trakr/storage" +) + +// TODO(jzelinskie): separate ipv4 and ipv6 swarms + +type Config struct { + ShardCount int `yaml:"shard_count"` +} + +func New(cfg Config) (storage.PeerStore, error) { + shardCount := 1 + if cfg.ShardCount > 0 { + shardCount = cfg.ShardCount + } + + shards := make([]*peerShard, shardCount) + for i := 0; i < shardCount; i++ { + shards[i] = &peerShard{} + shards[i].swarms = make(map[swarmKey]swarm) + } + + return &peerStore{ + shards: shards, + closed: make(chan struct{}), + }, nil +} + +type serializedPeer string + +type swarmKey [21]byte + +func newSwarmKey(ih bittorrent.InfoHash, p bittorrent.Peer) (key swarmKey) { + for i, ihbyte := range ih { + key[i] = ihbyte + } + if len(p.IP) == net.IPv4len { + key[20] = byte(4) + } else { + key[20] = byte(6) + } + + return +} + +type peerShard struct { + swarms map[swarmKey]swarm + sync.RWMutex +} + +type swarm struct { + // map serialized peer to mtime + seeders map[serializedPeer]int64 + leechers map[serializedPeer]int64 +} + +type peerStore struct { + shards []*peerShard + closed chan struct{} +} + +var _ storage.PeerStore = &peerStore{} + +func (s *peerStore) shardIndex(infoHash bittorrent.InfoHash) uint32 { + return binary.BigEndian.Uint32(infoHash[:4]) % uint32(len(s.shards)) +} + +func newPeerKey(p bittorrent.Peer) serializedPeer { + b := make([]byte, 20+2+len(p.IP)) + copy(b[:20], p.ID[:]) + binary.BigEndian.PutUint16(b[20:22], p.Port) + copy(b[22:], p.IP) + + return serializedPeer(b) +} + +func decodePeerKey(pk serializedPeer) bittorrent.Peer { + return bittorrent.Peer{ + ID: bittorrent.PeerIDFromString(string(pk[:20])), + Port: binary.BigEndian.Uint16([]byte(pk[20:22])), + IP: net.IP(pk[22:]), + } +} + +func (s *peerStore) PutSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error { + select { + case <-s.closed: + panic("attempted to interact with stopped memory store") + default: + } + + sk := newSwarmKey(ih, p) + pk := newPeerKey(p) + + shard := s.shards[s.shardIndex(ih)] + shard.Lock() + + if _, ok := shard.swarms[sk]; !ok { + shard.swarms[sk] = swarm{ + seeders: make(map[serializedPeer]int64), + leechers: make(map[serializedPeer]int64), + } + } + + shard.swarms[sk].seeders[pk] = time.Now().UnixNano() + + shard.Unlock() + return nil +} + +func (s *peerStore) DeleteSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error { + select { + case <-s.closed: + panic("attempted to interact with stopped memory store") + default: + } + + sk := newSwarmKey(ih, p) + pk := newPeerKey(p) + + shard := s.shards[s.shardIndex(ih)] + shard.Lock() + + if _, ok := shard.swarms[sk]; !ok { + shard.Unlock() + return storage.ErrResourceDoesNotExist + } + + if _, ok := shard.swarms[sk].seeders[pk]; !ok { + shard.Unlock() + return storage.ErrResourceDoesNotExist + } + + delete(shard.swarms[sk].seeders, pk) + + if len(shard.swarms[sk].seeders)|len(shard.swarms[sk].leechers) == 0 { + delete(shard.swarms, sk) + } + + shard.Unlock() + return nil +} + +func (s *peerStore) PutLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error { + select { + case <-s.closed: + panic("attempted to interact with stopped memory store") + default: + } + + sk := newSwarmKey(ih, p) + pk := newPeerKey(p) + + shard := s.shards[s.shardIndex(ih)] + shard.Lock() + + if _, ok := shard.swarms[sk]; !ok { + shard.swarms[sk] = swarm{ + seeders: make(map[serializedPeer]int64), + leechers: make(map[serializedPeer]int64), + } + } + + shard.swarms[sk].leechers[pk] = time.Now().UnixNano() + + shard.Unlock() + return nil +} + +func (s *peerStore) DeleteLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error { + select { + case <-s.closed: + panic("attempted to interact with stopped memory store") + default: + } + + sk := newSwarmKey(ih, p) + pk := newPeerKey(p) + + shard := s.shards[s.shardIndex(ih)] + shard.Lock() + + if _, ok := shard.swarms[sk]; !ok { + shard.Unlock() + return storage.ErrResourceDoesNotExist + } + + if _, ok := shard.swarms[sk].leechers[pk]; !ok { + shard.Unlock() + return storage.ErrResourceDoesNotExist + } + + delete(shard.swarms[sk].leechers, pk) + + if len(shard.swarms[sk].seeders)|len(shard.swarms[sk].leechers) == 0 { + delete(shard.swarms, sk) + } + + shard.Unlock() + return nil +} + +func (s *peerStore) GraduateLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error { + select { + case <-s.closed: + panic("attempted to interact with stopped memory store") + default: + } + + sk := newSwarmKey(ih, p) + pk := newPeerKey(p) + + shard := s.shards[s.shardIndex(ih)] + shard.Lock() + + if _, ok := shard.swarms[sk]; !ok { + shard.swarms[sk] = swarm{ + seeders: make(map[serializedPeer]int64), + leechers: make(map[serializedPeer]int64), + } + } + + delete(shard.swarms[sk].leechers, pk) + + shard.swarms[sk].seeders[pk] = time.Now().UnixNano() + + shard.Unlock() + return nil +} + +func (s *peerStore) CollectGarbage(cutoff time.Time) error { + select { + case <-s.closed: + panic("attempted to interact with stopped memory store") + default: + } + + log.Printf("memory: collecting garbage. Cutoff time: %s", cutoff.String()) + cutoffUnix := cutoff.UnixNano() + for _, shard := range s.shards { + shard.RLock() + var swarmKeys []swarmKey + for sk := range shard.swarms { + swarmKeys = append(swarmKeys, sk) + } + shard.RUnlock() + runtime.Gosched() + + for _, sk := range swarmKeys { + shard.Lock() + + if _, stillExists := shard.swarms[sk]; !stillExists { + shard.Unlock() + runtime.Gosched() + continue + } + + for pk, mtime := range shard.swarms[sk].leechers { + if mtime <= cutoffUnix { + delete(shard.swarms[sk].leechers, pk) + } + } + + for pk, mtime := range shard.swarms[sk].seeders { + if mtime <= cutoffUnix { + delete(shard.swarms[sk].seeders, pk) + } + } + + if len(shard.swarms[sk].seeders)|len(shard.swarms[sk].leechers) == 0 { + delete(shard.swarms, sk) + } + + shard.Unlock() + runtime.Gosched() + } + + runtime.Gosched() + } + + return nil +} + +func (s *peerStore) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant int, announcer bittorrent.Peer) (peers []bittorrent.Peer, err error) { + select { + case <-s.closed: + panic("attempted to interact with stopped memory store") + default: + } + + sk := newSwarmKey(ih, announcer) + + shard := s.shards[s.shardIndex(ih)] + shard.RLock() + + if _, ok := shard.swarms[sk]; !ok { + shard.RUnlock() + return nil, storage.ErrResourceDoesNotExist + } + + if seeder { + // Append leechers as possible. + leechers := shard.swarms[sk].leechers + for p := range leechers { + decodedPeer := decodePeerKey(p) + if numWant == 0 { + break + } + + peers = append(peers, decodedPeer) + numWant-- + } + } else { + // Append as many seeders as possible. + seeders := shard.swarms[sk].seeders + for p := range seeders { + decodedPeer := decodePeerKey(p) + if numWant == 0 { + break + } + + peers = append(peers, decodedPeer) + numWant-- + } + + // Append leechers until we reach numWant. + leechers := shard.swarms[sk].leechers + if numWant > 0 { + for p := range leechers { + decodedPeer := decodePeerKey(p) + if numWant == 0 { + break + } + + if decodedPeer.Equal(announcer) { + continue + } + peers = append(peers, decodedPeer) + numWant-- + } + } + } + + shard.RUnlock() + return +} + +func (s *peerStore) Stop() <-chan error { + toReturn := make(chan error) + go func() { + shards := make([]*peerShard, len(s.shards)) + for i := 0; i < len(s.shards); i++ { + shards[i] = &peerShard{} + shards[i].swarms = make(map[swarmKey]swarm) + } + s.shards = shards + close(s.closed) + close(toReturn) + }() + return toReturn +} diff --git a/storage/memory/peer_store_test.go b/storage/memory/peer_store_test.go new file mode 100644 index 0000000..9a00b17 --- /dev/null +++ b/storage/memory/peer_store_test.go @@ -0,0 +1,142 @@ +// Copyright 2016 The Chihaya Authors. All rights reserved. +// Use of this source code is governed by the BSD 2-Clause license, +// which can be found in the LICENSE file. + +package memory + +import ( + "testing" + + "github.com/chihaya/chihaya/server/store" +) + +var ( + peerStoreTester = store.PreparePeerStoreTester(&peerStoreDriver{}) + peerStoreBenchmarker = store.PreparePeerStoreBenchmarker(&peerStoreDriver{}) + peerStoreTestConfig = &store.DriverConfig{} +) + +func init() { + unmarshalledConfig := struct { + Shards int + }{ + 1, + } + peerStoreTestConfig.Config = unmarshalledConfig +} + +func TestPeerStore(t *testing.T) { + peerStoreTester.TestPeerStore(t, peerStoreTestConfig) +} + +func BenchmarkPeerStore_PutSeeder(b *testing.B) { + peerStoreBenchmarker.PutSeeder(b, peerStoreTestConfig) +} + +func BenchmarkPeerStore_PutSeeder1KInfohash(b *testing.B) { + peerStoreBenchmarker.PutSeeder1KInfohash(b, peerStoreTestConfig) +} + +func BenchmarkPeerStore_PutSeeder1KSeeders(b *testing.B) { + peerStoreBenchmarker.PutSeeder1KSeeders(b, peerStoreTestConfig) +} + +func BenchmarkPeerStore_PutSeeder1KInfohash1KSeeders(b *testing.B) { + peerStoreBenchmarker.PutSeeder1KInfohash1KSeeders(b, peerStoreTestConfig) +} + +func BenchmarkPeerStore_PutDeleteSeeder(b *testing.B) { + peerStoreBenchmarker.PutDeleteSeeder(b, peerStoreTestConfig) +} + +func BenchmarkPeerStore_PutDeleteSeeder1KInfohash(b *testing.B) { + peerStoreBenchmarker.PutDeleteSeeder1KInfohash(b, peerStoreTestConfig) +} + +func BenchmarkPeerStore_PutDeleteSeeder1KSeeders(b *testing.B) { + peerStoreBenchmarker.PutDeleteSeeder1KSeeders(b, peerStoreTestConfig) +} + +func BenchmarkPeerStore_PutDeleteSeeder1KInfohash1KSeeders(b *testing.B) { + peerStoreBenchmarker.PutDeleteSeeder1KInfohash1KSeeders(b, peerStoreTestConfig) +} + +func BenchmarkPeerStore_DeleteSeederNonExist(b *testing.B) { + peerStoreBenchmarker.DeleteSeederNonExist(b, peerStoreTestConfig) +} + +func BenchmarkPeerStore_DeleteSeederNonExist1KInfohash(b *testing.B) { + peerStoreBenchmarker.DeleteSeederNonExist1KInfohash(b, peerStoreTestConfig) +} + +func BenchmarkPeerStore_DeleteSeederNonExist1KSeeders(b *testing.B) { + peerStoreBenchmarker.DeleteSeederNonExist1KSeeders(b, peerStoreTestConfig) +} + +func BenchmarkPeerStore_DeleteSeederNonExist1KInfohash1KSeeders(b *testing.B) { + peerStoreBenchmarker.DeleteSeederNonExist1KInfohash1KSeeders(b, peerStoreTestConfig) +} + +func BenchmarkPeerStore_PutGraduateDeleteLeecher(b *testing.B) { + peerStoreBenchmarker.PutGraduateDeleteLeecher(b, peerStoreTestConfig) +} + +func BenchmarkPeerStore_PutGraduateDeleteLeecher1KInfohash(b *testing.B) { + peerStoreBenchmarker.PutGraduateDeleteLeecher1KInfohash(b, peerStoreTestConfig) +} + +func BenchmarkPeerStore_PutGraduateDeleteLeecher1KSeeders(b *testing.B) { + peerStoreBenchmarker.PutGraduateDeleteLeecher1KLeechers(b, peerStoreTestConfig) +} + +func BenchmarkPeerStore_PutGraduateDeleteLeecher1KInfohash1KSeeders(b *testing.B) { + peerStoreBenchmarker.PutGraduateDeleteLeecher1KInfohash1KLeechers(b, peerStoreTestConfig) +} + +func BenchmarkPeerStore_GraduateLeecherNonExist(b *testing.B) { + peerStoreBenchmarker.GraduateLeecherNonExist(b, peerStoreTestConfig) +} + +func BenchmarkPeerStore_GraduateLeecherNonExist1KInfohash(b *testing.B) { + peerStoreBenchmarker.GraduateLeecherNonExist1KInfohash(b, peerStoreTestConfig) +} + +func BenchmarkPeerStore_GraduateLeecherNonExist1KSeeders(b *testing.B) { + peerStoreBenchmarker.GraduateLeecherNonExist1KLeechers(b, peerStoreTestConfig) +} + +func BenchmarkPeerStore_GraduateLeecherNonExist1KInfohash1KSeeders(b *testing.B) { + peerStoreBenchmarker.GraduateLeecherNonExist1KInfohash1KLeechers(b, peerStoreTestConfig) +} + +func BenchmarkPeerStore_AnnouncePeers(b *testing.B) { + peerStoreBenchmarker.AnnouncePeers(b, peerStoreTestConfig) +} + +func BenchmarkPeerStore_AnnouncePeers1KInfohash(b *testing.B) { + peerStoreBenchmarker.AnnouncePeers1KInfohash(b, peerStoreTestConfig) +} + +func BenchmarkPeerStore_AnnouncePeersSeeder(b *testing.B) { + peerStoreBenchmarker.AnnouncePeersSeeder(b, peerStoreTestConfig) +} + +func BenchmarkPeerStore_AnnouncePeersSeeder1KInfohash(b *testing.B) { + peerStoreBenchmarker.AnnouncePeersSeeder1KInfohash(b, peerStoreTestConfig) +} + +func BenchmarkPeerStore_GetSeeders(b *testing.B) { + peerStoreBenchmarker.GetSeeders(b, peerStoreTestConfig) +} + +func BenchmarkPeerStore_GetSeeders1KInfohash(b *testing.B) { + peerStoreBenchmarker.GetSeeders1KInfohash(b, peerStoreTestConfig) +} + +func BenchmarkPeerStore_NumSeeders(b *testing.B) { + peerStoreBenchmarker.NumSeeders(b, peerStoreTestConfig) +} + +func BenchmarkPeerStore_NumSeeders1KInfohash(b *testing.B) { + peerStoreBenchmarker.NumSeeders1KInfohash(b, peerStoreTestConfig) +} diff --git a/middleware/storage.go b/storage/storage.go similarity index 99% rename from middleware/storage.go rename to storage/storage.go index d9848a0..fb24a70 100644 --- a/middleware/storage.go +++ b/storage/storage.go @@ -1,4 +1,4 @@ -package middleware +package storage import ( "time"