diff --git a/cache/cache.go b/cache/cache.go index 569d9c7..bcd2a4d 100644 --- a/cache/cache.go +++ b/cache/cache.go @@ -7,7 +7,6 @@ package cache import ( - "errors" "fmt" "github.com/pushrax/chihaya/config" @@ -15,9 +14,7 @@ import ( ) var ( - drivers = make(map[string]Driver) - ErrTxDone = errors.New("cache: Transaction has already been committed or rolled back") - ErrTxConflict = errors.New("cache: Commit interrupted, update transaction and repeat") + drivers = make(map[string]Driver) ) type Driver interface { @@ -57,15 +54,9 @@ type Pool interface { Get() (Tx, error) } -// Tx represents an in-progress data store transaction. -// A transaction must end with a call to Commit or Rollback. -// -// After a call to Commit or Rollback, all operations on the -// transaction must fail with ErrTxDone. +// The transmit object is the interface to add, remove and modify +// data in the cache type Tx interface { - Commit() error - Rollback() error - // Reads FindUser(passkey string) (*models.User, bool, error) FindTorrent(infohash string) (*models.Torrent, bool, error) @@ -82,4 +73,13 @@ type Tx interface { SetSeeder(t *models.Torrent, p *models.Peer) error IncrementSlots(u *models.User) error DecrementSlots(u *models.User) error + LeecherFinished(t *models.Torrent, p *models.Peer) error + + // Priming / Testing + AddTorrent(t *models.Torrent) error + RemoveTorrent(t *models.Torrent) error + AddUser(u *models.User) error + RemoveUser(u *models.User) error + WhitelistClient(peerID string) error + UnWhitelistClient(peerID string) error } diff --git a/cache/redis/redis.go b/cache/redis/redis.go index 21ae0ea..3b70069 100644 --- a/cache/redis/redis.go +++ b/cache/redis/redis.go @@ -4,18 +4,28 @@ // Package redis implements the storage interface for a BitTorrent tracker. // -// The client whitelist is represented as a set with the key name "whitelist" -// with an optional prefix. Torrents and users are represented as hashes. -// Torrents' keys are named "torrent:" with an optional prefix. -// Users' keys are named "user:" with an optional prefix. The -// seeders and leechers attributes of torrent hashes are strings that represent -// the key for those hashes within redis. This is done because redis cannot -// nest their hash data type. +// This interface is configured by a config.DataStore. +// To get a handle to this interface, call New on the initialized driver and +// then Get() on returned the cache.Pool. +// +// Torrents, Users, and Peers are all stored in Redis hash types. All Redis +// keys can have an optional prefix specified during configuration. +// The relationship between Torrents and Peers is a Redis set that holds +// the peers' keys. There are two sets per torrent, one for seeders and +// one for leechers. The Redis sets are keyed by type and the torrent's ID. +// +// The whitelist is a Redis set with the key "whitelist" that holds client IDs. +// Operations on the whitelist do not parse the client ID from a peer ID. +// +// Some functions in this interface are not atomic. The data being modified may +// change while the function is executing. This will not cause the function to +// return an error; instead the function will complete and return valid, stale +// data. package redis import ( - "encoding/json" - "strings" + "errors" + "strconv" "time" "github.com/garyburd/redigo/redis" @@ -25,8 +35,22 @@ import ( "github.com/pushrax/chihaya/models" ) +var ( + ErrCreateUser = errors.New("redis: Incorrect reply length for user") + ErrCreateTorrent = errors.New("redis: Incorrect reply length for torrent") + ErrCreatePeer = errors.New("redis: Incorrect reply length for peer") + ErrMarkActive = errors.New("redis: Torrent doesn't exist") + + SeedersPrefix = "seeders:" + LeechersPrefix = "leechers:" + TorrentPrefix = "torrent:" + UserPrefix = "user:" + PeerPrefix = "peer:" +) + type driver struct{} +// New creates and returns a cache.Pool. func (d *driver) New(conf *config.DataStore) cache.Pool { return &Pool{ conf: conf, @@ -39,6 +63,7 @@ func (d *driver) New(conf *config.DataStore) cache.Pool { } } +// makeDialFunc configures and returns a new redis.Dial struct using the specified configuration. func makeDialFunc(conf *config.DataStore) func() (redis.Conn, error) { return func() (conn redis.Conn, err error) { conn, err = redis.Dial(conf.Network, conf.Host+":"+conf.Port) @@ -49,6 +74,7 @@ func makeDialFunc(conf *config.DataStore) func() (redis.Conn, error) { } } +// testOnBorrow pings the Redis instance func testOnBorrow(c redis.Conn, t time.Time) error { _, err := c.Do("PING") return err @@ -64,32 +90,17 @@ func (p *Pool) Close() error { } func (p *Pool) Get() (cache.Tx, error) { - return &Tx{ - conf: p.conf, - done: false, - multi: false, - Conn: p.pool.Get(), - }, nil + retTx := &Tx{ + conf: p.conf, + done: false, + Conn: p.pool.Get(), + } + return retTx, nil } -// Tx represents a transaction for Redis with one gotcha: -// all reads must be done prior to any writes. Writes will -// check if the MULTI command has been sent to redis and will -// send it if it hasn't. -// -// Internally a transaction looks like: -// WATCH keyA -// GET keyA -// WATCH keyB -// GET keyB -// MULTI -// SET keyA -// SET keyB -// EXEC type Tx struct { - conf *config.DataStore - done bool - multi bool + conf *config.DataStore + done bool redis.Conn } @@ -101,218 +112,578 @@ func (tx *Tx) close() { tx.Conn.Close() } -func (tx *Tx) initiateWrite() error { - if tx.done { - return cache.ErrTxDone +// createUser takes a string slice of length 14 and returns a pointer to a new +// models.User or an error. +// This function is used to create a user from a Redis hash response(HGETALL). +// The order of strings the in the slice must follow the pattern: +// [, , , , ...] +// If the field value string cannot be converted to the correct type, +// createUser will return a nil user and the conversion error. +func createUser(userVals []string) (*models.User, error) { + if len(userVals) != 14 { + return nil, ErrCreateUser } - if tx.multi != true { - tx.multi = true - return tx.Send("MULTI") - } - return nil -} - -func (tx *Tx) initiateRead() error { - if tx.done { - return cache.ErrTxDone - } - if tx.multi == true { - panic("Tried to read during MULTI") - } - return nil -} - -func (tx *Tx) Commit() error { - if tx.done { - return cache.ErrTxDone - } - if tx.multi == true { - execResponse, err := tx.Do("EXEC") - if execResponse == nil { - tx.multi = false - return cache.ErrTxConflict + var user models.User + var err error + for index, userString := range userVals { + switch userString { + case "id": + user.ID, err = strconv.ParseUint(userVals[index+1], 10, 64) + case "passkey": + user.Passkey = userVals[index+1] + case "up_multiplier": + user.UpMultiplier, err = strconv.ParseFloat(userVals[index+1], 64) + case "down_multiplier": + user.DownMultiplier, err = strconv.ParseFloat(userVals[index+1], 64) + case "slots": + user.Slots, err = strconv.ParseInt(userVals[index+1], 10, 64) + case "slots_used": + user.SlotsUsed, err = strconv.ParseInt(userVals[index+1], 10, 64) + case "snatches": + user.Snatches, err = strconv.ParseUint(userVals[index+1], 10, 64) } + if err != nil { + return nil, err + } + } + return &user, nil +} + +// createTorrent takes a string slice of length 14 and returns a pointer to a new models.Torrent +// or an error. +// This function can be used to create a torrent from a Redis hash response(HGETALL). +// The order of strings the in the slice must follow the pattern: +// [, , , , ...] +// This function calls multiple redis commands, it's not internally atomic. +// If the field values cannot be converted to the correct type, +// createTorrent will return a nil user and the conversion error. +// After converting the torrent fields, the seeders and leechers are populated by redis.getPeers +func (tx *Tx) createTorrent(torrentVals []string) (*models.Torrent, error) { + if len(torrentVals) != 14 { + return nil, ErrCreateTorrent + } + var torrent models.Torrent + var err error + for index, torrentString := range torrentVals { + switch torrentString { + case "id": + torrent.ID, err = strconv.ParseUint(torrentVals[index+1], 10, 64) + case "infohash": + torrent.Infohash = torrentVals[index+1] + case "active": + torrent.Active, err = strconv.ParseBool(torrentVals[index+1]) + case "snatches": + torrent.Snatches, err = strconv.ParseUint(torrentVals[index+1], 10, 32) + case "up_multiplier": + torrent.UpMultiplier, err = strconv.ParseFloat(torrentVals[index+1], 64) + case "down_multiplier": + torrent.DownMultiplier, err = strconv.ParseFloat(torrentVals[index+1], 64) + case "last_action": + torrent.LastAction, err = strconv.ParseInt(torrentVals[index+1], 10, 64) + } + if err != nil { + return nil, err + } + } + torrent.Seeders, err = tx.getPeers(torrent.ID, SeedersPrefix) + if err != nil { + return nil, err + } + torrent.Leechers, err = tx.getPeers(torrent.ID, LeechersPrefix) + if err != nil { + return nil, err + } + return &torrent, nil +} + +// setPeer writes or overwrites peer information, stored as a Redis hash. +// The hash fields names are the same as the JSON tags on the models.Peer struct. +func (tx *Tx) setPeer(peer *models.Peer) error { + hashKey := tx.conf.Prefix + getPeerHashKey(peer) + _, err := tx.Do("HMSET", hashKey, + "id", peer.ID, + "user_id", peer.UserID, + "torrent_id", peer.TorrentID, + "ip", peer.IP, + "port", peer.Port, + "uploaded", peer.Uploaded, + "downloaded", peer.Downloaded, + "left", peer.Left, + "last_announce", peer.LastAnnounce) + + return err +} + +// removePeer removes the given peer from the specified peer set (seeder or leecher), +// and removes the peer information. +// This function calls multiple redis commands, it's not internally atomic. +// This function will not return an error if the peer to remove doesn't exist. +func (tx *Tx) removePeer(peer *models.Peer, peerTypePrefix string) error { + setKey := tx.conf.Prefix + getPeerSetKey(peerTypePrefix, peer) + _, err := tx.Do("SREM", setKey, getPeerHashKey(peer)) + if err != nil { + return err + } + hashKey := tx.conf.Prefix + getPeerHashKey(peer) + _, err = tx.Do("DEL", hashKey) + return nil +} + +// removePeers removes all peers from specified peer set (seeders or leechers), +// removes the peer information, and then removes the associated peer from the given map. +// This function will not return an error if the peer to remove doesn't exist. +// This function will only delete the peer set if all the individual peer deletions were successful +// This function calls multiple redis commands, it's not internally atomic. +func (tx *Tx) removePeers(torrentID uint64, peers map[string]models.Peer, peerTypePrefix string) error { + for _, peer := range peers { + hashKey := tx.conf.Prefix + getPeerHashKey(&peer) + _, err := tx.Do("DEL", hashKey) if err != nil { return err } + delete(peers, models.PeerMapKey(&peer)) } - tx.close() - return nil -} - -func (tx *Tx) Rollback() error { - if tx.done { - return cache.ErrTxDone - } - // Undoes watches and multi - if _, err := tx.Do("DISCARD"); err != nil { + setKey := tx.conf.Prefix + peerTypePrefix + strconv.FormatUint(torrentID, 36) + _, err := tx.Do("DEL", setKey) + if err != nil { return err } - tx.multi = false - tx.close() return nil } -func (tx *Tx) FindUser(passkey string) (*models.User, bool, error) { - err := tx.initiateRead() - if err != nil { - return nil, false, err - } - - key := tx.conf.Prefix + "user:" + passkey - _, err = tx.Do("WATCH", key) - if err != nil { - return nil, false, err - } - reply, err := redis.String(tx.Do("GET", key)) - if err != nil { - if err == redis.ErrNil { - return nil, false, nil - } - return nil, false, err - } - - user := &models.User{} - err = json.NewDecoder(strings.NewReader(reply)).Decode(user) - if err != nil { - return nil, true, err - } - return user, true, nil +// getPeerHashKey returns a string with the peer.ID, encoded peer.UserID, and encoded peer.TorrentID, +// concatenated and delimited by colons +// This key corresponds to a Redis hash type with fields containing a peer's data. +// The peer hashkey relies on the combination of peerID, userID, and torrentID being unique. +func getPeerHashKey(peer *models.Peer) string { + return peer.ID + ":" + strconv.FormatUint(peer.UserID, 36) + ":" + strconv.FormatUint(peer.TorrentID, 36) } -func (tx *Tx) FindTorrent(infohash string) (*models.Torrent, bool, error) { - err := tx.initiateRead() - if err != nil { - return nil, false, err - } - - key := tx.conf.Prefix + "torrent:" + infohash - _, err = tx.Do("WATCH", key) - if err != nil { - return nil, false, err - } - reply, err := redis.String(tx.Do("GET", key)) - if err != nil { - if err == redis.ErrNil { - return nil, false, nil - } - return nil, false, err - } - - torrent := &models.Torrent{} - err = json.NewDecoder(strings.NewReader(reply)).Decode(torrent) - if err != nil { - return nil, true, err - } - return torrent, true, nil +// getPeerSetKey returns a string that is the peer's encoded torrentID appended to the typePrefix +// This key corresponds to a torrent's pool of leechers or seeders +func getPeerSetKey(typePrefix string, peer *models.Peer) string { + return typePrefix + strconv.FormatUint(peer.TorrentID, 36) } -func (tx *Tx) ClientWhitelisted(peerID string) (exists bool, err error) { - err = tx.initiateRead() - if err != nil { - return false, err +// addPeers adds each peer's key to the specified peer set and saves the peer's information. +// This function will not return an error if the peer already exists in the set. +// This function calls multiple redis commands, it's not internally atomic. +func (tx *Tx) addPeers(peers map[string]models.Peer, peerTypePrefix string) error { + for _, peer := range peers { + setKey := tx.conf.Prefix + getPeerSetKey(peerTypePrefix, &peer) + _, err := tx.Do("SADD", setKey, getPeerHashKey(&peer)) + if err != nil { + return err + } + tx.setPeer(&peer) } + return nil +} - key := tx.conf.Prefix + "whitelist" - _, err = tx.Do("WATCH", key) - if err != nil { - return +// createPeer takes a slice of length 9 and returns a pointer to a new models.Peer or an error. +// This function is used to create a peer from a Redis hash response(HGETALL). +// The order of strings the in the slice must follow the pattern: +// [, , , , ...] +// If the field value string cannot be converted to the correct type, +// the function will return a nil peer and the conversion error. +func createPeer(peerVals []string) (*models.Peer, error) { + if len(peerVals) != 18 { + return nil, ErrCreatePeer } + var peer models.Peer + var err error + for index, peerString := range peerVals { + switch peerString { + case "id": + peer.ID = peerVals[index+1] + case "user_id": + peer.UserID, err = strconv.ParseUint(peerVals[index+1], 10, 64) + case "torrent_id": + peer.TorrentID, err = strconv.ParseUint(peerVals[index+1], 10, 64) + case "ip": + peer.IP = peerVals[index+1] + case "port": + peer.Port, err = strconv.ParseUint(peerVals[index+1], 10, 64) + case "uploaded": + peer.Uploaded, err = strconv.ParseUint(peerVals[index+1], 10, 64) + case "downloaded": + peer.Downloaded, err = strconv.ParseUint(peerVals[index+1], 10, 64) + case "left": + peer.Left, err = strconv.ParseUint(peerVals[index+1], 10, 64) + case "last_announce": + peer.LastAnnounce, err = strconv.ParseInt(peerVals[index+1], 10, 64) + } + if err != nil { + return nil, err + } + } + return &peer, nil +} - // TODO +// getPeers returns a map of peers from a specified torrent's peer set(seeders or leechers). +// This is a multiple action command, it's not internally atomic. +func (tx *Tx) getPeers(torrentID uint64, peerTypePrefix string) (peers map[string]models.Peer, err error) { + peers = make(map[string]models.Peer) + setKey := tx.conf.Prefix + peerTypePrefix + strconv.FormatUint(torrentID, 36) + peerStrings, err := redis.Strings(tx.Do("SMEMBERS", setKey)) + if err != nil { + return nil, err + } + // Keys map to peer objects stored in hashes + for _, peerHashKey := range peerStrings { + hashKey := tx.conf.Prefix + peerHashKey + peerVals, err := redis.Strings(tx.Do("HGETALL", hashKey)) + if err != nil { + return nil, err + } + if len(peerVals) == 0 { + continue + } + peer, err := createPeer(peerVals) + if err != nil { + return nil, err + } + peers[models.PeerMapKey(peer)] = *peer + } return } +// AddTorrent writes/overwrites torrent information and saves peers from both peer sets. +// The hash fields names are the same as the JSON tags on the models.Torrent struct. +// This is a multiple action command, it's not internally atomic. +func (tx *Tx) AddTorrent(t *models.Torrent) error { + hashkey := tx.conf.Prefix + TorrentPrefix + t.Infohash + _, err := tx.Do("HMSET", hashkey, + "id", t.ID, + "infohash", t.Infohash, + "active", t.Active, + "snatches", t.Snatches, + "up_multiplier", t.UpMultiplier, + "down_multiplier", t.DownMultiplier, + "last_action", t.LastAction) + if err != nil { + return err + } + + err = tx.addPeers(t.Seeders, SeedersPrefix) + if err != nil { + return err + } + err = tx.addPeers(t.Leechers, LeechersPrefix) + if err != nil { + return err + } + return nil +} + +// RemoveTorrent deletes the torrent's Redis hash and then deletes all peers. +// This function will not return an error if the torrent has already been removed. +// This is a multiple action command, it's not internally atomic. +func (tx *Tx) RemoveTorrent(t *models.Torrent) error { + hashkey := tx.conf.Prefix + TorrentPrefix + t.Infohash + _, err := tx.Do("DEL", hashkey) + if err != nil { + return err + } + // Remove seeders and leechers as well + err = tx.removePeers(t.ID, t.Seeders, SeedersPrefix) + if err != nil { + return err + } + err = tx.removePeers(t.ID, t.Leechers, LeechersPrefix) + if err != nil { + return err + } + return nil +} + +// AddUser writes/overwrites user information to a Redis hash. +// The hash fields names are the same as the JSON tags on the models.user struct. +func (tx *Tx) AddUser(u *models.User) error { + hashkey := tx.conf.Prefix + UserPrefix + u.Passkey + _, err := tx.Do("HMSET", hashkey, + "id", u.ID, + "passkey", u.Passkey, + "up_multiplier", u.UpMultiplier, + "down_multiplier", u.DownMultiplier, + "slots", u.Slots, + "slots_used", u.SlotsUsed, + "snatches", u.Snatches) + if err != nil { + return err + } + return nil +} + +// RemoveUser removes the user's hash from Redis. +// This function does not return an error if the user doesn't exist. +func (tx *Tx) RemoveUser(u *models.User) error { + hashkey := tx.conf.Prefix + UserPrefix + u.Passkey + _, err := tx.Do("DEL", hashkey) + if err != nil { + return err + } + return nil +} + +// FindUser returns a pointer to a new user struct and true if the user exists, +// or nil and false if the user doesn't exist. +// This function does not return an error if the torrent doesn't exist. +func (tx *Tx) FindUser(passkey string) (*models.User, bool, error) { + hashkey := tx.conf.Prefix + UserPrefix + passkey + // Consider using HGETALL instead of HVALS here for robustness + userStrings, err := redis.Strings(tx.Do("HGETALL", hashkey)) + if err != nil { + return nil, false, err + } else if len(userStrings) == 0 { + return nil, false, nil + } + foundUser, err := createUser(userStrings) + if err != nil { + return nil, false, err + } + return foundUser, true, nil +} + +// FindTorrent returns a pointer to a new torrent struct and true if the torrent exists, +// or nil and false if the torrent doesn't exist. +// This is a multiple action command, it's not internally atomic. +func (tx *Tx) FindTorrent(infohash string) (*models.Torrent, bool, error) { + hashkey := tx.conf.Prefix + TorrentPrefix + infohash + torrentStrings, err := redis.Strings(tx.Do("HGETALL", hashkey)) + if err != nil { + return nil, false, err + } else if len(torrentStrings) == 0 { + return nil, false, nil + } + + foundTorrent, err := tx.createTorrent(torrentStrings) + if err != nil { + return nil, false, err + } + return foundTorrent, true, nil +} + +// ClientWhitelisted returns true if the ClientID exists in the Client set. +// This function does not parse the client ID from the peer ID. +// The clientID must match exactly to a member of the set. +func (tx *Tx) ClientWhitelisted(peerID string) (exists bool, err error) { + key := tx.conf.Prefix + "whitelist" + return redis.Bool(tx.Do("SISMEMBER", key, peerID)) +} + +// WhitelistClient adds a client ID to the client whitelist set. +// This function does not return an error if the client ID is already in the set. +func (tx *Tx) WhitelistClient(peerID string) error { + key := tx.conf.Prefix + "whitelist" + _, err := tx.Do("SADD", key, peerID) + return err +} + +// UnWhitelistClient removes a client ID from the client whitelist set +// This function does not return an error if the client ID is not in the set. +func (tx *Tx) UnWhitelistClient(peerID string) error { + key := tx.conf.Prefix + "whitelist" + _, err := tx.Do("SREM", key, peerID) + return err +} + +// RecordSnatch increments the snatch counter on the torrent and user by one. +// This modifies the arguments as well as the hash field in Redis. +// This is a multiple action command, it's not internally atomic. func (tx *Tx) RecordSnatch(user *models.User, torrent *models.Torrent) error { - if err := tx.initiateWrite(); err != nil { + + torrentKey := tx.conf.Prefix + TorrentPrefix + torrent.Infohash + snatchCount, err := redis.Int(tx.Do("HINCRBY", torrentKey, "snatches", 1)) + if err != nil { return err } + torrent.Snatches = uint64(snatchCount) - // TODO + userKey := tx.conf.Prefix + UserPrefix + user.Passkey + snatchCount, err = redis.Int(tx.Do("HINCRBY", userKey, "snatches", 1)) + if err != nil { + return err + } + user.Snatches = uint64(snatchCount) return nil } -func (tx *Tx) MarkActive(t *models.Torrent) error { - if err := tx.initiateWrite(); err != nil { +// MarkActive sets the active field of the torrent to true. +// This modifies the argument as well as the hash field in Redis. +// This function will return ErrMarkActive if the torrent does not exist. +func (tx *Tx) MarkActive(torrent *models.Torrent) error { + hashkey := tx.conf.Prefix + TorrentPrefix + torrent.Infohash + activeExists, err := redis.Int(tx.Do("HSET", hashkey, "active", true)) + if err != nil { return err } - - // TODO + torrent.Active = true + // HSET returns 1 if hash didn't exist before + if activeExists == 1 { + return ErrMarkActive + } return nil } -func (tx *Tx) AddLeecher(t *models.Torrent, p *models.Peer) error { - if err := tx.initiateWrite(); err != nil { +// MarkInactive sets the active field of the torrent to false. +// This modifies the argument as well as the hash field in Redis. +// This function will return ErrMarkActive if the torrent does not exist. +func (tx *Tx) MarkInactive(torrent *models.Torrent) error { + hashkey := tx.conf.Prefix + TorrentPrefix + torrent.Infohash + activeExists, err := redis.Int(tx.Do("HSET", hashkey, "active", false)) + if err != nil { return err } - - // TODO + torrent.Active = false + // HSET returns 1 if hash didn't exist before + if activeExists == 1 { + // Clean-up incomplete torrent + _, err = tx.Do("DEL", hashkey) + if err != nil { + return err + } + return ErrMarkActive + } return nil } +// AddLeecher adds a new peer to a torrent's leecher set. +// This modifies the torrent argument, as well as the torrent's set and peer's hash in Redis. +// This function does not return an error if the leecher already exists. +// This is a multiple action command, it's not internally atomic. +func (tx *Tx) AddLeecher(torrent *models.Torrent, peer *models.Peer) error { + setKey := tx.conf.Prefix + LeechersPrefix + strconv.FormatUint(torrent.ID, 36) + _, err := tx.Do("SADD", setKey, getPeerHashKey(peer)) + if err != nil { + return err + } + err = tx.setPeer(peer) + if err != nil { + return err + } + if torrent.Leechers == nil { + torrent.Leechers = make(map[string]models.Peer) + } + torrent.Leechers[models.PeerMapKey(peer)] = *peer + return nil +} + +// SetLeecher updates a torrent's leecher. +// This modifies the torrent argument, as well as the peer's hash in Redis. +// Setting assumes that the peer is already a leecher, and only needs to be updated. +// This function does not return an error if the leecher does not exist or is not in the torrent's leecher set. func (tx *Tx) SetLeecher(t *models.Torrent, p *models.Peer) error { - if err := tx.initiateWrite(); err != nil { + err := tx.setPeer(p) + if err != nil { return err } - - // TODO + t.Leechers[models.PeerMapKey(p)] = *p return nil } +// RemoveLeecher removes the given peer from a torrent's leecher set. +// This modifies the torrent argument, as well as the torrent's set and peer's hash in Redis. +// This function does not return an error if the peer doesn't exist, or is not in the set. func (tx *Tx) RemoveLeecher(t *models.Torrent, p *models.Peer) error { - if err := tx.initiateWrite(); err != nil { + err := tx.removePeer(p, LeechersPrefix) + if err != nil { return err } - - // TODO + delete(t.Leechers, models.PeerMapKey(p)) return nil } -func (tx *Tx) AddSeeder(t *models.Torrent, p *models.Peer) error { - if err := tx.initiateWrite(); err != nil { +// LeecherFinished moves a peer's hashkey from a torrent's leecher set to the seeder set and updates the peer. +// This modifies the torrent argument, as well as the torrent's set and peer's hash in Redis. +// This function does not return an error if the peer doesn't exist or is not in the torrent's leecher set. +func (tx *Tx) LeecherFinished(torrent *models.Torrent, peer *models.Peer) error { + torrentIdKey := strconv.FormatUint(torrent.ID, 36) + seederSetKey := tx.conf.Prefix + SeedersPrefix + torrentIdKey + leecherSetKey := tx.conf.Prefix + LeechersPrefix + torrentIdKey + + _, err := tx.Do("SMOVE", leecherSetKey, seederSetKey, getPeerHashKey(peer)) + if err != nil { return err } + torrent.Seeders[models.PeerMapKey(peer)] = *peer + delete(torrent.Leechers, models.PeerMapKey(peer)) - // TODO + err = tx.setPeer(peer) + return err +} + +// AddSeeder adds a new peer to a torrent's seeder set. +// This modifies the torrent argument, as well as the torrent's set and peer's hash in Redis. +// This function does not return an error if the seeder already exists. +// This is a multiple action command, it's not internally atomic. +func (tx *Tx) AddSeeder(torrent *models.Torrent, peer *models.Peer) error { + setKey := tx.conf.Prefix + SeedersPrefix + strconv.FormatUint(torrent.ID, 36) + _, err := tx.Do("SADD", setKey, getPeerHashKey(peer)) + if err != nil { + return err + } + err = tx.setPeer(peer) + if err != nil { + return err + } + if torrent.Seeders == nil { + torrent.Seeders = make(map[string]models.Peer) + } + torrent.Seeders[models.PeerMapKey(peer)] = *peer return nil } +// SetSeeder updates a torrent's seeder. +// This modifies the torrent argument, as well as the peer's hash in Redis. +// Setting assumes that the peer is already a seeder, and only needs to be updated. +// This function does not return an error if the seeder does not exist or is not in the torrent's seeder set. func (tx *Tx) SetSeeder(t *models.Torrent, p *models.Peer) error { - if err := tx.initiateWrite(); err != nil { + err := tx.setPeer(p) + if err != nil { return err } - - // TODO + t.Seeders[models.PeerMapKey(p)] = *p return nil } +// RemoveSeeder removes the given peer from a torrent's seeder set. +// This modifies the torrent argument, as well as the torrent's set and peer's hash in Redis. +// This function does not return an error if the peer doesn't exist, or is not in the set. func (tx *Tx) RemoveSeeder(t *models.Torrent, p *models.Peer) error { - if err := tx.initiateWrite(); err != nil { + err := tx.removePeer(p, SeedersPrefix) + if err != nil { return err } - - // TODO + delete(t.Seeders, models.PeerMapKey(p)) return nil } +// IncrementSlots increment a user's Slots by one. +// This function modifies the argument as well as the hash field in Redis. func (tx *Tx) IncrementSlots(u *models.User) error { - if err := tx.initiateWrite(); err != nil { + hashkey := tx.conf.Prefix + UserPrefix + u.Passkey + slotCount, err := redis.Int(tx.Do("HINCRBY", hashkey, "slots", 1)) + if err != nil { return err } - - // TODO + u.Slots = int64(slotCount) return nil } +// IncrementSlots increment a user's Slots by one. +// This function modifies the argument as well as the hash field in Redis. func (tx *Tx) DecrementSlots(u *models.User) error { - if err := tx.initiateWrite(); err != nil { + hashkey := tx.conf.Prefix + UserPrefix + u.Passkey + slotCount, err := redis.Int(tx.Do("HINCRBY", hashkey, "slots", -1)) + if err != nil { return err } - - // TODO + u.Slots = int64(slotCount) return nil } +// init registers the redis driver func init() { cache.Register("redis", &driver{}) } diff --git a/cache/redis/redis_bench_test.go b/cache/redis/redis_bench_test.go new file mode 100644 index 0000000..74aa054 --- /dev/null +++ b/cache/redis/redis_bench_test.go @@ -0,0 +1,288 @@ +// Copyright 2013 The Chihaya Authors. All rights reserved. +// Use of this source code is governed by the BSD 2-Clause license, +// which can be found in the LICENSE file. + +package redis + +import ( + "math/rand" + "testing" + "time" +) + +func BenchmarkSuccessfulFindUser(b *testing.B) { + b.StopTimer() + tx := createTestTx() + testUser := createTestUser() + panicOnErr(tx.AddUser(testUser)) + b.StartTimer() + + for bCount := 0; bCount < b.N; bCount++ { + + foundUser, found, err := tx.FindUser(testUser.Passkey) + panicOnErr(err) + if !found { + b.Error("user not found", testUser) + } + if *foundUser != *testUser { + b.Error("found user mismatch", *foundUser, testUser) + } + } + // Cleanup + b.StopTimer() + panicOnErr(tx.RemoveUser(testUser)) + b.StartTimer() +} + +func BenchmarkFailedFindUser(b *testing.B) { + b.StopTimer() + tx := createTestTx() + testUser := createTestUser() + b.StartTimer() + + for bCount := 0; bCount < b.N; bCount++ { + + _, found, err := tx.FindUser(testUser.Passkey) + panicOnErr(err) + if found { + b.Error("user not found", testUser) + } + } +} + +func BenchmarkSuccessfulFindTorrent(b *testing.B) { + b.StopTimer() + tx := createTestTx() + testTorrent := createTestTorrent() + + panicOnErr(tx.AddTorrent(testTorrent)) + b.StartTimer() + + for bCount := 0; bCount < b.N; bCount++ { + foundTorrent, found, err := tx.FindTorrent(testTorrent.Infohash) + panicOnErr(err) + if !found { + b.Error("torrent not found", testTorrent) + } + // Incomplete comparison as maps make struct not nativly comparable + if foundTorrent.Infohash != testTorrent.Infohash { + b.Error("found torrent mismatch", foundTorrent, testTorrent) + } + } + // Cleanup + b.StopTimer() + panicOnErr(tx.RemoveTorrent(testTorrent)) + b.StartTimer() +} + +func BenchmarkFailFindTorrent(b *testing.B) { + b.StopTimer() + tx := createTestTx() + testTorrent := createTestTorrent() + b.StartTimer() + + for bCount := 0; bCount < b.N; bCount++ { + foundTorrent, found, err := tx.FindTorrent(testTorrent.Infohash) + panicOnErr(err) + if found { + b.Error("torrent found", foundTorrent) + } + } +} + +func BenchmarkSuccessfulClientWhitelisted(b *testing.B) { + b.StopTimer() + tx := createTestTx() + testPeerID := "-lt0D30-" + panicOnErr(tx.WhitelistClient(testPeerID)) + b.StartTimer() + + for bCount := 0; bCount < b.N; bCount++ { + found, err := tx.ClientWhitelisted(testPeerID) + panicOnErr(err) + if !found { + b.Error("peerID not found", testPeerID) + } + } + // Cleanup + b.StopTimer() + panicOnErr(tx.UnWhitelistClient(testPeerID)) + b.StartTimer() +} + +func BenchmarkFailClientWhitelisted(b *testing.B) { + b.StopTimer() + tx := createTestTx() + testPeerID2 := "TIX0192" + b.StartTimer() + + for bCount := 0; bCount < b.N; bCount++ { + found, err := tx.ClientWhitelisted(testPeerID2) + panicOnErr(err) + if found { + b.Error("peerID found", testPeerID2) + } + } +} + +func BenchmarkRecordSnatch(b *testing.B) { + b.StopTimer() + tx := createTestTx() + testTorrent := createTestTorrent() + testUser := createTestUser() + panicOnErr(tx.AddTorrent(testTorrent)) + panicOnErr(tx.AddUser(testUser)) + b.StartTimer() + + for bCount := 0; bCount < b.N; bCount++ { + panicOnErr(tx.RecordSnatch(testUser, testTorrent)) + } + // Cleanup + b.StopTimer() + panicOnErr(tx.RemoveTorrent(testTorrent)) + panicOnErr(tx.RemoveUser(testUser)) + b.StartTimer() +} + +func BenchmarkMarkActive(b *testing.B) { + b.StopTimer() + tx := createTestTx() + testTorrent := createTestTorrent() + testTorrent.Active = false + panicOnErr(tx.AddTorrent(testTorrent)) + b.StartTimer() + + for bCount := 0; bCount < b.N; bCount++ { + panicOnErr(tx.MarkActive(testTorrent)) + } + // Cleanup + b.StopTimer() + panicOnErr(tx.RemoveTorrent(testTorrent)) + b.StartTimer() +} + +func BenchmarkAddSeeder(b *testing.B) { + b.StopTimer() + tx := createTestTx() + testTorrent := createTestTorrent() + panicOnErr(tx.AddTorrent(testTorrent)) + b.StartTimer() + + for bCount := 0; bCount < b.N; bCount++ { + b.StopTimer() + testSeeder := createTestPeer(createTestUserID(), testTorrent.ID) + b.StartTimer() + + panicOnErr(tx.AddSeeder(testTorrent, testSeeder)) + } + // Cleanup + b.StopTimer() + panicOnErr(tx.RemoveTorrent(testTorrent)) + b.StartTimer() +} + +func BenchmarkRemoveSeeder(b *testing.B) { + b.StopTimer() + tx := createTestTx() + testTorrent := createTestTorrent() + panicOnErr(tx.AddTorrent(testTorrent)) + testSeeder := createTestPeer(createTestUserID(), testTorrent.ID) + b.StartTimer() + + for bCount := 0; bCount < b.N; bCount++ { + b.StopTimer() + tx.AddSeeder(testTorrent, testSeeder) + b.StartTimer() + + panicOnErr(tx.RemoveSeeder(testTorrent, testSeeder)) + } + // Cleanup + b.StopTimer() + panicOnErr(tx.RemoveTorrent(testTorrent)) + b.StartTimer() +} + +func BenchmarkSetSeeder(b *testing.B) { + b.StopTimer() + tx := createTestTx() + testTorrent := createTestTorrent() + panicOnErr(tx.AddTorrent(testTorrent)) + testSeeder := createTestPeer(createTestUserID(), testTorrent.ID) + panicOnErr(tx.AddSeeder(testTorrent, testSeeder)) + r := rand.New(rand.NewSource(time.Now().UnixNano())) + b.StartTimer() + + for bCount := 0; bCount < b.N; bCount++ { + b.StopTimer() + testSeeder.Uploaded += uint64(r.Int63()) + b.StartTimer() + + tx.SetSeeder(testTorrent, testSeeder) + } + // Cleanup + b.StopTimer() + panicOnErr(tx.RemoveTorrent(testTorrent)) + b.StartTimer() +} + +func BenchmarkIncrementSlots(b *testing.B) { + b.StopTimer() + tx := createTestTx() + testUser := createTestUser() + panicOnErr(tx.AddUser(testUser)) + b.StartTimer() + + for bCount := 0; bCount < b.N; bCount++ { + panicOnErr(tx.IncrementSlots(testUser)) + } + // Cleanup + b.StopTimer() + panicOnErr(tx.RemoveUser(testUser)) + b.StartTimer() +} + +func BenchmarkLeecherFinished(b *testing.B) { + b.StopTimer() + tx := createTestTx() + testTorrent := createTestTorrent() + panicOnErr(tx.AddTorrent(testTorrent)) + b.StartTimer() + + for bCount := 0; bCount < b.N; bCount++ { + b.StopTimer() + testLeecher := createTestPeer(createTestUserID(), testTorrent.ID) + panicOnErr(tx.AddLeecher(testTorrent, testLeecher)) + testLeecher.Left = 0 + b.StartTimer() + + panicOnErr(tx.LeecherFinished(testTorrent, testLeecher)) + } + // Cleanup + b.StopTimer() + panicOnErr(tx.RemoveTorrent(testTorrent)) + b.StartTimer() +} + +// This is a comparision to the Leecher finished function +func BenchmarkRemoveLeecherAddSeeder(b *testing.B) { + b.StopTimer() + tx := createTestTx() + testTorrent := createTestTorrent() + panicOnErr(tx.AddTorrent(testTorrent)) + b.StartTimer() + + for bCount := 0; bCount < b.N; bCount++ { + b.StopTimer() + testLeecher := createTestPeer(createTestUserID(), testTorrent.ID) + panicOnErr(tx.AddLeecher(testTorrent, testLeecher)) + testLeecher.Left = 0 + b.StartTimer() + + panicOnErr(tx.RemoveLeecher(testTorrent, testLeecher)) + panicOnErr(tx.AddSeeder(testTorrent, testLeecher)) + } + // Cleanup + b.StopTimer() + tx.RemoveTorrent(testTorrent) + b.StartTimer() +} diff --git a/cache/redis/redis_test.go b/cache/redis/redis_test.go index f1a3cd4..f907a0d 100644 --- a/cache/redis/redis_test.go +++ b/cache/redis/redis_test.go @@ -5,44 +5,88 @@ package redis import ( - "encoding/json" - "math/rand" + "crypto/rand" + "fmt" + "io" "os" "strconv" - "strings" "testing" - "time" "github.com/garyburd/redigo/redis" - "github.com/pushrax/chihaya/cache" "github.com/pushrax/chihaya/config" "github.com/pushrax/chihaya/models" ) -// Maximum number of parallel retries; depends on system latency -const MAX_RETRIES = 9000 -const sample_infohash = "58c290f4ea1efb3adcb8c1ed2643232117577bcd" -const sample_passkey = "32426b162be0bce5428e7e36afaf734ae5afb355" +var ( + testTorrentIDChannel chan uint64 + testUserIDChannel chan uint64 + testPeerIDChannel chan int +) -// Common interface for benchmarks and test error reporting -type TestReporter interface { - Error(args ...interface{}) - Errorf(format string, args ...interface{}) - Log(args ...interface{}) - Logf(format string, args ...interface{}) +func init() { + testTorrentIDChannel = make(chan uint64, 100) + testUserIDChannel = make(chan uint64, 100) + testPeerIDChannel = make(chan int, 100) + // Sync access to ID counter with buffered global channels + go func() { + for i := 0; ; i++ { + testTorrentIDChannel <- uint64(i) + } + }() + go func() { + for i := 0; ; i++ { + testUserIDChannel <- uint64(i) + } + }() + go func() { + for i := 0; ; i++ { + testPeerIDChannel <- i + } + }() } -func verifyErrNil(err error, t TestReporter) { +func createTestTorrentID() uint64 { + return <-testTorrentIDChannel +} + +func createTestUserID() uint64 { + return <-testUserIDChannel +} + +func createTestPeerID() string { + return "-testPeerID-" + strconv.Itoa(<-testPeerIDChannel) +} + +func createTestInfohash() string { + uuid := make([]byte, 40) + n, err := io.ReadFull(rand.Reader, uuid) + if n != len(uuid) || err != nil { + panic(err) + } + return string(uuid) +} + +func createTestPasskey() string { + uuid := make([]byte, 40) + n, err := io.ReadFull(rand.Reader, uuid) + if n != len(uuid) || err != nil { + panic(err) + } + return string(uuid) +} + +func panicOnErr(err error) { if err != nil { - t.Error(err) + fmt.Println(err) + panic(err) } } -func createTestTxObj(t TestReporter) *Tx { +func createTestRedisTx() *Tx { testConfig, err := config.Open(os.Getenv("TESTCONFIGPATH")) conf := &testConfig.Cache - verifyErrNil(err, t) + panicOnErr(err) testPool := &Pool{ conf: conf, @@ -54,438 +98,87 @@ func createTestTxObj(t TestReporter) *Tx { }, } - //testDialFunc := makeDialFunc(&testConfig.Cache) - //testConn, err := testDialFunc() txObj := &Tx{ - conf: testPool.conf, - done: false, - multi: false, - Conn: testPool.pool.Get(), + conf: testPool.conf, + done: false, + Conn: testPool.pool.Get(), } - verifyErrNil(err, t) + panicOnErr(err) // Test connection before returning - //txObj := Tx{&testConfig.Cache, false, false, testConn} _, err = txObj.Do("PING") - verifyErrNil(err, t) + panicOnErr(err) return txObj } -func createUserFromValues(userVals []string, t TestReporter) *models.User { - ID, err := strconv.ParseUint(userVals[0], 10, 64) - verifyErrNil(err, t) - Passkey := userVals[1] - UpMultiplier, err := strconv.ParseFloat(userVals[2], 64) - verifyErrNil(err, t) - DownMultiplier, err := strconv.ParseFloat(userVals[3], 64) - Slots, err := strconv.ParseInt(userVals[4], 10, 64) - verifyErrNil(err, t) - SlotsUsed, err := strconv.ParseInt(userVals[5], 10, 64) - verifyErrNil(err, t) - return &models.User{ID, Passkey, UpMultiplier, DownMultiplier, Slots, SlotsUsed} +func createTestUser() *models.User { + return &models.User{ID: createTestUserID(), Passkey: createTestPasskey(), + UpMultiplier: 1.01, DownMultiplier: 1.0, Slots: 4, SlotsUsed: 2, Snatches: 7} } -func createUser() models.User { - testUser := models.User{214, "32426b162be0bce5428e7e36afaf734ae5afb355", 0.0, 0.0, 4, 2} - return testUser +func createTestPeer(userID uint64, torrentID uint64) *models.Peer { + + return &models.Peer{ID: createTestPeerID(), UserID: userID, TorrentID: torrentID, + IP: "127.0.0.1", Port: 6889, Uploaded: 1024, Downloaded: 3000, Left: 4200, LastAnnounce: 11} } -func createSeeders() []models.Peer { - testSeeders := make([]models.Peer, 4) - testSeeders[0] = models.Peer{"testPeerID0", 57005, 48879, "testIP", 6889, 1024, 3000, 4200, 6} - testSeeders[1] = models.Peer{"testPeerID1", 10101, 48879, "testIP", 6889, 1024, 3000, 4200, 6} - testSeeders[2] = models.Peer{"testPeerID2", 29890, 48879, "testIP", 6889, 1024, 3000, 4200, 6} - testSeeders[3] = models.Peer{"testPeerID3", 65261, 48879, "testIP", 6889, 1024, 3000, 4200, 6} - return testSeeders +func createTestPeers(torrentID uint64, num int) map[string]models.Peer { + testPeers := make(map[string]models.Peer) + for i := 0; i < num; i++ { + tempPeer := createTestPeer(createTestUserID(), torrentID) + testPeers[models.PeerMapKey(tempPeer)] = *tempPeer + } + return testPeers } -func createLeechers() []models.Peer { - testLeechers := make([]models.Peer, 1) - testLeechers[0] = models.Peer{"testPeerID", 11111, 48879, "testIP", 6889, 1024, 3000, 4200, 6} - return testLeechers +func createTestTorrent() *models.Torrent { + + torrentInfohash := createTestInfohash() + torrentID := createTestTorrentID() + + testSeeders := createTestPeers(torrentID, 4) + testLeechers := createTestPeers(torrentID, 2) + + testTorrent := models.Torrent{ID: torrentID, Infohash: torrentInfohash, Active: true, + Seeders: testSeeders, Leechers: testLeechers, Snatches: 11, UpMultiplier: 1.0, DownMultiplier: 1.0, LastAction: 0} + return &testTorrent } -func createTorrent() models.Torrent { +func TestValidPeers(t *testing.T) { + testTx := createTestRedisTx() + testTorrentID := createTestTorrentID() + testPeers := createTestPeers(testTorrentID, 3) - testSeeders := createSeeders() - testLeechers := createLeechers() - - seeders := make(map[string]models.Peer) - for i := range testSeeders { - seeders[testSeeders[i].ID] = testSeeders[i] + panicOnErr(testTx.addPeers(testPeers, "test:")) + peerMap, err := testTx.getPeers(testTorrentID, "test:") + panicOnErr(err) + if len(peerMap) != len(testPeers) { + t.Error("Num Peers not equal ", len(peerMap), len(testPeers)) } - - leechers := make(map[string]models.Peer) - for i := range testLeechers { - leechers[testLeechers[i].ID] = testLeechers[i] - } - - testTorrent := models.Torrent{48879, sample_infohash, true, seeders, leechers, 11, 0.0, 0.0, 0} - return testTorrent + panicOnErr(testTx.removePeers(testTorrentID, testPeers, "test:")) } -func createTorrentJson(t TestReporter) []byte { - jsonTorrent, err := json.Marshal(createTorrent()) - verifyErrNil(err, t) - return jsonTorrent -} +func TestInvalidPeers(t *testing.T) { + testTx := createTestRedisTx() + testTorrentID := createTestTorrentID() + testPeers := createTestPeers(testTorrentID, 3) + tempPeer := createTestPeer(createTestUserID(), testTorrentID) + testPeers[models.PeerMapKey(tempPeer)] = *tempPeer -func createUserJson(t TestReporter) []byte { - jsonUser, err := json.Marshal(createUser()) - verifyErrNil(err, t) - return jsonUser -} + panicOnErr(testTx.addPeers(testPeers, "test:")) + // Imitate a peer being removed during get + hashKey := testTx.conf.Prefix + getPeerHashKey(tempPeer) + _, err := testTx.Do("DEL", hashKey) + panicOnErr(err) -func ExampleJsonTransaction(testTx *Tx, retries int, t TestReporter) { - defer func() { - if err := recover(); err != nil { - t.Error(err) - } - }() - verifyErrNil(testTx.initiateRead(), t) - _, err := testTx.Do("WATCH", "testKeyA") - verifyErrNil(err, t) - - _, err = redis.String(testTx.Do("GET", "testKeyA")) - if err != nil { - if err == redis.ErrNil { - t.Log("testKeyA does not exist yet") - } else { - t.Error(err) - } + peerMap, err := testTx.getPeers(testTorrentID, "test:") + panicOnErr(err) + // Expect 1 less peer due to delete + if len(peerMap) != len(testPeers)-1 { + t.Error("Num Peers not equal ", len(peerMap), len(testPeers)-1) } - _, err = testTx.Do("WATCH", "testKeyB") - verifyErrNil(err, t) - - _, err = redis.String(testTx.Do("GET", "testKeyB")) - if err != nil { - if err == redis.ErrNil { - t.Log("testKeyB does not exist yet") - } else { - t.Error(err) - } - } - - verifyErrNil(testTx.initiateWrite(), t) - - // Generate random data to set - randGen := rand.New(rand.NewSource(time.Now().UnixNano())) - verifyErrNil(testTx.Send("SET", "testKeyA", strconv.Itoa(randGen.Int())), t) - verifyErrNil(testTx.Send("SET", "testKeyB", strconv.Itoa(randGen.Int())), t) - - err = testTx.Commit() - // For parallel runs, there may be conflicts, retry until successful - if err == cache.ErrTxConflict && retries > 0 { - ExampleJsonTransaction(testTx, retries-1, t) - // Clear TxConflict, if retries max out, errors are already recorded - err = nil - } else if err == cache.ErrTxConflict { - t.Error("Conflict encountered, max retries reached") - t.Error(err) - } - verifyErrNil(err, t) -} - -func ExampleJsonSchemaRemoveSeeder(torrent *models.Torrent, peer *models.Peer, t TestReporter) { - testTx := createTestTxObj(t) - - verifyErrNil(testTx.initiateRead(), t) - - key := testTx.conf.Prefix + "torrent:" + torrent.Infohash - _, err := testTx.Do("WATCH", key) - reply, err := redis.String(testTx.Do("GET", key)) - if err != nil { - if err == redis.ErrNil { - t.Error("testTorrent does not exist") - } else { - t.Error(err) - } - } - - verifyErrNil(json.NewDecoder(strings.NewReader(reply)).Decode(torrent), t) - - delete(torrent.Seeders, "testPeerID2") - - jsonTorrent, err := json.Marshal(torrent) - verifyErrNil(err, t) - verifyErrNil(testTx.initiateWrite(), t) - verifyErrNil(testTx.Send("SET", key, jsonTorrent), t) - verifyErrNil(testTx.Commit(), t) - -} - -func ExampleRedisTypeSchemaRemoveSeeder(torrent *models.Torrent, peer *models.Peer, t TestReporter) { - testTx := createTestTxObj(t) - setkey := testTx.conf.Prefix + "torrent:" + torrent.Infohash + ":seeders" - reply, err := redis.Int(testTx.Do("SREM", setkey, *peer)) - if reply == 0 { - t.Errorf("remove %v failed", *peer) - } - verifyErrNil(err, t) -} - -func ExampleJsonSchemaFindUser(passkey string, t TestReporter) (*models.User, bool) { - testTx := createTestTxObj(t) - - verifyErrNil(testTx.initiateRead(), t) - - key := testTx.conf.Prefix + "user:" + passkey - _, err := testTx.Do("WATCH", key) - verifyErrNil(err, t) - reply, err := redis.String(testTx.Do("GET", key)) - if err != nil { - if err == redis.ErrNil { - return nil, false - } else { - t.Error(err) - } - } - - user := &models.User{} - verifyErrNil(json.NewDecoder(strings.NewReader(reply)).Decode(user), t) - return user, true -} - -func ExampleRedisTypesSchemaFindUser(passkey string, t TestReporter) (*models.User, bool) { - testTx := createTestTxObj(t) - hashkey := testTx.conf.Prefix + "user_hash:" + sample_passkey - userVals, err := redis.Strings(testTx.Do("HVALS", hashkey)) - if userVals == nil { - return nil, false - } - verifyErrNil(err, t) - compareUser := createUserFromValues(userVals, t) - return compareUser, true -} - -func BenchmarkRedisJsonSchemaRemoveSeeder(b *testing.B) { - for bCount := 0; bCount < b.N; bCount++ { - b.StopTimer() - testTx := createTestTxObj(b) - testTorrent := createTorrent() - testSeeders := createSeeders() - key := testTx.conf.Prefix + "torrent:" + testTorrent.Infohash - // Benchmark setup not a transaction, not thread-safe - _, err := testTx.Do("SET", key, createTorrentJson(b)) - verifyErrNil(err, b) - b.StartTimer() - - ExampleJsonSchemaRemoveSeeder(&testTorrent, &testSeeders[2], b) + panicOnErr(testTx.removePeers(testTorrentID, testPeers, "test:")) + if len(testPeers) != 0 { + t.Errorf("All peers not removed, %d peers remain!", len(testPeers)) } } - -func BenchmarkRedisTypesSchemaRemoveSeeder(b *testing.B) { - for bCount := 0; bCount < b.N; bCount++ { - - // Ensure that remove completes successfully, - // even if it doesn't impact the performance - b.StopTimer() - testTx := createTestTxObj(b) - testTorrent := createTorrent() - setkey := testTx.conf.Prefix + "torrent:" + testTorrent.Infohash + ":seeders" - testSeeders := createSeeders() - reply, err := redis.Int(testTx.Do("SADD", setkey, - testSeeders[0], - testSeeders[1], - testSeeders[2], - testSeeders[3])) - - if reply == 0 { - b.Log("no keys added!") - } - verifyErrNil(err, b) - b.StartTimer() - - ExampleRedisTypeSchemaRemoveSeeder(&testTorrent, &testSeeders[2], b) - } -} - -func BenchmarkRedisJsonSchemaFindUser(b *testing.B) { - // Ensure successful user find ( a failed lookup may have different performance ) - b.StopTimer() - testTx := createTestTxObj(b) - testUser := createUser() - userJson := string(createUserJson(b)) - verifyErrNil(testTx.initiateWrite(), b) - key := testTx.conf.Prefix + "user:" + sample_passkey - verifyErrNil(testTx.Send("SET", key, userJson), b) - verifyErrNil(testTx.Commit(), b) - b.StartTimer() - for bCount := 0; bCount < b.N; bCount++ { - compareUser, exists := ExampleJsonSchemaFindUser(sample_passkey, b) - b.StopTimer() - if !exists { - b.Error("User not found!") - } - if testUser != *compareUser { - b.Errorf("user mismatch: %v vs. %v", compareUser, testUser) - } - b.StartTimer() - } -} - -func BenchmarkRedisTypesSchemaFindUser(b *testing.B) { - - // Ensure successful user find ( a failed lookup may have different performance ) - b.StopTimer() - testUser := createUser() - testTx := createTestTxObj(b) - hashkey := testTx.conf.Prefix + "user_hash:" + sample_passkey - reply, err := testTx.Do("HMSET", hashkey, - "id", testUser.ID, - "passkey", testUser.Passkey, - "up_multiplier", testUser.UpMultiplier, - "down_multiplier", testUser.DownMultiplier, - "slots", testUser.Slots, - "slots_used", testUser.SlotsUsed) - - if reply == nil { - b.Error("no hash fields added!") - } - verifyErrNil(err, b) - b.StartTimer() - - for bCount := 0; bCount < b.N; bCount++ { - - compareUser, exists := ExampleRedisTypesSchemaFindUser(sample_passkey, b) - - b.StopTimer() - if !exists { - b.Error("User not found!") - } - if testUser != *compareUser { - b.Errorf("user mismatch: %v vs. %v", compareUser, testUser) - } - b.StartTimer() - } -} - -func TestRedisTransaction(t *testing.T) { - for i := 0; i < 10; i++ { - // No retries for serial transactions - ExampleJsonTransaction(createTestTxObj(t), 0, t) - } -} - -func TestReadAfterWrite(t *testing.T) { - // Test requires panic - defer func() { - if err := recover(); err == nil { - t.Error("Read after write did not panic") - } - }() - - testTx := createTestTxObj(t) - verifyErrNil(testTx.initiateWrite(), t) - verifyErrNil(testTx.initiateRead(), t) -} - -func TestCloseClosedTransaction(t *testing.T) { - //require panic - defer func() { - if err := recover(); err == nil { - t.Error("Closing a closed transaction did not panic") - } - }() - - testTx := createTestTxObj(t) - testTx.close() - testTx.close() -} - -func TestParallelTx0(t *testing.T) { - if testing.Short() { - t.SkipNow() - } - t.Parallel() - - for i := 0; i < 20; i++ { - go ExampleJsonTransaction(createTestTxObj(t), MAX_RETRIES, t) - time.Sleep(1 * time.Millisecond) - } - -} - -func TestParallelTx1(t *testing.T) { - if testing.Short() { - t.SkipNow() - } - t.Parallel() - ExampleJsonTransaction(createTestTxObj(t), MAX_RETRIES, t) - for i := 0; i < 100; i++ { - go ExampleJsonTransaction(createTestTxObj(t), MAX_RETRIES, t) - } -} - -func TestParallelTx2(t *testing.T) { - if testing.Short() { - t.SkipNow() - } - t.Parallel() - for i := 0; i < 100; i++ { - go ExampleJsonTransaction(createTestTxObj(t), MAX_RETRIES, t) - } - ExampleJsonTransaction(createTestTxObj(t), MAX_RETRIES, t) -} - -// Just in case the above parallel tests didn't fail, force a failure here -func TestParallelInterrupted(t *testing.T) { - t.Parallel() - - testTx := createTestTxObj(t) - defer func() { - if err := recover(); err != nil { - t.Errorf("initiateRead() failed in parallel %s", err) - } - }() - verifyErrNil(testTx.initiateRead(), t) - - _, err := testTx.Do("WATCH", "testKeyA") - verifyErrNil(err, t) - - testValueA, err := redis.String(testTx.Do("GET", "testKeyA")) - if err != nil { - if err == redis.ErrNil { - t.Log("redis.ErrNil") - } else { - t.Error(err) - } - } - - _, err = testTx.Do("WATCH", "testKeyB") - if err != nil { - if err == redis.ErrNil { - t.Log("redis.ErrNil") - } else { - t.Error(err) - } - } - - testValueB, err := redis.String(testTx.Do("GET", "testKeyB")) - if err != nil { - if err == redis.ErrNil { - t.Log("redis.ErrNil") - } else { - t.Error(err) - } - } - // Stand in for what real updates would do - testValueB = testValueB + "+updates" - testValueA = testValueA + "+updates" - - // Simulating another client interrupts transaction, causing exec to fail - ExampleJsonTransaction(createTestTxObj(t), MAX_RETRIES, t) - - verifyErrNil(testTx.initiateWrite(), t) - verifyErrNil(testTx.Send("SET", "testKeyA", testValueA), t) - verifyErrNil(testTx.Send("SET", "testKeyB", testValueB), t) - - keys, err := (testTx.Do("EXEC")) - // Expect error - if keys != nil { - t.Error("Keys not nil; exec should have been interrupted") - } - verifyErrNil(err, t) - - testTx.close() -} diff --git a/cache/redis/tx_test.go b/cache/redis/tx_test.go new file mode 100644 index 0000000..3d9772c --- /dev/null +++ b/cache/redis/tx_test.go @@ -0,0 +1,563 @@ +// Copyright 2013 The Chihaya Authors. All rights reserved. +// Use of this source code is governed by the BSD 2-Clause license, +// which can be found in the LICENSE file. + +package redis + +import ( + "math/rand" + "os" + "reflect" + "testing" + "time" + + "github.com/pushrax/chihaya/cache" + "github.com/pushrax/chihaya/config" + "github.com/pushrax/chihaya/models" +) + +func createTestTx() cache.Tx { + testConfig, err := config.Open(os.Getenv("TESTCONFIGPATH")) + panicOnErr(err) + conf := &testConfig.Cache + + testPool, err := cache.Open(conf) + panicOnErr(err) + + txObj, err := testPool.Get() + panicOnErr(err) + + return txObj +} + +func TestFindUserSuccess(t *testing.T) { + tx := createTestTx() + testUser := createTestUser() + + panicOnErr(tx.AddUser(testUser)) + foundUser, found, err := tx.FindUser(testUser.Passkey) + panicOnErr(err) + if !found { + t.Error("user not found", testUser) + } + if *foundUser != *testUser { + t.Error("found user mismatch", *foundUser, testUser) + } + // Cleanup + panicOnErr(tx.RemoveUser(testUser)) +} + +func TestFindUserFail(t *testing.T) { + tx := createTestTx() + testUser := createTestUser() + + foundUser, found, err := tx.FindUser(testUser.Passkey) + panicOnErr(err) + if found { + t.Error("user found", foundUser) + } +} + +func TestRemoveUser(t *testing.T) { + tx := createTestTx() + testUser := createTestUser() + + panicOnErr(tx.AddUser(testUser)) + err := tx.RemoveUser(testUser) + panicOnErr(err) + foundUser, found, err := tx.FindUser(testUser.Passkey) + panicOnErr(err) + if found { + t.Error("removed user found", foundUser) + } +} + +func TestFindTorrentSuccess(t *testing.T) { + tx := createTestTx() + testTorrent := createTestTorrent() + panicOnErr(tx.AddTorrent(testTorrent)) + + foundTorrent, found, err := tx.FindTorrent(testTorrent.Infohash) + panicOnErr(err) + if !found { + t.Error("torrent not found", testTorrent) + } + if !reflect.DeepEqual(foundTorrent, testTorrent) { + t.Error("found torrent mismatch", foundTorrent, testTorrent) + } + // Cleanup + panicOnErr(tx.RemoveTorrent(testTorrent)) +} + +func TestFindTorrentFail(t *testing.T) { + tx := createTestTx() + testTorrent := createTestTorrent() + + foundTorrent, found, err := tx.FindTorrent(testTorrent.Infohash) + panicOnErr(err) + if found { + t.Error("torrent found", foundTorrent) + } +} + +func TestRemoveTorrent(t *testing.T) { + tx := createTestTx() + testTorrent := createTestTorrent() + panicOnErr(tx.AddTorrent(testTorrent)) + + panicOnErr(tx.RemoveTorrent(testTorrent)) + foundTorrent, found, err := tx.FindTorrent(testTorrent.Infohash) + panicOnErr(err) + if found { + t.Error("removed torrent found", foundTorrent) + } + // Cleanup + panicOnErr(tx.RemoveTorrent(testTorrent)) +} + +func TestClientWhitelistSuccess(t *testing.T) { + tx := createTestTx() + testPeerID := "-lt0D30-" + + panicOnErr(tx.WhitelistClient(testPeerID)) + found, err := tx.ClientWhitelisted(testPeerID) + panicOnErr(err) + if !found { + t.Error("peerID not found", testPeerID) + } + // Cleanup + panicOnErr(tx.UnWhitelistClient(testPeerID)) +} + +func TestClientWhitelistFail(t *testing.T) { + tx := createTestTx() + testPeerID2 := "TIX0192" + + found, err := tx.ClientWhitelisted(testPeerID2) + panicOnErr(err) + if found { + t.Error("peerID found", testPeerID2) + } +} + +func TestRecordSnatch(t *testing.T) { + tx := createTestTx() + testTorrent := createTestTorrent() + testUser := createTestUser() + panicOnErr(tx.AddTorrent(testTorrent)) + panicOnErr(tx.AddUser(testUser)) + + userSnatches := testUser.Snatches + torrentSnatches := testTorrent.Snatches + + panicOnErr(tx.RecordSnatch(testUser, testTorrent)) + + foundTorrent, _, err := tx.FindTorrent(testTorrent.Infohash) + panicOnErr(err) + foundUser, _, err := tx.FindUser(testUser.Passkey) + panicOnErr(err) + + if testUser.Snatches != userSnatches+1 { + t.Error("snatch not recorded to local user", testUser.Snatches, userSnatches+1) + } + if testTorrent.Snatches != torrentSnatches+1 { + t.Error("snatch not recorded to local torrent") + } + if foundUser.Snatches != userSnatches+1 { + t.Error("snatch not recorded to cached user", foundUser.Snatches, userSnatches+1) + } + if foundTorrent.Snatches != torrentSnatches+1 { + t.Error("snatch not recorded to cached torrent") + } + // Cleanup + panicOnErr(tx.RemoveTorrent(testTorrent)) + panicOnErr(tx.RemoveUser(testUser)) +} + +func TestMarkActive(t *testing.T) { + tx := createTestTx() + testTorrent := createTestTorrent() + testTorrent.Active = false + panicOnErr(tx.AddTorrent(testTorrent)) + + panicOnErr(tx.MarkActive(testTorrent)) + foundTorrent, _, err := tx.FindTorrent(testTorrent.Infohash) + panicOnErr(err) + + if foundTorrent.Active != true { + t.Error("cached torrent not activated") + } + if testTorrent.Active != true { + t.Error("cached torrent not activated") + } + // Cleanup + panicOnErr(tx.RemoveTorrent(testTorrent)) +} + +func TestClientWhitelistRemove(t *testing.T) { + tx := createTestTx() + testPeerID := "-lt0D30-" + panicOnErr(tx.WhitelistClient(testPeerID)) + panicOnErr(tx.UnWhitelistClient(testPeerID)) + + found, err := tx.ClientWhitelisted(testPeerID) + panicOnErr(err) + if found { + t.Error("removed peerID found", testPeerID) + } +} + +func TestAddSeeder(t *testing.T) { + tx := createTestTx() + testTorrent := createTestTorrent() + panicOnErr(tx.AddTorrent(testTorrent)) + testSeeder := createTestPeer(createTestUserID(), testTorrent.ID) + + panicOnErr(tx.AddSeeder(testTorrent, testSeeder)) + foundTorrent, found, err := tx.FindTorrent(testTorrent.Infohash) + panicOnErr(err) + foundSeeder, found := foundTorrent.Seeders[models.PeerMapKey(testSeeder)] + if found && foundSeeder != *testSeeder { + t.Error("seeder not added to cache", testSeeder) + } + foundSeeder, found = testTorrent.Seeders[models.PeerMapKey(testSeeder)] + if found && foundSeeder != *testSeeder { + t.Error("seeder not added to local", testSeeder) + } + // Cleanup + panicOnErr(tx.RemoveTorrent(testTorrent)) +} + +func TestAddLeecher(t *testing.T) { + tx := createTestTx() + testTorrent := createTestTorrent() + panicOnErr(tx.AddTorrent(testTorrent)) + testLeecher := createTestPeer(createTestUserID(), testTorrent.ID) + + panicOnErr(tx.AddLeecher(testTorrent, testLeecher)) + foundTorrent, found, err := tx.FindTorrent(testTorrent.Infohash) + panicOnErr(err) + foundLeecher, found := foundTorrent.Leechers[models.PeerMapKey(testLeecher)] + if found && foundLeecher != *testLeecher { + t.Error("leecher not added to cache", testLeecher) + } + foundLeecher, found = testTorrent.Leechers[models.PeerMapKey(testLeecher)] + if found && foundLeecher != *testLeecher { + t.Error("leecher not added to local", testLeecher) + } + // Cleanup + panicOnErr(tx.RemoveTorrent(testTorrent)) +} + +func TestRemoveSeeder(t *testing.T) { + tx := createTestTx() + testTorrent := createTestTorrent() + panicOnErr(tx.AddTorrent(testTorrent)) + testSeeder := createTestPeer(createTestUserID(), testTorrent.ID) + panicOnErr(tx.AddSeeder(testTorrent, testSeeder)) + + panicOnErr(tx.RemoveSeeder(testTorrent, testSeeder)) + foundSeeder, found := testTorrent.Seeders[models.PeerMapKey(testSeeder)] + if found || foundSeeder == *testSeeder { + t.Error("seeder not removed from local", foundSeeder) + } + + foundTorrent, found, err := tx.FindTorrent(testTorrent.Infohash) + panicOnErr(err) + foundSeeder, found = foundTorrent.Seeders[models.PeerMapKey(testSeeder)] + if found || foundSeeder == *testSeeder { + t.Error("seeder not removed from cache", foundSeeder, *testSeeder) + } + // Cleanup + panicOnErr(tx.RemoveTorrent(testTorrent)) +} + +func TestRemoveLeecher(t *testing.T) { + tx := createTestTx() + testTorrent := createTestTorrent() + panicOnErr(tx.AddTorrent(testTorrent)) + testLeecher := createTestPeer(createTestUserID(), testTorrent.ID) + panicOnErr(tx.AddLeecher(testTorrent, testLeecher)) + + panicOnErr(tx.RemoveLeecher(testTorrent, testLeecher)) + foundTorrent, found, err := tx.FindTorrent(testTorrent.Infohash) + panicOnErr(err) + foundLeecher, found := foundTorrent.Leechers[models.PeerMapKey(testLeecher)] + if found || foundLeecher == *testLeecher { + t.Error("leecher not removed from cache", foundLeecher, *testLeecher) + } + foundLeecher, found = testTorrent.Leechers[models.PeerMapKey(testLeecher)] + if found || foundLeecher == *testLeecher { + t.Error("leecher not removed from local", foundLeecher, *testLeecher) + } + // Cleanup + panicOnErr(tx.RemoveTorrent(testTorrent)) +} + +func TestSetSeeder(t *testing.T) { + tx := createTestTx() + testTorrent := createTestTorrent() + panicOnErr(tx.AddTorrent(testTorrent)) + testSeeder := createTestPeer(createTestUserID(), testTorrent.ID) + panicOnErr(tx.AddSeeder(testTorrent, testSeeder)) + + r := rand.New(rand.NewSource(time.Now().UnixNano())) + testSeeder.Uploaded += uint64(r.Int63()) + + panicOnErr(tx.SetSeeder(testTorrent, testSeeder)) + + foundTorrent, _, err := tx.FindTorrent(testTorrent.Infohash) + panicOnErr(err) + foundSeeder, _ := foundTorrent.Seeders[models.PeerMapKey(testSeeder)] + if foundSeeder != *testSeeder { + t.Error("seeder not updated in cache", foundSeeder, *testSeeder) + } + foundSeeder, _ = testTorrent.Seeders[models.PeerMapKey(testSeeder)] + if foundSeeder != *testSeeder { + t.Error("seeder not updated in local", foundSeeder, *testSeeder) + } + // Cleanup + panicOnErr(tx.RemoveTorrent(testTorrent)) +} + +func TestSetLeecher(t *testing.T) { + tx := createTestTx() + testTorrent := createTestTorrent() + panicOnErr(tx.AddTorrent(testTorrent)) + testLeecher := createTestPeer(createTestUserID(), testTorrent.ID) + panicOnErr(tx.AddLeecher(testTorrent, testLeecher)) + + r := rand.New(rand.NewSource(time.Now().UnixNano())) + testLeecher.Uploaded += uint64(r.Int63()) + + panicOnErr(tx.SetLeecher(testTorrent, testLeecher)) + foundTorrent, _, err := tx.FindTorrent(testTorrent.Infohash) + panicOnErr(err) + foundLeecher, _ := foundTorrent.Leechers[models.PeerMapKey(testLeecher)] + if foundLeecher != *testLeecher { + t.Error("leecher not updated in cache", testLeecher) + } + foundLeecher, _ = testTorrent.Leechers[models.PeerMapKey(testLeecher)] + if foundLeecher != *testLeecher { + t.Error("leecher not updated in local", testLeecher) + } + // Cleanup + panicOnErr(tx.RemoveTorrent(testTorrent)) +} + +func TestIncrementSlots(t *testing.T) { + tx := createTestTx() + testUser := createTestUser() + panicOnErr(tx.AddUser(testUser)) + numSlots := testUser.Slots + + panicOnErr(tx.IncrementSlots(testUser)) + foundUser, _, err := tx.FindUser(testUser.Passkey) + panicOnErr(err) + + if foundUser.Slots != numSlots+1 { + t.Error("cached slots not incremented") + } + if testUser.Slots != numSlots+1 { + t.Error("local slots not incremented") + } + // Cleanup + panicOnErr(tx.RemoveUser(testUser)) +} + +func TestDecrementSlots(t *testing.T) { + tx := createTestTx() + testUser := createTestUser() + panicOnErr(tx.AddUser(testUser)) + numSlots := testUser.Slots + + panicOnErr(tx.DecrementSlots(testUser)) + foundUser, _, err := tx.FindUser(testUser.Passkey) + panicOnErr(err) + + if foundUser.Slots != numSlots-1 { + t.Error("cached slots not incremented") + } + if testUser.Slots != numSlots-1 { + t.Error("local slots not incremented") + } + // Cleanup + panicOnErr(tx.RemoveUser(testUser)) +} + +func TestLeecherFinished(t *testing.T) { + tx := createTestTx() + testTorrent := createTestTorrent() + panicOnErr(tx.AddTorrent(testTorrent)) + testLeecher := createTestPeer(createTestUserID(), testTorrent.ID) + panicOnErr(tx.AddLeecher(testTorrent, testLeecher)) + testLeecher.Left = 0 + + panicOnErr(tx.LeecherFinished(testTorrent, testLeecher)) + + foundTorrent, _, err := tx.FindTorrent(testTorrent.Infohash) + panicOnErr(err) + foundSeeder, _ := foundTorrent.Seeders[models.PeerMapKey(testLeecher)] + if foundSeeder != *testLeecher { + t.Error("seeder not added to cache", foundSeeder, *testLeecher) + } + foundSeeder, _ = foundTorrent.Leechers[models.PeerMapKey(testLeecher)] + if foundSeeder == *testLeecher { + t.Error("leecher not removed from cache", testLeecher) + } + foundSeeder, _ = testTorrent.Seeders[models.PeerMapKey(testLeecher)] + if foundSeeder != *testLeecher { + t.Error("seeder not added to local", testLeecher) + } + foundSeeder, _ = testTorrent.Leechers[models.PeerMapKey(testLeecher)] + if foundSeeder == *testLeecher { + t.Error("leecher not removed from local", testLeecher) + } + // Cleanup + panicOnErr(tx.RemoveTorrent(testTorrent)) +} + +// Add, update, verify remove +func TestUpdatePeer(t *testing.T) { + tx := createTestTx() + testTorrent := createTestTorrent() + testSeeder := createTestPeer(createTestUserID(), testTorrent.ID) + panicOnErr(tx.AddTorrent(testTorrent)) + panicOnErr(tx.AddSeeder(testTorrent, testSeeder)) + // Update a seeder, set it, then check to make sure it updated + r := rand.New(rand.NewSource(time.Now().UnixNano())) + testSeeder.Uploaded += uint64(r.Int63()) + + panicOnErr(tx.SetSeeder(testTorrent, testSeeder)) + + panicOnErr(tx.RemoveSeeder(testTorrent, testSeeder)) + foundTorrent, _, err := tx.FindTorrent(testTorrent.Infohash) + panicOnErr(err) + if seeder, exists := foundTorrent.Seeders[models.PeerMapKey(testSeeder)]; exists { + t.Error("seeder not removed from cache", seeder) + } + if seeder, exists := testTorrent.Seeders[models.PeerMapKey(testSeeder)]; exists { + t.Error("seeder not removed from local", seeder) + } + // Cleanup + panicOnErr(tx.RemoveTorrent(testTorrent)) +} + +func TestParallelFindUser(t *testing.T) { + t.Parallel() + if testing.Short() { + t.Skip() + } + tx := createTestTx() + testUserSuccess := createTestUser() + testUserFail := createTestUser() + panicOnErr(tx.AddUser(testUserSuccess)) + + for i := 0; i < 10; i++ { + foundUser, found, err := tx.FindUser(testUserFail.Passkey) + panicOnErr(err) + if found { + t.Error("user found", foundUser) + } + foundUser, found, err = tx.FindUser(testUserSuccess.Passkey) + panicOnErr(err) + if !found { + t.Error("user not found", testUserSuccess) + } + if *foundUser != *testUserSuccess { + t.Error("found user mismatch", *foundUser, testUserSuccess) + } + } + // Cleanup + panicOnErr(tx.RemoveUser(testUserSuccess)) +} + +func TestParallelFindTorrent(t *testing.T) { + t.Parallel() + if testing.Short() { + t.Skip() + } + tx := createTestTx() + testTorrentSuccess := createTestTorrent() + testTorrentFail := createTestTorrent() + panicOnErr(tx.AddTorrent(testTorrentSuccess)) + + for i := 0; i < 10; i++ { + foundTorrent, found, err := tx.FindTorrent(testTorrentSuccess.Infohash) + panicOnErr(err) + if !found { + t.Error("torrent not found", testTorrentSuccess) + } + if !reflect.DeepEqual(foundTorrent, testTorrentSuccess) { + t.Error("found torrent mismatch", foundTorrent, testTorrentSuccess) + } + foundTorrent, found, err = tx.FindTorrent(testTorrentFail.Infohash) + panicOnErr(err) + if found { + t.Error("torrent found", foundTorrent) + } + } + // Cleanup + panicOnErr(tx.RemoveTorrent(testTorrentSuccess)) +} + +func TestParallelSetSeeder(t *testing.T) { + t.Parallel() + if testing.Short() { + t.Skip() + } + tx := createTestTx() + testTorrent := createTestTorrent() + panicOnErr(tx.AddTorrent(testTorrent)) + testSeeder := createTestPeer(createTestUserID(), testTorrent.ID) + panicOnErr(tx.AddSeeder(testTorrent, testSeeder)) + r := rand.New(rand.NewSource(time.Now().UnixNano())) + + for i := 0; i < 10; i++ { + testSeeder.Uploaded += uint64(r.Int63()) + + panicOnErr(tx.SetSeeder(testTorrent, testSeeder)) + + foundTorrent, _, err := tx.FindTorrent(testTorrent.Infohash) + panicOnErr(err) + foundSeeder, _ := foundTorrent.Seeders[models.PeerMapKey(testSeeder)] + if foundSeeder != *testSeeder { + t.Error("seeder not updated in cache", foundSeeder, *testSeeder) + } + foundSeeder, _ = testTorrent.Seeders[models.PeerMapKey(testSeeder)] + if foundSeeder != *testSeeder { + t.Error("seeder not updated in local", foundSeeder, *testSeeder) + } + } + // Cleanup + panicOnErr(tx.RemoveTorrent(testTorrent)) +} + +func TestParallelAddLeecher(t *testing.T) { + t.Parallel() + if testing.Short() { + t.Skip() + } + tx := createTestTx() + testTorrent := createTestTorrent() + panicOnErr(tx.AddTorrent(testTorrent)) + + for i := 0; i < 10; i++ { + testLeecher := createTestPeer(createTestUserID(), testTorrent.ID) + + panicOnErr(tx.AddLeecher(testTorrent, testLeecher)) + + foundTorrent, found, err := tx.FindTorrent(testTorrent.Infohash) + panicOnErr(err) + foundLeecher, found := foundTorrent.Leechers[models.PeerMapKey(testLeecher)] + if found && foundLeecher != *testLeecher { + t.Error("leecher not added to cache", testLeecher) + } + foundLeecher, found = testTorrent.Leechers[models.PeerMapKey(testLeecher)] + if found && foundLeecher != *testLeecher { + t.Error("leecher not added to local", testLeecher) + } + } + // Cleanup + panicOnErr(tx.RemoveTorrent(testTorrent)) +} diff --git a/models/models.go b/models/models.go index abfeeb6..5599c94 100644 --- a/models/models.go +++ b/models/models.go @@ -4,6 +4,10 @@ package models +import ( + "strconv" +) + type Peer struct { ID string `json:"id"` UserID uint64 `json:"user_id"` @@ -18,6 +22,10 @@ type Peer struct { LastAnnounce int64 `json:"last_announce"` } +func PeerMapKey(peer *Peer) string { + return peer.ID + ":" + strconv.FormatUint(peer.UserID, 36) +} + type Torrent struct { ID uint64 `json:"id"` Infohash string `json:"infohash"` @@ -26,7 +34,7 @@ type Torrent struct { Seeders map[string]Peer `json:"seeders"` Leechers map[string]Peer `json:"leechers"` - Snatches uint `json:"snatches"` + Snatches uint64 `json:"snatches"` UpMultiplier float64 `json:"up_multiplier"` DownMultiplier float64 `json:"down_multiplier"` LastAction int64 `json:"last_action"` @@ -40,4 +48,5 @@ type User struct { DownMultiplier float64 `json:"down_multiplier"` Slots int64 `json:"slots"` SlotsUsed int64 `json:"slots_used"` + Snatches uint64 `json:"snatches"` } diff --git a/server/announce.go b/server/announce.go index 1d7a142..90f8d89 100644 --- a/server/announce.go +++ b/server/announce.go @@ -82,8 +82,8 @@ func (s Server) serveAnnounce(w http.ResponseWriter, r *http.Request) { } // Look for the user in in the pool of seeders and leechers - _, seeder := torrent.Seeders[peerID] - _, leecher := torrent.Leechers[peerID] + _, seeder := torrent.Seeders[models.PeerMapKey(peer)] + _, leecher := torrent.Leechers[models.PeerMapKey(peer)] switch { // Guarantee that no user is in both pools @@ -170,11 +170,7 @@ func (s Server) serveAnnounce(w http.ResponseWriter, r *http.Request) { log.Panicf("server: %s", err) } if leecher { - err := tx.RemoveLeecher(torrent, peer) - if err != nil { - log.Panicf("server: %s", err) - } - err = tx.AddSeeder(torrent, peer) + err := tx.LeecherFinished(torrent, peer) if err != nil { log.Panicf("server: %s", err) } @@ -182,11 +178,7 @@ func (s Server) serveAnnounce(w http.ResponseWriter, r *http.Request) { case leecher && left == 0: // A leecher completed but the event was never received - err := tx.RemoveLeecher(torrent, peer) - if err != nil { - log.Panicf("server: %s", err) - } - err = tx.AddSeeder(torrent, peer) + err := tx.LeecherFinished(torrent, peer) if err != nil { log.Panicf("server: %s", err) } @@ -197,12 +189,6 @@ func (s Server) serveAnnounce(w http.ResponseWriter, r *http.Request) { peer.IP = ip } - // If the transaction failed, retry - err = tx.Commit() - if err != nil { - continue - } - // Generate the response seedCount := len(torrent.Seeders) leechCount := len(torrent.Leechers)