Organize peers by subnet.

This commit restructures PeerMaps to be a map from Subnet to
PeerID to Peer. This reduces the complexity require to gather peers from
the same subnet.
This commit is contained in:
Jimmy Zelinskie 2014-09-23 23:00:50 -04:00
parent 6a96245d90
commit b910fdabf5
7 changed files with 131 additions and 118 deletions

View file

@ -40,16 +40,19 @@ type DriverConfig struct {
Params map[string]string `json:"params,omitempty"` Params map[string]string `json:"params,omitempty"`
} }
type SubnetConfig struct {
PreferredSubnet bool `json:"preferred_subnet,omitempty"`
PreferredIPv4Subnet int `json:"preferred_ipv4_subnet,omitempty"`
PreferredIPv6Subnet int `json:"preferred_ipv6_subnet,omitempty"`
}
// NetConfig is the configuration used to tune networking behaviour. // NetConfig is the configuration used to tune networking behaviour.
type NetConfig struct { type NetConfig struct {
AllowIPSpoofing bool `json:"allow_ip_spoofing"` AllowIPSpoofing bool `json:"allow_ip_spoofing"`
DualStackedPeers bool `json:"dual_stacked_peers"` DualStackedPeers bool `json:"dual_stacked_peers"`
RealIPHeader string `json:"real_ip_header"` RealIPHeader string `json:"real_ip_header"`
RespectAF bool `json:"respect_af"` RespectAF bool `json:"respect_af"`
SubnetConfig
PreferredSubnet bool `json:"preferred_subnet,omitempty"`
PreferredIPv4Subnet int `json:"preferred_ipv4_subnet,omitempty"`
PreferredIPv6Subnet int `json:"preferred_ipv6_subnet,omitempty"`
} }
type StatsConfig struct { type StatsConfig struct {

View file

@ -359,8 +359,8 @@ func loadPrivateTestData(tkr *tracker.Tracker) {
torrent := &models.Torrent{ torrent := &models.Torrent{
ID: 1, ID: 1,
Infohash: infoHash, Infohash: infoHash,
Seeders: models.NewPeerMap(true), Seeders: models.NewPeerMap(true, tkr.Config),
Leechers: models.NewPeerMap(false), Leechers: models.NewPeerMap(false, tkr.Config),
} }
tkr.PutTorrent(torrent) tkr.PutTorrent(torrent)

View file

@ -12,14 +12,14 @@ import (
// HandleAnnounce encapsulates all of the logic of handling a BitTorrent // HandleAnnounce encapsulates all of the logic of handling a BitTorrent
// client's Announce without being coupled to any transport protocol. // client's Announce without being coupled to any transport protocol.
func (tkr *Tracker) HandleAnnounce(ann *models.Announce, w Writer) (err error) { func (tkr *Tracker) HandleAnnounce(ann *models.Announce, w Writer) (err error) {
if tkr.cfg.ClientWhitelistEnabled { if tkr.Config.ClientWhitelistEnabled {
if err = tkr.ClientApproved(ann.ClientID()); err != nil { if err = tkr.ClientApproved(ann.ClientID()); err != nil {
return err return err
} }
} }
var user *models.User var user *models.User
if tkr.cfg.PrivateEnabled { if tkr.Config.PrivateEnabled {
if user, err = tkr.FindUser(ann.Passkey); err != nil { if user, err = tkr.FindUser(ann.Passkey); err != nil {
return err return err
} }
@ -27,11 +27,11 @@ func (tkr *Tracker) HandleAnnounce(ann *models.Announce, w Writer) (err error) {
torrent, err := tkr.FindTorrent(ann.Infohash) torrent, err := tkr.FindTorrent(ann.Infohash)
if err == models.ErrTorrentDNE && !tkr.cfg.PrivateEnabled { if err == models.ErrTorrentDNE && !tkr.Config.PrivateEnabled {
torrent = &models.Torrent{ torrent = &models.Torrent{
Infohash: ann.Infohash, Infohash: ann.Infohash,
Seeders: models.NewPeerMap(true), Seeders: models.NewPeerMap(true, tkr.Config),
Leechers: models.NewPeerMap(false), Leechers: models.NewPeerMap(false, tkr.Config),
} }
tkr.PutTorrent(torrent) tkr.PutTorrent(torrent)
@ -43,7 +43,7 @@ func (tkr *Tracker) HandleAnnounce(ann *models.Announce, w Writer) (err error) {
ann.BuildPeer(user, torrent) ann.BuildPeer(user, torrent)
var delta *models.AnnounceDelta var delta *models.AnnounceDelta
if tkr.cfg.PrivateEnabled { if tkr.Config.PrivateEnabled {
delta = newAnnounceDelta(ann, torrent) delta = newAnnounceDelta(ann, torrent)
} }
@ -57,13 +57,13 @@ func (tkr *Tracker) HandleAnnounce(ann *models.Announce, w Writer) (err error) {
return err return err
} }
if tkr.cfg.PrivateEnabled { if tkr.Config.PrivateEnabled {
delta.Created = created delta.Created = created
delta.Snatched = snatched delta.Snatched = snatched
if err = tkr.backend.RecordAnnounce(delta); err != nil { if err = tkr.Backend.RecordAnnounce(delta); err != nil {
return err return err
} }
} else if tkr.cfg.PurgeInactiveTorrents && torrent.PeerCount() == 0 { } else if tkr.Config.PurgeInactiveTorrents && torrent.PeerCount() == 0 {
// Rather than deleting the torrent explicitly, let the tracker driver // Rather than deleting the torrent explicitly, let the tracker driver
// ensure there are no race conditions. // ensure there are no race conditions.
tkr.PurgeInactiveTorrent(torrent.Infohash) tkr.PurgeInactiveTorrent(torrent.Infohash)
@ -230,8 +230,8 @@ func (tkr *Tracker) handlePeerEvent(ann *models.Announce, p *models.Peer) (snatc
} }
case ann.Event == "completed": case ann.Event == "completed":
v4seed := t.Seeders.Contains(models.NewPeerKey(p.ID, false)) v4seed := t.Seeders.Contains(p.Key())
v6seed := t.Seeders.Contains(models.NewPeerKey(p.ID, true)) v6seed := t.Seeders.Contains(p.Key())
if t.Leechers.Contains(p.Key()) { if t.Leechers.Contains(p.Key()) {
err = tkr.leecherFinished(t, p) err = tkr.leecherFinished(t, p)

View file

@ -8,6 +8,7 @@ package models
import ( import (
"net" "net"
"strings"
"time" "time"
"github.com/chihaya/chihaya/config" "github.com/chihaya/chihaya/config"
@ -45,12 +46,16 @@ func (e NotFoundError) Error() string { return string(e) }
type PeerList []Peer type PeerList []Peer
type PeerKey string type PeerKey string
func NewPeerKey(peerID string, ipv6 bool) PeerKey { func NewPeerKey(peerID string, ip net.IP) PeerKey {
if ipv6 { return PeerKey(peerID + "//" + ip.String())
return PeerKey("6:" + peerID) }
}
return PeerKey("4:" + peerID) func (pk PeerKey) IP() net.IP {
return net.ParseIP(strings.Split(string(pk), "//")[1])
}
func (pk PeerKey) PeerID() string {
return strings.Split(string(pk), "//")[0]
} }
// Peer is a participant in a swarm. // Peer is a participant in a swarm.
@ -79,7 +84,7 @@ func (p *Peer) HasIPv6() bool {
} }
func (p *Peer) Key() PeerKey { func (p *Peer) Key() PeerKey {
return NewPeerKey(p.ID, p.HasIPv6()) return NewPeerKey(p.ID, p.IP)
} }
// Torrent is a swarm for a given torrent file. // Torrent is a swarm for a given torrent file.

View file

@ -5,36 +5,70 @@
package models package models
import ( import (
"encoding/json"
"net" "net"
"sync" "sync"
"github.com/chihaya/chihaya/config"
"github.com/chihaya/chihaya/stats" "github.com/chihaya/chihaya/stats"
) )
// PeerMap is a thread-safe map from PeerKeys to Peers. // PeerMap is a thread-safe map from PeerKeys to Peers. When PreferredSubnet is
// enabled, it is a thread-safe map of maps from MaskedIPs to Peerkeys to Peers.
type PeerMap struct { type PeerMap struct {
seeders bool Peers map[string]map[PeerKey]Peer `json:"peers"`
peers map[PeerKey]Peer Seeders bool `json:"seeders"`
Config config.SubnetConfig `json:"config"`
sync.RWMutex sync.RWMutex
} }
// NewPeerMap initializes the map for a new PeerMap. // NewPeerMap initializes the map for a new PeerMap.
func NewPeerMap(seeders bool) PeerMap { func NewPeerMap(seeders bool, cfg *config.Config) PeerMap {
return PeerMap{ pm := PeerMap{
peers: make(map[PeerKey]Peer), Peers: make(map[string]map[PeerKey]Peer),
seeders: seeders, Seeders: seeders,
Config: cfg.NetConfig.SubnetConfig,
} }
if !pm.Config.PreferredSubnet {
pm.Peers[""] = make(map[PeerKey]Peer)
}
return pm
} }
// Contains is true if a peer is contained with a PeerMap. // Contains is true if a peer is contained with a PeerMap.
func (pm *PeerMap) Contains(pk PeerKey) (exists bool) { func (pm *PeerMap) Contains(pk PeerKey) bool {
pm.RLock() pm.RLock()
defer pm.RUnlock() defer pm.RUnlock()
_, exists = pm.peers[pk] if pm.Config.PreferredSubnet {
maskedIP := pm.mask(pk.IP())
peers, exists := pm.Peers[maskedIP]
if !exists {
return false
}
return _, exists = peers[pk]
return exists
}
_, exists := pm.Peers[""][pk]
return exists
}
func (pm *PeerMap) mask(ip net.IP) string {
if !pm.Config.PreferredSubnet {
return ""
}
var maskedIP net.IP
if len(ip) == net.IPv6len {
maskedIP = ip.Mask(net.CIDRMask(pm.Config.PreferredIPv6Subnet, 128))
} else {
maskedIP = ip.Mask(net.CIDRMask(pm.Config.PreferredIPv4Subnet, 32))
}
return maskedIP.String()
} }
// LookUp is a thread-safe read from a PeerMap. // LookUp is a thread-safe read from a PeerMap.
@ -42,7 +76,12 @@ func (pm *PeerMap) LookUp(pk PeerKey) (peer Peer, exists bool) {
pm.RLock() pm.RLock()
defer pm.RUnlock() defer pm.RUnlock()
peer, exists = pm.peers[pk] maskedIP := pm.mask(pk.IP())
peers, exists := pm.Peers[maskedIP]
if !exists {
return Peer{}, false
}
peer, exists = peers[pk]
return return
} }
@ -52,7 +91,12 @@ func (pm *PeerMap) Put(p Peer) {
pm.Lock() pm.Lock()
defer pm.Unlock() defer pm.Unlock()
pm.peers[p.Key()] = p maskedIP := pm.mask(p.IP)
_, exists := pm.Peers[maskedIP]
if !exists {
pm.Peers[maskedIP] = make(map[PeerKey]Peer)
}
pm.Peers[maskedIP][p.Key()] = p
} }
// Delete is a thread-safe delete from a PeerMap. // Delete is a thread-safe delete from a PeerMap.
@ -60,7 +104,8 @@ func (pm *PeerMap) Delete(pk PeerKey) {
pm.Lock() pm.Lock()
defer pm.Unlock() defer pm.Unlock()
delete(pm.peers, pk) maskedIP := pm.mask(pk.IP())
delete(pm.Peers[maskedIP], pk)
} }
// Len returns the number of peers within a PeerMap. // Len returns the number of peers within a PeerMap.
@ -68,27 +113,11 @@ func (pm *PeerMap) Len() int {
pm.RLock() pm.RLock()
defer pm.RUnlock() defer pm.RUnlock()
return len(pm.peers) var count int
} for _, subnetmap := range pm.Peers {
count += len(subnetmap)
func (pm *PeerMap) MarshalJSON() ([]byte, error) {
pm.RLock()
defer pm.RUnlock()
return json.Marshal(pm.peers)
}
func (pm *PeerMap) UnmarshalJSON(b []byte) error {
pm.Lock()
defer pm.Unlock()
peers := make(map[PeerKey]Peer)
err := json.Unmarshal(b, &peers)
if err != nil {
return err
} }
return count
pm.peers = peers
return nil
} }
// Purge iterates over all of the peers within a PeerMap and deletes them if // Purge iterates over all of the peers within a PeerMap and deletes them if
@ -97,79 +126,56 @@ func (pm *PeerMap) Purge(unixtime int64) {
pm.Lock() pm.Lock()
defer pm.Unlock() defer pm.Unlock()
for key, peer := range pm.peers { for _, subnetmap := range pm.Peers {
if peer.LastAnnounce <= unixtime { for key, peer := range subnetmap {
delete(pm.peers, key) if peer.LastAnnounce <= unixtime {
if pm.seeders { delete(subnetmap, key)
stats.RecordPeerEvent(stats.ReapedSeed, peer.HasIPv6()) if pm.Seeders {
} else { stats.RecordPeerEvent(stats.ReapedSeed, peer.HasIPv6())
stats.RecordPeerEvent(stats.ReapedLeech, peer.HasIPv6()) } else {
stats.RecordPeerEvent(stats.ReapedLeech, peer.HasIPv6())
}
} }
} }
} }
} }
// AppendPeers adds peers to given IPv4 or IPv6 lists. If a preferred Subnet is // AppendPeers adds peers to given IPv4 or IPv6 lists.
// configured, this function calls AppendSubnetPeers.
func (pm *PeerMap) AppendPeers(ipv4s, ipv6s PeerList, ann *Announce, wanted int) (PeerList, PeerList) { func (pm *PeerMap) AppendPeers(ipv4s, ipv6s PeerList, ann *Announce, wanted int) (PeerList, PeerList) {
if ann.Config.PreferredSubnet { maskedIP := pm.mask(ann.Peer.IP)
return pm.AppendSubnetPeers(ipv4s, ipv6s, ann, wanted)
}
pm.RLock() pm.RLock()
defer pm.RUnlock() defer pm.RUnlock()
count := 0 count := 0
for _, peer := range pm.peers { // Attempt to append all the peers in the same subnet.
for _, peer := range pm.Peers[maskedIP] {
if count >= wanted { if count >= wanted {
break break
} else if peersEquivalent(&peer, ann.Peer) { } else if peersEquivalent(&peer, ann.Peer) {
continue continue
} } else {
appendPeer(&ipv4s, &ipv6s, ann, &peer, &count)
}
return ipv4s, ipv6s
}
// AppendSubnetPeers is an alternative version of AppendPeers used when the
// config variable PreferredSubnet is enabled.
func (pm *PeerMap) AppendSubnetPeers(ipv4s, ipv6s PeerList, ann *Announce, wanted int) (PeerList, PeerList) {
var subnetIPv4 net.IPNet
var subnetIPv6 net.IPNet
if ann.HasIPv4() {
subnetIPv4 = net.IPNet{ann.IPv4, net.CIDRMask(ann.Config.PreferredIPv4Subnet, 32)}
}
if ann.HasIPv6() {
subnetIPv6 = net.IPNet{ann.IPv6, net.CIDRMask(ann.Config.PreferredIPv6Subnet, 128)}
}
pm.RLock()
defer pm.RUnlock()
// Iterate over the peers twice: first add only peers in the same subnet and
// if we still need more peers grab ones that haven't already been added.
count := 0
for _, checkInSubnet := range [2]bool{true, false} {
for _, peer := range pm.peers {
if count >= wanted {
break
}
inSubnet4 := peer.HasIPv4() && subnetIPv4.Contains(peer.IP)
inSubnet6 := peer.HasIPv6() && subnetIPv6.Contains(peer.IP)
if peersEquivalent(&peer, ann.Peer) || checkInSubnet != (inSubnet4 || inSubnet6) {
continue
}
// Add the peers optionally respecting AF
appendPeer(&ipv4s, &ipv6s, ann, &peer, &count) appendPeer(&ipv4s, &ipv6s, ann, &peer, &count)
} }
} }
// Add any more peers out of the other subnets.
for subnet, peers := range pm.Peers {
if subnet == maskedIP {
continue
} else {
for _, peer := range peers {
if count >= wanted {
break
} else if peersEquivalent(&peer, ann.Peer) {
continue
} else {
appendPeer(&ipv4s, &ipv6s, ann, &peer, &count)
}
}
}
}
return ipv4s, ipv6s return ipv4s, ipv6s
} }

View file

@ -9,7 +9,7 @@ import "github.com/chihaya/chihaya/tracker/models"
// HandleScrape encapsulates all the logic of handling a BitTorrent client's // HandleScrape encapsulates all the logic of handling a BitTorrent client's
// scrape without being coupled to any transport protocol. // scrape without being coupled to any transport protocol.
func (tkr *Tracker) HandleScrape(scrape *models.Scrape, w Writer) (err error) { func (tkr *Tracker) HandleScrape(scrape *models.Scrape, w Writer) (err error) {
if tkr.cfg.PrivateEnabled { if tkr.Config.PrivateEnabled {
if _, err = tkr.FindUser(scrape.Passkey); err != nil { if _, err = tkr.FindUser(scrape.Passkey); err != nil {
return err return err
} }

View file

@ -19,9 +19,8 @@ import (
// Tracker represents the logic necessary to service BitTorrent announces, // Tracker represents the logic necessary to service BitTorrent announces,
// independently of the underlying data transports used. // independently of the underlying data transports used.
type Tracker struct { type Tracker struct {
cfg *config.Config Config *config.Config
backend backend.Conn Backend backend.Conn
*Storage *Storage
} }
@ -34,8 +33,8 @@ func New(cfg *config.Config) (*Tracker, error) {
} }
tkr := &Tracker{ tkr := &Tracker{
cfg: cfg, Config: cfg,
backend: bc, Backend: bc,
Storage: NewStorage(), Storage: NewStorage(),
} }
@ -54,7 +53,7 @@ func New(cfg *config.Config) (*Tracker, error) {
// Close gracefully shutdowns a Tracker by closing any database connections. // Close gracefully shutdowns a Tracker by closing any database connections.
func (tkr *Tracker) Close() error { func (tkr *Tracker) Close() error {
return tkr.backend.Close() return tkr.Backend.Close()
} }
// LoadApprovedClients loads a list of client IDs into the tracker's storage. // LoadApprovedClients loads a list of client IDs into the tracker's storage.