initial middleware refactor

This commit is contained in:
Jimmy Zelinskie 2016-01-25 00:41:39 -05:00
parent 5c27c960f0
commit bd33c0c66b
30 changed files with 2291 additions and 27 deletions

3
.gitignore vendored
View file

@ -1,3 +0,0 @@
/config.json
/chihaya
/Godeps/_workspace

View file

@ -1,24 +0,0 @@
language: go
go:
- 1.5
- tip
sudo: false
before_install:
- go get github.com/tools/godep
- godep restore
script:
- go test -v ./...
notifications:
irc:
channels:
- "irc.freenode.net#chihaya"
use_notice: true
skip_join: true
on_success: always
on_failure: always
email: false

47
chihaya.go Normal file
View file

@ -0,0 +1,47 @@
// Copyright 2016 The Chihaya Authors. All rights reserved.
// Use of this source code is governed by the BSD 2-Clause license,
// which can be found in the LICENSE file.package middleware
package chihaya
import (
"net"
"time"
)
// AnnounceRequest represents the parsed parameters from an announce request.
type AnnounceRequest map[string]interface{}
// AnnounceResponse represents the parameters used to create an announce
// response.
type AnnounceResponse struct {
Compact bool
Complete int32
Incomplete int32
Interval time.Duration
MinInterval time.Duration
IPv4Peers []Peer
IPv6Peers []Peer
}
// ScrapeRequest represents the parsed parameters from a scrape request.
type ScrapeRequest map[string]interface{}
// ScrapeResponse represents the parameters used to create a scrape response.
type ScrapeResponse struct {
Files map[string]Scrape
}
// Scrape represents the state of a swarm that is returned in a scrape response.
type Scrape struct {
Complete int32
Incomplete int32
}
// Peer represents the connection details of a peer that is returned in an
// announce response.
type Peer struct {
ID string
IP net.IP
Port uint16
}

47
cmd/chihaya/main.go Normal file
View file

@ -0,0 +1,47 @@
// Copyright 2016 The Chihaya Authors. All rights reserved.
// Use of this source code is governed by the BSD 2-Clause license,
// which can be found in the LICENSE file.
package main
import (
"flag"
"log"
"os"
"os/signal"
"syscall"
"github.com/chihaya/chihaya/config"
"github.com/chihaya/chihaya/server"
"github.com/chihaya/chihaya/tracker"
)
var configPath string
func init() {
flag.StringVar(&configPath, "config", "", "path to the configuration file")
}
func main() {
flag.Parse()
cfg, err := config.Open(configPath)
if err != nil {
log.Fatal("failed to load config: " + err.Error())
}
tkr, err := tracker.NewTracker(&cfg.Tracker)
if err != nil {
log.Fatal("failed to create tracker: " + err.Error())
}
pool, err := server.StartPool(cfg.Servers, tkr)
if err != nil {
log.Fatal("failed to create server pool: " + err.Error())
}
shutdown := make(chan os.Signal)
signal.Notify(shutdown, syscall.SIGINT, syscall.SIGTERM)
<-shutdown
pool.Stop()
}

84
config/config.go Normal file
View file

@ -0,0 +1,84 @@
// Copyright 2016 The Chihaya Authors. All rights reserved.
// Use of this source code is governed by the BSD 2-Clause license,
// which can be found in the LICENSE file.
// Package config implements the opening and parsing of a chihaya configuration.
package config
import (
"io"
"io/ioutil"
"os"
"time"
"gopkg.in/yaml.v2"
)
// DefaultConfig is a sane configuration used as a fallback or for testing.
var DefaultConfig = Config{
Tracker: TrackerConfig{
AnnounceInterval: 10 * time.Minute,
MinAnnounceInterval: 5 * time.Minute,
AnnounceMiddleware: []string{},
ScrapeMiddleware: []string{},
},
Servers: []ServerConfig{},
}
// Config represents the global configuration of a chihaya binary.
type Config struct {
Tracker TrackerConfig `yaml:"tracker"`
Servers []ServerConfig `yaml:"servers"`
}
// TrackerConfig represents the configuration of the BitTorrent tracker used by
// chihaya.
type TrackerConfig struct {
AnnounceInterval time.Duration `yaml:"announce"`
MinAnnounceInterval time.Duration `yaml:"minAnnounce"`
AnnounceMiddleware []string `yaml:"announceMiddleware"`
ScrapeMiddleware []string `yaml:"scrapeMiddleware"`
}
// ServerConfig represents the configuration of the servers started by chihaya.
type ServerConfig struct {
Name string `yaml:"name"`
Config interface{} `yaml:"config"`
}
// Open is a shortcut to open a file, read it, and allocates a new Config.
// It supports relative and absolute paths. Given "", it returns DefaultConfig.
func Open(path string) (*Config, error) {
if path == "" {
return &DefaultConfig, nil
}
f, err := os.Open(os.ExpandEnv(path))
if err != nil {
return nil, err
}
defer f.Close()
cfg, err := Decode(f)
if err != nil {
return nil, err
}
return cfg, nil
}
// Decode unmarshals an io.Reader into a newly allocated *Config.
func Decode(r io.Reader) (*Config, error) {
contents, err := ioutil.ReadAll(r)
if err != nil {
return nil, err
}
cfg := &Config{}
err = yaml.Unmarshal(contents, cfg)
if err != nil {
return nil, err
}
return cfg, nil
}

39
config/example.yaml Normal file
View file

@ -0,0 +1,39 @@
# Copyright 2016 The Chihaya Authors. All rights reserved.
# Use of this source code is governed by the BSD 2-Clause license,
# which can be found in the LICENSE file.
chihaya:
tracker:
announce: "10m"
minAnnounce: "5m"
announceMiddleware:
- "prometheus"
- "storeClientValidation"
- "storeCreateOnAnnounce"
scrapeMiddleware:
- "prometheus"
- "storeClientValidation"
servers:
- name: "store"
config:
addr: "localhost:6880"
requestTimeout: "10s"
readTimeout: "10s"
writeTimeout: "10s"
clientStore: "memory"
peerStore: "memory"
peerStoreConfig:
gcAfter: "30m"
shards: 1
- name: "http"
config:
addr: "localhost:6881"
requestTimeout: "10s"
readTimeout: "10s"
writeTimeout: "10s"
- name: "udp"
config:
addr: "localhost:6882"

23
pkg/bencode/bencode.go Normal file
View file

@ -0,0 +1,23 @@
// Copyright 2016 The Chihaya Authors. All rights reserved.
// Use of this source code is governed by the BSD 2-Clause license,
// which can be found in the LICENSE file.
// Package bencode implements bencoding of data as defined in BEP 3 using
// type assertion over reflection for performance.
package bencode
// Dict represents a bencode dictionary.
type Dict map[string]interface{}
// NewDict allocates the memory for a Dict.
func NewDict() Dict {
return make(Dict)
}
// List represents a bencode list.
type List []interface{}
// NewList allocates the memory for a List.
func NewList() List {
return make(List, 0)
}

135
pkg/bencode/decoder.go Normal file
View file

@ -0,0 +1,135 @@
// Copyright 2016 The Chihaya Authors. All rights reserved.
// Use of this source code is governed by the BSD 2-Clause license,
// which can be found in the LICENSE file.
package bencode
import (
"bufio"
"bytes"
"errors"
"io"
"strconv"
)
// A Decoder reads bencoded objects from an input stream.
type Decoder struct {
r *bufio.Reader
}
// NewDecoder returns a new decoder that reads from r.
func NewDecoder(r io.Reader) *Decoder {
return &Decoder{r: bufio.NewReader(r)}
}
// Decode unmarshals the next bencoded value in the stream.
func (dec *Decoder) Decode() (interface{}, error) {
return unmarshal(dec.r)
}
// Unmarshal deserializes and returns the bencoded value in buf.
func Unmarshal(buf []byte) (interface{}, error) {
r := bufio.NewReader(bytes.NewBuffer(buf))
return unmarshal(r)
}
// unmarshal reads bencoded values from a bufio.Reader
func unmarshal(r *bufio.Reader) (interface{}, error) {
tok, err := r.ReadByte()
if err != nil {
return nil, err
}
switch tok {
case 'i':
return readTerminatedInt(r, 'e')
case 'l':
list := NewList()
for {
ok, err := readTerminator(r, 'e')
if err != nil {
return nil, err
} else if ok {
break
}
v, err := unmarshal(r)
if err != nil {
return nil, err
}
list = append(list, v)
}
return list, nil
case 'd':
dict := NewDict()
for {
ok, err := readTerminator(r, 'e')
if err != nil {
return nil, err
} else if ok {
break
}
v, err := unmarshal(r)
if err != nil {
return nil, err
}
key, ok := v.(string)
if !ok {
return nil, errors.New("bencode: non-string map key")
}
dict[key], err = unmarshal(r)
if err != nil {
return nil, err
}
}
return dict, nil
default:
err = r.UnreadByte()
if err != nil {
return nil, err
}
length, err := readTerminatedInt(r, ':')
if err != nil {
return nil, errors.New("bencode: unknown input sequence")
}
buf := make([]byte, length)
n, err := r.Read(buf)
if err != nil {
return nil, err
} else if int64(n) != length {
return nil, errors.New("bencode: short read")
}
return string(buf), nil
}
}
func readTerminator(r *bufio.Reader, term byte) (bool, error) {
tok, err := r.ReadByte()
if err != nil {
return false, err
} else if tok == term {
return true, nil
}
return false, r.UnreadByte()
}
func readTerminatedInt(r *bufio.Reader, term byte) (int64, error) {
buf, err := r.ReadSlice(term)
if err != nil {
return 0, err
} else if len(buf) <= 1 {
return 0, errors.New("bencode: empty integer field")
}
return strconv.ParseInt(string(buf[:len(buf)-1]), 10, 64)
}

View file

@ -0,0 +1,86 @@
// Copyright 2016 The Chihaya Authors. All rights reserved.
// Use of this source code is governed by the BSD 2-Clause license,
// which can be found in the LICENSE file.
package bencode
import (
"testing"
"github.com/stretchr/testify/assert"
)
var unmarshalTests = []struct {
input string
expected interface{}
}{
{"i42e", int64(42)},
{"i-42e", int64(-42)},
{"7:example", "example"},
{"l3:one3:twoe", List{"one", "two"}},
{"le", List{}},
{"d3:one2:aa3:two2:bbe", Dict{"one": "aa", "two": "bb"}},
{"de", Dict{}},
}
func TestUnmarshal(t *testing.T) {
for _, tt := range unmarshalTests {
got, err := Unmarshal([]byte(tt.input))
assert.Nil(t, err, "unmarshal should not fail")
assert.Equal(t, got, tt.expected, "unmarshalled values should match the expected results")
}
}
type bufferLoop struct {
val string
}
func (r *bufferLoop) Read(b []byte) (int, error) {
n := copy(b, r.val)
return n, nil
}
func BenchmarkUnmarshalScalar(b *testing.B) {
d1 := NewDecoder(&bufferLoop{"7:example"})
d2 := NewDecoder(&bufferLoop{"i42e"})
for i := 0; i < b.N; i++ {
d1.Decode()
d2.Decode()
}
}
func TestUnmarshalLarge(t *testing.T) {
data := Dict{
"k1": List{"a", "b", "c"},
"k2": int64(42),
"k3": "val",
"k4": int64(-42),
}
buf, _ := Marshal(data)
dec := NewDecoder(&bufferLoop{string(buf)})
got, err := dec.Decode()
assert.Nil(t, err, "decode should not fail")
assert.Equal(t, got, data, "encoding and decoding should equal the original value")
}
func BenchmarkUnmarshalLarge(b *testing.B) {
data := map[string]interface{}{
"k1": []string{"a", "b", "c"},
"k2": 42,
"k3": "val",
"k4": uint(42),
}
buf, _ := Marshal(data)
dec := NewDecoder(&bufferLoop{string(buf)})
for i := 0; i < b.N; i++ {
dec.Decode()
}
}

157
pkg/bencode/encoder.go Normal file
View file

@ -0,0 +1,157 @@
// Copyright 2016 The Chihaya Authors. All rights reserved.
// Use of this source code is governed by the BSD 2-Clause license,
// which can be found in the LICENSE file.
package bencode
import (
"bytes"
"fmt"
"io"
"strconv"
"time"
)
// An Encoder writes bencoded objects to an output stream.
type Encoder struct {
w io.Writer
}
// NewEncoder returns a new encoder that writes to w.
func NewEncoder(w io.Writer) *Encoder {
return &Encoder{w: w}
}
// Encode writes the bencoding of v to the stream.
func (enc *Encoder) Encode(v interface{}) error {
return marshal(enc.w, v)
}
// Marshal returns the bencoding of v.
func Marshal(v interface{}) ([]byte, error) {
buf := &bytes.Buffer{}
err := marshal(buf, v)
return buf.Bytes(), err
}
// Marshaler is the interface implemented by objects that can marshal
// themselves.
type Marshaler interface {
MarshalBencode() ([]byte, error)
}
// marshal writes types bencoded to an io.Writer
func marshal(w io.Writer, data interface{}) error {
switch v := data.(type) {
case Marshaler:
bencoded, err := v.MarshalBencode()
if err != nil {
return err
}
_, err = w.Write(bencoded)
if err != nil {
return err
}
case string:
marshalString(w, v)
case int:
marshalInt(w, int64(v))
case uint:
marshalUint(w, uint64(v))
case int16:
marshalInt(w, int64(v))
case uint16:
marshalUint(w, uint64(v))
case int64:
marshalInt(w, v)
case uint64:
marshalUint(w, v)
case []byte:
marshalBytes(w, v)
case time.Duration: // Assume seconds
marshalInt(w, int64(v/time.Second))
case Dict:
marshal(w, map[string]interface{}(v))
case []Dict:
w.Write([]byte{'l'})
for _, val := range v {
err := marshal(w, val)
if err != nil {
return err
}
}
w.Write([]byte{'e'})
case map[string]interface{}:
w.Write([]byte{'d'})
for key, val := range v {
marshalString(w, key)
err := marshal(w, val)
if err != nil {
return err
}
}
w.Write([]byte{'e'})
case []string:
w.Write([]byte{'l'})
for _, val := range v {
err := marshal(w, val)
if err != nil {
return err
}
}
w.Write([]byte{'e'})
case List:
marshal(w, []interface{}(v))
case []interface{}:
w.Write([]byte{'l'})
for _, val := range v {
err := marshal(w, val)
if err != nil {
return err
}
}
w.Write([]byte{'e'})
default:
return fmt.Errorf("attempted to marshal unsupported type:\n%t", v)
}
return nil
}
func marshalInt(w io.Writer, v int64) {
w.Write([]byte{'i'})
w.Write([]byte(strconv.FormatInt(v, 10)))
w.Write([]byte{'e'})
}
func marshalUint(w io.Writer, v uint64) {
w.Write([]byte{'i'})
w.Write([]byte(strconv.FormatUint(v, 10)))
w.Write([]byte{'e'})
}
func marshalBytes(w io.Writer, v []byte) {
w.Write([]byte(strconv.Itoa(len(v))))
w.Write([]byte{':'})
w.Write(v)
}
func marshalString(w io.Writer, v string) {
marshalBytes(w, []byte(v))
}

View file

@ -0,0 +1,71 @@
// Copyright 2016 The Chihaya Authors. All rights reserved.
// Use of this source code is governed by the BSD 2-Clause license,
// which can be found in the LICENSE file.
package bencode
import (
"bytes"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
var marshalTests = []struct {
input interface{}
expected []string
}{
{int(42), []string{"i42e"}},
{int(-42), []string{"i-42e"}},
{uint(43), []string{"i43e"}},
{int64(44), []string{"i44e"}},
{uint64(45), []string{"i45e"}},
{int16(44), []string{"i44e"}},
{uint16(45), []string{"i45e"}},
{"example", []string{"7:example"}},
{[]byte("example"), []string{"7:example"}},
{30 * time.Minute, []string{"i1800e"}},
{[]string{"one", "two"}, []string{"l3:one3:twoe", "l3:two3:onee"}},
{[]interface{}{"one", "two"}, []string{"l3:one3:twoe", "l3:two3:onee"}},
{[]string{}, []string{"le"}},
{map[string]interface{}{"one": "aa", "two": "bb"}, []string{"d3:one2:aa3:two2:bbe", "d3:two2:bb3:one2:aae"}},
{map[string]interface{}{}, []string{"de"}},
}
func TestMarshal(t *testing.T) {
for _, test := range marshalTests {
got, err := Marshal(test.input)
assert.Nil(t, err, "marshal should not fail")
assert.Contains(t, test.expected, string(got), "the marshaled result should be one of the expected permutations")
}
}
func BenchmarkMarshalScalar(b *testing.B) {
buf := &bytes.Buffer{}
encoder := NewEncoder(buf)
for i := 0; i < b.N; i++ {
encoder.Encode("test")
encoder.Encode(123)
}
}
func BenchmarkMarshalLarge(b *testing.B) {
data := map[string]interface{}{
"k1": []string{"a", "b", "c"},
"k2": 42,
"k3": "val",
"k4": uint(42),
}
buf := &bytes.Buffer{}
encoder := NewEncoder(buf)
for i := 0; i < b.N; i++ {
encoder.Encode(data)
}
}

23
pkg/clientid/client_id.go Normal file
View file

@ -0,0 +1,23 @@
// Copyright 2016 The Chihaya Authors. All rights reserved.
// Use of this source code is governed by the BSD 2-Clause license,
// which can be found in the LICENSE file.
// Package clientid implements the parsing of BitTorrent ClientIDs from
// BitTorrent PeerIDs.
package clientid
// New returns the part of a PeerID that identifies a peer's client software.
func New(peerID string) (clientID string) {
length := len(peerID)
if length >= 6 {
if peerID[0] == '-' {
if length >= 7 {
clientID = peerID[1:7]
}
} else {
clientID = peerID[:6]
}
}
return
}

View file

@ -0,0 +1,62 @@
// Copyright 2016 The Chihaya Authors. All rights reserved.
// Use of this source code is governed by the BSD 2-Clause license,
// which can be found in the LICENSE file.
package clientid
import "testing"
func TestClientID(t *testing.T) {
var clientTable = []struct {
peerID string
clientID string
}{
{"-AZ3034-6wfG2wk6wWLc", "AZ3034"},
{"-AZ3042-6ozMq5q6Q3NX", "AZ3042"},
{"-BS5820-oy4La2MWGEFj", "BS5820"},
{"-AR6360-6oZyyMWoOOBe", "AR6360"},
{"-AG2083-s1hiF8vGAAg0", "AG2083"},
{"-AG3003-lEl2Mm4NEO4n", "AG3003"},
{"-MR1100-00HS~T7*65rm", "MR1100"},
{"-LK0140-ATIV~nbEQAMr", "LK0140"},
{"-KT2210-347143496631", "KT2210"},
{"-TR0960-6ep6svaa61r4", "TR0960"},
{"-XX1150-dv220cotgj4d", "XX1150"},
{"-AZ2504-192gwethivju", "AZ2504"},
{"-KT4310-3L4UvarKuqIu", "KT4310"},
{"-AZ2060-0xJQ02d4309O", "AZ2060"},
{"-BD0300-2nkdf08Jd890", "BD0300"},
{"-A~0010-a9mn9DFkj39J", "A~0010"},
{"-UT2300-MNu93JKnm930", "UT2300"},
{"-UT2300-KT4310KT4301", "UT2300"},
{"T03A0----f089kjsdf6e", "T03A0-"},
{"S58B-----nKl34GoNb75", "S58B--"},
{"M4-4-0--9aa757Efd5Bl", "M4-4-0"},
{"AZ2500BTeYUzyabAfo6U", "AZ2500"}, // BitTyrant
{"exbc0JdSklm834kj9Udf", "exbc0J"}, // Old BitComet
{"FUTB0L84j542mVc84jkd", "FUTB0L"}, // Alt BitComet
{"XBT054d-8602Jn83NnF9", "XBT054"}, // XBT
{"OP1011affbecbfabeefb", "OP1011"}, // Opera
{"-ML2.7.2-kgjjfkd9762", "ML2.7."}, // MLDonkey
{"-BOWA0C-SDLFJWEIORNM", "BOWA0C"}, // Bits on Wheels
{"Q1-0-0--dsn34DFn9083", "Q1-0-0"}, // Queen Bee
{"Q1-10-0-Yoiumn39BDfO", "Q1-10-"}, // Queen Bee Alt
{"346------SDFknl33408", "346---"}, // TorreTopia
{"QVOD0054ABFFEDCCDEDB", "QVOD00"}, // Qvod
{"", ""},
{"-", ""},
{"12345", ""},
{"-12345", ""},
{"123456", "123456"},
{"-123456", "123456"},
}
for _, tt := range clientTable {
if parsedID := New(tt.peerID); parsedID != tt.clientID {
t.Error("Incorrectly parsed peer ID", tt.peerID, "as", parsedID)
}
}
}

70
pkg/event/event.go Normal file
View file

@ -0,0 +1,70 @@
// Copyright 2016 The Chihaya Authors. All rights reserved.
// Use of this source code is governed by the BSD 2-Clause license,
// which can be found in the LICENSE file.
// Package event implements type-level constraints for dealing with the events
// communicated via BitTorrent announce.
package event
import (
"errors"
"strings"
)
// ErrUnknownEvent is returned when New fails to return an event.
var ErrUnknownEvent = errors.New("unknown event")
// Event represents an event done by a BitTorrent client.
type event uint8
const (
// None is the event when a BitTorrent client announces due to time lapsed
// since the previous announce.
None event = iota
// Started is the event sent by a BitTorrent client when it joins a swarm.
Started
// Stopped is the event sent by a BitTorrent client when it leaves a swarm.
Stopped
// Completed is the event sent by a BitTorrent client when it finishes
// downloading all of the required chunks.
Completed
)
var (
eventToString = make(map[event]string)
stringToEvent = make(map[string]event)
)
func init() {
eventToString[None] = "none"
eventToString[Started] = "started"
eventToString[Stopped] = "stopped"
eventToString[Completed] = "completed"
stringToEvent[""] = None
stringToEvent["none"] = None
stringToEvent["started"] = Started
stringToEvent["stopped"] = Stopped
stringToEvent["completed"] = Completed
}
// New returns the proper Event given a string.
func New(eventStr string) (event, error) {
if e, ok := stringToEvent[strings.ToLower(eventStr)]; ok {
return e, nil
}
return None, ErrUnknownEvent
}
// String implements Stringer for an event.
func (e event) String() string {
if name, ok := eventToString[e]; ok {
return name
}
panic("event: event has no associated name")
}

33
pkg/event/event_test.go Normal file
View file

@ -0,0 +1,33 @@
// Copyright 2016 The Chihaya Authors. All rights reserved.
// Use of this source code is governed by the BSD 2-Clause license,
// which can be found in the LICENSE file.
package event
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestNew(t *testing.T) {
var table = []struct {
data string
expected event
expectedErr error
}{
{"", None, nil},
{"NONE", None, nil},
{"none", None, nil},
{"started", Started, nil},
{"stopped", Stopped, nil},
{"completed", Completed, nil},
{"notAnEvent", None, ErrUnknownEvent},
}
for _, tt := range table {
got, err := New(tt.data)
assert.Equal(t, err, tt.expectedErr, "errors should equal the expected value")
assert.Equal(t, got, tt.expected, "events should equal the expected value")
}
}

33
server/http/config.go Normal file
View file

@ -0,0 +1,33 @@
// Copyright 2016 The Chihaya Authors. All rights reserved.
// Use of this source code is governed by the BSD 2-Clause license,
// which can be found in the LICENSE file.
package http
import (
"time"
"gopkg.in/yaml.v2"
)
type httpConfig struct {
Addr string `yaml:"addr"`
RequestTimeout time.Duration `yaml:"requestTimeout"`
ReadTimeout time.Duration `yaml:"readTimeout"`
WriteTimeout time.Duration `yaml:"writeTimeout"`
}
func newHTTPConfig(srvcfg interface{}) (*httpConfig, error) {
bytes, err := yaml.Marshal(srvcfg)
if err != nil {
return nil, err
}
var cfg httpConfig
err = yaml.Unmarshal(bytes, &cfg)
if err != nil {
return nil, err
}
return &cfg, nil
}

239
server/http/query/query.go Normal file
View file

@ -0,0 +1,239 @@
// Copyright 2016 The Chihaya Authors. All rights reserved.
// Use of this source code is governed by the BSD 2-Clause license,
// which can be found in the LICENSE file.
// Package query implements a simple, fast URL parser designed to be used to
// parse parameters sent from BitTorrent clients. The last value of a key wins,
// except for they key "info_hash".
package query
import (
"errors"
"net/url"
"strconv"
"strings"
"github.com/chihaya/chihaya"
"github.com/chihaya/chihaya/pkg/event"
)
// ErrKeyNotFound is returned when a provided key has no value associated with
// it.
var ErrKeyNotFound = errors.New("query: value for the provided key does not exist")
// Query represents a parsed URL.Query.
type Query struct {
query string
infohashes []string
params map[string]string
}
// New parses a raw URL query.
func New(query string) (*Query, error) {
var (
keyStart, keyEnd int
valStart, valEnd int
firstInfohash string
onKey = true
hasInfohash = false
q = &Query{
query: query,
infohashes: nil,
params: make(map[string]string),
}
)
for i, length := 0, len(query); i < length; i++ {
separator := query[i] == '&' || query[i] == ';' || query[i] == '?'
last := i == length-1
if separator || last {
if onKey && !last {
keyStart = i + 1
continue
}
if last && !separator && !onKey {
valEnd = i
}
keyStr, err := url.QueryUnescape(query[keyStart : keyEnd+1])
if err != nil {
return nil, err
}
var valStr string
if valEnd > 0 {
valStr, err = url.QueryUnescape(query[valStart : valEnd+1])
if err != nil {
return nil, err
}
}
q.params[strings.ToLower(keyStr)] = valStr
if keyStr == "info_hash" {
if hasInfohash {
// Multiple infohashes
if q.infohashes == nil {
q.infohashes = []string{firstInfohash}
}
q.infohashes = append(q.infohashes, valStr)
} else {
firstInfohash = valStr
hasInfohash = true
}
}
valEnd = 0
onKey = true
keyStart = i + 1
} else if query[i] == '=' {
onKey = false
valStart = i + 1
valEnd = 0
} else if onKey {
keyEnd = i
} else {
valEnd = i
}
}
return q, nil
}
// Infohashes returns a list of requested infohashes.
func (q *Query) Infohashes() ([]string, error) {
if q.infohashes == nil {
infohash, err := q.String("info_hash")
if err != nil {
return nil, err
}
return []string{infohash}, nil
}
return q.infohashes, nil
}
// String returns a string parsed from a query. Every key can be returned as a
// string because they are encoded in the URL as strings.
func (q *Query) String(key string) (string, error) {
val, exists := q.params[key]
if !exists {
return "", ErrKeyNotFound
}
return val, nil
}
// Uint64 returns a uint parsed from a query. After being called, it is safe to
// cast the uint64 to your desired length.
func (q *Query) Uint64(key string) (uint64, error) {
str, exists := q.params[key]
if !exists {
return 0, ErrKeyNotFound
}
val, err := strconv.ParseUint(str, 10, 64)
if err != nil {
return 0, err
}
return val, nil
}
// AnnounceRequest generates an chihaya.AnnounceRequest with the parameters
// provided by a query.
func (q *Query) AnnounceRequest() (chihaya.AnnounceRequest, error) {
request := make(chihaya.AnnounceRequest)
request["query"] = q.query
eventStr, err := q.String("event")
if err != nil {
return nil, errors.New("failed to parse parameter: event")
}
request["event"], err = event.New(eventStr)
if err != nil {
return nil, errors.New("failed to provide valid client event")
}
compactStr, err := q.String("compact")
if err != nil {
return nil, errors.New("failed to parse parameter: compact")
}
request["compact"] = compactStr != "0"
request["info_hash"], err = q.String("info_hash")
if err != nil {
return nil, errors.New("failed to parse parameter: info_hash")
}
request["peer_id"], err = q.String("peer_id")
if err != nil {
return nil, errors.New("failed to parse parameter: peer_id")
}
request["left"], err = q.Uint64("left")
if err != nil {
return nil, errors.New("failed to parse parameter: left")
}
request["downloaded"], err = q.Uint64("downloaded")
if err != nil {
return nil, errors.New("failed to parse parameter: downloaded")
}
request["uploaded"], err = q.Uint64("uploaded")
if err != nil {
return nil, errors.New("failed to parse parameter: uploaded")
}
request["numwant"], err = q.String("numwant")
if err != nil {
return nil, errors.New("failed to parse parameter: numwant")
}
request["ip"], err = q.Uint64("port")
if err != nil {
return nil, errors.New("failed to parse parameter: port")
}
request["port"], err = q.Uint64("port")
if err != nil {
return nil, errors.New("failed to parse parameter: port")
}
request["ip"], err = q.String("ip")
if err != nil {
return nil, errors.New("failed to parse parameter: ip")
}
request["ipv4"], err = q.String("ipv4")
if err != nil {
return nil, errors.New("failed to parse parameter: ipv4")
}
request["ipv6"], err = q.String("ipv6")
if err != nil {
return nil, errors.New("failed to parse parameter: ipv6")
}
return request, nil
}
// ScrapeRequest generates an chihaya.ScrapeRequeset with the parameters
// provided by a query.
func (q *Query) ScrapeRequest() (chihaya.ScrapeRequest, error) {
request := make(chihaya.ScrapeRequest)
var err error
request["info_hash"], err = q.Infohashes()
if err != nil {
return nil, errors.New("failed to parse parameter: info_hash")
}
return request, nil
}

147
server/http/server.go Normal file
View file

@ -0,0 +1,147 @@
// Copyright 2016 The Chihaya Authors. All rights reserved.
// Use of this source code is governed by the BSD 2-Clause license,
// which can be found in the LICENSE file.
package http
import (
"errors"
"log"
"net"
"net/http"
"github.com/julienschmidt/httprouter"
"github.com/tylerb/graceful"
"github.com/chihaya/chihaya/config"
"github.com/chihaya/chihaya/server"
"github.com/chihaya/chihaya/server/http/query"
"github.com/chihaya/chihaya/tracker"
)
func init() {
server.Register("http", constructor)
}
func constructor(srvcfg *config.ServerConfig, tkr *tracker.Tracker) (server.Server, error) {
cfg, err := newHTTPConfig(srvcfg)
if err != nil {
return nil, errors.New("http: invalid config: " + err.Error())
}
return &httpServer{
cfg: cfg,
tkr: tkr,
}, nil
}
type httpServer struct {
cfg *httpConfig
tkr *tracker.Tracker
grace *graceful.Server
stopping bool
}
func (s *httpServer) Start() {
s.grace = &graceful.Server{
Server: &http.Server{
Addr: s.cfg.Addr,
Handler: s.routes(),
ReadTimeout: s.cfg.ReadTimeout,
WriteTimeout: s.cfg.WriteTimeout,
},
Timeout: s.cfg.RequestTimeout,
NoSignalHandling: true,
ShutdownInitiated: func() { s.stopping = true },
ConnState: func(conn net.Conn, state http.ConnState) {
switch state {
case http.StateNew:
//stats.RecordEvent(stats.AcceptedConnection)
case http.StateClosed:
//stats.RecordEvent(stats.ClosedConnection)
case http.StateHijacked:
panic("http: connection impossibly hijacked")
// Ignore the following cases.
case http.StateActive, http.StateIdle:
default:
panic("http: connection transitioned to unknown state")
}
},
}
s.grace.SetKeepAlivesEnabled(false)
if err := s.grace.ListenAndServe(); err != nil {
if opErr, ok := err.(*net.OpError); !ok || (ok && opErr.Op != "accept") {
log.Printf("Failed to gracefully run HTTP server: %s", err.Error())
return
}
}
}
func (s *httpServer) Stop() {
if !s.stopping {
s.grace.Stop(s.grace.Timeout)
}
s.grace = nil
s.stopping = false
}
func (s *httpServer) routes() *httprouter.Router {
r := httprouter.New()
r.GET("/announce", s.serveAnnounce)
r.GET("/scrape", s.serveScrape)
return r
}
func (s *httpServer) serveAnnounce(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
writer := &writer{w}
q, err := query.New(r.URL.RawQuery)
if err != nil {
writer.writeError(err)
return
}
req, err := q.AnnounceRequest()
if err != nil {
writer.writeError(err)
return
}
resp, err := s.tkr.HandleAnnounce(req)
if err != nil {
writer.writeError(err)
return
}
writer.writeAnnounceResponse(resp)
}
func (s *httpServer) serveScrape(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
writer := &writer{w}
q, err := query.New(r.URL.RawQuery)
if err != nil {
writer.writeError(err)
return
}
req, err := q.ScrapeRequest()
if err != nil {
writer.writeError(err)
return
}
resp, err := s.tkr.HandleScrape(req)
if err != nil {
writer.writeError(err)
return
}
writer.writeScrapeResponse(resp)
}

94
server/http/writer.go Normal file
View file

@ -0,0 +1,94 @@
// Copyright 2016 The Chihaya Authors. All rights reserved.
// Use of this source code is governed by the BSD 2-Clause license,
// which can be found in the LICENSE file.
package http
import (
"net/http"
"github.com/chihaya/chihaya"
"github.com/chihaya/chihaya/pkg/bencode"
)
type writer struct{ http.ResponseWriter }
func (w *writer) writeError(err error) error {
return bencode.NewEncoder(w).Encode(bencode.Dict{
"failure reason": err.Error(),
})
}
func (w *writer) writeAnnounceResponse(resp *chihaya.AnnounceResponse) error {
bdict := bencode.Dict{
"complete": resp.Complete,
"incomplete": resp.Incomplete,
"interval": resp.Interval,
"min interval": resp.MinInterval,
}
// Add the peers to the dictionary in the compact format.
if resp.Compact {
var IPv4CompactDict, IPv6CompactDict []byte
// Add the IPv4 peers to the dictionary.
for _, peer := range resp.IPv4Peers {
IPv4CompactDict = append(IPv4CompactDict, compact(peer)...)
}
if len(IPv4CompactDict) > 0 {
bdict["peers"] = IPv4CompactDict
}
// Add the IPv6 peers to the dictionary.
for _, peer := range resp.IPv6Peers {
IPv6CompactDict = append(IPv6CompactDict, compact(peer)...)
}
if len(IPv6CompactDict) > 0 {
bdict["peers6"] = IPv6CompactDict
}
return bencode.NewEncoder(w).Encode(bdict)
}
// Add the peers to the dictionary.
var peers []bencode.Dict
for _, peer := range resp.IPv4Peers {
peers = append(peers, dict(peer))
}
for _, peer := range resp.IPv6Peers {
peers = append(peers, dict(peer))
}
bdict["peers"] = peers
return bencode.NewEncoder(w).Encode(bdict)
}
func (w *writer) writeScrapeResponse(resp *chihaya.ScrapeResponse) error {
filesDict := bencode.NewDict()
for infohash, scrape := range resp.Files {
filesDict[infohash] = bencode.Dict{
"complete": scrape.Complete,
"incomplete": scrape.Incomplete,
"downloaded": scrape.Downloaded,
}
}
return bencode.NewEncoder(w).Encode(bencode.Dict{
"files": filesDict,
})
}
func compact(peer chihaya.Peer) (buf []byte) {
buf = []byte(peer.IP)
buf = append(buf, byte(peer.Port>>8))
buf = append(buf, byte(peer.Port&0xff))
return
}
func dict(peer chihaya.Peer) bencode.Dict {
return bencode.Dict{
"peer id": peer.ID,
"ip": peer.IP.String(),
"port": peer.Port,
}
}

View file

@ -0,0 +1,30 @@
// Copyright 2016 The Chihaya Authors. All rights reserved.
// Use of this source code is governed by the BSD 2-Clause license,
// which can be found in the LICENSE file.
package http
import (
"errors"
"net/http/httptest"
"testing"
"github.com/stretchr/testify/assert"
)
func TestWriteError(t *testing.T) {
var table = []struct {
reason, expected string
}{
{"hello world", "d14:failure reason11:hello worlde"},
{"what's up", "d14:failure reason9:what's upe"},
}
for _, tt := range table {
r := httptest.NewRecorder()
w := &writer{r}
err := w.writeError(errors.New(tt.reason))
assert.Nil(t, err, "writeError should not fail with test input")
assert.Equal(t, r.Body.String(), tt.expected, "writer should write the expected value")
}
}

53
server/pool.go Normal file
View file

@ -0,0 +1,53 @@
// Copyright 2016 The Chihaya Authors. All rights reserved.
// Use of this source code is governed by the BSD 2-Clause license,
// which can be found in the LICENSE file.
package server
import (
"sync"
"github.com/chihaya/chihaya/config"
"github.com/chihaya/chihaya/tracker"
)
// Pool represents a running pool of servers.
type Pool struct {
servers []Server
wg sync.WaitGroup
}
// StartPool creates a new pool of servers specified by the provided config and
// runs them.
func StartPool(cfgs []config.ServerConfig, tkr *tracker.Tracker) (*Pool, error) {
var servers []Server
var wg sync.WaitGroup
for _, cfg := range cfgs {
srv, err := New(&cfg, tkr)
if err != nil {
return nil, err
}
wg.Add(1)
go func(srv Server) {
defer wg.Done()
srv.Start()
}(srv)
servers = append(servers, srv)
}
return &Pool{
servers: servers,
wg: wg,
}, nil
}
// Stop safely shuts down a pool of servers.
func (p *Pool) Stop() {
for _, srv := range p.servers {
srv.Stop()
}
p.wg.Wait()
}

54
server/server.go Normal file
View file

@ -0,0 +1,54 @@
// Copyright 2016 The Chihaya Authors. All rights reserved.
// Use of this source code is governed by the BSD 2-Clause license,
// which can be found in the LICENSE file.
// Package server implements an abstraction over servers meant to be run .
// alongside a tracker.
//
// Servers may be implementations of different transport protocols or have their
// own custom behavior.
package server
import (
"fmt"
"github.com/chihaya/chihaya/config"
"github.com/chihaya/chihaya/tracker"
)
var constructors = make(map[string]Constructor)
// Constructor is a function that creates a new Server.
type Constructor func(*config.ServerConfig, *tracker.Tracker) (Server, error)
// Register makes a Constructor available by the provided name.
//
// If this function is called twice with the same name or if the Constructor is
// nil, it panics.
func Register(name string, con Constructor) {
if con == nil {
panic("server: could not register nil Constructor")
}
if _, dup := constructors[name]; dup {
panic("server: could not register duplicate Constructor: " + name)
}
constructors[name] = con
}
// New creates a Server specified by a configuration.
func New(cfg *config.ServerConfig, tkr *tracker.Tracker) (Server, error) {
con, ok := constructors[cfg.Name]
if !ok {
return nil, fmt.Errorf(
"server: unknown Constructor %q (forgotten import?)",
cfg.Name,
)
}
return con(cfg, tkr)
}
// Server represents one instance of a server accessing the tracker.
type Server interface {
Start()
Stop()
}

View file

@ -0,0 +1,49 @@
// Copyright 2016 The Chihaya Authors. All rights reserved.
// Use of this source code is governed by the BSD 2-Clause license,
// which can be found in the LICENSE file.
package store
import "fmt"
var clientStoreDrivers = make(map[string]ClientStoreDriver)
// ClientStore represents an interface for manipulating clientIDs.
type ClientStore interface {
CreateClient(clientID string) error
FindClient(peerID string) (bool, error)
DeleteClient(clientID string) error
}
// ClientStoreDriver represents an interface for creating a handle to the
// storage of swarms.
type ClientStoreDriver interface {
New(*Config) (ClientStore, error)
}
// RegisterClientStoreDriver makes a driver available by the provided name.
//
// If this function is called twice with the same name or if the driver is nil,
// it panics.
func RegisterClientStoreDriver(name string, driver ClientStoreDriver) {
if driver == nil {
panic("store: could not register nil ClientStoreDriver")
}
if _, dup := clientStoreDrivers[name]; dup {
panic("store: could not register duplicate ClientStoreDriver: " + name)
}
clientStoreDrivers[name] = driver
}
// OpenClientStore returns a ClientStore specified by a configuration.
func OpenClientStore(name string, cfg *Config) (ClientStore, error) {
driver, ok := clientStoreDrivers[name]
if !ok {
return nil, fmt.Errorf(
"store: unknown driver %q (forgotten import?)",
name,
)
}
return driver.New(cfg)
}

View file

@ -0,0 +1,59 @@
// Copyright 2016 The Chihaya Authors. All rights reserved.
// Use of this source code is governed by the BSD 2-Clause license,
// which can be found in the LICENSE file.
package memory
import (
"sync"
"github.com/chihaya/chihaya/pkg/clientid"
"github.com/chihaya/chihaya/server/store"
)
func init() {
store.RegisterClientStoreDriver("memory", &clientStoreDriver{})
}
type clientStoreDriver struct{}
func (d *clientStoreDriver) New(cfg *store.Config) (store.ClientStore, error) {
return &clientStore{
clientIDs: make(map[string]struct{}),
}, nil
}
type clientStore struct {
clientIDs map[string]struct{}
sync.RWMutex
}
var _ store.ClientStore = &clientStore{}
func (s *clientStore) CreateClient(clientID string) error {
s.Lock()
defer s.Unlock()
s.clientIDs[clientID] = struct{}{}
return nil
}
func (s *clientStore) FindClient(peerID string) (bool, error) {
clientID := clientid.New(peerID)
s.Lock()
defer s.Unlock()
_, ok := s.clientIDs[clientID]
return ok, nil
}
func (s *clientStore) DeleteClient(clientID string) error {
s.Lock()
defer s.Unlock()
delete(s.clientIDs, clientID)
return nil
}

View file

@ -0,0 +1,272 @@
// Copyright 2016 The Chihaya Authors. All rights reserved.
// Use of this source code is governed by the BSD 2-Clause license,
// which can be found in the LICENSE file.
package memory
import (
"hash/fnv"
"runtime"
"sync"
"time"
"gopkg.in/yaml.v2"
"github.com/chihaya/chihaya"
"github.com/chihaya/chihaya/server/store"
)
func init() {
store.RegisterPeerStoreDriver("memory", &peerStoreDriver{})
}
type peerStoreDriver struct{}
func (d *peerStoreDriver) New(storecfg *store.Config) (store.PeerStore, error) {
cfg, err := newPeerStoreConfig(storecfg)
if err != nil {
return nil, err
}
return &peerStore{
shards: make([]*peerShard, cfg.Shards),
}, nil
}
type peerStoreConfig struct {
Shards int `yaml:"shards"`
}
func newPeerStoreConfig(storecfg *store.Config) (*peerStoreConfig, error) {
bytes, err := yaml.Marshal(storecfg.PeerStoreConfig)
if err != nil {
return nil, err
}
var cfg peerStoreConfig
err = yaml.Unmarshal(bytes, &cfg)
if err != nil {
return nil, err
}
return &cfg, nil
}
const seedersSuffix = "-seeders"
const leechersSuffix = "-leechers"
type peer struct {
chihaya.Peer
LastAction time.Time
}
type peerShard struct {
peers map[string]map[string]peer
sync.RWMutex
}
type peerStore struct {
shards []*peerShard
}
var _ store.PeerStore = &peerStore{}
func (s *peerStore) shardIndex(infohash string) uint32 {
idx := fnv.New32()
idx.Write([]byte(infohash))
return idx.Sum32() % uint32(len(s.shards))
}
func (s *peerStore) PutSeeder(infohash string, p chihaya.Peer) error {
key := infohash + seedersSuffix
shard := s.shards[s.shardIndex(infohash)]
shard.Lock()
defer shard.Unlock()
if shard.peers[key] == nil {
shard.peers[key] = make(map[string]peer)
}
shard.peers[key][p.ID] = peer{
Peer: p,
LastAction: time.Now(),
}
return nil
}
func (s *peerStore) DeleteSeeder(infohash, peerID string) error {
key := infohash + seedersSuffix
shard := s.shards[s.shardIndex(infohash)]
shard.Lock()
defer shard.Unlock()
if shard.peers[key] == nil {
return nil
}
delete(shard.peers[key], peerID)
if len(shard.peers[key]) == 0 {
shard.peers[key] = nil
}
return nil
}
func (s *peerStore) PutLeecher(infohash string, p chihaya.Peer) error {
key := infohash + leechersSuffix
shard := s.shards[s.shardIndex(infohash)]
shard.Lock()
defer shard.Unlock()
if shard.peers[key] == nil {
shard.peers[key] = make(map[string]peer)
}
shard.peers[key][p.ID] = peer{
Peer: p,
LastAction: time.Now(),
}
return nil
}
func (s *peerStore) DeleteLeecher(infohash, peerID string) error {
key := infohash + leechersSuffix
shard := s.shards[s.shardIndex(infohash)]
shard.Lock()
defer shard.Unlock()
if shard.peers[key] == nil {
return nil
}
delete(shard.peers[key], peerID)
if len(shard.peers[key]) == 0 {
shard.peers[key] = nil
}
return nil
}
func (s *peerStore) GraduateLeecher(infohash string, p chihaya.Peer) error {
leecherKey := infohash + leechersSuffix
seederKey := infohash + seedersSuffix
shard := s.shards[s.shardIndex(infohash)]
shard.Lock()
defer shard.Unlock()
if shard.peers[leecherKey] != nil {
delete(shard.peers[leecherKey], p.ID)
}
if shard.peers[seederKey] == nil {
shard.peers[seederKey] = make(map[string]peer)
}
shard.peers[seederKey][p.ID] = peer{
Peer: p,
LastAction: time.Now(),
}
return nil
}
func (s *peerStore) CollectGarbage(cutoff time.Time) error {
for _, shard := range s.shards {
shard.RLock()
var keys []string
for key := range shard.peers {
keys = append(keys, key)
}
shard.RUnlock()
runtime.Gosched()
for _, key := range keys {
shard.Lock()
var peersToDelete []string
for peerID, p := range shard.peers[key] {
if p.LastAction.Before(cutoff) {
peersToDelete = append(peersToDelete, peerID)
}
}
for _, peerID := range peersToDelete {
delete(shard.peers[key], peerID)
}
shard.Unlock()
runtime.Gosched()
}
runtime.Gosched()
}
return nil
}
func (s *peerStore) AnnouncePeers(infohash string, seeder bool, numWant int) (peers, peers6 []chihaya.Peer, err error) {
leecherKey := infohash + leechersSuffix
seederKey := infohash + seedersSuffix
shard := s.shards[s.shardIndex(infohash)]
shard.RLock()
defer shard.RUnlock()
if seeder {
// Append leechers as possible.
leechers := shard.peers[leecherKey]
for _, p := range leechers {
if numWant == 0 {
break
}
if p.IP.To4() == nil {
peers6 = append(peers6, p.Peer)
} else {
peers = append(peers, p.Peer)
}
numWant--
}
} else {
// Append as many seeders as possible.
seeders := shard.peers[seederKey]
for _, p := range seeders {
if numWant == 0 {
break
}
if p.IP.To4() == nil {
peers6 = append(peers6, p.Peer)
} else {
peers = append(peers, p.Peer)
}
numWant--
}
// Append leechers until we reach numWant.
leechers := shard.peers[leecherKey]
if numWant > 0 {
for _, p := range leechers {
if numWant == 0 {
break
}
if p.IP.To4() == nil {
peers6 = append(peers6, p.Peer)
} else {
peers = append(peers, p.Peer)
}
numWant--
}
}
}
return
}

View file

@ -0,0 +1,62 @@
// Copyright 2016 The Chihaya Authors. All rights reserved.
// Use of this source code is governed by the BSD 2-Clause license,
// which can be found in the LICENSE file.
package store
import (
"fmt"
"time"
"github.com/chihaya/chihaya"
)
var peerStoreDrivers = make(map[string]PeerStoreDriver)
// PeerStore represents an interface for manipulating peers.
type PeerStore interface {
PutSeeder(infohash string, p chihaya.Peer) error
DeleteSeeder(infohash, peerID string) error
PutLeecher(infohash string, p chihaya.Peer) error
DeleteLeecher(infohash, peerID string) error
GraduateLeecher(infohash string, p chihaya.Peer) error
AnnouncePeers(infohash string, seeder bool, numWant int) (peers, peers6 []chihaya.Peer, err error)
CollectGarbage(cutoff time.Time) error
}
// PeerStoreDriver represents an interface for creating a handle to the storage
// of peers.
type PeerStoreDriver interface {
New(*Config) (PeerStore, error)
}
// RegisterPeerStoreDriver makes a driver available by the provided name.
//
// If this function is called twice with the same name or if the driver is nil,
// it panics.
func RegisterPeerStoreDriver(name string, driver PeerStoreDriver) {
if driver == nil {
panic("storage: could not register nil PeerStoreDriver")
}
if _, dup := peerStoreDrivers[name]; dup {
panic("storage: could not register duplicate PeerStoreDriver: " + name)
}
peerStoreDrivers[name] = driver
}
// OpenPeerStore returns a PeerStore specified by a configuration.
func OpenPeerStore(name string, cfg *Config) (PeerStore, error) {
driver, ok := peerStoreDrivers[name]
if !ok {
return nil, fmt.Errorf(
"storage: unknown driver %q (forgotten import?)",
name,
)
}
return driver.New(cfg)
}

95
server/store/store.go Normal file
View file

@ -0,0 +1,95 @@
// Copyright 2016 The Chihaya Authors. All rights reserved.
// Use of this source code is governed by the BSD 2-Clause license,
// which can be found in the LICENSE file.
package store
import (
"errors"
"log"
"sync"
"time"
"gopkg.in/yaml.v2"
"github.com/chihaya/chihaya/config"
"github.com/chihaya/chihaya/server"
"github.com/chihaya/chihaya/tracker"
)
var theStore *Store
func init() {
server.Register("store", constructor)
}
func constructor(srvcfg *config.ServerConfig, tkr *tracker.Tracker) (server.Server, error) {
if theStore == nil {
cfg, err := newConfig(srvcfg)
if err != nil {
return nil, errors.New("store: invalid store config: " + err.Error())
}
theStore = &Store{
cfg: cfg,
tkr: tkr,
}
}
return theStore, nil
}
type Config struct {
Addr string `yaml:"addr"`
RequestTimeout time.Duration `yaml:"requestTimeout"`
ReadTimeout time.Duration `yaml:"readTimeout"`
WriteTimeout time.Duration `yaml:"writeTimeout"`
GCAfter time.Duration `yaml:"gcAfter"`
ClientStore string `yaml:"clientStore"`
ClientStoreConfig interface{} `yaml:"clienStoreConfig"`
PeerStore string `yaml:"peerStore"`
PeerStoreConfig interface{} `yaml:"peerStoreConfig"`
}
func newConfig(srvcfg interface{}) (*Config, error) {
bytes, err := yaml.Marshal(srvcfg)
if err != nil {
return nil, err
}
var cfg Config
err = yaml.Unmarshal(bytes, &cfg)
if err != nil {
return nil, err
}
return &cfg, nil
}
// MustGetStore is used by middleware to access the store.
//
// This function calls log.Fatal if a server hasn't been already created by
// the server package.
func MustGetStore() *Store {
if theStore == nil {
log.Fatal("store middleware used without store server")
}
return theStore
}
type Store struct {
cfg *Config
tkr *tracker.Tracker
shutdown chan struct{}
wg sync.WaitGroup
PeerStore
ClientStore
}
func (s *Store) Start() {
}
func (s *Store) Stop() {
close(s.shutdown)
s.wg.Wait()
}

110
tracker/middleware.go Normal file
View file

@ -0,0 +1,110 @@
// Copyright 2016 The Chihaya Authors. All rights reserved.
// Use of this source code is governed by the BSD 2-Clause license,
// which can be found in the LICENSE file.package middleware
package tracker
import (
"github.com/chihaya/chihaya"
"github.com/chihaya/chihaya/config"
)
// AnnounceHandler is a function that operates on an AnnounceResponse before it
// has been delivered to a client.
type AnnounceHandler func(*config.TrackerConfig, chihaya.AnnounceRequest, *chihaya.AnnounceResponse) error
func (h AnnounceHandler) handleAnnounce(cfg *config.TrackerConfig, req chihaya.AnnounceRequest, resp *chihaya.AnnounceResponse) error {
return h(cfg, req, resp)
}
// AnnounceMiddleware is higher-order AnnounceHandler used to implement modular
// behavior processing an announce.
type AnnounceMiddleware func(AnnounceHandler) AnnounceHandler
type announceChain struct{ mw []AnnounceMiddleware }
func (c *announceChain) Append(mw ...AnnounceMiddleware) {
newMW := make([]AnnounceMiddleware, len(c.mw)+len(mw))
copy(newMW[:len(c.mw)], c.mw)
copy(newMW[len(c.mw):], mw)
c.mw = newMW
}
func (c *announceChain) Handler() AnnounceHandler {
final := func(cfg *config.TrackerConfig, req chihaya.AnnounceRequest, resp *chihaya.AnnounceResponse) error {
return nil
}
for i := len(c.mw) - 1; i >= 0; i-- {
final = c.mw[i](final)
}
return final
}
var announceMiddleware = make(map[string]AnnounceMiddleware)
// RegisterAnnounceMiddleware makes a middleware available to the tracker under
// the provided named.
//
// If this function is called twice with the same name or if the handler is nil,
// it panics.
func RegisterAnnounceMiddleware(name string, mw AnnounceMiddleware) {
if mw == nil {
panic("tracker: could not register nil AnnounceMiddleware")
}
if _, dup := announceMiddleware[name]; dup {
panic("tracker: could not register duplicate AnnounceMiddleware: " + name)
}
announceMiddleware[name] = mw
}
// ScrapeHandler is a middleware function that operates on a ScrapeResponse
// before it has been delivered to a client.
type ScrapeHandler func(*config.TrackerConfig, chihaya.ScrapeRequest, *chihaya.ScrapeResponse) error
func (h ScrapeHandler) handleScrape(cfg *config.TrackerConfig, req chihaya.ScrapeRequest, resp *chihaya.ScrapeResponse) error {
return h(cfg, req, resp)
}
// ScrapeMiddleware is higher-order ScrapeHandler used to implement modular
// behavior processing a scrape.
type ScrapeMiddleware func(ScrapeHandler) ScrapeHandler
type scrapeChain struct{ mw []ScrapeMiddleware }
func (c *scrapeChain) Append(mw ...ScrapeMiddleware) {
newMW := make([]ScrapeMiddleware, len(c.mw)+len(mw))
copy(newMW[:len(c.mw)], c.mw)
copy(newMW[len(c.mw):], mw)
c.mw = newMW
}
func (c *scrapeChain) Handler() ScrapeHandler {
final := func(cfg *config.TrackerConfig, req chihaya.ScrapeRequest, resp *chihaya.ScrapeResponse) error {
return nil
}
for i := len(c.mw) - 1; i >= 0; i-- {
final = c.mw[i](final)
}
return final
}
var scrapeMiddleware = make(map[string]ScrapeMiddleware)
// RegisterScrapeMiddleware makes a middleware available to the tracker under
// the provided named.
//
// If this function is called twice with the same name or if the handler is nil,
// it panics.
func RegisterScrapeMiddleware(name string, mw ScrapeMiddleware) {
if mw == nil {
panic("tracker: could not register nil ScrapeMiddleware")
}
if _, dup := scrapeMiddleware[name]; dup {
panic("tracker: could not register duplicate ScrapeMiddleware: " + name)
}
scrapeMiddleware[name] = mw
}

View file

@ -0,0 +1,53 @@
// Copyright 2016 The Chihaya Authors. All rights reserved.
// Use of this source code is governed by the BSD 2-Clause license,
// which can be found in the LICENSE file.package middleware
package tracker
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/chihaya/chihaya"
"github.com/chihaya/chihaya/config"
)
func testAnnounceMW1(next AnnounceHandler) AnnounceHandler {
return func(cfg *config.TrackerConfig, req chihaya.AnnounceRequest, resp *chihaya.AnnounceResponse) error {
resp.IPv4Peers = append(resp.IPv4Peers, chihaya.Peer{
Port: 1,
})
return next(cfg, req, resp)
}
}
func testAnnounceMW2(next AnnounceHandler) AnnounceHandler {
return func(cfg *config.TrackerConfig, req chihaya.AnnounceRequest, resp *chihaya.AnnounceResponse) error {
resp.IPv4Peers = append(resp.IPv4Peers, chihaya.Peer{
Port: 2,
})
return next(cfg, req, resp)
}
}
func testAnnounceMW3(next AnnounceHandler) AnnounceHandler {
return func(cfg *config.TrackerConfig, req chihaya.AnnounceRequest, resp *chihaya.AnnounceResponse) error {
resp.IPv4Peers = append(resp.IPv4Peers, chihaya.Peer{
Port: 3,
})
return next(cfg, req, resp)
}
}
func TestAnnounceChain(t *testing.T) {
var achain announceChain
achain.Append(testAnnounceMW1)
achain.Append(testAnnounceMW2)
achain.Append(testAnnounceMW3)
handler := achain.Handler()
resp := &chihaya.AnnounceResponse{}
err := handler(nil, chihaya.AnnounceRequest{}, resp)
assert.Nil(t, err, "the handler should not return an error")
assert.Equal(t, resp.IPv4Peers, []chihaya.Peer{chihaya.Peer{Port: 1}, chihaya.Peer{Port: 2}, chihaya.Peer{Port: 3}}, "the list of peers added from the middleware should be in the same order.")
}

64
tracker/tracker.go Normal file
View file

@ -0,0 +1,64 @@
// Copyright 2016 The Chihaya Authors. All rights reserved.
// Use of this source code is governed by the BSD 2-Clause license,
// which can be found in the LICENSE file.package middleware
package tracker
import (
"errors"
"github.com/chihaya/chihaya"
"github.com/chihaya/chihaya/config"
)
// Tracker represents a protocol independent, middleware-composed BitTorrent
// tracker.
type Tracker struct {
cfg *config.TrackerConfig
handleAnnounce AnnounceHandler
handleScrape ScrapeHandler
}
// NewTracker parses a config and generates a Tracker composed by the middleware
// specified in the config.
func NewTracker(cfg *config.TrackerConfig) (*Tracker, error) {
var achain announceChain
for _, mwName := range cfg.AnnounceMiddleware {
mw, ok := announceMiddleware[mwName]
if !ok {
return nil, errors.New("failed to find announce middleware: " + mwName)
}
achain.Append(mw)
}
var schain scrapeChain
for _, mwName := range cfg.ScrapeMiddleware {
mw, ok := scrapeMiddleware[mwName]
if !ok {
return nil, errors.New("failed to find scrape middleware: " + mwName)
}
schain.Append(mw)
}
return &Tracker{
cfg: cfg,
handleAnnounce: achain.Handler(),
handleScrape: schain.Handler(),
}, nil
}
// HandleAnnounce runs an AnnounceRequest through a Tracker's middleware and
// returns the result.
func (t *Tracker) HandleAnnounce(req chihaya.AnnounceRequest) (*chihaya.AnnounceResponse, error) {
resp := &chihaya.AnnounceResponse{}
err := t.handleAnnounce(t.cfg, req, resp)
return resp, err
}
// HandleScrape runs a ScrapeRequest through a Tracker's middleware and returns
// the result.
func (t *Tracker) HandleScrape(req chihaya.ScrapeRequest) (*chihaya.ScrapeResponse, error) {
resp := &chihaya.ScrapeResponse{}
err := t.handleScrape(t.cfg, req, resp)
return resp, err
}