storage: dynamically register drivers

This commit is contained in:
Jimmy Zelinskie 2017-02-21 00:58:57 -05:00
parent 6fc3f618aa
commit 496cc1a31d
6 changed files with 129 additions and 12 deletions

View file

@ -13,7 +13,10 @@ import (
"github.com/chihaya/chihaya/middleware/clientapproval"
"github.com/chihaya/chihaya/middleware/jwt"
"github.com/chihaya/chihaya/middleware/varinterval"
"github.com/chihaya/chihaya/storage/memory"
// Imported to register as Storage Drivers.
_ "github.com/chihaya/chihaya/storage/memory"
_ "github.com/chihaya/chihaya/storage/memorybysubnet"
)
type hookConfig struct {
@ -33,13 +36,18 @@ func (hookCfgs hookConfigs) Names() (hookNames []string) {
return
}
type storageConfig struct {
Name string `yaml:"name"`
Config interface{} `yaml:"config"`
}
// Config represents the configuration used for executing Chihaya.
type Config struct {
middleware.Config `yaml:",inline"`
PrometheusAddr string `yaml:"prometheus_addr"`
HTTPConfig httpfrontend.Config `yaml:"http"`
UDPConfig udpfrontend.Config `yaml:"udp"`
Storage memory.Config `yaml:"storage"`
Storage storageConfig `yaml:"storage"`
PreHooks hookConfigs `yaml:"prehooks"`
PostHooks hookConfigs `yaml:"posthooks"`
}

View file

@ -17,7 +17,6 @@ import (
"github.com/chihaya/chihaya/pkg/prometheus"
"github.com/chihaya/chihaya/pkg/stop"
"github.com/chihaya/chihaya/storage"
"github.com/chihaya/chihaya/storage/memory"
)
// Run represents the state of a running instance of Chihaya.
@ -54,7 +53,7 @@ func (r *Run) Start(ps storage.PeerStore) error {
if ps == nil {
log.WithFields(cfg.Storage.LogFields()).Info("starting storage")
ps, err = memory.New(cfg.Storage)
ps, err = storage.NewPeerStore(cfg.Storage.Name, cfg.Storage.Config)
if err != nil {
return errors.New("failed to create memory storage: " + err.Error())
}

View file

@ -67,16 +67,18 @@ chihaya:
# This block defines configuration used for the storage of peer data.
storage:
# The frequency which stale peers are removed.
gc_interval: 14m
name: memory
config:
# The frequency which stale peers are removed.
gc_interval: 14m
# The amount of time until a peer is considered stale.
# To avoid churn, keep this slightly larger than `announce_interval`
peer_lifetime: 16m
# The amount of time until a peer is considered stale.
# To avoid churn, keep this slightly larger than `announce_interval`
peer_lifetime: 16m
# The number of partitions data will be divided into in order to provide a
# higher degree of parallelism.
shard_count: 1024
# The number of partitions data will be divided into in order to provide a
# higher degree of parallelism.
shards: 1024
# The interval at which metrics about the number of infohashes and peers
# are collected and posted to Prometheus.

View file

@ -11,15 +11,20 @@ import (
log "github.com/Sirupsen/logrus"
"github.com/prometheus/client_golang/prometheus"
"gopkg.in/yaml.v2"
"github.com/chihaya/chihaya/bittorrent"
"github.com/chihaya/chihaya/storage"
)
func init() {
// Register Prometheus metrics.
prometheus.MustRegister(promGCDurationMilliseconds)
prometheus.MustRegister(promInfohashesCount)
prometheus.MustRegister(promSeedersCount, promLeechersCount)
// Register the storage driver.
storage.RegisterDriver("memory", driver{})
}
var promGCDurationMilliseconds = prometheus.NewHistogram(prometheus.HistogramOpts{
@ -48,6 +53,30 @@ func recordGCDuration(duration time.Duration) {
promGCDurationMilliseconds.Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond))
}
// recordInfohashesDelta records a change in the number of Infohashes tracked.
func recordInfohashesDelta(delta float64) {
promInfohashesCount.Add(delta)
}
type driver struct{}
func (d driver) NewPeerStore(icfg interface{}) (storage.PeerStore, error) {
// Marshal the config back into bytes.
bytes, err := yaml.Marshal(icfg)
if err != nil {
return nil, err
}
// Unmarshal the bytes into the proper config type.
var cfg Config
err = yaml.Unmarshal(bytes, &cfg)
if err != nil {
return nil, err
}
return New(cfg)
}
// ErrInvalidGCInterval is returned for a GarbageCollectionInterval that is
// less than or equal to zero.
var ErrInvalidGCInterval = errors.New("invalid garbage collection interval")

View file

@ -13,6 +13,7 @@ import (
log "github.com/Sirupsen/logrus"
"github.com/prometheus/client_golang/prometheus"
"gopkg.in/yaml.v2"
"github.com/chihaya/chihaya/bittorrent"
"github.com/chihaya/chihaya/storage"
@ -21,6 +22,9 @@ import (
func init() {
prometheus.MustRegister(promGCDurationMilliseconds)
prometheus.MustRegister(promInfohashesCount)
// Register the storage driver.
storage.RegisterDriver("memorybysubnet", driver{})
}
var promGCDurationMilliseconds = prometheus.NewHistogram(prometheus.HistogramOpts{
@ -44,6 +48,25 @@ func recordInfohashesDelta(delta float64) {
promInfohashesCount.Add(delta)
}
type driver struct{}
func (d driver) NewPeerStore(icfg interface{}) (storage.PeerStore, error) {
// Marshal the config back into bytes.
bytes, err := yaml.Marshal(icfg)
if err != nil {
return nil, err
}
// Unmarshal the bytes into the proper config type.
var cfg Config
err = yaml.Unmarshal(bytes, &cfg)
if err != nil {
return nil, err
}
return New(cfg)
}
// ErrInvalidGCInterval is returned for a GarbageCollectionInterval that is
// less than or equal to zero.
var ErrInvalidGCInterval = errors.New("invalid garbage collection interval")

View file

@ -1,14 +1,31 @@
package storage
import (
"errors"
"sync"
"github.com/chihaya/chihaya/bittorrent"
"github.com/chihaya/chihaya/pkg/stop"
)
var (
driversM sync.RWMutex
drivers = make(map[string]Driver)
)
// Driver is the interface used to initalize a new type of PeerStore.
type Driver interface {
NewPeerStore(cfg interface{}) (PeerStore, error)
}
// ErrResourceDoesNotExist is the error returned by all delete methods in the
// store if the requested resource does not exist.
var ErrResourceDoesNotExist = bittorrent.ClientError("resource does not exist")
// ErrDriverDoesNotExist is the error returned by NewPeerStore when a peer
// store driver with that name does not exist.
var ErrDriverDoesNotExist = errors.New("peer store driver with that name does not exist")
// PeerStore is an interface that abstracts the interactions of storing and
// manipulating Peers such that it can be implemented for various data stores.
type PeerStore interface {
@ -70,3 +87,42 @@ type PeerStore interface {
// For more details see the documentation in the stop package.
stop.Stopper
}
// RegisterDriver makes a Driver available by the provided name.
//
// If called twice with the same name, the name is blank, or if the provided
// Driver is nil, this function panics.
func RegisterDriver(name string, d Driver) {
if name == "" {
panic("storage: could not register a Driver with an empty name")
}
if d == nil {
panic("storage: could not register a nil Driver")
}
driversM.Lock()
defer driversM.Unlock()
if _, dup := drivers[name]; dup {
panic("storage: RegisterDriver called twice for " + name)
}
drivers[name] = d
}
// NewPeerStore attempts to initialize a new PeerStore with given a name from
// the list of registered Drivers.
//
// If a driver does not exist, returns ErrDriverDoesNotExist.
func NewPeerStore(name string, cfg interface{}) (ps PeerStore, err error) {
driversM.RLock()
defer driversM.RUnlock()
var d Driver
d, ok := drivers[name]
if !ok {
return nil, ErrDriverDoesNotExist
}
return d.NewPeerStore(cfg)
}