Merge pull request #222 from mrd0ll4r/final-middleware
Final middleware
This commit is contained in:
commit
8b46c39bb4
6 changed files with 139 additions and 9 deletions
|
@ -84,8 +84,8 @@ type AnnounceRequest struct {
|
||||||
// response.
|
// response.
|
||||||
type AnnounceResponse struct {
|
type AnnounceResponse struct {
|
||||||
Compact bool
|
Compact bool
|
||||||
Complete int32
|
Complete uint32
|
||||||
Incomplete int32
|
Incomplete uint32
|
||||||
Interval time.Duration
|
Interval time.Duration
|
||||||
MinInterval time.Duration
|
MinInterval time.Duration
|
||||||
IPv4Peers []Peer
|
IPv4Peers []Peer
|
||||||
|
|
|
@ -8,11 +8,13 @@ import (
|
||||||
"net/http"
|
"net/http"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
log "github.com/Sirupsen/logrus"
|
||||||
"github.com/julienschmidt/httprouter"
|
"github.com/julienschmidt/httprouter"
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
"github.com/tylerb/graceful"
|
"github.com/tylerb/graceful"
|
||||||
|
|
||||||
"github.com/chihaya/chihaya/frontend"
|
"github.com/chihaya/chihaya/frontend"
|
||||||
|
"github.com/chihaya/chihaya/middleware"
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
|
@ -163,7 +165,17 @@ func (t *Frontend) scrapeRoute(w http.ResponseWriter, r *http.Request, _ httprou
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
resp, err := t.logic.HandleScrape(context.Background(), req)
|
host, _, err := net.SplitHostPort(r.RemoteAddr)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorln("http: unable to determine remote address for scrape:", err)
|
||||||
|
WriteError(w, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
ip := net.ParseIP(host)
|
||||||
|
ctx := context.WithValue(context.Background(), middleware.ScrapeIsIPv6Key, len(ip) == net.IPv6len)
|
||||||
|
|
||||||
|
resp, err := t.logic.HandleScrape(ctx, req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
WriteError(w, err)
|
WriteError(w, err)
|
||||||
return
|
return
|
||||||
|
|
|
@ -15,6 +15,7 @@ import (
|
||||||
"github.com/chihaya/chihaya/bittorrent"
|
"github.com/chihaya/chihaya/bittorrent"
|
||||||
"github.com/chihaya/chihaya/frontend"
|
"github.com/chihaya/chihaya/frontend"
|
||||||
"github.com/chihaya/chihaya/frontend/udp/bytepool"
|
"github.com/chihaya/chihaya/frontend/udp/bytepool"
|
||||||
|
"github.com/chihaya/chihaya/middleware"
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
|
@ -222,8 +223,10 @@ func (t *Frontend) handleRequest(r Request, w ResponseWriter) (actionName string
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ctx := context.WithValue(context.Background(), middleware.ScrapeIsIPv6Key, len(r.IP) == net.IPv6len)
|
||||||
|
|
||||||
var resp *bittorrent.ScrapeResponse
|
var resp *bittorrent.ScrapeResponse
|
||||||
resp, err = t.logic.HandleScrape(context.Background(), req)
|
resp, err = t.logic.HandleScrape(ctx, req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
WriteError(w, txID, err)
|
WriteError(w, txID, err)
|
||||||
return
|
return
|
||||||
|
|
|
@ -37,8 +37,8 @@ func WriteAnnounce(w io.Writer, txID []byte, resp *bittorrent.AnnounceResponse,
|
||||||
writeHeader(buf, txID, announceActionID)
|
writeHeader(buf, txID, announceActionID)
|
||||||
}
|
}
|
||||||
binary.Write(buf, binary.BigEndian, uint32(resp.Interval/time.Second))
|
binary.Write(buf, binary.BigEndian, uint32(resp.Interval/time.Second))
|
||||||
binary.Write(buf, binary.BigEndian, uint32(resp.Incomplete))
|
binary.Write(buf, binary.BigEndian, resp.Incomplete)
|
||||||
binary.Write(buf, binary.BigEndian, uint32(resp.Complete))
|
binary.Write(buf, binary.BigEndian, resp.Complete)
|
||||||
|
|
||||||
peers := resp.IPv4Peers
|
peers := resp.IPv4Peers
|
||||||
if v6 {
|
if v6 {
|
||||||
|
|
|
@ -2,8 +2,11 @@ package middleware
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
|
"net"
|
||||||
|
|
||||||
"github.com/chihaya/chihaya/bittorrent"
|
"github.com/chihaya/chihaya/bittorrent"
|
||||||
|
"github.com/chihaya/chihaya/storage"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Hook abstracts the concept of anything that needs to interact with a
|
// Hook abstracts the concept of anything that needs to interact with a
|
||||||
|
@ -12,3 +15,113 @@ type Hook interface {
|
||||||
HandleAnnounce(context.Context, *bittorrent.AnnounceRequest, *bittorrent.AnnounceResponse) (context.Context, error)
|
HandleAnnounce(context.Context, *bittorrent.AnnounceRequest, *bittorrent.AnnounceResponse) (context.Context, error)
|
||||||
HandleScrape(context.Context, *bittorrent.ScrapeRequest, *bittorrent.ScrapeResponse) (context.Context, error)
|
HandleScrape(context.Context, *bittorrent.ScrapeRequest, *bittorrent.ScrapeResponse) (context.Context, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type skipSwarmInteraction struct{}
|
||||||
|
|
||||||
|
// SkipSwarmInteractionKey is a key for the context of an Announce to control
|
||||||
|
// whether the swarm interaction middleware should run.
|
||||||
|
// Any non-nil value set for this key will cause the swarm interaction
|
||||||
|
// middleware to skip.
|
||||||
|
var SkipSwarmInteractionKey = skipSwarmInteraction{}
|
||||||
|
|
||||||
|
type swarmInteractionHook struct {
|
||||||
|
store storage.PeerStore
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *swarmInteractionHook) HandleAnnounce(ctx context.Context, req *bittorrent.AnnounceRequest, resp *bittorrent.AnnounceResponse) (_ context.Context, err error) {
|
||||||
|
if ctx.Value(SkipSwarmInteractionKey) != nil {
|
||||||
|
return ctx, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
switch {
|
||||||
|
case req.Event == bittorrent.Stopped:
|
||||||
|
err = h.store.DeleteSeeder(req.InfoHash, req.Peer)
|
||||||
|
if err != nil && err != storage.ErrResourceDoesNotExist {
|
||||||
|
return ctx, err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = h.store.DeleteLeecher(req.InfoHash, req.Peer)
|
||||||
|
if err != nil && err != storage.ErrResourceDoesNotExist {
|
||||||
|
return ctx, err
|
||||||
|
}
|
||||||
|
case req.Event == bittorrent.Completed:
|
||||||
|
err = h.store.GraduateLeecher(req.InfoHash, req.Peer)
|
||||||
|
return ctx, err
|
||||||
|
case req.Left == 0:
|
||||||
|
// Completed events will also have Left == 0, but by making this
|
||||||
|
// an extra case we can treat "old" seeders differently from
|
||||||
|
// graduating leechers. (Calling PutSeeder is probably faster
|
||||||
|
// than calling GraduateLeecher.)
|
||||||
|
err = h.store.PutSeeder(req.InfoHash, req.Peer)
|
||||||
|
return ctx, err
|
||||||
|
default:
|
||||||
|
err = h.store.PutLeecher(req.InfoHash, req.Peer)
|
||||||
|
return ctx, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return ctx, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *swarmInteractionHook) HandleScrape(ctx context.Context, _ *bittorrent.ScrapeRequest, _ *bittorrent.ScrapeResponse) (context.Context, error) {
|
||||||
|
// Scrapes have no effect on the swarm.
|
||||||
|
return ctx, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ErrInvalidIP indicates an invalid IP for an Announce.
|
||||||
|
var ErrInvalidIP = errors.New("invalid IP")
|
||||||
|
|
||||||
|
type skipResponseHook struct{}
|
||||||
|
|
||||||
|
// SkipResponseHookKey is a key for the context of an Announce or Scrape to
|
||||||
|
// control whether the response middleware should run.
|
||||||
|
// Any non-nil value set for this key will cause the response middleware to
|
||||||
|
// skip.
|
||||||
|
var SkipResponseHookKey = skipResponseHook{}
|
||||||
|
|
||||||
|
type scrapeAddressType struct{}
|
||||||
|
|
||||||
|
// ScrapeIsIPv6Key is the key under which to store whether or not the
|
||||||
|
// address used to request a scrape was an IPv6 address.
|
||||||
|
// The value is expected to be of type bool.
|
||||||
|
// A missing value or a value that is not a bool for this key is equivalent to
|
||||||
|
// it being set to false.
|
||||||
|
var ScrapeIsIPv6Key = scrapeAddressType{}
|
||||||
|
|
||||||
|
type responseHook struct {
|
||||||
|
store storage.PeerStore
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *responseHook) HandleAnnounce(ctx context.Context, req *bittorrent.AnnounceRequest, resp *bittorrent.AnnounceResponse) (_ context.Context, err error) {
|
||||||
|
if ctx.Value(SkipResponseHookKey) != nil {
|
||||||
|
return ctx, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
s := h.store.ScrapeSwarm(req.InfoHash, len(req.IP) == net.IPv6len)
|
||||||
|
resp.Incomplete = s.Incomplete
|
||||||
|
resp.Complete = s.Complete
|
||||||
|
|
||||||
|
switch len(req.IP) {
|
||||||
|
case net.IPv4len:
|
||||||
|
resp.IPv4Peers, err = h.store.AnnouncePeers(req.InfoHash, req.Left == 0, int(req.NumWant), req.Peer)
|
||||||
|
case net.IPv6len:
|
||||||
|
resp.IPv6Peers, err = h.store.AnnouncePeers(req.InfoHash, req.Left == 0, int(req.NumWant), req.Peer)
|
||||||
|
default:
|
||||||
|
return ctx, ErrInvalidIP
|
||||||
|
}
|
||||||
|
|
||||||
|
return ctx, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *responseHook) HandleScrape(ctx context.Context, req *bittorrent.ScrapeRequest, resp *bittorrent.ScrapeResponse) (context.Context, error) {
|
||||||
|
if ctx.Value(SkipResponseHookKey) != nil {
|
||||||
|
return ctx, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
v6, _ := ctx.Value(ScrapeIsIPv6Key).(bool)
|
||||||
|
|
||||||
|
for _, infoHash := range req.InfoHashes {
|
||||||
|
resp.Files[infoHash] = h.store.ScrapeSwarm(infoHash, v6)
|
||||||
|
}
|
||||||
|
|
||||||
|
return ctx, nil
|
||||||
|
}
|
||||||
|
|
|
@ -25,8 +25,8 @@ func NewLogic(cfg Config, peerStore storage.PeerStore, preHooks, postHooks []Hoo
|
||||||
l := &Logic{
|
l := &Logic{
|
||||||
announceInterval: cfg.AnnounceInterval,
|
announceInterval: cfg.AnnounceInterval,
|
||||||
peerStore: peerStore,
|
peerStore: peerStore,
|
||||||
preHooks: preHooks,
|
preHooks: append(preHooks, &responseHook{store: peerStore}),
|
||||||
postHooks: postHooks,
|
postHooks: append(postHooks, &swarmInteractionHook{store: peerStore}),
|
||||||
}
|
}
|
||||||
|
|
||||||
return l
|
return l
|
||||||
|
@ -44,7 +44,9 @@ type Logic struct {
|
||||||
// HandleAnnounce generates a response for an Announce.
|
// HandleAnnounce generates a response for an Announce.
|
||||||
func (l *Logic) HandleAnnounce(ctx context.Context, req *bittorrent.AnnounceRequest) (resp *bittorrent.AnnounceResponse, err error) {
|
func (l *Logic) HandleAnnounce(ctx context.Context, req *bittorrent.AnnounceRequest) (resp *bittorrent.AnnounceResponse, err error) {
|
||||||
resp = &bittorrent.AnnounceResponse{
|
resp = &bittorrent.AnnounceResponse{
|
||||||
Interval: l.announceInterval,
|
Interval: l.announceInterval,
|
||||||
|
MinInterval: l.announceInterval,
|
||||||
|
Compact: req.Compact,
|
||||||
}
|
}
|
||||||
for _, h := range l.preHooks {
|
for _, h := range l.preHooks {
|
||||||
if ctx, err = h.HandleAnnounce(ctx, req, resp); err != nil {
|
if ctx, err = h.HandleAnnounce(ctx, req, resp); err != nil {
|
||||||
|
|
Loading…
Reference in a new issue