separate IPv4 and IPv6 swarms by sharding

This commit is contained in:
Leo Balduf 2016-08-11 20:35:39 -04:00 committed by Jimmy Zelinskie
parent 6a45107193
commit e1cf159d9c

View file

@ -24,10 +24,9 @@ func New(cfg Config) (storage.PeerStore, error) {
shardCount = cfg.ShardCount
}
shards := make([]*peerShard, shardCount)
for i := 0; i < shardCount; i++ {
shards[i] = &peerShard{}
shards[i].swarms = make(map[swarmKey]swarm)
shards := make([]*peerShard, shardCount*2)
for i := 0; i < shardCount*2; i++ {
shards[i] = &peerShard{swarms: make(map[bittorrent.InfoHash]swarm)}
}
return &peerStore{
@ -38,23 +37,8 @@ func New(cfg Config) (storage.PeerStore, error) {
type serializedPeer string
type swarmKey [21]byte
// newSwarmKey builds the 21-byte map key that segregates swarms by address
// family: bytes 0-19 hold the infohash, byte 20 tags the peer's IP family.
func newSwarmKey(ih bittorrent.InfoHash, p bittorrent.Peer) (key swarmKey) {
// Copy the 20-byte infohash into the front of the key.
for i, ihbyte := range ih {
key[i] = ihbyte
}
// Tag byte 20 with 4 for IPv4-length addresses, 6 for everything else
// (assumed IPv6 — TODO confirm no other IP lengths reach this path).
if len(p.IP) == net.IPv4len {
key[20] = byte(4)
} else {
key[20] = byte(6)
}
return
}
type peerShard struct {
swarms map[swarmKey]swarm
swarms map[bittorrent.InfoHash]swarm
sync.RWMutex
}
@ -71,8 +55,12 @@ type peerStore struct {
var _ storage.PeerStore = &peerStore{}
func (s *peerStore) shardIndex(infoHash bittorrent.InfoHash) uint32 {
return binary.BigEndian.Uint32(infoHash[:4]) % uint32(len(s.shards))
// shardIndex returns the index of the shard that holds the swarm for the
// given infohash and the address family of the given peer.
//
// s.shards holds twice the configured shard count: the first half is
// dedicated to IPv4 swarms, the second half to IPv6 swarms.
func (s *peerStore) shardIndex(infoHash bittorrent.InfoHash, p bittorrent.Peer) uint32 {
	// Hash into one half only; taking the modulus over the full length
	// would let IPv4 peers land in the IPv6 half and push IPv6 indices
	// past the end of the slice.
	idx := binary.BigEndian.Uint32(infoHash[:4]) % (uint32(len(s.shards)) / 2)
	if len(p.IP) == net.IPv6len {
		// Offset into the IPv6 half; add only the half-count, never
		// idx itself, or the result can exceed len(s.shards).
		idx += uint32(len(s.shards)) / 2
	}
	return idx
}
func newPeerKey(p bittorrent.Peer) serializedPeer {
@ -99,20 +87,19 @@ func (s *peerStore) PutSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error {
default:
}
sk := newSwarmKey(ih, p)
pk := newPeerKey(p)
shard := s.shards[s.shardIndex(ih)]
shard := s.shards[s.shardIndex(ih, p)]
shard.Lock()
if _, ok := shard.swarms[sk]; !ok {
shard.swarms[sk] = swarm{
if _, ok := shard.swarms[ih]; !ok {
shard.swarms[ih] = swarm{
seeders: make(map[serializedPeer]int64),
leechers: make(map[serializedPeer]int64),
}
}
shard.swarms[sk].seeders[pk] = time.Now().UnixNano()
shard.swarms[ih].seeders[pk] = time.Now().UnixNano()
shard.Unlock()
return nil
@ -125,26 +112,25 @@ func (s *peerStore) DeleteSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) erro
default:
}
sk := newSwarmKey(ih, p)
pk := newPeerKey(p)
shard := s.shards[s.shardIndex(ih)]
shard := s.shards[s.shardIndex(ih, p)]
shard.Lock()
if _, ok := shard.swarms[sk]; !ok {
if _, ok := shard.swarms[ih]; !ok {
shard.Unlock()
return storage.ErrResourceDoesNotExist
}
if _, ok := shard.swarms[sk].seeders[pk]; !ok {
if _, ok := shard.swarms[ih].seeders[pk]; !ok {
shard.Unlock()
return storage.ErrResourceDoesNotExist
}
delete(shard.swarms[sk].seeders, pk)
delete(shard.swarms[ih].seeders, pk)
if len(shard.swarms[sk].seeders)|len(shard.swarms[sk].leechers) == 0 {
delete(shard.swarms, sk)
if len(shard.swarms[ih].seeders)|len(shard.swarms[ih].leechers) == 0 {
delete(shard.swarms, ih)
}
shard.Unlock()
@ -158,20 +144,19 @@ func (s *peerStore) PutLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error
default:
}
sk := newSwarmKey(ih, p)
pk := newPeerKey(p)
shard := s.shards[s.shardIndex(ih)]
shard := s.shards[s.shardIndex(ih, p)]
shard.Lock()
if _, ok := shard.swarms[sk]; !ok {
shard.swarms[sk] = swarm{
if _, ok := shard.swarms[ih]; !ok {
shard.swarms[ih] = swarm{
seeders: make(map[serializedPeer]int64),
leechers: make(map[serializedPeer]int64),
}
}
shard.swarms[sk].leechers[pk] = time.Now().UnixNano()
shard.swarms[ih].leechers[pk] = time.Now().UnixNano()
shard.Unlock()
return nil
@ -184,26 +169,25 @@ func (s *peerStore) DeleteLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) err
default:
}
sk := newSwarmKey(ih, p)
pk := newPeerKey(p)
shard := s.shards[s.shardIndex(ih)]
shard := s.shards[s.shardIndex(ih, p)]
shard.Lock()
if _, ok := shard.swarms[sk]; !ok {
if _, ok := shard.swarms[ih]; !ok {
shard.Unlock()
return storage.ErrResourceDoesNotExist
}
if _, ok := shard.swarms[sk].leechers[pk]; !ok {
if _, ok := shard.swarms[ih].leechers[pk]; !ok {
shard.Unlock()
return storage.ErrResourceDoesNotExist
}
delete(shard.swarms[sk].leechers, pk)
delete(shard.swarms[ih].leechers, pk)
if len(shard.swarms[sk].seeders)|len(shard.swarms[sk].leechers) == 0 {
delete(shard.swarms, sk)
if len(shard.swarms[ih].seeders)|len(shard.swarms[ih].leechers) == 0 {
delete(shard.swarms, ih)
}
shard.Unlock()
@ -217,22 +201,21 @@ func (s *peerStore) GraduateLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) e
default:
}
sk := newSwarmKey(ih, p)
pk := newPeerKey(p)
shard := s.shards[s.shardIndex(ih)]
shard := s.shards[s.shardIndex(ih, p)]
shard.Lock()
if _, ok := shard.swarms[sk]; !ok {
shard.swarms[sk] = swarm{
if _, ok := shard.swarms[ih]; !ok {
shard.swarms[ih] = swarm{
seeders: make(map[serializedPeer]int64),
leechers: make(map[serializedPeer]int64),
}
}
delete(shard.swarms[sk].leechers, pk)
delete(shard.swarms[ih].leechers, pk)
shard.swarms[sk].seeders[pk] = time.Now().UnixNano()
shard.swarms[ih].seeders[pk] = time.Now().UnixNano()
shard.Unlock()
return nil
@ -249,36 +232,36 @@ func (s *peerStore) CollectGarbage(cutoff time.Time) error {
cutoffUnix := cutoff.UnixNano()
for _, shard := range s.shards {
shard.RLock()
var swarmKeys []swarmKey
for sk := range shard.swarms {
swarmKeys = append(swarmKeys, sk)
var infohashes []bittorrent.InfoHash
for ih := range shard.swarms {
infohashes = append(infohashes, ih)
}
shard.RUnlock()
runtime.Gosched()
for _, sk := range swarmKeys {
for _, ih := range infohashes {
shard.Lock()
if _, stillExists := shard.swarms[sk]; !stillExists {
if _, stillExists := shard.swarms[ih]; !stillExists {
shard.Unlock()
runtime.Gosched()
continue
}
for pk, mtime := range shard.swarms[sk].leechers {
for pk, mtime := range shard.swarms[ih].leechers {
if mtime <= cutoffUnix {
delete(shard.swarms[sk].leechers, pk)
delete(shard.swarms[ih].leechers, pk)
}
}
for pk, mtime := range shard.swarms[sk].seeders {
for pk, mtime := range shard.swarms[ih].seeders {
if mtime <= cutoffUnix {
delete(shard.swarms[sk].seeders, pk)
delete(shard.swarms[ih].seeders, pk)
}
}
if len(shard.swarms[sk].seeders)|len(shard.swarms[sk].leechers) == 0 {
delete(shard.swarms, sk)
if len(shard.swarms[ih].seeders)|len(shard.swarms[ih].leechers) == 0 {
delete(shard.swarms, ih)
}
shard.Unlock()
@ -298,19 +281,17 @@ func (s *peerStore) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant i
default:
}
sk := newSwarmKey(ih, announcer)
shard := s.shards[s.shardIndex(ih)]
shard := s.shards[s.shardIndex(ih, announcer)]
shard.RLock()
if _, ok := shard.swarms[sk]; !ok {
if _, ok := shard.swarms[ih]; !ok {
shard.RUnlock()
return nil, storage.ErrResourceDoesNotExist
}
if seeder {
// Append leechers as possible.
leechers := shard.swarms[sk].leechers
leechers := shard.swarms[ih].leechers
for p := range leechers {
decodedPeer := decodePeerKey(p)
if numWant == 0 {
@ -322,7 +303,7 @@ func (s *peerStore) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant i
}
} else {
// Append as many seeders as possible.
seeders := shard.swarms[sk].seeders
seeders := shard.swarms[ih].seeders
for p := range seeders {
decodedPeer := decodePeerKey(p)
if numWant == 0 {
@ -334,7 +315,7 @@ func (s *peerStore) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant i
}
// Append leechers until we reach numWant.
leechers := shard.swarms[sk].leechers
leechers := shard.swarms[ih].leechers
if numWant > 0 {
for p := range leechers {
decodedPeer := decodePeerKey(p)
@ -360,8 +341,7 @@ func (s *peerStore) Stop() <-chan error {
go func() {
shards := make([]*peerShard, len(s.shards))
for i := 0; i < len(s.shards); i++ {
shards[i] = &peerShard{}
shards[i].swarms = make(map[swarmKey]swarm)
shards[i] = &peerShard{swarms: make(map[bittorrent.InfoHash]swarm)}
}
s.shards = shards
close(s.closed)