Merge pull request #165 from mrd0ll4r/add-peer

middleware: added peer store updates middleware (swarm)
This commit is contained in:
Jimmy Zelinskie 2016-05-17 03:31:13 -04:00
commit 1597d1ffd3
15 changed files with 197 additions and 33 deletions

View file

@ -86,6 +86,28 @@ type AnnounceRequest struct {
Params Params
}
// Peer4 returns a Peer using the IPv4 endpoint of the Announce.
// Note that, if the Announce does not contain an IPv4 address, the IP field of
// the returned Peer can be nil.
func (r *AnnounceRequest) Peer4() Peer {
return Peer{
IP: r.IPv4,
Port: r.Port,
ID: r.PeerID,
}
}
// Peer6 returns a Peer using the IPv6 endpoint of the Announce.
// Note that, if the Announce does not contain an IPv6 address, the IP field of
// the returned Peer can be nil.
func (r *AnnounceRequest) Peer6() Peer {
return Peer{
IP: r.IPv6,
Port: r.Port,
ID: r.PeerID,
}
}
// AnnounceResponse represents the parameters used to create an announce
// response.
type AnnounceResponse struct {

View file

@ -28,6 +28,7 @@ import (
_ "github.com/chihaya/chihaya/server/store/middleware/infohash"
_ "github.com/chihaya/chihaya/server/store/middleware/ip"
_ "github.com/chihaya/chihaya/server/store/middleware/response"
_ "github.com/chihaya/chihaya/server/store/middleware/swarm"
)
var configPath string

View file

@ -21,30 +21,36 @@ type IPStore interface {
AddNetwork(network string) error
// HasIP returns whether the given IP address is contained in the IPStore
// or belong to any of the stored networks.
// or belongs to any of the stored networks.
HasIP(ip net.IP) (bool, error)
// HasAnyIP returns whether any of the given IP addresses are contained in
// the IPStore or belong to any of the stored networks.
// HasAnyIP returns whether any of the given IP addresses are contained
// in the IPStore or belongs to any of the stored networks.
HasAnyIP(ips []net.IP) (bool, error)
// HassAllIPs returns whether all of the given IP addresses are contained in
// the IPStore or belong to any of the stored networks.
// HassAllIPs returns whether all of the given IP addresses are
// contained in the IPStore or belongs to any of the stored networks.
HasAllIPs(ips []net.IP) (bool, error)
// RemoveIP removes a single IP address from the IPStore.
//
// This wil not remove the given address from any networks it belongs to
// that are stored in the IPStore.
//
// Returns ErrResourceDoesNotExist if the given IP address is not
// contained in the store.
RemoveIP(ip net.IP) error
// RemoveNetwork removes a range of IP addresses that was previously added
// through AddNetwork.
// RemoveNetwork removes a range of IP addresses that was previously
// added through AddNetwork.
//
// The given network must not, as a string, match the previously added
// network, but rather denote the same network, e.g. if the network
// 192.168.22.255/24 was added, removing the network 192.168.22.123/24
// will succeed.
//
// Returns ErrResourceDoesNotExist if the given network is not
// contained in the store.
RemoveNetwork(network string) error
}

View file

@ -57,14 +57,14 @@ func key(ip net.IP) [16]byte {
}
func (s *ipStore) AddNetwork(network string) error {
s.Lock()
defer s.Unlock()
key, length, err := netmatch.ParseNetwork(network)
if err != nil {
return err
}
s.Lock()
defer s.Unlock()
return s.networks.Add(key, length)
}
@ -78,9 +78,9 @@ func (s *ipStore) AddIP(ip net.IP) error {
}
func (s *ipStore) HasIP(ip net.IP) (bool, error) {
key := key(ip)
s.RLock()
defer s.RUnlock()
key := key(ip)
_, ok := s.ips[key]
if ok {
@ -138,22 +138,31 @@ func (s *ipStore) HasAllIPs(ips []net.IP) (bool, error) {
}
func (s *ipStore) RemoveIP(ip net.IP) error {
key := key(ip)
s.Lock()
defer s.Unlock()
delete(s.ips, key(ip))
if _, ok := s.ips[key]; !ok {
return store.ErrResourceDoesNotExist
}
delete(s.ips, key)
return nil
}
func (s *ipStore) RemoveNetwork(network string) error {
s.Lock()
defer s.Unlock()
key, length, err := netmatch.ParseNetwork(network)
if err != nil {
return err
}
return s.networks.Remove(key, length)
s.Lock()
defer s.Unlock()
err = s.networks.Remove(key, length)
if err != nil && err == netmatch.ErrNotContained {
return store.ErrResourceDoesNotExist
}
return err
}

View file

@ -9,6 +9,7 @@ import (
"testing"
"github.com/chihaya/chihaya/server/store"
"github.com/stretchr/testify/assert"
)
@ -64,7 +65,7 @@ func TestIPStore(t *testing.T) {
// check removes
err = s.RemoveIP(v6)
assert.Nil(t, err)
assert.NotNil(t, err)
err = s.RemoveIP(v4s)
assert.Nil(t, err)

View file

@ -119,10 +119,16 @@ func (s *peerStore) DeleteSeeder(infoHash chihaya.InfoHash, p chihaya.Peer) erro
defer shard.Unlock()
if shard.peers[key] == nil {
return nil
return store.ErrResourceDoesNotExist
}
delete(shard.peers[key], peerKey(p))
pk := peerKey(p)
if _, ok := shard.peers[key][pk]; !ok {
return store.ErrResourceDoesNotExist
}
delete(shard.peers[key], pk)
if len(shard.peers[key]) == 0 {
delete(shard.peers, key)
@ -156,10 +162,16 @@ func (s *peerStore) DeleteLeecher(infoHash chihaya.InfoHash, p chihaya.Peer) err
defer shard.Unlock()
if shard.peers[key] == nil {
return nil
return store.ErrResourceDoesNotExist
}
delete(shard.peers[key], peerKey(p))
pk := peerKey(p)
if _, ok := shard.peers[key][pk]; !ok {
return store.ErrResourceDoesNotExist
}
delete(shard.peers[key], pk)
if len(shard.peers[key]) == 0 {
delete(shard.peers, key)

View file

@ -51,6 +51,10 @@ func (ss *stringStore) RemoveString(s string) error {
ss.Lock()
defer ss.Unlock()
if _, ok := ss.strings[s]; !ok {
return store.ErrResourceDoesNotExist
}
delete(ss.strings, s)
return nil

View file

@ -5,9 +5,11 @@
package memory
import (
"github.com/chihaya/chihaya/server/store"
"github.com/stretchr/testify/assert"
"testing"
"github.com/stretchr/testify/assert"
"github.com/chihaya/chihaya/server/store"
)
var (
@ -30,7 +32,7 @@ func TestStringStore(t *testing.T) {
assert.False(t, has)
err = ss.RemoveString(s1)
assert.Nil(t, err)
assert.NotNil(t, err)
err = ss.PutString(s1)
assert.Nil(t, err)

View file

@ -27,16 +27,14 @@ func (f FailedToRetrievePeers) Error() string { return string(f) }
func responseAnnounceClient(next tracker.AnnounceHandler) tracker.AnnounceHandler {
return func(cfg *chihaya.TrackerConfig, req *chihaya.AnnounceRequest, resp *chihaya.AnnounceResponse) (err error) {
storage := store.MustGetStore()
peer4 := chihaya.Peer{ID: req.PeerID, IP: req.IPv4, Port: req.Port}
peer6 := chihaya.Peer{ID: req.PeerID, IP: req.IPv6, Port: req.Port}
resp.MinInterval = cfg.MinAnnounceInterval
resp.Compact = req.Compact
resp.Complete = int32(storage.NumSeeders(req.InfoHash))
resp.Incomplete = int32(storage.NumLeechers(req.InfoHash))
resp.IPv4Peers, resp.IPv6Peers, err = storage.AnnouncePeers(req.InfoHash, req.Left == 0, int(req.NumWant), peer4, peer6)
resp.IPv4Peers, resp.IPv6Peers, err = storage.AnnouncePeers(req.InfoHash, req.Left == 0, int(req.NumWant), req.Peer4(), req.Peer6())
if err != nil {
return err.(FailedToRetrievePeers)
return FailedToRetrievePeers(err.Error())
}
return next(cfg, req, resp)

View file

@ -0,0 +1,12 @@
## Swarm Interaction Middleware
This package provides the announce middleware that modifies peer data stored in the `store` package.
### `store_swarm_interaction`
The `store_swarm_interaction` middleware updates the data stored in the `peerStore` based on the announce.
### Important things to notice
It is recommended to have this middleware run before the `store_response` middleware.
The `store_response` middleware assumes the store to be already updated by the announce.

View file

@ -0,0 +1,75 @@
// Copyright 2016 The Chihaya Authors. All rights reserved.
// Use of this source code is governed by the BSD 2-Clause license,
// which can be found in the LICENSE file.
package response
import (
"github.com/chihaya/chihaya"
"github.com/chihaya/chihaya/pkg/event"
"github.com/chihaya/chihaya/server/store"
"github.com/chihaya/chihaya/tracker"
)
func init() {
tracker.RegisterAnnounceMiddleware("store_swarm_interaction", announceSwarmInteraction)
}
// FailedSwarmInteraction represents an error that indicates that the
// interaction of a peer with a swarm failed.
type FailedSwarmInteraction string
// Error satisfies the error interface for FailedSwarmInteraction.
func (f FailedSwarmInteraction) Error() string { return string(f) }
// announceSwarmInteraction provides a middleware that manages swarm
// interactions for a peer based on the announce.
func announceSwarmInteraction(next tracker.AnnounceHandler) tracker.AnnounceHandler {
return func(cfg *chihaya.TrackerConfig, req *chihaya.AnnounceRequest, resp *chihaya.AnnounceResponse) (err error) {
if req.IPv4 != nil {
err = updatePeerStore(req, req.Peer4())
if err != nil {
return FailedSwarmInteraction(err.Error())
}
}
if req.IPv6 != nil {
err = updatePeerStore(req, req.Peer6())
if err != nil {
return FailedSwarmInteraction(err.Error())
}
}
return next(cfg, req, resp)
}
}
func updatePeerStore(req *chihaya.AnnounceRequest, peer chihaya.Peer) (err error) {
storage := store.MustGetStore()
switch {
case req.Event == event.Stopped:
err = storage.DeleteSeeder(req.InfoHash, peer)
if err != nil && err != store.ErrResourceDoesNotExist {
return err
}
err = storage.DeleteLeecher(req.InfoHash, peer)
if err != nil && err != store.ErrResourceDoesNotExist {
return err
}
case req.Event == event.Completed || req.Left == 0:
err = storage.GraduateLeecher(req.InfoHash, peer)
if err != nil {
return err
}
default:
err = storage.PutLeecher(req.InfoHash, peer)
if err != nil {
return err
}
}
return nil
}

View file

@ -18,25 +18,35 @@ type PeerStore interface {
// PutSeeder adds a seeder for the infoHash to the PeerStore.
PutSeeder(infoHash chihaya.InfoHash, p chihaya.Peer) error
// DeleteSeeder removes a seeder for the infoHash from the PeerStore.
//
// Returns ErrResourceDoesNotExist if the infoHash or peer does not
// exist.
DeleteSeeder(infoHash chihaya.InfoHash, p chihaya.Peer) error
// PutLeecher adds a leecher for the infoHash to the PeerStore.
PutLeecher(infoHash chihaya.InfoHash, p chihaya.Peer) error
// DeleteLeecher removes a leecher for the infoHash from the PeerStore.
//
// Returns ErrResourceDoesNotExist if the infoHash or peer does not
// exist.
DeleteLeecher(infoHash chihaya.InfoHash, p chihaya.Peer) error
// GraduateLeecher promotes a peer from a leecher to a seeder for the
// infoHash within the PeerStore.
//
// If the given Peer is not a leecher, it will still be added to the
// list of seeders and no error will be returned.
GraduateLeecher(infoHash chihaya.InfoHash, p chihaya.Peer) error
// AnnouncePeers returns a list of both IPv4, and IPv6 peers for an
// announce.
//
// If seeder is true then the peers returned will only be leechers, the
// ammount of leechers returned will be the smaller value of numWant or the
// available leechers.
// ammount of leechers returned will be the smaller value of numWant or
// the available leechers.
// If it is false then seeders will be returned up until numWant or the
// available seeders, whichever is smaller. If the available seeders is less
// than numWant then peers are returned until numWant or they run out.
// available seeders, whichever is smaller. If the available seeders is
// less than numWant then peers are returned until numWant or they run out.
AnnouncePeers(infoHash chihaya.InfoHash, seeder bool, numWant int, peer4, peer6 chihaya.Peer) (peers, peers6 []chihaya.Peer, err error)
// CollectGarbage deletes peers from the peerStore which are older than the
// cutoff time.

View file

@ -23,6 +23,10 @@ func init() {
server.Register("store", constructor)
}
// ErrResourceDoesNotExist is the error returned by all delete methods in the
// store if the requested resource does not exist.
var ErrResourceDoesNotExist = errors.New("resource does not exist")
func constructor(srvcfg *chihaya.ServerConfig, tkr *tracker.Tracker) (server.Server, error) {
if theStore == nil {
cfg, err := newConfig(srvcfg)

View file

@ -10,8 +10,16 @@ var stringStoreDrivers = make(map[string]StringStoreDriver)
// StringStore represents an interface for manipulating strings.
type StringStore interface {
// PutString adds the given string to the StringStore.
PutString(s string) error
// HasString returns whether or not the StringStore contains the given
// string.
HasString(s string) (bool, error)
// RemoveString removes the string from the string store.
// Returns ErrResourceDoesNotExist if the given string is not contained
// in the store.
RemoveString(s string) error
}

View file

@ -8,8 +8,8 @@ package tracker
import (
"errors"
"fmt"
"github.com/chihaya/chihaya"
)