diff --git a/db/db.go b/db/db.go index dcdebf0..26c28ca 100644 --- a/db/db.go +++ b/db/db.go @@ -3,18 +3,22 @@ package db import ( "context" "database/sql" + "fmt" + "runtime" "strings" "time" "github.com/lbryio/lbry.go/v2/dht/bits" "github.com/lbryio/lbry.go/v2/extras/errors" qt "github.com/lbryio/lbry.go/v2/extras/query" + "github.com/lbryio/lbry.go/v2/extras/stop" "github.com/lbryio/lbry.go/v2/stream" "github.com/go-sql-driver/mysql" _ "github.com/go-sql-driver/mysql" // blank import for db driver ensures its imported even if its not used log "github.com/sirupsen/logrus" "github.com/volatiletech/null" + "go.uber.org/atomic" ) // SdBlob is a special blob that contains information on the rest of the blobs in the stream @@ -104,33 +108,62 @@ func (s *SQL) AddBlobs(hash []string) error { } // Split the slice into batches of 20 items. batch := 10000 - - for i := 0; i < len(hash); i += batch { - j := i + batch - if j > len(hash) { - j = len(hash) - } - err := s.insertBlobs(hash[i:j]) // Process the batch. - if err != nil { - log.Errorf("error while inserting batch: %s", errors.FullTrace(err)) + totalBlobs := int64(len(hash)) + work := make(chan []string, 1000) + stopper := stop.New() + var totalInserted atomic.Int64 + start := time.Now() + go func() { + for i := 0; i < len(hash); i += batch { + j := i + batch + if j > len(hash) { + j = len(hash) + } + work <- hash[i:j] } + log.Infof("done loading %d hashes in the work queue", len(hash)) + close(work) + }() + for i := 0; i < runtime.NumCPU(); i++ { + stopper.Add(1) + go func(worker int) { + log.Infof("starting worker %d", worker) + defer stopper.Done() + for hashes := range work { + inserted := totalInserted.Load() + remaining := totalBlobs - inserted + if inserted > 0 { + timePerBlob := time.Since(start).Microseconds() / inserted + remainingTime := time.Duration(remaining*timePerBlob) * time.Microsecond + log.Infof("[T%d] processing batch of %d items. ETA: %s", worker, len(hashes), remainingTime.String()) + } + err := s.insertBlobs(hashes) // Process the batch. + if err != nil { + log.Errorf("error while inserting batch: %s", errors.FullTrace(err)) + } + totalInserted.Add(int64(len(hashes))) + } + }(i) } + stopper.Wait() return nil } func (s *SQL) insertBlobs(hashes []string) error { var ( - q string - args []interface{} + q string + //args []interface{} ) - dayAgo := time.Now().AddDate(0, 0, -1) + dayAgo := time.Now().AddDate(0, 0, -1).Format("2006-01-02 15:04:05") q = "insert into blob_ (hash, is_stored, length, last_accessed_at) values " for _, hash := range hashes { - q += "(?,?,?,?)," - args = append(args, hash, true, stream.MaxBlobSize, dayAgo) + // prepared statements slow everything down by a lot due to reflection + // for this specific instance we'll go ahead and hardcode the query to make it go faster + q += fmt.Sprintf("('%s',1,%d,'%s'),", hash, stream.MaxBlobSize, dayAgo) + //args = append(args, hash, true, stream.MaxBlobSize, dayAgo) } q = strings.TrimSuffix(q, ",") - _, err := s.exec(q, args...) + _, err := s.exec(q) if err != nil { return err } @@ -292,7 +325,7 @@ func (s *SQL) hasBlobs(hashes []string) (map[string]bool, []uint64, error) { var needsTouch []uint64 exists := make(map[string]bool) - touchDeadline := time.Now().AddDate(0, 0, -1) // touch blob if last accessed before this time + touchDeadline := time.Now().Add(-6 * time.Hour) // touch blob if last accessed before this time maxBatchSize := 10000 doneIndex := 0