diff --git a/cmd/chihaya/main.go b/cmd/chihaya/main.go index 699f3b1..83ecf20 100644 --- a/cmd/chihaya/main.go +++ b/cmd/chihaya/main.go @@ -113,19 +113,19 @@ func combineErrors(prefix string, errs []error) error { // Stop shuts down an instance of Chihaya. func (r *Run) Stop(keepPeerStore bool) (storage.PeerStore, error) { log.Debug("stopping frontends and prometheus endpoint") - if errs := r.sg.Stop(); len(errs) != 0 { + if errs := r.sg.Stop().Wait(); len(errs) != 0 { return nil, combineErrors("failed while shutting down frontends", errs) } log.Debug("stopping logic") - if errs := r.logic.Stop(); len(errs) != 0 { + if errs := r.logic.Stop().Wait(); len(errs) != 0 { return nil, combineErrors("failed while shutting down middleware", errs) } if !keepPeerStore { log.Debug("stopping peer store") - if err, closed := <-r.peerStore.Stop(); !closed { - return nil, err + if errs := r.peerStore.Stop().Wait(); len(errs) != 0 { + return nil, combineErrors("failed while shutting down peer store", errs) } r.peerStore = nil } diff --git a/frontend/http/frontend.go b/frontend/http/frontend.go index 6e252d9..6dcd0df 100644 --- a/frontend/http/frontend.go +++ b/frontend/http/frontend.go @@ -14,6 +14,7 @@ import ( "github.com/chihaya/chihaya/bittorrent" "github.com/chihaya/chihaya/frontend" "github.com/chihaya/chihaya/pkg/log" + "github.com/chihaya/chihaya/pkg/stop" ) // Config represents all of the configurable options for an HTTP BitTorrent @@ -140,17 +141,13 @@ func NewFrontend(logic frontend.TrackerLogic, provided Config) (*Frontend, error } // Stop provides a thread-safe way to shutdown a currently running Frontend. -func (f *Frontend) Stop() <-chan error { - c := make(chan error) +func (f *Frontend) Stop() stop.Result { + c := make(stop.Channel) go func() { - if err := f.srv.Shutdown(context.Background()); err != nil { - c <- err - } else { - close(c) - } + c.Done(f.srv.Shutdown(context.Background())) }() - return c + return c.Result() } func (f *Frontend) handler() http.Handler { diff --git a/frontend/udp/frontend.go b/frontend/udp/frontend.go index 8dea946..1681adb 100644 --- a/frontend/udp/frontend.go +++ b/frontend/udp/frontend.go @@ -87,26 +87,22 @@ func NewFrontend(logic frontend.TrackerLogic, cfg Config) (*Frontend, error) { } // Stop provides a thread-safe way to shutdown a currently running Frontend. -func (t *Frontend) Stop() <-chan error { +func (t *Frontend) Stop() stop.Result { select { case <-t.closing: return stop.AlreadyStopped default: } - c := make(chan error) + c := make(stop.Channel) go func() { close(t.closing) t.socket.SetReadDeadline(time.Now()) t.wg.Wait() - if err := t.socket.Close(); err != nil { - c <- err - } else { - close(c) - } + c.Done(t.socket.Close()) }() - return c + return c.Result() } // listenAndServe blocks while listening and serving UDP BitTorrent requests diff --git a/middleware/jwt/jwt.go b/middleware/jwt/jwt.go index fddb7ef..066cbd9 100644 --- a/middleware/jwt/jwt.go +++ b/middleware/jwt/jwt.go @@ -144,19 +144,19 @@ func (h *hook) updateKeys() error { return nil } -func (h *hook) Stop() <-chan error { +func (h *hook) Stop() stop.Result { log.Debug("attempting to shutdown JWT middleware") select { case <-h.closing: return stop.AlreadyStopped default: } - c := make(chan error) + c := make(stop.Channel) go func() { close(h.closing) - close(c) + c.Done() }() - return c + return c.Result() } func (h *hook) HandleAnnounce(ctx context.Context, req *bittorrent.AnnounceRequest, resp *bittorrent.AnnounceResponse) (context.Context, error) { diff --git a/middleware/logic.go b/middleware/logic.go index 1f42088..3c06d25 100644 --- a/middleware/logic.go +++ b/middleware/logic.go @@ -105,7 +105,7 @@ func (l *Logic) AfterScrape(ctx context.Context, req *bittorrent.ScrapeRequest, // Stop stops the Logic. // // This stops any hooks that implement stop.Stopper. -func (l *Logic) Stop() []error { +func (l *Logic) Stop() stop.Result { stopGroup := stop.NewGroup() for _, hook := range l.preHooks { stoppable, ok := hook.(stop.Stopper) diff --git a/pkg/prometheus/server.go b/pkg/prometheus/server.go index 89c8279..1330a9f 100644 --- a/pkg/prometheus/server.go +++ b/pkg/prometheus/server.go @@ -7,6 +7,7 @@ import ( "net/http" "github.com/chihaya/chihaya/pkg/log" + "github.com/chihaya/chihaya/pkg/stop" "github.com/prometheus/client_golang/prometheus" ) @@ -17,17 +18,13 @@ type Server struct { } // Stop shuts down the server. -func (s *Server) Stop() <-chan error { - c := make(chan error) +func (s *Server) Stop() stop.Result { + c := make(stop.Channel) go func() { - if err := s.srv.Shutdown(context.Background()); err != nil { - c <- err - } else { - close(c) - } + c.Done(s.srv.Shutdown(context.Background())) }() - return c + return c.Result() } // NewServer creates a new instance of a Prometheus server that asynchronously diff --git a/pkg/stop/stop.go b/pkg/stop/stop.go index 9703965..d3b29dd 100644 --- a/pkg/stop/stop.go +++ b/pkg/stop/stop.go @@ -5,17 +5,45 @@ import ( "sync" ) +// Channel is used to return zero or more errors asynchronously. Call Done() +// once to pass errors to the Channel. +type Channel chan []error + +// Result is a receive-only version of Channel. Call Wait() once to receive any +// returned errors. +type Result <-chan []error + +// Done adds zero or more errors to the Channel and closes it, indicating the +// caller has finished stopping. It should be called exactly once. +func (ch Channel) Done(errs ...error) { + if len(errs) > 0 && errs[0] != nil { + ch <- errs + } + close(ch) +} + +// Result converts a Channel to a Result. +func (ch Channel) Result() <-chan []error { + return ch +} + +// Wait blocks until Done() is called on the underlying Channel and returns any +// errors. It should be called exactly once. +func (r Result) Wait() []error { + return <-r +} + // AlreadyStopped is a closed error channel to be used by Funcs when // an element was already stopped. -var AlreadyStopped <-chan error +var AlreadyStopped Result // AlreadyStoppedFunc is a Func that returns AlreadyStopped. -var AlreadyStoppedFunc = func() <-chan error { return AlreadyStopped } +var AlreadyStoppedFunc = func() Result { return AlreadyStopped } func init() { - closeMe := make(chan error) + closeMe := make(Channel) close(closeMe) - AlreadyStopped = closeMe + AlreadyStopped = closeMe.Result() } // Stopper is an interface that allows a clean shutdown. @@ -27,11 +55,11 @@ type Stopper interface { // Closing the channel signals a clean shutdown. // Stop() should return immediately and perform the actual shutdown in a // separate goroutine. - Stop() <-chan error + Stop() Result } // Func is a function that can be used to provide a clean shutdown. -type Func func() <-chan error +type Func func() Result // Group is a collection of Stoppers that can be stopped all at once. type Group struct { @@ -67,14 +95,13 @@ func (cg *Group) AddFunc(toAddFunc Func) { // Stopping will be done in a concurrent fashion. // The slice of errors returned contains all errors returned by stopping the // members. -func (cg *Group) Stop() []error { +func (cg *Group) Stop() Result { cg.Lock() defer cg.Unlock() - var errors []error - whenDone := make(chan struct{}) + whenDone := make(Channel) - waitChannels := make([]<-chan error, 0, len(cg.stoppables)) + waitChannels := make([]Result, 0, len(cg.stoppables)) for _, toStop := range cg.stoppables { waitFor := toStop() if waitFor == nil { @@ -84,15 +111,15 @@ func (cg *Group) Stop() []error { } go func() { + var errors []error for _, waitForMe := range waitChannels { - err := <-waitForMe - if err != nil { - errors = append(errors, err) + childErrors := waitForMe.Wait() + if len(childErrors) > 0 { + errors = append(errors, childErrors...) } } - close(whenDone) + whenDone.Done(errors...) }() - <-whenDone - return errors + return whenDone.Result() } diff --git a/storage/memory/peer_store.go b/storage/memory/peer_store.go index 073671b..b45c7cd 100644 --- a/storage/memory/peer_store.go +++ b/storage/memory/peer_store.go @@ -13,6 +13,7 @@ import ( "github.com/chihaya/chihaya/bittorrent" "github.com/chihaya/chihaya/pkg/log" + "github.com/chihaya/chihaya/pkg/stop" "github.com/chihaya/chihaya/pkg/timecache" "github.com/chihaya/chihaya/storage" ) @@ -570,8 +571,8 @@ func (ps *peerStore) collectGarbage(cutoff time.Time) error { return nil } -func (ps *peerStore) Stop() <-chan error { - c := make(chan error) +func (ps *peerStore) Stop() stop.Result { + c := make(stop.Channel) go func() { close(ps.closed) ps.wg.Wait() @@ -583,10 +584,10 @@ func (ps *peerStore) Stop() <-chan error { } ps.shards = shards - close(c) + c.Done() }() - return c + return c.Result() } func (ps *peerStore) LogFields() log.Fields { diff --git a/storage/memorybysubnet/peer_store.go b/storage/memorybysubnet/peer_store.go index cda08ef..ade63a1 100644 --- a/storage/memorybysubnet/peer_store.go +++ b/storage/memorybysubnet/peer_store.go @@ -14,6 +14,7 @@ import ( "github.com/chihaya/chihaya/bittorrent" "github.com/chihaya/chihaya/pkg/log" + "github.com/chihaya/chihaya/pkg/stop" "github.com/chihaya/chihaya/pkg/timecache" "github.com/chihaya/chihaya/storage" ) @@ -707,8 +708,8 @@ func (ps *peerStore) collectGarbage(cutoff time.Time) error { return nil } -func (ps *peerStore) Stop() <-chan error { - c := make(chan error) +func (ps *peerStore) Stop() stop.Result { + c := make(stop.Channel) go func() { close(ps.closed) ps.wg.Wait() @@ -720,10 +721,10 @@ func (ps *peerStore) Stop() <-chan error { } ps.shards = shards - close(c) + c.Done() }() - return c + return c.Result() } func (ps *peerStore) LogFields() log.Fields {