diff --git a/server/announce.go b/server/announce.go index 0390102..d1f4f52 100644 --- a/server/announce.go +++ b/server/announce.go @@ -10,15 +10,20 @@ import ( "net/http" "path" "strconv" + "time" + + "github.com/pushrax/chihaya/storage" ) func (s *Server) serveAnnounce(w http.ResponseWriter, r *http.Request) { + // Parse the required parameters off of a query compact, numWant, infohash, peerID, event, ip, port, uploaded, downloaded, left, err := s.validateAnnounceQuery(r) if err != nil { fail(err, w, r) return } + // Validate the user's passkey passkey, _ := path.Split(r.URL.Path) user, err := s.FindUser(passkey) if err != nil { @@ -26,6 +31,7 @@ func (s *Server) serveAnnounce(w http.ResponseWriter, r *http.Request) { return } + // Check if the user's client is whitelisted whitelisted, err := s.dataStore.ClientWhitelisted(peerID) if err != nil { log.Panicf("server: %s", err) @@ -35,6 +41,7 @@ func (s *Server) serveAnnounce(w http.ResponseWriter, r *http.Request) { return } + // Find the specified torrent torrent, exists, err := s.dataStore.FindTorrent(infohash) if err != nil { log.Panicf("server: %s", err) @@ -44,49 +51,140 @@ func (s *Server) serveAnnounce(w http.ResponseWriter, r *http.Request) { return } - if s.conf.Slots && user.Slots != -1 && left != 0 { - if user.UsedSlots >= user.Slots { - fail(errors.New("You've run out of download slots."), w, r) - return - } - } - + // Begin a data store transaction tx, err := s.dataStore.Begin() if err != nil { log.Panicf("server: %s", err) } + // If the torrent was pruned and the user is seeding, unprune it if torrent.Pruned && left == 0 { - err := tx.Unprune(torrent.ID) + err := tx.Unprune(torrent) if err != nil { log.Panicf("server: %s", err) } } - _, isLeecher := torrent.Leechers[peerID] - _, isSeeder := torrent.Seeders[peerID] - if event == "stopped" || event == "paused" { + // Look for the user in in the pool of seeders and leechers + sp, seeder := torrent.Seeders[peerID] + lp, leecher := torrent.Leechers[peerID] + peer := &storage.Peer{} + switch { + // Guarantee that no user is in both pools + case seeder && leecher: if left == 0 { - err := tx.RmSeeder(torrent.ID, peerID) + peer = &sp + err := tx.RmLeecher(torrent, peer) + if err != nil { + log.Panicf("server: %s", err) + } + leecher = false + } else { + peer = &lp + err := tx.RmSeeder(torrent, peer) + if err != nil { + log.Panicf("server: %s", err) + } + seeder = false + } + + case seeder: + peer = &sp + // TODO update stats + + case leecher: + peer = &lp + // TODO update stats + + default: + // The user is a new peer + peer = &storage.Peer{ + ID: peerID, + UserID: user.ID, + TorrentID: torrent.ID, + IP: ip, + Port: port, + Uploaded: uploaded, + Downloaded: downloaded, + Left: left, + LastAnnounce: time.Now().Unix(), + } + + // Check the new user's slots + if s.conf.Slots && user.Slots != -1 && left != 0 { + if user.UsedSlots >= user.Slots { + fail(errors.New("You've run out of download slots."), w, r) + return + } + } + + if left == 0 { + err := tx.NewSeeder(torrent, peer) if err != nil { log.Panicf("server: %s", err) } } else { - err := tx.RmLeecher(torrent.ID, peerID) + err := tx.IncrementSlots(user) if err != nil { log.Panicf("server: %s", err) } - err = tx.DecrementSlots(user.ID) + err = tx.NewLeecher(torrent, peer) if err != nil { log.Panicf("server: %s", err) } } - } else if event == "completed" { - err := tx.Snatch(user.ID, torrent.ID) + } + + // Handle any events given to us by the user + switch { + case event == "stopped" || event == "paused": + if seeder { + err := tx.RmSeeder(torrent, peer) + if err != nil { + log.Panicf("server: %s", err) + } + } + if leecher { + err := tx.RmLeecher(torrent, peer) + if err != nil { + log.Panicf("server: %s", err) + } + err = tx.DecrementSlots(user) + if err != nil { + log.Panicf("server: %s", err) + } + } + + case event == "completed": + err := tx.Snatch(user, torrent) + if err != nil { + log.Panicf("server: %s", err) + } + if leecher { + err := tx.RmLeecher(torrent, peer) + if err != nil { + log.Panicf("server: %s", err) + } + err = tx.NewSeeder(torrent, peer) + if err != nil { + log.Panicf("server: %s", err) + } + } + + case leecher && left == 0: + // Completed event from the peer was never received + err := tx.RmLeecher(torrent, peer) + if err != nil { + log.Panicf("server: %s", err) + } + err = tx.NewSeeder(torrent, peer) if err != nil { log.Panicf("server: %s", err) } } + + // TODO compact, response, etc... + } func (s *Server) validateAnnounceQuery(r *http.Request) (compact bool, numWant int, infohash, peerID, event, ip string, port, uploaded, downloaded, left uint64, err error) { diff --git a/storage/redis/redis.go b/storage/redis/redis.go index 653038c..4e52471 100644 --- a/storage/redis/redis.go +++ b/storage/redis/redis.go @@ -139,45 +139,101 @@ func (ds *DS) Begin() (storage.Tx, error) { }, nil } -func (t *Tx) Close() { - if t.done { +func (tx *Tx) close() { + if tx.done { panic("redis: transaction closed twice") } - t.done = true - t.Conn.Close() + tx.done = true + tx.Conn.Close() } -func (t *Tx) UnpruneTorrent(torrent *storage.Torrent) error { - if t.done { +func (tx *Tx) Commit() error { + if tx.done { return storage.ErrTxDone } - key := t.conf.Prefix + "Torrent:" + torrent.Infohash - err := t.Send("HSET " + key + " Status 0") - if err != nil { - return err - } - return nil -} - -func (t *Tx) Commit() error { - if t.done { - return storage.ErrTxDone - } - _, err := t.Do("EXEC") + _, err := tx.Do("EXEC") if err != nil { return err } - t.Close() + tx.close() return nil } // Redis doesn't need to rollback. Exec is atomic. -func (t *Tx) Rollback() error { - if t.done { +func (tx *Tx) Rollback() error { + if tx.done { return storage.ErrTxDone } - t.Close() + tx.close() + return nil +} + +func (tx *Tx) Snatch(user *storage.User, torrent *storage.Torrent) error { + if tx.done { + return storage.ErrTxDone + } + // TODO + return nil +} + +func (tx *Tx) Unprune(t *storage.Torrent) error { + if tx.done { + return storage.ErrTxDone + } + key := tx.conf.Prefix + "Torrent:" + t.Infohash + err := tx.Send("HSET " + key + " Status 0") + if err != nil { + return err + } + return nil +} + +func (tx *Tx) NewLeecher(t *storage.Torrent, p *storage.Peer) error { + if tx.done { + return storage.ErrTxDone + } + // TODO + return nil +} + +func (tx *Tx) RmLeecher(t *storage.Torrent, p *storage.Peer) error { + if tx.done { + return storage.ErrTxDone + } + // TODO + return nil +} + +func (tx *Tx) NewSeeder(t *storage.Torrent, p *storage.Peer) error { + if tx.done { + return storage.ErrTxDone + } + // TODO + return nil +} + +func (tx *Tx) RmSeeder(t *storage.Torrent, p *storage.Peer) error { + if tx.done { + return storage.ErrTxDone + } + // TODO + return nil +} + +func (tx *Tx) IncrementSlots(u *storage.User) error { + if tx.done { + return storage.ErrTxDone + } + // TODO + return nil +} + +func (tx *Tx) DecrementSlots(u *storage.User) error { + if tx.done { + return storage.ErrTxDone + } + // TODO return nil } diff --git a/storage/storage.go b/storage/storage.go index c09ab7e..8e2b906 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -73,16 +73,17 @@ type Tx interface { Rollback() error // Torrents - Snatch(userID, torrentID uint64) error - Unprune(torrentID uint64) error + Snatch(u *User, t *Torrent) error + Unprune(t *Torrent) error // Peers - NewLeecher(torrent *Torrent, p *Peer) error - RmLeecher(torrentID uint64, peerID string) error + NewLeecher(t *Torrent, p *Peer) error + RmLeecher(t *Torrent, p *Peer) error - NewSeeder(torrent *Torrent, p *Peer) error - RmSeeder(torrentID uint64, peerID string) error + NewSeeder(t *Torrent, p *Peer) error + RmSeeder(t *Torrent, p *Peer) error // Users - DecrementSlots(userID uint64) error + IncrementSlots(u *User) error + DecrementSlots(u *User) error }