initial architecture overhaul

This commit is contained in:
Jimmy Zelinskie 2013-08-23 15:39:42 -04:00
parent d756de9127
commit d3f3c81c61
13 changed files with 103 additions and 129 deletions

View file

@ -2,10 +2,6 @@ language: go
go: 1.1.2
before_install:
- sudo apt-get install libzmq3-dev
- go get -tags zmq_3_x github.com/alecthomas/gozmq
services:
- redis-server

View file

@ -8,30 +8,25 @@ in production. Some of the planned features include.
- *Fast* request processing
- Maximum compatibility with what exists of the BitTorrent spec
- Correct IPv6 support
- A generic storage interface that is easily adapted to use any data store
- Scaling properties that directly correlate with those of the chosen data store
- A generic storage interfaces that is easily adapted to use any data store and web application
- Scaling properties that directly correlate with those of the chosen data stores
## Architecture
You are most likely looking to integrate Chihaya with a web application for organizing torrents
and managing a community. Chihaya was designed with this in mind, but also tries to remain
independent. Chihaya has its own data store that needs to be bootstrapped with data from your
web application. ZeroMQ is used to publish changes to this data. Your web application must
subscribe to this stream, collect these changes, and apply them (usually in a batch fashion).
The only caveat to this architecture is that when a torrent is added or deleted your web
application needs to update both its own data store and Chihaya's.
independent. Chihaya connects to two data stores. The first, known as "cache", is used between
Chihaya processes in order to keep up with fast changing data. The second, known as "storage",
is your web application's data store. Changes immediately take place in the cache, which is why
fast data stores are recommended. These changes are also collected and periodically applied to the
storage in order to avoid locking up your web application's data store.
## Installing
Make sure you have your $GOROOT and $GOPATH set up correctly and have your $GOBIN on your $PATH.
You'll also need to install ZeroMQ with your favourite package manager. Next, you'll need to
"go get" the correct version of the gozmq library that corresponds to your system's version.
For example, these are the steps you'd use to install on Ubuntu 12.04 LTS:
```sh
$ sudo apt-get install libzmq-dev
$ go get -tags zmq_2_1 github.com/alecthomas/gozmq
$ go install github.com/pushrax/chihaya
```
@ -41,41 +36,53 @@ Configuration is done in a JSON formatted file specified with the `-config`
flag. An example configuration can be seen in the `exampleConfig` variable of
[`config/config_test.go`](https://github.com/pushrax/chihaya/blob/master/config/config_test.go).
## Default storage drivers
## Default drivers
Chihaya currently supports the following data stores out of the box:
### Cache
Chihaya currently supports drivers for the following caches out of the box:
* [redis](http://redis.io)
## Custom storage drivers
### Storage
The [`storage`] package is heavily inspired by the standard library's
[`database/sql`] package. To write a new storage backend, create a new Go
package that has an implementation of the [`Pool`], [`Tx`], and [`Driver`]
interfaces. Within that package, you must also define an [`init()`] that calls
[`storage.Register`].
Chihaya currently supports drivers for the following storages out of the box:
[`storage`]: http://godoc.org/github.com/pushrax/chihaya/storage
[`database/sql`]: http://godoc.org/database/sql
[`Pool`]: http://godoc.org/github.com/pushrax/chihaya/storage#Pool
[`Tx`]: http://godoc.org/github.com/pushrax/chihaya/storage#Tx
[`Driver`]: http://godoc.org/github.com/pushrax/chihaya/storage#Driver
[`init()`]: http://golang.org/ref/spec#Program_execution
[`storage.Register`]: http://godoc.org/github.com/pushrax/chihaya/storage#Register
* [batter-postgres](https://github.com/wafflesfm/batter)
## Custom drivers
Please read the documentation and understand these interfaces as there are
assumptions made about thread-safety. After you've implemented a new driver,
all you have to do is remember to add `import _ path/to/your/library` to the
top of any file in your project (preferably `main.go`) and the side effects from
`func init()` will globally register your driver so that config package will recognize
your driver by name. If you're writing a driver for a popular data store, consider
contributing it.
all you have to do is remember to add `import _ path/to/your/package` to the
top of `main.go` and the side effects from `init()` will globally register
your driver so that config package will recognize your driver by name.
If you're writing a driver for a popular data store, consider contributing it.
### Cache
The [`cache`] package is heavily inspired by the standard library's
[`database/sql`] package. To write a new cache backend, create a new Go
package that has an implementation of the [`Pool`], [`Tx`], and [`Driver`]
interfaces. Within that package, you must also define an [`init()`] that calls
[`cache.Register`].
[`cache`]: http://godoc.org/github.com/pushrax/chihaya/cache
[`database/sql`]: http://godoc.org/database/sql
[`Pool`]: http://godoc.org/github.com/pushrax/chihaya/cache#Pool
[`Tx`]: http://godoc.org/github.com/pushrax/chihaya/cache#Tx
[`Driver`]: http://godoc.org/github.com/pushrax/chihaya/cache#Driver
[`init()`]: http://golang.org/ref/spec#Program_execution
[`cache.Register`]: http://godoc.org/github.com/pushrax/chihaya/cache#Register
### Storage
TODO
## Contributing
If you're interested in contributing, please contact us in **[#chihaya] on
[freenode]** or post to the GitHub issue tracker. Please don't offer
[freenode IRC]** or post to the GitHub issue tracker. Please don't offer
massive pull requests with no prior communication attempts as it will most
likely lead to confusion and time wasted for everyone. However, small
unannounced fixes are always welcome.

View file

@ -3,14 +3,15 @@
// which can be found in the LICENSE file.
// Package storage provides a generic interface for manipulating a
// BitTorrent tracker's data store.
package storage
// BitTorrent tracker's cache.
package cache
import (
"errors"
"fmt"
"github.com/pushrax/chihaya/config"
"github.com/pushrax/chihaya/models"
)
var (
@ -19,7 +20,7 @@ var (
)
type Driver interface {
New(*config.Storage) Pool
New(*config.Cache) Pool
}
// Register makes a database driver available by the provided name.
@ -36,7 +37,7 @@ func Register(name string, driver Driver) {
}
// Open creates a pool of data store connections specified by a storage configuration.
func Open(conf *config.Storage) (Pool, error) {
func Open(conf *config.Cache) (Pool, error) {
driver, ok := drivers[conf.Driver]
if !ok {
return nil, fmt.Errorf(
@ -65,19 +66,19 @@ type Tx interface {
Rollback() error
// Reads
FindUser(passkey string) (*User, bool, error)
FindTorrent(infohash string) (*Torrent, bool, error)
FindUser(passkey string) (*models.User, bool, error)
FindTorrent(infohash string) (*models.Torrent, bool, error)
ClientWhitelisted(peerID string) (bool, error)
// Writes
RecordSnatch(u *User, t *Torrent) error
MarkActive(t *Torrent) error
AddLeecher(t *Torrent, p *Peer) error
AddSeeder(t *Torrent, p *Peer) error
RemoveLeecher(t *Torrent, p *Peer) error
RemoveSeeder(t *Torrent, p *Peer) error
SetLeecher(t *Torrent, p *Peer) error
SetSeeder(t *Torrent, p *Peer) error
IncrementSlots(u *User) error
DecrementSlots(u *User) error
RecordSnatch(u *models.User, t *models.Torrent) error
MarkActive(t *models.Torrent) error
AddLeecher(t *models.Torrent, p *models.Peer) error
AddSeeder(t *models.Torrent, p *models.Peer) error
RemoveLeecher(t *models.Torrent, p *models.Peer) error
RemoveSeeder(t *models.Torrent, p *models.Peer) error
SetLeecher(t *models.Torrent, p *models.Peer) error
SetSeeder(t *models.Torrent, p *models.Peer) error
IncrementSlots(u *models.User) error
DecrementSlots(u *models.User) error
}

View file

@ -17,13 +17,14 @@ import (
"github.com/garyburd/redigo/redis"
"github.com/pushrax/chihaya/cache"
"github.com/pushrax/chihaya/config"
"github.com/pushrax/chihaya/storage"
"github.com/pushrax/chihaya/models"
)
type driver struct{}
func (d *driver) New(conf *config.Storage) storage.Pool {
func (d *driver) New(conf *config.Cache) cache.Pool {
return &Pool{
conf: conf,
pool: redis.Pool{
@ -35,7 +36,7 @@ func (d *driver) New(conf *config.Storage) storage.Pool {
}
}
func makeDialFunc(conf *config.Storage) func() (redis.Conn, error) {
func makeDialFunc(conf *config.Cache) func() (redis.Conn, error) {
return func() (conn redis.Conn, err error) {
if conf.ConnTimeout != nil {
conn, err = redis.DialTimeout(
@ -61,7 +62,7 @@ func testOnBorrow(c redis.Conn, t time.Time) error {
}
type Pool struct {
conf *config.Storage
conf *config.Cache
pool redis.Pool
}
@ -69,7 +70,7 @@ func (p *Pool) Close() error {
return p.pool.Close()
}
func (p *Pool) Get() (storage.Tx, error) {
func (p *Pool) Get() (cache.Tx, error) {
return &Tx{
conf: p.conf,
done: false,
@ -93,7 +94,7 @@ func (p *Pool) Get() (storage.Tx, error) {
// SET keyB
// EXEC
type Tx struct {
conf *config.Storage
conf *config.Cache
done bool
multi bool
redis.Conn
@ -109,7 +110,7 @@ func (tx *Tx) close() {
func (tx *Tx) initiateWrite() error {
if tx.done {
return storage.ErrTxDone
return cache.ErrTxDone
}
if tx.multi != true {
return tx.Send("MULTI")
@ -119,7 +120,7 @@ func (tx *Tx) initiateWrite() error {
func (tx *Tx) initiateRead() error {
if tx.done {
return storage.ErrTxDone
return cache.ErrTxDone
}
if tx.multi == true {
panic("Tried to read during MULTI")
@ -129,7 +130,7 @@ func (tx *Tx) initiateRead() error {
func (tx *Tx) Commit() error {
if tx.done {
return storage.ErrTxDone
return cache.ErrTxDone
}
if tx.multi == true {
_, err := tx.Do("EXEC")
@ -143,14 +144,14 @@ func (tx *Tx) Commit() error {
func (tx *Tx) Rollback() error {
if tx.done {
return storage.ErrTxDone
return cache.ErrTxDone
}
// Redis doesn't need to do anything. Exec is atomic.
tx.close()
return nil
}
func (tx *Tx) FindUser(passkey string) (*storage.User, bool, error) {
func (tx *Tx) FindUser(passkey string) (*models.User, bool, error) {
err := tx.initiateRead()
if err != nil {
return nil, false, err
@ -169,7 +170,7 @@ func (tx *Tx) FindUser(passkey string) (*storage.User, bool, error) {
return nil, false, err
}
user := &storage.User{}
user := &models.User{}
err = json.NewDecoder(strings.NewReader(reply)).Decode(user)
if err != nil {
return nil, true, err
@ -177,7 +178,7 @@ func (tx *Tx) FindUser(passkey string) (*storage.User, bool, error) {
return user, true, nil
}
func (tx *Tx) FindTorrent(infohash string) (*storage.Torrent, bool, error) {
func (tx *Tx) FindTorrent(infohash string) (*models.Torrent, bool, error) {
err := tx.initiateRead()
if err != nil {
return nil, false, err
@ -196,7 +197,7 @@ func (tx *Tx) FindTorrent(infohash string) (*storage.Torrent, bool, error) {
return nil, false, err
}
torrent := &storage.Torrent{}
torrent := &models.Torrent{}
err = json.NewDecoder(strings.NewReader(reply)).Decode(torrent)
if err != nil {
return nil, true, err
@ -220,7 +221,7 @@ func (tx *Tx) ClientWhitelisted(peerID string) (exists bool, err error) {
return
}
func (tx *Tx) RecordSnatch(user *storage.User, torrent *storage.Torrent) error {
func (tx *Tx) RecordSnatch(user *models.User, torrent *models.Torrent) error {
if err := tx.initiateWrite(); err != nil {
return err
}
@ -229,7 +230,7 @@ func (tx *Tx) RecordSnatch(user *storage.User, torrent *storage.Torrent) error {
return nil
}
func (tx *Tx) MarkActive(t *storage.Torrent) error {
func (tx *Tx) MarkActive(t *models.Torrent) error {
if err := tx.initiateWrite(); err != nil {
return err
}
@ -238,7 +239,7 @@ func (tx *Tx) MarkActive(t *storage.Torrent) error {
return nil
}
func (tx *Tx) AddLeecher(t *storage.Torrent, p *storage.Peer) error {
func (tx *Tx) AddLeecher(t *models.Torrent, p *models.Peer) error {
if err := tx.initiateWrite(); err != nil {
return err
}
@ -247,7 +248,7 @@ func (tx *Tx) AddLeecher(t *storage.Torrent, p *storage.Peer) error {
return nil
}
func (tx *Tx) SetLeecher(t *storage.Torrent, p *storage.Peer) error {
func (tx *Tx) SetLeecher(t *models.Torrent, p *models.Peer) error {
if err := tx.initiateWrite(); err != nil {
return err
}
@ -256,7 +257,7 @@ func (tx *Tx) SetLeecher(t *storage.Torrent, p *storage.Peer) error {
return nil
}
func (tx *Tx) RemoveLeecher(t *storage.Torrent, p *storage.Peer) error {
func (tx *Tx) RemoveLeecher(t *models.Torrent, p *models.Peer) error {
if err := tx.initiateWrite(); err != nil {
return err
}
@ -265,7 +266,7 @@ func (tx *Tx) RemoveLeecher(t *storage.Torrent, p *storage.Peer) error {
return nil
}
func (tx *Tx) AddSeeder(t *storage.Torrent, p *storage.Peer) error {
func (tx *Tx) AddSeeder(t *models.Torrent, p *models.Peer) error {
if err := tx.initiateWrite(); err != nil {
return err
}
@ -274,7 +275,7 @@ func (tx *Tx) AddSeeder(t *storage.Torrent, p *storage.Peer) error {
return nil
}
func (tx *Tx) SetSeeder(t *storage.Torrent, p *storage.Peer) error {
func (tx *Tx) SetSeeder(t *models.Torrent, p *models.Peer) error {
if err := tx.initiateWrite(); err != nil {
return err
}
@ -283,7 +284,7 @@ func (tx *Tx) SetSeeder(t *storage.Torrent, p *storage.Peer) error {
return nil
}
func (tx *Tx) RemoveSeeder(t *storage.Torrent, p *storage.Peer) error {
func (tx *Tx) RemoveSeeder(t *models.Torrent, p *models.Peer) error {
if err := tx.initiateWrite(); err != nil {
return err
}
@ -292,7 +293,7 @@ func (tx *Tx) RemoveSeeder(t *storage.Torrent, p *storage.Peer) error {
return nil
}
func (tx *Tx) IncrementSlots(u *storage.User) error {
func (tx *Tx) IncrementSlots(u *models.User) error {
if err := tx.initiateWrite(); err != nil {
return err
}
@ -301,7 +302,7 @@ func (tx *Tx) IncrementSlots(u *storage.User) error {
return nil
}
func (tx *Tx) DecrementSlots(u *storage.User) error {
func (tx *Tx) DecrementSlots(u *models.User) error {
if err := tx.initiateWrite(); err != nil {
return err
}
@ -311,5 +312,5 @@ func (tx *Tx) DecrementSlots(u *storage.User) error {
}
func init() {
storage.Register("redis", &driver{})
cache.Register("redis", &driver{})
}

View file

@ -27,8 +27,8 @@ func (d *Duration) UnmarshalJSON(b []byte) error {
return err
}
// Storage represents the configuration for any storage.DS.
type Storage struct {
// Cache represents the configuration for any data store used as a cache.
type Cache struct {
Driver string `json:"driver"`
Network string `json:"network`
Addr string `json:"addr"`
@ -48,7 +48,7 @@ type Storage struct {
type Config struct {
Addr string `json:"addr"`
PubAddr string `json:"pub_addr"`
Storage Storage `json:"storage"`
Cache Cache `json:"cache"`
Private bool `json:"private"`
Freeleech bool `json:"freeleech"`

View file

@ -17,7 +17,7 @@ var exampleJson = `{
"network": "tcp",
"addr": ":34000",
"pub_addr": "tcp://*:34001",
"storage": {
"cache": {
"driver": "redis",
"addr": "127.0.0.1:6379",
"user": "root",

View file

@ -12,9 +12,9 @@ import (
"runtime"
"runtime/pprof"
_ "github.com/pushrax/chihaya/cache/redis"
"github.com/pushrax/chihaya/config"
"github.com/pushrax/chihaya/server"
_ "github.com/pushrax/chihaya/storage/redis"
)
var (

View file

@ -2,7 +2,7 @@
// Use of this source code is governed by the BSD 2-Clause license,
// which can be found in the LICENSE file.
package storage
package models
type Peer struct {
ID string `json:"id"`

View file

@ -12,7 +12,7 @@ import (
"strconv"
"time"
"github.com/pushrax/chihaya/storage"
"github.com/pushrax/chihaya/models"
)
func (s Server) serveAnnounce(w http.ResponseWriter, r *http.Request) {
@ -24,7 +24,7 @@ func (s Server) serveAnnounce(w http.ResponseWriter, r *http.Request) {
}
// Retry failed transactions a specified number of times
for i := 0; i < s.conf.Storage.TxRetries; i++ {
for i := 0; i < s.conf.Cache.TxRetries; i++ {
// Start a transaction
tx, err := s.dbConnPool.Get()
@ -69,7 +69,7 @@ func (s Server) serveAnnounce(w http.ResponseWriter, r *http.Request) {
}
// Create a new peer object from the request
peer := &storage.Peer{
peer := &models.Peer{
ID: peerID,
UserID: user.ID,
TorrentID: torrent.ID,
@ -332,7 +332,7 @@ func minInt(a, b int) int {
return b
}
func writeSeeders(w http.ResponseWriter, t *storage.Torrent, count, numWant int, compact bool) {
func writeSeeders(w http.ResponseWriter, t *models.Torrent, count, numWant int, compact bool) {
for _, seed := range t.Seeders {
if count >= numWant {
break
@ -353,7 +353,7 @@ func writeSeeders(w http.ResponseWriter, t *storage.Torrent, count, numWant int,
}
}
func writeLeechers(w http.ResponseWriter, t *storage.Torrent, count, numWant int, compact bool) {
func writeLeechers(w http.ResponseWriter, t *models.Torrent, count, numWant int, compact bool) {
for _, leech := range t.Leechers {
if count >= numWant {
break

View file

@ -1,28 +0,0 @@
// Copyright 2013 The Chihaya Authors. All rights reserved.
// Use of this source code is governed by the BSD 2-Clause license,
// which can be found in the LICENSE file.
package server
import (
zmq "github.com/alecthomas/gozmq"
)
func (s *Server) publishQueue() {
context, err := zmq.NewContext()
if err != nil {
panic(err)
}
defer context.Close()
socket, err := context.NewSocket(zmq.PUB)
if err != nil {
panic(err)
}
defer socket.Close()
socket.Bind(s.conf.PubAddr)
for msg := range s.pubChan {
socket.Send([]byte(msg), 0)
}
}

View file

@ -11,7 +11,7 @@ import (
"net/http"
"path"
"github.com/pushrax/chihaya/storage"
"github.com/pushrax/chihaya/models"
)
func (s *Server) serveScrape(w http.ResponseWriter, r *http.Request) {
@ -68,7 +68,7 @@ func (s *Server) serveScrape(w http.ResponseWriter, r *http.Request) {
w.(http.Flusher).Flush()
}
func writeScrapeInfo(w io.Writer, torrent *storage.Torrent) {
func writeScrapeInfo(w io.Writer, torrent *models.Torrent) {
io.WriteString(w, "d")
writeBencoded(w, "complete")
writeBencoded(w, len(torrent.Seeders))

View file

@ -17,14 +17,15 @@ import (
"sync/atomic"
"time"
"github.com/pushrax/chihaya/cache"
"github.com/pushrax/chihaya/config"
"github.com/pushrax/chihaya/storage"
"github.com/pushrax/chihaya/models"
)
type Server struct {
conf *config.Config
listener net.Listener
dbConnPool storage.Pool
dbConnPool cache.Pool
serving bool
startTime time.Time
@ -40,7 +41,7 @@ type Server struct {
}
func New(conf *config.Config) (*Server, error) {
pool, err := storage.Open(&conf.Storage)
pool, err := cache.Open(&conf.Cache)
if err != nil {
return nil, err
}
@ -69,7 +70,6 @@ func (s *Server) ListenAndServe() error {
s.startTime = time.Now()
go s.updateStats()
go s.publishQueue()
s.Serve(s.listener)
s.waitgroup.Wait()
@ -126,7 +126,7 @@ func fail(err error, w http.ResponseWriter, r *http.Request) {
w.(http.Flusher).Flush()
}
func validateUser(tx storage.Tx, dir string) (*storage.User, error) {
func validateUser(tx cache.Tx, dir string) (*models.User, error) {
if len(dir) != 34 {
return nil, errors.New("Passkey is invalid")
}

View file

@ -47,6 +47,3 @@ func writeBencoded(w io.Writer, data interface{}) {
panic("Tried to bencode an unsupported type!")
}
}
func compact() {
}