use serf to track cluster members, update hash range on membership change. closes lbryio/reflector-cluster#47
This commit is contained in:
parent
0d458aefc3
commit
d167213d54
5 changed files with 170 additions and 6 deletions
30
cluster/cluster.go
Normal file
30
cluster/cluster.go
Normal file
|
@ -0,0 +1,30 @@
|
|||
package cluster
|
||||
|
||||
import (
|
||||
"github.com/lbryio/errors.go"
|
||||
|
||||
"github.com/hashicorp/serf/serf"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
func Connect(nodeName, addr string, port int) (*serf.Serf, <-chan serf.Event, error) {
|
||||
conf := serf.DefaultConfig()
|
||||
conf.MemberlistConfig.BindPort = port
|
||||
conf.MemberlistConfig.AdvertisePort = port
|
||||
conf.NodeName = nodeName
|
||||
|
||||
eventCh := make(chan serf.Event)
|
||||
conf.EventCh = eventCh
|
||||
|
||||
cluster, err := serf.Create(conf)
|
||||
if err != nil {
|
||||
return nil, nil, errors.Prefix("couldn't create cluster", err)
|
||||
}
|
||||
|
||||
_, err = cluster.Join([]string{addr}, true)
|
||||
if err != nil {
|
||||
log.Warnf("couldn't join cluster, starting own: %v\n", err)
|
||||
}
|
||||
|
||||
return cluster, eventCh, nil
|
||||
}
|
122
cmd/cluster.go
Normal file
122
cmd/cluster.go
Normal file
|
@ -0,0 +1,122 @@
|
|||
package cmd
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
"os"
|
||||
"os/signal"
|
||||
"sort"
|
||||
"strconv"
|
||||
"sync"
|
||||
"syscall"
|
||||
|
||||
"github.com/hashicorp/serf/serf"
|
||||
"github.com/lbryio/internal-apis/app/crypto"
|
||||
"github.com/lbryio/reflector.go/cluster"
|
||||
|
||||
"github.com/davecgh/go-spew/spew"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
|
||||
const (
|
||||
clusterStart = "start"
|
||||
clusterJoin = "join"
|
||||
|
||||
clusterPort = 17946
|
||||
)
|
||||
|
||||
func init() {
|
||||
var cmd = &cobra.Command{
|
||||
Use: "cluster [start|join]",
|
||||
Short: "Connect to cluster",
|
||||
ValidArgs: []string{clusterStart, clusterJoin},
|
||||
Args: argFuncs(cobra.ExactArgs(1), cobra.OnlyValidArgs),
|
||||
Run: clusterCmd,
|
||||
}
|
||||
RootCmd.AddCommand(cmd)
|
||||
}
|
||||
|
||||
func clusterCmd(cmd *cobra.Command, args []string) {
|
||||
var c *serf.Serf
|
||||
var eventCh <-chan serf.Event
|
||||
var err error
|
||||
|
||||
nodeName := crypto.RandString(12)
|
||||
clusterAddr := "127.0.0.1:" + strconv.Itoa(clusterPort)
|
||||
if args[0] == clusterStart {
|
||||
c, eventCh, err = cluster.Connect(nodeName, clusterAddr, clusterPort)
|
||||
} else {
|
||||
c, eventCh, err = cluster.Connect(nodeName, clusterAddr, clusterPort+1+rand.Intn(1000))
|
||||
}
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
defer c.Leave()
|
||||
|
||||
shutdownCh := make(chan struct{})
|
||||
var shutdownWg sync.WaitGroup
|
||||
|
||||
shutdownWg.Add(1)
|
||||
go func() {
|
||||
defer shutdownWg.Done()
|
||||
for {
|
||||
select {
|
||||
case event := <-eventCh:
|
||||
spew.Dump(event)
|
||||
switch event.EventType() {
|
||||
case serf.EventMemberJoin, serf.EventMemberFailed, serf.EventMemberLeave:
|
||||
memberEvent := event.(serf.MemberEvent)
|
||||
if event.EventType() == serf.EventMemberJoin && len(memberEvent.Members) == 1 && memberEvent.Members[0].Name == nodeName {
|
||||
// ignore event from my own joining of the cluster
|
||||
} else {
|
||||
spew.Dump(c.Members())
|
||||
log.Printf("my hash range is now %d\n", getHashRangeStart(nodeName, getAliveMembers(c.Members())))
|
||||
// figure out my new hash range based on the start and the number of alive members
|
||||
// get hashes in that range that need announcing
|
||||
// announce them
|
||||
// if more than one node is announcing each hash, figure out how to deal with last_announced_at so both nodes dont announce the same thing at the same time
|
||||
}
|
||||
}
|
||||
case <-shutdownCh:
|
||||
log.Debugln("shutting down event dumper")
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
interruptChan := make(chan os.Signal, 1)
|
||||
signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM)
|
||||
<-interruptChan
|
||||
log.Debugln("received interrupt")
|
||||
close(shutdownCh)
|
||||
log.Debugln("waiting for threads to finish")
|
||||
shutdownWg.Wait()
|
||||
log.Debugln("shutting down main thread")
|
||||
}
|
||||
|
||||
func getHashRangeStart(myName string, members []serf.Member) int {
|
||||
names := []string{}
|
||||
for _, m := range members {
|
||||
names = append(names, m.Name)
|
||||
}
|
||||
|
||||
sort.Strings(names)
|
||||
i := 1
|
||||
for _, n := range names {
|
||||
if n == myName {
|
||||
return i
|
||||
}
|
||||
i++
|
||||
}
|
||||
return -1
|
||||
}
|
||||
|
||||
func getAliveMembers(members []serf.Member) []serf.Member {
|
||||
alive := []serf.Member{}
|
||||
for _, m := range members {
|
||||
if m.Status == serf.StatusAlive {
|
||||
alive = append(alive, m)
|
||||
}
|
||||
}
|
||||
return alive
|
||||
}
|
|
@ -1,23 +1,23 @@
|
|||
package cmd
|
||||
|
||||
import (
|
||||
"log"
|
||||
"strconv"
|
||||
|
||||
"github.com/lbryio/reflector.go/db"
|
||||
"github.com/lbryio/reflector.go/peer"
|
||||
"github.com/lbryio/reflector.go/store"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
|
||||
func init() {
|
||||
var peerCmd = &cobra.Command{
|
||||
var cmd = &cobra.Command{
|
||||
Use: "peer",
|
||||
Short: "Run peer server",
|
||||
Run: peerCmd,
|
||||
}
|
||||
RootCmd.AddCommand(peerCmd)
|
||||
RootCmd.AddCommand(cmd)
|
||||
}
|
||||
|
||||
func peerCmd(cmd *cobra.Command, args []string) {
|
||||
|
|
|
@ -1,23 +1,23 @@
|
|||
package cmd
|
||||
|
||||
import (
|
||||
"log"
|
||||
"strconv"
|
||||
|
||||
"github.com/lbryio/reflector.go/db"
|
||||
"github.com/lbryio/reflector.go/reflector"
|
||||
"github.com/lbryio/reflector.go/store"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
|
||||
func init() {
|
||||
var reflectorCmd = &cobra.Command{
|
||||
var cmd = &cobra.Command{
|
||||
Use: "reflector",
|
||||
Short: "Run reflector server",
|
||||
Run: reflectorCmd,
|
||||
}
|
||||
RootCmd.AddCommand(reflectorCmd)
|
||||
RootCmd.AddCommand(cmd)
|
||||
}
|
||||
|
||||
func reflectorCmd(cmd *cobra.Command, args []string) {
|
||||
|
|
12
cmd/root.go
12
cmd/root.go
|
@ -40,3 +40,15 @@ func checkErr(err error) {
|
|||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
func argFuncs(funcs ...cobra.PositionalArgs) cobra.PositionalArgs {
|
||||
return func(cmd *cobra.Command, args []string) error {
|
||||
for _, f := range funcs {
|
||||
err := f(cmd, args)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue