From 40505091f5a04a031a7f6e2b5b7e89dd71a98420 Mon Sep 17 00:00:00 2001 From: Jimmy Zelinskie Date: Thu, 25 Jul 2013 10:03:04 -0400 Subject: [PATCH] storage reorganized around a pool and transactions --- server/announce.go | 20 +-- server/scrape.go | 27 ++-- server/server.go | 18 +-- storage/redis/redis.go | 266 +++++++++++++++++++++++++++------------ storage/redis/scripts.go | 67 ---------- storage/storage.go | 42 +++---- 6 files changed, 240 insertions(+), 200 deletions(-) delete mode 100644 storage/redis/scripts.go diff --git a/server/announce.go b/server/announce.go index 4914f2f..40d4c72 100644 --- a/server/announce.go +++ b/server/announce.go @@ -23,16 +23,22 @@ func (s Server) serveAnnounce(w http.ResponseWriter, r *http.Request) { return } + // Start a transaction + tx, err := s.dbConnPool.Get() + if err != nil { + log.Panicf("server: %s", err) + } + // Validate the user's passkey passkey, _ := path.Split(r.URL.Path) - user, err := s.FindUser(passkey) + user, err := validateUser(tx, passkey) if err != nil { fail(err, w, r) return } // Check if the user's client is whitelisted - whitelisted, err := s.dataStore.ClientWhitelisted(peerID) + whitelisted, err := tx.ClientWhitelisted(peerID) if err != nil { log.Panicf("server: %s", err) } @@ -42,7 +48,7 @@ func (s Server) serveAnnounce(w http.ResponseWriter, r *http.Request) { } // Find the specified torrent - torrent, exists, err := s.dataStore.FindTorrent(infohash) + torrent, exists, err := tx.FindTorrent(infohash) if err != nil { log.Panicf("server: %s", err) } @@ -51,15 +57,9 @@ func (s Server) serveAnnounce(w http.ResponseWriter, r *http.Request) { return } - // Begin a data store transaction - tx, err := s.dataStore.Begin() - if err != nil { - log.Panicf("server: %s", err) - } - // If the torrent was pruned and the user is seeding, unprune it if !torrent.Active && left == 0 { - err := tx.Active(torrent) + err := tx.MarkActive(torrent) if err != nil { log.Panicf("server: %s", err) } diff --git a/server/scrape.go b/server/scrape.go index 7285674..6d0777f 100644 --- a/server/scrape.go +++ b/server/scrape.go @@ -15,24 +15,32 @@ import ( ) func (s *Server) serveScrape(w http.ResponseWriter, r *http.Request) { - passkey, _ := path.Split(r.URL.Path) - _, err := s.FindUser(passkey) - if err != nil { - fail(err, w, r) - return - } - + // Parse the query pq, err := parseQuery(r.URL.RawQuery) if err != nil { fail(errors.New("Error parsing query"), w, r) return } + // Start a transaction + tx, err := s.dbConnPool.Get() + if err != nil { + log.Fatal(err) + } + + // Find and validate the user + passkey, _ := path.Split(r.URL.Path) + _, err = validateUser(tx, passkey) + if err != nil { + fail(err, w, r) + return + } + io.WriteString(w, "d") writeBencoded(w, "files") if pq.Infohashes != nil { for _, infohash := range pq.Infohashes { - torrent, exists, err := s.dataStore.FindTorrent(infohash) + torrent, exists, err := tx.FindTorrent(infohash) if err != nil { log.Panicf("server: %s", err) } @@ -42,7 +50,7 @@ func (s *Server) serveScrape(w http.ResponseWriter, r *http.Request) { } } } else if infohash, exists := pq.Params["info_hash"]; exists { - torrent, exists, err := s.dataStore.FindTorrent(infohash) + torrent, exists, err := tx.FindTorrent(infohash) if err != nil { log.Panicf("server: %s", err) } @@ -53,6 +61,7 @@ func (s *Server) serveScrape(w http.ResponseWriter, r *http.Request) { } io.WriteString(w, "e") + // Finish up and write headers r.Close = true w.Header().Add("Content-Type", "text/plain") w.Header().Add("Connection", "close") diff --git a/server/server.go b/server/server.go index 7fda1a6..7b7d893 100644 --- a/server/server.go +++ b/server/server.go @@ -22,9 +22,9 @@ import ( ) type Server struct { - conf *config.Config - listener net.Listener - dataStore storage.DS + conf *config.Config + listener net.Listener + dbConnPool storage.Pool serving bool startTime time.Time @@ -38,14 +38,14 @@ type Server struct { } func New(conf *config.Config) (*Server, error) { - ds, err := storage.Open(&conf.Storage) + pool, err := storage.Open(&conf.Storage) if err != nil { return nil, err } s := &Server{ - conf: conf, - dataStore: ds, + conf: conf, + dbConnPool: pool, Server: http.Server{ Addr: conf.Addr, ReadTimeout: conf.ReadTimeout.Duration, @@ -75,7 +75,7 @@ func (s *Server) ListenAndServe() error { func (s *Server) Stop() error { s.serving = false s.waitgroup.Wait() - err := s.dataStore.Close() + err := s.dbConnPool.Close() if err != nil { return err } @@ -121,13 +121,13 @@ func fail(err error, w http.ResponseWriter, r *http.Request) { w.(http.Flusher).Flush() } -func (s *Server) FindUser(dir string) (*storage.User, error) { +func validateUser(tx storage.Tx, dir string) (*storage.User, error) { if len(dir) != 34 { return nil, errors.New("Passkey is invalid") } passkey := dir[1:33] - user, exists, err := s.dataStore.FindUser(passkey) + user, exists, err := tx.FindUser(passkey) if err != nil { log.Panicf("server: %s", err) } diff --git a/storage/redis/redis.go b/storage/redis/redis.go index 3426645..7e4af33 100644 --- a/storage/redis/redis.go +++ b/storage/redis/redis.go @@ -12,6 +12,7 @@ package redis import ( "encoding/json" + "errors" "strings" "time" @@ -23,10 +24,10 @@ import ( type driver struct{} -func (d *driver) New(conf *config.Storage) storage.DS { - return &DS{ +func (d *driver) New(conf *config.Storage) storage.Pool { + return &Pool{ conf: conf, - Pool: redis.Pool{ + pool: redis.Pool{ MaxIdle: conf.MaxIdleConn, IdleTimeout: conf.IdleTimeout.Duration, Dial: makeDialFunc(conf), @@ -65,17 +66,90 @@ func testOnBorrow(c redis.Conn, t time.Time) error { return err } -type DS struct { +type Pool struct { conf *config.Storage - redis.Pool + pool redis.Pool } -func (ds *DS) FindUser(passkey string) (*storage.User, bool, error) { - conn := ds.Get() - defer conn.Close() +func (p *Pool) Close() error { + return p.pool.Close() +} - key := ds.conf.Prefix + "user:" + passkey - reply, err := redis.String(conn.Do("GET", key)) +func (p *Pool) Get() (storage.Tx, error) { + return &Tx{ + conf: p.conf, + done: false, + multi: false, + Conn: p.pool.Get(), + }, nil +} + +// Tx represents a transaction for Redis with one gotcha: +// all reads must be done prior to any writes. Writes will +// check if the MULTI command has been sent to redis and will +// send it if it hasn't. +// +// Internally a transaction looks like: +// WATCH keyA +// GET keyA +// WATCH keyB +// GET keyB +// MULTI +// SET keyA +// SET keyB +// EXEC +type Tx struct { + conf *config.Storage + done bool + multi bool + redis.Conn +} + +func (tx *Tx) close() { + if tx.done { + panic("redis: transaction closed twice") + } + tx.done = true + tx.Conn.Close() +} + +func (tx *Tx) Commit() error { + if tx.done { + return storage.ErrTxDone + } + if tx.multi == true { + _, err := tx.Do("EXEC") + if err != nil { + return err + } + } + tx.close() + return nil +} + +func (tx *Tx) Rollback() error { + if tx.done { + return storage.ErrTxDone + } + // Redis doesn't need to do anything. Exec is atomic. + tx.close() + return nil +} + +func (tx *Tx) FindUser(passkey string) (*storage.User, bool, error) { + if tx.done { + return nil, false, storage.ErrTxDone + } + if tx.multi == true { + return nil, false, errors.New("Tried to read during MULTI") + } + + key := tx.conf.Prefix + "user:" + passkey + _, err := tx.Do("WATCH", key) + if err != nil { + return nil, false, err + } + reply, err := redis.String(tx.Do("GET", key)) if err != nil { if err == redis.ErrNil { return nil, false, nil @@ -91,12 +165,20 @@ func (ds *DS) FindUser(passkey string) (*storage.User, bool, error) { return user, true, nil } -func (ds *DS) FindTorrent(infohash string) (*storage.Torrent, bool, error) { - conn := ds.Get() - defer conn.Close() +func (tx *Tx) FindTorrent(infohash string) (*storage.Torrent, bool, error) { + if tx.done { + return nil, false, storage.ErrTxDone + } + if tx.multi == true { + return nil, false, errors.New("Tried to read during MULTI") + } - key := ds.conf.Prefix + "torrent:" + infohash - reply, err := redis.String(conn.Do("GET", key)) + key := tx.conf.Prefix + "torrent:" + infohash + _, err := tx.Do("WATCH", key) + if err != nil { + return nil, false, err + } + reply, err := redis.String(tx.Do("GET", key)) if err != nil { if err == redis.ErrNil { return nil, false, nil @@ -112,87 +194,65 @@ func (ds *DS) FindTorrent(infohash string) (*storage.Torrent, bool, error) { return torrent, true, nil } -func (ds *DS) ClientWhitelisted(peerID string) (bool, error) { - conn := ds.Get() - defer conn.Close() - - key := ds.conf.Prefix + "whitelist" - exists, err := redis.Bool(conn.Do("SISMEMBER", key, peerID)) - if err != nil { - return false, err - } - return exists, nil -} - -type Tx struct { - conf *config.Storage - done bool - redis.Conn -} - -func (ds *DS) Begin() (storage.Tx, error) { - conn := ds.Get() - err := conn.Send("MULTI") - if err != nil { - return nil, err - } - return &Tx{ - conf: ds.conf, - Conn: conn, - }, nil -} - -func (tx *Tx) close() { +func (tx *Tx) ClientWhitelisted(peerID string) (exists bool, err error) { if tx.done { - panic("redis: transaction closed twice") + return false, storage.ErrTxDone + } + if tx.multi == true { + return false, errors.New("Tried to read during MULTI") } - tx.done = true - tx.Conn.Close() -} -func (tx *Tx) Commit() error { - if tx.done { - return storage.ErrTxDone - } - _, err := tx.Do("EXEC") + key := tx.conf.Prefix + "whitelist" + _, err = tx.Do("WATCH", key) if err != nil { - return err + return } - tx.close() - return nil -} - -// Redis doesn't need to rollback. Exec is atomic. -func (tx *Tx) Rollback() error { - if tx.done { - return storage.ErrTxDone - } - tx.close() - return nil + // TODO + return } func (tx *Tx) Snatch(user *storage.User, torrent *storage.Torrent) error { if tx.done { return storage.ErrTxDone } + if tx.multi != true { + err := tx.Send("MULTI") + if err != nil { + return err + } + } + // TODO return nil } -func (tx *Tx) Active(t *storage.Torrent) error { +func (tx *Tx) MarkActive(t *storage.Torrent) error { if tx.done { return storage.ErrTxDone } - key := tx.conf.Prefix + "torrent:" + t.Infohash - err := activeScript.Send(tx.Conn, key) - return err + if tx.multi != true { + err := tx.Send("MULTI") + if err != nil { + return err + } + } + + // TODO + return nil } func (tx *Tx) NewLeecher(t *storage.Torrent, p *storage.Peer) error { if tx.done { return storage.ErrTxDone } + if tx.multi != true { + err := tx.Send("MULTI") + if err != nil { + return err + } + } + // TODO return nil } @@ -201,6 +261,13 @@ func (tx *Tx) SetLeecher(t *storage.Torrent, p *storage.Peer) error { if tx.done { return storage.ErrTxDone } + if tx.multi != true { + err := tx.Send("MULTI") + if err != nil { + return err + } + } + // TODO return nil } @@ -209,6 +276,13 @@ func (tx *Tx) RmLeecher(t *storage.Torrent, p *storage.Peer) error { if tx.done { return storage.ErrTxDone } + if tx.multi != true { + err := tx.Send("MULTI") + if err != nil { + return err + } + } + // TODO return nil } @@ -217,6 +291,13 @@ func (tx *Tx) NewSeeder(t *storage.Torrent, p *storage.Peer) error { if tx.done { return storage.ErrTxDone } + if tx.multi != true { + err := tx.Send("MULTI") + if err != nil { + return err + } + } + // TODO return nil } @@ -225,6 +306,13 @@ func (tx *Tx) SetSeeder(t *storage.Torrent, p *storage.Peer) error { if tx.done { return storage.ErrTxDone } + if tx.multi != true { + err := tx.Send("MULTI") + if err != nil { + return err + } + } + // TODO return nil } @@ -233,27 +321,45 @@ func (tx *Tx) RmSeeder(t *storage.Torrent, p *storage.Peer) error { if tx.done { return storage.ErrTxDone } - key := tx.conf.Prefix + "torrent:" + t.Infohash - err := rmSeederScript.Send(tx.Conn, key, p.ID) - return err + if tx.multi != true { + err := tx.Send("MULTI") + if err != nil { + return err + } + } + + // TODO + return nil } func (tx *Tx) IncrementSlots(u *storage.User) error { if tx.done { return storage.ErrTxDone } - key := tx.conf.Prefix + "user:" + u.Passkey - err := incSlotsScript.Send(tx.Conn, key) - return err + if tx.multi != true { + err := tx.Send("MULTI") + if err != nil { + return err + } + } + + // TODO + return nil } func (tx *Tx) DecrementSlots(u *storage.User) error { if tx.done { return storage.ErrTxDone } - key := tx.conf.Prefix + "user:" + u.Passkey - err := decSlotsScript.Send(tx.Conn, key) - return err + if tx.multi != true { + err := tx.Send("MULTI") + if err != nil { + return err + } + } + + // TODO + return nil } func init() { diff --git a/storage/redis/scripts.go b/storage/redis/scripts.go deleted file mode 100644 index 341a2eb..0000000 --- a/storage/redis/scripts.go +++ /dev/null @@ -1,67 +0,0 @@ -package redis - -import ( - "github.com/garyburd/redigo/redis" -) - -var incSlotsScript = redis.NewScript(1, incSlotsScriptSrc) - -const incSlotsScriptSrc = ` -if redis.call("exists", keys[1]) == 1 then - local json = redis.call("get", keys[1]) - local user = cjson.decode(json) - user["slots_used"] = user["slots_used"] + 1 - json = cjson.encode(user) - redis.call("set", key, json) - return user["slots_used"] -else - return nil -end -` - -var decSlotsScript = redis.NewScript(1, incSlotsScriptSrc) - -const decSlotsScriptSrc = ` -if redis.call("exists", keys[1]) == 1 then - local json = redis.call("get", keys[1]) - local user = cjson.decode(json) - if user["slots_used"] > 0 - user["slots_used"] = user["slots_used"] - 1 - end - json = cjson.encode(user) - redis.call("set", key, json) - return user["slots_used"] -else - return nil -end -` - -var activeScript = redis.NewScript(1, decSlotsScriptSrc) - -const activeScriptSrc = ` -if redis.call("exists", keys[1]) == 1 then - local json = redis.call("get", keys[1]) - local torrent = cjson.decode(json) - torrent["active"] = true - json = cjson.encode(torrent) - redis.call("set", key, json) - return user["slots_used"] -else - return nil -end -` - -var rmSeederScript = redis.NewScript(2, rmSeederScriptSrc) - -const rmSeederScriptSrc = ` -if redis.call("EXISTS", keys[1]) == 1 then - local json = redis.call("GET", keys[1]) - local torrent = cjson.decode(json) - table.remove(torrent["seeders"], keys[2]) - json = cjson.encode(torrent) - redis.call("SET", key, json) - return 0 -else - return nil -end -` diff --git a/storage/storage.go b/storage/storage.go index d77369e..5a8f1b5 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -19,7 +19,7 @@ var ( ) type Driver interface { - New(*config.Storage) DS + New(*config.Storage) Pool } // Register makes a database driver available by the provided name. @@ -35,8 +35,8 @@ func Register(name string, driver Driver) { drivers[name] = driver } -// Open opens a data store specified by a storage configuration. -func Open(conf *config.Storage) (DS, error) { +// Open creates a pool of data store connections specified by a storage configuration. +func Open(conf *config.Storage) (Pool, error) { driver, ok := drivers[conf.Driver] if !ok { return nil, fmt.Errorf( @@ -48,19 +48,11 @@ func Open(conf *config.Storage) (DS, error) { return pool, nil } -// DS represents a data store handle. It's expected to be safe for concurrent -// use by multiple goroutines. -// -// A pool of connections or a database/sql.DB is a great concrete type to -// implement the DS interface. -type DS interface { +// Pool represents a thread-safe pool of connections to the data store +// that can be used to obtain transactions. +type Pool interface { Close() error - - Begin() (Tx, error) - - FindUser(passkey string) (*User, bool, error) - FindTorrent(infohash string) (*Torrent, bool, error) - ClientWhitelisted(peerID string) (bool, error) + Get() (Tx, error) } // Tx represents an in-progress data store transaction. @@ -72,20 +64,20 @@ type Tx interface { Commit() error Rollback() error - // Torrents + // Reads + FindUser(passkey string) (*User, bool, error) + FindTorrent(infohash string) (*Torrent, bool, error) + ClientWhitelisted(peerID string) (bool, error) + + // Writes Snatch(u *User, t *Torrent) error - Active(t *Torrent) error - - // Peers + MarkActive(t *Torrent) error NewLeecher(t *Torrent, p *Peer) error - SetLeecher(t *Torrent, p *Peer) error - RmLeecher(t *Torrent, p *Peer) error - NewSeeder(t *Torrent, p *Peer) error - SetSeeder(t *Torrent, p *Peer) error + RmLeecher(t *Torrent, p *Peer) error RmSeeder(t *Torrent, p *Peer) error - - // Users + SetLeecher(t *Torrent, p *Peer) error + SetSeeder(t *Torrent, p *Peer) error IncrementSlots(u *User) error DecrementSlots(u *User) error }