diff --git a/server/announce.go b/server/announce.go new file mode 100644 index 0000000..513ecc3 --- /dev/null +++ b/server/announce.go @@ -0,0 +1,111 @@ +// Copyright 2013 The Chihaya Authors. All rights reserved. +// Use of this source code is governed by the BSD 2-Clause license, +// which can be found in the LICENSE file. + +package server + +import ( + "errors" + "net/http" + "path" + "strconv" + + "github.com/chihaya/chihaya/config" +) + +// announce represents all of the data from an announce request. +type announce struct { + Compact bool + Downloaded uint64 + Event string + IP string + Infohash string + Left uint64 + NumWant int + Passkey string + PeerID string + Port uint64 + Uploaded uint64 +} + +// newAnnounce parses an HTTP request and generates an Announce. +func newAnnounce(r *http.Request, conf *config.Config) (*announce, error) { + pq, err := parseQuery(r.URL.RawQuery) + if err != nil { + return nil, err + } + + compact := pq.Params["compact"] == "1" + downloaded, downloadedErr := pq.getUint64("downloaded") + event, _ := pq.Params["event"] + infohash, _ := pq.Params["info_hash"] + ip, _ := requestedIP(r, pq) + left, leftErr := pq.getUint64("left") + numWant := requestedPeerCount(conf.DefaultNumWant, pq) + passkey, _ := path.Split(r.URL.Path) + peerID, _ := pq.Params["peer_id"] + port, portErr := pq.getUint64("port") + uploaded, uploadedErr := pq.getUint64("uploaded") + + if downloadedErr != nil || + infohash == "" || + leftErr != nil || + peerID == "" || + portErr != nil || + uploadedErr != nil || + ip == "" { + return nil, errors.New("malformed request") + } + + return &announce{ + Compact: compact, + Downloaded: downloaded, + Event: event, + IP: ip, + Left: left, + NumWant: numWant, + Passkey: passkey, + PeerID: peerID, + Port: port, + Uploaded: uploaded, + }, nil +} + +func requestedPeerCount(fallback int, pq *parsedQuery) int { + if numWantStr, exists := pq.Params["numWant"]; exists { + numWant, err := strconv.Atoi(numWantStr) + if err != nil { + return fallback + } + return numWant + } + + return fallback +} + +func requestedIP(r *http.Request, pq *parsedQuery) (string, error) { + if ip, ok := pq.Params["ip"]; ok { + return ip, nil + } + + if ip, ok := pq.Params["ipv4"]; ok { + return ip, nil + } + + if xRealIPs, ok := pq.Params["X-Real-Ip"]; ok { + return string(xRealIPs[0]), nil + } + + portIndex := len(r.RemoteAddr) - 1 + for ; portIndex >= 0; portIndex-- { + if r.RemoteAddr[portIndex] == ':' { + break + } + } + + if portIndex != -1 { + return r.RemoteAddr[0:portIndex], nil + } + + return "", errors.New("failed to parse IP address") +} diff --git a/server/bencode.go b/server/bencode.go index a1c1864..21220ce 100644 --- a/server/bencode.go +++ b/server/bencode.go @@ -12,21 +12,27 @@ func writeBencoded(w io.Writer, data interface{}) { case string: str := fmt.Sprintf("%s:%s", strconv.Itoa(len(v)), v) io.WriteString(w, str) + case int: str := fmt.Sprintf("i%se", strconv.Itoa(v)) io.WriteString(w, str) + case uint: str := fmt.Sprintf("i%se", strconv.FormatUint(uint64(v), 10)) io.WriteString(w, str) + case int64: str := fmt.Sprintf("i%se", strconv.FormatInt(v, 10)) io.WriteString(w, str) + case uint64: str := fmt.Sprintf("i%se", strconv.FormatUint(v, 10)) io.WriteString(w, str) + case time.Duration: // Assume seconds str := fmt.Sprintf("i%se", strconv.FormatInt(int64(v/time.Second), 10)) io.WriteString(w, str) + case map[string]interface{}: io.WriteString(w, "d") for key, val := range v { @@ -35,15 +41,17 @@ func writeBencoded(w io.Writer, data interface{}) { writeBencoded(w, val) } io.WriteString(w, "e") + case []string: io.WriteString(w, "l") for _, val := range v { writeBencoded(w, val) } io.WriteString(w, "e") + default: // Although not currently necessary, // should handle []interface{} manually; Go can't do it implicitly - panic("Tried to bencode an unsupported type!") + panic("tried to bencode an unsupported type!") } } diff --git a/server/query.go b/server/query.go index 2e72e73..7331e37 100644 --- a/server/query.go +++ b/server/query.go @@ -16,15 +16,18 @@ type parsedQuery struct { Params map[string]string } +// getUint64 is a helper to obtain a uint64 from a parsedQuery. func (pq *parsedQuery) getUint64(key string) (uint64, error) { str, exists := pq.Params[key] if !exists { - return 0, errors.New("Value does not exist for key: " + key) + return 0, errors.New("value does not exist for key: " + key) } + val, err := strconv.ParseUint(str, 10, 64) if err != nil { return 0, err } + return val, nil } @@ -63,6 +66,7 @@ func parseQuery(query string) (*parsedQuery, error) { if err != nil { return nil, err } + valStr, err := url.QueryUnescape(query[valStart : valEnd+1]) if err != nil { return nil, err @@ -85,6 +89,7 @@ func parseQuery(query string) (*parsedQuery, error) { onKey = true keyStart = i + 1 + } else if query[i] == '=' { onKey = false valStart = i + 1 @@ -94,5 +99,6 @@ func parseQuery(query string) (*parsedQuery, error) { valEnd = i } } + return pq, nil } diff --git a/server/query_test.go b/server/query_test.go index 4f573cb..b0cabd4 100644 --- a/server/query_test.go +++ b/server/query_test.go @@ -33,12 +33,14 @@ func mapArrayEqual(boxed map[string][]string, unboxed map[string]string) bool { if len(boxed) != len(unboxed) { return false } + for mapKey, mapVal := range boxed { // Always expect box to hold only one element if len(mapVal) != 1 || mapVal[0] != unboxed[mapKey] { return false } } + return true } @@ -48,6 +50,7 @@ func TestValidQueries(t *testing.T) { if err != nil { t.Error(err) } + if !mapArrayEqual(parseVal, parsedQueryObj.Params) { t.Errorf("Incorrect parse at item %d.\n Expected=%v\n Recieved=%v\n", parseIndex, parseVal, parsedQueryObj.Params) } @@ -60,6 +63,7 @@ func TestInvalidQueries(t *testing.T) { if err == nil { t.Error("Should have produced error", parseIndex) } + if parsedQueryObj != nil { t.Error("Should be nil after error", parsedQueryObj, parseIndex) } diff --git a/server/serve_announce.go b/server/serve_announce.go index 0ff2f86..b60b4ac 100644 --- a/server/serve_announce.go +++ b/server/serve_announce.go @@ -8,76 +8,16 @@ import ( "errors" "log" "net/http" - "path" "strconv" "time" - "github.com/chihaya/chihaya/config" "github.com/chihaya/chihaya/storage" "github.com/chihaya/chihaya/storage/backend" ) -// announce represents all of the data from an announce request. -type announce struct { - Compact bool - Downloaded uint64 - Event string - IP string - Infohash string - Left uint64 - NumWant int - Passkey string - PeerID string - Port uint64 - Uploaded uint64 -} - -// newAnnounce parses an HTTP request and generates an Announce. -func newAnnounce(r *http.Request, conf *config.Config) (*announce, error) { - pq, err := parseQuery(r.URL.RawQuery) - if err != nil { - return nil, err - } - - compact := pq.Params["compact"] == "1" - downloaded, downloadedErr := pq.getUint64("downloaded") - event, _ := pq.Params["event"] - infohash, _ := pq.Params["info_hash"] - ip, _ := requestedIP(r, pq) - left, leftErr := pq.getUint64("left") - numWant := requestedPeerCount(conf.DefaultNumWant, pq) - passkey, _ := path.Split(r.URL.Path) - peerID, _ := pq.Params["peer_id"] - port, portErr := pq.getUint64("port") - uploaded, uploadedErr := pq.getUint64("uploaded") - - if downloadedErr != nil || - infohash == "" || - leftErr != nil || - peerID == "" || - portErr != nil || - uploadedErr != nil || - ip == "" { - return nil, errors.New("malformed request") - } - - return &announce{ - Compact: compact, - Downloaded: downloaded, - Event: event, - IP: ip, - Left: left, - NumWant: numWant, - Passkey: passkey, - PeerID: peerID, - Port: port, - Uploaded: uploaded, - }, nil -} - func (s Server) serveAnnounce(w http.ResponseWriter, r *http.Request) { // Parse the required data from a request - ann, err := newAnnounce(r, s.conf) + announce, err := newAnnounce(r, s.conf) if err != nil { fail(err, w, r) return @@ -90,14 +30,14 @@ func (s Server) serveAnnounce(w http.ResponseWriter, r *http.Request) { } // Validate the user's passkey - user, err := validateUser(conn, ann.Passkey) + user, err := validateUser(conn, announce.Passkey) if err != nil { fail(err, w, r) return } // Check if the user's client is whitelisted - whitelisted, err := conn.ClientWhitelisted(parsePeerID(ann.PeerID)) + whitelisted, err := conn.ClientWhitelisted(parsePeerID(announce.PeerID)) if err != nil { log.Panicf("server: %s", err) } @@ -107,7 +47,7 @@ func (s Server) serveAnnounce(w http.ResponseWriter, r *http.Request) { } // Find the specified torrent - torrent, exists, err := conn.FindTorrent(ann.Infohash) + torrent, exists, err := conn.FindTorrent(announce.Infohash) if err != nil { log.Panicf("server: %s", err) } @@ -117,7 +57,7 @@ func (s Server) serveAnnounce(w http.ResponseWriter, r *http.Request) { } // If the torrent was pruned and the user is seeding, unprune it - if !torrent.Active && ann.Left == 0 { + if !torrent.Active && announce.Left == 0 { err := conn.MarkActive(torrent) if err != nil { log.Panicf("server: %s", err) @@ -127,17 +67,16 @@ func (s Server) serveAnnounce(w http.ResponseWriter, r *http.Request) { now := time.Now().Unix() // Create a new peer object from the request peer := &storage.Peer{ - ID: ann.PeerID, + ID: announce.PeerID, UserID: user.ID, TorrentID: torrent.ID, - IP: ann.IP, - Port: ann.Port, - Uploaded: ann.Uploaded, - Downloaded: ann.Downloaded, - Left: ann.Left, + IP: announce.IP, + Port: announce.Port, + Uploaded: announce.Uploaded, + Downloaded: announce.Downloaded, + Left: announce.Left, LastAnnounce: now, } - delta := &backend.AnnounceDelta{ Peer: peer, Torrent: torrent, @@ -152,7 +91,7 @@ func (s Server) serveAnnounce(w http.ResponseWriter, r *http.Request) { switch { // Guarantee that no user is in both pools case seeder && leecher: - if ann.Left == 0 { + if announce.Left == 0 { err := conn.RemoveLeecher(torrent, peer) if err != nil { log.Panicf("server: %s", err) @@ -181,7 +120,7 @@ func (s Server) serveAnnounce(w http.ResponseWriter, r *http.Request) { } default: - if ann.Left == 0 { + if announce.Left == 0 { // Save the peer as a new seeder err := conn.AddSeeder(torrent, peer) if err != nil { @@ -198,7 +137,7 @@ func (s Server) serveAnnounce(w http.ResponseWriter, r *http.Request) { // Handle any events in the request switch { - case ann.Event == "stopped" || ann.Event == "paused": + case announce.Event == "stopped" || announce.Event == "paused": if seeder { err := conn.RemoveSeeder(torrent, peer) if err != nil { @@ -212,7 +151,7 @@ func (s Server) serveAnnounce(w http.ResponseWriter, r *http.Request) { } } - case ann.Event == "completed": + case announce.Event == "completed": err := conn.RecordSnatch(user, torrent) if err != nil { log.Panicf("server: %s", err) @@ -225,7 +164,7 @@ func (s Server) serveAnnounce(w http.ResponseWriter, r *http.Request) { } } - case leecher && ann.Left == 0: + case leecher && announce.Left == 0: // A leecher completed but the event was never received err := conn.LeecherFinished(torrent, peer) if err != nil { @@ -233,9 +172,9 @@ func (s Server) serveAnnounce(w http.ResponseWriter, r *http.Request) { } } - if ann.IP != peer.IP || ann.Port != peer.Port { - peer.Port = ann.Port - peer.IP = ann.IP + if announce.IP != peer.IP || announce.Port != peer.Port { + peer.Port = announce.Port + peer.IP = announce.IP } // Generate the response @@ -252,15 +191,15 @@ func (s Server) serveAnnounce(w http.ResponseWriter, r *http.Request) { writeBencoded(w, "min interval") writeBencoded(w, s.conf.MinAnnounce.Duration) - if ann.NumWant > 0 && ann.Event != "stopped" && ann.Event != "paused" { + if announce.NumWant > 0 && announce.Event != "stopped" && announce.Event != "paused" { writeBencoded(w, "peers") var peerCount, count int - if ann.Compact { - if ann.Left > 0 { - peerCount = minInt(ann.NumWant, leechCount) + if announce.Compact { + if announce.Left > 0 { + peerCount = minInt(announce.NumWant, leechCount) } else { - peerCount = minInt(ann.NumWant, leechCount+seedCount-1) + peerCount = minInt(announce.NumWant, leechCount+seedCount-1) } writeBencoded(w, strconv.Itoa(peerCount*6)) writeBencoded(w, ":") @@ -268,27 +207,27 @@ func (s Server) serveAnnounce(w http.ResponseWriter, r *http.Request) { writeBencoded(w, "l") } - if ann.Left > 0 { + if announce.Left > 0 { // If they're seeding, give them only leechers - count += writeLeechers(w, user, torrent, ann.NumWant, ann.Compact) + count += writeLeechers(w, user, torrent, announce.NumWant, announce.Compact) } else { // If they're leeching, prioritize giving them seeders - count += writeSeeders(w, user, torrent, ann.NumWant, ann.Compact) - count += writeLeechers(w, user, torrent, ann.NumWant-count, ann.Compact) + count += writeSeeders(w, user, torrent, announce.NumWant, announce.Compact) + count += writeLeechers(w, user, torrent, announce.NumWant-count, announce.Compact) } - if ann.Compact && peerCount != count { - log.Panicf("Calculated peer count (%d) != real count (%d)", peerCount, count) + if announce.Compact && peerCount != count { + log.Panicf("calculated peer count (%d) != real count (%d)", peerCount, count) } - if !ann.Compact { + if !announce.Compact { writeBencoded(w, "e") } } writeBencoded(w, "e") - rawDeltaUp := peer.Uploaded - ann.Uploaded - rawDeltaDown := peer.Downloaded - ann.Downloaded + rawDeltaUp := peer.Uploaded - announce.Uploaded + rawDeltaDown := peer.Downloaded - announce.Downloaded // Restarting a torrent may cause a delta to be negative. if rawDeltaUp < 0 { @@ -304,44 +243,11 @@ func (s Server) serveAnnounce(w http.ResponseWriter, r *http.Request) { s.backendConn.RecordAnnounce(delta) } -func requestedPeerCount(fallback int, pq *parsedQuery) int { - if numWantStr, exists := pq.Params["numWant"]; exists { - numWant, err := strconv.Atoi(numWantStr) - if err != nil { - return fallback - } - return numWant - } - return fallback -} - -func requestedIP(r *http.Request, pq *parsedQuery) (string, error) { - if ip, ok := pq.Params["ip"]; ok { - return ip, nil - } - if ip, ok := pq.Params["ipv4"]; ok { - return ip, nil - } - if xRealIPs, ok := pq.Params["X-Real-Ip"]; ok { - return string(xRealIPs[0]), nil - } - - portIndex := len(r.RemoteAddr) - 1 - for ; portIndex >= 0; portIndex-- { - if r.RemoteAddr[portIndex] == ':' { - break - } - } - if portIndex != -1 { - return r.RemoteAddr[0:portIndex], nil - } - return "", errors.New("failed to parse IP address") -} - func minInt(a, b int) int { if a < b { return a } + return b } @@ -351,9 +257,11 @@ func writeSeeders(w http.ResponseWriter, user *storage.User, t *storage.Torrent, if count >= numWant { break } + if peer.UserID == user.ID { continue } + if compact { // TODO writeBencoded(w, compactAddr) } else { @@ -368,6 +276,7 @@ func writeSeeders(w http.ResponseWriter, user *storage.User, t *storage.Torrent, } count++ } + return count } @@ -377,9 +286,11 @@ func writeLeechers(w http.ResponseWriter, user *storage.User, t *storage.Torrent if count >= numWant { break } + if peer.UserID == user.ID { continue } + if compact { // TODO writeBencoded(w, compactAddr) } else { @@ -394,5 +305,6 @@ func writeLeechers(w http.ResponseWriter, user *storage.User, t *storage.Torrent } count++ } + return count } diff --git a/server/server.go b/server/server.go index 513059d..447ae06 100644 --- a/server/server.go +++ b/server/server.go @@ -26,13 +26,15 @@ import ( // Server represents BitTorrent tracker server. type Server struct { - conf *config.Config + conf *config.Config + + // These are open connections. listener *stoppableListener.StoppableListener trackerPool tracker.Pool backendConn backend.Conn - startTime time.Time - + // These are for collecting stats. + startTime time.Time deltaRequests int64 rpm int64 @@ -45,10 +47,12 @@ func New(conf *config.Config) (*Server, error) { if err != nil { return nil, err } + backendConn, err := backend.Open(&conf.Backend) if err != nil { return nil, err } + err = backendConn.Start() if err != nil { return nil, err @@ -74,6 +78,7 @@ func (s *Server) ListenAndServe() error { if err != nil { return err } + sl := stoppableListener.Handle(l) s.listener = sl s.startTime = time.Now() @@ -86,11 +91,19 @@ func (s *Server) ListenAndServe() error { // Stop cleanly ends the handling of incoming HTTP requests. func (s *Server) Stop() error { + // Wait for current requests to finish being handled. s.listener.Stop <- true + err := s.trackerPool.Close() if err != nil { return err } + + err = s.backendConn.Close() + if err != nil { + return err + } + return s.listener.Close() } @@ -117,17 +130,17 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { func fail(err error, w http.ResponseWriter, r *http.Request) { errmsg := err.Error() - message := "d14:failure reason" + strconv.Itoa(len(errmsg)) + ":" + errmsg + "e" - length, _ := io.WriteString(w, message) + msg := "d14:failure reason" + strconv.Itoa(len(errmsg)) + ":" + errmsg + "e" + length, _ := io.WriteString(w, msg) w.Header().Add("Content-Length", string(length)) w.(http.Flusher).Flush() } func validateUser(conn tracker.Conn, dir string) (*storage.User, error) { + passkey := dir[1:33] if len(dir) != 34 { return nil, errors.New("passkey is invalid") } - passkey := dir[1:33] user, exists, err := conn.FindUser(passkey) if err != nil { @@ -140,7 +153,7 @@ func validateUser(conn tracker.Conn, dir string) (*storage.User, error) { return user, nil } -// Takes a peer_id and returns a ClientID +// parsePeerID returns the clientID for a given peerID. func parsePeerID(peerID string) (clientID string) { length := len(peerID) if length >= 6 { @@ -152,5 +165,6 @@ func parsePeerID(peerID string) (clientID string) { clientID = peerID[0:6] } } + return }