Implemented redis cache driver using redis types

This commit is contained in:
cpb8010 2013-09-06 20:59:12 -04:00
parent b00c4f44ca
commit cd6e4abec0
4 changed files with 362 additions and 427 deletions

4
cache/cache.go vendored
View file

@ -82,4 +82,8 @@ type Tx interface {
SetSeeder(t *models.Torrent, p *models.Peer) error SetSeeder(t *models.Torrent, p *models.Peer) error
IncrementSlots(u *models.User) error IncrementSlots(u *models.User) error
DecrementSlots(u *models.User) error DecrementSlots(u *models.User) error
AddTorrent(t *models.Torrent) error
RemoveTorrent(t *models.Torrent) error
AddUser(u *models.User) error
RemoveUser(u *models.User) error
} }

428
cache/redis/redis.go vendored
View file

@ -14,7 +14,8 @@
package redis package redis
import ( import (
"encoding/json" "errors"
"strconv"
"strings" "strings"
"time" "time"
@ -25,6 +26,18 @@ import (
"github.com/pushrax/chihaya/models" "github.com/pushrax/chihaya/models"
) )
var (
ErrCreateUser = errors.New("redis: Incorrect reply length for user")
ErrCreateTorrent = errors.New("redis: Incorrect reply length for torrent")
ErrCreatePeer = errors.New("redis: Incorrect reply length for peer")
ErrMarkActive = errors.New("redis: Torrent doesn't exist")
SeederSuffix = ":seeders"
LeecherSuffix = ":leechers"
TorrentPrefix = "torrent:"
UserPrefix = "user:"
)
type driver struct{} type driver struct{}
func (d *driver) New(conf *config.DataStore) cache.Pool { func (d *driver) New(conf *config.DataStore) cache.Pool {
@ -65,27 +78,12 @@ func (p *Pool) Close() error {
func (p *Pool) Get() (cache.Tx, error) { func (p *Pool) Get() (cache.Tx, error) {
return &Tx{ return &Tx{
conf: p.conf, conf: p.conf,
done: false, done: false,
multi: false, Conn: p.pool.Get(),
Conn: p.pool.Get(),
}, nil }, nil
} }
// Tx represents a transaction for Redis with one gotcha:
// all reads must be done prior to any writes. Writes will
// check if the MULTI command has been sent to redis and will
// send it if it hasn't.
//
// Internally a transaction looks like:
// WATCH keyA
// GET keyA
// WATCH keyB
// GET keyB
// MULTI
// SET keyA
// SET keyB
// EXEC
type Tx struct { type Tx struct {
conf *config.DataStore conf *config.DataStore
done bool done bool
@ -101,27 +99,6 @@ func (tx *Tx) close() {
tx.Conn.Close() tx.Conn.Close()
} }
func (tx *Tx) initiateWrite() error {
if tx.done {
return cache.ErrTxDone
}
if tx.multi != true {
tx.multi = true
return tx.Send("MULTI")
}
return nil
}
func (tx *Tx) initiateRead() error {
if tx.done {
return cache.ErrTxDone
}
if tx.multi == true {
panic("Tried to read during MULTI")
}
return nil
}
func (tx *Tx) Commit() error { func (tx *Tx) Commit() error {
if tx.done { if tx.done {
return cache.ErrTxDone return cache.ErrTxDone
@ -153,163 +130,324 @@ func (tx *Tx) Rollback() error {
return nil return nil
} }
func (tx *Tx) FindUser(passkey string) (*models.User, bool, error) { func createUser(userVals []string) (*models.User, error) {
err := tx.initiateRead() if len(userVals) != 7 {
if err != nil { return nil, ErrCreateUser
return nil, false, err
} }
// This could be a loop+switch
key := tx.conf.Prefix + "user:" + passkey ID, err := strconv.ParseUint(userVals[0], 10, 64)
_, err = tx.Do("WATCH", key)
if err != nil { if err != nil {
return nil, false, err return nil, err
} }
reply, err := redis.String(tx.Do("GET", key)) Passkey := userVals[1]
UpMultiplier, err := strconv.ParseFloat(userVals[2], 64)
if err != nil { if err != nil {
if err == redis.ErrNil { return nil, err
return nil, false, nil
}
return nil, false, err
} }
DownMultiplier, err := strconv.ParseFloat(userVals[3], 64)
user := &models.User{}
err = json.NewDecoder(strings.NewReader(reply)).Decode(user)
if err != nil { if err != nil {
return nil, true, err return nil, err
} }
return user, true, nil Slots, err := strconv.ParseInt(userVals[4], 10, 64)
if err != nil {
return nil, err
}
SlotsUsed, err := strconv.ParseInt(userVals[5], 10, 64)
if err != nil {
return nil, err
}
Snatches, err := strconv.ParseUint(userVals[6], 10, 64)
if err != nil {
return nil, err
}
return &models.User{ID, Passkey, UpMultiplier, DownMultiplier, Slots, SlotsUsed, uint(Snatches)}, nil
} }
func (tx *Tx) FindTorrent(infohash string) (*models.Torrent, bool, error) { func createTorrent(torrentVals []string, seeders map[string]models.Peer, leechers map[string]models.Peer) (*models.Torrent, error) {
err := tx.initiateRead() if len(torrentVals) != 7 {
if err != nil { return nil, ErrCreateTorrent
return nil, false, err
} }
ID, err := strconv.ParseUint(torrentVals[0], 10, 64)
if err != nil {
return nil, err
}
Infohash := torrentVals[1]
Active, err := strconv.ParseBool(torrentVals[2])
if err != nil {
return nil, err
}
Snatches, err := strconv.ParseUint(torrentVals[3], 10, 32)
if err != nil {
return nil, err
}
UpMultiplier, err := strconv.ParseFloat(torrentVals[4], 64)
if err != nil {
return nil, err
}
DownMultiplier, err := strconv.ParseFloat(torrentVals[5], 64)
if err != nil {
return nil, err
}
LastAction, err := strconv.ParseInt(torrentVals[6], 10, 64)
if err != nil {
return nil, err
}
return &models.Torrent{ID, Infohash, Active, seeders, leechers, uint(Snatches), UpMultiplier, DownMultiplier, LastAction}, nil
key := tx.conf.Prefix + "torrent:" + infohash
_, err = tx.Do("WATCH", key)
if err != nil {
return nil, false, err
}
reply, err := redis.String(tx.Do("GET", key))
if err != nil {
if err == redis.ErrNil {
return nil, false, nil
}
return nil, false, err
}
torrent := &models.Torrent{}
err = json.NewDecoder(strings.NewReader(reply)).Decode(torrent)
if err != nil {
return nil, true, err
}
return torrent, true, nil
} }
func (tx *Tx) ClientWhitelisted(peerID string) (exists bool, err error) { // Prevents adding duplicate peers, and doesn't return error on dup add
err = tx.initiateRead() func (tx *Tx) addPeer(infohash string, peer *models.Peer, suffix string) error {
setKey := tx.conf.Prefix + TorrentPrefix + infohash + suffix
_, err := tx.Do("SADD", setKey, *peer)
if err != nil { if err != nil {
return false, err
}
key := tx.conf.Prefix + "whitelist"
_, err = tx.Do("WATCH", key)
if err != nil {
return
}
// TODO
return
}
func (tx *Tx) RecordSnatch(user *models.User, torrent *models.Torrent) error {
if err := tx.initiateWrite(); err != nil {
return err return err
} }
// TODO
return nil return nil
} }
func (tx *Tx) MarkActive(t *models.Torrent) error { // Will not return an error if the peer doesn't exist
if err := tx.initiateWrite(); err != nil { func (tx *Tx) removePeer(infohash string, peer *models.Peer, suffix string) error {
setKey := tx.conf.Prefix + TorrentPrefix + infohash + suffix
_, err := tx.Do("SREM", setKey, *peer)
if err != nil {
return err
}
return nil
}
func (tx *Tx) addPeers(infohash string, peers map[string]models.Peer, suffix string) error {
setKey := tx.conf.Prefix + TorrentPrefix + infohash + suffix
for _, peer := range peers {
err := tx.Send("SADD", setKey, peer)
if err != nil {
return err
}
}
tx.Flush()
tx.Receive()
return nil
}
func createPeer(peerString string) (*models.Peer, error) {
peerVals := strings.Split(strings.Trim(peerString, "{}"), " ")
if len(peerVals) != 9 {
return nil, ErrCreatePeer
}
ID := peerVals[0]
UserID, err := strconv.ParseUint(peerVals[1], 10, 64)
if err != nil {
return nil, err
}
TorrentID, err := strconv.ParseUint(peerVals[2], 10, 64)
if err != nil {
return nil, err
}
IP := peerVals[3]
Port, err := strconv.ParseUint(peerVals[4], 10, 64)
if err != nil {
return nil, err
}
Uploaded, err := strconv.ParseUint(peerVals[5], 10, 64)
if err != nil {
return nil, err
}
Downloaded, err := strconv.ParseUint(peerVals[6], 10, 64)
if err != nil {
return nil, err
}
Left, err := strconv.ParseUint(peerVals[7], 10, 64)
if err != nil {
return nil, err
}
LastAnnounce, err := strconv.ParseInt(peerVals[8], 10, 64)
if err != nil {
return nil, err
}
return &models.Peer{ID, UserID, TorrentID, IP, Port, Uploaded, Downloaded, Left, LastAnnounce}, nil
}
func (tx *Tx) getPeers(infohash string, suffix string) (peers map[string]models.Peer, err error) {
peers = make(map[string]models.Peer)
setKey := tx.conf.Prefix + TorrentPrefix + infohash + suffix
peerStrings, err := redis.Strings(tx.Do("SMEMBERS", setKey))
for peerIndex := range peerStrings {
peer, err := createPeer(peerStrings[peerIndex])
if err != nil {
return nil, err
}
peers[peer.ID] = *peer
}
return
}
func (tx *Tx) AddTorrent(t *models.Torrent) error {
hashkey := tx.conf.Prefix + TorrentPrefix + t.Infohash
_, err := tx.Do("HMSET", hashkey,
"id", t.ID,
"infohash", t.Infohash,
"active", t.Active,
"snatches", t.Snatches,
"up_multiplier", t.UpMultiplier,
"down_multiplier", t.DownMultiplier,
"last_action", t.LastAction)
if err != nil {
return err return err
} }
// TODO tx.addPeers(t.Infohash, t.Seeders, SeederSuffix)
tx.addPeers(t.Infohash, t.Leechers, LeecherSuffix)
return nil
}
func (tx *Tx) RemoveTorrent(t *models.Torrent) error {
hashkey := tx.conf.Prefix + TorrentPrefix + t.Infohash
_, err := tx.Do("HDEL", hashkey)
if err != nil {
return err
}
return nil
}
func (tx *Tx) AddUser(u *models.User) error {
hashkey := tx.conf.Prefix + UserPrefix + u.Passkey
_, err := tx.Do("HMSET", hashkey,
"id", u.ID,
"passkey", u.Passkey,
"up_multiplier", u.UpMultiplier,
"down_multiplier", u.DownMultiplier,
"slots", u.Slots,
"slots_used", u.SlotsUsed,
"snatches", u.Snatches)
if err != nil {
return err
}
return nil
}
func (tx *Tx) RemoveUser(u *models.User) error {
hashkey := tx.conf.Prefix + UserPrefix + u.Passkey
_, err := tx.Do("HDEL", hashkey)
if err != nil {
return err
}
return nil
}
func (tx *Tx) FindUser(passkey string) (*models.User, bool, error) {
hashkey := tx.conf.Prefix + UserPrefix + passkey
userStrings, err := redis.Strings(tx.Do("HVALS", hashkey))
if err != nil {
return nil, false, err
} else if len(userStrings) == 0 {
return nil, false, nil
}
foundUser, err := createUser(userStrings)
if err != nil {
return nil, false, err
}
return foundUser, true, nil
}
func (tx *Tx) FindTorrent(infohash string) (*models.Torrent, bool, error) {
hashkey := tx.conf.Prefix + TorrentPrefix + infohash
torrentStrings, err := redis.Strings(tx.Do("HVALS", hashkey))
if err != nil {
return nil, false, err
} else if len(torrentStrings) == 0 {
return nil, false, nil
}
seeders, err := tx.getPeers(infohash, SeederSuffix)
leechers, err := tx.getPeers(infohash, LeecherSuffix)
foundTorrent, err := createTorrent(torrentStrings, seeders, leechers)
if err != nil {
return nil, false, err
}
return foundTorrent, true, nil
}
func (tx *Tx) ClientWhitelisted(peerID string) (exists bool, err error) {
key := tx.conf.Prefix + "whitelist"
return redis.Bool(tx.Do("ISMEMBER", key, peerID))
}
// This is a mulple action command, it's not internally atomic
func (tx *Tx) RecordSnatch(user *models.User, torrent *models.Torrent) error {
torrentKey := tx.conf.Prefix + TorrentPrefix + torrent.Infohash
snatchCount, err := redis.Int(tx.Do("HINCRBY", torrentKey, 1))
if err != nil {
return err
}
torrent.Snatches = uint(snatchCount)
userKey := tx.conf.Prefix + TorrentPrefix + torrent.Infohash
snatchCount, err = redis.Int(tx.Do("HINCRBY", userKey, 1))
if err != nil {
return err
}
user.Snatches = uint(snatchCount)
return nil
}
func (tx *Tx) MarkActive(torrent *models.Torrent) error {
hashkey := tx.conf.Prefix + TorrentPrefix + torrent.Infohash
activeExists, err := redis.Int(tx.Do("HSET", hashkey, true))
if err != nil {
return err
}
// HSET returns 1 if hash didn't exist before
if activeExists == 1 {
return ErrMarkActive
}
return nil return nil
} }
func (tx *Tx) AddLeecher(t *models.Torrent, p *models.Peer) error { func (tx *Tx) AddLeecher(t *models.Torrent, p *models.Peer) error {
if err := tx.initiateWrite(); err != nil { return tx.addPeer(t.Infohash, p, LeecherSuffix)
return err
}
// TODO
return nil
} }
func (tx *Tx) SetLeecher(t *models.Torrent, p *models.Peer) error { func (tx *Tx) SetLeecher(t *models.Torrent, p *models.Peer) error {
if err := tx.initiateWrite(); err != nil { return tx.addPeer(t.Infohash, p, LeecherSuffix)
return err
}
// TODO
return nil
} }
func (tx *Tx) RemoveLeecher(t *models.Torrent, p *models.Peer) error { func (tx *Tx) RemoveLeecher(t *models.Torrent, p *models.Peer) error {
if err := tx.initiateWrite(); err != nil { return tx.removePeer(t.Infohash, p, LeecherSuffix)
return err
}
// TODO
return nil
} }
func (tx *Tx) AddSeeder(t *models.Torrent, p *models.Peer) error { func (tx *Tx) AddSeeder(t *models.Torrent, p *models.Peer) error {
if err := tx.initiateWrite(); err != nil { return tx.addPeer(t.Infohash, p, SeederSuffix)
return err
}
// TODO
return nil
} }
func (tx *Tx) SetSeeder(t *models.Torrent, p *models.Peer) error { func (tx *Tx) SetSeeder(t *models.Torrent, p *models.Peer) error {
if err := tx.initiateWrite(); err != nil { return tx.addPeer(t.Infohash, p, SeederSuffix)
return err
}
// TODO
return nil
} }
func (tx *Tx) RemoveSeeder(t *models.Torrent, p *models.Peer) error { func (tx *Tx) RemoveSeeder(t *models.Torrent, p *models.Peer) error {
if err := tx.initiateWrite(); err != nil { return tx.removePeer(t.Infohash, p, SeederSuffix)
return err
}
// TODO
return nil
} }
func (tx *Tx) IncrementSlots(u *models.User) error { func (tx *Tx) IncrementSlots(u *models.User) error {
if err := tx.initiateWrite(); err != nil { hashkey := tx.conf.Prefix + UserPrefix + u.Passkey
slotCount, err := redis.Int(tx.Do("HINCRBY", hashkey, 1))
if err != nil {
return err return err
} }
u.Slots = int64(slotCount)
// TODO
return nil return nil
} }
func (tx *Tx) DecrementSlots(u *models.User) error { func (tx *Tx) DecrementSlots(u *models.User) error {
if err := tx.initiateWrite(); err != nil { hashkey := tx.conf.Prefix + UserPrefix + u.Passkey
slotCount, err := redis.Int(tx.Do("HINCRBY", hashkey, -1))
if err != nil {
return err return err
} }
u.Slots = int64(slotCount)
// TODO
return nil return nil
} }

View file

@ -5,13 +5,8 @@
package redis package redis
import ( import (
"encoding/json"
"math/rand"
"os" "os"
"strconv"
"strings"
"testing" "testing"
"time"
"github.com/garyburd/redigo/redis" "github.com/garyburd/redigo/redis"
@ -22,6 +17,7 @@ import (
// Maximum number of parallel retries; depends on system latency // Maximum number of parallel retries; depends on system latency
const MAX_RETRIES = 9000 const MAX_RETRIES = 9000
const sample_infohash = "58c290f4ea1efb3adcb8c1ed2643232117577bcd" const sample_infohash = "58c290f4ea1efb3adcb8c1ed2643232117577bcd"
const sample_passkey = "32426b162be0bce5428e7e36afaf734ae5afb355" const sample_passkey = "32426b162be0bce5428e7e36afaf734ae5afb355"
@ -39,6 +35,28 @@ func verifyErrNil(err error, t TestReporter) {
} }
} }
// Legacy JSON support for benching
func (tx *Tx) initiateWrite() error {
if tx.done {
return cache.ErrTxDone
}
if tx.multi != true {
tx.multi = true
return tx.Send("MULTI")
}
return nil
}
func (tx *Tx) initiateRead() error {
if tx.done {
return cache.ErrTxDone
}
if tx.multi == true {
panic("Tried to read during MULTI")
}
return nil
}
func createTestTxObj(t TestReporter) *Tx { func createTestTxObj(t TestReporter) *Tx {
testConfig, err := config.Open(os.Getenv("TESTCONFIGPATH")) testConfig, err := config.Open(os.Getenv("TESTCONFIGPATH"))
conf := &testConfig.Cache conf := &testConfig.Cache
@ -71,22 +89,8 @@ func createTestTxObj(t TestReporter) *Tx {
return txObj return txObj
} }
func createUserFromValues(userVals []string, t TestReporter) *models.User { func createTestUser() models.User {
ID, err := strconv.ParseUint(userVals[0], 10, 64) testUser := models.User{214, "32426b162be0bce5428e7e36afaf734ae5afb355", 1.01, 1.0, 4, 2, 7}
verifyErrNil(err, t)
Passkey := userVals[1]
UpMultiplier, err := strconv.ParseFloat(userVals[2], 64)
verifyErrNil(err, t)
DownMultiplier, err := strconv.ParseFloat(userVals[3], 64)
Slots, err := strconv.ParseInt(userVals[4], 10, 64)
verifyErrNil(err, t)
SlotsUsed, err := strconv.ParseInt(userVals[5], 10, 64)
verifyErrNil(err, t)
return &models.User{ID, Passkey, UpMultiplier, DownMultiplier, Slots, SlotsUsed}
}
func createUser() models.User {
testUser := models.User{214, "32426b162be0bce5428e7e36afaf734ae5afb355", 0.0, 0.0, 4, 2}
return testUser return testUser
} }
@ -105,7 +109,7 @@ func createLeechers() []models.Peer {
return testLeechers return testLeechers
} }
func createTorrent() models.Torrent { func createTestTorrent() models.Torrent {
testSeeders := createSeeders() testSeeders := createSeeders()
testLeechers := createLeechers() testLeechers := createLeechers()
@ -124,96 +128,6 @@ func createTorrent() models.Torrent {
return testTorrent return testTorrent
} }
func createTorrentJson(t TestReporter) []byte {
jsonTorrent, err := json.Marshal(createTorrent())
verifyErrNil(err, t)
return jsonTorrent
}
func createUserJson(t TestReporter) []byte {
jsonUser, err := json.Marshal(createUser())
verifyErrNil(err, t)
return jsonUser
}
func ExampleJsonTransaction(testTx *Tx, retries int, t TestReporter) {
defer func() {
if err := recover(); err != nil {
t.Error(err)
}
}()
verifyErrNil(testTx.initiateRead(), t)
_, err := testTx.Do("WATCH", "testKeyA")
verifyErrNil(err, t)
_, err = redis.String(testTx.Do("GET", "testKeyA"))
if err != nil {
if err == redis.ErrNil {
t.Log("testKeyA does not exist yet")
} else {
t.Error(err)
}
}
_, err = testTx.Do("WATCH", "testKeyB")
verifyErrNil(err, t)
_, err = redis.String(testTx.Do("GET", "testKeyB"))
if err != nil {
if err == redis.ErrNil {
t.Log("testKeyB does not exist yet")
} else {
t.Error(err)
}
}
verifyErrNil(testTx.initiateWrite(), t)
// Generate random data to set
randGen := rand.New(rand.NewSource(time.Now().UnixNano()))
verifyErrNil(testTx.Send("SET", "testKeyA", strconv.Itoa(randGen.Int())), t)
verifyErrNil(testTx.Send("SET", "testKeyB", strconv.Itoa(randGen.Int())), t)
err = testTx.Commit()
// For parallel runs, there may be conflicts, retry until successful
if err == cache.ErrTxConflict && retries > 0 {
ExampleJsonTransaction(testTx, retries-1, t)
// Clear TxConflict, if retries max out, errors are already recorded
err = nil
} else if err == cache.ErrTxConflict {
t.Error("Conflict encountered, max retries reached")
t.Error(err)
}
verifyErrNil(err, t)
}
func ExampleJsonSchemaRemoveSeeder(torrent *models.Torrent, peer *models.Peer, t TestReporter) {
testTx := createTestTxObj(t)
verifyErrNil(testTx.initiateRead(), t)
key := testTx.conf.Prefix + "torrent:" + torrent.Infohash
_, err := testTx.Do("WATCH", key)
reply, err := redis.String(testTx.Do("GET", key))
if err != nil {
if err == redis.ErrNil {
t.Error("testTorrent does not exist")
} else {
t.Error(err)
}
}
verifyErrNil(json.NewDecoder(strings.NewReader(reply)).Decode(torrent), t)
delete(torrent.Seeders, "testPeerID2")
jsonTorrent, err := json.Marshal(torrent)
verifyErrNil(err, t)
verifyErrNil(testTx.initiateWrite(), t)
verifyErrNil(testTx.Send("SET", key, jsonTorrent), t)
verifyErrNil(testTx.Commit(), t)
}
func ExampleRedisTypeSchemaRemoveSeeder(torrent *models.Torrent, peer *models.Peer, t TestReporter) { func ExampleRedisTypeSchemaRemoveSeeder(torrent *models.Torrent, peer *models.Peer, t TestReporter) {
testTx := createTestTxObj(t) testTx := createTestTxObj(t)
setkey := testTx.conf.Prefix + "torrent:" + torrent.Infohash + ":seeders" setkey := testTx.conf.Prefix + "torrent:" + torrent.Infohash + ":seeders"
@ -224,64 +138,70 @@ func ExampleRedisTypeSchemaRemoveSeeder(torrent *models.Torrent, peer *models.Pe
verifyErrNil(err, t) verifyErrNil(err, t)
} }
func ExampleJsonSchemaFindUser(passkey string, t TestReporter) (*models.User, bool) {
testTx := createTestTxObj(t)
verifyErrNil(testTx.initiateRead(), t)
key := testTx.conf.Prefix + "user:" + passkey
_, err := testTx.Do("WATCH", key)
verifyErrNil(err, t)
reply, err := redis.String(testTx.Do("GET", key))
if err != nil {
if err == redis.ErrNil {
return nil, false
} else {
t.Error(err)
}
}
user := &models.User{}
verifyErrNil(json.NewDecoder(strings.NewReader(reply)).Decode(user), t)
return user, true
}
func ExampleRedisTypesSchemaFindUser(passkey string, t TestReporter) (*models.User, bool) { func ExampleRedisTypesSchemaFindUser(passkey string, t TestReporter) (*models.User, bool) {
testTx := createTestTxObj(t) testTx := createTestTxObj(t)
hashkey := testTx.conf.Prefix + "user_hash:" + sample_passkey hashkey := testTx.conf.Prefix + UserPrefix + passkey
userVals, err := redis.Strings(testTx.Do("HVALS", hashkey)) userVals, err := redis.Strings(testTx.Do("HVALS", hashkey))
if userVals == nil { if len(userVals) == 0 {
return nil, false return nil, false
} }
verifyErrNil(err, t) verifyErrNil(err, t)
compareUser := createUserFromValues(userVals, t) compareUser, err := createUser(userVals)
verifyErrNil(err, t)
return compareUser, true return compareUser, true
} }
func BenchmarkRedisJsonSchemaRemoveSeeder(b *testing.B) { func TestFindUserSuccess(t *testing.T) {
for bCount := 0; bCount < b.N; bCount++ { testUser := createTestUser()
b.StopTimer() testTx := createTestTxObj(t)
testTx := createTestTxObj(b) hashkey := testTx.conf.Prefix + UserPrefix + sample_passkey
testTorrent := createTorrent() _, err := testTx.Do("DEL", hashkey)
testSeeders := createSeeders() verifyErrNil(err, t)
key := testTx.conf.Prefix + "torrent:" + testTorrent.Infohash
// Benchmark setup not a transaction, not thread-safe
_, err := testTx.Do("SET", key, createTorrentJson(b))
verifyErrNil(err, b)
b.StartTimer()
ExampleJsonSchemaRemoveSeeder(&testTorrent, &testSeeders[2], b) err = testTx.AddUser(&testUser)
verifyErrNil(err, t)
compareUser, exists := ExampleRedisTypesSchemaFindUser(sample_passkey, t)
if !exists {
t.Error("User not found!")
}
if testUser != *compareUser {
t.Errorf("user mismatch: %v vs. %v", compareUser, testUser)
}
}
func TestFindUserFail(t *testing.T) {
compareUser, exists := ExampleRedisTypesSchemaFindUser("not_a_user_passkey", t)
if exists {
t.Errorf("User %v found when none should exist!", compareUser)
}
}
func TestAddGetPeers(t *testing.T) {
testTx := createTestTxObj(t)
testTorrent := createTestTorrent()
setkey := testTx.conf.Prefix + "torrent:" + testTorrent.Infohash + ":seeders"
testTx.Do("DEL", setkey)
testTx.addPeers(testTorrent.Infohash, testTorrent.Seeders, ":seeders")
peerMap, err := testTx.getPeers(sample_infohash, ":seeders")
if err != nil {
t.Error(err)
} else if len(peerMap) != len(testTorrent.Seeders) {
t.Error("Num Peers not equal")
} }
} }
func BenchmarkRedisTypesSchemaRemoveSeeder(b *testing.B) { func BenchmarkRedisTypesSchemaRemoveSeeder(b *testing.B) {
for bCount := 0; bCount < b.N; bCount++ { for bCount := 0; bCount < b.N; bCount++ {
// Ensure that remove completes successfully, // Ensure that remove completes successfully,
// even if it doesn't impact the performance // even if it doesn't impact the performance
b.StopTimer() b.StopTimer()
testTx := createTestTxObj(b) testTx := createTestTxObj(b)
testTorrent := createTorrent() testTorrent := createTestTorrent()
setkey := testTx.conf.Prefix + "torrent:" + testTorrent.Infohash + ":seeders" setkey := testTx.conf.Prefix + "torrent:" + testTorrent.Infohash + ":seeders"
testSeeders := createSeeders() testSeeders := createSeeders()
reply, err := redis.Int(testTx.Do("SADD", setkey, reply, err := redis.Int(testTx.Do("SADD", setkey,
@ -300,37 +220,13 @@ func BenchmarkRedisTypesSchemaRemoveSeeder(b *testing.B) {
} }
} }
func BenchmarkRedisJsonSchemaFindUser(b *testing.B) {
// Ensure successful user find ( a failed lookup may have different performance )
b.StopTimer()
testTx := createTestTxObj(b)
testUser := createUser()
userJson := string(createUserJson(b))
verifyErrNil(testTx.initiateWrite(), b)
key := testTx.conf.Prefix + "user:" + sample_passkey
verifyErrNil(testTx.Send("SET", key, userJson), b)
verifyErrNil(testTx.Commit(), b)
b.StartTimer()
for bCount := 0; bCount < b.N; bCount++ {
compareUser, exists := ExampleJsonSchemaFindUser(sample_passkey, b)
b.StopTimer()
if !exists {
b.Error("User not found!")
}
if testUser != *compareUser {
b.Errorf("user mismatch: %v vs. %v", compareUser, testUser)
}
b.StartTimer()
}
}
func BenchmarkRedisTypesSchemaFindUser(b *testing.B) { func BenchmarkRedisTypesSchemaFindUser(b *testing.B) {
// Ensure successful user find ( a failed lookup may have different performance ) // Ensure successful user find ( a failed lookup may have different performance )
b.StopTimer() b.StopTimer()
testUser := createUser() testUser := createTestUser()
testTx := createTestTxObj(b) testTx := createTestTxObj(b)
hashkey := testTx.conf.Prefix + "user_hash:" + sample_passkey hashkey := testTx.conf.Prefix + UserPrefix + sample_passkey
reply, err := testTx.Do("HMSET", hashkey, reply, err := testTx.Do("HMSET", hashkey,
"id", testUser.ID, "id", testUser.ID,
"passkey", testUser.Passkey, "passkey", testUser.Passkey,
@ -340,7 +236,7 @@ func BenchmarkRedisTypesSchemaFindUser(b *testing.B) {
"slots_used", testUser.SlotsUsed) "slots_used", testUser.SlotsUsed)
if reply == nil { if reply == nil {
b.Error("no hash fields added!") b.Log("no hash fields added!")
} }
verifyErrNil(err, b) verifyErrNil(err, b)
b.StartTimer() b.StartTimer()
@ -360,13 +256,6 @@ func BenchmarkRedisTypesSchemaFindUser(b *testing.B) {
} }
} }
func TestRedisTransaction(t *testing.T) {
for i := 0; i < 10; i++ {
// No retries for serial transactions
ExampleJsonTransaction(createTestTxObj(t), 0, t)
}
}
func TestReadAfterWrite(t *testing.T) { func TestReadAfterWrite(t *testing.T) {
// Test requires panic // Test requires panic
defer func() { defer func() {
@ -392,100 +281,3 @@ func TestCloseClosedTransaction(t *testing.T) {
testTx.close() testTx.close()
testTx.close() testTx.close()
} }
func TestParallelTx0(t *testing.T) {
if testing.Short() {
t.SkipNow()
}
t.Parallel()
for i := 0; i < 20; i++ {
go ExampleJsonTransaction(createTestTxObj(t), MAX_RETRIES, t)
time.Sleep(1 * time.Millisecond)
}
}
func TestParallelTx1(t *testing.T) {
if testing.Short() {
t.SkipNow()
}
t.Parallel()
ExampleJsonTransaction(createTestTxObj(t), MAX_RETRIES, t)
for i := 0; i < 100; i++ {
go ExampleJsonTransaction(createTestTxObj(t), MAX_RETRIES, t)
}
}
func TestParallelTx2(t *testing.T) {
if testing.Short() {
t.SkipNow()
}
t.Parallel()
for i := 0; i < 100; i++ {
go ExampleJsonTransaction(createTestTxObj(t), MAX_RETRIES, t)
}
ExampleJsonTransaction(createTestTxObj(t), MAX_RETRIES, t)
}
// Just in case the above parallel tests didn't fail, force a failure here
func TestParallelInterrupted(t *testing.T) {
t.Parallel()
testTx := createTestTxObj(t)
defer func() {
if err := recover(); err != nil {
t.Errorf("initiateRead() failed in parallel %s", err)
}
}()
verifyErrNil(testTx.initiateRead(), t)
_, err := testTx.Do("WATCH", "testKeyA")
verifyErrNil(err, t)
testValueA, err := redis.String(testTx.Do("GET", "testKeyA"))
if err != nil {
if err == redis.ErrNil {
t.Log("redis.ErrNil")
} else {
t.Error(err)
}
}
_, err = testTx.Do("WATCH", "testKeyB")
if err != nil {
if err == redis.ErrNil {
t.Log("redis.ErrNil")
} else {
t.Error(err)
}
}
testValueB, err := redis.String(testTx.Do("GET", "testKeyB"))
if err != nil {
if err == redis.ErrNil {
t.Log("redis.ErrNil")
} else {
t.Error(err)
}
}
// Stand in for what real updates would do
testValueB = testValueB + "+updates"
testValueA = testValueA + "+updates"
// Simulating another client interrupts transaction, causing exec to fail
ExampleJsonTransaction(createTestTxObj(t), MAX_RETRIES, t)
verifyErrNil(testTx.initiateWrite(), t)
verifyErrNil(testTx.Send("SET", "testKeyA", testValueA), t)
verifyErrNil(testTx.Send("SET", "testKeyB", testValueB), t)
keys, err := (testTx.Do("EXEC"))
// Expect error
if keys != nil {
t.Error("Keys not nil; exec should have been interrupted")
}
verifyErrNil(err, t)
testTx.close()
}

View file

@ -40,4 +40,5 @@ type User struct {
DownMultiplier float64 `json:"down_multiplier"` DownMultiplier float64 `json:"down_multiplier"`
Slots int64 `json:"slots"` Slots int64 `json:"slots"`
SlotsUsed int64 `json:"slots_used"` SlotsUsed int64 `json:"slots_used"`
Snatches uint `json:"snatches"`
} }