diff --git a/.travis.yml b/.travis.yml index b776279..f0d08fd 100644 --- a/.travis.yml +++ b/.travis.yml @@ -2,10 +2,6 @@ language: go go: 1.1.2 -before_install: - - sudo apt-get install libzmq3-dev - - go get -tags zmq_3_x github.com/alecthomas/gozmq - services: - redis-server diff --git a/README.md b/README.md index 102e6d5..9a8d0f2 100644 --- a/README.md +++ b/README.md @@ -8,30 +8,25 @@ in production. Some of the planned features include. - *Fast* request processing - Maximum compatibility with what exists of the BitTorrent spec - Correct IPv6 support -- A generic storage interface that is easily adapted to use any data store -- Scaling properties that directly correlate with those of the chosen data store +- A generic storage interfaces that is easily adapted to use any data store and web application +- Scaling properties that directly correlate with those of the chosen data stores ## Architecture You are most likely looking to integrate Chihaya with a web application for organizing torrents and managing a community. Chihaya was designed with this in mind, but also tries to remain -independent. Chihaya has its own data store that needs to be bootstrapped with data from your -web application. ZeroMQ is used to publish changes to this data. Your web application must -subscribe to this stream, collect these changes, and apply them (usually in a batch fashion). -The only caveat to this architecture is that when a torrent is added or deleted your web -application needs to update both its own data store and Chihaya's. +independent. Chihaya connects to two data stores. The first, known as "cache", is used between +Chihaya processes in order to keep up with fast changing data. The second, known as "storage", +is your web application's data store. Changes immediately take place in the cache, which is why +fast data stores are recommended. These changes are also collected and periodically applied to the +storage in order to avoid locking up your web application's data store. ## Installing Make sure you have your $GOROOT and $GOPATH set up correctly and have your $GOBIN on your $PATH. -You'll also need to install ZeroMQ with your favourite package manager. Next, you'll need to -"go get" the correct version of the gozmq library that corresponds to your system's version. -For example, these are the steps you'd use to install on Ubuntu 12.04 LTS: ```sh -$ sudo apt-get install libzmq-dev -$ go get -tags zmq_2_1 github.com/alecthomas/gozmq $ go install github.com/pushrax/chihaya ``` @@ -41,41 +36,53 @@ Configuration is done in a JSON formatted file specified with the `-config` flag. An example configuration can be seen in the `exampleConfig` variable of [`config/config_test.go`](https://github.com/pushrax/chihaya/blob/master/config/config_test.go). -## Default storage drivers +## Default drivers -Chihaya currently supports the following data stores out of the box: +### Cache + +Chihaya currently supports drivers for the following caches out of the box: * [redis](http://redis.io) -## Custom storage drivers +### Storage -The [`storage`] package is heavily inspired by the standard library's -[`database/sql`] package. To write a new storage backend, create a new Go -package that has an implementation of the [`Pool`], [`Tx`], and [`Driver`] -interfaces. Within that package, you must also define an [`init()`] that calls -[`storage.Register`]. +Chihaya currently supports drivers for the following storages out of the box: -[`storage`]: http://godoc.org/github.com/pushrax/chihaya/storage -[`database/sql`]: http://godoc.org/database/sql -[`Pool`]: http://godoc.org/github.com/pushrax/chihaya/storage#Pool -[`Tx`]: http://godoc.org/github.com/pushrax/chihaya/storage#Tx -[`Driver`]: http://godoc.org/github.com/pushrax/chihaya/storage#Driver -[`init()`]: http://golang.org/ref/spec#Program_execution -[`storage.Register`]: http://godoc.org/github.com/pushrax/chihaya/storage#Register +* [batter-postgres](https://github.com/wafflesfm/batter) + +## Custom drivers Please read the documentation and understand these interfaces as there are assumptions made about thread-safety. After you've implemented a new driver, -all you have to do is remember to add `import _ path/to/your/library` to the -top of any file in your project (preferably `main.go`) and the side effects from -`func init()` will globally register your driver so that config package will recognize -your driver by name. If you're writing a driver for a popular data store, consider -contributing it. +all you have to do is remember to add `import _ path/to/your/package` to the +top of `main.go` and the side effects from `init()` will globally register +your driver so that config package will recognize your driver by name. +If you're writing a driver for a popular data store, consider contributing it. +### Cache + +The [`cache`] package is heavily inspired by the standard library's +[`database/sql`] package. To write a new cache backend, create a new Go +package that has an implementation of the [`Pool`], [`Tx`], and [`Driver`] +interfaces. Within that package, you must also define an [`init()`] that calls +[`cache.Register`]. + +[`cache`]: http://godoc.org/github.com/pushrax/chihaya/cache +[`database/sql`]: http://godoc.org/database/sql +[`Pool`]: http://godoc.org/github.com/pushrax/chihaya/cache#Pool +[`Tx`]: http://godoc.org/github.com/pushrax/chihaya/cache#Tx +[`Driver`]: http://godoc.org/github.com/pushrax/chihaya/cache#Driver +[`init()`]: http://golang.org/ref/spec#Program_execution +[`cache.Register`]: http://godoc.org/github.com/pushrax/chihaya/cache#Register + +### Storage + +TODO ## Contributing If you're interested in contributing, please contact us in **[#chihaya] on -[freenode]** or post to the GitHub issue tracker. Please don't offer +[freenode IRC]** or post to the GitHub issue tracker. Please don't offer massive pull requests with no prior communication attempts as it will most likely lead to confusion and time wasted for everyone. However, small unannounced fixes are always welcome. diff --git a/storage/storage.go b/cache/cache.go similarity index 68% rename from storage/storage.go rename to cache/cache.go index 0cc6696..ed11ab3 100644 --- a/storage/storage.go +++ b/cache/cache.go @@ -3,14 +3,15 @@ // which can be found in the LICENSE file. // Package storage provides a generic interface for manipulating a -// BitTorrent tracker's data store. -package storage +// BitTorrent tracker's cache. +package cache import ( "errors" "fmt" "github.com/pushrax/chihaya/config" + "github.com/pushrax/chihaya/models" ) var ( @@ -19,7 +20,7 @@ var ( ) type Driver interface { - New(*config.Storage) Pool + New(*config.Cache) Pool } // Register makes a database driver available by the provided name. @@ -36,7 +37,7 @@ func Register(name string, driver Driver) { } // Open creates a pool of data store connections specified by a storage configuration. -func Open(conf *config.Storage) (Pool, error) { +func Open(conf *config.Cache) (Pool, error) { driver, ok := drivers[conf.Driver] if !ok { return nil, fmt.Errorf( @@ -65,19 +66,19 @@ type Tx interface { Rollback() error // Reads - FindUser(passkey string) (*User, bool, error) - FindTorrent(infohash string) (*Torrent, bool, error) + FindUser(passkey string) (*models.User, bool, error) + FindTorrent(infohash string) (*models.Torrent, bool, error) ClientWhitelisted(peerID string) (bool, error) // Writes - RecordSnatch(u *User, t *Torrent) error - MarkActive(t *Torrent) error - AddLeecher(t *Torrent, p *Peer) error - AddSeeder(t *Torrent, p *Peer) error - RemoveLeecher(t *Torrent, p *Peer) error - RemoveSeeder(t *Torrent, p *Peer) error - SetLeecher(t *Torrent, p *Peer) error - SetSeeder(t *Torrent, p *Peer) error - IncrementSlots(u *User) error - DecrementSlots(u *User) error + RecordSnatch(u *models.User, t *models.Torrent) error + MarkActive(t *models.Torrent) error + AddLeecher(t *models.Torrent, p *models.Peer) error + AddSeeder(t *models.Torrent, p *models.Peer) error + RemoveLeecher(t *models.Torrent, p *models.Peer) error + RemoveSeeder(t *models.Torrent, p *models.Peer) error + SetLeecher(t *models.Torrent, p *models.Peer) error + SetSeeder(t *models.Torrent, p *models.Peer) error + IncrementSlots(u *models.User) error + DecrementSlots(u *models.User) error } diff --git a/storage/redis/redis.go b/cache/redis/redis.go similarity index 79% rename from storage/redis/redis.go rename to cache/redis/redis.go index be471c7..49bdded 100644 --- a/storage/redis/redis.go +++ b/cache/redis/redis.go @@ -17,13 +17,14 @@ import ( "github.com/garyburd/redigo/redis" + "github.com/pushrax/chihaya/cache" "github.com/pushrax/chihaya/config" - "github.com/pushrax/chihaya/storage" + "github.com/pushrax/chihaya/models" ) type driver struct{} -func (d *driver) New(conf *config.Storage) storage.Pool { +func (d *driver) New(conf *config.Cache) cache.Pool { return &Pool{ conf: conf, pool: redis.Pool{ @@ -35,7 +36,7 @@ func (d *driver) New(conf *config.Storage) storage.Pool { } } -func makeDialFunc(conf *config.Storage) func() (redis.Conn, error) { +func makeDialFunc(conf *config.Cache) func() (redis.Conn, error) { return func() (conn redis.Conn, err error) { if conf.ConnTimeout != nil { conn, err = redis.DialTimeout( @@ -61,7 +62,7 @@ func testOnBorrow(c redis.Conn, t time.Time) error { } type Pool struct { - conf *config.Storage + conf *config.Cache pool redis.Pool } @@ -69,7 +70,7 @@ func (p *Pool) Close() error { return p.pool.Close() } -func (p *Pool) Get() (storage.Tx, error) { +func (p *Pool) Get() (cache.Tx, error) { return &Tx{ conf: p.conf, done: false, @@ -93,7 +94,7 @@ func (p *Pool) Get() (storage.Tx, error) { // SET keyB // EXEC type Tx struct { - conf *config.Storage + conf *config.Cache done bool multi bool redis.Conn @@ -109,7 +110,7 @@ func (tx *Tx) close() { func (tx *Tx) initiateWrite() error { if tx.done { - return storage.ErrTxDone + return cache.ErrTxDone } if tx.multi != true { return tx.Send("MULTI") @@ -119,7 +120,7 @@ func (tx *Tx) initiateWrite() error { func (tx *Tx) initiateRead() error { if tx.done { - return storage.ErrTxDone + return cache.ErrTxDone } if tx.multi == true { panic("Tried to read during MULTI") @@ -129,7 +130,7 @@ func (tx *Tx) initiateRead() error { func (tx *Tx) Commit() error { if tx.done { - return storage.ErrTxDone + return cache.ErrTxDone } if tx.multi == true { _, err := tx.Do("EXEC") @@ -143,14 +144,14 @@ func (tx *Tx) Commit() error { func (tx *Tx) Rollback() error { if tx.done { - return storage.ErrTxDone + return cache.ErrTxDone } // Redis doesn't need to do anything. Exec is atomic. tx.close() return nil } -func (tx *Tx) FindUser(passkey string) (*storage.User, bool, error) { +func (tx *Tx) FindUser(passkey string) (*models.User, bool, error) { err := tx.initiateRead() if err != nil { return nil, false, err @@ -169,7 +170,7 @@ func (tx *Tx) FindUser(passkey string) (*storage.User, bool, error) { return nil, false, err } - user := &storage.User{} + user := &models.User{} err = json.NewDecoder(strings.NewReader(reply)).Decode(user) if err != nil { return nil, true, err @@ -177,7 +178,7 @@ func (tx *Tx) FindUser(passkey string) (*storage.User, bool, error) { return user, true, nil } -func (tx *Tx) FindTorrent(infohash string) (*storage.Torrent, bool, error) { +func (tx *Tx) FindTorrent(infohash string) (*models.Torrent, bool, error) { err := tx.initiateRead() if err != nil { return nil, false, err @@ -196,7 +197,7 @@ func (tx *Tx) FindTorrent(infohash string) (*storage.Torrent, bool, error) { return nil, false, err } - torrent := &storage.Torrent{} + torrent := &models.Torrent{} err = json.NewDecoder(strings.NewReader(reply)).Decode(torrent) if err != nil { return nil, true, err @@ -220,7 +221,7 @@ func (tx *Tx) ClientWhitelisted(peerID string) (exists bool, err error) { return } -func (tx *Tx) RecordSnatch(user *storage.User, torrent *storage.Torrent) error { +func (tx *Tx) RecordSnatch(user *models.User, torrent *models.Torrent) error { if err := tx.initiateWrite(); err != nil { return err } @@ -229,7 +230,7 @@ func (tx *Tx) RecordSnatch(user *storage.User, torrent *storage.Torrent) error { return nil } -func (tx *Tx) MarkActive(t *storage.Torrent) error { +func (tx *Tx) MarkActive(t *models.Torrent) error { if err := tx.initiateWrite(); err != nil { return err } @@ -238,7 +239,7 @@ func (tx *Tx) MarkActive(t *storage.Torrent) error { return nil } -func (tx *Tx) AddLeecher(t *storage.Torrent, p *storage.Peer) error { +func (tx *Tx) AddLeecher(t *models.Torrent, p *models.Peer) error { if err := tx.initiateWrite(); err != nil { return err } @@ -247,7 +248,7 @@ func (tx *Tx) AddLeecher(t *storage.Torrent, p *storage.Peer) error { return nil } -func (tx *Tx) SetLeecher(t *storage.Torrent, p *storage.Peer) error { +func (tx *Tx) SetLeecher(t *models.Torrent, p *models.Peer) error { if err := tx.initiateWrite(); err != nil { return err } @@ -256,7 +257,7 @@ func (tx *Tx) SetLeecher(t *storage.Torrent, p *storage.Peer) error { return nil } -func (tx *Tx) RemoveLeecher(t *storage.Torrent, p *storage.Peer) error { +func (tx *Tx) RemoveLeecher(t *models.Torrent, p *models.Peer) error { if err := tx.initiateWrite(); err != nil { return err } @@ -265,7 +266,7 @@ func (tx *Tx) RemoveLeecher(t *storage.Torrent, p *storage.Peer) error { return nil } -func (tx *Tx) AddSeeder(t *storage.Torrent, p *storage.Peer) error { +func (tx *Tx) AddSeeder(t *models.Torrent, p *models.Peer) error { if err := tx.initiateWrite(); err != nil { return err } @@ -274,7 +275,7 @@ func (tx *Tx) AddSeeder(t *storage.Torrent, p *storage.Peer) error { return nil } -func (tx *Tx) SetSeeder(t *storage.Torrent, p *storage.Peer) error { +func (tx *Tx) SetSeeder(t *models.Torrent, p *models.Peer) error { if err := tx.initiateWrite(); err != nil { return err } @@ -283,7 +284,7 @@ func (tx *Tx) SetSeeder(t *storage.Torrent, p *storage.Peer) error { return nil } -func (tx *Tx) RemoveSeeder(t *storage.Torrent, p *storage.Peer) error { +func (tx *Tx) RemoveSeeder(t *models.Torrent, p *models.Peer) error { if err := tx.initiateWrite(); err != nil { return err } @@ -292,7 +293,7 @@ func (tx *Tx) RemoveSeeder(t *storage.Torrent, p *storage.Peer) error { return nil } -func (tx *Tx) IncrementSlots(u *storage.User) error { +func (tx *Tx) IncrementSlots(u *models.User) error { if err := tx.initiateWrite(); err != nil { return err } @@ -301,7 +302,7 @@ func (tx *Tx) IncrementSlots(u *storage.User) error { return nil } -func (tx *Tx) DecrementSlots(u *storage.User) error { +func (tx *Tx) DecrementSlots(u *models.User) error { if err := tx.initiateWrite(); err != nil { return err } @@ -311,5 +312,5 @@ func (tx *Tx) DecrementSlots(u *storage.User) error { } func init() { - storage.Register("redis", &driver{}) + cache.Register("redis", &driver{}) } diff --git a/config/config.go b/config/config.go index a762807..3f65876 100644 --- a/config/config.go +++ b/config/config.go @@ -27,8 +27,8 @@ func (d *Duration) UnmarshalJSON(b []byte) error { return err } -// Storage represents the configuration for any storage.DS. -type Storage struct { +// Cache represents the configuration for any data store used as a cache. +type Cache struct { Driver string `json:"driver"` Network string `json:"network` Addr string `json:"addr"` @@ -46,9 +46,9 @@ type Storage struct { // Config represents a configuration for a server.Server. type Config struct { - Addr string `json:"addr"` - PubAddr string `json:"pub_addr"` - Storage Storage `json:"storage"` + Addr string `json:"addr"` + PubAddr string `json:"pub_addr"` + Cache Cache `json:"cache"` Private bool `json:"private"` Freeleech bool `json:"freeleech"` diff --git a/config/config_test.go b/config/config_test.go index d969ebd..99a85d1 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -17,7 +17,7 @@ var exampleJson = `{ "network": "tcp", "addr": ":34000", "pub_addr": "tcp://*:34001", - "storage": { + "cache": { "driver": "redis", "addr": "127.0.0.1:6379", "user": "root", diff --git a/main.go b/main.go index 6511dae..f1ed81d 100644 --- a/main.go +++ b/main.go @@ -12,9 +12,9 @@ import ( "runtime" "runtime/pprof" + _ "github.com/pushrax/chihaya/cache/redis" "github.com/pushrax/chihaya/config" "github.com/pushrax/chihaya/server" - _ "github.com/pushrax/chihaya/storage/redis" ) var ( diff --git a/storage/data.go b/models/models.go similarity index 98% rename from storage/data.go rename to models/models.go index ff4d1c7..87afcba 100644 --- a/storage/data.go +++ b/models/models.go @@ -2,7 +2,7 @@ // Use of this source code is governed by the BSD 2-Clause license, // which can be found in the LICENSE file. -package storage +package models type Peer struct { ID string `json:"id"` diff --git a/server/announce.go b/server/announce.go index a0b4d10..50ca482 100644 --- a/server/announce.go +++ b/server/announce.go @@ -12,7 +12,7 @@ import ( "strconv" "time" - "github.com/pushrax/chihaya/storage" + "github.com/pushrax/chihaya/models" ) func (s Server) serveAnnounce(w http.ResponseWriter, r *http.Request) { @@ -24,7 +24,7 @@ func (s Server) serveAnnounce(w http.ResponseWriter, r *http.Request) { } // Retry failed transactions a specified number of times - for i := 0; i < s.conf.Storage.TxRetries; i++ { + for i := 0; i < s.conf.Cache.TxRetries; i++ { // Start a transaction tx, err := s.dbConnPool.Get() @@ -69,7 +69,7 @@ func (s Server) serveAnnounce(w http.ResponseWriter, r *http.Request) { } // Create a new peer object from the request - peer := &storage.Peer{ + peer := &models.Peer{ ID: peerID, UserID: user.ID, TorrentID: torrent.ID, @@ -332,7 +332,7 @@ func minInt(a, b int) int { return b } -func writeSeeders(w http.ResponseWriter, t *storage.Torrent, count, numWant int, compact bool) { +func writeSeeders(w http.ResponseWriter, t *models.Torrent, count, numWant int, compact bool) { for _, seed := range t.Seeders { if count >= numWant { break @@ -353,7 +353,7 @@ func writeSeeders(w http.ResponseWriter, t *storage.Torrent, count, numWant int, } } -func writeLeechers(w http.ResponseWriter, t *storage.Torrent, count, numWant int, compact bool) { +func writeLeechers(w http.ResponseWriter, t *models.Torrent, count, numWant int, compact bool) { for _, leech := range t.Leechers { if count >= numWant { break diff --git a/server/publish.go b/server/publish.go deleted file mode 100644 index 1b42090..0000000 --- a/server/publish.go +++ /dev/null @@ -1,28 +0,0 @@ -// Copyright 2013 The Chihaya Authors. All rights reserved. -// Use of this source code is governed by the BSD 2-Clause license, -// which can be found in the LICENSE file. -package server - -import ( - zmq "github.com/alecthomas/gozmq" -) - -func (s *Server) publishQueue() { - context, err := zmq.NewContext() - if err != nil { - panic(err) - } - defer context.Close() - - socket, err := context.NewSocket(zmq.PUB) - if err != nil { - panic(err) - } - defer socket.Close() - - socket.Bind(s.conf.PubAddr) - - for msg := range s.pubChan { - socket.Send([]byte(msg), 0) - } -} diff --git a/server/scrape.go b/server/scrape.go index 6d0777f..f865bc7 100644 --- a/server/scrape.go +++ b/server/scrape.go @@ -11,7 +11,7 @@ import ( "net/http" "path" - "github.com/pushrax/chihaya/storage" + "github.com/pushrax/chihaya/models" ) func (s *Server) serveScrape(w http.ResponseWriter, r *http.Request) { @@ -68,7 +68,7 @@ func (s *Server) serveScrape(w http.ResponseWriter, r *http.Request) { w.(http.Flusher).Flush() } -func writeScrapeInfo(w io.Writer, torrent *storage.Torrent) { +func writeScrapeInfo(w io.Writer, torrent *models.Torrent) { io.WriteString(w, "d") writeBencoded(w, "complete") writeBencoded(w, len(torrent.Seeders)) diff --git a/server/server.go b/server/server.go index 93c79b4..52f541f 100644 --- a/server/server.go +++ b/server/server.go @@ -17,14 +17,15 @@ import ( "sync/atomic" "time" + "github.com/pushrax/chihaya/cache" "github.com/pushrax/chihaya/config" - "github.com/pushrax/chihaya/storage" + "github.com/pushrax/chihaya/models" ) type Server struct { conf *config.Config listener net.Listener - dbConnPool storage.Pool + dbConnPool cache.Pool serving bool startTime time.Time @@ -40,7 +41,7 @@ type Server struct { } func New(conf *config.Config) (*Server, error) { - pool, err := storage.Open(&conf.Storage) + pool, err := cache.Open(&conf.Cache) if err != nil { return nil, err } @@ -69,7 +70,6 @@ func (s *Server) ListenAndServe() error { s.startTime = time.Now() go s.updateStats() - go s.publishQueue() s.Serve(s.listener) s.waitgroup.Wait() @@ -126,7 +126,7 @@ func fail(err error, w http.ResponseWriter, r *http.Request) { w.(http.Flusher).Flush() } -func validateUser(tx storage.Tx, dir string) (*storage.User, error) { +func validateUser(tx cache.Tx, dir string) (*models.User, error) { if len(dir) != 34 { return nil, errors.New("Passkey is invalid") } diff --git a/server/torrent.go b/server/torrent.go index f416c87..a1c1864 100644 --- a/server/torrent.go +++ b/server/torrent.go @@ -47,6 +47,3 @@ func writeBencoded(w io.Writer, data interface{}) { panic("Tried to bencode an unsupported type!") } } - -func compact() { -}