diff --git a/cmd/reflector.go b/cmd/reflector.go
index 3bb6b4e..47a27ff 100644
--- a/cmd/reflector.go
+++ b/cmd/reflector.go
@@ -8,9 +8,10 @@ import (
 	"syscall"
 	"time"
 
+	"github.com/lbryio/lbry.go/v2/extras/errors"
+	"github.com/lbryio/lbry.go/v2/extras/stop"
 	"github.com/lbryio/reflector.go/db"
 	"github.com/lbryio/reflector.go/internal/metrics"
-	"github.com/lbryio/reflector.go/lite_db"
 	"github.com/lbryio/reflector.go/meta"
 	"github.com/lbryio/reflector.go/peer"
 	"github.com/lbryio/reflector.go/peer/http3"
@@ -68,10 +69,11 @@ func init() {
 func reflectorCmd(cmd *cobra.Command, args []string) {
 	log.Printf("reflector %s", meta.VersionString())
 
+	cleanerStopper := stop.New()
 	// the blocklist logic requires the db backed store to be the outer-most store
 	underlyingStore := setupStore()
-	outerStore := wrapWithCache(underlyingStore)
+	outerStore := wrapWithCache(underlyingStore, cleanerStopper)
 
 	if !disableUploads {
 		reflectorServer := reflector.NewServer(underlyingStore)
@@ -102,13 +104,14 @@ func reflectorCmd(cmd *cobra.Command, args []string) {
 	metricsServer := metrics.NewServer(":"+strconv.Itoa(metricsPort), "/metrics")
 	metricsServer.Start()
 	defer metricsServer.Shutdown()
-	defer underlyingStore.Shutdown()
 	defer outerStore.Shutdown()
+	defer underlyingStore.Shutdown()
 
 	interruptChan := make(chan os.Signal, 1)
 	signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM)
 	<-interruptChan
 	// deferred shutdowns happen now
+	cleanerStopper.StopAndWait()
 }
 
 func setupStore() store.BlobStore {
@@ -149,20 +152,20 @@ func setupStore() store.BlobStore {
 	}
 
 	if useDB {
-		db := new(db.SQL)
-		db.TrackAccessTime = true
-		err := db.Connect(globalConfig.DBConn)
+		dbInst := new(db.SQL)
+		dbInst.TrackAccess = db.TrackAccessStreams
+		err := dbInst.Connect(globalConfig.DBConn)
 		if err != nil {
 			log.Fatal(err)
 		}
-		s = store.NewDBBackedStore(s, db, false)
+		s = store.NewDBBackedStore(s, dbInst, false)
 	}
 
 	return s
 }
 
-func wrapWithCache(s store.BlobStore) store.BlobStore {
+func wrapWithCache(s store.BlobStore, cleanerStopper *stop.Group) store.BlobStore {
 	wrapped := s
 
 	diskCacheMaxSize, diskCachePath := diskCacheParams(reflectorCmdDiskCache)
@@ -174,8 +177,9 @@ func wrapWithCache(s store.BlobStore) store.BlobStore {
 		if err != nil {
 			log.Fatal(err)
 		}
-		localDb := new(lite_db.SQL)
-		localDb.TrackAccessTime = true
+
+		localDb := new(db.SQL)
+		localDb.TrackAccess = db.TrackAccessBlobs
 		err = localDb.Connect("reflector:reflector@tcp(localhost:3306)/reflector")
 		if err != nil {
 			log.Fatal(err)
@@ -183,8 +187,10 @@ func wrapWithCache(s store.BlobStore) store.BlobStore {
 		wrapped = store.NewCachingStore(
 			"reflector",
 			wrapped,
-			store.NewLiteDBBackedStore("hdd", store.NewDiskStore(diskCachePath, 2), localDb, int(realCacheSize)),
+			store.NewDBBackedStore(store.NewDiskStore(diskCachePath, 2), localDb, false),
 		)
+
+		go cleanOldestBlobs(int(realCacheSize), localDb, wrapped, cleanerStopper)
 	}
 
 	diskCacheMaxSize, diskCachePath = diskCacheParams(bufferReflectorCmdDiskCache)
@@ -238,3 +244,50 @@ func diskCacheParams(diskParams string) (int, string) {
 	}
 	return int(maxSize), path
 }
+
+func cleanOldestBlobs(maxItems int, db *db.SQL, store store.BlobStore, stopper *stop.Group) {
+	const cleanupInterval = 10 * time.Second
+
+	for {
+		select {
+		case <-stopper.Ch():
+			log.Infoln("stopping self cleanup")
+			return
+		case <-time.After(cleanupInterval):
+			err := doClean(maxItems, db, store, stopper)
+			if err != nil {
+				log.Error(errors.FullTrace(err))
+			}
+		}
+	}
+}
+
+func doClean(maxItems int, db *db.SQL, store store.BlobStore, stopper *stop.Group) error {
+	blobsCount, err := db.Count()
+	if err != nil {
+		return err
+	}
+
+	if blobsCount >= maxItems {
+		itemsToDelete := blobsCount / 10
+		blobs, err := db.LeastRecentlyAccessedHashes(itemsToDelete)
+		if err != nil {
+			return err
+		}
+
+		for _, hash := range blobs {
+			select {
+			case <-stopper.Ch():
+				return nil
+			default:
+			}
+
+			err = store.Delete(hash)
+			if err != nil {
+				return err
+			}
+		}
+	}
+
+	return nil
+}
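cleanOldestBlobs follows the usual ticker-plus-quit-channel shape, with lbry.go's stop.Group supplying the quit channel and StopAndWait blocking shutdown until the loop exits. A stdlib-only sketch of the same pattern (a plain chan struct{} stands in for stop.Group; the stubbed doClean and the other names here are illustrative, not from this patch):

```go
package main

import (
	"fmt"
	"time"
)

// cleanupLoop mirrors cleanOldestBlobs: run doClean on an interval
// until the quit channel closes.
func cleanupLoop(quit <-chan struct{}, interval time.Duration, doClean func() error) {
	for {
		select {
		case <-quit: // stands in for stopper.Ch()
			fmt.Println("stopping self cleanup")
			return
		case <-time.After(interval):
			if err := doClean(); err != nil {
				fmt.Println("cleanup error:", err)
			}
		}
	}
}

func main() {
	quit := make(chan struct{})
	go cleanupLoop(quit, 100*time.Millisecond, func() error {
		fmt.Println("cleanup pass")
		return nil
	})
	time.Sleep(350 * time.Millisecond)
	close(quit) // stands in for cleanerStopper.StopAndWait()
	time.Sleep(50 * time.Millisecond)
}
```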
diff --git a/db/db.go b/db/db.go
index c3bea24..3ac2dab 100644
--- a/db/db.go
+++ b/db/db.go
@@ -30,11 +30,26 @@ type SdBlob struct {
 	StreamHash string `json:"stream_hash"`
 }
 
+type trackAccess int
+
+const (
+	// Don't track accesses
+	TrackAccessNone trackAccess = iota
+	// Track accesses at the stream level
+	TrackAccessStreams
+	// Track accesses at the blob level
+	TrackAccessBlobs
+)
+
 // SQL implements the DB interface
 type SQL struct {
 	conn *sql.DB
 
-	TrackAccessTime bool
+	// Track the approx last time a blob or stream was accessed
+	TrackAccess trackAccess
+
+	// Instead of deleting a blob, mark it as not stored in the db
+	SoftDelete bool
 }
 
 func logQuery(query string, args ...interface{}) {
@@ -78,11 +93,19 @@ func (s *SQL) insertBlob(hash string, length int, isStored bool) (int64, error)
 		return 0, errors.Err("length must be positive")
 	}
 
-	args := []interface{}{hash, isStored, length}
-	blobID, err := s.exec(
-		"INSERT INTO blob_ (hash, is_stored, length) VALUES ("+qt.Qs(len(args))+") ON DUPLICATE KEY UPDATE is_stored = (is_stored or VALUES(is_stored))",
-		args...,
+	var (
+		q    string
+		args []interface{}
 	)
+	if s.TrackAccess == TrackAccessBlobs {
+		args = []interface{}{hash, isStored, length, time.Now()}
+		q = "INSERT INTO blob_ (hash, is_stored, length, last_accessed_at) VALUES (" + qt.Qs(len(args)) + ") ON DUPLICATE KEY UPDATE is_stored = (is_stored or VALUES(is_stored)), last_accessed_at = VALUES(last_accessed_at)"
+	} else {
+		args = []interface{}{hash, isStored, length}
+		q = "INSERT INTO blob_ (hash, is_stored, length) VALUES (" + qt.Qs(len(args)) + ") ON DUPLICATE KEY UPDATE is_stored = (is_stored or VALUES(is_stored))"
+	}
+
+	blobID, err := s.exec(q, args...)
 	if err != nil {
 		return 0, err
 	}
@@ -95,17 +118,33 @@ func (s *SQL) insertBlob(hash string, length int, isStored bool) (int64, error)
 		if blobID == 0 {
 			return 0, errors.Err("blob ID is 0 even after INSERTing and SELECTing")
 		}
+
+		if s.TrackAccess == TrackAccessBlobs {
+			err := s.touchBlobs([]uint64{uint64(blobID)})
+			if err != nil {
+				return 0, errors.Err(err)
+			}
+		}
 	}
 
 	return blobID, nil
 }
 
 func (s *SQL) insertStream(hash string, sdBlobID int64) (int64, error) {
-	args := []interface{}{hash, sdBlobID, time.Now()}
-	streamID, err := s.exec(
-		"INSERT IGNORE INTO stream (hash, sd_blob_id, last_accessed_at) VALUES ("+qt.Qs(len(args))+")",
-		args...,
+	var (
+		q    string
+		args []interface{}
 	)
+
+	if s.TrackAccess == TrackAccessStreams {
+		args = []interface{}{hash, sdBlobID, time.Now()}
+		q = "INSERT IGNORE INTO stream (hash, sd_blob_id, last_accessed_at) VALUES (" + qt.Qs(len(args)) + ")"
+	} else {
+		args = []interface{}{hash, sdBlobID}
+		q = "INSERT IGNORE INTO stream (hash, sd_blob_id) VALUES (" + qt.Qs(len(args)) + ")"
+	}
+
+	streamID, err := s.exec(q, args...)
 	if err != nil {
 		return 0, errors.Err(err)
 	}
@@ -119,8 +158,8 @@ func (s *SQL) insertStream(hash string, sdBlobID int64) (int64, error) {
 		return 0, errors.Err("stream ID is 0 even after INSERTing and SELECTing")
 	}
 
-	if s.TrackAccessTime {
-		err := s.touch([]uint64{uint64(streamID)})
+	if s.TrackAccess == TrackAccessStreams {
+		err := s.touchStreams([]uint64{uint64(streamID)})
 		if err != nil {
 			return 0, errors.Err(err)
 		}
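insertBlob and insertStream now pick their statement at runtime: when access tracking covers the row in question, the INSERT carries a last_accessed_at value and the ON DUPLICATE KEY clause refreshes it, so a re-upload counts as an access. A self-contained sketch of the blob variant (qs is a local stand-in for the qt.Qs placeholder helper; the hash and values in main are made up):

```go
package main

import (
	"fmt"
	"strings"
	"time"
)

type trackAccess int

const (
	TrackAccessNone trackAccess = iota
	TrackAccessStreams
	TrackAccessBlobs
)

// qs returns n comma-separated SQL placeholders, e.g. "?,?,?".
func qs(n int) string {
	return strings.TrimSuffix(strings.Repeat("?,", n), ",")
}

// blobUpsert builds the query and args the way insertBlob does:
// the last_accessed_at column only appears in blob-tracking mode.
func blobUpsert(mode trackAccess, hash string, length int, isStored bool) (string, []interface{}) {
	if mode == TrackAccessBlobs {
		args := []interface{}{hash, isStored, length, time.Now()}
		return "INSERT INTO blob_ (hash, is_stored, length, last_accessed_at) VALUES (" + qs(len(args)) +
			") ON DUPLICATE KEY UPDATE is_stored = (is_stored or VALUES(is_stored)), last_accessed_at = VALUES(last_accessed_at)", args
	}
	args := []interface{}{hash, isStored, length}
	return "INSERT INTO blob_ (hash, is_stored, length) VALUES (" + qs(len(args)) +
		") ON DUPLICATE KEY UPDATE is_stored = (is_stored or VALUES(is_stored))", args
}

func main() {
	q, args := blobUpsert(TrackAccessBlobs, "abc123", 42, true)
	fmt.Println(q)
	fmt.Println("args:", len(args))
}
```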
@@ -140,12 +179,36 @@ func (s *SQL) HasBlob(hash string) (bool, error) {
 
 // HasBlobs checks if the database contains the set of blobs and returns a bool map.
 func (s *SQL) HasBlobs(hashes []string) (map[string]bool, error) {
-	exists, streamsNeedingTouch, err := s.hasBlobs(hashes)
-	s.touch(streamsNeedingTouch)
+	exists, idsNeedingTouch, err := s.hasBlobs(hashes)
+
+	if s.TrackAccess == TrackAccessBlobs {
+		s.touchBlobs(idsNeedingTouch)
+	} else if s.TrackAccess == TrackAccessStreams {
+		s.touchStreams(idsNeedingTouch)
+	}
+
 	return exists, err
 }
 
-func (s *SQL) touch(streamIDs []uint64) error {
+func (s *SQL) touchBlobs(blobIDs []uint64) error {
+	if len(blobIDs) == 0 {
+		return nil
+	}
+
+	query := "UPDATE blob_ SET last_accessed_at = ? WHERE id IN (" + qt.Qs(len(blobIDs)) + ")"
+	args := make([]interface{}, len(blobIDs)+1)
+	args[0] = time.Now()
+	for i := range blobIDs {
+		args[i+1] = blobIDs[i]
+	}
+
+	startTime := time.Now()
+	_, err := s.exec(query, args...)
+	log.Debugf("touched %d blobs and took %s", len(blobIDs), time.Since(startTime))
+	return errors.Err(err)
+}
+
+func (s *SQL) touchStreams(streamIDs []uint64) error {
 	if len(streamIDs) == 0 {
 		return nil
 	}
@@ -159,7 +222,7 @@ func (s *SQL) touch(streamIDs []uint64) error {
 
 	startTime := time.Now()
 	_, err := s.exec(query, args...)
-	log.Debugf("stream access query touched %d streams and took %s", len(streamIDs), time.Since(startTime))
+	log.Debugf("touched %d streams and took %s", len(streamIDs), time.Since(startTime))
 	return errors.Err(err)
 }
 
@@ -169,9 +232,10 @@ func (s *SQL) hasBlobs(hashes []string) (map[string]bool, []uint64, error) {
 	}
 
 	var (
-		hash           string
-		streamID       uint64
-		lastAccessedAt null.Time
+		hash           string
+		blobID         uint64
+		streamID       null.Uint64
+		lastAccessedAt null.Time
 	)
 
 	var needsTouch []uint64
@@ -189,17 +252,23 @@
 		log.Debugf("getting hashes[%d:%d] of %d", doneIndex, sliceEnd, len(hashes))
 		batch := hashes[doneIndex:sliceEnd]
 
-		// TODO: this query doesn't work for SD blobs, which are not in the stream_blob table
+		var lastAccessedAtSelect string
+		if s.TrackAccess == TrackAccessBlobs {
+			lastAccessedAtSelect = "b.last_accessed_at"
+		} else if s.TrackAccess == TrackAccessStreams {
+			lastAccessedAtSelect = "s.last_accessed_at"
+		} else {
+			lastAccessedAtSelect = "NULL"
+		}
 
-		query := `SELECT b.hash, s.id, s.last_accessed_at
+		query := `SELECT b.hash, b.id, s.id, ` + lastAccessedAtSelect + `
 FROM blob_ b
 LEFT JOIN stream_blob sb ON b.id = sb.blob_id
-INNER JOIN stream s on (sb.stream_id = s.id or s.sd_blob_id = b.id)
-WHERE b.is_stored = ? and b.hash IN (` + qt.Qs(len(batch)) + `)`
-		args := make([]interface{}, len(batch)+1)
-		args[0] = true
+LEFT JOIN stream s on (sb.stream_id = s.id or s.sd_blob_id = b.id)
+WHERE b.is_stored = 1 and b.hash IN (` + qt.Qs(len(batch)) + `)`
+		args := make([]interface{}, len(batch))
 		for i := range batch {
-			args[i+1] = batch[i]
+			args[i] = batch[i]
 		}
 
 		logQuery(query, args...)
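One consequence of switching the stream join from INNER to LEFT is that s.id can come back NULL for a blob with no stream row, so the scan target for streamID has to be nullable; database/sql refuses to scan NULL into a plain uint64. A small demonstration using the stdlib's sql.NullInt64 in place of volatiletech/null's null.Uint64 (the values fed to Scan are simulated, not read from a real row):

```go
package main

import (
	"database/sql"
	"fmt"
)

func main() {
	var streamID sql.NullInt64

	// Simulate the two cases a LEFT JOIN can produce: a real id and NULL.
	for _, v := range []interface{}{int64(42), nil} {
		if err := streamID.Scan(v); err != nil {
			fmt.Println("scan error:", err)
			continue
		}
		if streamID.Valid {
			fmt.Println("touch stream", streamID.Int64)
		} else {
			fmt.Println("blob has no stream; skip touch")
		}
	}
}
```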
@@ -214,13 +283,17 @@ WHERE b.is_stored = ? and b.hash IN (` + qt.Qs(len(batch)) + `)`
 		defer closeRows(rows)
 
 		for rows.Next() {
-			err := rows.Scan(&hash, &streamID, &lastAccessedAt)
+			err := rows.Scan(&hash, &blobID, &streamID, &lastAccessedAt)
 			if err != nil {
 				return errors.Err(err)
 			}
 
 			exists[hash] = true
-			if s.TrackAccessTime && (!lastAccessedAt.Valid || lastAccessedAt.Time.Before(touchDeadline)) {
-				needsTouch = append(needsTouch, streamID)
+			if !lastAccessedAt.Valid || lastAccessedAt.Time.Before(touchDeadline) {
+				if s.TrackAccess == TrackAccessBlobs {
+					needsTouch = append(needsTouch, blobID)
+				} else if s.TrackAccess == TrackAccessStreams && streamID.Valid {
+					needsTouch = append(needsTouch, streamID.Uint64)
+				}
 			}
 		}
 
@@ -240,8 +313,14 @@ WHERE b.is_stored = ? and b.hash IN (` + qt.Qs(len(batch)) + `)`
 	return exists, needsTouch, nil
 }
 
-// Delete will remove the blob from the db
+// Delete will remove (or soft-delete) the blob from the db
+// NOTE: If SoftDelete is enabled, streams will never be deleted
 func (s *SQL) Delete(hash string) error {
+	if s.SoftDelete {
+		_, err := s.exec("UPDATE blob_ SET is_stored = 0 WHERE hash = ?", hash)
+		return errors.Err(err)
+	}
+
 	_, err := s.exec("DELETE FROM stream WHERE sd_blob_id = (SELECT id FROM blob_ WHERE hash = ?)", hash)
 	if err != nil {
 		return errors.Err(err)
@@ -251,6 +330,90 @@ func (s *SQL) Delete(hash string) error {
 	return errors.Err(err)
 }
 
+// LeastRecentlyAccessedHashes returns up to maxBlobs hashes of stored blobs,
+// oldest last access first. Requires TrackAccessBlobs.
+func (s *SQL) LeastRecentlyAccessedHashes(maxBlobs int) ([]string, error) {
+	if s.conn == nil {
+		return nil, errors.Err("not connected")
+	}
+
+	if s.TrackAccess != TrackAccessBlobs {
+		return nil, errors.Err("blob access tracking is disabled")
+	}
+
+	query := "SELECT hash from blob_ where is_stored = 1 order by last_accessed_at limit ?"
+	logQuery(query, maxBlobs)
+
+	rows, err := s.conn.Query(query, maxBlobs)
+	if err != nil {
+		return nil, errors.Err(err)
+	}
+	defer closeRows(rows)
+
+	blobs := make([]string, 0, maxBlobs)
+	for rows.Next() {
+		var hash string
+		err := rows.Scan(&hash)
+		if err != nil {
+			return nil, errors.Err(err)
+		}
+		blobs = append(blobs, hash)
+	}
+
+	return blobs, nil
+}
+
+// AllHashes writes all hashes from the db into the channel.
+// It does not close the channel when it finishes.
+//func (s *SQL) AllHashes(ch chan<- string) error {
+//	if s.conn == nil {
+//		return errors.Err("not connected")
+//	}
+//
+//	query := "SELECT hash from blob_"
+//	if s.SoftDelete {
+//		query += " where is_stored = 1"
+//	}
+//	logQuery(query)
+//
+//	rows, err := s.conn.Query(query)
+//	if err != nil {
+//		return errors.Err(err)
+//	}
+//	defer closeRows(rows)
+//
+//	for rows.Next() {
+//		var hash string
+//		err := rows.Scan(&hash)
+//		if err != nil {
+//			return errors.Err(err)
+//		}
+//		ch <- hash
+//		// TODO: this needs testing
+//		// TODO: need a way to cancel this early (e.g. in case of shutdown)
+//	}
+//
+//	close(ch)
+//	return nil
+//}
+
+// Count returns the number of blobs in the db (only stored blobs if SoftDelete is on)
+func (s *SQL) Count() (int, error) {
+	if s.conn == nil {
+		return 0, errors.Err("not connected")
+	}
+
+	query := "SELECT count(id) from blob_"
+	if s.SoftDelete {
+		query += " where is_stored = 1"
+	}
+	logQuery(query)
+
+	var count int
+	err := s.conn.QueryRow(query).Scan(&count)
+	return count, errors.Err(err)
+}
+
 // Block will mark a blob as blocked
 func (s *SQL) Block(hash string) error {
 	query := "INSERT IGNORE INTO blocked SET hash = ?"
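Count and LeastRecentlyAccessedHashes are the two halves of the eviction policy in doClean: once the cache holds maxItems blobs or more, the oldest tenth goes. A runnable sketch of that policy against an in-memory stand-in (fakeStore and its fake access clock are invented for illustration; the real code goes through db.SQL and a BlobStore):

```go
package main

import (
	"fmt"
	"sort"
)

type fakeStore struct {
	lastAccess map[string]int64 // hash -> fake access clock
}

func (f *fakeStore) Count() int { return len(f.lastAccess) }

// leastRecentlyAccessed mirrors LeastRecentlyAccessedHashes: oldest first.
func (f *fakeStore) leastRecentlyAccessed(n int) []string {
	hashes := make([]string, 0, len(f.lastAccess))
	for h := range f.lastAccess {
		hashes = append(hashes, h)
	}
	sort.Slice(hashes, func(i, j int) bool {
		return f.lastAccess[hashes[i]] < f.lastAccess[hashes[j]]
	})
	if n > len(hashes) {
		n = len(hashes)
	}
	return hashes[:n]
}

func (f *fakeStore) Delete(hash string) { delete(f.lastAccess, hash) }

// doClean applies the same policy as cmd/reflector.go: at or above
// maxItems, evict the least recently accessed 10%.
func doClean(maxItems int, f *fakeStore) {
	if f.Count() >= maxItems {
		for _, hash := range f.leastRecentlyAccessed(f.Count() / 10) {
			f.Delete(hash)
		}
	}
}

func main() {
	f := &fakeStore{lastAccess: map[string]int64{}}
	for i := 0; i < 20; i++ {
		f.lastAccess[fmt.Sprintf("blob%02d", i)] = int64(i)
	}
	doClean(20, f)                        // 20 >= 20, so the 2 oldest blobs go
	fmt.Println("remaining:", f.Count()) // 18
}
```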
@@ -528,8 +689,10 @@ CREATE TABLE blob_ (
   hash char(96) NOT NULL,
   is_stored TINYINT(1) NOT NULL DEFAULT 0,
   length bigint(20) unsigned DEFAULT NULL,
+  last_accessed_at TIMESTAMP NULL DEFAULT NULL,
   PRIMARY KEY (id),
-  UNIQUE KEY blob_hash_idx (hash)
+  UNIQUE KEY blob_hash_idx (hash),
+  KEY `blob_last_accessed_idx` (`last_accessed_at`)
 );
 
 CREATE TABLE stream (
@@ -560,3 +723,48 @@ CREATE TABLE blocked (
 );
 
 */
+
+//func (d *LiteDBBackedStore) selfClean() {
+//	d.stopper.Add(1)
+//	defer d.stopper.Done()
+//	lastCleanup := time.Now()
+//	const cleanupInterval = 10 * time.Second
+//	for {
+//		select {
+//		case <-d.stopper.Ch():
+//			log.Infoln("stopping self cleanup")
+//			return
+//		default:
+//			time.Sleep(1 * time.Second)
+//		}
+//		if time.Since(lastCleanup) < cleanupInterval {
+//			continue
+//		}
+//
+//		blobsCount, err := d.db.BlobsCount()
+//		if err != nil {
+//			log.Errorf(errors.FullTrace(err))
+//		}
+//		if blobsCount >= d.maxItems {
+//			itemsToDelete := blobsCount / 100 * 10
+//			blobs, err := d.db.GetLRUBlobs(itemsToDelete)
+//			if err != nil {
+//				log.Errorf(errors.FullTrace(err))
+//			}
+//			for _, hash := range blobs {
+//				select {
+//				case <-d.stopper.Ch():
+//					return
+//				default:
+//				}
+//				err = d.Delete(hash)
+//				if err != nil {
+//					log.Errorf(errors.FullTrace(err))
+//				}
+//				metrics.CacheLRUEvictCount.With(metrics.CacheLabels(d.Name(), d.component)).Inc()
+//			}
+//		}
+//		lastCleanup = time.Now()
+//	}
+//}
diff --git a/go.mod b/go.mod
index d3384a1..4570f1d 100644
--- a/go.mod
+++ b/go.mod
@@ -12,8 +12,7 @@ require (
 	github.com/davecgh/go-spew v1.1.1
 	github.com/go-sql-driver/mysql v1.4.1
 	github.com/golang/protobuf v1.4.2
-	github.com/google/btree v1.0.0 // indirect
-	github.com/google/gops v0.3.7 // indirect
+	github.com/google/gops v0.3.7
 	github.com/gorilla/mux v1.7.4
 	github.com/hashicorp/go-msgpack v0.5.5 // indirect
 	github.com/hashicorp/golang-lru v0.5.4
@@ -33,7 +32,6 @@ require (
 	github.com/spf13/afero v1.4.1 // indirect
 	github.com/spf13/cast v1.3.0
 	github.com/spf13/cobra v0.0.3
-	github.com/spf13/pflag v1.0.3 // indirect
 	github.com/spf13/viper v1.7.1 // indirect
 	github.com/stretchr/testify v1.4.0
 	github.com/volatiletech/null v8.0.0+incompatible
diff --git a/store/caching_test.go b/store/caching_test.go
index 1168a92..0636583 100644
--- a/store/caching_test.go
+++ b/store/caching_test.go
@@ -168,3 +168,7 @@ func (s *SlowBlobStore) Delete(hash string) error {
 	time.Sleep(s.delay)
 	return s.mem.Delete(hash)
 }
+
+// Shutdown satisfies the BlobStore interface; SlowBlobStore has nothing to clean up.
+func (s *SlowBlobStore) Shutdown() {
+}
diff --git a/store/dbbacked.go b/store/dbbacked.go
index 0ebaaed..1c5088a 100644
--- a/store/dbbacked.go
+++ b/store/dbbacked.go
@@ -4,10 +4,9 @@ import (
 	"encoding/json"
 	"sync"
 
-	"github.com/lbryio/reflector.go/db"
-
 	"github.com/lbryio/lbry.go/v2/extras/errors"
 	"github.com/lbryio/lbry.go/v2/stream"
+	"github.com/lbryio/reflector.go/db"
 
 	log "github.com/sirupsen/logrus"
 )
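For reference, a minimal sketch (not part of this patch) of how the new pieces compose, mirroring what wrapWithCache now does: a db.SQL in blob-tracking mode behind a DBBackedStore over a disk store. The DSN and cache path are placeholders, and SoftDelete is switched on here only to show the new field; the patch itself leaves it unset:

```go
package main

import (
	"github.com/lbryio/lbry.go/v2/extras/stop"
	"github.com/lbryio/reflector.go/db"
	"github.com/lbryio/reflector.go/store"

	log "github.com/sirupsen/logrus"
)

func main() {
	localDb := new(db.SQL)
	localDb.TrackAccess = db.TrackAccessBlobs // per-blob last_accessed_at, needed by LeastRecentlyAccessedHashes
	localDb.SoftDelete = true                 // evictions flip is_stored to 0 instead of deleting rows
	if err := localDb.Connect("user:pass@tcp(localhost:3306)/reflector"); err != nil {
		log.Fatal(err)
	}

	// Disk store wrapped in the db-backed store, same shape as wrapWithCache.
	cache := store.NewDBBackedStore(store.NewDiskStore("/tmp/blob_cache", 2), localDb, false)
	_ = cache

	stopper := stop.New()
	// The eviction loop from cmd/reflector.go would be started here:
	// go cleanOldestBlobs(maxItems, localDb, cache, stopper)

	// On shutdown:
	stopper.StopAndWait()
}
```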