storage/redis: refactor redis storage

- Based on @duyanghao's PR
  - Make staticcheck pass
  - Address review comments
onestraw 2018-12-12 15:26:11 +08:00
parent e78892d5ac
commit d65ab677e7
6 changed files with 471 additions and 312 deletions


@@ -20,12 +20,14 @@ Differentiating features include:
- IPv4 and IPv6 support
- [YAML] configuration
- Metrics via [Prometheus]
- High Availability via [Redis]
[releases]: https://github.com/chihaya/chihaya/releases
[BitTorrent tracker]: http://en.wikipedia.org/wiki/BitTorrent_tracker
[Go]: https://golang.org
[YAML]: http://yaml.org
[Prometheus]: http://prometheus.io
[Redis]: https://redis.io
## Why Chihaya?
@@ -155,10 +157,6 @@ After all PreHooks have executed, any missing response fields that are required
PostHooks are asynchronous tasks that occur after a response has been delivered to the client.
Request data is written to the storage asynchronously in one of these PostHooks.
### HA (High Availability)
Chihaya can achieve high availability by using the [redis](https://redis.io/) storage backend.
## Related projects
- [BitTorrent.org](https://github.com/bittorrent/bittorrent.org): a static website containing the BitTorrent spec and all BEPs

docs/storage/redis.md Normal file

@@ -0,0 +1,72 @@
# Redis Storage
This storage backend separates Chihaya from its peer state by storing all peer data in Redis, which makes high availability (HA) possible.
## Use Case
When one Chihaya instance goes down, the peer data in Redis remains available and can still be served through the other Chihaya instances.
## Configuration
```yaml
chihaya:
  storage:
    name: redis
    config:
      # The frequency with which stale peers are removed.
      gc_interval: 14m
      # The frequency with which metrics are pushed to the local Prometheus endpoint.
      prometheus_reporting_interval: 1s
      # The amount of time until a peer is considered stale.
      # To avoid churn, keep this slightly larger than `announce_interval`.
      peer_lifetime: 16m
      # The address of the redis storage.
      redis_broker: "redis://pwd@127.0.0.1:6379/0"
      # The timeout for reading a command reply from redis.
      redis_read_timeout: 15s
      # The timeout for writing a command to redis.
      redis_write_timeout: 15s
      # The timeout for connecting to the redis server.
      redis_connect_timeout: 15s
```
## Implementation
Seeders and leechers for a particular InfoHash are stored in a Redis hash: the infohash is the hash key, the peer key is the field, and the last-modified time is the value.
All the InfoHashes (swarms) are also stored in a Redis hash: the IP family is the key, the infohash is the field, and the last-modified time is the value.
Here is an example:
```
- IPv4
  - IPv4_S_<infohash 1>: <modification time>
  - IPv4_L_<infohash 1>: <modification time>
  - IPv4_S_<infohash 2>: <modification time>
- IPv4_S_<infohash 1>
  - <peer 1 key>: <modification time>
  - <peer 2 key>: <modification time>
- IPv4_L_<infohash 1>
  - <peer 3 key>: <modification time>
- IPv4_S_<infohash 2>
  - <peer 3 key>: <modification time>
```
In this case, Prometheus will record two swarms, three seeders, and one leecher.
So three keys are used to record the counts of swarms, seeders, and leechers for each group (IPv4, IPv6).
```
- IPv4_infohash_count: 2
- IPv4_S_count: 3
- IPv4_L_count: 1
```
Note: `IPv4_infohash_count` has a different meaning than in the `memory` storage; it represents the number of infohashes reported by seeders.
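
For illustration, here is a minimal sketch of the commands a single seeder announce issues against this layout. It is not part of chihaya; it assumes the redigo client and a Redis reachable at `127.0.0.1:6379`, and the infohash and peer key strings are placeholders.

```go
package main

import (
	"fmt"
	"time"

	"github.com/gomodule/redigo/redis"
)

func main() {
	// Example address only; any reachable Redis works.
	conn, err := redis.Dial("tcp", "127.0.0.1:6379")
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	group := "IPv4"
	seederKey := group + "_S_" + "<infohash 1>" // the swarm's seeder hash
	peerKey := "<peer 1 key>"                   // serialized peer
	now := time.Now().UnixNano()

	// Store the peer in the swarm hash and the swarm in the group hash.
	// Error handling is elided in this sketch.
	newPeer, _ := redis.Int64(conn.Do("HSET", seederKey, peerKey, now))
	newSwarm, _ := redis.Int64(conn.Do("HSET", group, seederKey, now))

	// Counters are only touched when HSET created a new field.
	if newPeer == 1 {
		conn.Do("INCR", group+"_S_count")
	}
	if newSwarm == 1 {
		conn.Do("INCR", group+"_infohash_count")
	}

	seeders, _ := redis.Int64(conn.Do("GET", group+"_S_count"))
	fmt.Println("IPv4 seeders:", seeders)
}
```

Leecher announces, leecher graduation, and the garbage collector adjust the same counters in the opposite direction, which is why the Prometheus metrics can be aggregated with one `GET` per counter.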


@@ -1,61 +0,0 @@
package common
import (
"time"
"github.com/garyburd/redigo/redis"
)
// RedisConnector ...
type RedisConnector struct{}
// NewPool returns a new pool of Redis connections
func (rc *RedisConnector) NewPool(socketPath, host, password string, db int) *redis.Pool {
return &redis.Pool{
MaxIdle: 3,
IdleTimeout: 240 * time.Second,
Dial: func() (redis.Conn, error) {
c, err := rc.open(socketPath, host, password, db)
if err != nil {
return nil, err
}
if db != 0 {
_, err = c.Do("SELECT", db)
if err != nil {
return nil, err
}
}
return c, err
},
// PINGs connections that have been idle more than 10 seconds
TestOnBorrow: func(c redis.Conn, t time.Time) error {
if time.Since(t) < time.Duration(10*time.Second) {
return nil
}
_, err := c.Do("PING")
return err
},
}
}
// Open a new Redis connection
func (rc *RedisConnector) open(socketPath, host, password string, db int) (redis.Conn, error) {
var opts = []redis.DialOption{
redis.DialDatabase(db),
redis.DialReadTimeout(15 * time.Second),
redis.DialWriteTimeout(15 * time.Second),
redis.DialConnectTimeout(15 * time.Second),
}
if password != "" {
opts = append(opts, redis.DialPassword(password))
}
if socketPath != "" {
return redis.Dial("unix", socketPath, opts...)
}
return redis.Dial("tcp", host, opts...)
}


@@ -1,29 +1,43 @@
// Package redis implements the storage interface for a Chihaya
// BitTorrent tracker keeping peer data in redis.
// BitTorrent tracker keeping peer data in redis hashes.
// There are two categories of hash:
//
// - IPv{4,6}_{L,S}_infohash
// To save peers that hold the infohash, used for fast searching,
// deleting, and timeout handling
//
// - IPv{4,6}
// To save all the infohashes, used for garbage collection,
// metrics aggregation and leecher graduation
//
// Three keys are used to record the count of swarms, seeders
// and leechers for each group (IPv4, IPv6).
//
// - IPv{4,6}_infohash_count
// To record the number of infohashes.
//
// - IPv{4,6}_S_count
// To record the number of seeders.
//
// - IPv{4,6}_L_count
// To record the number of leechers.
package redis
import (
"encoding/binary"
"net"
neturl "net/url"
"strconv"
"sync"
"time"
"gopkg.in/yaml.v2"
"github.com/gomodule/redigo/redis"
yaml "gopkg.in/yaml.v2"
"github.com/chihaya/chihaya/bittorrent"
"github.com/chihaya/chihaya/pkg/log"
"github.com/chihaya/chihaya/pkg/stop"
"github.com/chihaya/chihaya/pkg/timecache"
"github.com/chihaya/chihaya/storage"
"errors"
"fmt"
"strconv"
"strings"
"github.com/chihaya/chihaya/storage/redis/common"
"github.com/garyburd/redigo/redis"
"gopkg.in/redsync.v1"
)
// Name is the name by which this peer store is registered with Chihaya.
@@ -35,6 +49,9 @@ const (
defaultGarbageCollectionInterval = time.Minute * 3
defaultPeerLifetime = time.Minute * 30
defaultRedisBroker = "redis://myRedis@127.0.0.1:6379/0"
defaultRedisReadTimeout = time.Second * 15
defaultRedisWriteTimeout = time.Second * 15
defaultRedisConnectTimeout = time.Second * 15
)
func init() {
@@ -67,16 +84,22 @@ type Config struct {
PrometheusReportingInterval time.Duration `yaml:"prometheus_reporting_interval"`
PeerLifetime time.Duration `yaml:"peer_lifetime"`
RedisBroker string `yaml:"redis_broker"`
RedisReadTimeout time.Duration `yaml:"redis_read_timeout"`
RedisWriteTimeout time.Duration `yaml:"redis_write_timeout"`
RedisConnectTimeout time.Duration `yaml:"redis_connect_timeout"`
}
// LogFields renders the current config as a set of Logrus fields.
func (cfg Config) LogFields() log.Fields {
return log.Fields{
"name": Name,
"gcInterval": cfg.GarbageCollectionInterval,
"promReportInterval": cfg.PrometheusReportingInterval,
"peerLifetime": cfg.PeerLifetime,
"redisBroker": cfg.RedisBroker,
"name": Name,
"gcInterval": cfg.GarbageCollectionInterval,
"promReportInterval": cfg.PrometheusReportingInterval,
"peerLifetime": cfg.PeerLifetime,
"redisBroker": cfg.RedisBroker,
"redisReadTimeout": cfg.RedisReadTimeout,
"redisWriteTimeout": cfg.RedisWriteTimeout,
"redisConnectTimeout": cfg.RedisConnectTimeout,
}
}
@@ -96,6 +119,33 @@ func (cfg Config) Validate() Config {
})
}
if cfg.RedisReadTimeout <= 0 {
validcfg.RedisReadTimeout = defaultRedisReadTimeout
log.Warn("falling back to default configuration", log.Fields{
"name": Name + ".RedisReadTimeout",
"provided": cfg.RedisReadTimeout,
"default": validcfg.RedisReadTimeout,
})
}
if cfg.RedisWriteTimeout <= 0 {
validcfg.RedisWriteTimeout = defaultRedisWriteTimeout
log.Warn("falling back to default configuration", log.Fields{
"name": Name + ".RedisWriteTimeout",
"provided": cfg.RedisWriteTimeout,
"default": validcfg.RedisWriteTimeout,
})
}
if cfg.RedisConnectTimeout <= 0 {
validcfg.RedisConnectTimeout = defaultRedisConnectTimeout
log.Warn("falling back to default configuration", log.Fields{
"name": Name + ".RedisConnectTimeout",
"provided": cfg.RedisConnectTimeout,
"default": validcfg.RedisConnectTimeout,
})
}
if cfg.GarbageCollectionInterval <= 0 {
validcfg.GarbageCollectionInterval = defaultGarbageCollectionInterval
log.Warn("falling back to default configuration", log.Fields{
@@ -126,74 +176,18 @@ func (cfg Config) Validate() Config {
return validcfg
}
// ParseRedisURL ...
func ParseRedisURL(url string) (host, password string, db int, err error) {
// redis://pwd@host/db
var u *neturl.URL
u, err = neturl.Parse(url)
if err != nil {
return
}
if u.Scheme != "redis" {
err = errors.New("No redis scheme found")
return
}
if u.User != nil {
password = u.User.String()
}
host = u.Host
parts := strings.Split(u.Path, "/")
if len(parts) == 1 {
db = 0 //default redis db
} else {
db, err = strconv.Atoi(parts[1])
if err != nil {
db, err = 0, nil //ignore err here
}
}
return
}
// NewRedisBackend creates RedisBackend instance
func NewRedisBackend(host, password, socketPath string, db int) *RedisBackend {
return &RedisBackend{
host: host,
db: db,
password: password,
socketPath: socketPath,
}
}
// open returns or creates instance of Redis connection
func (rb *RedisBackend) open() redis.Conn {
if rb.pool == nil {
rb.pool = rb.NewPool(rb.socketPath, rb.host, rb.password, rb.db)
}
if rb.redsync == nil {
var pools = []redsync.Pool{rb.pool}
rb.redsync = redsync.New(pools)
}
return rb.pool.Get()
}
// New creates a new PeerStore backed by redis.
func New(provided Config) (storage.PeerStore, error) {
cfg := provided.Validate()
// creates RedisBackend instance
h, p, db, err := ParseRedisURL(cfg.RedisBroker)
u, err := parseRedisURL(cfg.RedisBroker)
if err != nil {
return nil, err
}
ps := &peerStore{
cfg: cfg,
rb: NewRedisBackend(h, p, "", db),
rb: newRedisBackend(&provided, u, ""),
closed: make(chan struct{}),
}
@@ -263,83 +257,76 @@ func decodePeerKey(pk serializedPeer) bittorrent.Peer {
return peer
}
// RedisBackend represents a Memcache result backend
type RedisBackend struct {
host string
password string
db int
pool *redis.Pool
// If set, path to a socket file overrides hostname
socketPath string
redsync *redsync.Redsync
common.RedisConnector
}
type peerStore struct {
cfg Config
rb *RedisBackend
rb *redisBackend
closed chan struct{}
wg sync.WaitGroup
}
// populateProm aggregates metrics over all shards and then posts them to
func (ps *peerStore) groups() []string {
return []string{bittorrent.IPv4.String(), bittorrent.IPv6.String()}
}
func (ps *peerStore) leecherInfohashKey(af, ih string) string {
return af + "_L_" + ih
}
func (ps *peerStore) seederInfohashKey(af, ih string) string {
return af + "_S_" + ih
}
func (ps *peerStore) infohashCountKey(af string) string {
return af + "_infohash_count"
}
func (ps *peerStore) seederCountKey(af string) string {
return af + "_S_count"
}
func (ps *peerStore) leecherCountKey(af string) string {
return af + "_L_count"
}
// populateProm aggregates metrics over all groups and then posts them to
// prometheus.
func (ps *peerStore) populateProm() {
var numInfohashes, numSeeders, numLeechers uint64
shards := [2]string{bittorrent.IPv4.String(), bittorrent.IPv6.String()}
var numInfohashes, numSeeders, numLeechers int64
conn := ps.rb.open()
defer conn.Close()
for _, shard := range shards {
infohashes_list, err := conn.Do("HKEYS", shard) // key
if err != nil {
return
for _, group := range ps.groups() {
if n, err := conn.Do("GET", ps.infohashCountKey(group)); err != nil {
log.Error("storage: GET counter failure", log.Fields{
"key": ps.infohashCountKey(group),
"error": err,
})
} else {
numInfohashes += n.(int64)
}
infohashes := infohashes_list.([]interface{})
InfohashLPrefix := shard + "_L_"
InfohashSPrefix := shard + "_S_"
InfohashPrefixLen := len(InfohashLPrefix)
InfohashesMap := make(map[string]bool)
for _, ih := range infohashes {
ih_str := string(ih.([]byte))
ih_str_infohash := ih_str[InfohashPrefixLen:]
if strings.HasPrefix(ih_str, InfohashLPrefix) {
numLeechers++
InfohashesMap[ih_str_infohash] = true
} else if strings.HasPrefix(ih_str, InfohashSPrefix) {
numSeeders++
InfohashesMap[ih_str_infohash] = true
} else {
log.Error("storage: invalid Redis state", log.Fields{
"Hkey": shard,
"Hfield": ih_str,
})
}
if n, err := conn.Do("GET", ps.seederCountKey(group)); err != nil {
log.Error("storage: GET counter failure", log.Fields{
"key": ps.seederCountKey(group),
"error": err,
})
} else {
numSeeders += n.(int64)
}
if n, err := conn.Do("GET", ps.leecherCountKey(group)); err != nil {
log.Error("storage: GET counter failure", log.Fields{
"key": ps.leecherCountKey(group),
"error": err,
})
} else {
numLeechers += n.(int64)
}
numInfohashes += uint64(len(InfohashesMap))
}
storage.PromInfohashesCount.Set(float64(numInfohashes))
storage.PromSeedersCount.Set(float64(numSeeders))
storage.PromLeechersCount.Set(float64(numLeechers))
log.Debug("storage: populateProm() aggregates metrics over all shards", log.Fields{
"numInfohashes": float64(numInfohashes),
"numSeeders": float64(numSeeders),
"numLeechers": float64(numLeechers),
})
}
// recordGCDuration records the duration of a GC sweep.
func recordGCDuration(duration time.Duration) {
log.Debug("storage: recordGCDuration", log.Fields{"timeTaken(ms)": float64(duration.Nanoseconds()) / float64(time.Millisecond)})
storage.PromGCDurationMilliseconds.Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond))
}
func (ps *peerStore) getClock() int64 {
@@ -347,10 +334,10 @@ func (ps *peerStore) getClock() int64 {
}
func (ps *peerStore) PutSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error {
IPver := p.IP.AddressFamily.String()
addressFamily := p.IP.AddressFamily.String()
log.Debug("storage: PutSeeder", log.Fields{
"InfoHash": ih.String(),
"Peer": fmt.Sprintf("[ID: %s, IP: %s(AddressFamily: %s), Port %d]", p.ID.String(), p.IP.String(), IPver, p.Port),
"Peer": p,
})
select {
@@ -361,29 +348,43 @@ func (ps *peerStore) PutSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error
pk := newPeerKey(p)
encodedSeederInfoHash := ps.seederInfohashKey(addressFamily, ih.String())
ct := ps.getClock()
conn := ps.rb.open()
defer conn.Close()
// Update the peer in the swarm.
encodedSeederInfoHash := IPver + "_S_" + ih.String()
ct := ps.getClock()
_, err := conn.Do("HSET", encodedSeederInfoHash, pk, ct)
conn.Send("MULTI")
conn.Send("HSET", encodedSeederInfoHash, pk, ct)
conn.Send("HSET", addressFamily, encodedSeederInfoHash, ct)
reply, err := redis.Values(conn.Do("EXEC"))
if err != nil {
return err
}
_, err = conn.Do("HSET", IPver, encodedSeederInfoHash, ct)
if err != nil {
return err
// pk is a new field.
if reply[0].(int64) == 1 {
_, err = conn.Do("INCR", ps.seederCountKey(addressFamily))
if err != nil {
return err
}
}
// encodedSeederInfoHash is a new field.
if reply[1].(int64) == 1 {
_, err = conn.Do("INCR", ps.infohashCountKey(addressFamily))
if err != nil {
return err
}
}
return nil
}
func (ps *peerStore) DeleteSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error {
IPver := p.IP.AddressFamily.String()
addressFamily := p.IP.AddressFamily.String()
log.Debug("storage: DeleteSeeder", log.Fields{
"InfoHash": ih.String(),
"Peer": fmt.Sprintf("[ID: %s, IP: %s(AddressFamily: %s), Port %d]", p.ID.String(), p.IP.String(), IPver, p.Port),
"Peer": p,
})
select {
@@ -397,24 +398,27 @@ func (ps *peerStore) DeleteSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) err
conn := ps.rb.open()
defer conn.Close()
encodedSeederInfoHash := IPver + "_S_" + ih.String()
encodedSeederInfoHash := ps.seederInfohashKey(addressFamily, ih.String())
DelNum, err := conn.Do("HDEL", encodedSeederInfoHash, pk)
delNum, err := conn.Do("HDEL", encodedSeederInfoHash, pk)
if err != nil {
return err
}
if DelNum.(int64) == 0 {
if delNum.(int64) == 0 {
return storage.ErrResourceDoesNotExist
}
if _, err := conn.Do("DECR", ps.seederCountKey(addressFamily)); err != nil {
return err
}
return nil
}
func (ps *peerStore) PutLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error {
IPver := p.IP.AddressFamily.String()
addressFamily := p.IP.AddressFamily.String()
log.Debug("storage: PutLeecher", log.Fields{
"InfoHash": ih.String(),
"Peer": fmt.Sprintf("[ID: %s, IP: %s(AddressFamily: %s), Port %d]", p.ID.String(), p.IP.String(), IPver, p.Port),
"Peer": p,
})
select {
@@ -423,31 +427,36 @@ func (ps *peerStore) PutLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error
default:
}
// Update the peer in the swarm.
encodedLeecherInfoHash := ps.leecherInfohashKey(addressFamily, ih.String())
pk := newPeerKey(p)
ct := ps.getClock()
conn := ps.rb.open()
defer conn.Close()
// Update the peer in the swarm.
encodedLeecherInfoHash := IPver + "_L_" + ih.String()
ct := ps.getClock()
_, err := conn.Do("HSET", encodedLeecherInfoHash, pk, ct)
conn.Send("MULTI")
conn.Send("HSET", encodedLeecherInfoHash, pk, ct)
conn.Send("HSET", addressFamily, encodedLeecherInfoHash, ct)
reply, err := redis.Values(conn.Do("EXEC"))
if err != nil {
return err
}
_, err = conn.Do("HSET", IPver, encodedLeecherInfoHash, ct)
if err != nil {
return err
// pk is a new field.
if reply[0].(int64) == 1 {
_, err = conn.Do("INCR", ps.leecherCountKey(addressFamily))
if err != nil {
return err
}
}
return nil
}
func (ps *peerStore) DeleteLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error {
IPver := p.IP.AddressFamily.String()
addressFamily := p.IP.AddressFamily.String()
log.Debug("storage: DeleteLeecher", log.Fields{
"InfoHash": ih.String(),
"Peer": fmt.Sprintf("[ID: %s, IP: %s(AddressFamily: %s), Port %d]", p.ID.String(), p.IP.String(), IPver, p.Port),
"Peer": p,
})
select {
@@ -456,29 +465,31 @@ func (ps *peerStore) DeleteLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) er
default:
}
pk := newPeerKey(p)
conn := ps.rb.open()
defer conn.Close()
encodedLeecherInfoHash := IPver + "_L_" + ih.String()
pk := newPeerKey(p)
encodedLeecherInfoHash := ps.leecherInfohashKey(addressFamily, ih.String())
DelNum, err := conn.Do("HDEL", encodedLeecherInfoHash, pk)
delNum, err := conn.Do("HDEL", encodedLeecherInfoHash, pk)
if err != nil {
return err
}
if DelNum.(int64) == 0 {
if delNum.(int64) == 0 {
return storage.ErrResourceDoesNotExist
}
if _, err := conn.Do("DECR", ps.leecherCountKey(addressFamily)); err != nil {
return err
}
return nil
}
func (ps *peerStore) GraduateLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error {
IPver := p.IP.AddressFamily.String()
addressFamily := p.IP.AddressFamily.String()
log.Debug("storage: GraduateLeecher", log.Fields{
"InfoHash": ih.String(),
"Peer": fmt.Sprintf("[ID: %s, IP: %s(AddressFamily: %s), Port %d]", p.ID.String(), p.IP.String(), IPver, p.Port),
"Peer": p,
})
select {
@@ -487,41 +498,52 @@ func (ps *peerStore) GraduateLeecher(ih bittorrent.InfoHash, p bittorrent.Peer)
default:
}
encodedInfoHash := ih.String()
encodedLeecherInfoHash := ps.leecherInfohashKey(addressFamily, encodedInfoHash)
encodedSeederInfoHash := ps.seederInfohashKey(addressFamily, encodedInfoHash)
pk := newPeerKey(p)
ct := ps.getClock()
conn := ps.rb.open()
defer conn.Close()
encodedInfoHash := ih.String()
encodedLeecherInfoHash := IPver + "_L_" + encodedInfoHash
encodedSeederInfoHash := IPver + "_S_" + encodedInfoHash
_, err := conn.Do("HDEL", encodedLeecherInfoHash, pk)
conn.Send("MULTI")
conn.Send("HDEL", encodedLeecherInfoHash, pk)
conn.Send("HSET", encodedSeederInfoHash, pk, ct)
conn.Send("HSET", addressFamily, encodedSeederInfoHash, ct)
reply, err := redis.Values(conn.Do("EXEC"))
if err != nil {
return err
}
// Update the peer in the swarm.
ct := ps.getClock()
_, err = conn.Do("HSET", encodedSeederInfoHash, pk, ct)
if err != nil {
return err
if reply[0].(int64) == 1 {
_, err = conn.Do("DECR", ps.leecherCountKey(addressFamily))
if err != nil {
return err
}
}
_, err = conn.Do("HSET", IPver, encodedSeederInfoHash, ct)
if err != nil {
return err
if reply[1].(int64) == 1 {
_, err = conn.Do("INCR", ps.seederCountKey(addressFamily))
if err != nil {
return err
}
}
if reply[2].(int64) == 1 {
_, err = conn.Do("INCR", ps.infohashCountKey(addressFamily))
if err != nil {
return err
}
}
return nil
}
func (ps *peerStore) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant int, announcer bittorrent.Peer) (peers []bittorrent.Peer, err error) {
IPver := announcer.IP.AddressFamily.String()
addressFamily := announcer.IP.AddressFamily.String()
log.Debug("storage: AnnouncePeers", log.Fields{
"InfoHash": ih.String(),
"seeder": seeder,
"numWant": numWant,
"Peer": fmt.Sprintf("[ID: %s, IP: %s(AddressFamily: %s), Port %d]", announcer.ID.String(), announcer.IP.String(), IPver, announcer.Port),
"Peer": announcer,
})
select {
@@ -531,8 +553,8 @@ func (ps *peerStore) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant
}
encodedInfoHash := ih.String()
encodedLeecherInfoHash := IPver + "_L_" + encodedInfoHash // key
encodedSeederInfoHash := IPver + "_S_" + encodedInfoHash // key
encodedLeecherInfoHash := ps.leecherInfohashKey(addressFamily, encodedInfoHash)
encodedSeederInfoHash := ps.seederInfohashKey(addressFamily, encodedInfoHash)
conn := ps.rb.open()
defer conn.Close()
@@ -592,18 +614,10 @@ func (ps *peerStore) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant
}
}
APResult := ""
for _, pr := range peers {
APResult = fmt.Sprintf("%s Peer:[ID: %s, IP: %s(AddressFamily: %s), Port %d]", APResult, pr.ID.String(), pr.IP.String(), IPver, pr.Port)
}
log.Debug("storage: AnnouncePeers result", log.Fields{
"peers": APResult,
})
return
}
func (ps *peerStore) ScrapeSwarm(ih bittorrent.InfoHash, addressFamily bittorrent.AddressFamily) (resp bittorrent.Scrape) {
func (ps *peerStore) ScrapeSwarm(ih bittorrent.InfoHash, af bittorrent.AddressFamily) (resp bittorrent.Scrape) {
select {
case <-ps.closed:
panic("attempted to interact with stopped redis store")
@@ -611,10 +625,10 @@ func (ps *peerStore) ScrapeSwarm(ih bittorrent.InfoHash, addressFamily bittorren
}
resp.InfoHash = ih
IPver := addressFamily.String()
addressFamily := af.String()
encodedInfoHash := ih.String()
encodedLeecherInfoHash := IPver + "_L_" + encodedInfoHash // key
encodedSeederInfoHash := IPver + "_S_" + encodedInfoHash // key
encodedLeecherInfoHash := ps.leecherInfohashKey(addressFamily, encodedInfoHash)
encodedSeederInfoHash := ps.seederInfohashKey(addressFamily, encodedInfoHash)
conn := ps.rb.open()
defer conn.Close()
@@ -627,7 +641,6 @@ func (ps *peerStore) ScrapeSwarm(ih bittorrent.InfoHash, addressFamily bittorren
})
return
}
lLen := leechersLen.(int64)
seedersLen, err := conn.Do("HLEN", encodedSeederInfoHash)
if err != nil {
@@ -637,14 +650,9 @@ func (ps *peerStore) ScrapeSwarm(ih bittorrent.InfoHash, addressFamily bittorren
})
return
}
sLen := seedersLen.(int64)
if lLen == 0 && sLen == 0 {
return
}
resp.Incomplete = uint32(lLen)
resp.Complete = uint32(sLen)
resp.Incomplete = uint32(leechersLen.(int64))
resp.Complete = uint32(seedersLen.(int64))
return
}
@@ -661,16 +669,15 @@ func (ps *peerStore) collectGarbage(cutoff time.Time) error {
default:
}
shards := [2]string{bittorrent.IPv4.String(), bittorrent.IPv6.String()}
conn := ps.rb.open()
defer conn.Close()
cutoffUnix := cutoff.UnixNano()
start := time.Now()
for _, shard := range shards {
infohashesList, err := conn.Do("HKEYS", shard) // key
for _, group := range ps.groups() {
// list all infohashes in the group
infohashesList, err := conn.Do("HKEYS", group)
if err != nil {
return err
}
@@ -678,51 +685,47 @@ func (ps *peerStore) collectGarbage(cutoff time.Time) error {
for _, ih := range infohashes {
ihStr := string(ih.([]byte))
isSeeder := len(ihStr) > 5 && ihStr[5:6] == "S"
ihList, err := conn.Do("HGETALL", ihStr) // field
// list all (peer, timeout) pairs for the ih
ihList, err := conn.Do("HGETALL", ihStr)
if err != nil {
return err
}
conIhList := ihList.([]interface{})
if len(conIhList) == 0 {
_, err := conn.Do("DEL", ihStr)
if err != nil {
return err
}
log.Debug("storage: Deleting Redis", log.Fields{"Hkey": ihStr})
_, err = conn.Do("HDEL", shard, ihStr)
if err != nil {
return err
}
log.Debug("storage: Deleting Redis", log.Fields{
"Hkey": shard,
"Hfield": ihStr,
})
continue
}
var pk serializedPeer
var removedPeerCount int64
for index, ihField := range conIhList {
if index%2 != 0 { // value
if index%2 == 1 { // value
mtime, err := strconv.ParseInt(string(ihField.([]byte)), 10, 64)
if err != nil {
return err
}
if mtime <= cutoffUnix {
_, err := conn.Do("HDEL", ihStr, pk)
ret, err := redis.Int64(conn.Do("HDEL", ihStr, pk))
if err != nil {
return err
}
p := decodePeerKey(pk)
log.Debug("storage: Deleting peer", log.Fields{
"Peer": fmt.Sprintf("[ID: %s, IP: %s(AddressFamily: %s), Port %d]", p.ID.String(), p.IP.String(), p.IP.AddressFamily.String(), p.Port),
removedPeerCount += ret
log.Debug("storage: deleting peer", log.Fields{
"Peer": decodePeerKey(pk).String(),
})
}
} else { // key
pk = serializedPeer(ihField.([]byte))
}
}
// DECR seeder/leecher counter
decrCounter := ps.leecherCountKey(group)
if isSeeder {
decrCounter = ps.seederCountKey(group)
}
if _, err := conn.Do("DECRBY", decrCounter, removedPeerCount); err != nil {
return err
}
ihLen, err := conn.Do("HLEN", ihStr)
if err != nil {
@@ -733,38 +736,36 @@ func (ps *peerStore) collectGarbage(cutoff time.Time) error {
if err != nil {
return err
}
log.Debug("storage: Deleting Redis", log.Fields{"Hkey": ihStr})
_, err = conn.Do("HDEL", shard, ihStr)
log.Debug("storage: deleting infohash", log.Fields{
"Group": group,
"Hkey": ihStr,
})
_, err = conn.Do("HDEL", group, ihStr)
if err != nil {
return err
}
log.Debug("storage: Deleting Redis", log.Fields{
"Hkey": shard,
"Hfield": ihStr,
})
}
}
}
recordGCDuration(time.Since(start))
duration := float64(time.Since(start).Nanoseconds()) / float64(time.Millisecond)
log.Debug("storage: recordGCDuration", log.Fields{"timeTaken(ms)": duration})
storage.PromGCDurationMilliseconds.Observe(duration)
return nil
}
func (ps *peerStore) Stop() <-chan error {
c := make(chan error)
func (ps *peerStore) Stop() stop.Result {
c := make(stop.Channel)
go func() {
close(ps.closed)
ps.wg.Wait()
// TODO(duyanghao): something to be done?
// chihaya does not clear data in redis when exiting.
// chihaya keys have prefix `IPv{4,6}_`.
close(c)
}()
return c
return c.Result()
}
func (ps *peerStore) LogFields() log.Fields {


@@ -1,15 +1,29 @@
package redis
import (
"fmt"
"testing"
"time"
"github.com/alicebob/miniredis"
s "github.com/chihaya/chihaya/storage"
)
func createNew() s.PeerStore {
ps, err := New(Config{GarbageCollectionInterval: 10 * time.Minute, PrometheusReportingInterval: 10 * time.Minute, PeerLifetime: 30 * time.Minute, RedisBroker: "redis://myRedis@127.0.0.1:6379/0"})
rs, err := miniredis.Run()
if err != nil {
panic(err)
}
redisURL := fmt.Sprintf("redis://@%s/0", rs.Addr())
ps, err := New(Config{
GarbageCollectionInterval: 10 * time.Minute,
PrometheusReportingInterval: 10 * time.Minute,
PeerLifetime: 30 * time.Minute,
RedisBroker: redisURL,
RedisReadTimeout: 10 * time.Second,
RedisWriteTimeout: 10 * time.Second,
RedisConnectTimeout: 10 * time.Second})
if err != nil {
panic(err)
}

storage/redis/redis.go Normal file

@@ -0,0 +1,135 @@
package redis
import (
"errors"
"net/url"
"strconv"
"strings"
"time"
"github.com/gomodule/redigo/redis"
redsync "gopkg.in/redsync.v1"
)
// redisBackend represents a redis handler.
type redisBackend struct {
pool *redis.Pool
redsync *redsync.Redsync
}
// newRedisBackend creates a redisBackend instance.
func newRedisBackend(cfg *Config, u *redisURL, socketPath string) *redisBackend {
rc := &redisConnector{
URL: u,
SocketPath: socketPath,
ReadTimeout: cfg.RedisReadTimeout,
WriteTimeout: cfg.RedisWriteTimeout,
ConnectTimeout: cfg.RedisConnectTimeout,
}
pool := rc.NewPool()
redsync := redsync.New([]redsync.Pool{pool})
return &redisBackend{
pool: pool,
redsync: redsync,
}
}
// open returns or creates an instance of a Redis connection.
func (rb *redisBackend) open() redis.Conn {
return rb.pool.Get()
}
type redisConnector struct {
URL *redisURL
SocketPath string
ReadTimeout time.Duration
WriteTimeout time.Duration
ConnectTimeout time.Duration
}
// NewPool returns a new pool of Redis connections
func (rc *redisConnector) NewPool() *redis.Pool {
return &redis.Pool{
MaxIdle: 3,
IdleTimeout: 240 * time.Second,
Dial: func() (redis.Conn, error) {
c, err := rc.open()
if err != nil {
return nil, err
}
if rc.URL.DB != 0 {
_, err = c.Do("SELECT", rc.URL.DB)
if err != nil {
return nil, err
}
}
return c, err
},
// PINGs connections that have been idle more than 10 seconds
TestOnBorrow: func(c redis.Conn, t time.Time) error {
if time.Since(t) < time.Duration(10*time.Second) {
return nil
}
_, err := c.Do("PING")
return err
},
}
}
// Open a new Redis connection
func (rc *redisConnector) open() (redis.Conn, error) {
var opts = []redis.DialOption{
redis.DialDatabase(rc.URL.DB),
redis.DialReadTimeout(rc.ReadTimeout),
redis.DialWriteTimeout(rc.WriteTimeout),
redis.DialConnectTimeout(rc.ConnectTimeout),
}
if rc.URL.Password != "" {
opts = append(opts, redis.DialPassword(rc.URL.Password))
}
if rc.SocketPath != "" {
return redis.Dial("unix", rc.SocketPath, opts...)
}
return redis.Dial("tcp", rc.URL.Host, opts...)
}
// A redisURL represents a parsed Redis URL.
// The general form represented is:
//
// redis://[password@]host[:port][/db]
type redisURL struct {
Host string
Password string
DB int
}
// parseRedisURL parses the given URL into a redisURL.
func parseRedisURL(target string) (*redisURL, error) {
var u *url.URL
u, err := url.Parse(target)
if err != nil {
return nil, err
}
if u.Scheme != "redis" {
return nil, errors.New("no redis scheme found")
}
db := 0 //default redis db
parts := strings.Split(u.Path, "/")
if len(parts) != 1 {
db, err = strconv.Atoi(parts[1])
if err != nil {
return nil, err
}
}
return &redisURL{
Host: u.Host,
Password: u.User.String(),
DB: db,
}, nil
}
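
As an illustration of the URL form above, here is a minimal package-internal test sketch; the test name and file are hypothetical and not part of this commit.

```go
package redis

import "testing"

// Hypothetical test, e.g. in storage/redis/redis_test.go.
func TestParseRedisURLSketch(t *testing.T) {
	u, err := parseRedisURL("redis://myRedis@127.0.0.1:6379/0")
	if err != nil {
		t.Fatal(err)
	}
	if u.Host != "127.0.0.1:6379" || u.Password != "myRedis" || u.DB != 0 {
		t.Fatalf("unexpected parse result: %+v", u)
	}
}
```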
}