tracker: Add common interface to start and stop servers

This commit is contained in:
Justin Li 2015-02-20 13:39:19 -05:00
parent 0d33210901
commit 92f3c62456
4 changed files with 106 additions and 35 deletions

View file

@ -7,9 +7,11 @@ package chihaya
import (
"flag"
"os"
"os/signal"
"runtime"
"runtime/pprof"
"sync"
"syscall"
"github.com/golang/glog"
@ -80,28 +82,49 @@ func Boot() {
}
var wg sync.WaitGroup
var servers []tracker.Server
if cfg.HTTPListenAddr != "" {
wg.Add(1)
srv := http.NewServer(cfg, tkr)
servers = append(servers, srv)
go func() {
defer wg.Done()
http.Serve(cfg, tkr)
srv.Serve()
}()
}
if cfg.UDPListenAddr != "" {
wg.Add(1)
srv := udp.NewServer(cfg, tkr)
servers = append(servers, srv)
go func() {
defer wg.Done()
udp.Serve(cfg, tkr)
srv.Serve()
}()
}
wg.Wait()
shutdown := make(chan os.Signal)
signal.Notify(shutdown, syscall.SIGINT, syscall.SIGTERM)
go func() {
wg.Wait()
signal.Stop(shutdown)
close(shutdown)
}()
<-shutdown
glog.Info("Shutting down...")
for _, srv := range servers {
srv.Stop()
}
<-shutdown
if err := tkr.Close(); err != nil {
glog.Errorf("Failed to shut down tracker cleanly: %s", err.Error())
}
glog.Info("Gracefully shut down")
}

View file

@ -24,8 +24,10 @@ type ResponseHandler func(http.ResponseWriter, *http.Request, httprouter.Params)
// Server represents an HTTP serving torrent tracker.
type Server struct {
config *config.Config
tracker *tracker.Tracker
config *config.Config
tracker *tracker.Tracker
grace *graceful.Server
stopping bool
}
// makeHandler wraps our ResponseHandlers while timing requests, collecting,
@ -120,34 +122,50 @@ func (s *Server) connState(conn net.Conn, state http.ConnState) {
// Serve creates a new Server and proceeds to block while handling requests
// until a graceful shutdown.
func Serve(cfg *config.Config, tkr *tracker.Tracker) {
srv := &Server{
config: cfg,
tracker: tkr,
}
func (s *Server) Serve() {
glog.V(0).Info("Starting HTTP on ", s.config.HTTPListenAddr)
glog.V(0).Info("Starting HTTP on ", cfg.HTTPListenAddr)
if cfg.HTTPListenLimit != 0 {
glog.V(0).Info("Limiting connections to ", cfg.HTTPListenLimit)
if s.config.HTTPListenLimit != 0 {
glog.V(0).Info("Limiting connections to ", s.config.HTTPListenLimit)
}
grace := &graceful.Server{
Timeout: cfg.HTTPRequestTimeout.Duration,
ConnState: srv.connState,
ListenLimit: cfg.HTTPListenLimit,
Timeout: s.config.HTTPRequestTimeout.Duration,
ConnState: s.connState,
ListenLimit: s.config.HTTPListenLimit,
Server: &http.Server{
Addr: cfg.HTTPListenAddr,
Handler: newRouter(srv),
ReadTimeout: cfg.HTTPReadTimeout.Duration,
WriteTimeout: cfg.HTTPWriteTimeout.Duration,
Addr: s.config.HTTPListenAddr,
Handler: newRouter(s),
ReadTimeout: s.config.HTTPReadTimeout.Duration,
WriteTimeout: s.config.HTTPWriteTimeout.Duration,
},
}
s.grace = grace
grace.SetKeepAlivesEnabled(false)
grace.ShutdownInitiated = func() { s.stopping = true }
if err := grace.ListenAndServe(); err != nil {
if opErr, ok := err.(*net.OpError); !ok || (ok && opErr.Op != "accept") {
glog.Errorf("Failed to gracefully run HTTP server: %s", err.Error())
return
}
}
glog.Info("HTTP server shut down cleanly")
}
// Stop cleanly shuts down the server.
func (s *Server) Stop() {
if !s.stopping {
s.grace.Stop(s.grace.Timeout)
}
}
// NewServer returns a new HTTP server for a given configuration and tracker.
func NewServer(cfg *config.Config, tkr *tracker.Tracker) *Server {
return &Server{
config: cfg,
tracker: tkr,
}
}

View file

@ -24,6 +24,15 @@ type Tracker struct {
*Storage
}
// Server represents a server for a given BitTorrent tracker protocol.
type Server interface {
// Serve runs the server and blocks until the server has shut down.
Serve()
// Stop cleanly shuts down the server in a non-blocking manner.
Stop()
}
// New creates a new Tracker, and opens any necessary connections.
// Maintenance routines are automatically spawned in the background.
func New(cfg *config.Config) (*Tracker, error) {

View file

@ -8,6 +8,7 @@ package udp
import (
"net"
"time"
"github.com/golang/glog"
"github.com/pushrax/bufferpool"
@ -20,10 +21,12 @@ import (
type Server struct {
config *config.Config
tracker *tracker.Tracker
done bool
}
func (srv *Server) ListenAndServe() error {
listenAddr, err := net.ResolveUDPAddr("udp", srv.config.UDPListenAddr)
func (s *Server) serve() error {
listenAddr, err := net.ResolveUDPAddr("udp", s.config.UDPListenAddr)
if err != nil {
return err
}
@ -34,37 +37,55 @@ func (srv *Server) ListenAndServe() error {
return err
}
if srv.config.UDPReadBufferSize > 0 {
sock.SetReadBuffer(srv.config.UDPReadBufferSize)
if s.config.UDPReadBufferSize > 0 {
sock.SetReadBuffer(s.config.UDPReadBufferSize)
}
pool := bufferpool.New(1000, 2048)
for {
for !s.done {
buffer := pool.TakeSlice()
sock.SetReadDeadline(time.Now().Add(time.Second))
n, addr, err := sock.ReadFromUDP(buffer)
if err != nil {
if netErr, ok := err.(net.Error); ok && netErr.Temporary() {
continue
}
return err
}
go func() {
response := srv.handlePacket(buffer[:n], addr)
response := s.handlePacket(buffer[:n], addr)
if response != nil {
sock.WriteToUDP(response, addr)
}
pool.GiveSlice(buffer)
}()
}
return nil
}
func Serve(cfg *config.Config, tkr *tracker.Tracker) {
srv := &Server{
func (s *Server) Serve() {
glog.V(0).Info("Starting UDP on ", s.config.UDPListenAddr)
if err := s.serve(); err != nil {
glog.Errorf("Failed to run UDP server: %s", err.Error())
} else {
glog.Info("UDP server shut down cleanly")
}
}
// Stop cleanly shuts down the server.
func (s *Server) Stop() {
s.done = true
}
// NewServer returns a new UDP server for a given configuration and tracker.
func NewServer(cfg *config.Config, tkr *tracker.Tracker) *Server {
return &Server{
config: cfg,
tracker: tkr,
}
glog.V(0).Info("Starting UDP on ", cfg.UDPListenAddr)
if err := srv.ListenAndServe(); err != nil {
glog.Errorf("Failed to run UDP server: %s", err.Error())
}
}