commit
1c3e0432d8
11 changed files with 98 additions and 64 deletions
|
@ -11,7 +11,10 @@ import (
|
|||
"github.com/chihaya/chihaya/pkg/event"
|
||||
)
|
||||
|
||||
// PeerID represents a peer ID.
|
||||
type PeerID string
|
||||
|
||||
// InfoHash represents an infohash in hexadecimal notation.
|
||||
type InfoHash string
|
||||
|
||||
// AnnounceRequest represents the parsed parameters from an announce request.
|
||||
|
|
|
@ -35,12 +35,14 @@ func constructor(srvcfg *chihaya.ServerConfig, tkr *tracker.Tracker) (server.Ser
|
|||
}
|
||||
|
||||
type httpServer struct {
|
||||
cfg *httpConfig
|
||||
tkr *tracker.Tracker
|
||||
grace *graceful.Server
|
||||
stopping bool
|
||||
cfg *httpConfig
|
||||
tkr *tracker.Tracker
|
||||
grace *graceful.Server
|
||||
}
|
||||
|
||||
// Start runs the server and blocks until it has exited.
|
||||
//
|
||||
// It panics if the server exits unexpectedly.
|
||||
func (s *httpServer) Start() {
|
||||
s.grace = &graceful.Server{
|
||||
Server: &http.Server{
|
||||
|
@ -49,9 +51,8 @@ func (s *httpServer) Start() {
|
|||
ReadTimeout: s.cfg.ReadTimeout,
|
||||
WriteTimeout: s.cfg.WriteTimeout,
|
||||
},
|
||||
Timeout: s.cfg.RequestTimeout,
|
||||
NoSignalHandling: true,
|
||||
ShutdownInitiated: func() { s.stopping = true },
|
||||
Timeout: s.cfg.RequestTimeout,
|
||||
NoSignalHandling: true,
|
||||
ConnState: func(conn net.Conn, state http.ConnState) {
|
||||
switch state {
|
||||
case http.StateNew:
|
||||
|
@ -76,20 +77,17 @@ func (s *httpServer) Start() {
|
|||
if err := s.grace.ListenAndServe(); err != nil {
|
||||
if opErr, ok := err.(*net.OpError); !ok || (ok && opErr.Op != "accept") {
|
||||
log.Printf("Failed to gracefully run HTTP server: %s", err.Error())
|
||||
return
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
log.Println("HTTP server shut down cleanly")
|
||||
}
|
||||
|
||||
// Stop stops the server and blocks until the server has exited.
|
||||
func (s *httpServer) Stop() {
|
||||
if !s.stopping {
|
||||
s.grace.Stop(s.grace.Timeout)
|
||||
}
|
||||
|
||||
s.grace = nil
|
||||
s.stopping = false
|
||||
s.grace.Stop(s.grace.Timeout)
|
||||
<-s.grace.StopChan()
|
||||
}
|
||||
|
||||
func (s *httpServer) routes() *httprouter.Router {
|
||||
|
|
|
@ -8,6 +8,8 @@ package prometheus
|
|||
|
||||
import (
|
||||
"errors"
|
||||
"log"
|
||||
"net"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
|
@ -69,6 +71,9 @@ type Server struct {
|
|||
|
||||
var _ server.Server = &Server{}
|
||||
|
||||
// Start starts the prometheus server and blocks until it exits.
|
||||
//
|
||||
// It panics if the server exits unexpectedly.
|
||||
func (s *Server) Start() {
|
||||
s.grace = &graceful.Server{
|
||||
Server: &http.Server{
|
||||
|
@ -80,8 +85,19 @@ func (s *Server) Start() {
|
|||
Timeout: s.cfg.ShutdownTimeout,
|
||||
NoSignalHandling: true,
|
||||
}
|
||||
|
||||
if err := s.grace.ListenAndServe(); err != nil {
|
||||
if opErr, ok := err.(*net.OpError); !ok || (ok && opErr.Op != "accept") {
|
||||
log.Printf("Failed to gracefully run Prometheus server: %s", err.Error())
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
log.Println("Prometheus server shut down cleanly")
|
||||
}
|
||||
|
||||
// Stop stops the prometheus server and blocks until it exits.
|
||||
func (s *Server) Stop() {
|
||||
s.grace.Stop(s.cfg.ShutdownTimeout)
|
||||
<-s.grace.StopChan()
|
||||
}
|
||||
|
|
|
@ -46,6 +46,11 @@ func New(cfg *chihaya.ServerConfig, tkr *tracker.Tracker) (Server, error) {
|
|||
|
||||
// Server represents one instance of a server accessing the tracker.
|
||||
type Server interface {
|
||||
// Start starts a server and blocks until the server exits.
|
||||
//
|
||||
// It should panic if the server exits unexpectedly.
|
||||
Start()
|
||||
|
||||
// Stop stops a server and blocks until the server exits.
|
||||
Stop()
|
||||
}
|
||||
|
|
|
@ -50,8 +50,8 @@ func TestPeerStoreAPI(t *testing.T) {
|
|||
1,
|
||||
}
|
||||
config = store.DriverConfig{
|
||||
"memory",
|
||||
unmarshalledConfig,
|
||||
Name: "memory",
|
||||
Config: unmarshalledConfig,
|
||||
}
|
||||
d = &peerStoreDriver{}
|
||||
)
|
||||
|
@ -62,9 +62,9 @@ func TestPeerStoreAPI(t *testing.T) {
|
|||
for _, p := range peers {
|
||||
// Construct chihaya.Peer from test data.
|
||||
peer := chihaya.Peer{
|
||||
chihaya.PeerID(p.peerID),
|
||||
net.ParseIP(p.ip),
|
||||
p.port,
|
||||
ID: chihaya.PeerID(p.peerID),
|
||||
IP: net.ParseIP(p.ip),
|
||||
Port: p.port,
|
||||
}
|
||||
|
||||
if p.seeder {
|
||||
|
@ -95,9 +95,9 @@ func TestPeerStoreAPI(t *testing.T) {
|
|||
for _, p := range peers {
|
||||
// Construct chihaya.Peer from test data.
|
||||
peer := chihaya.Peer{
|
||||
chihaya.PeerID(p.peerID),
|
||||
net.ParseIP(p.ip),
|
||||
p.port,
|
||||
ID: chihaya.PeerID(p.peerID),
|
||||
IP: net.ParseIP(p.ip),
|
||||
Port: p.port,
|
||||
}
|
||||
|
||||
if p.seeder {
|
||||
|
@ -121,9 +121,9 @@ func TestPeerStoreAPI(t *testing.T) {
|
|||
for _, p := range peers {
|
||||
// Construct chihaya.Peer from test data.
|
||||
peer := chihaya.Peer{
|
||||
chihaya.PeerID(p.peerID),
|
||||
net.ParseIP(p.ip),
|
||||
p.port,
|
||||
ID: chihaya.PeerID(p.peerID),
|
||||
IP: net.ParseIP(p.ip),
|
||||
Port: p.port,
|
||||
}
|
||||
if p.seeder {
|
||||
s.PutSeeder(hash, peer)
|
||||
|
@ -136,9 +136,9 @@ func TestPeerStoreAPI(t *testing.T) {
|
|||
assert.Equal(t, 6, s.NumSeeders(hash))
|
||||
assert.Equal(t, 4, s.NumLeechers(hash))
|
||||
peer := chihaya.Peer{
|
||||
chihaya.PeerID(peers[0].peerID),
|
||||
net.ParseIP(peers[0].ip),
|
||||
peers[0].port,
|
||||
ID: chihaya.PeerID(peers[0].peerID),
|
||||
IP: net.ParseIP(peers[0].ip),
|
||||
Port: peers[0].port,
|
||||
}
|
||||
err = s.GraduateLeecher(hash, peer)
|
||||
assert.Nil(t, err)
|
||||
|
|
|
@ -13,17 +13,22 @@ import (
|
|||
func init() {
|
||||
tracker.RegisterAnnounceMiddleware("infohash_blacklist", blacklistAnnounceInfohash)
|
||||
tracker.RegisterScrapeMiddlewareConstructor("infohash_blacklist", blacklistScrapeInfohash)
|
||||
mustGetStore = func() store.StringStore {
|
||||
return store.MustGetStore()
|
||||
}
|
||||
}
|
||||
|
||||
// ErrBlockedInfohash is returned by a middleware if any of the infohashes
|
||||
// contained in an announce or scrape are disallowed.
|
||||
var ErrBlockedInfohash = tracker.ClientError("disallowed infohash")
|
||||
|
||||
var mustGetStore func() store.StringStore
|
||||
|
||||
// blacklistAnnounceInfohash provides a middleware that only allows announces
|
||||
// for infohashes that are not stored in a StringStore.
|
||||
func blacklistAnnounceInfohash(next tracker.AnnounceHandler) tracker.AnnounceHandler {
|
||||
return func(cfg *chihaya.TrackerConfig, req *chihaya.AnnounceRequest, resp *chihaya.AnnounceResponse) (err error) {
|
||||
blacklisted, err := store.MustGetStore().HasString(PrefixInfohash + string(req.InfoHash))
|
||||
blacklisted, err := mustGetStore().HasString(PrefixInfohash + string(req.InfoHash))
|
||||
if err != nil {
|
||||
return err
|
||||
} else if blacklisted {
|
||||
|
@ -63,7 +68,7 @@ func blacklistScrapeInfohash(c chihaya.MiddlewareConfig) (tracker.ScrapeMiddlewa
|
|||
func blacklistFilterScrape(next tracker.ScrapeHandler) tracker.ScrapeHandler {
|
||||
return func(cfg *chihaya.TrackerConfig, req *chihaya.ScrapeRequest, resp *chihaya.ScrapeResponse) (err error) {
|
||||
blacklisted := false
|
||||
storage := store.MustGetStore()
|
||||
storage := mustGetStore()
|
||||
infohashes := req.InfoHashes
|
||||
|
||||
for i, ih := range infohashes {
|
||||
|
@ -84,7 +89,7 @@ func blacklistFilterScrape(next tracker.ScrapeHandler) tracker.ScrapeHandler {
|
|||
func blacklistBlockScrape(next tracker.ScrapeHandler) tracker.ScrapeHandler {
|
||||
return func(cfg *chihaya.TrackerConfig, req *chihaya.ScrapeRequest, resp *chihaya.ScrapeResponse) (err error) {
|
||||
blacklisted := false
|
||||
storage := store.MustGetStore()
|
||||
storage := mustGetStore()
|
||||
|
||||
for _, ih := range req.InfoHashes {
|
||||
blacklisted, err = storage.HasString(PrefixInfohash + string(ih))
|
||||
|
|
|
@ -10,38 +10,42 @@ import (
|
|||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"github.com/chihaya/chihaya"
|
||||
"github.com/chihaya/chihaya/server"
|
||||
"github.com/chihaya/chihaya/server/store"
|
||||
"github.com/chihaya/chihaya/tracker"
|
||||
|
||||
_ "github.com/chihaya/chihaya/server/store/memory"
|
||||
)
|
||||
|
||||
var srv server.Server
|
||||
type storeMock struct {
|
||||
strings map[string]struct{}
|
||||
}
|
||||
|
||||
func (ss *storeMock) PutString(s string) error {
|
||||
ss.strings[s] = struct{}{}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ss *storeMock) HasString(s string) (bool, error) {
|
||||
_, ok := ss.strings[s]
|
||||
|
||||
return ok, nil
|
||||
}
|
||||
|
||||
func (ss *storeMock) RemoveString(s string) error {
|
||||
delete(ss.strings, s)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
var mock store.StringStore = &storeMock{
|
||||
strings: make(map[string]struct{}),
|
||||
}
|
||||
|
||||
func TestASetUp(t *testing.T) {
|
||||
serverConfig := chihaya.ServerConfig{
|
||||
Name: "store",
|
||||
Config: store.Config{
|
||||
Addr: "localhost:6880",
|
||||
StringStore: store.DriverConfig{
|
||||
Name: "memory",
|
||||
},
|
||||
IPStore: store.DriverConfig{
|
||||
Name: "memory",
|
||||
},
|
||||
PeerStore: store.DriverConfig{
|
||||
Name: "memory",
|
||||
},
|
||||
},
|
||||
mustGetStore = func() store.StringStore {
|
||||
return mock
|
||||
}
|
||||
|
||||
var err error
|
||||
srv, err = server.New(&serverConfig, &tracker.Tracker{})
|
||||
assert.Nil(t, err)
|
||||
srv.Start()
|
||||
|
||||
store.MustGetStore().PutString(PrefixInfohash + "abc")
|
||||
mustGetStore().PutString(PrefixInfohash + "abc")
|
||||
}
|
||||
|
||||
func TestBlacklistAnnounceMiddleware(t *testing.T) {
|
||||
|
|
|
@ -6,7 +6,6 @@ package infohash
|
|||
|
||||
import (
|
||||
"github.com/chihaya/chihaya"
|
||||
"github.com/chihaya/chihaya/server/store"
|
||||
"github.com/chihaya/chihaya/tracker"
|
||||
)
|
||||
|
||||
|
@ -22,7 +21,7 @@ const PrefixInfohash = "ih-"
|
|||
// for infohashes that are not stored in a StringStore
|
||||
func whitelistAnnounceInfohash(next tracker.AnnounceHandler) tracker.AnnounceHandler {
|
||||
return func(cfg *chihaya.TrackerConfig, req *chihaya.AnnounceRequest, resp *chihaya.AnnounceResponse) (err error) {
|
||||
whitelisted, err := store.MustGetStore().HasString(PrefixInfohash + string(req.InfoHash))
|
||||
whitelisted, err := mustGetStore().HasString(PrefixInfohash + string(req.InfoHash))
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -62,7 +61,7 @@ func whitelistScrapeInfohash(c chihaya.MiddlewareConfig) (tracker.ScrapeMiddlewa
|
|||
func whitelistFilterScrape(next tracker.ScrapeHandler) tracker.ScrapeHandler {
|
||||
return func(cfg *chihaya.TrackerConfig, req *chihaya.ScrapeRequest, resp *chihaya.ScrapeResponse) (err error) {
|
||||
whitelisted := false
|
||||
storage := store.MustGetStore()
|
||||
storage := mustGetStore()
|
||||
infohashes := req.InfoHashes
|
||||
|
||||
for i, ih := range infohashes {
|
||||
|
@ -83,7 +82,7 @@ func whitelistFilterScrape(next tracker.ScrapeHandler) tracker.ScrapeHandler {
|
|||
func whitelistBlockScrape(next tracker.ScrapeHandler) tracker.ScrapeHandler {
|
||||
return func(cfg *chihaya.TrackerConfig, req *chihaya.ScrapeRequest, resp *chihaya.ScrapeResponse) (err error) {
|
||||
whitelisted := false
|
||||
storage := store.MustGetStore()
|
||||
storage := mustGetStore()
|
||||
|
||||
for _, ih := range req.InfoHashes {
|
||||
whitelisted, err = storage.HasString(PrefixInfohash + string(ih))
|
||||
|
|
|
@ -94,7 +94,3 @@ func TestWhitelistScrapeMiddlewareFilter(t *testing.T) {
|
|||
assert.Nil(t, err)
|
||||
assert.Equal(t, []chihaya.InfoHash{chihaya.InfoHash("abc")}, req.InfoHashes)
|
||||
}
|
||||
|
||||
func TestZTearDown(t *testing.T) {
|
||||
srv.Stop()
|
||||
}
|
||||
|
|
|
@ -57,6 +57,7 @@ func constructor(srvcfg *chihaya.ServerConfig, tkr *tracker.Tracker) (server.Ser
|
|||
return theStore, nil
|
||||
}
|
||||
|
||||
// Config represents the configuration for the store.
|
||||
type Config struct {
|
||||
Addr string `yaml:"addr"`
|
||||
RequestTimeout time.Duration `yaml:"request_timeout"`
|
||||
|
@ -68,6 +69,7 @@ type Config struct {
|
|||
StringStore DriverConfig `yaml:"string_store"`
|
||||
}
|
||||
|
||||
// DriverConfig represents the configuration for a store driver.
|
||||
type DriverConfig struct {
|
||||
Name string `yaml:"name"`
|
||||
Config interface{} `yaml:"config"`
|
||||
|
@ -99,6 +101,7 @@ func MustGetStore() *Store {
|
|||
return theStore
|
||||
}
|
||||
|
||||
// Store provides storage for a tracker.
|
||||
type Store struct {
|
||||
cfg *Config
|
||||
tkr *tracker.Tracker
|
||||
|
@ -110,9 +113,14 @@ type Store struct {
|
|||
StringStore
|
||||
}
|
||||
|
||||
// Start starts the store drivers and blocks until all of them exit.
|
||||
func (s *Store) Start() {
|
||||
<-s.shutdown
|
||||
s.wg.Wait()
|
||||
log.Println("Store server shut down cleanly")
|
||||
}
|
||||
|
||||
// Stop stops the store drivers and waits for them to exit.
|
||||
func (s *Store) Stop() {
|
||||
close(s.shutdown)
|
||||
s.wg.Wait()
|
||||
|
|
|
@ -16,7 +16,7 @@ type StringStore interface {
|
|||
}
|
||||
|
||||
// StringStoreDriver represents an interface for creating a handle to the
|
||||
// storage of swarms.
|
||||
// storage of strings.
|
||||
type StringStoreDriver interface {
|
||||
New(*DriverConfig) (StringStore, error)
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue