switch on event; scaffolding for redis driver

This commit is contained in:
Jimmy Zelinskie 2013-07-21 20:15:48 -04:00
parent dae48d0c3b
commit db6d8707df
3 changed files with 201 additions and 46 deletions

View file

@ -10,15 +10,20 @@ import (
"net/http" "net/http"
"path" "path"
"strconv" "strconv"
"time"
"github.com/pushrax/chihaya/storage"
) )
func (s *Server) serveAnnounce(w http.ResponseWriter, r *http.Request) { func (s *Server) serveAnnounce(w http.ResponseWriter, r *http.Request) {
// Parse the required parameters off of a query
compact, numWant, infohash, peerID, event, ip, port, uploaded, downloaded, left, err := s.validateAnnounceQuery(r) compact, numWant, infohash, peerID, event, ip, port, uploaded, downloaded, left, err := s.validateAnnounceQuery(r)
if err != nil { if err != nil {
fail(err, w, r) fail(err, w, r)
return return
} }
// Validate the user's passkey
passkey, _ := path.Split(r.URL.Path) passkey, _ := path.Split(r.URL.Path)
user, err := s.FindUser(passkey) user, err := s.FindUser(passkey)
if err != nil { if err != nil {
@ -26,6 +31,7 @@ func (s *Server) serveAnnounce(w http.ResponseWriter, r *http.Request) {
return return
} }
// Check if the user's client is whitelisted
whitelisted, err := s.dataStore.ClientWhitelisted(peerID) whitelisted, err := s.dataStore.ClientWhitelisted(peerID)
if err != nil { if err != nil {
log.Panicf("server: %s", err) log.Panicf("server: %s", err)
@ -35,6 +41,7 @@ func (s *Server) serveAnnounce(w http.ResponseWriter, r *http.Request) {
return return
} }
// Find the specified torrent
torrent, exists, err := s.dataStore.FindTorrent(infohash) torrent, exists, err := s.dataStore.FindTorrent(infohash)
if err != nil { if err != nil {
log.Panicf("server: %s", err) log.Panicf("server: %s", err)
@ -44,49 +51,140 @@ func (s *Server) serveAnnounce(w http.ResponseWriter, r *http.Request) {
return return
} }
if s.conf.Slots && user.Slots != -1 && left != 0 { // Begin a data store transaction
if user.UsedSlots >= user.Slots {
fail(errors.New("You've run out of download slots."), w, r)
return
}
}
tx, err := s.dataStore.Begin() tx, err := s.dataStore.Begin()
if err != nil { if err != nil {
log.Panicf("server: %s", err) log.Panicf("server: %s", err)
} }
// If the torrent was pruned and the user is seeding, unprune it
if torrent.Pruned && left == 0 { if torrent.Pruned && left == 0 {
err := tx.Unprune(torrent.ID) err := tx.Unprune(torrent)
if err != nil { if err != nil {
log.Panicf("server: %s", err) log.Panicf("server: %s", err)
} }
} }
_, isLeecher := torrent.Leechers[peerID] // Look for the user in in the pool of seeders and leechers
_, isSeeder := torrent.Seeders[peerID] sp, seeder := torrent.Seeders[peerID]
if event == "stopped" || event == "paused" { lp, leecher := torrent.Leechers[peerID]
peer := &storage.Peer{}
switch {
// Guarantee that no user is in both pools
case seeder && leecher:
if left == 0 { if left == 0 {
err := tx.RmSeeder(torrent.ID, peerID) peer = &sp
err := tx.RmLeecher(torrent, peer)
if err != nil {
log.Panicf("server: %s", err)
}
leecher = false
} else {
peer = &lp
err := tx.RmSeeder(torrent, peer)
if err != nil {
log.Panicf("server: %s", err)
}
seeder = false
}
case seeder:
peer = &sp
// TODO update stats
case leecher:
peer = &lp
// TODO update stats
default:
// The user is a new peer
peer = &storage.Peer{
ID: peerID,
UserID: user.ID,
TorrentID: torrent.ID,
IP: ip,
Port: port,
Uploaded: uploaded,
Downloaded: downloaded,
Left: left,
LastAnnounce: time.Now().Unix(),
}
// Check the new user's slots
if s.conf.Slots && user.Slots != -1 && left != 0 {
if user.UsedSlots >= user.Slots {
fail(errors.New("You've run out of download slots."), w, r)
return
}
}
if left == 0 {
err := tx.NewSeeder(torrent, peer)
if err != nil { if err != nil {
log.Panicf("server: %s", err) log.Panicf("server: %s", err)
} }
} else { } else {
err := tx.RmLeecher(torrent.ID, peerID) err := tx.IncrementSlots(user)
if err != nil { if err != nil {
log.Panicf("server: %s", err) log.Panicf("server: %s", err)
} }
err = tx.DecrementSlots(user.ID) err = tx.NewLeecher(torrent, peer)
if err != nil { if err != nil {
log.Panicf("server: %s", err) log.Panicf("server: %s", err)
} }
} }
} else if event == "completed" { }
err := tx.Snatch(user.ID, torrent.ID)
// Handle any events given to us by the user
switch {
case event == "stopped" || event == "paused":
if seeder {
err := tx.RmSeeder(torrent, peer)
if err != nil {
log.Panicf("server: %s", err)
}
}
if leecher {
err := tx.RmLeecher(torrent, peer)
if err != nil {
log.Panicf("server: %s", err)
}
err = tx.DecrementSlots(user)
if err != nil {
log.Panicf("server: %s", err)
}
}
case event == "completed":
err := tx.Snatch(user, torrent)
if err != nil {
log.Panicf("server: %s", err)
}
if leecher {
err := tx.RmLeecher(torrent, peer)
if err != nil {
log.Panicf("server: %s", err)
}
err = tx.NewSeeder(torrent, peer)
if err != nil {
log.Panicf("server: %s", err)
}
}
case leecher && left == 0:
// Completed event from the peer was never received
err := tx.RmLeecher(torrent, peer)
if err != nil {
log.Panicf("server: %s", err)
}
err = tx.NewSeeder(torrent, peer)
if err != nil { if err != nil {
log.Panicf("server: %s", err) log.Panicf("server: %s", err)
} }
} }
// TODO compact, response, etc...
} }
func (s *Server) validateAnnounceQuery(r *http.Request) (compact bool, numWant int, infohash, peerID, event, ip string, port, uploaded, downloaded, left uint64, err error) { func (s *Server) validateAnnounceQuery(r *http.Request) (compact bool, numWant int, infohash, peerID, event, ip string, port, uploaded, downloaded, left uint64, err error) {

View file

@ -139,45 +139,101 @@ func (ds *DS) Begin() (storage.Tx, error) {
}, nil }, nil
} }
func (t *Tx) Close() { func (tx *Tx) close() {
if t.done { if tx.done {
panic("redis: transaction closed twice") panic("redis: transaction closed twice")
} }
t.done = true tx.done = true
t.Conn.Close() tx.Conn.Close()
} }
func (t *Tx) UnpruneTorrent(torrent *storage.Torrent) error { func (tx *Tx) Commit() error {
if t.done { if tx.done {
return storage.ErrTxDone return storage.ErrTxDone
} }
key := t.conf.Prefix + "Torrent:" + torrent.Infohash _, err := tx.Do("EXEC")
err := t.Send("HSET " + key + " Status 0")
if err != nil {
return err
}
return nil
}
func (t *Tx) Commit() error {
if t.done {
return storage.ErrTxDone
}
_, err := t.Do("EXEC")
if err != nil { if err != nil {
return err return err
} }
t.Close() tx.close()
return nil return nil
} }
// Redis doesn't need to rollback. Exec is atomic. // Redis doesn't need to rollback. Exec is atomic.
func (t *Tx) Rollback() error { func (tx *Tx) Rollback() error {
if t.done { if tx.done {
return storage.ErrTxDone return storage.ErrTxDone
} }
t.Close() tx.close()
return nil
}
func (tx *Tx) Snatch(user *storage.User, torrent *storage.Torrent) error {
if tx.done {
return storage.ErrTxDone
}
// TODO
return nil
}
func (tx *Tx) Unprune(t *storage.Torrent) error {
if tx.done {
return storage.ErrTxDone
}
key := tx.conf.Prefix + "Torrent:" + t.Infohash
err := tx.Send("HSET " + key + " Status 0")
if err != nil {
return err
}
return nil
}
func (tx *Tx) NewLeecher(t *storage.Torrent, p *storage.Peer) error {
if tx.done {
return storage.ErrTxDone
}
// TODO
return nil
}
func (tx *Tx) RmLeecher(t *storage.Torrent, p *storage.Peer) error {
if tx.done {
return storage.ErrTxDone
}
// TODO
return nil
}
func (tx *Tx) NewSeeder(t *storage.Torrent, p *storage.Peer) error {
if tx.done {
return storage.ErrTxDone
}
// TODO
return nil
}
func (tx *Tx) RmSeeder(t *storage.Torrent, p *storage.Peer) error {
if tx.done {
return storage.ErrTxDone
}
// TODO
return nil
}
func (tx *Tx) IncrementSlots(u *storage.User) error {
if tx.done {
return storage.ErrTxDone
}
// TODO
return nil
}
func (tx *Tx) DecrementSlots(u *storage.User) error {
if tx.done {
return storage.ErrTxDone
}
// TODO
return nil return nil
} }

View file

@ -73,16 +73,17 @@ type Tx interface {
Rollback() error Rollback() error
// Torrents // Torrents
Snatch(userID, torrentID uint64) error Snatch(u *User, t *Torrent) error
Unprune(torrentID uint64) error Unprune(t *Torrent) error
// Peers // Peers
NewLeecher(torrent *Torrent, p *Peer) error NewLeecher(t *Torrent, p *Peer) error
RmLeecher(torrentID uint64, peerID string) error RmLeecher(t *Torrent, p *Peer) error
NewSeeder(torrent *Torrent, p *Peer) error NewSeeder(t *Torrent, p *Peer) error
RmSeeder(torrentID uint64, peerID string) error RmSeeder(t *Torrent, p *Peer) error
// Users // Users
DecrementSlots(userID uint64) error IncrementSlots(u *User) error
DecrementSlots(u *User) error
} }