diff --git a/cache/cache.go b/cache/cache.go index 5f3ef69..569d9c7 100644 --- a/cache/cache.go +++ b/cache/cache.go @@ -15,8 +15,8 @@ import ( ) var ( - drivers = make(map[string]Driver) - ErrTxDone = errors.New("cache: Transaction has already been committed or rolled back") + drivers = make(map[string]Driver) + ErrTxDone = errors.New("cache: Transaction has already been committed or rolled back") ErrTxConflict = errors.New("cache: Commit interrupted, update transaction and repeat") ) diff --git a/cache/redis/redis.go b/cache/redis/redis.go index a1b194e..21ae0ea 100644 --- a/cache/redis/redis.go +++ b/cache/redis/redis.go @@ -31,7 +31,7 @@ func (d *driver) New(conf *config.DataStore) cache.Pool { return &Pool{ conf: conf, pool: redis.Pool{ - MaxIdle: conf.MaxIdleConn, + MaxIdle: conf.MaxIdleConns, IdleTimeout: conf.IdleTimeout.Duration, Dial: makeDialFunc(conf), TestOnBorrow: testOnBorrow, diff --git a/cache/redis/redis_test.go b/cache/redis/redis_test.go index 6f45ff3..f1a3cd4 100644 --- a/cache/redis/redis_test.go +++ b/cache/redis/redis_test.go @@ -47,7 +47,7 @@ func createTestTxObj(t TestReporter) *Tx { testPool := &Pool{ conf: conf, pool: redis.Pool{ - MaxIdle: conf.MaxIdleConn, + MaxIdle: conf.MaxIdleConns, IdleTimeout: conf.IdleTimeout.Duration, Dial: makeDialFunc(conf), TestOnBorrow: testOnBorrow, diff --git a/config/config.go b/config/config.go index c751909..c5907da 100644 --- a/config/config.go +++ b/config/config.go @@ -39,8 +39,8 @@ type DataStore struct { Encoding string `json:"encoding,omitempty"` Prefix string `json:"prefix,omitempty"` - MaxIdleConn int `json:"max_idle_conn,omitempty"` - IdleTimeout *Duration `json:"idle_timeout,omitempty"` + MaxIdleConns int `json:"max_idle_conns,omitempty"` + IdleTimeout *Duration `json:"idle_timeout,omitempty"` } // Config represents a configuration for a server.Server. diff --git a/config/example.json b/config/example.json index 90b5111..44a1ba9 100644 --- a/config/example.json +++ b/config/example.json @@ -12,7 +12,7 @@ "pass": "", "prefix": "test:", - "max_idle_conn": 3, + "max_idle_conns": 3, "idle_timeout": "240s" }, diff --git a/main.go b/main.go index f1ed81d..ed13f83 100644 --- a/main.go +++ b/main.go @@ -12,9 +12,12 @@ import ( "runtime" "runtime/pprof" - _ "github.com/pushrax/chihaya/cache/redis" "github.com/pushrax/chihaya/config" "github.com/pushrax/chihaya/server" + + _ "github.com/pushrax/chihaya/cache/redis" + _ "github.com/pushrax/chihaya/storage/batter" + _ "github.com/pushrax/chihaya/storage/gazelle" ) var ( diff --git a/models/models.go b/models/models.go index 727c0a4..abfeeb6 100644 --- a/models/models.go +++ b/models/models.go @@ -5,11 +5,13 @@ package models type Peer struct { - ID string `json:"id"` - UserID uint64 `json:"user_id"` - TorrentID uint64 `json:"torrent_id"` - IP string `json:"ip"` - Port uint64 `json:"port"` + ID string `json:"id"` + UserID uint64 `json:"user_id"` + TorrentID uint64 `json:"torrent_id"` + + IP string `json:"ip"` + Port uint64 `json:"port"` + Uploaded uint64 `json:"uploaded"` Downloaded uint64 `json:"downloaded` Left uint64 `json:"left"` @@ -17,20 +19,23 @@ type Peer struct { } type Torrent struct { - ID uint64 `json:"id"` - Infohash string `json:"infohash"` - Active bool `json:"active"` - Seeders map[string]Peer `json:"seeders"` - Leechers map[string]Peer `json:"leechers"` - Snatches uint `json:"snatches"` - UpMultiplier float64 `json:"up_multiplier"` - DownMultiplier float64 `json:"down_multiplier"` - LastAction int64 `json:"last_action"` + ID uint64 `json:"id"` + Infohash string `json:"infohash"` + Active bool `json:"active"` + + Seeders map[string]Peer `json:"seeders"` + Leechers map[string]Peer `json:"leechers"` + + Snatches uint `json:"snatches"` + UpMultiplier float64 `json:"up_multiplier"` + DownMultiplier float64 `json:"down_multiplier"` + LastAction int64 `json:"last_action"` } type User struct { - ID uint64 `json:"id"` - Passkey string `json:"passkey"` + ID uint64 `json:"id"` + Passkey string `json:"passkey"` + UpMultiplier float64 `json:"up_multiplier"` DownMultiplier float64 `json:"down_multiplier"` Slots int64 `json:"slots"` diff --git a/storage/batter/batter.go b/storage/batter/batter.go index 4badba2..311436e 100644 --- a/storage/batter/batter.go +++ b/storage/batter/batter.go @@ -11,7 +11,6 @@ import ( "fmt" "github.com/pushrax/chihaya/config" - "github.com/pushrax/chihaya/models" "github.com/pushrax/chihaya/storage" _ "github.com/bmizerany/pq" @@ -32,6 +31,11 @@ func (d *driver) New(conf *config.DataStore) storage.Conn { if err != nil { panic("batter: failed to open connection to postgres") } + + if conf.MaxIdleConns != 0 { + db.SetMaxIdleConns(conf.MaxIdleConns) + } + return &Conn{db} } @@ -39,11 +43,11 @@ type Conn struct { *sql.DB } -func (c *Conn) UpdateTorrents(t []models.Torrent) error { +func (c *Conn) Start() error { return nil } -func (c *Conn) UpdateUsers(u []models.User) error { +func (c *Conn) RecordAnnounce(delta *storage.AnnounceDelta) error { return nil } diff --git a/storage/batter/load.go b/storage/batter/load.go new file mode 100644 index 0000000..9e35bd2 --- /dev/null +++ b/storage/batter/load.go @@ -0,0 +1,25 @@ +// Copyright 2013 The Chihaya Authors. All rights reserved. +// Use of this source code is governed by the BSD 2-Clause license, +// which can be found in the LICENSE file. + +package batter + +import ( + "github.com/pushrax/chihaya/models" +) + +func (c *Conn) LoadTorrents(ids []uint64) ([]*models.Torrent, error) { + return nil, nil +} + +func (c *Conn) LoadAllTorrents() ([]*models.Torrent, error) { + return nil, nil +} + +func (c *Conn) LoadUsers(ids []uint64) ([]*models.User, error) { + return nil, nil +} + +func (c *Conn) LoadAllUsers(ids []uint64) ([]*models.User, error) { + return nil, nil +} diff --git a/storage/gazelle/flush.go b/storage/gazelle/flush.go new file mode 100644 index 0000000..d847d2d --- /dev/null +++ b/storage/gazelle/flush.go @@ -0,0 +1,62 @@ +// Copyright 2013 The Chihaya Authors. All rights reserved. +// Use of this source code is governed by the BSD 2-Clause license, +// which can be found in the LICENSE file. + +package gazelle + +import ( + "bytes" + "log" + "time" +) + +func (c *Conn) flushTorrents() { + var query bytes.Buffer + c.waitGroup.Add(1) + defer c.waitGroup.Done() + var count int + + for { + length := len(c.torrentChannel) + query.Reset() + + query.WriteString("INSERT INTO torrents (ID, Snatched, Seeders, Leechers, last_action) VALUES\n") + + for count = 0; count < length; count++ { + s := <-c.torrentChannel + if s == "" { + break + } + query.WriteString(s) + + if count != length-1 { + query.WriteRune(',') + } + } + + if !c.terminate { + log.Printf("[torrents] Flushing %d\n", count) + } + + if count > 0 { + query.WriteString("\nON DUPLICATE KEY UPDATE Snatched = Snatched + VALUES(Snatched), " + + "Seeders = VALUES(Seeders), Leechers = VALUES(Leechers), " + + "last_action = IF(last_action < VALUES(last_action), VALUES(last_action), last_action);") + + c.Exec(query.String()) + + if length < cap(c.torrentChannel)/2 { + time.Sleep(200 * time.Millisecond) + } + } else if c.terminate { + break + } else { + time.Sleep(time.Second) + } + } +} + +func (c *Conn) flushUsers() {} +func (c *Conn) flushTransferHistory() {} +func (c *Conn) flushTransferIps() {} +func (c *Conn) flushSnatches() {} diff --git a/storage/gazelle/gazelle.go b/storage/gazelle/gazelle.go new file mode 100644 index 0000000..484409b --- /dev/null +++ b/storage/gazelle/gazelle.go @@ -0,0 +1,99 @@ +// Copyright 2013 The Chihaya Authors. All rights reserved. +// Use of this source code is governed by the BSD 2-Clause license, +// which can be found in the LICENSE file. + +// Package gazelle provides a driver for a BitTorrent tracker to interface +// with the MySQL database used by Gazelle (github.com/WhatCD/Gazelle). +package gazelle + +import ( + "database/sql" + "fmt" + "sync" + + "github.com/pushrax/chihaya/config" + "github.com/pushrax/chihaya/storage" + + _ "github.com/go-sql-driver/mysql" +) + +type driver struct{} + +func (d *driver) New(conf *config.DataStore) storage.Conn { + dsn := fmt.Sprintf( + "%s:%s@%s:%s/%s?charset=utf8mb4,utf8", + conf.Username, + conf.Password, + conf.Host, + conf.Port, + conf.Schema, + ) + db, err := sql.Open("mysql", dsn) + if err != nil { + panic("gazelle: failed to open connection to MySQL") + } + + if conf.MaxIdleConns != 0 { + db.SetMaxIdleConns(conf.MaxIdleConns) + } + + conn := &Conn{DB: db} + + // TODO Buffer sizes + conn.torrentChannel = make(chan string, 1000) + conn.userChannel = make(chan string, 1000) + conn.transferHistoryChannel = make(chan string, 1000) + conn.transferIpsChannel = make(chan string, 1000) + conn.snatchChannel = make(chan string, 100) + + return conn +} + +type Conn struct { + waitGroup sync.WaitGroup + terminate bool + + torrentChannel chan string + userChannel chan string + transferHistoryChannel chan string + transferIpsChannel chan string + snatchChannel chan string + + *sql.DB +} + +func (c *Conn) Start() error { + go c.flushTorrents() + go c.flushUsers() + go c.flushTransferHistory() + go c.flushTransferIps() + go c.flushSnatches() + return nil +} + +func (c *Conn) Close() error { + c.terminate = true + c.waitGroup.Wait() + return c.DB.Close() +} + +func (c *Conn) RecordAnnounce(delta *storage.AnnounceDelta) error { + snatchCount := 0 + if delta.Snatched { + snatchCount = 1 + } + + c.torrentChannel <- fmt.Sprintf( + "('%d','%d','%d','%d','%d')", + delta.Torrent.ID, + snatchCount, + len(delta.Torrent.Seeders), + len(delta.Torrent.Leechers), + delta.Torrent.LastAction, + ) + return nil +} + +func init() { + storage.Register("gazelle", &driver{}) +} diff --git a/storage/gazelle/load.go b/storage/gazelle/load.go new file mode 100644 index 0000000..6b5320e --- /dev/null +++ b/storage/gazelle/load.go @@ -0,0 +1,25 @@ +// Copyright 2013 The Chihaya Authors. All rights reserved. +// Use of this source code is governed by the BSD 2-Clause license, +// which can be found in the LICENSE file. + +package gazelle + +import ( + "github.com/pushrax/chihaya/models" +) + +func (c *Conn) LoadTorrents(ids []uint64) ([]*models.Torrent, error) { + return nil, nil +} + +func (c *Conn) LoadAllTorrents() ([]*models.Torrent, error) { + return nil, nil +} + +func (c *Conn) LoadUsers(ids []uint64) ([]*models.User, error) { + return nil, nil +} + +func (c *Conn) LoadAllUsers(ids []uint64) ([]*models.User, error) { + return nil, nil +} diff --git a/storage/storage.go b/storage/storage.go index 10109c0..9949fe2 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -13,9 +13,7 @@ import ( "github.com/pushrax/chihaya/models" ) -var ( - drivers = make(map[string]Driver) -) +var drivers = make(map[string]Driver) type Driver interface { New(*config.DataStore) Conn @@ -49,7 +47,51 @@ func Open(conf *config.DataStore) (Conn, error) { // Conn represents a connection to the data store. type Conn interface { + // Start is called once when the server starts. + // It starts any necessary goroutines a given driver requires, and sets + // up the driver's initial state + Start() error + + // Close terminates connections to the database(s) and gracefully shuts + // down the driver Close() error - UpdateTorrents(t []models.Torrent) error - UpdateUsers(u []models.User) error + + // RecordAnnounce is called once per announce, and is passed the delta in + // statistics for the client peer since its last announce. + RecordAnnounce(delta *AnnounceDelta) error + + // LoadTorrents fetches and returns the specified torrents. + LoadTorrents(ids []uint64) ([]*models.Torrent, error) + + // LoadAllTorrents fetches and returns all torrents. + LoadAllTorrents() ([]*models.Torrent, error) + + // LoadUsers fetches and returns the specified users. + LoadUsers(ids []uint64) ([]*models.User, error) + + // LoadAllUsers fetches and returns all users. + LoadAllUsers(ids []uint64) ([]*models.User, error) +} + +// AnnounceDelta contains a difference in statistics for a peer. +// It is used for communicating changes to be recorded by the storage driver. +type AnnounceDelta struct { + Peer *models.Peer + Torrent *models.Torrent + User *models.User + + // Created is true if this announce created a new peer or changed an existing peer's address + Created bool + + // Uploaded contains the raw upload delta for this announce, in bytes + Uploaded uint64 + + // Downloaded contains the raw download delta for this announce, in bytes + Downloaded uint64 + + // Timestamp is the unix timestamp this announce occurred at + Timestamp float64 + + // Snatched is true if this announce completed the download + Snatched bool }