Merge pull request #209 from jzelinskie/logrus

misc fixes for #207
This commit is contained in:
mrd0ll4r 2016-09-05 13:40:52 -04:00 committed by GitHub
commit f6e362e506
8 changed files with 42 additions and 44 deletions

View file

@ -130,9 +130,3 @@ type ClientError string
// Error implements the error interface for ClientError. // Error implements the error interface for ClientError.
func (c ClientError) Error() string { return string(c) } func (c ClientError) Error() string { return string(c) }
// Tracker represents an implementation of the BitTorrent tracker protocol.
type Tracker interface {
ListenAndServe() error
Stop()
}

View file

@ -2,13 +2,13 @@ package main
import ( import (
"errors" "errors"
"log"
"net/http" "net/http"
"os" "os"
"os/signal" "os/signal"
"runtime/pprof" "runtime/pprof"
"syscall" "syscall"
log "github.com/Sirupsen/logrus"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/spf13/cobra" "github.com/spf13/cobra"
@ -19,9 +19,14 @@ import (
) )
func rootCmdRun(cmd *cobra.Command, args []string) error { func rootCmdRun(cmd *cobra.Command, args []string) error {
debugLog, _ := cmd.Flags().GetBool("debug")
if debugLog {
log.SetLevel(log.DebugLevel)
log.Debugln("debug logging enabled")
}
cpuProfilePath, _ := cmd.Flags().GetString("cpuprofile") cpuProfilePath, _ := cmd.Flags().GetString("cpuprofile")
if cpuProfilePath != "" { if cpuProfilePath != "" {
log.Println("enabled CPU profiling to " + cpuProfilePath) log.Infoln("enabled CPU profiling to", cpuProfilePath)
f, err := os.Create(cpuProfilePath) f, err := os.Create(cpuProfilePath)
if err != nil { if err != nil {
return err return err
@ -42,26 +47,26 @@ func rootCmdRun(cmd *cobra.Command, args []string) error {
Addr: cfg.PrometheusAddr, Addr: cfg.PrometheusAddr,
Handler: prometheus.Handler(), Handler: prometheus.Handler(),
} }
log.Println("started serving prometheus stats on", cfg.PrometheusAddr) log.Infoln("started serving prometheus stats on", cfg.PrometheusAddr)
if err := promServer.ListenAndServe(); err != nil { if err := promServer.ListenAndServe(); err != nil {
log.Fatal(err) log.Fatalln("failed to start prometheus server:", err.Error())
} }
}() }()
// Force the compiler to enforce memory against the storage interface. // Force the compiler to enforce memory against the storage interface.
peerStore, err := memory.New(cfg.Storage) peerStore, err := memory.New(cfg.Storage)
if err != nil { if err != nil {
return err return errors.New("failed to create memory storage: " + err.Error())
} }
preHooks, postHooks, err := configFile.CreateHooks() preHooks, postHooks, err := configFile.CreateHooks()
if err != nil { if err != nil {
return err return errors.New("failed to create hooks: " + err.Error())
} }
logic := middleware.NewLogic(cfg.Config, peerStore, preHooks, postHooks) logic := middleware.NewLogic(cfg.Config, peerStore, preHooks, postHooks)
if err != nil { if err != nil {
return err return errors.New("failed to create TrackerLogic: " + err.Error())
} }
shutdown := make(chan struct{}) shutdown := make(chan struct{})
@ -74,7 +79,7 @@ func rootCmdRun(cmd *cobra.Command, args []string) error {
httpFrontend = httpfrontend.NewFrontend(logic, cfg.HTTPConfig) httpFrontend = httpfrontend.NewFrontend(logic, cfg.HTTPConfig)
go func() { go func() {
log.Println("started serving HTTP on", cfg.HTTPConfig.Addr) log.Infoln("started serving HTTP on", cfg.HTTPConfig.Addr)
if err := httpFrontend.ListenAndServe(); err != nil { if err := httpFrontend.ListenAndServe(); err != nil {
errChan <- errors.New("failed to cleanly shutdown HTTP frontend: " + err.Error()) errChan <- errors.New("failed to cleanly shutdown HTTP frontend: " + err.Error())
} }
@ -85,7 +90,7 @@ func rootCmdRun(cmd *cobra.Command, args []string) error {
udpFrontend = udpfrontend.NewFrontend(logic, cfg.UDPConfig) udpFrontend = udpfrontend.NewFrontend(logic, cfg.UDPConfig)
go func() { go func() {
log.Println("started serving UDP on", cfg.UDPConfig.Addr) log.Infoln("started serving UDP on", cfg.UDPConfig.Addr)
if err := udpFrontend.ListenAndServe(); err != nil { if err := udpFrontend.ListenAndServe(); err != nil {
errChan <- errors.New("failed to cleanly shutdown UDP frontend: " + err.Error()) errChan <- errors.New("failed to cleanly shutdown UDP frontend: " + err.Error())
} }
@ -114,6 +119,8 @@ func rootCmdRun(cmd *cobra.Command, args []string) error {
} }
} }
// TODO(jzelinskie): stop hooks here
close(errChan) close(errChan)
}() }()
@ -125,7 +132,7 @@ func rootCmdRun(cmd *cobra.Command, args []string) error {
close(shutdown) close(shutdown)
closed = true closed = true
} else { } else {
log.Println(bufErr) log.Infoln(bufErr)
} }
bufErr = err bufErr = err
} }
@ -147,6 +154,7 @@ func main() {
} }
rootCmd.Flags().String("config", "/etc/chihaya.yaml", "location of configuration file") rootCmd.Flags().String("config", "/etc/chihaya.yaml", "location of configuration file")
rootCmd.Flags().String("cpuprofile", "", "location to save a CPU profile") rootCmd.Flags().String("cpuprofile", "", "location to save a CPU profile")
rootCmd.Flags().Bool("debug", false, "enable debug logging")
if err := rootCmd.Execute(); err != nil { if err := rootCmd.Execute(); err != nil {
log.Fatal(err) log.Fatal(err)

View file

@ -136,7 +136,7 @@ func (t *Frontend) announceRoute(w http.ResponseWriter, r *http.Request, _ httpr
return return
} }
resp, err := t.logic.HandleAnnounce(context.TODO(), req) resp, err := t.logic.HandleAnnounce(context.Background(), req)
if err != nil { if err != nil {
WriteError(w, err) WriteError(w, err)
return return
@ -148,7 +148,7 @@ func (t *Frontend) announceRoute(w http.ResponseWriter, r *http.Request, _ httpr
return return
} }
go t.logic.AfterAnnounce(context.TODO(), req, resp) go t.logic.AfterAnnounce(context.Background(), req, resp)
} }
// scrapeRoute parses and responds to a Scrape by using t.TrackerLogic. // scrapeRoute parses and responds to a Scrape by using t.TrackerLogic.
@ -163,7 +163,7 @@ func (t *Frontend) scrapeRoute(w http.ResponseWriter, r *http.Request, _ httprou
return return
} }
resp, err := t.logic.HandleScrape(context.TODO(), req) resp, err := t.logic.HandleScrape(context.Background(), req)
if err != nil { if err != nil {
WriteError(w, err) WriteError(w, err)
return return
@ -175,5 +175,5 @@ func (t *Frontend) scrapeRoute(w http.ResponseWriter, r *http.Request, _ httprou
return return
} }
go t.logic.AfterScrape(context.TODO(), req, resp) go t.logic.AfterScrape(context.Background(), req, resp)
} }

View file

@ -7,11 +7,11 @@ type BytePool struct {
sync.Pool sync.Pool
} }
// New allocates a new BytePool with slices of the provided capacity. // New allocates a new BytePool with slices of equal length and capacity.
func New(length, capacity int) *BytePool { func New(length int) *BytePool {
var bp BytePool var bp BytePool
bp.Pool.New = func() interface{} { bp.Pool.New = func() interface{} {
return make([]byte, length, capacity) return make([]byte, length, length)
} }
return &bp return &bp
} }

View file

@ -93,7 +93,7 @@ func (t *Frontend) ListenAndServe() error {
} }
defer t.socket.Close() defer t.socket.Close()
pool := bytepool.New(2048, 2048) pool := bytepool.New(2048)
for { for {
// Check to see if we need to shutdown. // Check to see if we need to shutdown.
@ -190,7 +190,6 @@ func (t *Frontend) handleRequest(r Request, w ResponseWriter) (actionName string
} }
WriteConnectionID(w, txID, NewConnectionID(r.IP, time.Now(), t.PrivateKey)) WriteConnectionID(w, txID, NewConnectionID(r.IP, time.Now(), t.PrivateKey))
return
case announceActionID, announceV6ActionID: case announceActionID, announceV6ActionID:
actionName = "announce" actionName = "announce"
@ -203,7 +202,7 @@ func (t *Frontend) handleRequest(r Request, w ResponseWriter) (actionName string
} }
var resp *bittorrent.AnnounceResponse var resp *bittorrent.AnnounceResponse
resp, err = t.logic.HandleAnnounce(context.TODO(), req) resp, err = t.logic.HandleAnnounce(context.Background(), req)
if err != nil { if err != nil {
WriteError(w, txID, err) WriteError(w, txID, err)
return return
@ -211,9 +210,7 @@ func (t *Frontend) handleRequest(r Request, w ResponseWriter) (actionName string
WriteAnnounce(w, txID, resp, actionID == announceV6ActionID) WriteAnnounce(w, txID, resp, actionID == announceV6ActionID)
go t.logic.AfterAnnounce(context.TODO(), req, resp) go t.logic.AfterAnnounce(context.Background(), req, resp)
return
case scrapeActionID: case scrapeActionID:
actionName = "scrape" actionName = "scrape"
@ -226,7 +223,7 @@ func (t *Frontend) handleRequest(r Request, w ResponseWriter) (actionName string
} }
var resp *bittorrent.ScrapeResponse var resp *bittorrent.ScrapeResponse
resp, err = t.logic.HandleScrape(context.TODO(), req) resp, err = t.logic.HandleScrape(context.Background(), req)
if err != nil { if err != nil {
WriteError(w, txID, err) WriteError(w, txID, err)
return return
@ -234,13 +231,12 @@ func (t *Frontend) handleRequest(r Request, w ResponseWriter) (actionName string
WriteScrape(w, txID, resp) WriteScrape(w, txID, resp)
go t.logic.AfterScrape(context.TODO(), req, resp) go t.logic.AfterScrape(context.Background(), req, resp)
return
default: default:
err = errUnknownAction err = errUnknownAction
WriteError(w, txID, err) WriteError(w, txID, err)
}
return return
} }
}

View file

@ -11,7 +11,6 @@ import (
"crypto" "crypto"
"encoding/json" "encoding/json"
"errors" "errors"
"log"
"net/http" "net/http"
"net/url" "net/url"
"time" "time"
@ -19,6 +18,7 @@ import (
jc "github.com/SermoDigital/jose/crypto" jc "github.com/SermoDigital/jose/crypto"
"github.com/SermoDigital/jose/jws" "github.com/SermoDigital/jose/jws"
"github.com/SermoDigital/jose/jwt" "github.com/SermoDigital/jose/jwt"
log "github.com/Sirupsen/logrus"
"github.com/mendsley/gojwk" "github.com/mendsley/gojwk"
"github.com/chihaya/chihaya/bittorrent" "github.com/chihaya/chihaya/bittorrent"
@ -64,7 +64,7 @@ func NewHook(cfg Config) middleware.Hook {
case <-time.After(cfg.JWKUpdateInterval): case <-time.After(cfg.JWKUpdateInterval):
resp, err := http.Get(cfg.JWKSetURL) resp, err := http.Get(cfg.JWKSetURL)
if err != nil { if err != nil {
log.Println("failed to fetch JWK Set: " + err.Error()) log.Errorln("failed to fetch JWK Set: " + err.Error())
continue continue
} }
@ -72,7 +72,7 @@ func NewHook(cfg Config) middleware.Hook {
err = json.NewDecoder(resp.Body).Decode(&parsedJWKs) err = json.NewDecoder(resp.Body).Decode(&parsedJWKs)
if err != nil { if err != nil {
resp.Body.Close() resp.Body.Close()
log.Println("failed to decode JWK JSON: " + err.Error()) log.Errorln("failed to decode JWK JSON: " + err.Error())
continue continue
} }
resp.Body.Close() resp.Body.Close()
@ -81,7 +81,7 @@ func NewHook(cfg Config) middleware.Hook {
for kid, parsedJWK := range parsedJWKs { for kid, parsedJWK := range parsedJWKs {
publicKey, err := parsedJWK.DecodePublicKey() publicKey, err := parsedJWK.DecodePublicKey()
if err != nil { if err != nil {
log.Println("failed to decode JWK into public key: " + err.Error()) log.Errorln("failed to decode JWK into public key: " + err.Error())
continue continue
} }
keys[kid] = publicKey keys[kid] = publicKey

View file

@ -4,9 +4,9 @@ package middleware
import ( import (
"context" "context"
"log"
"time" "time"
log "github.com/Sirupsen/logrus"
"github.com/chihaya/chihaya/bittorrent" "github.com/chihaya/chihaya/bittorrent"
"github.com/chihaya/chihaya/frontend" "github.com/chihaya/chihaya/frontend"
"github.com/chihaya/chihaya/storage" "github.com/chihaya/chihaya/storage"
@ -65,7 +65,7 @@ func (l *Logic) HandleAnnounce(ctx context.Context, req *bittorrent.AnnounceRequ
func (l *Logic) AfterAnnounce(ctx context.Context, req *bittorrent.AnnounceRequest, resp *bittorrent.AnnounceResponse) { func (l *Logic) AfterAnnounce(ctx context.Context, req *bittorrent.AnnounceRequest, resp *bittorrent.AnnounceResponse) {
for _, h := range l.postHooks { for _, h := range l.postHooks {
if err := h.HandleAnnounce(ctx, req, resp); err != nil { if err := h.HandleAnnounce(ctx, req, resp); err != nil {
log.Println("chihaya: post-announce hooks failed:", err.Error()) log.Errorln("chihaya: post-announce hooks failed:", err.Error())
return return
} }
} }
@ -90,7 +90,7 @@ func (l *Logic) HandleScrape(ctx context.Context, req *bittorrent.ScrapeRequest)
func (l *Logic) AfterScrape(ctx context.Context, req *bittorrent.ScrapeRequest, resp *bittorrent.ScrapeResponse) { func (l *Logic) AfterScrape(ctx context.Context, req *bittorrent.ScrapeRequest, resp *bittorrent.ScrapeResponse) {
for _, h := range l.postHooks { for _, h := range l.postHooks {
if err := h.HandleScrape(ctx, req, resp); err != nil { if err := h.HandleScrape(ctx, req, resp); err != nil {
log.Println("chihaya: post-scrape hooks failed:", err.Error()) log.Errorln("chihaya: post-scrape hooks failed:", err.Error())
return return
} }
} }

View file

@ -3,12 +3,13 @@ package memory
import ( import (
"encoding/binary" "encoding/binary"
"errors" "errors"
"log"
"net" "net"
"runtime" "runtime"
"sync" "sync"
"time" "time"
log "github.com/Sirupsen/logrus"
"github.com/chihaya/chihaya/bittorrent" "github.com/chihaya/chihaya/bittorrent"
"github.com/chihaya/chihaya/storage" "github.com/chihaya/chihaya/storage"
) )
@ -53,7 +54,7 @@ func New(cfg Config) (storage.PeerStore, error) {
return return
case <-time.After(cfg.GarbageCollectionInterval): case <-time.After(cfg.GarbageCollectionInterval):
before := time.Now().Add(-cfg.PeerLifetime) before := time.Now().Add(-cfg.PeerLifetime)
log.Println("memory: purging peers with no announces since ", before) log.Debugln("memory: purging peers with no announces since", before)
ps.collectGarbage(before) ps.collectGarbage(before)
} }
} }
@ -327,7 +328,6 @@ func (s *peerStore) collectGarbage(cutoff time.Time) error {
default: default:
} }
log.Printf("memory: collecting garbage. Cutoff time: %s", cutoff.String())
cutoffUnix := cutoff.UnixNano() cutoffUnix := cutoff.UnixNano()
for _, shard := range s.shards { for _, shard := range s.shards {
shard.RLock() shard.RLock()