From a6d04f67909368f494fbeb8c1389e89c361c5417 Mon Sep 17 00:00:00 2001 From: Jimmy Zelinskie Date: Sun, 4 Aug 2013 15:56:31 -0400 Subject: [PATCH] initial announce response --- config/config.go | 1 + config/config_test.go | 5 +- main.go | 1 + server/announce.go | 423 +++++++++++++++++++++++++++--------------- 4 files changed, 276 insertions(+), 154 deletions(-) diff --git a/config/config.go b/config/config.go index 4bd8d31..ae98753 100644 --- a/config/config.go +++ b/config/config.go @@ -41,6 +41,7 @@ type Storage struct { MaxIdleConn int `json:"max_idle_conn"` IdleTimeout *Duration `json:"idle_timeout"` ConnTimeout *Duration `json:"conn_timeout"` + TxRetries int `json:"tx_retries"` } // Config represents a configuration for a server.Server. diff --git a/config/config_test.go b/config/config_test.go index c137931..7447271 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -25,7 +25,8 @@ var exampleJson = `{ "max_idle_conn": 3, "idle_timeout": "240s", - "conn_timeout": "5s" + "conn_timeout": "5s", + "tx_retries": 3 }, "private": true, @@ -83,7 +84,7 @@ func TestOpenCurDir(t *testing.T) { } func TestOpenAbsEnvPath(t *testing.T) { if !testing.Short() { - writeAndOpenJsonTest(t, filepath.Join(os.TempDir(),"testConfig.json")) + writeAndOpenJsonTest(t, filepath.Join(os.TempDir(), "testConfig.json")) } else { t.Log("Write/Read file test skipped") } diff --git a/main.go b/main.go index 0b9e772..6511dae 100644 --- a/main.go +++ b/main.go @@ -14,6 +14,7 @@ import ( "github.com/pushrax/chihaya/config" "github.com/pushrax/chihaya/server" + _ "github.com/pushrax/chihaya/storage/redis" ) var ( diff --git a/server/announce.go b/server/announce.go index 40d4c72..f88f6bf 100644 --- a/server/announce.go +++ b/server/announce.go @@ -23,150 +23,165 @@ func (s Server) serveAnnounce(w http.ResponseWriter, r *http.Request) { return } - // Start a transaction - tx, err := s.dbConnPool.Get() - if err != nil { - log.Panicf("server: %s", err) - } + // Retry failed transactions a specified number of times + for i := 0; i < s.conf.Storage.TxRetries; i++ { - // Validate the user's passkey - passkey, _ := path.Split(r.URL.Path) - user, err := validateUser(tx, passkey) - if err != nil { - fail(err, w, r) - return - } - - // Check if the user's client is whitelisted - whitelisted, err := tx.ClientWhitelisted(peerID) - if err != nil { - log.Panicf("server: %s", err) - } - if !whitelisted { - fail(errors.New("Your client is not approved"), w, r) - return - } - - // Find the specified torrent - torrent, exists, err := tx.FindTorrent(infohash) - if err != nil { - log.Panicf("server: %s", err) - } - if !exists { - fail(errors.New("This torrent does not exist"), w, r) - return - } - - // If the torrent was pruned and the user is seeding, unprune it - if !torrent.Active && left == 0 { - err := tx.MarkActive(torrent) - if err != nil { - log.Panicf("server: %s", err) - } - } - - // Create a new peer object from the request - peer := &storage.Peer{ - ID: peerID, - UserID: user.ID, - TorrentID: torrent.ID, - IP: ip, - Port: port, - Uploaded: uploaded, - Downloaded: downloaded, - Left: left, - LastAnnounce: time.Now().Unix(), - } - - // Look for the user in in the pool of seeders and leechers - _, seeder := torrent.Seeders[peerID] - _, leecher := torrent.Leechers[peerID] - - switch { - // Guarantee that no user is in both pools - case seeder && leecher: - if left == 0 { - err := tx.RmLeecher(torrent, peer) - if err != nil { - log.Panicf("server: %s", err) - } - leecher = false - } else { - err := tx.RmSeeder(torrent, peer) - if err != nil { - log.Panicf("server: %s", err) - } - seeder = false - } - - case seeder: - // Update the peer with the stats from the request - err := tx.SetSeeder(torrent, peer) + // Start a transaction + tx, err := s.dbConnPool.Get() if err != nil { log.Panicf("server: %s", err) } - case leecher: - // Update the peer with the stats from the request - err := tx.SetLeecher(torrent, peer) + // Validate the user's passkey + passkey, _ := path.Split(r.URL.Path) + user, err := validateUser(tx, passkey) + if err != nil { + fail(err, w, r) + return + } + + // Check if the user's client is whitelisted + whitelisted, err := tx.ClientWhitelisted(peerID) if err != nil { log.Panicf("server: %s", err) } - - default: - // Check the user's slots to see if they're allowed to leech - if s.conf.Slots && user.Slots != -1 && left != 0 { - if user.SlotsUsed >= user.Slots { - fail(errors.New("You've run out of download slots."), w, r) - return - } + if !whitelisted { + fail(errors.New("Your client is not approved"), w, r) + return } - if left == 0 { - // Save the peer as a new seeder - err := tx.NewSeeder(torrent, peer) - if err != nil { - log.Panicf("server: %s", err) - } - } else { - // Save the peer as a new leecher and increment the user's slots - err := tx.IncrementSlots(user) - if err != nil { - log.Panicf("server: %s", err) - } - err = tx.NewLeecher(torrent, peer) - if err != nil { - log.Panicf("server: %s", err) - } - } - } - - // Handle any events in the request - switch { - case event == "stopped" || event == "paused": - if seeder { - err := tx.RmSeeder(torrent, peer) - if err != nil { - log.Panicf("server: %s", err) - } - } - if leecher { - err := tx.RmLeecher(torrent, peer) - if err != nil { - log.Panicf("server: %s", err) - } - err = tx.DecrementSlots(user) - if err != nil { - log.Panicf("server: %s", err) - } - } - - case event == "completed": - err := tx.Snatch(user, torrent) + // Find the specified torrent + torrent, exists, err := tx.FindTorrent(infohash) if err != nil { log.Panicf("server: %s", err) } - if leecher { + if !exists { + fail(errors.New("This torrent does not exist"), w, r) + return + } + + // If the torrent was pruned and the user is seeding, unprune it + if !torrent.Active && left == 0 { + err := tx.MarkActive(torrent) + if err != nil { + log.Panicf("server: %s", err) + } + } + + // Create a new peer object from the request + peer := &storage.Peer{ + ID: peerID, + UserID: user.ID, + TorrentID: torrent.ID, + IP: ip, + Port: port, + Uploaded: uploaded, + Downloaded: downloaded, + Left: left, + LastAnnounce: time.Now().Unix(), + } + + // Look for the user in in the pool of seeders and leechers + _, seeder := torrent.Seeders[peerID] + _, leecher := torrent.Leechers[peerID] + + switch { + // Guarantee that no user is in both pools + case seeder && leecher: + if left == 0 { + err := tx.RmLeecher(torrent, peer) + if err != nil { + log.Panicf("server: %s", err) + } + leecher = false + } else { + err := tx.RmSeeder(torrent, peer) + if err != nil { + log.Panicf("server: %s", err) + } + seeder = false + } + + case seeder: + // Update the peer with the stats from the request + err := tx.SetSeeder(torrent, peer) + if err != nil { + log.Panicf("server: %s", err) + } + + case leecher: + // Update the peer with the stats from the request + err := tx.SetLeecher(torrent, peer) + if err != nil { + log.Panicf("server: %s", err) + } + + default: + // Check the user's slots to see if they're allowed to leech + if s.conf.Slots && user.Slots != -1 && left != 0 { + if user.SlotsUsed >= user.Slots { + fail(errors.New("You've run out of download slots."), w, r) + return + } + } + + if left == 0 { + // Save the peer as a new seeder + err := tx.NewSeeder(torrent, peer) + if err != nil { + log.Panicf("server: %s", err) + } + } else { + // Save the peer as a new leecher and increment the user's slots + err := tx.IncrementSlots(user) + if err != nil { + log.Panicf("server: %s", err) + } + err = tx.NewLeecher(torrent, peer) + if err != nil { + log.Panicf("server: %s", err) + } + } + } + + // Handle any events in the request + switch { + case event == "stopped" || event == "paused": + if seeder { + err := tx.RmSeeder(torrent, peer) + if err != nil { + log.Panicf("server: %s", err) + } + } + if leecher { + err := tx.RmLeecher(torrent, peer) + if err != nil { + log.Panicf("server: %s", err) + } + err = tx.DecrementSlots(user) + if err != nil { + log.Panicf("server: %s", err) + } + } + + case event == "completed": + err := tx.Snatch(user, torrent) + if err != nil { + log.Panicf("server: %s", err) + } + if leecher { + err := tx.RmLeecher(torrent, peer) + if err != nil { + log.Panicf("server: %s", err) + } + err = tx.NewSeeder(torrent, peer) + if err != nil { + log.Panicf("server: %s", err) + } + } + + case leecher && left == 0: + // A leecher completed but the event was never received err := tx.RmLeecher(torrent, peer) if err != nil { log.Panicf("server: %s", err) @@ -177,19 +192,68 @@ func (s Server) serveAnnounce(w http.ResponseWriter, r *http.Request) { } } - case leecher && left == 0: - // A leecher completed but the event was never received - err := tx.RmLeecher(torrent, peer) - if err != nil { - log.Panicf("server: %s", err) + if ip != peer.IP || port != peer.Port { + peer.Port = port + peer.IP = ip } - err = tx.NewSeeder(torrent, peer) - if err != nil { - log.Panicf("server: %s", err) - } - } - // TODO compact, response, etc... + // If the transaction failed, retry + err = tx.Commit() + if err != nil { + continue + } + + // Generate the response + seedCount := len(torrent.Seeders) + leechCount := len(torrent.Leechers) + + writeBencoded(w, "d") + writeBencoded(w, "complete") + writeBencoded(w, seedCount) + writeBencoded(w, "incomplete") + writeBencoded(w, leechCount) + writeBencoded(w, "interval") + writeBencoded(w, s.conf.Announce.Duration) + writeBencoded(w, "min interval") + writeBencoded(w, s.conf.MinAnnounce.Duration) + + if numWant > 0 && event != "stopped" && event != "paused" { + writeBencoded(w, "peers") + var peerCount, count int + + if compact { + if left > 0 { + peerCount = minInt(numWant, leechCount) + } else { + peerCount = minInt(numWant, leechCount+seedCount-1) + } + writeBencoded(w, strconv.Itoa(peerCount*6)) + writeBencoded(w, ":") + } else { + writeBencoded(w, "l") + } + + if left > 0 { + // If they're seeding, give them only leechers + writeLeechers(w, torrent, count, numWant, compact) + } else { + // If they're leeching, prioritize giving them seeders + writeSeeders(w, torrent, count, numWant, compact) + writeLeechers(w, torrent, count, numWant, compact) + } + + if compact && peerCount != count { + log.Panicf("Calculated peer count (%d) != real count (%d)", peerCount, count) + } + + if !compact { + writeBencoded(w, "e") + } + } + writeBencoded(w, "e") + + return + } } func (s Server) validateAnnounceQuery(r *http.Request) (compact bool, numWant int, infohash, peerID, event, ip string, port, uploaded, downloaded, left uint64, err error) { @@ -228,19 +292,26 @@ func determineNumWant(fallback int, pq *parsedQuery) int { return fallback } return numWant - } else { - return fallback } + return fallback } func determineIP(r *http.Request, pq *parsedQuery) (string, error) { - if ip, ok := pq.Params["ip"]; ok { + ip, ok := pq.Params["ip"] + ipv4, okv4 := pq.Params["ipv4"] + xRealIPs, xRealOk := pq.Params["X-Real-Ip"] + + switch { + case ok: return ip, nil - } else if ip, ok := pq.Params["ipv4"]; ok { - return ip, nil - } else if ips, ok := pq.Params["X-Real-Ip"]; ok && len(ips) > 0 { - return string(ips[0]), nil - } else { + + case okv4: + return ipv4, nil + + case xRealOk && len(xRealIPs) > 0: + return string(xRealIPs[0]), nil + + default: portIndex := len(r.RemoteAddr) - 1 for ; portIndex >= 0; portIndex-- { if r.RemoteAddr[portIndex] == ':' { @@ -249,8 +320,56 @@ func determineIP(r *http.Request, pq *parsedQuery) (string, error) { } if portIndex != -1 { return r.RemoteAddr[0:portIndex], nil - } else { - return "", errors.New("Failed to parse IP address") } + return "", errors.New("Failed to parse IP address") + } +} + +func minInt(a, b int) int { + if a < b { + return a + } + return b +} + +func writeSeeders(w http.ResponseWriter, t *storage.Torrent, count, numWant int, compact bool) { + for _, seed := range t.Seeders { + if count >= numWant { + break + } + if compact { + // TODO writeBencoded(w, compactAddr) + } else { + writeBencoded(w, "d") + writeBencoded(w, "ip") + writeBencoded(w, seed.IP) + writeBencoded(w, "peer id") + writeBencoded(w, seed.ID) + writeBencoded(w, "port") + writeBencoded(w, seed.Port) + writeBencoded(w, "e") + } + count++ + } +} + +func writeLeechers(w http.ResponseWriter, t *storage.Torrent, count, numWant int, compact bool) { + for _, leech := range t.Leechers { + if count >= numWant { + break + } + if compact { + // TODO writeBencoded(w, compactAddr) + } else { + writeBencoded(w, "d") + writeBencoded(w, "ip") + writeBencoded(w, leech.IP) + writeBencoded(w, "peer id") + writeBencoded(w, leech.ID) + writeBencoded(w, "port") + writeBencoded(w, leech.Port) + writeBencoded(w, "e") + } + count++ } }