storage reorganized around a pool and transactions

This commit is contained in:
Jimmy Zelinskie 2013-07-25 10:03:04 -04:00
parent d62a71847d
commit 40505091f5
6 changed files with 240 additions and 200 deletions

View file

@ -23,16 +23,22 @@ func (s Server) serveAnnounce(w http.ResponseWriter, r *http.Request) {
return return
} }
// Start a transaction
tx, err := s.dbConnPool.Get()
if err != nil {
log.Panicf("server: %s", err)
}
// Validate the user's passkey // Validate the user's passkey
passkey, _ := path.Split(r.URL.Path) passkey, _ := path.Split(r.URL.Path)
user, err := s.FindUser(passkey) user, err := validateUser(tx, passkey)
if err != nil { if err != nil {
fail(err, w, r) fail(err, w, r)
return return
} }
// Check if the user's client is whitelisted // Check if the user's client is whitelisted
whitelisted, err := s.dataStore.ClientWhitelisted(peerID) whitelisted, err := tx.ClientWhitelisted(peerID)
if err != nil { if err != nil {
log.Panicf("server: %s", err) log.Panicf("server: %s", err)
} }
@ -42,7 +48,7 @@ func (s Server) serveAnnounce(w http.ResponseWriter, r *http.Request) {
} }
// Find the specified torrent // Find the specified torrent
torrent, exists, err := s.dataStore.FindTorrent(infohash) torrent, exists, err := tx.FindTorrent(infohash)
if err != nil { if err != nil {
log.Panicf("server: %s", err) log.Panicf("server: %s", err)
} }
@ -51,15 +57,9 @@ func (s Server) serveAnnounce(w http.ResponseWriter, r *http.Request) {
return return
} }
// Begin a data store transaction
tx, err := s.dataStore.Begin()
if err != nil {
log.Panicf("server: %s", err)
}
// If the torrent was pruned and the user is seeding, unprune it // If the torrent was pruned and the user is seeding, unprune it
if !torrent.Active && left == 0 { if !torrent.Active && left == 0 {
err := tx.Active(torrent) err := tx.MarkActive(torrent)
if err != nil { if err != nil {
log.Panicf("server: %s", err) log.Panicf("server: %s", err)
} }

View file

@ -15,24 +15,32 @@ import (
) )
func (s *Server) serveScrape(w http.ResponseWriter, r *http.Request) { func (s *Server) serveScrape(w http.ResponseWriter, r *http.Request) {
passkey, _ := path.Split(r.URL.Path) // Parse the query
_, err := s.FindUser(passkey)
if err != nil {
fail(err, w, r)
return
}
pq, err := parseQuery(r.URL.RawQuery) pq, err := parseQuery(r.URL.RawQuery)
if err != nil { if err != nil {
fail(errors.New("Error parsing query"), w, r) fail(errors.New("Error parsing query"), w, r)
return return
} }
// Start a transaction
tx, err := s.dbConnPool.Get()
if err != nil {
log.Fatal(err)
}
// Find and validate the user
passkey, _ := path.Split(r.URL.Path)
_, err = validateUser(tx, passkey)
if err != nil {
fail(err, w, r)
return
}
io.WriteString(w, "d") io.WriteString(w, "d")
writeBencoded(w, "files") writeBencoded(w, "files")
if pq.Infohashes != nil { if pq.Infohashes != nil {
for _, infohash := range pq.Infohashes { for _, infohash := range pq.Infohashes {
torrent, exists, err := s.dataStore.FindTorrent(infohash) torrent, exists, err := tx.FindTorrent(infohash)
if err != nil { if err != nil {
log.Panicf("server: %s", err) log.Panicf("server: %s", err)
} }
@ -42,7 +50,7 @@ func (s *Server) serveScrape(w http.ResponseWriter, r *http.Request) {
} }
} }
} else if infohash, exists := pq.Params["info_hash"]; exists { } else if infohash, exists := pq.Params["info_hash"]; exists {
torrent, exists, err := s.dataStore.FindTorrent(infohash) torrent, exists, err := tx.FindTorrent(infohash)
if err != nil { if err != nil {
log.Panicf("server: %s", err) log.Panicf("server: %s", err)
} }
@ -53,6 +61,7 @@ func (s *Server) serveScrape(w http.ResponseWriter, r *http.Request) {
} }
io.WriteString(w, "e") io.WriteString(w, "e")
// Finish up and write headers
r.Close = true r.Close = true
w.Header().Add("Content-Type", "text/plain") w.Header().Add("Content-Type", "text/plain")
w.Header().Add("Connection", "close") w.Header().Add("Connection", "close")

View file

@ -22,9 +22,9 @@ import (
) )
type Server struct { type Server struct {
conf *config.Config conf *config.Config
listener net.Listener listener net.Listener
dataStore storage.DS dbConnPool storage.Pool
serving bool serving bool
startTime time.Time startTime time.Time
@ -38,14 +38,14 @@ type Server struct {
} }
func New(conf *config.Config) (*Server, error) { func New(conf *config.Config) (*Server, error) {
ds, err := storage.Open(&conf.Storage) pool, err := storage.Open(&conf.Storage)
if err != nil { if err != nil {
return nil, err return nil, err
} }
s := &Server{ s := &Server{
conf: conf, conf: conf,
dataStore: ds, dbConnPool: pool,
Server: http.Server{ Server: http.Server{
Addr: conf.Addr, Addr: conf.Addr,
ReadTimeout: conf.ReadTimeout.Duration, ReadTimeout: conf.ReadTimeout.Duration,
@ -75,7 +75,7 @@ func (s *Server) ListenAndServe() error {
func (s *Server) Stop() error { func (s *Server) Stop() error {
s.serving = false s.serving = false
s.waitgroup.Wait() s.waitgroup.Wait()
err := s.dataStore.Close() err := s.dbConnPool.Close()
if err != nil { if err != nil {
return err return err
} }
@ -121,13 +121,13 @@ func fail(err error, w http.ResponseWriter, r *http.Request) {
w.(http.Flusher).Flush() w.(http.Flusher).Flush()
} }
func (s *Server) FindUser(dir string) (*storage.User, error) { func validateUser(tx storage.Tx, dir string) (*storage.User, error) {
if len(dir) != 34 { if len(dir) != 34 {
return nil, errors.New("Passkey is invalid") return nil, errors.New("Passkey is invalid")
} }
passkey := dir[1:33] passkey := dir[1:33]
user, exists, err := s.dataStore.FindUser(passkey) user, exists, err := tx.FindUser(passkey)
if err != nil { if err != nil {
log.Panicf("server: %s", err) log.Panicf("server: %s", err)
} }

View file

@ -12,6 +12,7 @@ package redis
import ( import (
"encoding/json" "encoding/json"
"errors"
"strings" "strings"
"time" "time"
@ -23,10 +24,10 @@ import (
type driver struct{} type driver struct{}
func (d *driver) New(conf *config.Storage) storage.DS { func (d *driver) New(conf *config.Storage) storage.Pool {
return &DS{ return &Pool{
conf: conf, conf: conf,
Pool: redis.Pool{ pool: redis.Pool{
MaxIdle: conf.MaxIdleConn, MaxIdle: conf.MaxIdleConn,
IdleTimeout: conf.IdleTimeout.Duration, IdleTimeout: conf.IdleTimeout.Duration,
Dial: makeDialFunc(conf), Dial: makeDialFunc(conf),
@ -65,17 +66,90 @@ func testOnBorrow(c redis.Conn, t time.Time) error {
return err return err
} }
type DS struct { type Pool struct {
conf *config.Storage conf *config.Storage
redis.Pool pool redis.Pool
} }
func (ds *DS) FindUser(passkey string) (*storage.User, bool, error) { func (p *Pool) Close() error {
conn := ds.Get() return p.pool.Close()
defer conn.Close() }
key := ds.conf.Prefix + "user:" + passkey func (p *Pool) Get() (storage.Tx, error) {
reply, err := redis.String(conn.Do("GET", key)) return &Tx{
conf: p.conf,
done: false,
multi: false,
Conn: p.pool.Get(),
}, nil
}
// Tx represents a transaction for Redis with one gotcha:
// all reads must be done prior to any writes. Writes will
// check if the MULTI command has been sent to redis and will
// send it if it hasn't.
//
// Internally a transaction looks like:
// WATCH keyA
// GET keyA
// WATCH keyB
// GET keyB
// MULTI
// SET keyA
// SET keyB
// EXEC
type Tx struct {
conf *config.Storage
done bool
multi bool
redis.Conn
}
func (tx *Tx) close() {
if tx.done {
panic("redis: transaction closed twice")
}
tx.done = true
tx.Conn.Close()
}
func (tx *Tx) Commit() error {
if tx.done {
return storage.ErrTxDone
}
if tx.multi == true {
_, err := tx.Do("EXEC")
if err != nil {
return err
}
}
tx.close()
return nil
}
func (tx *Tx) Rollback() error {
if tx.done {
return storage.ErrTxDone
}
// Redis doesn't need to do anything. Exec is atomic.
tx.close()
return nil
}
func (tx *Tx) FindUser(passkey string) (*storage.User, bool, error) {
if tx.done {
return nil, false, storage.ErrTxDone
}
if tx.multi == true {
return nil, false, errors.New("Tried to read during MULTI")
}
key := tx.conf.Prefix + "user:" + passkey
_, err := tx.Do("WATCH", key)
if err != nil {
return nil, false, err
}
reply, err := redis.String(tx.Do("GET", key))
if err != nil { if err != nil {
if err == redis.ErrNil { if err == redis.ErrNil {
return nil, false, nil return nil, false, nil
@ -91,12 +165,20 @@ func (ds *DS) FindUser(passkey string) (*storage.User, bool, error) {
return user, true, nil return user, true, nil
} }
func (ds *DS) FindTorrent(infohash string) (*storage.Torrent, bool, error) { func (tx *Tx) FindTorrent(infohash string) (*storage.Torrent, bool, error) {
conn := ds.Get() if tx.done {
defer conn.Close() return nil, false, storage.ErrTxDone
}
if tx.multi == true {
return nil, false, errors.New("Tried to read during MULTI")
}
key := ds.conf.Prefix + "torrent:" + infohash key := tx.conf.Prefix + "torrent:" + infohash
reply, err := redis.String(conn.Do("GET", key)) _, err := tx.Do("WATCH", key)
if err != nil {
return nil, false, err
}
reply, err := redis.String(tx.Do("GET", key))
if err != nil { if err != nil {
if err == redis.ErrNil { if err == redis.ErrNil {
return nil, false, nil return nil, false, nil
@ -112,87 +194,65 @@ func (ds *DS) FindTorrent(infohash string) (*storage.Torrent, bool, error) {
return torrent, true, nil return torrent, true, nil
} }
func (ds *DS) ClientWhitelisted(peerID string) (bool, error) { func (tx *Tx) ClientWhitelisted(peerID string) (exists bool, err error) {
conn := ds.Get()
defer conn.Close()
key := ds.conf.Prefix + "whitelist"
exists, err := redis.Bool(conn.Do("SISMEMBER", key, peerID))
if err != nil {
return false, err
}
return exists, nil
}
type Tx struct {
conf *config.Storage
done bool
redis.Conn
}
func (ds *DS) Begin() (storage.Tx, error) {
conn := ds.Get()
err := conn.Send("MULTI")
if err != nil {
return nil, err
}
return &Tx{
conf: ds.conf,
Conn: conn,
}, nil
}
func (tx *Tx) close() {
if tx.done { if tx.done {
panic("redis: transaction closed twice") return false, storage.ErrTxDone
}
if tx.multi == true {
return false, errors.New("Tried to read during MULTI")
} }
tx.done = true
tx.Conn.Close()
}
func (tx *Tx) Commit() error { key := tx.conf.Prefix + "whitelist"
if tx.done { _, err = tx.Do("WATCH", key)
return storage.ErrTxDone
}
_, err := tx.Do("EXEC")
if err != nil { if err != nil {
return err return
} }
tx.close() // TODO
return nil return
}
// Redis doesn't need to rollback. Exec is atomic.
func (tx *Tx) Rollback() error {
if tx.done {
return storage.ErrTxDone
}
tx.close()
return nil
} }
func (tx *Tx) Snatch(user *storage.User, torrent *storage.Torrent) error { func (tx *Tx) Snatch(user *storage.User, torrent *storage.Torrent) error {
if tx.done { if tx.done {
return storage.ErrTxDone return storage.ErrTxDone
} }
if tx.multi != true {
err := tx.Send("MULTI")
if err != nil {
return err
}
}
// TODO // TODO
return nil return nil
} }
func (tx *Tx) Active(t *storage.Torrent) error { func (tx *Tx) MarkActive(t *storage.Torrent) error {
if tx.done { if tx.done {
return storage.ErrTxDone return storage.ErrTxDone
} }
key := tx.conf.Prefix + "torrent:" + t.Infohash if tx.multi != true {
err := activeScript.Send(tx.Conn, key) err := tx.Send("MULTI")
return err if err != nil {
return err
}
}
// TODO
return nil
} }
func (tx *Tx) NewLeecher(t *storage.Torrent, p *storage.Peer) error { func (tx *Tx) NewLeecher(t *storage.Torrent, p *storage.Peer) error {
if tx.done { if tx.done {
return storage.ErrTxDone return storage.ErrTxDone
} }
if tx.multi != true {
err := tx.Send("MULTI")
if err != nil {
return err
}
}
// TODO // TODO
return nil return nil
} }
@ -201,6 +261,13 @@ func (tx *Tx) SetLeecher(t *storage.Torrent, p *storage.Peer) error {
if tx.done { if tx.done {
return storage.ErrTxDone return storage.ErrTxDone
} }
if tx.multi != true {
err := tx.Send("MULTI")
if err != nil {
return err
}
}
// TODO // TODO
return nil return nil
} }
@ -209,6 +276,13 @@ func (tx *Tx) RmLeecher(t *storage.Torrent, p *storage.Peer) error {
if tx.done { if tx.done {
return storage.ErrTxDone return storage.ErrTxDone
} }
if tx.multi != true {
err := tx.Send("MULTI")
if err != nil {
return err
}
}
// TODO // TODO
return nil return nil
} }
@ -217,6 +291,13 @@ func (tx *Tx) NewSeeder(t *storage.Torrent, p *storage.Peer) error {
if tx.done { if tx.done {
return storage.ErrTxDone return storage.ErrTxDone
} }
if tx.multi != true {
err := tx.Send("MULTI")
if err != nil {
return err
}
}
// TODO // TODO
return nil return nil
} }
@ -225,6 +306,13 @@ func (tx *Tx) SetSeeder(t *storage.Torrent, p *storage.Peer) error {
if tx.done { if tx.done {
return storage.ErrTxDone return storage.ErrTxDone
} }
if tx.multi != true {
err := tx.Send("MULTI")
if err != nil {
return err
}
}
// TODO // TODO
return nil return nil
} }
@ -233,27 +321,45 @@ func (tx *Tx) RmSeeder(t *storage.Torrent, p *storage.Peer) error {
if tx.done { if tx.done {
return storage.ErrTxDone return storage.ErrTxDone
} }
key := tx.conf.Prefix + "torrent:" + t.Infohash if tx.multi != true {
err := rmSeederScript.Send(tx.Conn, key, p.ID) err := tx.Send("MULTI")
return err if err != nil {
return err
}
}
// TODO
return nil
} }
func (tx *Tx) IncrementSlots(u *storage.User) error { func (tx *Tx) IncrementSlots(u *storage.User) error {
if tx.done { if tx.done {
return storage.ErrTxDone return storage.ErrTxDone
} }
key := tx.conf.Prefix + "user:" + u.Passkey if tx.multi != true {
err := incSlotsScript.Send(tx.Conn, key) err := tx.Send("MULTI")
return err if err != nil {
return err
}
}
// TODO
return nil
} }
func (tx *Tx) DecrementSlots(u *storage.User) error { func (tx *Tx) DecrementSlots(u *storage.User) error {
if tx.done { if tx.done {
return storage.ErrTxDone return storage.ErrTxDone
} }
key := tx.conf.Prefix + "user:" + u.Passkey if tx.multi != true {
err := decSlotsScript.Send(tx.Conn, key) err := tx.Send("MULTI")
return err if err != nil {
return err
}
}
// TODO
return nil
} }
func init() { func init() {

View file

@ -1,67 +0,0 @@
package redis
import (
"github.com/garyburd/redigo/redis"
)
var incSlotsScript = redis.NewScript(1, incSlotsScriptSrc)
const incSlotsScriptSrc = `
if redis.call("exists", keys[1]) == 1 then
local json = redis.call("get", keys[1])
local user = cjson.decode(json)
user["slots_used"] = user["slots_used"] + 1
json = cjson.encode(user)
redis.call("set", key, json)
return user["slots_used"]
else
return nil
end
`
var decSlotsScript = redis.NewScript(1, incSlotsScriptSrc)
const decSlotsScriptSrc = `
if redis.call("exists", keys[1]) == 1 then
local json = redis.call("get", keys[1])
local user = cjson.decode(json)
if user["slots_used"] > 0
user["slots_used"] = user["slots_used"] - 1
end
json = cjson.encode(user)
redis.call("set", key, json)
return user["slots_used"]
else
return nil
end
`
var activeScript = redis.NewScript(1, decSlotsScriptSrc)
const activeScriptSrc = `
if redis.call("exists", keys[1]) == 1 then
local json = redis.call("get", keys[1])
local torrent = cjson.decode(json)
torrent["active"] = true
json = cjson.encode(torrent)
redis.call("set", key, json)
return user["slots_used"]
else
return nil
end
`
var rmSeederScript = redis.NewScript(2, rmSeederScriptSrc)
const rmSeederScriptSrc = `
if redis.call("EXISTS", keys[1]) == 1 then
local json = redis.call("GET", keys[1])
local torrent = cjson.decode(json)
table.remove(torrent["seeders"], keys[2])
json = cjson.encode(torrent)
redis.call("SET", key, json)
return 0
else
return nil
end
`

View file

@ -19,7 +19,7 @@ var (
) )
type Driver interface { type Driver interface {
New(*config.Storage) DS New(*config.Storage) Pool
} }
// Register makes a database driver available by the provided name. // Register makes a database driver available by the provided name.
@ -35,8 +35,8 @@ func Register(name string, driver Driver) {
drivers[name] = driver drivers[name] = driver
} }
// Open opens a data store specified by a storage configuration. // Open creates a pool of data store connections specified by a storage configuration.
func Open(conf *config.Storage) (DS, error) { func Open(conf *config.Storage) (Pool, error) {
driver, ok := drivers[conf.Driver] driver, ok := drivers[conf.Driver]
if !ok { if !ok {
return nil, fmt.Errorf( return nil, fmt.Errorf(
@ -48,19 +48,11 @@ func Open(conf *config.Storage) (DS, error) {
return pool, nil return pool, nil
} }
// DS represents a data store handle. It's expected to be safe for concurrent // Pool represents a thread-safe pool of connections to the data store
// use by multiple goroutines. // that can be used to obtain transactions.
// type Pool interface {
// A pool of connections or a database/sql.DB is a great concrete type to
// implement the DS interface.
type DS interface {
Close() error Close() error
Get() (Tx, error)
Begin() (Tx, error)
FindUser(passkey string) (*User, bool, error)
FindTorrent(infohash string) (*Torrent, bool, error)
ClientWhitelisted(peerID string) (bool, error)
} }
// Tx represents an in-progress data store transaction. // Tx represents an in-progress data store transaction.
@ -72,20 +64,20 @@ type Tx interface {
Commit() error Commit() error
Rollback() error Rollback() error
// Torrents // Reads
FindUser(passkey string) (*User, bool, error)
FindTorrent(infohash string) (*Torrent, bool, error)
ClientWhitelisted(peerID string) (bool, error)
// Writes
Snatch(u *User, t *Torrent) error Snatch(u *User, t *Torrent) error
Active(t *Torrent) error MarkActive(t *Torrent) error
// Peers
NewLeecher(t *Torrent, p *Peer) error NewLeecher(t *Torrent, p *Peer) error
SetLeecher(t *Torrent, p *Peer) error
RmLeecher(t *Torrent, p *Peer) error
NewSeeder(t *Torrent, p *Peer) error NewSeeder(t *Torrent, p *Peer) error
SetSeeder(t *Torrent, p *Peer) error RmLeecher(t *Torrent, p *Peer) error
RmSeeder(t *Torrent, p *Peer) error RmSeeder(t *Torrent, p *Peer) error
SetLeecher(t *Torrent, p *Peer) error
// Users SetSeeder(t *Torrent, p *Peer) error
IncrementSlots(u *User) error IncrementSlots(u *User) error
DecrementSlots(u *User) error DecrementSlots(u *User) error
} }