add other reflector store

add flags
improve disk cleanup
This commit is contained in:
Niko Storni 2020-05-09 03:06:51 +02:00
parent 1bf3cb81b3
commit 3ffe7a10c7
2 changed files with 35 additions and 14 deletions

View file

@ -25,6 +25,11 @@ var peerPort int
var quicPeerPort int var quicPeerPort int
var reflectorPort int var reflectorPort int
var metricsPort int var metricsPort int
var disableUploads bool
var reflectorServerAddress string
var reflectorServerPort string
var reflectorServerProtocol string
var useDB bool
func init() { func init() {
var cmd = &cobra.Command{ var cmd = &cobra.Command{
@ -33,25 +38,40 @@ func init() {
Run: reflectorCmd, Run: reflectorCmd,
} }
cmd.Flags().StringVar(&reflectorCmdCacheDir, "cache", "", "Enable disk cache for blobs. Store them in this directory") cmd.Flags().StringVar(&reflectorCmdCacheDir, "cache", "", "Enable disk cache for blobs. Store them in this directory")
cmd.Flags().StringVar(&reflectorServerAddress, "reflector-server-address", "", "address of another reflector server where blobs are fetched from")
cmd.Flags().StringVar(&reflectorServerPort, "reflector-server-port", "5567", "port of another reflector server where blobs are fetched from")
cmd.Flags().StringVar(&reflectorServerProtocol, "reflector-server-protocol", "tcp", "protocol used to fetch blobs from another reflector server (tcp/udp)")
cmd.Flags().IntVar(&peerPort, "peer-port", 5567, "The port reflector will distribute content from") cmd.Flags().IntVar(&peerPort, "peer-port", 5567, "The port reflector will distribute content from")
cmd.Flags().IntVar(&quicPeerPort, "quic-peer-port", 5568, "The port reflector will distribute content from over QUIC protocol") cmd.Flags().IntVar(&quicPeerPort, "quic-peer-port", 5568, "The port reflector will distribute content from over QUIC protocol")
cmd.Flags().IntVar(&reflectorPort, "reflector-port", 5566, "The port reflector will receive content from") cmd.Flags().IntVar(&reflectorPort, "reflector-port", 5566, "The port reflector will receive content from")
cmd.Flags().IntVar(&metricsPort, "metrics-port", 2112, "The port reflector will use for metrics") cmd.Flags().IntVar(&metricsPort, "metrics-port", 2112, "The port reflector will use for metrics")
cmd.Flags().BoolVar(&disableUploads, "disable-uploads", false, "Disable uploads to this reflector server")
cmd.Flags().BoolVar(&useDB, "use-db", true, "whether to connect to the reflector db or not")
rootCmd.AddCommand(cmd) rootCmd.AddCommand(cmd)
} }
func reflectorCmd(cmd *cobra.Command, args []string) { func reflectorCmd(cmd *cobra.Command, args []string) {
log.Printf("reflector version %s, built %s", meta.Version, meta.BuildTime.Format(time.RFC3339)) log.Printf("reflector version %s, built %s", meta.Version, meta.BuildTime.Format(time.RFC3339))
// flip this flag to false when doing db maintenance. uploads will not work (as reflector server wont be running) var blobStore store.BlobStore
// but downloads will still work straight from s3 if reflectorServerAddress != "" {
useDB := true switch reflectorServerProtocol {
case "tcp":
s3 := store.NewS3BlobStore(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName) blobStore = peer.NewStore(peer.StoreOpts{
Address: reflectorServerAddress + ":" + reflectorServerPort,
Timeout: 30 * time.Second,
})
case "udp":
blobStore = quic.NewStore(quic.StoreOpts{
Address: reflectorServerAddress + ":" + reflectorServerPort,
Timeout: 30 * time.Second,
})
}
} else {
blobStore = store.NewS3BlobStore(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName)
}
var err error var err error
var blobStore store.BlobStore = s3
var reflectorServer *reflector.Server var reflectorServer *reflector.Server
if useDB { if useDB {
@ -61,7 +81,7 @@ func reflectorCmd(cmd *cobra.Command, args []string) {
log.Fatal(err) log.Fatal(err)
} }
blobStore = store.NewDBBackedStore(s3, db) blobStore = store.NewDBBackedStore(blobStore, db)
reflectorServer = reflector.NewServer(blobStore) reflectorServer = reflector.NewServer(blobStore)
reflectorServer.Timeout = 3 * time.Minute reflectorServer.Timeout = 3 * time.Minute
@ -82,18 +102,18 @@ func reflectorCmd(cmd *cobra.Command, args []string) {
} }
peerServer := peer.NewServer(blobStore) peerServer := peer.NewServer(blobStore)
err = peerServer.Start(":"+ strconv.Itoa(peerPort)) err = peerServer.Start(":" + strconv.Itoa(peerPort))
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
quicPeerServer := quic.NewServer(blobStore) quicPeerServer := quic.NewServer(blobStore)
err = quicPeerServer.Start(":"+ strconv.Itoa(quicPeerPort)) err = quicPeerServer.Start(":" + strconv.Itoa(quicPeerPort))
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
metricsServer := metrics.NewServer(":" + strconv.Itoa(metricsPort), "/metrics") metricsServer := metrics.NewServer(":"+strconv.Itoa(metricsPort), "/metrics")
metricsServer.Start() metricsServer.Start()
interruptChan := make(chan os.Signal, 1) interruptChan := make(chan os.Signal, 1)

View file

@ -4,6 +4,7 @@ import (
"io/ioutil" "io/ioutil"
"os" "os"
"path" "path"
"path/filepath"
"sort" "sort"
"syscall" "syscall"
"time" "time"
@ -191,10 +192,10 @@ func (d *DiskBlobStore) WipeOldestBlobs() (err error) {
File *os.FileInfo File *os.FileInfo
FullPath string FullPath string
} }
datedFiles := make([]datedFile, 0, 500) datedFiles := make([]datedFile, 0, 5000)
for _, dir := range dirs { for _, dir := range dirs {
if dir.IsDir() { if dir.IsDir() {
files, err := ioutil.ReadDir(d.blobDir + "/" + dir.Name()) files, err := ioutil.ReadDir(filepath.Join(d.blobDir, dir.Name()))
if err != nil { if err != nil {
return err return err
} }
@ -203,7 +204,7 @@ func (d *DiskBlobStore) WipeOldestBlobs() (err error) {
datedFiles = append(datedFiles, datedFile{ datedFiles = append(datedFiles, datedFile{
Atime: atime(file), Atime: atime(file),
File: &file, File: &file,
FullPath: d.blobDir + "/" + dir.Name() + "/" + file.Name(), FullPath: filepath.Join(d.blobDir, dir.Name(), file.Name()),
}) })
} }
} }