storage: sync memory & memorybysubnet

This change is a manual merge of all of the optimizations that have been added
to the memory peer store into the memorybysubnet peer store: per-shard seeder
and leecher counters, the Prometheus reporting goroutine, the cached clock, and
config validation that falls back to defaults (sketched below, after the commit
metadata).

It also fixes some inconsistencies between the two stores.
Jimmy Zelinskie 2017-06-03 21:12:17 -04:00
parent 7786e1a915
commit c41519e73f
2 changed files with 437 additions and 268 deletions
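For context on the validation change visible in both files below: instead of
returning ErrInvalidGCInterval, each store's Config now gets a Validate method
that returns a corrected copy of the config, substituting defaults (ShardCount
1024, a 14 minute GC interval, a 1 second Prometheus reporting interval) and
warning about each substitution. The following is a minimal, self-contained
sketch of that pattern, not the actual Chihaya code; it uses the standard
library logger and a simplified Config instead of the real logrus fields.

package main

import (
    "log"
    "time"
)

// Config mirrors the knobs both peer stores share (illustrative subset only).
type Config struct {
    GarbageCollectionInterval   time.Duration
    PrometheusReportingInterval time.Duration
    ShardCount                  int
}

// Validate returns a copy of cfg with invalid values replaced by defaults,
// warning about each substitution instead of failing construction.
func (cfg Config) Validate() Config {
    validcfg := cfg
    if cfg.ShardCount <= 0 {
        validcfg.ShardCount = 1024
        log.Printf("ShardCount invalid (%d), falling back to default %d",
            cfg.ShardCount, validcfg.ShardCount)
    }
    if cfg.GarbageCollectionInterval <= 0 {
        validcfg.GarbageCollectionInterval = 14 * time.Minute
        log.Printf("GarbageCollectionInterval invalid (%v), falling back to default %v",
            cfg.GarbageCollectionInterval, validcfg.GarbageCollectionInterval)
    }
    if cfg.PrometheusReportingInterval <= 0 {
        validcfg.PrometheusReportingInterval = time.Second
        log.Printf("PrometheusReportingInterval invalid (%v), falling back to default %v",
            cfg.PrometheusReportingInterval, validcfg.PrometheusReportingInterval)
    }
    return validcfg
}

func main() {
    cfg := Config{}.Validate() // every zero value falls back to its default
    log.Printf("effective config: %+v", cfg)
}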


@ -1,10 +1,12 @@
// Package memory implements the storage interface for a Chihaya
// BitTorrent tracker keeping peer data in memory.
package memory
import (
"encoding/binary"
"errors"
"net"
"runtime"
"strconv"
"sync"
"sync/atomic"
"time"
@ -22,23 +24,26 @@ const Name = "memory"
func init() {
// Register Prometheus metrics.
prometheus.MustRegister(promGCDurationMilliseconds)
prometheus.MustRegister(promInfohashesCount)
prometheus.MustRegister(promSeedersCount, promLeechersCount)
prometheus.MustRegister(
promGCDurationMilliseconds,
promInfohashesCount,
promSeedersCount,
promLeechersCount,
)
// Register the storage driver.
storage.RegisterDriver(Name, driver{})
}
var promGCDurationMilliseconds = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "chihaya_storage_memory_gc_duration_milliseconds",
Name: "chihaya_storage_gc_duration_milliseconds",
Help: "The time it takes to perform storage garbage collection",
Buckets: prometheus.ExponentialBuckets(9.375, 2, 10),
})
var promInfohashesCount = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "chihaya_storage_memory_infohashes_count",
Help: "The number of infohashes tracked",
Name: "chihaya_storage_infohashes_count",
Help: "The number of Infohashes tracked",
})
var promSeedersCount = prometheus.NewGauge(prometheus.GaugeOpts{
@ -56,11 +61,6 @@ func recordGCDuration(duration time.Duration) {
promGCDurationMilliseconds.Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond))
}
// recordInfohashesDelta records a change in the number of Infohashes tracked.
func recordInfohashesDelta(delta float64) {
promInfohashesCount.Add(delta)
}
type driver struct{}
func (d driver) NewPeerStore(icfg interface{}) (storage.PeerStore, error) {
@ -80,10 +80,6 @@ func (d driver) NewPeerStore(icfg interface{}) (storage.PeerStore, error) {
return New(cfg)
}
// ErrInvalidGCInterval is returned for a GarbageCollectionInterval that is
// less than or equal to zero.
var ErrInvalidGCInterval = errors.New("invalid garbage collection interval")
// Config holds the configuration of a memory PeerStore.
type Config struct {
GarbageCollectionInterval time.Duration `yaml:"gc_interval"`
@ -103,36 +99,64 @@ func (cfg Config) LogFields() log.Fields {
}
}
// New creates a new PeerStore backed by memory.
func New(cfg Config) (storage.PeerStore, error) {
var shardCount int
// Validate sanity checks values set in a config and returns a new config with
// default values replacing anything that is invalid.
//
// This function warns to the logger when a value is changed.
func (cfg Config) Validate() Config {
validcfg := cfg
if cfg.ShardCount > 0 {
shardCount = cfg.ShardCount
validcfg.ShardCount = cfg.ShardCount
} else {
log.Warnln("storage: shardCount not configured, using 1 as default value.")
shardCount = 1
validcfg.ShardCount = 1024
log.WithFields(log.Fields{
"name": Name + ".ShardCount",
"provided": strconv.Itoa(cfg.ShardCount),
"default": strconv.Itoa(validcfg.ShardCount),
}).Warnln("falling back to default configuration")
}
if cfg.GarbageCollectionInterval <= 0 {
return nil, ErrInvalidGCInterval
validcfg.GarbageCollectionInterval = time.Minute * 14
log.WithFields(log.Fields{
"name": Name + ".GarbageCollectionInterval",
"provided": cfg.GarbageCollectionInterval,
"default": validcfg.GarbageCollectionInterval,
}).Warnln("falling back to default configuration")
}
if cfg.PrometheusReportingInterval <= 0 {
validcfg.PrometheusReportingInterval = time.Second * 1
log.WithFields(log.Fields{
"name": Name + ".PrometheusReportingInterval",
"provided": cfg.PrometheusReportingInterval,
"default": validcfg.PrometheusReportingInterval,
}).Warnln("falling back to default configuration")
}
return validcfg
}
// New creates a new PeerStore backed by memory.
func New(provided Config) (storage.PeerStore, error) {
cfg := provided.Validate()
ps := &peerStore{
cfg: cfg,
shards: make([]*peerShard, shardCount*2),
closing: make(chan struct{}),
cfg: cfg,
shards: make([]*peerShard, cfg.ShardCount*2),
closed: make(chan struct{}),
}
for i := 0; i < shardCount*2; i++ {
for i := 0; i < cfg.ShardCount*2; i++ {
ps.shards[i] = &peerShard{swarms: make(map[bittorrent.InfoHash]swarm)}
}
// Start a goroutine for garbage collection.
ps.wg.Add(1)
go func() {
defer ps.wg.Done()
for {
select {
case <-ps.closing:
case <-ps.closed:
return
case <-time.After(cfg.GarbageCollectionInterval):
before := time.Now().Add(-cfg.PeerLifetime)
@ -142,13 +166,14 @@ func New(cfg Config) (storage.PeerStore, error) {
}
}()
// Start a goroutine for updating our cached system clock.
ps.wg.Add(1)
go func() {
defer ps.wg.Done()
t := time.NewTicker(1 * time.Second)
for {
select {
case <-ps.closing:
case <-ps.closed:
t.Stop()
return
case now := <-t.C:
@ -157,17 +182,14 @@ func New(cfg Config) (storage.PeerStore, error) {
}
}()
// Start a goroutine for reporting statistics to Prometheus.
ps.wg.Add(1)
go func() {
defer ps.wg.Done()
if cfg.PrometheusReportingInterval <= 0 {
cfg.PrometheusReportingInterval = 1
log.Warn("storage: PrometheusReportingInterval not specified/invalid, defaulting to 1 second")
}
t := time.NewTicker(cfg.PrometheusReportingInterval)
for {
select {
case <-ps.closing:
case <-ps.closed:
t.Stop()
return
case <-t.C:
@ -183,70 +205,6 @@ func New(cfg Config) (storage.PeerStore, error) {
type serializedPeer string
type peerShard struct {
swarms map[bittorrent.InfoHash]swarm
numSeeders uint64
numLeechers uint64
sync.RWMutex
}
type swarm struct {
// map serialized peer to mtime
seeders map[serializedPeer]int64
leechers map[serializedPeer]int64
}
type peerStore struct {
cfg Config
shards []*peerShard
// clock stores the current time nanoseconds, updated every second.
// Must be accessed atomically!
clock int64
closing chan struct{}
wg sync.WaitGroup
}
// populateProm aggregates metrics over all shards and then posts them to
// prometheus.
func (ps *peerStore) populateProm() {
var numInfohashes, numSeeders, numLeechers uint64
for _, s := range ps.shards {
s.RLock()
numInfohashes += uint64(len(s.swarms))
numSeeders += s.numSeeders
numLeechers += s.numLeechers
s.RUnlock()
}
promInfohashesCount.Set(float64(numInfohashes))
promSeedersCount.Set(float64(numSeeders))
promLeechersCount.Set(float64(numLeechers))
}
var _ storage.PeerStore = &peerStore{}
func (ps *peerStore) getClock() int64 {
return atomic.LoadInt64(&ps.clock)
}
func (ps *peerStore) setClock(to int64) {
atomic.StoreInt64(&ps.clock, to)
}
func (ps *peerStore) shardIndex(infoHash bittorrent.InfoHash, af bittorrent.AddressFamily) uint32 {
// There are twice the amount of shards specified by the user, the first
// half is dedicated to IPv4 swarms and the second half is dedicated to
// IPv6 swarms.
idx := binary.BigEndian.Uint32(infoHash[:4]) % (uint32(len(ps.shards)) / 2)
if af == bittorrent.IPv6 {
idx += uint32(len(ps.shards) / 2)
}
return idx
}
func newPeerKey(p bittorrent.Peer) serializedPeer {
b := make([]byte, 20+2+len(p.IP.IP))
copy(b[:20], p.ID[:])
@ -274,9 +232,73 @@ func decodePeerKey(pk serializedPeer) bittorrent.Peer {
return peer
}
type peerShard struct {
swarms map[bittorrent.InfoHash]swarm
numSeeders uint64
numLeechers uint64
sync.RWMutex
}
type swarm struct {
// map serialized peer to mtime
seeders map[serializedPeer]int64
leechers map[serializedPeer]int64
}
type peerStore struct {
cfg Config
shards []*peerShard
// clock stores the current time nanoseconds, updated every second.
// Must be accessed atomically!
clock int64
closed chan struct{}
wg sync.WaitGroup
}
var _ storage.PeerStore = &peerStore{}
// populateProm aggregates metrics over all shards and then posts them to
// prometheus.
func (ps *peerStore) populateProm() {
var numInfohashes, numSeeders, numLeechers uint64
for _, s := range ps.shards {
s.RLock()
numInfohashes += uint64(len(s.swarms))
numSeeders += s.numSeeders
numLeechers += s.numLeechers
s.RUnlock()
}
promInfohashesCount.Set(float64(numInfohashes))
promSeedersCount.Set(float64(numSeeders))
promLeechersCount.Set(float64(numLeechers))
}
func (ps *peerStore) getClock() int64 {
return atomic.LoadInt64(&ps.clock)
}
func (ps *peerStore) setClock(to int64) {
atomic.StoreInt64(&ps.clock, to)
}
func (ps *peerStore) shardIndex(infoHash bittorrent.InfoHash, af bittorrent.AddressFamily) uint32 {
// There are twice the amount of shards specified by the user, the first
// half is dedicated to IPv4 swarms and the second half is dedicated to
// IPv6 swarms.
idx := binary.BigEndian.Uint32(infoHash[:4]) % (uint32(len(ps.shards)) / 2)
if af == bittorrent.IPv6 {
idx += uint32(len(ps.shards) / 2)
}
return idx
}
func (ps *peerStore) PutSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error {
select {
case <-ps.closing:
case <-ps.closed:
panic("attempted to interact with stopped memory store")
default:
}
@ -293,10 +315,12 @@ func (ps *peerStore) PutSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error
}
}
// If this peer isn't already a seeder, update the stats for the swarm.
if _, ok := shard.swarms[ih].seeders[pk]; !ok {
// new peer
shard.numSeeders++
}
// Update the peer in the swarm.
shard.swarms[ih].seeders[pk] = ps.getClock()
shard.Unlock()
@ -305,7 +329,7 @@ func (ps *peerStore) PutSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error
func (ps *peerStore) DeleteSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error {
select {
case <-ps.closing:
case <-ps.closed:
panic("attempted to interact with stopped memory store")
default:
}
@ -325,11 +349,8 @@ func (ps *peerStore) DeleteSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) err
return storage.ErrResourceDoesNotExist
}
if _, ok := shard.swarms[ih].seeders[pk]; ok {
// seeder actually removed
shard.numSeeders--
delete(shard.swarms[ih].seeders, pk)
}
shard.numSeeders--
delete(shard.swarms[ih].seeders, pk)
if len(shard.swarms[ih].seeders)|len(shard.swarms[ih].leechers) == 0 {
delete(shard.swarms, ih)
@ -341,7 +362,7 @@ func (ps *peerStore) DeleteSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) err
func (ps *peerStore) PutLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error {
select {
case <-ps.closing:
case <-ps.closed:
panic("attempted to interact with stopped memory store")
default:
}
@ -358,10 +379,12 @@ func (ps *peerStore) PutLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error
}
}
// If this peer isn't already a leecher, update the stats for the swarm.
if _, ok := shard.swarms[ih].leechers[pk]; !ok {
// new leecher
shard.numLeechers++
}
// Update the peer in the swarm.
shard.swarms[ih].leechers[pk] = ps.getClock()
shard.Unlock()
@ -370,7 +393,7 @@ func (ps *peerStore) PutLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error
func (ps *peerStore) DeleteLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error {
select {
case <-ps.closing:
case <-ps.closed:
panic("attempted to interact with stopped memory store")
default:
}
@ -390,11 +413,8 @@ func (ps *peerStore) DeleteLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) er
return storage.ErrResourceDoesNotExist
}
if _, ok := shard.swarms[ih].leechers[pk]; ok {
// leecher actually removed
shard.numLeechers--
delete(shard.swarms[ih].leechers, pk)
}
shard.numLeechers--
delete(shard.swarms[ih].leechers, pk)
if len(shard.swarms[ih].seeders)|len(shard.swarms[ih].leechers) == 0 {
delete(shard.swarms, ih)
@ -406,7 +426,7 @@ func (ps *peerStore) DeleteLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) er
func (ps *peerStore) GraduateLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error {
select {
case <-ps.closing:
case <-ps.closed:
panic("attempted to interact with stopped memory store")
default:
}
@ -423,16 +443,18 @@ func (ps *peerStore) GraduateLeecher(ih bittorrent.InfoHash, p bittorrent.Peer)
}
}
// If this peer is a leecher, update the stats for the swarm and remove them.
if _, ok := shard.swarms[ih].leechers[pk]; ok {
// leecher actually removed
shard.numLeechers--
delete(shard.swarms[ih].leechers, pk)
}
// If this peer isn't already a seeder, update the stats for the swarm.
if _, ok := shard.swarms[ih].seeders[pk]; !ok {
// new seeder
shard.numSeeders++
}
// Update the peer in the swarm.
shard.swarms[ih].seeders[pk] = ps.getClock()
shard.Unlock()
@ -441,7 +463,7 @@ func (ps *peerStore) GraduateLeecher(ih bittorrent.InfoHash, p bittorrent.Peer)
func (ps *peerStore) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant int, announcer bittorrent.Peer) (peers []bittorrent.Peer, err error) {
select {
case <-ps.closing:
case <-ps.closed:
panic("attempted to interact with stopped memory store")
default:
}
@ -457,41 +479,40 @@ func (ps *peerStore) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant
if seeder {
// Append leechers as possible.
leechers := shard.swarms[ih].leechers
for p := range leechers {
decodedPeer := decodePeerKey(p)
for pk := range leechers {
if numWant == 0 {
break
}
peers = append(peers, decodedPeer)
peers = append(peers, decodePeerKey(pk))
numWant--
}
} else {
// Append as many seeders as possible.
seeders := shard.swarms[ih].seeders
for p := range seeders {
decodedPeer := decodePeerKey(p)
for pk := range seeders {
if numWant == 0 {
break
}
peers = append(peers, decodedPeer)
peers = append(peers, decodePeerKey(pk))
numWant--
}
// Append leechers until we reach numWant.
leechers := shard.swarms[ih].leechers
if numWant > 0 {
for p := range leechers {
decodedPeer := decodePeerKey(p)
leechers := shard.swarms[ih].leechers
announcerPK := newPeerKey(announcer)
for pk := range leechers {
if pk == announcerPK {
continue
}
if numWant == 0 {
break
}
if decodedPeer.Equal(announcer) {
continue
}
peers = append(peers, decodedPeer)
peers = append(peers, decodePeerKey(pk))
numWant--
}
}
@ -503,7 +524,7 @@ func (ps *peerStore) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant
func (ps *peerStore) ScrapeSwarm(ih bittorrent.InfoHash, addressFamily bittorrent.AddressFamily) (resp bittorrent.Scrape) {
select {
case <-ps.closing:
case <-ps.closed:
panic("attempted to interact with stopped memory store")
default:
}
@ -531,13 +552,14 @@ func (ps *peerStore) ScrapeSwarm(ih bittorrent.InfoHash, addressFamily bittorren
// are being executed in parallel.
func (ps *peerStore) collectGarbage(cutoff time.Time) error {
select {
case <-ps.closing:
panic("attempted to interact with stopped memory store")
case <-ps.closed:
return nil
default:
}
cutoffUnix := cutoff.UnixNano()
start := time.Now()
for _, shard := range ps.shards {
shard.RLock()
var infohashes []bittorrent.InfoHash
@ -558,15 +580,15 @@ func (ps *peerStore) collectGarbage(cutoff time.Time) error {
for pk, mtime := range shard.swarms[ih].leechers {
if mtime <= cutoffUnix {
delete(shard.swarms[ih].leechers, pk)
shard.numLeechers--
delete(shard.swarms[ih].leechers, pk)
}
}
for pk, mtime := range shard.swarms[ih].seeders {
if mtime <= cutoffUnix {
delete(shard.swarms[ih].seeders, pk)
shard.numSeeders--
delete(shard.swarms[ih].seeders, pk)
}
}
@ -589,7 +611,7 @@ func (ps *peerStore) collectGarbage(cutoff time.Time) error {
func (ps *peerStore) Stop() <-chan error {
c := make(chan error)
go func() {
close(ps.closing)
close(ps.closed)
ps.wg.Wait()
// Explicitly deallocate our storage.

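The diff above also shows the statistics optimization being synced: the
recordInfohashesDelta helper is removed, each peerShard now carries numSeeders
and numLeechers counters maintained under the shard lock, and the reporting
goroutine periodically sums them in populateProm and writes the totals to the
Prometheus gauges with Set. Below is a rough standalone sketch of that
aggregation shape, using simplified types that are not the tracker's real
structs.

package main

import (
    "fmt"
    "sync"
)

// shard keeps its own counters so the reporter never walks the swarm maps.
type shard struct {
    sync.RWMutex
    swarms      map[string]struct{} // keyed by infohash (simplified)
    numSeeders  uint64
    numLeechers uint64
}

// aggregate takes a read lock on every shard and sums the cached counters;
// in Chihaya the sums are then pushed to Prometheus gauges via Set.
func aggregate(shards []*shard) (infohashes, seeders, leechers uint64) {
    for _, s := range shards {
        s.RLock()
        infohashes += uint64(len(s.swarms))
        seeders += s.numSeeders
        leechers += s.numLeechers
        s.RUnlock()
    }
    return
}

func main() {
    shards := []*shard{
        {swarms: map[string]struct{}{"ih1": {}}, numSeeders: 3, numLeechers: 5},
        {swarms: map[string]struct{}{"ih2": {}, "ih3": {}}, numSeeders: 1},
    }
    ih, se, le := aggregate(shards)
    fmt.Printf("infohashes=%d seeders=%d leechers=%d\n", ih, se, le)
}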

@ -1,14 +1,15 @@
// Package memorybysubnet implements the storage interface for a Chihaya
// BitTorrent tracker keeping peer data in memory organized by a pre-configured
// subnet.
// subnet mask.
package memorybysubnet
import (
"encoding/binary"
"errors"
"net"
"runtime"
"strconv"
"sync"
"sync/atomic"
"time"
log "github.com/Sirupsen/logrus"
@ -23,34 +24,44 @@ import (
const Name = "memorybysubnet"
func init() {
prometheus.MustRegister(promGCDurationMilliseconds)
prometheus.MustRegister(promInfohashesCount)
// Register Prometheus metrics.
prometheus.MustRegister(
promGCDurationMilliseconds,
promInfohashesCount,
promSeedersCount,
promLeechersCount,
)
// Register the storage driver.
storage.RegisterDriver(Name, driver{})
}
var promGCDurationMilliseconds = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "chihaya_storage_memorybysubnet_gc_duration_milliseconds",
Name: "chihaya_storage_gc_duration_milliseconds",
Help: "The time it takes to perform storage garbage collection",
Buckets: prometheus.ExponentialBuckets(9.375, 2, 10),
})
var promInfohashesCount = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "chihaya_storage_memorybysubnet_infohashes_count",
Name: "chihaya_storage_infohashes_count",
Help: "The number of Infohashes tracked",
})
var promSeedersCount = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "chihaya_storage_seeders_count",
Help: "The number of seeders tracked",
})
var promLeechersCount = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "chihaya_storage_leechers_count",
Help: "The number of leechers tracked",
})
// recordGCDuration records the duration of a GC sweep.
func recordGCDuration(duration time.Duration) {
promGCDurationMilliseconds.Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond))
}
// recordInfohashesDelta records a change in the number of Infohashes tracked.
func recordInfohashesDelta(delta float64) {
promInfohashesCount.Add(delta)
}
type driver struct{}
func (d driver) NewPeerStore(icfg interface{}) (storage.PeerStore, error) {
@ -70,13 +81,10 @@ func (d driver) NewPeerStore(icfg interface{}) (storage.PeerStore, error) {
return New(cfg)
}
// ErrInvalidGCInterval is returned for a GarbageCollectionInterval that is
// less than or equal to zero.
var ErrInvalidGCInterval = errors.New("invalid garbage collection interval")
// Config holds the configuration of a memory PeerStore.
type Config struct {
GarbageCollectionInterval time.Duration `yaml:"gc_interval"`
PrometheusReportingInterval time.Duration `yaml:"prometheus_reporting_interval"`
PeerLifetime time.Duration `yaml:"peer_lifetime"`
ShardCount int `yaml:"shard_count"`
PreferredIPv4SubnetMaskBitsSet int `yaml:"preferred_ipv4_subnet_mask_bits_set"`
@ -86,39 +94,74 @@ type Config struct {
// LogFields renders the current config as a set of Logrus fields.
func (cfg Config) LogFields() log.Fields {
return log.Fields{
"name": Name,
"gcInterval": cfg.GarbageCollectionInterval,
"peerLifetime": cfg.PeerLifetime,
"shardCount": cfg.ShardCount,
"prefIPv4Mask": cfg.PreferredIPv4SubnetMaskBitsSet,
"prefIPv6Mask": cfg.PreferredIPv6SubnetMaskBitsSet,
"name": Name,
"gcInterval": cfg.GarbageCollectionInterval,
"promReportInterval": cfg.PrometheusReportingInterval,
"peerLifetime": cfg.PeerLifetime,
"shardCount": cfg.ShardCount,
"prefIPv4Mask": cfg.PreferredIPv4SubnetMaskBitsSet,
"prefIPv6Mask": cfg.PreferredIPv6SubnetMaskBitsSet,
}
}
// New creates a new PeerStore backed by memory.
func New(cfg Config) (storage.PeerStore, error) {
shardCount := 1
// Validate sanity checks values set in a config and returns a new config with
// default values replacing anything that is invalid.
//
// This function warns to the logger when a value is changed.
func (cfg Config) Validate() Config {
validcfg := cfg
if cfg.ShardCount > 0 {
shardCount = cfg.ShardCount
validcfg.ShardCount = cfg.ShardCount
} else {
validcfg.ShardCount = 1024
log.WithFields(log.Fields{
"name": Name + ".ShardCount",
"provided": strconv.Itoa(cfg.ShardCount),
"default": strconv.Itoa(validcfg.ShardCount),
}).Warnln("falling back to default configuration")
}
if cfg.GarbageCollectionInterval <= 0 {
return nil, ErrInvalidGCInterval
validcfg.GarbageCollectionInterval = time.Minute * 14
log.WithFields(log.Fields{
"name": Name + ".GarbageCollectionInterval",
"provided": cfg.GarbageCollectionInterval,
"default": validcfg.GarbageCollectionInterval,
}).Warnln("falling back to default configuration")
}
if cfg.PrometheusReportingInterval <= 0 {
validcfg.PrometheusReportingInterval = time.Second * 1
log.WithFields(log.Fields{
"name": Name + ".PrometheusReportingInterval",
"provided": cfg.PrometheusReportingInterval,
"default": validcfg.PrometheusReportingInterval,
}).Warnln("falling back to default configuration")
}
return validcfg
}
// New creates a new PeerStore backed by memory that organizes peers by a
// pre-configured subnet mask.
func New(provided Config) (storage.PeerStore, error) {
cfg := provided.Validate()
ps := &peerStore{
cfg: cfg,
ipv4Mask: net.CIDRMask(cfg.PreferredIPv4SubnetMaskBitsSet, 32),
ipv6Mask: net.CIDRMask(cfg.PreferredIPv6SubnetMaskBitsSet, 128),
shards: make([]*peerShard, shardCount*2),
shards: make([]*peerShard, cfg.ShardCount*2),
closed: make(chan struct{}),
}
for i := 0; i < shardCount*2; i++ {
for i := 0; i < cfg.ShardCount*2; i++ {
ps.shards[i] = &peerShard{swarms: make(map[bittorrent.InfoHash]swarm)}
}
// Start a goroutine for garbage collection.
ps.wg.Add(1)
go func() {
defer ps.wg.Done()
for {
select {
case <-ps.closed:
@ -131,6 +174,40 @@ func New(cfg Config) (storage.PeerStore, error) {
}
}()
// Start a goroutine for updating our cached system clock.
ps.wg.Add(1)
go func() {
defer ps.wg.Done()
t := time.NewTicker(1 * time.Second)
for {
select {
case <-ps.closed:
t.Stop()
return
case now := <-t.C:
ps.setClock(now.UnixNano())
}
}
}()
// Start a goroutine for reporting statistics to Prometheus.
ps.wg.Add(1)
go func() {
defer ps.wg.Done()
t := time.NewTicker(cfg.PrometheusReportingInterval)
for {
select {
case <-ps.closed:
t.Stop()
return
case <-t.C:
before := time.Now()
ps.populateProm()
log.Debugf("memory: populateProm() took %s", time.Since(before))
}
}
}()
return ps, nil
}
@ -145,6 +222,24 @@ func newPeerKey(p bittorrent.Peer) serializedPeer {
return serializedPeer(b)
}
func decodePeerKey(pk serializedPeer) bittorrent.Peer {
peer := bittorrent.Peer{
ID: bittorrent.PeerIDFromString(string(pk[:20])),
Port: binary.BigEndian.Uint16([]byte(pk[20:22])),
IP: bittorrent.IP{IP: net.IP(pk[22:])}}
if ip := peer.IP.To4(); ip != nil {
peer.IP.IP = ip
peer.IP.AddressFamily = bittorrent.IPv4
} else if len(peer.IP.IP) == net.IPv6len { // implies toReturn.IP.To4() == nil
peer.IP.AddressFamily = bittorrent.IPv6
} else {
panic("IP is neither v4 nor v6")
}
return peer
}
type peerSubnet string
func newPeerSubnet(ip bittorrent.IP, ipv4Mask, ipv6Mask net.IPMask) peerSubnet {
@ -162,7 +257,9 @@ func newPeerSubnet(ip bittorrent.IP, ipv4Mask, ipv6Mask net.IPMask) peerSubnet {
}
type peerShard struct {
swarms map[bittorrent.InfoHash]swarm
swarms map[bittorrent.InfoHash]swarm
numSeeders uint64
numLeechers uint64
sync.RWMutex
}
@ -191,50 +288,63 @@ type peerStore struct {
ipv6Mask net.IPMask
shards []*peerShard
// clock stores the current time nanoseconds, updated every second.
// Must be accessed atomically!
clock int64
closed chan struct{}
wg sync.WaitGroup
}
var _ storage.PeerStore = &peerStore{}
func (s *peerStore) shardIndex(infoHash bittorrent.InfoHash, af bittorrent.AddressFamily) uint32 {
// populateProm aggregates metrics over all shards and then posts them to
// prometheus.
func (ps *peerStore) populateProm() {
var numInfohashes, numSeeders, numLeechers uint64
for _, s := range ps.shards {
s.RLock()
numInfohashes += uint64(len(s.swarms))
numSeeders += s.numSeeders
numLeechers += s.numLeechers
s.RUnlock()
}
promInfohashesCount.Set(float64(numInfohashes))
promSeedersCount.Set(float64(numSeeders))
promLeechersCount.Set(float64(numLeechers))
}
func (ps *peerStore) getClock() int64 {
return atomic.LoadInt64(&ps.clock)
}
func (ps *peerStore) setClock(to int64) {
atomic.StoreInt64(&ps.clock, to)
}
func (ps *peerStore) shardIndex(infoHash bittorrent.InfoHash, af bittorrent.AddressFamily) uint32 {
// There are twice the amount of shards specified by the user, the first
// half is dedicated to IPv4 swarms and the second half is dedicated to
// IPv6 swarms.
idx := binary.BigEndian.Uint32(infoHash[:4]) % (uint32(len(s.shards)) / 2)
idx := binary.BigEndian.Uint32(infoHash[:4]) % (uint32(len(ps.shards)) / 2)
if af == bittorrent.IPv6 {
idx += uint32(len(s.shards) / 2)
idx += uint32(len(ps.shards) / 2)
}
return idx
}
func decodePeerKey(pk serializedPeer) bittorrent.Peer {
peer := bittorrent.Peer{
ID: bittorrent.PeerIDFromString(string(pk[:20])),
Port: binary.BigEndian.Uint16([]byte(pk[20:22])),
IP: bittorrent.IP{IP: net.IP(pk[22:])}}
if ip := peer.IP.To4(); ip != nil {
peer.IP.IP = ip
peer.IP.AddressFamily = bittorrent.IPv4
} else if len(peer.IP.IP) == net.IPv6len { // implies toReturn.IP.To4() == nil
peer.IP.AddressFamily = bittorrent.IPv6
} else {
panic("IP is neither v4 nor v6")
}
return peer
}
func (s *peerStore) PutSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error {
func (ps *peerStore) PutSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error {
select {
case <-s.closed:
case <-ps.closed:
panic("attempted to interact with stopped memory store")
default:
}
pk := newPeerKey(p)
shard := s.shards[s.shardIndex(ih, p.IP.AddressFamily)]
shard := ps.shards[ps.shardIndex(ih, p.IP.AddressFamily)]
shard.Lock()
if _, ok := shard.swarms[ih]; !ok {
@ -242,29 +352,37 @@ func (s *peerStore) PutSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error {
seeders: make(map[peerSubnet]map[serializedPeer]int64),
leechers: make(map[peerSubnet]map[serializedPeer]int64),
}
recordInfohashesDelta(1)
}
preferredSubnet := newPeerSubnet(p.IP, s.ipv4Mask, s.ipv6Mask)
preferredSubnet := newPeerSubnet(p.IP, ps.ipv4Mask, ps.ipv6Mask)
// Allocate a new map if necessary.
if shard.swarms[ih].seeders[preferredSubnet] == nil {
shard.swarms[ih].seeders[preferredSubnet] = make(map[serializedPeer]int64)
}
shard.swarms[ih].seeders[preferredSubnet][pk] = time.Now().UnixNano()
// If this peer isn't already a seeder, update the stats for the swarm.
if _, ok := shard.swarms[ih].seeders[preferredSubnet][pk]; !ok {
shard.numSeeders++
}
// Update the peer in the swarm.
shard.swarms[ih].seeders[preferredSubnet][pk] = ps.getClock()
shard.Unlock()
return nil
}
func (s *peerStore) DeleteSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error {
func (ps *peerStore) DeleteSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error {
select {
case <-s.closed:
case <-ps.closed:
panic("attempted to interact with stopped memory store")
default:
}
pk := newPeerKey(p)
shard := s.shards[s.shardIndex(ih, p.IP.AddressFamily)]
shard := ps.shards[ps.shardIndex(ih, p.IP.AddressFamily)]
shard.Lock()
if _, ok := shard.swarms[ih]; !ok {
@ -272,33 +390,35 @@ func (s *peerStore) DeleteSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) erro
return storage.ErrResourceDoesNotExist
}
preferredSubnet := newPeerSubnet(p.IP, s.ipv4Mask, s.ipv6Mask)
preferredSubnet := newPeerSubnet(p.IP, ps.ipv4Mask, ps.ipv6Mask)
if _, ok := shard.swarms[ih].seeders[preferredSubnet][pk]; !ok {
shard.Unlock()
return storage.ErrResourceDoesNotExist
}
shard.numSeeders--
delete(shard.swarms[ih].seeders[preferredSubnet], pk)
if shard.swarms[ih].lenSeeders()|shard.swarms[ih].lenLeechers() == 0 {
delete(shard.swarms, ih)
recordInfohashesDelta(-1)
} else if len(shard.swarms[ih].seeders[preferredSubnet]) == 0 {
delete(shard.swarms[ih].seeders, preferredSubnet)
}
shard.Unlock()
return nil
}
func (s *peerStore) PutLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error {
func (ps *peerStore) PutLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error {
select {
case <-s.closed:
case <-ps.closed:
panic("attempted to interact with stopped memory store")
default:
}
pk := newPeerKey(p)
shard := s.shards[s.shardIndex(ih, p.IP.AddressFamily)]
shard := ps.shards[ps.shardIndex(ih, p.IP.AddressFamily)]
shard.Lock()
if _, ok := shard.swarms[ih]; !ok {
@ -306,29 +426,37 @@ func (s *peerStore) PutLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error
seeders: make(map[peerSubnet]map[serializedPeer]int64),
leechers: make(map[peerSubnet]map[serializedPeer]int64),
}
recordInfohashesDelta(1)
}
preferredSubnet := newPeerSubnet(p.IP, s.ipv4Mask, s.ipv6Mask)
preferredSubnet := newPeerSubnet(p.IP, ps.ipv4Mask, ps.ipv6Mask)
// Allocate a new map if necessary.
if shard.swarms[ih].leechers[preferredSubnet] == nil {
shard.swarms[ih].leechers[preferredSubnet] = make(map[serializedPeer]int64)
}
shard.swarms[ih].leechers[preferredSubnet][pk] = time.Now().UnixNano()
// If this peer isn't already a seeder, update the stats for the swarm.
if _, ok := shard.swarms[ih].leechers[preferredSubnet][pk]; !ok {
shard.numLeechers++
}
// Update the peer in the swarm.
shard.swarms[ih].leechers[preferredSubnet][pk] = ps.getClock()
shard.Unlock()
return nil
}
func (s *peerStore) DeleteLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error {
func (ps *peerStore) DeleteLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error {
select {
case <-s.closed:
case <-ps.closed:
panic("attempted to interact with stopped memory store")
default:
}
pk := newPeerKey(p)
shard := s.shards[s.shardIndex(ih, p.IP.AddressFamily)]
shard := ps.shards[ps.shardIndex(ih, p.IP.AddressFamily)]
shard.Lock()
if _, ok := shard.swarms[ih]; !ok {
@ -336,33 +464,35 @@ func (s *peerStore) DeleteLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) err
return storage.ErrResourceDoesNotExist
}
preferredSubnet := newPeerSubnet(p.IP, s.ipv4Mask, s.ipv6Mask)
preferredSubnet := newPeerSubnet(p.IP, ps.ipv4Mask, ps.ipv6Mask)
if _, ok := shard.swarms[ih].leechers[preferredSubnet][pk]; !ok {
shard.Unlock()
return storage.ErrResourceDoesNotExist
}
shard.numLeechers--
delete(shard.swarms[ih].leechers[preferredSubnet], pk)
if shard.swarms[ih].lenSeeders()|shard.swarms[ih].lenLeechers() == 0 {
delete(shard.swarms, ih)
recordInfohashesDelta(-1)
} else if len(shard.swarms[ih].leechers[preferredSubnet]) == 0 {
delete(shard.swarms[ih].leechers, preferredSubnet)
}
shard.Unlock()
return nil
}
func (s *peerStore) GraduateLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error {
func (ps *peerStore) GraduateLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error {
select {
case <-s.closed:
case <-ps.closed:
panic("attempted to interact with stopped memory store")
default:
}
pk := newPeerKey(p)
shard := s.shards[s.shardIndex(ih, p.IP.AddressFamily)]
shard := ps.shards[ps.shardIndex(ih, p.IP.AddressFamily)]
shard.Lock()
if _, ok := shard.swarms[ih]; !ok {
@ -370,29 +500,40 @@ func (s *peerStore) GraduateLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) e
seeders: make(map[peerSubnet]map[serializedPeer]int64),
leechers: make(map[peerSubnet]map[serializedPeer]int64),
}
recordInfohashesDelta(1)
}
preferredSubnet := newPeerSubnet(p.IP, s.ipv4Mask, s.ipv6Mask)
delete(shard.swarms[ih].leechers[preferredSubnet], pk)
// If this peer is a leecher, update the stats for the swarm and remove them.
preferredSubnet := newPeerSubnet(p.IP, ps.ipv4Mask, ps.ipv6Mask)
if _, ok := shard.swarms[ih].leechers[preferredSubnet][pk]; ok {
shard.numLeechers--
delete(shard.swarms[ih].leechers[preferredSubnet], pk)
}
// Allocate a new map if necessary.
if shard.swarms[ih].seeders[preferredSubnet] == nil {
shard.swarms[ih].seeders[preferredSubnet] = make(map[serializedPeer]int64)
}
shard.swarms[ih].seeders[preferredSubnet][pk] = time.Now().UnixNano()
// If this peer isn't already a seeder, update the stats for the swarm.
if _, ok := shard.swarms[ih].seeders[preferredSubnet][pk]; !ok {
shard.numSeeders++
}
// Update the peer in the swarm.
shard.swarms[ih].seeders[preferredSubnet][pk] = ps.getClock()
shard.Unlock()
return nil
}
func (s *peerStore) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant int, announcer bittorrent.Peer) (peers []bittorrent.Peer, err error) {
func (ps *peerStore) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant int, announcer bittorrent.Peer) (peers []bittorrent.Peer, err error) {
select {
case <-s.closed:
case <-ps.closed:
panic("attempted to interact with stopped memory store")
default:
}
shard := s.shards[s.shardIndex(ih, announcer.IP.AddressFamily)]
shard := ps.shards[ps.shardIndex(ih, announcer.IP.AddressFamily)]
shard.RLock()
if _, ok := shard.swarms[ih]; !ok {
@ -400,35 +541,34 @@ func (s *peerStore) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant i
return nil, storage.ErrResourceDoesNotExist
}
preferredSubnet := newPeerSubnet(announcer.IP, s.ipv4Mask, s.ipv6Mask)
preferredSubnet := newPeerSubnet(announcer.IP, ps.ipv4Mask, ps.ipv6Mask)
if seeder {
// Append as many close leechers as possible.
closestLeechers := shard.swarms[ih].leechers[preferredSubnet]
for p := range closestLeechers {
for pk := range closestLeechers {
if numWant == 0 {
break
}
decodedPeer := decodePeerKey(p)
peers = append(peers, decodedPeer)
peers = append(peers, decodePeerKey(pk))
numWant--
}
// Append the rest of the leechers.
if numWant > 0 {
for subnet := range shard.swarms[ih].leechers {
// Already appended from this subnet explictly first.
if subnet == preferredSubnet {
continue
}
for p := range shard.swarms[ih].leechers[subnet] {
for pk := range shard.swarms[ih].leechers[subnet] {
if numWant == 0 {
break
}
decodedPeer := decodePeerKey(p)
peers = append(peers, decodedPeer)
peers = append(peers, decodePeerKey(pk))
numWant--
}
}
@ -436,42 +576,47 @@ func (s *peerStore) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant i
} else {
// Append as many close seeders as possible.
closestSeeders := shard.swarms[ih].seeders[preferredSubnet]
for p := range closestSeeders {
for pk := range closestSeeders {
if numWant == 0 {
break
}
decodedPeer := decodePeerKey(p)
peers = append(peers, decodedPeer)
peers = append(peers, decodePeerKey(pk))
numWant--
}
// Append as many close leechers as possible.
closestLeechers := shard.swarms[ih].leechers[preferredSubnet]
for p := range closestLeechers {
if numWant == 0 {
break
}
decodedPeer := decodePeerKey(p)
if numWant > 0 {
closestLeechers := shard.swarms[ih].leechers[preferredSubnet]
announcerPK := newPeerKey(announcer)
for pk := range closestLeechers {
if pk == announcerPK {
continue
}
peers = append(peers, decodedPeer)
numWant--
if numWant == 0 {
break
}
peers = append(peers, decodePeerKey(pk))
numWant--
}
}
// Append as the rest of the seeders.
if numWant > 0 {
for subnet := range shard.swarms[ih].seeders {
// Already appended from this subnet explictly first.
if subnet == preferredSubnet {
continue
}
for p := range shard.swarms[ih].seeders[subnet] {
for pk := range shard.swarms[ih].seeders[subnet] {
if numWant == 0 {
break
}
decodedPeer := decodePeerKey(p)
peers = append(peers, decodedPeer)
peers = append(peers, decodePeerKey(pk))
numWant--
}
}
@ -480,20 +625,17 @@ func (s *peerStore) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant i
// Append the rest of the leechers.
if numWant > 0 {
for subnet := range shard.swarms[ih].leechers {
// Already appended from this subnet explictly first.
if subnet == preferredSubnet {
continue
}
for p := range shard.swarms[ih].leechers[subnet] {
for pk := range shard.swarms[ih].leechers[subnet] {
if numWant == 0 {
break
}
decodedPeer := decodePeerKey(p)
if decodedPeer.Equal(announcer) {
continue
}
peers = append(peers, decodedPeer)
peers = append(peers, decodePeerKey(pk))
numWant--
}
}
@ -504,15 +646,15 @@ func (s *peerStore) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant i
return
}
func (s *peerStore) ScrapeSwarm(ih bittorrent.InfoHash, addressFamily bittorrent.AddressFamily) (resp bittorrent.Scrape) {
func (ps *peerStore) ScrapeSwarm(ih bittorrent.InfoHash, addressFamily bittorrent.AddressFamily) (resp bittorrent.Scrape) {
select {
case <-s.closed:
case <-ps.closed:
panic("attempted to interact with stopped memory store")
default:
}
resp.InfoHash = ih
shard := s.shards[s.shardIndex(ih, addressFamily)]
shard := ps.shards[ps.shardIndex(ih, addressFamily)]
shard.RLock()
if _, ok := shard.swarms[ih]; !ok {
@ -532,18 +674,17 @@ func (s *peerStore) ScrapeSwarm(ih bittorrent.InfoHash, addressFamily bittorrent
//
// This function must be able to execute while other methods on this interface
// are being executed in parallel.
func (s *peerStore) collectGarbage(cutoff time.Time) error {
func (ps *peerStore) collectGarbage(cutoff time.Time) error {
select {
case <-s.closed:
panic("attempted to interact with stopped memory store")
case <-ps.closed:
return nil
default:
}
var ihDelta float64
cutoffUnix := cutoff.UnixNano()
start := time.Now()
for _, shard := range s.shards {
for _, shard := range ps.shards {
shard.RLock()
var infohashes []bittorrent.InfoHash
for ih := range shard.swarms {
@ -564,9 +705,11 @@ func (s *peerStore) collectGarbage(cutoff time.Time) error {
for subnet := range shard.swarms[ih].leechers {
for pk, mtime := range shard.swarms[ih].leechers[subnet] {
if mtime <= cutoffUnix {
shard.numLeechers--
delete(shard.swarms[ih].leechers[subnet], pk)
}
}
if len(shard.swarms[ih].leechers[subnet]) == 0 {
delete(shard.swarms[ih].leechers, subnet)
}
@ -575,18 +718,18 @@ func (s *peerStore) collectGarbage(cutoff time.Time) error {
for subnet := range shard.swarms[ih].seeders {
for pk, mtime := range shard.swarms[ih].seeders[subnet] {
if mtime <= cutoffUnix {
shard.numSeeders--
delete(shard.swarms[ih].seeders[subnet], pk)
}
}
if len(shard.swarms[ih].seeders[subnet]) == 0 {
delete(shard.swarms[ih].seeders, subnet)
}
}
// TODO(jzelinskie): fix this to sum all peers in all subnets
if len(shard.swarms[ih].seeders)|len(shard.swarms[ih].leechers) == 0 {
if shard.swarms[ih].lenSeeders()|shard.swarms[ih].lenLeechers() == 0 {
delete(shard.swarms, ih)
ihDelta--
}
shard.Unlock()
@ -597,25 +740,29 @@ func (s *peerStore) collectGarbage(cutoff time.Time) error {
}
recordGCDuration(time.Since(start))
recordInfohashesDelta(ihDelta)
return nil
}
func (s *peerStore) Stop() <-chan error {
toReturn := make(chan error)
func (ps *peerStore) Stop() <-chan error {
c := make(chan error)
go func() {
shards := make([]*peerShard, len(s.shards))
for i := 0; i < len(s.shards); i++ {
close(ps.closed)
ps.wg.Wait()
// Explicitly deallocate our storage.
shards := make([]*peerShard, len(ps.shards))
for i := 0; i < len(ps.shards); i++ {
shards[i] = &peerShard{swarms: make(map[bittorrent.InfoHash]swarm)}
}
s.shards = shards
close(s.closed)
close(toReturn)
ps.shards = shards
close(c)
}()
return toReturn
return c
}
func (s *peerStore) LogFields() log.Fields {
return s.cfg.LogFields()
func (ps *peerStore) LogFields() log.Fields {
return ps.cfg.LogFields()
}
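The remaining optimization carried over is the cached clock: rather than
calling time.Now().UnixNano() on every announce, a goroutine stores the current
Unix nanoseconds into an int64 once per second and the hot paths read it
atomically (the getClock and setClock methods above). A minimal illustration of
the idea outside the tracker, with hypothetical names of my own:

package main

import (
    "fmt"
    "sync/atomic"
    "time"
)

// cachedClock trades up to a second of timestamp precision for avoiding a
// time.Now() call on every peer operation.
type cachedClock struct {
    nanos int64 // accessed atomically
    done  chan struct{}
}

func newCachedClock() *cachedClock {
    c := &cachedClock{nanos: time.Now().UnixNano(), done: make(chan struct{})}
    go func() {
        t := time.NewTicker(time.Second)
        defer t.Stop()
        for {
            select {
            case <-c.done:
                return
            case now := <-t.C:
                atomic.StoreInt64(&c.nanos, now.UnixNano())
            }
        }
    }()
    return c
}

func (c *cachedClock) Now() int64 { return atomic.LoadInt64(&c.nanos) }
func (c *cachedClock) Stop()      { close(c.done) }

func main() {
    clock := newCachedClock()
    defer clock.Stop()
    fmt.Println("cached unix nanos:", clock.Now())
}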