diff --git a/cmd/upload.go b/cmd/upload.go index 49c5312..006f007 100644 --- a/cmd/upload.go +++ b/cmd/upload.go @@ -14,6 +14,7 @@ import ( var uploadWorkers int var uploadSkipExistsCheck bool +var uploadDeleteBlobsAfterUpload bool func init() { var cmd = &cobra.Command{ @@ -24,6 +25,7 @@ func init() { } cmd.PersistentFlags().IntVar(&uploadWorkers, "workers", 1, "How many worker threads to run at once") cmd.PersistentFlags().BoolVar(&uploadSkipExistsCheck, "skipExistsCheck", false, "Dont check if blobs exist before uploading") + cmd.PersistentFlags().BoolVar(&uploadDeleteBlobsAfterUpload, "deleteBlobsAfterUpload", false, "Delete blobs after uploading them") rootCmd.AddCommand(cmd) } @@ -36,7 +38,7 @@ func uploadCmd(cmd *cobra.Command, args []string) { store.NewS3BlobStore(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName), db) - uploader := reflector.NewUploader(db, st, uploadWorkers, uploadSkipExistsCheck) + uploader := reflector.NewUploader(db, st, uploadWorkers, uploadSkipExistsCheck, uploadDeleteBlobsAfterUpload) interruptChan := make(chan os.Signal, 1) signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM) diff --git a/reflector/uploader.go b/reflector/uploader.go index 48f9c10..ae762f4 100644 --- a/reflector/uploader.go +++ b/reflector/uploader.go @@ -29,24 +29,26 @@ type Summary struct { } type Uploader struct { - db *db.SQL - store *store.DBBackedStore // could just be store.BlobStore interface - workers int - skipExistsCheck bool - stopper *stop.Group - countChan chan increment + db *db.SQL + store *store.DBBackedStore // could just be store.BlobStore interface + workers int + skipExistsCheck bool + deleteBlobsAfterUpload bool + stopper *stop.Group + countChan chan increment count Summary } -func NewUploader(db *db.SQL, store *store.DBBackedStore, workers int, skipExistsCheck bool) *Uploader { +func NewUploader(db *db.SQL, store *store.DBBackedStore, workers int, skipExistsCheck, deleteBlobsAfterUpload bool) *Uploader { return &Uploader{ - db: db, - store: store, - workers: workers, - skipExistsCheck: skipExistsCheck, - stopper: stop.New(), - countChan: make(chan increment), + db: db, + store: store, + workers: workers, + skipExistsCheck: skipExistsCheck, + deleteBlobsAfterUpload: deleteBlobsAfterUpload, + stopper: stop.New(), + countChan: make(chan increment), } } @@ -126,7 +128,7 @@ Upload: return nil } -// worker reads paths from a channel and uploads them +// worker reads paths from a channel, uploads them, and optionally deletes them func (u *Uploader) worker(pathChan chan string) { for { select { @@ -140,6 +142,11 @@ func (u *Uploader) worker(pathChan chan string) { err := u.uploadBlob(filepath) if err != nil { log.Errorln(err) + } else if u.deleteBlobsAfterUpload { + err = os.Remove(filepath) + if err != nil { + log.Errorln(errors.Prefix("deleting blob", err)) + } } } } @@ -155,7 +162,7 @@ func (u *Uploader) uploadBlob(filepath string) (err error) { blob, err := ioutil.ReadFile(filepath) if err != nil { - return err + return errors.Err(err) } hash := BlobHash(blob)