From 496cc1a31db1407883f84fa728d2808988655ecb Mon Sep 17 00:00:00 2001 From: Jimmy Zelinskie Date: Tue, 21 Feb 2017 00:58:57 -0500 Subject: [PATCH] storage: dynamically register drivers --- cmd/chihaya/config.go | 12 +++++- cmd/chihaya/main.go | 3 +- example_config.yaml | 18 +++++---- storage/memory/peer_store.go | 29 ++++++++++++++ storage/memorybysubnet/peer_store.go | 23 ++++++++++++ storage/storage.go | 56 ++++++++++++++++++++++++++++ 6 files changed, 129 insertions(+), 12 deletions(-) diff --git a/cmd/chihaya/config.go b/cmd/chihaya/config.go index cb0ae85..10a2300 100644 --- a/cmd/chihaya/config.go +++ b/cmd/chihaya/config.go @@ -13,7 +13,10 @@ import ( "github.com/chihaya/chihaya/middleware/clientapproval" "github.com/chihaya/chihaya/middleware/jwt" "github.com/chihaya/chihaya/middleware/varinterval" - "github.com/chihaya/chihaya/storage/memory" + + // Imported to register as Storage Drivers. + _ "github.com/chihaya/chihaya/storage/memory" + _ "github.com/chihaya/chihaya/storage/memorybysubnet" ) type hookConfig struct { @@ -33,13 +36,18 @@ func (hookCfgs hookConfigs) Names() (hookNames []string) { return } +type storageConfig struct { + Name string `yaml:"name"` + Config interface{} `yaml:"config"` +} + // Config represents the configuration used for executing Chihaya. type Config struct { middleware.Config `yaml:",inline"` PrometheusAddr string `yaml:"prometheus_addr"` HTTPConfig httpfrontend.Config `yaml:"http"` UDPConfig udpfrontend.Config `yaml:"udp"` - Storage memory.Config `yaml:"storage"` + Storage storageConfig `yaml:"storage"` PreHooks hookConfigs `yaml:"prehooks"` PostHooks hookConfigs `yaml:"posthooks"` } diff --git a/cmd/chihaya/main.go b/cmd/chihaya/main.go index 4e1a040..de38dbc 100644 --- a/cmd/chihaya/main.go +++ b/cmd/chihaya/main.go @@ -17,7 +17,6 @@ import ( "github.com/chihaya/chihaya/pkg/prometheus" "github.com/chihaya/chihaya/pkg/stop" "github.com/chihaya/chihaya/storage" - "github.com/chihaya/chihaya/storage/memory" ) // Run represents the state of a running instance of Chihaya. @@ -54,7 +53,7 @@ func (r *Run) Start(ps storage.PeerStore) error { if ps == nil { log.WithFields(cfg.Storage.LogFields()).Info("starting storage") - ps, err = memory.New(cfg.Storage) + ps, err = storage.NewPeerStore(cfg.Storage.Name, cfg.Storage.Config) if err != nil { return errors.New("failed to create memory storage: " + err.Error()) } diff --git a/example_config.yaml b/example_config.yaml index df80fc9..0303e4e 100644 --- a/example_config.yaml +++ b/example_config.yaml @@ -67,16 +67,18 @@ chihaya: # This block defines configuration used for the storage of peer data. storage: - # The frequency which stale peers are removed. - gc_interval: 14m + name: memory + config: + # The frequency which stale peers are removed. + gc_interval: 14m - # The amount of time until a peer is considered stale. - # To avoid churn, keep this slightly larger than `announce_interval` - peer_lifetime: 16m + # The amount of time until a peer is considered stale. + # To avoid churn, keep this slightly larger than `announce_interval` + peer_lifetime: 16m - # The number of partitions data will be divided into in order to provide a - # higher degree of parallelism. - shard_count: 1024 + # The number of partitions data will be divided into in order to provide a + # higher degree of parallelism. + shards: 1024 # The interval at which metrics about the number of infohashes and peers # are collected and posted to Prometheus. diff --git a/storage/memory/peer_store.go b/storage/memory/peer_store.go index 4419cd5..ae61ee1 100644 --- a/storage/memory/peer_store.go +++ b/storage/memory/peer_store.go @@ -11,15 +11,20 @@ import ( log "github.com/Sirupsen/logrus" "github.com/prometheus/client_golang/prometheus" + "gopkg.in/yaml.v2" "github.com/chihaya/chihaya/bittorrent" "github.com/chihaya/chihaya/storage" ) func init() { + // Register Prometheus metrics. prometheus.MustRegister(promGCDurationMilliseconds) prometheus.MustRegister(promInfohashesCount) prometheus.MustRegister(promSeedersCount, promLeechersCount) + + // Register the storage driver. + storage.RegisterDriver("memory", driver{}) } var promGCDurationMilliseconds = prometheus.NewHistogram(prometheus.HistogramOpts{ @@ -48,6 +53,30 @@ func recordGCDuration(duration time.Duration) { promGCDurationMilliseconds.Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond)) } +// recordInfohashesDelta records a change in the number of Infohashes tracked. +func recordInfohashesDelta(delta float64) { + promInfohashesCount.Add(delta) +} + +type driver struct{} + +func (d driver) NewPeerStore(icfg interface{}) (storage.PeerStore, error) { + // Marshal the config back into bytes. + bytes, err := yaml.Marshal(icfg) + if err != nil { + return nil, err + } + + // Unmarshal the bytes into the proper config type. + var cfg Config + err = yaml.Unmarshal(bytes, &cfg) + if err != nil { + return nil, err + } + + return New(cfg) +} + // ErrInvalidGCInterval is returned for a GarbageCollectionInterval that is // less than or equal to zero. var ErrInvalidGCInterval = errors.New("invalid garbage collection interval") diff --git a/storage/memorybysubnet/peer_store.go b/storage/memorybysubnet/peer_store.go index 35da7e8..6e3b2d8 100644 --- a/storage/memorybysubnet/peer_store.go +++ b/storage/memorybysubnet/peer_store.go @@ -13,6 +13,7 @@ import ( log "github.com/Sirupsen/logrus" "github.com/prometheus/client_golang/prometheus" + "gopkg.in/yaml.v2" "github.com/chihaya/chihaya/bittorrent" "github.com/chihaya/chihaya/storage" @@ -21,6 +22,9 @@ import ( func init() { prometheus.MustRegister(promGCDurationMilliseconds) prometheus.MustRegister(promInfohashesCount) + + // Register the storage driver. + storage.RegisterDriver("memorybysubnet", driver{}) } var promGCDurationMilliseconds = prometheus.NewHistogram(prometheus.HistogramOpts{ @@ -44,6 +48,25 @@ func recordInfohashesDelta(delta float64) { promInfohashesCount.Add(delta) } +type driver struct{} + +func (d driver) NewPeerStore(icfg interface{}) (storage.PeerStore, error) { + // Marshal the config back into bytes. + bytes, err := yaml.Marshal(icfg) + if err != nil { + return nil, err + } + + // Unmarshal the bytes into the proper config type. + var cfg Config + err = yaml.Unmarshal(bytes, &cfg) + if err != nil { + return nil, err + } + + return New(cfg) +} + // ErrInvalidGCInterval is returned for a GarbageCollectionInterval that is // less than or equal to zero. var ErrInvalidGCInterval = errors.New("invalid garbage collection interval") diff --git a/storage/storage.go b/storage/storage.go index 1d3ed4c..8b63ee7 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -1,14 +1,31 @@ package storage import ( + "errors" + "sync" + "github.com/chihaya/chihaya/bittorrent" "github.com/chihaya/chihaya/pkg/stop" ) +var ( + driversM sync.RWMutex + drivers = make(map[string]Driver) +) + +// Driver is the interface used to initalize a new type of PeerStore. +type Driver interface { + NewPeerStore(cfg interface{}) (PeerStore, error) +} + // ErrResourceDoesNotExist is the error returned by all delete methods in the // store if the requested resource does not exist. var ErrResourceDoesNotExist = bittorrent.ClientError("resource does not exist") +// ErrDriverDoesNotExist is the error returned by NewPeerStore when a peer +// store driver with that name does not exist. +var ErrDriverDoesNotExist = errors.New("peer store driver with that name does not exist") + // PeerStore is an interface that abstracts the interactions of storing and // manipulating Peers such that it can be implemented for various data stores. type PeerStore interface { @@ -70,3 +87,42 @@ type PeerStore interface { // For more details see the documentation in the stop package. stop.Stopper } + +// RegisterDriver makes a Driver available by the provided name. +// +// If called twice with the same name, the name is blank, or if the provided +// Driver is nil, this function panics. +func RegisterDriver(name string, d Driver) { + if name == "" { + panic("storage: could not register a Driver with an empty name") + } + if d == nil { + panic("storage: could not register a nil Driver") + } + + driversM.Lock() + defer driversM.Unlock() + + if _, dup := drivers[name]; dup { + panic("storage: RegisterDriver called twice for " + name) + } + + drivers[name] = d +} + +// NewPeerStore attempts to initialize a new PeerStore with given a name from +// the list of registered Drivers. +// +// If a driver does not exist, returns ErrDriverDoesNotExist. +func NewPeerStore(name string, cfg interface{}) (ps PeerStore, err error) { + driversM.RLock() + defer driversM.RUnlock() + + var d Driver + d, ok := drivers[name] + if !ok { + return nil, ErrDriverDoesNotExist + } + + return d.NewPeerStore(cfg) +}