From 279c78192f3566e4cd46db7c8ae7acb2d1739281 Mon Sep 17 00:00:00 2001 From: Jimmy Zelinskie Date: Wed, 3 Jul 2013 18:24:03 -0400 Subject: [PATCH] transactions, connpools, misc --- config/config.go | 13 ++++---- example/config.json | 1 + server/announce.go | 29 ++++++++++++++--- server/query.go | 39 +++++++++++------------ server/server.go | 6 ++-- storage/redis/redis.go | 70 ++++++++++++++++++++++++++++++------------ storage/storage.go | 49 ++++++++++++----------------- 7 files changed, 123 insertions(+), 84 deletions(-) diff --git a/config/config.go b/config/config.go index edd4ab5..56f1de9 100644 --- a/config/config.go +++ b/config/config.go @@ -53,9 +53,10 @@ type Config struct { Private bool `json:"private"` Freeleech bool `json:"freeleech"` - Announce Duration `json:"announce"` - MinAnnounce Duration `json:"min_announce"` - ReadTimeout Duration `json:"read_timeout"` + Announce Duration `json:"announce"` + MinAnnounce Duration `json:"min_announce"` + ReadTimeout Duration `json:"read_timeout"` + DefaultNumWant int `json:"default_num_want"` Whitelist []Client `json:"whitelist"` } @@ -76,13 +77,13 @@ func New(path string) (*Config, error) { return conf, nil } -func (c *Config) Whitelisted(peerId string) (matched bool) { +func (c *Config) ClientWhitelisted(peerID string) (matched bool) { for _, client := range c.Whitelist { length := len(client.PeerID) - if length <= len(peerId) { + if length <= len(peerID) { matched = true for i := 0; i < length; i++ { - if peerId[i] != client.PeerID[i] { + if peerID[i] != client.PeerID[i] { matched = false break } diff --git a/example/config.json b/example/config.json index d41e5e0..8f3e77e 100644 --- a/example/config.json +++ b/example/config.json @@ -20,6 +20,7 @@ "announce": "30m", "min_announce": "15m", "read_timeout": "20s", + "default_num_want": 50, "whitelist": [ { "name": "Azureus 2.5.x", "peer_id": "-AZ25" }, diff --git a/server/announce.go b/server/announce.go index 9afb9fa..32ddcea 100644 --- a/server/announce.go +++ b/server/announce.go @@ -10,11 +10,15 @@ import ( "log" "net/http" "path" + "strconv" ) func (s *Server) serveAnnounce(w http.ResponseWriter, r *http.Request) { + conn := s.connPool.Get() + defer conn.Close() + passkey, _ := path.Split(r.URL.Path) - _, err := validatePasskey(passkey, s.storage) + _, err := validatePasskey(passkey, conn) if err != nil { fail(err, w, r) return @@ -32,18 +36,18 @@ func (s *Server) serveAnnounce(w http.ResponseWriter, r *http.Request) { return } - err = pq.validate() + err = pq.validateAnnounceParams() if err != nil { fail(errors.New("Malformed request"), w, r) return } - if !s.conf.Whitelisted(pq.params["peerId"]) { + if !s.conf.ClientWhitelisted(pq.params["peer_id"]) { fail(errors.New("Your client is not approved"), w, r) return } - torrent, exists, err := s.storage.FindTorrent(pq.params["infohash"]) + torrent, exists, err := conn.FindTorrent(pq.params["infohash"]) if err != nil { log.Panicf("server: %s", err) } @@ -52,8 +56,13 @@ func (s *Server) serveAnnounce(w http.ResponseWriter, r *http.Request) { return } + tx, err := conn.NewTx() + if err != nil { + log.Panicf("server: %s", err) + } + if left, _ := pq.getUint64("left"); torrent.Status == 1 && left == 0 { - err := s.storage.UnpruneTorrent(torrent) + err := tx.UnpruneTorrent(torrent) if err != nil { log.Panicf("server: %s", err) } @@ -71,5 +80,15 @@ func (s *Server) serveAnnounce(w http.ResponseWriter, r *http.Request) { return } + var numWant int + if numWantStr, exists := pq.params["numWant"]; exists { + numWant, err := strconv.Atoi(numWantStr) + if err != nil { + numWant = s.conf.DefaultNumWant + } + } else { + numWant = s.conf.DefaultNumWant + } + // TODO continue } diff --git a/server/query.go b/server/query.go index 5429dc9..5ce18dc 100644 --- a/server/query.go +++ b/server/query.go @@ -6,6 +6,7 @@ package server import ( "errors" + "github.com/pushrax/chihaya/config" "net/http" "net/url" "strconv" @@ -96,7 +97,7 @@ func parseQuery(query string) (*parsedQuery, error) { return pq, nil } -func (pq *parsedQuery) validate() error { +func (pq *parsedQuery) validateAnnounceParams() error { infohash, _ := pq.params["info_hash"] if infohash == "" { return errors.New("infohash does not exist") @@ -126,27 +127,23 @@ func (pq *parsedQuery) validate() error { // TODO IPv6 support func (pq *parsedQuery) determineIP(r *http.Request) (string, error) { - ip, ok := pq.params["ip"] - if !ok { - ip, ok = pq.params["ipv4"] - if !ok { - ips, ok := r.Header["X-Real-Ip"] - if ok && len(ips) > 0 { - ip = ips[0] - } else { - portIndex := len(r.RemoteAddr) - 1 - for ; portIndex >= 0; portIndex-- { - if r.RemoteAddr[portIndex] == ':' { - break - } - } - if portIndex != -1 { - ip = r.RemoteAddr[0:portIndex] - } else { - return "", errors.New("Failed to parse IP address") - } + if ip, ok := pq.params["ip"]; ok { + return ip, nil + } else if ip, ok := pq.params["ipv4"]; ok { + return ip, nil + } else if ips, ok := pq.params["X-Real-Ip"]; ok && len(ips) > 0 { + return string(ips[0]), nil + } else { + portIndex := len(r.RemoteAddr) - 1 + for ; portIndex >= 0; portIndex-- { + if r.RemoteAddr[portIndex] == ':' { + break } } + if portIndex != -1 { + return r.RemoteAddr[0:portIndex], nil + } else { + return "", errors.New("Failed to parse IP address") + } } - return ip, nil } diff --git a/server/server.go b/server/server.go index 9458021..948ada6 100644 --- a/server/server.go +++ b/server/server.go @@ -25,7 +25,7 @@ import ( type Server struct { conf *config.Config listener net.Listener - storage storage.Conn + connPool storage.Pool serving bool startTime time.Time @@ -39,14 +39,14 @@ type Server struct { } func New(conf *config.Config) (*Server, error) { - store, err := storage.Open(&conf.Storage) + pool, err := storage.Open(&conf.Storage) if err != nil { return nil, err } s := &Server{ conf: conf, - storage: store, + storage: pool, Server: http.Server{ Addr: conf.Addr, ReadTimeout: conf.ReadTimeout.Duration, diff --git a/storage/redis/redis.go b/storage/redis/redis.go index 3bbef47..f42ce50 100644 --- a/storage/redis/redis.go +++ b/storage/redis/redis.go @@ -15,16 +15,16 @@ import ( type driver struct{} -func (d *driver) New(conf *config.Storage) (storage.Conn, error) { - return &Conn{ +func (d *driver) New(conf *config.Storage) storage.Pool { + return &Pool{ conf: conf, - pool: &redis.Pool{ + pool: redis.Pool{ MaxIdle: 3, IdleTimeout: 240 * time.Second, Dial: makeDialFunc(conf), TestOnBorrow: testOnBorrow, }, - }, nil + } } func makeDialFunc(conf *config.Storage) func() (redis.Conn, error) { @@ -60,22 +60,31 @@ func testOnBorrow(c redis.Conn, t time.Time) error { return err } -type Conn struct { +type Pool struct { conf *config.Storage - pool *redis.Pool + pool redis.Pool } -func (c *Conn) Close() error { - return c.pool.Close() +func (p *Pool) Get() storage.Conn { + return &Conn{ + conf: p.conf, + Conn: p.pool.Get(), + } +} + +func (p *Pool) Close() error { + return p.pool.Close() +} + +type Conn struct { + conf *config.Storage + redis.Conn } func (c *Conn) FindUser(passkey string) (*storage.User, bool, error) { - conn := c.pool.Get() - defer c.pool.Close() - key := c.conf.Prefix + "User:" + passkey - exists, err := redis.Bool(conn.Do("EXISTS", key)) + exists, err := redis.Bool(c.Do("EXISTS", key)) if err != nil { return nil, false, err } @@ -83,7 +92,7 @@ func (c *Conn) FindUser(passkey string) (*storage.User, bool, error) { return nil, false, nil } - reply, err := redis.Values(conn.Do("HGETALL", key)) + reply, err := redis.Values(c.Do("HGETALL", key)) if err != nil { return nil, true, err } @@ -96,12 +105,9 @@ func (c *Conn) FindUser(passkey string) (*storage.User, bool, error) { } func (c *Conn) FindTorrent(infohash string) (*storage.Torrent, bool, error) { - conn := c.pool.Get() - defer c.pool.Close() - key := c.conf.Prefix + "Torrent:" + infohash - exists, err := redis.Bool(conn.Do("EXISTS", key)) + exists, err := redis.Bool(c.Do("EXISTS", key)) if err != nil { return nil, false, err } @@ -109,7 +115,7 @@ func (c *Conn) FindTorrent(infohash string) (*storage.Torrent, bool, error) { return nil, false, nil } - reply, err := redis.Values(conn.Do("HGETALL", key)) + reply, err := redis.Values(c.Do("HGETALL", key)) if err != nil { return nil, true, err } @@ -121,8 +127,32 @@ func (c *Conn) FindTorrent(infohash string) (*storage.Torrent, bool, error) { return torrent, true, nil } -func (c *Conn) UnpruneTorrent(torrent *storage.Torrent) error { - // TODO +type Tx struct { + conn *Conn +} + +func (c *Conn) NewTx() (storage.Tx, error) { + err := c.Send("MULTI") + if err != nil { + return nil, err + } + return &Tx{c}, nil +} + +func (t *Tx) UnpruneTorrent(torrent *storage.Torrent) error { + key := t.conn.conf.Prefix + "Torrent:" + torrent.Infohash + err := t.conn.Send("HSET " + key + " Status 0") + if err != nil { + return err + } + return nil +} + +func (t *Tx) Commit() error { + _, err := t.conn.Do("EXEC") + if err != nil { + return err + } return nil } diff --git a/storage/storage.go b/storage/storage.go index bbc1551..d559fbc 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -15,7 +15,7 @@ import ( var drivers = make(map[string]Driver) type Driver interface { - New(*config.Storage) (Conn, error) + New(*config.Storage) Pool } func Register(name string, driver Driver) { @@ -28,7 +28,7 @@ func Register(name string, driver Driver) { drivers[name] = driver } -func Open(conf *config.Storage) (Conn, error) { +func Open(conf *config.Storage) (Pool, error) { driver, ok := drivers[conf.Driver] if !ok { return nil, fmt.Errorf( @@ -36,38 +36,29 @@ func Open(conf *config.Storage) (Conn, error) { conf.Driver, ) } - store, err := driver.New(conf) - if err != nil { - return nil, err - } - return store, nil + pool := driver.New(conf) + return pool, nil } +// ConnPool represents a pool of connections to the data store. +type Pool interface { + Close() error + Get() Conn +} + +// Conn represents a single connection to the data store. type Conn interface { Close() error + NewTx() (Tx, error) + FindUser(passkey string) (*User, bool, error) FindTorrent(infohash string) (*Torrent, bool, error) - UnpruneTorrent(torrent *Torrent) error - - /* - RecordUser( - user *User, - rawDeltaUpload int64, - rawDeltaDownload int64, - deltaUpload int64, - deltaDownload int64, - ) error - RecordSnatch(peer *Peer, now int64) error - RecordTorrent(torrent *Torrent, deltaSnatch uint64) error - RecordTransferIP(peer *Peer) error - RecordTransferHistory( - peer *Peer, - rawDeltaUpload int64, - rawDeltaDownload int64, - deltaTime int64, - deltaSnatch uint64, - active bool, - ) error - */ +} + +// Tx represents a data store transaction. +type Tx interface { + Commit() error + + UnpruneTorrent(torrent *Torrent) error }