From 1d9b2bc32244d762f500768a8829419afca4d4eb Mon Sep 17 00:00:00 2001 From: Jimmy Zelinskie Date: Wed, 13 Aug 2014 17:45:34 -0400 Subject: [PATCH] First step towards removing Tracker Driver. This feature isn't worth maintaining and if anyone needs to scale beyond memory on a single box, we can evaluate it then. --- README.md | 25 ++-- chihaya.go | 1 - http/announce_test.go | 25 +--- http/http_test.go | 1 - http/routes.go | 63 +++------- tracker/announce.go | 68 +++++------ tracker/conn.go | 85 -------------- tracker/memory/conn.go | 244 -------------------------------------- tracker/memory/driver.go | 27 ----- tracker/memory/pool.go | 33 ------ tracker/scrape.go | 11 +- tracker/storage.go | 248 +++++++++++++++++++++++++++++++++++++++ tracker/tracker.go | 69 +++-------- 13 files changed, 324 insertions(+), 576 deletions(-) delete mode 100644 tracker/conn.go delete mode 100644 tracker/memory/conn.go delete mode 100644 tracker/memory/driver.go delete mode 100644 tracker/memory/pool.go create mode 100644 tracker/storage.go diff --git a/README.md b/README.md index a97da2e..a19f098 100644 --- a/README.md +++ b/README.md @@ -6,10 +6,10 @@ Features include: - Low resource consumption, and fast, asynchronous request processing - Full IPv6 support, including handling for dual-stacked peers -- Generic storage interfaces that are easily adapted to work with any database - Full compatibility with what exists of the BitTorrent spec - Extensive metrics for visibility into the tracker and swarm's performance - Ability to group peers in local subnets to reduce backbone contention +- Pluggable backend driver that can coordinate with an external database ## Using Chihaya @@ -59,7 +59,7 @@ $ go test -v ./... -bench . ### Customizing Chihaya -If you require more than the drivers provided out-of-the-box, you are free to create your own and then produce your own custom Chihaya binary. To create this binary, simply create your own main package, import your custom drivers, then call [`chihaya.Boot`] from main. +Chihaya is designed to be extended. If you require more than the drivers provided out-of-the-box, you are free to create your own and then produce your own custom Chihaya binary. To create this binary, simply create your own main package, import your custom drivers, then call [`chihaya.Boot`] from main. [`chihaya.Boot`]: http://godoc.org/github.com/chihaya/chihaya @@ -71,34 +71,27 @@ package main import ( "github.com/chihaya/chihaya" - _ "github.com/yourusername/chihaya-custom-backend" // Import any of your own drivers. + // Import any of your own drivers. + _ "github.com/yourusername/chihaya-custom-backend" ) func main() { - chihaya.Boot() // Start Chihaya normally. + // Start Chihaya normally. + chihaya.Boot() } ``` -#### Tracker Drivers +#### Implementing a Driver -The [`tracker`] package contains 3 interfaces that are heavily inspired by the standard library's [`database/sql`] package. To write a new driver that will provide a storage mechanism for the fast moving data within the tracker, create your own new Go package that has an implementation of the [`tracker.Driver`], [`tracker.Pool`], and [`tracker.Conn`] interfaces. Within that package, you must also define an [`init()`] that calls [`tracker.Register`] registering your new driver. A great place to start is the documentation and source code of the [`memory`] driver to understand thread safety and basic driver design. - -#### Backend Drivers - -The [`backend`] package is meant to provide announce deltas to a slower and more consistent database, such as the one powering a torrent-indexing website. Implementing a backend driver is very similar to implementing a tracker driver: simply create a package that implements the [`backend.Driver`] and [`backend.Conn`] interfaces and calls [`backend.Register`] in it's [`init()`]. Please note that [`backend.Conn`] must be thread-safe. +The [`backend`] package is meant to provide announce deltas to a slower and more consistent database, such as the one powering a torrent-indexing website. Implementing a backend driver is heavily inspired by the standard library's [`database/sql`] package: simply create a package that implements the [`backend.Driver`] and [`backend.Conn`] interfaces and calls [`backend.Register`] in it's [`init()`]. Please note that [`backend.Conn`] must be thread-safe. A great place to start is to read the [`no-op`] driver which comes out-of-the-box with Chihaya and is meant to be used for public trackers. [`init()`]: http://golang.org/ref/spec#Program_execution [`database/sql`]: http://godoc.org/database/sql -[`tracker`]: http://godoc.org/github.com/chihaya/chihaya/tracker -[`tracker.Register`]: http://godoc.org/github.com/chihaya/chihaya/tracker#Register -[`tracker.Driver`]: http://godoc.org/github.com/chihaya/chihaya/tracker#Driver -[`tracker.Pool`]: http://godoc.org/github.com/chihaya/chihaya/tracker#Pool -[`tracker.Conn`]: http://godoc.org/github.com/chihaya/chihaya/tracker#Conn -[`memory`]: http://godoc.org/github.com/chihaya/chihaya/tracker/memory [`backend`]: http://godoc.org/github.com/chihaya/chihaya/backend [`backend.Register`]: http://godoc.org/github.com/chihaya/chihaya/backend#Register [`backend.Driver`]: http://godoc.org/github.com/chihaya/chihaya/backend#Driver [`backend.Conn`]: http://godoc.org/github.com/chihaya/chihaya/backend#Conn +[`no-op`]: http://godoc.org/github.com/chihaya/chihaya/backend/noop ### Contributing diff --git a/chihaya.go b/chihaya.go index 2d5dd2a..f898612 100644 --- a/chihaya.go +++ b/chihaya.go @@ -19,7 +19,6 @@ import ( // See the README for how to import custom drivers. _ "github.com/chihaya/chihaya/backend/noop" - _ "github.com/chihaya/chihaya/tracker/memory" ) var ( diff --git a/http/announce_test.go b/http/announce_test.go index 3c8ea57..ce2bf48 100644 --- a/http/announce_test.go +++ b/http/announce_test.go @@ -131,10 +131,7 @@ func TestPrivateAnnounce(t *testing.T) { t.Fatal(err) } - err = loadPrivateTestData(tkr) - if err != nil { - t.Fatal(err) - } + loadPrivateTestData(tkr) srv, err := createServer(tkr, &cfg) if err != nil { @@ -343,12 +340,7 @@ func checkAnnounce(p params, expected interface{}, srv *httptest.Server, t *test return true } -func loadPrivateTestData(tkr *tracker.Tracker) error { - conn, err := tkr.Pool.Get() - if err != nil { - return err - } - +func loadPrivateTestData(tkr *tracker.Tracker) { users := []string{ "vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv1", "vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv2", @@ -356,20 +348,13 @@ func loadPrivateTestData(tkr *tracker.Tracker) error { } for i, passkey := range users { - err = conn.PutUser(&models.User{ + tkr.PutUser(&models.User{ ID: uint64(i + 1), Passkey: passkey, }) - - if err != nil { - return err - } } - err = conn.PutClient("TR2820") - if err != nil { - return err - } + tkr.PutClient("TR2820") torrent := &models.Torrent{ ID: 1, @@ -378,5 +363,5 @@ func loadPrivateTestData(tkr *tracker.Tracker) error { Leechers: models.NewPeerMap(false), } - return conn.PutTorrent(torrent) + tkr.PutTorrent(torrent) } diff --git a/http/http_test.go b/http/http_test.go index 93ad1fb..3bd8f9d 100644 --- a/http/http_test.go +++ b/http/http_test.go @@ -17,7 +17,6 @@ import ( "github.com/chihaya/chihaya/tracker" _ "github.com/chihaya/chihaya/backend/noop" - _ "github.com/chihaya/chihaya/tracker/memory" ) type params map[string]string diff --git a/http/routes.go b/http/routes.go index 1aa8ce1..dbc0eeb 100644 --- a/http/routes.go +++ b/http/routes.go @@ -71,6 +71,7 @@ func handleTorrentError(err error, w *Writer) (int, error) { stats.RecordEvent(stats.ClientError) return http.StatusOK, nil } + return http.StatusInternalServerError, err } @@ -99,17 +100,12 @@ func (s *Server) serveScrape(w http.ResponseWriter, r *http.Request, p httproute } func (s *Server) getTorrent(w http.ResponseWriter, r *http.Request, p httprouter.Params) (int, error) { - conn, err := s.tracker.Pool.Get() - if err != nil { - return http.StatusInternalServerError, err - } - infohash, err := url.QueryUnescape(p.ByName("infohash")) if err != nil { return http.StatusNotFound, err } - torrent, err := conn.FindTorrent(infohash) + torrent, err := s.tracker.FindTorrent(infohash) if err != nil { return handleError(err) } @@ -131,35 +127,22 @@ func (s *Server) putTorrent(w http.ResponseWriter, r *http.Request, p httprouter return http.StatusBadRequest, err } - conn, err := s.tracker.Pool.Get() - if err != nil { - return http.StatusInternalServerError, err - } - - return handleError(conn.PutTorrent(&torrent)) + s.tracker.PutTorrent(&torrent) + return http.StatusOK, nil } func (s *Server) delTorrent(w http.ResponseWriter, r *http.Request, p httprouter.Params) (int, error) { - conn, err := s.tracker.Pool.Get() - if err != nil { - return http.StatusInternalServerError, err - } - infohash, err := url.QueryUnescape(p.ByName("infohash")) if err != nil { return http.StatusNotFound, err } - return handleError(conn.DeleteTorrent(infohash)) + s.tracker.DeleteTorrent(infohash) + return http.StatusOK, nil } func (s *Server) getUser(w http.ResponseWriter, r *http.Request, p httprouter.Params) (int, error) { - conn, err := s.tracker.Pool.Get() - if err != nil { - return http.StatusInternalServerError, err - } - - user, err := conn.FindUser(p.ByName("passkey")) + user, err := s.tracker.FindUser(p.ByName("passkey")) if err == models.ErrUserDNE { return http.StatusNotFound, err } else if err != nil { @@ -183,37 +166,21 @@ func (s *Server) putUser(w http.ResponseWriter, r *http.Request, p httprouter.Pa return http.StatusBadRequest, err } - conn, err := s.tracker.Pool.Get() - if err != nil { - return http.StatusInternalServerError, err - } - - return handleError(conn.PutUser(&user)) + s.tracker.PutUser(&user) + return http.StatusOK, nil } func (s *Server) delUser(w http.ResponseWriter, r *http.Request, p httprouter.Params) (int, error) { - conn, err := s.tracker.Pool.Get() - if err != nil { - return http.StatusInternalServerError, err - } - - return handleError(conn.DeleteUser(p.ByName("passkey"))) + s.tracker.DeleteUser(p.ByName("passkey")) + return http.StatusOK, nil } func (s *Server) putClient(w http.ResponseWriter, r *http.Request, p httprouter.Params) (int, error) { - conn, err := s.tracker.Pool.Get() - if err != nil { - return http.StatusInternalServerError, err - } - - return handleError(conn.PutClient(p.ByName("clientID"))) + s.tracker.PutClient(p.ByName("clientID")) + return http.StatusOK, nil } func (s *Server) delClient(w http.ResponseWriter, r *http.Request, p httprouter.Params) (int, error) { - conn, err := s.tracker.Pool.Get() - if err != nil { - return http.StatusInternalServerError, err - } - - return handleError(conn.DeleteClient(p.ByName("clientID"))) + s.tracker.DeleteClient(p.ByName("clientID")) + return http.StatusOK, nil } diff --git a/tracker/announce.go b/tracker/announce.go index 80d8fc2..ffc74a3 100644 --- a/tracker/announce.go +++ b/tracker/announce.go @@ -11,28 +11,21 @@ import ( // HandleAnnounce encapsulates all of the logic of handling a BitTorrent // client's Announce without being coupled to any transport protocol. -func (tkr *Tracker) HandleAnnounce(ann *models.Announce, w Writer) error { - conn, err := tkr.Pool.Get() - if err != nil { - return err - } - - defer conn.Close() - +func (tkr *Tracker) HandleAnnounce(ann *models.Announce, w Writer) (err error) { if tkr.cfg.ClientWhitelistEnabled { - if err = conn.FindClient(ann.ClientID()); err != nil { + if err = tkr.ClientApproved(ann.ClientID()); err != nil { return err } } var user *models.User if tkr.cfg.PrivateEnabled { - if user, err = conn.FindUser(ann.Passkey); err != nil { + if user, err = tkr.FindUser(ann.Passkey); err != nil { return err } } - torrent, err := conn.FindTorrent(ann.Infohash) + torrent, err := tkr.FindTorrent(ann.Infohash) if err == models.ErrTorrentDNE && !tkr.cfg.PrivateEnabled { torrent = &models.Torrent{ @@ -41,10 +34,7 @@ func (tkr *Tracker) HandleAnnounce(ann *models.Announce, w Writer) error { Leechers: models.NewPeerMap(false), } - err = conn.PutTorrent(torrent) - if err != nil { - return err - } + tkr.PutTorrent(torrent) stats.RecordEvent(stats.NewTorrent) } else if err != nil { return err @@ -57,12 +47,12 @@ func (tkr *Tracker) HandleAnnounce(ann *models.Announce, w Writer) error { delta = newAnnounceDelta(ann, torrent) } - created, err := updateSwarm(conn, ann) + created, err := tkr.updateSwarm(ann) if err != nil { return err } - snatched, err := handleEvent(conn, ann) + snatched, err := tkr.handleEvent(ann) if err != nil { return err } @@ -76,7 +66,7 @@ func (tkr *Tracker) HandleAnnounce(ann *models.Announce, w Writer) error { } else if tkr.cfg.PurgeInactiveTorrents && torrent.PeerCount() == 0 { // Rather than deleting the torrent explicitly, let the tracker driver // ensure there are no race conditions. - conn.PurgeInactiveTorrent(torrent.Infohash) + tkr.PurgeInactiveTorrent(torrent.Infohash) stats.RecordEvent(stats.DeletedTorrent) } @@ -127,18 +117,18 @@ func newAnnounceDelta(ann *models.Announce, t *models.Torrent) *models.AnnounceD } // updateSwarm handles the changes to a torrent's swarm given an announce. -func updateSwarm(c Conn, ann *models.Announce) (created bool, err error) { +func (tkr *Tracker) updateSwarm(ann *models.Announce) (created bool, err error) { var createdv4, createdv6 bool - c.TouchTorrent(ann.Torrent.Infohash) + tkr.TouchTorrent(ann.Torrent.Infohash) if ann.HasIPv4() { - createdv4, err = updatePeer(c, ann, ann.PeerV4) + createdv4, err = tkr.updatePeer(ann, ann.PeerV4) if err != nil { return } } if ann.HasIPv6() { - createdv6, err = updatePeer(c, ann, ann.PeerV6) + createdv6, err = tkr.updatePeer(ann, ann.PeerV6) if err != nil { return } @@ -147,18 +137,18 @@ func updateSwarm(c Conn, ann *models.Announce) (created bool, err error) { return createdv4 || createdv6, nil } -func updatePeer(c Conn, ann *models.Announce, peer *models.Peer) (created bool, err error) { +func (tkr *Tracker) updatePeer(ann *models.Announce, peer *models.Peer) (created bool, err error) { p, t := ann.Peer, ann.Torrent switch { case t.Seeders.Contains(p.Key()): - err = c.PutSeeder(t.Infohash, p) + err = tkr.PutSeeder(t.Infohash, p) if err != nil { return } case t.Leechers.Contains(p.Key()): - err = c.PutLeecher(t.Infohash, p) + err = tkr.PutLeecher(t.Infohash, p) if err != nil { return } @@ -170,14 +160,14 @@ func updatePeer(c Conn, ann *models.Announce, peer *models.Peer) (created bool, } if ann.Left == 0 { - err = c.PutSeeder(t.Infohash, p) + err = tkr.PutSeeder(t.Infohash, p) if err != nil { return } stats.RecordPeerEvent(stats.NewSeed, p.HasIPv6()) } else { - err = c.PutLeecher(t.Infohash, p) + err = tkr.PutLeecher(t.Infohash, p) if err != nil { return } @@ -190,24 +180,24 @@ func updatePeer(c Conn, ann *models.Announce, peer *models.Peer) (created bool, // handleEvent checks to see whether an announce has an event and if it does, // properly handles that event. -func handleEvent(c Conn, ann *models.Announce) (snatched bool, err error) { +func (tkr *Tracker) handleEvent(ann *models.Announce) (snatched bool, err error) { var snatchedv4, snatchedv6 bool if ann.HasIPv4() { - snatchedv4, err = handlePeerEvent(c, ann, ann.PeerV4) + snatchedv4, err = tkr.handlePeerEvent(ann, ann.PeerV4) if err != nil { return } } if ann.HasIPv6() { - snatchedv6, err = handlePeerEvent(c, ann, ann.PeerV6) + snatchedv6, err = tkr.handlePeerEvent(ann, ann.PeerV6) if err != nil { return } } if snatchedv4 || snatchedv6 { - err = c.IncrementTorrentSnatches(ann.Torrent.Infohash) + err = tkr.IncrementTorrentSnatches(ann.Torrent.Infohash) if err != nil { return } @@ -217,7 +207,7 @@ func handleEvent(c Conn, ann *models.Announce) (snatched bool, err error) { return false, nil } -func handlePeerEvent(c Conn, ann *models.Announce, p *models.Peer) (snatched bool, err error) { +func (tkr *Tracker) handlePeerEvent(ann *models.Announce, p *models.Peer) (snatched bool, err error) { p, t := ann.Peer, ann.Torrent switch { @@ -225,14 +215,14 @@ func handlePeerEvent(c Conn, ann *models.Announce, p *models.Peer) (snatched boo // updateSwarm checks if the peer is active on the torrent, // so one of these branches must be followed. if t.Seeders.Contains(p.Key()) { - err = c.DeleteSeeder(t.Infohash, p) + err = tkr.DeleteSeeder(t.Infohash, p) if err != nil { return } stats.RecordPeerEvent(stats.DeletedSeed, p.HasIPv6()) } else if t.Leechers.Contains(p.Key()) { - err = c.DeleteLeecher(t.Infohash, p) + err = tkr.DeleteLeecher(t.Infohash, p) if err != nil { return } @@ -244,7 +234,7 @@ func handlePeerEvent(c Conn, ann *models.Announce, p *models.Peer) (snatched boo v6seed := t.Seeders.Contains(models.NewPeerKey(p.ID, true)) if t.Leechers.Contains(p.Key()) { - err = leecherFinished(c, t, p) + err = tkr.leecherFinished(t, p) } else { err = models.ErrBadRequest } @@ -257,18 +247,18 @@ func handlePeerEvent(c Conn, ann *models.Announce, p *models.Peer) (snatched boo case t.Leechers.Contains(p.Key()) && ann.Left == 0: // A leecher completed but the event was never received. - err = leecherFinished(c, t, p) + err = tkr.leecherFinished(t, p) } return } // leecherFinished moves a peer from the leeching pool to the seeder pool. -func leecherFinished(c Conn, t *models.Torrent, p *models.Peer) error { - if err := c.DeleteLeecher(t.Infohash, p); err != nil { +func (tkr *Tracker) leecherFinished(t *models.Torrent, p *models.Peer) error { + if err := tkr.DeleteLeecher(t.Infohash, p); err != nil { return err } - if err := c.PutSeeder(t.Infohash, p); err != nil { + if err := tkr.PutSeeder(t.Infohash, p); err != nil { return err } stats.RecordPeerEvent(stats.Completed, p.HasIPv6()) diff --git a/tracker/conn.go b/tracker/conn.go deleted file mode 100644 index a1c7017..0000000 --- a/tracker/conn.go +++ /dev/null @@ -1,85 +0,0 @@ -// Copyright 2014 The Chihaya Authors. All rights reserved. -// Use of this source code is governed by the BSD 2-Clause license, -// which can be found in the LICENSE file. - -package tracker - -import ( - "fmt" - "time" - - "github.com/chihaya/chihaya/config" - "github.com/chihaya/chihaya/tracker/models" -) - -var drivers = make(map[string]Driver) - -// Driver represents an interface to pool of connections to models used for -// the tracker. -type Driver interface { - New(*config.DriverConfig) Pool -} - -// Register makes a database driver available by the provided name. -// If Register is called twice with the same name or if driver is nil, -// it panics. -func Register(name string, driver Driver) { - if driver == nil { - panic("tracker: Register driver is nil") - } - if _, dup := drivers[name]; dup { - panic("tracker: Register called twice for driver " + name) - } - drivers[name] = driver -} - -// Open creates a pool of data store connections specified by a configuration. -func Open(cfg *config.DriverConfig) (Pool, error) { - driver, ok := drivers[cfg.Name] - if !ok { - return nil, fmt.Errorf( - "unknown driver %q (forgotten import?)", - cfg.Name, - ) - } - return driver.New(cfg), nil -} - -// Pool represents a thread-safe pool of connections to the data store -// that can be used to safely within concurrent goroutines. -type Pool interface { - Close() error - Get() (Conn, error) -} - -// Conn represents a connection to the data store that can be used -// to make reads/writes. -type Conn interface { - Close() error - - // Torrent interactions - TouchTorrent(infohash string) error - FindTorrent(infohash string) (*models.Torrent, error) - PutTorrent(t *models.Torrent) error - DeleteTorrent(infohash string) error - IncrementTorrentSnatches(infohash string) error - - PutLeecher(infohash string, p *models.Peer) error - DeleteLeecher(infohash string, p *models.Peer) error - - PutSeeder(infohash string, p *models.Peer) error - DeleteSeeder(infohash string, p *models.Peer) error - - PurgeInactiveTorrent(infohash string) error - PurgeInactivePeers(purgeEmptyTorrents bool, before time.Time) error - - // User interactions - FindUser(passkey string) (*models.User, error) - PutUser(u *models.User) error - DeleteUser(passkey string) error - - // Whitelist interactions - FindClient(clientID string) error - PutClient(clientID string) error - DeleteClient(clientID string) error -} diff --git a/tracker/memory/conn.go b/tracker/memory/conn.go deleted file mode 100644 index 229c87a..0000000 --- a/tracker/memory/conn.go +++ /dev/null @@ -1,244 +0,0 @@ -// Copyright 2014 The Chihaya Authors. All rights reserved. -// Use of this source code is governed by the BSD 2-Clause license, -// which can be found in the LICENSE file. - -package memory - -import ( - "runtime" - "time" - - "github.com/chihaya/chihaya/stats" - "github.com/chihaya/chihaya/tracker/models" -) - -// Conn implements a connection to a memory-based tracker data store. -type Conn struct { - *Pool -} - -func (c *Conn) Close() error { - return nil -} - -func (c *Conn) FindUser(passkey string) (*models.User, error) { - c.usersM.RLock() - defer c.usersM.RUnlock() - - user, exists := c.users[passkey] - if !exists { - return nil, models.ErrUserDNE - } - return &*user, nil -} - -func (c *Conn) FindTorrent(infohash string) (*models.Torrent, error) { - c.torrentsM.RLock() - defer c.torrentsM.RUnlock() - - torrent, exists := c.torrents[infohash] - if !exists { - return nil, models.ErrTorrentDNE - } - return &*torrent, nil -} - -func (c *Conn) FindClient(peerID string) error { - c.whitelistM.RLock() - defer c.whitelistM.RUnlock() - - _, ok := c.whitelist[peerID] - if !ok { - return models.ErrClientUnapproved - } - return nil -} - -func (c *Conn) IncrementTorrentSnatches(infohash string) error { - c.torrentsM.Lock() - defer c.torrentsM.Unlock() - - t, exists := c.torrents[infohash] - if !exists { - return models.ErrTorrentDNE - } - t.Snatches++ - - return nil -} - -func (c *Conn) TouchTorrent(infohash string) error { - c.torrentsM.Lock() - defer c.torrentsM.Unlock() - - t, ok := c.torrents[infohash] - if !ok { - return models.ErrTorrentDNE - } - t.LastAction = time.Now().Unix() - - return nil -} - -func (c *Conn) DeleteLeecher(infohash string, p *models.Peer) error { - c.torrentsM.Lock() - defer c.torrentsM.Unlock() - - t, ok := c.torrents[infohash] - if !ok { - return models.ErrTorrentDNE - } - t.Leechers.Delete(p.Key()) - - return nil -} - -func (c *Conn) DeleteSeeder(infohash string, p *models.Peer) error { - c.torrentsM.Lock() - defer c.torrentsM.Unlock() - - t, ok := c.torrents[infohash] - if !ok { - return models.ErrTorrentDNE - } - t.Seeders.Delete(p.Key()) - - return nil -} - -func (c *Conn) PutLeecher(infohash string, p *models.Peer) error { - c.torrentsM.Lock() - defer c.torrentsM.Unlock() - - t, ok := c.torrents[infohash] - if !ok { - return models.ErrTorrentDNE - } - t.Leechers.Put(*p) - - return nil -} - -func (c *Conn) PutSeeder(infohash string, p *models.Peer) error { - c.torrentsM.Lock() - defer c.torrentsM.Unlock() - - t, ok := c.torrents[infohash] - if !ok { - return models.ErrTorrentDNE - } - t.Seeders.Put(*p) - - return nil -} - -func (c *Conn) PutTorrent(t *models.Torrent) error { - c.torrentsM.Lock() - defer c.torrentsM.Unlock() - - c.torrents[t.Infohash] = &*t - - return nil -} - -func (c *Conn) DeleteTorrent(infohash string) error { - c.torrentsM.Lock() - defer c.torrentsM.Unlock() - - delete(c.torrents, infohash) - - return nil -} - -func (c *Conn) PurgeInactiveTorrent(infohash string) error { - c.torrentsM.Lock() - defer c.torrentsM.Unlock() - - torrent, exists := c.torrents[infohash] - if !exists { - return models.ErrTorrentDNE - } - - if torrent.PeerCount() == 0 { - delete(c.torrents, infohash) - } - - return nil -} - -func (c *Conn) PutUser(u *models.User) error { - c.usersM.Lock() - defer c.usersM.Unlock() - - c.users[u.Passkey] = &*u - - return nil -} - -func (c *Conn) DeleteUser(passkey string) error { - c.usersM.Lock() - defer c.usersM.Unlock() - - delete(c.users, passkey) - - return nil -} - -func (c *Conn) PutClient(peerID string) error { - c.whitelistM.Lock() - defer c.whitelistM.Unlock() - - c.whitelist[peerID] = true - - return nil -} - -func (c *Conn) DeleteClient(peerID string) error { - c.whitelistM.Lock() - defer c.whitelistM.Unlock() - - delete(c.whitelist, peerID) - - return nil -} - -func (c *Conn) PurgeInactivePeers(purgeEmptyTorrents bool, before time.Time) error { - unixtime := before.Unix() - - // Build array of map keys to operate on. - c.torrentsM.RLock() - index := 0 - keys := make([]string, len(c.torrents)) - - for infohash := range c.torrents { - keys[index] = infohash - index++ - } - - c.torrentsM.RUnlock() - - // Process keys. - for _, infohash := range keys { - runtime.Gosched() // Let other goroutines run, since this is low priority. - - c.torrentsM.Lock() - torrent := c.torrents[infohash] - - if torrent == nil { - continue // Torrent deleted since keys were computed. - } - - torrent.Seeders.Purge(unixtime) - torrent.Leechers.Purge(unixtime) - - peers := torrent.PeerCount() - c.torrentsM.Unlock() - - if purgeEmptyTorrents && peers == 0 { - c.PurgeInactiveTorrent(infohash) - stats.RecordEvent(stats.ReapedTorrent) - } - } - - return nil -} diff --git a/tracker/memory/driver.go b/tracker/memory/driver.go deleted file mode 100644 index 60454a9..0000000 --- a/tracker/memory/driver.go +++ /dev/null @@ -1,27 +0,0 @@ -// Copyright 2014 The Chihaya Authors. All rights reserved. -// Use of this source code is governed by the BSD 2-Clause license, -// which can be found in the LICENSE file. - -// Package memory implements a Chihaya tracker storage driver within memory. -// Stored values will not persist if the tracker is restarted. -package memory - -import ( - "github.com/chihaya/chihaya/config" - "github.com/chihaya/chihaya/tracker" - "github.com/chihaya/chihaya/tracker/models" -) - -type driver struct{} - -func (d *driver) New(cfg *config.DriverConfig) tracker.Pool { - return &Pool{ - users: make(map[string]*models.User), - torrents: make(map[string]*models.Torrent), - whitelist: make(map[string]bool), - } -} - -func init() { - tracker.Register("memory", &driver{}) -} diff --git a/tracker/memory/pool.go b/tracker/memory/pool.go deleted file mode 100644 index 1fb1c3a..0000000 --- a/tracker/memory/pool.go +++ /dev/null @@ -1,33 +0,0 @@ -// Copyright 2014 The Chihaya Authors. All rights reserved. -// Use of this source code is governed by the BSD 2-Clause license, -// which can be found in the LICENSE file. - -package memory - -import ( - "sync" - - "github.com/chihaya/chihaya/tracker" - "github.com/chihaya/chihaya/tracker/models" -) - -type Pool struct { - users map[string]*models.User - usersM sync.RWMutex - - torrents map[string]*models.Torrent - torrentsM sync.RWMutex - - whitelist map[string]bool - whitelistM sync.RWMutex -} - -func (p *Pool) Get() (tracker.Conn, error) { - return &Conn{ - Pool: p, - }, nil -} - -func (p *Pool) Close() error { - return nil -} diff --git a/tracker/scrape.go b/tracker/scrape.go index e1980bb..4fee8c2 100644 --- a/tracker/scrape.go +++ b/tracker/scrape.go @@ -9,22 +9,15 @@ import "github.com/chihaya/chihaya/tracker/models" // HandleScrape encapsulates all the logic of handling a BitTorrent client's // scrape without being coupled to any transport protocol. func (tkr *Tracker) HandleScrape(scrape *models.Scrape, w Writer) (err error) { - conn, err := tkr.Pool.Get() - if err != nil { - return err - } - - defer conn.Close() - if tkr.cfg.PrivateEnabled { - if _, err = conn.FindUser(scrape.Passkey); err != nil { + if _, err = tkr.FindUser(scrape.Passkey); err != nil { return err } } var torrents []*models.Torrent for _, infohash := range scrape.Infohashes { - torrent, err := conn.FindTorrent(infohash) + torrent, err := tkr.FindTorrent(infohash) if err != nil { return err } diff --git a/tracker/storage.go b/tracker/storage.go new file mode 100644 index 0000000..fe38424 --- /dev/null +++ b/tracker/storage.go @@ -0,0 +1,248 @@ +package tracker + +import ( + "runtime" + "sync" + "time" + + "github.com/chihaya/chihaya/stats" + "github.com/chihaya/chihaya/tracker/models" +) + +type Storage struct { + users map[string]*models.User + usersM sync.RWMutex + + torrents map[string]*models.Torrent + torrentsM sync.RWMutex + + clients map[string]bool + clientsM sync.RWMutex +} + +func NewStorage() *Storage { + return &Storage{ + users: make(map[string]*models.User), + torrents: make(map[string]*models.Torrent), + clients: make(map[string]bool), + } +} + +func (s *Storage) TouchTorrent(infohash string) error { + s.torrentsM.Lock() + defer s.torrentsM.Unlock() + + torrent, exists := s.torrents[infohash] + if !exists { + return models.ErrTorrentDNE + } + + torrent.LastAction = time.Now().Unix() + + return nil +} + +func (s *Storage) FindTorrent(infohash string) (*models.Torrent, error) { + s.torrentsM.RLock() + defer s.torrentsM.RUnlock() + + torrent, exists := s.torrents[infohash] + if !exists { + return nil, models.ErrTorrentDNE + } + + return &*torrent, nil +} + +func (s *Storage) PutTorrent(torrent *models.Torrent) { + s.torrentsM.Lock() + defer s.torrentsM.Unlock() + + s.torrents[torrent.Infohash] = &*torrent +} + +func (s *Storage) DeleteTorrent(infohash string) { + s.torrentsM.Lock() + defer s.torrentsM.Unlock() + + delete(s.torrents, infohash) +} + +func (s *Storage) IncrementTorrentSnatches(infohash string) error { + s.torrentsM.Lock() + defer s.torrentsM.Unlock() + + torrent, exists := s.torrents[infohash] + if !exists { + return models.ErrTorrentDNE + } + + torrent.Snatches++ + + return nil +} + +func (s *Storage) PutLeecher(infohash string, p *models.Peer) error { + s.torrentsM.Lock() + defer s.torrentsM.Unlock() + + torrent, exists := s.torrents[infohash] + if !exists { + return models.ErrTorrentDNE + } + + torrent.Leechers.Put(*p) + + return nil +} + +func (s *Storage) DeleteLeecher(infohash string, p *models.Peer) error { + s.torrentsM.Lock() + defer s.torrentsM.Unlock() + + torrent, exists := s.torrents[infohash] + if !exists { + return models.ErrTorrentDNE + } + + torrent.Leechers.Delete(p.Key()) + + return nil +} + +func (s *Storage) PutSeeder(infohash string, p *models.Peer) error { + s.torrentsM.Lock() + defer s.torrentsM.Unlock() + + torrent, exists := s.torrents[infohash] + if !exists { + return models.ErrTorrentDNE + } + + torrent.Seeders.Put(*p) + + return nil +} + +func (s *Storage) DeleteSeeder(infohash string, p *models.Peer) error { + s.torrentsM.Lock() + defer s.torrentsM.Unlock() + + torrent, exists := s.torrents[infohash] + if !exists { + return models.ErrTorrentDNE + } + + torrent.Seeders.Delete(p.Key()) + + return nil +} + +func (s *Storage) PurgeInactiveTorrent(infohash string) error { + s.torrentsM.Lock() + defer s.torrentsM.Unlock() + + torrent, exists := s.torrents[infohash] + if !exists { + return models.ErrTorrentDNE + } + + if torrent.PeerCount() == 0 { + delete(s.torrents, infohash) + } + + return nil +} + +func (s *Storage) PurgeInactivePeers(purgeEmptyTorrents bool, before time.Time) error { + unixtime := before.Unix() + + // Build a list of keys to process. + s.torrentsM.RLock() + index := 0 + keys := make([]string, len(s.torrents)) + + for infohash := range s.torrents { + keys[index] = infohash + index++ + } + s.torrentsM.RUnlock() + + // Process the keys while allowing other goroutines to run. + for _, infohash := range keys { + runtime.Gosched() + + s.torrentsM.Lock() + torrent := s.torrents[infohash] + + if torrent == nil { + // The torrent has already been deleted since keys were computed. + continue + } + + torrent.Seeders.Purge(unixtime) + torrent.Leechers.Purge(unixtime) + + peers := torrent.PeerCount() + s.torrentsM.Unlock() + + if purgeEmptyTorrents && peers == 0 { + s.PurgeInactiveTorrent(infohash) + stats.RecordEvent(stats.ReapedTorrent) + } + } + + return nil +} + +func (s *Storage) FindUser(passkey string) (*models.User, error) { + s.usersM.RLock() + defer s.usersM.RUnlock() + + user, exists := s.users[passkey] + if !exists { + return nil, models.ErrUserDNE + } + + return &*user, nil +} + +func (s *Storage) PutUser(user *models.User) { + s.usersM.Lock() + defer s.usersM.Unlock() + + s.users[user.Passkey] = &*user +} + +func (s *Storage) DeleteUser(passkey string) { + s.usersM.Lock() + defer s.usersM.Unlock() + + delete(s.users, passkey) +} + +func (s *Storage) ClientApproved(peerID string) error { + s.clientsM.RLock() + defer s.clientsM.RUnlock() + + _, exists := s.clients[peerID] + if !exists { + return models.ErrClientUnapproved + } + + return nil +} + +func (s *Storage) PutClient(peerID string) { + s.clientsM.Lock() + defer s.clientsM.Unlock() + + s.clients[peerID] = true +} + +func (s *Storage) DeleteClient(peerID string) { + s.clientsM.Lock() + defer s.clientsM.Unlock() + + delete(s.clients, peerID) +} diff --git a/tracker/tracker.go b/tracker/tracker.go index 17a346e..4d2ae71 100644 --- a/tracker/tracker.go +++ b/tracker/tracker.go @@ -20,76 +20,48 @@ import ( // independently of the underlying data transports used. type Tracker struct { cfg *config.Config - Pool Pool backend backend.Conn + + *Storage } // New creates a new Tracker, and opens any necessary connections. // Maintenance routines are automatically spawned in the background. func New(cfg *config.Config) (*Tracker, error) { - pool, err := Open(&cfg.Tracker) - if err != nil { - return nil, err - } - bc, err := backend.Open(&cfg.Backend) if err != nil { return nil, err } - go purgeInactivePeers( - pool, + tkr := &Tracker{ + cfg: cfg, + backend: bc, + Storage: NewStorage(), + } + + go tkr.purgeInactivePeers( cfg.PurgeInactiveTorrents, cfg.Announce.Duration*2, cfg.Announce.Duration, ) - tkr := &Tracker{ - cfg: cfg, - Pool: pool, - backend: bc, - } - if cfg.ClientWhitelistEnabled { - err = tkr.LoadApprovedClients(cfg.ClientWhitelist) - if err != nil { - return nil, err - } + tkr.LoadApprovedClients(cfg.ClientWhitelist) } return tkr, nil } // Close gracefully shutdowns a Tracker by closing any database connections. -func (tkr *Tracker) Close() (err error) { - err = tkr.Pool.Close() - if err != nil { - return - } - - err = tkr.backend.Close() - if err != nil { - return - } - - return +func (tkr *Tracker) Close() error { + return tkr.backend.Close() } // LoadApprovedClients loads a list of client IDs into the tracker's storage. -func (tkr *Tracker) LoadApprovedClients(clients []string) error { - conn, err := tkr.Pool.Get() - if err != nil { - return err - } - +func (tkr *Tracker) LoadApprovedClients(clients []string) { for _, client := range clients { - err = conn.PutClient(client) - if err != nil { - return err - } + tkr.PutClient(client) } - - return nil } // Writer serializes a tracker's responses, and is implemented for each @@ -112,23 +84,14 @@ type Writer interface { // // The default interval is equal to the announce interval, since this is a // relatively expensive operation. -func purgeInactivePeers(p Pool, purgeEmptyTorrents bool, threshold, interval time.Duration) { +func (tkr *Tracker) purgeInactivePeers(purgeEmptyTorrents bool, threshold, interval time.Duration) { for _ = range time.NewTicker(interval).C { before := time.Now().Add(-threshold) glog.V(0).Infof("Purging peers with no announces since %s", before) - conn, err := p.Get() - - if err != nil { - glog.Error("Unable to get connection for a routine") - continue - } - - err = conn.PurgeInactivePeers(purgeEmptyTorrents, before) + err := tkr.PurgeInactivePeers(purgeEmptyTorrents, before) if err != nil { glog.Errorf("Error purging torrents: %s", err) } - - conn.Close() } }