From ed8cc0d5b3e4faeef424544a902c3ae2944f5376 Mon Sep 17 00:00:00 2001 From: Leo Balduf Date: Sat, 9 Jul 2016 20:36:53 -0400 Subject: [PATCH] memory: optimize peer store memory footprint and performance --- server/store/memory/peer_store.go | 331 ++++++++++++++++-------------- 1 file changed, 177 insertions(+), 154 deletions(-) diff --git a/server/store/memory/peer_store.go b/server/store/memory/peer_store.go index 9c07deb..db86910 100644 --- a/server/store/memory/peer_store.go +++ b/server/store/memory/peer_store.go @@ -5,7 +5,9 @@ package memory import ( - "hash/fnv" + "encoding/binary" + "log" + "net" "runtime" "sync" "time" @@ -31,7 +33,7 @@ func (d *peerStoreDriver) New(storecfg *store.DriverConfig) (store.PeerStore, er shards := make([]*peerShard, cfg.Shards) for i := 0; i < cfg.Shards; i++ { shards[i] = &peerShard{} - shards[i].peers = make(map[string]map[string]peer) + shards[i].swarms = make(map[chihaya.InfoHash]*swarm) } return &peerStore{ shards: shards, @@ -61,16 +63,19 @@ func newPeerStoreConfig(storecfg *store.DriverConfig) (*peerStoreConfig, error) return &cfg, nil } -type peer struct { - chihaya.Peer - LastAction time.Time -} +type serializedPeer string type peerShard struct { - peers map[string]map[string]peer + swarms map[chihaya.InfoHash]*swarm sync.RWMutex } +type swarm struct { + // map serialized peer to mtime + seeders map[serializedPeer]int64 + leechers map[serializedPeer]int64 +} + type peerStore struct { shards []*peerShard closed chan struct{} @@ -79,183 +84,193 @@ type peerStore struct { var _ store.PeerStore = &peerStore{} func (s *peerStore) shardIndex(infoHash chihaya.InfoHash) uint32 { - idx := fnv.New32() - idx.Write(infoHash[:]) - return idx.Sum32() % uint32(len(s.shards)) + return binary.BigEndian.Uint32(infoHash[:4]) % uint32(len(s.shards)) } -func peerKey(p chihaya.Peer) string { - return string(p.IP) + string(p.ID[:]) +func peerKey(p chihaya.Peer) serializedPeer { + b := make([]byte, 20+2+len(p.IP)) + copy(b[:20], p.ID[:]) + binary.BigEndian.PutUint16(b[20:22], p.Port) + copy(b[22:], p.IP) + + return serializedPeer(b) } -func seedersKey(infoHash chihaya.InfoHash) string { - return string(infoHash[:]) + "-s" -} - -func leechersKey(infoHash chihaya.InfoHash) string { - return string(infoHash[:]) + "-l" +func decodePeerKey(pk serializedPeer) chihaya.Peer { + return chihaya.Peer{ + ID: chihaya.PeerIDFromString(string(pk[:20])), + Port: binary.BigEndian.Uint16([]byte(pk[20:22])), + IP: net.IP(pk[22:]), + } } func (s *peerStore) PutSeeder(infoHash chihaya.InfoHash, p chihaya.Peer) error { - key := seedersKey(infoHash) - shard := s.shards[s.shardIndex(infoHash)] - shard.Lock() - defer shard.Unlock() - select { case <-s.closed: panic("attempted to interact with stopped store") default: } - if shard.peers[key] == nil { - shard.peers[key] = make(map[string]peer) + shard := s.shards[s.shardIndex(infoHash)] + shard.Lock() + defer shard.Unlock() + + if shard.swarms[infoHash] == nil { + shard.swarms[infoHash] = &swarm{ + seeders: make(map[serializedPeer]int64), + leechers: make(map[serializedPeer]int64), + } } - shard.peers[key][peerKey(p)] = peer{ - Peer: p, - LastAction: time.Now(), - } + shard.swarms[infoHash].seeders[peerKey(p)] = time.Now().UnixNano() return nil } func (s *peerStore) DeleteSeeder(infoHash chihaya.InfoHash, p chihaya.Peer) error { - key := seedersKey(infoHash) - shard := s.shards[s.shardIndex(infoHash)] - shard.Lock() - defer shard.Unlock() - select { case <-s.closed: panic("attempted to interact with stopped store") default: } - if shard.peers[key] == nil { - return store.ErrResourceDoesNotExist - } - + shard := s.shards[s.shardIndex(infoHash)] pk := peerKey(p) + shard.Lock() + defer shard.Unlock() - if _, ok := shard.peers[key][pk]; !ok { + if shard.swarms[infoHash] == nil { return store.ErrResourceDoesNotExist } - delete(shard.peers[key], pk) + if _, ok := shard.swarms[infoHash].seeders[pk]; !ok { + return store.ErrResourceDoesNotExist + } - if len(shard.peers[key]) == 0 { - delete(shard.peers, key) + delete(shard.swarms[infoHash].seeders, pk) + + if len(shard.swarms[infoHash].seeders)|len(shard.swarms[infoHash].leechers) == 0 { + delete(shard.swarms, infoHash) } return nil } func (s *peerStore) PutLeecher(infoHash chihaya.InfoHash, p chihaya.Peer) error { - key := leechersKey(infoHash) - shard := s.shards[s.shardIndex(infoHash)] - shard.Lock() - defer shard.Unlock() - select { case <-s.closed: panic("attempted to interact with stopped store") default: } - if shard.peers[key] == nil { - shard.peers[key] = make(map[string]peer) + shard := s.shards[s.shardIndex(infoHash)] + shard.Lock() + defer shard.Unlock() + + if shard.swarms[infoHash] == nil { + shard.swarms[infoHash] = &swarm{ + seeders: make(map[serializedPeer]int64), + leechers: make(map[serializedPeer]int64), + } } - shard.peers[key][peerKey(p)] = peer{ - Peer: p, - LastAction: time.Now(), - } + shard.swarms[infoHash].leechers[peerKey(p)] = time.Now().UnixNano() return nil } func (s *peerStore) DeleteLeecher(infoHash chihaya.InfoHash, p chihaya.Peer) error { - key := leechersKey(infoHash) - shard := s.shards[s.shardIndex(infoHash)] - shard.Lock() - defer shard.Unlock() - select { case <-s.closed: panic("attempted to interact with stopped store") default: } - if shard.peers[key] == nil { - return store.ErrResourceDoesNotExist - } - + shard := s.shards[s.shardIndex(infoHash)] pk := peerKey(p) + shard.Lock() + defer shard.Unlock() - if _, ok := shard.peers[key][pk]; !ok { + if shard.swarms[infoHash] == nil { return store.ErrResourceDoesNotExist } - delete(shard.peers[key], pk) + if _, ok := shard.swarms[infoHash].leechers[pk]; !ok { + return store.ErrResourceDoesNotExist + } - if len(shard.peers[key]) == 0 { - delete(shard.peers, key) + delete(shard.swarms[infoHash].leechers, pk) + + if len(shard.swarms[infoHash].seeders)|len(shard.swarms[infoHash].leechers) == 0 { + delete(shard.swarms, infoHash) } return nil } func (s *peerStore) GraduateLeecher(infoHash chihaya.InfoHash, p chihaya.Peer) error { - lkey := leechersKey(infoHash) - skey := seedersKey(infoHash) - shard := s.shards[s.shardIndex(infoHash)] - shard.Lock() - defer shard.Unlock() - select { case <-s.closed: panic("attempted to interact with stopped store") default: } - if shard.peers[lkey] != nil { - delete(shard.peers[lkey], peerKey(p)) + key := peerKey(p) + shard := s.shards[s.shardIndex(infoHash)] + shard.Lock() + defer shard.Unlock() + + if shard.swarms[infoHash] == nil { + shard.swarms[infoHash] = &swarm{ + seeders: make(map[serializedPeer]int64), + leechers: make(map[serializedPeer]int64), + } } - if shard.peers[skey] == nil { - shard.peers[skey] = make(map[string]peer) + if _, ok := shard.swarms[infoHash].leechers[key]; ok { + delete(shard.swarms[infoHash].leechers, key) } - shard.peers[skey][peerKey(p)] = peer{ - Peer: p, - LastAction: time.Now(), - } + shard.swarms[infoHash].seeders[key] = time.Now().UnixNano() return nil } func (s *peerStore) CollectGarbage(cutoff time.Time) error { + select { + case <-s.closed: + panic("attempted to interact with stopped store") + default: + } + + log.Printf("memory: collecting garbage. Cutoff time: %s", cutoff.String()) + cutoffUnix := cutoff.UnixNano() for _, shard := range s.shards { shard.RLock() - var keys []string - for key := range shard.peers { - keys = append(keys, key) + var infohashes []chihaya.InfoHash + for key := range shard.swarms { + infohashes = append(infohashes, key) } shard.RUnlock() runtime.Gosched() - for _, key := range keys { + for _, infohash := range infohashes { shard.Lock() - for peerKey, p := range shard.peers[key] { - if p.LastAction.Before(cutoff) { - delete(shard.peers[key], peerKey) + for peerKey, mtime := range shard.swarms[infohash].leechers { + if mtime <= cutoffUnix { + delete(shard.swarms[infohash].leechers, peerKey) } } - if len(shard.peers[key]) == 0 { - delete(shard.peers, key) + for peerKey, mtime := range shard.swarms[infohash].seeders { + if mtime <= cutoffUnix { + delete(shard.swarms[infohash].seeders, peerKey) + } + } + + if len(shard.swarms[infohash].seeders)|len(shard.swarms[infohash].leechers) == 0 { + delete(shard.swarms, infohash) } shard.Unlock() @@ -269,67 +284,68 @@ func (s *peerStore) CollectGarbage(cutoff time.Time) error { } func (s *peerStore) AnnouncePeers(infoHash chihaya.InfoHash, seeder bool, numWant int, peer4, peer6 chihaya.Peer) (peers, peers6 []chihaya.Peer, err error) { - lkey := leechersKey(infoHash) - skey := seedersKey(infoHash) - shard := s.shards[s.shardIndex(infoHash)] - shard.RLock() - defer shard.RUnlock() - select { case <-s.closed: panic("attempted to interact with stopped store") default: } + shard := s.shards[s.shardIndex(infoHash)] + shard.RLock() + defer shard.RUnlock() + if seeder { // Append leechers as possible. - leechers := shard.peers[lkey] - for _, p := range leechers { + leechers := shard.swarms[infoHash].leechers + for p := range leechers { + decodedPeer := decodePeerKey(p) if numWant == 0 { break } - if p.IP.To4() == nil { - peers6 = append(peers6, p.Peer) + if decodedPeer.IP.To4() == nil { + peers6 = append(peers6, decodedPeer) } else { - peers = append(peers, p.Peer) + peers = append(peers, decodedPeer) } numWant-- } } else { // Append as many seeders as possible. - seeders := shard.peers[skey] - for _, p := range seeders { + seeders := shard.swarms[infoHash].seeders + for p := range seeders { + decodedPeer := decodePeerKey(p) if numWant == 0 { break } - if p.IP.To4() == nil { - peers6 = append(peers6, p.Peer) + if decodedPeer.IP.To4() == nil { + peers6 = append(peers6, decodedPeer) } else { - peers = append(peers, p.Peer) + peers = append(peers, decodedPeer) } numWant-- } // Append leechers until we reach numWant. - leechers := shard.peers[lkey] + leechers := shard.swarms[infoHash].leechers if numWant > 0 { - for _, p := range leechers { + for p := range leechers { + decodedPeer := decodePeerKey(p) if numWant == 0 { break } - if p.IP.To4() == nil { - if p.Equal(peer6) { + if decodedPeer.IP.To4() == nil { + if decodedPeer.Equal(peer6) { continue } - peers6 = append(peers6, p.Peer) + peers6 = append(peers6, decodedPeer) } else { - if p.Equal(peer4) { + if decodedPeer.Equal(peer4) { continue } - peers = append(peers, p.Peer) + peers = append(peers, decodedPeer) } numWant-- } @@ -340,98 +356,105 @@ func (s *peerStore) AnnouncePeers(infoHash chihaya.InfoHash, seeder bool, numWan } func (s *peerStore) GetSeeders(infoHash chihaya.InfoHash) (peers, peers6 []chihaya.Peer, err error) { - key := seedersKey(infoHash) - shard := s.shards[s.shardIndex(infoHash)] - shard.RLock() - defer shard.RUnlock() - select { case <-s.closed: panic("attempted to interact with stopped store") default: } - seeders := shard.peers[key] - for _, p := range seeders { - if p.IP.To4() == nil { - peers6 = append(peers6, p.Peer) + shard := s.shards[s.shardIndex(infoHash)] + shard.RLock() + defer shard.RUnlock() + + if shard.swarms[infoHash] == nil { + return nil, nil, store.ErrResourceDoesNotExist + } + + seeders := shard.swarms[infoHash].seeders + for p := range seeders { + decodedPeer := decodePeerKey(p) + if decodedPeer.IP.To4() == nil { + peers6 = append(peers6, decodedPeer) } else { - peers = append(peers, p.Peer) + peers = append(peers, decodedPeer) } } return } func (s *peerStore) GetLeechers(infoHash chihaya.InfoHash) (peers, peers6 []chihaya.Peer, err error) { - key := leechersKey(infoHash) - shard := s.shards[s.shardIndex(infoHash)] - shard.RLock() - defer shard.RUnlock() - select { case <-s.closed: panic("attempted to interact with stopped store") default: } - leechers := shard.peers[key] - for _, p := range leechers { - if p.IP.To4() == nil { - peers6 = append(peers6, p.Peer) + shard := s.shards[s.shardIndex(infoHash)] + shard.RLock() + defer shard.RUnlock() + + if shard.swarms[infoHash] == nil { + return nil, nil, store.ErrResourceDoesNotExist + } + + seeders := shard.swarms[infoHash].leechers + for p := range seeders { + decodedPeer := decodePeerKey(p) + if decodedPeer.IP.To4() == nil { + peers6 = append(peers6, decodedPeer) } else { - peers = append(peers, p.Peer) + peers = append(peers, decodedPeer) } } return } func (s *peerStore) NumSeeders(infoHash chihaya.InfoHash) int { - key := seedersKey(infoHash) - shard := s.shards[s.shardIndex(infoHash)] - shard.RLock() - defer shard.RUnlock() - select { case <-s.closed: panic("attempted to interact with stopped store") default: } - return len(shard.peers[key]) + shard := s.shards[s.shardIndex(infoHash)] + shard.RLock() + defer shard.RUnlock() + + if shard.swarms[infoHash] == nil { + return 0 + } + + return len(shard.swarms[infoHash].seeders) } func (s *peerStore) NumLeechers(infoHash chihaya.InfoHash) int { - key := leechersKey(infoHash) - shard := s.shards[s.shardIndex(infoHash)] - shard.RLock() - defer shard.RUnlock() - select { case <-s.closed: panic("attempted to interact with stopped store") default: } - return len(shard.peers[key]) + shard := s.shards[s.shardIndex(infoHash)] + shard.RLock() + defer shard.RUnlock() + + if shard.swarms[infoHash] == nil { + return 0 + } + + return len(shard.swarms[infoHash].leechers) } func (s *peerStore) Stop() <-chan error { toReturn := make(chan error) go func() { - oldshards := s.shards - for _, shard := range oldshards { - shard.Lock() - } - shards := make([]*peerShard, len(oldshards)) - for i := 0; i < len(oldshards); i++ { + shards := make([]*peerShard, len(s.shards)) + for i := 0; i < len(s.shards); i++ { shards[i] = &peerShard{} - shards[i].peers = make(map[string]map[string]peer) + shards[i].swarms = make(map[chihaya.InfoHash]*swarm) } s.shards = shards close(s.closed) - for _, shard := range oldshards { - shard.Unlock() - } close(toReturn) }() return toReturn