restructure into backend and frontends

This commit is contained in:
Leo Balduf 2016-08-06 22:41:33 -04:00 committed by Jimmy Zelinskie
parent 11d135ce49
commit 8f67c1018e
22 changed files with 132 additions and 83 deletions

View file

@ -15,13 +15,13 @@
// Package trakr implements a BitTorrent Tracker that supports multiple // Package trakr implements a BitTorrent Tracker that supports multiple
// protocols and configurable Hooks that execute before and after a Response // protocols and configurable Hooks that execute before and after a Response
// has been delievered to a BitTorrent client. // has been delievered to a BitTorrent client.
package trakr package backend
import ( import (
"time" "time"
"github.com/jzelinskie/trakr/bittorrent/http" "github.com/jzelinskie/trakr/bittorrent"
"github.com/jzelinskie/trakr/bittorrent/udp" "golang.org/x/net/context"
) )
// GenericConfig is a block of configuration who's structure is unknown. // GenericConfig is a block of configuration who's structure is unknown.
@ -30,38 +30,32 @@ type GenericConfig struct {
config interface{} `yaml:"config"` config interface{} `yaml:"config"`
} }
// MultiTracker is a multi-protocol, customizable BitTorrent Tracker. // Backend is a multi-protocol, customizable BitTorrent Tracker.
type MultiTracker struct { type Backend struct {
AnnounceInterval time.Duration `yaml:"announce_interval"` AnnounceInterval time.Duration `yaml:"announce_interval"`
GCInterval time.Duration `yaml:"gc_interval"` GCInterval time.Duration `yaml:"gc_interval"`
GCExpiration time.Duration `yaml:"gc_expiration"` GCExpiration time.Duration `yaml:"gc_expiration"`
HTTPConfig http.Config `yaml:"http"`
UDPConfig udp.Config `yaml:"udp"`
PeerStoreConfig []GenericConfig `yaml:"storage"` PeerStoreConfig []GenericConfig `yaml:"storage"`
PreHooks []GenericConfig `yaml:"prehooks"` PreHooks []GenericConfig `yaml:"prehooks"`
PostHooks []GenericConfig `yaml:"posthooks"` PostHooks []GenericConfig `yaml:"posthooks"`
peerStore PeerStore peerStore PeerStore
httpTracker http.Tracker closing chan struct{}
udpTracker udp.Tracker
closing chan struct{}
} }
// Stop provides a thread-safe way to shutdown a currently running // Stop provides a thread-safe way to shutdown a currently running
// MultiTracker. // Backend.
func (t *MultiTracker) Stop() { func (t *Backend) Stop() {
close(t.closing) close(t.closing)
} }
// ListenAndServe listens on the protocols and addresses specified in the // Start starts the Backend.
// HTTPConfig and UDPConfig then blocks serving BitTorrent requests until // It blocks until t.Stop() is called or an error is returned.
// t.Stop() is called or an error is returned. func (t *Backend) Start() error {
func (t *MultiTracker) ListenAndServe() error {
t.closing = make(chan struct{}) t.closing = make(chan struct{})
// Build an TrackerFuncs from the PreHooks and PostHooks. // Build an TrackerFuncs from the PreHooks and PostHooks.
// Create a PeerStore instance. // Create a PeerStore instance.
// Create a HTTP Tracker instance. // Make TrackerFuncs available to be used by frontends.
// Create a UDP Tracker instance.
select { select {
case <-t.closing: case <-t.closing:
return nil return nil
@ -69,3 +63,27 @@ func (t *MultiTracker) ListenAndServe() error {
return nil return nil
} }
// TrackerFuncs is the collection of callback functions provided by the Backend
// to (1) generate a response from a parsed request, and (2) observe anything
// after the response has been delivered to the client.
type TrackerFuncs struct {
HandleAnnounce AnnounceHandler
HandleScrape ScrapeHandler
AfterAnnounce AnnounceCallback
AfterScrape ScrapeCallback
}
// AnnounceHandler is a function that generates a response for an Announce.
type AnnounceHandler func(context.Context, *bittorrent.AnnounceRequest) (*bittorrent.AnnounceResponse, error)
// AnnounceCallback is a function that does something with the results of an
// Announce after it has been completed.
type AnnounceCallback func(*bittorrent.AnnounceRequest, *bittorrent.AnnounceResponse)
// ScrapeHandler is a function that generates a response for a Scrape.
type ScrapeHandler func(context.Context, *bittorrent.ScrapeRequest) (*bittorrent.ScrapeResponse, error)
// ScrapeCallback is a function that does something with the results of a
// Scrape after it has been completed.
type ScrapeCallback func(*bittorrent.ScrapeRequest, *bittorrent.ScrapeResponse)

View file

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
package trakr package backend
import ( import (
"fmt" "fmt"

View file

@ -1,4 +1,4 @@
package trakr package backend
import ( import (
"fmt" "fmt"

View file

@ -20,8 +20,6 @@ package bittorrent
import ( import (
"net" "net"
"time" "time"
"golang.org/x/net/context"
) )
// PeerID represents a peer ID. // PeerID represents a peer ID.
@ -108,13 +106,6 @@ type AnnounceResponse struct {
IPv6Peers []Peer IPv6Peers []Peer
} }
// AnnounceHandler is a function that generates a response for an Announce.
type AnnounceHandler func(context.Context, *AnnounceRequest) (*AnnounceResponse, error)
// AnnounceCallback is a function that does something with the results of an
// Announce after it has been completed.
type AnnounceCallback func(*AnnounceRequest, *AnnounceResponse)
// ScrapeRequest represents the parsed parameters from a scrape request. // ScrapeRequest represents the parsed parameters from a scrape request.
type ScrapeRequest struct { type ScrapeRequest struct {
InfoHashes []InfoHash InfoHashes []InfoHash
@ -133,13 +124,6 @@ type Scrape struct {
Incomplete uint32 Incomplete uint32
} }
// ScrapeHandler is a function that generates a response for a Scrape.
type ScrapeHandler func(context.Context, *ScrapeRequest) (*ScrapeResponse, error)
// ScrapeCallback is a function that does something with the results of a
// Scrape after it has been completed.
type ScrapeCallback func(*ScrapeRequest, *ScrapeResponse)
// Peer represents the connection details of a peer that is returned in an // Peer represents the connection details of a peer that is returned in an
// announce response. // announce response.
type Peer struct { type Peer struct {
@ -171,13 +155,3 @@ type Tracker interface {
ListenAndServe() error ListenAndServe() error
Stop() Stop()
} }
// TrackerFuncs is the collection of callback functions provided to a Tracker
// to (1) generate a response from a parsed request, and (2) observe anything
// after the response has been delivered to the client.
type TrackerFuncs struct {
HandleAnnounce AnnounceHandler
HandleScrape ScrapeHandler
AfterAnnounce AnnounceCallback
AfterScrape ScrapeCallback
}

View file

@ -14,13 +14,18 @@ import (
"github.com/spf13/cobra" "github.com/spf13/cobra"
"gopkg.in/yaml.v2" "gopkg.in/yaml.v2"
"github.com/jzelinskie/trakr" "github.com/jzelinskie/trakr/backend"
httpfrontend "github.com/jzelinskie/trakr/frontends/http"
udpfrontend "github.com/jzelinskie/trakr/frontends/udp"
) )
type ConfigFile struct { type ConfigFile struct {
Config struct { Config struct {
PrometheusAddr string `yaml:"prometheus_addr"` PrometheusAddr string `yaml:"prometheus_addr"`
trakr.MultiTracker backend.Backend
HTTPConfig httpfrontend.Config `yaml:"http"`
UDPConfig udpfrontend.Config `yaml:"udp"`
} `yaml:"trakr"` } `yaml:"trakr"`
} }
@ -89,15 +94,66 @@ func main() {
} }
}() }()
errChan := make(chan error)
closedChan := make(chan struct{})
go func() { go func() {
shutdown := make(chan os.Signal) if err := configFile.Config.Backend.Start(); err != nil {
signal.Notify(shutdown, syscall.SIGINT, syscall.SIGTERM) errChan <- errors.New("failed to cleanly shutdown: " + err.Error())
<-shutdown }
configFile.Config.MultiTracker.Stop()
}() }()
if err := configFile.Config.MultiTracker.ListenAndServe(); err != nil { var hFrontend *httpfrontend.Frontend
return errors.New("failed to cleanly shutdown: " + err.Error()) var uFrontend *udpfrontend.Frontend
if configFile.Config.HTTPConfig.Addr != "" {
// TODO get the real TrackerFuncs
hFrontend = httpfrontend.NewFrontend(backend.TrackerFuncs{}, configFile.Config.HTTPConfig)
go func() {
log.Println("started serving HTTP on", configFile.Config.HTTPConfig.Addr)
if err := hFrontend.ListenAndServe(); err != nil {
errChan <- errors.New("failed to cleanly shutdown HTTP frontend: " + err.Error())
}
}()
}
if configFile.Config.UDPConfig.Addr != "" {
// TODO get the real TrackerFuncs
uFrontend = udpfrontend.NewFrontend(backend.TrackerFuncs{}, configFile.Config.UDPConfig)
go func() {
log.Println("started serving UDP on", configFile.Config.UDPConfig.Addr)
if err := uFrontend.ListenAndServe(); err != nil {
errChan <- errors.New("failed to cleanly shutdown UDP frontend: " + err.Error())
}
}()
}
shutdown := make(chan os.Signal)
signal.Notify(shutdown, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-shutdown
if uFrontend != nil {
uFrontend.Stop()
}
if hFrontend != nil {
hFrontend.Stop()
}
configFile.Config.Backend.Stop()
close(errChan)
close(closedChan)
}()
err = <-errChan
if err != nil {
close(shutdown)
<-closedChan
return err
} }
return nil return nil

View file

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
// Package http implements a BitTorrent tracker via the HTTP protocol as // Package http implements a BitTorrent frontend via the HTTP protocol as
// described in BEP 3 and BEP 23. // described in BEP 3 and BEP 23.
package http package http
@ -26,7 +26,7 @@ import (
"github.com/tylerb/graceful" "github.com/tylerb/graceful"
"golang.org/x/net/context" "golang.org/x/net/context"
"github.com/jzelinskie/trakr/bittorrent" "github.com/jzelinskie/trakr/backend"
) )
func init() { func init() {
@ -43,8 +43,8 @@ var promResponseDurationMilliseconds = prometheus.NewHistogramVec(
[]string{"action", "error"}, []string{"action", "error"},
) )
// recordResponseDuration records the duration of time to respond to a UDP // recordResponseDuration records the duration of time to respond to a Request
// Request in milliseconds . // in milliseconds .
func recordResponseDuration(action string, err error, duration time.Duration) { func recordResponseDuration(action string, err error, duration time.Duration) {
var errString string var errString string
if err != nil { if err != nil {
@ -57,7 +57,7 @@ func recordResponseDuration(action string, err error, duration time.Duration) {
} }
// Config represents all of the configurable options for an HTTP BitTorrent // Config represents all of the configurable options for an HTTP BitTorrent
// Tracker. // Frontend.
type Config struct { type Config struct {
Addr string Addr string
ReadTimeout time.Duration ReadTimeout time.Duration
@ -67,29 +67,29 @@ type Config struct {
RealIPHeader string RealIPHeader string
} }
// Tracker holds the state of an HTTP BitTorrent Tracker. // Frontend holds the state of an HTTP BitTorrent Frontend.
type Tracker struct { type Frontend struct {
grace *graceful.Server grace *graceful.Server
bittorrent.TrackerFuncs backend.TrackerFuncs
Config Config
} }
// NewTracker allocates a new instance of a Tracker. // NewFrontend allocates a new instance of a Frontend.
func NewTracker(funcs bittorrent.TrackerFuncs, cfg Config) *Tracker { func NewFrontend(funcs backend.TrackerFuncs, cfg Config) *Frontend {
return &Tracker{ return &Frontend{
TrackerFuncs: funcs, TrackerFuncs: funcs,
Config: cfg, Config: cfg,
} }
} }
// Stop provides a thread-safe way to shutdown a currently running Tracker. // Stop provides a thread-safe way to shutdown a currently running Tracker.
func (t *Tracker) Stop() { func (t *Frontend) Stop() {
t.grace.Stop(t.grace.Timeout) t.grace.Stop(t.grace.Timeout)
<-t.grace.StopChan() <-t.grace.StopChan()
} }
func (t *Tracker) handler() http.Handler { func (t *Frontend) handler() http.Handler {
router := httprouter.New() router := httprouter.New()
router.GET("/announce", t.announceRoute) router.GET("/announce", t.announceRoute)
router.GET("/scrape", t.scrapeRoute) router.GET("/scrape", t.scrapeRoute)
@ -98,7 +98,7 @@ func (t *Tracker) handler() http.Handler {
// ListenAndServe listens on the TCP network address t.Addr and blocks serving // ListenAndServe listens on the TCP network address t.Addr and blocks serving
// BitTorrent requests until t.Stop() is called or an error is returned. // BitTorrent requests until t.Stop() is called or an error is returned.
func (t *Tracker) ListenAndServe() error { func (t *Frontend) ListenAndServe() error {
t.grace = &graceful.Server{ t.grace = &graceful.Server{
Server: &http.Server{ Server: &http.Server{
Addr: t.Addr, Addr: t.Addr,
@ -139,7 +139,7 @@ func (t *Tracker) ListenAndServe() error {
} }
// announceRoute parses and responds to an Announce by using t.TrackerFuncs. // announceRoute parses and responds to an Announce by using t.TrackerFuncs.
func (t *Tracker) announceRoute(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { func (t *Frontend) announceRoute(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
var err error var err error
start := time.Now() start := time.Now()
defer recordResponseDuration("announce", err, time.Since(start)) defer recordResponseDuration("announce", err, time.Since(start))
@ -168,7 +168,7 @@ func (t *Tracker) announceRoute(w http.ResponseWriter, r *http.Request, _ httpro
} }
// scrapeRoute parses and responds to a Scrape by using t.TrackerFuncs. // scrapeRoute parses and responds to a Scrape by using t.TrackerFuncs.
func (t *Tracker) scrapeRoute(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { func (t *Frontend) scrapeRoute(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
var err error var err error
start := time.Now() start := time.Now()
defer recordResponseDuration("scrape", err, time.Since(start)) defer recordResponseDuration("scrape", err, time.Since(start))

View file

@ -18,7 +18,7 @@ import (
"net/http" "net/http"
"github.com/jzelinskie/trakr/bittorrent" "github.com/jzelinskie/trakr/bittorrent"
"github.com/jzelinskie/trakr/bittorrent/http/bencode" "github.com/jzelinskie/trakr/frontends/http/bencode"
) )
// WriteError communicates an error to a BitTorrent client over HTTP. // WriteError communicates an error to a BitTorrent client over HTTP.

View file

@ -27,8 +27,9 @@ import (
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"golang.org/x/net/context" "golang.org/x/net/context"
"github.com/jzelinskie/trakr/backend"
"github.com/jzelinskie/trakr/bittorrent" "github.com/jzelinskie/trakr/bittorrent"
"github.com/jzelinskie/trakr/bittorrent/udp/bytepool" "github.com/jzelinskie/trakr/frontends/udp/bytepool"
) )
func init() { func init() {
@ -67,27 +68,27 @@ type Config struct {
AllowIPSpoofing bool AllowIPSpoofing bool
} }
// Tracker holds the state of a UDP BitTorrent Tracker. // Frontend holds the state of a UDP BitTorrent Frontend.
type Tracker struct { type Frontend struct {
socket *net.UDPConn socket *net.UDPConn
closing chan struct{} closing chan struct{}
wg sync.WaitGroup wg sync.WaitGroup
bittorrent.TrackerFuncs backend.TrackerFuncs
Config Config
} }
// NewTracker allocates a new instance of a Tracker. // NewFrontend allocates a new instance of a Frontend.
func NewTracker(funcs bittorrent.TrackerFuncs, cfg Config) *Tracker { func NewFrontend(funcs backend.TrackerFuncs, cfg Config) *Frontend {
return &Tracker{ return &Frontend{
closing: make(chan struct{}), closing: make(chan struct{}),
TrackerFuncs: funcs, TrackerFuncs: funcs,
Config: cfg, Config: cfg,
} }
} }
// Stop provides a thread-safe way to shutdown a currently running Tracker. // Stop provides a thread-safe way to shutdown a currently running Frontend.
func (t *Tracker) Stop() { func (t *Frontend) Stop() {
close(t.closing) close(t.closing)
t.socket.SetReadDeadline(time.Now()) t.socket.SetReadDeadline(time.Now())
t.wg.Wait() t.wg.Wait()
@ -95,7 +96,7 @@ func (t *Tracker) Stop() {
// ListenAndServe listens on the UDP network address t.Addr and blocks serving // ListenAndServe listens on the UDP network address t.Addr and blocks serving
// BitTorrent requests until t.Stop() is called or an error is returned. // BitTorrent requests until t.Stop() is called or an error is returned.
func (t *Tracker) ListenAndServe() error { func (t *Frontend) ListenAndServe() error {
udpAddr, err := net.ResolveUDPAddr("udp", t.Addr) udpAddr, err := net.ResolveUDPAddr("udp", t.Addr)
if err != nil { if err != nil {
return err return err
@ -175,7 +176,7 @@ func (w ResponseWriter) Write(b []byte) (int, error) {
} }
// handleRequest parses and responds to a UDP Request. // handleRequest parses and responds to a UDP Request.
func (t *Tracker) handleRequest(r Request, w ResponseWriter) (response []byte, actionName string, err error) { func (t *Frontend) handleRequest(r Request, w ResponseWriter) (response []byte, actionName string, err error) {
if len(r.Packet) < 16 { if len(r.Packet) < 16 {
// Malformed, no client packets are less than 16 bytes. // Malformed, no client packets are less than 16 bytes.
// We explicitly return nothing in case this is a DoS attempt. // We explicitly return nothing in case this is a DoS attempt.