add cmd to populate db
fix store init; try fixing unreasonable db bottleneck
parent 6fb0620091
commit 35c713a26e
5 changed files with 104 additions and 173 deletions
cmd/populatedb.go (new file, +45)
@@ -0,0 +1,45 @@
package cmd

import (
	"github.com/lbryio/lbry.go/v2/extras/errors"
	"github.com/lbryio/reflector.go/db"
	"github.com/lbryio/reflector.go/meta"
	"github.com/lbryio/reflector.go/store/speedwalk"

	log "github.com/sirupsen/logrus"
	"github.com/spf13/cobra"
)

var diskStorePath string

func init() {
	var cmd = &cobra.Command{
		Use:   "populate-db",
		Short: "populate local database with blobs from a disk storage",
		Run:   populateDbCmd,
	}
	cmd.Flags().StringVar(&diskStorePath, "store-path", "", "path of the store where all blobs are cached")
	rootCmd.AddCommand(cmd)
}

func populateDbCmd(cmd *cobra.Command, args []string) {
	log.Printf("reflector %s", meta.VersionString())
	if diskStorePath == "" {
		log.Fatal("store-path must be defined")
	}
	localDb := new(db.SQL)
	localDb.SoftDelete = true
	localDb.TrackAccess = db.TrackAccessBlobs
	err := localDb.Connect("reflector:reflector@tcp(localhost:3306)/reflector")
	if err != nil {
		log.Fatal(err)
	}
	blobs, err := speedwalk.AllFiles(diskStorePath, true)
	if err != nil {
		log.Fatal(err)
	}
	err = localDb.AddBlobs(blobs)
	if err != nil {
		log.Errorf("error while storing to db: %s", errors.FullTrace(err))
	}
}
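With the command registered, a one-off backfill looks like this, assuming the binary is built as prism (as the root command elsewhere in this repo suggests); note the MySQL DSN is hard-coded in populateDbCmd, so the target database cannot be changed from the CLI:

	prism populate-db --store-path /path/to/blobs

The command walks the disk store with speedwalk.AllFiles and records every blob hash it finds through AddBlobs, added to db/db.go below.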
@@ -179,18 +179,20 @@ func wrapWithCache(s store.BlobStore, cleanerStopper *stop.Group) store.BlobStore
 	}

 	localDb := new(db.SQL)
+	localDb.SoftDelete = true
+	localDb.TrackAccess = db.TrackAccessBlobs
 	err = localDb.Connect("reflector:reflector@tcp(localhost:3306)/reflector")
 	if err != nil {
 		log.Fatal(err)
 	}
+	dbBackedDiskStore := store.NewDBBackedStore(store.NewDiskStore(diskCachePath, 2), localDb, true)
 	wrapped = store.NewCachingStore(
 		"reflector",
 		wrapped,
-		store.NewDBBackedStore(store.NewDiskStore(diskCachePath, 2), localDb, false),
+		dbBackedDiskStore,
 	)

-	go cleanOldestBlobs(int(realCacheSize), localDb, wrapped, cleanerStopper)
+	go cleanOldestBlobs(int(realCacheSize), localDb, dbBackedDiskStore, cleanerStopper)
 }

 	diskCacheMaxSize, diskCachePath = diskCacheParams(bufferReflectorCmdDiskCache)
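An inference worth spelling out, since the commit message only says "fix store init": the DB-backed disk store is now built once and shared, so the cleaner evicts through dbBackedDiskStore (the local disk cache plus its tracking rows) rather than through wrapped, the full caching hierarchy, where a Delete would presumably propagate to the origin store as well. The deleteOnMiss flag also flips to true for this store, matching the Get behavior changed further down.

// Resulting layering (a sketch; names taken from the hunk above):
//
//   NewCachingStore("reflector")
//   ├── wrapped            (origin store, built earlier in wrapWithCache)
//   └── dbBackedDiskStore  (NewDiskStore + localDb, deleteOnMiss=true)
//
// cleanOldestBlobs now deletes through dbBackedDiskStore only.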
@@ -246,8 +248,7 @@ func diskCacheParams(diskParams string) (int, string) {
 }

 func cleanOldestBlobs(maxItems int, db *db.SQL, store store.BlobStore, stopper *stop.Group) {
-	const cleanupInterval = 10 * time.Second
-
+	const cleanupInterval = 10 * time.Minute
 	for {
 		select {
 		case <-stopper.Ch():
@@ -288,4 +289,5 @@ func doClean(maxItems int, db *db.SQL, store store.BlobStore, stopper *stop.Group)
 			}
 		}
 	}
+	return nil
 }
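Dropping the cleanup frequency from every 10 seconds to every 10 minutes reads as part of the bottleneck fix, since each pass presumably hits MySQL for eviction candidates. If this loop is ever revisited, a time.Ticker expresses the same stop-aware wait without a sleep-and-poll pattern; a minimal sketch against the same stop.Group API, not code from this commit:

func cleanLoop(stopper *stop.Group, interval time.Duration, clean func() error) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-stopper.Ch():
			return // react to shutdown immediately rather than mid-sleep
		case <-ticker.C:
			if err := clean(); err != nil {
				log.Error(errors.FullTrace(err))
			}
		}
	}
}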
db/db.go (46 changes)
@@ -3,11 +3,13 @@ package db
 import (
 	"context"
 	"database/sql"
+	"strings"
+	"time"

 	"github.com/lbryio/lbry.go/v2/dht/bits"
 	"github.com/lbryio/lbry.go/v2/extras/errors"
 	qt "github.com/lbryio/lbry.go/v2/extras/query"
+	"github.com/lbryio/lbry.go/v2/stream"

 	"github.com/go-sql-driver/mysql"
 	_ "github.com/go-sql-driver/mysql" // blank import for db driver ensures it's imported even if it's not used
@@ -88,6 +90,27 @@ func (s *SQL) AddBlob(hash string, length int, isStored bool) error {
 	return err
 }

+// AddBlobs adds a batch of blobs to the database.
+func (s *SQL) AddBlobs(hash []string) error {
+	if s.conn == nil {
+		return errors.Err("not connected")
+	}
+	// Split the slice into batches of 10000 items.
+	batch := 10000
+
+	for i := 0; i < len(hash); i += batch {
+		j := i + batch
+		if j > len(hash) {
+			j = len(hash)
+		}
+		err := s.insertBlobs(hash[i:j]) // process the batch
+		if err != nil {
+			log.Errorf("error while inserting batch: %s", errors.FullTrace(err))
+		}
+	}
+	return nil
+}
+
 func (s *SQL) insertBlob(hash string, length int, isStored bool) (int64, error) {
 	if length <= 0 {
 		return 0, errors.Err("length must be positive")
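AddBlobs deliberately logs and skips a failed batch instead of aborting, so one bad batch does not sink a multi-million-row import; the cost is that it always returns nil, so the caller cannot detect partial failure. The slicing itself is the standard window pattern and could be factored out; a hypothetical helper, not part of this commit:

// inBatches applies fn to successive windows of at most size items.
func inBatches(items []string, size int, fn func([]string) error) {
	for i := 0; i < len(items); i += size {
		j := i + size
		if j > len(items) {
			j = len(items)
		}
		if err := fn(items[i:j]); err != nil {
			log.Errorf("batch %d-%d failed: %s", i, j, errors.FullTrace(err))
		}
	}
}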
@@ -130,6 +153,26 @@ func (s *SQL) insertBlob(hash string, length int, isStored bool) (int64, error)
 	return blobID, nil
 }

+func (s *SQL) insertBlobs(hashes []string) error {
+	var (
+		q    string
+		args []interface{}
+	)
+	dayAgo := time.Now().AddDate(0, 0, -1)
+	q = "insert into blob_ (hash, is_stored, length, last_accessed_at) values "
+	for _, hash := range hashes {
+		q += "(?,?,?,?),"
+		args = append(args, hash, true, stream.MaxBlobSize, dayAgo)
+	}
+	q = strings.TrimSuffix(q, ",")
+	_, err := s.exec(q, args...)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
 func (s *SQL) insertStream(hash string, sdBlobID int64) (int64, error) {
 	var (
 		q string
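For a batch of n hashes, insertBlobs builds one multi-row INSERT with 4n placeholders, so the 10000-item batches from AddBlobs bind 40000 args per statement, under MySQL's 65535-placeholder cap for prepared statements (a MySQL protocol limit, not something this code checks). Note the approximations: every row is written with is_stored = true and length = stream.MaxBlobSize, since the importer never reads real blob sizes from disk, and last_accessed_at is set a day in the past, presumably so imported blobs rank below genuinely hot ones in LRU eviction order. For two hashes the generated statement comes out as:

	insert into blob_ (hash, is_stored, length, last_accessed_at) values (?,?,?,?),(?,?,?,?)

with args hash1, true, stream.MaxBlobSize, dayAgo, hash2, true, stream.MaxBlobSize, dayAgo.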
@@ -180,12 +223,13 @@ func (s *SQL) HasBlob(hash string) (bool, error) {
 // HasBlobs checks if the database contains the set of blobs and returns a bool map.
 func (s *SQL) HasBlobs(hashes []string) (map[string]bool, error) {
 	exists, idsNeedingTouch, err := s.hasBlobs(hashes)

-	if s.TrackAccess == TrackAccessBlobs {
-		s.touchBlobs(idsNeedingTouch)
-	} else if s.TrackAccess == TrackAccessStreams {
-		s.touchStreams(idsNeedingTouch)
-	}
+	go func() {
+		if s.TrackAccess == TrackAccessBlobs {
+			s.touchBlobs(idsNeedingTouch)
+		} else if s.TrackAccess == TrackAccessStreams {
+			s.touchStreams(idsNeedingTouch)
+		}
+	}()

 	return exists, err
 }
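Wrapping the touch in a goroutine takes the access-time UPDATEs off the HasBlobs hot path, which reads as the most direct "unreasonable db bottleneck" fix in this commit. The goroutine is fire-and-forget and unbounded, so a burst of HasBlobs calls spawns one goroutine (and one DB write) each. If that ever needs a cap, a buffered channel works as a cheap semaphore; a sketch, not part of this commit:

var touchSem = make(chan struct{}, 8) // at most 8 concurrent touch queries

func touchAsync(touch func()) {
	select {
	case touchSem <- struct{}{}:
		go func() {
			defer func() { <-touchSem }()
			touch()
		}()
	default:
		// all slots busy: drop this touch; access times are best-effort anyway
	}
}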
@@ -44,18 +44,18 @@ func (d *DBBackedStore) Get(hash string) (stream.Blob, error) {
 	if !has {
 		return nil, ErrBlobNotFound
 	}
-	if d.deleteOnMiss {
-		b, err := d.blobs.Get(hash)
+	b, err := d.blobs.Get(hash)
+	if d.deleteOnMiss {
 		if err != nil && errors.Is(err, ErrBlobNotFound) {
 			e2 := d.Delete(hash)
 			if e2 != nil {
 				log.Errorf("error while deleting blob from db: %s", errors.FullTrace(e2))
 			}
 			return b, err
 		}
 	}

-	return d.blobs.Get(hash)
+	return b, err
 }

 // Put stores the blob in the S3 store and stores the blob information in the DB.
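The reordering above also removes a double read on every request when deleteOnMiss is set: the old code called d.blobs.Get once inside the deleteOnMiss branch and then again for the final return, while the new code performs a single Get and reuses b and err for both the miss check and the return value. The delete-failure log traces e2, the deletion error, since err at that point is just the not-found miss being reacted to.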
@@ -1,160 +0,0 @@
package store

import (
	"encoding/json"
	"time"

	"github.com/lbryio/lbry.go/v2/extras/stop"
	"github.com/lbryio/reflector.go/db"
	"github.com/lbryio/reflector.go/internal/metrics"
	"github.com/lbryio/reflector.go/lite_db"

	"github.com/lbryio/lbry.go/v2/extras/errors"
	"github.com/lbryio/lbry.go/v2/stream"

	log "github.com/sirupsen/logrus"
)

// DBBackedStore is a store that's backed by a DB. The DB contains data about what's in the store.
type LiteDBBackedStore struct {
	blobs     BlobStore
	db        *lite_db.SQL
	maxItems  int
	stopper   *stop.Stopper
	component string
}

// NewDBBackedStore returns an initialized store pointer.
func NewLiteDBBackedStore(component string, blobs BlobStore, db *lite_db.SQL, maxItems int) *LiteDBBackedStore {
	instance := &LiteDBBackedStore{blobs: blobs, db: db, maxItems: maxItems, stopper: stop.New(), component: component}
	go func() {
		instance.selfClean()
	}()
	return instance
}

const nameLiteDBBacked = "lite-db-backed"

// Name is the cache type name
func (d *LiteDBBackedStore) Name() string { return nameDBBacked }

// Has returns true if the blob is in the store
func (d *LiteDBBackedStore) Has(hash string) (bool, error) {
	return d.db.HasBlob(hash)
}

// Get gets the blob
func (d *LiteDBBackedStore) Get(hash string) (stream.Blob, error) {
	has, err := d.db.HasBlob(hash)
	if err != nil {
		return nil, err
	}
	if !has {
		return nil, ErrBlobNotFound
	}
	b, err := d.blobs.Get(hash)
	if err != nil && errors.Is(err, ErrBlobNotFound) {
		e2 := d.db.Delete(hash)
		if e2 != nil {
			log.Errorf("error while deleting blob from db: %s", errors.FullTrace(e2))
		}
		return b, err
	}

	return d.blobs.Get(hash)
}

// Put stores the blob in the S3 store and stores the blob information in the DB.
func (d *LiteDBBackedStore) Put(hash string, blob stream.Blob) error {
	err := d.blobs.Put(hash, blob)
	if err != nil {
		return err
	}

	return d.db.AddBlob(hash, len(blob))
}

// PutSD stores the SDBlob in the S3 store. It will return an error if the sd blob is missing the stream hash or if
// there is an error storing the blob information in the DB.
func (d *LiteDBBackedStore) PutSD(hash string, blob stream.Blob) error {
	var blobContents db.SdBlob
	err := json.Unmarshal(blob, &blobContents)
	if err != nil {
		return errors.Err(err)
	}
	if blobContents.StreamHash == "" {
		return errors.Err("sd blob is missing stream hash")
	}

	err = d.blobs.PutSD(hash, blob)
	if err != nil {
		return err
	}

	return d.db.AddSDBlob(hash, len(blob))
}

func (d *LiteDBBackedStore) Delete(hash string) error {
	err := d.blobs.Delete(hash)
	if err != nil {
		return err
	}

	return d.db.Delete(hash)
}

// list returns the hashes of blobs that already exist in the database
func (d *LiteDBBackedStore) list() ([]string, error) {
	blobs, err := d.db.AllBlobs()
	return blobs, err
}

func (d *LiteDBBackedStore) selfClean() {
	d.stopper.Add(1)
	defer d.stopper.Done()
	lastCleanup := time.Now()
	const cleanupInterval = 10 * time.Second
	for {
		select {
		case <-d.stopper.Ch():
			log.Infoln("stopping self cleanup")
			return
		default:
			time.Sleep(1 * time.Second)
		}
		if time.Since(lastCleanup) < cleanupInterval {
			continue
		}
		blobsCount, err := d.db.BlobsCount()
		if err != nil {
			log.Errorf(errors.FullTrace(err))
		}
		if blobsCount >= d.maxItems {
			itemsToDelete := blobsCount / 100 * 10
			blobs, err := d.db.GetLRUBlobs(itemsToDelete)
			if err != nil {
				log.Errorf(errors.FullTrace(err))
			}
			for _, hash := range blobs {
				select {
				case <-d.stopper.Ch():
					return
				default:
				}
				err = d.Delete(hash)
				if err != nil {
					log.Errorf(errors.FullTrace(err))
				}
				metrics.CacheLRUEvictCount.With(metrics.CacheLabels(d.Name(), d.component)).Inc()
			}
		}
		lastCleanup = time.Now()
	}
}

// Shutdown shuts down the store gracefully
func (d *LiteDBBackedStore) Shutdown() {
	d.stopper.StopAndWait()
	return
}