diff --git a/cmd/reflector.go b/cmd/reflector.go index 19802f0..e411781 100644 --- a/cmd/reflector.go +++ b/cmd/reflector.go @@ -25,6 +25,11 @@ var peerPort int var quicPeerPort int var reflectorPort int var metricsPort int +var disableUploads bool +var reflectorServerAddress string +var reflectorServerPort string +var reflectorServerProtocol string +var useDB bool func init() { var cmd = &cobra.Command{ @@ -33,25 +38,40 @@ func init() { Run: reflectorCmd, } cmd.Flags().StringVar(&reflectorCmdCacheDir, "cache", "", "Enable disk cache for blobs. Store them in this directory") + cmd.Flags().StringVar(&reflectorServerAddress, "reflector-server-address", "", "address of another reflector server where blobs are fetched from") + cmd.Flags().StringVar(&reflectorServerPort, "reflector-server-port", "5567", "port of another reflector server where blobs are fetched from") + cmd.Flags().StringVar(&reflectorServerProtocol, "reflector-server-protocol", "tcp", "protocol used to fetch blobs from another reflector server (tcp/udp)") cmd.Flags().IntVar(&peerPort, "peer-port", 5567, "The port reflector will distribute content from") cmd.Flags().IntVar(&quicPeerPort, "quic-peer-port", 5568, "The port reflector will distribute content from over QUIC protocol") cmd.Flags().IntVar(&reflectorPort, "reflector-port", 5566, "The port reflector will receive content from") cmd.Flags().IntVar(&metricsPort, "metrics-port", 2112, "The port reflector will use for metrics") + cmd.Flags().BoolVar(&disableUploads, "disable-uploads", false, "Disable uploads to this reflector server") + cmd.Flags().BoolVar(&useDB, "use-db", true, "whether to connect to the reflector db or not") rootCmd.AddCommand(cmd) } func reflectorCmd(cmd *cobra.Command, args []string) { log.Printf("reflector version %s, built %s", meta.Version, meta.BuildTime.Format(time.RFC3339)) - // flip this flag to false when doing db maintenance. uploads will not work (as reflector server wont be running) - // but downloads will still work straight from s3 - useDB := true - - s3 := store.NewS3BlobStore(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName) + var blobStore store.BlobStore + if reflectorServerAddress != "" { + switch reflectorServerProtocol { + case "tcp": + blobStore = peer.NewStore(peer.StoreOpts{ + Address: reflectorServerAddress + ":" + reflectorServerPort, + Timeout: 30 * time.Second, + }) + case "udp": + blobStore = quic.NewStore(quic.StoreOpts{ + Address: reflectorServerAddress + ":" + reflectorServerPort, + Timeout: 30 * time.Second, + }) + } + } else { + blobStore = store.NewS3BlobStore(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName) + } var err error - - var blobStore store.BlobStore = s3 var reflectorServer *reflector.Server if useDB { @@ -61,7 +81,7 @@ func reflectorCmd(cmd *cobra.Command, args []string) { log.Fatal(err) } - blobStore = store.NewDBBackedStore(s3, db) + blobStore = store.NewDBBackedStore(blobStore, db) reflectorServer = reflector.NewServer(blobStore) reflectorServer.Timeout = 3 * time.Minute @@ -82,18 +102,18 @@ func reflectorCmd(cmd *cobra.Command, args []string) { } peerServer := peer.NewServer(blobStore) - err = peerServer.Start(":"+ strconv.Itoa(peerPort)) + err = peerServer.Start(":" + strconv.Itoa(peerPort)) if err != nil { log.Fatal(err) } quicPeerServer := quic.NewServer(blobStore) - err = quicPeerServer.Start(":"+ strconv.Itoa(quicPeerPort)) + err = quicPeerServer.Start(":" + strconv.Itoa(quicPeerPort)) if err != nil { log.Fatal(err) } - metricsServer := metrics.NewServer(":" + strconv.Itoa(metricsPort), "/metrics") + metricsServer := metrics.NewServer(":"+strconv.Itoa(metricsPort), "/metrics") metricsServer.Start() interruptChan := make(chan os.Signal, 1) diff --git a/store/disk.go b/store/disk.go index b7dea85..18f7bd7 100644 --- a/store/disk.go +++ b/store/disk.go @@ -4,6 +4,7 @@ import ( "io/ioutil" "os" "path" + "path/filepath" "sort" "syscall" "time" @@ -191,10 +192,10 @@ func (d *DiskBlobStore) WipeOldestBlobs() (err error) { File *os.FileInfo FullPath string } - datedFiles := make([]datedFile, 0, 500) + datedFiles := make([]datedFile, 0, 5000) for _, dir := range dirs { if dir.IsDir() { - files, err := ioutil.ReadDir(d.blobDir + "/" + dir.Name()) + files, err := ioutil.ReadDir(filepath.Join(d.blobDir, dir.Name())) if err != nil { return err } @@ -203,7 +204,7 @@ func (d *DiskBlobStore) WipeOldestBlobs() (err error) { datedFiles = append(datedFiles, datedFile{ Atime: atime(file), File: &file, - FullPath: d.blobDir + "/" + dir.Name() + "/" + file.Name(), + FullPath: filepath.Join(d.blobDir, dir.Name(), file.Name()), }) } }