when reflecting a sdblob, insert all the stream and intermediate blobs using a transaction #50

Closed
shyba wants to merge 39 commits from insert_under_tx into master
5 changed files with 101 additions and 168 deletions
Showing only changes of commit 0d5004a83b - Show all commits

45
cmd/populatedb.go Normal file
View file

@ -0,0 +1,45 @@
package cmd
import (
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/reflector.go/db"
"github.com/lbryio/reflector.go/meta"
"github.com/lbryio/reflector.go/store/speedwalk"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)
var (
// diskStorePath receives the value of the populate-db command's
// "store-path" flag; populateDbCmd refuses to run while it is empty.
diskStorePath string
)
// init registers the "populate-db" subcommand on rootCmd and binds its
// "store-path" flag to diskStorePath.
func init() {
	populateCmd := &cobra.Command{
		Use:   "populate-db",
		Short: "populate local database with blobs from a disk storage",
		Run:   populateDbCmd,
	}

	flags := populateCmd.Flags()
	flags.StringVar(&diskStorePath, "store-path", "", "path of the store where all blobs are cached")

	rootCmd.AddCommand(populateCmd)
}
// populateDbCmd is the Run handler of the "populate-db" command. It connects
// to the local reflector database, asks speedwalk.AllFiles for the entries
// under diskStorePath and hands the result to db.SQL.AddBlobs.
//
// The process exits (log.Fatal) when the store path is missing, when the
// database connection fails, or when the store cannot be listed. A failure
// while storing to the database is only logged.
func populateDbCmd(cmd *cobra.Command, args []string) {
	log.Printf("reflector %s", meta.VersionString())
	if diskStorePath == "" {
		log.Fatal("store-path must be defined")
	}

	localDb := new(db.SQL)
	localDb.SoftDelete = true
	localDb.TrackAccess = db.TrackAccessBlobs
	err := localDb.Connect("reflector:reflector@tcp(localhost:3306)/reflector")
	if err != nil {
		log.Fatal(err)
	}

	blobs, err := speedwalk.AllFiles(diskStorePath, true)
	if err != nil {
		// Without this check the listing error was overwritten by the
		// AddBlobs result below and the command silently inserted nothing.
		log.Fatal(err)
	}

	err = localDb.AddBlobs(blobs)
	if err != nil {
		log.Errorf("error while storing to db: %s", errors.FullTrace(err))
	}
}

View file

@ -185,11 +185,11 @@ func wrapWithCache(s store.BlobStore, cleanerStopper *stop.Group) store.BlobStor
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
dbBackedDiskStore := store.NewDBBackedStore(store.NewDiskStore(diskCachePath, 2), localDb, false) dbBackedDiskStore := store.NewDBBackedStore(store.NewDiskStore(diskCachePath, 2), localDb, true)
wrapped = store.NewCachingStore( wrapped = store.NewCachingStore(
"reflector", "reflector",
wrapped, wrapped,
store.NewDBBackedStore(store.NewDiskStore(diskCachePath, 2), localDb, false), dbBackedDiskStore,
) )
go cleanOldestBlobs(int(realCacheSize), localDb, dbBackedDiskStore, cleanerStopper) go cleanOldestBlobs(int(realCacheSize), localDb, dbBackedDiskStore, cleanerStopper)
@ -248,8 +248,7 @@ func diskCacheParams(diskParams string) (int, string) {
} }
func cleanOldestBlobs(maxItems int, db *db.SQL, store store.BlobStore, stopper *stop.Group) { func cleanOldestBlobs(maxItems int, db *db.SQL, store store.BlobStore, stopper *stop.Group) {
const cleanupInterval = 10 * time.Second const cleanupInterval = 10 * time.Minute
for { for {
select { select {
case <-stopper.Ch(): case <-stopper.Ch():

View file

@ -3,11 +3,13 @@ package db
import ( import (
"context" "context"
"database/sql" "database/sql"
"strings"
"time" "time"
"github.com/lbryio/lbry.go/v2/dht/bits" "github.com/lbryio/lbry.go/v2/dht/bits"
"github.com/lbryio/lbry.go/v2/extras/errors" "github.com/lbryio/lbry.go/v2/extras/errors"
qt "github.com/lbryio/lbry.go/v2/extras/query" qt "github.com/lbryio/lbry.go/v2/extras/query"
"github.com/lbryio/lbry.go/v2/stream"
"github.com/go-sql-driver/mysql" "github.com/go-sql-driver/mysql"
_ "github.com/go-sql-driver/mysql" // blank import for db driver ensures its imported even if its not used _ "github.com/go-sql-driver/mysql" // blank import for db driver ensures its imported even if its not used
@ -88,6 +90,27 @@ func (s *SQL) AddBlob(hash string, length int, isStored bool) error {
return err return err
} }
// AddBlobs adds the given blob hashes to the database, inserting them in
// batches via insertBlobs.
//
// It returns an error immediately if there is no database connection.
// Otherwise every batch is attempted even when an earlier one fails: each
// failure is logged, and the first one is returned once all batches have been
// tried. A nil return means every batch was inserted.
func (s *SQL) AddBlobs(hash []string) error {
	if s.conn == nil {
		return errors.Err("not connected")
	}

	// Number of hashes sent to the database per insert statement.
	const batchSize = 10000

	var firstErr error
	for i := 0; i < len(hash); i += batchSize {
		j := i + batchSize
		if j > len(hash) {
			j = len(hash)
		}
		if err := s.insertBlobs(hash[i:j]); err != nil {
			log.Errorf("error while inserting batch: %s", errors.FullTrace(err))
			if firstErr == nil {
				firstErr = err
			}
		}
	}
	return firstErr
}
func (s *SQL) insertBlob(hash string, length int, isStored bool) (int64, error) { func (s *SQL) insertBlob(hash string, length int, isStored bool) (int64, error) {
if length <= 0 { if length <= 0 {
return 0, errors.Err("length must be positive") return 0, errors.Err("length must be positive")
@ -130,6 +153,26 @@ func (s *SQL) insertBlob(hash string, length int, isStored bool) (int64, error)
return blobID, nil return blobID, nil
} }
// insertBlobs inserts one blob_ row per hash using a single multi-row insert
// statement. Every row is written with is_stored = true,
// length = stream.MaxBlobSize and last_accessed_at set to one day before now.
//
// An empty hashes slice is a no-op. Any error from executing the statement is
// returned unchanged.
func (s *SQL) insertBlobs(hashes []string) error {
	if len(hashes) == 0 {
		// With no value tuples the statement would end at "values ",
		// which is not valid SQL.
		return nil
	}

	const rowPlaceholders = "(?,?,?,?),"
	const argsPerRow = 4

	// Build the placeholder list in one allocation instead of growing the
	// query string once per hash.
	q := "insert into blob_ (hash, is_stored, length, last_accessed_at) values " +
		strings.TrimSuffix(strings.Repeat(rowPlaceholders, len(hashes)), ",")

	dayAgo := time.Now().AddDate(0, 0, -1)
	args := make([]interface{}, 0, argsPerRow*len(hashes))
	for _, hash := range hashes {
		args = append(args, hash, true, stream.MaxBlobSize, dayAgo)
	}

	_, err := s.exec(q, args...)
	return err
}
func (s *SQL) insertStream(hash string, sdBlobID int64) (int64, error) { func (s *SQL) insertStream(hash string, sdBlobID int64) (int64, error) {
var ( var (
q string q string
@ -180,12 +223,13 @@ func (s *SQL) HasBlob(hash string) (bool, error) {
// HasBlobs checks if the database contains the set of blobs and returns a bool map. // HasBlobs checks if the database contains the set of blobs and returns a bool map.
func (s *SQL) HasBlobs(hashes []string) (map[string]bool, error) { func (s *SQL) HasBlobs(hashes []string) (map[string]bool, error) {
exists, idsNeedingTouch, err := s.hasBlobs(hashes) exists, idsNeedingTouch, err := s.hasBlobs(hashes)
go func() {
if s.TrackAccess == TrackAccessBlobs { if s.TrackAccess == TrackAccessBlobs {
s.touchBlobs(idsNeedingTouch) s.touchBlobs(idsNeedingTouch)
} else if s.TrackAccess == TrackAccessStreams { } else if s.TrackAccess == TrackAccessStreams {
s.touchStreams(idsNeedingTouch) s.touchStreams(idsNeedingTouch)
} }
}()
return exists, err return exists, err
} }

View file

@ -44,18 +44,18 @@ func (d *DBBackedStore) Get(hash string) (stream.Blob, error) {
if !has { if !has {
return nil, ErrBlobNotFound return nil, ErrBlobNotFound
} }
if d.deleteOnMiss {
b, err := d.blobs.Get(hash) b, err := d.blobs.Get(hash)
if d.deleteOnMiss {
if err != nil && errors.Is(err, ErrBlobNotFound) { if err != nil && errors.Is(err, ErrBlobNotFound) {
e2 := d.Delete(hash) e2 := d.Delete(hash)
if e2 != nil { if e2 != nil {
log.Errorf("error while deleting blob from db: %s", errors.FullTrace(err)) log.Errorf("error while deleting blob from db: %s", errors.FullTrace(err))
} }
return b, err
} }
} }
return d.blobs.Get(hash) return b, err
} }
// Put stores the blob in the S3 store and stores the blob information in the DB. // Put stores the blob in the S3 store and stores the blob information in the DB.

View file

@ -1,155 +0,0 @@
package store
import (
"encoding/json"
"github.com/lbryio/lbry.go/v2/extras/stop"
"github.com/lbryio/reflector.go/db"
"github.com/lbryio/lbry.go/v2/extras/errors"
"github.com/lbryio/lbry.go/v2/stream"
log "github.com/sirupsen/logrus"
)
// LiteDBBackedStore is a store that's backed by a DB. The DB contains data about what's in the store.
type LiteDBBackedStore struct {
// blobs is the underlying store that holds the blob contents.
blobs BlobStore
// db is consulted for blob existence and updated on every put and delete.
db *db.SQL
// maxItems is set by the constructor; NOTE(review): it is only referenced
// by the commented-out selfClean routine in this file.
maxItems int
// stopper is waited on by Shutdown.
stopper *stop.Stopper
// component is set by the constructor; NOTE(review): it is only referenced
// by the commented-out selfClean routine in this file.
component string
}
// NewLiteDBBackedStore returns a pointer to a LiteDBBackedStore that wraps
// blobs and db, carries the given component and maxItems values, and owns a
// freshly created stopper.
func NewLiteDBBackedStore(component string, blobs BlobStore, db *db.SQL, maxItems int) *LiteDBBackedStore {
	return &LiteDBBackedStore{
		component: component,
		blobs:     blobs,
		db:        db,
		maxItems:  maxItems,
		stopper:   stop.New(),
	}
}
// nameLiteDBBacked is the cache type name reported by LiteDBBackedStore.
const nameLiteDBBacked = "lite-db-backed"

// Name is the cache type name. It returns nameLiteDBBacked so this store is
// reported under its own name rather than under nameDBBacked.
func (d *LiteDBBackedStore) Name() string { return nameLiteDBBacked }
// Has reports whether the database knows the blob with the given hash. The
// underlying blob store is not consulted.
func (d *LiteDBBackedStore) Has(hash string) (bool, error) {
	found, err := d.db.HasBlob(hash)
	return found, err
}
// Get returns the blob with the given hash.
//
// The database is checked first: a lookup error is returned as is, and a hash
// the database does not know yields ErrBlobNotFound without touching the
// underlying store. If the database knows the hash but the underlying store
// reports ErrBlobNotFound, the database entry is deleted (a failure to do so
// is only logged) and the store's error is returned.
func (d *LiteDBBackedStore) Get(hash string) (stream.Blob, error) {
	has, err := d.db.HasBlob(hash)
	if err != nil {
		return nil, err
	}
	if !has {
		return nil, ErrBlobNotFound
	}

	b, err := d.blobs.Get(hash)
	if err != nil && errors.Is(err, ErrBlobNotFound) {
		// The database lists a blob the store no longer has; drop the entry.
		e2 := d.db.Delete(hash)
		if e2 != nil {
			log.Errorf("error while deleting blob from db: %s", errors.FullTrace(e2))
		}
	}
	// Return the result of the single read above rather than fetching the
	// blob from the underlying store a second time.
	return b, err
}
// Put writes the blob to the underlying store and, if that succeeds, records
// its hash and length in the DB as stored.
func (d *LiteDBBackedStore) Put(hash string, blob stream.Blob) error {
	if putErr := d.blobs.Put(hash, blob); putErr != nil {
		return putErr
	}
	return d.db.AddBlob(hash, len(blob), true)
}
// PutSD decodes the blob as a JSON SD blob, writes it to the underlying store
// and records it in the DB. It returns an error if the blob cannot be decoded,
// if the decoded SD blob has an empty stream hash, if the underlying store
// rejects it, or if storing the blob information in the DB fails.
func (d *LiteDBBackedStore) PutSD(hash string, blob stream.Blob) error {
	var sd db.SdBlob
	if decodeErr := json.Unmarshal(blob, &sd); decodeErr != nil {
		return errors.Err(decodeErr)
	}

	if sd.StreamHash == "" {
		return errors.Err("sd blob is missing stream hash")
	}

	if putErr := d.blobs.PutSD(hash, blob); putErr != nil {
		return putErr
	}

	return d.db.AddSDBlob(hash, len(blob), sd)
}
// Delete removes the blob from the underlying store and then from the DB. If
// the underlying store fails, the DB is left untouched and that error is
// returned.
func (d *LiteDBBackedStore) Delete(hash string) error {
	if delErr := d.blobs.Delete(hash); delErr != nil {
		return delErr
	}
	return d.db.Delete(hash)
}
// list is meant to return the hashes of blobs that already exist in the
// database. The lookup is currently disabled, so it always returns a nil
// slice and a nil error.
func (d *LiteDBBackedStore) list() ([]string, error) {
	// Disabled lookup, kept for reference:
	//blobs, err := d.db.AllBlobs()
	//return blobs, err
	var hashes []string
	return hashes, nil
}
//func (d *LiteDBBackedStore) selfClean() {
// d.stopper.Add(1)
// defer d.stopper.Done()
// lastCleanup := time.Now()
// const cleanupInterval = 10 * time.Second
// for {
// select {
// case <-d.stopper.Ch():
// log.Infoln("stopping self cleanup")
// return
// default:
// time.Sleep(1 * time.Second)
// }
// if time.Since(lastCleanup) < cleanupInterval {
// continue
// }
// blobsCount, err := d.db.BlobsCount()
// if err != nil {
// log.Errorf(errors.FullTrace(err))
// }
// if blobsCount >= d.maxItems {
// itemsToDelete := blobsCount / 100 * 10
// blobs, err := d.db.GetLRUBlobs(itemsToDelete)
// if err != nil {
// log.Errorf(errors.FullTrace(err))
// }
// for _, hash := range blobs {
// select {
// case <-d.stopper.Ch():
// return
// default:
//
// }
// err = d.Delete(hash)
// if err != nil {
// log.Errorf(errors.FullTrace(err))
// }
// metrics.CacheLRUEvictCount.With(metrics.CacheLabels(d.Name(), d.component)).Inc()
// }
// }
// lastCleanup = time.Now()
// }
//}
// Shutdown stops the store by calling StopAndWait on its stopper.
func (d *LiteDBBackedStore) Shutdown() { d.stopper.StopAndWait() }