memory: optimize peer store memory footprint and performance

This commit is contained in:
Leo Balduf 2016-07-09 20:36:53 -04:00
parent 1fdbe1bad1
commit ed8cc0d5b3

View file

@ -5,7 +5,9 @@
package memory package memory
import ( import (
"hash/fnv" "encoding/binary"
"log"
"net"
"runtime" "runtime"
"sync" "sync"
"time" "time"
@ -31,7 +33,7 @@ func (d *peerStoreDriver) New(storecfg *store.DriverConfig) (store.PeerStore, er
shards := make([]*peerShard, cfg.Shards) shards := make([]*peerShard, cfg.Shards)
for i := 0; i < cfg.Shards; i++ { for i := 0; i < cfg.Shards; i++ {
shards[i] = &peerShard{} shards[i] = &peerShard{}
shards[i].peers = make(map[string]map[string]peer) shards[i].swarms = make(map[chihaya.InfoHash]*swarm)
} }
return &peerStore{ return &peerStore{
shards: shards, shards: shards,
@ -61,16 +63,19 @@ func newPeerStoreConfig(storecfg *store.DriverConfig) (*peerStoreConfig, error)
return &cfg, nil return &cfg, nil
} }
type peer struct { type serializedPeer string
chihaya.Peer
LastAction time.Time
}
type peerShard struct { type peerShard struct {
peers map[string]map[string]peer swarms map[chihaya.InfoHash]*swarm
sync.RWMutex sync.RWMutex
} }
type swarm struct {
// map serialized peer to mtime
seeders map[serializedPeer]int64
leechers map[serializedPeer]int64
}
type peerStore struct { type peerStore struct {
shards []*peerShard shards []*peerShard
closed chan struct{} closed chan struct{}
@ -79,183 +84,193 @@ type peerStore struct {
var _ store.PeerStore = &peerStore{} var _ store.PeerStore = &peerStore{}
func (s *peerStore) shardIndex(infoHash chihaya.InfoHash) uint32 { func (s *peerStore) shardIndex(infoHash chihaya.InfoHash) uint32 {
idx := fnv.New32() return binary.BigEndian.Uint32(infoHash[:4]) % uint32(len(s.shards))
idx.Write(infoHash[:])
return idx.Sum32() % uint32(len(s.shards))
} }
func peerKey(p chihaya.Peer) string { func peerKey(p chihaya.Peer) serializedPeer {
return string(p.IP) + string(p.ID[:]) b := make([]byte, 20+2+len(p.IP))
copy(b[:20], p.ID[:])
binary.BigEndian.PutUint16(b[20:22], p.Port)
copy(b[22:], p.IP)
return serializedPeer(b)
} }
func seedersKey(infoHash chihaya.InfoHash) string { func decodePeerKey(pk serializedPeer) chihaya.Peer {
return string(infoHash[:]) + "-s" return chihaya.Peer{
} ID: chihaya.PeerIDFromString(string(pk[:20])),
Port: binary.BigEndian.Uint16([]byte(pk[20:22])),
func leechersKey(infoHash chihaya.InfoHash) string { IP: net.IP(pk[22:]),
return string(infoHash[:]) + "-l" }
} }
func (s *peerStore) PutSeeder(infoHash chihaya.InfoHash, p chihaya.Peer) error { func (s *peerStore) PutSeeder(infoHash chihaya.InfoHash, p chihaya.Peer) error {
key := seedersKey(infoHash)
shard := s.shards[s.shardIndex(infoHash)]
shard.Lock()
defer shard.Unlock()
select { select {
case <-s.closed: case <-s.closed:
panic("attempted to interact with stopped store") panic("attempted to interact with stopped store")
default: default:
} }
if shard.peers[key] == nil { shard := s.shards[s.shardIndex(infoHash)]
shard.peers[key] = make(map[string]peer) shard.Lock()
defer shard.Unlock()
if shard.swarms[infoHash] == nil {
shard.swarms[infoHash] = &swarm{
seeders: make(map[serializedPeer]int64),
leechers: make(map[serializedPeer]int64),
}
} }
shard.peers[key][peerKey(p)] = peer{ shard.swarms[infoHash].seeders[peerKey(p)] = time.Now().UnixNano()
Peer: p,
LastAction: time.Now(),
}
return nil return nil
} }
func (s *peerStore) DeleteSeeder(infoHash chihaya.InfoHash, p chihaya.Peer) error { func (s *peerStore) DeleteSeeder(infoHash chihaya.InfoHash, p chihaya.Peer) error {
key := seedersKey(infoHash)
shard := s.shards[s.shardIndex(infoHash)]
shard.Lock()
defer shard.Unlock()
select { select {
case <-s.closed: case <-s.closed:
panic("attempted to interact with stopped store") panic("attempted to interact with stopped store")
default: default:
} }
if shard.peers[key] == nil { shard := s.shards[s.shardIndex(infoHash)]
return store.ErrResourceDoesNotExist
}
pk := peerKey(p) pk := peerKey(p)
shard.Lock()
defer shard.Unlock()
if _, ok := shard.peers[key][pk]; !ok { if shard.swarms[infoHash] == nil {
return store.ErrResourceDoesNotExist return store.ErrResourceDoesNotExist
} }
delete(shard.peers[key], pk) if _, ok := shard.swarms[infoHash].seeders[pk]; !ok {
return store.ErrResourceDoesNotExist
}
if len(shard.peers[key]) == 0 { delete(shard.swarms[infoHash].seeders, pk)
delete(shard.peers, key)
if len(shard.swarms[infoHash].seeders)|len(shard.swarms[infoHash].leechers) == 0 {
delete(shard.swarms, infoHash)
} }
return nil return nil
} }
func (s *peerStore) PutLeecher(infoHash chihaya.InfoHash, p chihaya.Peer) error { func (s *peerStore) PutLeecher(infoHash chihaya.InfoHash, p chihaya.Peer) error {
key := leechersKey(infoHash)
shard := s.shards[s.shardIndex(infoHash)]
shard.Lock()
defer shard.Unlock()
select { select {
case <-s.closed: case <-s.closed:
panic("attempted to interact with stopped store") panic("attempted to interact with stopped store")
default: default:
} }
if shard.peers[key] == nil { shard := s.shards[s.shardIndex(infoHash)]
shard.peers[key] = make(map[string]peer) shard.Lock()
defer shard.Unlock()
if shard.swarms[infoHash] == nil {
shard.swarms[infoHash] = &swarm{
seeders: make(map[serializedPeer]int64),
leechers: make(map[serializedPeer]int64),
}
} }
shard.peers[key][peerKey(p)] = peer{ shard.swarms[infoHash].leechers[peerKey(p)] = time.Now().UnixNano()
Peer: p,
LastAction: time.Now(),
}
return nil return nil
} }
func (s *peerStore) DeleteLeecher(infoHash chihaya.InfoHash, p chihaya.Peer) error { func (s *peerStore) DeleteLeecher(infoHash chihaya.InfoHash, p chihaya.Peer) error {
key := leechersKey(infoHash)
shard := s.shards[s.shardIndex(infoHash)]
shard.Lock()
defer shard.Unlock()
select { select {
case <-s.closed: case <-s.closed:
panic("attempted to interact with stopped store") panic("attempted to interact with stopped store")
default: default:
} }
if shard.peers[key] == nil { shard := s.shards[s.shardIndex(infoHash)]
return store.ErrResourceDoesNotExist
}
pk := peerKey(p) pk := peerKey(p)
shard.Lock()
defer shard.Unlock()
if _, ok := shard.peers[key][pk]; !ok { if shard.swarms[infoHash] == nil {
return store.ErrResourceDoesNotExist return store.ErrResourceDoesNotExist
} }
delete(shard.peers[key], pk) if _, ok := shard.swarms[infoHash].leechers[pk]; !ok {
return store.ErrResourceDoesNotExist
}
if len(shard.peers[key]) == 0 { delete(shard.swarms[infoHash].leechers, pk)
delete(shard.peers, key)
if len(shard.swarms[infoHash].seeders)|len(shard.swarms[infoHash].leechers) == 0 {
delete(shard.swarms, infoHash)
} }
return nil return nil
} }
func (s *peerStore) GraduateLeecher(infoHash chihaya.InfoHash, p chihaya.Peer) error { func (s *peerStore) GraduateLeecher(infoHash chihaya.InfoHash, p chihaya.Peer) error {
lkey := leechersKey(infoHash)
skey := seedersKey(infoHash)
shard := s.shards[s.shardIndex(infoHash)]
shard.Lock()
defer shard.Unlock()
select { select {
case <-s.closed: case <-s.closed:
panic("attempted to interact with stopped store") panic("attempted to interact with stopped store")
default: default:
} }
if shard.peers[lkey] != nil { key := peerKey(p)
delete(shard.peers[lkey], peerKey(p)) shard := s.shards[s.shardIndex(infoHash)]
shard.Lock()
defer shard.Unlock()
if shard.swarms[infoHash] == nil {
shard.swarms[infoHash] = &swarm{
seeders: make(map[serializedPeer]int64),
leechers: make(map[serializedPeer]int64),
}
} }
if shard.peers[skey] == nil { if _, ok := shard.swarms[infoHash].leechers[key]; ok {
shard.peers[skey] = make(map[string]peer) delete(shard.swarms[infoHash].leechers, key)
} }
shard.peers[skey][peerKey(p)] = peer{ shard.swarms[infoHash].seeders[key] = time.Now().UnixNano()
Peer: p,
LastAction: time.Now(),
}
return nil return nil
} }
func (s *peerStore) CollectGarbage(cutoff time.Time) error { func (s *peerStore) CollectGarbage(cutoff time.Time) error {
select {
case <-s.closed:
panic("attempted to interact with stopped store")
default:
}
log.Printf("memory: collecting garbage. Cutoff time: %s", cutoff.String())
cutoffUnix := cutoff.UnixNano()
for _, shard := range s.shards { for _, shard := range s.shards {
shard.RLock() shard.RLock()
var keys []string var infohashes []chihaya.InfoHash
for key := range shard.peers { for key := range shard.swarms {
keys = append(keys, key) infohashes = append(infohashes, key)
} }
shard.RUnlock() shard.RUnlock()
runtime.Gosched() runtime.Gosched()
for _, key := range keys { for _, infohash := range infohashes {
shard.Lock() shard.Lock()
for peerKey, p := range shard.peers[key] { for peerKey, mtime := range shard.swarms[infohash].leechers {
if p.LastAction.Before(cutoff) { if mtime <= cutoffUnix {
delete(shard.peers[key], peerKey) delete(shard.swarms[infohash].leechers, peerKey)
} }
} }
if len(shard.peers[key]) == 0 { for peerKey, mtime := range shard.swarms[infohash].seeders {
delete(shard.peers, key) if mtime <= cutoffUnix {
delete(shard.swarms[infohash].seeders, peerKey)
}
}
if len(shard.swarms[infohash].seeders)|len(shard.swarms[infohash].leechers) == 0 {
delete(shard.swarms, infohash)
} }
shard.Unlock() shard.Unlock()
@ -269,67 +284,68 @@ func (s *peerStore) CollectGarbage(cutoff time.Time) error {
} }
func (s *peerStore) AnnouncePeers(infoHash chihaya.InfoHash, seeder bool, numWant int, peer4, peer6 chihaya.Peer) (peers, peers6 []chihaya.Peer, err error) { func (s *peerStore) AnnouncePeers(infoHash chihaya.InfoHash, seeder bool, numWant int, peer4, peer6 chihaya.Peer) (peers, peers6 []chihaya.Peer, err error) {
lkey := leechersKey(infoHash)
skey := seedersKey(infoHash)
shard := s.shards[s.shardIndex(infoHash)]
shard.RLock()
defer shard.RUnlock()
select { select {
case <-s.closed: case <-s.closed:
panic("attempted to interact with stopped store") panic("attempted to interact with stopped store")
default: default:
} }
shard := s.shards[s.shardIndex(infoHash)]
shard.RLock()
defer shard.RUnlock()
if seeder { if seeder {
// Append leechers as possible. // Append leechers as possible.
leechers := shard.peers[lkey] leechers := shard.swarms[infoHash].leechers
for _, p := range leechers { for p := range leechers {
decodedPeer := decodePeerKey(p)
if numWant == 0 { if numWant == 0 {
break break
} }
if p.IP.To4() == nil { if decodedPeer.IP.To4() == nil {
peers6 = append(peers6, p.Peer) peers6 = append(peers6, decodedPeer)
} else { } else {
peers = append(peers, p.Peer) peers = append(peers, decodedPeer)
} }
numWant-- numWant--
} }
} else { } else {
// Append as many seeders as possible. // Append as many seeders as possible.
seeders := shard.peers[skey] seeders := shard.swarms[infoHash].seeders
for _, p := range seeders { for p := range seeders {
decodedPeer := decodePeerKey(p)
if numWant == 0 { if numWant == 0 {
break break
} }
if p.IP.To4() == nil { if decodedPeer.IP.To4() == nil {
peers6 = append(peers6, p.Peer) peers6 = append(peers6, decodedPeer)
} else { } else {
peers = append(peers, p.Peer) peers = append(peers, decodedPeer)
} }
numWant-- numWant--
} }
// Append leechers until we reach numWant. // Append leechers until we reach numWant.
leechers := shard.peers[lkey] leechers := shard.swarms[infoHash].leechers
if numWant > 0 { if numWant > 0 {
for _, p := range leechers { for p := range leechers {
decodedPeer := decodePeerKey(p)
if numWant == 0 { if numWant == 0 {
break break
} }
if p.IP.To4() == nil { if decodedPeer.IP.To4() == nil {
if p.Equal(peer6) { if decodedPeer.Equal(peer6) {
continue continue
} }
peers6 = append(peers6, p.Peer) peers6 = append(peers6, decodedPeer)
} else { } else {
if p.Equal(peer4) { if decodedPeer.Equal(peer4) {
continue continue
} }
peers = append(peers, p.Peer) peers = append(peers, decodedPeer)
} }
numWant-- numWant--
} }
@ -340,98 +356,105 @@ func (s *peerStore) AnnouncePeers(infoHash chihaya.InfoHash, seeder bool, numWan
} }
func (s *peerStore) GetSeeders(infoHash chihaya.InfoHash) (peers, peers6 []chihaya.Peer, err error) { func (s *peerStore) GetSeeders(infoHash chihaya.InfoHash) (peers, peers6 []chihaya.Peer, err error) {
key := seedersKey(infoHash)
shard := s.shards[s.shardIndex(infoHash)]
shard.RLock()
defer shard.RUnlock()
select { select {
case <-s.closed: case <-s.closed:
panic("attempted to interact with stopped store") panic("attempted to interact with stopped store")
default: default:
} }
seeders := shard.peers[key] shard := s.shards[s.shardIndex(infoHash)]
for _, p := range seeders { shard.RLock()
if p.IP.To4() == nil { defer shard.RUnlock()
peers6 = append(peers6, p.Peer)
if shard.swarms[infoHash] == nil {
return nil, nil, store.ErrResourceDoesNotExist
}
seeders := shard.swarms[infoHash].seeders
for p := range seeders {
decodedPeer := decodePeerKey(p)
if decodedPeer.IP.To4() == nil {
peers6 = append(peers6, decodedPeer)
} else { } else {
peers = append(peers, p.Peer) peers = append(peers, decodedPeer)
} }
} }
return return
} }
func (s *peerStore) GetLeechers(infoHash chihaya.InfoHash) (peers, peers6 []chihaya.Peer, err error) { func (s *peerStore) GetLeechers(infoHash chihaya.InfoHash) (peers, peers6 []chihaya.Peer, err error) {
key := leechersKey(infoHash)
shard := s.shards[s.shardIndex(infoHash)]
shard.RLock()
defer shard.RUnlock()
select { select {
case <-s.closed: case <-s.closed:
panic("attempted to interact with stopped store") panic("attempted to interact with stopped store")
default: default:
} }
leechers := shard.peers[key] shard := s.shards[s.shardIndex(infoHash)]
for _, p := range leechers { shard.RLock()
if p.IP.To4() == nil { defer shard.RUnlock()
peers6 = append(peers6, p.Peer)
if shard.swarms[infoHash] == nil {
return nil, nil, store.ErrResourceDoesNotExist
}
seeders := shard.swarms[infoHash].leechers
for p := range seeders {
decodedPeer := decodePeerKey(p)
if decodedPeer.IP.To4() == nil {
peers6 = append(peers6, decodedPeer)
} else { } else {
peers = append(peers, p.Peer) peers = append(peers, decodedPeer)
} }
} }
return return
} }
func (s *peerStore) NumSeeders(infoHash chihaya.InfoHash) int { func (s *peerStore) NumSeeders(infoHash chihaya.InfoHash) int {
key := seedersKey(infoHash)
shard := s.shards[s.shardIndex(infoHash)]
shard.RLock()
defer shard.RUnlock()
select { select {
case <-s.closed: case <-s.closed:
panic("attempted to interact with stopped store") panic("attempted to interact with stopped store")
default: default:
} }
return len(shard.peers[key]) shard := s.shards[s.shardIndex(infoHash)]
shard.RLock()
defer shard.RUnlock()
if shard.swarms[infoHash] == nil {
return 0
}
return len(shard.swarms[infoHash].seeders)
} }
func (s *peerStore) NumLeechers(infoHash chihaya.InfoHash) int { func (s *peerStore) NumLeechers(infoHash chihaya.InfoHash) int {
key := leechersKey(infoHash)
shard := s.shards[s.shardIndex(infoHash)]
shard.RLock()
defer shard.RUnlock()
select { select {
case <-s.closed: case <-s.closed:
panic("attempted to interact with stopped store") panic("attempted to interact with stopped store")
default: default:
} }
return len(shard.peers[key]) shard := s.shards[s.shardIndex(infoHash)]
shard.RLock()
defer shard.RUnlock()
if shard.swarms[infoHash] == nil {
return 0
}
return len(shard.swarms[infoHash].leechers)
} }
func (s *peerStore) Stop() <-chan error { func (s *peerStore) Stop() <-chan error {
toReturn := make(chan error) toReturn := make(chan error)
go func() { go func() {
oldshards := s.shards shards := make([]*peerShard, len(s.shards))
for _, shard := range oldshards { for i := 0; i < len(s.shards); i++ {
shard.Lock()
}
shards := make([]*peerShard, len(oldshards))
for i := 0; i < len(oldshards); i++ {
shards[i] = &peerShard{} shards[i] = &peerShard{}
shards[i].peers = make(map[string]map[string]peer) shards[i].swarms = make(map[chihaya.InfoHash]*swarm)
} }
s.shards = shards s.shards = shards
close(s.closed) close(s.closed)
for _, shard := range oldshards {
shard.Unlock()
}
close(toReturn) close(toReturn)
}() }()
return toReturn return toReturn