announce handler progress

This commit is contained in:
Jimmy Zelinskie 2013-07-12 00:36:24 -04:00
parent 5cb4e2fabb
commit 4d4a979864
9 changed files with 313 additions and 188 deletions

View file

@ -50,6 +50,7 @@ type Config struct {
Private bool `json:"private"`
Freeleech bool `json:"freeleech"`
Slots bool `json:"slots"`
Announce Duration `json:"announce"`
MinAnnounce Duration `json:"min_announce"`

View file

@ -11,35 +11,27 @@ import (
"net/http"
"path"
"strconv"
"time"
"github.com/pushrax/chihaya/storage"
)
func (s *Server) serveAnnounce(w http.ResponseWriter, r *http.Request) {
passkey, _ := path.Split(r.URL.Path)
_, err := s.validatePasskey(passkey)
user, err := s.FindUser(passkey)
if err != nil {
fail(err, w, r)
return
}
pq, err := parseQuery(r.URL.RawQuery)
if err != nil {
fail(errors.New("Error parsing query"), w, r)
return
}
_, err = pq.determineIP(r)
if err != nil {
fail(err, w, r)
return
}
err = pq.validateAnnounceParams()
aq, err := newAnnounceQuery(r)
if err != nil {
fail(errors.New("Malformed request"), w, r)
return
}
ok, err := s.dataStore.ClientWhitelisted(pq.params["peer_id"])
peerID := aq.PeerID()
ok, err := s.dataStore.ClientWhitelisted(peerID)
if err != nil {
log.Panicf("server: %s", err)
}
@ -48,7 +40,7 @@ func (s *Server) serveAnnounce(w http.ResponseWriter, r *http.Request) {
return
}
torrent, exists, err := s.dataStore.FindTorrent(pq.params["infohash"])
torrent, exists, err := s.dataStore.FindTorrent(aq.Infohash())
if err != nil {
log.Panicf("server: %s", err)
}
@ -62,34 +54,207 @@ func (s *Server) serveAnnounce(w http.ResponseWriter, r *http.Request) {
log.Panicf("server: %s", err)
}
if left, _ := pq.getUint64("left"); torrent.Status == 1 && left == 0 {
err := tx.UnpruneTorrent(torrent)
left := aq.Left()
if torrent.Pruned && left == 0 {
err := tx.Unprune(torrent.ID)
if err != nil {
log.Panicf("server: %s", err)
}
torrent.Status = 0
} else if torrent.Status != 0 {
fail(
fmt.Errorf(
"This torrent does not exist (status: %d, left: %d)",
torrent.Status,
left,
),
w,
r,
)
} else if torrent.Pruned {
e := fmt.Errorf("This torrent does not exist (pruned: %t, left: %d)", torrent.Pruned, left)
fail(e, w, r)
return
}
var numWant int
if numWantStr, exists := pq.params["numWant"]; exists {
numWant, err := strconv.Atoi(numWantStr)
if err != nil {
numWant = s.conf.DefaultNumWant
_ = aq.NumWant(s.conf.DefaultNumWant)
if s.conf.Slots && user.Slots != -1 && aq.Left() != 0 {
if user.UsedSlots >= user.Slots {
fail(errors.New("You've run out of download slots."), w, r)
return
}
}
_, isLeecher := torrent.Leechers[peerID]
_, isSeeder := torrent.Seeders[peerID]
event := aq.Event()
completed := "completed" == event
if event == "stopped" || event == "paused" {
if left == 0 {
err := tx.RmSeeder(torrent.ID, peerID)
if err != nil {
log.Panicf("server: %s", err)
}
} else {
err := tx.RmLeecher(torrent.ID, peerID)
if err != nil {
log.Panicf("server: %s", err)
}
err = tx.DecrementSlots(user.ID)
if err != nil {
log.Panicf("server: %s", err)
}
}
} else if completed {
err := tx.Snatch(user.ID, torrent.ID)
if err != nil {
log.Panicf("server: %s", err)
}
} else {
numWant = s.conf.DefaultNumWant
}
// TODO continue
}
// An AnnounceQuery is a parsedQuery that guarantees the existance
// of parameters required for torrent client announces.
type announceQuery struct {
pq *parsedQuery
ip string
created int64
}
func newAnnounceQuery(r *http.Request) (*announceQuery, error) {
pq, err := parseQuery(r.URL.RawQuery)
if err != nil {
return nil, err
}
infohash, _ := pq.Params["info_hash"]
if infohash == "" {
return nil, errors.New("infohash does not exist")
}
peerId, _ := pq.Params["peer_id"]
if peerId == "" {
return nil, errors.New("peerId does not exist")
}
_, err = pq.getUint64("port")
if err != nil {
return nil, errors.New("port does not exist")
}
_, err = pq.getUint64("uploaded")
if err != nil {
return nil, errors.New("uploaded does not exist")
}
_, err = pq.getUint64("downloaded")
if err != nil {
return nil, errors.New("downloaded does not exist")
}
_, err = pq.getUint64("left")
if err != nil {
return nil, errors.New("left does not exist")
}
aq := &announceQuery{
pq: pq,
created: time.Now().Unix(),
}
aq.ip, err = aq.determineIP(r)
if err != nil {
return nil, err
}
return aq, nil
}
func (aq *announceQuery) Infohash() string {
infohash, _ := aq.pq.Params["info_hash"]
if infohash == "" {
panic("announceQuery missing infohash")
}
return infohash
}
func (aq *announceQuery) PeerID() string {
peerID, _ := aq.pq.Params["peer_id"]
if peerID == "" {
panic("announceQuery missing peer_id")
}
return peerID
}
func (aq *announceQuery) Port() uint64 {
port, err := aq.pq.getUint64("port")
if err != nil {
panic("announceQuery missing port")
}
return port
}
func (aq *announceQuery) IP() string {
return aq.ip
}
func (aq *announceQuery) Uploaded() uint64 {
ul, err := aq.pq.getUint64("uploaded")
if err != nil {
panic("announceQuery missing uploaded")
}
return ul
}
func (aq *announceQuery) Downloaded() uint64 {
dl, err := aq.pq.getUint64("downloaded")
if err != nil {
panic("announceQuery missing downloaded")
}
return dl
}
func (aq *announceQuery) Left() uint64 {
left, err := aq.pq.getUint64("left")
if err != nil {
panic("announceQuery missing left")
}
return left
}
func (aq *announceQuery) Event() string {
return aq.pq.Params["event"]
}
func (aq *announceQuery) determineIP(r *http.Request) (string, error) {
if ip, ok := aq.pq.Params["ip"]; ok {
return ip, nil
} else if ip, ok := aq.pq.Params["ipv4"]; ok {
return ip, nil
} else if ips, ok := aq.pq.Params["X-Real-Ip"]; ok && len(ips) > 0 {
return string(ips[0]), nil
} else {
portIndex := len(r.RemoteAddr) - 1
for ; portIndex >= 0; portIndex-- {
if r.RemoteAddr[portIndex] == ':' {
break
}
}
if portIndex != -1 {
return r.RemoteAddr[0:portIndex], nil
} else {
return "", errors.New("Failed to parse IP address")
}
}
}
func (aq *announceQuery) NumWant(fallback int) int {
if numWantStr, exists := aq.pq.Params["numWant"]; exists {
numWant, err := strconv.Atoi(numWantStr)
if err != nil {
return fallback
}
return numWant
} else {
return fallback
}
}
func (aq *announceQuery) Peer(uid, tid uint64) *storage.Peer {
return &storage.Peer{
ID: aq.PeerID(),
UserID: uid,
TorrentID: tid,
IP: aq.IP(),
Port: aq.Port(),
LastAnnounce: aq.created,
Uploaded: aq.Uploaded(),
Downloaded: aq.Downloaded(),
}
}

View file

@ -6,28 +6,29 @@ package server
import (
"errors"
"net/http"
"net/url"
"strconv"
)
// parsedQuery represents a parsed URL.Query.
type parsedQuery struct {
infohashes []string
params map[string]string
Infohashes []string
Params map[string]string
}
func (pq *parsedQuery) getUint64(key string) (uint64, bool) {
str, exists := pq.params[key]
func (pq *parsedQuery) getUint64(key string) (uint64, error) {
str, exists := pq.Params[key]
if !exists {
return 0, false
return 0, errors.New("Value does not exist for key: " + key)
}
val, err := strconv.ParseUint(str, 10, 64)
if err != nil {
return 0, false
return 0, err
}
return val, true
return val, nil
}
// parseQuery parses a raw url query.
func parseQuery(query string) (*parsedQuery, error) {
var (
keyStart, keyEnd int
@ -38,8 +39,8 @@ func parseQuery(query string) (*parsedQuery, error) {
hasInfohash = false
pq = &parsedQuery{
infohashes: nil,
params: make(map[string]string),
Infohashes: nil,
Params: make(map[string]string),
}
)
@ -67,15 +68,15 @@ func parseQuery(query string) (*parsedQuery, error) {
return nil, err
}
pq.params[keyStr] = valStr
pq.Params[keyStr] = valStr
if keyStr == "info_hash" {
if hasInfohash {
// Multiple infohashes
if pq.infohashes == nil {
pq.infohashes = []string{firstInfohash}
if pq.Infohashes == nil {
pq.Infohashes = []string{firstInfohash}
}
pq.infohashes = append(pq.infohashes, valStr)
pq.Infohashes = append(pq.Infohashes, valStr)
} else {
firstInfohash = valStr
hasInfohash = true
@ -95,54 +96,3 @@ func parseQuery(query string) (*parsedQuery, error) {
}
return pq, nil
}
func (pq *parsedQuery) validateAnnounceParams() error {
infohash, _ := pq.params["info_hash"]
if infohash == "" {
return errors.New("infohash does not exist")
}
peerId, _ := pq.params["peer_id"]
if peerId == "" {
return errors.New("peerId does not exist")
}
_, ok := pq.getUint64("port")
if ok == false {
return errors.New("port does not exist")
}
_, ok = pq.getUint64("uploaded")
if ok == false {
return errors.New("uploaded does not exist")
}
_, ok = pq.getUint64("downloaded")
if ok == false {
return errors.New("downloaded does not exist")
}
_, ok = pq.getUint64("left")
if ok == false {
return errors.New("left does not exist")
}
return nil
}
// TODO IPv6 support
func (pq *parsedQuery) determineIP(r *http.Request) (string, error) {
if ip, ok := pq.params["ip"]; ok {
return ip, nil
} else if ip, ok := pq.params["ipv4"]; ok {
return ip, nil
} else if ips, ok := pq.params["X-Real-Ip"]; ok && len(ips) > 0 {
return string(ips[0]), nil
} else {
portIndex := len(r.RemoteAddr) - 1
for ; portIndex >= 0; portIndex-- {
if r.RemoteAddr[portIndex] == ':' {
break
}
}
if portIndex != -1 {
return r.RemoteAddr[0:portIndex], nil
} else {
return "", errors.New("Failed to parse IP address")
}
}
}

View file

@ -6,20 +6,17 @@ package server
import (
"errors"
"fmt"
"io"
"log"
"net/http"
"path"
"strconv"
"time"
"github.com/pushrax/chihaya/storage"
)
func (s *Server) serveScrape(w http.ResponseWriter, r *http.Request) {
passkey, _ := path.Split(r.URL.Path)
_, err := s.validatePasskey(passkey)
_, err := s.FindUser(passkey)
if err != nil {
fail(err, w, r)
return
@ -32,25 +29,25 @@ func (s *Server) serveScrape(w http.ResponseWriter, r *http.Request) {
}
io.WriteString(w, "d")
bencode(w, "files")
if pq.infohashes != nil {
for _, infohash := range pq.infohashes {
writeBencoded(w, "files")
if pq.Infohashes != nil {
for _, infohash := range pq.Infohashes {
torrent, exists, err := s.dataStore.FindTorrent(infohash)
if err != nil {
log.Panicf("server: %s", err)
}
if exists {
bencode(w, infohash)
writeBencoded(w, infohash)
writeScrapeInfo(w, torrent)
}
}
} else if infohash, exists := pq.params["info_hash"]; exists {
} else if infohash, exists := pq.Params["info_hash"]; exists {
torrent, exists, err := s.dataStore.FindTorrent(infohash)
if err != nil {
log.Panicf("server: %s", err)
}
if exists {
bencode(w, infohash)
writeBencoded(w, infohash)
writeScrapeInfo(w, torrent)
}
}
@ -64,53 +61,11 @@ func (s *Server) serveScrape(w http.ResponseWriter, r *http.Request) {
func writeScrapeInfo(w io.Writer, torrent *storage.Torrent) {
io.WriteString(w, "d")
bencode(w, "complete")
bencode(w, len(torrent.Seeders))
bencode(w, "downloaded")
bencode(w, torrent.Snatched)
bencode(w, "incomplete")
bencode(w, len(torrent.Leechers))
writeBencoded(w, "complete")
writeBencoded(w, len(torrent.Seeders))
writeBencoded(w, "downloaded")
writeBencoded(w, torrent.Snatches)
writeBencoded(w, "incomplete")
writeBencoded(w, len(torrent.Leechers))
io.WriteString(w, "e")
}
func bencode(w io.Writer, data interface{}) {
// A massive switch is faster than reflection
switch v := data.(type) {
case string:
str := fmt.Sprintf("%s:%s", strconv.Itoa(len(v)), v)
io.WriteString(w, str)
case int:
str := fmt.Sprintf("i%se", strconv.Itoa(v))
io.WriteString(w, str)
case uint:
str := fmt.Sprintf("i%se", strconv.FormatUint(uint64(v), 10))
io.WriteString(w, str)
case int64:
str := fmt.Sprintf("i%se", strconv.FormatInt(v, 10))
io.WriteString(w, str)
case uint64:
str := fmt.Sprintf("i%se", strconv.FormatUint(v, 10))
io.WriteString(w, str)
case time.Duration: // Assume seconds
str := fmt.Sprintf("i%se", strconv.FormatInt(int64(v/time.Second), 10))
io.WriteString(w, str)
case map[string]interface{}:
io.WriteString(w, "d")
for key, val := range v {
str := fmt.Sprintf("%s:%s", strconv.Itoa(len(key)), key)
io.WriteString(w, str)
bencode(w, val)
}
io.WriteString(w, "e")
case []string:
io.WriteString(w, "l")
for _, val := range v {
bencode(w, val)
}
io.WriteString(w, "e")
default:
// Although not currently necessary,
// should handle []interface{} manually; Go can't do it implicitly
panic("Tried to bencode an unsupported type!")
}
}

View file

@ -7,7 +7,6 @@ package server
import (
"errors"
"fmt"
"io"
"log"
"net"
@ -113,14 +112,7 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
func fail(err error, w http.ResponseWriter, r *http.Request) {
errmsg := err.Error()
message := fmt.Sprintf(
"%s%s%s%s%s",
"d14:failure reason",
strconv.Itoa(len(errmsg)),
":",
errmsg,
"e",
)
message := "d14:failure reason" + strconv.Itoa(len(errmsg)) + ":" + errmsg + "e"
length, _ := io.WriteString(w, message)
r.Close = true
w.Header().Add("Content-Type", "text/plain")
@ -129,7 +121,7 @@ func fail(err error, w http.ResponseWriter, r *http.Request) {
w.(http.Flusher).Flush()
}
func (s *Server) validatePasskey(dir string) (*storage.User, error) {
func (s *Server) FindUser(dir string) (*storage.User, error) {
if len(dir) != 34 {
return nil, errors.New("Passkey is invalid")
}

52
server/torrent.go Normal file
View file

@ -0,0 +1,52 @@
package server
import (
"fmt"
"io"
"strconv"
"time"
)
func writeBencoded(w io.Writer, data interface{}) {
switch v := data.(type) {
case string:
str := fmt.Sprintf("%s:%s", strconv.Itoa(len(v)), v)
io.WriteString(w, str)
case int:
str := fmt.Sprintf("i%se", strconv.Itoa(v))
io.WriteString(w, str)
case uint:
str := fmt.Sprintf("i%se", strconv.FormatUint(uint64(v), 10))
io.WriteString(w, str)
case int64:
str := fmt.Sprintf("i%se", strconv.FormatInt(v, 10))
io.WriteString(w, str)
case uint64:
str := fmt.Sprintf("i%se", strconv.FormatUint(v, 10))
io.WriteString(w, str)
case time.Duration: // Assume seconds
str := fmt.Sprintf("i%se", strconv.FormatInt(int64(v/time.Second), 10))
io.WriteString(w, str)
case map[string]interface{}:
io.WriteString(w, "d")
for key, val := range v {
str := fmt.Sprintf("%s:%s", strconv.Itoa(len(key)), key)
io.WriteString(w, str)
writeBencoded(w, val)
}
io.WriteString(w, "e")
case []string:
io.WriteString(w, "l")
for _, val := range v {
writeBencoded(w, val)
}
io.WriteString(w, "e")
default:
// Although not currently necessary,
// should handle []interface{} manually; Go can't do it implicitly
panic("Tried to bencode an unsupported type!")
}
}
func compact() {
}

View file

@ -9,17 +9,14 @@ type Peer struct {
UserID uint64
TorrentID uint64
Port uint
IP string
Addr []byte
Port uint64
Uploaded uint64
Downloaded uint64
Left uint64
Seeding bool
StartTimeUnix int64
LastAnnounce int64
LastAnnounce int64
}
type Torrent struct {
@ -28,21 +25,22 @@ type Torrent struct {
UpMultiplier float64
DownMultiplier float64
Seeders map[string]*Peer
Leechers map[string]*Peer
Seeders map[string]Peer
Leechers map[string]Peer
Snatched uint
Status int64
Snatches uint
Pruned bool
LastAction int64
}
type User struct {
ID uint64
Passkey string
ID uint64
Passkey string
UpMultiplier float64
DownMultiplier float64
Slots int64
UsedSlots int64
Slots int64
UsedSlots int64
SlotsLastChecked int64
}

View file

@ -67,7 +67,7 @@ func (ds *DS) FindUser(passkey string) (*storage.User, bool, error) {
conn := ds.Get()
defer conn.Close()
key := ds.conf.Prefix + "user:" + passkey
key := ds.conf.Prefix + "User:" + passkey
reply, err := redis.Values(conn.Do("HGETALL", key))
if err != nil {
return nil, true, err
@ -90,7 +90,7 @@ func (ds *DS) FindTorrent(infohash string) (*storage.Torrent, bool, error) {
conn := ds.Get()
defer conn.Close()
key := ds.conf.Prefix + "torrent:" + infohash
key := ds.conf.Prefix + "Torrent:" + infohash
reply, err := redis.Values(conn.Do("HGETALL", key))
if err != nil {
return nil, false, err
@ -113,7 +113,7 @@ func (ds *DS) ClientWhitelisted(peerID string) (bool, error) {
conn := ds.Get()
defer conn.Close()
key := ds.conf.Prefix + "whitelist:" + peerID
key := ds.conf.Prefix + "Whitelist:" + peerID
exists, err := redis.Bool(conn.Do("EXISTS", key))
if err != nil {
return false, err
@ -151,7 +151,7 @@ func (t *Tx) UnpruneTorrent(torrent *storage.Torrent) error {
if t.done {
return storage.ErrTxDone
}
key := t.conf.Prefix + "torrent:" + torrent.Infohash
key := t.conf.Prefix + "Torrent:" + torrent.Infohash
err := t.Send("HSET " + key + " Status 0")
if err != nil {
return err

View file

@ -72,5 +72,17 @@ type Tx interface {
Commit() error
Rollback() error
UnpruneTorrent(torrent *Torrent) error
// Torrents
Snatch(userID, torrentID uint64) error
Unprune(torrentID uint64) error
// Peers
NewLeecher(torrent *Torrent, p *Peer) error
RmLeecher(torrentID uint64, peerID string) error
NewSeeder(torrent *Torrent, p *Peer) error
RmSeeder(torrentID uint64, peerID string) error
// Users
DecrementSlots(userID uint64) error
}