storage: enforce all peer stores are loggable

parent ed69a0893e
commit 7786e1a915

4 changed files with 54 additions and 13 deletions
@@ -56,11 +56,11 @@ func (r *Run) Start(ps storage.PeerStore) error {
 	r.sg.Add(prometheus.NewServer(cfg.PrometheusAddr))

 	if ps == nil {
 		log.WithFields(cfg.Storage.LogFields()).Info("starting storage")
 		ps, err = storage.NewPeerStore(cfg.Storage.Name, cfg.Storage.Config)
 		if err != nil {
 			return errors.New("failed to create memory storage: " + err.Error())
 		}
+		log.WithFields(ps.LogFields()).Info("started storage")
 	}
 	r.peerStore = ps

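Whatever ends up in ps above, whether it is constructed here from cfg.Storage or injected by the caller, can now be logged through the interface itself. Purely as an illustration of the mechanism (not chihaya's actual output), here is a self-contained Logrus sketch using field names borrowed from the memory driver's config; the values are made up:

package main

import (
	log "github.com/Sirupsen/logrus"
)

func main() {
	// Hypothetical values; a real peer store would return these from LogFields().
	fields := log.Fields{
		"name":         "memory",
		"gcInterval":   "14m",
		"peerLifetime": "15m",
		"shardCount":   1024,
	}
	// Renders one structured log line with the fields attached.
	log.WithFields(fields).Info("started storage")
}
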
@@ -17,6 +17,9 @@ import (
 	"github.com/chihaya/chihaya/storage"
 )

+// Name is the name by which this peer store is registered with Chihaya.
+const Name = "memory"
+
 func init() {
 	// Register Prometheus metrics.
 	prometheus.MustRegister(promGCDurationMilliseconds)
@@ -24,7 +27,7 @@ func init() {
 	prometheus.MustRegister(promSeedersCount, promLeechersCount)

 	// Register the storage driver.
-	storage.RegisterDriver("memory", driver{})
+	storage.RegisterDriver(Name, driver{})
 }

 var promGCDurationMilliseconds = prometheus.NewHistogram(prometheus.HistogramOpts{
@@ -92,9 +95,11 @@ type Config struct {
 // LogFields renders the current config as a set of Logrus fields.
 func (cfg Config) LogFields() log.Fields {
 	return log.Fields{
-		"gcInterval":   cfg.GarbageCollectionInterval,
-		"peerLifetime": cfg.PeerLifetime,
-		"shardCount":   cfg.ShardCount,
+		"name":               Name,
+		"gcInterval":         cfg.GarbageCollectionInterval,
+		"promReportInterval": cfg.PrometheusReportingInterval,
+		"peerLifetime":       cfg.PeerLifetime,
+		"shardCount":         cfg.ShardCount,
 	}
 }
@@ -113,6 +118,7 @@ func New(cfg Config) (storage.PeerStore, error) {
 	}

 	ps := &peerStore{
+		cfg:     cfg,
 		shards:  make([]*peerShard, shardCount*2),
 		closing: make(chan struct{}),
 	}
@@ -191,12 +197,15 @@ type swarm struct {
 }

 type peerStore struct {
-	shards  []*peerShard
-	closing chan struct{}
+	cfg    Config
+	shards []*peerShard

 	// clock stores the current time nanoseconds, updated every second.
 	// Must be accessed atomically!
 	clock int64
-	wg    sync.WaitGroup
+
+	closing chan struct{}
+	wg      sync.WaitGroup
 }

 // populateProm aggregates metrics over all shards and then posts them to
@@ -595,3 +604,7 @@ func (ps *peerStore) Stop() <-chan error {

 	return c
 }
+
+func (ps *peerStore) LogFields() log.Fields {
+	return ps.cfg.LogFields()
+}

@@ -19,12 +19,15 @@ import (
 	"github.com/chihaya/chihaya/storage"
 )

+// Name is the name by which this peer store is registered with Chihaya.
+const Name = "memorybysubnet"
+
 func init() {
 	prometheus.MustRegister(promGCDurationMilliseconds)
 	prometheus.MustRegister(promInfohashesCount)

 	// Register the storage driver.
-	storage.RegisterDriver("memorybysubnet", driver{})
+	storage.RegisterDriver(Name, driver{})
 }

 var promGCDurationMilliseconds = prometheus.NewHistogram(prometheus.HistogramOpts{
@@ -80,6 +83,18 @@ type Config struct {
 	PreferredIPv6SubnetMaskBitsSet int `yaml:"preferred_ipv6_subnet_mask_bits_set"`
 }

+// LogFields renders the current config as a set of Logrus fields.
+func (cfg Config) LogFields() log.Fields {
+	return log.Fields{
+		"name":         Name,
+		"gcInterval":   cfg.GarbageCollectionInterval,
+		"peerLifetime": cfg.PeerLifetime,
+		"shardCount":   cfg.ShardCount,
+		"prefIPv4Mask": cfg.PreferredIPv4SubnetMaskBitsSet,
+		"prefIPv6Mask": cfg.PreferredIPv6SubnetMaskBitsSet,
+	}
+}
+
 // New creates a new PeerStore backed by memory.
 func New(cfg Config) (storage.PeerStore, error) {
 	shardCount := 1
@@ -92,10 +107,11 @@ func New(cfg Config) (storage.PeerStore, error) {
 	}

 	ps := &peerStore{
-		shards: make([]*peerShard, shardCount*2),
-		closed: make(chan struct{}),
+		cfg:      cfg,
 		ipv4Mask: net.CIDRMask(cfg.PreferredIPv4SubnetMaskBitsSet, 32),
 		ipv6Mask: net.CIDRMask(cfg.PreferredIPv6SubnetMaskBitsSet, 128),
+		shards:   make([]*peerShard, shardCount*2),
+		closed:   make(chan struct{}),
 	}

 	for i := 0; i < shardCount*2; i++ {

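As an aside on the constructor above: net.CIDRMask(ones, bits) from the standard library builds a mask with the given number of leading one bits, so these two fields hold the operator's preferred IPv4 and IPv6 subnet masks. A quick standalone sketch (the bit counts are example values, not chihaya defaults):

package main

import (
	"fmt"
	"net"
)

func main() {
	// 24 leading one bits out of 32 -> the familiar /24 mask.
	fmt.Println(net.CIDRMask(24, 32)) // ffffff00
	// 48 leading one bits out of 128 -> a /48 IPv6 mask.
	fmt.Println(net.CIDRMask(48, 128)) // ffffffffffff00000000000000000000
}
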
@@ -170,10 +186,12 @@ func (s swarm) lenLeechers() (i int) {
 }

 type peerStore struct {
-	shards   []*peerShard
-	closed   chan struct{}
+	cfg      Config
 	ipv4Mask net.IPMask
 	ipv6Mask net.IPMask
+	shards   []*peerShard
+
+	closed chan struct{}
 }

 var _ storage.PeerStore = &peerStore{}
@@ -597,3 +615,7 @@ func (s *peerStore) Stop() <-chan error {
 	}()
 	return toReturn
 }
+
+func (s *peerStore) LogFields() log.Fields {
+	return s.cfg.LogFields()
+}

@@ -4,6 +4,8 @@ import (
 	"errors"
 	"sync"

+	log "github.com/Sirupsen/logrus"
+
 	"github.com/chihaya/chihaya/bittorrent"
 	"github.com/chihaya/chihaya/pkg/stop"
 )
@@ -86,6 +88,10 @@ type PeerStore interface {
 	// PeerStore.
 	// For more details see the documentation in the stop package.
 	stop.Stopper
+
+	// LogFields returns a loggable version of the data used to configure and
+	// operate a particular peer store.
+	LogFields() log.Fields
 }

 // RegisterDriver makes a Driver available by the provided name.

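With LogFields now part of the PeerStore interface, every driver, in-tree or not, must expose its configuration in loggable form or it stops compiling against storage.PeerStore. A minimal sketch of what that asks of a hypothetical out-of-tree driver, mirroring the pattern the memory and memorybysubnet drivers use above (the package name, redisConfig type, and field names are invented for illustration; the remaining PeerStore methods are elided):

package redis

import (
	log "github.com/Sirupsen/logrus"
)

// redisConfig is a hypothetical driver configuration.
type redisConfig struct {
	Addr         string `yaml:"addr"`
	PeerLifetime string `yaml:"peer_lifetime"`
}

// LogFields renders the config as Logrus fields, like the in-tree drivers do.
func (cfg redisConfig) LogFields() log.Fields {
	return log.Fields{
		"addr":         cfg.Addr,
		"peerLifetime": cfg.PeerLifetime,
	}
}

// peerStore would also implement the announce/scrape and stop.Stopper
// methods required by storage.PeerStore; they are omitted here.
type peerStore struct {
	cfg redisConfig
}

// LogFields satisfies the new interface method by delegating to the config.
func (ps *peerStore) LogFields() log.Fields {
	return ps.cfg.LogFields()
}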