transactions, connpools, misc
This commit is contained in:
parent
5848a7a8d8
commit
279c78192f
7 changed files with 123 additions and 84 deletions
|
@ -53,9 +53,10 @@ type Config struct {
|
||||||
Private bool `json:"private"`
|
Private bool `json:"private"`
|
||||||
Freeleech bool `json:"freeleech"`
|
Freeleech bool `json:"freeleech"`
|
||||||
|
|
||||||
Announce Duration `json:"announce"`
|
Announce Duration `json:"announce"`
|
||||||
MinAnnounce Duration `json:"min_announce"`
|
MinAnnounce Duration `json:"min_announce"`
|
||||||
ReadTimeout Duration `json:"read_timeout"`
|
ReadTimeout Duration `json:"read_timeout"`
|
||||||
|
DefaultNumWant int `json:"default_num_want"`
|
||||||
|
|
||||||
Whitelist []Client `json:"whitelist"`
|
Whitelist []Client `json:"whitelist"`
|
||||||
}
|
}
|
||||||
|
@ -76,13 +77,13 @@ func New(path string) (*Config, error) {
|
||||||
return conf, nil
|
return conf, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Config) Whitelisted(peerId string) (matched bool) {
|
func (c *Config) ClientWhitelisted(peerID string) (matched bool) {
|
||||||
for _, client := range c.Whitelist {
|
for _, client := range c.Whitelist {
|
||||||
length := len(client.PeerID)
|
length := len(client.PeerID)
|
||||||
if length <= len(peerId) {
|
if length <= len(peerID) {
|
||||||
matched = true
|
matched = true
|
||||||
for i := 0; i < length; i++ {
|
for i := 0; i < length; i++ {
|
||||||
if peerId[i] != client.PeerID[i] {
|
if peerID[i] != client.PeerID[i] {
|
||||||
matched = false
|
matched = false
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,6 +20,7 @@
|
||||||
"announce": "30m",
|
"announce": "30m",
|
||||||
"min_announce": "15m",
|
"min_announce": "15m",
|
||||||
"read_timeout": "20s",
|
"read_timeout": "20s",
|
||||||
|
"default_num_want": 50,
|
||||||
|
|
||||||
"whitelist": [
|
"whitelist": [
|
||||||
{ "name": "Azureus 2.5.x", "peer_id": "-AZ25" },
|
{ "name": "Azureus 2.5.x", "peer_id": "-AZ25" },
|
||||||
|
|
|
@ -10,11 +10,15 @@ import (
|
||||||
"log"
|
"log"
|
||||||
"net/http"
|
"net/http"
|
||||||
"path"
|
"path"
|
||||||
|
"strconv"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (s *Server) serveAnnounce(w http.ResponseWriter, r *http.Request) {
|
func (s *Server) serveAnnounce(w http.ResponseWriter, r *http.Request) {
|
||||||
|
conn := s.connPool.Get()
|
||||||
|
defer conn.Close()
|
||||||
|
|
||||||
passkey, _ := path.Split(r.URL.Path)
|
passkey, _ := path.Split(r.URL.Path)
|
||||||
_, err := validatePasskey(passkey, s.storage)
|
_, err := validatePasskey(passkey, conn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fail(err, w, r)
|
fail(err, w, r)
|
||||||
return
|
return
|
||||||
|
@ -32,18 +36,18 @@ func (s *Server) serveAnnounce(w http.ResponseWriter, r *http.Request) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
err = pq.validate()
|
err = pq.validateAnnounceParams()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fail(errors.New("Malformed request"), w, r)
|
fail(errors.New("Malformed request"), w, r)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if !s.conf.Whitelisted(pq.params["peerId"]) {
|
if !s.conf.ClientWhitelisted(pq.params["peer_id"]) {
|
||||||
fail(errors.New("Your client is not approved"), w, r)
|
fail(errors.New("Your client is not approved"), w, r)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
torrent, exists, err := s.storage.FindTorrent(pq.params["infohash"])
|
torrent, exists, err := conn.FindTorrent(pq.params["infohash"])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Panicf("server: %s", err)
|
log.Panicf("server: %s", err)
|
||||||
}
|
}
|
||||||
|
@ -52,8 +56,13 @@ func (s *Server) serveAnnounce(w http.ResponseWriter, r *http.Request) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tx, err := conn.NewTx()
|
||||||
|
if err != nil {
|
||||||
|
log.Panicf("server: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
if left, _ := pq.getUint64("left"); torrent.Status == 1 && left == 0 {
|
if left, _ := pq.getUint64("left"); torrent.Status == 1 && left == 0 {
|
||||||
err := s.storage.UnpruneTorrent(torrent)
|
err := tx.UnpruneTorrent(torrent)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Panicf("server: %s", err)
|
log.Panicf("server: %s", err)
|
||||||
}
|
}
|
||||||
|
@ -71,5 +80,15 @@ func (s *Server) serveAnnounce(w http.ResponseWriter, r *http.Request) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var numWant int
|
||||||
|
if numWantStr, exists := pq.params["numWant"]; exists {
|
||||||
|
numWant, err := strconv.Atoi(numWantStr)
|
||||||
|
if err != nil {
|
||||||
|
numWant = s.conf.DefaultNumWant
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
numWant = s.conf.DefaultNumWant
|
||||||
|
}
|
||||||
|
|
||||||
// TODO continue
|
// TODO continue
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,6 +6,7 @@ package server
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
|
"github.com/pushrax/chihaya/config"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
@ -96,7 +97,7 @@ func parseQuery(query string) (*parsedQuery, error) {
|
||||||
return pq, nil
|
return pq, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pq *parsedQuery) validate() error {
|
func (pq *parsedQuery) validateAnnounceParams() error {
|
||||||
infohash, _ := pq.params["info_hash"]
|
infohash, _ := pq.params["info_hash"]
|
||||||
if infohash == "" {
|
if infohash == "" {
|
||||||
return errors.New("infohash does not exist")
|
return errors.New("infohash does not exist")
|
||||||
|
@ -126,27 +127,23 @@ func (pq *parsedQuery) validate() error {
|
||||||
|
|
||||||
// TODO IPv6 support
|
// TODO IPv6 support
|
||||||
func (pq *parsedQuery) determineIP(r *http.Request) (string, error) {
|
func (pq *parsedQuery) determineIP(r *http.Request) (string, error) {
|
||||||
ip, ok := pq.params["ip"]
|
if ip, ok := pq.params["ip"]; ok {
|
||||||
if !ok {
|
return ip, nil
|
||||||
ip, ok = pq.params["ipv4"]
|
} else if ip, ok := pq.params["ipv4"]; ok {
|
||||||
if !ok {
|
return ip, nil
|
||||||
ips, ok := r.Header["X-Real-Ip"]
|
} else if ips, ok := pq.params["X-Real-Ip"]; ok && len(ips) > 0 {
|
||||||
if ok && len(ips) > 0 {
|
return string(ips[0]), nil
|
||||||
ip = ips[0]
|
} else {
|
||||||
} else {
|
portIndex := len(r.RemoteAddr) - 1
|
||||||
portIndex := len(r.RemoteAddr) - 1
|
for ; portIndex >= 0; portIndex-- {
|
||||||
for ; portIndex >= 0; portIndex-- {
|
if r.RemoteAddr[portIndex] == ':' {
|
||||||
if r.RemoteAddr[portIndex] == ':' {
|
break
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if portIndex != -1 {
|
|
||||||
ip = r.RemoteAddr[0:portIndex]
|
|
||||||
} else {
|
|
||||||
return "", errors.New("Failed to parse IP address")
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if portIndex != -1 {
|
||||||
|
return r.RemoteAddr[0:portIndex], nil
|
||||||
|
} else {
|
||||||
|
return "", errors.New("Failed to parse IP address")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return ip, nil
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,7 +25,7 @@ import (
|
||||||
type Server struct {
|
type Server struct {
|
||||||
conf *config.Config
|
conf *config.Config
|
||||||
listener net.Listener
|
listener net.Listener
|
||||||
storage storage.Conn
|
connPool storage.Pool
|
||||||
|
|
||||||
serving bool
|
serving bool
|
||||||
startTime time.Time
|
startTime time.Time
|
||||||
|
@ -39,14 +39,14 @@ type Server struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func New(conf *config.Config) (*Server, error) {
|
func New(conf *config.Config) (*Server, error) {
|
||||||
store, err := storage.Open(&conf.Storage)
|
pool, err := storage.Open(&conf.Storage)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
s := &Server{
|
s := &Server{
|
||||||
conf: conf,
|
conf: conf,
|
||||||
storage: store,
|
storage: pool,
|
||||||
Server: http.Server{
|
Server: http.Server{
|
||||||
Addr: conf.Addr,
|
Addr: conf.Addr,
|
||||||
ReadTimeout: conf.ReadTimeout.Duration,
|
ReadTimeout: conf.ReadTimeout.Duration,
|
||||||
|
|
|
@ -15,16 +15,16 @@ import (
|
||||||
|
|
||||||
type driver struct{}
|
type driver struct{}
|
||||||
|
|
||||||
func (d *driver) New(conf *config.Storage) (storage.Conn, error) {
|
func (d *driver) New(conf *config.Storage) storage.Pool {
|
||||||
return &Conn{
|
return &Pool{
|
||||||
conf: conf,
|
conf: conf,
|
||||||
pool: &redis.Pool{
|
pool: redis.Pool{
|
||||||
MaxIdle: 3,
|
MaxIdle: 3,
|
||||||
IdleTimeout: 240 * time.Second,
|
IdleTimeout: 240 * time.Second,
|
||||||
Dial: makeDialFunc(conf),
|
Dial: makeDialFunc(conf),
|
||||||
TestOnBorrow: testOnBorrow,
|
TestOnBorrow: testOnBorrow,
|
||||||
},
|
},
|
||||||
}, nil
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func makeDialFunc(conf *config.Storage) func() (redis.Conn, error) {
|
func makeDialFunc(conf *config.Storage) func() (redis.Conn, error) {
|
||||||
|
@ -60,22 +60,31 @@ func testOnBorrow(c redis.Conn, t time.Time) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
type Conn struct {
|
type Pool struct {
|
||||||
conf *config.Storage
|
conf *config.Storage
|
||||||
pool *redis.Pool
|
pool redis.Pool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Conn) Close() error {
|
func (p *Pool) Get() storage.Conn {
|
||||||
return c.pool.Close()
|
return &Conn{
|
||||||
|
conf: p.conf,
|
||||||
|
Conn: p.pool.Get(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *Pool) Close() error {
|
||||||
|
return p.pool.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
type Conn struct {
|
||||||
|
conf *config.Storage
|
||||||
|
redis.Conn
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Conn) FindUser(passkey string) (*storage.User, bool, error) {
|
func (c *Conn) FindUser(passkey string) (*storage.User, bool, error) {
|
||||||
conn := c.pool.Get()
|
|
||||||
defer c.pool.Close()
|
|
||||||
|
|
||||||
key := c.conf.Prefix + "User:" + passkey
|
key := c.conf.Prefix + "User:" + passkey
|
||||||
|
|
||||||
exists, err := redis.Bool(conn.Do("EXISTS", key))
|
exists, err := redis.Bool(c.Do("EXISTS", key))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, false, err
|
return nil, false, err
|
||||||
}
|
}
|
||||||
|
@ -83,7 +92,7 @@ func (c *Conn) FindUser(passkey string) (*storage.User, bool, error) {
|
||||||
return nil, false, nil
|
return nil, false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
reply, err := redis.Values(conn.Do("HGETALL", key))
|
reply, err := redis.Values(c.Do("HGETALL", key))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, true, err
|
return nil, true, err
|
||||||
}
|
}
|
||||||
|
@ -96,12 +105,9 @@ func (c *Conn) FindUser(passkey string) (*storage.User, bool, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Conn) FindTorrent(infohash string) (*storage.Torrent, bool, error) {
|
func (c *Conn) FindTorrent(infohash string) (*storage.Torrent, bool, error) {
|
||||||
conn := c.pool.Get()
|
|
||||||
defer c.pool.Close()
|
|
||||||
|
|
||||||
key := c.conf.Prefix + "Torrent:" + infohash
|
key := c.conf.Prefix + "Torrent:" + infohash
|
||||||
|
|
||||||
exists, err := redis.Bool(conn.Do("EXISTS", key))
|
exists, err := redis.Bool(c.Do("EXISTS", key))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, false, err
|
return nil, false, err
|
||||||
}
|
}
|
||||||
|
@ -109,7 +115,7 @@ func (c *Conn) FindTorrent(infohash string) (*storage.Torrent, bool, error) {
|
||||||
return nil, false, nil
|
return nil, false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
reply, err := redis.Values(conn.Do("HGETALL", key))
|
reply, err := redis.Values(c.Do("HGETALL", key))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, true, err
|
return nil, true, err
|
||||||
}
|
}
|
||||||
|
@ -121,8 +127,32 @@ func (c *Conn) FindTorrent(infohash string) (*storage.Torrent, bool, error) {
|
||||||
return torrent, true, nil
|
return torrent, true, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Conn) UnpruneTorrent(torrent *storage.Torrent) error {
|
type Tx struct {
|
||||||
// TODO
|
conn *Conn
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Conn) NewTx() (storage.Tx, error) {
|
||||||
|
err := c.Send("MULTI")
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &Tx{c}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *Tx) UnpruneTorrent(torrent *storage.Torrent) error {
|
||||||
|
key := t.conn.conf.Prefix + "Torrent:" + torrent.Infohash
|
||||||
|
err := t.conn.Send("HSET " + key + " Status 0")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *Tx) Commit() error {
|
||||||
|
_, err := t.conn.Do("EXEC")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -15,7 +15,7 @@ import (
|
||||||
var drivers = make(map[string]Driver)
|
var drivers = make(map[string]Driver)
|
||||||
|
|
||||||
type Driver interface {
|
type Driver interface {
|
||||||
New(*config.Storage) (Conn, error)
|
New(*config.Storage) Pool
|
||||||
}
|
}
|
||||||
|
|
||||||
func Register(name string, driver Driver) {
|
func Register(name string, driver Driver) {
|
||||||
|
@ -28,7 +28,7 @@ func Register(name string, driver Driver) {
|
||||||
drivers[name] = driver
|
drivers[name] = driver
|
||||||
}
|
}
|
||||||
|
|
||||||
func Open(conf *config.Storage) (Conn, error) {
|
func Open(conf *config.Storage) (Pool, error) {
|
||||||
driver, ok := drivers[conf.Driver]
|
driver, ok := drivers[conf.Driver]
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, fmt.Errorf(
|
return nil, fmt.Errorf(
|
||||||
|
@ -36,38 +36,29 @@ func Open(conf *config.Storage) (Conn, error) {
|
||||||
conf.Driver,
|
conf.Driver,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
store, err := driver.New(conf)
|
pool := driver.New(conf)
|
||||||
if err != nil {
|
return pool, nil
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return store, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ConnPool represents a pool of connections to the data store.
|
||||||
|
type Pool interface {
|
||||||
|
Close() error
|
||||||
|
Get() Conn
|
||||||
|
}
|
||||||
|
|
||||||
|
// Conn represents a single connection to the data store.
|
||||||
type Conn interface {
|
type Conn interface {
|
||||||
Close() error
|
Close() error
|
||||||
|
|
||||||
|
NewTx() (Tx, error)
|
||||||
|
|
||||||
FindUser(passkey string) (*User, bool, error)
|
FindUser(passkey string) (*User, bool, error)
|
||||||
FindTorrent(infohash string) (*Torrent, bool, error)
|
FindTorrent(infohash string) (*Torrent, bool, error)
|
||||||
UnpruneTorrent(torrent *Torrent) error
|
}
|
||||||
|
|
||||||
/*
|
// Tx represents a data store transaction.
|
||||||
RecordUser(
|
type Tx interface {
|
||||||
user *User,
|
Commit() error
|
||||||
rawDeltaUpload int64,
|
|
||||||
rawDeltaDownload int64,
|
UnpruneTorrent(torrent *Torrent) error
|
||||||
deltaUpload int64,
|
|
||||||
deltaDownload int64,
|
|
||||||
) error
|
|
||||||
RecordSnatch(peer *Peer, now int64) error
|
|
||||||
RecordTorrent(torrent *Torrent, deltaSnatch uint64) error
|
|
||||||
RecordTransferIP(peer *Peer) error
|
|
||||||
RecordTransferHistory(
|
|
||||||
peer *Peer,
|
|
||||||
rawDeltaUpload int64,
|
|
||||||
rawDeltaDownload int64,
|
|
||||||
deltaTime int64,
|
|
||||||
deltaSnatch uint64,
|
|
||||||
active bool,
|
|
||||||
) error
|
|
||||||
*/
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue