diff --git a/storage/redis/peer_store.go b/storage/redis/peer_store.go index e07f43a..60564a2 100644 --- a/storage/redis/peer_store.go +++ b/storage/redis/peer_store.go @@ -202,7 +202,9 @@ func New(provided Config) (storage.PeerStore, error) { case <-time.After(cfg.GarbageCollectionInterval): before := time.Now().Add(-cfg.PeerLifetime) log.Debug("storage: purging peers with no announces since", log.Fields{"before": before}) - ps.collectGarbage(before) + if err = ps.collectGarbage(before); err != nil { + log.Error("storage: collectGarbage error", log.Fields{"before": before, "error": err}) + } } } }() @@ -298,29 +300,29 @@ func (ps *peerStore) populateProm() { defer conn.Close() for _, group := range ps.groups() { - if n, err := conn.Do("GET", ps.infohashCountKey(group)); err != nil { + if n, err := redis.Int64(conn.Do("GET", ps.infohashCountKey(group))); err != nil && err != redis.ErrNil { log.Error("storage: GET counter failure", log.Fields{ "key": ps.infohashCountKey(group), "error": err, }) } else { - numInfohashes += n.(int64) + numInfohashes += n } - if n, err := conn.Do("GET", ps.seederCountKey(group)); err != nil { + if n, err := redis.Int64(conn.Do("GET", ps.seederCountKey(group))); err != nil && err != redis.ErrNil { log.Error("storage: GET counter failure", log.Fields{ "key": ps.seederCountKey(group), "error": err, }) } else { - numSeeders += n.(int64) + numSeeders += n } - if n, err := conn.Do("GET", ps.leecherCountKey(group)); err != nil { + if n, err := redis.Int64(conn.Do("GET", ps.leecherCountKey(group))); err != nil && err != redis.ErrNil { log.Error("storage: GET counter failure", log.Fields{ "key": ps.leecherCountKey(group), "error": err, }) } else { - numLeechers += n.(int64) + numLeechers += n } } @@ -357,20 +359,20 @@ func (ps *peerStore) PutSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error conn.Send("MULTI") conn.Send("HSET", encodedSeederInfoHash, pk, ct) conn.Send("HSET", addressFamily, encodedSeederInfoHash, ct) - reply, err := redis.Values(conn.Do("EXEC")) + reply, err := redis.Int64s(conn.Do("EXEC")) if err != nil { return err } // pk is a new field. - if reply[0].(int64) == 1 { + if reply[0] == 1 { _, err = conn.Do("INCR", ps.seederCountKey(addressFamily)) if err != nil { return err } } // encodedSeederInfoHash is a new field. - if reply[1].(int64) == 1 { + if reply[1] == 1 { _, err = conn.Do("INCR", ps.infohashCountKey(addressFamily)) if err != nil { return err @@ -400,11 +402,11 @@ func (ps *peerStore) DeleteSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) err encodedSeederInfoHash := ps.seederInfohashKey(addressFamily, ih.String()) - delNum, err := conn.Do("HDEL", encodedSeederInfoHash, pk) + delNum, err := redis.Int64(conn.Do("HDEL", encodedSeederInfoHash, pk)) if err != nil { return err } - if delNum.(int64) == 0 { + if delNum == 0 { return storage.ErrResourceDoesNotExist } if _, err := conn.Do("DECR", ps.seederCountKey(addressFamily)); err != nil { @@ -438,12 +440,12 @@ func (ps *peerStore) PutLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error conn.Send("MULTI") conn.Send("HSET", encodedLeecherInfoHash, pk, ct) conn.Send("HSET", addressFamily, encodedLeecherInfoHash, ct) - reply, err := redis.Values(conn.Do("EXEC")) + reply, err := redis.Int64s(conn.Do("EXEC")) if err != nil { return err } // pk is a new field. - if reply[0].(int64) == 1 { + if reply[0] == 1 { _, err = conn.Do("INCR", ps.leecherCountKey(addressFamily)) if err != nil { return err @@ -471,11 +473,11 @@ func (ps *peerStore) DeleteLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) er pk := newPeerKey(p) encodedLeecherInfoHash := ps.leecherInfohashKey(addressFamily, ih.String()) - delNum, err := conn.Do("HDEL", encodedLeecherInfoHash, pk) + delNum, err := redis.Int64(conn.Do("HDEL", encodedLeecherInfoHash, pk)) if err != nil { return err } - if delNum.(int64) == 0 { + if delNum == 0 { return storage.ErrResourceDoesNotExist } if _, err := conn.Do("DECR", ps.leecherCountKey(addressFamily)); err != nil { @@ -511,23 +513,23 @@ func (ps *peerStore) GraduateLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) conn.Send("HDEL", encodedLeecherInfoHash, pk) conn.Send("HSET", encodedSeederInfoHash, pk, ct) conn.Send("HSET", addressFamily, encodedSeederInfoHash, ct) - reply, err := redis.Values(conn.Do("EXEC")) + reply, err := redis.Int64s(conn.Do("EXEC")) if err != nil { return err } - if reply[0].(int64) == 1 { + if reply[0] == 1 { _, err = conn.Do("DECR", ps.leecherCountKey(addressFamily)) if err != nil { return err } } - if reply[1].(int64) == 1 { + if reply[1] == 1 { _, err = conn.Do("INCR", ps.seederCountKey(addressFamily)) if err != nil { return err } } - if reply[2].(int64) == 1 { + if reply[2] == 1 { _, err = conn.Do("INCR", ps.infohashCountKey(addressFamily)) if err != nil { return err @@ -633,7 +635,7 @@ func (ps *peerStore) ScrapeSwarm(ih bittorrent.InfoHash, af bittorrent.AddressFa conn := ps.rb.open() defer conn.Close() - leechersLen, err := conn.Do("HLEN", encodedLeecherInfoHash) + leechersLen, err := redis.Int64(conn.Do("HLEN", encodedLeecherInfoHash)) if err != nil { log.Error("storage: Redis HLEN failure", log.Fields{ "Hkey": encodedLeecherInfoHash, @@ -642,7 +644,7 @@ func (ps *peerStore) ScrapeSwarm(ih bittorrent.InfoHash, af bittorrent.AddressFa return } - seedersLen, err := conn.Do("HLEN", encodedSeederInfoHash) + seedersLen, err := redis.Int64(conn.Do("HLEN", encodedSeederInfoHash)) if err != nil { log.Error("storage: Redis HLEN failure", log.Fields{ "Hkey": encodedSeederInfoHash, @@ -651,8 +653,8 @@ func (ps *peerStore) ScrapeSwarm(ih bittorrent.InfoHash, af bittorrent.AddressFa return } - resp.Incomplete = uint32(leechersLen.(int64)) - resp.Complete = uint32(seedersLen.(int64)) + resp.Incomplete = uint32(leechersLen) + resp.Complete = uint32(seedersLen) return } @@ -717,45 +719,41 @@ func (ps *peerStore) collectGarbage(cutoff time.Time) error { for _, group := range ps.groups() { // list all infohashes in the group - infohashesList, err := conn.Do("HKEYS", group) + infohashesList, err := redis.Strings(conn.Do("HKEYS", group)) if err != nil { return err } - infohashes := infohashesList.([]interface{}) - for _, ih := range infohashes { - ihStr := string(ih.([]byte)) + for _, ihStr := range infohashesList { isSeeder := len(ihStr) > 5 && ihStr[5:6] == "S" // list all (peer, timeout) pairs for the ih - ihList, err := conn.Do("HGETALL", ihStr) + ihList, err := redis.Strings(conn.Do("HGETALL", ihStr)) if err != nil { return err } - conIhList := ihList.([]interface{}) var pk serializedPeer var removedPeerCount int64 - for index, ihField := range conIhList { + for index, ihField := range ihList { if index%2 == 1 { // value - mtime, err := strconv.ParseInt(string(ihField.([]byte)), 10, 64) + mtime, err := strconv.ParseInt(ihField, 10, 64) if err != nil { return err } if mtime <= cutoffUnix { + log.Debug("storage: deleting peer", log.Fields{ + "Peer": decodePeerKey(pk).String(), + }) ret, err := redis.Int64(conn.Do("HDEL", ihStr, pk)) if err != nil { return err } removedPeerCount += ret - - log.Debug("storage: deleting peer", log.Fields{ - "Peer": decodePeerKey(pk).String(), - }) } } else { // key - pk = serializedPeer(ihField.([]byte)) + pk = serializedPeer([]byte(ihField)) } } // DECR seeder/leecher counter @@ -763,8 +761,10 @@ func (ps *peerStore) collectGarbage(cutoff time.Time) error { if isSeeder { decrCounter = ps.seederCountKey(group) } - if _, err := conn.Do("DECRBY", decrCounter, removedPeerCount); err != nil { - return err + if removedPeerCount > 0 { + if _, err := conn.Do("DECRBY", decrCounter, removedPeerCount); err != nil { + return err + } } // use WATCH to avoid race condition @@ -773,11 +773,11 @@ func (ps *peerStore) collectGarbage(cutoff time.Time) error { if err != nil { return err } - ihLen, err := conn.Do("HLEN", ihStr) + ihLen, err := redis.Int64(conn.Do("HLEN", ihStr)) if err != nil { return err } - if ihLen.(int64) == 0 { + if ihLen == 0 { // Empty hashes are not shown among existing keys, // in other words, it's removed automatically after `HDEL` the last field. //_, err := conn.Do("DEL", ihStr) @@ -788,15 +788,17 @@ func (ps *peerStore) collectGarbage(cutoff time.Time) error { conn.Send("DECR", ps.infohashCountKey(group)) } _, err = redis.Values(conn.Do("EXEC")) - if err != nil { - log.Error("storage: Redis EXEC failure, maybe caused by WATCH, ignored", log.Fields{ + if err != nil && err != redis.ErrNil { + log.Error("storage: Redis EXEC failure", log.Fields{ "group": group, "infohash": ihStr, "error": err, }) } } else { - conn.Do("UNWATCH", ihStr) + if _, err = conn.Do("UNWATCH"); err != nil && err != redis.ErrNil { + log.Error("storage: Redis UNWATCH failure", log.Fields{"error": err}) + } } } }