Merge pull request #301 from jzelinskie/memorybysubnet
Storage Drivers (starting w/ subnet memory)
commit edef032381
8 changed files with 1153 additions and 151 deletions

@@ -7,13 +7,16 @@ import (
"gopkg.in/yaml.v2"
httpfrontend "github.com/chihaya/chihaya/frontend/http"
udpfrontend "github.com/chihaya/chihaya/frontend/udp"
"github.com/chihaya/chihaya/frontend/http"
"github.com/chihaya/chihaya/frontend/udp"
"github.com/chihaya/chihaya/middleware"
"github.com/chihaya/chihaya/middleware/clientapproval"
"github.com/chihaya/chihaya/middleware/jwt"
"github.com/chihaya/chihaya/middleware/varinterval"
"github.com/chihaya/chihaya/storage/memory"

// Imported to register as Storage Drivers.
_ "github.com/chihaya/chihaya/storage/memory"
_ "github.com/chihaya/chihaya/storage/memorybysubnet"
)

type hookConfig struct {

@@ -33,15 +36,20 @@ func (hookCfgs hookConfigs) Names() (hookNames []string) {
return
}

type storageConfig struct {
Name string `yaml:"name"`
Config interface{} `yaml:"config"`
}

// Config represents the configuration used for executing Chihaya.
type Config struct {
middleware.Config `yaml:",inline"`
PrometheusAddr string `yaml:"prometheus_addr"`
HTTPConfig httpfrontend.Config `yaml:"http"`
UDPConfig udpfrontend.Config `yaml:"udp"`
Storage memory.Config `yaml:"storage"`
PreHooks hookConfigs `yaml:"prehooks"`
PostHooks hookConfigs `yaml:"posthooks"`
PrometheusAddr string `yaml:"prometheus_addr"`
HTTPConfig http.Config `yaml:"http"`
UDPConfig udp.Config `yaml:"udp"`
Storage storageConfig `yaml:"storage"`
PreHooks hookConfigs `yaml:"prehooks"`
PostHooks hookConfigs `yaml:"posthooks"`
}

// CreateHooks creates instances of Hooks for all of the PreHooks and PostHooks

@@ -17,7 +17,6 @@ import (
"github.com/chihaya/chihaya/pkg/prometheus"
"github.com/chihaya/chihaya/pkg/stop"
"github.com/chihaya/chihaya/storage"
"github.com/chihaya/chihaya/storage/memory"
)

// Run represents the state of a running instance of Chihaya.

@@ -53,11 +52,11 @@ func (r *Run) Start(ps storage.PeerStore) error {
r.sg.Add(prometheus.NewServer(cfg.PrometheusAddr))

if ps == nil {
log.WithFields(cfg.Storage.LogFields()).Info("starting storage")
ps, err = memory.New(cfg.Storage)
ps, err = storage.NewPeerStore(cfg.Storage.Name, cfg.Storage.Config)
if err != nil {
return errors.New("failed to create memory storage: " + err.Error())
}
log.WithFields(ps.LogFields()).Info("started storage")
}
r.peerStore = ps

docs/storage/memorybysubnet.md (new file, 51 lines)
@@ -0,0 +1,51 @@
# Memory Subnet Storage

This storage system stores all peer data ephemerally in memory and prioritizes peers in the same subnet.

## Use Case

When the network being used for BitTorrent traffic is organized such that IP addresses can be mapped to physical locations, this storage encourages peers to transfer data between physically closer peers.

## Configuration

```yaml
chihaya:
  storage:
    name: memorybysubnet
    config:
      # The frequency at which stale peers are removed.
      gc_interval: 14m

      # The frequency at which metrics are pushed into a local Prometheus endpoint.
      prometheus_reporting_interval: 1s

      # The amount of time until a peer is considered stale.
      # To avoid churn, keep this slightly larger than `announce_interval`.
      peer_lifetime: 16m

      # The number of partitions data will be divided into in order to provide a
      # higher degree of parallelism.
      shard_count: 1024

      # The number of bits used to mask IPv4 peers' addresses such that peers with the same masked address are returned first from announces.
      preferred_ipv4_subnet_mask_bits_set: 8

      # The number of bits used to mask IPv6 peers' addresses such that peers with the same masked address are returned first from announces.
      preferred_ipv6_subnet_mask_bits_set: 16
```

## Implementation

The implementation of this storage strives to remain as similar to the `memory` storage system as possible.

Seeders and leechers for a particular InfoHash are organized into maps keyed by subnet, and each peer then maps to its last announce time:

```go
type swarm struct {
	seeders  map[peerSubnet]map[serializedPeer]int64
	leechers map[peerSubnet]map[serializedPeer]int64
}
```

This incurs the allocation and maintenance overhead of many extra maps.
Expect approximately a 2x slowdown in performance compared to `memory`.
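
To make the masking concrete, here is a minimal, standalone sketch of how a subnet key can be derived from a peer's address and the configured mask bits. It mirrors the `newPeerSubnet` helper added in `storage/memorybysubnet/peer_store.go`, but the `subnetKey` function, the `main` package, and the sample addresses are purely illustrative; only `net.CIDRMask` and `IP.Mask` are real standard-library calls.

```go
package main

import (
	"fmt"
	"net"
)

// peerSubnet mirrors the key type used by the new package: a peer's address
// with the preferred mask applied, rendered as a string.
type peerSubnet string

// subnetKey is an illustrative helper (not part of the package) showing how a
// masked address becomes a map key; bits corresponds to
// preferred_ipv4_subnet_mask_bits_set (or the IPv6 equivalent) and totalBits
// is 32 for IPv4 or 128 for IPv6.
func subnetKey(ip net.IP, bits, totalBits int) peerSubnet {
	return peerSubnet(ip.Mask(net.CIDRMask(bits, totalBits)).String())
}

func main() {
	// With preferred_ipv4_subnet_mask_bits_set: 8, both 10.x.x.x peers share a
	// key, so they are handed to each other first on announce.
	fmt.Println(subnetKey(net.ParseIP("10.1.2.3").To4(), 8, 32))    // 10.0.0.0
	fmt.Println(subnetKey(net.ParseIP("10.9.8.7").To4(), 8, 32))    // 10.0.0.0
	fmt.Println(subnetKey(net.ParseIP("192.168.1.5").To4(), 8, 32)) // 192.0.0.0
}
```

Peers outside the preferred subnet are still returned, just after the same-subnet ones, as the `AnnouncePeers` implementation later in this diff shows.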

@@ -67,20 +67,22 @@ chihaya:

# This block defines configuration used for the storage of peer data.
storage:
# The frequency which stale peers are removed.
gc_interval: 14m
name: memory
config:
# The frequency which stale peers are removed.
gc_interval: 14m

# The amount of time until a peer is considered stale.
# To avoid churn, keep this slightly larger than `announce_interval`
peer_lifetime: 16m
# The amount of time until a peer is considered stale.
# To avoid churn, keep this slightly larger than `announce_interval`
peer_lifetime: 16m

# The number of partitions data will be divided into in order to provide a
# higher degree of parallelism.
shard_count: 1024
# The number of partitions data will be divided into in order to provide a
# higher degree of parallelism.
shards: 1024

# The interval at which metrics about the number of infohashes and peers
# are collected and posted to Prometheus.
prometheus_reporting_interval: 1s
# The interval at which metrics about the number of infohashes and peers
# are collected and posted to Prometheus.
prometheus_reporting_interval: 1s

# This block defines configuration used for middleware executed before a
# response has been returned to a BitTorrent client.
@@ -1,25 +1,38 @@
// Package memory implements the storage interface for a Chihaya
// BitTorrent tracker keeping peer data in memory.
package memory

import (
"encoding/binary"
"errors"
"net"
"runtime"
"strconv"
"sync"
"sync/atomic"
"time"

log "github.com/Sirupsen/logrus"
"github.com/prometheus/client_golang/prometheus"
"gopkg.in/yaml.v2"

"github.com/chihaya/chihaya/bittorrent"
"github.com/chihaya/chihaya/storage"
)

// Name is the name by which this peer store is registered with Chihaya.
const Name = "memory"

func init() {
prometheus.MustRegister(promGCDurationMilliseconds)
prometheus.MustRegister(promInfohashesCount)
prometheus.MustRegister(promSeedersCount, promLeechersCount)
// Register Prometheus metrics.
prometheus.MustRegister(
promGCDurationMilliseconds,
promInfohashesCount,
promSeedersCount,
promLeechersCount,
)

// Register the storage driver.
storage.RegisterDriver(Name, driver{})
}

var promGCDurationMilliseconds = prometheus.NewHistogram(prometheus.HistogramOpts{

@@ -30,7 +43,7 @@ var promGCDurationMilliseconds = prometheus.NewHistogram(prometheus.HistogramOpt
var promInfohashesCount = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "chihaya_storage_infohashes_count",
Help: "The number of infohashes tracked",
Help: "The number of Infohashes tracked",
})

var promSeedersCount = prometheus.NewGauge(prometheus.GaugeOpts{

@@ -48,9 +61,24 @@ func recordGCDuration(duration time.Duration) {
promGCDurationMilliseconds.Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond))
}

// ErrInvalidGCInterval is returned for a GarbageCollectionInterval that is
// less than or equal to zero.
var ErrInvalidGCInterval = errors.New("invalid garbage collection interval")
type driver struct{}

func (d driver) NewPeerStore(icfg interface{}) (storage.PeerStore, error) {
// Marshal the config back into bytes.
bytes, err := yaml.Marshal(icfg)
if err != nil {
return nil, err
}

// Unmarshal the bytes into the proper config type.
var cfg Config
err = yaml.Unmarshal(bytes, &cfg)
if err != nil {
return nil, err
}

return New(cfg)
}

// Config holds the configuration of a memory PeerStore.
type Config struct {

@@ -63,41 +91,72 @@ type Config struct {
// LogFields renders the current config as a set of Logrus fields.
func (cfg Config) LogFields() log.Fields {
return log.Fields{
"gcInterval": cfg.GarbageCollectionInterval,
"peerLifetime": cfg.PeerLifetime,
"shardCount": cfg.ShardCount,
"name": Name,
"gcInterval": cfg.GarbageCollectionInterval,
"promReportInterval": cfg.PrometheusReportingInterval,
"peerLifetime": cfg.PeerLifetime,
"shardCount": cfg.ShardCount,
}
}

// New creates a new PeerStore backed by memory.
func New(cfg Config) (storage.PeerStore, error) {
var shardCount int
// Validate sanity checks values set in a config and returns a new config with
// default values replacing anything that is invalid.
//
// This function warns to the logger when a value is changed.
func (cfg Config) Validate() Config {
validcfg := cfg
if cfg.ShardCount > 0 {
shardCount = cfg.ShardCount
validcfg.ShardCount = cfg.ShardCount
} else {
log.Warnln("storage: shardCount not configured, using 1 as default value.")
shardCount = 1
validcfg.ShardCount = 1024
log.WithFields(log.Fields{
"name": Name + ".ShardCount",
"provided": strconv.Itoa(cfg.ShardCount),
"default": strconv.Itoa(validcfg.ShardCount),
}).Warnln("falling back to default configuration")
}

if cfg.GarbageCollectionInterval <= 0 {
return nil, ErrInvalidGCInterval
validcfg.GarbageCollectionInterval = time.Minute * 14
log.WithFields(log.Fields{
"name": Name + ".GarbageCollectionInterval",
"provided": cfg.GarbageCollectionInterval,
"default": validcfg.GarbageCollectionInterval,
}).Warnln("falling back to default configuration")
}

if cfg.PrometheusReportingInterval <= 0 {
validcfg.PrometheusReportingInterval = time.Second * 1
log.WithFields(log.Fields{
"name": Name + ".PrometheusReportingInterval",
"provided": cfg.PrometheusReportingInterval,
"default": validcfg.PrometheusReportingInterval,
}).Warnln("falling back to default configuration")
}

return validcfg
}

// New creates a new PeerStore backed by memory.
func New(provided Config) (storage.PeerStore, error) {
cfg := provided.Validate()
ps := &peerStore{
shards: make([]*peerShard, shardCount*2),
closing: make(chan struct{}),
cfg: cfg,
shards: make([]*peerShard, cfg.ShardCount*2),
closed: make(chan struct{}),
}

for i := 0; i < shardCount*2; i++ {
for i := 0; i < cfg.ShardCount*2; i++ {
ps.shards[i] = &peerShard{swarms: make(map[bittorrent.InfoHash]swarm)}
}

// Start a goroutine for garbage collection.
ps.wg.Add(1)
go func() {
defer ps.wg.Done()
for {
select {
case <-ps.closing:
case <-ps.closed:
return
case <-time.After(cfg.GarbageCollectionInterval):
before := time.Now().Add(-cfg.PeerLifetime)

@@ -107,13 +166,14 @@ func New(cfg Config) (storage.PeerStore, error) {
}
}()

// Start a goroutine for updating our cached system clock.
ps.wg.Add(1)
go func() {
defer ps.wg.Done()
t := time.NewTicker(1 * time.Second)
for {
select {
case <-ps.closing:
case <-ps.closed:
t.Stop()
return
case now := <-t.C:

@@ -122,17 +182,14 @@ func New(cfg Config) (storage.PeerStore, error) {
}
}()

// Start a goroutine for reporting statistics to Prometheus.
ps.wg.Add(1)
go func() {
defer ps.wg.Done()
if cfg.PrometheusReportingInterval <= 0 {
cfg.PrometheusReportingInterval = 1
log.Warn("storage: PrometheusReportingInterval not specified/invalid, defaulting to 1 second")
}
t := time.NewTicker(cfg.PrometheusReportingInterval)
for {
select {
case <-ps.closing:
case <-ps.closed:
t.Stop()
return
case <-t.C:

@@ -148,67 +205,6 @@ func New(cfg Config) (storage.PeerStore, error) {

type serializedPeer string

type peerShard struct {
swarms map[bittorrent.InfoHash]swarm
numSeeders uint64
numLeechers uint64
sync.RWMutex
}

type swarm struct {
// map serialized peer to mtime
seeders map[serializedPeer]int64
leechers map[serializedPeer]int64
}

type peerStore struct {
shards []*peerShard
closing chan struct{}
// clock stores the current time nanoseconds, updated every second.
// Must be accessed atomically!
clock int64
wg sync.WaitGroup
}

// populateProm aggregates metrics over all shards and then posts them to
// prometheus.
func (ps *peerStore) populateProm() {
var numInfohashes, numSeeders, numLeechers uint64

for _, s := range ps.shards {
s.RLock()
numInfohashes += uint64(len(s.swarms))
numSeeders += s.numSeeders
numLeechers += s.numLeechers
s.RUnlock()
}

promInfohashesCount.Set(float64(numInfohashes))
promSeedersCount.Set(float64(numSeeders))
promLeechersCount.Set(float64(numLeechers))
}

var _ storage.PeerStore = &peerStore{}

func (ps *peerStore) getClock() int64 {
return atomic.LoadInt64(&ps.clock)
}

func (ps *peerStore) setClock(to int64) {
atomic.StoreInt64(&ps.clock, to)
}

func (ps *peerStore) shardIndex(infoHash bittorrent.InfoHash, af bittorrent.AddressFamily) uint32 {
// There are twice the amount of shards specified by the user, the first
// half is dedicated to IPv4 swarms and the second half is dedicated to
// IPv6 swarms.
idx := binary.BigEndian.Uint32(infoHash[:4]) % (uint32(len(ps.shards)) / 2)
if af == bittorrent.IPv6 {
idx += uint32(len(ps.shards) / 2)
}
return idx
}

func newPeerKey(p bittorrent.Peer) serializedPeer {
b := make([]byte, 20+2+len(p.IP.IP))
copy(b[:20], p.ID[:])

@@ -236,9 +232,73 @@ func decodePeerKey(pk serializedPeer) bittorrent.Peer {
return peer
}

type peerShard struct {
swarms map[bittorrent.InfoHash]swarm
numSeeders uint64
numLeechers uint64
sync.RWMutex
}

type swarm struct {
// map serialized peer to mtime
seeders map[serializedPeer]int64
leechers map[serializedPeer]int64
}

type peerStore struct {
cfg Config
shards []*peerShard

// clock stores the current time nanoseconds, updated every second.
// Must be accessed atomically!
clock int64

closed chan struct{}
wg sync.WaitGroup
}

var _ storage.PeerStore = &peerStore{}

// populateProm aggregates metrics over all shards and then posts them to
// prometheus.
func (ps *peerStore) populateProm() {
var numInfohashes, numSeeders, numLeechers uint64

for _, s := range ps.shards {
s.RLock()
numInfohashes += uint64(len(s.swarms))
numSeeders += s.numSeeders
numLeechers += s.numLeechers
s.RUnlock()
}

promInfohashesCount.Set(float64(numInfohashes))
promSeedersCount.Set(float64(numSeeders))
promLeechersCount.Set(float64(numLeechers))
}

func (ps *peerStore) getClock() int64 {
return atomic.LoadInt64(&ps.clock)
}

func (ps *peerStore) setClock(to int64) {
atomic.StoreInt64(&ps.clock, to)
}

func (ps *peerStore) shardIndex(infoHash bittorrent.InfoHash, af bittorrent.AddressFamily) uint32 {
// There are twice the amount of shards specified by the user, the first
// half is dedicated to IPv4 swarms and the second half is dedicated to
// IPv6 swarms.
idx := binary.BigEndian.Uint32(infoHash[:4]) % (uint32(len(ps.shards)) / 2)
if af == bittorrent.IPv6 {
idx += uint32(len(ps.shards) / 2)
}
return idx
}

func (ps *peerStore) PutSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error {
select {
case <-ps.closing:
case <-ps.closed:
panic("attempted to interact with stopped memory store")
default:
}

@@ -255,10 +315,12 @@ func (ps *peerStore) PutSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error
}
}

// If this peer isn't already a seeder, update the stats for the swarm.
if _, ok := shard.swarms[ih].seeders[pk]; !ok {
// new peer
shard.numSeeders++
}

// Update the peer in the swarm.
shard.swarms[ih].seeders[pk] = ps.getClock()

shard.Unlock()

@@ -267,7 +329,7 @@ func (ps *peerStore) PutSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error

func (ps *peerStore) DeleteSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error {
select {
case <-ps.closing:
case <-ps.closed:
panic("attempted to interact with stopped memory store")
default:
}

@@ -287,11 +349,8 @@ func (ps *peerStore) DeleteSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) err
return storage.ErrResourceDoesNotExist
}

if _, ok := shard.swarms[ih].seeders[pk]; ok {
// seeder actually removed
shard.numSeeders--
delete(shard.swarms[ih].seeders, pk)
}
shard.numSeeders--
delete(shard.swarms[ih].seeders, pk)

if len(shard.swarms[ih].seeders)|len(shard.swarms[ih].leechers) == 0 {
delete(shard.swarms, ih)

@@ -303,7 +362,7 @@ func (ps *peerStore) DeleteSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) err

func (ps *peerStore) PutLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error {
select {
case <-ps.closing:
case <-ps.closed:
panic("attempted to interact with stopped memory store")
default:
}

@@ -320,10 +379,12 @@ func (ps *peerStore) PutLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error
}
}

// If this peer isn't already a leecher, update the stats for the swarm.
if _, ok := shard.swarms[ih].leechers[pk]; !ok {
// new leecher
shard.numLeechers++
}

// Update the peer in the swarm.
shard.swarms[ih].leechers[pk] = ps.getClock()

shard.Unlock()

@@ -332,7 +393,7 @@ func (ps *peerStore) PutLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error

func (ps *peerStore) DeleteLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error {
select {
case <-ps.closing:
case <-ps.closed:
panic("attempted to interact with stopped memory store")
default:
}

@@ -352,11 +413,8 @@ func (ps *peerStore) DeleteLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) er
return storage.ErrResourceDoesNotExist
}

if _, ok := shard.swarms[ih].leechers[pk]; ok {
// leecher actually removed
shard.numLeechers--
delete(shard.swarms[ih].leechers, pk)
}
shard.numLeechers--
delete(shard.swarms[ih].leechers, pk)

if len(shard.swarms[ih].seeders)|len(shard.swarms[ih].leechers) == 0 {
delete(shard.swarms, ih)

@@ -368,7 +426,7 @@ func (ps *peerStore) DeleteLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) er

func (ps *peerStore) GraduateLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error {
select {
case <-ps.closing:
case <-ps.closed:
panic("attempted to interact with stopped memory store")
default:
}

@@ -385,16 +443,18 @@ func (ps *peerStore) GraduateLeecher(ih bittorrent.InfoHash, p bittorrent.Peer)
}
}

// If this peer is a leecher, update the stats for the swarm and remove them.
if _, ok := shard.swarms[ih].leechers[pk]; ok {
// leecher actually removed
shard.numLeechers--
delete(shard.swarms[ih].leechers, pk)
}

// If this peer isn't already a seeder, update the stats for the swarm.
if _, ok := shard.swarms[ih].seeders[pk]; !ok {
// new seeder
shard.numSeeders++
}

// Update the peer in the swarm.
shard.swarms[ih].seeders[pk] = ps.getClock()

shard.Unlock()

@@ -403,7 +463,7 @@ func (ps *peerStore) GraduateLeecher(ih bittorrent.InfoHash, p bittorrent.Peer)

func (ps *peerStore) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant int, announcer bittorrent.Peer) (peers []bittorrent.Peer, err error) {
select {
case <-ps.closing:
case <-ps.closed:
panic("attempted to interact with stopped memory store")
default:
}

@@ -419,41 +479,40 @@ func (ps *peerStore) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant
if seeder {
// Append leechers as possible.
leechers := shard.swarms[ih].leechers
for p := range leechers {
decodedPeer := decodePeerKey(p)
for pk := range leechers {
if numWant == 0 {
break
}

peers = append(peers, decodedPeer)
peers = append(peers, decodePeerKey(pk))
numWant--
}
} else {
// Append as many seeders as possible.
seeders := shard.swarms[ih].seeders
for p := range seeders {
decodedPeer := decodePeerKey(p)
for pk := range seeders {
if numWant == 0 {
break
}

peers = append(peers, decodedPeer)
peers = append(peers, decodePeerKey(pk))
numWant--
}

// Append leechers until we reach numWant.
leechers := shard.swarms[ih].leechers
if numWant > 0 {
for p := range leechers {
decodedPeer := decodePeerKey(p)
leechers := shard.swarms[ih].leechers
announcerPK := newPeerKey(announcer)
for pk := range leechers {
if pk == announcerPK {
continue
}

if numWant == 0 {
break
}

if decodedPeer.Equal(announcer) {
continue
}
peers = append(peers, decodedPeer)
peers = append(peers, decodePeerKey(pk))
numWant--
}
}

@@ -465,7 +524,7 @@ func (ps *peerStore) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant

func (ps *peerStore) ScrapeSwarm(ih bittorrent.InfoHash, addressFamily bittorrent.AddressFamily) (resp bittorrent.Scrape) {
select {
case <-ps.closing:
case <-ps.closed:
panic("attempted to interact with stopped memory store")
default:
}

@@ -493,13 +552,14 @@ func (ps *peerStore) ScrapeSwarm(ih bittorrent.InfoHash, addressFamily bittorren
// are being executed in parallel.
func (ps *peerStore) collectGarbage(cutoff time.Time) error {
select {
case <-ps.closing:
panic("attempted to interact with stopped memory store")
case <-ps.closed:
return nil
default:
}

cutoffUnix := cutoff.UnixNano()
start := time.Now()

for _, shard := range ps.shards {
shard.RLock()
var infohashes []bittorrent.InfoHash

@@ -520,15 +580,15 @@ func (ps *peerStore) collectGarbage(cutoff time.Time) error {

for pk, mtime := range shard.swarms[ih].leechers {
if mtime <= cutoffUnix {
delete(shard.swarms[ih].leechers, pk)
shard.numLeechers--
delete(shard.swarms[ih].leechers, pk)
}
}

for pk, mtime := range shard.swarms[ih].seeders {
if mtime <= cutoffUnix {
delete(shard.swarms[ih].seeders, pk)
shard.numSeeders--
delete(shard.swarms[ih].seeders, pk)
}
}

@@ -551,7 +611,7 @@ func (ps *peerStore) collectGarbage(cutoff time.Time) error {
func (ps *peerStore) Stop() <-chan error {
c := make(chan error)
go func() {
close(ps.closing)
close(ps.closed)
ps.wg.Wait()

// Explicitly deallocate our storage.

@@ -566,3 +626,7 @@ func (ps *peerStore) Stop() <-chan error {

return c
}

func (ps *peerStore) LogFields() log.Fields {
return ps.cfg.LogFields()
}

storage/memorybysubnet/peer_store.go (new file, 768 lines)
@@ -0,0 +1,768 @@
// Package memorybysubnet implements the storage interface for a Chihaya
// BitTorrent tracker keeping peer data in memory organized by a pre-configured
// subnet mask.
package memorybysubnet

import (
	"encoding/binary"
	"net"
	"runtime"
	"strconv"
	"sync"
	"sync/atomic"
	"time"

	log "github.com/Sirupsen/logrus"
	"github.com/prometheus/client_golang/prometheus"
	"gopkg.in/yaml.v2"

	"github.com/chihaya/chihaya/bittorrent"
	"github.com/chihaya/chihaya/storage"
)

// Name is the name by which this peer store is registered with Chihaya.
const Name = "memorybysubnet"

func init() {
	// Register Prometheus metrics.
	prometheus.MustRegister(
		promGCDurationMilliseconds,
		promInfohashesCount,
		promSeedersCount,
		promLeechersCount,
	)

	// Register the storage driver.
	storage.RegisterDriver(Name, driver{})
}

var promGCDurationMilliseconds = prometheus.NewHistogram(prometheus.HistogramOpts{
	Name: "chihaya_storage_gc_duration_milliseconds",
	Help: "The time it takes to perform storage garbage collection",
	Buckets: prometheus.ExponentialBuckets(9.375, 2, 10),
})

var promInfohashesCount = prometheus.NewGauge(prometheus.GaugeOpts{
	Name: "chihaya_storage_infohashes_count",
	Help: "The number of Infohashes tracked",
})

var promSeedersCount = prometheus.NewGauge(prometheus.GaugeOpts{
	Name: "chihaya_storage_seeders_count",
	Help: "The number of seeders tracked",
})

var promLeechersCount = prometheus.NewGauge(prometheus.GaugeOpts{
	Name: "chihaya_storage_leechers_count",
	Help: "The number of leechers tracked",
})

// recordGCDuration records the duration of a GC sweep.
func recordGCDuration(duration time.Duration) {
	promGCDurationMilliseconds.Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond))
}

type driver struct{}

func (d driver) NewPeerStore(icfg interface{}) (storage.PeerStore, error) {
	// Marshal the config back into bytes.
	bytes, err := yaml.Marshal(icfg)
	if err != nil {
		return nil, err
	}

	// Unmarshal the bytes into the proper config type.
	var cfg Config
	err = yaml.Unmarshal(bytes, &cfg)
	if err != nil {
		return nil, err
	}

	return New(cfg)
}

// Config holds the configuration of a memorybysubnet PeerStore.
type Config struct {
	GarbageCollectionInterval time.Duration `yaml:"gc_interval"`
	PrometheusReportingInterval time.Duration `yaml:"prometheus_reporting_interval"`
	PeerLifetime time.Duration `yaml:"peer_lifetime"`
	ShardCount int `yaml:"shard_count"`
	PreferredIPv4SubnetMaskBitsSet int `yaml:"preferred_ipv4_subnet_mask_bits_set"`
	PreferredIPv6SubnetMaskBitsSet int `yaml:"preferred_ipv6_subnet_mask_bits_set"`
}

// LogFields renders the current config as a set of Logrus fields.
func (cfg Config) LogFields() log.Fields {
	return log.Fields{
		"name": Name,
		"gcInterval": cfg.GarbageCollectionInterval,
		"promReportInterval": cfg.PrometheusReportingInterval,
		"peerLifetime": cfg.PeerLifetime,
		"shardCount": cfg.ShardCount,
		"prefIPv4Mask": cfg.PreferredIPv4SubnetMaskBitsSet,
		"prefIPv6Mask": cfg.PreferredIPv6SubnetMaskBitsSet,
	}
}

// Validate sanity checks values set in a config and returns a new config with
// default values replacing anything that is invalid.
//
// This function warns to the logger when a value is changed.
func (cfg Config) Validate() Config {
	validcfg := cfg
	if cfg.ShardCount > 0 {
		validcfg.ShardCount = cfg.ShardCount
	} else {
		validcfg.ShardCount = 1024
		log.WithFields(log.Fields{
			"name": Name + ".ShardCount",
			"provided": strconv.Itoa(cfg.ShardCount),
			"default": strconv.Itoa(validcfg.ShardCount),
		}).Warnln("falling back to default configuration")
	}

	if cfg.GarbageCollectionInterval <= 0 {
		validcfg.GarbageCollectionInterval = time.Minute * 14
		log.WithFields(log.Fields{
			"name": Name + ".GarbageCollectionInterval",
			"provided": cfg.GarbageCollectionInterval,
			"default": validcfg.GarbageCollectionInterval,
		}).Warnln("falling back to default configuration")
	}

	if cfg.PrometheusReportingInterval <= 0 {
		validcfg.PrometheusReportingInterval = time.Second * 1
		log.WithFields(log.Fields{
			"name": Name + ".PrometheusReportingInterval",
			"provided": cfg.PrometheusReportingInterval,
			"default": validcfg.PrometheusReportingInterval,
		}).Warnln("falling back to default configuration")
	}

	return validcfg
}

// New creates a new PeerStore backed by memory that organizes peers by a
// pre-configured subnet mask.
func New(provided Config) (storage.PeerStore, error) {
	cfg := provided.Validate()
	ps := &peerStore{
		cfg: cfg,
		ipv4Mask: net.CIDRMask(cfg.PreferredIPv4SubnetMaskBitsSet, 32),
		ipv6Mask: net.CIDRMask(cfg.PreferredIPv6SubnetMaskBitsSet, 128),
		shards: make([]*peerShard, cfg.ShardCount*2),
		closed: make(chan struct{}),
	}

	for i := 0; i < cfg.ShardCount*2; i++ {
		ps.shards[i] = &peerShard{swarms: make(map[bittorrent.InfoHash]swarm)}
	}

	// Start a goroutine for garbage collection.
	ps.wg.Add(1)
	go func() {
		defer ps.wg.Done()
		for {
			select {
			case <-ps.closed:
				return
			case <-time.After(cfg.GarbageCollectionInterval):
				before := time.Now().Add(-cfg.PeerLifetime)
				log.Debugln("memory: purging peers with no announces since", before)
				ps.collectGarbage(before)
			}
		}
	}()

	// Start a goroutine for updating our cached system clock.
	ps.wg.Add(1)
	go func() {
		defer ps.wg.Done()
		t := time.NewTicker(1 * time.Second)
		for {
			select {
			case <-ps.closed:
				t.Stop()
				return
			case now := <-t.C:
				ps.setClock(now.UnixNano())
			}
		}
	}()

	// Start a goroutine for reporting statistics to Prometheus.
	ps.wg.Add(1)
	go func() {
		defer ps.wg.Done()
		t := time.NewTicker(cfg.PrometheusReportingInterval)
		for {
			select {
			case <-ps.closed:
				t.Stop()
				return
			case <-t.C:
				before := time.Now()
				ps.populateProm()
				log.Debugf("memory: populateProm() took %s", time.Since(before))
			}
		}
	}()

	return ps, nil
}

type serializedPeer string

func newPeerKey(p bittorrent.Peer) serializedPeer {
	b := make([]byte, 20+2+len(p.IP.IP))
	copy(b[:20], p.ID[:])
	binary.BigEndian.PutUint16(b[20:22], p.Port)
	copy(b[22:], p.IP.IP)

	return serializedPeer(b)
}

func decodePeerKey(pk serializedPeer) bittorrent.Peer {
	peer := bittorrent.Peer{
		ID: bittorrent.PeerIDFromString(string(pk[:20])),
		Port: binary.BigEndian.Uint16([]byte(pk[20:22])),
		IP: bittorrent.IP{IP: net.IP(pk[22:])}}

	if ip := peer.IP.To4(); ip != nil {
		peer.IP.IP = ip
		peer.IP.AddressFamily = bittorrent.IPv4
	} else if len(peer.IP.IP) == net.IPv6len { // implies peer.IP.To4() == nil
		peer.IP.AddressFamily = bittorrent.IPv6
	} else {
		panic("IP is neither v4 nor v6")
	}

	return peer
}

type peerSubnet string

func newPeerSubnet(ip bittorrent.IP, ipv4Mask, ipv6Mask net.IPMask) peerSubnet {
	var maskedIP net.IP
	switch ip.AddressFamily {
	case bittorrent.IPv4:
		maskedIP = ip.Mask(ipv4Mask)
	case bittorrent.IPv6:
		maskedIP = ip.Mask(ipv6Mask)
	default:
		panic("IP is neither v4 nor v6")
	}

	return peerSubnet(maskedIP.String())
}

type peerShard struct {
	swarms map[bittorrent.InfoHash]swarm
	numSeeders uint64
	numLeechers uint64
	sync.RWMutex
}

type swarm struct {
	seeders map[peerSubnet]map[serializedPeer]int64
	leechers map[peerSubnet]map[serializedPeer]int64
}

func (s swarm) lenSeeders() (i int) {
	for _, subnet := range s.seeders {
		i += len(subnet)
	}
	return
}

func (s swarm) lenLeechers() (i int) {
	for _, subnet := range s.leechers {
		i += len(subnet)
	}
	return
}

type peerStore struct {
	cfg Config
	ipv4Mask net.IPMask
	ipv6Mask net.IPMask
	shards []*peerShard

	// clock stores the current time nanoseconds, updated every second.
	// Must be accessed atomically!
	clock int64

	closed chan struct{}
	wg sync.WaitGroup
}

var _ storage.PeerStore = &peerStore{}

// populateProm aggregates metrics over all shards and then posts them to
// prometheus.
func (ps *peerStore) populateProm() {
	var numInfohashes, numSeeders, numLeechers uint64

	for _, s := range ps.shards {
		s.RLock()
		numInfohashes += uint64(len(s.swarms))
		numSeeders += s.numSeeders
		numLeechers += s.numLeechers
		s.RUnlock()
	}

	promInfohashesCount.Set(float64(numInfohashes))
	promSeedersCount.Set(float64(numSeeders))
	promLeechersCount.Set(float64(numLeechers))
}

func (ps *peerStore) getClock() int64 {
	return atomic.LoadInt64(&ps.clock)
}

func (ps *peerStore) setClock(to int64) {
	atomic.StoreInt64(&ps.clock, to)
}

func (ps *peerStore) shardIndex(infoHash bittorrent.InfoHash, af bittorrent.AddressFamily) uint32 {
	// There are twice the amount of shards specified by the user, the first
	// half is dedicated to IPv4 swarms and the second half is dedicated to
	// IPv6 swarms.
	idx := binary.BigEndian.Uint32(infoHash[:4]) % (uint32(len(ps.shards)) / 2)
	if af == bittorrent.IPv6 {
		idx += uint32(len(ps.shards) / 2)
	}
	return idx
}

func (ps *peerStore) PutSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error {
	select {
	case <-ps.closed:
		panic("attempted to interact with stopped memory store")
	default:
	}

	pk := newPeerKey(p)

	shard := ps.shards[ps.shardIndex(ih, p.IP.AddressFamily)]
	shard.Lock()

	if _, ok := shard.swarms[ih]; !ok {
		shard.swarms[ih] = swarm{
			seeders: make(map[peerSubnet]map[serializedPeer]int64),
			leechers: make(map[peerSubnet]map[serializedPeer]int64),
		}
	}

	preferredSubnet := newPeerSubnet(p.IP, ps.ipv4Mask, ps.ipv6Mask)

	// Allocate a new map if necessary.
	if shard.swarms[ih].seeders[preferredSubnet] == nil {
		shard.swarms[ih].seeders[preferredSubnet] = make(map[serializedPeer]int64)
	}

	// If this peer isn't already a seeder, update the stats for the swarm.
	if _, ok := shard.swarms[ih].seeders[preferredSubnet][pk]; !ok {
		shard.numSeeders++
	}

	// Update the peer in the swarm.
	shard.swarms[ih].seeders[preferredSubnet][pk] = ps.getClock()

	shard.Unlock()
	return nil
}

func (ps *peerStore) DeleteSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error {
	select {
	case <-ps.closed:
		panic("attempted to interact with stopped memory store")
	default:
	}

	pk := newPeerKey(p)

	shard := ps.shards[ps.shardIndex(ih, p.IP.AddressFamily)]
	shard.Lock()

	if _, ok := shard.swarms[ih]; !ok {
		shard.Unlock()
		return storage.ErrResourceDoesNotExist
	}

	preferredSubnet := newPeerSubnet(p.IP, ps.ipv4Mask, ps.ipv6Mask)
	if _, ok := shard.swarms[ih].seeders[preferredSubnet][pk]; !ok {
		shard.Unlock()
		return storage.ErrResourceDoesNotExist
	}

	shard.numSeeders--
	delete(shard.swarms[ih].seeders[preferredSubnet], pk)

	if shard.swarms[ih].lenSeeders()|shard.swarms[ih].lenLeechers() == 0 {
		delete(shard.swarms, ih)
	} else if len(shard.swarms[ih].seeders[preferredSubnet]) == 0 {
		delete(shard.swarms[ih].seeders, preferredSubnet)
	}

	shard.Unlock()
	return nil
}

func (ps *peerStore) PutLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error {
	select {
	case <-ps.closed:
		panic("attempted to interact with stopped memory store")
	default:
	}

	pk := newPeerKey(p)

	shard := ps.shards[ps.shardIndex(ih, p.IP.AddressFamily)]
	shard.Lock()

	if _, ok := shard.swarms[ih]; !ok {
		shard.swarms[ih] = swarm{
			seeders: make(map[peerSubnet]map[serializedPeer]int64),
			leechers: make(map[peerSubnet]map[serializedPeer]int64),
		}
	}

	preferredSubnet := newPeerSubnet(p.IP, ps.ipv4Mask, ps.ipv6Mask)

	// Allocate a new map if necessary.
	if shard.swarms[ih].leechers[preferredSubnet] == nil {
		shard.swarms[ih].leechers[preferredSubnet] = make(map[serializedPeer]int64)
	}

	// If this peer isn't already a leecher, update the stats for the swarm.
	if _, ok := shard.swarms[ih].leechers[preferredSubnet][pk]; !ok {
		shard.numLeechers++
	}

	// Update the peer in the swarm.
	shard.swarms[ih].leechers[preferredSubnet][pk] = ps.getClock()

	shard.Unlock()
	return nil
}

func (ps *peerStore) DeleteLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error {
	select {
	case <-ps.closed:
		panic("attempted to interact with stopped memory store")
	default:
	}

	pk := newPeerKey(p)

	shard := ps.shards[ps.shardIndex(ih, p.IP.AddressFamily)]
	shard.Lock()

	if _, ok := shard.swarms[ih]; !ok {
		shard.Unlock()
		return storage.ErrResourceDoesNotExist
	}

	preferredSubnet := newPeerSubnet(p.IP, ps.ipv4Mask, ps.ipv6Mask)
	if _, ok := shard.swarms[ih].leechers[preferredSubnet][pk]; !ok {
		shard.Unlock()
		return storage.ErrResourceDoesNotExist
	}

	shard.numLeechers--
	delete(shard.swarms[ih].leechers[preferredSubnet], pk)

	if shard.swarms[ih].lenSeeders()|shard.swarms[ih].lenLeechers() == 0 {
		delete(shard.swarms, ih)
	} else if len(shard.swarms[ih].leechers[preferredSubnet]) == 0 {
		delete(shard.swarms[ih].leechers, preferredSubnet)
	}

	shard.Unlock()
	return nil
}

func (ps *peerStore) GraduateLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error {
	select {
	case <-ps.closed:
		panic("attempted to interact with stopped memory store")
	default:
	}

	pk := newPeerKey(p)

	shard := ps.shards[ps.shardIndex(ih, p.IP.AddressFamily)]
	shard.Lock()

	if _, ok := shard.swarms[ih]; !ok {
		shard.swarms[ih] = swarm{
			seeders: make(map[peerSubnet]map[serializedPeer]int64),
			leechers: make(map[peerSubnet]map[serializedPeer]int64),
		}
	}

	// If this peer is a leecher, update the stats for the swarm and remove them.
	preferredSubnet := newPeerSubnet(p.IP, ps.ipv4Mask, ps.ipv6Mask)
	if _, ok := shard.swarms[ih].leechers[preferredSubnet][pk]; ok {
		shard.numLeechers--
		delete(shard.swarms[ih].leechers[preferredSubnet], pk)
	}

	// Allocate a new map if necessary.
	if shard.swarms[ih].seeders[preferredSubnet] == nil {
		shard.swarms[ih].seeders[preferredSubnet] = make(map[serializedPeer]int64)
	}

	// If this peer isn't already a seeder, update the stats for the swarm.
	if _, ok := shard.swarms[ih].seeders[preferredSubnet][pk]; !ok {
		shard.numSeeders++
	}

	// Update the peer in the swarm.
	shard.swarms[ih].seeders[preferredSubnet][pk] = ps.getClock()

	shard.Unlock()
	return nil
}

func (ps *peerStore) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant int, announcer bittorrent.Peer) (peers []bittorrent.Peer, err error) {
	select {
	case <-ps.closed:
		panic("attempted to interact with stopped memory store")
	default:
	}

	shard := ps.shards[ps.shardIndex(ih, announcer.IP.AddressFamily)]
	shard.RLock()

	if _, ok := shard.swarms[ih]; !ok {
		shard.RUnlock()
		return nil, storage.ErrResourceDoesNotExist
	}

	preferredSubnet := newPeerSubnet(announcer.IP, ps.ipv4Mask, ps.ipv6Mask)

	if seeder {
		// Append as many close leechers as possible.
		closestLeechers := shard.swarms[ih].leechers[preferredSubnet]
		for pk := range closestLeechers {
			if numWant == 0 {
				break
			}

			peers = append(peers, decodePeerKey(pk))
			numWant--
		}

		// Append the rest of the leechers.
		if numWant > 0 {
			for subnet := range shard.swarms[ih].leechers {
				// Already appended from this subnet explicitly first.
				if subnet == preferredSubnet {
					continue
				}

				for pk := range shard.swarms[ih].leechers[subnet] {
					if numWant == 0 {
						break
					}

					peers = append(peers, decodePeerKey(pk))
					numWant--
				}
			}
		}
	} else {
		// Append as many close seeders as possible.
		closestSeeders := shard.swarms[ih].seeders[preferredSubnet]
		for pk := range closestSeeders {
			if numWant == 0 {
				break
			}

			peers = append(peers, decodePeerKey(pk))
			numWant--
		}

		// Append as many close leechers as possible.
		if numWant > 0 {
			closestLeechers := shard.swarms[ih].leechers[preferredSubnet]
			announcerPK := newPeerKey(announcer)
			for pk := range closestLeechers {
				if pk == announcerPK {
					continue
				}

				if numWant == 0 {
					break
				}

				peers = append(peers, decodePeerKey(pk))
				numWant--
			}
		}

		// Append the rest of the seeders.
		if numWant > 0 {
			for subnet := range shard.swarms[ih].seeders {
				// Already appended from this subnet explicitly first.
				if subnet == preferredSubnet {
					continue
				}

				for pk := range shard.swarms[ih].seeders[subnet] {
					if numWant == 0 {
						break
					}

					peers = append(peers, decodePeerKey(pk))
					numWant--
				}
			}
		}

		// Append the rest of the leechers.
		if numWant > 0 {
			for subnet := range shard.swarms[ih].leechers {
				// Already appended from this subnet explicitly first.
				if subnet == preferredSubnet {
					continue
				}

				for pk := range shard.swarms[ih].leechers[subnet] {
					if numWant == 0 {
						break
					}

					peers = append(peers, decodePeerKey(pk))
					numWant--
				}
			}
		}
	}

	shard.RUnlock()
	return
}

func (ps *peerStore) ScrapeSwarm(ih bittorrent.InfoHash, addressFamily bittorrent.AddressFamily) (resp bittorrent.Scrape) {
	select {
	case <-ps.closed:
		panic("attempted to interact with stopped memory store")
	default:
	}

	resp.InfoHash = ih
	shard := ps.shards[ps.shardIndex(ih, addressFamily)]
	shard.RLock()

	if _, ok := shard.swarms[ih]; !ok {
		shard.RUnlock()
		return
	}

	resp.Incomplete = uint32(shard.swarms[ih].lenLeechers())
	resp.Complete = uint32(shard.swarms[ih].lenSeeders())
	shard.RUnlock()

	return
}

// collectGarbage deletes all Peers from the PeerStore which are older than the
// cutoff time.
//
// This function must be able to execute while other methods on this interface
// are being executed in parallel.
func (ps *peerStore) collectGarbage(cutoff time.Time) error {
	select {
	case <-ps.closed:
		return nil
	default:
	}

	cutoffUnix := cutoff.UnixNano()
	start := time.Now()

	for _, shard := range ps.shards {
		shard.RLock()
		var infohashes []bittorrent.InfoHash
		for ih := range shard.swarms {
			infohashes = append(infohashes, ih)
		}
		shard.RUnlock()
		runtime.Gosched()

		for _, ih := range infohashes {
			shard.Lock()

			if _, stillExists := shard.swarms[ih]; !stillExists {
				shard.Unlock()
				runtime.Gosched()
				continue
			}

			for subnet := range shard.swarms[ih].leechers {
				for pk, mtime := range shard.swarms[ih].leechers[subnet] {
					if mtime <= cutoffUnix {
						shard.numLeechers--
						delete(shard.swarms[ih].leechers[subnet], pk)
					}
				}

				if len(shard.swarms[ih].leechers[subnet]) == 0 {
					delete(shard.swarms[ih].leechers, subnet)
				}
			}

			for subnet := range shard.swarms[ih].seeders {
				for pk, mtime := range shard.swarms[ih].seeders[subnet] {
					if mtime <= cutoffUnix {
						shard.numSeeders--
						delete(shard.swarms[ih].seeders[subnet], pk)
					}
				}

				if len(shard.swarms[ih].seeders[subnet]) == 0 {
					delete(shard.swarms[ih].seeders, subnet)
				}
			}

			if shard.swarms[ih].lenSeeders()|shard.swarms[ih].lenLeechers() == 0 {
				delete(shard.swarms, ih)
			}

			shard.Unlock()
			runtime.Gosched()
		}

		runtime.Gosched()
	}

	recordGCDuration(time.Since(start))

	return nil
}

func (ps *peerStore) Stop() <-chan error {
	c := make(chan error)
	go func() {
		close(ps.closed)
		ps.wg.Wait()

		// Explicitly deallocate our storage.
		shards := make([]*peerShard, len(ps.shards))
		for i := 0; i < len(ps.shards); i++ {
			shards[i] = &peerShard{swarms: make(map[bittorrent.InfoHash]swarm)}
		}
		ps.shards = shards

		close(c)
	}()

	return c
}

func (ps *peerStore) LogFields() log.Fields {
	return ps.cfg.LogFields()
}

storage/memorybysubnet/peer_store_test.go (new file, 48 lines)
@@ -0,0 +1,48 @@
package memorybysubnet

import (
	"testing"
	"time"

	s "github.com/chihaya/chihaya/storage"
)

func createNew() s.PeerStore {
	ps, err := New(Config{
		ShardCount: 1024,
		GarbageCollectionInterval: 10 * time.Minute,
		PreferredIPv4SubnetMaskBitsSet: 31,
		PreferredIPv6SubnetMaskBitsSet: 64,
	})
	if err != nil {
		panic(err)
	}
	return ps
}

func TestPeerStore(t *testing.T) { s.TestPeerStore(t, createNew()) }

func BenchmarkPut(b *testing.B) { s.Put(b, createNew()) }
func BenchmarkPut1k(b *testing.B) { s.Put1k(b, createNew()) }
func BenchmarkPut1kInfohash(b *testing.B) { s.Put1kInfohash(b, createNew()) }
func BenchmarkPut1kInfohash1k(b *testing.B) { s.Put1kInfohash1k(b, createNew()) }
func BenchmarkPutDelete(b *testing.B) { s.PutDelete(b, createNew()) }
func BenchmarkPutDelete1k(b *testing.B) { s.PutDelete1k(b, createNew()) }
func BenchmarkPutDelete1kInfohash(b *testing.B) { s.PutDelete1kInfohash(b, createNew()) }
func BenchmarkPutDelete1kInfohash1k(b *testing.B) { s.PutDelete1kInfohash1k(b, createNew()) }
func BenchmarkDeleteNonexist(b *testing.B) { s.DeleteNonexist(b, createNew()) }
func BenchmarkDeleteNonexist1k(b *testing.B) { s.DeleteNonexist1k(b, createNew()) }
func BenchmarkDeleteNonexist1kInfohash(b *testing.B) { s.DeleteNonexist1kInfohash(b, createNew()) }
func BenchmarkDeleteNonexist1kInfohash1k(b *testing.B) { s.DeleteNonexist1kInfohash1k(b, createNew()) }
func BenchmarkPutGradDelete(b *testing.B) { s.PutGradDelete(b, createNew()) }
func BenchmarkPutGradDelete1k(b *testing.B) { s.PutGradDelete1k(b, createNew()) }
func BenchmarkPutGradDelete1kInfohash(b *testing.B) { s.PutGradDelete1kInfohash(b, createNew()) }
func BenchmarkPutGradDelete1kInfohash1k(b *testing.B) { s.PutGradDelete1kInfohash1k(b, createNew()) }
func BenchmarkGradNonexist(b *testing.B) { s.GradNonexist(b, createNew()) }
func BenchmarkGradNonexist1k(b *testing.B) { s.GradNonexist1k(b, createNew()) }
func BenchmarkGradNonexist1kInfohash(b *testing.B) { s.GradNonexist1kInfohash(b, createNew()) }
func BenchmarkGradNonexist1kInfohash1k(b *testing.B) { s.GradNonexist1kInfohash1k(b, createNew()) }
func BenchmarkAnnounceLeecher(b *testing.B) { s.AnnounceLeecher(b, createNew()) }
func BenchmarkAnnounceLeecher1kInfohash(b *testing.B) { s.AnnounceLeecher1kInfohash(b, createNew()) }
func BenchmarkAnnounceSeeder(b *testing.B) { s.AnnounceSeeder(b, createNew()) }
func BenchmarkAnnounceSeeder1kInfohash(b *testing.B) { s.AnnounceSeeder1kInfohash(b, createNew()) }
@@ -1,14 +1,33 @@
package storage

import (
"errors"
"sync"

log "github.com/Sirupsen/logrus"

"github.com/chihaya/chihaya/bittorrent"
"github.com/chihaya/chihaya/pkg/stop"
)

var (
driversM sync.RWMutex
drivers = make(map[string]Driver)
)

// Driver is the interface used to initialize a new type of PeerStore.
type Driver interface {
NewPeerStore(cfg interface{}) (PeerStore, error)
}

// ErrResourceDoesNotExist is the error returned by all delete methods in the
// store if the requested resource does not exist.
var ErrResourceDoesNotExist = bittorrent.ClientError("resource does not exist")

// ErrDriverDoesNotExist is the error returned by NewPeerStore when a peer
// store driver with that name does not exist.
var ErrDriverDoesNotExist = errors.New("peer store driver with that name does not exist")

// PeerStore is an interface that abstracts the interactions of storing and
// manipulating Peers such that it can be implemented for various data stores.
type PeerStore interface {

@@ -69,4 +88,47 @@ type PeerStore interface {
// PeerStore.
// For more details see the documentation in the stop package.
stop.Stopper

// LogFields returns a loggable version of the data used to configure and
// operate a particular peer store.
LogFields() log.Fields
}

// RegisterDriver makes a Driver available by the provided name.
//
// If called twice with the same name, the name is blank, or if the provided
// Driver is nil, this function panics.
func RegisterDriver(name string, d Driver) {
if name == "" {
panic("storage: could not register a Driver with an empty name")
}
if d == nil {
panic("storage: could not register a nil Driver")
}

driversM.Lock()
defer driversM.Unlock()

if _, dup := drivers[name]; dup {
panic("storage: RegisterDriver called twice for " + name)
}

drivers[name] = d
}

// NewPeerStore attempts to initialize a new PeerStore with a given name from
// the list of registered Drivers.
//
// If a driver does not exist, returns ErrDriverDoesNotExist.
func NewPeerStore(name string, cfg interface{}) (ps PeerStore, err error) {
driversM.RLock()
defer driversM.RUnlock()

var d Driver
d, ok := drivers[name]
if !ok {
return nil, ErrDriverDoesNotExist
}

return d.NewPeerStore(cfg)
}
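
With this registry in place, a caller no longer constructs a concrete store directly; it resolves a driver by name and hands it an opaque config, which is what `Run.Start` now does via `storage.NewPeerStore(cfg.Storage.Name, cfg.Storage.Config)`. The following standalone sketch illustrates that flow under stated assumptions: `rawConfig` and the embedded YAML blob are illustrative stand-ins for the unexported `storageConfig` in the cmd package, only the `memorybysubnet` driver is imported, and the program assumes the chihaya packages from this PR are available on the build path.

```go
package main

import (
	"fmt"

	"gopkg.in/yaml.v2"

	"github.com/chihaya/chihaya/storage"
	// The blank import runs the driver's init(), which calls storage.RegisterDriver.
	_ "github.com/chihaya/chihaya/storage/memorybysubnet"
)

// rawConfig mirrors the new storageConfig struct in the cmd package: a driver
// name plus an opaque config blob that the driver re-parses itself.
type rawConfig struct {
	Name   string      `yaml:"name"`
	Config interface{} `yaml:"config"`
}

// blob uses the same settings as the new docs/storage/memorybysubnet.md example.
const blob = `
name: memorybysubnet
config:
  gc_interval: 14m
  prometheus_reporting_interval: 1s
  peer_lifetime: 16m
  shard_count: 1024
  preferred_ipv4_subnet_mask_bits_set: 8
  preferred_ipv6_subnet_mask_bits_set: 16
`

func main() {
	var cfg rawConfig
	if err := yaml.Unmarshal([]byte(blob), &cfg); err != nil {
		panic(err)
	}

	// The registry resolves the driver by name and hands it the opaque config;
	// the driver marshals it back to YAML and unmarshals into its typed Config.
	ps, err := storage.NewPeerStore(cfg.Name, cfg.Config)
	if err != nil {
		panic(err)
	}
	defer func() { <-ps.Stop() }()

	fmt.Println(ps.LogFields())
}
```

Because drivers self-register in `init()`, adding a new backend only requires a blank import in the cmd package, as the config hunk at the top of this diff shows.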