Implemented stopper pattern

-Made defer adjustments inline and deleted the separate function.
-Adjusted method in upload to take the only parameter it requires.
-Implemented stopper param for reflector server
-Aligned Cluster New to NewCluster
-Adjusted DHT to use StopAndWait
-Removed blocking waitgroup add
-Unified all components under prism.
-Moved defer done outside of functions.
-Renamed NewCluster to New
-Fixed Travis CI errors.
This commit is contained in:
Mark Beamer Jr 2018-06-06 23:48:07 -04:00 committed by Alex Grintsvayg
parent 8859733101
commit a2a0b27bc4
10 changed files with 151 additions and 102 deletions

View file

@ -66,8 +66,11 @@ func (c *Cluster) Connect() error {
return err return err
} }
} }
c.stop.Add(1)
c.listen() go func() {
defer c.stop.Done()
c.listen()
}()
return nil return nil
} }
@ -80,33 +83,29 @@ func (c *Cluster) Shutdown() {
} }
func (c *Cluster) listen() { func (c *Cluster) listen() {
c.stop.Add(1) for {
go func() { select {
defer c.stop.Done() case <-c.stop.Ch():
for { return
select { case event := <-c.eventCh:
case <-c.stop.Ch(): switch event.EventType() {
return case serf.EventMemberJoin, serf.EventMemberFailed, serf.EventMemberLeave:
case event := <-c.eventCh: memberEvent := event.(serf.MemberEvent)
switch event.EventType() { if event.EventType() == serf.EventMemberJoin && len(memberEvent.Members) == 1 && memberEvent.Members[0].Name == c.name {
case serf.EventMemberJoin, serf.EventMemberFailed, serf.EventMemberLeave: // ignore event from my own joining of the cluster
memberEvent := event.(serf.MemberEvent) continue
if event.EventType() == serf.EventMemberJoin && len(memberEvent.Members) == 1 && memberEvent.Members[0].Name == c.name {
// ignore event from my own joining of the cluster
continue
}
//spew.Dump(c.Members())
alive := getAliveMembers(c.s.Members())
log.Printf("%s: my hash range is now %d of %d\n", c.name, getHashRangeStart(c.name, alive), len(alive))
// figure out my new hash range based on the start and the number of alive members
// get hashes in that range that need announcing
// announce them
// if more than one node is announcing each hash, figure out how to deal with last_announced_at so both nodes dont announce the same thing at the same time
} }
//spew.Dump(c.Members())
alive := getAliveMembers(c.s.Members())
log.Printf("%s: my hash range is now %d of %d\n", c.name, getHashRangeStart(c.name, alive), len(alive))
// figure out my new hash range based on the start and the number of alive members
// get hashes in that range that need announcing
// announce them
// if more than one node is announcing each hash, figure out how to deal with last_announced_at so both nodes dont announce the same thing at the same time
} }
} }
}() }
} }
func getHashRangeStart(myName string, members []serf.Member) int { func getHashRangeStart(myName string, members []serf.Member) int {

View file

@ -16,7 +16,7 @@ import (
func init() { func init() {
var cmd = &cobra.Command{ var cmd = &cobra.Command{
Use: "cluster [start|join]", Use: "cluster [start|join]",
Short: "Connect to cluster", Short: "Start(join) to or Start a new cluster",
ValidArgs: []string{"start", "join"}, ValidArgs: []string{"start", "join"},
Args: argFuncs(cobra.ExactArgs(1), cobra.OnlyValidArgs), Args: argFuncs(cobra.ExactArgs(1), cobra.OnlyValidArgs),
Run: clusterCmd, Run: clusterCmd,

View file

@ -1,7 +1,10 @@
package cmd package cmd
import ( import (
"os"
"os/signal"
"strconv" "strconv"
"syscall"
"github.com/lbryio/reflector.go/db" "github.com/lbryio/reflector.go/db"
"github.com/lbryio/reflector.go/peer" "github.com/lbryio/reflector.go/peer"
@ -27,5 +30,13 @@ func peerCmd(cmd *cobra.Command, args []string) {
s3 := store.NewS3BlobStore(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName) s3 := store.NewS3BlobStore(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName)
combo := store.NewDBBackedS3Store(s3, db) combo := store.NewDBBackedS3Store(s3, db)
log.Fatal(peer.NewServer(combo).ListenAndServe("localhost:" + strconv.Itoa(peer.DefaultPort))) peerServer := peer.NewServer(combo)
if err := peerServer.Start("localhost:" + strconv.Itoa(peer.DefaultPort)); err != nil {
log.Fatal(err)
}
interruptChan := make(chan os.Signal, 1)
signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM)
<-interruptChan
peerServer.Shutdown()
} }

View file

@ -1,13 +1,16 @@
package cmd package cmd
import ( import (
"os"
"os/signal"
"strconv" "strconv"
"syscall"
"github.com/lbryio/reflector.go/db" "github.com/lbryio/reflector.go/db"
"github.com/lbryio/reflector.go/reflector" "github.com/lbryio/reflector.go/reflector"
"github.com/lbryio/reflector.go/store" "github.com/lbryio/reflector.go/store"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/spf13/cobra" "github.com/spf13/cobra"
) )
@ -27,5 +30,13 @@ func reflectorCmd(cmd *cobra.Command, args []string) {
s3 := store.NewS3BlobStore(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName) s3 := store.NewS3BlobStore(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName)
combo := store.NewDBBackedS3Store(s3, db) combo := store.NewDBBackedS3Store(s3, db)
log.Fatal(reflector.NewServer(combo).ListenAndServe("localhost:" + strconv.Itoa(reflector.DefaultPort))) reflectorServer := reflector.NewServer(combo)
if err := reflectorServer.Start("localhost:" + strconv.Itoa(reflector.DefaultPort)); err != nil {
log.Fatal(err)
}
interruptChan := make(chan os.Signal, 1)
signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM)
<-interruptChan
reflectorServer.Shutdown()
} }

View file

@ -24,10 +24,10 @@ func init() {
} }
func startCmd(cmd *cobra.Command, args []string) { func startCmd(cmd *cobra.Command, args []string) {
log.SetLevel(log.DebugLevel)
db := new(db.SQL) db := new(db.SQL)
err := db.Connect(globalConfig.DBConn) err := db.Connect(globalConfig.DBConn)
checkErr(err) checkErr(err)
s3 := store.NewS3BlobStore(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName) s3 := store.NewS3BlobStore(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName)
comboStore := store.NewDBBackedS3Store(s3, db) comboStore := store.NewDBBackedS3Store(s3, db)
@ -37,12 +37,9 @@ func startCmd(cmd *cobra.Command, args []string) {
} }
p := reflector.NewPrism(comboStore, clusterAddr) p := reflector.NewPrism(comboStore, clusterAddr)
err = p.Connect() if err = p.Start(); err != nil {
if err != nil { log.Fatal(err)
log.Error(err)
return
} }
interruptChan := make(chan os.Signal, 1) interruptChan := make(chan os.Signal, 1)
signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM) signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM)
<-interruptChan <-interruptChan

View file

@ -135,6 +135,11 @@ func (dht *DHT) join() {
// now call iterativeFind on yourself // now call iterativeFind on yourself
nf := newContactFinder(dht.node, dht.node.id, false) nf := newContactFinder(dht.node, dht.node.id, false)
// stop if dht is stopped
go func(finder *contactFinder) {
<-dht.stop.Ch()
nf.Cancel()
}(nf)
_, err := nf.Find() _, err := nf.Find()
if err != nil { if err != nil {
log.Errorf("[%s] join: %s", dht.node.id.HexShort(), err.Error()) log.Errorf("[%s] join: %s", dht.node.id.HexShort(), err.Error())
@ -151,14 +156,20 @@ func (dht *DHT) Start() error {
return errors.Err(err) return errors.Err(err)
} }
conn := listener.(*net.UDPConn) conn := listener.(*net.UDPConn)
err = dht.node.Connect(conn) err = dht.node.Connect(conn)
if err != nil { if err != nil {
return err return err
} }
dht.stop.Add(1)
dht.join() go func() {
dht.startReannouncer() defer dht.stop.Done()
dht.join()
}()
dht.stop.Add(1)
go func() {
defer dht.stop.Done()
dht.startReannouncer()
}()
log.Debugf("[%s] DHT ready on %s (%d nodes found during join)", dht.node.id.HexShort(), dht.contact.Addr().String(), dht.node.rt.Count()) log.Debugf("[%s] DHT ready on %s (%d nodes found during join)", dht.node.id.HexShort(), dht.contact.Addr().String(), dht.node.rt.Count())
return nil return nil
@ -175,8 +186,7 @@ func (dht *DHT) WaitUntilJoined() {
// Shutdown shuts down the dht // Shutdown shuts down the dht
func (dht *DHT) Shutdown() { func (dht *DHT) Shutdown() {
log.Debugf("[%s] DHT shutting down", dht.node.id.HexShort()) log.Debugf("[%s] DHT shutting down", dht.node.id.HexShort())
dht.stop.Stop() dht.stop.StopAndWait()
dht.stop.Wait()
dht.node.Shutdown() dht.node.Shutdown()
log.Debugf("[%s] DHT stopped", dht.node.id.HexShort()) log.Debugf("[%s] DHT stopped", dht.node.id.HexShort())
} }
@ -245,8 +255,6 @@ func (dht *DHT) Announce(hash Bitmap) error {
} }
func (dht *DHT) startReannouncer() { func (dht *DHT) startReannouncer() {
dht.stop.Add(1)
defer dht.stop.Done()
tick := time.NewTicker(tReannounce) tick := time.NewTicker(tReannounce)
for { for {
select { select {
@ -255,7 +263,9 @@ func (dht *DHT) startReannouncer() {
case <-tick.C: case <-tick.C:
dht.lock.RLock() dht.lock.RLock()
for h := range dht.announced { for h := range dht.announced {
dht.stop.Add(1)
go func(bm Bitmap) { go func(bm Bitmap) {
defer dht.stop.Done()
if err := dht.Announce(bm); err != nil { if err := dht.Announce(bm); err != nil {
log.Error("error re-announcing bitmap - ", err) log.Error("error re-announcing bitmap - ", err)
} }

View file

@ -11,6 +11,7 @@ import (
"time" "time"
"github.com/lbryio/lbry.go/errors" "github.com/lbryio/lbry.go/errors"
"github.com/lbryio/lbry.go/stopOnce"
"github.com/lbryio/reflector.go/store" "github.com/lbryio/reflector.go/store"
"github.com/davecgh/go-spew/spew" "github.com/davecgh/go-spew/spew"
@ -27,59 +28,69 @@ const (
// Server is an instance of a peer server that houses the listener and store. // Server is an instance of a peer server that houses the listener and store.
type Server struct { type Server struct {
store store.BlobStore store store.BlobStore
l net.Listener
closed bool closed bool
stop *stopOnce.Stopper
} }
// NewServer returns an initialized Server pointer. // NewServer returns an initialized Server pointer.
func NewServer(store store.BlobStore) *Server { func NewServer(store store.BlobStore) *Server {
return &Server{ return &Server{
store: store, store: store,
stop: stopOnce.New(),
} }
} }
// Shutdown gracefully shuts down the peer server. // Shutdown gracefully shuts down the peer server.
func (s *Server) Shutdown() { func (s *Server) Shutdown() {
// TODO: need waitgroup so we can finish whatever we're doing before stopping log.Debug("shutting down peer server...")
s.closed = true s.stop.StopAndWait()
if err := s.l.Close(); err != nil {
log.Error("error shuting down peer server - ", err)
}
} }
// ListenAndServe starts the server listener to handle connections. // Start starts the server listener to handle connections.
func (s *Server) ListenAndServe(address string) error { func (s *Server) Start(address string) error {
log.Println("Listening on " + address)
log.Println("peer listening on " + address)
l, err := net.Listen("tcp", address) l, err := net.Listen("tcp", address)
if err != nil { if err != nil {
return err return err
} }
defer func(listener net.Listener) {
if err := listener.Close(); err != nil {
log.Error("error closing listener for peer server - ", err)
}
}(l)
go s.listenForShutdown(l)
s.stop.Add(1)
go func() {
defer s.stop.Done()
s.listenAndServe(l)
}()
return nil
}
func (s *Server) listenForShutdown(listener net.Listener) {
<-s.stop.Ch()
s.closed = true
if err := listener.Close(); err != nil {
log.Error("error closing listener for peer server - ", err)
}
}
func (s *Server) listenAndServe(listener net.Listener) {
for { for {
conn, err := l.Accept() conn, err := listener.Accept()
if err != nil { if err != nil {
if s.closed { if s.closed {
return nil return
} }
log.Error(err) log.Error(err)
} else { } else {
s.stop.Add(1)
go s.handleConnection(conn) go s.handleConnection(conn)
} }
} }
} }
func (s *Server) handleConnection(conn net.Conn) { func (s *Server) handleConnection(conn net.Conn) {
defer func(conn net.Conn) { defer s.stop.Done()
if err := conn.Close(); err != nil {
log.Error("error closing client connection for peer server - ", err)
}
}(conn)
timeoutDuration := 5 * time.Second timeoutDuration := 5 * time.Second
for { for {

View file

@ -27,7 +27,7 @@ func TestMain(m *testing.M) {
ms := store.MemoryBlobStore{} ms := store.MemoryBlobStore{}
s := NewServer(&ms) s := NewServer(&ms)
go func() { go func() {
if err := s.ListenAndServe(address); err != nil { if err := s.Start(address); err != nil {
log.Panic("error starting up reflector server - ", err) log.Panic("error starting up reflector server - ", err)
} }
}() }()

View file

@ -1,6 +1,8 @@
package reflector package reflector
import ( import (
"strconv"
"github.com/lbryio/lbry.go/stopOnce" "github.com/lbryio/lbry.go/stopOnce"
"github.com/lbryio/reflector.go/cluster" "github.com/lbryio/reflector.go/cluster"
"github.com/lbryio/reflector.go/dht" "github.com/lbryio/reflector.go/dht"
@ -33,21 +35,20 @@ func NewPrism(store store.BlobStore, clusterSeedAddr string) *Prism {
} }
} }
// Connect starts the components of the application. // Start starts the components of the application.
func (p *Prism) Connect() error { func (p *Prism) Start() error {
err := p.dht.Start() if err := p.dht.Start(); err != nil {
if err != nil {
return err return err
} }
if err := p.cluster.Connect(); err != nil {
err = p.cluster.Connect() return err
if err != nil { }
if err := p.peer.Start("localhost:" + strconv.Itoa(peer.DefaultPort)); err != nil {
return err
}
if err := p.reflector.Start("localhost:" + strconv.Itoa(DefaultPort)); err != nil {
return err return err
} }
// start peer
// start reflector
return nil return nil
} }

View file

@ -7,9 +7,9 @@ import (
"net" "net"
"strconv" "strconv"
"github.com/lbryio/reflector.go/store"
"github.com/lbryio/lbry.go/errors" "github.com/lbryio/lbry.go/errors"
"github.com/lbryio/lbry.go/stopOnce"
"github.com/lbryio/reflector.go/store"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
@ -17,62 +17,71 @@ import (
// Server is and instance of the reflector server. It houses the blob store and listener. // Server is and instance of the reflector server. It houses the blob store and listener.
type Server struct { type Server struct {
store store.BlobStore store store.BlobStore
l net.Listener
closed bool closed bool
stop *stopOnce.Stopper
} }
// NewServer returns an initialized reflector server pointer. // NewServer returns an initialized reflector server pointer.
func NewServer(store store.BlobStore) *Server { func NewServer(store store.BlobStore) *Server {
return &Server{ return &Server{
store: store, store: store,
stop: stopOnce.New(),
} }
} }
// Shutdown shuts down the reflector server gracefully. // Shutdown shuts down the reflector server gracefully.
func (s *Server) Shutdown() { func (s *Server) Shutdown() {
// TODO: need waitgroup so we can finish whatever we're doing before stopping log.Debug("shutting down reflector server...")
s.closed = true s.stop.StopAndWait()
if err := s.l.Close(); err != nil {
log.Error("error shutting down reflector server - ", err)
}
} }
//ListenAndServe starts the server listener to handle connections. //Start starts the server listener to handle connections.
func (s *Server) ListenAndServe(address string) error { func (s *Server) Start(address string) error {
//ToDo - We should make this DRY as it is the same code in both servers. //ToDo - We should make this DRY as it is the same code in both servers.
log.Println("Listening on " + address) log.Println("reflector listening on " + address)
l, err := net.Listen("tcp", address) l, err := net.Listen("tcp", address)
if err != nil { if err != nil {
return err return err
} }
defer func(listener net.Listener) { go s.listenForShutdown(l)
if err := listener.Close(); err != nil {
log.Error("error closing reflector server listener - ", err)
}
}(l)
s.stop.Add(1)
go func() {
defer s.stop.Done()
s.listenAndServe(l)
}()
return nil
}
func (s *Server) listenForShutdown(listener net.Listener) {
<-s.stop.Ch()
s.closed = true
if err := listener.Close(); err != nil {
log.Error("error closing listener for peer server - ", err)
}
}
func (s *Server) listenAndServe(listener net.Listener) {
for { for {
conn, err := l.Accept() conn, err := listener.Accept()
if err != nil { if err != nil {
if s.closed { if s.closed {
return nil return
} }
log.Error(err) log.Error(err)
} else { } else {
s.stop.Add(1)
go s.handleConn(conn) go s.handleConn(conn)
} }
} }
} }
func (s *Server) handleConn(conn net.Conn) { func (s *Server) handleConn(conn net.Conn) {
defer s.stop.Done()
// TODO: connection should time out eventually // TODO: connection should time out eventually
defer func(conn net.Conn) {
if err := conn.Close(); err != nil {
log.Error("error closing reflector client connection - ", err)
}
}(conn)
err := s.doHandshake(conn) err := s.doHandshake(conn)
if err != nil { if err != nil {
if err == io.EOF { if err == io.EOF {