From 39914c8ab8f41063304ff33d3c1d55f4508ee1fc Mon Sep 17 00:00:00 2001 From: Alex Grintsvayg Date: Tue, 15 May 2018 13:24:13 -0400 Subject: [PATCH] concurrency flags, fixes --- cmd/upload.go | 71 +++++++++++++++++++++++++++------------------------ 1 file changed, 38 insertions(+), 33 deletions(-) diff --git a/cmd/upload.go b/cmd/upload.go index cb81b03..027947e 100644 --- a/cmd/upload.go +++ b/cmd/upload.go @@ -18,6 +18,8 @@ import ( "github.com/spf13/cobra" ) +var workers int + func init() { var cmd = &cobra.Command{ Use: "upload DIR", @@ -25,6 +27,7 @@ func init() { Args: cobra.ExactArgs(1), Run: uploadCmd, } + cmd.PersistentFlags().IntVar(&workers, "workers", 1, "How many worker threads to run at once") RootCmd.AddCommand(cmd) } @@ -54,11 +57,9 @@ func uploadCmd(cmd *cobra.Command, args []string) { var filenames []string for _, file := range files { - if file.IsDir() { - continue + if !file.IsDir() { + filenames = append(filenames, file.Name()) } - - filenames = append(filenames, file.Name()) } totalCount := len(filenames) @@ -74,34 +75,15 @@ func uploadCmd(cmd *cobra.Command, args []string) { sdCount := 0 blobCount := 0 - concurrency := 2 - wg := &sync.WaitGroup{} + workerWG := &sync.WaitGroup{} filenameChan := make(chan string) - sdChan := make(chan int) - blobChan := make(chan int) + counterWG := &sync.WaitGroup{} + countChan := make(chan bool) - go func() { - wg.Add(1) - defer wg.Done() - for { - select { - case <-stopper.Chan(): - return - case <-sdChan: - sdCount++ - case <-blobChan: - blobCount++ - } - if (sdCount+blobCount)%50 == 0 { - log.Printf("%d of %d done (%s elapsed)", sdCount+blobCount, totalCount-existsCount, time.Now().Sub(startTime).String()) - } - } - }() - - for i := 0; i < concurrency; i++ { + for i := 0; i < workers; i++ { go func(i int) { - wg.Add(1) - defer wg.Done() + workerWG.Add(1) + defer workerWG.Done() defer func(i int) { log.Printf("worker %d quitting", i) }(i) @@ -130,14 +112,14 @@ func uploadCmd(cmd *cobra.Command, args []string) { log.Printf("worker %d: PUTTING SD BLOB %s", i, hash) blobStore.PutSD(hash, blob) select { - case sdChan <- 1: + case countChan <- true: case <-stopper.Chan(): } } else { log.Printf("worker %d: putting %s", i, hash) blobStore.Put(hash, blob) select { - case blobChan <- 1: + case countChan <- false: case <-stopper.Chan(): } } @@ -146,6 +128,28 @@ func uploadCmd(cmd *cobra.Command, args []string) { }(i) } + go func() { + counterWG.Add(1) + defer counterWG.Done() + for { + select { + case <-stopper.Chan(): + return + case isSD, ok := <-countChan: + if !ok { + return + } else if isSD { + sdCount++ + } else { + blobCount++ + } + } + if (sdCount+blobCount)%50 == 0 { + log.Printf("%d of %d done (%s elapsed, %.2fs per blob)", sdCount+blobCount, totalCount-existsCount, time.Now().Sub(startTime).String(), time.Now().Sub(startTime).Seconds()/float64(sdCount+blobCount)) + } + } + }() + Upload: for _, filename := range filenames { if exists[filename] { @@ -161,10 +165,11 @@ Upload: } close(filenameChan) - wg.Wait() + workerWG.Wait() + close(countChan) + counterWG.Wait() stopper.Stop() - log.Println("") log.Println("SUMMARY") log.Printf("%d blobs total", totalCount) log.Printf("%d SD blobs uploaded", sdCount)