Merge branch 'uploader'
* uploader: separate uploader into reusable component
This commit is contained in:
commit
027409fe40
4 changed files with 270 additions and 243 deletions
236
cmd/upload.go
236
cmd/upload.go
|
@ -1,45 +1,20 @@
|
|||
package cmd
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"os/signal"
|
||||
"path"
|
||||
"sync"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/lbryio/reflector.go/db"
|
||||
"github.com/lbryio/reflector.go/peer"
|
||||
"github.com/lbryio/reflector.go/reflector"
|
||||
"github.com/lbryio/reflector.go/store"
|
||||
|
||||
"github.com/lbryio/lbry.go/extras/stop"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
|
||||
var uploadWorkers int
|
||||
var uploadSkipExistsCheck bool
|
||||
|
||||
const (
|
||||
sdInc = 1
|
||||
blobInc = 2
|
||||
errInc = 3
|
||||
)
|
||||
|
||||
type uploaderParams struct {
|
||||
workerWG *sync.WaitGroup
|
||||
counterWG *sync.WaitGroup
|
||||
stopper *stop.Group
|
||||
pathChan chan string
|
||||
countChan chan int
|
||||
sdCount int
|
||||
blobCount int
|
||||
errCount int
|
||||
}
|
||||
|
||||
func init() {
|
||||
var cmd = &cobra.Command{
|
||||
Use: "upload PATH",
|
||||
|
@ -53,216 +28,23 @@ func init() {
|
|||
}
|
||||
|
||||
func uploadCmd(cmd *cobra.Command, args []string) {
|
||||
startTime := time.Now()
|
||||
db := new(db.SQL)
|
||||
err := db.Connect(globalConfig.DBConn)
|
||||
checkErr(err)
|
||||
|
||||
params := uploaderParams{
|
||||
workerWG: &sync.WaitGroup{},
|
||||
counterWG: &sync.WaitGroup{},
|
||||
pathChan: make(chan string),
|
||||
countChan: make(chan int),
|
||||
stopper: stop.New()}
|
||||
st := store.NewDBBackedS3Store(
|
||||
store.NewS3BlobStore(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName),
|
||||
db)
|
||||
|
||||
setInterrupt(params.stopper)
|
||||
uploader := reflector.NewUploader(db, st, uploadWorkers, uploadSkipExistsCheck)
|
||||
|
||||
paths, err := getPaths(args[0])
|
||||
checkErr(err)
|
||||
|
||||
totalCount := len(paths)
|
||||
|
||||
hashes := make([]string, len(paths))
|
||||
for i, p := range paths {
|
||||
hashes[i] = path.Base(p)
|
||||
}
|
||||
|
||||
log.Println("checking for existing blobs")
|
||||
|
||||
exists := make(map[string]bool)
|
||||
if !uploadSkipExistsCheck {
|
||||
exists, err = db.HasBlobs(hashes)
|
||||
checkErr(err)
|
||||
}
|
||||
existsCount := len(exists)
|
||||
|
||||
log.Printf("%d new blobs to upload", totalCount-existsCount)
|
||||
|
||||
startUploadWorkers(¶ms)
|
||||
params.counterWG.Add(1)
|
||||
go func() {
|
||||
defer params.counterWG.Done()
|
||||
runCountReceiver(¶ms, startTime, totalCount, existsCount)
|
||||
}()
|
||||
|
||||
Upload:
|
||||
for _, f := range paths {
|
||||
if exists[path.Base(f)] {
|
||||
continue
|
||||
}
|
||||
|
||||
select {
|
||||
case params.pathChan <- f:
|
||||
case <-params.stopper.Ch():
|
||||
log.Warnln("Caught interrupt, quitting at first opportunity...")
|
||||
break Upload
|
||||
}
|
||||
}
|
||||
|
||||
close(params.pathChan)
|
||||
params.workerWG.Wait()
|
||||
close(params.countChan)
|
||||
params.counterWG.Wait()
|
||||
params.stopper.Stop()
|
||||
|
||||
log.Println("SUMMARY")
|
||||
log.Printf("%d blobs total", totalCount)
|
||||
log.Printf("%d SD blobs uploaded", params.sdCount)
|
||||
log.Printf("%d content blobs uploaded", params.blobCount)
|
||||
log.Printf("%d blobs already stored", existsCount)
|
||||
log.Printf("%d errors encountered", params.errCount)
|
||||
}
|
||||
|
||||
func isJSON(data []byte) bool {
|
||||
var js json.RawMessage
|
||||
return json.Unmarshal(data, &js) == nil
|
||||
}
|
||||
|
||||
func newBlobStore() *store.DBBackedS3Store {
|
||||
db := new(db.SQL)
|
||||
err := db.Connect(globalConfig.DBConn)
|
||||
checkErr(err)
|
||||
|
||||
s3 := store.NewS3BlobStore(globalConfig.AwsID, globalConfig.AwsSecret, globalConfig.BucketRegion, globalConfig.BucketName)
|
||||
return store.NewDBBackedS3Store(s3, db)
|
||||
}
|
||||
|
||||
func setInterrupt(stopper *stop.Group) {
|
||||
interruptChan := make(chan os.Signal, 1)
|
||||
signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM)
|
||||
go func() {
|
||||
<-interruptChan
|
||||
stopper.Stop()
|
||||
uploader.Stop()
|
||||
}()
|
||||
}
|
||||
|
||||
func startUploadWorkers(params *uploaderParams) {
|
||||
for i := 0; i < uploadWorkers; i++ {
|
||||
params.workerWG.Add(1)
|
||||
go func(i int) {
|
||||
defer params.workerWG.Done()
|
||||
defer func(i int) {
|
||||
log.Printf("worker %d quitting", i)
|
||||
}(i)
|
||||
|
||||
blobStore := newBlobStore()
|
||||
launchFileUploader(params, blobStore, i)
|
||||
}(i)
|
||||
}
|
||||
}
|
||||
|
||||
func launchFileUploader(params *uploaderParams, blobStore *store.DBBackedS3Store, worker int) {
|
||||
for {
|
||||
select {
|
||||
case <-params.stopper.Ch():
|
||||
return
|
||||
case filepath, ok := <-params.pathChan:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
blob, err := ioutil.ReadFile(filepath)
|
||||
checkErr(err)
|
||||
|
||||
hash := peer.GetBlobHash(blob)
|
||||
if hash != path.Base(filepath) {
|
||||
log.Errorf("worker %d: file name does not match hash (%s != %s), skipping", worker, filepath, hash)
|
||||
select {
|
||||
case params.countChan <- errInc:
|
||||
case <-params.stopper.Ch():
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
if isJSON(blob) {
|
||||
log.Printf("worker %d: PUTTING SD BLOB %s", worker, hash)
|
||||
err := blobStore.PutSD(hash, blob)
|
||||
if err != nil {
|
||||
log.Error("PutSD Error: ", err)
|
||||
}
|
||||
select {
|
||||
case params.countChan <- sdInc:
|
||||
case <-params.stopper.Ch():
|
||||
}
|
||||
} else {
|
||||
log.Printf("worker %d: putting %s", worker, hash)
|
||||
err = blobStore.Put(hash, blob)
|
||||
if err != nil {
|
||||
log.Error("put Blob Error: ", err)
|
||||
}
|
||||
select {
|
||||
case params.countChan <- blobInc:
|
||||
case <-params.stopper.Ch():
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func runCountReceiver(params *uploaderParams, startTime time.Time, totalCount int, existsCount int) {
|
||||
for {
|
||||
select {
|
||||
case <-params.stopper.Ch():
|
||||
return
|
||||
case countType, ok := <-params.countChan:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
switch countType {
|
||||
case sdInc:
|
||||
params.sdCount++
|
||||
case blobInc:
|
||||
params.blobCount++
|
||||
case errInc:
|
||||
params.errCount++
|
||||
}
|
||||
}
|
||||
if (params.sdCount+params.blobCount)%50 == 0 {
|
||||
log.Printf("%d of %d done (%s elapsed, %.3fs per blob)", params.sdCount+params.blobCount, totalCount-existsCount, time.Since(startTime).String(), time.Since(startTime).Seconds()/float64(params.sdCount+params.blobCount))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func getPaths(path string) ([]string, error) {
|
||||
info, err := os.Stat(path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if info.Mode().IsRegular() {
|
||||
return []string{path}, nil
|
||||
}
|
||||
|
||||
f, err := os.Open(path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
files, err := f.Readdir(-1)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
err = f.Close()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var filenames []string
|
||||
for _, file := range files {
|
||||
if !file.IsDir() {
|
||||
filenames = append(filenames, path+"/"+file.Name())
|
||||
}
|
||||
}
|
||||
|
||||
return filenames, nil
|
||||
|
||||
err = uploader.Upload(args[0])
|
||||
checkErr(err)
|
||||
}
|
||||
|
|
|
@ -2,8 +2,6 @@ package peer
|
|||
|
||||
import (
|
||||
"bufio"
|
||||
"crypto/sha512"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"io"
|
||||
"net"
|
||||
|
@ -215,7 +213,7 @@ func (s *Server) handleBlobRequest(data []byte) ([]byte, error) {
|
|||
}
|
||||
|
||||
response, err := json.Marshal(blobResponse{IncomingBlob: incomingBlob{
|
||||
BlobHash: GetBlobHash(blob),
|
||||
BlobHash: reflector.BlobHash(blob),
|
||||
Length: len(blob),
|
||||
}})
|
||||
if err != nil {
|
||||
|
@ -268,7 +266,7 @@ func (s *Server) handleCompositeRequest(data []byte) ([]byte, error) {
|
|||
return []byte{}, err
|
||||
} else {
|
||||
response.IncomingBlob = incomingBlob{
|
||||
BlobHash: GetBlobHash(blob),
|
||||
BlobHash: reflector.BlobHash(blob),
|
||||
Length: len(blob),
|
||||
}
|
||||
s.stats.AddBlob()
|
||||
|
@ -336,7 +334,7 @@ func readNextRequest(conn net.Conn) ([]byte, error) {
|
|||
}
|
||||
|
||||
// yes, this is how the peer protocol knows when the request finishes
|
||||
if isValidJSON(request) {
|
||||
if reflector.IsValidJSON(request) {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
@ -358,17 +356,6 @@ func readNextRequest(conn net.Conn) ([]byte, error) {
|
|||
return request, nil
|
||||
}
|
||||
|
||||
func isValidJSON(b []byte) bool {
|
||||
var r json.RawMessage
|
||||
return json.Unmarshal(b, &r) == nil
|
||||
}
|
||||
|
||||
// GetBlobHash returns the sha512 hash hex encoded string of the blob byte slice.
|
||||
func GetBlobHash(blob []byte) string {
|
||||
hashBytes := sha512.Sum384(blob)
|
||||
return hex.EncodeToString(hashBytes[:])
|
||||
}
|
||||
|
||||
const (
|
||||
maxRequestSize = 64 * (2 ^ 10) // 64kb
|
||||
paymentRateAccepted = "RATE_ACCEPTED"
|
||||
|
|
|
@ -403,11 +403,17 @@ func (s *Server) quitting() bool {
|
|||
}
|
||||
}
|
||||
|
||||
// BlobHash returns the sha512 hash hex encoded string of the blob byte slice.
|
||||
func BlobHash(blob []byte) string {
|
||||
hashBytes := sha512.Sum384(blob)
|
||||
return hex.EncodeToString(hashBytes[:])
|
||||
}
|
||||
|
||||
func IsValidJSON(b []byte) bool {
|
||||
var r json.RawMessage
|
||||
return json.Unmarshal(b, &r) == nil
|
||||
}
|
||||
|
||||
//type errorResponse struct {
|
||||
// Error string `json:"error"`
|
||||
//}
|
||||
|
|
252
reflector/uploader.go
Normal file
252
reflector/uploader.go
Normal file
|
@ -0,0 +1,252 @@
|
|||
package reflector
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/lbryio/reflector.go/db"
|
||||
"github.com/lbryio/reflector.go/store"
|
||||
|
||||
"github.com/lbryio/lbry.go/extras/errors"
|
||||
"github.com/lbryio/lbry.go/extras/stop"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
const (
|
||||
sdInc = 1
|
||||
blobInc = 2
|
||||
errInc = 3
|
||||
)
|
||||
|
||||
type Uploader struct {
|
||||
db *db.SQL
|
||||
store *store.DBBackedS3Store // could just be store.BlobStore interface
|
||||
workers int
|
||||
skipExistsCheck bool
|
||||
stopper *stop.Group
|
||||
countChan chan int
|
||||
|
||||
count struct {
|
||||
total, alreadyStored, sd, blob, err int
|
||||
}
|
||||
}
|
||||
|
||||
func NewUploader(db *db.SQL, store *store.DBBackedS3Store, workers int, skipExistsCheck bool) *Uploader {
|
||||
return &Uploader{
|
||||
db: db,
|
||||
store: store,
|
||||
workers: workers,
|
||||
skipExistsCheck: skipExistsCheck,
|
||||
stopper: stop.New(),
|
||||
countChan: make(chan int),
|
||||
}
|
||||
}
|
||||
|
||||
func (u *Uploader) Stop() {
|
||||
log.Infoln("stopping uploader")
|
||||
u.stopper.StopAndWait()
|
||||
}
|
||||
|
||||
func (u *Uploader) Upload(dirOrFilePath string) error {
|
||||
paths, err := getPaths(dirOrFilePath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
u.count.total = len(paths)
|
||||
|
||||
hashes := make([]string, len(paths))
|
||||
for i, p := range paths {
|
||||
hashes[i] = path.Base(p)
|
||||
}
|
||||
|
||||
log.Infoln("checking for existing blobs")
|
||||
|
||||
var exists map[string]bool
|
||||
if !u.skipExistsCheck {
|
||||
exists, err = u.db.HasBlobs(hashes)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
u.count.alreadyStored = len(exists)
|
||||
}
|
||||
|
||||
log.Infof("%d new blobs to upload", u.count.total-u.count.alreadyStored)
|
||||
|
||||
workerWG := sync.WaitGroup{}
|
||||
pathChan := make(chan string)
|
||||
|
||||
for i := 0; i < u.workers; i++ {
|
||||
workerWG.Add(1)
|
||||
go func(i int) {
|
||||
defer workerWG.Done()
|
||||
defer func(i int) { log.Debugf("worker %d quitting", i) }(i)
|
||||
u.worker(pathChan)
|
||||
}(i)
|
||||
}
|
||||
|
||||
countWG := sync.WaitGroup{}
|
||||
countWG.Add(1)
|
||||
go func() {
|
||||
defer countWG.Done()
|
||||
u.counter()
|
||||
}()
|
||||
|
||||
Upload:
|
||||
for _, f := range paths {
|
||||
if exists != nil && exists[path.Base(f)] {
|
||||
continue
|
||||
}
|
||||
|
||||
select {
|
||||
case pathChan <- f:
|
||||
case <-u.stopper.Ch():
|
||||
break Upload
|
||||
}
|
||||
}
|
||||
|
||||
close(pathChan)
|
||||
workerWG.Wait()
|
||||
close(u.countChan)
|
||||
countWG.Wait()
|
||||
u.stopper.Stop()
|
||||
|
||||
log.Infoln("SUMMARY")
|
||||
log.Infof("%d blobs total", u.count.total)
|
||||
log.Infof("%d blobs already stored", u.count.alreadyStored)
|
||||
log.Infof("%d SD blobs uploaded", u.count.sd)
|
||||
log.Infof("%d content blobs uploaded", u.count.blob)
|
||||
log.Infof("%d errors encountered", u.count.err)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// worker reads paths from a channel and uploads them
|
||||
func (u *Uploader) worker(pathChan chan string) {
|
||||
for {
|
||||
select {
|
||||
case <-u.stopper.Ch():
|
||||
return
|
||||
case filepath, ok := <-pathChan:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
err := u.uploadBlob(filepath)
|
||||
if err != nil {
|
||||
log.Errorln(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// uploadBlob uploads a blob
|
||||
func (u *Uploader) uploadBlob(filepath string) error {
|
||||
blob, err := ioutil.ReadFile(filepath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
hash := BlobHash(blob)
|
||||
if hash != path.Base(filepath) {
|
||||
return errors.Err("file name does not match hash (%s != %s), skipping", filepath, hash)
|
||||
select {
|
||||
case u.countChan <- errInc:
|
||||
case <-u.stopper.Ch():
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
if IsValidJSON(blob) {
|
||||
log.Debugf("Uploading SD blob %s", hash)
|
||||
err := u.store.PutSD(hash, blob)
|
||||
if err != nil {
|
||||
return errors.Prefix("Uploading SD blob "+hash, err)
|
||||
}
|
||||
select {
|
||||
case u.countChan <- sdInc:
|
||||
case <-u.stopper.Ch():
|
||||
}
|
||||
} else {
|
||||
log.Debugf("Uploading blob %s", hash)
|
||||
err = u.store.Put(hash, blob)
|
||||
if err != nil {
|
||||
return errors.Prefix("Uploading blob "+hash, err)
|
||||
}
|
||||
select {
|
||||
case u.countChan <- blobInc:
|
||||
case <-u.stopper.Ch():
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// counter updates the counts of how many sd blobs and content blobs were uploaded, and how many
|
||||
// errors were encountered. It occasionally prints the upload progress to debug.
|
||||
func (u *Uploader) counter() {
|
||||
start := time.Now()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-u.stopper.Ch():
|
||||
return
|
||||
case countType, ok := <-u.countChan:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
switch countType {
|
||||
case sdInc:
|
||||
u.count.sd++
|
||||
case blobInc:
|
||||
u.count.blob++
|
||||
case errInc:
|
||||
u.count.err++
|
||||
}
|
||||
}
|
||||
if (u.count.sd+u.count.blob)%50 == 0 {
|
||||
log.Infof("%d of %d done (%s elapsed, %.3fs per blob)", u.count.sd+u.count.blob, u.count.total-u.count.alreadyStored, time.Since(start).String(), time.Since(start).Seconds()/float64(u.count.sd+u.count.blob))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// getPaths returns the paths for files to upload. it takes a path to a file or a dir. for a file,
|
||||
// it returns the full path to that file. for a dir, it returns the paths for all the files in the
|
||||
// dir
|
||||
func getPaths(dirOrFilePath string) ([]string, error) {
|
||||
info, err := os.Stat(dirOrFilePath)
|
||||
if err != nil {
|
||||
return nil, errors.Err(err)
|
||||
}
|
||||
|
||||
if info.Mode().IsRegular() {
|
||||
return []string{dirOrFilePath}, nil
|
||||
}
|
||||
|
||||
f, err := os.Open(dirOrFilePath)
|
||||
if err != nil {
|
||||
return nil, errors.Err(err)
|
||||
}
|
||||
|
||||
files, err := f.Readdir(-1)
|
||||
if err != nil {
|
||||
return nil, errors.Err(err)
|
||||
}
|
||||
err = f.Close()
|
||||
if err != nil {
|
||||
return nil, errors.Err(err)
|
||||
}
|
||||
|
||||
var filenames []string
|
||||
for _, file := range files {
|
||||
if !file.IsDir() {
|
||||
filenames = append(filenames, dirOrFilePath+"/"+file.Name())
|
||||
}
|
||||
}
|
||||
|
||||
return filenames, nil
|
||||
}
|
Loading…
Reference in a new issue