Merge branch 'gazelle-storage-driver'

This commit is contained in:
Justin Li 2013-09-07 18:27:22 -04:00
commit b00c4f44ca
13 changed files with 297 additions and 32 deletions

4
cache/cache.go vendored
View file

@ -15,8 +15,8 @@ import (
)
var (
drivers = make(map[string]Driver)
ErrTxDone = errors.New("cache: Transaction has already been committed or rolled back")
drivers = make(map[string]Driver)
ErrTxDone = errors.New("cache: Transaction has already been committed or rolled back")
ErrTxConflict = errors.New("cache: Commit interrupted, update transaction and repeat")
)

View file

@ -31,7 +31,7 @@ func (d *driver) New(conf *config.DataStore) cache.Pool {
return &Pool{
conf: conf,
pool: redis.Pool{
MaxIdle: conf.MaxIdleConn,
MaxIdle: conf.MaxIdleConns,
IdleTimeout: conf.IdleTimeout.Duration,
Dial: makeDialFunc(conf),
TestOnBorrow: testOnBorrow,

View file

@ -47,7 +47,7 @@ func createTestTxObj(t TestReporter) *Tx {
testPool := &Pool{
conf: conf,
pool: redis.Pool{
MaxIdle: conf.MaxIdleConn,
MaxIdle: conf.MaxIdleConns,
IdleTimeout: conf.IdleTimeout.Duration,
Dial: makeDialFunc(conf),
TestOnBorrow: testOnBorrow,

View file

@ -39,8 +39,8 @@ type DataStore struct {
Encoding string `json:"encoding,omitempty"`
Prefix string `json:"prefix,omitempty"`
MaxIdleConn int `json:"max_idle_conn,omitempty"`
IdleTimeout *Duration `json:"idle_timeout,omitempty"`
MaxIdleConns int `json:"max_idle_conns,omitempty"`
IdleTimeout *Duration `json:"idle_timeout,omitempty"`
}
// Config represents a configuration for a server.Server.

View file

@ -12,7 +12,7 @@
"pass": "",
"prefix": "test:",
"max_idle_conn": 3,
"max_idle_conns": 3,
"idle_timeout": "240s"
},

View file

@ -12,9 +12,12 @@ import (
"runtime"
"runtime/pprof"
_ "github.com/pushrax/chihaya/cache/redis"
"github.com/pushrax/chihaya/config"
"github.com/pushrax/chihaya/server"
_ "github.com/pushrax/chihaya/cache/redis"
_ "github.com/pushrax/chihaya/storage/batter"
_ "github.com/pushrax/chihaya/storage/gazelle"
)
var (

View file

@ -5,11 +5,13 @@
package models
type Peer struct {
ID string `json:"id"`
UserID uint64 `json:"user_id"`
TorrentID uint64 `json:"torrent_id"`
IP string `json:"ip"`
Port uint64 `json:"port"`
ID string `json:"id"`
UserID uint64 `json:"user_id"`
TorrentID uint64 `json:"torrent_id"`
IP string `json:"ip"`
Port uint64 `json:"port"`
Uploaded uint64 `json:"uploaded"`
Downloaded uint64 `json:"downloaded`
Left uint64 `json:"left"`
@ -17,20 +19,23 @@ type Peer struct {
}
type Torrent struct {
ID uint64 `json:"id"`
Infohash string `json:"infohash"`
Active bool `json:"active"`
Seeders map[string]Peer `json:"seeders"`
Leechers map[string]Peer `json:"leechers"`
Snatches uint `json:"snatches"`
UpMultiplier float64 `json:"up_multiplier"`
DownMultiplier float64 `json:"down_multiplier"`
LastAction int64 `json:"last_action"`
ID uint64 `json:"id"`
Infohash string `json:"infohash"`
Active bool `json:"active"`
Seeders map[string]Peer `json:"seeders"`
Leechers map[string]Peer `json:"leechers"`
Snatches uint `json:"snatches"`
UpMultiplier float64 `json:"up_multiplier"`
DownMultiplier float64 `json:"down_multiplier"`
LastAction int64 `json:"last_action"`
}
type User struct {
ID uint64 `json:"id"`
Passkey string `json:"passkey"`
ID uint64 `json:"id"`
Passkey string `json:"passkey"`
UpMultiplier float64 `json:"up_multiplier"`
DownMultiplier float64 `json:"down_multiplier"`
Slots int64 `json:"slots"`

View file

@ -11,7 +11,6 @@ import (
"fmt"
"github.com/pushrax/chihaya/config"
"github.com/pushrax/chihaya/models"
"github.com/pushrax/chihaya/storage"
_ "github.com/bmizerany/pq"
@ -32,6 +31,11 @@ func (d *driver) New(conf *config.DataStore) storage.Conn {
if err != nil {
panic("batter: failed to open connection to postgres")
}
if conf.MaxIdleConns != 0 {
db.SetMaxIdleConns(conf.MaxIdleConns)
}
return &Conn{db}
}
@ -39,11 +43,11 @@ type Conn struct {
*sql.DB
}
func (c *Conn) UpdateTorrents(t []models.Torrent) error {
func (c *Conn) Start() error {
return nil
}
func (c *Conn) UpdateUsers(u []models.User) error {
func (c *Conn) RecordAnnounce(delta *storage.AnnounceDelta) error {
return nil
}

25
storage/batter/load.go Normal file
View file

@ -0,0 +1,25 @@
// Copyright 2013 The Chihaya Authors. All rights reserved.
// Use of this source code is governed by the BSD 2-Clause license,
// which can be found in the LICENSE file.
package batter
import (
"github.com/pushrax/chihaya/models"
)
func (c *Conn) LoadTorrents(ids []uint64) ([]*models.Torrent, error) {
return nil, nil
}
func (c *Conn) LoadAllTorrents() ([]*models.Torrent, error) {
return nil, nil
}
func (c *Conn) LoadUsers(ids []uint64) ([]*models.User, error) {
return nil, nil
}
func (c *Conn) LoadAllUsers(ids []uint64) ([]*models.User, error) {
return nil, nil
}

62
storage/gazelle/flush.go Normal file
View file

@ -0,0 +1,62 @@
// Copyright 2013 The Chihaya Authors. All rights reserved.
// Use of this source code is governed by the BSD 2-Clause license,
// which can be found in the LICENSE file.
package gazelle
import (
"bytes"
"log"
"time"
)
func (c *Conn) flushTorrents() {
var query bytes.Buffer
c.waitGroup.Add(1)
defer c.waitGroup.Done()
var count int
for {
length := len(c.torrentChannel)
query.Reset()
query.WriteString("INSERT INTO torrents (ID, Snatched, Seeders, Leechers, last_action) VALUES\n")
for count = 0; count < length; count++ {
s := <-c.torrentChannel
if s == "" {
break
}
query.WriteString(s)
if count != length-1 {
query.WriteRune(',')
}
}
if !c.terminate {
log.Printf("[torrents] Flushing %d\n", count)
}
if count > 0 {
query.WriteString("\nON DUPLICATE KEY UPDATE Snatched = Snatched + VALUES(Snatched), " +
"Seeders = VALUES(Seeders), Leechers = VALUES(Leechers), " +
"last_action = IF(last_action < VALUES(last_action), VALUES(last_action), last_action);")
c.Exec(query.String())
if length < cap(c.torrentChannel)/2 {
time.Sleep(200 * time.Millisecond)
}
} else if c.terminate {
break
} else {
time.Sleep(time.Second)
}
}
}
func (c *Conn) flushUsers() {}
func (c *Conn) flushTransferHistory() {}
func (c *Conn) flushTransferIps() {}
func (c *Conn) flushSnatches() {}

View file

@ -0,0 +1,99 @@
// Copyright 2013 The Chihaya Authors. All rights reserved.
// Use of this source code is governed by the BSD 2-Clause license,
// which can be found in the LICENSE file.
// Package gazelle provides a driver for a BitTorrent tracker to interface
// with the MySQL database used by Gazelle (github.com/WhatCD/Gazelle).
package gazelle
import (
"database/sql"
"fmt"
"sync"
"github.com/pushrax/chihaya/config"
"github.com/pushrax/chihaya/storage"
_ "github.com/go-sql-driver/mysql"
)
type driver struct{}
func (d *driver) New(conf *config.DataStore) storage.Conn {
dsn := fmt.Sprintf(
"%s:%s@%s:%s/%s?charset=utf8mb4,utf8",
conf.Username,
conf.Password,
conf.Host,
conf.Port,
conf.Schema,
)
db, err := sql.Open("mysql", dsn)
if err != nil {
panic("gazelle: failed to open connection to MySQL")
}
if conf.MaxIdleConns != 0 {
db.SetMaxIdleConns(conf.MaxIdleConns)
}
conn := &Conn{DB: db}
// TODO Buffer sizes
conn.torrentChannel = make(chan string, 1000)
conn.userChannel = make(chan string, 1000)
conn.transferHistoryChannel = make(chan string, 1000)
conn.transferIpsChannel = make(chan string, 1000)
conn.snatchChannel = make(chan string, 100)
return conn
}
type Conn struct {
waitGroup sync.WaitGroup
terminate bool
torrentChannel chan string
userChannel chan string
transferHistoryChannel chan string
transferIpsChannel chan string
snatchChannel chan string
*sql.DB
}
func (c *Conn) Start() error {
go c.flushTorrents()
go c.flushUsers()
go c.flushTransferHistory()
go c.flushTransferIps()
go c.flushSnatches()
return nil
}
func (c *Conn) Close() error {
c.terminate = true
c.waitGroup.Wait()
return c.DB.Close()
}
func (c *Conn) RecordAnnounce(delta *storage.AnnounceDelta) error {
snatchCount := 0
if delta.Snatched {
snatchCount = 1
}
c.torrentChannel <- fmt.Sprintf(
"('%d','%d','%d','%d','%d')",
delta.Torrent.ID,
snatchCount,
len(delta.Torrent.Seeders),
len(delta.Torrent.Leechers),
delta.Torrent.LastAction,
)
return nil
}
func init() {
storage.Register("gazelle", &driver{})
}

25
storage/gazelle/load.go Normal file
View file

@ -0,0 +1,25 @@
// Copyright 2013 The Chihaya Authors. All rights reserved.
// Use of this source code is governed by the BSD 2-Clause license,
// which can be found in the LICENSE file.
package gazelle
import (
"github.com/pushrax/chihaya/models"
)
func (c *Conn) LoadTorrents(ids []uint64) ([]*models.Torrent, error) {
return nil, nil
}
func (c *Conn) LoadAllTorrents() ([]*models.Torrent, error) {
return nil, nil
}
func (c *Conn) LoadUsers(ids []uint64) ([]*models.User, error) {
return nil, nil
}
func (c *Conn) LoadAllUsers(ids []uint64) ([]*models.User, error) {
return nil, nil
}

View file

@ -13,9 +13,7 @@ import (
"github.com/pushrax/chihaya/models"
)
var (
drivers = make(map[string]Driver)
)
var drivers = make(map[string]Driver)
type Driver interface {
New(*config.DataStore) Conn
@ -49,7 +47,51 @@ func Open(conf *config.DataStore) (Conn, error) {
// Conn represents a connection to the data store.
type Conn interface {
// Start is called once when the server starts.
// It starts any necessary goroutines a given driver requires, and sets
// up the driver's initial state
Start() error
// Close terminates connections to the database(s) and gracefully shuts
// down the driver
Close() error
UpdateTorrents(t []models.Torrent) error
UpdateUsers(u []models.User) error
// RecordAnnounce is called once per announce, and is passed the delta in
// statistics for the client peer since its last announce.
RecordAnnounce(delta *AnnounceDelta) error
// LoadTorrents fetches and returns the specified torrents.
LoadTorrents(ids []uint64) ([]*models.Torrent, error)
// LoadAllTorrents fetches and returns all torrents.
LoadAllTorrents() ([]*models.Torrent, error)
// LoadUsers fetches and returns the specified users.
LoadUsers(ids []uint64) ([]*models.User, error)
// LoadAllUsers fetches and returns all users.
LoadAllUsers(ids []uint64) ([]*models.User, error)
}
// AnnounceDelta contains a difference in statistics for a peer.
// It is used for communicating changes to be recorded by the storage driver.
type AnnounceDelta struct {
Peer *models.Peer
Torrent *models.Torrent
User *models.User
// Created is true if this announce created a new peer or changed an existing peer's address
Created bool
// Uploaded contains the raw upload delta for this announce, in bytes
Uploaded uint64
// Downloaded contains the raw download delta for this announce, in bytes
Downloaded uint64
// Timestamp is the unix timestamp this announce occurred at
Timestamp float64
// Snatched is true if this announce completed the download
Snatched bool
}