middleware: add a registration model
This commit is contained in:
parent
2bead6b7b4
commit
7dbbc86380
11 changed files with 320 additions and 198 deletions
middleware
|
@ -3,120 +3,92 @@
|
|||
package middleware
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
"errors"
|
||||
"sync"
|
||||
|
||||
"github.com/chihaya/chihaya/bittorrent"
|
||||
"github.com/chihaya/chihaya/frontend"
|
||||
"github.com/chihaya/chihaya/pkg/log"
|
||||
"github.com/chihaya/chihaya/pkg/stop"
|
||||
"github.com/chihaya/chihaya/storage"
|
||||
"gopkg.in/yaml.v2"
|
||||
)
|
||||
|
||||
// Config holds the configuration common across all middleware.
|
||||
type Config struct {
|
||||
AnnounceInterval time.Duration `yaml:"announce_interval"`
|
||||
MinAnnounceInterval time.Duration `yaml:"min_announce_interval"`
|
||||
}
|
||||
var (
|
||||
driversM sync.RWMutex
|
||||
drivers = make(map[string]Driver)
|
||||
|
||||
var _ frontend.TrackerLogic = &Logic{}
|
||||
// ErrDriverDoesNotExist is the error returned by NewMiddleware when a
|
||||
// middleware driver with that name does not exist.
|
||||
ErrDriverDoesNotExist = errors.New("middleware driver with that name does not exist")
|
||||
)
|
||||
|
||||
// NewLogic creates a new instance of a TrackerLogic that executes the provided
|
||||
// middleware hooks.
|
||||
func NewLogic(cfg Config, peerStore storage.PeerStore, preHooks, postHooks []Hook) *Logic {
|
||||
return &Logic{
|
||||
announceInterval: cfg.AnnounceInterval,
|
||||
minAnnounceInterval: cfg.MinAnnounceInterval,
|
||||
peerStore: peerStore,
|
||||
preHooks: append(preHooks, &responseHook{store: peerStore}),
|
||||
postHooks: append(postHooks, &swarmInteractionHook{store: peerStore}),
|
||||
}
|
||||
}
|
||||
|
||||
// Logic is an implementation of the TrackerLogic that functions by
|
||||
// executing a series of middleware hooks.
|
||||
type Logic struct {
|
||||
announceInterval time.Duration
|
||||
minAnnounceInterval time.Duration
|
||||
peerStore storage.PeerStore
|
||||
preHooks []Hook
|
||||
postHooks []Hook
|
||||
}
|
||||
|
||||
// HandleAnnounce generates a response for an Announce.
|
||||
func (l *Logic) HandleAnnounce(ctx context.Context, req *bittorrent.AnnounceRequest) (_ context.Context, resp *bittorrent.AnnounceResponse, err error) {
|
||||
resp = &bittorrent.AnnounceResponse{
|
||||
Interval: l.announceInterval,
|
||||
MinInterval: l.minAnnounceInterval,
|
||||
Compact: req.Compact,
|
||||
}
|
||||
for _, h := range l.preHooks {
|
||||
if ctx, err = h.HandleAnnounce(ctx, req, resp); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
}
|
||||
|
||||
log.Debug("generated announce response", resp)
|
||||
return ctx, resp, nil
|
||||
}
|
||||
|
||||
// AfterAnnounce does something with the results of an Announce after it has
|
||||
// been completed.
|
||||
func (l *Logic) AfterAnnounce(ctx context.Context, req *bittorrent.AnnounceRequest, resp *bittorrent.AnnounceResponse) {
|
||||
var err error
|
||||
for _, h := range l.postHooks {
|
||||
if ctx, err = h.HandleAnnounce(ctx, req, resp); err != nil {
|
||||
log.Error("post-announce hooks failed", log.Err(err))
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// HandleScrape generates a response for a Scrape.
|
||||
func (l *Logic) HandleScrape(ctx context.Context, req *bittorrent.ScrapeRequest) (_ context.Context, resp *bittorrent.ScrapeResponse, err error) {
|
||||
resp = &bittorrent.ScrapeResponse{
|
||||
Files: make([]bittorrent.Scrape, 0, len(req.InfoHashes)),
|
||||
}
|
||||
for _, h := range l.preHooks {
|
||||
if ctx, err = h.HandleScrape(ctx, req, resp); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
}
|
||||
|
||||
log.Debug("generated scrape response", resp)
|
||||
return ctx, resp, nil
|
||||
}
|
||||
|
||||
// AfterScrape does something with the results of a Scrape after it has been
|
||||
// completed.
|
||||
func (l *Logic) AfterScrape(ctx context.Context, req *bittorrent.ScrapeRequest, resp *bittorrent.ScrapeResponse) {
|
||||
var err error
|
||||
for _, h := range l.postHooks {
|
||||
if ctx, err = h.HandleScrape(ctx, req, resp); err != nil {
|
||||
log.Error("post-scrape hooks failed", log.Err(err))
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Stop stops the Logic.
|
||||
// Driver is the interface used to initialize a new type of middleware.
|
||||
//
|
||||
// This stops any hooks that implement stop.stop.
|
||||
func (l *Logic) Stop() []error {
|
||||
stopGroup := stop.NewGroup()
|
||||
for _, hook := range l.preHooks {
|
||||
stoppable, ok := hook.(stop.Stopper)
|
||||
if ok {
|
||||
stopGroup.Add(stoppable)
|
||||
}
|
||||
}
|
||||
|
||||
for _, hook := range l.postHooks {
|
||||
stoppable, ok := hook.(stop.Stopper)
|
||||
if ok {
|
||||
stopGroup.Add(stoppable)
|
||||
}
|
||||
}
|
||||
|
||||
return stopGroup.Stop()
|
||||
// The options parameter is YAML encoded bytes that should be unmarshalled into
|
||||
// the hook's custom configuration.
|
||||
type Driver interface {
|
||||
NewHook(options []byte) (Hook, error)
|
||||
}
|
||||
|
||||
// RegisterDriver makes a Driver available by the provided name.
|
||||
//
|
||||
// If called twice with the same name, the name is blank, or if the provided
|
||||
// Driver is nil, this function panics.
|
||||
func RegisterDriver(name string, d Driver) {
|
||||
if name == "" {
|
||||
panic("middleware: could not register a Driver with an empty name")
|
||||
}
|
||||
if d == nil {
|
||||
panic("middleware: could not register a nil Driver")
|
||||
}
|
||||
|
||||
driversM.Lock()
|
||||
defer driversM.Unlock()
|
||||
|
||||
if _, dup := drivers[name]; dup {
|
||||
panic("middleware: RegisterDriver called twice for " + name)
|
||||
}
|
||||
|
||||
drivers[name] = d
|
||||
}
|
||||
|
||||
// New attempts to initialize a new middleware instance from the
|
||||
// list of registered Drivers.
|
||||
//
|
||||
// If a driver does not exist, returns ErrDriverDoesNotExist.
|
||||
func New(name string, optionBytes []byte) (Hook, error) {
|
||||
driversM.RLock()
|
||||
defer driversM.RUnlock()
|
||||
|
||||
var d Driver
|
||||
d, ok := drivers[name]
|
||||
if !ok {
|
||||
return nil, ErrDriverDoesNotExist
|
||||
}
|
||||
|
||||
return d.NewHook(optionBytes)
|
||||
}
|
||||
|
||||
// HookConfig is the generic configuration format used for all registered Hooks.
|
||||
type HookConfig struct {
|
||||
Name string `yaml:"name"`
|
||||
Options map[string]interface{} `yaml:"options"`
|
||||
}
|
||||
|
||||
// HooksFromHookConfigs is a utility function for initializing Hooks in bulk.
|
||||
func HooksFromHookConfigs(cfgs []HookConfig) (hooks []Hook, err error) {
|
||||
for _, cfg := range cfgs {
|
||||
// Marshal the options back into bytes.
|
||||
var optionBytes []byte
|
||||
optionBytes, err = yaml.Marshal(cfg.Options)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
var h Hook
|
||||
h, err = New(cfg.Name, optionBytes)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
hooks = append(hooks, h)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue