From b910fdabf5b94e50aaa3ab2b8ae79c65040caccd Mon Sep 17 00:00:00 2001 From: Jimmy Zelinskie Date: Tue, 23 Sep 2014 23:00:50 -0400 Subject: [PATCH] Organize peers by subnet. This commit restructures PeerMaps to be a map from Subnet to PeerID to Peer. This reduces the complexity require to gather peers from the same subnet. --- config/config.go | 11 ++- http/announce_test.go | 4 +- tracker/announce.go | 22 ++--- tracker/models/models.go | 17 ++-- tracker/models/peermap.go | 182 ++++++++++++++++++++------------------ tracker/scrape.go | 2 +- tracker/tracker.go | 11 ++- 7 files changed, 131 insertions(+), 118 deletions(-) diff --git a/config/config.go b/config/config.go index 8d2118d..1c10ba1 100644 --- a/config/config.go +++ b/config/config.go @@ -40,16 +40,19 @@ type DriverConfig struct { Params map[string]string `json:"params,omitempty"` } +type SubnetConfig struct { + PreferredSubnet bool `json:"preferred_subnet,omitempty"` + PreferredIPv4Subnet int `json:"preferred_ipv4_subnet,omitempty"` + PreferredIPv6Subnet int `json:"preferred_ipv6_subnet,omitempty"` +} + // NetConfig is the configuration used to tune networking behaviour. type NetConfig struct { AllowIPSpoofing bool `json:"allow_ip_spoofing"` DualStackedPeers bool `json:"dual_stacked_peers"` RealIPHeader string `json:"real_ip_header"` RespectAF bool `json:"respect_af"` - - PreferredSubnet bool `json:"preferred_subnet,omitempty"` - PreferredIPv4Subnet int `json:"preferred_ipv4_subnet,omitempty"` - PreferredIPv6Subnet int `json:"preferred_ipv6_subnet,omitempty"` + SubnetConfig } type StatsConfig struct { diff --git a/http/announce_test.go b/http/announce_test.go index ce2bf48..f79594f 100644 --- a/http/announce_test.go +++ b/http/announce_test.go @@ -359,8 +359,8 @@ func loadPrivateTestData(tkr *tracker.Tracker) { torrent := &models.Torrent{ ID: 1, Infohash: infoHash, - Seeders: models.NewPeerMap(true), - Leechers: models.NewPeerMap(false), + Seeders: models.NewPeerMap(true, tkr.Config), + Leechers: models.NewPeerMap(false, tkr.Config), } tkr.PutTorrent(torrent) diff --git a/tracker/announce.go b/tracker/announce.go index ffc74a3..71dca82 100644 --- a/tracker/announce.go +++ b/tracker/announce.go @@ -12,14 +12,14 @@ import ( // HandleAnnounce encapsulates all of the logic of handling a BitTorrent // client's Announce without being coupled to any transport protocol. func (tkr *Tracker) HandleAnnounce(ann *models.Announce, w Writer) (err error) { - if tkr.cfg.ClientWhitelistEnabled { + if tkr.Config.ClientWhitelistEnabled { if err = tkr.ClientApproved(ann.ClientID()); err != nil { return err } } var user *models.User - if tkr.cfg.PrivateEnabled { + if tkr.Config.PrivateEnabled { if user, err = tkr.FindUser(ann.Passkey); err != nil { return err } @@ -27,11 +27,11 @@ func (tkr *Tracker) HandleAnnounce(ann *models.Announce, w Writer) (err error) { torrent, err := tkr.FindTorrent(ann.Infohash) - if err == models.ErrTorrentDNE && !tkr.cfg.PrivateEnabled { + if err == models.ErrTorrentDNE && !tkr.Config.PrivateEnabled { torrent = &models.Torrent{ Infohash: ann.Infohash, - Seeders: models.NewPeerMap(true), - Leechers: models.NewPeerMap(false), + Seeders: models.NewPeerMap(true, tkr.Config), + Leechers: models.NewPeerMap(false, tkr.Config), } tkr.PutTorrent(torrent) @@ -43,7 +43,7 @@ func (tkr *Tracker) HandleAnnounce(ann *models.Announce, w Writer) (err error) { ann.BuildPeer(user, torrent) var delta *models.AnnounceDelta - if tkr.cfg.PrivateEnabled { + if tkr.Config.PrivateEnabled { delta = newAnnounceDelta(ann, torrent) } @@ -57,13 +57,13 @@ func (tkr *Tracker) HandleAnnounce(ann *models.Announce, w Writer) (err error) { return err } - if tkr.cfg.PrivateEnabled { + if tkr.Config.PrivateEnabled { delta.Created = created delta.Snatched = snatched - if err = tkr.backend.RecordAnnounce(delta); err != nil { + if err = tkr.Backend.RecordAnnounce(delta); err != nil { return err } - } else if tkr.cfg.PurgeInactiveTorrents && torrent.PeerCount() == 0 { + } else if tkr.Config.PurgeInactiveTorrents && torrent.PeerCount() == 0 { // Rather than deleting the torrent explicitly, let the tracker driver // ensure there are no race conditions. tkr.PurgeInactiveTorrent(torrent.Infohash) @@ -230,8 +230,8 @@ func (tkr *Tracker) handlePeerEvent(ann *models.Announce, p *models.Peer) (snatc } case ann.Event == "completed": - v4seed := t.Seeders.Contains(models.NewPeerKey(p.ID, false)) - v6seed := t.Seeders.Contains(models.NewPeerKey(p.ID, true)) + v4seed := t.Seeders.Contains(p.Key()) + v6seed := t.Seeders.Contains(p.Key()) if t.Leechers.Contains(p.Key()) { err = tkr.leecherFinished(t, p) diff --git a/tracker/models/models.go b/tracker/models/models.go index 577dab3..f74e17b 100644 --- a/tracker/models/models.go +++ b/tracker/models/models.go @@ -8,6 +8,7 @@ package models import ( "net" + "strings" "time" "github.com/chihaya/chihaya/config" @@ -45,12 +46,16 @@ func (e NotFoundError) Error() string { return string(e) } type PeerList []Peer type PeerKey string -func NewPeerKey(peerID string, ipv6 bool) PeerKey { - if ipv6 { - return PeerKey("6:" + peerID) - } +func NewPeerKey(peerID string, ip net.IP) PeerKey { + return PeerKey(peerID + "//" + ip.String()) +} - return PeerKey("4:" + peerID) +func (pk PeerKey) IP() net.IP { + return net.ParseIP(strings.Split(string(pk), "//")[1]) +} + +func (pk PeerKey) PeerID() string { + return strings.Split(string(pk), "//")[0] } // Peer is a participant in a swarm. @@ -79,7 +84,7 @@ func (p *Peer) HasIPv6() bool { } func (p *Peer) Key() PeerKey { - return NewPeerKey(p.ID, p.HasIPv6()) + return NewPeerKey(p.ID, p.IP) } // Torrent is a swarm for a given torrent file. diff --git a/tracker/models/peermap.go b/tracker/models/peermap.go index d06e1aa..26e476b 100644 --- a/tracker/models/peermap.go +++ b/tracker/models/peermap.go @@ -5,36 +5,70 @@ package models import ( - "encoding/json" "net" "sync" + "github.com/chihaya/chihaya/config" "github.com/chihaya/chihaya/stats" ) -// PeerMap is a thread-safe map from PeerKeys to Peers. +// PeerMap is a thread-safe map from PeerKeys to Peers. When PreferredSubnet is +// enabled, it is a thread-safe map of maps from MaskedIPs to Peerkeys to Peers. type PeerMap struct { - seeders bool - peers map[PeerKey]Peer + Peers map[string]map[PeerKey]Peer `json:"peers"` + Seeders bool `json:"seeders"` + Config config.SubnetConfig `json:"config"` sync.RWMutex } // NewPeerMap initializes the map for a new PeerMap. -func NewPeerMap(seeders bool) PeerMap { - return PeerMap{ - peers: make(map[PeerKey]Peer), - seeders: seeders, +func NewPeerMap(seeders bool, cfg *config.Config) PeerMap { + pm := PeerMap{ + Peers: make(map[string]map[PeerKey]Peer), + Seeders: seeders, + Config: cfg.NetConfig.SubnetConfig, } + + if !pm.Config.PreferredSubnet { + pm.Peers[""] = make(map[PeerKey]Peer) + } + + return pm } // Contains is true if a peer is contained with a PeerMap. -func (pm *PeerMap) Contains(pk PeerKey) (exists bool) { +func (pm *PeerMap) Contains(pk PeerKey) bool { pm.RLock() defer pm.RUnlock() - _, exists = pm.peers[pk] + if pm.Config.PreferredSubnet { + maskedIP := pm.mask(pk.IP()) + peers, exists := pm.Peers[maskedIP] + if !exists { + return false + } - return + _, exists = peers[pk] + return exists + } + + _, exists := pm.Peers[""][pk] + return exists +} + +func (pm *PeerMap) mask(ip net.IP) string { + if !pm.Config.PreferredSubnet { + return "" + } + + var maskedIP net.IP + if len(ip) == net.IPv6len { + maskedIP = ip.Mask(net.CIDRMask(pm.Config.PreferredIPv6Subnet, 128)) + } else { + maskedIP = ip.Mask(net.CIDRMask(pm.Config.PreferredIPv4Subnet, 32)) + } + + return maskedIP.String() } // LookUp is a thread-safe read from a PeerMap. @@ -42,7 +76,12 @@ func (pm *PeerMap) LookUp(pk PeerKey) (peer Peer, exists bool) { pm.RLock() defer pm.RUnlock() - peer, exists = pm.peers[pk] + maskedIP := pm.mask(pk.IP()) + peers, exists := pm.Peers[maskedIP] + if !exists { + return Peer{}, false + } + peer, exists = peers[pk] return } @@ -52,7 +91,12 @@ func (pm *PeerMap) Put(p Peer) { pm.Lock() defer pm.Unlock() - pm.peers[p.Key()] = p + maskedIP := pm.mask(p.IP) + _, exists := pm.Peers[maskedIP] + if !exists { + pm.Peers[maskedIP] = make(map[PeerKey]Peer) + } + pm.Peers[maskedIP][p.Key()] = p } // Delete is a thread-safe delete from a PeerMap. @@ -60,7 +104,8 @@ func (pm *PeerMap) Delete(pk PeerKey) { pm.Lock() defer pm.Unlock() - delete(pm.peers, pk) + maskedIP := pm.mask(pk.IP()) + delete(pm.Peers[maskedIP], pk) } // Len returns the number of peers within a PeerMap. @@ -68,27 +113,11 @@ func (pm *PeerMap) Len() int { pm.RLock() defer pm.RUnlock() - return len(pm.peers) -} - -func (pm *PeerMap) MarshalJSON() ([]byte, error) { - pm.RLock() - defer pm.RUnlock() - return json.Marshal(pm.peers) -} - -func (pm *PeerMap) UnmarshalJSON(b []byte) error { - pm.Lock() - defer pm.Unlock() - - peers := make(map[PeerKey]Peer) - err := json.Unmarshal(b, &peers) - if err != nil { - return err + var count int + for _, subnetmap := range pm.Peers { + count += len(subnetmap) } - - pm.peers = peers - return nil + return count } // Purge iterates over all of the peers within a PeerMap and deletes them if @@ -97,79 +126,56 @@ func (pm *PeerMap) Purge(unixtime int64) { pm.Lock() defer pm.Unlock() - for key, peer := range pm.peers { - if peer.LastAnnounce <= unixtime { - delete(pm.peers, key) - if pm.seeders { - stats.RecordPeerEvent(stats.ReapedSeed, peer.HasIPv6()) - } else { - stats.RecordPeerEvent(stats.ReapedLeech, peer.HasIPv6()) + for _, subnetmap := range pm.Peers { + for key, peer := range subnetmap { + if peer.LastAnnounce <= unixtime { + delete(subnetmap, key) + if pm.Seeders { + stats.RecordPeerEvent(stats.ReapedSeed, peer.HasIPv6()) + } else { + stats.RecordPeerEvent(stats.ReapedLeech, peer.HasIPv6()) + } } } } } -// AppendPeers adds peers to given IPv4 or IPv6 lists. If a preferred Subnet is -// configured, this function calls AppendSubnetPeers. +// AppendPeers adds peers to given IPv4 or IPv6 lists. func (pm *PeerMap) AppendPeers(ipv4s, ipv6s PeerList, ann *Announce, wanted int) (PeerList, PeerList) { - if ann.Config.PreferredSubnet { - return pm.AppendSubnetPeers(ipv4s, ipv6s, ann, wanted) - } + maskedIP := pm.mask(ann.Peer.IP) pm.RLock() defer pm.RUnlock() count := 0 - for _, peer := range pm.peers { + // Attempt to append all the peers in the same subnet. + for _, peer := range pm.Peers[maskedIP] { if count >= wanted { break } else if peersEquivalent(&peer, ann.Peer) { continue - } - appendPeer(&ipv4s, &ipv6s, ann, &peer, &count) - } - - return ipv4s, ipv6s -} - -// AppendSubnetPeers is an alternative version of AppendPeers used when the -// config variable PreferredSubnet is enabled. -func (pm *PeerMap) AppendSubnetPeers(ipv4s, ipv6s PeerList, ann *Announce, wanted int) (PeerList, PeerList) { - var subnetIPv4 net.IPNet - var subnetIPv6 net.IPNet - - if ann.HasIPv4() { - subnetIPv4 = net.IPNet{ann.IPv4, net.CIDRMask(ann.Config.PreferredIPv4Subnet, 32)} - } - - if ann.HasIPv6() { - subnetIPv6 = net.IPNet{ann.IPv6, net.CIDRMask(ann.Config.PreferredIPv6Subnet, 128)} - } - - pm.RLock() - defer pm.RUnlock() - - // Iterate over the peers twice: first add only peers in the same subnet and - // if we still need more peers grab ones that haven't already been added. - count := 0 - for _, checkInSubnet := range [2]bool{true, false} { - for _, peer := range pm.peers { - if count >= wanted { - break - } - - inSubnet4 := peer.HasIPv4() && subnetIPv4.Contains(peer.IP) - inSubnet6 := peer.HasIPv6() && subnetIPv6.Contains(peer.IP) - - if peersEquivalent(&peer, ann.Peer) || checkInSubnet != (inSubnet4 || inSubnet6) { - continue - } - - // Add the peers optionally respecting AF + } else { appendPeer(&ipv4s, &ipv6s, ann, &peer, &count) } } + // Add any more peers out of the other subnets. + for subnet, peers := range pm.Peers { + if subnet == maskedIP { + continue + } else { + for _, peer := range peers { + if count >= wanted { + break + } else if peersEquivalent(&peer, ann.Peer) { + continue + } else { + appendPeer(&ipv4s, &ipv6s, ann, &peer, &count) + } + } + } + } + return ipv4s, ipv6s } diff --git a/tracker/scrape.go b/tracker/scrape.go index 4fee8c2..186f7cb 100644 --- a/tracker/scrape.go +++ b/tracker/scrape.go @@ -9,7 +9,7 @@ import "github.com/chihaya/chihaya/tracker/models" // HandleScrape encapsulates all the logic of handling a BitTorrent client's // scrape without being coupled to any transport protocol. func (tkr *Tracker) HandleScrape(scrape *models.Scrape, w Writer) (err error) { - if tkr.cfg.PrivateEnabled { + if tkr.Config.PrivateEnabled { if _, err = tkr.FindUser(scrape.Passkey); err != nil { return err } diff --git a/tracker/tracker.go b/tracker/tracker.go index 4d2ae71..6f11c65 100644 --- a/tracker/tracker.go +++ b/tracker/tracker.go @@ -19,9 +19,8 @@ import ( // Tracker represents the logic necessary to service BitTorrent announces, // independently of the underlying data transports used. type Tracker struct { - cfg *config.Config - backend backend.Conn - + Config *config.Config + Backend backend.Conn *Storage } @@ -34,8 +33,8 @@ func New(cfg *config.Config) (*Tracker, error) { } tkr := &Tracker{ - cfg: cfg, - backend: bc, + Config: cfg, + Backend: bc, Storage: NewStorage(), } @@ -54,7 +53,7 @@ func New(cfg *config.Config) (*Tracker, error) { // Close gracefully shutdowns a Tracker by closing any database connections. func (tkr *Tracker) Close() error { - return tkr.backend.Close() + return tkr.Backend.Close() } // LoadApprovedClients loads a list of client IDs into the tracker's storage.