From a2a0b27bc40741a0c7d340b810a4122551e1fa09 Mon Sep 17 00:00:00 2001 From: Mark Beamer Jr Date: Wed, 6 Jun 2018 23:48:07 -0400 Subject: [PATCH] implemented stopper pattern -made defer adjustments inline and deleted the separate function. -adjusted method in upload to take the only parameter it requires. -Implemented stopper param for reflector server -Aligned Cluster New to NewCluster -Adjusted DHT to use StopAndWait -Removed blocking waitgroup add -Unified all components under prism. -Moved defer done outside of functions. -renamed NewCluster to New -fixed travis errors. --- cluster/cluster.go | 51 ++++++++++++++++++----------------- cmd/cluster.go | 2 +- cmd/peer.go | 13 ++++++++- cmd/reflector.go | 15 +++++++++-- cmd/start.go | 9 +++---- dht/dht.go | 26 ++++++++++++------ peer/server.go | 55 ++++++++++++++++++++++---------------- reflector/client_test.go | 2 +- reflector/prism.go | 23 ++++++++-------- reflector/server.go | 57 +++++++++++++++++++++++----------------- 10 files changed, 151 insertions(+), 102 deletions(-) diff --git a/cluster/cluster.go b/cluster/cluster.go index 6ac8e76..4222b39 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -66,8 +66,11 @@ func (c *Cluster) Connect() error { return err } } - - c.listen() + c.stop.Add(1) + go func() { + defer c.stop.Done() + c.listen() + }() return nil } @@ -80,33 +83,29 @@ func (c *Cluster) Shutdown() { } func (c *Cluster) listen() { - c.stop.Add(1) - go func() { - defer c.stop.Done() - for { - select { - case <-c.stop.Ch(): - return - case event := <-c.eventCh: - switch event.EventType() { - case serf.EventMemberJoin, serf.EventMemberFailed, serf.EventMemberLeave: - memberEvent := event.(serf.MemberEvent) - if event.EventType() == serf.EventMemberJoin && len(memberEvent.Members) == 1 && memberEvent.Members[0].Name == c.name { - // ignore event from my own joining of the cluster - continue - } - - //spew.Dump(c.Members()) - alive := getAliveMembers(c.s.Members()) - log.Printf("%s: my hash range is now %d of %d\n", c.name, getHashRangeStart(c.name, alive), len(alive)) - // figure out my new hash range based on the start and the number of alive members - // get hashes in that range that need announcing - // announce them - // if more than one node is announcing each hash, figure out how to deal with last_announced_at so both nodes dont announce the same thing at the same time + for { + select { + case <-c.stop.Ch(): + return + case event := <-c.eventCh: + switch event.EventType() { + case serf.EventMemberJoin, serf.EventMemberFailed, serf.EventMemberLeave: + memberEvent := event.(serf.MemberEvent) + if event.EventType() == serf.EventMemberJoin && len(memberEvent.Members) == 1 && memberEvent.Members[0].Name == c.name { + // ignore event from my own joining of the cluster + continue } + + //spew.Dump(c.Members()) + alive := getAliveMembers(c.s.Members()) + log.Printf("%s: my hash range is now %d of %d\n", c.name, getHashRangeStart(c.name, alive), len(alive)) + // figure out my new hash range based on the start and the number of alive members + // get hashes in that range that need announcing + // announce them + // if more than one node is announcing each hash, figure out how to deal with last_announced_at so both nodes dont announce the same thing at the same time } } - }() + } } func getHashRangeStart(myName string, members []serf.Member) int { diff --git a/cmd/cluster.go b/cmd/cluster.go index 3d41f63..0a15e8b 100644 --- a/cmd/cluster.go +++ b/cmd/cluster.go @@ -16,7 +16,7 @@ import ( func init() { var cmd = &cobra.Command{ Use: "cluster [start|join]", - Short: "Connect to cluster", + Short: "Start(join) to or Start a new cluster", ValidArgs: []string{"start", "join"}, Args: argFuncs(cobra.ExactArgs(1), cobra.OnlyValidArgs), Run: clusterCmd, diff --git a/cmd/peer.go b/cmd/peer.go index edf864f..a362dd7 100644 --- a/cmd/peer.go +++ b/cmd/peer.go @@ -1,7 +1,10 @@ package cmd import ( + "os" + "os/signal" "strconv" + "syscall" "github.com/lbryio/reflector.go/db" "github.com/lbryio/reflector.go/peer" @@ -27,5 +30,13 @@ func peerCmd(cmd *cobra.Command, args []string) { s3 := store.NewS3BlobStore(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName) combo := store.NewDBBackedS3Store(s3, db) - log.Fatal(peer.NewServer(combo).ListenAndServe("localhost:" + strconv.Itoa(peer.DefaultPort))) + peerServer := peer.NewServer(combo) + if err := peerServer.Start("localhost:" + strconv.Itoa(peer.DefaultPort)); err != nil { + log.Fatal(err) + } + + interruptChan := make(chan os.Signal, 1) + signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM) + <-interruptChan + peerServer.Shutdown() } diff --git a/cmd/reflector.go b/cmd/reflector.go index c2975f2..80d0c86 100644 --- a/cmd/reflector.go +++ b/cmd/reflector.go @@ -1,13 +1,16 @@ package cmd import ( + "os" + "os/signal" "strconv" + "syscall" "github.com/lbryio/reflector.go/db" "github.com/lbryio/reflector.go/reflector" "github.com/lbryio/reflector.go/store" - log "github.com/sirupsen/logrus" + "github.com/spf13/cobra" ) @@ -27,5 +30,13 @@ func reflectorCmd(cmd *cobra.Command, args []string) { s3 := store.NewS3BlobStore(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName) combo := store.NewDBBackedS3Store(s3, db) - log.Fatal(reflector.NewServer(combo).ListenAndServe("localhost:" + strconv.Itoa(reflector.DefaultPort))) + reflectorServer := reflector.NewServer(combo) + if err := reflectorServer.Start("localhost:" + strconv.Itoa(reflector.DefaultPort)); err != nil { + log.Fatal(err) + } + + interruptChan := make(chan os.Signal, 1) + signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM) + <-interruptChan + reflectorServer.Shutdown() } diff --git a/cmd/start.go b/cmd/start.go index 7a25d81..027c19e 100644 --- a/cmd/start.go +++ b/cmd/start.go @@ -24,10 +24,10 @@ func init() { } func startCmd(cmd *cobra.Command, args []string) { + log.SetLevel(log.DebugLevel) db := new(db.SQL) err := db.Connect(globalConfig.DBConn) checkErr(err) - s3 := store.NewS3BlobStore(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName) comboStore := store.NewDBBackedS3Store(s3, db) @@ -37,12 +37,9 @@ func startCmd(cmd *cobra.Command, args []string) { } p := reflector.NewPrism(comboStore, clusterAddr) - err = p.Connect() - if err != nil { - log.Error(err) - return + if err = p.Start(); err != nil { + log.Fatal(err) } - interruptChan := make(chan os.Signal, 1) signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM) <-interruptChan diff --git a/dht/dht.go b/dht/dht.go index 250fcc1..a8f068f 100644 --- a/dht/dht.go +++ b/dht/dht.go @@ -135,6 +135,11 @@ func (dht *DHT) join() { // now call iterativeFind on yourself nf := newContactFinder(dht.node, dht.node.id, false) + // stop if dht is stopped + go func(finder *contactFinder) { + <-dht.stop.Ch() + nf.Cancel() + }(nf) _, err := nf.Find() if err != nil { log.Errorf("[%s] join: %s", dht.node.id.HexShort(), err.Error()) @@ -151,14 +156,20 @@ func (dht *DHT) Start() error { return errors.Err(err) } conn := listener.(*net.UDPConn) - err = dht.node.Connect(conn) if err != nil { return err } - - dht.join() - dht.startReannouncer() + dht.stop.Add(1) + go func() { + defer dht.stop.Done() + dht.join() + }() + dht.stop.Add(1) + go func() { + defer dht.stop.Done() + dht.startReannouncer() + }() log.Debugf("[%s] DHT ready on %s (%d nodes found during join)", dht.node.id.HexShort(), dht.contact.Addr().String(), dht.node.rt.Count()) return nil @@ -175,8 +186,7 @@ func (dht *DHT) WaitUntilJoined() { // Shutdown shuts down the dht func (dht *DHT) Shutdown() { log.Debugf("[%s] DHT shutting down", dht.node.id.HexShort()) - dht.stop.Stop() - dht.stop.Wait() + dht.stop.StopAndWait() dht.node.Shutdown() log.Debugf("[%s] DHT stopped", dht.node.id.HexShort()) } @@ -245,8 +255,6 @@ func (dht *DHT) Announce(hash Bitmap) error { } func (dht *DHT) startReannouncer() { - dht.stop.Add(1) - defer dht.stop.Done() tick := time.NewTicker(tReannounce) for { select { @@ -255,7 +263,9 @@ func (dht *DHT) startReannouncer() { case <-tick.C: dht.lock.RLock() for h := range dht.announced { + dht.stop.Add(1) go func(bm Bitmap) { + defer dht.stop.Done() if err := dht.Announce(bm); err != nil { log.Error("error re-announcing bitmap - ", err) } diff --git a/peer/server.go b/peer/server.go index 2b115b6..5143c85 100644 --- a/peer/server.go +++ b/peer/server.go @@ -11,6 +11,7 @@ import ( "time" "github.com/lbryio/lbry.go/errors" + "github.com/lbryio/lbry.go/stopOnce" "github.com/lbryio/reflector.go/store" "github.com/davecgh/go-spew/spew" @@ -27,59 +28,69 @@ const ( // Server is an instance of a peer server that houses the listener and store. type Server struct { store store.BlobStore - l net.Listener closed bool + + stop *stopOnce.Stopper } // NewServer returns an initialized Server pointer. func NewServer(store store.BlobStore) *Server { return &Server{ store: store, + stop: stopOnce.New(), } } // Shutdown gracefully shuts down the peer server. func (s *Server) Shutdown() { - // TODO: need waitgroup so we can finish whatever we're doing before stopping - s.closed = true - if err := s.l.Close(); err != nil { - log.Error("error shuting down peer server - ", err) - } + log.Debug("shutting down peer server...") + s.stop.StopAndWait() } -// ListenAndServe starts the server listener to handle connections. -func (s *Server) ListenAndServe(address string) error { - log.Println("Listening on " + address) +// Start starts the server listener to handle connections. +func (s *Server) Start(address string) error { + + log.Println("peer listening on " + address) l, err := net.Listen("tcp", address) if err != nil { return err } - defer func(listener net.Listener) { - if err := listener.Close(); err != nil { - log.Error("error closing listener for peer server - ", err) - } - }(l) + go s.listenForShutdown(l) + s.stop.Add(1) + go func() { + defer s.stop.Done() + s.listenAndServe(l) + }() + + return nil +} + +func (s *Server) listenForShutdown(listener net.Listener) { + <-s.stop.Ch() + s.closed = true + if err := listener.Close(); err != nil { + log.Error("error closing listener for peer server - ", err) + } +} + +func (s *Server) listenAndServe(listener net.Listener) { for { - conn, err := l.Accept() + conn, err := listener.Accept() if err != nil { if s.closed { - return nil + return } log.Error(err) } else { + s.stop.Add(1) go s.handleConnection(conn) } } } func (s *Server) handleConnection(conn net.Conn) { - defer func(conn net.Conn) { - if err := conn.Close(); err != nil { - log.Error("error closing client connection for peer server - ", err) - } - }(conn) - + defer s.stop.Done() timeoutDuration := 5 * time.Second for { diff --git a/reflector/client_test.go b/reflector/client_test.go index 6067e9f..7a7cb07 100644 --- a/reflector/client_test.go +++ b/reflector/client_test.go @@ -27,7 +27,7 @@ func TestMain(m *testing.M) { ms := store.MemoryBlobStore{} s := NewServer(&ms) go func() { - if err := s.ListenAndServe(address); err != nil { + if err := s.Start(address); err != nil { log.Panic("error starting up reflector server - ", err) } }() diff --git a/reflector/prism.go b/reflector/prism.go index 63c1ff2..40a99a7 100644 --- a/reflector/prism.go +++ b/reflector/prism.go @@ -1,6 +1,8 @@ package reflector import ( + "strconv" + "github.com/lbryio/lbry.go/stopOnce" "github.com/lbryio/reflector.go/cluster" "github.com/lbryio/reflector.go/dht" @@ -33,21 +35,20 @@ func NewPrism(store store.BlobStore, clusterSeedAddr string) *Prism { } } -// Connect starts the components of the application. -func (p *Prism) Connect() error { - err := p.dht.Start() - if err != nil { +// Start starts the components of the application. +func (p *Prism) Start() error { + if err := p.dht.Start(); err != nil { return err } - - err = p.cluster.Connect() - if err != nil { + if err := p.cluster.Connect(); err != nil { + return err + } + if err := p.peer.Start("localhost:" + strconv.Itoa(peer.DefaultPort)); err != nil { + return err + } + if err := p.reflector.Start("localhost:" + strconv.Itoa(DefaultPort)); err != nil { return err } - - // start peer - - // start reflector return nil } diff --git a/reflector/server.go b/reflector/server.go index 9ca9983..48cdaa1 100644 --- a/reflector/server.go +++ b/reflector/server.go @@ -7,9 +7,9 @@ import ( "net" "strconv" - "github.com/lbryio/reflector.go/store" - "github.com/lbryio/lbry.go/errors" + "github.com/lbryio/lbry.go/stopOnce" + "github.com/lbryio/reflector.go/store" log "github.com/sirupsen/logrus" ) @@ -17,62 +17,71 @@ import ( // Server is and instance of the reflector server. It houses the blob store and listener. type Server struct { store store.BlobStore - l net.Listener closed bool + + stop *stopOnce.Stopper } // NewServer returns an initialized reflector server pointer. func NewServer(store store.BlobStore) *Server { return &Server{ store: store, + stop: stopOnce.New(), } } // Shutdown shuts down the reflector server gracefully. func (s *Server) Shutdown() { - // TODO: need waitgroup so we can finish whatever we're doing before stopping - s.closed = true - if err := s.l.Close(); err != nil { - log.Error("error shutting down reflector server - ", err) - } + log.Debug("shutting down reflector server...") + s.stop.StopAndWait() } -//ListenAndServe starts the server listener to handle connections. -func (s *Server) ListenAndServe(address string) error { +//Start starts the server listener to handle connections. +func (s *Server) Start(address string) error { //ToDo - We should make this DRY as it is the same code in both servers. - log.Println("Listening on " + address) + log.Println("reflector listening on " + address) l, err := net.Listen("tcp", address) if err != nil { return err } - defer func(listener net.Listener) { - if err := listener.Close(); err != nil { - log.Error("error closing reflector server listener - ", err) - } - }(l) + go s.listenForShutdown(l) + s.stop.Add(1) + go func() { + defer s.stop.Done() + s.listenAndServe(l) + }() + + return nil +} + +func (s *Server) listenForShutdown(listener net.Listener) { + <-s.stop.Ch() + s.closed = true + if err := listener.Close(); err != nil { + log.Error("error closing listener for peer server - ", err) + } +} + +func (s *Server) listenAndServe(listener net.Listener) { for { - conn, err := l.Accept() + conn, err := listener.Accept() if err != nil { if s.closed { - return nil + return } log.Error(err) } else { + s.stop.Add(1) go s.handleConn(conn) } } } func (s *Server) handleConn(conn net.Conn) { + defer s.stop.Done() // TODO: connection should time out eventually - defer func(conn net.Conn) { - if err := conn.Close(); err != nil { - log.Error("error closing reflector client connection - ", err) - } - }(conn) - err := s.doHandshake(conn) if err != nil { if err == io.EOF {