e93c097fd9
replace deprecated function calls refactor build process
270 lines
5.8 KiB
Go
270 lines
5.8 KiB
Go
package reflector
|
|
|
|
import (
|
|
"os"
|
|
"path"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/lbryio/reflector.go/db"
|
|
"github.com/lbryio/reflector.go/internal/metrics"
|
|
"github.com/lbryio/reflector.go/store"
|
|
|
|
"github.com/lbryio/lbry.go/v2/extras/errors"
|
|
"github.com/lbryio/lbry.go/v2/extras/stop"
|
|
|
|
log "github.com/sirupsen/logrus"
|
|
)
|
|
|
|
type increment int
|
|
|
|
const (
|
|
sdInc increment = iota + 1
|
|
blobInc
|
|
errInc
|
|
)
|
|
|
|
type Summary struct {
|
|
Total, AlreadyStored, Sd, Blob, Err int
|
|
}
|
|
|
|
type Uploader struct {
|
|
db *db.SQL
|
|
store *store.DBBackedStore // could just be store.BlobStore interface
|
|
workers int
|
|
skipExistsCheck bool
|
|
deleteBlobsAfterUpload bool
|
|
stopper *stop.Group
|
|
countChan chan increment
|
|
|
|
count Summary
|
|
}
|
|
|
|
func NewUploader(db *db.SQL, store *store.DBBackedStore, workers int, skipExistsCheck, deleteBlobsAfterUpload bool) *Uploader {
|
|
return &Uploader{
|
|
db: db,
|
|
store: store,
|
|
workers: workers,
|
|
skipExistsCheck: skipExistsCheck,
|
|
deleteBlobsAfterUpload: deleteBlobsAfterUpload,
|
|
stopper: stop.New(),
|
|
countChan: make(chan increment),
|
|
}
|
|
}
|
|
|
|
func (u *Uploader) Stop() {
|
|
log.Infoln("stopping uploader")
|
|
u.stopper.StopAndWait()
|
|
}
|
|
|
|
func (u *Uploader) Upload(dirOrFilePath string) error {
|
|
paths, err := getPaths(dirOrFilePath)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
u.count.Total = len(paths)
|
|
|
|
hashes := make([]string, len(paths))
|
|
for i, p := range paths {
|
|
hashes[i] = path.Base(p)
|
|
}
|
|
|
|
log.Debug("checking for existing blobs")
|
|
|
|
var exists map[string]bool
|
|
if !u.skipExistsCheck {
|
|
exists, err = u.db.HasBlobs(hashes, false)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
u.count.AlreadyStored = len(exists)
|
|
}
|
|
|
|
log.Debugf("%d new blobs to upload", u.count.Total-u.count.AlreadyStored)
|
|
|
|
workerWG := sync.WaitGroup{}
|
|
pathChan := make(chan string)
|
|
|
|
for i := 0; i < u.workers; i++ {
|
|
workerWG.Add(1)
|
|
metrics.RoutinesQueue.WithLabelValues("reflector", "upload").Inc()
|
|
go func(i int) {
|
|
defer metrics.RoutinesQueue.WithLabelValues("reflector", "upload").Dec()
|
|
defer workerWG.Done()
|
|
defer func(i int) { log.Debugf("worker %d quitting", i) }(i)
|
|
u.worker(pathChan)
|
|
}(i)
|
|
}
|
|
|
|
countWG := sync.WaitGroup{}
|
|
countWG.Add(1)
|
|
metrics.RoutinesQueue.WithLabelValues("reflector", "uploader").Inc()
|
|
go func() {
|
|
defer metrics.RoutinesQueue.WithLabelValues("reflector", "uploader").Dec()
|
|
defer countWG.Done()
|
|
u.counter()
|
|
}()
|
|
|
|
Upload:
|
|
for _, f := range paths {
|
|
if exists != nil && exists[path.Base(f)] {
|
|
continue
|
|
}
|
|
|
|
select {
|
|
case pathChan <- f:
|
|
case <-u.stopper.Ch():
|
|
break Upload
|
|
}
|
|
}
|
|
|
|
close(pathChan)
|
|
workerWG.Wait()
|
|
close(u.countChan)
|
|
countWG.Wait()
|
|
u.stopper.Stop()
|
|
|
|
log.Debugf(
|
|
"upload stats: %d blobs total, %d already stored, %d SD blobs uploaded, %d content blobs uploaded, %d errors",
|
|
u.count.Total, u.count.AlreadyStored, u.count.Sd, u.count.Blob, u.count.Err,
|
|
)
|
|
return nil
|
|
}
|
|
|
|
// worker reads paths from a channel, uploads them, and optionally deletes them
|
|
func (u *Uploader) worker(pathChan chan string) {
|
|
for {
|
|
select {
|
|
case <-u.stopper.Ch():
|
|
return
|
|
case filepath, ok := <-pathChan:
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
err := u.uploadBlob(filepath)
|
|
if err != nil {
|
|
log.Errorln(err)
|
|
} else if u.deleteBlobsAfterUpload {
|
|
err = os.Remove(filepath)
|
|
if err != nil {
|
|
log.Errorln(errors.Prefix("deleting blob", err))
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// uploadBlob uploads a blob
|
|
func (u *Uploader) uploadBlob(filepath string) (err error) {
|
|
defer func() {
|
|
if err != nil {
|
|
u.inc(errInc)
|
|
}
|
|
}()
|
|
|
|
blob, err := os.ReadFile(filepath)
|
|
if err != nil {
|
|
return errors.Err(err)
|
|
}
|
|
|
|
hash := BlobHash(blob)
|
|
if hash != path.Base(filepath) {
|
|
return errors.Err("file name does not match hash (%s != %s), skipping", filepath, hash)
|
|
}
|
|
|
|
if IsValidJSON(blob) {
|
|
log.Debugf("uploading SD blob %s", hash)
|
|
err := u.store.PutSD(hash, blob)
|
|
if err != nil {
|
|
return errors.Prefix("uploading SD blob "+hash, err)
|
|
}
|
|
u.inc(sdInc)
|
|
} else {
|
|
log.Debugf("uploading blob %s", hash)
|
|
err = u.store.Put(hash, blob)
|
|
if err != nil {
|
|
return errors.Prefix("uploading blob "+hash, err)
|
|
}
|
|
u.inc(blobInc)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// counter updates the counts of how many sd blobs and content blobs were uploaded, and how many
|
|
// errors were encountered. It occasionally prints the upload progress to debug.
|
|
func (u *Uploader) counter() {
|
|
start := time.Now()
|
|
|
|
for {
|
|
select {
|
|
case <-u.stopper.Ch():
|
|
return
|
|
case incrementType, ok := <-u.countChan:
|
|
if !ok {
|
|
return
|
|
}
|
|
switch incrementType {
|
|
case sdInc:
|
|
u.count.Sd++
|
|
case blobInc:
|
|
u.count.Blob++
|
|
case errInc:
|
|
u.count.Err++
|
|
}
|
|
}
|
|
if (u.count.Sd+u.count.Blob)%50 == 0 {
|
|
log.Debugf("%d of %d done (%s elapsed, %.3fs per blob)", u.count.Sd+u.count.Blob, u.count.Total-u.count.AlreadyStored, time.Since(start).String(), time.Since(start).Seconds()/float64(u.count.Sd+u.count.Blob))
|
|
}
|
|
}
|
|
}
|
|
|
|
func (u *Uploader) GetSummary() Summary {
|
|
return u.count
|
|
}
|
|
|
|
func (u *Uploader) inc(t increment) {
|
|
select {
|
|
case u.countChan <- t:
|
|
case <-u.stopper.Ch():
|
|
}
|
|
}
|
|
|
|
// getPaths returns the paths for files to upload. it takes a path to a file or a dir. for a file,
|
|
// it returns the full path to that file. for a dir, it returns the paths for all the files in the
|
|
// dir
|
|
func getPaths(dirOrFilePath string) ([]string, error) {
|
|
info, err := os.Stat(dirOrFilePath)
|
|
if err != nil {
|
|
return nil, errors.Err(err)
|
|
}
|
|
|
|
if info.Mode().IsRegular() {
|
|
return []string{dirOrFilePath}, nil
|
|
}
|
|
|
|
f, err := os.Open(dirOrFilePath)
|
|
if err != nil {
|
|
return nil, errors.Err(err)
|
|
}
|
|
|
|
files, err := f.Readdir(-1)
|
|
if err != nil {
|
|
return nil, errors.Err(err)
|
|
}
|
|
err = f.Close()
|
|
if err != nil {
|
|
return nil, errors.Err(err)
|
|
}
|
|
|
|
var filenames []string
|
|
for _, file := range files {
|
|
if !file.IsDir() {
|
|
filenames = append(filenames, dirOrFilePath+"/"+file.Name())
|
|
}
|
|
}
|
|
|
|
return filenames, nil
|
|
}
|