First step towards removing Tracker Driver.

This feature isn't worth maintaining and if anyone needs to scale beyond
memory on a single box, we can evaluate it then.
This commit is contained in:
Jimmy Zelinskie 2014-08-13 17:45:34 -04:00
parent e3420b4013
commit 1d9b2bc322
13 changed files with 324 additions and 576 deletions

View file

@ -6,10 +6,10 @@ Features include:
- Low resource consumption, and fast, asynchronous request processing - Low resource consumption, and fast, asynchronous request processing
- Full IPv6 support, including handling for dual-stacked peers - Full IPv6 support, including handling for dual-stacked peers
- Generic storage interfaces that are easily adapted to work with any database
- Full compatibility with what exists of the BitTorrent spec - Full compatibility with what exists of the BitTorrent spec
- Extensive metrics for visibility into the tracker and swarm's performance - Extensive metrics for visibility into the tracker and swarm's performance
- Ability to group peers in local subnets to reduce backbone contention - Ability to group peers in local subnets to reduce backbone contention
- Pluggable backend driver that can coordinate with an external database
## Using Chihaya ## Using Chihaya
@ -59,7 +59,7 @@ $ go test -v ./... -bench .
### Customizing Chihaya ### Customizing Chihaya
If you require more than the drivers provided out-of-the-box, you are free to create your own and then produce your own custom Chihaya binary. To create this binary, simply create your own main package, import your custom drivers, then call [`chihaya.Boot`] from main. Chihaya is designed to be extended. If you require more than the drivers provided out-of-the-box, you are free to create your own and then produce your own custom Chihaya binary. To create this binary, simply create your own main package, import your custom drivers, then call [`chihaya.Boot`] from main.
[`chihaya.Boot`]: http://godoc.org/github.com/chihaya/chihaya [`chihaya.Boot`]: http://godoc.org/github.com/chihaya/chihaya
@ -71,34 +71,27 @@ package main
import ( import (
"github.com/chihaya/chihaya" "github.com/chihaya/chihaya"
_ "github.com/yourusername/chihaya-custom-backend" // Import any of your own drivers. // Import any of your own drivers.
_ "github.com/yourusername/chihaya-custom-backend"
) )
func main() { func main() {
chihaya.Boot() // Start Chihaya normally. // Start Chihaya normally.
chihaya.Boot()
} }
``` ```
#### Tracker Drivers #### Implementing a Driver
The [`tracker`] package contains 3 interfaces that are heavily inspired by the standard library's [`database/sql`] package. To write a new driver that will provide a storage mechanism for the fast moving data within the tracker, create your own new Go package that has an implementation of the [`tracker.Driver`], [`tracker.Pool`], and [`tracker.Conn`] interfaces. Within that package, you must also define an [`init()`] that calls [`tracker.Register`] registering your new driver. A great place to start is the documentation and source code of the [`memory`] driver to understand thread safety and basic driver design. The [`backend`] package is meant to provide announce deltas to a slower and more consistent database, such as the one powering a torrent-indexing website. Implementing a backend driver is heavily inspired by the standard library's [`database/sql`] package: simply create a package that implements the [`backend.Driver`] and [`backend.Conn`] interfaces and calls [`backend.Register`] in it's [`init()`]. Please note that [`backend.Conn`] must be thread-safe. A great place to start is to read the [`no-op`] driver which comes out-of-the-box with Chihaya and is meant to be used for public trackers.
#### Backend Drivers
The [`backend`] package is meant to provide announce deltas to a slower and more consistent database, such as the one powering a torrent-indexing website. Implementing a backend driver is very similar to implementing a tracker driver: simply create a package that implements the [`backend.Driver`] and [`backend.Conn`] interfaces and calls [`backend.Register`] in it's [`init()`]. Please note that [`backend.Conn`] must be thread-safe.
[`init()`]: http://golang.org/ref/spec#Program_execution [`init()`]: http://golang.org/ref/spec#Program_execution
[`database/sql`]: http://godoc.org/database/sql [`database/sql`]: http://godoc.org/database/sql
[`tracker`]: http://godoc.org/github.com/chihaya/chihaya/tracker
[`tracker.Register`]: http://godoc.org/github.com/chihaya/chihaya/tracker#Register
[`tracker.Driver`]: http://godoc.org/github.com/chihaya/chihaya/tracker#Driver
[`tracker.Pool`]: http://godoc.org/github.com/chihaya/chihaya/tracker#Pool
[`tracker.Conn`]: http://godoc.org/github.com/chihaya/chihaya/tracker#Conn
[`memory`]: http://godoc.org/github.com/chihaya/chihaya/tracker/memory
[`backend`]: http://godoc.org/github.com/chihaya/chihaya/backend [`backend`]: http://godoc.org/github.com/chihaya/chihaya/backend
[`backend.Register`]: http://godoc.org/github.com/chihaya/chihaya/backend#Register [`backend.Register`]: http://godoc.org/github.com/chihaya/chihaya/backend#Register
[`backend.Driver`]: http://godoc.org/github.com/chihaya/chihaya/backend#Driver [`backend.Driver`]: http://godoc.org/github.com/chihaya/chihaya/backend#Driver
[`backend.Conn`]: http://godoc.org/github.com/chihaya/chihaya/backend#Conn [`backend.Conn`]: http://godoc.org/github.com/chihaya/chihaya/backend#Conn
[`no-op`]: http://godoc.org/github.com/chihaya/chihaya/backend/noop
### Contributing ### Contributing

View file

@ -19,7 +19,6 @@ import (
// See the README for how to import custom drivers. // See the README for how to import custom drivers.
_ "github.com/chihaya/chihaya/backend/noop" _ "github.com/chihaya/chihaya/backend/noop"
_ "github.com/chihaya/chihaya/tracker/memory"
) )
var ( var (

View file

@ -131,10 +131,7 @@ func TestPrivateAnnounce(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
err = loadPrivateTestData(tkr) loadPrivateTestData(tkr)
if err != nil {
t.Fatal(err)
}
srv, err := createServer(tkr, &cfg) srv, err := createServer(tkr, &cfg)
if err != nil { if err != nil {
@ -343,12 +340,7 @@ func checkAnnounce(p params, expected interface{}, srv *httptest.Server, t *test
return true return true
} }
func loadPrivateTestData(tkr *tracker.Tracker) error { func loadPrivateTestData(tkr *tracker.Tracker) {
conn, err := tkr.Pool.Get()
if err != nil {
return err
}
users := []string{ users := []string{
"vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv1", "vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv1",
"vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv2", "vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv2",
@ -356,20 +348,13 @@ func loadPrivateTestData(tkr *tracker.Tracker) error {
} }
for i, passkey := range users { for i, passkey := range users {
err = conn.PutUser(&models.User{ tkr.PutUser(&models.User{
ID: uint64(i + 1), ID: uint64(i + 1),
Passkey: passkey, Passkey: passkey,
}) })
if err != nil {
return err
}
} }
err = conn.PutClient("TR2820") tkr.PutClient("TR2820")
if err != nil {
return err
}
torrent := &models.Torrent{ torrent := &models.Torrent{
ID: 1, ID: 1,
@ -378,5 +363,5 @@ func loadPrivateTestData(tkr *tracker.Tracker) error {
Leechers: models.NewPeerMap(false), Leechers: models.NewPeerMap(false),
} }
return conn.PutTorrent(torrent) tkr.PutTorrent(torrent)
} }

View file

@ -17,7 +17,6 @@ import (
"github.com/chihaya/chihaya/tracker" "github.com/chihaya/chihaya/tracker"
_ "github.com/chihaya/chihaya/backend/noop" _ "github.com/chihaya/chihaya/backend/noop"
_ "github.com/chihaya/chihaya/tracker/memory"
) )
type params map[string]string type params map[string]string

View file

@ -71,6 +71,7 @@ func handleTorrentError(err error, w *Writer) (int, error) {
stats.RecordEvent(stats.ClientError) stats.RecordEvent(stats.ClientError)
return http.StatusOK, nil return http.StatusOK, nil
} }
return http.StatusInternalServerError, err return http.StatusInternalServerError, err
} }
@ -99,17 +100,12 @@ func (s *Server) serveScrape(w http.ResponseWriter, r *http.Request, p httproute
} }
func (s *Server) getTorrent(w http.ResponseWriter, r *http.Request, p httprouter.Params) (int, error) { func (s *Server) getTorrent(w http.ResponseWriter, r *http.Request, p httprouter.Params) (int, error) {
conn, err := s.tracker.Pool.Get()
if err != nil {
return http.StatusInternalServerError, err
}
infohash, err := url.QueryUnescape(p.ByName("infohash")) infohash, err := url.QueryUnescape(p.ByName("infohash"))
if err != nil { if err != nil {
return http.StatusNotFound, err return http.StatusNotFound, err
} }
torrent, err := conn.FindTorrent(infohash) torrent, err := s.tracker.FindTorrent(infohash)
if err != nil { if err != nil {
return handleError(err) return handleError(err)
} }
@ -131,35 +127,22 @@ func (s *Server) putTorrent(w http.ResponseWriter, r *http.Request, p httprouter
return http.StatusBadRequest, err return http.StatusBadRequest, err
} }
conn, err := s.tracker.Pool.Get() s.tracker.PutTorrent(&torrent)
if err != nil { return http.StatusOK, nil
return http.StatusInternalServerError, err
}
return handleError(conn.PutTorrent(&torrent))
} }
func (s *Server) delTorrent(w http.ResponseWriter, r *http.Request, p httprouter.Params) (int, error) { func (s *Server) delTorrent(w http.ResponseWriter, r *http.Request, p httprouter.Params) (int, error) {
conn, err := s.tracker.Pool.Get()
if err != nil {
return http.StatusInternalServerError, err
}
infohash, err := url.QueryUnescape(p.ByName("infohash")) infohash, err := url.QueryUnescape(p.ByName("infohash"))
if err != nil { if err != nil {
return http.StatusNotFound, err return http.StatusNotFound, err
} }
return handleError(conn.DeleteTorrent(infohash)) s.tracker.DeleteTorrent(infohash)
return http.StatusOK, nil
} }
func (s *Server) getUser(w http.ResponseWriter, r *http.Request, p httprouter.Params) (int, error) { func (s *Server) getUser(w http.ResponseWriter, r *http.Request, p httprouter.Params) (int, error) {
conn, err := s.tracker.Pool.Get() user, err := s.tracker.FindUser(p.ByName("passkey"))
if err != nil {
return http.StatusInternalServerError, err
}
user, err := conn.FindUser(p.ByName("passkey"))
if err == models.ErrUserDNE { if err == models.ErrUserDNE {
return http.StatusNotFound, err return http.StatusNotFound, err
} else if err != nil { } else if err != nil {
@ -183,37 +166,21 @@ func (s *Server) putUser(w http.ResponseWriter, r *http.Request, p httprouter.Pa
return http.StatusBadRequest, err return http.StatusBadRequest, err
} }
conn, err := s.tracker.Pool.Get() s.tracker.PutUser(&user)
if err != nil { return http.StatusOK, nil
return http.StatusInternalServerError, err
}
return handleError(conn.PutUser(&user))
} }
func (s *Server) delUser(w http.ResponseWriter, r *http.Request, p httprouter.Params) (int, error) { func (s *Server) delUser(w http.ResponseWriter, r *http.Request, p httprouter.Params) (int, error) {
conn, err := s.tracker.Pool.Get() s.tracker.DeleteUser(p.ByName("passkey"))
if err != nil { return http.StatusOK, nil
return http.StatusInternalServerError, err
}
return handleError(conn.DeleteUser(p.ByName("passkey")))
} }
func (s *Server) putClient(w http.ResponseWriter, r *http.Request, p httprouter.Params) (int, error) { func (s *Server) putClient(w http.ResponseWriter, r *http.Request, p httprouter.Params) (int, error) {
conn, err := s.tracker.Pool.Get() s.tracker.PutClient(p.ByName("clientID"))
if err != nil { return http.StatusOK, nil
return http.StatusInternalServerError, err
}
return handleError(conn.PutClient(p.ByName("clientID")))
} }
func (s *Server) delClient(w http.ResponseWriter, r *http.Request, p httprouter.Params) (int, error) { func (s *Server) delClient(w http.ResponseWriter, r *http.Request, p httprouter.Params) (int, error) {
conn, err := s.tracker.Pool.Get() s.tracker.DeleteClient(p.ByName("clientID"))
if err != nil { return http.StatusOK, nil
return http.StatusInternalServerError, err
}
return handleError(conn.DeleteClient(p.ByName("clientID")))
} }

View file

@ -11,28 +11,21 @@ import (
// HandleAnnounce encapsulates all of the logic of handling a BitTorrent // HandleAnnounce encapsulates all of the logic of handling a BitTorrent
// client's Announce without being coupled to any transport protocol. // client's Announce without being coupled to any transport protocol.
func (tkr *Tracker) HandleAnnounce(ann *models.Announce, w Writer) error { func (tkr *Tracker) HandleAnnounce(ann *models.Announce, w Writer) (err error) {
conn, err := tkr.Pool.Get()
if err != nil {
return err
}
defer conn.Close()
if tkr.cfg.ClientWhitelistEnabled { if tkr.cfg.ClientWhitelistEnabled {
if err = conn.FindClient(ann.ClientID()); err != nil { if err = tkr.ClientApproved(ann.ClientID()); err != nil {
return err return err
} }
} }
var user *models.User var user *models.User
if tkr.cfg.PrivateEnabled { if tkr.cfg.PrivateEnabled {
if user, err = conn.FindUser(ann.Passkey); err != nil { if user, err = tkr.FindUser(ann.Passkey); err != nil {
return err return err
} }
} }
torrent, err := conn.FindTorrent(ann.Infohash) torrent, err := tkr.FindTorrent(ann.Infohash)
if err == models.ErrTorrentDNE && !tkr.cfg.PrivateEnabled { if err == models.ErrTorrentDNE && !tkr.cfg.PrivateEnabled {
torrent = &models.Torrent{ torrent = &models.Torrent{
@ -41,10 +34,7 @@ func (tkr *Tracker) HandleAnnounce(ann *models.Announce, w Writer) error {
Leechers: models.NewPeerMap(false), Leechers: models.NewPeerMap(false),
} }
err = conn.PutTorrent(torrent) tkr.PutTorrent(torrent)
if err != nil {
return err
}
stats.RecordEvent(stats.NewTorrent) stats.RecordEvent(stats.NewTorrent)
} else if err != nil { } else if err != nil {
return err return err
@ -57,12 +47,12 @@ func (tkr *Tracker) HandleAnnounce(ann *models.Announce, w Writer) error {
delta = newAnnounceDelta(ann, torrent) delta = newAnnounceDelta(ann, torrent)
} }
created, err := updateSwarm(conn, ann) created, err := tkr.updateSwarm(ann)
if err != nil { if err != nil {
return err return err
} }
snatched, err := handleEvent(conn, ann) snatched, err := tkr.handleEvent(ann)
if err != nil { if err != nil {
return err return err
} }
@ -76,7 +66,7 @@ func (tkr *Tracker) HandleAnnounce(ann *models.Announce, w Writer) error {
} else if tkr.cfg.PurgeInactiveTorrents && torrent.PeerCount() == 0 { } else if tkr.cfg.PurgeInactiveTorrents && torrent.PeerCount() == 0 {
// Rather than deleting the torrent explicitly, let the tracker driver // Rather than deleting the torrent explicitly, let the tracker driver
// ensure there are no race conditions. // ensure there are no race conditions.
conn.PurgeInactiveTorrent(torrent.Infohash) tkr.PurgeInactiveTorrent(torrent.Infohash)
stats.RecordEvent(stats.DeletedTorrent) stats.RecordEvent(stats.DeletedTorrent)
} }
@ -127,18 +117,18 @@ func newAnnounceDelta(ann *models.Announce, t *models.Torrent) *models.AnnounceD
} }
// updateSwarm handles the changes to a torrent's swarm given an announce. // updateSwarm handles the changes to a torrent's swarm given an announce.
func updateSwarm(c Conn, ann *models.Announce) (created bool, err error) { func (tkr *Tracker) updateSwarm(ann *models.Announce) (created bool, err error) {
var createdv4, createdv6 bool var createdv4, createdv6 bool
c.TouchTorrent(ann.Torrent.Infohash) tkr.TouchTorrent(ann.Torrent.Infohash)
if ann.HasIPv4() { if ann.HasIPv4() {
createdv4, err = updatePeer(c, ann, ann.PeerV4) createdv4, err = tkr.updatePeer(ann, ann.PeerV4)
if err != nil { if err != nil {
return return
} }
} }
if ann.HasIPv6() { if ann.HasIPv6() {
createdv6, err = updatePeer(c, ann, ann.PeerV6) createdv6, err = tkr.updatePeer(ann, ann.PeerV6)
if err != nil { if err != nil {
return return
} }
@ -147,18 +137,18 @@ func updateSwarm(c Conn, ann *models.Announce) (created bool, err error) {
return createdv4 || createdv6, nil return createdv4 || createdv6, nil
} }
func updatePeer(c Conn, ann *models.Announce, peer *models.Peer) (created bool, err error) { func (tkr *Tracker) updatePeer(ann *models.Announce, peer *models.Peer) (created bool, err error) {
p, t := ann.Peer, ann.Torrent p, t := ann.Peer, ann.Torrent
switch { switch {
case t.Seeders.Contains(p.Key()): case t.Seeders.Contains(p.Key()):
err = c.PutSeeder(t.Infohash, p) err = tkr.PutSeeder(t.Infohash, p)
if err != nil { if err != nil {
return return
} }
case t.Leechers.Contains(p.Key()): case t.Leechers.Contains(p.Key()):
err = c.PutLeecher(t.Infohash, p) err = tkr.PutLeecher(t.Infohash, p)
if err != nil { if err != nil {
return return
} }
@ -170,14 +160,14 @@ func updatePeer(c Conn, ann *models.Announce, peer *models.Peer) (created bool,
} }
if ann.Left == 0 { if ann.Left == 0 {
err = c.PutSeeder(t.Infohash, p) err = tkr.PutSeeder(t.Infohash, p)
if err != nil { if err != nil {
return return
} }
stats.RecordPeerEvent(stats.NewSeed, p.HasIPv6()) stats.RecordPeerEvent(stats.NewSeed, p.HasIPv6())
} else { } else {
err = c.PutLeecher(t.Infohash, p) err = tkr.PutLeecher(t.Infohash, p)
if err != nil { if err != nil {
return return
} }
@ -190,24 +180,24 @@ func updatePeer(c Conn, ann *models.Announce, peer *models.Peer) (created bool,
// handleEvent checks to see whether an announce has an event and if it does, // handleEvent checks to see whether an announce has an event and if it does,
// properly handles that event. // properly handles that event.
func handleEvent(c Conn, ann *models.Announce) (snatched bool, err error) { func (tkr *Tracker) handleEvent(ann *models.Announce) (snatched bool, err error) {
var snatchedv4, snatchedv6 bool var snatchedv4, snatchedv6 bool
if ann.HasIPv4() { if ann.HasIPv4() {
snatchedv4, err = handlePeerEvent(c, ann, ann.PeerV4) snatchedv4, err = tkr.handlePeerEvent(ann, ann.PeerV4)
if err != nil { if err != nil {
return return
} }
} }
if ann.HasIPv6() { if ann.HasIPv6() {
snatchedv6, err = handlePeerEvent(c, ann, ann.PeerV6) snatchedv6, err = tkr.handlePeerEvent(ann, ann.PeerV6)
if err != nil { if err != nil {
return return
} }
} }
if snatchedv4 || snatchedv6 { if snatchedv4 || snatchedv6 {
err = c.IncrementTorrentSnatches(ann.Torrent.Infohash) err = tkr.IncrementTorrentSnatches(ann.Torrent.Infohash)
if err != nil { if err != nil {
return return
} }
@ -217,7 +207,7 @@ func handleEvent(c Conn, ann *models.Announce) (snatched bool, err error) {
return false, nil return false, nil
} }
func handlePeerEvent(c Conn, ann *models.Announce, p *models.Peer) (snatched bool, err error) { func (tkr *Tracker) handlePeerEvent(ann *models.Announce, p *models.Peer) (snatched bool, err error) {
p, t := ann.Peer, ann.Torrent p, t := ann.Peer, ann.Torrent
switch { switch {
@ -225,14 +215,14 @@ func handlePeerEvent(c Conn, ann *models.Announce, p *models.Peer) (snatched boo
// updateSwarm checks if the peer is active on the torrent, // updateSwarm checks if the peer is active on the torrent,
// so one of these branches must be followed. // so one of these branches must be followed.
if t.Seeders.Contains(p.Key()) { if t.Seeders.Contains(p.Key()) {
err = c.DeleteSeeder(t.Infohash, p) err = tkr.DeleteSeeder(t.Infohash, p)
if err != nil { if err != nil {
return return
} }
stats.RecordPeerEvent(stats.DeletedSeed, p.HasIPv6()) stats.RecordPeerEvent(stats.DeletedSeed, p.HasIPv6())
} else if t.Leechers.Contains(p.Key()) { } else if t.Leechers.Contains(p.Key()) {
err = c.DeleteLeecher(t.Infohash, p) err = tkr.DeleteLeecher(t.Infohash, p)
if err != nil { if err != nil {
return return
} }
@ -244,7 +234,7 @@ func handlePeerEvent(c Conn, ann *models.Announce, p *models.Peer) (snatched boo
v6seed := t.Seeders.Contains(models.NewPeerKey(p.ID, true)) v6seed := t.Seeders.Contains(models.NewPeerKey(p.ID, true))
if t.Leechers.Contains(p.Key()) { if t.Leechers.Contains(p.Key()) {
err = leecherFinished(c, t, p) err = tkr.leecherFinished(t, p)
} else { } else {
err = models.ErrBadRequest err = models.ErrBadRequest
} }
@ -257,18 +247,18 @@ func handlePeerEvent(c Conn, ann *models.Announce, p *models.Peer) (snatched boo
case t.Leechers.Contains(p.Key()) && ann.Left == 0: case t.Leechers.Contains(p.Key()) && ann.Left == 0:
// A leecher completed but the event was never received. // A leecher completed but the event was never received.
err = leecherFinished(c, t, p) err = tkr.leecherFinished(t, p)
} }
return return
} }
// leecherFinished moves a peer from the leeching pool to the seeder pool. // leecherFinished moves a peer from the leeching pool to the seeder pool.
func leecherFinished(c Conn, t *models.Torrent, p *models.Peer) error { func (tkr *Tracker) leecherFinished(t *models.Torrent, p *models.Peer) error {
if err := c.DeleteLeecher(t.Infohash, p); err != nil { if err := tkr.DeleteLeecher(t.Infohash, p); err != nil {
return err return err
} }
if err := c.PutSeeder(t.Infohash, p); err != nil { if err := tkr.PutSeeder(t.Infohash, p); err != nil {
return err return err
} }
stats.RecordPeerEvent(stats.Completed, p.HasIPv6()) stats.RecordPeerEvent(stats.Completed, p.HasIPv6())

View file

@ -1,85 +0,0 @@
// Copyright 2014 The Chihaya Authors. All rights reserved.
// Use of this source code is governed by the BSD 2-Clause license,
// which can be found in the LICENSE file.
package tracker
import (
"fmt"
"time"
"github.com/chihaya/chihaya/config"
"github.com/chihaya/chihaya/tracker/models"
)
var drivers = make(map[string]Driver)
// Driver represents an interface to pool of connections to models used for
// the tracker.
type Driver interface {
New(*config.DriverConfig) Pool
}
// Register makes a database driver available by the provided name.
// If Register is called twice with the same name or if driver is nil,
// it panics.
func Register(name string, driver Driver) {
if driver == nil {
panic("tracker: Register driver is nil")
}
if _, dup := drivers[name]; dup {
panic("tracker: Register called twice for driver " + name)
}
drivers[name] = driver
}
// Open creates a pool of data store connections specified by a configuration.
func Open(cfg *config.DriverConfig) (Pool, error) {
driver, ok := drivers[cfg.Name]
if !ok {
return nil, fmt.Errorf(
"unknown driver %q (forgotten import?)",
cfg.Name,
)
}
return driver.New(cfg), nil
}
// Pool represents a thread-safe pool of connections to the data store
// that can be used to safely within concurrent goroutines.
type Pool interface {
Close() error
Get() (Conn, error)
}
// Conn represents a connection to the data store that can be used
// to make reads/writes.
type Conn interface {
Close() error
// Torrent interactions
TouchTorrent(infohash string) error
FindTorrent(infohash string) (*models.Torrent, error)
PutTorrent(t *models.Torrent) error
DeleteTorrent(infohash string) error
IncrementTorrentSnatches(infohash string) error
PutLeecher(infohash string, p *models.Peer) error
DeleteLeecher(infohash string, p *models.Peer) error
PutSeeder(infohash string, p *models.Peer) error
DeleteSeeder(infohash string, p *models.Peer) error
PurgeInactiveTorrent(infohash string) error
PurgeInactivePeers(purgeEmptyTorrents bool, before time.Time) error
// User interactions
FindUser(passkey string) (*models.User, error)
PutUser(u *models.User) error
DeleteUser(passkey string) error
// Whitelist interactions
FindClient(clientID string) error
PutClient(clientID string) error
DeleteClient(clientID string) error
}

View file

@ -1,244 +0,0 @@
// Copyright 2014 The Chihaya Authors. All rights reserved.
// Use of this source code is governed by the BSD 2-Clause license,
// which can be found in the LICENSE file.
package memory
import (
"runtime"
"time"
"github.com/chihaya/chihaya/stats"
"github.com/chihaya/chihaya/tracker/models"
)
// Conn implements a connection to a memory-based tracker data store.
type Conn struct {
*Pool
}
func (c *Conn) Close() error {
return nil
}
func (c *Conn) FindUser(passkey string) (*models.User, error) {
c.usersM.RLock()
defer c.usersM.RUnlock()
user, exists := c.users[passkey]
if !exists {
return nil, models.ErrUserDNE
}
return &*user, nil
}
func (c *Conn) FindTorrent(infohash string) (*models.Torrent, error) {
c.torrentsM.RLock()
defer c.torrentsM.RUnlock()
torrent, exists := c.torrents[infohash]
if !exists {
return nil, models.ErrTorrentDNE
}
return &*torrent, nil
}
func (c *Conn) FindClient(peerID string) error {
c.whitelistM.RLock()
defer c.whitelistM.RUnlock()
_, ok := c.whitelist[peerID]
if !ok {
return models.ErrClientUnapproved
}
return nil
}
func (c *Conn) IncrementTorrentSnatches(infohash string) error {
c.torrentsM.Lock()
defer c.torrentsM.Unlock()
t, exists := c.torrents[infohash]
if !exists {
return models.ErrTorrentDNE
}
t.Snatches++
return nil
}
func (c *Conn) TouchTorrent(infohash string) error {
c.torrentsM.Lock()
defer c.torrentsM.Unlock()
t, ok := c.torrents[infohash]
if !ok {
return models.ErrTorrentDNE
}
t.LastAction = time.Now().Unix()
return nil
}
func (c *Conn) DeleteLeecher(infohash string, p *models.Peer) error {
c.torrentsM.Lock()
defer c.torrentsM.Unlock()
t, ok := c.torrents[infohash]
if !ok {
return models.ErrTorrentDNE
}
t.Leechers.Delete(p.Key())
return nil
}
func (c *Conn) DeleteSeeder(infohash string, p *models.Peer) error {
c.torrentsM.Lock()
defer c.torrentsM.Unlock()
t, ok := c.torrents[infohash]
if !ok {
return models.ErrTorrentDNE
}
t.Seeders.Delete(p.Key())
return nil
}
func (c *Conn) PutLeecher(infohash string, p *models.Peer) error {
c.torrentsM.Lock()
defer c.torrentsM.Unlock()
t, ok := c.torrents[infohash]
if !ok {
return models.ErrTorrentDNE
}
t.Leechers.Put(*p)
return nil
}
func (c *Conn) PutSeeder(infohash string, p *models.Peer) error {
c.torrentsM.Lock()
defer c.torrentsM.Unlock()
t, ok := c.torrents[infohash]
if !ok {
return models.ErrTorrentDNE
}
t.Seeders.Put(*p)
return nil
}
func (c *Conn) PutTorrent(t *models.Torrent) error {
c.torrentsM.Lock()
defer c.torrentsM.Unlock()
c.torrents[t.Infohash] = &*t
return nil
}
func (c *Conn) DeleteTorrent(infohash string) error {
c.torrentsM.Lock()
defer c.torrentsM.Unlock()
delete(c.torrents, infohash)
return nil
}
func (c *Conn) PurgeInactiveTorrent(infohash string) error {
c.torrentsM.Lock()
defer c.torrentsM.Unlock()
torrent, exists := c.torrents[infohash]
if !exists {
return models.ErrTorrentDNE
}
if torrent.PeerCount() == 0 {
delete(c.torrents, infohash)
}
return nil
}
func (c *Conn) PutUser(u *models.User) error {
c.usersM.Lock()
defer c.usersM.Unlock()
c.users[u.Passkey] = &*u
return nil
}
func (c *Conn) DeleteUser(passkey string) error {
c.usersM.Lock()
defer c.usersM.Unlock()
delete(c.users, passkey)
return nil
}
func (c *Conn) PutClient(peerID string) error {
c.whitelistM.Lock()
defer c.whitelistM.Unlock()
c.whitelist[peerID] = true
return nil
}
func (c *Conn) DeleteClient(peerID string) error {
c.whitelistM.Lock()
defer c.whitelistM.Unlock()
delete(c.whitelist, peerID)
return nil
}
func (c *Conn) PurgeInactivePeers(purgeEmptyTorrents bool, before time.Time) error {
unixtime := before.Unix()
// Build array of map keys to operate on.
c.torrentsM.RLock()
index := 0
keys := make([]string, len(c.torrents))
for infohash := range c.torrents {
keys[index] = infohash
index++
}
c.torrentsM.RUnlock()
// Process keys.
for _, infohash := range keys {
runtime.Gosched() // Let other goroutines run, since this is low priority.
c.torrentsM.Lock()
torrent := c.torrents[infohash]
if torrent == nil {
continue // Torrent deleted since keys were computed.
}
torrent.Seeders.Purge(unixtime)
torrent.Leechers.Purge(unixtime)
peers := torrent.PeerCount()
c.torrentsM.Unlock()
if purgeEmptyTorrents && peers == 0 {
c.PurgeInactiveTorrent(infohash)
stats.RecordEvent(stats.ReapedTorrent)
}
}
return nil
}

View file

@ -1,27 +0,0 @@
// Copyright 2014 The Chihaya Authors. All rights reserved.
// Use of this source code is governed by the BSD 2-Clause license,
// which can be found in the LICENSE file.
// Package memory implements a Chihaya tracker storage driver within memory.
// Stored values will not persist if the tracker is restarted.
package memory
import (
"github.com/chihaya/chihaya/config"
"github.com/chihaya/chihaya/tracker"
"github.com/chihaya/chihaya/tracker/models"
)
type driver struct{}
func (d *driver) New(cfg *config.DriverConfig) tracker.Pool {
return &Pool{
users: make(map[string]*models.User),
torrents: make(map[string]*models.Torrent),
whitelist: make(map[string]bool),
}
}
func init() {
tracker.Register("memory", &driver{})
}

View file

@ -1,33 +0,0 @@
// Copyright 2014 The Chihaya Authors. All rights reserved.
// Use of this source code is governed by the BSD 2-Clause license,
// which can be found in the LICENSE file.
package memory
import (
"sync"
"github.com/chihaya/chihaya/tracker"
"github.com/chihaya/chihaya/tracker/models"
)
type Pool struct {
users map[string]*models.User
usersM sync.RWMutex
torrents map[string]*models.Torrent
torrentsM sync.RWMutex
whitelist map[string]bool
whitelistM sync.RWMutex
}
func (p *Pool) Get() (tracker.Conn, error) {
return &Conn{
Pool: p,
}, nil
}
func (p *Pool) Close() error {
return nil
}

View file

@ -9,22 +9,15 @@ import "github.com/chihaya/chihaya/tracker/models"
// HandleScrape encapsulates all the logic of handling a BitTorrent client's // HandleScrape encapsulates all the logic of handling a BitTorrent client's
// scrape without being coupled to any transport protocol. // scrape without being coupled to any transport protocol.
func (tkr *Tracker) HandleScrape(scrape *models.Scrape, w Writer) (err error) { func (tkr *Tracker) HandleScrape(scrape *models.Scrape, w Writer) (err error) {
conn, err := tkr.Pool.Get()
if err != nil {
return err
}
defer conn.Close()
if tkr.cfg.PrivateEnabled { if tkr.cfg.PrivateEnabled {
if _, err = conn.FindUser(scrape.Passkey); err != nil { if _, err = tkr.FindUser(scrape.Passkey); err != nil {
return err return err
} }
} }
var torrents []*models.Torrent var torrents []*models.Torrent
for _, infohash := range scrape.Infohashes { for _, infohash := range scrape.Infohashes {
torrent, err := conn.FindTorrent(infohash) torrent, err := tkr.FindTorrent(infohash)
if err != nil { if err != nil {
return err return err
} }

248
tracker/storage.go Normal file
View file

@ -0,0 +1,248 @@
package tracker
import (
"runtime"
"sync"
"time"
"github.com/chihaya/chihaya/stats"
"github.com/chihaya/chihaya/tracker/models"
)
type Storage struct {
users map[string]*models.User
usersM sync.RWMutex
torrents map[string]*models.Torrent
torrentsM sync.RWMutex
clients map[string]bool
clientsM sync.RWMutex
}
func NewStorage() *Storage {
return &Storage{
users: make(map[string]*models.User),
torrents: make(map[string]*models.Torrent),
clients: make(map[string]bool),
}
}
func (s *Storage) TouchTorrent(infohash string) error {
s.torrentsM.Lock()
defer s.torrentsM.Unlock()
torrent, exists := s.torrents[infohash]
if !exists {
return models.ErrTorrentDNE
}
torrent.LastAction = time.Now().Unix()
return nil
}
func (s *Storage) FindTorrent(infohash string) (*models.Torrent, error) {
s.torrentsM.RLock()
defer s.torrentsM.RUnlock()
torrent, exists := s.torrents[infohash]
if !exists {
return nil, models.ErrTorrentDNE
}
return &*torrent, nil
}
func (s *Storage) PutTorrent(torrent *models.Torrent) {
s.torrentsM.Lock()
defer s.torrentsM.Unlock()
s.torrents[torrent.Infohash] = &*torrent
}
func (s *Storage) DeleteTorrent(infohash string) {
s.torrentsM.Lock()
defer s.torrentsM.Unlock()
delete(s.torrents, infohash)
}
func (s *Storage) IncrementTorrentSnatches(infohash string) error {
s.torrentsM.Lock()
defer s.torrentsM.Unlock()
torrent, exists := s.torrents[infohash]
if !exists {
return models.ErrTorrentDNE
}
torrent.Snatches++
return nil
}
func (s *Storage) PutLeecher(infohash string, p *models.Peer) error {
s.torrentsM.Lock()
defer s.torrentsM.Unlock()
torrent, exists := s.torrents[infohash]
if !exists {
return models.ErrTorrentDNE
}
torrent.Leechers.Put(*p)
return nil
}
func (s *Storage) DeleteLeecher(infohash string, p *models.Peer) error {
s.torrentsM.Lock()
defer s.torrentsM.Unlock()
torrent, exists := s.torrents[infohash]
if !exists {
return models.ErrTorrentDNE
}
torrent.Leechers.Delete(p.Key())
return nil
}
func (s *Storage) PutSeeder(infohash string, p *models.Peer) error {
s.torrentsM.Lock()
defer s.torrentsM.Unlock()
torrent, exists := s.torrents[infohash]
if !exists {
return models.ErrTorrentDNE
}
torrent.Seeders.Put(*p)
return nil
}
func (s *Storage) DeleteSeeder(infohash string, p *models.Peer) error {
s.torrentsM.Lock()
defer s.torrentsM.Unlock()
torrent, exists := s.torrents[infohash]
if !exists {
return models.ErrTorrentDNE
}
torrent.Seeders.Delete(p.Key())
return nil
}
func (s *Storage) PurgeInactiveTorrent(infohash string) error {
s.torrentsM.Lock()
defer s.torrentsM.Unlock()
torrent, exists := s.torrents[infohash]
if !exists {
return models.ErrTorrentDNE
}
if torrent.PeerCount() == 0 {
delete(s.torrents, infohash)
}
return nil
}
func (s *Storage) PurgeInactivePeers(purgeEmptyTorrents bool, before time.Time) error {
unixtime := before.Unix()
// Build a list of keys to process.
s.torrentsM.RLock()
index := 0
keys := make([]string, len(s.torrents))
for infohash := range s.torrents {
keys[index] = infohash
index++
}
s.torrentsM.RUnlock()
// Process the keys while allowing other goroutines to run.
for _, infohash := range keys {
runtime.Gosched()
s.torrentsM.Lock()
torrent := s.torrents[infohash]
if torrent == nil {
// The torrent has already been deleted since keys were computed.
continue
}
torrent.Seeders.Purge(unixtime)
torrent.Leechers.Purge(unixtime)
peers := torrent.PeerCount()
s.torrentsM.Unlock()
if purgeEmptyTorrents && peers == 0 {
s.PurgeInactiveTorrent(infohash)
stats.RecordEvent(stats.ReapedTorrent)
}
}
return nil
}
func (s *Storage) FindUser(passkey string) (*models.User, error) {
s.usersM.RLock()
defer s.usersM.RUnlock()
user, exists := s.users[passkey]
if !exists {
return nil, models.ErrUserDNE
}
return &*user, nil
}
func (s *Storage) PutUser(user *models.User) {
s.usersM.Lock()
defer s.usersM.Unlock()
s.users[user.Passkey] = &*user
}
func (s *Storage) DeleteUser(passkey string) {
s.usersM.Lock()
defer s.usersM.Unlock()
delete(s.users, passkey)
}
func (s *Storage) ClientApproved(peerID string) error {
s.clientsM.RLock()
defer s.clientsM.RUnlock()
_, exists := s.clients[peerID]
if !exists {
return models.ErrClientUnapproved
}
return nil
}
func (s *Storage) PutClient(peerID string) {
s.clientsM.Lock()
defer s.clientsM.Unlock()
s.clients[peerID] = true
}
func (s *Storage) DeleteClient(peerID string) {
s.clientsM.Lock()
defer s.clientsM.Unlock()
delete(s.clients, peerID)
}

View file

@ -20,78 +20,50 @@ import (
// independently of the underlying data transports used. // independently of the underlying data transports used.
type Tracker struct { type Tracker struct {
cfg *config.Config cfg *config.Config
Pool Pool
backend backend.Conn backend backend.Conn
*Storage
} }
// New creates a new Tracker, and opens any necessary connections. // New creates a new Tracker, and opens any necessary connections.
// Maintenance routines are automatically spawned in the background. // Maintenance routines are automatically spawned in the background.
func New(cfg *config.Config) (*Tracker, error) { func New(cfg *config.Config) (*Tracker, error) {
pool, err := Open(&cfg.Tracker)
if err != nil {
return nil, err
}
bc, err := backend.Open(&cfg.Backend) bc, err := backend.Open(&cfg.Backend)
if err != nil { if err != nil {
return nil, err return nil, err
} }
go purgeInactivePeers( tkr := &Tracker{
pool, cfg: cfg,
backend: bc,
Storage: NewStorage(),
}
go tkr.purgeInactivePeers(
cfg.PurgeInactiveTorrents, cfg.PurgeInactiveTorrents,
cfg.Announce.Duration*2, cfg.Announce.Duration*2,
cfg.Announce.Duration, cfg.Announce.Duration,
) )
tkr := &Tracker{
cfg: cfg,
Pool: pool,
backend: bc,
}
if cfg.ClientWhitelistEnabled { if cfg.ClientWhitelistEnabled {
err = tkr.LoadApprovedClients(cfg.ClientWhitelist) tkr.LoadApprovedClients(cfg.ClientWhitelist)
if err != nil {
return nil, err
}
} }
return tkr, nil return tkr, nil
} }
// Close gracefully shutdowns a Tracker by closing any database connections. // Close gracefully shutdowns a Tracker by closing any database connections.
func (tkr *Tracker) Close() (err error) { func (tkr *Tracker) Close() error {
err = tkr.Pool.Close() return tkr.backend.Close()
if err != nil {
return
}
err = tkr.backend.Close()
if err != nil {
return
}
return
} }
// LoadApprovedClients loads a list of client IDs into the tracker's storage. // LoadApprovedClients loads a list of client IDs into the tracker's storage.
func (tkr *Tracker) LoadApprovedClients(clients []string) error { func (tkr *Tracker) LoadApprovedClients(clients []string) {
conn, err := tkr.Pool.Get()
if err != nil {
return err
}
for _, client := range clients { for _, client := range clients {
err = conn.PutClient(client) tkr.PutClient(client)
if err != nil {
return err
} }
} }
return nil
}
// Writer serializes a tracker's responses, and is implemented for each // Writer serializes a tracker's responses, and is implemented for each
// response transport used by the tracker. // response transport used by the tracker.
// //
@ -112,23 +84,14 @@ type Writer interface {
// //
// The default interval is equal to the announce interval, since this is a // The default interval is equal to the announce interval, since this is a
// relatively expensive operation. // relatively expensive operation.
func purgeInactivePeers(p Pool, purgeEmptyTorrents bool, threshold, interval time.Duration) { func (tkr *Tracker) purgeInactivePeers(purgeEmptyTorrents bool, threshold, interval time.Duration) {
for _ = range time.NewTicker(interval).C { for _ = range time.NewTicker(interval).C {
before := time.Now().Add(-threshold) before := time.Now().Add(-threshold)
glog.V(0).Infof("Purging peers with no announces since %s", before) glog.V(0).Infof("Purging peers with no announces since %s", before)
conn, err := p.Get() err := tkr.PurgeInactivePeers(purgeEmptyTorrents, before)
if err != nil {
glog.Error("Unable to get connection for a routine")
continue
}
err = conn.PurgeInactivePeers(purgeEmptyTorrents, before)
if err != nil { if err != nil {
glog.Errorf("Error purging torrents: %s", err) glog.Errorf("Error purging torrents: %s", err)
} }
conn.Close()
} }
} }