diff --git a/cmd/reflector.go b/cmd/reflector.go
index 3bb6b4e..ab9f53e 100644
--- a/cmd/reflector.go
+++ b/cmd/reflector.go
@@ -8,9 +8,10 @@ import (
 	"syscall"
 	"time"
 
+	"github.com/lbryio/lbry.go/v2/extras/errors"
+	"github.com/lbryio/lbry.go/v2/extras/stop"
 	"github.com/lbryio/reflector.go/db"
 	"github.com/lbryio/reflector.go/internal/metrics"
-	"github.com/lbryio/reflector.go/lite_db"
 	"github.com/lbryio/reflector.go/meta"
 	"github.com/lbryio/reflector.go/peer"
 	"github.com/lbryio/reflector.go/peer/http3"
@@ -68,10 +69,11 @@ func init() {
 func reflectorCmd(cmd *cobra.Command, args []string) {
 	log.Printf("reflector %s", meta.VersionString())
 
+	cleanerStopper := stop.New()
 	// the blocklist logic requires the db backed store to be the outer-most store
 	underlyingStore := setupStore()
-	outerStore := wrapWithCache(underlyingStore)
+	outerStore := wrapWithCache(underlyingStore, cleanerStopper)
 
 	if !disableUploads {
 		reflectorServer := reflector.NewServer(underlyingStore)
@@ -102,13 +104,14 @@ func reflectorCmd(cmd *cobra.Command, args []string) {
 	metricsServer := metrics.NewServer(":"+strconv.Itoa(metricsPort), "/metrics")
 	metricsServer.Start()
 	defer metricsServer.Shutdown()
-	defer underlyingStore.Shutdown()
 	defer outerStore.Shutdown()
+	defer underlyingStore.Shutdown()
 
 	interruptChan := make(chan os.Signal, 1)
 	signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM)
 	<-interruptChan
 	// deferred shutdowns happen now
+	cleanerStopper.StopAndWait()
 }
 
 func setupStore() store.BlobStore {
@@ -149,20 +152,20 @@ func setupStore() store.BlobStore {
 	}
 
 	if useDB {
-		db := new(db.SQL)
-		db.TrackAccessTime = true
-		err := db.Connect(globalConfig.DBConn)
+		dbInst := new(db.SQL)
+		dbInst.TrackAccess = db.TrackAccessStreams
+		err := dbInst.Connect(globalConfig.DBConn)
 		if err != nil {
 			log.Fatal(err)
 		}
-		s = store.NewDBBackedStore(s, db, false)
+		s = store.NewDBBackedStore(s, dbInst, false)
 	}
 
 	return s
 }
 
-func wrapWithCache(s store.BlobStore) store.BlobStore {
+func wrapWithCache(s store.BlobStore, cleanerStopper *stop.Group) store.BlobStore {
 	wrapped := s
 
 	diskCacheMaxSize, diskCachePath := diskCacheParams(reflectorCmdDiskCache)
@@ -174,17 +177,22 @@ func wrapWithCache(s store.BlobStore) store.BlobStore {
 		if err != nil {
 			log.Fatal(err)
 		}
-		localDb := new(lite_db.SQL)
-		localDb.TrackAccessTime = true
+
+		localDb := new(db.SQL)
+		localDb.SoftDelete = true
+		localDb.TrackAccess = db.TrackAccessBlobs
 		err = localDb.Connect("reflector:reflector@tcp(localhost:3306)/reflector")
 		if err != nil {
 			log.Fatal(err)
 		}
+		dbBackedDiskStore := store.NewDBBackedStore(store.NewDiskStore(diskCachePath, 2), localDb, false)
 		wrapped = store.NewCachingStore(
 			"reflector",
 			wrapped,
-			store.NewLiteDBBackedStore("hdd", store.NewDiskStore(diskCachePath, 2), localDb, int(realCacheSize)),
+			store.NewDBBackedStore(store.NewDiskStore(diskCachePath, 2), localDb, false),
 		)
+
+		go cleanOldestBlobs(int(realCacheSize), localDb, dbBackedDiskStore, cleanerStopper)
 	}
 
 	diskCacheMaxSize, diskCachePath = diskCacheParams(bufferReflectorCmdDiskCache)
@@ -238,3 +246,49 @@ func diskCacheParams(diskParams string) (int, string) {
 	}
 	return int(maxSize), path
 }
+
+func cleanOldestBlobs(maxItems int, db *db.SQL, store store.BlobStore, stopper *stop.Group) {
+	const cleanupInterval = 10 * time.Second
+
+	for {
+		select {
+		case <-stopper.Ch():
+			log.Infoln("stopping self cleanup")
+			return
+		case <-time.After(cleanupInterval):
+			err := doClean(maxItems, db, store, stopper)
+			if err != nil {
+				log.Error(errors.FullTrace(err))
+			}
+		}
+	}
+}
+
+func doClean(maxItems int, db *db.SQL, store store.BlobStore, stopper *stop.Group) error {
+	blobsCount, err := db.Count()
+	if err != nil {
+		return err
+	}
+
+	if blobsCount >= maxItems {
+		itemsToDelete := blobsCount / 10
+		blobs, err := db.LeastRecentlyAccessedHashes(itemsToDelete)
+		if err != nil {
+			return err
+		}
+
+		for _, hash := range blobs {
+			select {
+			case <-stopper.Ch():
+				return nil
+			default:
+			}
+
+			err = store.Delete(hash)
+			if err != nil {
+				return err
+			}
+		}
+	}
+	return nil
+}
diff --git a/db/db.go b/db/db.go
index c3bea24..40b5c7d 100644
--- a/db/db.go
+++ b/db/db.go
@@ -30,11 +30,26 @@ type SdBlob struct {
 	StreamHash        string `json:"stream_hash"`
 }
 
+type trackAccess int
+
+const (
+	// Don't track accesses
+	TrackAccessNone trackAccess = iota
+	// Track accesses at the stream level
+	TrackAccessStreams
+	// Track accesses at the blob level
+	TrackAccessBlobs
+)
+
 // SQL implements the DB interface
 type SQL struct {
 	conn *sql.DB
 
-	TrackAccessTime bool
+	// Track the approx last time a blob or stream was accessed
+	TrackAccess trackAccess
+
+	// Instead of deleting a blob, mark it as not stored in the db
+	SoftDelete bool
 }
 
 func logQuery(query string, args ...interface{}) {
@@ -78,11 +93,19 @@ func (s *SQL) insertBlob(hash string, length int, isStored bool) (int64, error)
 		return 0, errors.Err("length must be positive")
 	}
 
-	args := []interface{}{hash, isStored, length}
-	blobID, err := s.exec(
-		"INSERT INTO blob_ (hash, is_stored, length) VALUES ("+qt.Qs(len(args))+") ON DUPLICATE KEY UPDATE is_stored = (is_stored or VALUES(is_stored))",
-		args...,
+	var (
+		q    string
+		args []interface{}
 	)
+	if s.TrackAccess == TrackAccessBlobs {
+		args = []interface{}{hash, isStored, length, time.Now()}
+		q = "INSERT INTO blob_ (hash, is_stored, length, last_accessed_at) VALUES (" + qt.Qs(len(args)) + ") ON DUPLICATE KEY UPDATE is_stored = (is_stored or VALUES(is_stored)), last_accessed_at = VALUES(last_accessed_at)"
+	} else {
+		args = []interface{}{hash, isStored, length}
+		q = "INSERT INTO blob_ (hash, is_stored, length) VALUES (" + qt.Qs(len(args)) + ") ON DUPLICATE KEY UPDATE is_stored = (is_stored or VALUES(is_stored))"
+	}
+
+	blobID, err := s.exec(q, args...)
 	if err != nil {
 		return 0, err
 	}
@@ -95,17 +118,33 @@ func (s *SQL) insertBlob(hash string, length int, isStored bool) (int64, error)
 		if blobID == 0 {
 			return 0, errors.Err("blob ID is 0 even after INSERTing and SELECTing")
 		}
+
+		if s.TrackAccess == TrackAccessBlobs {
+			err := s.touchBlobs([]uint64{uint64(blobID)})
+			if err != nil {
+				return 0, errors.Err(err)
+			}
+		}
 	}
 
 	return blobID, nil
 }
 
 func (s *SQL) insertStream(hash string, sdBlobID int64) (int64, error) {
-	args := []interface{}{hash, sdBlobID, time.Now()}
-	streamID, err := s.exec(
-		"INSERT IGNORE INTO stream (hash, sd_blob_id, last_accessed_at) VALUES ("+qt.Qs(len(args))+")",
-		args...,
+	var (
+		q    string
+		args []interface{}
 	)
+
+	if s.TrackAccess == TrackAccessStreams {
+		args = []interface{}{hash, sdBlobID, time.Now()}
+		q = "INSERT IGNORE INTO stream (hash, sd_blob_id, last_accessed_at) VALUES (" + qt.Qs(len(args)) + ")"
+	} else {
+		args = []interface{}{hash, sdBlobID}
+		q = "INSERT IGNORE INTO stream (hash, sd_blob_id) VALUES (" + qt.Qs(len(args)) + ")"
+	}
+
+	streamID, err := s.exec(q, args...)
 	if err != nil {
 		return 0, errors.Err(err)
 	}
@@ -119,8 +158,8 @@ func (s *SQL) insertStream(hash string, sdBlobID int64) (int64, error) {
 		return 0, errors.Err("stream ID is 0 even after INSERTing and SELECTing")
 	}
 
-	if s.TrackAccessTime {
-		err := s.touch([]uint64{uint64(streamID)})
+	if s.TrackAccess == TrackAccessStreams {
+		err := s.touchStreams([]uint64{uint64(streamID)})
 		if err != nil {
 			return 0, errors.Err(err)
 		}
@@ -140,12 +179,36 @@ func (s *SQL) HasBlob(hash string) (bool, error) {
 
 // HasBlobs checks if the database contains the set of blobs and returns a bool map.
 func (s *SQL) HasBlobs(hashes []string) (map[string]bool, error) {
-	exists, streamsNeedingTouch, err := s.hasBlobs(hashes)
-	s.touch(streamsNeedingTouch)
+	exists, idsNeedingTouch, err := s.hasBlobs(hashes)
+
+	if s.TrackAccess == TrackAccessBlobs {
+		s.touchBlobs(idsNeedingTouch)
+	} else if s.TrackAccess == TrackAccessStreams {
+		s.touchStreams(idsNeedingTouch)
+	}
+
 	return exists, err
 }
 
-func (s *SQL) touch(streamIDs []uint64) error {
+func (s *SQL) touchBlobs(blobIDs []uint64) error {
+	if len(blobIDs) == 0 {
+		return nil
+	}
+
+	query := "UPDATE blob_ SET last_accessed_at = ? WHERE id IN (" + qt.Qs(len(blobIDs)) + ")"
+	args := make([]interface{}, len(blobIDs)+1)
+	args[0] = time.Now()
+	for i := range blobIDs {
+		args[i+1] = blobIDs[i]
+	}
+
+	startTime := time.Now()
+	_, err := s.exec(query, args...)
+	log.Debugf("touched %d blobs and took %s", len(blobIDs), time.Since(startTime))
+	return errors.Err(err)
+}
+
+func (s *SQL) touchStreams(streamIDs []uint64) error {
 	if len(streamIDs) == 0 {
 		return nil
 	}
@@ -159,7 +222,7 @@ func (s *SQL) touchStreams(streamIDs []uint64) error {
 
 	startTime := time.Now()
 	_, err := s.exec(query, args...)
-	log.Debugf("stream access query touched %d streams and took %s", len(streamIDs), time.Since(startTime))
+	log.Debugf("touched %d streams and took %s", len(streamIDs), time.Since(startTime))
 	return errors.Err(err)
 }
 
@@ -170,7 +233,8 @@ func (s *SQL) hasBlobs(hashes []string) (map[string]bool, []uint64, error) {
 
 	var (
 		hash           string
-		streamID       uint64
+		blobID         uint64
+		streamID       null.Uint64
 		lastAccessedAt null.Time
 	)
 
@@ -189,17 +253,23 @@ func (s *SQL) hasBlobs(hashes []string) (map[string]bool, []uint64, error) {
 		log.Debugf("getting hashes[%d:%d] of %d", doneIndex, sliceEnd, len(hashes))
 		batch := hashes[doneIndex:sliceEnd]
 
-		// TODO: this query doesn't work for SD blobs, which are not in the stream_blob table
+		var lastAccessedAtSelect string
+		if s.TrackAccess == TrackAccessBlobs {
+			lastAccessedAtSelect = "b.last_accessed_at"
+		} else if s.TrackAccess == TrackAccessStreams {
+			lastAccessedAtSelect = "s.last_accessed_at"
+		} else {
+			lastAccessedAtSelect = "NULL"
+		}
 
-		query := `SELECT b.hash, s.id, s.last_accessed_at
+		query := `SELECT b.hash, b.id, s.id, ` + lastAccessedAtSelect + `
 FROM blob_ b
 LEFT JOIN stream_blob sb ON b.id = sb.blob_id
-INNER JOIN stream s on (sb.stream_id = s.id or s.sd_blob_id = b.id)
-WHERE b.is_stored = ? and b.hash IN (` + qt.Qs(len(batch)) + `)`
-		args := make([]interface{}, len(batch)+1)
-		args[0] = true
+LEFT JOIN stream s on (sb.stream_id = s.id or s.sd_blob_id = b.id)
+WHERE b.is_stored = 1 and b.hash IN (` + qt.Qs(len(batch)) + `)`
+		args := make([]interface{}, len(batch))
 		for i := range batch {
-			args[i+1] = batch[i]
+			args[i] = batch[i]
 		}
 
 		logQuery(query, args...)
@@ -214,13 +284,17 @@ WHERE b.is_stored = ? and b.hash IN (` + qt.Qs(len(batch)) + `)`
 			defer closeRows(rows)
 
 			for rows.Next() {
-				err := rows.Scan(&hash, &streamID, &lastAccessedAt)
+				err := rows.Scan(&hash, &blobID, &streamID, &lastAccessedAt)
 				if err != nil {
 					return errors.Err(err)
 				}
 				exists[hash] = true
-				if s.TrackAccessTime && (!lastAccessedAt.Valid || lastAccessedAt.Time.Before(touchDeadline)) {
-					needsTouch = append(needsTouch, streamID)
+				if !lastAccessedAt.Valid || lastAccessedAt.Time.Before(touchDeadline) {
+					if s.TrackAccess == TrackAccessBlobs {
+						needsTouch = append(needsTouch, blobID)
+					} else if s.TrackAccess == TrackAccessStreams && !streamID.IsZero() {
+						needsTouch = append(needsTouch, streamID.Uint64)
+					}
 				}
 			}
 
@@ -240,8 +314,14 @@ WHERE b.is_stored = ? and b.hash IN (` + qt.Qs(len(batch)) + `)`
 	return exists, needsTouch, nil
 }
 
-// Delete will remove the blob from the db
+// Delete will remove (or soft-delete) the blob from the db
+// NOTE: If SoftDelete is enabled, streams will never be deleted
 func (s *SQL) Delete(hash string) error {
+	if s.SoftDelete {
+		_, err := s.exec("UPDATE blob_ SET is_stored = 0 WHERE hash = ?", hash)
+		return errors.Err(err)
+	}
+
 	_, err := s.exec("DELETE FROM stream WHERE sd_blob_id = (SELECT id FROM blob_ WHERE hash = ?)", hash)
 	if err != nil {
 		return errors.Err(err)
 	}
@@ -251,6 +331,88 @@ func (s *SQL) Delete(hash string) error {
 	return errors.Err(err)
 }
 
+// LeastRecentlyAccessedHashes returns the hashes of up to maxBlobs stored blobs, ordered from least recently accessed
+func (s *SQL) LeastRecentlyAccessedHashes(maxBlobs int) ([]string, error) {
+	if s.conn == nil {
+		return nil, errors.Err("not connected")
+	}
+
+	if s.TrackAccess != TrackAccessBlobs {
+		return nil, errors.Err("blob access tracking is disabled")
+	}
+
+	query := "SELECT hash from blob_ where is_stored = 1 order by last_accessed_at limit ?"
+	logQuery(query, maxBlobs)
+
+	rows, err := s.conn.Query(query, maxBlobs)
+	if err != nil {
+		return nil, errors.Err(err)
+	}
+	defer closeRows(rows)
+
+	blobs := make([]string, 0, maxBlobs)
+	for rows.Next() {
+		var hash string
+		err := rows.Scan(&hash)
+		if err != nil {
+			return nil, errors.Err(err)
+		}
+		blobs = append(blobs, hash)
+	}
+
+	return blobs, nil
+}
+
+// AllHashes writes all hashes from the db into the channel.
+// It does not close the channel when it finishes.
+//func (s *SQL) AllHashes(ch chan<- string) error {
+//	if s.conn == nil {
+//		return errors.Err("not connected")
+//	}
+//
+//	query := "SELECT hash from blob_"
+//	if s.SoftDelete {
+//		query += " where is_stored = 1"
+//	}
+//	logQuery(query)
+//
+//	rows, err := s.conn.Query(query)
+//	if err != nil {
+//		return errors.Err(err)
+//	}
+//	defer closeRows(rows)
+//
+//	for rows.Next() {
+//		var hash string
+//		err := rows.Scan(&hash)
+//		if err != nil {
+//			return errors.Err(err)
+//		}
+//		ch <- hash
+//		// TODO: this needs testing
+//		// TODO: need a way to cancel this early (e.g. in case of shutdown)
+//	}
+//
+//	close(ch)
+//	return nil
+//}
+
+func (s *SQL) Count() (int, error) {
+	if s.conn == nil {
+		return 0, errors.Err("not connected")
+	}
+
+	query := "SELECT count(id) from blob_"
+	if s.SoftDelete {
+		query += " where is_stored = 1"
+	}
+	logQuery(query)
+
+	var count int
+	err := s.conn.QueryRow(query).Scan(&count)
+	return count, errors.Err(err)
+}
+
 // Block will mark a blob as blocked
 func (s *SQL) Block(hash string) error {
 	query := "INSERT IGNORE INTO blocked SET hash = ?"
@@ -528,8 +690,10 @@ CREATE TABLE blob_ (
   hash char(96) NOT NULL,
   is_stored TINYINT(1) NOT NULL DEFAULT 0,
   length bigint(20) unsigned DEFAULT NULL,
+  last_accessed_at TIMESTAMP NULL DEFAULT NULL,
   PRIMARY KEY (id),
-  UNIQUE KEY blob_hash_idx (hash)
+  UNIQUE KEY blob_hash_idx (hash),
+  KEY `blob_last_accessed_idx` (`last_accessed_at`)
 );
 
 CREATE TABLE stream (
@@ -560,3 +724,47 @@ CREATE TABLE blocked (
 );
 
 */
+
+//func (d *LiteDBBackedStore) selfClean() {
+//	d.stopper.Add(1)
+//	defer d.stopper.Done()
+//	lastCleanup := time.Now()
+//	const cleanupInterval = 10 * time.Second
+//	for {
+//		select {
+//		case <-d.stopper.Ch():
+//			log.Infoln("stopping self cleanup")
+//			return
+//		default:
+//			time.Sleep(1 * time.Second)
+//		}
+//		if time.Since(lastCleanup) < cleanupInterval {
+//			continue
+
+//		blobsCount, err := d.db.BlobsCount()
+//		if err != nil {
+//			log.Errorf(errors.FullTrace(err))
+//		}
+//		if blobsCount >= d.maxItems {
+//			itemsToDelete := blobsCount / 100 * 10
+//			blobs, err := d.db.GetLRUBlobs(itemsToDelete)
+//			if err != nil {
+//				log.Errorf(errors.FullTrace(err))
+//			}
+//			for _, hash := range blobs {
+//				select {
+//				case <-d.stopper.Ch():
+//					return
+//				default:
+//
+//				}
+//				err = d.Delete(hash)
+//				if err != nil {
+//					log.Errorf(errors.FullTrace(err))
+//				}
+//				metrics.CacheLRUEvictCount.With(metrics.CacheLabels(d.Name(), d.component)).Inc()
+//			}
+//		}
+//		lastCleanup = time.Now()
+//	}
+//}
diff --git a/go.mod b/go.mod
index d3384a1..4570f1d 100644
--- a/go.mod
+++ b/go.mod
@@ -12,8 +12,7 @@ require (
 	github.com/davecgh/go-spew v1.1.1
 	github.com/go-sql-driver/mysql v1.4.1
 	github.com/golang/protobuf v1.4.2
-	github.com/google/btree v1.0.0 // indirect
-	github.com/google/gops v0.3.7 // indirect
+	github.com/google/gops v0.3.7
 	github.com/gorilla/mux v1.7.4
 	github.com/hashicorp/go-msgpack v0.5.5 // indirect
 	github.com/hashicorp/golang-lru v0.5.4
@@ -33,7 +32,6 @@ require (
 	github.com/spf13/afero v1.4.1 // indirect
 	github.com/spf13/cast v1.3.0
 	github.com/spf13/cobra v0.0.3
-	github.com/spf13/pflag v1.0.3 // indirect
 	github.com/spf13/viper v1.7.1 // indirect
 	github.com/stretchr/testify v1.4.0
 	github.com/volatiletech/null v8.0.0+incompatible
diff --git a/lite_db/db.go b/lite_db/db.go
deleted file mode 100644
index 6be9837..0000000
--- a/lite_db/db.go
+++ /dev/null
@@ -1,348 +0,0 @@
-package lite_db
-
-import (
-	"database/sql"
-	"time"
-
-	"github.com/lbryio/lbry.go/v2/extras/errors"
-	qt "github.com/lbryio/lbry.go/v2/extras/query"
-
-	"github.com/go-sql-driver/mysql"
-	_ "github.com/go-sql-driver/mysql" // blank import for db driver ensures its imported even if its not used
-	log "github.com/sirupsen/logrus"
-	"github.com/volatiletech/null"
-)
-
-// SdBlob is a special blob that contains information on the rest of the blobs in the stream
-type SdBlob struct {
-	StreamName string `json:"stream_name"`
-	Blobs      []struct {
-		Length   int    `json:"length"`
-		BlobNum  int    `json:"blob_num"`
-		BlobHash string `json:"blob_hash,omitempty"`
-		IV       string `json:"iv"`
-	} `json:"blobs"`
-	StreamType        string `json:"stream_type"`
-	Key               string `json:"key"`
-	SuggestedFileName string `json:"suggested_file_name"`
-	StreamHash        string `json:"stream_hash"`
-}
-
-// SQL implements the DB interface
-type SQL struct {
-	conn *sql.DB
-
-	TrackAccessTime bool
-}
-
-func logQuery(query string, args ...interface{}) {
-	s, err := qt.InterpolateParams(query, args...)
-	if err != nil {
-		log.Errorln(err)
-	} else {
-		log.Debugln(s)
-	}
-}
-
-// Connect will create a connection to the database
-func (s *SQL) Connect(dsn string) error {
-	var err error
-	// interpolateParams is necessary. otherwise uploading a stream with thousands of blobs
-	// will hit MySQL's max_prepared_stmt_count limit because the prepared statements are all
-	// opened inside a transaction. closing them manually doesn't seem to help
-	dsn += "?parseTime=1&collation=utf8mb4_unicode_ci&interpolateParams=1"
-	s.conn, err = sql.Open("mysql", dsn)
-	if err != nil {
-		return errors.Err(err)
-	}
-
-	s.conn.SetMaxIdleConns(12)
-
-	return errors.Err(s.conn.Ping())
-}
-
-// AddBlob adds a blob to the database.
-func (s *SQL) AddBlob(hash string, length int) error {
-	if s.conn == nil {
-		return errors.Err("not connected")
-	}
-
-	_, err := s.insertBlob(hash, length)
-	return err
-}
-
-func (s *SQL) insertBlob(hash string, length int) (int64, error) {
-	if length <= 0 {
-		return 0, errors.Err("length must be positive")
-	}
-	const isStored = true
-	now := time.Now()
-	args := []interface{}{hash, isStored, length, now}
-	blobID, err := s.exec(
-		"INSERT INTO blob_ (hash, is_stored, length, last_accessed_at) VALUES ("+qt.Qs(len(args))+") ON DUPLICATE KEY UPDATE is_stored = (is_stored or VALUES(is_stored)), last_accessed_at=VALUES(last_accessed_at)",
-		args...,
-	)
-	if err != nil {
-		return 0, err
-	}
-
-	if blobID == 0 {
-		err = s.conn.QueryRow("SELECT id FROM blob_ WHERE hash = ?", hash).Scan(&blobID)
-		if err != nil {
-			return 0, errors.Err(err)
-		}
-		if blobID == 0 {
-			return 0, errors.Err("blob ID is 0 even after INSERTing and SELECTing")
-		}
-	}
-
-	return blobID, nil
-}
-
-// HasBlob checks if the database contains the blob information.
-func (s *SQL) HasBlob(hash string) (bool, error) {
-	exists, err := s.HasBlobs([]string{hash})
-	if err != nil {
-		return false, err
-	}
-	return exists[hash], nil
-}
-
-// HasBlobs checks if the database contains the set of blobs and returns a bool map.
-func (s *SQL) HasBlobs(hashes []string) (map[string]bool, error) {
-	exists, streamsNeedingTouch, err := s.hasBlobs(hashes)
-	s.touch(streamsNeedingTouch)
-	return exists, err
-}
-
-func (s *SQL) touch(blobIDs []uint64) error {
-	if len(blobIDs) == 0 {
-		return nil
-	}
-
-	query := "UPDATE blob_ SET last_accessed_at = ? WHERE id IN (" + qt.Qs(len(blobIDs)) + ")"
-	args := make([]interface{}, len(blobIDs)+1)
-	args[0] = time.Now()
-	for i := range blobIDs {
-		args[i+1] = blobIDs[i]
-	}
-
-	startTime := time.Now()
-	_, err := s.exec(query, args...)
-	log.Debugf("blobs access query touched %d blobs and took %s", len(blobIDs), time.Since(startTime))
-	return errors.Err(err)
-}
-
-func (s *SQL) hasBlobs(hashes []string) (map[string]bool, []uint64, error) {
-	if s.conn == nil {
-		return nil, nil, errors.Err("not connected")
-	}
-
-	var (
-		hash           string
-		blobID         uint64
-		lastAccessedAt null.Time
-	)
-
-	var needsTouch []uint64
-	exists := make(map[string]bool)
-
-	touchDeadline := time.Now().AddDate(0, 0, -1) // touch blob if last accessed before this time
-	maxBatchSize := 10000
-	doneIndex := 0
-
-	for len(hashes) > doneIndex {
-		sliceEnd := doneIndex + maxBatchSize
-		if sliceEnd > len(hashes) {
-			sliceEnd = len(hashes)
-		}
-		log.Debugf("getting hashes[%d:%d] of %d", doneIndex, sliceEnd, len(hashes))
-		batch := hashes[doneIndex:sliceEnd]
-
-		// TODO: this query doesn't work for SD blobs, which are not in the stream_blob table
-
-		query := `SELECT hash, id, last_accessed_at
-FROM blob_
-WHERE is_stored = ? and hash IN (` + qt.Qs(len(batch)) + `)`
-		args := make([]interface{}, len(batch)+1)
-		args[0] = true
-		for i := range batch {
-			args[i+1] = batch[i]
-		}
-
-		logQuery(query, args...)
-
-		err := func() error {
-			startTime := time.Now()
-			rows, err := s.conn.Query(query, args...)
-			log.Debugf("hashes query took %s", time.Since(startTime))
-			if err != nil {
-				return errors.Err(err)
-			}
-			defer closeRows(rows)
-
-			for rows.Next() {
-				err := rows.Scan(&hash, &blobID, &lastAccessedAt)
-				if err != nil {
-					return errors.Err(err)
-				}
-				exists[hash] = true
-				if s.TrackAccessTime && (!lastAccessedAt.Valid || lastAccessedAt.Time.Before(touchDeadline)) {
-					needsTouch = append(needsTouch, blobID)
-				}
-			}
-
-			err = rows.Err()
-			if err != nil {
-				return errors.Err(err)
-			}
-
-			doneIndex += len(batch)
-			return nil
-		}()
-		if err != nil {
-			return nil, nil, err
-		}
-	}
-
-	return exists, needsTouch, nil
-}
-
-// Delete will remove the blob from the db
-func (s *SQL) Delete(hash string) error {
-	_, err := s.exec("UPDATE blob_ set is_stored = ? WHERE hash = ?", 0, hash)
-	return errors.Err(err)
-}
-
-// AddSDBlob insert the SD blob and all the content blobs. The content blobs are marked as "not stored",
-// but they are tracked so reflector knows what it is missing.
-func (s *SQL) AddSDBlob(sdHash string, sdBlobLength int) error {
-	if s.conn == nil {
-		return errors.Err("not connected")
-	}
-
-	_, err := s.insertBlob(sdHash, sdBlobLength)
-	return err
-}
-
-// GetHashRange gets the smallest and biggest hashes in the db
-func (s *SQL) GetLRUBlobs(maxBlobs int) ([]string, error) {
-	if s.conn == nil {
-		return nil, errors.Err("not connected")
-	}
-
-	query := "SELECT hash from blob_ where is_stored = ? order by last_accessed_at limit ?"
-	const isStored = true
-	logQuery(query, isStored, maxBlobs)
-	rows, err := s.conn.Query(query, isStored, maxBlobs)
-	if err != nil {
-		return nil, errors.Err(err)
-	}
-	defer closeRows(rows)
-	blobs := make([]string, 0, maxBlobs)
-	for rows.Next() {
-		var hash string
-		err := rows.Scan(&hash)
-		if err != nil {
-			return nil, errors.Err(err)
-		}
-		blobs = append(blobs, hash)
-	}
-	return blobs, nil
-}
-
-func (s *SQL) AllBlobs() ([]string, error) {
-	if s.conn == nil {
-		return nil, errors.Err("not connected")
-	}
-
-	query := "SELECT hash from blob_ where is_stored = ?" //TODO: maybe sorting them makes more sense?
-	const isStored = true
-	logQuery(query, isStored)
-	rows, err := s.conn.Query(query, isStored)
-	if err != nil {
-		return nil, errors.Err(err)
-	}
-	defer closeRows(rows)
-	totalBlobs, err := s.BlobsCount()
-	if err != nil {
-		return nil, err
-	}
-	blobs := make([]string, 0, totalBlobs)
-	for rows.Next() {
-		var hash string
-		err := rows.Scan(&hash)
-		if err != nil {
-			return nil, errors.Err(err)
-		}
-		blobs = append(blobs, hash)
-	}
-	return blobs, nil
-}
-
-func (s *SQL) BlobsCount() (int, error) {
-	if s.conn == nil {
-		return 0, errors.Err("not connected")
-	}
-
-	query := "SELECT count(id) from blob_ where is_stored = ?" //TODO: maybe sorting them makes more sense?
-	const isStored = true
-	logQuery(query, isStored)
-	var count int
-	err := s.conn.QueryRow(query, isStored).Scan(&count)
-	return count, errors.Err(err)
-}
-
-func closeRows(rows *sql.Rows) {
-	if rows != nil {
-		err := rows.Close()
-		if err != nil {
-			log.Error("error closing rows: ", err)
-		}
-	}
-}
-
-func (s *SQL) exec(query string, args ...interface{}) (int64, error) {
-	logQuery(query, args...)
-	attempt, maxAttempts := 0, 3
-Retry:
-	attempt++
-	result, err := s.conn.Exec(query, args...)
-	if isLockTimeoutError(err) {
-		if attempt <= maxAttempts {
-			//Error 1205: Lock wait timeout exceeded; try restarting transaction
-			goto Retry
-		}
-		err = errors.Prefix("Lock timeout for query "+query, err)
-	}
-
-	if err != nil {
-		return 0, errors.Err(err)
-	}
-
-	lastID, err := result.LastInsertId()
-	return lastID, errors.Err(err)
-}
-
-func isLockTimeoutError(err error) bool {
-	e, ok := err.(*mysql.MySQLError)
-	return ok && e != nil && e.Number == 1205
-}
-
-/* SQL schema
-
-in prod make sure you use latin1 or utf8 charset, NOT utf8mb4. that's a waste of space.
-
-CREATE TABLE `blob_` (
-  `id` bigint unsigned NOT NULL AUTO_INCREMENT,
-  `hash` char(96) NOT NULL,
-  `is_stored` tinyint(1) NOT NULL DEFAULT '0',
-  `length` bigint unsigned DEFAULT NULL,
-  `last_accessed_at` datetime DEFAULT CURRENT_TIMESTAMP,
-  PRIMARY KEY (`id`),
-  UNIQUE KEY `id` (`id`),
-  UNIQUE KEY `blob_hash_idx` (`hash`),
-  KEY `blob_last_accessed_idx` (`last_accessed_at`)
-) ENGINE=InnoDB DEFAULT CHARSET=latin1
-
-*/
diff --git a/store/caching_test.go b/store/caching_test.go
index 1168a92..0636583 100644
--- a/store/caching_test.go
+++ b/store/caching_test.go
@@ -168,3 +168,7 @@ func (s *SlowBlobStore) Delete(hash string) error {
 	time.Sleep(s.delay)
 	return s.mem.Delete(hash)
 }
+
+func (s *SlowBlobStore) Shutdown() {
+	return
+}
diff --git a/store/dbbacked.go b/store/dbbacked.go
index 0ebaaed..1c5088a 100644
--- a/store/dbbacked.go
+++ b/store/dbbacked.go
@@ -4,10 +4,9 @@ import (
 	"encoding/json"
 	"sync"
 
-	"github.com/lbryio/reflector.go/db"
-
 	"github.com/lbryio/lbry.go/v2/extras/errors"
 	"github.com/lbryio/lbry.go/v2/stream"
+	"github.com/lbryio/reflector.go/db"
 
 	log "github.com/sirupsen/logrus"
 )
diff --git a/store/litedbbacked.go b/store/litedbbacked.go
index 93790d6..2e2d726 100644
--- a/store/litedbbacked.go
+++ b/store/litedbbacked.go
@@ -2,12 +2,9 @@ package store
 
 import (
 	"encoding/json"
-	"time"
 
 	"github.com/lbryio/lbry.go/v2/extras/stop"
 	"github.com/lbryio/reflector.go/db"
-	"github.com/lbryio/reflector.go/internal/metrics"
-	"github.com/lbryio/reflector.go/lite_db"
 
 	"github.com/lbryio/lbry.go/v2/extras/errors"
 	"github.com/lbryio/lbry.go/v2/stream"
@@ -18,18 +15,15 @@ import (
 // DBBackedStore is a store that's backed by a DB. The DB contains data about what's in the store.
 type LiteDBBackedStore struct {
 	blobs     BlobStore
-	db        *lite_db.SQL
+	db        *db.SQL
 	maxItems  int
 	stopper   *stop.Stopper
 	component string
 }
 
 // NewDBBackedStore returns an initialized store pointer.
-func NewLiteDBBackedStore(component string, blobs BlobStore, db *lite_db.SQL, maxItems int) *LiteDBBackedStore {
+func NewLiteDBBackedStore(component string, blobs BlobStore, db *db.SQL, maxItems int) *LiteDBBackedStore {
 	instance := &LiteDBBackedStore{blobs: blobs, db: db, maxItems: maxItems, stopper: stop.New(), component: component}
-	go func() {
-		instance.selfClean()
-	}()
 	return instance
 }
 
@@ -71,7 +65,7 @@ func (d *LiteDBBackedStore) Put(hash string, blob stream.Blob) error {
 		return err
 	}
 
-	return d.db.AddBlob(hash, len(blob))
+	return d.db.AddBlob(hash, len(blob), true)
 }
 
 // PutSD stores the SDBlob in the S3 store. It will return an error if the sd blob is missing the stream hash or if
@@ -91,7 +85,7 @@ func (d *LiteDBBackedStore) PutSD(hash string, blob stream.Blob) error {
 		return err
 	}
 
-	return d.db.AddSDBlob(hash, len(blob))
+	return d.db.AddSDBlob(hash, len(blob), blobContents)
 }
 
 func (d *LiteDBBackedStore) Delete(hash string) error {
@@ -105,53 +99,54 @@ func (d *LiteDBBackedStore) Delete(hash string) error {
 
 // list returns the hashes of blobs that already exist in the database
 func (d *LiteDBBackedStore) list() ([]string, error) {
-	blobs, err := d.db.AllBlobs()
-	return blobs, err
+	//blobs, err := d.db.AllBlobs()
+	//return blobs, err
+	return nil, nil
 }
 
-func (d *LiteDBBackedStore) selfClean() {
-	d.stopper.Add(1)
-	defer d.stopper.Done()
-	lastCleanup := time.Now()
-	const cleanupInterval = 10 * time.Second
-	for {
-		select {
-		case <-d.stopper.Ch():
-			log.Infoln("stopping self cleanup")
-			return
-		default:
-			time.Sleep(1 * time.Second)
-		}
-		if time.Since(lastCleanup) < cleanupInterval {
-			continue
-		}
-		blobsCount, err := d.db.BlobsCount()
-		if err != nil {
-			log.Errorf(errors.FullTrace(err))
-		}
-		if blobsCount >= d.maxItems {
-			itemsToDelete := blobsCount / 100 * 10
-			blobs, err := d.db.GetLRUBlobs(itemsToDelete)
-			if err != nil {
-				log.Errorf(errors.FullTrace(err))
-			}
-			for _, hash := range blobs {
-				select {
-				case <-d.stopper.Ch():
-					return
-				default:
-
-				}
-				err = d.Delete(hash)
-				if err != nil {
-					log.Errorf(errors.FullTrace(err))
-				}
-				metrics.CacheLRUEvictCount.With(metrics.CacheLabels(d.Name(), d.component)).Inc()
-			}
-		}
-		lastCleanup = time.Now()
-	}
-}
+//func (d *LiteDBBackedStore) selfClean() {
+//	d.stopper.Add(1)
+//	defer d.stopper.Done()
+//	lastCleanup := time.Now()
+//	const cleanupInterval = 10 * time.Second
+//	for {
+//		select {
+//		case <-d.stopper.Ch():
+//			log.Infoln("stopping self cleanup")
+//			return
+//		default:
+//			time.Sleep(1 * time.Second)
+//		}
+//		if time.Since(lastCleanup) < cleanupInterval {
+//			continue
+//		}
+//		blobsCount, err := d.db.BlobsCount()
+//		if err != nil {
+//			log.Errorf(errors.FullTrace(err))
+//		}
+//		if blobsCount >= d.maxItems {
+//			itemsToDelete := blobsCount / 100 * 10
+//			blobs, err := d.db.GetLRUBlobs(itemsToDelete)
+//			if err != nil {
+//				log.Errorf(errors.FullTrace(err))
+//			}
+//			for _, hash := range blobs {
+//				select {
+//				case <-d.stopper.Ch():
+//					return
+//				default:
+//
+//				}
+//				err = d.Delete(hash)
+//				if err != nil {
+//					log.Errorf(errors.FullTrace(err))
+//				}
+//				metrics.CacheLRUEvictCount.With(metrics.CacheLabels(d.Name(), d.component)).Inc()
+//			}
+//		}
+//		lastCleanup = time.Now()
+//	}
+//}
 
 // Shutdown shuts down the store gracefully
 func (d *LiteDBBackedStore) Shutdown() {