diff --git a/cmd/populatedb.go b/cmd/populatedb.go new file mode 100644 index 0000000..d30bafe --- /dev/null +++ b/cmd/populatedb.go @@ -0,0 +1,45 @@ +package cmd + +import ( + "github.com/lbryio/lbry.go/v2/extras/errors" + "github.com/lbryio/reflector.go/db" + "github.com/lbryio/reflector.go/meta" + "github.com/lbryio/reflector.go/store/speedwalk" + + log "github.com/sirupsen/logrus" + "github.com/spf13/cobra" +) + +var ( + diskStorePath string +) + +func init() { + var cmd = &cobra.Command{ + Use: "populate-db", + Short: "populate local database with blobs from a disk storage", + Run: populateDbCmd, + } + cmd.Flags().StringVar(&diskStorePath, "store-path", "", + "path of the store where all blobs are cached") + rootCmd.AddCommand(cmd) +} + +func populateDbCmd(cmd *cobra.Command, args []string) { + log.Printf("reflector %s", meta.VersionString()) + if diskStorePath == "" { + log.Fatal("store-path must be defined") + } + localDb := new(db.SQL) + localDb.SoftDelete = true + localDb.TrackAccess = db.TrackAccessBlobs + err := localDb.Connect("reflector:reflector@tcp(localhost:3306)/reflector") + if err != nil { + log.Fatal(err) + } + blobs, err := speedwalk.AllFiles(diskStorePath, true) + err = localDb.AddBlobs(blobs) + if err != nil { + log.Errorf("error while storing to db: %s", errors.FullTrace(err)) + } +} diff --git a/cmd/reflector.go b/cmd/reflector.go index 47a27ff..aacef08 100644 --- a/cmd/reflector.go +++ b/cmd/reflector.go @@ -179,18 +179,20 @@ func wrapWithCache(s store.BlobStore, cleanerStopper *stop.Group) store.BlobStor } localDb := new(db.SQL) + localDb.SoftDelete = true localDb.TrackAccess = db.TrackAccessBlobs err = localDb.Connect("reflector:reflector@tcp(localhost:3306)/reflector") if err != nil { log.Fatal(err) } + dbBackedDiskStore := store.NewDBBackedStore(store.NewDiskStore(diskCachePath, 2), localDb, true) wrapped = store.NewCachingStore( "reflector", wrapped, - store.NewDBBackedStore(store.NewDiskStore(diskCachePath, 2), localDb, false), + dbBackedDiskStore, ) - go cleanOldestBlobs(int(realCacheSize), localDb, wrapped, cleanerStopper) + go cleanOldestBlobs(int(realCacheSize), localDb, dbBackedDiskStore, cleanerStopper) } diskCacheMaxSize, diskCachePath = diskCacheParams(bufferReflectorCmdDiskCache) @@ -246,8 +248,7 @@ func diskCacheParams(diskParams string) (int, string) { } func cleanOldestBlobs(maxItems int, db *db.SQL, store store.BlobStore, stopper *stop.Group) { - const cleanupInterval = 10 * time.Second - + const cleanupInterval = 10 * time.Minute for { select { case <-stopper.Ch(): @@ -288,4 +289,5 @@ func doClean(maxItems int, db *db.SQL, store store.BlobStore, stopper *stop.Grou } } } + return nil } diff --git a/db/db.go b/db/db.go index 3ac2dab..1ae1717 100644 --- a/db/db.go +++ b/db/db.go @@ -3,11 +3,13 @@ package db import ( "context" "database/sql" + "strings" "time" "github.com/lbryio/lbry.go/v2/dht/bits" "github.com/lbryio/lbry.go/v2/extras/errors" qt "github.com/lbryio/lbry.go/v2/extras/query" + "github.com/lbryio/lbry.go/v2/stream" "github.com/go-sql-driver/mysql" _ "github.com/go-sql-driver/mysql" // blank import for db driver ensures its imported even if its not used @@ -88,6 +90,27 @@ func (s *SQL) AddBlob(hash string, length int, isStored bool) error { return err } +// AddBlob adds a blob to the database. +func (s *SQL) AddBlobs(hash []string) error { + if s.conn == nil { + return errors.Err("not connected") + } + // Split the slice into batches of 20 items. + batch := 10000 + + for i := 0; i < len(hash); i += batch { + j := i + batch + if j > len(hash) { + j = len(hash) + } + err := s.insertBlobs(hash[i:j]) // Process the batch. + if err != nil { + log.Errorf("error while inserting batch: %s", errors.FullTrace(err)) + } + } + return nil +} + func (s *SQL) insertBlob(hash string, length int, isStored bool) (int64, error) { if length <= 0 { return 0, errors.Err("length must be positive") @@ -130,6 +153,26 @@ func (s *SQL) insertBlob(hash string, length int, isStored bool) (int64, error) return blobID, nil } +func (s *SQL) insertBlobs(hashes []string) error { + var ( + q string + args []interface{} + ) + dayAgo := time.Now().AddDate(0, 0, -1) + q = "insert into blob_ (hash, is_stored, length, last_accessed_at) values " + for _, hash := range hashes { + q += "(?,?,?,?)," + args = append(args, hash, true, stream.MaxBlobSize, dayAgo) + } + q = strings.TrimSuffix(q, ",") + _, err := s.exec(q, args...) + if err != nil { + return err + } + + return nil +} + func (s *SQL) insertStream(hash string, sdBlobID int64) (int64, error) { var ( q string @@ -180,12 +223,13 @@ func (s *SQL) HasBlob(hash string) (bool, error) { // HasBlobs checks if the database contains the set of blobs and returns a bool map. func (s *SQL) HasBlobs(hashes []string) (map[string]bool, error) { exists, idsNeedingTouch, err := s.hasBlobs(hashes) - - if s.TrackAccess == TrackAccessBlobs { - s.touchBlobs(idsNeedingTouch) - } else if s.TrackAccess == TrackAccessStreams { - s.touchStreams(idsNeedingTouch) - } + go func() { + if s.TrackAccess == TrackAccessBlobs { + s.touchBlobs(idsNeedingTouch) + } else if s.TrackAccess == TrackAccessStreams { + s.touchStreams(idsNeedingTouch) + } + }() return exists, err } diff --git a/store/dbbacked.go b/store/dbbacked.go index 1c5088a..dc88ad2 100644 --- a/store/dbbacked.go +++ b/store/dbbacked.go @@ -44,18 +44,18 @@ func (d *DBBackedStore) Get(hash string) (stream.Blob, error) { if !has { return nil, ErrBlobNotFound } + + b, err := d.blobs.Get(hash) if d.deleteOnMiss { - b, err := d.blobs.Get(hash) if err != nil && errors.Is(err, ErrBlobNotFound) { e2 := d.Delete(hash) if e2 != nil { log.Errorf("error while deleting blob from db: %s", errors.FullTrace(err)) } - return b, err } } - return d.blobs.Get(hash) + return b, err } // Put stores the blob in the S3 store and stores the blob information in the DB. diff --git a/store/litedbbacked.go b/store/litedbbacked.go deleted file mode 100644 index 93790d6..0000000 --- a/store/litedbbacked.go +++ /dev/null @@ -1,160 +0,0 @@ -package store - -import ( - "encoding/json" - "time" - - "github.com/lbryio/lbry.go/v2/extras/stop" - "github.com/lbryio/reflector.go/db" - "github.com/lbryio/reflector.go/internal/metrics" - "github.com/lbryio/reflector.go/lite_db" - - "github.com/lbryio/lbry.go/v2/extras/errors" - "github.com/lbryio/lbry.go/v2/stream" - - log "github.com/sirupsen/logrus" -) - -// DBBackedStore is a store that's backed by a DB. The DB contains data about what's in the store. -type LiteDBBackedStore struct { - blobs BlobStore - db *lite_db.SQL - maxItems int - stopper *stop.Stopper - component string -} - -// NewDBBackedStore returns an initialized store pointer. -func NewLiteDBBackedStore(component string, blobs BlobStore, db *lite_db.SQL, maxItems int) *LiteDBBackedStore { - instance := &LiteDBBackedStore{blobs: blobs, db: db, maxItems: maxItems, stopper: stop.New(), component: component} - go func() { - instance.selfClean() - }() - return instance -} - -const nameLiteDBBacked = "lite-db-backed" - -// Name is the cache type name -func (d *LiteDBBackedStore) Name() string { return nameDBBacked } - -// Has returns true if the blob is in the store -func (d *LiteDBBackedStore) Has(hash string) (bool, error) { - return d.db.HasBlob(hash) -} - -// Get gets the blob -func (d *LiteDBBackedStore) Get(hash string) (stream.Blob, error) { - has, err := d.db.HasBlob(hash) - if err != nil { - return nil, err - } - if !has { - return nil, ErrBlobNotFound - } - b, err := d.blobs.Get(hash) - if err != nil && errors.Is(err, ErrBlobNotFound) { - e2 := d.db.Delete(hash) - if e2 != nil { - log.Errorf("error while deleting blob from db: %s", errors.FullTrace(e2)) - } - return b, err - } - - return d.blobs.Get(hash) -} - -// Put stores the blob in the S3 store and stores the blob information in the DB. -func (d *LiteDBBackedStore) Put(hash string, blob stream.Blob) error { - err := d.blobs.Put(hash, blob) - if err != nil { - return err - } - - return d.db.AddBlob(hash, len(blob)) -} - -// PutSD stores the SDBlob in the S3 store. It will return an error if the sd blob is missing the stream hash or if -// there is an error storing the blob information in the DB. -func (d *LiteDBBackedStore) PutSD(hash string, blob stream.Blob) error { - var blobContents db.SdBlob - err := json.Unmarshal(blob, &blobContents) - if err != nil { - return errors.Err(err) - } - if blobContents.StreamHash == "" { - return errors.Err("sd blob is missing stream hash") - } - - err = d.blobs.PutSD(hash, blob) - if err != nil { - return err - } - - return d.db.AddSDBlob(hash, len(blob)) -} - -func (d *LiteDBBackedStore) Delete(hash string) error { - err := d.blobs.Delete(hash) - if err != nil { - return err - } - - return d.db.Delete(hash) -} - -// list returns the hashes of blobs that already exist in the database -func (d *LiteDBBackedStore) list() ([]string, error) { - blobs, err := d.db.AllBlobs() - return blobs, err -} - -func (d *LiteDBBackedStore) selfClean() { - d.stopper.Add(1) - defer d.stopper.Done() - lastCleanup := time.Now() - const cleanupInterval = 10 * time.Second - for { - select { - case <-d.stopper.Ch(): - log.Infoln("stopping self cleanup") - return - default: - time.Sleep(1 * time.Second) - } - if time.Since(lastCleanup) < cleanupInterval { - continue - } - blobsCount, err := d.db.BlobsCount() - if err != nil { - log.Errorf(errors.FullTrace(err)) - } - if blobsCount >= d.maxItems { - itemsToDelete := blobsCount / 100 * 10 - blobs, err := d.db.GetLRUBlobs(itemsToDelete) - if err != nil { - log.Errorf(errors.FullTrace(err)) - } - for _, hash := range blobs { - select { - case <-d.stopper.Ch(): - return - default: - - } - err = d.Delete(hash) - if err != nil { - log.Errorf(errors.FullTrace(err)) - } - metrics.CacheLRUEvictCount.With(metrics.CacheLabels(d.Name(), d.component)).Inc() - } - } - lastCleanup = time.Now() - } -} - -// Shutdown shuts down the store gracefully -func (d *LiteDBBackedStore) Shutdown() { - d.stopper.StopAndWait() - return -}