removed waitgroup/pubchannel; added stoppablelistener

This commit is contained in:
Jimmy Zelinskie 2013-10-22 19:07:39 -04:00
parent 54e769a8f2
commit 2c7fc05c97

View file

@ -13,10 +13,11 @@ import (
"net/http" "net/http"
"path" "path"
"strconv" "strconv"
"sync"
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/etix/stoppableListener"
"github.com/pushrax/chihaya/config" "github.com/pushrax/chihaya/config"
"github.com/pushrax/chihaya/storage" "github.com/pushrax/chihaya/storage"
"github.com/pushrax/chihaya/storage/tracker" "github.com/pushrax/chihaya/storage/tracker"
@ -24,19 +25,14 @@ import (
type Server struct { type Server struct {
conf *config.Config conf *config.Config
listener net.Listener listener *stoppableListener.StoppableListener
dbConnPool tracker.Pool dbConnPool tracker.Pool
serving bool
startTime time.Time startTime time.Time
deltaRequests int64 deltaRequests int64
rpm int64 rpm int64
waitgroup sync.WaitGroup
pubChan chan string
http.Server http.Server
} }
@ -49,7 +45,6 @@ func New(conf *config.Config) (*Server, error) {
s := &Server{ s := &Server{
conf: conf, conf: conf,
dbConnPool: pool, dbConnPool: pool,
pubChan: make(chan string),
Server: http.Server{ Server: http.Server{
Addr: conf.Addr, Addr: conf.Addr,
ReadTimeout: conf.ReadTimeout.Duration, ReadTimeout: conf.ReadTimeout.Duration,
@ -61,39 +56,30 @@ func New(conf *config.Config) (*Server, error) {
} }
func (s *Server) ListenAndServe() error { func (s *Server) ListenAndServe() error {
listener, err := net.Listen("tcp", s.Addr) l, err := net.Listen("tcp", s.Addr)
s.listener = listener
if err != nil { if err != nil {
return err return err
} }
s.serving = true sl := stoppableListener.Handle(l)
s.listener = sl
s.startTime = time.Now() s.startTime = time.Now()
go s.updateStats() go s.updateStats()
s.Serve(s.listener) s.Serve(s.listener)
s.waitgroup.Wait()
return nil return nil
} }
func (s *Server) Stop() error { func (s *Server) Stop() error {
s.serving = false s.listener.Stop <- true
s.waitgroup.Wait()
err := s.dbConnPool.Close() err := s.dbConnPool.Close()
if err != nil { if err != nil {
return err return err
} }
close(s.pubChan)
return s.listener.Close() return s.listener.Close()
} }
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if !s.serving {
return
}
s.waitgroup.Add(1)
defer s.waitgroup.Done()
defer atomic.AddInt64(&s.deltaRequests, 1) defer atomic.AddInt64(&s.deltaRequests, 1)
switch r.URL.Path { switch r.URL.Path {