more redis work

This commit is contained in:
Jimmy Zelinskie 2013-06-25 21:58:06 -04:00
parent 56132e3d64
commit 2759dd6e2f
4 changed files with 124 additions and 45 deletions

View file

@ -39,6 +39,7 @@ type Storage struct {
Password string `json:"pass"`
Schema string `json:"schema,omitempty"`
Encoding string `json:"encoding,omitempty"`
Prefix string `json:"prefix,omitempty"`
ConnectTimeout *Duration `json:"conn_timeout,omitempty"`
ReadTimeout *Duration `json:"read_timeout,omitempty"`

View file

@ -7,6 +7,7 @@
"addr": "127.0.0.1:6379",
"user": "root",
"pass": "",
"prefix": "test:",
"conn_timeout": "5s",
"read_timeout": "5s",

View file

@ -5,6 +5,8 @@
package redis
import (
"time"
"github.com/garyburd/redigo/redis"
"github.com/pushrax/chihaya/config"
@ -14,35 +16,108 @@ import (
type driver struct{}
func (d *driver) New(conf *config.Storage) (storage.Conn, error) {
var (
conn redis.Conn
err error
)
if conf.ConnectTimeout != nil &&
conf.ReadTimeout != nil &&
conf.WriteTimeout != nil {
conn, err = redis.DialTimeout(
conf.Network,
conf.Addr,
conf.ConnectTimeout.Duration,
conf.ReadTimeout.Duration,
conf.WriteTimeout.Duration,
)
} else {
conn, err = redis.Dial(conf.Network, conf.Addr)
}
if err != nil {
return nil, err
}
return &Conn{
conn,
conf: conf,
pool: &redis.Pool{
MaxIdle: 3,
IdleTimeout: 240 * time.Second,
Dial: func() (redis.Conn, error) {
var (
conn redis.Conn
err error
)
if conf.ConnectTimeout != nil &&
conf.ReadTimeout != nil &&
conf.WriteTimeout != nil {
conn, err = redis.DialTimeout(
conf.Network,
conf.Addr,
conf.ConnectTimeout.Duration,
conf.ReadTimeout.Duration,
conf.WriteTimeout.Duration,
)
} else {
conn, err = redis.Dial(conf.Network, conf.Addr)
}
if err != nil {
return nil, err
}
return conn, nil
},
TestOnBorrow: func(c redis.Conn, t time.Time) error {
_, err := c.Do("PING")
return err
},
},
}, nil
}
type Conn struct {
conn redis.Conn
conf *config.Storage
pool *redis.Pool
}
func (c *Conn) Close() error {
return c.pool.Close()
}
func (c *Conn) FindUser(passkey string) (*storage.User, bool, error) {
conn := c.pool.Get()
defer c.pool.Close()
key := c.conf.Prefix + "User:" + passkey
exists, err := redis.Bool(conn.Do("EXISTS", key))
if err != nil {
return nil, false, err
}
if !exists {
return nil, false, nil
}
reply, err := redis.Values(conn.Do("HGETALL", key))
if err != nil {
return nil, false, err
}
user := &storage.User{}
err = redis.ScanStruct(reply, user)
if err != nil {
return nil, false, err
}
return user, true, nil
}
func (c *Conn) FindTorrent(infohash string) (*storage.Torrent, bool, error) {
conn := c.pool.Get()
defer c.pool.Close()
key := c.conf.Prefix + "Torrent:" + infohash
exists, err := redis.Bool(conn.Do("EXISTS", key))
if err != nil {
return nil, false, err
}
if !exists {
return nil, false, nil
}
reply, err := redis.Values(conn.Do("HGETALL", key))
if err != nil {
return nil, false, err
}
torrent := &storage.Torrent{}
err = redis.ScanStruct(reply, torrent)
if err != nil {
return nil, false, err
}
return torrent, true, nil
}
func (c *Conn) UnpruneTorrent(torrent *storage.Torrent) error {
// TODO
return nil
}
func init() {

View file

@ -12,13 +12,13 @@ import (
"github.com/pushrax/chihaya/config"
)
var drivers = make(map[string]StorageDriver)
var drivers = make(map[string]Driver)
type StorageDriver interface {
type Driver interface {
New(*config.Storage) (Conn, error)
}
func Register(name string, driver StorageDriver) {
func Register(name string, driver Driver) {
if driver == nil {
panic("storage: Register driver is nil")
}
@ -50,22 +50,24 @@ type Conn interface {
FindTorrent(infohash string) (*Torrent, bool, error)
UnpruneTorrent(torrent *Torrent) error
RecordUser(
user *User,
rawDeltaUpload int64,
rawDeltaDownload int64,
deltaUpload int64,
deltaDownload int64,
) error
RecordSnatch(peer *Peer, now int64) error
RecordTorrent(torrent *Torrent, deltaSnatch uint64) error
RecordTransferIP(peer *Peer) error
RecordTransferHistory(
peer *Peer,
rawDeltaUpload int64,
rawDeltaDownload int64,
deltaTime int64,
deltaSnatch uint64,
active bool,
) error
/*
RecordUser(
user *User,
rawDeltaUpload int64,
rawDeltaDownload int64,
deltaUpload int64,
deltaDownload int64,
) error
RecordSnatch(peer *Peer, now int64) error
RecordTorrent(torrent *Torrent, deltaSnatch uint64) error
RecordTransferIP(peer *Peer) error
RecordTransferHistory(
peer *Peer,
rawDeltaUpload int64,
rawDeltaDownload int64,
deltaTime int64,
deltaSnatch uint64,
active bool,
) error
*/
}