diff --git a/cache/redis/redis_test.go b/cache/redis/redis_test.go index fe87f18..55b6750 100644 --- a/cache/redis/redis_test.go +++ b/cache/redis/redis_test.go @@ -2,12 +2,16 @@ // Use of this source code is governed by the BSD 2-Clause license, // which can be found in the LICENSE file. +// Package redis implements the storage interface for a BitTorrent tracker. +// Benchmarks are at the top of the file, tests are at the bottom package redis import ( + "encoding/json" "math/rand" "os" "strconv" + "strings" "testing" "time" @@ -15,43 +19,68 @@ import ( "github.com/pushrax/chihaya/cache" "github.com/pushrax/chihaya/config" + "github.com/pushrax/chihaya/models" ) -// Maximum number of parallel retries; depends on system latency +// Maximum number of parallel retries, will depends on system latency const MAX_RETRIES = 9000 -func CreateTestTxObj(t *testing.T) Tx { - testConfig, err := config.Open(os.Getenv("TESTCONFIGPATH")) - if err != nil { - t.Error(err) - } - - testDialFunc := makeDialFunc(&testConfig.Cache) - testConn, err := testDialFunc() - if err != nil { - t.Error(err) - } - return Tx{&testConfig.Cache, false, false, testConn} +type TestReporter interface { + Error(args ...interface{}) } -func SampleTransaction(testTx Tx, retries int, t *testing.T) { +func verifyErrNil(err error, t TestReporter) { + if err != nil { + t.Error(err) + } +} + +func createTestTxObj(t *testing.T) *Tx { + testConfig, err := config.Open(os.Getenv("TESTCONFIGPATH")) + conf := &testConfig.Cache + verifyErrNil(err, t) + + testPool := &Pool{ + conf: conf, + pool: redis.Pool{ + MaxIdle: conf.MaxIdleConn, + IdleTimeout: conf.IdleTimeout.Duration, + Dial: makeDialFunc(conf), + TestOnBorrow: testOnBorrow, + }, + } + + //testDialFunc := makeDialFunc(&testConfig.Cache) + //testConn, err := testDialFunc() + txObj := &Tx{ + conf: testPool.conf, + done: false, + multi: false, + Conn: testPool.pool.Get(), + } + verifyErrNil(err, t) + + // Test connection before returning + //txObj := Tx{&testConfig.Cache, false, false, testConn} + _, err = txObj.Do("PING") + verifyErrNil(err, t) + return txObj +} + +func sampleTransaction(testTx *Tx, retries int, t *testing.T) { defer func() { - if err := recover(); err != nil { - t.Errorf("initiateRead() failed: %s", err) + if rawError := recover(); rawError != nil { + t.Error(rawError) } }() - err := testTx.initiateRead() - if err != nil { - t.Error(err) - } - _, err = testTx.Do("WATCH", "testKeyA") - if err != nil { - t.Error(err) - } + verifyErrNil(testTx.initiateRead(), t) + _, err := testTx.Do("WATCH", "testKeyA") + verifyErrNil(err, t) + _, err = redis.String(testTx.Do("GET", "testKeyA")) if err != nil { if err == redis.ErrNil { - t.Log("redis.ErrNil") + t.Log("testKeyA does not exist yet") } else { t.Error(err) } @@ -64,129 +93,292 @@ func SampleTransaction(testTx Tx, retries int, t *testing.T) { t.Error(err) } } + _, err = redis.String(testTx.Do("GET", "testKeyB")) if err != nil { if err == redis.ErrNil { - t.Log("redis.ErrNil") + t.Log("testKeyB does not exist yet") } else { t.Error(err) } } - err = testTx.initiateWrite() - if err != nil { - t.Error(err) - } + verifyErrNil(testTx.initiateWrite(), t) // Generate random data to set randGen := rand.New(rand.NewSource(time.Now().UnixNano())) - err = testTx.Send("SET", "testKeyA", strconv.Itoa(randGen.Int())) - if err != nil { - t.Error(err) - } - err = testTx.Send("SET", "testKeyB", strconv.Itoa(randGen.Int())) - if err != nil { - t.Error(err) - } + verifyErrNil(testTx.Send("SET", "testKeyA", strconv.Itoa(randGen.Int())), t) + verifyErrNil(testTx.Send("SET", "testKeyB", strconv.Itoa(randGen.Int())), t) err = testTx.Commit() - switch { // For parallel runs, there may be conflicts, retry until successful - case err == cache.ErrTxConflict && retries > 0: - // t.Logf("Conflict, %d retries left",retries) - SampleTransaction(testTx, retries-1, t) + if err == cache.ErrTxConflict && retries > 0 { + sampleTransaction(testTx, retries-1, t) // Clear TxConflict, if retries max out, errors are already recorded err = nil - case err == cache.ErrTxConflict: - t.Errorf("Conflict encountered, max retries reached: %s", err) - case err != nil: + } else if err == cache.ErrTxConflict { + t.Error("Conflict encountered, max retries reached") t.Error(err) - default: } + verifyErrNil(err, t) +} + +func BenchmarkRedisJsonSchemaRemoveSeeder(b *testing.B) { + var t testing.T + infohash := "58c290f4ea1efb3adcb8c1ed2643232117577bcd" + for bCount := 0; bCount < b.N; bCount++ { + testTx := createTestTxObj(&t) + + verifyErrNil(testTx.initiateRead(), b) + + key := testTx.conf.Prefix + "torrent:" + infohash + _, err := testTx.Do("WATCH", key) + reply, err := redis.String(testTx.Do("GET", key)) + if err != nil { + if err == redis.ErrNil { + b.Logf("no key yet, creating json reply") + reply = string(createTorrentJson(&t)) + } else { + b.Error(err) + } + } + + torrent := &models.Torrent{} + verifyErrNil(json.NewDecoder(strings.NewReader(reply)).Decode(torrent), b) + + delete(torrent.Seeders, "testPeerID2") + + jsonTorrent, err := json.Marshal(torrent) + verifyErrNil(err, b) + verifyErrNil(testTx.initiateWrite(), b) + verifyErrNil(testTx.Send("SET", key, jsonTorrent), b) + verifyErrNil(testTx.Commit(), b) + } +} + +func BenchmarkRedisTypesSchemaRemoveSeeder(b *testing.B) { + var t testing.T + infohash := "58c290f4ea1efb3adcb8c1ed2643232117577bcd" + for bCount := 0; bCount < b.N; bCount++ { + testTx := createTestTxObj(&t) + setkey := testTx.conf.Prefix + "torrent:" + infohash + ":seeders" + + b.StopTimer() + reply, err := redis.Int(testTx.Do("SADD", setkey, "testPeerID0", "testPeerID1", "testPeerID2", "testPeerID3")) + if reply == 0 { + b.Error("no keys added!") + } + verifyErrNil(err, b) + + b.StartTimer() + + reply, err = redis.Int(testTx.Do("SREM", setkey, "testPeerID2")) + if reply == 0 { + b.Error("remove failed") + } + verifyErrNil(err, b) + } + +} + +func BenchmarkRedisJsonSchemaFindUser(b *testing.B) { + var t testing.T + passkey := "32426b162be0bce5428e7e36afaf734ae5afb355" + for bCount := 0; bCount < b.N; bCount++ { + testTx := createTestTxObj(&t) + + verifyErrNil(testTx.initiateRead(), b) + + key := testTx.conf.Prefix + "user:" + passkey + _, err := testTx.Do("WATCH", key) + verifyErrNil(err, b) + reply, err := redis.String(testTx.Do("GET", key)) + if err != nil { + if err == redis.ErrNil { + b.Logf("no key yet, creating & sending json back to redis") + reply = string(createUserJson(&t)) + // Add user to redis, as this is a read-only operation + verifyErrNil(testTx.initiateWrite(), b) + verifyErrNil(testTx.Send("SET", key, reply), b) + verifyErrNil(testTx.Commit(), b) + } else { + b.Error(err) + } + } + + user := &models.User{} + verifyErrNil(json.NewDecoder(strings.NewReader(reply)).Decode(user), b) + } +} + +func BenchmarkRedisTypesSchemaFindUser(b *testing.B) { + var t testing.T + passkey := "32426b162be0bce5428e7e36afaf734ae5afb355" + for bCount := 0; bCount < b.N; bCount++ { + testTx := createTestTxObj(&t) + hashkey := testTx.conf.Prefix + "user_hash:" + passkey + + b.StopTimer() + testUser := createUser() + reply, err := testTx.Do("HMSET", hashkey, "id", testUser.ID, "passkey", testUser.Passkey, "up_multiplier", testUser.UpMultiplier, "down_multiplier", testUser.DownMultiplier, "slots", testUser.Slots, "slots_used", testUser.SlotsUsed) + if reply == nil { + b.Error("no hash fields added!") + } + verifyErrNil(err, b) + + b.StartTimer() + + userVals, err := redis.Strings(testTx.Do("HVALS", hashkey)) + if userVals == nil { + b.Error("user does not exist!") + } + verifyErrNil(err, b) + compareUser := createUserFromValues(userVals, &t) + if testUser != compareUser { + b.Errorf("user mismatch: %v vs. %v", compareUser, testUser) + b.Log(userVals) + } + } +} + +func createTorrentJson(t *testing.T) []byte { + jsonTorrent, err := json.Marshal(createTorrent()) + verifyErrNil(err, t) + return jsonTorrent +} + +func createUserJson(t *testing.T) []byte { + jsonUser, err := json.Marshal(createUser()) + verifyErrNil(err, t) + return jsonUser +} + +func createUserFromValues(userVals []string, t *testing.T) models.User { + ID, err := strconv.ParseUint(userVals[0], 10, 64) + verifyErrNil(err, t) + Passkey := userVals[1] + UpMultiplier, err := strconv.ParseFloat(userVals[2], 64) + verifyErrNil(err, t) + DownMultiplier, err := strconv.ParseFloat(userVals[3], 64) + Slots, err := strconv.ParseInt(userVals[4], 10, 64) + verifyErrNil(err, t) + SlotsUsed, err := strconv.ParseInt(userVals[5], 10, 64) + verifyErrNil(err, t) + return models.User{ID, Passkey, UpMultiplier, DownMultiplier, Slots, SlotsUsed} +} +func createUser() models.User { + testUser := models.User{214, "32426b162be0bce5428e7e36afaf734ae5afb355", 0.0, 0.0, 4, 2} + return testUser +} + +func createTorrent() models.Torrent { + testSeeders := make([]models.Peer, 4) + testSeeders[0] = models.Peer{"testPeerID0", 57005, 48879, "testIP", 6889, 1024, 3000, 4200, 6} + testSeeders[1] = models.Peer{"testPeerID1", 10101, 48879, "testIP", 6889, 1024, 3000, 4200, 6} + testSeeders[2] = models.Peer{"testPeerID2", 29890, 48879, "testIP", 6889, 1024, 3000, 4200, 6} + testSeeders[3] = models.Peer{"testPeerID3", 65261, 48879, "testIP", 6889, 1024, 3000, 4200, 6} + testLeechers := make([]models.Peer, 1) + testLeechers[0] = models.Peer{"testPeerID", 11111, 48879, "testIP", 6889, 1024, 3000, 4200, 6} + + infohash := "58c290f4ea1efb3adcb8c1ed2643232117577bcd" + + seeders := make(map[string]models.Peer) + for i := range testSeeders { + seeders[testSeeders[i].ID] = testSeeders[i] + } + + leechers := make(map[string]models.Peer) + for i := range testLeechers { + leechers[testLeechers[i].ID] = testLeechers[i] + } + + testTorrent := models.Torrent{48879, infohash, true, seeders, leechers, 11, 0.0, 0.0, 0} + return testTorrent } func TestRedisTransaction(t *testing.T) { for i := 0; i < 10; i++ { // No retries for serial transactions - SampleTransaction(CreateTestTxObj(t), 0, t) + sampleTransaction(createTestTxObj(t), 0, t) } } func TestReadAfterWrite(t *testing.T) { + testTx := createTestTxObj(t) + + verifyErrNil(testTx.initiateWrite(), t) + + // Test requires panic defer func() { - if err := recover(); err == nil { + if rawError := recover(); rawError == nil { t.Error("Read after write did not panic") } }() - - testTx := CreateTestTxObj(t) - err := testTx.initiateWrite() - if err != nil { - t.Error(err) - } - - err = testTx.initiateRead() - if err != nil { - t.Error(err) - } + verifyErrNil(testTx.initiateRead(), t) } -func TestCloseClosedTransaction(t *testing.T) { +func TestDoubleClose(t *testing.T) { + testTx := createTestTxObj(t) + + testTx.close() + //require panic defer func() { - if err := recover(); err == nil { - t.Error("Closing a closed transaction did not panic") + if rawError := recover(); rawError == nil { + t.Error("double close did not panic") } }() - - testTx := CreateTestTxObj(t) - testTx.close() testTx.close() } func TestParallelTx0(t *testing.T) { + if testing.Short() { + t.SkipNow() + } t.Parallel() + for i := 0; i < 20; i++ { - go SampleTransaction(CreateTestTxObj(t), MAX_RETRIES, t) + go sampleTransaction(createTestTxObj(t), MAX_RETRIES, t) time.Sleep(1 * time.Millisecond) } + } func TestParallelTx1(t *testing.T) { + if testing.Short() { + t.SkipNow() + } t.Parallel() - SampleTransaction(CreateTestTxObj(t), MAX_RETRIES, t) + sampleTransaction(createTestTxObj(t), MAX_RETRIES, t) for i := 0; i < 100; i++ { - go SampleTransaction(CreateTestTxObj(t), MAX_RETRIES, t) + go sampleTransaction(createTestTxObj(t), MAX_RETRIES, t) } } func TestParallelTx2(t *testing.T) { + if testing.Short() { + t.SkipNow() + } t.Parallel() for i := 0; i < 100; i++ { - go SampleTransaction(CreateTestTxObj(t), MAX_RETRIES, t) + go sampleTransaction(createTestTxObj(t), MAX_RETRIES, t) } - SampleTransaction(CreateTestTxObj(t), MAX_RETRIES, t) + sampleTransaction(createTestTxObj(t), MAX_RETRIES, t) } // Just in case the above parallel tests didn't fail, force a failure here func TestParallelInterrupted(t *testing.T) { t.Parallel() + testTx := createTestTxObj(t) defer func() { - if err := recover(); err != nil { - t.Errorf("initiateRead() failed in parallel: %s", err) + if rawError := recover(); rawError != nil { + t.Error("initiate read failed in parallelInterrupted") } }() + verifyErrNil(testTx.initiateRead(), t) - testTx := CreateTestTxObj(t) - err := testTx.initiateRead() - if err != nil { - t.Error(err) - } - - _, err = testTx.Do("WATCH", "testKeyA") - if err != nil { - t.Error(err) - } + _, err := testTx.Do("WATCH", "testKeyA") + verifyErrNil(err, t) testValueA, err := redis.String(testTx.Do("GET", "testKeyA")) if err != nil { @@ -214,35 +406,23 @@ func TestParallelInterrupted(t *testing.T) { t.Error(err) } } - // Stand in for what real updates would do testValueB = testValueB + "+updates" testValueA = testValueA + "+updates" // Simulating another client interrupts transaction, causing exec to fail - SampleTransaction(CreateTestTxObj(t), MAX_RETRIES, t) + sampleTransaction(createTestTxObj(t), MAX_RETRIES, t) - err = testTx.initiateWrite() - if err != nil { - t.Error(err) - } - err = testTx.Send("SET", "testKeyA", testValueA) - if err != nil { - t.Error(err) - } - - err = testTx.Send("SET", "testKeyB", testValueB) - if err != nil { - t.Error(err) - } + verifyErrNil(testTx.initiateWrite(), t) + verifyErrNil(testTx.Send("SET", "testKeyA", testValueA), t) + verifyErrNil(testTx.Send("SET", "testKeyB", testValueB), t) keys, err := (testTx.Do("EXEC")) + // Expect error if keys != nil { - t.Error("Keys not nil; exec should have been interrupted") + t.Error("keys not nil, exec should have been interrupted") } + verifyErrNil(err, t) testTx.close() - if err != nil { - t.Error(err) - } } diff --git a/models/models.go b/models/models.go index 87afcba..727c0a4 100644 --- a/models/models.go +++ b/models/models.go @@ -24,7 +24,7 @@ type Torrent struct { Leechers map[string]Peer `json:"leechers"` Snatches uint `json:"snatches"` UpMultiplier float64 `json:"up_multiplier"` - DownMultiplier float64 `json:"down_multipler"` + DownMultiplier float64 `json:"down_multiplier"` LastAction int64 `json:"last_action"` }