From 7dbbc863803bfdce3d1687099e923afb4eeb893c Mon Sep 17 00:00:00 2001 From: Jimmy Zelinskie Date: Sat, 23 Dec 2017 14:54:51 -0500 Subject: [PATCH] middleware: add a registration model --- cmd/chihaya/config.go | 99 +++------ cmd/chihaya/main.go | 15 +- example_config.yaml | 6 +- middleware/clientapproval/clientapproval.go | 24 +++ middleware/jwt/jwt.go | 23 +++ middleware/logic.go | 125 ++++++++++++ .../{middleware_test.go => logic_test.go} | 0 middleware/middleware.go | 192 ++++++++---------- middleware/varinterval/varinterval.go | 30 ++- middleware/varinterval/varinterval_test.go | 2 +- storage/storage.go | 2 +- 11 files changed, 320 insertions(+), 198 deletions(-) create mode 100644 middleware/logic.go rename middleware/{middleware_test.go => logic_test.go} (100%) diff --git a/cmd/chihaya/config.go b/cmd/chihaya/config.go index 5bb1d14..72664ba 100644 --- a/cmd/chihaya/config.go +++ b/cmd/chihaya/config.go @@ -10,32 +10,17 @@ import ( "github.com/chihaya/chihaya/frontend/http" "github.com/chihaya/chihaya/frontend/udp" "github.com/chihaya/chihaya/middleware" - "github.com/chihaya/chihaya/middleware/clientapproval" - "github.com/chihaya/chihaya/middleware/jwt" - "github.com/chihaya/chihaya/middleware/varinterval" - // Imported to register as Storage Drivers. + // Imported to register as middleware drivers. + _ "github.com/chihaya/chihaya/middleware/clientapproval" + _ "github.com/chihaya/chihaya/middleware/jwt" + _ "github.com/chihaya/chihaya/middleware/varinterval" + + // Imported to register as storage drivers. _ "github.com/chihaya/chihaya/storage/memory" _ "github.com/chihaya/chihaya/storage/memorybysubnet" ) -type hookConfig struct { - Name string `yaml:"name"` - Config interface{} `yaml:"config"` -} - -type hookConfigs []hookConfig - -// Names returns all hook names listed in the configuration. -func (hookCfgs hookConfigs) Names() (hookNames []string) { - hookNames = make([]string, len(hookCfgs)) - for index, hookCfg := range hookCfgs { - hookNames[index] = hookCfg.Name - } - - return -} - type storageConfig struct { Name string `yaml:"name"` Config interface{} `yaml:"config"` @@ -43,64 +28,28 @@ type storageConfig struct { // Config represents the configuration used for executing Chihaya. type Config struct { - middleware.Config `yaml:",inline"` - PrometheusAddr string `yaml:"prometheus_addr"` - HTTPConfig http.Config `yaml:"http"` - UDPConfig udp.Config `yaml:"udp"` - Storage storageConfig `yaml:"storage"` - PreHooks hookConfigs `yaml:"prehooks"` - PostHooks hookConfigs `yaml:"posthooks"` + middleware.ResponseConfig `yaml:",inline"` + PrometheusAddr string `yaml:"prometheus_addr"` + HTTPConfig http.Config `yaml:"http"` + UDPConfig udp.Config `yaml:"udp"` + Storage storageConfig `yaml:"storage"` + PreHooks []middleware.HookConfig `yaml:"prehooks"` + PostHooks []middleware.HookConfig `yaml:"posthooks"` } -// CreateHooks creates instances of Hooks for all of the PreHooks and PostHooks -// configured in a Config. -func (cfg Config) CreateHooks() (preHooks, postHooks []middleware.Hook, err error) { - for _, hookCfg := range cfg.PreHooks { - cfgBytes, err := yaml.Marshal(hookCfg.Config) - if err != nil { - panic("failed to remarshal valid YAML") - } - - switch hookCfg.Name { - case "jwt": - var jwtCfg jwt.Config - err := yaml.Unmarshal(cfgBytes, &jwtCfg) - if err != nil { - return nil, nil, errors.New("invalid JWT middleware config: " + err.Error()) - } - hook, err := jwt.NewHook(jwtCfg) - if err != nil { - return nil, nil, errors.New("invalid JWT middleware config: " + err.Error()) - } - preHooks = append(preHooks, hook) - case "client approval": - var caCfg clientapproval.Config - err := yaml.Unmarshal(cfgBytes, &caCfg) - if err != nil { - return nil, nil, errors.New("invalid client approval middleware config: " + err.Error()) - } - hook, err := clientapproval.NewHook(caCfg) - if err != nil { - return nil, nil, errors.New("invalid client approval middleware config: " + err.Error()) - } - preHooks = append(preHooks, hook) - case "interval variation": - var viCfg varinterval.Config - err := yaml.Unmarshal(cfgBytes, &viCfg) - if err != nil { - return nil, nil, errors.New("invalid interval variation middleware config: " + err.Error()) - } - hook, err := varinterval.New(viCfg) - if err != nil { - return nil, nil, errors.New("invalid interval variation middleware config: " + err.Error()) - } - preHooks = append(preHooks, hook) - } +// PreHookNames returns only the names of the configured middleware. +func (cfg Config) PreHookNames() (names []string) { + for _, hook := range cfg.PreHooks { + names = append(names, hook.Name) } - for _, hookCfg := range cfg.PostHooks { - switch hookCfg.Name { - } + return +} + +// PostHookNames returns only the names of the configured middleware. +func (cfg Config) PostHookNames() (names []string) { + for _, hook := range cfg.PostHooks { + names = append(names, hook.Name) } return diff --git a/cmd/chihaya/main.go b/cmd/chihaya/main.go index 42faf73..6958dc3 100644 --- a/cmd/chihaya/main.go +++ b/cmd/chihaya/main.go @@ -63,15 +63,20 @@ func (r *Run) Start(ps storage.PeerStore) error { } r.peerStore = ps - preHooks, postHooks, err := cfg.CreateHooks() + preHooks, err := middleware.HooksFromHookConfigs(cfg.PreHooks) if err != nil { return errors.New("failed to validate hook config: " + err.Error()) } - log.Info("starting middleware", log.Fields{ - "preHooks": cfg.PreHooks.Names(), - "postHooks": cfg.PostHooks.Names(), + postHooks, err := middleware.HooksFromHookConfigs(cfg.PostHooks) + if err != nil { + return errors.New("failed to validate hook config: " + err.Error()) + } + + log.Info("starting tracker logic", log.Fields{ + "prehooks": cfg.PreHookNames(), + "posthooks": cfg.PostHookNames(), }) - r.logic = middleware.NewLogic(cfg.Config, r.peerStore, preHooks, postHooks) + r.logic = middleware.NewLogic(cfg.ResponseConfig, r.peerStore, preHooks, postHooks) if cfg.HTTPConfig.Addr != "" { log.Info("starting HTTP frontend", cfg.HTTPConfig) diff --git a/example_config.yaml b/example_config.yaml index 2037adb..d5939e7 100644 --- a/example_config.yaml +++ b/example_config.yaml @@ -121,21 +121,21 @@ chihaya: # response has been returned to a BitTorrent client. prehooks: #- name: jwt - # config: + # options: # issuer: "https://issuer.com" # audience: "https://chihaya.issuer.com" # jwk_set_url: "https://issuer.com/keys" # jwk_set_update_interval: 5m #- name: client approval - # config: + # options: # whitelist: # - "OP1011" # blacklist: # - "OP1012" #- name: interval variation - # config: + # options: # modify_response_probability: 0.2 # max_increase_delta: 60 # modify_min_interval: true diff --git a/middleware/clientapproval/clientapproval.go b/middleware/clientapproval/clientapproval.go index e09cd62..aaddc7c 100644 --- a/middleware/clientapproval/clientapproval.go +++ b/middleware/clientapproval/clientapproval.go @@ -5,11 +5,35 @@ package clientapproval import ( "context" "errors" + "fmt" + + "gopkg.in/yaml.v2" "github.com/chihaya/chihaya/bittorrent" "github.com/chihaya/chihaya/middleware" ) +// Name is the name by which this middleware is registered with Chihaya. +const Name = "client approval" + +func init() { + middleware.RegisterDriver(Name, driver{}) +} + +var _ middleware.Driver = driver{} + +type driver struct{} + +func (d driver) NewHook(optionBytes []byte) (middleware.Hook, error) { + var cfg Config + err := yaml.Unmarshal(optionBytes, &cfg) + if err != nil { + return nil, fmt.Errorf("invalid options for middleware %s: %s", Name, err) + } + + return NewHook(cfg) +} + // ErrClientUnapproved is the error returned when a client's PeerID is invalid. var ErrClientUnapproved = bittorrent.ClientError("unapproved client") diff --git a/middleware/jwt/jwt.go b/middleware/jwt/jwt.go index 4ae8fe8..fddb7ef 100644 --- a/middleware/jwt/jwt.go +++ b/middleware/jwt/jwt.go @@ -12,6 +12,7 @@ import ( "encoding/hex" "encoding/json" "errors" + "fmt" "net/http" "strings" "time" @@ -20,6 +21,7 @@ import ( "github.com/SermoDigital/jose/jws" "github.com/SermoDigital/jose/jwt" "github.com/mendsley/gojwk" + "gopkg.in/yaml.v2" "github.com/chihaya/chihaya/bittorrent" "github.com/chihaya/chihaya/middleware" @@ -27,6 +29,27 @@ import ( "github.com/chihaya/chihaya/pkg/stop" ) +// Name is the name by which this middleware is registered with Chihaya. +const Name = "jwt" + +func init() { + middleware.RegisterDriver(Name, driver{}) +} + +var _ middleware.Driver = driver{} + +type driver struct{} + +func (d driver) NewHook(optionBytes []byte) (middleware.Hook, error) { + var cfg Config + err := yaml.Unmarshal(optionBytes, &cfg) + if err != nil { + return nil, fmt.Errorf("invalid options for middleware %s: %s", Name, err) + } + + return NewHook(cfg) +} + var ( // ErrMissingJWT is returned when a JWT is missing from a request. ErrMissingJWT = bittorrent.ClientError("unapproved request: missing jwt") diff --git a/middleware/logic.go b/middleware/logic.go new file mode 100644 index 0000000..1f42088 --- /dev/null +++ b/middleware/logic.go @@ -0,0 +1,125 @@ +package middleware + +import ( + "context" + "time" + + "github.com/chihaya/chihaya/bittorrent" + "github.com/chihaya/chihaya/frontend" + "github.com/chihaya/chihaya/pkg/log" + "github.com/chihaya/chihaya/pkg/stop" + "github.com/chihaya/chihaya/storage" +) + +// ResponseConfig holds the configuration used for the actual response. +// +// TODO(jzelinskie): Evaluate whether we would like to make this optional. +// We can make Chihaya extensible enough that you can program a new response +// generator at the cost of making it possible for users to create config that +// won't compose a functional tracker. +type ResponseConfig struct { + AnnounceInterval time.Duration `yaml:"announce_interval"` + MinAnnounceInterval time.Duration `yaml:"min_announce_interval"` +} + +var _ frontend.TrackerLogic = &Logic{} + +// NewLogic creates a new instance of a TrackerLogic that executes the provided +// middleware hooks. +func NewLogic(cfg ResponseConfig, peerStore storage.PeerStore, preHooks, postHooks []Hook) *Logic { + return &Logic{ + announceInterval: cfg.AnnounceInterval, + minAnnounceInterval: cfg.MinAnnounceInterval, + peerStore: peerStore, + preHooks: append(preHooks, &responseHook{store: peerStore}), + postHooks: append(postHooks, &swarmInteractionHook{store: peerStore}), + } +} + +// Logic is an implementation of the TrackerLogic that functions by +// executing a series of middleware hooks. +type Logic struct { + announceInterval time.Duration + minAnnounceInterval time.Duration + peerStore storage.PeerStore + preHooks []Hook + postHooks []Hook +} + +// HandleAnnounce generates a response for an Announce. +func (l *Logic) HandleAnnounce(ctx context.Context, req *bittorrent.AnnounceRequest) (_ context.Context, resp *bittorrent.AnnounceResponse, err error) { + resp = &bittorrent.AnnounceResponse{ + Interval: l.announceInterval, + MinInterval: l.minAnnounceInterval, + Compact: req.Compact, + } + for _, h := range l.preHooks { + if ctx, err = h.HandleAnnounce(ctx, req, resp); err != nil { + return nil, nil, err + } + } + + log.Debug("generated announce response", resp) + return ctx, resp, nil +} + +// AfterAnnounce does something with the results of an Announce after it has +// been completed. +func (l *Logic) AfterAnnounce(ctx context.Context, req *bittorrent.AnnounceRequest, resp *bittorrent.AnnounceResponse) { + var err error + for _, h := range l.postHooks { + if ctx, err = h.HandleAnnounce(ctx, req, resp); err != nil { + log.Error("post-announce hooks failed", log.Err(err)) + return + } + } +} + +// HandleScrape generates a response for a Scrape. +func (l *Logic) HandleScrape(ctx context.Context, req *bittorrent.ScrapeRequest) (_ context.Context, resp *bittorrent.ScrapeResponse, err error) { + resp = &bittorrent.ScrapeResponse{ + Files: make([]bittorrent.Scrape, 0, len(req.InfoHashes)), + } + for _, h := range l.preHooks { + if ctx, err = h.HandleScrape(ctx, req, resp); err != nil { + return nil, nil, err + } + } + + log.Debug("generated scrape response", resp) + return ctx, resp, nil +} + +// AfterScrape does something with the results of a Scrape after it has been +// completed. +func (l *Logic) AfterScrape(ctx context.Context, req *bittorrent.ScrapeRequest, resp *bittorrent.ScrapeResponse) { + var err error + for _, h := range l.postHooks { + if ctx, err = h.HandleScrape(ctx, req, resp); err != nil { + log.Error("post-scrape hooks failed", log.Err(err)) + return + } + } +} + +// Stop stops the Logic. +// +// This stops any hooks that implement stop.Stopper. +func (l *Logic) Stop() []error { + stopGroup := stop.NewGroup() + for _, hook := range l.preHooks { + stoppable, ok := hook.(stop.Stopper) + if ok { + stopGroup.Add(stoppable) + } + } + + for _, hook := range l.postHooks { + stoppable, ok := hook.(stop.Stopper) + if ok { + stopGroup.Add(stoppable) + } + } + + return stopGroup.Stop() +} diff --git a/middleware/middleware_test.go b/middleware/logic_test.go similarity index 100% rename from middleware/middleware_test.go rename to middleware/logic_test.go diff --git a/middleware/middleware.go b/middleware/middleware.go index 691e006..8f550b8 100644 --- a/middleware/middleware.go +++ b/middleware/middleware.go @@ -3,120 +3,92 @@ package middleware import ( - "context" - "time" + "errors" + "sync" - "github.com/chihaya/chihaya/bittorrent" - "github.com/chihaya/chihaya/frontend" - "github.com/chihaya/chihaya/pkg/log" - "github.com/chihaya/chihaya/pkg/stop" - "github.com/chihaya/chihaya/storage" + "gopkg.in/yaml.v2" ) -// Config holds the configuration common across all middleware. -type Config struct { - AnnounceInterval time.Duration `yaml:"announce_interval"` - MinAnnounceInterval time.Duration `yaml:"min_announce_interval"` -} +var ( + driversM sync.RWMutex + drivers = make(map[string]Driver) -var _ frontend.TrackerLogic = &Logic{} + // ErrDriverDoesNotExist is the error returned by NewMiddleware when a + // middleware driver with that name does not exist. + ErrDriverDoesNotExist = errors.New("middleware driver with that name does not exist") +) -// NewLogic creates a new instance of a TrackerLogic that executes the provided -// middleware hooks. -func NewLogic(cfg Config, peerStore storage.PeerStore, preHooks, postHooks []Hook) *Logic { - return &Logic{ - announceInterval: cfg.AnnounceInterval, - minAnnounceInterval: cfg.MinAnnounceInterval, - peerStore: peerStore, - preHooks: append(preHooks, &responseHook{store: peerStore}), - postHooks: append(postHooks, &swarmInteractionHook{store: peerStore}), - } -} - -// Logic is an implementation of the TrackerLogic that functions by -// executing a series of middleware hooks. -type Logic struct { - announceInterval time.Duration - minAnnounceInterval time.Duration - peerStore storage.PeerStore - preHooks []Hook - postHooks []Hook -} - -// HandleAnnounce generates a response for an Announce. -func (l *Logic) HandleAnnounce(ctx context.Context, req *bittorrent.AnnounceRequest) (_ context.Context, resp *bittorrent.AnnounceResponse, err error) { - resp = &bittorrent.AnnounceResponse{ - Interval: l.announceInterval, - MinInterval: l.minAnnounceInterval, - Compact: req.Compact, - } - for _, h := range l.preHooks { - if ctx, err = h.HandleAnnounce(ctx, req, resp); err != nil { - return nil, nil, err - } - } - - log.Debug("generated announce response", resp) - return ctx, resp, nil -} - -// AfterAnnounce does something with the results of an Announce after it has -// been completed. -func (l *Logic) AfterAnnounce(ctx context.Context, req *bittorrent.AnnounceRequest, resp *bittorrent.AnnounceResponse) { - var err error - for _, h := range l.postHooks { - if ctx, err = h.HandleAnnounce(ctx, req, resp); err != nil { - log.Error("post-announce hooks failed", log.Err(err)) - return - } - } -} - -// HandleScrape generates a response for a Scrape. -func (l *Logic) HandleScrape(ctx context.Context, req *bittorrent.ScrapeRequest) (_ context.Context, resp *bittorrent.ScrapeResponse, err error) { - resp = &bittorrent.ScrapeResponse{ - Files: make([]bittorrent.Scrape, 0, len(req.InfoHashes)), - } - for _, h := range l.preHooks { - if ctx, err = h.HandleScrape(ctx, req, resp); err != nil { - return nil, nil, err - } - } - - log.Debug("generated scrape response", resp) - return ctx, resp, nil -} - -// AfterScrape does something with the results of a Scrape after it has been -// completed. -func (l *Logic) AfterScrape(ctx context.Context, req *bittorrent.ScrapeRequest, resp *bittorrent.ScrapeResponse) { - var err error - for _, h := range l.postHooks { - if ctx, err = h.HandleScrape(ctx, req, resp); err != nil { - log.Error("post-scrape hooks failed", log.Err(err)) - return - } - } -} - -// Stop stops the Logic. +// Driver is the interface used to initialize a new type of middleware. // -// This stops any hooks that implement stop.stop. -func (l *Logic) Stop() []error { - stopGroup := stop.NewGroup() - for _, hook := range l.preHooks { - stoppable, ok := hook.(stop.Stopper) - if ok { - stopGroup.Add(stoppable) - } - } - - for _, hook := range l.postHooks { - stoppable, ok := hook.(stop.Stopper) - if ok { - stopGroup.Add(stoppable) - } - } - - return stopGroup.Stop() +// The options parameter is YAML encoded bytes that should be unmarshalled into +// the hook's custom configuration. +type Driver interface { + NewHook(options []byte) (Hook, error) +} + +// RegisterDriver makes a Driver available by the provided name. +// +// If called twice with the same name, the name is blank, or if the provided +// Driver is nil, this function panics. +func RegisterDriver(name string, d Driver) { + if name == "" { + panic("middleware: could not register a Driver with an empty name") + } + if d == nil { + panic("middleware: could not register a nil Driver") + } + + driversM.Lock() + defer driversM.Unlock() + + if _, dup := drivers[name]; dup { + panic("middleware: RegisterDriver called twice for " + name) + } + + drivers[name] = d +} + +// New attempts to initialize a new middleware instance from the +// list of registered Drivers. +// +// If a driver does not exist, returns ErrDriverDoesNotExist. +func New(name string, optionBytes []byte) (Hook, error) { + driversM.RLock() + defer driversM.RUnlock() + + var d Driver + d, ok := drivers[name] + if !ok { + return nil, ErrDriverDoesNotExist + } + + return d.NewHook(optionBytes) +} + +// HookConfig is the generic configuration format used for all registered Hooks. +type HookConfig struct { + Name string `yaml:"name"` + Options map[string]interface{} `yaml:"options"` +} + +// HooksFromHookConfigs is a utility function for initializing Hooks in bulk. +func HooksFromHookConfigs(cfgs []HookConfig) (hooks []Hook, err error) { + for _, cfg := range cfgs { + // Marshal the options back into bytes. + var optionBytes []byte + optionBytes, err = yaml.Marshal(cfg.Options) + if err != nil { + return + } + + var h Hook + h, err = New(cfg.Name, optionBytes) + if err != nil { + return + } + + hooks = append(hooks, h) + } + + return } diff --git a/middleware/varinterval/varinterval.go b/middleware/varinterval/varinterval.go index f531e4b..23d34b0 100644 --- a/middleware/varinterval/varinterval.go +++ b/middleware/varinterval/varinterval.go @@ -3,14 +3,38 @@ package varinterval import ( "context" "errors" + "fmt" "sync" "time" + "gopkg.in/yaml.v2" + "github.com/chihaya/chihaya/bittorrent" "github.com/chihaya/chihaya/middleware" "github.com/chihaya/chihaya/middleware/pkg/random" ) +// Name is the name by which this middleware is registered with Chihaya. +const Name = "interval variation" + +func init() { + middleware.RegisterDriver(Name, driver{}) +} + +var _ middleware.Driver = driver{} + +type driver struct{} + +func (d driver) NewHook(optionBytes []byte) (middleware.Hook, error) { + var cfg Config + err := yaml.Unmarshal(optionBytes, &cfg) + if err != nil { + return nil, fmt.Errorf("invalid options for middleware %s: %s", Name, err) + } + + return NewHook(cfg) +} + // ErrInvalidModifyResponseProbability is returned for a config with an invalid // ModifyResponseProbability. var ErrInvalidModifyResponseProbability = errors.New("invalid modify_response_probability") @@ -50,9 +74,9 @@ type hook struct { sync.Mutex } -// New creates a middleware to randomly modify the announce interval from the -// given config. -func New(cfg Config) (middleware.Hook, error) { +// NewHook creates a middleware to randomly modify the announce interval from +// the given config. +func NewHook(cfg Config) (middleware.Hook, error) { err := checkConfig(cfg) if err != nil { return nil, err diff --git a/middleware/varinterval/varinterval_test.go b/middleware/varinterval/varinterval_test.go index c582c30..2ab0f96 100644 --- a/middleware/varinterval/varinterval_test.go +++ b/middleware/varinterval/varinterval_test.go @@ -43,7 +43,7 @@ func TestCheckConfig(t *testing.T) { } func TestHandleAnnounce(t *testing.T) { - h, err := New(Config{1.0, 10, true}) + h, err := NewHook(Config{1.0, 10, true}) require.Nil(t, err) require.NotNil(t, h) diff --git a/storage/storage.go b/storage/storage.go index 5326919..e64c5f8 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -136,7 +136,7 @@ func RegisterDriver(name string, d Driver) { drivers[name] = d } -// NewPeerStore attempts to initialize a new PeerStore with given a name from +// NewPeerStore attempts to initialize a new PeerStore instance from // the list of registered Drivers. // // If a driver does not exist, returns ErrDriverDoesNotExist.