initial announce response

This commit is contained in:
Jimmy Zelinskie 2013-08-04 15:56:31 -04:00
parent 2a639c2f3b
commit a6d04f6790
4 changed files with 276 additions and 154 deletions

View file

@ -41,6 +41,7 @@ type Storage struct {
MaxIdleConn int `json:"max_idle_conn"` MaxIdleConn int `json:"max_idle_conn"`
IdleTimeout *Duration `json:"idle_timeout"` IdleTimeout *Duration `json:"idle_timeout"`
ConnTimeout *Duration `json:"conn_timeout"` ConnTimeout *Duration `json:"conn_timeout"`
TxRetries int `json:"tx_retries"`
} }
// Config represents a configuration for a server.Server. // Config represents a configuration for a server.Server.

View file

@ -25,7 +25,8 @@ var exampleJson = `{
"max_idle_conn": 3, "max_idle_conn": 3,
"idle_timeout": "240s", "idle_timeout": "240s",
"conn_timeout": "5s" "conn_timeout": "5s",
"tx_retries": 3
}, },
"private": true, "private": true,
@ -83,7 +84,7 @@ func TestOpenCurDir(t *testing.T) {
} }
func TestOpenAbsEnvPath(t *testing.T) { func TestOpenAbsEnvPath(t *testing.T) {
if !testing.Short() { if !testing.Short() {
writeAndOpenJsonTest(t, filepath.Join(os.TempDir(),"testConfig.json")) writeAndOpenJsonTest(t, filepath.Join(os.TempDir(), "testConfig.json"))
} else { } else {
t.Log("Write/Read file test skipped") t.Log("Write/Read file test skipped")
} }

View file

@ -14,6 +14,7 @@ import (
"github.com/pushrax/chihaya/config" "github.com/pushrax/chihaya/config"
"github.com/pushrax/chihaya/server" "github.com/pushrax/chihaya/server"
_ "github.com/pushrax/chihaya/storage/redis"
) )
var ( var (

View file

@ -23,150 +23,165 @@ func (s Server) serveAnnounce(w http.ResponseWriter, r *http.Request) {
return return
} }
// Start a transaction // Retry failed transactions a specified number of times
tx, err := s.dbConnPool.Get() for i := 0; i < s.conf.Storage.TxRetries; i++ {
if err != nil {
log.Panicf("server: %s", err)
}
// Validate the user's passkey // Start a transaction
passkey, _ := path.Split(r.URL.Path) tx, err := s.dbConnPool.Get()
user, err := validateUser(tx, passkey)
if err != nil {
fail(err, w, r)
return
}
// Check if the user's client is whitelisted
whitelisted, err := tx.ClientWhitelisted(peerID)
if err != nil {
log.Panicf("server: %s", err)
}
if !whitelisted {
fail(errors.New("Your client is not approved"), w, r)
return
}
// Find the specified torrent
torrent, exists, err := tx.FindTorrent(infohash)
if err != nil {
log.Panicf("server: %s", err)
}
if !exists {
fail(errors.New("This torrent does not exist"), w, r)
return
}
// If the torrent was pruned and the user is seeding, unprune it
if !torrent.Active && left == 0 {
err := tx.MarkActive(torrent)
if err != nil {
log.Panicf("server: %s", err)
}
}
// Create a new peer object from the request
peer := &storage.Peer{
ID: peerID,
UserID: user.ID,
TorrentID: torrent.ID,
IP: ip,
Port: port,
Uploaded: uploaded,
Downloaded: downloaded,
Left: left,
LastAnnounce: time.Now().Unix(),
}
// Look for the user in in the pool of seeders and leechers
_, seeder := torrent.Seeders[peerID]
_, leecher := torrent.Leechers[peerID]
switch {
// Guarantee that no user is in both pools
case seeder && leecher:
if left == 0 {
err := tx.RmLeecher(torrent, peer)
if err != nil {
log.Panicf("server: %s", err)
}
leecher = false
} else {
err := tx.RmSeeder(torrent, peer)
if err != nil {
log.Panicf("server: %s", err)
}
seeder = false
}
case seeder:
// Update the peer with the stats from the request
err := tx.SetSeeder(torrent, peer)
if err != nil { if err != nil {
log.Panicf("server: %s", err) log.Panicf("server: %s", err)
} }
case leecher: // Validate the user's passkey
// Update the peer with the stats from the request passkey, _ := path.Split(r.URL.Path)
err := tx.SetLeecher(torrent, peer) user, err := validateUser(tx, passkey)
if err != nil {
fail(err, w, r)
return
}
// Check if the user's client is whitelisted
whitelisted, err := tx.ClientWhitelisted(peerID)
if err != nil { if err != nil {
log.Panicf("server: %s", err) log.Panicf("server: %s", err)
} }
if !whitelisted {
default: fail(errors.New("Your client is not approved"), w, r)
// Check the user's slots to see if they're allowed to leech return
if s.conf.Slots && user.Slots != -1 && left != 0 {
if user.SlotsUsed >= user.Slots {
fail(errors.New("You've run out of download slots."), w, r)
return
}
} }
if left == 0 { // Find the specified torrent
// Save the peer as a new seeder torrent, exists, err := tx.FindTorrent(infohash)
err := tx.NewSeeder(torrent, peer)
if err != nil {
log.Panicf("server: %s", err)
}
} else {
// Save the peer as a new leecher and increment the user's slots
err := tx.IncrementSlots(user)
if err != nil {
log.Panicf("server: %s", err)
}
err = tx.NewLeecher(torrent, peer)
if err != nil {
log.Panicf("server: %s", err)
}
}
}
// Handle any events in the request
switch {
case event == "stopped" || event == "paused":
if seeder {
err := tx.RmSeeder(torrent, peer)
if err != nil {
log.Panicf("server: %s", err)
}
}
if leecher {
err := tx.RmLeecher(torrent, peer)
if err != nil {
log.Panicf("server: %s", err)
}
err = tx.DecrementSlots(user)
if err != nil {
log.Panicf("server: %s", err)
}
}
case event == "completed":
err := tx.Snatch(user, torrent)
if err != nil { if err != nil {
log.Panicf("server: %s", err) log.Panicf("server: %s", err)
} }
if leecher { if !exists {
fail(errors.New("This torrent does not exist"), w, r)
return
}
// If the torrent was pruned and the user is seeding, unprune it
if !torrent.Active && left == 0 {
err := tx.MarkActive(torrent)
if err != nil {
log.Panicf("server: %s", err)
}
}
// Create a new peer object from the request
peer := &storage.Peer{
ID: peerID,
UserID: user.ID,
TorrentID: torrent.ID,
IP: ip,
Port: port,
Uploaded: uploaded,
Downloaded: downloaded,
Left: left,
LastAnnounce: time.Now().Unix(),
}
// Look for the user in in the pool of seeders and leechers
_, seeder := torrent.Seeders[peerID]
_, leecher := torrent.Leechers[peerID]
switch {
// Guarantee that no user is in both pools
case seeder && leecher:
if left == 0 {
err := tx.RmLeecher(torrent, peer)
if err != nil {
log.Panicf("server: %s", err)
}
leecher = false
} else {
err := tx.RmSeeder(torrent, peer)
if err != nil {
log.Panicf("server: %s", err)
}
seeder = false
}
case seeder:
// Update the peer with the stats from the request
err := tx.SetSeeder(torrent, peer)
if err != nil {
log.Panicf("server: %s", err)
}
case leecher:
// Update the peer with the stats from the request
err := tx.SetLeecher(torrent, peer)
if err != nil {
log.Panicf("server: %s", err)
}
default:
// Check the user's slots to see if they're allowed to leech
if s.conf.Slots && user.Slots != -1 && left != 0 {
if user.SlotsUsed >= user.Slots {
fail(errors.New("You've run out of download slots."), w, r)
return
}
}
if left == 0 {
// Save the peer as a new seeder
err := tx.NewSeeder(torrent, peer)
if err != nil {
log.Panicf("server: %s", err)
}
} else {
// Save the peer as a new leecher and increment the user's slots
err := tx.IncrementSlots(user)
if err != nil {
log.Panicf("server: %s", err)
}
err = tx.NewLeecher(torrent, peer)
if err != nil {
log.Panicf("server: %s", err)
}
}
}
// Handle any events in the request
switch {
case event == "stopped" || event == "paused":
if seeder {
err := tx.RmSeeder(torrent, peer)
if err != nil {
log.Panicf("server: %s", err)
}
}
if leecher {
err := tx.RmLeecher(torrent, peer)
if err != nil {
log.Panicf("server: %s", err)
}
err = tx.DecrementSlots(user)
if err != nil {
log.Panicf("server: %s", err)
}
}
case event == "completed":
err := tx.Snatch(user, torrent)
if err != nil {
log.Panicf("server: %s", err)
}
if leecher {
err := tx.RmLeecher(torrent, peer)
if err != nil {
log.Panicf("server: %s", err)
}
err = tx.NewSeeder(torrent, peer)
if err != nil {
log.Panicf("server: %s", err)
}
}
case leecher && left == 0:
// A leecher completed but the event was never received
err := tx.RmLeecher(torrent, peer) err := tx.RmLeecher(torrent, peer)
if err != nil { if err != nil {
log.Panicf("server: %s", err) log.Panicf("server: %s", err)
@ -177,19 +192,68 @@ func (s Server) serveAnnounce(w http.ResponseWriter, r *http.Request) {
} }
} }
case leecher && left == 0: if ip != peer.IP || port != peer.Port {
// A leecher completed but the event was never received peer.Port = port
err := tx.RmLeecher(torrent, peer) peer.IP = ip
if err != nil {
log.Panicf("server: %s", err)
} }
err = tx.NewSeeder(torrent, peer)
if err != nil {
log.Panicf("server: %s", err)
}
}
// TODO compact, response, etc... // If the transaction failed, retry
err = tx.Commit()
if err != nil {
continue
}
// Generate the response
seedCount := len(torrent.Seeders)
leechCount := len(torrent.Leechers)
writeBencoded(w, "d")
writeBencoded(w, "complete")
writeBencoded(w, seedCount)
writeBencoded(w, "incomplete")
writeBencoded(w, leechCount)
writeBencoded(w, "interval")
writeBencoded(w, s.conf.Announce.Duration)
writeBencoded(w, "min interval")
writeBencoded(w, s.conf.MinAnnounce.Duration)
if numWant > 0 && event != "stopped" && event != "paused" {
writeBencoded(w, "peers")
var peerCount, count int
if compact {
if left > 0 {
peerCount = minInt(numWant, leechCount)
} else {
peerCount = minInt(numWant, leechCount+seedCount-1)
}
writeBencoded(w, strconv.Itoa(peerCount*6))
writeBencoded(w, ":")
} else {
writeBencoded(w, "l")
}
if left > 0 {
// If they're seeding, give them only leechers
writeLeechers(w, torrent, count, numWant, compact)
} else {
// If they're leeching, prioritize giving them seeders
writeSeeders(w, torrent, count, numWant, compact)
writeLeechers(w, torrent, count, numWant, compact)
}
if compact && peerCount != count {
log.Panicf("Calculated peer count (%d) != real count (%d)", peerCount, count)
}
if !compact {
writeBencoded(w, "e")
}
}
writeBencoded(w, "e")
return
}
} }
func (s Server) validateAnnounceQuery(r *http.Request) (compact bool, numWant int, infohash, peerID, event, ip string, port, uploaded, downloaded, left uint64, err error) { func (s Server) validateAnnounceQuery(r *http.Request) (compact bool, numWant int, infohash, peerID, event, ip string, port, uploaded, downloaded, left uint64, err error) {
@ -228,19 +292,26 @@ func determineNumWant(fallback int, pq *parsedQuery) int {
return fallback return fallback
} }
return numWant return numWant
} else {
return fallback
} }
return fallback
} }
func determineIP(r *http.Request, pq *parsedQuery) (string, error) { func determineIP(r *http.Request, pq *parsedQuery) (string, error) {
if ip, ok := pq.Params["ip"]; ok { ip, ok := pq.Params["ip"]
ipv4, okv4 := pq.Params["ipv4"]
xRealIPs, xRealOk := pq.Params["X-Real-Ip"]
switch {
case ok:
return ip, nil return ip, nil
} else if ip, ok := pq.Params["ipv4"]; ok {
return ip, nil case okv4:
} else if ips, ok := pq.Params["X-Real-Ip"]; ok && len(ips) > 0 { return ipv4, nil
return string(ips[0]), nil
} else { case xRealOk && len(xRealIPs) > 0:
return string(xRealIPs[0]), nil
default:
portIndex := len(r.RemoteAddr) - 1 portIndex := len(r.RemoteAddr) - 1
for ; portIndex >= 0; portIndex-- { for ; portIndex >= 0; portIndex-- {
if r.RemoteAddr[portIndex] == ':' { if r.RemoteAddr[portIndex] == ':' {
@ -249,8 +320,56 @@ func determineIP(r *http.Request, pq *parsedQuery) (string, error) {
} }
if portIndex != -1 { if portIndex != -1 {
return r.RemoteAddr[0:portIndex], nil return r.RemoteAddr[0:portIndex], nil
} else {
return "", errors.New("Failed to parse IP address")
} }
return "", errors.New("Failed to parse IP address")
}
}
func minInt(a, b int) int {
if a < b {
return a
}
return b
}
func writeSeeders(w http.ResponseWriter, t *storage.Torrent, count, numWant int, compact bool) {
for _, seed := range t.Seeders {
if count >= numWant {
break
}
if compact {
// TODO writeBencoded(w, compactAddr)
} else {
writeBencoded(w, "d")
writeBencoded(w, "ip")
writeBencoded(w, seed.IP)
writeBencoded(w, "peer id")
writeBencoded(w, seed.ID)
writeBencoded(w, "port")
writeBencoded(w, seed.Port)
writeBencoded(w, "e")
}
count++
}
}
func writeLeechers(w http.ResponseWriter, t *storage.Torrent, count, numWant int, compact bool) {
for _, leech := range t.Leechers {
if count >= numWant {
break
}
if compact {
// TODO writeBencoded(w, compactAddr)
} else {
writeBencoded(w, "d")
writeBencoded(w, "ip")
writeBencoded(w, leech.IP)
writeBencoded(w, "peer id")
writeBencoded(w, leech.ID)
writeBencoded(w, "port")
writeBencoded(w, leech.Port)
writeBencoded(w, "e")
}
count++
} }
} }