From e78892d5acaa739209a2921d5a5215636e4fcc6f Mon Sep 17 00:00:00 2001 From: duyanghao <1294057873@qq.com> Date: Fri, 9 Feb 2018 16:09:17 +0800 Subject: [PATCH 1/9] Add Support for Redis Storage Backend(To Achieve Chihaya High Availability) Change-Id: I5cf703095d1060ac17e403b86056d3eccad97f2c Signed-off-by: duyanghao <1294057873@qq.com> --- README.md | 4 + cmd/chihaya/config.go | 1 + storage/redis/common/redis.go | 61 +++ storage/redis/peer_store.go | 772 +++++++++++++++++++++++++++++++ storage/redis/peer_store_test.go | 47 ++ 5 files changed, 885 insertions(+) create mode 100644 storage/redis/common/redis.go create mode 100644 storage/redis/peer_store.go create mode 100644 storage/redis/peer_store_test.go diff --git a/README.md b/README.md index fe05b74..4302aa2 100644 --- a/README.md +++ b/README.md @@ -155,6 +155,10 @@ After all PreHooks have executed, any missing response fields that are required PostHooks are asynchronous tasks that occur after a response has been delivered to the client. Request data is written to the storage asynchronously in one of these PostHooks. +### HA(High Availability) + +Chihaya can achieve high availability by using [redis](https://redis.io/) storage backend. + ## Related projects - [BitTorrent.org](https://github.com/bittorrent/bittorrent.org): a static website containing the BitTorrent spec and all BEPs diff --git a/cmd/chihaya/config.go b/cmd/chihaya/config.go index 2fc883e..1a217e0 100644 --- a/cmd/chihaya/config.go +++ b/cmd/chihaya/config.go @@ -24,6 +24,7 @@ import ( // Imports to register storage drivers. _ "github.com/chihaya/chihaya/storage/memory" _ "github.com/chihaya/chihaya/storage/memorybysubnet" + _ "github.com/chihaya/chihaya/storage/redis" ) type storageConfig struct { diff --git a/storage/redis/common/redis.go b/storage/redis/common/redis.go new file mode 100644 index 0000000..16f8e45 --- /dev/null +++ b/storage/redis/common/redis.go @@ -0,0 +1,61 @@ +package common + +import ( + "time" + + "github.com/garyburd/redigo/redis" +) + +// RedisConnector ... +type RedisConnector struct{} + +// NewPool returns a new pool of Redis connections +func (rc *RedisConnector) NewPool(socketPath, host, password string, db int) *redis.Pool { + return &redis.Pool{ + MaxIdle: 3, + IdleTimeout: 240 * time.Second, + Dial: func() (redis.Conn, error) { + c, err := rc.open(socketPath, host, password, db) + if err != nil { + return nil, err + } + + if db != 0 { + _, err = c.Do("SELECT", db) + if err != nil { + return nil, err + } + } + + return c, err + }, + // PINGs connections that have been idle more than 10 seconds + TestOnBorrow: func(c redis.Conn, t time.Time) error { + if time.Since(t) < time.Duration(10*time.Second) { + return nil + } + _, err := c.Do("PING") + return err + }, + } +} + +// Open a new Redis connection +func (rc *RedisConnector) open(socketPath, host, password string, db int) (redis.Conn, error) { + var opts = []redis.DialOption{ + redis.DialDatabase(db), + redis.DialReadTimeout(15 * time.Second), + redis.DialWriteTimeout(15 * time.Second), + redis.DialConnectTimeout(15 * time.Second), + } + + if password != "" { + opts = append(opts, redis.DialPassword(password)) + } + + if socketPath != "" { + return redis.Dial("unix", socketPath, opts...) + } + + return redis.Dial("tcp", host, opts...) 
+} diff --git a/storage/redis/peer_store.go b/storage/redis/peer_store.go new file mode 100644 index 0000000..092f696 --- /dev/null +++ b/storage/redis/peer_store.go @@ -0,0 +1,772 @@ +// Package redis implements the storage interface for a Chihaya +// BitTorrent tracker keeping peer data in redis. +package redis + +import ( + "encoding/binary" + "net" + neturl "net/url" + "sync" + "time" + + "gopkg.in/yaml.v2" + + "github.com/chihaya/chihaya/bittorrent" + "github.com/chihaya/chihaya/pkg/log" + "github.com/chihaya/chihaya/pkg/timecache" + "github.com/chihaya/chihaya/storage" + + "errors" + "fmt" + "strconv" + "strings" + + "github.com/chihaya/chihaya/storage/redis/common" + "github.com/garyburd/redigo/redis" + "gopkg.in/redsync.v1" +) + +// Name is the name by which this peer store is registered with Chihaya. +const Name = "redis" + +// Default config constants. +const ( + defaultPrometheusReportingInterval = time.Second * 1 + defaultGarbageCollectionInterval = time.Minute * 3 + defaultPeerLifetime = time.Minute * 30 + defaultRedisBroker = "redis://myRedis@127.0.0.1:6379/0" +) + +func init() { + // Register the storage driver. + storage.RegisterDriver(Name, driver{}) +} + +type driver struct{} + +func (d driver) NewPeerStore(icfg interface{}) (storage.PeerStore, error) { + // Marshal the config back into bytes. + bytes, err := yaml.Marshal(icfg) + if err != nil { + return nil, err + } + + // Unmarshal the bytes into the proper config type. + var cfg Config + err = yaml.Unmarshal(bytes, &cfg) + if err != nil { + return nil, err + } + + return New(cfg) +} + +// Config holds the configuration of a redis PeerStore. +type Config struct { + GarbageCollectionInterval time.Duration `yaml:"gc_interval"` + PrometheusReportingInterval time.Duration `yaml:"prometheus_reporting_interval"` + PeerLifetime time.Duration `yaml:"peer_lifetime"` + RedisBroker string `yaml:"redis_broker"` +} + +// LogFields renders the current config as a set of Logrus fields. +func (cfg Config) LogFields() log.Fields { + return log.Fields{ + "name": Name, + "gcInterval": cfg.GarbageCollectionInterval, + "promReportInterval": cfg.PrometheusReportingInterval, + "peerLifetime": cfg.PeerLifetime, + "redisBroker": cfg.RedisBroker, + } +} + +// Validate sanity checks values set in a config and returns a new config with +// default values replacing anything that is invalid. +// +// This function warns to the logger when a value is changed. 
+func (cfg Config) Validate() Config { + validcfg := cfg + + if cfg.RedisBroker == "" { + validcfg.RedisBroker = defaultRedisBroker + log.Warn("falling back to default configuration", log.Fields{ + "name": Name + ".RedisBroker", + "provided": cfg.RedisBroker, + "default": validcfg.RedisBroker, + }) + } + + if cfg.GarbageCollectionInterval <= 0 { + validcfg.GarbageCollectionInterval = defaultGarbageCollectionInterval + log.Warn("falling back to default configuration", log.Fields{ + "name": Name + ".GarbageCollectionInterval", + "provided": cfg.GarbageCollectionInterval, + "default": validcfg.GarbageCollectionInterval, + }) + } + + if cfg.PrometheusReportingInterval <= 0 { + validcfg.PrometheusReportingInterval = defaultPrometheusReportingInterval + log.Warn("falling back to default configuration", log.Fields{ + "name": Name + ".PrometheusReportingInterval", + "provided": cfg.PrometheusReportingInterval, + "default": validcfg.PrometheusReportingInterval, + }) + } + + if cfg.PeerLifetime <= 0 { + validcfg.PeerLifetime = defaultPeerLifetime + log.Warn("falling back to default configuration", log.Fields{ + "name": Name + ".PeerLifetime", + "provided": cfg.PeerLifetime, + "default": validcfg.PeerLifetime, + }) + } + + return validcfg +} + +// ParseRedisURL ... +func ParseRedisURL(url string) (host, password string, db int, err error) { + // redis://pwd@host/db + + var u *neturl.URL + u, err = neturl.Parse(url) + if err != nil { + return + } + if u.Scheme != "redis" { + err = errors.New("No redis scheme found") + return + } + + if u.User != nil { + password = u.User.String() + } + + host = u.Host + + parts := strings.Split(u.Path, "/") + if len(parts) == 1 { + db = 0 //default redis db + } else { + db, err = strconv.Atoi(parts[1]) + if err != nil { + db, err = 0, nil //ignore err here + } + } + + return +} + +// NewRedisBackend creates RedisBackend instance +func NewRedisBackend(host, password, socketPath string, db int) *RedisBackend { + return &RedisBackend{ + host: host, + db: db, + password: password, + socketPath: socketPath, + } +} + +// open returns or creates instance of Redis connection +func (rb *RedisBackend) open() redis.Conn { + if rb.pool == nil { + rb.pool = rb.NewPool(rb.socketPath, rb.host, rb.password, rb.db) + } + if rb.redsync == nil { + var pools = []redsync.Pool{rb.pool} + rb.redsync = redsync.New(pools) + } + return rb.pool.Get() +} + +// New creates a new PeerStore backed by redis. +func New(provided Config) (storage.PeerStore, error) { + cfg := provided.Validate() + + // creates RedisBackend instance + h, p, db, err := ParseRedisURL(cfg.RedisBroker) + if err != nil { + return nil, err + } + + ps := &peerStore{ + cfg: cfg, + rb: NewRedisBackend(h, p, "", db), + closed: make(chan struct{}), + } + + // Start a goroutine for garbage collection. + ps.wg.Add(1) + go func() { + defer ps.wg.Done() + for { + select { + case <-ps.closed: + return + case <-time.After(cfg.GarbageCollectionInterval): + before := time.Now().Add(-cfg.PeerLifetime) + log.Debug("storage: purging peers with no announces since", log.Fields{"before": before}) + ps.collectGarbage(before) + } + } + }() + + // Start a goroutine for reporting statistics to Prometheus. 
+ ps.wg.Add(1) + go func() { + defer ps.wg.Done() + t := time.NewTicker(cfg.PrometheusReportingInterval) + for { + select { + case <-ps.closed: + t.Stop() + return + case <-t.C: + before := time.Now() + ps.populateProm() + log.Debug("storage: populateProm() finished", log.Fields{"timeTaken": time.Since(before)}) + } + } + }() + + return ps, nil +} + +type serializedPeer string + +func newPeerKey(p bittorrent.Peer) serializedPeer { + b := make([]byte, 20+2+len(p.IP.IP)) + copy(b[:20], p.ID[:]) + binary.BigEndian.PutUint16(b[20:22], p.Port) + copy(b[22:], p.IP.IP) + + return serializedPeer(b) +} + +func decodePeerKey(pk serializedPeer) bittorrent.Peer { + peer := bittorrent.Peer{ + ID: bittorrent.PeerIDFromString(string(pk[:20])), + Port: binary.BigEndian.Uint16([]byte(pk[20:22])), + IP: bittorrent.IP{IP: net.IP(pk[22:])}} + + if ip := peer.IP.To4(); ip != nil { + peer.IP.IP = ip + peer.IP.AddressFamily = bittorrent.IPv4 + } else if len(peer.IP.IP) == net.IPv6len { // implies toReturn.IP.To4() == nil + peer.IP.AddressFamily = bittorrent.IPv6 + } else { + panic("IP is neither v4 nor v6") + } + + return peer +} + +// RedisBackend represents a Memcache result backend +type RedisBackend struct { + host string + password string + db int + pool *redis.Pool + // If set, path to a socket file overrides hostname + socketPath string + redsync *redsync.Redsync + common.RedisConnector +} + +type peerStore struct { + cfg Config + rb *RedisBackend + + closed chan struct{} + wg sync.WaitGroup +} + +// populateProm aggregates metrics over all shards and then posts them to +// prometheus. +func (ps *peerStore) populateProm() { + var numInfohashes, numSeeders, numLeechers uint64 + + shards := [2]string{bittorrent.IPv4.String(), bittorrent.IPv6.String()} + + conn := ps.rb.open() + defer conn.Close() + + for _, shard := range shards { + infohashes_list, err := conn.Do("HKEYS", shard) // key + if err != nil { + return + } + infohashes := infohashes_list.([]interface{}) + + InfohashLPrefix := shard + "_L_" + InfohashSPrefix := shard + "_S_" + InfohashPrefixLen := len(InfohashLPrefix) + InfohashesMap := make(map[string]bool) + + for _, ih := range infohashes { + ih_str := string(ih.([]byte)) + ih_str_infohash := ih_str[InfohashPrefixLen:] + if strings.HasPrefix(ih_str, InfohashLPrefix) { + numLeechers++ + InfohashesMap[ih_str_infohash] = true + } else if strings.HasPrefix(ih_str, InfohashSPrefix) { + numSeeders++ + InfohashesMap[ih_str_infohash] = true + } else { + log.Error("storage: invalid Redis state", log.Fields{ + "Hkey": shard, + "Hfield": ih_str, + }) + } + } + + numInfohashes += uint64(len(InfohashesMap)) + } + + storage.PromInfohashesCount.Set(float64(numInfohashes)) + storage.PromSeedersCount.Set(float64(numSeeders)) + storage.PromLeechersCount.Set(float64(numLeechers)) + + log.Debug("storage: populateProm() aggregates metrics over all shards", log.Fields{ + "numInfohashes": float64(numInfohashes), + "numSeeders": float64(numSeeders), + "numLeechers": float64(numLeechers), + }) +} + +// recordGCDuration records the duration of a GC sweep. 
+func recordGCDuration(duration time.Duration) { + log.Debug("storage: recordGCDuration", log.Fields{"timeTaken(ms)": float64(duration.Nanoseconds()) / float64(time.Millisecond)}) + storage.PromGCDurationMilliseconds.Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond)) +} + +func (ps *peerStore) getClock() int64 { + return timecache.NowUnixNano() +} + +func (ps *peerStore) PutSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error { + IPver := p.IP.AddressFamily.String() + log.Debug("storage: PutSeeder", log.Fields{ + "InfoHash": ih.String(), + "Peer": fmt.Sprintf("[ID: %s, IP: %s(AddressFamily: %s), Port %d]", p.ID.String(), p.IP.String(), IPver, p.Port), + }) + + select { + case <-ps.closed: + panic("attempted to interact with stopped redis store") + default: + } + + pk := newPeerKey(p) + + conn := ps.rb.open() + defer conn.Close() + + // Update the peer in the swarm. + encodedSeederInfoHash := IPver + "_S_" + ih.String() + ct := ps.getClock() + _, err := conn.Do("HSET", encodedSeederInfoHash, pk, ct) + if err != nil { + return err + } + _, err = conn.Do("HSET", IPver, encodedSeederInfoHash, ct) + if err != nil { + return err + } + + return nil +} + +func (ps *peerStore) DeleteSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error { + IPver := p.IP.AddressFamily.String() + log.Debug("storage: DeleteSeeder", log.Fields{ + "InfoHash": ih.String(), + "Peer": fmt.Sprintf("[ID: %s, IP: %s(AddressFamily: %s), Port %d]", p.ID.String(), p.IP.String(), IPver, p.Port), + }) + + select { + case <-ps.closed: + panic("attempted to interact with stopped redis store") + default: + } + + pk := newPeerKey(p) + + conn := ps.rb.open() + defer conn.Close() + + encodedSeederInfoHash := IPver + "_S_" + ih.String() + + DelNum, err := conn.Do("HDEL", encodedSeederInfoHash, pk) + if err != nil { + return err + } + if DelNum.(int64) == 0 { + return storage.ErrResourceDoesNotExist + } + + return nil +} + +func (ps *peerStore) PutLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error { + IPver := p.IP.AddressFamily.String() + log.Debug("storage: PutLeecher", log.Fields{ + "InfoHash": ih.String(), + "Peer": fmt.Sprintf("[ID: %s, IP: %s(AddressFamily: %s), Port %d]", p.ID.String(), p.IP.String(), IPver, p.Port), + }) + + select { + case <-ps.closed: + panic("attempted to interact with stopped redis store") + default: + } + + pk := newPeerKey(p) + + conn := ps.rb.open() + defer conn.Close() + + // Update the peer in the swarm. 
+ encodedLeecherInfoHash := IPver + "_L_" + ih.String() + ct := ps.getClock() + _, err := conn.Do("HSET", encodedLeecherInfoHash, pk, ct) + if err != nil { + return err + } + _, err = conn.Do("HSET", IPver, encodedLeecherInfoHash, ct) + if err != nil { + return err + } + + return nil +} + +func (ps *peerStore) DeleteLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error { + IPver := p.IP.AddressFamily.String() + log.Debug("storage: DeleteLeecher", log.Fields{ + "InfoHash": ih.String(), + "Peer": fmt.Sprintf("[ID: %s, IP: %s(AddressFamily: %s), Port %d]", p.ID.String(), p.IP.String(), IPver, p.Port), + }) + + select { + case <-ps.closed: + panic("attempted to interact with stopped redis store") + default: + } + + pk := newPeerKey(p) + + conn := ps.rb.open() + defer conn.Close() + + encodedLeecherInfoHash := IPver + "_L_" + ih.String() + + DelNum, err := conn.Do("HDEL", encodedLeecherInfoHash, pk) + if err != nil { + return err + } + if DelNum.(int64) == 0 { + return storage.ErrResourceDoesNotExist + } + + return nil +} + +func (ps *peerStore) GraduateLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error { + IPver := p.IP.AddressFamily.String() + log.Debug("storage: GraduateLeecher", log.Fields{ + "InfoHash": ih.String(), + "Peer": fmt.Sprintf("[ID: %s, IP: %s(AddressFamily: %s), Port %d]", p.ID.String(), p.IP.String(), IPver, p.Port), + }) + + select { + case <-ps.closed: + panic("attempted to interact with stopped redis store") + default: + } + + pk := newPeerKey(p) + + conn := ps.rb.open() + defer conn.Close() + + encodedInfoHash := ih.String() + encodedLeecherInfoHash := IPver + "_L_" + encodedInfoHash + encodedSeederInfoHash := IPver + "_S_" + encodedInfoHash + + _, err := conn.Do("HDEL", encodedLeecherInfoHash, pk) + if err != nil { + return err + } + + // Update the peer in the swarm. + ct := ps.getClock() + _, err = conn.Do("HSET", encodedSeederInfoHash, pk, ct) + if err != nil { + return err + } + _, err = conn.Do("HSET", IPver, encodedSeederInfoHash, ct) + if err != nil { + return err + } + + return nil +} + +func (ps *peerStore) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant int, announcer bittorrent.Peer) (peers []bittorrent.Peer, err error) { + IPver := announcer.IP.AddressFamily.String() + log.Debug("storage: AnnouncePeers", log.Fields{ + "InfoHash": ih.String(), + "seeder": seeder, + "numWant": numWant, + "Peer": fmt.Sprintf("[ID: %s, IP: %s(AddressFamily: %s), Port %d]", announcer.ID.String(), announcer.IP.String(), IPver, announcer.Port), + }) + + select { + case <-ps.closed: + panic("attempted to interact with stopped redis store") + default: + } + + encodedInfoHash := ih.String() + encodedLeecherInfoHash := IPver + "_L_" + encodedInfoHash // key + encodedSeederInfoHash := IPver + "_S_" + encodedInfoHash // key + + conn := ps.rb.open() + defer conn.Close() + + leechers, err := conn.Do("HKEYS", encodedLeecherInfoHash) + if err != nil { + return nil, err + } + conLeechers := leechers.([]interface{}) + + seeders, err := conn.Do("HKEYS", encodedSeederInfoHash) + if err != nil { + return nil, err + } + conSeeders := seeders.([]interface{}) + + if len(conLeechers) == 0 && len(conSeeders) == 0 { + return nil, storage.ErrResourceDoesNotExist + } + + if seeder { + // Append leechers as possible. + for _, pk := range conLeechers { + if numWant == 0 { + break + } + + peers = append(peers, decodePeerKey(serializedPeer(pk.([]byte)))) + numWant-- + } + } else { + // Append as many seeders as possible. 
+ for _, pk := range conSeeders { + if numWant == 0 { + break + } + + peers = append(peers, decodePeerKey(serializedPeer(pk.([]byte)))) + numWant-- + } + + // Append leechers until we reach numWant. + if numWant > 0 { + announcerPK := newPeerKey(announcer) + for _, pk := range conLeechers { + if pk == announcerPK { + continue + } + + if numWant == 0 { + break + } + + peers = append(peers, decodePeerKey(serializedPeer(pk.([]byte)))) + numWant-- + } + } + } + + APResult := "" + for _, pr := range peers { + APResult = fmt.Sprintf("%s Peer:[ID: %s, IP: %s(AddressFamily: %s), Port %d]", APResult, pr.ID.String(), pr.IP.String(), IPver, pr.Port) + } + log.Debug("storage: AnnouncePeers result", log.Fields{ + "peers": APResult, + }) + + return +} + +func (ps *peerStore) ScrapeSwarm(ih bittorrent.InfoHash, addressFamily bittorrent.AddressFamily) (resp bittorrent.Scrape) { + select { + case <-ps.closed: + panic("attempted to interact with stopped redis store") + default: + } + + resp.InfoHash = ih + IPver := addressFamily.String() + encodedInfoHash := ih.String() + encodedLeecherInfoHash := IPver + "_L_" + encodedInfoHash // key + encodedSeederInfoHash := IPver + "_S_" + encodedInfoHash // key + + conn := ps.rb.open() + defer conn.Close() + + leechersLen, err := conn.Do("HLEN", encodedLeecherInfoHash) + if err != nil { + log.Error("storage: Redis HLEN failure", log.Fields{ + "Hkey": encodedLeecherInfoHash, + "error": err, + }) + return + } + lLen := leechersLen.(int64) + + seedersLen, err := conn.Do("HLEN", encodedSeederInfoHash) + if err != nil { + log.Error("storage: Redis HLEN failure", log.Fields{ + "Hkey": encodedSeederInfoHash, + "error": err, + }) + return + } + sLen := seedersLen.(int64) + + if lLen == 0 && sLen == 0 { + return + } + + resp.Incomplete = uint32(lLen) + resp.Complete = uint32(sLen) + + return +} + +// collectGarbage deletes all Peers from the PeerStore which are older than the +// cutoff time. +// +// This function must be able to execute while other methods on this interface +// are being executed in parallel. 
+func (ps *peerStore) collectGarbage(cutoff time.Time) error { + select { + case <-ps.closed: + return nil + default: + } + + shards := [2]string{bittorrent.IPv4.String(), bittorrent.IPv6.String()} + + conn := ps.rb.open() + defer conn.Close() + + cutoffUnix := cutoff.UnixNano() + start := time.Now() + + for _, shard := range shards { + infohashesList, err := conn.Do("HKEYS", shard) // key + if err != nil { + return err + } + infohashes := infohashesList.([]interface{}) + + for _, ih := range infohashes { + ihStr := string(ih.([]byte)) + + ihList, err := conn.Do("HGETALL", ihStr) // field + if err != nil { + return err + } + conIhList := ihList.([]interface{}) + + if len(conIhList) == 0 { + _, err := conn.Do("DEL", ihStr) + if err != nil { + return err + } + log.Debug("storage: Deleting Redis", log.Fields{"Hkey": ihStr}) + _, err = conn.Do("HDEL", shard, ihStr) + if err != nil { + return err + } + log.Debug("storage: Deleting Redis", log.Fields{ + "Hkey": shard, + "Hfield": ihStr, + }) + continue + } + + var pk serializedPeer + for index, ihField := range conIhList { + if index%2 != 0 { // value + mtime, err := strconv.ParseInt(string(ihField.([]byte)), 10, 64) + if err != nil { + return err + } + if mtime <= cutoffUnix { + _, err := conn.Do("HDEL", ihStr, pk) + if err != nil { + return err + } + p := decodePeerKey(pk) + log.Debug("storage: Deleting peer", log.Fields{ + "Peer": fmt.Sprintf("[ID: %s, IP: %s(AddressFamily: %s), Port %d]", p.ID.String(), p.IP.String(), p.IP.AddressFamily.String(), p.Port), + }) + } + } else { // key + pk = serializedPeer(ihField.([]byte)) + } + } + + ihLen, err := conn.Do("HLEN", ihStr) + if err != nil { + return err + } + if ihLen.(int64) == 0 { + _, err := conn.Do("DEL", ihStr) + if err != nil { + return err + } + log.Debug("storage: Deleting Redis", log.Fields{"Hkey": ihStr}) + _, err = conn.Do("HDEL", shard, ihStr) + if err != nil { + return err + } + log.Debug("storage: Deleting Redis", log.Fields{ + "Hkey": shard, + "Hfield": ihStr, + }) + } + + } + + } + + recordGCDuration(time.Since(start)) + + return nil +} + +func (ps *peerStore) Stop() <-chan error { + c := make(chan error) + go func() { + close(ps.closed) + ps.wg.Wait() + + // TODO(duyanghao): something to be done? 
+ + close(c) + }() + + return c +} + +func (ps *peerStore) LogFields() log.Fields { + return ps.cfg.LogFields() +} diff --git a/storage/redis/peer_store_test.go b/storage/redis/peer_store_test.go new file mode 100644 index 0000000..099bce6 --- /dev/null +++ b/storage/redis/peer_store_test.go @@ -0,0 +1,47 @@ +package redis + +import ( + "testing" + + "time" + + s "github.com/chihaya/chihaya/storage" +) + +func createNew() s.PeerStore { + ps, err := New(Config{GarbageCollectionInterval: 10 * time.Minute, PrometheusReportingInterval: 10 * time.Minute, PeerLifetime: 30 * time.Minute, RedisBroker: "redis://myRedis@127.0.0.1:6379/0"}) + if err != nil { + panic(err) + } + return ps +} + +func TestPeerStore(t *testing.T) { s.TestPeerStore(t, createNew()) } + +func BenchmarkNop(b *testing.B) { s.Nop(b, createNew()) } +func BenchmarkPut(b *testing.B) { s.Put(b, createNew()) } +func BenchmarkPut1k(b *testing.B) { s.Put1k(b, createNew()) } +func BenchmarkPut1kInfohash(b *testing.B) { s.Put1kInfohash(b, createNew()) } +func BenchmarkPut1kInfohash1k(b *testing.B) { s.Put1kInfohash1k(b, createNew()) } +func BenchmarkPutDelete(b *testing.B) { s.PutDelete(b, createNew()) } +func BenchmarkPutDelete1k(b *testing.B) { s.PutDelete1k(b, createNew()) } +func BenchmarkPutDelete1kInfohash(b *testing.B) { s.PutDelete1kInfohash(b, createNew()) } +func BenchmarkPutDelete1kInfohash1k(b *testing.B) { s.PutDelete1kInfohash1k(b, createNew()) } +func BenchmarkDeleteNonexist(b *testing.B) { s.DeleteNonexist(b, createNew()) } +func BenchmarkDeleteNonexist1k(b *testing.B) { s.DeleteNonexist1k(b, createNew()) } +func BenchmarkDeleteNonexist1kInfohash(b *testing.B) { s.DeleteNonexist1kInfohash(b, createNew()) } +func BenchmarkDeleteNonexist1kInfohash1k(b *testing.B) { s.DeleteNonexist1kInfohash1k(b, createNew()) } +func BenchmarkPutGradDelete(b *testing.B) { s.PutGradDelete(b, createNew()) } +func BenchmarkPutGradDelete1k(b *testing.B) { s.PutGradDelete1k(b, createNew()) } +func BenchmarkPutGradDelete1kInfohash(b *testing.B) { s.PutGradDelete1kInfohash(b, createNew()) } +func BenchmarkPutGradDelete1kInfohash1k(b *testing.B) { s.PutGradDelete1kInfohash1k(b, createNew()) } +func BenchmarkGradNonexist(b *testing.B) { s.GradNonexist(b, createNew()) } +func BenchmarkGradNonexist1k(b *testing.B) { s.GradNonexist1k(b, createNew()) } +func BenchmarkGradNonexist1kInfohash(b *testing.B) { s.GradNonexist1kInfohash(b, createNew()) } +func BenchmarkGradNonexist1kInfohash1k(b *testing.B) { s.GradNonexist1kInfohash1k(b, createNew()) } +func BenchmarkAnnounceLeecher(b *testing.B) { s.AnnounceLeecher(b, createNew()) } +func BenchmarkAnnounceLeecher1kInfohash(b *testing.B) { s.AnnounceLeecher1kInfohash(b, createNew()) } +func BenchmarkAnnounceSeeder(b *testing.B) { s.AnnounceSeeder(b, createNew()) } +func BenchmarkAnnounceSeeder1kInfohash(b *testing.B) { s.AnnounceSeeder1kInfohash(b, createNew()) } +func BenchmarkScrapeSwarm(b *testing.B) { s.ScrapeSwarm(b, createNew()) } +func BenchmarkScrapeSwarm1kInfohash(b *testing.B) { s.ScrapeSwarm1kInfohash(b, createNew()) } From d65ab677e7bf0aa5f40e22d79630922e9b1a4f62 Mon Sep 17 00:00:00 2001 From: onestraw Date: Wed, 12 Dec 2018 15:26:11 +0800 Subject: [PATCH 2/9] storage/redis: refactor redis storage - Based on @duyanghao's PR - Make staticcheck pass - Address review commentsq --- README.md | 6 +- docs/storage/redis.md | 72 +++++ storage/redis/common/redis.go | 61 ---- storage/redis/peer_store.go | 491 ++++++++++++++++--------------- storage/redis/peer_store_test.go | 18 +- storage/redis/redis.go | 
135 +++++++++
 6 files changed, 471 insertions(+), 312 deletions(-)
 create mode 100644 docs/storage/redis.md
 delete mode 100644 storage/redis/common/redis.go
 create mode 100644 storage/redis/redis.go

diff --git a/README.md b/README.md
index 4302aa2..39faa59 100644
--- a/README.md
+++ b/README.md
@@ -20,12 +20,14 @@ Differentiating features include:
 - IPv4 and IPv6 support
 - [YAML] configuration
 - Metrics via [Prometheus]
+- High Availability via [Redis]
 
 [releases]: https://github.com/chihaya/chihaya/releases
 [BitTorrent tracker]: http://en.wikipedia.org/wiki/BitTorrent_tracker
 [Go]: https://golang.org
 [YAML]: http://yaml.org
 [Prometheus]: http://prometheus.io
+[Redis]: https://redis.io
 
 ## Why Chihaya?
 
@@ -155,10 +157,6 @@ After all PreHooks have executed, any missing response fields that are required
 
 PostHooks are asynchronous tasks that occur after a response has been delivered to the client.
 Request data is written to the storage asynchronously in one of these PostHooks.
 
-### HA(High Availability)
-
-Chihaya can achieve high availability by using [redis](https://redis.io/) storage backend.
-
 ## Related projects
 
 - [BitTorrent.org](https://github.com/bittorrent/bittorrent.org): a static website containing the BitTorrent spec and all BEPs
diff --git a/docs/storage/redis.md b/docs/storage/redis.md
new file mode 100644
index 0000000..552939c
--- /dev/null
+++ b/docs/storage/redis.md
@@ -0,0 +1,72 @@
+# Redis Storage
+
+This storage system separates chihaya from its storage service, keeping all peer data in Redis to achieve HA.
+
+## Use Case
+
+When one chihaya instance goes down, Redis can continue to serve peer data through the other chihaya instances.
+
+## Configuration
+
+```yaml
+chihaya:
+  storage:
+    name: redis
+    config:
+      # The frequency at which stale peers are removed.
+      gc_interval: 14m
+
+      # The frequency at which metrics are pushed into a local Prometheus endpoint.
+      prometheus_reporting_interval: 1s
+
+      # The amount of time until a peer is considered stale.
+      # To avoid churn, keep this slightly larger than `announce_interval`.
+      peer_lifetime: 16m
+
+      # The address of the redis storage.
+      redis_broker: "redis://pwd@127.0.0.1:6379/0"
+
+      # The timeout for reading a command reply from redis.
+      redis_read_timeout: 15s
+
+      # The timeout for writing a command to redis.
+      redis_write_timeout: 15s
+
+      # The timeout for connecting to the redis server.
+      redis_connect_timeout: 15s
+```
+
+## Implementation
+
+Seeders and leechers for a particular infohash are stored in a redis hash: the infohash is used as the hash key, the peer key is the field, and the last modified time is the value.
+
+All the infohashes (swarms) are also stored in a redis hash: the IP family is the key, the infohash is the field, and the last modified time is the value.
+
+Here is an example:
+
+```
+- IPv4
+  - IPv4_S_<infohash 1>: <modification time>
+  - IPv4_L_<infohash 1>: <modification time>
+  - IPv4_S_<infohash 2>: <modification time>
+- IPv4_S_<infohash 1>
+  - <peer 1 key>: <modification time>
+  - <peer 2 key>: <modification time>
+- IPv4_L_<infohash 1>
+  - <peer 3 key>: <modification time>
+- IPv4_S_<infohash 2>
+  - <peer 3 key>: <modification time>
+```
+
+
+In this case, Prometheus will record two swarms, three seeders, and one leecher.
+
+So three keys are used to record the counts of swarms, seeders, and leechers for each group (IPv4, IPv6).
+
+```
+- IPv4_infohash_count: 2
+- IPv4_S_count: 3
+- IPv4_L_count: 1
+```
+
+Note: IPv4_infohash_count has a different meaning than in the `memory` storage; it represents the number of infohashes reported by seeders.
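For illustration only (not part of the patch), here is a minimal sketch of how a single IPv4 seeder announce maps onto this key layout, assuming the redigo client, a local Redis at 127.0.0.1:6379, and placeholder infohash/peer-key values; the counter handling mirrors the behaviour described above rather than reproducing the patch's exact code.

```go
package main

import (
	"fmt"
	"log"
	"time"

	"github.com/gomodule/redigo/redis"
)

func main() {
	// Placeholder values standing in for a real announce.
	infohash := "0001020304050607080910111213141516171819" // hex form of a 20-byte infohash
	peerKey := "<peer 1 key>"                              // serialized peer (ID + port + IP)
	now := time.Now().UnixNano()                           // last modified time

	conn, err := redis.Dial("tcp", "127.0.0.1:6379")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	seederKey := "IPv4_S_" + infohash

	// Store the peer in the swarm hash and register the swarm in the IPv4 group hash.
	conn.Send("MULTI")
	conn.Send("HSET", seederKey, peerKey, now)
	conn.Send("HSET", "IPv4", seederKey, now)
	reply, err := redis.Values(conn.Do("EXEC"))
	if err != nil {
		log.Fatal(err)
	}

	// HSET returns 1 only when the field is new, so the counters are bumped
	// exactly once per new seeder and once per new swarm.
	if reply[0].(int64) == 1 {
		if _, err := conn.Do("INCR", "IPv4_S_count"); err != nil {
			log.Fatal(err)
		}
	}
	if reply[1].(int64) == 1 {
		if _, err := conn.Do("INCR", "IPv4_infohash_count"); err != nil {
			log.Fatal(err)
		}
	}

	seeders, err := redis.Int64(conn.Do("GET", "IPv4_S_count"))
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("IPv4 seeders:", seeders)
}
```

Because `HSET` reports whether the field was newly created, the per-group counters can be maintained at announce time, which is what lets `populateProm` read three counters per group instead of scanning every swarm hash.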
diff --git a/storage/redis/common/redis.go b/storage/redis/common/redis.go deleted file mode 100644 index 16f8e45..0000000 --- a/storage/redis/common/redis.go +++ /dev/null @@ -1,61 +0,0 @@ -package common - -import ( - "time" - - "github.com/garyburd/redigo/redis" -) - -// RedisConnector ... -type RedisConnector struct{} - -// NewPool returns a new pool of Redis connections -func (rc *RedisConnector) NewPool(socketPath, host, password string, db int) *redis.Pool { - return &redis.Pool{ - MaxIdle: 3, - IdleTimeout: 240 * time.Second, - Dial: func() (redis.Conn, error) { - c, err := rc.open(socketPath, host, password, db) - if err != nil { - return nil, err - } - - if db != 0 { - _, err = c.Do("SELECT", db) - if err != nil { - return nil, err - } - } - - return c, err - }, - // PINGs connections that have been idle more than 10 seconds - TestOnBorrow: func(c redis.Conn, t time.Time) error { - if time.Since(t) < time.Duration(10*time.Second) { - return nil - } - _, err := c.Do("PING") - return err - }, - } -} - -// Open a new Redis connection -func (rc *RedisConnector) open(socketPath, host, password string, db int) (redis.Conn, error) { - var opts = []redis.DialOption{ - redis.DialDatabase(db), - redis.DialReadTimeout(15 * time.Second), - redis.DialWriteTimeout(15 * time.Second), - redis.DialConnectTimeout(15 * time.Second), - } - - if password != "" { - opts = append(opts, redis.DialPassword(password)) - } - - if socketPath != "" { - return redis.Dial("unix", socketPath, opts...) - } - - return redis.Dial("tcp", host, opts...) -} diff --git a/storage/redis/peer_store.go b/storage/redis/peer_store.go index 092f696..fde14c8 100644 --- a/storage/redis/peer_store.go +++ b/storage/redis/peer_store.go @@ -1,29 +1,43 @@ // Package redis implements the storage interface for a Chihaya -// BitTorrent tracker keeping peer data in redis. +// BitTorrent tracker keeping peer data in redis with hash. +// There two categories of hash: +// +// - IPv{4,6}_{L,S}_infohash +// To save peers that hold the infohash, used for fast searching, +// deleting, and timeout handling +// +// - IPv{4,6} +// To save all the infohashes, used for garbage collection, +// metrics aggregation and leecher graduation +// +// Tree keys are used to record the count of swarms, seeders +// and leechers for each group (IPv4, IPv6). +// +// - IPv{4,6}_infohash_count +// To record the number of infohashes. +// +// - IPv{4,6}_S_count +// To record the number of seeders. +// +// - IPv{4,6}_L_count +// To record the number of leechers. package redis import ( "encoding/binary" "net" - neturl "net/url" + "strconv" "sync" "time" - "gopkg.in/yaml.v2" + "github.com/gomodule/redigo/redis" + yaml "gopkg.in/yaml.v2" "github.com/chihaya/chihaya/bittorrent" "github.com/chihaya/chihaya/pkg/log" + "github.com/chihaya/chihaya/pkg/stop" "github.com/chihaya/chihaya/pkg/timecache" "github.com/chihaya/chihaya/storage" - - "errors" - "fmt" - "strconv" - "strings" - - "github.com/chihaya/chihaya/storage/redis/common" - "github.com/garyburd/redigo/redis" - "gopkg.in/redsync.v1" ) // Name is the name by which this peer store is registered with Chihaya. 
@@ -35,6 +49,9 @@ const ( defaultGarbageCollectionInterval = time.Minute * 3 defaultPeerLifetime = time.Minute * 30 defaultRedisBroker = "redis://myRedis@127.0.0.1:6379/0" + defaultRedisReadTimeout = time.Second * 15 + defaultRedisWriteTimeout = time.Second * 15 + defaultRedisConnectTimeout = time.Second * 15 ) func init() { @@ -67,16 +84,22 @@ type Config struct { PrometheusReportingInterval time.Duration `yaml:"prometheus_reporting_interval"` PeerLifetime time.Duration `yaml:"peer_lifetime"` RedisBroker string `yaml:"redis_broker"` + RedisReadTimeout time.Duration `yaml:"redis_read_timeout"` + RedisWriteTimeout time.Duration `yaml:"redis_write_timeout"` + RedisConnectTimeout time.Duration `yaml:"redis_connect_timeout"` } // LogFields renders the current config as a set of Logrus fields. func (cfg Config) LogFields() log.Fields { return log.Fields{ - "name": Name, - "gcInterval": cfg.GarbageCollectionInterval, - "promReportInterval": cfg.PrometheusReportingInterval, - "peerLifetime": cfg.PeerLifetime, - "redisBroker": cfg.RedisBroker, + "name": Name, + "gcInterval": cfg.GarbageCollectionInterval, + "promReportInterval": cfg.PrometheusReportingInterval, + "peerLifetime": cfg.PeerLifetime, + "redisBroker": cfg.RedisBroker, + "redisReadTimeout": cfg.RedisReadTimeout, + "redisWriteTimeout": cfg.RedisWriteTimeout, + "redisConnectTimeout": cfg.RedisConnectTimeout, } } @@ -96,6 +119,33 @@ func (cfg Config) Validate() Config { }) } + if cfg.RedisReadTimeout <= 0 { + validcfg.RedisReadTimeout = defaultRedisReadTimeout + log.Warn("falling back to default configuration", log.Fields{ + "name": Name + ".RedisReadTimeout", + "provided": cfg.RedisReadTimeout, + "default": validcfg.RedisReadTimeout, + }) + } + + if cfg.RedisWriteTimeout <= 0 { + validcfg.RedisWriteTimeout = defaultRedisWriteTimeout + log.Warn("falling back to default configuration", log.Fields{ + "name": Name + ".RedisWriteTimeout", + "provided": cfg.RedisWriteTimeout, + "default": validcfg.RedisWriteTimeout, + }) + } + + if cfg.RedisConnectTimeout <= 0 { + validcfg.RedisConnectTimeout = defaultRedisConnectTimeout + log.Warn("falling back to default configuration", log.Fields{ + "name": Name + ".RedisConnectTimeout", + "provided": cfg.RedisConnectTimeout, + "default": validcfg.RedisConnectTimeout, + }) + } + if cfg.GarbageCollectionInterval <= 0 { validcfg.GarbageCollectionInterval = defaultGarbageCollectionInterval log.Warn("falling back to default configuration", log.Fields{ @@ -126,74 +176,18 @@ func (cfg Config) Validate() Config { return validcfg } -// ParseRedisURL ... 
-func ParseRedisURL(url string) (host, password string, db int, err error) { - // redis://pwd@host/db - - var u *neturl.URL - u, err = neturl.Parse(url) - if err != nil { - return - } - if u.Scheme != "redis" { - err = errors.New("No redis scheme found") - return - } - - if u.User != nil { - password = u.User.String() - } - - host = u.Host - - parts := strings.Split(u.Path, "/") - if len(parts) == 1 { - db = 0 //default redis db - } else { - db, err = strconv.Atoi(parts[1]) - if err != nil { - db, err = 0, nil //ignore err here - } - } - - return -} - -// NewRedisBackend creates RedisBackend instance -func NewRedisBackend(host, password, socketPath string, db int) *RedisBackend { - return &RedisBackend{ - host: host, - db: db, - password: password, - socketPath: socketPath, - } -} - -// open returns or creates instance of Redis connection -func (rb *RedisBackend) open() redis.Conn { - if rb.pool == nil { - rb.pool = rb.NewPool(rb.socketPath, rb.host, rb.password, rb.db) - } - if rb.redsync == nil { - var pools = []redsync.Pool{rb.pool} - rb.redsync = redsync.New(pools) - } - return rb.pool.Get() -} - // New creates a new PeerStore backed by redis. func New(provided Config) (storage.PeerStore, error) { cfg := provided.Validate() - // creates RedisBackend instance - h, p, db, err := ParseRedisURL(cfg.RedisBroker) + u, err := parseRedisURL(cfg.RedisBroker) if err != nil { return nil, err } ps := &peerStore{ cfg: cfg, - rb: NewRedisBackend(h, p, "", db), + rb: newRedisBackend(&provided, u, ""), closed: make(chan struct{}), } @@ -263,83 +257,76 @@ func decodePeerKey(pk serializedPeer) bittorrent.Peer { return peer } -// RedisBackend represents a Memcache result backend -type RedisBackend struct { - host string - password string - db int - pool *redis.Pool - // If set, path to a socket file overrides hostname - socketPath string - redsync *redsync.Redsync - common.RedisConnector -} - type peerStore struct { cfg Config - rb *RedisBackend + rb *redisBackend closed chan struct{} wg sync.WaitGroup } -// populateProm aggregates metrics over all shards and then posts them to +func (ps *peerStore) groups() []string { + return []string{bittorrent.IPv4.String(), bittorrent.IPv6.String()} +} + +func (ps *peerStore) leecherInfohashKey(af, ih string) string { + return af + "_L_" + ih +} + +func (ps *peerStore) seederInfohashKey(af, ih string) string { + return af + "_S_" + ih +} + +func (ps *peerStore) infohashCountKey(af string) string { + return af + "_infohash_count" +} + +func (ps *peerStore) seederCountKey(af string) string { + return af + "_S_count" +} + +func (ps *peerStore) leecherCountKey(af string) string { + return af + "_L_count" +} + +// populateProm aggregates metrics over all groups and then posts them to // prometheus. 
func (ps *peerStore) populateProm() { - var numInfohashes, numSeeders, numLeechers uint64 - - shards := [2]string{bittorrent.IPv4.String(), bittorrent.IPv6.String()} + var numInfohashes, numSeeders, numLeechers int64 conn := ps.rb.open() defer conn.Close() - for _, shard := range shards { - infohashes_list, err := conn.Do("HKEYS", shard) // key - if err != nil { - return + for _, group := range ps.groups() { + if n, err := conn.Do("GET", ps.infohashCountKey(group)); err != nil { + log.Error("storage: GET counter failure", log.Fields{ + "key": ps.infohashCountKey(group), + "error": err, + }) + } else { + numInfohashes += n.(int64) } - infohashes := infohashes_list.([]interface{}) - - InfohashLPrefix := shard + "_L_" - InfohashSPrefix := shard + "_S_" - InfohashPrefixLen := len(InfohashLPrefix) - InfohashesMap := make(map[string]bool) - - for _, ih := range infohashes { - ih_str := string(ih.([]byte)) - ih_str_infohash := ih_str[InfohashPrefixLen:] - if strings.HasPrefix(ih_str, InfohashLPrefix) { - numLeechers++ - InfohashesMap[ih_str_infohash] = true - } else if strings.HasPrefix(ih_str, InfohashSPrefix) { - numSeeders++ - InfohashesMap[ih_str_infohash] = true - } else { - log.Error("storage: invalid Redis state", log.Fields{ - "Hkey": shard, - "Hfield": ih_str, - }) - } + if n, err := conn.Do("GET", ps.seederCountKey(group)); err != nil { + log.Error("storage: GET counter failure", log.Fields{ + "key": ps.seederCountKey(group), + "error": err, + }) + } else { + numSeeders += n.(int64) + } + if n, err := conn.Do("GET", ps.leecherCountKey(group)); err != nil { + log.Error("storage: GET counter failure", log.Fields{ + "key": ps.leecherCountKey(group), + "error": err, + }) + } else { + numLeechers += n.(int64) } - - numInfohashes += uint64(len(InfohashesMap)) } storage.PromInfohashesCount.Set(float64(numInfohashes)) storage.PromSeedersCount.Set(float64(numSeeders)) storage.PromLeechersCount.Set(float64(numLeechers)) - - log.Debug("storage: populateProm() aggregates metrics over all shards", log.Fields{ - "numInfohashes": float64(numInfohashes), - "numSeeders": float64(numSeeders), - "numLeechers": float64(numLeechers), - }) -} - -// recordGCDuration records the duration of a GC sweep. -func recordGCDuration(duration time.Duration) { - log.Debug("storage: recordGCDuration", log.Fields{"timeTaken(ms)": float64(duration.Nanoseconds()) / float64(time.Millisecond)}) - storage.PromGCDurationMilliseconds.Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond)) } func (ps *peerStore) getClock() int64 { @@ -347,10 +334,10 @@ func (ps *peerStore) getClock() int64 { } func (ps *peerStore) PutSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error { - IPver := p.IP.AddressFamily.String() + addressFamily := p.IP.AddressFamily.String() log.Debug("storage: PutSeeder", log.Fields{ "InfoHash": ih.String(), - "Peer": fmt.Sprintf("[ID: %s, IP: %s(AddressFamily: %s), Port %d]", p.ID.String(), p.IP.String(), IPver, p.Port), + "Peer": p, }) select { @@ -361,29 +348,43 @@ func (ps *peerStore) PutSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error pk := newPeerKey(p) + encodedSeederInfoHash := ps.seederInfohashKey(addressFamily, ih.String()) + ct := ps.getClock() + conn := ps.rb.open() defer conn.Close() - // Update the peer in the swarm. 
- encodedSeederInfoHash := IPver + "_S_" + ih.String() - ct := ps.getClock() - _, err := conn.Do("HSET", encodedSeederInfoHash, pk, ct) + conn.Send("MULTI") + conn.Send("HSET", encodedSeederInfoHash, pk, ct) + conn.Send("HSET", addressFamily, encodedSeederInfoHash, ct) + reply, err := redis.Values(conn.Do("EXEC")) if err != nil { return err } - _, err = conn.Do("HSET", IPver, encodedSeederInfoHash, ct) - if err != nil { - return err + + // pk is a new field. + if reply[0].(int64) == 1 { + _, err = conn.Do("INCR", ps.seederCountKey(addressFamily)) + if err != nil { + return err + } + } + // encodedSeederInfoHash is a new field. + if reply[1].(int64) == 1 { + _, err = conn.Do("INCR", ps.infohashCountKey(addressFamily)) + if err != nil { + return err + } } return nil } func (ps *peerStore) DeleteSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error { - IPver := p.IP.AddressFamily.String() + addressFamily := p.IP.AddressFamily.String() log.Debug("storage: DeleteSeeder", log.Fields{ "InfoHash": ih.String(), - "Peer": fmt.Sprintf("[ID: %s, IP: %s(AddressFamily: %s), Port %d]", p.ID.String(), p.IP.String(), IPver, p.Port), + "Peer": p, }) select { @@ -397,24 +398,27 @@ func (ps *peerStore) DeleteSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) err conn := ps.rb.open() defer conn.Close() - encodedSeederInfoHash := IPver + "_S_" + ih.String() + encodedSeederInfoHash := ps.seederInfohashKey(addressFamily, ih.String()) - DelNum, err := conn.Do("HDEL", encodedSeederInfoHash, pk) + delNum, err := conn.Do("HDEL", encodedSeederInfoHash, pk) if err != nil { return err } - if DelNum.(int64) == 0 { + if delNum.(int64) == 0 { return storage.ErrResourceDoesNotExist } + if _, err := conn.Do("DECR", ps.seederCountKey(addressFamily)); err != nil { + return err + } return nil } func (ps *peerStore) PutLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error { - IPver := p.IP.AddressFamily.String() + addressFamily := p.IP.AddressFamily.String() log.Debug("storage: PutLeecher", log.Fields{ "InfoHash": ih.String(), - "Peer": fmt.Sprintf("[ID: %s, IP: %s(AddressFamily: %s), Port %d]", p.ID.String(), p.IP.String(), IPver, p.Port), + "Peer": p, }) select { @@ -423,31 +427,36 @@ func (ps *peerStore) PutLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error default: } + // Update the peer in the swarm. + encodedLeecherInfoHash := ps.leecherInfohashKey(addressFamily, ih.String()) pk := newPeerKey(p) + ct := ps.getClock() conn := ps.rb.open() defer conn.Close() - // Update the peer in the swarm. - encodedLeecherInfoHash := IPver + "_L_" + ih.String() - ct := ps.getClock() - _, err := conn.Do("HSET", encodedLeecherInfoHash, pk, ct) + conn.Send("MULTI") + conn.Send("HSET", encodedLeecherInfoHash, pk, ct) + conn.Send("HSET", addressFamily, encodedLeecherInfoHash, ct) + reply, err := redis.Values(conn.Do("EXEC")) if err != nil { return err } - _, err = conn.Do("HSET", IPver, encodedLeecherInfoHash, ct) - if err != nil { - return err + // pk is a new field. 
+ if reply[0].(int64) == 1 { + _, err = conn.Do("INCR", ps.leecherCountKey(addressFamily)) + if err != nil { + return err + } } - return nil } func (ps *peerStore) DeleteLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error { - IPver := p.IP.AddressFamily.String() + addressFamily := p.IP.AddressFamily.String() log.Debug("storage: DeleteLeecher", log.Fields{ "InfoHash": ih.String(), - "Peer": fmt.Sprintf("[ID: %s, IP: %s(AddressFamily: %s), Port %d]", p.ID.String(), p.IP.String(), IPver, p.Port), + "Peer": p, }) select { @@ -456,29 +465,31 @@ func (ps *peerStore) DeleteLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) er default: } - pk := newPeerKey(p) - conn := ps.rb.open() defer conn.Close() - encodedLeecherInfoHash := IPver + "_L_" + ih.String() + pk := newPeerKey(p) + encodedLeecherInfoHash := ps.leecherInfohashKey(addressFamily, ih.String()) - DelNum, err := conn.Do("HDEL", encodedLeecherInfoHash, pk) + delNum, err := conn.Do("HDEL", encodedLeecherInfoHash, pk) if err != nil { return err } - if DelNum.(int64) == 0 { + if delNum.(int64) == 0 { return storage.ErrResourceDoesNotExist } + if _, err := conn.Do("DECR", ps.leecherCountKey(addressFamily)); err != nil { + return err + } return nil } func (ps *peerStore) GraduateLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error { - IPver := p.IP.AddressFamily.String() + addressFamily := p.IP.AddressFamily.String() log.Debug("storage: GraduateLeecher", log.Fields{ "InfoHash": ih.String(), - "Peer": fmt.Sprintf("[ID: %s, IP: %s(AddressFamily: %s), Port %d]", p.ID.String(), p.IP.String(), IPver, p.Port), + "Peer": p, }) select { @@ -487,41 +498,52 @@ func (ps *peerStore) GraduateLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) default: } + encodedInfoHash := ih.String() + encodedLeecherInfoHash := ps.leecherInfohashKey(addressFamily, encodedInfoHash) + encodedSeederInfoHash := ps.seederInfohashKey(addressFamily, encodedInfoHash) pk := newPeerKey(p) + ct := ps.getClock() conn := ps.rb.open() defer conn.Close() - encodedInfoHash := ih.String() - encodedLeecherInfoHash := IPver + "_L_" + encodedInfoHash - encodedSeederInfoHash := IPver + "_S_" + encodedInfoHash - - _, err := conn.Do("HDEL", encodedLeecherInfoHash, pk) + conn.Send("MULTI") + conn.Send("HDEL", encodedLeecherInfoHash, pk) + conn.Send("HSET", encodedSeederInfoHash, pk, ct) + conn.Send("HSET", addressFamily, encodedSeederInfoHash, ct) + reply, err := redis.Values(conn.Do("EXEC")) if err != nil { return err } - - // Update the peer in the swarm. 
- ct := ps.getClock() - _, err = conn.Do("HSET", encodedSeederInfoHash, pk, ct) - if err != nil { - return err + if reply[0].(int64) == 1 { + _, err = conn.Do("DECR", ps.leecherCountKey(addressFamily)) + if err != nil { + return err + } } - _, err = conn.Do("HSET", IPver, encodedSeederInfoHash, ct) - if err != nil { - return err + if reply[1].(int64) == 1 { + _, err = conn.Do("INCR", ps.seederCountKey(addressFamily)) + if err != nil { + return err + } + } + if reply[2].(int64) == 1 { + _, err = conn.Do("INCR", ps.infohashCountKey(addressFamily)) + if err != nil { + return err + } } return nil } func (ps *peerStore) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant int, announcer bittorrent.Peer) (peers []bittorrent.Peer, err error) { - IPver := announcer.IP.AddressFamily.String() + addressFamily := announcer.IP.AddressFamily.String() log.Debug("storage: AnnouncePeers", log.Fields{ "InfoHash": ih.String(), "seeder": seeder, "numWant": numWant, - "Peer": fmt.Sprintf("[ID: %s, IP: %s(AddressFamily: %s), Port %d]", announcer.ID.String(), announcer.IP.String(), IPver, announcer.Port), + "Peer": announcer, }) select { @@ -531,8 +553,8 @@ func (ps *peerStore) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant } encodedInfoHash := ih.String() - encodedLeecherInfoHash := IPver + "_L_" + encodedInfoHash // key - encodedSeederInfoHash := IPver + "_S_" + encodedInfoHash // key + encodedLeecherInfoHash := ps.leecherInfohashKey(addressFamily, encodedInfoHash) + encodedSeederInfoHash := ps.seederInfohashKey(addressFamily, encodedInfoHash) conn := ps.rb.open() defer conn.Close() @@ -592,18 +614,10 @@ func (ps *peerStore) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant } } - APResult := "" - for _, pr := range peers { - APResult = fmt.Sprintf("%s Peer:[ID: %s, IP: %s(AddressFamily: %s), Port %d]", APResult, pr.ID.String(), pr.IP.String(), IPver, pr.Port) - } - log.Debug("storage: AnnouncePeers result", log.Fields{ - "peers": APResult, - }) - return } -func (ps *peerStore) ScrapeSwarm(ih bittorrent.InfoHash, addressFamily bittorrent.AddressFamily) (resp bittorrent.Scrape) { +func (ps *peerStore) ScrapeSwarm(ih bittorrent.InfoHash, af bittorrent.AddressFamily) (resp bittorrent.Scrape) { select { case <-ps.closed: panic("attempted to interact with stopped redis store") @@ -611,10 +625,10 @@ func (ps *peerStore) ScrapeSwarm(ih bittorrent.InfoHash, addressFamily bittorren } resp.InfoHash = ih - IPver := addressFamily.String() + addressFamily := af.String() encodedInfoHash := ih.String() - encodedLeecherInfoHash := IPver + "_L_" + encodedInfoHash // key - encodedSeederInfoHash := IPver + "_S_" + encodedInfoHash // key + encodedLeecherInfoHash := ps.leecherInfohashKey(addressFamily, encodedInfoHash) + encodedSeederInfoHash := ps.seederInfohashKey(addressFamily, encodedInfoHash) conn := ps.rb.open() defer conn.Close() @@ -627,7 +641,6 @@ func (ps *peerStore) ScrapeSwarm(ih bittorrent.InfoHash, addressFamily bittorren }) return } - lLen := leechersLen.(int64) seedersLen, err := conn.Do("HLEN", encodedSeederInfoHash) if err != nil { @@ -637,14 +650,9 @@ func (ps *peerStore) ScrapeSwarm(ih bittorrent.InfoHash, addressFamily bittorren }) return } - sLen := seedersLen.(int64) - if lLen == 0 && sLen == 0 { - return - } - - resp.Incomplete = uint32(lLen) - resp.Complete = uint32(sLen) + resp.Incomplete = uint32(leechersLen.(int64)) + resp.Complete = uint32(seedersLen.(int64)) return } @@ -661,16 +669,15 @@ func (ps *peerStore) collectGarbage(cutoff time.Time) error { default: } - shards := 
[2]string{bittorrent.IPv4.String(), bittorrent.IPv6.String()} - conn := ps.rb.open() defer conn.Close() cutoffUnix := cutoff.UnixNano() start := time.Now() - for _, shard := range shards { - infohashesList, err := conn.Do("HKEYS", shard) // key + for _, group := range ps.groups() { + // list all infohashes in the group + infohashesList, err := conn.Do("HKEYS", group) if err != nil { return err } @@ -678,51 +685,47 @@ func (ps *peerStore) collectGarbage(cutoff time.Time) error { for _, ih := range infohashes { ihStr := string(ih.([]byte)) + isSeeder := len(ihStr) > 5 && ihStr[5:6] == "S" - ihList, err := conn.Do("HGETALL", ihStr) // field + // list all (peer, timeout) pairs for the ih + ihList, err := conn.Do("HGETALL", ihStr) if err != nil { return err } conIhList := ihList.([]interface{}) - if len(conIhList) == 0 { - _, err := conn.Do("DEL", ihStr) - if err != nil { - return err - } - log.Debug("storage: Deleting Redis", log.Fields{"Hkey": ihStr}) - _, err = conn.Do("HDEL", shard, ihStr) - if err != nil { - return err - } - log.Debug("storage: Deleting Redis", log.Fields{ - "Hkey": shard, - "Hfield": ihStr, - }) - continue - } - var pk serializedPeer + var removedPeerCount int64 for index, ihField := range conIhList { - if index%2 != 0 { // value + if index%2 == 1 { // value mtime, err := strconv.ParseInt(string(ihField.([]byte)), 10, 64) if err != nil { return err } if mtime <= cutoffUnix { - _, err := conn.Do("HDEL", ihStr, pk) + ret, err := redis.Int64(conn.Do("HDEL", ihStr, pk)) if err != nil { return err } - p := decodePeerKey(pk) - log.Debug("storage: Deleting peer", log.Fields{ - "Peer": fmt.Sprintf("[ID: %s, IP: %s(AddressFamily: %s), Port %d]", p.ID.String(), p.IP.String(), p.IP.AddressFamily.String(), p.Port), + + removedPeerCount += ret + + log.Debug("storage: deleting peer", log.Fields{ + "Peer": decodePeerKey(pk).String(), }) } } else { // key pk = serializedPeer(ihField.([]byte)) } } + // DECR seeder/leecher counter + decrCounter := ps.leecherCountKey(group) + if isSeeder { + decrCounter = ps.seederCountKey(group) + } + if _, err := conn.Do("DECRBY", decrCounter, removedPeerCount); err != nil { + return err + } ihLen, err := conn.Do("HLEN", ihStr) if err != nil { @@ -733,38 +736,36 @@ func (ps *peerStore) collectGarbage(cutoff time.Time) error { if err != nil { return err } - log.Debug("storage: Deleting Redis", log.Fields{"Hkey": ihStr}) - _, err = conn.Do("HDEL", shard, ihStr) + log.Debug("storage: deleting infohash", log.Fields{ + "Group": group, + "Hkey": ihStr, + }) + _, err = conn.Do("HDEL", group, ihStr) if err != nil { return err } - log.Debug("storage: Deleting Redis", log.Fields{ - "Hkey": shard, - "Hfield": ihStr, - }) } - } - } - recordGCDuration(time.Since(start)) + duration := float64(time.Since(start).Nanoseconds()) / float64(time.Millisecond) + log.Debug("storage: recordGCDuration", log.Fields{"timeTaken(ms)": duration}) + storage.PromGCDurationMilliseconds.Observe(duration) return nil } -func (ps *peerStore) Stop() <-chan error { - c := make(chan error) +func (ps *peerStore) Stop() stop.Result { + c := make(stop.Channel) go func() { close(ps.closed) ps.wg.Wait() - - // TODO(duyanghao): something to be done? - + // chihaya does not clear data in redis when exiting. + // chihaya keys have prefix `IPv{4,6}_`. 
close(c) }() - return c + return c.Result() } func (ps *peerStore) LogFields() log.Fields { diff --git a/storage/redis/peer_store_test.go b/storage/redis/peer_store_test.go index 099bce6..305927a 100644 --- a/storage/redis/peer_store_test.go +++ b/storage/redis/peer_store_test.go @@ -1,15 +1,29 @@ package redis import ( + "fmt" "testing" - "time" + "github.com/alicebob/miniredis" + s "github.com/chihaya/chihaya/storage" ) func createNew() s.PeerStore { - ps, err := New(Config{GarbageCollectionInterval: 10 * time.Minute, PrometheusReportingInterval: 10 * time.Minute, PeerLifetime: 30 * time.Minute, RedisBroker: "redis://myRedis@127.0.0.1:6379/0"}) + rs, err := miniredis.Run() + if err != nil { + panic(err) + } + redisURL := fmt.Sprintf("redis://@%s/0", rs.Addr()) + ps, err := New(Config{ + GarbageCollectionInterval: 10 * time.Minute, + PrometheusReportingInterval: 10 * time.Minute, + PeerLifetime: 30 * time.Minute, + RedisBroker: redisURL, + RedisReadTimeout: 10 * time.Second, + RedisWriteTimeout: 10 * time.Second, + RedisConnectTimeout: 10 * time.Second}) if err != nil { panic(err) } diff --git a/storage/redis/redis.go b/storage/redis/redis.go new file mode 100644 index 0000000..714955b --- /dev/null +++ b/storage/redis/redis.go @@ -0,0 +1,135 @@ +package redis + +import ( + "errors" + "net/url" + "strconv" + "strings" + "time" + + "github.com/gomodule/redigo/redis" + redsync "gopkg.in/redsync.v1" +) + +// redisBackend represents a redis handler. +type redisBackend struct { + pool *redis.Pool + redsync *redsync.Redsync +} + +// newRedisBackend creates a redisBackend instance. +func newRedisBackend(cfg *Config, u *redisURL, socketPath string) *redisBackend { + rc := &redisConnector{ + URL: u, + SocketPath: socketPath, + ReadTimeout: cfg.RedisReadTimeout, + WriteTimeout: cfg.RedisWriteTimeout, + ConnectTimeout: cfg.RedisConnectTimeout, + } + pool := rc.NewPool() + redsync := redsync.New([]redsync.Pool{pool}) + return &redisBackend{ + pool: pool, + redsync: redsync, + } +} + +// open returns or creates instance of Redis connection. +func (rb *redisBackend) open() redis.Conn { + return rb.pool.Get() +} + +type redisConnector struct { + URL *redisURL + SocketPath string + ReadTimeout time.Duration + WriteTimeout time.Duration + ConnectTimeout time.Duration +} + +// NewPool returns a new pool of Redis connections +func (rc *redisConnector) NewPool() *redis.Pool { + return &redis.Pool{ + MaxIdle: 3, + IdleTimeout: 240 * time.Second, + Dial: func() (redis.Conn, error) { + c, err := rc.open() + if err != nil { + return nil, err + } + + if rc.URL.DB != 0 { + _, err = c.Do("SELECT", rc.URL.DB) + if err != nil { + return nil, err + } + } + + return c, err + }, + // PINGs connections that have been idle more than 10 seconds + TestOnBorrow: func(c redis.Conn, t time.Time) error { + if time.Since(t) < time.Duration(10*time.Second) { + return nil + } + _, err := c.Do("PING") + return err + }, + } +} + +// Open a new Redis connection +func (rc *redisConnector) open() (redis.Conn, error) { + var opts = []redis.DialOption{ + redis.DialDatabase(rc.URL.DB), + redis.DialReadTimeout(rc.ReadTimeout), + redis.DialWriteTimeout(rc.WriteTimeout), + redis.DialConnectTimeout(rc.ConnectTimeout), + } + + if rc.URL.Password != "" { + opts = append(opts, redis.DialPassword(rc.URL.Password)) + } + + if rc.SocketPath != "" { + return redis.Dial("unix", rc.SocketPath, opts...) + } + + return redis.Dial("tcp", rc.URL.Host, opts...) 
+} + +// A redisURL represents a parsed redisURL +// The general form represented is: +// +// redis://[password@]host][/][db] +type redisURL struct { + Host string + Password string + DB int +} + +// parseRedisURL parse rawurl into redisURL +func parseRedisURL(target string) (*redisURL, error) { + var u *url.URL + u, err := url.Parse(target) + if err != nil { + return nil, err + } + if u.Scheme != "redis" { + return nil, errors.New("no redis scheme found") + } + + db := 0 //default redis db + parts := strings.Split(u.Path, "/") + if len(parts) != 1 { + db, err = strconv.Atoi(parts[1]) + if err != nil { + return nil, err + } + } + return &redisURL{ + Host: u.Host, + Password: u.User.String(), + DB: db, + }, nil +} From 9a5fac67ed39ab9e4d93c1bb533b68e14fc58ada Mon Sep 17 00:00:00 2001 From: onestraw Date: Thu, 3 Jan 2019 18:18:56 +0800 Subject: [PATCH 3/9] dep ensure udpate Change-Id: Icb2627d7e6e8fb916b481ed9d9a47daa40330698 --- Gopkg.lock | 190 ++++++++++++++++++++++++++++++++++++++++++++--------- Gopkg.toml | 12 ++++ 2 files changed, 171 insertions(+), 31 deletions(-) diff --git a/Gopkg.lock b/Gopkg.lock index 5392dd2..e4ace2f 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -2,24 +2,48 @@ [[projects]] + digest = "1:a69ab3f1445ffd4815add4bd31ba05b65b3b9fec1ade5057d5d717f30e6efd6d" name = "github.com/SermoDigital/jose" packages = [ ".", "crypto", "jws", - "jwt" + "jwt", ] + pruneopts = "UT" revision = "f6df55f235c24f236d11dbcf665249a59ac2021f" version = "1.1" [[projects]] branch = "master" - name = "github.com/anacrolix/dht" - packages = ["krpc"] - revision = "cae37fd1842087605e61382e82e4f87fab27afdc" + digest = "1:7afff364b8e5e9f1085fe77ae5630b8e0f7482338a50535881aa0b433e48fb0b" + name = "github.com/alicebob/gopher-json" + packages = ["."] + pruneopts = "UT" + revision = "5a6b3ba71ee69b77cf64febf8b5a7526ca5eaef0" + +[[projects]] + digest = "1:b7cb0201d452c4a7079dc6e8673693c1153b115e906d50d85600c962fc6085a8" + name = "github.com/alicebob/miniredis" + packages = [ + ".", + "server", + ] + pruneopts = "UT" + revision = "3657542c8629876a1fa83e0b30a0246a67ffa652" + version = "v2.4.5" [[projects]] branch = "master" + digest = "1:5019de799334af68e1c4c38911c5e5603770dc879bf29e770561e7f5871ec639" + name = "github.com/anacrolix/dht" + packages = ["krpc"] + pruneopts = "UT" + revision = "b09db78595aaba14cd992537fbe992a4c5e0a141" + +[[projects]] + branch = "master" + digest = "1:8e3aed14c993c6d261a04d530e7de0a25113b8fef4c9bc4d8d427f512254a6a9" name = "github.com/anacrolix/missinggo" packages = [ ".", @@ -27,175 +51,279 @@ "httptoo", "mime", "pproffd", - "slices" + "slices", ] - revision = "60ef2fbf63df5d871ada2680d4d8a6013dcd1745" + pruneopts = "UT" + revision = "3237bf955fed3b69d69458bf21e0a63cb982339a" [[projects]] branch = "master" name = "github.com/anacrolix/torrent" packages = [ "bencode", - "tracker" + "tracker", ] revision = "4431464fd62c37843addc79822f7cd30b4467471" [[projects]] branch = "master" + digest = "1:d6afaeed1502aa28e80a4ed0981d570ad91b2579193404256ce672ed0a609e0d" name = "github.com/beorn7/perks" packages = ["quantile"] + pruneopts = "UT" revision = "3a771d992973f24aa725d07868b467d1ddfceafb" [[projects]] branch = "master" + digest = "1:7bfd6f49553eb9409ecde1d0311fdcc4944a887d9fff23415f74b7bd462d95a4" name = "github.com/bradfitz/iter" packages = ["."] + pruneopts = "UT" revision = "454541ec3da2a73fc34fd049b19ee5777bf19345" [[projects]] + digest = "1:ffe9824d294da03b391f44e1ae8281281b4afc1bdaa9588c9097785e3af10cec" name = "github.com/davecgh/go-spew" packages = ["spew"] + pruneopts = 
"UT" revision = "8991bc29aa16c548c550c7ff78260e27b9ab7c73" version = "v1.1.1" [[projects]] + digest = "1:97df918963298c287643883209a2c3f642e6593379f97ab400c2a2e219ab647d" name = "github.com/golang/protobuf" packages = ["proto"] + pruneopts = "UT" revision = "aa810b61a9c79d51363740d207bb46cf8e620ed5" version = "v1.2.0" [[projects]] - name = "github.com/huandu/xstrings" - packages = ["."] - revision = "55ae428c2ac4f74d7430952ef528631e656ac92c" - version = "v1.1.0" + digest = "1:38ec74012390146c45af1f92d46e5382b50531247929ff3a685d2b2be65155ac" + name = "github.com/gomodule/redigo" + packages = [ + "internal", + "redis", + ] + pruneopts = "UT" + revision = "9c11da706d9b7902c6da69c592f75637793fe121" + version = "v2.0.0" [[projects]] + digest = "1:f9a5e090336881be43cfc1cf468330c1bdd60abdc9dd194e0b1ab69f4b94dd7c" + name = "github.com/huandu/xstrings" + packages = ["."] + pruneopts = "UT" + revision = "f02667b379e2fb5916c3cda2cf31e0eb885d79f8" + version = "v1.2.0" + +[[projects]] + digest = "1:870d441fe217b8e689d7949fef6e43efbc787e50f200cb1e70dbca9204a1d6be" name = "github.com/inconshreveable/mousetrap" packages = ["."] + pruneopts = "UT" revision = "76626ae9c91c4f2a10f34cad8ce83ea42c93bb75" version = "v1.0" [[projects]] + digest = "1:f97285a3b0a496dcf8801072622230d513f69175665d94de60eb042d03387f6c" name = "github.com/julienschmidt/httprouter" packages = ["."] - revision = "8c199fb6259ffc1af525cc3ad52ee60ba8359669" - version = "v1.1" + pruneopts = "UT" + revision = "348b672cd90d8190f8240323e372ecd1e66b59dc" + version = "v1.2.0" [[projects]] + digest = "1:0a69a1c0db3591fcefb47f115b224592c8dfa4368b7ba9fae509d5e16cdc95c8" + name = "github.com/konsorten/go-windows-terminal-sequences" + packages = ["."] + pruneopts = "UT" + revision = "5c8c8bd35d3832f5d134ae1e1e375b69a4d25242" + version = "v1.0.1" + +[[projects]] + digest = "1:ff5ebae34cfbf047d505ee150de27e60570e8c394b3b8fdbb720ff6ac71985fc" name = "github.com/matttproud/golang_protobuf_extensions" packages = ["pbutil"] + pruneopts = "UT" revision = "c12348ce28de40eed0136aa2b644d0ee0650e56c" version = "v1.0.1" [[projects]] branch = "master" + digest = "1:bad8c08367e93a0a30d1090ac9bc98e7157df162d21b638cc72d28d331806f8c" name = "github.com/mendsley/gojwk" packages = ["."] + pruneopts = "UT" revision = "4d5ec6e58103388d6cb0d7d72bc72649be4f0504" [[projects]] branch = "master" + digest = "1:445210ef1e723060b3ebeb691648b5090f154992ee35f7065b70a6a5a96a3bb8" name = "github.com/minio/sha256-simd" packages = ["."] - revision = "ad98a36ba0da87206e3378c556abbfeaeaa98668" + pruneopts = "UT" + revision = "51976451ce1942acbb55707a983ed232fa027110" [[projects]] + digest = "1:40e195917a951a8bf867cd05de2a46aaf1806c50cf92eebf4c16f78cd196f747" name = "github.com/pkg/errors" packages = ["."] + pruneopts = "UT" revision = "645ef00459ed84a119197bfb8d8205042c6df63d" version = "v0.8.0" [[projects]] + digest = "1:0028cb19b2e4c3112225cd871870f2d9cf49b9b4276531f03438a88e94be86fe" name = "github.com/pmezard/go-difflib" packages = ["difflib"] + pruneopts = "UT" revision = "792786c7400a136282c1664665ae0a8db921c6c2" version = "v1.0.0" [[projects]] + digest = "1:b6221ec0f8903b556e127c449e7106b63e6867170c2d10a7c058623d086f2081" name = "github.com/prometheus/client_golang" packages = ["prometheus"] + pruneopts = "UT" revision = "c5b7fccd204277076155f10851dad72b76a49317" version = "v0.8.0" [[projects]] branch = "master" + digest = "1:2d5cd61daa5565187e1d96bae64dbbc6080dacf741448e9629c64fd93203b0d4" name = "github.com/prometheus/client_model" packages = ["go"] + pruneopts = "UT" revision 
= "5c3871d89910bfb32f5fcab2aa4b9ec68e65a99f" [[projects]] branch = "master" + digest = "1:33c81bf709f084827a560e14415b1d7001a6d1d6fa4bec2cc2e60b84ecbc0e0a" name = "github.com/prometheus/common" packages = [ "expfmt", "internal/bitbucket.org/ww/goautoneg", - "model" + "model", ] - revision = "c7de2306084e37d54b8be01f3541a8464345e9a5" + pruneopts = "UT" + revision = "67670fe90761d7ff18ec1d640135e53b9198328f" [[projects]] branch = "master" + digest = "1:08eb8b60450efe841e37512d66ce366a87d187505d7c67b99307a6c1803483a2" name = "github.com/prometheus/procfs" packages = [ ".", "internal/util", "nfs", - "xfs" + "xfs", ] - revision = "05ee40e3a273f7245e8777337fc7b46e533a9a92" + pruneopts = "UT" + revision = "14fa7590c24d4615893b68e22fce3b3489689f65" [[projects]] + digest = "1:69b1cc331fca23d702bd72f860c6a647afd0aa9fcbc1d0659b1365e26546dd70" name = "github.com/sirupsen/logrus" packages = ["."] - revision = "3e01752db0189b9157070a0e1668a620f9a85da2" - version = "v1.0.6" + pruneopts = "UT" + revision = "bcd833dfe83d3cebad139e4a29ed79cb2318bf95" + version = "v1.2.0" [[projects]] + digest = "1:645cabccbb4fa8aab25a956cbcbdf6a6845ca736b2c64e197ca7cbb9d210b939" name = "github.com/spf13/cobra" packages = ["."] + pruneopts = "UT" revision = "ef82de70bb3f60c65fb8eebacbb2d122ef517385" version = "v0.0.3" [[projects]] + digest = "1:c1b1102241e7f645bc8e0c22ae352e8f0dc6484b6cb4d132fa9f24174e0119e2" name = "github.com/spf13/pflag" packages = ["."] - revision = "9a97c102cda95a86cec2345a6f09f55a939babf5" - version = "v1.0.2" + pruneopts = "UT" + revision = "298182f68c66c05229eb03ac171abe6e309ee79a" + version = "v1.0.3" [[projects]] + digest = "1:c40d65817cdd41fac9aa7af8bed56927bb2d6d47e4fea566a74880f5c2b1c41e" name = "github.com/stretchr/testify" packages = [ "assert", - "require" + "require", ] + pruneopts = "UT" revision = "f35b8ab0b5a2cef36673838d662e249dd9c94686" version = "v1.2.2" [[projects]] branch = "master" - name = "golang.org/x/crypto" - packages = ["ssh/terminal"] - revision = "182538f80094b6a8efaade63a8fd8e0d9d5843dd" + digest = "1:7d2aad811396da92f23df2890ed55cd8b5ccd2ecfd77ea34fc9e9f9c3a207d22" + name = "github.com/yuin/gopher-lua" + packages = [ + ".", + "ast", + "parse", + "pm", + ] + pruneopts = "UT" + revision = "1e6e6e1918e02ddf02e667e88e8aa756942448c5" [[projects]] branch = "master" + digest = "1:38f553aff0273ad6f367cb0a0f8b6eecbaef8dc6cb8b50e57b6a81c1d5b1e332" + name = "golang.org/x/crypto" + packages = ["ssh/terminal"] + pruneopts = "UT" + revision = "8d7daa0c54b357f3071e11eaef7efc4e19a417e2" + +[[projects]] + branch = "master" + digest = "1:191cccd950a4aeadb60306062f2bdc2f924d750d0156ec6c691b17211bfd7349" name = "golang.org/x/sys" packages = [ "unix", - "windows" + "windows", ] - revision = "49385e6e15226593f68b26af201feec29d5bba22" + pruneopts = "UT" + revision = "82a175fd1598e8a172e58ebdf5ed262bb29129e5" [[projects]] + digest = "1:51fd139389bbbd2c9e0eb4e139c3719c8dabf4e6679f4f9f9916e03bb55306e2" + name = "gopkg.in/redsync.v1" + packages = ["."] + pruneopts = "UT" + revision = "89538344de92e78df4b7eeeceec1f3cf2d7c0aeb" + version = "v1.1.1" + +[[projects]] + digest = "1:4d2e5a73dc1500038e504a8d78b986630e3626dc027bc030ba5c75da257cdb96" name = "gopkg.in/yaml.v2" packages = ["."] - revision = "5420a8b6744d3b0345ab293f6fcba19c978f1183" - version = "v2.2.1" + pruneopts = "UT" + revision = "51d6538a90f86fe93ac480b35f37b2be17fef232" + version = "v2.2.2" [solve-meta] analyzer-name = "dep" analyzer-version = 1 - inputs-digest = "073ea42bedc8b51977e04bb4e86deb764315fa1b904e1da4cf48e2247a056caf" + 
input-imports = [ + "github.com/SermoDigital/jose/crypto", + "github.com/SermoDigital/jose/jws", + "github.com/SermoDigital/jose/jwt", + "github.com/alicebob/miniredis", + "github.com/anacrolix/torrent/tracker", + "github.com/gomodule/redigo/redis", + "github.com/julienschmidt/httprouter", + "github.com/mendsley/gojwk", + "github.com/minio/sha256-simd", + "github.com/pkg/errors", + "github.com/prometheus/client_golang/prometheus", + "github.com/sirupsen/logrus", + "github.com/spf13/cobra", + "github.com/stretchr/testify/require", + "gopkg.in/redsync.v1", + "gopkg.in/yaml.v2", + ] solver-name = "gps-cdcl" solver-version = 1 diff --git a/Gopkg.toml b/Gopkg.toml index 8f59b1f..467d09a 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -69,6 +69,18 @@ name = "gopkg.in/yaml.v2" version = "2.2.1" +[[constraint]] + name = "gopkg.in/redsync.v1" + version = "1.1.1" + +[[constraint]] + name = "github.com/gomodule/redigo" + version = "2.0.0" + +[[constraint]] + name = "github.com/alicebob/miniredis" + version = "2.4.5" + [prune] go-tests = true unused-packages = true From f2ab706f10701603e8b414f970e27d5f4ce6762c Mon Sep 17 00:00:00 2001 From: onestraw Date: Wed, 16 Jan 2019 16:45:04 +0800 Subject: [PATCH 4/9] fix hlen/hdel race condition Change-Id: Ib82e6e9f0c66d2df80d68fd00e0c6ec6b46a037b --- storage/redis/peer_store.go | 31 +++++++++++++++++++++---------- 1 file changed, 21 insertions(+), 10 deletions(-) diff --git a/storage/redis/peer_store.go b/storage/redis/peer_store.go index fde14c8..3db858b 100644 --- a/storage/redis/peer_store.go +++ b/storage/redis/peer_store.go @@ -727,23 +727,34 @@ func (ps *peerStore) collectGarbage(cutoff time.Time) error { return err } + // use WATCH to avoid race condition + // https://redis.io/topics/transactions + _, err = conn.Do("WATCH", ihStr) + if err != nil { + return err + } ihLen, err := conn.Do("HLEN", ihStr) if err != nil { return err } if ihLen.(int64) == 0 { - _, err := conn.Do("DEL", ihStr) + // Empty hashes are not shown among existing keys, + // in other words, it's removed automatically after `HDEL` the last field. + //_, err := conn.Do("DEL", ihStr) + + conn.Send("MULTI") + conn.Send("HDEL", group, ihStr) + conn.Send("DECR", ps.infohashCountKey(group)) + _, err = redis.Values(conn.Do("EXEC")) if err != nil { - return err - } - log.Debug("storage: deleting infohash", log.Fields{ - "Group": group, - "Hkey": ihStr, - }) - _, err = conn.Do("HDEL", group, ihStr) - if err != nil { - return err + log.Error("storage: Redis EXEC failure, maybe caused by WATCH, ignored", log.Fields{ + "group": group, + "infohash": ihStr, + "error": err, + }) } + } else { + conn.Do("UNWATCH", ihStr) } } } From fa19ffd0509ccf41f9edb09836f4710105284239 Mon Sep 17 00:00:00 2001 From: onestraw Date: Sun, 20 Jan 2019 17:02:05 +0800 Subject: [PATCH 5/9] add @mrd0ll4r 's comments Change-Id: I53616703394f889fa2d0a4e952ac857d99c85218 --- storage/redis/peer_store.go | 44 ++++++++++++++++++++++++++++++++++++- 1 file changed, 43 insertions(+), 1 deletion(-) diff --git a/storage/redis/peer_store.go b/storage/redis/peer_store.go index 3db858b..e07f43a 100644 --- a/storage/redis/peer_store.go +++ b/storage/redis/peer_store.go @@ -662,6 +662,46 @@ func (ps *peerStore) ScrapeSwarm(ih bittorrent.InfoHash, af bittorrent.AddressFa // // This function must be able to execute while other methods on this interface // are being executed in parallel. +// +// - The Delete(Seeder|Leecher) and GraduateLeecher methods never delete an +// infohash key from an addressFamily hash. 
They also never decrement the +// infohash counter. +// - The Put(Seeder|Leecher) and GraduateLeecher methods only ever add infohash +// keys to addressFamily hashes and increment the infohash counter. +// - The only method that deletes from the addressFamily hashes is +// collectGarbage, which also decrements the counters. That means that, +// even if a Delete(Seeder|Leecher) call removes the last peer from a swarm, +// the infohash counter is not changed and the infohash is left in the +// addressFamily hash until it is cleaned up by collectGarbage. +// - collectGarbage must run regularly. +// - A WATCH ... MULTI ... EXEC block fails if, between the WATCH and the EXEC, +// any of the watched keys have changed. The location of the MULTI doesn't +// matter. +// +// We have to analyze four cases to prove our algorithm works. I'll characterize +// them by a tuple (number of peers in a swarm before WATCH, number of peers in +// the swarm during the transaction). +// +// 1. (0,0), the easy case: The swarm is empty, we watch the key, we execute +// HLEN and find it empty. We remove it and decrement the counter. It stays +// empty the entire time, the transaction goes through. +// 2. (1,n > 0): The swarm is not empty, we watch the key, we find it non-empty, +// we unwatch the key. All good. No transaction is made, no transaction fails. +// 3. (0,1): We have to analyze this in two ways. +// - If the change happens before the HLEN call, we will see that the swarm is +// not empty and start no transaction. +// - If the change happens after the HLEN, we will attempt a transaction and it +// will fail. This is okay, the swarm is not empty, we will try cleaning it up +// the next time collectGarbage runs. +// 4. (1,0): Again, two ways: +// - If the change happens before the HLEN, we will see an empty swarm. This +// situation happens if a call to Delete(Seeder|Leecher) removed the last +// peer asynchronously. We will attempt a transaction, but the transaction +// will fail. This is okay, the infohash key will remain in the addressFamily +// hash, and we will attempt to clean it up the next time collectGarbage runs. +// - If the change happens after the HLEN, we will not even attempt to make the +// transaction. The infohash key will remain in the addressFamily hash and +// we'll attempt to clean it up the next time collectGarbage runs. 
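To make the four-case analysis above concrete, here is a minimal, self-contained sketch of the WATCH/HLEN/MULTI/EXEC pattern it describes, written against the same gomodule/redigo client this patch series uses. This is an editorial illustration rather than code from the patches: the package name, the function `cleanupIfEmpty`, and its parameters are hypothetical, and a real caller would take `conn` from the connection pool introduced earlier in the series.

```go
// Package sketch is illustrative only; it is not part of the patch series.
package sketch

import (
	"log"

	"github.com/gomodule/redigo/redis"
)

// cleanupIfEmpty removes the swarm hash ihKey from the addressFamily hash
// group and decrements countKey, but only if ihKey is still empty when EXEC
// runs. The key names are hypothetical stand-ins for the tracker's real keys.
func cleanupIfEmpty(conn redis.Conn, group, ihKey, countKey string) error {
	// Watch the swarm hash so that a concurrent write to it between now and
	// EXEC invalidates the queued transaction (cases 3b and 4a above).
	if _, err := conn.Do("WATCH", ihKey); err != nil {
		return err
	}

	n, err := redis.Int64(conn.Do("HLEN", ihKey))
	if err != nil {
		return err
	}
	if n != 0 {
		// Cases 2, 3a and 4b: the swarm is not empty, abandon the cleanup.
		_, err = conn.Do("UNWATCH")
		return err
	}

	// Cases 1, 3b and 4a: the swarm looks empty; queue the cleanup and let
	// EXEC decide whether the watched key changed in the meantime.
	conn.Send("MULTI")
	conn.Send("HDEL", group, ihKey)
	conn.Send("DECR", countKey)
	if _, err := redis.Values(conn.Do("EXEC")); err != nil {
		if err == redis.ErrNil {
			// EXEC was aborted by WATCH: the swarm changed again, so leave
			// the key for the next collectGarbage pass.
			log.Printf("cleanup of %q aborted by WATCH; retrying next GC pass", ihKey)
			return nil
		}
		return err
	}
	return nil
}
```

Either outcome is safe: an aborted EXEC simply leaves the infohash key for the next garbage-collection pass, which is why a later patch in this series treats `redis.ErrNil` returned from EXEC as a non-error.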
func (ps *peerStore) collectGarbage(cutoff time.Time) error { select { case <-ps.closed: @@ -744,7 +784,9 @@ func (ps *peerStore) collectGarbage(cutoff time.Time) error { conn.Send("MULTI") conn.Send("HDEL", group, ihStr) - conn.Send("DECR", ps.infohashCountKey(group)) + if isSeeder { + conn.Send("DECR", ps.infohashCountKey(group)) + } _, err = redis.Values(conn.Do("EXEC")) if err != nil { log.Error("storage: Redis EXEC failure, maybe caused by WATCH, ignored", log.Fields{ From 9d22b67f74093e8e3df0983933c83c7fea53f486 Mon Sep 17 00:00:00 2001 From: onestraw Date: Mon, 21 Jan 2019 13:24:43 +0800 Subject: [PATCH 6/9] storage/redis: use redis reply Change-Id: If6e6c2545b12c249413d3d13ea41e127b8d1d9b0 --- storage/redis/peer_store.go | 90 +++++++++++++++++++------------------ 1 file changed, 46 insertions(+), 44 deletions(-) diff --git a/storage/redis/peer_store.go b/storage/redis/peer_store.go index e07f43a..60564a2 100644 --- a/storage/redis/peer_store.go +++ b/storage/redis/peer_store.go @@ -202,7 +202,9 @@ func New(provided Config) (storage.PeerStore, error) { case <-time.After(cfg.GarbageCollectionInterval): before := time.Now().Add(-cfg.PeerLifetime) log.Debug("storage: purging peers with no announces since", log.Fields{"before": before}) - ps.collectGarbage(before) + if err = ps.collectGarbage(before); err != nil { + log.Error("storage: collectGarbage error", log.Fields{"before": before, "error": err}) + } } } }() @@ -298,29 +300,29 @@ func (ps *peerStore) populateProm() { defer conn.Close() for _, group := range ps.groups() { - if n, err := conn.Do("GET", ps.infohashCountKey(group)); err != nil { + if n, err := redis.Int64(conn.Do("GET", ps.infohashCountKey(group))); err != nil && err != redis.ErrNil { log.Error("storage: GET counter failure", log.Fields{ "key": ps.infohashCountKey(group), "error": err, }) } else { - numInfohashes += n.(int64) + numInfohashes += n } - if n, err := conn.Do("GET", ps.seederCountKey(group)); err != nil { + if n, err := redis.Int64(conn.Do("GET", ps.seederCountKey(group))); err != nil && err != redis.ErrNil { log.Error("storage: GET counter failure", log.Fields{ "key": ps.seederCountKey(group), "error": err, }) } else { - numSeeders += n.(int64) + numSeeders += n } - if n, err := conn.Do("GET", ps.leecherCountKey(group)); err != nil { + if n, err := redis.Int64(conn.Do("GET", ps.leecherCountKey(group))); err != nil && err != redis.ErrNil { log.Error("storage: GET counter failure", log.Fields{ "key": ps.leecherCountKey(group), "error": err, }) } else { - numLeechers += n.(int64) + numLeechers += n } } @@ -357,20 +359,20 @@ func (ps *peerStore) PutSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error conn.Send("MULTI") conn.Send("HSET", encodedSeederInfoHash, pk, ct) conn.Send("HSET", addressFamily, encodedSeederInfoHash, ct) - reply, err := redis.Values(conn.Do("EXEC")) + reply, err := redis.Int64s(conn.Do("EXEC")) if err != nil { return err } // pk is a new field. - if reply[0].(int64) == 1 { + if reply[0] == 1 { _, err = conn.Do("INCR", ps.seederCountKey(addressFamily)) if err != nil { return err } } // encodedSeederInfoHash is a new field. 
- if reply[1].(int64) == 1 { + if reply[1] == 1 { _, err = conn.Do("INCR", ps.infohashCountKey(addressFamily)) if err != nil { return err @@ -400,11 +402,11 @@ func (ps *peerStore) DeleteSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) err encodedSeederInfoHash := ps.seederInfohashKey(addressFamily, ih.String()) - delNum, err := conn.Do("HDEL", encodedSeederInfoHash, pk) + delNum, err := redis.Int64(conn.Do("HDEL", encodedSeederInfoHash, pk)) if err != nil { return err } - if delNum.(int64) == 0 { + if delNum == 0 { return storage.ErrResourceDoesNotExist } if _, err := conn.Do("DECR", ps.seederCountKey(addressFamily)); err != nil { @@ -438,12 +440,12 @@ func (ps *peerStore) PutLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error conn.Send("MULTI") conn.Send("HSET", encodedLeecherInfoHash, pk, ct) conn.Send("HSET", addressFamily, encodedLeecherInfoHash, ct) - reply, err := redis.Values(conn.Do("EXEC")) + reply, err := redis.Int64s(conn.Do("EXEC")) if err != nil { return err } // pk is a new field. - if reply[0].(int64) == 1 { + if reply[0] == 1 { _, err = conn.Do("INCR", ps.leecherCountKey(addressFamily)) if err != nil { return err @@ -471,11 +473,11 @@ func (ps *peerStore) DeleteLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) er pk := newPeerKey(p) encodedLeecherInfoHash := ps.leecherInfohashKey(addressFamily, ih.String()) - delNum, err := conn.Do("HDEL", encodedLeecherInfoHash, pk) + delNum, err := redis.Int64(conn.Do("HDEL", encodedLeecherInfoHash, pk)) if err != nil { return err } - if delNum.(int64) == 0 { + if delNum == 0 { return storage.ErrResourceDoesNotExist } if _, err := conn.Do("DECR", ps.leecherCountKey(addressFamily)); err != nil { @@ -511,23 +513,23 @@ func (ps *peerStore) GraduateLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) conn.Send("HDEL", encodedLeecherInfoHash, pk) conn.Send("HSET", encodedSeederInfoHash, pk, ct) conn.Send("HSET", addressFamily, encodedSeederInfoHash, ct) - reply, err := redis.Values(conn.Do("EXEC")) + reply, err := redis.Int64s(conn.Do("EXEC")) if err != nil { return err } - if reply[0].(int64) == 1 { + if reply[0] == 1 { _, err = conn.Do("DECR", ps.leecherCountKey(addressFamily)) if err != nil { return err } } - if reply[1].(int64) == 1 { + if reply[1] == 1 { _, err = conn.Do("INCR", ps.seederCountKey(addressFamily)) if err != nil { return err } } - if reply[2].(int64) == 1 { + if reply[2] == 1 { _, err = conn.Do("INCR", ps.infohashCountKey(addressFamily)) if err != nil { return err @@ -633,7 +635,7 @@ func (ps *peerStore) ScrapeSwarm(ih bittorrent.InfoHash, af bittorrent.AddressFa conn := ps.rb.open() defer conn.Close() - leechersLen, err := conn.Do("HLEN", encodedLeecherInfoHash) + leechersLen, err := redis.Int64(conn.Do("HLEN", encodedLeecherInfoHash)) if err != nil { log.Error("storage: Redis HLEN failure", log.Fields{ "Hkey": encodedLeecherInfoHash, @@ -642,7 +644,7 @@ func (ps *peerStore) ScrapeSwarm(ih bittorrent.InfoHash, af bittorrent.AddressFa return } - seedersLen, err := conn.Do("HLEN", encodedSeederInfoHash) + seedersLen, err := redis.Int64(conn.Do("HLEN", encodedSeederInfoHash)) if err != nil { log.Error("storage: Redis HLEN failure", log.Fields{ "Hkey": encodedSeederInfoHash, @@ -651,8 +653,8 @@ func (ps *peerStore) ScrapeSwarm(ih bittorrent.InfoHash, af bittorrent.AddressFa return } - resp.Incomplete = uint32(leechersLen.(int64)) - resp.Complete = uint32(seedersLen.(int64)) + resp.Incomplete = uint32(leechersLen) + resp.Complete = uint32(seedersLen) return } @@ -717,45 +719,41 @@ func (ps *peerStore) collectGarbage(cutoff 
time.Time) error { for _, group := range ps.groups() { // list all infohashes in the group - infohashesList, err := conn.Do("HKEYS", group) + infohashesList, err := redis.Strings(conn.Do("HKEYS", group)) if err != nil { return err } - infohashes := infohashesList.([]interface{}) - for _, ih := range infohashes { - ihStr := string(ih.([]byte)) + for _, ihStr := range infohashesList { isSeeder := len(ihStr) > 5 && ihStr[5:6] == "S" // list all (peer, timeout) pairs for the ih - ihList, err := conn.Do("HGETALL", ihStr) + ihList, err := redis.Strings(conn.Do("HGETALL", ihStr)) if err != nil { return err } - conIhList := ihList.([]interface{}) var pk serializedPeer var removedPeerCount int64 - for index, ihField := range conIhList { + for index, ihField := range ihList { if index%2 == 1 { // value - mtime, err := strconv.ParseInt(string(ihField.([]byte)), 10, 64) + mtime, err := strconv.ParseInt(ihField, 10, 64) if err != nil { return err } if mtime <= cutoffUnix { + log.Debug("storage: deleting peer", log.Fields{ + "Peer": decodePeerKey(pk).String(), + }) ret, err := redis.Int64(conn.Do("HDEL", ihStr, pk)) if err != nil { return err } removedPeerCount += ret - - log.Debug("storage: deleting peer", log.Fields{ - "Peer": decodePeerKey(pk).String(), - }) } } else { // key - pk = serializedPeer(ihField.([]byte)) + pk = serializedPeer([]byte(ihField)) } } // DECR seeder/leecher counter @@ -763,8 +761,10 @@ func (ps *peerStore) collectGarbage(cutoff time.Time) error { if isSeeder { decrCounter = ps.seederCountKey(group) } - if _, err := conn.Do("DECRBY", decrCounter, removedPeerCount); err != nil { - return err + if removedPeerCount > 0 { + if _, err := conn.Do("DECRBY", decrCounter, removedPeerCount); err != nil { + return err + } } // use WATCH to avoid race condition @@ -773,11 +773,11 @@ func (ps *peerStore) collectGarbage(cutoff time.Time) error { if err != nil { return err } - ihLen, err := conn.Do("HLEN", ihStr) + ihLen, err := redis.Int64(conn.Do("HLEN", ihStr)) if err != nil { return err } - if ihLen.(int64) == 0 { + if ihLen == 0 { // Empty hashes are not shown among existing keys, // in other words, it's removed automatically after `HDEL` the last field. //_, err := conn.Do("DEL", ihStr) @@ -788,15 +788,17 @@ func (ps *peerStore) collectGarbage(cutoff time.Time) error { conn.Send("DECR", ps.infohashCountKey(group)) } _, err = redis.Values(conn.Do("EXEC")) - if err != nil { - log.Error("storage: Redis EXEC failure, maybe caused by WATCH, ignored", log.Fields{ + if err != nil && err != redis.ErrNil { + log.Error("storage: Redis EXEC failure", log.Fields{ "group": group, "infohash": ihStr, "error": err, }) } } else { - conn.Do("UNWATCH", ihStr) + if _, err = conn.Do("UNWATCH"); err != nil && err != redis.ErrNil { + log.Error("storage: Redis UNWATCH failure", log.Fields{"error": err}) + } } } } From 7943288678954ef28de69ff6695e9d6b8133c5b2 Mon Sep 17 00:00:00 2001 From: onestraw Date: Tue, 22 Jan 2019 18:24:29 +0800 Subject: [PATCH 7/9] Add storage redis configuration example Change-Id: I908c69d50fab0963f508f3688ec5934a25aa0550 --- example_config.yaml | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/example_config.yaml b/example_config.yaml index 01e2519..d9eb484 100644 --- a/example_config.yaml +++ b/example_config.yaml @@ -127,6 +127,32 @@ chihaya: # are collected and posted to Prometheus. prometheus_reporting_interval: 1s + # This block defines configuration used for redis storage. 
+ # storage: + # name: redis + # config: + # # The frequency at which stale peers are removed. + # gc_interval: 14m + + # # The frequency at which metrics are pushed into a local Prometheus endpoint. + # prometheus_reporting_interval: 1s + + # # The amount of time until a peer is considered stale. + # # To avoid churn, keep this slightly larger than `announce_interval`. + # peer_lifetime: 16m + + # # The address of the redis storage backend. + # redis_broker: "redis://pwd@127.0.0.1:6379/0" + + # # The timeout for reading a command reply from redis. + # redis_read_timeout: 15s + + # # The timeout for writing a command to redis. + # redis_write_timeout: 15s + + # # The timeout for connecting to the redis server. + # redis_connect_timeout: 15s + # This block defines configuration used for middleware executed before a # response has been returned to a BitTorrent client. prehooks: From e83f68b952d3724429dbc70078aa1ce5de5481f9 Mon Sep 17 00:00:00 2001 From: onestraw Date: Wed, 23 Jan 2019 13:10:37 +0800 Subject: [PATCH 8/9] storage/redis: add exiting info Change-Id: I7876bf420a35b48314d14925f5f2ae591fa2d243 --- storage/redis/peer_store.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/storage/redis/peer_store.go b/storage/redis/peer_store.go index 60564a2..59c2413 100644 --- a/storage/redis/peer_store.go +++ b/storage/redis/peer_store.go @@ -815,9 +815,8 @@ func (ps *peerStore) Stop() stop.Result { go func() { close(ps.closed) ps.wg.Wait() - // chihaya does not clear data in redis when exiting. - // chihaya keys have prefix `IPv{4,6}_`. - close(c) + log.Info("storage: exiting. chihaya does not clear data in redis when exiting. chihaya keys have prefix 'IPv{4,6}_'.") + c.Done() }() return c.Result() From 36e0204a8f5ed9e8bcb349dff249f4f9fc9b70c8 Mon Sep 17 00:00:00 2001 From: onestraw Date: Fri, 1 Feb 2019 11:14:51 +0800 Subject: [PATCH 9/9] Clarification of Redis storage HA Change-Id: I6674421d3afdbfab6acd97851cb062341b88a90e --- docs/storage/redis.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/docs/storage/redis.md b/docs/storage/redis.md index 552939c..25c70bc 100644 --- a/docs/storage/redis.md +++ b/docs/storage/redis.md @@ -1,6 +1,8 @@ # Redis Storage -This storage system separates chihaya from storage and stores all peer data in Redis to achieve HA. +This storage system separates the chihaya server from the storage service: the chihaya server achieves HA by keeping all peer data in Redis, while the storage service achieves HA by running as a cluster. If one chihaya node goes down, peer data is still available in the Redis service. + +HA of the storage service itself is a separate topic and is not covered here. If the Redis service runs as a single node, peer data becomes unavailable whenever that node is down, so you should set up a Redis cluster for the chihaya server in production. ## Use Case
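One concrete use case (an illustrative sketch only; the address below is a placeholder and the deployment shape is an assumption, not something specified by the patches): run several chihaya instances behind a load balancer, each configured with the same `redis_broker`, so that any single tracker instance can be restarted or lost without losing peer data. A per-node configuration might look like the following, mirroring the commented-out block added to `example_config.yaml` in this series:

```yaml
# Hypothetical per-node configuration: every chihaya instance points at the
# same Redis (or Redis cluster) endpoint as its storage backend.
chihaya:
  storage:
    name: redis
    config:
      gc_interval: 14m
      peer_lifetime: 16m
      prometheus_reporting_interval: 1s
      redis_broker: "redis://pwd@10.0.0.10:6379/0"  # placeholder address
      redis_read_timeout: 15s
      redis_write_timeout: 15s
      redis_connect_timeout: 15s
```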