Merge pull request #376 from jzelinskie/register-mw

Add registrable middleware
This commit is contained in:
Jimmy Zelinskie 2018-01-01 18:34:20 -05:00 committed by GitHub
commit a5b15d69ad
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 380 additions and 248 deletions

View file

@ -10,32 +10,17 @@ import (
"github.com/chihaya/chihaya/frontend/http"
"github.com/chihaya/chihaya/frontend/udp"
"github.com/chihaya/chihaya/middleware"
"github.com/chihaya/chihaya/middleware/clientapproval"
"github.com/chihaya/chihaya/middleware/jwt"
"github.com/chihaya/chihaya/middleware/varinterval"
// Imported to register as Storage Drivers.
// Imported to register as middleware drivers.
_ "github.com/chihaya/chihaya/middleware/clientapproval"
_ "github.com/chihaya/chihaya/middleware/jwt"
_ "github.com/chihaya/chihaya/middleware/varinterval"
// Imported to register as storage drivers.
_ "github.com/chihaya/chihaya/storage/memory"
_ "github.com/chihaya/chihaya/storage/memorybysubnet"
)
type hookConfig struct {
Name string `yaml:"name"`
Config interface{} `yaml:"config"`
}
type hookConfigs []hookConfig
// Names returns all hook names listed in the configuration.
func (hookCfgs hookConfigs) Names() (hookNames []string) {
hookNames = make([]string, len(hookCfgs))
for index, hookCfg := range hookCfgs {
hookNames[index] = hookCfg.Name
}
return
}
type storageConfig struct {
Name string `yaml:"name"`
Config interface{} `yaml:"config"`
@ -43,64 +28,28 @@ type storageConfig struct {
// Config represents the configuration used for executing Chihaya.
type Config struct {
middleware.Config `yaml:",inline"`
PrometheusAddr string `yaml:"prometheus_addr"`
HTTPConfig http.Config `yaml:"http"`
UDPConfig udp.Config `yaml:"udp"`
Storage storageConfig `yaml:"storage"`
PreHooks hookConfigs `yaml:"prehooks"`
PostHooks hookConfigs `yaml:"posthooks"`
middleware.ResponseConfig `yaml:",inline"`
PrometheusAddr string `yaml:"prometheus_addr"`
HTTPConfig http.Config `yaml:"http"`
UDPConfig udp.Config `yaml:"udp"`
Storage storageConfig `yaml:"storage"`
PreHooks []middleware.HookConfig `yaml:"prehooks"`
PostHooks []middleware.HookConfig `yaml:"posthooks"`
}
// CreateHooks creates instances of Hooks for all of the PreHooks and PostHooks
// configured in a Config.
func (cfg Config) CreateHooks() (preHooks, postHooks []middleware.Hook, err error) {
for _, hookCfg := range cfg.PreHooks {
cfgBytes, err := yaml.Marshal(hookCfg.Config)
if err != nil {
panic("failed to remarshal valid YAML")
}
switch hookCfg.Name {
case "jwt":
var jwtCfg jwt.Config
err := yaml.Unmarshal(cfgBytes, &jwtCfg)
if err != nil {
return nil, nil, errors.New("invalid JWT middleware config: " + err.Error())
}
hook, err := jwt.NewHook(jwtCfg)
if err != nil {
return nil, nil, errors.New("invalid JWT middleware config: " + err.Error())
}
preHooks = append(preHooks, hook)
case "client approval":
var caCfg clientapproval.Config
err := yaml.Unmarshal(cfgBytes, &caCfg)
if err != nil {
return nil, nil, errors.New("invalid client approval middleware config: " + err.Error())
}
hook, err := clientapproval.NewHook(caCfg)
if err != nil {
return nil, nil, errors.New("invalid client approval middleware config: " + err.Error())
}
preHooks = append(preHooks, hook)
case "interval variation":
var viCfg varinterval.Config
err := yaml.Unmarshal(cfgBytes, &viCfg)
if err != nil {
return nil, nil, errors.New("invalid interval variation middleware config: " + err.Error())
}
hook, err := varinterval.New(viCfg)
if err != nil {
return nil, nil, errors.New("invalid interval variation middleware config: " + err.Error())
}
preHooks = append(preHooks, hook)
}
// PreHookNames returns only the names of the configured middleware.
func (cfg Config) PreHookNames() (names []string) {
for _, hook := range cfg.PreHooks {
names = append(names, hook.Name)
}
for _, hookCfg := range cfg.PostHooks {
switch hookCfg.Name {
}
return
}
// PostHookNames returns only the names of the configured middleware.
func (cfg Config) PostHookNames() (names []string) {
for _, hook := range cfg.PostHooks {
names = append(names, hook.Name)
}
return

View file

@ -63,15 +63,20 @@ func (r *Run) Start(ps storage.PeerStore) error {
}
r.peerStore = ps
preHooks, postHooks, err := cfg.CreateHooks()
preHooks, err := middleware.HooksFromHookConfigs(cfg.PreHooks)
if err != nil {
return errors.New("failed to validate hook config: " + err.Error())
}
log.Info("starting middleware", log.Fields{
"preHooks": cfg.PreHooks.Names(),
"postHooks": cfg.PostHooks.Names(),
postHooks, err := middleware.HooksFromHookConfigs(cfg.PostHooks)
if err != nil {
return errors.New("failed to validate hook config: " + err.Error())
}
log.Info("starting tracker logic", log.Fields{
"prehooks": cfg.PreHookNames(),
"posthooks": cfg.PostHookNames(),
})
r.logic = middleware.NewLogic(cfg.Config, r.peerStore, preHooks, postHooks)
r.logic = middleware.NewLogic(cfg.ResponseConfig, r.peerStore, preHooks, postHooks)
if cfg.HTTPConfig.Addr != "" {
log.Info("starting HTTP frontend", cfg.HTTPConfig)
@ -167,59 +172,69 @@ func RunCmdFunc(cmd *cobra.Command, args []string) error {
}
}
// PreRunCmdFunc handles command line flags for the Run command.
func PreRunCmdFunc(cmd *cobra.Command, args []string) error {
noColors, err := cmd.Flags().GetBool("nocolors")
if err != nil {
return err
}
if noColors {
log.SetFormatter(&logrus.TextFormatter{DisableColors: true})
}
jsonLog, err := cmd.Flags().GetBool("json")
if err != nil {
return err
}
if jsonLog {
log.SetFormatter(&logrus.JSONFormatter{})
log.Info("enabled JSON logging")
}
debugLog, err := cmd.Flags().GetBool("debug")
if err != nil {
return err
}
if debugLog {
log.SetDebug(true)
log.Info("enabled debug logging")
}
cpuProfilePath, err := cmd.Flags().GetString("cpuprofile")
if err != nil {
return err
}
if cpuProfilePath != "" {
f, err := os.Create(cpuProfilePath)
if err != nil {
return err
}
pprof.StartCPUProfile(f)
log.Info("enabled CPU profiling", log.Fields{"path": cpuProfilePath})
}
return nil
}
// PostRunCmdFunc handles clean up of any state initialized by command line
// flags.
func PostRunCmdFunc(cmd *cobra.Command, args []string) error {
// This can be called regardless because it noops when not profiling.
pprof.StopCPUProfile()
return nil
}
func main() {
var rootCmd = &cobra.Command{
Use: "chihaya",
Short: "BitTorrent Tracker",
Long: "A customizable, multi-protocol BitTorrent Tracker",
PersistentPreRunE: func(cmd *cobra.Command, args []string) error {
noColors, err := cmd.Flags().GetBool("nocolors")
if err != nil {
return err
}
if noColors {
log.SetFormatter(&logrus.TextFormatter{DisableColors: true})
}
jsonLog, err := cmd.Flags().GetBool("json")
if err != nil {
return err
}
if jsonLog {
log.SetFormatter(&logrus.JSONFormatter{})
}
debugLog, err := cmd.Flags().GetBool("debug")
if err != nil {
return err
}
if debugLog {
log.Info("enabling debug logging")
log.SetDebug(true)
}
cpuProfilePath, err := cmd.Flags().GetString("cpuprofile")
if err != nil {
return err
}
if cpuProfilePath != "" {
log.Info("enabling CPU profiling", log.Fields{"path": cpuProfilePath})
f, err := os.Create(cpuProfilePath)
if err != nil {
return err
}
pprof.StartCPUProfile(f)
}
return nil
},
RunE: RunCmdFunc,
PersistentPostRunE: func(cmd *cobra.Command, args []string) error {
// StopCPUProfile() noops when not profiling.
pprof.StopCPUProfile()
return nil
},
Use: "chihaya",
Short: "BitTorrent Tracker",
Long: "A customizable, multi-protocol BitTorrent Tracker",
PersistentPreRunE: PreRunCmdFunc,
RunE: RunCmdFunc,
PersistentPostRunE: PostRunCmdFunc,
}
rootCmd.Flags().String("config", "/etc/chihaya.yaml", "location of configuration file")
rootCmd.Flags().String("cpuprofile", "", "location to save a CPU profile")
rootCmd.Flags().Bool("debug", false, "enable debug logging")

View file

@ -121,21 +121,21 @@ chihaya:
# response has been returned to a BitTorrent client.
prehooks:
#- name: jwt
# config:
# options:
# issuer: "https://issuer.com"
# audience: "https://chihaya.issuer.com"
# jwk_set_url: "https://issuer.com/keys"
# jwk_set_update_interval: 5m
#- name: client approval
# config:
# options:
# whitelist:
# - "OP1011"
# blacklist:
# - "OP1012"
#- name: interval variation
# config:
# options:
# modify_response_probability: 0.2
# max_increase_delta: 60
# modify_min_interval: true

View file

@ -5,11 +5,35 @@ package clientapproval
import (
"context"
"errors"
"fmt"
"gopkg.in/yaml.v2"
"github.com/chihaya/chihaya/bittorrent"
"github.com/chihaya/chihaya/middleware"
)
// Name is the name by which this middleware is registered with Chihaya.
const Name = "client approval"
func init() {
middleware.RegisterDriver(Name, driver{})
}
var _ middleware.Driver = driver{}
type driver struct{}
func (d driver) NewHook(optionBytes []byte) (middleware.Hook, error) {
var cfg Config
err := yaml.Unmarshal(optionBytes, &cfg)
if err != nil {
return nil, fmt.Errorf("invalid options for middleware %s: %s", Name, err)
}
return NewHook(cfg)
}
// ErrClientUnapproved is the error returned when a client's PeerID is invalid.
var ErrClientUnapproved = bittorrent.ClientError("unapproved client")

View file

@ -12,6 +12,7 @@ import (
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"net/http"
"strings"
"time"
@ -20,6 +21,7 @@ import (
"github.com/SermoDigital/jose/jws"
"github.com/SermoDigital/jose/jwt"
"github.com/mendsley/gojwk"
"gopkg.in/yaml.v2"
"github.com/chihaya/chihaya/bittorrent"
"github.com/chihaya/chihaya/middleware"
@ -27,6 +29,27 @@ import (
"github.com/chihaya/chihaya/pkg/stop"
)
// Name is the name by which this middleware is registered with Chihaya.
const Name = "jwt"
func init() {
middleware.RegisterDriver(Name, driver{})
}
var _ middleware.Driver = driver{}
type driver struct{}
func (d driver) NewHook(optionBytes []byte) (middleware.Hook, error) {
var cfg Config
err := yaml.Unmarshal(optionBytes, &cfg)
if err != nil {
return nil, fmt.Errorf("invalid options for middleware %s: %s", Name, err)
}
return NewHook(cfg)
}
var (
// ErrMissingJWT is returned when a JWT is missing from a request.
ErrMissingJWT = bittorrent.ClientError("unapproved request: missing jwt")

125
middleware/logic.go Normal file
View file

@ -0,0 +1,125 @@
package middleware
import (
"context"
"time"
"github.com/chihaya/chihaya/bittorrent"
"github.com/chihaya/chihaya/frontend"
"github.com/chihaya/chihaya/pkg/log"
"github.com/chihaya/chihaya/pkg/stop"
"github.com/chihaya/chihaya/storage"
)
// ResponseConfig holds the configuration used for the actual response.
//
// TODO(jzelinskie): Evaluate whether we would like to make this optional.
// We can make Chihaya extensible enough that you can program a new response
// generator at the cost of making it possible for users to create config that
// won't compose a functional tracker.
type ResponseConfig struct {
AnnounceInterval time.Duration `yaml:"announce_interval"`
MinAnnounceInterval time.Duration `yaml:"min_announce_interval"`
}
var _ frontend.TrackerLogic = &Logic{}
// NewLogic creates a new instance of a TrackerLogic that executes the provided
// middleware hooks.
func NewLogic(cfg ResponseConfig, peerStore storage.PeerStore, preHooks, postHooks []Hook) *Logic {
return &Logic{
announceInterval: cfg.AnnounceInterval,
minAnnounceInterval: cfg.MinAnnounceInterval,
peerStore: peerStore,
preHooks: append(preHooks, &responseHook{store: peerStore}),
postHooks: append(postHooks, &swarmInteractionHook{store: peerStore}),
}
}
// Logic is an implementation of the TrackerLogic that functions by
// executing a series of middleware hooks.
type Logic struct {
announceInterval time.Duration
minAnnounceInterval time.Duration
peerStore storage.PeerStore
preHooks []Hook
postHooks []Hook
}
// HandleAnnounce generates a response for an Announce.
func (l *Logic) HandleAnnounce(ctx context.Context, req *bittorrent.AnnounceRequest) (_ context.Context, resp *bittorrent.AnnounceResponse, err error) {
resp = &bittorrent.AnnounceResponse{
Interval: l.announceInterval,
MinInterval: l.minAnnounceInterval,
Compact: req.Compact,
}
for _, h := range l.preHooks {
if ctx, err = h.HandleAnnounce(ctx, req, resp); err != nil {
return nil, nil, err
}
}
log.Debug("generated announce response", resp)
return ctx, resp, nil
}
// AfterAnnounce does something with the results of an Announce after it has
// been completed.
func (l *Logic) AfterAnnounce(ctx context.Context, req *bittorrent.AnnounceRequest, resp *bittorrent.AnnounceResponse) {
var err error
for _, h := range l.postHooks {
if ctx, err = h.HandleAnnounce(ctx, req, resp); err != nil {
log.Error("post-announce hooks failed", log.Err(err))
return
}
}
}
// HandleScrape generates a response for a Scrape.
func (l *Logic) HandleScrape(ctx context.Context, req *bittorrent.ScrapeRequest) (_ context.Context, resp *bittorrent.ScrapeResponse, err error) {
resp = &bittorrent.ScrapeResponse{
Files: make([]bittorrent.Scrape, 0, len(req.InfoHashes)),
}
for _, h := range l.preHooks {
if ctx, err = h.HandleScrape(ctx, req, resp); err != nil {
return nil, nil, err
}
}
log.Debug("generated scrape response", resp)
return ctx, resp, nil
}
// AfterScrape does something with the results of a Scrape after it has been
// completed.
func (l *Logic) AfterScrape(ctx context.Context, req *bittorrent.ScrapeRequest, resp *bittorrent.ScrapeResponse) {
var err error
for _, h := range l.postHooks {
if ctx, err = h.HandleScrape(ctx, req, resp); err != nil {
log.Error("post-scrape hooks failed", log.Err(err))
return
}
}
}
// Stop stops the Logic.
//
// This stops any hooks that implement stop.Stopper.
func (l *Logic) Stop() []error {
stopGroup := stop.NewGroup()
for _, hook := range l.preHooks {
stoppable, ok := hook.(stop.Stopper)
if ok {
stopGroup.Add(stoppable)
}
}
for _, hook := range l.postHooks {
stoppable, ok := hook.(stop.Stopper)
if ok {
stopGroup.Add(stoppable)
}
}
return stopGroup.Stop()
}

View file

@ -3,120 +3,92 @@
package middleware
import (
"context"
"time"
"errors"
"sync"
"github.com/chihaya/chihaya/bittorrent"
"github.com/chihaya/chihaya/frontend"
"github.com/chihaya/chihaya/pkg/log"
"github.com/chihaya/chihaya/pkg/stop"
"github.com/chihaya/chihaya/storage"
"gopkg.in/yaml.v2"
)
// Config holds the configuration common across all middleware.
type Config struct {
AnnounceInterval time.Duration `yaml:"announce_interval"`
MinAnnounceInterval time.Duration `yaml:"min_announce_interval"`
}
var (
driversM sync.RWMutex
drivers = make(map[string]Driver)
var _ frontend.TrackerLogic = &Logic{}
// ErrDriverDoesNotExist is the error returned by NewMiddleware when a
// middleware driver with that name does not exist.
ErrDriverDoesNotExist = errors.New("middleware driver with that name does not exist")
)
// NewLogic creates a new instance of a TrackerLogic that executes the provided
// middleware hooks.
func NewLogic(cfg Config, peerStore storage.PeerStore, preHooks, postHooks []Hook) *Logic {
return &Logic{
announceInterval: cfg.AnnounceInterval,
minAnnounceInterval: cfg.MinAnnounceInterval,
peerStore: peerStore,
preHooks: append(preHooks, &responseHook{store: peerStore}),
postHooks: append(postHooks, &swarmInteractionHook{store: peerStore}),
}
}
// Logic is an implementation of the TrackerLogic that functions by
// executing a series of middleware hooks.
type Logic struct {
announceInterval time.Duration
minAnnounceInterval time.Duration
peerStore storage.PeerStore
preHooks []Hook
postHooks []Hook
}
// HandleAnnounce generates a response for an Announce.
func (l *Logic) HandleAnnounce(ctx context.Context, req *bittorrent.AnnounceRequest) (_ context.Context, resp *bittorrent.AnnounceResponse, err error) {
resp = &bittorrent.AnnounceResponse{
Interval: l.announceInterval,
MinInterval: l.minAnnounceInterval,
Compact: req.Compact,
}
for _, h := range l.preHooks {
if ctx, err = h.HandleAnnounce(ctx, req, resp); err != nil {
return nil, nil, err
}
}
log.Debug("generated announce response", resp)
return ctx, resp, nil
}
// AfterAnnounce does something with the results of an Announce after it has
// been completed.
func (l *Logic) AfterAnnounce(ctx context.Context, req *bittorrent.AnnounceRequest, resp *bittorrent.AnnounceResponse) {
var err error
for _, h := range l.postHooks {
if ctx, err = h.HandleAnnounce(ctx, req, resp); err != nil {
log.Error("post-announce hooks failed", log.Err(err))
return
}
}
}
// HandleScrape generates a response for a Scrape.
func (l *Logic) HandleScrape(ctx context.Context, req *bittorrent.ScrapeRequest) (_ context.Context, resp *bittorrent.ScrapeResponse, err error) {
resp = &bittorrent.ScrapeResponse{
Files: make([]bittorrent.Scrape, 0, len(req.InfoHashes)),
}
for _, h := range l.preHooks {
if ctx, err = h.HandleScrape(ctx, req, resp); err != nil {
return nil, nil, err
}
}
log.Debug("generated scrape response", resp)
return ctx, resp, nil
}
// AfterScrape does something with the results of a Scrape after it has been
// completed.
func (l *Logic) AfterScrape(ctx context.Context, req *bittorrent.ScrapeRequest, resp *bittorrent.ScrapeResponse) {
var err error
for _, h := range l.postHooks {
if ctx, err = h.HandleScrape(ctx, req, resp); err != nil {
log.Error("post-scrape hooks failed", log.Err(err))
return
}
}
}
// Stop stops the Logic.
// Driver is the interface used to initialize a new type of middleware.
//
// This stops any hooks that implement stop.stop.
func (l *Logic) Stop() []error {
stopGroup := stop.NewGroup()
for _, hook := range l.preHooks {
stoppable, ok := hook.(stop.Stopper)
if ok {
stopGroup.Add(stoppable)
}
}
for _, hook := range l.postHooks {
stoppable, ok := hook.(stop.Stopper)
if ok {
stopGroup.Add(stoppable)
}
}
return stopGroup.Stop()
// The options parameter is YAML encoded bytes that should be unmarshalled into
// the hook's custom configuration.
type Driver interface {
NewHook(options []byte) (Hook, error)
}
// RegisterDriver makes a Driver available by the provided name.
//
// If called twice with the same name, the name is blank, or if the provided
// Driver is nil, this function panics.
func RegisterDriver(name string, d Driver) {
if name == "" {
panic("middleware: could not register a Driver with an empty name")
}
if d == nil {
panic("middleware: could not register a nil Driver")
}
driversM.Lock()
defer driversM.Unlock()
if _, dup := drivers[name]; dup {
panic("middleware: RegisterDriver called twice for " + name)
}
drivers[name] = d
}
// New attempts to initialize a new middleware instance from the
// list of registered Drivers.
//
// If a driver does not exist, returns ErrDriverDoesNotExist.
func New(name string, optionBytes []byte) (Hook, error) {
driversM.RLock()
defer driversM.RUnlock()
var d Driver
d, ok := drivers[name]
if !ok {
return nil, ErrDriverDoesNotExist
}
return d.NewHook(optionBytes)
}
// HookConfig is the generic configuration format used for all registered Hooks.
type HookConfig struct {
Name string `yaml:"name"`
Options map[string]interface{} `yaml:"options"`
}
// HooksFromHookConfigs is a utility function for initializing Hooks in bulk.
func HooksFromHookConfigs(cfgs []HookConfig) (hooks []Hook, err error) {
for _, cfg := range cfgs {
// Marshal the options back into bytes.
var optionBytes []byte
optionBytes, err = yaml.Marshal(cfg.Options)
if err != nil {
return
}
var h Hook
h, err = New(cfg.Name, optionBytes)
if err != nil {
return
}
hooks = append(hooks, h)
}
return
}

View file

@ -3,14 +3,38 @@ package varinterval
import (
"context"
"errors"
"fmt"
"sync"
"time"
"gopkg.in/yaml.v2"
"github.com/chihaya/chihaya/bittorrent"
"github.com/chihaya/chihaya/middleware"
"github.com/chihaya/chihaya/middleware/pkg/random"
)
// Name is the name by which this middleware is registered with Chihaya.
const Name = "interval variation"
func init() {
middleware.RegisterDriver(Name, driver{})
}
var _ middleware.Driver = driver{}
type driver struct{}
func (d driver) NewHook(optionBytes []byte) (middleware.Hook, error) {
var cfg Config
err := yaml.Unmarshal(optionBytes, &cfg)
if err != nil {
return nil, fmt.Errorf("invalid options for middleware %s: %s", Name, err)
}
return NewHook(cfg)
}
// ErrInvalidModifyResponseProbability is returned for a config with an invalid
// ModifyResponseProbability.
var ErrInvalidModifyResponseProbability = errors.New("invalid modify_response_probability")
@ -50,9 +74,9 @@ type hook struct {
sync.Mutex
}
// New creates a middleware to randomly modify the announce interval from the
// given config.
func New(cfg Config) (middleware.Hook, error) {
// NewHook creates a middleware to randomly modify the announce interval from
// the given config.
func NewHook(cfg Config) (middleware.Hook, error) {
err := checkConfig(cfg)
if err != nil {
return nil, err

View file

@ -43,7 +43,7 @@ func TestCheckConfig(t *testing.T) {
}
func TestHandleAnnounce(t *testing.T) {
h, err := New(Config{1.0, 10, true})
h, err := NewHook(Config{1.0, 10, true})
require.Nil(t, err)
require.NotNil(t, h)

View file

@ -136,7 +136,7 @@ func RegisterDriver(name string, d Driver) {
drivers[name] = d
}
// NewPeerStore attempts to initialize a new PeerStore with given a name from
// NewPeerStore attempts to initialize a new PeerStore instance from
// the list of registered Drivers.
//
// If a driver does not exist, returns ErrDriverDoesNotExist.