diff --git a/cmd/upload.go b/cmd/upload.go
index f7df368..bbc3b97 100644
--- a/cmd/upload.go
+++ b/cmd/upload.go
@@ -1,45 +1,20 @@
 package cmd
 
 import (
-	"encoding/json"
-	"io/ioutil"
 	"os"
 	"os/signal"
-	"path"
-	"sync"
 	"syscall"
-	"time"
 
 	"github.com/lbryio/reflector.go/db"
-	"github.com/lbryio/reflector.go/peer"
+	"github.com/lbryio/reflector.go/reflector"
 	"github.com/lbryio/reflector.go/store"
 
-	"github.com/lbryio/lbry.go/extras/stop"
-
-	log "github.com/sirupsen/logrus"
 	"github.com/spf13/cobra"
 )
 
 var uploadWorkers int
 var uploadSkipExistsCheck bool
 
-const (
-	sdInc   = 1
-	blobInc = 2
-	errInc  = 3
-)
-
-type uploaderParams struct {
-	workerWG  *sync.WaitGroup
-	counterWG *sync.WaitGroup
-	stopper   *stop.Group
-	pathChan  chan string
-	countChan chan int
-	sdCount   int
-	blobCount int
-	errCount  int
-}
-
 func init() {
 	var cmd = &cobra.Command{
 		Use:   "upload PATH",
@@ -53,216 +28,23 @@ func init() {
 }
 
 func uploadCmd(cmd *cobra.Command, args []string) {
-	startTime := time.Now()
 	db := new(db.SQL)
 	err := db.Connect(globalConfig.DBConn)
 	checkErr(err)
 
-	params := uploaderParams{
-		workerWG:  &sync.WaitGroup{},
-		counterWG: &sync.WaitGroup{},
-		pathChan:  make(chan string),
-		countChan: make(chan int),
-		stopper:   stop.New()}
+	st := store.NewDBBackedS3Store(
+		store.NewS3BlobStore(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName),
+		db)
 
-	setInterrupt(params.stopper)
+	uploader := reflector.NewUploader(db, st, uploadWorkers, uploadSkipExistsCheck)
 
-	paths, err := getPaths(args[0])
-	checkErr(err)
-
-	totalCount := len(paths)
-
-	hashes := make([]string, len(paths))
-	for i, p := range paths {
-		hashes[i] = path.Base(p)
-	}
-
-	log.Println("checking for existing blobs")
-
-	exists := make(map[string]bool)
-	if !uploadSkipExistsCheck {
-		exists, err = db.HasBlobs(hashes)
-		checkErr(err)
-	}
-	existsCount := len(exists)
-
-	log.Printf("%d new blobs to upload", totalCount-existsCount)
-
-	startUploadWorkers(&params)
-	params.counterWG.Add(1)
-	go func() {
-		defer params.counterWG.Done()
-		runCountReceiver(&params, startTime, totalCount, existsCount)
-	}()
-
-Upload:
-	for _, f := range paths {
-		if exists[path.Base(f)] {
-			continue
-		}
-
-		select {
-		case params.pathChan <- f:
-		case <-params.stopper.Ch():
-			log.Warnln("Caught interrupt, quitting at first opportunity...")
-			break Upload
-		}
-	}
-
-	close(params.pathChan)
-	params.workerWG.Wait()
-	close(params.countChan)
-	params.counterWG.Wait()
-	params.stopper.Stop()
-
-	log.Println("SUMMARY")
-	log.Printf("%d blobs total", totalCount)
-	log.Printf("%d SD blobs uploaded", params.sdCount)
-	log.Printf("%d content blobs uploaded", params.blobCount)
-	log.Printf("%d blobs already stored", existsCount)
-	log.Printf("%d errors encountered", params.errCount)
-}
-
-func isJSON(data []byte) bool {
-	var js json.RawMessage
-	return json.Unmarshal(data, &js) == nil
-}
-
-func newBlobStore() *store.DBBackedS3Store {
-	db := new(db.SQL)
-	err := db.Connect(globalConfig.DBConn)
-	checkErr(err)
-
-	s3 := store.NewS3BlobStore(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName)
-	return store.NewDBBackedS3Store(s3, db)
-}
-
-func setInterrupt(stopper *stop.Group) {
 	interruptChan := make(chan os.Signal, 1)
 	signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM)
 	go func() {
 		<-interruptChan
-		stopper.Stop()
+		uploader.Stop()
 	}()
-}
-
-func startUploadWorkers(params *uploaderParams) {
-	for i := 0; i < uploadWorkers; i++ {
-		params.workerWG.Add(1)
-		go func(i int) {
-			defer params.workerWG.Done()
-			defer func(i int) {
-				log.Printf("worker %d quitting", i)
-			}(i)
-
-			blobStore := newBlobStore()
-			launchFileUploader(params, blobStore, i)
-		}(i)
-	}
-}
-
-func launchFileUploader(params *uploaderParams, blobStore *store.DBBackedS3Store, worker int) {
-	for {
-		select {
-		case <-params.stopper.Ch():
-			return
-		case filepath, ok := <-params.pathChan:
-			if !ok {
-				return
-			}
-
-			blob, err := ioutil.ReadFile(filepath)
-			checkErr(err)
-
-			hash := peer.GetBlobHash(blob)
-			if hash != path.Base(filepath) {
-				log.Errorf("worker %d: file name does not match hash (%s != %s), skipping", worker, filepath, hash)
-				select {
-				case params.countChan <- errInc:
-				case <-params.stopper.Ch():
-				}
-				continue
-			}
-
-			if isJSON(blob) {
-				log.Printf("worker %d: PUTTING SD BLOB %s", worker, hash)
-				err := blobStore.PutSD(hash, blob)
-				if err != nil {
-					log.Error("PutSD Error: ", err)
-				}
-				select {
-				case params.countChan <- sdInc:
-				case <-params.stopper.Ch():
-				}
-			} else {
-				log.Printf("worker %d: putting %s", worker, hash)
-				err = blobStore.Put(hash, blob)
-				if err != nil {
-					log.Error("put Blob Error: ", err)
-				}
-				select {
-				case params.countChan <- blobInc:
-				case <-params.stopper.Ch():
-				}
-			}
-		}
-	}
-}
-
-func runCountReceiver(params *uploaderParams, startTime time.Time, totalCount int, existsCount int) {
-	for {
-		select {
-		case <-params.stopper.Ch():
-			return
-		case countType, ok := <-params.countChan:
-			if !ok {
-				return
-			}
-			switch countType {
-			case sdInc:
-				params.sdCount++
-			case blobInc:
-				params.blobCount++
-			case errInc:
-				params.errCount++
-			}
-		}
-		if (params.sdCount+params.blobCount)%50 == 0 {
-			log.Printf("%d of %d done (%s elapsed, %.3fs per blob)", params.sdCount+params.blobCount, totalCount-existsCount, time.Since(startTime).String(), time.Since(startTime).Seconds()/float64(params.sdCount+params.blobCount))
-		}
-	}
-}
-
-func getPaths(path string) ([]string, error) {
-	info, err := os.Stat(path)
-	if err != nil {
-		return nil, err
-	}
-
-	if info.Mode().IsRegular() {
-		return []string{path}, nil
-	}
-
-	f, err := os.Open(path)
-	if err != nil {
-		return nil, err
-	}
-
-	files, err := f.Readdir(-1)
-	if err != nil {
-		return nil, err
-	}
-	err = f.Close()
-	if err != nil {
-		return nil, err
-	}
-
-	var filenames []string
-	for _, file := range files {
-		if !file.IsDir() {
-			filenames = append(filenames, path+"/"+file.Name())
-		}
-	}
-
-	return filenames, nil
+
+	err = uploader.Upload(args[0])
+	checkErr(err)
 }
diff --git a/peer/server.go b/peer/server.go
index 68a2b0c..37ab13e 100644
--- a/peer/server.go
+++ b/peer/server.go
@@ -2,8 +2,8 @@ package peer
 
 import (
 	"bufio"
-	"crypto/sha512"
-	"encoding/hex"
 	"encoding/json"
 	"io"
 	"net"
+
+	"github.com/lbryio/reflector.go/reflector"
@@ -215,7 +215,7 @@ func (s *Server) handleBlobRequest(data []byte) ([]byte, error) {
 	}
 
 	response, err := json.Marshal(blobResponse{IncomingBlob: incomingBlob{
-		BlobHash: GetBlobHash(blob),
+		BlobHash: reflector.BlobHash(blob),
 		Length:   len(blob),
 	}})
 	if err != nil {
@@ -268,7 +268,7 @@ func (s *Server) handleCompositeRequest(data []byte) ([]byte, error) {
 			return []byte{}, err
 		} else {
 			response.IncomingBlob = incomingBlob{
-				BlobHash: GetBlobHash(blob),
+				BlobHash: reflector.BlobHash(blob),
 				Length:   len(blob),
 			}
 			s.stats.AddBlob()
@@ -336,7 +336,7 @@ func readNextRequest(conn net.Conn) ([]byte, error) {
 		}
 
 		// yes, this is how the peer protocol knows when the request finishes
-		if isValidJSON(request) {
+		if reflector.IsValidJSON(request) {
 			break
 		}
 	}
@@ -358,17 +358,6 @@ func readNextRequest(conn net.Conn) ([]byte, error) {
 	return request, nil
 }
 
-func isValidJSON(b []byte) bool {
-	var r json.RawMessage
-	return json.Unmarshal(b, &r) == nil
-}
-
-// GetBlobHash returns the sha512 hash hex encoded string of the blob byte slice.
-func GetBlobHash(blob []byte) string {
-	hashBytes := sha512.Sum384(blob)
-	return hex.EncodeToString(hashBytes[:])
-}
-
 const (
 	maxRequestSize      = 64 * (2 ^ 10) // 64kb
 	paymentRateAccepted = "RATE_ACCEPTED"
diff --git a/reflector/server.go b/reflector/server.go
index 1aaf983..9c2e4c0 100644
--- a/reflector/server.go
+++ b/reflector/server.go
@@ -403,11 +403,18 @@ func (s *Server) quitting() bool {
 	}
 }
 
+// BlobHash returns the hex-encoded sha384 hash of the blob byte slice.
 func BlobHash(blob []byte) string {
 	hashBytes := sha512.Sum384(blob)
 	return hex.EncodeToString(hashBytes[:])
 }
 
+// IsValidJSON returns true if b contains valid JSON.
+func IsValidJSON(b []byte) bool {
+	var r json.RawMessage
+	return json.Unmarshal(b, &r) == nil
+}
+
 //type errorResponse struct {
 //	Error string `json:"error"`
 //}
diff --git a/reflector/uploader.go b/reflector/uploader.go
new file mode 100644
index 0000000..0c3e6dd
--- /dev/null
+++ b/reflector/uploader.go
@@ -0,0 +1,251 @@
+package reflector
+
+import (
+	"io/ioutil"
+	"os"
+	"path"
+	"sync"
+	"time"
+
+	"github.com/lbryio/reflector.go/db"
+	"github.com/lbryio/reflector.go/store"
+
+	"github.com/lbryio/lbry.go/extras/errors"
+	"github.com/lbryio/lbry.go/extras/stop"
+
+	log "github.com/sirupsen/logrus"
+)
+
+const (
+	sdInc   = 1
+	blobInc = 2
+	errInc  = 3
+)
+
+type Uploader struct {
+	db              *db.SQL
+	store           *store.DBBackedS3Store // could just be store.BlobStore interface
+	workers         int
+	skipExistsCheck bool
+	stopper         *stop.Group
+	countChan       chan int
+
+	count struct {
+		total, alreadyStored, sd, blob, err int
+	}
+}
+
+func NewUploader(db *db.SQL, store *store.DBBackedS3Store, workers int, skipExistsCheck bool) *Uploader {
+	return &Uploader{
+		db:              db,
+		store:           store,
+		workers:         workers,
+		skipExistsCheck: skipExistsCheck,
+		stopper:         stop.New(),
+		countChan:       make(chan int),
+	}
+}
+
+func (u *Uploader) Stop() {
+	log.Infoln("stopping uploader")
+	u.stopper.StopAndWait()
+}
+
+func (u *Uploader) Upload(dirOrFilePath string) error {
+	paths, err := getPaths(dirOrFilePath)
+	if err != nil {
+		return err
+	}
+
+	u.count.total = len(paths)
+
+	hashes := make([]string, len(paths))
+	for i, p := range paths {
+		hashes[i] = path.Base(p)
+	}
+
+	log.Infoln("checking for existing blobs")
+
+	var exists map[string]bool
+	if !u.skipExistsCheck {
+		exists, err = u.db.HasBlobs(hashes)
+		if err != nil {
+			return err
+		}
+		u.count.alreadyStored = len(exists)
+	}
+
+	log.Infof("%d new blobs to upload", u.count.total-u.count.alreadyStored)
+
+	workerWG := sync.WaitGroup{}
+	pathChan := make(chan string)
+
+	for i := 0; i < u.workers; i++ {
+		workerWG.Add(1)
+		go func(i int) {
+			defer workerWG.Done()
+			defer func(i int) { log.Debugf("worker %d quitting", i) }(i)
+			u.worker(pathChan)
+		}(i)
+	}
+
+	countWG := sync.WaitGroup{}
+	countWG.Add(1)
+	go func() {
+		defer countWG.Done()
+		u.counter()
+	}()
+
+Upload:
+	for _, f := range paths {
+		if exists != nil && exists[path.Base(f)] {
+			continue
+		}
+
+		select {
+		case pathChan <- f:
+		case <-u.stopper.Ch():
+			break Upload
+		}
+	}
+
+	close(pathChan)
+	workerWG.Wait()
+	close(u.countChan)
+	countWG.Wait()
+	u.stopper.Stop()
+
+	log.Infoln("SUMMARY")
+	log.Infof("%d blobs total", u.count.total)
+	log.Infof("%d blobs already stored", u.count.alreadyStored)
+	log.Infof("%d SD blobs uploaded", u.count.sd)
+	log.Infof("%d content blobs uploaded", u.count.blob)
+	log.Infof("%d errors encountered", u.count.err)
encountered", u.count.err) + + return nil +} + +// worker reads paths from a channel and uploads them +func (u *Uploader) worker(pathChan chan string) { + for { + select { + case <-u.stopper.Ch(): + return + case filepath, ok := <-pathChan: + if !ok { + return + } + + err := u.uploadBlob(filepath) + if err != nil { + log.Errorln(err) + } + } + } +} + +// uploadBlob uploads a blob +func (u *Uploader) uploadBlob(filepath string) error { + blob, err := ioutil.ReadFile(filepath) + if err != nil { + return err + } + + hash := BlobHash(blob) + if hash != path.Base(filepath) { + return errors.Err("file name does not match hash (%s != %s), skipping", filepath, hash) + select { + case u.countChan <- errInc: + case <-u.stopper.Ch(): + } + return nil + } + + if IsValidJSON(blob) { + log.Debugf("Uploading SD blob %s", hash) + err := u.store.PutSD(hash, blob) + if err != nil { + return errors.Prefix("Uploading SD blob "+hash, err) + } + select { + case u.countChan <- sdInc: + case <-u.stopper.Ch(): + } + } else { + log.Debugf("Uploading blob %s", hash) + err = u.store.Put(hash, blob) + if err != nil { + return errors.Prefix("Uploading blob "+hash, err) + } + select { + case u.countChan <- blobInc: + case <-u.stopper.Ch(): + } + } + + return nil +} + +// counter updates the counts of how many sd blobs and content blobs were uploaded, and how many +// errors were encountered. It occasionally prints the upload progress to debug. +func (u *Uploader) counter() { + start := time.Now() + + for { + select { + case <-u.stopper.Ch(): + return + case countType, ok := <-u.countChan: + if !ok { + return + } + switch countType { + case sdInc: + u.count.sd++ + case blobInc: + u.count.blob++ + case errInc: + u.count.err++ + } + } + if (u.count.sd+u.count.blob)%50 == 0 { + log.Infof("%d of %d done (%s elapsed, %.3fs per blob)", u.count.sd+u.count.blob, u.count.total-u.count.alreadyStored, time.Since(start).String(), time.Since(start).Seconds()/float64(u.count.sd+u.count.blob)) + } + } +} + +// getPaths returns the paths for files to upload. it takes a path to a file or a dir. for a file, +// it returns the full path to that file. for a dir, it returns the paths for all the files in the +// dir +func getPaths(dirOrFilePath string) ([]string, error) { + info, err := os.Stat(dirOrFilePath) + if err != nil { + return nil, errors.Err(err) + } + + if info.Mode().IsRegular() { + return []string{dirOrFilePath}, nil + } + + f, err := os.Open(dirOrFilePath) + if err != nil { + return nil, errors.Err(err) + } + + files, err := f.Readdir(-1) + if err != nil { + return nil, errors.Err(err) + } + err = f.Close() + if err != nil { + return nil, errors.Err(err) + } + + var filenames []string + for _, file := range files { + if !file.IsDir() { + filenames = append(filenames, dirOrFilePath+"/"+file.Name()) + } + } + + return filenames, nil +}