Separate tracker logic from the http package, step 1

This commit is contained in:
Justin Li 2014-07-17 00:09:56 -04:00
parent f8047ef8ab
commit da19ed3e21
21 changed files with 568 additions and 503 deletions

View file

@ -14,10 +14,11 @@ import (
"github.com/chihaya/chihaya/config"
"github.com/chihaya/chihaya/http"
"github.com/chihaya/chihaya/tracker"
// See the README for how to import custom drivers.
_ "github.com/chihaya/chihaya/drivers/backend/noop"
_ "github.com/chihaya/chihaya/drivers/tracker/memory"
_ "github.com/chihaya/chihaya/tracker/memory"
)
var (
@ -67,6 +68,11 @@ func Boot() {
glog.V(1).Infof("Loaded config file: %s", configPath)
}
http.Serve(cfg)
tkr, err := tracker.New(cfg)
if err != nil {
glog.Fatal("New: ", err)
}
http.Serve(cfg, tkr)
glog.Info("Gracefully shut down")
}

View file

@ -11,7 +11,7 @@ import (
"fmt"
"github.com/chihaya/chihaya/config"
"github.com/chihaya/chihaya/models"
"github.com/chihaya/chihaya/tracker/models"
)
var drivers = make(map[string]Driver)
@ -35,7 +35,7 @@ func Register(name string, driver Driver) {
drivers[name] = driver
}
// Open creates a connection specified by a models configuration.
// Open creates a connection specified by a configuration.
func Open(cfg *config.DriverConfig) (Conn, error) {
driver, ok := drivers[cfg.Name]
if !ok {

View file

@ -9,7 +9,7 @@ package noop
import (
"github.com/chihaya/chihaya/config"
"github.com/chihaya/chihaya/drivers/backend"
"github.com/chihaya/chihaya/models"
"github.com/chihaya/chihaya/tracker/models"
)
type driver struct{}

View file

@ -1,312 +0,0 @@
// Copyright 2014 The Chihaya Authors. All rights reserved.
// Use of this source code is governed by the BSD 2-Clause license,
// which can be found in the LICENSE file.
package http
import (
"bytes"
"net/http"
"github.com/julienschmidt/httprouter"
"github.com/chihaya/bencode"
"github.com/chihaya/chihaya/drivers/tracker"
"github.com/chihaya/chihaya/models"
)
func (t *Tracker) ServeAnnounce(w http.ResponseWriter, r *http.Request, p httprouter.Params) (int, error) {
ann, err := models.NewAnnounce(t.cfg, r, p)
if err == models.ErrMalformedRequest {
fail(w, r, err)
return http.StatusOK, nil
} else if err != nil {
return http.StatusInternalServerError, err
}
conn, err := t.pool.Get()
if err != nil {
return http.StatusInternalServerError, err
}
if t.cfg.Whitelist {
err = conn.FindClient(ann.ClientID())
if err == tracker.ErrClientUnapproved {
fail(w, r, err)
return http.StatusOK, nil
} else if err != nil {
return http.StatusInternalServerError, err
}
}
var user *models.User
if t.cfg.Private {
user, err = conn.FindUser(ann.Passkey)
if err == tracker.ErrUserDNE {
fail(w, r, err)
return http.StatusOK, nil
} else if err != nil {
return http.StatusInternalServerError, err
}
}
var torrent *models.Torrent
torrent, err = conn.FindTorrent(ann.Infohash)
switch {
case !t.cfg.Private && err == tracker.ErrTorrentDNE:
torrent = &models.Torrent{
Infohash: ann.Infohash,
Seeders: make(map[string]models.Peer),
Leechers: make(map[string]models.Peer),
}
err = conn.PutTorrent(torrent)
if err != nil {
return http.StatusInternalServerError, err
}
case t.cfg.Private && err == tracker.ErrTorrentDNE:
fail(w, r, err)
return http.StatusOK, nil
case err != nil:
return http.StatusInternalServerError, err
}
peer := models.NewPeer(ann, user, torrent)
created, err := updateTorrent(conn, ann, peer, torrent)
if err != nil {
return http.StatusInternalServerError, err
}
snatched, err := handleEvent(conn, ann, peer, user, torrent)
if err != nil {
return http.StatusInternalServerError, err
}
if t.cfg.Private {
delta := models.NewAnnounceDelta(ann, peer, user, torrent, created, snatched)
err = t.backend.RecordAnnounce(delta)
if err != nil {
return http.StatusInternalServerError, err
}
} else if t.cfg.PurgeInactiveTorrents && torrent.PeerCount() == 0 {
// Rather than deleting the torrent explicitly, let the tracker driver
// ensure there are no race conditions.
conn.PurgeInactiveTorrent(torrent.Infohash)
}
resp := newAnnounceResponse(ann, user, torrent)
bencoder := bencode.NewEncoder(w)
err = bencoder.Encode(resp)
if err != nil {
return http.StatusInternalServerError, err
}
return http.StatusOK, nil
}
func updateTorrent(c tracker.Conn, a *models.Announce, p *models.Peer, t *models.Torrent) (created bool, err error) {
c.TouchTorrent(t.Infohash)
switch {
case t.InSeederPool(p):
err = c.PutSeeder(t.Infohash, p)
if err != nil {
return
}
t.Seeders[p.Key()] = *p
case t.InLeecherPool(p):
err = c.PutLeecher(t.Infohash, p)
if err != nil {
return
}
t.Leechers[p.Key()] = *p
default:
if a.Left == 0 {
err = c.PutSeeder(t.Infohash, p)
if err != nil {
return
}
t.Seeders[p.Key()] = *p
} else {
err = c.PutLeecher(t.Infohash, p)
if err != nil {
return
}
t.Leechers[p.Key()] = *p
}
created = true
}
return
}
func handleEvent(c tracker.Conn, a *models.Announce, p *models.Peer, u *models.User, t *models.Torrent) (snatched bool, err error) {
switch {
case a.Event == "stopped" || a.Event == "paused":
if t.InSeederPool(p) {
err = c.DeleteSeeder(t.Infohash, p.Key())
if err != nil {
return
}
delete(t.Seeders, p.Key())
} else if t.InLeecherPool(p) {
err = c.DeleteLeecher(t.Infohash, p.Key())
if err != nil {
return
}
delete(t.Leechers, p.Key())
}
case a.Event == "completed":
err = c.IncrementSnatches(t.Infohash)
if err != nil {
return
}
snatched = true
t.Snatches++
if t.InLeecherPool(p) {
err = tracker.LeecherFinished(c, t.Infohash, p)
if err != nil {
return
}
}
case t.InLeecherPool(p) && a.Left == 0:
// A leecher completed but the event was never received.
err = tracker.LeecherFinished(c, t.Infohash, p)
if err != nil {
return
}
}
return
}
func newAnnounceResponse(a *models.Announce, u *models.User, t *models.Torrent) bencode.Dict {
seedCount := len(t.Seeders)
leechCount := len(t.Leechers)
var peerCount int
if a.Left == 0 {
peerCount = minInt(a.NumWant, leechCount)
} else {
peerCount = minInt(a.NumWant, leechCount+seedCount-1)
}
resp := bencode.NewDict()
resp["complete"] = seedCount
resp["incomplete"] = leechCount
resp["interval"] = a.Config.Announce.Duration
resp["min interval"] = a.Config.MinAnnounce.Duration
if a.NumWant > 0 && a.Event != "stopped" && a.Event != "paused" {
ipv4s, ipv6s := getPeers(a, u, t, peerCount)
if a.Compact {
resp["peers"] = compactPeers("ipv4", ipv4s)
resp["peers6"] = compactPeers("ipv6", ipv6s)
} else {
resp["peers"] = peersList(ipv4s, ipv6s)
}
}
return resp
}
func compactPeers(ipv string, peers []models.Peer) []byte {
var compactPeers bytes.Buffer
switch ipv {
case "ipv4":
for _, peer := range peers {
if ip := peer.IP.To4(); ip != nil {
compactPeers.Write(ip)
compactPeers.Write([]byte{byte(peer.Port >> 8), byte(peer.Port & 0xff)})
}
}
case "ipv6":
for _, peer := range peers {
if ip := peer.IP.To16(); ip != nil {
compactPeers.Write(ip)
compactPeers.Write([]byte{byte(peer.Port >> 8), byte(peer.Port & 0xff)})
}
}
}
return compactPeers.Bytes()
}
func getPeers(a *models.Announce, u *models.User, t *models.Torrent, peerCount int) (ipv4s, ipv6s []models.Peer) {
if a.Left == 0 {
// If they're seeding, give them only leechers.
splitPeers(&ipv4s, &ipv6s, a, u, t.Leechers, peerCount)
} else {
// If they're leeching, prioritize giving them seeders.
count := splitPeers(&ipv4s, &ipv6s, a, u, t.Seeders, peerCount)
splitPeers(&ipv4s, &ipv6s, a, u, t.Leechers, peerCount-count)
}
return
}
func splitPeers(ipv4s, ipv6s *[]models.Peer, a *models.Announce, u *models.User, peers map[string]models.Peer, peerCount int) (count int) {
for _, peer := range peers {
if count >= peerCount {
break
}
if a.Config.Private && peer.UserID == u.ID {
continue
}
if ip := peer.IP.To4(); len(ip) == 4 {
*ipv4s = append(*ipv4s, peer)
} else if ip := peer.IP.To16(); len(ip) == 16 {
*ipv6s = append(*ipv6s, peer)
}
count++
}
return
}
func peersList(ipv4s, ipv6s []models.Peer) []bencode.Dict {
var peers []bencode.Dict
for _, peer := range ipv4s {
pd := peerDict(&peer)
peers = append(peers, pd)
}
for _, peer := range ipv6s {
pd := peerDict(&peer)
peers = append(peers, pd)
}
return peers
}
func peerDict(peer *models.Peer) bencode.Dict {
pd := bencode.NewDict()
pd["ip"] = peer.IP.String()
pd["peer id"] = peer.ID
pd["port"] = peer.Port
return pd
}
func minInt(a, b int) int {
if a < b {
return a
}
return b
}

View file

@ -9,7 +9,7 @@ import (
"github.com/chihaya/bencode"
"github.com/chihaya/chihaya/config"
"github.com/chihaya/chihaya/models"
"github.com/chihaya/chihaya/tracker"
)
func TestPrivateAnnounce(t *testing.T) {
@ -64,7 +64,7 @@ func TestPrivateAnnounce(t *testing.T) {
}
func loadTestData(tkr *Tracker) error {
conn, err := tkr.pool.Get()
conn, err := tkr.Pool.Get()
if err != nil {
return err
}
@ -76,7 +76,7 @@ func loadTestData(tkr *Tracker) error {
}
for i, passkey := range users {
err = conn.PutUser(&models.User{
err = conn.PutUser(&tracker.User{
ID: uint64(i + 1),
Passkey: passkey,
})
@ -91,11 +91,11 @@ func loadTestData(tkr *Tracker) error {
return err
}
torrent := &models.Torrent{
torrent := &tracker.Torrent{
ID: 1,
Infohash: infoHash,
Seeders: make(map[string]models.Peer),
Leechers: make(map[string]models.Peer),
Seeders: make(map[string]tracker.Peer),
Leechers: make(map[string]tracker.Peer),
}
return conn.PutTorrent(torrent)

View file

@ -12,13 +12,13 @@ import (
"github.com/julienschmidt/httprouter"
"github.com/chihaya/chihaya/drivers/tracker"
"github.com/chihaya/chihaya/models"
"github.com/chihaya/chihaya/tracker"
"github.com/chihaya/chihaya/tracker/models"
)
const jsonContentType = "application/json; charset=UTF-8"
func (t *Tracker) check(w http.ResponseWriter, r *http.Request, p httprouter.Params) (int, error) {
func (s *Server) check(w http.ResponseWriter, r *http.Request, p httprouter.Params) (int, error) {
_, err := w.Write([]byte("An easter egg goes here."))
if err != nil {
return http.StatusInternalServerError, err
@ -27,8 +27,44 @@ func (t *Tracker) check(w http.ResponseWriter, r *http.Request, p httprouter.Par
return http.StatusOK, nil
}
func (t *Tracker) getTorrent(w http.ResponseWriter, r *http.Request, p httprouter.Params) (int, error) {
conn, err := t.pool.Get()
func (s *Server) serveAnnounce(w http.ResponseWriter, r *http.Request, p httprouter.Params) (int, error) {
ann, err := models.NewAnnounce(s.config, r, p)
writer := &Writer{w}
if err == models.ErrMalformedRequest {
writer.WriteError(err)
return http.StatusOK, nil
} else if err != nil {
return http.StatusInternalServerError, err
}
if err = s.tracker.HandleAnnounce(ann, writer); err != nil {
return http.StatusInternalServerError, err
}
return http.StatusOK, nil
}
func (s *Server) serveScrape(w http.ResponseWriter, r *http.Request, p httprouter.Params) (int, error) {
scrape, err := models.NewScrape(s.config, r, p)
writer := &Writer{w}
if err == models.ErrMalformedRequest {
writer.WriteError(err)
return http.StatusOK, nil
} else if err != nil {
return http.StatusInternalServerError, err
}
if err = s.tracker.HandleScrape(scrape, writer); err != nil {
return http.StatusInternalServerError, err
}
return http.StatusOK, nil
}
func (s *Server) getTorrent(w http.ResponseWriter, r *http.Request, p httprouter.Params) (int, error) {
conn, err := s.tracker.Pool.Get()
if err != nil {
return http.StatusInternalServerError, err
}
@ -55,7 +91,7 @@ func (t *Tracker) getTorrent(w http.ResponseWriter, r *http.Request, p httproute
return http.StatusOK, nil
}
func (t *Tracker) putTorrent(w http.ResponseWriter, r *http.Request, p httprouter.Params) (int, error) {
func (s *Server) putTorrent(w http.ResponseWriter, r *http.Request, p httprouter.Params) (int, error) {
body, err := ioutil.ReadAll(r.Body)
if err != nil {
return http.StatusInternalServerError, err
@ -67,7 +103,7 @@ func (t *Tracker) putTorrent(w http.ResponseWriter, r *http.Request, p httproute
return http.StatusBadRequest, err
}
conn, err := t.pool.Get()
conn, err := s.tracker.Pool.Get()
if err != nil {
return http.StatusInternalServerError, err
}
@ -80,8 +116,8 @@ func (t *Tracker) putTorrent(w http.ResponseWriter, r *http.Request, p httproute
return http.StatusOK, nil
}
func (t *Tracker) delTorrent(w http.ResponseWriter, r *http.Request, p httprouter.Params) (int, error) {
conn, err := t.pool.Get()
func (s *Server) delTorrent(w http.ResponseWriter, r *http.Request, p httprouter.Params) (int, error) {
conn, err := s.tracker.Pool.Get()
if err != nil {
return http.StatusInternalServerError, err
}
@ -101,8 +137,8 @@ func (t *Tracker) delTorrent(w http.ResponseWriter, r *http.Request, p httproute
return http.StatusOK, nil
}
func (t *Tracker) getUser(w http.ResponseWriter, r *http.Request, p httprouter.Params) (int, error) {
conn, err := t.pool.Get()
func (s *Server) getUser(w http.ResponseWriter, r *http.Request, p httprouter.Params) (int, error) {
conn, err := s.tracker.Pool.Get()
if err != nil {
return http.StatusInternalServerError, err
}
@ -124,7 +160,7 @@ func (t *Tracker) getUser(w http.ResponseWriter, r *http.Request, p httprouter.P
return http.StatusOK, nil
}
func (t *Tracker) putUser(w http.ResponseWriter, r *http.Request, p httprouter.Params) (int, error) {
func (s *Server) putUser(w http.ResponseWriter, r *http.Request, p httprouter.Params) (int, error) {
body, err := ioutil.ReadAll(r.Body)
if err != nil {
return http.StatusInternalServerError, err
@ -136,7 +172,7 @@ func (t *Tracker) putUser(w http.ResponseWriter, r *http.Request, p httprouter.P
return http.StatusBadRequest, err
}
conn, err := t.pool.Get()
conn, err := s.tracker.Pool.Get()
if err != nil {
return http.StatusInternalServerError, err
}
@ -149,8 +185,8 @@ func (t *Tracker) putUser(w http.ResponseWriter, r *http.Request, p httprouter.P
return http.StatusOK, nil
}
func (t *Tracker) delUser(w http.ResponseWriter, r *http.Request, p httprouter.Params) (int, error) {
conn, err := t.pool.Get()
func (s *Server) delUser(w http.ResponseWriter, r *http.Request, p httprouter.Params) (int, error) {
conn, err := s.tracker.Pool.Get()
if err != nil {
return http.StatusInternalServerError, err
}
@ -165,8 +201,8 @@ func (t *Tracker) delUser(w http.ResponseWriter, r *http.Request, p httprouter.P
return http.StatusOK, nil
}
func (t *Tracker) putClient(w http.ResponseWriter, r *http.Request, p httprouter.Params) (int, error) {
conn, err := t.pool.Get()
func (s *Server) putClient(w http.ResponseWriter, r *http.Request, p httprouter.Params) (int, error) {
conn, err := s.tracker.Pool.Get()
if err != nil {
return http.StatusInternalServerError, err
}
@ -179,8 +215,8 @@ func (t *Tracker) putClient(w http.ResponseWriter, r *http.Request, p httprouter
return http.StatusOK, nil
}
func (t *Tracker) delClient(w http.ResponseWriter, r *http.Request, p httprouter.Params) (int, error) {
conn, err := t.pool.Get()
func (s *Server) delClient(w http.ResponseWriter, r *http.Request, p httprouter.Params) (int, error) {
conn, err := s.tracker.Pool.Get()
if err != nil {
return http.StatusInternalServerError, err
}

View file

@ -13,45 +13,17 @@ import (
"github.com/julienschmidt/httprouter"
"github.com/stretchr/graceful"
"github.com/chihaya/bencode"
"github.com/chihaya/chihaya/config"
"github.com/chihaya/chihaya/drivers/backend"
"github.com/chihaya/chihaya/drivers/tracker"
"github.com/chihaya/chihaya/tracker"
)
type Tracker struct {
cfg *config.Config
pool tracker.Pool
backend backend.Conn
}
func NewTracker(cfg *config.Config) (*Tracker, error) {
tp, err := tracker.Open(&cfg.Tracker)
if err != nil {
return nil, err
}
bc, err := backend.Open(&cfg.Backend)
if err != nil {
return nil, err
}
go tracker.PurgeInactivePeers(
tp,
cfg.PurgeInactiveTorrents,
cfg.Announce.Duration*2,
cfg.Announce.Duration,
)
return &Tracker{
cfg: cfg,
pool: tp,
backend: bc,
}, nil
}
type ResponseHandler func(http.ResponseWriter, *http.Request, httprouter.Params) (int, error)
type Server struct {
config *config.Config
tracker *tracker.Tracker
}
func makeHandler(handler ResponseHandler) httprouter.Handle {
return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
start := time.Now()
@ -73,47 +45,39 @@ func makeHandler(handler ResponseHandler) httprouter.Handle {
}
}
func NewRouter(t *Tracker, cfg *config.Config) *httprouter.Router {
func NewRouter(s *Server) *httprouter.Router {
r := httprouter.New()
if cfg.Private {
r.GET("/users/:passkey/announce", makeHandler(t.ServeAnnounce))
r.GET("/users/:passkey/scrape", makeHandler(t.ServeScrape))
if s.config.Private {
r.GET("/users/:passkey/announce", makeHandler(s.serveAnnounce))
r.GET("/users/:passkey/scrape", makeHandler(s.serveScrape))
r.PUT("/users/:passkey", makeHandler(t.putUser))
r.DELETE("/users/:passkey", makeHandler(t.delUser))
r.PUT("/users/:passkey", makeHandler(s.putUser))
r.DELETE("/users/:passkey", makeHandler(s.delUser))
} else {
r.GET("/announce", makeHandler(t.ServeAnnounce))
r.GET("/scrape", makeHandler(t.ServeScrape))
r.GET("/announce", makeHandler(s.serveAnnounce))
r.GET("/scrape", makeHandler(s.serveScrape))
}
if cfg.Whitelist {
r.PUT("/clients/:clientID", makeHandler(t.putClient))
r.DELETE("/clients/:clientID", makeHandler(t.delClient))
if s.config.Whitelist {
r.PUT("/clients/:clientID", makeHandler(s.putClient))
r.DELETE("/clients/:clientID", makeHandler(s.delClient))
}
r.GET("/torrents/:infohash", makeHandler(t.getTorrent))
r.PUT("/torrents/:infohash", makeHandler(t.putTorrent))
r.DELETE("/torrents/:infohash", makeHandler(t.delTorrent))
r.GET("/check", makeHandler(t.check))
r.GET("/torrents/:infohash", makeHandler(s.getTorrent))
r.PUT("/torrents/:infohash", makeHandler(s.putTorrent))
r.DELETE("/torrents/:infohash", makeHandler(s.delTorrent))
r.GET("/check", makeHandler(s.check))
return r
}
func Serve(cfg *config.Config) {
t, err := NewTracker(cfg)
if err != nil {
glog.Fatal("New: ", err)
func Serve(cfg *config.Config, tkr *tracker.Tracker) {
srv := &Server{
config: cfg,
tracker: tkr,
}
glog.V(0).Info("Starting on ", cfg.Addr)
graceful.Run(cfg.Addr, cfg.RequestTimeout.Duration, NewRouter(t, cfg))
}
func fail(w http.ResponseWriter, r *http.Request, err error) {
dict := bencode.NewDict()
dict["failure reason"] = err.Error()
bencoder := bencode.NewEncoder(w)
bencoder.Encode(dict)
graceful.Run(cfg.Addr, cfg.RequestTimeout.Duration, NewRouter(srv))
}

View file

@ -15,7 +15,7 @@ import (
"github.com/chihaya/chihaya/config"
_ "github.com/chihaya/chihaya/drivers/backend/noop"
_ "github.com/chihaya/chihaya/drivers/tracker/memory"
_ "github.com/chihaya/chihaya/tracker/memory"
)
type params map[string]string
@ -23,7 +23,7 @@ type params map[string]string
var infoHash = string([]byte{0x89, 0xd4, 0xbc, 0x52, 0x11, 0x16, 0xca, 0x1d, 0x42, 0xa2, 0xf3, 0x0d, 0x1f, 0x27, 0x4d, 0x94, 0xe4, 0x68, 0x1d, 0xaf})
func setupTracker(cfg *config.Config) (*httptest.Server, error) {
tkr, err := NewTracker(cfg)
tkr, err := tracker.New(cfg)
if err != nil {
return nil, err
}

View file

@ -1,83 +0,0 @@
// Copyright 2014 The Chihaya Authors. All rights reserved.
// Use of this source code is governed by the BSD 2-Clause license,
// which can be found in the LICENSE file.
package http
import (
"net/http"
"github.com/julienschmidt/httprouter"
"github.com/chihaya/bencode"
"github.com/chihaya/chihaya/drivers/tracker"
"github.com/chihaya/chihaya/models"
)
func (t *Tracker) ServeScrape(w http.ResponseWriter, r *http.Request, p httprouter.Params) (int, error) {
scrape, err := models.NewScrape(t.cfg, r, p)
if err == models.ErrMalformedRequest {
fail(w, r, err)
return http.StatusOK, nil
} else if err != nil {
return http.StatusInternalServerError, err
}
conn, err := t.pool.Get()
if err != nil {
return http.StatusInternalServerError, err
}
if t.cfg.Private {
_, err = conn.FindUser(scrape.Passkey)
if err == tracker.ErrUserDNE {
fail(w, r, err)
return http.StatusOK, nil
} else if err != nil {
return http.StatusInternalServerError, err
}
}
var torrents []*models.Torrent
for _, infohash := range scrape.Infohashes {
torrent, err := conn.FindTorrent(infohash)
if err == tracker.ErrTorrentDNE {
fail(w, r, err)
return http.StatusOK, nil
} else if err != nil {
return http.StatusInternalServerError, err
}
torrents = append(torrents, torrent)
}
resp := bencode.NewDict()
resp["files"] = filesDict(torrents)
bencoder := bencode.NewEncoder(w)
err = bencoder.Encode(resp)
if err != nil {
return http.StatusInternalServerError, err
}
return http.StatusOK, nil
}
func filesDict(torrents []*models.Torrent) bencode.Dict {
d := bencode.NewDict()
for _, torrent := range torrents {
d[torrent.Infohash] = torrentDict(torrent)
}
return d
}
func torrentDict(torrent *models.Torrent) bencode.Dict {
d := bencode.NewDict()
d["complete"] = len(torrent.Seeders)
d["incomplete"] = len(torrent.Leechers)
d["downloaded"] = torrent.Snatches
return d
}

108
http/writer.go Normal file
View file

@ -0,0 +1,108 @@
// Copyright 2014 The Chihaya Authors. All rights reserved.
// Use of this source code is governed by the BSD 2-Clause license,
// which can be found in the LICENSE file.
package http
import (
"bytes"
"net/http"
"github.com/chihaya/bencode"
"github.com/chihaya/chihaya/tracker"
"github.com/chihaya/chihaya/tracker/models"
)
type Writer struct {
http.ResponseWriter
}
func (w *Writer) WriteError(err error) error {
dict := bencode.NewDict()
dict["failure reason"] = err.Error()
bencoder := bencode.NewEncoder(w)
return bencoder.Encode(dict)
}
func (w *Writer) WriteAnnounce(res *tracker.AnnounceResponse) error {
dict := bencode.NewDict()
dict["complete"] = res.Complete
dict["incomplete"] = res.Incomplete
dict["interval"] = res.Interval
dict["min interval"] = res.MinInterval
if res.Compact {
dict["peers"] = compactPeers(false, res.IPv4Peers)
dict["peers6"] = compactPeers(true, res.IPv6Peers)
} else {
dict["peers"] = peersList(res.IPv6Peers, res.IPv4Peers)
}
bencoder := bencode.NewEncoder(w)
return bencoder.Encode(dict)
}
func (w *Writer) WriteScrape(res *tracker.ScrapeResponse) error {
dict := bencode.NewDict()
dict["files"] = filesDict(res.Files)
bencoder := bencode.NewEncoder(w)
return bencoder.Encode(dict)
}
func compactPeers(ipv6 bool, peers tracker.PeerList) []byte {
var compactPeers bytes.Buffer
if ipv6 {
for _, peer := range peers {
if ip := peer.IP.To16(); ip != nil {
compactPeers.Write(ip)
compactPeers.Write([]byte{byte(peer.Port >> 8), byte(peer.Port & 0xff)})
}
}
} else {
for _, peer := range peers {
if ip := peer.IP.To4(); ip != nil {
compactPeers.Write(ip)
compactPeers.Write([]byte{byte(peer.Port >> 8), byte(peer.Port & 0xff)})
}
}
}
return compactPeers.Bytes()
}
func peersList(ipv4s, ipv6s tracker.PeerList) (peers []bencode.Dict) {
for _, peer := range ipv4s {
peers = append(peers, peerDict(peer))
}
for _, peer := range ipv6s {
peers = append(peers, peerDict(peer))
}
return peers
}
func peerDict(peer *models.Peer) bencode.Dict {
return bencode.Dict{
"ip": peer.IP.String(),
"peer id": peer.ID,
"port": peer.Port,
}
}
func filesDict(torrents []*models.Torrent) bencode.Dict {
d := bencode.NewDict()
for _, torrent := range torrents {
d[torrent.Infohash] = torrentDict(torrent)
}
return d
}
func torrentDict(torrent *models.Torrent) bencode.Dict {
return bencode.Dict{
"complete": len(torrent.Seeders),
"incomplete": len(torrent.Leechers),
"downloaded": torrent.Snatches,
}
}

236
tracker/announce.go Normal file
View file

@ -0,0 +1,236 @@
// Copyright 2014 The Chihaya Authors. All rights reserved.
// Use of this source code is governed by the BSD 2-Clause license,
// which can be found in the LICENSE file.
package tracker
import (
"github.com/chihaya/chihaya/tracker/models"
)
func (t *Tracker) HandleAnnounce(ann *models.Announce, w Writer) error {
conn, err := t.Pool.Get()
if err != nil {
return err
}
if t.cfg.Whitelist {
err = conn.FindClient(ann.ClientID())
if err == ErrClientUnapproved {
w.WriteError(err)
return nil
} else if err != nil {
return err
}
}
var user *models.User
if t.cfg.Private {
user, err = conn.FindUser(ann.Passkey)
if err == ErrUserDNE {
w.WriteError(err)
return nil
} else if err != nil {
return err
}
}
var torrent *models.Torrent
torrent, err = conn.FindTorrent(ann.Infohash)
switch {
case !t.cfg.Private && err == ErrTorrentDNE:
torrent = &models.Torrent{
Infohash: ann.Infohash,
Seeders: make(map[string]models.Peer),
Leechers: make(map[string]models.Peer),
}
err = conn.PutTorrent(torrent)
if err != nil {
return err
}
case t.cfg.Private && err == ErrTorrentDNE:
w.WriteError(err)
return nil
case err != nil:
return err
}
peer := models.NewPeer(ann, user, torrent)
created, err := updateTorrent(conn, ann, peer, torrent)
if err != nil {
return err
}
snatched, err := handleEvent(conn, ann, peer, user, torrent)
if err != nil {
return err
}
if t.cfg.Private {
delta := models.NewAnnounceDelta(ann, peer, user, torrent, created, snatched)
err = t.backend.RecordAnnounce(delta)
if err != nil {
return err
}
} else if t.cfg.PurgeInactiveTorrents && torrent.PeerCount() == 0 {
// Rather than deleting the torrent explicitly, let the tracker driver
// ensure there are no race conditions.
conn.PurgeInactiveTorrent(torrent.Infohash)
}
return w.WriteAnnounce(newAnnounceResponse(ann, user, torrent))
}
func updateTorrent(c Conn, a *models.Announce, p *models.Peer, t *models.Torrent) (created bool, err error) {
c.TouchTorrent(t.Infohash)
switch {
case t.InSeederPool(p):
err = c.PutSeeder(t.Infohash, p)
if err != nil {
return
}
t.Seeders[p.Key()] = *p
case t.InLeecherPool(p):
err = c.PutLeecher(t.Infohash, p)
if err != nil {
return
}
t.Leechers[p.Key()] = *p
default:
if a.Left == 0 {
err = c.PutSeeder(t.Infohash, p)
if err != nil {
return
}
t.Seeders[p.Key()] = *p
} else {
err = c.PutLeecher(t.Infohash, p)
if err != nil {
return
}
t.Leechers[p.Key()] = *p
}
created = true
}
return
}
func handleEvent(c Conn, a *models.Announce, p *models.Peer, u *models.User, t *models.Torrent) (snatched bool, err error) {
switch {
case a.Event == "stopped" || a.Event == "paused":
if t.InSeederPool(p) {
err = c.DeleteSeeder(t.Infohash, p.Key())
if err != nil {
return
}
delete(t.Seeders, p.Key())
} else if t.InLeecherPool(p) {
err = c.DeleteLeecher(t.Infohash, p.Key())
if err != nil {
return
}
delete(t.Leechers, p.Key())
}
case a.Event == "completed":
err = c.IncrementSnatches(t.Infohash)
if err != nil {
return
}
snatched = true
t.Snatches++
if t.InLeecherPool(p) {
err = LeecherFinished(c, t.Infohash, p)
if err != nil {
return
}
}
case t.InLeecherPool(p) && a.Left == 0:
// A leecher completed but the event was never received.
err = LeecherFinished(c, t.Infohash, p)
if err != nil {
return
}
}
return
}
func newAnnounceResponse(a *models.Announce, u *models.User, t *models.Torrent) *AnnounceResponse {
seedCount := len(t.Seeders)
leechCount := len(t.Leechers)
var peerCount int
if a.Left == 0 {
peerCount = minInt(a.NumWant, leechCount)
} else {
peerCount = minInt(a.NumWant, leechCount+seedCount-1)
}
res := &AnnounceResponse{
Complete: seedCount,
Incomplete: leechCount,
Interval: a.Config.Announce.Duration,
MinInterval: a.Config.MinAnnounce.Duration,
Compact: a.Compact,
}
if a.NumWant > 0 && a.Event != "stopped" && a.Event != "paused" {
res.IPv4Peers, res.IPv6Peers = getPeers(a, u, t, peerCount)
}
return res
}
func getPeers(a *models.Announce, u *models.User, t *models.Torrent, wanted int) (ipv4s, ipv6s PeerList) {
if a.Left == 0 {
// If they're seeding, give them only leechers.
return appendPeers(nil, nil, u, t.Leechers, wanted)
} else {
// If they're leeching, prioritize giving them seeders.
ipv4s, ipv6s = appendPeers(nil, nil, u, t.Seeders, wanted)
return appendPeers(ipv4s, ipv6s, u, t.Leechers, wanted-len(ipv4s)-len(ipv6s))
}
}
func appendPeers(ipv4s, ipv6s PeerList, u *models.User, peers map[string]models.Peer, wanted int) (PeerList, PeerList) {
count := 0
for _, peer := range peers {
if count >= wanted {
break
}
if u != nil && peer.UserID == u.ID {
continue
}
if ip := peer.IP.To4(); len(ip) == 4 {
ipv4s = append(ipv4s, &peer)
} else if ip := peer.IP.To16(); len(ip) == 16 {
ipv6s = append(ipv6s, &peer)
}
count++
}
return ipv4s, ipv6s
}
func minInt(a, b int) int {
if a < b {
return a
}
return b
}

View file

@ -2,8 +2,6 @@
// Use of this source code is governed by the BSD 2-Clause license,
// which can be found in the LICENSE file.
// Package tracker provides a generic interface for manipulating a
// BitTorrent tracker's fast-moving data.
package tracker
import (
@ -12,7 +10,7 @@ import (
"time"
"github.com/chihaya/chihaya/config"
"github.com/chihaya/chihaya/models"
"github.com/chihaya/chihaya/tracker/models"
)
var (
@ -47,7 +45,7 @@ func Register(name string, driver Driver) {
drivers[name] = driver
}
// Open creates a pool of data store connections specified by a models configuration.
// Open creates a pool of data store connections specified by a configuration.
func Open(cfg *config.DriverConfig) (Pool, error) {
driver, ok := drivers[cfg.Name]
if !ok {

View file

@ -8,8 +8,8 @@ import (
"runtime"
"time"
"github.com/chihaya/chihaya/drivers/tracker"
"github.com/chihaya/chihaya/models"
"github.com/chihaya/chihaya/tracker"
"github.com/chihaya/chihaya/tracker/models"
)
// Conn implements a connection to a memory-based tracker data store.

View file

@ -8,8 +8,8 @@ package memory
import (
"github.com/chihaya/chihaya/config"
"github.com/chihaya/chihaya/drivers/tracker"
"github.com/chihaya/chihaya/models"
"github.com/chihaya/chihaya/tracker"
"github.com/chihaya/chihaya/tracker/models"
)
type driver struct{}

View file

@ -7,8 +7,8 @@ package memory
import (
"sync"
"github.com/chihaya/chihaya/drivers/tracker"
"github.com/chihaya/chihaya/models"
"github.com/chihaya/chihaya/tracker"
"github.com/chihaya/chihaya/tracker/models"
)
type Pool struct {

View file

@ -2,8 +2,6 @@
// Use of this source code is governed by the BSD 2-Clause license,
// which can be found in the LICENSE file.
// Package models implements the models for an abstraction over the
// multiple data stores used by a BitTorrent tracker.
package models
import (
@ -19,7 +17,7 @@ import (
)
var (
// ErrMalformedRequest is returned when an http.Request does no have the
// ErrMalformedRequest is returned when an http.Request does not have the
// required parameters to create a model.
ErrMalformedRequest = errors.New("malformed request")
)
@ -45,7 +43,7 @@ type Peer struct {
// respectively.
func NewPeer(a *Announce, u *User, t *Torrent) *Peer {
if a == nil {
panic("models: announce cannot equal nil")
panic("tracker: announce cannot equal nil")
}
var userID uint64

31
tracker/responses.go Normal file
View file

@ -0,0 +1,31 @@
// Copyright 2014 The Chihaya Authors. All rights reserved.
// Use of this source code is governed by the BSD 2-Clause license,
// which can be found in the LICENSE file.
package tracker
import (
"time"
"github.com/chihaya/chihaya/tracker/models"
)
type PeerList []*models.Peer
type AnnounceResponse struct {
Complete, Incomplete int
Interval, MinInterval time.Duration
IPv4Peers, IPv6Peers PeerList
Compact bool
}
type ScrapeResponse struct {
Files []*models.Torrent
}
type Writer interface {
WriteError(error) error
WriteAnnounce(*AnnounceResponse) error
WriteScrape(*ScrapeResponse) error
}

40
tracker/scrape.go Normal file
View file

@ -0,0 +1,40 @@
// Copyright 2014 The Chihaya Authors. All rights reserved.
// Use of this source code is governed by the BSD 2-Clause license,
// which can be found in the LICENSE file.
package tracker
import (
"github.com/chihaya/chihaya/tracker/models"
)
func (t *Tracker) HandleScrape(scrape *models.Scrape, w Writer) error {
conn, err := t.Pool.Get()
if err != nil {
return err
}
if t.cfg.Private {
_, err = conn.FindUser(scrape.Passkey)
if err == ErrUserDNE {
w.WriteError(err)
return nil
} else if err != nil {
return err
}
}
var torrents []*models.Torrent
for _, infohash := range scrape.Infohashes {
torrent, err := conn.FindTorrent(infohash)
if err == ErrTorrentDNE {
w.WriteError(err)
return nil
} else if err != nil {
return err
}
torrents = append(torrents, torrent)
}
return w.WriteScrape(&ScrapeResponse{torrents})
}

43
tracker/tracker.go Normal file
View file

@ -0,0 +1,43 @@
// Copyright 2014 The Chihaya Authors. All rights reserved.
// Use of this source code is governed by the BSD 2-Clause license,
// which can be found in the LICENSE file.
// Package tracker provides a generic interface for manipulating a
// BitTorrent tracker's fast-moving data.
package tracker
import (
"github.com/chihaya/chihaya/config"
"github.com/chihaya/chihaya/drivers/backend"
)
type Tracker struct {
cfg *config.Config
Pool Pool
backend backend.Conn
}
func New(cfg *config.Config) (*Tracker, error) {
pool, err := Open(&cfg.Tracker)
if err != nil {
return nil, err
}
bc, err := backend.Open(&cfg.Backend)
if err != nil {
return nil, err
}
go PurgeInactivePeers(
pool,
cfg.PurgeInactiveTorrents,
cfg.Announce.Duration*2,
cfg.Announce.Duration,
)
return &Tracker{
cfg: cfg,
Pool: pool,
backend: bc,
}, nil
}