From da19ed3e21a3cc9fc181d92d263b1ca295ddb357 Mon Sep 17 00:00:00 2001 From: Justin Li Date: Thu, 17 Jul 2014 00:09:56 -0400 Subject: [PATCH] Separate tracker logic from the http package, step 1 --- chihaya.go | 10 +- drivers/backend/backend.go | 4 +- drivers/backend/noop/driver.go | 2 +- http/announce.go | 312 ------------------ http/announce_private_test.go | 12 +- http/api.go | 74 +++-- http/http.go | 88 ++--- http/http_test.go | 4 +- http/scrape.go | 83 ----- http/writer.go | 108 ++++++ tracker/announce.go | 236 +++++++++++++ drivers/tracker/tracker.go => tracker/conn.go | 6 +- {drivers/tracker => tracker}/memory/conn.go | 4 +- {drivers/tracker => tracker}/memory/driver.go | 4 +- {drivers/tracker => tracker}/memory/pool.go | 4 +- {models => tracker/models}/models.go | 6 +- {models => tracker/models}/models_test.go | 0 tracker/responses.go | 31 ++ {drivers/tracker => tracker}/routines.go | 0 tracker/scrape.go | 40 +++ tracker/tracker.go | 43 +++ 21 files changed, 568 insertions(+), 503 deletions(-) delete mode 100644 http/announce.go delete mode 100644 http/scrape.go create mode 100644 http/writer.go create mode 100644 tracker/announce.go rename drivers/tracker/tracker.go => tracker/conn.go (94%) rename {drivers/tracker => tracker}/memory/conn.go (98%) rename {drivers/tracker => tracker}/memory/driver.go (88%) rename {drivers/tracker => tracker}/memory/pool.go (86%) rename {models => tracker/models}/models.go (97%) rename {models => tracker/models}/models_test.go (100%) create mode 100644 tracker/responses.go rename {drivers/tracker => tracker}/routines.go (100%) create mode 100644 tracker/scrape.go create mode 100644 tracker/tracker.go diff --git a/chihaya.go b/chihaya.go index 149bfdb..7bb5a34 100644 --- a/chihaya.go +++ b/chihaya.go @@ -14,10 +14,11 @@ import ( "github.com/chihaya/chihaya/config" "github.com/chihaya/chihaya/http" + "github.com/chihaya/chihaya/tracker" // See the README for how to import custom drivers. _ "github.com/chihaya/chihaya/drivers/backend/noop" - _ "github.com/chihaya/chihaya/drivers/tracker/memory" + _ "github.com/chihaya/chihaya/tracker/memory" ) var ( @@ -67,6 +68,11 @@ func Boot() { glog.V(1).Infof("Loaded config file: %s", configPath) } - http.Serve(cfg) + tkr, err := tracker.New(cfg) + if err != nil { + glog.Fatal("New: ", err) + } + + http.Serve(cfg, tkr) glog.Info("Gracefully shut down") } diff --git a/drivers/backend/backend.go b/drivers/backend/backend.go index 054102d..a883e28 100644 --- a/drivers/backend/backend.go +++ b/drivers/backend/backend.go @@ -11,7 +11,7 @@ import ( "fmt" "github.com/chihaya/chihaya/config" - "github.com/chihaya/chihaya/models" + "github.com/chihaya/chihaya/tracker/models" ) var drivers = make(map[string]Driver) @@ -35,7 +35,7 @@ func Register(name string, driver Driver) { drivers[name] = driver } -// Open creates a connection specified by a models configuration. +// Open creates a connection specified by a configuration. func Open(cfg *config.DriverConfig) (Conn, error) { driver, ok := drivers[cfg.Name] if !ok { diff --git a/drivers/backend/noop/driver.go b/drivers/backend/noop/driver.go index edb7e56..4c072ae 100644 --- a/drivers/backend/noop/driver.go +++ b/drivers/backend/noop/driver.go @@ -9,7 +9,7 @@ package noop import ( "github.com/chihaya/chihaya/config" "github.com/chihaya/chihaya/drivers/backend" - "github.com/chihaya/chihaya/models" + "github.com/chihaya/chihaya/tracker/models" ) type driver struct{} diff --git a/http/announce.go b/http/announce.go deleted file mode 100644 index 03d593a..0000000 --- a/http/announce.go +++ /dev/null @@ -1,312 +0,0 @@ -// Copyright 2014 The Chihaya Authors. All rights reserved. -// Use of this source code is governed by the BSD 2-Clause license, -// which can be found in the LICENSE file. - -package http - -import ( - "bytes" - "net/http" - - "github.com/julienschmidt/httprouter" - - "github.com/chihaya/bencode" - "github.com/chihaya/chihaya/drivers/tracker" - "github.com/chihaya/chihaya/models" -) - -func (t *Tracker) ServeAnnounce(w http.ResponseWriter, r *http.Request, p httprouter.Params) (int, error) { - ann, err := models.NewAnnounce(t.cfg, r, p) - if err == models.ErrMalformedRequest { - fail(w, r, err) - return http.StatusOK, nil - } else if err != nil { - return http.StatusInternalServerError, err - } - - conn, err := t.pool.Get() - if err != nil { - return http.StatusInternalServerError, err - } - - if t.cfg.Whitelist { - err = conn.FindClient(ann.ClientID()) - if err == tracker.ErrClientUnapproved { - fail(w, r, err) - return http.StatusOK, nil - } else if err != nil { - return http.StatusInternalServerError, err - } - } - - var user *models.User - if t.cfg.Private { - user, err = conn.FindUser(ann.Passkey) - if err == tracker.ErrUserDNE { - fail(w, r, err) - return http.StatusOK, nil - } else if err != nil { - return http.StatusInternalServerError, err - } - } - - var torrent *models.Torrent - torrent, err = conn.FindTorrent(ann.Infohash) - switch { - case !t.cfg.Private && err == tracker.ErrTorrentDNE: - torrent = &models.Torrent{ - Infohash: ann.Infohash, - Seeders: make(map[string]models.Peer), - Leechers: make(map[string]models.Peer), - } - - err = conn.PutTorrent(torrent) - if err != nil { - return http.StatusInternalServerError, err - } - - case t.cfg.Private && err == tracker.ErrTorrentDNE: - fail(w, r, err) - return http.StatusOK, nil - - case err != nil: - return http.StatusInternalServerError, err - } - - peer := models.NewPeer(ann, user, torrent) - - created, err := updateTorrent(conn, ann, peer, torrent) - if err != nil { - return http.StatusInternalServerError, err - } - - snatched, err := handleEvent(conn, ann, peer, user, torrent) - if err != nil { - return http.StatusInternalServerError, err - } - - if t.cfg.Private { - delta := models.NewAnnounceDelta(ann, peer, user, torrent, created, snatched) - err = t.backend.RecordAnnounce(delta) - if err != nil { - return http.StatusInternalServerError, err - } - } else if t.cfg.PurgeInactiveTorrents && torrent.PeerCount() == 0 { - // Rather than deleting the torrent explicitly, let the tracker driver - // ensure there are no race conditions. - conn.PurgeInactiveTorrent(torrent.Infohash) - } - - resp := newAnnounceResponse(ann, user, torrent) - bencoder := bencode.NewEncoder(w) - err = bencoder.Encode(resp) - if err != nil { - return http.StatusInternalServerError, err - } - - return http.StatusOK, nil -} - -func updateTorrent(c tracker.Conn, a *models.Announce, p *models.Peer, t *models.Torrent) (created bool, err error) { - c.TouchTorrent(t.Infohash) - - switch { - case t.InSeederPool(p): - err = c.PutSeeder(t.Infohash, p) - if err != nil { - return - } - t.Seeders[p.Key()] = *p - - case t.InLeecherPool(p): - err = c.PutLeecher(t.Infohash, p) - if err != nil { - return - } - t.Leechers[p.Key()] = *p - - default: - if a.Left == 0 { - err = c.PutSeeder(t.Infohash, p) - if err != nil { - return - } - t.Seeders[p.Key()] = *p - } else { - err = c.PutLeecher(t.Infohash, p) - if err != nil { - return - } - t.Leechers[p.Key()] = *p - } - created = true - } - - return -} - -func handleEvent(c tracker.Conn, a *models.Announce, p *models.Peer, u *models.User, t *models.Torrent) (snatched bool, err error) { - switch { - case a.Event == "stopped" || a.Event == "paused": - if t.InSeederPool(p) { - err = c.DeleteSeeder(t.Infohash, p.Key()) - if err != nil { - return - } - delete(t.Seeders, p.Key()) - } else if t.InLeecherPool(p) { - err = c.DeleteLeecher(t.Infohash, p.Key()) - if err != nil { - return - } - delete(t.Leechers, p.Key()) - } - - case a.Event == "completed": - err = c.IncrementSnatches(t.Infohash) - if err != nil { - return - } - snatched = true - t.Snatches++ - - if t.InLeecherPool(p) { - err = tracker.LeecherFinished(c, t.Infohash, p) - if err != nil { - return - } - } - - case t.InLeecherPool(p) && a.Left == 0: - // A leecher completed but the event was never received. - err = tracker.LeecherFinished(c, t.Infohash, p) - if err != nil { - return - } - } - - return -} - -func newAnnounceResponse(a *models.Announce, u *models.User, t *models.Torrent) bencode.Dict { - seedCount := len(t.Seeders) - leechCount := len(t.Leechers) - - var peerCount int - if a.Left == 0 { - peerCount = minInt(a.NumWant, leechCount) - } else { - peerCount = minInt(a.NumWant, leechCount+seedCount-1) - } - - resp := bencode.NewDict() - resp["complete"] = seedCount - resp["incomplete"] = leechCount - resp["interval"] = a.Config.Announce.Duration - resp["min interval"] = a.Config.MinAnnounce.Duration - - if a.NumWant > 0 && a.Event != "stopped" && a.Event != "paused" { - ipv4s, ipv6s := getPeers(a, u, t, peerCount) - if a.Compact { - resp["peers"] = compactPeers("ipv4", ipv4s) - resp["peers6"] = compactPeers("ipv6", ipv6s) - } else { - resp["peers"] = peersList(ipv4s, ipv6s) - } - } - - return resp -} - -func compactPeers(ipv string, peers []models.Peer) []byte { - var compactPeers bytes.Buffer - - switch ipv { - case "ipv4": - for _, peer := range peers { - if ip := peer.IP.To4(); ip != nil { - compactPeers.Write(ip) - compactPeers.Write([]byte{byte(peer.Port >> 8), byte(peer.Port & 0xff)}) - } - } - - case "ipv6": - for _, peer := range peers { - if ip := peer.IP.To16(); ip != nil { - compactPeers.Write(ip) - compactPeers.Write([]byte{byte(peer.Port >> 8), byte(peer.Port & 0xff)}) - } - } - } - - return compactPeers.Bytes() -} - -func getPeers(a *models.Announce, u *models.User, t *models.Torrent, peerCount int) (ipv4s, ipv6s []models.Peer) { - if a.Left == 0 { - // If they're seeding, give them only leechers. - splitPeers(&ipv4s, &ipv6s, a, u, t.Leechers, peerCount) - } else { - // If they're leeching, prioritize giving them seeders. - count := splitPeers(&ipv4s, &ipv6s, a, u, t.Seeders, peerCount) - splitPeers(&ipv4s, &ipv6s, a, u, t.Leechers, peerCount-count) - } - - return -} - -func splitPeers(ipv4s, ipv6s *[]models.Peer, a *models.Announce, u *models.User, peers map[string]models.Peer, peerCount int) (count int) { - for _, peer := range peers { - if count >= peerCount { - break - } - - if a.Config.Private && peer.UserID == u.ID { - continue - } - - if ip := peer.IP.To4(); len(ip) == 4 { - *ipv4s = append(*ipv4s, peer) - } else if ip := peer.IP.To16(); len(ip) == 16 { - *ipv6s = append(*ipv6s, peer) - } - - count++ - } - - return -} - -func peersList(ipv4s, ipv6s []models.Peer) []bencode.Dict { - var peers []bencode.Dict - - for _, peer := range ipv4s { - pd := peerDict(&peer) - peers = append(peers, pd) - } - - for _, peer := range ipv6s { - pd := peerDict(&peer) - peers = append(peers, pd) - } - - return peers -} - -func peerDict(peer *models.Peer) bencode.Dict { - pd := bencode.NewDict() - - pd["ip"] = peer.IP.String() - pd["peer id"] = peer.ID - pd["port"] = peer.Port - - return pd -} - -func minInt(a, b int) int { - if a < b { - return a - } - - return b -} diff --git a/http/announce_private_test.go b/http/announce_private_test.go index 713bf98..2cc2f05 100644 --- a/http/announce_private_test.go +++ b/http/announce_private_test.go @@ -9,7 +9,7 @@ import ( "github.com/chihaya/bencode" "github.com/chihaya/chihaya/config" - "github.com/chihaya/chihaya/models" + "github.com/chihaya/chihaya/tracker" ) func TestPrivateAnnounce(t *testing.T) { @@ -64,7 +64,7 @@ func TestPrivateAnnounce(t *testing.T) { } func loadTestData(tkr *Tracker) error { - conn, err := tkr.pool.Get() + conn, err := tkr.Pool.Get() if err != nil { return err } @@ -76,7 +76,7 @@ func loadTestData(tkr *Tracker) error { } for i, passkey := range users { - err = conn.PutUser(&models.User{ + err = conn.PutUser(&tracker.User{ ID: uint64(i + 1), Passkey: passkey, }) @@ -91,11 +91,11 @@ func loadTestData(tkr *Tracker) error { return err } - torrent := &models.Torrent{ + torrent := &tracker.Torrent{ ID: 1, Infohash: infoHash, - Seeders: make(map[string]models.Peer), - Leechers: make(map[string]models.Peer), + Seeders: make(map[string]tracker.Peer), + Leechers: make(map[string]tracker.Peer), } return conn.PutTorrent(torrent) diff --git a/http/api.go b/http/api.go index 331e173..84f73e5 100644 --- a/http/api.go +++ b/http/api.go @@ -12,13 +12,13 @@ import ( "github.com/julienschmidt/httprouter" - "github.com/chihaya/chihaya/drivers/tracker" - "github.com/chihaya/chihaya/models" + "github.com/chihaya/chihaya/tracker" + "github.com/chihaya/chihaya/tracker/models" ) const jsonContentType = "application/json; charset=UTF-8" -func (t *Tracker) check(w http.ResponseWriter, r *http.Request, p httprouter.Params) (int, error) { +func (s *Server) check(w http.ResponseWriter, r *http.Request, p httprouter.Params) (int, error) { _, err := w.Write([]byte("An easter egg goes here.")) if err != nil { return http.StatusInternalServerError, err @@ -27,8 +27,44 @@ func (t *Tracker) check(w http.ResponseWriter, r *http.Request, p httprouter.Par return http.StatusOK, nil } -func (t *Tracker) getTorrent(w http.ResponseWriter, r *http.Request, p httprouter.Params) (int, error) { - conn, err := t.pool.Get() +func (s *Server) serveAnnounce(w http.ResponseWriter, r *http.Request, p httprouter.Params) (int, error) { + ann, err := models.NewAnnounce(s.config, r, p) + writer := &Writer{w} + + if err == models.ErrMalformedRequest { + writer.WriteError(err) + return http.StatusOK, nil + } else if err != nil { + return http.StatusInternalServerError, err + } + + if err = s.tracker.HandleAnnounce(ann, writer); err != nil { + return http.StatusInternalServerError, err + } + + return http.StatusOK, nil +} + +func (s *Server) serveScrape(w http.ResponseWriter, r *http.Request, p httprouter.Params) (int, error) { + scrape, err := models.NewScrape(s.config, r, p) + writer := &Writer{w} + + if err == models.ErrMalformedRequest { + writer.WriteError(err) + return http.StatusOK, nil + } else if err != nil { + return http.StatusInternalServerError, err + } + + if err = s.tracker.HandleScrape(scrape, writer); err != nil { + return http.StatusInternalServerError, err + } + + return http.StatusOK, nil +} + +func (s *Server) getTorrent(w http.ResponseWriter, r *http.Request, p httprouter.Params) (int, error) { + conn, err := s.tracker.Pool.Get() if err != nil { return http.StatusInternalServerError, err } @@ -55,7 +91,7 @@ func (t *Tracker) getTorrent(w http.ResponseWriter, r *http.Request, p httproute return http.StatusOK, nil } -func (t *Tracker) putTorrent(w http.ResponseWriter, r *http.Request, p httprouter.Params) (int, error) { +func (s *Server) putTorrent(w http.ResponseWriter, r *http.Request, p httprouter.Params) (int, error) { body, err := ioutil.ReadAll(r.Body) if err != nil { return http.StatusInternalServerError, err @@ -67,7 +103,7 @@ func (t *Tracker) putTorrent(w http.ResponseWriter, r *http.Request, p httproute return http.StatusBadRequest, err } - conn, err := t.pool.Get() + conn, err := s.tracker.Pool.Get() if err != nil { return http.StatusInternalServerError, err } @@ -80,8 +116,8 @@ func (t *Tracker) putTorrent(w http.ResponseWriter, r *http.Request, p httproute return http.StatusOK, nil } -func (t *Tracker) delTorrent(w http.ResponseWriter, r *http.Request, p httprouter.Params) (int, error) { - conn, err := t.pool.Get() +func (s *Server) delTorrent(w http.ResponseWriter, r *http.Request, p httprouter.Params) (int, error) { + conn, err := s.tracker.Pool.Get() if err != nil { return http.StatusInternalServerError, err } @@ -101,8 +137,8 @@ func (t *Tracker) delTorrent(w http.ResponseWriter, r *http.Request, p httproute return http.StatusOK, nil } -func (t *Tracker) getUser(w http.ResponseWriter, r *http.Request, p httprouter.Params) (int, error) { - conn, err := t.pool.Get() +func (s *Server) getUser(w http.ResponseWriter, r *http.Request, p httprouter.Params) (int, error) { + conn, err := s.tracker.Pool.Get() if err != nil { return http.StatusInternalServerError, err } @@ -124,7 +160,7 @@ func (t *Tracker) getUser(w http.ResponseWriter, r *http.Request, p httprouter.P return http.StatusOK, nil } -func (t *Tracker) putUser(w http.ResponseWriter, r *http.Request, p httprouter.Params) (int, error) { +func (s *Server) putUser(w http.ResponseWriter, r *http.Request, p httprouter.Params) (int, error) { body, err := ioutil.ReadAll(r.Body) if err != nil { return http.StatusInternalServerError, err @@ -136,7 +172,7 @@ func (t *Tracker) putUser(w http.ResponseWriter, r *http.Request, p httprouter.P return http.StatusBadRequest, err } - conn, err := t.pool.Get() + conn, err := s.tracker.Pool.Get() if err != nil { return http.StatusInternalServerError, err } @@ -149,8 +185,8 @@ func (t *Tracker) putUser(w http.ResponseWriter, r *http.Request, p httprouter.P return http.StatusOK, nil } -func (t *Tracker) delUser(w http.ResponseWriter, r *http.Request, p httprouter.Params) (int, error) { - conn, err := t.pool.Get() +func (s *Server) delUser(w http.ResponseWriter, r *http.Request, p httprouter.Params) (int, error) { + conn, err := s.tracker.Pool.Get() if err != nil { return http.StatusInternalServerError, err } @@ -165,8 +201,8 @@ func (t *Tracker) delUser(w http.ResponseWriter, r *http.Request, p httprouter.P return http.StatusOK, nil } -func (t *Tracker) putClient(w http.ResponseWriter, r *http.Request, p httprouter.Params) (int, error) { - conn, err := t.pool.Get() +func (s *Server) putClient(w http.ResponseWriter, r *http.Request, p httprouter.Params) (int, error) { + conn, err := s.tracker.Pool.Get() if err != nil { return http.StatusInternalServerError, err } @@ -179,8 +215,8 @@ func (t *Tracker) putClient(w http.ResponseWriter, r *http.Request, p httprouter return http.StatusOK, nil } -func (t *Tracker) delClient(w http.ResponseWriter, r *http.Request, p httprouter.Params) (int, error) { - conn, err := t.pool.Get() +func (s *Server) delClient(w http.ResponseWriter, r *http.Request, p httprouter.Params) (int, error) { + conn, err := s.tracker.Pool.Get() if err != nil { return http.StatusInternalServerError, err } diff --git a/http/http.go b/http/http.go index aaaf286..9637440 100644 --- a/http/http.go +++ b/http/http.go @@ -13,45 +13,17 @@ import ( "github.com/julienschmidt/httprouter" "github.com/stretchr/graceful" - "github.com/chihaya/bencode" "github.com/chihaya/chihaya/config" - "github.com/chihaya/chihaya/drivers/backend" - "github.com/chihaya/chihaya/drivers/tracker" + "github.com/chihaya/chihaya/tracker" ) -type Tracker struct { - cfg *config.Config - pool tracker.Pool - backend backend.Conn -} - -func NewTracker(cfg *config.Config) (*Tracker, error) { - tp, err := tracker.Open(&cfg.Tracker) - if err != nil { - return nil, err - } - - bc, err := backend.Open(&cfg.Backend) - if err != nil { - return nil, err - } - - go tracker.PurgeInactivePeers( - tp, - cfg.PurgeInactiveTorrents, - cfg.Announce.Duration*2, - cfg.Announce.Duration, - ) - - return &Tracker{ - cfg: cfg, - pool: tp, - backend: bc, - }, nil -} - type ResponseHandler func(http.ResponseWriter, *http.Request, httprouter.Params) (int, error) +type Server struct { + config *config.Config + tracker *tracker.Tracker +} + func makeHandler(handler ResponseHandler) httprouter.Handle { return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { start := time.Now() @@ -73,47 +45,39 @@ func makeHandler(handler ResponseHandler) httprouter.Handle { } } -func NewRouter(t *Tracker, cfg *config.Config) *httprouter.Router { +func NewRouter(s *Server) *httprouter.Router { r := httprouter.New() - if cfg.Private { - r.GET("/users/:passkey/announce", makeHandler(t.ServeAnnounce)) - r.GET("/users/:passkey/scrape", makeHandler(t.ServeScrape)) + if s.config.Private { + r.GET("/users/:passkey/announce", makeHandler(s.serveAnnounce)) + r.GET("/users/:passkey/scrape", makeHandler(s.serveScrape)) - r.PUT("/users/:passkey", makeHandler(t.putUser)) - r.DELETE("/users/:passkey", makeHandler(t.delUser)) + r.PUT("/users/:passkey", makeHandler(s.putUser)) + r.DELETE("/users/:passkey", makeHandler(s.delUser)) } else { - r.GET("/announce", makeHandler(t.ServeAnnounce)) - r.GET("/scrape", makeHandler(t.ServeScrape)) + r.GET("/announce", makeHandler(s.serveAnnounce)) + r.GET("/scrape", makeHandler(s.serveScrape)) } - if cfg.Whitelist { - r.PUT("/clients/:clientID", makeHandler(t.putClient)) - r.DELETE("/clients/:clientID", makeHandler(t.delClient)) + if s.config.Whitelist { + r.PUT("/clients/:clientID", makeHandler(s.putClient)) + r.DELETE("/clients/:clientID", makeHandler(s.delClient)) } - r.GET("/torrents/:infohash", makeHandler(t.getTorrent)) - r.PUT("/torrents/:infohash", makeHandler(t.putTorrent)) - r.DELETE("/torrents/:infohash", makeHandler(t.delTorrent)) - r.GET("/check", makeHandler(t.check)) + r.GET("/torrents/:infohash", makeHandler(s.getTorrent)) + r.PUT("/torrents/:infohash", makeHandler(s.putTorrent)) + r.DELETE("/torrents/:infohash", makeHandler(s.delTorrent)) + r.GET("/check", makeHandler(s.check)) return r } -func Serve(cfg *config.Config) { - t, err := NewTracker(cfg) - if err != nil { - glog.Fatal("New: ", err) +func Serve(cfg *config.Config, tkr *tracker.Tracker) { + srv := &Server{ + config: cfg, + tracker: tkr, } glog.V(0).Info("Starting on ", cfg.Addr) - graceful.Run(cfg.Addr, cfg.RequestTimeout.Duration, NewRouter(t, cfg)) -} - -func fail(w http.ResponseWriter, r *http.Request, err error) { - dict := bencode.NewDict() - dict["failure reason"] = err.Error() - - bencoder := bencode.NewEncoder(w) - bencoder.Encode(dict) + graceful.Run(cfg.Addr, cfg.RequestTimeout.Duration, NewRouter(srv)) } diff --git a/http/http_test.go b/http/http_test.go index 267da85..78be33b 100644 --- a/http/http_test.go +++ b/http/http_test.go @@ -15,7 +15,7 @@ import ( "github.com/chihaya/chihaya/config" _ "github.com/chihaya/chihaya/drivers/backend/noop" - _ "github.com/chihaya/chihaya/drivers/tracker/memory" + _ "github.com/chihaya/chihaya/tracker/memory" ) type params map[string]string @@ -23,7 +23,7 @@ type params map[string]string var infoHash = string([]byte{0x89, 0xd4, 0xbc, 0x52, 0x11, 0x16, 0xca, 0x1d, 0x42, 0xa2, 0xf3, 0x0d, 0x1f, 0x27, 0x4d, 0x94, 0xe4, 0x68, 0x1d, 0xaf}) func setupTracker(cfg *config.Config) (*httptest.Server, error) { - tkr, err := NewTracker(cfg) + tkr, err := tracker.New(cfg) if err != nil { return nil, err } diff --git a/http/scrape.go b/http/scrape.go deleted file mode 100644 index b1135b2..0000000 --- a/http/scrape.go +++ /dev/null @@ -1,83 +0,0 @@ -// Copyright 2014 The Chihaya Authors. All rights reserved. -// Use of this source code is governed by the BSD 2-Clause license, -// which can be found in the LICENSE file. - -package http - -import ( - "net/http" - - "github.com/julienschmidt/httprouter" - - "github.com/chihaya/bencode" - "github.com/chihaya/chihaya/drivers/tracker" - "github.com/chihaya/chihaya/models" -) - -func (t *Tracker) ServeScrape(w http.ResponseWriter, r *http.Request, p httprouter.Params) (int, error) { - scrape, err := models.NewScrape(t.cfg, r, p) - if err == models.ErrMalformedRequest { - fail(w, r, err) - return http.StatusOK, nil - } else if err != nil { - return http.StatusInternalServerError, err - } - - conn, err := t.pool.Get() - if err != nil { - return http.StatusInternalServerError, err - } - - if t.cfg.Private { - _, err = conn.FindUser(scrape.Passkey) - if err == tracker.ErrUserDNE { - fail(w, r, err) - return http.StatusOK, nil - } else if err != nil { - return http.StatusInternalServerError, err - } - } - - var torrents []*models.Torrent - for _, infohash := range scrape.Infohashes { - torrent, err := conn.FindTorrent(infohash) - if err == tracker.ErrTorrentDNE { - fail(w, r, err) - return http.StatusOK, nil - } else if err != nil { - return http.StatusInternalServerError, err - } - torrents = append(torrents, torrent) - } - - resp := bencode.NewDict() - resp["files"] = filesDict(torrents) - - bencoder := bencode.NewEncoder(w) - err = bencoder.Encode(resp) - if err != nil { - return http.StatusInternalServerError, err - } - - return http.StatusOK, nil -} - -func filesDict(torrents []*models.Torrent) bencode.Dict { - d := bencode.NewDict() - - for _, torrent := range torrents { - d[torrent.Infohash] = torrentDict(torrent) - } - - return d -} - -func torrentDict(torrent *models.Torrent) bencode.Dict { - d := bencode.NewDict() - - d["complete"] = len(torrent.Seeders) - d["incomplete"] = len(torrent.Leechers) - d["downloaded"] = torrent.Snatches - - return d -} diff --git a/http/writer.go b/http/writer.go new file mode 100644 index 0000000..4c8f81d --- /dev/null +++ b/http/writer.go @@ -0,0 +1,108 @@ +// Copyright 2014 The Chihaya Authors. All rights reserved. +// Use of this source code is governed by the BSD 2-Clause license, +// which can be found in the LICENSE file. + +package http + +import ( + "bytes" + "net/http" + + "github.com/chihaya/bencode" + "github.com/chihaya/chihaya/tracker" + "github.com/chihaya/chihaya/tracker/models" +) + +type Writer struct { + http.ResponseWriter +} + +func (w *Writer) WriteError(err error) error { + dict := bencode.NewDict() + dict["failure reason"] = err.Error() + + bencoder := bencode.NewEncoder(w) + return bencoder.Encode(dict) +} + +func (w *Writer) WriteAnnounce(res *tracker.AnnounceResponse) error { + dict := bencode.NewDict() + dict["complete"] = res.Complete + dict["incomplete"] = res.Incomplete + dict["interval"] = res.Interval + dict["min interval"] = res.MinInterval + + if res.Compact { + dict["peers"] = compactPeers(false, res.IPv4Peers) + dict["peers6"] = compactPeers(true, res.IPv6Peers) + } else { + dict["peers"] = peersList(res.IPv6Peers, res.IPv4Peers) + } + + bencoder := bencode.NewEncoder(w) + return bencoder.Encode(dict) +} + +func (w *Writer) WriteScrape(res *tracker.ScrapeResponse) error { + dict := bencode.NewDict() + dict["files"] = filesDict(res.Files) + + bencoder := bencode.NewEncoder(w) + return bencoder.Encode(dict) +} + +func compactPeers(ipv6 bool, peers tracker.PeerList) []byte { + var compactPeers bytes.Buffer + + if ipv6 { + for _, peer := range peers { + if ip := peer.IP.To16(); ip != nil { + compactPeers.Write(ip) + compactPeers.Write([]byte{byte(peer.Port >> 8), byte(peer.Port & 0xff)}) + } + } + } else { + for _, peer := range peers { + if ip := peer.IP.To4(); ip != nil { + compactPeers.Write(ip) + compactPeers.Write([]byte{byte(peer.Port >> 8), byte(peer.Port & 0xff)}) + } + } + } + + return compactPeers.Bytes() +} + +func peersList(ipv4s, ipv6s tracker.PeerList) (peers []bencode.Dict) { + for _, peer := range ipv4s { + peers = append(peers, peerDict(peer)) + } + for _, peer := range ipv6s { + peers = append(peers, peerDict(peer)) + } + return peers +} + +func peerDict(peer *models.Peer) bencode.Dict { + return bencode.Dict{ + "ip": peer.IP.String(), + "peer id": peer.ID, + "port": peer.Port, + } +} + +func filesDict(torrents []*models.Torrent) bencode.Dict { + d := bencode.NewDict() + for _, torrent := range torrents { + d[torrent.Infohash] = torrentDict(torrent) + } + return d +} + +func torrentDict(torrent *models.Torrent) bencode.Dict { + return bencode.Dict{ + "complete": len(torrent.Seeders), + "incomplete": len(torrent.Leechers), + "downloaded": torrent.Snatches, + } +} diff --git a/tracker/announce.go b/tracker/announce.go new file mode 100644 index 0000000..d7ce96c --- /dev/null +++ b/tracker/announce.go @@ -0,0 +1,236 @@ +// Copyright 2014 The Chihaya Authors. All rights reserved. +// Use of this source code is governed by the BSD 2-Clause license, +// which can be found in the LICENSE file. + +package tracker + +import ( + "github.com/chihaya/chihaya/tracker/models" +) + +func (t *Tracker) HandleAnnounce(ann *models.Announce, w Writer) error { + conn, err := t.Pool.Get() + if err != nil { + return err + } + + if t.cfg.Whitelist { + err = conn.FindClient(ann.ClientID()) + if err == ErrClientUnapproved { + w.WriteError(err) + return nil + } else if err != nil { + return err + } + } + + var user *models.User + if t.cfg.Private { + user, err = conn.FindUser(ann.Passkey) + if err == ErrUserDNE { + w.WriteError(err) + return nil + } else if err != nil { + return err + } + } + + var torrent *models.Torrent + torrent, err = conn.FindTorrent(ann.Infohash) + switch { + case !t.cfg.Private && err == ErrTorrentDNE: + torrent = &models.Torrent{ + Infohash: ann.Infohash, + Seeders: make(map[string]models.Peer), + Leechers: make(map[string]models.Peer), + } + + err = conn.PutTorrent(torrent) + if err != nil { + return err + } + + case t.cfg.Private && err == ErrTorrentDNE: + w.WriteError(err) + return nil + + case err != nil: + return err + } + + peer := models.NewPeer(ann, user, torrent) + + created, err := updateTorrent(conn, ann, peer, torrent) + if err != nil { + return err + } + + snatched, err := handleEvent(conn, ann, peer, user, torrent) + if err != nil { + return err + } + + if t.cfg.Private { + delta := models.NewAnnounceDelta(ann, peer, user, torrent, created, snatched) + err = t.backend.RecordAnnounce(delta) + if err != nil { + return err + } + } else if t.cfg.PurgeInactiveTorrents && torrent.PeerCount() == 0 { + // Rather than deleting the torrent explicitly, let the tracker driver + // ensure there are no race conditions. + conn.PurgeInactiveTorrent(torrent.Infohash) + } + + return w.WriteAnnounce(newAnnounceResponse(ann, user, torrent)) +} + +func updateTorrent(c Conn, a *models.Announce, p *models.Peer, t *models.Torrent) (created bool, err error) { + c.TouchTorrent(t.Infohash) + + switch { + case t.InSeederPool(p): + err = c.PutSeeder(t.Infohash, p) + if err != nil { + return + } + t.Seeders[p.Key()] = *p + + case t.InLeecherPool(p): + err = c.PutLeecher(t.Infohash, p) + if err != nil { + return + } + t.Leechers[p.Key()] = *p + + default: + if a.Left == 0 { + err = c.PutSeeder(t.Infohash, p) + if err != nil { + return + } + t.Seeders[p.Key()] = *p + } else { + err = c.PutLeecher(t.Infohash, p) + if err != nil { + return + } + t.Leechers[p.Key()] = *p + } + created = true + } + + return +} + +func handleEvent(c Conn, a *models.Announce, p *models.Peer, u *models.User, t *models.Torrent) (snatched bool, err error) { + switch { + case a.Event == "stopped" || a.Event == "paused": + if t.InSeederPool(p) { + err = c.DeleteSeeder(t.Infohash, p.Key()) + if err != nil { + return + } + delete(t.Seeders, p.Key()) + } else if t.InLeecherPool(p) { + err = c.DeleteLeecher(t.Infohash, p.Key()) + if err != nil { + return + } + delete(t.Leechers, p.Key()) + } + + case a.Event == "completed": + err = c.IncrementSnatches(t.Infohash) + if err != nil { + return + } + snatched = true + t.Snatches++ + + if t.InLeecherPool(p) { + err = LeecherFinished(c, t.Infohash, p) + if err != nil { + return + } + } + + case t.InLeecherPool(p) && a.Left == 0: + // A leecher completed but the event was never received. + err = LeecherFinished(c, t.Infohash, p) + if err != nil { + return + } + } + + return +} + +func newAnnounceResponse(a *models.Announce, u *models.User, t *models.Torrent) *AnnounceResponse { + seedCount := len(t.Seeders) + leechCount := len(t.Leechers) + + var peerCount int + if a.Left == 0 { + peerCount = minInt(a.NumWant, leechCount) + } else { + peerCount = minInt(a.NumWant, leechCount+seedCount-1) + } + + res := &AnnounceResponse{ + Complete: seedCount, + Incomplete: leechCount, + Interval: a.Config.Announce.Duration, + MinInterval: a.Config.MinAnnounce.Duration, + Compact: a.Compact, + } + + if a.NumWant > 0 && a.Event != "stopped" && a.Event != "paused" { + res.IPv4Peers, res.IPv6Peers = getPeers(a, u, t, peerCount) + } + + return res +} + +func getPeers(a *models.Announce, u *models.User, t *models.Torrent, wanted int) (ipv4s, ipv6s PeerList) { + if a.Left == 0 { + // If they're seeding, give them only leechers. + return appendPeers(nil, nil, u, t.Leechers, wanted) + } else { + // If they're leeching, prioritize giving them seeders. + ipv4s, ipv6s = appendPeers(nil, nil, u, t.Seeders, wanted) + return appendPeers(ipv4s, ipv6s, u, t.Leechers, wanted-len(ipv4s)-len(ipv6s)) + } +} + +func appendPeers(ipv4s, ipv6s PeerList, u *models.User, peers map[string]models.Peer, wanted int) (PeerList, PeerList) { + count := 0 + + for _, peer := range peers { + if count >= wanted { + break + } + + if u != nil && peer.UserID == u.ID { + continue + } + + if ip := peer.IP.To4(); len(ip) == 4 { + ipv4s = append(ipv4s, &peer) + } else if ip := peer.IP.To16(); len(ip) == 16 { + ipv6s = append(ipv6s, &peer) + } + + count++ + } + + return ipv4s, ipv6s +} + +func minInt(a, b int) int { + if a < b { + return a + } + + return b +} diff --git a/drivers/tracker/tracker.go b/tracker/conn.go similarity index 94% rename from drivers/tracker/tracker.go rename to tracker/conn.go index 1b15b01..acb8636 100644 --- a/drivers/tracker/tracker.go +++ b/tracker/conn.go @@ -2,8 +2,6 @@ // Use of this source code is governed by the BSD 2-Clause license, // which can be found in the LICENSE file. -// Package tracker provides a generic interface for manipulating a -// BitTorrent tracker's fast-moving data. package tracker import ( @@ -12,7 +10,7 @@ import ( "time" "github.com/chihaya/chihaya/config" - "github.com/chihaya/chihaya/models" + "github.com/chihaya/chihaya/tracker/models" ) var ( @@ -47,7 +45,7 @@ func Register(name string, driver Driver) { drivers[name] = driver } -// Open creates a pool of data store connections specified by a models configuration. +// Open creates a pool of data store connections specified by a configuration. func Open(cfg *config.DriverConfig) (Pool, error) { driver, ok := drivers[cfg.Name] if !ok { diff --git a/drivers/tracker/memory/conn.go b/tracker/memory/conn.go similarity index 98% rename from drivers/tracker/memory/conn.go rename to tracker/memory/conn.go index e589ec0..340ce7f 100644 --- a/drivers/tracker/memory/conn.go +++ b/tracker/memory/conn.go @@ -8,8 +8,8 @@ import ( "runtime" "time" - "github.com/chihaya/chihaya/drivers/tracker" - "github.com/chihaya/chihaya/models" + "github.com/chihaya/chihaya/tracker" + "github.com/chihaya/chihaya/tracker/models" ) // Conn implements a connection to a memory-based tracker data store. diff --git a/drivers/tracker/memory/driver.go b/tracker/memory/driver.go similarity index 88% rename from drivers/tracker/memory/driver.go rename to tracker/memory/driver.go index 2d9c9b8..7e4ab8d 100644 --- a/drivers/tracker/memory/driver.go +++ b/tracker/memory/driver.go @@ -8,8 +8,8 @@ package memory import ( "github.com/chihaya/chihaya/config" - "github.com/chihaya/chihaya/drivers/tracker" - "github.com/chihaya/chihaya/models" + "github.com/chihaya/chihaya/tracker" + "github.com/chihaya/chihaya/tracker/models" ) type driver struct{} diff --git a/drivers/tracker/memory/pool.go b/tracker/memory/pool.go similarity index 86% rename from drivers/tracker/memory/pool.go rename to tracker/memory/pool.go index 7dbb440..1fb1c3a 100644 --- a/drivers/tracker/memory/pool.go +++ b/tracker/memory/pool.go @@ -7,8 +7,8 @@ package memory import ( "sync" - "github.com/chihaya/chihaya/drivers/tracker" - "github.com/chihaya/chihaya/models" + "github.com/chihaya/chihaya/tracker" + "github.com/chihaya/chihaya/tracker/models" ) type Pool struct { diff --git a/models/models.go b/tracker/models/models.go similarity index 97% rename from models/models.go rename to tracker/models/models.go index b01d2a7..c0777e9 100644 --- a/models/models.go +++ b/tracker/models/models.go @@ -2,8 +2,6 @@ // Use of this source code is governed by the BSD 2-Clause license, // which can be found in the LICENSE file. -// Package models implements the models for an abstraction over the -// multiple data stores used by a BitTorrent tracker. package models import ( @@ -19,7 +17,7 @@ import ( ) var ( - // ErrMalformedRequest is returned when an http.Request does no have the + // ErrMalformedRequest is returned when an http.Request does not have the // required parameters to create a model. ErrMalformedRequest = errors.New("malformed request") ) @@ -45,7 +43,7 @@ type Peer struct { // respectively. func NewPeer(a *Announce, u *User, t *Torrent) *Peer { if a == nil { - panic("models: announce cannot equal nil") + panic("tracker: announce cannot equal nil") } var userID uint64 diff --git a/models/models_test.go b/tracker/models/models_test.go similarity index 100% rename from models/models_test.go rename to tracker/models/models_test.go diff --git a/tracker/responses.go b/tracker/responses.go new file mode 100644 index 0000000..9297caa --- /dev/null +++ b/tracker/responses.go @@ -0,0 +1,31 @@ +// Copyright 2014 The Chihaya Authors. All rights reserved. +// Use of this source code is governed by the BSD 2-Clause license, +// which can be found in the LICENSE file. + +package tracker + +import ( + "time" + + "github.com/chihaya/chihaya/tracker/models" +) + +type PeerList []*models.Peer + +type AnnounceResponse struct { + Complete, Incomplete int + Interval, MinInterval time.Duration + IPv4Peers, IPv6Peers PeerList + + Compact bool +} + +type ScrapeResponse struct { + Files []*models.Torrent +} + +type Writer interface { + WriteError(error) error + WriteAnnounce(*AnnounceResponse) error + WriteScrape(*ScrapeResponse) error +} diff --git a/drivers/tracker/routines.go b/tracker/routines.go similarity index 100% rename from drivers/tracker/routines.go rename to tracker/routines.go diff --git a/tracker/scrape.go b/tracker/scrape.go new file mode 100644 index 0000000..d53fe6d --- /dev/null +++ b/tracker/scrape.go @@ -0,0 +1,40 @@ +// Copyright 2014 The Chihaya Authors. All rights reserved. +// Use of this source code is governed by the BSD 2-Clause license, +// which can be found in the LICENSE file. + +package tracker + +import ( + "github.com/chihaya/chihaya/tracker/models" +) + +func (t *Tracker) HandleScrape(scrape *models.Scrape, w Writer) error { + conn, err := t.Pool.Get() + if err != nil { + return err + } + + if t.cfg.Private { + _, err = conn.FindUser(scrape.Passkey) + if err == ErrUserDNE { + w.WriteError(err) + return nil + } else if err != nil { + return err + } + } + + var torrents []*models.Torrent + for _, infohash := range scrape.Infohashes { + torrent, err := conn.FindTorrent(infohash) + if err == ErrTorrentDNE { + w.WriteError(err) + return nil + } else if err != nil { + return err + } + torrents = append(torrents, torrent) + } + + return w.WriteScrape(&ScrapeResponse{torrents}) +} diff --git a/tracker/tracker.go b/tracker/tracker.go new file mode 100644 index 0000000..24aeaad --- /dev/null +++ b/tracker/tracker.go @@ -0,0 +1,43 @@ +// Copyright 2014 The Chihaya Authors. All rights reserved. +// Use of this source code is governed by the BSD 2-Clause license, +// which can be found in the LICENSE file. + +// Package tracker provides a generic interface for manipulating a +// BitTorrent tracker's fast-moving data. +package tracker + +import ( + "github.com/chihaya/chihaya/config" + "github.com/chihaya/chihaya/drivers/backend" +) + +type Tracker struct { + cfg *config.Config + Pool Pool + backend backend.Conn +} + +func New(cfg *config.Config) (*Tracker, error) { + pool, err := Open(&cfg.Tracker) + if err != nil { + return nil, err + } + + bc, err := backend.Open(&cfg.Backend) + if err != nil { + return nil, err + } + + go PurgeInactivePeers( + pool, + cfg.PurgeInactiveTorrents, + cfg.Announce.Duration*2, + cfg.Announce.Duration, + ) + + return &Tracker{ + cfg: cfg, + Pool: pool, + backend: bc, + }, nil +}