diff --git a/cache/cache.go b/cache/cache.go index e477ede..702ea49 100644 --- a/cache/cache.go +++ b/cache/cache.go @@ -7,7 +7,6 @@ package cache import ( - "errors" "fmt" "github.com/pushrax/chihaya/config" @@ -15,9 +14,7 @@ import ( ) var ( - drivers = make(map[string]Driver) - ErrTxDone = errors.New("cache: Transaction has already been committed or rolled back") - ErrTxConflict = errors.New("cache: Commit interrupted, update transaction and repeat") + drivers = make(map[string]Driver) ) type Driver interface { @@ -57,15 +54,9 @@ type Pool interface { Get() (Tx, error) } -// Tx represents an in-progress data store transaction. -// A transaction must end with a call to Commit or Rollback. -// -// After a call to Commit or Rollback, all operations on the -// transaction must fail with ErrTxDone. +// The transmit object is the interface to add, remove and modify +// data in the cache type Tx interface { - Commit() error - Rollback() error - // Reads FindUser(passkey string) (*models.User, bool, error) FindTorrent(infohash string) (*models.Torrent, bool, error) @@ -86,4 +77,6 @@ type Tx interface { RemoveTorrent(t *models.Torrent) error AddUser(u *models.User) error RemoveUser(u *models.User) error + WhitelistClient(peerID string) error + UnWhitelistClient(peerID string) error } diff --git a/cache/redis/redis.go b/cache/redis/redis.go index 7281f0f..3de4ee5 100644 --- a/cache/redis/redis.go +++ b/cache/redis/redis.go @@ -16,7 +16,6 @@ package redis import ( "errors" "strconv" - "strings" "time" "github.com/garyburd/redigo/redis" @@ -32,10 +31,11 @@ var ( ErrCreatePeer = errors.New("redis: Incorrect reply length for peer") ErrMarkActive = errors.New("redis: Torrent doesn't exist") - SeederSuffix = ":seeders" - LeecherSuffix = ":leechers" + SeederPrefix = "seeders:" + LeecherPrefix = "leechers:" TorrentPrefix = "torrent:" UserPrefix = "user:" + PeerPrefix = "peer:" ) type driver struct{} @@ -77,11 +77,15 @@ func (p *Pool) Close() error { } func (p *Pool) Get() (cache.Tx, error) { - return &Tx{ + retTx := &Tx{ conf: p.conf, done: false, Conn: p.pool.Get(), - }, nil + } + // Test valid connection before returning + _, err := retTx.Do("PING") + + return retTx, err } type Tx struct { @@ -99,37 +103,6 @@ func (tx *Tx) close() { tx.Conn.Close() } -func (tx *Tx) Commit() error { - if tx.done { - return cache.ErrTxDone - } - if tx.multi == true { - execResponse, err := tx.Do("EXEC") - if execResponse == nil { - tx.multi = false - return cache.ErrTxConflict - } - if err != nil { - return err - } - } - tx.close() - return nil -} - -func (tx *Tx) Rollback() error { - if tx.done { - return cache.ErrTxDone - } - // Undoes watches and multi - if _, err := tx.Do("DISCARD"); err != nil { - return err - } - tx.multi = false - tx.close() - return nil -} - func createUser(userVals []string) (*models.User, error) { if len(userVals) != 7 { return nil, ErrCreateUser @@ -163,7 +136,7 @@ func createUser(userVals []string) (*models.User, error) { return &models.User{ID, Passkey, UpMultiplier, DownMultiplier, Slots, SlotsUsed, uint(Snatches)}, nil } -func createTorrent(torrentVals []string, seeders map[string]models.Peer, leechers map[string]models.Peer) (*models.Torrent, error) { +func (tx *Tx) createTorrent(torrentVals []string) (*models.Torrent, error) { if len(torrentVals) != 7 { return nil, ErrCreateTorrent } @@ -192,14 +165,32 @@ func createTorrent(torrentVals []string, seeders map[string]models.Peer, leecher if err != nil { return nil, err } - return &models.Torrent{ID, Infohash, Active, seeders, leechers, uint(Snatches), UpMultiplier, DownMultiplier, LastAction}, nil + seeders, err := tx.getPeers(ID, SeederPrefix) + if err != nil { + return nil, err + } + leechers, err := tx.getPeers(ID, LeecherPrefix) + if err != nil { + return nil, err + } + return &models.Torrent{ID, Infohash, Active, seeders, leechers, uint(Snatches), UpMultiplier, DownMultiplier, LastAction}, nil } -// Prevents adding duplicate peers, and doesn't return error on dup add -func (tx *Tx) addPeer(infohash string, peer *models.Peer, suffix string) error { - setKey := tx.conf.Prefix + TorrentPrefix + infohash + suffix - _, err := tx.Do("SADD", setKey, *peer) +// hashkey relies on combination of peerID, userID, and torrentID being unique +func (tx *Tx) setPeer(peer *models.Peer, peerTypePrefix string) error { + hashKey := tx.conf.Prefix + peerTypePrefix + getPeerHashKey(peer) + _, err := tx.Do("HMSET", hashKey, + "id", peer.ID, + "user_id", peer.UserID, + "torrent_id", peer.TorrentID, + "ip", peer.IP, + "port", peer.Port, + "uploaded", peer.Uploaded, + "downloaded", peer.Downloaded, + "left", peer.Left, + "last_announce", peer.LastAnnounce) + if err != nil { return err } @@ -207,8 +198,8 @@ func (tx *Tx) addPeer(infohash string, peer *models.Peer, suffix string) error { } // Will not return an error if the peer doesn't exist -func (tx *Tx) removePeer(infohash string, peer *models.Peer, suffix string) error { - setKey := tx.conf.Prefix + TorrentPrefix + infohash + suffix +func (tx *Tx) removePeer(peer *models.Peer, peerTypePrefix string) error { + setKey := tx.conf.Prefix + peerTypePrefix + strconv.FormatUint(peer.TorrentID, 36) _, err := tx.Do("SREM", setKey, *peer) if err != nil { return err @@ -216,21 +207,42 @@ func (tx *Tx) removePeer(infohash string, peer *models.Peer, suffix string) erro return nil } -func (tx *Tx) addPeers(infohash string, peers map[string]models.Peer, suffix string) error { - setKey := tx.conf.Prefix + TorrentPrefix + infohash + suffix +func (tx *Tx) removePeers(torrentID uint64, peers map[string]models.Peer, peerTypePrefix string) error { for _, peer := range peers { - err := tx.Send("SADD", setKey, peer) + hashKey := tx.conf.Prefix + peerTypePrefix + getPeerHashKey(&peer) + _, err := tx.Do("DEL", hashKey) if err != nil { return err } + delete(peers, peer.ID) } - tx.Flush() - tx.Receive() + // Only delete the set if all the peer deletions were successful + setKey := tx.conf.Prefix + peerTypePrefix + strconv.FormatUint(torrentID, 36) + _, err := tx.Do("DEL", setKey) + if err != nil { + return err + } + return nil } -func createPeer(peerString string) (*models.Peer, error) { - peerVals := strings.Split(strings.Trim(peerString, "{}"), " ") +func getPeerHashKey(peer *models.Peer) string { + return peer.ID + ":" + strconv.FormatUint(peer.UserID, 36) + ":" + strconv.FormatUint(peer.TorrentID, 36) +} + +func (tx *Tx) addPeers(peers map[string]models.Peer, peerTypePrefix string) error { + for _, peer := range peers { + setKey := tx.conf.Prefix + peerTypePrefix + strconv.FormatUint(peer.TorrentID, 36) + _, err := tx.Do("SADD", setKey, getPeerHashKey(&peer)) + if err != nil { + return err + } + tx.setPeer(&peer, peerTypePrefix) + } + return nil +} + +func createPeer(peerVals []string) (*models.Peer, error) { if len(peerVals) != 9 { return nil, ErrCreatePeer } @@ -269,12 +281,21 @@ func createPeer(peerString string) (*models.Peer, error) { } -func (tx *Tx) getPeers(infohash string, suffix string) (peers map[string]models.Peer, err error) { +func (tx *Tx) getPeers(torrentID uint64, peerTypePrefix string) (peers map[string]models.Peer, err error) { peers = make(map[string]models.Peer) - setKey := tx.conf.Prefix + TorrentPrefix + infohash + suffix + setKey := tx.conf.Prefix + peerTypePrefix + strconv.FormatUint(torrentID, 36) peerStrings, err := redis.Strings(tx.Do("SMEMBERS", setKey)) - for peerIndex := range peerStrings { - peer, err := createPeer(peerStrings[peerIndex]) + if err != nil { + return peers, err + } + // Keys map to peer objects stored in hashes + for _, peerHashKey := range peerStrings { + hashKey := tx.conf.Prefix + peerTypePrefix + peerHashKey + peerVals, err := redis.Strings(tx.Do("HVALS", hashKey)) + if err != nil { + return peers, err + } + peer, err := createPeer(peerVals) if err != nil { return nil, err } @@ -297,18 +318,28 @@ func (tx *Tx) AddTorrent(t *models.Torrent) error { return err } - tx.addPeers(t.Infohash, t.Seeders, SeederSuffix) - tx.addPeers(t.Infohash, t.Leechers, LeecherSuffix) + tx.addPeers(t.Seeders, SeederPrefix) + tx.addPeers(t.Leechers, LeecherPrefix) return nil } func (tx *Tx) RemoveTorrent(t *models.Torrent) error { hashkey := tx.conf.Prefix + TorrentPrefix + t.Infohash - _, err := tx.Do("HDEL", hashkey) + _, err := tx.Do("DEL", hashkey) if err != nil { return err } + // Remove seeders and leechers as well + err = tx.removePeers(t.ID, t.Seeders, SeederPrefix) + if err != nil { + return err + } + err = tx.removePeers(t.ID, t.Leechers, LeecherPrefix) + if err != nil { + return err + } + return nil } @@ -330,7 +361,7 @@ func (tx *Tx) AddUser(u *models.User) error { func (tx *Tx) RemoveUser(u *models.User) error { hashkey := tx.conf.Prefix + UserPrefix + u.Passkey - _, err := tx.Do("HDEL", hashkey) + _, err := tx.Do("DEL", hashkey) if err != nil { return err } @@ -352,6 +383,7 @@ func (tx *Tx) FindUser(passkey string) (*models.User, bool, error) { return foundUser, true, nil } +// This is a mulple action command, it's not internally atomic func (tx *Tx) FindTorrent(infohash string) (*models.Torrent, bool, error) { hashkey := tx.conf.Prefix + TorrentPrefix + infohash torrentStrings, err := redis.Strings(tx.Do("HVALS", hashkey)) @@ -361,9 +393,7 @@ func (tx *Tx) FindTorrent(infohash string) (*models.Torrent, bool, error) { return nil, false, nil } - seeders, err := tx.getPeers(infohash, SeederSuffix) - leechers, err := tx.getPeers(infohash, LeecherSuffix) - foundTorrent, err := createTorrent(torrentStrings, seeders, leechers) + foundTorrent, err := tx.createTorrent(torrentStrings) if err != nil { return nil, false, err } @@ -372,7 +402,19 @@ func (tx *Tx) FindTorrent(infohash string) (*models.Torrent, bool, error) { func (tx *Tx) ClientWhitelisted(peerID string) (exists bool, err error) { key := tx.conf.Prefix + "whitelist" - return redis.Bool(tx.Do("ISMEMBER", key, peerID)) + return redis.Bool(tx.Do("SISMEMBER", key, peerID)) +} + +func (tx *Tx) WhitelistClient(peerID string) error { + key := tx.conf.Prefix + "whitelist" + _, err := tx.Do("SADD", key, peerID) + return err +} + +func (tx *Tx) UnWhitelistClient(peerID string) error { + key := tx.conf.Prefix + "whitelist" + _, err := tx.Do("SREM", key, peerID) + return err } // This is a mulple action command, it's not internally atomic @@ -407,28 +449,66 @@ func (tx *Tx) MarkActive(torrent *models.Torrent) error { return nil } -func (tx *Tx) AddLeecher(t *models.Torrent, p *models.Peer) error { - return tx.addPeer(t.Infohash, p, LeecherSuffix) +func (tx *Tx) AddLeecher(torrent *models.Torrent, peer *models.Peer) error { + setKey := tx.conf.Prefix + LeecherPrefix + strconv.FormatUint(torrent.ID, 36) + _, err := tx.Do("SADD", setKey, getPeerHashKey(peer)) + if err != nil { + return err + } + err = tx.setPeer(peer, LeecherPrefix) + if err != nil { + return err + } + if torrent.Leechers == nil { + torrent.Leechers = make(map[string]models.Peer) + } + torrent.Leechers[peer.ID] = *peer + return nil } +// Setting assumes it is already a leecher, and just needs to be updated +// Maybe eventually there will be a move from leecher to seeder method func (tx *Tx) SetLeecher(t *models.Torrent, p *models.Peer) error { - return tx.addPeer(t.Infohash, p, LeecherSuffix) + return tx.setPeer(p, LeecherPrefix) } func (tx *Tx) RemoveLeecher(t *models.Torrent, p *models.Peer) error { - return tx.removePeer(t.Infohash, p, LeecherSuffix) + err := tx.removePeer(p, LeecherPrefix) + if err != nil { + return err + } + delete(t.Leechers, p.ID) + return nil } -func (tx *Tx) AddSeeder(t *models.Torrent, p *models.Peer) error { - return tx.addPeer(t.Infohash, p, SeederSuffix) +func (tx *Tx) AddSeeder(torrent *models.Torrent, peer *models.Peer) error { + setKey := tx.conf.Prefix + SeederPrefix + strconv.FormatUint(torrent.ID, 36) + _, err := tx.Do("SADD", setKey, getPeerHashKey(peer)) + if err != nil { + return err + } + err = tx.setPeer(peer, SeederPrefix) + if err != nil { + return err + } + if torrent.Seeders == nil { + torrent.Seeders = make(map[string]models.Peer) + } + torrent.Seeders[peer.ID] = *peer + return nil } func (tx *Tx) SetSeeder(t *models.Torrent, p *models.Peer) error { - return tx.addPeer(t.Infohash, p, SeederSuffix) + return tx.setPeer(p, SeederPrefix) } func (tx *Tx) RemoveSeeder(t *models.Torrent, p *models.Peer) error { - return tx.removePeer(t.Infohash, p, SeederSuffix) + err := tx.removePeer(p, SeederPrefix) + if err != nil { + return err + } + delete(t.Seeders, p.ID) + return nil } func (tx *Tx) IncrementSlots(u *models.User) error { diff --git a/cache/redis/redis_bench_test.go b/cache/redis/redis_bench_test.go new file mode 100644 index 0000000..7c14bee --- /dev/null +++ b/cache/redis/redis_bench_test.go @@ -0,0 +1,133 @@ +// Copyright 2013 The Chihaya Authors. All rights reserved. +// Use of this source code is governed by the BSD 2-Clause license, +// which can be found in the LICENSE file. + +// Benchmarks two different redis schemeas +package redis + +import ( + "errors" + "testing" + + "github.com/garyburd/redigo/redis" + + "github.com/pushrax/chihaya/models" +) + +var ( + ErrTxDone = errors.New("cache: Transaction has already been committed or rolled back") + ErrTxConflict = errors.New("cache: Commit interrupted, update transaction and repeat") +) + +// Maximum number of parallel retries; depends on system latency +const MAX_RETRIES = 9000 + +// Legacy JSON support for benching +func (tx *Tx) initiateWrite() error { + if tx.done { + return ErrTxDone + } + if tx.multi != true { + tx.multi = true + return tx.Send("MULTI") + } + return nil +} + +func (tx *Tx) initiateRead() error { + if tx.done { + return ErrTxDone + } + if tx.multi == true { + panic("Tried to read during MULTI") + } + return nil +} + +func (tx *Tx) Commit() error { + if tx.done { + return ErrTxDone + } + if tx.multi == true { + execResponse, err := tx.Do("EXEC") + if execResponse == nil { + tx.multi = false + return ErrTxConflict + } + if err != nil { + return err + } + } + tx.close() + return nil +} + +func (tx *Tx) Rollback() error { + if tx.done { + return ErrTxDone + } + // Undoes watches and multi + if _, err := tx.Do("DISCARD"); err != nil { + return err + } + tx.multi = false + tx.close() + return nil +} + +func ExampleRedisTypesSchemaFindUser(passkey string, t TestReporter) (*models.User, bool) { + testTx := createTestTxObj(t) + hashkey := testTx.conf.Prefix + UserPrefix + passkey + userVals, err := redis.Strings(testTx.Do("HVALS", hashkey)) + if len(userVals) == 0 { + return nil, false + } + verifyErrNil(err, t) + compareUser, err := createUser(userVals) + verifyErrNil(err, t) + return compareUser, true +} + +func BenchmarkRedisTypesSchemaRemoveSeeder(b *testing.B) { + for bCount := 0; bCount < b.N; bCount++ { + //TODO this needs to be updated + b.Error("Unimplemented") + + } +} + +func BenchmarkRedisTypesSchemaFindUser(b *testing.B) { + + // Ensure successful user find ( a failed lookup may have different performance ) + b.StopTimer() + testUser := createTestUser() + testTx := createTestTxObj(b) + hashkey := testTx.conf.Prefix + UserPrefix + testUser.Passkey + reply, err := testTx.Do("HMSET", hashkey, + "id", testUser.ID, + "passkey", testUser.Passkey, + "up_multiplier", testUser.UpMultiplier, + "down_multiplier", testUser.DownMultiplier, + "slots", testUser.Slots, + "slots_used", testUser.SlotsUsed) + + if reply == nil { + b.Log("no hash fields added!") + } + verifyErrNil(err, b) + b.StartTimer() + + for bCount := 0; bCount < b.N; bCount++ { + + compareUser, exists := ExampleRedisTypesSchemaFindUser(testUser.Passkey, b) + + b.StopTimer() + if !exists { + b.Error("User not found!") + } + if testUser != *compareUser { + b.Errorf("user mismatch: %v vs. %v", compareUser, testUser) + } + b.StartTimer() + } +} diff --git a/cache/redis/redis_test.go b/cache/redis/redis_test.go index 3a18ea1..144a7f3 100644 --- a/cache/redis/redis_test.go +++ b/cache/redis/redis_test.go @@ -5,21 +5,56 @@ package redis import ( + "crypto/rand" + "io" "os" + "strconv" "testing" "github.com/garyburd/redigo/redis" - "github.com/pushrax/chihaya/cache" "github.com/pushrax/chihaya/config" "github.com/pushrax/chihaya/models" ) -// Maximum number of parallel retries; depends on system latency -const MAX_RETRIES = 9000 +var ( + testTorrentIDCounter uint64 + testUserIDCounter uint64 + testPeerIDCounter int +) -const sample_infohash = "58c290f4ea1efb3adcb8c1ed2643232117577bcd" -const sample_passkey = "32426b162be0bce5428e7e36afaf734ae5afb355" +func createTestTorrentID() uint64 { + testTorrentIDCounter++ + return testTorrentIDCounter +} + +func createTestUserID() uint64 { + testUserIDCounter++ + return testUserIDCounter +} + +func createTestPeerID() string { + testPeerIDCounter++ + return "-testPeerID-" + strconv.Itoa(testPeerIDCounter) +} + +func createTestInfohash() string { + uuid := make([]byte, 40) + n, err := io.ReadFull(rand.Reader, uuid) + if n != len(uuid) || err != nil { + panic(err) + } + return string(uuid) +} + +func createTestPasskey() string { + uuid := make([]byte, 40) + n, err := io.ReadFull(rand.Reader, uuid) + if n != len(uuid) || err != nil { + panic(err) + } + return string(uuid) +} // Common interface for benchmarks and test error reporting type TestReporter interface { @@ -35,28 +70,6 @@ func verifyErrNil(err error, t TestReporter) { } } -// Legacy JSON support for benching -func (tx *Tx) initiateWrite() error { - if tx.done { - return cache.ErrTxDone - } - if tx.multi != true { - tx.multi = true - return tx.Send("MULTI") - } - return nil -} - -func (tx *Tx) initiateRead() error { - if tx.done { - return cache.ErrTxDone - } - if tx.multi == true { - panic("Tried to read during MULTI") - } - return nil -} - func createTestTxObj(t TestReporter) *Tx { testConfig, err := config.Open(os.Getenv("TESTCONFIGPATH")) conf := &testConfig.Cache @@ -72,8 +85,6 @@ func createTestTxObj(t TestReporter) *Tx { }, } - //testDialFunc := makeDialFunc(&testConfig.Cache) - //testConn, err := testDialFunc() txObj := &Tx{ conf: testPool.conf, done: false, @@ -83,85 +94,53 @@ func createTestTxObj(t TestReporter) *Tx { verifyErrNil(err, t) // Test connection before returning - //txObj := Tx{&testConfig.Cache, false, false, testConn} _, err = txObj.Do("PING") verifyErrNil(err, t) return txObj } func createTestUser() models.User { - testUser := models.User{214, "32426b162be0bce5428e7e36afaf734ae5afb355", 1.01, 1.0, 4, 2, 7} + testUser := models.User{createTestUserID(), createTestPasskey(), 1.01, 1.0, 4, 2, 7} return testUser } -func createSeeders() []models.Peer { - testSeeders := make([]models.Peer, 4) - testSeeders[0] = models.Peer{"testPeerID0", 57005, 48879, "testIP", 6889, 1024, 3000, 4200, 6} - testSeeders[1] = models.Peer{"testPeerID1", 10101, 48879, "testIP", 6889, 1024, 3000, 4200, 6} - testSeeders[2] = models.Peer{"testPeerID2", 29890, 48879, "testIP", 6889, 1024, 3000, 4200, 6} - testSeeders[3] = models.Peer{"testPeerID3", 65261, 48879, "testIP", 6889, 1024, 3000, 4200, 6} - return testSeeders +func createTestPeer(userID uint64, torrentID uint64) models.Peer { + + return models.Peer{createTestPeerID(), userID, torrentID, "127.0.0.1", 6889, 1024, 3000, 4200, 11} } -func createLeechers() []models.Peer { - testLeechers := make([]models.Peer, 1) - testLeechers[0] = models.Peer{"testPeerID", 11111, 48879, "testIP", 6889, 1024, 3000, 4200, 6} - return testLeechers +func createTestPeers(torrentID uint64, num int) map[string]models.Peer { + testPeers := make(map[string]models.Peer) + for i := 0; i < num; i++ { + tempPeer := createTestPeer(createTestUserID(), torrentID) + testPeers[tempPeer.ID] = tempPeer + } + return testPeers } func createTestTorrent() models.Torrent { - testSeeders := createSeeders() - testLeechers := createLeechers() + torrentInfohash := createTestInfohash() + torrentID := createTestTorrentID() - seeders := make(map[string]models.Peer) - for i := range testSeeders { - seeders[testSeeders[i].ID] = testSeeders[i] - } + testSeeders := createTestPeers(torrentID, 4) + testLeechers := createTestPeers(torrentID, 2) - leechers := make(map[string]models.Peer) - for i := range testLeechers { - leechers[testLeechers[i].ID] = testLeechers[i] - } - - testTorrent := models.Torrent{48879, sample_infohash, true, seeders, leechers, 11, 0.0, 0.0, 0} + testTorrent := models.Torrent{torrentID, torrentInfohash, true, testSeeders, testLeechers, 11, 0.0, 0.0, 0} return testTorrent } -func ExampleRedisTypeSchemaRemoveSeeder(torrent *models.Torrent, peer *models.Peer, t TestReporter) { - testTx := createTestTxObj(t) - setkey := testTx.conf.Prefix + "torrent:" + torrent.Infohash + ":seeders" - reply, err := redis.Int(testTx.Do("SREM", setkey, *peer)) - if reply == 0 { - t.Errorf("remove %v failed", *peer) - } - verifyErrNil(err, t) -} - -func ExampleRedisTypesSchemaFindUser(passkey string, t TestReporter) (*models.User, bool) { - testTx := createTestTxObj(t) - hashkey := testTx.conf.Prefix + UserPrefix + passkey - userVals, err := redis.Strings(testTx.Do("HVALS", hashkey)) - if len(userVals) == 0 { - return nil, false - } - verifyErrNil(err, t) - compareUser, err := createUser(userVals) - verifyErrNil(err, t) - return compareUser, true -} - func TestFindUserSuccess(t *testing.T) { testUser := createTestUser() testTx := createTestTxObj(t) - hashkey := testTx.conf.Prefix + UserPrefix + sample_passkey + hashkey := testTx.conf.Prefix + UserPrefix + testUser.Passkey _, err := testTx.Do("DEL", hashkey) verifyErrNil(err, t) err = testTx.AddUser(&testUser) verifyErrNil(err, t) - compareUser, exists := ExampleRedisTypesSchemaFindUser(sample_passkey, t) + compareUser, exists := ExampleRedisTypesSchemaFindUser(testUser.Passkey, t) if !exists { t.Error("User not found!") @@ -183,11 +162,11 @@ func TestAddGetPeers(t *testing.T) { testTx := createTestTxObj(t) testTorrent := createTestTorrent() - setkey := testTx.conf.Prefix + "torrent:" + testTorrent.Infohash + ":seeders" + setkey := testTx.conf.Prefix + SeederPrefix + strconv.FormatUint(testTorrent.ID, 36) testTx.Do("DEL", setkey) - testTx.addPeers(testTorrent.Infohash, testTorrent.Seeders, ":seeders") - peerMap, err := testTx.getPeers(sample_infohash, ":seeders") + testTx.addPeers(testTorrent.Seeders, SeederPrefix) + peerMap, err := testTx.getPeers(testTorrent.ID, SeederPrefix) if err != nil { t.Error(err) } else if len(peerMap) != len(testTorrent.Seeders) { @@ -195,67 +174,7 @@ func TestAddGetPeers(t *testing.T) { } } -func BenchmarkRedisTypesSchemaRemoveSeeder(b *testing.B) { - for bCount := 0; bCount < b.N; bCount++ { - // Ensure that remove completes successfully, - // even if it doesn't impact the performance - b.StopTimer() - testTx := createTestTxObj(b) - testTorrent := createTestTorrent() - setkey := testTx.conf.Prefix + "torrent:" + testTorrent.Infohash + ":seeders" - testSeeders := createSeeders() - reply, err := redis.Int(testTx.Do("SADD", setkey, - testSeeders[0], - testSeeders[1], - testSeeders[2], - testSeeders[3])) - - if reply == 0 { - b.Log("no keys added!") - } - verifyErrNil(err, b) - b.StartTimer() - - ExampleRedisTypeSchemaRemoveSeeder(&testTorrent, &testSeeders[2], b) - } -} - -func BenchmarkRedisTypesSchemaFindUser(b *testing.B) { - - // Ensure successful user find ( a failed lookup may have different performance ) - b.StopTimer() - testUser := createTestUser() - testTx := createTestTxObj(b) - hashkey := testTx.conf.Prefix + UserPrefix + sample_passkey - reply, err := testTx.Do("HMSET", hashkey, - "id", testUser.ID, - "passkey", testUser.Passkey, - "up_multiplier", testUser.UpMultiplier, - "down_multiplier", testUser.DownMultiplier, - "slots", testUser.Slots, - "slots_used", testUser.SlotsUsed) - - if reply == nil { - b.Log("no hash fields added!") - } - verifyErrNil(err, b) - b.StartTimer() - - for bCount := 0; bCount < b.N; bCount++ { - - compareUser, exists := ExampleRedisTypesSchemaFindUser(sample_passkey, b) - - b.StopTimer() - if !exists { - b.Error("User not found!") - } - if testUser != *compareUser { - b.Errorf("user mismatch: %v vs. %v", compareUser, testUser) - } - b.StartTimer() - } -} - +// Legacy tests func TestReadAfterWrite(t *testing.T) { // Test requires panic defer func() { diff --git a/cache/redis/tx_test.go b/cache/redis/tx_test.go new file mode 100644 index 0000000..a82b771 --- /dev/null +++ b/cache/redis/tx_test.go @@ -0,0 +1,190 @@ +// Copyright 2013 The Chihaya Authors. All rights reserved. +// Use of this source code is governed by the BSD 2-Clause license, +// which can be found in the LICENSE file. + +package redis + +import ( + "fmt" + "math/rand" + "os" + "testing" + "time" + + "github.com/pushrax/chihaya/cache" + "github.com/pushrax/chihaya/config" +) + +func panicErrNil(err error) { + if err != nil { + fmt.Println(err) + panic(err) + } +} + +func createTestTx() cache.Tx { + testConfig, err := config.Open(os.Getenv("TESTCONFIGPATH")) + panicErrNil(err) + conf := &testConfig.Cache + + testPool, err := cache.Open(conf) + panicErrNil(err) + + txObj, err := testPool.Get() + panicErrNil(err) + + return txObj +} + +func TestUser(t *testing.T) { + tx := createTestTx() + testUser1 := createTestUser() + testUser2 := createTestUser() + + panicErrNil(tx.AddUser(&testUser1)) + foundUser, found, err := tx.FindUser(testUser1.Passkey) + panicErrNil(err) + if !found { + t.Error("user not found") + } + if *foundUser != testUser1 { + t.Error("found user mismatch") + } + + foundUser, found, err = tx.FindUser(testUser2.Passkey) + panicErrNil(err) + if found { + t.Error("user found") + } + + err = tx.RemoveUser(&testUser1) + panicErrNil(err) + foundUser, found, err = tx.FindUser(testUser1.Passkey) + panicErrNil(err) + if found { + t.Error("removed user found") + } +} + +func TestTorrent(t *testing.T) { + tx := createTestTx() + testTorrent1 := createTestTorrent() + testTorrent2 := createTestTorrent() + + panicErrNil(tx.AddTorrent(&testTorrent1)) + foundTorrent, found, err := tx.FindTorrent(testTorrent1.Infohash) + panicErrNil(err) + if !found { + t.Error("torrent not found") + } + // Incomplete comparison as maps cannot be compared + if foundTorrent.Infohash != testTorrent1.Infohash { + t.Error("found torrent mismatch") + } + foundTorrent, found, err = tx.FindTorrent(testTorrent2.Infohash) + panicErrNil(err) + if found { + t.Error("torrent found") + } + + panicErrNil(tx.RemoveTorrent(&testTorrent1)) + foundTorrent, found, err = tx.FindTorrent(testTorrent1.Infohash) + panicErrNil(err) + if found { + t.Error("removed torrent found") + } +} +func TestClient(t *testing.T) { + tx := createTestTx() + testPeerID1 := "-lt0D30-" + testPeerID2 := "TIX0192" + + panicErrNil(tx.WhitelistClient(testPeerID1)) + found, err := tx.ClientWhitelisted(testPeerID1) + panicErrNil(err) + if !found { + t.Error("peerID not found") + } + + found, err = tx.ClientWhitelisted(testPeerID2) + panicErrNil(err) + if found { + t.Error("peerID found") + } + + panicErrNil(tx.UnWhitelistClient(testPeerID1)) + found, err = tx.ClientWhitelisted(testPeerID1) + panicErrNil(err) + if found { + t.Error("removed peerID found") + } +} + +func TestPeers(t *testing.T) { + tx := createTestTx() + + // Randomly generated strings would be safter to test with + testTorrent1 := createTestTorrent() + testTorrent2 := createTestTorrent() + foundTorrent, found, err := tx.FindTorrent(testTorrent1.Infohash) + panicErrNil(err) + if found { + testTorrent1 = *foundTorrent + } else { + panicErrNil(tx.AddTorrent(&testTorrent1)) + } + foundTorrent, found, err = tx.FindTorrent(testTorrent2.Infohash) + panicErrNil(err) + if found { + testTorrent2 = *foundTorrent + } else { + panicErrNil(tx.AddTorrent(&testTorrent2)) + } + + testSeeder1 := createTestPeer(createTestUserID(), testTorrent1.ID) + testSeeder2 := createTestPeer(createTestUserID(), testTorrent2.ID) + if testSeeder1 == testSeeder2 { + t.Error("seeders should not be equal") + } + + if _, exists := testTorrent1.Seeders[testSeeder1.ID]; exists { + t.Log("seeder aleady exists, removing") + err := tx.RemoveSeeder(&testTorrent1, &testSeeder1) + if err != nil { + t.Error(err) + } + if _, exists := testTorrent1.Seeders[testSeeder1.ID]; exists { + t.Error("Remove seeder failed") + } + } + + panicErrNil(tx.AddSeeder(&testTorrent1, &testSeeder1)) + if seeder1, exists := testTorrent1.Seeders[testSeeder1.ID]; !exists { + t.Error("seeder not added locally") + } else if seeder1 != testSeeder1 { + t.Error("seeder changed") + } + foundTorrent, found, err = tx.FindTorrent(testTorrent1.Infohash) + panicErrNil(err) + if !found { + t.Error("torrent should exist") + } + if seeder1, exists := foundTorrent.Seeders[testSeeder1.ID]; !exists { + t.Error("seeder not added") + } else if seeder1 != testSeeder1 { + t.Error("seeder changed") + } + + // Update a seeder, set it, then check to make sure it updated + r := rand.New(rand.NewSource(time.Now().UnixNano())) + testSeeder1.Downloaded += uint64(r.Int63()) + panicErrNil(tx.SetSeeder(&testTorrent1, &testSeeder1)) + foundTorrent, found, err = tx.FindTorrent(testTorrent1.Infohash) + panicErrNil(err) + if seeder1, exists := foundTorrent.Seeders[testSeeder1.ID]; !exists { + t.Error("seeder not added") + } else if seeder1 != testSeeder1 { + t.Errorf("seeder changed from %v to %v", testSeeder1, seeder1) + } + +} diff --git a/server/announce.go b/server/announce.go index 1d7a142..150ca19 100644 --- a/server/announce.go +++ b/server/announce.go @@ -197,12 +197,6 @@ func (s Server) serveAnnounce(w http.ResponseWriter, r *http.Request) { peer.IP = ip } - // If the transaction failed, retry - err = tx.Commit() - if err != nil { - continue - } - // Generate the response seedCount := len(torrent.Seeders) leechCount := len(torrent.Leechers)