From d65ab677e7bf0aa5f40e22d79630922e9b1a4f62 Mon Sep 17 00:00:00 2001 From: onestraw Date: Wed, 12 Dec 2018 15:26:11 +0800 Subject: [PATCH] storage/redis: refactor redis storage - Based on @duyanghao's PR - Make staticcheck pass - Address review commentsq --- README.md | 6 +- docs/storage/redis.md | 72 +++++ storage/redis/common/redis.go | 61 ---- storage/redis/peer_store.go | 491 ++++++++++++++++--------------- storage/redis/peer_store_test.go | 18 +- storage/redis/redis.go | 135 +++++++++ 6 files changed, 471 insertions(+), 312 deletions(-) create mode 100644 docs/storage/redis.md delete mode 100644 storage/redis/common/redis.go create mode 100644 storage/redis/redis.go diff --git a/README.md b/README.md index 4302aa2..39faa59 100644 --- a/README.md +++ b/README.md @@ -20,12 +20,14 @@ Differentiating features include: - IPv4 and IPv6 support - [YAML] configuration - Metrics via [Prometheus] +- High Availability via [Redis] [releases]: https://github.com/chihaya/chihaya/releases [BitTorrent tracker]: http://en.wikipedia.org/wiki/BitTorrent_tracker [Go]: https://golang.org [YAML]: http://yaml.org [Prometheus]: http://prometheus.io +[Redis]: https://redis.io ## Why Chihaya? @@ -155,10 +157,6 @@ After all PreHooks have executed, any missing response fields that are required PostHooks are asynchronous tasks that occur after a response has been delivered to the client. Request data is written to the storage asynchronously in one of these PostHooks. -### HA(High Availability) - -Chihaya can achieve high availability by using [redis](https://redis.io/) storage backend. - ## Related projects - [BitTorrent.org](https://github.com/bittorrent/bittorrent.org): a static website containing the BitTorrent spec and all BEPs diff --git a/docs/storage/redis.md b/docs/storage/redis.md new file mode 100644 index 0000000..552939c --- /dev/null +++ b/docs/storage/redis.md @@ -0,0 +1,72 @@ +# Redis Storage + +This storage system separates chihaya from storage and stores all peer data in Redis to achieve HA. + +## Use Case + +When one chihaya instance is down, the Redis can continuily serve peer data through other chihaya instances. + +## Configuration + +```yaml +chihaya: + storage: + name: redis + config: + # The frequency which stale peers are removed. + gc_interval: 14m + + # The frequency which metrics are pushed into a local Prometheus endpoint. + prometheus_reporting_interval: 1s + + # The amount of time until a peer is considered stale. + # To avoid churn, keep this slightly larger than `announce_interval` + peer_lifetime: 16m + + # The address of redis storage. + redis_broker: "redis://pwd@127.0.0.1:6379/0" + + # The timeout for reading a command reply from redis. + redis_read_timeout: 15s + + # The timeout for writing a command to redis. + redis_write_timeout: 15s + + # The timeout for connecting to redis server. + redis_connect_timeout: 15s +``` + +## Implementation + +Seeders and Leechers for a particular InfoHash are stored with a redis hash structure, the infohash is used as hash key, peer key is field, last modified time is value. + +All the InfoHashes (swarms) are also stored into redis hash, IP family is the key, infohash is field, last modified time is value. + +Here is an example + +``` +- IPv4 + - IPv4_S_: + - IPv4_L_: + - IPv4_S_: +- IPv4_S_ + - : + - : +- IPv4_L_ + - : +- IPv4_S_ + - : +``` + + +In this case, prometheus will record two swarms, three seeders and one leecher. + +So tree keys are used to record the count of swarms, seeders and leechers for each group (IPv4, IPv6). + +``` +- IPv4_infohash_count: 2 +- IPv4_S_count: 3 +- IPv4_L_count: 1 +``` + +Note: IPv4_infohash_count has the different meaning with `memory` storage, it represents the number of infohashes reported by seeder. diff --git a/storage/redis/common/redis.go b/storage/redis/common/redis.go deleted file mode 100644 index 16f8e45..0000000 --- a/storage/redis/common/redis.go +++ /dev/null @@ -1,61 +0,0 @@ -package common - -import ( - "time" - - "github.com/garyburd/redigo/redis" -) - -// RedisConnector ... -type RedisConnector struct{} - -// NewPool returns a new pool of Redis connections -func (rc *RedisConnector) NewPool(socketPath, host, password string, db int) *redis.Pool { - return &redis.Pool{ - MaxIdle: 3, - IdleTimeout: 240 * time.Second, - Dial: func() (redis.Conn, error) { - c, err := rc.open(socketPath, host, password, db) - if err != nil { - return nil, err - } - - if db != 0 { - _, err = c.Do("SELECT", db) - if err != nil { - return nil, err - } - } - - return c, err - }, - // PINGs connections that have been idle more than 10 seconds - TestOnBorrow: func(c redis.Conn, t time.Time) error { - if time.Since(t) < time.Duration(10*time.Second) { - return nil - } - _, err := c.Do("PING") - return err - }, - } -} - -// Open a new Redis connection -func (rc *RedisConnector) open(socketPath, host, password string, db int) (redis.Conn, error) { - var opts = []redis.DialOption{ - redis.DialDatabase(db), - redis.DialReadTimeout(15 * time.Second), - redis.DialWriteTimeout(15 * time.Second), - redis.DialConnectTimeout(15 * time.Second), - } - - if password != "" { - opts = append(opts, redis.DialPassword(password)) - } - - if socketPath != "" { - return redis.Dial("unix", socketPath, opts...) - } - - return redis.Dial("tcp", host, opts...) -} diff --git a/storage/redis/peer_store.go b/storage/redis/peer_store.go index 092f696..fde14c8 100644 --- a/storage/redis/peer_store.go +++ b/storage/redis/peer_store.go @@ -1,29 +1,43 @@ // Package redis implements the storage interface for a Chihaya -// BitTorrent tracker keeping peer data in redis. +// BitTorrent tracker keeping peer data in redis with hash. +// There two categories of hash: +// +// - IPv{4,6}_{L,S}_infohash +// To save peers that hold the infohash, used for fast searching, +// deleting, and timeout handling +// +// - IPv{4,6} +// To save all the infohashes, used for garbage collection, +// metrics aggregation and leecher graduation +// +// Tree keys are used to record the count of swarms, seeders +// and leechers for each group (IPv4, IPv6). +// +// - IPv{4,6}_infohash_count +// To record the number of infohashes. +// +// - IPv{4,6}_S_count +// To record the number of seeders. +// +// - IPv{4,6}_L_count +// To record the number of leechers. package redis import ( "encoding/binary" "net" - neturl "net/url" + "strconv" "sync" "time" - "gopkg.in/yaml.v2" + "github.com/gomodule/redigo/redis" + yaml "gopkg.in/yaml.v2" "github.com/chihaya/chihaya/bittorrent" "github.com/chihaya/chihaya/pkg/log" + "github.com/chihaya/chihaya/pkg/stop" "github.com/chihaya/chihaya/pkg/timecache" "github.com/chihaya/chihaya/storage" - - "errors" - "fmt" - "strconv" - "strings" - - "github.com/chihaya/chihaya/storage/redis/common" - "github.com/garyburd/redigo/redis" - "gopkg.in/redsync.v1" ) // Name is the name by which this peer store is registered with Chihaya. @@ -35,6 +49,9 @@ const ( defaultGarbageCollectionInterval = time.Minute * 3 defaultPeerLifetime = time.Minute * 30 defaultRedisBroker = "redis://myRedis@127.0.0.1:6379/0" + defaultRedisReadTimeout = time.Second * 15 + defaultRedisWriteTimeout = time.Second * 15 + defaultRedisConnectTimeout = time.Second * 15 ) func init() { @@ -67,16 +84,22 @@ type Config struct { PrometheusReportingInterval time.Duration `yaml:"prometheus_reporting_interval"` PeerLifetime time.Duration `yaml:"peer_lifetime"` RedisBroker string `yaml:"redis_broker"` + RedisReadTimeout time.Duration `yaml:"redis_read_timeout"` + RedisWriteTimeout time.Duration `yaml:"redis_write_timeout"` + RedisConnectTimeout time.Duration `yaml:"redis_connect_timeout"` } // LogFields renders the current config as a set of Logrus fields. func (cfg Config) LogFields() log.Fields { return log.Fields{ - "name": Name, - "gcInterval": cfg.GarbageCollectionInterval, - "promReportInterval": cfg.PrometheusReportingInterval, - "peerLifetime": cfg.PeerLifetime, - "redisBroker": cfg.RedisBroker, + "name": Name, + "gcInterval": cfg.GarbageCollectionInterval, + "promReportInterval": cfg.PrometheusReportingInterval, + "peerLifetime": cfg.PeerLifetime, + "redisBroker": cfg.RedisBroker, + "redisReadTimeout": cfg.RedisReadTimeout, + "redisWriteTimeout": cfg.RedisWriteTimeout, + "redisConnectTimeout": cfg.RedisConnectTimeout, } } @@ -96,6 +119,33 @@ func (cfg Config) Validate() Config { }) } + if cfg.RedisReadTimeout <= 0 { + validcfg.RedisReadTimeout = defaultRedisReadTimeout + log.Warn("falling back to default configuration", log.Fields{ + "name": Name + ".RedisReadTimeout", + "provided": cfg.RedisReadTimeout, + "default": validcfg.RedisReadTimeout, + }) + } + + if cfg.RedisWriteTimeout <= 0 { + validcfg.RedisWriteTimeout = defaultRedisWriteTimeout + log.Warn("falling back to default configuration", log.Fields{ + "name": Name + ".RedisWriteTimeout", + "provided": cfg.RedisWriteTimeout, + "default": validcfg.RedisWriteTimeout, + }) + } + + if cfg.RedisConnectTimeout <= 0 { + validcfg.RedisConnectTimeout = defaultRedisConnectTimeout + log.Warn("falling back to default configuration", log.Fields{ + "name": Name + ".RedisConnectTimeout", + "provided": cfg.RedisConnectTimeout, + "default": validcfg.RedisConnectTimeout, + }) + } + if cfg.GarbageCollectionInterval <= 0 { validcfg.GarbageCollectionInterval = defaultGarbageCollectionInterval log.Warn("falling back to default configuration", log.Fields{ @@ -126,74 +176,18 @@ func (cfg Config) Validate() Config { return validcfg } -// ParseRedisURL ... -func ParseRedisURL(url string) (host, password string, db int, err error) { - // redis://pwd@host/db - - var u *neturl.URL - u, err = neturl.Parse(url) - if err != nil { - return - } - if u.Scheme != "redis" { - err = errors.New("No redis scheme found") - return - } - - if u.User != nil { - password = u.User.String() - } - - host = u.Host - - parts := strings.Split(u.Path, "/") - if len(parts) == 1 { - db = 0 //default redis db - } else { - db, err = strconv.Atoi(parts[1]) - if err != nil { - db, err = 0, nil //ignore err here - } - } - - return -} - -// NewRedisBackend creates RedisBackend instance -func NewRedisBackend(host, password, socketPath string, db int) *RedisBackend { - return &RedisBackend{ - host: host, - db: db, - password: password, - socketPath: socketPath, - } -} - -// open returns or creates instance of Redis connection -func (rb *RedisBackend) open() redis.Conn { - if rb.pool == nil { - rb.pool = rb.NewPool(rb.socketPath, rb.host, rb.password, rb.db) - } - if rb.redsync == nil { - var pools = []redsync.Pool{rb.pool} - rb.redsync = redsync.New(pools) - } - return rb.pool.Get() -} - // New creates a new PeerStore backed by redis. func New(provided Config) (storage.PeerStore, error) { cfg := provided.Validate() - // creates RedisBackend instance - h, p, db, err := ParseRedisURL(cfg.RedisBroker) + u, err := parseRedisURL(cfg.RedisBroker) if err != nil { return nil, err } ps := &peerStore{ cfg: cfg, - rb: NewRedisBackend(h, p, "", db), + rb: newRedisBackend(&provided, u, ""), closed: make(chan struct{}), } @@ -263,83 +257,76 @@ func decodePeerKey(pk serializedPeer) bittorrent.Peer { return peer } -// RedisBackend represents a Memcache result backend -type RedisBackend struct { - host string - password string - db int - pool *redis.Pool - // If set, path to a socket file overrides hostname - socketPath string - redsync *redsync.Redsync - common.RedisConnector -} - type peerStore struct { cfg Config - rb *RedisBackend + rb *redisBackend closed chan struct{} wg sync.WaitGroup } -// populateProm aggregates metrics over all shards and then posts them to +func (ps *peerStore) groups() []string { + return []string{bittorrent.IPv4.String(), bittorrent.IPv6.String()} +} + +func (ps *peerStore) leecherInfohashKey(af, ih string) string { + return af + "_L_" + ih +} + +func (ps *peerStore) seederInfohashKey(af, ih string) string { + return af + "_S_" + ih +} + +func (ps *peerStore) infohashCountKey(af string) string { + return af + "_infohash_count" +} + +func (ps *peerStore) seederCountKey(af string) string { + return af + "_S_count" +} + +func (ps *peerStore) leecherCountKey(af string) string { + return af + "_L_count" +} + +// populateProm aggregates metrics over all groups and then posts them to // prometheus. func (ps *peerStore) populateProm() { - var numInfohashes, numSeeders, numLeechers uint64 - - shards := [2]string{bittorrent.IPv4.String(), bittorrent.IPv6.String()} + var numInfohashes, numSeeders, numLeechers int64 conn := ps.rb.open() defer conn.Close() - for _, shard := range shards { - infohashes_list, err := conn.Do("HKEYS", shard) // key - if err != nil { - return + for _, group := range ps.groups() { + if n, err := conn.Do("GET", ps.infohashCountKey(group)); err != nil { + log.Error("storage: GET counter failure", log.Fields{ + "key": ps.infohashCountKey(group), + "error": err, + }) + } else { + numInfohashes += n.(int64) } - infohashes := infohashes_list.([]interface{}) - - InfohashLPrefix := shard + "_L_" - InfohashSPrefix := shard + "_S_" - InfohashPrefixLen := len(InfohashLPrefix) - InfohashesMap := make(map[string]bool) - - for _, ih := range infohashes { - ih_str := string(ih.([]byte)) - ih_str_infohash := ih_str[InfohashPrefixLen:] - if strings.HasPrefix(ih_str, InfohashLPrefix) { - numLeechers++ - InfohashesMap[ih_str_infohash] = true - } else if strings.HasPrefix(ih_str, InfohashSPrefix) { - numSeeders++ - InfohashesMap[ih_str_infohash] = true - } else { - log.Error("storage: invalid Redis state", log.Fields{ - "Hkey": shard, - "Hfield": ih_str, - }) - } + if n, err := conn.Do("GET", ps.seederCountKey(group)); err != nil { + log.Error("storage: GET counter failure", log.Fields{ + "key": ps.seederCountKey(group), + "error": err, + }) + } else { + numSeeders += n.(int64) + } + if n, err := conn.Do("GET", ps.leecherCountKey(group)); err != nil { + log.Error("storage: GET counter failure", log.Fields{ + "key": ps.leecherCountKey(group), + "error": err, + }) + } else { + numLeechers += n.(int64) } - - numInfohashes += uint64(len(InfohashesMap)) } storage.PromInfohashesCount.Set(float64(numInfohashes)) storage.PromSeedersCount.Set(float64(numSeeders)) storage.PromLeechersCount.Set(float64(numLeechers)) - - log.Debug("storage: populateProm() aggregates metrics over all shards", log.Fields{ - "numInfohashes": float64(numInfohashes), - "numSeeders": float64(numSeeders), - "numLeechers": float64(numLeechers), - }) -} - -// recordGCDuration records the duration of a GC sweep. -func recordGCDuration(duration time.Duration) { - log.Debug("storage: recordGCDuration", log.Fields{"timeTaken(ms)": float64(duration.Nanoseconds()) / float64(time.Millisecond)}) - storage.PromGCDurationMilliseconds.Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond)) } func (ps *peerStore) getClock() int64 { @@ -347,10 +334,10 @@ func (ps *peerStore) getClock() int64 { } func (ps *peerStore) PutSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error { - IPver := p.IP.AddressFamily.String() + addressFamily := p.IP.AddressFamily.String() log.Debug("storage: PutSeeder", log.Fields{ "InfoHash": ih.String(), - "Peer": fmt.Sprintf("[ID: %s, IP: %s(AddressFamily: %s), Port %d]", p.ID.String(), p.IP.String(), IPver, p.Port), + "Peer": p, }) select { @@ -361,29 +348,43 @@ func (ps *peerStore) PutSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error pk := newPeerKey(p) + encodedSeederInfoHash := ps.seederInfohashKey(addressFamily, ih.String()) + ct := ps.getClock() + conn := ps.rb.open() defer conn.Close() - // Update the peer in the swarm. - encodedSeederInfoHash := IPver + "_S_" + ih.String() - ct := ps.getClock() - _, err := conn.Do("HSET", encodedSeederInfoHash, pk, ct) + conn.Send("MULTI") + conn.Send("HSET", encodedSeederInfoHash, pk, ct) + conn.Send("HSET", addressFamily, encodedSeederInfoHash, ct) + reply, err := redis.Values(conn.Do("EXEC")) if err != nil { return err } - _, err = conn.Do("HSET", IPver, encodedSeederInfoHash, ct) - if err != nil { - return err + + // pk is a new field. + if reply[0].(int64) == 1 { + _, err = conn.Do("INCR", ps.seederCountKey(addressFamily)) + if err != nil { + return err + } + } + // encodedSeederInfoHash is a new field. + if reply[1].(int64) == 1 { + _, err = conn.Do("INCR", ps.infohashCountKey(addressFamily)) + if err != nil { + return err + } } return nil } func (ps *peerStore) DeleteSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error { - IPver := p.IP.AddressFamily.String() + addressFamily := p.IP.AddressFamily.String() log.Debug("storage: DeleteSeeder", log.Fields{ "InfoHash": ih.String(), - "Peer": fmt.Sprintf("[ID: %s, IP: %s(AddressFamily: %s), Port %d]", p.ID.String(), p.IP.String(), IPver, p.Port), + "Peer": p, }) select { @@ -397,24 +398,27 @@ func (ps *peerStore) DeleteSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) err conn := ps.rb.open() defer conn.Close() - encodedSeederInfoHash := IPver + "_S_" + ih.String() + encodedSeederInfoHash := ps.seederInfohashKey(addressFamily, ih.String()) - DelNum, err := conn.Do("HDEL", encodedSeederInfoHash, pk) + delNum, err := conn.Do("HDEL", encodedSeederInfoHash, pk) if err != nil { return err } - if DelNum.(int64) == 0 { + if delNum.(int64) == 0 { return storage.ErrResourceDoesNotExist } + if _, err := conn.Do("DECR", ps.seederCountKey(addressFamily)); err != nil { + return err + } return nil } func (ps *peerStore) PutLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error { - IPver := p.IP.AddressFamily.String() + addressFamily := p.IP.AddressFamily.String() log.Debug("storage: PutLeecher", log.Fields{ "InfoHash": ih.String(), - "Peer": fmt.Sprintf("[ID: %s, IP: %s(AddressFamily: %s), Port %d]", p.ID.String(), p.IP.String(), IPver, p.Port), + "Peer": p, }) select { @@ -423,31 +427,36 @@ func (ps *peerStore) PutLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error default: } + // Update the peer in the swarm. + encodedLeecherInfoHash := ps.leecherInfohashKey(addressFamily, ih.String()) pk := newPeerKey(p) + ct := ps.getClock() conn := ps.rb.open() defer conn.Close() - // Update the peer in the swarm. - encodedLeecherInfoHash := IPver + "_L_" + ih.String() - ct := ps.getClock() - _, err := conn.Do("HSET", encodedLeecherInfoHash, pk, ct) + conn.Send("MULTI") + conn.Send("HSET", encodedLeecherInfoHash, pk, ct) + conn.Send("HSET", addressFamily, encodedLeecherInfoHash, ct) + reply, err := redis.Values(conn.Do("EXEC")) if err != nil { return err } - _, err = conn.Do("HSET", IPver, encodedLeecherInfoHash, ct) - if err != nil { - return err + // pk is a new field. + if reply[0].(int64) == 1 { + _, err = conn.Do("INCR", ps.leecherCountKey(addressFamily)) + if err != nil { + return err + } } - return nil } func (ps *peerStore) DeleteLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error { - IPver := p.IP.AddressFamily.String() + addressFamily := p.IP.AddressFamily.String() log.Debug("storage: DeleteLeecher", log.Fields{ "InfoHash": ih.String(), - "Peer": fmt.Sprintf("[ID: %s, IP: %s(AddressFamily: %s), Port %d]", p.ID.String(), p.IP.String(), IPver, p.Port), + "Peer": p, }) select { @@ -456,29 +465,31 @@ func (ps *peerStore) DeleteLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) er default: } - pk := newPeerKey(p) - conn := ps.rb.open() defer conn.Close() - encodedLeecherInfoHash := IPver + "_L_" + ih.String() + pk := newPeerKey(p) + encodedLeecherInfoHash := ps.leecherInfohashKey(addressFamily, ih.String()) - DelNum, err := conn.Do("HDEL", encodedLeecherInfoHash, pk) + delNum, err := conn.Do("HDEL", encodedLeecherInfoHash, pk) if err != nil { return err } - if DelNum.(int64) == 0 { + if delNum.(int64) == 0 { return storage.ErrResourceDoesNotExist } + if _, err := conn.Do("DECR", ps.leecherCountKey(addressFamily)); err != nil { + return err + } return nil } func (ps *peerStore) GraduateLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error { - IPver := p.IP.AddressFamily.String() + addressFamily := p.IP.AddressFamily.String() log.Debug("storage: GraduateLeecher", log.Fields{ "InfoHash": ih.String(), - "Peer": fmt.Sprintf("[ID: %s, IP: %s(AddressFamily: %s), Port %d]", p.ID.String(), p.IP.String(), IPver, p.Port), + "Peer": p, }) select { @@ -487,41 +498,52 @@ func (ps *peerStore) GraduateLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) default: } + encodedInfoHash := ih.String() + encodedLeecherInfoHash := ps.leecherInfohashKey(addressFamily, encodedInfoHash) + encodedSeederInfoHash := ps.seederInfohashKey(addressFamily, encodedInfoHash) pk := newPeerKey(p) + ct := ps.getClock() conn := ps.rb.open() defer conn.Close() - encodedInfoHash := ih.String() - encodedLeecherInfoHash := IPver + "_L_" + encodedInfoHash - encodedSeederInfoHash := IPver + "_S_" + encodedInfoHash - - _, err := conn.Do("HDEL", encodedLeecherInfoHash, pk) + conn.Send("MULTI") + conn.Send("HDEL", encodedLeecherInfoHash, pk) + conn.Send("HSET", encodedSeederInfoHash, pk, ct) + conn.Send("HSET", addressFamily, encodedSeederInfoHash, ct) + reply, err := redis.Values(conn.Do("EXEC")) if err != nil { return err } - - // Update the peer in the swarm. - ct := ps.getClock() - _, err = conn.Do("HSET", encodedSeederInfoHash, pk, ct) - if err != nil { - return err + if reply[0].(int64) == 1 { + _, err = conn.Do("DECR", ps.leecherCountKey(addressFamily)) + if err != nil { + return err + } } - _, err = conn.Do("HSET", IPver, encodedSeederInfoHash, ct) - if err != nil { - return err + if reply[1].(int64) == 1 { + _, err = conn.Do("INCR", ps.seederCountKey(addressFamily)) + if err != nil { + return err + } + } + if reply[2].(int64) == 1 { + _, err = conn.Do("INCR", ps.infohashCountKey(addressFamily)) + if err != nil { + return err + } } return nil } func (ps *peerStore) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant int, announcer bittorrent.Peer) (peers []bittorrent.Peer, err error) { - IPver := announcer.IP.AddressFamily.String() + addressFamily := announcer.IP.AddressFamily.String() log.Debug("storage: AnnouncePeers", log.Fields{ "InfoHash": ih.String(), "seeder": seeder, "numWant": numWant, - "Peer": fmt.Sprintf("[ID: %s, IP: %s(AddressFamily: %s), Port %d]", announcer.ID.String(), announcer.IP.String(), IPver, announcer.Port), + "Peer": announcer, }) select { @@ -531,8 +553,8 @@ func (ps *peerStore) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant } encodedInfoHash := ih.String() - encodedLeecherInfoHash := IPver + "_L_" + encodedInfoHash // key - encodedSeederInfoHash := IPver + "_S_" + encodedInfoHash // key + encodedLeecherInfoHash := ps.leecherInfohashKey(addressFamily, encodedInfoHash) + encodedSeederInfoHash := ps.seederInfohashKey(addressFamily, encodedInfoHash) conn := ps.rb.open() defer conn.Close() @@ -592,18 +614,10 @@ func (ps *peerStore) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant } } - APResult := "" - for _, pr := range peers { - APResult = fmt.Sprintf("%s Peer:[ID: %s, IP: %s(AddressFamily: %s), Port %d]", APResult, pr.ID.String(), pr.IP.String(), IPver, pr.Port) - } - log.Debug("storage: AnnouncePeers result", log.Fields{ - "peers": APResult, - }) - return } -func (ps *peerStore) ScrapeSwarm(ih bittorrent.InfoHash, addressFamily bittorrent.AddressFamily) (resp bittorrent.Scrape) { +func (ps *peerStore) ScrapeSwarm(ih bittorrent.InfoHash, af bittorrent.AddressFamily) (resp bittorrent.Scrape) { select { case <-ps.closed: panic("attempted to interact with stopped redis store") @@ -611,10 +625,10 @@ func (ps *peerStore) ScrapeSwarm(ih bittorrent.InfoHash, addressFamily bittorren } resp.InfoHash = ih - IPver := addressFamily.String() + addressFamily := af.String() encodedInfoHash := ih.String() - encodedLeecherInfoHash := IPver + "_L_" + encodedInfoHash // key - encodedSeederInfoHash := IPver + "_S_" + encodedInfoHash // key + encodedLeecherInfoHash := ps.leecherInfohashKey(addressFamily, encodedInfoHash) + encodedSeederInfoHash := ps.seederInfohashKey(addressFamily, encodedInfoHash) conn := ps.rb.open() defer conn.Close() @@ -627,7 +641,6 @@ func (ps *peerStore) ScrapeSwarm(ih bittorrent.InfoHash, addressFamily bittorren }) return } - lLen := leechersLen.(int64) seedersLen, err := conn.Do("HLEN", encodedSeederInfoHash) if err != nil { @@ -637,14 +650,9 @@ func (ps *peerStore) ScrapeSwarm(ih bittorrent.InfoHash, addressFamily bittorren }) return } - sLen := seedersLen.(int64) - if lLen == 0 && sLen == 0 { - return - } - - resp.Incomplete = uint32(lLen) - resp.Complete = uint32(sLen) + resp.Incomplete = uint32(leechersLen.(int64)) + resp.Complete = uint32(seedersLen.(int64)) return } @@ -661,16 +669,15 @@ func (ps *peerStore) collectGarbage(cutoff time.Time) error { default: } - shards := [2]string{bittorrent.IPv4.String(), bittorrent.IPv6.String()} - conn := ps.rb.open() defer conn.Close() cutoffUnix := cutoff.UnixNano() start := time.Now() - for _, shard := range shards { - infohashesList, err := conn.Do("HKEYS", shard) // key + for _, group := range ps.groups() { + // list all infohashes in the group + infohashesList, err := conn.Do("HKEYS", group) if err != nil { return err } @@ -678,51 +685,47 @@ func (ps *peerStore) collectGarbage(cutoff time.Time) error { for _, ih := range infohashes { ihStr := string(ih.([]byte)) + isSeeder := len(ihStr) > 5 && ihStr[5:6] == "S" - ihList, err := conn.Do("HGETALL", ihStr) // field + // list all (peer, timeout) pairs for the ih + ihList, err := conn.Do("HGETALL", ihStr) if err != nil { return err } conIhList := ihList.([]interface{}) - if len(conIhList) == 0 { - _, err := conn.Do("DEL", ihStr) - if err != nil { - return err - } - log.Debug("storage: Deleting Redis", log.Fields{"Hkey": ihStr}) - _, err = conn.Do("HDEL", shard, ihStr) - if err != nil { - return err - } - log.Debug("storage: Deleting Redis", log.Fields{ - "Hkey": shard, - "Hfield": ihStr, - }) - continue - } - var pk serializedPeer + var removedPeerCount int64 for index, ihField := range conIhList { - if index%2 != 0 { // value + if index%2 == 1 { // value mtime, err := strconv.ParseInt(string(ihField.([]byte)), 10, 64) if err != nil { return err } if mtime <= cutoffUnix { - _, err := conn.Do("HDEL", ihStr, pk) + ret, err := redis.Int64(conn.Do("HDEL", ihStr, pk)) if err != nil { return err } - p := decodePeerKey(pk) - log.Debug("storage: Deleting peer", log.Fields{ - "Peer": fmt.Sprintf("[ID: %s, IP: %s(AddressFamily: %s), Port %d]", p.ID.String(), p.IP.String(), p.IP.AddressFamily.String(), p.Port), + + removedPeerCount += ret + + log.Debug("storage: deleting peer", log.Fields{ + "Peer": decodePeerKey(pk).String(), }) } } else { // key pk = serializedPeer(ihField.([]byte)) } } + // DECR seeder/leecher counter + decrCounter := ps.leecherCountKey(group) + if isSeeder { + decrCounter = ps.seederCountKey(group) + } + if _, err := conn.Do("DECRBY", decrCounter, removedPeerCount); err != nil { + return err + } ihLen, err := conn.Do("HLEN", ihStr) if err != nil { @@ -733,38 +736,36 @@ func (ps *peerStore) collectGarbage(cutoff time.Time) error { if err != nil { return err } - log.Debug("storage: Deleting Redis", log.Fields{"Hkey": ihStr}) - _, err = conn.Do("HDEL", shard, ihStr) + log.Debug("storage: deleting infohash", log.Fields{ + "Group": group, + "Hkey": ihStr, + }) + _, err = conn.Do("HDEL", group, ihStr) if err != nil { return err } - log.Debug("storage: Deleting Redis", log.Fields{ - "Hkey": shard, - "Hfield": ihStr, - }) } - } - } - recordGCDuration(time.Since(start)) + duration := float64(time.Since(start).Nanoseconds()) / float64(time.Millisecond) + log.Debug("storage: recordGCDuration", log.Fields{"timeTaken(ms)": duration}) + storage.PromGCDurationMilliseconds.Observe(duration) return nil } -func (ps *peerStore) Stop() <-chan error { - c := make(chan error) +func (ps *peerStore) Stop() stop.Result { + c := make(stop.Channel) go func() { close(ps.closed) ps.wg.Wait() - - // TODO(duyanghao): something to be done? - + // chihaya does not clear data in redis when exiting. + // chihaya keys have prefix `IPv{4,6}_`. close(c) }() - return c + return c.Result() } func (ps *peerStore) LogFields() log.Fields { diff --git a/storage/redis/peer_store_test.go b/storage/redis/peer_store_test.go index 099bce6..305927a 100644 --- a/storage/redis/peer_store_test.go +++ b/storage/redis/peer_store_test.go @@ -1,15 +1,29 @@ package redis import ( + "fmt" "testing" - "time" + "github.com/alicebob/miniredis" + s "github.com/chihaya/chihaya/storage" ) func createNew() s.PeerStore { - ps, err := New(Config{GarbageCollectionInterval: 10 * time.Minute, PrometheusReportingInterval: 10 * time.Minute, PeerLifetime: 30 * time.Minute, RedisBroker: "redis://myRedis@127.0.0.1:6379/0"}) + rs, err := miniredis.Run() + if err != nil { + panic(err) + } + redisURL := fmt.Sprintf("redis://@%s/0", rs.Addr()) + ps, err := New(Config{ + GarbageCollectionInterval: 10 * time.Minute, + PrometheusReportingInterval: 10 * time.Minute, + PeerLifetime: 30 * time.Minute, + RedisBroker: redisURL, + RedisReadTimeout: 10 * time.Second, + RedisWriteTimeout: 10 * time.Second, + RedisConnectTimeout: 10 * time.Second}) if err != nil { panic(err) } diff --git a/storage/redis/redis.go b/storage/redis/redis.go new file mode 100644 index 0000000..714955b --- /dev/null +++ b/storage/redis/redis.go @@ -0,0 +1,135 @@ +package redis + +import ( + "errors" + "net/url" + "strconv" + "strings" + "time" + + "github.com/gomodule/redigo/redis" + redsync "gopkg.in/redsync.v1" +) + +// redisBackend represents a redis handler. +type redisBackend struct { + pool *redis.Pool + redsync *redsync.Redsync +} + +// newRedisBackend creates a redisBackend instance. +func newRedisBackend(cfg *Config, u *redisURL, socketPath string) *redisBackend { + rc := &redisConnector{ + URL: u, + SocketPath: socketPath, + ReadTimeout: cfg.RedisReadTimeout, + WriteTimeout: cfg.RedisWriteTimeout, + ConnectTimeout: cfg.RedisConnectTimeout, + } + pool := rc.NewPool() + redsync := redsync.New([]redsync.Pool{pool}) + return &redisBackend{ + pool: pool, + redsync: redsync, + } +} + +// open returns or creates instance of Redis connection. +func (rb *redisBackend) open() redis.Conn { + return rb.pool.Get() +} + +type redisConnector struct { + URL *redisURL + SocketPath string + ReadTimeout time.Duration + WriteTimeout time.Duration + ConnectTimeout time.Duration +} + +// NewPool returns a new pool of Redis connections +func (rc *redisConnector) NewPool() *redis.Pool { + return &redis.Pool{ + MaxIdle: 3, + IdleTimeout: 240 * time.Second, + Dial: func() (redis.Conn, error) { + c, err := rc.open() + if err != nil { + return nil, err + } + + if rc.URL.DB != 0 { + _, err = c.Do("SELECT", rc.URL.DB) + if err != nil { + return nil, err + } + } + + return c, err + }, + // PINGs connections that have been idle more than 10 seconds + TestOnBorrow: func(c redis.Conn, t time.Time) error { + if time.Since(t) < time.Duration(10*time.Second) { + return nil + } + _, err := c.Do("PING") + return err + }, + } +} + +// Open a new Redis connection +func (rc *redisConnector) open() (redis.Conn, error) { + var opts = []redis.DialOption{ + redis.DialDatabase(rc.URL.DB), + redis.DialReadTimeout(rc.ReadTimeout), + redis.DialWriteTimeout(rc.WriteTimeout), + redis.DialConnectTimeout(rc.ConnectTimeout), + } + + if rc.URL.Password != "" { + opts = append(opts, redis.DialPassword(rc.URL.Password)) + } + + if rc.SocketPath != "" { + return redis.Dial("unix", rc.SocketPath, opts...) + } + + return redis.Dial("tcp", rc.URL.Host, opts...) +} + +// A redisURL represents a parsed redisURL +// The general form represented is: +// +// redis://[password@]host][/][db] +type redisURL struct { + Host string + Password string + DB int +} + +// parseRedisURL parse rawurl into redisURL +func parseRedisURL(target string) (*redisURL, error) { + var u *url.URL + u, err := url.Parse(target) + if err != nil { + return nil, err + } + if u.Scheme != "redis" { + return nil, errors.New("no redis scheme found") + } + + db := 0 //default redis db + parts := strings.Split(u.Path, "/") + if len(parts) != 1 { + db, err = strconv.Atoi(parts[1]) + if err != nil { + return nil, err + } + } + return &redisURL{ + Host: u.Host, + Password: u.User.String(), + DB: db, + }, nil +}