This change is a manual merge of all of the optimizations that have been added to the memory peer store into the memorybysubnet peer store. This also fixes some inconsistencies between the two.
768 lines
19 KiB
Go
768 lines
19 KiB
Go
// Package memorybysubnet implements the storage interface for a Chihaya
|
|
// BitTorrent tracker keeping peer data in memory organized by a pre-configured
|
|
// subnet mask.
|
|
package memorybysubnet
|
|
|
|
import (
|
|
"encoding/binary"
|
|
"net"
|
|
"runtime"
|
|
"strconv"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
log "github.com/Sirupsen/logrus"
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
"gopkg.in/yaml.v2"
|
|
|
|
"github.com/chihaya/chihaya/bittorrent"
|
|
"github.com/chihaya/chihaya/storage"
|
|
)
|
|
|
|
// Name identifies this peer store implementation. It is the key under which
// the driver is registered with Chihaya's storage package and the prefix used
// for configuration warnings.
const Name = "memorybysubnet"
|
|
|
|
func init() {
|
|
// Register Prometheus metrics.
|
|
prometheus.MustRegister(
|
|
promGCDurationMilliseconds,
|
|
promInfohashesCount,
|
|
promSeedersCount,
|
|
promLeechersCount,
|
|
)
|
|
|
|
// Register the storage driver.
|
|
storage.RegisterDriver(Name, driver{})
|
|
}
|
|
|
|
var promGCDurationMilliseconds = prometheus.NewHistogram(prometheus.HistogramOpts{
|
|
Name: "chihaya_storage_gc_duration_milliseconds",
|
|
Help: "The time it takes to perform storage garbage collection",
|
|
Buckets: prometheus.ExponentialBuckets(9.375, 2, 10),
|
|
})
|
|
|
|
var promInfohashesCount = prometheus.NewGauge(prometheus.GaugeOpts{
|
|
Name: "chihaya_storage_infohashes_count",
|
|
Help: "The number of Infohashes tracked",
|
|
})
|
|
|
|
var promSeedersCount = prometheus.NewGauge(prometheus.GaugeOpts{
|
|
Name: "chihaya_storage_seeders_count",
|
|
Help: "The number of seeders tracked",
|
|
})
|
|
|
|
var promLeechersCount = prometheus.NewGauge(prometheus.GaugeOpts{
|
|
Name: "chihaya_storage_leechers_count",
|
|
Help: "The number of leechers tracked",
|
|
})
|
|
|
|
// recordGCDuration records the duration of a GC sweep.
|
|
func recordGCDuration(duration time.Duration) {
|
|
promGCDurationMilliseconds.Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond))
|
|
}
|
|
|
|
type driver struct{}
|
|
|
|
func (d driver) NewPeerStore(icfg interface{}) (storage.PeerStore, error) {
|
|
// Marshal the config back into bytes.
|
|
bytes, err := yaml.Marshal(icfg)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Unmarshal the bytes into the proper config type.
|
|
var cfg Config
|
|
err = yaml.Unmarshal(bytes, &cfg)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return New(cfg)
|
|
}
|
|
|
|
// Config holds the configuration of a memory PeerStore that groups peers by
// subnet.
type Config struct {
	// GarbageCollectionInterval is the pause between garbage collection
	// sweeps.
	GarbageCollectionInterval time.Duration `yaml:"gc_interval"`

	// PrometheusReportingInterval is how often aggregated counts are pushed
	// to the Prometheus gauges.
	PrometheusReportingInterval time.Duration `yaml:"prometheus_reporting_interval"`

	// PeerLifetime is how long a peer is kept after its last update before a
	// garbage collection sweep removes it.
	PeerLifetime time.Duration `yaml:"peer_lifetime"`

	// ShardCount is the number of shards per address family; the store
	// allocates twice this many shards in total.
	ShardCount int `yaml:"shard_count"`

	// PreferredIPv4SubnetMaskBitsSet is the number of leading one bits in the
	// mask used to group IPv4 peers into subnets.
	PreferredIPv4SubnetMaskBitsSet int `yaml:"preferred_ipv4_subnet_mask_bits_set"`

	// PreferredIPv6SubnetMaskBitsSet is the number of leading one bits in the
	// mask used to group IPv6 peers into subnets.
	PreferredIPv6SubnetMaskBitsSet int `yaml:"preferred_ipv6_subnet_mask_bits_set"`
}
|
|
|
|
// LogFields renders the current config as a set of Logrus fields.
|
|
func (cfg Config) LogFields() log.Fields {
|
|
return log.Fields{
|
|
"name": Name,
|
|
"gcInterval": cfg.GarbageCollectionInterval,
|
|
"promReportInterval": cfg.PrometheusReportingInterval,
|
|
"peerLifetime": cfg.PeerLifetime,
|
|
"shardCount": cfg.ShardCount,
|
|
"prefIPv4Mask": cfg.PreferredIPv4SubnetMaskBitsSet,
|
|
"prefIPv6Mask": cfg.PreferredIPv6SubnetMaskBitsSet,
|
|
}
|
|
}
|
|
|
|
// Validate sanity checks values set in a config and returns a new config with
|
|
// default values replacing anything that is invalid.
|
|
//
|
|
// This function warns to the logger when a value is changed.
|
|
func (cfg Config) Validate() Config {
|
|
validcfg := cfg
|
|
if cfg.ShardCount > 0 {
|
|
validcfg.ShardCount = cfg.ShardCount
|
|
} else {
|
|
validcfg.ShardCount = 1024
|
|
log.WithFields(log.Fields{
|
|
"name": Name + ".ShardCount",
|
|
"provided": strconv.Itoa(cfg.ShardCount),
|
|
"default": strconv.Itoa(validcfg.ShardCount),
|
|
}).Warnln("falling back to default configuration")
|
|
}
|
|
|
|
if cfg.GarbageCollectionInterval <= 0 {
|
|
validcfg.GarbageCollectionInterval = time.Minute * 14
|
|
log.WithFields(log.Fields{
|
|
"name": Name + ".GarbageCollectionInterval",
|
|
"provided": cfg.GarbageCollectionInterval,
|
|
"default": validcfg.GarbageCollectionInterval,
|
|
}).Warnln("falling back to default configuration")
|
|
}
|
|
|
|
if cfg.PrometheusReportingInterval <= 0 {
|
|
validcfg.PrometheusReportingInterval = time.Second * 1
|
|
log.WithFields(log.Fields{
|
|
"name": Name + ".PrometheusReportingInterval",
|
|
"provided": cfg.PrometheusReportingInterval,
|
|
"default": validcfg.PrometheusReportingInterval,
|
|
}).Warnln("falling back to default configuration")
|
|
}
|
|
|
|
return validcfg
|
|
}
|
|
|
|
// New creates a new PeerStore backed by memory that organizes peers by a
|
|
// pre-configured subnet mask.
|
|
func New(provided Config) (storage.PeerStore, error) {
|
|
cfg := provided.Validate()
|
|
ps := &peerStore{
|
|
cfg: cfg,
|
|
ipv4Mask: net.CIDRMask(cfg.PreferredIPv4SubnetMaskBitsSet, 32),
|
|
ipv6Mask: net.CIDRMask(cfg.PreferredIPv6SubnetMaskBitsSet, 128),
|
|
shards: make([]*peerShard, cfg.ShardCount*2),
|
|
closed: make(chan struct{}),
|
|
}
|
|
|
|
for i := 0; i < cfg.ShardCount*2; i++ {
|
|
ps.shards[i] = &peerShard{swarms: make(map[bittorrent.InfoHash]swarm)}
|
|
}
|
|
|
|
// Start a goroutine for garbage collection.
|
|
ps.wg.Add(1)
|
|
go func() {
|
|
defer ps.wg.Done()
|
|
for {
|
|
select {
|
|
case <-ps.closed:
|
|
return
|
|
case <-time.After(cfg.GarbageCollectionInterval):
|
|
before := time.Now().Add(-cfg.PeerLifetime)
|
|
log.Debugln("memory: purging peers with no announces since", before)
|
|
ps.collectGarbage(before)
|
|
}
|
|
}
|
|
}()
|
|
|
|
// Start a goroutine for updating our cached system clock.
|
|
ps.wg.Add(1)
|
|
go func() {
|
|
defer ps.wg.Done()
|
|
t := time.NewTicker(1 * time.Second)
|
|
for {
|
|
select {
|
|
case <-ps.closed:
|
|
t.Stop()
|
|
return
|
|
case now := <-t.C:
|
|
ps.setClock(now.UnixNano())
|
|
}
|
|
}
|
|
}()
|
|
|
|
// Start a goroutine for reporting statistics to Prometheus.
|
|
ps.wg.Add(1)
|
|
go func() {
|
|
defer ps.wg.Done()
|
|
t := time.NewTicker(cfg.PrometheusReportingInterval)
|
|
for {
|
|
select {
|
|
case <-ps.closed:
|
|
t.Stop()
|
|
return
|
|
case <-t.C:
|
|
before := time.Now()
|
|
ps.populateProm()
|
|
log.Debugf("memory: populateProm() took %s", time.Since(before))
|
|
}
|
|
}
|
|
}()
|
|
|
|
return ps, nil
|
|
}
|
|
|
|
type serializedPeer string
|
|
|
|
func newPeerKey(p bittorrent.Peer) serializedPeer {
|
|
b := make([]byte, 20+2+len(p.IP.IP))
|
|
copy(b[:20], p.ID[:])
|
|
binary.BigEndian.PutUint16(b[20:22], p.Port)
|
|
copy(b[22:], p.IP.IP)
|
|
|
|
return serializedPeer(b)
|
|
}
|
|
|
|
func decodePeerKey(pk serializedPeer) bittorrent.Peer {
|
|
peer := bittorrent.Peer{
|
|
ID: bittorrent.PeerIDFromString(string(pk[:20])),
|
|
Port: binary.BigEndian.Uint16([]byte(pk[20:22])),
|
|
IP: bittorrent.IP{IP: net.IP(pk[22:])}}
|
|
|
|
if ip := peer.IP.To4(); ip != nil {
|
|
peer.IP.IP = ip
|
|
peer.IP.AddressFamily = bittorrent.IPv4
|
|
} else if len(peer.IP.IP) == net.IPv6len { // implies toReturn.IP.To4() == nil
|
|
peer.IP.AddressFamily = bittorrent.IPv6
|
|
} else {
|
|
panic("IP is neither v4 nor v6")
|
|
}
|
|
|
|
return peer
|
|
}
|
|
|
|
type peerSubnet string
|
|
|
|
func newPeerSubnet(ip bittorrent.IP, ipv4Mask, ipv6Mask net.IPMask) peerSubnet {
|
|
var maskedIP net.IP
|
|
switch ip.AddressFamily {
|
|
case bittorrent.IPv4:
|
|
maskedIP = ip.Mask(ipv4Mask)
|
|
case bittorrent.IPv6:
|
|
maskedIP = ip.Mask(ipv6Mask)
|
|
default:
|
|
panic("IP is neither v4 nor v6")
|
|
}
|
|
|
|
return peerSubnet(maskedIP.String())
|
|
}
|
|
|
|
type peerShard struct {
|
|
swarms map[bittorrent.InfoHash]swarm
|
|
numSeeders uint64
|
|
numLeechers uint64
|
|
sync.RWMutex
|
|
}
|
|
|
|
type swarm struct {
|
|
seeders map[peerSubnet]map[serializedPeer]int64
|
|
leechers map[peerSubnet]map[serializedPeer]int64
|
|
}
|
|
|
|
func (s swarm) lenSeeders() (i int) {
|
|
for _, subnet := range s.seeders {
|
|
i += len(subnet)
|
|
}
|
|
return
|
|
}
|
|
|
|
func (s swarm) lenLeechers() (i int) {
|
|
for _, subnet := range s.leechers {
|
|
i += len(subnet)
|
|
}
|
|
return
|
|
}
|
|
|
|
type peerStore struct {
|
|
cfg Config
|
|
ipv4Mask net.IPMask
|
|
ipv6Mask net.IPMask
|
|
shards []*peerShard
|
|
|
|
// clock stores the current time nanoseconds, updated every second.
|
|
// Must be accessed atomically!
|
|
clock int64
|
|
|
|
closed chan struct{}
|
|
wg sync.WaitGroup
|
|
}
|
|
|
|
var _ storage.PeerStore = &peerStore{}
|
|
|
|
// populateProm aggregates metrics over all shards and then posts them to
|
|
// prometheus.
|
|
func (ps *peerStore) populateProm() {
|
|
var numInfohashes, numSeeders, numLeechers uint64
|
|
|
|
for _, s := range ps.shards {
|
|
s.RLock()
|
|
numInfohashes += uint64(len(s.swarms))
|
|
numSeeders += s.numSeeders
|
|
numLeechers += s.numLeechers
|
|
s.RUnlock()
|
|
}
|
|
|
|
promInfohashesCount.Set(float64(numInfohashes))
|
|
promSeedersCount.Set(float64(numSeeders))
|
|
promLeechersCount.Set(float64(numLeechers))
|
|
}
|
|
|
|
func (ps *peerStore) getClock() int64 {
|
|
return atomic.LoadInt64(&ps.clock)
|
|
}
|
|
|
|
func (ps *peerStore) setClock(to int64) {
|
|
atomic.StoreInt64(&ps.clock, to)
|
|
}
|
|
|
|
func (ps *peerStore) shardIndex(infoHash bittorrent.InfoHash, af bittorrent.AddressFamily) uint32 {
|
|
// There are twice the amount of shards specified by the user, the first
|
|
// half is dedicated to IPv4 swarms and the second half is dedicated to
|
|
// IPv6 swarms.
|
|
idx := binary.BigEndian.Uint32(infoHash[:4]) % (uint32(len(ps.shards)) / 2)
|
|
if af == bittorrent.IPv6 {
|
|
idx += uint32(len(ps.shards) / 2)
|
|
}
|
|
return idx
|
|
}
|
|
|
|
func (ps *peerStore) PutSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error {
|
|
select {
|
|
case <-ps.closed:
|
|
panic("attempted to interact with stopped memory store")
|
|
default:
|
|
}
|
|
|
|
pk := newPeerKey(p)
|
|
|
|
shard := ps.shards[ps.shardIndex(ih, p.IP.AddressFamily)]
|
|
shard.Lock()
|
|
|
|
if _, ok := shard.swarms[ih]; !ok {
|
|
shard.swarms[ih] = swarm{
|
|
seeders: make(map[peerSubnet]map[serializedPeer]int64),
|
|
leechers: make(map[peerSubnet]map[serializedPeer]int64),
|
|
}
|
|
}
|
|
|
|
preferredSubnet := newPeerSubnet(p.IP, ps.ipv4Mask, ps.ipv6Mask)
|
|
|
|
// Allocate a new map if necessary.
|
|
if shard.swarms[ih].seeders[preferredSubnet] == nil {
|
|
shard.swarms[ih].seeders[preferredSubnet] = make(map[serializedPeer]int64)
|
|
}
|
|
|
|
// If this peer isn't already a seeder, update the stats for the swarm.
|
|
if _, ok := shard.swarms[ih].seeders[preferredSubnet][pk]; !ok {
|
|
shard.numSeeders++
|
|
}
|
|
|
|
// Update the peer in the swarm.
|
|
shard.swarms[ih].seeders[preferredSubnet][pk] = ps.getClock()
|
|
|
|
shard.Unlock()
|
|
return nil
|
|
}
|
|
|
|
func (ps *peerStore) DeleteSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error {
|
|
select {
|
|
case <-ps.closed:
|
|
panic("attempted to interact with stopped memory store")
|
|
default:
|
|
}
|
|
|
|
pk := newPeerKey(p)
|
|
|
|
shard := ps.shards[ps.shardIndex(ih, p.IP.AddressFamily)]
|
|
shard.Lock()
|
|
|
|
if _, ok := shard.swarms[ih]; !ok {
|
|
shard.Unlock()
|
|
return storage.ErrResourceDoesNotExist
|
|
}
|
|
|
|
preferredSubnet := newPeerSubnet(p.IP, ps.ipv4Mask, ps.ipv6Mask)
|
|
if _, ok := shard.swarms[ih].seeders[preferredSubnet][pk]; !ok {
|
|
shard.Unlock()
|
|
return storage.ErrResourceDoesNotExist
|
|
}
|
|
|
|
shard.numSeeders--
|
|
delete(shard.swarms[ih].seeders[preferredSubnet], pk)
|
|
|
|
if shard.swarms[ih].lenSeeders()|shard.swarms[ih].lenLeechers() == 0 {
|
|
delete(shard.swarms, ih)
|
|
} else if len(shard.swarms[ih].seeders[preferredSubnet]) == 0 {
|
|
delete(shard.swarms[ih].seeders, preferredSubnet)
|
|
}
|
|
|
|
shard.Unlock()
|
|
return nil
|
|
}
|
|
|
|
func (ps *peerStore) PutLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error {
|
|
select {
|
|
case <-ps.closed:
|
|
panic("attempted to interact with stopped memory store")
|
|
default:
|
|
}
|
|
|
|
pk := newPeerKey(p)
|
|
|
|
shard := ps.shards[ps.shardIndex(ih, p.IP.AddressFamily)]
|
|
shard.Lock()
|
|
|
|
if _, ok := shard.swarms[ih]; !ok {
|
|
shard.swarms[ih] = swarm{
|
|
seeders: make(map[peerSubnet]map[serializedPeer]int64),
|
|
leechers: make(map[peerSubnet]map[serializedPeer]int64),
|
|
}
|
|
}
|
|
|
|
preferredSubnet := newPeerSubnet(p.IP, ps.ipv4Mask, ps.ipv6Mask)
|
|
|
|
// Allocate a new map if necessary.
|
|
if shard.swarms[ih].leechers[preferredSubnet] == nil {
|
|
shard.swarms[ih].leechers[preferredSubnet] = make(map[serializedPeer]int64)
|
|
}
|
|
|
|
// If this peer isn't already a seeder, update the stats for the swarm.
|
|
if _, ok := shard.swarms[ih].leechers[preferredSubnet][pk]; !ok {
|
|
shard.numLeechers++
|
|
}
|
|
|
|
// Update the peer in the swarm.
|
|
shard.swarms[ih].leechers[preferredSubnet][pk] = ps.getClock()
|
|
|
|
shard.Unlock()
|
|
return nil
|
|
}
|
|
|
|
func (ps *peerStore) DeleteLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error {
|
|
select {
|
|
case <-ps.closed:
|
|
panic("attempted to interact with stopped memory store")
|
|
default:
|
|
}
|
|
|
|
pk := newPeerKey(p)
|
|
|
|
shard := ps.shards[ps.shardIndex(ih, p.IP.AddressFamily)]
|
|
shard.Lock()
|
|
|
|
if _, ok := shard.swarms[ih]; !ok {
|
|
shard.Unlock()
|
|
return storage.ErrResourceDoesNotExist
|
|
}
|
|
|
|
preferredSubnet := newPeerSubnet(p.IP, ps.ipv4Mask, ps.ipv6Mask)
|
|
if _, ok := shard.swarms[ih].leechers[preferredSubnet][pk]; !ok {
|
|
shard.Unlock()
|
|
return storage.ErrResourceDoesNotExist
|
|
}
|
|
|
|
shard.numLeechers--
|
|
delete(shard.swarms[ih].leechers[preferredSubnet], pk)
|
|
|
|
if shard.swarms[ih].lenSeeders()|shard.swarms[ih].lenLeechers() == 0 {
|
|
delete(shard.swarms, ih)
|
|
} else if len(shard.swarms[ih].leechers[preferredSubnet]) == 0 {
|
|
delete(shard.swarms[ih].leechers, preferredSubnet)
|
|
}
|
|
|
|
shard.Unlock()
|
|
return nil
|
|
}
|
|
|
|
func (ps *peerStore) GraduateLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error {
|
|
select {
|
|
case <-ps.closed:
|
|
panic("attempted to interact with stopped memory store")
|
|
default:
|
|
}
|
|
|
|
pk := newPeerKey(p)
|
|
|
|
shard := ps.shards[ps.shardIndex(ih, p.IP.AddressFamily)]
|
|
shard.Lock()
|
|
|
|
if _, ok := shard.swarms[ih]; !ok {
|
|
shard.swarms[ih] = swarm{
|
|
seeders: make(map[peerSubnet]map[serializedPeer]int64),
|
|
leechers: make(map[peerSubnet]map[serializedPeer]int64),
|
|
}
|
|
}
|
|
|
|
// If this peer is a leecher, update the stats for the swarm and remove them.
|
|
preferredSubnet := newPeerSubnet(p.IP, ps.ipv4Mask, ps.ipv6Mask)
|
|
if _, ok := shard.swarms[ih].leechers[preferredSubnet][pk]; ok {
|
|
shard.numLeechers--
|
|
delete(shard.swarms[ih].leechers[preferredSubnet], pk)
|
|
}
|
|
|
|
// Allocate a new map if necessary.
|
|
if shard.swarms[ih].seeders[preferredSubnet] == nil {
|
|
shard.swarms[ih].seeders[preferredSubnet] = make(map[serializedPeer]int64)
|
|
}
|
|
|
|
// If this peer isn't already a seeder, update the stats for the swarm.
|
|
if _, ok := shard.swarms[ih].seeders[preferredSubnet][pk]; !ok {
|
|
shard.numSeeders++
|
|
}
|
|
|
|
// Update the peer in the swarm.
|
|
shard.swarms[ih].seeders[preferredSubnet][pk] = ps.getClock()
|
|
|
|
shard.Unlock()
|
|
return nil
|
|
}
|
|
|
|
func (ps *peerStore) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant int, announcer bittorrent.Peer) (peers []bittorrent.Peer, err error) {
|
|
select {
|
|
case <-ps.closed:
|
|
panic("attempted to interact with stopped memory store")
|
|
default:
|
|
}
|
|
|
|
shard := ps.shards[ps.shardIndex(ih, announcer.IP.AddressFamily)]
|
|
shard.RLock()
|
|
|
|
if _, ok := shard.swarms[ih]; !ok {
|
|
shard.RUnlock()
|
|
return nil, storage.ErrResourceDoesNotExist
|
|
}
|
|
|
|
preferredSubnet := newPeerSubnet(announcer.IP, ps.ipv4Mask, ps.ipv6Mask)
|
|
|
|
if seeder {
|
|
// Append as many close leechers as possible.
|
|
closestLeechers := shard.swarms[ih].leechers[preferredSubnet]
|
|
for pk := range closestLeechers {
|
|
if numWant == 0 {
|
|
break
|
|
}
|
|
|
|
peers = append(peers, decodePeerKey(pk))
|
|
numWant--
|
|
}
|
|
|
|
// Append the rest of the leechers.
|
|
if numWant > 0 {
|
|
for subnet := range shard.swarms[ih].leechers {
|
|
// Already appended from this subnet explictly first.
|
|
if subnet == preferredSubnet {
|
|
continue
|
|
}
|
|
|
|
for pk := range shard.swarms[ih].leechers[subnet] {
|
|
if numWant == 0 {
|
|
break
|
|
}
|
|
|
|
peers = append(peers, decodePeerKey(pk))
|
|
numWant--
|
|
}
|
|
}
|
|
}
|
|
} else {
|
|
// Append as many close seeders as possible.
|
|
closestSeeders := shard.swarms[ih].seeders[preferredSubnet]
|
|
for pk := range closestSeeders {
|
|
if numWant == 0 {
|
|
break
|
|
}
|
|
|
|
peers = append(peers, decodePeerKey(pk))
|
|
numWant--
|
|
}
|
|
|
|
// Append as many close leechers as possible.
|
|
if numWant > 0 {
|
|
closestLeechers := shard.swarms[ih].leechers[preferredSubnet]
|
|
announcerPK := newPeerKey(announcer)
|
|
for pk := range closestLeechers {
|
|
if pk == announcerPK {
|
|
continue
|
|
}
|
|
|
|
if numWant == 0 {
|
|
break
|
|
}
|
|
|
|
peers = append(peers, decodePeerKey(pk))
|
|
numWant--
|
|
}
|
|
}
|
|
|
|
// Append as the rest of the seeders.
|
|
if numWant > 0 {
|
|
for subnet := range shard.swarms[ih].seeders {
|
|
// Already appended from this subnet explictly first.
|
|
if subnet == preferredSubnet {
|
|
continue
|
|
}
|
|
|
|
for pk := range shard.swarms[ih].seeders[subnet] {
|
|
if numWant == 0 {
|
|
break
|
|
}
|
|
|
|
peers = append(peers, decodePeerKey(pk))
|
|
numWant--
|
|
}
|
|
}
|
|
}
|
|
|
|
// Append the rest of the leechers.
|
|
if numWant > 0 {
|
|
for subnet := range shard.swarms[ih].leechers {
|
|
// Already appended from this subnet explictly first.
|
|
if subnet == preferredSubnet {
|
|
continue
|
|
}
|
|
|
|
for pk := range shard.swarms[ih].leechers[subnet] {
|
|
if numWant == 0 {
|
|
break
|
|
}
|
|
|
|
peers = append(peers, decodePeerKey(pk))
|
|
numWant--
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
shard.RUnlock()
|
|
return
|
|
}
|
|
|
|
func (ps *peerStore) ScrapeSwarm(ih bittorrent.InfoHash, addressFamily bittorrent.AddressFamily) (resp bittorrent.Scrape) {
|
|
select {
|
|
case <-ps.closed:
|
|
panic("attempted to interact with stopped memory store")
|
|
default:
|
|
}
|
|
|
|
resp.InfoHash = ih
|
|
shard := ps.shards[ps.shardIndex(ih, addressFamily)]
|
|
shard.RLock()
|
|
|
|
if _, ok := shard.swarms[ih]; !ok {
|
|
shard.RUnlock()
|
|
return
|
|
}
|
|
|
|
resp.Incomplete = uint32(shard.swarms[ih].lenLeechers())
|
|
resp.Complete = uint32(shard.swarms[ih].lenSeeders())
|
|
shard.RUnlock()
|
|
|
|
return
|
|
}
|
|
|
|
// collectGarbage deletes all Peers from the PeerStore which are older than the
|
|
// cutoff time.
|
|
//
|
|
// This function must be able to execute while other methods on this interface
|
|
// are being executed in parallel.
|
|
func (ps *peerStore) collectGarbage(cutoff time.Time) error {
|
|
select {
|
|
case <-ps.closed:
|
|
return nil
|
|
default:
|
|
}
|
|
|
|
cutoffUnix := cutoff.UnixNano()
|
|
start := time.Now()
|
|
|
|
for _, shard := range ps.shards {
|
|
shard.RLock()
|
|
var infohashes []bittorrent.InfoHash
|
|
for ih := range shard.swarms {
|
|
infohashes = append(infohashes, ih)
|
|
}
|
|
shard.RUnlock()
|
|
runtime.Gosched()
|
|
|
|
for _, ih := range infohashes {
|
|
shard.Lock()
|
|
|
|
if _, stillExists := shard.swarms[ih]; !stillExists {
|
|
shard.Unlock()
|
|
runtime.Gosched()
|
|
continue
|
|
}
|
|
|
|
for subnet := range shard.swarms[ih].leechers {
|
|
for pk, mtime := range shard.swarms[ih].leechers[subnet] {
|
|
if mtime <= cutoffUnix {
|
|
shard.numLeechers--
|
|
delete(shard.swarms[ih].leechers[subnet], pk)
|
|
}
|
|
}
|
|
|
|
if len(shard.swarms[ih].leechers[subnet]) == 0 {
|
|
delete(shard.swarms[ih].leechers, subnet)
|
|
}
|
|
}
|
|
|
|
for subnet := range shard.swarms[ih].seeders {
|
|
for pk, mtime := range shard.swarms[ih].seeders[subnet] {
|
|
if mtime <= cutoffUnix {
|
|
shard.numSeeders--
|
|
delete(shard.swarms[ih].seeders[subnet], pk)
|
|
}
|
|
}
|
|
|
|
if len(shard.swarms[ih].seeders[subnet]) == 0 {
|
|
delete(shard.swarms[ih].seeders, subnet)
|
|
}
|
|
}
|
|
|
|
if shard.swarms[ih].lenSeeders()|shard.swarms[ih].lenLeechers() == 0 {
|
|
delete(shard.swarms, ih)
|
|
}
|
|
|
|
shard.Unlock()
|
|
runtime.Gosched()
|
|
}
|
|
|
|
runtime.Gosched()
|
|
}
|
|
|
|
recordGCDuration(time.Since(start))
|
|
|
|
return nil
|
|
}
|
|
|
|
func (ps *peerStore) Stop() <-chan error {
|
|
c := make(chan error)
|
|
go func() {
|
|
close(ps.closed)
|
|
ps.wg.Wait()
|
|
|
|
// Explicitly deallocate our storage.
|
|
shards := make([]*peerShard, len(ps.shards))
|
|
for i := 0; i < len(ps.shards); i++ {
|
|
shards[i] = &peerShard{swarms: make(map[bittorrent.InfoHash]swarm)}
|
|
}
|
|
ps.shards = shards
|
|
|
|
close(c)
|
|
}()
|
|
|
|
return c
|
|
}
|
|
|
|
func (ps *peerStore) LogFields() log.Fields {
|
|
return ps.cfg.LogFields()
|
|
}
|