From 5c99738b7fecab6c0731cc4440fb4c4aab7ce2a6 Mon Sep 17 00:00:00 2001 From: Jimmy Zelinskie Date: Fri, 5 Aug 2016 01:47:04 -0400 Subject: [PATCH] make it compile! --- bittorrent/bittorrent.go | 10 ++-- bittorrent/http/parser.go | 72 +++++++++--------------- bittorrent/http/query_params.go | 21 +++---- bittorrent/http/tracker.go | 55 +++++++++++-------- bittorrent/http/writer.go | 1 + bittorrent/http/writer_test.go | 3 +- bittorrent/udp/bytepool/bytepool.go | 35 ++++++++++++ bittorrent/udp/parser.go | 38 ++++++------- bittorrent/udp/tracker.go | 68 +++++++++++++---------- bittorrent/udp/writer.go | 43 ++++++++------- cmd/trakr/config.go | 0 cmd/trakr/main.go | 65 ++++++++++++++++++++++ example_config.yaml | 12 ++-- hooks.go | 16 ++++-- stopper/stopper.go | 14 ++--- storage.go | 8 +-- tracker.go | 85 ++++++++++++++++++++++++++--- 17 files changed, 361 insertions(+), 185 deletions(-) create mode 100644 bittorrent/udp/bytepool/bytepool.go delete mode 100644 cmd/trakr/config.go diff --git a/bittorrent/bittorrent.go b/bittorrent/bittorrent.go index 035dcbe..34294fe 100644 --- a/bittorrent/bittorrent.go +++ b/bittorrent/bittorrent.go @@ -20,6 +20,8 @@ package bittorrent import ( "net" "time" + + "golang.org/x/net/context" ) // PeerID represents a peer ID. @@ -107,7 +109,7 @@ type AnnounceResponse struct { } // AnnounceHandler is a function that generates a response for an Announce. -type AnnounceHandler func(*AnnounceRequest) *AnnounceResponse +type AnnounceHandler func(context.Context, *AnnounceRequest) (*AnnounceResponse, error) // AnnounceCallback is a function that does something with the results of an // Announce after it has been completed. @@ -132,7 +134,7 @@ type Scrape struct { } // ScrapeHandler is a function that generates a response for a Scrape. -type ScrapeHandler func(*ScrapeRequest) *ScrapeResponse +type ScrapeHandler func(context.Context, *ScrapeRequest) (*ScrapeResponse, error) // ScrapeCallback is a function that does something with the results of a // Scrape after it has been completed. @@ -152,9 +154,9 @@ func (p Peer) Equal(x Peer) bool { return p.EqualEndpoint(x) && p.ID == x.ID } // EqualEndpoint reports whether p and x have the same endpoint. func (p Peer) EqualEndpoint(x Peer) bool { return p.Port == x.Port && p.IP.Equal(x.IP) } -// Params is used to fetch request optional parameters. +// Params is used to fetch request optional parameters from an Announce. type Params interface { - String(key string) (string, error) + String(key string) (string, bool) } // ClientError represents an error that should be exposed to the client over diff --git a/bittorrent/http/parser.go b/bittorrent/http/parser.go index 9c8d8d9..a43742e 100644 --- a/bittorrent/http/parser.go +++ b/bittorrent/http/parser.go @@ -32,14 +32,9 @@ func ParseAnnounce(r *http.Request, realIPHeader string, allowIPSpoofing bool) ( return nil, err } - request := &bittorrent.AnnounceRequest{Params: q} + request := &bittorrent.AnnounceRequest{Params: qp} - eventStr, err := qp.String("event") - if err == query.ErrKeyNotFound { - eventStr = "" - } else if err != nil { - return nil, bittorrent.ClientError("failed to parse parameter: event") - } + eventStr, _ := qp.String("event") request.Event, err = bittorrent.NewEvent(eventStr) if err != nil { return nil, bittorrent.ClientError("failed to provide valid client event") @@ -57,14 +52,14 @@ func ParseAnnounce(r *http.Request, realIPHeader string, allowIPSpoofing bool) ( } request.InfoHash = infoHashes[0] - peerID, err := qp.String("peer_id") - if err != nil { + peerID, ok := qp.String("peer_id") + if !ok { return nil, bittorrent.ClientError("failed to parse parameter: peer_id") } if len(peerID) != 20 { return nil, bittorrent.ClientError("failed to provide valid peer_id") } - request.PeerID = bittorrent.PeerIDFromString(peerID) + request.Peer.ID = bittorrent.PeerIDFromString(peerID) request.Left, err = qp.Uint64("left") if err != nil { @@ -85,24 +80,24 @@ func ParseAnnounce(r *http.Request, realIPHeader string, allowIPSpoofing bool) ( if err != nil { return nil, bittorrent.ClientError("failed to parse parameter: numwant") } - request.NumWant = int32(numwant) + request.NumWant = uint32(numwant) port, err := qp.Uint64("port") if err != nil { return nil, bittorrent.ClientError("failed to parse parameter: port") } - request.Port = uint16(port) + request.Peer.Port = uint16(port) - request.IP, err = requestedIP(q, r, realIPHeader, allowIPSpoofing) - if err != nil { - return nil, bittorrent.ClientError("failed to parse peer IP address: " + err.Error()) + request.Peer.IP = requestedIP(r, qp, realIPHeader, allowIPSpoofing) + if request.Peer.IP == nil { + return nil, bittorrent.ClientError("failed to parse peer IP address") } return request, nil } // ParseScrape parses an bittorrent.ScrapeRequest from an http.Request. -func ParseScrape(r *http.Request) (*bittorent.ScrapeRequest, error) { +func ParseScrape(r *http.Request) (*bittorrent.ScrapeRequest, error) { qp, err := NewQueryParams(r.URL.RawQuery) if err != nil { return nil, err @@ -115,7 +110,7 @@ func ParseScrape(r *http.Request) (*bittorent.ScrapeRequest, error) { request := &bittorrent.ScrapeRequest{ InfoHashes: infoHashes, - Params: q, + Params: qp, } return request, nil @@ -126,46 +121,31 @@ func ParseScrape(r *http.Request) (*bittorent.ScrapeRequest, error) { // If allowIPSpoofing is true, IPs provided via params will be used. // If realIPHeader is not empty string, the first value of the HTTP Header with // that name will be used. -func requestedIP(r *http.Request, p bittorent.Params, realIPHeader string, allowIPSpoofing bool) (net.IP, error) { +func requestedIP(r *http.Request, p bittorrent.Params, realIPHeader string, allowIPSpoofing bool) net.IP { if allowIPSpoofing { - if ipstr, err := p.String("ip"); err == nil { - ip, err := net.ParseIP(str) - if err != nil { - return nil, err - } - - return ip, nil + if ipstr, ok := p.String("ip"); ok { + ip := net.ParseIP(ipstr) + return ip } - if ipstr, err := p.String("ipv4"); err == nil { - ip, err := net.ParseIP(str) - if err != nil { - return nil, err - } - - return ip, nil + if ipstr, ok := p.String("ipv4"); ok { + ip := net.ParseIP(ipstr) + return ip } - if ipstr, err := p.String("ipv6"); err == nil { - ip, err := net.ParseIP(str) - if err != nil { - return nil, err - } - - return ip, nil + if ipstr, ok := p.String("ipv6"); ok { + ip := net.ParseIP(ipstr) + return ip } } if realIPHeader != "" { if ips, ok := r.Header[realIPHeader]; ok && len(ips) > 0 { - ip, err := net.ParseIP(ips[0]) - if err != nil { - return nil, err - } - - return ip, nil + ip := net.ParseIP(ips[0]) + return ip } } - return r.RemoteAddr + host, _, _ := net.SplitHostPort(r.RemoteAddr) + return net.ParseIP(host) } diff --git a/bittorrent/http/query_params.go b/bittorrent/http/query_params.go index b3fc62c..5607e3e 100644 --- a/bittorrent/http/query_params.go +++ b/bittorrent/http/query_params.go @@ -40,14 +40,14 @@ type QueryParams struct { } // NewQueryParams parses a raw URL query. -func NewQueryParams(query string) (*Query, error) { +func NewQueryParams(query string) (*QueryParams, error) { var ( keyStart, keyEnd int valStart, valEnd int onKey = true - q = &Query{ + q = &QueryParams{ query: query, infoHashes: nil, params: make(map[string]string), @@ -111,18 +111,15 @@ func NewQueryParams(query string) (*Query, error) { // String returns a string parsed from a query. Every key can be returned as a // string because they are encoded in the URL as strings. -func (q *Query) String(key string) (string, error) { - val, exists := q.params[key] - if !exists { - return "", ErrKeyNotFound - } - return val, nil +func (qp *QueryParams) String(key string) (string, bool) { + value, ok := qp.params[key] + return value, ok } // Uint64 returns a uint parsed from a query. After being called, it is safe to // cast the uint64 to your desired length. -func (q *Query) Uint64(key string) (uint64, error) { - str, exists := q.params[key] +func (qp *QueryParams) Uint64(key string) (uint64, error) { + str, exists := qp.params[key] if !exists { return 0, ErrKeyNotFound } @@ -136,6 +133,6 @@ func (q *Query) Uint64(key string) (uint64, error) { } // InfoHashes returns a list of requested infohashes. -func (q *Query) InfoHashes() []bittorrent.InfoHash { - return q.infoHashes +func (qp *QueryParams) InfoHashes() []bittorrent.InfoHash { + return qp.infoHashes } diff --git a/bittorrent/http/tracker.go b/bittorrent/http/tracker.go index 0b6f88f..a2edabc 100644 --- a/bittorrent/http/tracker.go +++ b/bittorrent/http/tracker.go @@ -16,6 +16,19 @@ // described in BEP 3 and BEP 23. package http +import ( + "net" + "net/http" + "time" + + "github.com/julienschmidt/httprouter" + "github.com/prometheus/client_golang/prometheus" + "github.com/tylerb/graceful" + "golang.org/x/net/context" + + "github.com/jzelinskie/trakr/bittorrent" +) + var promResponseDurationMilliseconds = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Name: "trakr_http_response_duration_milliseconds", @@ -27,9 +40,14 @@ var promResponseDurationMilliseconds = prometheus.NewHistogramVec( // recordResponseDuration records the duration of time to respond to a UDP // Request in milliseconds . -func recordResponseDuration(action, err error, duration time.Duration) { +func recordResponseDuration(action string, err error, duration time.Duration) { + var errString string + if err != nil { + errString = err.Error() + } + promResponseDurationMilliseconds. - WithLabelValues(action, err.Error()). + WithLabelValues(action, errString). Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond)) } @@ -53,8 +71,8 @@ type Tracker struct { } // NewTracker allocates a new instance of a Tracker. -func NewTracker(funcs bittorrent.TrackerFuncs, cfg Config) { - return &Server{ +func NewTracker(funcs bittorrent.TrackerFuncs, cfg Config) *Tracker { + return &Tracker{ TrackerFuncs: funcs, Config: cfg, } @@ -66,11 +84,11 @@ func (t *Tracker) Stop() { <-t.grace.StopChan() } -func (t *Tracker) handler() { +func (t *Tracker) handler() http.Handler { router := httprouter.New() router.GET("/announce", t.announceRoute) router.GET("/scrape", t.scrapeRoute) - return server + return router } // ListenAndServe listens on the TCP network address t.Addr and blocks serving @@ -111,18 +129,15 @@ func (t *Tracker) ListenAndServe() error { panic("http: failed to gracefully run HTTP server: " + err.Error()) } } + + return nil } // announceRoute parses and responds to an Announce by using t.TrackerFuncs. func (t *Tracker) announceRoute(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { + var err error start := time.Now() - defer func() { - var errString string - if err != nil { - errString = err.Error() - } - recordResponseDuration("announce", errString, time.Since(start)) - }() + defer recordResponseDuration("announce", err, time.Since(start)) req, err := ParseAnnounce(r, t.RealIPHeader, t.AllowIPSpoofing) if err != nil { @@ -130,7 +145,7 @@ func (t *Tracker) announceRoute(w http.ResponseWriter, r *http.Request, _ httpro return } - resp, err := t.HandleAnnounce(req) + resp, err := t.HandleAnnounce(context.TODO(), req) if err != nil { WriteError(w, err) return @@ -145,19 +160,13 @@ func (t *Tracker) announceRoute(w http.ResponseWriter, r *http.Request, _ httpro if t.AfterAnnounce != nil { go t.AfterAnnounce(req, resp) } - recordResponseDuration("announce") } // scrapeRoute parses and responds to a Scrape by using t.TrackerFuncs. func (t *Tracker) scrapeRoute(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { + var err error start := time.Now() - defer func() { - var errString string - if err != nil { - errString = err.Error() - } - recordResponseDuration("scrape", errString, time.Since(start)) - }() + defer recordResponseDuration("scrape", err, time.Since(start)) req, err := ParseScrape(r) if err != nil { @@ -165,7 +174,7 @@ func (t *Tracker) scrapeRoute(w http.ResponseWriter, r *http.Request, _ httprout return } - resp, err := t.HandleScrape(req) + resp, err := t.HandleScrape(context.TODO(), req) if err != nil { WriteError(w, err) return diff --git a/bittorrent/http/writer.go b/bittorrent/http/writer.go index a0da645..c1e9266 100644 --- a/bittorrent/http/writer.go +++ b/bittorrent/http/writer.go @@ -18,6 +18,7 @@ import ( "net/http" "github.com/jzelinskie/trakr/bittorrent" + "github.com/jzelinskie/trakr/bittorrent/http/bencode" ) // WriteError communicates an error to a BitTorrent client over HTTP. diff --git a/bittorrent/http/writer_test.go b/bittorrent/http/writer_test.go index 4c9b185..e8a5d31 100644 --- a/bittorrent/http/writer_test.go +++ b/bittorrent/http/writer_test.go @@ -18,8 +18,9 @@ import ( "net/http/httptest" "testing" - "github.com/jzelinskie/trakr/bittorrent" "github.com/stretchr/testify/assert" + + "github.com/jzelinskie/trakr/bittorrent" ) func TestWriteError(t *testing.T) { diff --git a/bittorrent/udp/bytepool/bytepool.go b/bittorrent/udp/bytepool/bytepool.go new file mode 100644 index 0000000..adc1207 --- /dev/null +++ b/bittorrent/udp/bytepool/bytepool.go @@ -0,0 +1,35 @@ +// Copyright 2016 The Chihaya Authors. All rights reserved. +// Use of this source code is governed by the BSD 2-Clause license, +// which can be found in the LICENSE file. + +package bytepool + +import "sync" + +// BytePool is a cached pool of reusable byte slices. +type BytePool struct { + sync.Pool +} + +// New allocates a new BytePool with slices of the provided capacity. +func New(length, capacity int) *BytePool { + var bp BytePool + bp.Pool.New = func() interface{} { + return make([]byte, length, capacity) + } + return &bp +} + +// Get returns a byte slice from the pool. +func (bp *BytePool) Get() []byte { + return bp.Pool.Get().([]byte) +} + +// Put returns a byte slice to the pool. +func (bp *BytePool) Put(b []byte) { + // Zero out the bytes. + for i := 0; i < cap(b); i++ { + b[i] = 0x0 + } + bp.Pool.Put(b) +} diff --git a/bittorrent/udp/parser.go b/bittorrent/udp/parser.go index 85e4469..31ca9d8 100644 --- a/bittorrent/udp/parser.go +++ b/bittorrent/udp/parser.go @@ -63,35 +63,35 @@ var ( // // If allowIPSpoofing is true, IPs provided via params will be used. func ParseAnnounce(r Request, allowIPSpoofing bool) (*bittorrent.AnnounceRequest, error) { - if len(r.packet) < 98 { + if len(r.Packet) < 98 { return nil, errMalformedPacket } - infohash := r.packet[16:36] - peerID := r.packet[36:56] - downloaded := binary.BigEndian.Uint64(r.packet[56:64]) - left := binary.BigEndian.Uint64(r.packet[64:72]) - uploaded := binary.BigEndian.Uint64(r.packet[72:80]) + infohash := r.Packet[16:36] + peerID := r.Packet[36:56] + downloaded := binary.BigEndian.Uint64(r.Packet[56:64]) + left := binary.BigEndian.Uint64(r.Packet[64:72]) + uploaded := binary.BigEndian.Uint64(r.Packet[72:80]) - eventID := int(r.packet[83]) + eventID := int(r.Packet[83]) if eventID >= len(eventIDs) { return nil, errMalformedEvent } ip := r.IP - ipbytes := r.packet[84:88] + ipbytes := r.Packet[84:88] if allowIPSpoofing { ip = net.IP(ipbytes) } - if !allowIPSpoofing && r.ip == nil { + if !allowIPSpoofing && r.IP == nil { // We have no IP address to fallback on. return nil, errMalformedIP } - numWant := binary.BigEndian.Uint32(r.packet[92:96]) - port := binary.BigEndian.Uint16(r.packet[96:98]) + numWant := binary.BigEndian.Uint32(r.Packet[92:96]) + port := binary.BigEndian.Uint16(r.Packet[96:98]) - params, err := handleOptionalParameters(r.packet) + params, err := handleOptionalParameters(r.Packet) if err != nil { return nil, err } @@ -152,24 +152,24 @@ func handleOptionalParameters(packet []byte) (params bittorrent.Params, err erro } // ParseScrape parses a ScrapeRequest from a UDP request. -func parseScrape(r Request) (*bittorrent.ScrapeRequest, error) { +func ParseScrape(r Request) (*bittorrent.ScrapeRequest, error) { // If a scrape isn't at least 36 bytes long, it's malformed. - if len(r.packet) < 36 { + if len(r.Packet) < 36 { return nil, errMalformedPacket } // Skip past the initial headers and check that the bytes left equal the // length of a valid list of infohashes. - r.packet = r.packet[16:] - if len(r.packet)%20 != 0 { + r.Packet = r.Packet[16:] + if len(r.Packet)%20 != 0 { return nil, errMalformedPacket } // Allocate a list of infohashes and append it to the list until we're out. var infohashes []bittorrent.InfoHash - for len(r.packet) >= 20 { - infohashes = append(infohashes, bittorrent.InfoHashFromBytes(r.packet[:20])) - r.packet = r.packet[20:] + for len(r.Packet) >= 20 { + infohashes = append(infohashes, bittorrent.InfoHashFromBytes(r.Packet[:20])) + r.Packet = r.Packet[20:] } return &bittorrent.ScrapeRequest{ diff --git a/bittorrent/udp/tracker.go b/bittorrent/udp/tracker.go index 366721b..4efa640 100644 --- a/bittorrent/udp/tracker.go +++ b/bittorrent/udp/tracker.go @@ -19,10 +19,16 @@ package udp import ( "bytes" "encoding/binary" + "log" "net" + "sync" "time" + "github.com/prometheus/client_golang/prometheus" + "golang.org/x/net/context" + "github.com/jzelinskie/trakr/bittorrent" + "github.com/jzelinskie/trakr/bittorrent/udp/bytepool" ) var promResponseDurationMilliseconds = prometheus.NewHistogramVec( @@ -36,9 +42,14 @@ var promResponseDurationMilliseconds = prometheus.NewHistogramVec( // recordResponseDuration records the duration of time to respond to a UDP // Request in milliseconds . -func recordResponseDuration(action, err error, duration time.Duration) { +func recordResponseDuration(action string, err error, duration time.Duration) { + var errString string + if err != nil { + errString = err.Error() + } + promResponseDurationMilliseconds. - WithLabelValues(action, err.Error()). + WithLabelValues(action, errString). Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond)) } @@ -47,12 +58,13 @@ func recordResponseDuration(action, err error, duration time.Duration) { type Config struct { Addr string PrivateKey string + MaxClockSkew time.Duration AllowIPSpoofing bool } // Tracker holds the state of a UDP BitTorrent Tracker. type Tracker struct { - sock *net.UDPConn + socket *net.UDPConn closing chan struct{} wg sync.WaitGroup @@ -61,7 +73,7 @@ type Tracker struct { } // NewTracker allocates a new instance of a Tracker. -func NewTracker(funcs bittorrent.TrackerFuncs, cfg Config) { +func NewTracker(funcs bittorrent.TrackerFuncs, cfg Config) *Tracker { return &Tracker{ closing: make(chan struct{}), TrackerFuncs: funcs, @@ -72,7 +84,7 @@ func NewTracker(funcs bittorrent.TrackerFuncs, cfg Config) { // Stop provides a thread-safe way to shutdown a currently running Tracker. func (t *Tracker) Stop() { close(t.closing) - t.sock.SetReadDeadline(time.Now()) + t.socket.SetReadDeadline(time.Now()) t.wg.Wait() } @@ -84,11 +96,11 @@ func (t *Tracker) ListenAndServe() error { return err } - t.sock, err = net.ListenUDP("udp", udpAddr) + t.socket, err = net.ListenUDP("udp", udpAddr) if err != nil { return err } - defer t.sock.Close() + defer t.socket.Close() pool := bytepool.New(256, 2048) @@ -103,8 +115,8 @@ func (t *Tracker) ListenAndServe() error { // Read a UDP packet into a reusable buffer. buffer := pool.Get() - t.sock.SetReadDeadline(time.Now().Add(time.Second)) - n, addr, err := t.sock.ReadFromUDP(buffer) + t.socket.SetReadDeadline(time.Now().Add(time.Second)) + n, addr, err := t.socket.ReadFromUDP(buffer) if err != nil { pool.Put(buffer) if netErr, ok := err.(net.Error); ok && netErr.Temporary() { @@ -122,21 +134,18 @@ func (t *Tracker) ListenAndServe() error { log.Println("Got UDP Request") t.wg.Add(1) - go func(start time.Time) { + go func() { defer t.wg.Done() defer pool.Put(buffer) // Handle the request. start := time.Now() - response, action, err := t.handleRequest(&Request{buffer[:n], addr.IP}) + response, action, err := t.handleRequest( + Request{buffer[:n], addr.IP}, + ResponseWriter{t.socket, addr}, + ) log.Printf("Handled UDP Request: %s, %s, %s\n", response, action, err) - - // Record to the duration of time used to respond to the request. - var errString string - if err != nil { - errString = err.Error() - } - recordResponseDuration(action, errString, time.Since(start)) + recordResponseDuration(action, err, time.Since(start)) }() } } @@ -150,19 +159,19 @@ type Request struct { // ResponseWriter implements the ability to respond to a Request via the // io.Writer interface. type ResponseWriter struct { - socket net.UDPConn - addr net.UDPAddr + socket *net.UDPConn + addr *net.UDPAddr } // Write implements the io.Writer interface for a ResponseWriter. -func (w *ResponseWriter) Write(b []byte) (int, error) { +func (w ResponseWriter) Write(b []byte) (int, error) { w.socket.WriteToUDP(b, w.addr) return len(b), nil } // handleRequest parses and responds to a UDP Request. -func (t *Tracker) handleRequest(r *Request, w *ResponseWriter) (response []byte, actionName string, err error) { - if len(r.packet) < 16 { +func (t *Tracker) handleRequest(r Request, w ResponseWriter) (response []byte, actionName string, err error) { + if len(r.Packet) < 16 { // Malformed, no client packets are less than 16 bytes. // We explicitly return nothing in case this is a DoS attempt. err = errMalformedPacket @@ -170,13 +179,13 @@ func (t *Tracker) handleRequest(r *Request, w *ResponseWriter) (response []byte, } // Parse the headers of the UDP packet. - connID := r.packet[0:8] - actionID := binary.BigEndian.Uint32(r.packet[8:12]) - txID := r.packet[12:16] + connID := r.Packet[0:8] + actionID := binary.BigEndian.Uint32(r.Packet[8:12]) + txID := r.Packet[12:16] // If this isn't requesting a new connection ID and the connection ID is // invalid, then fail. - if actionID != connectActionID && !ValidConnectionID(connID, r.IP, time.Now(), t.PrivateKey) { + if actionID != connectActionID && !ValidConnectionID(connID, r.IP, time.Now(), t.MaxClockSkew, t.PrivateKey) { err = errBadConnectionID WriteError(w, txID, err) return @@ -206,7 +215,7 @@ func (t *Tracker) handleRequest(r *Request, w *ResponseWriter) (response []byte, } var resp *bittorrent.AnnounceResponse - resp, err = t.HandleAnnounce(req) + resp, err = t.HandleAnnounce(context.TODO(), req) if err != nil { WriteError(w, txID, err) return @@ -231,8 +240,7 @@ func (t *Tracker) handleRequest(r *Request, w *ResponseWriter) (response []byte, } var resp *bittorrent.ScrapeResponse - ctx := context.TODO() - resp, err = t.HandleScrape(ctx, req) + resp, err = t.HandleScrape(context.TODO(), req) if err != nil { WriteError(w, txID, err) return diff --git a/bittorrent/udp/writer.go b/bittorrent/udp/writer.go index 068741a..211635f 100644 --- a/bittorrent/udp/writer.go +++ b/bittorrent/udp/writer.go @@ -18,58 +18,59 @@ import ( "bytes" "encoding/binary" "fmt" + "io" "time" "github.com/jzelinskie/trakr/bittorrent" ) // WriteError writes the failure reason as a null-terminated string. -func WriteError(writer io.Writer, txID []byte, err error) { +func WriteError(w io.Writer, txID []byte, err error) { // If the client wasn't at fault, acknowledge it. if _, ok := err.(bittorrent.ClientError); !ok { err = fmt.Errorf("internal error occurred: %s", err.Error()) } var buf bytes.Buffer - writeHeader(buf, txID, errorActionID) + writeHeader(&buf, txID, errorActionID) buf.WriteString(err.Error()) buf.WriteRune('\000') - writer.Write(buf.Bytes()) + w.Write(buf.Bytes()) } // WriteAnnounce encodes an announce response according to BEP 15. -func WriteAnnounce(respBuf *bytes.Buffer, txID []byte, resp *bittorrent.AnnounceResponse) { - writeHeader(respBuf, txID, announceActionID) - binary.Write(respBuf, binary.BigEndian, uint32(resp.Interval/time.Second)) - binary.Write(respBuf, binary.BigEndian, uint32(resp.Incomplete)) - binary.Write(respBuf, binary.BigEndian, uint32(resp.Complete)) +func WriteAnnounce(w io.Writer, txID []byte, resp *bittorrent.AnnounceResponse) { + writeHeader(w, txID, announceActionID) + binary.Write(w, binary.BigEndian, uint32(resp.Interval/time.Second)) + binary.Write(w, binary.BigEndian, uint32(resp.Incomplete)) + binary.Write(w, binary.BigEndian, uint32(resp.Complete)) for _, peer := range resp.IPv4Peers { - respBuf.Write(peer.IP) - binary.Write(respBuf, binary.BigEndian, peer.Port) + w.Write(peer.IP) + binary.Write(w, binary.BigEndian, peer.Port) } } // WriteScrape encodes a scrape response according to BEP 15. -func WriteScrape(respBuf *bytes.Buffer, txID []byte, resp *bittorrent.ScrapeResponse) { - writeHeader(respBuf, txID, scrapeActionID) +func WriteScrape(w io.Writer, txID []byte, resp *bittorrent.ScrapeResponse) { + writeHeader(w, txID, scrapeActionID) for _, scrape := range resp.Files { - binary.Write(respBuf, binary.BigEndian, scrape.Complete) - binary.Write(respBuf, binary.BigEndian, scrape.Snatches) - binary.Write(respBuf, binary.BigEndian, scrape.Incomplete) + binary.Write(w, binary.BigEndian, scrape.Complete) + binary.Write(w, binary.BigEndian, scrape.Snatches) + binary.Write(w, binary.BigEndian, scrape.Incomplete) } } // WriteConnectionID encodes a new connection response according to BEP 15. -func WriteConnectionID(respBuf *bytes.Buffer, txID, connID []byte) { - writeHeader(respBuf, txID, connectActionID) - respBuf.Write(connID) +func WriteConnectionID(w io.Writer, txID, connID []byte) { + writeHeader(w, txID, connectActionID) + w.Write(connID) } // writeHeader writes the action and transaction ID to the provided response // buffer. -func writeHeader(respBuf *bytes.Buffer, txID []byte, action uint32) { - binary.Write(respBuf, binary.BigEndian, action) - respBuf.Write(txID) +func writeHeader(w io.Writer, txID []byte, action uint32) { + binary.Write(w, binary.BigEndian, action) + w.Write(txID) } diff --git a/cmd/trakr/config.go b/cmd/trakr/config.go deleted file mode 100644 index e69de29..0000000 diff --git a/cmd/trakr/main.go b/cmd/trakr/main.go index e69de29..65409c5 100644 --- a/cmd/trakr/main.go +++ b/cmd/trakr/main.go @@ -0,0 +1,65 @@ +package main + +import ( + "errors" + "log" + "os" + "os/signal" + "runtime/pprof" + "syscall" + + "github.com/spf13/cobra" + + "github.com/jzelinskie/trakr" +) + +func main() { + var configFilePath string + var cpuProfilePath string + + var rootCmd = &cobra.Command{ + Use: "trakr", + Short: "BitTorrent Tracker", + Long: "A customizible, multi-protocol BitTorrent Tracker", + Run: func(cmd *cobra.Command, args []string) { + if err := func() error { + if cpuProfilePath != "" { + log.Println("enabled CPU profiling to " + cpuProfilePath) + f, err := os.Create(cpuProfilePath) + if err != nil { + return err + } + pprof.StartCPUProfile(f) + defer pprof.StopCPUProfile() + } + + mt, err := trakr.MultiTrackerFromFile(configFilePath) + if err != nil { + return errors.New("failed to read config: " + err.Error()) + } + + go func() { + shutdown := make(chan os.Signal) + signal.Notify(shutdown, syscall.SIGINT, syscall.SIGTERM) + <-shutdown + mt.Stop() + }() + + if err := mt.ListenAndServe(); err != nil { + return errors.New("failed to cleanly shutdown: " + err.Error()) + } + + return nil + }(); err != nil { + log.Fatal(err) + } + }, + } + + rootCmd.Flags().StringVar(&configFilePath, "config", "/etc/trakr.yaml", "location of configuration file (defaults to /etc/trakr.yaml)") + rootCmd.Flags().StringVarP(&cpuProfilePath, "cpuprofile", "", "", "location to save a CPU profile") + + if err := rootCmd.Execute(); err != nil { + log.Fatal(err) + } +} diff --git a/example_config.yaml b/example_config.yaml index 199275c..66e8ef1 100644 --- a/example_config.yaml +++ b/example_config.yaml @@ -2,22 +2,22 @@ trakr: announce_interval: 15m allow_ip_spoofing: true default_num_want: 50 - + http: addr: 0.0.0.0:6881 real_ip_header: x-real-ip read_timeout: 5s write_timeout: 5s request_timeout: 5s - + udp: addr: 0.0.0.0:6881 - + storage: name: memory config: shards: 1 - + prehooks: - name: jwt config: @@ -29,6 +29,6 @@ trakr: type: whitelist clients: - OP1011 - + posthooks: - - name: gossip \ No newline at end of file + - name: gossip diff --git a/hooks.go b/hooks.go index bbe12a8..03c4430 100644 --- a/hooks.go +++ b/hooks.go @@ -14,13 +14,19 @@ package trakr -import "github.com/jzelinskie/trakr/bittorrent" +import ( + "fmt" + + "golang.org/x/net/context" + + "github.com/jzelinskie/trakr/bittorrent" +) // Hook abstracts the concept of anything that needs to interact with a // BitTorrent client's request and response to a BitTorrent tracker. type Hook interface { - HandleAnnounce(context.Context, bittorrent.AnnounceRequest, bittorrent.AnnounceResponse) error - HandleScrape(context.Context, bittorrent.ScrapeRequest, bittorrent.ScrapeResponse) error + HandleAnnounce(context.Context, *bittorrent.AnnounceRequest, *bittorrent.AnnounceResponse) error + HandleScrape(context.Context, *bittorrent.ScrapeRequest, *bittorrent.ScrapeResponse) error } // HookConstructor is a function used to create a new instance of a Hook. @@ -36,7 +42,7 @@ func RegisterPreHook(name string, con HookConstructor) { if con == nil { panic("trakr: could not register nil HookConstructor") } - if _, dup := constructors[name]; dup { + if _, dup := preHooks[name]; dup { panic("trakr: could not register duplicate HookConstructor: " + name) } preHooks[name] = con @@ -61,7 +67,7 @@ func RegisterPostHook(name string, con HookConstructor) { if con == nil { panic("trakr: could not register nil HookConstructor") } - if _, dup := constructors[name]; dup { + if _, dup := postHooks[name]; dup { panic("trakr: could not register duplicate HookConstructor: " + name) } preHooks[name] = con diff --git a/stopper/stopper.go b/stopper/stopper.go index 75dc4f8..2b9bd34 100644 --- a/stopper/stopper.go +++ b/stopper/stopper.go @@ -4,11 +4,11 @@ import ( "sync" ) -// AlreadyStopped is a closed error channel to be used by StopperFuncs when +// AlreadyStopped is a closed error channel to be used by Funcs when // an element was already stopped. var AlreadyStopped <-chan error -// AlreadyStoppedFunc is a StopperFunc that returns AlreadyStopped. +// AlreadyStoppedFunc is a Func that returns AlreadyStopped. var AlreadyStoppedFunc = func() <-chan error { return AlreadyStopped } func init() { @@ -30,7 +30,7 @@ type Stopper interface { // StopGroup is a group that can be stopped. type StopGroup struct { - stoppables []StopperFunc + stoppables []Func stoppablesLock sync.Mutex } @@ -40,7 +40,7 @@ type Func func() <-chan error // NewStopGroup creates a new StopGroup. func NewStopGroup() *StopGroup { return &StopGroup{ - stoppables: make([]StopperFunc, 0), + stoppables: make([]Func, 0), } } @@ -53,9 +53,9 @@ func (cg *StopGroup) Add(toAdd Stopper) { cg.stoppables = append(cg.stoppables, toAdd.Stop) } -// AddFunc adds a StopperFunc to the StopGroup. -// On the next call to Stop(), the StopperFunc will be called. -func (cg *StopGroup) AddFunc(toAddFunc StopperFunc) { +// AddFunc adds a Func to the StopGroup. +// On the next call to Stop(), the Func will be called. +func (cg *StopGroup) AddFunc(toAddFunc Func) { cg.stoppablesLock.Lock() defer cg.stoppablesLock.Unlock() diff --git a/storage.go b/storage.go index 8d7056c..02c719a 100644 --- a/storage.go +++ b/storage.go @@ -10,7 +10,7 @@ import ( // ErrResourceDoesNotExist is the error returned by all delete methods in the // store if the requested resource does not exist. -var ErrResourceDoesNotExist = bittorrent.ClientError(errors.New("resource does not exist")) +var ErrResourceDoesNotExist = bittorrent.ClientError("resource does not exist") // PeerStore is an interface that abstracts the interactions of storing and // manipulating Peers such that it can be implemented for various data stores. @@ -68,7 +68,7 @@ type PeerStore interface { // PeerStore. type PeerStoreConstructor func(interface{}) (PeerStore, error) -var peerStores = make(map[string]PeerStoreConstructors) +var peerStores = make(map[string]PeerStoreConstructor) // RegisterPeerStore makes a PeerStoreConstructor available by the provided // name. @@ -80,7 +80,7 @@ func RegisterPeerStore(name string, con PeerStoreConstructor) { panic("trakr: could not register nil PeerStoreConstructor") } - if _, dup := peerStore[name]; dup { + if _, dup := peerStores[name]; dup { panic("trakr: could not register duplicate PeerStoreConstructor: " + name) } @@ -88,7 +88,7 @@ func RegisterPeerStore(name string, con PeerStoreConstructor) { } // NewPeerStore creates an instance of the given PeerStore by name. -func NewPeerStore(name, config interface{}) (PeerStore, error) { +func NewPeerStore(name string, config interface{}) (PeerStore, error) { con, ok := peerStores[name] if !ok { return nil, fmt.Errorf("trakr: unknown PeerStore %q (forgotten import?)", name) diff --git a/tracker.go b/tracker.go index 4cc86cb..27d2686 100644 --- a/tracker.go +++ b/tracker.go @@ -17,22 +17,93 @@ // has been delievered to a BitTorrent client. package trakr +import ( + "errors" + "io" + "io/ioutil" + "os" + "time" + + "github.com/jzelinskie/trakr/bittorrent/http" + "github.com/jzelinskie/trakr/bittorrent/udp" + "gopkg.in/yaml.v2" +) + +// GenericConfig is a block of configuration who's structure is unknown. +type GenericConfig struct { + name string `yaml:"name"` + config interface{} `yaml:"config"` +} + // MultiTracker is a multi-protocol, customizable BitTorrent Tracker. type MultiTracker struct { - HTTPConfig http.Config - UDPConfig udp.Config - AnnounceInterval time.Duration - GCInterval time.Duration - GCExpiration time.Duration - PreHooks []Hook - PostHooks []Hook + AnnounceInterval time.Duration `yaml:"announce_interval"` + GCInterval time.Duration `yaml:"gc_interval"` + GCExpiration time.Duration `yaml:"gc_expiration"` + HTTPConfig http.Config `yaml:"http"` + UDPConfig udp.Config `yaml:"udp"` + PeerStoreConfig []GenericConfig `yaml:"storage"` + PreHooks []GenericConfig `yaml:"prehooks"` + PostHooks []GenericConfig `yaml:"posthooks"` + peerStore PeerStore httpTracker http.Tracker udpTracker udp.Tracker } +// decodeConfigFile unmarshals an io.Reader into a new MultiTracker. +func decodeConfigFile(r io.Reader) (*MultiTracker, error) { + contents, err := ioutil.ReadAll(r) + if err != nil { + return nil, err + } + + cfgFile := struct { + mt MultiTracker `yaml:"trakr"` + }{} + err = yaml.Unmarshal(contents, cfgFile) + if err != nil { + return nil, err + } + + return &cfgFile.mt, nil +} + +// MultiTrackerFromFile returns a new MultiTracker given the path to a YAML +// configuration file. +// +// It supports relative and absolute paths and environment variables. +func MultiTrackerFromFile(path string) (*MultiTracker, error) { + if path == "" { + return nil, errors.New("no config path specified") + } + + f, err := os.Open(os.ExpandEnv(path)) + if err != nil { + return nil, err + } + defer f.Close() + + cfg, err := decodeConfigFile(f) + if err != nil { + return nil, err + } + + return cfg, nil +} + +// Stop provides a thread-safe way to shutdown a currently running +// MultiTracker. +func (t *MultiTracker) Stop() { +} + // ListenAndServe listens on the protocols and addresses specified in the // HTTPConfig and UDPConfig then blocks serving BitTorrent requests until // t.Stop() is called or an error is returned. func (t *MultiTracker) ListenAndServe() error { + // Build an TrackerFuncs from the PreHooks and PostHooks. + // Create a PeerStore instance. + // Create a HTTP Tracker instance. + // Create a UDP Tracker instance. + return nil }