diff --git a/bittorrent/bittorrent.go b/bittorrent/bittorrent.go index 8f0e98c..241c08a 100644 --- a/bittorrent/bittorrent.go +++ b/bittorrent/bittorrent.go @@ -84,8 +84,8 @@ type AnnounceRequest struct { // response. type AnnounceResponse struct { Compact bool - Complete int32 - Incomplete int32 + Complete uint32 + Incomplete uint32 Interval time.Duration MinInterval time.Duration IPv4Peers []Peer diff --git a/frontend/http/frontend.go b/frontend/http/frontend.go index 6f976cf..2760c00 100644 --- a/frontend/http/frontend.go +++ b/frontend/http/frontend.go @@ -8,11 +8,13 @@ import ( "net/http" "time" + log "github.com/Sirupsen/logrus" "github.com/julienschmidt/httprouter" "github.com/prometheus/client_golang/prometheus" "github.com/tylerb/graceful" "github.com/chihaya/chihaya/frontend" + "github.com/chihaya/chihaya/middleware" ) func init() { @@ -163,7 +165,17 @@ func (t *Frontend) scrapeRoute(w http.ResponseWriter, r *http.Request, _ httprou return } - resp, err := t.logic.HandleScrape(context.Background(), req) + host, _, err := net.SplitHostPort(r.RemoteAddr) + if err != nil { + log.Errorln("http: unable to determine remote address for scrape:", err) + WriteError(w, err) + return + } + + ip := net.ParseIP(host) + ctx := context.WithValue(context.Background(), middleware.ScrapeIsIPv6Key, len(ip) == net.IPv6len) + + resp, err := t.logic.HandleScrape(ctx, req) if err != nil { WriteError(w, err) return diff --git a/frontend/udp/frontend.go b/frontend/udp/frontend.go index fb1017d..40fa913 100644 --- a/frontend/udp/frontend.go +++ b/frontend/udp/frontend.go @@ -15,6 +15,7 @@ import ( "github.com/chihaya/chihaya/bittorrent" "github.com/chihaya/chihaya/frontend" "github.com/chihaya/chihaya/frontend/udp/bytepool" + "github.com/chihaya/chihaya/middleware" ) func init() { @@ -222,8 +223,10 @@ func (t *Frontend) handleRequest(r Request, w ResponseWriter) (actionName string return } + ctx := context.WithValue(context.Background(), middleware.ScrapeIsIPv6Key, len(r.IP) == net.IPv6len) + var resp *bittorrent.ScrapeResponse - resp, err = t.logic.HandleScrape(context.Background(), req) + resp, err = t.logic.HandleScrape(ctx, req) if err != nil { WriteError(w, txID, err) return diff --git a/frontend/udp/writer.go b/frontend/udp/writer.go index d8fc87d..f0800ed 100644 --- a/frontend/udp/writer.go +++ b/frontend/udp/writer.go @@ -37,8 +37,8 @@ func WriteAnnounce(w io.Writer, txID []byte, resp *bittorrent.AnnounceResponse, writeHeader(buf, txID, announceActionID) } binary.Write(buf, binary.BigEndian, uint32(resp.Interval/time.Second)) - binary.Write(buf, binary.BigEndian, uint32(resp.Incomplete)) - binary.Write(buf, binary.BigEndian, uint32(resp.Complete)) + binary.Write(buf, binary.BigEndian, resp.Incomplete) + binary.Write(buf, binary.BigEndian, resp.Complete) peers := resp.IPv4Peers if v6 { diff --git a/middleware/hooks.go b/middleware/hooks.go index 0cfbeac..769fb7d 100644 --- a/middleware/hooks.go +++ b/middleware/hooks.go @@ -2,8 +2,11 @@ package middleware import ( "context" + "errors" + "net" "github.com/chihaya/chihaya/bittorrent" + "github.com/chihaya/chihaya/storage" ) // Hook abstracts the concept of anything that needs to interact with a @@ -12,3 +15,113 @@ type Hook interface { HandleAnnounce(context.Context, *bittorrent.AnnounceRequest, *bittorrent.AnnounceResponse) (context.Context, error) HandleScrape(context.Context, *bittorrent.ScrapeRequest, *bittorrent.ScrapeResponse) (context.Context, error) } + +type skipSwarmInteraction struct{} + +// SkipSwarmInteractionKey is a key for the context of an Announce to control +// whether the swarm interaction middleware should run. +// Any non-nil value set for this key will cause the swarm interaction +// middleware to skip. +var SkipSwarmInteractionKey = skipSwarmInteraction{} + +type swarmInteractionHook struct { + store storage.PeerStore +} + +func (h *swarmInteractionHook) HandleAnnounce(ctx context.Context, req *bittorrent.AnnounceRequest, resp *bittorrent.AnnounceResponse) (_ context.Context, err error) { + if ctx.Value(SkipSwarmInteractionKey) != nil { + return ctx, nil + } + + switch { + case req.Event == bittorrent.Stopped: + err = h.store.DeleteSeeder(req.InfoHash, req.Peer) + if err != nil && err != storage.ErrResourceDoesNotExist { + return ctx, err + } + + err = h.store.DeleteLeecher(req.InfoHash, req.Peer) + if err != nil && err != storage.ErrResourceDoesNotExist { + return ctx, err + } + case req.Event == bittorrent.Completed: + err = h.store.GraduateLeecher(req.InfoHash, req.Peer) + return ctx, err + case req.Left == 0: + // Completed events will also have Left == 0, but by making this + // an extra case we can treat "old" seeders differently from + // graduating leechers. (Calling PutSeeder is probably faster + // than calling GraduateLeecher.) + err = h.store.PutSeeder(req.InfoHash, req.Peer) + return ctx, err + default: + err = h.store.PutLeecher(req.InfoHash, req.Peer) + return ctx, err + } + + return ctx, nil +} + +func (h *swarmInteractionHook) HandleScrape(ctx context.Context, _ *bittorrent.ScrapeRequest, _ *bittorrent.ScrapeResponse) (context.Context, error) { + // Scrapes have no effect on the swarm. + return ctx, nil +} + +// ErrInvalidIP indicates an invalid IP for an Announce. +var ErrInvalidIP = errors.New("invalid IP") + +type skipResponseHook struct{} + +// SkipResponseHookKey is a key for the context of an Announce or Scrape to +// control whether the response middleware should run. +// Any non-nil value set for this key will cause the response middleware to +// skip. +var SkipResponseHookKey = skipResponseHook{} + +type scrapeAddressType struct{} + +// ScrapeIsIPv6Key is the key under which to store whether or not the +// address used to request a scrape was an IPv6 address. +// The value is expected to be of type bool. +// A missing value or a value that is not a bool for this key is equivalent to +// it being set to false. +var ScrapeIsIPv6Key = scrapeAddressType{} + +type responseHook struct { + store storage.PeerStore +} + +func (h *responseHook) HandleAnnounce(ctx context.Context, req *bittorrent.AnnounceRequest, resp *bittorrent.AnnounceResponse) (_ context.Context, err error) { + if ctx.Value(SkipResponseHookKey) != nil { + return ctx, nil + } + + s := h.store.ScrapeSwarm(req.InfoHash, len(req.IP) == net.IPv6len) + resp.Incomplete = s.Incomplete + resp.Complete = s.Complete + + switch len(req.IP) { + case net.IPv4len: + resp.IPv4Peers, err = h.store.AnnouncePeers(req.InfoHash, req.Left == 0, int(req.NumWant), req.Peer) + case net.IPv6len: + resp.IPv6Peers, err = h.store.AnnouncePeers(req.InfoHash, req.Left == 0, int(req.NumWant), req.Peer) + default: + return ctx, ErrInvalidIP + } + + return ctx, err +} + +func (h *responseHook) HandleScrape(ctx context.Context, req *bittorrent.ScrapeRequest, resp *bittorrent.ScrapeResponse) (context.Context, error) { + if ctx.Value(SkipResponseHookKey) != nil { + return ctx, nil + } + + v6, _ := ctx.Value(ScrapeIsIPv6Key).(bool) + + for _, infoHash := range req.InfoHashes { + resp.Files[infoHash] = h.store.ScrapeSwarm(infoHash, v6) + } + + return ctx, nil +} diff --git a/middleware/middleware.go b/middleware/middleware.go index 56cff22..77b246f 100644 --- a/middleware/middleware.go +++ b/middleware/middleware.go @@ -25,8 +25,8 @@ func NewLogic(cfg Config, peerStore storage.PeerStore, preHooks, postHooks []Hoo l := &Logic{ announceInterval: cfg.AnnounceInterval, peerStore: peerStore, - preHooks: preHooks, - postHooks: postHooks, + preHooks: append(preHooks, &responseHook{store: peerStore}), + postHooks: append(postHooks, &swarmInteractionHook{store: peerStore}), } return l @@ -44,7 +44,9 @@ type Logic struct { // HandleAnnounce generates a response for an Announce. func (l *Logic) HandleAnnounce(ctx context.Context, req *bittorrent.AnnounceRequest) (resp *bittorrent.AnnounceResponse, err error) { resp = &bittorrent.AnnounceResponse{ - Interval: l.announceInterval, + Interval: l.announceInterval, + MinInterval: l.announceInterval, + Compact: req.Compact, } for _, h := range l.preHooks { if ctx, err = h.HandleAnnounce(ctx, req, resp); err != nil {