Merge pull request #413 from chihaya/recursive-stop-groups

Return []error from Stop() channel, allow recursive stop groups
This commit is contained in:
Justin Li 2018-09-11 17:14:41 -04:00 committed by GitHub
commit 1a4e4c833b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 76 additions and 57 deletions

View file

@ -113,19 +113,19 @@ func combineErrors(prefix string, errs []error) error {
// Stop shuts down an instance of Chihaya. // Stop shuts down an instance of Chihaya.
func (r *Run) Stop(keepPeerStore bool) (storage.PeerStore, error) { func (r *Run) Stop(keepPeerStore bool) (storage.PeerStore, error) {
log.Debug("stopping frontends and prometheus endpoint") log.Debug("stopping frontends and prometheus endpoint")
if errs := r.sg.Stop(); len(errs) != 0 { if errs := r.sg.Stop().Wait(); len(errs) != 0 {
return nil, combineErrors("failed while shutting down frontends", errs) return nil, combineErrors("failed while shutting down frontends", errs)
} }
log.Debug("stopping logic") log.Debug("stopping logic")
if errs := r.logic.Stop(); len(errs) != 0 { if errs := r.logic.Stop().Wait(); len(errs) != 0 {
return nil, combineErrors("failed while shutting down middleware", errs) return nil, combineErrors("failed while shutting down middleware", errs)
} }
if !keepPeerStore { if !keepPeerStore {
log.Debug("stopping peer store") log.Debug("stopping peer store")
if err, closed := <-r.peerStore.Stop(); !closed { if errs := r.peerStore.Stop().Wait(); len(errs) != 0 {
return nil, err return nil, combineErrors("failed while shutting down peer store", errs)
} }
r.peerStore = nil r.peerStore = nil
} }

View file

@ -14,6 +14,7 @@ import (
"github.com/chihaya/chihaya/bittorrent" "github.com/chihaya/chihaya/bittorrent"
"github.com/chihaya/chihaya/frontend" "github.com/chihaya/chihaya/frontend"
"github.com/chihaya/chihaya/pkg/log" "github.com/chihaya/chihaya/pkg/log"
"github.com/chihaya/chihaya/pkg/stop"
) )
// Config represents all of the configurable options for an HTTP BitTorrent // Config represents all of the configurable options for an HTTP BitTorrent
@ -140,17 +141,13 @@ func NewFrontend(logic frontend.TrackerLogic, provided Config) (*Frontend, error
} }
// Stop provides a thread-safe way to shutdown a currently running Frontend. // Stop provides a thread-safe way to shutdown a currently running Frontend.
func (f *Frontend) Stop() <-chan error { func (f *Frontend) Stop() stop.Result {
c := make(chan error) c := make(stop.Channel)
go func() { go func() {
if err := f.srv.Shutdown(context.Background()); err != nil { c.Done(f.srv.Shutdown(context.Background()))
c <- err
} else {
close(c)
}
}() }()
return c return c.Result()
} }
func (f *Frontend) handler() http.Handler { func (f *Frontend) handler() http.Handler {

View file

@ -87,26 +87,22 @@ func NewFrontend(logic frontend.TrackerLogic, cfg Config) (*Frontend, error) {
} }
// Stop provides a thread-safe way to shutdown a currently running Frontend. // Stop provides a thread-safe way to shutdown a currently running Frontend.
func (t *Frontend) Stop() <-chan error { func (t *Frontend) Stop() stop.Result {
select { select {
case <-t.closing: case <-t.closing:
return stop.AlreadyStopped return stop.AlreadyStopped
default: default:
} }
c := make(chan error) c := make(stop.Channel)
go func() { go func() {
close(t.closing) close(t.closing)
t.socket.SetReadDeadline(time.Now()) t.socket.SetReadDeadline(time.Now())
t.wg.Wait() t.wg.Wait()
if err := t.socket.Close(); err != nil { c.Done(t.socket.Close())
c <- err
} else {
close(c)
}
}() }()
return c return c.Result()
} }
// listenAndServe blocks while listening and serving UDP BitTorrent requests // listenAndServe blocks while listening and serving UDP BitTorrent requests

View file

@ -144,19 +144,19 @@ func (h *hook) updateKeys() error {
return nil return nil
} }
func (h *hook) Stop() <-chan error { func (h *hook) Stop() stop.Result {
log.Debug("attempting to shutdown JWT middleware") log.Debug("attempting to shutdown JWT middleware")
select { select {
case <-h.closing: case <-h.closing:
return stop.AlreadyStopped return stop.AlreadyStopped
default: default:
} }
c := make(chan error) c := make(stop.Channel)
go func() { go func() {
close(h.closing) close(h.closing)
close(c) c.Done()
}() }()
return c return c.Result()
} }
func (h *hook) HandleAnnounce(ctx context.Context, req *bittorrent.AnnounceRequest, resp *bittorrent.AnnounceResponse) (context.Context, error) { func (h *hook) HandleAnnounce(ctx context.Context, req *bittorrent.AnnounceRequest, resp *bittorrent.AnnounceResponse) (context.Context, error) {

View file

@ -105,7 +105,7 @@ func (l *Logic) AfterScrape(ctx context.Context, req *bittorrent.ScrapeRequest,
// Stop stops the Logic. // Stop stops the Logic.
// //
// This stops any hooks that implement stop.Stopper. // This stops any hooks that implement stop.Stopper.
func (l *Logic) Stop() []error { func (l *Logic) Stop() stop.Result {
stopGroup := stop.NewGroup() stopGroup := stop.NewGroup()
for _, hook := range l.preHooks { for _, hook := range l.preHooks {
stoppable, ok := hook.(stop.Stopper) stoppable, ok := hook.(stop.Stopper)

View file

@ -7,6 +7,7 @@ import (
"net/http" "net/http"
"github.com/chihaya/chihaya/pkg/log" "github.com/chihaya/chihaya/pkg/log"
"github.com/chihaya/chihaya/pkg/stop"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
) )
@ -17,17 +18,13 @@ type Server struct {
} }
// Stop shuts down the server. // Stop shuts down the server.
func (s *Server) Stop() <-chan error { func (s *Server) Stop() stop.Result {
c := make(chan error) c := make(stop.Channel)
go func() { go func() {
if err := s.srv.Shutdown(context.Background()); err != nil { c.Done(s.srv.Shutdown(context.Background()))
c <- err
} else {
close(c)
}
}() }()
return c return c.Result()
} }
// NewServer creates a new instance of a Prometheus server that asynchronously // NewServer creates a new instance of a Prometheus server that asynchronously

View file

@ -5,17 +5,45 @@ import (
"sync" "sync"
) )
// Channel is used to return zero or more errors asynchronously. Call Done()
// once to pass errors to the Channel.
type Channel chan []error
// Result is a receive-only version of Channel. Call Wait() once to receive any
// returned errors.
type Result <-chan []error
// Done adds zero or more errors to the Channel and closes it, indicating the
// caller has finished stopping. It should be called exactly once.
func (ch Channel) Done(errs ...error) {
if len(errs) > 0 && errs[0] != nil {
ch <- errs
}
close(ch)
}
// Result converts a Channel to a Result.
func (ch Channel) Result() <-chan []error {
return ch
}
// Wait blocks until Done() is called on the underlying Channel and returns any
// errors. It should be called exactly once.
func (r Result) Wait() []error {
return <-r
}
// AlreadyStopped is a closed error channel to be used by Funcs when // AlreadyStopped is a closed error channel to be used by Funcs when
// an element was already stopped. // an element was already stopped.
var AlreadyStopped <-chan error var AlreadyStopped Result
// AlreadyStoppedFunc is a Func that returns AlreadyStopped. // AlreadyStoppedFunc is a Func that returns AlreadyStopped.
var AlreadyStoppedFunc = func() <-chan error { return AlreadyStopped } var AlreadyStoppedFunc = func() Result { return AlreadyStopped }
func init() { func init() {
closeMe := make(chan error) closeMe := make(Channel)
close(closeMe) close(closeMe)
AlreadyStopped = closeMe AlreadyStopped = closeMe.Result()
} }
// Stopper is an interface that allows a clean shutdown. // Stopper is an interface that allows a clean shutdown.
@ -27,11 +55,11 @@ type Stopper interface {
// Closing the channel signals a clean shutdown. // Closing the channel signals a clean shutdown.
// Stop() should return immediately and perform the actual shutdown in a // Stop() should return immediately and perform the actual shutdown in a
// separate goroutine. // separate goroutine.
Stop() <-chan error Stop() Result
} }
// Func is a function that can be used to provide a clean shutdown. // Func is a function that can be used to provide a clean shutdown.
type Func func() <-chan error type Func func() Result
// Group is a collection of Stoppers that can be stopped all at once. // Group is a collection of Stoppers that can be stopped all at once.
type Group struct { type Group struct {
@ -67,14 +95,13 @@ func (cg *Group) AddFunc(toAddFunc Func) {
// Stopping will be done in a concurrent fashion. // Stopping will be done in a concurrent fashion.
// The slice of errors returned contains all errors returned by stopping the // The slice of errors returned contains all errors returned by stopping the
// members. // members.
func (cg *Group) Stop() []error { func (cg *Group) Stop() Result {
cg.Lock() cg.Lock()
defer cg.Unlock() defer cg.Unlock()
var errors []error whenDone := make(Channel)
whenDone := make(chan struct{})
waitChannels := make([]<-chan error, 0, len(cg.stoppables)) waitChannels := make([]Result, 0, len(cg.stoppables))
for _, toStop := range cg.stoppables { for _, toStop := range cg.stoppables {
waitFor := toStop() waitFor := toStop()
if waitFor == nil { if waitFor == nil {
@ -84,15 +111,15 @@ func (cg *Group) Stop() []error {
} }
go func() { go func() {
var errors []error
for _, waitForMe := range waitChannels { for _, waitForMe := range waitChannels {
err := <-waitForMe childErrors := waitForMe.Wait()
if err != nil { if len(childErrors) > 0 {
errors = append(errors, err) errors = append(errors, childErrors...)
} }
} }
close(whenDone) whenDone.Done(errors...)
}() }()
<-whenDone return whenDone.Result()
return errors
} }

View file

@ -13,6 +13,7 @@ import (
"github.com/chihaya/chihaya/bittorrent" "github.com/chihaya/chihaya/bittorrent"
"github.com/chihaya/chihaya/pkg/log" "github.com/chihaya/chihaya/pkg/log"
"github.com/chihaya/chihaya/pkg/stop"
"github.com/chihaya/chihaya/pkg/timecache" "github.com/chihaya/chihaya/pkg/timecache"
"github.com/chihaya/chihaya/storage" "github.com/chihaya/chihaya/storage"
) )
@ -570,8 +571,8 @@ func (ps *peerStore) collectGarbage(cutoff time.Time) error {
return nil return nil
} }
func (ps *peerStore) Stop() <-chan error { func (ps *peerStore) Stop() stop.Result {
c := make(chan error) c := make(stop.Channel)
go func() { go func() {
close(ps.closed) close(ps.closed)
ps.wg.Wait() ps.wg.Wait()
@ -583,10 +584,10 @@ func (ps *peerStore) Stop() <-chan error {
} }
ps.shards = shards ps.shards = shards
close(c) c.Done()
}() }()
return c return c.Result()
} }
func (ps *peerStore) LogFields() log.Fields { func (ps *peerStore) LogFields() log.Fields {

View file

@ -14,6 +14,7 @@ import (
"github.com/chihaya/chihaya/bittorrent" "github.com/chihaya/chihaya/bittorrent"
"github.com/chihaya/chihaya/pkg/log" "github.com/chihaya/chihaya/pkg/log"
"github.com/chihaya/chihaya/pkg/stop"
"github.com/chihaya/chihaya/pkg/timecache" "github.com/chihaya/chihaya/pkg/timecache"
"github.com/chihaya/chihaya/storage" "github.com/chihaya/chihaya/storage"
) )
@ -707,8 +708,8 @@ func (ps *peerStore) collectGarbage(cutoff time.Time) error {
return nil return nil
} }
func (ps *peerStore) Stop() <-chan error { func (ps *peerStore) Stop() stop.Result {
c := make(chan error) c := make(stop.Channel)
go func() { go func() {
close(ps.closed) close(ps.closed)
ps.wg.Wait() ps.wg.Wait()
@ -720,10 +721,10 @@ func (ps *peerStore) Stop() <-chan error {
} }
ps.shards = shards ps.shards = shards
close(c) c.Done()
}() }()
return c return c.Result()
} }
func (ps *peerStore) LogFields() log.Fields { func (ps *peerStore) LogFields() log.Fields {