package cluster import ( "io/ioutil" baselog "log" "sort" "time" "github.com/lbryio/lbry.go/v2/extras/crypto" "github.com/lbryio/lbry.go/v2/extras/errors" "github.com/lbryio/lbry.go/v2/extras/stop" "github.com/hashicorp/serf/serf" log "github.com/sirupsen/logrus" ) const ( DefaultPort = 17946 MembershipChangeBufferWindow = 1 * time.Second ) // Cluster maintains cluster membership and notifies on certain events type Cluster struct { OnMembershipChange func(n, total int) name string port int seedAddr string s *serf.Serf eventCh chan serf.Event stop *stop.Group } // New returns a new Cluster instance that is not connected. func New(port int, seedAddr string) *Cluster { return &Cluster{ name: crypto.RandString(12), port: port, seedAddr: seedAddr, stop: stop.New(), } } // Connect Initializes the Cluster based on a configuration passed via the New function. It then stores the seed // address, starts gossiping and listens for gossip. func (c *Cluster) Connect() error { var err error conf := serf.DefaultConfig() conf.MemberlistConfig.BindPort = c.port conf.MemberlistConfig.AdvertisePort = c.port conf.NodeName = c.name nullLogger := baselog.New(ioutil.Discard, "", 0) conf.Logger = nullLogger c.eventCh = make(chan serf.Event) conf.EventCh = c.eventCh c.s, err = serf.Create(conf) if err != nil { return errors.Prefix("couldn't create cluster", err) } if c.seedAddr != "" { _, err = c.s.Join([]string{c.seedAddr}, true) if err != nil { return err } } c.stop.Add(1) go func() { c.listen() c.stop.Done() }() log.Debugf("cluster started") return nil } // Shutdown safely shuts down the cluster. func (c *Cluster) Shutdown() { log.Debug("shutting down cluster...") c.stop.StopAndWait() err := c.s.Leave() if err != nil { log.Error(errors.Prefix("shutting down cluster", err)) } log.Debugf("cluster stopped") } func (c *Cluster) listen() { var timerCh <-chan time.Time timer := time.NewTimer(0) for { select { case <-c.stop.Ch(): return case event := <-c.eventCh: switch event.EventType() { case serf.EventMemberJoin, serf.EventMemberFailed, serf.EventMemberLeave: // // ignore event from my own joining of the cluster //memberEvent := event.(serf.MemberEvent) //if event.EventType() == serf.EventMemberJoin && len(memberEvent.Members) == 1 && memberEvent.Members[0].Name == c.name { // continue //} if timerCh == nil { timer.Reset(MembershipChangeBufferWindow) timerCh = timer.C } } case <-timerCh: if c.OnMembershipChange != nil { alive := getAliveMembers(c.s.Members()) c.OnMembershipChange(getHashInterval(c.name, alive), len(alive)) } timerCh = nil } } } func getHashInterval(myName string, members []serf.Member) int { var names []string for _, m := range members { names = append(names, m.Name) } sort.Strings(names) i := 1 for _, n := range names { if n == myName { return i } i++ } return -1 } func getAliveMembers(members []serf.Member) []serf.Member { var alive []serf.Member for _, m := range members { if m.Status == serf.StatusAlive { alive = append(alive, m) } } return alive }